diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/bus.c | 95 | 
1 files changed, 63 insertions, 32 deletions
| @@ -61,9 +61,14 @@  #define Q  3  /** + * Semaphore used to notify `bus_read` that it may restore `S` + */ +#define N  4 + +/**   * The number of semaphores in the semaphore array   */ -#define BUS_SEMAPHORES  4 +#define BUS_SEMAPHORES  5 @@ -72,7 +77,8 @@   *    * @param   bus:const bus_t *  The bus   * @param   semaphore:int      The index of the semaphore, `S`, `W`, `X` or `Q` - * @param   flags:int          `SEM_UNDO` if the action should be undone when the program exits + * @param   flags:int          `SEM_UNDO` if the action should be undone when the program exits, + *                             `IPC_NOWAIT` if the action should fail if it would block   * @return  :int               0 on success, -1 on error   */  #define acquire_semaphore(bus, semaphore, flags) \ @@ -94,10 +100,11 @@   *    * @param   bus:const bus_t *  The bus   * @param   semaphore:int      The index of the semaphore, `S`, `W`, `X` or `Q` + * @param   flags:int          `IPC_NOWAIT` if the action should fail if it would block   * @return  :int               0 on success, -1 on error   */ -#define zero_semaphore(bus, semaphore) \ -	semaphore_op(bus, semaphore, 0, 0) +#define zero_semaphore(bus, semaphore, flags) \ +	semaphore_op(bus, semaphore, 0, flags)  /**   * Open the semaphore array @@ -601,17 +608,26 @@ fail:   *                   `BUS_MEMORY_SIZE` including the NUL-termination   * @return           0 on success, -1 on error   */ -int +int /* TODO nonblocking */  bus_write(const bus_t *bus, const char *message)  { -	t(acquire_semaphore(bus, X, SEM_UNDO)); -	t(zero_semaphore(bus, W)); +	int state = 0, saved_errno; +	if (acquire_semaphore(bus, X, SEM_UNDO) == -1) +		return -1; +	t(zero_semaphore(bus, W, 0));  	write_shared_memory(bus, message); +	t(release_semaphore(bus, N, SEM_UNDO));  state++;  	t(write_semaphore(bus, Q, 0)); -	t(zero_semaphore(bus, S)); +	t(zero_semaphore(bus, S, 0)); +	t(acquire_semaphore(bus, N, SEM_UNDO));  state--;  	t(release_semaphore(bus, X, SEM_UNDO));  	return 0;  fail: +	saved_errno = errno; +	if (state > 0) +		acquire_semaphore(bus, N, SEM_UNDO); +	release_semaphore(bus, X, SEM_UNDO); +	errno = saved_errno;  	return -1;  } @@ -641,29 +657,36 @@ fail:  int  bus_read(const bus_t *bus, int (*callback)(const char *message, void *user_data), void *user_data)  { -	int r; -	t(release_semaphore(bus, S, SEM_UNDO)); +	int r, state = 0, saved_errno; +	if (release_semaphore(bus, S, SEM_UNDO) == -1) +		return -1;  	t(r = callback(NULL, user_data)); -	if (!r) { -		t(acquire_semaphore(bus, S, SEM_UNDO)); -		return 0; -	} +	if (!r)  goto done;  	for (;;) {  		t(release_semaphore(bus, Q, 0)); -		t(zero_semaphore(bus, Q)); +		t(zero_semaphore(bus, Q, 0));  		t(r = callback(bus->message, user_data)); -		if (!r) { -			t(acquire_semaphore(bus, S, SEM_UNDO)); -			return 0; -		} -		t(release_semaphore(bus, W, SEM_UNDO)); -		t(acquire_semaphore(bus, S, SEM_UNDO)); -		t(zero_semaphore(bus, S)); -		t(release_semaphore(bus, S, SEM_UNDO)); -		t(acquire_semaphore(bus, W, SEM_UNDO)); +		if (!r)  goto done; +		t(release_semaphore(bus, W, SEM_UNDO));  state++; +		t(acquire_semaphore(bus, S, SEM_UNDO));  state++; +		t(zero_semaphore(bus, S, 0)); +		t(zero_semaphore(bus, N, 0)); +		t(release_semaphore(bus, S, SEM_UNDO));  state--; +		t(acquire_semaphore(bus, W, SEM_UNDO));  state--;  	}  fail: +	saved_errno = errno; +	if (state > 1) +		release_semaphore(bus, S, SEM_UNDO); +	if (state > 0) +		acquire_semaphore(bus, W, SEM_UNDO); +	acquire_semaphore(bus, S, SEM_UNDO); +	errno = saved_errno;  	return -1; + +done: +	t(acquire_semaphore(bus, S, SEM_UNDO)); +	return 0;  } @@ -678,26 +701,34 @@ bus_poll_start(bus_t *bus)  int  bus_poll_stop(const bus_t *bus)  { -	return acquire_semaphore(bus, S, SEM_UNDO); +	return acquire_semaphore(bus, S, SEM_UNDO | IPC_NOWAIT);  } -const char * +const char * /* TODO nonblocking */  bus_poll(bus_t *bus)  { +	int state = 0, saved_errno;  	if (!bus->first_poll) { -		t(release_semaphore(bus, W, SEM_UNDO)); -		t(acquire_semaphore(bus, S, SEM_UNDO)); -		t(zero_semaphore(bus, S)); -		t(release_semaphore(bus, S, SEM_UNDO)); -		t(acquire_semaphore(bus, W, SEM_UNDO)); +		t(release_semaphore(bus, W, SEM_UNDO));  state++; +		t(acquire_semaphore(bus, S, SEM_UNDO));  state++; +		t(zero_semaphore(bus, S, 0)); +		t(zero_semaphore(bus, N, 0)); +		t(release_semaphore(bus, S, SEM_UNDO));  state--; +		t(acquire_semaphore(bus, W, SEM_UNDO));  state--;  	} else {  		bus->first_poll = 0;  	}  	t(release_semaphore(bus, Q, 0)); -	t(zero_semaphore(bus, Q)); +	t(zero_semaphore(bus, Q, 0));  	return bus->message;  fail: +	saved_errno = errno; +	if (state > 1) +		release_semaphore(bus, S, SEM_UNDO); +	if (state > 0) +		acquire_semaphore(bus, W, SEM_UNDO); +	errno = saved_errno;  	return NULL;  } | 
