diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/libmdsserver/macros.h | 2 | ||||
-rw-r--r-- | src/mds-server/mds-server.c | 434 |
2 files changed, 262 insertions, 174 deletions
diff --git a/src/libmdsserver/macros.h b/src/libmdsserver/macros.h index 3f7b831..95f705f 100644 --- a/src/libmdsserver/macros.h +++ b/src/libmdsserver/macros.h @@ -211,7 +211,7 @@ * @return :int Whether the strings are equal */ #define strequals(a, b) \ - ((a == b) || (strcmp(a, b) == 0)) + (strcmp(a, b) == 0) /** diff --git a/src/mds-server/mds-server.c b/src/mds-server/mds-server.c index 4d146b7..d835a80 100644 --- a/src/mds-server/mds-server.c +++ b/src/mds-server/mds-server.c @@ -1048,6 +1048,125 @@ static int cmp_queued_interception(const void* a, const void* b) /** + * Check if a condition matches any of a set of accepted patterns + * + * @param cond The condition + * @param hashes The hashes of the accepted header names + * @param keys The header names + * @param headers The header name–value pairs + * @param count The number of accepted patterns + * @return Evaluates to true if and only if a matching pattern was found + */ +static int is_condition_matching(interception_condition_t* cond, + size_t* hashes, char** keys, char** headers, size_t count) +{ + size_t i; + for (i = 0; i < count; i++) + if (*(cond->condition) == '\0') + return 1; + else if ((cond->header_hash == hashes[i]) && + (strequals(cond->condition, keys[i]) || + strequals(cond->condition, headers[i]))) + return 1; + return 0; +} + + +/** + * Find a matching condition to any of a set of acceptable conditions + * + * @param client The intercepting client + * @param hashes The hashes of the accepted header names + * @param keys The header names + * @param headers The header name–value pairs + * @param count The number of accepted patterns + * @param interception_out Storage slot for found interception + * @return -1 on error, otherwise: evalutes to true iff a matching condition was found + */ +static int find_matching_condition(client_t* client, size_t* hashes, char** keys, char** headers, size_t count, + queued_interception_t* interception_out) +{ + pthread_mutex_t mutex = client->mutex; + interception_condition_t* conds = client->interception_conditions; + size_t n = 0, i; + + errno = pthread_mutex_lock(&(mutex)); + if (errno) + return -1; + + /* Look for a matching condition. */ + if (client->open) + n = client->interception_conditions_count; + for (i = 0; i < n; i++) + if (is_condition_matching(conds + i, hashes, keys, headers, count)) + { + /* Report matching condition. */ + interception_out->client = client; + interception_out->priority = conds[i].priority; + interception_out->modifying = conds[i].modifying; + break; + } + + pthread_mutex_unlock(&(mutex)); + + return i < n; +} + + +/** + * Get all interceptors who have at least one condition matching any of a set of acceptable patterns + * + * @param sender The original sender of the message + * @param hashes The hashes of the accepted header names + * @param keys The header names + * @param headers The header name–value pairs + * @param count The number of accepted patterns + * @param interceptions_count_out Slot at where to store the number of found interceptors + * @return The found interceptors, `NULL` on error + */ +static queued_interception_t* get_interceptors(client_t* sender, size_t* hashes, char** keys, char** headers, + size_t count, size_t* interceptions_count_out) +{ + queued_interception_t* interceptions = NULL; + size_t interceptions_count = 0; + size_t n = 0; + ssize_t node; + + /* Count clients. */ + foreach_linked_list_node (client_list, node) + n++; + + /* Allocate interceptor list. */ + if (xmalloc(interceptions, n, queued_interception_t)) + return NULL; + + /* Search clients. */ + foreach_linked_list_node (client_list, node) + { + client_t* client = (client_t*)(void*)(client_list.values[node]); + + /* Look for and list a matching condition. */ + if (client->open && (client != sender)) + { + int r = find_matching_condition(client, hashes, keys, headers, count, + interceptions + interceptions_count); + if (r == -1) + { + free(interceptions); + return NULL; + } + if (r) + /* List client of there was a matching condition. */ + interceptions_count++; + } + } + + *interceptions_count_out = interceptions_count; + return interceptions; +} + + +/** * Queue a message for multicasting * * @param message The message @@ -1066,10 +1185,9 @@ void queue_message_multicast(char* message, size_t length, client_t* sender) size_t interceptions_count = 0; multicast_t* multicast = NULL; size_t i; - ssize_t node; uint64_t modify_id; char modify_id_header[13 + 3 * sizeof(uint64_t)]; - char* old_buf; + void* new_buf; /* Count the number of headers. */ for (i = 0; i < n; i++) @@ -1084,13 +1202,13 @@ void queue_message_multicast(char* message, size_t length, client_t* sender) return; /* Invalid message. */ /* Allocate multicast message. */ - if (xmalloc(multicast, 1, multicast_t)) goto fail; + fail_if (xmalloc(multicast, 1, multicast_t)); multicast_initialise(multicast); /* Allocate header lists. */ - if (xmalloc(hashes, header_count, size_t)) goto fail; - if (xmalloc(headers, header_count, char*)) goto fail; - if (xmalloc(header_values, header_count, char*)) goto fail; + fail_if (xmalloc(hashes, header_count, size_t)); + fail_if (xmalloc(headers, header_count, char*)); + fail_if (xmalloc(header_values, header_count, char*)); /* Populate header lists. */ for (i = 0; i < header_count; i++) @@ -1102,14 +1220,14 @@ void queue_message_multicast(char* message, size_t length, client_t* sender) if ((header_values[i] = strdup(msg)) == NULL) { header_count = i; - goto fail; + goto pfail; } *colon = '\0'; if ((headers[i] = strdup(msg)) == NULL) { free(headers[i]); header_count = i; - goto fail; + goto pfail; } *colon = ':'; *end = '\n'; @@ -1120,65 +1238,9 @@ void queue_message_multicast(char* message, size_t length, client_t* sender) /* Get intercepting clients. */ pthread_mutex_lock(&(slave_mutex)); - /* Count clients. */ - n = 0; - foreach_linked_list_node (client_list, node) - n++; - - /* Allocate interceptor list. */ - if (xmalloc(interceptions, n, queued_interception_t)) goto fail; - - /* Search clients. */ - foreach_linked_list_node (client_list, node) - { - client_t* client = (client_t*)(void*)(client_list.values[node]); - pthread_mutex_t mutex = client->mutex; - interception_condition_t* conds = client->interception_conditions; - int64_t priority = 0; /* Initialise to stop incorrect warning. */ - int modifying = 0; /* Initialise to stop incorrect warning. */ - size_t j; - - /* Look for a matching condition. */ - n = client->interception_conditions_count; - if (client->open && (client != sender)) - { - pthread_mutex_lock(&(mutex)); - if (errno || (client->open == 0)) - n = 0; - for (i = 0; i < n; i++) - { - interception_condition_t* cond = conds + i; - for (j = 0; j < header_count; j++) - { - if (*(cond->condition) == '\0') - break; - if (cond->header_hash == hashes[j]) - if (strequals(cond->condition, headers[j]) || - strequals(cond->condition, header_values[j])) - break; - } - if (j < header_count) - { - priority = cond->priority; - modifying = cond->modifying; - break; - } - } - pthread_mutex_unlock(&(mutex)); - } - else - n = 0; - - /* List client of there was a matching condition. */ - if (i < n) - { - interceptions[interceptions_count].client = client; - interceptions[interceptions_count].priority = priority; - interceptions[interceptions_count].modifying = modifying; - interceptions_count++; - } - } + interceptions = get_interceptors(sender, hashes, headers, header_values, header_count, &interceptions_count); pthread_mutex_unlock(&(slave_mutex)); + fail_if (interceptions == NULL); /* Sort interceptors. */ qsort(interceptions, interceptions_count, sizeof(queued_interception_t), cmp_queued_interception); @@ -1191,12 +1253,9 @@ void queue_message_multicast(char* message, size_t length, client_t* sender) ); xsnprintf(modify_id_header, "Modify ID: %" PRIu64 "\n", modify_id); n = strlen(modify_id_header); - old_buf = message; - if (xrealloc(message, n + length, char)) - { - message = old_buf; - goto fail; - } + new_buf = message; + fail_if (xrealloc(new_buf, n + length, char)); + message = new_buf; memmove(message + n, message, length * sizeof(char)); memcpy(message, modify_id_header, n * sizeof(char)); @@ -1210,28 +1269,16 @@ void queue_message_multicast(char* message, size_t length, client_t* sender) /* Queue message multicasting. */ with_mutex (sender->mutex, - if (sender->multicasts == NULL) - { - if (xmalloc(sender->multicasts, 1, multicast_t)) - goto fail_queue; - } - else - { - multicast_t* new_buf; - new_buf = realloc(sender->multicasts, (sender->multicasts_count + 1) * sizeof(multicast_t)); - if (new_buf == NULL) - goto fail_queue; - sender->multicasts = new_buf; - } + new_buf = sender->multicasts; + if (xrealloc(new_buf, sender->multicasts_count + 1, multicast_t)) + goto fail_queue; + sender->multicasts = new_buf; sender->multicasts[sender->multicasts_count++] = *multicast; multicast = NULL; fail_queue: ); - errno = 0; fail: /* This is done before this function returns even if there was no error. */ - if (errno != 0) - perror(*argv); /* Release resources. */ xfree(headers, header_count); xfree(header_values, header_count); @@ -1240,6 +1287,96 @@ void queue_message_multicast(char* message, size_t length, client_t* sender) if (multicast != NULL) multicast_destroy(multicast); free(multicast); + return; + + pfail: + perror(*argv); + goto fail; +} + + +/** + * Get the client by its socket's file descriptor in a synchronised manner + * + * @param socket_fd The file descriptor of the client's socket + * @return The client + */ +static client_t* client_by_socket(int socket_fd) +{ + size_t address; + with_mutex (slave_mutex, address = fd_table_get(&client_map, socket_fd);); + return (client_t*)(void*)address; +} + + +/** + * Send a multicast message to one recipient + * + * @param multicast The message + * @param recipient The recipient + * @param modifying Whether the recipient may modify the message + * @return Evaluates to true if and only if the entire message was sent + */ +static int send_multicast_to_recipient(multicast_t* multicast, client_t* recipient, int modifying) +{ + char* msg = multicast->message + multicast->message_ptr; + size_t n = multicast->message_length - multicast->message_ptr; + size_t sent; + + /* Skip Modify ID header if the interceptors will not perform a modification. */ + if ((modifying == 0) && (multicast->message_ptr == 0)) + { + n -= multicast->message_prefix; + multicast->message_ptr += multicast->message_prefix; + } + + /* Send the message. */ + n *= sizeof(char); + with_mutex (recipient->mutex, + if (recipient->open) + { + sent = send_message(recipient->socket_fd, msg + multicast->message_ptr, n); + n -= sent; + multicast->message_ptr += sent / sizeof(char); + if ((n > 0) && (errno != EINTR)) + perror(*argv); + } + ); + + return n == 0; +} + + +/** + * Wait for the recipient of a multicast to reply + * + * @param recipient The recipient + * @param modify_id The modify ID of the multicast + */ +static void wait_for_reply(client_t* recipient, uint64_t modify_id) +{ + /* pthread_cond_timedwait is required to handle re-exec and termination because + pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */ + struct timespec timeout = + { + .tv_sec = 1, + .tv_nsec = 0 + }; + + with_mutex_if (modify_mutex, recipient->modify_message == NULL, + if (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0) + { + hash_table_put(&modify_map, (size_t)modify_id, (size_t)(void*)recipient); + pthread_cond_signal(&slave_cond); + } + ); + + with_mutex_if (recipient->modify_mutex, recipient->modify_message == NULL, + while ((recipient->modify_message == NULL) && (terminating == 0)) + pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout); + if (terminating == 0) + hash_table_remove(&modify_map, (size_t)modify_id); + ); } @@ -1265,41 +1402,19 @@ void multicast_message(multicast_t* multicast) { queued_interception_t client_ = multicast->interceptions[multicast->interceptions_ptr]; client_t* client = client_.client; - char* msg = multicast->message + multicast->message_ptr; - size_t sent; - n = multicast->message_length - multicast->message_ptr; + int modifying = 0; + char* old_buf; + size_t i; + mds_message_t* mod; /* After unmarshalling at re-exec, client will be NULL and must be mapped from its socket. */ if (client == NULL) - { - size_t address; - with_mutex (slave_mutex, address = fd_table_get(&client_map, client_.socket_fd);); - client_.client = client = (client_t*)(void*)address; - } + client_.client = client = client_by_socket(client_.socket_fd); - /* Skip Modify ID header if the interceptors will not perform a modification. */ - if ((client_.modifying == 0) && (multicast->message_ptr == 0)) - { - n -= multicast->message_prefix; - multicast->message_ptr += multicast->message_prefix; - } - - /* Send the message. */ - n *= sizeof(char); - with_mutex (client->mutex, - if (client->open) - { - sent = send_message(client->socket_fd, msg + multicast->message_ptr, n); - n -= sent; - multicast->message_ptr += sent / sizeof(char); - if ((n > 0) && (errno != EINTR)) - perror(*argv); - } - ); - - /* Stop if we are re-exec:ing or terminating, or continue to next recipient on error. */ - if (n > 0) + /* Send the message to the recipient. */ + if (send_multicast_to_recipient(multicast, client, client_.modifying) == 0) { + /* Stop if we are re-exec:ing or terminating, or continue to next recipient on error. */ if (terminating) return; else @@ -1314,64 +1429,37 @@ void multicast_message(multicast_t* multicast) continue; } - /* Wait for a reply and act upon it. */ - { - /* pthread_cond_timedwait is required to handle re-exec and termination because - pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */ - struct timespec timeout = + /* Wait for a reply. */ + wait_for_reply(client, modify_id); + if (terminating) + return; + + /* Act upon the reply. */ + mod = client->modify_message; + for (i = 0; i < mod->header_count; i++) + if (strequals(mod->headers[i], "Modify: yes")) { - .tv_sec = 1, - .tv_nsec = 0 - }; - int modifying = 0; - char* old_buf; - size_t i; - mds_message_t* mod; - - /* Wait for a reply. */ - with_mutex_if (modify_mutex, client->modify_message == NULL, - if (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0) - { - hash_table_put(&modify_map, (size_t)modify_id, (size_t)(void*)client); - pthread_cond_signal(&slave_cond); - } - ); - with_mutex_if (client->modify_mutex, client->modify_message == NULL, - while ((client->modify_message == NULL) && (terminating == 0)) - pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout); - if (terminating == 0) - hash_table_remove(&modify_map, (size_t)modify_id); - ); - if (terminating) - return; - - /* Act upon the reply. */ - mod = client->modify_message; - for (i = 0; i < mod->header_count; i++) - if (strequals(mod->headers[i], "Modify: yes")) + modifying = 1; + break; + } + if (modifying) + { + n = mod->payload_size; + old_buf = multicast->message; + if (xrealloc(multicast->message, multicast->message_prefix + n, char)) { - modifying = 1; - break; + perror(*argv); + multicast->message = old_buf; } - if (modifying) - { - n = mod->payload_size; - old_buf = multicast->message; - if (xrealloc(multicast->message, multicast->message_prefix + n, char)) - { - perror(*argv); - multicast->message = old_buf; - } - else - memcpy(multicast->message + multicast->message_prefix, mod->payload, n); - } - - /* Free the reply. */ - mds_message_destroy(client->modify_message); - - /* Reset how much of the message has been sent before we continue with next recipient. */ - multicast->message_ptr = 0; - } + else + memcpy(multicast->message + multicast->message_prefix, mod->payload, n); + } + + /* Free the reply. */ + mds_message_destroy(client->modify_message); + + /* Reset how much of the message has been sent before we continue with next recipient. */ + multicast->message_ptr = 0; } } |