/** * mds — A micro-display server * Copyright © 2014 Mattias Andrée (maandree@member.fsf.org) * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #include "mds-server.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define MDS_SERVER_VARS_VERSION 0 /** * Number of elements in `argv` */ static int argc; /** * Command line arguments */ static char** argv; /** * The program run state, 1 when running, 0 when shutting down */ static volatile sig_atomic_t running = 1; /** * Non-zero when the program is about to re-exec */ static volatile sig_atomic_t reexecing = 0; /** * The number of running slaves */ static int running_slaves = 0; /** * Mutex for slave data */ static pthread_mutex_t slave_mutex; /** * Condition for slave data */ static pthread_cond_t slave_cond; /** * The thread that runs the master loop */ static pthread_t master_thread; /** * Map from client socket file descriptor to all information (client_t) */ static fd_table_t client_map; /** * List of client information (client_t) */ static linked_list_t client_list; /** * The next free ID for a client */ static uint64_t next_id = 1; /** * Entry point of the server * * @param argc_ Number of elements in `argv_` * @param argv_ Command line arguments * @return Non-zero on error */ int main(int argc_, char** argv_) { int is_respawn = -1; int socket_fd = -1; int reexec = 0; int unparsed_args_ptr = 1; char* unparsed_args[ARGC_LIMIT + LIBEXEC_ARGC_EXTRA_LIMIT + 1]; int i; pthread_t _slave_thread; argc = argc_; argv = argv_; /* Drop privileges like it's hot. */ if (drop_privileges()) { perror(*argv); return 1; } /* Sanity check the number of command line arguments. */ if (argc > ARGC_LIMIT + LIBEXEC_ARGC_EXTRA_LIMIT) { eprint("that number of arguments is ridiculous, I will not allow it."); return 1; } /* Parse command line arguments. */ for (i = 1; i < argc; i++) { char* arg = argv[i]; if (strequals(arg, "--initial-spawn")) /* Initial spawn? */ if (is_respawn == 1) { eprintf("conflicting arguments %s and %s cannot be combined.", "--initial-spawn", "--respawn"); return 1; } else is_respawn = 0; else if (strequals(arg, "--respawn")) /* Respawning after crash? */ if (is_respawn == 0) { eprintf("conflicting arguments %s and %s cannot be combined.", "--initial-spawn", "--respawn"); return 1; } else is_respawn = 1; else if (startswith(arg, "--socket-fd=")) /* Socket file descriptor. */ { long int r; char* endptr; if (socket_fd != -1) { eprintf("duplicate declaration of %s.", "--socket-fd"); return -1; } arg += strlen("--socket-fd="); r = strtol(arg, &endptr, 10); if ((*argv == '\0') || isspace(*argv) || (endptr - arg != (ssize_t)strlen(arg)) || (r < 0) || (r > INT_MAX)) { eprintf("invalid value for %s: %s.", "--socket-fd", arg); return 1; } socket_fd = (int)r; } else if (strequals(arg, "--re-exec")) /* Re-exec state-marshal. */ reexec = 1; else /* Not recognised, it is probably for another server. */ unparsed_args[unparsed_args_ptr++] = arg; } unparsed_args[unparsed_args_ptr] = NULL; if (reexec) is_respawn = 1; /* Check that manditory arguments have been specified. */ if (is_respawn < 0) { eprintf("missing state argument, require either %s or %s.", "--initial-spawn", "--respawn"); return 1; } if (socket_fd < 0) { eprint("missing socket file descriptor argument."); return 1; } /* Run mdsinitrc. */ if (is_respawn == 0) { pid_t pid; pid = fork(); if (pid == (pid_t)-1) { perror(*argv); return 1; } if (pid == 0) /* Child process exec:s, the parent continues without waiting for it. */ { /* Close all files except stdin, stdout and stderr. */ DIR* dir = opendir(SELF_FD); struct dirent* file; if (dir == NULL) perror(*argv); /* Well, that is just unfortunate, but we cannot really do anything. */ else while ((file = readdir(dir)) != NULL) if (strcmp(file->d_name, ".") && strcmp(file->d_name, "..")) { int fd = atoi(file->d_name); if (fd > 2) close(fd); } closedir(dir); close(socket_fd); /* Perhaps it is stdin, stdout or stderr. */ /* Run initrc */ run_initrc(unparsed_args); return 1; } } /* Create list and table of clients. */ if (reexec == 0) { if (fd_table_create(&client_map)) { perror(*argv); fd_table_destroy(&client_map, NULL, NULL); return 1; } if (linked_list_create(&client_list, 32)) { perror(*argv); fd_table_destroy(&client_map, NULL, NULL); linked_list_destroy(&client_list); return 1; } } /* Store the current thread so it can be killed from elsewhere. */ master_thread = pthread_self(); /* Make the server update without all slaves dying on SIGUSR1. */ if (xsigaction(SIGUSR1, sigusr1_trap) < 0) { perror(*argv); fd_table_destroy(&client_map, NULL, NULL); linked_list_destroy(&client_list); return 1; } /* Create mutex and condition for slave counter. */ pthread_mutex_init(&slave_mutex, NULL); pthread_cond_init(&slave_cond, NULL); /* Unmarshal the state of the server. */ if (reexec) { pid_t pid = getpid(); int reexec_fd, r; char shm_path[NAME_MAX + 1]; xsnprintf(shm_path, SHM_PATH_PATTERN, (unsigned long int)pid); reexec_fd = shm_open(shm_path, O_RDWR | O_CREAT | O_EXCL, S_IRWXU); if (reexec_fd < 0) { perror(*argv); r = -1; } else { r = unmarshal_server(reexec_fd); close(reexec_fd); shm_unlink(shm_path); } if (r < 0) { /* Close all files (hopefully sockets) we do not know what they are. */ DIR* dir = opendir(SELF_FD); struct dirent* file; if (dir == NULL) perror(*argv); /* Well, that is just unfortunate, but we cannot really do anything. */ else while ((file = readdir(dir)) != NULL) if (strcmp(file->d_name, ".") && strcmp(file->d_name, "..")) { int fd = atoi(file->d_name); if ((fd > 2) && (fd != socket_fd) && (fd_table_contains_key(&client_map, fd) == 0)) close(fd); } closedir(dir); } } /* Accepting incoming connections. */ while (running && (reexecing == 0)) { /* Accept connection. */ int client_fd = accept(socket_fd, NULL, NULL); /* Handle errors and shutdown. */ if (client_fd == -1) { switch (errno) { case EINTR: /* Interrupted. */ if (reexecing) goto reexec; break; case ECONNABORTED: case EINVAL: /* Closing. */ running = 0; break; default: /* Error. */ perror(*argv); break; } continue; } /* Increase number of running slaves. */ with_mutex(slave_mutex, running_slaves++;); /* Start slave thread. */ errno = pthread_create(&_slave_thread, NULL, slave_loop, (void*)(intptr_t)client_fd); if (errno) { perror(*argv); with_mutex(slave_mutex, running_slaves--;); } } if (reexecing) goto reexec; /* Wait for all slaves to close. */ with_mutex(slave_mutex, while (running_slaves > 0) pthread_cond_wait(&slave_cond, &slave_mutex);); /* Release resources. */ fd_table_destroy(&client_map, NULL, NULL); linked_list_destroy(&client_list); pthread_mutex_destroy(&slave_mutex); pthread_cond_destroy(&slave_cond); return 0; reexec: { pid_t pid = getpid(); int reexec_fd; char shm_path[NAME_MAX + 1]; /* Release resources. */ pthread_mutex_destroy(&slave_mutex); pthread_cond_destroy(&slave_cond); /* Join with all slaves threads. */ with_mutex(slave_mutex, while (running_slaves > 0) pthread_cond_wait(&slave_cond, &slave_mutex);); /* Marshal the state of the server. */ xsnprintf(shm_path, SHM_PATH_PATTERN, (unsigned long int)pid); reexec_fd = shm_open(shm_path, O_RDWR | O_CREAT | O_EXCL, S_IRWXU); if (reexec_fd < 0) { perror(*argv); return 1; } if (marshal_server(reexec_fd) < 0) goto reexec_fail; close(reexec_fd); reexec_fd = -1; /* Re-exec the server. */ reexec_server(argc, argv, reexec); reexec_fail: perror(*argv); if (reexec_fd >= 0) close(reexec_fd); shm_unlink(shm_path); /* Returning non-zero is important, otherwise the server cannot be respawn if the re-exec fails. */ return 1; } } /** * Master function for slave threads * * @param data Input data * @return Outout data */ void* slave_loop(void* data) { int socket_fd = (int)(intptr_t)data; ssize_t entry = LINKED_LIST_UNUSED; size_t information_address = fd_table_get(&client_map, (size_t)socket_fd); client_t* information = (client_t*)(void*)information_address; int mutex_created = 0; size_t tmp; int r; if (information == NULL) { /* Create information table. */ information = malloc(sizeof(client_t)); if (information == NULL) { perror(*argv); goto fail; } /* NULL-out pointers. */ information->interception_conditions = NULL; /* Add to list of clients. */ pthread_mutex_lock(&slave_mutex); entry = linked_list_insert_end(&client_list, (size_t)(void*)information); if (entry == LINKED_LIST_UNUSED) { perror(*argv); pthread_mutex_unlock(&slave_mutex); goto fail; } /* Add client to table. */ tmp = fd_table_put(&client_map, socket_fd, (size_t)(void*)information); pthread_mutex_unlock(&slave_mutex); if ((tmp == 0) && errno) { perror(*argv); goto fail; } /* Fill information table. */ information->list_entry = entry; information->socket_fd = socket_fd; information->open = 1; information->id = 0; information->interception_conditions_count = 0; if (mds_message_initialise(&(information->message))) { perror(*argv); goto fail; } } /* Store the thread so that other threads can kill it. */ information->thread = pthread_self(); /* Create mutex to make sure two thread to not try to send messages concurrently, and other slave local actions. */ pthread_mutex_init(&(information->mutex), NULL); mutex_created = 1; /* Make the server update without all slaves dying on SIGUSR1. */ if (xsigaction(SIGUSR1, sigusr1_trap) < 0) { perror(*argv); goto fail; } /* Fetch messages from the slave. */ if (information->open) while (reexecing == 0) { r = mds_message_read(&(information->message), socket_fd); if (r == 0) message_received(information); else if (r == -2) { eprint("corrupt message received."); goto fail; } else if (errno == ECONNRESET) { r = mds_message_read(&(information->message), socket_fd); information->open = 0; if (r == 0) message_received(information); /* Connection closed. */ break; } else if (errno == EINTR) { /* Stop the thread if we are re-exec:ing the server. */ if (reexecing) goto reexec; } else { perror(*argv); goto fail; } } /* Stop the thread if we are re-exec:ing the server. */ if (reexecing) goto reexec; /* TODO multicast information about the client closing. */ fail: /* The loop does break, this done on success as well. */ /* Close socket and free resources. */ close(socket_fd); if (information != NULL) { if (information->interception_conditions != NULL) { size_t i; for (i = 0; i < information->interception_conditions_count; i++) if (information->interception_conditions[i].condition != NULL) free(information->interception_conditions[i].condition); free(information->interception_conditions); } if (mutex_created) pthread_mutex_destroy(&(information->mutex)); mds_message_destroy(&(information->message)); free(information); } /* Unlist client and decrease the slave count. */ with_mutex(slave_mutex, fd_table_remove(&client_map, socket_fd); if (entry != LINKED_LIST_UNUSED) linked_list_remove(&client_list, entry); running_slaves--; pthread_cond_signal(&slave_cond);); return NULL; reexec: /* Tell the master thread that the slave has closed, this is done because re-exec causes a race-condition between the acception of a slave and the execution of the the slave thread. */ with_mutex(slave_mutex, running_slaves--; pthread_cond_signal(&slave_cond);); return NULL; } /** * Perform actions that should be taken when * a message has been received from a client * * @param client The client has sent a message */ void message_received(client_t* client) /* TODO */ { mds_message_t message = client->message; int assign_id = 0; int modifying = 0; int intercept = 0; int64_t priority = 0; int stop = 0; const char* message_id = NULL; size_t i, n; char* msgbuf; /* Parser headers. */ for (i = 0; i < message.header_count; i++) { const char* h = message.headers[i]; if (strequals(h, "Command: assign-id")) assign_id = 1; else if (strequals(h, "Command: intercept")) intercept = 1; else if (strequals(h, "Modifying: yes")) modifying = 1; else if (strequals(h, "Stop: yes")) stop = 1; else if (startswith(h, "Message ID: ")) message_id = strstr(h, ": ") + 2; else if (startswith(h, "Priority: ")) priority = atoll(strstr(h, ": ") + 2); } /* Ignore message if not labeled with a message ID. */ if (message_id == NULL) { eprint("received message with a message ID, ignoring."); return; } /* Assign ID if not already assigned. */ if (assign_id && (client->id == 0)) { intercept |= 2; with_mutex(slave_mutex, client->id = next_id++; if (next_id == 0) { eprint("this is impossible, ID counter has overflowed."); /* If the program ran for a millennium it would take c:a 585 assignments per nanosecond. This cannot possibly happen. (It would require serious dedication by generations of ponies (or just an alicorn) to maintain the process and transfer it new hardware.) */ abort(); } ); } /* Make the client listen for messages addressed to it. */ if (intercept) { pthread_mutex_lock(&(client->mutex)); if ((intercept & 1)) /* from payload */ { /* TODO */ } if ((intercept & 2)) /* "To: $(client->id)" */ { /* TODO */ } pthread_mutex_unlock(&(client->mutex)); } /* Send asigned ID. */ if (assign_id) { /* Construct response. */ n = 2 * 10 + strlen(message_id) + 1; n += strlen("ID assignment: :\nIn response to: \n\n"); msgbuf = malloc(n * sizeof(char)); if (msgbuf == NULL) { perror(*argv); return; } snprintf(msgbuf, n, "ID assignment: %" PRIu32 ":%" PRIu32 "\n" "In response to: %s\n" "\n", (uint32_t)(client->id >> 32), (uint32_t)(client->id >> 0), message_id == NULL ? "" : message_id); n = strlen(msgbuf); /* Send message. */ with_mutex(client->mutex, if (send_message(client->socket_fd, msgbuf, n) < n) /* TODO support EINTR */ perror(*argv); ); free(msgbuf); } } /** * Exec into the mdsinitrc script * * @param args The arguments to the child process */ void run_initrc(char** args) { char pathname[PATH_MAX]; struct passwd* pwd; char* env; char* home; args[0] = pathname; #define __exec(FORMAT, ...) \ xsnprintf(pathname, FORMAT, __VA_ARGS__); execv(args[0], args) /* Test $XDG_CONFIG_HOME. */ if ((env = getenv_nonempty("XDG_CONFIG_HOME")) != NULL) { __exec("%s/%s", env, INITRC_FILE); } /* Test $HOME. */ if ((env = getenv_nonempty("HOME")) != NULL) { __exec("%s/.config/%s", env, INITRC_FILE); __exec("%s/.%s", env, INITRC_FILE); } /* Test ~. */ pwd = getpwuid(getuid()); /* Ignore error. */ if (pwd != NULL) { home = pwd->pw_dir; if ((home != NULL) && (*home != '\0')) { __exec("%s/.config/%s", home, INITRC_FILE); __exec("%s/.%s", home, INITRC_FILE); } } /* Test $XDG_CONFIG_DIRS. */ if ((env = getenv_nonempty("XDG_CONFIG_DIRS")) != NULL) { char* begin = env; char* end; int len; for (;;) { end = strchrnul(begin, ':'); len = (int)(end - begin); if (len > 0) { __exec("%.*s/%s", len, begin, INITRC_FILE); } if (*end == '\0') break; begin = end + 1; } } /* Test /etc. */ __exec("%s/%s", SYSCONFDIR, INITRC_FILE); #undef __exec /* Everything failed. */ eprintf("unable to run %s file, you might as well kill me.", INITRC_FILE); } /** * Called with the signal SIGUSR1 is caught. * This function should cue a re-exec of the program. * * @param signo The caught signal */ void sigusr1_trap(int signo) { if (reexecing == 0) { pthread_t current_thread; ssize_t node; reexecing = 1; current_thread = pthread_self(); if (pthread_equal(current_thread, master_thread) == 0) pthread_kill(master_thread, signo); with_mutex(slave_mutex, foreach_linked_list_node (client_list, node) { client_t* value = (client_t*)(void*)(client_list.values[node]); if (pthread_equal(current_thread, value->thread) == 0) pthread_kill(value->thread, signo); }); } } /** * Marshal the server's state into a file * * @param fd The file descriptor * @return Negative on error */ int marshal_server(int fd) { size_t list_size = linked_list_marshal_size(&client_list); size_t map_size = fd_table_marshal_size(&client_map); size_t list_elements = 0; size_t msg_size = 0; char* state_buf = NULL; char* state_buf_; size_t state_n; ssize_t wrote; ssize_t node; size_t j, n; /* Calculate the grand size of all messages and their buffers. */ for (node = client_list.edge;; list_elements++) { mds_message_t message; client_t* value; if ((node = client_list.next[node]) == client_list.edge) break; value = (client_t*)(void*)(client_list.values[node]); n = value->interception_conditions_count; message = value->message; msg_size += mds_message_marshal_size(&message, 1); msg_size += n * (sizeof(size_t) + sizeof(int64_t) + sizeof(int)); for (j = 0; j < n; j++) msg_size += (strlen(value->interception_conditions[j].condition) + 1) * sizeof(char); } /* Calculate the grand size of all client information. */ state_n = 5 + sizeof(size_t) + 2 * sizeof(int) + 1 * sizeof(uint64_t); state_n *= list_elements; state_n += msg_size; /* Add the size of the rest of the program's state. */ state_n += 2 * sizeof(int) + 1 * sizeof(sig_atomic_t) + 2 * sizeof(size_t); /* Allocate a buffer for all data except the client list and the client map. */ state_buf = state_buf_ = malloc(state_n); if (state_buf == NULL) goto fail; /* Tell the new version of the program what version of the program it is marshalling. */ buf_set_next(state_buf_, int, MDS_SERVER_VARS_VERSION); /* Marshal the miscellaneous state data. */ buf_set_next(state_buf_, sig_atomic_t, running); buf_set_next(state_buf_, uint64_t, next_id); /* Tell the program how large the marshalled client list is and how any clients are marshalled. */ buf_set_next(state_buf_, size_t, list_size); buf_set_next(state_buf_, size_t, list_elements); /* Marshal the clients. */ foreach_linked_list_node (client_list, node) { /* Get the memory address of the client. */ size_t value_address = client_list.values[node]; /* Get the client's information. */ client_t* value = (client_t*)(void*)value_address; /* Get the marshalled size of the message. */ msg_size = mds_message_marshal_size(&(value->message), 1); /* Marshal the address, it is used the the client list and the client map, that will be marshalled. */ buf_set_next(state_buf_, size_t, value_address); /* Tell the program how large the marshalled message is. */ buf_set_next(state_buf_, size_t, msg_size); /* Marshal the client info. */ buf_set_next(state_buf_, ssize_t, value->list_entry); buf_set_next(state_buf_, int, value->socket_fd); buf_set_next(state_buf_, int, value->open); buf_set_next(state_buf_, uint64_t, value->id); /* Marshal interception conditions. */ buf_set_next(state_buf_, size_t, n = value->interception_conditions_count); for (j = 0; j < n; j++) { interception_condition_t cond = value->interception_conditions[j]; memcpy(state_buf_, cond.condition, strlen(cond.condition) + 1); buf_next(state_buf_, char, strlen(cond.condition) + 1); buf_set_next(state_buf_, size_t, cond.header_hash); buf_set_next(state_buf_, int64_t, cond.priority); buf_set_next(state_buf_, int, cond.modifying); } /* Marshal the message. */ mds_message_marshal(&(value->message), state_buf_, 1); state_buf_ += msg_size / sizeof(char); } /* Send the marshalled data into the file. */ while (state_n > 0) { errno = 0; wrote = write(fd, state_buf, state_n); if (errno && (errno != EINTR)) goto fail; state_n -= (size_t)max(wrote, 0); state_buf += (size_t)max(wrote, 0); } free(state_buf); /* Marshal, and send into the file, the client list. */ state_buf = malloc(list_size); if (state_buf == NULL) goto fail; linked_list_marshal(&client_list, state_buf); while (list_size > 0) { errno = 0; wrote = write(fd, state_buf, list_size); if (errno && (errno != EINTR)) goto fail; list_size -= (size_t)max(wrote, 0); state_buf += (size_t)max(wrote, 0); } free(state_buf); /* Marshal, and send into the file, the client map. */ state_buf = malloc(map_size); if (state_buf == NULL) goto fail; fd_table_marshal(&client_map, state_buf); while (map_size > 0) { errno = 0; wrote = write(fd, state_buf, map_size); if (errno && (errno != EINTR)) goto fail; map_size -= (size_t)max(wrote, 0); state_buf += (size_t)max(wrote, 0); } free(state_buf); return 0; fail: if (state_buf != NULL) free(state_buf); return -1; } /** * Address translation table used by `unmarshal_server` and `remapper` */ static hash_table_t unmarshal_remap_map; /** * Address translator for `unmarshal_server` * * @param old The old address * @return The new address */ static size_t unmarshal_remapper(size_t old) { return hash_table_get(&unmarshal_remap_map, old); } /** * Unmarshal the server's state from a file * * @param fd The file descriptor * @return Negative on error */ int unmarshal_server(int fd) { int with_error = 0; size_t state_buf_size = 8 << 10; size_t state_buf_ptr = 0; ssize_t got; char* state_buf; char* state_buf_; size_t list_size; size_t list_elements; size_t i; ssize_t node; pthread_t _slave_thread; /* Allocate buffer for data. */ state_buf = state_buf_ = malloc(state_buf_size * sizeof(char)); if (state_buf == NULL) { perror(*argv); return -1; } /* Read the file. */ for (;;) { /* Grow buffer if it is too small. */ if (state_buf_size == state_buf_ptr) { char* old_buf = state_buf; state_buf = realloc(state_buf, (state_buf_size <<= 1) * sizeof(char)); if (state_buf == NULL) { perror(*argv); free(old_buf); return -1; } } /* Read from the file into the buffer. */ got = read(fd, state_buf + state_buf_ptr, state_buf_size - state_buf_ptr); if (got < 0) { perror(*argv); free(state_buf); return -1; } if (got == 0) break; state_buf_ptr += (size_t)got; } /* Create memory address remapping table. */ if (hash_table_create(&unmarshal_remap_map)) { perror(*argv); free(state_buf); hash_table_destroy(&unmarshal_remap_map, NULL, NULL); return -1; } /* Get the marshal protocal version. Not needed, there is only the one version right now. */ /* buf_get(state_buf_, int, 0, MDS_SERVER_VARS_VERSION); */ buf_next(state_buf_, int, 1); /* Unmarshal the miscellaneous state data. */ buf_get_next(state_buf_, sig_atomic_t, running); buf_get_next(state_buf_, uint64_t, next_id); /* Get the marshalled size of the client list and how any clients that are marshalled. */ buf_get_next(state_buf_, size_t, list_size); buf_get_next(state_buf_, size_t, list_elements); /* Unmarshal the clients. */ for (i = 0; i < list_elements; i++) { size_t seek = 0; size_t j = 0, n = 0; size_t value_address; size_t msg_size; client_t* value; /* Allocate the client's information. */ if ((value = malloc(sizeof(client_t))) == NULL) { perror(*argv); goto clients_fail; } /* Unmarshal the address, it is used the the client list and the client map, that are also marshalled. */ buf_get_next(state_buf_, size_t, value_address); /* Get the marshalled size of the message. */ buf_get_next(state_buf_, size_t, msg_size); /* Unmarshal the client info. */ buf_get_next(state_buf_, ssize_t, value->list_entry); buf_get_next(state_buf_, int, value->socket_fd); buf_get_next(state_buf_, int, value->open); buf_set_next(state_buf_, uint64_t, value->id); /* Unmarshal interception conditions. */ buf_get_next(state_buf_, size_t, value->interception_conditions_count = n); seek = 5 * sizeof(size_t) + 2 * sizeof(int) + 1 * sizeof(uint64_t); value->interception_conditions = malloc(n * sizeof(interception_condition_t)); if (value->interception_conditions == NULL) { perror(*argv); goto clients_fail; } for (j = 0; j < n; j++) { interception_condition_t* cond = value->interception_conditions + j; size_t m = strlen(state_buf_) + 1; if ((cond->condition = malloc(m * sizeof(char))) == NULL) { perror(*argv); goto clients_fail; } memcpy(cond->condition, state_buf_, m); buf_next(state_buf_, char, m); buf_get_next(state_buf_, size_t, cond->header_hash); buf_get_next(state_buf_, int64_t, cond->priority); buf_get_next(state_buf_, int, cond->modifying); seek += m * sizeof(char) + sizeof(size_t) + sizeof(int64_t) + sizeof(int); } /* Unmarshal the message. */ if (mds_message_unmarshal(&(value->message), state_buf_)) { perror(*argv); mds_message_destroy(&(value->message)); goto clients_fail; } state_buf_ += msg_size / sizeof(char); /* Populate the remapping table. */ hash_table_put(&unmarshal_remap_map, value_address, (size_t)(void*)value); /* On error, seek past all clients. */ continue; clients_fail: with_error = 1; if (value != NULL) { if (value->interception_conditions != NULL) { for (j = 0; j < n; j++) if (value->interception_conditions[j].condition != NULL) free(value->interception_conditions[j].condition); free(value->interception_conditions); } free(value); } state_buf_ -= seek / sizeof(char); for (; i < list_elements; i++) { /* There is not need to close the sockets, it is done by the caller because there are conditions where we cannot get here anyway. */ msg_size = ((size_t*)state_buf_)[1]; buf_next(state_buf_, size_t, 4); buf_next(state_buf_, int, 2); buf_next(state_buf_, uint64_t, 1); buf_get_next(state_buf_, size_t, n); for (j = 0; j < n; j++) { buf_next(state_buf_, char, strlen(state_buf_) + 1); buf_next(state_buf_, size_t, 1); buf_next(state_buf_, int64_t, 1); buf_next(state_buf_, int, 1); } state_buf_ += msg_size / sizeof(char); } break; } /* Unmarshal the client list. */ linked_list_unmarshal(&client_list, state_buf_); state_buf_ += list_size / sizeof(char); /* Unmarshal the client map. */ fd_table_unmarshal(&client_map, state_buf_, unmarshal_remapper); /* Release the raw data. */ free(state_buf); /* Remove non-found elements from the fd table. */ if (with_error) for (i = 0; i < client_map.capacity; i++) if (client_map.used[i / 64] & ((uint64_t)1 << (i % 64))) if (client_map.values[i] == 0) /* Lets not presume that fd-table actually initialise its allocations. */ client_map.used[i / 64] &= ~((uint64_t)1 << (i % 64)); /* Remap the linked list and remove non-found elements, and start the clients. */ foreach_linked_list_node (client_list, node) { /* Remap the linked list and remove non-found elements. */ size_t new_address = unmarshal_remapper(client_list.values[node]); client_list.values[node] = new_address; if (new_address == 0) /* Returned if missing (or if the address is the invalid NULL.) */ linked_list_remove(&client_list, node); else { /* Start the clients. (Errors do not need to be reported.) */ client_t* client = (client_t*)(void*)new_address; int socket_fd = client->socket_fd; /* Increase number of running slaves. */ with_mutex(slave_mutex, running_slaves++;); /* Start slave thread. */ errno = pthread_create(&_slave_thread, NULL, slave_loop, (void*)(intptr_t)socket_fd); if (errno) { perror(*argv); with_mutex(slave_mutex, running_slaves--;); } } } /* Release the remapping table's resources. */ hash_table_destroy(&unmarshal_remap_map, NULL, NULL); return -with_error; }