aboutsummaryrefslogtreecommitdiffstats
path: root/src/mds-server/mds-server.c
diff options
context:
space:
mode:
authorMattias Andrée <maandree@operamail.com>2014-05-18 07:08:32 +0200
committerMattias Andrée <maandree@operamail.com>2014-05-18 07:08:32 +0200
commit1936e97603ba95ae2f657208349ef072039aca8c (patch)
tree87c5cfd2ef5e5f77f8d1ba1723013988af9a2b1c /src/mds-server/mds-server.c
parentstyle + reduce code complexity (diff)
downloadmds-1936e97603ba95ae2f657208349ef072039aca8c.tar.gz
mds-1936e97603ba95ae2f657208349ef072039aca8c.tar.bz2
mds-1936e97603ba95ae2f657208349ef072039aca8c.tar.xz
reduce code complexity
Signed-off-by: Mattias Andrée <maandree@operamail.com>
Diffstat (limited to 'src/mds-server/mds-server.c')
-rw-r--r--src/mds-server/mds-server.c226
1 files changed, 117 insertions, 109 deletions
diff --git a/src/mds-server/mds-server.c b/src/mds-server/mds-server.c
index 039a478..c9e41c0 100644
--- a/src/mds-server/mds-server.c
+++ b/src/mds-server/mds-server.c
@@ -425,15 +425,7 @@ void* slave_loop(void* data)
{
/* Create information table. */
fail_if (xmalloc(information, 1, client_t));
-
- /* NULL-out pointers and initialisation markers. */
- information->interception_conditions = NULL;
- information->send_pending = NULL;
- information->modify_message = NULL;
- information->multicasts = NULL;
- information->mutex_created = 0;
- information->modify_mutex_created = 0;
- information->modify_cond_created = 0;
+ client_initialise(information);
/* Add to list of clients. */
pthread_mutex_lock(&slave_mutex);
@@ -459,26 +451,11 @@ void* slave_loop(void* data)
information->list_entry = entry;
information->socket_fd = socket_fd;
information->open = 1;
- information->id = 0;
- information->interception_conditions_count = 0;
- information->send_pending_size = 0;
- information->multicasts_count = 0;
fail_if (mds_message_initialise(&(information->message)));
}
-
- /* Store the thread so that other threads can kill it. */
- information->thread = pthread_self();
- /* Create mutex to make sure two thread to not try to send
- messages concurrently, and other slave local actions. */
- fail_if ((errno = pthread_mutex_init(&(information->mutex), NULL)));
- information->mutex_created = 1;
-
- /* Create mutex and codition for multicast interception replies. */
- fail_if ((errno = pthread_mutex_init(&(information->modify_mutex), NULL)));
- information->modify_mutex_created = 1;
- fail_if ((errno = pthread_cond_init(&(information->modify_cond), NULL)));
- information->modify_cond_created = 1;
+ /* Store slave thread and create mutexes and conditions. */
+ fail_if (client_initialise_threading(information));
/* Make the server update without all slaves dying on SIGUSR1. */
@@ -489,93 +466,32 @@ void* slave_loop(void* data)
/* Fetch messages from the slave. */
- while ((terminating == 0) && (information->open == 0))
+ while ((terminating == 0) && information->open)
{
/* Send queued multicast messages. */
- if (information->multicasts_count > 0)
- {
- multicast_t multicast;
- with_mutex (information->mutex,
- if (information->multicasts_count > 0)
- {
- size_t c = (information->multicasts_count -= 1) * sizeof(multicast_t);
- multicast = information->multicasts[0];
- memmove(information->multicasts, information->multicasts + 1, c);
- if (c == 0)
- {
- free(information->multicasts);
- information->multicasts = NULL;
- }
- }
- );
- multicast_message(&multicast);
- multicast_destroy(&multicast);
- }
+ send_multicast_queue(information);
/* Send queued messages. */
- if (information->send_pending_size > 0)
- {
- char* sendbuf = information->send_pending;
- char* sendbuf_ = sendbuf;
- size_t sent;
- n = information->send_pending_size;
- information->send_pending_size = 0;
- information->send_pending = NULL;
- with_mutex (information->mutex,
- while (n > 0)
- {
- sent = send_message(information->socket_fd, sendbuf_, n);
- if ((sent < n) && (errno != EINTR)) /* Ignore EINTR */
- {
- perror(*argv);
- break;
- }
- n -= sent;
- sendbuf_ += sent / sizeof(char);
- }
- free(sendbuf);
- );
- }
+ send_reply_queue(information);
+
/* Fetch message. */
- r = mds_message_read(&(information->message), socket_fd);
- if (r == 0)
- {
- if (message_received(information) == 1)
- goto terminate;
- }
- else
- if (r == -2)
- {
- eprint("corrupt message received.");
- goto fail;
- }
- else if (errno == ECONNRESET)
- {
- r = mds_message_read(&(information->message), socket_fd);
- if ((r == 0) && message_received(information))
- goto terminate;
- information->open = 0;
- /* Connection closed. */
- }
- else if (errno == EINTR)
- {
- /* Stop the thread if we are re-exec:ing the server. */
- if (terminating)
- goto terminate;
- }
- else
- goto pfail;
+ r = fetch_message(information);
+ if ((r == 0) && message_received(information))
+ goto terminate;
+ else if (r == -2)
+ goto fail;
+ else if (r && (errno == EINTR) && terminating)
+ goto terminate; /* Stop the thread if we are re-exec:ing or terminating the server. */
}
- /* Stop the thread if we are re-exec:ing the server. */
+ /* Stop the thread if we are re-exec:ing or terminating the server. */
if (terminating)
goto terminate;
/* Multicast information about the client closing. */
n = 2 * 10 + 1 + strlen("Client closed: :\n\n");
- if (xmalloc(msgbuf, n, char))
- goto fail;
+ fail_if (xmalloc(msgbuf, n, char));
snprintf(msgbuf, n,
"Client closed: %" PRIu32 ":%" PRIu32 "\n"
"\n",
@@ -586,11 +502,11 @@ void* slave_loop(void* data)
msgbuf = NULL;
- terminate:
+ terminate: /* This done on success as well. */
if (reexecing)
goto reexec;
- fail: /* The loop does break, this done on success as well. */
+ fail: /* This done on success as well. */
/* Close socket and free resources. */
close(socket_fd);
free(msgbuf);
@@ -604,7 +520,6 @@ void* slave_loop(void* data)
linked_list_remove(&client_list, entry);
running_slaves--;
pthread_cond_signal(&slave_cond););
-
return NULL;
@@ -621,17 +536,110 @@ void* slave_loop(void* data)
with_mutex (slave_mutex,
running_slaves--;
pthread_cond_signal(&slave_cond););
-
return NULL;
}
/**
+ * Send the next message in a clients multicast queue
+ *
+ * @param client The client
+ */
+void send_multicast_queue(client_t* client)
+{
+ while (client->multicasts_count > 0)
+ {
+ multicast_t multicast;
+ with_mutex (client->mutex,
+ if (client->multicasts_count > 0)
+ {
+ size_t c = (client->multicasts_count -= 1) * sizeof(multicast_t);
+ multicast = client->multicasts[0];
+ memmove(client->multicasts, client->multicasts + 1, c);
+ if (c == 0)
+ {
+ free(client->multicasts);
+ client->multicasts = NULL;
+ }
+ }
+ );
+ multicast_message(&multicast);
+ multicast_destroy(&multicast);
+ }
+}
+
+
+/**
+ * Send the messages that are in a clients reply queue
+ *
+ * @param client The client
+ */
+void send_reply_queue(client_t* client)
+{
+ char* sendbuf = client->send_pending;
+ char* sendbuf_ = sendbuf;
+ size_t sent;
+ size_t n;
+
+ if (client->send_pending_size == 0)
+ return;
+
+ n = client->send_pending_size;
+ client->send_pending_size = 0;
+ client->send_pending = NULL;
+ with_mutex (client->mutex,
+ while (n > 0)
+ {
+ sent = send_message(client->socket_fd, sendbuf_, n);
+ if ((sent < n) && (errno != EINTR)) /* Ignore EINTR */
+ {
+ perror(*argv);
+ break;
+ }
+ n -= sent;
+ sendbuf_ += sent / sizeof(char);
+ }
+ free(sendbuf);
+ );
+}
+
+
+/**
+ * Receive a full message and update open status if the client closes
+ *
+ * @param client The client
+ * @return Zero on success, -2 on failure, otherwise -1
+ */
+int fetch_message(client_t* client)
+{
+ int r = mds_message_read(&(client->message), client->socket_fd);
+ if (r == 0)
+ return 0;
+
+ if (r == -2)
+ eprint("corrupt message received.");
+ else if (errno == ECONNRESET)
+ {
+ r = mds_message_read(&(client->message), client->socket_fd);
+ client->open = 0;
+ /* Connection closed. */
+ }
+ else if (errno != EINTR)
+ {
+ r = -2;
+ perror(*argv);
+ }
+
+ return r;
+}
+
+
+/**
* Perform actions that should be taken when
* a message has been received from a client
*
* @param client The client has sent a message
- * @return Normally zero, but 1 if exited because of re-exec
+ * @return Normally zero, but 1 if exited because of re-exec or termination
*/
int message_received(client_t* client)
{
@@ -664,7 +672,7 @@ int message_received(client_t* client)
/* Notify waiting client about a received message modification. */
if (modifying != 0)
{
- /* pthread_cond_timedwait is required to handle re-exec because
+ /* pthread_cond_timedwait is required to handle re-exec and termination because
pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */
struct timespec timeout =
{
@@ -861,7 +869,7 @@ int message_received(client_t* client)
queue_message_multicast(msgbuf_, n, client);
/* Queue message to be sent when this function returns.
- This done to simplify `multicast_message` for re-exec. */
+ This done to simplify `multicast_message` for re-exec and termination. */
with_mutex (client->mutex,
if (client->send_pending_size == 0)
{
@@ -1290,7 +1298,7 @@ void multicast_message(multicast_t* multicast)
}
);
- /* Stop if we are re-exec:ing, or continue to next recipient on error. */
+ /* Stop if we are re-exec:ing or terminating, or continue to next recipient on error. */
if (n > 0)
{
if (terminating)
@@ -1309,7 +1317,7 @@ void multicast_message(multicast_t* multicast)
/* Wait for a reply and act upon it. */
{
- /* pthread_cond_timedwait is required to handle re-exec because
+ /* pthread_cond_timedwait is required to handle re-exec and termination because
pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */
struct timespec timeout =
{