aboutsummaryrefslogtreecommitdiffstats
path: root/src/mds-server/receiving.c
diff options
context:
space:
mode:
authorMattias Andrée <maandree@kth.se>2017-11-05 00:09:50 +0100
committerMattias Andrée <maandree@kth.se>2017-11-05 00:09:50 +0100
commit9e8dec188d55ca1f0a3b33acab702ced8ed07a18 (patch)
treecbb43c22e72674dc672e645e6596358e3868568e /src/mds-server/receiving.c
parenttypo (diff)
downloadmds-9e8dec188d55ca1f0a3b33acab702ced8ed07a18.tar.gz
mds-9e8dec188d55ca1f0a3b33acab702ced8ed07a18.tar.bz2
mds-9e8dec188d55ca1f0a3b33acab702ced8ed07a18.tar.xz
Work on changing style, and an important typo fix
Signed-off-by: Mattias Andrée <maandree@kth.se>
Diffstat (limited to 'src/mds-server/receiving.c')
-rw-r--r--src/mds-server/receiving.c465
1 files changed, 225 insertions, 240 deletions
diff --git a/src/mds-server/receiving.c b/src/mds-server/receiving.c
index b93f638..09684ae 100644
--- a/src/mds-server/receiving.c
+++ b/src/mds-server/receiving.c
@@ -40,7 +40,7 @@
* @param sender The original sender of the message
*/
__attribute__((nonnull))
-void queue_message_multicast(char* message, size_t length, client_t* sender);
+void queue_message_multicast(char *message, size_t length, client_t *sender);
/**
@@ -51,55 +51,50 @@ void queue_message_multicast(char* message, size_t length, client_t* sender);
* @param modify_id The modify ID of the message
* @return Normally zero, but 1 if exited because of re-exec or termination
*/
-__attribute__((nonnull))
-static int modifying_notify(client_t* client, mds_message_t message, uint64_t modify_id)
+static int __attribute__((nonnull))
+modifying_notify(client_t *client, mds_message_t message, uint64_t modify_id)
{
- /* pthread_cond_timedwait is required to handle re-exec and termination because
- pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */
- struct timespec timeout =
- {
- .tv_sec = 1,
- .tv_nsec = 0
- };
- size_t address;
- client_t* recipient;
- mds_message_t* multicast;
- size_t i;
-
- pthread_mutex_lock(&(modify_mutex));
- while (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0)
- {
- if (terminating)
- {
- pthread_mutex_unlock(&(modify_mutex));
- return 1;
+ /* pthread_cond_timedwait is required to handle re-exec and termination because
+ pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */
+ struct timespec timeout = {
+ .tv_sec = 1,
+ .tv_nsec = 0
+ };
+ size_t address;
+ client_t *recipient;
+ mds_message_t *multicast;
+ size_t i;
+
+ pthread_mutex_lock(&(modify_mutex));
+ while (!hash_table_contains_key(&modify_map, (size_t)modify_id)) {
+ if (terminating) {
+ pthread_mutex_unlock(&(modify_mutex));
+ return 1;
+ }
+ pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout);
}
- pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout);
- }
- address = hash_table_get(&modify_map, (size_t)modify_id);
- recipient = (client_t*)(void*)address;
- fail_if (xmalloc(multicast = recipient->modify_message, 1, mds_message_t));
- mds_message_zero_initialise(multicast);
- fail_if (xmemdup(multicast->payload, message.payload, message.payload_size, char));
- fail_if (xmalloc(multicast->headers, message.header_count, char*));
- for (i = 0; i < message.header_count; i++, multicast->header_count++)
- fail_if (xstrdup(multicast->headers[i], message.headers[i]));
- done:
- pthread_mutex_unlock(&(modify_mutex));
- with_mutex (client->modify_mutex, pthread_cond_signal(&(client->modify_cond)););
-
- return 0;
-
-
- fail:
- xperror(*argv);
- if (multicast != NULL)
- {
- mds_message_destroy(multicast);
- free(multicast);
- recipient->modify_message = NULL;
- }
- goto done;
+ address = hash_table_get(&modify_map, (size_t)modify_id);
+ recipient = (void *)address;
+ fail_if (xmalloc(multicast = recipient->modify_message, 1, mds_message_t));
+ mds_message_zero_initialise(multicast);
+ fail_if (xmemdup(multicast->payload, message.payload, message.payload_size, char));
+ fail_if (xmalloc(multicast->headers, message.header_count, char*));
+ for (i = 0; i < message.header_count; i++, multicast->header_count++)
+ fail_if (xstrdup(multicast->headers[i], message.headers[i]));
+done:
+ pthread_mutex_unlock(&(modify_mutex));
+ with_mutex (client->modify_mutex, pthread_cond_signal(&(client->modify_cond)););
+
+ return 0;
+
+fail:
+ xperror(*argv);
+ if (multicast) {
+ mds_message_destroy(multicast);
+ free(multicast);
+ recipient->modify_message = NULL;
+ }
+ goto done;
}
@@ -112,61 +107,56 @@ static int modifying_notify(client_t* client, mds_message_t message, uint64_t mo
* @param stop Whether to stop listening rather than start or reconfigure
* @return Zero on success, -1 on error
*/
-__attribute__((nonnull))
-static int add_intercept_conditions_from_message(client_t* client, int modifying, int64_t priority, int stop)
+static int __attribute__((nonnull))
+add_intercept_conditions_from_message(client_t *client, int modifying, int64_t priority, int stop)
{
- int saved_errno;
- char* payload = client->message.payload;
- size_t payload_size = client->message.payload_size;
- size_t size = 64;
- char* buf;
-
- fail_if (xmalloc(buf, size + 1, char));
-
- /* All messages. */
- if (client->message.payload_size == 0)
- {
- *buf = '\0';
- add_intercept_condition(client, buf, priority, modifying, stop);
- goto done;
- }
-
- /* Filtered messages. */
- for (;;)
- {
- char* end = memchr(payload, '\n', payload_size);
- size_t len = end == NULL ? payload_size : (size_t)(end - payload);
- if (len == 0)
- {
- payload++;
- payload_size--;
- break;
+ int saved_errno;
+ char *payload = client->message.payload;
+ size_t payload_size = client->message.payload_size;
+ size_t size = 64, len;
+ char *buf, *end, *old_buf;
+
+ fail_if (xmalloc(buf, size + 1, char));
+
+ /* All messages. */
+ if (!client->message.payload_size) {
+ *buf = '\0';
+ add_intercept_condition(client, buf, priority, modifying, stop);
+ goto done;
}
- if (len > size)
- {
- char* old_buf = buf;
- if (xrealloc(buf, (size <<= 1) + 1, char))
- {
- saved_errno = errno;
- free(old_buf);
- pthread_mutex_unlock(&(client->mutex));
- fail_if (errno = saved_errno, 1);
- }
+
+ /* Filtered messages. */
+ for (;;) {
+ end = memchr(payload, '\n', payload_size);
+ len = !end ? payload_size : (size_t)(end - payload);
+ if (len == 0) {
+ payload++;
+ payload_size--;
+ break;
+ }
+ if (len > size) {
+ old_buf = buf;
+ if (xrealloc(buf, (size <<= 1) + 1, char)) {
+ saved_errno = errno;
+ free(old_buf);
+ pthread_mutex_unlock(&(client->mutex));
+ fail_if (errno = saved_errno, 1);
+ }
+ }
+ memcpy(buf, payload, len);
+ buf[len] = '\0';
+ add_intercept_condition(client, buf, priority, modifying, stop);
+ if (!end)
+ break;
+ payload = end + 1;
+ payload_size -= len + 1;
}
- memcpy(buf, payload, len);
- buf[len] = '\0';
- add_intercept_condition(client, buf, priority, modifying, stop);
- if (end == NULL)
- break;
- payload = end + 1;
- payload_size -= len + 1;
- }
-
- done:
- free(buf);
- return 0;
- fail:
- return -1;
+
+done:
+ free(buf);
+ return 0;
+fail:
+ return -1;
}
@@ -177,60 +167,58 @@ static int add_intercept_conditions_from_message(client_t* client, int modifying
* @param message_id The message ID of the ID request
* @return Zero on success, -1 on error
*/
-__attribute__((nonnull(1)))
-static int assign_and_send_id(client_t* client, const char* message_id)
+static int __attribute__((nonnull(1)))
+assign_and_send_id(client_t *client, const char *message_id)
{
- char* msgbuf = NULL;
- char* msgbuf_;
- size_t n;
- int rc = -1;
-
- /* Construct response. */
- n = 2 * 10 + strlen(message_id);
- n += sizeof("ID assignment: :\nIn response to: \n\n") / sizeof(char);
- fail_if (xmalloc(msgbuf, n, char));
- snprintf(msgbuf, n,
- "ID assignment: %" PRIu32 ":%" PRIu32 "\n"
- "In response to: %s\n"
- "\n",
- (uint32_t)(client->id >> 32),
- (uint32_t)(client->id >> 0),
- message_id == NULL ? "" : message_id);
- n = strlen(msgbuf);
-
- /* Multicast the reply. */
- fail_if (xstrdup(msgbuf_, msgbuf));
- queue_message_multicast(msgbuf_, n, client);
-
- /* Queue message to be sent when this function returns.
- This done to simplify `multicast_message` for re-exec and termination. */
-#define fail fail_in_mutex
- with_mutex (client->mutex,
- if (client->send_pending_size == 0)
- {
- /* Set the pending message. */
- client->send_pending = msgbuf;
- client->send_pending_size = n;
- }
- else
- {
- /* Concatenate message to already pending messages. */
- size_t new_len = client->send_pending_size + n;
- char* msg_new = client->send_pending;
- fail_if (xrealloc(msg_new, new_len, char));
- memcpy(msg_new + client->send_pending_size, msgbuf, n * sizeof(char));
- client->send_pending = msg_new;
- client->send_pending_size = new_len;
- }
- (msgbuf = NULL, rc = 0, errno = 0);
- fail_in_mutex:
- );
+ char *msgbuf = NULL;
+ char *msgbuf_;
+ char *msg_new;
+ size_t n, new_len;
+ int rc = -1;
+
+ /* Construct response. */
+ n = 2 * 10 + strlen(message_id);
+ n += sizeof("ID assignment: :\nIn response to: \n\n") / sizeof(char);
+ fail_if (xmalloc(msgbuf, n, char));
+ snprintf(msgbuf, n,
+ "ID assignment: %" PRIu32 ":%" PRIu32 "\n"
+ "In response to: %s\n"
+ "\n",
+ (uint32_t)(client->id >> 32),
+ (uint32_t)(client->id >> 0),
+ !message_id ? "" : message_id);
+ n = strlen(msgbuf);
+
+ /* Multicast the reply. */
+ fail_if (xstrdup(msgbuf_, msgbuf));
+ queue_message_multicast(msgbuf_, n, client);
+
+ /* Queue message to be sent when this function returns.
+ This done to simplify `multicast_message` for re-exec and termination. */
+#define fail fail_in_mutex
+ with_mutex (client->mutex,
+ if (!client->send_pending_size) {
+ /* Set the pending message. */
+ client->send_pending = msgbuf;
+ client->send_pending_size = n;
+ } else {
+ /* Concatenate message to already pending messages. */
+ new_len = client->send_pending_size + n;
+ msg_new = client->send_pending;
+ fail_if (xrealloc(msg_new, new_len, char));
+ memcpy(msg_new + client->send_pending_size, msgbuf, n * sizeof(char));
+ client->send_pending = msg_new;
+ client->send_pending_size = new_len;
+ }
+ (msgbuf = NULL, rc = 0, errno = 0);
+ fail_in_mutex:
+ );
#undef fail
- fail: /* Also success. */
- xperror(*argv);
- free(msgbuf);
- return rc;
+fail: /* Also success. */
+ xperror(*argv);
+ free(msgbuf);
+ return rc;
}
@@ -241,95 +229,92 @@ static int assign_and_send_id(client_t* client, const char* message_id)
* @param client The client whom sent the message
* @return Normally zero, but 1 if exited because of re-exec or termination
*/
-int message_received(client_t* client)
+int
+message_received(client_t *client)
{
- mds_message_t message = client->message;
- int assign_id = 0;
- int modifying = 0;
- int intercept = 0;
- int64_t priority = 0;
- int stop = 0;
- const char* message_id = NULL;
- uint64_t modify_id = 0;
- char* msgbuf = NULL;
- size_t i, n;
-
-
- /* Parser headers. */
- for (i = 0; i < message.header_count; i++)
- {
- const char* h = message.headers[i];
- if (strequals(h, "Command: assign-id")) assign_id = 1;
- else if (strequals(h, "Command: intercept")) intercept = 1;
- else if (strequals(h, "Modifying: yes")) modifying = 1;
- else if (strequals(h, "Stop: yes")) stop = 1;
- else if (startswith(h, "Message ID: ")) message_id = strstr(h, ": ") + 2;
- else if (startswith(h, "Priority: ")) priority = ato64(strstr(h, ": ") + 2);
- else if (startswith(h, "Modify ID: ")) modify_id = atou64(strstr(h, ": ") + 2);
- }
-
-
- /* Notify waiting client about a received message modification. */
- if (modifying)
- return modifying_notify(client, message, modify_id);
- /* Do nothing more, not not even multicast this message. */
-
-
- if (message_id == NULL)
- {
- eprint("received message without a message ID, ignoring.");
- return 0;
- }
-
- /* Assign ID if not already assigned. */
- if (assign_id && (client->id == 0))
- {
- intercept |= 2;
- with_mutex_if (slave_mutex, (client->id = next_client_id++) == 0,
- eprint("this is impossible, ID counter has overflowed.");
- /* If the program ran for a millennium it would
- take c:a 585 assignments per nanosecond. This
- cannot possibly happen. (It would require serious
- dedication by generations of ponies (or just an alicorn)
- to maintain the process and transfer it new hardware.) */
- abort();
- );
- }
-
- /* Make the client listen for messages addressed to it. */
- if (intercept)
- {
- pthread_mutex_lock(&(client->mutex));
- if ((intercept & 1)) /* from payload */
- fail_if (add_intercept_conditions_from_message(client, modifying, priority, stop) < 0);
- if ((intercept & 2)) /* "To: $(client->id)" */
- {
- char buf[26];
- xsnprintf(buf, "To: %" PRIu32 ":%" PRIu32,
- (uint32_t)(client->id >> 32),
- (uint32_t)(client->id >> 0));
- add_intercept_condition(client, buf, priority, modifying, 0);
+ mds_message_t message = client->message;
+ int assign_id = 0;
+ int modifying = 0;
+ int intercept = 0;
+ int64_t priority = 0;
+ int stop = 0;
+ const char *message_id = NULL;
+ uint64_t modify_id = 0;
+ char *msgbuf = NULL;
+ size_t i, n;
+ const char *h;
+ char buf[26];
+
+
+ /* Parser headers. */
+ for (i = 0; i < message.header_count; i++) {
+ h = message.headers[i];
+ if (strequals(h, "Command: assign-id")) assign_id = 1;
+ else if (strequals(h, "Command: intercept")) intercept = 1;
+ else if (strequals(h, "Modifying: yes")) modifying = 1;
+ else if (strequals(h, "Stop: yes")) stop = 1;
+ else if (startswith(h, "Message ID: ")) message_id = strstr(h, ": ") + 2;
+ else if (startswith(h, "Priority: ")) priority = ato64(strstr(h, ": ") + 2);
+ else if (startswith(h, "Modify ID: ")) modify_id = atou64(strstr(h, ": ") + 2);
}
- pthread_mutex_unlock(&(client->mutex));
- }
-
-
- /* Multicast the message. */
- n = mds_message_compose_size(&message);
- fail_if (xbmalloc(msgbuf, n));
- mds_message_compose(&message, msgbuf);
- queue_message_multicast(msgbuf, n / sizeof(char), client);
- msgbuf = NULL;
-
-
- /* Send asigned ID. */
- if (assign_id)
- fail_if (assign_and_send_id(client, message_id) < 0);
-
- return 0;
-
- fail:
- xperror(*argv);
- free(msgbuf);
- return 0;
+
+
+ /* Notify waiting client about a received message modification. */
+ if (modifying)
+ return modifying_notify(client, message, modify_id);
+ /* Do nothing more, not not even multicast this message. */
+
+
+ if (!message_id) {
+ eprint("received message without a message ID, ignoring.");
+ return 0;
+ }
+
+ /* Assign ID if not already assigned. */
+ if (assign_id && !client->id) {
+ intercept |= 2;
+ with_mutex_if (slave_mutex, !(client->id = next_client_id++),
+ eprint("this is impossible, ID counter has overflowed.");
+ /* If the program ran for a millennium it would
+ take c:a 585 assignments per nanosecond. This
+ cannot possibly happen. (It would require serious
+ dedication by generations of ponies (or just an alicorn)
+ to maintain the process and transfer it new hardware.) */
+ abort();
+ );
+ }
+
+ /* Make the client listen for messages addressed to it. */
+ if (intercept) {
+ pthread_mutex_lock(&(client->mutex));
+ if ((intercept & 1)) /* from payload */
+ fail_if (add_intercept_conditions_from_message(client, modifying, priority, stop) < 0);
+ if ((intercept & 2)) { /* "To: $(client->id)" */
+ xsnprintf(buf, "To: %" PRIu32 ":%" PRIu32,
+ (uint32_t)(client->id >> 32),
+ (uint32_t)(client->id >> 0));
+ add_intercept_condition(client, buf, priority, modifying, 0);
+ }
+ pthread_mutex_unlock(&(client->mutex));
+ }
+
+
+ /* Multicast the message. */
+ n = mds_message_compose_size(&message);
+ fail_if (xbmalloc(msgbuf, n));
+ mds_message_compose(&message, msgbuf);
+ queue_message_multicast(msgbuf, n / sizeof(char), client);
+ msgbuf = NULL;
+
+
+ /* Send asigned ID. */
+ if (assign_id)
+ fail_if (assign_and_send_id(client, message_id) < 0);
+
+ return 0;
+
+fail:
+ xperror(*argv);
+ free(msgbuf);
+ return 0;
}