diff options
Diffstat (limited to '')
-rw-r--r-- | src/mds-registry/registry.c | 68 |
1 files changed, 28 insertions, 40 deletions
diff --git a/src/mds-registry/registry.c b/src/mds-registry/registry.c index 6fed005..8c0132e 100644 --- a/src/mds-registry/registry.c +++ b/src/mds-registry/registry.c @@ -19,6 +19,7 @@ #include "util.h" #include "globals.h" +#include "slave.h" #include "../mds-base.h" @@ -31,6 +32,7 @@ #include <string.h> #include <stdio.h> #include <stdlib.h> +#include <pthread.h> @@ -43,14 +45,14 @@ static int handle_close_message(void) { /* Servers do not close too often, there is no need to - optimise this with another hash table. */ + optimise this with another hash table. Doing so would + also require some caution because the keys are 32-bit + on 32-bit computers, and the client ID:s are 64-bit. */ size_t i, j, ptr = 0, size = 1; size_t* keys = NULL; size_t* old_keys; - fail_if ((errno = pthread_mutex_lock(®_mutex))); - /* Remove server for all protocols. */ @@ -75,9 +77,19 @@ static int handle_close_message(void) goto fail; keys[ptr++] = entry->key; } + + + /* Mark client as closed. */ + + close_slaves(client); } + /* Close slaves those clients have closed. */ + + with_mutex (slave_mutex, pthread_cond_broadcast(&slave_cond);); + + /* Remove protocol that no longer have any supporting servers. */ for (i = 0; i < ptr; i++) @@ -93,15 +105,12 @@ static int handle_close_message(void) free(command); } - pthread_mutex_unlock(®_mutex); - free(keys); return 0; pfail: perror(*argv); fail: free(keys); - pthread_mutex_unlock(®_mutex); return -1; } @@ -153,6 +162,9 @@ static int registry_action_add(int has_key, char* command, size_t command_key, u } } + /* Notify slaves. */ + fail_if (advance_slaves(command)); + return 0; pfail: perror(*argv); @@ -258,11 +270,13 @@ static int registry_action(size_t length, int action, const char* recv_client_id if (action == 0) { wait_set = malloc(sizeof(hash_table_t)); + if (wait_set == NULL) + return -1; if (hash_table_create(wait_set)) { hash_table_destroy(wait_set, NULL, NULL); free(wait_set); - goto pfail; + return -1; } wait_set->key_comparator = (compare_func*)string_comparator; wait_set->hasher = (hash_func*)string_hash; @@ -292,8 +306,6 @@ static int registry_action(size_t length, int action, const char* recv_client_id /* For all protocols in the payload, either add or remove them from or to the protocl table or the wait set. */ - fail_if ((errno = pthread_mutex_lock(®_mutex))); - for (begin = 0; begin < length;) { char* end = rawmemchr(payload + begin, '\n'); @@ -305,28 +317,16 @@ static int registry_action(size_t length, int action, const char* recv_client_id if (len > 0) if (registry_action_act(command, action, client, wait_set)) - goto fail_in_mutex; + return -1; } - pthread_mutex_unlock(®_mutex); - /* If ‘Action: wait’, start a new thread that waits for the protocols and the responds. */ if (action == 0) - { - /* FIXME */ - } + return start_slave(wait_set, recv_client_id, recv_message_id); return 0; - - - pfail: - perror(*argv); - return -1; - fail_in_mutex: - pthread_mutex_unlock(®_mutex); - return -1; } @@ -348,12 +348,11 @@ static int list_registry(const char* recv_client_id, const char* recv_message_id if (send_buffer_size == 0) { - fail_if (xmalloc(send_buffer, 256, char)); + if (xmalloc(send_buffer, 256, char)) + return -1; send_buffer_size = 256; } - fail_if ((errno = pthread_mutex_lock(®_mutex))); - /* Add all protocols to the send buffer. */ @@ -366,7 +365,7 @@ static int list_registry(const char* recv_client_id, const char* recv_message_id /* Make sure the send buffer can fit all protocols. */ while (ptr + len + 1 >= send_buffer_size) if (growalloc(old, send_buffer, send_buffer_size, char)) - goto fail_in_mutex; + return -1; memcpy(send_buffer + ptr, command, len * sizeof(char)); ptr += len; @@ -381,7 +380,7 @@ static int list_registry(const char* recv_client_id, const char* recv_message_id while (ptr + i >= send_buffer_size) if (growalloc(old, send_buffer, send_buffer_size, char)) - goto fail_in_mutex; + return -1; /* Construct message headers. */ @@ -391,21 +390,10 @@ static int list_registry(const char* recv_client_id, const char* recv_message_id /* Increase message ID. */ message_id = message_id == INT32_MAX ? 0 : (message_id + 1); - pthread_mutex_unlock(®_mutex); - /* Send message. */ if (full_send(send_buffer + ptr, strlen(send_buffer + ptr))) return 1; return full_send(send_buffer, ptr); - - - fail_in_mutex: - pthread_mutex_unlock(®_mutex); - return -1; - - pfail: - perror(*argv); - return -1; } @@ -413,7 +401,7 @@ static int list_registry(const char* recv_client_id, const char* recv_message_id * Handle the received message containing ‘Command: register’-header–value * * @return Zero on success -1 on error or interruption, - * errno will be set accordingly + * `errno` will be set accordingly */ static int handle_register_message(void) { |