# -*- python -*- from plugins.network import Network from plugins.ping import Ping from common import * class MyNetwork(Entry): def __init__(self, *args, limits = None, ignore = None, pings = None, **kwargs): self.limits = { 'rx_bytes' : None # Download speed in bytes (not bits) , 'tx_bytes' : None # Upload speed in bytes (not bits) , 'rx_total' : None # Download cap in bytes , 'tx_total' : None # Upload cap in bytes } if limits is not None: self.limits = limits if pings is None: pings = ['gateway'] elif isinstance(pings, str) or isinstance(pings, Ping): pings = [pings] pings = [(Ping(targets = Ping.get_nics(p)) if isinstance(p, str) else p).monitors for p in pings] self.pings = {} for ping in pings: for nic in ping.keys(): if nic not in self.pings: self.pings[nic] = ping[nic] else: self.pings[nic] += ping[nic] self.ignore = ['lo'] if ignore is None else ignore self.net_time = time.monotonic() self.net_last = {} self.show_all = True self.show_name = True self.show_value = 0 self.labels = ['bytes', 'total', 'packets', 'errs', 'drop', 'fifo', 'frame', 'colls', 'carrier', 'compressed', 'multicast'] self.in_bytes = False # in bits if showing total Entry.__init__(self, *args, **kwargs) def action(self, col, button, x, y): if button == LEFT_BUTTON: self.show_all = not self.show_all elif button == MIDDLE_BUTTON: self.show_name = not self.show_name elif button == RIGHT_BUTTON: self.in_bytes = not self.in_bytes elif button == SCROLL_UP: n = self.show_value + 1 if n >= len(self.labels): return self.show_value = n elif button == SCROLL_DOWN: n = self.show_value - 1 if self.show_value < 0: return self.show_value = n else: return self.invalidate() def colourise(self, value, percent): colour = '39' if percent > 25: colour = '32' if percent > 50: colour = '33' if percent > 90: colour = '31' return '\033[%sm%3.0f\033[0m' % (colour, value) def KBps(self, network, device, direction, net_tdiff): direction += '_bytes' value = network[device][direction] limit = None if direction in self.limits: limit = self.limits[direction] if limit is None: limit = 12500000 # 100 mbps if device in self.net_last: value -= self.net_last[device][direction] else: value = 0 percent = value * 100 / (limit * net_tdiff) value /= 1024 * net_tdiff return self.colourise(value, percent) def kbps(self, network, device, direction, net_tdiff): direction += '_bytes' value = network[device][direction] limit = None if direction in self.limits: limit = self.limits[direction] if limit is None: limit = 12500000 # 100 mbps if device in self.net_last: value -= self.net_last[device][direction] else: value = 0 percent = value * 100 / (limit * net_tdiff) value /= 128 * net_tdiff return self.colourise(value, percent) def total(self, network, device, direction, bits): direction += '_bytes' value = network[device][direction] limit = self.limits[direction] if direction in self.limits else None if limit is not None: percent = value * 100 / limit colour = '32' if percent >= 25: colour = '39' if percent >= 50: colour = '33' if percent >= 75: colour = '31' if percent >= 95: colour = '7;33' if percent >= 100: colour = '7;31' else: colour = '39' if bits: value *= 8 unit = 0 if bits: units = ['', 'k', 'm', 'g', 't', 'p', 'e'] else: units = ['', 'K', 'M', 'G', 'T', 'P', 'E'] while (unit + 1 < len(units)) and (value >= 1024): value /= 1024 unit += 1 return '\033[%sm%.1f\033[0m%s' % (colour, value, units[unit]) def ping(self, monitor): monitor.semaphore.acquire() try: latency = monitor.get_latency(True)[1] droptime = monitor.dropped_time(True) if droptime: return ' \033[31m%4is\033[00m' % droptime elif latency is None: return ' \033[31mdrop?\033[00m' colour = '31' if latency < 5: colour = '32' elif latency < 10: colour = '39' elif latency < 20: colour = '33' return ' \033[%sm%5.2f\033[00m' % (colour, latency) finally: monitor.semaphore.release() def function(self): net_now = time.monotonic() net_tdiff, self.net_time = net_now - self.net_time, net_now network = Network(*self.ignore).devices label = self.labels[self.show_value] show_total = label == 'total' show_bytes = label == 'bytes' xlabel = 'bytes' if show_total else label suffix = '' if show_total and self.in_bytes: f = lambda dev, dir : self.total(network, dev, dir, True) + 'b' elif show_total: f = lambda dev, dir : self.total(network, dev, dir, False) + 'B' elif show_bytes and self.in_bytes: f = lambda dev, dir : self.KBps(network, dev, dir, net_tdiff) + 'KB/s' elif show_bytes: f = lambda dev, dir : self.kbps(network, dev, dir, net_tdiff) + 'kbps' else: f = lambda dev, dir : '%i' % network[dev][dir + '_' + label] suffix = ' ' + label def create(lbl, dev): ret = '' if lbl is not None: ret += lbl + ': ' have_down = ('rx_' + xlabel) in network[dev] have_up = ('tx_' + xlabel) in network[dev] if have_down: ret += f(dev, 'rx') + '↓' if have_up: ret += ' ' if have_up: ret += f(dev, 'tx') + '↑' ret += suffix if show_bytes or show_total: if dev == '*': for dev in self.pings.keys(): for ping in self.pings[dev]: ret += self.ping(ping) elif dev in self.pings: for ping in self.pings[dev]: ret += self.ping(ping) return ret if self.show_all: if self.show_name: net = [create(dev, dev) for dev in network] else: net = [create(None, dev) for dev in network] net = (SEPARATOR if self.show_name else ' ').join(net) else: devsum = {} for dev in network.keys(): table = network[dev] for key in table.keys(): if key not in devsum: devsum[key] = 0 devsum[key] += table[key] network['*'] = devsum if self.show_name: net = create('Net', '*') else: net = create(None, '*') self.net_last = network return net