aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--doc/bus_poll.38
-rw-r--r--doc/examples/nonblocking/.gitignore5
-rw-r--r--doc/examples/nonblocking/Makefile13
-rw-r--r--doc/examples/nonblocking/cleanup.c8
-rw-r--r--doc/examples/nonblocking/init.c8
-rw-r--r--doc/examples/nonblocking/poll.c42
-rw-r--r--doc/examples/nonblocking/write.c32
-rw-r--r--src/bus.c41
-rw-r--r--src/bus.h23
9 files changed, 157 insertions, 23 deletions
diff --git a/doc/bus_poll.3 b/doc/bus_poll.3
index 857ea20..7962082 100644
--- a/doc/bus_poll.3
+++ b/doc/bus_poll.3
@@ -6,9 +6,9 @@ bus_poll - Wait a message to be broadcasted
.nf
#include <bus.h>
.P
-int bus_poll_start(bus_t *bus);
+int bus_poll_start(bus_t *bus, int flags);
int bus_poll_stop(const bus_t *bus);
-const char * bus_poll(bus_t *bus, int flags);
+const char * bus_poll(bus_t *bus);
.fi
.SH DESCRIPTION
The
@@ -16,7 +16,9 @@ The
function waits for a message to broadcasted on the \fIbus\fP, and return
the message it receives. The function fails if there is not already a
message waiting on the bus when the function is called and (\fIflags\fP
-&BUS_NOWAIT). Received messages shall be copied and parsed, and acted
+&BUS_NOWAIT) was used the last time
+.BR bus_poll_start(3)
+was called. Received messages shall be copied and parsed, and acted
upon, in a separate thread, and
.BR bus_poll(3)
or
diff --git a/doc/examples/nonblocking/.gitignore b/doc/examples/nonblocking/.gitignore
new file mode 100644
index 0000000..e5c1856
--- /dev/null
+++ b/doc/examples/nonblocking/.gitignore
@@ -0,0 +1,5 @@
+cleanup
+init
+write
+poll
+
diff --git a/doc/examples/nonblocking/Makefile b/doc/examples/nonblocking/Makefile
new file mode 100644
index 0000000..1b1cbd3
--- /dev/null
+++ b/doc/examples/nonblocking/Makefile
@@ -0,0 +1,13 @@
+COMMANDS = init cleanup write poll
+
+all: ${COMMANDS}
+
+%: %.c
+ ${CC} -Wall -Wextra -pedantic -std=c99 -lbus -o $@ $<
+
+clean:
+ -rm ${COMMANDS}
+
+
+.PHONY: all clean
+
diff --git a/doc/examples/nonblocking/cleanup.c b/doc/examples/nonblocking/cleanup.c
new file mode 100644
index 0000000..00f07bc
--- /dev/null
+++ b/doc/examples/nonblocking/cleanup.c
@@ -0,0 +1,8 @@
+#include <bus.h>
+#include <stdio.h>
+
+int main()
+{
+ return bus_unlink("/tmp/example-bus") && (perror("cleanup"), 1);
+}
+
diff --git a/doc/examples/nonblocking/init.c b/doc/examples/nonblocking/init.c
new file mode 100644
index 0000000..870e10d
--- /dev/null
+++ b/doc/examples/nonblocking/init.c
@@ -0,0 +1,8 @@
+#include <bus.h>
+#include <stdio.h>
+
+int main()
+{
+ return bus_create("/tmp/example-bus", 0, NULL) && (perror("init"), 1);
+}
+
diff --git a/doc/examples/nonblocking/poll.c b/doc/examples/nonblocking/poll.c
new file mode 100644
index 0000000..c918b5b
--- /dev/null
+++ b/doc/examples/nonblocking/poll.c
@@ -0,0 +1,42 @@
+#include <bus.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <string.h>
+#include <errno.h>
+
+#define t(stmt) if (stmt) goto fail
+
+
+
+int main()
+{
+ bus_t bus;
+ const char *message;
+ long long tick = 0;
+ t(bus_open(&bus, "/tmp/example-bus", BUS_RDONLY));
+ t(bus_poll_start(&bus, BUS_NOWAIT));
+ for (;;) {
+ message = bus_poll(&bus);
+ if (message == NULL) {
+ t(errno != EAGAIN);
+ printf("waiting... %lli\n", ++tick);
+ sleep(1);
+ continue;
+ }
+ tick = 0;
+ message = strchr(message, ' ') + 1;
+ if (!strcmp(message, "stop"))
+ break;
+ printf("\033[01m%s\033[21m\n", message);
+ }
+ t(bus_poll_stop(&bus));
+ bus_close(&bus);
+ return 0;
+
+fail:
+ perror("poll");
+ bus_poll_stop(&bus);
+ bus_close(&bus);
+ return 1;
+}
+
diff --git a/doc/examples/nonblocking/write.c b/doc/examples/nonblocking/write.c
new file mode 100644
index 0000000..7fe9690
--- /dev/null
+++ b/doc/examples/nonblocking/write.c
@@ -0,0 +1,32 @@
+#include <bus.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <stdint.h>
+
+#define t(stmt) if (stmt) goto fail
+
+
+
+static char message[BUS_MEMORY_SIZE];
+
+
+
+int main(int argc, char *argv[])
+{
+ bus_t bus;
+ if (argc < 2) {
+ fprintf(stderr, "%s: USAGE: %s message\n", argv[0], argv[0]);
+ return 2;
+ }
+ sprintf(message, "0 %s", argv[1]);
+ t(bus_open(&bus, "/tmp/example-bus", BUS_WRONLY));
+ t(bus_write(&bus, message, BUS_NOWAIT));
+ bus_close(&bus);
+ return 0;
+
+fail:
+ perror("write");
+ bus_close(&bus);
+ return 1;
+}
+
diff --git a/src/bus.c b/src/bus.c
index 72747ea..4b2d0f9 100644
--- a/src/bus.c
+++ b/src/bus.c
@@ -745,14 +745,25 @@ done:
* misbehave, is `bus_poll` is written to expect
* this function to have been called.
*
- * @param bus Bus information
- * @return 0 on success, -1 on error
+ * @param bus Bus information
+ * @param flags `IPC_NOWAIT` if the bus should fail and set `errno` to
+ * `EAGAIN` if there isn't already a message available on
+ * the bus when `bus_poll` is called
+ * @return 0 on success, -1 on error
*/
int
-bus_poll_start(bus_t *bus)
+bus_poll_start(bus_t *bus, int flags)
{
bus->first_poll = 1;
- return release_semaphore(bus, S, SEM_UNDO);
+ bus->flags = flags;
+ t(release_semaphore(bus, S, SEM_UNDO));
+ if (flags & BUS_NOWAIT) {
+ t(release_semaphore(bus, Q, 0));
+ }
+ return 0;
+
+fail:
+ return -1;
}
@@ -779,29 +790,35 @@ bus_poll_stop(const bus_t *bus)
* started, the caller of this function should then
* either call `bus_poll` again or `bus_poll_stop`.
*
- * @param bus Bus information
- * @param flags `IPC_NOWAIT` if the bus should fail and set `errno` to
- * `EAGAIN` if this isn't already a message available on the bus
- * @return The received message, `NULL` on error
+ * @param bus Bus information
+ * @return The received message, `NULL` on error
*/
const char *
-bus_poll(bus_t *bus, int flags)
+bus_poll(bus_t *bus)
{
int state = 0, saved_errno;
- (void) flags;
if (!bus->first_poll) {
t(release_semaphore(bus, W, SEM_UNDO)); state++;
t(acquire_semaphore(bus, S, SEM_UNDO)); state++;
t(zero_semaphore(bus, S, 0));
+#ifndef BUS_SEMAPHORES_ARE_SYNCHRONOUS_ME_HARDER
t(zero_semaphore(bus, N, 0));
+#endif
t(release_semaphore(bus, S, SEM_UNDO)); state--;
t(acquire_semaphore(bus, W, SEM_UNDO)); state--;
+ if (bus->flags & BUS_NOWAIT) {
+ t(release_semaphore(bus, Q, 0));
+ }
} else {
bus->first_poll = 0;
}
state--;
- t(release_semaphore(bus, Q, 0));
- t(zero_semaphore(bus, Q, F(BUS_NOWAIT, IPC_NOWAIT)));
+ if (bus->flags & BUS_NOWAIT) {
+ t(zero_semaphore(bus, Q, IPC_NOWAIT));
+ } else {
+ t(release_semaphore(bus, Q, 0));
+ t(zero_semaphore(bus, Q, 0));
+ }
return bus->message;
fail:
diff --git a/src/bus.h b/src/bus.h
index 4f38067..2cd534b 100644
--- a/src/bus.h
+++ b/src/bus.h
@@ -105,6 +105,12 @@ typedef struct bus
* if `bus_poll` failed during reading
*/
int first_poll;
+
+ /**
+ * Flags used for polling
+ */
+ int flags;
+
} bus_t;
@@ -199,10 +205,13 @@ int bus_read(const bus_t *bus, int (*callback)(const char *message, void *user_d
* misbehave, is `bus_poll` is written to expect
* this function to have been called.
*
- * @param bus Bus information
- * @return 0 on success, -1 on error
+ * @param bus Bus information
+ * @param flags `IPC_NOWAIT` if the bus should fail and set `errno` to
+ * `EAGAIN` if there isn't already a message available on
+ * the bus when `bus_poll` is called
+ * @return 0 on success, -1 on error
*/
-int bus_poll_start(bus_t *bus);
+int bus_poll_start(bus_t *bus, int flags);
/**
* Announce that the thread has stopped listening on the bus.
@@ -222,12 +231,10 @@ int bus_poll_stop(const bus_t *bus);
* started, the caller of this function should then
* either call `bus_poll` again or `bus_poll_stop`.
*
- * @param bus Bus information
- * @param flags `IPC_NOWAIT` if the bus should fail and set `errno` to
- * `EAGAIN` if this isn't already a message available on the bus
- * @return The received message, `NULL` on error
+ * @param bus Bus information
+ * @return The received message, `NULL` on error
*/
-const char *bus_poll(bus_t *bus, int flags);
+const char *bus_poll(bus_t *bus);
/* TODO bus_poll_timed */