diff options
author | Mattias Andrée <maandree@kth.se> | 2016-07-15 17:54:17 +0200 |
---|---|---|
committer | Mattias Andrée <maandree@kth.se> | 2016-07-15 17:54:17 +0200 |
commit | 6c6a8d38751318d1eea8be8e69a0a8f23e596f4d (patch) | |
tree | b8e2e62f404071b9cd4dd0d505d04a835a9e88f4 /src/libcoopgamma.c | |
parent | m doc (diff) | |
download | libcoopgamma-6c6a8d38751318d1eea8be8e69a0a8f23e596f4d.tar.gz libcoopgamma-6c6a8d38751318d1eea8be8e69a0a8f23e596f4d.tar.bz2 libcoopgamma-6c6a8d38751318d1eea8be8e69a0a8f23e596f4d.tar.xz |
Implement libcoopgamma_synchronise
Signed-off-by: Mattias Andrée <maandree@kth.se>
Diffstat (limited to 'src/libcoopgamma.c')
-rw-r--r-- | src/libcoopgamma.c | 118 |
1 files changed, 106 insertions, 12 deletions
diff --git a/src/libcoopgamma.c b/src/libcoopgamma.c index 36ffde2..175828f 100644 --- a/src/libcoopgamma.c +++ b/src/libcoopgamma.c @@ -754,18 +754,8 @@ int libcoopgamma_error_unmarshal(libcoopgamma_error_t* restrict this, */ int libcoopgamma_context_initialise(libcoopgamma_context_t* restrict this) { + memset(this, 0, sizeof(*this)); this->fd = -1; - libcoopgamma_error_initialise(&(this->error)); - this->message_id = 0; - this->outbound = NULL; - this->outbound_head = 0; - this->outbound_tail = 0; - this->outbound_size = 0; - this->inbound = NULL; - this->inbound_head = 0; - this->inbound_tail = 0; - this->inbound_size = 0; - this->length = 0; return 0; } @@ -815,6 +805,10 @@ size_t libcoopgamma_context_marshal(const libcoopgamma_context_t* restrict this, marshal_prim(this->inbound_head - this->inbound_tail, size_t); marshal_buffer(this->inbound + this->inbound_head, this->inbound_head - this->inbound_tail); marshal_prim(this->length, size_t); + marshal_prim(this->curline, size_t); + marshal_prim(this->in_response_to, uint32_t); + marshal_prim(this->have_all_headers, int); + marshal_prim(this->bad_message, int); MARSHAL_EPILOGUE; } @@ -848,6 +842,10 @@ int libcoopgamma_context_unmarshal(libcoopgamma_context_t* restrict this, this->inbound_size = this->inbound_head; unmarshal_buffer(this->inbound, this->inbound_head); unmarshal_prim(this->length, size_t); + unmarshal_prim(this->curline, size_t); + unmarshal_prim(this->in_response_to, uint32_t); + unmarshal_prim(this->have_all_headers, int); + unmarshal_prim(this->bad_message, int); UNMARSHAL_EPILOGUE; } @@ -1272,6 +1270,7 @@ char* libcoopgamma_get_socket_file(const char* restrict method, const char* rest int libcoopgamma_connect(const char* restrict method, const char* restrict site, libcoopgamma_context_t* restrict ctx) { + /* TODO */ } @@ -1355,12 +1354,105 @@ int libcoopgamma_flush(libcoopgamma_context_t* restrict ctx) * Functions that parse the message will detect such corruption. * @return Zero on success, -1 on error, -2 if the message is ignored * which happens if corresponding `libcoopgamma_async_context_t` - * is not listed + * is not listed. If `-1` is returned, `errno` will be set, + * if it is set to `ENOTRECOVERABLE` you have receive a corrupt + * message and the context has been tainted beyond recover. */ int libcoopgamma_synchronise(libcoopgamma_context_t* restrict ctx, libcoopgamma_async_context_t* restrict pending, size_t n, size_t* restrict selected) { + char temp[3 * sizeof(size_t) + 1]; + ssize_t got; + size_t i; + char* p; + char* line; + char* value; + + if (ctx->inbound_head == ctx->inbound_tail) + ctx->inbound_head = ctx->inbound_tail = 0; + + if (ctx->inbound_tail > 0) + { + memmove(ctx->inbound, ctx->inbound + ctx->inbound_tail, ctx->inbound_head -= ctx->inbound_tail); + ctx->inbound_tail = 0; + } + + for (;;) + { + if (ctx->inbound_head == ctx->inbound_size) + { + size_t new_size = ctx->inbound_size ? (ctx->inbound_size << 1) : 1024; + void* new = realloc(ctx->inbound, new_size); + if (new == NULL) + return -1; + ctx->inbound = new; + ctx->inbound_size = new_size; + } + + got = recv(ctx->fd, ctx->inbound + ctx->inbound_head, ctx->inbound_size - ctx->inbound_head, 0); + if (got <= 0) + { + if (got == 0) + errno = ECONNRESET; + return -1; + } + ctx->inbound_head += (size_t)got; + + while (ctx->have_all_headers == 0) + { + line = ctx->inbound + ctx->curline; + p = memchr(line, '\n', ctx->inbound_head - ctx->curline); + if (p == NULL) + break; + if (memchr(line, '\0', ctx->inbound_head - ctx->curline) != NULL) + ctx->bad_message = 1; + *p++ = '\0'; + ctx->curline += (size_t)(p - ctx->inbound); + if (!*line) + { + ctx->have_all_headers = 1; + } + else if (strstr(line, "In response to: ") == line) + { + value = line + (sizeof("In response to: ") - 1); + ctx->in_response_to = (uint32_t)atol(value); + } + else if (strstr(line, "Length: ") == line) + { + value = line + (sizeof("Length: ") - 1); + ctx->length = (size_t)atol(value); + sprintf(temp, "%zu", ctx->length); + if (strcmp(value, temp)) + goto fatal; + } + } + + if (ctx->have_all_headers && (ctx->inbound_head >= ctx->curline + ctx->length)) + { + ctx->curline += ctx->length; + if (ctx->bad_message) + { + ctx->bad_message = 0; + ctx->have_all_headers = 0; + ctx->length = 0; + errno = EBADMSG; + return -1; + } + for (i = 0; i < n; i++) + if (pending[i].message_id == ctx->in_response_to) + { + *selected = i; + return 0; + } + *selected = 0; + return -2; + } + } + + fatal: + errno = ENOTRECOVERABLE; + return -1; } @@ -1476,10 +1568,12 @@ static char* next_header(libcoopgamma_context_t* restrict ctx) */ static char* next_payload(libcoopgamma_context_t* restrict ctx, size_t* n) { + ctx->have_all_headers = 0; if ((*n = ctx->length)) { char* rc = ctx->inbound + ctx->inbound_tail; ctx->inbound_tail += *n; + ctx->length = 0; return rc; } else |