aboutsummaryrefslogblamecommitdiffstats
path: root/examples/plugins/locks
blob: 4cce02d5adf8fead78adc70c297a23c9759c9f77 (plain) (tree)
1
2
3
4
                

                                               
                                                                            













                                                                     






































































                                                                                                           
                        









                                                                                                             
# -*- python -*-
'''
xpybar – xmobar replacement written in python
Copyright © 2014, 2015, 2016, 2017, 2018  Mattias Andrée (maandree@kth.se)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>.
'''

# A xpybar configuration example testing the features of plugins.locks

import time
import threading

from plugins.locks import Locks


OUTPUT, HEIGHT, YPOS, TOP = 0, 12, 24, True


locks = Locks().locks
headers = ('Id', 'Syscall', 'Level', 'Shared', 'Pid', 'Device', 'Inode', 'Range', 'Mountpoint', 'Pathname')
def convert(lock):
    rc = []
    rc.append(str(lock.lock_id))
    rc.append({'FLOCK' : 'flock', 'POSIX' : 'lockf'}[lock.lock_type])
    rc.append('mandatory' if lock.mandatory else 'advisory')
    rc.append('yes' if lock.shared else 'no')
    rc.append(str(lock.pid))
    rc.append('%i:%i' % (lock.major, lock.minor))
    rc.append(str(lock.inode))
    rc.append('%i..%s' % (lock.start, 'eof' if lock.end is None else str(lock.end)))
    rc += ['...'] * 2
    return rc
text_locks = [convert(lock) for lock in locks]

HEIGHT *= len(text_locks)


def locate_files():
    dev_cache = {}
    for i, lock in enumerate(locks):
        if (lock.major, lock.minor) in dev_cache:
            mountpoint, device = dev_cache[(lock.major, lock.minor)]
            text_locks[i][-2] = mountpoint
            text_locks[i][5] = '%s(%s)' % (text_locks[i][5], device)
        else:
            device = lock.find_device()
            mountpoint = device[1] if device is not None else '(not found)'
            device     = device[2] if device is not None else None
            dev_cache[(lock.major, lock.minor)] = (mountpoint, device)
            text_locks[i][-2] = mountpoint
            if device is not None:
                text_locks[i][5] = '%s(%s)' % (text_locks[i][5], device)
        if not mountpoint.startswith('/'):
            text_locks[i][-1] = '(not found)'
        try:
            with open('/proc/%i/comm' % lock.pid, 'rb') as file:
                cmd = file.read()
            cmd = cmd.decode('utf-8', 'replace')[:-1]
            text_locks[i][4] = '%s(%s)' % (text_locks[i][4], cmd)
        except:
            pass
    bar.invalidate()
    for i, lock in enumerate(locks):
        mountpoint = text_locks[i][-2]
        if not mountpoint.startswith('/'):
            continue
        pathnames = lock.find_pathname(mountpoint)
        if len(pathnames) == 0:
            text_locks[i][-1] = '(not found)'
        else:
            text_locks[i][-1] = pathnames[0]
        bar.invalidate()


start_ = start
def start():
    start_()
    xasync(locate_files)


semaphore = threading.Semaphore()
def redraw():
    semaphore.acquire(blocking = True)
    text = '\n'.join(' │ '.join('%s: %s' % (h, v) for (h, v) in zip(headers, lock)) for lock in text_locks)
    bar.clear()
    bar.draw_coloured_text(0, 10, 0, 2, text)
    semaphore.release()