From 3e21f6d13c0a70db95fec8b5a71b758223ff4293 Mon Sep 17 00:00:00 2001 From: Mattias Andrée Date: Sat, 26 Jun 2021 13:18:37 +0200 Subject: Add xpybar MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Mattias Andrée --- xpybar/config/mynetwork.py | 205 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 205 insertions(+) create mode 100644 xpybar/config/mynetwork.py (limited to 'xpybar/config/mynetwork.py') diff --git a/xpybar/config/mynetwork.py b/xpybar/config/mynetwork.py new file mode 100644 index 0000000..79cd271 --- /dev/null +++ b/xpybar/config/mynetwork.py @@ -0,0 +1,205 @@ +# -*- python -*- +from plugins.network import Network +from plugins.ping import Ping + +from common import * + +class MyNetwork(Entry): + def __init__(self, *args, limits = None, ignore = None, pings = None, **kwargs): + self.limits = { 'rx_bytes' : None # Download speed in bytes (not bits) + , 'tx_bytes' : None # Upload speed in bytes (not bits) + , 'rx_total' : None # Download cap in bytes + , 'tx_total' : None # Upload cap in bytes + } + if limits is not None: + self.limits = limits + if pings is None: + pings = ['gateway'] + elif isinstance(pings, str) or isinstance(pings, Ping): + pings = [pings] + pings = [(Ping(targets = Ping.get_nics(p)) if isinstance(p, str) else p).monitors for p in pings] + self.pings = {} + for ping in pings: + for nic in ping.keys(): + if nic not in self.pings: + self.pings[nic] = ping[nic] + else: + self.pings[nic] += ping[nic] + self.ignore = ['lo'] if ignore is None else ignore + self.net_time = time.monotonic() + self.net_last = {} + self.show_all = True + self.show_name = True + self.show_value = 0 + self.labels = ['bytes', 'total', 'packets', 'errs', + 'drop', 'fifo', 'frame', 'colls', + 'carrier', 'compressed', 'multicast'] + self.in_bytes = False # in bits if showing total + Entry.__init__(self, *args, **kwargs) + + def action(self, col, button, x, y): + if button == LEFT_BUTTON: + self.show_all = not self.show_all + elif button == MIDDLE_BUTTON: + self.show_name = not self.show_name + elif button == RIGHT_BUTTON: + self.in_bytes = not self.in_bytes + elif button == SCROLL_UP: + n = self.show_value + 1 + if n >= len(self.labels): + return + self.show_value = n + elif button == SCROLL_DOWN: + n = self.show_value - 1 + if self.show_value < 0: + return + self.show_value = n + else: + return + self.invalidate() + + def colourise(self, value, percent): + colour = '39' + if percent > 25: colour = '32' + if percent > 50: colour = '33' + if percent > 90: colour = '31' + return '\033[%sm%3.0f\033[0m' % (colour, value) + + def KBps(self, network, device, direction, net_tdiff): + direction += '_bytes' + value = network[device][direction] + limit = None + if direction in self.limits: + limit = self.limits[direction] + if limit is None: + limit = 12500000 # 100 mbps + if device in self.net_last: + value -= self.net_last[device][direction] + else: + value = 0 + percent = value * 100 / (limit * net_tdiff) + value /= 1024 * net_tdiff + return self.colourise(value, percent) + + def kbps(self, network, device, direction, net_tdiff): + direction += '_bytes' + value = network[device][direction] + limit = None + if direction in self.limits: + limit = self.limits[direction] + if limit is None: + limit = 12500000 # 100 mbps + if device in self.net_last: + value -= self.net_last[device][direction] + else: + value = 0 + percent = value * 100 / (limit * net_tdiff) + value /= 128 * net_tdiff + return self.colourise(value, percent) + + def total(self, network, device, direction, bits): + direction += '_bytes' + value = network[device][direction] + limit = self.limits[direction] if direction in self.limits else None + if limit is not None: + percent = value * 100 / limit + colour = '32' + if percent >= 25: colour = '39' + if percent >= 50: colour = '33' + if percent >= 75: colour = '31' + if percent >= 95: colour = '7;33' + if percent >= 100: colour = '7;31' + else: + colour = '39' + if bits: + value *= 8 + unit = 0 + if bits: + units = ['', 'k', 'm', 'g', 't', 'p', 'e'] + else: + units = ['', 'K', 'M', 'G', 'T', 'P', 'E'] + while (unit + 1 < len(units)) and (value >= 1024): + value /= 1024 + unit += 1 + return '\033[%sm%.1f\033[0m%s' % (colour, value, units[unit]) + + def ping(self, monitor): + monitor.semaphore.acquire() + try: + latency = monitor.get_latency(True)[1] + droptime = monitor.dropped_time(True) + if droptime: + return ' \033[31m%4is\033[00m' % droptime + elif latency is None: + return ' \033[31mdrop?\033[00m' + colour = '31' + if latency < 5: colour = '32' + elif latency < 10: colour = '39' + elif latency < 20: colour = '33' + return ' \033[%sm%5.2f\033[00m' % (colour, latency) + finally: + monitor.semaphore.release() + + def function(self): + net_now = time.monotonic() + net_tdiff, self.net_time = net_now - self.net_time, net_now + network = Network(*self.ignore).devices + label = self.labels[self.show_value] + show_total = label == 'total' + show_bytes = label == 'bytes' + xlabel = 'bytes' if show_total else label + suffix = '' + if show_total and self.in_bytes: + f = lambda dev, dir : self.total(network, dev, dir, True) + 'b' + elif show_total: + f = lambda dev, dir : self.total(network, dev, dir, False) + 'B' + elif show_bytes and self.in_bytes: + f = lambda dev, dir : self.KBps(network, dev, dir, net_tdiff) + 'KB/s' + elif show_bytes: + f = lambda dev, dir : self.kbps(network, dev, dir, net_tdiff) + 'kbps' + else: + f = lambda dev, dir : '%i' % network[dev][dir + '_' + label] + suffix = ' ' + label + def create(lbl, dev): + ret = '' + if lbl is not None: + ret += lbl + ': ' + have_down = ('rx_' + xlabel) in network[dev] + have_up = ('tx_' + xlabel) in network[dev] + if have_down: + ret += f(dev, 'rx') + '↓' + if have_up: + ret += ' ' + if have_up: + ret += f(dev, 'tx') + '↑' + ret += suffix + if show_bytes or show_total: + if dev == '*': + for dev in self.pings.keys(): + for ping in self.pings[dev]: + ret += self.ping(ping) + elif dev in self.pings: + for ping in self.pings[dev]: + ret += self.ping(ping) + return ret + if self.show_all: + if self.show_name: + net = [create(dev, dev) for dev in network] + else: + net = [create(None, dev) for dev in network] + net = (SEPARATOR if self.show_name else ' ').join(net) + else: + devsum = {} + for dev in network.keys(): + table = network[dev] + for key in table.keys(): + if key not in devsum: + devsum[key] = 0 + devsum[key] += table[key] + network['*'] = devsum + if self.show_name: + net = create('Net', '*') + else: + net = create(None, '*') + self.net_last = network + return net -- cgit v1.2.3-70-g09d2