aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/bus.c185
1 files changed, 178 insertions, 7 deletions
diff --git a/src/bus.c b/src/bus.c
index 06c708a..516def1 100644
--- a/src/bus.c
+++ b/src/bus.c
@@ -22,6 +22,7 @@
* DEALINGS IN THE SOFTWARE.
*/
#define _XOPEN_SOURCE 700
+#define _GNU_SOURCE
#include "bus.h"
#include <stdlib.h>
@@ -119,7 +120,7 @@
semaphore_op(bus, semaphore, +1, flags)
/**
- * Wait for the value of a semphore to become 0
+ * Wait for the value of a semaphore to become 0
*
* @param bus:const bus_t * The bus
* @param semaphore:int The index of the semaphore, `S`, `W`, `X` or `Q`
@@ -130,6 +131,43 @@
semaphore_op(bus, semaphore, 0, flags)
/**
+ * Decrease the value of a semaphore by 1
+ *
+ * @param bus:const bus_t * The bus
+ * @param semaphore:int The index of the semaphore, `S`, `W`, `X` or `Q`
+ * @param flags:int `SEM_UNDO` if the action should be undone when the program exits,
+ * `IPC_NOWAIT` if the action should fail if it would block
+ * @param timeout:const struct timespec * The amount of time to wait before failing
+ * @return :int 0 on success, -1 on error
+ */
+#define acquire_semaphore_timed(bus, semaphore, flags, timeout) \
+ semaphore_op_timed(bus, semaphore, -1, flags, timeout)
+
+/**
+ * Increase the value of a semaphore by 1
+ *
+ * @param bus:const bus_t * The bus
+ * @param semaphore:int The index of the semaphore, `S`, `W`, `X` or `Q`
+ * @param flags:int `SEM_UNDO` if the action should be undone when the program exits
+ * @param timeout:const struct timespec * The amount of time to wait before failing
+ * @return :int 0 on success, -1 on error
+ */
+#define release_semaphore_timed(bus, semaphore, flags, timeout) \
+ semaphore_op_timed(bus, semaphore, +1, flags, timeout)
+
+/**
+ * Wait for the value of a semaphore to become 0
+ *
+ * @param bus:const bus_t * The bus
+ * @param semaphore:int The index of the semaphore, `S`, `W`, `X` or `Q`
+ * @param flags:int `IPC_NOWAIT` if the action should fail if it would block
+ * @param timeout:const struct timespec * The amount of time to wait before failing
+ * @return :int 0 on success, -1 on error
+ */
+#define zero_semaphore_timed(bus, semaphore, flags, timeout) \
+ semaphore_op_timed(bus, semaphore, 0, flags, timeout)
+
+/**
* Open the semaphore array
*
* @param bus:const bus_t * The bus
@@ -150,6 +188,21 @@
/**
+ * Set `delta` to the convertion of `timeout` from absolute to relative time,
+ * measured in the clock whose ID is specified by `clockid`
+ *
+ * @scope timeout:struct timespec Output variable for relative time
+ * @scope timeout:const struct timespec * The absolute time
+ * @scope clockid:clockid_t The clock time is measured
+ */
+#define DELTA \
+ do { \
+ if (absolute_time_to_delta_time(&delta, timeout, clockid) < 0) goto fail; \
+ else if ((delta.tv_sec < 0) || (delta.tv_nsec < 0)) { errno = EAGAIN; goto fail; } \
+ } while (0)
+
+
+/**
* If `flags & (bus_flag)`, this macro evalutes to `sys_flag`,
* otherwise this macro evalutes to 0.
*/
@@ -319,6 +372,27 @@ semaphore_op(const bus_t *bus, int semaphore, int delta, int flags)
/**
+ * Increase or decrease the value of a semaphore, or wait the it to become 0
+ *
+ * @param bus Bus information
+ * @param semaphore The index of the semaphore, `S`, `W`, `X` or `Q`
+ * @param delta The adjustment to make to the semaphore's value, 0 to wait for it to become 0
+ * @param flags `SEM_UNDO` if the action should be undone when the program exits
+ * @param timeout The amount of time to wait before failing
+ * @return 0 on success, -1 on error
+ */
+static int
+semaphore_op_timed(const bus_t *bus, int semaphore, int delta, int flags, const struct timespec *timeout)
+{
+ struct sembuf op;
+ op.sem_num = (unsigned short)semaphore;
+ op.sem_op = (short)delta;
+ op.sem_flg = (short)flags;
+ return semtimedop(bus->sem_id, &op, (size_t)1, timeout);
+}
+
+
+/**
* Set the value of a semaphore
*
* @param bus Bus information
@@ -722,10 +796,41 @@ fail:
int bus_write_timed(const bus_t *bus, const char *message,
const struct timespec *timeout, clockid_t clockid)
{
- /* TODO bus_write_timed */
+ int saved_errno;
+#ifndef BUS_SEMAPHORES_ARE_SYNCHRONOUS
+ int state = 0;
+#endif
+ struct timespec delta;
if (!timeout)
return bus_write(bus, message, 0);
- (void) bus, (void) message, (void) timeout, (void) clockid;
+
+ DELTA;
+ if (acquire_semaphore_timed(bus, X, SEM_UNDO, &delta) == -1)
+ return -1;
+ DELTA;
+ t(zero_semaphore_timed(bus, W, 0, &delta));
+ write_shared_memory(bus, message);
+#ifndef BUS_SEMAPHORES_ARE_SYNCHRONOUS
+ t(release_semaphore(bus, N, SEM_UNDO)); state++;
+#endif
+ t(write_semaphore(bus, Q, 0));
+ t(zero_semaphore(bus, S, 0));
+#ifndef BUS_SEMAPHORES_ARE_SYNCHRONOUS
+ t(acquire_semaphore(bus, N, SEM_UNDO)); state--;
+#endif
+ t(release_semaphore(bus, X, SEM_UNDO));
+ return 0;
+
+fail:
+ saved_errno = errno;
+#ifndef BUS_SEMAPHORES_ARE_SYNCHRONOUS
+ if (state > 0)
+ acquire_semaphore(bus, N, SEM_UNDO);
+#endif
+ release_semaphore(bus, X, SEM_UNDO);
+ errno = saved_errno;
+ return -1;
+
}
@@ -822,10 +927,46 @@ done:
int bus_read_timed(const bus_t *bus, int (*callback)(const char *message, void *user_data),
void *user_data, const struct timespec *timeout, clockid_t clockid)
{
- /* TODO bus_read_timed */
+ int r, state = 0, saved_errno;
+ struct timespec delta;
if (!timeout)
return bus_read(bus, callback, user_data);
- (void) bus, (void) callback, (void) user_data, (void) timeout, (void) clockid;
+
+ DELTA;
+ if (release_semaphore_timed(bus, S, SEM_UNDO, &delta) == -1)
+ return -1;
+ t(r = callback(NULL, user_data));
+ if (!r) goto done;
+ for (;;) {
+ DELTA;
+ t(release_semaphore_timed(bus, Q, 0, &delta));
+ DELTA;
+ t(zero_semaphore_timed(bus, Q, 0, &delta));
+ t(r = callback(bus->message, user_data));
+ if (!r) goto done;
+ t(release_semaphore(bus, W, SEM_UNDO)); state++;
+ t(acquire_semaphore(bus, S, SEM_UNDO)); state++;
+ t(zero_semaphore(bus, S, 0));
+#ifndef BUS_SEMAPHORES_ARE_SYNCHRONOUS_ME_HARDER
+ t(zero_semaphore(bus, N, 0));
+#endif
+ t(release_semaphore(bus, S, SEM_UNDO)); state--;
+ t(acquire_semaphore(bus, W, SEM_UNDO)); state--;
+ }
+
+fail:
+ saved_errno = errno;
+ if (state > 1)
+ release_semaphore(bus, S, SEM_UNDO);
+ if (state > 0)
+ acquire_semaphore(bus, W, SEM_UNDO);
+ acquire_semaphore(bus, S, SEM_UNDO);
+ errno = saved_errno;
+ return -1;
+
+done:
+ t(acquire_semaphore(bus, S, SEM_UNDO));
+ return 0;
}
@@ -943,10 +1084,40 @@ fail:
*/
const char *bus_poll_timed(bus_t *bus, const struct timespec *timeout, clockid_t clockid)
{
- /* TODO bus_poll_timed */
+ int state = 0, saved_errno;
+ struct timespec delta;
if (!timeout)
return bus_poll(bus);
- (void) bus, (void) timeout, (void) clockid;
+
+ if (!bus->first_poll) {
+ t(release_semaphore(bus, W, SEM_UNDO)); state++;
+ t(acquire_semaphore(bus, S, SEM_UNDO)); state++;
+ t(zero_semaphore(bus, S, 0));
+#ifndef BUS_SEMAPHORES_ARE_SYNCHRONOUS_ME_HARDER
+ t(zero_semaphore(bus, N, 0));
+#endif
+ t(release_semaphore(bus, S, SEM_UNDO)); state--;
+ t(acquire_semaphore(bus, W, SEM_UNDO)); state--;
+ } else {
+ bus->first_poll = 0;
+ }
+ state--;
+ DELTA;
+ t(release_semaphore_timed(bus, Q, 0, &delta));
+ DELTA;
+ t(zero_semaphore_timed(bus, Q, 0, &delta));
+ return bus->message;
+
+fail:
+ saved_errno = errno;
+ if (state > 1)
+ release_semaphore(bus, S, SEM_UNDO);
+ if (state > 0)
+ acquire_semaphore(bus, W, SEM_UNDO);
+ if (state < 0)
+ bus->first_poll = 1;
+ errno = saved_errno;
+ return NULL;
}