diff options
author | Mattias Andrée <m@maandree.se> | 2025-02-10 17:50:58 +0100 |
---|---|---|
committer | Mattias Andrée <m@maandree.se> | 2025-02-10 17:52:46 +0100 |
commit | ec1bcdcd0dd6e196303e8d9a30b3b2740e32c502 (patch) | |
tree | dcc759aaf897c915827659e00644f12503cf1268 /libcoopgamma_synchronise.c | |
parent | Improve makefile (diff) | |
download | libcoopgamma-ec1bcdcd0dd6e196303e8d9a30b3b2740e32c502.tar.gz libcoopgamma-ec1bcdcd0dd6e196303e8d9a30b3b2740e32c502.tar.bz2 libcoopgamma-ec1bcdcd0dd6e196303e8d9a30b3b2740e32c502.tar.xz |
Minor code improvements and split into multiple c files
Signed-off-by: Mattias Andrée <m@maandree.se>
Diffstat (limited to 'libcoopgamma_synchronise.c')
-rw-r--r-- | libcoopgamma_synchronise.c | 135 |
1 files changed, 135 insertions, 0 deletions
diff --git a/libcoopgamma_synchronise.c b/libcoopgamma_synchronise.c new file mode 100644 index 0000000..2b85116 --- /dev/null +++ b/libcoopgamma_synchronise.c @@ -0,0 +1,135 @@ +/* See LICENSE file for copyright and license details. */ +#include "common.h" + + +/** + * Wait for the next message to be received + * + * @param ctx The state of the library, must be connected + * @param pending Information for each pending request + * @param n The number of elements in `pending` + * @param selected The index of the element in `pending` which corresponds + * to the first inbound message, note that this only means + * that the message is not for any of the other request, + * if the message is corrupt any of the listed requests can + * be selected even if it is not for any of the requests. + * Functions that parse the message will detect such corruption. + * @return Zero on success, -1 on error. If the the message is ignored, + * which happens if corresponding `libcoopgamma_async_context_t` + * is not listed, -1 is returned and `errno` is set to 0. If -1 + * is returned, `errno` is set to `ENOTRECOVERABLE` you have + * received a corrupt message and the context has been tainted + * beyond recover. + */ +int +libcoopgamma_synchronise(libcoopgamma_context_t *restrict ctx, libcoopgamma_async_context_t *restrict pending, + size_t n, size_t *restrict selected) +{ + char temp[3 * sizeof(size_t) + 1]; + ssize_t got; + size_t i; + char *p; + char *line; + char *value; + struct pollfd pollfd; + size_t new_size; + void *new; + + if (ctx->inbound_head == ctx->inbound_tail) { + ctx->inbound_head = ctx->inbound_tail = ctx->curline = 0; + } else if (ctx->inbound_tail > 0) { + memmove(ctx->inbound, ctx->inbound + ctx->inbound_tail, ctx->inbound_head -= ctx->inbound_tail); + ctx->curline -= ctx->inbound_tail; + ctx->inbound_tail = 0; + } + + pollfd.fd = ctx->fd; + pollfd.events = POLLIN | POLLRDNORM | POLLRDBAND | POLLPRI; + + if (ctx->inbound_head) + goto skip_recv; + for (;;) { + if (ctx->inbound_head == ctx->inbound_size) { + new_size = ctx->inbound_size ? (ctx->inbound_size << 1) : 1024U; + new = realloc(ctx->inbound, new_size); + if (!new) + return -1; + ctx->inbound = new; + ctx->inbound_size = new_size; + } + + if (ctx->blocking) { + pollfd.revents = 0; + if (poll(&pollfd, (nfds_t)1, -1) < 0) + return -1; + } + got = recv(ctx->fd, &ctx->inbound[ctx->inbound_head], ctx->inbound_size - ctx->inbound_head, 0); + if (got <= 0) { + if (got == 0) + errno = ECONNRESET; + return -1; + } + +#ifdef DEBUG_MODE + fprintf(stderr, "\033[32m"); + fwrite(&ctx->inbound[ctx->inbound_head], (size_t)got, 1U, stderr); + fprintf(stderr, "\033[m"); + fflush(stderr); +#endif + + ctx->inbound_head += (size_t)got; + + skip_recv: + while (!ctx->have_all_headers) { + line = &ctx->inbound[ctx->curline]; + p = memchr(line, '\n', ctx->inbound_head - ctx->curline); + if (!p) + break; + if (memchr(line, '\0', (size_t)(p - line)) != NULL) + ctx->bad_message = 1; + *p++ = '\0'; + ctx->curline = (size_t)(p - ctx->inbound); + if (!*line) { + ctx->have_all_headers = 1; + } else if (strstr(line, "In response to: ") == line) { + value = &line[sizeof("In response to: ") - 1U]; + ctx->in_response_to = (uint32_t)atol(value); + } else if (strstr(line, "Length: ") == line) { + value = &line[sizeof("Length: ") - 1U]; + ctx->length = (size_t)atol(value); + sprintf(temp, "%zu", ctx->length); + if (strcmp(value, temp)) + goto fatal; + } + } + + if (ctx->have_all_headers && ctx->inbound_head >= ctx->curline + ctx->length) { + ctx->curline += ctx->length; + if (ctx->bad_message) { + ctx->bad_message = 0; + ctx->have_all_headers = 0; + ctx->length = 0; + ctx->inbound_tail = ctx->curline; + errno = EBADMSG; + return -1; + } + for (i = 0; i < n; i++) { + if (pending[i].message_id == ctx->in_response_to) { + *selected = i; + return 0; + } + } + *selected = 0; + ctx->bad_message = 0; + ctx->have_all_headers = 0; + ctx->length = 0; + ctx->inbound_tail = ctx->curline; + errno = 0; + return -1; + } + } + +fatal: + errno = ENOTRECOVERABLE; + return -1; +} |