diff options
author | Mattias Andrée <maandree@operamail.com> | 2015-09-04 19:19:22 +0200 |
---|---|---|
committer | Mattias Andrée <maandree@operamail.com> | 2015-09-04 19:19:22 +0200 |
commit | 266004dea7e489246a0234769d26c93d17a19780 (patch) | |
tree | a6d437906f63aa1f23e72a59b7e6983cc233f60c | |
parent | m + prototypes for message spooling and pooling (diff) | |
download | mds-266004dea7e489246a0234769d26c93d17a19780.tar.gz mds-266004dea7e489246a0234769d26c93d17a19780.tar.bz2 mds-266004dea7e489246a0234769d26c93d17a19780.tar.xz |
implement message spool and pool
Signed-off-by: Mattias Andrée <maandree@operamail.com>
-rw-r--r-- | src/libmdsclient/inbound.c | 322 | ||||
-rw-r--r-- | src/libmdsclient/inbound.h | 25 |
2 files changed, 340 insertions, 7 deletions
diff --git a/src/libmdsclient/inbound.c b/src/libmdsclient/inbound.c index 62ea172..cb151a6 100644 --- a/src/libmdsclient/inbound.c +++ b/src/libmdsclient/inbound.c @@ -24,6 +24,8 @@ #include <errno.h> #include <unistd.h> #include <sys/socket.h> +#include <stdio.h> +#include <assert.h> #define try(INSTRUCTION) if ((r = INSTRUCTION) < 0) return r @@ -37,7 +39,7 @@ * * @param this Memory slot in which to store the new message * @return Zero on success, -1 error, `errno` will be set - * accordingly. Destroy the message on error. + * accordingly * * @throws ENOMEM Out of memory. Possibly, the process hit the RLIMIT_AS or * RLIMIT_DATA limit described in getrlimit(2). @@ -51,8 +53,8 @@ int libmds_message_initialise(libmds_message_t* restrict this) this->buffer_size = 128; this->buffer_ptr = 0; this->stage = 0; - this->buffer = malloc(this->buffer_size * sizeof(char)); this->flattened = 0; + this->buffer = malloc(this->buffer_size * sizeof(char)); return this->buffer == NULL ? -1 : 0; } @@ -76,7 +78,7 @@ void libmds_message_destroy(libmds_message_t* restrict this) /** * Release all resources in a message, should * be done even if initialisation fails - * +* * @param this The message * @return The duplicate, you do not need to call `libmds_message_destroy` * on it before you call `free` on it. However, you cannot use @@ -516,3 +518,317 @@ int libmds_message_read(libmds_message_t* restrict this, int fd) } } + + +/** + * Initialise a message spool + * + * @param this The message spool + * @return Zero on success, -1 on error, `errno` will be set accordingly + * + * @throws ENOMEM Out of memory. Possibly, the process hit the RLIMIT_AS or + * RLIMIT_DATA limit described in getrlimit(2). + */ +int libmds_mspool_initialise(libmds_mspool_t* restrict this) +{ + int stage = 0, saved_errno; + this->size = 1; + this->head = 0; + this->tail = 0; + this->spooled_bytes = 0; + this->spool_limit_bytes = 4 << 10; + this->spool_limit_messages = 8; + this->please_post = 0; + this->messages = malloc(sizeof(libmds_message_t*)); + if (this->messages == NULL) + return -1; + if (sem_init(&(this->lock), 0, 1) < 0) goto fail; stage++; + if (sem_init(&(this->semaphore), 0, 0) < 0) goto fail; stage++; + if (sem_init(&(this->wait_semaphore), 0, 0) < 0) goto fail; + return 0; + fail: + saved_errno = errno; + if (stage >= 1) sem_destroy(&(this->lock)); + if (stage >= 2) sem_destroy(&(this->semaphore)); + free(this->messages), this->messages = NULL; + return errno = saved_errno, -1; +} + + +/** + * Destroy a message spool, deallocate its resources + * + * @param this The message spool + */ +void libmds_mspool_destroy(libmds_mspool_t* restrict this) +{ + if (this->messages == NULL) + return; + sem_destroy(&(this->lock)); + sem_destroy(&(this->semaphore)); + sem_destroy(&(this->wait_semaphore)); + free(this->messages); + this->messages = NULL; +} + + +/** + * Spool a message + * + * @param this The message spool + * @param message The message to spool, must be flat (created with `libmds_message_duplicate`) + * @return Zero on success, -1 on error, `errno` will be set accordingly + * + * @throws EINTR If interrupted + * @throws ENOMEM Out of memory. Possibly, the process hit the RLIMIT_AS or + * RLIMIT_DATA limit described in getrlimit(2). + */ +int libmds_mspool_spool(libmds_mspool_t* restrict this, libmds_message_t* restrict message) +{ + libmds_message_t** new; + int saved_errno; + + start_over: + /* Lock. */ + if (sem_wait(&(this->lock)) < 0) + return -1; + + /* Block if spool is full. */ + if ((this->spooled_bytes >= this->spool_limit_bytes) || + (this->head - this->tail >= this->spool_limit_messages)) + { + this->please_post++; + if ((sem_post(&(this->lock)) < 0) || (sem_wait(&(this->wait_semaphore)) < 0)) + return this->please_post--, -1; + goto start_over; + } + + /* Rebase if the tail has come too far. */ + if (this->tail << 1 > this->size) + { + memmove(this->messages, this->messages + this->tail, + (this->head - this->tail) * sizeof(sizeof(libmds_message_t*))); + this->head -= this->tail, this->tail = 0; + } + + /* Grow the spool if necessary. */ + if (this->size == this->head) + { + new = realloc(this->messages, (this->size << 1) * sizeof(libmds_message_t*)); + if (new == NULL) + goto fail; + this->messages = new; + this->size <<= 1; + } + + /* Spool. */ + this->spooled_bytes += message->flattened; + this->messages[this->head++] = message; + + /* Signal. */ + if (sem_post(&(this->semaphore)) < 0) + goto fail; + + /* Unlock. */ + if (sem_post(&(this->lock)) < 0) + return -1; + + return 0; + fail: + saved_errno = errno; + sem_post(&(this->lock)); + return errno = saved_errno, -1; +} + + +/** + * Poll a message from a spool + * + * @param this The message spool, must not be empty + * @return A spooled message, `NULL`on error, `errno` will be set accordingly + */ +static inline libmds_message_t* mspool_poll(libmds_mspool_t* restrict this) +{ + libmds_message_t* msg; + + /* Lock. */ + if (sem_wait(&(this->lock)) < 0) + return sem_post(&(this->semaphore)), NULL; + + /* Fetch message. */ + assert(this->tail < this->head); + msg = this->messages[this->tail++]; + + /* Unblock spooler. */ + if (this->please_post) + { + if (sem_post(&(this->wait_semaphore)) < 0) + perror("libmsdclient"); + else + this->please_post--; + } + + /* Unlock. */ + if (sem_post(&(this->lock)) < 0) + return sem_post(&(this->semaphore)), this->messages[--(this->tail)] = msg, NULL; + + return msg; +} + + +/** + * Poll a message from a spool, wait if empty + * + * @param this The message spool + * @return A spooled message, `NULL`on error, `errno` will be set accordingly + * + * @throws EINTR If interrupted + */ +libmds_message_t* libmds_mspool_poll(libmds_mspool_t* restrict this) +{ + /* Wait until there is a message available. */ + if (sem_wait(&(this->semaphore)) < 0) + return NULL; + + return mspool_poll(this); +} + + +/** + * Poll a message from a spool, wait for a limited time + * or return unsuccessfully if empty + * + * @param this The message spool + * @param deadline The CLOCK_REALTIME time the function must return, + * `NULL` to return immediately if it would block + * @return A spooled message, `NULL`on error, `errno` will be set accordingly + * + * @throws EINTR If interrupted + * @throws EAGAIN If `deadline` is `NULL` and the spool is empty + * @throws EINVAL If `deadline->tv_nsecs` is outside [0, 1 milliard[ + * @throws ETIMEDOUT If the time specified `deadline` passed and the spool was till empty + */ +libmds_message_t* libmds_mspool_poll_try(libmds_mspool_t* restrict this, + const struct timespec* restrict deadline) +{ + /* Is a message available? */ + if ((deadline == NULL) && (sem_trywait(&(this->semaphore)) < 0)) + return NULL; + if ((deadline != NULL) && (sem_timedwait(&(this->semaphore), deadline) < 0)) + return NULL; + + return mspool_poll(this); +} + + + +/** + * Initialise a pool of reusable message allocations + * + * @param this The message allocation pool + * @param size The number of allocations that may be pooled + * @return Zero on success, -1 on error, `errno` will be set accordingly + */ +int libmds_mpool_initialise(libmds_mpool_t* restrict this, size_t size) +{ + int saved_errno; + this->size = size; + this->tip = 0; + this->messages = malloc(size * sizeof(libmds_message_t*)); + if (this->messages == NULL) + return -1; + if (sem_init(&(this->lock), 0, 1) < 0) + goto fail; + return 0; + fail: + saved_errno = errno; + free(this->messages), this->messages = NULL; + return errno = saved_errno, -1; +} + + +/** + * Destroy a pool of reusable message allocations, + * deallocate its resources and pooled allocations + * + * @param this The message allocation pool + */ +void libmds_mpool_destroy(libmds_mpool_t* restrict this) +{ + if (this->messages == NULL) + return; + sem_destroy(&(this->lock)); + free(this->messages); + this->messages = NULL; +} + + +/** + * Add a message allocation to a pool + * + * @param this The message allocation pool + * @param message Message allocation to pool, must be flat (created with + * `libmds_message_duplicate` or fetched with `libmds_mspool_poll` + * or `libmds_mspool_poll_try`) + * @return Zero on success, -1 on error, `errno` will be set accordingly + */ +int libmds_mpool_offer(libmds_mpool_t* restrict this, libmds_message_t* restrict message) +{ + /* Discard if pool is full. */ + if (this->tip == this->size) + return free(message), 0; + + /* Lock. */ + if (sem_wait(&(this->lock)) < 0) + return -1; + + /* Discard if pool is still full. */ + if (this->tip == this->size) + { + free(message); + goto unlock; + } + + /* Pool allocation. */ + this->messages[this->tip++] = message; + + unlock: + /* Unlock. */ + if (sem_post(&(this->lock)) < 0) + return -1; + + return 0; +} + + +/** + * Fetch a message allocation from a pool + * + * @param this The message allocation pool + * @return An offered message allocation, `NULL` on error or if none + * are available. If `NULL` is returned, `errno` is set to zero, + * if the pool was empty, otherwise `errno` will describe the error. + */ +libmds_message_t* libmds_mpool_poll(libmds_mpool_t* restrict this) +{ + libmds_message_t* msg = NULL; + + /* Is the pool empty? Return nothing. */ + if (this->tip == 0) + return errno = 0, NULL; + + /* Lock. */ + if (sem_wait(&(this->lock)) < 0) + return NULL; + + /* Do we have a message? Take it */ + if (this->tip) + msg = this->messages[--(this->tip)]; + + /* Unlock. */ + if (sem_post(&(this->lock)) < 0) + return this->tip--, NULL; + + return errno = 0, msg; +} + diff --git a/src/libmdsclient/inbound.h b/src/libmdsclient/inbound.h index 7682b54..05fcc74 100644 --- a/src/libmdsclient/inbound.h +++ b/src/libmdsclient/inbound.h @@ -210,7 +210,7 @@ typedef struct libmds_mpool * * @param this Memory slot in which to store the new message * @return Zero on success, -1 error, `errno` will be set - * accordingly. Destroy the message on error. + * accordingly * * @throws ENOMEM Out of memory. Possibly, the process hit the RLIMIT_AS or * RLIMIT_DATA limit described in getrlimit(2). @@ -267,6 +267,9 @@ int libmds_message_read(libmds_message_t* restrict this, int fd); * * @param this The message spool * @return Zero on success, -1 on error, `errno` will be set accordingly + * + * @throws ENOMEM Out of memory. Possibly, the process hit the RLIMIT_AS or + * RLIMIT_DATA limit described in getrlimit(2). */ __attribute__((nonnull, warn_unused_result)) int libmds_mspool_initialise(libmds_mspool_t* restrict this); @@ -283,8 +286,12 @@ void libmds_mspool_destroy(libmds_mspool_t* restrict this); * Spool a message * * @param this The message spool - * @param message The message to spool + * @param message The message to spool, must be flat (created with `libmds_message_duplicate`) * @return Zero on success, -1 on error, `errno` will be set accordingly + * + * @throws EINTR If interrupted + * @throws ENOMEM Out of memory. Possibly, the process hit the RLIMIT_AS or + * RLIMIT_DATA limit described in getrlimit(2). */ __attribute__((nonnull, warn_unused_result)) int libmds_mspool_spool(libmds_mspool_t* restrict this, libmds_message_t* restrict message); @@ -294,6 +301,8 @@ int libmds_mspool_spool(libmds_mspool_t* restrict this, libmds_message_t* restri * * @param this The message spool * @return A spooled message, `NULL`on error, `errno` will be set accordingly + * + * @throws EINTR If interrupted */ __attribute__((nonnull, warn_unused_result, malloc)) libmds_message_t* libmds_mspool_poll(libmds_mspool_t* restrict this); @@ -306,9 +315,15 @@ libmds_message_t* libmds_mspool_poll(libmds_mspool_t* restrict this); * @param deadline The CLOCK_REALTIME time the function must return, * `NULL` to return immediately if it would block * @return A spooled message, `NULL`on error, `errno` will be set accordingly + * + * @throws EINTR If interrupted + * @throws EAGAIN If `deadline` is `NULL` and the spool is empty + * @throws EINVAL If `deadline->tv_nsecs` is outside [0, 1 milliard[ + * @throws ETIMEDOUT If the time specified `deadline` passed and the spool was till empty */ __attribute__((nonnull(1), warn_unused_result, malloc)) -libmds_message_t* libmds_mspool_poll_try(libmds_mspool_t* restrict this, const struct timespec* deadline); +libmds_message_t* libmds_mspool_poll_try(libmds_mspool_t* restrict this, + const struct timespec* restrict deadline); @@ -335,7 +350,9 @@ void libmds_mpool_destroy(libmds_mpool_t* restrict this); * Add a message allocation to a pool * * @param this The message allocation pool - * @param message Message allocation to pool + * @param message Message allocation to pool, must be flat (created with + * `libmds_message_duplicate` or fetched with `libmds_mspool_poll` + * or `libmds_mspool_poll_try`) * @return Zero on success, -1 on error, `errno` will be set accordingly */ __attribute__((nonnull, warn_unused_result)) |