From 6c1e2641e08a32708df57b5e0c2b1bfd2b3ad42c Mon Sep 17 00:00:00 2001 From: Mattias Andrée Date: Sun, 17 May 2015 15:49:27 +0200 Subject: add timed calls MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Mattias Andrée --- src/bus.py | 132 +++++++++++++++++++++++++++++++------ src/native_bus.pyx | 189 ++++++++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 283 insertions(+), 38 deletions(-) diff --git a/src/bus.py b/src/bus.py index b07fe82..f59ad1b 100644 --- a/src/bus.py +++ b/src/bus.py @@ -156,32 +156,96 @@ class Bus: raise self.__oserror(e) + def write_timed(self, message : str, timeout : float, clock_id : int = None): + ''' + Broadcast a message a bus + + @param message:str The message to write, may not be longer than 2047 bytes + after UTF-8 encoding + @param timeout:float The time the function shall fail with `os.errno.EAGAIN`, + if it has not already completed + @param clock_id:int? The clock `timeout` is measured in, it must be a + predictable clock, if `None`, `timeout` is measured in + relative time instead of absolute time + ''' + from native_bus import bus_write_timed_wrapped + if clock_id is None: + import time + clock_id = time.CLOCK_MONOTONIC_RAW + timeout += timeout.clock_gettime(clock_id) + (r, e) = bus_write_timed_wrapped(self.bus, message, timeout, clock_id) + if r == -1: + raise self.__oserror(e) + + def read(self, callback : callable, user_data = None): ''' Listen (in a loop, forever) for new message on a bus - @param callback Function to call when a message is received, the - input parameters will be the read message and - `user_data` from the function's [`Bus.read`] parameter - with the same name. The message must have been parsed - or copied when `callback` returns as it may be over - overridden after that time. `callback` should - return either of the the values: - 0: stop listening - 1: continue listening - -1: an error has occurred - However, the function [`Bus.read`] will invoke - `callback` with `message` set to `None` one time - directly after it has started listening on the bus. - This is to the the program now it can safely continue - with any action that requires that the programs is - listening on the bus. - NB! The received message will not be decoded from UTF-8 - @param user_data See description of `callback` + @param callback:(message:str?, user_data:¿U?=user_data)→int + Function to call when a message is received, the + input parameters will be the read message and + `user_data` from the function's [`Bus.read`] parameter + with the same name. The message must have been parsed + or copied when `callback` returns as it may be over + overridden after that time. `callback` should + return either of the the values: + 0: stop listening + 1: continue listening + -1: an error has occurred + However, the function [`Bus.read`] will invoke + `callback` with `message` set to `None` one time + directly after it has started listening on the bus. + This is to the the program now it can safely continue + with any action that requires that the programs is + listening on the bus. + NB! The received message will not be decoded from UTF-8 + @param user_data:¿U? See description of `callback` ''' from native_bus import bus_read_wrapped - if bus_read_wrapped(self.bus, callback, user_data) == -1: - raise self.__oserror() + (r, e) = bus_read_wrapped(self.bus, callback, user_data) + if r == -1: + raise self.__oserror(e) + + + def read_timed(self, callback : callable, timeout : float, clock_id : int = None, user_data = None): + ''' + Listen (in a loop, forever) for new message on a bus + + @param callback:(message:str?, user_data:¿U?=user_data)→int + Function to call when a message is received, the + input parameters will be the read message and + `user_data` from the function's [`Bus.read`] parameter + with the same name. The message must have been parsed + or copied when `callback` returns as it may be over + overridden after that time. `callback` should + return either of the the values: + 0: stop listening + 1: continue listening + -1: an error has occurred + However, the function [`Bus.read`] will invoke + `callback` with `message` set to `None` one time + directly after it has started listening on the bus. + This is to the the program now it can safely continue + with any action that requires that the programs is + listening on the bus. + NB! The received message will not be decoded from UTF-8 + @param timeout:float The time the function shall fail with `os.errno.EAGAIN`, + if it has not already completed, note that the callback + function may or may not have been called + @param clock_id:int? The clock `timeout` is measured in, it must be a + predictable clock, if `None`, `timeout` is measured in + relative time instead of absolute time + @param user_data:¿U? See description of `callback` + ''' + from native_bus import bus_read_timed_wrapped + if clock_id is None: + import time + clock_id = time.CLOCK_MONOTONIC_RAW + timeout += timeout.clock_gettime(clock_id) + (r, e) = bus_read_timed_wrapped(self.bus, callback, user_data, timeout, clock_id) + if r == -1: + raise self.__oserror(e) def poll_start(self): @@ -232,6 +296,34 @@ class Bus: return message + def poll_timed(self, timeout : float, clock_id : int = None) -> bytes: + ''' + Wait for a message to be broadcasted on the bus. + The caller should make a copy of the received message, + without freeing the original copy, and parse it in a + separate thread. When the new thread has started be + started, the caller of this function should then + either call `Bus.poll_timed` again or `Bus.poll_stop`. + + @param timeout:float The time the function shall fail with `os.errno.EAGAIN`, + if it has not already completed + @param clock_id:int? The clock `timeout` is measured in, it must be a + predictable clock, if `None`, `timeout` is measured in + relative time instead of absolute time + @return :bytes The received message + NB! The received message will not be decoded from UTF-8 + ''' + from native_bus import bus_poll_timed_wrapped + if clock_id is None: + import time + clock_id = time.CLOCK_MONOTONIC_RAW + timeout += timeout.clock_gettime(clock_id) + (message, e) = bus_poll_timed_wrapped(self.bus, timeout, clock_id) + if message is None: + raise self.__oserror(e) + return message + + def chown(self, owner = None, group = None): ''' Change the ownership of a bus diff --git a/src/native_bus.pyx b/src/native_bus.pyx index 15215c8..861c16f 100644 --- a/src/native_bus.pyx +++ b/src/native_bus.pyx @@ -27,7 +27,12 @@ cimport cython from libc.stdlib cimport malloc, free from libc.errno cimport errno -from posix.types cimport uid_t, gid_t, mode_t +from posix.types cimport uid_t, gid_t, mode_t, clockid_t, time_t + +cdef struct timespec: + time_t tv_sec + long tv_nsec +ctypedef timespec timespec_t cdef extern int bus_create(const char *, int, char **) @@ -82,22 +87,61 @@ Broadcast a message a bus @return 0 on success, -1 on error ''' +cdef extern int bus_write_timed(long, const char *, timespec_t *, clockid_t) +''' +Broadcast a message a bus + +@param bus Bus information +@param message The message to write, may not be longer than + `BUS_MEMORY_SIZE` including the NUL-termination +@param timeout The time the operation shall fail with errno set + to `EAGAIN` if not completed +@param clockid The ID of the clock the `timeout` is measured with, + it most be a predictable clock +@return 0 on success, -1 on error +''' + cdef extern int bus_read(long, int (*)(const char *, void *), void *) ''' Listen (in a loop, forever) for new message on a bus -@param bus Bus information -@param callback Function to call when a message is received, the - input parameters will be the read message and - `user_data` from `bus_read`'s parameter with the - same name. The message must have been parsed or - copied when `callback` returns as it may be over - overridden after that time. `callback` should - return either of the the values: - 0: stop listening - 1: continue listening - -1: an error has occurred -@return 0 on success, -1 on error +@param bus Bus information +@param callback Function to call when a message is received, the + input parameters will be the read message and + `user_data` from `bus_read`'s parameter with the + same name. The message must have been parsed or + copied when `callback` returns as it may be over + overridden after that time. `callback` should + return either of the the values: + 0: stop listening + 1: continue listening + -1: an error has occurred +@param user_data Parameter passed to `callback` +@return 0 on success, -1 on error +''' + +cdef extern int bus_read_timed(long, int (*)(const char *, void *), void *, timespec_t *, clockid_t) +''' +Listen (in a loop, forever) for new message on a bus + +@param bus Bus information +@param callback Function to call when a message is received, the + input parameters will be the read message and + `user_data` from `bus_read`'s parameter with the + same name. The message must have been parsed or + copied when `callback` returns as it may be over + overridden after that time. `callback` should + return either of the the values: + 0: stop listening + 1: continue listening + -1: an error has occurred +@param user_data Parameter passed to `callback` +@param timeout The time the operation shall fail with errno set + to `EAGAIN` if not completed, note that the callback + function may or may not have been called +@param clockid The ID of the clock the `timeout` is measured with, + it most be a predictable clock +@return 0 on success, -1 on error ''' cdef extern int bus_poll_start(long) @@ -138,6 +182,23 @@ either call `bus_poll` again or `bus_poll_stop`. @return The received message, `NULL` on error ''' +cdef extern const char *bus_poll_timed(long, timespec_t *, clockid_t) +''' +Wait for a message to be broadcasted on the bus. +The caller should make a copy of the received message, +without freeing the original copy, and parse it in a +separate thread. When the new thread has started be +started, the caller of this function should then +either call `bus_poll_timed` again or `bus_poll_stop`. + +@param bus Bus information +@param timeout The time the operation shall fail with errno set + to `EAGAIN` if not completed +@param clockid The ID of the clock the `timeout` is measured with, + it most be a predictable clock +@return The received message, `NULL` on error +''' + cdef extern int bus_chown(const char *, uid_t, gid_t) ''' Change the ownership of a bus @@ -288,6 +349,32 @@ def bus_write_wrapped(bus : int, message : str, flags : int) -> tuple: return (r, e) +def bus_write_timed_wrapped(bus : int, message : str, timeout : float, clock_id : int) -> tuple: + ''' + Broadcast a message a bus + + @param bus:int Bus information + @param message:str The message to write, may not be longer than + `BUS_MEMORY_SIZE` including the NUL-termination + @param timeout:float The time the function shall fail with `os.errno.EAGAIN`, + if it has not already completed + @param clock_id:int The clock `timeout` is measured in, it must be a + predictable clock + @return :int 0 on success, -1 on error + @return :int The value of `errno` + ''' + cdef const char* cmessage + cdef bytes bs + cdef timespec_t timeout_spec + bs = message.encode('utf-8') + bytes([0]) + cmessage = bs + timeout_spec.tv_sec = int(timeout) + timeout_spec.tv_nsec = int((timeout - int(timeout)) * 1000000000) + r = bus_write_timed(bus, cmessage, &timeout_spec, clock_id) + e = errno + return (r, e) + + cdef int bus_callback_wrapper(const char *message, user_data): cdef bytes bs callback, user_data = tuple(user_data) @@ -303,16 +390,17 @@ def bus_read_wrapped(bus : int, callback : callable, user_data) -> tuple: Listen (in a loop, forever) for new message on a bus @param bus:int Bus information - @param callback:(str?, ¿V?)→int Function to call when a message is received, the + @param callback:(str?, ¿U?)→int Function to call when a message is received, the input parameters will be the read message and `user_data` from `bus_read`'s parameter with the same name. The message must have been parsed or copied when `callback` returns as it may be over overridden after that time. `callback` should return either of the the values: - 0: stop listening - 1: continue listening - -1: an error has occurred + 0: stop listening + 1: continue listening + -1: an error has occurred + @param user_data:¿U? Parameter passed to `callback` @return :int 0 on success, -1 on error @return :int The value of `errno` ''' @@ -322,6 +410,40 @@ def bus_read_wrapped(bus : int, callback : callable, user_data) -> tuple: return (r, e) +def bus_read_timed_wrapped(bus : int, callback : callable, user_data, timeout : float, clock_id : int) -> tuple: + ''' + Listen (in a loop, forever) for new message on a bus + + @param bus:int Bus information + @param callback:(str?, ¿U?)→int Function to call when a message is received, the + input parameters will be the read message and + `user_data` from `bus_read`'s parameter with the + same name. The message must have been parsed or + copied when `callback` returns as it may be over + overridden after that time. `callback` should + return either of the the values: + 0: stop listening + 1: continue listening + -1: an error has occurred + @param timeout:float The time the function shall fail with `os.errno.EAGAIN`, + if it has not already completed, note that the callback + function may or may not have been called + @param clock_id:int The clock `timeout` is measured in, it must be a + predictable clock + @param user_data:¿U? Parameter passed to `callback` + @return :int 0 on success, -1 on error + @return :int The value of `errno` + ''' + cdef timespec_t timeout_spec + user = (callback, user_data) + timeout_spec.tv_sec = int(timeout) + timeout_spec.tv_nsec = int((timeout - int(timeout)) * 1000000000) + r = bus_read_timed(bus, &bus_callback_wrapper, + user, &timeout_spec, clock_id) + e = errno + return (r, e) + + def bus_poll_start_wrapped(bus : int) -> tuple: ''' Announce that the thread is listening on the bus. @@ -365,7 +487,7 @@ def bus_poll_wrapped(bus : int, flags : int) -> tuple: either call `bus_poll_wrapped` again or `bus_poll_stop_wrapped`. - @param bus::int Bus information + @param bus:int Bus information @param flags:int `BUS_NOWAIT` if the bus should fail and set `errno` to `os.errno.EAGAIN` if there isn't already a message available on the bus @@ -382,6 +504,37 @@ def bus_poll_wrapped(bus : int, flags : int) -> tuple: return (bs, e) +def bus_poll_timed_wrapped(bus : int, timeout : float, clock_id : int) -> tuple: + ''' + Wait for a message to be broadcasted on the bus. + The caller should make a copy of the received message, + without freeing the original copy, and parse it in a + separate thread. When the new thread has started be + started, the caller of this function should then + either call `bus_poll_timed_wrapped` again or + `bus_poll_stop_wrapped`. + + @param bus:int Bus information + @param timeout:float The time the function shall fail with `os.errno.EAGAIN`, + if it has not already completed + @param clock_id:int The clock `timeout` is measured in, it must be a + predictable clock + @return :bytes The received message, `None` on error + @return :int The value of `errno` + ''' + cdef const char* msg + cdef bytes bs + cdef timespec_t timeout_spec + timeout_spec.tv_sec = int(timeout) + timeout_spec.tv_nsec = int((timeout - int(timeout)) * 1000000000) + msg = bus_poll_timed(bus, &timeout_spec, clock_id) + e = errno + if msg is NULL: + return (None, e) + bs = msg + return (bs, e) + + def bus_chown_wrapped(file : str, owner : int, group : int) -> tuple: ''' Change the ownership of a bus -- cgit v1.2.3-70-g09d2