# -*- python -*- # This example demonstrates how you can make a # multithreaded configurations script. # This file is dual-licensed under GNU General Public License # version 3 and GNU Free Documentation License version 1.3. # Copyright © 2014, 2015, 2016, 2017 Mattias Andrée (m@maandree.se) # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # Copyright © 2014, 2015, 2016, 2017 Mattias Andrée (m@maandree.se) # # Permission is granted to copy, distribute and/or modify this document # under the terms of the GNU Free Documentation License, Version 1.3 # or any later version published by the Free Software Foundation; # with no Invariant Sections, no Front-Cover Texts, and no Back-Cover Texts. # You should have received a copy of the GNU General Public License # along with this software package. If not, see . import threading # Geographical coodinates. # ("Kristall, vertikal accent i glas och stål" (Crystal, vertical accent # in glass and steal) in this example. A glass obelisk, lit from the inside # with adjustable colours and a default colour of 5600 K, in the middle # of a hyperelliptic roundabout.) latitude, longitude = 59.3326, 18.0652 # Adjust settings by solar elevation. get_dayness = lambda : sun(latitude, longitude) # Colour temperature at high day and high night, respectively. temperature_day, temperature_night = [6500], [3700] # Gamma of the monitors. gamma_red = [1.16, 1.10] gamma_green = [1.15, 1.16] gamma_blue = [1.11, 1.10] # Make colour curves thread local. class threadlocal: def __init__(self, obj): self.default = obj self.tmap = {} def __getitem__(self, i): t = threading.get_ident() if t not in self.tmap: self.tmap[t] = self.default[:] return self.tmap[t][i] def __len__(self): t = threading.get_ident() if t not in self.tmap: self.tmap[t] = self.default[:] return len(self.tmap[t]) def __setitem__(self, i, x): t = threading.get_ident() if t not in self.tmap: self.tmap[t] = self.default[:] self.tmap[t][i] = x def __delitem__(self, i): t = threading.get_ident() if t not in self.tmap: self.tmap[t] = self.default[:] del self.tmap[t][x] r_curve = threadlocal([i / (i_size - 1) for i in range(i_size)]) g_curve = threadlocal([i / (i_size - 1) for i in range(i_size)]) b_curve = threadlocal([i / (i_size - 1) for i in range(i_size)]) ## cmf_10deg uses non-thread safe cache, run once in advance so it is not done by the threads. cmf_10deg(0) # Thread synchronisation barrier. barrier = threading.Barrier(len(gamma_red) + 1) # Help functions for colour interpolation. interpol, purify = None, None # Parameters in `periodically`. fade_ = None # randr is not threadsafe, and absolutely not if you have multiple screens. # drm can be threadsafe but is by default not. # This semaphore is used to make sure that two threads are not accessing randr or drm at the same time. flush_semaphore = threading.Semaphore() def adjust(m): ''' Adjust monitor colours @param m:int The CRTC index ''' while True: # Wait for start cue. barrier.wait() # Calculate temperature. temperature_ = interpol(temperature_day, temperature_night) if fade_ is not None: temperature_ = purify(temperature_, 6500) # Remove settings from last run. start_over() # Apply colour temperature using raw CIE 1964 10 degree CMF data with interpolation. temperature(temperature_, lambda t : divide_by_maximum(cmf_10deg(t))) # Clip colour curves to fit [0, 1] to avoid errors by complex numbers. clip() # Apply gamma correction to monitor. gamma(gamma_red[m], gamma_green[m], gamma_blue[m]) # Flush settings to monitor. flush_semaphore.acquire() (drm if ttymode else randr)(m) flush_semaphore.release() # Signal thread completion. barrier.wait() # Create threads. for m in range(len(gamma_red)): thread = threading.Thread(target = adjust, args = (m,)) thread.setDaemon(True) thread.start() last_dayness = None def periodically(year, month, day, hour, minute, second, weekday, fade): ''' Invoked periodically If you want to control at what to invoke this function next time you can set the value of the global variable `wait_period` to the number of seconds to wait before invoking this function again. The value does not need to be an integer. @param year:int The year @param month:int The month, 1 = January, 12 = December @param day:int The day, minimum value is 1, probable maximum value is 31 (*) @param hour:int The hour, minimum value is 0, maximum value is 23 @param minute:int The minute, minimum value is 0, maximum value is 59 @param second:int The second, minimum value is 0, probable maximum value is 60 (**) @param weekday:int The weekday, 1 = Monday, 7 = Sunday @param fade:float? Blueshift can use this function to fade into a state when it start or exits. `fade` can either be negative, zero or positive or `None`, but the magnitude of value cannot exceed 1. When Blueshift starts, this function will be invoked multiple with the time parameters of the time it is invoked and each time `fade` will increase towards 1, starting at 0, when the value is 1, the settings should be applied to 100 %. After this this function will be invoked once again with `fade` being `None`. When Blueshift exits the same behaviour is used except, `fade` decrease towards -1 but start slightly below 0, when -1 is reached all settings should be normal. Then Blueshift will NOT invoke this function with `fade` being `None`, instead it will by itself revert all settings and quit. (*) Can be exceeded if the calendar system is changed, like in 1712-(02)Feb-30 (**) See https://en.wikipedia.org/wiki/Leap_second ''' global last_dayness, wait_period, interpol, purify, fade_ dayness = get_dayness() # Do not do unnecessary work. if fade is None: if dayness == last_dayness: return last_dayness = dayness # Pass parameters to threads. fade_ = fade # Help functions for colour interpolation. interpol = lambda _day, _night : _day[m % len(_day)] * dayness + _night[m % len(_night)] * (1 - dayness) purify = lambda current, pure : current * abs(fade) + pure * (1 - abs(fade)) # Signal all threads to start. barrier.wait() # Wait for all threads. barrier.wait() def reset(): ''' Invoked to reset the displays ''' for m in range(len(gamma_red)): # Remove settings from last run. start_over() # Apply gamma correction to monitor. gamma(gamma_red[m], gamma_green[m], gamma_blue[m]) # Flush settings to monitor. (drm if ttymode else randr)(m) # Set transition time, 0 on high day and 5 seconds on high night. fadein_time = 5 * (1 - get_dayness()) # Do 10 changes per second. fadein_steps = fadein_time * 10 # Transition on exit in the same way, calculated on exit. old_signal_SIGTERM = signal_SIGTERM if 'SIGTERM' not in conf_storage: conf_storage['SIGTERM'] = old_signal_SIGTERM else: old_signal_SIGTERM = conf_storage['SIGTERM'] def signal_SIGTERM(signum, frame): global fadeout_time, fadeout_steps fadeout_time = 5 * (1 - get_dayness()) fadeout_steps = fadeout_time * 10 old_signal_SIGTERM(signum, frame)