aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/mds-registry.c149
1 files changed, 140 insertions, 9 deletions
diff --git a/src/mds-registry.c b/src/mds-registry.c
index 8bd7057..c7ac518 100644
--- a/src/mds-registry.c
+++ b/src/mds-registry.c
@@ -73,6 +73,26 @@ static int connected = 1;
*/
static hash_table_t reg_table;
+/**
+ * Reusable buffer for data to send
+ */
+static char* send_buffer = NULL;
+
+/**
+ * The size of `send_buffer`
+ */
+static size_t send_buffer_size = 0;
+
+/**
+ * General mutex
+ */
+static pthread_mutex_t reg_mutex;
+
+/**
+ * General condition
+ */
+static pthread_cond_t reg_cond;
+
/**
@@ -83,6 +103,19 @@ static hash_table_t reg_table;
*/
int __attribute__((const)) preinitialise_server(void)
{
+ if ((errno = pthread_mutex_init(&reg_mutex, NULL)))
+ {
+ perror(*argv);
+ return 1;
+ }
+
+ if ((errno = pthread_cond_init(&reg_cond, NULL)))
+ {
+ perror(*argv);
+ pthread_mutex_destroy(&reg_mutex);
+ return 1;
+ }
+
return 0;
}
@@ -102,7 +135,7 @@ int initialise_server(void)
"\n"
"Command: register\n"
"Client closed\n" /* TODO support not implemented yet */
-
+ /* -- NEXT MESSAGE -- */
"Command: reregister\n"
"Message ID: 1\n"
"\n";
@@ -194,6 +227,7 @@ int unmarshal_server(char* state_buf)
r = mds_message_unmarshal(&received, state_buf);
if (r)
mds_message_destroy(&received);
+ /* TODO unmarshal &reg_table */
reg_table.key_comparator = (compare_func*)string_comparator;
reg_table.hasher = (hash_func*)string_hash;
return r;
@@ -219,6 +253,8 @@ int __attribute__((const)) reexec_failure_recover(void)
*/
int master_loop(void)
{
+ int rc = 1;
+
while (!reexecing && !terminating)
{
int r = mds_message_read(&received, socket_fd);
@@ -248,15 +284,20 @@ int master_loop(void)
connected = 1;
}
- /* TODO if !reexecing or failing, cleanup reg_table */
-
- mds_message_destroy(&received);
- return 0;
+ rc = 0;
+ goto fail;
pfail:
perror(*argv);
fail:
+ if (rc || !reexecing)
+ {
+ /* TODO cleanup reg_table */
+ }
+ pthread_mutex_destroy(&reg_mutex);
+ pthread_cond_destroy(&reg_cond);
mds_message_destroy(&received);
- return 1;
+ free(send_buffer);
+ return rc;
}
@@ -400,6 +441,13 @@ int registry_action(size_t length, int action, const char* recv_client_id, const
payload[length] = '\n';
+ errno = pthread_mutex_lock(&reg_mutex);
+ if (errno)
+ {
+ perror(*argv);
+ return -1;
+ }
+
for (begin = 0; begin < length;)
{
char* end = rawmemchr(payload + begin, '\n');
@@ -416,14 +464,21 @@ int registry_action(size_t length, int action, const char* recv_client_id, const
size_t address = hash_table_get(&reg_table, command_key);
client_list_t* list = (client_list_t*)(void*)address;
if (client_list_add(list, client) < 0)
- return -1;
+ {
+ pthread_mutex_unlock(&reg_mutex);
+ return -1;
+ }
}
else
{
client_list_t* list = malloc(sizeof(client_list_t));
void* address = list;
if (list == NULL)
- return perror(*argv), -1;
+ {
+ perror(*argv);
+ pthread_mutex_unlock(&reg_mutex);
+ return -1;
+ }
if (client_list_create(list, 1) ||
client_list_add(list, client) ||
(hash_table_put(&reg_table, command_key, (size_t)address) == 0))
@@ -431,6 +486,7 @@ int registry_action(size_t length, int action, const char* recv_client_id, const
perror(*argv);
client_list_destroy(list);
free(list);
+ pthread_mutex_unlock(&reg_mutex);
return -1;
}
}
@@ -448,11 +504,14 @@ int registry_action(size_t length, int action, const char* recv_client_id, const
perror(*argv);
hash_table_destroy(wait_set, NULL, NULL);
free(wait_set);
+ pthread_mutex_unlock(&reg_mutex);
return -1;
}
}
}
+ pthread_mutex_unlock(&reg_mutex);
+
if (action == 0)
{
/* TODO */
@@ -472,7 +531,79 @@ int registry_action(size_t length, int action, const char* recv_client_id, const
*/
int list_registry(const char* recv_client_id, const char* recv_message_id)
{
- /* TODO */
+ size_t ptr = 0, i;
+
+ if (send_buffer_size == 0)
+ {
+ if (xmalloc(send_buffer, 256, char))
+ {
+ perror(*argv);
+ return -1;
+ }
+ send_buffer_size = 256;
+ }
+
+ errno = pthread_mutex_lock(&reg_mutex);
+ if (errno)
+ {
+ perror(*argv);
+ return -1;
+ }
+
+ for (i = 0; i < reg_table.capacity; i++)
+ {
+ hash_entry_t* bucket = reg_table.buckets[i];
+ for (; bucket != NULL; bucket = bucket->next)
+ {
+ size_t key = bucket->key;
+ char* command = (char*)(void*)key;
+ size_t len = strlen(command);
+
+ while (ptr + len + 1 >= send_buffer_size)
+ {
+ char* old = send_buffer;
+ if (xrealloc(send_buffer, send_buffer_size <<= 1, char))
+ {
+ send_buffer = old;
+ send_buffer_size >>= 1;
+ perror(*argv);
+ pthread_mutex_unlock(&reg_mutex);
+ return -1;
+ }
+ }
+
+ memcpy(send_buffer + ptr, command, len * sizeof(char));
+ ptr += len;
+ send_buffer[ptr++] = '\n';
+ }
+ }
+
+ i = strlen(recv_message_id) + strlen(recv_client_id) + 10 + 19;
+ i += strlen("To: %s\nIn response to: %s\nMessage ID: %" PRIi32 "\nLength: %" PRIu64 "\n\n");
+
+ while (ptr + i >= send_buffer_size)
+ {
+ char* old = send_buffer;
+ if (xrealloc(send_buffer, send_buffer_size <<= 1, char))
+ {
+ send_buffer = old;
+ send_buffer_size >>= 1;
+ perror(*argv);
+ pthread_mutex_unlock(&reg_mutex);
+ return -1;
+ }
+ }
+
+ sprintf(send_buffer + ptr, "To: %s\nIn response to: %s\nMessage ID: %" PRIi32 "\nLength: %" PRIu64 "\n\n",
+ recv_message_id, recv_client_id, message_id, ptr);
+
+ message_id = message_id == INT32_MAX ? 0 : (message_id + 1);
+
+ pthread_mutex_unlock(&reg_mutex);
+
+ if (full_send(send_buffer + ptr, strlen(send_buffer + ptr)))
+ return 1;
+ return full_send(send_buffer, ptr);
}