From 12933adf2fc7e2d9814628d99259783d354d0563 Mon Sep 17 00:00:00 2001 From: Mattias Andrée Date: Mon, 28 Jul 2014 03:46:08 +0200 Subject: mds-registry: sync:ing and list action MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Mattias Andrée --- src/mds-registry.c | 149 +++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 140 insertions(+), 9 deletions(-) diff --git a/src/mds-registry.c b/src/mds-registry.c index 8bd7057..c7ac518 100644 --- a/src/mds-registry.c +++ b/src/mds-registry.c @@ -73,6 +73,26 @@ static int connected = 1; */ static hash_table_t reg_table; +/** + * Reusable buffer for data to send + */ +static char* send_buffer = NULL; + +/** + * The size of `send_buffer` + */ +static size_t send_buffer_size = 0; + +/** + * General mutex + */ +static pthread_mutex_t reg_mutex; + +/** + * General condition + */ +static pthread_cond_t reg_cond; + /** @@ -83,6 +103,19 @@ static hash_table_t reg_table; */ int __attribute__((const)) preinitialise_server(void) { + if ((errno = pthread_mutex_init(®_mutex, NULL))) + { + perror(*argv); + return 1; + } + + if ((errno = pthread_cond_init(®_cond, NULL))) + { + perror(*argv); + pthread_mutex_destroy(®_mutex); + return 1; + } + return 0; } @@ -102,7 +135,7 @@ int initialise_server(void) "\n" "Command: register\n" "Client closed\n" /* TODO support not implemented yet */ - + /* -- NEXT MESSAGE -- */ "Command: reregister\n" "Message ID: 1\n" "\n"; @@ -194,6 +227,7 @@ int unmarshal_server(char* state_buf) r = mds_message_unmarshal(&received, state_buf); if (r) mds_message_destroy(&received); + /* TODO unmarshal ®_table */ reg_table.key_comparator = (compare_func*)string_comparator; reg_table.hasher = (hash_func*)string_hash; return r; @@ -219,6 +253,8 @@ int __attribute__((const)) reexec_failure_recover(void) */ int master_loop(void) { + int rc = 1; + while (!reexecing && !terminating) { int r = mds_message_read(&received, socket_fd); @@ -248,15 +284,20 @@ int master_loop(void) connected = 1; } - /* TODO if !reexecing or failing, cleanup reg_table */ - - mds_message_destroy(&received); - return 0; + rc = 0; + goto fail; pfail: perror(*argv); fail: + if (rc || !reexecing) + { + /* TODO cleanup reg_table */ + } + pthread_mutex_destroy(®_mutex); + pthread_cond_destroy(®_cond); mds_message_destroy(&received); - return 1; + free(send_buffer); + return rc; } @@ -400,6 +441,13 @@ int registry_action(size_t length, int action, const char* recv_client_id, const payload[length] = '\n'; + errno = pthread_mutex_lock(®_mutex); + if (errno) + { + perror(*argv); + return -1; + } + for (begin = 0; begin < length;) { char* end = rawmemchr(payload + begin, '\n'); @@ -416,14 +464,21 @@ int registry_action(size_t length, int action, const char* recv_client_id, const size_t address = hash_table_get(®_table, command_key); client_list_t* list = (client_list_t*)(void*)address; if (client_list_add(list, client) < 0) - return -1; + { + pthread_mutex_unlock(®_mutex); + return -1; + } } else { client_list_t* list = malloc(sizeof(client_list_t)); void* address = list; if (list == NULL) - return perror(*argv), -1; + { + perror(*argv); + pthread_mutex_unlock(®_mutex); + return -1; + } if (client_list_create(list, 1) || client_list_add(list, client) || (hash_table_put(®_table, command_key, (size_t)address) == 0)) @@ -431,6 +486,7 @@ int registry_action(size_t length, int action, const char* recv_client_id, const perror(*argv); client_list_destroy(list); free(list); + pthread_mutex_unlock(®_mutex); return -1; } } @@ -448,11 +504,14 @@ int registry_action(size_t length, int action, const char* recv_client_id, const perror(*argv); hash_table_destroy(wait_set, NULL, NULL); free(wait_set); + pthread_mutex_unlock(®_mutex); return -1; } } } + pthread_mutex_unlock(®_mutex); + if (action == 0) { /* TODO */ @@ -472,7 +531,79 @@ int registry_action(size_t length, int action, const char* recv_client_id, const */ int list_registry(const char* recv_client_id, const char* recv_message_id) { - /* TODO */ + size_t ptr = 0, i; + + if (send_buffer_size == 0) + { + if (xmalloc(send_buffer, 256, char)) + { + perror(*argv); + return -1; + } + send_buffer_size = 256; + } + + errno = pthread_mutex_lock(®_mutex); + if (errno) + { + perror(*argv); + return -1; + } + + for (i = 0; i < reg_table.capacity; i++) + { + hash_entry_t* bucket = reg_table.buckets[i]; + for (; bucket != NULL; bucket = bucket->next) + { + size_t key = bucket->key; + char* command = (char*)(void*)key; + size_t len = strlen(command); + + while (ptr + len + 1 >= send_buffer_size) + { + char* old = send_buffer; + if (xrealloc(send_buffer, send_buffer_size <<= 1, char)) + { + send_buffer = old; + send_buffer_size >>= 1; + perror(*argv); + pthread_mutex_unlock(®_mutex); + return -1; + } + } + + memcpy(send_buffer + ptr, command, len * sizeof(char)); + ptr += len; + send_buffer[ptr++] = '\n'; + } + } + + i = strlen(recv_message_id) + strlen(recv_client_id) + 10 + 19; + i += strlen("To: %s\nIn response to: %s\nMessage ID: %" PRIi32 "\nLength: %" PRIu64 "\n\n"); + + while (ptr + i >= send_buffer_size) + { + char* old = send_buffer; + if (xrealloc(send_buffer, send_buffer_size <<= 1, char)) + { + send_buffer = old; + send_buffer_size >>= 1; + perror(*argv); + pthread_mutex_unlock(®_mutex); + return -1; + } + } + + sprintf(send_buffer + ptr, "To: %s\nIn response to: %s\nMessage ID: %" PRIi32 "\nLength: %" PRIu64 "\n\n", + recv_message_id, recv_client_id, message_id, ptr); + + message_id = message_id == INT32_MAX ? 0 : (message_id + 1); + + pthread_mutex_unlock(®_mutex); + + if (full_send(send_buffer + ptr, strlen(send_buffer + ptr))) + return 1; + return full_send(send_buffer, ptr); } -- cgit v1.2.3-70-g09d2