/**
* mds — A micro-display server
* Copyright © 2014 Mattias Andrée (maandree@member.fsf.org)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "mds-server.h"
#include "interception_condition.h"
#include "client.h"
#include "queued_interception.h"
#include "multicast.h"
#include <libmdsserver/config.h>
#include <libmdsserver/linked-list.h>
#include <libmdsserver/hash-table.h>
#include <libmdsserver/fd-table.h>
#include <libmdsserver/mds-message.h>
#include <libmdsserver/macros.h>
#include <libmdsserver/util.h>
#include <libmdsserver/hash-help.h>
#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <limits.h>
#include <unistd.h>
#include <pwd.h>
#include <errno.h>
#include <pthread.h>
#include <sys/socket.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <dirent.h>
#include <inttypes.h>
#include <time.h>
#define MDS_SERVER_VARS_VERSION 0
/**
* Number of elements in `argv`
*/
static int argc;
/**
* Command line arguments
*/
static char** argv;
/**
* The program run state, 1 when running, 0 when shutting down
*/
static volatile sig_atomic_t running = 1;
/**
* Non-zero when the program is about to re-exec.
* Most at all times be at least as true as `terminating`.
*/
static volatile sig_atomic_t reexecing = 0;
/**
* Non-zero when the program is about to terminate
*/
static volatile sig_atomic_t terminating = 0;
/**
* The number of running slaves
*/
static int running_slaves = 0;
/**
* Mutex for slave data
*/
static pthread_mutex_t slave_mutex;
/**
* Condition for slave data
*/
static pthread_cond_t slave_cond;
/**
* The thread that runs the master loop
*/
static pthread_t master_thread;
/**
* Map from client socket file descriptor to all information (client_t)
*/
static fd_table_t client_map;
/**
* List of client information (client_t)
*/
static linked_list_t client_list;
/**
* The next free ID for a client
*/
static uint64_t next_id = 1;
/**
* The next free ID for a message modifications
*/
static uint64_t next_modify_id = 1;
/**
* Mutex for message modification
*/
static pthread_mutex_t modify_mutex;
/**
* Condition for message modification
*/
static pthread_cond_t modify_cond;
/**
* Map from modification ID to waiting client
*/
static hash_table_t modify_map;
/**
* Entry point of the server
*
* @param argc_ Number of elements in `argv_`
* @param argv_ Command line arguments
* @return Non-zero on error
*/
int main(int argc_, char** argv_)
{
int is_respawn = -1;
int socket_fd = -1;
int reexec = 0;
int unparsed_args_ptr = 1;
char* unparsed_args[ARGC_LIMIT + LIBEXEC_ARGC_EXTRA_LIMIT + 1];
int i;
pthread_t slave_thread;
#if (LIBEXEC_ARGC_EXTRA_LIMIT < 3)
# error LIBEXEC_ARGC_EXTRA_LIMIT is too small, need at least 3.
#endif
argc = argc_;
argv = argv_;
/* Drop privileges like it's hot. */
fail_if (drop_privileges());
/* Sanity check the number of command line arguments. */
exit_if (argc > ARGC_LIMIT + LIBEXEC_ARGC_EXTRA_LIMIT,
eprint("that number of arguments is ridiculous, I will not allow it."););
/* Parse command line arguments. */
for (i = 1; i < argc; i++)
{
char* arg = argv[i];
int v;
if ((v = strequals(arg, "--initial-spawn")) || /* Initial spawn? */
strequals(arg, "--respawn")) /* Respawning after crash? */
{
exit_if (is_respawn == v,
eprintf("conflicting arguments %s and %s cannot be combined.",
"--initial-spawn", "--respawn"););
is_respawn = !v;
}
else if (startswith(arg, "--socket-fd=")) /* Socket file descriptor. */
{
exit_if (socket_fd != -1,
eprintf("duplicate declaration of %s.", "--socket-fd"););
exit_if (strict_atoi(arg += strlen("--socket-fd="), &socket_fd, 0, INT_MAX) < 0,
eprintf("invalid value for %s: %s.", "--socket-fd", arg););
}
else if (strequals(arg, "--re-exec")) /* Re-exec state-marshal. */
reexec = 1;
else
/* Not recognised, it is probably for another server. */
unparsed_args[unparsed_args_ptr++] = arg;
}
unparsed_args[unparsed_args_ptr] = NULL;
if (reexec)
{
is_respawn = 1;
eprint("re-exec performed.");
}
/* Check that manditory arguments have been specified. */
exit_if (is_respawn < 0,
eprintf("missing state argument, require either %s or %s.",
"--initial-spawn", "--respawn"););
exit_if (socket_fd < 0,
eprint("missing socket file descriptor argument."););
/* Run mdsinitrc. */
if (is_respawn == 0)
{
pid_t pid = fork();
fail_if (pid == (pid_t)-1);
if (pid == 0) /* Child process exec:s, the parent continues without waiting for it. */
{
/* Close all files except stdin, stdout and stderr. */
close_files((fd > 2) || (fd == socket_fd));
/* Run mdsinitrc. */
run_initrc(unparsed_args);
return 1;
}
}
#define __free(I) \
if (I >= 0) fd_table_destroy(&client_map, NULL, NULL); \
if (I >= 1) linked_list_destroy(&client_list); \
if (I >= 2) pthread_mutex_destroy(&slave_mutex); \
if (I >= 3) pthread_cond_destroy(&slave_cond); \
if (I >= 4) pthread_mutex_destroy(&modify_mutex); \
if (I >= 5) pthread_cond_destroy(&modify_cond); \
if (I >= 6) hash_table_destroy(&modify_map, NULL, NULL)
#define error_if(I, CONDITION) if (CONDITION) { perror(*argv); __free(I); return 1; }
/* Create list and table of clients. */
if (reexec == 0)
{
error_if (0, fd_table_create(&client_map));
error_if (1, linked_list_create(&client_list, 32));
}
/* Store the current thread so it can be killed from elsewhere. */
master_thread = pthread_self();
/* Make the server update without all slaves dying on SIGUSR1. */
error_if (1, xsigaction(SIGUSR1, sigusr1_trap) < 0);
/* Implement clean exit on SIGTERM. */
error_if (1, xsigaction(SIGTERM, sigterm_trap) < 0);
/* Create mutex and condition for slave counter. */
error_if (1, (errno = pthread_mutex_init(&slave_mutex, NULL)));
error_if (2, (errno = pthread_cond_init(&slave_cond, NULL)));
/* Create mutex, condition and map for message modification. */
error_if (3, (errno = pthread_mutex_init(&modify_mutex, NULL)));
error_if (4, (errno = pthread_cond_init(&modify_cond, NULL)));
error_if (5, hash_table_create(&modify_map));
#undef error_if
/* Unmarshal the state of the server. */
if (reexec)
{
pid_t pid = getpid();
int reexec_fd, r;
char shm_path[NAME_MAX + 1];
/* Acquire access to marshalled data. */
xsnprintf(shm_path, SHM_PATH_PATTERN, (unsigned long int)pid);
reexec_fd = shm_open(shm_path, O_RDONLY, S_IRWXU);
fail_if (reexec_fd < 0); /* Critical. */
/* Unmarshal state. */
r = unmarshal_server(reexec_fd);
close(reexec_fd);
shm_unlink(shm_path);
if (r < 0)
{
/* Close all files (hopefully sockets) we do not know what they are. */
close_files((fd > 2) && (fd != socket_fd) && (fd_table_contains_key(&client_map, fd) == 0));
}
}
/* Accepting incoming connections. */
while (running && (terminating == 0))
{
/* Accept connection. */
int client_fd = accept(socket_fd, NULL, NULL);
/* Handle errors and shutdown. */
if (client_fd == -1)
{
switch (errno)
{
case EINTR:
/* Interrupted. */
if (terminating)
goto terminate;
break;
case ECONNABORTED:
case EINVAL:
/* Closing. */
running = 0;
break;
default:
/* Error. */
perror(*argv);
break;
}
continue;
}
/* Increase number of running slaves. */
with_mutex (slave_mutex, running_slaves++;);
/* Start slave thread. */
create_slave(&slave_thread, client_fd);
}
terminate:
if (reexecing)
goto reexec;
/* Join with all slaves threads. */
with_mutex (slave_mutex,
while (running_slaves > 0)
pthread_cond_wait(&slave_cond, &slave_mutex););
/* Release resources. */
__free(9999);
return 0;
#undef __free
reexec:
{
pid_t pid = getpid();
int reexec_fd;
char shm_path[NAME_MAX + 1];
ssize_t node;
/* Join with all slaves threads. */
with_mutex (slave_mutex,
while (running_slaves > 0)
pthread_cond_wait(&slave_cond, &slave_mutex););
/* Release resources. */
pthread_mutex_destroy(&slave_mutex);
pthread_cond_destroy(&slave_cond);
pthread_mutex_destroy(&modify_mutex);
pthread_cond_destroy(&modify_cond);
hash_table_destroy(&modify_map, NULL, NULL);
/* Marshal the state of the server. */
xsnprintf(shm_path, SHM_PATH_PATTERN, (unsigned long int)pid);
reexec_fd = shm_open(shm_path, O_RDWR | O_CREAT | O_EXCL, S_IRWXU);
fail_if (reexec_fd < 0);
if (marshal_server(reexec_fd) < 0)
goto reexec_fail;
close(reexec_fd);
reexec_fd = -1;
/* Release resources. */
foreach_linked_list_node (client_list, node)
{
client_t* client = (client_t*)(void*)(client_list.values[node]);
client_destroy(client);
}
fd_table_destroy(&client_map, NULL, NULL);
linked_list_destroy(&client_list);
/* Re-exec the server. */
reexec_server(argc, argv, reexec);
reexec_fail:
perror(*argv);
if (reexec_fd >= 0)
close(reexec_fd);
shm_unlink(shm_path);
/* Returning non-zero is important, otherwise the server cannot
be respawn if the re-exec fails. */
return 1;
}
pfail:
perror(*argv);
return 1;
}
/**
* Master function for slave threads
*
* @param data Input data
* @return Outout data
*/
void* slave_loop(void* data)
{
int socket_fd = (int)(intptr_t)data;
ssize_t entry = LINKED_LIST_UNUSED;
size_t information_address = fd_table_get(&client_map, (size_t)socket_fd);
client_t* information = (client_t*)(void*)information_address;
char* msgbuf = NULL;
size_t n;
size_t tmp;
int r;
if (information == NULL)
{
/* Create information table. */
fail_if (xmalloc(information, 1, client_t));
client_initialise(information);
/* Add to list of clients. */
pthread_mutex_lock(&slave_mutex);
entry = linked_list_insert_end(&client_list, (size_t)(void*)information);
if (entry == LINKED_LIST_UNUSED)
{
perror(*argv);
pthread_mutex_unlock(&slave_mutex);
goto fail;
}
/* Add client to table. */
tmp = fd_table_put(&client_map, socket_fd, (size_t)(void*)information);
if ((tmp == 0) && errno)
{
perror(*argv);
pthread_mutex_unlock(&slave_mutex);
goto fail;
}
pthread_mutex_unlock(&slave_mutex);
/* Fill information table. */
information->list_entry = entry;
information->socket_fd = socket_fd;
information->open = 1;
fail_if (mds_message_initialise(&(information->message)));
}
/* Store slave thread and create mutexes and conditions. */
fail_if (client_initialise_threading(information));
/* Make the server update without all slaves dying on SIGUSR1. */
fail_if (xsigaction(SIGUSR1, sigusr1_trap) < 0);
/* Implement clean exit on SIGTERM. */
fail_if (xsigaction(SIGTERM, sigterm_trap) < 0);
/* Fetch messages from the slave. */
while ((terminating == 0) && information->open)
{
/* Send queued multicast messages. */
send_multicast_queue(information);
/* Send queued messages. */
send_reply_queue(information);
/* Fetch message. */
r = fetch_message(information);
if ((r == 0) && message_received(information))
goto terminate;
else if (r == -2)
goto fail;
else if (r && (errno == EINTR) && terminating)
goto terminate; /* Stop the thread if we are re-exec:ing or terminating the server. */
}
/* Stop the thread if we are re-exec:ing or terminating the server. */
if (terminating)
goto terminate;
/* Multicast information about the client closing. */
n = 2 * 10 + 1 + strlen("Client closed: :\n\n");
fail_if (xmalloc(msgbuf, n, char));
snprintf(msgbuf, n,
"Client closed: %" PRIu32 ":%" PRIu32 "\n"
"\n",
(uint32_t)(information->id >> 32),
(uint32_t)(information->id >> 0));
n = strlen(msgbuf);
queue_message_multicast(msgbuf, n, information);
msgbuf = NULL;
terminate: /* This done on success as well. */
if (reexecing)
goto reexec;
fail: /* This done on success as well. */
/* Close socket and free resources. */
close(socket_fd);
free(msgbuf);
if (information != NULL)
client_destroy(information);
/* Unlist client and decrease the slave count. */
with_mutex (slave_mutex,
fd_table_remove(&client_map, socket_fd);
if (entry != LINKED_LIST_UNUSED)
linked_list_remove(&client_list, entry);
running_slaves--;
pthread_cond_signal(&slave_cond););
return NULL;
pfail:
perror(*argv);
goto fail;
reexec:
/* Tell the master thread that the slave has closed,
this is done because re-exec causes a race-condition
between the acception of a slave and the execution
of the the slave thread. */
with_mutex (slave_mutex,
running_slaves--;
pthread_cond_signal(&slave_cond););
return NULL;
}
/**
* Send the next message in a clients multicast queue
*
* @param client The client
*/
void send_multicast_queue(client_t* client)
{
while (client->multicasts_count > 0)
{
multicast_t multicast;
with_mutex_if (client->mutex, client->multicasts_count > 0,
size_t c = (client->multicasts_count -= 1) * sizeof(multicast_t);
multicast = client->multicasts[0];
memmove(client->multicasts, client->multicasts + 1, c);
if (c == 0)
{
free(client->multicasts);
client->multicasts = NULL;
}
);
multicast_message(&multicast);
multicast_destroy(&multicast);
}
}
/**
* Send the messages that are in a clients reply queue
*
* @param client The client
*/
void send_reply_queue(client_t* client)
{
char* sendbuf = client->send_pending;
char* sendbuf_ = sendbuf;
size_t sent;
size_t n;
if (client->send_pending_size == 0)
return;
n = client->send_pending_size;
client->send_pending_size = 0;
client->send_pending = NULL;
with_mutex (client->mutex,
while (n > 0)
{
sent = send_message(client->socket_fd, sendbuf_, n);
n -= sent;
sendbuf_ += sent / sizeof(char);
if ((n > 0) && (errno != EINTR)) /* Ignore EINTR */
{
perror(*argv);
break;
}
}
free(sendbuf);
);
}
/**
* Receive a full message and update open status if the client closes
*
* @param client The client
* @return Zero on success, -2 on failure, otherwise -1
*/
int fetch_message(client_t* client)
{
int r = mds_message_read(&(client->message), client->socket_fd);
if (r == 0)
return 0;
if (r == -2)
eprint("corrupt message received.");
else if (errno == ECONNRESET)
{
r = mds_message_read(&(client->message), client->socket_fd);
client->open = 0;
/* Connection closed. */
}
else if (errno != EINTR)
{
r = -2;
perror(*argv);
}
return r;
}
/**
* Perform actions that should be taken when
* a message has been received from a client
*
* @param client The client has sent a message
* @return Normally zero, but 1 if exited because of re-exec or termination
*/
int message_received(client_t* client)
{
mds_message_t message = client->message;
int assign_id = 0;
int modifying = 0;
int intercept = 0;
int64_t priority = 0;
int stop = 0;
const char* message_id = NULL;
uint64_t modify_id = 0;
size_t i, n;
char* msgbuf;
/* Parser headers. */
for (i = 0; i < message.header_count; i++)
{
const char* h = message.headers[i];
if (strequals(h, "Command: assign-id")) assign_id = 1;
else if (strequals(h, "Command: intercept")) intercept = 1;
else if (strequals(h, "Modifying: yes")) modifying = 1;
else if (strequals(h, "Stop: yes")) stop = 1;
else if (startswith(h, "Message ID: ")) message_id = strstr(h, ": ") + 2;
else if (startswith(h, "Priority: ")) priority = atoll(strstr(h, ": ") + 2);
else if (startswith(h, "Modify ID: ")) modify_id = (uint64_t)atoll(strstr(h, ": ") + 2);
}
/* Notify waiting client about a received message modification. */
if (modifying != 0)
{
/* pthread_cond_timedwait is required to handle re-exec and termination because
pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */
struct timespec timeout =
{
.tv_sec = 1,
.tv_nsec = 0
};
size_t address;
client_t* recipient;
mds_message_t* multicast;
pthread_mutex_lock(&(modify_mutex));
while (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0)
{
if (terminating)
{
pthread_mutex_unlock(&(modify_mutex));
return 1;
}
pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout);
}
address = hash_table_get(&modify_map, (size_t)modify_id);
recipient = (client_t*)(void*)address;
if (xmalloc(multicast = recipient->modify_message, 1, mds_message_t))
goto fail;
multicast->headers = NULL;
multicast->header_count = 0;
multicast->payload = NULL;
multicast->payload_size = 0;
multicast->payload_ptr = 0;
multicast->buffer = NULL;
multicast->buffer_size = 0;
multicast->buffer_ptr = 0;
multicast->stage = 0;
if (xmalloc(multicast->payload, message.payload_size, char))
goto fail;
memcpy(multicast->payload, message.payload, message.payload_size * sizeof(char));
if (xmalloc(multicast->headers, message.header_count, char*))
goto fail;
for (i = 0; i < message.header_count; i++, multicast->header_count++)
{
multicast->headers[i] = strdup(message.headers[i]);
if (multicast->headers[i] == NULL)
goto fail;
}
goto done;
fail:
if (multicast != NULL)
{
mds_message_destroy(multicast);
free(multicast);
recipient->modify_message = NULL;
}
done:
pthread_mutex_unlock(&(modify_mutex));
with_mutex (client->modify_mutex, pthread_cond_signal(&(client->modify_cond)););
/* Do nothing more, not not even multicast this message. */
return 0;
}
if (message_id == NULL)
{
eprint("received message with a message ID, ignoring.");
return 0;
}
/* Assign ID if not already assigned. */
if (assign_id && (client->id == 0))
{
intercept |= 2;
with_mutex_if (slave_mutex, (client->id = ++next_id) == 0,
eprint("this is impossible, ID counter has overflowed.");
/* If the program ran for a millennium it would
take c:a 585 assignments per nanosecond. This
cannot possibly happen. (It would require serious
dedication by generations of ponies (or just an alicorn)
to maintain the process and transfer it new hardware.) */
abort();
);
}
/* Make the client listen for messages addressed to it. */
if (intercept)
{
size_t size = 64; /* Atleast 25, otherwise we get into problems at ((intercept & 2)) */
char* buf;
if (xmalloc(buf, size + 1, char))
{
perror(*argv);
return 0;
}
pthread_mutex_lock(&(client->mutex));
if ((intercept & 1)) /* from payload */
{
char* payload = client->message.payload;
size_t payload_size = client->message.payload_size;
if (client->message.payload_size == 0) /* All messages */
{
*buf = '\0';
add_intercept_condition(client, buf, priority, modifying, stop);
}
else /* Filtered messages */
for (;;)
{
char* end = memchr(payload, '\n', payload_size);
size_t len = end == NULL ? payload_size : (size_t)(end - payload);
if (len == 0)
{
payload++;
payload_size--;
break;
}
if (len > size)
{
char* old_buf = buf;
if (xrealloc(buf, (size <<= 1) + 1, char))
{
perror(*argv);
free(old_buf);
pthread_mutex_unlock(&(client->mutex));
return 0;
}
}
memcpy(buf, payload, len);
buf[len] = '\0';
add_intercept_condition(client, buf, priority, modifying, stop);
if (end == NULL)
break;
payload = end + 1;
payload_size -= len + 1;
}
}
if ((intercept & 2)) /* "To: $(client->id)" */
{
snprintf(buf, size, "To: %" PRIu32 ":%" PRIu32,
(uint32_t)(client->id >> 32),
(uint32_t)(client->id >> 0));
add_intercept_condition(client, buf, priority, modifying, 0);
}
pthread_mutex_unlock(&(client->mutex));
free(buf);
}
/* Multicast the message. */
n = mds_message_compose_size(&message);
if ((msgbuf = malloc(n)) == NULL)
{
perror(*argv);
return 0;
}
mds_message_compose(&message, msgbuf);
queue_message_multicast(msgbuf, n / sizeof(char), client);
/* Send asigned ID. */
if (assign_id)
{
char* msgbuf_;
/* Construct response. */
n = 2 * 10 + strlen(message_id) + 1;
n += strlen("ID assignment: :\nIn response to: \n\n");
if (xmalloc(msgbuf, n, char))
{
perror(*argv);
return 0;
}
snprintf(msgbuf, n,
"ID assignment: %" PRIu32 ":%" PRIu32 "\n"
"In response to: %s\n"
"\n",
(uint32_t)(client->id >> 32),
(uint32_t)(client->id >> 0),
message_id == NULL ? "" : message_id);
n = strlen(msgbuf);
/* Multicast the reply. */
msgbuf_ = strdup(msgbuf);
if (msgbuf_ == NULL)
{
perror(*argv);
free(msgbuf);
return 0;
}
queue_message_multicast(msgbuf_, n, client);
/* Queue message to be sent when this function returns.
This done to simplify `multicast_message` for re-exec and termination. */
with_mutex (client->mutex,
if (client->send_pending_size == 0)
{
/* Set the pending message. */
client->send_pending = msgbuf;
client->send_pending_size = n;
}
else
{
/* Concatenate message to already pending messages. */
size_t new_len = client->send_pending_size + n;
char* msg_new = realloc(client->send_pending, new_len * sizeof(char));
if (msg_new != NULL)
{
memcpy(msg_new + client->send_pending_size, msgbuf, n * sizeof(char));
client->send_pending = msg_new;
client->send_pending_size = new_len;
}
else
perror(*argv);
free(msgbuf);
}
);
}
return 0;
}
/**
* Remove interception condition by index
*
* @param client The intercepting client
* @param index The index of the condition
*/
static void remove_intercept_condition(client_t* client, size_t index)
{
interception_condition_t* conds = client->interception_conditions;
size_t n = client->interception_conditions_count;
/* Remove the condition from the list. */
memmove(conds + index, conds + index + 1, --n - index);
client->interception_conditions_count--;
/* Shrink the list. */
if (client->interception_conditions_count == 0)
{
free(conds);
client->interception_conditions = NULL;
}
else
if (xrealloc(conds, n, interception_condition_t))
perror(*argv);
else
client->interception_conditions = conds;
}
/**
* Add an interception condition for a client
*
* @param client The client
* @param condition The header, optionally with value, to look for, or empty (not `NULL`) for all messages
* @param priority Interception priority
* @param modifying Whether the client may modify the messages
* @param stop Whether the condition should be removed rather than added
*/
void add_intercept_condition(client_t* client, char* condition, int64_t priority, int modifying, int stop)
{
size_t n = client->interception_conditions_count;
interception_condition_t* conds = client->interception_conditions;
ssize_t nonmodifying = -1;
char* header = condition;
char* value;
size_t hash;
size_t i;
/* Split header and value apart. */
if ((value = strchr(header, ':')) != NULL)
{
*value = '\0'; /* NUL-terminate header. */
value += 2; /* Skip over delimiter. */
}
/* Calcuate header hash (comparison optimisation) */
hash = string_hash(header);
/* Remove of update condition of already registered,
also look for non-modifying condition to swap position
with for optimisation. */
for (i = 0; i < n; i++)
{
if ((conds[i].header_hash != hash) || !strequals(conds[i].condition, condition))
{
/* Look for the first non-modifying, this is a part of the
optimisation where we put all modifying conditions at the
beginning. */
if ((nonmodifying < 0) && conds[i].modifying)
nonmodifying = (ssize_t)i;
continue;
}
if (stop)
remove_intercept_condition(client, i);
else
{
/* Update parameters. */
conds[i].priority = priority;
conds[i].modifying = modifying;
if (modifying && (nonmodifying >= 0))
{
/* Optimisation: put conditions that are modifying
at the beginning. When a client is intercepting
we most know if any satisfying condition is
modifying. With this optimisation the first
satisfying condition will tell us if there is
any satisfying condition that is modifying. */
interception_condition_t temp = conds[nonmodifying];
conds[nonmodifying] = conds[i];
conds[i] = temp;
}
}
return;
}
if (stop)
eprint("client tried to stop intercepting messages that it does not intercept.");
else
{
/* Duplicate condition string. */
if ((condition = strdup(condition)) == NULL)
{
perror(*argv);
return;
}
/* Grow the interception condition list. */
if (xrealloc(conds, n + 1, interception_condition_t))
{
perror(*argv);
free(condition);
return;
}
client->interception_conditions = conds;
/* Store condition. */
client->interception_conditions_count++;
conds[n].condition = condition;
conds[n].header_hash = hash;
conds[n].priority = priority;
conds[n].modifying = modifying;
if (modifying && (nonmodifying >= 0))
{
/* Optimisation: put conditions that are modifying
at the beginning. When a client is intercepting
we most know if any satisfying condition is
modifying. With this optimisation the first
satisfying condition will tell us if there is
any satisfying condition that is modifying. */
interception_condition_t temp = conds[nonmodifying];
conds[nonmodifying] = conds[n];
conds[n] = temp;
}
}
}
/**
* Compare two queued interceptors by priority
*
* @param a:const queued_interception_t* One of the interceptors
* @param b:const queued_interception_t* The other of the two interceptors
* @return Negative if a before b, positive if a after b, otherwise zero
*/
static int cmp_queued_interception(const void* a, const void* b)
{
const queued_interception_t* p = b; /* Highest first, so swap them. */
const queued_interception_t* q = a;
return p->priority < q->priority ? -1 :
p->priority > q->priority ? 1 : 0;
}
/**
* Check if a condition matches any of a set of accepted patterns
*
* @param cond The condition
* @param hashes The hashes of the accepted header names
* @param keys The header names
* @param headers The header name–value pairs
* @param count The number of accepted patterns
* @return Evaluates to true if and only if a matching pattern was found
*/
static int is_condition_matching(interception_condition_t* cond,
size_t* hashes, char** keys, char** headers, size_t count)
{
size_t i;
for (i = 0; i < count; i++)
if (*(cond->condition) == '\0')
return 1;
else if ((cond->header_hash == hashes[i]) &&
(strequals(cond->condition, keys[i]) ||
strequals(cond->condition, headers[i])))
return 1;
return 0;
}
/**
* Find a matching condition to any of a set of acceptable conditions
*
* @param client The intercepting client
* @param hashes The hashes of the accepted header names
* @param keys The header names
* @param headers The header name–value pairs
* @param count The number of accepted patterns
* @param interception_out Storage slot for found interception
* @return -1 on error, otherwise: evalutes to true iff a matching condition was found
*/
static int find_matching_condition(client_t* client, size_t* hashes, char** keys, char** headers, size_t count,
queued_interception_t* interception_out)
{
pthread_mutex_t mutex = client->mutex;
interception_condition_t* conds = client->interception_conditions;
size_t n = 0, i;
errno = pthread_mutex_lock(&(mutex));
if (errno)
return -1;
/* Look for a matching condition. */
if (client->open)
n = client->interception_conditions_count;
for (i = 0; i < n; i++)
if (is_condition_matching(conds + i, hashes, keys, headers, count))
{
/* Report matching condition. */
interception_out->client = client;
interception_out->priority = conds[i].priority;
interception_out->modifying = conds[i].modifying;
break;
}
pthread_mutex_unlock(&(mutex));
return i < n;
}
/**
* Get all interceptors who have at least one condition matching any of a set of acceptable patterns
*
* @param sender The original sender of the message
* @param hashes The hashes of the accepted header names
* @param keys The header names
* @param headers The header name–value pairs
* @param count The number of accepted patterns
* @param interceptions_count_out Slot at where to store the number of found interceptors
* @return The found interceptors, `NULL` on error
*/
static queued_interception_t* get_interceptors(client_t* sender, size_t* hashes, char** keys, char** headers,
size_t count, size_t* interceptions_count_out)
{
queued_interception_t* interceptions = NULL;
size_t interceptions_count = 0;
size_t n = 0;
ssize_t node;
/* Count clients. */
foreach_linked_list_node (client_list, node)
n++;
/* Allocate interceptor list. */
if (xmalloc(interceptions, n, queued_interception_t))
return NULL;
/* Search clients. */
foreach_linked_list_node (client_list, node)
{
client_t* client = (client_t*)(void*)(client_list.values[node]);
/* Look for and list a matching condition. */
if (client->open && (client != sender))
{
int r = find_matching_condition(client, hashes, keys, headers, count,
interceptions + interceptions_count);
if (r == -1)
{
free(interceptions);
return NULL;
}
if (r)
/* List client of there was a matching condition. */
interceptions_count++;
}
}
*interceptions_count_out = interceptions_count;
return interceptions;
}
/**
* Queue a message for multicasting
*
* @param message The message
* @param length The length of the message
* @param sender The original sender of the message
*/
void queue_message_multicast(char* message, size_t length, client_t* sender)
{
char* msg = message;
size_t header_count = 0;
size_t n = length - 1;
size_t* hashes = NULL;
char** headers = NULL;
char** header_values = NULL;
queued_interception_t* interceptions = NULL;
size_t interceptions_count = 0;
multicast_t* multicast = NULL;
size_t i;
uint64_t modify_id;
char modify_id_header[13 + 3 * sizeof(uint64_t)];
void* new_buf;
/* Count the number of headers. */
for (i = 0; i < n; i++)
if (message[i] == '\n')
{
header_count++;
if (message[i + 1] == '\n')
break;
}
if (header_count == 0)
return; /* Invalid message. */
/* Allocate multicast message. */
fail_if (xmalloc(multicast, 1, multicast_t));
multicast_initialise(multicast);
/* Allocate header lists. */
fail_if (xmalloc(hashes, header_count, size_t));
fail_if (xmalloc(headers, header_count, char*));
fail_if (xmalloc(header_values, header_count, char*));
/* Populate header lists. */
for (i = 0; i < header_count; i++)
{
char* end = strchr(msg, '\n');
char* colon = strchr(msg, ':');
*end = '\0';
if ((header_values[i] = strdup(msg)) == NULL)
{
header_count = i;
goto pfail;
}
*colon = '\0';
if ((headers[i] = strdup(msg)) == NULL)
{
free(headers[i]);
header_count = i;
goto pfail;
}
*colon = ':';
*end = '\n';
hashes[i] = string_hash(headers[i]);
msg = end + 1;
}
/* Get intercepting clients. */
pthread_mutex_lock(&(slave_mutex));
interceptions = get_interceptors(sender, hashes, headers, header_values, header_count, &interceptions_count);
pthread_mutex_unlock(&(slave_mutex));
fail_if (interceptions == NULL);
/* Sort interceptors. */
qsort(interceptions, interceptions_count, sizeof(queued_interception_t), cmp_queued_interception);
/* Add prefix to message with ‘Modify ID’ header. */
with_mutex (slave_mutex,
modify_id = next_modify_id++;
if (next_modify_id == 0)
next_modify_id = 1;
);
xsnprintf(modify_id_header, "Modify ID: %" PRIu64 "\n", modify_id);
n = strlen(modify_id_header);
new_buf = message;
fail_if (xrealloc(new_buf, n + length, char));
message = new_buf;
memmove(message + n, message, length * sizeof(char));
memcpy(message, modify_id_header, n * sizeof(char));
/* Store information. */
multicast->interceptions = interceptions;
multicast->interceptions_count = interceptions_count;
multicast->message = message;
multicast->message_length = length + n;
multicast->message_prefix = n;
message = NULL;
/* Queue message multicasting. */
with_mutex (sender->mutex,
new_buf = sender->multicasts;
if (xrealloc(new_buf, sender->multicasts_count + 1, multicast_t))
goto fail_queue;
sender->multicasts = new_buf;
sender->multicasts[sender->multicasts_count++] = *multicast;
multicast = NULL;
fail_queue:
);
fail: /* This is done before this function returns even if there was no error. */
/* Release resources. */
xfree(headers, header_count);
xfree(header_values, header_count);
free(hashes);
free(message);
if (multicast != NULL)
multicast_destroy(multicast);
free(multicast);
return;
pfail:
perror(*argv);
goto fail;
}
/**
* Get the client by its socket's file descriptor in a synchronised manner
*
* @param socket_fd The file descriptor of the client's socket
* @return The client
*/
static client_t* client_by_socket(int socket_fd)
{
size_t address;
with_mutex (slave_mutex, address = fd_table_get(&client_map, socket_fd););
return (client_t*)(void*)address;
}
/**
* Send a multicast message to one recipient
*
* @param multicast The message
* @param recipient The recipient
* @param modifying Whether the recipient may modify the message
* @return Evaluates to true if and only if the entire message was sent
*/
static int send_multicast_to_recipient(multicast_t* multicast, client_t* recipient, int modifying)
{
char* msg = multicast->message + multicast->message_ptr;
size_t n = multicast->message_length - multicast->message_ptr;
size_t sent;
/* Skip Modify ID header if the interceptors will not perform a modification. */
if ((modifying == 0) && (multicast->message_ptr == 0))
{
n -= multicast->message_prefix;
multicast->message_ptr += multicast->message_prefix;
}
/* Send the message. */
n *= sizeof(char);
with_mutex (recipient->mutex,
if (recipient->open)
{
sent = send_message(recipient->socket_fd, msg + multicast->message_ptr, n);
n -= sent;
multicast->message_ptr += sent / sizeof(char);
if ((n > 0) && (errno != EINTR))
perror(*argv);
}
);
return n == 0;
}
/**
* Wait for the recipient of a multicast to reply
*
* @param recipient The recipient
* @param modify_id The modify ID of the multicast
*/
static void wait_for_reply(client_t* recipient, uint64_t modify_id)
{
/* pthread_cond_timedwait is required to handle re-exec and termination because
pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */
struct timespec timeout =
{
.tv_sec = 1,
.tv_nsec = 0
};
with_mutex_if (modify_mutex, recipient->modify_message == NULL,
if (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0)
{
hash_table_put(&modify_map, (size_t)modify_id, (size_t)(void*)recipient);
pthread_cond_signal(&slave_cond);
}
);
with_mutex_if (recipient->modify_mutex, recipient->modify_message == NULL,
while ((recipient->modify_message == NULL) && (terminating == 0))
pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout);
if (terminating == 0)
hash_table_remove(&modify_map, (size_t)modify_id);
);
}
/**
* Multicast a message
*
* @param multicast The multicast message
*/
void multicast_message(multicast_t* multicast)
{
uint64_t modify_id = 0;
size_t n = strlen("Modify ID: ");
if (startswith_n(multicast->message, "Modify ID: ", multicast->message_length, n))
{
char* value = multicast->message + n;
char* lf = strchr(value, '\n');
*lf = '\0';
modify_id = (uint64_t)atoll(value);
*lf = '\n';
}
for (; multicast->interceptions_ptr < multicast->interceptions_count; multicast->interceptions_ptr++)
{
queued_interception_t client_ = multicast->interceptions[multicast->interceptions_ptr];
client_t* client = client_.client;
int modifying = 0;
char* old_buf;
size_t i;
mds_message_t* mod;
/* After unmarshalling at re-exec, client will be NULL and must be mapped from its socket. */
if (client == NULL)
client_.client = client = client_by_socket(client_.socket_fd);
/* Send the message to the recipient. */
if (send_multicast_to_recipient(multicast, client, client_.modifying) == 0)
{
/* Stop if we are re-exec:ing or terminating, or continue to next recipient on error. */
if (terminating)
return;
else
continue;
}
/* Do not wait for a reply if it is non-modifying. */
if (client_.modifying == 0)
{
/* Reset how much of the message has been sent before we continue with next recipient. */
multicast->message_ptr = 0;
continue;
}
/* Wait for a reply. */
wait_for_reply(client, modify_id);
if (terminating)
return;
/* Act upon the reply. */
mod = client->modify_message;
for (i = 0; i < mod->header_count; i++)
if (strequals(mod->headers[i], "Modify: yes"))
{
modifying = 1;
break;
}
if (modifying)
{
n = mod->payload_size;
old_buf = multicast->message;
if (xrealloc(multicast->message, multicast->message_prefix + n, char))
{
perror(*argv);
multicast->message = old_buf;
}
else
memcpy(multicast->message + multicast->message_prefix, mod->payload, n);
}
/* Free the reply. */
mds_message_destroy(client->modify_message);
/* Reset how much of the message has been sent before we continue with next recipient. */
multicast->message_ptr = 0;
}
}
/**
* Exec into the mdsinitrc script
*
* @param args The arguments to the child process
*/
void run_initrc(char** args)
{
char pathname[PATH_MAX];
struct passwd* pwd;
char* env;
char* home;
args[0] = pathname;
#define __exec(FORMAT, ...) \
xsnprintf(pathname, FORMAT, __VA_ARGS__); execv(args[0], args)
/* Test $XDG_CONFIG_HOME. */
if ((env = getenv_nonempty("XDG_CONFIG_HOME")) != NULL)
{
__exec("%s/%s", env, INITRC_FILE);
}
/* Test $HOME. */
if ((env = getenv_nonempty("HOME")) != NULL)
{
__exec("%s/.config/%s", env, INITRC_FILE);
__exec("%s/.%s", env, INITRC_FILE);
}
/* Test ~. */
pwd = getpwuid(getuid()); /* Ignore error. */
if (pwd != NULL)
{
home = pwd->pw_dir;
if ((home != NULL) && (*home != '\0'))
{
__exec("%s/.config/%s", home, INITRC_FILE);
__exec("%s/.%s", home, INITRC_FILE);
}
}
/* Test $XDG_CONFIG_DIRS. */
if ((env = getenv_nonempty("XDG_CONFIG_DIRS")) != NULL)
{
char* begin = env;
char* end;
int len;
for (;;)
{
end = strchrnul(begin, ':');
len = (int)(end - begin);
if (len > 0)
{
__exec("%.*s/%s", len, begin, INITRC_FILE);
}
if (*end == '\0')
break;
begin = end + 1;
}
}
/* Test /etc. */
__exec("%s/%s", SYSCONFDIR, INITRC_FILE);
#undef __exec
/* Everything failed. */
eprintf("unable to run %s file, you might as well kill me.", INITRC_FILE);
}
/**
* Called with the signal SIGUSR1 is caught.
* This function should cue a re-exec of the program.
*
* @param signo The caught signal
*/
void sigusr1_trap(int signo)
{
if (reexecing == 0)
{
terminating = reexecing = 1;
eprint("re-exec signal received.");
signal_all(signo);
}
}
/**
* Called with the signal SIGTERM is caught.
* This function should cue a termination of the program.
*
* @param signo The caught signal
*/
void sigterm_trap(int signo)
{
if (terminating == 0)
{
terminating = 1;
eprint("terminate signal received.");
signal_all(signo);
}
}
/**
* Send a singal to all threads except the current thread
*
* @param signo The signal
*/
void signal_all(int signo)
{
pthread_t current_thread;
ssize_t node;
current_thread = pthread_self();
if (pthread_equal(current_thread, master_thread) == 0)
pthread_kill(master_thread, signo);
with_mutex (slave_mutex,
foreach_linked_list_node (client_list, node)
{
client_t* value = (client_t*)(void*)(client_list.values[node]);
if (pthread_equal(current_thread, value->thread) == 0)
pthread_kill(value->thread, signo);
}
);
}
/**
* Marshal the server's state into a file
*
* @param fd The file descriptor
* @return Negative on error
*/
int marshal_server(int fd)
{
size_t list_size = linked_list_marshal_size(&client_list);
size_t map_size = fd_table_marshal_size(&client_map);
size_t list_elements = 0;
size_t state_n = 0;
char* state_buf = NULL;
char* state_buf_;
ssize_t node;
/* Calculate the grand size of all client information. */
for (node = client_list.edge;; list_elements++)
{
if ((node = client_list.next[node]) == client_list.edge)
break;
state_n += client_marshal_size((client_t*)(void*)(client_list.values[node]));
}
/* Add the size of the rest of the program's state. */
state_n += sizeof(int) + sizeof(sig_atomic_t) + 2 * sizeof(uint64_t) + 2 * sizeof(size_t);
state_n += list_elements * sizeof(size_t) + list_size + map_size;
/* Allocate a buffer for all data except the client list and the client map. */
state_buf = state_buf_ = malloc(state_n);
if (state_buf == NULL)
goto fail;
/* Tell the new version of the program what version of the program it is marshalling. */
buf_set_next(state_buf_, int, MDS_SERVER_VARS_VERSION);
/* Marshal the miscellaneous state data. */
buf_set_next(state_buf_, sig_atomic_t, running);
buf_set_next(state_buf_, uint64_t, next_id);
buf_set_next(state_buf_, uint64_t, next_modify_id);
/* Tell the program how large the marshalled client list is and how any clients are marshalled. */
buf_set_next(state_buf_, size_t, list_size);
buf_set_next(state_buf_, size_t, list_elements);
/* Marshal the clients. */
foreach_linked_list_node (client_list, node)
{
/* Get the memory address of the client. */
size_t value_address = client_list.values[node];
/* Get the client's information. */
client_t* value = (client_t*)(void*)value_address;
/* Marshal the address, it is used the the client list and the client map, that will be marshalled. */
buf_set_next(state_buf_, size_t, value_address);
/* Marshal the client informationation. */
state_buf_ += client_marshal(value, state_buf_) / sizeof(char);
}
/* Marshal the client list. */
linked_list_marshal(&client_list, state_buf_);
state_buf_ += list_size / sizeof(char);
/* Marshal the client map. */
fd_table_marshal(&client_map, state_buf_);
/* Send the marshalled data into the file. */
if (full_write(fd, state_buf, state_n) < 0)
goto fail;
free(state_buf);
return 0;
fail:
perror(*argv);
free(state_buf);
return -1;
}
/**
* Address translation table used by `unmarshal_server` and `remapper`
*/
static hash_table_t unmarshal_remap_map;
/**
* Address translator for `unmarshal_server`
*
* @param old The old address
* @return The new address
*/
static size_t unmarshal_remapper(size_t old)
{
return hash_table_get(&unmarshal_remap_map, old);
}
/**
* Unmarshal the server's state from a file
*
* @param fd The file descriptor
* @return Negative on error
*/
int unmarshal_server(int fd)
{
int with_error = 0;
char* state_buf;
char* state_buf_;
size_t list_size;
size_t list_elements;
size_t i;
ssize_t node;
pthread_t slave_thread;
/* Read the file. */
if ((state_buf = state_buf_ = full_read(fd)) == NULL)
{
perror(*argv);
return -1;
}
/* Create memory address remapping table. */
if (hash_table_create(&unmarshal_remap_map))
{
perror(*argv);
free(state_buf);
hash_table_destroy(&unmarshal_remap_map, NULL, NULL);
return -1;
}
/* Get the marshal protocal version. Not needed, there is only the one version right now. */
/* buf_get(state_buf_, int, 0, MDS_SERVER_VARS_VERSION); */
buf_next(state_buf_, int, 1);
/* Unmarshal the miscellaneous state data. */
buf_get_next(state_buf_, sig_atomic_t, running);
buf_get_next(state_buf_, uint64_t, next_id);
buf_get_next(state_buf_, uint64_t, next_modify_id);
/* Get the marshalled size of the client list and how any clients that are marshalled. */
buf_get_next(state_buf_, size_t, list_size);
buf_get_next(state_buf_, size_t, list_elements);
/* Unmarshal the clients. */
for (i = 0; i < list_elements; i++)
{
size_t n;
size_t value_address;
client_t* value;
/* Allocate the client's information. */
if (xmalloc(value, 1, client_t))
goto clients_fail;
/* Unmarshal the address, it is used the the client list and the client map, that are also marshalled. */
buf_get_next(state_buf_, size_t, value_address);
/* Unmarshal the client information. */
n = client_unmarshal(value, state_buf_);
if (n == 0)
goto clients_fail;
/* Populate the remapping table. */
if (hash_table_put(&unmarshal_remap_map, value_address, (size_t)(void*)value) == 0)
if (errno)
goto clients_fail;
/* Delayed seeking. */
state_buf_ += n / sizeof(char);
/* On error, seek past all clients. */
continue;
clients_fail:
perror(*argv);
with_error = 1;
if (value != NULL)
{
buf_prev(state_buf_, size_t, 1);
free(value);
}
for (; i < list_elements; i++)
/* There is not need to close the sockets, it is done by
the caller because there are conditions where we cannot
get here anyway. */
state_buf_ += client_unmarshal_skip(state_buf_) / sizeof(char);
break;
}
/* Unmarshal the client list. */
if (linked_list_unmarshal(&client_list, state_buf_))
goto critical_fail;
state_buf_ += list_size / sizeof(char);
/* Unmarshal the client map. */
if (fd_table_unmarshal(&client_map, state_buf_, unmarshal_remapper))
goto critical_fail;
/* Release the raw data. */
free(state_buf);
/* Remove non-found elements from the fd table. */
#define __bit(I, _OP_) client_map.used[I / 64] _OP_ ((uint64_t)1 << (I % 64))
if (with_error)
for (i = 0; i < client_map.capacity; i++)
if ((__bit(i, &)) && (client_map.values[i] == 0))
/* Lets not presume that fd-table actually initialise its allocations. */
__bit(i, &= ~);
#undef __bit
/* Remap the linked list and remove non-found elements, and start the clients. */
foreach_linked_list_node (client_list, node)
{
/* Remap the linked list and remove non-found elements. */
size_t new_address = unmarshal_remapper(client_list.values[node]);
client_list.values[node] = new_address;
if (new_address == 0) /* Returned if missing (or if the address is the invalid NULL.) */
linked_list_remove(&client_list, node);
else
{
/* Start the clients. (Errors do not need to be reported.) */
client_t* client = (client_t*)(void*)new_address;
int socket_fd = client->socket_fd;
/* Increase number of running slaves. */
with_mutex (slave_mutex, running_slaves++;);
/* Start slave thread. */
create_slave(&slave_thread, socket_fd);
}
}
/* Release the remapping table's resources. */
hash_table_destroy(&unmarshal_remap_map, NULL, NULL);
return -with_error;
critical_fail:
perror(*argv);
abort();
}
/**
* Create, start and detache a slave thread
*
* @param thread The address at where to store the thread
* @param socket_fd The file descriptor of the slave's socket
* @return Zero on success, -1 on error, error message will have been printed
*/
int create_slave(pthread_t* thread_slot, int socket_fd)
{
if ((errno = pthread_create(thread_slot, NULL, slave_loop, (void*)(intptr_t)socket_fd)))
{
perror(*argv);
with_mutex (slave_mutex, running_slaves--;);
return -1;
}
if ((errno = pthread_detach(*thread_slot)))
{
perror(*argv);
return -1;
}
return 0;
}