diff options
-rw-r--r-- | src/mds-server/client.c | 68 | ||||
-rw-r--r-- | src/mds-server/client.h | 32 | ||||
-rw-r--r-- | src/mds-server/mds-server.c | 226 | ||||
-rw-r--r-- | src/mds-server/mds-server.h | 24 |
4 files changed, 240 insertions, 110 deletions
diff --git a/src/mds-server/client.c b/src/mds-server/client.c index 15afc18..76c7250 100644 --- a/src/mds-server/client.c +++ b/src/mds-server/client.c @@ -24,10 +24,78 @@ #include <stdlib.h> #include <string.h> #include <pthread.h> +#include <errno.h> /** + * Initialise a client + * + * The following fields will not be initialised: + * - message + * - thread + * - mutex + * - modify_mutex + * - modify_cond + * + * The follow fields will be initialised to `-1`: + * - list_entry + * - socket_fd + * + * @param this Memory slot in which to store the new client information + */ +void client_initialise(client_t* restrict this) +{ + this->list_entry = -1; + this->socket_fd = -1; + this->open = 0; + this->id = 0; + this->mutex_created = 0; + this->interception_conditions = NULL; + this->interception_conditions_count = 0; + this->multicasts = NULL; + this->multicasts_count = 0; + this->send_pending = NULL; + this->send_pending_size = 0; + this->modify_message = NULL; + this->modify_mutex_created = 0; + this->modify_cond_created = 0; +} + + +/** + * Initialise fields that have to do with threading + * + * This method initialises the following fields: + * - thread + * - mutex + * - modify_mutex + * - modify_cond + * + * @param this The client information + * @return Zero on success, -1 on error + */ +int client_initialise_threading(client_t* restrict this) +{ + /* Store the thread so that other threads can kill it. */ + this->thread = pthread_self(); + + /* Create mutex to make sure two thread to not try to send + messages concurrently, and other client local actions. */ + if ((errno = pthread_mutex_init(&(this->mutex), NULL))) return -1; + this->mutex_created = 1; + + /* Create mutex and codition for multicast interception replies. */ + if ((errno = pthread_mutex_init(&(this->modify_mutex), NULL))) return -1; + this->modify_mutex_created = 1; + if ((errno = pthread_cond_init(&(this->modify_cond), NULL))) return -1; + this->modify_cond_created = 1; + + return 0; +} + + +/** * Release all resources assoicated with a client * * @param this The client information diff --git a/src/mds-server/client.h b/src/mds-server/client.h index 2aeb813..39ed791 100644 --- a/src/mds-server/client.h +++ b/src/mds-server/client.h @@ -137,6 +137,38 @@ typedef struct client /** + * Initialise a client + * + * The following fields will not be initialised: + * - message + * - thread + * - mutex + * - modify_mutex + * - modify_cond + * + * The follow fields will be initialised to `-1`: + * - list_entry + * - socket_fd + * + * @param this Memory slot in which to store the new client information + */ +void client_initialise(client_t* restrict this); + +/** + * Initialise fields that have to do with threading + * + * This method initialises the following fields: + * - thread + * - mutex + * - modify_mutex + * - modify_cond + * + * @param this The client information + * @return Zero on success, -1 on error + */ +int client_initialise_threading(client_t* restrict this); + +/** * Release all resources assoicated with a client * * @param this The client information diff --git a/src/mds-server/mds-server.c b/src/mds-server/mds-server.c index 039a478..c9e41c0 100644 --- a/src/mds-server/mds-server.c +++ b/src/mds-server/mds-server.c @@ -425,15 +425,7 @@ void* slave_loop(void* data) { /* Create information table. */ fail_if (xmalloc(information, 1, client_t)); - - /* NULL-out pointers and initialisation markers. */ - information->interception_conditions = NULL; - information->send_pending = NULL; - information->modify_message = NULL; - information->multicasts = NULL; - information->mutex_created = 0; - information->modify_mutex_created = 0; - information->modify_cond_created = 0; + client_initialise(information); /* Add to list of clients. */ pthread_mutex_lock(&slave_mutex); @@ -459,26 +451,11 @@ void* slave_loop(void* data) information->list_entry = entry; information->socket_fd = socket_fd; information->open = 1; - information->id = 0; - information->interception_conditions_count = 0; - information->send_pending_size = 0; - information->multicasts_count = 0; fail_if (mds_message_initialise(&(information->message))); } - - /* Store the thread so that other threads can kill it. */ - information->thread = pthread_self(); - /* Create mutex to make sure two thread to not try to send - messages concurrently, and other slave local actions. */ - fail_if ((errno = pthread_mutex_init(&(information->mutex), NULL))); - information->mutex_created = 1; - - /* Create mutex and codition for multicast interception replies. */ - fail_if ((errno = pthread_mutex_init(&(information->modify_mutex), NULL))); - information->modify_mutex_created = 1; - fail_if ((errno = pthread_cond_init(&(information->modify_cond), NULL))); - information->modify_cond_created = 1; + /* Store slave thread and create mutexes and conditions. */ + fail_if (client_initialise_threading(information)); /* Make the server update without all slaves dying on SIGUSR1. */ @@ -489,93 +466,32 @@ void* slave_loop(void* data) /* Fetch messages from the slave. */ - while ((terminating == 0) && (information->open == 0)) + while ((terminating == 0) && information->open) { /* Send queued multicast messages. */ - if (information->multicasts_count > 0) - { - multicast_t multicast; - with_mutex (information->mutex, - if (information->multicasts_count > 0) - { - size_t c = (information->multicasts_count -= 1) * sizeof(multicast_t); - multicast = information->multicasts[0]; - memmove(information->multicasts, information->multicasts + 1, c); - if (c == 0) - { - free(information->multicasts); - information->multicasts = NULL; - } - } - ); - multicast_message(&multicast); - multicast_destroy(&multicast); - } + send_multicast_queue(information); /* Send queued messages. */ - if (information->send_pending_size > 0) - { - char* sendbuf = information->send_pending; - char* sendbuf_ = sendbuf; - size_t sent; - n = information->send_pending_size; - information->send_pending_size = 0; - information->send_pending = NULL; - with_mutex (information->mutex, - while (n > 0) - { - sent = send_message(information->socket_fd, sendbuf_, n); - if ((sent < n) && (errno != EINTR)) /* Ignore EINTR */ - { - perror(*argv); - break; - } - n -= sent; - sendbuf_ += sent / sizeof(char); - } - free(sendbuf); - ); - } + send_reply_queue(information); + /* Fetch message. */ - r = mds_message_read(&(information->message), socket_fd); - if (r == 0) - { - if (message_received(information) == 1) - goto terminate; - } - else - if (r == -2) - { - eprint("corrupt message received."); - goto fail; - } - else if (errno == ECONNRESET) - { - r = mds_message_read(&(information->message), socket_fd); - if ((r == 0) && message_received(information)) - goto terminate; - information->open = 0; - /* Connection closed. */ - } - else if (errno == EINTR) - { - /* Stop the thread if we are re-exec:ing the server. */ - if (terminating) - goto terminate; - } - else - goto pfail; + r = fetch_message(information); + if ((r == 0) && message_received(information)) + goto terminate; + else if (r == -2) + goto fail; + else if (r && (errno == EINTR) && terminating) + goto terminate; /* Stop the thread if we are re-exec:ing or terminating the server. */ } - /* Stop the thread if we are re-exec:ing the server. */ + /* Stop the thread if we are re-exec:ing or terminating the server. */ if (terminating) goto terminate; /* Multicast information about the client closing. */ n = 2 * 10 + 1 + strlen("Client closed: :\n\n"); - if (xmalloc(msgbuf, n, char)) - goto fail; + fail_if (xmalloc(msgbuf, n, char)); snprintf(msgbuf, n, "Client closed: %" PRIu32 ":%" PRIu32 "\n" "\n", @@ -586,11 +502,11 @@ void* slave_loop(void* data) msgbuf = NULL; - terminate: + terminate: /* This done on success as well. */ if (reexecing) goto reexec; - fail: /* The loop does break, this done on success as well. */ + fail: /* This done on success as well. */ /* Close socket and free resources. */ close(socket_fd); free(msgbuf); @@ -604,7 +520,6 @@ void* slave_loop(void* data) linked_list_remove(&client_list, entry); running_slaves--; pthread_cond_signal(&slave_cond);); - return NULL; @@ -621,17 +536,110 @@ void* slave_loop(void* data) with_mutex (slave_mutex, running_slaves--; pthread_cond_signal(&slave_cond);); - return NULL; } /** + * Send the next message in a clients multicast queue + * + * @param client The client + */ +void send_multicast_queue(client_t* client) +{ + while (client->multicasts_count > 0) + { + multicast_t multicast; + with_mutex (client->mutex, + if (client->multicasts_count > 0) + { + size_t c = (client->multicasts_count -= 1) * sizeof(multicast_t); + multicast = client->multicasts[0]; + memmove(client->multicasts, client->multicasts + 1, c); + if (c == 0) + { + free(client->multicasts); + client->multicasts = NULL; + } + } + ); + multicast_message(&multicast); + multicast_destroy(&multicast); + } +} + + +/** + * Send the messages that are in a clients reply queue + * + * @param client The client + */ +void send_reply_queue(client_t* client) +{ + char* sendbuf = client->send_pending; + char* sendbuf_ = sendbuf; + size_t sent; + size_t n; + + if (client->send_pending_size == 0) + return; + + n = client->send_pending_size; + client->send_pending_size = 0; + client->send_pending = NULL; + with_mutex (client->mutex, + while (n > 0) + { + sent = send_message(client->socket_fd, sendbuf_, n); + if ((sent < n) && (errno != EINTR)) /* Ignore EINTR */ + { + perror(*argv); + break; + } + n -= sent; + sendbuf_ += sent / sizeof(char); + } + free(sendbuf); + ); +} + + +/** + * Receive a full message and update open status if the client closes + * + * @param client The client + * @return Zero on success, -2 on failure, otherwise -1 + */ +int fetch_message(client_t* client) +{ + int r = mds_message_read(&(client->message), client->socket_fd); + if (r == 0) + return 0; + + if (r == -2) + eprint("corrupt message received."); + else if (errno == ECONNRESET) + { + r = mds_message_read(&(client->message), client->socket_fd); + client->open = 0; + /* Connection closed. */ + } + else if (errno != EINTR) + { + r = -2; + perror(*argv); + } + + return r; +} + + +/** * Perform actions that should be taken when * a message has been received from a client * * @param client The client has sent a message - * @return Normally zero, but 1 if exited because of re-exec + * @return Normally zero, but 1 if exited because of re-exec or termination */ int message_received(client_t* client) { @@ -664,7 +672,7 @@ int message_received(client_t* client) /* Notify waiting client about a received message modification. */ if (modifying != 0) { - /* pthread_cond_timedwait is required to handle re-exec because + /* pthread_cond_timedwait is required to handle re-exec and termination because pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */ struct timespec timeout = { @@ -861,7 +869,7 @@ int message_received(client_t* client) queue_message_multicast(msgbuf_, n, client); /* Queue message to be sent when this function returns. - This done to simplify `multicast_message` for re-exec. */ + This done to simplify `multicast_message` for re-exec and termination. */ with_mutex (client->mutex, if (client->send_pending_size == 0) { @@ -1290,7 +1298,7 @@ void multicast_message(multicast_t* multicast) } ); - /* Stop if we are re-exec:ing, or continue to next recipient on error. */ + /* Stop if we are re-exec:ing or terminating, or continue to next recipient on error. */ if (n > 0) { if (terminating) @@ -1309,7 +1317,7 @@ void multicast_message(multicast_t* multicast) /* Wait for a reply and act upon it. */ { - /* pthread_cond_timedwait is required to handle re-exec because + /* pthread_cond_timedwait is required to handle re-exec and termination because pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */ struct timespec timeout = { diff --git a/src/mds-server/mds-server.h b/src/mds-server/mds-server.h index 08f7fd4..9abf095 100644 --- a/src/mds-server/mds-server.h +++ b/src/mds-server/mds-server.h @@ -35,11 +35,25 @@ void* slave_loop(void* data); /** + * Send the next message in a clients multicast queue + * + * @param client The client + */ +void send_multicast_queue(client_t* client); + +/** + * Send the messages that are in a clients reply queue + * + * @param client The client + */ +void send_reply_queue(client_t* client); + +/** * Perform actions that should be taken when * a message has been received from a client * * @param client The client has sent a message - * @return Normally zero, but 1 if exited because of re-exec + * @return Normally zero, but 1 if exited because of re-exec or termination */ int message_received(client_t* client); @@ -64,6 +78,14 @@ void add_intercept_condition(client_t* client, char* condition, int64_t priority void queue_message_multicast(char* message, size_t length, client_t* sender); /** + * Receive a full message and update open status if the client closes + * + * @param client The client + * @return Zero on success, -2 on failure, otherwise -1 + */ +int fetch_message(client_t* client); + +/** * Multicast a message * * @param multicast The multicast message |