aboutsummaryrefslogtreecommitdiffstats
path: root/src/mds-registry/registry.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/mds-registry/registry.c68
1 files changed, 28 insertions, 40 deletions
diff --git a/src/mds-registry/registry.c b/src/mds-registry/registry.c
index 6fed005..8c0132e 100644
--- a/src/mds-registry/registry.c
+++ b/src/mds-registry/registry.c
@@ -19,6 +19,7 @@
#include "util.h"
#include "globals.h"
+#include "slave.h"
#include "../mds-base.h"
@@ -31,6 +32,7 @@
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
+#include <pthread.h>
@@ -43,14 +45,14 @@
static int handle_close_message(void)
{
/* Servers do not close too often, there is no need to
- optimise this with another hash table. */
+ optimise this with another hash table. Doing so would
+ also require some caution because the keys are 32-bit
+ on 32-bit computers, and the client ID:s are 64-bit. */
size_t i, j, ptr = 0, size = 1;
size_t* keys = NULL;
size_t* old_keys;
- fail_if ((errno = pthread_mutex_lock(&reg_mutex)));
-
/* Remove server for all protocols. */
@@ -75,9 +77,19 @@ static int handle_close_message(void)
goto fail;
keys[ptr++] = entry->key;
}
+
+
+ /* Mark client as closed. */
+
+ close_slaves(client);
}
+ /* Close slaves those clients have closed. */
+
+ with_mutex (slave_mutex, pthread_cond_broadcast(&slave_cond););
+
+
/* Remove protocol that no longer have any supporting servers. */
for (i = 0; i < ptr; i++)
@@ -93,15 +105,12 @@ static int handle_close_message(void)
free(command);
}
- pthread_mutex_unlock(&reg_mutex);
-
free(keys);
return 0;
pfail:
perror(*argv);
fail:
free(keys);
- pthread_mutex_unlock(&reg_mutex);
return -1;
}
@@ -153,6 +162,9 @@ static int registry_action_add(int has_key, char* command, size_t command_key, u
}
}
+ /* Notify slaves. */
+ fail_if (advance_slaves(command));
+
return 0;
pfail:
perror(*argv);
@@ -258,11 +270,13 @@ static int registry_action(size_t length, int action, const char* recv_client_id
if (action == 0)
{
wait_set = malloc(sizeof(hash_table_t));
+ if (wait_set == NULL)
+ return -1;
if (hash_table_create(wait_set))
{
hash_table_destroy(wait_set, NULL, NULL);
free(wait_set);
- goto pfail;
+ return -1;
}
wait_set->key_comparator = (compare_func*)string_comparator;
wait_set->hasher = (hash_func*)string_hash;
@@ -292,8 +306,6 @@ static int registry_action(size_t length, int action, const char* recv_client_id
/* For all protocols in the payload, either add or remove
them from or to the protocl table or the wait set. */
- fail_if ((errno = pthread_mutex_lock(&reg_mutex)));
-
for (begin = 0; begin < length;)
{
char* end = rawmemchr(payload + begin, '\n');
@@ -305,28 +317,16 @@ static int registry_action(size_t length, int action, const char* recv_client_id
if (len > 0)
if (registry_action_act(command, action, client, wait_set))
- goto fail_in_mutex;
+ return -1;
}
- pthread_mutex_unlock(&reg_mutex);
-
/* If ‘Action: wait’, start a new thread that waits for the protocols and the responds. */
if (action == 0)
- {
- /* FIXME */
- }
+ return start_slave(wait_set, recv_client_id, recv_message_id);
return 0;
-
-
- pfail:
- perror(*argv);
- return -1;
- fail_in_mutex:
- pthread_mutex_unlock(&reg_mutex);
- return -1;
}
@@ -348,12 +348,11 @@ static int list_registry(const char* recv_client_id, const char* recv_message_id
if (send_buffer_size == 0)
{
- fail_if (xmalloc(send_buffer, 256, char));
+ if (xmalloc(send_buffer, 256, char))
+ return -1;
send_buffer_size = 256;
}
- fail_if ((errno = pthread_mutex_lock(&reg_mutex)));
-
/* Add all protocols to the send buffer. */
@@ -366,7 +365,7 @@ static int list_registry(const char* recv_client_id, const char* recv_message_id
/* Make sure the send buffer can fit all protocols. */
while (ptr + len + 1 >= send_buffer_size)
if (growalloc(old, send_buffer, send_buffer_size, char))
- goto fail_in_mutex;
+ return -1;
memcpy(send_buffer + ptr, command, len * sizeof(char));
ptr += len;
@@ -381,7 +380,7 @@ static int list_registry(const char* recv_client_id, const char* recv_message_id
while (ptr + i >= send_buffer_size)
if (growalloc(old, send_buffer, send_buffer_size, char))
- goto fail_in_mutex;
+ return -1;
/* Construct message headers. */
@@ -391,21 +390,10 @@ static int list_registry(const char* recv_client_id, const char* recv_message_id
/* Increase message ID. */
message_id = message_id == INT32_MAX ? 0 : (message_id + 1);
- pthread_mutex_unlock(&reg_mutex);
-
/* Send message. */
if (full_send(send_buffer + ptr, strlen(send_buffer + ptr)))
return 1;
return full_send(send_buffer, ptr);
-
-
- fail_in_mutex:
- pthread_mutex_unlock(&reg_mutex);
- return -1;
-
- pfail:
- perror(*argv);
- return -1;
}
@@ -413,7 +401,7 @@ static int list_registry(const char* recv_client_id, const char* recv_message_id
* Handle the received message containing ‘Command: register’-header–value
*
* @return Zero on success -1 on error or interruption,
- * errno will be set accordingly
+ * `errno` will be set accordingly
*/
static int handle_register_message(void)
{