# -*- python -*-
# This example uses a text-based configuration file to make
# it easier for non-programmers to use Blueshift. It is however
# rather limited; the lisp-esque example is a bit more complex
# but does much more. It will read a file with the same pathname,
# just with ‘.conf’ appended (‘textconf.conf’ in this case).
# However, if the filename of this file ends with ‘rc’,
# that part will be removed; for example, if you rename this
# script to ‘~/.blueshiftrc’ it will read ‘~/.blueshift.conf’
# rather than ‘~/.blueshiftrc.conf’.
# Copyright © 2014, 2015, 2016, 2017 Mattias Andrée (m@maandree.se)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import sys
import time
import subprocess
# Work out which .conf file to read: the pathname of this script with
# ‘.conf’ appended, after dropping a trailing ‘rc’ if there is one
conf = (config_file[:-2] if config_file.endswith('rc') else config_file) + '.conf'

# Load the entire .conf file; `conf` now holds its content rather than its name
with open(conf, 'r') as file:
    conf = file.read()

# Parse .conf file
# `sections` maps a lower-cased section name to a list of instances of
# that section, and each instance is a list of (setting, value) pairs.
# `section` is the instance currently being filled; settings that come
# before any section header end up in this first [blueshift] instance.
section = []
sections = {'blueshift' : [section]}
def remove_comment(text, spawn_aware):
    '''
    Remove a trailing comment, started by `;` or `#`, from a line
    
    @param   text:str          The line
    @param   spawn_aware:bool  Whether `;` and `#` inside a `$(...)` command
                               substitution are part of the command rather
                               than the start of a comment
    @return  :str              The line up to, but not including, the comment
    '''
    if (';' not in text) and ('#' not in text):
        return text
    if not spawn_aware:
        # At least one of the two characters exists, so `find` cannot return -1
        return text[:text.replace('#', ';').find(';')]
    
    # state: 0 = ordinary text, 1 = just saw `$`, 2 = inside `$(...)`
    kept, state, depth = [], 0, 0
    for c in text:
        if state == 1:
            if c == '(':
                state, depth = 2, 1
                kept.append(c)
                continue
            # A lone `$` is ordinary text, and so is the character that
            # follows it: fall through so that it can start a comment or
            # a new substitution instead of being silently skipped
            state = 0
        if state == 2:
            if c == '(':
                depth += 1
            elif c == ')':
                depth -= 1
                if depth == 0:
                    state = 0
        elif c == '$':
            state = 1
        elif (c == ';') or (c == '#'):
            break
        kept.append(c)
    return ''.join(kept)
# Walk through the .conf file line by line, filling `sections`
for line in conf.split('\n'):
    line = line.strip()
    if line.startswith('[') and remove_comment(line, False).rstrip().endswith(']'):
        # Section header: begin a new instance of the named section
        line = remove_comment(line, False).rstrip()
        section_name = line[1 : -1].strip().lower()
        section = []
        sections.setdefault(section_name, []).append(section)
        continue
    if line.startswith((';', '#')):
        # The whole line is a comment
        continue
    if ('=' in line) or (':' in line):
        # Looks like a setting, strip the comment but keep `;` and `#`
        # that are part of a `$(...)` command
        line = remove_comment(line, True)
    if ('=' in line) or (':' in line):
        # The first `=` or `:`, whichever comes first, separates
        # the name of the setting from its value
        eq = min(line.find(ch) if ch in line else len(line) for ch in '=:')
        section.append((line[:eq].strip().lower(), line[eq + 1:].strip()))
    elif len(line.strip()) > 0:
        sys.stderr.buffer.write(('Malformated line: %s\n' % line).encode('utf-8'))
sys.stderr.buffer.flush()
# Settings that the .conf file may override
location = None
points = ['solar', '3', '-6']
adjustment_method_x, adjustment_method_tty = ['randr'], ['drm']

# Adjustments that apply to all monitors, and the list of
# per-monitor information that is built while evaluating sections
adjustments, monitors = [], []

# Output selection and backlight settings of the section being
# evaluated, `add_adjustments` stores what it reads in these
crtc = screen = name = edid = None
bldev, blmin = [], []
def parse_value(value):
    '''
    Parse a setting value
    
    The value is split into words at spaces. A `$(command)` is replaced
    by the first line of the output of `command`, which is run with `sh -c`;
    the output is never split into several words. A `$` that is not followed
    by `(` is kept as an ordinary character.
    
    @param   value:str                       The value to parse
    @return  :(list<str>, bool, bool, bool)  The words in the value string, with commands spawned,
                                             and with 'linear', 'cie' and 'default' filtered out,
                                             and the existence of each of those three as booleans
    '''
    def spawn(cmd):
        '''
        Spawn an external process and read its output, but only the first line
        
        @param   cmd:str  The command to spawn
        @return  :str?    The first line of the command's output, `None` on failure
        '''
        proc = subprocess.Popen(['sh', '-c', cmd], stdout = subprocess.PIPE, stderr = sys.stderr)
        # The pipe delivers bytes, it must be decoded before it can be split on '\n'
        output = proc.communicate()[0].decode('utf-8', 'replace').split('\n')[0]
        if (proc.returncode == 0) and (len(output) > 0):
            return output
        return None
    
    # `cmd` is `None` outside a command substitution, '' directly after a `$`,
    # and otherwise the text of the substitution so far, parentheses included
    words, buf, cmd, stack = [], '', None, 0
    for c in value:
        if cmd == '':
            if c == '(':
                cmd, stack = '(', 1
                continue
            # Not a substitution after all: keep the `$` and let
            # the current character be handled as ordinary text
            cmd = None
            buf += '$'
        if cmd is not None:
            if c == '(':
                stack += 1
            elif c == ')':
                stack -= 1
            cmd += c
            if (c == ')') and (stack == 0):
                output = spawn(cmd[1 : -1])
                if output is not None:
                    # Extend the word rather than replace it, so that
                    # text written before the `$(...)` is not lost
                    buf += output
                cmd = None
        elif c == ' ':
            if not buf == '':
                words.append(buf)
                buf = ''
        elif c == '$':
            cmd = ''
            stack = 0
        else:
            buf += c
    if cmd == '':
        # The value ended with a lone `$`
        buf += '$'
    # An unterminated `$(...` is discarded
    if not buf == '':
        words.append(buf)
    
    keywords = ('linear', 'cie', 'default')
    return ([w for w in words if w not in keywords],
            'linear' in words, 'cie' in words, 'default' in words)
# Evaluate .conf file
def make_f(f, value, default):
    '''
    Make an adjustment function
    
    The returned function takes a timepoint and an alpha value. The integer
    part of the timepoint selects an entry in `value`, and the entry after
    it with wrap-around, and the fractional part is the weight of the latter.
    `f` is invoked with the interpolated values as its arguments.
    
    @param   f:(*¿V??)→void         The function that makes the adjustment
    @param   value:list<list<¿V??>> The arguments for `f` at each time point
    @param   default:list<¿V??>     The arguments for `f` that have no effect
    @return  :(float, float)→void   Function that applies the adjustment for
                                    a timepoint and an alpha value
    '''
    def neighbours(t):
        # Arguments of the time point before and after `t`, paired up with the defaults
        index = int(t)
        return zip(value[index % len(value)], value[(index + 1) % len(value)], default)
    
    has_none = any(v is None for v in [x for val in value for x in val] + default)
    
    if has_none:
        # Some argument may be omitted (`None`): the defaults are
        # not blended in, and a missing side counts as zero
        def ff(t, a):
            triples = neighbours(t)
            t %= 1
            def interpol(v0, v1, d):
                if (d is None) and (a == 0):
                    return None
                if (v0 is None) and (t == 0):
                    return None
                if (v1 is None) and (t == 1):
                    return None
                before = 0 if v0 is None else v0 * (1 - t)
                after  = 0 if v1 is None else v1 * t
                return before + after
            f(*[interpol(v0, v1, d) for (v0, v1, d) in triples])
    else:
        # All arguments are numbers: interpolate between the time
        # points and fade between that and the default using alpha
        def ff(t, a):
            triples = neighbours(t)
            t %= 1
            f(*[(v0 * (1 - t) + v1 * t) * a + (1 - a) * d for (v0, v1, d) in triples])
    return ff
def float3(value):
    '''
    Parse a string representation of a float trio
    
    The components are separated by `:` and a component that is `none`
    becomes `None`. If fewer than three components are given they are
    repeated until there are three; additional components are ignored.
    
    @param   value:str                  The float trio as a string
    @return  :[float?, float?, float?]  The float trio as a float list
    '''
    trio = []
    for component in value.split(':'):
        trio.append(None if component == 'none' else float(component))
    if len(trio) < 3:
        trio = trio * 3
    return trio[:3]
def float6(value):
    '''
    Parse a string representation of a float pair-trio
    
    The string consists of two trios separated by `..`, where each trio
    is `:`-separated and repeated until it has three components if it has
    fewer. The result interleaves the trios: first component of the first
    trio, first component of the second trio, second of the first, and so on.
    
    @param   value:str   The float pair-trio as a string
    @return  :[float]*6  The float pair-trio as a float list
    '''
    trios = []
    for half in value.split('..'):
        trio = [float(component) for component in half.split(':')]
        if len(trio) < 3:
            trio = trio * 3
        trios.append(trio[:3])
    (first, second) = trios
    return [x for pair in zip(first, second) for x in pair]
# The backlight level, as a fraction of the maximum, that the
# `backlight` adjustments have accumulated so far
backlight_value = 1

def add_adjustments(adjsections, adjustments):
    '''
    Add adjustments from sections to a list
    
    Settings that are not adjustments (location, points, adjustment method,
    output selection and backlight devices) are stored in the global
    variables with the corresponding names.
    
    @param  adjsections:list<list<(str, str)>>     The sections
    @param  adjustments:list<(float, float)→void>  The list to fill with adjustments,
                                                   each takes a timepoint and an alpha value
    '''
    global location, points, adjustment_method_x, adjustment_method_tty, crtc, screen, bldev, blmin, name, edid
    for section in adjsections:
        for (setting, value) in section:
            (value, linear, cie, default) = parse_value(value)
            new_adjustment = None
            if linear:
                adjustments.append(lambda _t, _a: linearise())
            if   setting == 'location':               location              = value
            elif setting == 'points':                 points                = value
            elif setting == 'adjustment-method-x':    adjustment_method_x   = value
            elif setting == 'adjustment-method-tty':  adjustment_method_tty = value
            elif setting == 'crtc':                   crtc                  = value
            elif setting == 'screen':                 screen                = value
            elif setting == 'card':                   screen                = value
            elif setting == 'name':                   name                  = value
            elif setting == 'edid':                   edid                  = value
            elif setting == 'backlight-device':       bldev                 = value
            elif setting == 'backlight-minimum':      blmin                 = [int(v) for v in value]
            elif setting == 'backlight':
                def f(x):
                    global backlight_value
                    # Scale by the interpolated level, not by the function itself
                    backlight_value *= x
                new_adjustment = make_f(f, [[float(v)] for v in value], [1])
            elif setting == 'temperature':
                f_ = cie_temperature if cie else temperature
                # `f_` is bound as a default argument so that a later
                # `temperature` setting cannot change which function is used here
                f = lambda x, f_ = f_ : f_(x, lambda t : divide_by_maximum(cmf_10deg(t)))
                new_adjustment = make_f(f, [[float(v)] for v in value], [6500])
            elif setting == 'contrast':
                f = cie_contrast if cie else rgb_contrast
                new_adjustment = make_f(f, [float3(v) for v in value], 3 * [1])
            elif setting == 'brightness':
                f = cie_brightness if cie else rgb_brightness
                new_adjustment = make_f(f, [float3(v) for v in value], 3 * [1])
            elif setting == 'gamma':
                def f(*levels):
                    clip()
                    gamma(*levels)
                new_adjustment = make_f(f, [float3(v) for v in value], 3 * [1])
            elif setting == 'negative':
                def f(*values):
                    negative(*[not v == 0 for v in values])
                new_adjustment = make_f(f, [float3(v) for v in value], 3 * [0])
            elif setting == 'invert':
                # Select the function now, `cie` is overwritten by the next setting
                def f(*values, invert_ = (cie_invert if cie else rgb_invert)):
                    invert_(*[not v == 0 for v in values])
                new_adjustment = make_f(f, [float3(v) for v in value], 3 * [0])
            elif setting == 'sigmoid':
                new_adjustment = make_f(sigmoid, [float3(v) for v in value], 3 * [None])
            elif setting == 'limits':
                f = cie_limits if cie else rgb_limits
                new_adjustment = make_f(f, [float6(v) for v in value], 3 * [0, 1])
            elif setting == 'icc':
                def noop():
                    pass
                # NOTE(review): this used to call `load_load`, which is not defined in this
                # file; `load_icc` is assumed to be Blueshift's ICC profile loader — confirm
                profiles = [noop if val == 'none' else load_icc(val) for val in value]
                new_adjustment = make_icc_interpolation(profiles)
            elif setting == 'monitor':
                # Include the adjustments of the named [monitor ...] sections
                add_adjustments(sections[' '.join(['monitor'] + value)], adjustments)
            else:
                sys.stderr.buffer.write(('Setting not recognised: %s\n' % setting).encode('utf-8'))
                sys.stderr.buffer.flush()
            if new_adjustment is not None:
                if default:
                    # Apply in full regardless of the alpha value. The wrapped function is bound
                    # as a default argument, otherwise every wrapper made by this call would
                    # end up invoking the adjustment of the last `default` setting.
                    def f(t, a, adjustment_ = new_adjustment):
                        adjustment_(t, 1)
                    new_adjustment = f
                adjustments.append(new_adjustment)
            if linear:
                adjustments.append(lambda _t, _a: standardise())
# Adjustments in [blueshift] sections apply to all monitors
add_adjustments(sections['blueshift'], adjustments)

# Select the adjustment method, only the first listed method is used
adjustment_method = adjustment_method_tty if ttymode else adjustment_method_x
adjustment_method = adjustment_method[0]
# The method used to list screens, `randr` is used on behalf of `vidmode`
list_method = 'randr' if adjustment_method == 'vidmode' else adjustment_method

# Each section named after the adjustment method describes a group of
# outputs together with the adjustments that only apply to those outputs
screen_list = None
for section in sections[adjustment_method]:
    output_adjustments = []
    crtc, screen, name, edid, bldev, blmin = None, None, None, None, [], []
    add_adjustments([section], output_adjustments)
    
    # The screens are only listed, once, if some section needs the list
    crtc_screen = (crtc is None) or (screen is None)
    name_edid = (name is not None) or (edid is not None)
    if (screen_list is None) and (crtc_screen or name_edid):
        screen_list = list_screens(list_method)
    
    if screen is None:
        screen = list(range(len(screen_list)))
    else:
        screen = [int(s) for s in screen]
    
    # Map each selected screen to the CRTC:s to adjust on it
    crtcs = {}
    for s in screen:
        crtcs[s] = []
        if crtc is not None:
            crtcs[s] += [int(c) for c in crtc]
        elif (name is None) and (edid is None):
            crtcs[s] += list(range(screen_list[s].crtc_count))
        # Add the CRTC:s of the outputs found by name or EDID, without duplicates.
        # `s` must stay the screen index as it is the key into `crtcs`.
        # NOTE(review): the outputs found are not filtered by screen, so they are
        # added for every selected screen — confirm that this is intended
        if name is not None:
            for n in name:
                for d in screen_list.find_by_name(n):
                    if d.crtc not in crtcs[s]:
                        crtcs[s].append(d.crtc)
        if edid is not None:
            for e in edid:
                for d in screen_list.find_by_edid(e):
                    if d.crtc not in crtcs[s]:
                        crtcs[s].append(d.crtc)
    monitors.append((crtcs, screen, bldev, blmin, output_adjustments))
# Get gamma adjustment/reader functions for the selected adjustment method
get_method, set_method = {'randr'   : (randr_get,   randr),
                          'vidmode' : (vidmode_get, vidmode),
                          'drm'     : (drm_get,     drm)}[adjustment_method]

# Save gamma ramps, what the reader returns is stored
# per screen and CRTC so that `reset` can use it
saved = {}
for crtcs, screens, _bldev, _blmin, _adj in monitors:
    for screen in screens:
        saved_ = saved.setdefault(screen, {})
        for crtc in crtcs[screen]:
            saved_[crtc] = get_method(crtc, screen)
# Evaluate location, it is only required when the points are solar elevations
latitude, longitude = None, None
if 'solar' in points:
    if (location is None) or (len(location) == 0):
        sys.stderr.buffer.write(('Location missing\n').encode('utf-8'))
        sys.stderr.buffer.flush()
        sys.exit(1)
    # The location must be exactly two numbers: latitude and longitude
    try:
        if not len(location) == 2:
            raise ValueError()
        location = [float(c) for c in location]
    except (TypeError, ValueError):
        sys.stderr.buffer.write(('Malformation location\n').encode('utf-8'))
        sys.stderr.buffer.flush()
        sys.exit(1)
    # The latitude is limited to ±90 and the longitude, the second value, to ±180
    if not ((-90 <= location[0] <= 90) and (-180 <= location[1] <= 180)):
        sys.stderr.buffer.write(('Invalid location\n').encode('utf-8'))
        sys.stderr.buffer.flush()
        sys.exit(1)
    (latitude, longitude) = location
# Evaluate point ## TODO Make this a standard part of Blueshift
# The points must be declared as either solar elevations or clock
# times, `reduce` additionally rescales the indices of the points
solar_points = 'solar' in points
reduce_points = 'reduce' in points
if not (solar_points or ('time' in points)):
    sys.stderr.buffer.write(('Invalid points settings\n').encode('utf-8'))
    sys.stderr.buffer.flush()
    sys.exit(1)
# TODO support brackets (see textconf.conf)
def t(point):
    '''
    Convert a clock time to the number of seconds since midnight
    
    @param   point:str  The time: hours, optionally followed by minutes and
                        seconds, separated by `:`, each may have decimals
    @return  :float     Seconds since midnight, wrapped around to be
                        at least 0 and less than 24 hours
    '''
    point = [float(p) for p in point.split(':')]
    # Omitted minutes and seconds are zero. (This loop used to test
    # `> 3` and would never terminate for input with four components.)
    while len(point) < 3:
        point.append(0)
    v = sum(v * 60 ** (2 - i) for i, v in enumerate(point))
    return v % (24 * 60 * 60)
# Convert the points to numbers, solar elevations or seconds since midnight,
# and pair each one up with its index, which later selects the values to use
points = [float(p) if solar_points else t(p) for p in points if p not in ['solar', 'time', 'reduce']]
points = list(enumerate(points))
# With `reduce` the indices are rescaled to the range [0, 1]. This requires
# at least two points; with fewer there is nothing to rescale, and dividing
# by `len(points) - 1` would be a division by zero.
if reduce_points and (len(points) > 1):
    n = len(points) - 1
    points = [(r / n, v) for r, v in points]
get_timepoint = None
# Order the points by position (seconds since midnight or solar elevation)
points.sort(key = lambda x : x[1])

def _interpolate_timepoint(v):
    '''
    Find the neighbouring points a position lies between and
    interpolate between the indices of those two points
    
    @param   v:float  The position, in the same unit as the points
    @return  :float?  The timepoint, `None` if the position is outside all points
    '''
    for (a_, a), (b_, b) in zip(points, points[1:]):
        if a <= v <= b:
            # How far between the two points the position is, [0, 1];
            # two points at the same position would otherwise divide by zero
            v = (v - a) / (b - a) if b > a else 0
            # NOTE(review): `b_ == 0` is treated as wrapping around from the last index to
            # the first, this looks wrong for more than two points in descending order
            if (a_ + 1 == b_) or (b_ == 0):
                return a_ + v
            # Descending indices: step down from the index of the first point. (This
            # used to subtract from the position of the point rather than its index.)
            return a_ - v
    return None

if not solar_points: # TODO does these really handle `reduce` correctly?
    one_day = 24 * 60 * 60
    # Repeat the first point one day later, and the last point
    # one day earlier, so that any time of day lies between two points
    points.append((points[0][0], points[0][1] + one_day))
    points = [(points[-2][0], points[-2][1] - one_day)] + points
    def get_timepoint():
        '''
        Get the current timepoint based on the time of day
        
        @return  :float  The timepoint
        '''
        # NOTE(review): `time.time()` counts from the epoch in UTC, so this is the
        # time of day in UTC rather than in the local timezone — confirm intent
        timepoint = _interpolate_timepoint(time.time() % one_day)
        return 1 if timepoint is None else timepoint # `None` should never happen

if solar_points:
    def get_timepoint():
        '''
        Get the current timepoint based on the Sun's elevation
        
        @return  :float  The timepoint
        '''
        v = solar_elevation(latitude, longitude)
        timepoint = _interpolate_timepoint(v)
        if timepoint is not None:
            return timepoint
        # Outside the range of the points: use the nearest end
        if v < points[0][1]:
            return points[0][0]
        return points[-1][0]
wait_period = 5
'''
:float  The number of seconds to wait before invoking `periodically` again
'''

# Create backlight device connection
# Look for an executable named `adjbacklight` in $PATH
adjbl = False
if 'PATH' in os.environ:
    path = os.environ['PATH'].split(os.path.pathsep)
    sep = os.path.sep
    for p in path:
        f = p + sep + 'adjbacklight'
        if os.path.exists(f) and os.access(f, os.X_OK):
            adjbl = True
            break

def makebl(dev, blmin):
    '''
    Create a backlight controller for one device
    
    @param   dev:str    The backlight device
    @param   blmin:int  The value passed on as the minimum for the device
    @return  :Backlight The backlight controller
    '''
    return Backlight(dev, adjbacklight = adjbl, minimum = blmin)

def makebls(dev, blmin):
    '''
    Create backlight controllers for a list of devices
    
    @param   dev:list<str>    The backlight devices, `none` marks a disabled device
    @param   blmin:list<int>  The minimum for each device, in the same order
    @return  :list<Backlight> The controllers of the devices that are not disabled
    '''
    # The rest of the settings spell the keyword `none` in lower case,
    # so that spelling is accepted here too, alongside the old `None`.
    # NOTE(review): a device without a corresponding minimum is silently
    # skipped by `zip` — confirm whether a default minimum should be used
    return [makebl(d, b) for d, b in zip(dev, blmin) if not d.lower() == 'none']

# Replace the device names and minimums of each monitor with backlight controllers
monitors = [(crtcs, screens, makebls(dev, blmin), adj) for crtcs, screens, dev, blmin, adj in monitors]

# Save backlight settings, as a flat list of (controller, brightness) pairs
saved_backlight = [(b, b.brightness) for _c, _s, bl, _a in monitors if bl is not None for b in bl]
def periodically(year, month, day, hour, minute, second, weekday, fade):
    '''
    Invoked periodically
    
    To control when this function is invoked next time, set the global
    variable `wait_period` to the number of seconds to wait before the
    next invocation. The value does not need to be an integer.
    
    @param   year:int     The year
    @param   month:int    The month, 1 = January, 12 = December
    @param   day:int      The day, minimum value is 1, probable maximum value is 31 (*)
    @param   hour:int     The hour, minimum value is 0, maximum value is 23
    @param   minute:int   The minute, minimum value is 0, maximum value is 59
    @param   second:int   The second, minimum value is 0, probable maximum value is 60 (**)
    @param   weekday:int  The weekday, 1 = Monday, 7 = Sunday
    @param   fade:float?  Blueshift can use this function to fade into a state when it starts
                          or exits. `fade` is either `None` or a value whose magnitude does
                          not exceed 1. When Blueshift starts, this function is invoked
                          multiple times, with the time parameters of the time it is invoked,
                          and each time `fade` is closer to 1, starting at 0; when the value
                          is 1 the settings should be applied to 100 %. After that this
                          function is invoked once more with `fade` being `None`. When
                          Blueshift exits the same behaviour is used, except that `fade`
                          decreases towards -1, starting slightly below 0; when -1 is reached
                          all settings should be normal. Blueshift will then NOT invoke this
                          function with `fade` being `None`, instead it reverts all settings
                          by itself and quits.
    
    (*)  Can be exceeded if the calendar system is changed, like in 1712-(02)Feb-30
    (**) See https://en.wikipedia.org/wiki/Leap_second
    '''
    global backlight_value
    start_over()
    # Only the magnitude of `fade` is used, as the weight of the adjustments
    if fade is None:
        alpha = 1
    else:
        alpha = abs(fade)
    timepoint = get_timepoint()
    
    # Apply the adjustments that are common to all monitors, once,
    # and remember the outcome so each monitor can start from it
    backlight_value = 1
    for common_adjustment in adjustments:
        common_adjustment(timepoint, alpha)
    common_state, common_backlight = store(), backlight_value
    
    for crtcs, screens, bldevs, output_adjustments in monitors:
        restore(common_state)
        backlight_value = common_backlight
        # Apply the adjustments that are specific to this group of outputs
        for output_adjustment in output_adjustments:
            output_adjustment(timepoint, alpha)
        for screen in screens:
            set_method(*(crtcs[screen]), screen = screen)
        for bldev in bldevs:
            bldev.brightness = backlight_value * bldev.maximum
def reset():
    '''
    Invoked to reset the displays
    
    For every CRTC, the entry saved for it when this script was loaded
    is invoked on a fresh state, and the result is applied to the CRTC.
    After that the saved brightness of every backlight device is restored.
    '''
    for crtcs, screens, _bldevs, _adj in monitors:
        for screen in screens:
            screen_ramps = saved[screen]
            for crtc in crtcs[screen]:
                start_over()
                restore_ramps = screen_ramps[crtc]
                restore_ramps()
                set_method(crtc, screen = screen)
    for device, level in saved_backlight:
        device.brightness = level