aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMattias Andrée <maandree@operamail.com>2014-05-07 03:37:58 +0200
committerMattias Andrée <maandree@operamail.com>2014-05-07 03:37:58 +0200
commit86c69dcc64df4e1b0c94b8e3ed4b2ce99d0cc7e4 (patch)
tree6b48a05068a4f0288451fde3b05513afbb80a916
parentm doc (diff)
downloadmds-86c69dcc64df4e1b0c94b8e3ed4b2ce99d0cc7e4.tar.gz
mds-86c69dcc64df4e1b0c94b8e3ed4b2ce99d0cc7e4.tar.bz2
mds-86c69dcc64df4e1b0c94b8e3ed4b2ce99d0cc7e4.tar.xz
I should really write better commit messages
Signed-off-by: Mattias Andrée <maandree@operamail.com>
-rw-r--r--src/mds-server.c183
-rw-r--r--src/mds-server.h44
2 files changed, 186 insertions, 41 deletions
diff --git a/src/mds-server.c b/src/mds-server.c
index 3ddfaa9..d90ce6a 100644
--- a/src/mds-server.c
+++ b/src/mds-server.c
@@ -445,6 +445,7 @@ void* slave_loop(void* data)
ssize_t entry = LINKED_LIST_UNUSED;
size_t information_address = fd_table_get(&client_map, (size_t)socket_fd);
client_t* information = (client_t*)(void*)information_address;
+ int mutex_created = 0;
size_t tmp;
int r;
@@ -459,6 +460,9 @@ void* slave_loop(void* data)
goto fail;
}
+ /* NULL-out pointers. */
+ information->interception_conditions = NULL;
+
/* Add to list of clients. */
pthread_mutex_lock(&slave_mutex);
entry = linked_list_insert_end(&client_list, (size_t)(void*)information);
@@ -483,6 +487,7 @@ void* slave_loop(void* data)
information->socket_fd = socket_fd;
information->open = 1;
information->id = 0;
+ information->interception_conditions_count = 0;
if (mds_message_initialise(&(information->message)))
{
perror(*argv);
@@ -493,8 +498,10 @@ void* slave_loop(void* data)
/* Store the thread so that other threads can kill it. */
information->thread = pthread_self();
- /* Create mutex to make sure two thread to not try to send messages concurrently. */
+ /* Create mutex to make sure two thread to not try to send
+ messages concurrently, and other slave local actions. */
pthread_mutex_init(&(information->mutex), NULL);
+ mutex_created = 1;
/* Make the server update without all slaves dying on SIGUSR1. */
@@ -552,13 +559,23 @@ void* slave_loop(void* data)
close(socket_fd);
if (information != NULL)
{
+ if (information->interception_conditions != NULL)
+ {
+ size_t i;
+ for (i = 0; i < information->interception_conditions_count; i++)
+ if (information->interception_conditions[i].condition != NULL)
+ free(information->interception_conditions[i].condition);
+ free(information->interception_conditions);
+ }
+ if (mutex_created)
+ pthread_mutex_destroy(&(information->mutex));
mds_message_destroy(&(information->message));
free(information);
}
- fd_table_remove(&client_map, socket_fd);
/* Unlist client and decrease the slave count. */
with_mutex(slave_mutex,
+ fd_table_remove(&client_map, socket_fd);
if (entry != LINKED_LIST_UNUSED)
linked_list_remove(&client_list, entry);
running_slaves--;
@@ -590,6 +607,10 @@ void message_received(client_t* client) /* TODO */
{
mds_message_t message = client->message;
int assign_id = 0;
+ int modifying = 0;
+ int intercept = 0;
+ int64_t priority = 0;
+ int stop = 0;
const char* message_id = NULL;
size_t i, n;
char* msgbuf;
@@ -597,41 +618,60 @@ void message_received(client_t* client) /* TODO */
/* Parser headers. */
for (i = 0; i < message.header_count; i++)
{
- const char* header = message.headers[i];
- if (strequals(header, "Command: assign-id"))
- assign_id = 1;
- else if (startswith(header, "Message ID: "))
- message_id = header + strlen("Message ID: ");
+ const char* h = message.headers[i];
+ if (strequals(h, "Command: assign-id")) assign_id = 1;
+ else if (strequals(h, "Command: intercept")) intercept = 1;
+ else if (strequals(h, "Modifying: yes")) modifying = 1;
+ else if (strequals(h, "Stop: yes")) stop = 1;
+ else if (startswith(h, "Message ID: ")) message_id = strstr(h, ": ") + 2;
+ else if (startswith(h, "Priority: ")) priority = atoll(strstr(h, ": ") + 2);
}
- /* Assign ID or reply with current ID. */
- if (assign_id)
+ /* Ignore message if not labeled with a message ID. */
+ if (message_id == NULL)
+ {
+ eprint("received message with a message ID, ignoring.");
+ return;
+ }
+
+ /* Assign ID if not already assigned. */
+ if (assign_id && (client->id == 0))
+ {
+ intercept |= 2;
+ with_mutex(slave_mutex,
+ client->id = next_id++;
+ if (next_id == 0)
+ {
+ eprint("this is impossible, ID counter has overflowed.");
+ /* If the program ran for a millennium it would
+ take c:a 585 assignments per nanosecond. This
+ cannot possibly happen. (It would require serious
+ dedication by generations of ponies (or just an alicorn)
+ to maintain the process and transfer it new hardware.) */
+ abort();
+ }
+ );
+ }
+
+ /* Make the client listen for messages addressed to it. */
+ if (intercept)
{
- /* Assign ID if not already assigned. */
- if (client->id == 0)
+ pthread_mutex_lock(&(client->mutex));
+ if ((intercept & 1)) /* from payload */
{
- /* Do the assignment. */
- with_mutex(slave_mutex,
- client->id = next_id++;
- if (next_id == 0)
- {
- eprint("this is impossible, ID counter has overflowed.");
- /* If the program ran for a millennium it would
- take c:a 585 assignments per nanosecond. This
- cannot possibly happen. (It would require serious
- dedication by generations of ponies (or just an alicorn)
- to maintain the process and transfer it new hardware.) */
- abort();
- }
- );
-
- /* TODO: Make the client listen for messages addressed to it.
- To: $(assign_id)
- Priority: 0
- Modifying: no
- */
+ /* TODO */
}
-
+ if ((intercept & 2)) /* "To: $(client->id)" */
+ {
+ /* TODO */
+ }
+ pthread_mutex_unlock(&(client->mutex));
+ }
+
+
+ /* Send asigned ID. */
+ if (assign_id)
+ {
/* Construct response. */
n = 2 * 10 + strlen(message_id) + 1;
n += strlen("ID assignment: :\nIn response to: \n\n");
@@ -647,7 +687,7 @@ void message_received(client_t* client) /* TODO */
"\n",
(uint32_t)(client->id >> 32),
(uint32_t)(client->id >> 0),
- message_id);
+ message_id == NULL ? "" : message_id);
n = strlen(msgbuf);
/* Send message. */
@@ -778,20 +818,29 @@ int marshal_server(int fd)
size_t state_n;
ssize_t wrote;
ssize_t node;
+ size_t j, n;
/* Calculate the grand size of all messages and their buffers. */
for (node = client_list.edge;; list_elements++)
{
mds_message_t message;
+ client_t* value;
if ((node = client_list.next[node]) == client_list.edge)
break;
- message = ((client_t*)(void*)(client_list.values[node]))->message;
+
+ value = (client_t*)(void*)(client_list.values[node]);
+ n = value->interception_conditions_count;
+ message = value->message;
msg_size += mds_message_marshal_size(&message, 1);
+ msg_size += n * (sizeof(size_t) + sizeof(int64_t) + sizeof(int));
+
+ for (j = 0; j < n; j++)
+ msg_size += (strlen(value->interception_conditions[j].condition) + 1) * sizeof(char);
}
/* Calculate the grand size of all client information. */
- state_n = sizeof(ssize_t) + 1 * sizeof(int) + 2 * sizeof(size_t);
+ state_n = 5 + sizeof(size_t) + 2 * sizeof(int) + 1 * sizeof(uint64_t);
state_n *= list_elements;
state_n += msg_size;
@@ -835,6 +884,17 @@ int marshal_server(int fd)
buf_set_next(state_buf_, int, value->socket_fd);
buf_set_next(state_buf_, int, value->open);
buf_set_next(state_buf_, uint64_t, value->id);
+ /* Marshal interception conditions. */
+ buf_set_next(state_buf_, size_t, n = value->interception_conditions_count);
+ for (j = 0; j < n; j++)
+ {
+ interception_condition_t cond = value->interception_conditions[j];
+ memcpy(state_buf_, cond.condition, strlen(cond.condition) + 1);
+ buf_next(state_buf_, char, strlen(cond.condition) + 1);
+ buf_set_next(state_buf_, size_t, cond.header_hash);
+ buf_set_next(state_buf_, int64_t, cond.priority);
+ buf_set_next(state_buf_, int, cond.modifying);
+ }
/* Marshal the message. */
mds_message_marshal(&(value->message), state_buf_, 1);
state_buf_ += msg_size / sizeof(char);
@@ -994,6 +1054,8 @@ int unmarshal_server(int fd)
/* Unmarshal the clients. */
for (i = 0; i < list_elements; i++)
{
+ size_t seek = 0;
+ size_t j = 0, n = 0;
size_t value_address;
size_t msg_size;
client_t* value;
@@ -1014,15 +1076,36 @@ int unmarshal_server(int fd)
buf_get_next(state_buf_, int, value->socket_fd);
buf_get_next(state_buf_, int, value->open);
buf_set_next(state_buf_, uint64_t, value->id);
+ /* Unmarshal interception conditions. */
+ buf_get_next(state_buf_, size_t, value->interception_conditions_count = n);
+ seek = 5 * sizeof(size_t) + 2 * sizeof(int) + 1 * sizeof(uint64_t);
+ value->interception_conditions = malloc(n * sizeof(interception_condition_t));
+ if (value->interception_conditions == NULL)
+ {
+ perror(*argv);
+ goto clients_fail;
+ }
+ for (j = 0; j < n; j++)
+ {
+ interception_condition_t* cond = value->interception_conditions + j;
+ size_t m = strlen(state_buf_) + 1;
+ if ((cond->condition = malloc(m * sizeof(char))) == NULL)
+ {
+ perror(*argv);
+ goto clients_fail;
+ }
+ memcpy(cond->condition, state_buf_, m);
+ buf_next(state_buf_, char, m);
+ buf_get_next(state_buf_, size_t, cond->header_hash);
+ buf_get_next(state_buf_, int64_t, cond->priority);
+ buf_get_next(state_buf_, int, cond->modifying);
+ seek += m * sizeof(char) + sizeof(size_t) + sizeof(int64_t) + sizeof(int);
+ }
/* Unmarshal the message. */
if (mds_message_unmarshal(&(value->message), state_buf_))
{
perror(*argv);
mds_message_destroy(&(value->message));
- free(value);
- buf_prev(state_buf_, uint64_t, 1);
- buf_prev(state_buf_, int, 2);
- buf_prev(state_buf_, size_t, 3);
goto clients_fail;
}
state_buf_ += msg_size / sizeof(char);
@@ -1035,15 +1118,35 @@ int unmarshal_server(int fd)
continue;
clients_fail:
with_error = 1;
+ if (value != NULL)
+ {
+ if (value->interception_conditions != NULL)
+ {
+ for (j = 0; j < n; j++)
+ if (value->interception_conditions[j].condition != NULL)
+ free(value->interception_conditions[j].condition);
+ free(value->interception_conditions);
+ }
+ free(value);
+ }
+ state_buf_ -= seek / sizeof(char);
for (; i < list_elements; i++)
{
/* There is not need to close the sockets, it is done by
the caller because there are conditions where we cannot
get here anyway. */
msg_size = ((size_t*)state_buf_)[1];
- buf_next(state_buf_, size_t, 3);
+ buf_next(state_buf_, size_t, 4);
buf_next(state_buf_, int, 2);
buf_next(state_buf_, uint64_t, 1);
+ buf_get_next(state_buf_, size_t, n);
+ for (j = 0; j < n; j++)
+ {
+ buf_next(state_buf_, char, strlen(state_buf_) + 1);
+ buf_next(state_buf_, size_t, 1);
+ buf_next(state_buf_, int64_t, 1);
+ buf_next(state_buf_, int, 1);
+ }
state_buf_ += msg_size / sizeof(char);
}
break;
diff --git a/src/mds-server.h b/src/mds-server.h
index b751125..c8a4ac2 100644
--- a/src/mds-server.h
+++ b/src/mds-server.h
@@ -26,6 +26,36 @@
#include <stdint.h>
+
+/**
+ * A condition for a message being intercepted by a client
+ */
+typedef struct interception_condition
+{
+ /**
+ * The header of messages to intercept, optionally with a value,
+ * empty (most not be NULL) for all messages.
+ */
+ char* condition;
+
+ /**
+ * The hash of the header of messages to intercept
+ */
+ size_t header_hash;
+
+ /**
+ * The interception priority
+ */
+ int64_t priority;
+
+ /**
+ * Whether the messages may get modified by the client
+ */
+ int modifying;
+
+} interception_condition_t;
+
+
/**
* Client information structure
*/
@@ -62,10 +92,22 @@ typedef struct client
uint64_t id;
/**
- * Mutex for sending data
+ * Mutex for sending data and other
+ * actions that only affacts this client
*/
pthread_mutex_t mutex;
+ /**
+ * The messages interception conditions conditions
+ * for the client
+ */
+ interception_condition_t* interception_conditions;
+
+ /**
+ * The number of interception conditions
+ */
+ size_t interception_conditions_count;
+
} client_t;