diff options
| author | Mattias Andrée <maandree@operamail.com> | 2015-04-16 04:50:14 +0200 | 
|---|---|---|
| committer | Mattias Andrée <maandree@operamail.com> | 2015-04-16 04:50:14 +0200 | 
| commit | 263f78a945d407319140c26bcbff3cec6fbd364f (patch) | |
| tree | e30c4d7b3d524d19e294cf64dbd68f8041ddadf6 /src/bus.c | |
| parent | m readme (diff) | |
| download | bus-263f78a945d407319140c26bcbff3cec6fbd364f.tar.gz bus-263f78a945d407319140c26bcbff3cec6fbd364f.tar.bz2 bus-263f78a945d407319140c26bcbff3cec6fbd364f.tar.xz | |
work on librarisation
Signed-off-by: Mattias Andrée <maandree@operamail.com>
Diffstat (limited to '')
| -rw-r--r-- | src/bus.c | 459 | 
1 files changed, 459 insertions, 0 deletions
| diff --git a/src/bus.c b/src/bus.c new file mode 100644 index 0000000..6dd6f1a --- /dev/null +++ b/src/bus.c @@ -0,0 +1,459 @@ +/** + * MIT/X Consortium License + *  + * Copyright © 2015  Mattias Andrée <maandree@member.fsf.org> + *  + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + *  + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + *  + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ +#define _XOPEN_SOURCE 700 +#include "bus.h" + +#include <stdlib.h> +#include <stdio.h> +#include <time.h> +#include <errno.h> +#include <string.h> +#include <unistd.h> + +#include <sys/ipc.h> +#include <sys/sem.h> +#include <sys/shm.h> + + + +/** + * Semaphore used to signal `bus_write` that `bus_read` is ready + */ +#define S 0 + +/** + * Semaphore for making `bus_write` wait while `bus_read` is reseting `S` + */ +#define W 1 + +/** + * Binary semaphore for making `bus_write` exclusively locked + */ +#define X 2 + +/** + * Semaphore used to cue `bus_read` that it may read the shared memory + */ +#define Q 3 + +/** + * The number of semaphores in the semaphore array + */ +#define BUS_SEMAPHORES 4 + + + +/** + * Decrease the value of a semaphore by 1 + *  + * @param   bus:const bus_t *         The bus + * @param   semaphore:unsigned short  The index of the semaphore, `S`, `W`, `X` or `Q` + * @param   flags:short               `SEM_UNDO` if the action should be undone when the program exits + * @return  :int                      0 on success, -1 on error + */ +#define acquire_semaphore(bus, semaphore, flags) \ +	semaphore_op(bus, semaphore, -1, flags) + +/** + * Increase the value of a semaphore by 1 + *  + * @param   bus:const bus_t *         The bus + * @param   semaphore:unsigned short  The index of the semaphore, `S`, `W`, `X` or `Q` + * @param   flags:short               `SEM_UNDO` if the action should be undone when the program exits + * @return  :int                      0 on success, -1 on error + */ +#define release_semaphore(bus, semaphore, flags) \ +	semaphore_op(bus, semaphore, +1, flags) + +/** + * Wait for the value of a semphore to become 0 + *  + * @param   bus:const bus_t *         The bus + * @param   semaphore:unsigned short  The index of the semaphore, `S`, `W`, `X` or `Q` + * @return  :int                      0 on success, -1 on error + */ +#define zero_semaphore(bus, semaphore) \ +	semaphore_op(bus, semaphore, 0, 0) + +/** + * Open the semaphore array + *  + * @param   bus:const bus_t *  The bus + * @return  :int               0 on success, -1 on error + */ +#define open_semaphores(bus) \ +	(((bus)->sem_id = semget((bus)->key_sem, BUS_SEMAPHORES, 0600)) == -1 ? -1 : 0) + +/** + * Write a message to the shared memory + *  + * @param   bus:const bus_t *  The bus + * @param   msg:const char *   The message + * @return  :int               0 on success, -1 on error + */ +#define write_shared_memory(bus, msg) \ +	(memcpy((bus)->message, msg, (strlen(msg) + 1) * sizeof(char))) + + + +/** + * Statement wrapper that goes to `fail` on failure + */ +#define t(inst) \ +	if ((inst) == -1)  goto fail + + + +#ifdef _SEM_SEMUN_UNDEFINED +union semun { +	int val; +	struct semid_ds *buf; +	unsigned short *array; +}; +#endif + + + +/** + * Create a semaphore array for the bus + *  + * @param   bus  Bus information to fill with the key of the created semaphore array + * @return       0 on success, -1 on error + */ +static int +create_semaphores(bus_t *bus) +{ +	int id = -1, rint, saved_errno; +	double r; +	union semun values; + +	values.array = NULL; + +	/* Create semaphore array. */ +	for (;;) { +		rint = rand(); +		r = (double)rint; +		r /= (double)RAND_MAX + 1; +		r *= (1 << (8 * sizeof(key_t) - 2)) - 1; +		bus->key_sem = (key_t)r + 1; +		if (bus->key_sem == IPC_PRIVATE) +			continue; +		id = semget(bus->key_sem, BUS_SEMAPHORES, IPC_CREAT | IPC_EXCL | 0600); +		if (id != -1) +			break; +		if ((errno != EEXIST) && (errno != EINTR)) +			goto fail; +	} + +	/* Initialise the array. */ +	values.array = calloc(BUS_SEMAPHORES, sizeof(unsigned short)); +	values.array[X] = 1; +	if (!values.array) +		goto fail; +	if (semctl(id, 0, SETALL, values.array) == -1) +		goto fail; +	free(values.array); +	values.array = NULL; + +	return 0; + +fail: +	saved_errno = errno; +	if ((id != -1) && (semctl(id, 0, IPC_RMID) == -1)) +		perror(argv0); +	free(values.array); +	errno = saved_errno; +	return -1; +} + + +/** + * Create a shared memory for the bus + *  + * @param   bus  Bus information to fill with the key of the created shared memory + * @return       0 on success, -1 on error + */ +static int +create_shared_memory(bus_t *bus) +{ +	int id = -1, rint, saved_errno; +	double r; +	struct shmid_ds _info; + +	/* Create shared memory. */ +	for (;;) { +		rint = rand(); +		r = (double)rint; +		r /= (double)RAND_MAX + 1; +		r *= (1 << (8 * sizeof(key_t) - 2)) - 1; +		bus->key_shm = (key_t)r + 1; +		if (bus->key_shm == IPC_PRIVATE) +			continue; +		id = shmget(bus->key_shm, BUS_MEMORY_SIZE, IPC_CREAT | IPC_EXCL | 0600); +		if (id != -1) +			break; +		if ((errno != EEXIST) && (errno != EINTR)) +			goto fail; +	} + +	return 0; + +fail: +	saved_errno = errno; +	if ((id != -1) && (shmctl(id, IPC_RMID, &_info) == -1)) +		perror(argv0); +	errno = saved_errno; +	return -1; +} + + +/** + * Remove the semaphore array for the bus + *  + * @param   bus  Bus information + * @return       0 on success, -1 on error + */ +static int +remove_semaphores(const bus_t *bus) +{ +	int id = semget(bus->key_sem, BUS_SEMAPHORES, 0600); +	return ((id == -1) || (semctl(id, 0, IPC_RMID) == -1)) ? -1 : 0; +} + + +/** + * Remove the shared memory for the bus + *  + * @param   bus  Bus information + * @return       0 on success, -1 on error + */ +static int +remove_shared_memory(const bus_t *bus) +{ +	struct shmid_ds _info; +	int id = shmget(bus->key_shm, BUS_MEMORY_SIZE, 0600); +	return ((id == -1) || (shmctl(sem_id, IPC_RMID, &_info) == -1)) ? -1 : 0; +} + + +/** + * Increase or decrease the value of a semaphore, or wait the it to become 0 + *  + * @param   bus        Bus information + * @param   semaphore  The index of the semaphore, `S`, `W`, `X` or `Q` + * @param   delta      The adjustment to make to the semaphore's value, 0 to wait for it to become 0 + * @param   flags      `SEM_UNDO` if the action should be undone when the program exits + * @return             0 on success, -1 on error + */ +static int +semaphore_op(const bus_t *bus, unsigned short semaphore, short delta, short flags) +{ +	struct sembuf op; +	op.sem_op = delta; +	op.sem_num = semaphore; +	op.sem_flg = flags; +	return semop(bus->sem_id, &op, 1); +} + + +/** + * Set the value of a semaphore + *  + * @param   bus        Bus information + * @param   semaphore  The index of the semaphore, `S`, `W`, `X` or `Q` + * @param   value      The new value of the semaphore + * @return             0 on success, -1 on error + */ +static int +write_semaphore(const bus_t *bus, unsigned short semaphore, int value) +{ +	union semun semval; +	semval.val = value; +	return semctl(bus->sem_id, semaphore, SETVAL, semval); +} + + +/** + * Open the shared memory for the bus + *  + * @param   bus    Bus information + * @param   flags  `BUS_RDONLY`, `BUS_WRONLY` or `BUS_RDWR` + * @return         0 on success, -1 on error + */ +static int +open_shared_memory(const bus_t *bus, int flags) +{ +	int id; +	void *address; +	t(id = shmget(bus->key_shm, BUS_MEMORY_SIZE, 0600)); +	address = shmat(id, NULL, (flags & BUS_RDONLY) ? SHM_RDONLY : 0); +	if ((address == (void *)-1) || !address) +		goto fail; +	this->message = (char *)address; +	return 0; +fail: +	return -1; +} + + +/** + * Close the shared memory for the bus + *  + * @param   bus  Bus information + * @return       0 on success, -1 on error + */ +static int +close_shared_memory(const bus_t *bus) +{ +	t(shmdt(this->message)); +	this->message = NULL; +	return 0; +fail: +	return -1; +} + + + +const char * +bus_create(const char *file, int flags) +{ +	int saved_errno; +	bus_t bus; +	bus.sem_id = -1; +	bus.key_sem = -1; +	bus.key_shm = -1; +	bus.message = NULL; + +	srand((unsigned int)time(NULL) + (unsigned int)rand()); + +	t(create_semaphores(&bus)); +	t(create_shared_memory(&bus)); +	return NULL; /* TODO */ + +fail: +	saved_errno = errno; +	if (bus.key_sem) +		remove_semaphores(&bus); +	if (bus.key_shm) +		remove_shared_memory(&bus); +	errno = saved_errno; +	return NULL; +} + + +int +bus_unlink(const char *file) +{ +	int r = 0, saved_errno = 0; +	bus_t bus; +	t(bus_open(&bus, file, -1)); + +	r |= remove_semaphores(&bus); +	if (r && !saved_errno) +		saved_errno = errno; + +	r |= remove_shared_memory(&bus); +	if (r && !saved_errno) +		saved_errno = errno; + +	r |= unlink(file); +	if (r && !saved_errno) +		saved_errno = errno; + +	errno = saved_errno; +	return r; +fail: +	return -1; +} + + +int +bus_open(bus_t *bus, const char *file, int flags) +{ +	bus->sem_id = -1; +	bus->key_sem = -1; +	bus->key_shm = -1; +	bus->message = NULL; + +	/* TODO */ + +	if (flags >= 0) { +		t(open_semaphores(bus)); +		t(open_shared_memory(bus, flags)); +	} +	return 0; +fail: +	return -1; +} + + +int +bus_close(bus_t *bus) +{ +	if (bus->address) +		t(close_shared_memory(bus)); +	return 0; +fail: +	return -1; +} + + +int +bus_write(const bus_t *bus, const char *message) +{ +	t(acquire_semaphore(bus, X, SEM_UNDO)); +	t(zero_semaphore(bus, W)); +	t(write_shared_memory(bus, message)); +	t(write_semaphore(bus, Q, 0)); +	t(zero_semaphore(bus, S)); +	t(release_semaphore(bus, X, SEM_UNDO)); +	return 0; +fail: +	return -1; +} + + +int +bus_read(const bus_t *bus, int (*callback)(const char *message, void *user_data), void *user_data) +{ +	int r; +	t(release_semaphore(bus, S, SEM_UNDO)); +	for (;;) { +		t(release_semaphore(bus, Q, 0)); +		t(zero_semaphore(bus, Q)); +		t(r = callback(bus->message, user_data)); +		if (!r) { +			t(acquire_semaphore(bus, S, SEM_UNDO)); +			return 0; +		} +		t(release_semaphore(bus, W, SEM_UNDO)); +		t(acquire_semaphore(bus, S, SEM_UNDO)); +		t(zero_semaphore(bus, S)); +		t(release_semaphore(bus, S, SEM_UNDO)); +		t(acquire_semaphore(bus, W, SEM_UNDO)); +	} +fail: +	return -1; +} + | 
