aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/mds-server/client.c68
-rw-r--r--src/mds-server/client.h32
-rw-r--r--src/mds-server/mds-server.c226
-rw-r--r--src/mds-server/mds-server.h24
4 files changed, 240 insertions, 110 deletions
diff --git a/src/mds-server/client.c b/src/mds-server/client.c
index 15afc18..76c7250 100644
--- a/src/mds-server/client.c
+++ b/src/mds-server/client.c
@@ -24,10 +24,78 @@
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
+#include <errno.h>
/**
+ * Initialise a client
+ *
+ * The following fields will not be initialised:
+ * - message
+ * - thread
+ * - mutex
+ * - modify_mutex
+ * - modify_cond
+ *
+ * The follow fields will be initialised to `-1`:
+ * - list_entry
+ * - socket_fd
+ *
+ * @param this Memory slot in which to store the new client information
+ */
+void client_initialise(client_t* restrict this)
+{
+ this->list_entry = -1;
+ this->socket_fd = -1;
+ this->open = 0;
+ this->id = 0;
+ this->mutex_created = 0;
+ this->interception_conditions = NULL;
+ this->interception_conditions_count = 0;
+ this->multicasts = NULL;
+ this->multicasts_count = 0;
+ this->send_pending = NULL;
+ this->send_pending_size = 0;
+ this->modify_message = NULL;
+ this->modify_mutex_created = 0;
+ this->modify_cond_created = 0;
+}
+
+
+/**
+ * Initialise fields that have to do with threading
+ *
+ * This method initialises the following fields:
+ * - thread
+ * - mutex
+ * - modify_mutex
+ * - modify_cond
+ *
+ * @param this The client information
+ * @return Zero on success, -1 on error
+ */
+int client_initialise_threading(client_t* restrict this)
+{
+ /* Store the thread so that other threads can kill it. */
+ this->thread = pthread_self();
+
+ /* Create mutex to make sure two thread to not try to send
+ messages concurrently, and other client local actions. */
+ if ((errno = pthread_mutex_init(&(this->mutex), NULL))) return -1;
+ this->mutex_created = 1;
+
+ /* Create mutex and codition for multicast interception replies. */
+ if ((errno = pthread_mutex_init(&(this->modify_mutex), NULL))) return -1;
+ this->modify_mutex_created = 1;
+ if ((errno = pthread_cond_init(&(this->modify_cond), NULL))) return -1;
+ this->modify_cond_created = 1;
+
+ return 0;
+}
+
+
+/**
* Release all resources assoicated with a client
*
* @param this The client information
diff --git a/src/mds-server/client.h b/src/mds-server/client.h
index 2aeb813..39ed791 100644
--- a/src/mds-server/client.h
+++ b/src/mds-server/client.h
@@ -137,6 +137,38 @@ typedef struct client
/**
+ * Initialise a client
+ *
+ * The following fields will not be initialised:
+ * - message
+ * - thread
+ * - mutex
+ * - modify_mutex
+ * - modify_cond
+ *
+ * The follow fields will be initialised to `-1`:
+ * - list_entry
+ * - socket_fd
+ *
+ * @param this Memory slot in which to store the new client information
+ */
+void client_initialise(client_t* restrict this);
+
+/**
+ * Initialise fields that have to do with threading
+ *
+ * This method initialises the following fields:
+ * - thread
+ * - mutex
+ * - modify_mutex
+ * - modify_cond
+ *
+ * @param this The client information
+ * @return Zero on success, -1 on error
+ */
+int client_initialise_threading(client_t* restrict this);
+
+/**
* Release all resources assoicated with a client
*
* @param this The client information
diff --git a/src/mds-server/mds-server.c b/src/mds-server/mds-server.c
index 039a478..c9e41c0 100644
--- a/src/mds-server/mds-server.c
+++ b/src/mds-server/mds-server.c
@@ -425,15 +425,7 @@ void* slave_loop(void* data)
{
/* Create information table. */
fail_if (xmalloc(information, 1, client_t));
-
- /* NULL-out pointers and initialisation markers. */
- information->interception_conditions = NULL;
- information->send_pending = NULL;
- information->modify_message = NULL;
- information->multicasts = NULL;
- information->mutex_created = 0;
- information->modify_mutex_created = 0;
- information->modify_cond_created = 0;
+ client_initialise(information);
/* Add to list of clients. */
pthread_mutex_lock(&slave_mutex);
@@ -459,26 +451,11 @@ void* slave_loop(void* data)
information->list_entry = entry;
information->socket_fd = socket_fd;
information->open = 1;
- information->id = 0;
- information->interception_conditions_count = 0;
- information->send_pending_size = 0;
- information->multicasts_count = 0;
fail_if (mds_message_initialise(&(information->message)));
}
-
- /* Store the thread so that other threads can kill it. */
- information->thread = pthread_self();
- /* Create mutex to make sure two thread to not try to send
- messages concurrently, and other slave local actions. */
- fail_if ((errno = pthread_mutex_init(&(information->mutex), NULL)));
- information->mutex_created = 1;
-
- /* Create mutex and codition for multicast interception replies. */
- fail_if ((errno = pthread_mutex_init(&(information->modify_mutex), NULL)));
- information->modify_mutex_created = 1;
- fail_if ((errno = pthread_cond_init(&(information->modify_cond), NULL)));
- information->modify_cond_created = 1;
+ /* Store slave thread and create mutexes and conditions. */
+ fail_if (client_initialise_threading(information));
/* Make the server update without all slaves dying on SIGUSR1. */
@@ -489,93 +466,32 @@ void* slave_loop(void* data)
/* Fetch messages from the slave. */
- while ((terminating == 0) && (information->open == 0))
+ while ((terminating == 0) && information->open)
{
/* Send queued multicast messages. */
- if (information->multicasts_count > 0)
- {
- multicast_t multicast;
- with_mutex (information->mutex,
- if (information->multicasts_count > 0)
- {
- size_t c = (information->multicasts_count -= 1) * sizeof(multicast_t);
- multicast = information->multicasts[0];
- memmove(information->multicasts, information->multicasts + 1, c);
- if (c == 0)
- {
- free(information->multicasts);
- information->multicasts = NULL;
- }
- }
- );
- multicast_message(&multicast);
- multicast_destroy(&multicast);
- }
+ send_multicast_queue(information);
/* Send queued messages. */
- if (information->send_pending_size > 0)
- {
- char* sendbuf = information->send_pending;
- char* sendbuf_ = sendbuf;
- size_t sent;
- n = information->send_pending_size;
- information->send_pending_size = 0;
- information->send_pending = NULL;
- with_mutex (information->mutex,
- while (n > 0)
- {
- sent = send_message(information->socket_fd, sendbuf_, n);
- if ((sent < n) && (errno != EINTR)) /* Ignore EINTR */
- {
- perror(*argv);
- break;
- }
- n -= sent;
- sendbuf_ += sent / sizeof(char);
- }
- free(sendbuf);
- );
- }
+ send_reply_queue(information);
+
/* Fetch message. */
- r = mds_message_read(&(information->message), socket_fd);
- if (r == 0)
- {
- if (message_received(information) == 1)
- goto terminate;
- }
- else
- if (r == -2)
- {
- eprint("corrupt message received.");
- goto fail;
- }
- else if (errno == ECONNRESET)
- {
- r = mds_message_read(&(information->message), socket_fd);
- if ((r == 0) && message_received(information))
- goto terminate;
- information->open = 0;
- /* Connection closed. */
- }
- else if (errno == EINTR)
- {
- /* Stop the thread if we are re-exec:ing the server. */
- if (terminating)
- goto terminate;
- }
- else
- goto pfail;
+ r = fetch_message(information);
+ if ((r == 0) && message_received(information))
+ goto terminate;
+ else if (r == -2)
+ goto fail;
+ else if (r && (errno == EINTR) && terminating)
+ goto terminate; /* Stop the thread if we are re-exec:ing or terminating the server. */
}
- /* Stop the thread if we are re-exec:ing the server. */
+ /* Stop the thread if we are re-exec:ing or terminating the server. */
if (terminating)
goto terminate;
/* Multicast information about the client closing. */
n = 2 * 10 + 1 + strlen("Client closed: :\n\n");
- if (xmalloc(msgbuf, n, char))
- goto fail;
+ fail_if (xmalloc(msgbuf, n, char));
snprintf(msgbuf, n,
"Client closed: %" PRIu32 ":%" PRIu32 "\n"
"\n",
@@ -586,11 +502,11 @@ void* slave_loop(void* data)
msgbuf = NULL;
- terminate:
+ terminate: /* This done on success as well. */
if (reexecing)
goto reexec;
- fail: /* The loop does break, this done on success as well. */
+ fail: /* This done on success as well. */
/* Close socket and free resources. */
close(socket_fd);
free(msgbuf);
@@ -604,7 +520,6 @@ void* slave_loop(void* data)
linked_list_remove(&client_list, entry);
running_slaves--;
pthread_cond_signal(&slave_cond););
-
return NULL;
@@ -621,17 +536,110 @@ void* slave_loop(void* data)
with_mutex (slave_mutex,
running_slaves--;
pthread_cond_signal(&slave_cond););
-
return NULL;
}
/**
+ * Send the next message in a clients multicast queue
+ *
+ * @param client The client
+ */
+void send_multicast_queue(client_t* client)
+{
+ while (client->multicasts_count > 0)
+ {
+ multicast_t multicast;
+ with_mutex (client->mutex,
+ if (client->multicasts_count > 0)
+ {
+ size_t c = (client->multicasts_count -= 1) * sizeof(multicast_t);
+ multicast = client->multicasts[0];
+ memmove(client->multicasts, client->multicasts + 1, c);
+ if (c == 0)
+ {
+ free(client->multicasts);
+ client->multicasts = NULL;
+ }
+ }
+ );
+ multicast_message(&multicast);
+ multicast_destroy(&multicast);
+ }
+}
+
+
+/**
+ * Send the messages that are in a clients reply queue
+ *
+ * @param client The client
+ */
+void send_reply_queue(client_t* client)
+{
+ char* sendbuf = client->send_pending;
+ char* sendbuf_ = sendbuf;
+ size_t sent;
+ size_t n;
+
+ if (client->send_pending_size == 0)
+ return;
+
+ n = client->send_pending_size;
+ client->send_pending_size = 0;
+ client->send_pending = NULL;
+ with_mutex (client->mutex,
+ while (n > 0)
+ {
+ sent = send_message(client->socket_fd, sendbuf_, n);
+ if ((sent < n) && (errno != EINTR)) /* Ignore EINTR */
+ {
+ perror(*argv);
+ break;
+ }
+ n -= sent;
+ sendbuf_ += sent / sizeof(char);
+ }
+ free(sendbuf);
+ );
+}
+
+
+/**
+ * Receive a full message and update open status if the client closes
+ *
+ * @param client The client
+ * @return Zero on success, -2 on failure, otherwise -1
+ */
+int fetch_message(client_t* client)
+{
+ int r = mds_message_read(&(client->message), client->socket_fd);
+ if (r == 0)
+ return 0;
+
+ if (r == -2)
+ eprint("corrupt message received.");
+ else if (errno == ECONNRESET)
+ {
+ r = mds_message_read(&(client->message), client->socket_fd);
+ client->open = 0;
+ /* Connection closed. */
+ }
+ else if (errno != EINTR)
+ {
+ r = -2;
+ perror(*argv);
+ }
+
+ return r;
+}
+
+
+/**
* Perform actions that should be taken when
* a message has been received from a client
*
* @param client The client has sent a message
- * @return Normally zero, but 1 if exited because of re-exec
+ * @return Normally zero, but 1 if exited because of re-exec or termination
*/
int message_received(client_t* client)
{
@@ -664,7 +672,7 @@ int message_received(client_t* client)
/* Notify waiting client about a received message modification. */
if (modifying != 0)
{
- /* pthread_cond_timedwait is required to handle re-exec because
+ /* pthread_cond_timedwait is required to handle re-exec and termination because
pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */
struct timespec timeout =
{
@@ -861,7 +869,7 @@ int message_received(client_t* client)
queue_message_multicast(msgbuf_, n, client);
/* Queue message to be sent when this function returns.
- This done to simplify `multicast_message` for re-exec. */
+ This done to simplify `multicast_message` for re-exec and termination. */
with_mutex (client->mutex,
if (client->send_pending_size == 0)
{
@@ -1290,7 +1298,7 @@ void multicast_message(multicast_t* multicast)
}
);
- /* Stop if we are re-exec:ing, or continue to next recipient on error. */
+ /* Stop if we are re-exec:ing or terminating, or continue to next recipient on error. */
if (n > 0)
{
if (terminating)
@@ -1309,7 +1317,7 @@ void multicast_message(multicast_t* multicast)
/* Wait for a reply and act upon it. */
{
- /* pthread_cond_timedwait is required to handle re-exec because
+ /* pthread_cond_timedwait is required to handle re-exec and termination because
pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */
struct timespec timeout =
{
diff --git a/src/mds-server/mds-server.h b/src/mds-server/mds-server.h
index 08f7fd4..9abf095 100644
--- a/src/mds-server/mds-server.h
+++ b/src/mds-server/mds-server.h
@@ -35,11 +35,25 @@
void* slave_loop(void* data);
/**
+ * Send the next message in a clients multicast queue
+ *
+ * @param client The client
+ */
+void send_multicast_queue(client_t* client);
+
+/**
+ * Send the messages that are in a clients reply queue
+ *
+ * @param client The client
+ */
+void send_reply_queue(client_t* client);
+
+/**
* Perform actions that should be taken when
* a message has been received from a client
*
* @param client The client has sent a message
- * @return Normally zero, but 1 if exited because of re-exec
+ * @return Normally zero, but 1 if exited because of re-exec or termination
*/
int message_received(client_t* client);
@@ -64,6 +78,14 @@ void add_intercept_condition(client_t* client, char* condition, int64_t priority
void queue_message_multicast(char* message, size_t length, client_t* sender);
/**
+ * Receive a full message and update open status if the client closes
+ *
+ * @param client The client
+ * @return Zero on success, -2 on failure, otherwise -1
+ */
+int fetch_message(client_t* client);
+
+/**
* Multicast a message
*
* @param multicast The multicast message