diff options
Diffstat (limited to '')
-rw-r--r-- | src/mds-registry/registry.c | 64 |
1 files changed, 58 insertions, 6 deletions
diff --git a/src/mds-registry/registry.c b/src/mds-registry/registry.c index 98f5ad1..6fed005 100644 --- a/src/mds-registry/registry.c +++ b/src/mds-registry/registry.c @@ -51,6 +51,9 @@ static int handle_close_message(void) fail_if ((errno = pthread_mutex_lock(®_mutex))); + + /* Remove server for all protocols. */ + for (i = 0; i < received.header_count; i++) if (startswith(received.headers[i], "Client closed: ")) { @@ -59,11 +62,14 @@ static int handle_close_message(void) foreach_hash_table_entry (reg_table, j, entry) { + /* Remove server from list of servers that support the protocol, + once, if it is in the list. */ client_list_t* list = (client_list_t*)(void*)(entry->value); client_list_remove(list, client); if (list->size) continue; + /* If no servers support the protocol, list the protocol for removal. */ fail_if ((keys == NULL) && xmalloc(keys, size, size_t)); if (ptr == size ? growalloc(old_keys, keys, size, size_t) : 0) goto fail; @@ -71,6 +77,9 @@ static int handle_close_message(void) } } + + /* Remove protocol that no longer have any supporting servers. */ + for (i = 0; i < ptr; i++) { hash_entry_t* entry = hash_table_get_entry(®_table, keys[i]); @@ -92,6 +101,7 @@ static int handle_close_message(void) perror(*argv); fail: free(keys); + pthread_mutex_unlock(®_mutex); return -1; } @@ -109,6 +119,7 @@ static int registry_action_add(int has_key, char* command, size_t command_key, u { if (has_key) { + /* Add server to protocol if the protocol is already in the table. */ size_t address = hash_table_get(®_table, command_key); client_list_t* list = (client_list_t*)(void*)address; if (client_list_add(list, client) < 0) @@ -116,15 +127,20 @@ static int registry_action_add(int has_key, char* command, size_t command_key, u } else { + /* If the protocol is not already in the table. */ + + /* Allocate list of servers for the protocol. */ client_list_t* list = malloc(sizeof(client_list_t)); void* address = list; if (list == NULL) goto pfail; + /* Duplicate the protocol name so it can be accessed later. */ if ((command = strdup(command)) == NULL) { free(list); goto pfail; } + /* Create list of servers, add server to list and add the protocol to the table. */ command_key = (size_t)(void*)command; if (client_list_create(list, 1) || client_list_add(list, client) || @@ -156,7 +172,11 @@ static void registry_action_remove(size_t command_key, uint64_t client) hash_entry_t* entry = hash_table_get_entry(®_table, command_key); size_t address = entry->value; client_list_t* list = (client_list_t*)(void*)address; + + /* Remove server from protocol. */ client_list_remove(list, client); + + /* Remove protocol if no servers support it anymore. */ if (list->size == 0) { client_list_destroy(list); @@ -184,13 +204,16 @@ static int registry_action_act(char* command, int action, uint64_t client, hash_ if (action == 1) { + /* Register server to protocol. */ if (registry_action_add(has_key, command, command_key, client)) return -1; } else if ((action == -1) && has_key) + /* Unregister server from protocol. */ registry_action_remove(command_key, client); else if ((action == 0) && !has_key) { + /* Add protocl to wait set of not present in the protocol table. */ if ((command = strdup(command)) == NULL) goto pfail_wait; command_key = (size_t)(void*)command; @@ -229,6 +252,9 @@ static int registry_action(size_t length, int action, const char* recv_client_id hash_table_t* wait_set = NULL; size_t begin; + + /* If ‘Action: wait’, create a set for the protocols that are not already available. */ + if (action == 0) { wait_set = malloc(sizeof(hash_table_t)); @@ -242,6 +268,9 @@ static int registry_action(size_t length, int action, const char* recv_client_id wait_set->hasher = (hash_func*)string_hash; } + + /* If the payload buffer is full, increase it so we can fit another character. */ + if (received.payload_size == length) { if (growalloc(old, received.payload, received.payload_size, char)) @@ -254,8 +283,15 @@ static int registry_action(size_t length, int action, const char* recv_client_id payload = received.payload; } + + /* LF-terminate the payload, perhaps it did not have a terminal LF. */ + payload[length] = '\n'; + + /* For all protocols in the payload, either add or remove + them from or to the protocl table or the wait set. */ + fail_if ((errno = pthread_mutex_lock(®_mutex))); for (begin = 0; begin < length;) @@ -267,11 +303,15 @@ static int registry_action(size_t length, int action, const char* recv_client_id command[len] = '\0'; begin += len + 1; - if (registry_action_act(command, action, client, wait_set)) - goto fail_in_mutex; + if (len > 0) + if (registry_action_act(command, action, client, wait_set)) + goto fail_in_mutex; } pthread_mutex_unlock(®_mutex); + + + /* If ‘Action: wait’, start a new thread that waits for the protocols and the responds. */ if (action == 0) { @@ -303,6 +343,9 @@ static int list_registry(const char* recv_client_id, const char* recv_message_id size_t ptr = 0, i; hash_entry_t* entry; + + /* Allocate the send buffer for the first time, it cannot be doubled if it is zero. */ + if (send_buffer_size == 0) { fail_if (xmalloc(send_buffer, 256, char)); @@ -311,12 +354,16 @@ static int list_registry(const char* recv_client_id, const char* recv_message_id fail_if ((errno = pthread_mutex_lock(®_mutex))); + + /* Add all protocols to the send buffer. */ + foreach_hash_table_entry (reg_table, i, entry) { size_t key = entry->key; char* command = (char*)(void*)key; size_t len = strlen(command); + /* Make sure the send buffer can fit all protocols. */ while (ptr + len + 1 >= send_buffer_size) if (growalloc(old, send_buffer, send_buffer_size, char)) goto fail_in_mutex; @@ -326,22 +373,27 @@ static int list_registry(const char* recv_client_id, const char* recv_message_id send_buffer[ptr++] = '\n'; } + + /* Make sure the message headers can fit the send buffer. */ + i = strlen(recv_message_id) + strlen(recv_client_id) + 10 + 19; i += strlen("To: %s\nIn response to: %s\nMessage ID: %" PRIi32 "\nLength: %" PRIu64 "\n\n"); while (ptr + i >= send_buffer_size) - { - if (growalloc(old, send_buffer, send_buffer_size, char)) - goto fail_in_mutex; - } + if (growalloc(old, send_buffer, send_buffer_size, char)) + goto fail_in_mutex; + + /* Construct message headers. */ sprintf(send_buffer + ptr, "To: %s\nIn response to: %s\nMessage ID: %" PRIi32 "\nLength: %" PRIu64 "\n\n", recv_message_id, recv_client_id, message_id, ptr); + /* Increase message ID. */ message_id = message_id == INT32_MAX ? 0 : (message_id + 1); pthread_mutex_unlock(®_mutex); + /* Send message. */ if (full_send(send_buffer + ptr, strlen(send_buffer + ptr))) return 1; return full_send(send_buffer, ptr); |