aboutsummaryrefslogtreecommitdiffstats
path: root/src/mds-server/mds-server.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/mds-server/mds-server.c')
-rw-r--r--src/mds-server/mds-server.c600
1 files changed, 11 insertions, 589 deletions
diff --git a/src/mds-server/mds-server.c b/src/mds-server/mds-server.c
index d835a80..b129170 100644
--- a/src/mds-server/mds-server.c
+++ b/src/mds-server/mds-server.c
@@ -17,10 +17,14 @@
*/
#include "mds-server.h"
+#include "globals.h"
#include "interception_condition.h"
#include "client.h"
#include "queued_interception.h"
#include "multicast.h"
+#include "signals.h"
+#include "interceptors.h"
+#include "sending.h"
#include <libmdsserver/config.h>
#include <libmdsserver/linked-list.h>
@@ -31,7 +35,6 @@
#include <libmdsserver/util.h>
#include <libmdsserver/hash-help.h>
-#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
@@ -51,93 +54,6 @@
-#define MDS_SERVER_VARS_VERSION 0
-
-
-/**
- * Number of elements in `argv`
- */
-static int argc;
-
-/**
- * Command line arguments
- */
-static char** argv;
-
-
-/**
- * The program run state, 1 when running, 0 when shutting down
- */
-static volatile sig_atomic_t running = 1;
-
-/**
- * Non-zero when the program is about to re-exec.
- * Most at all times be at least as true as `terminating`.
- */
-static volatile sig_atomic_t reexecing = 0;
-
-/**
- * Non-zero when the program is about to terminate
- */
-static volatile sig_atomic_t terminating = 0;
-
-/**
- * The number of running slaves
- */
-static int running_slaves = 0;
-
-/**
- * Mutex for slave data
- */
-static pthread_mutex_t slave_mutex;
-
-/**
- * Condition for slave data
- */
-static pthread_cond_t slave_cond;
-
-/**
- * The thread that runs the master loop
- */
-static pthread_t master_thread;
-
-/**
- * Map from client socket file descriptor to all information (client_t)
- */
-static fd_table_t client_map;
-
-/**
- * List of client information (client_t)
- */
-static linked_list_t client_list;
-
-/**
- * The next free ID for a client
- */
-static uint64_t next_id = 1;
-
-/**
- * The next free ID for a message modifications
- */
-static uint64_t next_modify_id = 1;
-
-/**
- * Mutex for message modification
- */
-static pthread_mutex_t modify_mutex;
-
-/**
- * Condition for message modification
- */
-static pthread_cond_t modify_cond;
-
-/**
- * Map from modification ID to waiting client
- */
-static hash_table_t modify_map;
-
-
-
/**
* Entry point of the server
*
@@ -252,11 +168,8 @@ int main(int argc_, char** argv_)
/* Store the current thread so it can be killed from elsewhere. */
master_thread = pthread_self();
- /* Make the server update without all slaves dying on SIGUSR1. */
- error_if (1, xsigaction(SIGUSR1, sigusr1_trap) < 0);
-
- /* Implement clean exit on SIGTERM. */
- error_if (1, xsigaction(SIGTERM, sigterm_trap) < 0);
+ /* Set up traps for especially handled signals. */
+ error_if (1, trap_signals() < 0);
/* Create mutex and condition for slave counter. */
error_if (1, (errno = pthread_mutex_init(&slave_mutex, NULL)));
@@ -458,12 +371,8 @@ void* slave_loop(void* data)
/* Store slave thread and create mutexes and conditions. */
fail_if (client_initialise_threading(information));
-
- /* Make the server update without all slaves dying on SIGUSR1. */
- fail_if (xsigaction(SIGUSR1, sigusr1_trap) < 0);
-
- /* Implement clean exit on SIGTERM. */
- fail_if (xsigaction(SIGTERM, sigterm_trap) < 0);
+ /* Set up traps for especially handled signals. */
+ fail_if (trap_signals() < 0);
/* Fetch messages from the slave. */
@@ -742,7 +651,7 @@ int message_received(client_t* client)
if (assign_id && (client->id == 0))
{
intercept |= 2;
- with_mutex_if (slave_mutex, (client->id = ++next_id) == 0,
+ with_mutex_if (slave_mutex, (client->id = ++next_client_id) == 0,
eprint("this is impossible, ID counter has overflowed.");
/* If the program ran for a millennium it would
take c:a 585 assignments per nanosecond. This
@@ -893,145 +802,6 @@ int message_received(client_t* client)
/**
- * Remove interception condition by index
- *
- * @param client The intercepting client
- * @param index The index of the condition
- */
-static void remove_intercept_condition(client_t* client, size_t index)
-{
- interception_condition_t* conds = client->interception_conditions;
- size_t n = client->interception_conditions_count;
-
- /* Remove the condition from the list. */
- memmove(conds + index, conds + index + 1, --n - index);
- client->interception_conditions_count--;
-
- /* Shrink the list. */
- if (client->interception_conditions_count == 0)
- {
- free(conds);
- client->interception_conditions = NULL;
- }
- else
- if (xrealloc(conds, n, interception_condition_t))
- perror(*argv);
- else
- client->interception_conditions = conds;
-}
-
-
-/**
- * Add an interception condition for a client
- *
- * @param client The client
- * @param condition The header, optionally with value, to look for, or empty (not `NULL`) for all messages
- * @param priority Interception priority
- * @param modifying Whether the client may modify the messages
- * @param stop Whether the condition should be removed rather than added
- */
-void add_intercept_condition(client_t* client, char* condition, int64_t priority, int modifying, int stop)
-{
- size_t n = client->interception_conditions_count;
- interception_condition_t* conds = client->interception_conditions;
- ssize_t nonmodifying = -1;
- char* header = condition;
- char* value;
- size_t hash;
- size_t i;
-
- /* Split header and value apart. */
- if ((value = strchr(header, ':')) != NULL)
- {
- *value = '\0'; /* NUL-terminate header. */
- value += 2; /* Skip over delimiter. */
- }
-
- /* Calcuate header hash (comparison optimisation) */
- hash = string_hash(header);
-
- /* Remove of update condition of already registered,
- also look for non-modifying condition to swap position
- with for optimisation. */
- for (i = 0; i < n; i++)
- {
- if ((conds[i].header_hash != hash) || !strequals(conds[i].condition, condition))
- {
- /* Look for the first non-modifying, this is a part of the
- optimisation where we put all modifying conditions at the
- beginning. */
- if ((nonmodifying < 0) && conds[i].modifying)
- nonmodifying = (ssize_t)i;
- continue;
- }
-
- if (stop)
- remove_intercept_condition(client, i);
- else
- {
- /* Update parameters. */
- conds[i].priority = priority;
- conds[i].modifying = modifying;
-
- if (modifying && (nonmodifying >= 0))
- {
- /* Optimisation: put conditions that are modifying
- at the beginning. When a client is intercepting
- we most know if any satisfying condition is
- modifying. With this optimisation the first
- satisfying condition will tell us if there is
- any satisfying condition that is modifying. */
- interception_condition_t temp = conds[nonmodifying];
- conds[nonmodifying] = conds[i];
- conds[i] = temp;
- }
- }
- return;
- }
-
- if (stop)
- eprint("client tried to stop intercepting messages that it does not intercept.");
- else
- {
- /* Duplicate condition string. */
- if ((condition = strdup(condition)) == NULL)
- {
- perror(*argv);
- return;
- }
-
- /* Grow the interception condition list. */
- if (xrealloc(conds, n + 1, interception_condition_t))
- {
- perror(*argv);
- free(condition);
- return;
- }
- client->interception_conditions = conds;
- /* Store condition. */
- client->interception_conditions_count++;
- conds[n].condition = condition;
- conds[n].header_hash = hash;
- conds[n].priority = priority;
- conds[n].modifying = modifying;
-
- if (modifying && (nonmodifying >= 0))
- {
- /* Optimisation: put conditions that are modifying
- at the beginning. When a client is intercepting
- we most know if any satisfying condition is
- modifying. With this optimisation the first
- satisfying condition will tell us if there is
- any satisfying condition that is modifying. */
- interception_condition_t temp = conds[nonmodifying];
- conds[nonmodifying] = conds[n];
- conds[n] = temp;
- }
- }
-}
-
-
-/**
* Compare two queued interceptors by priority
*
* @param a:const queued_interception_t* One of the interceptors
@@ -1048,125 +818,6 @@ static int cmp_queued_interception(const void* a, const void* b)
/**
- * Check if a condition matches any of a set of accepted patterns
- *
- * @param cond The condition
- * @param hashes The hashes of the accepted header names
- * @param keys The header names
- * @param headers The header name–value pairs
- * @param count The number of accepted patterns
- * @return Evaluates to true if and only if a matching pattern was found
- */
-static int is_condition_matching(interception_condition_t* cond,
- size_t* hashes, char** keys, char** headers, size_t count)
-{
- size_t i;
- for (i = 0; i < count; i++)
- if (*(cond->condition) == '\0')
- return 1;
- else if ((cond->header_hash == hashes[i]) &&
- (strequals(cond->condition, keys[i]) ||
- strequals(cond->condition, headers[i])))
- return 1;
- return 0;
-}
-
-
-/**
- * Find a matching condition to any of a set of acceptable conditions
- *
- * @param client The intercepting client
- * @param hashes The hashes of the accepted header names
- * @param keys The header names
- * @param headers The header name–value pairs
- * @param count The number of accepted patterns
- * @param interception_out Storage slot for found interception
- * @return -1 on error, otherwise: evalutes to true iff a matching condition was found
- */
-static int find_matching_condition(client_t* client, size_t* hashes, char** keys, char** headers, size_t count,
- queued_interception_t* interception_out)
-{
- pthread_mutex_t mutex = client->mutex;
- interception_condition_t* conds = client->interception_conditions;
- size_t n = 0, i;
-
- errno = pthread_mutex_lock(&(mutex));
- if (errno)
- return -1;
-
- /* Look for a matching condition. */
- if (client->open)
- n = client->interception_conditions_count;
- for (i = 0; i < n; i++)
- if (is_condition_matching(conds + i, hashes, keys, headers, count))
- {
- /* Report matching condition. */
- interception_out->client = client;
- interception_out->priority = conds[i].priority;
- interception_out->modifying = conds[i].modifying;
- break;
- }
-
- pthread_mutex_unlock(&(mutex));
-
- return i < n;
-}
-
-
-/**
- * Get all interceptors who have at least one condition matching any of a set of acceptable patterns
- *
- * @param sender The original sender of the message
- * @param hashes The hashes of the accepted header names
- * @param keys The header names
- * @param headers The header name–value pairs
- * @param count The number of accepted patterns
- * @param interceptions_count_out Slot at where to store the number of found interceptors
- * @return The found interceptors, `NULL` on error
- */
-static queued_interception_t* get_interceptors(client_t* sender, size_t* hashes, char** keys, char** headers,
- size_t count, size_t* interceptions_count_out)
-{
- queued_interception_t* interceptions = NULL;
- size_t interceptions_count = 0;
- size_t n = 0;
- ssize_t node;
-
- /* Count clients. */
- foreach_linked_list_node (client_list, node)
- n++;
-
- /* Allocate interceptor list. */
- if (xmalloc(interceptions, n, queued_interception_t))
- return NULL;
-
- /* Search clients. */
- foreach_linked_list_node (client_list, node)
- {
- client_t* client = (client_t*)(void*)(client_list.values[node]);
-
- /* Look for and list a matching condition. */
- if (client->open && (client != sender))
- {
- int r = find_matching_condition(client, hashes, keys, headers, count,
- interceptions + interceptions_count);
- if (r == -1)
- {
- free(interceptions);
- return NULL;
- }
- if (r)
- /* List client of there was a matching condition. */
- interceptions_count++;
- }
- }
-
- *interceptions_count_out = interceptions_count;
- return interceptions;
-}
-
-
-/**
* Queue a message for multicasting
*
* @param message The message
@@ -1296,175 +947,6 @@ void queue_message_multicast(char* message, size_t length, client_t* sender)
/**
- * Get the client by its socket's file descriptor in a synchronised manner
- *
- * @param socket_fd The file descriptor of the client's socket
- * @return The client
- */
-static client_t* client_by_socket(int socket_fd)
-{
- size_t address;
- with_mutex (slave_mutex, address = fd_table_get(&client_map, socket_fd););
- return (client_t*)(void*)address;
-}
-
-
-/**
- * Send a multicast message to one recipient
- *
- * @param multicast The message
- * @param recipient The recipient
- * @param modifying Whether the recipient may modify the message
- * @return Evaluates to true if and only if the entire message was sent
- */
-static int send_multicast_to_recipient(multicast_t* multicast, client_t* recipient, int modifying)
-{
- char* msg = multicast->message + multicast->message_ptr;
- size_t n = multicast->message_length - multicast->message_ptr;
- size_t sent;
-
- /* Skip Modify ID header if the interceptors will not perform a modification. */
- if ((modifying == 0) && (multicast->message_ptr == 0))
- {
- n -= multicast->message_prefix;
- multicast->message_ptr += multicast->message_prefix;
- }
-
- /* Send the message. */
- n *= sizeof(char);
- with_mutex (recipient->mutex,
- if (recipient->open)
- {
- sent = send_message(recipient->socket_fd, msg + multicast->message_ptr, n);
- n -= sent;
- multicast->message_ptr += sent / sizeof(char);
- if ((n > 0) && (errno != EINTR))
- perror(*argv);
- }
- );
-
- return n == 0;
-}
-
-
-/**
- * Wait for the recipient of a multicast to reply
- *
- * @param recipient The recipient
- * @param modify_id The modify ID of the multicast
- */
-static void wait_for_reply(client_t* recipient, uint64_t modify_id)
-{
- /* pthread_cond_timedwait is required to handle re-exec and termination because
- pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */
- struct timespec timeout =
- {
- .tv_sec = 1,
- .tv_nsec = 0
- };
-
- with_mutex_if (modify_mutex, recipient->modify_message == NULL,
- if (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0)
- {
- hash_table_put(&modify_map, (size_t)modify_id, (size_t)(void*)recipient);
- pthread_cond_signal(&slave_cond);
- }
- );
-
- with_mutex_if (recipient->modify_mutex, recipient->modify_message == NULL,
- while ((recipient->modify_message == NULL) && (terminating == 0))
- pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout);
- if (terminating == 0)
- hash_table_remove(&modify_map, (size_t)modify_id);
- );
-}
-
-
-/**
- * Multicast a message
- *
- * @param multicast The multicast message
- */
-void multicast_message(multicast_t* multicast)
-{
- uint64_t modify_id = 0;
- size_t n = strlen("Modify ID: ");
- if (startswith_n(multicast->message, "Modify ID: ", multicast->message_length, n))
- {
- char* value = multicast->message + n;
- char* lf = strchr(value, '\n');
- *lf = '\0';
- modify_id = (uint64_t)atoll(value);
- *lf = '\n';
- }
-
- for (; multicast->interceptions_ptr < multicast->interceptions_count; multicast->interceptions_ptr++)
- {
- queued_interception_t client_ = multicast->interceptions[multicast->interceptions_ptr];
- client_t* client = client_.client;
- int modifying = 0;
- char* old_buf;
- size_t i;
- mds_message_t* mod;
-
- /* After unmarshalling at re-exec, client will be NULL and must be mapped from its socket. */
- if (client == NULL)
- client_.client = client = client_by_socket(client_.socket_fd);
-
- /* Send the message to the recipient. */
- if (send_multicast_to_recipient(multicast, client, client_.modifying) == 0)
- {
- /* Stop if we are re-exec:ing or terminating, or continue to next recipient on error. */
- if (terminating)
- return;
- else
- continue;
- }
-
- /* Do not wait for a reply if it is non-modifying. */
- if (client_.modifying == 0)
- {
- /* Reset how much of the message has been sent before we continue with next recipient. */
- multicast->message_ptr = 0;
- continue;
- }
-
- /* Wait for a reply. */
- wait_for_reply(client, modify_id);
- if (terminating)
- return;
-
- /* Act upon the reply. */
- mod = client->modify_message;
- for (i = 0; i < mod->header_count; i++)
- if (strequals(mod->headers[i], "Modify: yes"))
- {
- modifying = 1;
- break;
- }
- if (modifying)
- {
- n = mod->payload_size;
- old_buf = multicast->message;
- if (xrealloc(multicast->message, multicast->message_prefix + n, char))
- {
- perror(*argv);
- multicast->message = old_buf;
- }
- else
- memcpy(multicast->message + multicast->message_prefix, mod->payload, n);
- }
-
- /* Free the reply. */
- mds_message_destroy(client->modify_message);
-
- /* Reset how much of the message has been sent before we continue with next recipient. */
- multicast->message_ptr = 0;
- }
-}
-
-
-/**
* Exec into the mdsinitrc script
*
* @param args The arguments to the child process
@@ -1536,66 +1018,6 @@ void run_initrc(char** args)
/**
- * Called with the signal SIGUSR1 is caught.
- * This function should cue a re-exec of the program.
- *
- * @param signo The caught signal
- */
-void sigusr1_trap(int signo)
-{
- if (reexecing == 0)
- {
- terminating = reexecing = 1;
- eprint("re-exec signal received.");
- signal_all(signo);
- }
-}
-
-
-/**
- * Called with the signal SIGTERM is caught.
- * This function should cue a termination of the program.
- *
- * @param signo The caught signal
- */
-void sigterm_trap(int signo)
-{
- if (terminating == 0)
- {
- terminating = 1;
- eprint("terminate signal received.");
- signal_all(signo);
- }
-}
-
-
-/**
- * Send a singal to all threads except the current thread
- *
- * @param signo The signal
- */
-void signal_all(int signo)
-{
- pthread_t current_thread;
- ssize_t node;
-
- current_thread = pthread_self();
-
- if (pthread_equal(current_thread, master_thread) == 0)
- pthread_kill(master_thread, signo);
-
- with_mutex (slave_mutex,
- foreach_linked_list_node (client_list, node)
- {
- client_t* value = (client_t*)(void*)(client_list.values[node]);
- if (pthread_equal(current_thread, value->thread) == 0)
- pthread_kill(value->thread, signo);
- }
- );
-}
-
-
-/**
* Marshal the server's state into a file
*
* @param fd The file descriptor
@@ -1635,7 +1057,7 @@ int marshal_server(int fd)
/* Marshal the miscellaneous state data. */
buf_set_next(state_buf_, sig_atomic_t, running);
- buf_set_next(state_buf_, uint64_t, next_id);
+ buf_set_next(state_buf_, uint64_t, next_client_id);
buf_set_next(state_buf_, uint64_t, next_modify_id);
/* Tell the program how large the marshalled client list is and how any clients are marshalled. */
@@ -1733,7 +1155,7 @@ int unmarshal_server(int fd)
/* Unmarshal the miscellaneous state data. */
buf_get_next(state_buf_, sig_atomic_t, running);
- buf_get_next(state_buf_, uint64_t, next_id);
+ buf_get_next(state_buf_, uint64_t, next_client_id);
buf_get_next(state_buf_, uint64_t, next_modify_id);
/* Get the marshalled size of the client list and how any clients that are marshalled. */