/** * mds — A micro-display server * Copyright © 2014 Mattias Andrée (maandree@member.fsf.org) * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #include "mds-server.h" #include "interception_condition.h" #include "client.h" #include "queued_interception.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define MDS_SERVER_VARS_VERSION 0 /** * Number of elements in `argv` */ static int argc; /** * Command line arguments */ static char** argv; /** * The program run state, 1 when running, 0 when shutting down */ static volatile sig_atomic_t running = 1; /** * Non-zero when the program is about to re-exec */ static volatile sig_atomic_t reexecing = 0; /** * The number of running slaves */ static int running_slaves = 0; /** * Mutex for slave data */ static pthread_mutex_t slave_mutex; /** * Condition for slave data */ static pthread_cond_t slave_cond; /** * The thread that runs the master loop */ static pthread_t master_thread; /** * Map from client socket file descriptor to all information (client_t) */ static fd_table_t client_map; /** * List of client information (client_t) */ static linked_list_t client_list; /** * The next free ID for a client */ static uint64_t next_id = 1; /** * Entry point of the server * * @param argc_ Number of elements in `argv_` * @param argv_ Command line arguments * @return Non-zero on error */ int main(int argc_, char** argv_) { int is_respawn = -1; int socket_fd = -1; int reexec = 0; int unparsed_args_ptr = 1; char* unparsed_args[ARGC_LIMIT + LIBEXEC_ARGC_EXTRA_LIMIT + 1]; int i; pthread_t _slave_thread; argc = argc_; argv = argv_; /* Drop privileges like it's hot. */ if (drop_privileges()) { perror(*argv); return 1; } /* Sanity check the number of command line arguments. */ if (argc > ARGC_LIMIT + LIBEXEC_ARGC_EXTRA_LIMIT) { eprint("that number of arguments is ridiculous, I will not allow it."); return 1; } /* Parse command line arguments. */ for (i = 1; i < argc; i++) { char* arg = argv[i]; int v; if ((v = strequals(arg, "--initial-spawn")) || /* Initial spawn? */ strequals(arg, "--respawn")) /* Respawning after crash? */ if (is_respawn == v) { eprintf("conflicting arguments %s and %s cannot be combined.", "--initial-spawn", "--respawn"); return 1; } else is_respawn = !v; else if (startswith(arg, "--socket-fd=")) /* Socket file descriptor. */ { if (socket_fd != -1) { eprintf("duplicate declaration of %s.", "--socket-fd"); return 1; } if (strict_atoi(arg += strlen("--socket-fd="), &socket_fd, 0, INT_MAX) < 0) { eprintf("invalid value for %s: %s.", "--socket-fd", arg); return 1; } } else if (strequals(arg, "--re-exec")) /* Re-exec state-marshal. */ reexec = 1; else /* Not recognised, it is probably for another server. */ unparsed_args[unparsed_args_ptr++] = arg; } unparsed_args[unparsed_args_ptr] = NULL; if (reexec) is_respawn = 1; /* Check that manditory arguments have been specified. */ if (is_respawn < 0) { eprintf("missing state argument, require either %s or %s.", "--initial-spawn", "--respawn"); return 1; } if (socket_fd < 0) { eprint("missing socket file descriptor argument."); return 1; } /* Run mdsinitrc. */ if (is_respawn == 0) { pid_t pid = fork(); if (pid == (pid_t)-1) { perror(*argv); return 1; } if (pid == 0) /* Child process exec:s, the parent continues without waiting for it. */ { /* Close all files except stdin, stdout and stderr. */ close_files((fd > 2) || (fd == socket_fd)); /* Run initrc */ run_initrc(unparsed_args); return 1; } } /* Create list and table of clients. */ if (reexec == 0) { if (fd_table_create(&client_map)) { perror(*argv); fd_table_destroy(&client_map, NULL, NULL); return 1; } if (linked_list_create(&client_list, 32)) { perror(*argv); fd_table_destroy(&client_map, NULL, NULL); linked_list_destroy(&client_list); return 1; } } /* Store the current thread so it can be killed from elsewhere. */ master_thread = pthread_self(); /* Make the server update without all slaves dying on SIGUSR1. */ if (xsigaction(SIGUSR1, sigusr1_trap) < 0) { perror(*argv); fd_table_destroy(&client_map, NULL, NULL); linked_list_destroy(&client_list); return 1; } /* Create mutex and condition for slave counter. */ pthread_mutex_init(&slave_mutex, NULL); pthread_cond_init(&slave_cond, NULL); /* Unmarshal the state of the server. */ if (reexec) { pid_t pid = getpid(); int reexec_fd, r; char shm_path[NAME_MAX + 1]; xsnprintf(shm_path, SHM_PATH_PATTERN, (unsigned long int)pid); reexec_fd = shm_open(shm_path, O_RDWR | O_CREAT | O_EXCL, S_IRWXU); if (reexec_fd < 0) { perror(*argv); r = -1; } else { r = unmarshal_server(reexec_fd); close(reexec_fd); shm_unlink(shm_path); } if (r < 0) { /* Close all files (hopefully sockets) we do not know what they are. */ close_files((fd > 2) && (fd != socket_fd) && (fd_table_contains_key(&client_map, fd) == 0)); } } /* Accepting incoming connections. */ while (running && (reexecing == 0)) { /* Accept connection. */ int client_fd = accept(socket_fd, NULL, NULL); /* Handle errors and shutdown. */ if (client_fd == -1) { switch (errno) { case EINTR: /* Interrupted. */ if (reexecing) goto reexec; break; case ECONNABORTED: case EINVAL: /* Closing. */ running = 0; break; default: /* Error. */ perror(*argv); break; } continue; } /* Increase number of running slaves. */ with_mutex(slave_mutex, running_slaves++;); /* Start slave thread. */ errno = pthread_create(&_slave_thread, NULL, slave_loop, (void*)(intptr_t)client_fd); if (errno) { perror(*argv); with_mutex(slave_mutex, running_slaves--;); } } if (reexecing) goto reexec; /* Wait for all slaves to close. */ with_mutex(slave_mutex, while (running_slaves > 0) pthread_cond_wait(&slave_cond, &slave_mutex);); /* Release resources. */ fd_table_destroy(&client_map, NULL, NULL); linked_list_destroy(&client_list); pthread_mutex_destroy(&slave_mutex); pthread_cond_destroy(&slave_cond); return 0; reexec: { pid_t pid = getpid(); int reexec_fd; char shm_path[NAME_MAX + 1]; /* Join with all slaves threads. */ with_mutex(slave_mutex, while (running_slaves > 0) pthread_cond_wait(&slave_cond, &slave_mutex);); /* Release resources. */ pthread_mutex_destroy(&slave_mutex); pthread_cond_destroy(&slave_cond); /* Marshal the state of the server. */ xsnprintf(shm_path, SHM_PATH_PATTERN, (unsigned long int)pid); reexec_fd = shm_open(shm_path, O_RDWR | O_CREAT | O_EXCL, S_IRWXU); if (reexec_fd < 0) { perror(*argv); return 1; } if (marshal_server(reexec_fd) < 0) goto reexec_fail; close(reexec_fd); reexec_fd = -1; /* Re-exec the server. */ reexec_server(argc, argv, reexec); reexec_fail: perror(*argv); if (reexec_fd >= 0) close(reexec_fd); shm_unlink(shm_path); /* Returning non-zero is important, otherwise the server cannot be respawn if the re-exec fails. */ return 1; } } /** * Master function for slave threads * * @param data Input data * @return Outout data */ void* slave_loop(void* data) { int socket_fd = (int)(intptr_t)data; ssize_t entry = LINKED_LIST_UNUSED; size_t information_address = fd_table_get(&client_map, (size_t)socket_fd); client_t* information = (client_t*)(void*)information_address; int mutex_created = 0; char* msgbuf = NULL; size_t n; size_t tmp; int r; if (information == NULL) { /* Create information table. */ if (xmalloc(information, 1, client_t)) { perror(*argv); goto fail; } /* NULL-out pointers. */ information->interception_conditions = NULL; information->send_pending = NULL; /* Add to list of clients. */ pthread_mutex_lock(&slave_mutex); entry = linked_list_insert_end(&client_list, (size_t)(void*)information); if (entry == LINKED_LIST_UNUSED) { perror(*argv); pthread_mutex_unlock(&slave_mutex); goto fail; } /* Add client to table. */ tmp = fd_table_put(&client_map, socket_fd, (size_t)(void*)information); pthread_mutex_unlock(&slave_mutex); if ((tmp == 0) && errno) { perror(*argv); goto fail; } /* Fill information table. */ information->list_entry = entry; information->socket_fd = socket_fd; information->open = 1; information->id = 0; information->interception_conditions_count = 0; information->send_pending_size = 0; if (mds_message_initialise(&(information->message))) { perror(*argv); goto fail; } } /* Store the thread so that other threads can kill it. */ information->thread = pthread_self(); /* Create mutex to make sure two thread to not try to send messages concurrently, and other slave local actions. */ pthread_mutex_init(&(information->mutex), NULL); mutex_created = 1; /* Make the server update without all slaves dying on SIGUSR1. */ if (xsigaction(SIGUSR1, sigusr1_trap) < 0) { perror(*argv); goto fail; } /* Fetch messages from the slave. */ if (information->open) while (reexecing == 0) { /* Send queued messages. */ if (information->send_pending_size > 0) { char* sendbuf = information->send_pending; char* sendbuf_ = sendbuf; size_t sent; n = information->send_pending_size; information->send_pending_size = 0; information->send_pending = NULL; with_mutex(information->mutex, while (n > 0) { sent = send_message(information->socket_fd, sendbuf_, n); if ((sent < n) && (errno != EINTR)) /* Ignore EINTR */ { perror(*argv); break; } n -= sent; sendbuf_ += sent; } free(sendbuf); ); } /* Fetch message. */ r = mds_message_read(&(information->message), socket_fd); if (r == 0) message_received(information); else if (r == -2) { eprint("corrupt message received."); goto fail; } else if (errno == ECONNRESET) { r = mds_message_read(&(information->message), socket_fd); information->open = 0; if (r == 0) message_received(information); /* Connection closed. */ break; } else if (errno == EINTR) { /* Stop the thread if we are re-exec:ing the server. */ if (reexecing) goto reexec; } else { perror(*argv); goto fail; } } /* Stop the thread if we are re-exec:ing the server. */ if (reexecing) goto reexec; /* Multicast information about the client closing. */ n = 2 * 10 + 1 + strlen("Client closed: :\n\n"); snprintf(msgbuf, n, "Client closed: %" PRIu32 ":%" PRIu32 "\n" "\n", (uint32_t)(information->id >> 32), (uint32_t)(information->id >> 0)); n = strlen(msgbuf) + 1; multicast_message(msgbuf, n); fail: /* The loop does break, this done on success as well. */ /* Close socket and free resources. */ close(socket_fd); free(msgbuf); if (information != NULL) { if (information->interception_conditions != NULL) { size_t i; for (i = 0; i < information->interception_conditions_count; i++) free(information->interception_conditions[i].condition); free(information->interception_conditions); } if (mutex_created) pthread_mutex_destroy(&(information->mutex)); mds_message_destroy(&(information->message)); free(information->send_pending); free(information); } /* Unlist client and decrease the slave count. */ with_mutex(slave_mutex, fd_table_remove(&client_map, socket_fd); if (entry != LINKED_LIST_UNUSED) linked_list_remove(&client_list, entry); running_slaves--; pthread_cond_signal(&slave_cond);); return NULL; reexec: /* Tell the master thread that the slave has closed, this is done because re-exec causes a race-condition between the acception of a slave and the execution of the the slave thread. */ with_mutex(slave_mutex, running_slaves--; pthread_cond_signal(&slave_cond);); return NULL; } /** * Perform actions that should be taken when * a message has been received from a client * * @param client The client has sent a message */ void message_received(client_t* client) { mds_message_t message = client->message; int assign_id = 0; int modifying = 0; int intercept = 0; int64_t priority = 0; int stop = 0; const char* message_id = NULL; size_t i, n; char* msgbuf; /* Parser headers. */ for (i = 0; i < message.header_count; i++) { const char* h = message.headers[i]; if (strequals(h, "Command: assign-id")) assign_id = 1; else if (strequals(h, "Command: intercept")) intercept = 1; else if (strequals(h, "Modifying: yes")) modifying = 1; else if (strequals(h, "Stop: yes")) stop = 1; else if (startswith(h, "Message ID: ")) message_id = strstr(h, ": ") + 2; else if (startswith(h, "Priority: ")) priority = atoll(strstr(h, ": ") + 2); } /* Ignore message if not labelled with a message ID. */ if (message_id == NULL) { eprint("received message with a message ID, ignoring."); return; } /* Assign ID if not already assigned. */ if (assign_id && (client->id == 0)) { intercept |= 2; with_mutex(slave_mutex, client->id = next_id++; if (next_id == 0) { eprint("this is impossible, ID counter has overflowed."); /* If the program ran for a millennium it would take c:a 585 assignments per nanosecond. This cannot possibly happen. (It would require serious dedication by generations of ponies (or just an alicorn) to maintain the process and transfer it new hardware.) */ abort(); } ); } /* Make the client listen for messages addressed to it. */ if (intercept) { size_t size = 64; /* Atleast 25, otherwise we get into problems at ((intercept & 2)) */ char* buf; if (xmalloc(buf, size + 1, char)) { perror(*argv); return; } pthread_mutex_lock(&(client->mutex)); if ((intercept & 1)) /* from payload */ { char* payload = client->message.payload; if (client->message.payload_size == 0) /* All messages */ { *buf = '\0'; add_intercept_condition(client, buf, priority, modifying, stop); } else /* Filtered messages */ for (;;) { char* end = strchrnul(payload, '\n'); size_t len = (size_t)(end - payload); if (len == 0) { payload++; break; } if (len > size) { char* old_buf = buf; buf = realloc(buf, ((size <<= 1) + 1) * sizeof(char)); if (buf == NULL) { perror(*argv); free(old_buf); return; } } memcpy(buf, payload, len); buf[len] = '\0'; add_intercept_condition(client, buf, priority, modifying, stop); if (*end == '\0') break; payload = end + 1; } } if ((intercept & 2)) /* "To: $(client->id)" */ { snprintf(buf, size, "To: %" PRIu32 ":%" PRIu32, (uint32_t)(client->id >> 32), (uint32_t)(client->id >> 0)); add_intercept_condition(client, buf, priority, modifying, stop); } pthread_mutex_unlock(&(client->mutex)); free(buf); } /* Multicast the message. */ n = mds_message_marshal_size(&message, 0); if ((msgbuf = malloc(n)) == NULL) { perror(*argv); return; } mds_message_marshal(&message, msgbuf, 0); multicast_message(msgbuf, n / sizeof(char)); /* TODO support re-exec */ free(msgbuf); /* Send asigned ID. */ if (assign_id) { /* Construct response. */ n = 2 * 10 + strlen(message_id) + 1; n += strlen("ID assignment: :\nIn response to: \n\n"); if (xmalloc(msgbuf, n, char)) { perror(*argv); return; } snprintf(msgbuf, n, "ID assignment: %" PRIu32 ":%" PRIu32 "\n" "In response to: %s\n" "\n", (uint32_t)(client->id >> 32), (uint32_t)(client->id >> 0), message_id == NULL ? "" : message_id); n = strlen(msgbuf); /* Multicast the reply. */ multicast_message(msgbuf, n); /* TODO support re-exec */ /* Queue message to be sent when this function returns. This done to simplify `multicast_message` for re-exec. */ with_mutex(client->mutex, if (client->send_pending_size == 0) { /* Set the pending message. */ client->send_pending = msgbuf; client->send_pending_size = n; } else { /* Concatenate message to already pending messages. */ size_t new_len = client->send_pending_size + n; char* msg_new = realloc(client->send_pending, new_len * sizeof(char)); if (msg_new != NULL) { memcpy(msg_new + client->send_pending_size, msgbuf, n * sizeof(char)); client->send_pending = msg_new; client->send_pending_size = new_len; } else perror(*argv); free(msgbuf); } ); } } /** * Add an interception condition for a client * * @param client The client * @param condition The header, optionally with value, to look for, or empty (not `NULL`) for all messages * @param priority Interception priority * @param modifying Whether the client may modify the messages * @param stop Whether the condition should be removed rather than added */ void add_intercept_condition(client_t* client, char* condition, int64_t priority, int modifying, int stop) { size_t n = client->interception_conditions_count; interception_condition_t* conds = client->interception_conditions; ssize_t nonmodifying = -1; char* header = condition; char* value; size_t hash; size_t i; if ((condition = strdup(condition)) == NULL) { perror(*argv); return; } if ((value = strchr(header, ':')) != NULL) { *value = '\0'; /* NUL-terminate header. */ value += 2; /* Skip over delimiter. */ } hash = string_hash(header); for (i = 0; i < n; i++) { if (conds[i].header_hash == hash) if (strequals(conds[i].condition, condition)) { if (stop) { /* Remove the condition from the list. */ memmove(conds + i, conds + i + 1, --n - i); client->interception_conditions_count--; /* Diminish the list. */ if (n == 0) { free(conds); client->interception_conditions = NULL; } else if ((conds = realloc(conds, n * sizeof(interception_condition_t))) == NULL) perror(*argv); else client->interception_conditions = conds; } else { /* Update parameters. */ conds[i].priority = priority; conds[i].modifying = modifying; if (modifying && (nonmodifying >= 0)) { /* Optimisation: put conditions that are modifying at the beginning. When a client is intercepting we most know if any satisfying condition is modifying. With this optimisation the first satisfying condition will tell us if there is any satisfying condition that is modifying. */ interception_condition_t temp = conds[nonmodifying]; conds[nonmodifying] = conds[i]; conds[i] = temp; } } return; } /* Look for the first non-modifying, this is a part of the optimisation where we put all modifying conditions at the beginning. */ if ((nonmodifying < 0) && conds[i].modifying) nonmodifying = (ssize_t)i; } if (stop) eprint("client tried to stop intercepting messages that it does not intercept."); else { /* Grow the interception condition list. */ if (conds == NULL) conds = malloc(1 * sizeof(interception_condition_t)); else conds = realloc(conds, (n + 1) * sizeof(interception_condition_t)); if (conds == NULL) { perror(*argv); return; } client->interception_conditions = conds; /* Store condition. */ client->interception_conditions_count++; conds[n].condition = condition; conds[n].header_hash = hash; conds[n].priority = priority; conds[n].modifying = modifying; if (modifying && (nonmodifying >= 0)) { /* Optimisation: put conditions that are modifying at the beginning. When a client is intercepting we most know if any satisfying condition is modifying. With this optimisation the first satisfying condition will tell us if there is any satisfying condition that is modifying. */ interception_condition_t temp = conds[nonmodifying]; conds[nonmodifying] = conds[n]; conds[n] = temp; } } } /** * Compare two queued interceptors by priority * * @param a:const queued_interception_t* One of the interceptors * @param b:const queued_interception_t* The other of the two interceptors * @return Negative if a before b, positive if a after b, otherwise zero */ static int cmp_queued_interception(const void* a, const void* b) { const queued_interception_t* p = b; /* Highest first, so swap them. */ const queued_interception_t* q = a; int64_t diff = p->priority - q->priority; return diff < 0 ? -1 : diff > 0 ? 1 : 0; } /** * Multicast a message * * @param message The message * @param length The length of the message */ void multicast_message(char* message, size_t length) { size_t header_count = 0; size_t n = length - 1; size_t* hashes = NULL; char** headers = NULL; char** header_values = NULL; queued_interception_t* interceptions = NULL; size_t interceptions_count = 0; size_t i; ssize_t node; /* Count the number of headers. */ for (i = 0; i < n; i++) if (message[i] == '\n') { header_count++; if (message[i + 1] == '\n') break; } if (header_count == 0) return; /* Invalid message. */ /* Allocate header lists. */ if (xmalloc(hashes, header_count, size_t)) goto fail; if (xmalloc(headers, header_count, char*)) goto fail; if (xmalloc(header_values, header_count, char*)) goto fail; /* Populate header lists. */ for (i = 0; i < header_count; i++) { char* end = strchr(message, '\n'); char* colon = strchr(message, ':'); *end = '\0'; if ((header_values[i] = strdup(message)) == NULL) { header_count = i; goto fail; } *colon = '\0'; if ((headers[i] = strdup(message)) == NULL) { free(headers[i]); header_count = i; goto fail; } *colon = ':'; *end = '\n'; hashes[i] = string_hash(headers[i]); message = end + 1; } /* Get intercepting clients. */ with_mutex(slave_mutex, /* Count clients. */ n = 0; foreach_linked_list_node (client_list, node) n++; /* Allocate interceptor list. */ interceptions = malloc(n * sizeof(queued_interception_t*)); /* Search clients. */ if (interceptions != NULL) foreach_linked_list_node (client_list, node) { client_t* client = (client_t*)(void*)(client_list.values[node]); pthread_mutex_t mutex = client->mutex; interception_condition_t* conds = client->interception_conditions; int64_t priority = 0; /* Initialise to stop incorrect warning. */ int modifying = 0; /* Initialise to stop incorrect warning. */ size_t j; /* Look for a matching condition. */ n = client->interception_conditions_count; with_mutex(mutex, for (i = 0; i < n; i++) { interception_condition_t* cond = conds + i; for (j = 0; j < header_count; j++) { if (*(cond->condition) == '\0') break; if (cond->header_hash == hashes[j]) if (strequals(cond->condition, headers[j]) || strequals(cond->condition, header_values[j])) break; } if (j < header_count) { priority = cond->priority; modifying = cond->modifying; break; } } ); /* List client of there was a matching condition. */ if (i < n) { interceptions[interceptions_count].client = client; interceptions[interceptions_count].priority = priority; interceptions[interceptions_count].modifying = modifying; interceptions_count++; } } ); if (interceptions == NULL) goto fail; /* Sort interceptors. */ qsort(interceptions, interceptions_count, sizeof(queued_interception_t), cmp_queued_interception); /* Send message to interceptors. */ for (i = 0; i < interceptions_count; i++) { queued_interception_t client_ = interceptions[i]; client_t* client = client_.client; char* msg = message; size_t sent; n = length; /* Send the message. */ with_mutex(client->mutex, if (client->open) while (n > 0) { sent = send_message(client->socket_fd, msg, n); if ((sent < n) && (errno != EINTR)) /* TODO Support reexec */ { perror(*argv); break; } n -= sent; msg += sent; } ); /* Wait for a reply. */ if ((n > 0) && client_.modifying) { /* TODO */ } } errno = 0; fail: /* This is done before this function returns even if there was no error. */ if (errno != 0) perror(*argv); /* Release resources. */ xfree(headers, header_count); xfree(header_values, header_count); free(interceptions); free(hashes); } /** * Exec into the mdsinitrc script * * @param args The arguments to the child process */ void run_initrc(char** args) { char pathname[PATH_MAX]; struct passwd* pwd; char* env; char* home; args[0] = pathname; #define __exec(FORMAT, ...) \ xsnprintf(pathname, FORMAT, __VA_ARGS__); execv(args[0], args) /* Test $XDG_CONFIG_HOME. */ if ((env = getenv_nonempty("XDG_CONFIG_HOME")) != NULL) { __exec("%s/%s", env, INITRC_FILE); } /* Test $HOME. */ if ((env = getenv_nonempty("HOME")) != NULL) { __exec("%s/.config/%s", env, INITRC_FILE); __exec("%s/.%s", env, INITRC_FILE); } /* Test ~. */ pwd = getpwuid(getuid()); /* Ignore error. */ if (pwd != NULL) { home = pwd->pw_dir; if ((home != NULL) && (*home != '\0')) { __exec("%s/.config/%s", home, INITRC_FILE); __exec("%s/.%s", home, INITRC_FILE); } } /* Test $XDG_CONFIG_DIRS. */ if ((env = getenv_nonempty("XDG_CONFIG_DIRS")) != NULL) { char* begin = env; char* end; int len; for (;;) { end = strchrnul(begin, ':'); len = (int)(end - begin); if (len > 0) { __exec("%.*s/%s", len, begin, INITRC_FILE); } if (*end == '\0') break; begin = end + 1; } } /* Test /etc. */ __exec("%s/%s", SYSCONFDIR, INITRC_FILE); #undef __exec /* Everything failed. */ eprintf("unable to run %s file, you might as well kill me.", INITRC_FILE); } /** * Called with the signal SIGUSR1 is caught. * This function should cue a re-exec of the program. * * @param signo The caught signal */ void sigusr1_trap(int signo) { if (reexecing == 0) { pthread_t current_thread; ssize_t node; reexecing = 1; current_thread = pthread_self(); if (pthread_equal(current_thread, master_thread) == 0) pthread_kill(master_thread, signo); with_mutex(slave_mutex, foreach_linked_list_node (client_list, node) { client_t* value = (client_t*)(void*)(client_list.values[node]); if (pthread_equal(current_thread, value->thread) == 0) pthread_kill(value->thread, signo); }); } } /** * Marshal the server's state into a file * * @param fd The file descriptor * @return Negative on error */ int marshal_server(int fd) { size_t list_size = linked_list_marshal_size(&client_list); size_t map_size = fd_table_marshal_size(&client_map); size_t list_elements = 0; size_t state_n = 0; char* state_buf = NULL; char* state_buf_; ssize_t node; /* Calculate the grand size of all client information. */ for (node = client_list.edge;; list_elements++) { if ((node = client_list.next[node]) == client_list.edge) break; state_n += client_marshal_size((client_t*)(void*)(client_list.values[node])); } /* Add the size of the rest of the program's state. */ state_n += sizeof(int) + sizeof(sig_atomic_t) + sizeof(uint64_t) + 2 * sizeof(size_t); state_n += list_elements * sizeof(size_t) + list_size + map_size; /* Allocate a buffer for all data except the client list and the client map. */ state_buf = state_buf_ = malloc(state_n); if (state_buf == NULL) goto fail; /* Tell the new version of the program what version of the program it is marshalling. */ buf_set_next(state_buf_, int, MDS_SERVER_VARS_VERSION); /* Marshal the miscellaneous state data. */ buf_set_next(state_buf_, sig_atomic_t, running); buf_set_next(state_buf_, uint64_t, next_id); /* Tell the program how large the marshalled client list is and how any clients are marshalled. */ buf_set_next(state_buf_, size_t, list_size); buf_set_next(state_buf_, size_t, list_elements); /* Marshal the clients. */ foreach_linked_list_node (client_list, node) { /* Get the memory address of the client. */ size_t value_address = client_list.values[node]; /* Get the client's information. */ client_t* value = (client_t*)(void*)value_address; /* Marshal the address, it is used the the client list and the client map, that will be marshalled. */ buf_set_next(state_buf_, size_t, value_address); /* Marshal the client informationation. */ state_buf_ += client_marshal(value, state_buf_) / sizeof(char); } /* Marshal the client list. */ linked_list_marshal(&client_list, state_buf_); state_buf_ += list_size / sizeof(char); /* Marshal the client map. */ fd_table_marshal(&client_map, state_buf_); /* Send the marshalled data into the file. */ if (full_write(fd, state_buf, state_n) < 0) goto fail; free(state_buf); return 0; fail: perror(*argv); free(state_buf); return -1; } /** * Address translation table used by `unmarshal_server` and `remapper` */ static hash_table_t unmarshal_remap_map; /** * Address translator for `unmarshal_server` * * @param old The old address * @return The new address */ static size_t unmarshal_remapper(size_t old) { return hash_table_get(&unmarshal_remap_map, old); } /** * Unmarshal the server's state from a file * * @param fd The file descriptor * @return Negative on error */ int unmarshal_server(int fd) { int with_error = 0; char* state_buf; char* state_buf_; size_t list_size; size_t list_elements; size_t i; ssize_t node; pthread_t _slave_thread; /* Read the file. */ if ((state_buf = state_buf_ = full_read(fd)) == NULL) { perror(*argv); return -1; } /* Create memory address remapping table. */ if (hash_table_create(&unmarshal_remap_map)) { perror(*argv); free(state_buf); hash_table_destroy(&unmarshal_remap_map, NULL, NULL); return -1; } /* Get the marshal protocal version. Not needed, there is only the one version right now. */ /* buf_get(state_buf_, int, 0, MDS_SERVER_VARS_VERSION); */ buf_next(state_buf_, int, 1); /* Unmarshal the miscellaneous state data. */ buf_get_next(state_buf_, sig_atomic_t, running); buf_get_next(state_buf_, uint64_t, next_id); /* Get the marshalled size of the client list and how any clients that are marshalled. */ buf_get_next(state_buf_, size_t, list_size); buf_get_next(state_buf_, size_t, list_elements); /* Unmarshal the clients. */ for (i = 0; i < list_elements; i++) { size_t n; size_t value_address; client_t* value; /* Allocate the client's information. */ if (xmalloc(value, 1, client_t)) goto clients_fail; /* Unmarshal the address, it is used the the client list and the client map, that are also marshalled. */ buf_get_next(state_buf_, size_t, value_address); /* Unmarshal the client information. */ n = client_unmarshal(value, state_buf_); if (n == 0) goto clients_fail; /* Populate the remapping table. */ if (hash_table_put(&unmarshal_remap_map, value_address, (size_t)(void*)value) == 0) if (errno) goto clients_fail; state_buf_ += n / sizeof(char); /* On error, seek past all clients. */ continue; clients_fail: perror(*argv); with_error = 1; if (value != NULL) { buf_prev(state_buf_, size_t, 1); free(value); } for (; i < list_elements; i++) /* There is not need to close the sockets, it is done by the caller because there are conditions where we cannot get here anyway. */ state_buf_ += client_unmarshal_skip(state_buf_) / sizeof(char); break; } /* Unmarshal the client list. */ if (linked_list_unmarshal(&client_list, state_buf_)) { perror(*argv); abort(); /* Critical. */ } state_buf_ += list_size / sizeof(char); /* Unmarshal the client map. */ if (fd_table_unmarshal(&client_map, state_buf_, unmarshal_remapper)) { perror(*argv); abort(); /* Critical. */ } /* Release the raw data. */ free(state_buf); /* Remove non-found elements from the fd table. */ if (with_error) for (i = 0; i < client_map.capacity; i++) if (client_map.used[i / 64] & ((uint64_t)1 << (i % 64))) /* Lets not presume that fd-table actually initialise its allocations. */ if (client_map.values[i] == 0) client_map.used[i / 64] &= ~((uint64_t)1 << (i % 64)); /* Remap the linked list and remove non-found elements, and start the clients. */ foreach_linked_list_node (client_list, node) { /* Remap the linked list and remove non-found elements. */ size_t new_address = unmarshal_remapper(client_list.values[node]); client_list.values[node] = new_address; if (new_address == 0) /* Returned if missing (or if the address is the invalid NULL.) */ linked_list_remove(&client_list, node); else { /* Start the clients. (Errors do not need to be reported.) */ client_t* client = (client_t*)(void*)new_address; int socket_fd = client->socket_fd; /* Increase number of running slaves. */ with_mutex(slave_mutex, running_slaves++;); /* Start slave thread. */ errno = pthread_create(&_slave_thread, NULL, slave_loop, (void*)(intptr_t)socket_fd); if (errno) { perror(*argv); with_mutex(slave_mutex, running_slaves--;); } } } /* Release the remapping table's resources. */ hash_table_destroy(&unmarshal_remap_map, NULL, NULL); return -with_error; }