# -*- python -*-

# A moderate xpybar configuration example that has a few monitors that are
# updated continuously.
#
# This file is not a standalone program: it relies on names that the host
# application puts in scope before running it (`Sometimes`, `start`, `bar`,
# `get_display` and a background-call helper).

import os
import threading
import time

from plugins.clock import Clock
from plugins.cpu import CPU
from plugins.mem import Memory
from plugins.network import Network
from plugins.users import Users
from plugins.pacman import Pacman
from plugins.uname import Uname
from plugins.weather import Weather
from plugins.chase import Chase


# `async` has been a reserved word since Python 3.7, so spelling the call as
# `async(...)` is a syntax error on every current interpreter.  Fetch the
# background-call helper by name instead; on a host that still provides
# `async` this resolves to exactly the same function as before.
# NOTE(review): newer xpybar releases appear to expose the helper as `xasync`
# -- confirm against the installed version.
_spawn = globals().get('xasync') or globals()['async']


# Bar placement settings; presumably read by the host -- TODO confirm meaning.
OUTPUT, YPOS, TOP = 0, 24, True

clock_ = Clock(format='%Y-(%m)%b-%d %T, %a w%V, %Z', sync_to=0.5)


def memory():
    """Return the 'Mem │ Swp │ Shm' segment with colour-coded percentages.

    A percentage whose denominator is zero (for example swap on a machine
    without any swap space) is shown as ``--%`` instead of raising
    ``ZeroDivisionError``.
    """
    stats = Memory()

    def percent(part, whole):
        # None marks "undefined"; colourise() renders it as a placeholder.
        return None if whole == 0 else part * 100 / whole

    def colourise(value, fmt='%.0f'):
        if value is None:
            return '--%'
        colour = '39'
        if value > 30:
            colour = '32'
        if value > 50:
            colour = '33'
        if value > 80:
            colour = '31'
        return '\033[%sm%s\033[0m%%' % (colour, fmt % value)

    mem = 'Mem: ' + colourise(percent(stats.mem_used, stats.mem_total))
    swp = 'Swp: ' + colourise(percent(stats.swap_used, stats.swap_total))
    shm = 'Shm: ' + colourise(percent(stats.shmem, stats.mem_total))
    return '%s │ %s │ %s' % (mem, swp, shm)


def users():
    """Return the 'Users: name{count} ...' segment.

    ``root`` is drawn in red, every user other than the one named by the
    ``USER`` environment variable in yellow, and that user uncoloured.
    """
    logins = Users().users
    # Tolerate an unset USER: every non-root login is then treated as foreign.
    you = os.environ.get('USER')

    def template_for(user):
        if user == 'root':
            return '\033[31m%s\033[39m'
        if user != you:
            return '\033[33m%s\033[39m'
        return '%s'

    entries = ['%s{%i}' % (template_for(name) % name, len(sessions))
               for name, sessions in logins.items()]
    return 'Users: %s' % ' '.join(entries)


# State of the kernel-package lookup used by uname():
#   have_pacman is None  -> not probed yet, or the last query failed (retry)
#   have_pacman is False -> neither kernel package could be queried; give up
#   have_pacman is True  -> linux_installed and linux_latest are usable
have_linux_libre, have_pacman = True, None
linux_installed, linux_latest = None, None


def uname():
    """Return 'nodename kernel-release operating-system'.

    When the kernel package can be queried through the Pacman plugin, the
    version part of the kernel release is coloured: green when the running
    kernel equals the installed one and that is the latest, yellow when it
    equals the installed one but a newer one exists, red when it differs
    from the installed one and a newer one exists.
    """
    global have_linux_libre, have_pacman, linux_installed, linux_latest

    if have_pacman is None:
        # Probe from scratch so a retry does not inherit a stale package choice.
        have_linux_libre = True
        try:
            linux_installed = Pacman('linux-libre', True)
        except Exception:
            have_linux_libre = False
            try:
                linux_installed = Pacman('linux', True)
            except Exception:
                have_pacman = False
        if have_pacman is None:
            package = 'linux-libre' if have_linux_libre else 'linux'
            try:
                linux_latest = Pacman(package, False)
            except Exception:
                have_pacman = None  # try again on the next call
            else:
                # The flag was previously never set to True, which left all
                # of the colouring logic below unreachable.
                have_pacman = True
    elif have_pacman:
        package = 'linux-libre' if have_linux_libre else 'linux'
        try:
            linux_installed = Pacman(package, True)
            linux_latest = Pacman(package, False)
        except Exception:
            have_pacman = None

    system = Uname()
    release = system.kernel_release

    if have_pacman:
        # The first two dash-separated fields are compared with the package
        # version; whatever follows is kept as an uncoloured suffix.
        fields = release.split('-')
        running = '-'.join(fields[:2])
        suffix = ''.join('-' + field for field in fields[2:])
        installed = linux_installed.version
        latest = linux_latest.version
        if installed == latest:
            colour = '32' if running == installed else None
        else:
            colour = '33' if running == installed else '31'
        if colour is not None:
            running = '\033[%sm%s\033[39m' % (colour, running)
        release = running + suffix

    return '%s %s %s' % (system.nodename, release, system.operating_system)


# Timestamp and device counters of the previous network() call.
net_time = time.monotonic()
net_last = {}


def network():
    """Return per-device throughput since the previous call.

    Each device is rendered as 'dev: <rx>kbps↓ <tx>kbps↑'.  A device that was
    not present in the previous sample, or a sample taken with no measurable
    time elapsed, is reported as 0.
    """
    global net_time, net_last

    now = time.monotonic()
    elapsed, net_time = now - net_time, now
    # NOTE(review): the 'lo' argument presumably excludes the loopback
    # device -- confirm against the Network plugin.
    devices = Network('lo').devices

    def colourise(value):
        colour = '39'
        if value > 40:
            colour = '32'
        if value > 8000:
            colour = '33'
        if value > 60000:
            colour = '31'
        return '\033[%sm%3.0f\033[0m' % (colour, value)

    def kbps(device, direction):
        key = direction + '_bytes'
        if device not in net_last or elapsed <= 0:
            return colourise(0)
        delta = devices[device][key] - net_last[device][key]
        # bytes / 128 == bytes * 8 / 1024, i.e. kilobits
        return colourise(delta / (128 * elapsed))

    segments = ['%s: %skbps↓ %skbps↑' % (dev, kbps(dev, 'rx'), kbps(dev, 'tx'))
                for dev in devices]
    net_last = devices
    return '%s' % ' '.join(segments)


# Idle/total counters of the previous cpu() call (aggregate and per CPU).
last_cpu_idle, last_cpu_total = 0, 0
last_cpus_idle, last_cpus_total = [], []


def cpu():
    """Return 'Cpu: <per-cpu usage ...> : <aggregate usage>'.

    Usage is derived from the change in the idle and total counters since
    the previous call; ``--%`` is shown when the total did not change.
    """
    global last_cpus_idle, last_cpus_total, last_cpu_idle, last_cpu_total

    sample = CPU()
    now_cpu_idle = sample.cpu[CPU.idle]
    now_cpu_total = sum(sample.cpu)
    now_cpus_idle = [core[CPU.idle] for core in sample.cpus]
    now_cpus_total = [sum(core) for core in sample.cpus]

    def cpu_usage(now_idle, now_total, last_idle, last_total):
        total = now_total - last_total
        idle = now_idle - last_idle
        return None if total == 0 else (total - idle) * 100 / total

    def colourise(value):
        if value is None:
            return '--%'
        colour = '39'
        if value >= 5:
            colour = '32'
        if value >= 50:
            colour = '33'
        if value >= 90:
            colour = '31'
        return '\033[%sm%2.0f\033[0m%%' % (colour, value)

    # CPUs that were not in the previous sample get their current counters as
    # the baseline, so they show '--%' rather than being dropped by zip().
    known = len(last_cpus_idle)
    if len(now_cpus_idle) > known:
        last_cpus_idle += now_cpus_idle[known:]
        last_cpus_total += now_cpus_total[len(last_cpus_total):]

    per_cpu = ' '.join(
        colourise(cpu_usage(*counters))
        for counters in zip(now_cpus_idle, now_cpus_total,
                            last_cpus_idle, last_cpus_total))
    overall = colourise(cpu_usage(now_cpu_idle, now_cpu_total,
                                  last_cpu_idle, last_cpu_total))

    last_cpus_idle, last_cpus_total = now_cpus_idle, now_cpus_total
    last_cpu_idle, last_cpu_total = now_cpu_idle, now_cpu_total
    return 'Cpu: %s : %s' % (per_cpu, overall)


# Most recent weather text; a trailing '?' marks it as missing or stale.
weather_last = '?'
weather_semaphore = threading.Semaphore()


def weather_update_():
    """Fetch the temperature for station 'ESSA' and store it in weather_last.

    Does nothing if another update is already in progress.  On failure the
    previous text is kept and marked with a trailing '?'.
    """
    global weather_last

    if not weather_semaphore.acquire(blocking=False):
        return
    try:
        temperature = Weather('ESSA').temp
        colour = '34'
        if temperature < -10:
            colour = '39;44'
        if temperature >= 18:
            colour = '39'
        if temperature >= 25:
            colour = '33'
        if temperature >= 30:
            colour = '31'
        weather_last = '\033[%sm%.0f\033[0m°C' % (colour, temperature)
    except Exception:
        # Best effort: keep showing the last known value, flagged as stale.
        if not weather_last.endswith('?'):
            weather_last += '?'
    finally:
        weather_semaphore.release()


# NOTE(review): `Sometimes` is supplied by the host; from its use here it
# pairs a callable with an interval.  The intervals are written as
# seconds * 2, presumably because the bar is refreshed twice per second
# (sync_to = 0.5) -- confirm.
weather_update = Sometimes(lambda: _spawn(weather_update_), 20 * 60 * 2)


def weather():
    """Return the last known weather text and trigger the throttled update."""
    current = weather_last
    weather_update()
    return current


chase_ = Chase()
chase_update = Sometimes(lambda: _spawn(chase_.update), 2 * 60 * 60 * 2)


def chase():
    """Return 'Chase' coloured by status: default when the status is None,
    green when it is true, red when it is false."""
    status = chase_.status
    if status is None:
        colour = '39'
    elif status:
        colour = '32'
    else:
        colour = '31'
    chase_update()
    return '\033[%smChase\033[0m' % colour


# One callable per placeholder in `pattern`, in the same order.
functions = [
    Sometimes(lambda: clock_.read(), 1 * 2),
    lambda: time.time() % 1,
    Sometimes(users, 1 * 2),
    weather,
    chase,
    cpu,
    memory,
    network,
    Sometimes(uname, 30 * 60 * 2),
]

# One format string per line of the bar; '}{' is replaced with a NUL
# character below.
pattern = [
    '%s │ %.2f │ %s │ %s │ %s }{ %s │ %s │ %s │ %s',
]


# Keep the host-provided start() so the override below can call it.
start_ = start


def start():
    """Run the original start() and then begin redrawing in step with the clock."""
    start_()

    def refresh():
        if redraw():
            get_display().flush()

    _spawn(lambda: clock_.continuous_sync(refresh))


HEIGHT = len(pattern) * 12
pattern = '\n'.join([line.replace('}{', '\0') for line in pattern])

semaphore = threading.Semaphore()


def redraw():
    """Evaluate all monitors and draw the bar.

    Returns True if the bar was drawn and False if another redraw was already
    in progress.  The semaphore is released even when a monitor raises, so a
    single failure cannot block every later redraw.
    """
    if not semaphore.acquire(blocking=False):
        return False
    try:
        values = pattern % tuple(monitor() for monitor in functions)
        bar.clear()
        # Debugging aid: print(values.replace('\0', ' ' * 8))
        bar.draw_coloured_splitted_text(0, bar.width, 10, 0, 2, values)
    finally:
        semaphore.release()
    return True