From 9d802a7b5617b4f4921ca3ec73bd3e46085297aa Mon Sep 17 00:00:00 2001 From: Mattias Andrée Date: Sun, 18 May 2014 12:48:20 +0200 Subject: reduce code complexity MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Mattias Andrée --- src/mds-server/globals.c | 39 +++ src/mds-server/globals.h | 121 +++++++++ src/mds-server/interceptors.c | 291 ++++++++++++++++++++ src/mds-server/interceptors.h | 86 ++++++ src/mds-server/mds-server.c | 600 +----------------------------------------- src/mds-server/mds-server.h | 43 --- src/mds-server/sending.c | 204 ++++++++++++++ src/mds-server/sending.h | 34 +++ src/mds-server/signals.c | 109 ++++++++ src/mds-server/signals.h | 31 +++ 10 files changed, 926 insertions(+), 632 deletions(-) create mode 100644 src/mds-server/globals.c create mode 100644 src/mds-server/globals.h create mode 100644 src/mds-server/interceptors.c create mode 100644 src/mds-server/interceptors.h create mode 100644 src/mds-server/sending.c create mode 100644 src/mds-server/sending.h create mode 100644 src/mds-server/signals.c create mode 100644 src/mds-server/signals.h (limited to 'src/mds-server') diff --git a/src/mds-server/globals.c b/src/mds-server/globals.c new file mode 100644 index 0000000..0d2c6b6 --- /dev/null +++ b/src/mds-server/globals.c @@ -0,0 +1,39 @@ +/** + * mds — A micro-display server + * Copyright © 2014 Mattias Andrée (maandree@member.fsf.org) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +#include "globals.h" + + +int argc; +char** argv; + +volatile sig_atomic_t running = 1; +volatile sig_atomic_t reexecing = 0; +volatile sig_atomic_t terminating = 0; + +size_t running_slaves = 0; +pthread_mutex_t slave_mutex; +pthread_cond_t slave_cond; +pthread_t master_thread; +fd_table_t client_map; +linked_list_t client_list; +uint64_t next_client_id = 1; +uint64_t next_modify_id = 1; +pthread_mutex_t modify_mutex; +pthread_cond_t modify_cond; +hash_table_t modify_map; + diff --git a/src/mds-server/globals.h b/src/mds-server/globals.h new file mode 100644 index 0000000..1016ae7 --- /dev/null +++ b/src/mds-server/globals.h @@ -0,0 +1,121 @@ +/** + * mds — A micro-display server + * Copyright © 2014 Mattias Andrée (maandree@member.fsf.org) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +#ifndef MDS_MDS_SERVER_GLOBALS_H +#define MDS_MDS_SERVER_GLOBALS_H + + +#include +#include +#include + +#include +#include +#include + + + +#define MDS_SERVER_VARS_VERSION 0 + + + +/** + * Number of elements in `argv` + */ +extern int argc; + +/** + * Command line arguments + */ +extern char** argv; + + +/** + * The program run state, 1 when running, 0 when shutting down + */ +extern volatile sig_atomic_t running; + +/** + * Non-zero when the program is about to re-exec. + * Most at all times be at least as true as `terminating`. + */ +extern volatile sig_atomic_t reexecing; + +/** + * Non-zero when the program is about to terminate + */ +extern volatile sig_atomic_t terminating; + + +/** + * The number of running slaves + */ +extern size_t running_slaves; + +/** + * Mutex for slave data + */ +extern pthread_mutex_t slave_mutex; + +/** + * Condition for slave data + */ +extern pthread_cond_t slave_cond; + +/** + * The thread that runs the master loop + */ +extern pthread_t master_thread; + +/** + * Map from client socket file descriptor to all information (client_t) + */ +extern fd_table_t client_map; + +/** + * List of client information (client_t) + */ +extern linked_list_t client_list; + +/** + * The next free ID for a client + */ +extern uint64_t next_client_id; + +/** + * The next free ID for a message modifications + */ +extern uint64_t next_modify_id; + +/** + * Mutex for message modification + */ +extern pthread_mutex_t modify_mutex; + +/** + * Condition for message modification + */ +extern pthread_cond_t modify_cond; + +/** + * Map from modification ID to waiting client + */ +extern hash_table_t modify_map; + + +#endif + diff --git a/src/mds-server/interceptors.c b/src/mds-server/interceptors.c new file mode 100644 index 0000000..9d602c7 --- /dev/null +++ b/src/mds-server/interceptors.c @@ -0,0 +1,291 @@ +/** + * mds — A micro-display server + * Copyright © 2014 Mattias Andrée (maandree@member.fsf.org) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +#include "interceptors.h" + +#include "globals.h" +#include "interception_condition.h" +#include "client.h" +#include "queued_interception.h" + +#include +#include + +#include +#include +#include +#include +#include + + +/** + * Remove interception condition by index + * + * @param client The intercepting client + * @param index The index of the condition + */ +static void remove_intercept_condition(client_t* client, size_t index) +{ + interception_condition_t* conds = client->interception_conditions; + size_t n = client->interception_conditions_count; + + /* Remove the condition from the list. */ + memmove(conds + index, conds + index + 1, --n - index); + client->interception_conditions_count--; + + /* Shrink the list. */ + if (client->interception_conditions_count == 0) + { + free(conds); + client->interception_conditions = NULL; + } + else + if (xrealloc(conds, n, interception_condition_t)) + perror(*argv); + else + client->interception_conditions = conds; +} + + +/** + * Add an interception condition for a client + * + * @param client The client + * @param condition The header, optionally with value, to look for, or empty (not NULL) for all messages + * @param priority Interception priority + * @param modifying Whether the client may modify the messages + * @param stop Whether the condition should be removed rather than added + */ +void add_intercept_condition(client_t* client, char* condition, int64_t priority, int modifying, int stop) +{ + size_t n = client->interception_conditions_count; + interception_condition_t* conds = client->interception_conditions; + ssize_t nonmodifying = -1; + char* header = condition; + char* value; + size_t hash; + size_t i; + + /* Split header and value apart. */ + if ((value = strchr(header, ':')) != NULL) + { + *value = '\0'; /* NUL-terminate header. */ + value += 2; /* Skip over delimiter. */ + } + + /* Calcuate header hash (comparison optimisation) */ + hash = string_hash(header); + + /* Remove of update condition of already registered, + also look for non-modifying condition to swap position + with for optimisation. */ + for (i = 0; i < n; i++) + { + if ((conds[i].header_hash != hash) || !strequals(conds[i].condition, condition)) + { + /* Look for the first non-modifying, this is a part of the + optimisation where we put all modifying conditions at the + beginning. */ + if ((nonmodifying < 0) && conds[i].modifying) + nonmodifying = (ssize_t)i; + continue; + } + + if (stop) + remove_intercept_condition(client, i); + else + { + /* Update parameters. */ + conds[i].priority = priority; + conds[i].modifying = modifying; + + if (modifying && (nonmodifying >= 0)) + { + /* Optimisation: put conditions that are modifying + at the beginning. When a client is intercepting + we most know if any satisfying condition is + modifying. With this optimisation the first + satisfying condition will tell us if there is + any satisfying condition that is modifying. */ + interception_condition_t temp = conds[nonmodifying]; + conds[nonmodifying] = conds[i]; + conds[i] = temp; + } + } + return; + } + + if (stop) + eprint("client tried to stop intercepting messages that it does not intercept."); + else + { + /* Duplicate condition string. */ + if ((condition = strdup(condition)) == NULL) + { + perror(*argv); + return; + } + + /* Grow the interception condition list. */ + if (xrealloc(conds, n + 1, interception_condition_t)) + { + perror(*argv); + free(condition); + return; + } + client->interception_conditions = conds; + /* Store condition. */ + client->interception_conditions_count++; + conds[n].condition = condition; + conds[n].header_hash = hash; + conds[n].priority = priority; + conds[n].modifying = modifying; + + if (modifying && (nonmodifying >= 0)) + { + /* Optimisation: put conditions that are modifying + at the beginning. When a client is intercepting + we most know if any satisfying condition is + modifying. With this optimisation the first + satisfying condition will tell us if there is + any satisfying condition that is modifying. */ + interception_condition_t temp = conds[nonmodifying]; + conds[nonmodifying] = conds[n]; + conds[n] = temp; + } + } +} + + +/** + * Check if a condition matches any of a set of accepted patterns + * + * @param cond The condition + * @param hashes The hashes of the accepted header names + * @param keys The header names + * @param headers The header name–value pairs + * @param count The number of accepted patterns + * @return Evaluates to true if and only if a matching pattern was found + */ +int is_condition_matching(interception_condition_t* cond, size_t* hashes, + char** keys, char** headers, size_t count) +{ + size_t i; + for (i = 0; i < count; i++) + if (*(cond->condition) == '\0') + return 1; + else if ((cond->header_hash == hashes[i]) && + (strequals(cond->condition, keys[i]) || + strequals(cond->condition, headers[i]))) + return 1; + return 0; +} + + +/** + * Find a matching condition to any of a set of acceptable conditions + * + * @param client The intercepting client + * @param hashes The hashes of the accepted header names + * @param keys The header names + * @param headers The header name–value pairs + * @param count The number of accepted patterns + * @param interception_out Storage slot for found interception + * @return -1 on error, otherwise: evalutes to true iff a matching condition was found + */ +int find_matching_condition(client_t* client, size_t* hashes, char** keys, char** headers, + size_t count, queued_interception_t* interception_out) +{ + pthread_mutex_t mutex = client->mutex; + interception_condition_t* conds = client->interception_conditions; + size_t n = 0, i; + + errno = pthread_mutex_lock(&(mutex)); + if (errno) + return -1; + + /* Look for a matching condition. */ + if (client->open) + n = client->interception_conditions_count; + for (i = 0; i < n; i++) + if (is_condition_matching(conds + i, hashes, keys, headers, count)) + { + /* Report matching condition. */ + interception_out->client = client; + interception_out->priority = conds[i].priority; + interception_out->modifying = conds[i].modifying; + break; + } + + pthread_mutex_unlock(&(mutex)); + + return i < n; +} + + +/** + * Get all interceptors who have at least one condition matching any of a set of acceptable patterns + * + * @param sender The original sender of the message + * @param hashes The hashes of the accepted header names + * @param keys The header names + * @param headers The header name–value pairs + * @param count The number of accepted patterns + * @param interceptions_count_out Slot at where to store the number of found interceptors + * @return The found interceptors, NULL on error + */ +queued_interception_t* get_interceptors(client_t* sender, size_t* hashes, char** keys, char** headers, + size_t count, size_t* interceptions_count_out) +{ + queued_interception_t* interceptions = NULL; + size_t interceptions_count = 0; + size_t n = 0; + ssize_t node; + + /* Count clients. */ + foreach_linked_list_node (client_list, node) + n++; + + /* Allocate interceptor list. */ + if (xmalloc(interceptions, n, queued_interception_t)) + return NULL; + + /* Search clients. */ + foreach_linked_list_node (client_list, node) + { + client_t* client = (client_t*)(void*)(client_list.values[node]); + + /* Look for and list a matching condition. */ + if (client->open && (client != sender)) + { + int r = find_matching_condition(client, hashes, keys, headers, count, + interceptions + interceptions_count); + if (r == -1) + { + free(interceptions); + return NULL; + } + if (r) + /* List client of there was a matching condition. */ + interceptions_count++; + } + } + + *interceptions_count_out = interceptions_count; + return interceptions; +} + diff --git a/src/mds-server/interceptors.h b/src/mds-server/interceptors.h new file mode 100644 index 0000000..06c76e1 --- /dev/null +++ b/src/mds-server/interceptors.h @@ -0,0 +1,86 @@ +/** + * mds — A micro-display server + * Copyright © 2014 Mattias Andrée (maandree@member.fsf.org) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +#ifndef MDS_MDS_SERVER_INTERCEPTORS_H +#define MDS_MDS_SERVER_INTERCEPTORS_H + + +#include "interception_condition.h" +#include "client.h" +#include "queued_interception.h" + +#include +#include + + +/** + * Add an interception condition for a client + * + * @param client The client + * @param condition The header, optionally with value, to look for, or empty (not NULL) for all messages + * @param priority Interception priority + * @param modifying Whether the client may modify the messages + * @param stop Whether the condition should be removed rather than added + */ +void add_intercept_condition(client_t* client, char* condition, int64_t priority, int modifying, int stop); + + +/** + * Check if a condition matches any of a set of accepted patterns + * + * @param cond The condition + * @param hashes The hashes of the accepted header names + * @param keys The header names + * @param headers The header name–value pairs + * @param count The number of accepted patterns + * @return Evaluates to true if and only if a matching pattern was found + */ +int is_condition_matching(interception_condition_t* cond, size_t* hashes, + char** keys, char** headers, size_t count) __attribute__((pure)); + + +/** + * Find a matching condition to any of a set of acceptable conditions + * + * @param client The intercepting client + * @param hashes The hashes of the accepted header names + * @param keys The header names + * @param headers The header name–value pairs + * @param count The number of accepted patterns + * @param interception_out Storage slot for found interception + * @return -1 on error, otherwise: evalutes to true iff a matching condition was found + */ +int find_matching_condition(client_t* client, size_t* hashes, char** keys, char** headers, + size_t count, queued_interception_t* interception_out); + + +/** + * Get all interceptors who have at least one condition matching any of a set of acceptable patterns + * + * @param sender The original sender of the message + * @param hashes The hashes of the accepted header names + * @param keys The header names + * @param headers The header name–value pairs + * @param count The number of accepted patterns + * @param interceptions_count_out Slot at where to store the number of found interceptors + * @return The found interceptors, NULL on error + */ +queued_interception_t* get_interceptors(client_t* sender, size_t* hashes, char** keys, char** headers, + size_t count, size_t* interceptions_count_out); + +#endif + diff --git a/src/mds-server/mds-server.c b/src/mds-server/mds-server.c index d835a80..b129170 100644 --- a/src/mds-server/mds-server.c +++ b/src/mds-server/mds-server.c @@ -17,10 +17,14 @@ */ #include "mds-server.h" +#include "globals.h" #include "interception_condition.h" #include "client.h" #include "queued_interception.h" #include "multicast.h" +#include "signals.h" +#include "interceptors.h" +#include "sending.h" #include #include @@ -31,7 +35,6 @@ #include #include -#include #include #include #include @@ -51,93 +54,6 @@ -#define MDS_SERVER_VARS_VERSION 0 - - -/** - * Number of elements in `argv` - */ -static int argc; - -/** - * Command line arguments - */ -static char** argv; - - -/** - * The program run state, 1 when running, 0 when shutting down - */ -static volatile sig_atomic_t running = 1; - -/** - * Non-zero when the program is about to re-exec. - * Most at all times be at least as true as `terminating`. - */ -static volatile sig_atomic_t reexecing = 0; - -/** - * Non-zero when the program is about to terminate - */ -static volatile sig_atomic_t terminating = 0; - -/** - * The number of running slaves - */ -static int running_slaves = 0; - -/** - * Mutex for slave data - */ -static pthread_mutex_t slave_mutex; - -/** - * Condition for slave data - */ -static pthread_cond_t slave_cond; - -/** - * The thread that runs the master loop - */ -static pthread_t master_thread; - -/** - * Map from client socket file descriptor to all information (client_t) - */ -static fd_table_t client_map; - -/** - * List of client information (client_t) - */ -static linked_list_t client_list; - -/** - * The next free ID for a client - */ -static uint64_t next_id = 1; - -/** - * The next free ID for a message modifications - */ -static uint64_t next_modify_id = 1; - -/** - * Mutex for message modification - */ -static pthread_mutex_t modify_mutex; - -/** - * Condition for message modification - */ -static pthread_cond_t modify_cond; - -/** - * Map from modification ID to waiting client - */ -static hash_table_t modify_map; - - - /** * Entry point of the server * @@ -252,11 +168,8 @@ int main(int argc_, char** argv_) /* Store the current thread so it can be killed from elsewhere. */ master_thread = pthread_self(); - /* Make the server update without all slaves dying on SIGUSR1. */ - error_if (1, xsigaction(SIGUSR1, sigusr1_trap) < 0); - - /* Implement clean exit on SIGTERM. */ - error_if (1, xsigaction(SIGTERM, sigterm_trap) < 0); + /* Set up traps for especially handled signals. */ + error_if (1, trap_signals() < 0); /* Create mutex and condition for slave counter. */ error_if (1, (errno = pthread_mutex_init(&slave_mutex, NULL))); @@ -458,12 +371,8 @@ void* slave_loop(void* data) /* Store slave thread and create mutexes and conditions. */ fail_if (client_initialise_threading(information)); - - /* Make the server update without all slaves dying on SIGUSR1. */ - fail_if (xsigaction(SIGUSR1, sigusr1_trap) < 0); - - /* Implement clean exit on SIGTERM. */ - fail_if (xsigaction(SIGTERM, sigterm_trap) < 0); + /* Set up traps for especially handled signals. */ + fail_if (trap_signals() < 0); /* Fetch messages from the slave. */ @@ -742,7 +651,7 @@ int message_received(client_t* client) if (assign_id && (client->id == 0)) { intercept |= 2; - with_mutex_if (slave_mutex, (client->id = ++next_id) == 0, + with_mutex_if (slave_mutex, (client->id = ++next_client_id) == 0, eprint("this is impossible, ID counter has overflowed."); /* If the program ran for a millennium it would take c:a 585 assignments per nanosecond. This @@ -892,145 +801,6 @@ int message_received(client_t* client) } -/** - * Remove interception condition by index - * - * @param client The intercepting client - * @param index The index of the condition - */ -static void remove_intercept_condition(client_t* client, size_t index) -{ - interception_condition_t* conds = client->interception_conditions; - size_t n = client->interception_conditions_count; - - /* Remove the condition from the list. */ - memmove(conds + index, conds + index + 1, --n - index); - client->interception_conditions_count--; - - /* Shrink the list. */ - if (client->interception_conditions_count == 0) - { - free(conds); - client->interception_conditions = NULL; - } - else - if (xrealloc(conds, n, interception_condition_t)) - perror(*argv); - else - client->interception_conditions = conds; -} - - -/** - * Add an interception condition for a client - * - * @param client The client - * @param condition The header, optionally with value, to look for, or empty (not `NULL`) for all messages - * @param priority Interception priority - * @param modifying Whether the client may modify the messages - * @param stop Whether the condition should be removed rather than added - */ -void add_intercept_condition(client_t* client, char* condition, int64_t priority, int modifying, int stop) -{ - size_t n = client->interception_conditions_count; - interception_condition_t* conds = client->interception_conditions; - ssize_t nonmodifying = -1; - char* header = condition; - char* value; - size_t hash; - size_t i; - - /* Split header and value apart. */ - if ((value = strchr(header, ':')) != NULL) - { - *value = '\0'; /* NUL-terminate header. */ - value += 2; /* Skip over delimiter. */ - } - - /* Calcuate header hash (comparison optimisation) */ - hash = string_hash(header); - - /* Remove of update condition of already registered, - also look for non-modifying condition to swap position - with for optimisation. */ - for (i = 0; i < n; i++) - { - if ((conds[i].header_hash != hash) || !strequals(conds[i].condition, condition)) - { - /* Look for the first non-modifying, this is a part of the - optimisation where we put all modifying conditions at the - beginning. */ - if ((nonmodifying < 0) && conds[i].modifying) - nonmodifying = (ssize_t)i; - continue; - } - - if (stop) - remove_intercept_condition(client, i); - else - { - /* Update parameters. */ - conds[i].priority = priority; - conds[i].modifying = modifying; - - if (modifying && (nonmodifying >= 0)) - { - /* Optimisation: put conditions that are modifying - at the beginning. When a client is intercepting - we most know if any satisfying condition is - modifying. With this optimisation the first - satisfying condition will tell us if there is - any satisfying condition that is modifying. */ - interception_condition_t temp = conds[nonmodifying]; - conds[nonmodifying] = conds[i]; - conds[i] = temp; - } - } - return; - } - - if (stop) - eprint("client tried to stop intercepting messages that it does not intercept."); - else - { - /* Duplicate condition string. */ - if ((condition = strdup(condition)) == NULL) - { - perror(*argv); - return; - } - - /* Grow the interception condition list. */ - if (xrealloc(conds, n + 1, interception_condition_t)) - { - perror(*argv); - free(condition); - return; - } - client->interception_conditions = conds; - /* Store condition. */ - client->interception_conditions_count++; - conds[n].condition = condition; - conds[n].header_hash = hash; - conds[n].priority = priority; - conds[n].modifying = modifying; - - if (modifying && (nonmodifying >= 0)) - { - /* Optimisation: put conditions that are modifying - at the beginning. When a client is intercepting - we most know if any satisfying condition is - modifying. With this optimisation the first - satisfying condition will tell us if there is - any satisfying condition that is modifying. */ - interception_condition_t temp = conds[nonmodifying]; - conds[nonmodifying] = conds[n]; - conds[n] = temp; - } - } -} - - /** * Compare two queued interceptors by priority * @@ -1047,125 +817,6 @@ static int cmp_queued_interception(const void* a, const void* b) } -/** - * Check if a condition matches any of a set of accepted patterns - * - * @param cond The condition - * @param hashes The hashes of the accepted header names - * @param keys The header names - * @param headers The header name–value pairs - * @param count The number of accepted patterns - * @return Evaluates to true if and only if a matching pattern was found - */ -static int is_condition_matching(interception_condition_t* cond, - size_t* hashes, char** keys, char** headers, size_t count) -{ - size_t i; - for (i = 0; i < count; i++) - if (*(cond->condition) == '\0') - return 1; - else if ((cond->header_hash == hashes[i]) && - (strequals(cond->condition, keys[i]) || - strequals(cond->condition, headers[i]))) - return 1; - return 0; -} - - -/** - * Find a matching condition to any of a set of acceptable conditions - * - * @param client The intercepting client - * @param hashes The hashes of the accepted header names - * @param keys The header names - * @param headers The header name–value pairs - * @param count The number of accepted patterns - * @param interception_out Storage slot for found interception - * @return -1 on error, otherwise: evalutes to true iff a matching condition was found - */ -static int find_matching_condition(client_t* client, size_t* hashes, char** keys, char** headers, size_t count, - queued_interception_t* interception_out) -{ - pthread_mutex_t mutex = client->mutex; - interception_condition_t* conds = client->interception_conditions; - size_t n = 0, i; - - errno = pthread_mutex_lock(&(mutex)); - if (errno) - return -1; - - /* Look for a matching condition. */ - if (client->open) - n = client->interception_conditions_count; - for (i = 0; i < n; i++) - if (is_condition_matching(conds + i, hashes, keys, headers, count)) - { - /* Report matching condition. */ - interception_out->client = client; - interception_out->priority = conds[i].priority; - interception_out->modifying = conds[i].modifying; - break; - } - - pthread_mutex_unlock(&(mutex)); - - return i < n; -} - - -/** - * Get all interceptors who have at least one condition matching any of a set of acceptable patterns - * - * @param sender The original sender of the message - * @param hashes The hashes of the accepted header names - * @param keys The header names - * @param headers The header name–value pairs - * @param count The number of accepted patterns - * @param interceptions_count_out Slot at where to store the number of found interceptors - * @return The found interceptors, `NULL` on error - */ -static queued_interception_t* get_interceptors(client_t* sender, size_t* hashes, char** keys, char** headers, - size_t count, size_t* interceptions_count_out) -{ - queued_interception_t* interceptions = NULL; - size_t interceptions_count = 0; - size_t n = 0; - ssize_t node; - - /* Count clients. */ - foreach_linked_list_node (client_list, node) - n++; - - /* Allocate interceptor list. */ - if (xmalloc(interceptions, n, queued_interception_t)) - return NULL; - - /* Search clients. */ - foreach_linked_list_node (client_list, node) - { - client_t* client = (client_t*)(void*)(client_list.values[node]); - - /* Look for and list a matching condition. */ - if (client->open && (client != sender)) - { - int r = find_matching_condition(client, hashes, keys, headers, count, - interceptions + interceptions_count); - if (r == -1) - { - free(interceptions); - return NULL; - } - if (r) - /* List client of there was a matching condition. */ - interceptions_count++; - } - } - - *interceptions_count_out = interceptions_count; - return interceptions; -} - - /** * Queue a message for multicasting * @@ -1295,175 +946,6 @@ void queue_message_multicast(char* message, size_t length, client_t* sender) } -/** - * Get the client by its socket's file descriptor in a synchronised manner - * - * @param socket_fd The file descriptor of the client's socket - * @return The client - */ -static client_t* client_by_socket(int socket_fd) -{ - size_t address; - with_mutex (slave_mutex, address = fd_table_get(&client_map, socket_fd);); - return (client_t*)(void*)address; -} - - -/** - * Send a multicast message to one recipient - * - * @param multicast The message - * @param recipient The recipient - * @param modifying Whether the recipient may modify the message - * @return Evaluates to true if and only if the entire message was sent - */ -static int send_multicast_to_recipient(multicast_t* multicast, client_t* recipient, int modifying) -{ - char* msg = multicast->message + multicast->message_ptr; - size_t n = multicast->message_length - multicast->message_ptr; - size_t sent; - - /* Skip Modify ID header if the interceptors will not perform a modification. */ - if ((modifying == 0) && (multicast->message_ptr == 0)) - { - n -= multicast->message_prefix; - multicast->message_ptr += multicast->message_prefix; - } - - /* Send the message. */ - n *= sizeof(char); - with_mutex (recipient->mutex, - if (recipient->open) - { - sent = send_message(recipient->socket_fd, msg + multicast->message_ptr, n); - n -= sent; - multicast->message_ptr += sent / sizeof(char); - if ((n > 0) && (errno != EINTR)) - perror(*argv); - } - ); - - return n == 0; -} - - -/** - * Wait for the recipient of a multicast to reply - * - * @param recipient The recipient - * @param modify_id The modify ID of the multicast - */ -static void wait_for_reply(client_t* recipient, uint64_t modify_id) -{ - /* pthread_cond_timedwait is required to handle re-exec and termination because - pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */ - struct timespec timeout = - { - .tv_sec = 1, - .tv_nsec = 0 - }; - - with_mutex_if (modify_mutex, recipient->modify_message == NULL, - if (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0) - { - hash_table_put(&modify_map, (size_t)modify_id, (size_t)(void*)recipient); - pthread_cond_signal(&slave_cond); - } - ); - - with_mutex_if (recipient->modify_mutex, recipient->modify_message == NULL, - while ((recipient->modify_message == NULL) && (terminating == 0)) - pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout); - if (terminating == 0) - hash_table_remove(&modify_map, (size_t)modify_id); - ); -} - - -/** - * Multicast a message - * - * @param multicast The multicast message - */ -void multicast_message(multicast_t* multicast) -{ - uint64_t modify_id = 0; - size_t n = strlen("Modify ID: "); - if (startswith_n(multicast->message, "Modify ID: ", multicast->message_length, n)) - { - char* value = multicast->message + n; - char* lf = strchr(value, '\n'); - *lf = '\0'; - modify_id = (uint64_t)atoll(value); - *lf = '\n'; - } - - for (; multicast->interceptions_ptr < multicast->interceptions_count; multicast->interceptions_ptr++) - { - queued_interception_t client_ = multicast->interceptions[multicast->interceptions_ptr]; - client_t* client = client_.client; - int modifying = 0; - char* old_buf; - size_t i; - mds_message_t* mod; - - /* After unmarshalling at re-exec, client will be NULL and must be mapped from its socket. */ - if (client == NULL) - client_.client = client = client_by_socket(client_.socket_fd); - - /* Send the message to the recipient. */ - if (send_multicast_to_recipient(multicast, client, client_.modifying) == 0) - { - /* Stop if we are re-exec:ing or terminating, or continue to next recipient on error. */ - if (terminating) - return; - else - continue; - } - - /* Do not wait for a reply if it is non-modifying. */ - if (client_.modifying == 0) - { - /* Reset how much of the message has been sent before we continue with next recipient. */ - multicast->message_ptr = 0; - continue; - } - - /* Wait for a reply. */ - wait_for_reply(client, modify_id); - if (terminating) - return; - - /* Act upon the reply. */ - mod = client->modify_message; - for (i = 0; i < mod->header_count; i++) - if (strequals(mod->headers[i], "Modify: yes")) - { - modifying = 1; - break; - } - if (modifying) - { - n = mod->payload_size; - old_buf = multicast->message; - if (xrealloc(multicast->message, multicast->message_prefix + n, char)) - { - perror(*argv); - multicast->message = old_buf; - } - else - memcpy(multicast->message + multicast->message_prefix, mod->payload, n); - } - - /* Free the reply. */ - mds_message_destroy(client->modify_message); - - /* Reset how much of the message has been sent before we continue with next recipient. */ - multicast->message_ptr = 0; - } -} - - /** * Exec into the mdsinitrc script * @@ -1535,66 +1017,6 @@ void run_initrc(char** args) } -/** - * Called with the signal SIGUSR1 is caught. - * This function should cue a re-exec of the program. - * - * @param signo The caught signal - */ -void sigusr1_trap(int signo) -{ - if (reexecing == 0) - { - terminating = reexecing = 1; - eprint("re-exec signal received."); - signal_all(signo); - } -} - - -/** - * Called with the signal SIGTERM is caught. - * This function should cue a termination of the program. - * - * @param signo The caught signal - */ -void sigterm_trap(int signo) -{ - if (terminating == 0) - { - terminating = 1; - eprint("terminate signal received."); - signal_all(signo); - } -} - - -/** - * Send a singal to all threads except the current thread - * - * @param signo The signal - */ -void signal_all(int signo) -{ - pthread_t current_thread; - ssize_t node; - - current_thread = pthread_self(); - - if (pthread_equal(current_thread, master_thread) == 0) - pthread_kill(master_thread, signo); - - with_mutex (slave_mutex, - foreach_linked_list_node (client_list, node) - { - client_t* value = (client_t*)(void*)(client_list.values[node]); - if (pthread_equal(current_thread, value->thread) == 0) - pthread_kill(value->thread, signo); - } - ); -} - - /** * Marshal the server's state into a file * @@ -1635,7 +1057,7 @@ int marshal_server(int fd) /* Marshal the miscellaneous state data. */ buf_set_next(state_buf_, sig_atomic_t, running); - buf_set_next(state_buf_, uint64_t, next_id); + buf_set_next(state_buf_, uint64_t, next_client_id); buf_set_next(state_buf_, uint64_t, next_modify_id); /* Tell the program how large the marshalled client list is and how any clients are marshalled. */ @@ -1733,7 +1155,7 @@ int unmarshal_server(int fd) /* Unmarshal the miscellaneous state data. */ buf_get_next(state_buf_, sig_atomic_t, running); - buf_get_next(state_buf_, uint64_t, next_id); + buf_get_next(state_buf_, uint64_t, next_client_id); buf_get_next(state_buf_, uint64_t, next_modify_id); /* Get the marshalled size of the client list and how any clients that are marshalled. */ diff --git a/src/mds-server/mds-server.h b/src/mds-server/mds-server.h index 9abf095..3936e32 100644 --- a/src/mds-server/mds-server.h +++ b/src/mds-server/mds-server.h @@ -18,14 +18,12 @@ #ifndef MDS_MDS_SERVER_H #define MDS_MDS_SERVER_H - #include "client.h" #include "multicast.h" #include - /** * Master function for slave threads * @@ -57,17 +55,6 @@ void send_reply_queue(client_t* client); */ int message_received(client_t* client); -/** - * Add an interception condition for a client - * - * @param client The client - * @param condition The header, optionally with value, to look for, or empty (not `NULL`) for all messages - * @param priority Interception priority - * @param modifying Whether the client may modify the messages - * @param stop Whether the condition should be removed rather than added - */ -void add_intercept_condition(client_t* client, char* condition, int64_t priority, int modifying, int stop); - /** * Queue a message for multicasting * @@ -85,13 +72,6 @@ void queue_message_multicast(char* message, size_t length, client_t* sender); */ int fetch_message(client_t* client); -/** - * Multicast a message - * - * @param multicast The multicast message - */ -void multicast_message(multicast_t* multicast); - /** * Exec into the mdsinitrc script * @@ -99,29 +79,6 @@ void multicast_message(multicast_t* multicast); */ void run_initrc(char** args); -/** - * Called with the signal SIGUSR1 is caught. - * This function should cue a re-exec of the program. - * - * @param signo The caught signal - */ -void sigusr1_trap(int signo); - -/** - * Called with the signal SIGTERM is caught. - * This function should cue a termination of the program. - * - * @param signo The caught signal - */ -void sigterm_trap(int signo); - -/** - * Send a singal to all threads except the current thread - * - * @param signo The signal - */ -void signal_all(int signo); - /** * Marshal the server's state into a file * diff --git a/src/mds-server/sending.c b/src/mds-server/sending.c new file mode 100644 index 0000000..93fba5a --- /dev/null +++ b/src/mds-server/sending.c @@ -0,0 +1,204 @@ +/** + * mds — A micro-display server + * Copyright © 2014 Mattias Andrée (maandree@member.fsf.org) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +#include "sending.h" + +#include "globals.h" +#include "client.h" +#include "queued_interception.h" +#include "multicast.h" + +#include +#include +#include + +#include +#include +#include +#include +#include + + + +/** + * Get the client by its socket's file descriptor in a synchronised manner + * + * @param socket_fd The file descriptor of the client's socket + * @return The client + */ +static client_t* client_by_socket(int socket_fd) +{ + size_t address; + with_mutex (slave_mutex, address = fd_table_get(&client_map, socket_fd);); + return (client_t*)(void*)address; +} + + +/** + * Send a multicast message to one recipient + * + * @param multicast The message + * @param recipient The recipient + * @param modifying Whether the recipient may modify the message + * @return Evaluates to true if and only if the entire message was sent + */ +static int send_multicast_to_recipient(multicast_t* multicast, client_t* recipient, int modifying) +{ + char* msg = multicast->message + multicast->message_ptr; + size_t n = multicast->message_length - multicast->message_ptr; + size_t sent; + + /* Skip Modify ID header if the interceptors will not perform a modification. */ + if ((modifying == 0) && (multicast->message_ptr == 0)) + { + n -= multicast->message_prefix; + multicast->message_ptr += multicast->message_prefix; + } + + /* Send the message. */ + n *= sizeof(char); + with_mutex (recipient->mutex, + if (recipient->open) + { + sent = send_message(recipient->socket_fd, msg + multicast->message_ptr, n); + n -= sent; + multicast->message_ptr += sent / sizeof(char); + if ((n > 0) && (errno != EINTR)) + perror(*argv); + } + ); + + return n == 0; +} + + +/** + * Wait for the recipient of a multicast to reply + * + * @param recipient The recipient + * @param modify_id The modify ID of the multicast + */ +static void wait_for_reply(client_t* recipient, uint64_t modify_id) +{ + /* pthread_cond_timedwait is required to handle re-exec and termination because + pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */ + struct timespec timeout = + { + .tv_sec = 1, + .tv_nsec = 0 + }; + + with_mutex_if (modify_mutex, recipient->modify_message == NULL, + if (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0) + { + hash_table_put(&modify_map, (size_t)modify_id, (size_t)(void*)recipient); + pthread_cond_signal(&slave_cond); + } + ); + + with_mutex_if (recipient->modify_mutex, recipient->modify_message == NULL, + while ((recipient->modify_message == NULL) && (terminating == 0)) + pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout); + if (terminating == 0) + hash_table_remove(&modify_map, (size_t)modify_id); + ); +} + + +/** + * Multicast a message + * + * @param multicast The multicast message + */ +void multicast_message(multicast_t* multicast) +{ + uint64_t modify_id = 0; + size_t n = strlen("Modify ID: "); + if (startswith_n(multicast->message, "Modify ID: ", multicast->message_length, n)) + { + char* value = multicast->message + n; + char* lf = strchr(value, '\n'); + *lf = '\0'; + modify_id = (uint64_t)atoll(value); + *lf = '\n'; + } + + for (; multicast->interceptions_ptr < multicast->interceptions_count; multicast->interceptions_ptr++) + { + queued_interception_t client_ = multicast->interceptions[multicast->interceptions_ptr]; + client_t* client = client_.client; + int modifying = 0; + char* old_buf; + size_t i; + mds_message_t* mod; + + /* After unmarshalling at re-exec, client will be NULL and must be mapped from its socket. */ + if (client == NULL) + client_.client = client = client_by_socket(client_.socket_fd); + + /* Send the message to the recipient. */ + if (send_multicast_to_recipient(multicast, client, client_.modifying) == 0) + { + /* Stop if we are re-exec:ing or terminating, or continue to next recipient on error. */ + if (terminating) + return; + else + continue; + } + + /* Do not wait for a reply if it is non-modifying. */ + if (client_.modifying == 0) + { + /* Reset how much of the message has been sent before we continue with next recipient. */ + multicast->message_ptr = 0; + continue; + } + + /* Wait for a reply. */ + wait_for_reply(client, modify_id); + if (terminating) + return; + + /* Act upon the reply. */ + mod = client->modify_message; + for (i = 0; i < mod->header_count; i++) + if (strequals(mod->headers[i], "Modify: yes")) + { + modifying = 1; + break; + } + if (modifying) + { + n = mod->payload_size; + old_buf = multicast->message; + if (xrealloc(multicast->message, multicast->message_prefix + n, char)) + { + perror(*argv); + multicast->message = old_buf; + } + else + memcpy(multicast->message + multicast->message_prefix, mod->payload, n); + } + + /* Free the reply. */ + mds_message_destroy(client->modify_message); + + /* Reset how much of the message has been sent before we continue with next recipient. */ + multicast->message_ptr = 0; + } +} + diff --git a/src/mds-server/sending.h b/src/mds-server/sending.h new file mode 100644 index 0000000..693bd37 --- /dev/null +++ b/src/mds-server/sending.h @@ -0,0 +1,34 @@ +/** + * mds — A micro-display server + * Copyright © 2014 Mattias Andrée (maandree@member.fsf.org) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +#ifndef MDS_MDS_SERVER_SENDING_H +#define MDS_MDS_SERVER_SENDING_H + + +#include "multicast.h" + + +/** + * Multicast a message + * + * @param multicast The multicast message + */ +void multicast_message(multicast_t* multicast); + + +#endif + diff --git a/src/mds-server/signals.c b/src/mds-server/signals.c new file mode 100644 index 0000000..18a32b6 --- /dev/null +++ b/src/mds-server/signals.c @@ -0,0 +1,109 @@ +/** + * mds — A micro-display server + * Copyright © 2014 Mattias Andrée (maandree@member.fsf.org) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +#include "signals.h" + +#include "globals.h" +#include "client.h" + +#include +#include +#include + +#include +#include +#include +#include +#include + + +/** + * Send a singal to all threads except the current thread + * + * @param signo The signal + */ +static void signal_all(int signo) +{ + pthread_t current_thread; + ssize_t node; + + current_thread = pthread_self(); + + if (pthread_equal(current_thread, master_thread) == 0) + pthread_kill(master_thread, signo); + + with_mutex (slave_mutex, + foreach_linked_list_node (client_list, node) + { + client_t* value = (client_t*)(void*)(client_list.values[node]); + if (pthread_equal(current_thread, value->thread) == 0) + pthread_kill(value->thread, signo); + } + ); +} + + +/** + * Called with the signal SIGUSR1 is caught. + * This function should cue a re-exec of the program. + * + * @param signo The caught signal + */ +static void sigusr1_trap(int signo) +{ + if (reexecing == 0) + { + terminating = reexecing = 1; + eprint("re-exec signal received."); + signal_all(signo); + } +} + + +/** + * Called with the signal SIGTERM is caught. + * This function should cue a termination of the program. + * + * @param signo The caught signal + */ +static void sigterm_trap(int signo) +{ + if (terminating == 0) + { + terminating = 1; + eprint("terminate signal received."); + signal_all(signo); + } +} + + +/** + * Set up signal traps for all especially handled signals + * + * @return Zero on success, -1 on error + */ +int trap_signals(void) +{ + /* Make the server update without all slaves dying on SIGUSR1. */ + if (xsigaction(SIGUSR1, sigusr1_trap) < 0) return -1; + + /* Implement clean exit on SIGTERM. */ + if (xsigaction(SIGTERM, sigterm_trap) < 0) return -1; + + return 0; +} + diff --git a/src/mds-server/signals.h b/src/mds-server/signals.h new file mode 100644 index 0000000..b4a49ed --- /dev/null +++ b/src/mds-server/signals.h @@ -0,0 +1,31 @@ +/** + * mds — A micro-display server + * Copyright © 2014 Mattias Andrée (maandree@member.fsf.org) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +#ifndef MDS_MDS_SERVER_SIGNALS_H +#define MDS_MDS_SERVER_SIGNALS_H + + +/** + * Set up signal traps for all especially handled signals + * + * @return Zero on success, -1 on error + */ +int trap_signals(void); + + +#endif + -- cgit v1.2.3-70-g09d2