# -*- python -*-
from plugins.weather import Weather
from common import *
class MyWeather(Entry):
    """
    Bar entry that shows a weather report for one of several stations.

    The rendered text is made of up to ten clickable segments, in this
    order: station, temperature, wind speed (with optional gusts), wind
    direction, wind chill, relative humidity, dew point, pressure,
    visibility and sky conditions.

    Mouse interaction (see `action`):
      * middle button:  show every segment
      * right button:   force a refresh of the reports
      * left button:    hide the clicked segment
      * scroll up/down: change station or unit for the segment under the pointer

    Report values are used as follows: temperatures are treated as °C,
    wind speeds as knots and visibility as miles, and are converted to
    the selected display units.
    """

    # `show_*` attribute behind each entry of `self.segments`, by segment
    # index. Index 2 (wind) is None because it is governed by two flags,
    # `show_wind_speed` and `show_wind_gusts`.
    _SEGMENT_FLAGS = ('show_station', 'show_temp', None, 'show_wind_dir',
                      'show_wind_chill', 'show_humidity', 'show_dew',
                      'show_pressure', 'show_visibility', 'show_sky')

    # Every visibility flag; all are switched on by the middle button.
    _ALL_FLAGS = ('show_temp', 'show_wind_speed', 'show_wind_gusts',
                  'show_wind_dir', 'show_wind_chill', 'show_humidity',
                  'show_dew', 'show_pressure', 'show_visibility',
                  'show_sky', 'show_station')

    # Order in which scrolling steps through the wind speed units, and the
    # position of every accepted spelling in that order. Spellings that are
    # not listed are treated as the last position (Beaufort).
    _WIND_CYCLE = ('m/s', 'mph', 'kn', 'km/h', 'ft/s', 'b')
    _WIND_POSITION = {'m/s': 0, 'mph': 1,
                      'kn': 2, 'kt': 2, 'knot': 2, 'knots': 2,
                      'km/h': 3, 'kmph': 3,
                      'ft/s': 4, 'ftps': 4}

    # Visibility unit transitions for scroll up and scroll down; units that
    # are not listed are left unchanged.
    _VISIBILITY_UP = {'km': 'm', 'm': 'mi'}
    _VISIBILITY_DOWN = {'mi': 'm', 'mile': 'm', 'm': 'km'}

    # (threshold, colour) steps, ascending; the last threshold that the
    # value reaches decides the colour (see `_colour`).
    _TEMP_COLOURS = ((18, '39'), (25, '33'), (30, '31'))
    _BEAUFORT_COLOURS = ((1, '39'), (3, '32'), (7, '33'), (10, '31'), (12, '39;41'))
    _HUMIDITY_COLOURS = ((26, '32'), (31, '39'), (37, '33'), (52, '31'), (73, '39;41'))
    _DEW_COLOURS = ((10, '32'), (13, '39'), (16, '33'), (21, '31'), (26, '39;41'))

    def __init__(self, *args, stations = None, fahrenheit = False, wind = 'm/s', from_wind = False, visibility = 'km', **kwargs):
        """
        Set up the entry and start the first report download.

        :param args:        Positional arguments forwarded to `Entry.__init__`
        :param stations:    Station identifiers; if None, the single station
                            returned by `get_metar_station` is used
        :param fahrenheit:  Show temperatures in °F rather than °C
        :param wind:        Wind speed unit: 'm/s', 'mph', 'kn', 'km/h',
                            'ft/s' or 'b' (Beaufort), or an accepted alias
        :param from_wind:   Show the direction the wind comes from rather
                            than the direction it blows towards
        :param visibility:  Visibility unit: 'km', 'm' or 'mi'
        :param kwargs:      Keyword arguments forwarded to `Entry.__init__`
        """
        self.semaphore = threading.Semaphore()
        self.stations = [MyWeather.get_metar_station()] if stations is None else [s.upper() for s in stations]
        self.reports = [None] * len(self.stations)
        self.up_to_date = [False] * len(self.stations)
        self.in_deg_f = fahrenheit
        self.wind_unit = wind
        self.from_wind = from_wind
        self.visibility_unit = visibility
        # Only the temperature is visible until the user asks for more
        self.show_temp = True
        self.show_wind_speed = False
        self.show_wind_gusts = False
        self.show_wind_dir = False
        self.show_wind_chill = False
        self.show_humidity = False
        self.show_dew = False
        self.show_pressure = False
        self.show_visibility = False
        self.show_sky = False
        self.show_station = False
        self.station = 0
        self.updated = False
        self.segments = []
        # Column of the speed/gusts separator; set here so that `action`
        # can never read it before `get_weather` has assigned it
        self.towards = None
        # NOTE(review): `Sometimes` and `xasync` are not defined in this file;
        # presumably this rate-limits the download and runs it in the
        # background — confirm against `common`
        self.refresh = Sometimes(lambda : xasync(self.refresh_, name = 'weather'), 20 * 60 * 4)
        self.refresh()
        Entry.__init__(self, *args, **kwargs)

    def action(self, col, button, x, y):
        """
        React to a mouse event on the entry.

        :param col:     Column of the event, compared with the offsets in `self.segments`
        :param button:  The button, one of the `*_BUTTON`/`SCROLL_*` constants
        :param x:       Unused
        :param y:       Unused
        """
        if button == MIDDLE_BUTTON:
            for flag in self._ALL_FLAGS:
                setattr(self, flag, True)
            self.invalidate()
        elif button == RIGHT_BUTTON:
            # Reset the counter so that the refresh is not postponed
            self.refresh.counter = 0
            self.refresh()
        elif button in (LEFT_BUTTON, SCROLL_UP, SCROLL_DOWN):
            index = self._segment_at(col)
            if index is None:
                return
            if button == LEFT_BUTTON:
                self._hide_segment(index, col)
            elif not self._scroll_segment(index, button == SCROLL_UP):
                return
            self.invalidate()

    def _segment_at(self, col):
        """Return the index of the visible segment covering column `col`, or None."""
        for index, segment in enumerate(self.segments):
            if segment is None:
                continue
            (start, width) = segment
            if 0 <= col - start < width:
                return index
        return None

    def _hide_segment(self, index, col):
        """Hide segment `index`; within the wind segment `col` selects speed or gusts."""
        if index == 2:
            # Left of the arrow (or no arrow at all): the speed was clicked
            if self.towards is None or col < self.towards:
                self.show_wind_speed = False
            else:
                self.show_wind_gusts = False
        else:
            setattr(self, self._SEGMENT_FLAGS[index], False)

    def _scroll_segment(self, index, up):
        """
        Apply a scroll event to segment `index`.

        :param index:  Index of the segment in `self.segments`
        :param up:     True for scroll up, False for scroll down
        :return:       False if scrolling has no meaning for the segment
                       and the text therefore need not be redrawn
        """
        if index == 0: # station
            if up:
                if self.station + 1 < len(self.stations):
                    self.station += 1
            elif self.station > 0:
                self.station -= 1
        elif index in (1, 4, 6): # temperature, wind chill, dew point
            self.in_deg_f = up
        elif index == 2: # wind speed/gusts
            last = len(self._WIND_CYCLE) - 1
            position = self._WIND_POSITION.get(self.wind_unit, last)
            position = min(position + 1, last) if up else max(position - 1, 0)
            self.wind_unit = self._WIND_CYCLE[position]
        elif index == 3: # wind direction
            self.from_wind = up
        elif index == 8: # visibility
            table = self._VISIBILITY_UP if up else self._VISIBILITY_DOWN
            self.visibility_unit = table.get(self.visibility_unit, self.visibility_unit)
        else: # relative humidity, pressure, sky conditions
            return False
        return True

    def refresh_(self):
        """
        Download a new report for every station and redraw the entry.

        Does nothing if another refresh is already in progress. A station
        whose download fails keeps its previous report but is flagged as
        not up to date.
        """
        if not self.semaphore.acquire(blocking = False):
            return
        try:
            for index, station in enumerate(self.stations):
                try:
                    self.reports[index] = Weather(station)
                    self.up_to_date[index] = True
                except Exception:
                    # Best effort: keep the old report, but mark it as stale
                    self.up_to_date[index] = False
        finally:
            self.semaphore.release()
        self.invalidate()

    def invalidate(self):
        """Mark the cached text as outdated and invalidate the entry."""
        self.updated = False
        Entry.invalidate(self)

    @staticmethod
    def _colour(value, default, steps):
        """Return the colour of the last `(threshold, colour)` in `steps` with `value >= threshold`, else `default`."""
        colour = default
        for threshold, code in steps:
            if value >= threshold:
                colour = code
        return colour

    def _format_temperature(self, celsius, colour, prefix = ''):
        """Format a temperature given in °C in the selected scale, coloured with `colour`."""
        value = celsius * 9 / 5 + 32 if self.in_deg_f else celsius
        return '%s\033[%sm%.0f\033[0m%s' % (prefix, colour, value, '°F' if self.in_deg_f else '°C')

    def _convert_wind(self, knot):
        """
        Convert a wind speed given in knots into the selected unit.

        :param knot:  The wind speed, in knots
        :return:      Tuple of the converted value, the unit label and the
                      colour, where the colour is chosen from the speed on
                      the Beaufort scale
        """
        kmph = knot * 1.852
        mps = kmph / 3.6
        beaufort = (mps / 0.8365) ** (2 / 3)
        unit = self.wind_unit
        if unit in ('kn', 'kt', 'knot', 'knots'):
            value, label = knot, 'kn'
        elif unit in ('km/h', 'kmph'):
            value, label = kmph, 'km/h'
        elif unit == 'mph':
            value, label = knot * 1.151, 'mph'
        elif unit in ('ft/s', 'ftps'):
            value, label = mps * 3.281, 'ft/s'
        elif unit.lower() in ('beaufort', 'beaufort scale', 'bf', 'bt', 'b'):
            value, label = beaufort, 'B'
        else:
            value, label = mps, 'm/s'
        colour = self._colour(int(beaufort + 0.5), '32', self._BEAUFORT_COLOURS)
        return value, label, colour

    def _wind_direction(self, direction):
        """
        Format a wind direction as an offset from the nearest cardinal point.

        :param direction:  Direction the wind comes from, in degrees
                           clockwise from north
        :return:           For instance 'N', 'N30°E' or 'E10°N'
        """
        if not self.from_wind:
            direction += 180
        direction %= 360
        # min() guards against a float that the modulo rounded up to 360
        quadrant = min(int(direction // 90), 3)
        offset = direction - 90 * quadrant
        here = 'NESW'[quadrant]
        there = 'NESW'[(quadrant + 1) % 4]
        if offset == 0:
            return here
        if offset <= 45:
            return '%s%.0f°%s' % (here, offset, there)
        return '%s%.0f°%s' % (there, 90 - offset, here)

    def _temperature_text(self, report):
        """Return the temperature segment, or None if hidden or unavailable."""
        if not self.show_temp or report.temp is None:
            return None
        value = report.temp
        colour = '39;44' if value < -10 else self._colour(value, '34', self._TEMP_COLOURS)
        return self._format_temperature(value, colour)

    def _wind_speed_text(self, report):
        """Return the wind speed segment, with gusts if enabled, or None if hidden or unavailable."""
        if not self.show_wind_speed or report.wind_speed is None:
            return None
        value, unit, colour = self._convert_wind(report.wind_speed)
        wind = '\033[%sm%.0f\033[0m' % (colour, value)
        if self.show_wind_gusts and report.wind_gusts is not None:
            value, _unit, colour = self._convert_wind(report.wind_gusts)
            wind += '→\033[%sm%.0f\033[0m' % (colour, value)
        return wind + unit

    def _wind_direction_text(self, report):
        """Return the wind direction segment, or None if hidden or unavailable."""
        if not self.show_wind_dir:
            return None
        if report.wind_var is not None:
            # Variable wind: show both ends of the range
            wind = self._wind_direction(report.wind_var[0]) + '-' + self._wind_direction(report.wind_var[1])
        elif report.wind_dir is not None:
            wind = self._wind_direction(report.wind_dir)
        else:
            return None
        # The arrow tells whether the wind comes from or goes to the direction
        return wind + '→' if self.from_wind else '→' + wind

    def _wind_chill_text(self, report):
        """Return the wind chill segment, or None if hidden or unavailable."""
        if not self.show_wind_chill or report.wind_chill is None:
            return None
        value = report.wind_chill
        # Coloured by frostbite time, however the wind is not taken
        # into account so the colour is approximate
        colour = '39'
        if value <= -28: colour = '33'
        if value <= -37: colour = '31'
        if value <= -46: colour = '39;41'
        return self._format_temperature(value, colour, 'w')

    def _humidity_text(self, report):
        """Return the relative humidity segment, or None if hidden or unavailable."""
        if not self.show_humidity or report.humidity is None:
            return None
        value = report.humidity
        return '\033[%sm%.0f\033[0m%%' % (self._colour(value, '34', self._HUMIDITY_COLOURS), value)

    def _dew_text(self, report):
        """Return the dew point segment, or None if hidden or unavailable."""
        if not self.show_dew or report.dew is None:
            return None
        value = report.dew
        return self._format_temperature(value, self._colour(value, '34', self._DEW_COLOURS), 'd')

    def _pressure_text(self, report):
        """Return the pressure segment, or None if hidden or unavailable."""
        if not self.show_pressure or report.pressure is None:
            return None
        return '%.0fhPa' % report.pressure

    def _visibility_text(self, report):
        """Return the visibility segment, or None if hidden or unavailable."""
        if not self.show_visibility or report.visibility is None:
            return None
        mile = report.visibility
        if self.visibility_unit in ('mile', 'mi'):
            value, unit = mile, 'mi'
        elif self.visibility_unit == 'm':
            value, unit = mile * 1609.344, 'm'
        else:
            value, unit = mile * 1.609344, 'km'
        return '%.0f%s' % (value, unit)

    def _sky_text(self, report):
        """Return the sky conditions segment, or None if hidden, missing or empty."""
        if not self.show_sky or 'Sky conditions' not in report.fields:
            return None
        value = report.fields['Sky conditions']
        return value if value != '' else None

    def _layout_segments(self, station, text):
        """
        Record where each segment ends up in the rendered text.

        Sets `self.segments` to one `(column, width)` tuple, or None, for
        the station and for each element of `text`, and sets `self.towards`
        to the column of the arrow between wind speed and gusts, or to None
        if there is no such arrow.

        :param station:  The station name as it is displayed
        :param text:     The segments, None for those that are not displayed
        """
        segments = []
        # `x` is the column of the separator that precedes the next segment.
        # The station is followed by ': ', so that separator is the blank
        # after the colon; without a station it is a virtual column -1.
        # NOTE(review): assumes `col` in `action` is 0-based and relative to
        # the start of the text — confirm against `Entry`
        if self.show_station:
            segments.append((0, len(station)))
            x = len(station) + 1
        else:
            segments.append(None)
            x = -1
        self.towards = None
        for i, segment in enumerate(text):
            if segment is None:
                segments.append(None)
                continue
            x += 1 # step over the separator
            if i == 1 and '→' in segment:
                self.towards = x + Bar.coloured_length(segment.split('→')[0])
            width = Bar.coloured_length(segment)
            segments.append((x, width))
            x += width
        self.segments = segments

    def get_weather(self, index):
        """
        Render the text for one station and update the click regions.

        :param index:  Index of the station in `self.stations`
        :return:       The text to display; a '?' stands in for a missing
                       report and is appended to one that is not up to date
        """
        station = self.stations[index]
        report = self.reports[index]
        up_to_date = self.up_to_date[index]
        # [:1] rather than [0] so that an empty name does not raise
        station = station[:1] + station[1:].lower()
        if report is None:
            # Do not leave the click regions of another station behind; only
            # the station name, if shown, remains clickable
            self.towards = None
            self.segments = [(0, len(station))] if self.show_station else []
            return '%s: ?' % station if self.show_station else '?'
        text = [self._temperature_text(report),
                self._wind_speed_text(report),
                self._wind_direction_text(report),
                self._wind_chill_text(report),
                self._humidity_text(report),
                self._dew_text(report),
                self._pressure_text(report),
                self._visibility_text(report),
                self._sky_text(report)]
        self._layout_segments(station, text)
        shown = [s for s in text if s is not None]
        if not shown:
            return station
        joined = ' '.join(shown)
        if not up_to_date:
            joined += '?' if len(shown) <= 1 else ' ?'
        if self.show_station:
            joined = '%s: %s' % (station, joined)
        return joined

    def function(self):
        """
        Return the text to display, rendering it anew only if it has been
        invalidated, and give the periodic refresh a chance to run.
        """
        if not self.updated:
            # Cleared before rendering so that an invalidation that
            # arrives during rendering is not lost
            self.updated = True
            self.text = self.get_weather(self.station)
        rc = self.text
        self.refresh()
        return rc

    @staticmethod
    def get_metar_station():
        """
        Look up the default station.

        The first line of ~/.config/metar is used, or of /etc/metar if
        that file cannot be read. If neither can be read, a message is
        printed to stderr and 'ESSA' is used.

        :return:  The station identifier, in upper case
        """
        station = None
        for directory, name in ((HOME, '/.config/metar'), ('/etc', '/metar')):
            try:
                with open(directory + name, 'rb') as file:
                    station = file.read()
                break
            except (OSError, TypeError):
                # OSError: the file is missing or unreadable;
                # TypeError: in case HOME is not a string
                continue
        if station is None:
            print('~/.config/metar is not set', file = sys.stderr, flush = True)
            station = b'ESSA'
        return station.decode('utf-8', 'strict').split('\n')[0].strip().upper()