aboutsummaryrefslogtreecommitdiffstats
path: root/src/libcoopgamma.c
diff options
context:
space:
mode:
authorMattias Andrée <maandree@kth.se>2016-07-15 17:54:17 +0200
committerMattias Andrée <maandree@kth.se>2016-07-15 17:54:17 +0200
commit6c6a8d38751318d1eea8be8e69a0a8f23e596f4d (patch)
treeb8e2e62f404071b9cd4dd0d505d04a835a9e88f4 /src/libcoopgamma.c
parentm doc (diff)
downloadlibcoopgamma-6c6a8d38751318d1eea8be8e69a0a8f23e596f4d.tar.gz
libcoopgamma-6c6a8d38751318d1eea8be8e69a0a8f23e596f4d.tar.bz2
libcoopgamma-6c6a8d38751318d1eea8be8e69a0a8f23e596f4d.tar.xz
Implement libcoopgamma_synchronise
Signed-off-by: Mattias Andrée <maandree@kth.se>
Diffstat (limited to 'src/libcoopgamma.c')
-rw-r--r--src/libcoopgamma.c118
1 files changed, 106 insertions, 12 deletions
diff --git a/src/libcoopgamma.c b/src/libcoopgamma.c
index 36ffde2..175828f 100644
--- a/src/libcoopgamma.c
+++ b/src/libcoopgamma.c
@@ -754,18 +754,8 @@ int libcoopgamma_error_unmarshal(libcoopgamma_error_t* restrict this,
*/
int libcoopgamma_context_initialise(libcoopgamma_context_t* restrict this)
{
+ memset(this, 0, sizeof(*this));
this->fd = -1;
- libcoopgamma_error_initialise(&(this->error));
- this->message_id = 0;
- this->outbound = NULL;
- this->outbound_head = 0;
- this->outbound_tail = 0;
- this->outbound_size = 0;
- this->inbound = NULL;
- this->inbound_head = 0;
- this->inbound_tail = 0;
- this->inbound_size = 0;
- this->length = 0;
return 0;
}
@@ -815,6 +805,10 @@ size_t libcoopgamma_context_marshal(const libcoopgamma_context_t* restrict this,
marshal_prim(this->inbound_head - this->inbound_tail, size_t);
marshal_buffer(this->inbound + this->inbound_head, this->inbound_head - this->inbound_tail);
marshal_prim(this->length, size_t);
+ marshal_prim(this->curline, size_t);
+ marshal_prim(this->in_response_to, uint32_t);
+ marshal_prim(this->have_all_headers, int);
+ marshal_prim(this->bad_message, int);
MARSHAL_EPILOGUE;
}
@@ -848,6 +842,10 @@ int libcoopgamma_context_unmarshal(libcoopgamma_context_t* restrict this,
this->inbound_size = this->inbound_head;
unmarshal_buffer(this->inbound, this->inbound_head);
unmarshal_prim(this->length, size_t);
+ unmarshal_prim(this->curline, size_t);
+ unmarshal_prim(this->in_response_to, uint32_t);
+ unmarshal_prim(this->have_all_headers, int);
+ unmarshal_prim(this->bad_message, int);
UNMARSHAL_EPILOGUE;
}
@@ -1272,6 +1270,7 @@ char* libcoopgamma_get_socket_file(const char* restrict method, const char* rest
int libcoopgamma_connect(const char* restrict method, const char* restrict site,
libcoopgamma_context_t* restrict ctx)
{
+ /* TODO */
}
@@ -1355,12 +1354,105 @@ int libcoopgamma_flush(libcoopgamma_context_t* restrict ctx)
* Functions that parse the message will detect such corruption.
* @return Zero on success, -1 on error, -2 if the message is ignored
* which happens if corresponding `libcoopgamma_async_context_t`
- * is not listed
+ * is not listed. If `-1` is returned, `errno` will be set,
+ * if it is set to `ENOTRECOVERABLE` you have receive a corrupt
+ * message and the context has been tainted beyond recover.
*/
int libcoopgamma_synchronise(libcoopgamma_context_t* restrict ctx,
libcoopgamma_async_context_t* restrict pending,
size_t n, size_t* restrict selected)
{
+ char temp[3 * sizeof(size_t) + 1];
+ ssize_t got;
+ size_t i;
+ char* p;
+ char* line;
+ char* value;
+
+ if (ctx->inbound_head == ctx->inbound_tail)
+ ctx->inbound_head = ctx->inbound_tail = 0;
+
+ if (ctx->inbound_tail > 0)
+ {
+ memmove(ctx->inbound, ctx->inbound + ctx->inbound_tail, ctx->inbound_head -= ctx->inbound_tail);
+ ctx->inbound_tail = 0;
+ }
+
+ for (;;)
+ {
+ if (ctx->inbound_head == ctx->inbound_size)
+ {
+ size_t new_size = ctx->inbound_size ? (ctx->inbound_size << 1) : 1024;
+ void* new = realloc(ctx->inbound, new_size);
+ if (new == NULL)
+ return -1;
+ ctx->inbound = new;
+ ctx->inbound_size = new_size;
+ }
+
+ got = recv(ctx->fd, ctx->inbound + ctx->inbound_head, ctx->inbound_size - ctx->inbound_head, 0);
+ if (got <= 0)
+ {
+ if (got == 0)
+ errno = ECONNRESET;
+ return -1;
+ }
+ ctx->inbound_head += (size_t)got;
+
+ while (ctx->have_all_headers == 0)
+ {
+ line = ctx->inbound + ctx->curline;
+ p = memchr(line, '\n', ctx->inbound_head - ctx->curline);
+ if (p == NULL)
+ break;
+ if (memchr(line, '\0', ctx->inbound_head - ctx->curline) != NULL)
+ ctx->bad_message = 1;
+ *p++ = '\0';
+ ctx->curline += (size_t)(p - ctx->inbound);
+ if (!*line)
+ {
+ ctx->have_all_headers = 1;
+ }
+ else if (strstr(line, "In response to: ") == line)
+ {
+ value = line + (sizeof("In response to: ") - 1);
+ ctx->in_response_to = (uint32_t)atol(value);
+ }
+ else if (strstr(line, "Length: ") == line)
+ {
+ value = line + (sizeof("Length: ") - 1);
+ ctx->length = (size_t)atol(value);
+ sprintf(temp, "%zu", ctx->length);
+ if (strcmp(value, temp))
+ goto fatal;
+ }
+ }
+
+ if (ctx->have_all_headers && (ctx->inbound_head >= ctx->curline + ctx->length))
+ {
+ ctx->curline += ctx->length;
+ if (ctx->bad_message)
+ {
+ ctx->bad_message = 0;
+ ctx->have_all_headers = 0;
+ ctx->length = 0;
+ errno = EBADMSG;
+ return -1;
+ }
+ for (i = 0; i < n; i++)
+ if (pending[i].message_id == ctx->in_response_to)
+ {
+ *selected = i;
+ return 0;
+ }
+ *selected = 0;
+ return -2;
+ }
+ }
+
+ fatal:
+ errno = ENOTRECOVERABLE;
+ return -1;
}
@@ -1476,10 +1568,12 @@ static char* next_header(libcoopgamma_context_t* restrict ctx)
*/
static char* next_payload(libcoopgamma_context_t* restrict ctx, size_t* n)
{
+ ctx->have_all_headers = 0;
if ((*n = ctx->length))
{
char* rc = ctx->inbound + ctx->inbound_tail;
ctx->inbound_tail += *n;
+ ctx->length = 0;
return rc;
}
else