aboutsummaryrefslogtreecommitdiffstats
path: root/xpybar/config/myalsa.py
diff options
context:
space:
mode:
Diffstat (limited to 'xpybar/config/myalsa.py')
-rw-r--r--xpybar/config/myalsa.py260
1 files changed, 260 insertions, 0 deletions
diff --git a/xpybar/config/myalsa.py b/xpybar/config/myalsa.py
new file mode 100644
index 0000000..14b8a7b
--- /dev/null
+++ b/xpybar/config/myalsa.py
@@ -0,0 +1,260 @@
+# -*- python -*-
+from plugins.alsa import ALSA
+
+from common import *
+
+class MyALSA(Entry):
+ def __init__(self, *args, timeout = None, sleep = None, cards = -1, mixers = None, colours = {}, **kwargs): ## TODO support multiple cards and all mixers
+ self.colours = colours if colours is not None else {}
+ if timeout is not None:
+ self.timeout = timeout
+ else:
+ self.timeout = 5
+ if sleep is not None:
+ self.sleep = sleep
+ else:
+ self.sleep = 5
+ if mixers is None:
+ mixers = ('Master', 'PCM')
+ elif isinstance(mixers, str):
+ mixers = [mixers]
+ else:
+ mixers = mixers
+ if cards is None:
+ cards = [-1]
+ elif isinstance(cards, str) or isinstance(cards, int):
+ cards = [cards]
+ else:
+ cards = cards
+ self.alsa = []
+ for card in cards:
+ for mixer in mixers: ## TODO support by name (and 'default')
+ try:
+ if isinstance(mixer, tuple) or isinstance(mixer, list):
+ self.alsa.append([ALSA(card, m) for m in mixer])
+ else:
+ self.alsa.append(ALSA(card, mixer))
+ except:
+ pass
+ if True:
+ method = 3
+ try:
+ path = os.environ['PATH'].split(':')
+ for p in path:
+ p += '/bus'
+ if os.access(p, os.F_OK | os.X_OK, effective_ids = True):
+ method = 0
+ break
+ except:
+ pass
+ else:
+ try:
+ import posix_ipc
+ method = 1
+ except:
+ method = 3
+ try:
+ path = os.environ['PATH'].split(':')
+ for p in path:
+ p += '/cmdipc'
+ if os.access(p, os.F_OK | os.X_OK, effective_ids = True):
+ method = 2
+ break
+ except:
+ pass
+ self.sep_width = Bar.coloured_length(SEPARATOR)
+ self.get_volume()
+ self.broadcast_update = None
+ if method == 0:
+ self.create_broadcast_update_bus()
+ Entry.__init__(self, *args, **kwargs)
+ xasync((self.refresh_bus, self.refresh_posix_ipc, self.refresh_cmdipc, self.refresh_wait)[method], name = 'alsa')
+
+ def action(self, col, button, x, y):
+ mixer = 0
+ mixer_text = self.text.split(SEPARATOR)
+ while mixer < len(mixer_text): ## the limit is just a precaution
+ mixer_width = Bar.coloured_length(mixer_text[mixer])
+ if col >= mixer_width:
+ col -= mixer_width
+ if col < self.sep_width:
+ return
+ col -= self.sep_width
+ else:
+ break
+ mixer += 1
+ mixer_text = mixer_text[mixer]
+
+ channel = -1
+ mixer_text = mixer_text.split(': ')
+ mixer_label = ': '.join(mixer_text[:-1]) + ': '
+ mixer_text = mixer_text[-1].split(' ')
+ mixer_label = Bar.coloured_length(mixer_label)
+ if col >= mixer_label:
+ col -= mixer_label
+ while channel < len(mixer_text): ## the limit is just a precaution
+ channel += 1
+ channel_width = Bar.coloured_length(mixer_text[channel])
+ if col < channel_width:
+ break
+ col -= channel_width
+ if col < 1:
+ channel = -1
+ break
+ col -= 1
+
+ mixer = self.alsa[mixer]
+ if isinstance(mixer, list):
+ if button == LEFT_BUTTON:
+ self.switch_exclusive(mixer)
+ elif button == LEFT_BUTTON:
+ muted = mixer.get_mute()
+ muted = not (any(muted) if channel == -1 else muted[channel])
+ mixer.set_mute(muted, channel)
+ elif button == RIGHT_BUTTON:
+ volume = mixer.get_volume()
+ volume = limited(sum(volume) / len(volume))
+ mixer.set_volume(volume, -1)
+ elif button in (SCROLL_UP, SCROLL_DOWN):
+ volume = mixer.get_volume()
+ adj = -5 if button == SCROLL_DOWN else 5
+ if channel < 0:
+ for ch in range(len(volume)):
+ mixer.set_volume(limited(volume[ch] + adj), ch)
+ else:
+ mixer.set_volume(limited(volume[channel] + adj), channel)
+ else:
+ return
+ self.get_volume()
+ self.invalidate()
+ if self.broadcast_update is not None:
+ self.broadcast_update()
+
+ def switch_exclusive(self, mixers):
+ def f(status):
+ if all(status):
+ return True
+ elif any(status):
+ return None
+ else:
+ return False
+ muted = [f(m.get_mute()) for m in mixers]
+ count = sum((0 if m else 1) for m in muted)
+ if None in muted or count != 1:
+ index = 0
+ else:
+ [index] = [i for i, m in enumerate(muted) if not m]
+ index = (index + 1) % len(mixers)
+ for m in mixers:
+ m.set_mute(True)
+ mixers[index].set_mute(False)
+
+ def get_exclusive(self, mixers):
+ def f(status):
+ if all(status):
+ return True
+ elif any(status):
+ return None
+ else:
+ return False
+ muted = [f(m.get_mute()) for m in mixers]
+ count = sum((0 if m else 1) for m in muted)
+ if None in muted or count != 1:
+ index = 0
+ for m in mixers:
+ m.set_mute(True)
+ mixers[index].set_mute(False)
+ else:
+ [index] = [i for i, m in enumerate(muted) if not m]
+ name = mixers[index].mixername
+ if name in self.colours:
+ name = '\033[%sm%s\033[0m' % (self.colours[name], name)
+ return name
+
+ def get_volume(self):
+ text_v = lambda v : '--%' if v is None else ('%2i%%' % v)[:3]
+ read_m = lambda m : '%s: %s' % (m.mixername, ' '.join(text_v(v) for v in m.get_volume()))
+ text = SEPARATOR.join((self.get_exclusive(m) if isinstance(m, list) else read_m(m)) for m in self.alsa)
+ self.text = text
+
+ def refresh_bus(self):
+ if 'BUS_AUDIO' in os.environ:
+ bus = os.environ['BUS_AUDIO']
+ else:
+ bus = os.environ['XDG_RUNTIME_DIR'] + '/@bus/audio'
+ wait = pdeath('HUP', 'bus', 'wait', bus, 'true')
+ while True:
+ try:
+ spawn_read(*wait)
+ self.get_volume()
+ except:
+ time.sleep(self.sleep)
+
+ def create_broadcast_update_bus(self):
+ if 'BUS_AUDIO' in os.environ:
+ bus = os.environ['BUS_AUDIO']
+ else:
+ bus = os.environ['XDG_RUNTIME_DIR'] + '/@bus/audio'
+ self.bus_broadcast = pdeath('HUP', 'bus', 'broadcast', bus, '%i volume' % os.getpid())
+ self.broadcast_update = lambda : spawn_read(*(self.bus_broadcast))
+
+ def refresh_posix_ipc(self):
+ import posix_ipc
+ s = posix_ipc.Semaphore('/.xpybar.alsa.0', posix_ipc.O_CREAT, 0o600, 1)
+ c = posix_ipc.Semaphore('/.xpybar.alsa.1', posix_ipc.O_CREAT, 0o600, 0)
+ q = posix_ipc.Semaphore('/.xpybar.alsa.2', posix_ipc.O_CREAT, 0o600, 0)
+ try:
+ s.acquire(self.timeout)
+ while True:
+ failed = False
+ try:
+ c.release()
+ try:
+ s.release()
+ try:
+ q.acquire(timeout)
+ except posix_ipc.BusyError:
+ sys.exit(1)
+ pass
+ finally:
+ s.acquire(self.timeout)
+ finally:
+ c.acquire(self.timeout)
+ except:
+ sys.exit(1)
+ failed = True
+ try:
+ self.get_volume()
+ except:
+ sys.exit(1)
+ failed = True
+ if failed:
+ time.sleep(self.sleep)
+ finally:
+ s.release()
+
+ def refresh_cmdipc(self):
+ enter = pdeath('HUP', 'cmdipc', '-PCck', '/.xpybar.alsa.0/.xpybar.alsa.1/.xpybar.alsa.2', 'enter', '-b%i' % self.timeout)
+ wait = pdeath('HUP', 'cmdipc', '-PCk', '/.xpybar.alsa.0/.xpybar.alsa.1/.xpybar.alsa.2', 'wait', '-b%i' % self.timeout)
+ leave = pdeath('HUP', 'cmdipc', '-PCk', '/.xpybar.alsa.0/.xpybar.alsa.1/.xpybar.alsa.2', 'leave')
+ try:
+ spawn_read(*enter)
+ while True:
+ try:
+ spawn_read(*wait)
+ self.get_volume()
+ except:
+ time.sleep(self.sleep)
+ finally:
+ spawn_read(*leave)
+
+ def refresh_wait(self):
+ while True:
+ try:
+ time.sleep(self.sleep)
+ self.get_volume()
+ except:
+ pass
+
+ def function(self):
+ return self.text