aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMattias Andrée <maandree@operamail.com>2015-09-04 19:19:22 +0200
committerMattias Andrée <maandree@operamail.com>2015-09-04 19:19:22 +0200
commit266004dea7e489246a0234769d26c93d17a19780 (patch)
treea6d437906f63aa1f23e72a59b7e6983cc233f60c /src
parentm + prototypes for message spooling and pooling (diff)
downloadmds-266004dea7e489246a0234769d26c93d17a19780.tar.gz
mds-266004dea7e489246a0234769d26c93d17a19780.tar.bz2
mds-266004dea7e489246a0234769d26c93d17a19780.tar.xz
implement message spool and pool
Signed-off-by: Mattias Andrée <maandree@operamail.com>
Diffstat (limited to 'src')
-rw-r--r--src/libmdsclient/inbound.c322
-rw-r--r--src/libmdsclient/inbound.h25
2 files changed, 340 insertions, 7 deletions
diff --git a/src/libmdsclient/inbound.c b/src/libmdsclient/inbound.c
index 62ea172..cb151a6 100644
--- a/src/libmdsclient/inbound.c
+++ b/src/libmdsclient/inbound.c
@@ -24,6 +24,8 @@
#include <errno.h>
#include <unistd.h>
#include <sys/socket.h>
+#include <stdio.h>
+#include <assert.h>
#define try(INSTRUCTION) if ((r = INSTRUCTION) < 0) return r
@@ -37,7 +39,7 @@
*
* @param this Memory slot in which to store the new message
* @return Zero on success, -1 error, `errno` will be set
- * accordingly. Destroy the message on error.
+ * accordingly
*
* @throws ENOMEM Out of memory. Possibly, the process hit the RLIMIT_AS or
* RLIMIT_DATA limit described in getrlimit(2).
@@ -51,8 +53,8 @@ int libmds_message_initialise(libmds_message_t* restrict this)
this->buffer_size = 128;
this->buffer_ptr = 0;
this->stage = 0;
- this->buffer = malloc(this->buffer_size * sizeof(char));
this->flattened = 0;
+ this->buffer = malloc(this->buffer_size * sizeof(char));
return this->buffer == NULL ? -1 : 0;
}
@@ -76,7 +78,7 @@ void libmds_message_destroy(libmds_message_t* restrict this)
/**
* Release all resources in a message, should
* be done even if initialisation fails
- *
+*
* @param this The message
* @return The duplicate, you do not need to call `libmds_message_destroy`
* on it before you call `free` on it. However, you cannot use
@@ -516,3 +518,317 @@ int libmds_message_read(libmds_message_t* restrict this, int fd)
}
}
+
+
+/**
+ * Initialise a message spool
+ *
+ * @param this The message spool
+ * @return Zero on success, -1 on error, `errno` will be set accordingly
+ *
+ * @throws ENOMEM Out of memory. Possibly, the process hit the RLIMIT_AS or
+ * RLIMIT_DATA limit described in getrlimit(2).
+ */
+int libmds_mspool_initialise(libmds_mspool_t* restrict this)
+{
+ int stage = 0, saved_errno;
+ this->size = 1;
+ this->head = 0;
+ this->tail = 0;
+ this->spooled_bytes = 0;
+ this->spool_limit_bytes = 4 << 10;
+ this->spool_limit_messages = 8;
+ this->please_post = 0;
+ this->messages = malloc(sizeof(libmds_message_t*));
+ if (this->messages == NULL)
+ return -1;
+ if (sem_init(&(this->lock), 0, 1) < 0) goto fail; stage++;
+ if (sem_init(&(this->semaphore), 0, 0) < 0) goto fail; stage++;
+ if (sem_init(&(this->wait_semaphore), 0, 0) < 0) goto fail;
+ return 0;
+ fail:
+ saved_errno = errno;
+ if (stage >= 1) sem_destroy(&(this->lock));
+ if (stage >= 2) sem_destroy(&(this->semaphore));
+ free(this->messages), this->messages = NULL;
+ return errno = saved_errno, -1;
+}
+
+
+/**
+ * Destroy a message spool, deallocate its resources
+ *
+ * @param this The message spool
+ */
+void libmds_mspool_destroy(libmds_mspool_t* restrict this)
+{
+ if (this->messages == NULL)
+ return;
+ sem_destroy(&(this->lock));
+ sem_destroy(&(this->semaphore));
+ sem_destroy(&(this->wait_semaphore));
+ free(this->messages);
+ this->messages = NULL;
+}
+
+
+/**
+ * Spool a message
+ *
+ * @param this The message spool
+ * @param message The message to spool, must be flat (created with `libmds_message_duplicate`)
+ * @return Zero on success, -1 on error, `errno` will be set accordingly
+ *
+ * @throws EINTR If interrupted
+ * @throws ENOMEM Out of memory. Possibly, the process hit the RLIMIT_AS or
+ * RLIMIT_DATA limit described in getrlimit(2).
+ */
+int libmds_mspool_spool(libmds_mspool_t* restrict this, libmds_message_t* restrict message)
+{
+ libmds_message_t** new;
+ int saved_errno;
+
+ start_over:
+ /* Lock. */
+ if (sem_wait(&(this->lock)) < 0)
+ return -1;
+
+ /* Block if spool is full. */
+ if ((this->spooled_bytes >= this->spool_limit_bytes) ||
+ (this->head - this->tail >= this->spool_limit_messages))
+ {
+ this->please_post++;
+ if ((sem_post(&(this->lock)) < 0) || (sem_wait(&(this->wait_semaphore)) < 0))
+ return this->please_post--, -1;
+ goto start_over;
+ }
+
+ /* Rebase if the tail has come too far. */
+ if (this->tail << 1 > this->size)
+ {
+ memmove(this->messages, this->messages + this->tail,
+ (this->head - this->tail) * sizeof(sizeof(libmds_message_t*)));
+ this->head -= this->tail, this->tail = 0;
+ }
+
+ /* Grow the spool if necessary. */
+ if (this->size == this->head)
+ {
+ new = realloc(this->messages, (this->size << 1) * sizeof(libmds_message_t*));
+ if (new == NULL)
+ goto fail;
+ this->messages = new;
+ this->size <<= 1;
+ }
+
+ /* Spool. */
+ this->spooled_bytes += message->flattened;
+ this->messages[this->head++] = message;
+
+ /* Signal. */
+ if (sem_post(&(this->semaphore)) < 0)
+ goto fail;
+
+ /* Unlock. */
+ if (sem_post(&(this->lock)) < 0)
+ return -1;
+
+ return 0;
+ fail:
+ saved_errno = errno;
+ sem_post(&(this->lock));
+ return errno = saved_errno, -1;
+}
+
+
+/**
+ * Poll a message from a spool
+ *
+ * @param this The message spool, must not be empty
+ * @return A spooled message, `NULL`on error, `errno` will be set accordingly
+ */
+static inline libmds_message_t* mspool_poll(libmds_mspool_t* restrict this)
+{
+ libmds_message_t* msg;
+
+ /* Lock. */
+ if (sem_wait(&(this->lock)) < 0)
+ return sem_post(&(this->semaphore)), NULL;
+
+ /* Fetch message. */
+ assert(this->tail < this->head);
+ msg = this->messages[this->tail++];
+
+ /* Unblock spooler. */
+ if (this->please_post)
+ {
+ if (sem_post(&(this->wait_semaphore)) < 0)
+ perror("libmsdclient");
+ else
+ this->please_post--;
+ }
+
+ /* Unlock. */
+ if (sem_post(&(this->lock)) < 0)
+ return sem_post(&(this->semaphore)), this->messages[--(this->tail)] = msg, NULL;
+
+ return msg;
+}
+
+
+/**
+ * Poll a message from a spool, wait if empty
+ *
+ * @param this The message spool
+ * @return A spooled message, `NULL`on error, `errno` will be set accordingly
+ *
+ * @throws EINTR If interrupted
+ */
+libmds_message_t* libmds_mspool_poll(libmds_mspool_t* restrict this)
+{
+ /* Wait until there is a message available. */
+ if (sem_wait(&(this->semaphore)) < 0)
+ return NULL;
+
+ return mspool_poll(this);
+}
+
+
+/**
+ * Poll a message from a spool, wait for a limited time
+ * or return unsuccessfully if empty
+ *
+ * @param this The message spool
+ * @param deadline The CLOCK_REALTIME time the function must return,
+ * `NULL` to return immediately if it would block
+ * @return A spooled message, `NULL`on error, `errno` will be set accordingly
+ *
+ * @throws EINTR If interrupted
+ * @throws EAGAIN If `deadline` is `NULL` and the spool is empty
+ * @throws EINVAL If `deadline->tv_nsecs` is outside [0, 1 milliard[
+ * @throws ETIMEDOUT If the time specified `deadline` passed and the spool was till empty
+ */
+libmds_message_t* libmds_mspool_poll_try(libmds_mspool_t* restrict this,
+ const struct timespec* restrict deadline)
+{
+ /* Is a message available? */
+ if ((deadline == NULL) && (sem_trywait(&(this->semaphore)) < 0))
+ return NULL;
+ if ((deadline != NULL) && (sem_timedwait(&(this->semaphore), deadline) < 0))
+ return NULL;
+
+ return mspool_poll(this);
+}
+
+
+
+/**
+ * Initialise a pool of reusable message allocations
+ *
+ * @param this The message allocation pool
+ * @param size The number of allocations that may be pooled
+ * @return Zero on success, -1 on error, `errno` will be set accordingly
+ */
+int libmds_mpool_initialise(libmds_mpool_t* restrict this, size_t size)
+{
+ int saved_errno;
+ this->size = size;
+ this->tip = 0;
+ this->messages = malloc(size * sizeof(libmds_message_t*));
+ if (this->messages == NULL)
+ return -1;
+ if (sem_init(&(this->lock), 0, 1) < 0)
+ goto fail;
+ return 0;
+ fail:
+ saved_errno = errno;
+ free(this->messages), this->messages = NULL;
+ return errno = saved_errno, -1;
+}
+
+
+/**
+ * Destroy a pool of reusable message allocations,
+ * deallocate its resources and pooled allocations
+ *
+ * @param this The message allocation pool
+ */
+void libmds_mpool_destroy(libmds_mpool_t* restrict this)
+{
+ if (this->messages == NULL)
+ return;
+ sem_destroy(&(this->lock));
+ free(this->messages);
+ this->messages = NULL;
+}
+
+
+/**
+ * Add a message allocation to a pool
+ *
+ * @param this The message allocation pool
+ * @param message Message allocation to pool, must be flat (created with
+ * `libmds_message_duplicate` or fetched with `libmds_mspool_poll`
+ * or `libmds_mspool_poll_try`)
+ * @return Zero on success, -1 on error, `errno` will be set accordingly
+ */
+int libmds_mpool_offer(libmds_mpool_t* restrict this, libmds_message_t* restrict message)
+{
+ /* Discard if pool is full. */
+ if (this->tip == this->size)
+ return free(message), 0;
+
+ /* Lock. */
+ if (sem_wait(&(this->lock)) < 0)
+ return -1;
+
+ /* Discard if pool is still full. */
+ if (this->tip == this->size)
+ {
+ free(message);
+ goto unlock;
+ }
+
+ /* Pool allocation. */
+ this->messages[this->tip++] = message;
+
+ unlock:
+ /* Unlock. */
+ if (sem_post(&(this->lock)) < 0)
+ return -1;
+
+ return 0;
+}
+
+
+/**
+ * Fetch a message allocation from a pool
+ *
+ * @param this The message allocation pool
+ * @return An offered message allocation, `NULL` on error or if none
+ * are available. If `NULL` is returned, `errno` is set to zero,
+ * if the pool was empty, otherwise `errno` will describe the error.
+ */
+libmds_message_t* libmds_mpool_poll(libmds_mpool_t* restrict this)
+{
+ libmds_message_t* msg = NULL;
+
+ /* Is the pool empty? Return nothing. */
+ if (this->tip == 0)
+ return errno = 0, NULL;
+
+ /* Lock. */
+ if (sem_wait(&(this->lock)) < 0)
+ return NULL;
+
+ /* Do we have a message? Take it */
+ if (this->tip)
+ msg = this->messages[--(this->tip)];
+
+ /* Unlock. */
+ if (sem_post(&(this->lock)) < 0)
+ return this->tip--, NULL;
+
+ return errno = 0, msg;
+}
+
diff --git a/src/libmdsclient/inbound.h b/src/libmdsclient/inbound.h
index 7682b54..05fcc74 100644
--- a/src/libmdsclient/inbound.h
+++ b/src/libmdsclient/inbound.h
@@ -210,7 +210,7 @@ typedef struct libmds_mpool
*
* @param this Memory slot in which to store the new message
* @return Zero on success, -1 error, `errno` will be set
- * accordingly. Destroy the message on error.
+ * accordingly
*
* @throws ENOMEM Out of memory. Possibly, the process hit the RLIMIT_AS or
* RLIMIT_DATA limit described in getrlimit(2).
@@ -267,6 +267,9 @@ int libmds_message_read(libmds_message_t* restrict this, int fd);
*
* @param this The message spool
* @return Zero on success, -1 on error, `errno` will be set accordingly
+ *
+ * @throws ENOMEM Out of memory. Possibly, the process hit the RLIMIT_AS or
+ * RLIMIT_DATA limit described in getrlimit(2).
*/
__attribute__((nonnull, warn_unused_result))
int libmds_mspool_initialise(libmds_mspool_t* restrict this);
@@ -283,8 +286,12 @@ void libmds_mspool_destroy(libmds_mspool_t* restrict this);
* Spool a message
*
* @param this The message spool
- * @param message The message to spool
+ * @param message The message to spool, must be flat (created with `libmds_message_duplicate`)
* @return Zero on success, -1 on error, `errno` will be set accordingly
+ *
+ * @throws EINTR If interrupted
+ * @throws ENOMEM Out of memory. Possibly, the process hit the RLIMIT_AS or
+ * RLIMIT_DATA limit described in getrlimit(2).
*/
__attribute__((nonnull, warn_unused_result))
int libmds_mspool_spool(libmds_mspool_t* restrict this, libmds_message_t* restrict message);
@@ -294,6 +301,8 @@ int libmds_mspool_spool(libmds_mspool_t* restrict this, libmds_message_t* restri
*
* @param this The message spool
* @return A spooled message, `NULL`on error, `errno` will be set accordingly
+ *
+ * @throws EINTR If interrupted
*/
__attribute__((nonnull, warn_unused_result, malloc))
libmds_message_t* libmds_mspool_poll(libmds_mspool_t* restrict this);
@@ -306,9 +315,15 @@ libmds_message_t* libmds_mspool_poll(libmds_mspool_t* restrict this);
* @param deadline The CLOCK_REALTIME time the function must return,
* `NULL` to return immediately if it would block
* @return A spooled message, `NULL`on error, `errno` will be set accordingly
+ *
+ * @throws EINTR If interrupted
+ * @throws EAGAIN If `deadline` is `NULL` and the spool is empty
+ * @throws EINVAL If `deadline->tv_nsecs` is outside [0, 1 milliard[
+ * @throws ETIMEDOUT If the time specified `deadline` passed and the spool was till empty
*/
__attribute__((nonnull(1), warn_unused_result, malloc))
-libmds_message_t* libmds_mspool_poll_try(libmds_mspool_t* restrict this, const struct timespec* deadline);
+libmds_message_t* libmds_mspool_poll_try(libmds_mspool_t* restrict this,
+ const struct timespec* restrict deadline);
@@ -335,7 +350,9 @@ void libmds_mpool_destroy(libmds_mpool_t* restrict this);
* Add a message allocation to a pool
*
* @param this The message allocation pool
- * @param message Message allocation to pool
+ * @param message Message allocation to pool, must be flat (created with
+ * `libmds_message_duplicate` or fetched with `libmds_mspool_poll`
+ * or `libmds_mspool_poll_try`)
* @return Zero on success, -1 on error, `errno` will be set accordingly
*/
__attribute__((nonnull, warn_unused_result))