From 03deac86d4448e0422fefefea0018d6f5bea1a8d Mon Sep 17 00:00:00 2001 From: Mattias Andrée Date: Sat, 2 Aug 2014 22:24:14 +0200 Subject: mds-registry: add time-to-live MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Mattias Andrée --- src/mds-registry/registry.c | 2 +- src/mds-registry/slave.c | 38 +++++++++++++++++++++++++++++++++++++- src/mds-registry/slave.h | 14 +++++++++++++- 3 files changed, 51 insertions(+), 3 deletions(-) (limited to 'src/mds-registry') diff --git a/src/mds-registry/registry.c b/src/mds-registry/registry.c index 80bfbe3..b678a4f 100644 --- a/src/mds-registry/registry.c +++ b/src/mds-registry/registry.c @@ -426,7 +426,7 @@ static int handle_register_message(void) else continue; - /* Stop if we got all headers we recognised. */ + /* Stop if we got all headers we recognised, except ‘Time to live’. */ if (recv_client_id && recv_message_id && recv_length && recv_action) break; } diff --git a/src/mds-registry/slave.c b/src/mds-registry/slave.c index 5a1063f..1be9322 100644 --- a/src/mds-registry/slave.c +++ b/src/mds-registry/slave.c @@ -81,6 +81,7 @@ static void* slave_loop(void* data) .tv_nsec = 0 }; slave_t* slave = data; + struct timespec now; if (slave->closed) goto done; @@ -94,6 +95,15 @@ static void* slave_loop(void* data) { if ((slave->wait_set->size == 0) || slave->closed) break; + if (slave->timed) + { + fail_if (monotone(&now)); + if (now.tv_sec > slave->dethklok.tv_sec) + break; + if (now.tv_sec == slave->dethklok.tv_sec) + if (now.tv_nsec >= slave->dethklok.tv_nsec) + break; + } pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout); } @@ -156,7 +166,7 @@ int start_created_slave(slave_t* restrict slave) int start_slave(hash_table_t* restrict wait_set, const char* restrict recv_client_id, const char* restrict recv_message_id) { slave_t* slave = slave_create(wait_set, recv_client_id, recv_message_id); - size_t slave_address; + size_t slave_address, i; ssize_t node = LINKED_LIST_UNUSED; fail_if (slave == NULL); @@ -167,6 +177,17 @@ int start_slave(hash_table_t* restrict wait_set, const char* restrict recv_clien if (slave->node == LINKED_LIST_UNUSED) goto pfail_in_mutex; + for (i = 0; i < received.header_count; i++) + if (startswith(received.headers[i], "Time to live: ")) + { + const char* ttl = received.headers[i] + strlen("Time to live: "); + slave->timed = 1; + if (monotone(&(slave->dethklok))) + goto pfail_in_mutex; + slave->dethklok.tv_sec += (time_t)atoll(ttl); + break; + } + if ((errno = pthread_create(&(slave->thread), NULL, slave_loop, (void*)(intptr_t)slave))) goto pfail_in_mutex; @@ -287,6 +308,9 @@ void slave_initialise(slave_t* restrict this) this->client_id = NULL; this->message_id = NULL; this->closed = 0; + this->dethklok.tv_sec = 0; + this->dethklok.tv_nsec = 0; + this->timed = 0; } @@ -324,6 +348,7 @@ size_t slave_marshal_size(const slave_t* restrict this) hash_entry_t* restrict entry; size_t n; + rc += sizeof(int) + sizeof(time_t) + sizeof(long); rc += (strlen(this->client_id) + strlen(this->message_id) + 2) * sizeof(char); foreach_hash_table_entry (*(this->wait_set), n, entry) @@ -352,6 +377,9 @@ size_t slave_marshal(const slave_t* restrict this, char* restrict data) buf_set_next(data, int, this->closed); buf_set_next(data, ssize_t, this->node); buf_set_next(data, uint64_t, this->client); + buf_set_next(data, int, this->timed); + buf_set_next(data, time_t, this->dethklok.tv_sec); + buf_set_next(data, long, this->dethklok.tv_nsec); memcpy(data, this->client_id, (strlen(this->client_id) + 1) * sizeof(char)); data += strlen(this->client_id) + 1; @@ -396,6 +424,9 @@ size_t slave_unmarshal(slave_t* restrict this, char* restrict data) buf_get_next(data, int, this->closed); buf_get_next(data, ssize_t, this->node); buf_get_next(data, uint64_t, this->client); + buf_get_next(data, int, this->timed); + buf_get_next(data, time_t, this->dethklok.tv_sec); + buf_get_next(data, long, this->dethklok.tv_nsec); n = (strlen((char*)data) + 1) * sizeof(char); if ((this->client_id = malloc(n)) == NULL) @@ -446,12 +477,17 @@ size_t slave_unmarshal(slave_t* restrict this, char* restrict data) size_t slave_unmarshal_skip(char* restrict data) { size_t n, m, rc = 2 * sizeof(int) + sizeof(ssize_t) + sizeof(size_t) + sizeof(uint64_t); + rc += sizeof(int) + sizeof(time_t) + sizeof(long); /* buf_get_next(data, int, SLAVE_T_VERSION); */ buf_next(data, int, 1); buf_next(data, int, 1); buf_next(data, ssize_t, 1); + buf_next(data, uint64_t, 1); + buf_next(data, int, 1); + buf_next(data, time_t, 1); + buf_next(data, long, 1); n = (strlen((char*)data) + 1) * sizeof(char); data += n, rc += n; diff --git a/src/mds-registry/slave.h b/src/mds-registry/slave.h index cd98f80..9f08f07 100644 --- a/src/mds-registry/slave.h +++ b/src/mds-registry/slave.h @@ -24,6 +24,7 @@ #include #include #include +#include @@ -32,7 +33,7 @@ /** * Slave information, a thread waiting for protocols to become available */ -typedef struct slave /* TODO: add time-to-live */ +typedef struct slave { /** * Set of protocols for which to wait that they become available @@ -69,6 +70,17 @@ typedef struct slave /* TODO: add time-to-live */ */ pthread_t thread; + /** + * The time slave should die if its condition + * has not be meet at that time + */ + struct timespec dethklok; + + /** + * Whether `dethklok` should apply + */ + int timed; + } slave_t; -- cgit v1.2.3-70-g09d2