diff options
-rw-r--r-- | src/mds-server.c | 183 | ||||
-rw-r--r-- | src/mds-server.h | 44 |
2 files changed, 186 insertions, 41 deletions
diff --git a/src/mds-server.c b/src/mds-server.c index 3ddfaa9..d90ce6a 100644 --- a/src/mds-server.c +++ b/src/mds-server.c @@ -445,6 +445,7 @@ void* slave_loop(void* data) ssize_t entry = LINKED_LIST_UNUSED; size_t information_address = fd_table_get(&client_map, (size_t)socket_fd); client_t* information = (client_t*)(void*)information_address; + int mutex_created = 0; size_t tmp; int r; @@ -459,6 +460,9 @@ void* slave_loop(void* data) goto fail; } + /* NULL-out pointers. */ + information->interception_conditions = NULL; + /* Add to list of clients. */ pthread_mutex_lock(&slave_mutex); entry = linked_list_insert_end(&client_list, (size_t)(void*)information); @@ -483,6 +487,7 @@ void* slave_loop(void* data) information->socket_fd = socket_fd; information->open = 1; information->id = 0; + information->interception_conditions_count = 0; if (mds_message_initialise(&(information->message))) { perror(*argv); @@ -493,8 +498,10 @@ void* slave_loop(void* data) /* Store the thread so that other threads can kill it. */ information->thread = pthread_self(); - /* Create mutex to make sure two thread to not try to send messages concurrently. */ + /* Create mutex to make sure two thread to not try to send + messages concurrently, and other slave local actions. */ pthread_mutex_init(&(information->mutex), NULL); + mutex_created = 1; /* Make the server update without all slaves dying on SIGUSR1. */ @@ -552,13 +559,23 @@ void* slave_loop(void* data) close(socket_fd); if (information != NULL) { + if (information->interception_conditions != NULL) + { + size_t i; + for (i = 0; i < information->interception_conditions_count; i++) + if (information->interception_conditions[i].condition != NULL) + free(information->interception_conditions[i].condition); + free(information->interception_conditions); + } + if (mutex_created) + pthread_mutex_destroy(&(information->mutex)); mds_message_destroy(&(information->message)); free(information); } - fd_table_remove(&client_map, socket_fd); /* Unlist client and decrease the slave count. */ with_mutex(slave_mutex, + fd_table_remove(&client_map, socket_fd); if (entry != LINKED_LIST_UNUSED) linked_list_remove(&client_list, entry); running_slaves--; @@ -590,6 +607,10 @@ void message_received(client_t* client) /* TODO */ { mds_message_t message = client->message; int assign_id = 0; + int modifying = 0; + int intercept = 0; + int64_t priority = 0; + int stop = 0; const char* message_id = NULL; size_t i, n; char* msgbuf; @@ -597,41 +618,60 @@ void message_received(client_t* client) /* TODO */ /* Parser headers. */ for (i = 0; i < message.header_count; i++) { - const char* header = message.headers[i]; - if (strequals(header, "Command: assign-id")) - assign_id = 1; - else if (startswith(header, "Message ID: ")) - message_id = header + strlen("Message ID: "); + const char* h = message.headers[i]; + if (strequals(h, "Command: assign-id")) assign_id = 1; + else if (strequals(h, "Command: intercept")) intercept = 1; + else if (strequals(h, "Modifying: yes")) modifying = 1; + else if (strequals(h, "Stop: yes")) stop = 1; + else if (startswith(h, "Message ID: ")) message_id = strstr(h, ": ") + 2; + else if (startswith(h, "Priority: ")) priority = atoll(strstr(h, ": ") + 2); } - /* Assign ID or reply with current ID. */ - if (assign_id) + /* Ignore message if not labeled with a message ID. */ + if (message_id == NULL) + { + eprint("received message with a message ID, ignoring."); + return; + } + + /* Assign ID if not already assigned. */ + if (assign_id && (client->id == 0)) + { + intercept |= 2; + with_mutex(slave_mutex, + client->id = next_id++; + if (next_id == 0) + { + eprint("this is impossible, ID counter has overflowed."); + /* If the program ran for a millennium it would + take c:a 585 assignments per nanosecond. This + cannot possibly happen. (It would require serious + dedication by generations of ponies (or just an alicorn) + to maintain the process and transfer it new hardware.) */ + abort(); + } + ); + } + + /* Make the client listen for messages addressed to it. */ + if (intercept) { - /* Assign ID if not already assigned. */ - if (client->id == 0) + pthread_mutex_lock(&(client->mutex)); + if ((intercept & 1)) /* from payload */ { - /* Do the assignment. */ - with_mutex(slave_mutex, - client->id = next_id++; - if (next_id == 0) - { - eprint("this is impossible, ID counter has overflowed."); - /* If the program ran for a millennium it would - take c:a 585 assignments per nanosecond. This - cannot possibly happen. (It would require serious - dedication by generations of ponies (or just an alicorn) - to maintain the process and transfer it new hardware.) */ - abort(); - } - ); - - /* TODO: Make the client listen for messages addressed to it. - To: $(assign_id) - Priority: 0 - Modifying: no - */ + /* TODO */ } - + if ((intercept & 2)) /* "To: $(client->id)" */ + { + /* TODO */ + } + pthread_mutex_unlock(&(client->mutex)); + } + + + /* Send asigned ID. */ + if (assign_id) + { /* Construct response. */ n = 2 * 10 + strlen(message_id) + 1; n += strlen("ID assignment: :\nIn response to: \n\n"); @@ -647,7 +687,7 @@ void message_received(client_t* client) /* TODO */ "\n", (uint32_t)(client->id >> 32), (uint32_t)(client->id >> 0), - message_id); + message_id == NULL ? "" : message_id); n = strlen(msgbuf); /* Send message. */ @@ -778,20 +818,29 @@ int marshal_server(int fd) size_t state_n; ssize_t wrote; ssize_t node; + size_t j, n; /* Calculate the grand size of all messages and their buffers. */ for (node = client_list.edge;; list_elements++) { mds_message_t message; + client_t* value; if ((node = client_list.next[node]) == client_list.edge) break; - message = ((client_t*)(void*)(client_list.values[node]))->message; + + value = (client_t*)(void*)(client_list.values[node]); + n = value->interception_conditions_count; + message = value->message; msg_size += mds_message_marshal_size(&message, 1); + msg_size += n * (sizeof(size_t) + sizeof(int64_t) + sizeof(int)); + + for (j = 0; j < n; j++) + msg_size += (strlen(value->interception_conditions[j].condition) + 1) * sizeof(char); } /* Calculate the grand size of all client information. */ - state_n = sizeof(ssize_t) + 1 * sizeof(int) + 2 * sizeof(size_t); + state_n = 5 + sizeof(size_t) + 2 * sizeof(int) + 1 * sizeof(uint64_t); state_n *= list_elements; state_n += msg_size; @@ -835,6 +884,17 @@ int marshal_server(int fd) buf_set_next(state_buf_, int, value->socket_fd); buf_set_next(state_buf_, int, value->open); buf_set_next(state_buf_, uint64_t, value->id); + /* Marshal interception conditions. */ + buf_set_next(state_buf_, size_t, n = value->interception_conditions_count); + for (j = 0; j < n; j++) + { + interception_condition_t cond = value->interception_conditions[j]; + memcpy(state_buf_, cond.condition, strlen(cond.condition) + 1); + buf_next(state_buf_, char, strlen(cond.condition) + 1); + buf_set_next(state_buf_, size_t, cond.header_hash); + buf_set_next(state_buf_, int64_t, cond.priority); + buf_set_next(state_buf_, int, cond.modifying); + } /* Marshal the message. */ mds_message_marshal(&(value->message), state_buf_, 1); state_buf_ += msg_size / sizeof(char); @@ -994,6 +1054,8 @@ int unmarshal_server(int fd) /* Unmarshal the clients. */ for (i = 0; i < list_elements; i++) { + size_t seek = 0; + size_t j = 0, n = 0; size_t value_address; size_t msg_size; client_t* value; @@ -1014,15 +1076,36 @@ int unmarshal_server(int fd) buf_get_next(state_buf_, int, value->socket_fd); buf_get_next(state_buf_, int, value->open); buf_set_next(state_buf_, uint64_t, value->id); + /* Unmarshal interception conditions. */ + buf_get_next(state_buf_, size_t, value->interception_conditions_count = n); + seek = 5 * sizeof(size_t) + 2 * sizeof(int) + 1 * sizeof(uint64_t); + value->interception_conditions = malloc(n * sizeof(interception_condition_t)); + if (value->interception_conditions == NULL) + { + perror(*argv); + goto clients_fail; + } + for (j = 0; j < n; j++) + { + interception_condition_t* cond = value->interception_conditions + j; + size_t m = strlen(state_buf_) + 1; + if ((cond->condition = malloc(m * sizeof(char))) == NULL) + { + perror(*argv); + goto clients_fail; + } + memcpy(cond->condition, state_buf_, m); + buf_next(state_buf_, char, m); + buf_get_next(state_buf_, size_t, cond->header_hash); + buf_get_next(state_buf_, int64_t, cond->priority); + buf_get_next(state_buf_, int, cond->modifying); + seek += m * sizeof(char) + sizeof(size_t) + sizeof(int64_t) + sizeof(int); + } /* Unmarshal the message. */ if (mds_message_unmarshal(&(value->message), state_buf_)) { perror(*argv); mds_message_destroy(&(value->message)); - free(value); - buf_prev(state_buf_, uint64_t, 1); - buf_prev(state_buf_, int, 2); - buf_prev(state_buf_, size_t, 3); goto clients_fail; } state_buf_ += msg_size / sizeof(char); @@ -1035,15 +1118,35 @@ int unmarshal_server(int fd) continue; clients_fail: with_error = 1; + if (value != NULL) + { + if (value->interception_conditions != NULL) + { + for (j = 0; j < n; j++) + if (value->interception_conditions[j].condition != NULL) + free(value->interception_conditions[j].condition); + free(value->interception_conditions); + } + free(value); + } + state_buf_ -= seek / sizeof(char); for (; i < list_elements; i++) { /* There is not need to close the sockets, it is done by the caller because there are conditions where we cannot get here anyway. */ msg_size = ((size_t*)state_buf_)[1]; - buf_next(state_buf_, size_t, 3); + buf_next(state_buf_, size_t, 4); buf_next(state_buf_, int, 2); buf_next(state_buf_, uint64_t, 1); + buf_get_next(state_buf_, size_t, n); + for (j = 0; j < n; j++) + { + buf_next(state_buf_, char, strlen(state_buf_) + 1); + buf_next(state_buf_, size_t, 1); + buf_next(state_buf_, int64_t, 1); + buf_next(state_buf_, int, 1); + } state_buf_ += msg_size / sizeof(char); } break; diff --git a/src/mds-server.h b/src/mds-server.h index b751125..c8a4ac2 100644 --- a/src/mds-server.h +++ b/src/mds-server.h @@ -26,6 +26,36 @@ #include <stdint.h> + +/** + * A condition for a message being intercepted by a client + */ +typedef struct interception_condition +{ + /** + * The header of messages to intercept, optionally with a value, + * empty (most not be NULL) for all messages. + */ + char* condition; + + /** + * The hash of the header of messages to intercept + */ + size_t header_hash; + + /** + * The interception priority + */ + int64_t priority; + + /** + * Whether the messages may get modified by the client + */ + int modifying; + +} interception_condition_t; + + /** * Client information structure */ @@ -62,10 +92,22 @@ typedef struct client uint64_t id; /** - * Mutex for sending data + * Mutex for sending data and other + * actions that only affacts this client */ pthread_mutex_t mutex; + /** + * The messages interception conditions conditions + * for the client + */ + interception_condition_t* interception_conditions; + + /** + * The number of interception conditions + */ + size_t interception_conditions_count; + } client_t; |