diff options
author | Mattias Andrée <maandree@operamail.com> | 2014-08-02 22:24:14 +0200 |
---|---|---|
committer | Mattias Andrée <maandree@operamail.com> | 2014-08-02 22:24:20 +0200 |
commit | 03deac86d4448e0422fefefea0018d6f5bea1a8d (patch) | |
tree | 3fbc9995599b241ad13b91d84e591d1f428f9f01 /src/mds-registry/slave.c | |
parent | mds-registry use timedwait in slaves so we can stop when reexecing of terminating (diff) | |
download | mds-03deac86d4448e0422fefefea0018d6f5bea1a8d.tar.gz mds-03deac86d4448e0422fefefea0018d6f5bea1a8d.tar.bz2 mds-03deac86d4448e0422fefefea0018d6f5bea1a8d.tar.xz |
mds-registry: add time-to-live
Signed-off-by: Mattias Andrée <maandree@operamail.com>
Diffstat (limited to '')
-rw-r--r-- | src/mds-registry/slave.c | 38 |
1 files changed, 37 insertions, 1 deletions
diff --git a/src/mds-registry/slave.c b/src/mds-registry/slave.c index 5a1063f..1be9322 100644 --- a/src/mds-registry/slave.c +++ b/src/mds-registry/slave.c @@ -81,6 +81,7 @@ static void* slave_loop(void* data) .tv_nsec = 0 }; slave_t* slave = data; + struct timespec now; if (slave->closed) goto done; @@ -94,6 +95,15 @@ static void* slave_loop(void* data) { if ((slave->wait_set->size == 0) || slave->closed) break; + if (slave->timed) + { + fail_if (monotone(&now)); + if (now.tv_sec > slave->dethklok.tv_sec) + break; + if (now.tv_sec == slave->dethklok.tv_sec) + if (now.tv_nsec >= slave->dethklok.tv_nsec) + break; + } pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout); } @@ -156,7 +166,7 @@ int start_created_slave(slave_t* restrict slave) int start_slave(hash_table_t* restrict wait_set, const char* restrict recv_client_id, const char* restrict recv_message_id) { slave_t* slave = slave_create(wait_set, recv_client_id, recv_message_id); - size_t slave_address; + size_t slave_address, i; ssize_t node = LINKED_LIST_UNUSED; fail_if (slave == NULL); @@ -167,6 +177,17 @@ int start_slave(hash_table_t* restrict wait_set, const char* restrict recv_clien if (slave->node == LINKED_LIST_UNUSED) goto pfail_in_mutex; + for (i = 0; i < received.header_count; i++) + if (startswith(received.headers[i], "Time to live: ")) + { + const char* ttl = received.headers[i] + strlen("Time to live: "); + slave->timed = 1; + if (monotone(&(slave->dethklok))) + goto pfail_in_mutex; + slave->dethklok.tv_sec += (time_t)atoll(ttl); + break; + } + if ((errno = pthread_create(&(slave->thread), NULL, slave_loop, (void*)(intptr_t)slave))) goto pfail_in_mutex; @@ -287,6 +308,9 @@ void slave_initialise(slave_t* restrict this) this->client_id = NULL; this->message_id = NULL; this->closed = 0; + this->dethklok.tv_sec = 0; + this->dethklok.tv_nsec = 0; + this->timed = 0; } @@ -324,6 +348,7 @@ size_t slave_marshal_size(const slave_t* restrict this) hash_entry_t* restrict entry; size_t n; + rc += sizeof(int) + sizeof(time_t) + sizeof(long); rc += (strlen(this->client_id) + strlen(this->message_id) + 2) * sizeof(char); foreach_hash_table_entry (*(this->wait_set), n, entry) @@ -352,6 +377,9 @@ size_t slave_marshal(const slave_t* restrict this, char* restrict data) buf_set_next(data, int, this->closed); buf_set_next(data, ssize_t, this->node); buf_set_next(data, uint64_t, this->client); + buf_set_next(data, int, this->timed); + buf_set_next(data, time_t, this->dethklok.tv_sec); + buf_set_next(data, long, this->dethklok.tv_nsec); memcpy(data, this->client_id, (strlen(this->client_id) + 1) * sizeof(char)); data += strlen(this->client_id) + 1; @@ -396,6 +424,9 @@ size_t slave_unmarshal(slave_t* restrict this, char* restrict data) buf_get_next(data, int, this->closed); buf_get_next(data, ssize_t, this->node); buf_get_next(data, uint64_t, this->client); + buf_get_next(data, int, this->timed); + buf_get_next(data, time_t, this->dethklok.tv_sec); + buf_get_next(data, long, this->dethklok.tv_nsec); n = (strlen((char*)data) + 1) * sizeof(char); if ((this->client_id = malloc(n)) == NULL) @@ -446,12 +477,17 @@ size_t slave_unmarshal(slave_t* restrict this, char* restrict data) size_t slave_unmarshal_skip(char* restrict data) { size_t n, m, rc = 2 * sizeof(int) + sizeof(ssize_t) + sizeof(size_t) + sizeof(uint64_t); + rc += sizeof(int) + sizeof(time_t) + sizeof(long); /* buf_get_next(data, int, SLAVE_T_VERSION); */ buf_next(data, int, 1); buf_next(data, int, 1); buf_next(data, ssize_t, 1); + buf_next(data, uint64_t, 1); + buf_next(data, int, 1); + buf_next(data, time_t, 1); + buf_next(data, long, 1); n = (strlen((char*)data) + 1) * sizeof(char); data += n, rc += n; |