diff options
Diffstat (limited to '')
| -rw-r--r-- | src/bus.c | 185 | 
1 files changed, 178 insertions, 7 deletions
| @@ -22,6 +22,7 @@   * DEALINGS IN THE SOFTWARE.   */  #define _XOPEN_SOURCE 700 +#define _GNU_SOURCE  #include "bus.h"  #include <stdlib.h> @@ -119,7 +120,7 @@  	semaphore_op(bus, semaphore, +1, flags)  /** - * Wait for the value of a semphore to become 0 + * Wait for the value of a semaphore to become 0   *    * @param   bus:const bus_t *  The bus   * @param   semaphore:int      The index of the semaphore, `S`, `W`, `X` or `Q` @@ -130,6 +131,43 @@  	semaphore_op(bus, semaphore, 0, flags)  /** + * Decrease the value of a semaphore by 1 + *  + * @param   bus:const bus_t *                The bus + * @param   semaphore:int                    The index of the semaphore, `S`, `W`, `X` or `Q` + * @param   flags:int                        `SEM_UNDO` if the action should be undone when the program exits, + *                                           `IPC_NOWAIT` if the action should fail if it would block + * @param   timeout:const struct timespec *  The amount of time to wait before failing + * @return  :int                             0 on success, -1 on error + */ +#define acquire_semaphore_timed(bus, semaphore, flags, timeout) \ +	semaphore_op_timed(bus, semaphore, -1, flags, timeout) + +/** + * Increase the value of a semaphore by 1 + *  + * @param   bus:const bus_t *                The bus + * @param   semaphore:int                    The index of the semaphore, `S`, `W`, `X` or `Q` + * @param   flags:int                        `SEM_UNDO` if the action should be undone when the program exits + * @param   timeout:const struct timespec *  The amount of time to wait before failing + * @return  :int                             0 on success, -1 on error + */ +#define release_semaphore_timed(bus, semaphore, flags, timeout) \ +	semaphore_op_timed(bus, semaphore, +1, flags, timeout) + +/** + * Wait for the value of a semaphore to become 0 + *  + * @param   bus:const bus_t *                The bus + * @param   semaphore:int                    The index of the semaphore, `S`, `W`, `X` or `Q` + * @param   flags:int                        `IPC_NOWAIT` if the action should fail if it would block + * @param   timeout:const struct timespec *  The amount of time to wait before failing + * @return  :int                             0 on success, -1 on error + */ +#define zero_semaphore_timed(bus, semaphore, flags, timeout) \ +	semaphore_op_timed(bus, semaphore, 0, flags, timeout) + +/**   * Open the semaphore array   *    * @param   bus:const bus_t *  The bus @@ -150,6 +188,21 @@  /** + * Set `delta` to the convertion of `timeout` from absolute to relative time, + * measured in the clock whose ID is specified by `clockid` + *  + * @scope  timeout:struct timespec          Output variable for relative time + * @scope  timeout:const struct timespec *  The absolute time + * @scope  clockid:clockid_t                The clock time is measured + */ +#define DELTA \ +	do { \ +		if (absolute_time_to_delta_time(&delta, timeout, clockid) < 0)  goto fail; \ +		else if ((delta.tv_sec < 0) || (delta.tv_nsec < 0))  { errno = EAGAIN; goto fail; } \ +	} while (0) + + +/**   * If `flags & (bus_flag)`, this macro evalutes to `sys_flag`,   * otherwise this macro evalutes to 0.   */ @@ -319,6 +372,27 @@ semaphore_op(const bus_t *bus, int semaphore, int delta, int flags)  /** + * Increase or decrease the value of a semaphore, or wait the it to become 0 + *  + * @param   bus        Bus information + * @param   semaphore  The index of the semaphore, `S`, `W`, `X` or `Q` + * @param   delta      The adjustment to make to the semaphore's value, 0 to wait for it to become 0 + * @param   flags      `SEM_UNDO` if the action should be undone when the program exits + * @param   timeout    The amount of time to wait before failing + * @return             0 on success, -1 on error + */ +static int +semaphore_op_timed(const bus_t *bus, int semaphore, int delta, int flags, const struct timespec *timeout) +{ +	struct sembuf op; +	op.sem_num = (unsigned short)semaphore; +	op.sem_op = (short)delta; +	op.sem_flg = (short)flags; +	return semtimedop(bus->sem_id, &op, (size_t)1, timeout); +} + + +/**   * Set the value of a semaphore   *    * @param   bus        Bus information @@ -722,10 +796,41 @@ fail:  int bus_write_timed(const bus_t *bus, const char *message,  		    const struct timespec *timeout, clockid_t clockid)  { -	/* TODO bus_write_timed */ +	int saved_errno; +#ifndef BUS_SEMAPHORES_ARE_SYNCHRONOUS +	int state = 0; +#endif +	struct timespec delta;  	if (!timeout)  		return bus_write(bus, message, 0); -	(void) bus, (void) message, (void) timeout, (void) clockid; + +	DELTA; +	if (acquire_semaphore_timed(bus, X, SEM_UNDO, &delta) == -1) +		return -1; +	DELTA; +	t(zero_semaphore_timed(bus, W, 0, &delta)); +	write_shared_memory(bus, message); +#ifndef BUS_SEMAPHORES_ARE_SYNCHRONOUS +	t(release_semaphore(bus, N, SEM_UNDO));  state++; +#endif +	t(write_semaphore(bus, Q, 0)); +	t(zero_semaphore(bus, S, 0)); +#ifndef BUS_SEMAPHORES_ARE_SYNCHRONOUS +	t(acquire_semaphore(bus, N, SEM_UNDO));  state--; +#endif +	t(release_semaphore(bus, X, SEM_UNDO)); +	return 0; + +fail: +	saved_errno = errno; +#ifndef BUS_SEMAPHORES_ARE_SYNCHRONOUS +	if (state > 0) +		acquire_semaphore(bus, N, SEM_UNDO); +#endif +	release_semaphore(bus, X, SEM_UNDO); +	errno = saved_errno; +	return -1; +  } @@ -822,10 +927,46 @@ done:  int bus_read_timed(const bus_t *bus, int (*callback)(const char *message, void *user_data),  		   void *user_data, const struct timespec *timeout, clockid_t clockid)  { -	/* TODO bus_read_timed */ +	int r, state = 0, saved_errno; +	struct timespec delta;  	if (!timeout)  		return bus_read(bus, callback, user_data); -	(void) bus, (void) callback, (void) user_data, (void) timeout, (void) clockid; + +	DELTA; +	if (release_semaphore_timed(bus, S, SEM_UNDO, &delta) == -1) +		return -1; +	t(r = callback(NULL, user_data)); +	if (!r)  goto done; +	for (;;) { +		DELTA; +		t(release_semaphore_timed(bus, Q, 0, &delta)); +		DELTA; +		t(zero_semaphore_timed(bus, Q, 0, &delta)); +		t(r = callback(bus->message, user_data)); +		if (!r)  goto done; +		t(release_semaphore(bus, W, SEM_UNDO));  state++; +		t(acquire_semaphore(bus, S, SEM_UNDO));  state++; +		t(zero_semaphore(bus, S, 0)); +#ifndef BUS_SEMAPHORES_ARE_SYNCHRONOUS_ME_HARDER +		t(zero_semaphore(bus, N, 0)); +#endif +		t(release_semaphore(bus, S, SEM_UNDO));  state--; +		t(acquire_semaphore(bus, W, SEM_UNDO));  state--; +	} + +fail: +	saved_errno = errno; +	if (state > 1) +		release_semaphore(bus, S, SEM_UNDO); +	if (state > 0) +		acquire_semaphore(bus, W, SEM_UNDO); +	acquire_semaphore(bus, S, SEM_UNDO); +	errno = saved_errno; +	return -1; + +done: +	t(acquire_semaphore(bus, S, SEM_UNDO)); +	return 0;  } @@ -943,10 +1084,40 @@ fail:   */  const char *bus_poll_timed(bus_t *bus, const struct timespec *timeout, clockid_t clockid)  { -	/* TODO bus_poll_timed */ +	int state = 0, saved_errno; +	struct timespec delta;  	if (!timeout)  		return bus_poll(bus); -	(void) bus, (void) timeout, (void) clockid; + +	if (!bus->first_poll) { +		t(release_semaphore(bus, W, SEM_UNDO));  state++; +		t(acquire_semaphore(bus, S, SEM_UNDO));  state++; +		t(zero_semaphore(bus, S, 0)); +#ifndef BUS_SEMAPHORES_ARE_SYNCHRONOUS_ME_HARDER +		t(zero_semaphore(bus, N, 0)); +#endif +		t(release_semaphore(bus, S, SEM_UNDO));  state--; +		t(acquire_semaphore(bus, W, SEM_UNDO));  state--; +	} else { +		bus->first_poll = 0; +	} +	state--; +	DELTA; +	t(release_semaphore_timed(bus, Q, 0, &delta)); +	DELTA; +	t(zero_semaphore_timed(bus, Q, 0, &delta)); +	return bus->message; + +fail: +	saved_errno = errno; +	if (state > 1) +		release_semaphore(bus, S, SEM_UNDO); +	if (state > 0) +		acquire_semaphore(bus, W, SEM_UNDO); +	if (state < 0) +		bus->first_poll = 1; +	errno = saved_errno; +	return NULL;  } | 
