# -*- python -*-
from plugins.weather import Weather

from common import *


class MyWeather(Entry):
    """
    Bar entry that displays weather reports for one or more stations.

    The entry renders up to ten clickable segments, always in this order
    (the number is the segment index used by `action`):

        0 station, 1 temperature, 2 wind speed/gusts, 3 wind direction,
        4 wind chill, 5 relative humidity, 6 dew point, 7 pressure,
        8 visibility, 9 sky conditions

    Each segment is controlled by a `show_*` attribute.  Temperatures read
    from the report are treated as °C, wind speeds as knots and visibility
    as miles; they are converted to the selected display units.

    NOTE(review): `Entry`, `Bar`, `Sometimes`, `xasync`, `HOME`, the button
    constants, `threading` and `sys` are not imported explicitly here; they
    are presumably provided by `from common import *` — confirm.
    """

    # Every visibility toggle; the middle mouse button enables all of them.
    _SHOW_FLAGS = ('show_temp', 'show_wind_speed', 'show_wind_gusts',
                   'show_wind_dir', 'show_wind_chill', 'show_humidity',
                   'show_dew', 'show_pressure', 'show_visibility',
                   'show_sky', 'show_station')

    # Segment index -> toggle cleared by a left click on that segment.
    # Segment 2 (wind speed/gusts) is handled separately because it is
    # split in two halves.
    _HIDE_ON_CLICK = {0: 'show_station', 1: 'show_temp', 3: 'show_wind_dir',
                      4: 'show_wind_chill', 5: 'show_humidity', 6: 'show_dew',
                      7: 'show_pressure', 8: 'show_visibility', 9: 'show_sky'}

    # Accepted spellings of the wind units.
    _KNOT_UNITS = ('kn', 'kt', 'knot', 'knots')
    _KMPH_UNITS = ('km/h', 'kmph')
    _FTPS_UNITS = ('ft/s', 'ftps')
    _BEAUFORT_UNITS = ('beaufort', 'beaufort scale', 'bf', 'bt', 'b')

    # Order in which scrolling steps through the wind units.
    _WIND_UNIT_CYCLE = ('m/s', 'mph', 'kn', 'km/h', 'ft/s', 'b')

    # Visibility unit transitions for scrolling up and down.
    _VISIBILITY_UP = {'km': 'm', 'm': 'mi'}
    _VISIBILITY_DOWN = {'mi': 'm', 'mile': 'm', 'm': 'km'}

    # Colour ladders: (lower bound, SGR colour); the last bound that the
    # value reaches wins.
    _TEMP_COLOURS = ((18, '39'), (25, '33'), (30, '31'))
    _WIND_COLOURS = ((1, '39'), (3, '32'), (7, '33'), (10, '31'), (12, '39;41'))
    _HUMIDITY_COLOURS = ((26, '32'), (31, '39'), (37, '33'), (52, '31'), (73, '39;41'))
    _DEW_COLOURS = ((10, '32'), (13, '39'), (16, '33'), (21, '31'), (26, '39;41'))

    def __init__(self, *args, stations = None, fahrenheit = False, wind = 'm/s', from_wind = False, visibility = 'km', **kwargs):
        """
        Constructor

        @param  args        Positional arguments forwarded to `Entry.__init__`
        @param  stations    Iterable of station codes (upper-cased here),
                            or `None` to use `get_metar_station()`
        @param  fahrenheit  Show temperatures in °F rather than °C
        @param  wind        Initial wind speed unit, see `_WIND_UNIT_CYCLE`
                            and the alias tuples above
        @param  from_wind   Show the direction the wind comes from rather
                            than the direction it blows towards
        @param  visibility  Initial visibility unit: 'km', 'm', 'mi' or 'mile'
        @param  kwargs      Keyword arguments forwarded to `Entry.__init__`
        """
        self.semaphore = threading.Semaphore()
        if stations is None:
            self.stations = [MyWeather.get_metar_station()]
        else:
            self.stations = [s.upper() for s in stations]
        self.reports = [None] * len(self.stations)
        self.up_to_date = [False] * len(self.stations)
        self.in_deg_f = fahrenheit
        self.wind_unit = wind
        self.from_wind = from_wind
        self.visibility_unit = visibility
        self.show_temp = True
        self.show_wind_speed = False
        self.show_wind_gusts = False
        self.show_wind_dir = False
        self.show_wind_chill = False
        self.show_humidity = False
        self.show_dew = False
        self.show_pressure = False
        self.show_visibility = False
        self.show_sky = False
        self.show_station = False
        self.station = 0
        self.updated = False
        self.segments = []
        # Column where the gusts half of the wind segment starts, or `None`
        # when no gusts are rendered.  Set here so that `action` never
        # meets a missing attribute; `get_weather` keeps it current.
        self.towards = None
        # NOTE(review): the second argument is presumably the number of
        # `self.refresh()` calls between actual refreshes — confirm
        # against `Sometimes`.
        self.refresh = Sometimes(lambda : xasync(self.refresh_, name = 'weather'), 20 * 60 * 4)
        self.refresh()
        Entry.__init__(self, *args, **kwargs)

    def action(self, col, button, x, y):
        """
        Handle a mouse event on the entry

        Middle button shows every segment, right button forces a refresh,
        left button hides the segment under the pointer, and scrolling
        changes the station or the unit of the segment under the pointer.

        @param  col     Column, relative to the entry, that was clicked
        @param  button  The button, compared against the `*_BUTTON` and
                        `SCROLL_*` constants
        @param  x       Unused
        @param  y       Unused
        """
        if button == MIDDLE_BUTTON:
            for flag in self._SHOW_FLAGS:
                setattr(self, flag, True)
            self.invalidate()
            return
        if button == RIGHT_BUTTON:
            self.refresh.counter = 0
            self.refresh()
            return
        if button not in (LEFT_BUTTON, SCROLL_UP, SCROLL_DOWN):
            return

        index = self._segment_at(col)
        if index is None:
            return
        if button == LEFT_BUTTON:
            self._hide_segment(index, col)
        elif not self._scroll_segment(index, button == SCROLL_UP):
            # Scrolling has no meaning on this segment, nothing to redraw.
            return
        self.invalidate()

    def _segment_at(self, col):
        """Return the index of the segment that covers `col`, or `None`."""
        for index, segment in enumerate(self.segments):
            if segment is None:
                continue
            start, width = segment
            if 0 <= col - start < width:
                return index
        return None

    def _hide_segment(self, index, col):
        """Clear the `show_*` toggle of segment `index` (left click at `col`)."""
        if index == 2:
            # The wind segment is "speed→gusts": hide whichever half was hit.
            if self.towards is None or col < self.towards:
                self.show_wind_speed = False
            else:
                self.show_wind_gusts = False
            return
        flag = self._HIDE_ON_CLICK.get(index)
        if flag is not None:
            setattr(self, flag, False)

    def _scroll_segment(self, index, up):
        """
        Apply a scroll event to segment `index`

        @param   up  `True` for scroll up, `False` for scroll down
        @return      `False` if scrolling does nothing on this segment
        """
        if index == 0: # station
            step = 1 if up else -1
            if 0 <= self.station + step < len(self.stations):
                self.station += step
        elif index in (1, 4, 6): # temperature, wind chill, dew point
            self.in_deg_f = up
        elif index == 2: # wind speed/gusts
            position = self._wind_unit_position() + (1 if up else -1)
            position = max(0, min(position, len(self._WIND_UNIT_CYCLE) - 1))
            self.wind_unit = self._WIND_UNIT_CYCLE[position]
        elif index == 3: # wind direction
            self.from_wind = up
        elif index == 8: # visibility
            table = self._VISIBILITY_UP if up else self._VISIBILITY_DOWN
            self.visibility_unit = table.get(self.visibility_unit, self.visibility_unit)
        else: # relative humidity, pressure, sky conditions
            return False
        return True

    def _wind_unit_position(self):
        """Return the index of the current wind unit in `_WIND_UNIT_CYCLE`."""
        unit = self.wind_unit
        if unit == 'm/s':
            return 0
        if unit == 'mph':
            return 1
        if unit in self._KNOT_UNITS:
            return 2
        if unit in self._KMPH_UNITS:
            return 3
        if unit in self._FTPS_UNITS:
            return 4
        # Anything else is stepped as if it were the Beaufort scale.
        return 5

    def refresh_(self):
        """
        Rebuild the report of every station, then invalidate the entry

        Does nothing if another refresh already holds the semaphore.
        A station whose `Weather` construction fails keeps its previous
        report and is flagged as not up to date.
        """
        if not self.semaphore.acquire(blocking = False):
            return
        try:
            for index, station in enumerate(self.stations):
                try:
                    report = Weather(station)
                except Exception:
                    # Best effort: keep the old report, but flag it as stale.
                    self.up_to_date[index] = False
                else:
                    self.reports[index] = report
                    self.up_to_date[index] = True
        finally:
            # Always release, otherwise no later refresh could ever run.
            self.semaphore.release()
        self.invalidate()

    def invalidate(self):
        """Mark the cached text as outdated and invalidate the entry."""
        self.updated = False
        Entry.invalidate(self)

    @staticmethod
    def _pick_colour(value, default, ladder):
        """Return the colour of the last `ladder` bound reached by `value`."""
        colour = default
        for bound, candidate in ladder:
            if value >= bound:
                colour = candidate
        return colour

    def _temperature_text(self, celsius, colour, prefix = ''):
        """Format a temperature given in °C, in the selected unit."""
        if self.in_deg_f:
            return '%s\033[%sm%.0f\033[0m%s' % (prefix, colour, celsius * 9 / 5 + 32, '°F')
        return '%s\033[%sm%.0f\033[0m%s' % (prefix, colour, celsius, '°C')

    def _convert_wind(self, knot):
        """
        Convert a wind speed in knots to the selected unit

        @return  (value in the selected unit, unit label, Beaufort number
                 rounded to the nearest integer)
        """
        kmph = knot * 1.852 # km/h
        mps = kmph / 3.6    # m/s
        beaufort = (mps / 0.8365) ** (2 / 3) # B
        unit = self.wind_unit
        if unit in self._KNOT_UNITS:
            value, label = knot, 'kn'
        elif unit in self._KMPH_UNITS:
            value, label = kmph, 'km/h'
        elif unit == 'mph':
            value, label = knot * 1.151, 'mph'
        elif unit in self._FTPS_UNITS:
            value, label = mps * 3.281, 'ft/s'
        elif unit.lower() in self._BEAUFORT_UNITS:
            value, label = beaufort, 'B'
        else:
            value, label = mps, 'm/s'
        return value, label, int(beaufort + 0.5)

    def _wind_text(self, report):
        """Format wind speed, and gusts if enabled, or return `None`."""
        if not self.show_wind_speed or report.wind_speed is None:
            return None
        value, unit, beaufort = self._convert_wind(report.wind_speed)
        wind = '\033[%sm%.0f\033[0m' % (self._pick_colour(beaufort, '32', self._WIND_COLOURS), value)
        if self.show_wind_gusts and report.wind_gusts is not None:
            value, _unit, beaufort = self._convert_wind(report.wind_gusts)
            wind += '→\033[%sm%.0f\033[0m' % (self._pick_colour(beaufort, '32', self._WIND_COLOURS), value)
        return wind + unit

    def _direction_name(self, direction):
        """
        Name a wind direction given in degrees, e.g. 'N', 'N30°E' or 'E20°N'

        Unless `self.from_wind` is set, the direction is first turned 180°
        so that it tells where the wind blows towards.
        """
        if not self.from_wind:
            direction += 180
        direction %= 360
        cardinals = 'NESW'
        # `min` guards against a float modulo that rounds up to exactly 360.
        quadrant = min(int(direction // 90), 3)
        offset = direction - 90 * quadrant
        near = cardinals[quadrant]
        far = cardinals[(quadrant + 1) % 4]
        if offset == 0:
            return near
        if offset <= 45:
            return '%s%.0f°%s' % (near, offset, far)
        return '%s%.0f°%s' % (far, 90 - offset, near)

    def _wind_direction_text(self, report):
        """Format the wind direction, or direction range, or return `None`."""
        if not self.show_wind_dir:
            return None
        if report.wind_var is not None:
            wind = self._direction_name(report.wind_var[0]) + '-' + self._direction_name(report.wind_var[1])
        elif report.wind_dir is not None:
            wind = self._direction_name(report.wind_dir)
        else:
            return None
        # The arrow points away from a source and into a destination.
        return wind + '→' if self.from_wind else '→' + wind

    def _wind_chill_text(self, report):
        """Format the wind chill, or return `None`."""
        if not self.show_wind_chill or report.wind_chill is None:
            return None
        value = report.wind_chill
        # Coloured by frostbite time, however the wind is not taken
        # into account so the colour is approximate
        colour = '39'
        if value <= -28:
            colour = '33'
        if value <= -37:
            colour = '31'
        if value <= -46:
            colour = '39;41'
        return self._temperature_text(value, colour, 'w')

    def _visibility_text(self, report):
        """Format the visibility in the selected unit, or return `None`."""
        if not self.show_visibility or report.visibility is None:
            return None
        mile = report.visibility
        if self.visibility_unit in ('mile', 'mi'):
            value, unit = mile, 'mi'
        elif self.visibility_unit == 'm':
            value, unit = mile * 1609.344, 'm'
        else:
            value, unit = mile * 1.609344, 'km'
        return '%.0f%s' % (value, unit)

    def get_weather(self, index):
        """
        Render the report of one station and record the segment layout

        Updates `self.segments` (one `(start column, width)` pair or `None`
        per segment) and `self.towards`, unless the station has no report.

        @param   index  Index of the station in `self.stations`
        @return         The text to display, with ANSI colour sequences
        """
        station = self.stations[index]
        report = self.reports[index]
        up_to_date = self.up_to_date[index]
        # Slicing rather than indexing: an empty station name must not raise.
        station = station[:1] + station[1:].lower()

        if report is None:
            return '%s: ?' % station if self.show_station else '?'

        # One slot per segment 1–9, `None` for segments that are not shown.
        pieces = []

        if self.show_temp and report.temp is not None:
            colour = '39;44' if report.temp < -10 else '34'
            colour = self._pick_colour(report.temp, colour, self._TEMP_COLOURS)
            pieces.append(self._temperature_text(report.temp, colour))
        else:
            pieces.append(None)

        pieces.append(self._wind_text(report))
        pieces.append(self._wind_direction_text(report))
        pieces.append(self._wind_chill_text(report))

        if self.show_humidity and report.humidity is not None:
            colour = self._pick_colour(report.humidity, '34', self._HUMIDITY_COLOURS)
            pieces.append('\033[%sm%.0f\033[0m%%' % (colour, report.humidity))
        else:
            pieces.append(None)

        if self.show_dew and report.dew is not None:
            colour = self._pick_colour(report.dew, '34', self._DEW_COLOURS)
            pieces.append(self._temperature_text(report.dew, colour, 'd'))
        else:
            pieces.append(None)

        if self.show_pressure and report.pressure is not None:
            pieces.append('%.0fhPa' % report.pressure)
        else:
            pieces.append(None)

        pieces.append(self._visibility_text(report))

        sky = report.fields.get('Sky conditions') if self.show_sky and 'Sky conditions' in report.fields else None
        pieces.append(None if sky is None or sky == '' else sky)

        # Lay out the segments.  The text is "Station: a b c" when the
        # station is shown and "a b c" otherwise, so the first piece starts
        # right after "Station: " and each later piece one blank further on.
        if self.show_station:
            segments = [(0, len(station))]
            column = len(station) + 2
        else:
            segments = [None]
            column = 0
        self.towards = None
        for i, piece in enumerate(pieces):
            if piece is None:
                segments.append(None)
                continue
            if i == 1 and '→' in piece:
                # Column of the arrow: everything before it is wind speed,
                # the arrow and everything after it is gusts.
                self.towards = column + Bar.coloured_length(piece.split('→')[0])
            width = Bar.coloured_length(piece)
            segments.append((column, width))
            column += width + 1
        self.segments = segments

        shown = [piece for piece in pieces if piece is not None]
        if len(shown) == 0:
            return station
        text = ' '.join(shown)
        if not up_to_date:
            # Flag stale data; attached directly to a lone value.
            text += '?' if len(shown) <= 1 else ' ?'
        if self.show_station:
            text = '%s: %s' % (station, text)
        return text

    def function(self):
        """
        Return the text to display, re-rendering it only after invalidation

        Also ticks `self.refresh` once per call.
        """
        if not self.updated:
            self.updated = True
            self.text = self.get_weather(self.station)
        rc = self.text
        self.refresh()
        return rc

    @staticmethod
    def get_metar_station():
        """
        Read the default station code from the configuration

        The first line of ~/.config/metar is used, falling back to
        /etc/metar and finally to 'ESSA' (with a warning on stderr).

        @return  The station code, upper-cased, without surrounding whitespace
        """
        try:
            with open(HOME + '/.config/metar', 'rb') as file:
                station = file.read()
        except Exception:
            try:
                with open('/etc/metar', 'rb') as file:
                    station = file.read()
            except Exception:
                print('~/.config/metar is not set', file = sys.stderr, flush = True)
                station = b'ESSA'
        # Strip so that a CRLF file or trailing blanks do not end up in the code.
        return station.decode('utf-8', 'strict').split('\n')[0].strip().upper()