diff options
author | Mattias Andrée <maandree@operamail.com> | 2014-05-18 12:48:20 +0200 |
---|---|---|
committer | Mattias Andrée <maandree@operamail.com> | 2014-05-18 12:48:20 +0200 |
commit | 9d802a7b5617b4f4921ca3ec73bd3e46085297aa (patch) | |
tree | 7632c855c0708eeff948a0f09617d601f2dd3154 /src/mds-server/mds-server.c | |
parent | reduce code complexity (diff) | |
download | mds-9d802a7b5617b4f4921ca3ec73bd3e46085297aa.tar.gz mds-9d802a7b5617b4f4921ca3ec73bd3e46085297aa.tar.bz2 mds-9d802a7b5617b4f4921ca3ec73bd3e46085297aa.tar.xz |
reduce code complexity
Signed-off-by: Mattias Andrée <maandree@operamail.com>
Diffstat (limited to '')
-rw-r--r-- | src/mds-server/mds-server.c | 600 |
1 files changed, 11 insertions, 589 deletions
diff --git a/src/mds-server/mds-server.c b/src/mds-server/mds-server.c index d835a80..b129170 100644 --- a/src/mds-server/mds-server.c +++ b/src/mds-server/mds-server.c @@ -17,10 +17,14 @@ */ #include "mds-server.h" +#include "globals.h" #include "interception_condition.h" #include "client.h" #include "queued_interception.h" #include "multicast.h" +#include "signals.h" +#include "interceptors.h" +#include "sending.h" #include <libmdsserver/config.h> #include <libmdsserver/linked-list.h> @@ -31,7 +35,6 @@ #include <libmdsserver/util.h> #include <libmdsserver/hash-help.h> -#include <signal.h> #include <stdio.h> #include <string.h> #include <stdlib.h> @@ -51,93 +54,6 @@ -#define MDS_SERVER_VARS_VERSION 0 - - -/** - * Number of elements in `argv` - */ -static int argc; - -/** - * Command line arguments - */ -static char** argv; - - -/** - * The program run state, 1 when running, 0 when shutting down - */ -static volatile sig_atomic_t running = 1; - -/** - * Non-zero when the program is about to re-exec. - * Most at all times be at least as true as `terminating`. - */ -static volatile sig_atomic_t reexecing = 0; - -/** - * Non-zero when the program is about to terminate - */ -static volatile sig_atomic_t terminating = 0; - -/** - * The number of running slaves - */ -static int running_slaves = 0; - -/** - * Mutex for slave data - */ -static pthread_mutex_t slave_mutex; - -/** - * Condition for slave data - */ -static pthread_cond_t slave_cond; - -/** - * The thread that runs the master loop - */ -static pthread_t master_thread; - -/** - * Map from client socket file descriptor to all information (client_t) - */ -static fd_table_t client_map; - -/** - * List of client information (client_t) - */ -static linked_list_t client_list; - -/** - * The next free ID for a client - */ -static uint64_t next_id = 1; - -/** - * The next free ID for a message modifications - */ -static uint64_t next_modify_id = 1; - -/** - * Mutex for message modification - */ -static pthread_mutex_t modify_mutex; - -/** - * Condition for message modification - */ -static pthread_cond_t modify_cond; - -/** - * Map from modification ID to waiting client - */ -static hash_table_t modify_map; - - - /** * Entry point of the server * @@ -252,11 +168,8 @@ int main(int argc_, char** argv_) /* Store the current thread so it can be killed from elsewhere. */ master_thread = pthread_self(); - /* Make the server update without all slaves dying on SIGUSR1. */ - error_if (1, xsigaction(SIGUSR1, sigusr1_trap) < 0); - - /* Implement clean exit on SIGTERM. */ - error_if (1, xsigaction(SIGTERM, sigterm_trap) < 0); + /* Set up traps for especially handled signals. */ + error_if (1, trap_signals() < 0); /* Create mutex and condition for slave counter. */ error_if (1, (errno = pthread_mutex_init(&slave_mutex, NULL))); @@ -458,12 +371,8 @@ void* slave_loop(void* data) /* Store slave thread and create mutexes and conditions. */ fail_if (client_initialise_threading(information)); - - /* Make the server update without all slaves dying on SIGUSR1. */ - fail_if (xsigaction(SIGUSR1, sigusr1_trap) < 0); - - /* Implement clean exit on SIGTERM. */ - fail_if (xsigaction(SIGTERM, sigterm_trap) < 0); + /* Set up traps for especially handled signals. */ + fail_if (trap_signals() < 0); /* Fetch messages from the slave. */ @@ -742,7 +651,7 @@ int message_received(client_t* client) if (assign_id && (client->id == 0)) { intercept |= 2; - with_mutex_if (slave_mutex, (client->id = ++next_id) == 0, + with_mutex_if (slave_mutex, (client->id = ++next_client_id) == 0, eprint("this is impossible, ID counter has overflowed."); /* If the program ran for a millennium it would take c:a 585 assignments per nanosecond. This @@ -893,145 +802,6 @@ int message_received(client_t* client) /** - * Remove interception condition by index - * - * @param client The intercepting client - * @param index The index of the condition - */ -static void remove_intercept_condition(client_t* client, size_t index) -{ - interception_condition_t* conds = client->interception_conditions; - size_t n = client->interception_conditions_count; - - /* Remove the condition from the list. */ - memmove(conds + index, conds + index + 1, --n - index); - client->interception_conditions_count--; - - /* Shrink the list. */ - if (client->interception_conditions_count == 0) - { - free(conds); - client->interception_conditions = NULL; - } - else - if (xrealloc(conds, n, interception_condition_t)) - perror(*argv); - else - client->interception_conditions = conds; -} - - -/** - * Add an interception condition for a client - * - * @param client The client - * @param condition The header, optionally with value, to look for, or empty (not `NULL`) for all messages - * @param priority Interception priority - * @param modifying Whether the client may modify the messages - * @param stop Whether the condition should be removed rather than added - */ -void add_intercept_condition(client_t* client, char* condition, int64_t priority, int modifying, int stop) -{ - size_t n = client->interception_conditions_count; - interception_condition_t* conds = client->interception_conditions; - ssize_t nonmodifying = -1; - char* header = condition; - char* value; - size_t hash; - size_t i; - - /* Split header and value apart. */ - if ((value = strchr(header, ':')) != NULL) - { - *value = '\0'; /* NUL-terminate header. */ - value += 2; /* Skip over delimiter. */ - } - - /* Calcuate header hash (comparison optimisation) */ - hash = string_hash(header); - - /* Remove of update condition of already registered, - also look for non-modifying condition to swap position - with for optimisation. */ - for (i = 0; i < n; i++) - { - if ((conds[i].header_hash != hash) || !strequals(conds[i].condition, condition)) - { - /* Look for the first non-modifying, this is a part of the - optimisation where we put all modifying conditions at the - beginning. */ - if ((nonmodifying < 0) && conds[i].modifying) - nonmodifying = (ssize_t)i; - continue; - } - - if (stop) - remove_intercept_condition(client, i); - else - { - /* Update parameters. */ - conds[i].priority = priority; - conds[i].modifying = modifying; - - if (modifying && (nonmodifying >= 0)) - { - /* Optimisation: put conditions that are modifying - at the beginning. When a client is intercepting - we most know if any satisfying condition is - modifying. With this optimisation the first - satisfying condition will tell us if there is - any satisfying condition that is modifying. */ - interception_condition_t temp = conds[nonmodifying]; - conds[nonmodifying] = conds[i]; - conds[i] = temp; - } - } - return; - } - - if (stop) - eprint("client tried to stop intercepting messages that it does not intercept."); - else - { - /* Duplicate condition string. */ - if ((condition = strdup(condition)) == NULL) - { - perror(*argv); - return; - } - - /* Grow the interception condition list. */ - if (xrealloc(conds, n + 1, interception_condition_t)) - { - perror(*argv); - free(condition); - return; - } - client->interception_conditions = conds; - /* Store condition. */ - client->interception_conditions_count++; - conds[n].condition = condition; - conds[n].header_hash = hash; - conds[n].priority = priority; - conds[n].modifying = modifying; - - if (modifying && (nonmodifying >= 0)) - { - /* Optimisation: put conditions that are modifying - at the beginning. When a client is intercepting - we most know if any satisfying condition is - modifying. With this optimisation the first - satisfying condition will tell us if there is - any satisfying condition that is modifying. */ - interception_condition_t temp = conds[nonmodifying]; - conds[nonmodifying] = conds[n]; - conds[n] = temp; - } - } -} - - -/** * Compare two queued interceptors by priority * * @param a:const queued_interception_t* One of the interceptors @@ -1048,125 +818,6 @@ static int cmp_queued_interception(const void* a, const void* b) /** - * Check if a condition matches any of a set of accepted patterns - * - * @param cond The condition - * @param hashes The hashes of the accepted header names - * @param keys The header names - * @param headers The header name–value pairs - * @param count The number of accepted patterns - * @return Evaluates to true if and only if a matching pattern was found - */ -static int is_condition_matching(interception_condition_t* cond, - size_t* hashes, char** keys, char** headers, size_t count) -{ - size_t i; - for (i = 0; i < count; i++) - if (*(cond->condition) == '\0') - return 1; - else if ((cond->header_hash == hashes[i]) && - (strequals(cond->condition, keys[i]) || - strequals(cond->condition, headers[i]))) - return 1; - return 0; -} - - -/** - * Find a matching condition to any of a set of acceptable conditions - * - * @param client The intercepting client - * @param hashes The hashes of the accepted header names - * @param keys The header names - * @param headers The header name–value pairs - * @param count The number of accepted patterns - * @param interception_out Storage slot for found interception - * @return -1 on error, otherwise: evalutes to true iff a matching condition was found - */ -static int find_matching_condition(client_t* client, size_t* hashes, char** keys, char** headers, size_t count, - queued_interception_t* interception_out) -{ - pthread_mutex_t mutex = client->mutex; - interception_condition_t* conds = client->interception_conditions; - size_t n = 0, i; - - errno = pthread_mutex_lock(&(mutex)); - if (errno) - return -1; - - /* Look for a matching condition. */ - if (client->open) - n = client->interception_conditions_count; - for (i = 0; i < n; i++) - if (is_condition_matching(conds + i, hashes, keys, headers, count)) - { - /* Report matching condition. */ - interception_out->client = client; - interception_out->priority = conds[i].priority; - interception_out->modifying = conds[i].modifying; - break; - } - - pthread_mutex_unlock(&(mutex)); - - return i < n; -} - - -/** - * Get all interceptors who have at least one condition matching any of a set of acceptable patterns - * - * @param sender The original sender of the message - * @param hashes The hashes of the accepted header names - * @param keys The header names - * @param headers The header name–value pairs - * @param count The number of accepted patterns - * @param interceptions_count_out Slot at where to store the number of found interceptors - * @return The found interceptors, `NULL` on error - */ -static queued_interception_t* get_interceptors(client_t* sender, size_t* hashes, char** keys, char** headers, - size_t count, size_t* interceptions_count_out) -{ - queued_interception_t* interceptions = NULL; - size_t interceptions_count = 0; - size_t n = 0; - ssize_t node; - - /* Count clients. */ - foreach_linked_list_node (client_list, node) - n++; - - /* Allocate interceptor list. */ - if (xmalloc(interceptions, n, queued_interception_t)) - return NULL; - - /* Search clients. */ - foreach_linked_list_node (client_list, node) - { - client_t* client = (client_t*)(void*)(client_list.values[node]); - - /* Look for and list a matching condition. */ - if (client->open && (client != sender)) - { - int r = find_matching_condition(client, hashes, keys, headers, count, - interceptions + interceptions_count); - if (r == -1) - { - free(interceptions); - return NULL; - } - if (r) - /* List client of there was a matching condition. */ - interceptions_count++; - } - } - - *interceptions_count_out = interceptions_count; - return interceptions; -} - - -/** * Queue a message for multicasting * * @param message The message @@ -1296,175 +947,6 @@ void queue_message_multicast(char* message, size_t length, client_t* sender) /** - * Get the client by its socket's file descriptor in a synchronised manner - * - * @param socket_fd The file descriptor of the client's socket - * @return The client - */ -static client_t* client_by_socket(int socket_fd) -{ - size_t address; - with_mutex (slave_mutex, address = fd_table_get(&client_map, socket_fd);); - return (client_t*)(void*)address; -} - - -/** - * Send a multicast message to one recipient - * - * @param multicast The message - * @param recipient The recipient - * @param modifying Whether the recipient may modify the message - * @return Evaluates to true if and only if the entire message was sent - */ -static int send_multicast_to_recipient(multicast_t* multicast, client_t* recipient, int modifying) -{ - char* msg = multicast->message + multicast->message_ptr; - size_t n = multicast->message_length - multicast->message_ptr; - size_t sent; - - /* Skip Modify ID header if the interceptors will not perform a modification. */ - if ((modifying == 0) && (multicast->message_ptr == 0)) - { - n -= multicast->message_prefix; - multicast->message_ptr += multicast->message_prefix; - } - - /* Send the message. */ - n *= sizeof(char); - with_mutex (recipient->mutex, - if (recipient->open) - { - sent = send_message(recipient->socket_fd, msg + multicast->message_ptr, n); - n -= sent; - multicast->message_ptr += sent / sizeof(char); - if ((n > 0) && (errno != EINTR)) - perror(*argv); - } - ); - - return n == 0; -} - - -/** - * Wait for the recipient of a multicast to reply - * - * @param recipient The recipient - * @param modify_id The modify ID of the multicast - */ -static void wait_for_reply(client_t* recipient, uint64_t modify_id) -{ - /* pthread_cond_timedwait is required to handle re-exec and termination because - pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */ - struct timespec timeout = - { - .tv_sec = 1, - .tv_nsec = 0 - }; - - with_mutex_if (modify_mutex, recipient->modify_message == NULL, - if (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0) - { - hash_table_put(&modify_map, (size_t)modify_id, (size_t)(void*)recipient); - pthread_cond_signal(&slave_cond); - } - ); - - with_mutex_if (recipient->modify_mutex, recipient->modify_message == NULL, - while ((recipient->modify_message == NULL) && (terminating == 0)) - pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout); - if (terminating == 0) - hash_table_remove(&modify_map, (size_t)modify_id); - ); -} - - -/** - * Multicast a message - * - * @param multicast The multicast message - */ -void multicast_message(multicast_t* multicast) -{ - uint64_t modify_id = 0; - size_t n = strlen("Modify ID: "); - if (startswith_n(multicast->message, "Modify ID: ", multicast->message_length, n)) - { - char* value = multicast->message + n; - char* lf = strchr(value, '\n'); - *lf = '\0'; - modify_id = (uint64_t)atoll(value); - *lf = '\n'; - } - - for (; multicast->interceptions_ptr < multicast->interceptions_count; multicast->interceptions_ptr++) - { - queued_interception_t client_ = multicast->interceptions[multicast->interceptions_ptr]; - client_t* client = client_.client; - int modifying = 0; - char* old_buf; - size_t i; - mds_message_t* mod; - - /* After unmarshalling at re-exec, client will be NULL and must be mapped from its socket. */ - if (client == NULL) - client_.client = client = client_by_socket(client_.socket_fd); - - /* Send the message to the recipient. */ - if (send_multicast_to_recipient(multicast, client, client_.modifying) == 0) - { - /* Stop if we are re-exec:ing or terminating, or continue to next recipient on error. */ - if (terminating) - return; - else - continue; - } - - /* Do not wait for a reply if it is non-modifying. */ - if (client_.modifying == 0) - { - /* Reset how much of the message has been sent before we continue with next recipient. */ - multicast->message_ptr = 0; - continue; - } - - /* Wait for a reply. */ - wait_for_reply(client, modify_id); - if (terminating) - return; - - /* Act upon the reply. */ - mod = client->modify_message; - for (i = 0; i < mod->header_count; i++) - if (strequals(mod->headers[i], "Modify: yes")) - { - modifying = 1; - break; - } - if (modifying) - { - n = mod->payload_size; - old_buf = multicast->message; - if (xrealloc(multicast->message, multicast->message_prefix + n, char)) - { - perror(*argv); - multicast->message = old_buf; - } - else - memcpy(multicast->message + multicast->message_prefix, mod->payload, n); - } - - /* Free the reply. */ - mds_message_destroy(client->modify_message); - - /* Reset how much of the message has been sent before we continue with next recipient. */ - multicast->message_ptr = 0; - } -} - - -/** * Exec into the mdsinitrc script * * @param args The arguments to the child process @@ -1536,66 +1018,6 @@ void run_initrc(char** args) /** - * Called with the signal SIGUSR1 is caught. - * This function should cue a re-exec of the program. - * - * @param signo The caught signal - */ -void sigusr1_trap(int signo) -{ - if (reexecing == 0) - { - terminating = reexecing = 1; - eprint("re-exec signal received."); - signal_all(signo); - } -} - - -/** - * Called with the signal SIGTERM is caught. - * This function should cue a termination of the program. - * - * @param signo The caught signal - */ -void sigterm_trap(int signo) -{ - if (terminating == 0) - { - terminating = 1; - eprint("terminate signal received."); - signal_all(signo); - } -} - - -/** - * Send a singal to all threads except the current thread - * - * @param signo The signal - */ -void signal_all(int signo) -{ - pthread_t current_thread; - ssize_t node; - - current_thread = pthread_self(); - - if (pthread_equal(current_thread, master_thread) == 0) - pthread_kill(master_thread, signo); - - with_mutex (slave_mutex, - foreach_linked_list_node (client_list, node) - { - client_t* value = (client_t*)(void*)(client_list.values[node]); - if (pthread_equal(current_thread, value->thread) == 0) - pthread_kill(value->thread, signo); - } - ); -} - - -/** * Marshal the server's state into a file * * @param fd The file descriptor @@ -1635,7 +1057,7 @@ int marshal_server(int fd) /* Marshal the miscellaneous state data. */ buf_set_next(state_buf_, sig_atomic_t, running); - buf_set_next(state_buf_, uint64_t, next_id); + buf_set_next(state_buf_, uint64_t, next_client_id); buf_set_next(state_buf_, uint64_t, next_modify_id); /* Tell the program how large the marshalled client list is and how any clients are marshalled. */ @@ -1733,7 +1155,7 @@ int unmarshal_server(int fd) /* Unmarshal the miscellaneous state data. */ buf_get_next(state_buf_, sig_atomic_t, running); - buf_get_next(state_buf_, uint64_t, next_id); + buf_get_next(state_buf_, uint64_t, next_client_id); buf_get_next(state_buf_, uint64_t, next_modify_id); /* Get the marshalled size of the client list and how any clients that are marshalled. */ |