aboutsummaryrefslogtreecommitdiffstats
path: root/src/mds-server
diff options
context:
space:
mode:
authorMattias Andrée <maandree@operamail.com>2014-05-11 19:14:57 +0200
committerMattias Andrée <maandree@operamail.com>2014-05-11 19:14:57 +0200
commitd8b997547139babdddadb2e401bb6d3e75ff9227 (patch)
treeb9ef4ead80e1410bb1d5fcc7b78360133a6e8ee9 /src/mds-server
parentderp (diff)
downloadmds-d8b997547139babdddadb2e401bb6d3e75ff9227.tar.gz
mds-d8b997547139babdddadb2e401bb6d3e75ff9227.tar.bz2
mds-d8b997547139babdddadb2e401bb6d3e75ff9227.tar.xz
handle interceptions, everything except receiving modifications
Signed-off-by: Mattias Andrée <maandree@operamail.com>
Diffstat (limited to 'src/mds-server')
-rw-r--r--src/mds-server/client.c26
-rw-r--r--src/mds-server/client.h5
-rw-r--r--src/mds-server/mds-server.c145
3 files changed, 156 insertions, 20 deletions
diff --git a/src/mds-server/client.c b/src/mds-server/client.c
index 0a8b107..6c46c90 100644
--- a/src/mds-server/client.c
+++ b/src/mds-server/client.c
@@ -52,6 +52,11 @@ void client_destroy(client_t* restrict this)
free(this->multicasts);
}
free(this->send_pending);
+ if (this->modify_message != NULL)
+ {
+ mds_message_destroy(this->modify_message);
+ free(this->modify_message);
+ }
free(this);
}
@@ -64,7 +69,7 @@ void client_destroy(client_t* restrict this)
*/
size_t client_marshal_size(const client_t* restrict this)
{
- size_t n = sizeof(ssize_t) + 2 * sizeof(int) + sizeof(uint64_t) + 4 * sizeof(size_t);
+ size_t n = sizeof(ssize_t) + 2 * sizeof(int) + sizeof(uint64_t) + 5 * sizeof(size_t);
size_t i;
n += mds_message_marshal_size(&(this->message), 1);
@@ -73,6 +78,7 @@ size_t client_marshal_size(const client_t* restrict this)
for (i = 0; i < this->multicasts_count; i++)
n += multicast_marshal_size(this->multicasts + i);
n += this->send_pending_size * sizeof(char);
+ n += this->modify_message == NULL ? 0 : mds_message_marshal_size(this->modify_message, 1);
return n;
}
@@ -105,6 +111,9 @@ size_t client_marshal(const client_t* restrict this, char* restrict data)
buf_set_next(data, size_t, this->send_pending_size);
if (this->send_pending_size > 0)
memcpy(data, this->send_pending, this->send_pending_size * sizeof(char));
+ data += this->send_pending_size;
+ if (this->modify_message != NULL)
+ mds_message_marshal(this->modify_message, data, 1);
return client_marshal_size(this);
}
@@ -163,7 +172,13 @@ size_t client_unmarshal(client_t* restrict this, char* restrict data)
if (xmalloc(this->send_pending, this->send_pending_size, char))
goto fail;
memcpy(this->send_pending, data, this->send_pending_size * sizeof(char));
+ data += this->send_pending_size;
}
+ buf_get_next(data, size_t, n);
+ if (n > 0)
+ mds_message_unmarshal(this->modify_message, data);
+ else
+ this->modify_message = NULL;
return rc;
fail:
@@ -175,6 +190,11 @@ size_t client_unmarshal(client_t* restrict this, char* restrict data)
multicast_destroy(this->multicasts + i);
free(this->multicasts);
free(this->send_pending);
+ if (this->modify_message != NULL)
+ {
+ mds_message_destroy(this->modify_message);
+ free(this->modify_message);
+ }
return 0;
}
@@ -186,7 +206,7 @@ size_t client_unmarshal(client_t* restrict this, char* restrict data)
*/
size_t client_unmarshal_skip(char* restrict data)
{
- size_t n, c, rc = sizeof(ssize_t) + 2 * sizeof(int) + sizeof(uint64_t) + 3 * sizeof(size_t);
+ size_t n, c, rc = sizeof(ssize_t) + 2 * sizeof(int) + sizeof(uint64_t) + 5 * sizeof(size_t);
buf_next(data, ssize_t, 1);
buf_next(data, int, 2);
buf_next(data, uint64_t, 1);
@@ -209,6 +229,8 @@ size_t client_unmarshal_skip(char* restrict data)
}
buf_get_next(data, size_t, n);
rc += n * sizeof(char);
+ buf_get_next(data, size_t, n);
+ rc += n * sizeof(char);
return rc;
}
diff --git a/src/mds-server/client.h b/src/mds-server/client.h
index da5d14f..972ff84 100644
--- a/src/mds-server/client.h
+++ b/src/mds-server/client.h
@@ -107,6 +107,11 @@ typedef struct client
*/
size_t send_pending_size;
+ /**
+ * Pending reply to the multicast interception
+ */
+ struct mds_message* modify_message;
+
} client_t;
diff --git a/src/mds-server/mds-server.c b/src/mds-server/mds-server.c
index 694197b..5e60e3d 100644
--- a/src/mds-server/mds-server.c
+++ b/src/mds-server/mds-server.c
@@ -48,6 +48,7 @@
#include <sys/types.h>
#include <dirent.h>
#include <inttypes.h>
+#include <time.h>
@@ -115,6 +116,21 @@ static uint64_t next_id = 1;
*/
static uint64_t next_modify_id = 1;
+/**
+ * Mutex for message modification
+ */
+static pthread_mutex_t modify_mutex;
+
+/**
+ * Condition for message modification
+ */
+static pthread_cond_t modify_cond;
+
+/**
+ * Map from modification ID to waiting client
+ */
+static hash_table_t modify_map;
+
/**
@@ -229,20 +245,28 @@ int main(int argc_, char** argv_)
}
+#define __free(I) \
+ if (I <= 0) fd_table_destroy(&client_map, NULL, NULL); \
+ if (I <= 1) linked_list_destroy(&client_list); \
+ if (I <= 2) pthread_mutex_destroy(&slave_mutex); \
+ if (I <= 3) pthread_cond_destroy(&slave_cond); \
+ if (I <= 4) pthread_mutex_destroy(&modify_mutex); \
+ if (I <= 5) pthread_cond_destroy(&modify_cond)
+
+
/* Create list and table of clients. */
if (reexec == 0)
{
if (fd_table_create(&client_map))
{
perror(*argv);
- fd_table_destroy(&client_map, NULL, NULL);
+ __free(0);
return 1;
}
if (linked_list_create(&client_list, 32))
{
perror(*argv);
- fd_table_destroy(&client_map, NULL, NULL);
- linked_list_destroy(&client_list);
+ __free(1);
return 1;
}
}
@@ -256,26 +280,41 @@ int main(int argc_, char** argv_)
if (xsigaction(SIGUSR1, sigusr1_trap) < 0)
{
perror(*argv);
- fd_table_destroy(&client_map, NULL, NULL);
- linked_list_destroy(&client_list);
+ __free(1);
return 1;
}
-
/* Create mutex and condition for slave counter. */
if ((errno = pthread_mutex_init(&slave_mutex, NULL)) != 0)
{
perror(*argv);
- fd_table_destroy(&client_map, NULL, NULL);
- linked_list_destroy(&client_list);
+ __free(1);
return 1;
}
if ((errno = pthread_cond_init(&slave_cond, NULL)) != 0)
{
perror(*argv);
- fd_table_destroy(&client_map, NULL, NULL);
- linked_list_destroy(&client_list);
- pthread_mutex_destroy(&slave_mutex);
+ __free(2);
+ return 1;
+ }
+
+ /* Create mutex, condition and map for message modification. */
+ if ((errno = pthread_mutex_init(&modify_mutex, NULL)) != 0)
+ {
+ perror(*argv);
+ __free(3);
+ return 1;
+ }
+ if ((errno = pthread_cond_init(&modify_cond, NULL)) != 0)
+ {
+ perror(*argv);
+ __free(4);
+ return 1;
+ }
+ if (hash_table_create(&modify_map))
+ {
+ perror(*argv);
+ __free(5);
return 1;
}
@@ -359,10 +398,9 @@ int main(int argc_, char** argv_)
/* Release resources. */
- fd_table_destroy(&client_map, NULL, NULL);
- linked_list_destroy(&client_list);
- pthread_mutex_destroy(&slave_mutex);
- pthread_cond_destroy(&slave_cond);
+ __free(9999);
+
+#undef __free
return 0;
@@ -382,6 +420,9 @@ int main(int argc_, char** argv_)
/* Release resources. */
pthread_mutex_destroy(&slave_mutex);
pthread_cond_destroy(&slave_cond);
+ pthread_mutex_destroy(&modify_mutex);
+ pthread_cond_destroy(&modify_cond);
+ hash_table_destroy(&modify_map, NULL, NULL);
/* Marshal the state of the server. */
xsnprintf(shm_path, SHM_PATH_PATTERN, (unsigned long int)pid);
@@ -450,6 +491,7 @@ void* slave_loop(void* data)
/* NULL-out pointers and initialisation markers. */
information->interception_conditions = NULL;
information->send_pending = NULL;
+ information->modify_message = NULL;
information->mutex_created = 0;
/* Add to list of clients. */
@@ -641,7 +683,7 @@ void* slave_loop(void* data)
*
* @param client The client has sent a message
*/
-void message_received(client_t* client)
+void message_received(client_t* client) /* TODO Modify ID */
{
mds_message_t message = client->message;
int assign_id = 0;
@@ -1166,6 +1208,16 @@ void queue_message_multicast(char* message, size_t length, client_t* sender)
*/
void multicast_message(multicast_t* multicast)
{
+ uint64_t modify_id = 0;
+ if (strstr(multicast->message, "Modify ID: ") == multicast->message)
+ {
+ char* value = multicast->message + strlen("Modify ID: ");
+ char* lf = strchr(value, '\n');
+ *lf = '\0';
+ modify_id = (uint64_t)atoll(value);
+ *lf = '\n';
+ }
+
for (; multicast->interceptions_ptr < multicast->interceptions_count; multicast->interceptions_ptr++)
{
queued_interception_t client_ = multicast->interceptions[multicast->interceptions_ptr];
@@ -1212,10 +1264,67 @@ void multicast_message(multicast_t* multicast)
if ((n > 0) && reexecing)
return;
- /* Wait for a reply. */
+ /* Wait for a reply and act upon it. */
if ((n == 0) && client_.modifying)
{
- /* TODO */
+ /* pthread_cond_timedwait is required to handle re-exec because
+ pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */
+ struct timespec timeout =
+ {
+ .tv_sec = 1,
+ .tv_nsec = 0
+ };
+ int modifying = 0;
+ char* old_buf;
+ size_t i;
+ mds_message_t* mod;
+
+ /* Wait for a reply. */
+ with_mutex(modify_mutex,
+ if (client->modify_message == NULL)
+ {
+ if (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0)
+ {
+ hash_table_put(&modify_map, (size_t)modify_id, (size_t)(void*)client);
+ pthread_cond_signal(&slave_cond);
+ }
+ for (;;)
+ {
+ pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout);
+ if ((client->modify_message != NULL) && reexecing)
+ break;
+ }
+ if (reexecing == 0)
+ hash_table_remove(&modify_map, (size_t)modify_id);
+ }
+ );
+ if (reexecing)
+ return;
+
+ /* Act upon the reply. */
+ mod = client->modify_message;
+ for (i = 0; i < mod->header_count; i++)
+ if (!strcmp(mod->headers[i], "Modify: yes"))
+ {
+ modifying = 1;
+ break;
+ }
+ if (modifying)
+ {
+ old_buf = multicast->message;
+ n = mod->payload_size;
+ multicast->message = realloc(old_buf, (multicast->message_prefix + n) * sizeof(char));
+ if (multicast->message == NULL)
+ {
+ perror(*argv);
+ multicast->message = old_buf;
+ }
+ else
+ memcpy(multicast->message + multicast->message_prefix, mod->payload, n);
+ }
+
+ /* Free the reply. */
+ mds_message_destroy(client->modify_message);
}
/* Reset how much of the message has been sent before we continue with next recipient. */