From 9e8dec188d55ca1f0a3b33acab702ced8ed07a18 Mon Sep 17 00:00:00 2001 From: Mattias Andrée Date: Sun, 5 Nov 2017 00:09:50 +0100 Subject: Work on changing style, and an important typo fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Mattias Andrée --- src/mds-server/receiving.c | 465 ++++++++++++++++++++++----------------------- 1 file changed, 225 insertions(+), 240 deletions(-) (limited to 'src/mds-server/receiving.c') diff --git a/src/mds-server/receiving.c b/src/mds-server/receiving.c index b93f638..09684ae 100644 --- a/src/mds-server/receiving.c +++ b/src/mds-server/receiving.c @@ -40,7 +40,7 @@ * @param sender The original sender of the message */ __attribute__((nonnull)) -void queue_message_multicast(char* message, size_t length, client_t* sender); +void queue_message_multicast(char *message, size_t length, client_t *sender); /** @@ -51,55 +51,50 @@ void queue_message_multicast(char* message, size_t length, client_t* sender); * @param modify_id The modify ID of the message * @return Normally zero, but 1 if exited because of re-exec or termination */ -__attribute__((nonnull)) -static int modifying_notify(client_t* client, mds_message_t message, uint64_t modify_id) +static int __attribute__((nonnull)) +modifying_notify(client_t *client, mds_message_t message, uint64_t modify_id) { - /* pthread_cond_timedwait is required to handle re-exec and termination because - pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */ - struct timespec timeout = - { - .tv_sec = 1, - .tv_nsec = 0 - }; - size_t address; - client_t* recipient; - mds_message_t* multicast; - size_t i; - - pthread_mutex_lock(&(modify_mutex)); - while (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0) - { - if (terminating) - { - pthread_mutex_unlock(&(modify_mutex)); - return 1; + /* pthread_cond_timedwait is required to handle re-exec and termination because + pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */ + struct timespec timeout = { + .tv_sec = 1, + .tv_nsec = 0 + }; + size_t address; + client_t *recipient; + mds_message_t *multicast; + size_t i; + + pthread_mutex_lock(&(modify_mutex)); + while (!hash_table_contains_key(&modify_map, (size_t)modify_id)) { + if (terminating) { + pthread_mutex_unlock(&(modify_mutex)); + return 1; + } + pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout); } - pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout); - } - address = hash_table_get(&modify_map, (size_t)modify_id); - recipient = (client_t*)(void*)address; - fail_if (xmalloc(multicast = recipient->modify_message, 1, mds_message_t)); - mds_message_zero_initialise(multicast); - fail_if (xmemdup(multicast->payload, message.payload, message.payload_size, char)); - fail_if (xmalloc(multicast->headers, message.header_count, char*)); - for (i = 0; i < message.header_count; i++, multicast->header_count++) - fail_if (xstrdup(multicast->headers[i], message.headers[i])); - done: - pthread_mutex_unlock(&(modify_mutex)); - with_mutex (client->modify_mutex, pthread_cond_signal(&(client->modify_cond));); - - return 0; - - - fail: - xperror(*argv); - if (multicast != NULL) - { - mds_message_destroy(multicast); - free(multicast); - recipient->modify_message = NULL; - } - goto done; + address = hash_table_get(&modify_map, (size_t)modify_id); + recipient = (void *)address; + fail_if (xmalloc(multicast = recipient->modify_message, 1, mds_message_t)); + mds_message_zero_initialise(multicast); + fail_if (xmemdup(multicast->payload, message.payload, message.payload_size, char)); + fail_if (xmalloc(multicast->headers, message.header_count, char*)); + for (i = 0; i < message.header_count; i++, multicast->header_count++) + fail_if (xstrdup(multicast->headers[i], message.headers[i])); +done: + pthread_mutex_unlock(&(modify_mutex)); + with_mutex (client->modify_mutex, pthread_cond_signal(&(client->modify_cond));); + + return 0; + +fail: + xperror(*argv); + if (multicast) { + mds_message_destroy(multicast); + free(multicast); + recipient->modify_message = NULL; + } + goto done; } @@ -112,61 +107,56 @@ static int modifying_notify(client_t* client, mds_message_t message, uint64_t mo * @param stop Whether to stop listening rather than start or reconfigure * @return Zero on success, -1 on error */ -__attribute__((nonnull)) -static int add_intercept_conditions_from_message(client_t* client, int modifying, int64_t priority, int stop) +static int __attribute__((nonnull)) +add_intercept_conditions_from_message(client_t *client, int modifying, int64_t priority, int stop) { - int saved_errno; - char* payload = client->message.payload; - size_t payload_size = client->message.payload_size; - size_t size = 64; - char* buf; - - fail_if (xmalloc(buf, size + 1, char)); - - /* All messages. */ - if (client->message.payload_size == 0) - { - *buf = '\0'; - add_intercept_condition(client, buf, priority, modifying, stop); - goto done; - } - - /* Filtered messages. */ - for (;;) - { - char* end = memchr(payload, '\n', payload_size); - size_t len = end == NULL ? payload_size : (size_t)(end - payload); - if (len == 0) - { - payload++; - payload_size--; - break; + int saved_errno; + char *payload = client->message.payload; + size_t payload_size = client->message.payload_size; + size_t size = 64, len; + char *buf, *end, *old_buf; + + fail_if (xmalloc(buf, size + 1, char)); + + /* All messages. */ + if (!client->message.payload_size) { + *buf = '\0'; + add_intercept_condition(client, buf, priority, modifying, stop); + goto done; } - if (len > size) - { - char* old_buf = buf; - if (xrealloc(buf, (size <<= 1) + 1, char)) - { - saved_errno = errno; - free(old_buf); - pthread_mutex_unlock(&(client->mutex)); - fail_if (errno = saved_errno, 1); - } + + /* Filtered messages. */ + for (;;) { + end = memchr(payload, '\n', payload_size); + len = !end ? payload_size : (size_t)(end - payload); + if (len == 0) { + payload++; + payload_size--; + break; + } + if (len > size) { + old_buf = buf; + if (xrealloc(buf, (size <<= 1) + 1, char)) { + saved_errno = errno; + free(old_buf); + pthread_mutex_unlock(&(client->mutex)); + fail_if (errno = saved_errno, 1); + } + } + memcpy(buf, payload, len); + buf[len] = '\0'; + add_intercept_condition(client, buf, priority, modifying, stop); + if (!end) + break; + payload = end + 1; + payload_size -= len + 1; } - memcpy(buf, payload, len); - buf[len] = '\0'; - add_intercept_condition(client, buf, priority, modifying, stop); - if (end == NULL) - break; - payload = end + 1; - payload_size -= len + 1; - } - - done: - free(buf); - return 0; - fail: - return -1; + +done: + free(buf); + return 0; +fail: + return -1; } @@ -177,60 +167,58 @@ static int add_intercept_conditions_from_message(client_t* client, int modifying * @param message_id The message ID of the ID request * @return Zero on success, -1 on error */ -__attribute__((nonnull(1))) -static int assign_and_send_id(client_t* client, const char* message_id) +static int __attribute__((nonnull(1))) +assign_and_send_id(client_t *client, const char *message_id) { - char* msgbuf = NULL; - char* msgbuf_; - size_t n; - int rc = -1; - - /* Construct response. */ - n = 2 * 10 + strlen(message_id); - n += sizeof("ID assignment: :\nIn response to: \n\n") / sizeof(char); - fail_if (xmalloc(msgbuf, n, char)); - snprintf(msgbuf, n, - "ID assignment: %" PRIu32 ":%" PRIu32 "\n" - "In response to: %s\n" - "\n", - (uint32_t)(client->id >> 32), - (uint32_t)(client->id >> 0), - message_id == NULL ? "" : message_id); - n = strlen(msgbuf); - - /* Multicast the reply. */ - fail_if (xstrdup(msgbuf_, msgbuf)); - queue_message_multicast(msgbuf_, n, client); - - /* Queue message to be sent when this function returns. - This done to simplify `multicast_message` for re-exec and termination. */ -#define fail fail_in_mutex - with_mutex (client->mutex, - if (client->send_pending_size == 0) - { - /* Set the pending message. */ - client->send_pending = msgbuf; - client->send_pending_size = n; - } - else - { - /* Concatenate message to already pending messages. */ - size_t new_len = client->send_pending_size + n; - char* msg_new = client->send_pending; - fail_if (xrealloc(msg_new, new_len, char)); - memcpy(msg_new + client->send_pending_size, msgbuf, n * sizeof(char)); - client->send_pending = msg_new; - client->send_pending_size = new_len; - } - (msgbuf = NULL, rc = 0, errno = 0); - fail_in_mutex: - ); + char *msgbuf = NULL; + char *msgbuf_; + char *msg_new; + size_t n, new_len; + int rc = -1; + + /* Construct response. */ + n = 2 * 10 + strlen(message_id); + n += sizeof("ID assignment: :\nIn response to: \n\n") / sizeof(char); + fail_if (xmalloc(msgbuf, n, char)); + snprintf(msgbuf, n, + "ID assignment: %" PRIu32 ":%" PRIu32 "\n" + "In response to: %s\n" + "\n", + (uint32_t)(client->id >> 32), + (uint32_t)(client->id >> 0), + !message_id ? "" : message_id); + n = strlen(msgbuf); + + /* Multicast the reply. */ + fail_if (xstrdup(msgbuf_, msgbuf)); + queue_message_multicast(msgbuf_, n, client); + + /* Queue message to be sent when this function returns. + This done to simplify `multicast_message` for re-exec and termination. */ +#define fail fail_in_mutex + with_mutex (client->mutex, + if (!client->send_pending_size) { + /* Set the pending message. */ + client->send_pending = msgbuf; + client->send_pending_size = n; + } else { + /* Concatenate message to already pending messages. */ + new_len = client->send_pending_size + n; + msg_new = client->send_pending; + fail_if (xrealloc(msg_new, new_len, char)); + memcpy(msg_new + client->send_pending_size, msgbuf, n * sizeof(char)); + client->send_pending = msg_new; + client->send_pending_size = new_len; + } + (msgbuf = NULL, rc = 0, errno = 0); + fail_in_mutex: + ); #undef fail - fail: /* Also success. */ - xperror(*argv); - free(msgbuf); - return rc; +fail: /* Also success. */ + xperror(*argv); + free(msgbuf); + return rc; } @@ -241,95 +229,92 @@ static int assign_and_send_id(client_t* client, const char* message_id) * @param client The client whom sent the message * @return Normally zero, but 1 if exited because of re-exec or termination */ -int message_received(client_t* client) +int +message_received(client_t *client) { - mds_message_t message = client->message; - int assign_id = 0; - int modifying = 0; - int intercept = 0; - int64_t priority = 0; - int stop = 0; - const char* message_id = NULL; - uint64_t modify_id = 0; - char* msgbuf = NULL; - size_t i, n; - - - /* Parser headers. */ - for (i = 0; i < message.header_count; i++) - { - const char* h = message.headers[i]; - if (strequals(h, "Command: assign-id")) assign_id = 1; - else if (strequals(h, "Command: intercept")) intercept = 1; - else if (strequals(h, "Modifying: yes")) modifying = 1; - else if (strequals(h, "Stop: yes")) stop = 1; - else if (startswith(h, "Message ID: ")) message_id = strstr(h, ": ") + 2; - else if (startswith(h, "Priority: ")) priority = ato64(strstr(h, ": ") + 2); - else if (startswith(h, "Modify ID: ")) modify_id = atou64(strstr(h, ": ") + 2); - } - - - /* Notify waiting client about a received message modification. */ - if (modifying) - return modifying_notify(client, message, modify_id); - /* Do nothing more, not not even multicast this message. */ - - - if (message_id == NULL) - { - eprint("received message without a message ID, ignoring."); - return 0; - } - - /* Assign ID if not already assigned. */ - if (assign_id && (client->id == 0)) - { - intercept |= 2; - with_mutex_if (slave_mutex, (client->id = next_client_id++) == 0, - eprint("this is impossible, ID counter has overflowed."); - /* If the program ran for a millennium it would - take c:a 585 assignments per nanosecond. This - cannot possibly happen. (It would require serious - dedication by generations of ponies (or just an alicorn) - to maintain the process and transfer it new hardware.) */ - abort(); - ); - } - - /* Make the client listen for messages addressed to it. */ - if (intercept) - { - pthread_mutex_lock(&(client->mutex)); - if ((intercept & 1)) /* from payload */ - fail_if (add_intercept_conditions_from_message(client, modifying, priority, stop) < 0); - if ((intercept & 2)) /* "To: $(client->id)" */ - { - char buf[26]; - xsnprintf(buf, "To: %" PRIu32 ":%" PRIu32, - (uint32_t)(client->id >> 32), - (uint32_t)(client->id >> 0)); - add_intercept_condition(client, buf, priority, modifying, 0); + mds_message_t message = client->message; + int assign_id = 0; + int modifying = 0; + int intercept = 0; + int64_t priority = 0; + int stop = 0; + const char *message_id = NULL; + uint64_t modify_id = 0; + char *msgbuf = NULL; + size_t i, n; + const char *h; + char buf[26]; + + + /* Parser headers. */ + for (i = 0; i < message.header_count; i++) { + h = message.headers[i]; + if (strequals(h, "Command: assign-id")) assign_id = 1; + else if (strequals(h, "Command: intercept")) intercept = 1; + else if (strequals(h, "Modifying: yes")) modifying = 1; + else if (strequals(h, "Stop: yes")) stop = 1; + else if (startswith(h, "Message ID: ")) message_id = strstr(h, ": ") + 2; + else if (startswith(h, "Priority: ")) priority = ato64(strstr(h, ": ") + 2); + else if (startswith(h, "Modify ID: ")) modify_id = atou64(strstr(h, ": ") + 2); } - pthread_mutex_unlock(&(client->mutex)); - } - - - /* Multicast the message. */ - n = mds_message_compose_size(&message); - fail_if (xbmalloc(msgbuf, n)); - mds_message_compose(&message, msgbuf); - queue_message_multicast(msgbuf, n / sizeof(char), client); - msgbuf = NULL; - - - /* Send asigned ID. */ - if (assign_id) - fail_if (assign_and_send_id(client, message_id) < 0); - - return 0; - - fail: - xperror(*argv); - free(msgbuf); - return 0; + + + /* Notify waiting client about a received message modification. */ + if (modifying) + return modifying_notify(client, message, modify_id); + /* Do nothing more, not not even multicast this message. */ + + + if (!message_id) { + eprint("received message without a message ID, ignoring."); + return 0; + } + + /* Assign ID if not already assigned. */ + if (assign_id && !client->id) { + intercept |= 2; + with_mutex_if (slave_mutex, !(client->id = next_client_id++), + eprint("this is impossible, ID counter has overflowed."); + /* If the program ran for a millennium it would + take c:a 585 assignments per nanosecond. This + cannot possibly happen. (It would require serious + dedication by generations of ponies (or just an alicorn) + to maintain the process and transfer it new hardware.) */ + abort(); + ); + } + + /* Make the client listen for messages addressed to it. */ + if (intercept) { + pthread_mutex_lock(&(client->mutex)); + if ((intercept & 1)) /* from payload */ + fail_if (add_intercept_conditions_from_message(client, modifying, priority, stop) < 0); + if ((intercept & 2)) { /* "To: $(client->id)" */ + xsnprintf(buf, "To: %" PRIu32 ":%" PRIu32, + (uint32_t)(client->id >> 32), + (uint32_t)(client->id >> 0)); + add_intercept_condition(client, buf, priority, modifying, 0); + } + pthread_mutex_unlock(&(client->mutex)); + } + + + /* Multicast the message. */ + n = mds_message_compose_size(&message); + fail_if (xbmalloc(msgbuf, n)); + mds_message_compose(&message, msgbuf); + queue_message_multicast(msgbuf, n / sizeof(char), client); + msgbuf = NULL; + + + /* Send asigned ID. */ + if (assign_id) + fail_if (assign_and_send_id(client, message_id) < 0); + + return 0; + +fail: + xperror(*argv); + free(msgbuf); + return 0; } -- cgit v1.2.3-70-g09d2