diff options
-rw-r--r-- | doc/bus_poll.3 | 8 | ||||
-rw-r--r-- | doc/examples/nonblocking/.gitignore | 5 | ||||
-rw-r--r-- | doc/examples/nonblocking/Makefile | 13 | ||||
-rw-r--r-- | doc/examples/nonblocking/cleanup.c | 8 | ||||
-rw-r--r-- | doc/examples/nonblocking/init.c | 8 | ||||
-rw-r--r-- | doc/examples/nonblocking/poll.c | 42 | ||||
-rw-r--r-- | doc/examples/nonblocking/write.c | 32 | ||||
-rw-r--r-- | src/bus.c | 41 | ||||
-rw-r--r-- | src/bus.h | 23 |
9 files changed, 157 insertions, 23 deletions
diff --git a/doc/bus_poll.3 b/doc/bus_poll.3 index 857ea20..7962082 100644 --- a/doc/bus_poll.3 +++ b/doc/bus_poll.3 @@ -6,9 +6,9 @@ bus_poll - Wait a message to be broadcasted .nf #include <bus.h> .P -int bus_poll_start(bus_t *bus); +int bus_poll_start(bus_t *bus, int flags); int bus_poll_stop(const bus_t *bus); -const char * bus_poll(bus_t *bus, int flags); +const char * bus_poll(bus_t *bus); .fi .SH DESCRIPTION The @@ -16,7 +16,9 @@ The function waits for a message to broadcasted on the \fIbus\fP, and return the message it receives. The function fails if there is not already a message waiting on the bus when the function is called and (\fIflags\fP -&BUS_NOWAIT). Received messages shall be copied and parsed, and acted +&BUS_NOWAIT) was used the last time +.BR bus_poll_start(3) +was called. Received messages shall be copied and parsed, and acted upon, in a separate thread, and .BR bus_poll(3) or diff --git a/doc/examples/nonblocking/.gitignore b/doc/examples/nonblocking/.gitignore new file mode 100644 index 0000000..e5c1856 --- /dev/null +++ b/doc/examples/nonblocking/.gitignore @@ -0,0 +1,5 @@ +cleanup +init +write +poll + diff --git a/doc/examples/nonblocking/Makefile b/doc/examples/nonblocking/Makefile new file mode 100644 index 0000000..1b1cbd3 --- /dev/null +++ b/doc/examples/nonblocking/Makefile @@ -0,0 +1,13 @@ +COMMANDS = init cleanup write poll + +all: ${COMMANDS} + +%: %.c + ${CC} -Wall -Wextra -pedantic -std=c99 -lbus -o $@ $< + +clean: + -rm ${COMMANDS} + + +.PHONY: all clean + diff --git a/doc/examples/nonblocking/cleanup.c b/doc/examples/nonblocking/cleanup.c new file mode 100644 index 0000000..00f07bc --- /dev/null +++ b/doc/examples/nonblocking/cleanup.c @@ -0,0 +1,8 @@ +#include <bus.h> +#include <stdio.h> + +int main() +{ + return bus_unlink("/tmp/example-bus") && (perror("cleanup"), 1); +} + diff --git a/doc/examples/nonblocking/init.c b/doc/examples/nonblocking/init.c new file mode 100644 index 0000000..870e10d --- /dev/null +++ b/doc/examples/nonblocking/init.c @@ -0,0 +1,8 @@ +#include <bus.h> +#include <stdio.h> + +int main() +{ + return bus_create("/tmp/example-bus", 0, NULL) && (perror("init"), 1); +} + diff --git a/doc/examples/nonblocking/poll.c b/doc/examples/nonblocking/poll.c new file mode 100644 index 0000000..c918b5b --- /dev/null +++ b/doc/examples/nonblocking/poll.c @@ -0,0 +1,42 @@ +#include <bus.h> +#include <stdio.h> +#include <unistd.h> +#include <string.h> +#include <errno.h> + +#define t(stmt) if (stmt) goto fail + + + +int main() +{ + bus_t bus; + const char *message; + long long tick = 0; + t(bus_open(&bus, "/tmp/example-bus", BUS_RDONLY)); + t(bus_poll_start(&bus, BUS_NOWAIT)); + for (;;) { + message = bus_poll(&bus); + if (message == NULL) { + t(errno != EAGAIN); + printf("waiting... %lli\n", ++tick); + sleep(1); + continue; + } + tick = 0; + message = strchr(message, ' ') + 1; + if (!strcmp(message, "stop")) + break; + printf("\033[01m%s\033[21m\n", message); + } + t(bus_poll_stop(&bus)); + bus_close(&bus); + return 0; + +fail: + perror("poll"); + bus_poll_stop(&bus); + bus_close(&bus); + return 1; +} + diff --git a/doc/examples/nonblocking/write.c b/doc/examples/nonblocking/write.c new file mode 100644 index 0000000..7fe9690 --- /dev/null +++ b/doc/examples/nonblocking/write.c @@ -0,0 +1,32 @@ +#include <bus.h> +#include <stdio.h> +#include <unistd.h> +#include <stdint.h> + +#define t(stmt) if (stmt) goto fail + + + +static char message[BUS_MEMORY_SIZE]; + + + +int main(int argc, char *argv[]) +{ + bus_t bus; + if (argc < 2) { + fprintf(stderr, "%s: USAGE: %s message\n", argv[0], argv[0]); + return 2; + } + sprintf(message, "0 %s", argv[1]); + t(bus_open(&bus, "/tmp/example-bus", BUS_WRONLY)); + t(bus_write(&bus, message, BUS_NOWAIT)); + bus_close(&bus); + return 0; + +fail: + perror("write"); + bus_close(&bus); + return 1; +} + @@ -745,14 +745,25 @@ done: * misbehave, is `bus_poll` is written to expect * this function to have been called. * - * @param bus Bus information - * @return 0 on success, -1 on error + * @param bus Bus information + * @param flags `IPC_NOWAIT` if the bus should fail and set `errno` to + * `EAGAIN` if there isn't already a message available on + * the bus when `bus_poll` is called + * @return 0 on success, -1 on error */ int -bus_poll_start(bus_t *bus) +bus_poll_start(bus_t *bus, int flags) { bus->first_poll = 1; - return release_semaphore(bus, S, SEM_UNDO); + bus->flags = flags; + t(release_semaphore(bus, S, SEM_UNDO)); + if (flags & BUS_NOWAIT) { + t(release_semaphore(bus, Q, 0)); + } + return 0; + +fail: + return -1; } @@ -779,29 +790,35 @@ bus_poll_stop(const bus_t *bus) * started, the caller of this function should then * either call `bus_poll` again or `bus_poll_stop`. * - * @param bus Bus information - * @param flags `IPC_NOWAIT` if the bus should fail and set `errno` to - * `EAGAIN` if this isn't already a message available on the bus - * @return The received message, `NULL` on error + * @param bus Bus information + * @return The received message, `NULL` on error */ const char * -bus_poll(bus_t *bus, int flags) +bus_poll(bus_t *bus) { int state = 0, saved_errno; - (void) flags; if (!bus->first_poll) { t(release_semaphore(bus, W, SEM_UNDO)); state++; t(acquire_semaphore(bus, S, SEM_UNDO)); state++; t(zero_semaphore(bus, S, 0)); +#ifndef BUS_SEMAPHORES_ARE_SYNCHRONOUS_ME_HARDER t(zero_semaphore(bus, N, 0)); +#endif t(release_semaphore(bus, S, SEM_UNDO)); state--; t(acquire_semaphore(bus, W, SEM_UNDO)); state--; + if (bus->flags & BUS_NOWAIT) { + t(release_semaphore(bus, Q, 0)); + } } else { bus->first_poll = 0; } state--; - t(release_semaphore(bus, Q, 0)); - t(zero_semaphore(bus, Q, F(BUS_NOWAIT, IPC_NOWAIT))); + if (bus->flags & BUS_NOWAIT) { + t(zero_semaphore(bus, Q, IPC_NOWAIT)); + } else { + t(release_semaphore(bus, Q, 0)); + t(zero_semaphore(bus, Q, 0)); + } return bus->message; fail: @@ -105,6 +105,12 @@ typedef struct bus * if `bus_poll` failed during reading */ int first_poll; + + /** + * Flags used for polling + */ + int flags; + } bus_t; @@ -199,10 +205,13 @@ int bus_read(const bus_t *bus, int (*callback)(const char *message, void *user_d * misbehave, is `bus_poll` is written to expect * this function to have been called. * - * @param bus Bus information - * @return 0 on success, -1 on error + * @param bus Bus information + * @param flags `IPC_NOWAIT` if the bus should fail and set `errno` to + * `EAGAIN` if there isn't already a message available on + * the bus when `bus_poll` is called + * @return 0 on success, -1 on error */ -int bus_poll_start(bus_t *bus); +int bus_poll_start(bus_t *bus, int flags); /** * Announce that the thread has stopped listening on the bus. @@ -222,12 +231,10 @@ int bus_poll_stop(const bus_t *bus); * started, the caller of this function should then * either call `bus_poll` again or `bus_poll_stop`. * - * @param bus Bus information - * @param flags `IPC_NOWAIT` if the bus should fail and set `errno` to - * `EAGAIN` if this isn't already a message available on the bus - * @return The received message, `NULL` on error + * @param bus Bus information + * @return The received message, `NULL` on error */ -const char *bus_poll(bus_t *bus, int flags); +const char *bus_poll(bus_t *bus); /* TODO bus_poll_timed */ |