diff options
Diffstat (limited to '')
-rw-r--r-- | src/bus.c | 185 |
1 files changed, 178 insertions, 7 deletions
@@ -22,6 +22,7 @@ * DEALINGS IN THE SOFTWARE. */ #define _XOPEN_SOURCE 700 +#define _GNU_SOURCE #include "bus.h" #include <stdlib.h> @@ -119,7 +120,7 @@ semaphore_op(bus, semaphore, +1, flags) /** - * Wait for the value of a semphore to become 0 + * Wait for the value of a semaphore to become 0 * * @param bus:const bus_t * The bus * @param semaphore:int The index of the semaphore, `S`, `W`, `X` or `Q` @@ -130,6 +131,43 @@ semaphore_op(bus, semaphore, 0, flags) /** + * Decrease the value of a semaphore by 1 + * + * @param bus:const bus_t * The bus + * @param semaphore:int The index of the semaphore, `S`, `W`, `X` or `Q` + * @param flags:int `SEM_UNDO` if the action should be undone when the program exits, + * `IPC_NOWAIT` if the action should fail if it would block + * @param timeout:const struct timespec * The amount of time to wait before failing + * @return :int 0 on success, -1 on error + */ +#define acquire_semaphore_timed(bus, semaphore, flags, timeout) \ + semaphore_op_timed(bus, semaphore, -1, flags, timeout) + +/** + * Increase the value of a semaphore by 1 + * + * @param bus:const bus_t * The bus + * @param semaphore:int The index of the semaphore, `S`, `W`, `X` or `Q` + * @param flags:int `SEM_UNDO` if the action should be undone when the program exits + * @param timeout:const struct timespec * The amount of time to wait before failing + * @return :int 0 on success, -1 on error + */ +#define release_semaphore_timed(bus, semaphore, flags, timeout) \ + semaphore_op_timed(bus, semaphore, +1, flags, timeout) + +/** + * Wait for the value of a semaphore to become 0 + * + * @param bus:const bus_t * The bus + * @param semaphore:int The index of the semaphore, `S`, `W`, `X` or `Q` + * @param flags:int `IPC_NOWAIT` if the action should fail if it would block + * @param timeout:const struct timespec * The amount of time to wait before failing + * @return :int 0 on success, -1 on error + */ +#define zero_semaphore_timed(bus, semaphore, flags, timeout) \ + semaphore_op_timed(bus, semaphore, 0, flags, timeout) + +/** * Open the semaphore array * * @param bus:const bus_t * The bus @@ -150,6 +188,21 @@ /** + * Set `delta` to the convertion of `timeout` from absolute to relative time, + * measured in the clock whose ID is specified by `clockid` + * + * @scope timeout:struct timespec Output variable for relative time + * @scope timeout:const struct timespec * The absolute time + * @scope clockid:clockid_t The clock time is measured + */ +#define DELTA \ + do { \ + if (absolute_time_to_delta_time(&delta, timeout, clockid) < 0) goto fail; \ + else if ((delta.tv_sec < 0) || (delta.tv_nsec < 0)) { errno = EAGAIN; goto fail; } \ + } while (0) + + +/** * If `flags & (bus_flag)`, this macro evalutes to `sys_flag`, * otherwise this macro evalutes to 0. */ @@ -319,6 +372,27 @@ semaphore_op(const bus_t *bus, int semaphore, int delta, int flags) /** + * Increase or decrease the value of a semaphore, or wait the it to become 0 + * + * @param bus Bus information + * @param semaphore The index of the semaphore, `S`, `W`, `X` or `Q` + * @param delta The adjustment to make to the semaphore's value, 0 to wait for it to become 0 + * @param flags `SEM_UNDO` if the action should be undone when the program exits + * @param timeout The amount of time to wait before failing + * @return 0 on success, -1 on error + */ +static int +semaphore_op_timed(const bus_t *bus, int semaphore, int delta, int flags, const struct timespec *timeout) +{ + struct sembuf op; + op.sem_num = (unsigned short)semaphore; + op.sem_op = (short)delta; + op.sem_flg = (short)flags; + return semtimedop(bus->sem_id, &op, (size_t)1, timeout); +} + + +/** * Set the value of a semaphore * * @param bus Bus information @@ -722,10 +796,41 @@ fail: int bus_write_timed(const bus_t *bus, const char *message, const struct timespec *timeout, clockid_t clockid) { - /* TODO bus_write_timed */ + int saved_errno; +#ifndef BUS_SEMAPHORES_ARE_SYNCHRONOUS + int state = 0; +#endif + struct timespec delta; if (!timeout) return bus_write(bus, message, 0); - (void) bus, (void) message, (void) timeout, (void) clockid; + + DELTA; + if (acquire_semaphore_timed(bus, X, SEM_UNDO, &delta) == -1) + return -1; + DELTA; + t(zero_semaphore_timed(bus, W, 0, &delta)); + write_shared_memory(bus, message); +#ifndef BUS_SEMAPHORES_ARE_SYNCHRONOUS + t(release_semaphore(bus, N, SEM_UNDO)); state++; +#endif + t(write_semaphore(bus, Q, 0)); + t(zero_semaphore(bus, S, 0)); +#ifndef BUS_SEMAPHORES_ARE_SYNCHRONOUS + t(acquire_semaphore(bus, N, SEM_UNDO)); state--; +#endif + t(release_semaphore(bus, X, SEM_UNDO)); + return 0; + +fail: + saved_errno = errno; +#ifndef BUS_SEMAPHORES_ARE_SYNCHRONOUS + if (state > 0) + acquire_semaphore(bus, N, SEM_UNDO); +#endif + release_semaphore(bus, X, SEM_UNDO); + errno = saved_errno; + return -1; + } @@ -822,10 +927,46 @@ done: int bus_read_timed(const bus_t *bus, int (*callback)(const char *message, void *user_data), void *user_data, const struct timespec *timeout, clockid_t clockid) { - /* TODO bus_read_timed */ + int r, state = 0, saved_errno; + struct timespec delta; if (!timeout) return bus_read(bus, callback, user_data); - (void) bus, (void) callback, (void) user_data, (void) timeout, (void) clockid; + + DELTA; + if (release_semaphore_timed(bus, S, SEM_UNDO, &delta) == -1) + return -1; + t(r = callback(NULL, user_data)); + if (!r) goto done; + for (;;) { + DELTA; + t(release_semaphore_timed(bus, Q, 0, &delta)); + DELTA; + t(zero_semaphore_timed(bus, Q, 0, &delta)); + t(r = callback(bus->message, user_data)); + if (!r) goto done; + t(release_semaphore(bus, W, SEM_UNDO)); state++; + t(acquire_semaphore(bus, S, SEM_UNDO)); state++; + t(zero_semaphore(bus, S, 0)); +#ifndef BUS_SEMAPHORES_ARE_SYNCHRONOUS_ME_HARDER + t(zero_semaphore(bus, N, 0)); +#endif + t(release_semaphore(bus, S, SEM_UNDO)); state--; + t(acquire_semaphore(bus, W, SEM_UNDO)); state--; + } + +fail: + saved_errno = errno; + if (state > 1) + release_semaphore(bus, S, SEM_UNDO); + if (state > 0) + acquire_semaphore(bus, W, SEM_UNDO); + acquire_semaphore(bus, S, SEM_UNDO); + errno = saved_errno; + return -1; + +done: + t(acquire_semaphore(bus, S, SEM_UNDO)); + return 0; } @@ -943,10 +1084,40 @@ fail: */ const char *bus_poll_timed(bus_t *bus, const struct timespec *timeout, clockid_t clockid) { - /* TODO bus_poll_timed */ + int state = 0, saved_errno; + struct timespec delta; if (!timeout) return bus_poll(bus); - (void) bus, (void) timeout, (void) clockid; + + if (!bus->first_poll) { + t(release_semaphore(bus, W, SEM_UNDO)); state++; + t(acquire_semaphore(bus, S, SEM_UNDO)); state++; + t(zero_semaphore(bus, S, 0)); +#ifndef BUS_SEMAPHORES_ARE_SYNCHRONOUS_ME_HARDER + t(zero_semaphore(bus, N, 0)); +#endif + t(release_semaphore(bus, S, SEM_UNDO)); state--; + t(acquire_semaphore(bus, W, SEM_UNDO)); state--; + } else { + bus->first_poll = 0; + } + state--; + DELTA; + t(release_semaphore_timed(bus, Q, 0, &delta)); + DELTA; + t(zero_semaphore_timed(bus, Q, 0, &delta)); + return bus->message; + +fail: + saved_errno = errno; + if (state > 1) + release_semaphore(bus, S, SEM_UNDO); + if (state > 0) + acquire_semaphore(bus, W, SEM_UNDO); + if (state < 0) + bus->first_poll = 1; + errno = saved_errno; + return NULL; } |