#!/usr/bin/env python3 # See LICENSE file for copyright and license details. copyright=''' ISC License © 2014, 2025 Mattias Andrée Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby granted, provided that the above copyright notice and this permission notice appear in all copies. THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. ''' import os import sys import socket import signal import threading from subprocess import Popen, PIPE PROGRAM_NAME = 'nightshift' ''' :str The name of the program ''' PROGRAM_VERSION = '0.4' ''' :str The version of the program ''' ## Set process title def setproctitle(title): ''' Set process title @param title:str The title of the process ''' import ctypes try: # Remove path, keep only the file, # otherwise we get really bad effects, namely # the name title is truncates by the number # of slashes in the title. At least that is # the observed behaviour when using procps-ng. title = title.split('/')[-1] # Create strng buffer with title title = title.encode(sys.getdefaultencoding(), 'replace') title = ctypes.create_string_buffer(title) if 'linux' in sys.platform: # Set process title on Linux libc = ctypes.cdll.LoadLibrary('libc.so.6') libc.prctl(15, ctypes.byref(title), 0, 0, 0) elif 'bsd' in sys.platform: # Set process title on at least FreeBSD libc = ctypes.cdll.LoadLibrary('libc.so.7') libc.setproctitle(ctypes.create_string_buffer(b'-%s'), title) except: pass setproctitle(sys.argv[0]) backlog = 5 ''' :int The size of the server socket's backlog ''' red_args = None ''' :list? Raw arguments passed to redshift ''' red_opts = ['-v'] ''' :list Nightshift parsed options passed to redshift ''' daemon = 0 ''' :int Whether or not to run as daemon, 2 if revived ''' kill = 0 ''' :int Whether or not to kill the redshift and the nightshift daemon, 0 for no, 1 for yes, 2 for immediately ''' toggle = False ''' :bool Whether or not to toggle redshift ''' set_status = None ''' :bool? `True` if redshift should be enabled, `False` for disble, otherwise `None` ''' set_freeze = None ''' :bool? `True` if redshift should be frozen, `False` for thawed, otherwise `None` ''' status = False ''' :bool Whether or not to get the current status ''' conf_opts = [] ''' :list This list will always have at least one element. This list is filled with options passed to the configurations, with the first element being the configuration file ''' config_file = None ''' :str? The configuration file, same as the first element in `conf_opts` ''' ## Parse options add_to_red_opts = False reading_conf_opts = False for arg in sys.argv[1:]: if add_to_red_opts: red_opts.append(arg) add_to_red_opts = False elif reading_conf_opts: if arg == '}': reading_conf_opts = False else: conf_opts.append(arg) elif isinstance(config_file, list): config_file = arg elif red_args is not None: red_args.append(arg) elif arg == '{': reading_conf_opts = True elif arg in ('-V', '--version', '-version'): ## Print the version of nightshift and of redshift print('%s %s' % (PROGRAM_NAME, PROGRAM_VERSION)) Popen(['redshift', '-V'], stdout = sys.stdout, env = redshift_env).wait() sys.exit(0) elif arg in ('-C', '--copyright', '-copyright'): ## Print copyright information print(copyright[1 : -1]) sys.exit(0) elif arg in ('-W', '--warranty', '-warranty'): ## Print warranty disclaimer print(copyright.split('\n\n')[-2]) sys.exit(0) elif arg in ('-h', '-?', '--help', '-help'): ## Display help message text = '''USAGE: nightshift [OPTIONS...] ['{' SCRIPT-OPTIONS... '}'] ['--' REDSHIFT-OPTIONS...] Terminal user interface for redshift, a program for setting the colour temperature of the display according to the time of day. -h --help Display this help message -V --version Show program version -C --copyright Show program copyright information -W --warranty Show program warrantly disclaimer -d --daemon Start as daemon -x --reset --kill Remove adjustment from screen +x --toggle Temporarily disable or enable adjustments +d --disable Temporarily disable adjustments +e --enable Re-enable adjustments +f --freeze Temporarily freeze the redshift process +t --thaw Thaw the redshift process -s --status Print status information +c --script FILE Load nightshift configuration script from specified file -c --config FILE Load redshift settings from specified file -b --brightness DAY:NIGHT Screen brightness to set at daytime/night -b --brightness BRIGHTNESS Screen brightness to apply -t --temperature DAY:NIGHT Colour temperature to set at daytime/night -t --temperature TEMP Colour temperature to apply -l --location LAT:LON Your current location -l --location PROVIDER Select provider for automatic location updates (Type `list' to see available providers) -m --method METHOD Method to use to set colour temperature (Type `list' to see available methods) -r --no-transition Disable temperature transitions ''' text = text.split('\n')[:-1] indent = min([len(line) - len(line.lstrip()) for line in text if line.rstrip().startswith(' ')]) print('\n'.join([line[indent:] if line.startswith(' ') else line for line in text])) sys.exit(0) elif arg == '--': red_args = [] else: subargs = [arg] if arg.startswith('-') and not arg.startswith('--'): subargs = ['-' + letter for letter in arg[1:]] elif arg.startswith('+') and not arg.startswith('++'): subargs = ['+' + letter for letter in arg[1:]] elif arg.startswith('=') and not arg.startswith('=='): subargs = ['=' + letter for letter in arg[1:]] red_arg = '' for arg in subargs: if (add_to_red_opts is None) or add_to_red_opts: add_to_red_opts = None red_arg += arg elif isinstance(config_file, list): config_file.append(arg) elif arg in ('-d', '--daemon'): daemon = 1 elif arg in ('=d', '==daemon'): daemon = 2 elif arg in ('-x', '--reset', '--kill'): kill += 1 elif arg in ('+x', '--toggle'): toggle = True elif arg in ('+d', '--disable'): set_status = False elif arg in ('+e', '--enable'): set_status = True elif arg in ('+f', '--freeze'): set_freeze = True elif arg in ('+t', '--thaw'): set_freeze = False elif arg in ('-s', '--status'): status = True elif arg in ('+c', '--script'): config_file = [] else: add_to_red_opts = True if arg in ('-c', '--config'): red_opts.append('-c') elif arg in ('-b', '--brightness'): red_opts.append('-b') elif arg in ('-t', '--temperature'): red_opts.append('-t') elif arg in ('-l', '--location'): red_opts.append('-l') elif arg in ('-m', '--method'): red_opts.append('-m') elif arg in ('-r', '--no-transition'): red_opts.append('-r') else: ## Unrecognised option sys.stderr.write('%s: error: unrecognised option: %s\n' % (sys.argv[0], arg)) sys.exit(1) if add_to_red_opts is None: red_opts.append(red_arg) add_to_red_opts = False if isinstance(config_file, list) and (len(config_file) > 0): config_file = ''.join(config_file) if isinstance(config_file, list): sys.stderr.write('%s: error: premature end of arguments\n' % sys.argv[0]) sys.exit(1) # Parse help request for -l and -m for opt in ('-l', '-m'): i = 0 while opt in red_opts[i:]: i = red_opts.index(opt) + 1 if not i == len(red_opts): arg = red_opts[i] if (arg == 'list') or ('help' in arg.split(':')): proc = ['redshift', opt, arg] proc = Popen(proc, stdout = sys.stdout, stderr = sys.stderr, env = redshift_env) proc.wait() sys.exit(proc.returncode) # Translate single-parameter -t into dual-parameter -t i = 0 while '-t' in red_opts[i:]: i = red_opts.index('-t') + 1 if not i == len(red_opts): if ':' not in red_opts[i]: red_opts[i] = '%s:%s' % (red_opts[i], red_opts[i]) # Construct name of socket socket_path = '%s.%s~%s' % ('/dev/shm/', PROGRAM_NAME, os.environ['USER']) ''' The pathname of the interprocess communication socket for nightshift ''' # The status of redshift red_brightness, red_temperature = 1, 6500 red_brightnesses, red_temperatures = (1, 1), (5500, 3500) red_period, red_location = 1, (0, 0) red_status, red_running, red_dying, red_frozen = True, True, False, False red_condition, broadcast_condition = None, None ## Create locale free environment for redshift redshift_env = os.environ.copy() for var in ('LANG', 'LANGUAGE', 'LC_ALL', 'LC_MESSAGES'): redshift_env[var] = 'C' def read_status(proc, sock): ''' Read status from redshift @param proc:Popen The redshift process @param sock:socket The server socket ''' global red_brightness, red_temperature global red_brightnesses, red_temperatures global red_period, red_location global red_status, red_running released = True while True: got = proc.stdout.readline() if (got is None) or (len(got) == 0): if red_frozen: proc.wait() continue break got = got.decode('utf-8', 'replace')[:-1] if ': 'not in got: continue (key, value) = got.split(': ') if released: red_condition.acquire() try: if key == 'Location': def coordcomp(v): v = (v + ' N').split(' ')[:2] return float(v[0]) * (-1 if v[1] in 'SW' else 1) red_location = [coordcomp(v) for v in value.split(', ')] # Followed by 'Temperatures' elif key == 'Temperatures': red_temperatures = [float(v.split(' ')[0][:-1]) for v in value.split(', ')] # Followed by two parameter 'Brightness' elif key == 'Period': if value == 'Night': red_period = 0 elif value == 'Daytime': red_period = 1 else: red_period = float(value.split(' ')[1][1 : -1]) / 100 # Followed by 'Color temperature' elif key == 'Color temperature': red_temperature = float(value[:-1]) # Followed by one parameter 'Brightness' elif key == 'Brightness': if ':' in value: red_brightnesses = [float(v) for v in value.split(':')] else: red_brightness = float(value) # Neither version is followed by anything, notify and release released = True elif key == 'Status': red_status = value == 'Enabled' # Not followed by anything, notify and release released = True if released: red_condition.notify_all() red_condition.release() except: pass if released: red_condition.acquire() red_running = False red_condition.notify_all() red_condition.release() sock.shutdown(socket.SHUT_RDWR) def broadcast_status(sock): ''' Broadcast status updates @param sock:socket The socket connected to the client ''' try: while True: broadcast_condition.acquire() try: broadcast_condition.wait() red_condition.acquire() try: message = generate_status_message() sock.sendall((message + '\n').encode('utf-8')) finally: red_condition.release() finally: broadcast_condition.release() except: pass def generate_status_message(): ''' Generate message to send to the client to inform about the status @return :str Status message ''' message = 'Current brightness: %f\n' % red_brightness message += 'Daytime brightness: %f\n' % red_brightnesses[0] message += 'Night brightness: %f\n' % red_brightnesses[1] message += 'Current temperature: %f\n' % red_temperature message += 'Daytime temperature: %f\n' % red_temperatures[0] message += 'Night temperature: %f\n' % red_temperatures[1] message += 'Dayness: %f\n' % red_period message += 'Latitude: %f\n' % red_location[0] message += 'Longitude: %f\n' % red_location[1] message += 'Enabled: %s\n' % ('yes' if red_status else 'no') message += 'Running: %s\n' % ('yes' if red_running else 'no') message += 'Dying: %s\n' % ('yes' if red_dying else 'no') message += 'Frozen: %s\n' % ('yes' if red_frozen else 'no') return message def use_client(sock, proc): ''' Communication with client @param sock:socket The socket connected to the client @param proc:Popen The redshift process ''' global red_dying, red_frozen buf = '' closed = False while not closed: try: got = sock.recv(128).decode('utf-8', 'strict') if (got is None) or (len(got) == 0): break except: break buf += got while '\n' in buf: buf = buf.split('\n') message, buf = buf[0], '\n'.join(buf[1:]) if message == 'status': red_condition.acquire() try: message = generate_status_message() sock.sendall((message + '\n').encode('utf-8')) finally: red_condition.release() elif message == 'toggle': if (not red_dying) and (not red_frozen): proc.send_signal(signal.SIGUSR1) elif message == 'disable': if (not red_dying) and (not red_frozen): if red_status: proc.send_signal(signal.SIGUSR1) elif message == 'enable': if (not red_dying) and (not red_frozen): if not red_status: proc.send_signal(signal.SIGUSR1) elif message == 'freeze': broadcast_condition.acquire() try: if not red_frozen: red_frozen = True proc.send_signal(signal.SIGTSTP) broadcast_condition.notify_all() finally: broadcast_condition.release() elif message == 'thaw': broadcast_condition.acquire() try: if red_frozen: red_frozen = False proc.send_signal(signal.SIGCONT) broadcast_condition.notify_all() finally: broadcast_condition.release() elif message == 'kill': if red_frozen: red_frozen = False proc.send_signal(signal.SIGCONT) red_dying = True proc.terminate() import time time.sleep(0.05) # XXX sometimes redshift is too slow elif message == 'close': closed = True elif message == 'listen': def listen(): while True: red_condition.acquire() try: red_condition.wait() message = generate_status_message() sock.sendall((message + '\n').encode('utf-8')) except: break finally: red_condition.release() thread = threading.Thread(target = listen) thread.setDaemon(True) thread.start() sock.close() def start_daemon_threads(proc, sock): ''' Start the threads for the daemon @param sock:socket The server socket @param proc:Popen The redshift process ''' pass def run_as_daemon(sock): ''' Perform daemon logic @param sock:socket The server socket ''' global red_condition, broadcast_condition, red_pid # Create status conditions red_condition = threading.Condition() broadcast_condition = threading.Condition() # Start redshift command = ['redshift'] + red_opts if red_args is not None: command += red_args proc = Popen(command, stdout = PIPE, stderr = open(os.devnull)) start_daemon_threads(proc, sock) # Read status from redshift thread = threading.Thread(target = read_status, args = (proc, sock)) thread.setDaemon(True) thread.start() red_condition.acquire() broke = False while red_running: red_condition.release() try: (client_sock, _client_address) = sock.accept() except: broke = True break # We have shut down the socket so that accept halts client_thread = threading.Thread(target = use_client, args = (client_sock, proc)) client_thread.setDaemon(True) client_thread.start() # Broadcast status from redshift broadacast_thread = threading.Thread(target = broadcast_status, args = (client_sock,)) broadacast_thread.setDaemon(True) broadacast_thread.start() red_condition.acquire() if not broke: red_condition.release() thread.join() def do_daemon(reexec): ''' Run actions for --daemon or ==daemon @param reexec:bool Wether to perform actions for ==daemon ''' if not reexec: if (kill > 0) or toggle or (set_status is not None) or (set_freeze is not None) or status: disallowed = '-x, +x, +e, +d, +f, +t and -s' print('%s: error: %s can be used when running as the daemon' % (disallowed, sys.argv[0])) sys.exit(1) # Create server socket try: os.unlink(socket_path) except: pass # The fill does (probably) not exist sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.bind(socket_path) sock.listen(backlog) # Signal respawner if reexec: print() sys.stdout.close() # Perform daemon logic run_as_daemon(sock) # Close socket sock.close() def not_running(): ''' Run actions for --status when the daemon is not running ''' print('Not running') def do_status(): ''' Run actions for --status when the daemon is running ''' sock.sendall('status\n'.encode('utf-8')) buf = '' while True: got = sock.recv(1024) if (got is None) or (len(got) == 0): break buf += got.decode('utf-8', 'replace') if '\n\n' in buf: break buf = buf.split('\n\n')[0] + '\n' sys.stdout.buffer.write(buf.encode('utf-8')) sys.stdout.buffer.flush() def do_toggle(): ''' Run actions for --toggle ''' sock.sendall('toggle\n'.encode('utf-8')) def do_disable(): ''' Run actions for --disable ''' sock.sendall('disable\n'.encode('utf-8')) def do_enable(): ''' Run actions for --enable ''' sock.sendall('enable\n'.encode('utf-8')) def do_freeze(): ''' Run actions for --freeze ''' sock.sendall('freeze\n'.encode('utf-8')) def do_thaw(): ''' Run actions for --thaw ''' sock.sendall('thaw\n'.encode('utf-8')) def do_kill(): ''' Run actions for --kill ''' sock.sendall('kill\n'.encode('utf-8')) if kill > 1: sock.sendall('kill\n'.encode('utf-8')) def create_daemon(): ''' Start daemon when it is required but is not running ''' ## Server is not running # Create pipe for interprocess signal (r_end, w_end) = os.pipe() # Duplicate process pid = os.fork() if pid == 0: ## Daemon (child) # Close stdin and stdout if ('DEBUG' not in os.environ) or (not os.environ['DEBUG'] == 'yes'): os.close(sys.stdin.fileno()) os.close(sys.stdout.fileno()) # Create server socket try: os.unlink(socket_path) except: pass # The fill does (probably) not exist sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.bind(socket_path) sock.listen(backlog) # Send signal with os.fdopen(w_end, 'wb') as file: file.write(b'\n') file.flush() # Close the pipe os.close(r_end) # Perform daemon logic run_as_daemon(sock) # Close socket sock.close() # Close process sys.exit(0) else: ## Front-end (parent) # Wait for a signal rc = None with os.fdopen(r_end, 'rb') as file: file.read(1) # Close the pipe os.close(w_end) def create_client(): ''' Create client socket and start daemon if not running @return :socket The client socket ''' # Create socket sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) try: # Connect to the server sock.connect(socket_path) except: # The process need separate sockets, lets close it # and let both process recreate it sock.close() sock = None if status: not_running() sys.exit(0) if sock is None: # Create daemon and wait for it to start listening for clients create_daemon() # Connect to the server sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.connect(socket_path) return sock def run_as_client(): ''' Perform client actions ''' # Temporarily disable or enable redshift if set_status is not None: if set_status: do_enable() else: do_disable() elif toggle: do_toggle() # Freeze or thaw redshift if set_freeze is not None: if set_freeze: do_freeze() else: do_thaw() # Kill redshift and the nightshift daemon if kill > 0: do_kill() # Get redshift status if status: do_status() sock.close() # Start user interface if (kill == 0) and not (status or toggle or (set_status is not None) or (set_freeze is not None)): sock.sendall('listen\n'.encode('utf-8')) user_interface() def do_client(): ''' Do everything that has to do with being a client ''' global sock # Connect to client sock = create_client() # Perform client actions run_as_client() # Close socket try: sock.sendall('close\n'.encode('utf-8')) except: pass sock.close() def respawn_daemon(): ''' Restart the nightshift daemon ''' global sock # Close old socket sock.close() ## Server is not running # Create pipe for interprocess signal (r_end, w_end) = os.pipe() # Duplicate process pid = os.fork() if pid == 0: ## Daemon (child) # Close stdin and stdout os.close(sys.stdin.fileno()) os.close(sys.stdout.fileno()) # Replace stdout with the pipe os.dup2(w_end, sys.stdout.fileno()) os.close(w_end) # Reexecute image exe = os.readlink('/proc/self/exe') os.execl(exe, exe, *(sys.argv + ['==daemon'])) else: ## Front-end (parent) # Wait for a signal rc = None with os.fdopen(r_end, 'rb') as file: file.read(1) # Close the pipe os.close(w_end) # Connect to the server sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.connect(socket_path) def run(): ''' Run as either the daemon (if --daemon or ==daemon) or as a client (otherwise) ''' if daemon > 0: do_daemon(daemon == 2) else: do_client() g, l = globals(), dict(locals()) for key in l: g[key] = l[key] ## Import interface.py with shared globals # Get the Python version v = sys.version_info if (v.major > 3) or ((v.major == 3) and (v.minor >= 4)): # The (new) Python 3.4 way import importlib.util exec(importlib.util.find_spec('interface').loader.get_code('interface'), g) else: # The deprecated legacy way import importlib exec(importlib.find_loader('interface').get_code('interface'), g) ## Load extension and configurations via nightshiftrc # No configuration script has been selected explicitly, # so select one automatically. if config_file is None: # Possible auto-selected configuration scripts, # earlier ones have precedence, we can only select one. files = [] def add_files(var, *ps, multi = False): if var == '~': try: # Get the home (also known as initial) directory of the real user import pwd var = pwd.getpwuid(os.getuid()).pw_dir except: return else: # Resolve environment variable or use empty string if none is selected if (var is None) or (var in os.environ) and (not os.environ[var] == ''): var = '' if var is None else os.environ[var] else: return paths = [var] # Split environment variable value if it is a multi valeu variable if multi and os.pathsep in var: paths = [v for v in var.split(os.pathsep) if not v == ''] # Add files according to patterns for p in ps: p = p.replace('/', os.sep).replace('%', PROGRAM_NAME) for v in paths: files.append(v + p) add_files('XDG_CONFIG_HOME', '/%/%rc', '/%rc') add_files('HOME', '/.config/%/%rc', '/.config/%rc', '/.%rc') add_files('~', '/.config/%/%rc', '/.config/%rc', '/.%rc') add_files('XDG_CONFIG_DIRS', '/%rc', multi = True) add_files(None, '/etc/%rc') for file in files: # If the file we exists, if os.path.exists(file): # select it, config_file = file # and stop trying files with lower precedence. break # As the zeroth argument for the configuration script, # add the configurion script file. Just like the zeroth # command line argument is the invoked command. conf_opts = [config_file] + conf_opts if config_file is not None: code = None # Read configuration script file with open(config_file, 'rb') as script: code = script.read() # Decode configurion script file and add a line break # at the end to ensure that the last line is empty. # If it is not, we will get errors. code = code.decode('utf-8', 'strict') + '\n' # Compile the configuration script, code = compile(code, config_file, 'exec') # and run it, with it have the same # globals as this module, so that it can # not only use want we have defined, but # also redefine it for us. exec(code, g) run()