aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/bus.py132
-rw-r--r--src/native_bus.pyx189
2 files changed, 283 insertions, 38 deletions
diff --git a/src/bus.py b/src/bus.py
index b07fe82..f59ad1b 100644
--- a/src/bus.py
+++ b/src/bus.py
@@ -156,32 +156,96 @@ class Bus:
raise self.__oserror(e)
+ def write_timed(self, message : str, timeout : float, clock_id : int = None):
+ '''
+ Broadcast a message a bus
+
+ @param message:str The message to write, may not be longer than 2047 bytes
+ after UTF-8 encoding
+ @param timeout:float The time the function shall fail with `os.errno.EAGAIN`,
+ if it has not already completed
+ @param clock_id:int? The clock `timeout` is measured in, it must be a
+ predictable clock, if `None`, `timeout` is measured in
+ relative time instead of absolute time
+ '''
+ from native_bus import bus_write_timed_wrapped
+ if clock_id is None:
+ import time
+ clock_id = time.CLOCK_MONOTONIC_RAW
+ timeout += timeout.clock_gettime(clock_id)
+ (r, e) = bus_write_timed_wrapped(self.bus, message, timeout, clock_id)
+ if r == -1:
+ raise self.__oserror(e)
+
+
def read(self, callback : callable, user_data = None):
'''
Listen (in a loop, forever) for new message on a bus
- @param callback Function to call when a message is received, the
- input parameters will be the read message and
- `user_data` from the function's [`Bus.read`] parameter
- with the same name. The message must have been parsed
- or copied when `callback` returns as it may be over
- overridden after that time. `callback` should
- return either of the the values:
- 0: stop listening
- 1: continue listening
- -1: an error has occurred
- However, the function [`Bus.read`] will invoke
- `callback` with `message` set to `None` one time
- directly after it has started listening on the bus.
- This is to the the program now it can safely continue
- with any action that requires that the programs is
- listening on the bus.
- NB! The received message will not be decoded from UTF-8
- @param user_data See description of `callback`
+ @param callback:(message:str?, user_data:¿U?=user_data)→int
+ Function to call when a message is received, the
+ input parameters will be the read message and
+ `user_data` from the function's [`Bus.read`] parameter
+ with the same name. The message must have been parsed
+ or copied when `callback` returns as it may be over
+ overridden after that time. `callback` should
+ return either of the the values:
+ 0: stop listening
+ 1: continue listening
+ -1: an error has occurred
+ However, the function [`Bus.read`] will invoke
+ `callback` with `message` set to `None` one time
+ directly after it has started listening on the bus.
+ This is to the the program now it can safely continue
+ with any action that requires that the programs is
+ listening on the bus.
+ NB! The received message will not be decoded from UTF-8
+ @param user_data:¿U? See description of `callback`
'''
from native_bus import bus_read_wrapped
- if bus_read_wrapped(self.bus, callback, user_data) == -1:
- raise self.__oserror()
+ (r, e) = bus_read_wrapped(self.bus, callback, user_data)
+ if r == -1:
+ raise self.__oserror(e)
+
+
+ def read_timed(self, callback : callable, timeout : float, clock_id : int = None, user_data = None):
+ '''
+ Listen (in a loop, forever) for new message on a bus
+
+ @param callback:(message:str?, user_data:¿U?=user_data)→int
+ Function to call when a message is received, the
+ input parameters will be the read message and
+ `user_data` from the function's [`Bus.read`] parameter
+ with the same name. The message must have been parsed
+ or copied when `callback` returns as it may be over
+ overridden after that time. `callback` should
+ return either of the the values:
+ 0: stop listening
+ 1: continue listening
+ -1: an error has occurred
+ However, the function [`Bus.read`] will invoke
+ `callback` with `message` set to `None` one time
+ directly after it has started listening on the bus.
+ This is to the the program now it can safely continue
+ with any action that requires that the programs is
+ listening on the bus.
+ NB! The received message will not be decoded from UTF-8
+ @param timeout:float The time the function shall fail with `os.errno.EAGAIN`,
+ if it has not already completed, note that the callback
+ function may or may not have been called
+ @param clock_id:int? The clock `timeout` is measured in, it must be a
+ predictable clock, if `None`, `timeout` is measured in
+ relative time instead of absolute time
+ @param user_data:¿U? See description of `callback`
+ '''
+ from native_bus import bus_read_timed_wrapped
+ if clock_id is None:
+ import time
+ clock_id = time.CLOCK_MONOTONIC_RAW
+ timeout += timeout.clock_gettime(clock_id)
+ (r, e) = bus_read_timed_wrapped(self.bus, callback, user_data, timeout, clock_id)
+ if r == -1:
+ raise self.__oserror(e)
def poll_start(self):
@@ -232,6 +296,34 @@ class Bus:
return message
+ def poll_timed(self, timeout : float, clock_id : int = None) -> bytes:
+ '''
+ Wait for a message to be broadcasted on the bus.
+ The caller should make a copy of the received message,
+ without freeing the original copy, and parse it in a
+ separate thread. When the new thread has started be
+ started, the caller of this function should then
+ either call `Bus.poll_timed` again or `Bus.poll_stop`.
+
+ @param timeout:float The time the function shall fail with `os.errno.EAGAIN`,
+ if it has not already completed
+ @param clock_id:int? The clock `timeout` is measured in, it must be a
+ predictable clock, if `None`, `timeout` is measured in
+ relative time instead of absolute time
+ @return :bytes The received message
+ NB! The received message will not be decoded from UTF-8
+ '''
+ from native_bus import bus_poll_timed_wrapped
+ if clock_id is None:
+ import time
+ clock_id = time.CLOCK_MONOTONIC_RAW
+ timeout += timeout.clock_gettime(clock_id)
+ (message, e) = bus_poll_timed_wrapped(self.bus, timeout, clock_id)
+ if message is None:
+ raise self.__oserror(e)
+ return message
+
+
def chown(self, owner = None, group = None):
'''
Change the ownership of a bus
diff --git a/src/native_bus.pyx b/src/native_bus.pyx
index 15215c8..861c16f 100644
--- a/src/native_bus.pyx
+++ b/src/native_bus.pyx
@@ -27,7 +27,12 @@ cimport cython
from libc.stdlib cimport malloc, free
from libc.errno cimport errno
-from posix.types cimport uid_t, gid_t, mode_t
+from posix.types cimport uid_t, gid_t, mode_t, clockid_t, time_t
+
+cdef struct timespec:
+ time_t tv_sec
+ long tv_nsec
+ctypedef timespec timespec_t
cdef extern int bus_create(const char *, int, char **)
@@ -82,22 +87,61 @@ Broadcast a message a bus
@return 0 on success, -1 on error
'''
+cdef extern int bus_write_timed(long, const char *, timespec_t *, clockid_t)
+'''
+Broadcast a message a bus
+
+@param bus Bus information
+@param message The message to write, may not be longer than
+ `BUS_MEMORY_SIZE` including the NUL-termination
+@param timeout The time the operation shall fail with errno set
+ to `EAGAIN` if not completed
+@param clockid The ID of the clock the `timeout` is measured with,
+ it most be a predictable clock
+@return 0 on success, -1 on error
+'''
+
cdef extern int bus_read(long, int (*)(const char *, void *), void *)
'''
Listen (in a loop, forever) for new message on a bus
-@param bus Bus information
-@param callback Function to call when a message is received, the
- input parameters will be the read message and
- `user_data` from `bus_read`'s parameter with the
- same name. The message must have been parsed or
- copied when `callback` returns as it may be over
- overridden after that time. `callback` should
- return either of the the values:
- 0: stop listening
- 1: continue listening
- -1: an error has occurred
-@return 0 on success, -1 on error
+@param bus Bus information
+@param callback Function to call when a message is received, the
+ input parameters will be the read message and
+ `user_data` from `bus_read`'s parameter with the
+ same name. The message must have been parsed or
+ copied when `callback` returns as it may be over
+ overridden after that time. `callback` should
+ return either of the the values:
+ 0: stop listening
+ 1: continue listening
+ -1: an error has occurred
+@param user_data Parameter passed to `callback`
+@return 0 on success, -1 on error
+'''
+
+cdef extern int bus_read_timed(long, int (*)(const char *, void *), void *, timespec_t *, clockid_t)
+'''
+Listen (in a loop, forever) for new message on a bus
+
+@param bus Bus information
+@param callback Function to call when a message is received, the
+ input parameters will be the read message and
+ `user_data` from `bus_read`'s parameter with the
+ same name. The message must have been parsed or
+ copied when `callback` returns as it may be over
+ overridden after that time. `callback` should
+ return either of the the values:
+ 0: stop listening
+ 1: continue listening
+ -1: an error has occurred
+@param user_data Parameter passed to `callback`
+@param timeout The time the operation shall fail with errno set
+ to `EAGAIN` if not completed, note that the callback
+ function may or may not have been called
+@param clockid The ID of the clock the `timeout` is measured with,
+ it most be a predictable clock
+@return 0 on success, -1 on error
'''
cdef extern int bus_poll_start(long)
@@ -138,6 +182,23 @@ either call `bus_poll` again or `bus_poll_stop`.
@return The received message, `NULL` on error
'''
+cdef extern const char *bus_poll_timed(long, timespec_t *, clockid_t)
+'''
+Wait for a message to be broadcasted on the bus.
+The caller should make a copy of the received message,
+without freeing the original copy, and parse it in a
+separate thread. When the new thread has started be
+started, the caller of this function should then
+either call `bus_poll_timed` again or `bus_poll_stop`.
+
+@param bus Bus information
+@param timeout The time the operation shall fail with errno set
+ to `EAGAIN` if not completed
+@param clockid The ID of the clock the `timeout` is measured with,
+ it most be a predictable clock
+@return The received message, `NULL` on error
+'''
+
cdef extern int bus_chown(const char *, uid_t, gid_t)
'''
Change the ownership of a bus
@@ -288,6 +349,32 @@ def bus_write_wrapped(bus : int, message : str, flags : int) -> tuple:
return (r, e)
+def bus_write_timed_wrapped(bus : int, message : str, timeout : float, clock_id : int) -> tuple:
+ '''
+ Broadcast a message a bus
+
+ @param bus:int Bus information
+ @param message:str The message to write, may not be longer than
+ `BUS_MEMORY_SIZE` including the NUL-termination
+ @param timeout:float The time the function shall fail with `os.errno.EAGAIN`,
+ if it has not already completed
+ @param clock_id:int The clock `timeout` is measured in, it must be a
+ predictable clock
+ @return :int 0 on success, -1 on error
+ @return :int The value of `errno`
+ '''
+ cdef const char* cmessage
+ cdef bytes bs
+ cdef timespec_t timeout_spec
+ bs = message.encode('utf-8') + bytes([0])
+ cmessage = bs
+ timeout_spec.tv_sec = <time_t>int(timeout)
+ timeout_spec.tv_nsec = <long>int((timeout - int(timeout)) * 1000000000)
+ r = bus_write_timed(<long>bus, cmessage, &timeout_spec, <clockid_t>clock_id)
+ e = errno
+ return (r, e)
+
+
cdef int bus_callback_wrapper(const char *message, user_data):
cdef bytes bs
callback, user_data = tuple(<object>user_data)
@@ -303,16 +390,17 @@ def bus_read_wrapped(bus : int, callback : callable, user_data) -> tuple:
Listen (in a loop, forever) for new message on a bus
@param bus:int Bus information
- @param callback:(str?, ¿V?)→int Function to call when a message is received, the
+ @param callback:(str?, ¿U?)→int Function to call when a message is received, the
input parameters will be the read message and
`user_data` from `bus_read`'s parameter with the
same name. The message must have been parsed or
copied when `callback` returns as it may be over
overridden after that time. `callback` should
return either of the the values:
- 0: stop listening
- 1: continue listening
- -1: an error has occurred
+ 0: stop listening
+ 1: continue listening
+ -1: an error has occurred
+ @param user_data:¿U? Parameter passed to `callback`
@return :int 0 on success, -1 on error
@return :int The value of `errno`
'''
@@ -322,6 +410,40 @@ def bus_read_wrapped(bus : int, callback : callable, user_data) -> tuple:
return (r, e)
+def bus_read_timed_wrapped(bus : int, callback : callable, user_data, timeout : float, clock_id : int) -> tuple:
+ '''
+ Listen (in a loop, forever) for new message on a bus
+
+ @param bus:int Bus information
+ @param callback:(str?, ¿U?)→int Function to call when a message is received, the
+ input parameters will be the read message and
+ `user_data` from `bus_read`'s parameter with the
+ same name. The message must have been parsed or
+ copied when `callback` returns as it may be over
+ overridden after that time. `callback` should
+ return either of the the values:
+ 0: stop listening
+ 1: continue listening
+ -1: an error has occurred
+ @param timeout:float The time the function shall fail with `os.errno.EAGAIN`,
+ if it has not already completed, note that the callback
+ function may or may not have been called
+ @param clock_id:int The clock `timeout` is measured in, it must be a
+ predictable clock
+ @param user_data:¿U? Parameter passed to `callback`
+ @return :int 0 on success, -1 on error
+ @return :int The value of `errno`
+ '''
+ cdef timespec_t timeout_spec
+ user = (callback, user_data)
+ timeout_spec.tv_sec = <time_t>int(timeout)
+ timeout_spec.tv_nsec = <long>int((timeout - int(timeout)) * 1000000000)
+ r = bus_read_timed(<long>bus, <int (*)(const char *, void *)>&bus_callback_wrapper,
+ <void *>user, &timeout_spec, <clockid_t>clock_id)
+ e = errno
+ return (r, e)
+
+
def bus_poll_start_wrapped(bus : int) -> tuple:
'''
Announce that the thread is listening on the bus.
@@ -365,7 +487,7 @@ def bus_poll_wrapped(bus : int, flags : int) -> tuple:
either call `bus_poll_wrapped` again or
`bus_poll_stop_wrapped`.
- @param bus::int Bus information
+ @param bus:int Bus information
@param flags:int `BUS_NOWAIT` if the bus should fail and set `errno`
to `os.errno.EAGAIN` if there isn't already a message
available on the bus
@@ -382,6 +504,37 @@ def bus_poll_wrapped(bus : int, flags : int) -> tuple:
return (bs, e)
+def bus_poll_timed_wrapped(bus : int, timeout : float, clock_id : int) -> tuple:
+ '''
+ Wait for a message to be broadcasted on the bus.
+ The caller should make a copy of the received message,
+ without freeing the original copy, and parse it in a
+ separate thread. When the new thread has started be
+ started, the caller of this function should then
+ either call `bus_poll_timed_wrapped` again or
+ `bus_poll_stop_wrapped`.
+
+ @param bus:int Bus information
+ @param timeout:float The time the function shall fail with `os.errno.EAGAIN`,
+ if it has not already completed
+ @param clock_id:int The clock `timeout` is measured in, it must be a
+ predictable clock
+ @return :bytes The received message, `None` on error
+ @return :int The value of `errno`
+ '''
+ cdef const char* msg
+ cdef bytes bs
+ cdef timespec_t timeout_spec
+ timeout_spec.tv_sec = <time_t>int(timeout)
+ timeout_spec.tv_nsec = <long>int((timeout - int(timeout)) * 1000000000)
+ msg = bus_poll_timed(<long>bus, &timeout_spec, <clockid_t>clock_id)
+ e = errno
+ if msg is NULL:
+ return (None, e)
+ bs = msg
+ return (bs, e)
+
+
def bus_chown_wrapped(file : str, owner : int, group : int) -> tuple:
'''
Change the ownership of a bus