# -*- python -*-
'''
xpybar – xmobar replacement written in python
Copyright © 2014, 2015, 2016 Mattias Andrée (maandree@member.fsf.org)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see .
'''
# This is a template configuration of a panel that can display almost
# all information and control, but suited for a height constrained screen
# It is recommended to have python-posix_ipc and
# installed
import os
import sys
import pwd
import time
import threading
import subprocess
from util import *
from plugins.alsa import ALSA
from plugins.clock import Clock
from plugins.cpu import CPU
from plugins.cpuinfo import CPUInfo
from plugins.cpuonline import CPUOnline
from plugins.dentrystate import DentryState
from plugins.df import Discs
from plugins.discstats import DiscStats
from plugins.files import Files
from plugins.inodestate import InodeState
from plugins.ipaddress import IPAddress
from plugins.leapsec import LeapSeconds
from plugins.loadavg import AverageLoad
from plugins.locks import Locks
from plugins.mem import Memory
from plugins.moc import MOC
from plugins.network import Network
from plugins.pacman import Pacman
from plugins.ping import Ping
from plugins.random import Random
from plugins.snmp import SNMP
from plugins.snmp6 import SNMP6
from plugins.softirqs import SoftIRQs
from plugins.swaps import Swaps
from plugins.tzclock import TZClock
from plugins.uname import Uname
from plugins.uptime import Uptime
from plugins.users import Users
from plugins.vmstat import VMStat
from plugins.weather import Weather
from plugins.xdisplay import XDisplay
from plugins.xkb import XKB
#from plugins.application import Application
#from plugins.hdparm import HDParm
#from plugins.image import Image
#from plugins.irc import II
#from plugins.kmsg import KMsg
#from plugins.lunar import Lunar
#from plugins.menu import Menu
#from plugins.ropty import ROPTY
#from plugins.solar import Solar
import Xlib.X, Xlib.XK, Xlib.protocol.event
OUTPUT, HEIGHT_PER_LINE, YPOS, TOP = 0, 12, 0, True
clock = Clock(sync_to = 0.5)
TERMINAL = 'terminator'
LEFT_BUTTON = 1
MIDDLE_BUTTON = 2
RIGHT_BUTTON = 3
SCROLL_UP = 4
SCROLL_DOWN = 5
SCROLL_LEFT = 6
SCROLL_RIGHT = 7
FORWARD_BUTTON = 8 # X1
BACKWARD_BUTTON = 9 # X2
limited = lambda v : min(max(int(v + 0.5), 0), 100)
def t_(f):
try:
f()
except:
time.sleep(1)
t = lambda f : (lambda : t_(f))
def pdeath(signal, *command):
try:
path = os.environ['PATH'].split(':')
for p in path:
p += '/pdeath'
if os.access(p, os.X_OK, effective_ids = True):
return (p, signal, *command)
except:
pass
return command
HOME = pwd.getpwuid(os.getuid()).pw_dir
SEPARATOR = ' │ '
def invalidate():
try:
bar.invalidate()
except:
pass
class Entry:
def __init__(self, wrap = None, left = True, line = 0, prev = None):
self.left = left
self.line = line
self.prev = prev
self.width = 0
self.offset = 0
self.last_text = None
self.short_layout = None
self.wrapped = self.function if wrap is None else wrap(self.function)
def invalidate(self):
wrapper = self.wrapped
if isinstance(wrapper, Clocked):
wrapper = wrapper.sometimes
if isinstance(wrapper, Sometimes):
wrapper.counter = 0
invalidate()
def action(self, col, button, x, y):
pass
def click(self, row, lcol, rcol, button, x, y):
if row == self.line:
col = (lcol if self.left else rcol) - self.offset
if 0 <= col < self.width:
if self.left:
x -= self.offset * bar.font_width
else:
col = self.width - 1 - col
x = bar.width - x
x -= self.offset * bar.font_width
x = self.width * bar.font_width - x
y -= self.line * HEIGHT_PER_LINE
try:
self.action(col, button, x, y)
except Exception as err:
print(str(err), file = sys.stderr, flush = True)
return True
return False
def function(self):
return SEPARATOR
def __call__(self, *args, **kwargs):
try:
text = self.wrapped(*args, **kwargs)
if text is not self.last_text:
self.width = Bar.coloured_length(text)
self.last_text = text
return text
except Exception as err:
print(err)
def update_position(self):
if self.prev is not None:
self.prev.update_position()
self.offset = self.prev.offset + self.prev.width
class Group:
def __init__(self, functions):
global HEIGHT
self.pattern = ''
self.posupdate = []
left = None
right = None
lineno = 0
atleft = True
newline = False
lastright = None
new_functions = []
separator = None
for entry in functions:
if entry is None:
newline = not atleft
lineno += 1 if newline else 0
atleft = not atleft
continue
entry.left = atleft
entry.line = lineno
if newline:
self.pattern += '\n'
newline = False
if left is not None:
self.posupdate.append(left)
if right is not None:
self.posupdate.append(right)
left = None
right = None
lastright = None
if entry.left:
if left is not None:
separator = Entry(left = True)
new_functions.append(separator)
self.pattern += '%s'
separator.prev = left
left = separator
entry.prev = left
left = entry
elif right is None:
if left is not None:
self.pattern += '\0 '
right = entry
lastright = entry
else:
separator = Entry(left = False)
new_functions.append(separator)
self.pattern += '%s'
lastright.prev = separator
separator.prev = entry
lastright = entry
new_functions.append(entry)
self.pattern += '%s'
if left is not None:
self.posupdate.append(left)
if right is not None:
self.posupdate.append(right)
height = (lineno + 1) * HEIGHT_PER_LINE
if HEIGHT < height:
HEIGHT = height
self.functions = new_functions
class MyXMonad(Entry):
def __init__(self, *args, mod = Xlib.XK.XK_Super_L, ppWsSep = '', ppSep = ' ', **kwargs):
self.text = ''
self.text_short = ''
self.ws_sep_len = len(ppWsSep)
self.sep_len = len(ppSep)
self.mod = mod
self.show_full = False
self.short_layout = None
Entry.__init__(self, *args, **kwargs)
async(lambda : forever(t(self.refresh)), name = 'xmonad')
def action(self, col, button, x, y):
if button == MIDDLE_BUTTON:
self.show_full = not self.show_full
self.invalidate()
return
text = self.text
full = self.show_full
UTF8_STRING = display.intern_atom('UTF8_STRING')
_XMONAD_LOG = display.intern_atom('_XMONAD_LOG')
_NET_DESKTOP_NAMES = display.intern_atom('_NET_DESKTOP_NAMES')
xmonad_log = display.screen().root.get_full_property(_XMONAD_LOG, UTF8_STRING).value
xmonad_log = xmonad_log.split(' : ')
net_desktop_names = display.screen().root.get_full_property(_NET_DESKTOP_NAMES, UTF8_STRING).value
net_desktop_names = net_desktop_names[:-1].split('\0')
ws = 0
ws_hit = False
while ws < len(net_desktop_names):
if ws > 0:
col -= self.ws_sep_len
if col < 0:
break
width = len(net_desktop_names[ws] if full else str(ws + 1)) + 2
if col < width:
ws_hit = True
break
ws += 1
col -= width
if ws < len(net_desktop_names):
if button == LEFT_BUTTON:
self.switch_workspace(ws + 1)
elif button in (SCROLL_UP, SCROLL_DOWN):
xmonad_log = xmonad_log[0].split(' ')
active = [net_desktop_names.index(w[1:-1]) for w in xmonad_log if w[:1] == '<']
ws = [net_desktop_names.index(w[1:-1]) for w in xmonad_log if w[:1] == '['][0]
ws += -1 if button == SCROLL_UP else +1
ws %= len(net_desktop_names)
while ws in active:
ws += -1 if button == SCROLL_UP else +1
ws %= len(net_desktop_names)
self.switch_workspace(ws + 1)
elif button == LEFT_BUTTON:
col -= self.sep_len
layout = xmonad_log[1]
if not self.show_full and self.short_layout is not None:
layout = self.short_layout
if col < len(layout) + 2:
display.xtest_fake_input(Xlib.X.KeyPress, display.keysym_to_keycode(self.mod))
display.xtest_fake_input(Xlib.X.KeyPress, display.keysym_to_keycode(Xlib.XK.XK_space))
display.xtest_fake_input(Xlib.X.KeyRelease, display.keysym_to_keycode(Xlib.XK.XK_space))
display.xtest_fake_input(Xlib.X.KeyRelease, display.keysym_to_keycode(self.mod))
display.flush()
def refresh(self):
try:
text = input()
except:
sys.exit(0)
buf, esc = '', None
for c in text:
if esc is not None:
esc += c
if esc == '^':
buf += '^'
esc = None
elif esc[-1] == ')':
if esc.startswith('bg(') or esc.startswith('fg('):
c = 4 if esc.startswith('bg(') else 3
esc = esc[3 : -1]
if esc == '':
buf += '\033[%i9m' % c
else:
r, g, b = esc[1 : 3], esc[3 : 5], esc[5 : 7]
r, g, b = int(r, 16), int(g, 16), int(b, 16)
r, g, b = str(r), str(g), str(b)
buf += '\033[%i8;2;%sm' % (c, ';'.join([r, g, b]))
esc = None
elif c == '^':
esc = ''
else:
buf += c
self.text = buf
short = ''
space, esc, layout, lbuf = False, False, 0, ''
ws = 1
for c in buf:
if esc:
short += c
esc = not (c == '~' or 'a' <= c.lower() <= 'z')
elif c == '\033':
esc = True
if layout == 2:
layout = 3
end = ''
while lbuf.startswith(' '):
short += ' '
lbuf = lbuf[1:]
while lbuf.endswith(' '):
end += ' '
lbuf = lbuf[:-1]
self.short_layout = self.shorten_layout(lbuf)
short += self.short_layout + end
short += c
elif ws <= 9:
if c in ' <>[]':
space = not space
if not space:
short += '%i' % ws
ws += 1
if ws == 10:
layout = 1
short += c
elif not space:
short += c
elif layout == 1:
if c == ' ':
short += c
else:
layout = 2
lbuf += c
elif layout == 2:
lbuf += c
else:
short += c
self.text_short = short
self.invalidate()
def shorten_layout(self, text):
text = text.split(' ')
incl = []
for word in text:
if word not in ('ImageButtonDeco', 'Maximize', 'Minimize'):
incl.append(word)
return ' '.join(incl).replace('Mirror ', '\\')
def switch_workspace(self, ws):
display.xtest_fake_input(Xlib.X.KeyPress, display.keysym_to_keycode(self.mod))
display.xtest_fake_input(Xlib.X.KeyPress, display.keysym_to_keycode(Xlib.XK.XK_0 + ws))
display.xtest_fake_input(Xlib.X.KeyRelease, display.keysym_to_keycode(Xlib.XK.XK_0 + ws))
display.xtest_fake_input(Xlib.X.KeyRelease, display.keysym_to_keycode(self.mod))
display.flush()
def function(self):
return self.text if self.show_full else self.text_short
class MyScroll(Entry):
def function(self):
return '↓↑'
def action(self, col, button, x, y):
global groups, groupi, group
if button in (LEFT_BUTTON, RIGHT_BUTTON, SCROLL_UP, SCROLL_DOWN):
groupi = (groupi + (+1 if button in (LEFT_BUTTON, SCROLL_DOWN) else -1)) % len(groups)
group = groups[groupi]
self.invalidate()
class MyClock(Entry):
def __init__(self, *args, format = '%Y-%m-%d %T', long_format = '%Y-(%m)%b-%d %T, %a w%V, %Z', tzs = None, **kwargs):
self.default_format = format
self.long_format = long_format
self.clock = clock
self.set_format(self.clock, self.default_format)
self.tzs = [] if tzs is None else tzs
self.tzi = 0
mqueue_map['timefmt'] = self.mqueue
mqueue_map['tz'] = self.mqueue
Entry.__init__(self, *args, **kwargs)
def mqueue(self, args):
if args[0] == 'timefmt':
args = args[1:]
if len(args) == 0:
self.action(0, RIGHT_BUTTON, 0, 0)
else:
self.set_format(self.clock, ' '.join(args))
self.invalidate()
if args[0] == 'tz':
args = args[1:]
if len(args) == 0:
self.action(0, LEFT_BUTTON, 0, 0)
else:
old_clock = self.clock
tz = '/'.join(args)
try:
tzclock = TZClock(timezone = tz, format = self.clock.u_format)
except:
try:
tzclock = TZClock(timezone = tz.upper(), format = self.clock.u_format)
except:
print('%s: unknown typezone: %s' % (sys.argv[0], tz), file = sys.stderr, flush = True)
return
tzclock.u_format = tzclock.format
self.clock = tzclock
if old_clock is not clock:
del old_clock
self.invalidate()
def action(self, col, button, x, y):
if button == LEFT_BUTTON:
if self.clock is clock:
clock.utc = not clock.utc
self.set_format(clock, self.clock.u_format)
else:
self.set_format(clock, self.clock.u_format)
self.clock = clock
self.invalidate()
elif button == RIGHT_BUTTON:
if self.clock.u_format == self.default_format:
self.set_format(self.clock, self.long_format)
else:
self.set_format(self.clock, self.default_format)
self.invalidate()
elif button in (SCROLL_UP, SCROLL_DOWN):
i = self.tzi + (+1 if button == SCROLL_UP else -1)
if not (0 <= i <= len(self.tzs)):
return
self.tzi = i
old_clock = self.clock
if i > 0:
tzclock = TZClock(timezone = self.tzs[i - 1], format = self.clock.u_format)
tzclock.u_format = tzclock.format
self.clock = tzclock
else:
self.clock = clock
if old_clock is not clock:
del old_clock
self.invalidate()
def set_format(self, clck, format):
clck.u_format = format
if not (clck is clock and clck.utc):
clck.format = format
else:
fmt, esc = '', False
for c in format:
if esc:
fmt += c
if c == '%' or 'a' <= c.lower() <= 'z':
esc = False
if c == 'Z':
while fmt[-1] != '%':
fmt = fmt[:-1]
fmt = fmt[:-1] + 'UTC'
elif c == '%':
esc = True
fmt += c
else:
fmt += c
clck.format = fmt
def function(self):
clck = self.clock
colour = '0'
if clck is not clock or clck.utc:
colour += ';7'
return '\033[%sm%s\033[00m' % (colour, clck.read())
class MyComputer(Entry):
def __init__(self, *args, locks = None, **kwargs):
self.have_linux_libre, self.have_pacman = True, None
self.linux_installed, self.linux_latest = None, None
self.last_uname_read = ''
self.last_users_read = ''
self.xdisplay = None
self.you = pwd.getpwuid(os.getuid()).pw_name
self.display = 0
self.show_detailed = False
self.keys = []
if locks is None:
locks = ['num', 'cap']
self.num_lock = 'num' in locks or 'number' in locks or 'numerical' in locks or 'numpad' in locks
self.cap_lock = 'cap' in locks or 'caps' in locks
self.scr_lock = 'scr' in locks or 'scroll' in locks or 'scrl' in locks
if self.num_lock:
self.keys.append(Xlib.XK.XK_Num_Lock)
if self.cap_lock:
self.keys.append(Xlib.XK.XK_Caps_Lock)
if self.scr_lock:
self.keys.append(Xlib.XK.XK_Scroll_Lock)
Entry.__init__(self, *args, **kwargs)
def action(self, col, button, x, y):
if button == LEFT_BUTTON:
if self.display == 3:
col -= 5
if col % 4 == 3:
return
col //= 4
key = self.keys[col]
display.xtest_fake_input(Xlib.X.KeyPress, display.keysym_to_keycode(key))
display.xtest_fake_input(Xlib.X.KeyRelease, display.keysym_to_keycode(key))
display.flush()
elif button == MIDDLE_BUTTON:
self.show_detailed = not self.show_detailed
self.invalidate()
elif button == RIGHT_BUTTON:
if self.display == 0:
async(lambda : subprocess.Popen(['mate-system-monitor']).wait())
else:
self.invalidate()
elif button == SCROLL_UP:
self.display = min(self.display + 1, 2) ## TODO the keyboard support is not too good
self.invalidate()
elif button == SCROLL_DOWN:
self.display = max(self.display - 1, 0)
self.invalidate()
def function_uname(self):
uname = Uname()
nodename = uname.nodename
kernel_release = uname.kernel_release
operating_system = uname.operating_system
lts = '-lts' if kernel_release.lower().endswith('-lts') else ''
if self.have_pacman is None:
self.have_pacman = True
try:
self.linux_installed = Pacman('linux-libre' + lts, True)
except:
self.have_linux_libre = False
try:
self.linux_installed = Pacman('linux' + lts, True)
except:
self.have_pacman = False
if self.have_pacman:
try:
self.linux_latest = Pacman(('linux-libre' if self.have_linux_libre else 'linux') + lts, False)
except:
self.have_pacman = None
elif self.have_pacman:
try:
self.linux_installed = Pacman(('linux-libre' if self.have_linux_libre else 'linux') + lts, True)
self.linux_latest = Pacman(('linux-libre' if self.have_linux_libre else 'linux') + lts, False)
except:
self.have_pacman = None
if (self.have_pacman is not None) and self.have_pacman:
linux_running = kernel_release.split('-')
linux_running, kernel_release = linux_running[:2], linux_running[2:]
linux_running = '-'.join(linux_running)
kernel_release = '-' + '-'.join(kernel_release)
self.linux_installed = self.linux_installed.version
self.linux_latest = self.linux_latest.version
if self.linux_installed == self.linux_latest:
if linux_running == self.linux_installed:
linux_running = '\033[32m%s\033[39m' % linux_running
else:
if linux_running == self.linux_installed:
linux_running = '\033[33m%s\033[39m' % linux_running
else:
linux_running = '\033[31m%s\033[39m' % linux_running
kernel_release = linux_running + kernel_release
rc = '%s %s %s'
rc %= (nodename, kernel_release, operating_system)
self.last_uname_read = rc
return rc
def function_users(self): ## TODO use inotify
users = Users('devfs').users
def colour_user(user):
if user == 'root': return '\033[31m%s\033[39m'
elif not user == self.you: return '\033[33m%s\033[39m'
else: return '%s'
users = ['%s{%i}' % (colour_user(u) % u, len(users[u])) for u in users.keys()]
users = 'Usr: %s' % (' '.join(users))
self.last_users_read = users
return users
def function_xdisplay(self):
if self.xdisplay is None:
x = XDisplay()
if x.connection is None:
self.xdisplay = 'No DISPLAY environment variable set'
self.xdisplay_detailed = self.xdisplay
else:
self.xdisplay = 'X: %s' % x.connection
self.xdisplay_detailed = 'Host: %s' % ('(localhost)' if x.host is None else x.host)
self.xdisplay_detailed += SEPARATOR
self.xdisplay_detailed += 'Display: %s' % (x.display)
if x.screen is not None:
self.xdisplay_detailed += SEPARATOR
self.xdisplay_detailed += 'Screen: %s' % (x.screen)
if x.vt is not None:
self.xdisplay += '%sVt: %i' % (SEPARATOR, x.vt)
self.xdisplay_detailed += '%sVt: %i' % (SEPARATOR, x.vt)
return self.xdisplay_detailed if self.show_detailed else self.xdisplay
def function_xkb(self): ## add update listener
xkb = XKB().get_locks()
kbd = 'Kbd:'
if self.num_lock:
kbd += ' \033[3%imnum\033[0m' % (1 if (xkb & XKB.NUM) == 0 else 2)
if self.cap_lock:
kbd += ' \033[3%imcap\033[0m' % (1 if (xkb & XKB.CAPS) == 0 else 2)
if self.scr_lock:
kbd += ' \033[3%imscr\033[0m' % (1 if (xkb & XKB.SCROLL) == 0 else 2)
return kbd
def function(self):
if self.display == 0:
return self.function_uname()
elif self.display == 1:
return self.function_users()
elif self.display == 2:
return self.function_xdisplay()
elif self.display == 3:
return self.function_xkb()
class MyWeather(Entry):
def __init__(self, *args, stations = None, fahrenheit = False, wind = 'm/s', from_wind = False, visibility = 'km', **kwargs):
self.semaphore = threading.Semaphore()
self.stations = [MyWeather.get_metar_station()] if stations is None else [s.upper() for s in stations]
self.reports = [None] * len(self.stations)
self.up_to_date = [False] * len(self.stations)
self.in_deg_f = fahrenheit
self.wind_unit = wind
self.from_wind = from_wind
self.visibility_unit = visibility
self.show_temp = True
self.show_wind_speed = False
self.show_wind_gusts = False
self.show_wind_dir = False
self.show_wind_chill = False
self.show_humidity = False
self.show_dew = False
self.show_pressure = False
self.show_visibility = False
self.show_sky = False
self.show_station = False
self.station = 0
self.updated = False
self.segments = []
self.refresh = Sometimes(lambda : async(self.refresh_, name = 'weather'), 20 * 60 * 4)
self.refresh()
Entry.__init__(self, *args, **kwargs)
def action(self, col, button, x, y):
if button == MIDDLE_BUTTON:
self.show_temp = True
self.show_wind_speed = True
self.show_wind_gusts = True
self.show_wind_dir = True
self.show_wind_chill = True
self.show_humidity = True
self.show_dew = True
self.show_pressure = True
self.show_visibility = True
self.show_sky = True
self.show_station = True
self.invalidate()
elif button == RIGHT_BUTTON:
self.refresh.counter = 0
self.refresh()
elif button in (LEFT_BUTTON, SCROLL_UP, SCROLL_DOWN):
index = None
for i, seg in enumerate(self.segments):
if seg is None:
continue
(x, width) = seg
if 0 <= col - x < width:
index = i
break
if index is None:
return
if index == 0: # station
if button == LEFT_BUTTON:
self.show_station = False
elif button == SCROLL_UP:
station = self.station
if station + 1 < len(self.stations):
self.station = station + 1
else:
station = self.station
if station > 0:
self.station = station - 1
elif index == 1: # temperature
if button == LEFT_BUTTON:
self.show_temp = False
else:
self.in_deg_f = button == SCROLL_UP
elif index == 2: # wind speed/gusts
if button == LEFT_BUTTON:
if self.plusminus is None or col < self.plusminus:
self.show_wind_speed = False
else:
self.show_wind_gusts = False
else:
if self.wind_unit == 'm/s':
self.wind_unit = 'mph' if button == SCROLL_UP else 'm/s'
elif self.wind_unit == 'mph':
self.wind_unit = 'kn' if button == SCROLL_UP else 'm/s'
elif self.wind_unit in ('kn', 'kt', 'knot', 'knots'):
self.wind_unit = 'km/h' if button == SCROLL_UP else 'mph'
elif self.wind_unit in ('km/h', 'kmph'):
self.wind_unit = 'ft/s' if button == SCROLL_UP else 'kn'
elif self.wind_unit in ('ft/s', 'ftps'):
self.wind_unit = 'b' if button == SCROLL_UP else 'km/h'
else:
self.wind_unit = 'b' if button == SCROLL_UP else 'ft/s'
elif index == 3: # wind direction
if button == LEFT_BUTTON:
self.show_wind_dir = False
else:
self.from_wind = button == SCROLL_UP
elif index == 4: # wind chill
if button == LEFT_BUTTON:
self.show_wind_chill = False
else:
self.in_deg_f = button == SCROLL_UP
elif index == 5: # relative humidity
if button == LEFT_BUTTON:
self.show_humidity = False
else:
return
elif index == 6: # dew point
if button == LEFT_BUTTON:
self.show_dew = False
else:
self.in_deg_f = button == SCROLL_UP
elif index == 7: # pressure
if button == LEFT_BUTTON:
self.show_pressure = False
else:
return
elif index == 8: # visibility
if button == LEFT_BUTTON:
self.show_visibility = False
elif button == SCROLL_UP:
unit = self.visibility_unit
if unit in {'km' : 'm', 'm' : 'mi'}:
self.visibility_unit = {'km' : 'm', 'm' : 'mi'}[unit]
else:
unit = self.visibility_unit
if unit in {'mi' : 'm', 'mile' : 'm', 'm' : 'km'}:
self.visibility_unit = {'mi' : 'm', 'mile' : 'm', 'm' : 'km'}[unit]
elif index == 9: # sky conditions
if button == LEFT_BUTTON:
self.show_sky = False
else:
return
self.invalidate()
def refresh_(self):
if self.semaphore.acquire(blocking = False):
for index, station in enumerate(self.stations):
try:
self.reports[index] = Weather(station)
self.up_to_date[index] = True
except:
self.up_to_date[index] = False
self.semaphore.release()
self.invalidate()
def invalidate(self):
self.updated = False
Entry.invalidate(self)
def get_weather(self, index):
station = self.stations[index]
report = self.reports[index]
up_to_date = self.up_to_date[index]
station = station[0] + station[1:].lower()
if report is None:
if self.show_station:
return '%s: ?' % station
else:
return '?'
text = []
if self.show_temp and report.temp is not None:
value = report.temp
colour = '34'
if value < -10: colour = '39;44'
if value >= 18: colour = '39'
if value >= 25: colour = '33'
if value >= 30: colour = '31'
if self.in_deg_f:
value = value * 9 / 5 + 32
value = '\033[%sm%.0f\033[0m%s' % (colour, value, '°F' if self.in_deg_f else '°C')
text.append(value)
else:
text.append(None)
if self.show_wind_speed and report.wind_speed is not None:
knot = report.wind_speed
kmph = knot * 1.852 # km/h
mps = kmph / 3.6 # m/s
mph = knot * 1.151 # mph
ftps = mps * 3.281 # ft/s
beaufort = (mps / 0.8365) ** (2 / 3) # B
if self.wind_unit in ('kn', 'kt', 'knot', 'knots'):
value, unit = knot, 'kn'
elif self.wind_unit in ('km/h', 'kmph'):
value, unit = kmph, 'km/h'
elif self.wind_unit == 'mph':
value, unit = mph, 'mph'
elif self.wind_unit in ('ft/s', 'ftps'):
value, unit = ftps, 'ft/s'
elif self.wind_unit.lower() in ('beaufort', 'beaufort scale', 'bf', 'bt', 'b'):
value, unit = beaufort, 'B'
else:
value, unit = mps, 'm/s'
beaufort = int(beaufort + 0.5)
colour = '32'
if beaufort >= 1: colour = '39'
if beaufort >= 3: colour = '32'
if beaufort >= 7: colour = '33'
if beaufort >= 10: colour = '31'
if beaufort >= 12: colour = '39;41'
wind = '\033[%sm%.0f\033[0m' % (colour, value)
if self.show_wind_gusts and report.wind_gusts is not None:
knot = report.wind_gusts
kmph = knot * 1.852 # km/h
mps = kmph / 3.6 # m/s
mph = knot * 1.151 # mph
ftps = mps * 3.281 # ft/s
beaufort = (mps / 0.8365) ** (2 / 3) # B
if self.wind_unit in ('kn', 'kt', 'knot', 'knots'):
value = knot
elif self.wind_unit in ('km/h', 'kmph'):
value = kmph
elif self.wind_unit == 'mph':
value = mph
elif self.wind_unit in ('ft/s', 'ftps'):
value = ftps
elif self.wind_unit.lower() in ('beaufort', 'beaufort scale', 'bf', 'bt', 'b'):
value = beaufort
else:
value = mps
beaufort = int(beaufort + 0.5)
colour = '32'
if beaufort >= 1: colour = '39'
if beaufort >= 3: colour = '32'
if beaufort >= 7: colour = '33'
if beaufort >= 10: colour = '31'
if beaufort >= 12: colour = '39;41'
wind += '±\033[%sm%.0f\033[0m' % (colour, value)
wind += unit
text.append(wind)
else:
text.append(None)
def wind_dir(direction):
if not self.from_wind:
direction += 180
direction %= 360
if direction < 0:
direction += 360
if (direction < 90):
if direction == 0:
return 'N'
if direction <= 45:
return 'N%.0f°E' % direction
return 'E%.0f°N' % (90 - direction)
elif (90 <= direction < 180):
if direction == 0:
return 'E'
direction -= 90
if direction <= 45:
return 'E%.0f°S' % direction
return 'S%.0f°E' % (90 - direction)
elif (180 <= direction < 270):
if direction == 0:
return 'S'
direction -= 180
if direction <= 45:
return 'S%.0f°W' % direction
return 'W%.0f°S' % (90 - direction)
else:
if direction == 0:
return 'W'
direction -= 270
if direction <= 45:
return 'W%.0f°N' % direction
return 'N%.0f°W' % (90 - direction)
if not self.show_wind_dir:
wind = None
elif report.wind_var is not None:
wind = wind_dir(report.wind_var[0]) + '-' + wind_dir(report.wind_var[1])
elif report.wind_dir is not None:
wind = wind_dir(report.wind_dir)
else:
wind = None
if wind is not None:
if self.from_wind:
wind += '→'
else:
wind = '→' + wind
text.append(wind)
else:
text.append(None)
if self.show_wind_chill and report.wind_chill is not None:
value = report.wind_chill
# Coloured by frostbite time, however the wind is not taken
# into account so the colour is approximate
colour = '39'
if value <= -28: colour = '33'
if value <= -37: colour = '31'
if value <= -46: colour = '39;41'
if self.in_deg_f:
value = value * 9 / 5 + 32
value = 'w\033[%sm%.0f\033[0m%s' % (colour, value, '°F' if self.in_deg_f else '°C')
text.append(value)
else:
text.append(None)
if self.show_humidity and report.humidity is not None:
value = report.humidity
colour = '34'
if value >= 26: colour = '32'
if value >= 31: colour = '39'
if value >= 37: colour = '33'
if value >= 52: colour = '31'
if value >= 73: colour = '39;41'
value = '\033[%sm%.0f\033[0m%%' % (colour, value)
text.append(value)
else:
text.append(None)
if self.show_dew and report.dew is not None:
value = report.dew
colour = '34'
if value >= 10: colour = '32'
if value >= 13: colour = '39'
if value >= 16: colour = '33'
if value >= 21: colour = '31'
if value >= 26: colour = '39;41'
if self.in_deg_f:
value = value * 9 / 5 + 32
value = 'd\033[%sm%.0f\033[0m%s' % (colour, value, '°F' if self.in_deg_f else '°C')
text.append(value)
else:
text.append(None)
if self.show_pressure and report.pressure is not None:
text.append('%.0fhPa' % report.pressure)
else:
text.append(None)
if self.show_visibility and report.visibility is not None:
mile = report.visibility
km = mile * 1.609344
m = mile * 1609.344
if self.visibility_unit in ('mile', 'mi'):
value, unit = mile, 'mi'
elif self.visibility_unit == 'm':
value, unit = m, 'm'
else:
value, unit = km, 'km'
text.append('%.0f%s' % (value, unit))
else:
text.append(None)
if self.show_sky and 'Sky conditions' in report.fields:
value = report.fields['Sky conditions']
if not value == '':
text.append(value)
else:
text.append(None)
else:
text.append(None)
segments = []
x = 0
if self.show_station:
w = len(station)
segments.append((x, w))
x += w + 2
else:
segments.append(None)
x -= 1
self.plusminus = None
for i, segment in enumerate(text):
if segment is None:
segments.append(None)
else:
if i == 1 and '±' in segment:
self.plusminus = x + Bar.coloured_length(segment.split('±')[0])
w = Bar.coloured_length(segment)
x += 1
segments.append((x, w))
x += w
self.segments = segments
text = [s for s in text if s is not None]
if len(text) == 0:
return station
if not up_to_date:
if len(text) <= 1:
text = ' '.join(text) + '?'
else:
text = ' '.join(text) + ' ?'
else:
text = ' '.join(text)
if self.show_station:
text = '%s: %s' % (station, text)
return text
def function(self):
if not self.updated:
self.updated = True
self.text = self.get_weather(self.station)
rc = self.text
self.refresh()
return rc
@staticmethod
def get_metar_station():
try:
with open(HOME + '/.config/metar', 'rb') as file:
station = file.read()
except:
try:
with open('/etc/metar', 'rb') as file:
station = file.read()
except:
print('~/.config/metar is not set', file = sys.stderr, flush = True)
station = b'ESSA'
return station.decode('utf-8', 'strict').split('\n')[0].upper()
class MyNews(Entry):
def __init__(self, *args, status_path = None, **kwargs):
if status_path is not None:
self.status_path = status_path
else:
self.status_path = HOME + '/.var/lib/featherweight/status'
self.get_news()
Entry.__init__(self, *args, **kwargs)
async(self.refresh, name = 'news')
def action(self, col, button, x, y):
if button == LEFT_BUTTON:
async(lambda : subprocess.Popen([TERMINAL, '-e', 'featherweight']).wait())
elif button == RIGHT_BUTTON:
async(lambda : subprocess.Popen(['featherweight', '--update', '--system']).wait())
def get_news(self):
try:
with open(self.status_path, 'rb') as file:
status = int(file.read().decode('utf-8', 'replace').replace('\n', ''))
except:
status = 0
colour = '31'
if status <= 0: colour = '0'
elif status <= 5: colour = '32'
elif status <= 10: colour = '33'
self.text = 'News: \033[%sm%i\033[0m' % (colour, status)
def refresh(self):
while True:
try:
spawn_read(*pdeath('HUP', 'inotifywait', self.status_path, '-e', 'close_write'))
self.get_news()
except:
time.sleep(1)
def function(self):
return self.text
class MyALSA(Entry):
def __init__(self, *args, timeout = None, sleep = None, mixers = None, **kwargs):
if timeout is not None:
self.timeout = timeout
else:
self.timeout = 5
if sleep is not None:
self.sleep = sleep
else:
self.sleep = 5
if mixers is not None:
mixers = mixers
else:
mixers = ('Master', 'PCM')
self.alsa = []
for mixer in mixers:
try:
self.alsa.append(ALSA(mixername = mixer))
except:
pass
try:
import posix_ipc
method = 0
except:
method = 2
try:
path = os.environ['PATH'].split(':')
for p in path:
p += '/cmdipc'
if os.access(p, os.F_OK | os.X_OK, effective_ids = True):
method = 1
break
except:
pass
self.sep_width = Bar.coloured_length(SEPARATOR)
self.get_volume()
Entry.__init__(self, *args, **kwargs)
async((self.refresh_posix_ipc, self.refresh_cmdipc, self.refresh_wait)[method], name = 'alsa')
def action(self, col, button, x, y):
mixer = 0
mixer_text = self.text.split(SEPARATOR)
while mixer < len(mixer_text): ## the limit is just a precaution
mixer_width = Bar.coloured_length(mixer_text[mixer])
if col >= mixer_width:
col -= mixer_width
if col < self.sep_width:
return
col -= self.sep_width
else:
break
mixer += 1
mixer_text = mixer_text[mixer]
channel = -1
mixer_text = mixer_text.split(': ')
mixer_label = ': '.join(mixer_text[:-1]) + ': '
mixer_text = mixer_text[-1].split(' ')
mixer_label = Bar.coloured_length(mixer_label)
if col >= mixer_label:
col -= mixer_label
while channel < len(mixer_text): ## the limit is just a precaution
channel += 1
channel_width = Bar.coloured_length(mixer_text[channel])
if col < channel_width:
break
col -= channel_width
if col < 1:
channel = -1
break
col -= 1
mixer = self.alsa[mixer]
if button == LEFT_BUTTON:
muted = mixer.get_mute()
muted = not (any(muted) if channel == -1 else muted[channel])
mixer.set_mute(muted, channel)
elif button == RIGHT_BUTTON:
volume = mixer.get_volume()
volume = limited(sum(volume) / len(volume))
mixer.set_volume(volume, -1)
elif button in (SCROLL_UP, SCROLL_DOWN):
volume = mixer.get_volume()
adj = -5 if button == SCROLL_DOWN else 5
if channel < 0:
for ch in range(len(volume)):
mixer.set_volume(limited(volume[ch] + adj), ch)
else:
mixer.set_volume(limited(volume[channel] + adj), channel)
else:
return
self.get_volume()
self.invalidate()
def get_volume(self):
text_v = lambda v : '--%' if v is None else ('%2i%%' % v)[:3]
read_m = lambda m : '%s: %s' % (m.mixername, ' '.join(text_v(v) for v in m.get_volume()))
self.text = SEPARATOR.join(read_m(m) for m in self.alsa)
def refresh_posix_ipc(self):
import posix_ipc
s = posix_ipc.Semaphore('/.xpybar.alsa.0', posix_ipc.O_CREAT, 0o600, 1)
c = posix_ipc.Semaphore('/.xpybar.alsa.1', posix_ipc.O_CREAT, 0o600, 0)
q = posix_ipc.Semaphore('/.xpybar.alsa.2', posix_ipc.O_CREAT, 0o600, 0)
while True:
try:
s.release() ; c.release()
try:
q.acquire(timeout)
except posix_ipc.BusyError:
pass
c.acquire(None)
s.acquire(None)
self.get_volume()
except:
time.sleep(self.sleep)
def refresh_cmdipc(self):
command = pdeath('HUP', 'cmdipc', '-PCck', '/.xpybar.alsa.0/.xpybar.alsa.1/.xpybar.alsa.2', 'wait', '-b%i' % self.timeout)
while True:
try:
spawn_read(*command)
self.get_volume()
except:
time.sleep(self.sleep)
def refresh_wait(self):
while True:
try:
time.sleep(self.sleep)
self.get_volume()
except:
pass
def function(self):
return self.text
class MyBattery(Entry):
def __init__(self, *args, **kwargs):
self.show_all = False
self.show_battery = 0
Entry.__init__(self, *args, **kwargs)
def action(self, col, button, x, y):
if button == LEFT_BUTTON:
self.show_all = not self.show_all
self.invalidate()
elif button == SCROLL_UP:
self.show_battery += 1
self.invalidate()
elif button == SCROLL_DOWN:
self.show_battery -= 1
if self.show_battery < 0:
self.show_battery = 0
self.invalidate()
def colourise_(self, text):
try:
value = text.replace(',', '.')
value = float(value) if '.' in value else int(value)
colour = '39'
if value < 80: colour = '32'
if value < 50: colour = '33'
if value < 15: colour = '31'
if value < 5: colour = '7;31'
return '\033[%sm%s\033[0m' % (colour, text)
except:
return text
def colourise(self, text):
buf = []
state = None
for c in reversed(text):
if c == '\t':
c = ' '
if state is not None:
if c == ' ':
if state == '':
buf.append(c)
continue
buf.append(self.colourise_(state))
buf.append(c)
state = None
else:
state = c + state
elif c == '%':
buf.append(c)
state = ''
else:
buf.append(c)
if state is not None and state != '':
buf.append(self.colourise_(state))
return ''.join(reversed(buf))
def function(self):
try:
data = spawn_read('acpi').split('\n')
except:
return 'acpi missing'
if self.show_battery >= len(data):
self.show_battery = len(data) - 1
if len(data) == 1:
return self.colourise(': '.join(data[0].split(': ')[1:]))
if self.show_all:
data = [self.colourise(': '.join(datum.split(': ')[1:])) for datum in data]
data = SEPARATOR.join('Bat%i: %s' % (i, datum) for i, datum in enumerate(data))
return data
data = data[self.show_battery]
data = ': '.join(data.split(': ')[1:])
data = 'Bat%i: %s' % (self.show_battery, self.colourise(data))
return data
class MyCPU(Entry):
def __init__(self, *args, **kwargs):
self.last_sirq_split = SoftIRQs()
cpu = CPU()
self.last_time = time.monotonic()
self.last_cpu_stat = cpu.cpu
self.last_cpu_total = sum(cpu.cpu)
self.last_cpus_total = [sum(c) for c in cpu.cpus]
self.last_cpus_stat = [[c[s] for c in cpu.cpus] for s in range(len(cpu.cpu))]
self.last_intr = cpu.intr_total
self.last_ctxt = cpu.ctxt
self.last_fork = cpu.processes
self.last_sirq = cpu.softirq_total
self.none = self.colourise(None)
self.coloured = tuple(self.colourise(i) for i in range(101))
self.show_all = False
self.show_function = 0
self.functions = [ lambda *v : self.function_cpu(CPU.idle, '', *v)
, self.function_intr
, self.function_ctxt
, self.function_fork
, self.function_proc
, self.function_sirq
]
for key in self.last_sirq_split.keys:
make = lambda key : (lambda *v : self.function_sirq_split(key, *v))
self.functions.append(make(key))
self.functions += [ lambda *v : self.function_cpu(CPU.user, '(user)', *v)
, lambda *v : self.function_cpu(CPU.nice, '(nice)', *v)
, lambda *v : self.function_cpu(CPU.system, '(system)', *v)
, lambda *v : self.function_cpu(CPU.iowait, '(iowait)', *v)
, lambda *v : self.function_cpu(CPU.irq, '(irq)', *v)
, lambda *v : self.function_cpu(CPU.softirq, '(softirq)', *v)
, lambda *v : self.function_cpu(CPU.steal, '(steal)', *v)
, lambda *v : self.function_cpu(CPU.guest, '(guest)', *v)
, lambda *v : self.function_cpu(CPU.guest_nice, '(guest nice)', *v)
, self.function_load
, self.function_task
, self.function_pid
, self.function_online
]
Entry.__init__(self, *args, **kwargs)
def action(self, col, button, x, y):
if button == LEFT_BUTTON:
self.show_all = not self.show_all
self.invalidate()
elif button == SCROLL_UP:
n = self.show_function + 1
if n < len(self.functions):
self.show_function = n
self.invalidate()
elif button == SCROLL_DOWN:
n = self.show_function - 1
if n >= 0:
self.show_function = n
self.invalidate()
def usage(self, now_stat, now_total, last_stat, last_total, idle):
total = now_total - last_total
stat = now_stat - last_stat
return None if total == 0 else (total - stat if idle else stat) * 100 / total
def colourise_(self, value):
if value is None:
return self.none
else:
return self.coloured[limited(value)]
def colourise(self, value):
if value is None:
return '--%'
elif value >= 100:
return '\033[31m100\033[0m'
colour = '39'
if value >= 5: colour = '32'
if value >= 50: colour = '33'
if value >= 90: colour = '31'
return '\033[%sm%2i\033[0m%%' % (colour, value)
def function_cpu(self, stat, name, cpu, display, tdiff):
now_cpu_stat = self.now_cpu_stat[stat]
now_cpus_stat = self.now_cpus_stat[stat]
last_cpus_stat = self.last_cpus_stat[stat]
last_cpu_stat = self.last_cpu_stat[stat]
if display:
cpu = (now_cpu_stat, self.now_cpu_total, last_cpu_stat, self.last_cpu_total)
cpu = self.colourise_(self.usage(*cpu, idle = stat == CPU.idle))
if self.show_all:
cpus = (now_cpus_stat, self.now_cpus_total, last_cpus_stat, self.last_cpus_total)
cpus = ' '.join(self.colourise_(self.usage(*c, idle = stat == CPU.idle)) for c in zip(*cpus))
cpu = 'Cpu%s: %s : %s' % (name, cpus, cpu)
else:
cpu = 'Cpu%s: %s' % (name, cpu)
return cpu
def function_intr(self, cpu, display, tdiff):
now_intr = cpu.intr_total
if display:
cpu = 'Intr: %.0fHz' % ((now_intr - self.last_intr) / tdiff)
self.last_intr = now_intr
return cpu
def function_ctxt(self, cpu, display, tdiff):
now_ctxt = cpu.ctxt
if display:
cpu = 'Ctxt: %.0fHz' % ((now_ctxt - self.last_ctxt) / tdiff)
self.last_ctxt = now_ctxt
return cpu
def function_fork(self, cpu, display, tdiff):
now_fork = cpu.processes
if display:
cpu = 'Fork: %.0fHz' % ((now_fork - self.last_fork) / tdiff)
self.last_fork = now_fork
return cpu
def function_sirq(self, cpu, display, tdiff):
now_sirq = cpu.softirq_total
if display:
cpu = 'Sirq: %.0fHz' % ((now_sirq - self.last_sirq) / tdiff)
self.last_sirq = now_sirq
return cpu
def function_proc(self, cpu, display, tdiff):
if display:
cpu = 'Proc: %irun %iio' % (cpu.procs_running, cpu.procs_blocked)
return cpu
def function_load(self, cpu, display, tdiff):
if display:
load = AverageLoad()
cpu = 'Load: %.2f %.2f %.2f' % (load.total_avg_5_min, load.total_avg_10_min, load.total_avg_15_min)
return cpu
def function_task(self, cpu, display, tdiff):
if display:
load = AverageLoad()
cpu = 'Task: %i/%i (%.0f%%)' % (load.active_tasks, load.total_tasks, load.active_tasks * 100 / load.total_tasks)
return cpu
def function_pid(self, cpu, display, tdiff):
if display:
load = AverageLoad()
cpu = 'Last PID: %i' % load.last_pid
return cpu
def function_online(self, cpu, display, tdiff):
if display:
try:
cpu, cpuonline, on, off = '', CPUOnline(), 0, 0
online, offline = cpuonline.online, cpuonline.offline
while on < len(online) and off < len(offline):
if online[on] < offline[off]:
cpu += ' \033[32m%s\033[0m' % online[on]
on += 1
else:
cpu += ' \033[31m%s\033[0m' % offline[off]
off += 1
cpu += ''.join(' \033[32m%s\033[0m' % c for c in online[on:])
cpu += ''.join(' \033[31m%s\033[0m' % c for c in offline[off:])
cpu = 'Online:%s' % cpu
except Exception as e:
cpu = str(e)
return cpu
def function_sirq_split(self, key, cpu, display, tdiff):
if display:
label = 'Sirq(%s)' % key.lower().replace('_', ' ').replace(' rx', '↓').replace(' tx', '↑')
now = self.now_sirq_split[key]
last = self.last_sirq_split[key]
hz = lambda n, l : '%0.fHz' % ((n - l) / tdiff)
n = len(now)
snow, slast = sum(now), sum(last)
anow, alast = snow / n, slast / n
each = ''
if self.show_all:
each = '%s : ' % ' '.join(hz(n, l) for n, l in zip(now, last))
else:
each = ''
cpu = '%s: %s%s(%s)' % (label, each, hz(snow, slast), hz(anow, alast))
return cpu
def function(self):
now = time.monotonic()
cpu = CPU()
self.now_sirq_split = SoftIRQs()
tdiff = now - self.last_time
self.last_time = now
display = self.show_function
self.now_cpu_stat = cpu.cpu
self.now_cpu_total = sum(cpu.cpu)
self.now_cpus_total = [sum(c) for c in cpu.cpus]
self.now_cpus_stat = [[c[s] for c in cpu.cpus] for s in range(len(cpu.cpu))]
for i in range(len(self.functions)):
if i == display:
ret = self.functions[i](cpu, True, tdiff)
else:
self.functions[i](cpu, False, tdiff)
self.last_cpus_stat = self.now_cpus_stat
self.last_cpu_stat = self.now_cpu_stat
self.last_cpus_total = self.now_cpus_total
self.last_cpu_total = self.now_cpu_total
self.last_sirq_split = self.now_sirq_split
return ret
class MyMemory(Entry):
def __init__(self, *args, **kwargs):
self.coloured = tuple(self.colourise(i) for i in range(101))
self.show_labels = False
self.show_value = -1
self.show_default = False
self.show_swaps = False
self.show_swap = 0
self.labels = list(sorted(Memory().keys))
swaps = Swaps()
self.prio = swaps.header_map['Priority']
self.used = swaps.header_map['Used']
self.size = swaps.header_map['Size']
self.swap = swaps.header_map['Filename']
Entry.__init__(self, *args, **kwargs)
def action(self, col, button, x, y):
if button == LEFT_BUTTON:
if self.show_value >= 0:
self.show_default = not self.show_default
self.invalidate()
elif button == MIDDLE_BUTTON:
self.show_labels = not self.show_labels
self.invalidate()
elif button == RIGHT_BUTTON:
self.show_swaps = not self.show_swaps
self.invalidate()
elif button == SCROLL_UP:
if self.show_swaps:
self.show_swap += 1
self.invalidate()
else:
n = self.show_value + 1
if n < len(self.labels):
self.show_value = n
self.invalidate()
elif button == SCROLL_DOWN:
if self.show_swaps:
n = self.show_swap
if n > 0:
self.show_swap = n - 1
self.invalidate()
else:
self.show_value -= 1
if self.show_value < -1:
self.show_value = -1
self.invalidate()
def u(self, value):
units = 'KMGTPE'
unit = 0
while unit + 1 < len(units) and value >= 1024:
unit += 1
value /= 1024
return '%.1f%s' % (value, units[unit])
def colourise(self, value):
if value >= 100:
return '\033[31m100\033[0m'
colour = '39'
if value > 30: colour = '32'
if value > 50: colour = '33'
if value > 80: colour = '31'
return '\033[%sm%i\033[0m%%' % (colour, value)
def function(self):
if self.show_swaps:
swaps = Swaps()
while True:
if len(swaps.swaps) == 0:
return 'no swaps available'
try:
swap = swaps.swaps[self.show_swap]
used, size = int(swap[self.used]), int(swap[self.size])
swap, prio = swap[self.swap], int(swap[self.prio])
return '%s: %s (%sB/%sB, prio %i)' % (swap, self.colourise(used * 100 / size),
self.u(used), self.u(size), prio)
except:
self.show_swap -= 1
if self.show_swap < 0:
self.show_swaps = False
return self.function()
else:
memory = Memory()
if not self.show_default and self.show_value > 0:
label = self.labels[self.show_value]
try:
value = memory[label]
unit = 0
units = ['K', 'M', 'G', 'T', 'P', 'E']
while (unit + 1 < len(units)) and (value >= 1024):
value /= 1024
unit += 1
return '%s: %.1f%sB' % (label, value, units[unit])
except:
self.show_value = -1
if memory.mem_total == 0:
mem = '---'
shm = '---'
else:
mem = self.coloured[limited(memory.mem_used * 100 / memory.mem_total)]
shm = self.coloured[limited(memory.shmem * 100 / memory.mem_total)]
if memory.swap_total == 0:
swp = 'n/a'
else:
swp = self.coloured[limited(memory.swap_used * 100 / memory.swap_total)]
if self.show_labels:
mem = 'Mem: %s%sSwp: %s%sShm: %s' % (mem, SEPARATOR, swp, SEPARATOR, shm)
else:
mem = 'Mem: %s %s %s' % (mem, swp, shm)
return mem
class MyStat(Entry):
def __init__(self, *args, **kwargs):
self.keys_4 = SNMP().keys
self.keys_6 = SNMP6().keys
self.keys_vm = VMStat().keys
self.show_group = 0
self.show_value_4 = 0
self.show_value_6 = 0
self.show_value_vm = 0
Entry.__init__(self, *args, **kwargs)
def action(self, col, button, x, y):
if button == LEFT_BUTTON:
n = self.show_group + 1
if n < 4:
self.show_group = n
self.invalidate()
elif button == MIDDLE_BUTTON:
self.invalidate()
elif button == RIGHT_BUTTON:
n = self.show_group
if n > 0:
self.show_group = n - 1
self.invalidate()
elif button == SCROLL_UP:
if self.show_group == 0:
n = self.show_value_4
if n + 1 < len(self.keys_4):
self.show_value_4 = n + 1
self.invalidate()
elif self.show_group == 1:
n = self.show_value_6
if n + 1 < len(self.keys_6):
self.show_value_6 = n + 1
self.invalidate()
elif self.show_group == 2:
n = self.show_value_vm
if n + 1 < len(self.keys_vm):
self.show_value_vm = n + 1
self.invalidate()
elif self.show_group == 3:
pass
elif button == SCROLL_DOWN:
if self.show_group == 0:
n = self.show_value_4
if n > 0:
self.show_value_4 = n - 1
self.invalidate()
elif self.show_group == 1:
n = self.show_value_6
if n > 0:
self.show_value_6 = n - 1
self.invalidate()
elif self.show_group == 2:
n = self.show_value_vm
if n > 0:
self.show_value_vm = n - 1
self.invalidate()
elif self.show_group == 3:
pass
def function(self):
try:
if self.show_group == 0:
label = self.keys_4[self.show_value_4]
value = SNMP()[label]
elif self.show_group == 1:
label = self.keys_6[self.show_value_6]
value = SNMP6()[label]
elif self.show_group == 2:
label = self.keys_vm[self.show_value_vm]
value = VMStat()[label]
label = ''.join(w[:1].upper() + w[1:] for w in label.split('_'))
elif self.show_group == 3:
uptime = Uptime()
avg = uptime.uptime_seconds - uptime.average_idle_seconds
wrk = (avg) * 100 / uptime.uptime_seconds
up = '%id%i:%02i' % uptime.uptime[:3]
avg = '%id%i:%02i' % Uptime.split_time(avg)[:3]
return 'Uptime: %.0f%% %s / %s' % (wrk, avg, up)
return '%s: %i' % (label, value)
except Exception as e:
return str(e)
class MyNetwork(Entry):
def __init__(self, *args, limits = None, ignore = None, pings = None, **kwargs):
self.limits = { 'rx_bytes' : None # Download speed in bytes (not bits)
, 'tx_bytes' : None # Upload speed in bytes (not bits)
, 'rx_total' : None # Download cap in bytes
, 'tx_total' : None # Upload cap in bytes
}
if limits is not None:
self.limits = limits
if pings is None:
pings = ['gateway']
elif isinstance(pings, str) or isinstance(pings, Ping):
pings = [pings]
pings = [(Ping(targets = Ping.get_nics(p)) if isinstance(p, str) else p).monitors for p in pings]
self.pings = {}
for ping in pings:
for nic in ping.keys():
if nic not in self.pings:
self.pings[nic] = ping[nic]
else:
self.pings[nic] += ping[nic]
self.ignore = ['lo'] if ignore is None else ignore
self.net_time = time.monotonic()
self.net_last = {}
self.show_all = False
self.show_name = False
self.show_value = 0
self.labels = ['bytes', 'total', 'packets', 'errs',
'drop', 'fifo', 'frame', 'colls',
'carrier', 'compressed', 'multicast']
self.in_bytes = False # in bits if showing total
Entry.__init__(self, *args, **kwargs)
def action(self, col, button, x, y):
if button == LEFT_BUTTON:
self.show_all = not self.show_all
elif button == MIDDLE_BUTTON:
self.show_name = not self.show_name
elif button == RIGHT_BUTTON:
self.in_bytes = not self.in_bytes
elif button == SCROLL_UP:
n = self.show_value + 1
if n >= len(self.labels):
return
self.show_value = n
elif button == SCROLL_DOWN:
n = self.show_value - 1
if self.show_value < 0:
return
self.show_value = n
else:
return
self.invalidate()
def colourise(self, value, percent):
colour = '39'
if percent > 25: colour = '32'
if percent > 50: colour = '33'
if percent > 90: colour = '31'
return '\033[%sm%3.0f\033[0m' % (colour, value)
def KBps(self, network, device, direction, net_tdiff):
direction += '_bytes'
value = network[device][direction]
limit = None
if direction in self.limits:
limit = self.limits[direction]
if limit is None:
limit = 12500000 # 100 mbps
if device in self.net_last:
value -= self.net_last[device][direction]
else:
value = 0
percent = value * 100 / (limit * net_tdiff)
value /= 1024 * net_tdiff
return self.colourise(value, percent)
def kbps(self, network, device, direction, net_tdiff):
direction += '_bytes'
value = network[device][direction]
limit = None
if direction in self.limits:
limit = self.limits[direction]
if limit is None:
limit = 12500000 # 100 mbps
if device in self.net_last:
value -= self.net_last[device][direction]
else:
value = 0
percent = value * 100 / (limit * net_tdiff)
value /= 128 * net_tdiff
return self.colourise(value, percent)
def total(self, network, device, direction, bits):
direction += '_bytes'
value = network[device][direction]
limit = self.limits[direction] if direction in self.limits else None
if limit is not None:
percent = value * 100 / limit
colour = '32'
if percent >= 25: colour = '39'
if percent >= 50: colour = '33'
if percent >= 75: colour = '31'
if percent >= 95: colour = '7;33'
if percent >= 100: colour = '7;31'
else:
colour = '39'
if bits:
value *= 8
unit = 0
if bits:
units = ['', 'k', 'm', 'g', 't', 'p', 'e']
else:
units = ['', 'K', 'M', 'G', 'T', 'P', 'E']
while (unit + 1 < len(units)) and (value >= 1024):
value /= 1024
unit += 1
return '\033[%sm%.1f\033[0m%s' % (colour, value, units[unit])
def ping(self, monitor):
monitor.semaphore.acquire()
try:
latency = monitor.get_latency(True)[1]
droptime = monitor.dropped_time(True)
if droptime:
return ' \033[31m%4is\033[00m' % droptime
elif latency is None:
return ' \033[31mdrop?\033[00m'
colour = '31'
if latency < 5: colour = '32'
elif latency < 10: colour = '39'
elif latency < 20: colour = '33'
return ' \033[%sm%5.2f\033[00m' % (colour, latency)
finally:
monitor.semaphore.release()
def function(self):
net_now = time.monotonic()
net_tdiff, self.net_time = net_now - self.net_time, net_now
network = Network(*self.ignore).devices
label = self.labels[self.show_value]
show_total = label == 'total'
show_bytes = label == 'bytes'
xlabel = 'bytes' if show_total else label
suffix = ''
if show_total and self.in_bytes:
f = lambda dev, dir : self.total(network, dev, dir, True) + 'b'
elif show_total:
f = lambda dev, dir : self.total(network, dev, dir, False) + 'B'
elif show_bytes and self.in_bytes:
f = lambda dev, dir : self.KBps(network, dev, dir, net_tdiff) + 'KB/s'
elif show_bytes:
f = lambda dev, dir : self.kbps(network, dev, dir, net_tdiff) + 'kbps'
else:
f = lambda dev, dir : '%i' % network[dev][dir + '_' + label]
suffix = ' ' + label
def create(lbl, dev):
ret = ''
if lbl is not None:
ret += lbl + ': '
have_down = ('rx_' + xlabel) in network[dev]
have_up = ('tx_' + xlabel) in network[dev]
if have_down:
ret += f(dev, 'rx') + '↓'
if have_up:
ret += ' '
if have_up:
ret += f(dev, 'tx') + '↑'
ret += suffix
if show_bytes or show_total:
if dev == '*':
for dev in self.pings.keys():
for ping in self.pings[dev]:
ret += self.ping(ping)
elif dev in self.pings:
for ping in self.pings[dev]:
ret += self.ping(ping)
return ret
if self.show_all:
if self.show_name:
net = [create(dev, dev) for dev in network]
else:
net = [create(None, dev) for dev in network]
net = (SEPARATOR if self.show_name else ' ').join(net)
else:
devsum = {}
for dev in network.keys():
table = network[dev]
for key in table.keys():
if key not in devsum:
devsum[key] = 0
devsum[key] += table[key]
network['*'] = devsum
if self.show_name:
net = create('Net', '*')
else:
net = create(None, '*')
self.net_last = network
return net
class MyIO(Entry):
def __init__(self, *args, fs_name_map = None, fs_ignore = None, fs_type_ignore = None, **kwargs):
self.fs_name_map = [] if fs_name_map is None else fs_name_map
self.fs_ignore = set([] if fs_ignore is None else fs_ignore)
self.fs_type_ignore = set(MyIO.nodev_fs() if fs_type_ignore is None else fs_type_ignore)
self.show_value = 0
self.show_disc_value = 0
self.show_overall = True
self.show_disc = 0
self.disc_map = None
Entry.__init__(self, *args, **kwargs)
@staticmethod
def nodev_fs():
ret = []
with open('/proc/filesystems', 'rb') as file:
data = file.read()
data = data.decode('utf-8', 'strict')[:-1].replace('\t', ' ').split('\n')
for line in data:
cols = line.split(' ')
if 'nodev' in cols[:-1]:
ret.append(cols[-1])
ret += ['fuse.gvfsd-fuse']
return ret
def action(self, col, button, x, y):
if button == LEFT_BUTTON:
overall = self.show_overall
disc_map = self.disc_map
if overall and disc_map is not None:
x = 0
for text, disc in disc_map:
if col < x:
break
w = Bar.coloured_length(text)
if col < x + w:
if disc is not None:
self.show_disc = disc
break
x += w + 1
self.show_overall = not overall
elif button == MIDDLE_BUTTON and not self.show_overall:
self.show_disc_value = max(self.show_disc_value - 1, 0)
elif button == RIGHT_BUTTON and not self.show_overall:
self.show_disc_value = min(self.show_disc_value + 1, 4)
elif button == SCROLL_UP:
if self.show_overall:
self.show_value = min(self.show_value + 1, 13)
else:
show_disc = self.show_disc
if isinstance(show_disc, str):
try:
show_disc = [d.filesystem for d in self.get_discs()].index(show_disc)
except:
show_disc = -1
self.show_disc = show_disc + 1
elif button == SCROLL_DOWN:
if self.show_overall:
self.show_value = max(self.show_value - 1, 0)
else:
show_disc = self.show_disc
if isinstance(show_disc, str):
try:
show_disc = [d.filesystem for d in self.get_discs()].index(show_disc)
except:
show_disc = -1
self.show_disc = max(show_disc - 1, 0)
else:
return
self.invalidate()
def colour(self, percent):
colour = '39'
if percent < 25: colour = '32'
if percent > 50: colour = '33'
if percent > 90: colour = '31'
if percent > 95: colour = '39;41'
return colour
def unit(self, value):
units = ['', 'K', 'M', 'G', 'T', 'P', 'E']
unit = 0
while unit + 1 < len(units) and value >= 1024:
unit += 1
value /= 1024
return (value, units[unit])
def si(self, value, baseunit = 0):
unit = baseunit + 3
units = ['n', 'µ', 'm', '', 'k', 'M', 'G', 'T', 'P', 'E']
while unit + 1 < len(units) and value >= 1000:
unit += 1
value /= 1000
return (value, units[unit])
def get_discs(self):
discs = Discs()
fs = [discs.filesystems[f] for f in sorted(discs.filesystems.keys()) if
f not in self.fs_ignore and
discs.filesystems[f].fstype not in self.fs_type_ignore and
f[0] == '/']
return fs
def function_overall(self):
df = []
if self.show_value == 0:
label = 'Df'
self.disc_map = [(label + ':', None)]
used = 0
available = 0
for disc in self.get_discs():
percent = disc.used * 100 / (disc.used + disc.available)
text = '\033[%sm%.0f\033[0m%%' % (self.colour(percent), percent)
df.append(text)
self.disc_map.append((text, disc.filesystem))
used += disc.used
available += disc.available
df.append(':')
percent = used * 100 / (used + available)
df.append('\033[%sm%.0f\033[0m%%' % (self.colour(percent), percent))
elif self.show_value == 1:
label = 'Df'
self.disc_map = [(label + ':', None)]
used = 0
available = 0
for disc in self.get_discs():
percent = disc.used * 100 / (disc.used + disc.available)
text = '\033[%sm%.1f\033[0m%s' % (self.colour(percent), *self.unit(disc.available))
df.append(text)
self.disc_map.append((text, disc.filesystem))
used += disc.used
available += disc.available
df.append(':')
percent = used * 100 / (used + available)
df.append('\033[%sm%.1f\033[0m%s' % (self.colour(percent), *self.unit(available)))
elif self.show_value == 2:
label = 'Df(inodes)'
self.disc_map = [(label + ':', None)]
used = 0
available = 0
for disc in self.get_discs():
percent = disc.iused * 100 / (disc.iused + disc.ifree)
text = '\033[%sm%.0f\033[0m%%' % (self.colour(percent), percent)
df.append(text)
self.disc_map.append((text, disc.filesystem))
used += disc.iused
available += disc.ifree
df.append(':')
percent = used * 100 / (used + available)
df.append('\033[%sm%.0f\033[0m%%' % (self.colour(percent), percent))
elif self.show_value == 3:
label = 'Dent'
dentry = DentryState()
percent = (dentry.nr_dentry - dentry.nr_unused) * 100 / dentry.nr_dentry
df.append('\033[%sm%.0f\033[0m%%' % (self.colour(percent), percent))
df.append('%is' % dentry.age_limit)
df.append('%ipages' % dentry.want_pages)
elif self.show_value == 4:
label = 'Inode'
inode = InodeState()
percent = (inode.nr_inodes - inode.nr_free_inodes) * 100 / inode.nr_inodes
df.append('\033[%sm%.0f\033[0m%%%s' % (self.colour(percent), percent,
'' if inode.preshrink == 0 else ' preshrink'))
elif self.show_value == 5:
label = 'File'
files = Files()
percent = (files.nr_files - files.nr_free_files) * 100 / files.file_max
df.append('\033[%sm%.0f\033[0m%%' % (self.colour(percent), percent))
df.append('%i/%i/%i' % (files.nr_files - files.nr_free_files, files.nr_files, files.file_max))
elif self.show_value == 6:
label = 'Lock'
locks = Locks().locks
ar, aw, mr, mw = 0, 0, 0, 0
for lock in locks:
if lock.mandatory:
if lock.shared:
mr += 1
else:
mw += 1
else:
if lock.shared:
ar += 1
else:
aw += 1
df.append('%iar' % ar)
df.append('%iaw' % aw)
df.append('%imr' % mr)
df.append('%imw' % mw)
df.append(':')
df.append('%ia' % (ar + aw))
df.append('%im' % (mr + mw))
df.append('%ir' % (ar + mr))
df.append('%iw' % (aw + mw))
df.append(':')
df.append('%i' % (ar + aw + mr + mw))
elif self.show_value == 7:
label = 'Rand'
random = Random()
df.append(str(random.poolsize))
elif self.show_value in (8, 11):
label = 'Disc(r %s)' % ('sum' if self.show_value == 8 else 'avg')
stats = DiscStats().devices.values()
r_complete, r_merge, r_sectors, r_time = 0, 0, 0, 0
for stat in stats:
r_complete += stat.r_complete
r_merge += stat.r_merge
r_sectors += stat.r_sectors
r_time += stat.r_time
n = 1 if self.show_value == 8 else len(stats)
r_complete /= n
r_merge /= n
r_sectors /= n
r_time /= n
df.append('%.1f%scompleted' % self.si(r_complete))
df.append('%.1f%smerge' % self.si(r_merge))
df.append('%.1f%ssectors' % self.si(r_sectors))
df.append('%.1f%ss' % self.si(r_time, -1))
elif self.show_value in (9, 12):
label = 'Disc(w %s)' % ('sum' if self.show_value == 9 else 'avg')
stats = DiscStats().devices.values()
w_complete, w_merge, w_sectors, w_time = 0, 0, 0, 0
for stat in stats:
w_complete += stat.w_complete
w_merge += stat.w_merge
w_sectors += stat.w_sectors
w_time += stat.w_time
n = 1 if self.show_value == 9 else len(stats)
w_complete /= n
w_merge /= n
w_sectors /= n
w_time /= n
df.append('%.1f%scomplete' % self.si(w_complete))
df.append('%.1f%smerge' % self.si(w_merge))
df.append('%.1f%ssectors' % self.si(w_sectors))
df.append('%.1f%ss' % self.si(w_time, -1))
elif self.show_value in (10, 13):
label = 'Disc(io %s)' % ('sum' if self.show_value == 10 else 'avg')
stats = DiscStats().devices.values()
io_current, io_time, io_weighted_time = 0, 0, 0
for stat in stats:
io_current += stat.io_current
io_time += stat.io_time
io_weighted_time += stat.io_weighted_time
n = 1 if self.show_value == 10 else len(stats)
io_current /= n
io_time /= n
io_weighted_time /= n
df.append('%.1f%s' % self.si(io_current))
df.append('%.1f%ss' % self.si(io_time, -1))
df.append('%.1f%ss(weighted)' % self.si(io_weighted_time, -1))
return '%s: %s' % (label, ' '.join(df))
def function_disc(self):
discs = self.get_discs()
disc = self.show_disc
if isinstance(disc, str):
for i, d in enumerate(discs):
if d.filesystem == disc:
disc = i
break
if isinstance(disc, str):
disc = 0
elif disc >= len(discs):
disc = len(discs) - 1
disc = discs[disc]
name = disc.filesystem
name = self.fs_name_map[name] if name in self.fs_name_map else name.split('/')[-1]
df = []
if self.show_disc_value > 1:
try:
devices = DiscStats().devices
device = None
fs = os.path.realpath(disc.filesystem)
stat = None
for dev in devices.keys():
if '/dev/' + dev == fs:
stat = devices[dev]
break
if stat is None:
self.show_disc_value = 0
except:
self.show_disc_value = 0
if self.show_disc_value == 0:
percent = disc.used * 100 / (disc.used + disc.available)
text = '\033[%sm%.0f\033[0m%%(%.1f%sB/%.f%sB)' % (self.colour(percent), percent,
*self.unit(disc.available),
*self.unit(disc.used + disc.available))
df.append(text)
percent = disc.iused * 100 / (disc.iused + disc.ifree)
text = '\033[%sm%.0f\033[0m%%(%.1f%s/%.1f%s)inodes' % (self.colour(percent), percent,
*self.si(disc.iused), *self.si(disc.inodes))
df.append(text)
elif self.show_disc_value == 1:
df.append(disc.fstype)
df.append(disc.mountpoint)
elif self.show_disc_value == 2:
name += '(r)'
df.append('%.1f%scompleted' % self.si(stat.r_complete))
df.append('%.1f%smerge' % self.si(stat.r_merge))
df.append('%.1f%ssectors' % self.si(stat.r_sectors))
df.append('%.1f%ss' % self.si(stat.r_time, -1))
elif self.show_disc_value == 3:
name += '(w)'
df.append('%.1f%scomplete' % self.si(stat.w_complete))
df.append('%.1f%smerge' % self.si(stat.w_merge))
df.append('%.1f%ssectors' % self.si(stat.w_sectors))
df.append('%.1f%ss' % self.si(stat.w_time, -1))
elif self.show_disc_value == 4:
name += '(io)'
df.append('%.1f%s' % self.si(stat.io_current))
df.append('%.1f%ss' % self.si(stat.io_time, -1))
df.append('%.1f%ss(weighted)' % self.si(stat.io_weighted_time, -1))
return '%s: %s' % (name, ' '.join(df))
def function(self):
try:
self.disc_map = None
if self.show_overall:
return self.function_overall()
else:
return self.function_disc()
except Exception as e:
return str(e)
class MyTop(Entry):
def __init__(self, *args, **kwargs):
self.show_pid = False
self.show_cpu = True
self.show_label = True
self.show_nic = None
self.top_cmd = pdeath('HUP', 'top', '-b', '-n', '1', '-o', '%CPU', '-w', '10000')
self.refresh()
Entry.__init__(self, *args, **kwargs)
async(lambda : watch(5, t(self.refresh)), name = 'top')
def action(self, col, button, x, y):
if button == LEFT_BUTTON:
self.show_pid = not self.show_pid
elif button == MIDDLE_BUTTON:
self.show_label = not self.show_label
elif button == RIGHT_BUTTON:
self.show_cpu = not self.show_cpu
else:
return
self.refresh()
self.invalidate()
def refresh(self):
top = spawn_read(*self.top_cmd).split('\n')
top = [line for line in top if not line.startswith('%')][6]
top = [col for col in top.replace('\t', ' ').split(' ') if not col == '']
text = 'Top: ' if self.show_label else ''
if self.show_pid:
text += top[0] + ' '
if self.show_cpu:
text += top[6] + ' '
text += ' '.join(top[10:])
self.text = text
def function(self):
return self.text
class MyMOC(Entry):
def __init__(self, *args, ignore = None, **kwargs):
self.show_state = False
self.display = 0
self.function_display = Sometimes(self.function_display_, 4)
Entry.__init__(self, *args, **kwargs)
def invalidate(self):
self.function_display.counter = 0
Entry.invalidate(self)
def action(self, col, button, x, y):
if button == RIGHT_BUTTON:
self.show_state = not self.show_state
self.invalidate()
elif self.show_state:
if button == LEFT_BUTTON:
self.invalidate()
elif button == SCROLL_UP:
n = self.display + 1
if n >= 6:
return
self.display = n
self.invalidate()
elif button == SCROLL_DOWN:
n = self.display - 1
if n < 0:
return
self.display = n
self.invalidate()
else:
moc = MOC()
if moc.state == MOC.NOT_RUNNING:
return
elif button == LEFT_BUTTON:
if col < 5:
return
col -= 5
if col % 3 == 2:
return
col //= 3
if col == 0: async(lambda : moc.play().wait())
elif col == 1: async(lambda : moc.toggle_pause().wait())
elif col == 2: async(lambda : moc.stop().wait())
elif col == 3: async(lambda : moc.previous().wait())
elif col == 4: async(lambda : moc.next().wait())
elif button == SCROLL_UP: async(lambda : moc.seek(+5).wait())
elif button == SCROLL_DOWN: async(lambda : moc.seek(-5).wait())
def function_display_(self):
moc = MOC()
display = self.display
if display == 0 or moc.state in (MOC.NOT_RUNNING, MOC.STOPPED):
return 'Moc: %s' % { MOC.NOT_RUNNING : 'not running'
, MOC.STOPPED : 'stopped'
, MOC.PAUSED : 'paused'
, MOC.PLAYING : 'playing'}[moc.state]
key = ['SongTitle', 'Album', 'Artist', 'Title', 'File'][display - 1]
label = key.lower() if display != 1 else 'song'
return 'Moc(%s): %s' % (label, moc[key])
def function(self):
if not self.show_state:
return 'Moc: |> || [] |< >|'
return self.function_display()
class MyIPAddress(Entry):
def __init__(self, *args, ignore = None, **kwargs):
self.ignore = ['lo'] if ignore is None else ignore
self.text = None
self.show_public = True
self.show_nic = None
Entry.__init__(self, *args, **kwargs)
def init():
self.refresh()
async(lambda : Clock(sync_to = 10 * Clock.MINUTES).continuous_sync(t(self.refresh)), name = 'ipaddress')
async(init, name = 'ipaddress')
def action(self, col, button, x, y):
if button == LEFT_BUTTON:
self.show_public = not self.show_public
self.invalidate()
elif button == RIGHT_BUTTON:
async(self.refresh, name = 'ipaddress')
elif button in (SCROLL_UP, SCROLL_DOWN):
nic = self.show_nic
if nic is None:
return
nics = list(sorted(self.text[0].keys()))
try:
nic = nics.index(nic)
except:
if len(nics) == 0:
self.show_nic = None
self.invalidate()
else:
self.show_nic = nics[0]
self.invalidate()
nic += +1 if button == SCROLL_UP else -1
if 0 <= nic < len(nics):
self.show_nic = nics[nic]
self.invalidate()
def refresh(self):
text_private, text_public = {}, {}
ipa = IPAddress(*(self.ignore))
for nic in ipa.nics.keys():
(state, a4, a6, a) = ipa.nics[nic]
if state == IPAddress.DOWN:
label = '31'
elif state == IPAddress.UP:
label = '32'
elif state == IPAddress.UNKNOWN:
label = '33'
else:
label = '39'
label = '\033[%sm%s\033[0m' % (label, nic)
pub = label + ':'
prv = pub
if a4 is not None:
prv += ' ' + a4
if a6 is not None:
prv += ' ' + a6
elif a4 is None:
prv += ' no address'
pub += ' ' + a if a is not None else ' no address'
text_private[nic] = prv
text_public[nic] = pub
self.text = (text_private, text_public)
self.invalidate()
def function(self):
text = self.text
if text is None:
return '...'
text = text[1 if self.show_public else 0]
nic = self.show_nic
if nic is None or nic not in text:
if len(text.keys()) == 0:
self.show_nic = None
return 'No NIC available'
nic = sorted(text.keys())[0]
self.show_nic = nic
return text[nic]
class MyLeapsec(Entry):
def __init__(self, *args, **kwargs):
self.announcement = None
Entry.__init__(self, *args, **kwargs)
def init():
self.refresh()
async(lambda : Clock(sync_to = Clock.MINUTES).continuous_sync(Sometimes(t(self.refresh), 12 * 60)), name = 'leapsec')
async(init, name = 'leapsec')
def action(self, col, button, x, y):
if button == RIGHT_BUTTON:
self.refresh()
def refresh(self):
leapanon = LeapSeconds()
found = -1
now = time.time()
for index in range(len(leapanon)):
if leapanon[index][3] >= now:
found = index
break
announcement = leapanon[found]
text = ('+%i' if announcement[4] > 0 else '%i') % announcement[4]
text += 's ' + ('\033[%sm%s\033[00m ago' if found < 0 else 'in \033[%sm%s\033[00m')
text += ' end of UTC %i-%02i-%0i' % announcement[:3]
if announcement[5] == LeapSeconds.SECONDARY:
text += ' (secondary)'
elif announcement[5] == LeapSeconds.OUT_OF_BAND:
text += ' (out of band)'
self.announcement = (text, announcement[3])
self.invalidate()
def dur(self, t):
s, t = t % 60, t // 60
m, t = t % 60, t // 60
h, d = t % 24, t // 24
if d > 0:
return '%id%ih%i\'%02i"' % (d, h, m, s)
elif h > 0:
return '%ih%i\'%02i"' % (h, m, s)
elif m > 0:
return '%i\'%02i"' % (m, s)
else:
return '%02is' % s
def colour(self, time_until):
c = '00'
if time_until >= 0:
if time_until < 10:
c = '41'
elif time_until < 60:
c = '31'
elif time_until < 60 * 60:
c = '33'
elif time_until < 24 * 60 * 60:
c = '32'
return c
def function(self):
if self.announcement is None:
return '...'
(text, when) = self.announcement
time_until = when - int(time.time())
return text % (self.colour(time_until), self.dur(abs(time_until)))
class MyTimer(Entry):
def __init__(self, *args, **kwargs):
self.start = None
self.length = None
self.notified = False
self.pause_text = None
mqueue_map['timer'] = self.mqueue
Entry.__init__(self, *args, **kwargs)
def mqueue(self, args):
if args[1] == 'stop':
self.stop()
elif args[1] in ('pause', 'resume'):
self.pause()
else:
duration = args[1].split(':')
duration.reverse()
seconds = 0
if len(duration) > 0: seconds += int(duration[0])
if len(duration) > 1: seconds += int(duration[1]) * 60
if len(duration) > 2: seconds += int(duration[2]) * 60 * 60
if len(duration) > 3: seconds += int(duration[3]) * 60 * 60 * 24
self.start = time.time()
self.length = seconds
self.notified = False
self.pause_text = None
self.invalidate()
def action(self, col, button, x, y):
if button == LEFT_BUTTON:
self.pause()
elif button == RIGHT_BUTTON:
self.stop()
def pause(self):
if self.pause_text is not None:
self.start = time.time()
self.pause_text = None
elif self.length - int(time.time() - self.start) < 0:
self.start, self.length, self.notified, self.pause_text = None, None, False, None
else:
self.pause_text = ''
self.length -= int(time.time() - self.start)
self.pause_text = self.dur(self.length)
self.invalidate()
def stop(self):
self.start = None
self.length = None
self.notified = False
self.pause_text = None
self.invalidate()
def dur(self, t):
s, t = t % 60, t // 60
m, t = t % 60, t // 60
h, d = t % 24, t // 24
if d > 0:
return '%id %i:%02i:%02i' % (d, h, m, s)
elif h > 0:
return '%i:%02i:%02i' % (h, m, s)
else:
return '%i:%02i' % (m, s)
def notify(self):
subprocess.Popen(['notify-send', '-u', 'critical', 'You are done with your task']).wait()
def wall(self):
subprocess.Popen(['wall', 'You are done with your task']).wait()
def function(self):
if self.pause_text is not None:
return '%s (paused)' % self.pause_text
if self.start is None:
return 'Timer inactive'
countdown = self.length - int(time.time() - self.start)
if countdown > 0:
return self.dur(countdown)
if not self.notified:
self.notified = True
async(self.notify, name = 'timer notify')
async(self.wall, name = 'timer wall')
countdown %= 2
if countdown == 0:
return '\033[37;41mYou are done\033[00m'
else:
return '\033[31mYou are done\033[00m'
def mqueue_wait():
import posix_ipc
qkey = '/.xpybar.' + os.environ['DISPLAY'].split('.')[0]
q = posix_ipc.MessageQueue(qkey, posix_ipc.O_CREAT, 0o600, 8, 128)
while True:
try:
message = q.receive(None)[0].decode('utf-8', 'replace').split(' ')
if message[0] in mqueue_map:
try:
mqueue_map[message[0]](message)
except Exception as err:
print('%s: %s' % (sys.argv[0], str(err)), file = sys.stderr, flush = True)
else:
print('%s: unrecognised message: %s' % (sys.argv[0], ' '.join(message)), file = sys.stderr, flush = True)
except:
time.sleep(1)
mqueue_map = {}
myxmonad = MyXMonad(None)
myscroll = MyScroll(None)
functions = [ [ myscroll
, myxmonad
, None
, MyTimer (None)
, MyNetwork (lambda f : Clocked(f, 2))
, MyMemory (lambda f : Clocked(f, 2))
, MyCPU (lambda f : Clocked(f, 2))
, MyBattery (lambda f : Clocked(f, 10))
, MyALSA (None)
, MyClock (lambda f : Clocked(f, 1))
]
, [ myscroll
, myxmonad
, None
, MyTop (None)
, MyIO (lambda f : Clocked(f, 10))
, MyStat (lambda f : Clocked(f, 10))
, MyNews (None)
, MyWeather (None)
, MyComputer (lambda f : Clocked(f, 20))
]
, [ myscroll
, myxmonad
, None
, MyIPAddress(lambda f : Clocked(f, 20))
, MyLeapsec (None)
, MyMOC (None)
]
]
HEIGHT = 0
groups = [Group(f) for f in functions]
groupi = 0
group = groups[groupi]
semaphore = threading.Semaphore()
def update_per_clock():
if semaphore.acquire(blocking = False):
try:
for g in groups:
for f in g.functions:
if isinstance(f.wrapped, Clocked):
f(True)
finally:
semaphore.release()
invalidate()
start_ = start
def start():
start_()
bar.clear()
get_display().flush()
async(lambda : clock.continuous_sync(t(update_per_clock)), name = 'clock')
async(mqueue_wait, name = 'mqueue')
def redraw():
if semaphore.acquire(blocking = False):
try:
gr = groups[groupi]
for g in groups:
if g is not gr:
for f in g.functions:
f()
values = gr.pattern % tuple(f() for f in gr.functions)
bar.partial_clear(0, bar.width, 10, 0, 2, values)
bar.draw_coloured_splitted_text(0, bar.width, 10, 0, 2, values)
finally:
semaphore.release()
return True
return False
def unhandled_event(e):
global groups, groupi, group
if isinstance(e, Xlib.protocol.event.ButtonPress):
y = e.event_y
x = e.event_x
row = y // HEIGHT_PER_LINE
lcol = x // bar.font_width
rcol = (bar.width - x) // bar.font_width
button = e.detail
if button in (FORWARD_BUTTON, BACKWARD_BUTTON):
groupi = (groupi + (+1 if button == BACKWARD_BUTTON else -1)) % len(groups)
group = groups[groupi]
invalidate()
else:
for f in group.posupdate:
f.update_position()
for f in group.functions:
if f.click(row, lcol, rcol, button, x, y):
break