# -*- python -*-

# This example demonstrates how you can make a
# multithreaded configuration script


# This file is dual-licensed under GNU General Public License
# version 3 and GNU Free Documentation License version 1.3.

# Copyright © 2014  Mattias Andrée (maandree@member.fsf.org)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

# Copyright © 2014  Mattias Andrée (maandree@member.fsf.org)
#
# Permission is granted to copy, distribute and/or modify this document
# under the terms of the GNU Free Documentation License, Version 1.3
# or any later version published by the Free Software Foundation;
# with no Invariant Sections, no Front-Cover Texts, and no Back-Cover Texts.
# You should have received a copy of the GNU General Public License
# along with this software package.  If not, see <http://www.gnu.org/licenses/>.


import threading


# Geographical coordinates.
# (KTH building D computer laboratories in this example.)
latitude, longitude = 59.3472, 18.0728

# Adjust settings by solar elevation.
get_dayness = lambda : sun(latitude, longitude)

# Colour temperature at high day and high night, respectively.
# One entry per monitor; the lists are indexed cyclically, so a
# single entry applies to all monitors.
temperature_day, temperature_night = [6500], [3700]

# Gamma of the monitors, one entry per monitor.
# The length of `gamma_red` decides how many worker threads are started.
gamma_red   = [1.16, 1.10]
gamma_green = [1.15, 1.16]
gamma_blue  = [1.11, 1.10]


# Make colour curves thread local.
class threadlocal:
    '''
    List-like wrapper that gives every thread its own copy of a list

    The first time a thread accesses the wrapper, a shallow copy of the
    default list is created for that thread, all subsequent accesses
    from the same thread operate on that copy.
    '''

    def __init__(self, obj):
        '''
        Constructor

        @param  obj:list  The default list, copied once for each thread that uses the wrapper
        '''
        self.default = obj
        self.tmap = {}

    def _local(self):
        '''
        Get the calling thread's private copy, creating it on first use

        @return  :list  The list that belongs to the calling thread
        '''
        t = threading.get_ident()
        if t not in self.tmap:
            self.tmap[t] = self.default[:]
        return self.tmap[t]

    def __getitem__(self, i):
        return self._local()[i]

    def __len__(self):
        return len(self._local())

    def __setitem__(self, i, x):
        self._local()[i] = x

    def __delitem__(self, i):
        # The index parameter is `i`; deleting by an undefined
        # name here would raise NameError on every use.
        del self._local()[i]


r_curve = threadlocal([i / (i_size - 1) for i in range(i_size)])
g_curve = threadlocal([i / (i_size - 1) for i in range(i_size)])
b_curve = threadlocal([i / (i_size - 1) for i in range(i_size)])

## cmf_10deg uses non-thread safe cache, run once in advance so it is not done by the threads.
cmf_10deg(0)


# Thread synchronisation barrier,
# one party per monitor thread plus the thread that runs `periodically`.
barrier = threading.Barrier(len(gamma_red) + 1)

# Help functions for colour interpolation,
# assigned by `periodically` before the worker threads are released.
interpol, purify = None, None

# Parameters in `periodically`.
fade_ = None

# randr is not threadsafe, and absolutely not if you have multiple screens.
# drm can be threadsafe but is by default not.
# This semaphore is used to make sure that two threads are not accessing randr or drm at the same time.
flush_semaphore = threading.Semaphore()


def adjust(m):
    '''
    Adjust monitor colours

    Runs forever: each iteration waits on the barrier for a start cue
    from `periodically`, applies the settings for one monitor and then
    waits on the barrier again to signal completion.

    @param  m:int  The CRTC index
    '''
    while True:
        # Wait for start cue.
        barrier.wait()

        # Calculate temperature.
        # The CRTC index is passed explicitly so that each thread selects
        # its own entry, rather than whatever a global `m` happens to be.
        temperature_ = interpol(temperature_day, temperature_night, m)
        if fade_ is not None:
            temperature_ = purify(temperature_, 6500)

        # Remove settings from last run.
        start_over()

        # Apply colour temperature using raw CIE 1964 10 degree CMF data with interpolation.
        temperature(temperature_, lambda t : divide_by_maximum(cmf_10deg(t)))

        # Clip colour curves to fit [0, 1] to avoid errors by complex numbers.
        clip()

        # Apply gamma correction to monitor.
        gamma(gamma_red[m], gamma_green[m], gamma_blue[m])

        # Flush settings to monitor.
        # The `with` statement releases the semaphore even if flushing
        # raises, so a failure cannot leave the other threads blocked on it.
        with flush_semaphore:
            (drm if ttymode else randr)(m)

        # Signal thread completion.
        barrier.wait()


# Create threads, one per monitor.
for m in range(len(gamma_red)):
    thread = threading.Thread(target = adjust, args = (m,), daemon = True)
    thread.start()


last_dayness = None
def periodically(year, month, day, hour, minute, second, weekday, fade):
    '''
    :(int, int, int, int, int, int, int, float?)?→void  Place holder for periodically invoked function

    Invoked periodically

    If you want to control at what to invoke this function next
    time you can set the value of the global variable `wait_period`
    to the number of seconds to wait before invoking this function
    again. The value does not need to be an integer.

    @param   year:int     The year
    @param   month:int    The month, 1 = January, 12 = December
    @param   day:int      The day, minimum value is 1, probable maximum value is 31 (*)
    @param   hour:int     The hour, minimum value is 0, maximum value is 23
    @param   minute:int   The minute, minimum value is 0, maximum value is 59
    @param   second:int   The second, minimum value is 0, probable maximum value is 60 (**)
    @param   weekday:int  The weekday, 1 = Monday, 7 = Sunday
    @param   fade:float?  Blueshift can use this function to fade into a state when it start
                          or exits. `fade` can either be negative, zero or positive or `None`,
                          but the magnitude of value cannot exceed 1. When Blueshift starts,
                          this function will be invoked multiple times with the time parameters
                          of the time it is invoked and each time `fade` will increase towards
                          1, starting at 0, when the value is 1, the settings should be applied
                          to 100 %. After this this function will be invoked once again with
                          `fade` being `None`. When Blueshift exits the same behaviour is used
                          except, `fade` decrease towards -1 but start slightly below 0, when
                          -1 is reached all settings should be normal. Then Blueshift will NOT
                          invoke this function with `fade` being `None`, instead it will by
                          itself revert all settings and quit.

    (*)  Can be exceeded if the calendar system is changed, like in 1712-(02)Feb-30
    (**) See https://en.wikipedia.org/wiki/Leap_second
    '''
    global last_dayness, wait_period, interpol, purify, fade_
    dayness = get_dayness()

    # Do not do unnecessary work.
    if fade is None:
        if dayness == last_dayness:
            return
        last_dayness = dayness

    # Pass parameters to threads.
    fade_ = fade

    # Help functions for colour interpolation.
    # `interpol` takes the CRTC index as its third argument and
    # selects the matching entry cyclically from each list.
    interpol = lambda _day, _night, _m : _day[_m % len(_day)] * dayness + _night[_m % len(_night)] * (1 - dayness)
    purify = lambda current, pure : current * abs(fade) + pure * (1 - abs(fade))

    # Signal all threads to start.
    barrier.wait()

    # Wait for all threads.
    barrier.wait()


def reset():
    '''
    Invoked to reset the displays

    Applies only the gamma correction, without any
    colour temperature adjustment, to every monitor.
    '''
    for m in range(len(gamma_red)):
        # Remove settings from last run.
        start_over()

        # Apply gamma correction to monitor.
        gamma(gamma_red[m], gamma_green[m], gamma_blue[m])

        # Flush settings to monitor.
        (drm if ttymode else randr)(m)


# Set transition time, 0 on high day and 5 seconds on high night.
fadein_time = 5 * (1 - get_dayness())
# Do 10 changes per second.
fadein_steps = fadein_time * 10

# Transition on exit in the same way, calculated on exit.
# The original handler is remembered in `conf_storage`, and reused from
# there if it is already stored, so the wrapper below always delegates to it.
old_signal_SIGTERM = signal_SIGTERM
if 'SIGTERM' not in conf_storage:
    conf_storage['SIGTERM'] = old_signal_SIGTERM
else:
    old_signal_SIGTERM = conf_storage['SIGTERM']
def signal_SIGTERM(signum, frame):
    '''
    Calculate the fade out parameters, then delegate to the original SIGTERM handler

    @param  signum  Passed through unchanged to the original handler
    @param  frame   Passed through unchanged to the original handler
    '''
    global fadeout_time, fadeout_steps
    fadeout_time = 5 * (1 - get_dayness())
    fadeout_steps = fadeout_time * 10
    old_signal_SIGTERM(signum, frame)