diff options
author | Mattias Andrée <maandree@kth.se> | 2017-11-05 00:09:50 +0100 |
---|---|---|
committer | Mattias Andrée <maandree@kth.se> | 2017-11-05 00:09:50 +0100 |
commit | 9e8dec188d55ca1f0a3b33acab702ced8ed07a18 (patch) | |
tree | cbb43c22e72674dc672e645e6596358e3868568e /src/mds-server/sending.c | |
parent | typo (diff) | |
download | mds-9e8dec188d55ca1f0a3b33acab702ced8ed07a18.tar.gz mds-9e8dec188d55ca1f0a3b33acab702ced8ed07a18.tar.bz2 mds-9e8dec188d55ca1f0a3b33acab702ced8ed07a18.tar.xz |
Work on changing style, and an important typo fix
Signed-off-by: Mattias Andrée <maandree@kth.se>
Diffstat (limited to '')
-rw-r--r-- | src/mds-server/sending.c | 345 |
1 files changed, 167 insertions, 178 deletions
diff --git a/src/mds-server/sending.c b/src/mds-server/sending.c index 05e5cc0..e6a8094 100644 --- a/src/mds-server/sending.c +++ b/src/mds-server/sending.c @@ -41,11 +41,12 @@ * @param client_fd The file descriptor of the client's socket * @return The client */ -static client_t* client_by_socket(int client_fd) +static client_t * +client_by_socket(int client_fd) { - size_t address; - with_mutex (slave_mutex, address = fd_table_get(&client_map, client_fd);); - return (client_t*)(void*)address; + size_t address; + with_mutex (slave_mutex, address = fd_table_get(&client_map, client_fd);); + return (client_t*)(void*)address; } @@ -57,34 +58,32 @@ static client_t* client_by_socket(int client_fd) * @param modifying Whether the recipient may modify the message * @return Evaluates to true if and only if the entire message was sent */ -__attribute__((nonnull)) -static int send_multicast_to_recipient(multicast_t* multicast, client_t* recipient, int modifying) +static int __attribute__((nonnull)) +send_multicast_to_recipient(multicast_t *multicast, client_t *recipient, int modifying) { - char* msg = multicast->message + multicast->message_ptr; - size_t n = multicast->message_length - multicast->message_ptr; - size_t sent; - - /* Skip Modify ID header if the interceptors will not perform a modification. */ - if ((modifying == 0) && (multicast->message_ptr == 0)) - { - n -= multicast->message_prefix; - multicast->message_ptr += multicast->message_prefix; - } - - /* Send the message. */ - n *= sizeof(char); - with_mutex (recipient->mutex, - if (recipient->open) - { - sent = send_message(recipient->socket_fd, msg + multicast->message_ptr, n); - n -= sent; - multicast->message_ptr += sent / sizeof(char); - if ((n > 0) && (errno != EINTR)) - xperror(*argv); - } - ); - - return n == 0; + char *msg = multicast->message + multicast->message_ptr; + size_t n = multicast->message_length - multicast->message_ptr; + size_t sent; + + /* Skip Modify ID header if the interceptors will not perform a modification. */ + if (!modifying && !multicast->message_ptr) { + n -= multicast->message_prefix; + multicast->message_ptr += multicast->message_prefix; + } + + /* Send the message. */ + n *= sizeof(char); + with_mutex (recipient->mutex, + if (recipient->open) { + sent = send_message(recipient->socket_fd, msg + multicast->message_ptr, n); + n -= sent; + multicast->message_ptr += sent / sizeof(char); + if (n > 0 && errno != EINTR) + xperror(*argv); + } + ); + + return !n; } @@ -94,31 +93,29 @@ static int send_multicast_to_recipient(multicast_t* multicast, client_t* recipie * @param recipient The recipient * @param modify_id The modify ID of the multicast */ -__attribute__((nonnull)) -static void wait_for_reply(client_t* recipient, uint64_t modify_id) +static void __attribute__((nonnull)) +wait_for_reply(client_t *recipient, uint64_t modify_id) { - /* pthread_cond_timedwait is required to handle re-exec and termination because - pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */ - struct timespec timeout = - { - .tv_sec = 1, - .tv_nsec = 0 - }; - - with_mutex_if (modify_mutex, recipient->modify_message == NULL, - if (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0) - { - hash_table_put(&modify_map, (size_t)modify_id, (size_t)(void*)recipient); - pthread_cond_signal(&slave_cond); - } - ); - - with_mutex_if (recipient->modify_mutex, recipient->modify_message == NULL, - while ((recipient->modify_message == NULL) && (terminating == 0)) - pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout); - if (terminating == 0) - hash_table_remove(&modify_map, (size_t)modify_id); - ); + /* pthread_cond_timedwait is required to handle re-exec and termination because + pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */ + struct timespec timeout = { + .tv_sec = 1, + .tv_nsec = 0 + }; + + with_mutex_if (modify_mutex, !recipient->modify_message, + if (!hash_table_contains_key(&modify_map, (size_t)modify_id)) { + hash_table_put(&modify_map, (size_t)modify_id, (size_t)(void*)recipient); + pthread_cond_signal(&slave_cond); + } + ); + + with_mutex_if (recipient->modify_mutex, !recipient->modify_message, + while (!recipient->modify_message && !terminating) + pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout); + if (!terminating) + hash_table_remove(&modify_map, (size_t)modify_id); + ); } @@ -127,88 +124,83 @@ static void wait_for_reply(client_t* recipient, uint64_t modify_id) * * @param multicast The multicast message */ -void multicast_message(multicast_t* multicast) +void multicast_message(multicast_t *multicast) { - int consumed = 0; - uint64_t modify_id = 0; - size_t n = strlen("Modify ID: "); - - if (startswith_n(multicast->message, "Modify ID: ", multicast->message_length, n)) - { - char* value = multicast->message + n; - char* lf = strchr(value, '\n'); - *lf = '\0'; - modify_id = atou64(value); - *lf = '\n'; - } - - for (; multicast->interceptions_ptr < multicast->interceptions_count; multicast->interceptions_ptr++) - { - queued_interception_t client_ = multicast->interceptions[multicast->interceptions_ptr]; - client_t* client = client_.client; - int modifying = 0; - char* old_buf; - size_t i; - mds_message_t* mod; - - /* After unmarshalling at re-exec, client will be NULL and must be mapped from its socket. */ - if (client == NULL) - client_.client = client = client_by_socket(client_.socket_fd); - - /* Send the message to the recipient. */ - if (send_multicast_to_recipient(multicast, client, client_.modifying) == 0) - { - /* Stop if we are re-exec:ing or terminating, or continue to next recipient on error. */ - if (terminating) - return; - else - continue; - } - - /* Do not wait for a reply if it is non-modifying. */ - if (client_.modifying == 0) - { - /* Reset how much of the message has been sent before we continue with next recipient. */ - multicast->message_ptr = 0; - continue; + int consumed = 0, modifying = 0; + uint64_t modify_id = 0; + size_t i, n = strlen("Modify ID: "); + char *value, *lf, *old_buf; + mds_message_t* mod; + client_t* client; + queued_interception_t client_; + + if (startswith_n(multicast->message, "Modify ID: ", multicast->message_length, n)) { + value = multicast->message + n; + lf = strchr(value, '\n'); + *lf = '\0'; + modify_id = atou64(value); + *lf = '\n'; } - - /* Wait for a reply. */ - wait_for_reply(client, modify_id); - if (terminating) - return; - - /* Act upon the reply. */ - mod = client->modify_message; - for (i = 0; i < mod->header_count; i++) - if (strequals(mod->headers[i], "Modify: yes")) - { - modifying = 1; - consumed = mod->payload_size == 0; - break; - } - if (modifying && !consumed) - { - n = mod->payload_size; - old_buf = multicast->message; - if (xrealloc(multicast->message, multicast->message_prefix + n, char)) - { - xperror(*argv); - multicast->message = old_buf; - } - else - memcpy(multicast->message + multicast->message_prefix, mod->payload, n); + + for (; multicast->interceptions_ptr < multicast->interceptions_count; multicast->interceptions_ptr++) { + client_ = multicast->interceptions[multicast->interceptions_ptr]; + client = client_.client; + modifying = 0; + + /* After unmarshalling at re-exec, client will be NULL and must be mapped from its socket. */ + if (!client) + client_.client = client = client_by_socket(client_.socket_fd); + + /* Send the message to the recipient. */ + if (!send_multicast_to_recipient(multicast, client, client_.modifying)) { + /* Stop if we are re-exec:ing or terminating, or continue to next recipient on error. */ + if (terminating) + return; + else + continue; + } + + /* Do not wait for a reply if it is non-modifying. */ + if (!client_.modifying) { + /* Reset how much of the message has been sent before we continue with next recipient. */ + multicast->message_ptr = 0; + continue; + } + + /* Wait for a reply. */ + wait_for_reply(client, modify_id); + if (terminating) + return; + + /* Act upon the reply. */ + mod = client->modify_message; + for (i = 0; i < mod->header_count; i++) { + if (strequals(mod->headers[i], "Modify: yes")) { + modifying = 1; + consumed = mod->payload_size == 0; + break; + } + } + if (modifying && !consumed) { + n = mod->payload_size; + old_buf = multicast->message; + if (xrealloc(multicast->message, multicast->message_prefix + n, char)) { + xperror(*argv); + multicast->message = old_buf; + } else { + memcpy(multicast->message + multicast->message_prefix, mod->payload, n); + } + } + + /* Free the reply. */ + mds_message_destroy(client->modify_message); + + /* Reset how much of the message has been sent before we continue with next recipient. */ + multicast->message_ptr = 0; + + if (consumed) + break; } - - /* Free the reply. */ - mds_message_destroy(client->modify_message); - - /* Reset how much of the message has been sent before we continue with next recipient. */ - multicast->message_ptr = 0; - - if (consumed) - break; - } } @@ -217,24 +209,24 @@ void multicast_message(multicast_t* multicast) * * @param client The client */ -void send_multicast_queue(client_t* client) +void +send_multicast_queue(client_t *client) { - while (client->multicasts_count > 0) - { - multicast_t multicast; - with_mutex_if (client->mutex, client->multicasts_count > 0, - size_t c = (client->multicasts_count -= 1) * sizeof(multicast_t); - multicast = client->multicasts[0]; - memmove(client->multicasts, client->multicasts + 1, c); - if (c == 0) - { - free(client->multicasts); - client->multicasts = NULL; - } - ); - multicast_message(&multicast); - multicast_destroy(&multicast); - } + multicast_t multicast; + size_t c; + while (client->multicasts_count > 0) { + with_mutex_if (client->mutex, client->multicasts_count > 0, + c = (client->multicasts_count -= 1) * sizeof(multicast_t); + multicast = client->multicasts[0]; + memmove(client->multicasts, client->multicasts + 1, c); + if (c == 0) { + free(client->multicasts); + client->multicasts = NULL; + } + ); + multicast_message(&multicast); + multicast_destroy(&multicast); + } } @@ -243,32 +235,29 @@ void send_multicast_queue(client_t* client) * * @param client The client */ -void send_reply_queue(client_t* client) +void +send_reply_queue(client_t *client) { - char* sendbuf = client->send_pending; - char* sendbuf_ = sendbuf; - size_t sent; - size_t n; - - if (client->send_pending_size == 0) - return; - - n = client->send_pending_size; - client->send_pending_size = 0; - client->send_pending = NULL; - with_mutex (client->mutex, - while (n > 0) - { - sent = send_message(client->socket_fd, sendbuf_, n); - n -= sent; - sendbuf_ += sent / sizeof(char); - if ((n > 0) && (errno != EINTR)) /* Ignore EINTR */ - { - xperror(*argv); - break; - } - } - free(sendbuf); - ); -} + char *sendbuf = client->send_pending; + char *sendbuf_ = sendbuf; + size_t sent, n; + + if (!client->send_pending_size) + return; + n = client->send_pending_size; + client->send_pending_size = 0; + client->send_pending = NULL; + with_mutex (client->mutex, + while (n > 0) { + sent = send_message(client->socket_fd, sendbuf_, n); + n -= sent; + sendbuf_ += sent / sizeof(char); + if (n > 0 && errno != EINTR) { /* Ignore EINTR */ + xperror(*argv); + break; + } + } + free(sendbuf); + ); +} |