# -*- python -*-
# This example demonstrates how you can make a
# multithreaded configurations script.
# This file is dual-licensed under GNU General Public License
# version 3 and GNU Free Documentation License version 1.3.
# Copyright © 2014 Mattias Andrée (maandree@member.fsf.org)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
# Copyright © 2014 Mattias Andrée (maandree@member.fsf.org)
#
# Permission is granted to copy, distribute and/or modify this document
# under the terms of the GNU Free Documentation License, Version 1.3
# or any later version published by the Free Software Foundation;
# with no Invariant Sections, no Front-Cover Texts, and no Back-Cover Texts.
# You should have received a copy of the GNU General Public License
# along with this software package. If not, see .
import threading
# Geographical coodinates.
# ("Kristall, vertikal accent i glas och stål" (Crystal, vertical accent
# in glass and steal) in this example. A glass obelisk, lit from the inside
# with adjustable colours and a default colour of 5600 K, in the middle
# of a hyperelliptic roundabout.)
latitude, longitude = 59.3326, 18.0652
# Adjust settings by solar elevation.
get_dayness = lambda : sun(latitude, longitude)
# Colour temperature at high day and high night, respectively.
temperature_day, temperature_night = [6500], [3700]
# Gamma of the monitors.
gamma_red = [1.16, 1.10]
gamma_green = [1.15, 1.16]
gamma_blue = [1.11, 1.10]
# Make colour curves thread local.
class threadlocal:
def __init__(self, obj):
self.default = obj
self.tmap = {}
def __getitem__(self, i):
t = threading.get_ident()
if t not in self.tmap:
self.tmap[t] = self.default[:]
return self.tmap[t][i]
def __len__(self):
t = threading.get_ident()
if t not in self.tmap:
self.tmap[t] = self.default[:]
return len(self.tmap[t])
def __setitem__(self, i, x):
t = threading.get_ident()
if t not in self.tmap:
self.tmap[t] = self.default[:]
self.tmap[t][i] = x
def __delitem__(self, i):
t = threading.get_ident()
if t not in self.tmap:
self.tmap[t] = self.default[:]
del self.tmap[t][x]
r_curve = threadlocal([i / (i_size - 1) for i in range(i_size)])
g_curve = threadlocal([i / (i_size - 1) for i in range(i_size)])
b_curve = threadlocal([i / (i_size - 1) for i in range(i_size)])
## cmf_10deg uses non-thread safe cache, run once in advance so it is not done by the threads.
cmf_10deg(0)
# Thread synchronisation barrier.
barrier = threading.Barrier(len(gamma_red) + 1)
# Help functions for colour interpolation.
interpol, purify = None, None
# Parameters in `periodically`.
fade_ = None
# randr is not threadsafe, and absolutely not if you have multiple screens.
# drm can be threadsafe but is by default not.
# This semaphore is used to make sure that two threads are not accessing randr or drm at the same time.
flush_semaphore = threading.Semaphore()
def adjust(m):
'''
Adjust monitor colours
@param m:int The CRTC index
'''
while True:
# Wait for start cue.
barrier.wait()
# Calculate temperature.
temperature_ = interpol(temperature_day, temperature_night)
if fade_ is not None:
temperature_ = purify(temperature_, 6500)
# Remove settings from last run.
start_over()
# Apply colour temperature using raw CIE 1964 10 degree CMF data with interpolation.
temperature(temperature_, lambda t : divide_by_maximum(cmf_10deg(t)))
# Clip colour curves to fit [0, 1] to avoid errors by complex numbers.
clip()
# Apply gamma correction to monitor.
gamma(gamma_red[m], gamma_green[m], gamma_blue[m])
# Flush settings to monitor.
flush_semaphore.acquire()
(drm if ttymode else randr)(m)
flush_semaphore.release()
# Signal thread completion.
barrier.wait()
# Create threads.
for m in range(len(gamma_red)):
thread = threading.Thread(target = adjust, args = (m,))
thread.setDaemon(True)
thread.start()
last_dayness = None
def periodically(year, month, day, hour, minute, second, weekday, fade):
'''
Invoked periodically
If you want to control at what to invoke this function next time
you can set the value of the global variable `wait_period` to the
number of seconds to wait before invoking this function again.
The value does not need to be an integer.
@param year:int The year
@param month:int The month, 1 = January, 12 = December
@param day:int The day, minimum value is 1, probable maximum value is 31 (*)
@param hour:int The hour, minimum value is 0, maximum value is 23
@param minute:int The minute, minimum value is 0, maximum value is 59
@param second:int The second, minimum value is 0, probable maximum value is 60 (**)
@param weekday:int The weekday, 1 = Monday, 7 = Sunday
@param fade:float? Blueshift can use this function to fade into a state when it start
or exits. `fade` can either be negative, zero or positive or `None`,
but the magnitude of value cannot exceed 1. When Blueshift starts,
this function will be invoked multiple with the time parameters
of the time it is invoked and each time `fade` will increase towards
1, starting at 0, when the value is 1, the settings should be applied
to 100 %. After this this function will be invoked once again with
`fade` being `None`. When Blueshift exits the same behaviour is used
except, `fade` decrease towards -1 but start slightly below 0, when
-1 is reached all settings should be normal. Then Blueshift will NOT
invoke this function with `fade` being `None`, instead it will by
itself revert all settings and quit.
(*) Can be exceeded if the calendar system is changed, like in 1712-(02)Feb-30
(**) See https://en.wikipedia.org/wiki/Leap_second
'''
global last_dayness, wait_period, interpol, purify, fade_
dayness = get_dayness()
# Do not do unnecessary work.
if fade is None:
if dayness == last_dayness:
return
last_dayness = dayness
# Pass parameters to threads.
fade_ = fade
# Help functions for colour interpolation.
interpol = lambda _day, _night : _day[m % len(_day)] * dayness + _night[m % len(_night)] * (1 - dayness)
purify = lambda current, pure : current * abs(fade) + pure * (1 - abs(fade))
# Signal all threads to start.
barrier.wait()
# Wait for all threads.
barrier.wait()
def reset():
'''
Invoked to reset the displays
'''
for m in range(len(gamma_red)):
# Remove settings from last run.
start_over()
# Apply gamma correction to monitor.
gamma(gamma_red[m], gamma_green[m], gamma_blue[m])
# Flush settings to monitor.
(drm if ttymode else randr)(m)
# Set transition time, 0 on high day and 5 seconds on high night.
fadein_time = 5 * (1 - get_dayness())
# Do 10 changes per second.
fadein_steps = fadein_time * 10
# Transition on exit in the same way, calculated on exit.
old_signal_SIGTERM = signal_SIGTERM
if 'SIGTERM' not in conf_storage:
conf_storage['SIGTERM'] = old_signal_SIGTERM
else:
old_signal_SIGTERM = conf_storage['SIGTERM']
def signal_SIGTERM(signum, frame):
global fadeout_time, fadeout_steps
fadeout_time = 5 * (1 - get_dayness())
fadeout_steps = fadeout_time * 10
old_signal_SIGTERM(signum, frame)