aboutsummaryrefslogtreecommitdiffstats
path: root/examples/moderate
diff options
context:
space:
mode:
authorMattias Andrée <maandree@operamail.com>2015-03-29 03:22:42 +0200
committerMattias Andrée <maandree@operamail.com>2015-03-29 03:22:42 +0200
commitd406ae4c144d550f4468cf1e3f0fcf21d81ee2cd (patch)
treec5a6dceb00c90f874dad2f4535afc2579376c505 /examples/moderate
parentupdate todo (diff)
downloadxpybar-d406ae4c144d550f4468cf1e3f0fcf21d81ee2cd.tar.gz
xpybar-d406ae4c144d550f4468cf1e3f0fcf21d81ee2cd.tar.bz2
xpybar-d406ae4c144d550f4468cf1e3f0fcf21d81ee2cd.tar.xz
new examples/moderate
Signed-off-by: Mattias Andrée <maandree@operamail.com>
Diffstat (limited to '')
-rw-r--r--examples/moderate901
1 files changed, 689 insertions, 212 deletions
diff --git a/examples/moderate b/examples/moderate
index 905fe01..00f0f6b 100644
--- a/examples/moderate
+++ b/examples/moderate
@@ -1,241 +1,718 @@
# -*- python -*-
-# A moderate xpybar configuration example that has a few monitors that are updates continuously
+# A moderate xpybar configuration example that has a few monitors
+# that are updates continuously, and rat support
-import os
import time
+import threading
-from plugins.clock import Clock
-from plugins.cpu import CPU
-from plugins.mem import Memory
+from util import *
+
+from plugins.clock import Clock
+from plugins.cpu import CPU
+from plugins.mem import Memory
from plugins.network import Network
-from plugins.users import Users
-from plugins.pacman import Pacman
-from plugins.uname import Uname
-from plugins.weather import Weather
-from plugins.chase import Chase
-
-
-OUTPUT, YPOS, TOP = 0, 24, True
-
-clock_ = Clock(format = '%Y-(%m)%b-%d %T, %a w%V, %Z', sync_to = 0.5)
-
-
-def memory():
- memory_ = Memory()
- def colourise(value, format = '%.0f'):
- colour = '39'
- if value > 30: colour = '32'
- if value > 50: colour = '33'
- if value > 80: colour = '31'
- return '\033[%sm%s\033[0m%%' % (colour, format % value)
- mem = 'Mem: ' + colourise(memory_.mem_used * 100 / memory_.mem_total)
- swp = 'Swp: ' + colourise(memory_.swap_used * 100 / memory_.swap_total)
- shm = 'Shm: ' + colourise(memory_.shmem * 100 / memory_.mem_total)
- return '%s │ %s │ %s' % (mem, swp, shm)
-
-
-def users():
- users_ = Users().users
- you = os.environ['USER']
- def colour_user(user):
- if user == 'root': return '\033[31m%s\033[39m'
- elif not user == you: return '\033[33m%s\033[39m'
- else: return '%s'
- users = ['%s{%i}' % (colour_user(u) % u, len(users_[u])) for u in users_.keys()]
- users = 'Users: %s' % (' '.join(users))
- return users
-
-
-have_linux_libre, have_pacman = True, None
-linux_installed, linux_latest = None, None
-def uname():
- global have_linux_libre, have_pacman, linux_installed, linux_latest
-
- uname_ = Uname()
- nodename = uname_.nodename
- kernel_release = uname_.kernel_release
- operating_system = uname_.operating_system
+from plugins.ping import Ping
+from plugins.alsa import ALSA
+from plugins.moc import MOC
+
+import Xlib.protocol.event
+
+
+
+OUTPUT = 0
+'''
+:int The index of the monitor that the panel is displayed on
+'''
+
+YPOS = 24
+'''
+:int The panels position relative to the upper or lower edge
+ of the monitor (which is determined by `TOP`)
+'''
+
+TOP = True
+'''
+:bool True: `YPOS` is relative to the upper edge
+ False: `YPOS` is relative to the lower edge
+'''
+
+
+
+TICKS_PER_SECOND = 4
+'''
+:float The number of times per second the panel is updated
+'''
+
+
+time_format = '%Y-(%m)%b-%d %T, %a w%V, %Z'
+'''
+:str The format the clock is displayed in
+'''
+
+my_clock = Clock(format = time_format, utc = False,
+ sync_to = Clock.SECONDS / TICKS_PER_SECOND)
+'''
+:Clock The clock monitor
+'''
+
+my_cpu = '...'
+'''
+:str The output of the CPU monitor
+'''
+
+my_mem = '...'
+'''
+:str The output of the RAM monitor
+'''
+
+my_swp = '...'
+'''
+:str The output of the swap memory monitor
+'''
+
+my_shm = '...'
+'''
+:str The output of the shared memory monitor
+'''
+
+my_net = 'Net: ...'
+'''
+:str The output of the network monitor
+'''
+
+my_snd = '...'
+'''
+:str The output of the audio monitor
+'''
+
+my_moc = '...'
+'''
+:str The output of the music on console monitor
+'''
+
+
+
+limited = lambda v : min(max(int(v + 0.5), 0), 100)
+'''
+:(float)→int Round an float to nearest integer and limit the range to [0, 100]
+'''
+
+
+len_ = len
+len = lambda string : colour_aware_len(string, len_)
+
+
+
+###############################################################################################################
+
+
+###############################################################################################################
+# CPU monitor
+
+
+def cpu():
+ '''
+ Update CPU usage
+ '''
+ global my_cpu, last_cpus_idle, last_cpus_total, last_cpu_idle, last_cpu_total
+ try:
+ cpu_ = CPU()
+ now_cpu_idle, now_cpus_idle = cpu_.cpu[CPU.idle], [cpu[CPU.idle] for cpu in cpu_.cpus]
+ now_cpu_total, now_cpus_total = sum(cpu_.cpu), [sum(cpu) for cpu in cpu_.cpus]
+ if len(now_cpus_idle) > len(last_cpus_idle):
+ last_cpus_idle += now_cpus_idle[len(last_cpus_idle):]
+ last_cpus_total += now_cpus_total[len(last_cpus_total):]
+ cpus = zip(now_cpus_idle, now_cpus_total, last_cpus_idle, last_cpus_total)
+ cpus = ' '.join([cpu_colourise(cpu_usage(*c)) for c in cpus])
+ cpu_ = cpu_colourise(cpu_usage(now_cpu_idle, now_cpu_total, last_cpu_idle, last_cpu_total))
+ cpu_ = '%s : %s' % (cpus, cpu_)
+ last_cpus_idle, last_cpus_total = now_cpus_idle, now_cpus_total
+ last_cpu_idle, last_cpu_total = now_cpu_idle, now_cpu_total
+ my_cpu = cpu_
+ except:
+ my_cpu = '...'
+
+def cpu_usage(now_idle, now_total, last_idle, last_total):
+ '''
+ Calculate the CPU usage
- lts = '-lts' if kernel_release.lower().endswith('-lts') else ''
- if have_pacman is None:
- have_pacman = True
- try:
- linux_installed = Pacman('linux-libre' + lts, True)
- except:
- have_linux_libre = False
- try:
- linux_installed = Pacman('linux' + lts, True)
- except:
- have_pacman = False
- if have_pacman:
- try:
- linux_latest = Pacman(('linux-libre' if have_linux_libre else 'linux') + lts, False)
- except:
- have_pacman = None
- elif have_pacman:
- try:
- linux_installed = Pacman(('linux-libre' if have_linux_libre else 'linux') + lts, True)
- linux_latest = Pacman(('linux-libre' if have_linux_libre else 'linux') + lts, False)
- except:
- have_pacman = None
+ @param now_idle:int Time spent in the idle task, at the current measurement
+ @param now_total:int Total time that has passed, at the current measurement
+ @param last_idle:int Time spent in the idle task, at the last measurement
+ @param last_total:int Total time that has passed, at the last measurement
+ @return :float? The CPU usage, `None` if not time has passed
+ '''
+ total = now_total - last_total
+ idle = now_idle - last_idle
+ return None if total == 0 else (total - idle) * 100 / total
+
+def cpu_colourise_(value):
+ '''
+ Colourise a CPU usage value
- if (have_pacman is not None) and have_pacman:
- linux_running = kernel_release.split('-')
- linux_running, kernel_release = linux_running[:2], linux_running[2:]
- linux_running = '-'.join(linux_running)
- kernel_release = '-' + '-'.join(kernel_release)
- linux_installed = linux_installed.version
- linux_latest = linux_latest.version
- if linux_installed == linux_latest:
- if linux_running == linux_installed:
- linux_running = '\033[32m%s\033[39m' % linux_running
+ @param value:int The CPU usage
+ @return :str `value` coloured with an appropriate colour
+ '''
+ if value is None:
+ return '--%'
+ elif value >= 100:
+ return '\033[31m100\033[0m'
+ colour = '39'
+ if value >= 5: colour = '32'
+ if value >= 50: colour = '33'
+ if value >= 90: colour = '31'
+ return '\033[%sm%2i\033[0m%%' % (colour, value)
+
+cpu_none = cpu_colourise_(None)
+'''
+:str Cache for the coloursation of the value `None`
+'''
+
+cpu_coloured = tuple(cpu_colourise_(i) for i in range(101))
+'''
+:tuple<str> Cache of colourised CPU usage values
+'''
+
+cpu_colourise = lambda v : cpu_none if v is None else cpu_coloured[limited(v)]
+'''
+:(value:int)→str Cached version of `cpu_colourise_`
+'''
+
+last_cpu_idle = 0
+'''
+:int Time spent in the idle task, at the last measurement, on all CPU-threads
+'''
+
+last_cpu_total = 0
+'''
+:int Total time that has passed, at the last measurement, on all CPU-threads
+'''
+
+last_cpus_idle = []
+'''
+:int Time spent in the idle task, at the last measurement, for each CPU-thread
+'''
+
+last_cpus_total = []
+'''
+:int Total time that has passed, at the last measurement, for each CPU-thread
+'''
+
+
+
+###############################################################################################################
+# Memory usage monitor
+
+
+def mem():
+ '''
+ Update memory usage
+ '''
+ global my_mem, my_swp, my_shm
+ try:
+ memory = Memory()
+ if memory.mem_total == 0:
+ my_mem = '---'
+ my_shm = '---'
else:
- if linux_running == linux_installed:
- linux_running = '\033[33m%s\033[39m' % linux_running
+ my_mem = memory_coloured[limited(memory.mem_used * 100 / memory.mem_total)]
+ my_shm = memory_coloured[limited(memory.shmem * 100 / memory.mem_total)]
+ if memory.swap_total == 0:
+ my_swp = 'off'
+ else:
+ my_swp = memory_coloured[limited(memory.swap_used * 100 / memory.swap_total)]
+ except:
+ my_mem = '...'
+ my_swp = '...'
+ my_shm = '...'
+
+def memory_colourise(value):
+ '''
+ Colourise a memory usage value
+
+ @param value:int The memory usage
+ @return :str `value` coloured with an appropriate colour
+ '''
+ if value >= 100:
+ return '\033[31m100\033[0m'
+ colour = '39'
+ if value > 30: colour = '32'
+ if value > 50: colour = '33'
+ if value > 80: colour = '31'
+ return '\033[%sm%i\033[0m%%' % (colour, value)
+
+memory_coloured = tuple(memory_colourise(i) for i in range(101))
+'''
+:tuple<str> Cache of colourised memory usage values
+'''
+
+
+
+###############################################################################################################
+# Network monitor
+
+
+def net():
+ '''
+ Update network usage and latency
+ '''
+ global my_net, net_time, net_last
+ try:
+ net_now = time.monotonic()
+ net_tdiff, net_time = net_now - net_time, net_now
+ devs = Network('lo').devices
+ def kbps(device, direction):
+ '''
+ Get the number of kilobits transmitted or received per second since the last measurement
+
+ @param device:str The network device
+ @param direction:str 'rx' for received data, 'tx' for transmitted data
+ @return :str The number of kilobits transmitted or received per second, colourised
+ '''
+ direction += '_bytes'
+ value = devs[device][direction]
+ if device in net_last:
+ value -= net_last[device][direction]
else:
- linux_running = '\033[31m%s\033[39m' % linux_running
- kernel_release = linux_running + kernel_release
- uname_ = '%s %s %s'
- uname_ %= (nodename, kernel_release, operating_system)
- return uname_
+ value = 0
+ value /= 128 * net_tdiff
+ return network_colourise(value)
+ def KBps(device, direction):
+ '''
+ Get the number of kilobytes transmitted or received per second since the last measurement
+
+ @param device:str The network device
+ @param direction:str 'rx' for received data, 'tx' for transmitted data
+ @return :float The number of kilobytes transmitted or received per second
+ '''
+ direction += '_bytes'
+ value = devs[device][direction]
+ if device in net_last:
+ value -= net_last[device][direction]
+ else:
+ value = 0
+ value /= 1024 * net_tdiff
+ return value
+ my_net = ' '.join('%s: %skbps(%.0fKB/s)↓ %skbps(%.0fKB/s)↑ %s' %
+ (dev, kbps(dev, 'rx'), KBps(dev, 'rx'), kbps(dev, 'tx'), KBps(dev, 'tx'), ping(dev))
+ for dev in devs)
+ net_last = devs
+ except:
+ my_net = 'Net: ...'
+
+def network_colourise(value):
+ '''
+ Colourise a network usage value
+
+ @param value:int The network usage, in kilobits per second
+ @return :str `value` coloured with an appropriate colour
+ '''
+ colour = '39'
+ if value > 40: colour = '32'
+ if value > 8000: colour = '33'
+ if value > 60000: colour = '31'
+ return '\033[%sm%3.0f\033[0m' % (colour, value)
+def ping(device):
+ '''
+ Get the latency for a network device
+
+ @param device:str The device
+ @return :str The latency, colourised
+ '''
+ try:
+ monitor = my_ping[device][0]
+ monitor.semaphore.acquire()
+ try:
+ latency = monitor.get_latency(True)[1]
+ droptime = monitor.dropped_time(True)
+ if droptime:
+ return '\033[31m%4is\033[00m' % droptime
+ elif latency is None:
+ return '\033[31mdrop?\033[00m'
+ colour = '31'
+ if latency < 5: colour = '32'
+ elif latency < 10: colour = '00'
+ elif latency < 20: colour = '33'
+ return '\033[%sm%5.2f\033[00m' % (colour, latency)
+ finally:
+ monitor.semaphore.release()
+ except:
+ return '...'
+
+my_ping = Ping(targets = Ping.get_nics(), interval = 10).monitors
+'''
+:Ping Latency monitor
+'''
net_time = time.monotonic()
+'''
+:float The time of the last reading
+'''
+
net_last = {}
-def network():
- global net_time, net_last
- net_now = time.monotonic()
- net_tdiff, net_time = net_now - net_time, net_now
- net_ = Network('lo').devices
- def colourise(value):
- colour = '39'
- if value > 40: colour = '32'
- if value > 8000: colour = '33'
- if value > 60000: colour = '31'
- return '\033[%sm%3.0f\033[0m' % (colour, value)
- def kbps(device, direction):
- direction += '_bytes'
- value = net_[device][direction]
- if device in net_last:
- value -= net_last[device][direction]
- else:
- value = 0
- value /= 128 * net_tdiff
- return colourise(value)
- def KBps(device, direction):
- direction += '_bytes'
- value = net_[device][direction]
- if device in net_last:
- value -= net_last[device][direction]
- else:
- value = 0
- value /= 1024 * net_tdiff
- return value
- net = [(dev, kbps(dev, 'rx'), KBps(dev, 'rx'), kbps(dev, 'tx'), KBps(dev, 'tx')) for dev in net_]
- net = ['%s: %skbps(%.0fKB/s)↓ %skbps(%.0fKB/s)↑' % x for x in net]
- net = '%s' % ' '.join(net)
- net_last = net_
- return net
-
-
-last_cpu_idle, last_cpu_total = 0, 0
-last_cpus_idle, last_cpus_total = [], []
-def cpu():
- global last_cpus_idle, last_cpus_total, last_cpu_idle, last_cpu_total
- cpu_ = CPU()
- now_cpu_idle, now_cpus_idle = cpu_.cpu[CPU.idle], [cpu[CPU.idle] for cpu in cpu_.cpus]
- now_cpu_total, now_cpus_total = sum(cpu_.cpu), [sum(cpu) for cpu in cpu_.cpus]
- def cpu_usage(now_idle, now_total, last_idle, last_total):
- total = now_total - last_total
- idle = now_idle - last_idle
- return None if total == 0 else (total - idle) * 100 / total
- def colourise(value):
- if value is None:
- return '--%'
- colour = '39'
- if value >= 5: colour = '32'
- if value >= 50: colour = '33'
- if value >= 90: colour = '31'
- return '\033[%sm%2.0f\033[0m%%' % (colour, value)
- if len(now_cpus_idle) > len(last_cpus_idle):
- last_cpus_idle += now_cpus_idle[len(last_cpus_idle):]
- last_cpus_total += now_cpus_total[len(last_cpus_total):]
- cpus = zip(now_cpus_idle, now_cpus_total, last_cpus_idle, last_cpus_total)
- cpus = ' '.join([colourise(cpu_usage(*c)) for c in cpus])
- cpu = colourise(cpu_usage(now_cpu_idle, now_cpu_total, last_cpu_idle, last_cpu_total))
- cpu = 'Cpu: %s : %s' % (cpus, cpu)
- last_cpus_idle, last_cpus_total = now_cpus_idle, now_cpus_total
- last_cpu_idle, last_cpu_total = now_cpu_idle, now_cpu_total
- return cpu
-
-
-weather_last = '?'
-weather_semaphore = threading.Semaphore()
-def weather_update_():
- global weather_last
- if weather_semaphore.acquire(blocking = False):
+'''
+:dict<str, dict<str, int>> Readings from the update
+'''
+
+
+
+###############################################################################################################
+# Audio monitor
+
+
+def snd(add_missing_mixers = True):
+ '''
+ Update audio volume
+
+ @param add_missing_mixers:bool Whether missing mixers should be added
+ '''
+ global my_snd, stops_alsa
+ stops_ = []
+ if add_missing_mixers:
try:
- weather_ = Weather('ESSA').temp
- colour = '34'
- if weather_ < -10: colour = '39;44'
- if weather_ >= 18: colour = '39'
- if weather_ >= 25: colour = '33'
- if weather_ >= 30: colour = '31'
- weather_ = '\033[%sm%.0f\033[0m°C' % (colour, weather_)
- weather_last = weather_
+ for i, mixer in my_alsa:
+ if mixer is None:
+ my_alsa[i] = alsa(*(mixers[i]))
except:
- if not weather_last.endswith('?'):
- weather_last += '?'
- weather_semaphore.release()
-weather_update = Sometimes(lambda : async(weather_update_), 20 * 60 * 2)
-def weather():
- rc = weather_last
- weather_update()
- return rc
-
-
-chase_ = Chase()
-chase_update = Sometimes(lambda : async(chase_.update), 2 * 60 * 60 * 2)
-def chase():
- status = chase_.status
- status = '39' if status is None else ('32' if status else '31')
- chase_update()
- return '\033[%smChase\033[0m' % status
-
-
-
-functions = [ Sometimes(lambda : clock_.read(), 1 * 2),
- lambda : time.time() % 1,
- Sometimes(users, 1 * 2),
- weather,
- chase,
- cpu,
- memory,
- network,
- Sometimes(uname, 30 * 60 * 2),
+ pass
+ try:
+ offset = 0
+ my_snd_ = []
+ for mixer in my_alsa:
+ if mixer is not None:
+ text = snd_read_m(mixer)
+ my_snd_.append(text)
+ text_len = len(text)
+ mixer_stops = [0, text_len]
+ mixer_name_len, text = len(text.split(': ')[0]), ': '.join(text.split(': ')[1:])
+ for i in range((len(text) + 1) // 4):
+ mixer_stops.append(mixer_name_len + 2 + 4 * i + 0)
+ mixer_stops.append(mixer_name_len + 2 + 4 * i + 3)
+ stops_.append(tuple(offset + stop for stop in mixer_stops))
+ offset += text_len + 3
+ else:
+ stops_.append((-1, -1))
+ my_snd = ' │ '.join(my_snd_)
+ stops_alsa = stops_
+ except:
+ my_snd = '...'
+ stops_alsa = [(-1, -1)] * len(my_alsa)
+
+def alsa(cardindex, mixername):
+ '''
+ The a volume controller for a mixer
+
+ @param cardindex:int The index of the audio card
+ @param mixername:str The name of the mixer
+ @return :Alsa? Volume controller, `None` if the mixers is not available
+ '''
+ try:
+ return ALSA(cardindex, mixername)
+ except:
+ return None
+
+snd_text_v = lambda v : '--%' if v is None else ('%2i%%' % v)[:3]
+'''
+:(v:int?)→str Convert a volume integer to a volume str, `None` as input means it is muted
+'''
+
+snd_read_m = lambda m : '%s: %s' % (m.mixername, ' '.join(snd_text_v(v) for v in m.get_volume()))
+'''
+:(m:ALSA)→str Create a string representing the current volumes on a mixer
+'''
+
+mixers = ((0, 'Master'), (0, 'Headphone'), (0, 'Speaker'), (0, 'PCM'))
+'''
+:tuple<(cardindex:int, mixername:str)> List of mixers that should be monitored
+'''
+
+my_alsa = [alsa(card, mixer) for card, mixer in mixers]
+'''
+:list<ALSA> ALSA volume controllers
+'''
+
+stops_alsa = [(-1, -1)] * len(my_alsa)
+'''
+:itr<itr<int>> Map from mixer index to location of substring in the monitor display.
+ The first element is where the mixer starts, and the second element is
+ where the mixer stops. Each mixer may have 2 additional elements for each
+ channel, in that case that are starts and stops, alternating.
+'''
+
+
+
+###############################################################################################################
+# Music on console monitor
+
+
+my_moc = '> || [] |< >|'
+
+moc_controller = MOC()
+'''
+:MOC The MOC controller
+'''
+
+
+
+###############################################################################################################
+
+
+###############################################################################################################
+
+
+
+functions = [ my_clock.read
+ , lambda : my_cpu
+ , lambda : my_mem
+ , lambda : my_swp
+ , lambda : my_shm
+
+ , lambda : my_net
+ , lambda : my_snd
+ , lambda : my_moc
]
+'''
+:itr<()→str> Functions that are evulated and replaces the %s:s in
+ `pattern` every time to panel is redrawn
+'''
-pattern = [ '%s │ %.2f │ %s │ %s │ %s }{ %s │ %s │ %s │ %s'
+pattern = [ [ '%n%s%n │ %nCpu: %s%n │ %nMem: %s%n │ %nSwp: %s%n │ %nShm: %s%n'
+ , '%n%s%n │ %n%s%n │ %nMoc: %s%n'
+ ]
]
+'''
+:itr<itr<str>> The layout of the monitors on the panel
+'''
+
+
+async_fun = [ Sometimes(cpu, TICKS_PER_SECOND)
+ , Sometimes(mem, TICKS_PER_SECOND)
+ , Sometimes(net, TICKS_PER_SECOND)
+ , Sometimes(snd, TICKS_PER_SECOND * 3)
+ ]
+'''
+:itr<()→void> Monitors that are be update asynchronously
+'''
+
+semaphore = threading.Semaphore()
+'''
+:Semaphore Semaphore used to make sure functions do not step on each others' toes
+'''
+
+
+HEIGHT_PER_LINE = 12
+'''
+:int The height of each line
+'''
+
+
+HEIGHT = len(pattern) * HEIGHT_PER_LINE
+'''
+:int The height of the panel
+'''
+
+
+stops = [-1] * (2 * len(functions))
+'''
+:itr<int> Locations of stops marked by `%n` in `pattern`
+'''
+
+
+
+pattern = '\n'.join('\0'.join(p) for p in pattern)
+
start_ = start
def start():
+ '''
+ Invoked when it is time to create panels and map them
+ '''
+ # Create panel, clear it, and synchronise
start_()
- async(lambda : clock_.continuous_sync(lambda : bar.invalidate()))
+ bar.clear()
+ get_display().flush()
+
+ def update_sys():
+ '''
+ Update data asynchronously, but do not redraw the panel
+ '''
+ [f() for f in async_fun]
+
+ def update_clock():
+ '''
+ Update the clock, and redraw the panel
+ '''
+ if semaphore.acquire(blocking = False):
+ try:
+ for f in functions:
+ if isinstance(f, Clocked):
+ f(True)
+ finally:
+ semaphore.release()
+ bar.invalidate()
+
+ # Start monitoring
+ async(lambda : watch(1 / TICKS_PER_SECOND, update_sys), name = 'sys')
+ async(lambda : my_clock.continuous_sync(update_clock), name = 'clock')
+
-HEIGHT = len(pattern) * 12
-pattern = '\n'.join([p.replace('}{', '\0') for p in pattern])
-semaphore = threading.Semaphore()
def redraw():
+ '''
+ Invoked when redraw is needed
+ '''
+ global stops
if semaphore.acquire(blocking = False):
- values = pattern % tuple([f() for f in functions])
- #print(values.replace('\0', ' ' * 8))
- bar.partial_clear(0, bar.width, 10, 0, 2, values)
- bar.draw_coloured_splitted_text(0, bar.width, 10, 0, 2, values)
- semaphore.release()
- return True
- return False
+ try:
+ (values, stops) = sprintf(pattern, *(f() for f in functions))
+ bar.partial_clear(0, bar.width, 10, 0, 2, values)
+ bar.draw_coloured_splitted_text(0, bar.width, 10, 0, 2, values)
+ finally:
+ semaphore.release()
+
+
+
+LEFT_BUTTON = 1
+'''
+:int The index of the left button
+'''
+
+MIDDLE_BUTTON = 2
+'''
+:int The index of the middle button
+'''
+
+RIGHT_BUTTON = 3
+'''
+:int The index of the right button
+'''
+
+SCROLL_UP = 4
+'''
+:int The index of the psuedo-button for scrolling upwards
+'''
+
+SCROLL_DOWN = 5
+'''
+:int The index of the psuedo-button for scrolling downwards
+'''
+
+FORWARD_BUTTON = 8 # X1
+'''
+:int The index of the forward button, also known as X1
+'''
+
+BACKWARD_BUTTON = 9 # X2
+'''
+:int The index of the backward button, also known as X2
+'''
+
+
+def unhandled_event(e):
+ '''
+ Invoked when an unrecognised even is polled,
+ feel free to replace this completely
+
+ @param e The event
+ '''
+ if isinstance(e, Xlib.protocol.event.ButtonPress):
+ y = e.event_y // HEIGHT_PER_LINE
+ lx = e.event_x // bar.font_width
+ rx = (bar.width - e.event_x) // bar.font_width
+ button = e.detail
+ button_pressed(y, lx, rx, button)
+
+
+def button_pressed(y, lx, rx, button):
+ '''
+ Called from `unhandled_event` when a button on a pointer device is pressed
+
+ @param y:int The line that the pointer is on, zero based
+ @param lx:int The column the pointer is on, relative to the left edge, zero based
+ @param rx:int The column the pointer is on, relative to the right edge, zero based
+ @param button:int The button on the device that is pressed
+ '''
+ # Stops at the left side of line 0
+ stops_l0 = stops[0 : 10]
+
+ # Stops at the right side of line 0
+ stops_r0 = [stops[15] - x for x in stops[10 : 16]]
+
+ try:
+ if y == 0:
+ if stops_l0[0] <= lx < stops_l0[1]: # clock
+ if button == LEFT_BUTTON:
+ Clock.__init__(my_clock, time_format, not my_clock.utc, my_clock.sync_to)
+ bar.invalidate()
+ elif stops_l0[2] <= lx < stops_l0[3]: # cpu
+ pass
+ elif stops_l0[4] <= lx < stops_l0[5]: # mem
+ pass
+ elif stops_l0[6] <= lx < stops_l0[7]: # swp
+ pass
+ elif stops_l0[8] <= lx < stops_l0[9]: # shm
+ pass
+ elif stops_r0[0] > rx >= stops_r0[1]: # net
+ pass
+ elif stops_r0[2] > rx >= stops_r0[3]: # snd
+ button_pressed_mixer(stops_r0[2] - rx - 1, button)
+ elif stops_r0[4] > rx >= stops_r0[5]: # moc
+ mx = stops_r0[4] - rx - 1 - 5
+ if button == LEFT_BUTTON:
+ if 0 <= mx < 1: async(lambda : moc_controller.play().wait()) # >
+ elif 2 <= mx < 4: async(lambda : moc_controller.toggle_pause().wait()) # ||
+ elif 5 <= mx < 7: async(lambda : moc_controller.stop().wait()) # []
+ elif 8 <= mx < 10: async(lambda : moc_controller.previous().wait()) # |<
+ elif 11 <= mx < 13: async(lambda : moc_controller.next().wait()) # >|
+ elif button == FORWARD_BUTTON: async(lambda : moc_controller.next().wait())
+ elif button == BACKWARD_BUTTON: async(lambda : moc_controller.previous().wait())
+ elif button == SCROLL_UP: async(lambda : moc_controller.seek(+5).wait())
+ elif button == SCROLL_DOWN: async(lambda : moc_controller.seek(-5).wait())
+ except:
+ pass
+
+
+def button_pressed_mixer(mx, button):
+ '''
+ Called from `button_pressed` when the user is touched the audio mixer monitor
+
+ @param mx:int The column the pointer is at right-relative to the left edge of the mixer display
+ @param button:int The button on the device that is pressed
+ '''
+ for mixer_index, mixer_stops in enumerate(stops_alsa):
+ if mixer_stops[0] <= mx < mixer_stops[1]:
+ # Get channel
+ channel = ALSA.ALL_CHANNELS
+ if not button == RIGHT_BUTTON: # not (balance channels)
+ mixer_stops = mixer_stops[2:]
+ for channel_index in range(len(mixer_stops) // 2):
+ if mixer_stops[channel_index * 2 + 0] <= mx < mixer_stops[channel_index * 2 + 1]:
+ channel = channel_index
+ break
+ # Get mixer
+ mixer = my_alsa[mixer_index]
+ # Get volumes and all selected channels
+ volumes = mixer.get_volume()
+ channels = list(range(len(volumes))) if channel == ALSA.ALL_CHANNELS else [channel]
+ # Filter volumes to selected channels
+ volumes = [volume for c, volume in enumerate(volumes) if c in channels]
+
+ # Control the volume
+ if button == LEFT_BUTTON: # toggle mute
+ mute = not any(volume is None for volume in volumes)
+ [mixer.set_mute(mute, c) for c in channels]
+ elif button == RIGHT_BUTTON: # balance channels
+ mixer.set_volume(sum(volumes) // len(volumes), ALSA.ALL_CHANNELS)
+ elif button == SCROLL_UP: # turn up the volume
+ [mixer.set_volume(limited(v + 5), c) for c, v in zip(channels, volumes)]
+ elif button == SCROLL_DOWN: # turn down the volume
+ [mixer.set_volume(limited(v - 5), c) for c, v in zip(channels, volumes)]
+
+ # Update the panel
+ snd(False)
+ bar.invalidate()
+ break