aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/mds-server/client.c6
-rw-r--r--src/mds-server/client.h20
-rw-r--r--src/mds-server/mds-server.c129
3 files changed, 134 insertions, 21 deletions
diff --git a/src/mds-server/client.c b/src/mds-server/client.c
index 6c46c90..3c8fb2d 100644
--- a/src/mds-server/client.c
+++ b/src/mds-server/client.c
@@ -57,6 +57,10 @@ void client_destroy(client_t* restrict this)
mds_message_destroy(this->modify_message);
free(this->modify_message);
}
+ if (this->modify_mutex_created)
+ pthread_mutex_destroy(&(this->modify_mutex));
+ if (this->modify_cond_created)
+ pthread_cond_destroy(&(this->modify_cond));
free(this);
}
@@ -132,6 +136,8 @@ size_t client_unmarshal(client_t* restrict this, char* restrict data)
this->multicasts = NULL;
this->send_pending = NULL;
this->mutex_created = 0;
+ this->modify_mutex_created = 0;
+ this->modify_cond_created = 0;
this->multicasts_count = 0;
buf_get_next(data, ssize_t, this->list_entry);
buf_get_next(data, int, this->socket_fd);
diff --git a/src/mds-server/client.h b/src/mds-server/client.h
index 972ff84..2aeb813 100644
--- a/src/mds-server/client.h
+++ b/src/mds-server/client.h
@@ -112,6 +112,26 @@ typedef struct client
*/
struct mds_message* modify_message;
+ /**
+ * Mutex for `modify_message`
+ */
+ pthread_mutex_t modify_mutex;
+
+ /**
+ * Condidition for `modify_message`
+ */
+ pthread_cond_t modify_cond;
+
+ /**
+ * Whether `modify_mutex` has been initialised
+ */
+ int modify_mutex_created;
+
+ /**
+ * Whether `modify_cond` has been initialised
+ */
+ int modify_cond_created;
+
} client_t;
diff --git a/src/mds-server/mds-server.c b/src/mds-server/mds-server.c
index 8fbcd44..0c8ba92 100644
--- a/src/mds-server/mds-server.c
+++ b/src/mds-server/mds-server.c
@@ -493,6 +493,8 @@ void* slave_loop(void* data)
information->send_pending = NULL;
information->modify_message = NULL;
information->mutex_created = 0;
+ information->modify_mutex_created = 0;
+ information->modify_cond_created = 0;
/* Add to list of clients. */
pthread_mutex_lock(&slave_mutex);
@@ -540,6 +542,20 @@ void* slave_loop(void* data)
}
information->mutex_created = 1;
+ /* Create mutex and codition for multicast interception replies. */
+ if ((errno = pthread_mutex_init(&(information->modify_mutex), NULL)) != 0)
+ {
+ perror(*argv);
+ goto fail;
+ }
+ information->modify_mutex_created = 1;
+ if ((errno = pthread_cond_init(&(information->modify_cond), NULL)) != 0)
+ {
+ perror(*argv);
+ goto fail;
+ }
+ information->modify_cond_created = 1;
+
/* Make the server update without all slaves dying on SIGUSR1. */
if (xsigaction(SIGUSR1, sigusr1_trap) < 0)
@@ -683,7 +699,7 @@ void* slave_loop(void* data)
*
* @param client The client has sent a message
*/
-void message_received(client_t* client) /* TODO Modify ID */
+void message_received(client_t* client)
{
mds_message_t message = client->message;
int assign_id = 0;
@@ -692,6 +708,7 @@ void message_received(client_t* client) /* TODO Modify ID */
int64_t priority = 0;
int stop = 0;
const char* message_id = NULL;
+ uint64_t modify_id = 0;
size_t i, n;
char* msgbuf;
@@ -706,9 +723,71 @@ void message_received(client_t* client) /* TODO Modify ID */
else if (strequals(h, "Stop: yes")) stop = 1;
else if (startswith(h, "Message ID: ")) message_id = strstr(h, ": ") + 2;
else if (startswith(h, "Priority: ")) priority = atoll(strstr(h, ": ") + 2);
+ else if (startswith(h, "Modify ID: ")) modify_id = (uint64_t)atoll(strstr(h, ": ") + 2);
+ }
+
+
+ /* Notify waiting client about a received message modification. */
+ if (modifying != 0)
+ {
+ /* pthread_cond_timedwait is required to handle re-exec because
+ pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */
+ struct timespec timeout =
+ {
+ .tv_sec = 1,
+ .tv_nsec = 0
+ };
+ size_t address;
+ client_t* recipient;
+ mds_message_t* multicast;
+
+ with_mutex(modify_mutex,
+ while (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0)
+ {
+ /* TODO support re-exec */
+ pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout);
+ }
+ address = hash_table_get(&modify_map, (size_t)modify_id);
+ recipient = (client_t*)(void*)address;
+ if (xmalloc(multicast = recipient->modify_message, 1, mds_message_t))
+ goto fail;
+ multicast->headers = NULL;
+ multicast->header_count = 0;
+ multicast->payload = NULL;
+ multicast->payload_size = 0;
+ multicast->payload_ptr = 0;
+ multicast->buffer = NULL;
+ multicast->buffer_size = 0;
+ multicast->buffer_ptr = 0;
+ multicast->stage = 0;
+ if (xmalloc(multicast->payload, message.payload_size, char))
+ goto fail;
+ memcpy(multicast->payload, message.payload, message.payload_size * sizeof(char));
+ if (xmalloc(multicast->headers, message.header_count, char*))
+ goto fail;
+ for (i = 0; i < message.header_count; i++, multicast->header_count++)
+ {
+ multicast->headers[i] = strdup(message.headers[i]);
+ if (multicast->headers[i] == NULL)
+ goto fail;
+ }
+ goto done;
+ fail:
+ if (multicast != NULL)
+ {
+ mds_message_destroy(multicast);
+ free(multicast);
+ recipient->modify_message = NULL;
+ }
+ done:
+ );
+ with_mutex(client->modify_mutex, pthread_cond_signal(&(client->modify_cond)););
+
+ /* Do nothing more, not not even multicast this message. */
+ return;
}
- /* Ignore message if not labelled with a message ID. */
+
if (message_id == NULL)
{
eprint("received message with a message ID, ignoring.");
@@ -1292,6 +1371,11 @@ void multicast_message(multicast_t* multicast)
hash_table_put(&modify_map, (size_t)modify_id, (size_t)(void*)client);
pthread_cond_signal(&slave_cond);
}
+ }
+ );
+ with_mutex(client->modify_mutex,
+ if (client->modify_message == NULL)
+ {
for (;;)
{
pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout);
@@ -1306,29 +1390,32 @@ void multicast_message(multicast_t* multicast)
return;
/* Act upon the reply. */
- mod = client->modify_message;
- for (i = 0; i < mod->header_count; i++)
- if (!strcmp(mod->headers[i], "Modify: yes"))
- {
- modifying = 1;
- break;
- }
- if (modifying)
+ if (client->modify_message != NULL)
{
- old_buf = multicast->message;
- n = mod->payload_size;
- multicast->message = realloc(old_buf, (multicast->message_prefix + n) * sizeof(char));
- if (multicast->message == NULL)
+ mod = client->modify_message;
+ for (i = 0; i < mod->header_count; i++)
+ if (!strcmp(mod->headers[i], "Modify: yes"))
+ {
+ modifying = 1;
+ break;
+ }
+ if (modifying)
{
- perror(*argv);
- multicast->message = old_buf;
+ old_buf = multicast->message;
+ n = mod->payload_size;
+ multicast->message = realloc(old_buf, (multicast->message_prefix + n) * sizeof(char));
+ if (multicast->message == NULL)
+ {
+ perror(*argv);
+ multicast->message = old_buf;
+ }
+ else
+ memcpy(multicast->message + multicast->message_prefix, mod->payload, n);
}
- else
- memcpy(multicast->message + multicast->message_prefix, mod->payload, n);
+
+ /* Free the reply. */
+ mds_message_destroy(client->modify_message);
}
-
- /* Free the reply. */
- mds_message_destroy(client->modify_message);
}
/* Reset how much of the message has been sent before we continue with next recipient. */