#!/usr/bin/env python3 # -*- python -*- copyright=''' nightshift - A terminal user interface for redshift Copyright © 2014 Mattias Andrée (maandree@member.fsf.org) This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . ''' import os import sys import socket import signal import threading from subprocess import Popen, PIPE PROGRAM_NAME = 'nightshift' ''' :str The name of the program ''' PROGRAM_VERSION = '0.4' ''' :str The version of the program ''' ## Set process title def setproctitle(title): ''' Set process title @param title:str The title of the process ''' import ctypes try: # Remove path, keep only the file, # otherwise we get really bad effects, namely # the name title is truncates by the number # of slashes in the title. At least that is # the observed behaviour when using procps-ng. title = title.split('/')[-1] # Create strng buffer with title title = title.encode(sys.getdefaultencoding(), 'replace') title = ctypes.create_string_buffer(title) if 'linux' in sys.platform: # Set process title on Linux libc = ctypes.cdll.LoadLibrary('libc.so.6') libc.prctl(15, ctypes.byref(title), 0, 0, 0) elif 'bsd' in sys.platform: # Set process title on at least FreeBSD libc = ctypes.cdll.LoadLibrary('libc.so.7') libc.setproctitle(ctypes.create_string_buffer(b'-%s'), title) except: pass setproctitle(sys.argv[0]) backlog = 5 ''' :int The size of the server socket's backlog ''' red_args = None ''' :list? Raw arguments passed to redshift ''' red_opts = ['-v'] ''' :list Nightshift parsed options passed to redshift ''' daemon = 0 ''' :int Whether or not to run as daemon, 2 if revived ''' kill = 0 ''' :int Whether or not to kill the redshift and the nightshift daemon, 0 for no, 1 for yes, 2 for immediately ''' toggle = False ''' :bool Whether or not to toggle redshift ''' set_status = None ''' :bool? `True` if redshift should be enabled, `False` for disble, otherwise `None` ''' set_freeze = None ''' :bool? `True` if redshift should be frozen, `False` for thawed, otherwise `None` ''' status = False ''' :bool Whether or not to get the current status ''' conf_opts = [] ''' :list This list will always have at least one element. This list is filled with options passed to the configurations, with the first element being the configuration file ''' config_file = None ''' :str? The configuration file, same as the first element in `conf_opts` ''' ## Parse options add_to_red_opts = False reading_conf_opts = False for arg in sys.argv[1:]: if add_to_red_opts: red_opts.append(arg) add_to_red_opts = False elif reading_conf_opts: if arg == '}': reading_conf_opts = False else: conf_opts.append(arg) elif isinstance(config_file, list): config_file = arg elif red_args is not None: red_args.append(arg) elif arg == '{': reading_conf_opts = True elif arg in ('-V', '--version', '-version'): ## Print the version of nightshift and of redshift print('%s %s' % (PROGRAM_NAME, PROGRAM_VERSION)) Popen(['redshift', '-V'], stdout = sys.stdout, env = redshift_env).wait() sys.exit(0) elif arg in ('-C', '--copyright', '-copyright'): ## Print copyright information print(copyright[1 : -1]) sys.exit(0) elif arg in ('-W', '--warranty', '-warranty'): ## Print warranty disclaimer print(copyright.split('\n\n')[-2]) sys.exit(0) elif arg in ('-h', '-?', '--help', '-help'): ## Display help message text = '''USAGE: nightshift [OPTIONS...] ['{' SCRIPT-OPTIONS... '}'] ['--' REDSHIFT-OPTIONS...] Terminal user interface for redshift, a program for setting the colour temperature of the display according to the time of day. -h --help Display this help message -V --version Show program version -C --copyright Show program copyright information -W --warranty Show program warrantly disclaimer -d --daemon Start as daemon -x --reset --kill Remove adjustment from screen +x --toggle Temporarily disable or enable adjustments +d --disable Temporarily disable adjustments +e --enable Re-enable adjustments +f --freeze Temporarily freeze the redshift process +t --thaw Thaw the redshift process -s --status Print status information +c --script FILE Load nightshift configuration script from specified file -c --config FILE Load redshift settings from specified file -b --brightness DAY:NIGHT Screen brightness to set at daytime/night -b --brightness BRIGHTNESS Screen brightness to apply -t --temperature DAY:NIGHT Colour temperature to set at daytime/night -t --temperature TEMP Colour temperature to apply -l --location LAT:LON Your current location -l --location PROVIDER Select provider for automatic location updates (Type `list' to see available providers) -m --method METHOD Method to use to set colour temperature (Type `list' to see available methods) -r --no-transition Disable temperature transitions Please report bugs to ''' text = text.split('\n')[:-1] indent = min([len(line) - len(line.lstrip()) for line in text if line.rstrip().startswith(' ')]) print('\n'.join([line[indent:] if line.startswith(' ') else line for line in text])) sys.exit(0) elif arg == '--': red_args = [] else: subargs = [arg] if arg.startswith('-') and not arg.startswith('--'): subargs = ['-' + letter for letter in arg[1:]] elif arg.startswith('+') and not arg.startswith('++'): subargs = ['+' + letter for letter in arg[1:]] elif arg.startswith('=') and not arg.startswith('=='): subargs = ['=' + letter for letter in arg[1:]] red_arg = '' for arg in subargs: if (add_to_red_opts is None) or add_to_red_opts: add_to_red_opts = None red_arg += arg elif isinstance(config_file, list): config_file.append(arg) elif arg in ('-d', '--daemon'): daemon = 1 elif arg in ('=d', '==daemon'): daemon = 2 elif arg in ('-x', '--reset', '--kill'): kill += 1 elif arg in ('+x', '--toggle'): toggle = True elif arg in ('+d', '--disable'): set_status = False elif arg in ('+e', '--enable'): set_status = True elif arg in ('+f', '--freeze'): set_freeze = True elif arg in ('+t', '--thaw'): set_freeze = False elif arg in ('-s', '--status'): status = True elif arg in ('+c', '--script'): config_file = [] else: add_to_red_opts = True if arg in ('-c', '--config'): red_opts.append('-c') elif arg in ('-b', '--brightness'): red_opts.append('-b') elif arg in ('-t', '--temperature'): red_opts.append('-t') elif arg in ('-l', '--location'): red_opts.append('-l') elif arg in ('-m', '--method'): red_opts.append('-m') elif arg in ('-r', '--no-transition'): red_opts.append('-r') else: ## Unrecognised option sys.stderr.write('%s: error: unrecognised option: %s\n' % (sys.argv[0], arg)) sys.exit(1) if add_to_red_opts is None: red_opts.append(red_arg) add_to_red_opts = False if isinstance(config_file, list) and (len(config_file) > 0): config_file = ''.join(config_file) if isinstance(config_file, list): sys.stderr.write('%s: error: premature end of arguments\n' % sys.argv[0]) sys.exit(1) # Parse help request for -l and -m for opt in ('-l', '-m'): i = 0 while opt in red_opts[i:]: i = red_opts.index(opt) + 1 if not i == len(red_opts): arg = red_opts[i] if (arg == 'list') or ('help' in arg.split(':')): proc = ['redshift', opt, arg] proc = Popen(proc, stdout = sys.stdout, stderr = sys.stderr, env = redshift_env) proc.wait() sys.exit(proc.returncode) # Translate single-parameter -t into dual-parameter -t i = 0 while '-t' in red_opts[i:]: i = red_opts.index('-t') + 1 if not i == len(red_opts): if ':' not in red_opts[i]: red_opts[i] = '%s:%s' % (red_opts[i], red_opts[i]) # Construct name of socket socket_path = '%s.%s~%s' % ('/dev/shm/', PROGRAM_NAME, os.environ['USER']) ''' The pathname of the interprocess communication socket for nightshift ''' # The status of redshift red_brightness, red_temperature = 1, 6500 red_brightnesses, red_temperatures = (1, 1), (5500, 3500) red_period, red_location = 1, (0, 0) red_status, red_running, red_dying, red_frozen = True, True, False, False red_condition, broadcast_condition = None, None ## Create locale free environment for redshift redshift_env = os.environ.copy() for var in ('LANG', 'LANGUAGE', 'LC_ALL', 'LC_MESSAGES'): redshift_env[var] = 'C' def read_status(proc, sock): ''' Read status from redshift @param proc:Popen The redshift process @param sock:socket The server socket ''' global red_brightness, red_temperature global red_brightnesses, red_temperatures global red_period, red_location global red_status, red_running released = True while True: got = proc.stdout.readline() if (got is None) or (len(got) == 0): if red_frozen: proc.wait() continue break got = got.decode('utf-8', 'replace')[:-1] if ': 'not in got: continue (key, value) = got.split(': ') if released: red_condition.acquire() try: if key == 'Location': def coordcomp(v): v = (v + ' N').split(' ')[:2] return float(v[0]) * (-1 if v[1] in 'SW' else 1) red_location = [coordcomp(v) for v in value.split(', ')] # Followed by 'Temperatures' elif key == 'Temperatures': red_temperatures = [float(v.split(' ')[0][:-1]) for v in value.split(', ')] # Followed by two parameter 'Brightness' elif key == 'Period': if value == 'Night': red_period = 0 elif value == 'Daytime': red_period = 1 else: red_period = float(value.split(' ')[1][1 : -1]) / 100 # Followed by 'Color temperature' elif key == 'Color temperature': red_temperature = float(value[:-1]) # Followed by one parameter 'Brightness' elif key == 'Brightness': if ':' in value: red_brightnesses = [float(v) for v in value.split(':')] else: red_brightness = float(value) # Neither version is followed by anything, notify and release released = True elif key == 'Status': red_status = value == 'Enabled' # Not followed by anything, notify and release released = True if released: red_condition.notify_all() red_condition.release() except: pass if released: red_condition.acquire() red_running = False red_condition.notify_all() red_condition.release() sock.shutdown(socket.SHUT_RDWR) def broadcast_status(sock): ''' Broadcast status updates @param sock:socket The socket connected to the client ''' try: while True: broadcast_condition.acquire() try: broadcast_condition.wait() red_condition.acquire() try: message = generate_status_message() sock.sendall((message + '\n').encode('utf-8')) finally: red_condition.release() finally: broadcast_condition.release() except: pass def generate_status_message(): ''' Generate message to send to the client to inform about the status @return :str Status message ''' message = 'Current brightness: %f\n' % red_brightness message += 'Daytime brightness: %f\n' % red_brightnesses[0] message += 'Night brightness: %f\n' % red_brightnesses[1] message += 'Current temperature: %f\n' % red_temperature message += 'Daytime temperature: %f\n' % red_temperatures[0] message += 'Night temperature: %f\n' % red_temperatures[1] message += 'Dayness: %f\n' % red_period message += 'Latitude: %f\n' % red_location[0] message += 'Longitude: %f\n' % red_location[1] message += 'Enabled: %s\n' % ('yes' if red_status else 'no') message += 'Running: %s\n' % ('yes' if red_running else 'no') message += 'Dying: %s\n' % ('yes' if red_dying else 'no') message += 'Frozen: %s\n' % ('yes' if red_frozen else 'no') return message def use_client(sock, proc): ''' Communication with client @param sock:socket The socket connected to the client @param proc:Popen The redshift process ''' global red_dying, red_frozen buf = '' closed = False while not closed: try: got = sock.recv(128).decode('utf-8', 'error') if (got is None) or (len(got) == 0): break except: break buf += got while '\n' in buf: buf = buf.split('\n') message, buf = buf[0], '\n'.join(buf[1:]) if message == 'status': red_condition.acquire() try: message = generate_status_message() sock.sendall((message + '\n').encode('utf-8')) finally: red_condition.release() elif message == 'toggle': if (not red_dying) and (not red_frozen): proc.send_signal(signal.SIGUSR1) elif message == 'disable': if (not red_dying) and (not red_frozen): if red_status: proc.send_signal(signal.SIGUSR1) elif message == 'enable': if (not red_dying) and (not red_frozen): if not red_status: proc.send_signal(signal.SIGUSR1) elif message == 'freeze': broadcast_condition.acquire() try: if not red_frozen: red_frozen = True proc.send_signal(signal.SIGTSTP) broadcast_condition.notify_all() finally: broadcast_condition.release() elif message == 'thaw': broadcast_condition.acquire() try: if red_frozen: red_frozen = False proc.send_signal(signal.SIGCONT) broadcast_condition.notify_all() finally: broadcast_condition.release() elif message == 'kill': if red_frozen: red_frozen = False proc.send_signal(signal.SIGCONT) red_dying = True proc.terminate() import time time.sleep(0.05) # XXX sometimes redshift is too slow elif message == 'close': closed = True elif message == 'listen': def listen(): while True: red_condition.acquire() try: red_condition.wait() message = generate_status_message() sock.sendall((message + '\n').encode('utf-8')) except: break finally: red_condition.release() thread = threading.Thread(target = listen) thread.setDaemon(True) thread.start() sock.close() def start_daemon_threads(proc, sock): ''' Start the threads for the daemon @param sock:socket The server socket @param proc:Popen The redshift process ''' pass def run_as_daemon(sock): ''' Perform daemon logic @param sock:socket The server socket ''' global red_condition, broadcast_condition, red_pid # Create status conditions red_condition = threading.Condition() broadcast_condition = threading.Condition() # Start redshift command = ['redshift'] + red_opts if red_args is not None: command += red_args proc = Popen(command, stdout = PIPE, stderr = open(os.devnull)) start_daemon_threads(proc, sock) # Read status from redshift thread = threading.Thread(target = read_status, args = (proc, sock)) thread.setDaemon(True) thread.start() red_condition.acquire() broke = False while red_running: red_condition.release() try: (client_sock, _client_address) = sock.accept() except: broke = True break # We have shut down the socket so that accept halts client_thread = threading.Thread(target = use_client, args = (client_sock, proc)) client_thread.setDaemon(True) client_thread.start() # Broadcast status from redshift broadacast_thread = threading.Thread(target = broadcast_status, args = (client_sock,)) broadacast_thread.setDaemon(True) broadacast_thread.start() red_condition.acquire() if not broke: red_condition.release() thread.join() def do_daemon(reexec): ''' Run actions for --daemon or ==daemon @param reexec:bool Wether to perform actions for ==daemon ''' if not reexec: if (kill > 0) or toggle or (set_status is not None) or (set_freeze is not None) or status: disallowed = '-x, +x, +e, +d, +f, +t and -s' print('%s: error: %s can be used when running as the daemon' % (disallowed, sys.argv[0])) sys.exit(1) # Create server socket try: os.unlink(socket_path) except: pass # The fill does (probably) not exist sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.bind(socket_path) sock.listen(backlog) # Signal respawner if reexec: print() sys.stdout.close() # Perform daemon logic run_as_daemon(sock) # Close socket sock.close() def not_running(): ''' Run actions for --status when the daemon is not running ''' print('Not running') def do_status(): ''' Run actions for --status when the daemon is running ''' sock.sendall('status\n'.encode('utf-8')) buf = '' while True: got = sock.recv(1024) if (got is None) or (len(got) == 0): break buf += got.decode('utf-8', 'replace') if '\n\n' in buf: break buf = buf.split('\n\n')[0] + '\n' sys.stdout.buffer.write(buf.encode('utf-8')) sys.stdout.buffer.flush() def do_toggle(): ''' Run actions for --toggle ''' sock.sendall('toggle\n'.encode('utf-8')) def do_disable(): ''' Run actions for --disable ''' sock.sendall('disable\n'.encode('utf-8')) def do_enable(): ''' Run actions for --enable ''' sock.sendall('enable\n'.encode('utf-8')) def do_freeze(): ''' Run actions for --freeze ''' sock.sendall('freeze\n'.encode('utf-8')) def do_thaw(): ''' Run actions for --thaw ''' sock.sendall('thaw\n'.encode('utf-8')) def do_kill(): ''' Run actions for --kill ''' sock.sendall('kill\n'.encode('utf-8')) if kill > 1: sock.sendall('kill\n'.encode('utf-8')) def create_daemon(): ''' Start daemon when it is required but is not running ''' ## Server is not running # Create pipe for interprocess signal (r_end, w_end) = os.pipe() # Duplicate process pid = os.fork() if pid == 0: ## Daemon (child) # Close stdin and stdout if ('DEBUG' not in os.environ) or (not os.environ['DEBUG'] == 'yes'): os.close(sys.stdin.fileno()) os.close(sys.stdout.fileno()) # Create server socket try: os.unlink(socket_path) except: pass # The fill does (probably) not exist sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.bind(socket_path) sock.listen(backlog) # Send signal with os.fdopen(w_end, 'wb') as file: file.write(b'\n') file.flush() # Close the pipe os.close(r_end) # Perform daemon logic run_as_daemon(sock) # Close socket sock.close() # Close process sys.exit(0) else: ## Front-end (parent) # Wait for a signal rc = None with os.fdopen(r_end, 'rb') as file: file.read(1) # Close the pipe os.close(w_end) def create_client(): ''' Create client socket and start daemon if not running @return :socket The client socket ''' # Create socket sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) try: # Connect to the server sock.connect(socket_path) except: # The process need separate sockets, lets close it # and let both process recreate it sock.close() sock = None if status: not_running() sys.exit(0) if sock is None: # Create daemon and wait for it to start listening for clients create_daemon() # Connect to the server sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.connect(socket_path) return sock def run_as_client(): ''' Perform client actions ''' # Temporarily disable or enable redshift if set_status is not None: if set_status: do_enable() else: do_disable() elif toggle: do_toggle() # Freeze or thaw redshift if set_freeze is not None: if set_freeze: do_freeze() else: do_thaw() # Kill redshift and the nightshift daemon if kill > 0: do_kill() # Get redshift status if status: do_status() sock.close() # Start user interface if (kill == 0) and not (status or toggle or (set_status is not None) or (set_freeze is not None)): sock.sendall('listen\n'.encode('utf-8')) user_interface() def do_client(): ''' Do everything that has to do with being a client ''' global sock # Connect to client sock = create_client() # Perform client actions run_as_client() # Close socket try: sock.sendall('close\n'.encode('utf-8')) except: pass sock.close() def respawn_daemon(): ''' Restart the nightshift daemon ''' global sock # Close old socket sock.close() ## Server is not running # Create pipe for interprocess signal (r_end, w_end) = os.pipe() # Duplicate process pid = os.fork() if pid == 0: ## Daemon (child) # Close stdin and stdout os.close(sys.stdin.fileno()) os.close(sys.stdout.fileno()) # Replace stdout with the pipe os.dup2(w_end, sys.stdout.fileno()) os.close(w_end) # Reexecute image exe = os.readlink('/proc/self/exe') os.execl(exe, exe, *(sys.argv + ['==daemon'])) else: ## Front-end (parent) # Wait for a signal rc = None with os.fdopen(r_end, 'rb') as file: file.read(1) # Close the pipe os.close(w_end) # Connect to the server sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.connect(socket_path) def run(): ''' Run as either the daemon (if --daemon or ==daemon) or as a client (otherwise) ''' if daemon > 0: do_daemon(daemon == 2) else: do_client() g, l = globals(), dict(locals()) for key in l: g[key] = l[key] ## Import interface.py with shared globals # Get the Python version v = sys.version_info if (v.major > 3) or ((v.major == 3) and (v.minor >= 4)): # The (new) Python 3.4 way import importlib.util exec(importlib.util.find_spec('interface').loader.get_code('interface'), g) else: # The deprecated legacy way import importlib exec(importlib.find_loader('interface').get_code('interface'), g) ## Load extension and configurations via nightshiftrc # No configuration script has been selected explicitly, # so select one automatically. if config_file is None: # Possible auto-selected configuration scripts, # earlier ones have precedence, we can only select one. files = [] def add_files(var, *ps, multi = False): if var == '~': try: # Get the home (also known as initial) directory of the real user import pwd var = pwd.getpwuid(os.getuid()).pw_dir except: return else: # Resolve environment variable or use empty string if none is selected if (var is None) or (var in os.environ) and (not os.environ[var] == ''): var = '' if var is None else os.environ[var] else: return paths = [var] # Split environment variable value if it is a multi valeu variable if multi and os.pathsep in var: paths = [v for v in var.split(os.pathsep) if not v == ''] # Add files according to patterns for p in ps: p = p.replace('/', os.sep).replace('%', PROGRAM_NAME) for v in paths: files.append(v + p) add_files('XDG_CONFIG_HOME', '/%/%rc', '/%rc') add_files('HOME', '/.config/%/%rc', '/.config/%rc', '/.%rc') add_files('~', '/.config/%/%rc', '/.config/%rc', '/.%rc') add_files('XDG_CONFIG_DIRS', '/%rc', multi = True) add_files(None, '/etc/%rc') for file in files: # If the file we exists, if os.path.exists(file): # select it, config_file = file # and stop trying files with lower precedence. break # As the zeroth argument for the configuration script, # add the configurion script file. Just like the zeroth # command line argument is the invoked command. conf_opts = [config_file] + conf_opts if config_file is not None: code = None # Read configuration script file with open(config_file, 'rb') as script: code = script.read() # Decode configurion script file and add a line break # at the end to ensure that the last line is empty. # If it is not, we will get errors. code = code.decode('utf-8', 'error') + '\n' # Compile the configuration script, code = compile(code, config_file, 'exec') # and run it, with it have the same # globals as this module, so that it can # not only use want we have defined, but # also redefine it for us. exec(code, g) run()