From 4ccb5bfda23550315a89d625b99b3cc48c3e42fa Mon Sep 17 00:00:00 2001 From: Mattias Andrée Date: Sun, 11 May 2014 20:38:35 +0200 Subject: receive modifications MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Mattias Andrée --- src/mds-server/client.c | 6 +++ src/mds-server/client.h | 20 +++++++ src/mds-server/mds-server.c | 129 ++++++++++++++++++++++++++++++++++++-------- 3 files changed, 134 insertions(+), 21 deletions(-) (limited to 'src') diff --git a/src/mds-server/client.c b/src/mds-server/client.c index 6c46c90..3c8fb2d 100644 --- a/src/mds-server/client.c +++ b/src/mds-server/client.c @@ -57,6 +57,10 @@ void client_destroy(client_t* restrict this) mds_message_destroy(this->modify_message); free(this->modify_message); } + if (this->modify_mutex_created) + pthread_mutex_destroy(&(this->modify_mutex)); + if (this->modify_cond_created) + pthread_cond_destroy(&(this->modify_cond)); free(this); } @@ -132,6 +136,8 @@ size_t client_unmarshal(client_t* restrict this, char* restrict data) this->multicasts = NULL; this->send_pending = NULL; this->mutex_created = 0; + this->modify_mutex_created = 0; + this->modify_cond_created = 0; this->multicasts_count = 0; buf_get_next(data, ssize_t, this->list_entry); buf_get_next(data, int, this->socket_fd); diff --git a/src/mds-server/client.h b/src/mds-server/client.h index 972ff84..2aeb813 100644 --- a/src/mds-server/client.h +++ b/src/mds-server/client.h @@ -112,6 +112,26 @@ typedef struct client */ struct mds_message* modify_message; + /** + * Mutex for `modify_message` + */ + pthread_mutex_t modify_mutex; + + /** + * Condidition for `modify_message` + */ + pthread_cond_t modify_cond; + + /** + * Whether `modify_mutex` has been initialised + */ + int modify_mutex_created; + + /** + * Whether `modify_cond` has been initialised + */ + int modify_cond_created; + } client_t; diff --git a/src/mds-server/mds-server.c b/src/mds-server/mds-server.c index 8fbcd44..0c8ba92 100644 --- a/src/mds-server/mds-server.c +++ b/src/mds-server/mds-server.c @@ -493,6 +493,8 @@ void* slave_loop(void* data) information->send_pending = NULL; information->modify_message = NULL; information->mutex_created = 0; + information->modify_mutex_created = 0; + information->modify_cond_created = 0; /* Add to list of clients. */ pthread_mutex_lock(&slave_mutex); @@ -540,6 +542,20 @@ void* slave_loop(void* data) } information->mutex_created = 1; + /* Create mutex and codition for multicast interception replies. */ + if ((errno = pthread_mutex_init(&(information->modify_mutex), NULL)) != 0) + { + perror(*argv); + goto fail; + } + information->modify_mutex_created = 1; + if ((errno = pthread_cond_init(&(information->modify_cond), NULL)) != 0) + { + perror(*argv); + goto fail; + } + information->modify_cond_created = 1; + /* Make the server update without all slaves dying on SIGUSR1. */ if (xsigaction(SIGUSR1, sigusr1_trap) < 0) @@ -683,7 +699,7 @@ void* slave_loop(void* data) * * @param client The client has sent a message */ -void message_received(client_t* client) /* TODO Modify ID */ +void message_received(client_t* client) { mds_message_t message = client->message; int assign_id = 0; @@ -692,6 +708,7 @@ void message_received(client_t* client) /* TODO Modify ID */ int64_t priority = 0; int stop = 0; const char* message_id = NULL; + uint64_t modify_id = 0; size_t i, n; char* msgbuf; @@ -706,9 +723,71 @@ void message_received(client_t* client) /* TODO Modify ID */ else if (strequals(h, "Stop: yes")) stop = 1; else if (startswith(h, "Message ID: ")) message_id = strstr(h, ": ") + 2; else if (startswith(h, "Priority: ")) priority = atoll(strstr(h, ": ") + 2); + else if (startswith(h, "Modify ID: ")) modify_id = (uint64_t)atoll(strstr(h, ": ") + 2); + } + + + /* Notify waiting client about a received message modification. */ + if (modifying != 0) + { + /* pthread_cond_timedwait is required to handle re-exec because + pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */ + struct timespec timeout = + { + .tv_sec = 1, + .tv_nsec = 0 + }; + size_t address; + client_t* recipient; + mds_message_t* multicast; + + with_mutex(modify_mutex, + while (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0) + { + /* TODO support re-exec */ + pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout); + } + address = hash_table_get(&modify_map, (size_t)modify_id); + recipient = (client_t*)(void*)address; + if (xmalloc(multicast = recipient->modify_message, 1, mds_message_t)) + goto fail; + multicast->headers = NULL; + multicast->header_count = 0; + multicast->payload = NULL; + multicast->payload_size = 0; + multicast->payload_ptr = 0; + multicast->buffer = NULL; + multicast->buffer_size = 0; + multicast->buffer_ptr = 0; + multicast->stage = 0; + if (xmalloc(multicast->payload, message.payload_size, char)) + goto fail; + memcpy(multicast->payload, message.payload, message.payload_size * sizeof(char)); + if (xmalloc(multicast->headers, message.header_count, char*)) + goto fail; + for (i = 0; i < message.header_count; i++, multicast->header_count++) + { + multicast->headers[i] = strdup(message.headers[i]); + if (multicast->headers[i] == NULL) + goto fail; + } + goto done; + fail: + if (multicast != NULL) + { + mds_message_destroy(multicast); + free(multicast); + recipient->modify_message = NULL; + } + done: + ); + with_mutex(client->modify_mutex, pthread_cond_signal(&(client->modify_cond));); + + /* Do nothing more, not not even multicast this message. */ + return; } - /* Ignore message if not labelled with a message ID. */ + if (message_id == NULL) { eprint("received message with a message ID, ignoring."); @@ -1292,6 +1371,11 @@ void multicast_message(multicast_t* multicast) hash_table_put(&modify_map, (size_t)modify_id, (size_t)(void*)client); pthread_cond_signal(&slave_cond); } + } + ); + with_mutex(client->modify_mutex, + if (client->modify_message == NULL) + { for (;;) { pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout); @@ -1306,29 +1390,32 @@ void multicast_message(multicast_t* multicast) return; /* Act upon the reply. */ - mod = client->modify_message; - for (i = 0; i < mod->header_count; i++) - if (!strcmp(mod->headers[i], "Modify: yes")) - { - modifying = 1; - break; - } - if (modifying) + if (client->modify_message != NULL) { - old_buf = multicast->message; - n = mod->payload_size; - multicast->message = realloc(old_buf, (multicast->message_prefix + n) * sizeof(char)); - if (multicast->message == NULL) + mod = client->modify_message; + for (i = 0; i < mod->header_count; i++) + if (!strcmp(mod->headers[i], "Modify: yes")) + { + modifying = 1; + break; + } + if (modifying) { - perror(*argv); - multicast->message = old_buf; + old_buf = multicast->message; + n = mod->payload_size; + multicast->message = realloc(old_buf, (multicast->message_prefix + n) * sizeof(char)); + if (multicast->message == NULL) + { + perror(*argv); + multicast->message = old_buf; + } + else + memcpy(multicast->message + multicast->message_prefix, mod->payload, n); } - else - memcpy(multicast->message + multicast->message_prefix, mod->payload, n); + + /* Free the reply. */ + mds_message_destroy(client->modify_message); } - - /* Free the reply. */ - mds_message_destroy(client->modify_message); } /* Reset how much of the message has been sent before we continue with next recipient. */ -- cgit v1.2.3-70-g09d2