diff options
-rw-r--r-- | doc/protocol | 41 | ||||
-rw-r--r-- | src/bus.c | 95 |
2 files changed, 84 insertions, 52 deletions
diff --git a/doc/protocol b/doc/protocol index bdff6ac..def8f6f 100644 --- a/doc/protocol +++ b/doc/protocol @@ -1,33 +1,34 @@ -init: +create: Select a filename. - Create XSI semaphore array {S = 0, W = 0, X = 1 and Q = 0} - with random key. Store the semaphore array's key in - decimal form on the first line in the selected file. + Create XSI semaphore array {S = 0, W = 0, X = 1, Q = 0 and N = 0} + with random key. Store the semaphore array's key in decimal form + on the first line in the selected file. - Create XSI shared memory, with an allocation of 2048 bytes, - with a random key. Store the shared memory's key in - decimal form on the second line in the selected file. + Create XSI shared memory, with an allocation of 2048 bytes, with a + random key. Store the shared memory's key in decimal form on the + second line in the selected file. broadcast: with P(X): Z(W) Write NUL-terminate message to shared memory - Q := 0 - Z(S) + with V(N): + Q := 0 + Z(S) listen: - V(S) with undo on exit - forever: - V(Q) - Z(Q) - Read NUL-terminated message from shared memory - if breaking: - P(S) with undo on exit - break - with V(W): - with P(S): - Z(S) + with V(S): + forever: + V(Q) + Z(Q) + Read NUL-terminated message from shared memory + if breaking: + break + with V(W): + with P(S): + Z(S) + Z(N) @@ -61,9 +61,14 @@ #define Q 3 /** + * Semaphore used to notify `bus_read` that it may restore `S` + */ +#define N 4 + +/** * The number of semaphores in the semaphore array */ -#define BUS_SEMAPHORES 4 +#define BUS_SEMAPHORES 5 @@ -72,7 +77,8 @@ * * @param bus:const bus_t * The bus * @param semaphore:int The index of the semaphore, `S`, `W`, `X` or `Q` - * @param flags:int `SEM_UNDO` if the action should be undone when the program exits + * @param flags:int `SEM_UNDO` if the action should be undone when the program exits, + * `IPC_NOWAIT` if the action should fail if it would block * @return :int 0 on success, -1 on error */ #define acquire_semaphore(bus, semaphore, flags) \ @@ -94,10 +100,11 @@ * * @param bus:const bus_t * The bus * @param semaphore:int The index of the semaphore, `S`, `W`, `X` or `Q` + * @param flags:int `IPC_NOWAIT` if the action should fail if it would block * @return :int 0 on success, -1 on error */ -#define zero_semaphore(bus, semaphore) \ - semaphore_op(bus, semaphore, 0, 0) +#define zero_semaphore(bus, semaphore, flags) \ + semaphore_op(bus, semaphore, 0, flags) /** * Open the semaphore array @@ -601,17 +608,26 @@ fail: * `BUS_MEMORY_SIZE` including the NUL-termination * @return 0 on success, -1 on error */ -int +int /* TODO nonblocking */ bus_write(const bus_t *bus, const char *message) { - t(acquire_semaphore(bus, X, SEM_UNDO)); - t(zero_semaphore(bus, W)); + int state = 0, saved_errno; + if (acquire_semaphore(bus, X, SEM_UNDO) == -1) + return -1; + t(zero_semaphore(bus, W, 0)); write_shared_memory(bus, message); + t(release_semaphore(bus, N, SEM_UNDO)); state++; t(write_semaphore(bus, Q, 0)); - t(zero_semaphore(bus, S)); + t(zero_semaphore(bus, S, 0)); + t(acquire_semaphore(bus, N, SEM_UNDO)); state--; t(release_semaphore(bus, X, SEM_UNDO)); return 0; fail: + saved_errno = errno; + if (state > 0) + acquire_semaphore(bus, N, SEM_UNDO); + release_semaphore(bus, X, SEM_UNDO); + errno = saved_errno; return -1; } @@ -641,29 +657,36 @@ fail: int bus_read(const bus_t *bus, int (*callback)(const char *message, void *user_data), void *user_data) { - int r; - t(release_semaphore(bus, S, SEM_UNDO)); + int r, state = 0, saved_errno; + if (release_semaphore(bus, S, SEM_UNDO) == -1) + return -1; t(r = callback(NULL, user_data)); - if (!r) { - t(acquire_semaphore(bus, S, SEM_UNDO)); - return 0; - } + if (!r) goto done; for (;;) { t(release_semaphore(bus, Q, 0)); - t(zero_semaphore(bus, Q)); + t(zero_semaphore(bus, Q, 0)); t(r = callback(bus->message, user_data)); - if (!r) { - t(acquire_semaphore(bus, S, SEM_UNDO)); - return 0; - } - t(release_semaphore(bus, W, SEM_UNDO)); - t(acquire_semaphore(bus, S, SEM_UNDO)); - t(zero_semaphore(bus, S)); - t(release_semaphore(bus, S, SEM_UNDO)); - t(acquire_semaphore(bus, W, SEM_UNDO)); + if (!r) goto done; + t(release_semaphore(bus, W, SEM_UNDO)); state++; + t(acquire_semaphore(bus, S, SEM_UNDO)); state++; + t(zero_semaphore(bus, S, 0)); + t(zero_semaphore(bus, N, 0)); + t(release_semaphore(bus, S, SEM_UNDO)); state--; + t(acquire_semaphore(bus, W, SEM_UNDO)); state--; } fail: + saved_errno = errno; + if (state > 1) + release_semaphore(bus, S, SEM_UNDO); + if (state > 0) + acquire_semaphore(bus, W, SEM_UNDO); + acquire_semaphore(bus, S, SEM_UNDO); + errno = saved_errno; return -1; + +done: + t(acquire_semaphore(bus, S, SEM_UNDO)); + return 0; } @@ -678,26 +701,34 @@ bus_poll_start(bus_t *bus) int bus_poll_stop(const bus_t *bus) { - return acquire_semaphore(bus, S, SEM_UNDO); + return acquire_semaphore(bus, S, SEM_UNDO | IPC_NOWAIT); } -const char * +const char * /* TODO nonblocking */ bus_poll(bus_t *bus) { + int state = 0, saved_errno; if (!bus->first_poll) { - t(release_semaphore(bus, W, SEM_UNDO)); - t(acquire_semaphore(bus, S, SEM_UNDO)); - t(zero_semaphore(bus, S)); - t(release_semaphore(bus, S, SEM_UNDO)); - t(acquire_semaphore(bus, W, SEM_UNDO)); + t(release_semaphore(bus, W, SEM_UNDO)); state++; + t(acquire_semaphore(bus, S, SEM_UNDO)); state++; + t(zero_semaphore(bus, S, 0)); + t(zero_semaphore(bus, N, 0)); + t(release_semaphore(bus, S, SEM_UNDO)); state--; + t(acquire_semaphore(bus, W, SEM_UNDO)); state--; } else { bus->first_poll = 0; } t(release_semaphore(bus, Q, 0)); - t(zero_semaphore(bus, Q)); + t(zero_semaphore(bus, Q, 0)); return bus->message; fail: + saved_errno = errno; + if (state > 1) + release_semaphore(bus, S, SEM_UNDO); + if (state > 0) + acquire_semaphore(bus, W, SEM_UNDO); + errno = saved_errno; return NULL; } |