aboutsummaryrefslogtreecommitdiffstats
path: root/src/mds-server
diff options
context:
space:
mode:
authorMattias Andrée <maandree@operamail.com>2014-05-18 11:54:45 +0200
committerMattias Andrée <maandree@operamail.com>2014-05-18 11:54:45 +0200
commitb64e5b3644b4c097e6ef72941e59c0abeb5f6502 (patch)
treefcccd8aafec3780fe80bd476f033969e837cf585 /src/mds-server
parentm (diff)
downloadmds-b64e5b3644b4c097e6ef72941e59c0abeb5f6502.tar.gz
mds-b64e5b3644b4c097e6ef72941e59c0abeb5f6502.tar.bz2
mds-b64e5b3644b4c097e6ef72941e59c0abeb5f6502.tar.xz
reduce code complexity
Signed-off-by: Mattias Andrée <maandree@operamail.com>
Diffstat (limited to 'src/mds-server')
-rw-r--r--src/mds-server/mds-server.c434
1 files changed, 261 insertions, 173 deletions
diff --git a/src/mds-server/mds-server.c b/src/mds-server/mds-server.c
index 4d146b7..d835a80 100644
--- a/src/mds-server/mds-server.c
+++ b/src/mds-server/mds-server.c
@@ -1048,6 +1048,125 @@ static int cmp_queued_interception(const void* a, const void* b)
/**
+ * Check if a condition matches any of a set of accepted patterns
+ *
+ * @param cond The condition
+ * @param hashes The hashes of the accepted header names
+ * @param keys The header names
+ * @param headers The header name–value pairs
+ * @param count The number of accepted patterns
+ * @return Evaluates to true if and only if a matching pattern was found
+ */
+static int is_condition_matching(interception_condition_t* cond,
+ size_t* hashes, char** keys, char** headers, size_t count)
+{
+ size_t i;
+ for (i = 0; i < count; i++)
+ if (*(cond->condition) == '\0')
+ return 1;
+ else if ((cond->header_hash == hashes[i]) &&
+ (strequals(cond->condition, keys[i]) ||
+ strequals(cond->condition, headers[i])))
+ return 1;
+ return 0;
+}
+
+
+/**
+ * Find a matching condition to any of a set of acceptable conditions
+ *
+ * @param client The intercepting client
+ * @param hashes The hashes of the accepted header names
+ * @param keys The header names
+ * @param headers The header name–value pairs
+ * @param count The number of accepted patterns
+ * @param interception_out Storage slot for found interception
+ * @return -1 on error, otherwise: evalutes to true iff a matching condition was found
+ */
+static int find_matching_condition(client_t* client, size_t* hashes, char** keys, char** headers, size_t count,
+ queued_interception_t* interception_out)
+{
+ pthread_mutex_t mutex = client->mutex;
+ interception_condition_t* conds = client->interception_conditions;
+ size_t n = 0, i;
+
+ errno = pthread_mutex_lock(&(mutex));
+ if (errno)
+ return -1;
+
+ /* Look for a matching condition. */
+ if (client->open)
+ n = client->interception_conditions_count;
+ for (i = 0; i < n; i++)
+ if (is_condition_matching(conds + i, hashes, keys, headers, count))
+ {
+ /* Report matching condition. */
+ interception_out->client = client;
+ interception_out->priority = conds[i].priority;
+ interception_out->modifying = conds[i].modifying;
+ break;
+ }
+
+ pthread_mutex_unlock(&(mutex));
+
+ return i < n;
+}
+
+
+/**
+ * Get all interceptors who have at least one condition matching any of a set of acceptable patterns
+ *
+ * @param sender The original sender of the message
+ * @param hashes The hashes of the accepted header names
+ * @param keys The header names
+ * @param headers The header name–value pairs
+ * @param count The number of accepted patterns
+ * @param interceptions_count_out Slot at where to store the number of found interceptors
+ * @return The found interceptors, `NULL` on error
+ */
+static queued_interception_t* get_interceptors(client_t* sender, size_t* hashes, char** keys, char** headers,
+ size_t count, size_t* interceptions_count_out)
+{
+ queued_interception_t* interceptions = NULL;
+ size_t interceptions_count = 0;
+ size_t n = 0;
+ ssize_t node;
+
+ /* Count clients. */
+ foreach_linked_list_node (client_list, node)
+ n++;
+
+ /* Allocate interceptor list. */
+ if (xmalloc(interceptions, n, queued_interception_t))
+ return NULL;
+
+ /* Search clients. */
+ foreach_linked_list_node (client_list, node)
+ {
+ client_t* client = (client_t*)(void*)(client_list.values[node]);
+
+ /* Look for and list a matching condition. */
+ if (client->open && (client != sender))
+ {
+ int r = find_matching_condition(client, hashes, keys, headers, count,
+ interceptions + interceptions_count);
+ if (r == -1)
+ {
+ free(interceptions);
+ return NULL;
+ }
+ if (r)
+ /* List client of there was a matching condition. */
+ interceptions_count++;
+ }
+ }
+
+ *interceptions_count_out = interceptions_count;
+ return interceptions;
+}
+
+
+/**
* Queue a message for multicasting
*
* @param message The message
@@ -1066,10 +1185,9 @@ void queue_message_multicast(char* message, size_t length, client_t* sender)
size_t interceptions_count = 0;
multicast_t* multicast = NULL;
size_t i;
- ssize_t node;
uint64_t modify_id;
char modify_id_header[13 + 3 * sizeof(uint64_t)];
- char* old_buf;
+ void* new_buf;
/* Count the number of headers. */
for (i = 0; i < n; i++)
@@ -1084,13 +1202,13 @@ void queue_message_multicast(char* message, size_t length, client_t* sender)
return; /* Invalid message. */
/* Allocate multicast message. */
- if (xmalloc(multicast, 1, multicast_t)) goto fail;
+ fail_if (xmalloc(multicast, 1, multicast_t));
multicast_initialise(multicast);
/* Allocate header lists. */
- if (xmalloc(hashes, header_count, size_t)) goto fail;
- if (xmalloc(headers, header_count, char*)) goto fail;
- if (xmalloc(header_values, header_count, char*)) goto fail;
+ fail_if (xmalloc(hashes, header_count, size_t));
+ fail_if (xmalloc(headers, header_count, char*));
+ fail_if (xmalloc(header_values, header_count, char*));
/* Populate header lists. */
for (i = 0; i < header_count; i++)
@@ -1102,14 +1220,14 @@ void queue_message_multicast(char* message, size_t length, client_t* sender)
if ((header_values[i] = strdup(msg)) == NULL)
{
header_count = i;
- goto fail;
+ goto pfail;
}
*colon = '\0';
if ((headers[i] = strdup(msg)) == NULL)
{
free(headers[i]);
header_count = i;
- goto fail;
+ goto pfail;
}
*colon = ':';
*end = '\n';
@@ -1120,65 +1238,9 @@ void queue_message_multicast(char* message, size_t length, client_t* sender)
/* Get intercepting clients. */
pthread_mutex_lock(&(slave_mutex));
- /* Count clients. */
- n = 0;
- foreach_linked_list_node (client_list, node)
- n++;
-
- /* Allocate interceptor list. */
- if (xmalloc(interceptions, n, queued_interception_t)) goto fail;
-
- /* Search clients. */
- foreach_linked_list_node (client_list, node)
- {
- client_t* client = (client_t*)(void*)(client_list.values[node]);
- pthread_mutex_t mutex = client->mutex;
- interception_condition_t* conds = client->interception_conditions;
- int64_t priority = 0; /* Initialise to stop incorrect warning. */
- int modifying = 0; /* Initialise to stop incorrect warning. */
- size_t j;
-
- /* Look for a matching condition. */
- n = client->interception_conditions_count;
- if (client->open && (client != sender))
- {
- pthread_mutex_lock(&(mutex));
- if (errno || (client->open == 0))
- n = 0;
- for (i = 0; i < n; i++)
- {
- interception_condition_t* cond = conds + i;
- for (j = 0; j < header_count; j++)
- {
- if (*(cond->condition) == '\0')
- break;
- if (cond->header_hash == hashes[j])
- if (strequals(cond->condition, headers[j]) ||
- strequals(cond->condition, header_values[j]))
- break;
- }
- if (j < header_count)
- {
- priority = cond->priority;
- modifying = cond->modifying;
- break;
- }
- }
- pthread_mutex_unlock(&(mutex));
- }
- else
- n = 0;
-
- /* List client of there was a matching condition. */
- if (i < n)
- {
- interceptions[interceptions_count].client = client;
- interceptions[interceptions_count].priority = priority;
- interceptions[interceptions_count].modifying = modifying;
- interceptions_count++;
- }
- }
+ interceptions = get_interceptors(sender, hashes, headers, header_values, header_count, &interceptions_count);
pthread_mutex_unlock(&(slave_mutex));
+ fail_if (interceptions == NULL);
/* Sort interceptors. */
qsort(interceptions, interceptions_count, sizeof(queued_interception_t), cmp_queued_interception);
@@ -1191,12 +1253,9 @@ void queue_message_multicast(char* message, size_t length, client_t* sender)
);
xsnprintf(modify_id_header, "Modify ID: %" PRIu64 "\n", modify_id);
n = strlen(modify_id_header);
- old_buf = message;
- if (xrealloc(message, n + length, char))
- {
- message = old_buf;
- goto fail;
- }
+ new_buf = message;
+ fail_if (xrealloc(new_buf, n + length, char));
+ message = new_buf;
memmove(message + n, message, length * sizeof(char));
memcpy(message, modify_id_header, n * sizeof(char));
@@ -1210,28 +1269,16 @@ void queue_message_multicast(char* message, size_t length, client_t* sender)
/* Queue message multicasting. */
with_mutex (sender->mutex,
- if (sender->multicasts == NULL)
- {
- if (xmalloc(sender->multicasts, 1, multicast_t))
- goto fail_queue;
- }
- else
- {
- multicast_t* new_buf;
- new_buf = realloc(sender->multicasts, (sender->multicasts_count + 1) * sizeof(multicast_t));
- if (new_buf == NULL)
- goto fail_queue;
- sender->multicasts = new_buf;
- }
+ new_buf = sender->multicasts;
+ if (xrealloc(new_buf, sender->multicasts_count + 1, multicast_t))
+ goto fail_queue;
+ sender->multicasts = new_buf;
sender->multicasts[sender->multicasts_count++] = *multicast;
multicast = NULL;
fail_queue:
);
- errno = 0;
fail: /* This is done before this function returns even if there was no error. */
- if (errno != 0)
- perror(*argv);
/* Release resources. */
xfree(headers, header_count);
xfree(header_values, header_count);
@@ -1240,6 +1287,96 @@ void queue_message_multicast(char* message, size_t length, client_t* sender)
if (multicast != NULL)
multicast_destroy(multicast);
free(multicast);
+ return;
+
+ pfail:
+ perror(*argv);
+ goto fail;
+}
+
+
+/**
+ * Get the client by its socket's file descriptor in a synchronised manner
+ *
+ * @param socket_fd The file descriptor of the client's socket
+ * @return The client
+ */
+static client_t* client_by_socket(int socket_fd)
+{
+ size_t address;
+ with_mutex (slave_mutex, address = fd_table_get(&client_map, socket_fd););
+ return (client_t*)(void*)address;
+}
+
+
+/**
+ * Send a multicast message to one recipient
+ *
+ * @param multicast The message
+ * @param recipient The recipient
+ * @param modifying Whether the recipient may modify the message
+ * @return Evaluates to true if and only if the entire message was sent
+ */
+static int send_multicast_to_recipient(multicast_t* multicast, client_t* recipient, int modifying)
+{
+ char* msg = multicast->message + multicast->message_ptr;
+ size_t n = multicast->message_length - multicast->message_ptr;
+ size_t sent;
+
+ /* Skip Modify ID header if the interceptors will not perform a modification. */
+ if ((modifying == 0) && (multicast->message_ptr == 0))
+ {
+ n -= multicast->message_prefix;
+ multicast->message_ptr += multicast->message_prefix;
+ }
+
+ /* Send the message. */
+ n *= sizeof(char);
+ with_mutex (recipient->mutex,
+ if (recipient->open)
+ {
+ sent = send_message(recipient->socket_fd, msg + multicast->message_ptr, n);
+ n -= sent;
+ multicast->message_ptr += sent / sizeof(char);
+ if ((n > 0) && (errno != EINTR))
+ perror(*argv);
+ }
+ );
+
+ return n == 0;
+}
+
+
+/**
+ * Wait for the recipient of a multicast to reply
+ *
+ * @param recipient The recipient
+ * @param modify_id The modify ID of the multicast
+ */
+static void wait_for_reply(client_t* recipient, uint64_t modify_id)
+{
+ /* pthread_cond_timedwait is required to handle re-exec and termination because
+ pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */
+ struct timespec timeout =
+ {
+ .tv_sec = 1,
+ .tv_nsec = 0
+ };
+
+ with_mutex_if (modify_mutex, recipient->modify_message == NULL,
+ if (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0)
+ {
+ hash_table_put(&modify_map, (size_t)modify_id, (size_t)(void*)recipient);
+ pthread_cond_signal(&slave_cond);
+ }
+ );
+
+ with_mutex_if (recipient->modify_mutex, recipient->modify_message == NULL,
+ while ((recipient->modify_message == NULL) && (terminating == 0))
+ pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout);
+ if (terminating == 0)
+ hash_table_remove(&modify_map, (size_t)modify_id);
+ );
}
@@ -1265,41 +1402,19 @@ void multicast_message(multicast_t* multicast)
{
queued_interception_t client_ = multicast->interceptions[multicast->interceptions_ptr];
client_t* client = client_.client;
- char* msg = multicast->message + multicast->message_ptr;
- size_t sent;
- n = multicast->message_length - multicast->message_ptr;
+ int modifying = 0;
+ char* old_buf;
+ size_t i;
+ mds_message_t* mod;
/* After unmarshalling at re-exec, client will be NULL and must be mapped from its socket. */
if (client == NULL)
- {
- size_t address;
- with_mutex (slave_mutex, address = fd_table_get(&client_map, client_.socket_fd););
- client_.client = client = (client_t*)(void*)address;
- }
+ client_.client = client = client_by_socket(client_.socket_fd);
- /* Skip Modify ID header if the interceptors will not perform a modification. */
- if ((client_.modifying == 0) && (multicast->message_ptr == 0))
- {
- n -= multicast->message_prefix;
- multicast->message_ptr += multicast->message_prefix;
- }
-
- /* Send the message. */
- n *= sizeof(char);
- with_mutex (client->mutex,
- if (client->open)
- {
- sent = send_message(client->socket_fd, msg + multicast->message_ptr, n);
- n -= sent;
- multicast->message_ptr += sent / sizeof(char);
- if ((n > 0) && (errno != EINTR))
- perror(*argv);
- }
- );
-
- /* Stop if we are re-exec:ing or terminating, or continue to next recipient on error. */
- if (n > 0)
+ /* Send the message to the recipient. */
+ if (send_multicast_to_recipient(multicast, client, client_.modifying) == 0)
{
+ /* Stop if we are re-exec:ing or terminating, or continue to next recipient on error. */
if (terminating)
return;
else
@@ -1314,64 +1429,37 @@ void multicast_message(multicast_t* multicast)
continue;
}
- /* Wait for a reply and act upon it. */
- {
- /* pthread_cond_timedwait is required to handle re-exec and termination because
- pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */
- struct timespec timeout =
+ /* Wait for a reply. */
+ wait_for_reply(client, modify_id);
+ if (terminating)
+ return;
+
+ /* Act upon the reply. */
+ mod = client->modify_message;
+ for (i = 0; i < mod->header_count; i++)
+ if (strequals(mod->headers[i], "Modify: yes"))
{
- .tv_sec = 1,
- .tv_nsec = 0
- };
- int modifying = 0;
- char* old_buf;
- size_t i;
- mds_message_t* mod;
-
- /* Wait for a reply. */
- with_mutex_if (modify_mutex, client->modify_message == NULL,
- if (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0)
- {
- hash_table_put(&modify_map, (size_t)modify_id, (size_t)(void*)client);
- pthread_cond_signal(&slave_cond);
- }
- );
- with_mutex_if (client->modify_mutex, client->modify_message == NULL,
- while ((client->modify_message == NULL) && (terminating == 0))
- pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout);
- if (terminating == 0)
- hash_table_remove(&modify_map, (size_t)modify_id);
- );
- if (terminating)
- return;
-
- /* Act upon the reply. */
- mod = client->modify_message;
- for (i = 0; i < mod->header_count; i++)
- if (strequals(mod->headers[i], "Modify: yes"))
+ modifying = 1;
+ break;
+ }
+ if (modifying)
+ {
+ n = mod->payload_size;
+ old_buf = multicast->message;
+ if (xrealloc(multicast->message, multicast->message_prefix + n, char))
{
- modifying = 1;
- break;
+ perror(*argv);
+ multicast->message = old_buf;
}
- if (modifying)
- {
- n = mod->payload_size;
- old_buf = multicast->message;
- if (xrealloc(multicast->message, multicast->message_prefix + n, char))
- {
- perror(*argv);
- multicast->message = old_buf;
- }
- else
- memcpy(multicast->message + multicast->message_prefix, mod->payload, n);
- }
-
- /* Free the reply. */
- mds_message_destroy(client->modify_message);
-
- /* Reset how much of the message has been sent before we continue with next recipient. */
- multicast->message_ptr = 0;
- }
+ else
+ memcpy(multicast->message + multicast->message_prefix, mod->payload, n);
+ }
+
+ /* Free the reply. */
+ mds_message_destroy(client->modify_message);
+
+ /* Reset how much of the message has been sent before we continue with next recipient. */
+ multicast->message_ptr = 0;
}
}