aboutsummaryrefslogblamecommitdiffstats
path: root/xpybar/config/myalsa.py
blob: 14b8a7bfc8cd2ed029db547e08ce88a356e21dce (plain) (tree)



































































































































































































































































                                                                                                                                                             
# -*- python -*-
from plugins.alsa import ALSA

from common import *

class MyALSA(Entry):
    def __init__(self, *args, timeout = None, sleep = None, cards = -1, mixers = None, colours = {}, **kwargs): ## TODO support multiple cards and all mixers
        self.colours = colours if colours is not None else {}
        if timeout is not None:
            self.timeout = timeout
        else:
            self.timeout = 5
        if sleep is not None:
            self.sleep = sleep
        else:
            self.sleep = 5
        if mixers is None:
            mixers = ('Master', 'PCM')
        elif isinstance(mixers, str):
            mixers = [mixers]
        else:
            mixers = mixers
        if cards is None:
            cards = [-1]
        elif isinstance(cards, str) or isinstance(cards, int):
            cards = [cards]
        else:
            cards = cards
        self.alsa = []
        for card in cards:
            for mixer in mixers: ## TODO support by name (and 'default')
                try:
                    if isinstance(mixer, tuple) or isinstance(mixer, list):
                        self.alsa.append([ALSA(card, m) for m in mixer])
                    else:
                        self.alsa.append(ALSA(card, mixer))
                except:
                    pass
        if True:
            method = 3
            try:
                path = os.environ['PATH'].split(':')
                for p in path:
                    p += '/bus'
                    if os.access(p, os.F_OK | os.X_OK, effective_ids = True):
                        method = 0
                        break
            except:
                pass
        else:
            try:
                import posix_ipc
                method = 1
            except:
                method = 3
                try:
                    path = os.environ['PATH'].split(':')
                    for p in path:
                        p += '/cmdipc'
                        if os.access(p, os.F_OK | os.X_OK, effective_ids = True):
                            method = 2
                            break
                except:
                    pass
        self.sep_width = Bar.coloured_length(SEPARATOR)
        self.get_volume()
        self.broadcast_update = None
        if method == 0:
            self.create_broadcast_update_bus()
        Entry.__init__(self, *args, **kwargs)
        xasync((self.refresh_bus, self.refresh_posix_ipc, self.refresh_cmdipc, self.refresh_wait)[method], name = 'alsa')
    
    def action(self, col, button, x, y):
        mixer = 0
        mixer_text = self.text.split(SEPARATOR)
        while mixer < len(mixer_text): ## the limit is just a precaution
            mixer_width = Bar.coloured_length(mixer_text[mixer])
            if col >= mixer_width:
                col -= mixer_width
                if col < self.sep_width:
                    return
                col -= self.sep_width
            else:
                break
            mixer += 1
        mixer_text = mixer_text[mixer]
        
        channel = -1
        mixer_text  = mixer_text.split(': ')
        mixer_label = ': '.join(mixer_text[:-1]) + ': '
        mixer_text  = mixer_text[-1].split(' ')
        mixer_label = Bar.coloured_length(mixer_label)
        if col >= mixer_label:
            col -= mixer_label
            while channel < len(mixer_text): ## the limit is just a precaution
                channel += 1
                channel_width = Bar.coloured_length(mixer_text[channel])
                if col < channel_width:
                    break
                col -= channel_width
                if col < 1:
                    channel = -1
                    break
                col -= 1
        
        mixer = self.alsa[mixer]
        if isinstance(mixer, list):
            if button == LEFT_BUTTON:
                self.switch_exclusive(mixer)
        elif button == LEFT_BUTTON:
            muted = mixer.get_mute()
            muted = not (any(muted) if channel == -1 else muted[channel])
            mixer.set_mute(muted, channel)
        elif button == RIGHT_BUTTON:
            volume = mixer.get_volume()
            volume = limited(sum(volume) / len(volume))
            mixer.set_volume(volume, -1)
        elif button in (SCROLL_UP, SCROLL_DOWN):
            volume = mixer.get_volume()
            adj = -5 if button == SCROLL_DOWN else 5
            if channel < 0:
                for ch in range(len(volume)):
                    mixer.set_volume(limited(volume[ch] + adj), ch)
            else:
                mixer.set_volume(limited(volume[channel] + adj), channel)
        else:
            return
        self.get_volume()
        self.invalidate()
        if self.broadcast_update is not None:
            self.broadcast_update()
    
    def switch_exclusive(self, mixers):
        def f(status):
            if all(status):
                return True
            elif any(status):
                return None
            else:
                return False
        muted = [f(m.get_mute()) for m in mixers]
        count = sum((0 if m else 1) for m in muted)
        if None in muted or count != 1:
            index = 0
        else:
            [index] = [i for i, m in enumerate(muted) if not m]
            index = (index + 1) % len(mixers)
        for m in mixers:
            m.set_mute(True)
        mixers[index].set_mute(False)
    
    def get_exclusive(self, mixers):
        def f(status):
            if all(status):
                return True
            elif any(status):
                return None
            else:
                return False
        muted = [f(m.get_mute()) for m in mixers]
        count = sum((0 if m else 1) for m in muted)
        if None in muted or count != 1:
            index = 0
            for m in mixers:
                m.set_mute(True)
            mixers[index].set_mute(False)
        else:
            [index] = [i for i, m in enumerate(muted) if not m]
        name = mixers[index].mixername
        if name in self.colours:
            name = '\033[%sm%s\033[0m' % (self.colours[name], name)
        return name
    
    def get_volume(self):
        text_v = lambda v : '--%' if v is None else ('%2i%%' % v)[:3]
        read_m = lambda m : '%s: %s' % (m.mixername, ' '.join(text_v(v) for v in m.get_volume()))
        text = SEPARATOR.join((self.get_exclusive(m) if isinstance(m, list) else read_m(m)) for m in self.alsa)
        self.text = text
    
    def refresh_bus(self):
        if 'BUS_AUDIO' in os.environ:
            bus = os.environ['BUS_AUDIO']
        else:
            bus = os.environ['XDG_RUNTIME_DIR'] + '/@bus/audio'
        wait = pdeath('HUP', 'bus', 'wait', bus, 'true')
        while True:
            try:
                spawn_read(*wait)
                self.get_volume()
            except:
                time.sleep(self.sleep)
    
    def create_broadcast_update_bus(self):
        if 'BUS_AUDIO' in os.environ:
            bus = os.environ['BUS_AUDIO']
        else:
            bus = os.environ['XDG_RUNTIME_DIR'] + '/@bus/audio'
        self.bus_broadcast = pdeath('HUP', 'bus', 'broadcast', bus, '%i volume' % os.getpid())
        self.broadcast_update = lambda : spawn_read(*(self.bus_broadcast))
    
    def refresh_posix_ipc(self):
        import posix_ipc
        s = posix_ipc.Semaphore('/.xpybar.alsa.0', posix_ipc.O_CREAT, 0o600, 1)
        c = posix_ipc.Semaphore('/.xpybar.alsa.1', posix_ipc.O_CREAT, 0o600, 0)
        q = posix_ipc.Semaphore('/.xpybar.alsa.2', posix_ipc.O_CREAT, 0o600, 0)
        try:
            s.acquire(self.timeout)
            while True:
                failed = False
                try:
                    c.release()
                    try:
                        s.release()
                        try:
                            q.acquire(timeout)
                        except posix_ipc.BusyError:
                            sys.exit(1)
                            pass
                        finally:
                            s.acquire(self.timeout)
                    finally:
                        c.acquire(self.timeout)
                except:
                    sys.exit(1)
                    failed = True
                try:
                    self.get_volume()
                except:
                    sys.exit(1)
                    failed = True
                if failed:
                    time.sleep(self.sleep)
        finally:
            s.release()
    
    def refresh_cmdipc(self):
        enter = pdeath('HUP', 'cmdipc', '-PCck', '/.xpybar.alsa.0/.xpybar.alsa.1/.xpybar.alsa.2', 'enter', '-b%i' % self.timeout)
        wait  = pdeath('HUP', 'cmdipc', '-PCk',  '/.xpybar.alsa.0/.xpybar.alsa.1/.xpybar.alsa.2', 'wait',  '-b%i' % self.timeout)
        leave = pdeath('HUP', 'cmdipc', '-PCk',  '/.xpybar.alsa.0/.xpybar.alsa.1/.xpybar.alsa.2', 'leave')
        try:
            spawn_read(*enter)
            while True:
                try:
                    spawn_read(*wait)
                    self.get_volume()
                except:
                    time.sleep(self.sleep)
        finally:
            spawn_read(*leave)
    
    def refresh_wait(self):
        while True:
            try:
                time.sleep(self.sleep)
                self.get_volume()
            except:
                pass
    
    def function(self):
        return self.text