aboutsummaryrefslogtreecommitdiffstats
path: root/libcoopgamma_synchronise.c
blob: 2b851162cebff59f68254fbcdc506f05a55e87f5 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
/* See LICENSE file for copyright and license details. */
#include "common.h"


/**
 * Wait for the next message to be received
 * 
 * @param   ctx       The state of the library, must be connected
 * @param   pending   Information for each pending request
 * @param   n         The number of elements in `pending`
 * @param   selected  The index of the element in `pending` which corresponds
 *                    to the first inbound message, note that this only means
 *                    that the message is not for any of the other request,
 *                    if the message is corrupt any of the listed requests can
 *                    be selected even if it is not for any of the requests.
 *                    Functions that parse the message will detect such corruption.
 * @return            Zero on success, -1 on error. If the the message is ignored,
 *                    which happens if corresponding `libcoopgamma_async_context_t`
 *                    is not listed, -1 is returned and `errno` is set to 0. If -1
 *                    is returned, `errno` is set to `ENOTRECOVERABLE` you have
 *                    received a corrupt message and the context has been tainted
 *                    beyond recover.
 */
int
libcoopgamma_synchronise(libcoopgamma_context_t *restrict ctx, libcoopgamma_async_context_t *restrict pending,
                         size_t n, size_t *restrict selected)
{
	char temp[3 * sizeof(size_t) + 1];
	ssize_t got;
	size_t i;
	char *p;
	char *line;
	char *value;
	struct pollfd pollfd;
	size_t new_size;
	void *new;

	if (ctx->inbound_head == ctx->inbound_tail) {
		ctx->inbound_head = ctx->inbound_tail = ctx->curline = 0;
	} else if (ctx->inbound_tail > 0) {
		memmove(ctx->inbound, ctx->inbound + ctx->inbound_tail, ctx->inbound_head -= ctx->inbound_tail);
		ctx->curline -= ctx->inbound_tail;
		ctx->inbound_tail = 0;
	}

	pollfd.fd = ctx->fd;
	pollfd.events = POLLIN | POLLRDNORM | POLLRDBAND | POLLPRI;

	if (ctx->inbound_head)
		goto skip_recv;
	for (;;) {
		if (ctx->inbound_head == ctx->inbound_size) {
			new_size = ctx->inbound_size ? (ctx->inbound_size << 1) : 1024U;
			new = realloc(ctx->inbound, new_size);
			if (!new)
				return -1;
			ctx->inbound = new;
			ctx->inbound_size = new_size;
		}

		if (ctx->blocking) {
			pollfd.revents = 0;
			if (poll(&pollfd, (nfds_t)1, -1) < 0)
				return -1;
		}
		got = recv(ctx->fd, &ctx->inbound[ctx->inbound_head], ctx->inbound_size - ctx->inbound_head, 0);
		if (got <= 0) {
			if (got == 0)
				errno = ECONNRESET;
			return -1;
		}

#ifdef DEBUG_MODE
		fprintf(stderr, "\033[32m");
		fwrite(&ctx->inbound[ctx->inbound_head], (size_t)got, 1U, stderr);
		fprintf(stderr, "\033[m");
		fflush(stderr);
#endif

		ctx->inbound_head += (size_t)got;

	skip_recv:
		while (!ctx->have_all_headers) {
			line = &ctx->inbound[ctx->curline];
			p = memchr(line, '\n', ctx->inbound_head - ctx->curline);
			if (!p)
				break;
			if (memchr(line, '\0', (size_t)(p - line)) != NULL)
				ctx->bad_message = 1;
			*p++ = '\0';
			ctx->curline = (size_t)(p - ctx->inbound);
			if (!*line) {
				ctx->have_all_headers = 1;
			} else if (strstr(line, "In response to: ") == line) {
				value = &line[sizeof("In response to: ") - 1U];
				ctx->in_response_to = (uint32_t)atol(value);
			} else if (strstr(line, "Length: ") == line) {
				value = &line[sizeof("Length: ") - 1U];
				ctx->length = (size_t)atol(value);
				sprintf(temp, "%zu", ctx->length);
				if (strcmp(value, temp))
					goto fatal;
			}
		}

		if (ctx->have_all_headers && ctx->inbound_head >= ctx->curline + ctx->length) {
			ctx->curline += ctx->length;
			if (ctx->bad_message) {
				ctx->bad_message = 0;
				ctx->have_all_headers = 0;
				ctx->length = 0;
				ctx->inbound_tail = ctx->curline;
				errno = EBADMSG;
				return -1;
			}
			for (i = 0; i < n; i++) {
				if (pending[i].message_id == ctx->in_response_to) {
					*selected = i;
					return 0;
				}
			}
			*selected = 0;
			ctx->bad_message = 0;
			ctx->have_all_headers = 0;
			ctx->length = 0;
			ctx->inbound_tail = ctx->curline;
			errno = 0;
			return -1;
		}
	}

fatal:
	errno = ENOTRECOVERABLE;
	return -1;
}