aboutsummaryrefslogtreecommitdiffstats
path: root/src/mds-server/sending.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/mds-server/sending.c')
-rw-r--r--src/mds-server/sending.c345
1 files changed, 167 insertions, 178 deletions
diff --git a/src/mds-server/sending.c b/src/mds-server/sending.c
index 05e5cc0..e6a8094 100644
--- a/src/mds-server/sending.c
+++ b/src/mds-server/sending.c
@@ -41,11 +41,12 @@
* @param client_fd The file descriptor of the client's socket
* @return The client
*/
-static client_t* client_by_socket(int client_fd)
+static client_t *
+client_by_socket(int client_fd)
{
- size_t address;
- with_mutex (slave_mutex, address = fd_table_get(&client_map, client_fd););
- return (client_t*)(void*)address;
+ size_t address;
+ with_mutex (slave_mutex, address = fd_table_get(&client_map, client_fd););
+ return (client_t*)(void*)address;
}
@@ -57,34 +58,32 @@ static client_t* client_by_socket(int client_fd)
* @param modifying Whether the recipient may modify the message
* @return Evaluates to true if and only if the entire message was sent
*/
-__attribute__((nonnull))
-static int send_multicast_to_recipient(multicast_t* multicast, client_t* recipient, int modifying)
+static int __attribute__((nonnull))
+send_multicast_to_recipient(multicast_t *multicast, client_t *recipient, int modifying)
{
- char* msg = multicast->message + multicast->message_ptr;
- size_t n = multicast->message_length - multicast->message_ptr;
- size_t sent;
-
- /* Skip Modify ID header if the interceptors will not perform a modification. */
- if ((modifying == 0) && (multicast->message_ptr == 0))
- {
- n -= multicast->message_prefix;
- multicast->message_ptr += multicast->message_prefix;
- }
-
- /* Send the message. */
- n *= sizeof(char);
- with_mutex (recipient->mutex,
- if (recipient->open)
- {
- sent = send_message(recipient->socket_fd, msg + multicast->message_ptr, n);
- n -= sent;
- multicast->message_ptr += sent / sizeof(char);
- if ((n > 0) && (errno != EINTR))
- xperror(*argv);
- }
- );
-
- return n == 0;
+ char *msg = multicast->message + multicast->message_ptr;
+ size_t n = multicast->message_length - multicast->message_ptr;
+ size_t sent;
+
+ /* Skip Modify ID header if the interceptors will not perform a modification. */
+ if (!modifying && !multicast->message_ptr) {
+ n -= multicast->message_prefix;
+ multicast->message_ptr += multicast->message_prefix;
+ }
+
+ /* Send the message. */
+ n *= sizeof(char);
+ with_mutex (recipient->mutex,
+ if (recipient->open) {
+ sent = send_message(recipient->socket_fd, msg + multicast->message_ptr, n);
+ n -= sent;
+ multicast->message_ptr += sent / sizeof(char);
+ if (n > 0 && errno != EINTR)
+ xperror(*argv);
+ }
+ );
+
+ return !n;
}
@@ -94,31 +93,29 @@ static int send_multicast_to_recipient(multicast_t* multicast, client_t* recipie
* @param recipient The recipient
* @param modify_id The modify ID of the multicast
*/
-__attribute__((nonnull))
-static void wait_for_reply(client_t* recipient, uint64_t modify_id)
+static void __attribute__((nonnull))
+wait_for_reply(client_t *recipient, uint64_t modify_id)
{
- /* pthread_cond_timedwait is required to handle re-exec and termination because
- pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */
- struct timespec timeout =
- {
- .tv_sec = 1,
- .tv_nsec = 0
- };
-
- with_mutex_if (modify_mutex, recipient->modify_message == NULL,
- if (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0)
- {
- hash_table_put(&modify_map, (size_t)modify_id, (size_t)(void*)recipient);
- pthread_cond_signal(&slave_cond);
- }
- );
-
- with_mutex_if (recipient->modify_mutex, recipient->modify_message == NULL,
- while ((recipient->modify_message == NULL) && (terminating == 0))
- pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout);
- if (terminating == 0)
- hash_table_remove(&modify_map, (size_t)modify_id);
- );
+ /* pthread_cond_timedwait is required to handle re-exec and termination because
+ pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */
+ struct timespec timeout = {
+ .tv_sec = 1,
+ .tv_nsec = 0
+ };
+
+ with_mutex_if (modify_mutex, !recipient->modify_message,
+ if (!hash_table_contains_key(&modify_map, (size_t)modify_id)) {
+ hash_table_put(&modify_map, (size_t)modify_id, (size_t)(void*)recipient);
+ pthread_cond_signal(&slave_cond);
+ }
+ );
+
+ with_mutex_if (recipient->modify_mutex, !recipient->modify_message,
+ while (!recipient->modify_message && !terminating)
+ pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout);
+ if (!terminating)
+ hash_table_remove(&modify_map, (size_t)modify_id);
+ );
}
@@ -127,88 +124,83 @@ static void wait_for_reply(client_t* recipient, uint64_t modify_id)
*
* @param multicast The multicast message
*/
-void multicast_message(multicast_t* multicast)
+void multicast_message(multicast_t *multicast)
{
- int consumed = 0;
- uint64_t modify_id = 0;
- size_t n = strlen("Modify ID: ");
-
- if (startswith_n(multicast->message, "Modify ID: ", multicast->message_length, n))
- {
- char* value = multicast->message + n;
- char* lf = strchr(value, '\n');
- *lf = '\0';
- modify_id = atou64(value);
- *lf = '\n';
- }
-
- for (; multicast->interceptions_ptr < multicast->interceptions_count; multicast->interceptions_ptr++)
- {
- queued_interception_t client_ = multicast->interceptions[multicast->interceptions_ptr];
- client_t* client = client_.client;
- int modifying = 0;
- char* old_buf;
- size_t i;
- mds_message_t* mod;
-
- /* After unmarshalling at re-exec, client will be NULL and must be mapped from its socket. */
- if (client == NULL)
- client_.client = client = client_by_socket(client_.socket_fd);
-
- /* Send the message to the recipient. */
- if (send_multicast_to_recipient(multicast, client, client_.modifying) == 0)
- {
- /* Stop if we are re-exec:ing or terminating, or continue to next recipient on error. */
- if (terminating)
- return;
- else
- continue;
- }
-
- /* Do not wait for a reply if it is non-modifying. */
- if (client_.modifying == 0)
- {
- /* Reset how much of the message has been sent before we continue with next recipient. */
- multicast->message_ptr = 0;
- continue;
+ int consumed = 0, modifying = 0;
+ uint64_t modify_id = 0;
+ size_t i, n = strlen("Modify ID: ");
+ char *value, *lf, *old_buf;
+ mds_message_t* mod;
+ client_t* client;
+ queued_interception_t client_;
+
+ if (startswith_n(multicast->message, "Modify ID: ", multicast->message_length, n)) {
+ value = multicast->message + n;
+ lf = strchr(value, '\n');
+ *lf = '\0';
+ modify_id = atou64(value);
+ *lf = '\n';
}
-
- /* Wait for a reply. */
- wait_for_reply(client, modify_id);
- if (terminating)
- return;
-
- /* Act upon the reply. */
- mod = client->modify_message;
- for (i = 0; i < mod->header_count; i++)
- if (strequals(mod->headers[i], "Modify: yes"))
- {
- modifying = 1;
- consumed = mod->payload_size == 0;
- break;
- }
- if (modifying && !consumed)
- {
- n = mod->payload_size;
- old_buf = multicast->message;
- if (xrealloc(multicast->message, multicast->message_prefix + n, char))
- {
- xperror(*argv);
- multicast->message = old_buf;
- }
- else
- memcpy(multicast->message + multicast->message_prefix, mod->payload, n);
+
+ for (; multicast->interceptions_ptr < multicast->interceptions_count; multicast->interceptions_ptr++) {
+ client_ = multicast->interceptions[multicast->interceptions_ptr];
+ client = client_.client;
+ modifying = 0;
+
+ /* After unmarshalling at re-exec, client will be NULL and must be mapped from its socket. */
+ if (!client)
+ client_.client = client = client_by_socket(client_.socket_fd);
+
+ /* Send the message to the recipient. */
+ if (!send_multicast_to_recipient(multicast, client, client_.modifying)) {
+ /* Stop if we are re-exec:ing or terminating, or continue to next recipient on error. */
+ if (terminating)
+ return;
+ else
+ continue;
+ }
+
+ /* Do not wait for a reply if it is non-modifying. */
+ if (!client_.modifying) {
+ /* Reset how much of the message has been sent before we continue with next recipient. */
+ multicast->message_ptr = 0;
+ continue;
+ }
+
+ /* Wait for a reply. */
+ wait_for_reply(client, modify_id);
+ if (terminating)
+ return;
+
+ /* Act upon the reply. */
+ mod = client->modify_message;
+ for (i = 0; i < mod->header_count; i++) {
+ if (strequals(mod->headers[i], "Modify: yes")) {
+ modifying = 1;
+ consumed = mod->payload_size == 0;
+ break;
+ }
+ }
+ if (modifying && !consumed) {
+ n = mod->payload_size;
+ old_buf = multicast->message;
+ if (xrealloc(multicast->message, multicast->message_prefix + n, char)) {
+ xperror(*argv);
+ multicast->message = old_buf;
+ } else {
+ memcpy(multicast->message + multicast->message_prefix, mod->payload, n);
+ }
+ }
+
+ /* Free the reply. */
+ mds_message_destroy(client->modify_message);
+
+ /* Reset how much of the message has been sent before we continue with next recipient. */
+ multicast->message_ptr = 0;
+
+ if (consumed)
+ break;
}
-
- /* Free the reply. */
- mds_message_destroy(client->modify_message);
-
- /* Reset how much of the message has been sent before we continue with next recipient. */
- multicast->message_ptr = 0;
-
- if (consumed)
- break;
- }
}
@@ -217,24 +209,24 @@ void multicast_message(multicast_t* multicast)
*
* @param client The client
*/
-void send_multicast_queue(client_t* client)
+void
+send_multicast_queue(client_t *client)
{
- while (client->multicasts_count > 0)
- {
- multicast_t multicast;
- with_mutex_if (client->mutex, client->multicasts_count > 0,
- size_t c = (client->multicasts_count -= 1) * sizeof(multicast_t);
- multicast = client->multicasts[0];
- memmove(client->multicasts, client->multicasts + 1, c);
- if (c == 0)
- {
- free(client->multicasts);
- client->multicasts = NULL;
- }
- );
- multicast_message(&multicast);
- multicast_destroy(&multicast);
- }
+ multicast_t multicast;
+ size_t c;
+ while (client->multicasts_count > 0) {
+ with_mutex_if (client->mutex, client->multicasts_count > 0,
+ c = (client->multicasts_count -= 1) * sizeof(multicast_t);
+ multicast = client->multicasts[0];
+ memmove(client->multicasts, client->multicasts + 1, c);
+ if (c == 0) {
+ free(client->multicasts);
+ client->multicasts = NULL;
+ }
+ );
+ multicast_message(&multicast);
+ multicast_destroy(&multicast);
+ }
}
@@ -243,32 +235,29 @@ void send_multicast_queue(client_t* client)
*
* @param client The client
*/
-void send_reply_queue(client_t* client)
+void
+send_reply_queue(client_t *client)
{
- char* sendbuf = client->send_pending;
- char* sendbuf_ = sendbuf;
- size_t sent;
- size_t n;
-
- if (client->send_pending_size == 0)
- return;
-
- n = client->send_pending_size;
- client->send_pending_size = 0;
- client->send_pending = NULL;
- with_mutex (client->mutex,
- while (n > 0)
- {
- sent = send_message(client->socket_fd, sendbuf_, n);
- n -= sent;
- sendbuf_ += sent / sizeof(char);
- if ((n > 0) && (errno != EINTR)) /* Ignore EINTR */
- {
- xperror(*argv);
- break;
- }
- }
- free(sendbuf);
- );
-}
+ char *sendbuf = client->send_pending;
+ char *sendbuf_ = sendbuf;
+ size_t sent, n;
+
+ if (!client->send_pending_size)
+ return;
+ n = client->send_pending_size;
+ client->send_pending_size = 0;
+ client->send_pending = NULL;
+ with_mutex (client->mutex,
+ while (n > 0) {
+ sent = send_message(client->socket_fd, sendbuf_, n);
+ n -= sent;
+ sendbuf_ += sent / sizeof(char);
+ if (n > 0 && errno != EINTR) { /* Ignore EINTR */
+ xperror(*argv);
+ break;
+ }
+ }
+ free(sendbuf);
+ );
+}