From d406ae4c144d550f4468cf1e3f0fcf21d81ee2cd Mon Sep 17 00:00:00 2001 From: Mattias Andrée Date: Sun, 29 Mar 2015 03:22:42 +0200 Subject: new examples/moderate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Mattias Andrée --- examples/moderate | 901 +++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 689 insertions(+), 212 deletions(-) (limited to 'examples/moderate') diff --git a/examples/moderate b/examples/moderate index 905fe01..00f0f6b 100644 --- a/examples/moderate +++ b/examples/moderate @@ -1,241 +1,718 @@ # -*- python -*- -# A moderate xpybar configuration example that has a few monitors that are updates continuously +# A moderate xpybar configuration example that has a few monitors +# that are updates continuously, and rat support -import os import time +import threading -from plugins.clock import Clock -from plugins.cpu import CPU -from plugins.mem import Memory +from util import * + +from plugins.clock import Clock +from plugins.cpu import CPU +from plugins.mem import Memory from plugins.network import Network -from plugins.users import Users -from plugins.pacman import Pacman -from plugins.uname import Uname -from plugins.weather import Weather -from plugins.chase import Chase - - -OUTPUT, YPOS, TOP = 0, 24, True - -clock_ = Clock(format = '%Y-(%m)%b-%d %T, %a w%V, %Z', sync_to = 0.5) - - -def memory(): - memory_ = Memory() - def colourise(value, format = '%.0f'): - colour = '39' - if value > 30: colour = '32' - if value > 50: colour = '33' - if value > 80: colour = '31' - return '\033[%sm%s\033[0m%%' % (colour, format % value) - mem = 'Mem: ' + colourise(memory_.mem_used * 100 / memory_.mem_total) - swp = 'Swp: ' + colourise(memory_.swap_used * 100 / memory_.swap_total) - shm = 'Shm: ' + colourise(memory_.shmem * 100 / memory_.mem_total) - return '%s │ %s │ %s' % (mem, swp, shm) - - -def users(): - users_ = Users().users - you = os.environ['USER'] - def colour_user(user): - if user == 'root': return '\033[31m%s\033[39m' - elif not user == you: return '\033[33m%s\033[39m' - else: return '%s' - users = ['%s{%i}' % (colour_user(u) % u, len(users_[u])) for u in users_.keys()] - users = 'Users: %s' % (' '.join(users)) - return users - - -have_linux_libre, have_pacman = True, None -linux_installed, linux_latest = None, None -def uname(): - global have_linux_libre, have_pacman, linux_installed, linux_latest - - uname_ = Uname() - nodename = uname_.nodename - kernel_release = uname_.kernel_release - operating_system = uname_.operating_system +from plugins.ping import Ping +from plugins.alsa import ALSA +from plugins.moc import MOC + +import Xlib.protocol.event + + + +OUTPUT = 0 +''' +:int The index of the monitor that the panel is displayed on +''' + +YPOS = 24 +''' +:int The panels position relative to the upper or lower edge + of the monitor (which is determined by `TOP`) +''' + +TOP = True +''' +:bool True: `YPOS` is relative to the upper edge + False: `YPOS` is relative to the lower edge +''' + + + +TICKS_PER_SECOND = 4 +''' +:float The number of times per second the panel is updated +''' + + +time_format = '%Y-(%m)%b-%d %T, %a w%V, %Z' +''' +:str The format the clock is displayed in +''' + +my_clock = Clock(format = time_format, utc = False, + sync_to = Clock.SECONDS / TICKS_PER_SECOND) +''' +:Clock The clock monitor +''' + +my_cpu = '...' +''' +:str The output of the CPU monitor +''' + +my_mem = '...' +''' +:str The output of the RAM monitor +''' + +my_swp = '...' +''' +:str The output of the swap memory monitor +''' + +my_shm = '...' +''' +:str The output of the shared memory monitor +''' + +my_net = 'Net: ...' +''' +:str The output of the network monitor +''' + +my_snd = '...' +''' +:str The output of the audio monitor +''' + +my_moc = '...' +''' +:str The output of the music on console monitor +''' + + + +limited = lambda v : min(max(int(v + 0.5), 0), 100) +''' +:(float)→int Round an float to nearest integer and limit the range to [0, 100] +''' + + +len_ = len +len = lambda string : colour_aware_len(string, len_) + + + +############################################################################################################### + + +############################################################################################################### +# CPU monitor + + +def cpu(): + ''' + Update CPU usage + ''' + global my_cpu, last_cpus_idle, last_cpus_total, last_cpu_idle, last_cpu_total + try: + cpu_ = CPU() + now_cpu_idle, now_cpus_idle = cpu_.cpu[CPU.idle], [cpu[CPU.idle] for cpu in cpu_.cpus] + now_cpu_total, now_cpus_total = sum(cpu_.cpu), [sum(cpu) for cpu in cpu_.cpus] + if len(now_cpus_idle) > len(last_cpus_idle): + last_cpus_idle += now_cpus_idle[len(last_cpus_idle):] + last_cpus_total += now_cpus_total[len(last_cpus_total):] + cpus = zip(now_cpus_idle, now_cpus_total, last_cpus_idle, last_cpus_total) + cpus = ' '.join([cpu_colourise(cpu_usage(*c)) for c in cpus]) + cpu_ = cpu_colourise(cpu_usage(now_cpu_idle, now_cpu_total, last_cpu_idle, last_cpu_total)) + cpu_ = '%s : %s' % (cpus, cpu_) + last_cpus_idle, last_cpus_total = now_cpus_idle, now_cpus_total + last_cpu_idle, last_cpu_total = now_cpu_idle, now_cpu_total + my_cpu = cpu_ + except: + my_cpu = '...' + +def cpu_usage(now_idle, now_total, last_idle, last_total): + ''' + Calculate the CPU usage - lts = '-lts' if kernel_release.lower().endswith('-lts') else '' - if have_pacman is None: - have_pacman = True - try: - linux_installed = Pacman('linux-libre' + lts, True) - except: - have_linux_libre = False - try: - linux_installed = Pacman('linux' + lts, True) - except: - have_pacman = False - if have_pacman: - try: - linux_latest = Pacman(('linux-libre' if have_linux_libre else 'linux') + lts, False) - except: - have_pacman = None - elif have_pacman: - try: - linux_installed = Pacman(('linux-libre' if have_linux_libre else 'linux') + lts, True) - linux_latest = Pacman(('linux-libre' if have_linux_libre else 'linux') + lts, False) - except: - have_pacman = None + @param now_idle:int Time spent in the idle task, at the current measurement + @param now_total:int Total time that has passed, at the current measurement + @param last_idle:int Time spent in the idle task, at the last measurement + @param last_total:int Total time that has passed, at the last measurement + @return :float? The CPU usage, `None` if not time has passed + ''' + total = now_total - last_total + idle = now_idle - last_idle + return None if total == 0 else (total - idle) * 100 / total + +def cpu_colourise_(value): + ''' + Colourise a CPU usage value - if (have_pacman is not None) and have_pacman: - linux_running = kernel_release.split('-') - linux_running, kernel_release = linux_running[:2], linux_running[2:] - linux_running = '-'.join(linux_running) - kernel_release = '-' + '-'.join(kernel_release) - linux_installed = linux_installed.version - linux_latest = linux_latest.version - if linux_installed == linux_latest: - if linux_running == linux_installed: - linux_running = '\033[32m%s\033[39m' % linux_running + @param value:int The CPU usage + @return :str `value` coloured with an appropriate colour + ''' + if value is None: + return '--%' + elif value >= 100: + return '\033[31m100\033[0m' + colour = '39' + if value >= 5: colour = '32' + if value >= 50: colour = '33' + if value >= 90: colour = '31' + return '\033[%sm%2i\033[0m%%' % (colour, value) + +cpu_none = cpu_colourise_(None) +''' +:str Cache for the coloursation of the value `None` +''' + +cpu_coloured = tuple(cpu_colourise_(i) for i in range(101)) +''' +:tuple Cache of colourised CPU usage values +''' + +cpu_colourise = lambda v : cpu_none if v is None else cpu_coloured[limited(v)] +''' +:(value:int)→str Cached version of `cpu_colourise_` +''' + +last_cpu_idle = 0 +''' +:int Time spent in the idle task, at the last measurement, on all CPU-threads +''' + +last_cpu_total = 0 +''' +:int Total time that has passed, at the last measurement, on all CPU-threads +''' + +last_cpus_idle = [] +''' +:int Time spent in the idle task, at the last measurement, for each CPU-thread +''' + +last_cpus_total = [] +''' +:int Total time that has passed, at the last measurement, for each CPU-thread +''' + + + +############################################################################################################### +# Memory usage monitor + + +def mem(): + ''' + Update memory usage + ''' + global my_mem, my_swp, my_shm + try: + memory = Memory() + if memory.mem_total == 0: + my_mem = '---' + my_shm = '---' else: - if linux_running == linux_installed: - linux_running = '\033[33m%s\033[39m' % linux_running + my_mem = memory_coloured[limited(memory.mem_used * 100 / memory.mem_total)] + my_shm = memory_coloured[limited(memory.shmem * 100 / memory.mem_total)] + if memory.swap_total == 0: + my_swp = 'off' + else: + my_swp = memory_coloured[limited(memory.swap_used * 100 / memory.swap_total)] + except: + my_mem = '...' + my_swp = '...' + my_shm = '...' + +def memory_colourise(value): + ''' + Colourise a memory usage value + + @param value:int The memory usage + @return :str `value` coloured with an appropriate colour + ''' + if value >= 100: + return '\033[31m100\033[0m' + colour = '39' + if value > 30: colour = '32' + if value > 50: colour = '33' + if value > 80: colour = '31' + return '\033[%sm%i\033[0m%%' % (colour, value) + +memory_coloured = tuple(memory_colourise(i) for i in range(101)) +''' +:tuple Cache of colourised memory usage values +''' + + + +############################################################################################################### +# Network monitor + + +def net(): + ''' + Update network usage and latency + ''' + global my_net, net_time, net_last + try: + net_now = time.monotonic() + net_tdiff, net_time = net_now - net_time, net_now + devs = Network('lo').devices + def kbps(device, direction): + ''' + Get the number of kilobits transmitted or received per second since the last measurement + + @param device:str The network device + @param direction:str 'rx' for received data, 'tx' for transmitted data + @return :str The number of kilobits transmitted or received per second, colourised + ''' + direction += '_bytes' + value = devs[device][direction] + if device in net_last: + value -= net_last[device][direction] else: - linux_running = '\033[31m%s\033[39m' % linux_running - kernel_release = linux_running + kernel_release - uname_ = '%s %s %s' - uname_ %= (nodename, kernel_release, operating_system) - return uname_ + value = 0 + value /= 128 * net_tdiff + return network_colourise(value) + def KBps(device, direction): + ''' + Get the number of kilobytes transmitted or received per second since the last measurement + + @param device:str The network device + @param direction:str 'rx' for received data, 'tx' for transmitted data + @return :float The number of kilobytes transmitted or received per second + ''' + direction += '_bytes' + value = devs[device][direction] + if device in net_last: + value -= net_last[device][direction] + else: + value = 0 + value /= 1024 * net_tdiff + return value + my_net = ' '.join('%s: %skbps(%.0fKB/s)↓ %skbps(%.0fKB/s)↑ %s' % + (dev, kbps(dev, 'rx'), KBps(dev, 'rx'), kbps(dev, 'tx'), KBps(dev, 'tx'), ping(dev)) + for dev in devs) + net_last = devs + except: + my_net = 'Net: ...' + +def network_colourise(value): + ''' + Colourise a network usage value + + @param value:int The network usage, in kilobits per second + @return :str `value` coloured with an appropriate colour + ''' + colour = '39' + if value > 40: colour = '32' + if value > 8000: colour = '33' + if value > 60000: colour = '31' + return '\033[%sm%3.0f\033[0m' % (colour, value) +def ping(device): + ''' + Get the latency for a network device + + @param device:str The device + @return :str The latency, colourised + ''' + try: + monitor = my_ping[device][0] + monitor.semaphore.acquire() + try: + latency = monitor.get_latency(True)[1] + droptime = monitor.dropped_time(True) + if droptime: + return '\033[31m%4is\033[00m' % droptime + elif latency is None: + return '\033[31mdrop?\033[00m' + colour = '31' + if latency < 5: colour = '32' + elif latency < 10: colour = '00' + elif latency < 20: colour = '33' + return '\033[%sm%5.2f\033[00m' % (colour, latency) + finally: + monitor.semaphore.release() + except: + return '...' + +my_ping = Ping(targets = Ping.get_nics(), interval = 10).monitors +''' +:Ping Latency monitor +''' net_time = time.monotonic() +''' +:float The time of the last reading +''' + net_last = {} -def network(): - global net_time, net_last - net_now = time.monotonic() - net_tdiff, net_time = net_now - net_time, net_now - net_ = Network('lo').devices - def colourise(value): - colour = '39' - if value > 40: colour = '32' - if value > 8000: colour = '33' - if value > 60000: colour = '31' - return '\033[%sm%3.0f\033[0m' % (colour, value) - def kbps(device, direction): - direction += '_bytes' - value = net_[device][direction] - if device in net_last: - value -= net_last[device][direction] - else: - value = 0 - value /= 128 * net_tdiff - return colourise(value) - def KBps(device, direction): - direction += '_bytes' - value = net_[device][direction] - if device in net_last: - value -= net_last[device][direction] - else: - value = 0 - value /= 1024 * net_tdiff - return value - net = [(dev, kbps(dev, 'rx'), KBps(dev, 'rx'), kbps(dev, 'tx'), KBps(dev, 'tx')) for dev in net_] - net = ['%s: %skbps(%.0fKB/s)↓ %skbps(%.0fKB/s)↑' % x for x in net] - net = '%s' % ' '.join(net) - net_last = net_ - return net - - -last_cpu_idle, last_cpu_total = 0, 0 -last_cpus_idle, last_cpus_total = [], [] -def cpu(): - global last_cpus_idle, last_cpus_total, last_cpu_idle, last_cpu_total - cpu_ = CPU() - now_cpu_idle, now_cpus_idle = cpu_.cpu[CPU.idle], [cpu[CPU.idle] for cpu in cpu_.cpus] - now_cpu_total, now_cpus_total = sum(cpu_.cpu), [sum(cpu) for cpu in cpu_.cpus] - def cpu_usage(now_idle, now_total, last_idle, last_total): - total = now_total - last_total - idle = now_idle - last_idle - return None if total == 0 else (total - idle) * 100 / total - def colourise(value): - if value is None: - return '--%' - colour = '39' - if value >= 5: colour = '32' - if value >= 50: colour = '33' - if value >= 90: colour = '31' - return '\033[%sm%2.0f\033[0m%%' % (colour, value) - if len(now_cpus_idle) > len(last_cpus_idle): - last_cpus_idle += now_cpus_idle[len(last_cpus_idle):] - last_cpus_total += now_cpus_total[len(last_cpus_total):] - cpus = zip(now_cpus_idle, now_cpus_total, last_cpus_idle, last_cpus_total) - cpus = ' '.join([colourise(cpu_usage(*c)) for c in cpus]) - cpu = colourise(cpu_usage(now_cpu_idle, now_cpu_total, last_cpu_idle, last_cpu_total)) - cpu = 'Cpu: %s : %s' % (cpus, cpu) - last_cpus_idle, last_cpus_total = now_cpus_idle, now_cpus_total - last_cpu_idle, last_cpu_total = now_cpu_idle, now_cpu_total - return cpu - - -weather_last = '?' -weather_semaphore = threading.Semaphore() -def weather_update_(): - global weather_last - if weather_semaphore.acquire(blocking = False): +''' +:dict> Readings from the update +''' + + + +############################################################################################################### +# Audio monitor + + +def snd(add_missing_mixers = True): + ''' + Update audio volume + + @param add_missing_mixers:bool Whether missing mixers should be added + ''' + global my_snd, stops_alsa + stops_ = [] + if add_missing_mixers: try: - weather_ = Weather('ESSA').temp - colour = '34' - if weather_ < -10: colour = '39;44' - if weather_ >= 18: colour = '39' - if weather_ >= 25: colour = '33' - if weather_ >= 30: colour = '31' - weather_ = '\033[%sm%.0f\033[0m°C' % (colour, weather_) - weather_last = weather_ + for i, mixer in my_alsa: + if mixer is None: + my_alsa[i] = alsa(*(mixers[i])) except: - if not weather_last.endswith('?'): - weather_last += '?' - weather_semaphore.release() -weather_update = Sometimes(lambda : async(weather_update_), 20 * 60 * 2) -def weather(): - rc = weather_last - weather_update() - return rc - - -chase_ = Chase() -chase_update = Sometimes(lambda : async(chase_.update), 2 * 60 * 60 * 2) -def chase(): - status = chase_.status - status = '39' if status is None else ('32' if status else '31') - chase_update() - return '\033[%smChase\033[0m' % status - - - -functions = [ Sometimes(lambda : clock_.read(), 1 * 2), - lambda : time.time() % 1, - Sometimes(users, 1 * 2), - weather, - chase, - cpu, - memory, - network, - Sometimes(uname, 30 * 60 * 2), + pass + try: + offset = 0 + my_snd_ = [] + for mixer in my_alsa: + if mixer is not None: + text = snd_read_m(mixer) + my_snd_.append(text) + text_len = len(text) + mixer_stops = [0, text_len] + mixer_name_len, text = len(text.split(': ')[0]), ': '.join(text.split(': ')[1:]) + for i in range((len(text) + 1) // 4): + mixer_stops.append(mixer_name_len + 2 + 4 * i + 0) + mixer_stops.append(mixer_name_len + 2 + 4 * i + 3) + stops_.append(tuple(offset + stop for stop in mixer_stops)) + offset += text_len + 3 + else: + stops_.append((-1, -1)) + my_snd = ' │ '.join(my_snd_) + stops_alsa = stops_ + except: + my_snd = '...' + stops_alsa = [(-1, -1)] * len(my_alsa) + +def alsa(cardindex, mixername): + ''' + The a volume controller for a mixer + + @param cardindex:int The index of the audio card + @param mixername:str The name of the mixer + @return :Alsa? Volume controller, `None` if the mixers is not available + ''' + try: + return ALSA(cardindex, mixername) + except: + return None + +snd_text_v = lambda v : '--%' if v is None else ('%2i%%' % v)[:3] +''' +:(v:int?)→str Convert a volume integer to a volume str, `None` as input means it is muted +''' + +snd_read_m = lambda m : '%s: %s' % (m.mixername, ' '.join(snd_text_v(v) for v in m.get_volume())) +''' +:(m:ALSA)→str Create a string representing the current volumes on a mixer +''' + +mixers = ((0, 'Master'), (0, 'Headphone'), (0, 'Speaker'), (0, 'PCM')) +''' +:tuple<(cardindex:int, mixername:str)> List of mixers that should be monitored +''' + +my_alsa = [alsa(card, mixer) for card, mixer in mixers] +''' +:list ALSA volume controllers +''' + +stops_alsa = [(-1, -1)] * len(my_alsa) +''' +:itr> Map from mixer index to location of substring in the monitor display. + The first element is where the mixer starts, and the second element is + where the mixer stops. Each mixer may have 2 additional elements for each + channel, in that case that are starts and stops, alternating. +''' + + + +############################################################################################################### +# Music on console monitor + + +my_moc = '> || [] |< >|' + +moc_controller = MOC() +''' +:MOC The MOC controller +''' + + + +############################################################################################################### + + +############################################################################################################### + + + +functions = [ my_clock.read + , lambda : my_cpu + , lambda : my_mem + , lambda : my_swp + , lambda : my_shm + + , lambda : my_net + , lambda : my_snd + , lambda : my_moc ] +''' +:itr<()→str> Functions that are evulated and replaces the %s:s in + `pattern` every time to panel is redrawn +''' -pattern = [ '%s │ %.2f │ %s │ %s │ %s }{ %s │ %s │ %s │ %s' +pattern = [ [ '%n%s%n │ %nCpu: %s%n │ %nMem: %s%n │ %nSwp: %s%n │ %nShm: %s%n' + , '%n%s%n │ %n%s%n │ %nMoc: %s%n' + ] ] +''' +:itr> The layout of the monitors on the panel +''' + + +async_fun = [ Sometimes(cpu, TICKS_PER_SECOND) + , Sometimes(mem, TICKS_PER_SECOND) + , Sometimes(net, TICKS_PER_SECOND) + , Sometimes(snd, TICKS_PER_SECOND * 3) + ] +''' +:itr<()→void> Monitors that are be update asynchronously +''' + +semaphore = threading.Semaphore() +''' +:Semaphore Semaphore used to make sure functions do not step on each others' toes +''' + + +HEIGHT_PER_LINE = 12 +''' +:int The height of each line +''' + + +HEIGHT = len(pattern) * HEIGHT_PER_LINE +''' +:int The height of the panel +''' + + +stops = [-1] * (2 * len(functions)) +''' +:itr Locations of stops marked by `%n` in `pattern` +''' + + + +pattern = '\n'.join('\0'.join(p) for p in pattern) + start_ = start def start(): + ''' + Invoked when it is time to create panels and map them + ''' + # Create panel, clear it, and synchronise start_() - async(lambda : clock_.continuous_sync(lambda : bar.invalidate())) + bar.clear() + get_display().flush() + + def update_sys(): + ''' + Update data asynchronously, but do not redraw the panel + ''' + [f() for f in async_fun] + + def update_clock(): + ''' + Update the clock, and redraw the panel + ''' + if semaphore.acquire(blocking = False): + try: + for f in functions: + if isinstance(f, Clocked): + f(True) + finally: + semaphore.release() + bar.invalidate() + + # Start monitoring + async(lambda : watch(1 / TICKS_PER_SECOND, update_sys), name = 'sys') + async(lambda : my_clock.continuous_sync(update_clock), name = 'clock') + -HEIGHT = len(pattern) * 12 -pattern = '\n'.join([p.replace('}{', '\0') for p in pattern]) -semaphore = threading.Semaphore() def redraw(): + ''' + Invoked when redraw is needed + ''' + global stops if semaphore.acquire(blocking = False): - values = pattern % tuple([f() for f in functions]) - #print(values.replace('\0', ' ' * 8)) - bar.partial_clear(0, bar.width, 10, 0, 2, values) - bar.draw_coloured_splitted_text(0, bar.width, 10, 0, 2, values) - semaphore.release() - return True - return False + try: + (values, stops) = sprintf(pattern, *(f() for f in functions)) + bar.partial_clear(0, bar.width, 10, 0, 2, values) + bar.draw_coloured_splitted_text(0, bar.width, 10, 0, 2, values) + finally: + semaphore.release() + + + +LEFT_BUTTON = 1 +''' +:int The index of the left button +''' + +MIDDLE_BUTTON = 2 +''' +:int The index of the middle button +''' + +RIGHT_BUTTON = 3 +''' +:int The index of the right button +''' + +SCROLL_UP = 4 +''' +:int The index of the psuedo-button for scrolling upwards +''' + +SCROLL_DOWN = 5 +''' +:int The index of the psuedo-button for scrolling downwards +''' + +FORWARD_BUTTON = 8 # X1 +''' +:int The index of the forward button, also known as X1 +''' + +BACKWARD_BUTTON = 9 # X2 +''' +:int The index of the backward button, also known as X2 +''' + + +def unhandled_event(e): + ''' + Invoked when an unrecognised even is polled, + feel free to replace this completely + + @param e The event + ''' + if isinstance(e, Xlib.protocol.event.ButtonPress): + y = e.event_y // HEIGHT_PER_LINE + lx = e.event_x // bar.font_width + rx = (bar.width - e.event_x) // bar.font_width + button = e.detail + button_pressed(y, lx, rx, button) + + +def button_pressed(y, lx, rx, button): + ''' + Called from `unhandled_event` when a button on a pointer device is pressed + + @param y:int The line that the pointer is on, zero based + @param lx:int The column the pointer is on, relative to the left edge, zero based + @param rx:int The column the pointer is on, relative to the right edge, zero based + @param button:int The button on the device that is pressed + ''' + # Stops at the left side of line 0 + stops_l0 = stops[0 : 10] + + # Stops at the right side of line 0 + stops_r0 = [stops[15] - x for x in stops[10 : 16]] + + try: + if y == 0: + if stops_l0[0] <= lx < stops_l0[1]: # clock + if button == LEFT_BUTTON: + Clock.__init__(my_clock, time_format, not my_clock.utc, my_clock.sync_to) + bar.invalidate() + elif stops_l0[2] <= lx < stops_l0[3]: # cpu + pass + elif stops_l0[4] <= lx < stops_l0[5]: # mem + pass + elif stops_l0[6] <= lx < stops_l0[7]: # swp + pass + elif stops_l0[8] <= lx < stops_l0[9]: # shm + pass + elif stops_r0[0] > rx >= stops_r0[1]: # net + pass + elif stops_r0[2] > rx >= stops_r0[3]: # snd + button_pressed_mixer(stops_r0[2] - rx - 1, button) + elif stops_r0[4] > rx >= stops_r0[5]: # moc + mx = stops_r0[4] - rx - 1 - 5 + if button == LEFT_BUTTON: + if 0 <= mx < 1: async(lambda : moc_controller.play().wait()) # > + elif 2 <= mx < 4: async(lambda : moc_controller.toggle_pause().wait()) # || + elif 5 <= mx < 7: async(lambda : moc_controller.stop().wait()) # [] + elif 8 <= mx < 10: async(lambda : moc_controller.previous().wait()) # |< + elif 11 <= mx < 13: async(lambda : moc_controller.next().wait()) # >| + elif button == FORWARD_BUTTON: async(lambda : moc_controller.next().wait()) + elif button == BACKWARD_BUTTON: async(lambda : moc_controller.previous().wait()) + elif button == SCROLL_UP: async(lambda : moc_controller.seek(+5).wait()) + elif button == SCROLL_DOWN: async(lambda : moc_controller.seek(-5).wait()) + except: + pass + + +def button_pressed_mixer(mx, button): + ''' + Called from `button_pressed` when the user is touched the audio mixer monitor + + @param mx:int The column the pointer is at right-relative to the left edge of the mixer display + @param button:int The button on the device that is pressed + ''' + for mixer_index, mixer_stops in enumerate(stops_alsa): + if mixer_stops[0] <= mx < mixer_stops[1]: + # Get channel + channel = ALSA.ALL_CHANNELS + if not button == RIGHT_BUTTON: # not (balance channels) + mixer_stops = mixer_stops[2:] + for channel_index in range(len(mixer_stops) // 2): + if mixer_stops[channel_index * 2 + 0] <= mx < mixer_stops[channel_index * 2 + 1]: + channel = channel_index + break + # Get mixer + mixer = my_alsa[mixer_index] + # Get volumes and all selected channels + volumes = mixer.get_volume() + channels = list(range(len(volumes))) if channel == ALSA.ALL_CHANNELS else [channel] + # Filter volumes to selected channels + volumes = [volume for c, volume in enumerate(volumes) if c in channels] + + # Control the volume + if button == LEFT_BUTTON: # toggle mute + mute = not any(volume is None for volume in volumes) + [mixer.set_mute(mute, c) for c in channels] + elif button == RIGHT_BUTTON: # balance channels + mixer.set_volume(sum(volumes) // len(volumes), ALSA.ALL_CHANNELS) + elif button == SCROLL_UP: # turn up the volume + [mixer.set_volume(limited(v + 5), c) for c, v in zip(channels, volumes)] + elif button == SCROLL_DOWN: # turn down the volume + [mixer.set_volume(limited(v - 5), c) for c, v in zip(channels, volumes)] + + # Update the panel + snd(False) + bar.invalidate() + break -- cgit v1.2.3-70-g09d2