1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
|
# -*- python -*-
'''
xpybar – xmobar replacement written in python
Copyright © 2014, 2015, 2016, 2017, 2018, 2019 Mattias Andrée (m@maandree.se)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
'''
# A xpybar configuration example testing the features of plugins.locks
import time
import threading
from plugins.locks import Locks
# Bar settings exposed as configuration globals. HEIGHT starts out as the
# height of one row and is scaled by the number of locks further down.
# NOTE(review): presumably OUTPUT selects the monitor, YPOS the vertical
# position and TOP whether the bar is docked at the top — confirm against
# xpybar's documentation.
OUTPUT = 0
HEIGHT = 12
YPOS = 24
TOP = True

# The locks reported by the locks plugin, read once when the
# configuration is loaded.
locks = Locks().locks

# Column titles, in the same order as the fields produced by convert().
headers = (
    'Id',
    'Syscall',
    'Level',
    'Shared',
    'Pid',
    'Device',
    'Inode',
    'Range',
    'Mountpoint',
    'Pathname',
)
def convert(lock):
    '''
    Convert a lock entry into a row of display strings
    
    @param   lock             The lock entry; it must provide the attributes `lock_id`,
                              `lock_type`, `mandatory`, `shared`, `pid`, `major`,
                              `minor`, `inode`, `start` and `end`
    @return  :list<str>       One string per column in `headers`; the last two
                              columns (mountpoint and pathname) are set to the
                              placeholder '...' and are filled in by `locate_files`
    '''
    syscalls = {'FLOCK' : 'flock', 'POSIX' : 'lockf'}
    lock_type = lock.lock_type
    # An end of `None` means that the lock extends to the end of the file
    end = 'eof' if lock.end is None else str(lock.end)
    return [
        str(lock.lock_id),
        # Fall back to the raw type name for lock types without a known
        # syscall name, rather than failing with a KeyError
        syscalls.get(lock_type, str(lock_type)),
        'mandatory' if lock.mandatory else 'advisory',
        'yes' if lock.shared else 'no',
        str(lock.pid),
        '%i:%i' % (lock.major, lock.minor),
        str(lock.inode),
        '%i..%s' % (lock.start, end),
        '...',
        '...',
    ]
# Pre-render every lock as a row of strings; locate_files() later fills in
# the placeholder columns of these rows in place.
text_locks = [convert(lock) for lock in locks]

# One row of height per lock. Keep room for at least one row so that HEIGHT
# does not become zero when no locks are held.
# NOTE(review): assumes HEIGHT ends up as the height of the bar's X window,
# which may not be zero — confirm against xpybar.
HEIGHT *= max(1, len(text_locks))
def locate_files():
    '''
    Fill in the details of `text_locks` that are not known from the lock entries alone
    
    The first pass adds, for each lock, the mountpoint, the device name
    (appended to the 'Device' column) and the command name of the owning
    process (appended to the 'Pid' column). The second pass looks up the
    pathname of each lock whose mountpoint was found. `bar.invalidate()` is
    called after the first pass and after each pathname lookup.
    '''
    # Maps (major, minor) to (mountpoint, device) so that `find_device`
    # is only called once per device
    dev_cache = {}
    for row, lock in zip(text_locks, locks):
        key = (lock.major, lock.minor)
        if key in dev_cache:
            mountpoint, device = dev_cache[key]
        else:
            found = lock.find_device()
            if found is None:
                mountpoint, device = '(not found)', None
            else:
                mountpoint, device = found[1], found[2]
            dev_cache[key] = (mountpoint, device)
        row[-2] = mountpoint
        # Applies to cached and fresh lookups alike, so that an unknown
        # device is never rendered as "major:minor(None)"
        if device is not None:
            row[5] = '%s(%s)' % (row[5], device)
        if not mountpoint.startswith('/'):
            # Without a mountpoint there is nowhere to search for the file
            row[-1] = '(not found)'
        try:
            with open('/proc/%i/comm' % lock.pid, 'rb') as file:
                cmd = file.read()
        except OSError:
            # Best effort: the process may have exited or may not be
            # readable, in which case the 'Pid' column is left as is
            pass
        else:
            cmd = cmd.decode('utf-8', 'replace')
            # Drop the line feed that terminates the command name
            if cmd.endswith('\n'):
                cmd = cmd[:-1]
            row[4] = '%s(%s)' % (row[4], cmd)
    bar.invalidate()
    for row, lock in zip(text_locks, locks):
        mountpoint = row[-2]
        if not mountpoint.startswith('/'):
            continue
        pathnames = lock.find_pathname(mountpoint)
        row[-1] = pathnames[0] if pathnames else '(not found)'
        # Invalidate after every lookup so that each result can be shown
        # without waiting for the remaining lookups
        bar.invalidate()
# Keep a reference to the `start` function that is already in scope,
# so that the replacement defined below can chain to it.
start_ = start

def start():
    '''
    Call the previously defined `start`, then pass `locate_files` to `xasync`
    '''
    # NOTE(review): `xasync` presumably runs `locate_files` in the
    # background so that startup is not blocked — confirm against xpybar.
    start_()
    xasync(locate_files)
# Serialises calls to redraw()
semaphore = threading.Semaphore()

def redraw():
    '''
    Clear the bar and draw one line per lock, with each column
    rendered as 'Header: value' and columns separated by ' │ '
    '''
    # The with-statement releases the semaphore even if drawing raises,
    # so that a failed redraw cannot block all later redraws
    with semaphore:
        rows = (' │ '.join('%s: %s' % (h, v) for (h, v) in zip(headers, row)) for row in text_locks)
        text = '\n'.join(rows)
        bar.clear()
        bar.draw_coloured_text(0, 10, 0, 2, text)
|