diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/libmdsserver/fd-table.c | 6 | ||||
-rw-r--r-- | src/libmdsserver/macros.h | 27 | ||||
-rw-r--r-- | src/libmdsserver/mds-message.c | 21 | ||||
-rw-r--r-- | src/libmdsserver/util.c | 28 | ||||
-rw-r--r-- | src/libmdsserver/util.h | 12 | ||||
-rw-r--r-- | src/mds-server/mds-server.c | 222 |
6 files changed, 179 insertions, 137 deletions
diff --git a/src/libmdsserver/fd-table.c b/src/libmdsserver/fd-table.c index eb681f9..0852eb6 100644 --- a/src/libmdsserver/fd-table.c +++ b/src/libmdsserver/fd-table.c @@ -161,8 +161,7 @@ size_t fd_table_put(fd_table_t* restrict this, int key, size_t value) { size_t* old_values = this->values; size_t old_bitcap, new_bitcap; - this->values = realloc(this->values, (this->capacity << 1) * sizeof(size_t)); - if (this->values == NULL) + if (xrealloc(this->values, this->capacity << 1, size_t)) { this->values = old_values; return 0; @@ -177,8 +176,7 @@ size_t fd_table_put(fd_table_t* restrict this, int key, size_t value) if (new_bitcap > old_bitcap) { uint64_t* old_used = this->used; - this->used = realloc(this->used, new_bitcap * sizeof(size_t)); - if (this->used == NULL) + if (xrealloc(this->used, new_bitcap, size_t)) { this->used = old_used; this->capacity >>= 1; diff --git a/src/libmdsserver/macros.h b/src/libmdsserver/macros.h index 1623950..95f705f 100644 --- a/src/libmdsserver/macros.h +++ b/src/libmdsserver/macros.h @@ -77,6 +77,21 @@ instructions \ errno = pthread_mutex_unlock(&(mutex)) +/** + * Wrapper for `pthread_mutex_lock` and `pthread_mutex_unlock` with an embedded if-statement + * + * @param mutex:pthread_mutex_t The mutex + * @parma condition The condition to test + * @param instructions The instructions to run while the mutex is locked + */ +#define with_mutex_if(mutex, condition, instructions) \ + errno = pthread_mutex_lock(&(mutex)); \ + if (condition) \ + { \ + instructions \ + } \ + errno = pthread_mutex_unlock(&(mutex)) + /** * Return the maximum value of two values @@ -296,6 +311,18 @@ /** + * `remalloc` wrapper that returns whether the allocation was not successful + * + * @param var The variable to which to assign the reallocation + * @param elements The number of elements to allocate + * @param type The data type of the elements for which to create an allocation + * @return :int Evaluates to true if an only if the allocation failed + */ +#define xrealloc(var, elements, type) \ + ((var = realloc(var, (elements) * sizeof(type))) == NULL) + + +/** * Go to the label `pfail` if a condition is met * * @param CONDITION The condition diff --git a/src/libmdsserver/mds-message.c b/src/libmdsserver/mds-message.c index f6dca1a..bc71ca6 100644 --- a/src/libmdsserver/mds-message.c +++ b/src/libmdsserver/mds-message.c @@ -163,22 +163,13 @@ int mds_message_read(mds_message_t* restrict this, int fd) that it does not need to be reallocated again and again. */ if (header_commit_buffer == 0) { + char** old_headers = this->headers; header_commit_buffer = 8; - if (this->header_count == 0) + n = this->header_count + header_commit_buffer; + if (xrealloc(this->headers, n, char*)) { - if (xmalloc(this->headers, header_commit_buffer, char*)) - return -1; - } - else - { - char** old_headers = this->headers; - n = this->header_count + header_commit_buffer; - this->headers = realloc(this->headers, n * sizeof(char*)); - if (this->headers == NULL) - { - this->headers = old_headers; + this->headers = old_headers; return -1; - } } } @@ -261,9 +252,7 @@ int mds_message_read(mds_message_t* restrict this, int fd) if (n < 128) { char* old_buffer = this->buffer; - this->buffer_size <<= 1; - this->buffer = realloc(this->buffer, this->buffer_size * sizeof(char)); - if (this->buffer == NULL) + if (xrealloc(this->buffer, this->buffer_size <<= 1, char)) { this->buffer = old_buffer; this->buffer_size >>= 1; diff --git a/src/libmdsserver/util.c b/src/libmdsserver/util.c index 3b6cde1..41c6ff1 100644 --- a/src/libmdsserver/util.c +++ b/src/libmdsserver/util.c @@ -126,6 +126,7 @@ size_t send_message(int socket, const char* message, size_t length) size_t sent = 0; ssize_t just_sent; + errno = 0; while (length > 0) if ((just_sent = send(socket, message + sent, min(block_size, length), MSG_NOSIGNAL)) < 0) { @@ -222,8 +223,7 @@ char* full_read(int fd) if (state_buf_size == state_buf_ptr) { char* old_buf = state_buf; - state_buf = realloc(state_buf, (state_buf_size <<= 1) * sizeof(char)); - if (state_buf == NULL) + if (xrealloc(state_buf, state_buf_size <<= 1, char)) { free(old_buf); return NULL; @@ -245,3 +245,27 @@ char* full_read(int fd) return state_buf; } + +/** + * Check whether a string begins with a specific string, + * where neither of the strings are necessarily NUL-terminated + * + * @param haystack The string that should start with the other string + * @param needle The string the first string should start with + * @param haystack_n The length of `haystack` + * @param needle_n The length of `needle` + * @return Whether the `haystack` begins with `needle` + */ +int startswith_n(const char* haystack, const char* needle, size_t haystack_n, size_t needle_n) +{ + size_t i; + if (haystack_n < needle_n) + return 0; + + for (i = 0; i < needle_n; i++) + if (haystack[i] != needle[i]) + return 0; + + return 1; +} + diff --git a/src/libmdsserver/util.h b/src/libmdsserver/util.h index da2cb9f..1b533bb 100644 --- a/src/libmdsserver/util.h +++ b/src/libmdsserver/util.h @@ -92,6 +92,18 @@ int full_write(int fd, const char* buffer, size_t length); */ char* full_read(int fd); +/** + * Check whether a string begins with a specific string, + * where neither of the strings are necessarily NUL-terminated + * + * @param haystack The string that should start with the other string + * @param needle The string the first string should start with + * @param haystack_n The length of `haystack` + * @param needle_n The length of `needle` + * @return Whether the `haystack` begins with `needle` + */ +int startswith_n(const char* haystack, const char* needle, size_t haystack_n, size_t needle_n) __attribute__((pure)); + #endif diff --git a/src/mds-server/mds-server.c b/src/mds-server/mds-server.c index 0d669ad..4d146b7 100644 --- a/src/mds-server/mds-server.c +++ b/src/mds-server/mds-server.c @@ -551,19 +551,16 @@ void send_multicast_queue(client_t* client) while (client->multicasts_count > 0) { multicast_t multicast; - with_mutex (client->mutex, - if (client->multicasts_count > 0) - { - size_t c = (client->multicasts_count -= 1) * sizeof(multicast_t); - multicast = client->multicasts[0]; - memmove(client->multicasts, client->multicasts + 1, c); - if (c == 0) - { - free(client->multicasts); - client->multicasts = NULL; - } - } - ); + with_mutex_if (client->mutex, client->multicasts_count > 0, + size_t c = (client->multicasts_count -= 1) * sizeof(multicast_t); + multicast = client->multicasts[0]; + memmove(client->multicasts, client->multicasts + 1, c); + if (c == 0) + { + free(client->multicasts); + client->multicasts = NULL; + } + ); multicast_message(&multicast); multicast_destroy(&multicast); } @@ -592,13 +589,13 @@ void send_reply_queue(client_t* client) while (n > 0) { sent = send_message(client->socket_fd, sendbuf_, n); - if ((sent < n) && (errno != EINTR)) /* Ignore EINTR */ + n -= sent; + sendbuf_ += sent / sizeof(char); + if ((n > 0) && (errno != EINTR)) /* Ignore EINTR */ { perror(*argv); break; } - n -= sent; - sendbuf_ += sent / sizeof(char); } free(sendbuf); ); @@ -745,10 +742,7 @@ int message_received(client_t* client) if (assign_id && (client->id == 0)) { intercept |= 2; - with_mutex (slave_mutex, - client->id = next_id++; - if (next_id == 0) - { + with_mutex_if (slave_mutex, (client->id = ++next_id) == 0, eprint("this is impossible, ID counter has overflowed."); /* If the program ran for a millennium it would take c:a 585 assignments per nanosecond. This @@ -756,8 +750,7 @@ int message_received(client_t* client) dedication by generations of ponies (or just an alicorn) to maintain the process and transfer it new hardware.) */ abort(); - } - ); + ); } /* Make the client listen for messages addressed to it. */ @@ -795,8 +788,7 @@ int message_received(client_t* client) if (len > size) { char* old_buf = buf; - buf = realloc(buf, ((size <<= 1) + 1) * sizeof(char)); - if (buf == NULL) + if (xrealloc(buf, (size <<= 1) + 1, char)) { perror(*argv); free(old_buf); @@ -901,6 +893,35 @@ int message_received(client_t* client) /** + * Remove interception condition by index + * + * @param client The intercepting client + * @param index The index of the condition + */ +static void remove_intercept_condition(client_t* client, size_t index) +{ + interception_condition_t* conds = client->interception_conditions; + size_t n = client->interception_conditions_count; + + /* Remove the condition from the list. */ + memmove(conds + index, conds + index + 1, --n - index); + client->interception_conditions_count--; + + /* Shrink the list. */ + if (client->interception_conditions_count == 0) + { + free(conds); + client->interception_conditions = NULL; + } + else + if (xrealloc(conds, n, interception_condition_t)) + perror(*argv); + else + client->interception_conditions = conds; +} + + +/** * Add an interception condition for a client * * @param client The client @@ -934,51 +955,38 @@ void add_intercept_condition(client_t* client, char* condition, int64_t priority with for optimisation. */ for (i = 0; i < n; i++) { - if ((conds[i].header_hash == hash) && strequals(conds[i].condition, condition)) + if ((conds[i].header_hash != hash) || !strequals(conds[i].condition, condition)) { - if (stop) - { - /* Remove the condition from the list. */ - memmove(conds + i, conds + i + 1, --n - i); - client->interception_conditions_count--; - /* Shrink the list. */ - if (n == 0) - { - free(conds); - client->interception_conditions = NULL; - } - else - if ((conds = realloc(conds, n * sizeof(interception_condition_t))) == NULL) - perror(*argv); - else - client->interception_conditions = conds; - } - else + /* Look for the first non-modifying, this is a part of the + optimisation where we put all modifying conditions at the + beginning. */ + if ((nonmodifying < 0) && conds[i].modifying) + nonmodifying = (ssize_t)i; + continue; + } + + if (stop) + remove_intercept_condition(client, i); + else + { + /* Update parameters. */ + conds[i].priority = priority; + conds[i].modifying = modifying; + + if (modifying && (nonmodifying >= 0)) { - /* Update parameters. */ - conds[i].priority = priority; - conds[i].modifying = modifying; - - if (modifying && (nonmodifying >= 0)) - { - /* Optimisation: put conditions that are modifying - at the beginning. When a client is intercepting - we most know if any satisfying condition is - modifying. With this optimisation the first - satisfying condition will tell us if there is - any satisfying condition that is modifying. */ - interception_condition_t temp = conds[nonmodifying]; - conds[nonmodifying] = conds[i]; - conds[i] = temp; - } + /* Optimisation: put conditions that are modifying + at the beginning. When a client is intercepting + we most know if any satisfying condition is + modifying. With this optimisation the first + satisfying condition will tell us if there is + any satisfying condition that is modifying. */ + interception_condition_t temp = conds[nonmodifying]; + conds[nonmodifying] = conds[i]; + conds[i] = temp; } - return; } - /* Look for the first non-modifying, this is a part of the - optimisation where we put all modifying conditions at the - beginning. */ - if ((nonmodifying < 0) && conds[i].modifying) - nonmodifying = (ssize_t)i; + return; } if (stop) @@ -993,11 +1001,7 @@ void add_intercept_condition(client_t* client, char* condition, int64_t priority } /* Grow the interception condition list. */ - if (conds == NULL) - conds = malloc(1 * sizeof(interception_condition_t)); - else - conds = realloc(conds, (n + 1) * sizeof(interception_condition_t)); - if (conds == NULL) + if (xrealloc(conds, n + 1, interception_condition_t)) { perror(*argv); free(condition); @@ -1188,7 +1192,7 @@ void queue_message_multicast(char* message, size_t length, client_t* sender) xsnprintf(modify_id_header, "Modify ID: %" PRIu64 "\n", modify_id); n = strlen(modify_id_header); old_buf = message; - if ((message = realloc(message, (n + length) * sizeof(char))) == NULL) + if (xrealloc(message, n + length, char)) { message = old_buf; goto fail; @@ -1247,8 +1251,8 @@ void queue_message_multicast(char* message, size_t length, client_t* sender) void multicast_message(multicast_t* multicast) { uint64_t modify_id = 0; - size_t n = min(strlen("Modify ID: "), multicast->message_length); - if (!strncmp(multicast->message, "Modify ID: ", n)) + size_t n = strlen("Modify ID: "); + if (startswith_n(multicast->message, "Modify ID: ", multicast->message_length, n)) { char* value = multicast->message + n; char* lf = strchr(value, '\n'); @@ -1281,22 +1285,16 @@ void multicast_message(multicast_t* multicast) } /* Send the message. */ + n *= sizeof(char); with_mutex (client->mutex, - errno = 0; - n *= sizeof(char); if (client->open) - while (n > 0) - { - sent = send_message(client->socket_fd, msg + multicast->message_ptr, n); - if (sent < n) - { - if (errno != EINTR) - perror(*argv); - break; - } - n -= sent; - multicast->message_ptr += sent / sizeof(char); - } + { + sent = send_message(client->socket_fd, msg + multicast->message_ptr, n); + n -= sent; + multicast->message_ptr += sent / sizeof(char); + if ((n > 0) && (errno != EINTR)) + perror(*argv); + } ); /* Stop if we are re-exec:ing or terminating, or continue to next recipient on error. */ @@ -1331,42 +1329,35 @@ void multicast_message(multicast_t* multicast) mds_message_t* mod; /* Wait for a reply. */ - with_mutex (modify_mutex, - if (client->modify_message == NULL) - { - if (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0) - { - hash_table_put(&modify_map, (size_t)modify_id, (size_t)(void*)client); - pthread_cond_signal(&slave_cond); - } - } - ); - with_mutex (client->modify_mutex, - if (client->modify_message == NULL) - { - while ((client->modify_message == NULL) && (terminating == 0)) - pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout); - if (terminating == 0) - hash_table_remove(&modify_map, (size_t)modify_id); - } - ); + with_mutex_if (modify_mutex, client->modify_message == NULL, + if (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0) + { + hash_table_put(&modify_map, (size_t)modify_id, (size_t)(void*)client); + pthread_cond_signal(&slave_cond); + } + ); + with_mutex_if (client->modify_mutex, client->modify_message == NULL, + while ((client->modify_message == NULL) && (terminating == 0)) + pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout); + if (terminating == 0) + hash_table_remove(&modify_map, (size_t)modify_id); + ); if (terminating) return; /* Act upon the reply. */ mod = client->modify_message; for (i = 0; i < mod->header_count; i++) - if (!strcmp(mod->headers[i], "Modify: yes")) + if (strequals(mod->headers[i], "Modify: yes")) { modifying = 1; break; } if (modifying) { - old_buf = multicast->message; n = mod->payload_size; - multicast->message = realloc(old_buf, (multicast->message_prefix + n) * sizeof(char)); - if (multicast->message == NULL) + old_buf = multicast->message; + if (xrealloc(multicast->message, multicast->message_prefix + n, char)) { perror(*argv); multicast->message = old_buf; @@ -1507,11 +1498,12 @@ void signal_all(int signo) with_mutex (slave_mutex, foreach_linked_list_node (client_list, node) - { - client_t* value = (client_t*)(void*)(client_list.values[node]); - if (pthread_equal(current_thread, value->thread) == 0) - pthread_kill(value->thread, signo); - }); + { + client_t* value = (client_t*)(void*)(client_list.values[node]); + if (pthread_equal(current_thread, value->thread) == 0) + pthread_kill(value->thread, signo); + } + ); } |