diff options
Diffstat (limited to '')
-rw-r--r-- | src/mds-server/client.c | 409 | ||||
-rw-r--r-- | src/mds-server/client.h | 211 | ||||
-rw-r--r-- | src/mds-server/globals.c | 1 | ||||
-rw-r--r-- | src/mds-server/globals.h | 2 | ||||
-rw-r--r-- | src/mds-server/interception-condition.c | 67 | ||||
-rw-r--r-- | src/mds-server/interception-condition.h | 57 | ||||
-rw-r--r-- | src/mds-server/interceptors.c | 376 | ||||
-rw-r--r-- | src/mds-server/interceptors.h | 15 | ||||
-rw-r--r-- | src/mds-server/mds-server.c | 835 | ||||
-rw-r--r-- | src/mds-server/mds-server.h | 7 | ||||
-rw-r--r-- | src/mds-server/multicast.c | 154 | ||||
-rw-r--r-- | src/mds-server/multicast.h | 85 | ||||
-rw-r--r-- | src/mds-server/queued-interception.c | 41 | ||||
-rw-r--r-- | src/mds-server/queued-interception.h | 47 | ||||
-rw-r--r-- | src/mds-server/receiving.c | 465 | ||||
-rw-r--r-- | src/mds-server/receiving.h | 3 | ||||
-rw-r--r-- | src/mds-server/reexec.c | 399 | ||||
-rw-r--r-- | src/mds-server/sending.c | 345 | ||||
-rw-r--r-- | src/mds-server/sending.h | 7 | ||||
-rw-r--r-- | src/mds-server/signals.c | 33 | ||||
-rw-r--r-- | src/mds-server/slavery.c | 159 | ||||
-rw-r--r-- | src/mds-server/slavery.h | 7 |
22 files changed, 1825 insertions, 1900 deletions
diff --git a/src/mds-server/client.c b/src/mds-server/client.c index 749a6ca..ad7104b 100644 --- a/src/mds-server/client.c +++ b/src/mds-server/client.c @@ -44,22 +44,23 @@ * * @param this Memory slot in which to store the new client information */ -void client_initialise(client_t* restrict this) +void +client_initialise(client_t *restrict this) { - this->list_entry = -1; - this->socket_fd = -1; - this->open = 0; - this->id = 0; - this->mutex_created = 0; - this->interception_conditions = NULL; - this->interception_conditions_count = 0; - this->multicasts = NULL; - this->multicasts_count = 0; - this->send_pending = NULL; - this->send_pending_size = 0; - this->modify_message = NULL; - this->modify_mutex_created = 0; - this->modify_cond_created = 0; + this->list_entry = -1; + this->socket_fd = -1; + this->open = 0; + this->id = 0; + this->mutex_created = 0; + this->interception_conditions = NULL; + this->interception_conditions_count = 0; + this->multicasts = NULL; + this->multicasts_count = 0; + this->send_pending = NULL; + this->send_pending_size = 0; + this->modify_message = NULL; + this->modify_mutex_created = 0; + this->modify_cond_created = 0; } @@ -75,25 +76,26 @@ void client_initialise(client_t* restrict this) * @param this The client information * @return Zero on success, -1 on error */ -int client_initialise_threading(client_t* restrict this) +int +client_initialise_threading(client_t *restrict this) { - /* Store the thread so that other threads can kill it. */ - this->thread = pthread_self(); - - /* Create mutex to make sure two thread to not try to send - messages concurrently, and other client local actions. */ - fail_if ((errno = pthread_mutex_init(&(this->mutex), NULL))); - this->mutex_created = 1; - - /* Create mutex and codition for multicast interception replies. */ - fail_if ((errno = pthread_mutex_init(&(this->modify_mutex), NULL))); - this->modify_mutex_created = 1; - fail_if ((errno = pthread_cond_init(&(this->modify_cond), NULL))); - this->modify_cond_created = 1; - - return 0; + /* Store the thread so that other threads can kill it. */ + this->thread = pthread_self(); + + /* Create mutex to make sure two thread to not try to send + messages concurrently, and other client local actions. */ + fail_if ((errno = pthread_mutex_init(&(this->mutex), NULL))); + this->mutex_created = 1; + + /* Create mutex and codition for multicast interception replies. */ + fail_if ((errno = pthread_mutex_init(&(this->modify_mutex), NULL))); + this->modify_mutex_created = 1; + fail_if ((errno = pthread_cond_init(&(this->modify_cond), NULL))); + this->modify_cond_created = 1; + + return 0; fail: - return -1; + return -1; } @@ -102,36 +104,33 @@ int client_initialise_threading(client_t* restrict this) * * @param this The client information */ -void client_destroy(client_t* restrict this) +void +client_destroy(client_t *restrict this) { - if (this->interception_conditions != NULL) - { - size_t i; - for (i = 0; i < this->interception_conditions_count; i++) - free(this->interception_conditions[i].condition); - free(this->interception_conditions); - } - if (this->mutex_created) - pthread_mutex_destroy(&(this->mutex)); - mds_message_destroy(&(this->message)); - if (this->multicasts != NULL) - { - size_t i; - for (i = 0; i < this->multicasts_count; i++) - multicast_destroy(this->multicasts + i); - free(this->multicasts); - } - free(this->send_pending); - if (this->modify_message != NULL) - { - mds_message_destroy(this->modify_message); - free(this->modify_message); - } - if (this->modify_mutex_created) - pthread_mutex_destroy(&(this->modify_mutex)); - if (this->modify_cond_created) - pthread_cond_destroy(&(this->modify_cond)); - free(this); + size_t i; + if (this->interception_conditions) { + for (i = 0; i < this->interception_conditions_count; i++) + free(this->interception_conditions[i].condition); + free(this->interception_conditions); + } + if (this->mutex_created) + pthread_mutex_destroy(&(this->mutex)); + mds_message_destroy(&(this->message)); + if (this->multicasts) { + for (i = 0; i < this->multicasts_count; i++) + multicast_destroy(this->multicasts + i); + free(this->multicasts); + } + free(this->send_pending); + if (this->modify_message) { + mds_message_destroy(this->modify_message); + free(this->modify_message); + } + if (this->modify_mutex_created) + pthread_mutex_destroy(&(this->modify_mutex)); + if (this->modify_cond_created) + pthread_cond_destroy(&(this->modify_cond)); + free(this); } @@ -141,19 +140,20 @@ void client_destroy(client_t* restrict this) * @param this The client information * @return The number of bytes to allocate to the output buffer */ -size_t client_marshal_size(const client_t* restrict this) +size_t +client_marshal_size(const client_t *restrict this) { - size_t i, n = sizeof(ssize_t) + 3 * sizeof(int) + sizeof(uint64_t) + 5 * sizeof(size_t); - - n += mds_message_marshal_size(&(this->message)); - for (i = 0; i < this->interception_conditions_count; i++) - n += interception_condition_marshal_size(this->interception_conditions + i); - for (i = 0; i < this->multicasts_count; i++) - n += multicast_marshal_size(this->multicasts + i); - n += this->send_pending_size * sizeof(char); - n += this->modify_message == NULL ? 0 : mds_message_marshal_size(this->modify_message); - - return n; + size_t i, n = sizeof(ssize_t) + 3 * sizeof(int) + sizeof(uint64_t) + 5 * sizeof(size_t); + + n += mds_message_marshal_size(&(this->message)); + for (i = 0; i < this->interception_conditions_count; i++) + n += interception_condition_marshal_size(this->interception_conditions + i); + for (i = 0; i < this->multicasts_count; i++) + n += multicast_marshal_size(this->multicasts + i); + n += this->send_pending_size * sizeof(char); + n += !this->modify_message ? 0 : mds_message_marshal_size(this->modify_message); + + return n; } @@ -164,34 +164,35 @@ size_t client_marshal_size(const client_t* restrict this) * @param data Output buffer for the marshalled data * @return The number of bytes that have been written (everything will be written) */ -size_t client_marshal(const client_t* restrict this, char* restrict data) +size_t +client_marshal(const client_t *restrict this, char *restrict data) { - size_t i, n; - buf_set_next(data, int, CLIENT_T_VERSION); - buf_set_next(data, ssize_t, this->list_entry); - buf_set_next(data, int, this->socket_fd); - buf_set_next(data, int, this->open); - buf_set_next(data, uint64_t, this->id); - n = mds_message_marshal_size(&(this->message)); - buf_set_next(data, size_t, n); - if (n > 0) - mds_message_marshal(&(this->message), data); - data += n / sizeof(char); - buf_set_next(data, size_t, this->interception_conditions_count); - for (i = 0; i < this->interception_conditions_count; i++) - data += n = interception_condition_marshal(this->interception_conditions + i, data) / sizeof(char); - buf_set_next(data, size_t, this->multicasts_count); - for (i = 0; i < this->multicasts_count; i++) - data += multicast_marshal(this->multicasts + i, data) / sizeof(char); - buf_set_next(data, size_t, this->send_pending_size); - if (this->send_pending_size > 0) - memcpy(data, this->send_pending, this->send_pending_size * sizeof(char)); - data += this->send_pending_size; - n = this->modify_message == NULL ? 0 : mds_message_marshal_size(this->modify_message); - buf_set_next(data, size_t, n); - if (this->modify_message != NULL) - mds_message_marshal(this->modify_message, data); - return client_marshal_size(this); + size_t i, n; + buf_set_next(data, int, CLIENT_T_VERSION); + buf_set_next(data, ssize_t, this->list_entry); + buf_set_next(data, int, this->socket_fd); + buf_set_next(data, int, this->open); + buf_set_next(data, uint64_t, this->id); + n = mds_message_marshal_size(&(this->message)); + buf_set_next(data, size_t, n); + if (n > 0) + mds_message_marshal(&(this->message), data); + data += n / sizeof(char); + buf_set_next(data, size_t, this->interception_conditions_count); + for (i = 0; i < this->interception_conditions_count; i++) + data += n = interception_condition_marshal(this->interception_conditions + i, data) / sizeof(char); + buf_set_next(data, size_t, this->multicasts_count); + for (i = 0; i < this->multicasts_count; i++) + data += multicast_marshal(this->multicasts + i, data) / sizeof(char); + buf_set_next(data, size_t, this->send_pending_size); + if (this->send_pending_size > 0) + memcpy(data, this->send_pending, this->send_pending_size * sizeof(char)); + data += this->send_pending_size; + n = !this->modify_message ? 0 : mds_message_marshal_size(this->modify_message); + buf_set_next(data, size_t, n); + if (this->modify_message) + mds_message_marshal(this->modify_message, data); + return client_marshal_size(this); } @@ -202,122 +203,116 @@ size_t client_marshal(const client_t* restrict this, char* restrict data) * @param data In buffer with the marshalled data * @return Zero on error, `errno` will be set accordingly, otherwise the number of read bytes */ -size_t client_unmarshal(client_t* restrict this, char* restrict data) +size_t +client_unmarshal(client_t *restrict this, char *restrict data) { - size_t i, n, rc = sizeof(ssize_t) + 3 * sizeof(int) + sizeof(uint64_t) + 5 * sizeof(size_t); - int saved_errno, stage = 0; - this->interception_conditions = NULL; - this->multicasts = NULL; - this->send_pending = NULL; - this->mutex_created = 0; - this->modify_mutex_created = 0; - this->modify_cond_created = 0; - this->multicasts_count = 0; - /* buf_get_next(data, int, CLIENT_T_VERSION); */ - buf_next(data, int, 1); - buf_get_next(data, ssize_t, this->list_entry); - buf_get_next(data, int, this->socket_fd); - buf_get_next(data, int, this->open); - buf_get_next(data, uint64_t, this->id); - buf_get_next(data, size_t, n); - if (n > 0) - fail_if (mds_message_unmarshal(&(this->message), data)); - stage++; - data += n / sizeof(char); - rc += n; - buf_get_next(data, size_t, this->interception_conditions_count); - fail_if (xmalloc(this->interception_conditions, - this->interception_conditions_count, interception_condition_t)); - for (i = 0; i < this->interception_conditions_count; i++) - { - n = interception_condition_unmarshal(this->interception_conditions + i, data); - if (n == 0) - { - this->interception_conditions_count = i - 1; - fail_if (1); + size_t i, n, m, rc = sizeof(ssize_t) + 3 * sizeof(int) + sizeof(uint64_t) + 5 * sizeof(size_t); + int saved_errno, stage = 0; + this->interception_conditions = NULL; + this->multicasts = NULL; + this->send_pending = NULL; + this->mutex_created = 0; + this->modify_mutex_created = 0; + this->modify_cond_created = 0; + this->multicasts_count = 0; + /* buf_get_next(data, int, CLIENT_T_VERSION); */ + buf_next(data, int, 1); + buf_get_next(data, ssize_t, this->list_entry); + buf_get_next(data, int, this->socket_fd); + buf_get_next(data, int, this->open); + buf_get_next(data, uint64_t, this->id); + buf_get_next(data, size_t, n); + if (n > 0) + fail_if (mds_message_unmarshal(&(this->message), data)); + stage++; + data += n / sizeof(char); + rc += n; + buf_get_next(data, size_t, this->interception_conditions_count); + fail_if (xmalloc(this->interception_conditions, this->interception_conditions_count, interception_condition_t)); + for (i = 0; i < this->interception_conditions_count; i++) { + n = interception_condition_unmarshal(this->interception_conditions + i, data); + if (!n) { + this->interception_conditions_count = i - 1; + fail_if (1); + } + data += n / sizeof(char); + rc += n; } - data += n / sizeof(char); - rc += n; - } - buf_get_next(data, size_t, n); - fail_if (xmalloc(this->multicasts, n, multicast_t)); - for (i = 0; i < n; i++, this->multicasts_count++) - { - size_t m = multicast_unmarshal(this->multicasts + i, data); - fail_if (m == 0); - data += m / sizeof(char); - rc += m; - } - buf_get_next(data, size_t, this->send_pending_size); - if (this->send_pending_size > 0) - { - fail_if (xmemdup(this->send_pending, data, this->send_pending_size, char)); - data += this->send_pending_size, rc += this->send_pending_size * sizeof(char); - } - buf_get_next(data, size_t, n); - if (n > 0) - mds_message_unmarshal(this->modify_message, data); - else - this->modify_message = NULL; - rc += n * sizeof(char); - return rc; - - fail: - saved_errno = errno; - if (stage == 0) - goto done_failing; - mds_message_destroy(&(this->message)); - for (i = 0; i < this->interception_conditions_count; i++) - free(this->interception_conditions[i].condition); - free(this->interception_conditions); - for (i = 0; i < this->multicasts_count; i++) - multicast_destroy(this->multicasts + i); - free(this->multicasts); - free(this->send_pending); - if (this->modify_message != NULL) - { - mds_message_destroy(this->modify_message); - free(this->modify_message); - } - done_failing: - return errno = saved_errno, (size_t)0; + buf_get_next(data, size_t, n); + fail_if (xmalloc(this->multicasts, n, multicast_t)); + for (i = 0; i < n; i++, this->multicasts_count++) { + m = multicast_unmarshal(this->multicasts + i, data); + fail_if (!m); + data += m / sizeof(char); + rc += m; + } + buf_get_next(data, size_t, this->send_pending_size); + if (this->send_pending_size > 0) { + fail_if (xmemdup(this->send_pending, data, this->send_pending_size, char)); + data += this->send_pending_size, rc += this->send_pending_size * sizeof(char); + } + buf_get_next(data, size_t, n); + if (n > 0) + mds_message_unmarshal(this->modify_message, data); + else + this->modify_message = NULL; + rc += n * sizeof(char); + return rc; + +fail: + saved_errno = errno; + if (!stage) + goto done_failing; + mds_message_destroy(&(this->message)); + for (i = 0; i < this->interception_conditions_count; i++) + free(this->interception_conditions[i].condition); + free(this->interception_conditions); + for (i = 0; i < this->multicasts_count; i++) + multicast_destroy(this->multicasts + i); + free(this->multicasts); + free(this->send_pending); + if (this->modify_message) { + mds_message_destroy(this->modify_message); + free(this->modify_message); + } +done_failing: + return errno = saved_errno, (size_t)0; } + /** * Pretend to unmarshal client information * * @param data In buffer with the marshalled data * @return The number of read bytes */ -size_t client_unmarshal_skip(char* restrict data) +size_t +client_unmarshal_skip(char *restrict data) { - size_t n, c, rc = sizeof(ssize_t) + 3 * sizeof(int) + sizeof(uint64_t) + 5 * sizeof(size_t); - buf_next(data, int, 1); - buf_next(data, ssize_t, 1); - buf_next(data, int, 2); - buf_next(data, uint64_t, 1); - buf_get_next(data, size_t, n); - data += n / sizeof(char); - rc += n; - buf_get_next(data, size_t, c); - while (c--) - { - n = interception_condition_unmarshal_skip(data); - data += n / sizeof(char); - rc += n; - } - buf_get_next(data, size_t, c); - while (c--) - { - n = multicast_unmarshal_skip(data); - data += n / sizeof(char); - rc += n; - } - buf_get_next(data, size_t, n); - data += n; - rc += n * sizeof(char); - buf_get_next(data, size_t, n); - rc += n * sizeof(char); - return rc; + size_t n, c, rc = sizeof(ssize_t) + 3 * sizeof(int) + sizeof(uint64_t) + 5 * sizeof(size_t); + buf_next(data, int, 1); + buf_next(data, ssize_t, 1); + buf_next(data, int, 2); + buf_next(data, uint64_t, 1); + buf_get_next(data, size_t, n); + data += n / sizeof(char); + rc += n; + buf_get_next(data, size_t, c); + while (c--) { + n = interception_condition_unmarshal_skip(data); + data += n / sizeof(char); + rc += n; + } + buf_get_next(data, size_t, c); + while (c--) { + n = multicast_unmarshal_skip(data); + data += n / sizeof(char); + rc += n; + } + buf_get_next(data, size_t, n); + data += n; + rc += n * sizeof(char); + buf_get_next(data, size_t, n); + rc += n * sizeof(char); + return rc; } - diff --git a/src/mds-server/client.h b/src/mds-server/client.h index cf0aec6..b8c5533 100644 --- a/src/mds-server/client.h +++ b/src/mds-server/client.h @@ -31,110 +31,110 @@ -#define CLIENT_T_VERSION 0 +#define CLIENT_T_VERSION 0 /** * Client information structure */ typedef struct client { - /** - * The client's entry in the list of clients - */ - ssize_t list_entry; - - /** - * The socket file descriptor for the socket connected to the client - */ - int socket_fd; - - /** - * Whether the socket is open - */ - int open; - - /** - * Message read buffer for the client - */ - struct mds_message message; - - /** - * The read thread for the client - */ - pthread_t thread; - - /** - * The client's ID - */ - uint64_t id; - - /** - * Mutex for sending data and other - * actions that only affacts this client - */ - pthread_mutex_t mutex; - - /** - * Whether `mutex` has been initialised - */ - int mutex_created; - - /** - * The messages interception conditions conditions - * for the client - */ - struct interception_condition* interception_conditions; - - /** - * The number of interception conditions - */ - size_t interception_conditions_count; - - /** - * Pending multicast messages - */ - struct multicast* multicasts; - - /** - * The number of pending multicast messages - */ - size_t multicasts_count; - - /** - * Messages pending to be sent (concatenated) - */ - char* send_pending; - - /** - * The character length of the messages pending to be sent - */ - size_t send_pending_size; - - /** - * Pending reply to the multicast interception - */ - struct mds_message* modify_message; - - /** - * Mutex for `modify_message` - */ - pthread_mutex_t modify_mutex; - - /** - * Condidition for `modify_message` - */ - pthread_cond_t modify_cond; - - /** - * Whether `modify_mutex` has been initialised - */ - int modify_mutex_created; - - /** - * Whether `modify_cond` has been initialised - */ - int modify_cond_created; - + /** + * The client's entry in the list of clients + */ + ssize_t list_entry; + + /** + * The socket file descriptor for the socket connected to the client + */ + int socket_fd; + + /** + * Whether the socket is open + */ + int open; + + /** + * Message read buffer for the client + */ + struct mds_message message; + + /** + * The read thread for the client + */ + pthread_t thread; + + /** + * The client's ID + */ + uint64_t id; + + /** + * Mutex for sending data and other + * actions that only affacts this client + */ + pthread_mutex_t mutex; + + /** + * Whether `mutex` has been initialised + */ + int mutex_created; + + /** + * The messages interception conditions conditions + * for the client + */ + struct interception_condition *interception_conditions; + + /** + * The number of interception conditions + */ + size_t interception_conditions_count; + + /** + * Pending multicast messages + */ + struct multicast *multicasts; + + /** + * The number of pending multicast messages + */ + size_t multicasts_count; + + /** + * Messages pending to be sent (concatenated) + */ + char *send_pending; + + /** + * The character length of the messages pending to be sent + */ + size_t send_pending_size; + + /** + * Pending reply to the multicast interception + */ + struct mds_message *modify_message; + + /** + * Mutex for `modify_message` + */ + pthread_mutex_t modify_mutex; + + /** + * Condidition for `modify_message` + */ + pthread_cond_t modify_cond; + + /** + * Whether `modify_mutex` has been initialised + */ + int modify_mutex_created; + + /** + * Whether `modify_cond` has been initialised + */ + int modify_cond_created; + } client_t; @@ -156,7 +156,7 @@ typedef struct client * @param this Memory slot in which to store the new client information */ __attribute__((nonnull)) -void client_initialise(client_t* restrict this); +void client_initialise(client_t *restrict this); /** * Initialise fields that have to do with threading @@ -171,7 +171,7 @@ void client_initialise(client_t* restrict this); * @return Zero on success, -1 on error */ __attribute__((nonnull)) -int client_initialise_threading(client_t* restrict this); +int client_initialise_threading(client_t *restrict this); /** * Release all resources assoicated with a client @@ -179,7 +179,7 @@ int client_initialise_threading(client_t* restrict this); * @param this The client information */ __attribute__((nonnull)) -void client_destroy(client_t* restrict this); +void client_destroy(client_t *restrict this); /** * Calculate the buffer size need to marshal client information @@ -188,7 +188,7 @@ void client_destroy(client_t* restrict this); * @return The number of bytes to allocate to the output buffer */ __attribute__((pure, nonnull)) -size_t client_marshal_size(const client_t* restrict this); +size_t client_marshal_size(const client_t *restrict this); /** * Marshals client information @@ -198,7 +198,7 @@ size_t client_marshal_size(const client_t* restrict this); * @return The number of bytes that have been written (everything will be written) */ __attribute__((nonnull)) -size_t client_marshal(const client_t* restrict this, char* restrict data); +size_t client_marshal(const client_t *restrict this, char *restrict data); /** * Unmarshals client information @@ -209,7 +209,7 @@ size_t client_marshal(const client_t* restrict this, char* restrict data); * number of read bytes. Destroy the client information on error. */ __attribute__((nonnull)) -size_t client_unmarshal(client_t* restrict this, char* restrict data); +size_t client_unmarshal(client_t *restrict this, char *restrict data); /** * Pretend to unmarshal client information @@ -218,8 +218,7 @@ size_t client_unmarshal(client_t* restrict this, char* restrict data); * @return The number of read bytes */ __attribute__((pure, nonnull)) -size_t client_unmarshal_skip(char* restrict data); +size_t client_unmarshal_skip(char *restrict data); #endif - diff --git a/src/mds-server/globals.c b/src/mds-server/globals.c index 077c222..548befa 100644 --- a/src/mds-server/globals.c +++ b/src/mds-server/globals.c @@ -73,4 +73,3 @@ pthread_cond_t modify_cond; * Map from modification ID to waiting client */ hash_table_t modify_map; - diff --git a/src/mds-server/globals.h b/src/mds-server/globals.h index e836ffe..4ca7376 100644 --- a/src/mds-server/globals.h +++ b/src/mds-server/globals.h @@ -31,7 +31,7 @@ -#define MDS_SERVER_VARS_VERSION 0 +#define MDS_SERVER_VARS_VERSION 0 diff --git a/src/mds-server/interception-condition.c b/src/mds-server/interception-condition.c index 3ef66b0..4b35796 100644 --- a/src/mds-server/interception-condition.c +++ b/src/mds-server/interception-condition.c @@ -30,9 +30,10 @@ * @param this The interception condition * @return The number of bytes to allocate to the output buffer */ -size_t interception_condition_marshal_size(const interception_condition_t* restrict this) +size_t +interception_condition_marshal_size(const interception_condition_t *restrict this) { - return sizeof(size_t) + sizeof(int64_t) + 2 * sizeof(int) + (strlen(this->condition) + 1) * sizeof(char); + return sizeof(size_t) + sizeof(int64_t) + 2 * sizeof(int) + (strlen(this->condition) + 1) * sizeof(char); } /** @@ -42,15 +43,16 @@ size_t interception_condition_marshal_size(const interception_condition_t* restr * @param data Output buffer for the marshalled data * @return The number of bytes that have been written (everything will be written) */ -size_t interception_condition_marshal(const interception_condition_t* restrict this, char* restrict data) +size_t +interception_condition_marshal(const interception_condition_t *restrict this, char *restrict data) { - size_t n = (strlen(this->condition) + 1) * sizeof(char); - buf_set_next(data, int, INTERCEPTION_CONDITION_T_VERSION); - buf_set_next(data, size_t, this->header_hash); - buf_set_next(data, int64_t, this->priority); - buf_set_next(data, int, this->modifying); - memcpy(data, this->condition, n); - return sizeof(size_t) + sizeof(int64_t) + 2 * sizeof(int) + n; + size_t n = (strlen(this->condition) + 1) * sizeof(char); + buf_set_next(data, int, INTERCEPTION_CONDITION_T_VERSION); + buf_set_next(data, size_t, this->header_hash); + buf_set_next(data, int64_t, this->priority); + buf_set_next(data, int, this->modifying); + memcpy(data, this->condition, n); + return sizeof(size_t) + sizeof(int64_t) + 2 * sizeof(int) + n; } @@ -61,20 +63,21 @@ size_t interception_condition_marshal(const interception_condition_t* restrict t * @param data In buffer with the marshalled data * @return Zero on error, `errno` will be set accordingly, otherwise the number of read bytes */ -size_t interception_condition_unmarshal(interception_condition_t* restrict this, char* restrict data) +size_t +interception_condition_unmarshal(interception_condition_t *restrict this, char *restrict data) { - size_t n; - this->condition = NULL; - /* buf_get_next(data, int, INTERCEPTION_CONDITION_T_VERSION); */ - buf_next(data, int, 1); - buf_get_next(data, size_t, this->header_hash); - buf_get_next(data, int64_t, this->priority); - buf_get_next(data, int, this->modifying); - n = strlen(data) + 1; - fail_if (xmemdup(this->condition, data, n, char)); - return sizeof(size_t) + sizeof(int64_t) + 2 * sizeof(int) + n * sizeof(char); - fail: - return 0; + size_t n; + this->condition = NULL; + /* buf_get_next(data, int, INTERCEPTION_CONDITION_T_VERSION); */ + buf_next(data, int, 1); + buf_get_next(data, size_t, this->header_hash); + buf_get_next(data, int64_t, this->priority); + buf_get_next(data, int, this->modifying); + n = strlen(data) + 1; + fail_if (xmemdup(this->condition, data, n, char)); + return sizeof(size_t) + sizeof(int64_t) + 2 * sizeof(int) + n * sizeof(char); +fail: + return 0; } @@ -84,14 +87,14 @@ size_t interception_condition_unmarshal(interception_condition_t* restrict this, * @param data In buffer with the marshalled data * @return The number of read bytes */ -size_t interception_condition_unmarshal_skip(char* restrict data) +size_t +interception_condition_unmarshal_skip(char *restrict data) { - size_t n = sizeof(size_t) + sizeof(int64_t) + 2 * sizeof(int); - buf_next(data, int, 1); - buf_next(data, size_t, 1); - buf_next(data, int64_t, 1); - buf_next(data, int, 1); - n += (strlen(data) + 1) * sizeof(char); - return n; + size_t n = sizeof(size_t) + sizeof(int64_t) + 2 * sizeof(int); + buf_next(data, int, 1); + buf_next(data, size_t, 1); + buf_next(data, int64_t, 1); + buf_next(data, int, 1); + n += (strlen(data) + 1) * sizeof(char); + return n; } - diff --git a/src/mds-server/interception-condition.h b/src/mds-server/interception-condition.h index 73919cc..e562f22 100644 --- a/src/mds-server/interception-condition.h +++ b/src/mds-server/interception-condition.h @@ -23,36 +23,36 @@ #include <stdint.h> -#define INTERCEPTION_CONDITION_T_VERSION 0 +#define INTERCEPTION_CONDITION_T_VERSION 0 /** * A condition for a message being intercepted by a client */ typedef struct interception_condition { - /** - * The header of messages to intercept, optionally with a value, - * empty (most not be NULL) for all messages. - */ - char* condition; - - /** - * The hash of the header of messages to intercept - */ - size_t header_hash; - - /** - * The interception priority. The client should be - * consistent with the priority for conditions that - * are not mutually exclusive. - */ - int64_t priority; - - /** - * Whether the messages may get modified by the client - */ - int modifying; - + /** + * The header of messages to intercept, optionally with a value, + * empty (most not be NULL) for all messages. + */ + char *condition; + + /** + * The hash of the header of messages to intercept + */ + size_t header_hash; + + /** + * The interception priority. The client should be + * consistent with the priority for conditions that + * are not mutually exclusive. + */ + int64_t priority; + + /** + * Whether the messages may get modified by the client + */ + int modifying; + } interception_condition_t; @@ -63,7 +63,7 @@ typedef struct interception_condition * @return The number of bytes to allocate to the output buffer */ __attribute__((pure, nonnull)) -size_t interception_condition_marshal_size(const interception_condition_t* restrict this); +size_t interception_condition_marshal_size(const interception_condition_t *restrict this); /** * Marshals an interception condition @@ -73,7 +73,7 @@ size_t interception_condition_marshal_size(const interception_condition_t* restr * @return The number of bytes that have been written (everything will be written) */ __attribute__((nonnull)) -size_t interception_condition_marshal(const interception_condition_t* restrict this, char* restrict data); +size_t interception_condition_marshal(const interception_condition_t *restrict this, char *restrict data); /** * Unmarshals an interception condition @@ -84,7 +84,7 @@ size_t interception_condition_marshal(const interception_condition_t* restrict t * number of read bytes. Destroy the interception condition on error. */ __attribute__((nonnull)) -size_t interception_condition_unmarshal(interception_condition_t* restrict this, char* restrict data); +size_t interception_condition_unmarshal(interception_condition_t *restrict this, char *restrict data); /** * Pretend to an interception condition @@ -93,8 +93,7 @@ size_t interception_condition_unmarshal(interception_condition_t* restrict this, * @return The number of read bytes */ __attribute__((pure, nonnull)) -size_t interception_condition_unmarshal_skip(char* restrict data); +size_t interception_condition_unmarshal_skip(char *restrict data); #endif - diff --git a/src/mds-server/interceptors.c b/src/mds-server/interceptors.c index fccb0fa..a39e0d6 100644 --- a/src/mds-server/interceptors.c +++ b/src/mds-server/interceptors.c @@ -38,28 +38,26 @@ * @param client The intercepting client * @param index The index of the condition */ -__attribute__((nonnull)) -static void remove_intercept_condition(client_t* client, size_t index) +static void __attribute__((nonnull)) +remove_intercept_condition(client_t *client, size_t index) { - interception_condition_t* conds = client->interception_conditions; - size_t n = client->interception_conditions_count; - - /* Remove the condition from the list. */ - free(conds[index].condition); - memmove(conds + index, conds + index + 1, --n - index); - client->interception_conditions_count--; - - /* Shrink the list. */ - if (client->interception_conditions_count == 0) - { - free(conds); - client->interception_conditions = NULL; - } - else - if (xrealloc(conds, n, interception_condition_t)) - xperror(*argv); - else - client->interception_conditions = conds; + interception_condition_t *conds = client->interception_conditions; + size_t n = client->interception_conditions_count; + + /* Remove the condition from the list. */ + free(conds[index].condition); + memmove(conds + index, conds + index + 1, --n - index); + client->interception_conditions_count--; + + /* Shrink the list. */ + if (!client->interception_conditions_count) { + free(conds); + client->interception_conditions = NULL; + } else if (xrealloc(conds, n, interception_condition_t)) { + xperror(*argv); + } else { + client->interception_conditions = conds; + } } @@ -72,106 +70,100 @@ static void remove_intercept_condition(client_t* client, size_t index) * @param modifying Whether the client may modify the messages * @param stop Whether the condition should be removed rather than added */ -void add_intercept_condition(client_t* client, char* condition, int64_t priority, int modifying, int stop) +void +add_intercept_condition(client_t *client, char *condition, int64_t priority, int modifying, int stop) { - size_t n = client->interception_conditions_count; - interception_condition_t* conds = client->interception_conditions; - ssize_t nonmodifying = -1; - char* header = condition; - char* colon = NULL; - char* value; - size_t hash; - size_t i; - - /* Split header and value apart. */ - if ((value = strchr(header, ':')) != NULL) - { - *value = '\0'; /* NUL-terminate header. */ - colon = value; /* End of header. */ - value += 2; /* Skip over delimiter. */ - } - - /* Calcuate header hash (comparison optimisation) */ - hash = string_hash(header); - /* Undo header–value splitting. */ - if (colon != NULL) - *colon = ':'; - - /* Remove of update condition of already registered, - also look for non-modifying condition to swap position - with for optimisation. */ - for (i = 0; i < n; i++) - { - if ((conds[i].header_hash != hash) || !strequals(conds[i].condition, condition)) - { - /* Look for the first non-modifying, this is a part of the - optimisation where we put all modifying conditions at the - beginning. */ - if ((nonmodifying < 0) && conds[i].modifying) - nonmodifying = (ssize_t)i; - continue; + size_t n = client->interception_conditions_count; + interception_condition_t *conds = client->interception_conditions; + ssize_t nonmodifying = -1; + char *header = condition; + char *colon = NULL; + char *value; + size_t hash, i; + interception_condition_t temp; + + /* Split header and value apart. */ + if ((value = strchr(header, ':'))) { + *value = '\0'; /* NUL-terminate header. */ + colon = value; /* End of header. */ + value += 2; /* Skip over delimiter. */ } - - if (stop) - remove_intercept_condition(client, i); - else - { - /* Update parameters. */ - conds[i].priority = priority; - conds[i].modifying = modifying; - - if (modifying && (nonmodifying >= 0)) - { - /* Optimisation: put conditions that are modifying - at the beginning. When a client is intercepting - we most know if any satisfying condition is - modifying. With this optimisation the first - satisfying condition will tell us if there is - any satisfying condition that is modifying. */ - interception_condition_t temp = conds[nonmodifying]; - conds[nonmodifying] = conds[i]; - conds[i] = temp; - } + + /* Calcuate header hash (comparison optimisation) */ + hash = string_hash(header); + /* Undo header–value splitting. */ + if (colon) + *colon = ':'; + + /* Remove of update condition of already registered, + also look for non-modifying condition to swap position + with for optimisation. */ + for (i = 0; i < n; i++) { + if ((conds[i].header_hash != hash) || !strequals(conds[i].condition, condition)) { + /* Look for the first non-modifying, this is a part of the + optimisation where we put all modifying conditions at the + beginning. */ + if (nonmodifying < 0 && conds[i].modifying) + nonmodifying = (ssize_t)i; + continue; + } + + if (stop) { + remove_intercept_condition(client, i); + } else { + /* Update parameters. */ + conds[i].priority = priority; + conds[i].modifying = modifying; + + if (modifying && nonmodifying >= 0) { + /* Optimisation: put conditions that are modifying + at the beginning. When a client is intercepting + we most know if any satisfying condition is + modifying. With this optimisation the first + satisfying condition will tell us if there is + any satisfying condition that is modifying. */ + temp = conds[nonmodifying]; + conds[nonmodifying] = conds[i]; + conds[i] = temp; + } + } + return; } - return; - } - - if (stop) - eprint("client tried to stop intercepting messages that it does not intercept."); - else - { - /* Duplicate condition string. */ - fail_if (xstrdup_nn(condition, condition)); - - /* Grow the interception condition list. */ - fail_if (xrealloc(conds, n + 1, interception_condition_t)); - client->interception_conditions = conds; - /* Store condition. */ - client->interception_conditions_count++; - conds[n].condition = condition; - conds[n].header_hash = hash; - conds[n].priority = priority; - conds[n].modifying = modifying; - - if (modifying && (nonmodifying >= 0)) - { - /* Optimisation: put conditions that are modifying - at the beginning. When a client is intercepting - we most know if any satisfying condition is - modifying. With this optimisation the first - satisfying condition will tell us if there is - any satisfying condition that is modifying. */ - interception_condition_t temp = conds[nonmodifying]; - conds[nonmodifying] = conds[n]; - conds[n] = temp; + + if (stop) { + eprint("client tried to stop intercepting messages that it does not intercept."); + } else { + /* Duplicate condition string. */ + fail_if (xstrdup_nn(condition, condition)); + + /* Grow the interception condition list. */ + fail_if (xrealloc(conds, n + 1, interception_condition_t)); + client->interception_conditions = conds; + /* Store condition. */ + client->interception_conditions_count++; + conds[n].condition = condition; + conds[n].header_hash = hash; + conds[n].priority = priority; + conds[n].modifying = modifying; + + if (modifying && nonmodifying >= 0) { + /* Optimisation: put conditions that are modifying + at the beginning. When a client is intercepting + we most know if any satisfying condition is + modifying. With this optimisation the first + satisfying condition will tell us if there is + any satisfying condition that is modifying. */ + temp = conds[nonmodifying]; + conds[nonmodifying] = conds[n]; + conds[n] = temp; + } } - } - return; - fail: - xperror(*argv); - free(condition); - return; + return; +fail: + xperror(*argv); + free(condition); + return; } @@ -185,18 +177,20 @@ void add_intercept_condition(client_t* client, char* condition, int64_t priority * @param count The number of accepted patterns * @return Evaluates to true if and only if a matching pattern was found */ -int is_condition_matching(interception_condition_t* cond, size_t* hashes, - char** keys, char** headers, size_t count) +int +is_condition_matching(interception_condition_t *cond, size_t *hashes, + char **keys, char **headers, size_t count) { - size_t i; - for (i = 0; i < count; i++) - if (*(cond->condition) == '\0') - return 1; - else if ((cond->header_hash == hashes[i]) && - (strequals(cond->condition, keys[i]) || - strequals(cond->condition, headers[i]))) - return 1; - return 0; + size_t i; + for (i = 0; i < count; i++) { + if (*cond->condition == '\0') + return 1; + else if ((cond->header_hash == hashes[i]) && + (strequals(cond->condition, keys[i]) || + strequals(cond->condition, headers[i]))) + return 1; + } + return 0; } @@ -211,33 +205,34 @@ int is_condition_matching(interception_condition_t* cond, size_t* hashes, * @param interception_out Storage slot for found interception * @return -1 on error, otherwise: evalutes to true iff a matching condition was found */ -int find_matching_condition(client_t* client, size_t* hashes, char** keys, char** headers, - size_t count, queued_interception_t* interception_out) +int +find_matching_condition(client_t *client, size_t *hashes, char **keys, char **headers, + size_t count, queued_interception_t *interception_out) { - pthread_mutex_t mutex = client->mutex; - interception_condition_t* conds = client->interception_conditions; - size_t n = 0, i; - - fail_if ((errno = pthread_mutex_lock(&(mutex)))); - - /* Look for a matching condition. */ - if (client->open) - n = client->interception_conditions_count; - for (i = 0; i < n; i++) - if (is_condition_matching(conds + i, hashes, keys, headers, count)) - { - /* Report matching condition. */ - interception_out->client = client; - interception_out->priority = conds[i].priority; - interception_out->modifying = conds[i].modifying; - break; - } - - pthread_mutex_unlock(&(mutex)); - - return i < n; - fail: - return -1; + pthread_mutex_t mutex = client->mutex; + interception_condition_t *conds = client->interception_conditions; + size_t n = 0, i; + + fail_if ((errno = pthread_mutex_lock(&(mutex)))); + + /* Look for a matching condition. */ + if (client->open) + n = client->interception_conditions_count; + for (i = 0; i < n; i++) { + if (is_condition_matching(conds + i, hashes, keys, headers, count)) { + /* Report matching condition. */ + interception_out->client = client; + interception_out->priority = conds[i].priority; + interception_out->modifying = conds[i].modifying; + break; + } + } + + pthread_mutex_unlock(&mutex); + + return i < n; +fail: + return -1; } @@ -252,44 +247,43 @@ int find_matching_condition(client_t* client, size_t* hashes, char** keys, char* * @param interceptions_count_out Slot at where to store the number of found interceptors * @return The found interceptors, `NULL` on error */ -queued_interception_t* get_interceptors(client_t* sender, size_t* hashes, char** keys, char** headers, - size_t count, size_t* interceptions_count_out) +queued_interception_t * +get_interceptors(client_t *sender, size_t *hashes, char **keys, char **headers, + size_t count, size_t *interceptions_count_out) { - queued_interception_t* interceptions = NULL; - size_t interceptions_count = 0, n = 0; - ssize_t node; - int saved_errno; - - /* Count clients. */ - foreach_linked_list_node (client_list, node) - n++; - - /* Allocate interceptor list. */ - fail_if (xmalloc(interceptions, n, queued_interception_t)); - - /* Search clients. */ - foreach_linked_list_node (client_list, node) - { - client_t* client = (client_t*)(void*)(client_list.values[node]); - - /* Look for and list a matching condition. */ - if (client->open && (client != sender)) - { - int r = find_matching_condition(client, hashes, keys, headers, count, - interceptions + interceptions_count); - fail_if (r == -1); - if (r) - /* List client of there was a matching condition. */ - interceptions_count++; + queued_interception_t *interceptions = NULL; + size_t interceptions_count = 0, n = 0; + ssize_t node; + int saved_errno, r; + client_t *client; + + /* Count clients. */ + foreach_linked_list_node (client_list, node) + n++; + + /* Allocate interceptor list. */ + fail_if (xmalloc(interceptions, n, queued_interception_t)); + + /* Search clients. */ + foreach_linked_list_node (client_list, node) { + client = (void *)(client_list.values[node]); + + /* Look for and list a matching condition. */ + if (client->open && (client != sender)) { + r = find_matching_condition(client, hashes, keys, headers, count, + interceptions + interceptions_count); + fail_if (r == -1); + if (r) + /* List client of there was a matching condition. */ + interceptions_count++; + } } - } - - *interceptions_count_out = interceptions_count; - return interceptions; - - fail: - saved_errno = errno; - free(interceptions); - return errno = saved_errno, NULL; -} + *interceptions_count_out = interceptions_count; + return interceptions; + +fail: + saved_errno = errno; + free(interceptions); + return errno = saved_errno, NULL; +} diff --git a/src/mds-server/interceptors.h b/src/mds-server/interceptors.h index d5cfc6f..e51d240 100644 --- a/src/mds-server/interceptors.h +++ b/src/mds-server/interceptors.h @@ -37,7 +37,7 @@ * @param stop Whether the condition should be removed rather than added */ __attribute__((nonnull)) -void add_intercept_condition(client_t* client, char* condition, int64_t priority, int modifying, int stop); +void add_intercept_condition(client_t *client, char *condition, int64_t priority, int modifying, int stop); /** @@ -51,8 +51,8 @@ void add_intercept_condition(client_t* client, char* condition, int64_t priority * @return Evaluates to true if and only if a matching pattern was found */ __attribute__((pure, nonnull(1))) -int is_condition_matching(interception_condition_t* cond, size_t* hashes, - char** keys, char** headers, size_t count); +int is_condition_matching(interception_condition_t *cond, size_t *hashes, + char **keys, char **headers, size_t count); /** @@ -67,8 +67,8 @@ int is_condition_matching(interception_condition_t* cond, size_t* hashes, * @return -1 on error, otherwise: evalutes to true iff a matching condition was found */ __attribute__((pure, nonnull(1, 6))) -int find_matching_condition(client_t* client, size_t* hashes, char** keys, char** headers, - size_t count, queued_interception_t* interception_out); +int find_matching_condition(client_t *client, size_t *hashes, char **keys, char **headers, + size_t count, queued_interception_t *interception_out); /** @@ -83,8 +83,7 @@ int find_matching_condition(client_t* client, size_t* hashes, char** keys, char* * @return The found interceptors, `NULL` on error */ __attribute__((pure, nonnull(1, 6))) -queued_interception_t* get_interceptors(client_t* sender, size_t* hashes, char** keys, char** headers, - size_t count, size_t* interceptions_count_out); +queued_interception_t *get_interceptors(client_t *sender, size_t *hashes, char **keys, char **headers, + size_t count, size_t *interceptions_count_out); #endif - diff --git a/src/mds-server/mds-server.c b/src/mds-server/mds-server.c index cfedf82..bb0e26c 100644 --- a/src/mds-server/mds-server.c +++ b/src/mds-server/mds-server.c @@ -51,29 +51,28 @@ * * This tells the server-base how to behave */ -server_characteristics_t server_characteristics = - { - .require_privileges = 0, - .require_display = 0, /* We will service one ourself. */ - .require_respawn_info = 1, - .sanity_check_argc = 1, - .fork_for_safety = 0, - .danger_is_deadly = 0 - }; - - - -#define __free(I) \ - if (I > 0) pthread_mutex_destroy(&slave_mutex); \ - if (I > 1) pthread_cond_destroy(&slave_cond); \ - if (I > 2) pthread_mutex_destroy(&modify_mutex); \ - if (I > 3) pthread_cond_destroy(&modify_cond); \ - if (I >= 4) hash_table_destroy(&modify_map, NULL, NULL); \ - if (I >= 5) fd_table_destroy(&client_map, NULL, NULL); \ - if (I >= 6) linked_list_destroy(&client_list) - -#define error_if(I, CONDITION) \ - if (CONDITION) { xperror(*argv); __free(I); return 1; } +server_characteristics_t server_characteristics = { + .require_privileges = 0, + .require_display = 0, /* We will service one ourself. */ + .require_respawn_info = 1, + .sanity_check_argc = 1, + .fork_for_safety = 0, + .danger_is_deadly = 0 +}; + + + +#define __free(I)\ + if (I > 0) pthread_mutex_destroy(&slave_mutex);\ + if (I > 1) pthread_cond_destroy(&slave_cond);\ + if (I > 2) pthread_mutex_destroy(&modify_mutex);\ + if (I > 3) pthread_cond_destroy(&modify_cond);\ + if (I >= 4) hash_table_destroy(&modify_map, NULL, NULL);\ + if (I >= 5) fd_table_destroy(&client_map, NULL, NULL);\ + if (I >= 6) linked_list_destroy(&client_list) + +#define error_if(I, CONDITION)\ + if (CONDITION) { xperror(*argv); __free(I); return 1; } /** @@ -82,74 +81,71 @@ server_characteristics_t server_characteristics = * * @return Non-zero on error */ -int preinitialise_server(void) +int +preinitialise_server(void) { - int unparsed_args_ptr = 1; - char* unparsed_args[ARGC_LIMIT + LIBEXEC_ARGC_EXTRA_LIMIT + 1]; - int i; - + int unparsed_args_ptr = 1; + char *unparsed_args[ARGC_LIMIT + LIBEXEC_ARGC_EXTRA_LIMIT + 1]; + char *arg; + int i; + pid_t pid; + #if (LIBEXEC_ARGC_EXTRA_LIMIT < 3) # error LIBEXEC_ARGC_EXTRA_LIMIT is too small, need at least 3. #endif - - - /* Parse command line arguments. */ - for (i = 1; i < argc; i++) - { - char* arg = argv[i]; - if (startswith(arg, "--socket-fd=")) /* Socket file descriptor. */ - { - exit_if (socket_fd != -1, - eprintf("duplicate declaration of %s.", "--socket-fd");); - exit_if (strict_atoi(arg += strlen("--socket-fd="), &socket_fd, 0, INT_MAX) < 0, - eprintf("invalid value for %s: %s.", "--socket-fd", arg);); + + + /* Parse command line arguments. */ + for (i = 1; i < argc; i++) { + arg = argv[i]; + if (startswith(arg, "--socket-fd=")) { /* Socket file descriptor. */ + exit_if (socket_fd != -1, + eprintf("duplicate declaration of %s.", "--socket-fd");); + exit_if (strict_atoi(arg += strlen("--socket-fd="), &socket_fd, 0, INT_MAX) < 0, + eprintf("invalid value for %s: %s.", "--socket-fd", arg);); + } else if (startswith(arg, "--alarm=")) { /* Schedule an alarm signal for forced abort. */ + alarm((unsigned)min(atou(arg + strlen("--alarm=")), 60)); /* At most 1 minute. */ + } else if (!strequals(arg, "--initial-spawn") && !strequals(arg, "--respawn")) { + /* Not recognised, it is probably for another server. */ + unparsed_args[unparsed_args_ptr++] = arg; + } } - else if (startswith(arg, "--alarm=")) /* Schedule an alarm signal for forced abort. */ - alarm((unsigned)min(atou(arg + strlen("--alarm=")), 60)); /* At most 1 minute. */ - else - if (!strequals(arg, "--initial-spawn") && !strequals(arg, "--respawn")) - /* Not recognised, it is probably for another server. */ - unparsed_args[unparsed_args_ptr++] = arg; - } - unparsed_args[unparsed_args_ptr] = NULL; - - /* Check that mandatory arguments have been specified. */ - exit_if (socket_fd < 0, - eprint("missing socket file descriptor argument.");); - - - /* Run mdsinitrc. */ - if (is_respawn == 0) - { - pid_t pid = fork(); - fail_if (pid == (pid_t)-1); - - if (pid == 0) /* Child process exec:s, the parent continues without waiting for it. */ - { - /* Close all files except stdin, stdout and stderr. */ - close_files((fd > 2) || (fd == socket_fd)); - - /* Run mdsinitrc. */ - run_initrc(unparsed_args); /* Does not return. */ + unparsed_args[unparsed_args_ptr] = NULL; + + /* Check that mandatory arguments have been specified. */ + exit_if (socket_fd < 0, eprint("missing socket file descriptor argument.");); + + + /* Run mdsinitrc. */ + if (!is_respawn) { + pid = fork(); + fail_if (pid == (pid_t)-1); + + if (!pid) { /* Child process exec:s, the parent continues without waiting for it. */ + /* Close all files except stdin, stdout and stderr. */ + close_files(fd > 2 || fd == socket_fd); + + /* Run mdsinitrc. */ + run_initrc(unparsed_args); /* Does not return. */ + } } - } - - - /* Create mutex and condition for slave counter. */ - error_if (0, (errno = pthread_mutex_init(&slave_mutex, NULL))); - error_if (1, (errno = pthread_cond_init(&slave_cond, NULL))); - - /* Create mutex, condition and map for message modification. */ - error_if (2, (errno = pthread_mutex_init(&modify_mutex, NULL))); - error_if (3, (errno = pthread_cond_init(&modify_cond, NULL))); - error_if (4, hash_table_create(&modify_map)); - - - return 0; - - fail: - xperror(*argv); - return 1; + + + /* Create mutex and condition for slave counter. */ + error_if (0, (errno = pthread_mutex_init(&slave_mutex, NULL))); + error_if (1, (errno = pthread_cond_init(&slave_cond, NULL))); + + /* Create mutex, condition and map for message modification. */ + error_if (2, (errno = pthread_mutex_init(&modify_mutex, NULL))); + error_if (3, (errno = pthread_cond_init(&modify_cond, NULL))); + error_if (4, hash_table_create(&modify_map)); + + + return 0; + +fail: + xperror(*argv); + return 1; } @@ -159,13 +155,14 @@ int preinitialise_server(void) * * @return Non-zero on error */ -int initialise_server(void) +int +initialise_server(void) { - /* Create list and table of clients. */ - error_if (5, fd_table_create(&client_map)); - error_if (6, linked_list_create(&client_list, 32)); - - return 0; + /* Create list and table of clients. */ + error_if (5, fd_table_create(&client_map)); + error_if (6, linked_list_create(&client_list, 32)); + + return 0; } @@ -175,10 +172,11 @@ int initialise_server(void) * * @return Non-zero on error */ -int __attribute__((const)) postinitialise_server(void) +int __attribute__((const)) +postinitialise_server(void) { - /* We do not need to initialise anything else. */ - return 0; + /* We do not need to initialise anything else. */ + return 0; } @@ -187,33 +185,31 @@ int __attribute__((const)) postinitialise_server(void) * * @return Non-zero on error */ -int master_loop(void) +int +master_loop(void) { - /* Accepting incoming connections and take care of dangers. */ - while (running && (terminating == 0)) - { - if (danger) - { - danger = 0; - with_mutex (slave_mutex, linked_list_pack(&client_list);); + /* Accepting incoming connections and take care of dangers. */ + while (running && !terminating) { + if (danger) { + danger = 0; + with_mutex (slave_mutex, linked_list_pack(&client_list);); + } + + if (accept_connection() == 1) + break; } - - if (accept_connection() == 1) - break; - } - - /* Join with all slaves threads. */ - with_mutex (slave_mutex, - while (running_slaves > 0) - pthread_cond_wait(&slave_cond, &slave_mutex);); - - if (reexecing == 0) - { - /* Release resources. */ - __free(9999); - } - - return 0; + + /* Join with all slaves threads. */ + with_mutex (slave_mutex, + while (running_slaves > 0) + pthread_cond_wait(&slave_cond, &slave_mutex);); + + if (!reexecing) { + /* Release resources. */ + __free(9999); + } + + return 0; } @@ -226,29 +222,29 @@ int master_loop(void) * * @return Zero normally, 1 if terminating */ -int accept_connection(void) +int +accept_connection(void) { - pthread_t slave_thread; - int client_fd; - - /* Accept connection. */ - client_fd = accept(socket_fd, NULL, NULL); - if (client_fd >= 0) - { - /* Increase number of running slaves. */ - with_mutex (slave_mutex, running_slaves++;); - - /* Start slave thread. */ - create_slave(&slave_thread, client_fd); - } - else - /* Handle errors and shutdown. */ - if ((errno == EINTR) && terminating) /* Interrupted for termination. */ - return 1; - else if ((errno == ECONNABORTED) || (errno == EINVAL)) /* Closing. */ - running = 0; - else if (errno != EINTR) /* Error. */ - xperror(*argv); + pthread_t slave_thread; + int client_fd; + + /* Accept connection. */ + client_fd = accept(socket_fd, NULL, NULL); + if (client_fd >= 0) { + /* Increase number of running slaves. */ + with_mutex (slave_mutex, running_slaves++;); + + /* Start slave thread. */ + create_slave(&slave_thread, client_fd); + } else { + /* Handle errors and shutdown. */ + if (errno == EINTR && terminating) /* Interrupted for termination. */ + return 1; + else if (errno == ECONNABORTED || errno == EINVAL) /* Closing. */ + running = 0; + else if (errno != EINTR) /* Error. */ + xperror(*argv); + } return 0; } @@ -259,108 +255,105 @@ int accept_connection(void) * @param data Input data * @return Output data */ -void* slave_loop(void* data) +void * +slave_loop(void *data) { - int slave_fd = (int)(intptr_t)data; - size_t information_address = fd_table_get(&client_map, (size_t)slave_fd); - client_t* information = (client_t*)(void*)information_address; - char* msgbuf = NULL; - char buf[] = "To: all"; - size_t n; - int r; - - - if (information == NULL) /* Did not re-exec. */ - { - /* Initialise the client. */ - fail_if ((information = initialise_client(slave_fd)) == NULL); - - /* Register client to receive broadcasts. */ - add_intercept_condition(information, buf, 0, 0, 0); - } - - /* Store slave thread and create mutexes and conditions. */ - fail_if (client_initialise_threading(information)); - - /* Set up traps for especially handled signals. */ - fail_if (trap_signals() < 0); - - - /* Fetch messages from the slave. */ - while ((terminating == 0) && information->open) - { - /* Send queued multicast messages. */ - send_multicast_queue(information); - - /* Send queued messages. */ - send_reply_queue(information); - - - /* Fetch message. */ - r = fetch_message(information); - if ((r == 0) && message_received(information)) - goto terminate; - else if (r == -2) + int slave_fd = (int)(intptr_t)data; + size_t information_address = fd_table_get(&client_map, (size_t)slave_fd); + client_t *information = (void *)information_address; + char *msgbuf = NULL; + char buf[] = "To: all"; + size_t n; + int r; + + + if (!information) { /* Did not re-exec. */ + /* Initialise the client. */ + fail_if (!(information = initialise_client(slave_fd))); + + /* Register client to receive broadcasts. */ + add_intercept_condition(information, buf, 0, 0, 0); + } + + /* Store slave thread and create mutexes and conditions. */ + fail_if (client_initialise_threading(information)); + + /* Set up traps for especially handled signals. */ + fail_if (trap_signals() < 0); + + + /* Fetch messages from the slave. */ + while (!terminating && information->open) { + /* Send queued multicast messages. */ + send_multicast_queue(information); + + /* Send queued messages. */ + send_reply_queue(information); + + /* Fetch message. */ + r = fetch_message(information); + if (!r && message_received(information)) + goto terminate; + else if (r == -2) + goto done; + else if (r && errno == EINTR && terminating) + goto terminate; /* Stop the thread if we are re-exec:ing or terminating the server. */ + } + /* Stop the thread if we are re-exec:ing or terminating the server. */ + if (terminating) + goto terminate; + + + /* Multicast information about the client closing. */ + n = 2 * 10 + 1 + strlen("Client closed: :\n\n"); + fail_if (xmalloc(msgbuf, n, char)); + snprintf(msgbuf, n, + "Client closed: %" PRIu32 ":%" PRIu32 "\n" + "\n", + (uint32_t)(information->id >> 32), + (uint32_t)(information->id >> 0)); + n = strlen(msgbuf); + queue_message_multicast(msgbuf, n, information); + msgbuf = NULL; + send_multicast_queue(information); + + +terminate: /* This done on success as well. */ + if (reexecing) + goto reexec; + +done: + /* Close socket and free resources. */ + xclose(slave_fd); + free(msgbuf); + if (information) { + /* Unlist and free client. */ + with_mutex (slave_mutex, linked_list_remove(&client_list, information->list_entry);); + client_destroy(information); + } + + /* Unmap client and decrease the slave count. */ + with_mutex (slave_mutex, + fd_table_remove(&client_map, slave_fd); + running_slaves--; + pthread_cond_signal(&slave_cond);); + return NULL; + + +fail: + xperror(*argv); goto done; - else if (r && (errno == EINTR) && terminating) - goto terminate; /* Stop the thread if we are re-exec:ing or terminating the server. */ - } - /* Stop the thread if we are re-exec:ing or terminating the server. */ - if (terminating) - goto terminate; - - - /* Multicast information about the client closing. */ - n = 2 * 10 + 1 + strlen("Client closed: :\n\n"); - fail_if (xmalloc(msgbuf, n, char)); - snprintf(msgbuf, n, - "Client closed: %" PRIu32 ":%" PRIu32 "\n" - "\n", - (uint32_t)(information->id >> 32), - (uint32_t)(information->id >> 0)); - n = strlen(msgbuf); - queue_message_multicast(msgbuf, n, information); - msgbuf = NULL; - send_multicast_queue(information); - - - terminate: /* This done on success as well. */ - if (reexecing) - goto reexec; - - done: - /* Close socket and free resources. */ - xclose(slave_fd); - free(msgbuf); - if (information != NULL) - { - /* Unlist and free client. */ - with_mutex (slave_mutex, linked_list_remove(&client_list, information->list_entry);); - client_destroy(information); - } - - /* Unmap client and decrease the slave count. */ - with_mutex (slave_mutex, - fd_table_remove(&client_map, slave_fd); - running_slaves--; - pthread_cond_signal(&slave_cond);); - return NULL; - - - fail: - xperror(*argv); - goto done; - - - reexec: - /* Tell the master thread that the slave has closed, - this is done because re-exec causes a race-condition - between the acception of a slave and the execution - of the the slave thread. */ - with_mutex (slave_mutex, - running_slaves--; - pthread_cond_signal(&slave_cond);); - return NULL; + + +reexec: + /* Tell the master thread that the slave has closed, + this is done because re-exec causes a race-condition + between the acception of a slave and the execution + of the the slave thread. */ + with_mutex (slave_mutex, + running_slaves--; + pthread_cond_signal(&slave_cond);); + return NULL; } @@ -371,13 +364,12 @@ void* slave_loop(void* data) * @param b:const queued_interception_t* The other of the two interceptors * @return Negative if a before b, positive if a after b, otherwise zero */ -__attribute__((nonnull)) -static int cmp_queued_interception(const void* a, const void* b) +static int __attribute__((nonnull)) +cmp_queued_interception(const void *a, const void *b) { - const queued_interception_t* p = b; /* Highest first, so swap them. */ - const queued_interception_t* q = a; - return p->priority < q->priority ? -1 : - p->priority > q->priority ? 1 : 0; + const queued_interception_t *p = b; /* Highest first, so swap them. */ + const queued_interception_t *q = a; + return p->priority < q->priority ? -1 : p->priority > q->priority; } @@ -388,127 +380,127 @@ static int cmp_queued_interception(const void* a, const void* b) * @param length The length of the message * @param sender The original sender of the message */ -void queue_message_multicast(char* message, size_t length, client_t* sender) +void +queue_message_multicast(char *message, size_t length, client_t *sender) { - char* msg = message; - size_t header_count = 0; - size_t n = length - 1; - size_t* hashes = NULL; - char** headers = NULL; - char** header_values = NULL; - queued_interception_t* interceptions = NULL; - size_t interceptions_count = 0; - multicast_t* multicast = NULL; - size_t i; - uint64_t modify_id; - char modify_id_header[13 + 3 * sizeof(uint64_t)]; - void* new_buf; - int saved_errno; - - /* Count the number of headers. */ - for (i = 0; i < n; i++) - if (message[i] == '\n') - if (header_count++, message[i + 1] == '\n') - break; - - if (header_count == 0) - return; /* Invalid message. */ - - /* Allocate multicast message. */ - fail_if (xmalloc(multicast, 1, multicast_t)); - multicast_initialise(multicast); - - /* Allocate header lists. */ - fail_if (xmalloc(hashes, header_count, size_t)); - fail_if (xmalloc(headers, header_count, char*)); - fail_if (xmalloc(header_values, header_count, char*)); - - /* Populate header lists. */ - for (i = 0; i < header_count; i++) - { - char* end = strchr(msg, '\n'); - char* colon = strchr(msg, ':'); - - *end = '\0'; - if (xstrdup(header_values[i], msg)) - { - header_count = i; - fail_if (1); - } - *colon = '\0'; - if (xstrdup(headers[i], msg)) - { - saved_errno = errno, free(headers[i]), errno = saved_errno; - header_count = i; - fail_if (1); - } - *colon = ':'; - *end = '\n'; - hashes[i] = string_hash(headers[i]); - - msg = end + 1; - } - - /* Get intercepting clients. */ - pthread_mutex_lock(&(slave_mutex)); - interceptions = get_interceptors(sender, hashes, headers, header_values, header_count, &interceptions_count); - pthread_mutex_unlock(&(slave_mutex)); - fail_if (interceptions == NULL); - - /* Sort interceptors. */ - qsort(interceptions, interceptions_count, sizeof(queued_interception_t), cmp_queued_interception); - - /* Add prefix to message with ‘Modify ID’ header. */ - with_mutex (slave_mutex, - modify_id = next_modify_id++; - if (next_modify_id == 0) - next_modify_id = 1; - ); - xsnprintf(modify_id_header, "Modify ID: %" PRIu64 "\n", modify_id); - n = strlen(modify_id_header); - new_buf = message; - fail_if (xrealloc(new_buf, n + length, char)); - message = new_buf; - memmove(message + n, message, length * sizeof(char)); - memcpy(message, modify_id_header, n * sizeof(char)); - - /* Store information. */ - multicast->interceptions = interceptions; - multicast->interceptions_count = interceptions_count; - multicast->message = message; - multicast->message_length = length + n; - multicast->message_prefix = n; - message = NULL; - -#define fail fail_in_mutex - /* Queue message multicasting. */ - with_mutex (sender->mutex, - new_buf = sender->multicasts; - fail_if (xrealloc(new_buf, sender->multicasts_count + 1, multicast_t)); - sender->multicasts = new_buf; - sender->multicasts[sender->multicasts_count++] = *multicast; - free(multicast); - multicast = NULL; - errno = 0; - fail_in_mutex: - xperror(*argv); - ); + char *msg = message; + size_t header_count = 0; + size_t n = length - 1; + size_t *hashes = NULL; + char **headers = NULL; + char **header_values = NULL; + queued_interception_t *interceptions = NULL; + size_t interceptions_count = 0; + multicast_t *multicast = NULL; + size_t i; + uint64_t modify_id; + char modify_id_header[13 + 3 * sizeof(uint64_t)]; + void *new_buf; + int saved_errno; + char *end, *colon; + + /* Count the number of headers. */ + for (i = 0; i < n; i++) + if (message[i] == '\n') + if (header_count++, message[i + 1] == '\n') + break; + + if (!header_count) + return; /* Invalid message. */ + + /* Allocate multicast message. */ + fail_if (xmalloc(multicast, 1, multicast_t)); + multicast_initialise(multicast); + + /* Allocate header lists. */ + fail_if (xmalloc(hashes, header_count, size_t)); + fail_if (xmalloc(headers, header_count, char *)); + fail_if (xmalloc(header_values, header_count, char *)); + + /* Populate header lists. */ + for (i = 0; i < header_count; i++) + { + end = strchr(msg, '\n'); + colon = strchr(msg, ':'); + + *end = '\0'; + if (xstrdup(header_values[i], msg)) { + header_count = i; + fail_if (1); + } + *colon = '\0'; + if (xstrdup(headers[i], msg)) { + saved_errno = errno, free(headers[i]), errno = saved_errno; + header_count = i; + fail_if (1); + } + *colon = ':'; + *end = '\n'; + hashes[i] = string_hash(headers[i]); + + msg = end + 1; + } + + /* Get intercepting clients. */ + pthread_mutex_lock(&(slave_mutex)); + interceptions = get_interceptors(sender, hashes, headers, header_values, header_count, &interceptions_count); + pthread_mutex_unlock(&(slave_mutex)); + fail_if (!interceptions); + + /* Sort interceptors. */ + qsort(interceptions, interceptions_count, sizeof(queued_interception_t), cmp_queued_interception); + + /* Add prefix to message with ‘Modify ID’ header. */ + with_mutex (slave_mutex, + modify_id = next_modify_id++; + if (!next_modify_id) + next_modify_id = 1; + ); + xsnprintf(modify_id_header, "Modify ID: %" PRIu64 "\n", modify_id); + n = strlen(modify_id_header); + new_buf = message; + fail_if (xrealloc(new_buf, n + length, char)); + message = new_buf; + memmove(message + n, message, length * sizeof(char)); + memcpy(message, modify_id_header, n * sizeof(char)); + + /* Store information. */ + multicast->interceptions = interceptions; + multicast->interceptions_count = interceptions_count; + multicast->message = message; + multicast->message_length = length + n; + multicast->message_prefix = n; + message = NULL; + +#define fail fail_in_mutex + /* Queue message multicasting. */ + with_mutex (sender->mutex, + new_buf = sender->multicasts; + fail_if (xrealloc(new_buf, sender->multicasts_count + 1, multicast_t)); + sender->multicasts = new_buf; + sender->multicasts[sender->multicasts_count++] = *multicast; + free(multicast); + multicast = NULL; + errno = 0; + fail_in_mutex: + xperror(*argv); + ); #undef fail - - done: - /* Release resources. */ - xfree(headers, header_count); - xfree(header_values, header_count); - free(hashes); - free(message); - if (multicast != NULL) - multicast_destroy(multicast); - free(multicast); - return; - - fail: - xperror(*argv); - goto done; + +done: + /* Release resources. */ + xfree(headers, header_count); + xfree(header_values, header_count); + free(hashes); + free(message); + if (multicast) + multicast_destroy(multicast); + free(multicast); + return; + +fail: + xperror(*argv); + goto done; } @@ -517,70 +509,59 @@ void queue_message_multicast(char* message, size_t length, client_t* sender) * * @param args The arguments to the child process */ -void run_initrc(char** args) +void +run_initrc(char **args) { - char pathname[PATH_MAX]; - struct passwd* pwd; - char* env; - char* home; - args[0] = pathname; - -#define __exec(FORMAT, ...) \ - xsnprintf(pathname, FORMAT, __VA_ARGS__); execv(args[0], args) - - /* Test $XDG_CONFIG_HOME. */ - if ((env = getenv_nonempty("XDG_CONFIG_HOME")) != NULL) - { - __exec("%s/%s", env, INITRC_FILE); - } - - /* Test $HOME. */ - if ((env = getenv_nonempty("HOME")) != NULL) - { - __exec("%s/.config/%s", env, INITRC_FILE); - __exec("%s/.%s", env, INITRC_FILE); - } - - /* Test ~. */ - pwd = getpwuid(getuid()); /* Ignore error. */ - if (pwd != NULL) - { - home = pwd->pw_dir; - if ((home != NULL) && (*home != '\0')) - { - __exec("%s/.config/%s", home, INITRC_FILE); - __exec("%s/.%s", home, INITRC_FILE); + char pathname[PATH_MAX]; + struct passwd *pwd; + char *env, *home, *begin, *end; + int len; + + args[0] = pathname; + +#define __exec(FORMAT, ...)\ + (xsnprintf(pathname, FORMAT, __VA_ARGS__), execv(args[0], args)) + + /* Test $XDG_CONFIG_HOME. */ + if ((env = getenv_nonempty("XDG_CONFIG_HOME"))) + __exec("%s/%s", env, INITRC_FILE); + + /* Test $HOME. */ + if ((env = getenv_nonempty("HOME"))) { + __exec("%s/.config/%s", env, INITRC_FILE); + __exec("%s/.%s", env, INITRC_FILE); } - } - - /* Test $XDG_CONFIG_DIRS. */ - if ((env = getenv_nonempty("XDG_CONFIG_DIRS")) != NULL) - { - char* begin = env; - char* end; - int len; - for (;;) - { - end = strchrnul(begin, ':'); - len = (int)(end - begin); - if (len > 0) - { - __exec("%.*s/%s", len, begin, INITRC_FILE); - } - if (*end == '\0') - break; - begin = end + 1; + + /* Test ~. */ + pwd = getpwuid(getuid()); /* Ignore error. */ + if (pwd) { + home = pwd->pw_dir; + if (home && *home) { + __exec("%s/.config/%s", home, INITRC_FILE); + __exec("%s/.%s", home, INITRC_FILE); + } + } + + /* Test $XDG_CONFIG_DIRS. */ + if ((env = getenv_nonempty("XDG_CONFIG_DIRS"))) { + for (begin = env;;) { + end = strchrnul(begin, ':'); + len = (int)(end - begin); + if (len > 0) + __exec("%.*s/%s", len, begin, INITRC_FILE); + if (!*end) + break; + begin = end + 1; + } } - } - - /* Test /etc. */ - __exec("%s/%s", SYSCONFDIR, INITRC_FILE); + + /* Test /etc. */ + __exec("%s/%s", SYSCONFDIR, INITRC_FILE); #undef __exec - - /* Everything failed. */ - eprintf("unable to run %s file, you might as well kill me.", INITRC_FILE); - /* (‘me’ actually refers to the parant, whence it will to be coming.) */ - exit(0); -} + /* Everything failed. */ + eprintf("unable to run %s file, you might as well kill me.", INITRC_FILE); + /* (‘me’ actually refers to the parant, whence it will to be coming.) */ + exit(0); +} diff --git a/src/mds-server/mds-server.h b/src/mds-server/mds-server.h index 44c9acc..50c18ac 100644 --- a/src/mds-server/mds-server.h +++ b/src/mds-server/mds-server.h @@ -37,7 +37,7 @@ int accept_connection(void); * @param data Input data * @return Output data */ -void* slave_loop(void* data); +void *slave_loop(void *data); /** * Queue a message for multicasting @@ -47,7 +47,7 @@ void* slave_loop(void* data); * @param sender The original sender of the message */ __attribute__((nonnull)) -void queue_message_multicast(char* message, size_t length, client_t* sender); +void queue_message_multicast(char *message, size_t length, client_t *sender); /** * Exec into the mdsinitrc script @@ -55,8 +55,7 @@ void queue_message_multicast(char* message, size_t length, client_t* sender); * @param args The arguments to the child process */ __attribute__((noreturn, nonnull)) -void run_initrc(char** args); +void run_initrc(char **args); #endif - diff --git a/src/mds-server/multicast.c b/src/mds-server/multicast.c index d30530f..a71ddf4 100644 --- a/src/mds-server/multicast.c +++ b/src/mds-server/multicast.c @@ -30,15 +30,16 @@ * * @param this The message multicast state */ -void multicast_initialise(multicast_t* restrict this) +void +multicast_initialise(multicast_t *restrict this) { - this->interceptions = NULL; - this->interceptions_count = 0; - this->interceptions_ptr = 0; - this->message = NULL; - this->message_length = 0; - this->message_ptr = 0; - this->message_prefix = 0; + this->interceptions = NULL; + this->interceptions_count = 0; + this->interceptions_ptr = 0; + this->message = NULL; + this->message_length = 0; + this->message_ptr = 0; + this->message_prefix = 0; } @@ -47,10 +48,11 @@ void multicast_initialise(multicast_t* restrict this) * * @param this The message multicast state */ -void multicast_destroy(multicast_t* restrict this) +void +multicast_destroy(multicast_t *restrict this) { - free(this->interceptions); - free(this->message); + free(this->interceptions); + free(this->message); } @@ -60,13 +62,13 @@ void multicast_destroy(multicast_t* restrict this) * @param this The client information * @return The number of bytes to allocate to the output buffer */ -size_t multicast_marshal_size(const multicast_t* restrict this) +size_t +multicast_marshal_size(const multicast_t *restrict this) { - size_t rc = sizeof(int) + 5 * sizeof(size_t) + this->message_length * sizeof(char); - size_t i; - for (i = 0; i < this->interceptions_count; i++) - rc += queued_interception_marshal_size(); - return rc; + size_t i, rc = sizeof(int) + 5 * sizeof(size_t) + this->message_length * sizeof(char); + for (i = 0; i < this->interceptions_count; i++) + rc += queued_interception_marshal_size(); + return rc; } @@ -77,28 +79,26 @@ size_t multicast_marshal_size(const multicast_t* restrict this) * @param data Output buffer for the marshalled data * @return The number of bytes that have been written (everything will be written) */ -size_t multicast_marshal(const multicast_t* restrict this, char* restrict data) +size_t +multicast_marshal(const multicast_t *restrict this, char *restrict data) { - size_t rc = sizeof(int) + 5 * sizeof(size_t); - size_t i, n; - buf_set_next(data, int, MULTICAST_T_VERSION); - buf_set_next(data, size_t, this->interceptions_count); - buf_set_next(data, size_t, this->interceptions_ptr); - buf_set_next(data, size_t, this->message_length); - buf_set_next(data, size_t, this->message_ptr); - buf_set_next(data, size_t, this->message_prefix); - for (i = 0; i < this->interceptions_count; i++) - { - n = queued_interception_marshal(this->interceptions + i, data); - data += n / sizeof(char); - rc += n; - } - if (this->message_length > 0) - { - memcpy(data, this->message, this->message_length * sizeof(char)); - rc += this->message_length * sizeof(char); - } - return rc; + size_t i, n, rc = sizeof(int) + 5 * sizeof(size_t); + buf_set_next(data, int, MULTICAST_T_VERSION); + buf_set_next(data, size_t, this->interceptions_count); + buf_set_next(data, size_t, this->interceptions_ptr); + buf_set_next(data, size_t, this->message_length); + buf_set_next(data, size_t, this->message_ptr); + buf_set_next(data, size_t, this->message_prefix); + for (i = 0; i < this->interceptions_count; i++) { + n = queued_interception_marshal(this->interceptions + i, data); + data += n / sizeof(char); + rc += n; + } + if (this->message_length > 0) { + memcpy(data, this->message, this->message_length * sizeof(char)); + rc += this->message_length * sizeof(char); + } + return rc; } @@ -110,35 +110,33 @@ size_t multicast_marshal(const multicast_t* restrict this, char* restrict data) * @return Zero on error, `errno` will be set accordingly, otherwise the * number of read bytes. Destroy the client information on error. */ -size_t multicast_unmarshal(multicast_t* restrict this, char* restrict data) +size_t +multicast_unmarshal(multicast_t *restrict this, char *restrict data) { - size_t rc = sizeof(int) + 5 * sizeof(size_t); - size_t i, n; - this->interceptions = NULL; - this->message = NULL; - /* buf_get_next(data, int, MULTICAST_T_VERSION); */ - buf_next(data, int, 1); - buf_get_next(data, size_t, this->interceptions_count); - buf_get_next(data, size_t, this->interceptions_ptr); - buf_get_next(data, size_t, this->message_length); - buf_get_next(data, size_t, this->message_ptr); - buf_get_next(data, size_t, this->message_prefix); - if (this->interceptions_count > 0) - fail_if (xmalloc(this->interceptions, this->interceptions_count, queued_interception_t)); - for (i = 0; i < this->interceptions_count; i++) - { - n = queued_interception_unmarshal(this->interceptions + i, data); - data += n / sizeof(char); - rc += n; - } - if (this->message_length > 0) - { - fail_if (xmemdup(this->message, data, this->message_length, char)); - rc += this->message_length * sizeof(char); - } - return rc; - fail: - return 0; + size_t i, n, rc = sizeof(int) + 5 * sizeof(size_t); + this->interceptions = NULL; + this->message = NULL; + /* buf_get_next(data, int, MULTICAST_T_VERSION); */ + buf_next(data, int, 1); + buf_get_next(data, size_t, this->interceptions_count); + buf_get_next(data, size_t, this->interceptions_ptr); + buf_get_next(data, size_t, this->message_length); + buf_get_next(data, size_t, this->message_ptr); + buf_get_next(data, size_t, this->message_prefix); + if (this->interceptions_count > 0) + fail_if (xmalloc(this->interceptions, this->interceptions_count, queued_interception_t)); + for (i = 0; i < this->interceptions_count; i++) { + n = queued_interception_unmarshal(this->interceptions + i, data); + data += n / sizeof(char); + rc += n; + } + if (this->message_length > 0) { + fail_if (xmemdup(this->message, data, this->message_length, char)); + rc += this->message_length * sizeof(char); + } + return rc; +fail: + return 0; } @@ -148,18 +146,16 @@ size_t multicast_unmarshal(multicast_t* restrict this, char* restrict data) * @param data In buffer with the marshalled data * @return The number of read bytes */ -size_t multicast_unmarshal_skip(char* restrict data) +size_t +multicast_unmarshal_skip(char *restrict data) { - size_t interceptions_count = buf_cast(data, size_t, 0); - size_t message_length = buf_cast(data, size_t, 2); - size_t rc = sizeof(int) + 5 * sizeof(size_t) + message_length * sizeof(char); - size_t n; - while (interceptions_count--) - { - n = queued_interception_unmarshal_skip(); - data += n / sizeof(char); - rc += n; - } - return rc; + size_t interceptions_count = buf_cast(data, size_t, 0); + size_t message_length = buf_cast(data, size_t, 2); + size_t n, rc = sizeof(int) + 5 * sizeof(size_t) + message_length * sizeof(char); + while (interceptions_count--) { + n = queued_interception_unmarshal_skip(); + data += n / sizeof(char); + rc += n; + } + return rc; } - diff --git a/src/mds-server/multicast.h b/src/mds-server/multicast.h index 0cd20e1..24dc704 100644 --- a/src/mds-server/multicast.h +++ b/src/mds-server/multicast.h @@ -22,48 +22,48 @@ #include "queued-interception.h" -#define MULTICAST_T_VERSION 0 +#define MULTICAST_T_VERSION 0 /** * Message multicast state */ typedef struct multicast { - /** - * Queue of clients that is listening this message - */ - struct queued_interception* interceptions; - - /** - * The number of clients in `interceptions` - */ - size_t interceptions_count; - - /** - * The index of the current/next client in `interceptions` to whom to send the message - */ - size_t interceptions_ptr; - - /** - * The message to send - */ - char* message; - - /** - * The length of `message` - */ - size_t message_length; - - /** - * How much of the message that has already been sent to the current recipient - */ - size_t message_ptr; - - /** - * How much of the message to skip if the recipient is not a modifier - */ - size_t message_prefix; - + /** + * Queue of clients that is listening this message + */ + struct queued_interception *interceptions; + + /** + * The number of clients in `interceptions` + */ + size_t interceptions_count; + + /** + * The index of the current/next client in `interceptions` to whom to send the message + */ + size_t interceptions_ptr; + + /** + * The message to send + */ + char *message; + + /** + * The length of `message` + */ + size_t message_length; + + /** + * How much of the message that has already been sent to the current recipient + */ + size_t message_ptr; + + /** + * How much of the message to skip if the recipient is not a modifier + */ + size_t message_prefix; + } multicast_t; @@ -73,7 +73,7 @@ typedef struct multicast * @param this The message multicast state */ __attribute__((nonnull)) -void multicast_initialise(multicast_t* restrict this); +void multicast_initialise(multicast_t *restrict this); /** * Destroy a message multicast state @@ -81,7 +81,7 @@ void multicast_initialise(multicast_t* restrict this); * @param this The message multicast state */ __attribute__((nonnull)) -void multicast_destroy(multicast_t* restrict this); +void multicast_destroy(multicast_t *restrict this); /** * Calculate the buffer size need to marshal a message multicast state @@ -90,7 +90,7 @@ void multicast_destroy(multicast_t* restrict this); * @return The number of bytes to allocate to the output buffer */ __attribute__((pure, nonnull)) -size_t multicast_marshal_size(const multicast_t* restrict this); +size_t multicast_marshal_size(const multicast_t *restrict this); /** * Marshals a message multicast state @@ -100,7 +100,7 @@ size_t multicast_marshal_size(const multicast_t* restrict this); * @return The number of bytes that have been written (everything will be written) */ __attribute__((nonnull)) -size_t multicast_marshal(const multicast_t* restrict this, char* restrict data); +size_t multicast_marshal(const multicast_t *restrict this, char *restrict data); /** * Unmarshals a message multicast state @@ -111,7 +111,7 @@ size_t multicast_marshal(const multicast_t* restrict this, char* restrict data); * number of read bytes. Destroy the message multicast state on error. */ __attribute__((nonnull)) -size_t multicast_unmarshal(multicast_t* restrict this, char* restrict data); +size_t multicast_unmarshal(multicast_t *restrict this, char *restrict data); /** * Pretend to unmarshal a message multicast state @@ -120,9 +120,8 @@ size_t multicast_unmarshal(multicast_t* restrict this, char* restrict data); * @return The number of read bytes */ __attribute__((pure, nonnull)) -size_t multicast_unmarshal_skip(char* restrict data); +size_t multicast_unmarshal_skip(char *restrict data); #endif - diff --git a/src/mds-server/queued-interception.c b/src/mds-server/queued-interception.c index ff00339..b74eb11 100644 --- a/src/mds-server/queued-interception.c +++ b/src/mds-server/queued-interception.c @@ -26,9 +26,10 @@ * @param this The client information * @return The number of bytes to allocate to the output buffer */ -size_t queued_interception_marshal_size(void) +size_t +queued_interception_marshal_size(void) { - return sizeof(int64_t) + 3 * sizeof(int); + return sizeof(int64_t) + 3 * sizeof(int); } @@ -39,13 +40,14 @@ size_t queued_interception_marshal_size(void) * @param data Output buffer for the marshalled data * @return The number of bytes that have been written (everything will be written) */ -size_t queued_interception_marshal(const queued_interception_t* restrict this, char* restrict data) +size_t +queued_interception_marshal(const queued_interception_t *restrict this, char *restrict data) { - buf_set_next(data, int, QUEUED_INTERCEPTION_T_VERSION); - buf_set_next(data, int64_t, this->priority); - buf_set_next(data, int, this->modifying); - buf_set_next(data, int, this->client->socket_fd); - return queued_interception_marshal_size(); + buf_set_next(data, int, QUEUED_INTERCEPTION_T_VERSION); + buf_set_next(data, int64_t, this->priority); + buf_set_next(data, int, this->modifying); + buf_set_next(data, int, this->client->socket_fd); + return queued_interception_marshal_size(); } @@ -56,15 +58,16 @@ size_t queued_interception_marshal(const queued_interception_t* restrict this, c * @param data In buffer with the marshalled data * @return Zero on error, `errno` will be set accordingly, otherwise the number of read bytes. */ -size_t queued_interception_unmarshal(queued_interception_t* restrict this, char* restrict data) +size_t +queued_interception_unmarshal(queued_interception_t *restrict this, char *restrict data) { - this->client = NULL; - /* buf_get_next(data, int, QUEUED_INTERCEPTION_T_VERSION); */ - buf_next(data, int, 1); - buf_get_next(data, int64_t, this->priority); - buf_get_next(data, int, this->modifying); - buf_get_next(data, int, this->socket_fd); - return queued_interception_marshal_size(); + this->client = NULL; + /* buf_get_next(data, int, QUEUED_INTERCEPTION_T_VERSION); */ + buf_next(data, int, 1); + buf_get_next(data, int64_t, this->priority); + buf_get_next(data, int, this->modifying); + buf_get_next(data, int, this->socket_fd); + return queued_interception_marshal_size(); } @@ -74,8 +77,8 @@ size_t queued_interception_unmarshal(queued_interception_t* restrict this, char* * @param data In buffer with the marshalled data * @return The number of read bytes */ -size_t queued_interception_unmarshal_skip(void) +size_t +queued_interception_unmarshal_skip(void) { - return queued_interception_marshal_size(); + return queued_interception_marshal_size(); } - diff --git a/src/mds-server/queued-interception.h b/src/mds-server/queued-interception.h index 5a17579..56f9d62 100644 --- a/src/mds-server/queued-interception.h +++ b/src/mds-server/queued-interception.h @@ -24,33 +24,33 @@ #include <stdint.h> -#define QUEUED_INTERCEPTION_T_VERSION 0 +#define QUEUED_INTERCEPTION_T_VERSION 0 /** * A queued interception */ typedef struct queued_interception { - /** - * The intercepting client - */ - struct client* client; - - /** - * The interception priority - */ - int64_t priority; - - /** - * Whether the messages may get modified by the client - */ - int modifying; - - /** - * The file descriptor of the intercepting client's socket (used for unmarshalling) - */ - int socket_fd; - + /** + * The intercepting client + */ + struct client *client; + + /** + * The interception priority + */ + int64_t priority; + + /** + * Whether the messages may get modified by the client + */ + int modifying; + + /** + * The file descriptor of the intercepting client's socket (used for unmarshalling) + */ + int socket_fd; + } queued_interception_t; @@ -71,7 +71,7 @@ size_t queued_interception_marshal_size(void); * @return The number of bytes that have been written (everything will be written) */ __attribute__((nonnull)) -size_t queued_interception_marshal(const queued_interception_t* restrict this, char* restrict data); +size_t queued_interception_marshal(const queued_interception_t *restrict this, char *restrict data); /** * Unmarshals a queued interception @@ -81,7 +81,7 @@ size_t queued_interception_marshal(const queued_interception_t* restrict this, c * @return Zero on error, `errno` will be set accordingly, otherwise the number of read bytes. */ __attribute__((nonnull)) -size_t queued_interception_unmarshal(queued_interception_t* restrict this, char* restrict data); +size_t queued_interception_unmarshal(queued_interception_t *restrict this, char *restrict data); /** * Pretend to unmarshal a queued interception @@ -94,4 +94,3 @@ size_t queued_interception_unmarshal_skip(void); #endif - diff --git a/src/mds-server/receiving.c b/src/mds-server/receiving.c index b93f638..09684ae 100644 --- a/src/mds-server/receiving.c +++ b/src/mds-server/receiving.c @@ -40,7 +40,7 @@ * @param sender The original sender of the message */ __attribute__((nonnull)) -void queue_message_multicast(char* message, size_t length, client_t* sender); +void queue_message_multicast(char *message, size_t length, client_t *sender); /** @@ -51,55 +51,50 @@ void queue_message_multicast(char* message, size_t length, client_t* sender); * @param modify_id The modify ID of the message * @return Normally zero, but 1 if exited because of re-exec or termination */ -__attribute__((nonnull)) -static int modifying_notify(client_t* client, mds_message_t message, uint64_t modify_id) +static int __attribute__((nonnull)) +modifying_notify(client_t *client, mds_message_t message, uint64_t modify_id) { - /* pthread_cond_timedwait is required to handle re-exec and termination because - pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */ - struct timespec timeout = - { - .tv_sec = 1, - .tv_nsec = 0 - }; - size_t address; - client_t* recipient; - mds_message_t* multicast; - size_t i; - - pthread_mutex_lock(&(modify_mutex)); - while (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0) - { - if (terminating) - { - pthread_mutex_unlock(&(modify_mutex)); - return 1; + /* pthread_cond_timedwait is required to handle re-exec and termination because + pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */ + struct timespec timeout = { + .tv_sec = 1, + .tv_nsec = 0 + }; + size_t address; + client_t *recipient; + mds_message_t *multicast; + size_t i; + + pthread_mutex_lock(&(modify_mutex)); + while (!hash_table_contains_key(&modify_map, (size_t)modify_id)) { + if (terminating) { + pthread_mutex_unlock(&(modify_mutex)); + return 1; + } + pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout); } - pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout); - } - address = hash_table_get(&modify_map, (size_t)modify_id); - recipient = (client_t*)(void*)address; - fail_if (xmalloc(multicast = recipient->modify_message, 1, mds_message_t)); - mds_message_zero_initialise(multicast); - fail_if (xmemdup(multicast->payload, message.payload, message.payload_size, char)); - fail_if (xmalloc(multicast->headers, message.header_count, char*)); - for (i = 0; i < message.header_count; i++, multicast->header_count++) - fail_if (xstrdup(multicast->headers[i], message.headers[i])); - done: - pthread_mutex_unlock(&(modify_mutex)); - with_mutex (client->modify_mutex, pthread_cond_signal(&(client->modify_cond));); - - return 0; - - - fail: - xperror(*argv); - if (multicast != NULL) - { - mds_message_destroy(multicast); - free(multicast); - recipient->modify_message = NULL; - } - goto done; + address = hash_table_get(&modify_map, (size_t)modify_id); + recipient = (void *)address; + fail_if (xmalloc(multicast = recipient->modify_message, 1, mds_message_t)); + mds_message_zero_initialise(multicast); + fail_if (xmemdup(multicast->payload, message.payload, message.payload_size, char)); + fail_if (xmalloc(multicast->headers, message.header_count, char*)); + for (i = 0; i < message.header_count; i++, multicast->header_count++) + fail_if (xstrdup(multicast->headers[i], message.headers[i])); +done: + pthread_mutex_unlock(&(modify_mutex)); + with_mutex (client->modify_mutex, pthread_cond_signal(&(client->modify_cond));); + + return 0; + +fail: + xperror(*argv); + if (multicast) { + mds_message_destroy(multicast); + free(multicast); + recipient->modify_message = NULL; + } + goto done; } @@ -112,61 +107,56 @@ static int modifying_notify(client_t* client, mds_message_t message, uint64_t mo * @param stop Whether to stop listening rather than start or reconfigure * @return Zero on success, -1 on error */ -__attribute__((nonnull)) -static int add_intercept_conditions_from_message(client_t* client, int modifying, int64_t priority, int stop) +static int __attribute__((nonnull)) +add_intercept_conditions_from_message(client_t *client, int modifying, int64_t priority, int stop) { - int saved_errno; - char* payload = client->message.payload; - size_t payload_size = client->message.payload_size; - size_t size = 64; - char* buf; - - fail_if (xmalloc(buf, size + 1, char)); - - /* All messages. */ - if (client->message.payload_size == 0) - { - *buf = '\0'; - add_intercept_condition(client, buf, priority, modifying, stop); - goto done; - } - - /* Filtered messages. */ - for (;;) - { - char* end = memchr(payload, '\n', payload_size); - size_t len = end == NULL ? payload_size : (size_t)(end - payload); - if (len == 0) - { - payload++; - payload_size--; - break; + int saved_errno; + char *payload = client->message.payload; + size_t payload_size = client->message.payload_size; + size_t size = 64, len; + char *buf, *end, *old_buf; + + fail_if (xmalloc(buf, size + 1, char)); + + /* All messages. */ + if (!client->message.payload_size) { + *buf = '\0'; + add_intercept_condition(client, buf, priority, modifying, stop); + goto done; } - if (len > size) - { - char* old_buf = buf; - if (xrealloc(buf, (size <<= 1) + 1, char)) - { - saved_errno = errno; - free(old_buf); - pthread_mutex_unlock(&(client->mutex)); - fail_if (errno = saved_errno, 1); - } + + /* Filtered messages. */ + for (;;) { + end = memchr(payload, '\n', payload_size); + len = !end ? payload_size : (size_t)(end - payload); + if (len == 0) { + payload++; + payload_size--; + break; + } + if (len > size) { + old_buf = buf; + if (xrealloc(buf, (size <<= 1) + 1, char)) { + saved_errno = errno; + free(old_buf); + pthread_mutex_unlock(&(client->mutex)); + fail_if (errno = saved_errno, 1); + } + } + memcpy(buf, payload, len); + buf[len] = '\0'; + add_intercept_condition(client, buf, priority, modifying, stop); + if (!end) + break; + payload = end + 1; + payload_size -= len + 1; } - memcpy(buf, payload, len); - buf[len] = '\0'; - add_intercept_condition(client, buf, priority, modifying, stop); - if (end == NULL) - break; - payload = end + 1; - payload_size -= len + 1; - } - - done: - free(buf); - return 0; - fail: - return -1; + +done: + free(buf); + return 0; +fail: + return -1; } @@ -177,60 +167,58 @@ static int add_intercept_conditions_from_message(client_t* client, int modifying * @param message_id The message ID of the ID request * @return Zero on success, -1 on error */ -__attribute__((nonnull(1))) -static int assign_and_send_id(client_t* client, const char* message_id) +static int __attribute__((nonnull(1))) +assign_and_send_id(client_t *client, const char *message_id) { - char* msgbuf = NULL; - char* msgbuf_; - size_t n; - int rc = -1; - - /* Construct response. */ - n = 2 * 10 + strlen(message_id); - n += sizeof("ID assignment: :\nIn response to: \n\n") / sizeof(char); - fail_if (xmalloc(msgbuf, n, char)); - snprintf(msgbuf, n, - "ID assignment: %" PRIu32 ":%" PRIu32 "\n" - "In response to: %s\n" - "\n", - (uint32_t)(client->id >> 32), - (uint32_t)(client->id >> 0), - message_id == NULL ? "" : message_id); - n = strlen(msgbuf); - - /* Multicast the reply. */ - fail_if (xstrdup(msgbuf_, msgbuf)); - queue_message_multicast(msgbuf_, n, client); - - /* Queue message to be sent when this function returns. - This done to simplify `multicast_message` for re-exec and termination. */ -#define fail fail_in_mutex - with_mutex (client->mutex, - if (client->send_pending_size == 0) - { - /* Set the pending message. */ - client->send_pending = msgbuf; - client->send_pending_size = n; - } - else - { - /* Concatenate message to already pending messages. */ - size_t new_len = client->send_pending_size + n; - char* msg_new = client->send_pending; - fail_if (xrealloc(msg_new, new_len, char)); - memcpy(msg_new + client->send_pending_size, msgbuf, n * sizeof(char)); - client->send_pending = msg_new; - client->send_pending_size = new_len; - } - (msgbuf = NULL, rc = 0, errno = 0); - fail_in_mutex: - ); + char *msgbuf = NULL; + char *msgbuf_; + char *msg_new; + size_t n, new_len; + int rc = -1; + + /* Construct response. */ + n = 2 * 10 + strlen(message_id); + n += sizeof("ID assignment: :\nIn response to: \n\n") / sizeof(char); + fail_if (xmalloc(msgbuf, n, char)); + snprintf(msgbuf, n, + "ID assignment: %" PRIu32 ":%" PRIu32 "\n" + "In response to: %s\n" + "\n", + (uint32_t)(client->id >> 32), + (uint32_t)(client->id >> 0), + !message_id ? "" : message_id); + n = strlen(msgbuf); + + /* Multicast the reply. */ + fail_if (xstrdup(msgbuf_, msgbuf)); + queue_message_multicast(msgbuf_, n, client); + + /* Queue message to be sent when this function returns. + This done to simplify `multicast_message` for re-exec and termination. */ +#define fail fail_in_mutex + with_mutex (client->mutex, + if (!client->send_pending_size) { + /* Set the pending message. */ + client->send_pending = msgbuf; + client->send_pending_size = n; + } else { + /* Concatenate message to already pending messages. */ + new_len = client->send_pending_size + n; + msg_new = client->send_pending; + fail_if (xrealloc(msg_new, new_len, char)); + memcpy(msg_new + client->send_pending_size, msgbuf, n * sizeof(char)); + client->send_pending = msg_new; + client->send_pending_size = new_len; + } + (msgbuf = NULL, rc = 0, errno = 0); + fail_in_mutex: + ); #undef fail - fail: /* Also success. */ - xperror(*argv); - free(msgbuf); - return rc; +fail: /* Also success. */ + xperror(*argv); + free(msgbuf); + return rc; } @@ -241,95 +229,92 @@ static int assign_and_send_id(client_t* client, const char* message_id) * @param client The client whom sent the message * @return Normally zero, but 1 if exited because of re-exec or termination */ -int message_received(client_t* client) +int +message_received(client_t *client) { - mds_message_t message = client->message; - int assign_id = 0; - int modifying = 0; - int intercept = 0; - int64_t priority = 0; - int stop = 0; - const char* message_id = NULL; - uint64_t modify_id = 0; - char* msgbuf = NULL; - size_t i, n; - - - /* Parser headers. */ - for (i = 0; i < message.header_count; i++) - { - const char* h = message.headers[i]; - if (strequals(h, "Command: assign-id")) assign_id = 1; - else if (strequals(h, "Command: intercept")) intercept = 1; - else if (strequals(h, "Modifying: yes")) modifying = 1; - else if (strequals(h, "Stop: yes")) stop = 1; - else if (startswith(h, "Message ID: ")) message_id = strstr(h, ": ") + 2; - else if (startswith(h, "Priority: ")) priority = ato64(strstr(h, ": ") + 2); - else if (startswith(h, "Modify ID: ")) modify_id = atou64(strstr(h, ": ") + 2); - } - - - /* Notify waiting client about a received message modification. */ - if (modifying) - return modifying_notify(client, message, modify_id); - /* Do nothing more, not not even multicast this message. */ - - - if (message_id == NULL) - { - eprint("received message without a message ID, ignoring."); - return 0; - } - - /* Assign ID if not already assigned. */ - if (assign_id && (client->id == 0)) - { - intercept |= 2; - with_mutex_if (slave_mutex, (client->id = next_client_id++) == 0, - eprint("this is impossible, ID counter has overflowed."); - /* If the program ran for a millennium it would - take c:a 585 assignments per nanosecond. This - cannot possibly happen. (It would require serious - dedication by generations of ponies (or just an alicorn) - to maintain the process and transfer it new hardware.) */ - abort(); - ); - } - - /* Make the client listen for messages addressed to it. */ - if (intercept) - { - pthread_mutex_lock(&(client->mutex)); - if ((intercept & 1)) /* from payload */ - fail_if (add_intercept_conditions_from_message(client, modifying, priority, stop) < 0); - if ((intercept & 2)) /* "To: $(client->id)" */ - { - char buf[26]; - xsnprintf(buf, "To: %" PRIu32 ":%" PRIu32, - (uint32_t)(client->id >> 32), - (uint32_t)(client->id >> 0)); - add_intercept_condition(client, buf, priority, modifying, 0); + mds_message_t message = client->message; + int assign_id = 0; + int modifying = 0; + int intercept = 0; + int64_t priority = 0; + int stop = 0; + const char *message_id = NULL; + uint64_t modify_id = 0; + char *msgbuf = NULL; + size_t i, n; + const char *h; + char buf[26]; + + + /* Parser headers. */ + for (i = 0; i < message.header_count; i++) { + h = message.headers[i]; + if (strequals(h, "Command: assign-id")) assign_id = 1; + else if (strequals(h, "Command: intercept")) intercept = 1; + else if (strequals(h, "Modifying: yes")) modifying = 1; + else if (strequals(h, "Stop: yes")) stop = 1; + else if (startswith(h, "Message ID: ")) message_id = strstr(h, ": ") + 2; + else if (startswith(h, "Priority: ")) priority = ato64(strstr(h, ": ") + 2); + else if (startswith(h, "Modify ID: ")) modify_id = atou64(strstr(h, ": ") + 2); } - pthread_mutex_unlock(&(client->mutex)); - } - - - /* Multicast the message. */ - n = mds_message_compose_size(&message); - fail_if (xbmalloc(msgbuf, n)); - mds_message_compose(&message, msgbuf); - queue_message_multicast(msgbuf, n / sizeof(char), client); - msgbuf = NULL; - - - /* Send asigned ID. */ - if (assign_id) - fail_if (assign_and_send_id(client, message_id) < 0); - - return 0; - - fail: - xperror(*argv); - free(msgbuf); - return 0; + + + /* Notify waiting client about a received message modification. */ + if (modifying) + return modifying_notify(client, message, modify_id); + /* Do nothing more, not not even multicast this message. */ + + + if (!message_id) { + eprint("received message without a message ID, ignoring."); + return 0; + } + + /* Assign ID if not already assigned. */ + if (assign_id && !client->id) { + intercept |= 2; + with_mutex_if (slave_mutex, !(client->id = next_client_id++), + eprint("this is impossible, ID counter has overflowed."); + /* If the program ran for a millennium it would + take c:a 585 assignments per nanosecond. This + cannot possibly happen. (It would require serious + dedication by generations of ponies (or just an alicorn) + to maintain the process and transfer it new hardware.) */ + abort(); + ); + } + + /* Make the client listen for messages addressed to it. */ + if (intercept) { + pthread_mutex_lock(&(client->mutex)); + if ((intercept & 1)) /* from payload */ + fail_if (add_intercept_conditions_from_message(client, modifying, priority, stop) < 0); + if ((intercept & 2)) { /* "To: $(client->id)" */ + xsnprintf(buf, "To: %" PRIu32 ":%" PRIu32, + (uint32_t)(client->id >> 32), + (uint32_t)(client->id >> 0)); + add_intercept_condition(client, buf, priority, modifying, 0); + } + pthread_mutex_unlock(&(client->mutex)); + } + + + /* Multicast the message. */ + n = mds_message_compose_size(&message); + fail_if (xbmalloc(msgbuf, n)); + mds_message_compose(&message, msgbuf); + queue_message_multicast(msgbuf, n / sizeof(char), client); + msgbuf = NULL; + + + /* Send asigned ID. */ + if (assign_id) + fail_if (assign_and_send_id(client, message_id) < 0); + + return 0; + +fail: + xperror(*argv); + free(msgbuf); + return 0; } diff --git a/src/mds-server/receiving.h b/src/mds-server/receiving.h index 612c94b..cc684b0 100644 --- a/src/mds-server/receiving.h +++ b/src/mds-server/receiving.h @@ -30,8 +30,7 @@ * @return Normally zero, but 1 if exited because of re-exec or termination */ __attribute__((nonnull)) -int message_received(client_t* client); +int message_received(client_t *client); #endif - diff --git a/src/mds-server/reexec.c b/src/mds-server/reexec.c index bac066a..a2e0620 100644 --- a/src/mds-server/reexec.c +++ b/src/mds-server/reexec.c @@ -49,26 +49,26 @@ * * @return The number of bytes that will be stored by `marshal_server` */ -size_t marshal_server_size(void) +size_t +marshal_server_size(void) { - size_t list_size = linked_list_marshal_size(&client_list); - size_t map_size = fd_table_marshal_size(&client_map); - size_t list_elements = 0; - size_t state_n = 0; - ssize_t node; - - /* Calculate the grand size of all client information. */ - foreach_linked_list_node (client_list, node) - { - state_n += client_marshal_size((client_t*)(void*)(client_list.values[node])); - list_elements++; - } - - /* Add the size of the rest of the program's state. */ - state_n += sizeof(int) + sizeof(sig_atomic_t) + 2 * sizeof(uint64_t) + 2 * sizeof(size_t); - state_n += list_elements * sizeof(size_t) + list_size + map_size; - - return state_n; + size_t list_size = linked_list_marshal_size(&client_list); + size_t map_size = fd_table_marshal_size(&client_map); + size_t list_elements = 0; + size_t state_n = 0; + ssize_t node; + + /* Calculate the grand size of all client information. */ + foreach_linked_list_node (client_list, node) { + state_n += client_marshal_size((void *)(client_list.values[node])); + list_elements++; + } + + /* Add the size of the rest of the program's state. */ + state_n += sizeof(int) + sizeof(sig_atomic_t) + 2 * sizeof(uint64_t) + 2 * sizeof(size_t); + state_n += list_elements * sizeof(size_t) + list_size + map_size; + + return state_n; } @@ -78,68 +78,69 @@ size_t marshal_server_size(void) * @param state_buf The buffer for the marshalled data * @return Non-zero on error */ -int marshal_server(char* state_buf) +int +marshal_server(char *state_buf) { - size_t list_size = linked_list_marshal_size(&client_list); - size_t list_elements = 0; - ssize_t node; - - - /* Release resources. */ - pthread_mutex_destroy(&slave_mutex); - pthread_cond_destroy(&slave_cond); - pthread_mutex_destroy(&modify_mutex); - pthread_cond_destroy(&modify_cond); - hash_table_destroy(&modify_map, NULL, NULL); - - - /* Count the number of clients that online. */ - foreach_linked_list_node (client_list, node) - list_elements++; - - /* Tell the new version of the program what version of the program it is marshalling. */ - buf_set_next(state_buf, int, MDS_SERVER_VARS_VERSION); - - /* Marshal the miscellaneous state data. */ - buf_set_next(state_buf, sig_atomic_t, running); - buf_set_next(state_buf, uint64_t, next_client_id); - buf_set_next(state_buf, uint64_t, next_modify_id); - - /* Tell the program how large the marshalled client list is and how any clients are marshalled. */ - buf_set_next(state_buf, size_t, list_size); - buf_set_next(state_buf, size_t, list_elements); - - /* Marshal the clients. */ - foreach_linked_list_node (client_list, node) - { - /* Get the memory address of the client. */ - size_t value_address = client_list.values[node]; - /* Get the client's information. */ - client_t* value = (client_t*)(void*)value_address; - - /* Marshal the address, it is used the the client list and the client map, that will be marshalled. */ - buf_set_next(state_buf, size_t, value_address); - /* Marshal the client informationation. */ - state_buf += client_marshal(value, state_buf) / sizeof(char); - } - - /* Marshal the client list. */ - linked_list_marshal(&client_list, state_buf); - state_buf += list_size / sizeof(char); - /* Marshal the client map. */ - fd_table_marshal(&client_map, state_buf); - - - /* Release resources. */ - foreach_linked_list_node (client_list, node) - { - client_t* client = (client_t*)(void*)(client_list.values[node]); - client_destroy(client); - } - fd_table_destroy(&client_map, NULL, NULL); - linked_list_destroy(&client_list); - - return 0; + size_t list_size = linked_list_marshal_size(&client_list); + size_t list_elements = 0; + ssize_t node; + size_t value_address; + client_t *value, *client; + + + /* Release resources. */ + pthread_mutex_destroy(&slave_mutex); + pthread_cond_destroy(&slave_cond); + pthread_mutex_destroy(&modify_mutex); + pthread_cond_destroy(&modify_cond); + hash_table_destroy(&modify_map, NULL, NULL); + + + /* Count the number of clients that online. */ + foreach_linked_list_node (client_list, node) + list_elements++; + + /* Tell the new version of the program what version of the program it is marshalling. */ + buf_set_next(state_buf, int, MDS_SERVER_VARS_VERSION); + + /* Marshal the miscellaneous state data. */ + buf_set_next(state_buf, sig_atomic_t, running); + buf_set_next(state_buf, uint64_t, next_client_id); + buf_set_next(state_buf, uint64_t, next_modify_id); + + /* Tell the program how large the marshalled client list is and how any clients are marshalled. */ + buf_set_next(state_buf, size_t, list_size); + buf_set_next(state_buf, size_t, list_elements); + + /* Marshal the clients. */ + foreach_linked_list_node (client_list, node) { + /* Get the memory address of the client. */ + value_address = client_list.values[node]; + /* Get the client's information. */ + value = (void *)value_address; + + /* Marshal the address, it is used the the client list and the client map, that will be marshalled. */ + buf_set_next(state_buf, size_t, value_address); + /* Marshal the client informationation. */ + state_buf += client_marshal(value, state_buf) / sizeof(char); + } + + /* Marshal the client list. */ + linked_list_marshal(&client_list, state_buf); + state_buf += list_size / sizeof(char); + /* Marshal the client map. */ + fd_table_marshal(&client_map, state_buf); + + + /* Release resources. */ + foreach_linked_list_node (client_list, node) { + client = (void *)(client_list.values[node]); + client_destroy(client); + } + fd_table_destroy(&client_map, NULL, NULL); + linked_list_destroy(&client_list); + + return 0; } @@ -154,9 +155,10 @@ static hash_table_t unmarshal_remap_map; * @param old The old address * @return The new address */ -static size_t unmarshal_remapper(size_t old) +static size_t +unmarshal_remapper(size_t old) { - return hash_table_get(&unmarshal_remap_map, old); + return hash_table_get(&unmarshal_remap_map, old); } @@ -170,131 +172,126 @@ static size_t unmarshal_remapper(size_t old) * @param state_buf The marshalled data that as not been read already * @return Non-zero on error */ -int unmarshal_server(char* state_buf) +int +unmarshal_server(char *state_buf) { - int with_error = 0; - size_t list_size; - size_t list_elements; - size_t i; - ssize_t node; - pthread_t slave_thread; - -#define fail soft_fail + int with_error = 0; + size_t list_size; + size_t list_elements; + size_t i; + ssize_t node; + pthread_t slave_thread; + size_t n, value_address, new_address; + client_t *value, *client; + int slave_fd; - /* Create memory address remapping table. */ - fail_if (hash_table_create(&unmarshal_remap_map)); +#define fail soft_fail + + /* Create memory address remapping table. */ + fail_if (hash_table_create(&unmarshal_remap_map)); #undef fail -#define fail clients_fail - - /* Get the marshal protocal version. Not needed, there is only the one version right now. */ - /* buf_get(state_buf, int, 0, MDS_SERVER_VARS_VERSION); */ - buf_next(state_buf, int, 1); +#define fail clients_fail - /* Unmarshal the miscellaneous state data. */ - buf_get_next(state_buf, sig_atomic_t, running); - buf_get_next(state_buf, uint64_t, next_client_id); - buf_get_next(state_buf, uint64_t, next_modify_id); - - /* Get the marshalled size of the client list and how any clients that are marshalled. */ - buf_get_next(state_buf, size_t, list_size); - buf_get_next(state_buf, size_t, list_elements); - - /* Unmarshal the clients. */ - for (i = 0; i < list_elements; i++) - { - size_t n; - size_t value_address; - client_t* value; - - /* Allocate the client's information. */ - fail_if (xmalloc(value, 1, client_t)); - - /* Unmarshal the address, it is used the the client list and the client map, that are also marshalled. */ - buf_get_next(state_buf, size_t, value_address); - /* Unmarshal the client information. */ - fail_if (n = client_unmarshal(value, state_buf), n == 0); - - /* Populate the remapping table. */ - if (hash_table_put(&unmarshal_remap_map, value_address, (size_t)(void*)value) == 0) - fail_if (errno); - - /* Delayed seeking. */ - state_buf += n / sizeof(char); - - - /* On error, seek past all clients. */ - continue; - clients_fail: - xperror(*argv); - with_error = 1; - if (value != NULL) - { - buf_prev(state_buf, size_t, 1); - free(value); + /* Get the marshal protocal version. Not needed, there is only the one version right now. */ + /* buf_get(state_buf, int, 0, MDS_SERVER_VARS_VERSION); */ + buf_next(state_buf, int, 1); + + /* Unmarshal the miscellaneous state data. */ + buf_get_next(state_buf, sig_atomic_t, running); + buf_get_next(state_buf, uint64_t, next_client_id); + buf_get_next(state_buf, uint64_t, next_modify_id); + + /* Get the marshalled size of the client list and how any clients that are marshalled. */ + buf_get_next(state_buf, size_t, list_size); + buf_get_next(state_buf, size_t, list_elements); + + /* Unmarshal the clients. */ + for (i = 0; i < list_elements; i++) { + /* Allocate the client's information. */ + fail_if (xmalloc(value, 1, client_t)); + + /* Unmarshal the address, it is used the the client list and the client map, that are also marshalled. */ + buf_get_next(state_buf, size_t, value_address); + /* Unmarshal the client information. */ + fail_if (n = client_unmarshal(value, state_buf), n == 0); + + /* Populate the remapping table. */ + if (!hash_table_put(&unmarshal_remap_map, value_address, (size_t)(void *)value)) + fail_if (errno); + + /* Delayed seeking. */ + state_buf += n / sizeof(char); + + /* On error, seek past all clients. */ + continue; + clients_fail: + xperror(*argv); + with_error = 1; + if (value) { + buf_prev(state_buf, size_t, 1); + free(value); + } + for (; i < list_elements; i++) + /* There is not need to close the sockets, it is done by + the caller because there are conditions where we cannot + get here anyway. */ + state_buf += client_unmarshal_skip(state_buf) / sizeof(char); + break; } - for (; i < list_elements; i++) - /* There is not need to close the sockets, it is done by - the caller because there are conditions where we cannot - get here anyway. */ - state_buf += client_unmarshal_skip(state_buf) / sizeof(char); - break; - } - + #undef fail #define fail critical_fail - - /* Unmarshal the client list. */ - fail_if (linked_list_unmarshal(&client_list, state_buf)); - state_buf += list_size / sizeof(char); - - /* Unmarshal the client map. */ - fail_if (fd_table_unmarshal(&client_map, state_buf, unmarshal_remapper)); - - /* Remove non-found elements from the fd table. */ -#define __bit(I, _OP_) client_map.used[I / 64] _OP_ ((uint64_t)1 << (I % 64)) - if (with_error) - for (i = 0; i < client_map.capacity; i++) - if ((__bit(i, &)) && (client_map.values[i] == 0)) - /* Lets not presume that fd-table actually initialise its allocations. */ - __bit(i, &= ~); + + /* Unmarshal the client list. */ + fail_if (linked_list_unmarshal(&client_list, state_buf)); + state_buf += list_size / sizeof(char); + + /* Unmarshal the client map. */ + fail_if (fd_table_unmarshal(&client_map, state_buf, unmarshal_remapper)); + + /* Remove non-found elements from the fd table. */ +#define __bit(I, _OP_) (client_map.used[I / 64] _OP_ ((uint64_t)1 << (I % 64))) + if (with_error) + for (i = 0; i < client_map.capacity; i++) + if (__bit(i, &) && !client_map.values[i]) + /* Lets not presume that fd-table actually initialise its allocations. */ + __bit(i, &= ~); #undef __bit - - /* Remap the linked list and remove non-found elements, and start the clients. */ - foreach_linked_list_node (client_list, node) - { - /* Remap the linked list and remove non-found elements. */ - size_t new_address = unmarshal_remapper(client_list.values[node]); - client_list.values[node] = new_address; - if (new_address == 0) /* Returned if missing (or if the address is the invalid NULL.) */ - linked_list_remove(&client_list, node); - else - { - /* Start the clients. (Errors do not need to be reported.) */ - client_t* client = (client_t*)(void*)new_address; - int slave_fd = client->socket_fd; - - /* Increase number of running slaves. */ - with_mutex (slave_mutex, running_slaves++;); - - /* Start slave thread. */ - create_slave(&slave_thread, slave_fd); + + /* Remap the linked list and remove non-found elements, and start the clients. */ + foreach_linked_list_node (client_list, node) { + /* Remap the linked list and remove non-found elements. */ + new_address = unmarshal_remapper(client_list.values[node]); + client_list.values[node] = new_address; + if (new_address == 0) { /* Returned if missing (or if the address is the invalid NULL.) */ + linked_list_remove(&client_list, node); + } else { + /* Start the clients. (Errors do not need to be reported.) */ + client = (client_t*)(void*)new_address; + slave_fd = client->socket_fd; + + /* Increase number of running slaves. */ + with_mutex (slave_mutex, running_slaves++;); + + /* Start slave thread. */ + create_slave(&slave_thread, slave_fd); + } } - } - - /* Release the remapping table's resources. */ - hash_table_destroy(&unmarshal_remap_map, NULL, NULL); - - return with_error; + + /* Release the remapping table's resources. */ + hash_table_destroy(&unmarshal_remap_map, NULL, NULL); + + return with_error; #undef fail - - soft_fail: - xperror(*argv); - hash_table_destroy(&unmarshal_remap_map, NULL, NULL); - return -1; - critical_fail: - xperror(*argv); - abort(); + +soft_fail: + xperror(*argv); + hash_table_destroy(&unmarshal_remap_map, NULL, NULL); + return -1; +critical_fail: + xperror(*argv); + abort(); } @@ -304,10 +301,10 @@ int unmarshal_server(char* state_buf) * * @return Non-zero on error */ -int reexec_failure_recover(void) +int +reexec_failure_recover(void) { - /* Close all files (hopefully sockets) we do not know what they are. */ - close_files((fd > 2) && (fd != socket_fd) && (fd_table_contains_key(&client_map, fd) == 0)); - return 0; + /* Close all files (hopefully sockets) we do not know what they are. */ + close_files(fd > 2 && fd != socket_fd && !fd_table_contains_key(&client_map, fd)); + return 0; } - diff --git a/src/mds-server/sending.c b/src/mds-server/sending.c index 05e5cc0..e6a8094 100644 --- a/src/mds-server/sending.c +++ b/src/mds-server/sending.c @@ -41,11 +41,12 @@ * @param client_fd The file descriptor of the client's socket * @return The client */ -static client_t* client_by_socket(int client_fd) +static client_t * +client_by_socket(int client_fd) { - size_t address; - with_mutex (slave_mutex, address = fd_table_get(&client_map, client_fd);); - return (client_t*)(void*)address; + size_t address; + with_mutex (slave_mutex, address = fd_table_get(&client_map, client_fd);); + return (client_t*)(void*)address; } @@ -57,34 +58,32 @@ static client_t* client_by_socket(int client_fd) * @param modifying Whether the recipient may modify the message * @return Evaluates to true if and only if the entire message was sent */ -__attribute__((nonnull)) -static int send_multicast_to_recipient(multicast_t* multicast, client_t* recipient, int modifying) +static int __attribute__((nonnull)) +send_multicast_to_recipient(multicast_t *multicast, client_t *recipient, int modifying) { - char* msg = multicast->message + multicast->message_ptr; - size_t n = multicast->message_length - multicast->message_ptr; - size_t sent; - - /* Skip Modify ID header if the interceptors will not perform a modification. */ - if ((modifying == 0) && (multicast->message_ptr == 0)) - { - n -= multicast->message_prefix; - multicast->message_ptr += multicast->message_prefix; - } - - /* Send the message. */ - n *= sizeof(char); - with_mutex (recipient->mutex, - if (recipient->open) - { - sent = send_message(recipient->socket_fd, msg + multicast->message_ptr, n); - n -= sent; - multicast->message_ptr += sent / sizeof(char); - if ((n > 0) && (errno != EINTR)) - xperror(*argv); - } - ); - - return n == 0; + char *msg = multicast->message + multicast->message_ptr; + size_t n = multicast->message_length - multicast->message_ptr; + size_t sent; + + /* Skip Modify ID header if the interceptors will not perform a modification. */ + if (!modifying && !multicast->message_ptr) { + n -= multicast->message_prefix; + multicast->message_ptr += multicast->message_prefix; + } + + /* Send the message. */ + n *= sizeof(char); + with_mutex (recipient->mutex, + if (recipient->open) { + sent = send_message(recipient->socket_fd, msg + multicast->message_ptr, n); + n -= sent; + multicast->message_ptr += sent / sizeof(char); + if (n > 0 && errno != EINTR) + xperror(*argv); + } + ); + + return !n; } @@ -94,31 +93,29 @@ static int send_multicast_to_recipient(multicast_t* multicast, client_t* recipie * @param recipient The recipient * @param modify_id The modify ID of the multicast */ -__attribute__((nonnull)) -static void wait_for_reply(client_t* recipient, uint64_t modify_id) +static void __attribute__((nonnull)) +wait_for_reply(client_t *recipient, uint64_t modify_id) { - /* pthread_cond_timedwait is required to handle re-exec and termination because - pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */ - struct timespec timeout = - { - .tv_sec = 1, - .tv_nsec = 0 - }; - - with_mutex_if (modify_mutex, recipient->modify_message == NULL, - if (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0) - { - hash_table_put(&modify_map, (size_t)modify_id, (size_t)(void*)recipient); - pthread_cond_signal(&slave_cond); - } - ); - - with_mutex_if (recipient->modify_mutex, recipient->modify_message == NULL, - while ((recipient->modify_message == NULL) && (terminating == 0)) - pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout); - if (terminating == 0) - hash_table_remove(&modify_map, (size_t)modify_id); - ); + /* pthread_cond_timedwait is required to handle re-exec and termination because + pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */ + struct timespec timeout = { + .tv_sec = 1, + .tv_nsec = 0 + }; + + with_mutex_if (modify_mutex, !recipient->modify_message, + if (!hash_table_contains_key(&modify_map, (size_t)modify_id)) { + hash_table_put(&modify_map, (size_t)modify_id, (size_t)(void*)recipient); + pthread_cond_signal(&slave_cond); + } + ); + + with_mutex_if (recipient->modify_mutex, !recipient->modify_message, + while (!recipient->modify_message && !terminating) + pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout); + if (!terminating) + hash_table_remove(&modify_map, (size_t)modify_id); + ); } @@ -127,88 +124,83 @@ static void wait_for_reply(client_t* recipient, uint64_t modify_id) * * @param multicast The multicast message */ -void multicast_message(multicast_t* multicast) +void multicast_message(multicast_t *multicast) { - int consumed = 0; - uint64_t modify_id = 0; - size_t n = strlen("Modify ID: "); - - if (startswith_n(multicast->message, "Modify ID: ", multicast->message_length, n)) - { - char* value = multicast->message + n; - char* lf = strchr(value, '\n'); - *lf = '\0'; - modify_id = atou64(value); - *lf = '\n'; - } - - for (; multicast->interceptions_ptr < multicast->interceptions_count; multicast->interceptions_ptr++) - { - queued_interception_t client_ = multicast->interceptions[multicast->interceptions_ptr]; - client_t* client = client_.client; - int modifying = 0; - char* old_buf; - size_t i; - mds_message_t* mod; - - /* After unmarshalling at re-exec, client will be NULL and must be mapped from its socket. */ - if (client == NULL) - client_.client = client = client_by_socket(client_.socket_fd); - - /* Send the message to the recipient. */ - if (send_multicast_to_recipient(multicast, client, client_.modifying) == 0) - { - /* Stop if we are re-exec:ing or terminating, or continue to next recipient on error. */ - if (terminating) - return; - else - continue; - } - - /* Do not wait for a reply if it is non-modifying. */ - if (client_.modifying == 0) - { - /* Reset how much of the message has been sent before we continue with next recipient. */ - multicast->message_ptr = 0; - continue; + int consumed = 0, modifying = 0; + uint64_t modify_id = 0; + size_t i, n = strlen("Modify ID: "); + char *value, *lf, *old_buf; + mds_message_t* mod; + client_t* client; + queued_interception_t client_; + + if (startswith_n(multicast->message, "Modify ID: ", multicast->message_length, n)) { + value = multicast->message + n; + lf = strchr(value, '\n'); + *lf = '\0'; + modify_id = atou64(value); + *lf = '\n'; } - - /* Wait for a reply. */ - wait_for_reply(client, modify_id); - if (terminating) - return; - - /* Act upon the reply. */ - mod = client->modify_message; - for (i = 0; i < mod->header_count; i++) - if (strequals(mod->headers[i], "Modify: yes")) - { - modifying = 1; - consumed = mod->payload_size == 0; - break; - } - if (modifying && !consumed) - { - n = mod->payload_size; - old_buf = multicast->message; - if (xrealloc(multicast->message, multicast->message_prefix + n, char)) - { - xperror(*argv); - multicast->message = old_buf; - } - else - memcpy(multicast->message + multicast->message_prefix, mod->payload, n); + + for (; multicast->interceptions_ptr < multicast->interceptions_count; multicast->interceptions_ptr++) { + client_ = multicast->interceptions[multicast->interceptions_ptr]; + client = client_.client; + modifying = 0; + + /* After unmarshalling at re-exec, client will be NULL and must be mapped from its socket. */ + if (!client) + client_.client = client = client_by_socket(client_.socket_fd); + + /* Send the message to the recipient. */ + if (!send_multicast_to_recipient(multicast, client, client_.modifying)) { + /* Stop if we are re-exec:ing or terminating, or continue to next recipient on error. */ + if (terminating) + return; + else + continue; + } + + /* Do not wait for a reply if it is non-modifying. */ + if (!client_.modifying) { + /* Reset how much of the message has been sent before we continue with next recipient. */ + multicast->message_ptr = 0; + continue; + } + + /* Wait for a reply. */ + wait_for_reply(client, modify_id); + if (terminating) + return; + + /* Act upon the reply. */ + mod = client->modify_message; + for (i = 0; i < mod->header_count; i++) { + if (strequals(mod->headers[i], "Modify: yes")) { + modifying = 1; + consumed = mod->payload_size == 0; + break; + } + } + if (modifying && !consumed) { + n = mod->payload_size; + old_buf = multicast->message; + if (xrealloc(multicast->message, multicast->message_prefix + n, char)) { + xperror(*argv); + multicast->message = old_buf; + } else { + memcpy(multicast->message + multicast->message_prefix, mod->payload, n); + } + } + + /* Free the reply. */ + mds_message_destroy(client->modify_message); + + /* Reset how much of the message has been sent before we continue with next recipient. */ + multicast->message_ptr = 0; + + if (consumed) + break; } - - /* Free the reply. */ - mds_message_destroy(client->modify_message); - - /* Reset how much of the message has been sent before we continue with next recipient. */ - multicast->message_ptr = 0; - - if (consumed) - break; - } } @@ -217,24 +209,24 @@ void multicast_message(multicast_t* multicast) * * @param client The client */ -void send_multicast_queue(client_t* client) +void +send_multicast_queue(client_t *client) { - while (client->multicasts_count > 0) - { - multicast_t multicast; - with_mutex_if (client->mutex, client->multicasts_count > 0, - size_t c = (client->multicasts_count -= 1) * sizeof(multicast_t); - multicast = client->multicasts[0]; - memmove(client->multicasts, client->multicasts + 1, c); - if (c == 0) - { - free(client->multicasts); - client->multicasts = NULL; - } - ); - multicast_message(&multicast); - multicast_destroy(&multicast); - } + multicast_t multicast; + size_t c; + while (client->multicasts_count > 0) { + with_mutex_if (client->mutex, client->multicasts_count > 0, + c = (client->multicasts_count -= 1) * sizeof(multicast_t); + multicast = client->multicasts[0]; + memmove(client->multicasts, client->multicasts + 1, c); + if (c == 0) { + free(client->multicasts); + client->multicasts = NULL; + } + ); + multicast_message(&multicast); + multicast_destroy(&multicast); + } } @@ -243,32 +235,29 @@ void send_multicast_queue(client_t* client) * * @param client The client */ -void send_reply_queue(client_t* client) +void +send_reply_queue(client_t *client) { - char* sendbuf = client->send_pending; - char* sendbuf_ = sendbuf; - size_t sent; - size_t n; - - if (client->send_pending_size == 0) - return; - - n = client->send_pending_size; - client->send_pending_size = 0; - client->send_pending = NULL; - with_mutex (client->mutex, - while (n > 0) - { - sent = send_message(client->socket_fd, sendbuf_, n); - n -= sent; - sendbuf_ += sent / sizeof(char); - if ((n > 0) && (errno != EINTR)) /* Ignore EINTR */ - { - xperror(*argv); - break; - } - } - free(sendbuf); - ); -} + char *sendbuf = client->send_pending; + char *sendbuf_ = sendbuf; + size_t sent, n; + + if (!client->send_pending_size) + return; + n = client->send_pending_size; + client->send_pending_size = 0; + client->send_pending = NULL; + with_mutex (client->mutex, + while (n > 0) { + sent = send_message(client->socket_fd, sendbuf_, n); + n -= sent; + sendbuf_ += sent / sizeof(char); + if (n > 0 && errno != EINTR) { /* Ignore EINTR */ + xperror(*argv); + break; + } + } + free(sendbuf); + ); +} diff --git a/src/mds-server/sending.h b/src/mds-server/sending.h index 487b711..7a4fdd8 100644 --- a/src/mds-server/sending.h +++ b/src/mds-server/sending.h @@ -29,7 +29,7 @@ * @param multicast The multicast message */ __attribute__((nonnull)) -void multicast_message(multicast_t* multicast); +void multicast_message(multicast_t *multicast); /** * Send the next message in a clients multicast queue @@ -37,7 +37,7 @@ void multicast_message(multicast_t* multicast); * @param client The client */ __attribute__((nonnull)) -void send_multicast_queue(client_t* client); +void send_multicast_queue(client_t *client); /** * Send the messages that are in a clients reply queue @@ -45,8 +45,7 @@ void send_multicast_queue(client_t* client); * @param client The client */ __attribute__((nonnull)) -void send_reply_queue(client_t* client); +void send_reply_queue(client_t *client); #endif - diff --git a/src/mds-server/signals.c b/src/mds-server/signals.c index 8e4dc0e..8e1ce1d 100644 --- a/src/mds-server/signals.c +++ b/src/mds-server/signals.c @@ -32,21 +32,20 @@ */ void signal_all(int signo) { - pthread_t current_thread; - ssize_t node; - - current_thread = pthread_self(); - - if (pthread_equal(current_thread, master_thread) == 0) - pthread_kill(master_thread, signo); - - with_mutex (slave_mutex, - foreach_linked_list_node (client_list, node) - { - client_t* value = (client_t*)(void*)(client_list.values[node]); - if (pthread_equal(current_thread, value->thread) == 0) - pthread_kill(value->thread, signo); - } - ); -} + pthread_t current_thread; + ssize_t node; + client_t *value; + + current_thread = pthread_self(); + if (pthread_equal(current_thread, master_thread) == 0) + pthread_kill(master_thread, signo); + + with_mutex (slave_mutex, + foreach_linked_list_node (client_list, node) { + value = (client_t*)(void*)(client_list.values[node]); + if (!pthread_equal(current_thread, value->thread)) + pthread_kill(value->thread, signo); + } + ); +} diff --git a/src/mds-server/slavery.c b/src/mds-server/slavery.c index de0336b..6669da6 100644 --- a/src/mds-server/slavery.c +++ b/src/mds-server/slavery.c @@ -35,7 +35,7 @@ * @param data Input data * @return Outout data */ -void* slave_loop(void*); +void *slave_loop(void *); /** @@ -44,34 +44,29 @@ void* slave_loop(void*); * @param client The client * @return Zero on success, -2 on failure, otherwise -1 */ -int fetch_message(client_t* client) +int +fetch_message(client_t *client) { - int r = mds_message_read(&(client->message), client->socket_fd); - - if (r == 0) - return 0; - - if (r == -2) - { - eprint("corrupt message received."); - fail_if (1); - } - else if (errno == ECONNRESET) - { - r = mds_message_read(&(client->message), client->socket_fd); - client->open = 0; - /* Connection closed. */ - } - else if (errno != EINTR) - { - xperror(*argv); - fail_if (1); - } - - fail_if (r == -2); - return r; - fail: - return -2; + int r = mds_message_read(&(client->message), client->socket_fd); + + if (!r) { + return 0; + } else if (r == -2) { + eprint("corrupt message received."); + fail_if (1); + } else if (errno == ECONNRESET) { + r = mds_message_read(&(client->message), client->socket_fd); + client->open = 0; + /* Connection closed. */ + } else if (errno != EINTR) { + xperror(*argv); + fail_if (1); + } + + fail_if (r == -2); + return r; +fail: + return -2; } @@ -82,22 +77,21 @@ int fetch_message(client_t* client) * @param slave_fd The file descriptor of the slave's socket * @return Zero on success, -1 on error, error message will have been printed */ -int create_slave(pthread_t* thread_slot, int slave_fd) +int +create_slave(pthread_t *thread_slot, int slave_fd) { - if ((errno = pthread_create(thread_slot, NULL, slave_loop, (void*)(intptr_t)slave_fd))) - { - xperror(*argv); - with_mutex (slave_mutex, running_slaves--;); - fail_if (1); - } - if ((errno = pthread_detach(*thread_slot))) - { - xperror(*argv); - fail_if (1); - } - return 0; - fail: - return -1; + if ((errno = pthread_create(thread_slot, NULL, slave_loop, (void *)(intptr_t)slave_fd))) { + xperror(*argv); + with_mutex (slave_mutex, running_slaves--;); + fail_if (1); + } + if ((errno = pthread_detach(*thread_slot))) { + xperror(*argv); + fail_if (1); + } + return 0; +fail: + return -1; } @@ -107,45 +101,44 @@ int create_slave(pthread_t* thread_slot, int slave_fd) * @param client_fd The file descriptor of the client's socket * @return The client information, `NULL` on error */ -client_t* initialise_client(int client_fd) +client_t * +initialise_client(int client_fd) { - ssize_t entry = LINKED_LIST_UNUSED; - client_t* information; - int locked = 0, saved_errno; - size_t tmp; - - /* Create information table. */ - fail_if (xmalloc(information, 1, client_t)); - client_initialise(information); - - /* Add to list of clients. */ - fail_if ((errno = pthread_mutex_lock(&slave_mutex))); - locked = 1; - entry = linked_list_insert_end(&client_list, (size_t)(void*)information); - fail_if (entry == LINKED_LIST_UNUSED); - - /* Add client to table. */ - tmp = fd_table_put(&client_map, client_fd, (size_t)(void*)information); - fail_if ((tmp == 0) && errno); - pthread_mutex_unlock(&slave_mutex); - locked = 0; - - /* Fill information table. */ - information->list_entry = entry; - information->socket_fd = client_fd; - information->open = 1; - fail_if (mds_message_initialise(&(information->message))); - - return information; - - - fail: - saved_errno = errno; - if (locked) - pthread_mutex_unlock(&slave_mutex); - free(information); - if (entry != LINKED_LIST_UNUSED) - with_mutex (slave_mutex, linked_list_remove(&client_list, entry);); - return errno = saved_errno, NULL; -} + ssize_t entry = LINKED_LIST_UNUSED; + client_t *information; + int locked = 0, saved_errno; + size_t tmp; + + /* Create information table. */ + fail_if (xmalloc(information, 1, client_t)); + client_initialise(information); + + /* Add to list of clients. */ + fail_if ((errno = pthread_mutex_lock(&slave_mutex))); + locked = 1; + entry = linked_list_insert_end(&client_list, (size_t)(void *)information); + fail_if (entry == LINKED_LIST_UNUSED); + /* Add client to table. */ + tmp = fd_table_put(&client_map, client_fd, (size_t)(void *)information); + fail_if (!tmp && errno); + pthread_mutex_unlock(&slave_mutex); + locked = 0; + + /* Fill information table. */ + information->list_entry = entry; + information->socket_fd = client_fd; + information->open = 1; + fail_if (mds_message_initialise(&(information->message))); + + return information; + +fail: + saved_errno = errno; + if (locked) + pthread_mutex_unlock(&slave_mutex); + free(information); + if (entry != LINKED_LIST_UNUSED) + with_mutex (slave_mutex, linked_list_remove(&client_list, entry);); + return errno = saved_errno, NULL; +} diff --git a/src/mds-server/slavery.h b/src/mds-server/slavery.h index 72e745d..89f2b83 100644 --- a/src/mds-server/slavery.h +++ b/src/mds-server/slavery.h @@ -31,7 +31,7 @@ * @return Zero on success, -2 on failure, otherwise -1 */ __attribute__((nonnull)) -int fetch_message(client_t* client); +int fetch_message(client_t *client); /** * Create, start and detache a slave thread @@ -41,7 +41,7 @@ int fetch_message(client_t* client); * @return Zero on success, -1 on error, error message will have been printed */ __attribute__((nonnull)) -int create_slave(pthread_t* thread_slot, int slave_fd); +int create_slave(pthread_t *thread_slot, int slave_fd); /** * Initialise a client, except for threading @@ -49,8 +49,7 @@ int create_slave(pthread_t* thread_slot, int slave_fd); * @param client_fd The file descriptor of the client's socket * @return The client information, `NULL` on error */ -client_t* initialise_client(int client_fd); +client_t *initialise_client(int client_fd); #endif - |