/* See LICENSE file for copyright and license details. */ #include "common.h" /** * Wait for the next message to be received * * @param ctx The state of the library, must be connected * @param pending Information for each pending request * @param n The number of elements in `pending` * @param selected The index of the element in `pending` which corresponds * to the first inbound message, note that this only means * that the message is not for any of the other request, * if the message is corrupt any of the listed requests can * be selected even if it is not for any of the requests. * Functions that parse the message will detect such corruption. * @return Zero on success, -1 on error. If the the message is ignored, * which happens if corresponding `libcoopgamma_async_context_t` * is not listed, -1 is returned and `errno` is set to 0. If -1 * is returned, `errno` is set to `ENOTRECOVERABLE` you have * received a corrupt message and the context has been tainted * beyond recover. */ int libcoopgamma_synchronise(libcoopgamma_context_t *restrict ctx, libcoopgamma_async_context_t *restrict pending, size_t n, size_t *restrict selected) { char temp[3 * sizeof(size_t) + 1]; ssize_t got; size_t i; char *p; char *line; char *value; struct pollfd pollfd; size_t new_size; void *new; if (ctx->inbound_head == ctx->inbound_tail) { ctx->inbound_head = ctx->inbound_tail = ctx->curline = 0; } else if (ctx->inbound_tail > 0) { memmove(ctx->inbound, ctx->inbound + ctx->inbound_tail, ctx->inbound_head -= ctx->inbound_tail); ctx->curline -= ctx->inbound_tail; ctx->inbound_tail = 0; } pollfd.fd = ctx->fd; pollfd.events = POLLIN | POLLRDNORM | POLLRDBAND | POLLPRI; if (ctx->inbound_head) goto skip_recv; for (;;) { if (ctx->inbound_head == ctx->inbound_size) { new_size = ctx->inbound_size ? (ctx->inbound_size << 1) : 1024U; new = realloc(ctx->inbound, new_size); if (!new) return -1; ctx->inbound = new; ctx->inbound_size = new_size; } if (ctx->blocking) { pollfd.revents = 0; if (poll(&pollfd, (nfds_t)1, -1) < 0) return -1; } got = recv(ctx->fd, &ctx->inbound[ctx->inbound_head], ctx->inbound_size - ctx->inbound_head, 0); if (got <= 0) { if (got == 0) errno = ECONNRESET; return -1; } #ifdef DEBUG_MODE fprintf(stderr, "\033[32m"); fwrite(&ctx->inbound[ctx->inbound_head], (size_t)got, 1U, stderr); fprintf(stderr, "\033[m"); fflush(stderr); #endif ctx->inbound_head += (size_t)got; skip_recv: while (!ctx->have_all_headers) { line = &ctx->inbound[ctx->curline]; p = memchr(line, '\n', ctx->inbound_head - ctx->curline); if (!p) break; if (memchr(line, '\0', (size_t)(p - line)) != NULL) ctx->bad_message = 1; *p++ = '\0'; ctx->curline = (size_t)(p - ctx->inbound); if (!*line) { ctx->have_all_headers = 1; } else if (strstr(line, "In response to: ") == line) { value = &line[sizeof("In response to: ") - 1U]; ctx->in_response_to = (uint32_t)atol(value); } else if (strstr(line, "Length: ") == line) { value = &line[sizeof("Length: ") - 1U]; ctx->length = (size_t)atol(value); sprintf(temp, "%zu", ctx->length); if (strcmp(value, temp)) goto fatal; } } if (ctx->have_all_headers && ctx->inbound_head >= ctx->curline + ctx->length) { ctx->curline += ctx->length; if (ctx->bad_message) { ctx->bad_message = 0; ctx->have_all_headers = 0; ctx->length = 0; ctx->inbound_tail = ctx->curline; errno = EBADMSG; return -1; } for (i = 0; i < n; i++) { if (pending[i].message_id == ctx->in_response_to) { *selected = i; return 0; } } *selected = 0; ctx->bad_message = 0; ctx->have_all_headers = 0; ctx->length = 0; ctx->inbound_tail = ctx->curline; errno = 0; return -1; } } fatal: errno = ENOTRECOVERABLE; return -1; }