diff options
Diffstat (limited to '')
-rw-r--r-- | src/mds-server/mds-server.c | 186 |
1 files changed, 156 insertions, 30 deletions
diff --git a/src/mds-server/mds-server.c b/src/mds-server/mds-server.c index 00f8ceb..bc3b20f 100644 --- a/src/mds-server/mds-server.c +++ b/src/mds-server/mds-server.c @@ -16,9 +16,11 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. */ #include "mds-server.h" + #include "interception_condition.h" #include "client.h" #include "queued_interception.h" +#include "multicast.h" #include <libmdsserver/config.h> #include <libmdsserver/linked-list.h> @@ -108,6 +110,11 @@ static linked_list_t client_list; */ static uint64_t next_id = 1; +/** + * The next free ID for a message modifications + */ +static uint64_t next_modify_id = 1; + /** @@ -457,12 +464,13 @@ void* slave_loop(void* data) /* Add client to table. */ tmp = fd_table_put(&client_map, socket_fd, (size_t)(void*)information); - pthread_mutex_unlock(&slave_mutex); if ((tmp == 0) && errno) { perror(*argv); + pthread_mutex_unlock(&slave_mutex); goto fail; } + pthread_mutex_unlock(&slave_mutex); /* Fill information table. */ information->list_entry = entry; @@ -503,6 +511,27 @@ void* slave_loop(void* data) if (information->open) while (reexecing == 0) { + /* Send queued multicast messages. */ + if (information->multicasts_count > 0) + { + multicast_t multicast; + with_mutex(information->mutex, + if (information->multicasts_count > 0) + { + size_t c = information->multicasts_count -= 1; + multicast = information->multicasts[0]; + memmove(information->multicasts, information->multicasts + 1, c); + if (c == 0) + { + free(information->multicasts); + information->multicasts = NULL; + } + } + ); + multicast_message(&multicast); + multicast_destroy(&multicast); + } + /* Send queued messages. */ if (information->send_pending_size > 0) { @@ -572,7 +601,7 @@ void* slave_loop(void* data) (uint32_t)(information->id >> 32), (uint32_t)(information->id >> 0)); n = strlen(msgbuf) + 1; - multicast_message(msgbuf, n); + queue_message_multicast(msgbuf, n, information); fail: /* The loop does break, this done on success as well. */ @@ -733,13 +762,14 @@ void message_received(client_t* client) return; } mds_message_marshal(&message, msgbuf, 0); - multicast_message(msgbuf, n / sizeof(char)); /* TODO support re-exec */ - free(msgbuf); + queue_message_multicast(msgbuf, n / sizeof(char), client); /* Send asigned ID. */ if (assign_id) { + char* msgbuf_; + /* Construct response. */ n = 2 * 10 + strlen(message_id) + 1; n += strlen("ID assignment: :\nIn response to: \n\n"); @@ -758,7 +788,14 @@ void message_received(client_t* client) n = strlen(msgbuf); /* Multicast the reply. */ - multicast_message(msgbuf, n); /* TODO support re-exec */ + msgbuf_ = strdup(msgbuf); + if (msgbuf_ == NULL) + { + perror(*argv); + free(msgbuf); + return; + } + queue_message_multicast(msgbuf, n, client); /* Queue message to be sent when this function returns. This done to simplify `multicast_message` for re-exec. */ @@ -925,12 +962,13 @@ static int cmp_queued_interception(const void* a, const void* b) /** - * Multicast a message + * Queue a message for multicasting * * @param message The message * @param length The length of the message + * @param sender The original sender of the message */ -void multicast_message(char* message, size_t length) +void queue_message_multicast(char* message, size_t length, client_t* sender) { size_t header_count = 0; size_t n = length - 1; @@ -939,8 +977,12 @@ void multicast_message(char* message, size_t length) char** header_values = NULL; queued_interception_t* interceptions = NULL; size_t interceptions_count = 0; + multicast_t* multicast = NULL; size_t i; ssize_t node; + uint64_t modify_id; + char modify_id_header[13 + 3 * sizeof(uint64_t)]; + char* old_buf; /* Count the number of headers. */ for (i = 0; i < n; i++) @@ -954,6 +996,10 @@ void multicast_message(char* message, size_t length) if (header_count == 0) return; /* Invalid message. */ + /* Allocate multicast message. */ + if (xmalloc(multicast, 1, sizeof(multicast_t))) goto fail; + multicast_initialise(multicast); + /* Allocate header lists. */ if (xmalloc(hashes, header_count, size_t)) goto fail; if (xmalloc(headers, header_count, char*)) goto fail; @@ -1008,7 +1054,6 @@ void multicast_message(char* message, size_t length) /* Look for a matching condition. */ n = client->interception_conditions_count; - errno = 0; if (client->open) { with_mutex(mutex, @@ -1054,48 +1099,127 @@ void multicast_message(char* message, size_t length) /* Sort interceptors. */ qsort(interceptions, interceptions_count, sizeof(queued_interception_t), cmp_queued_interception); - /* Send message to interceptors. */ - for (i = 0; i < interceptions_count; i++) + /* Add prefix to message with ‘Modify ID’ header. */ + with_mutex(slave_mutex, + modify_id = next_modify_id++; + if (next_modify_id == 0) + next_modify_id = 1; + ); + xsnprintf(modify_id_header, "Modify ID: %" PRIu64 "\n", modify_id); + n = strlen(modify_id_header); + old_buf = message; + if ((message = realloc(message, (n + length) * sizeof(char))) == NULL) { - queued_interception_t client_ = interceptions[i]; + message = old_buf; + goto fail; + } + memmove(message + n, message, (n + length) * sizeof(char)); + memcpy(message, modify_id_header, n * sizeof(char)); + + /* Store information. */ + multicast->interceptions = interceptions; + multicast->interceptions_count = interceptions_count; + multicast->message = message; + multicast->message_length = length; + multicast->message_prefix = n; + message = NULL; + + /* Queue message multicasting. */ + with_mutex(sender->mutex, + if (sender->multicasts_count == 1) + { + if (xmalloc(sender->multicasts, 1, multicast_t)) + goto fail_queue; + } + else + { + multicast_t* new_buf; + new_buf = realloc(sender->multicasts, (sender->multicasts_count + 1) * sizeof(multicast_t)); + if (new_buf == NULL) + goto fail_queue; + sender->multicasts = new_buf; + } + sender->multicasts[sender->multicasts_count++] = *multicast; + multicast = NULL; + fail_queue: + ); + + errno = 0; + fail: /* This is done before this function returns even if there was no error. */ + if (errno != 0) + perror(*argv); + /* Release resources. */ + xfree(headers, header_count); + xfree(header_values, header_count); + free(hashes); + free(message); + if (multicast != NULL) + multicast_destroy(multicast); + free(multicast); +} + + +/** + * Multicast a message + * + * @param multicast The multicast message + */ +void multicast_message(multicast_t* multicast) +{ + for (; multicast->interceptions_ptr < multicast->interceptions_count; multicast->interceptions_ptr++) + { + queued_interception_t client_ = multicast->interceptions[multicast->interceptions_ptr]; client_t* client = client_.client; - char* msg = message; + char* msg = multicast->message + multicast->message_ptr; + size_t n = multicast->message_length - multicast->message_ptr; size_t sent; - n = length; + + /* After unmarshalling at re-exec, client will be NULL and must be mapped from its socket. */ + if (client == NULL) + { + size_t address; + with_mutex(slave_mutex, address = fd_table_get(&client_map, client_.socket_fd);); + client_.client = client = (client_t*)(void*)address; + } + + /* Skip Modify ID header if the interceptors will not perform a modification. */ + if ((client_.modifying == 0) && (multicast->message_ptr == 0)) + { + n -= multicast->message_prefix; + multicast->message_ptr += multicast->message_prefix; + } /* Send the message. */ with_mutex(client->mutex, + errno = 0; if (client->open) while (n > 0) { - sent = send_message(client->socket_fd, msg, n); - if ((sent < n) && (errno != EINTR)) /* TODO Support reexec */ + sent = send_message(client->socket_fd, msg + multicast->message_ptr, n); + if (sent < n) { - perror(*argv); + if (errno != EINTR) + perror(*argv); break; } n -= sent; - msg += sent; + multicast->message_ptr += sent; } ); + /* Stop if we are re-exec:ing. */ + if ((n > 0) && (errno == EINTR)) + return; + /* Wait for a reply. */ - if ((n > 0) && client_.modifying) + if ((n == 0) && client_.modifying) { /* TODO */ } + + /* Reset how much of the message has been sent before we continue with next recipient. */ + multicast->message_ptr = 0; } - - - errno = 0; - fail: /* This is done before this function returns even if there was no error. */ - if (errno != 0) - perror(*argv); - /* Release resources. */ - xfree(headers, header_count); - xfree(header_values, header_count); - free(interceptions); - free(hashes); } @@ -1226,7 +1350,7 @@ int marshal_server(int fd) } /* Add the size of the rest of the program's state. */ - state_n += sizeof(int) + sizeof(sig_atomic_t) + sizeof(uint64_t) + 2 * sizeof(size_t); + state_n += sizeof(int) + sizeof(sig_atomic_t) + 2 * sizeof(uint64_t) + 2 * sizeof(size_t); state_n += list_elements * sizeof(size_t) + list_size + map_size; /* Allocate a buffer for all data except the client list and the client map. */ @@ -1241,6 +1365,7 @@ int marshal_server(int fd) /* Marshal the miscellaneous state data. */ buf_set_next(state_buf_, sig_atomic_t, running); buf_set_next(state_buf_, uint64_t, next_id); + buf_set_next(state_buf_, uint64_t, next_modify_id); /* Tell the program how large the marshalled client list is and how any clients are marshalled. */ buf_set_next(state_buf_, size_t, list_size); @@ -1338,6 +1463,7 @@ int unmarshal_server(int fd) /* Unmarshal the miscellaneous state data. */ buf_get_next(state_buf_, sig_atomic_t, running); buf_get_next(state_buf_, uint64_t, next_id); + buf_get_next(state_buf_, uint64_t, next_modify_id); /* Get the marshalled size of the client list and how any clients that are marshalled. */ buf_get_next(state_buf_, size_t, list_size); |