diff options
Diffstat (limited to '')
-rw-r--r-- | src/bus.c | 459 | ||||
-rw-r--r-- | src/bus.h | 103 | ||||
-rw-r--r-- | src/cmdline.c | 322 |
3 files changed, 629 insertions, 255 deletions
diff --git a/src/bus.c b/src/bus.c new file mode 100644 index 0000000..6dd6f1a --- /dev/null +++ b/src/bus.c @@ -0,0 +1,459 @@ +/** + * MIT/X Consortium License + * + * Copyright © 2015 Mattias Andrée <maandree@member.fsf.org> + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ +#define _XOPEN_SOURCE 700 +#include "bus.h" + +#include <stdlib.h> +#include <stdio.h> +#include <time.h> +#include <errno.h> +#include <string.h> +#include <unistd.h> + +#include <sys/ipc.h> +#include <sys/sem.h> +#include <sys/shm.h> + + + +/** + * Semaphore used to signal `bus_write` that `bus_read` is ready + */ +#define S 0 + +/** + * Semaphore for making `bus_write` wait while `bus_read` is reseting `S` + */ +#define W 1 + +/** + * Binary semaphore for making `bus_write` exclusively locked + */ +#define X 2 + +/** + * Semaphore used to cue `bus_read` that it may read the shared memory + */ +#define Q 3 + +/** + * The number of semaphores in the semaphore array + */ +#define BUS_SEMAPHORES 4 + + + +/** + * Decrease the value of a semaphore by 1 + * + * @param bus:const bus_t * The bus + * @param semaphore:unsigned short The index of the semaphore, `S`, `W`, `X` or `Q` + * @param flags:short `SEM_UNDO` if the action should be undone when the program exits + * @return :int 0 on success, -1 on error + */ +#define acquire_semaphore(bus, semaphore, flags) \ + semaphore_op(bus, semaphore, -1, flags) + +/** + * Increase the value of a semaphore by 1 + * + * @param bus:const bus_t * The bus + * @param semaphore:unsigned short The index of the semaphore, `S`, `W`, `X` or `Q` + * @param flags:short `SEM_UNDO` if the action should be undone when the program exits + * @return :int 0 on success, -1 on error + */ +#define release_semaphore(bus, semaphore, flags) \ + semaphore_op(bus, semaphore, +1, flags) + +/** + * Wait for the value of a semphore to become 0 + * + * @param bus:const bus_t * The bus + * @param semaphore:unsigned short The index of the semaphore, `S`, `W`, `X` or `Q` + * @return :int 0 on success, -1 on error + */ +#define zero_semaphore(bus, semaphore) \ + semaphore_op(bus, semaphore, 0, 0) + +/** + * Open the semaphore array + * + * @param bus:const bus_t * The bus + * @return :int 0 on success, -1 on error + */ +#define open_semaphores(bus) \ + (((bus)->sem_id = semget((bus)->key_sem, BUS_SEMAPHORES, 0600)) == -1 ? -1 : 0) + +/** + * Write a message to the shared memory + * + * @param bus:const bus_t * The bus + * @param msg:const char * The message + * @return :int 0 on success, -1 on error + */ +#define write_shared_memory(bus, msg) \ + (memcpy((bus)->message, msg, (strlen(msg) + 1) * sizeof(char))) + + + +/** + * Statement wrapper that goes to `fail` on failure + */ +#define t(inst) \ + if ((inst) == -1) goto fail + + + +#ifdef _SEM_SEMUN_UNDEFINED +union semun { + int val; + struct semid_ds *buf; + unsigned short *array; +}; +#endif + + + +/** + * Create a semaphore array for the bus + * + * @param bus Bus information to fill with the key of the created semaphore array + * @return 0 on success, -1 on error + */ +static int +create_semaphores(bus_t *bus) +{ + int id = -1, rint, saved_errno; + double r; + union semun values; + + values.array = NULL; + + /* Create semaphore array. */ + for (;;) { + rint = rand(); + r = (double)rint; + r /= (double)RAND_MAX + 1; + r *= (1 << (8 * sizeof(key_t) - 2)) - 1; + bus->key_sem = (key_t)r + 1; + if (bus->key_sem == IPC_PRIVATE) + continue; + id = semget(bus->key_sem, BUS_SEMAPHORES, IPC_CREAT | IPC_EXCL | 0600); + if (id != -1) + break; + if ((errno != EEXIST) && (errno != EINTR)) + goto fail; + } + + /* Initialise the array. */ + values.array = calloc(BUS_SEMAPHORES, sizeof(unsigned short)); + values.array[X] = 1; + if (!values.array) + goto fail; + if (semctl(id, 0, SETALL, values.array) == -1) + goto fail; + free(values.array); + values.array = NULL; + + return 0; + +fail: + saved_errno = errno; + if ((id != -1) && (semctl(id, 0, IPC_RMID) == -1)) + perror(argv0); + free(values.array); + errno = saved_errno; + return -1; +} + + +/** + * Create a shared memory for the bus + * + * @param bus Bus information to fill with the key of the created shared memory + * @return 0 on success, -1 on error + */ +static int +create_shared_memory(bus_t *bus) +{ + int id = -1, rint, saved_errno; + double r; + struct shmid_ds _info; + + /* Create shared memory. */ + for (;;) { + rint = rand(); + r = (double)rint; + r /= (double)RAND_MAX + 1; + r *= (1 << (8 * sizeof(key_t) - 2)) - 1; + bus->key_shm = (key_t)r + 1; + if (bus->key_shm == IPC_PRIVATE) + continue; + id = shmget(bus->key_shm, BUS_MEMORY_SIZE, IPC_CREAT | IPC_EXCL | 0600); + if (id != -1) + break; + if ((errno != EEXIST) && (errno != EINTR)) + goto fail; + } + + return 0; + +fail: + saved_errno = errno; + if ((id != -1) && (shmctl(id, IPC_RMID, &_info) == -1)) + perror(argv0); + errno = saved_errno; + return -1; +} + + +/** + * Remove the semaphore array for the bus + * + * @param bus Bus information + * @return 0 on success, -1 on error + */ +static int +remove_semaphores(const bus_t *bus) +{ + int id = semget(bus->key_sem, BUS_SEMAPHORES, 0600); + return ((id == -1) || (semctl(id, 0, IPC_RMID) == -1)) ? -1 : 0; +} + + +/** + * Remove the shared memory for the bus + * + * @param bus Bus information + * @return 0 on success, -1 on error + */ +static int +remove_shared_memory(const bus_t *bus) +{ + struct shmid_ds _info; + int id = shmget(bus->key_shm, BUS_MEMORY_SIZE, 0600); + return ((id == -1) || (shmctl(sem_id, IPC_RMID, &_info) == -1)) ? -1 : 0; +} + + +/** + * Increase or decrease the value of a semaphore, or wait the it to become 0 + * + * @param bus Bus information + * @param semaphore The index of the semaphore, `S`, `W`, `X` or `Q` + * @param delta The adjustment to make to the semaphore's value, 0 to wait for it to become 0 + * @param flags `SEM_UNDO` if the action should be undone when the program exits + * @return 0 on success, -1 on error + */ +static int +semaphore_op(const bus_t *bus, unsigned short semaphore, short delta, short flags) +{ + struct sembuf op; + op.sem_op = delta; + op.sem_num = semaphore; + op.sem_flg = flags; + return semop(bus->sem_id, &op, 1); +} + + +/** + * Set the value of a semaphore + * + * @param bus Bus information + * @param semaphore The index of the semaphore, `S`, `W`, `X` or `Q` + * @param value The new value of the semaphore + * @return 0 on success, -1 on error + */ +static int +write_semaphore(const bus_t *bus, unsigned short semaphore, int value) +{ + union semun semval; + semval.val = value; + return semctl(bus->sem_id, semaphore, SETVAL, semval); +} + + +/** + * Open the shared memory for the bus + * + * @param bus Bus information + * @param flags `BUS_RDONLY`, `BUS_WRONLY` or `BUS_RDWR` + * @return 0 on success, -1 on error + */ +static int +open_shared_memory(const bus_t *bus, int flags) +{ + int id; + void *address; + t(id = shmget(bus->key_shm, BUS_MEMORY_SIZE, 0600)); + address = shmat(id, NULL, (flags & BUS_RDONLY) ? SHM_RDONLY : 0); + if ((address == (void *)-1) || !address) + goto fail; + this->message = (char *)address; + return 0; +fail: + return -1; +} + + +/** + * Close the shared memory for the bus + * + * @param bus Bus information + * @return 0 on success, -1 on error + */ +static int +close_shared_memory(const bus_t *bus) +{ + t(shmdt(this->message)); + this->message = NULL; + return 0; +fail: + return -1; +} + + + +const char * +bus_create(const char *file, int flags) +{ + int saved_errno; + bus_t bus; + bus.sem_id = -1; + bus.key_sem = -1; + bus.key_shm = -1; + bus.message = NULL; + + srand((unsigned int)time(NULL) + (unsigned int)rand()); + + t(create_semaphores(&bus)); + t(create_shared_memory(&bus)); + return NULL; /* TODO */ + +fail: + saved_errno = errno; + if (bus.key_sem) + remove_semaphores(&bus); + if (bus.key_shm) + remove_shared_memory(&bus); + errno = saved_errno; + return NULL; +} + + +int +bus_unlink(const char *file) +{ + int r = 0, saved_errno = 0; + bus_t bus; + t(bus_open(&bus, file, -1)); + + r |= remove_semaphores(&bus); + if (r && !saved_errno) + saved_errno = errno; + + r |= remove_shared_memory(&bus); + if (r && !saved_errno) + saved_errno = errno; + + r |= unlink(file); + if (r && !saved_errno) + saved_errno = errno; + + errno = saved_errno; + return r; +fail: + return -1; +} + + +int +bus_open(bus_t *bus, const char *file, int flags) +{ + bus->sem_id = -1; + bus->key_sem = -1; + bus->key_shm = -1; + bus->message = NULL; + + /* TODO */ + + if (flags >= 0) { + t(open_semaphores(bus)); + t(open_shared_memory(bus, flags)); + } + return 0; +fail: + return -1; +} + + +int +bus_close(bus_t *bus) +{ + if (bus->address) + t(close_shared_memory(bus)); + return 0; +fail: + return -1; +} + + +int +bus_write(const bus_t *bus, const char *message) +{ + t(acquire_semaphore(bus, X, SEM_UNDO)); + t(zero_semaphore(bus, W)); + t(write_shared_memory(bus, message)); + t(write_semaphore(bus, Q, 0)); + t(zero_semaphore(bus, S)); + t(release_semaphore(bus, X, SEM_UNDO)); + return 0; +fail: + return -1; +} + + +int +bus_read(const bus_t *bus, int (*callback)(const char *message, void *user_data), void *user_data) +{ + int r; + t(release_semaphore(bus, S, SEM_UNDO)); + for (;;) { + t(release_semaphore(bus, Q, 0)); + t(zero_semaphore(bus, Q)); + t(r = callback(bus->message, user_data)); + if (!r) { + t(acquire_semaphore(bus, S, SEM_UNDO)); + return 0; + } + t(release_semaphore(bus, W, SEM_UNDO)); + t(acquire_semaphore(bus, S, SEM_UNDO)); + t(zero_semaphore(bus, S)); + t(release_semaphore(bus, S, SEM_UNDO)); + t(acquire_semaphore(bus, W, SEM_UNDO)); + } +fail: + return -1; +} + diff --git a/src/bus.h b/src/bus.h new file mode 100644 index 0000000..faeb2c0 --- /dev/null +++ b/src/bus.h @@ -0,0 +1,103 @@ +/** + * MIT/X Consortium License + * + * Copyright © 2015 Mattias Andrée <maandree@member.fsf.org> + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ +#ifndef BUS_H +#define BUS_H + + +#include <sys/types.h> + + + +/** + * Open the bus for reading only + */ +#define BUS_RDONLY 1 + +/** + * Open the bus for writing only + */ +#define BUS_WRONLY 0 + +/** + * Open the bus for both reading and writing only + */ +#define BUS_RDWR 0 + +/** + * Fail to create bus if its file already exists + */ +#define BUS_EXCL 2 + + + +/** + * The number of bytes in storeable in the shared memory, + * note that this includes the NUL-termination. + * This means that message can be at most one byte smaller. + */ +#define BUS_MEMORY_SIZE 2048 + + + +/** + * Bus information + */ +typedef struct bus +{ + /** + * The key for the semaphore array + */ + key_t key_sem; + + /** + * The key for the + */ + key_t key_shm; + + /** + * The ID of the semaphore array + */ + int sem_id; + + /** + * The address of the shared memory + */ + char *message; +} bus_t; + + + +const char *bus_create(const char *file, int flags); +int bus_unlink(const char *file); + +int bus_open(bus_t *bus, const char *file, int flags); +int bus_close(bus_t *bus); + +int bus_write(const bus_t *bus, const char *message); +int bus_read(const bus_t *bus, int (*callback)(const char *message, void *user_data), void *user_data); + + + +#endif + diff --git a/src/cmdline.c b/src/cmdline.c index de81bc1..d3b2a07 100644 --- a/src/cmdline.c +++ b/src/cmdline.c @@ -1,51 +1,37 @@ -#define _XOPEN_SOURCE 700 -#include <stdlib.h> -#include <stdio.h> -#include <time.h> -#include <errno.h> -#include <string.h> -#include <unistd.h> - -#include <sys/ipc.h> -#include <sys/sem.h> -#include <sys/shm.h> - - - -#ifdef _SEM_SEMUN_UNDEFINED -union semun { - int val; - struct semid_ds *buf; - unsigned short *array; -}; -#endif - - -#define S 0 -#define W 1 -#define X 2 -#define Q 3 - -#define SEMAPHORES 4 -#define MEMORY_SIZE (2 * 1024) - - -#define open_semaphore() ((sem_id = semget(key_sem, SEMAPHORES, 0600)) == -1 ? -1 : 0) -#define acquire_semaphore(semaphore, delta, undo) semaphore_op(semaphore, -delta, undo) -#define release_semaphore(semaphore, delta, undo) semaphore_op(semaphore, +delta, undo) -#define zero_semaphore(semaphore) semaphore_op(semaphore, 0, 0) - - +/** + * MIT/X Consortium License + * + * Copyright © 2015 Mattias Andrée <maandree@member.fsf.org> + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ +#include "bus.h" + + +/** + * Statement wrapper that goes to `fail` on failure + */ #define t(inst) if ((inst) == -1) goto fail - char *argv0; - -static int sem_id = -1; -static key_t key_sem = -1; -static key_t key_shm = -1; - +static const char *command; static int @@ -76,180 +62,24 @@ fail: static int -create_semaphores(void) -{ - int id = -1, rint, saved_errno; - double r; - union semun values; - - values.array = NULL; - - /* Create semaphore array. */ - for (;;) { - rint = rand(); - r = (double)rint; - r /= (double)RAND_MAX + 1; - r *= (1 << (8 * sizeof(key_t) - 2)) - 1; - key_sem = (key_t)r + 1; - if (key_sem == IPC_PRIVATE) - continue; - id = semget(key_sem, SEMAPHORES, IPC_CREAT | IPC_EXCL | 0600); - if (id != -1) - break; - if ((errno != EEXIST) && (errno != EINTR)) - goto fail; - } - - /* Initialise the array. */ - values.array = calloc(SEMAPHORES, sizeof(unsigned short)); - values.array[X] = 1; - if (!values.array) - goto fail; - if (semctl(id, 0, SETALL, values.array) == -1) - goto fail; - free(values.array); - values.array = NULL; - - printf("%zi\n", (ssize_t)key_sem); - return 0; - -fail: - saved_errno = errno; - if ((id != -1) && (semctl(id, 0, IPC_RMID) == -1)) - perror(argv0); - free(values.array); - errno = saved_errno; - return -1; -} - - -static int -create_shared_memory(void) -{ - int id = -1, rint, saved_errno; - double r; - struct shmid_ds _info; - - /* Create shared memory. */ - for (;;) { - rint = rand(); - r = (double)rint; - r /= (double)RAND_MAX + 1; - r *= (1 << (8 * sizeof(key_t) - 2)) - 1; - key_shm = (key_t)r + 1; - if (key_shm == IPC_PRIVATE) - continue; - id = shmget(key_shm, MEMORY_SIZE, IPC_CREAT | IPC_EXCL | 0600); - if (id != -1) - break; - if ((errno != EEXIST) && (errno != EINTR)) - goto fail; - } - - printf("%zi\n", (ssize_t)key_shm); - return 0; - -fail: - saved_errno = errno; - if ((id != -1) && (shmctl(id, IPC_RMID, &_info) == -1)) - perror(argv0); - errno = saved_errno; - return -1; -} - - -static int -remove_semaphores(void) -{ - int id = semget(key_sem, SEMAPHORES, 0600); - return ((id == -1) || (semctl(id, 0, IPC_RMID) == -1)) ? -1 : 0; -} - - -static int -remove_shared_memory(void) -{ - struct shmid_ds _info; - int id = shmget(key_shm, MEMORY_SIZE, 0600); - return ((id == -1) || (shmctl(sem_id, IPC_RMID, &_info) == -1)) ? -1 : 0; -} - - -static int -semaphore_op(unsigned short semaphore, short delta, int undo) -{ - struct sembuf op; - op.sem_op = delta; - op.sem_num = semaphore; - op.sem_flg = undo ? SEM_UNDO : 0; - return semop(sem_id, &op, 1); -} - - -static int -write_semaphore(unsigned short semaphore, int value) -{ - union semun semval; - semval.val = value; - return semctl(sem_id, semaphore, SETVAL, semval); -} - - - -static int -read_shared_memory(char *message) -{ - int id, saved_errno; - void *address = NULL; - - t(id = shmget(key_shm, MEMORY_SIZE, 0600)); - address = shmat(id, NULL, SHM_RDONLY); - if ((address == (void *)-1) || !address) - goto fail; - strncpy(message, address, MEMORY_SIZE); - t(shmdt(address)); - return 0; - -fail: - saved_errno = errno; - if (address && (shmdt(address) == -1)) - perror(argv0); - errno = saved_errno; - return -1; -} - - -static int -write_shared_memory(const char *message) +spawn_continue(const char *message) { - int id, saved_errno; - void *address = NULL; - - t(id = shmget(key_shm, MEMORY_SIZE, 0600)); - address = shmat(id, NULL, 0); - if ((address == (void *)-1) || !address) - goto fail; - memcpy(address, message, (strlen(message) + 1) * sizeof(char)); - t(shmdt(address)); - return 0; - -fail: - saved_errno = errno; - if (address && (shmdt(address) == -1)) - perror(argv0); - errno = saved_errno; - return -1; + pid_t pid = fork(); + if (pid) + return pid == -1 ? -1 : 1; + setenv("arg", message, 1); + execlp("sh", "sh", "-c", command, NULL); + perror(argv0); + exit(1); } static int -spawn(const char *command, const char *message) +spawn_break(const char *message) { pid_t pid = fork(); - if (pid) return pid == -1 ? -1 : 0; - setenv("arg", message, 1); execlp("sh", "sh", "-c", command, NULL); perror(argv0); @@ -260,55 +90,37 @@ spawn(const char *command, const char *message) int main(int argc, char *argv[]) { - char read_message[MEMORY_SIZE]; + bus_t bus; argv0 = *argv; - if ((argc == 2) && !strcmp(argv[1], "create")) { - srand((unsigned int)time(NULL)); - t(create_semaphores()); - t(create_shared_memory()); - - } else if ((argc == 2) && !strcmp(argv[1], "remove")) { - t(get_keys()); - t(remove_semaphores()); - t(remove_shared_memory()); - - } else if ((argc == 3) && !strcmp(argv[1], "listen")) { - t(get_keys()); - t(open_semaphore()); - t(release_semaphore(S, 1, 1)); - for (;;) { - t(release_semaphore(Q, 1, 0)); - t(zero_semaphore(Q)); - t(read_shared_memory(read_message)); - t(spawn(argv[2], read_message)); - t(release_semaphore(W, 1, 1)); - t(acquire_semaphore(S, 1, 1)); - t(zero_semaphore(S)); - t(release_semaphore(S, 1, 1)); - t(acquire_semaphore(W, 1, 1)); - } - - } else if ((argc == 3) && !strcmp(argv[1], "wait")) { - t(get_keys()); - t(open_semaphore()); - t(release_semaphore(S, 1, 1)); - t(release_semaphore(Q, 1, 0)); - t(zero_semaphore(Q)); - t(read_shared_memory(read_message)); - t(spawn(argv[2], read_message)); - t(acquire_semaphore(S, 1, 1)); - - } else if ((argc == 3) && !strcmp(argv[1], "broadcast")) { - t(get_keys()); - t(open_semaphore()); - t(acquire_semaphore(X, 1, 1)); - t(zero_semaphore(W)); - t(write_shared_memory(argv[2])); - t(write_semaphore(Q, 0)); - t(zero_semaphore(S)); - t(release_semaphore(X, 1, 1)); + if ((argc == 3) && !strcmp(argv[1], "create")) { + t(bus_create(argv[2], 0) ? 0 : -1); + + } else if ((argc == 2) && !strcmp(argv[1], "create")) { + char *file = bus_create(NULL, 0); + t(file ? 0 : -1); + printf("%s\n", file); + + } else if ((argc == 3) && !strcmp(argv[1], "remove")) { + t(bus_unlink(argv[2])); + + } else if ((argc == 4) && !strcmp(argv[1], "listen")) { + command = argv[3]; + t(bus_open(&bus, argv[2], BUS_RDONLY)); + t(bus_read(&bus, spawn_continue, NULL)); + t(bus_close(&bus)); + + } else if ((argc == 4) && !strcmp(argv[1], "wait")) { + command = argv[3]; + t(bus_open(&bus, argv[2], BUS_RDONLY)); + t(bus_read(&bus, spawn_break, NULL)); + t(bus_close(&bus)); + + } else if ((argc == 4) && !strcmp(argv[1], "broadcast")) { + t(bus_open(&bus, argv[2], BUS_WRONLY)); + t(bus_write(&bus, argv[3])) + t(bus_close(&bus)); } else return 2; |