# -*- python -*-
import os
import sys
from plugins.application import Application
from plugins.clock import Clock
from plugins.cpuinfo import CPUInfo
from plugins.hdparm import HDParm
from plugins.image import Image
from plugins.kmsg import KMsg
from plugins.loadavg import AverageLoad
from plugins.lunar import Lunar
from plugins.menu import Menu
from plugins.ropty import ROPTY
from plugins.solar import Solar
sys.path.append(os.path.dirname(config_file) if '/' in config_file else '.')
from common import *
G.globals = globals()
G.clock = Clock(sync_to = 0.5)
%%>if test -x /usr/bin/terminator; then
G.TERMINAL = 'terminator'
%%>elif test -x /usr/bin/st; then
G.TERMINAL = 'st'
%%>else
G.TERMINAL = 'xterm'
%%>fi
G.OUTPUT, G.HEIGHT_PER_LINE, G.YPOS, G.TOP = 0, 12, 0, True
G.FONT = '-misc-fixed-medium-r-normal-*-10-*-*-*-c-*-iso10646-1'
from myalsa import MyALSA, MixerAction
from mybacklight import MyBacklight
from mybattery import MyBattery
from mybrilliance import MyBrilliance
from mycg import MyCGNegative
from myclock import MyClock
from mycomputer import MyComputer
from mycpu import MyCPU
from myio import MyIO
from myipaddress import MyIPAddress
from myirc import MyIRC
from mylid import MyLid
from myleapsec import MyLeapsec
from mymemory import MyMemory
from mymoc import MyMOC
from mynetwork import MyNetwork, Ping
from mynews import MyNews
from myscroll import MyScroll
from mystat import MyStat
from mysun import MySun
from mytimer import MyTimer
from mytop import MyTop
from myweather import MyWeather
from myxmonad import MyXMonad
%%>hostname="$(hostname | tr '[[:upper:]]' '[[:lower:]]')"
def mqueue_wait():
    """
    Receive messages from a POSIX message queue and dispatch them, forever.

    The queue name is "/.xpybar." followed by the value of the DISPLAY
    environment variable up to its first dot. Each received message is
    decoded as UTF-8 (invalid bytes are replaced) and split on single
    spaces. The first word selects a handler in mqueue_map, which is called
    with the whole list of words. Handler failures and unrecognised
    messages are reported on standard error.

    This function does not return. It raises KeyError if DISPLAY is not set
    and propagates any error from opening the queue.
    """
    import posix_ipc
    qkey = '/.xpybar.' + os.environ['DISPLAY'].split('.')[0]
    q = posix_ipc.MessageQueue(qkey, posix_ipc.O_CREAT, 0o600, 8, 128)
    while True:
        try:
            message = q.receive(None)[0].decode('utf-8', 'replace').split(' ')
            if message[0] in mqueue_map:
                try:
                    mqueue_map[message[0]](message)
                except Exception as err:
                    print('%s: %s' % (sys.argv[0], str(err)), file = sys.stderr, flush = True)
            else:
                print('%s: unrecognised message: %s' % (sys.argv[0], ' '.join(message)), file = sys.stderr, flush = True)
        except Exception:
            # Receiving or reporting failed: pause before retrying instead of
            # spinning. Only ordinary errors are absorbed, so SystemExit and
            # KeyboardInterrupt are no longer swallowed here.
            time.sleep(1)
# Widgets that appear in more than one group are created once and reused.
myxmonad = MyXMonad(None)
myscroll = MyScroll(None)
myclock = MyClock(
    lambda f : Clocked(f, 1),
    format = '%Y-(%m)%b-%d %T, %a w%V, %Z',
    long_format = '%Y-%m-%d %T',
)
%%>if iswork; then
with open('/proc/cpuinfo', 'r') as f:
isvm = f.readlines()
isvm = [line.replace('\n', ' ').replace('\t', ' ').split(':') for line in isvm if line]
isvm = any('hypervisor' in ':'.join(line[1:]).split(' ') for line in isvm if line[0].strip().startswith('flags'))
card1 = 'Ensoniq AudioPCI' if isvm else 'HDA Intel PCH'
%%>else
card1 = 'HD-Audio Generic'
%%>fi
mixers1 = ['Master', 'PCM'] ## TODO
%%>if iswork; then
#mixers1.append(('Headphone', 'Speaker'))
if not isvm:
mixers1.append(MixerAction('Headphone', lambda : spawn_read('headphones')))
mixers1.append(MixerAction('Speaker', lambda : spawn_read('speakers')))
%%>fi
card2 = 'Yeti Stereo Microphone'
mixers2 = ['Speaker', 'Mic']
#myii = ...
#myirc = ...
#from plugins.ii import II
#from myii import MyII
#def irc_watch(s):
# s = s.lower()
# return any(map(lambda x : x in s, ['mattias andrée', 'maandree', 'mandree', 'maandre', 'mandre']))
#ii = II('irc.oftc.net/#suckless', prefix = HOME + '/.irc')
#myii = MyII(None, channel = 'irc.oftc.net/#suckless', prefix = HOME + '/.irc', watch = irc_watch)
#myirc = MyIRC(None, ii = myii)
#myii.display = myirc
mylid = ...
mybattery = ...
%%>if test -d /proc/acpi/button/lid; then
mylid = MyLid (None)
%%>fi
%%>if test ! $(ls /sys/class/power_supply/ | wc -l) = 0; then
mybattery = MyBattery(None)
%%>fi
pingthese = []
%%>if test -r ~/.dotfiles/.secrets/ping-"$hostname"; then
%%> for address in $(cat ~/.dotfiles/.secrets/ping-"$hostname"); do
pingthese.append(Ping(targets = Ping.get_nics('%%{address}'), interval = 30))
%%> done
%%>fi
# Load the station list for the weather widget. The per-user file is tried
# first and the system-wide file is the fallback; if neither can be read and
# decoded, no stations are configured. (A stray re-raise used to make the
# fallback unreachable.)
try:
    with open(HOME + '/.config/metar', 'rb') as file:
        metar_stations = file.read().decode('utf-8', 'strict').split('\n')
except (OSError, UnicodeError):
    try:
        with open('/etc/metar', 'rb') as file:
            metar_stations = file.read().decode('utf-8', 'strict').split('\n')
    except (OSError, UnicodeError):
        metar_stations = []
# Drop empty lines and normalise each entry to an upper-case first character
# followed by lower-case characters.
metar_stations = [x[0].upper() + x[1:].lower() for x in metar_stations if x != '']
# Network interface table: a string value is the label to display for that
# interface, None means the interface is to be ignored.
netrenamemap = {
    'lo': None,
    'veth42d1872': None,
    'enxc84bd6ba1a73': None,
    'enxc84bd6ba1a91': None,
    'enx00249b1e3c30': 'Ctrl',
    'ens33': 'Nat', # VMWare
    'ens37': 'Ctrl', # VMWare
    'enxc84d44213db2': 'Home', # USB-C eth+usb dongle
    'enx0050b6cbd51b': 'Home', # USB-C eth dongle
    'enxb44506e09918': 'Eth',
    'wlp0s20f3': 'WiFi',
}

# Split the table: names mapped to None form the ignore list, and only the
# entries with a label remain in the rename map.
netignorelist = [nic for nic in netrenamemap if netrenamemap[nic] is None]
netrenamemap = {nic: label for nic, label in netrenamemap.items() if label is not None}
# Monitor layout: one inner list per group, in display order. Entries guarded
# by preprocessor conditionals are included only when the condition holds at
# preprocessing time. Ellipsis entries mark absent widgets and are filtered
# out after the list is built.
# NOTE(review): the None entries look like section separators interpreted by
# Group - confirm against common.
functions = [ [ myxmonad
              , None
              , MyTimer (None, alarms = [])
              , MyALSA (None, cards = card1, mixers = mixers1, colours = {'Speaker' : '31'})
%%>if test "$hostname" = zenith; then
              , MyALSA (None, cards = card2, mixers = mixers2, prefix = 'Y.')
%%>fi
              , MyComputer (lambda f : Clocked(f, 20))
              , myscroll
              , None
              , myclock
              , MyCPU (lambda f : Clocked(f, 2))
              , MyMemory (lambda f : Clocked(f, 2))
              , None
              , MyNetwork (lambda f : Clocked(f, 2), ignore = netignorelist, pings = pingthese, renamemap = netrenamemap)
              #, myirc
%%>if test -x /usr/bin/featherweight; then
              , MyNews (None)
%%>fi
              # The weather widget is only created when stations are configured.
              , MyWeather (None, stations = metar_stations) if metar_stations else ...
              #, MySun (None, clock = myclock)
              ]
            , [ myxmonad
              , None
%%>if test "$hostname" = zenith; then
              , MyIPAddress (lambda f : Clocked(f, 20), public = False)
%%>else
              , MyALSA (None, cards = card2, mixers = mixers2, prefix = 'Yeti.')
%%>fi
              , MyMOC (None)
              , myscroll
              , None
              , myclock
              # mylid and mybattery may still be the Ellipsis placeholder.
              , mylid
              , mybattery
              #, myii
              , None
              , MyStat (lambda f : Clocked(f, 10))
%%>if test ! "$hostname" = zenith; then
              , MyCGNegative()
%%>fi
              , MyBrilliance(None)
%%>if test ! "$hostname" = zenith; then
              , MyBacklight (None)
%%>fi
              #, MyIO (lambda f : Clocked(f, 10), fs_ignore = [])
              ]
            ]
# Remove the Ellipsis placeholders from every group.
functions = [[mon for mon in group if mon != ...] for group in functions]
# Build one Group per monitor list and select the first group as current.
G.HEIGHT = 0
G.groups = list(map(Group, functions))
G.groupi = 0
G.group = G.groups[G.groupi]
# Semaphore taken (non-blocking) by the update and redraw routines.
G.semaphore = threading.Semaphore()
def update_per_clock():
    """
    Refresh the clock-driven monitors of every group.

    G.semaphore is taken without blocking; if it is already held, this call
    does nothing. Otherwise every function whose wrapped object is a Clocked
    is called with True, the semaphore is released, and invalidate() is
    called (presumably to request a redraw).
    """
    # NOTE(review): nesting was reconstructed from flattened source;
    # invalidate() is assumed to run only when the semaphore was acquired.
    # Confirm against the original indentation.
    if G.semaphore.acquire(blocking = False):
        try:
            for g in G.groups:
                for f in g.functions:
                    if isinstance(f.wrapped, Clocked):
                        f(True)
        finally:
            # Release even if a monitor raised.
            G.semaphore.release()
        invalidate()
# Keep a reference to the previous start routine so the override can call it.
start_ = start
def start():
    """
    Run the previous start routine, clear the bar, flush the display, and
    hand the two long-running loops to xasync: the clock loop, which drives
    update_per_clock through G.clock.continuous_sync, and the message queue
    listener.
    """
    def clock_loop():
        return G.clock.continuous_sync(t(update_per_clock))

    start_()
    #if myii is not ...:
    #    myii.start()
    bar.clear()
    get_display().flush()
    xasync(clock_loop, name = 'clock')
    xasync(mqueue_wait, name = 'mqueue')
def redraw():
    """
    Redraw the bar with the text of the current group.

    Returns False without drawing when G.semaphore is already held;
    returns True after a completed draw. The functions of the groups that
    are not current are called as well, with their results discarded.
    """
    if not G.semaphore.acquire(blocking = False):
        return False
    try:
        current = G.groups[G.groupi]
        for group in G.groups:
            if group is current:
                continue
            for monitor in group.functions:
                monitor()
        text = current.pattern % tuple(monitor() for monitor in current.functions)
        bar.partial_clear(0, bar.width, 10, 0, 2, text)
        bar.draw_coloured_splitted_text(0, bar.width, 10, 0, 2, text)
    finally:
        # Release even if a monitor or a drawing call raised.
        G.semaphore.release()
    return True
def unhandled_event(e):
    """
    Handle button presses on the bar; other events are ignored here.

    The forward and backward buttons cycle the current group (backward
    steps to the next index, forward to the previous one, wrapping around)
    and call invalidate(). Any other button first updates the positions of
    the current group's posupdate entries, then is offered to each function
    of the current group until one click handler returns a true value.
    """
    if isinstance(e, Xlib.protocol.event.ButtonPress):
        px, py = e.event_x, e.event_y
        # Row and column of the click; the column is measured in font
        # widths from both the left and the right edge of the bar.
        row = py // HEIGHT_PER_LINE
        lcol = px // bar.font_width
        rcol = (bar.width - px) // bar.font_width
        button = e.detail
        if button not in (FORWARD_BUTTON, BACKWARD_BUTTON):
            for monitor in G.group.posupdate:
                monitor.update_position()
            for monitor in G.group.functions:
                handled = monitor.click(row, lcol, rcol, button, px, py)
                if handled:
                    break
        else:
            step = +1 if button == BACKWARD_BUTTON else -1
            G.groupi = (G.groupi + step) % len(G.groups)
            G.group = G.groups[G.groupi]
            invalidate()