From d8b997547139babdddadb2e401bb6d3e75ff9227 Mon Sep 17 00:00:00 2001 From: Mattias Andrée Date: Sun, 11 May 2014 19:14:57 +0200 Subject: handle interceptions, everything except receiving modifications MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Mattias Andrée --- src/mds-server/client.c | 26 +++++++- src/mds-server/client.h | 5 ++ src/mds-server/mds-server.c | 145 ++++++++++++++++++++++++++++++++++++++------ 3 files changed, 156 insertions(+), 20 deletions(-) (limited to 'src/mds-server') diff --git a/src/mds-server/client.c b/src/mds-server/client.c index 0a8b107..6c46c90 100644 --- a/src/mds-server/client.c +++ b/src/mds-server/client.c @@ -52,6 +52,11 @@ void client_destroy(client_t* restrict this) free(this->multicasts); } free(this->send_pending); + if (this->modify_message != NULL) + { + mds_message_destroy(this->modify_message); + free(this->modify_message); + } free(this); } @@ -64,7 +69,7 @@ void client_destroy(client_t* restrict this) */ size_t client_marshal_size(const client_t* restrict this) { - size_t n = sizeof(ssize_t) + 2 * sizeof(int) + sizeof(uint64_t) + 4 * sizeof(size_t); + size_t n = sizeof(ssize_t) + 2 * sizeof(int) + sizeof(uint64_t) + 5 * sizeof(size_t); size_t i; n += mds_message_marshal_size(&(this->message), 1); @@ -73,6 +78,7 @@ size_t client_marshal_size(const client_t* restrict this) for (i = 0; i < this->multicasts_count; i++) n += multicast_marshal_size(this->multicasts + i); n += this->send_pending_size * sizeof(char); + n += this->modify_message == NULL ? 0 : mds_message_marshal_size(this->modify_message, 1); return n; } @@ -105,6 +111,9 @@ size_t client_marshal(const client_t* restrict this, char* restrict data) buf_set_next(data, size_t, this->send_pending_size); if (this->send_pending_size > 0) memcpy(data, this->send_pending, this->send_pending_size * sizeof(char)); + data += this->send_pending_size; + if (this->modify_message != NULL) + mds_message_marshal(this->modify_message, data, 1); return client_marshal_size(this); } @@ -163,7 +172,13 @@ size_t client_unmarshal(client_t* restrict this, char* restrict data) if (xmalloc(this->send_pending, this->send_pending_size, char)) goto fail; memcpy(this->send_pending, data, this->send_pending_size * sizeof(char)); + data += this->send_pending_size; } + buf_get_next(data, size_t, n); + if (n > 0) + mds_message_unmarshal(this->modify_message, data); + else + this->modify_message = NULL; return rc; fail: @@ -175,6 +190,11 @@ size_t client_unmarshal(client_t* restrict this, char* restrict data) multicast_destroy(this->multicasts + i); free(this->multicasts); free(this->send_pending); + if (this->modify_message != NULL) + { + mds_message_destroy(this->modify_message); + free(this->modify_message); + } return 0; } @@ -186,7 +206,7 @@ size_t client_unmarshal(client_t* restrict this, char* restrict data) */ size_t client_unmarshal_skip(char* restrict data) { - size_t n, c, rc = sizeof(ssize_t) + 2 * sizeof(int) + sizeof(uint64_t) + 3 * sizeof(size_t); + size_t n, c, rc = sizeof(ssize_t) + 2 * sizeof(int) + sizeof(uint64_t) + 5 * sizeof(size_t); buf_next(data, ssize_t, 1); buf_next(data, int, 2); buf_next(data, uint64_t, 1); @@ -209,6 +229,8 @@ size_t client_unmarshal_skip(char* restrict data) } buf_get_next(data, size_t, n); rc += n * sizeof(char); + buf_get_next(data, size_t, n); + rc += n * sizeof(char); return rc; } diff --git a/src/mds-server/client.h b/src/mds-server/client.h index da5d14f..972ff84 100644 --- a/src/mds-server/client.h +++ b/src/mds-server/client.h @@ -107,6 +107,11 @@ typedef struct client */ size_t send_pending_size; + /** + * Pending reply to the multicast interception + */ + struct mds_message* modify_message; + } client_t; diff --git a/src/mds-server/mds-server.c b/src/mds-server/mds-server.c index 694197b..5e60e3d 100644 --- a/src/mds-server/mds-server.c +++ b/src/mds-server/mds-server.c @@ -48,6 +48,7 @@ #include #include #include +#include @@ -115,6 +116,21 @@ static uint64_t next_id = 1; */ static uint64_t next_modify_id = 1; +/** + * Mutex for message modification + */ +static pthread_mutex_t modify_mutex; + +/** + * Condition for message modification + */ +static pthread_cond_t modify_cond; + +/** + * Map from modification ID to waiting client + */ +static hash_table_t modify_map; + /** @@ -229,20 +245,28 @@ int main(int argc_, char** argv_) } +#define __free(I) \ + if (I <= 0) fd_table_destroy(&client_map, NULL, NULL); \ + if (I <= 1) linked_list_destroy(&client_list); \ + if (I <= 2) pthread_mutex_destroy(&slave_mutex); \ + if (I <= 3) pthread_cond_destroy(&slave_cond); \ + if (I <= 4) pthread_mutex_destroy(&modify_mutex); \ + if (I <= 5) pthread_cond_destroy(&modify_cond) + + /* Create list and table of clients. */ if (reexec == 0) { if (fd_table_create(&client_map)) { perror(*argv); - fd_table_destroy(&client_map, NULL, NULL); + __free(0); return 1; } if (linked_list_create(&client_list, 32)) { perror(*argv); - fd_table_destroy(&client_map, NULL, NULL); - linked_list_destroy(&client_list); + __free(1); return 1; } } @@ -256,26 +280,41 @@ int main(int argc_, char** argv_) if (xsigaction(SIGUSR1, sigusr1_trap) < 0) { perror(*argv); - fd_table_destroy(&client_map, NULL, NULL); - linked_list_destroy(&client_list); + __free(1); return 1; } - /* Create mutex and condition for slave counter. */ if ((errno = pthread_mutex_init(&slave_mutex, NULL)) != 0) { perror(*argv); - fd_table_destroy(&client_map, NULL, NULL); - linked_list_destroy(&client_list); + __free(1); return 1; } if ((errno = pthread_cond_init(&slave_cond, NULL)) != 0) { perror(*argv); - fd_table_destroy(&client_map, NULL, NULL); - linked_list_destroy(&client_list); - pthread_mutex_destroy(&slave_mutex); + __free(2); + return 1; + } + + /* Create mutex, condition and map for message modification. */ + if ((errno = pthread_mutex_init(&modify_mutex, NULL)) != 0) + { + perror(*argv); + __free(3); + return 1; + } + if ((errno = pthread_cond_init(&modify_cond, NULL)) != 0) + { + perror(*argv); + __free(4); + return 1; + } + if (hash_table_create(&modify_map)) + { + perror(*argv); + __free(5); return 1; } @@ -359,10 +398,9 @@ int main(int argc_, char** argv_) /* Release resources. */ - fd_table_destroy(&client_map, NULL, NULL); - linked_list_destroy(&client_list); - pthread_mutex_destroy(&slave_mutex); - pthread_cond_destroy(&slave_cond); + __free(9999); + +#undef __free return 0; @@ -382,6 +420,9 @@ int main(int argc_, char** argv_) /* Release resources. */ pthread_mutex_destroy(&slave_mutex); pthread_cond_destroy(&slave_cond); + pthread_mutex_destroy(&modify_mutex); + pthread_cond_destroy(&modify_cond); + hash_table_destroy(&modify_map, NULL, NULL); /* Marshal the state of the server. */ xsnprintf(shm_path, SHM_PATH_PATTERN, (unsigned long int)pid); @@ -450,6 +491,7 @@ void* slave_loop(void* data) /* NULL-out pointers and initialisation markers. */ information->interception_conditions = NULL; information->send_pending = NULL; + information->modify_message = NULL; information->mutex_created = 0; /* Add to list of clients. */ @@ -641,7 +683,7 @@ void* slave_loop(void* data) * * @param client The client has sent a message */ -void message_received(client_t* client) +void message_received(client_t* client) /* TODO Modify ID */ { mds_message_t message = client->message; int assign_id = 0; @@ -1166,6 +1208,16 @@ void queue_message_multicast(char* message, size_t length, client_t* sender) */ void multicast_message(multicast_t* multicast) { + uint64_t modify_id = 0; + if (strstr(multicast->message, "Modify ID: ") == multicast->message) + { + char* value = multicast->message + strlen("Modify ID: "); + char* lf = strchr(value, '\n'); + *lf = '\0'; + modify_id = (uint64_t)atoll(value); + *lf = '\n'; + } + for (; multicast->interceptions_ptr < multicast->interceptions_count; multicast->interceptions_ptr++) { queued_interception_t client_ = multicast->interceptions[multicast->interceptions_ptr]; @@ -1212,10 +1264,67 @@ void multicast_message(multicast_t* multicast) if ((n > 0) && reexecing) return; - /* Wait for a reply. */ + /* Wait for a reply and act upon it. */ if ((n == 0) && client_.modifying) { - /* TODO */ + /* pthread_cond_timedwait is required to handle re-exec because + pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */ + struct timespec timeout = + { + .tv_sec = 1, + .tv_nsec = 0 + }; + int modifying = 0; + char* old_buf; + size_t i; + mds_message_t* mod; + + /* Wait for a reply. */ + with_mutex(modify_mutex, + if (client->modify_message == NULL) + { + if (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0) + { + hash_table_put(&modify_map, (size_t)modify_id, (size_t)(void*)client); + pthread_cond_signal(&slave_cond); + } + for (;;) + { + pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout); + if ((client->modify_message != NULL) && reexecing) + break; + } + if (reexecing == 0) + hash_table_remove(&modify_map, (size_t)modify_id); + } + ); + if (reexecing) + return; + + /* Act upon the reply. */ + mod = client->modify_message; + for (i = 0; i < mod->header_count; i++) + if (!strcmp(mod->headers[i], "Modify: yes")) + { + modifying = 1; + break; + } + if (modifying) + { + old_buf = multicast->message; + n = mod->payload_size; + multicast->message = realloc(old_buf, (multicast->message_prefix + n) * sizeof(char)); + if (multicast->message == NULL) + { + perror(*argv); + multicast->message = old_buf; + } + else + memcpy(multicast->message + multicast->message_prefix, mod->payload, n); + } + + /* Free the reply. */ + mds_message_destroy(client->modify_message); } /* Reset how much of the message has been sent before we continue with next recipient. */ -- cgit v1.2.3-70-g09d2