# -*- python -*- import os import sys from plugins.application import Application from plugins.clock import Clock from plugins.cpuinfo import CPUInfo from plugins.hdparm import HDParm from plugins.image import Image from plugins.kmsg import KMsg from plugins.loadavg import AverageLoad from plugins.lunar import Lunar from plugins.menu import Menu from plugins.ropty import ROPTY from plugins.solar import Solar sys.path.append(os.path.dirname(config_file) if '/' in config_file else '.') from common import * G.globals = globals() G.clock = Clock(sync_to = 0.5) %%>if test -x /usr/bin/terminator; then G.TERMINAL = 'terminator' %%>elif test -x /usr/bin/st; then G.TERMINAL = 'st' %%>else G.TERMINAL = 'xterm' %%>fi G.OUTPUT, G.HEIGHT_PER_LINE, G.YPOS, G.TOP = 0, 12, 0, True G.FONT = '-misc-fixed-medium-r-normal-*-10-*-*-*-c-*-iso10646-1' from myalsa import MyALSA from mybacklight import MyBacklight from mybattery import MyBattery from mybrilliance import MyBrilliance from myclock import MyClock from mycomputer import MyComputer from mycpu import MyCPU from myio import MyIO from myipaddress import MyIPAddress from myirc import MyIRC from mylid import MyLid from myleapsec import MyLeapsec from mymemory import MyMemory from mymoc import MyMOC from mynetwork import MyNetwork, Ping from mynews import MyNews from myscroll import MyScroll from mystat import MyStat from mysun import MySun from mytimer import MyTimer from mytop import MyTop from myweather import MyWeather from myxmonad import MyXMonad def mqueue_wait(): import posix_ipc qkey = '/.xpybar.' + os.environ['DISPLAY'].split('.')[0] q = posix_ipc.MessageQueue(qkey, posix_ipc.O_CREAT, 0o600, 8, 128) while True: try: message = q.receive(None)[0].decode('utf-8', 'replace').split(' ') if message[0] in mqueue_map: try: mqueue_map[message[0]](message) except Exception as err: print('%s: %s' % (sys.argv[0], str(err)), file = sys.stderr, flush = True) else: print('%s: unrecognised message: %s' % (sys.argv[0], ' '.join(message)), file = sys.stderr, flush = True) except: time.sleep(1) myxmonad = MyXMonad(None) myscroll = MyScroll(None) myclock = MyClock (lambda f : Clocked(f, 1), format = '%Y-(%m)%b-%d %T, %a w%V, %Z', long_format = '%Y-%m-%d %T') mixers = ['Master', 'PCM'] ## TODO #mixers.append(('Headphone', 'Speaker')) #myii = ... #myirc = ... #from plugins.ii import II #from myii import MyII #def irc_watch(s): # s = s.lower() # return any(map(lambda x : x in s, ['mattias andrée', 'maandree', 'mandree', 'maandre', 'mandre'])) #ii = II('irc.oftc.net/#suckless', prefix = HOME + '/.irc') #myii = MyII(None, channel = 'irc.oftc.net/#suckless', prefix = HOME + '/.irc', watch = irc_watch) #myirc = MyIRC(None, ii = myii) #myii.display = myirc mylid = ... mybattery = ... %%>if test -d /proc/acpi/button/lid; then mylid = MyLid (None) %%>fi %%>if test ! $(ls /sys/class/power_supply/ | wc -l) = 0; then mybattery = MyBattery(None) %%>fi pingthese = [] %%>if test -r ~/.dotfiles/.secrets/ping-"$(hostname | tr '[[:upper:]]' '[[:lower:]]')"; then %%> for address in $(cat ~/.dotfiles/.secrets/ping-"$(hostname | tr '[[:upper:]]' '[[:lower:]]')"); do pingthese.append(Ping(targets = Ping.get_nics('%%{address}'), interval = 30)) %%> done %%>fi try: with open(HOME + '/.config/metar', 'rb') as file: metar_stations = file.read().decode('utf-8', 'strict').split('\n') except: raise try: with open('/etc/metar', 'rb') as file: metar_stations = file.read().decode('utf-8', 'strict').split('\n') except: metar_stations = [] metar_stations = [x[0].upper() + x[1:].lower() for x in metar_stations if x != ''] functions = [ [ myxmonad , None , MyTimer (None, alarms = []) , MyALSA (None, mixers = mixers, colours = {'Speaker' : '31'}) , MyComputer (lambda f : Clocked(f, 20)) , myscroll , None , myclock , MyCPU (lambda f : Clocked(f, 2)) , MyMemory (lambda f : Clocked(f, 2)) , None , MyNetwork (lambda f : Clocked(f, 2), pings = pingthese) #, myirc %%>if test -x /usr/bin/featherweight; then , MyNews (None) %%>fi , MyWeather (None, stations = metar_stations) if metar_stations else ... #, MySun (None, clock = myclock) ] , [ myxmonad , None , MyIPAddress (lambda f : Clocked(f, 20), public = False) , MyMOC (None) , myscroll , None , myclock , mylid , mybattery #, myii , None , MyStat (lambda f : Clocked(f, 10)) , MyBrilliance(None) #, MyBacklight (None) #, MyIO (lambda f : Clocked(f, 10), fs_ignore = []) ] ] functions = [[mon for mon in group if mon != ...] for group in functions] G.HEIGHT = 0 G.groups = [Group(f) for f in functions] G.groupi = 0 G.group = G.groups[G.groupi] G.semaphore = threading.Semaphore() def update_per_clock(): if G.semaphore.acquire(blocking = False): try: for g in G.groups: for f in g.functions: if isinstance(f.wrapped, Clocked): f(True) finally: G.semaphore.release() invalidate() start_ = start def start(): start_() #if myii is not ...: # myii.start() bar.clear() get_display().flush() xasync(lambda : G.clock.continuous_sync(t(update_per_clock)), name = 'clock') xasync(mqueue_wait, name = 'mqueue') def redraw(): if G.semaphore.acquire(blocking = False): try: gr = G.groups[G.groupi] for g in G.groups: if g is not gr: for f in g.functions: f() values = gr.pattern % tuple(f() for f in gr.functions) bar.partial_clear(0, bar.width, 10, 0, 2, values) bar.draw_coloured_splitted_text(0, bar.width, 10, 0, 2, values) finally: G.semaphore.release() return True return False def unhandled_event(e): if isinstance(e, Xlib.protocol.event.ButtonPress): y = e.event_y x = e.event_x row = y // HEIGHT_PER_LINE lcol = x // bar.font_width rcol = (bar.width - x) // bar.font_width button = e.detail if button in (FORWARD_BUTTON, BACKWARD_BUTTON): G.groupi = (G.groupi + (+1 if button == BACKWARD_BUTTON else -1)) % len(G.groups) G.group = G.groups[G.groupi] invalidate() else: for f in G.group.posupdate: f.update_position() for f in G.group.functions: if f.click(row, lcol, rcol, button, x, y): break