diff options
Diffstat (limited to '')
-rw-r--r-- | xpybar/config/myalsa.py | 260 |
1 files changed, 260 insertions, 0 deletions
diff --git a/xpybar/config/myalsa.py b/xpybar/config/myalsa.py new file mode 100644 index 0000000..14b8a7b --- /dev/null +++ b/xpybar/config/myalsa.py @@ -0,0 +1,260 @@ +# -*- python -*- +from plugins.alsa import ALSA + +from common import * + +class MyALSA(Entry): + def __init__(self, *args, timeout = None, sleep = None, cards = -1, mixers = None, colours = {}, **kwargs): ## TODO support multiple cards and all mixers + self.colours = colours if colours is not None else {} + if timeout is not None: + self.timeout = timeout + else: + self.timeout = 5 + if sleep is not None: + self.sleep = sleep + else: + self.sleep = 5 + if mixers is None: + mixers = ('Master', 'PCM') + elif isinstance(mixers, str): + mixers = [mixers] + else: + mixers = mixers + if cards is None: + cards = [-1] + elif isinstance(cards, str) or isinstance(cards, int): + cards = [cards] + else: + cards = cards + self.alsa = [] + for card in cards: + for mixer in mixers: ## TODO support by name (and 'default') + try: + if isinstance(mixer, tuple) or isinstance(mixer, list): + self.alsa.append([ALSA(card, m) for m in mixer]) + else: + self.alsa.append(ALSA(card, mixer)) + except: + pass + if True: + method = 3 + try: + path = os.environ['PATH'].split(':') + for p in path: + p += '/bus' + if os.access(p, os.F_OK | os.X_OK, effective_ids = True): + method = 0 + break + except: + pass + else: + try: + import posix_ipc + method = 1 + except: + method = 3 + try: + path = os.environ['PATH'].split(':') + for p in path: + p += '/cmdipc' + if os.access(p, os.F_OK | os.X_OK, effective_ids = True): + method = 2 + break + except: + pass + self.sep_width = Bar.coloured_length(SEPARATOR) + self.get_volume() + self.broadcast_update = None + if method == 0: + self.create_broadcast_update_bus() + Entry.__init__(self, *args, **kwargs) + xasync((self.refresh_bus, self.refresh_posix_ipc, self.refresh_cmdipc, self.refresh_wait)[method], name = 'alsa') + + def action(self, col, button, x, y): + mixer = 0 + mixer_text = self.text.split(SEPARATOR) + while mixer < len(mixer_text): ## the limit is just a precaution + mixer_width = Bar.coloured_length(mixer_text[mixer]) + if col >= mixer_width: + col -= mixer_width + if col < self.sep_width: + return + col -= self.sep_width + else: + break + mixer += 1 + mixer_text = mixer_text[mixer] + + channel = -1 + mixer_text = mixer_text.split(': ') + mixer_label = ': '.join(mixer_text[:-1]) + ': ' + mixer_text = mixer_text[-1].split(' ') + mixer_label = Bar.coloured_length(mixer_label) + if col >= mixer_label: + col -= mixer_label + while channel < len(mixer_text): ## the limit is just a precaution + channel += 1 + channel_width = Bar.coloured_length(mixer_text[channel]) + if col < channel_width: + break + col -= channel_width + if col < 1: + channel = -1 + break + col -= 1 + + mixer = self.alsa[mixer] + if isinstance(mixer, list): + if button == LEFT_BUTTON: + self.switch_exclusive(mixer) + elif button == LEFT_BUTTON: + muted = mixer.get_mute() + muted = not (any(muted) if channel == -1 else muted[channel]) + mixer.set_mute(muted, channel) + elif button == RIGHT_BUTTON: + volume = mixer.get_volume() + volume = limited(sum(volume) / len(volume)) + mixer.set_volume(volume, -1) + elif button in (SCROLL_UP, SCROLL_DOWN): + volume = mixer.get_volume() + adj = -5 if button == SCROLL_DOWN else 5 + if channel < 0: + for ch in range(len(volume)): + mixer.set_volume(limited(volume[ch] + adj), ch) + else: + mixer.set_volume(limited(volume[channel] + adj), channel) + else: + return + self.get_volume() + self.invalidate() + if self.broadcast_update is not None: + self.broadcast_update() + + def switch_exclusive(self, mixers): + def f(status): + if all(status): + return True + elif any(status): + return None + else: + return False + muted = [f(m.get_mute()) for m in mixers] + count = sum((0 if m else 1) for m in muted) + if None in muted or count != 1: + index = 0 + else: + [index] = [i for i, m in enumerate(muted) if not m] + index = (index + 1) % len(mixers) + for m in mixers: + m.set_mute(True) + mixers[index].set_mute(False) + + def get_exclusive(self, mixers): + def f(status): + if all(status): + return True + elif any(status): + return None + else: + return False + muted = [f(m.get_mute()) for m in mixers] + count = sum((0 if m else 1) for m in muted) + if None in muted or count != 1: + index = 0 + for m in mixers: + m.set_mute(True) + mixers[index].set_mute(False) + else: + [index] = [i for i, m in enumerate(muted) if not m] + name = mixers[index].mixername + if name in self.colours: + name = '\033[%sm%s\033[0m' % (self.colours[name], name) + return name + + def get_volume(self): + text_v = lambda v : '--%' if v is None else ('%2i%%' % v)[:3] + read_m = lambda m : '%s: %s' % (m.mixername, ' '.join(text_v(v) for v in m.get_volume())) + text = SEPARATOR.join((self.get_exclusive(m) if isinstance(m, list) else read_m(m)) for m in self.alsa) + self.text = text + + def refresh_bus(self): + if 'BUS_AUDIO' in os.environ: + bus = os.environ['BUS_AUDIO'] + else: + bus = os.environ['XDG_RUNTIME_DIR'] + '/@bus/audio' + wait = pdeath('HUP', 'bus', 'wait', bus, 'true') + while True: + try: + spawn_read(*wait) + self.get_volume() + except: + time.sleep(self.sleep) + + def create_broadcast_update_bus(self): + if 'BUS_AUDIO' in os.environ: + bus = os.environ['BUS_AUDIO'] + else: + bus = os.environ['XDG_RUNTIME_DIR'] + '/@bus/audio' + self.bus_broadcast = pdeath('HUP', 'bus', 'broadcast', bus, '%i volume' % os.getpid()) + self.broadcast_update = lambda : spawn_read(*(self.bus_broadcast)) + + def refresh_posix_ipc(self): + import posix_ipc + s = posix_ipc.Semaphore('/.xpybar.alsa.0', posix_ipc.O_CREAT, 0o600, 1) + c = posix_ipc.Semaphore('/.xpybar.alsa.1', posix_ipc.O_CREAT, 0o600, 0) + q = posix_ipc.Semaphore('/.xpybar.alsa.2', posix_ipc.O_CREAT, 0o600, 0) + try: + s.acquire(self.timeout) + while True: + failed = False + try: + c.release() + try: + s.release() + try: + q.acquire(timeout) + except posix_ipc.BusyError: + sys.exit(1) + pass + finally: + s.acquire(self.timeout) + finally: + c.acquire(self.timeout) + except: + sys.exit(1) + failed = True + try: + self.get_volume() + except: + sys.exit(1) + failed = True + if failed: + time.sleep(self.sleep) + finally: + s.release() + + def refresh_cmdipc(self): + enter = pdeath('HUP', 'cmdipc', '-PCck', '/.xpybar.alsa.0/.xpybar.alsa.1/.xpybar.alsa.2', 'enter', '-b%i' % self.timeout) + wait = pdeath('HUP', 'cmdipc', '-PCk', '/.xpybar.alsa.0/.xpybar.alsa.1/.xpybar.alsa.2', 'wait', '-b%i' % self.timeout) + leave = pdeath('HUP', 'cmdipc', '-PCk', '/.xpybar.alsa.0/.xpybar.alsa.1/.xpybar.alsa.2', 'leave') + try: + spawn_read(*enter) + while True: + try: + spawn_read(*wait) + self.get_volume() + except: + time.sleep(self.sleep) + finally: + spawn_read(*leave) + + def refresh_wait(self): + while True: + try: + time.sleep(self.sleep) + self.get_volume() + except: + pass + + def function(self): + return self.text |