aboutsummaryrefslogtreecommitdiffstats
path: root/src/mds-server/mds-server.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/mds-server/mds-server.c')
-rw-r--r--src/mds-server/mds-server.c617
1 files changed, 268 insertions, 349 deletions
diff --git a/src/mds-server/mds-server.c b/src/mds-server/mds-server.c
index 5b81222..8bb7b83 100644
--- a/src/mds-server/mds-server.c
+++ b/src/mds-server/mds-server.c
@@ -137,7 +137,6 @@ static pthread_cond_t modify_cond;
static hash_table_t modify_map;
-
/**
* Entry point of the server
*
@@ -160,24 +159,21 @@ int main(int argc_, char** argv_)
#endif
+#define exit_if(CONDITION, INSTRUCTION) if (CONDITION) { INSTRUCTION return 1; }
+
+
argc = argc_;
argv = argv_;
/* Drop privileges like it's hot. */
if (drop_privileges())
- {
- perror(*argv);
- return 1;
- }
+ goto pfail;
/* Sanity check the number of command line arguments. */
- if (argc > ARGC_LIMIT + LIBEXEC_ARGC_EXTRA_LIMIT)
- {
- eprint("that number of arguments is ridiculous, I will not allow it.");
- return 1;
- }
+ exit_if (argc > ARGC_LIMIT + LIBEXEC_ARGC_EXTRA_LIMIT,
+ eprint("that number of arguments is ridiculous, I will not allow it."););
/* Parse command line arguments. */
@@ -187,26 +183,18 @@ int main(int argc_, char** argv_)
int v;
if ((v = strequals(arg, "--initial-spawn")) || /* Initial spawn? */
strequals(arg, "--respawn")) /* Respawning after crash? */
- if (is_respawn == v)
- {
- eprintf("conflicting arguments %s and %s cannot be combined.",
- "--initial-spawn", "--respawn");
- return 1;
- }
- else
+ {
+ exit_if (is_respawn == v,
+ eprintf("conflicting arguments %s and %s cannot be combined.",
+ "--initial-spawn", "--respawn"););
is_respawn = !v;
+ }
else if (startswith(arg, "--socket-fd=")) /* Socket file descriptor. */
{
- if (socket_fd != -1)
- {
- eprintf("duplicate declaration of %s.", "--socket-fd");
- return 1;
- }
- if (strict_atoi(arg += strlen("--socket-fd="), &socket_fd, 0, INT_MAX) < 0)
- {
- eprintf("invalid value for %s: %s.", "--socket-fd", arg);
- return 1;
- }
+ exit_if (socket_fd != -1,
+ eprintf("duplicate declaration of %s.", "--socket-fd"););
+ exit_if (strict_atoi(arg += strlen("--socket-fd="), &socket_fd, 0, INT_MAX) < 0,
+ eprintf("invalid value for %s: %s.", "--socket-fd", arg););
}
else if (strequals(arg, "--re-exec")) /* Re-exec state-marshal. */
reexec = 1;
@@ -223,17 +211,14 @@ int main(int argc_, char** argv_)
/* Check that manditory arguments have been specified. */
- if (is_respawn < 0)
- {
- eprintf("missing state argument, require either %s or %s.",
- "--initial-spawn", "--respawn");
- return 1;
- }
- if (socket_fd < 0)
- {
- eprint("missing socket file descriptor argument.");
- return 1;
- }
+ exit_if (is_respawn < 0,
+ eprintf("missing state argument, require either %s or %s.",
+ "--initial-spawn", "--respawn"););
+ exit_if (socket_fd < 0,
+ eprint("missing socket file descriptor argument."););
+
+
+#undef exit_if
/* Run mdsinitrc. */
@@ -241,16 +226,13 @@ int main(int argc_, char** argv_)
{
pid_t pid = fork();
if (pid == (pid_t)-1)
- {
- perror(*argv);
- return 1;
- }
+ goto pfail;
if (pid == 0) /* Child process exec:s, the parent continues without waiting for it. */
{
/* Close all files except stdin, stdout and stderr. */
close_files((fd > 2) || (fd == socket_fd));
- /* Run initrc */
+ /* Run mdsinitrc. */
run_initrc(unparsed_args);
return 1;
}
@@ -265,78 +247,36 @@ int main(int argc_, char** argv_)
if (I >= 5) pthread_cond_destroy(&modify_cond); \
if (I >= 6) hash_table_destroy(&modify_map, NULL, NULL)
+#define __error(I) perror(*argv); __free(I); return 1
+
/* Create list and table of clients. */
if (reexec == 0)
{
- if (fd_table_create(&client_map))
- {
- perror(*argv);
- __free(0);
- return 1;
- }
- if (linked_list_create(&client_list, 32))
- {
- perror(*argv);
- __free(1);
- return 1;
- }
+ if (fd_table_create(&client_map)) { __error(0); }
+ if (linked_list_create(&client_list, 32)) { __error(1); }
}
/* Store the current thread so it can be killed from elsewhere. */
master_thread = pthread_self();
-
/* Make the server update without all slaves dying on SIGUSR1. */
- if (xsigaction(SIGUSR1, sigusr1_trap) < 0)
- {
- perror(*argv);
- __free(1);
- return 1;
- }
+ if (xsigaction(SIGUSR1, sigusr1_trap) < 0) { __error(1); }
/* Implement clean exit on SIGTERM. */
- if (xsigaction(SIGTERM, sigterm_trap) < 0)
- {
- perror(*argv);
- __free(1);
- return 1;
- }
+ if (xsigaction(SIGTERM, sigterm_trap) < 0) { __error(1); }
/* Create mutex and condition for slave counter. */
- if ((errno = pthread_mutex_init(&slave_mutex, NULL)) != 0)
- {
- perror(*argv);
- __free(1);
- return 1;
- }
- if ((errno = pthread_cond_init(&slave_cond, NULL)) != 0)
- {
- perror(*argv);
- __free(2);
- return 1;
- }
+ if ((errno = pthread_mutex_init(&slave_mutex, NULL))) { __error(1); }
+ if ((errno = pthread_cond_init(&slave_cond, NULL))) { __error(2); }
/* Create mutex, condition and map for message modification. */
- if ((errno = pthread_mutex_init(&modify_mutex, NULL)) != 0)
- {
- perror(*argv);
- __free(3);
- return 1;
- }
- if ((errno = pthread_cond_init(&modify_cond, NULL)) != 0)
- {
- perror(*argv);
- __free(4);
- return 1;
- }
- if (hash_table_create(&modify_map))
- {
- perror(*argv);
- __free(5);
- return 1;
- }
+ if ((errno = pthread_mutex_init(&modify_mutex, NULL))) { __error(3); }
+ if ((errno = pthread_cond_init(&modify_cond, NULL))) { __error(4); }
+ if (hash_table_create(&modify_map)) { __error(5); }
+
+#undef __error
/* Unmarshal the state of the server. */
@@ -400,15 +340,7 @@ int main(int argc_, char** argv_)
with_mutex(slave_mutex, running_slaves++;);
/* Start slave thread. */
- errno = pthread_create(&slave_thread, NULL, slave_loop, (void*)(intptr_t)client_fd);
- if (errno)
- {
- perror(*argv);
- with_mutex(slave_mutex, running_slaves--;);
- }
- errno = pthread_detach(slave_thread);
- if (errno)
- perror(*argv);
+ create_slave(&slave_thread, client_fd);
}
terminate:
@@ -424,10 +356,10 @@ int main(int argc_, char** argv_)
/* Release resources. */
__free(9999);
-#undef __free
-
return 0;
+#undef __free
+
reexec:
{
@@ -452,10 +384,7 @@ int main(int argc_, char** argv_)
xsnprintf(shm_path, SHM_PATH_PATTERN, (unsigned long int)pid);
reexec_fd = shm_open(shm_path, O_RDWR | O_CREAT | O_EXCL, S_IRWXU);
if (reexec_fd < 0)
- {
- perror(*argv);
- return 1;
- }
+ goto pfail;
if (marshal_server(reexec_fd) < 0)
goto reexec_fail;
close(reexec_fd);
@@ -482,6 +411,10 @@ int main(int argc_, char** argv_)
be respawn if the re-exec fails. */
return 1;
}
+
+ pfail:
+ perror(*argv);
+ return 1;
}
@@ -507,10 +440,7 @@ void* slave_loop(void* data)
{
/* Create information table. */
if (xmalloc(information, 1, client_t))
- {
- perror(*argv);
- goto fail;
- }
+ goto pfail;
/* NULL-out pointers and initialisation markers. */
information->interception_conditions = NULL;
@@ -550,10 +480,7 @@ void* slave_loop(void* data)
information->send_pending_size = 0;
information->multicasts_count = 0;
if (mds_message_initialise(&(information->message)))
- {
- perror(*argv);
- goto fail;
- }
+ goto pfail;
}
@@ -561,129 +488,102 @@ void* slave_loop(void* data)
information->thread = pthread_self();
/* Create mutex to make sure two thread to not try to send
messages concurrently, and other slave local actions. */
- if ((errno = pthread_mutex_init(&(information->mutex), NULL)) != 0)
- {
- perror(*argv);
- goto fail;
- }
+ if ((errno = pthread_mutex_init(&(information->mutex), NULL))) goto pfail;
information->mutex_created = 1;
/* Create mutex and codition for multicast interception replies. */
- if ((errno = pthread_mutex_init(&(information->modify_mutex), NULL)) != 0)
- {
- perror(*argv);
- goto fail;
- }
+ if ((errno = pthread_mutex_init(&(information->modify_mutex), NULL))) goto pfail;
information->modify_mutex_created = 1;
- if ((errno = pthread_cond_init(&(information->modify_cond), NULL)) != 0)
- {
- perror(*argv);
- goto fail;
- }
+ if ((errno = pthread_cond_init(&(information->modify_cond), NULL))) goto pfail;
information->modify_cond_created = 1;
/* Make the server update without all slaves dying on SIGUSR1. */
- if (xsigaction(SIGUSR1, sigusr1_trap) < 0)
- {
- perror(*argv);
- goto fail;
- }
+ if (xsigaction(SIGUSR1, sigusr1_trap) < 0) goto pfail;
/* Implement clean exit on SIGTERM. */
- if (xsigaction(SIGTERM, sigterm_trap) < 0)
- {
- perror(*argv);
- goto fail;
- }
+ if (xsigaction(SIGTERM, sigterm_trap) < 0) goto pfail;
/* Fetch messages from the slave. */
- if (information->open)
- while (terminating == 0)
- {
- /* Send queued multicast messages. */
- if (information->multicasts_count > 0)
+ while ((terminating == 0) && (information->open == 0))
+ {
+ /* Send queued multicast messages. */
+ if (information->multicasts_count > 0)
+ {
+ multicast_t multicast;
+ with_mutex(information->mutex,
+ if (information->multicasts_count > 0)
+ {
+ size_t c = (information->multicasts_count -= 1) * sizeof(multicast_t);
+ multicast = information->multicasts[0];
+ memmove(information->multicasts, information->multicasts + 1, c);
+ if (c == 0)
+ {
+ free(information->multicasts);
+ information->multicasts = NULL;
+ }
+ }
+ );
+ multicast_message(&multicast);
+ multicast_destroy(&multicast);
+ }
+
+ /* Send queued messages. */
+ if (information->send_pending_size > 0)
+ {
+ char* sendbuf = information->send_pending;
+ char* sendbuf_ = sendbuf;
+ size_t sent;
+ n = information->send_pending_size;
+ information->send_pending_size = 0;
+ information->send_pending = NULL;
+ with_mutex(information->mutex,
+ while (n > 0)
+ {
+ sent = send_message(information->socket_fd, sendbuf_, n);
+ if ((sent < n) && (errno != EINTR)) /* Ignore EINTR */
+ {
+ perror(*argv);
+ break;
+ }
+ n -= sent;
+ sendbuf_ += sent / sizeof(char);
+ }
+ free(sendbuf);
+ );
+ }
+
+ /* Fetch message. */
+ r = mds_message_read(&(information->message), socket_fd);
+ if (r == 0)
+ {
+ if (message_received(information) == 1)
+ goto terminate;
+ }
+ else
+ if (r == -2)
{
- multicast_t multicast;
- with_mutex(information->mutex,
- if (information->multicasts_count > 0)
- {
- size_t c = information->multicasts_count -= 1;
- c *= sizeof(multicast_t);
- multicast = information->multicasts[0];
- memmove(information->multicasts, information->multicasts + 1, c);
- if (c == 0)
- {
- free(information->multicasts);
- information->multicasts = NULL;
- }
- }
- );
- multicast_message(&multicast);
- multicast_destroy(&multicast);
+ eprint("corrupt message received.");
+ goto fail;
}
-
- /* Send queued messages. */
- if (information->send_pending_size > 0)
+ else if (errno == ECONNRESET)
{
- char* sendbuf = information->send_pending;
- char* sendbuf_ = sendbuf;
- size_t sent;
- n = information->send_pending_size;
- information->send_pending_size = 0;
- information->send_pending = NULL;
- with_mutex(information->mutex,
- while (n > 0)
- {
- sent = send_message(information->socket_fd, sendbuf_, n);
- if ((sent < n) && (errno != EINTR)) /* Ignore EINTR */
- {
- perror(*argv);
- break;
- }
- n -= sent;
- sendbuf_ += sent / sizeof(char);
- }
- free(sendbuf);
- );
+ r = mds_message_read(&(information->message), socket_fd);
+ if ((r == 0) && message_received(information))
+ goto terminate;
+ information->open = 0;
+ /* Connection closed. */
}
-
- /* Fetch message. */
- r = mds_message_read(&(information->message), socket_fd);
- if (r == 0)
+ else if (errno == EINTR)
{
- if (message_received(information) == 1)
+ /* Stop the thread if we are re-exec:ing the server. */
+ if (terminating)
goto terminate;
}
else
- if (r == -2)
- {
- eprint("corrupt message received.");
- goto fail;
- }
- else if (errno == ECONNRESET)
- {
- r = mds_message_read(&(information->message), socket_fd);
- if (r == 0)
- if (message_received(information))
- goto terminate;
- information->open = 0;
- /* Connection closed. */
- break;
- }
- else if (errno == EINTR)
- {
- /* Stop the thread if we are re-exec:ing the server. */
- if (terminating)
- goto terminate;
- }
- else
- {
- perror(*argv);
- goto fail;
- }
- }
+ goto pfail;
+ }
/* Stop the thread if we are re-exec:ing the server. */
if (terminating)
goto terminate;
@@ -725,6 +625,11 @@ void* slave_loop(void* data)
return NULL;
+ pfail:
+ perror(*argv);
+ goto fail;
+
+
reexec:
/* Tell the master thread that the slave has closed,
this is done because re-exec causes a race-condition
@@ -1037,46 +942,46 @@ void add_intercept_condition(client_t* client, char* condition, int64_t priority
with for optimisation. */
for (i = 0; i < n; i++)
{
- if (conds[i].header_hash == hash)
- if (strequals(conds[i].condition, condition))
- {
- if (stop)
- {
- /* Remove the condition from the list. */
- memmove(conds + i, conds + i + 1, --n - i);
- client->interception_conditions_count--;
- /* Shrink the list. */
- if (n == 0)
- {
- free(conds);
- client->interception_conditions = NULL;
- }
+ if ((conds[i].header_hash == hash) && strequals(conds[i].condition, condition))
+ {
+ if (stop)
+ {
+ /* Remove the condition from the list. */
+ memmove(conds + i, conds + i + 1, --n - i);
+ client->interception_conditions_count--;
+ /* Shrink the list. */
+ if (n == 0)
+ {
+ free(conds);
+ client->interception_conditions = NULL;
+ }
+ else
+ if ((conds = realloc(conds, n * sizeof(interception_condition_t))) == NULL)
+ perror(*argv);
else
- if ((conds = realloc(conds, n * sizeof(interception_condition_t))) == NULL)
- perror(*argv);
- else
- client->interception_conditions = conds;
- }
- else
- {
- /* Update parameters. */
- conds[i].priority = priority;
- conds[i].modifying = modifying;
- if (modifying && (nonmodifying >= 0))
- {
- /* Optimisation: put conditions that are modifying
- at the beginning. When a client is intercepting
- we most know if any satisfying condition is
- modifying. With this optimisation the first
- satisfying condition will tell us if there is
- any satisfying condition that is modifying. */
- interception_condition_t temp = conds[nonmodifying];
- conds[nonmodifying] = conds[i];
- conds[i] = temp;
- }
- }
- return;
- }
+ client->interception_conditions = conds;
+ }
+ else
+ {
+ /* Update parameters. */
+ conds[i].priority = priority;
+ conds[i].modifying = modifying;
+
+ if (modifying && (nonmodifying >= 0))
+ {
+ /* Optimisation: put conditions that are modifying
+ at the beginning. When a client is intercepting
+ we most know if any satisfying condition is
+ modifying. With this optimisation the first
+ satisfying condition will tell us if there is
+ any satisfying condition that is modifying. */
+ interception_condition_t temp = conds[nonmodifying];
+ conds[nonmodifying] = conds[i];
+ conds[i] = temp;
+ }
+ }
+ return;
+ }
/* Look for the first non-modifying, this is a part of the
optimisation where we put all modifying conditions at the
beginning. */
@@ -1411,79 +1316,79 @@ void multicast_message(multicast_t* multicast)
continue;
}
- /* Wait for a reply and act upon it. */
- if ((n == 0) && client_.modifying)
+ /* Do not wait for a reply if it is non-modifying. */
+ if (client_.modifying)
{
- /* pthread_cond_timedwait is required to handle re-exec because
- pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */
- struct timespec timeout =
- {
- .tv_sec = 1,
- .tv_nsec = 0
- };
- int modifying = 0;
- char* old_buf;
- size_t i;
- mds_message_t* mod;
-
- /* Wait for a reply. */
- with_mutex(modify_mutex,
- if (client->modify_message == NULL)
- {
- if (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0)
- {
- hash_table_put(&modify_map, (size_t)modify_id, (size_t)(void*)client);
- pthread_cond_signal(&slave_cond);
- }
- }
- );
- with_mutex(client->modify_mutex,
- if (client->modify_message == NULL)
- {
- for (;;)
- {
- pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout);
- if ((client->modify_message != NULL) && terminating)
- break;
- }
- if (terminating == 0)
- hash_table_remove(&modify_map, (size_t)modify_id);
- }
- );
- if (terminating)
- return;
-
- /* Act upon the reply. */
- if (client->modify_message != NULL)
- {
- mod = client->modify_message;
- for (i = 0; i < mod->header_count; i++)
- if (!strcmp(mod->headers[i], "Modify: yes"))
- {
- modifying = 1;
- break;
- }
- if (modifying)
- {
- old_buf = multicast->message;
- n = mod->payload_size;
- multicast->message = realloc(old_buf, (multicast->message_prefix + n) * sizeof(char));
- if (multicast->message == NULL)
- {
- perror(*argv);
- multicast->message = old_buf;
- }
- else
- memcpy(multicast->message + multicast->message_prefix, mod->payload, n);
- }
-
- /* Free the reply. */
- mds_message_destroy(client->modify_message);
- }
+ /* Reset how much of the message has been sent before we continue with next recipient. */
+ multicast->message_ptr = 0;
+ continue;
}
- /* Reset how much of the message has been sent before we continue with next recipient. */
- multicast->message_ptr = 0;
+ /* Wait for a reply and act upon it. */
+ {
+ /* pthread_cond_timedwait is required to handle re-exec because
+ pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */
+ struct timespec timeout =
+ {
+ .tv_sec = 1,
+ .tv_nsec = 0
+ };
+ int modifying = 0;
+ char* old_buf;
+ size_t i;
+ mds_message_t* mod;
+
+ /* Wait for a reply. */
+ with_mutex(modify_mutex,
+ if (client->modify_message == NULL)
+ {
+ if (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0)
+ {
+ hash_table_put(&modify_map, (size_t)modify_id, (size_t)(void*)client);
+ pthread_cond_signal(&slave_cond);
+ }
+ }
+ );
+ with_mutex(client->modify_mutex,
+ if (client->modify_message == NULL)
+ {
+ while ((client->modify_message == NULL) && (terminating == 0))
+ pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout);
+ if (terminating == 0)
+ hash_table_remove(&modify_map, (size_t)modify_id);
+ }
+ );
+ if (terminating)
+ return;
+
+ /* Act upon the reply. */
+ mod = client->modify_message;
+ for (i = 0; i < mod->header_count; i++)
+ if (!strcmp(mod->headers[i], "Modify: yes"))
+ {
+ modifying = 1;
+ break;
+ }
+ if (modifying)
+ {
+ old_buf = multicast->message;
+ n = mod->payload_size;
+ multicast->message = realloc(old_buf, (multicast->message_prefix + n) * sizeof(char));
+ if (multicast->message == NULL)
+ {
+ perror(*argv);
+ multicast->message = old_buf;
+ }
+ else
+ memcpy(multicast->message + multicast->message_prefix, mod->payload, n);
+ }
+
+ /* Free the reply. */
+ mds_message_destroy(client->modify_message);
+
+ /* Reset how much of the message has been sent before we continue with next recipient. */
+ multicast->message_ptr = 0;
+ }
}
}
@@ -1809,18 +1714,12 @@ int unmarshal_server(int fd)
/* Unmarshal the client list. */
if (linked_list_unmarshal(&client_list, state_buf_))
- {
- perror(*argv);
- abort(); /* Critical. */
- }
+ goto critical_fail;
state_buf_ += list_size / sizeof(char);
/* Unmarshal the client map. */
if (fd_table_unmarshal(&client_map, state_buf_, unmarshal_remapper))
- {
- perror(*argv);
- abort(); /* Critical. */
- }
+ goto critical_fail;
/* Release the raw data. */
free(state_buf);
@@ -1851,15 +1750,7 @@ int unmarshal_server(int fd)
with_mutex(slave_mutex, running_slaves++;);
/* Start slave thread. */
- errno = pthread_create(&slave_thread, NULL, slave_loop, (void*)(intptr_t)socket_fd);
- if (errno)
- {
- perror(*argv);
- with_mutex(slave_mutex, running_slaves--;);
- }
- errno = pthread_detach(slave_thread);
- if (errno)
- perror(*argv);
+ create_slave(&slave_thread, socket_fd);
}
}
@@ -1867,5 +1758,33 @@ int unmarshal_server(int fd)
hash_table_destroy(&unmarshal_remap_map, NULL, NULL);
return -with_error;
+
+ critical_fail:
+ perror(*argv);
+ abort();
+}
+
+
+/**
+ * Create, start and detache a slave thread
+ *
+ * @param thread The address at where to store the thread
+ * @param socket_fd The file descriptor of the slave's socket
+ * @return Zero on success, -1 on error, error message will have been printed
+ */
+int create_slave(pthread_t* thread_slot, int socket_fd)
+{
+ if ((errno = pthread_create(thread_slot, NULL, slave_loop, (void*)(intptr_t)socket_fd)))
+ {
+ perror(*argv);
+ with_mutex(slave_mutex, running_slaves--;);
+ return -1;
+ }
+ if ((errno = pthread_detach(*thread_slot)))
+ {
+ perror(*argv);
+ return -1;
+ }
+ return 0;
}