aboutsummaryrefslogtreecommitdiffstats
path: root/src/libmdsclient/inbound.c
diff options
context:
space:
mode:
authorMattias Andrée <maandree@kth.se>2017-11-05 00:09:50 +0100
committerMattias Andrée <maandree@kth.se>2017-11-05 00:09:50 +0100
commit9e8dec188d55ca1f0a3b33acab702ced8ed07a18 (patch)
treecbb43c22e72674dc672e645e6596358e3868568e /src/libmdsclient/inbound.c
parenttypo (diff)
downloadmds-9e8dec188d55ca1f0a3b33acab702ced8ed07a18.tar.gz
mds-9e8dec188d55ca1f0a3b33acab702ced8ed07a18.tar.bz2
mds-9e8dec188d55ca1f0a3b33acab702ced8ed07a18.tar.xz
Work on changing style, and an important typo fix
Signed-off-by: Mattias Andrée <maandree@kth.se>
Diffstat (limited to 'src/libmdsclient/inbound.c')
-rw-r--r--src/libmdsclient/inbound.c1041
1 files changed, 519 insertions, 522 deletions
diff --git a/src/libmdsclient/inbound.c b/src/libmdsclient/inbound.c
index ccff51e..4b74b58 100644
--- a/src/libmdsclient/inbound.c
+++ b/src/libmdsclient/inbound.c
@@ -28,8 +28,8 @@
#include <assert.h>
-#define try(INSTRUCTION) if ((r = INSTRUCTION) < 0) return r
-#define static_strlen(str) (sizeof(str) / sizeof(char) - 1)
+#define try(INSTRUCTION) do { if ((r = INSTRUCTION) < 0) return r; } while (0)
+#define static_strlen(str) (sizeof(str) / sizeof(char) - 1)
@@ -44,18 +44,19 @@
* @throws ENOMEM Out of memory. Possibly, the process hit the RLIMIT_AS or
* RLIMIT_DATA limit described in getrlimit(2).
*/
-int libmds_message_initialise(libmds_message_t* restrict this)
+int
+libmds_message_initialise(libmds_message_t *restrict this)
{
- this->headers = NULL;
- this->header_count = 0;
- this->payload = NULL;
- this->payload_size = 0;
- this->buffer_size = 128;
- this->buffer_ptr = 0;
- this->stage = 0;
- this->flattened = 0;
- this->buffer = malloc(this->buffer_size * sizeof(char));
- return this->buffer == NULL ? -1 : 0;
+ this->headers = NULL;
+ this->header_count = 0;
+ this->payload = NULL;
+ this->payload_size = 0;
+ this->buffer_size = 128;
+ this->buffer_ptr = 0;
+ this->stage = 0;
+ this->flattened = 0;
+ this->buffer = malloc(this->buffer_size * sizeof(char));
+ return this->buffer == NULL ? -1 : 0;
}
@@ -64,20 +65,20 @@ int libmds_message_initialise(libmds_message_t* restrict this)
*
* @param this The message
*/
-void libmds_message_destroy(libmds_message_t* restrict this)
+void
+libmds_message_destroy(libmds_message_t *restrict this)
{
- if (this->flattened == 0)
- {
- free(this->headers), this->headers = NULL;
- free(this->buffer), this->buffer = NULL;
- }
+ if (!this->flattened) {
+ free(this->headers), this->headers = NULL;
+ free(this->buffer), this->buffer = NULL;
+ }
}
/**
* Release all resources in a message, should
* be done even if initialisation fails
-*
+ *
* @param this The message
* @param pool Message allocation pool, may be `NULL`
* @return The duplicate, you do not need to call `libmds_message_destroy`
@@ -88,36 +89,37 @@ void libmds_message_destroy(libmds_message_t* restrict this)
* @throws ENOMEM Out of memory. Possibly, the process hit the RLIMIT_AS or
* RLIMIT_DATA limit described in getrlimit(2).
*/
-libmds_message_t* libmds_message_duplicate(libmds_message_t* restrict this, libmds_mpool_t* restrict pool)
+libmds_message_t *
+libmds_message_duplicate(libmds_message_t *restrict this, libmds_mpool_t *restrict pool)
{
- size_t flattened_size, reused, i, n = this->header_count;
- libmds_message_t* rc;
-
- flattened_size = sizeof(libmds_message_t) + this->buffer_off * sizeof(char) + n * sizeof(void*);
- repoll:
- reused = 0;
- rc = pool == NULL ? NULL : libmds_mpool_poll(pool);
- if (rc != NULL)
- if ((reused = rc->flattened) < flattened_size)
- {
- free(rc);
- goto repoll;
- }
- if ((rc == NULL) && (rc = malloc(flattened_size), rc == NULL))
- return NULL;
-
- *rc = *this;
- rc->flattened = reused ? reused : flattened_size;
- rc->buffer_size = this->buffer_off;
-
- rc->buffer = ((char*)rc) + sizeof(libmds_message_t) / sizeof(char);
- rc->headers = rc->header_count ? (char**)(void*)(rc->buffer + this->buffer_off) : NULL;
- rc->payload = rc->payload_size ? (rc->buffer + (size_t)(this->payload - this->buffer)) : NULL;
- for (i = 0; i < n; i++)
- rc->headers[i] = rc->buffer + (size_t)(this->headers[i] - this->buffer);
-
- memcpy(rc->buffer, this->buffer, this->buffer_off * sizeof(char));
- return rc;
+ size_t flattened_size, reused, i, n = this->header_count;
+ libmds_message_t *rc;
+
+ flattened_size = sizeof(libmds_message_t) + this->buffer_off * sizeof(char) + n * sizeof(void*);
+repoll:
+ reused = 0;
+ rc = !pool ? NULL : libmds_mpool_poll(pool);
+ if (rc) {
+ if ((reused = rc->flattened) < flattened_size) {
+ free(rc);
+ goto repoll;
+ }
+ }
+ if (!rc && !(rc = malloc(flattened_size)))
+ return NULL;
+
+ *rc = *this;
+ rc->flattened = reused ? reused : flattened_size;
+ rc->buffer_size = this->buffer_off;
+
+ rc->buffer = ((char*)rc) + sizeof(libmds_message_t) / sizeof(char);
+ rc->headers = rc->header_count ? (char**)(void*)(rc->buffer + this->buffer_off) : NULL;
+ rc->payload = rc->payload_size ? (rc->buffer + (size_t)(this->payload - this->buffer)) : NULL;
+ for (i = 0; i < n; i++)
+ rc->headers[i] = rc->buffer + (size_t)(this->headers[i] - this->buffer);
+
+ memcpy(rc->buffer, this->buffer, this->buffer_off * sizeof(char));
+ return rc;
}
@@ -128,73 +130,71 @@ libmds_message_t* libmds_message_duplicate(libmds_message_t* restrict this, libm
* @param allow_modified_nul Whether Modified UTF-8 is allowed, which allows a two-byte encoding for NUL
* @return Zero if good, -1 on encoding error
*/
-__attribute__((nonnull, warn_unused_result)) /* Cannibalised from <libmdsserver/util.h>. */
-static int verify_utf8(const char* string, int allow_modified_nul)
+static int __attribute__((nonnull, warn_unused_result)) /* Cannibalised from <libmdsserver/util.h>. */
+verify_utf8(const char *string, int allow_modified_nul)
{
- static long BYTES_TO_MIN_BITS[] = {0, 0, 8, 12, 17, 22, 37};
- static long BYTES_TO_MAX_BITS[] = {0, 7, 11, 16, 21, 26, 31};
- long bytes = 0, read_bytes = 0, bits = 0, c, character;
-
- /* min bits max bits
- 0....... 0 7
- 110..... 10...... 8 11
- 1110.... 10...... 10...... 12 16
- 11110... 10...... 10...... 10...... 17 21
- 111110.. 10...... 10...... 10...... 10...... 22 26
- 1111110. 10...... 10...... 10...... 10...... 10...... 27 31
- */
-
- while ((c = (long)(*string++)))
- if (read_bytes == 0)
- {
- /* First byte of the character. */
-
- if ((c & 0x80) == 0x00)
- /* Single-byte character. */
- continue;
-
- if ((c & 0xC0) == 0x80)
- /* Single-byte character marked as multibyte, or
- a non-first byte in a multibyte character. */
- return -1;
-
- /* Multibyte character. */
- while ((c & 0x80))
- bytes++, c <<= 1;
- read_bytes = 1;
- character = c & 0x7F;
- if (bytes > 6)
- /* 31-bit characters can be encoded with 6-bytes,
- and UTF-8 does not cover higher code points. */
- return -1;
- }
- else
- {
- /* Not first byte of the character. */
-
- if ((c & 0xC0) != 0x80)
- /* Beginning of new character before a
- multibyte character has ended. */
- return -1;
-
- character = (character << 6) | (c & 0x7F);
-
- if (++read_bytes < bytes)
- /* Not at last byte yet. */
- continue;
-
- /* Check that the character is not unnecessarily long. */
- while (character)
- character >>= 1, bits++;
- bits = ((bits == 0) && (bytes == 2) && allow_modified_nul) ? 8 : bits;
- if ((bits < BYTES_TO_MIN_BITS[bytes]) || (BYTES_TO_MAX_BITS[bytes] < bits))
- return -1;
-
- read_bytes = bytes = bits = 0;
- }
-
- /* Make sure we did not stop at the middle of a multibyte character. */
- return read_bytes == 0 ? 0 : -1;
+ static long BYTES_TO_MIN_BITS[] = {0, 0, 8, 12, 17, 22, 37};
+ static long BYTES_TO_MAX_BITS[] = {0, 7, 11, 16, 21, 26, 31};
+ long bytes = 0, read_bytes = 0, bits = 0, c, character;
+
+ /* min bits max bits
+ 0....... 0 7
+ 110..... 10...... 8 11
+ 1110.... 10...... 10...... 12 16
+ 11110... 10...... 10...... 10...... 17 21
+ 111110.. 10...... 10...... 10...... 10...... 22 26
+ 1111110. 10...... 10...... 10...... 10...... 10...... 27 31
+ */
+
+ while ((c = (long)(*string++))) {
+ if (!read_bytes) {
+ /* First byte of the character. */
+
+ if (!(c & 0x80))
+ /* Single-byte character. */
+ continue;
+
+ if ((c & 0xC0) == 0x80)
+ /* Single-byte character marked as multibyte, or
+ a non-first byte in a multibyte character. */
+ return -1;
+
+ /* Multibyte character. */
+ while ((c & 0x80))
+ bytes++, c <<= 1;
+ read_bytes = 1;
+ character = c & 0x7F;
+ if (bytes > 6)
+ /* 31-bit characters can be encoded with 6-bytes,
+ and UTF-8 does not cover higher code points. */
+ return -1;
+ } else {
+ /* Not first byte of the character. */
+
+ if ((c & 0xC0) != 0x80)
+ /* Beginning of new character before a
+ multibyte character has ended. */
+ return -1;
+
+ character = (character << 6) | (c & 0x7F);
+
+ if (++read_bytes < bytes)
+ /* Not at last byte yet. */
+ continue;
+
+ /* Check that the character is not unnecessarily long. */
+ while (character)
+ character >>= 1, bits++;
+ bits = ((bits == 0) && (bytes == 2) && allow_modified_nul) ? 8 : bits;
+ if ((bits < BYTES_TO_MIN_BITS[bytes]) || (BYTES_TO_MAX_BITS[bytes] < bits))
+ return -1;
+
+ read_bytes = bytes = bits = 0;
+ }
+ }
+
+ /* Make sure we did not stop at the middle of a multibyte character. */
+ return read_bytes == 0 ? 0 : -1;
}
@@ -209,14 +209,14 @@ static int verify_utf8(const char* string, int allow_modified_nul)
* @throws ENOMEM Out of memory. Possibly, the process hit the RLIMIT_AS or
* RLIMIT_DATA limit described in getrlimit(2).
*/
-__attribute__((nonnull, warn_unused_result))
-static int extend_headers(libmds_message_t* restrict this, size_t extent)
+static int __attribute__((nonnull, warn_unused_result))
+extend_headers(libmds_message_t *restrict this, size_t extent)
{
- char** new_headers = realloc(this->headers, (this->header_count + extent) * sizeof(char*));
- if (new_headers == NULL)
- return -1;
- this->headers = new_headers;
- return 0;
+ char **new_headers = realloc(this->headers, (this->header_count + extent) * sizeof(char *));
+ if (!new_headers)
+ return -1;
+ this->headers = new_headers;
+ return 0;
}
@@ -230,19 +230,19 @@ static int extend_headers(libmds_message_t* restrict this, size_t extent)
* @throws ENOMEM Out of memory. Possibly, the process hit the RLIMIT_AS or
* RLIMIT_DATA limit described in getrlimit(2).
*/
-__attribute__((nonnull, warn_unused_result))
-static int extend_buffer(libmds_message_t* restrict this, int shift)
+static int __attribute__((nonnull, warn_unused_result))
+extend_buffer(libmds_message_t *restrict this, int shift)
{
- size_t i, n = this->header_count;
- char* new_buf = realloc(this->buffer, (this->buffer_size << shift) * sizeof(char));
- if (new_buf == NULL)
- return -1;
- if (new_buf != this->buffer)
- for (i = 0; i < n; i++)
- this->headers[i] = new_buf + (size_t)(this->headers[i] - this->buffer);
- this->buffer = new_buf;
- this->buffer_size <<= shift;
- return 0;
+ size_t i, n = this->header_count;
+ char *new_buf = realloc(this->buffer, (this->buffer_size << shift) * sizeof(char));
+ if (!new_buf)
+ return -1;
+ if (new_buf != this->buffer)
+ for (i = 0; i < n; i++)
+ this->headers[i] = new_buf + (size_t)(this->headers[i] - this->buffer);
+ this->buffer = new_buf;
+ this->buffer_size <<= shift;
+ return 0;
}
/**
@@ -250,22 +250,22 @@ static int extend_buffer(libmds_message_t* restrict this, int shift)
*
* @param this The message
*/
-__attribute__((nonnull))
-static void reset_message(libmds_message_t* restrict this)
+static void __attribute__((nonnull))
+reset_message(libmds_message_t *restrict this)
{
- size_t overrun = this->buffer_ptr - this->buffer_off;
-
- if (overrun)
- memmove(this->buffer, this->buffer + this->buffer_off, overrun * sizeof(char));
- this->buffer_ptr -= this->buffer_off;
- this->buffer_off = 0;
-
- free(this->headers);
- this->headers = NULL;
- this->header_count = 0;
-
- this->payload = NULL;
- this->payload_size = 0;
+ size_t overrun = this->buffer_ptr - this->buffer_off;
+
+ if (overrun)
+ memmove(this->buffer, this->buffer + this->buffer_off, overrun * sizeof(char));
+ this->buffer_ptr -= this->buffer_off;
+ this->buffer_off = 0;
+
+ free(this->headers);
+ this->headers = NULL;
+ this->header_count = 0;
+
+ this->payload = NULL;
+ this->payload_size = 0;
}
@@ -275,29 +275,29 @@ static void reset_message(libmds_message_t* restrict this)
* @param this The message
* @return Zero on success, negative on error (malformated message: unrecoverable state)
*/
-__attribute__((pure, nonnull, warn_unused_result))
-static int get_payload_length(libmds_message_t* restrict this)
+static int __attribute__((pure, nonnull, warn_unused_result))
+get_payload_length(libmds_message_t *restrict this)
{
- char* header;
- size_t i;
-
- for (i = 0; i < this->header_count; i++)
- if (strstr(this->headers[i], "Length: ") == this->headers[i])
- {
- /* Store the message length. */
- header = this->headers[i] + static_strlen("Length: ");
- this->payload_size = (size_t)atoll(header);
-
- /* Do not except a length that is not correctly formated. */
- for (; *header; header++)
- if ((*header < '0') || ('9' < *header))
- return -2; /* Malformated value, enters unrecoverable state. */
-
- /* Stop searching for the ‘Length’ header, we have found and parsed it. */
- break;
- }
-
- return 0;
+ char *header;
+ size_t i;
+
+ for (i = 0; i < this->header_count; i++) {
+ if (strstr(this->headers[i], "Length: ") == this->headers[i]) {
+ /* Store the message length. */
+ header = this->headers[i] + static_strlen("Length: ");
+ this->payload_size = (size_t)atoll(header);
+
+ /* Do not except a length that is not correctly formated. */
+ for (; *header; header++)
+ if (*header < '0' || '9' < *header)
+ return -2; /* Malformated value, enters unrecoverable state. */
+
+ /* Stop searching for the ‘Length’ header, we have found and parsed it. */
+ break;
+ }
+ }
+
+ return 0;
}
@@ -308,21 +308,21 @@ static int get_payload_length(libmds_message_t* restrict this)
* @param length The length of the header
* @return Zero if valid, negative if invalid (malformated message: unrecoverable state)
*/
-__attribute__((pure, nonnull, warn_unused_result))
-static int validate_header(const char* header, size_t length)
+static int __attribute__((pure, nonnull, warn_unused_result))
+validate_header(const char *header, size_t length)
{
- char* p = memchr(header, ':', length * sizeof(char));
-
- if (verify_utf8(header, 0) < 0)
- /* Either the string is not UTF-8, or your are under an UTF-8 attack,
- let's just call this unrecoverable because the client will not correct. */
- return -2;
-
- if ((p == NULL) || /* Buck you, rawmemchr should not segfault the program. */
- (p[1] != ' ')) /* Also an invalid format. ' ' is mandated after the ':'. */
- return -2;
-
- return 0;
+ char *p = memchr(header, ':', length * sizeof(char));
+
+ if (verify_utf8(header, 0) < 0)
+ /* Either the string is not UTF-8, or your are under an UTF-8 attack,
+ let's just call this unrecoverable because the client will not correct. */
+ return -2;
+
+ if (!p || /* Buck you, rawmemchr should not segfault the program. */
+ (p[1] != ' ')) /* Also an invalid format. ' ' is mandated after the ':'. */
+ return -2;
+
+ return 0;
}
@@ -336,28 +336,28 @@ static int validate_header(const char* header, size_t length)
* @throws ENOMEM Out of memory. Possibly, the process hit the RLIMIT_AS or
* RLIMIT_DATA limit described in getrlimit(2).
*/
-__attribute__((nonnull))
-static int initialise_payload(libmds_message_t* restrict this)
+static int __attribute__((nonnull))
+initialise_payload(libmds_message_t *restrict this)
{
- int shift = 0;
-
- /* Skip over the \n (end of empty line) we found from the buffer. */
- this->buffer_off++;
-
- /* Get the length of the payload. */
- if (get_payload_length(this) < 0)
- return -2; /* Malformated value, enters unrecoverable state. */
-
- /* Reallocate the buffer if it is too small. */
- while (this->buffer_off + this->payload_size > this->buffer_size << shift)
- shift++;
- if (shift ? (extend_buffer(this, shift) < 0) : 0)
- return -1;
-
- /* Set pointer to payload. */
- this->payload = this->buffer + this->buffer_off;
-
- return 0;
+ int shift = 0;
+
+ /* Skip over the \n (end of empty line) we found from the buffer. */
+ this->buffer_off++;
+
+ /* Get the length of the payload. */
+ if (get_payload_length(this) < 0)
+ return -2; /* Malformated value, enters unrecoverable state. */
+
+ /* Reallocate the buffer if it is too small. */
+ while (this->buffer_off + this->payload_size > this->buffer_size << shift)
+ shift++;
+ if (shift ? (extend_buffer(this, shift) < 0) : 0)
+ return -1;
+
+ /* Set pointer to payload. */
+ this->payload = this->buffer + this->buffer_off;
+
+ return 0;
}
@@ -371,28 +371,28 @@ static int initialise_payload(libmds_message_t* restrict this)
* @throws ENOMEM Out of memory. Possibly, the process hit the RLIMIT_AS or
* RLIMIT_DATA limit described in getrlimit(2).
*/
-__attribute__((nonnull, warn_unused_result))
-static int store_header(libmds_message_t* restrict this, size_t length)
+static int __attribute__((nonnull, warn_unused_result))
+store_header(libmds_message_t *restrict this, size_t length)
{
- char* header;
-
- /* Get pointer to header in buffer. */
- header = this->buffer + this->buffer_off;
- /* NUL-terminate the header. */
- header[length - 1] = '\0';
-
- /* Update read offset. */
- this->buffer += length;
-
- /* Make sure the the header syntax is correct so that
- the program does not need to care about it. */
- if (validate_header(header, length))
- return -2;
-
- /* Store the header in the header list. */
- this->headers[this->header_count++] = header;
-
- return 0;
+ char *header;
+
+ /* Get pointer to header in buffer. */
+ header = this->buffer + this->buffer_off;
+ /* NUL-terminate the header. */
+ header[length - 1] = '\0';
+
+ /* Update read offset. */
+ this->buffer += length;
+
+ /* Make sure the the header syntax is correct so that
+ the program does not need to care about it. */
+ if (validate_header(header, length))
+ return -2;
+
+ /* Store the header in the header list. */
+ this->headers[this->header_count++] = header;
+
+ return 0;
}
@@ -407,36 +407,35 @@ static int store_header(libmds_message_t* restrict this, size_t length)
* RLIMIT_DATA limit described in getrlimit(2).
* @throws Any error specified for recv(3)
*/
-__attribute__((nonnull))
-static int continue_read(libmds_message_t* restrict this, int fd)
+static int __attribute__((nonnull))
+continue_read(libmds_message_t *restrict this, int fd)
{
- size_t n;
- ssize_t got;
- int r;
-
- /* Figure out how much space we have left in the read buffer. */
- n = this->buffer_size - this->buffer_ptr;
-
- /* If we do not have too much left, */
- if (n < 128)
- {
- /* grow the buffer, */
- try (extend_buffer(this, 1));
+ size_t n;
+ ssize_t got;
+ int r;
+
+ /* Figure out how much space we have left in the read buffer. */
+ n = this->buffer_size - this->buffer_ptr;
+
+ /* If we do not have too much left, */
+ if (n < 128) {
+ /* grow the buffer, */
+ try (extend_buffer(this, 1));
- /* and recalculate how much space we have left. */
- n = this->buffer_size - this->buffer_ptr;
- }
-
- /* Then read from the socket. */
- errno = 0;
- got = recv(fd, this->buffer + this->buffer_ptr, n, 0);
- this->buffer_ptr += (size_t)(got < 0 ? 0 : got);
- if (errno)
- return -1;
- if (got == 0)
- return errno = ECONNRESET, -1;
-
- return 0;
+ /* and recalculate how much space we have left. */
+ n = this->buffer_size - this->buffer_ptr;
+ }
+
+ /* Then read from the socket. */
+ errno = 0;
+ got = recv(fd, this->buffer + this->buffer_ptr, n, 0);
+ this->buffer_ptr += (size_t)(got < 0 ? 0 : got);
+ if (errno)
+ return -1;
+ if (!got)
+ return errno = ECONNRESET, -1;
+
+ return 0;
}
@@ -457,77 +456,71 @@ static int continue_read(libmds_message_t* restrict this, int fd)
* RLIMIT_DATA limit described in getrlimit(2).
* @throws Any error specified for recv(3)
*/
-int libmds_message_read(libmds_message_t* restrict this, int fd)
+int
+libmds_message_read(libmds_message_t *restrict this, int fd)
{
- size_t header_commit_buffer = 0;
- int r;
-
- /* If we are at stage 2, we are done and it is time to start over.
- This is important because the function could have been interrupted. */
- if (this->stage == 2)
- {
- reset_message(this);
- this->stage = 0;
- }
-
- /* Read from file descriptor until we have a full message. */
- for (;;)
- {
- char* p;
- size_t length;
-
- /* Stage 0: headers. */
- /* Read all headers that we have stored into the read buffer. */
- while ((this->stage == 0) &&
- ((p = memchr(this->buffer + this->buffer_off, '\n',
- (this->buffer_ptr - this->buffer_off) * sizeof(char))) != NULL))
- if ((length = (size_t)(p - (this->buffer + this->buffer_off))))
- {
- /* We have found a header. */
-
- /* On every eighth header found with this function call,
- we prepare the header list for eight more headers so
- that it does not need to be reallocated again and again. */
- if (header_commit_buffer == 0)
- try (extend_headers(this, header_commit_buffer = 8));
-
- /* Store header. */
- try (store_header(this, length + 1));
- header_commit_buffer -= 1;
- }
- else
- {
- /* We have found an empty line, i.e. the end of the headers. */
-
- /* Make sure the full payload fits the buffer, and set
- * the payload buffer pointer. */
- try (initialise_payload(this));
-
- /* Mark end of stage, next stage is getting the payload. */
- this->stage = 1;
- }
-
-
- /* Stage 1: payload. */
- if ((this->stage == 1) && (this->buffer_ptr - this->buffer_off >= this->payload_size))
- {
- /* If we have filled the payload (or there was no payload),
- mark the end of this stage, i.e. that the message is
- complete, and return with success. */
- this->stage = 2;
-
- /* Mark the end of the message. */
- this->buffer_off += this->payload_size;
-
- return 0;
+ size_t header_commit_buffer = 0;
+ int r;
+ char *p;
+ size_t length;
+
+ /* If we are at stage 2, we are done and it is time to start over.
+ This is important because the function could have been interrupted. */
+ if (this->stage == 2) {
+ reset_message(this);
+ this->stage = 0;
+ }
+
+ /* Read from file descriptor until we have a full message. */
+ for (;;) {
+ /* Stage 0: headers. */
+ /* Read all headers that we have stored into the read buffer. */
+ while (!this->stage && ((p = memchr(this->buffer + this->buffer_off, '\n',
+ (this->buffer_ptr - this->buffer_off) * sizeof(char))))) {
+ if ((length = (size_t)(p - (this->buffer + this->buffer_off)))) {
+ /* We have found a header. */
+
+ /* On every eighth header found with this function call,
+ we prepare the header list for eight more headers so
+ that it does not need to be reallocated again and again. */
+ if (!header_commit_buffer)
+ try (extend_headers(this, header_commit_buffer = 8));
+
+ /* Store header. */
+ try (store_header(this, length + 1));
+ header_commit_buffer -= 1;
+ } else {
+ /* We have found an empty line, i.e. the end of the headers. */
+
+ /* Make sure the full payload fits the buffer, and set
+ * the payload buffer pointer. */
+ try (initialise_payload(this));
+
+ /* Mark end of stage, next stage is getting the payload. */
+ this->stage = 1;
+ }
+ }
+
+
+ /* Stage 1: payload. */
+ if (this->stage == 1 && this->buffer_ptr - this->buffer_off >= this->payload_size) {
+ /* If we have filled the payload (or there was no payload),
+ mark the end of this stage, i.e. that the message is
+ complete, and return with success. */
+ this->stage = 2;
+
+ /* Mark the end of the message. */
+ this->buffer_off += this->payload_size;
+
+ return 0;
+ }
+
+
+ /* If stage 1 was not completed. */
+
+ /* Continue reading from the socket into the buffer. */
+ try (continue_read(this, fd));
}
-
-
- /* If stage 1 was not completed. */
-
- /* Continue reading from the socket into the buffer. */
- try (continue_read(this, fd));
- }
}
@@ -541,29 +534,30 @@ int libmds_message_read(libmds_message_t* restrict this, int fd)
* @throws ENOMEM Out of memory. Possibly, the process hit the RLIMIT_AS or
* RLIMIT_DATA limit described in getrlimit(2).
*/
-int libmds_mspool_initialise(libmds_mspool_t* restrict this)
+int
+libmds_mspool_initialise(libmds_mspool_t *restrict this)
{
- int stage = 0, saved_errno;
- this->size = 1;
- this->head = 0;
- this->tail = 0;
- this->spooled_bytes = 0;
- this->spool_limit_bytes = 4 << 10;
- this->spool_limit_messages = 8;
- this->please_post = 0;
- this->messages = malloc(sizeof(libmds_message_t*));
- if (this->messages == NULL)
- return -1;
- if (sem_init(&(this->lock), 0, 1) < 0) goto fail; else stage++;
- if (sem_init(&(this->semaphore), 0, 0) < 0) goto fail; else stage++;
- if (sem_init(&(this->wait_semaphore), 0, 0) < 0) goto fail;
- return 0;
+ int stage = 0, saved_errno;
+ this->size = 1;
+ this->head = 0;
+ this->tail = 0;
+ this->spooled_bytes = 0;
+ this->spool_limit_bytes = 4 << 10;
+ this->spool_limit_messages = 8;
+ this->please_post = 0;
+ this->messages = malloc(sizeof(libmds_message_t*));
+ if (!this->messages)
+ return -1;
+ if (sem_init(&(this->lock), 0, 1) < 0) goto fail; else stage++;
+ if (sem_init(&(this->semaphore), 0, 0) < 0) goto fail; else stage++;
+ if (sem_init(&(this->wait_semaphore), 0, 0) < 0) goto fail;
+ return 0;
fail:
- saved_errno = errno;
- if (stage >= 1) sem_destroy(&(this->lock));
- if (stage >= 2) sem_destroy(&(this->semaphore));
- free(this->messages), this->messages = NULL;
- return errno = saved_errno, -1;
+ saved_errno = errno;
+ if (stage >= 1) sem_destroy(&(this->lock));
+ if (stage >= 2) sem_destroy(&(this->semaphore));
+ free(this->messages), this->messages = NULL;
+ return errno = saved_errno, -1;
}
@@ -572,17 +566,18 @@ int libmds_mspool_initialise(libmds_mspool_t* restrict this)
*
* @param this The message spool
*/
-void libmds_mspool_destroy(libmds_mspool_t* restrict this)
+void
+libmds_mspool_destroy(libmds_mspool_t *restrict this)
{
- if (this->messages == NULL)
- return;
- while (this->tail < this->head)
- free(this->messages[this->tail++]);
- sem_destroy(&(this->lock));
- sem_destroy(&(this->semaphore));
- sem_destroy(&(this->wait_semaphore));
- free(this->messages);
- this->messages = NULL;
+ if (!this->messages)
+ return;
+ while (this->tail < this->head)
+ free(this->messages[this->tail++]);
+ sem_destroy(&(this->lock));
+ sem_destroy(&(this->semaphore));
+ sem_destroy(&(this->wait_semaphore));
+ free(this->messages);
+ this->messages = NULL;
}
@@ -597,61 +592,59 @@ void libmds_mspool_destroy(libmds_mspool_t* restrict this)
* @throws ENOMEM Out of memory. Possibly, the process hit the RLIMIT_AS or
* RLIMIT_DATA limit described in getrlimit(2).
*/
-int libmds_mspool_spool(libmds_mspool_t* restrict this, libmds_message_t* restrict message)
+int
+libmds_mspool_spool(libmds_mspool_t *restrict this, libmds_message_t *restrict message)
{
- libmds_message_t** new;
- int saved_errno;
-
- start_over:
- /* Lock. */
- if (sem_wait(&(this->lock)) < 0)
- return -1;
-
- /* Block if spool is full. */
- if ((this->spooled_bytes >= this->spool_limit_bytes) ||
- (this->head - this->tail >= this->spool_limit_messages))
- {
- this->please_post++;
- if ((sem_post(&(this->lock)) < 0) || (sem_wait(&(this->wait_semaphore)) < 0))
- return this->please_post--, -1;
- goto start_over;
- }
-
- /* Rebase if the tail has come too far. */
- if (this->tail << 1 > this->size)
- {
- memmove(this->messages, this->messages + this->tail,
- (this->head - this->tail) * sizeof(sizeof(libmds_message_t*)));
- this->head -= this->tail, this->tail = 0;
- }
-
- /* Grow the spool if necessary. */
- if (this->size == this->head)
- {
- new = realloc(this->messages, (this->size << 1) * sizeof(libmds_message_t*));
- if (new == NULL)
- goto fail;
- this->messages = new;
- this->size <<= 1;
- }
-
- /* Spool. */
- this->spooled_bytes += message->flattened;
- this->messages[this->head++] = message;
-
- /* Signal. */
- if (sem_post(&(this->semaphore)) < 0)
- goto fail;
-
- /* Unlock. */
- if (sem_post(&(this->lock)) < 0)
- return -1;
-
- return 0;
- fail:
- saved_errno = errno;
- sem_post(&(this->lock));
- return errno = saved_errno, -1;
+ libmds_message_t **new;
+ int saved_errno;
+
+start_over:
+ /* Lock. */
+ if (sem_wait(&(this->lock)) < 0)
+ return -1;
+
+ /* Block if spool is full. */
+ if ((this->spooled_bytes >= this->spool_limit_bytes) ||
+ (this->head - this->tail >= this->spool_limit_messages)) {
+ this->please_post++;
+ if (sem_post(&this->lock) < 0 || sem_wait(&this->wait_semaphore) < 0)
+ return this->please_post--, -1;
+ goto start_over;
+ }
+
+ /* Rebase if the tail has come too far. */
+ if (this->tail << 1 > this->size) {
+ memmove(this->messages, this->messages + this->tail,
+ (this->head - this->tail) * sizeof(sizeof(libmds_message_t *)));
+ this->head -= this->tail, this->tail = 0;
+ }
+
+ /* Grow the spool if necessary. */
+ if (this->size == this->head) {
+ new = realloc(this->messages, (this->size << 1) * sizeof(libmds_message_t *));
+ if (!new)
+ goto fail;
+ this->messages = new;
+ this->size <<= 1;
+ }
+
+ /* Spool. */
+ this->spooled_bytes += message->flattened;
+ this->messages[this->head++] = message;
+
+ /* Signal. */
+ if (sem_post(&(this->semaphore)) < 0)
+ goto fail;
+
+ /* Unlock. */
+ if (sem_post(&(this->lock)) < 0)
+ return -1;
+
+ return 0;
+fail:
+ saved_errno = errno;
+ sem_post(&(this->lock));
+ return errno = saved_errno, -1;
}
@@ -661,38 +654,38 @@ int libmds_mspool_spool(libmds_mspool_t* restrict this, libmds_message_t* restri
* @param this The message spool, must not be empty
* @return A spooled message, `NULL`on error, `errno` will be set accordingly
*/
-static inline libmds_message_t* mspool_poll(libmds_mspool_t* restrict this)
+static inline libmds_message_t *
+mspool_poll(libmds_mspool_t *restrict this)
{
- libmds_message_t* msg;
-
- /* Lock. */
- if (sem_wait(&(this->lock)) < 0)
- return sem_post(&(this->semaphore)), NULL;
-
- /* Fetch message. */
- assert(this->tail < this->head);
- msg = this->messages[this->tail++];
- this->spooled_bytes -= msg->flattened;
-
- /* Unblock spooler, takes effect when this->lock is unlocked. */
- if (this->please_post)
- {
- if (sem_post(&(this->wait_semaphore)) < 0)
- perror("libmsdclient");
- else
- this->please_post--;
- }
-
- /* Unlock. */
- if (sem_post(&(this->lock)) < 0)
- goto fail;
-
- return msg;
- fail:
- sem_post(&(this->semaphore));
- this->messages[--(this->tail)] = msg;
- this->spooled_bytes += msg->flattened;
- return NULL;
+ libmds_message_t *msg;
+
+ /* Lock. */
+ if (sem_wait(&(this->lock)) < 0)
+ return sem_post(&(this->semaphore)), NULL;
+
+ /* Fetch message. */
+ assert(this->tail < this->head);
+ msg = this->messages[this->tail++];
+ this->spooled_bytes -= msg->flattened;
+
+ /* Unblock spooler, takes effect when this->lock is unlocked. */
+ if (this->please_post) {
+ if (sem_post(&(this->wait_semaphore)) < 0)
+ perror("libmsdclient");
+ else
+ this->please_post--;
+ }
+
+ /* Unlock. */
+ if (sem_post(&(this->lock)) < 0)
+ goto fail;
+
+ return msg;
+fail:
+ sem_post(&(this->semaphore));
+ this->messages[--(this->tail)] = msg;
+ this->spooled_bytes += msg->flattened;
+ return NULL;
}
@@ -704,13 +697,14 @@ static inline libmds_message_t* mspool_poll(libmds_mspool_t* restrict this)
*
* @throws EINTR If interrupted
*/
-libmds_message_t* libmds_mspool_poll(libmds_mspool_t* restrict this)
+libmds_message_t *
+libmds_mspool_poll(libmds_mspool_t *restrict this)
{
- /* Wait until there is a message available. */
- if (sem_wait(&(this->semaphore)) < 0)
- return NULL;
-
- return mspool_poll(this);
+ /* Wait until there is a message available. */
+ if (sem_wait(&(this->semaphore)) < 0)
+ return NULL;
+
+ return mspool_poll(this);
}
@@ -728,16 +722,16 @@ libmds_message_t* libmds_mspool_poll(libmds_mspool_t* restrict this)
* @throws EINVAL If `deadline->tv_nsecs` is outside [0, 1 milliard[
* @throws ETIMEDOUT If the time specified `deadline` passed and the spool was till empty
*/
-libmds_message_t* libmds_mspool_poll_try(libmds_mspool_t* restrict this,
- const struct timespec* restrict deadline)
+libmds_message_t *
+libmds_mspool_poll_try(libmds_mspool_t *restrict this, const struct timespec *restrict deadline)
{
- /* Is a message available? */
- if ((deadline == NULL) && (sem_trywait(&(this->semaphore)) < 0))
- return NULL;
- if ((deadline != NULL) && (sem_timedwait(&(this->semaphore), deadline) < 0))
- return NULL;
-
- return mspool_poll(this);
+ /* Is a message available? */
+ if (!deadline && sem_trywait(&this->semaphore) < 0)
+ return NULL;
+ if (deadline && sem_timedwait(&this->semaphore, deadline) < 0)
+ return NULL;
+
+ return mspool_poll(this);
}
@@ -752,21 +746,22 @@ libmds_message_t* libmds_mspool_poll_try(libmds_mspool_t* restrict this,
* @throws ENOMEM Out of memory. Possibly, the process hit the RLIMIT_AS or
* RLIMIT_DATA limit described in getrlimit(2).
*/
-int libmds_mpool_initialise(libmds_mpool_t* restrict this, size_t size)
+int
+libmds_mpool_initialise(libmds_mpool_t *restrict this, size_t size)
{
- int saved_errno;
- this->size = size;
- this->tip = 0;
- this->messages = malloc(size * sizeof(libmds_message_t*));
- if (this->messages == NULL)
- return -1;
- if (sem_init(&(this->lock), 0, 1) < 0)
- goto fail;
- return 0;
- fail:
- saved_errno = errno;
- free(this->messages), this->messages = NULL;
- return errno = saved_errno, -1;
+ int saved_errno;
+ this->size = size;
+ this->tip = 0;
+ this->messages = malloc(size * sizeof(libmds_message_t*));
+ if (!this->messages)
+ return -1;
+ if (sem_init(&this->lock, 0, 1) < 0)
+ goto fail;
+ return 0;
+fail:
+ saved_errno = errno;
+ free(this->messages), this->messages = NULL;
+ return errno = saved_errno, -1;
}
@@ -776,15 +771,16 @@ int libmds_mpool_initialise(libmds_mpool_t* restrict this, size_t size)
*
* @param this The message allocation pool
*/
-void libmds_mpool_destroy(libmds_mpool_t* restrict this)
+void
+libmds_mpool_destroy(libmds_mpool_t *restrict this)
{
- if (this->messages == NULL)
- return;
- while (this->tip--)
- free(this->messages[this->tip]);
- sem_destroy(&(this->lock));
- free(this->messages);
- this->messages = NULL;
+ if (!this->messages)
+ return;
+ while (this->tip--)
+ free(this->messages[this->tip]);
+ sem_destroy(&this->lock);
+ free(this->messages);
+ this->messages = NULL;
}
@@ -797,32 +793,32 @@ void libmds_mpool_destroy(libmds_mpool_t* restrict this)
* or `libmds_mspool_poll_try`)
* @return Zero on success, -1 on error, `errno` will be set accordingly
*/
-int libmds_mpool_offer(libmds_mpool_t* restrict this, libmds_message_t* restrict message)
+int
+libmds_mpool_offer(libmds_mpool_t *restrict this, libmds_message_t *restrict message)
{
- /* Discard if pool is full. */
- if (this->tip == this->size)
- return free(message), 0;
-
- /* Lock. */
- if (sem_wait(&(this->lock)) < 0)
- return -1;
-
- /* Discard if pool is still full. */
- if (this->tip == this->size)
- {
- free(message);
- goto unlock;
- }
-
- /* Pool allocation. */
- this->messages[this->tip++] = message;
-
- unlock:
- /* Unlock. */
- if (sem_post(&(this->lock)) < 0)
- return -1;
-
- return 0;
+ /* Discard if pool is full. */
+ if (this->tip == this->size)
+ return free(message), 0;
+
+ /* Lock. */
+ if (sem_wait(&this->lock) < 0)
+ return -1;
+
+ /* Discard if pool is still full. */
+ if (this->tip == this->size) {
+ free(message);
+ goto unlock;
+ }
+
+ /* Pool allocation. */
+ this->messages[this->tip++] = message;
+
+unlock:
+ /* Unlock. */
+ if (sem_post(&this->lock) < 0)
+ return -1;
+
+ return 0;
}
@@ -834,26 +830,27 @@ int libmds_mpool_offer(libmds_mpool_t* restrict this, libmds_message_t* restrict
* are available. If `NULL` is returned, `errno` is set to zero,
* if the pool was empty, otherwise `errno` will describe the error.
*/
-libmds_message_t* libmds_mpool_poll(libmds_mpool_t* restrict this)
+libmds_message_t *
+libmds_mpool_poll(libmds_mpool_t *restrict this)
{
- libmds_message_t* msg = NULL;
-
- /* Is the pool empty? Return nothing. */
- if (this->tip == 0)
- return errno = 0, NULL;
-
- /* Lock. */
- if (sem_wait(&(this->lock)) < 0)
- return NULL;
-
- /* Do we have a message? Take it */
- if (this->tip)
- msg = this->messages[--(this->tip)];
-
- /* Unlock. */
- if (sem_post(&(this->lock)) < 0)
- return this->tip--, NULL;
-
- return errno = 0, msg;
+ libmds_message_t *msg = NULL;
+
+ /* Is the pool empty? Return nothing. */
+ if (!this->tip)
+ return errno = 0, NULL;
+
+ /* Lock. */
+ if (sem_wait(&this->lock) < 0)
+ return NULL;
+
+ /* Do we have a message? Take it */
+ if (this->tip)
+ msg = this->messages[--(this->tip)];
+
+ /* Unlock. */
+ if (sem_post(&this->lock) < 0)
+ return this->tip--, NULL;
+
+ return errno = 0, msg;
}