From 9e8dec188d55ca1f0a3b33acab702ced8ed07a18 Mon Sep 17 00:00:00 2001 From: Mattias Andrée Date: Sun, 5 Nov 2017 00:09:50 +0100 Subject: Work on changing style, and an important typo fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Mattias Andrée --- src/mds-registry/registry.c | 699 +++++++++++++++++++++----------------------- 1 file changed, 333 insertions(+), 366 deletions(-) (limited to 'src/mds-registry/registry.c') diff --git a/src/mds-registry/registry.c b/src/mds-registry/registry.c index 77cd463..28b38d1 100644 --- a/src/mds-registry/registry.c +++ b/src/mds-registry/registry.c @@ -44,8 +44,8 @@ * @param length:size_t The length of the message * @return :int Zero on success, -1 on error */ -#define full_send(message, length) \ - ((full_send)(socket_fd, message, length)) +#define full_send(message, length)\ + ((full_send)(socket_fd, message, length)) /** @@ -54,74 +54,69 @@ * @return Zero on success -1 on error or interruption, * `errno` will be set accordingly */ -static int handle_close_message(void) +static int +handle_close_message(void) { - /* Servers do not close too often, there is no need to - optimise this with another hash table. Doing so would - also require some caution because the keys are 32-bit - on 32-bit computers, and the client ID:s are 64-bit. */ - - size_t i, j, ptr = 0, size = 1; - size_t* keys = NULL; - size_t* old_keys; - - - /* Remove server for all protocols. */ - - for (i = 0; i < received.header_count; i++) - if (startswith(received.headers[i], "Client closed: ")) - { - uint64_t client = parse_client_id(received.headers[i] + strlen("Client closed: ")); - hash_entry_t* entry; - - foreach_hash_table_entry (reg_table, j, entry) - { - /* Remove server from list of servers that support the protocol, - once, if it is in the list. */ - client_list_t* list = (client_list_t*)(void*)(entry->value); - client_list_remove(list, client); - if (list->size) - continue; - - /* If no servers support the protocol, list the protocol for removal. */ - fail_if ((keys == NULL) && xmalloc(keys, size, size_t)); - fail_if (ptr == size ? growalloc(old_keys, keys, size, size_t) : 0); - keys[ptr++] = entry->key; - } - - - /* Mark client as closed. */ + /* Servers do not close too often, there is no need to + optimise this with another hash table. Doing so would + also require some caution because the keys are 32-bit + on 32-bit computers, and the client ID:s are 64-bit. */ + + size_t i, j, ptr = 0, size = 1; + size_t *keys = NULL; + size_t *old_keys; + uint64_t client; + hash_entry_t *entry; + client_list_t *list; + char *command; + + /* Remove server for all protocols. */ + for (i = 0; i < received.header_count; i++) { + if (startswith(received.headers[i], "Client closed: ")) { + client = parse_client_id(received.headers[i] + strlen("Client closed: ")); + + foreach_hash_table_entry (reg_table, j, entry) { + /* Remove server from list of servers that support the protocol, + once, if it is in the list. */ + list = (void *)(entry->value); + client_list_remove(list, client); + if (list->size) + continue; + + /* If no servers support the protocol, list the protocol for removal. */ + fail_if (!keys && xmalloc(keys, size, size_t)); + fail_if (ptr == size ? growalloc(old_keys, keys, size, size_t) : 0); + keys[ptr++] = entry->key; + } + - close_slaves(client); - } - - - /* Close slaves those clients have closed. */ - - with_mutex (slave_mutex, pthread_cond_broadcast(&slave_cond);); - - - /* Remove protocol that no longer have any supporting servers. */ - - for (i = 0; i < ptr; i++) - { - hash_entry_t* entry = hash_table_get_entry(®_table, keys[i]); - client_list_t* list = (client_list_t*)(void*)(entry->value); - char* command = (char*)(void*)(entry->key); - - hash_table_remove(®_table, entry->key); - - client_list_destroy(list); - free(list); - free(command); - } - - free(keys); - return 0; - fail: - xperror(*argv); - free(keys); - return -1; + /* Mark client as closed. */ + close_slaves(client); + } + } + + /* Close slaves those clients have closed. */ + with_mutex (slave_mutex, pthread_cond_broadcast(&slave_cond);); + + /* Remove protocol that no longer have any supporting servers. */ + for (i = 0; i < ptr; i++) { + entry = hash_table_get_entry(®_table, keys[i]); + list = (void *)(entry->value); + command = (void *)(entry->key); + + hash_table_remove(®_table, entry->key); + + client_list_destroy(list); + free(list); + free(command); + } + + free(keys); + return 0; +fail: + xperror(*argv); + free(keys); + return -1; } @@ -134,54 +129,50 @@ static int handle_close_message(void) * @param client The ID of the client that implements the server-side of the protocol * @return Non-zero on error */ -__attribute__((nonnull)) -static int registry_action_add(int has_key, char* command, size_t command_key, uint64_t client) +static int __attribute__((nonnull)) +registry_action_add(int has_key, char *command, size_t command_key, uint64_t client) { - int saved_errno; - - if (has_key) - { - /* Add server to protocol if the protocol is already in the table. */ - size_t address = hash_table_get(®_table, command_key); - client_list_t* list = (client_list_t*)(void*)address; - fail_if (client_list_add(list, client) < 0); - } - else - { - /* If the protocol is not already in the table. */ - - /* Allocate list of servers for the protocol. */ - client_list_t* list; - void* address; - fail_if (xmalloc(address = list, 1, client_list_t)); - /* Duplicate the protocol name so it can be accessed later. */ - if (xstrdup_nn(command, command)) - { - saved_errno = errno, free(list), errno = saved_errno; - fail_if (1); - } - /* Create list of servers, add server to list and add the protocol to the table. */ - command_key = (size_t)(void*)command; - if (client_list_create(list, 1) || - client_list_add(list, client) || - (hash_table_put(®_table, command_key, (size_t)address) == 0)) - { - saved_errno = errno; - client_list_destroy(list); - free(list); - free(command); - errno = saved_errno; - fail_if (1); + int saved_errno; + client_list_t *list; + size_t address; + void *paddress; + + if (has_key) { + /* Add server to protocol if the protocol is already in the table. */ + address = hash_table_get(®_table, command_key); + list = (void *)address; + fail_if (client_list_add(list, client) < 0); + } else { + /* If the protocol is not already in the table. */ + + /* Allocate list of servers for the protocol. */ + fail_if (xmalloc(paddress = list, 1, client_list_t)); + /* Duplicate the protocol name so it can be accessed later. */ + if (xstrdup_nn(command, command)) { + saved_errno = errno, free(list), errno = saved_errno; + fail_if (1); + } + /* Create list of servers, add server to list and add the protocol to the table. */ + command_key = (size_t)(void*)command; + if (client_list_create(list, 1) || + client_list_add(list, client) || + !hash_table_put(®_table, command_key, (size_t)paddress)) { + saved_errno = errno; + client_list_destroy(list); + free(list); + free(command); + errno = saved_errno; + fail_if (1); + } } - } - - /* Notify slaves. */ - fail_if (advance_slaves(command)); - - return 0; - fail: - xperror(*argv); - return -1; + + /* Notify slaves. */ + fail_if (advance_slaves(command)); + + return 0; +fail: + xperror(*argv); + return -1; } @@ -192,23 +183,23 @@ static int registry_action_add(int has_key, char* command, size_t command_key, u * @param client The ID of the client that implements the server-side of the protocol * @return Non-zero on error */ -static void registry_action_remove(size_t command_key, uint64_t client) +static void +registry_action_remove(size_t command_key, uint64_t client) { - hash_entry_t* entry = hash_table_get_entry(®_table, command_key); - size_t address = entry->value; - client_list_t* list = (client_list_t*)(void*)address; - - /* Remove server from protocol. */ - client_list_remove(list, client); - - /* Remove protocol if no servers support it anymore. */ - if (list->size == 0) - { - client_list_destroy(list); - free(list); - hash_table_remove(®_table, command_key); - reg_table_free_key(entry->key); - } + hash_entry_t *entry = hash_table_get_entry(®_table, command_key); + size_t address = entry->value; + client_list_t *list = (void *)address; + + /* Remove server from protocol. */ + client_list_remove(list, client); + + /* Remove protocol if no servers support it anymore. */ + if (!list->size) { + client_list_destroy(list); + free(list); + hash_table_remove(®_table, command_key); + reg_table_free_key(entry->key); + } } @@ -222,40 +213,46 @@ static void registry_action_remove(size_t command_key, uint64_t client) * @param wait_set Table to fill with missing protocols if `action == 0` * @return Non-zero on error */ -__attribute__((nonnull)) -static int registry_action_act(char* command, int action, uint64_t client, hash_table_t* wait_set) +static int __attribute__((nonnull)) +registry_action_act(char *command, int action, uint64_t client, hash_table_t *wait_set) { - size_t command_key = (size_t)(void*)command; - int has_key = hash_table_contains_key(®_table, command_key); - int saved_errno; - - if (action == 1) - { - /* Register server to protocol. */ - fail_if (registry_action_add(has_key, command, command_key, client)); - } - else if ((action == -1) && has_key) - /* Unregister server from protocol. */ - registry_action_remove(command_key, client); - else if ((action == 0) && !has_key) - { - /* Add protocol to wait set of not present in the protocol table. */ - fail_if (xstrdup_nn(command, command)); - command_key = (size_t)(void*)command; - if (hash_table_put(wait_set, command_key, 1) == 0) - if (errno) - { - saved_errno = errno, free(command), errno = saved_errno; - fail_if (1); - } - } - - return 0; - fail: - xperror(*argv); - if (action != 1) - hash_table_destroy(wait_set, (free_func*)reg_table_free_key, NULL), free(wait_set); - return -1; + size_t command_key = (size_t)(void *)command; + int has_key = hash_table_contains_key(®_table, command_key); + int saved_errno; + + switch (action) { + case 1: + /* Register server to protocol. */ + fail_if (registry_action_add(has_key, command, command_key, client)); + break; + case -1: + if (has_key) + /* Unregister server from protocol. */ + registry_action_remove(command_key, client); + break; + case 0: + if (has_key) + break; + /* Add protocol to wait set of not present in the protocol table. */ + fail_if (xstrdup_nn(command, command)); + command_key = (size_t)(void*)command; + if (!hash_table_put(wait_set, command_key, 1) && errno) { + saved_errno = errno, free(command), errno = saved_errno; + fail_if (1); + } + break; + default: + break; + } + + return 0; +fail: + xperror(*argv); + if (action != 1) { + hash_table_destroy(wait_set, (free_func *)reg_table_free_key, NULL); + free(wait_set); + } + return -1; } @@ -270,71 +267,57 @@ static int registry_action_act(char* command, int action, uint64_t client, hash_ * @return Zero on success -1 on error or interruption, * `errno` will be set accordingly */ -__attribute__((nonnull)) -static int registry_action(size_t length, int action, const char* recv_client_id, const char* recv_message_id) +static int __attribute__((nonnull)) +registry_action(size_t length, int action, const char *recv_client_id, const char *recv_message_id) { - char* payload = received.payload; - uint64_t client = action ? parse_client_id(recv_client_id) : 0; - hash_table_t* wait_set = NULL; - size_t begin; - int saved_errno; - - - /* If ‘Action: wait’, create a set for the protocols that are not already available. */ - - if (action == 0) - { - fail_if (xmalloc(wait_set, 1, hash_table_t)); - fail_if (hash_table_create(wait_set)); - wait_set->key_comparator = (compare_func*)string_comparator; - wait_set->hasher = (hash_func*)string_hash; - } - - - /* If the payload buffer is full, increase it so we can fit another character. */ - - if (received.payload_size == length) - { - fail_if (growalloc(old, received.payload, received.payload_size, char)); - payload = received.payload; - } - - - /* LF-terminate the payload, perhaps it did not have a terminal LF. */ - - payload[length] = '\n'; - - - /* For all protocols in the payload, either add or remove - them from or to the protocl table or the wait set. */ - - for (begin = 0; begin < length;) - { - char* end = rawmemchr(payload + begin, '\n'); - size_t len = (size_t)(end - payload) - begin - 1; - char* command = payload + begin; - - command[len] = '\0'; - begin += len + 1; - - if (len > 0) - if (registry_action_act(command, action, client, wait_set)) - fail_if (wait_set = NULL, 1); - } - - - /* If ‘Action: wait’, start a new thread that waits for the protocols and the responds. */ - - if (action == 0) - if (start_slave(wait_set, recv_client_id, recv_message_id)) - fail_if (wait_set = NULL, 1); - - return 0; - fail: - saved_errno = errno; - if (wait_set != NULL) - hash_table_destroy(wait_set, NULL, NULL), free(wait_set); - return errno = saved_errno, -1; + char *payload = received.payload; + uint64_t client = action ? parse_client_id(recv_client_id) : 0; + hash_table_t *wait_set = NULL; + size_t begin, len; + int saved_errno; + char *end, *command; + + /* If ‘Action: wait’, create a set for the protocols that are not already available. */ + if (!action) { + fail_if (xmalloc(wait_set, 1, hash_table_t)); + fail_if (hash_table_create(wait_set)); + wait_set->key_comparator = (compare_func*)string_comparator; + wait_set->hasher = (hash_func*)string_hash; + } + + /* If the payload buffer is full, increase it so we can fit another character. */ + if (received.payload_size == length) { + fail_if (growalloc(old, received.payload, received.payload_size, char)); + payload = received.payload; + } + + /* LF-terminate the payload, perhaps it did not have a terminal LF. */ + payload[length] = '\n'; + + /* For all protocols in the payload, either add or remove + them from or to the protocl table or the wait set. */ + for (begin = 0; begin < length;) { + end = rawmemchr(payload + begin, '\n'); + len = (size_t)(end - payload) - begin - 1; + command = payload + begin; + + command[len] = '\0'; + begin += len + 1; + + if (len > 0 && registry_action_act(command, action, client, wait_set)) + fail_if (wait_set = NULL, 1); + } + + /* If ‘Action: wait’, start a new thread that waits for the protocols and the responds. */ + if (!action && start_slave(wait_set, recv_client_id, recv_message_id)) + fail_if (wait_set = NULL, 1); + + return 0; +fail: + saved_errno = errno; + if (wait_set) + hash_table_destroy(wait_set, NULL, NULL), free(wait_set); + return errno = saved_errno, -1; } @@ -346,73 +329,66 @@ static int registry_action(size_t length, int action, const char* recv_client_id * @return Zero on success, -1 on error or interruption, * `errno` will be set accordingly */ -__attribute__((nonnull)) -static int list_registry(const char* recv_client_id, const char* recv_message_id) +static int __attribute__((nonnull)) +list_registry(const char *recv_client_id, const char *recv_message_id) { - size_t ptr = 0, i; - hash_entry_t* entry; - - - /* Allocate the send buffer for the first time, it cannot be doubled if it is zero. */ - - if (send_buffer_size == 0) - { - fail_if (xmalloc(send_buffer, 256, char)); - send_buffer_size = 256; - } - - - /* Add all protocols to the send buffer. */ - - foreach_hash_table_entry (reg_table, i, entry) - { - size_t key = entry->key; - char* command = (char*)(void*)key; - size_t len = strlen(command); - - /* Make sure the send buffer can fit all protocols. */ - while (ptr + len + 1 >= send_buffer_size) - fail_if (growalloc(old, send_buffer, send_buffer_size, char)); - - memcpy(send_buffer + ptr, command, len * sizeof(char)); - ptr += len; - send_buffer[ptr++] = '\n'; - } - - - /* Make sure the message headers can fit the send buffer. */ - - i = sizeof("To: \n" - "In response to: \n" - "Message ID: \n" - "Origin command: register\n" - "Length: \n" - "\n") / sizeof(char) - 1; - i += strlen(recv_message_id) + strlen(recv_client_id) + 10 + 19; - - while (ptr + i >= send_buffer_size) - fail_if (growalloc(old, send_buffer, send_buffer_size, char)); - - - /* Construct message headers. */ - sprintf(send_buffer + ptr, - "To: %s\n" - "In response to: %s\n" - "Message ID: %" PRIu32 "\n" - "Origin command: register\n" - "Length: %" PRIu64 "\n" - "\n", - recv_client_id, recv_message_id, message_id, ptr); - - /* Increase message ID. */ - with_mutex (slave_mutex, message_id = message_id == UINT32_MAX ? 0 : (message_id + 1);); - - /* Send message. */ - fail_if (full_send(send_buffer + ptr, strlen(send_buffer + ptr))); - fail_if (full_send(send_buffer, ptr)); - return 0; - fail: - return -1; + size_t ptr = 0, i, key, len; + hash_entry_t *entry; + char *command; + + /* Allocate the send buffer for the first time, it cannot be doubled if it is zero. */ + if (!send_buffer_size) { + fail_if (xmalloc(send_buffer, 256, char)); + send_buffer_size = 256; + } + + /* Add all protocols to the send buffer. */ + + foreach_hash_table_entry (reg_table, i, entry) { + key = entry->key; + command = (char*)(void*)key; + len = strlen(command); + + /* Make sure the send buffer can fit all protocols. */ + while (ptr + len + 1 >= send_buffer_size) + fail_if (growalloc(old, send_buffer, send_buffer_size, char)); + + memcpy(send_buffer + ptr, command, len * sizeof(char)); + ptr += len; + send_buffer[ptr++] = '\n'; + } + + /* Make sure the message headers can fit the send buffer. */ + i = sizeof("To: \n" + "In response to: \n" + "Message ID: \n" + "Origin command: register\n" + "Length: \n" + "\n") / sizeof(char) - 1; + i += strlen(recv_message_id) + strlen(recv_client_id) + 10 + 19; + + while (ptr + i >= send_buffer_size) + fail_if (growalloc(old, send_buffer, send_buffer_size, char)); + + /* Construct message headers. */ + sprintf(send_buffer + ptr, + "To: %s\n" + "In response to: %s\n" + "Message ID: %" PRIu32 "\n" + "Origin command: register\n" + "Length: %" PRIu64 "\n" + "\n", + recv_client_id, recv_message_id, message_id, ptr); + + /* Increase message ID. */ + with_mutex (slave_mutex, message_id = message_id == UINT32_MAX ? 0 : (message_id + 1);); + + /* Send message. */ + fail_if (full_send(send_buffer + ptr, strlen(send_buffer + ptr))); + fail_if (full_send(send_buffer, ptr)); + return 0; +fail: + return -1; } @@ -422,71 +398,63 @@ static int list_registry(const char* recv_client_id, const char* recv_message_id * @return Zero on success -1 on error or interruption, * `errno` will be set accordingly */ -static int handle_register_message(void) +static int +handle_register_message(void) { - /* Fetch message headers. */ - - const char* recv_client_id = NULL; - const char* recv_message_id = NULL; - const char* recv_length = NULL; - const char* recv_action = NULL; - size_t i, length = 0; - -#define __get_header(storage, header) \ - (startswith(received.headers[i], header)) \ - storage = received.headers[i] + strlen(header) - - for (i = 0; i < received.header_count; i++) - { - if __get_header(recv_client_id, "Client ID: "); - else if __get_header(recv_message_id, "Message ID: "); - else if __get_header(recv_length, "Length: "); - else if __get_header(recv_action, "Action: "); - else - continue; - - /* Stop if we got all headers we recognised, except ‘Time to live’. */ - if (recv_client_id && recv_message_id && recv_length && recv_action) - break; - } - + /* Fetch message headers. */ + const char *recv_client_id = NULL; + const char *recv_message_id = NULL; + const char *recv_length = NULL; + const char *recv_action = NULL; + size_t i, length = 0; + +#define __get_header(storage, header)\ + (startswith(received.headers[i], header))\ + storage = received.headers[i] + strlen(header) + + for (i = 0; i < received.header_count; i++) { + if __get_header(recv_client_id, "Client ID: "); + else if __get_header(recv_message_id, "Message ID: "); + else if __get_header(recv_length, "Length: "); + else if __get_header(recv_action, "Action: "); + else + continue; + + /* Stop if we got all headers we recognised, except ‘Time to live’. */ + if (recv_client_id && recv_message_id && recv_length && recv_action) + break; + } + #undef __get_header - - - /* Validate headers. */ - - if ((recv_client_id == NULL) || (strequals(recv_client_id, "0:0"))) - return eprint("received message from anonymous sender, ignoring."), 0; - else if (strchr(recv_client_id, ':') == NULL) - return eprint("received message from sender without a colon it its ID, ignoring, invalid ID."), 0; - else if ((recv_length == NULL) && ((recv_action == NULL) || !strequals(recv_action, "list"))) - return eprint("received empty message without `Action: list`, ignoring, has no effect."), 0; - else if (recv_message_id == NULL) - return eprint("received message without ID, ignoring, master server is misbehaving."), 0; - - - /* Get message length, and make sure the action is defined. */ - - if (recv_length != NULL) - length = atoz(recv_length); - if (recv_action != NULL) - recv_action = "add"; - - - /* Perform action. */ - -#define __registry_action(action) registry_action(length, action, recv_client_id, recv_message_id) - - if (strequals(recv_action, "add")) return __registry_action(1); - else if (strequals(recv_action, "remove")) return __registry_action(-1); - else if (strequals(recv_action, "wait")) return __registry_action(0); - else if (strequals(recv_action, "list")) return list_registry(recv_client_id, recv_message_id); - else - { - eprint("received invalid action, ignoring."); - return 0; - } - + + /* Validate headers. */ + if (!recv_client_id || strequals(recv_client_id, "0:0")) + return eprint("received message from anonymous sender, ignoring."), 0; + else if (!strchr(recv_client_id, ':')) + return eprint("received message from sender without a colon it its ID, ignoring, invalid ID."), 0; + else if (!recv_length && (!recv_action || !strequals(recv_action, "list"))) + return eprint("received empty message without `Action: list`, ignoring, has no effect."), 0; + else if (!recv_message_id) + return eprint("received message without ID, ignoring, master server is misbehaving."), 0; + + /* Get message length, and make sure the action is defined. */ + if (recv_length) + length = atoz(recv_length); + if (recv_action) + recv_action = "add"; + + /* Perform action. */ +#define __registry_action(action) registry_action(length, action, recv_client_id, recv_message_id) + + if (strequals(recv_action, "add")) return __registry_action(1); + else if (strequals(recv_action, "remove")) return __registry_action(-1); + else if (strequals(recv_action, "wait")) return __registry_action(0); + else if (strequals(recv_action, "list")) return list_registry(recv_client_id, recv_message_id); + else { + eprint("received invalid action, ignoring."); + return 0; + } + #undef __registry_action } @@ -499,16 +467,15 @@ static int handle_register_message(void) */ int handle_message(void) { - size_t i; - for (i = 0; i < received.header_count; i++) - if (strequals(received.headers[i], "Command: register")) - { - fail_if (handle_register_message()); + size_t i; + for (i = 0; i < received.header_count; i++) { + if (strequals(received.headers[i], "Command: register")) { + fail_if (handle_register_message()); + return 0; + } + } + fail_if (handle_close_message()); return 0; - } - fail_if (handle_close_message()); - return 0; fail: - return -1; + return -1; } - -- cgit v1.2.3-70-g09d2