aboutsummaryrefslogtreecommitdiffstats
path: root/src/mds-registry/slave.c
diff options
context:
space:
mode:
authorMattias Andrée <maandree@operamail.com>2014-08-02 22:24:14 +0200
committerMattias Andrée <maandree@operamail.com>2014-08-02 22:24:20 +0200
commit03deac86d4448e0422fefefea0018d6f5bea1a8d (patch)
tree3fbc9995599b241ad13b91d84e591d1f428f9f01 /src/mds-registry/slave.c
parentmds-registry use timedwait in slaves so we can stop when reexecing of terminating (diff)
downloadmds-03deac86d4448e0422fefefea0018d6f5bea1a8d.tar.gz
mds-03deac86d4448e0422fefefea0018d6f5bea1a8d.tar.bz2
mds-03deac86d4448e0422fefefea0018d6f5bea1a8d.tar.xz
mds-registry: add time-to-live
Signed-off-by: Mattias Andrée <maandree@operamail.com>
Diffstat (limited to '')
-rw-r--r--src/mds-registry/slave.c38
1 files changed, 37 insertions, 1 deletions
diff --git a/src/mds-registry/slave.c b/src/mds-registry/slave.c
index 5a1063f..1be9322 100644
--- a/src/mds-registry/slave.c
+++ b/src/mds-registry/slave.c
@@ -81,6 +81,7 @@ static void* slave_loop(void* data)
.tv_nsec = 0
};
slave_t* slave = data;
+ struct timespec now;
if (slave->closed)
goto done;
@@ -94,6 +95,15 @@ static void* slave_loop(void* data)
{
if ((slave->wait_set->size == 0) || slave->closed)
break;
+ if (slave->timed)
+ {
+ fail_if (monotone(&now));
+ if (now.tv_sec > slave->dethklok.tv_sec)
+ break;
+ if (now.tv_sec == slave->dethklok.tv_sec)
+ if (now.tv_nsec >= slave->dethklok.tv_nsec)
+ break;
+ }
pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout);
}
@@ -156,7 +166,7 @@ int start_created_slave(slave_t* restrict slave)
int start_slave(hash_table_t* restrict wait_set, const char* restrict recv_client_id, const char* restrict recv_message_id)
{
slave_t* slave = slave_create(wait_set, recv_client_id, recv_message_id);
- size_t slave_address;
+ size_t slave_address, i;
ssize_t node = LINKED_LIST_UNUSED;
fail_if (slave == NULL);
@@ -167,6 +177,17 @@ int start_slave(hash_table_t* restrict wait_set, const char* restrict recv_clien
if (slave->node == LINKED_LIST_UNUSED)
goto pfail_in_mutex;
+ for (i = 0; i < received.header_count; i++)
+ if (startswith(received.headers[i], "Time to live: "))
+ {
+ const char* ttl = received.headers[i] + strlen("Time to live: ");
+ slave->timed = 1;
+ if (monotone(&(slave->dethklok)))
+ goto pfail_in_mutex;
+ slave->dethklok.tv_sec += (time_t)atoll(ttl);
+ break;
+ }
+
if ((errno = pthread_create(&(slave->thread), NULL, slave_loop, (void*)(intptr_t)slave)))
goto pfail_in_mutex;
@@ -287,6 +308,9 @@ void slave_initialise(slave_t* restrict this)
this->client_id = NULL;
this->message_id = NULL;
this->closed = 0;
+ this->dethklok.tv_sec = 0;
+ this->dethklok.tv_nsec = 0;
+ this->timed = 0;
}
@@ -324,6 +348,7 @@ size_t slave_marshal_size(const slave_t* restrict this)
hash_entry_t* restrict entry;
size_t n;
+ rc += sizeof(int) + sizeof(time_t) + sizeof(long);
rc += (strlen(this->client_id) + strlen(this->message_id) + 2) * sizeof(char);
foreach_hash_table_entry (*(this->wait_set), n, entry)
@@ -352,6 +377,9 @@ size_t slave_marshal(const slave_t* restrict this, char* restrict data)
buf_set_next(data, int, this->closed);
buf_set_next(data, ssize_t, this->node);
buf_set_next(data, uint64_t, this->client);
+ buf_set_next(data, int, this->timed);
+ buf_set_next(data, time_t, this->dethklok.tv_sec);
+ buf_set_next(data, long, this->dethklok.tv_nsec);
memcpy(data, this->client_id, (strlen(this->client_id) + 1) * sizeof(char));
data += strlen(this->client_id) + 1;
@@ -396,6 +424,9 @@ size_t slave_unmarshal(slave_t* restrict this, char* restrict data)
buf_get_next(data, int, this->closed);
buf_get_next(data, ssize_t, this->node);
buf_get_next(data, uint64_t, this->client);
+ buf_get_next(data, int, this->timed);
+ buf_get_next(data, time_t, this->dethklok.tv_sec);
+ buf_get_next(data, long, this->dethklok.tv_nsec);
n = (strlen((char*)data) + 1) * sizeof(char);
if ((this->client_id = malloc(n)) == NULL)
@@ -446,12 +477,17 @@ size_t slave_unmarshal(slave_t* restrict this, char* restrict data)
size_t slave_unmarshal_skip(char* restrict data)
{
size_t n, m, rc = 2 * sizeof(int) + sizeof(ssize_t) + sizeof(size_t) + sizeof(uint64_t);
+ rc += sizeof(int) + sizeof(time_t) + sizeof(long);
/* buf_get_next(data, int, SLAVE_T_VERSION); */
buf_next(data, int, 1);
buf_next(data, int, 1);
buf_next(data, ssize_t, 1);
+ buf_next(data, uint64_t, 1);
+ buf_next(data, int, 1);
+ buf_next(data, time_t, 1);
+ buf_next(data, long, 1);
n = (strlen((char*)data) + 1) * sizeof(char);
data += n, rc += n;