aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMattias Andrée <maandree@kth.se>2016-10-04 15:42:56 +0200
committerMattias Andrée <maandree@kth.se>2016-10-04 15:42:56 +0200
commit13c47c115b6b87bf9793c47fca4ca42ac1b69251 (patch)
tree13d12d720a5700753314507c35b0e7421c6952f8
parentupdate ipaddress (diff)
downloadxpybar-1.17.tar.gz
xpybar-1.17.tar.bz2
xpybar-1.17.tar.xz
add compact example1.17
Signed-off-by: Mattias Andrée <maandree@kth.se>
Diffstat (limited to '')
-rw-r--r--examples/compact2756
1 files changed, 2756 insertions, 0 deletions
diff --git a/examples/compact b/examples/compact
new file mode 100644
index 0000000..6ba9fe7
--- /dev/null
+++ b/examples/compact
@@ -0,0 +1,2756 @@
+# -*- python -*-
+'''
+xpybar – xmobar replacement written in python
+Copyright © 2014, 2015, 2016 Mattias Andrée (maandree@member.fsf.org)
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see <http://www.gnu.org/licenses/>.
+'''
+
+# This is a template configuration of a panel that can display almost
+# all information and control, but suited for a height constrained screen
+
+# It is recommended to have python-posix_ipc and
+# <https://github.com/maandree/pdeath> installed
+
+import os
+import sys
+import pwd
+import time
+import threading
+import subprocess
+
+from util import *
+
+from plugins.alsa import ALSA
+from plugins.clock import Clock
+from plugins.cpu import CPU
+from plugins.cpuinfo import CPUInfo
+from plugins.cpuonline import CPUOnline
+from plugins.dentrystate import DentryState
+from plugins.df import Discs
+from plugins.discstats import DiscStats
+from plugins.files import Files
+from plugins.inodestate import InodeState
+from plugins.ipaddress import IPAddress
+from plugins.leapsec import LeapSeconds
+from plugins.loadavg import AverageLoad
+from plugins.locks import Locks
+from plugins.mem import Memory
+from plugins.moc import MOC
+from plugins.network import Network
+from plugins.pacman import Pacman
+from plugins.ping import Ping
+from plugins.random import Random
+from plugins.snmp import SNMP
+from plugins.snmp6 import SNMP6
+from plugins.softirqs import SoftIRQs
+from plugins.swaps import Swaps
+from plugins.tzclock import TZClock
+from plugins.uname import Uname
+from plugins.uptime import Uptime
+from plugins.users import Users
+from plugins.vmstat import VMStat
+from plugins.weather import Weather
+from plugins.xdisplay import XDisplay
+from plugins.xkb import XKB
+#from plugins.application import Application
+#from plugins.hdparm import HDParm
+#from plugins.image import Image
+#from plugins.kmsg import KMsg
+#from plugins.lunar import Lunar
+#from plugins.menu import Menu
+#from plugins.ropty import ROPTY
+#from plugins.solar import Solar
+
+import Xlib.X, Xlib.XK, Xlib.protocol.event
+
+
+
+OUTPUT, HEIGHT_PER_LINE, YPOS, TOP = 0, 12, 0, True
+
+clock = Clock(sync_to = 0.5)
+
+TERMINAL = 'terminator'
+
+
+
+LEFT_BUTTON = 1
+MIDDLE_BUTTON = 2
+RIGHT_BUTTON = 3
+SCROLL_UP = 4
+SCROLL_DOWN = 5
+FORWARD_BUTTON = 8 # X1
+BACKWARD_BUTTON = 9 # X2
+
+
+
+limited = lambda v : min(max(int(v + 0.5), 0), 100)
+
+def t_(f):
+ try:
+ f()
+ except:
+ time.sleep(1)
+
+t = lambda f : (lambda : t_(f))
+
+def pdeath(signal, *command):
+ try:
+ path = os.environ['PATH'].split(':')
+ for p in path:
+ p += '/pdeath'
+ if os.access(p, os.F_OK | os.X_OK, effective_ids = True):
+ return (p, signal, *command)
+ except:
+ pass
+ return command
+
+HOME = pwd.getpwuid(os.getuid()).pw_dir
+
+
+
+SEPARATOR = ' │ '
+
+def invalidate():
+ try:
+ bar.invalidate()
+ except:
+ pass
+
+class Entry:
+ def __init__(self, wrap = None, left = True, line = 0, prev = None):
+ self.left = left
+ self.line = line
+ self.prev = prev
+ self.width = 0
+ self.offset = 0
+ self.last_text = None
+ self.short_layout = None
+ self.wrapped = self.function if wrap is None else wrap(self.function)
+
+ def invalidate(self):
+ wrapper = self.wrapped
+ if isinstance(wrapper, Clocked):
+ wrapper = wrapper.sometimes
+ if isinstance(wrapper, Sometimes):
+ wrapper.counter = 0
+ invalidate()
+
+ def action(self, col, button, x, y):
+ pass
+
+ def click(self, row, lcol, rcol, button, x, y):
+ if row == self.line:
+ col = (lcol if self.left else rcol) - self.offset
+ if 0 <= col < self.width:
+ if self.left:
+ x -= self.offset * bar.font_width
+ else:
+ col = self.width - 1 - col
+ x = bar.width - x
+ x -= self.offset * bar.font_width
+ x = self.width * bar.font_width - x
+ y -= self.line * HEIGHT_PER_LINE
+ try:
+ self.action(col, button, x, y)
+ except Exception as err:
+ print(str(err), file = sys.stderr, flush = True)
+ return True
+ return False
+
+ def function(self):
+ return SEPARATOR
+
+ def __call__(self, *args, **kwargs):
+ try:
+ text = self.wrapped(*args, **kwargs)
+ if text is not self.last_text:
+ self.width = Bar.coloured_length(text)
+ self.last_text = text
+ return text
+ except Exception as err:
+ print(err)
+
+ def update_position(self):
+ if self.prev is not None:
+ self.prev.update_position()
+ self.offset = self.prev.offset + self.prev.width
+
+class Group:
+ def __init__(self, functions):
+ global HEIGHT
+ self.pattern = ''
+ self.posupdate = []
+ left = None
+ right = None
+ lineno = 0
+ atleft = True
+ newline = False
+ lastright = None
+ new_functions = []
+ separator = None
+ for entry in functions:
+ if entry is None:
+ newline = not atleft
+ lineno += 1 if newline else 0
+ atleft = not atleft
+ continue
+ entry.left = atleft
+ entry.line = lineno
+ if newline:
+ self.pattern += '\n'
+ newline = False
+ if left is not None:
+ self.posupdate.append(left)
+ if right is not None:
+ self.posupdate.append(right)
+ left = None
+ right = None
+ lastright = None
+ if entry.left:
+ if left is not None:
+ separator = Entry(left = True)
+ new_functions.append(separator)
+ self.pattern += '%s'
+ separator.prev = left
+ left = separator
+ entry.prev = left
+ left = entry
+ elif right is None:
+ if left is not None:
+ self.pattern += '\0 '
+ right = entry
+ lastright = entry
+ else:
+ separator = Entry(left = False)
+ new_functions.append(separator)
+ self.pattern += '%s'
+ lastright.prev = separator
+ separator.prev = entry
+ lastright = entry
+ new_functions.append(entry)
+ self.pattern += '%s'
+ if left is not None:
+ self.posupdate.append(left)
+ if right is not None:
+ self.posupdate.append(right)
+ height = (lineno + 1) * HEIGHT_PER_LINE
+ if HEIGHT < height:
+ HEIGHT = height
+ self.functions = new_functions
+
+
+
+class MyXMonad(Entry):
+ def __init__(self, *args, mod = Xlib.XK.XK_Super_L, ppWsSep = '', ppSep = ' ', **kwargs):
+ self.text = ''
+ self.text_short = ''
+ self.ws_sep_len = len(ppWsSep)
+ self.sep_len = len(ppSep)
+ self.mod = mod
+ self.show_full = False
+ self.short_layout = None
+ Entry.__init__(self, *args, **kwargs)
+ async(lambda : forever(t(self.refresh)), name = 'xmonad')
+
+ def action(self, col, button, x, y):
+ if button == MIDDLE_BUTTON:
+ self.show_full = not self.show_full
+ self.invalidate()
+ return
+ text = self.text
+ full = self.show_full
+ UTF8_STRING = display.intern_atom('UTF8_STRING')
+ _XMONAD_LOG = display.intern_atom('_XMONAD_LOG')
+ _NET_DESKTOP_NAMES = display.intern_atom('_NET_DESKTOP_NAMES')
+ xmonad_log = display.screen().root.get_full_property(_XMONAD_LOG, UTF8_STRING).value
+ xmonad_log = xmonad_log.split(' : ')
+ net_desktop_names = display.screen().root.get_full_property(_NET_DESKTOP_NAMES, UTF8_STRING).value
+ net_desktop_names = net_desktop_names[:-1].split('\0')
+ ws = 0
+ ws_hit = False
+ while ws < len(net_desktop_names):
+ if ws > 0:
+ col -= self.ws_sep_len
+ if col < 0:
+ break
+ width = len(net_desktop_names[ws] if full else str(ws + 1)) + 2
+ if col < width:
+ ws_hit = True
+ break
+ ws += 1
+ col -= width
+ if ws < len(net_desktop_names):
+ if button == LEFT_BUTTON:
+ self.switch_workspace(ws + 1)
+ elif button in (SCROLL_UP, SCROLL_DOWN):
+ xmonad_log = xmonad_log[0].split(' ')
+ active = [net_desktop_names.index(w[1:-1]) for w in xmonad_log if w[:1] == '<']
+ ws = [net_desktop_names.index(w[1:-1]) for w in xmonad_log if w[:1] == '['][0]
+ ws += -1 if button == SCROLL_UP else +1
+ ws %= len(net_desktop_names)
+ while ws in active:
+ ws += -1 if button == SCROLL_UP else +1
+ ws %= len(net_desktop_names)
+ self.switch_workspace(ws + 1)
+ elif button == LEFT_BUTTON:
+ col -= self.sep_len
+ layout = xmonad_log[1]
+ if not self.show_full and self.short_layout is not None:
+ layout = self.short_layout
+ if col < len(layout) + 2:
+ display.xtest_fake_input(Xlib.X.KeyPress, display.keysym_to_keycode(self.mod))
+ display.xtest_fake_input(Xlib.X.KeyPress, display.keysym_to_keycode(Xlib.XK.XK_space))
+ display.xtest_fake_input(Xlib.X.KeyRelease, display.keysym_to_keycode(Xlib.XK.XK_space))
+ display.xtest_fake_input(Xlib.X.KeyRelease, display.keysym_to_keycode(self.mod))
+ display.flush()
+
+ def refresh(self):
+ try:
+ text = input()
+ except:
+ sys.exit(0)
+ buf, esc = '', None
+ for c in text:
+ if esc is not None:
+ esc += c
+ if esc == '^':
+ buf += '^'
+ esc = None
+ elif esc[-1] == ')':
+ if esc.startswith('bg(') or esc.startswith('fg('):
+ c = 4 if esc.startswith('bg(') else 3
+ esc = esc[3 : -1]
+ if esc == '':
+ buf += '\033[%i9m' % c
+ else:
+ r, g, b = esc[1 : 3], esc[3 : 5], esc[5 : 7]
+ r, g, b = int(r, 16), int(g, 16), int(b, 16)
+ r, g, b = str(r), str(g), str(b)
+ buf += '\033[%i8;2;%sm' % (c, ';'.join([r, g, b]))
+ esc = None
+ elif c == '^':
+ esc = ''
+ else:
+ buf += c
+ self.text = buf
+ short = ''
+ space, esc, layout, lbuf = False, False, 0, ''
+ ws = 1
+ for c in buf:
+ if esc:
+ short += c
+ esc = not (c == '~' or 'a' <= c.lower() <= 'z')
+ elif c == '\033':
+ esc = True
+ if layout == 2:
+ layout = 3
+ end = ''
+ while lbuf.startswith(' '):
+ short += ' '
+ lbuf = lbuf[1:]
+ while lbuf.endswith(' '):
+ end += ' '
+ lbuf = lbuf[:-1]
+ self.short_layout = self.shorten_layout(lbuf)
+ short += self.short_layout + end
+ short += c
+ elif ws <= 9:
+ if c in ' <>[]':
+ space = not space
+ if not space:
+ short += '%i' % ws
+ ws += 1
+ if ws == 10:
+ layout = 1
+ short += c
+ elif not space:
+ short += c
+ elif layout == 1:
+ if c == ' ':
+ short += c
+ else:
+ layout = 2
+ lbuf += c
+ elif layout == 2:
+ lbuf += c
+ else:
+ short += c
+ self.text_short = short
+ self.invalidate()
+
+ def shorten_layout(self, text):
+ text = text.split(' ')
+ incl = []
+ for word in text:
+ if word not in ('ImageButtonDeco', 'Maximize', 'Minimize'):
+ incl.append(word)
+ return ' '.join(incl).replace('Mirror ', '\\')
+
+ def switch_workspace(self, ws):
+ display.xtest_fake_input(Xlib.X.KeyPress, display.keysym_to_keycode(self.mod))
+ display.xtest_fake_input(Xlib.X.KeyPress, display.keysym_to_keycode(Xlib.XK.XK_0 + ws))
+ display.xtest_fake_input(Xlib.X.KeyRelease, display.keysym_to_keycode(Xlib.XK.XK_0 + ws))
+ display.xtest_fake_input(Xlib.X.KeyRelease, display.keysym_to_keycode(self.mod))
+ display.flush()
+
+ def function(self):
+ return self.text if self.show_full else self.text_short
+
+
+class MyScroll(Entry):
+ def function(self):
+ return '↓↑'
+
+ def action(self, col, button, x, y):
+ global groups, groupi, group
+ if button in (LEFT_BUTTON, RIGHT_BUTTON, SCROLL_UP, SCROLL_DOWN):
+ groupi = (groupi + (+1 if button in (LEFT_BUTTON, SCROLL_DOWN) else -1)) % len(groups)
+ group = groups[groupi]
+ self.invalidate()
+
+
+class MyClock(Entry):
+ def __init__(self, *args, format = '%Y-%m-%d %T', long_format = '%Y-(%m)%b-%d %T, %a w%V, %Z', tzs = None, **kwargs):
+ self.default_format = format
+ self.long_format = long_format
+ self.clock = clock
+ self.set_format(self.clock, self.default_format)
+ self.tzs = [] if tzs is None else tzs
+ self.tzi = 0
+ mqueue_map['timefmt'] = self.mqueue
+ mqueue_map['tz'] = self.mqueue
+ Entry.__init__(self, *args, **kwargs)
+
+ def mqueue(self, args):
+ if args[0] == 'timefmt':
+ args = args[1:]
+ if len(args) == 0:
+ self.action(0, RIGHT_BUTTON, 0, 0)
+ else:
+ self.set_format(self.clock, ' '.join(args))
+ self.invalidate()
+ if args[0] == 'tz':
+ args = args[1:]
+ if len(args) == 0:
+ self.action(0, LEFT_BUTTON, 0, 0)
+ else:
+ old_clock = self.clock
+ tz = '/'.join(args)
+ try:
+ tzclock = TZClock(timezone = tz, format = self.clock.u_format)
+ except:
+ try:
+ tzclock = TZClock(timezone = tz.upper(), format = self.clock.u_format)
+ except:
+ print('%s: unknown typezone: %s' % (sys.argv[0], tz), file = sys.stderr, flush = True)
+ return
+ tzclock.u_format = tzclock.format
+ self.clock = tzclock
+ if old_clock is not clock:
+ del old_clock
+ self.invalidate()
+
+ def action(self, col, button, x, y):
+ if button == LEFT_BUTTON:
+ if self.clock is clock:
+ clock.utc = not clock.utc
+ self.set_format(clock, self.clock.u_format)
+ else:
+ self.set_format(clock, self.clock.u_format)
+ self.clock = clock
+ self.invalidate()
+ elif button == RIGHT_BUTTON:
+ if self.clock.u_format == self.default_format:
+ self.set_format(self.clock, self.long_format)
+ else:
+ self.set_format(self.clock, self.default_format)
+ self.invalidate()
+ elif button in (SCROLL_UP, SCROLL_DOWN):
+ i = self.tzi + (+1 if button == SCROLL_UP else -1)
+ if not (0 <= i <= len(self.tzs)):
+ return
+ self.tzi = i
+ old_clock = self.clock
+ if i > 0:
+ tzclock = TZClock(timezone = self.tzs[i - 1], format = self.clock.u_format)
+ tzclock.u_format = tzclock.format
+ self.clock = tzclock
+ else:
+ self.clock = clock
+ if old_clock is not clock:
+ del old_clock
+ self.invalidate()
+
+ def set_format(self, clck, format):
+ clck.u_format = format
+ if not (clck is clock and clck.utc):
+ clck.format = format
+ else:
+ fmt, esc = '', False
+ for c in format:
+ if esc:
+ fmt += c
+ if c == '%' or 'a' <= c.lower() <= 'z':
+ esc = False
+ if c == 'Z':
+ while fmt[-1] != '%':
+ fmt = fmt[:-1]
+ fmt = fmt[:-1] + 'UTC'
+ elif c == '%':
+ esc = True
+ fmt += c
+ else:
+ fmt += c
+ clck.format = fmt
+
+ def function(self):
+ clck = self.clock
+ colour = '0'
+ if clck is not clock or clck.utc:
+ colour += ';7'
+ return '\033[%sm%s\033[00m' % (colour, clck.read())
+
+
+class MyComputer(Entry):
+ def __init__(self, *args, locks = None, **kwargs):
+ self.have_linux_libre, self.have_pacman = True, None
+ self.linux_installed, self.linux_latest = None, None
+ self.last_uname_read = ''
+ self.last_users_read = ''
+ self.xdisplay = None
+ self.you = pwd.getpwuid(os.getuid()).pw_name
+ self.display = 0
+ self.show_detailed = False
+ self.keys = []
+ if locks is None:
+ locks = ['num', 'cap']
+ self.num_lock = 'num' in locks or 'number' in locks or 'numerical' in locks or 'numpad' in locks
+ self.cap_lock = 'cap' in locks or 'caps' in locks
+ self.scr_lock = 'scr' in locks or 'scroll' in locks or 'scrl' in locks
+ if self.num_lock:
+ self.keys.append(Xlib.XK.XK_Num_Lock)
+ if self.cap_lock:
+ self.keys.append(Xlib.XK.XK_Caps_Lock)
+ if self.scr_lock:
+ self.keys.append(Xlib.XK.XK_Scroll_Lock)
+ Entry.__init__(self, *args, **kwargs)
+
+ def action(self, col, button, x, y):
+ if button == LEFT_BUTTON:
+ if self.display == 3:
+ col -= 5
+ if col % 4 == 3:
+ return
+ col //= 4
+ key = self.keys[col]
+ display.xtest_fake_input(Xlib.X.KeyPress, display.keysym_to_keycode(key))
+ display.xtest_fake_input(Xlib.X.KeyRelease, display.keysym_to_keycode(key))
+ display.flush()
+ elif button == MIDDLE_BUTTON:
+ self.show_detailed = not self.show_detailed
+ self.invalidate()
+ elif button == RIGHT_BUTTON:
+ if self.display == 0:
+ async(lambda : subprocess.Popen(['mate-system-monitor']).wait())
+ else:
+ self.invalidate()
+ elif button == SCROLL_UP:
+ self.display = min(self.display + 1, 2) ## TODO the keyboard support is not too good
+ self.invalidate()
+ elif button == SCROLL_DOWN:
+ self.display = max(self.display - 1, 0)
+ self.invalidate()
+
+ def function_uname(self):
+ uname = Uname()
+ nodename = uname.nodename
+ kernel_release = uname.kernel_release
+ operating_system = uname.operating_system
+
+ lts = '-lts' if kernel_release.lower().endswith('-lts') else ''
+ if self.have_pacman is None:
+ self.have_pacman = True
+ try:
+ self.linux_installed = Pacman('linux-libre' + lts, True)
+ except:
+ self.have_linux_libre = False
+ try:
+ self.linux_installed = Pacman('linux' + lts, True)
+ except:
+ self.have_pacman = False
+ if self.have_pacman:
+ try:
+ self.linux_latest = Pacman(('linux-libre' if self.have_linux_libre else 'linux') + lts, False)
+ except:
+ self.have_pacman = None
+ elif self.have_pacman:
+ try:
+ self.linux_installed = Pacman(('linux-libre' if self.have_linux_libre else 'linux') + lts, True)
+ self.linux_latest = Pacman(('linux-libre' if self.have_linux_libre else 'linux') + lts, False)
+ except:
+ self.have_pacman = None
+
+ if (self.have_pacman is not None) and self.have_pacman:
+ linux_running = kernel_release.split('-')
+ linux_running, kernel_release = linux_running[:2], linux_running[2:]
+ linux_running = '-'.join(linux_running)
+ kernel_release = '-' + '-'.join(kernel_release)
+ self.linux_installed = self.linux_installed.version
+ self.linux_latest = self.linux_latest.version
+ if self.linux_installed == self.linux_latest:
+ if linux_running == self.linux_installed:
+ linux_running = '\033[32m%s\033[39m' % linux_running
+ else:
+ if linux_running == self.linux_installed:
+ linux_running = '\033[33m%s\033[39m' % linux_running
+ else:
+ linux_running = '\033[31m%s\033[39m' % linux_running
+ kernel_release = linux_running + kernel_release
+
+ rc = '%s %s %s'
+ rc %= (nodename, kernel_release, operating_system)
+ self.last_uname_read = rc
+ return rc
+
+ def function_users(self): ## TODO use inotify
+ users = Users('devfs').users
+ def colour_user(user):
+ if user == 'root': return '\033[31m%s\033[39m'
+ elif not user == self.you: return '\033[33m%s\033[39m'
+ else: return '%s'
+ users = ['%s{%i}' % (colour_user(u) % u, len(users[u])) for u in users.keys()]
+ users = 'Usr: %s' % (' '.join(users))
+ self.last_users_read = users
+ return users
+
+ def function_xdisplay(self):
+ if self.xdisplay is None:
+ x = XDisplay()
+ if x.connection is None:
+ self.xdisplay = 'No DISPLAY environment variable set'
+ self.xdisplay_detailed = self.xdisplay
+ else:
+ self.xdisplay = 'X: %s' % x.connection
+ self.xdisplay_detailed = 'Host: %s' % ('(localhost)' if x.host is None else x.host)
+ self.xdisplay_detailed += SEPARATOR
+ self.xdisplay_detailed += 'Display: %s' % (x.display)
+ if x.screen is not None:
+ self.xdisplay_detailed += SEPARATOR
+ self.xdisplay_detailed += 'Screen: %s' % (x.screen)
+ if x.vt is not None:
+ self.xdisplay += '%sVt: %i' % (SEPARATOR, x.vt)
+ self.xdisplay_detailed += '%sVt: %i' % (SEPARATOR, x.vt)
+ return self.xdisplay_detailed if self.show_detailed else self.xdisplay
+
+ def function_xkb(self): ## add update listener
+ xkb = XKB().get_locks()
+ kbd = 'Kbd:'
+ if self.num_lock:
+ kbd += ' \033[3%imnum\033[0m' % (1 if (xkb & XKB.NUM) == 0 else 2)
+ if self.cap_lock:
+ kbd += ' \033[3%imcap\033[0m' % (1 if (xkb & XKB.CAPS) == 0 else 2)
+ if self.scr_lock:
+ kbd += ' \033[3%imscr\033[0m' % (1 if (xkb & XKB.SCROLL) == 0 else 2)
+ return kbd
+
+ def function(self):
+ if self.display == 0:
+ return self.function_uname()
+ elif self.display == 1:
+ return self.function_users()
+ elif self.display == 2:
+ return self.function_xdisplay()
+ elif self.display == 3:
+ return self.function_xkb()
+
+
+class MyWeather(Entry):
+ def __init__(self, *args, stations = None, fahrenheit = False, wind = 'm/s', from_wind = False, visibility = 'km', **kwargs):
+ self.semaphore = threading.Semaphore()
+ self.stations = [MyWeather.get_metar_station()] if stations is None else [s.upper() for s in stations]
+ self.reports = [None] * len(self.stations)
+ self.up_to_date = [False] * len(self.stations)
+ self.in_deg_f = fahrenheit
+ self.wind_unit = wind
+ self.from_wind = from_wind
+ self.visibility_unit = visibility
+ self.show_temp = True
+ self.show_wind_speed = False
+ self.show_wind_gusts = False
+ self.show_wind_dir = False
+ self.show_wind_chill = False
+ self.show_humidity = False
+ self.show_dew = False
+ self.show_pressure = False
+ self.show_visibility = False
+ self.show_sky = False
+ self.show_station = False
+ self.station = 0
+ self.updated = False
+ self.segments = []
+ self.refresh = Sometimes(lambda : async(self.refresh_, name = 'weather'), 20 * 60 * 4)
+ self.refresh()
+ Entry.__init__(self, *args, **kwargs)
+
+ def action(self, col, button, x, y):
+ if button == MIDDLE_BUTTON:
+ self.show_temp = True
+ self.show_wind_speed = True
+ self.show_wind_gusts = True
+ self.show_wind_dir = True
+ self.show_wind_chill = True
+ self.show_humidity = True
+ self.show_dew = True
+ self.show_pressure = True
+ self.show_visibility = True
+ self.show_sky = True
+ self.show_station = True
+ self.invalidate()
+ elif button == RIGHT_BUTTON:
+ self.refresh.counter = 0
+ self.refresh()
+ elif button in (LEFT_BUTTON, SCROLL_UP, SCROLL_DOWN):
+ index = None
+ for i, seg in enumerate(self.segments):
+ if seg is None:
+ continue
+ (x, width) = seg
+ if 0 <= col - x < width:
+ index = i
+ break
+ if index is None:
+ return
+ if index == 0: # station
+ if button == LEFT_BUTTON:
+ self.show_station = False
+ elif button == SCROLL_UP:
+ station = self.station
+ if station + 1 < len(self.stations):
+ self.station = station + 1
+ else:
+ station = self.station
+ if station > 0:
+ self.station = station - 1
+ elif index == 1: # temperature
+ if button == LEFT_BUTTON:
+ self.show_temp = False
+ else:
+ self.in_deg_f = button == SCROLL_UP
+ elif index == 2: # wind speed/gusts
+ if button == LEFT_BUTTON:
+ if self.plusminus is None or col < self.plusminus:
+ self.show_wind_speed = False
+ else:
+ self.show_wind_gusts = False
+ else:
+ if self.wind_unit == 'm/s':
+ self.wind_unit = 'mph' if button == SCROLL_UP else 'm/s'
+ elif self.wind_unit == 'mph':
+ self.wind_unit = 'kn' if button == SCROLL_UP else 'm/s'
+ elif self.wind_unit in ('kn', 'kt', 'knot', 'knots'):
+ self.wind_unit = 'km/h' if button == SCROLL_UP else 'mph'
+ elif self.wind_unit in ('km/h', 'kmph'):
+ self.wind_unit = 'ft/s' if button == SCROLL_UP else 'kn'
+ elif self.wind_unit in ('ft/s', 'ftps'):
+ self.wind_unit = 'b' if button == SCROLL_UP else 'km/h'
+ else:
+ self.wind_unit = 'b' if button == SCROLL_UP else 'ft/s'
+ elif index == 3: # wind direction
+ if button == LEFT_BUTTON:
+ self.show_wind_dir = False
+ else:
+ self.from_wind = button == SCROLL_UP
+ elif index == 4: # wind chill
+ if button == LEFT_BUTTON:
+ self.show_wind_chill = False
+ else:
+ self.in_deg_f = button == SCROLL_UP
+ elif index == 5: # relative humidity
+ if button == LEFT_BUTTON:
+ self.show_humidity = False
+ else:
+ return
+ elif index == 6: # dew point
+ if button == LEFT_BUTTON:
+ self.show_dew = False
+ else:
+ self.in_deg_f = button == SCROLL_UP
+ elif index == 7: # pressure
+ if button == LEFT_BUTTON:
+ self.show_pressure = False
+ else:
+ return
+ elif index == 8: # visibility
+ if button == LEFT_BUTTON:
+ self.show_visibility = False
+ elif button == SCROLL_UP:
+ unit = self.visibility_unit
+ if unit in {'km' : 'm', 'm' : 'mi'}:
+ self.visibility_unit = {'km' : 'm', 'm' : 'mi'}[unit]
+ else:
+ unit = self.visibility_unit
+ if unit in {'mi' : 'm', 'mile' : 'm', 'm' : 'km'}:
+ self.visibility_unit = {'mi' : 'm', 'mile' : 'm', 'm' : 'km'}[unit]
+ elif index == 9: # sky conditions
+ if button == LEFT_BUTTON:
+ self.show_sky = False
+ else:
+ return
+ self.invalidate()
+
+ def refresh_(self):
+ if self.semaphore.acquire(blocking = False):
+ for index, station in enumerate(self.stations):
+ try:
+ self.reports[index] = Weather(station)
+ self.up_to_date[index] = True
+ except:
+ self.up_to_date[index] = False
+ self.semaphore.release()
+ self.invalidate()
+
+ def invalidate(self):
+ self.updated = False
+ Entry.invalidate(self)
+
+ def get_weather(self, index):
+ station = self.stations[index]
+ report = self.reports[index]
+ up_to_date = self.up_to_date[index]
+ station = station[0] + station[1:].lower()
+
+ if report is None:
+ if self.show_station:
+ return '%s: ?' % station
+ else:
+ return '?'
+
+ text = []
+
+ if self.show_temp and report.temp is not None:
+ value = report.temp
+ colour = '34'
+ if value < -10: colour = '39;44'
+ if value >= 18: colour = '39'
+ if value >= 25: colour = '33'
+ if value >= 30: colour = '31'
+ if self.in_deg_f:
+ value = value * 9 / 5 + 32
+ value = '\033[%sm%.0f\033[0m%s' % (colour, value, '°F' if self.in_deg_f else '°C')
+ text.append(value)
+ else:
+ text.append(None)
+
+ if self.show_wind_speed and report.wind_speed is not None:
+ knot = report.wind_speed
+ kmph = knot * 1.852 # km/h
+ mps = kmph / 3.6 # m/s
+ mph = knot * 1.151 # mph
+ ftps = mps * 3.281 # ft/s
+ beaufort = (mps / 0.8365) ** (2 / 3) # B
+ if self.wind_unit in ('kn', 'kt', 'knot', 'knots'):
+ value, unit = knot, 'kn'
+ elif self.wind_unit in ('km/h', 'kmph'):
+ value, unit = kmph, 'km/h'
+ elif self.wind_unit == 'mph':
+ value, unit = mph, 'mph'
+ elif self.wind_unit in ('ft/s', 'ftps'):
+ value, unit = ftps, 'ft/s'
+ elif self.wind_unit.lower() in ('beaufort', 'beaufort scale', 'bf', 'bt', 'b'):
+ value, unit = beaufort, 'B'
+ else:
+ value, unit = mps, 'm/s'
+ beaufort = int(beaufort + 0.5)
+ colour = '32'
+ if beaufort >= 1: colour = '39'
+ if beaufort >= 3: colour = '32'
+ if beaufort >= 7: colour = '33'
+ if beaufort >= 10: colour = '31'
+ if beaufort >= 12: colour = '39;41'
+ wind = '\033[%sm%.0f\033[0m' % (colour, value)
+ if self.show_wind_gusts and report.wind_gusts is not None:
+ knot = report.wind_gusts
+ kmph = knot * 1.852 # km/h
+ mps = kmph / 3.6 # m/s
+ mph = knot * 1.151 # mph
+ ftps = mps * 3.281 # ft/s
+ beaufort = (mps / 0.8365) ** (2 / 3) # B
+ if self.wind_unit in ('kn', 'kt', 'knot', 'knots'):
+ value = knot
+ elif self.wind_unit in ('km/h', 'kmph'):
+ value = kmph
+ elif self.wind_unit == 'mph':
+ value = mph
+ elif self.wind_unit in ('ft/s', 'ftps'):
+ value = ftps
+ elif self.wind_unit.lower() in ('beaufort', 'beaufort scale', 'bf', 'bt', 'b'):
+ value = beaufort
+ else:
+ value = mps
+ beaufort = int(beaufort + 0.5)
+ colour = '32'
+ if beaufort >= 1: colour = '39'
+ if beaufort >= 3: colour = '32'
+ if beaufort >= 7: colour = '33'
+ if beaufort >= 10: colour = '31'
+ if beaufort >= 12: colour = '39;41'
+ wind += '±\033[%sm%.0f\033[0m' % (colour, value)
+ wind += unit
+ text.append(wind)
+ else:
+ text.append(None)
+
+ def wind_dir(direction):
+ if not self.from_wind:
+ direction += 180
+ direction %= 360
+ if direction < 0:
+ direction += 360
+ if (direction < 90):
+ if direction == 0:
+ return 'N'
+ if direction <= 45:
+ return 'N%.0f°E' % direction
+ return 'E%.0f°N' % (90 - direction)
+ elif (90 <= direction < 180):
+ if direction == 0:
+ return 'E'
+ direction -= 90
+ if direction <= 45:
+ return 'E%.0f°S' % direction
+ return 'S%.0f°E' % (90 - direction)
+ elif (180 <= direction < 270):
+ if direction == 0:
+ return 'S'
+ direction -= 180
+ if direction <= 45:
+ return 'S%.0f°W' % direction
+ return 'W%.0f°S' % (90 - direction)
+ else:
+ if direction == 0:
+ return 'W'
+ direction -= 270
+ if direction <= 45:
+ return 'W%.0f°N' % direction
+ return 'N%.0f°W' % (90 - direction)
+ if not self.show_wind_dir:
+ wind = None
+ elif report.wind_var is not None:
+ wind = wind_dir(report.wind_var[0]) + '-' + wind_dir(report.wind_var[1])
+ elif report.wind_dir is not None:
+ wind = wind_dir(report.wind_dir)
+ else:
+ wind = None
+ if wind is not None:
+ if self.from_wind:
+ wind += '→'
+ else:
+ wind = '→' + wind
+ text.append(wind)
+ else:
+ text.append(None)
+
+ if self.show_wind_chill and report.wind_chill is not None:
+ value = report.wind_chill
+ # Coloured by frostbite time, however the wind is not taken
+ # into account so the colour is approximate
+ colour = '39'
+ if value <= -28: colour = '33'
+ if value <= -37: colour = '31'
+ if value <= -46: colour = '39;41'
+ if self.in_deg_f:
+ value = value * 9 / 5 + 32
+ value = 'w\033[%sm%.0f\033[0m%s' % (colour, value, '°F' if self.in_deg_f else '°C')
+ text.append(value)
+ else:
+ text.append(None)
+
+ if self.show_humidity and report.humidity is not None:
+ value = report.humidity
+ colour = '34'
+ if value >= 26: colour = '32'
+ if value >= 31: colour = '39'
+ if value >= 37: colour = '33'
+ if value >= 52: colour = '31'
+ if value >= 73: colour = '39;41'
+ value = '\033[%sm%.0f\033[0m%%' % (colour, value)
+ text.append(value)
+ else:
+ text.append(None)
+
+ if self.show_dew and report.dew is not None:
+ value = report.dew
+ colour = '34'
+ if value >= 10: colour = '32'
+ if value >= 13: colour = '39'
+ if value >= 16: colour = '33'
+ if value >= 21: colour = '31'
+ if value >= 26: colour = '39;41'
+ if self.in_deg_f:
+ value = value * 9 / 5 + 32
+ value = 'd\033[%sm%.0f\033[0m%s' % (colour, value, '°F' if self.in_deg_f else '°C')
+ text.append(value)
+ else:
+ text.append(None)
+
+ if self.show_pressure and report.pressure is not None:
+ text.append('%.0fhPa' % report.pressure)
+ else:
+ text.append(None)
+
+ if self.show_visibility and report.visibility is not None:
+ mile = report.visibility
+ km = mile * 1.609344
+ m = mile * 1609.344
+ if self.visibility_unit in ('mile', 'mi'):
+ value, unit = mile, 'mi'
+ elif self.visibility_unit == 'm':
+ value, unit = m, 'm'
+ else:
+ value, unit = km, 'km'
+ text.append('%.0f%s' % (value, unit))
+ else:
+ text.append(None)
+
+ if self.show_sky and 'Sky conditions' in report.fields:
+ value = report.fields['Sky conditions']
+ if not value == '':
+ text.append(value)
+ else:
+ text.append(None)
+ else:
+ text.append(None)
+
+ segments = []
+ x = 0
+ if self.show_station:
+ w = len(station)
+ segments.append((x, w))
+ x += w + 2
+ else:
+ segments.append(None)
+ x -= 1
+ self.plusminus = None
+ for i, segment in enumerate(text):
+ if segment is None:
+ segments.append(None)
+ else:
+ if i == 1 and '±' in segment:
+ self.plusminus = x + Bar.coloured_length(segment.split('±')[0])
+ w = Bar.coloured_length(segment)
+ x += 1
+ segments.append((x, w))
+ x += w
+ self.segments = segments
+ text = [s for s in text if s is not None]
+
+ if len(text) == 0:
+ return station
+
+ if not up_to_date:
+ if len(text) <= 1:
+ text = ' '.join(text) + '?'
+ else:
+ text = ' '.join(text) + ' ?'
+ else:
+ text = ' '.join(text)
+ if self.show_station:
+ text = '%s: %s' % (station, text)
+ return text
+
+ def function(self):
+ if not self.updated:
+ self.updated = True
+ self.text = self.get_weather(self.station)
+ rc = self.text
+ self.refresh()
+ return rc
+
+ @staticmethod
+ def get_metar_station():
+ try:
+ with open(HOME + '/.config/metar', 'rb') as file:
+ station = file.read()
+ except:
+ try:
+ with open('/etc/metar', 'rb') as file:
+ station = file.read()
+ except:
+ print('~/.config/metar is not set', file = sys.stderr, flush = True)
+ station = b'ESSA'
+ return station.decode('utf-8', 'strict').split('\n')[0].upper()
+
+
+class MyNews(Entry):
+ def __init__(self, *args, status_path = None, **kwargs):
+ if status_path is not None:
+ self.status_path = status_path
+ else:
+ self.status_path = HOME + '/.var/lib/featherweight/status'
+ self.get_news()
+ Entry.__init__(self, *args, **kwargs)
+ async(self.refresh, name = 'news')
+
+ def action(self, col, button, x, y):
+ if button == LEFT_BUTTON:
+ async(lambda : subprocess.Popen([TERMINAL, '-e', 'featherweight']).wait())
+ elif button == RIGHT_BUTTON:
+ async(lambda : subprocess.Popen(['featherweight', '--update', '--system']).wait())
+
+ def get_news(self):
+ try:
+ with open(self.status_path, 'rb') as file:
+ status = int(file.read().decode('utf-8', 'replace').replace('\n', ''))
+ except:
+ status = 0
+ colour = '31'
+ if status <= 0: colour = '0'
+ elif status <= 5: colour = '32'
+ elif status <= 10: colour = '33'
+ self.text = 'News: \033[%sm%i\033[0m' % (colour, status)
+
+ def refresh(self):
+ while True:
+ try:
+ spawn_read(*pdeath('HUP', 'inotifywait', self.status_path, '-e', 'close_write'))
+ self.get_news()
+ except:
+ time.sleep(1)
+
+ def function(self):
+ return self.text
+
+
+class MyALSA(Entry):
+ def __init__(self, *args, timeout = None, sleep = None, mixers = None, **kwargs):
+ if timeout is not None:
+ self.timeout = timeout
+ else:
+ self.timeout = 5
+ if sleep is not None:
+ self.sleep = sleep
+ else:
+ self.sleep = 5
+ if mixers is not None:
+ mixers = mixers
+ else:
+ mixers = ('Master', 'PCM')
+ self.alsa = []
+ for mixer in mixers:
+ try:
+ self.alsa.append(ALSA(mixername = mixer))
+ except:
+ pass
+ try:
+ import posix_ipc
+ method = 0
+ except:
+ method = 2
+ try:
+ path = os.environ['PATH'].split(':')
+ for p in path:
+ p += '/cmdipc'
+ if os.access(p, os.F_OK | os.X_OK, effective_ids = True):
+ method = 1
+ break
+ except:
+ pass
+ self.sep_width = Bar.coloured_length(SEPARATOR)
+ self.get_volume()
+ Entry.__init__(self, *args, **kwargs)
+ async((self.refresh_posix_ipc, self.refresh_cmdipc, self.refresh_wait)[method], name = 'alsa')
+
+ def action(self, col, button, x, y):
+ mixer = 0
+ mixer_text = self.text.split(SEPARATOR)
+ while mixer < len(mixer_text): ## the limit is just a precaution
+ mixer_width = Bar.coloured_length(mixer_text[mixer])
+ if col >= mixer_width:
+ col -= mixer_width
+ if col < self.sep_width:
+ return
+ col -= self.sep_width
+ else:
+ break
+ mixer += 1
+ mixer_text = mixer_text[mixer]
+
+ channel = -1
+ mixer_text = mixer_text.split(': ')
+ mixer_label = ': '.join(mixer_text[:-1]) + ': '
+ mixer_text = mixer_text[-1].split(' ')
+ mixer_label = Bar.coloured_length(mixer_label)
+ if col >= mixer_label:
+ col -= mixer_label
+ while channel < len(mixer_text): ## the limit is just a precaution
+ channel += 1
+ channel_width = Bar.coloured_length(mixer_text[channel])
+ if col < channel_width:
+ break
+ col -= channel_width
+ if col < 1:
+ channel = -1
+ break
+ col -= 1
+
+ mixer = self.alsa[mixer]
+ if button == LEFT_BUTTON:
+ muted = mixer.get_mute()
+ muted = not (any(muted) if channel == -1 else muted[channel])
+ mixer.set_mute(muted, channel)
+ elif button == RIGHT_BUTTON:
+ volume = mixer.get_volume()
+ volume = limited(sum(volume) / len(volume))
+ mixer.set_volume(volume, -1)
+ elif button in (SCROLL_UP, SCROLL_DOWN):
+ volume = mixer.get_volume()
+ adj = -5 if button == SCROLL_DOWN else 5
+ if channel < 0:
+ for ch in range(len(volume)):
+ mixer.set_volume(limited(volume[ch] + adj), ch)
+ else:
+ mixer.set_volume(limited(volume[channel] + adj), channel)
+ else:
+ return
+ self.get_volume()
+ self.invalidate()
+
+ def get_volume(self):
+ text_v = lambda v : '--%' if v is None else ('%2i%%' % v)[:3]
+ read_m = lambda m : '%s: %s' % (m.mixername, ' '.join(text_v(v) for v in m.get_volume()))
+ self.text = SEPARATOR.join(read_m(m) for m in self.alsa)
+
+ def refresh_posix_ipc(self):
+ import posix_ipc
+ s = posix_ipc.Semaphore('/.xpybar.alsa.0', posix_ipc.O_CREAT, 0o600, 1)
+ c = posix_ipc.Semaphore('/.xpybar.alsa.1', posix_ipc.O_CREAT, 0o600, 0)
+ q = posix_ipc.Semaphore('/.xpybar.alsa.2', posix_ipc.O_CREAT, 0o600, 0)
+ while True:
+ try:
+ s.release() ; c.release()
+ try:
+ q.acquire(timeout)
+ except posix_ipc.BusyError:
+ pass
+ c.acquire(None)
+ s.acquire(None)
+ self.get_volume()
+ except:
+ time.sleep(self.sleep)
+
+ def refresh_cmdipc(self):
+ command = pdeath('HUP', 'cmdipc', '-PCck', '/.xpybar.alsa.0/.xpybar.alsa.1/.xpybar.alsa.2', 'wait', '-b%i' % self.timeout)
+ while True:
+ try:
+ spawn_read(*command)
+ self.get_volume()
+ except:
+ time.sleep(self.sleep)
+
+ def refresh_wait(self):
+ while True:
+ try:
+ time.sleep(self.sleep)
+ self.get_volume()
+ except:
+ pass
+
+ def function(self):
+ return self.text
+
+
+class MyBattery(Entry):
+ def __init__(self, *args, **kwargs):
+ self.show_all = False
+ self.show_battery = 0
+ Entry.__init__(self, *args, **kwargs)
+
+ def action(self, col, button, x, y):
+ if button == LEFT_BUTTON:
+ self.show_all = not self.show_all
+ self.invalidate()
+ elif button == SCROLL_UP:
+ self.show_battery += 1
+ self.invalidate()
+ elif button == SCROLL_DOWN:
+ self.show_battery -= 1
+ if self.show_battery < 0:
+ self.show_battery = 0
+ self.invalidate()
+
+ def colourise_(self, text):
+ try:
+ value = text.replace(',', '.')
+ value = float(value) if '.' in value else int(value)
+ colour = '39'
+ if value < 80: colour = '32'
+ if value < 50: colour = '33'
+ if value < 15: colour = '31'
+ if value < 5: colour = '7;31'
+ return '\033[%sm%s\033[0m' % (colour, text)
+ except:
+ return text
+
+ def colourise(self, text):
+ buf = []
+ state = None
+ for c in reversed(text):
+ if c == '\t':
+ c = ' '
+ if state is not None:
+ if c == ' ':
+ if state == '':
+ buf.append(c)
+ continue
+ buf.append(self.colourise_(state))
+ buf.append(c)
+ state = None
+ else:
+ state = c + state
+ elif c == '%':
+ buf.append(c)
+ state = ''
+ else:
+ buf.append(c)
+ if state is not None and state != '':
+ buf.append(self.colourise_(state))
+ return ''.join(reversed(buf))
+
+ def function(self):
+ try:
+ data = spawn_read('acpi').split('\n')
+ except:
+ return 'acpi missing'
+ if self.show_battery >= len(data):
+ self.show_battery = len(data) - 1
+ if len(data) == 1:
+ return self.colourise(': '.join(data[0].split(': ')[1:]))
+ if self.show_all:
+ data = [self.colourise(': '.join(datum.split(': ')[1:])) for datum in data]
+ data = SEPARATOR.join('Bat%i: %s' % (i, datum) for i, datum in enumerate(data))
+ return data
+ data = data[self.show_battery]
+ data = ': '.join(data.split(': ')[1:])
+ data = 'Bat%i: %s' % (self.show_battery, self.colourise(data))
+ return data
+
+
+class MyCPU(Entry):
+ def __init__(self, *args, **kwargs):
+ self.last_sirq_split = SoftIRQs()
+ cpu = CPU()
+ self.last_time = time.monotonic()
+ self.last_cpu_stat = cpu.cpu
+ self.last_cpu_total = sum(cpu.cpu)
+ self.last_cpus_total = [sum(c) for c in cpu.cpus]
+ self.last_cpus_stat = [[c[s] for c in cpu.cpus] for s in range(len(cpu.cpu))]
+ self.last_intr = cpu.intr_total
+ self.last_ctxt = cpu.ctxt
+ self.last_fork = cpu.processes
+ self.last_sirq = cpu.softirq_total
+ self.none = self.colourise(None)
+ self.coloured = tuple(self.colourise(i) for i in range(101))
+ self.show_all = False
+ self.show_function = 0
+ self.functions = [ lambda *v : self.function_cpu(CPU.idle, '', *v)
+ , self.function_intr
+ , self.function_ctxt
+ , self.function_fork
+ , self.function_proc
+ , self.function_sirq
+ ]
+ for key in self.last_sirq_split.keys:
+ make = lambda key : (lambda *v : self.function_sirq_split(key, *v))
+ self.functions.append(make(key))
+ self.functions += [ lambda *v : self.function_cpu(CPU.user, '(user)', *v)
+ , lambda *v : self.function_cpu(CPU.nice, '(nice)', *v)
+ , lambda *v : self.function_cpu(CPU.system, '(system)', *v)
+ , lambda *v : self.function_cpu(CPU.iowait, '(iowait)', *v)
+ , lambda *v : self.function_cpu(CPU.irq, '(irq)', *v)
+ , lambda *v : self.function_cpu(CPU.softirq, '(softirq)', *v)
+ , lambda *v : self.function_cpu(CPU.steal, '(steal)', *v)
+ , lambda *v : self.function_cpu(CPU.guest, '(guest)', *v)
+ , lambda *v : self.function_cpu(CPU.guest_nice, '(guest nice)', *v)
+ , self.function_load
+ , self.function_task
+ , self.function_pid
+ , self.function_online
+ ]
+ Entry.__init__(self, *args, **kwargs)
+
+ def action(self, col, button, x, y):
+ if button == LEFT_BUTTON:
+ self.show_all = not self.show_all
+ self.invalidate()
+ elif button == SCROLL_UP:
+ n = self.show_function + 1
+ if n < len(self.functions):
+ self.show_function = n
+ self.invalidate()
+ elif button == SCROLL_DOWN:
+ n = self.show_function - 1
+ if n >= 0:
+ self.show_function = n
+ self.invalidate()
+
+ def usage(self, now_stat, now_total, last_stat, last_total, idle):
+ total = now_total - last_total
+ stat = now_stat - last_stat
+ return None if total == 0 else (total - stat if idle else stat) * 100 / total
+
+ def colourise_(self, value):
+ if value is None:
+ return self.none
+ else:
+ return self.coloured[limited(value)]
+
+ def colourise(self, value):
+ if value is None:
+ return '--%'
+ elif value >= 100:
+ return '\033[31m100\033[0m'
+ colour = '39'
+ if value >= 5: colour = '32'
+ if value >= 50: colour = '33'
+ if value >= 90: colour = '31'
+ return '\033[%sm%2i\033[0m%%' % (colour, value)
+
+ def function_cpu(self, stat, name, cpu, display, tdiff):
+ now_cpu_stat = self.now_cpu_stat[stat]
+ now_cpus_stat = self.now_cpus_stat[stat]
+ last_cpus_stat = self.last_cpus_stat[stat]
+ last_cpu_stat = self.last_cpu_stat[stat]
+
+ if display:
+ cpu = (now_cpu_stat, self.now_cpu_total, last_cpu_stat, self.last_cpu_total)
+ cpu = self.colourise_(self.usage(*cpu, idle = stat == CPU.idle))
+ if self.show_all:
+ cpus = (now_cpus_stat, self.now_cpus_total, last_cpus_stat, self.last_cpus_total)
+ cpus = ' '.join(self.colourise_(self.usage(*c, idle = stat == CPU.idle)) for c in zip(*cpus))
+ cpu = 'Cpu%s: %s : %s' % (name, cpus, cpu)
+ else:
+ cpu = 'Cpu%s: %s' % (name, cpu)
+
+ return cpu
+
+ def function_intr(self, cpu, display, tdiff):
+ now_intr = cpu.intr_total
+ if display:
+ cpu = 'Intr: %.0fHz' % ((now_intr - self.last_intr) / tdiff)
+ self.last_intr = now_intr
+ return cpu
+
+ def function_ctxt(self, cpu, display, tdiff):
+ now_ctxt = cpu.ctxt
+ if display:
+ cpu = 'Ctxt: %.0fHz' % ((now_ctxt - self.last_ctxt) / tdiff)
+ self.last_ctxt = now_ctxt
+ return cpu
+
+ def function_fork(self, cpu, display, tdiff):
+ now_fork = cpu.processes
+ if display:
+ cpu = 'Fork: %.0fHz' % ((now_fork - self.last_fork) / tdiff)
+ self.last_fork = now_fork
+ return cpu
+
+ def function_sirq(self, cpu, display, tdiff):
+ now_sirq = cpu.softirq_total
+ if display:
+ cpu = 'Sirq: %.0fHz' % ((now_sirq - self.last_sirq) / tdiff)
+ self.last_sirq = now_sirq
+ return cpu
+
+ def function_proc(self, cpu, display, tdiff):
+ if display:
+ cpu = 'Proc: %irun %iio' % (cpu.procs_running, cpu.procs_blocked)
+ return cpu
+
+ def function_load(self, cpu, display, tdiff):
+ if display:
+ load = AverageLoad()
+ cpu = 'Load: %.2f %.2f %.2f' % (load.total_avg_5_min, load.total_avg_10_min, load.total_avg_15_min)
+ return cpu
+
+ def function_task(self, cpu, display, tdiff):
+ if display:
+ load = AverageLoad()
+ cpu = 'Task: %i/%i (%.0f%%)' % (load.active_tasks, load.total_tasks, load.active_tasks * 100 / load.total_tasks)
+ return cpu
+
+ def function_pid(self, cpu, display, tdiff):
+ if display:
+ load = AverageLoad()
+ cpu = 'Last PID: %i' % load.last_pid
+ return cpu
+
+ def function_online(self, cpu, display, tdiff):
+ if display:
+ try:
+ cpu, cpuonline, on, off = '', CPUOnline(), 0, 0
+ online, offline = cpuonline.online, cpuonline.offline
+ while on < len(online) and off < len(offline):
+ if online[on] < offline[off]:
+ cpu += ' \033[32m%s\033[0m' % online[on]
+ on += 1
+ else:
+ cpu += ' \033[31m%s\033[0m' % offline[off]
+ off += 1
+ cpu += ''.join(' \033[32m%s\033[0m' % c for c in online[on:])
+ cpu += ''.join(' \033[31m%s\033[0m' % c for c in offline[off:])
+ cpu = 'Online:%s' % cpu
+ except Exception as e:
+ cpu = str(e)
+ return cpu
+
+ def function_sirq_split(self, key, cpu, display, tdiff):
+ if display:
+ label = 'Sirq(%s)' % key.lower().replace('_', ' ').replace(' rx', '↓').replace(' tx', '↑')
+ now = self.now_sirq_split[key]
+ last = self.last_sirq_split[key]
+ hz = lambda n, l : '%0.fHz' % ((n - l) / tdiff)
+ n = len(now)
+ snow, slast = sum(now), sum(last)
+ anow, alast = snow / n, slast / n
+ each = ''
+ if self.show_all:
+ each = '%s : ' % ' '.join(hz(n, l) for n, l in zip(now, last))
+ else:
+ each = ''
+ cpu = '%s: %s%s(%s)' % (label, each, hz(snow, slast), hz(anow, alast))
+ return cpu
+
+ def function(self):
+ now = time.monotonic()
+ cpu = CPU()
+ self.now_sirq_split = SoftIRQs()
+ tdiff = now - self.last_time
+ self.last_time = now
+ display = self.show_function
+ self.now_cpu_stat = cpu.cpu
+ self.now_cpu_total = sum(cpu.cpu)
+ self.now_cpus_total = [sum(c) for c in cpu.cpus]
+ self.now_cpus_stat = [[c[s] for c in cpu.cpus] for s in range(len(cpu.cpu))]
+ for i in range(len(self.functions)):
+ if i == display:
+ ret = self.functions[i](cpu, True, tdiff)
+ else:
+ self.functions[i](cpu, False, tdiff)
+ self.last_cpus_stat = self.now_cpus_stat
+ self.last_cpu_stat = self.now_cpu_stat
+ self.last_cpus_total = self.now_cpus_total
+ self.last_cpu_total = self.now_cpu_total
+ self.last_sirq_split = self.now_sirq_split
+ return ret
+
+
+class MyMemory(Entry):
+ def __init__(self, *args, **kwargs):
+ self.coloured = tuple(self.colourise(i) for i in range(101))
+ self.show_labels = False
+ self.show_value = -1
+ self.show_default = False
+ self.show_swaps = False
+ self.show_swap = 0
+ self.labels = list(sorted(Memory().keys))
+ swaps = Swaps()
+ self.prio = swaps.header_map['Priority']
+ self.used = swaps.header_map['Used']
+ self.size = swaps.header_map['Size']
+ self.swap = swaps.header_map['Filename']
+ Entry.__init__(self, *args, **kwargs)
+
+ def action(self, col, button, x, y):
+ if button == LEFT_BUTTON:
+ if self.show_value >= 0:
+ self.show_default = not self.show_default
+ self.invalidate()
+ elif button == MIDDLE_BUTTON:
+ self.show_labels = not self.show_labels
+ self.invalidate()
+ elif button == RIGHT_BUTTON:
+ self.show_swaps = not self.show_swaps
+ self.invalidate()
+ elif button == SCROLL_UP:
+ if self.show_swaps:
+ self.show_swap += 1
+ self.invalidate()
+ else:
+ n = self.show_value + 1
+ if n < len(self.labels):
+ self.show_value = n
+ self.invalidate()
+ elif button == SCROLL_DOWN:
+ if self.show_swaps:
+ n = self.show_swap
+ if n > 0:
+ self.show_swap = n - 1
+ self.invalidate()
+ else:
+ self.show_value -= 1
+ if self.show_value < -1:
+ self.show_value = -1
+ self.invalidate()
+
+ def u(self, value):
+ units = 'KMGTPE'
+ unit = 0
+ while unit + 1 < len(units) and value >= 1024:
+ unit += 1
+ value /= 1024
+ return '%.1f%s' % (value, units[unit])
+
+ def colourise(self, value):
+ if value >= 100:
+ return '\033[31m100\033[0m'
+ colour = '39'
+ if value > 30: colour = '32'
+ if value > 50: colour = '33'
+ if value > 80: colour = '31'
+ return '\033[%sm%i\033[0m%%' % (colour, value)
+
+ def function(self):
+ if self.show_swaps:
+ swaps = Swaps()
+ while True:
+ if len(swaps.swaps) == 0:
+ return 'no swaps available'
+ try:
+ swap = swaps.swaps[self.show_swap]
+ used, size = int(swap[self.used]), int(swap[self.size])
+ swap, prio = swap[self.swap], int(swap[self.prio])
+ return '%s: %s (%sB/%sB, prio %i)' % (swap, self.colourise(used * 100 / size),
+ self.u(used), self.u(size), prio)
+ except:
+ self.show_swap -= 1
+ if self.show_swap < 0:
+ self.show_swaps = False
+ return self.function()
+ else:
+ memory = Memory()
+ if not self.show_default and self.show_value > 0:
+ label = self.labels[self.show_value]
+ try:
+ value = memory[label]
+ unit = 0
+ units = ['K', 'M', 'G', 'T', 'P', 'E']
+ while (unit + 1 < len(units)) and (value >= 1024):
+ value /= 1024
+ unit += 1
+ return '%s: %.1f%sB' % (label, value, units[unit])
+ except:
+ self.show_value = -1
+ if memory.mem_total == 0:
+ mem = '---'
+ shm = '---'
+ else:
+ mem = self.coloured[limited(memory.mem_used * 100 / memory.mem_total)]
+ shm = self.coloured[limited(memory.shmem * 100 / memory.mem_total)]
+ if memory.swap_total == 0:
+ swp = 'n/a'
+ else:
+ swp = self.coloured[limited(memory.swap_used * 100 / memory.swap_total)]
+ if self.show_labels:
+ mem = 'Mem: %s%sSwp: %s%sShm: %s' % (mem, SEPARATOR, swp, SEPARATOR, shm)
+ else:
+ mem = 'Mem: %s %s %s' % (mem, swp, shm)
+ return mem
+
+
+class MyStat(Entry):
+ def __init__(self, *args, **kwargs):
+ self.keys_4 = SNMP().keys
+ self.keys_6 = SNMP6().keys
+ self.keys_vm = VMStat().keys
+ self.show_group = 0
+ self.show_value_4 = 0
+ self.show_value_6 = 0
+ self.show_value_vm = 0
+ Entry.__init__(self, *args, **kwargs)
+
+ def action(self, col, button, x, y):
+ if button == LEFT_BUTTON:
+ n = self.show_group + 1
+ if n < 4:
+ self.show_group = n
+ self.invalidate()
+ elif button == MIDDLE_BUTTON:
+ self.invalidate()
+ elif button == RIGHT_BUTTON:
+ n = self.show_group
+ if n > 0:
+ self.show_group = n - 1
+ self.invalidate()
+ elif button == SCROLL_UP:
+ if self.show_group == 0:
+ n = self.show_value_4
+ if n + 1 < len(self.keys_4):
+ self.show_value_4 = n + 1
+ self.invalidate()
+ elif self.show_group == 1:
+ n = self.show_value_6
+ if n + 1 < len(self.keys_6):
+ self.show_value_6 = n + 1
+ self.invalidate()
+ elif self.show_group == 2:
+ n = self.show_value_vm
+ if n + 1 < len(self.keys_vm):
+ self.show_value_vm = n + 1
+ self.invalidate()
+ elif self.show_group == 3:
+ pass
+ elif button == SCROLL_DOWN:
+ if self.show_group == 0:
+ n = self.show_value_4
+ if n > 0:
+ self.show_value_4 = n - 1
+ self.invalidate()
+ elif self.show_group == 1:
+ n = self.show_value_6
+ if n > 0:
+ self.show_value_6 = n - 1
+ self.invalidate()
+ elif self.show_group == 2:
+ n = self.show_value_vm
+ if n > 0:
+ self.show_value_vm = n - 1
+ self.invalidate()
+ elif self.show_group == 3:
+ pass
+
+ def function(self):
+ try:
+ if self.show_group == 0:
+ label = self.keys_4[self.show_value_4]
+ value = SNMP()[label]
+ elif self.show_group == 1:
+ label = self.keys_6[self.show_value_6]
+ value = SNMP6()[label]
+ elif self.show_group == 2:
+ label = self.keys_vm[self.show_value_vm]
+ value = VMStat()[label]
+ label = ''.join(w[:1].upper() + w[1:] for w in label.split('_'))
+ elif self.show_group == 3:
+ uptime = Uptime()
+ avg = uptime.uptime_seconds - uptime.average_idle_seconds
+ wrk = (avg) * 100 / uptime.uptime_seconds
+ up = '%id%i:%02i' % uptime.uptime[:3]
+ avg = '%id%i:%02i' % Uptime.split_time(avg)[:3]
+ return 'Uptime: %.0f%% %s / %s' % (wrk, avg, up)
+ return '%s: %i' % (label, value)
+ except Exception as e:
+ return str(e)
+
+
+class MyNetwork(Entry):
+ def __init__(self, *args, limits = None, ignore = None, pings = None, **kwargs):
+ self.limits = { 'rx_bytes' : None # Download speed in bytes (not bits)
+ , 'tx_bytes' : None # Upload speed in bytes (not bits)
+ , 'rx_total' : None # Download cap in bytes
+ , 'tx_total' : None # Upload cap in bytes
+ }
+ if limits is not None:
+ self.limits = limits
+ if pings is None:
+ pings = ['gateway']
+ elif isinstance(pings, str) or isinstance(pings, Ping):
+ pings = [pings]
+ pings = [(Ping(targets = Ping.get_nics(p)) if isinstance(p, str) else p).monitors for p in pings]
+ self.pings = {}
+ for ping in pings:
+ for nic in ping.keys():
+ if nic not in self.pings:
+ self.pings[nic] = ping[nic]
+ else:
+ self.pings[nic] += ping[nic]
+ self.ignore = ['lo'] if ignore is None else ignore
+ self.net_time = time.monotonic()
+ self.net_last = {}
+ self.show_all = False
+ self.show_name = False
+ self.show_value = 0
+ self.labels = ['bytes', 'total', 'packets', 'errs',
+ 'drop', 'fifo', 'frame', 'colls',
+ 'carrier', 'compressed', 'multicast']
+ self.in_bytes = False # in bits if showing total
+ Entry.__init__(self, *args, **kwargs)
+
+ def action(self, col, button, x, y):
+ if button == LEFT_BUTTON:
+ self.show_all = not self.show_all
+ elif button == MIDDLE_BUTTON:
+ self.show_name = not self.show_name
+ elif button == RIGHT_BUTTON:
+ self.in_bytes = not self.in_bytes
+ elif button == SCROLL_UP:
+ n = self.show_value + 1
+ if n >= len(self.labels):
+ return
+ self.show_value = n
+ elif button == SCROLL_DOWN:
+ n = self.show_value - 1
+ if self.show_value < 0:
+ return
+ self.show_value = n
+ else:
+ return
+ self.invalidate()
+
+ def colourise(self, value, percent):
+ colour = '39'
+ if percent > 25: colour = '32'
+ if percent > 50: colour = '33'
+ if percent > 90: colour = '31'
+ return '\033[%sm%3.0f\033[0m' % (colour, value)
+
+ def KBps(self, network, device, direction, net_tdiff):
+ direction += '_bytes'
+ value = network[device][direction]
+ limit = None
+ if direction in self.limits:
+ limit = self.limits[direction]
+ if limit is None:
+ limit = 12500000 # 100 mbps
+ if device in self.net_last:
+ value -= self.net_last[device][direction]
+ else:
+ value = 0
+ percent = value * 100 / (limit * net_tdiff)
+ value /= 1024 * net_tdiff
+ return self.colourise(value, percent)
+
+ def kbps(self, network, device, direction, net_tdiff):
+ direction += '_bytes'
+ value = network[device][direction]
+ limit = None
+ if direction in self.limits:
+ limit = self.limits[direction]
+ if limit is None:
+ limit = 12500000 # 100 mbps
+ if device in self.net_last:
+ value -= self.net_last[device][direction]
+ else:
+ value = 0
+ percent = value * 100 / (limit * net_tdiff)
+ value /= 128 * net_tdiff
+ return self.colourise(value, percent)
+
+ def total(self, network, device, direction, bits):
+ direction += '_bytes'
+ value = network[device][direction]
+ limit = self.limits[direction] if direction in self.limits else None
+ if limit is not None:
+ percent = value * 100 / limit
+ colour = '32'
+ if percent >= 25: colour = '39'
+ if percent >= 50: colour = '33'
+ if percent >= 75: colour = '31'
+ if percent >= 95: colour = '7;33'
+ if percent >= 100: colour = '7;31'
+ else:
+ colour = '39'
+ if bits:
+ value *= 8
+ unit = 0
+ if bits:
+ units = ['', 'k', 'm', 'g', 't', 'p', 'e']
+ else:
+ units = ['', 'K', 'M', 'G', 'T', 'P', 'E']
+ while (unit + 1 < len(units)) and (value >= 1024):
+ value /= 1024
+ unit += 1
+ return '\033[%sm%.1f\033[0m%s' % (colour, value, units[unit])
+
+ def ping(self, monitor):
+ monitor.semaphore.acquire()
+ try:
+ latency = monitor.get_latency(True)[1]
+ droptime = monitor.dropped_time(True)
+ if droptime:
+ return ' \033[31m%4is\033[00m' % droptime
+ elif latency is None:
+ return ' \033[31mdrop?\033[00m'
+ colour = '31'
+ if latency < 5: colour = '32'
+ elif latency < 10: colour = '39'
+ elif latency < 20: colour = '33'
+ return ' \033[%sm%5.2f\033[00m' % (colour, latency)
+ finally:
+ monitor.semaphore.release()
+
+ def function(self):
+ net_now = time.monotonic()
+ net_tdiff, self.net_time = net_now - self.net_time, net_now
+ network = Network(*self.ignore).devices
+ label = self.labels[self.show_value]
+ show_total = label == 'total'
+ show_bytes = label == 'bytes'
+ xlabel = 'bytes' if show_total else label
+ suffix = ''
+ if show_total and self.in_bytes:
+ f = lambda dev, dir : self.total(network, dev, dir, True) + 'b'
+ elif show_total:
+ f = lambda dev, dir : self.total(network, dev, dir, False) + 'B'
+ elif show_bytes and self.in_bytes:
+ f = lambda dev, dir : self.KBps(network, dev, dir, net_tdiff) + 'KB/s'
+ elif show_bytes:
+ f = lambda dev, dir : self.kbps(network, dev, dir, net_tdiff) + 'kbps'
+ else:
+ f = lambda dev, dir : '%i' % network[dev][dir + '_' + label]
+ suffix = ' ' + label
+ def create(lbl, dev):
+ ret = ''
+ if lbl is not None:
+ ret += lbl + ': '
+ have_down = ('rx_' + xlabel) in network[dev]
+ have_up = ('tx_' + xlabel) in network[dev]
+ if have_down:
+ ret += f(dev, 'rx') + '↓'
+ if have_up:
+ ret += ' '
+ if have_up:
+ ret += f(dev, 'tx') + '↑'
+ ret += suffix
+ if show_bytes or show_total:
+ if dev == '*':
+ for dev in self.pings.keys():
+ for ping in self.pings[dev]:
+ ret += self.ping(ping)
+ elif dev in self.pings:
+ for ping in self.pings[dev]:
+ ret += self.ping(ping)
+ return ret
+ if self.show_all:
+ if self.show_name:
+ net = [create(dev, dev) for dev in network]
+ else:
+ net = [create(None, dev) for dev in network]
+ net = (SEPARATOR if self.show_name else ' ').join(net)
+ else:
+ devsum = {}
+ for dev in network.keys():
+ table = network[dev]
+ for key in table.keys():
+ if key not in devsum:
+ devsum[key] = 0
+ devsum[key] += table[key]
+ network['*'] = devsum
+ if self.show_name:
+ net = create('Net', '*')
+ else:
+ net = create(None, '*')
+ self.net_last = network
+ return net
+
+
+class MyIO(Entry):
+ def __init__(self, *args, fs_name_map = None, fs_ignore = None, fs_type_ignore = None, **kwargs):
+ self.fs_name_map = [] if fs_name_map is None else fs_name_map
+ self.fs_ignore = set([] if fs_ignore is None else fs_ignore)
+ self.fs_type_ignore = set(MyIO.nodev_fs() if fs_type_ignore is None else fs_type_ignore)
+ self.show_value = 0
+ self.show_disc_value = 0
+ self.show_overall = True
+ self.show_disc = 0
+ self.disc_map = None
+ Entry.__init__(self, *args, **kwargs)
+
+ @staticmethod
+ def nodev_fs():
+ ret = []
+ with open('/proc/filesystems', 'rb') as file:
+ data = file.read()
+ data = data.decode('utf-8', 'strict')[:-1].replace('\t', ' ').split('\n')
+ for line in data:
+ cols = line.split(' ')
+ if 'nodev' in cols[:-1]:
+ ret.append(cols[-1])
+ ret += ['fuse.gvfsd-fuse']
+ return ret
+
+ def action(self, col, button, x, y):
+ if button == LEFT_BUTTON:
+ overall = self.show_overall
+ disc_map = self.disc_map
+ if overall and disc_map is not None:
+ x = 0
+ for text, disc in disc_map:
+ if col < x:
+ break
+ w = Bar.coloured_length(text)
+ if col < x + w:
+ if disc is not None:
+ self.show_disc = disc
+ break
+ x += w + 1
+ self.show_overall = not overall
+ elif button == MIDDLE_BUTTON and not self.show_overall:
+ self.show_disc_value = max(self.show_disc_value - 1, 0)
+ elif button == RIGHT_BUTTON and not self.show_overall:
+ self.show_disc_value = min(self.show_disc_value + 1, 4)
+ elif button == SCROLL_UP:
+ if self.show_overall:
+ self.show_value = min(self.show_value + 1, 13)
+ else:
+ show_disc = self.show_disc
+ if isinstance(show_disc, str):
+ try:
+ show_disc = [d.filesystem for d in self.get_discs()].index(show_disc)
+ except:
+ show_disc = -1
+ self.show_disc = show_disc + 1
+ elif button == SCROLL_DOWN:
+ if self.show_overall:
+ self.show_value = max(self.show_value - 1, 0)
+ else:
+ show_disc = self.show_disc
+ if isinstance(show_disc, str):
+ try:
+ show_disc = [d.filesystem for d in self.get_discs()].index(show_disc)
+ except:
+ show_disc = -1
+ self.show_disc = max(show_disc - 1, 0)
+ else:
+ return
+ self.invalidate()
+
+ def colour(self, percent):
+ colour = '39'
+ if percent < 25: colour = '32'
+ if percent > 50: colour = '33'
+ if percent > 90: colour = '31'
+ if percent > 95: colour = '39;41'
+ return colour
+
+ def unit(self, value):
+ units = ['', 'K', 'M', 'G', 'T', 'P', 'E']
+ unit = 0
+ while unit + 1 < len(units) and value >= 1024:
+ unit += 1
+ value /= 1024
+ return (value, units[unit])
+
+ def si(self, value, baseunit = 0):
+ unit = baseunit + 3
+ units = ['n', 'µ', 'm', '', 'k', 'M', 'G', 'T', 'P', 'E']
+ while unit + 1 < len(units) and value >= 1000:
+ unit += 1
+ value /= 1000
+ return (value, units[unit])
+
+ def get_discs(self):
+ discs = Discs()
+ fs = [discs.filesystems[f] for f in sorted(discs.filesystems.keys()) if
+ f not in self.fs_ignore and
+ discs.filesystems[f].fstype not in self.fs_type_ignore and
+ f[0] == '/']
+ return fs
+
+ def function_overall(self):
+ df = []
+ if self.show_value == 0:
+ label = 'Df'
+ self.disc_map = [(label + ':', None)]
+ used = 0
+ available = 0
+ for disc in self.get_discs():
+ percent = disc.used * 100 / (disc.used + disc.available)
+ text = '\033[%sm%.0f\033[0m%%' % (self.colour(percent), percent)
+ df.append(text)
+ self.disc_map.append((text, disc.filesystem))
+ used += disc.used
+ available += disc.available
+ df.append(':')
+ percent = used * 100 / (used + available)
+ df.append('\033[%sm%.0f\033[0m%%' % (self.colour(percent), percent))
+
+ elif self.show_value == 1:
+ label = 'Df'
+ self.disc_map = [(label + ':', None)]
+ used = 0
+ available = 0
+ for disc in self.get_discs():
+ percent = disc.used * 100 / (disc.used + disc.available)
+ text = '\033[%sm%.1f\033[0m%s' % (self.colour(percent), *self.unit(disc.available))
+ df.append(text)
+ self.disc_map.append((text, disc.filesystem))
+ used += disc.used
+ available += disc.available
+ df.append(':')
+ percent = used * 100 / (used + available)
+ df.append('\033[%sm%.1f\033[0m%s' % (self.colour(percent), *self.unit(available)))
+
+ elif self.show_value == 2:
+ label = 'Df(inodes)'
+ self.disc_map = [(label + ':', None)]
+ used = 0
+ available = 0
+ for disc in self.get_discs():
+ percent = disc.iused * 100 / (disc.iused + disc.ifree)
+ text = '\033[%sm%.0f\033[0m%%' % (self.colour(percent), percent)
+ df.append(text)
+ self.disc_map.append((text, disc.filesystem))
+ used += disc.iused
+ available += disc.ifree
+ df.append(':')
+ percent = used * 100 / (used + available)
+ df.append('\033[%sm%.0f\033[0m%%' % (self.colour(percent), percent))
+
+ elif self.show_value == 3:
+ label = 'Dent'
+ dentry = DentryState()
+ percent = (dentry.nr_dentry - dentry.nr_unused) * 100 / dentry.nr_dentry
+ df.append('\033[%sm%.0f\033[0m%%' % (self.colour(percent), percent))
+ df.append('%is' % dentry.age_limit)
+ df.append('%ipages' % dentry.want_pages)
+
+ elif self.show_value == 4:
+ label = 'Inode'
+ inode = InodeState()
+ percent = (inode.nr_inodes - inode.nr_free_inodes) * 100 / inode.nr_inodes
+ df.append('\033[%sm%.0f\033[0m%%%s' % (self.colour(percent), percent,
+ '' if inode.preshrink == 0 else ' preshrink'))
+ elif self.show_value == 5:
+ label = 'File'
+ files = Files()
+ percent = (files.nr_files - files.nr_free_files) * 100 / files.file_max
+ df.append('\033[%sm%.0f\033[0m%%' % (self.colour(percent), percent))
+ df.append('%i/%i/%i' % (files.nr_files - files.nr_free_files, files.nr_files, files.file_max))
+
+ elif self.show_value == 6:
+ label = 'Lock'
+ locks = Locks().locks
+ ar, aw, mr, mw = 0, 0, 0, 0
+ for lock in locks:
+ if lock.mandatory:
+ if lock.shared:
+ mr += 1
+ else:
+ mw += 1
+ else:
+ if lock.shared:
+ ar += 1
+ else:
+ aw += 1
+ df.append('%iar' % ar)
+ df.append('%iaw' % aw)
+ df.append('%imr' % mr)
+ df.append('%imw' % mw)
+ df.append(':')
+ df.append('%ia' % (ar + aw))
+ df.append('%im' % (mr + mw))
+ df.append('%ir' % (ar + mr))
+ df.append('%iw' % (aw + mw))
+ df.append(':')
+ df.append('%i' % (ar + aw + mr + mw))
+
+ elif self.show_value == 7:
+ label = 'Rand'
+ random = Random()
+ df.append(str(random.poolsize))
+
+ elif self.show_value in (8, 11):
+ label = 'Disc(r %s)' % ('sum' if self.show_value == 8 else 'avg')
+ stats = DiscStats().devices.values()
+ r_complete, r_merge, r_sectors, r_time = 0, 0, 0, 0
+ for stat in stats:
+ r_complete += stat.r_complete
+ r_merge += stat.r_merge
+ r_sectors += stat.r_sectors
+ r_time += stat.r_time
+ n = 1 if self.show_value == 8 else len(stats)
+ r_complete /= n
+ r_merge /= n
+ r_sectors /= n
+ r_time /= n
+ df.append('%.1f%scompleted' % self.si(r_complete))
+ df.append('%.1f%smerge' % self.si(r_merge))
+ df.append('%.1f%ssectors' % self.si(r_sectors))
+ df.append('%.1f%ss' % self.si(r_time, -1))
+
+ elif self.show_value in (9, 12):
+ label = 'Disc(w %s)' % ('sum' if self.show_value == 9 else 'avg')
+ stats = DiscStats().devices.values()
+ w_complete, w_merge, w_sectors, w_time = 0, 0, 0, 0
+ for stat in stats:
+ w_complete += stat.w_complete
+ w_merge += stat.w_merge
+ w_sectors += stat.w_sectors
+ w_time += stat.w_time
+ n = 1 if self.show_value == 9 else len(stats)
+ w_complete /= n
+ w_merge /= n
+ w_sectors /= n
+ w_time /= n
+ df.append('%.1f%scomplete' % self.si(w_complete))
+ df.append('%.1f%smerge' % self.si(w_merge))
+ df.append('%.1f%ssectors' % self.si(w_sectors))
+ df.append('%.1f%ss' % self.si(w_time, -1))
+
+ elif self.show_value in (10, 13):
+ label = 'Disc(io %s)' % ('sum' if self.show_value == 10 else 'avg')
+ stats = DiscStats().devices.values()
+ io_current, io_time, io_weighted_time = 0, 0, 0
+ for stat in stats:
+ io_current += stat.io_current
+ io_time += stat.io_time
+ io_weighted_time += stat.io_weighted_time
+ n = 1 if self.show_value == 10 else len(stats)
+ io_current /= n
+ io_time /= n
+ io_weighted_time /= n
+ df.append('%.1f%s' % self.si(io_current))
+ df.append('%.1f%ss' % self.si(io_time, -1))
+ df.append('%.1f%ss(weighted)' % self.si(io_weighted_time, -1))
+
+ return '%s: %s' % (label, ' '.join(df))
+
+ def function_disc(self):
+ discs = self.get_discs()
+ disc = self.show_disc
+ if isinstance(disc, str):
+ for i, d in enumerate(discs):
+ if d.filesystem == disc:
+ disc = i
+ break
+ if isinstance(disc, str):
+ disc = 0
+ elif disc >= len(discs):
+ disc = len(discs) - 1
+ disc = discs[disc]
+ name = disc.filesystem
+ name = self.fs_name_map[name] if name in self.fs_name_map else name.split('/')[-1]
+ df = []
+
+ if self.show_disc_value > 1:
+ try:
+ devices = DiscStats().devices
+ device = None
+ fs = os.path.realpath(disc.filesystem)
+ stat = None
+ for dev in devices.keys():
+ if '/dev/' + dev == fs:
+ stat = devices[dev]
+ break
+ if stat is None:
+ self.show_disc_value = 0
+ except:
+ self.show_disc_value = 0
+
+ if self.show_disc_value == 0:
+ percent = disc.used * 100 / (disc.used + disc.available)
+ text = '\033[%sm%.0f\033[0m%%(%.1f%sB/%.f%sB)' % (self.colour(percent), percent,
+ *self.unit(disc.available),
+ *self.unit(disc.used + disc.available))
+ df.append(text)
+ percent = disc.iused * 100 / (disc.iused + disc.ifree)
+ text = '\033[%sm%.0f\033[0m%%(%.1f%s/%.1f%s)inodes' % (self.colour(percent), percent,
+ *self.si(disc.iused), *self.si(disc.inodes))
+ df.append(text)
+
+ elif self.show_disc_value == 1:
+ df.append(disc.fstype)
+ df.append(disc.mountpoint)
+
+ elif self.show_disc_value == 2:
+ name += '(r)'
+ df.append('%.1f%scompleted' % self.si(stat.r_complete))
+ df.append('%.1f%smerge' % self.si(stat.r_merge))
+ df.append('%.1f%ssectors' % self.si(stat.r_sectors))
+ df.append('%.1f%ss' % self.si(stat.r_time, -1))
+
+ elif self.show_disc_value == 3:
+ name += '(w)'
+ df.append('%.1f%scomplete' % self.si(stat.w_complete))
+ df.append('%.1f%smerge' % self.si(stat.w_merge))
+ df.append('%.1f%ssectors' % self.si(stat.w_sectors))
+ df.append('%.1f%ss' % self.si(stat.w_time, -1))
+
+ elif self.show_disc_value == 4:
+ name += '(io)'
+ df.append('%.1f%s' % self.si(stat.io_current))
+ df.append('%.1f%ss' % self.si(stat.io_time, -1))
+ df.append('%.1f%ss(weighted)' % self.si(stat.io_weighted_time, -1))
+
+ return '%s: %s' % (name, ' '.join(df))
+
+ def function(self):
+ try:
+ self.disc_map = None
+ if self.show_overall:
+ return self.function_overall()
+ else:
+ return self.function_disc()
+ except Exception as e:
+ return str(e)
+
+
+class MyTop(Entry):
+ def __init__(self, *args, **kwargs):
+ self.show_pid = False
+ self.show_cpu = True
+ self.show_label = True
+ self.show_nic = None
+ self.top_cmd = pdeath('HUP', 'top', '-b', '-n', '1', '-o', '%CPU', '-w', '10000')
+ self.refresh()
+ Entry.__init__(self, *args, **kwargs)
+ async(lambda : watch(5, t(self.refresh)), name = 'top')
+
+ def action(self, col, button, x, y):
+ if button == LEFT_BUTTON:
+ self.show_pid = not self.show_pid
+ elif button == MIDDLE_BUTTON:
+ self.show_label = not self.show_label
+ elif button == RIGHT_BUTTON:
+ self.show_cpu = not self.show_cpu
+ else:
+ return
+ self.refresh()
+ self.invalidate()
+
+ def refresh(self):
+ top = spawn_read(*self.top_cmd).split('\n')
+ top = [line for line in top if not line.startswith('%')][6]
+ top = [col for col in top.replace('\t', ' ').split(' ') if not col == '']
+ text = 'Top: ' if self.show_label else ''
+ if self.show_pid:
+ text += top[0] + ' '
+ if self.show_cpu:
+ text += top[6] + ' '
+ text += ' '.join(top[10:])
+ self.text = text
+
+ def function(self):
+ return self.text
+
+
+class MyMOC(Entry):
+ def __init__(self, *args, ignore = None, **kwargs):
+ self.show_state = False
+ self.display = 0
+ self.function_display = Sometimes(self.function_display_, 4)
+ Entry.__init__(self, *args, **kwargs)
+
+ def invalidate(self):
+ self.function_display.counter = 0
+ Entry.invalidate(self)
+
+ def action(self, col, button, x, y):
+ if button == RIGHT_BUTTON:
+ self.show_state = not self.show_state
+ self.invalidate()
+ elif self.show_state:
+ if button == LEFT_BUTTON:
+ self.invalidate()
+ elif button == SCROLL_UP:
+ n = self.display + 1
+ if n >= 6:
+ return
+ self.display = n
+ self.invalidate()
+ elif button == SCROLL_DOWN:
+ n = self.display - 1
+ if n < 0:
+ return
+ self.display = n
+ self.invalidate()
+ else:
+ moc = MOC()
+ if moc.state == MOC.NOT_RUNNING:
+ return
+ elif button == LEFT_BUTTON:
+ if col < 5:
+ return
+ col -= 5
+ if col % 3 == 2:
+ return
+ col //= 3
+ if col == 0: async(lambda : moc.play().wait())
+ elif col == 1: async(lambda : moc.toggle_pause().wait())
+ elif col == 2: async(lambda : moc.stop().wait())
+ elif col == 3: async(lambda : moc.previous().wait())
+ elif col == 4: async(lambda : moc.next().wait())
+ elif button == SCROLL_UP: async(lambda : moc.seek(+5).wait())
+ elif button == SCROLL_DOWN: async(lambda : moc.seek(-5).wait())
+
+ def function_display_(self):
+ moc = MOC()
+ display = self.display
+ if display == 0 or moc.state in (MOC.NOT_RUNNING, MOC.STOPPED):
+ return 'Moc: %s' % { MOC.NOT_RUNNING : 'not running'
+ , MOC.STOPPED : 'stopped'
+ , MOC.PAUSED : 'paused'
+ , MOC.PLAYING : 'playing'}[moc.state]
+ key = ['SongTitle', 'Album', 'Artist', 'Title', 'File'][display - 1]
+ label = key.lower() if display != 1 else 'song'
+ return 'Moc(%s): %s' % (label, moc[key])
+
+ def function(self):
+ if not self.show_state:
+ return 'Moc: |> || [] |< >|'
+ return self.function_display()
+
+
+class MyIPAddress(Entry):
+ def __init__(self, *args, ignore = None, **kwargs):
+ self.ignore = ['lo'] if ignore is None else ignore
+ self.text = None
+ self.show_public = True
+ self.show_nic = None
+ Entry.__init__(self, *args, **kwargs)
+ def init():
+ self.refresh()
+ async(lambda : Clock(sync_to = 10 * Clock.MINUTES).continuous_sync(t(self.refresh)), name = 'ipaddress')
+ async(init, name = 'ipaddress')
+
+ def action(self, col, button, x, y):
+ if button == LEFT_BUTTON:
+ self.show_public = not self.show_public
+ self.invalidate()
+ elif button == RIGHT_BUTTON:
+ async(self.refresh, name = 'ipaddress')
+ elif button in (SCROLL_UP, SCROLL_DOWN):
+ nic = self.show_nic
+ if nic is None:
+ return
+ nics = list(sorted(self.text[0].keys()))
+ try:
+ nic = nics.index(nic)
+ except:
+ if len(nics) == 0:
+ self.show_nic = None
+ self.invalidate()
+ else:
+ self.show_nic = nics[0]
+ self.invalidate()
+ nic += +1 if button == SCROLL_UP else -1
+ if 0 <= nic < len(nics):
+ self.show_nic = nics[nic]
+ self.invalidate()
+
+ def refresh(self):
+ text_private, text_public = {}, {}
+ ipa = IPAddress(*(self.ignore))
+ for nic in ipa.nics.keys():
+ (state, a4, a6, a) = ipa.nics[nic]
+ if state == IPAddress.DOWN:
+ label = '31'
+ elif state == IPAddress.UP:
+ label = '32'
+ elif state == IPAddress.UNKNOWN:
+ label = '33'
+ else:
+ label = '39'
+ label = '\033[%sm%s\033[0m' % (label, nic)
+ pub = label + ':'
+ prv = pub
+ if a4 is not None:
+ prv += ' ' + a4
+ if a6 is not None:
+ prv += ' ' + a6
+ elif a4 is None:
+ prv += ' no address'
+ pub += ' ' + a if a is not None else ' no address'
+ text_private[nic] = prv
+ text_public[nic] = pub
+ self.text = (text_private, text_public)
+ self.invalidate()
+
+ def function(self):
+ text = self.text
+ if text is None:
+ return '...'
+ text = text[1 if self.show_public else 0]
+ nic = self.show_nic
+ if nic is None or nic not in text:
+ if len(text.keys()) == 0:
+ self.show_nic = None
+ return 'No NIC available'
+ nic = sorted(text.keys())[0]
+ self.show_nic = nic
+ return text[nic]
+
+
+class MyLeapsec(Entry):
+ def __init__(self, *args, **kwargs):
+ self.announcement = None
+ Entry.__init__(self, *args, **kwargs)
+ def init():
+ self.refresh()
+ async(lambda : Clock(sync_to = Clock.MINUTES).continuous_sync(Sometimes(t(self.refresh), 12 * 60)), name = 'leapsec')
+ async(init, name = 'leapsec')
+
+ def action(self, col, button, x, y):
+ if button == RIGHT_BUTTON:
+ self.refresh()
+
+ def refresh(self):
+ leapanon = LeapSeconds()
+ found = -1
+ now = time.time()
+ for index in range(len(leapanon)):
+ if leapanon[index][3] >= now:
+ found = index
+ break
+ announcement = leapanon[found]
+ text = ('+%i' if announcement[4] > 0 else '%i') % announcement[4]
+ text += 's ' + ('\033[%sm%s\033[00m ago' if found < 0 else 'in \033[%sm%s\033[00m')
+ text += ' end of UTC %i-%02i-%0i' % announcement[:3]
+ if announcement[5] == LeapSeconds.SECONDARY:
+ text += ' (secondary)'
+ elif announcement[5] == LeapSeconds.OUT_OF_BAND:
+ text += ' (out of band)'
+ self.announcement = (text, announcement[3])
+ self.invalidate()
+
+ def dur(self, t):
+ s, t = t % 60, t // 60
+ m, t = t % 60, t // 60
+ h, d = t % 24, t // 24
+ if d > 0:
+ return '%id%ih%i\'%02i"' % (d, h, m, s)
+ elif h > 0:
+ return '%ih%i\'%02i"' % (h, m, s)
+ elif m > 0:
+ return '%i\'%02i"' % (m, s)
+ else:
+ return '%02is' % s
+
+ def colour(self, time_until):
+ c = '00'
+ if time_until >= 0:
+ if time_until < 10:
+ c = '41'
+ elif time_until < 60:
+ c = '31'
+ elif time_until < 60 * 60:
+ c = '33'
+ elif time_until < 24 * 60 * 60:
+ c = '32'
+ return c
+
+ def function(self):
+ if self.announcement is None:
+ return '...'
+ (text, when) = self.announcement
+ time_until = when - int(time.time())
+ return text % (self.colour(time_until), self.dur(abs(time_until)))
+
+
+class MyTimer(Entry):
+ def __init__(self, *args, **kwargs):
+ self.start = None
+ self.length = None
+ self.notified = False
+ self.pause_text = None
+ mqueue_map['timer'] = self.mqueue
+ Entry.__init__(self, *args, **kwargs)
+
+ def mqueue(self, args):
+ if args[1] == 'stop':
+ self.stop()
+ elif args[1] in ('pause', 'resume'):
+ self.pause()
+ else:
+ duration = args[1].split(':')
+ duration.reverse()
+ seconds = 0
+ if len(duration) > 0: seconds += int(duration[0])
+ if len(duration) > 1: seconds += int(duration[1]) * 60
+ if len(duration) > 2: seconds += int(duration[2]) * 60 * 60
+ if len(duration) > 3: seconds += int(duration[3]) * 60 * 60 * 24
+ self.start = time.time()
+ self.length = seconds
+ self.notified = False
+ self.pause_text = None
+ self.invalidate()
+
+ def action(self, col, button, x, y):
+ if button == LEFT_BUTTON:
+ self.pause()
+ elif button == RIGHT_BUTTON:
+ self.stop()
+
+ def pause(self):
+ if self.pause_text is not None:
+ self.start = time.time()
+ self.pause_text = None
+ elif self.length - int(time.time() - self.start) < 0:
+ self.start, self.length, self.notified, self.pause_text = None, None, False, None
+ else:
+ self.pause_text = ''
+ self.length -= int(time.time() - self.start)
+ self.pause_text = self.dur(self.length)
+ self.invalidate()
+
+ def stop(self):
+ self.start = None
+ self.length = None
+ self.notified = False
+ self.pause_text = None
+ self.invalidate()
+
+ def dur(self, t):
+ s, t = t % 60, t // 60
+ m, t = t % 60, t // 60
+ h, d = t % 24, t // 24
+ if d > 0:
+ return '%id %i:%02i:%02i' % (d, h, m, s)
+ elif h > 0:
+ return '%i:%02i:%02i' % (h, m, s)
+ else:
+ return '%i:%02i' % (m, s)
+
+ def notify(self):
+ subprocess.Popen(['notify-send', '-u', 'critical', 'You are done with your task']).wait()
+
+ def wall(self):
+ subprocess.Popen(['wall', 'You are done with your task']).wait()
+
+ def function(self):
+ if self.pause_text is not None:
+ return '%s (paused)' % self.pause_text
+ if self.start is None:
+ return 'Timer inactive'
+ countdown = self.length - int(time.time() - self.start)
+ if countdown > 0:
+ return self.dur(countdown)
+ if not self.notified:
+ self.notified = True
+ async(self.notify, name = 'timer notify')
+ async(self.wall, name = 'timer wall')
+ countdown %= 2
+ if countdown == 0:
+ return '\033[37;41mYou are done\033[00m'
+ else:
+ return '\033[31mYou are done\033[00m'
+
+
+
+def mqueue_wait():
+ import posix_ipc
+ qkey = '/.xpybar.' + os.environ['DISPLAY'].split('.')[0]
+ q = posix_ipc.MessageQueue(qkey, posix_ipc.O_CREAT, 0o600, 8, 128)
+ while True:
+ try:
+ message = q.receive(None)[0].decode('utf-8', 'replace').split(' ')
+ if message[0] in mqueue_map:
+ try:
+ mqueue_map[message[0]](message)
+ except Exception as err:
+ print('%s: %s' % (sys.argv[0], str(err)), file = sys.stderr, flush = True)
+ else:
+ print('%s: unrecognised message: %s' % (sys.argv[0], ' '.join(message)), file = sys.stderr, flush = True)
+ except:
+ time.sleep(1)
+
+
+
+mqueue_map = {}
+
+myxmonad = MyXMonad(None)
+myscroll = MyScroll(None)
+
+functions = [ [ myscroll
+ , myxmonad
+ , None
+ , MyTimer (None)
+ , MyNetwork (lambda f : Clocked(f, 2))
+ , MyMemory (lambda f : Clocked(f, 2))
+ , MyCPU (lambda f : Clocked(f, 2))
+ , MyBattery (lambda f : Clocked(f, 10))
+ , MyALSA (None)
+ , MyClock (lambda f : Clocked(f, 1))
+ ]
+ , [ myscroll
+ , myxmonad
+ , None
+ , MyTop (None)
+ , MyIO (lambda f : Clocked(f, 10))
+ , MyStat (lambda f : Clocked(f, 10))
+ , MyNews (None)
+ , MyWeather (None)
+ , MyComputer (lambda f : Clocked(f, 20))
+ ]
+ , [ myscroll
+ , myxmonad
+ , None
+ , MyIPAddress(lambda f : Clocked(f, 20))
+ , MyLeapsec (None)
+ , MyMOC (None)
+ ]
+ ]
+
+HEIGHT = 0
+groups = [Group(f) for f in functions]
+groupi = 0
+group = groups[groupi]
+semaphore = threading.Semaphore()
+
+def update_per_clock():
+ if semaphore.acquire(blocking = False):
+ try:
+ for g in groups:
+ for f in g.functions:
+ if isinstance(f.wrapped, Clocked):
+ f(True)
+ finally:
+ semaphore.release()
+ invalidate()
+
+start_ = start
+def start():
+ start_()
+ bar.clear()
+ get_display().flush()
+ async(lambda : clock.continuous_sync(t(update_per_clock)), name = 'clock')
+ async(mqueue_wait, name = 'mqueue')
+
+def redraw():
+ if semaphore.acquire(blocking = False):
+ try:
+ gr = groups[groupi]
+ for g in groups:
+ if g is not gr:
+ for f in g.functions:
+ f()
+ values = gr.pattern % tuple(f() for f in gr.functions)
+ bar.partial_clear(0, bar.width, 10, 0, 2, values)
+ bar.draw_coloured_splitted_text(0, bar.width, 10, 0, 2, values)
+ finally:
+ semaphore.release()
+ return True
+ return False
+
+def unhandled_event(e):
+ global groups, groupi, group
+ if isinstance(e, Xlib.protocol.event.ButtonPress):
+ y = e.event_y
+ x = e.event_x
+ row = y // HEIGHT_PER_LINE
+ lcol = x // bar.font_width
+ rcol = (bar.width - x) // bar.font_width
+ button = e.detail
+ if button in (FORWARD_BUTTON, BACKWARD_BUTTON):
+ groupi = (groupi + (+1 if button == BACKWARD_BUTTON else -1)) % len(groups)
+ group = groups[groupi]
+ invalidate()
+ else:
+ for f in group.posupdate:
+ f.update_position()
+ for f in group.functions:
+ if f.click(row, lcol, rcol, button, x, y):
+ break
+