aboutsummaryrefslogtreecommitdiffstats
path: root/src/mds-registry
diff options
context:
space:
mode:
authorMattias Andrée <maandree@operamail.com>2014-08-02 22:24:14 +0200
committerMattias Andrée <maandree@operamail.com>2014-08-02 22:24:20 +0200
commit03deac86d4448e0422fefefea0018d6f5bea1a8d (patch)
tree3fbc9995599b241ad13b91d84e591d1f428f9f01 /src/mds-registry
parentmds-registry use timedwait in slaves so we can stop when reexecing of terminating (diff)
downloadmds-03deac86d4448e0422fefefea0018d6f5bea1a8d.tar.gz
mds-03deac86d4448e0422fefefea0018d6f5bea1a8d.tar.bz2
mds-03deac86d4448e0422fefefea0018d6f5bea1a8d.tar.xz
mds-registry: add time-to-live
Signed-off-by: Mattias Andrée <maandree@operamail.com>
Diffstat (limited to '')
-rw-r--r--src/mds-registry/registry.c2
-rw-r--r--src/mds-registry/slave.c38
-rw-r--r--src/mds-registry/slave.h14
3 files changed, 51 insertions, 3 deletions
diff --git a/src/mds-registry/registry.c b/src/mds-registry/registry.c
index 80bfbe3..b678a4f 100644
--- a/src/mds-registry/registry.c
+++ b/src/mds-registry/registry.c
@@ -426,7 +426,7 @@ static int handle_register_message(void)
else
continue;
- /* Stop if we got all headers we recognised. */
+ /* Stop if we got all headers we recognised, except ‘Time to live’. */
if (recv_client_id && recv_message_id && recv_length && recv_action)
break;
}
diff --git a/src/mds-registry/slave.c b/src/mds-registry/slave.c
index 5a1063f..1be9322 100644
--- a/src/mds-registry/slave.c
+++ b/src/mds-registry/slave.c
@@ -81,6 +81,7 @@ static void* slave_loop(void* data)
.tv_nsec = 0
};
slave_t* slave = data;
+ struct timespec now;
if (slave->closed)
goto done;
@@ -94,6 +95,15 @@ static void* slave_loop(void* data)
{
if ((slave->wait_set->size == 0) || slave->closed)
break;
+ if (slave->timed)
+ {
+ fail_if (monotone(&now));
+ if (now.tv_sec > slave->dethklok.tv_sec)
+ break;
+ if (now.tv_sec == slave->dethklok.tv_sec)
+ if (now.tv_nsec >= slave->dethklok.tv_nsec)
+ break;
+ }
pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout);
}
@@ -156,7 +166,7 @@ int start_created_slave(slave_t* restrict slave)
int start_slave(hash_table_t* restrict wait_set, const char* restrict recv_client_id, const char* restrict recv_message_id)
{
slave_t* slave = slave_create(wait_set, recv_client_id, recv_message_id);
- size_t slave_address;
+ size_t slave_address, i;
ssize_t node = LINKED_LIST_UNUSED;
fail_if (slave == NULL);
@@ -167,6 +177,17 @@ int start_slave(hash_table_t* restrict wait_set, const char* restrict recv_clien
if (slave->node == LINKED_LIST_UNUSED)
goto pfail_in_mutex;
+ for (i = 0; i < received.header_count; i++)
+ if (startswith(received.headers[i], "Time to live: "))
+ {
+ const char* ttl = received.headers[i] + strlen("Time to live: ");
+ slave->timed = 1;
+ if (monotone(&(slave->dethklok)))
+ goto pfail_in_mutex;
+ slave->dethklok.tv_sec += (time_t)atoll(ttl);
+ break;
+ }
+
if ((errno = pthread_create(&(slave->thread), NULL, slave_loop, (void*)(intptr_t)slave)))
goto pfail_in_mutex;
@@ -287,6 +308,9 @@ void slave_initialise(slave_t* restrict this)
this->client_id = NULL;
this->message_id = NULL;
this->closed = 0;
+ this->dethklok.tv_sec = 0;
+ this->dethklok.tv_nsec = 0;
+ this->timed = 0;
}
@@ -324,6 +348,7 @@ size_t slave_marshal_size(const slave_t* restrict this)
hash_entry_t* restrict entry;
size_t n;
+ rc += sizeof(int) + sizeof(time_t) + sizeof(long);
rc += (strlen(this->client_id) + strlen(this->message_id) + 2) * sizeof(char);
foreach_hash_table_entry (*(this->wait_set), n, entry)
@@ -352,6 +377,9 @@ size_t slave_marshal(const slave_t* restrict this, char* restrict data)
buf_set_next(data, int, this->closed);
buf_set_next(data, ssize_t, this->node);
buf_set_next(data, uint64_t, this->client);
+ buf_set_next(data, int, this->timed);
+ buf_set_next(data, time_t, this->dethklok.tv_sec);
+ buf_set_next(data, long, this->dethklok.tv_nsec);
memcpy(data, this->client_id, (strlen(this->client_id) + 1) * sizeof(char));
data += strlen(this->client_id) + 1;
@@ -396,6 +424,9 @@ size_t slave_unmarshal(slave_t* restrict this, char* restrict data)
buf_get_next(data, int, this->closed);
buf_get_next(data, ssize_t, this->node);
buf_get_next(data, uint64_t, this->client);
+ buf_get_next(data, int, this->timed);
+ buf_get_next(data, time_t, this->dethklok.tv_sec);
+ buf_get_next(data, long, this->dethklok.tv_nsec);
n = (strlen((char*)data) + 1) * sizeof(char);
if ((this->client_id = malloc(n)) == NULL)
@@ -446,12 +477,17 @@ size_t slave_unmarshal(slave_t* restrict this, char* restrict data)
size_t slave_unmarshal_skip(char* restrict data)
{
size_t n, m, rc = 2 * sizeof(int) + sizeof(ssize_t) + sizeof(size_t) + sizeof(uint64_t);
+ rc += sizeof(int) + sizeof(time_t) + sizeof(long);
/* buf_get_next(data, int, SLAVE_T_VERSION); */
buf_next(data, int, 1);
buf_next(data, int, 1);
buf_next(data, ssize_t, 1);
+ buf_next(data, uint64_t, 1);
+ buf_next(data, int, 1);
+ buf_next(data, time_t, 1);
+ buf_next(data, long, 1);
n = (strlen((char*)data) + 1) * sizeof(char);
data += n, rc += n;
diff --git a/src/mds-registry/slave.h b/src/mds-registry/slave.h
index cd98f80..9f08f07 100644
--- a/src/mds-registry/slave.h
+++ b/src/mds-registry/slave.h
@@ -24,6 +24,7 @@
#include <stdlib.h>
#include <stdint.h>
#include <pthread.h>
+#include <time.h>
@@ -32,7 +33,7 @@
/**
* Slave information, a thread waiting for protocols to become available
*/
-typedef struct slave /* TODO: add time-to-live */
+typedef struct slave
{
/**
* Set of protocols for which to wait that they become available
@@ -69,6 +70,17 @@ typedef struct slave /* TODO: add time-to-live */
*/
pthread_t thread;
+ /**
+ * The time slave should die if its condition
+ * has not be meet at that time
+ */
+ struct timespec dethklok;
+
+ /**
+ * Whether `dethklok` should apply
+ */
+ int timed;
+
} slave_t;