# -*- python -*-
from plugins.alsa import ALSA

from common import *


class MyALSA(Entry):
    '''
    Bar entry that shows the volume of a set of ALSA mixers and lets the
    user mute, balance and adjust them with the mouse.

    `self.alsa` holds one element per displayed item: either a single
    `ALSA` object, or a list of `ALSA` objects forming an "exclusive
    group" (only the name of the one unmuted mixer is displayed, and a
    left click cycles which one is unmuted).

    `self.text` is the rendered entry text; it is rebuilt by
    `get_volume` and returned by `function`.
    '''

    ## The original start-up code was hard-wired to consider only the
    ## `bus` backend (falling back to plain polling). This flag keeps that
    ## behaviour while leaving the posix_ipc/cmdipc selection reachable.
    _BUS_ONLY = True

    def __init__(self, *args, timeout = None, sleep = None, cards = -1, mixers = None, colours = None, **kwargs):
        '''
        Open the mixers, pick a refresh backend and start it in the background

        @param  args     Positional arguments forwarded to `Entry.__init__`
        @param  timeout  Timeout handed to the IPC backends, 5 if `None`
        @param  sleep    Seconds to sleep between polls and after a failed
                         refresh, 5 if `None`
        @param  cards    A card (`int` or `str`) or an iterable of cards,
                         `None` is treated as `[-1]`
        @param  mixers   A mixer name, or an iterable whose elements are mixer
                         names or tuples/lists of names (exclusive groups),
                         `None` selects `('Master', 'PCM')`
        @param  colours  Maps mixer names to the SGR parameters used to colour
                         the name of an exclusive group, `None` for no colours
        @param  kwargs   Keyword arguments forwarded to `Entry.__init__`
        '''
        ## TODO support multiple cards and all mixers
        ## `None` rather than `{}` as default, so instances never share one dict
        self.colours = {} if colours is None else colours
        self.timeout = 5 if timeout is None else timeout
        self.sleep = 5 if sleep is None else sleep

        if mixers is None:
            mixers = ('Master', 'PCM')
        elif isinstance(mixers, str):
            mixers = [mixers]
        if cards is None:
            cards = [-1]
        elif isinstance(cards, (str, int)):
            cards = [cards]

        self.alsa = []
        for card in cards:
            for mixer in mixers:
                ## TODO support by name (and 'default')
                try:
                    if isinstance(mixer, (tuple, list)):
                        self.alsa.append([ALSA(card, m) for m in mixer])
                    else:
                        self.alsa.append(ALSA(card, mixer))
                except Exception:
                    ## Best effort: a mixer that cannot be opened is left out
                    pass

        method = self._select_method()
        self.sep_width = Bar.coloured_length(SEPARATOR)
        self.get_volume()
        self.broadcast_update = None
        if method == 0:
            self.create_broadcast_update_bus()
        Entry.__init__(self, *args, **kwargs)
        ## Indexed by the value returned from `_select_method`
        refreshers = (self.refresh_bus, self.refresh_posix_ipc, self.refresh_cmdipc, self.refresh_wait)
        xasync(refreshers[method], name = 'alsa')

    @staticmethod
    def _in_path(command):
        '''
        Check whether an executable named `command` exists in a $PATH directory

        @param   command  The file name to look for
        @return           `True` if found, `False` if not found or if the
                          lookup itself failed (for example $PATH is unset)
        '''
        try:
            for directory in os.environ['PATH'].split(':'):
                if os.access(directory + '/' + command, os.F_OK | os.X_OK, effective_ids = True):
                    return True
        except Exception:
            pass
        return False

    def _select_method(self):
        '''
        Select how volume changes are detected

        @return  0 for `refresh_bus`, 1 for `refresh_posix_ipc`,
                 2 for `refresh_cmdipc`, 3 for `refresh_wait` (polling)
        '''
        if self._BUS_ONLY:
            return 0 if self._in_path('bus') else 3
        try:
            import posix_ipc
            return 1
        except Exception:
            return 2 if self._in_path('cmdipc') else 3

    def _locate(self, col):
        '''
        Translate a column inside the entry text to a mixer and a channel

        @param   col  Column, relative to the start of the entry text
        @return       `(mixer_index, channel)` where `channel` is -1 when the
                      column does not hit a specific channel, or `None` when
                      the column is on a separator or outside the text
        '''
        parts = self.text.split(SEPARATOR)
        index = 0
        while index < len(parts):
            width = Bar.coloured_length(parts[index])
            if col < width:
                break
            col -= width
            if col < self.sep_width:
                return None
            col -= self.sep_width
            index += 1
        if index >= len(parts) or index >= len(self.alsa):
            ## Past the end of the text: nothing was hit
            return None

        fields = parts[index].split(': ')
        label_width = Bar.coloured_length(': '.join(fields[:-1]) + ': ')
        values = fields[-1].split(' ')
        channel = -1
        if col >= label_width:
            col -= label_width
            for i, value in enumerate(values):
                width = Bar.coloured_length(value)
                if col < width:
                    channel = i
                    break
                col -= width
                if col < 1:
                    ## The single space between two channels
                    break
                col -= 1
        return (index, channel)

    def action(self, col, button, x, y):
        '''
        React to a mouse button being pressed on the entry

        Exclusive groups: a left click cycles the unmuted mixer.
        Ordinary mixers: left click toggles mute, right click sets all
        channels to their mean volume, scrolling changes the volume by 5.
        A click on a channel's number affects only that channel (mute and
        scroll); anywhere else on the mixer affects the whole mixer.

        @param  col     Column of the click, relative to the entry
        @param  button  The button, compared against `LEFT_BUTTON`,
                        `RIGHT_BUTTON`, `SCROLL_UP` and `SCROLL_DOWN`
        @param  x       Unused
        @param  y       Unused
        '''
        hit = self._locate(col)
        if hit is None:
            return
        index, channel = hit
        mixer = self.alsa[index]

        if isinstance(mixer, list):
            if button == LEFT_BUTTON:
                self.switch_exclusive(mixer)
        elif button == LEFT_BUTTON:
            muted = mixer.get_mute()
            muted = not (any(muted) if channel == -1 else muted[channel])
            mixer.set_mute(muted, channel)
        elif button == RIGHT_BUTTON:
            ## `get_volume` can report `None` for a channel (it is rendered
            ## as '--%'), such channels cannot take part in the mean
            known = [v for v in mixer.get_volume() if v is not None]
            if not known:
                return
            mixer.set_volume(limited(sum(known) / len(known)), -1)
        elif button in (SCROLL_UP, SCROLL_DOWN):
            volume = mixer.get_volume()
            adj = -5 if button == SCROLL_DOWN else 5
            targets = range(len(volume)) if channel < 0 else (channel,)
            for ch in targets:
                if volume[ch] is not None:
                    mixer.set_volume(limited(volume[ch] + adj), ch)
        else:
            return

        self.get_volume()
        self.invalidate()
        if self.broadcast_update is not None:
            self.broadcast_update()

    @staticmethod
    def _mute_state(status):
        '''
        Summarise the per-channel mute flags of one mixer

        @param   status  Iterable of per-channel mute flags
        @return          `True` if all are set, `False` if none is set,
                         `None` if only some are set
        '''
        if all(status):
            return True
        if any(status):
            return None
        return False

    def _active_index(self, mixers):
        '''
        Find the single fully unmuted mixer of an exclusive group

        @param   mixers  The mixers in the group
        @return          Its index, or `None` if any mixer is partially muted
                         or the number of unmuted mixers is not exactly one
        '''
        muted = [self._mute_state(m.get_mute()) for m in mixers]
        unmuted = [i for i, m in enumerate(muted) if not m]
        if None in muted or len(unmuted) != 1:
            return None
        return unmuted[0]

    def switch_exclusive(self, mixers):
        '''
        Unmute the next mixer of an exclusive group and mute all others

        If the group is not in a consistent state the first mixer is selected.

        @param  mixers  The mixers in the group
        '''
        if not mixers:
            return
        current = self._active_index(mixers)
        index = 0 if current is None else (current + 1) % len(mixers)
        for m in mixers:
            m.set_mute(True)
        mixers[index].set_mute(False)

    def get_exclusive(self, mixers):
        '''
        Get the display text for an exclusive group

        If the group is not in a consistent state it is first normalised by
        muting everything except the first mixer.

        @param   mixers  The mixers in the group
        @return          Name of the unmuted mixer, wrapped in an SGR colour
                         sequence if `self.colours` has an entry for it;
                         the empty string if the group is empty
        '''
        if not mixers:
            return ''
        index = self._active_index(mixers)
        if index is None:
            index = 0
            for m in mixers:
                m.set_mute(True)
            mixers[index].set_mute(False)
        name = mixers[index].mixername
        if name in self.colours:
            name = '\033[%sm%s\033[0m' % (self.colours[name], name)
        return name

    def get_volume(self):
        '''
        Read all mixers and rebuild `self.text`
        '''
        def text_v(v):
            ## Always three columns wide; 100 is shown without the percent sign
            return '--%' if v is None else ('%2i%%' % v)[:3]

        def read_m(m):
            return '%s: %s' % (m.mixername, ' '.join(text_v(v) for v in m.get_volume()))

        self.text = SEPARATOR.join(
            self.get_exclusive(m) if isinstance(m, list) else read_m(m)
            for m in self.alsa
        )

    @staticmethod
    def _bus_path():
        '''
        Get the pathname of the audio bus

        @return  $BUS_AUDIO if set, otherwise $XDG_RUNTIME_DIR/@bus/audio
        @raise   KeyError  If neither variable is set
        '''
        if 'BUS_AUDIO' in os.environ:
            return os.environ['BUS_AUDIO']
        return os.environ['XDG_RUNTIME_DIR'] + '/@bus/audio'

    def refresh_bus(self):
        '''
        Refresh loop for method 0: run `bus wait` and re-read the volume
        every time it returns; sleep `self.sleep` seconds after a failure.
        Never returns.
        '''
        bus = self._bus_path()
        ## NOTE(review): `pdeath` and `spawn_read` come from `common`;
        ## presumably they build and run the command line — confirm there
        wait = pdeath('HUP', 'bus', 'wait', bus, 'true')
        while True:
            try:
                spawn_read(*wait)
                self.get_volume()
            except Exception:
                time.sleep(self.sleep)

    def create_broadcast_update_bus(self):
        '''
        Set `self.broadcast_update` to a function that runs `bus broadcast`
        with the message '<pid> volume' on the audio bus
        '''
        bus = self._bus_path()
        self.bus_broadcast = pdeath('HUP', 'bus', 'broadcast', bus, '%i volume' % os.getpid())
        self.broadcast_update = lambda : spawn_read(*(self.bus_broadcast))

    def refresh_posix_ipc(self):
        '''
        Refresh loop for method 1: wait on POSIX semaphores and re-read the
        volume after each wait (or wait timeout); sleep `self.sleep` seconds
        after a failure. Does not return normally.
        '''
        import posix_ipc
        s = posix_ipc.Semaphore('/.xpybar.alsa.0', posix_ipc.O_CREAT, 0o600, 1)
        c = posix_ipc.Semaphore('/.xpybar.alsa.1', posix_ipc.O_CREAT, 0o600, 0)
        q = posix_ipc.Semaphore('/.xpybar.alsa.2', posix_ipc.O_CREAT, 0o600, 0)
        try:
            s.acquire(self.timeout)
            while True:
                failed = False
                try:
                    c.release()
                    try:
                        s.release()
                        try:
                            ## Was `timeout`, an undefined name
                            q.acquire(self.timeout)
                        except posix_ipc.BusyError:
                            ## Timed out: just refresh
                            pass
                        finally:
                            s.acquire(self.timeout)
                    finally:
                        c.acquire(self.timeout)
                except Exception:
                    failed = True
                try:
                    self.get_volume()
                except Exception:
                    failed = True
                if failed:
                    time.sleep(self.sleep)
        finally:
            s.release()

    def refresh_cmdipc(self):
        '''
        Refresh loop for method 2: same idea as `refresh_posix_ipc` but
        using the `cmdipc` command; `cmdipc ... leave` is always run on exit.
        Does not return normally.
        '''
        key = '/.xpybar.alsa.0/.xpybar.alsa.1/.xpybar.alsa.2'
        enter = pdeath('HUP', 'cmdipc', '-PCck', key, 'enter', '-b%i' % self.timeout)
        wait  = pdeath('HUP', 'cmdipc', '-PCk',  key, 'wait',  '-b%i' % self.timeout)
        leave = pdeath('HUP', 'cmdipc', '-PCk',  key, 'leave')
        try:
            spawn_read(*enter)
            while True:
                try:
                    spawn_read(*wait)
                    self.get_volume()
                except Exception:
                    time.sleep(self.sleep)
        finally:
            spawn_read(*leave)

    def refresh_wait(self):
        '''
        Refresh loop for method 3: re-read the volume every `self.sleep`
        seconds, ignoring failures. Never returns.
        '''
        while True:
            try:
                time.sleep(self.sleep)
                self.get_volume()
            except Exception:
                pass

    def function(self):
        '''
        Get the text to display

        @return  The text most recently built by `get_volume`
        '''
        return self.text