aboutsummaryrefslogtreecommitdiffstats
path: root/src/bus.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/bus.c95
1 files changed, 63 insertions, 32 deletions
diff --git a/src/bus.c b/src/bus.c
index 220b486..25bfbd8 100644
--- a/src/bus.c
+++ b/src/bus.c
@@ -61,9 +61,14 @@
#define Q 3
/**
+ * Semaphore used to notify `bus_read` that it may restore `S`
+ */
+#define N 4
+
+/**
* The number of semaphores in the semaphore array
*/
-#define BUS_SEMAPHORES 4
+#define BUS_SEMAPHORES 5
@@ -72,7 +77,8 @@
*
* @param bus:const bus_t * The bus
* @param semaphore:int The index of the semaphore, `S`, `W`, `X` or `Q`
- * @param flags:int `SEM_UNDO` if the action should be undone when the program exits
+ * @param flags:int `SEM_UNDO` if the action should be undone when the program exits,
+ * `IPC_NOWAIT` if the action should fail if it would block
* @return :int 0 on success, -1 on error
*/
#define acquire_semaphore(bus, semaphore, flags) \
@@ -94,10 +100,11 @@
*
* @param bus:const bus_t * The bus
* @param semaphore:int The index of the semaphore, `S`, `W`, `X` or `Q`
+ * @param flags:int `IPC_NOWAIT` if the action should fail if it would block
* @return :int 0 on success, -1 on error
*/
-#define zero_semaphore(bus, semaphore) \
- semaphore_op(bus, semaphore, 0, 0)
+#define zero_semaphore(bus, semaphore, flags) \
+ semaphore_op(bus, semaphore, 0, flags)
/**
* Open the semaphore array
@@ -601,17 +608,26 @@ fail:
* `BUS_MEMORY_SIZE` including the NUL-termination
* @return 0 on success, -1 on error
*/
-int
+int /* TODO nonblocking */
bus_write(const bus_t *bus, const char *message)
{
- t(acquire_semaphore(bus, X, SEM_UNDO));
- t(zero_semaphore(bus, W));
+ int state = 0, saved_errno;
+ if (acquire_semaphore(bus, X, SEM_UNDO) == -1)
+ return -1;
+ t(zero_semaphore(bus, W, 0));
write_shared_memory(bus, message);
+ t(release_semaphore(bus, N, SEM_UNDO)); state++;
t(write_semaphore(bus, Q, 0));
- t(zero_semaphore(bus, S));
+ t(zero_semaphore(bus, S, 0));
+ t(acquire_semaphore(bus, N, SEM_UNDO)); state--;
t(release_semaphore(bus, X, SEM_UNDO));
return 0;
fail:
+ saved_errno = errno;
+ if (state > 0)
+ acquire_semaphore(bus, N, SEM_UNDO);
+ release_semaphore(bus, X, SEM_UNDO);
+ errno = saved_errno;
return -1;
}
@@ -641,29 +657,36 @@ fail:
int
bus_read(const bus_t *bus, int (*callback)(const char *message, void *user_data), void *user_data)
{
- int r;
- t(release_semaphore(bus, S, SEM_UNDO));
+ int r, state = 0, saved_errno;
+ if (release_semaphore(bus, S, SEM_UNDO) == -1)
+ return -1;
t(r = callback(NULL, user_data));
- if (!r) {
- t(acquire_semaphore(bus, S, SEM_UNDO));
- return 0;
- }
+ if (!r) goto done;
for (;;) {
t(release_semaphore(bus, Q, 0));
- t(zero_semaphore(bus, Q));
+ t(zero_semaphore(bus, Q, 0));
t(r = callback(bus->message, user_data));
- if (!r) {
- t(acquire_semaphore(bus, S, SEM_UNDO));
- return 0;
- }
- t(release_semaphore(bus, W, SEM_UNDO));
- t(acquire_semaphore(bus, S, SEM_UNDO));
- t(zero_semaphore(bus, S));
- t(release_semaphore(bus, S, SEM_UNDO));
- t(acquire_semaphore(bus, W, SEM_UNDO));
+ if (!r) goto done;
+ t(release_semaphore(bus, W, SEM_UNDO)); state++;
+ t(acquire_semaphore(bus, S, SEM_UNDO)); state++;
+ t(zero_semaphore(bus, S, 0));
+ t(zero_semaphore(bus, N, 0));
+ t(release_semaphore(bus, S, SEM_UNDO)); state--;
+ t(acquire_semaphore(bus, W, SEM_UNDO)); state--;
}
fail:
+ saved_errno = errno;
+ if (state > 1)
+ release_semaphore(bus, S, SEM_UNDO);
+ if (state > 0)
+ acquire_semaphore(bus, W, SEM_UNDO);
+ acquire_semaphore(bus, S, SEM_UNDO);
+ errno = saved_errno;
return -1;
+
+done:
+ t(acquire_semaphore(bus, S, SEM_UNDO));
+ return 0;
}
@@ -678,26 +701,34 @@ bus_poll_start(bus_t *bus)
int
bus_poll_stop(const bus_t *bus)
{
- return acquire_semaphore(bus, S, SEM_UNDO);
+ return acquire_semaphore(bus, S, SEM_UNDO | IPC_NOWAIT);
}
-const char *
+const char * /* TODO nonblocking */
bus_poll(bus_t *bus)
{
+ int state = 0, saved_errno;
if (!bus->first_poll) {
- t(release_semaphore(bus, W, SEM_UNDO));
- t(acquire_semaphore(bus, S, SEM_UNDO));
- t(zero_semaphore(bus, S));
- t(release_semaphore(bus, S, SEM_UNDO));
- t(acquire_semaphore(bus, W, SEM_UNDO));
+ t(release_semaphore(bus, W, SEM_UNDO)); state++;
+ t(acquire_semaphore(bus, S, SEM_UNDO)); state++;
+ t(zero_semaphore(bus, S, 0));
+ t(zero_semaphore(bus, N, 0));
+ t(release_semaphore(bus, S, SEM_UNDO)); state--;
+ t(acquire_semaphore(bus, W, SEM_UNDO)); state--;
} else {
bus->first_poll = 0;
}
t(release_semaphore(bus, Q, 0));
- t(zero_semaphore(bus, Q));
+ t(zero_semaphore(bus, Q, 0));
return bus->message;
fail:
+ saved_errno = errno;
+ if (state > 1)
+ release_semaphore(bus, S, SEM_UNDO);
+ if (state > 0)
+ acquire_semaphore(bus, W, SEM_UNDO);
+ errno = saved_errno;
return NULL;
}