diff options
Diffstat (limited to 'src/bus.c')
-rw-r--r-- | src/bus.c | 95 |
1 files changed, 63 insertions, 32 deletions
@@ -61,9 +61,14 @@ #define Q 3 /** + * Semaphore used to notify `bus_read` that it may restore `S` + */ +#define N 4 + +/** * The number of semaphores in the semaphore array */ -#define BUS_SEMAPHORES 4 +#define BUS_SEMAPHORES 5 @@ -72,7 +77,8 @@ * * @param bus:const bus_t * The bus * @param semaphore:int The index of the semaphore, `S`, `W`, `X` or `Q` - * @param flags:int `SEM_UNDO` if the action should be undone when the program exits + * @param flags:int `SEM_UNDO` if the action should be undone when the program exits, + * `IPC_NOWAIT` if the action should fail if it would block * @return :int 0 on success, -1 on error */ #define acquire_semaphore(bus, semaphore, flags) \ @@ -94,10 +100,11 @@ * * @param bus:const bus_t * The bus * @param semaphore:int The index of the semaphore, `S`, `W`, `X` or `Q` + * @param flags:int `IPC_NOWAIT` if the action should fail if it would block * @return :int 0 on success, -1 on error */ -#define zero_semaphore(bus, semaphore) \ - semaphore_op(bus, semaphore, 0, 0) +#define zero_semaphore(bus, semaphore, flags) \ + semaphore_op(bus, semaphore, 0, flags) /** * Open the semaphore array @@ -601,17 +608,26 @@ fail: * `BUS_MEMORY_SIZE` including the NUL-termination * @return 0 on success, -1 on error */ -int +int /* TODO nonblocking */ bus_write(const bus_t *bus, const char *message) { - t(acquire_semaphore(bus, X, SEM_UNDO)); - t(zero_semaphore(bus, W)); + int state = 0, saved_errno; + if (acquire_semaphore(bus, X, SEM_UNDO) == -1) + return -1; + t(zero_semaphore(bus, W, 0)); write_shared_memory(bus, message); + t(release_semaphore(bus, N, SEM_UNDO)); state++; t(write_semaphore(bus, Q, 0)); - t(zero_semaphore(bus, S)); + t(zero_semaphore(bus, S, 0)); + t(acquire_semaphore(bus, N, SEM_UNDO)); state--; t(release_semaphore(bus, X, SEM_UNDO)); return 0; fail: + saved_errno = errno; + if (state > 0) + acquire_semaphore(bus, N, SEM_UNDO); + release_semaphore(bus, X, SEM_UNDO); + errno = saved_errno; return -1; } @@ -641,29 +657,36 @@ fail: int bus_read(const bus_t *bus, int (*callback)(const char *message, void *user_data), void *user_data) { - int r; - t(release_semaphore(bus, S, SEM_UNDO)); + int r, state = 0, saved_errno; + if (release_semaphore(bus, S, SEM_UNDO) == -1) + return -1; t(r = callback(NULL, user_data)); - if (!r) { - t(acquire_semaphore(bus, S, SEM_UNDO)); - return 0; - } + if (!r) goto done; for (;;) { t(release_semaphore(bus, Q, 0)); - t(zero_semaphore(bus, Q)); + t(zero_semaphore(bus, Q, 0)); t(r = callback(bus->message, user_data)); - if (!r) { - t(acquire_semaphore(bus, S, SEM_UNDO)); - return 0; - } - t(release_semaphore(bus, W, SEM_UNDO)); - t(acquire_semaphore(bus, S, SEM_UNDO)); - t(zero_semaphore(bus, S)); - t(release_semaphore(bus, S, SEM_UNDO)); - t(acquire_semaphore(bus, W, SEM_UNDO)); + if (!r) goto done; + t(release_semaphore(bus, W, SEM_UNDO)); state++; + t(acquire_semaphore(bus, S, SEM_UNDO)); state++; + t(zero_semaphore(bus, S, 0)); + t(zero_semaphore(bus, N, 0)); + t(release_semaphore(bus, S, SEM_UNDO)); state--; + t(acquire_semaphore(bus, W, SEM_UNDO)); state--; } fail: + saved_errno = errno; + if (state > 1) + release_semaphore(bus, S, SEM_UNDO); + if (state > 0) + acquire_semaphore(bus, W, SEM_UNDO); + acquire_semaphore(bus, S, SEM_UNDO); + errno = saved_errno; return -1; + +done: + t(acquire_semaphore(bus, S, SEM_UNDO)); + return 0; } @@ -678,26 +701,34 @@ bus_poll_start(bus_t *bus) int bus_poll_stop(const bus_t *bus) { - return acquire_semaphore(bus, S, SEM_UNDO); + return acquire_semaphore(bus, S, SEM_UNDO | IPC_NOWAIT); } -const char * +const char * /* TODO nonblocking */ bus_poll(bus_t *bus) { + int state = 0, saved_errno; if (!bus->first_poll) { - t(release_semaphore(bus, W, SEM_UNDO)); - t(acquire_semaphore(bus, S, SEM_UNDO)); - t(zero_semaphore(bus, S)); - t(release_semaphore(bus, S, SEM_UNDO)); - t(acquire_semaphore(bus, W, SEM_UNDO)); + t(release_semaphore(bus, W, SEM_UNDO)); state++; + t(acquire_semaphore(bus, S, SEM_UNDO)); state++; + t(zero_semaphore(bus, S, 0)); + t(zero_semaphore(bus, N, 0)); + t(release_semaphore(bus, S, SEM_UNDO)); state--; + t(acquire_semaphore(bus, W, SEM_UNDO)); state--; } else { bus->first_poll = 0; } t(release_semaphore(bus, Q, 0)); - t(zero_semaphore(bus, Q)); + t(zero_semaphore(bus, Q, 0)); return bus->message; fail: + saved_errno = errno; + if (state > 1) + release_semaphore(bus, S, SEM_UNDO); + if (state > 0) + acquire_semaphore(bus, W, SEM_UNDO); + errno = saved_errno; return NULL; } |