diff options
Diffstat (limited to 'src/mds-server')
-rw-r--r-- | src/mds-server/mds-server.c | 222 |
1 files changed, 107 insertions, 115 deletions
diff --git a/src/mds-server/mds-server.c b/src/mds-server/mds-server.c index 0d669ad..4d146b7 100644 --- a/src/mds-server/mds-server.c +++ b/src/mds-server/mds-server.c @@ -551,19 +551,16 @@ void send_multicast_queue(client_t* client) while (client->multicasts_count > 0) { multicast_t multicast; - with_mutex (client->mutex, - if (client->multicasts_count > 0) - { - size_t c = (client->multicasts_count -= 1) * sizeof(multicast_t); - multicast = client->multicasts[0]; - memmove(client->multicasts, client->multicasts + 1, c); - if (c == 0) - { - free(client->multicasts); - client->multicasts = NULL; - } - } - ); + with_mutex_if (client->mutex, client->multicasts_count > 0, + size_t c = (client->multicasts_count -= 1) * sizeof(multicast_t); + multicast = client->multicasts[0]; + memmove(client->multicasts, client->multicasts + 1, c); + if (c == 0) + { + free(client->multicasts); + client->multicasts = NULL; + } + ); multicast_message(&multicast); multicast_destroy(&multicast); } @@ -592,13 +589,13 @@ void send_reply_queue(client_t* client) while (n > 0) { sent = send_message(client->socket_fd, sendbuf_, n); - if ((sent < n) && (errno != EINTR)) /* Ignore EINTR */ + n -= sent; + sendbuf_ += sent / sizeof(char); + if ((n > 0) && (errno != EINTR)) /* Ignore EINTR */ { perror(*argv); break; } - n -= sent; - sendbuf_ += sent / sizeof(char); } free(sendbuf); ); @@ -745,10 +742,7 @@ int message_received(client_t* client) if (assign_id && (client->id == 0)) { intercept |= 2; - with_mutex (slave_mutex, - client->id = next_id++; - if (next_id == 0) - { + with_mutex_if (slave_mutex, (client->id = ++next_id) == 0, eprint("this is impossible, ID counter has overflowed."); /* If the program ran for a millennium it would take c:a 585 assignments per nanosecond. This @@ -756,8 +750,7 @@ int message_received(client_t* client) dedication by generations of ponies (or just an alicorn) to maintain the process and transfer it new hardware.) */ abort(); - } - ); + ); } /* Make the client listen for messages addressed to it. */ @@ -795,8 +788,7 @@ int message_received(client_t* client) if (len > size) { char* old_buf = buf; - buf = realloc(buf, ((size <<= 1) + 1) * sizeof(char)); - if (buf == NULL) + if (xrealloc(buf, (size <<= 1) + 1, char)) { perror(*argv); free(old_buf); @@ -901,6 +893,35 @@ int message_received(client_t* client) /** + * Remove interception condition by index + * + * @param client The intercepting client + * @param index The index of the condition + */ +static void remove_intercept_condition(client_t* client, size_t index) +{ + interception_condition_t* conds = client->interception_conditions; + size_t n = client->interception_conditions_count; + + /* Remove the condition from the list. */ + memmove(conds + index, conds + index + 1, --n - index); + client->interception_conditions_count--; + + /* Shrink the list. */ + if (client->interception_conditions_count == 0) + { + free(conds); + client->interception_conditions = NULL; + } + else + if (xrealloc(conds, n, interception_condition_t)) + perror(*argv); + else + client->interception_conditions = conds; +} + + +/** * Add an interception condition for a client * * @param client The client @@ -934,51 +955,38 @@ void add_intercept_condition(client_t* client, char* condition, int64_t priority with for optimisation. */ for (i = 0; i < n; i++) { - if ((conds[i].header_hash == hash) && strequals(conds[i].condition, condition)) + if ((conds[i].header_hash != hash) || !strequals(conds[i].condition, condition)) { - if (stop) - { - /* Remove the condition from the list. */ - memmove(conds + i, conds + i + 1, --n - i); - client->interception_conditions_count--; - /* Shrink the list. */ - if (n == 0) - { - free(conds); - client->interception_conditions = NULL; - } - else - if ((conds = realloc(conds, n * sizeof(interception_condition_t))) == NULL) - perror(*argv); - else - client->interception_conditions = conds; - } - else + /* Look for the first non-modifying, this is a part of the + optimisation where we put all modifying conditions at the + beginning. */ + if ((nonmodifying < 0) && conds[i].modifying) + nonmodifying = (ssize_t)i; + continue; + } + + if (stop) + remove_intercept_condition(client, i); + else + { + /* Update parameters. */ + conds[i].priority = priority; + conds[i].modifying = modifying; + + if (modifying && (nonmodifying >= 0)) { - /* Update parameters. */ - conds[i].priority = priority; - conds[i].modifying = modifying; - - if (modifying && (nonmodifying >= 0)) - { - /* Optimisation: put conditions that are modifying - at the beginning. When a client is intercepting - we most know if any satisfying condition is - modifying. With this optimisation the first - satisfying condition will tell us if there is - any satisfying condition that is modifying. */ - interception_condition_t temp = conds[nonmodifying]; - conds[nonmodifying] = conds[i]; - conds[i] = temp; - } + /* Optimisation: put conditions that are modifying + at the beginning. When a client is intercepting + we most know if any satisfying condition is + modifying. With this optimisation the first + satisfying condition will tell us if there is + any satisfying condition that is modifying. */ + interception_condition_t temp = conds[nonmodifying]; + conds[nonmodifying] = conds[i]; + conds[i] = temp; } - return; } - /* Look for the first non-modifying, this is a part of the - optimisation where we put all modifying conditions at the - beginning. */ - if ((nonmodifying < 0) && conds[i].modifying) - nonmodifying = (ssize_t)i; + return; } if (stop) @@ -993,11 +1001,7 @@ void add_intercept_condition(client_t* client, char* condition, int64_t priority } /* Grow the interception condition list. */ - if (conds == NULL) - conds = malloc(1 * sizeof(interception_condition_t)); - else - conds = realloc(conds, (n + 1) * sizeof(interception_condition_t)); - if (conds == NULL) + if (xrealloc(conds, n + 1, interception_condition_t)) { perror(*argv); free(condition); @@ -1188,7 +1192,7 @@ void queue_message_multicast(char* message, size_t length, client_t* sender) xsnprintf(modify_id_header, "Modify ID: %" PRIu64 "\n", modify_id); n = strlen(modify_id_header); old_buf = message; - if ((message = realloc(message, (n + length) * sizeof(char))) == NULL) + if (xrealloc(message, n + length, char)) { message = old_buf; goto fail; @@ -1247,8 +1251,8 @@ void queue_message_multicast(char* message, size_t length, client_t* sender) void multicast_message(multicast_t* multicast) { uint64_t modify_id = 0; - size_t n = min(strlen("Modify ID: "), multicast->message_length); - if (!strncmp(multicast->message, "Modify ID: ", n)) + size_t n = strlen("Modify ID: "); + if (startswith_n(multicast->message, "Modify ID: ", multicast->message_length, n)) { char* value = multicast->message + n; char* lf = strchr(value, '\n'); @@ -1281,22 +1285,16 @@ void multicast_message(multicast_t* multicast) } /* Send the message. */ + n *= sizeof(char); with_mutex (client->mutex, - errno = 0; - n *= sizeof(char); if (client->open) - while (n > 0) - { - sent = send_message(client->socket_fd, msg + multicast->message_ptr, n); - if (sent < n) - { - if (errno != EINTR) - perror(*argv); - break; - } - n -= sent; - multicast->message_ptr += sent / sizeof(char); - } + { + sent = send_message(client->socket_fd, msg + multicast->message_ptr, n); + n -= sent; + multicast->message_ptr += sent / sizeof(char); + if ((n > 0) && (errno != EINTR)) + perror(*argv); + } ); /* Stop if we are re-exec:ing or terminating, or continue to next recipient on error. */ @@ -1331,42 +1329,35 @@ void multicast_message(multicast_t* multicast) mds_message_t* mod; /* Wait for a reply. */ - with_mutex (modify_mutex, - if (client->modify_message == NULL) - { - if (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0) - { - hash_table_put(&modify_map, (size_t)modify_id, (size_t)(void*)client); - pthread_cond_signal(&slave_cond); - } - } - ); - with_mutex (client->modify_mutex, - if (client->modify_message == NULL) - { - while ((client->modify_message == NULL) && (terminating == 0)) - pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout); - if (terminating == 0) - hash_table_remove(&modify_map, (size_t)modify_id); - } - ); + with_mutex_if (modify_mutex, client->modify_message == NULL, + if (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0) + { + hash_table_put(&modify_map, (size_t)modify_id, (size_t)(void*)client); + pthread_cond_signal(&slave_cond); + } + ); + with_mutex_if (client->modify_mutex, client->modify_message == NULL, + while ((client->modify_message == NULL) && (terminating == 0)) + pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout); + if (terminating == 0) + hash_table_remove(&modify_map, (size_t)modify_id); + ); if (terminating) return; /* Act upon the reply. */ mod = client->modify_message; for (i = 0; i < mod->header_count; i++) - if (!strcmp(mod->headers[i], "Modify: yes")) + if (strequals(mod->headers[i], "Modify: yes")) { modifying = 1; break; } if (modifying) { - old_buf = multicast->message; n = mod->payload_size; - multicast->message = realloc(old_buf, (multicast->message_prefix + n) * sizeof(char)); - if (multicast->message == NULL) + old_buf = multicast->message; + if (xrealloc(multicast->message, multicast->message_prefix + n, char)) { perror(*argv); multicast->message = old_buf; @@ -1507,11 +1498,12 @@ void signal_all(int signo) with_mutex (slave_mutex, foreach_linked_list_node (client_list, node) - { - client_t* value = (client_t*)(void*)(client_list.values[node]); - if (pthread_equal(current_thread, value->thread) == 0) - pthread_kill(value->thread, signo); - }); + { + client_t* value = (client_t*)(void*)(client_list.values[node]); + if (pthread_equal(current_thread, value->thread) == 0) + pthread_kill(value->thread, signo); + } + ); } |