# -*- python -*-
'''
xpybar – xmobar replacement written in python
Copyright © 2014, 2015, 2016, 2017, 2018, 2019  Mattias Andrée (m@maandreese)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>.
'''

# A moderate xpybar configuration example that has a few monitors
# that are updated continuously, and rat support

import time
import threading

from util import *

from plugins.clock import Clock
from plugins.cpu import CPU
from plugins.mem import Memory
from plugins.network import Network
from plugins.ping import Ping
from plugins.alsa import ALSA
from plugins.moc import MOC

import Xlib.protocol.event


OUTPUT = 0
'''
:int  The index of the monitor that the panel is displayed on
'''

YPOS = 24
'''
:int  The panel's position relative to the upper or lower
      edge of the monitor (which is determined by `TOP`)
'''

TOP = True
'''
:bool  True:  `YPOS` is relative to the upper edge
       False: `YPOS` is relative to the lower edge
'''

TICKS_PER_SECOND = 4
'''
:float  The number of times per second the panel is updated
'''


time_format = '%Y-(%m)%b-%d %T, %a w%V, %Z'
'''
:str  The format the clock is displayed in
'''

my_clock = Clock(format = time_format, utc = False, sync_to = Clock.SECONDS / TICKS_PER_SECOND)
'''
:Clock  The clock monitor
'''

my_cpu = '...'
'''
:str  The output of the CPU monitor
'''

my_mem = '...'
'''
:str  The output of the RAM monitor
'''

my_swp = '...'
'''
:str  The output of the swap memory monitor
'''

my_shm = '...'
'''
:str  The output of the shared memory monitor
'''

my_net = 'Net: ...'
'''
:str  The output of the network monitor
'''

my_snd = '...'
'''
:str  The output of the audio monitor
'''

my_moc = '...'
'''
:str  The output of the music on console monitor
'''


limited = lambda v : min(max(int(v + 0.5), 0), 100)
'''
:(float)→int  Round a float to the nearest integer and limit the range to [0, 100]
'''

# From here on `len` measures strings with `colour_aware_len`;
# the builtin stays reachable as `len_`
len_ = len
len = lambda string : colour_aware_len(string, len_)


###############################################################################################################
###############################################################################################################


# CPU monitor

def cpu():
    '''
    Update CPU usage
    
    Takes a new `CPU` sample, compares it with the previous sample
    (`last_cpu_*` and `last_cpus_*`) and stores the colourised usage
    of every CPU-thread followed by the overall usage in `my_cpu`.
    If anything fails, `my_cpu` is set to '...' and the previous
    sample is kept.
    '''
    global my_cpu, last_cpus_idle, last_cpus_total, last_cpu_idle, last_cpu_total
    try:
        sample = CPU()
        now_cpu_idle = sample.cpu[CPU.idle]
        now_cpus_idle = [thread[CPU.idle] for thread in sample.cpus]
        now_cpu_total = sum(sample.cpu)
        now_cpus_total = [sum(thread) for thread in sample.cpus]
        # More CPU-threads than at the last measurement: pad the old
        # sample with the new values, so the new threads get a zero delta
        if len(now_cpus_idle) > len(last_cpus_idle):
            last_cpus_idle += now_cpus_idle[len(last_cpus_idle):]
            last_cpus_total += now_cpus_total[len(last_cpus_total):]
        per_thread = zip(now_cpus_idle, now_cpus_total, last_cpus_idle, last_cpus_total)
        per_thread = ' '.join(cpu_colourise(cpu_usage(*c)) for c in per_thread)
        overall = cpu_colourise(cpu_usage(now_cpu_idle, now_cpu_total, last_cpu_idle, last_cpu_total))
        text = '%s : %s' % (per_thread, overall)
        last_cpus_idle, last_cpus_total = now_cpus_idle, now_cpus_total
        last_cpu_idle, last_cpu_total = now_cpu_idle, now_cpu_total
        my_cpu = text
    except Exception:
        # Best effort: a failed reading must never take down the panel
        my_cpu = '...'


def cpu_usage(now_idle, now_total, last_idle, last_total):
    '''
    Calculate the CPU usage
    
    @param   now_idle:int    Time spent in the idle task, at the current measurement
    @param   now_total:int   Total time that has passed, at the current measurement
    @param   last_idle:int   Time spent in the idle task, at the last measurement
    @param   last_total:int  Total time that has passed, at the last measurement
    @return  :float?         The CPU usage in percent, `None` if no time has passed
    '''
    total = now_total - last_total
    idle = now_idle - last_idle
    if total == 0:
        return None
    return (total - idle) * 100 / total


def cpu_colourise_(value):
    '''
    Colourise a CPU usage value
    
    @param   value:int?  The CPU usage, `None` if unknown
    @return  :str        `value` coloured with an appropriate colour
    '''
    if value is None:
        return '--%'
    if value >= 100:
        # No percent sign, so that the text stays 3 characters wide
        return '\033[31m100\033[0m'
    colour = '39'
    if value >= 5:
        colour = '32'
    if value >= 50:
        colour = '33'
    if value >= 90:
        colour = '31'
    return '\033[%sm%2i\033[0m%%' % (colour, value)


cpu_none = cpu_colourise_(None)
'''
:str  Cache for the colourisation of the value `None`
'''

cpu_coloured = tuple(cpu_colourise_(i) for i in range(101))
'''
:tuple<str>  Cache of colourised CPU usage values
'''

cpu_colourise = lambda v : cpu_none if v is None else cpu_coloured[limited(v)]
'''
:(value:float?)→str  Cached version of `cpu_colourise_`
'''

last_cpu_idle = 0
'''
:int  Time spent in the idle task, at the last measurement, on all CPU-threads
'''

last_cpu_total = 0
'''
:int  Total time that has passed, at the last measurement, on all CPU-threads
'''

last_cpus_idle = []
'''
:list<int>  Time spent in the idle task, at the last measurement, for each CPU-thread
'''

last_cpus_total = []
'''
:list<int>  Total time that has passed, at the last measurement, for each CPU-thread
'''


###############################################################################################################


# Memory usage monitor

def mem():
    '''
    Update memory usage
    
    Stores the colourised usage of RAM, swap and shared memory in
    `my_mem`, `my_swp` and `my_shm`. If anything fails, all three
    are set to '...'.
    '''
    global my_mem, my_swp, my_shm
    try:
        memory = Memory()
        if memory.mem_total == 0:
            my_mem = '---'
            my_shm = '---'
        else:
            my_mem = memory_coloured[limited(memory.mem_used * 100 / memory.mem_total)]
            my_shm = memory_coloured[limited(memory.shmem * 100 / memory.mem_total)]
        if memory.swap_total == 0:
            my_swp = 'off'
        else:
            my_swp = memory_coloured[limited(memory.swap_used * 100 / memory.swap_total)]
    except Exception:
        # Best effort: a failed reading must never take down the panel
        my_mem = '...'
        my_swp = '...'
        my_shm = '...'
def memory_colourise(value):
    '''
    Colourise a memory usage value
    
    @param   value:int  The memory usage, in percent
    @return  :str       `value` coloured with an appropriate colour
    '''
    if value >= 100:
        # No percent sign, so that the text stays 3 characters wide
        return '\033[31m100\033[0m'
    colour = '39'
    if value > 30:
        colour = '32'
    if value > 50:
        colour = '33'
    if value > 80:
        colour = '31'
    return '\033[%sm%i\033[0m%%' % (colour, value)


memory_coloured = tuple(memory_colourise(i) for i in range(101))
'''
:tuple<str>  Cache of colourised memory usage values
'''


###############################################################################################################


# Network monitor

def net():
    '''
    Update network usage and latency
    
    Stores one entry per network device (all except `lo`) in `my_net`,
    with the receive and transmit rates since the last successful
    reading and the latency reported by `ping`. If anything fails,
    `my_net` is set to 'Net: ...' and the previous reading is kept.
    '''
    global my_net, net_time, net_last
    try:
        net_now = time.monotonic()
        net_tdiff = net_now - net_time
        devs = Network('lo').devices
        
        def transferred(device, direction):
            '''
            Get the number of bytes transmitted or received since the last measurement
            
            @param   device:str     The network device
            @param   direction:str  'rx' for received data, 'tx' for transmitted data
            @return  :int           The number of bytes, 0 if the device was not
                                    present at the last measurement
            '''
            field = direction + '_bytes'
            value = devs[device][field]
            if device in net_last:
                return value - net_last[device][field]
            return 0
        
        def kbps(device, direction):
            '''
            Get the number of kilobits transmitted or received per second since the last measurement
            
            @param   device:str     The network device
            @param   direction:str  'rx' for received data, 'tx' for transmitted data
            @return  :str           The number of kilobits transmitted or received per second, colourised
            '''
            # 1 kilobit = 1024 bits = 128 bytes
            return network_colourise(transferred(device, direction) / (128 * net_tdiff))
        
        def KBps(device, direction):
            '''
            Get the number of kilobytes transmitted or received per second since the last measurement
            
            @param   device:str     The network device
            @param   direction:str  'rx' for received data, 'tx' for transmitted data
            @return  :float         The number of kilobytes transmitted or received per second
            '''
            return transferred(device, direction) / (1024 * net_tdiff)
        
        text = ' '.join('%s: %skbps(%.0fKB/s)↓ %skbps(%.0fKB/s)↑ %s'
                        % (dev, kbps(dev, 'rx'), KBps(dev, 'rx'),
                           kbps(dev, 'tx'), KBps(dev, 'tx'), ping(dev))
                        for dev in devs)
        # Commit the timestamp together with the counters, so that a failed
        # reading does not shorten the interval used for the next rate
        my_net, net_time, net_last = text, net_now, devs
    except Exception:
        # Best effort: a failed reading must never take down the panel
        my_net = 'Net: ...'
def network_colourise(value):
    '''
    Colourise a network usage value
    
    @param   value:float  The network usage, in kilobits per second
    @return  :str         `value` coloured with an appropriate colour
    '''
    colour = '39'
    if value > 40:
        colour = '32'
    if value > 8000:
        colour = '33'
    if value > 60000:
        colour = '31'
    return '\033[%sm%3.0f\033[0m' % (colour, value)


def ping(device):
    '''
    Get the latency for a network device
    
    @param   device:str  The device
    @return  :str        The latency, colourised, '...' if it cannot be retrieved
    '''
    try:
        monitor = my_ping[device][0]
        monitor.semaphore.acquire()
        try:
            latency = monitor.get_latency(True)[1]
            droptime = monitor.dropped_time(True)
            if droptime:
                return '\033[31m%4is\033[00m' % droptime
            if latency is None:
                return '\033[31mdrop?\033[00m'
            if latency < 5:
                colour = '32'
            elif latency < 10:
                colour = '00'
            elif latency < 20:
                colour = '33'
            else:
                colour = '31'
            return '\033[%sm%5.2f\033[00m' % (colour, latency)
        finally:
            monitor.semaphore.release()
    except Exception:
        # Unknown device or failed reading
        return '...'


my_ping = Ping(targets = Ping.get_nics(), interval = 10).monitors
'''
Latency monitors, indexed by network device name
'''

net_time = time.monotonic()
'''
:float  The time of the last reading
'''

net_last = {}
'''
:dict  Per-device readings (as provided by `Network(...).devices`) from the last update
'''


###############################################################################################################


# Audio monitor

def snd(add_missing_mixers = True):
    '''
    Update audio volume
    
    Stores the text for all available mixers in `my_snd`, and the
    locations of each mixer and each of its channels in `stops_alsa`.
    
    @param  add_missing_mixers:bool  Whether missing mixers should be added
    '''
    global my_snd, stops_alsa
    stops_ = []
    if add_missing_mixers:
        try:
            # Retry every mixer that could not be opened earlier
            for i, mixer in enumerate(my_alsa):
                if mixer is None:
                    my_alsa[i] = alsa(*(mixers[i]))
        except Exception:
            pass
    try:
        offset = 0
        my_snd_ = []
        for mixer in my_alsa:
            if mixer is None:
                stops_.append((-1, -1))
                continue
            text = snd_read_m(mixer)
            my_snd_.append(text)
            text_len = len(text)
            # The first pair of stops covers the whole mixer
            mixer_stops = [0, text_len]
            name, _, volumes = text.partition(': ')
            mixer_name_len = len(name)
            # Each channel is 3 characters wide and followed by one space
            for i in range((len(volumes) + 1) // 4):
                mixer_stops.append(mixer_name_len + 2 + 4 * i + 0)
                mixer_stops.append(mixer_name_len + 2 + 4 * i + 3)
            stops_.append(tuple(offset + stop for stop in mixer_stops))
            # 3 is the width of the separator between mixers
            offset += text_len + 3
        my_snd = ' │ '.join(my_snd_)
        stops_alsa = stops_
    except Exception:
        # Best effort: a failed reading must never take down the panel
        my_snd = '...'
        stops_alsa = [(-1, -1)] * len(my_alsa)


def alsa(cardindex, mixername):
    '''
    Create a volume controller for a mixer
    
    @param   cardindex:int  The index of the audio card
    @param   mixername:str  The name of the mixer
    @return  :ALSA?         Volume controller, `None` if the mixer is not available
    '''
    try:
        return ALSA(cardindex, mixername)
    except Exception:
        return None


snd_text_v = lambda v : '--%' if v is None else ('%2i%%' % v)[:3]
'''
:(v:int?)→str  Convert a volume integer to a volume str, `None` as input means it is muted
'''

snd_read_m = lambda m : '%s: %s' % (m.mixername, ' '.join(snd_text_v(v) for v in m.get_volume()))
'''
:(m:ALSA)→str  Create a string representing the current volumes on a mixer
'''

mixers = ((0, 'Master'), (0, 'Headphone'), (0, 'Speaker'), (0, 'PCM'))
'''
:tuple<(cardindex:int, mixername:str)>  List of mixers that should be monitored
'''

my_alsa = [alsa(card, mixer) for card, mixer in mixers]
'''
:list<ALSA?>  ALSA volume controllers, `None` for mixers that are not available
'''

stops_alsa = [(-1, -1)] * len(my_alsa)
'''
:itr<itr<int>>  Map from mixer index to location of substring in the monitor display.
                The first element is where the mixer starts, and the second element
                is where the mixer stops. Each mixer may have 2 additional elements
                for each channel, in that case they are starts and stops, alternating.
'''


###############################################################################################################


# Music on console monitor

my_moc = '> || [] |< >|'

moc_controller = MOC()
'''
:MOC  The MOC controller
'''


###############################################################################################################
###############################################################################################################


functions = [
    my_clock.read,
    lambda : my_cpu,
    lambda : my_mem,
    lambda : my_swp,
    lambda : my_shm,
    lambda : my_net,
    lambda : my_snd,
    lambda : my_moc,
]
'''
:itr<()→str>  Functions that are evaluated and replace the %s:s in
              `pattern` every time the panel is redrawn
'''

pattern = [
    [ '%n%s%n │ %nCpu: %s%n │ %nMem: %s%n │ %nSwp: %s%n │ %nShm: %s%n'
    , '%n%s%n │ %n%s%n │ %nMoc: %s%n'
    ]
]
'''
:itr<itr<str>>  The layout of the monitors on the panel
'''

async_fun = [
    Sometimes(cpu, TICKS_PER_SECOND),
    Sometimes(mem, TICKS_PER_SECOND),
    Sometimes(net, TICKS_PER_SECOND),
    Sometimes(snd, TICKS_PER_SECOND * 3),
]
'''
:itr<()→void>  Monitors that are updated asynchronously
'''

semaphore = threading.Semaphore()
'''
:Semaphore  Semaphore used to make sure functions do not step on each others' toes
'''


HEIGHT_PER_LINE = 12
'''
:int  The height of each line
'''

HEIGHT = len(pattern) * HEIGHT_PER_LINE
'''
:int  The height of the panel
'''


stops = [-1] * (2 * len(functions))
'''
:itr<int>  Locations of stops marked by `%n` in `pattern`
'''


# One line of text per row, NUL between the parts of a row
pattern = '\n'.join('\0'.join(p) for p in pattern)


start_ = start
def start():
    '''
    Invoked when it is time to create panels and map them
    '''
    # Create panel, clear it, and synchronise
    start_()
    bar.clear()
    get_display().flush()
    
    def update_sys():
        '''
        Update data asynchronously, but do not redraw the panel
        '''
        for f in async_fun:
            f()
    
    def update_clock():
        '''
        Update the clock, and redraw the panel
        '''
        if semaphore.acquire(blocking = False):
            try:
                for f in functions:
                    if isinstance(f, Clocked):
                        f(True)
            finally:
                semaphore.release()
            # NOTE(review): the nesting of this call could not be recovered
            # from the collapsed source; it is assumed to belong to the `if`
            bar.invalidate()
    
    # Start monitoring
    xasync(lambda : watch(1 / TICKS_PER_SECOND, update_sys), name = 'sys')
    xasync(lambda : my_clock.continuous_sync(update_clock), name = 'clock')


def redraw():
    '''
    Invoked when redraw is needed
    
    The redraw is skipped if `semaphore` is currently held
    '''
    global stops
    if semaphore.acquire(blocking = False):
        try:
            (values, stops) = sprintf(pattern, *(f() for f in functions))
            bar.partial_clear(0, bar.width, 10, 0, 2, values)
            bar.draw_coloured_splitted_text(0, bar.width, 10, 0, 2, values)
        finally:
            semaphore.release()


LEFT_BUTTON = 1
'''
:int  The index of the left button
'''

MIDDLE_BUTTON = 2
'''
:int  The index of the middle button
'''

RIGHT_BUTTON = 3
'''
:int  The index of the right button
'''

SCROLL_UP = 4
'''
:int  The index of the pseudo-button for scrolling upwards
'''

SCROLL_DOWN = 5
'''
:int  The index of the pseudo-button for scrolling downwards
'''

SCROLL_LEFT = 6
'''
:int  The index of the pseudo-button for scrolling left
'''

SCROLL_RIGHT = 7
'''
:int  The index of the pseudo-button for scrolling right
'''

FORWARD_BUTTON = 8 # X1
'''
:int  The index of the forward button, also known as X1
'''

BACKWARD_BUTTON = 9 # X2
'''
:int  The index of the backward button, also known as X2
'''


def unhandled_event(e):
    '''
    Invoked when an unrecognised event is polled,
    feel free to replace this completely
    
    @param  e  The event
    '''
    if isinstance(e, Xlib.protocol.event.ButtonPress):
        y = e.event_y // HEIGHT_PER_LINE
        lx = e.event_x // bar.font_width
        rx = (bar.width - e.event_x) // bar.font_width
        button = e.detail
        button_pressed(y, lx, rx, button)


def button_pressed(y, lx, rx, button):
    '''
    Called from `unhandled_event` when a button on a pointer device is pressed
    
    @param  y:int       The line that the pointer is on, zero based
    @param  lx:int      The column the pointer is on, relative to the left edge, zero based
    @param  rx:int      The column the pointer is on, relative to the right edge, zero based
    @param  button:int  The button on the device that is pressed
    '''
    # Stops at the left side of line 0
    stops_l0 = stops[0 : 10]
    # Stops at the right side of line 0, measured from the right edge
    stops_r0 = [stops[15] - x for x in stops[10 : 16]]
    
    try:
        if y == 0:
            if stops_l0[0] <= lx < stops_l0[1]: # clock
                if button == LEFT_BUTTON:
                    # Toggle between local time and UTC
                    Clock.__init__(my_clock, time_format, not my_clock.utc, my_clock.sync_to)
                    bar.invalidate()
            elif stops_l0[2] <= lx < stops_l0[3]: # cpu
                pass
            elif stops_l0[4] <= lx < stops_l0[5]: # mem
                pass
            elif stops_l0[6] <= lx < stops_l0[7]: # swp
                pass
            elif stops_l0[8] <= lx < stops_l0[9]: # shm
                pass
            elif stops_r0[0] > rx >= stops_r0[1]: # net
                pass
            elif stops_r0[2] > rx >= stops_r0[3]: # snd
                button_pressed_mixer(stops_r0[2] - rx - 1, button)
            elif stops_r0[4] > rx >= stops_r0[5]: # moc
                # 5 is the width of the 'Moc: ' prefix
                mx = stops_r0[4] - rx - 1 - 5
                if button == LEFT_BUTTON:
                    if 0 <= mx < 1:
                        xasync(lambda : moc_controller.play().wait()) # >
                    elif 2 <= mx < 4:
                        xasync(lambda : moc_controller.toggle_pause().wait()) # ||
                    elif 5 <= mx < 7:
                        xasync(lambda : moc_controller.stop().wait()) # []
                    elif 8 <= mx < 10:
                        xasync(lambda : moc_controller.previous().wait()) # |<
                    elif 11 <= mx < 13:
                        xasync(lambda : moc_controller.next().wait()) # >|
                elif button == FORWARD_BUTTON:
                    xasync(lambda : moc_controller.next().wait())
                elif button == BACKWARD_BUTTON:
                    xasync(lambda : moc_controller.previous().wait())
                elif button == SCROLL_UP:
                    xasync(lambda : moc_controller.seek(+5).wait())
                elif button == SCROLL_DOWN:
                    xasync(lambda : moc_controller.seek(-5).wait())
    except Exception:
        # Best effort: a failed action must never take down the panel
        pass


def button_pressed_mixer(mx, button):
    '''
    Called from `button_pressed` when the user has touched the audio mixer monitor
    
    @param  mx:int      The column the pointer is at, relative to the left edge of the mixer display
    @param  button:int  The button on the device that is pressed
    '''
    for mixer_index, mixer_stops in enumerate(stops_alsa):
        if not (mixer_stops[0] <= mx < mixer_stops[1]):
            continue
        # Get channel
        channel = ALSA.ALL_CHANNELS
        if not button == RIGHT_BUTTON: # not (balance channels)
            channel_stops = mixer_stops[2:]
            for channel_index in range(len(channel_stops) // 2):
                if channel_stops[channel_index * 2 + 0] <= mx < channel_stops[channel_index * 2 + 1]:
                    channel = channel_index
                    break
        # Get mixer
        mixer = my_alsa[mixer_index]
        # Get volumes and all selected channels
        volumes = mixer.get_volume()
        channels = list(range(len(volumes))) if channel == ALSA.ALL_CHANNELS else [channel]
        # Filter volumes to selected channels
        volumes = [volume for c, volume in enumerate(volumes) if c in channels]
        # Control the volume
        if button == LEFT_BUTTON: # toggle mute
            mute = not any(volume is None for volume in volumes)
            for c in channels:
                mixer.set_mute(mute, c)
        elif button == RIGHT_BUTTON: # balance channels
            mixer.set_volume(sum(volumes) // len(volumes), ALSA.ALL_CHANNELS)
        elif button == SCROLL_UP: # turn up the volume
            for c, v in zip(channels, volumes):
                mixer.set_volume(limited(v + 5), c)
        elif button == SCROLL_DOWN: # turn down the volume
            for c, v in zip(channels, volumes):
                mixer.set_volume(limited(v - 5), c)
        # Update the panel
        snd(False)
        bar.invalidate()
        break