# -*- python -*-
from plugins.alsa import ALSA
import alsaaudio
from common import *
class MixerAction:
def __init__(self, name, action):
self.name = name
self.action = action
class MyALSA(Entry):
def __init__(self, *args, timeout = None, sleep = None, cards = -1, mixers = None, colours = {}, prefix = '', **kwargs): ## TODO support multiple cards and all mixers
self.colours = colours if colours is not None else {}
if timeout is not None:
self.timeout = timeout
else:
self.timeout = 5
if sleep is not None:
self.sleep = sleep
else:
self.sleep = 5
if mixers is None:
mixers = ('Master', 'PCM')
elif isinstance(mixers, str):
mixers = [mixers]
else:
mixers = mixers
if cards is None:
cards = [-1]
elif isinstance(cards, str) or isinstance(cards, int):
cards = [cards]
else:
cards = cards
self.alsa = []
self.actions = []
names = {}
for index in alsaaudio.card_indexes():
for name in alsaaudio.card_name(index):
names[name] = index
for card in cards:
card = names.get(card, card)
for mixer in mixers: ## TODO support by name (and 'default')
try:
if isinstance(mixer, MixerAction):
self.actions.append(mixer.action)
mixer = mixer.name
else:
self.actions.append(None)
if isinstance(mixer, tuple) or isinstance(mixer, list):
self.alsa.append([ALSA(card, m) for m in mixer])
else:
self.alsa.append(ALSA(card, mixer))
except:
pass
if True:
method = 3
try:
path = os.environ['PATH'].split(':')
for p in path:
p += '/bus'
if os.access(p, os.F_OK | os.X_OK, effective_ids = True):
method = 0
break
except:
pass
else:
try:
import posix_ipc
method = 1
except:
method = 3
try:
path = os.environ['PATH'].split(':')
for p in path:
p += '/cmdipc'
if os.access(p, os.F_OK | os.X_OK, effective_ids = True):
method = 2
break
except:
pass
self.prefix = prefix
self.sep_width = Bar.coloured_length(SEPARATOR)
self.get_volume()
self.broadcast_update = None
if method == 0:
self.create_broadcast_update_bus()
Entry.__init__(self, *args, **kwargs)
xasync((self.refresh_bus, self.refresh_posix_ipc, self.refresh_cmdipc, self.refresh_wait)[method], name = 'alsa')
def action(self, col, button, x, y):
if not self.alsa:
return
mixer = 0
mixer_text = self.text.split(SEPARATOR)
while mixer < len(mixer_text): ## the limit is just a precaution
mixer_width = Bar.coloured_length(mixer_text[mixer])
if col >= mixer_width:
col -= mixer_width
if col < self.sep_width:
return
col -= self.sep_width
else:
break
mixer += 1
mixer_text = mixer_text[mixer]
channel = -1
mixer_text = mixer_text.split(': ')
mixer_label = ': '.join(mixer_text[:-1]) + ': '
mixer_text = mixer_text[-1].split(' ')
mixer_label = Bar.coloured_length(mixer_label)
if col >= mixer_label:
col -= mixer_label
while channel < len(mixer_text): ## the limit is just a precaution
channel += 1
channel_width = Bar.coloured_length(mixer_text[channel])
if col < channel_width:
break
col -= channel_width
if col < 1:
channel = -1
break
col -= 1
action = self.actions[mixer]
mixer = self.alsa[mixer]
if isinstance(mixer, list):
if button == LEFT_BUTTON:
self.switch_exclusive(mixer)
elif button == LEFT_BUTTON:
muted = mixer.get_mute()
muted = not (any(muted) if channel == -1 else muted[channel])
mixer.set_mute(muted, channel)
elif button == RIGHT_BUTTON:
volume = mixer.get_volume()
volume = limited(sum(volume) / len(volume))
mixer.set_volume(volume, -1)
elif button == MIDDLE_BUTTON:
if action is not None:
print('middle click: ' + str(action is not None))
action()
elif button in (SCROLL_UP, SCROLL_DOWN):
volume = mixer.get_volume()
adj = -5 if button == SCROLL_DOWN else 5
if channel < 0:
for ch in range(len(volume)):
mixer.set_volume(limited(volume[ch] + adj), ch)
else:
mixer.set_volume(limited(volume[channel] + adj), channel)
else:
return
self.get_volume()
self.invalidate()
if self.broadcast_update is not None:
self.broadcast_update()
def switch_exclusive(self, mixers):
def f(status):
if all(status):
return True
elif any(status):
return None
else:
return False
muted = [f(m.get_mute()) for m in mixers]
count = sum((0 if m else 1) for m in muted)
if None in muted or count != 1:
index = 0
else:
[index] = [i for i, m in enumerate(muted) if not m]
index = (index + 1) % len(mixers)
for m in mixers:
m.set_mute(True)
mixers[index].set_mute(False)
def get_exclusive(self, mixers):
def f(status):
if all(status):
return True
elif any(status):
return None
else:
return False
muted = [f(m.get_mute()) for m in mixers]
count = sum((0 if m else 1) for m in muted)
if None in muted or count != 1:
index = 0
for m in mixers:
m.set_mute(True)
mixers[index].set_mute(False)
else:
[index] = [i for i, m in enumerate(muted) if not m]
name = mixers[index].mixername
if name in self.colours:
name = '\033[%sm%s\033[0m' % (self.colours[name], name)
return name
def get_volume(self):
text_v = lambda v : '--%' if v is None else ('%2i%%' % v)[:3]
read_m = lambda m : '%s%s: %s' % (self.prefix, m.mixername, ' '.join(text_v(v) for v in m.get_volume()))
text = SEPARATOR.join((self.get_exclusive(m) if isinstance(m, list) else read_m(m)) for m in self.alsa)
if not text:
text = '%s%s' % (self.prefix, 'disconnected')
self.text = text
def refresh_bus(self):
if 'BUS_AUDIO' in os.environ:
bus = os.environ['BUS_AUDIO']
else:
bus = os.environ['XDG_RUNTIME_DIR'] + '/@bus/audio'
wait = pdeath('HUP', 'bus', 'wait', bus, 'true')
while True:
try:
spawn_read(*wait)
self.get_volume()
except:
time.sleep(self.sleep)
def create_broadcast_update_bus(self):
if 'BUS_AUDIO' in os.environ:
bus = os.environ['BUS_AUDIO']
else:
bus = os.environ['XDG_RUNTIME_DIR'] + '/@bus/audio'
self.bus_broadcast = pdeath('HUP', 'bus', 'broadcast', bus, '%i volume' % os.getpid())
self.broadcast_update = lambda : spawn_read(*(self.bus_broadcast))
def refresh_posix_ipc(self):
import posix_ipc
s = posix_ipc.Semaphore('/.xpybar.alsa.0', posix_ipc.O_CREAT, 0o600, 1)
c = posix_ipc.Semaphore('/.xpybar.alsa.1', posix_ipc.O_CREAT, 0o600, 0)
q = posix_ipc.Semaphore('/.xpybar.alsa.2', posix_ipc.O_CREAT, 0o600, 0)
try:
s.acquire(self.timeout)
while True:
failed = False
try:
c.release()
try:
s.release()
try:
q.acquire(timeout)
except posix_ipc.BusyError:
sys.exit(1)
pass
finally:
s.acquire(self.timeout)
finally:
c.acquire(self.timeout)
except:
sys.exit(1)
failed = True
try:
self.get_volume()
except:
sys.exit(1)
failed = True
if failed:
time.sleep(self.sleep)
finally:
s.release()
def refresh_cmdipc(self):
enter = pdeath('HUP', 'cmdipc', '-PCck', '/.xpybar.alsa.0/.xpybar.alsa.1/.xpybar.alsa.2', 'enter', '-b%i' % self.timeout)
wait = pdeath('HUP', 'cmdipc', '-PCk', '/.xpybar.alsa.0/.xpybar.alsa.1/.xpybar.alsa.2', 'wait', '-b%i' % self.timeout)
leave = pdeath('HUP', 'cmdipc', '-PCk', '/.xpybar.alsa.0/.xpybar.alsa.1/.xpybar.alsa.2', 'leave')
try:
spawn_read(*enter)
while True:
try:
spawn_read(*wait)
self.get_volume()
except:
time.sleep(self.sleep)
finally:
spawn_read(*leave)
def refresh_wait(self):
while True:
try:
time.sleep(self.sleep)
self.get_volume()
except:
pass
def function(self):
return self.text