aboutsummaryrefslogtreecommitdiffstats
path: root/interface.py
diff options
context:
space:
mode:
authorMattias Andrée <m@maandree.se>2025-02-23 16:39:17 +0100
committerMattias Andrée <m@maandree.se>2025-02-23 16:39:17 +0100
commitc8b1e38beac497f87690b6574433d03a1fadb875 (patch)
treefb544b277e57b8758fb0690047d565b42c506a45 /interface.py
parentUpdate e-mail (diff)
downloadnightshift-c8b1e38beac497f87690b6574433d03a1fadb875.tar.gz
nightshift-c8b1e38beac497f87690b6574433d03a1fadb875.tar.bz2
nightshift-c8b1e38beac497f87690b6574433d03a1fadb875.tar.xz
clean up and change license
Signed-off-by: Mattias Andrée <m@maandree.se>
Diffstat (limited to 'interface.py')
-rw-r--r--interface.py199
1 files changed, 199 insertions, 0 deletions
diff --git a/interface.py b/interface.py
new file mode 100644
index 0000000..1a112a5
--- /dev/null
+++ b/interface.py
@@ -0,0 +1,199 @@
+#!/usr/bin/env python3
+# See LICENSE file for copyright and license details.
+
+import sys
+import fcntl
+import struct
+import signal
+import termios
+import threading
+
+
+ui_state = { 'focus' : 0
+ }
+
+
+def user_interface():
+ '''
+ Start user interface
+ '''
+ global red_condition
+ red_condition = threading.Condition()
+ ui_winch()
+ daemon_thread(ui_status).start()
+ daemon_thread(ui_refresh).start()
+
+ print('\033[?1049h\033[?25l')
+ saved_stty = termios.tcgetattr(sys.stdout.fileno())
+ stty = termios.tcgetattr(sys.stdout.fileno())
+ stty[3] &= ~(termios.ICANON | termios.ECHO | termios.ISIG)
+ try:
+ termios.tcsetattr(sys.stdout.fileno(), termios.TCSAFLUSH, stty)
+ sock.sendall('status\n'.encode('utf-8'))
+ ui_read()
+ finally:
+ termios.tcsetattr(sys.stdout.fileno(), termios.TCSAFLUSH, saved_stty)
+ sys.stdout.buffer.write('\033[?25h\033[?1049l'.encode('utf-8'))
+ sys.stdout.buffer.flush()
+
+
+def ui_print():
+ _button = lambda *i : ('[\033[1m%s\033[m]' if ui_state['focus'] in i else '<%s>')
+ temperature = tuple([red_temperature] + list(red_temperatures))
+ brightness = [b * 100 for b in [red_brightness] + list(red_brightnesses)]
+ print('\033[H', end = '')
+ if red_running:
+ lat, lon = red_location
+ _if = lambda pn, v : pn[0] if v >= 0 else pn[1]
+ print('\033[2KLocation: %.4f°%s %.4f°%s' % (abs(lat), _if('NS', lat), abs(lon), _if('EW', lon)))
+ print('\033[2KTemperature: %.0f K (day: %.0f K, night: %.0f K)' % tuple(temperature))
+ print('\033[2KBrightness: %.0f %% (day: %.0f %%, night: %.0f %%)' % tuple(brightness))
+ print('\033[2KDayness: %.0f %%' % (red_period * 100))
+ print('\033[2K' + ('Dying' if red_dying else ('Enabled' if red_status else 'Disabled')))
+ print('\033[2K\n\033[2K', end = '')
+ if not red_dying:
+ if red_frozen:
+ print(_button(0, 1) % 'Thaw', end = ' ')
+ print(_button(2) % 'Kill', end = ' ')
+ print(_button(3) % 'Close')
+ else:
+ print(_button(0) % ('Disable' if red_status else 'Enable'), end = ' ')
+ print(_button(1) % 'Freeze', end = ' ')
+ print(_button(2) % 'Kill', end = ' ')
+ print(_button(3) % 'Close')
+ else:
+ print(_button(0, 1, 2) % 'Kill immediately', end = ' ')
+ print(_button(3) % 'Close')
+ else:
+ print('\033[2KNot running')
+ print('\033[2K\n\033[2K', end = '')
+ print(_button(0, 1, 2) % 'Revive', end = ' ')
+ print(_button(3) % 'Close')
+ print('\033[J')
+
+
+def ui_read():
+ global red_dying, red_frozen
+ inbuf = sys.stdin.buffer
+ while True:
+ c = inbuf.read(1)
+ if c == b'q':
+ break
+ elif c == b'\t':
+ red_condition.acquire()
+ try:
+ if red_running and not red_dying:
+ if red_frozen and (ui_state['focus'] == 0):
+ ui_state['focus'] = 1
+ ui_state['focus'] = (ui_state['focus'] + 1) % 4
+ if red_frozen and (ui_state['focus'] == 0):
+ ui_state['focus'] = 1
+ elif ui_state['focus'] == 3:
+ ui_state['focus'] = 0
+ else:
+ ui_state['focus'] = 3
+ red_condition.notify()
+ finally:
+ red_condition.release()
+ elif c in b' \n':
+ red_condition.acquire()
+ try:
+ if ui_state['focus'] == 3:
+ break
+ elif red_running:
+ if red_dying or (ui_state['focus'] == 2):
+ sock.sendall('kill\n'.encode('utf-8'))
+ red_dying = True
+ elif red_frozen:
+ sock.sendall('thaw\n'.encode('utf-8'))
+ red_frozen = False
+ else:
+ if ui_state['focus'] == 0:
+ sock.sendall('toggle\n'.encode('utf-8'))
+ elif ui_state['focus'] == 1:
+ sock.sendall('freeze\n'.encode('utf-8'))
+ red_frozen = True
+ red_condition.notify()
+ else:
+ respawn_daemon()
+ daemon_thread(ui_status).start()
+ sock.sendall('status\n'.encode('utf-8'))
+ sock.sendall('listen\n'.encode('utf-8'))
+ finally:
+ red_condition.release()
+
+
+def ui_refresh():
+ while True:
+ red_condition.acquire()
+ try:
+ red_condition.wait()
+ ui_print()
+ finally:
+ red_condition.release()
+
+
+def ui_winch():
+ global height, width
+ (height, width) = struct.unpack('hh', fcntl.ioctl(sys.stdout.fileno(), termios.TIOCGWINSZ, '1234'))
+ def winch(signal, frame):
+ global height, width
+ (height, width) = struct.unpack('hh', fcntl.ioctl(sys.stdout.fileno(), termios.TIOCGWINSZ, '1234'))
+ red_condition.acquire()
+ try:
+ red_condition.notify()
+ finally:
+ red_condition.release()
+ signal.signal(signal.SIGWINCH, winch)
+
+
+def ui_status():
+ buf = ''
+ continue_to_run = True
+ while continue_to_run:
+ while '\n\n' not in buf:
+ got = sock.recv(1024)
+ if (got is None) or (len(got) == 0):
+ continue_to_run = False
+ break
+ buf += got.decode('utf-8', 'replace')
+ if '\n\n' in buf:
+ break
+ if continue_to_run:
+ msg, buf = buf.split('\n\n')[0], '\n\n'.join(buf.split('\n\n')[1:])
+ ui_status_callback(dict([line.split(': ') for line in msg.split('\n')]))
+ ui_status_callback(None)
+
+
+def ui_status_callback(status):
+ global red_brightness, red_temperature, red_brightnesses, red_temperatures
+ global red_period, red_location, red_status, red_running, red_dying, red_frozen
+ if status is not None:
+ brightness = [float(status['%s brightness' % k]) for k in ('Current', 'Daytime', 'Night')]
+ temperature = [float(status['%s temperature' % k]) for k in ('Current', 'Daytime', 'Night')]
+ red_condition.acquire()
+ try:
+ red_brightness, red_brightnesses = brightness[0], tuple(brightness[1:])
+ red_temperature, red_temperatures = temperature[0], tuple(temperature[1:])
+ red_period = float(status['Dayness'])
+ red_location = (float(status['Latitude']), float(status['Longitude']))
+ red_status = status['Enabled'] == 'yes'
+ red_running = status['Running'] == 'yes'
+ red_dying = status['Dying'] == 'yes'
+ red_frozen = status['Frozen'] == 'yes'
+ red_condition.notify()
+ finally:
+ red_condition.release()
+ else:
+ red_condition.acquire()
+ try:
+ red_running = False
+ red_condition.notify()
+ finally:
+ red_condition.release()
+
+
+def daemon_thread(target, **kwargs):
+ thread = threading.Thread(target = target, **kwargs)
+ thread.setDaemon(True)
+ return thread