/**
 * mds — A micro-display server
 * Copyright © 2014  Mattias Andrée (maandree@member.fsf.org)
 * 
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
#include "mds-server.h"

#include "globals.h"
#include "interception_condition.h"
#include "client.h"
#include "queued_interception.h"
#include "multicast.h"
#include "signals.h"
#include "interceptors.h"
#include "sending.h"

/* NOTE(review): the names of the angle-bracket headers were not legible in
   the copy under review; the list below is rebuilt from the identifiers used
   in this file — confirm it against the build before merging. */
#include <libmdsserver/config.h>
#include <libmdsserver/linked-list.h>
#include <libmdsserver/hash-table.h>
#include <libmdsserver/fd-table.h>
#include <libmdsserver/mds-message.h>
#include <libmdsserver/macros.h>
#include <libmdsserver/util.h>
#include <libmdsserver/hash-help.h>

#include <alloca.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <stdio.h>
#include <stdlib.h>
#include <limits.h>
#include <unistd.h>
#include <pwd.h>
#include <errno.h>
#include <pthread.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <inttypes.h>
#include <time.h>



/**
 * Entry point of the server
 * 
 * Parses the command line, optionally forks a child that exec:s mdsinitrc,
 * sets up the shared tables, mutexes and conditions, restores marshalled
 * state after a re-exec, and then accepts clients until told to stop or
 * to re-exec.
 * 
 * @param   argc_  Number of elements in `argv_`
 * @param   argv_  Command line arguments
 * @return         Non-zero on error
 */
int main(int argc_, char** argv_)
{
  int is_respawn = -1;
  int socket_fd = -1;
  int reexec = 0;
  int unparsed_args_ptr = 1;
  char* unparsed_args[ARGC_LIMIT + LIBEXEC_ARGC_EXTRA_LIMIT + 1];
  int i;
  pthread_t slave_thread;
  
#if (LIBEXEC_ARGC_EXTRA_LIMIT < 3)
# error LIBEXEC_ARGC_EXTRA_LIMIT is too small, need at least 3.
#endif
  
  argc = argc_;
  argv = argv_;
  
  
  /* Drop privileges like it's hot. */
  fail_if (drop_privileges());
  
  
  /* Sanity check the number of command line arguments. */
  exit_if (argc > ARGC_LIMIT + LIBEXEC_ARGC_EXTRA_LIMIT,
           eprint("that number of arguments is ridiculous, I will not allow it."););
  
  
  /* Parse command line arguments. */
  for (i = 1; i < argc; i++)
    {
      char* arg = argv[i];
      int v;
      if ((v = strequals(arg, "--initial-spawn")) || /* Initial spawn? */
          strequals(arg, "--respawn"))               /* Respawning after crash? */
        {
          exit_if (is_respawn == v,
                   eprintf("conflicting arguments %s and %s cannot be combined.",
                           "--initial-spawn", "--respawn"););
          is_respawn = !v;
        }
      else if (startswith(arg, "--socket-fd=")) /* Socket file descriptor. */
        {
          exit_if (socket_fd != -1,
                   eprintf("duplicate declaration of %s.", "--socket-fd"););
          exit_if (strict_atoi(arg += strlen("--socket-fd="), &socket_fd, 0, INT_MAX) < 0,
                   eprintf("invalid value for %s: %s.", "--socket-fd", arg););
        }
      else if (strequals(arg, "--re-exec")) /* Re-exec state-marshal. */
        reexec = 1;
      else
        /* Not recognised, it is probably for another server. */
        unparsed_args[unparsed_args_ptr++] = arg;
    }
  unparsed_args[unparsed_args_ptr] = NULL;
  if (reexec)
    {
      is_respawn = 1;
      eprint("re-exec performed.");
    }
  
  
  /* Check that mandatory arguments have been specified. */
  exit_if (is_respawn < 0,
           eprintf("missing state argument, require either %s or %s.",
                   "--initial-spawn", "--respawn"););
  exit_if (socket_fd < 0,
           eprint("missing socket file descriptor argument."););
  
  
  /* Run mdsinitrc. */
  if (is_respawn == 0)
    {
      pid_t pid = fork();
      fail_if (pid == (pid_t)-1);
      
      if (pid == 0) /* Child process exec:s, the parent continues without waiting for it. */
        {
          /* Close all files except stdin, stdout and stderr. */
          close_files((fd > 2) || (fd == socket_fd));
          
          /* Run mdsinitrc. */
          run_initrc(unparsed_args);
          return 1;
        }
    }
  
  
#define __free(I)                                                \
  if (I >= 0)  fd_table_destroy(&client_map, NULL, NULL);       \
  if (I >= 1)  linked_list_destroy(&client_list);               \
  if (I >= 2)  pthread_mutex_destroy(&slave_mutex);             \
  if (I >= 3)  pthread_cond_destroy(&slave_cond);               \
  if (I >= 4)  pthread_mutex_destroy(&modify_mutex);            \
  if (I >= 5)  pthread_cond_destroy(&modify_cond);              \
  if (I >= 6)  hash_table_destroy(&modify_map, NULL, NULL)
  
#define error_if(I, CONDITION)  if (CONDITION)  { perror(*argv); __free(I); return 1; }
  
  
  /* Create list and table of clients. */
  if (reexec == 0)
    {
      error_if (0, fd_table_create(&client_map));
      error_if (1, linked_list_create(&client_list, 32));
    }
  
  /* Store the current thread so it can be killed from elsewhere. */
  master_thread = pthread_self();
  
  /* Set up traps for especially handled signals. */
  error_if (1, trap_signals() < 0);
  
  /* Create mutex and condition for slave counter. */
  error_if (1, (errno = pthread_mutex_init(&slave_mutex, NULL)));
  error_if (2, (errno = pthread_cond_init(&slave_cond, NULL)));
  
  /* Create mutex, condition and map for message modification. */
  error_if (3, (errno = pthread_mutex_init(&modify_mutex, NULL)));
  error_if (4, (errno = pthread_cond_init(&modify_cond, NULL)));
  error_if (5, hash_table_create(&modify_map));
  
#undef error_if
  
  
  /* Unmarshal the state of the server. */
  if (reexec)
    {
      pid_t pid = getpid();
      int reexec_fd, r;
      char shm_path[NAME_MAX + 1];
      
      /* Acquire access to marshalled data. */
      xsnprintf(shm_path, SHM_PATH_PATTERN, (unsigned long int)pid);
      reexec_fd = shm_open(shm_path, O_RDONLY, S_IRWXU);
      fail_if (reexec_fd < 0); /* Critical. */
      
      /* Unmarshal state. */
      r = unmarshal_server(reexec_fd);
      
      close(reexec_fd);
      shm_unlink(shm_path);
      if (r < 0)
        {
          /* Close all files (hopefully sockets) we do not know what they are. */
          close_files((fd > 2) && (fd != socket_fd) && (fd_table_contains_key(&client_map, fd) == 0));
        }
    }
  
  
  /* Accepting incoming connections. */
  while (running && (terminating == 0))
    {
      /* Accept connection. */
      int client_fd = accept(socket_fd, NULL, NULL);
      if (client_fd >= 0)
        {
          /* Increase number of running slaves. */
          with_mutex (slave_mutex, running_slaves++;);
          
          /* Start slave thread. */
          create_slave(&slave_thread, client_fd);
        }
      else
        /* Handle errors and shutdown. */
        if ((errno == EINTR) && terminating) /* Interrupted for termination. */
          goto terminate;
        else if ((errno == ECONNABORTED) || (errno == EINVAL)) /* Closing. */
          running = 0;
        else if (errno != EINTR) /* Error. */
          perror(*argv);
    }
  
 terminate:
  if (reexecing)
    goto reexec;
  
  
  /* Join with all slaves threads. */
  with_mutex (slave_mutex,
              while (running_slaves > 0)
                pthread_cond_wait(&slave_cond, &slave_mutex););
  
  /* Release resources. */
  __free(9999);
  
  return 0;
  
#undef __free
  
  
 reexec:
  {
    pid_t pid = getpid();
    int reexec_fd;
    char shm_path[NAME_MAX + 1];
    ssize_t node;
    
    /* Join with all slaves threads. */
    with_mutex (slave_mutex,
                while (running_slaves > 0)
                  pthread_cond_wait(&slave_cond, &slave_mutex););
    
    /* Release resources. */
    pthread_mutex_destroy(&slave_mutex);
    pthread_cond_destroy(&slave_cond);
    pthread_mutex_destroy(&modify_mutex);
    pthread_cond_destroy(&modify_cond);
    hash_table_destroy(&modify_map, NULL, NULL);
    
    /* Marshal the state of the server. */
    xsnprintf(shm_path, SHM_PATH_PATTERN, (unsigned long int)pid);
    reexec_fd = shm_open(shm_path, O_RDWR | O_CREAT | O_EXCL, S_IRWXU);
    fail_if (reexec_fd < 0);
    if (marshal_server(reexec_fd) < 0)
      goto reexec_fail;
    close(reexec_fd);
    reexec_fd = -1;
    
    /* Release resources. */
    foreach_linked_list_node (client_list, node)
      {
        client_t* client = (client_t*)(void*)(client_list.values[node]);
        client_destroy(client);
      }
    fd_table_destroy(&client_map, NULL, NULL);
    linked_list_destroy(&client_list);
    
    /* Re-exec the server. */
    reexec_server(argc, argv, reexec);
    
  reexec_fail:
    perror(*argv);
    if (reexec_fd >= 0)
      close(reexec_fd);
    shm_unlink(shm_path);
    /* Returning non-zero is important, otherwise the server
       cannot be respawn if the re-exec fails. */
    return 1;
  }
  
 pfail:
  perror(*argv);
  return 1;
}


/**
 * Master function for slave threads
 * 
 * Registers the client (unless it is already present in `client_map`),
 * then alternates between flushing queued output and reading messages
 * until the client closes or the server terminates or re-exec:s.
 * 
 * @param   data  Input data: the client's socket file descriptor,
 *                passed as an integer cast to a pointer
 * @return        Output data, always `NULL`
 */
void* slave_loop(void* data)
{
  int socket_fd = (int)(intptr_t)data;
  ssize_t entry = LINKED_LIST_UNUSED;
  size_t information_address = fd_table_get(&client_map, (size_t)socket_fd);
  client_t* information = (client_t*)(void*)information_address;
  char* msgbuf = NULL;
  size_t n;
  size_t tmp;
  int r;
  
  
  if (information == NULL)
    {
      /* Create information table. */
      fail_if (xmalloc(information, 1, client_t));
      client_initialise(information);
      
      /* Add to list of clients. */
      pthread_mutex_lock(&slave_mutex);
      entry = linked_list_insert_end(&client_list, (size_t)(void*)information);
      if (entry == LINKED_LIST_UNUSED)
        {
          perror(*argv);
          pthread_mutex_unlock(&slave_mutex);
          goto fail;
        }
      
      /* Add client to table. Failure is recognised by a zero return
         together with a set `errno`, so `errno` has to be cleared first;
         otherwise a stale value is mistaken for a failure. */
      errno = 0;
      tmp = fd_table_put(&client_map, socket_fd, (size_t)(void*)information);
      if ((tmp == 0) && errno)
        {
          perror(*argv);
          pthread_mutex_unlock(&slave_mutex);
          goto fail;
        }
      pthread_mutex_unlock(&slave_mutex);
      
      /* Fill information table. */
      information->list_entry = entry;
      information->socket_fd = socket_fd;
      information->open = 1;
      fail_if (mds_message_initialise(&(information->message)));
    }
  
  /* Store slave thread and create mutexes and conditions. */
  fail_if (client_initialise_threading(information));
  
  /* Set up traps for especially handled signals. */
  fail_if (trap_signals() < 0);
  
  
  /* Fetch messages from the slave. */
  while ((terminating == 0) && information->open)
    {
      /* Send queued multicast messages. */
      send_multicast_queue(information);
      
      /* Send queued messages. */
      send_reply_queue(information);
      
      /* Fetch message. */
      r = fetch_message(information);
      if ((r == 0) && message_received(information))
        goto terminate;
      else if (r == -2)
        goto fail;
      else if (r && (errno == EINTR) && terminating)
        goto terminate; /* Stop the thread if we are re-exec:ing or terminating the server. */
    }
  /* Stop the thread if we are re-exec:ing or terminating the server. */
  if (terminating)
    goto terminate;
  
  
  /* Multicast information about the client closing. */
  n = 2 * 10 + 1 + strlen("Client closed: :\n\n");
  fail_if (xmalloc(msgbuf, n, char));
  snprintf(msgbuf, n,
           "Client closed: %" PRIu32 ":%" PRIu32 "\n"
           "\n",
           (uint32_t)(information->id >> 32),
           (uint32_t)(information->id >>  0));
  n = strlen(msgbuf);
  /* The multicast queue takes over the buffer. */
  queue_message_multicast(msgbuf, n, information);
  msgbuf = NULL;
  
  
 terminate: /* This done on success as well. */
  if (reexecing)
    goto reexec;
  
 fail: /* This done on success as well. */
  /* Close socket and free resources. */
  close(socket_fd);
  free(msgbuf);
  if (information != NULL)
    client_destroy(information);
  
  /* Unlist client and decrease the slave count. */
  with_mutex (slave_mutex,
              fd_table_remove(&client_map, socket_fd);
              if (entry != LINKED_LIST_UNUSED)
                linked_list_remove(&client_list, entry);
              running_slaves--;
              pthread_cond_signal(&slave_cond););
  return NULL;
  
  
 pfail:
  perror(*argv);
  goto fail;
  
  
 reexec:
  /* Tell the master thread that the slave has closed,
     this is done because re-exec causes a race-condition
     between the acceptance of a slave and the execution
     of the slave thread. */
  with_mutex (slave_mutex,
              running_slaves--;
              pthread_cond_signal(&slave_cond););
  
  return NULL;
}


/**
 * Receive a full message and update open status if the client closes
 * 
 * @param   client  The client
 * @return          Zero on success, -2 on failure, otherwise -1
 */
int fetch_message(client_t* client)
{
  int r = mds_message_read(&(client->message), client->socket_fd);
  
  if (r == 0)
    return 0;
  
  if (r == -2)
    eprint("corrupt message received.");
  else if (errno == ECONNRESET)
    {
      r = mds_message_read(&(client->message), client->socket_fd);
      client->open = 0;
      /* Connection closed. */
    }
  else if (errno != EINTR)
    {
      r = -2;
      perror(*argv);
    }
  
  return r;
}


/**
 * Perform actions that should be taken when
 * a message has been received from a client
 * 
 * A message carrying `Modifying: yes` is handed to the client registered in
 * `modify_map` under the message's `Modify ID` and is not multicasted.
 * Any other message must carry a `Message ID`; it may assign the client an
 * ID, register interception conditions, and is then queued for multicast.
 * 
 * @param   client  The client has sent a message
 * @return          Normally zero, but 1 if exited because of re-exec or termination
 */
int message_received(client_t* client)
{
  mds_message_t message = client->message;
  int assign_id = 0;
  int modifying = 0;
  int intercept = 0;
  int64_t priority = 0;
  int stop = 0;
  const char* message_id = NULL;
  uint64_t modify_id = 0;
  size_t i, n;
  char* msgbuf;
  
  
  /* Parse headers. */
  for (i = 0; i < message.header_count; i++)
    {
      const char* h = message.headers[i];
      if      (strequals(h,  "Command: assign-id"))  assign_id  = 1;
      else if (strequals(h,  "Command: intercept"))  intercept  = 1;
      else if (strequals(h,  "Modifying: yes"))      modifying  = 1;
      else if (strequals(h,  "Stop: yes"))           stop       = 1;
      else if (startswith(h, "Message ID: "))        message_id = strstr(h, ": ") + 2;
      else if (startswith(h, "Priority: "))          priority   = atoll(strstr(h, ": ") + 2);
      else if (startswith(h, "Modify ID: "))         modify_id  = (uint64_t)atoll(strstr(h, ": ") + 2);
    }
  
  
  /* Notify waiting client about a received message modification. */
  if (modifying != 0)
    {
      /* pthread_cond_timedwait is required to handle re-exec and termination because
         pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */
      struct timespec timeout;
      size_t address;
      client_t* recipient;
      mds_message_t* multicast;
      
      pthread_mutex_lock(&(modify_mutex));
      while (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0)
        {
          if (terminating)
            {
              pthread_mutex_unlock(&(modify_mutex));
              return 1;
            }
          /* The wait must use the mutex this thread holds (`modify_mutex`),
             and the timeout is an absolute point in time, not a duration.
             NOTE(review): the code that inserts into `modify_map` is not in
             this file; confirm that it signals `modify_cond`. Until then the
             deadline of at most one second bounds the wake-up latency. */
          timeout.tv_sec = time(NULL) + 1;
          timeout.tv_nsec = 0;
          pthread_cond_timedwait(&modify_cond, &modify_mutex, &timeout);
        }
      address = hash_table_get(&modify_map, (size_t)modify_id);
      recipient = (client_t*)(void*)address;
      if (xmalloc(multicast = recipient->modify_message, 1, mds_message_t))
        goto fail;
      multicast->headers = NULL;
      multicast->header_count = 0;
      multicast->payload = NULL;
      multicast->payload_size = 0;
      multicast->payload_ptr = 0;
      multicast->buffer = NULL;
      multicast->buffer_size = 0;
      multicast->buffer_ptr = 0;
      multicast->stage = 0;
      if (xmalloc(multicast->payload, message.payload_size, char))
        goto fail;
      memcpy(multicast->payload, message.payload, message.payload_size * sizeof(char));
      if (xmalloc(multicast->headers, message.header_count, char*))
        goto fail;
      /* `header_count` follows `i` so that a partial copy can be destroyed safely. */
      for (i = 0; i < message.header_count; i++, multicast->header_count++)
        {
          multicast->headers[i] = strdup(message.headers[i]);
          if (multicast->headers[i] == NULL)
            goto fail;
        }
      goto done;
    fail:
      if (multicast != NULL)
        {
          mds_message_destroy(multicast);
          free(multicast);
          recipient->modify_message = NULL;
        }
    done:
      pthread_mutex_unlock(&(modify_mutex));
      /* NOTE(review): this signals the condition of the client that sent the
         modification, not of `recipient` — confirm which one is waited on. */
      with_mutex (client->modify_mutex, pthread_cond_signal(&(client->modify_cond)););
      
      /* Do nothing more, not even multicast this message. */
      return 0;
    }
  
  
  if (message_id == NULL)
    {
      eprint("received message without a message ID, ignoring.");
      return 0;
    }
  
  
  /* Assign ID if not already assigned. */
  if (assign_id && (client->id == 0))
    {
      intercept |= 2;
      with_mutex_if (slave_mutex, (client->id = ++next_client_id) == 0,
                     eprint("this is impossible, ID counter has overflowed.");
                     /* If the program ran for a millennium it would
                        take c:a 585 assignments per nanosecond. This
                        cannot possibly happen. (It would require serious
                        dedication by generations of ponies (or just an alicorn)
                        to maintain the process and transfer it new hardware.) */
                     abort();
                     );
    }
  
  
  /* Make the client listen for messages addressed to it. */
  if (intercept)
    {
      size_t size = 64; /* At least 25, otherwise we get into problems at ((intercept & 2)) */
      char* buf;
      if (xmalloc(buf, size + 1, char))
        {
          perror(*argv);
          return 0;
        }
      
      pthread_mutex_lock(&(client->mutex));
      if ((intercept & 1)) /* from payload */
        {
          char* payload = client->message.payload;
          size_t payload_size = client->message.payload_size;
          if (client->message.payload_size == 0) /* All messages */
            {
              *buf = '\0';
              add_intercept_condition(client, buf, priority, modifying, stop);
            }
          else /* Filtered messages */
            for (;;)
              {
                char* end = memchr(payload, '\n', payload_size);
                size_t len = end == NULL ? payload_size : (size_t)(end - payload);
                if (len == 0)
                  {
                    payload++;
                    payload_size--;
                    break;
                  }
                if (len > size)
                  {
                    /* Keep doubling until the line fits; doubling only once
                       is not enough for a line longer than twice the buffer. */
                    size_t new_size = size;
                    char* new_buf;
                    while (new_size < len)
                      new_size <<= 1;
                    new_buf = realloc(buf, (new_size + 1) * sizeof(char));
                    if (new_buf == NULL)
                      {
                        perror(*argv);
                        free(buf);
                        pthread_mutex_unlock(&(client->mutex));
                        return 0;
                      }
                    buf = new_buf;
                    size = new_size;
                  }
                memcpy(buf, payload, len);
                buf[len] = '\0';
                add_intercept_condition(client, buf, priority, modifying, stop);
                if (end == NULL)
                  break;
                payload = end + 1;
                payload_size -= len + 1;
              }
        }
      if ((intercept & 2)) /* "To: $(client->id)" */
        {
          snprintf(buf, size, "To: %" PRIu32 ":%" PRIu32,
                   (uint32_t)(client->id >> 32),
                   (uint32_t)(client->id >>  0));
          add_intercept_condition(client, buf, priority, modifying, 0);
        }
      pthread_mutex_unlock(&(client->mutex));
      
      free(buf);
    }
  
  
  /* Multicast the message. */
  n = mds_message_compose_size(&message);
  if ((msgbuf = malloc(n)) == NULL)
    {
      perror(*argv);
      return 0;
    }
  mds_message_compose(&message, msgbuf);
  /* The multicast queue takes over the buffer. */
  queue_message_multicast(msgbuf, n / sizeof(char), client);
  
  
  /* Send assigned ID. */
  if (assign_id)
    {
      char* msgbuf_;
      
      /* Construct response. */
      n = 2 * 10 + strlen(message_id) + 1;
      n += strlen("ID assignment: :\nIn response to: \n\n");
      if (xmalloc(msgbuf, n, char))
        {
          perror(*argv);
          return 0;
        }
      snprintf(msgbuf, n,
               "ID assignment: %" PRIu32 ":%" PRIu32 "\n"
               "In response to: %s\n"
               "\n",
               (uint32_t)(client->id >> 32),
               (uint32_t)(client->id >>  0),
               message_id == NULL ? "" : message_id);
      n = strlen(msgbuf);
      
      /* Multicast the reply. */
      msgbuf_ = strdup(msgbuf);
      if (msgbuf_ == NULL)
        {
          perror(*argv);
          free(msgbuf);
          return 0;
        }
      queue_message_multicast(msgbuf_, n, client);
      
      /* Queue message to be sent when this function returns.
         This done to simplify `multicast_message` for re-exec and termination. */
      with_mutex (client->mutex,
                  if (client->send_pending_size == 0)
                    {
                      /* Set the pending message. */
                      client->send_pending = msgbuf;
                      client->send_pending_size = n;
                    }
                  else
                    {
                      /* Concatenate message to already pending messages. */
                      size_t new_len = client->send_pending_size + n;
                      char* msg_new = realloc(client->send_pending, new_len * sizeof(char));
                      if (msg_new != NULL)
                        {
                          memcpy(msg_new + client->send_pending_size, msgbuf, n * sizeof(char));
                          client->send_pending = msg_new;
                          client->send_pending_size = new_len;
                        }
                      else
                        perror(*argv);
                      free(msgbuf);
                    }
                  );
    }
  
  return 0;
}


/**
 * Compare two queued interceptors by priority
 * 
 * @param   a:const queued_interception_t*  One of the interceptors
 * @param   b:const queued_interception_t*  The other of the two interceptors
 * @return                                  Negative if a before b, positive if a after b, otherwise zero
 */
static int cmp_queued_interception(const void* a, const void* b)
{
  const queued_interception_t* p = b; /* Highest first, so swap them. */
  const queued_interception_t* q = a;
  return p->priority < q->priority ? -1 :
         p->priority > q->priority ?  1 : 0;
}


/**
 * Queue a message for multicasting
 * 
 * The function takes over `message`: it is either stored in the queued
 * multicast or freed here, on every path including the error paths.
 * 
 * @param  message  The message, a heap allocation the function may
 *                  reallocate or free
 * @param  length   The length of the message
 * @param  sender   The original sender of the message
 */
void queue_message_multicast(char* message, size_t length, client_t* sender)
{
  char* msg = message;
  size_t header_count = 0;
  size_t headers_filled = 0; /* Number of initialised slots in `headers` and `header_values`. */
  size_t n;
  size_t* hashes = NULL;
  char** headers = NULL;
  char** header_values = NULL;
  queued_interception_t* interceptions = NULL;
  size_t interceptions_count = 0;
  multicast_t* multicast = NULL;
  size_t i;
  uint64_t modify_id;
  char modify_id_header[13 + 3 * sizeof(uint64_t)];
  void* new_buf;
  
  /* Count the number of headers. (`length - 1` must not wrap around.) */
  if (length > 0)
    for (i = 0, n = length - 1; i < n; i++)
      if (message[i] == '\n')
        {
          header_count++;
          if (message[i + 1] == '\n')
            break;
        }
  
  if (header_count == 0)
    goto fail; /* Invalid message, it is dropped but must still be freed. */
  
  /* Allocate multicast message. */
  fail_if (xmalloc(multicast, 1, multicast_t));
  multicast_initialise(multicast);
  
  /* Allocate header lists. */
  fail_if (xmalloc(hashes, header_count, size_t));
  fail_if (xmalloc(headers, header_count, char*));
  fail_if (xmalloc(header_values, header_count, char*));
  
  /* Populate header lists. */
  for (i = 0; i < header_count; i++)
    {
      /* Search within the message and within the current line only;
         the buffer is not guaranteed to be NUL-terminated. */
      size_t left = length - (size_t)(msg - message);
      char* end = memchr(msg, '\n', left);
      char* colon = end == NULL ? NULL : memchr(msg, ':', (size_t)(end - msg));
      
      if (colon == NULL)
        goto fail; /* Malformed header line, drop the message. */
      
      *end = '\0';
      if ((header_values[i] = strdup(msg)) == NULL)
        goto pfail;
      *colon = '\0';
      if ((headers[i] = strdup(msg)) == NULL)
        {
          perror(*argv);
          free(header_values[i]);
          goto fail;
        }
      *colon = ':';
      *end = '\n';
      hashes[i] = string_hash(headers[i]);
      msg = end + 1;
      headers_filled = i + 1;
    }
  
  /* Get intercepting clients. */
  pthread_mutex_lock(&(slave_mutex));
  interceptions = get_interceptors(sender, hashes, headers, header_values,
                                   header_count, &interceptions_count);
  pthread_mutex_unlock(&(slave_mutex));
  fail_if (interceptions == NULL);
  
  /* Sort interceptors. */
  qsort(interceptions, interceptions_count, sizeof(queued_interception_t), cmp_queued_interception);
  
  /* Add prefix to message with ‘Modify ID’ header. */
  with_mutex (slave_mutex,
              modify_id = next_modify_id++;
              if (next_modify_id == 0)
                next_modify_id = 1;
              );
  xsnprintf(modify_id_header, "Modify ID: %" PRIu64 "\n", modify_id);
  n = strlen(modify_id_header);
  new_buf = message;
  fail_if (xrealloc(new_buf, n + length, char));
  message = new_buf;
  memmove(message + n, message, length * sizeof(char));
  memcpy(message, modify_id_header, n * sizeof(char));
  
  /* Store information; the multicast now owns the message and the interceptions. */
  multicast->interceptions = interceptions;
  multicast->interceptions_count = interceptions_count;
  multicast->message = message;
  multicast->message_length = length + n;
  multicast->message_prefix = n;
  interceptions = NULL;
  message = NULL;
  
  /* Queue message multicasting. */
  with_mutex (sender->mutex,
              new_buf = sender->multicasts;
              if (xrealloc(new_buf, sender->multicasts_count + 1, multicast_t))
                perror(*argv);
              else
                {
                  sender->multicasts = new_buf;
                  sender->multicasts[sender->multicasts_count++] = *multicast;
                  /* The content was copied into the queue, only the shell is released. */
                  free(multicast);
                  multicast = NULL;
                }
              );
  
 fail: /* This is done before this function returns even if there was no error. */
  /* Release resources. */
  xfree(headers, headers_filled);
  xfree(header_values, headers_filled);
  free(hashes);
  free(message);
  /* NOTE(review): assumes `get_interceptors` returns a plain heap allocation — confirm. */
  free(interceptions);
  if (multicast != NULL)
    multicast_destroy(multicast);
  free(multicast);
  return;
  
 pfail:
  perror(*argv);
  goto fail;
}


/**
 * Exec into the mdsinitrc script
 * 
 * Candidate locations are tried in order; the function only returns
 * if every `execv` attempt failed.
 * 
 * @param  args  The arguments to the child process,
 *               `args[0]` is overwritten with the tried pathname
 */
void run_initrc(char** args)
{
  char pathname[PATH_MAX];
  struct passwd* pwd;
  char* env;
  char* home;
  args[0] = pathname;
  
  
#define __exec(FORMAT, ...)  \
  xsnprintf(pathname, FORMAT, __VA_ARGS__); execv(args[0], args)
  
  /* Test $XDG_CONFIG_HOME. */
  if ((env = getenv_nonempty("XDG_CONFIG_HOME")) != NULL)
    {
      __exec("%s/%s", env, INITRC_FILE);
    }
  
  /* Test $HOME. */
  if ((env = getenv_nonempty("HOME")) != NULL)
    {
      __exec("%s/.config/%s", env, INITRC_FILE);
      __exec("%s/.%s", env, INITRC_FILE);
    }
  
  /* Test ~. */
  pwd = getpwuid(getuid()); /* Ignore error. */
  if (pwd != NULL)
    {
      home = pwd->pw_dir;
      if ((home != NULL) && (*home != '\0'))
        {
          __exec("%s/.config/%s", home, INITRC_FILE);
          __exec("%s/.%s", home, INITRC_FILE);
        }
    }
  
  /* Test $XDG_CONFIG_DIRS. */
  if ((env = getenv_nonempty("XDG_CONFIG_DIRS")) != NULL)
    {
      char* begin = env;
      char* end;
      int len;
      for (;;)
        {
          end = strchrnul(begin, ':');
          len = (int)(end - begin);
          if (len > 0)
            {
              __exec("%.*s/%s", len, begin, INITRC_FILE);
            }
          if (*end == '\0')
            break;
          begin = end + 1;
        }
    }
  
  /* Test /etc. */
  __exec("%s/%s", SYSCONFDIR, INITRC_FILE);

#undef __exec
  
  
  /* Everything failed. */
  eprintf("unable to run %s file, you might as well kill me.", INITRC_FILE);
}


/**
 * Marshal the server's state into a file
 * 
 * Layout written: version, `running`, `next_client_id`, `next_modify_id`,
 * size of the marshalled client list, number of clients, then for each
 * client its old memory address followed by its marshalled data, then
 * the client list and finally the client map.
 * 
 * @param   fd  The file descriptor
 * @return      Negative on error
 */
int marshal_server(int fd)
{
  size_t list_size = linked_list_marshal_size(&client_list);
  size_t map_size = fd_table_marshal_size(&client_map);
  size_t list_elements = 0;
  size_t state_n = 0;
  char* state_buf = NULL;
  char* state_buf_;
  ssize_t node;
  
  
  /* Calculate the grand size of all client information. */
  for (node = client_list.edge;; list_elements++)
    {
      if ((node = client_list.next[node]) == client_list.edge)
        break;
      
      state_n += client_marshal_size((client_t*)(void*)(client_list.values[node]));
    }
  
  /* Add the size of the rest of the program's state. */
  state_n += sizeof(int) + sizeof(sig_atomic_t) + 2 * sizeof(uint64_t) + 2 * sizeof(size_t);
  state_n += list_elements * sizeof(size_t) + list_size + map_size;
  
  /* Allocate a buffer for all the data. */
  state_buf = state_buf_ = malloc(state_n);
  if (state_buf == NULL)
    goto fail;
  
  
  /* Tell the new version of the program what version of the program it is marshalling. */
  buf_set_next(state_buf_, int, MDS_SERVER_VARS_VERSION);
  
  /* Marshal the miscellaneous state data. */
  buf_set_next(state_buf_, sig_atomic_t, running);
  buf_set_next(state_buf_, uint64_t, next_client_id);
  buf_set_next(state_buf_, uint64_t, next_modify_id);
  
  /* Tell the program how large the marshalled client list is and how many clients are marshalled. */
  buf_set_next(state_buf_, size_t, list_size);
  buf_set_next(state_buf_, size_t, list_elements);
  
  /* Marshal the clients. */
  foreach_linked_list_node (client_list, node)
    {
      /* Get the memory address of the client. */
      size_t value_address = client_list.values[node];
      /* Get the client's information. */
      client_t* value = (client_t*)(void*)value_address;
      
      /* Marshal the address, it is used by the client list and
         the client map, that will be marshalled. */
      buf_set_next(state_buf_, size_t, value_address);
      /* Marshal the client information. */
      state_buf_ += client_marshal(value, state_buf_) / sizeof(char);
    }
  
  /* Marshal the client list. */
  linked_list_marshal(&client_list, state_buf_);
  state_buf_ += list_size / sizeof(char);
  /* Marshal the client map. */
  fd_table_marshal(&client_map, state_buf_);
  
  /* Send the marshalled data into the file. */
  if (full_write(fd, state_buf, state_n) < 0)
    goto fail;
  free(state_buf);
  
  return 0;
  
 fail:
  perror(*argv);
  free(state_buf);
  return -1;
}


/**
 * Address translation table used by `unmarshal_server` and `unmarshal_remapper`
 */
static hash_table_t unmarshal_remap_map;

/**
 * Address translator for `unmarshal_server`
 * 
 * @param   old  The old address
 * @return       The new address
 */
static size_t unmarshal_remapper(size_t old)
{
  return hash_table_get(&unmarshal_remap_map, old);
}


/**
 * Unmarshal the server's state from a file
 * 
 * Reads back what `marshal_server` wrote, maps the old client addresses to
 * freshly allocated clients, and starts a slave thread for every client
 * that could be restored. A failure to restore the client list or the
 * client map aborts the process.
 * 
 * @param   fd  The file descriptor
 * @return      Negative on error
 */
int unmarshal_server(int fd)
{
  int with_error = 0;
  char* state_buf;
  char* state_buf_;
  size_t list_size;
  size_t list_elements;
  size_t i;
  ssize_t node;
  pthread_t slave_thread;
  
  
  /* Read the file. */
  if ((state_buf = state_buf_ = full_read(fd)) == NULL)
    {
      perror(*argv);
      return -1;
    }
  
  /* Create memory address remapping table. */
  if (hash_table_create(&unmarshal_remap_map))
    {
      perror(*argv);
      free(state_buf);
      hash_table_destroy(&unmarshal_remap_map, NULL, NULL);
      return -1;
    }
  
  
  /* Get the marshal protocol version. Not needed, there is only the one version right now. */
  /* buf_get(state_buf_, int, 0, MDS_SERVER_VARS_VERSION); */
  buf_next(state_buf_, int, 1);
  
  /* Unmarshal the miscellaneous state data. */
  buf_get_next(state_buf_, sig_atomic_t, running);
  buf_get_next(state_buf_, uint64_t, next_client_id);
  buf_get_next(state_buf_, uint64_t, next_modify_id);
  
  /* Get the marshalled size of the client list and how many clients that are marshalled. */
  buf_get_next(state_buf_, size_t, list_size);
  buf_get_next(state_buf_, size_t, list_elements);
  
  /* Unmarshal the clients. */
  for (i = 0; i < list_elements; i++)
    {
      size_t n;
      size_t value_address;
      client_t* value;
      
      /* Allocate the client's information. */
      if (xmalloc(value, 1, client_t))
        goto clients_fail;
      
      /* Unmarshal the address, it is used by the client list and
         the client map, that are also marshalled. */
      buf_get_next(state_buf_, size_t, value_address);
      /* Unmarshal the client information. */
      n = client_unmarshal(value, state_buf_);
      if (n == 0)
        goto clients_fail;
      
      /* Populate the remapping table. Failure is recognised by a zero
         return together with a set `errno`, so clear `errno` first. */
      errno = 0;
      if (hash_table_put(&unmarshal_remap_map, value_address, (size_t)(void*)value) == 0)
        if (errno)
          goto clients_fail;
      
      /* Delayed seeking. */
      state_buf_ += n / sizeof(char);
      
      
      /* On error, seek past all clients. */
      continue;
    clients_fail:
      perror(*argv);
      with_error = 1;
      if (value != NULL)
        {
          buf_prev(state_buf_, size_t, 1);
          free(value);
        }
      for (; i < list_elements; i++)
        /* There is no need to close the sockets, it is done by
           the caller because there are conditions where we cannot
           get here anyway. */
        state_buf_ += client_unmarshal_skip(state_buf_) / sizeof(char);
      break;
    }
  
  /* Unmarshal the client list. */
  if (linked_list_unmarshal(&client_list, state_buf_))
    goto critical_fail;
  state_buf_ += list_size / sizeof(char);
  
  /* Unmarshal the client map. */
  if (fd_table_unmarshal(&client_map, state_buf_, unmarshal_remapper))
    goto critical_fail;
  
  /* Release the raw data. */
  free(state_buf);
  
  /* Remove non-found elements from the fd table. */
#define __bit(I, _OP_)  client_map.used[I / 64] _OP_ ((uint64_t)1 << (I % 64))
  if (with_error)
    for (i = 0; i < client_map.capacity; i++)
      if ((__bit(i, &)) && (client_map.values[i] == 0))
        /* Lets not presume that fd-table actually initialise its allocations. */
        __bit(i, &= ~);
#undef __bit
  
  /* Remap the linked list and remove non-found elements, and start the clients. */
  foreach_linked_list_node (client_list, node)
    {
      /* Remap the linked list and remove non-found elements. */
      size_t new_address = unmarshal_remapper(client_list.values[node]);
      client_list.values[node] = new_address;
      if (new_address == 0) /* Returned if missing (or if the address is the invalid NULL.) */
        linked_list_remove(&client_list, node);
      else
        {
          /* Start the clients. (Errors do not need to be reported.) */
          client_t* client = (client_t*)(void*)new_address;
          int socket_fd = client->socket_fd;
          
          /* Increase number of running slaves. */
          with_mutex (slave_mutex, running_slaves++;);
          
          /* Start slave thread. */
          create_slave(&slave_thread, socket_fd);
        }
    }
  
  /* Release the remapping table's resources. */
  hash_table_destroy(&unmarshal_remap_map, NULL, NULL);
  
  return -with_error;
  
 critical_fail:
  perror(*argv);
  abort();
}


/**
 * Create, start and detach a slave thread
 * 
 * The caller is expected to have incremented `running_slaves` already;
 * the increment is undone here if the thread cannot be created.
 * 
 * @param   thread_slot  The address at where to store the thread
 * @param   socket_fd    The file descriptor of the slave's socket
 * @return               Zero on success, -1 on error, error message will have been printed
 */
int create_slave(pthread_t* thread_slot, int socket_fd)
{
  if ((errno = pthread_create(thread_slot, NULL, slave_loop, (void*)(intptr_t)socket_fd)))
    {
      perror(*argv);
      with_mutex (slave_mutex, running_slaves--;);
      return -1;
    }
  if ((errno = pthread_detach(*thread_slot)))
    {
      perror(*argv);
      return -1;
    }
  return 0;
}