aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--doc/protocol41
-rw-r--r--src/bus.c95
2 files changed, 84 insertions, 52 deletions
diff --git a/doc/protocol b/doc/protocol
index bdff6ac..def8f6f 100644
--- a/doc/protocol
+++ b/doc/protocol
@@ -1,33 +1,34 @@
-init:
+create:
Select a filename.
- Create XSI semaphore array {S = 0, W = 0, X = 1 and Q = 0}
- with random key. Store the semaphore array's key in
- decimal form on the first line in the selected file.
+ Create XSI semaphore array {S = 0, W = 0, X = 1, Q = 0 and N = 0}
+ with random key. Store the semaphore array's key in decimal form
+ on the first line in the selected file.
- Create XSI shared memory, with an allocation of 2048 bytes,
- with a random key. Store the shared memory's key in
- decimal form on the second line in the selected file.
+ Create XSI shared memory, with an allocation of 2048 bytes, with a
+ random key. Store the shared memory's key in decimal form on the
+ second line in the selected file.
broadcast:
with P(X):
Z(W)
Write NUL-terminate message to shared memory
- Q := 0
- Z(S)
+ with V(N):
+ Q := 0
+ Z(S)
listen:
- V(S) with undo on exit
- forever:
- V(Q)
- Z(Q)
- Read NUL-terminated message from shared memory
- if breaking:
- P(S) with undo on exit
- break
- with V(W):
- with P(S):
- Z(S)
+ with V(S):
+ forever:
+ V(Q)
+ Z(Q)
+ Read NUL-terminated message from shared memory
+ if breaking:
+ break
+ with V(W):
+ with P(S):
+ Z(S)
+ Z(N)
diff --git a/src/bus.c b/src/bus.c
index 220b486..25bfbd8 100644
--- a/src/bus.c
+++ b/src/bus.c
@@ -61,9 +61,14 @@
#define Q 3
/**
+ * Semaphore used to notify `bus_read` that it may restore `S`
+ */
+#define N 4
+
+/**
* The number of semaphores in the semaphore array
*/
-#define BUS_SEMAPHORES 4
+#define BUS_SEMAPHORES 5
@@ -72,7 +77,8 @@
*
* @param bus:const bus_t * The bus
* @param semaphore:int The index of the semaphore, `S`, `W`, `X` or `Q`
- * @param flags:int `SEM_UNDO` if the action should be undone when the program exits
+ * @param flags:int `SEM_UNDO` if the action should be undone when the program exits,
+ * `IPC_NOWAIT` if the action should fail if it would block
* @return :int 0 on success, -1 on error
*/
#define acquire_semaphore(bus, semaphore, flags) \
@@ -94,10 +100,11 @@
*
* @param bus:const bus_t * The bus
* @param semaphore:int The index of the semaphore, `S`, `W`, `X` or `Q`
+ * @param flags:int `IPC_NOWAIT` if the action should fail if it would block
* @return :int 0 on success, -1 on error
*/
-#define zero_semaphore(bus, semaphore) \
- semaphore_op(bus, semaphore, 0, 0)
+#define zero_semaphore(bus, semaphore, flags) \
+ semaphore_op(bus, semaphore, 0, flags)
/**
* Open the semaphore array
@@ -601,17 +608,26 @@ fail:
* `BUS_MEMORY_SIZE` including the NUL-termination
* @return 0 on success, -1 on error
*/
-int
+int /* TODO nonblocking */
bus_write(const bus_t *bus, const char *message)
{
- t(acquire_semaphore(bus, X, SEM_UNDO));
- t(zero_semaphore(bus, W));
+ int state = 0, saved_errno;
+ if (acquire_semaphore(bus, X, SEM_UNDO) == -1)
+ return -1;
+ t(zero_semaphore(bus, W, 0));
write_shared_memory(bus, message);
+ t(release_semaphore(bus, N, SEM_UNDO)); state++;
t(write_semaphore(bus, Q, 0));
- t(zero_semaphore(bus, S));
+ t(zero_semaphore(bus, S, 0));
+ t(acquire_semaphore(bus, N, SEM_UNDO)); state--;
t(release_semaphore(bus, X, SEM_UNDO));
return 0;
fail:
+ saved_errno = errno;
+ if (state > 0)
+ acquire_semaphore(bus, N, SEM_UNDO);
+ release_semaphore(bus, X, SEM_UNDO);
+ errno = saved_errno;
return -1;
}
@@ -641,29 +657,36 @@ fail:
int
bus_read(const bus_t *bus, int (*callback)(const char *message, void *user_data), void *user_data)
{
- int r;
- t(release_semaphore(bus, S, SEM_UNDO));
+ int r, state = 0, saved_errno;
+ if (release_semaphore(bus, S, SEM_UNDO) == -1)
+ return -1;
t(r = callback(NULL, user_data));
- if (!r) {
- t(acquire_semaphore(bus, S, SEM_UNDO));
- return 0;
- }
+ if (!r) goto done;
for (;;) {
t(release_semaphore(bus, Q, 0));
- t(zero_semaphore(bus, Q));
+ t(zero_semaphore(bus, Q, 0));
t(r = callback(bus->message, user_data));
- if (!r) {
- t(acquire_semaphore(bus, S, SEM_UNDO));
- return 0;
- }
- t(release_semaphore(bus, W, SEM_UNDO));
- t(acquire_semaphore(bus, S, SEM_UNDO));
- t(zero_semaphore(bus, S));
- t(release_semaphore(bus, S, SEM_UNDO));
- t(acquire_semaphore(bus, W, SEM_UNDO));
+ if (!r) goto done;
+ t(release_semaphore(bus, W, SEM_UNDO)); state++;
+ t(acquire_semaphore(bus, S, SEM_UNDO)); state++;
+ t(zero_semaphore(bus, S, 0));
+ t(zero_semaphore(bus, N, 0));
+ t(release_semaphore(bus, S, SEM_UNDO)); state--;
+ t(acquire_semaphore(bus, W, SEM_UNDO)); state--;
}
fail:
+ saved_errno = errno;
+ if (state > 1)
+ release_semaphore(bus, S, SEM_UNDO);
+ if (state > 0)
+ acquire_semaphore(bus, W, SEM_UNDO);
+ acquire_semaphore(bus, S, SEM_UNDO);
+ errno = saved_errno;
return -1;
+
+done:
+ t(acquire_semaphore(bus, S, SEM_UNDO));
+ return 0;
}
@@ -678,26 +701,34 @@ bus_poll_start(bus_t *bus)
int
bus_poll_stop(const bus_t *bus)
{
- return acquire_semaphore(bus, S, SEM_UNDO);
+ return acquire_semaphore(bus, S, SEM_UNDO | IPC_NOWAIT);
}
-const char *
+const char * /* TODO nonblocking */
bus_poll(bus_t *bus)
{
+ int state = 0, saved_errno;
if (!bus->first_poll) {
- t(release_semaphore(bus, W, SEM_UNDO));
- t(acquire_semaphore(bus, S, SEM_UNDO));
- t(zero_semaphore(bus, S));
- t(release_semaphore(bus, S, SEM_UNDO));
- t(acquire_semaphore(bus, W, SEM_UNDO));
+ t(release_semaphore(bus, W, SEM_UNDO)); state++;
+ t(acquire_semaphore(bus, S, SEM_UNDO)); state++;
+ t(zero_semaphore(bus, S, 0));
+ t(zero_semaphore(bus, N, 0));
+ t(release_semaphore(bus, S, SEM_UNDO)); state--;
+ t(acquire_semaphore(bus, W, SEM_UNDO)); state--;
} else {
bus->first_poll = 0;
}
t(release_semaphore(bus, Q, 0));
- t(zero_semaphore(bus, Q));
+ t(zero_semaphore(bus, Q, 0));
return bus->message;
fail:
+ saved_errno = errno;
+ if (state > 1)
+ release_semaphore(bus, S, SEM_UNDO);
+ if (state > 0)
+ acquire_semaphore(bus, W, SEM_UNDO);
+ errno = saved_errno;
return NULL;
}