aboutsummaryrefslogtreecommitdiffstats
path: root/libbus.c
diff options
context:
space:
mode:
authorMattias Andrée <maandree@kth.se>2017-12-11 23:13:37 +0100
committerMattias Andrée <maandree@kth.se>2017-12-11 23:13:37 +0100
commite35ba8684be9951fa2129503477ccd5ed6e4e5fc (patch)
tree756292d5a18ad6011a9311fdea8159f474385b65 /libbus.c
parenttypo (diff)
downloadbus-e35ba8684be9951fa2129503477ccd5ed6e4e5fc.tar.gz
bus-e35ba8684be9951fa2129503477ccd5ed6e4e5fc.tar.bz2
bus-e35ba8684be9951fa2129503477ccd5ed6e4e5fc.tar.xz
Simplify, do not install examples or info manual, and change license3.1.7
Signed-off-by: Mattias Andrée <maandree@kth.se>
Diffstat (limited to 'libbus.c')
-rw-r--r--libbus.c1173
1 files changed, 1173 insertions, 0 deletions
diff --git a/libbus.c b/libbus.c
new file mode 100644
index 0000000..0a84353
--- /dev/null
+++ b/libbus.c
@@ -0,0 +1,1173 @@
+/* See LICENSE file for copyright and license details. */
+#include "bus.h"
+
+#include <sys/ipc.h>
+#include <sys/sem.h>
+#include <sys/shm.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <time.h>
+#include <unistd.h>
+
+
+#ifdef BUS_SEMAPHORES_ARE_SYNCHRONOUS_ME_EVEN_HARDER
+# ifndef BUS_SEMAPHORES_ARE_SYNCHRONOUS_ME_HARDER
+# define BUS_SEMAPHORES_ARE_SYNCHRONOUS_ME_HARDER
+# endif
+#endif
+#ifdef BUS_SEMAPHORES_ARE_SYNCHRONOUS_ME_HARDER
+# ifndef BUS_SEMAPHORES_ARE_SYNCHRONOUS
+# define BUS_SEMAPHORES_ARE_SYNCHRONOUS
+# endif
+#endif
+
+
+/**
+ * Semaphore used to signal `bus_write` that `bus_read` is ready
+ */
+#define S 0
+
+/**
+ * Semaphore for making `bus_write` wait while `bus_read` is reseting `S`
+ */
+#define W 1
+
+/**
+ * Binary semaphore for making `bus_write` exclusively locked
+ */
+#define X 2
+
+/**
+ * Semaphore used to cue `bus_read` that it may read the shared memory
+ */
+#define Q 3
+
+#ifndef BUS_SEMAPHORES_ARE_SYNCHRONOUS_ME_EVEN_HARDER
+/**
+ * Semaphore used to notify `bus_read` that it may restore `S`
+ */
+# define N 4
+
+/**
+ * The number of semaphores in the semaphore array
+ */
+# define BUS_SEMAPHORES 5
+#else
+# define BUS_SEMAPHORES 4
+#endif
+
+/**
+ * The default permission mits of the bus
+ */
+#define DEFAULT_MODE 0600
+
+
+
+/**
+ * Decrease the value of a semaphore by 1
+ *
+ * @param bus:const bus_t * The bus
+ * @param semaphore:int The index of the semaphore, `S`, `W`, `X` or `Q`
+ * @param flags:int `SEM_UNDO` if the action should be undone when the program exits,
+ * `IPC_NOWAIT` if the action should fail if it would block
+ * @return :int 0 on success, -1 on error
+ */
+#define acquire_semaphore(bus, semaphore, flags) \
+ semaphore_op(bus, semaphore, -1, flags)
+
+/**
+ * Increase the value of a semaphore by 1
+ *
+ * @param bus:const bus_t * The bus
+ * @param semaphore:int The index of the semaphore, `S`, `W`, `X` or `Q`
+ * @param flags:int `SEM_UNDO` if the action should be undone when the program exits
+ * @return :int 0 on success, -1 on error
+ */
+#define release_semaphore(bus, semaphore, flags) \
+ semaphore_op(bus, semaphore, +1, flags)
+
+/**
+ * Wait for the value of a semaphore to become 0
+ *
+ * @param bus:const bus_t * The bus
+ * @param semaphore:int The index of the semaphore, `S`, `W`, `X` or `Q`
+ * @param flags:int `IPC_NOWAIT` if the action should fail if it would block
+ * @return :int 0 on success, -1 on error
+ */
+#define zero_semaphore(bus, semaphore, flags) \
+ semaphore_op(bus, semaphore, 0, flags)
+
+/**
+ * Decrease the value of a semaphore by 1
+ *
+ * @param bus:const bus_t * The bus
+ * @param semaphore:int The index of the semaphore, `S`, `W`, `X` or `Q`
+ * @param flags:int `SEM_UNDO` if the action should be undone when the program exits,
+ * `IPC_NOWAIT` if the action should fail if it would block
+ * @param timeout:const struct timespec * The amount of time to wait before failing
+ * @return :int 0 on success, -1 on error
+ */
+#define acquire_semaphore_timed(bus, semaphore, flags, timeout) \
+ semaphore_op_timed(bus, semaphore, -1, flags, timeout)
+
+/**
+ * Increase the value of a semaphore by 1
+ *
+ * @param bus:const bus_t * The bus
+ * @param semaphore:int The index of the semaphore, `S`, `W`, `X` or `Q`
+ * @param flags:int `SEM_UNDO` if the action should be undone when the program exits
+ * @param timeout:const struct timespec * The amount of time to wait before failing
+ * @return :int 0 on success, -1 on error
+ */
+#define release_semaphore_timed(bus, semaphore, flags, timeout) \
+ semaphore_op_timed(bus, semaphore, +1, flags, timeout)
+
+/**
+ * Wait for the value of a semaphore to become 0
+ *
+ * @param bus:const bus_t * The bus
+ * @param semaphore:int The index of the semaphore, `S`, `W`, `X` or `Q`
+ * @param flags:int `IPC_NOWAIT` if the action should fail if it would block
+ * @param timeout:const struct timespec * The amount of time to wait before failing
+ * @return :int 0 on success, -1 on error
+ */
+#define zero_semaphore_timed(bus, semaphore, flags, timeout) \
+ semaphore_op_timed(bus, semaphore, 0, flags, timeout)
+
+/**
+ * Open the semaphore array
+ *
+ * @param bus:const bus_t * The bus
+ * @return :int 0 on success, -1 on error
+ */
+#define open_semaphores(bus) \
+ (((bus)->sem_id = semget((bus)->key_sem, BUS_SEMAPHORES, 0)) == -1 ? -1 : 0)
+
+/**
+ * Write a message to the shared memory
+ *
+ * @param bus:const bus_t * The bus
+ * @param msg:const char * The message
+ * @return :int 0 on success, -1 on error
+ */
+#define write_shared_memory(bus, msg) \
+ (memcpy((bus)->message, msg, (strlen(msg) + 1) * sizeof(char)))
+
+
+/**
+ * Set `delta` to the convertion of `timeout` from absolute to relative time,
+ * measured in the clock whose ID is specified by `clockid`
+ *
+ * @scope timeout:struct timespec Output variable for relative time
+ * @scope timeout:const struct timespec * The absolute time
+ * @scope clockid:clockid_t The clock time is measured
+ */
+#define DELTA \
+ do { \
+ if (absolute_time_to_delta_time(&delta, timeout, clockid) < 0) { \
+ goto fail; \
+ } else if ((delta.tv_sec < 0) || (delta.tv_nsec < 0)) { \
+ errno = EAGAIN; \
+ goto fail; \
+ }\
+ } while (0)
+
+
+/**
+ * If `flags & (bus_flag)`, this macro evalutes to `sys_flag`,
+ * otherwise this macro evalutes to 0.
+ */
+#define F(bus_flag, sys_flag) \
+ ((flags & (bus_flag)) ? sys_flag : 0)
+
+
+
+/**
+ * Statement wrapper that goes to `fail` on failure
+ */
+#define t(inst) \
+ do { if ((inst) == -1) goto fail; } while (0)
+
+
+
+#ifdef _SEM_SEMUN_UNDEFINED
+union semun {
+ int val;
+ struct semid_ds *buf;
+ unsigned short *array;
+};
+#endif
+
+
+
+/**
+ * Create a semaphore array for the bus
+ *
+ * @param bus Bus information to fill with the key of the created semaphore array
+ * @return 0 on success, -1 on error
+ */
+static int
+create_semaphores(bus_t *bus)
+{
+ int id = -1, rint, saved_errno;
+ double r;
+ union semun values;
+
+ values.array = NULL;
+
+ /* Create semaphore array. */
+ for (;;) {
+ rint = rand();
+ r = (double)rint;
+ r /= (double)RAND_MAX + 1;
+ r *= (1 << (8 * sizeof(key_t) - 2)) - 1;
+ bus->key_sem = (key_t)r + 1;
+ if (bus->key_sem == IPC_PRIVATE)
+ continue;
+ id = semget(bus->key_sem, BUS_SEMAPHORES, IPC_CREAT | IPC_EXCL | DEFAULT_MODE);
+ if (id != -1)
+ break;
+ if ((errno != EEXIST) && (errno != EINTR))
+ goto fail;
+ }
+
+ /* Initialise the array. */
+ values.array = calloc((size_t)BUS_SEMAPHORES, sizeof(unsigned short));
+ if (!values.array)
+ goto fail;
+ values.array[X] = 1;
+ if (semctl(id, 0, SETALL, values.array) == -1)
+ goto fail;
+ free(values.array);
+ values.array = NULL;
+
+ return 0;
+
+fail:
+ saved_errno = errno;
+ if (id != -1)
+ semctl(id, 0, IPC_RMID);
+ free(values.array);
+ errno = saved_errno;
+ return -1;
+}
+
+
+/**
+ * Create a shared memory for the bus
+ *
+ * @param bus Bus information to fill with the key of the created shared memory
+ * @return 0 on success, -1 on error
+ */
+static int
+create_shared_memory(bus_t *bus)
+{
+ int id = -1, rint, saved_errno;
+ double r;
+ struct shmid_ds _info;
+
+ /* Create shared memory. */
+ for (;;) {
+ rint = rand();
+ r = (double)rint;
+ r /= (double)RAND_MAX + 1;
+ r *= (1 << (8 * sizeof(key_t) - 2)) - 1;
+ bus->key_shm = (key_t)r + 1;
+ if (bus->key_shm == IPC_PRIVATE)
+ continue;
+ id = shmget(bus->key_shm, (size_t)BUS_MEMORY_SIZE, IPC_CREAT | IPC_EXCL | DEFAULT_MODE);
+ if (id != -1)
+ break;
+ if ((errno != EEXIST) && (errno != EINTR))
+ goto fail;
+ }
+
+ return 0;
+
+fail:
+ saved_errno = errno;
+ if (id != -1)
+ shmctl(id, IPC_RMID, &_info);
+ errno = saved_errno;
+ return -1;
+}
+
+
+/**
+ * Remove the semaphore array for the bus
+ *
+ * @param bus Bus information
+ * @return 0 on success, -1 on error
+ */
+static int
+remove_semaphores(const bus_t *bus)
+{
+ int id = semget(bus->key_sem, BUS_SEMAPHORES, 0);
+ return ((id == -1) || (semctl(id, 0, IPC_RMID) == -1)) ? -1 : 0;
+}
+
+
+/**
+ * Remove the shared memory for the bus
+ *
+ * @param bus Bus information
+ * @return 0 on success, -1 on error
+ */
+static int
+remove_shared_memory(const bus_t *bus)
+{
+ struct shmid_ds _info;
+ int id = shmget(bus->key_shm, (size_t)BUS_MEMORY_SIZE, 0);
+ return ((id == -1) || (shmctl(id, IPC_RMID, &_info) == -1)) ? -1 : 0;
+}
+
+
+/**
+ * Increase or decrease the value of a semaphore, or wait the it to become 0
+ *
+ * @param bus Bus information
+ * @param semaphore The index of the semaphore, `S`, `W`, `X` or `Q`
+ * @param delta The adjustment to make to the semaphore's value, 0 to wait for it to become 0
+ * @param flags `SEM_UNDO` if the action should be undone when the program exits
+ * @return 0 on success, -1 on error
+ */
+static int
+semaphore_op(const bus_t *bus, int semaphore, int delta, int flags)
+{
+ struct sembuf op;
+ op.sem_num = (unsigned short)semaphore;
+ op.sem_op = (short)delta;
+ op.sem_flg = (short)flags;
+ return semop(bus->sem_id, &op, (size_t)1);
+}
+
+
+/**
+ * Increase or decrease the value of a semaphore, or wait the it to become 0
+ *
+ * @param bus Bus information
+ * @param semaphore The index of the semaphore, `S`, `W`, `X` or `Q`
+ * @param delta The adjustment to make to the semaphore's value, 0 to wait for it to become 0
+ * @param flags `SEM_UNDO` if the action should be undone when the program exits
+ * @param timeout The amount of time to wait before failing
+ * @return 0 on success, -1 on error
+ */
+static int
+semaphore_op_timed(const bus_t *bus, int semaphore, int delta, int flags, const struct timespec *timeout)
+{
+ struct sembuf op;
+ op.sem_num = (unsigned short)semaphore;
+ op.sem_op = (short)delta;
+ op.sem_flg = (short)flags;
+ return semtimedop(bus->sem_id, &op, (size_t)1, timeout);
+}
+
+
+/**
+ * Set the value of a semaphore
+ *
+ * @param bus Bus information
+ * @param semaphore The index of the semaphore, `S`, `W`, `X` or `Q`
+ * @param value The new value of the semaphore
+ * @return 0 on success, -1 on error
+ */
+static int
+write_semaphore(const bus_t *bus, unsigned semaphore, int value)
+{
+ union semun semval;
+ semval.val = value;
+ return semctl(bus->sem_id, (unsigned short)semaphore, SETVAL, semval);
+}
+
+
+/**
+ * Open the shared memory for the bus
+ *
+ * @param bus Bus information
+ * @param flags `BUS_RDONLY`, `BUS_WRONLY` or `BUS_RDWR`
+ * @return 0 on success, -1 on error
+ */
+static int
+open_shared_memory(bus_t *bus, int flags)
+{
+ int id;
+ void *address;
+ t(id = shmget(bus->key_shm, (size_t)BUS_MEMORY_SIZE, 0));
+ address = shmat(id, NULL, (flags & BUS_RDONLY) ? SHM_RDONLY : 0);
+ if ((address == (void *)-1) || !address)
+ goto fail;
+ bus->message = (char *)address;
+ return 0;
+fail:
+ return -1;
+}
+
+
+/**
+ * Close the shared memory for the bus
+ *
+ * @param bus Bus information
+ * @return 0 on success, -1 on error
+ */
+static int
+close_shared_memory(bus_t *bus)
+{
+ t(shmdt(bus->message));
+ bus->message = NULL;
+ return 0;
+fail:
+ return -1;
+}
+
+
+/**
+ * Get a random ASCII letter or digit
+ *
+ * @return A random ASCII letter or digit
+ */
+static char
+randomchar(void)
+{
+ int rint = rand();
+ double r = (double)rint;
+ r /= (double)RAND_MAX + 1;
+ r *= 10 + 26 + 26;
+ return "0123456789qwertyuiopasdfghjklzxcvbnmQWERTYUIOPASDFGHJKLZXCVBNM"[(int)r];
+}
+
+
+/**
+ * Basically, this is `mkdir -p -m $mode $pathname`
+ *
+ * @param pathname The pathname of the directory to create if missing
+ * @param mode The permission bits of any created directory
+ * @return 0 on sucess, -1 on error
+ */
+static int
+mkdirs(char *pathname, mode_t mode)
+{
+ size_t i, n = strlen(pathname);
+ char c;
+ for (i = 0; i < n; i++)
+ if (pathname[i] != '/')
+ break;
+ for (; i < n; i++) {
+ if (pathname[i] == '/') {
+ c = pathname[i];
+ if (access(pathname, F_OK))
+ if (mkdir(pathname, mode) < 0)
+ return -1;
+ pathname[i] = c;
+ break;
+ }
+ }
+ if (access(pathname, F_OK))
+ if (mkdir(pathname, mode) < 0)
+ return -1;
+ return 0;
+}
+
+
+/**
+ * Convert an absolute time to a relative time
+ *
+ * @param delta Output parameter for the relative time
+ * @param absolute The absolute time
+ * @param clockid The ID of the clock the time is measured in
+ * @return 0 on success, -1 on error
+ */
+static int
+absolute_time_to_delta_time(struct timespec *delta, const struct timespec *absolute, clockid_t clockid)
+{
+ if (clock_gettime(clockid, delta) < 0)
+ return -1;
+
+ delta->tv_sec = absolute->tv_sec - delta->tv_sec;
+ delta->tv_nsec = absolute->tv_nsec - delta->tv_nsec;
+
+ if (delta->tv_nsec < 0L) {
+ delta->tv_nsec += 1000000000L;
+ delta->tv_sec -= 1;
+ }
+ if (delta->tv_nsec >= 1000000000L) {
+ delta->tv_nsec -= 1000000000L;
+ delta->tv_sec += 1;
+ }
+
+ return 0;
+}
+
+
+
+/**
+ * Create a new bus
+ *
+ * @param file The pathname of the bus, `NULL` to create a random one
+ * @param flags `BUS_EXCL` (if `file` is not `NULL`) to fail if the file
+ * already exists, otherwise if the file exists, nothing
+ * will happen;
+ * `BUS_INTR` to fail if interrupted
+ * @param out_file Output parameter for the pathname of the bus
+ * @return 0 on success, -1 on error
+ */
+int
+bus_create(const char *restrict file, int flags, char **restrict out_file)
+{
+ int fd = -1, saved_errno;
+ bus_t bus;
+ char buf[1 + 2 * (3 * sizeof(ssize_t) + 2)];
+ size_t ptr, len;
+ ssize_t wrote;
+ char *genfile = NULL;
+ const char *env;
+
+ if (out_file)
+ *out_file = NULL;
+
+ bus.sem_id = -1;
+ bus.key_sem = -1;
+ bus.key_shm = -1;
+ bus.message = NULL;
+ bus.first_poll = 0;
+
+ srand((unsigned int)time(NULL) + (unsigned int)rand());
+
+ if (file) {
+ fd = open(file, O_WRONLY | O_CREAT | O_EXCL, DEFAULT_MODE);
+ if (fd == -1) {
+ if ((errno != EEXIST) || (flags & BUS_EXCL))
+ return -1;
+ goto done;
+ }
+ } else {
+ env = getenv("XDG_RUNTIME_DIR");
+ if (!env || !*env)
+ env = "/run";
+ genfile = malloc((strlen(env) + 6 + 7 + 30) * sizeof(char));
+ if (!genfile)
+ goto fail;
+ if (out_file)
+ *out_file = genfile;
+ sprintf(genfile, "%s/bus", env);
+ t(mkdirs(genfile, 0755));
+ sprintf(genfile, "%s/bus/random.", env);
+ len = strlen(genfile);
+ genfile[len + 30] = '\0';
+ retry:
+ for (ptr = 0; ptr < 30; ptr++)
+ genfile[len + ptr] = randomchar();
+ fd = open(genfile, O_WRONLY | O_CREAT | O_EXCL, DEFAULT_MODE);
+ if (fd == -1) {
+ if (errno == EEXIST)
+ goto retry;
+ return -1;
+ }
+ }
+
+ t(create_semaphores(&bus));
+ t(create_shared_memory(&bus));
+
+ sprintf(buf, "%zi\n%zi\n", (ssize_t)(bus.key_sem), (ssize_t)(bus.key_shm));
+ for (len = strlen(buf), ptr = 0; ptr < len;) {
+ wrote = write(fd, buf + ptr, len - ptr);
+ if (wrote < 0) {
+ if ((errno != EINTR) || (flags & BUS_INTR))
+ goto fail;
+ } else {
+ ptr += (size_t)wrote;
+ }
+ }
+ close(fd);
+
+done:
+ if (out_file && !*out_file) {
+ len = strlen(file) + 1;
+ *out_file = malloc(len * sizeof(char));
+ memcpy(*out_file, file, len * sizeof(char));
+ } else if (!out_file) {
+ free(genfile);
+ }
+ return 0;
+
+fail:
+ saved_errno = errno;
+ if (bus.key_sem)
+ remove_semaphores(&bus);
+ if (bus.key_shm)
+ remove_shared_memory(&bus);
+ if (fd == -1)
+ close(fd);
+ if (out_file)
+ *out_file = NULL;
+ free(genfile);
+ unlink(file);
+ errno = saved_errno;
+ return -1;
+}
+
+
+/**
+ * Remove a bus
+ *
+ * @param file The pathname of the bus
+ * @return 0 on success, -1 on error
+ */
+int
+bus_unlink(const char *file)
+{
+ int r = 0, saved_errno = 0;
+ bus_t bus;
+ t(bus_open(&bus, file, -1));
+
+ r |= remove_semaphores(&bus);
+ if (r && !saved_errno)
+ saved_errno = errno;
+
+ r |= remove_shared_memory(&bus);
+ if (r && !saved_errno)
+ saved_errno = errno;
+
+ r |= unlink(file);
+ if (r && !saved_errno)
+ saved_errno = errno;
+
+ errno = saved_errno;
+ return r;
+fail:
+ return -1;
+}
+
+
+/**
+ * Open an existing bus
+ *
+ * @param bus Bus information to fill
+ * @param file The filename of the bus
+ * @param flags `BUS_RDONLY`, `BUS_WRONLY` or `BUS_RDWR`
+ * any negative value is used internally
+ * for telling the function to not actually
+ * opening the bus, but just to parse the file
+ * @return 0 on success, -1 on error
+ */
+int
+bus_open(bus_t *restrict bus, const char *restrict file, int flags)
+{
+ int saved_errno;
+ char *line = NULL;
+ size_t len = 0;
+ FILE *f;
+
+ bus->sem_id = -1;
+ bus->key_sem = -1;
+ bus->key_shm = -1;
+ bus->message = NULL;
+
+ f = fopen(file, "r");
+
+ t(getline(&line, &len, f));
+ t(bus->key_sem = (key_t)atoll(line));
+ free(line), line = NULL, len = 0;
+
+ t(getline(&line, &len, f));
+ t(bus->key_shm = (key_t)atoll(line));
+ free(line), line = NULL;
+
+ fclose(f);
+
+ if (flags >= 0) {
+ t(open_semaphores(bus));
+ t(open_shared_memory(bus, flags));
+ }
+ return 0;
+fail:
+ saved_errno = errno;
+ free(line);
+ errno = saved_errno;
+ return -1;
+}
+
+
+/**
+ * Close a bus
+ *
+ * @param bus Bus information
+ * @return 0 on success, -1 on error
+ */
+int
+bus_close(bus_t *bus)
+{
+ bus->sem_id = -1;
+ if (bus->message)
+ t(close_shared_memory(bus));
+ bus->message = NULL;
+ return 0;
+
+fail:
+ return -1;
+}
+
+
+/**
+ * Broadcast a message on a bus
+ *
+ * @param bus Bus information
+ * @param message The message to write, may not be longer than
+ * `BUS_MEMORY_SIZE` including the NUL-termination
+ * @param flags `BUS_NOWAIT` if this function shall fail if
+ * another process is currently running this
+ * procedure
+ * @return 0 on success, -1 on error
+ */
+int
+bus_write(const bus_t *bus, const char *message, int flags)
+{
+ int saved_errno;
+#ifndef BUS_SEMAPHORES_ARE_SYNCHRONOUS
+ int state = 0;
+#endif
+ if (acquire_semaphore(bus, X, SEM_UNDO | F(BUS_NOWAIT, IPC_NOWAIT)) == -1)
+ return -1;
+ t(zero_semaphore(bus, W, 0));
+ write_shared_memory(bus, message);
+#ifndef BUS_SEMAPHORES_ARE_SYNCHRONOUS
+ t(release_semaphore(bus, N, SEM_UNDO)); state++;
+#endif
+ t(write_semaphore(bus, Q, 0));
+ t(zero_semaphore(bus, S, 0));
+#ifndef BUS_SEMAPHORES_ARE_SYNCHRONOUS
+ t(acquire_semaphore(bus, N, SEM_UNDO)); state--;
+#endif
+ t(release_semaphore(bus, X, SEM_UNDO));
+ return 0;
+
+fail:
+ saved_errno = errno;
+#ifndef BUS_SEMAPHORES_ARE_SYNCHRONOUS
+ if (state > 0)
+ acquire_semaphore(bus, N, SEM_UNDO);
+#endif
+ release_semaphore(bus, X, SEM_UNDO);
+ errno = saved_errno;
+ return -1;
+}
+
+
+/**
+ * Broadcast a message on a bus
+ *
+ * @param bus Bus information
+ * @param message The message to write, may not be longer than
+ * `BUS_MEMORY_SIZE` including the NUL-termination
+ * @param timeout The time the operation shall fail with errno set
+ * to `EAGAIN` if not completed
+ * @param clockid The ID of the clock the `timeout` is measured with,
+ * it most be a predictable clock
+ * @return 0 on success, -1 on error
+ */
+int bus_write_timed(const bus_t *bus, const char *message,
+ const struct timespec *timeout, clockid_t clockid)
+{
+ int saved_errno;
+#ifndef BUS_SEMAPHORES_ARE_SYNCHRONOUS
+ int state = 0;
+#endif
+ struct timespec delta;
+ if (!timeout)
+ return bus_write(bus, message, 0);
+
+ DELTA;
+ if (acquire_semaphore_timed(bus, X, SEM_UNDO, &delta) == -1)
+ return -1;
+ DELTA;
+ t(zero_semaphore_timed(bus, W, 0, &delta));
+ write_shared_memory(bus, message);
+#ifndef BUS_SEMAPHORES_ARE_SYNCHRONOUS
+ t(release_semaphore(bus, N, SEM_UNDO)); state++;
+#endif
+ t(write_semaphore(bus, Q, 0));
+ t(zero_semaphore(bus, S, 0));
+#ifndef BUS_SEMAPHORES_ARE_SYNCHRONOUS
+ t(acquire_semaphore(bus, N, SEM_UNDO)); state--;
+#endif
+ t(release_semaphore(bus, X, SEM_UNDO));
+ return 0;
+
+fail:
+ saved_errno = errno;
+#ifndef BUS_SEMAPHORES_ARE_SYNCHRONOUS
+ if (state > 0)
+ acquire_semaphore(bus, N, SEM_UNDO);
+#endif
+ release_semaphore(bus, X, SEM_UNDO);
+ errno = saved_errno;
+ return -1;
+}
+
+
+/**
+ * Listen (in a loop, forever) for new message on a bus
+ *
+ * @param bus Bus information
+ * @param callback Function to call when a message is received, the
+ * input parameters will be the read message and
+ * `user_data` from `bus_read`'s parameter with the
+ * same name. The message must have been parsed or
+ * copied when `callback` returns as it may be over
+ * overridden after that time. `callback` should
+ * return either of the the values:
+ * * 0: stop listening
+ * * 1: continue listening
+ * * -1: an error has occurred
+ * However, the function [`bus_read`] will invoke
+ * `callback` with `message` set to `NULL`one time
+ * directly after it has started listening on the
+ * bus. This is to the the program now it can safely
+ * continue with any action that requires that the
+ * programs is listening on the bus.
+ * @param user_data Parameter passed to `callback`
+ * @return 0 on success, -1 on error
+ */
+int
+bus_read(const bus_t *restrict bus, int (*callback)(const char *message, void *user_data), void *user_data)
+{
+ int r, state = 0, saved_errno;
+ if (release_semaphore(bus, S, SEM_UNDO) == -1)
+ return -1;
+ t(r = callback(NULL, user_data));
+ if (!r) goto done;
+ for (;;) {
+ t(release_semaphore(bus, Q, 0));
+ t(zero_semaphore(bus, Q, 0));
+ t(r = callback(bus->message, user_data));
+ if (!r) goto done;
+ t(release_semaphore(bus, W, SEM_UNDO)); state++;
+ t(acquire_semaphore(bus, S, SEM_UNDO)); state++;
+ t(zero_semaphore(bus, S, 0));
+#ifndef BUS_SEMAPHORES_ARE_SYNCHRONOUS_ME_HARDER
+ t(zero_semaphore(bus, N, 0));
+#endif
+ t(release_semaphore(bus, S, SEM_UNDO)); state--;
+ t(acquire_semaphore(bus, W, SEM_UNDO)); state--;
+ }
+
+fail:
+ saved_errno = errno;
+ if (state > 1)
+ release_semaphore(bus, S, SEM_UNDO);
+ if (state > 0)
+ acquire_semaphore(bus, W, SEM_UNDO);
+ acquire_semaphore(bus, S, SEM_UNDO);
+ errno = saved_errno;
+ return -1;
+
+done:
+ t(acquire_semaphore(bus, S, SEM_UNDO));
+ return 0;
+}
+
+
+/**
+ * Listen (in a loop, forever) for new message on a bus
+ *
+ * @param bus Bus information
+ * @param callback Function to call when a message is received, the
+ * input parameters will be the read message and
+ * `user_data` from `bus_read`'s parameter with the
+ * same name. The message must have been parsed or
+ * copied when `callback` returns as it may be over
+ * overridden after that time. `callback` should
+ * return either of the the values:
+ * * 0: stop listening
+ * * 1: continue listening
+ * * -1: an error has occurred
+ * However, the function [`bus_read`] will invoke
+ * `callback` with `message` set to `NULL`one time
+ * directly after it has started listening on the
+ * bus. This is to the the program now it can safely
+ * continue with any action that requires that the
+ * programs is listening on the bus.
+ * @param user_data Parameter passed to `callback`
+ * @param timeout The time the operation shall fail with errno set
+ * to `EAGAIN` if not completed, note that the callback
+ * function may or may not have been called
+ * @param clockid The ID of the clock the `timeout` is measured with,
+ * it most be a predictable clock
+ * @return 0 on success, -1 on error
+ */
+int bus_read_timed(const bus_t *restrict bus, int (*callback)(const char *message, void *user_data),
+ void *user_data, const struct timespec *timeout, clockid_t clockid)
+{
+ int r, state = 0, saved_errno;
+ struct timespec delta;
+ if (!timeout)
+ return bus_read(bus, callback, user_data);
+
+ DELTA;
+ if (release_semaphore_timed(bus, S, SEM_UNDO, &delta) == -1)
+ return -1;
+ t(r = callback(NULL, user_data));
+ if (!r) goto done;
+ for (;;) {
+ DELTA;
+ t(release_semaphore_timed(bus, Q, 0, &delta));
+ DELTA;
+ t(zero_semaphore_timed(bus, Q, 0, &delta));
+ t(r = callback(bus->message, user_data));
+ if (!r) goto done;
+ t(release_semaphore(bus, W, SEM_UNDO)); state++;
+ t(acquire_semaphore(bus, S, SEM_UNDO)); state++;
+ t(zero_semaphore(bus, S, 0));
+#ifndef BUS_SEMAPHORES_ARE_SYNCHRONOUS_ME_HARDER
+ t(zero_semaphore(bus, N, 0));
+#endif
+ t(release_semaphore(bus, S, SEM_UNDO)); state--;
+ t(acquire_semaphore(bus, W, SEM_UNDO)); state--;
+ }
+
+fail:
+ saved_errno = errno;
+ if (state > 1)
+ release_semaphore(bus, S, SEM_UNDO);
+ if (state > 0)
+ acquire_semaphore(bus, W, SEM_UNDO);
+ acquire_semaphore(bus, S, SEM_UNDO);
+ errno = saved_errno;
+ return -1;
+
+done:
+ t(acquire_semaphore(bus, S, SEM_UNDO));
+ return 0;
+}
+
+
+/**
+ * Announce that the thread is listening on the bus.
+ * This is required so the will does not miss any
+ * messages due to race conditions. Additionally,
+ * not calling this function will cause the bus the
+ * misbehave, is `bus_poll` is written to expect
+ * this function to have been called.
+ *
+ * @param bus Bus information
+ * @return 0 on success, -1 on error
+ */
+int
+bus_poll_start(bus_t *bus)
+{
+ bus->first_poll = 1;
+ t(release_semaphore(bus, S, SEM_UNDO));
+ t(release_semaphore(bus, Q, 0));
+ return 0;
+
+fail:
+ return -1;
+}
+
+
+/**
+ * Announce that the thread has stopped listening on the bus.
+ * This is required so that the thread does not cause others
+ * to wait indefinitely.
+ *
+ * @param bus Bus information
+ * @return 0 on success, -1 on error
+ */
+int
+bus_poll_stop(const bus_t *bus)
+{
+ return acquire_semaphore(bus, S, SEM_UNDO | IPC_NOWAIT);
+}
+
+
+/**
+ * Wait for a message to be broadcasted on the bus.
+ * The caller should make a copy of the received message,
+ * without freeing the original copy, and parse it in a
+ * separate thread. When the new thread has started be
+ * started, the caller of this function should then
+ * either call `bus_poll` again or `bus_poll_stop`.
+ *
+ * @param bus Bus information
+ * @param flags `BUS_NOWAIT` if the bus should fail and set `errno` to
+ * `EAGAIN` if there isn't already a message available on the bus
+ * @return The received message, `NULL` on error
+ */
+const char *
+bus_poll(bus_t *bus, int flags)
+{
+ int state = 0, saved_errno;
+ if (!bus->first_poll) {
+ t(release_semaphore(bus, W, SEM_UNDO)); state++;
+ t(acquire_semaphore(bus, S, SEM_UNDO)); state++;
+ t(zero_semaphore(bus, S, 0));
+#ifndef BUS_SEMAPHORES_ARE_SYNCHRONOUS_ME_HARDER
+ t(zero_semaphore(bus, N, 0));
+#endif
+ t(release_semaphore(bus, S, SEM_UNDO)); state--;
+ t(acquire_semaphore(bus, W, SEM_UNDO)); state--;
+ t(release_semaphore(bus, Q, 0));
+ } else {
+ bus->first_poll = 0;
+ }
+ state--;
+ t(zero_semaphore(bus, Q, F(BUS_NOWAIT, IPC_NOWAIT)));
+ return bus->message;
+
+fail:
+ saved_errno = errno;
+ if (state > 1)
+ release_semaphore(bus, S, SEM_UNDO);
+ if (state > 0)
+ acquire_semaphore(bus, W, SEM_UNDO);
+ if (state < 0)
+ bus->first_poll = 1;
+ errno = saved_errno;
+ return NULL;
+}
+
+
+/**
+ * Wait for a message to be broadcasted on the bus.
+ * The caller should make a copy of the received message,
+ * without freeing the original copy, and parse it in a
+ * separate thread. When the new thread has started be
+ * started, the caller of this function should then
+ * either call `bus_poll_timed` again or `bus_poll_stop`.
+ *
+ * @param bus Bus information
+ * @param timeout The time the operation shall fail with errno set
+ * to `EAGAIN` if not completed
+ * @param clockid The ID of the clock the `timeout` is measured with,
+ * it most be a predictable clock
+ * @return The received message, `NULL` on error
+ */
+const char *bus_poll_timed(bus_t *bus, const struct timespec *timeout, clockid_t clockid)
+{
+ int state = 0, saved_errno;
+ struct timespec delta;
+ if (!timeout)
+ return bus_poll(bus, 0);
+
+ if (!bus->first_poll) {
+ t(release_semaphore(bus, W, SEM_UNDO)); state++;
+ t(acquire_semaphore(bus, S, SEM_UNDO)); state++;
+ t(zero_semaphore(bus, S, 0));
+#ifndef BUS_SEMAPHORES_ARE_SYNCHRONOUS_ME_HARDER
+ t(zero_semaphore(bus, N, 0));
+#endif
+ t(release_semaphore(bus, S, SEM_UNDO)); state--;
+ t(acquire_semaphore(bus, W, SEM_UNDO)); state--;
+ t(release_semaphore(bus, Q, 0));
+ } else {
+ bus->first_poll = 0;
+ }
+ state--;
+ DELTA;
+ t(zero_semaphore_timed(bus, Q, 0, &delta));
+ return bus->message;
+
+fail:
+ saved_errno = errno;
+ if (state > 1)
+ release_semaphore(bus, S, SEM_UNDO);
+ if (state > 0)
+ acquire_semaphore(bus, W, SEM_UNDO);
+ if (state < 0)
+ bus->first_poll = 1;
+ errno = saved_errno;
+ return NULL;
+}
+
+
+/**
+ * Change the ownership of a bus
+ *
+ * `stat(2)` can be used of the bus's associated file to get the bus's ownership
+ *
+ * @param file The pathname of the bus
+ * @param owner The user ID of the bus's new owner
+ * @param group The group ID of the bus's new group
+ * @return 0 on success, -1 on error
+ */
+int
+bus_chown(const char *file, uid_t owner, gid_t group)
+{
+ bus_t bus;
+ struct semid_ds sem_stat;
+ struct shmid_ds shm_stat;
+ int shm_id;
+
+ t(bus_open(&bus, file, -1));
+ t(chown(file, owner, group));
+
+ /* chown sem */
+ t(open_semaphores(&bus));
+ t(semctl(bus.sem_id, 0, IPC_STAT, &sem_stat));
+ sem_stat.sem_perm.uid = owner;
+ sem_stat.sem_perm.gid = group;
+ t(semctl(bus.sem_id, 0, IPC_SET, &sem_stat));
+
+ /* chown shm */
+ t(shm_id = shmget(bus.key_shm, (size_t)BUS_MEMORY_SIZE, 0));
+ t(shmctl(shm_id, IPC_STAT, &shm_stat));
+ shm_stat.shm_perm.uid = owner;
+ shm_stat.shm_perm.gid = group;
+ t(shmctl(shm_id, IPC_SET, &shm_stat));
+
+ return 0;
+fail:
+ return -1;
+}
+
+
+/**
+ * Change the permissions for a bus
+ *
+ * `stat(2)` can be used of the bus's associated file to get the bus's permissions
+ *
+ * @param file The pathname of the bus
+ * @param mode The permissions of the bus, any permission for a user implies
+ * full permissions for that user, except only the owner may
+ * edit the bus's associated file
+ * @return 0 on success, -1 on error
+ */
+int
+bus_chmod(const char *file, mode_t mode)
+{
+ bus_t bus;
+ mode_t fmode;
+ struct semid_ds sem_stat;
+ struct shmid_ds shm_stat;
+ int shm_id;
+
+ mode = (mode & S_IRWXU) ? (mode | S_IRWXU) : (mode & (mode_t)~S_IRWXU);
+ mode = (mode & S_IRWXG) ? (mode | S_IRWXG) : (mode & (mode_t)~S_IRWXG);
+ mode = (mode & S_IRWXO) ? (mode | S_IRWXO) : (mode & (mode_t)~S_IRWXO);
+ mode &= (S_IWUSR | S_IWGRP | S_IWOTH | S_IRUSR | S_IRGRP | S_IROTH);
+ fmode = mode & (mode_t)~(S_IWGRP | S_IWOTH);
+
+ t(bus_open(&bus, file, -1));
+ t(chmod(file, fmode));
+
+ /* chmod sem */
+ t(open_semaphores(&bus));
+ t(semctl(bus.sem_id, 0, IPC_STAT, &sem_stat));
+ sem_stat.sem_perm.mode = (unsigned short)mode;
+ t(semctl(bus.sem_id, 0, IPC_SET, &sem_stat));
+
+ /* chmod shm */
+ t(shm_id = shmget(bus.key_shm, (size_t)BUS_MEMORY_SIZE, 0));
+ t(shmctl(shm_id, IPC_STAT, &shm_stat));
+ shm_stat.shm_perm.mode = (unsigned short)mode;
+ t(shmctl(shm_id, IPC_SET, &shm_stat));
+
+ return 0;
+fail:
+ return -1;
+}