aboutsummaryrefslogtreecommitdiffstats
path: root/src/mds-server/mds-server.c
diff options
context:
space:
mode:
authorMattias Andrée <maandree@operamail.com>2014-05-11 07:02:16 +0200
committerMattias Andrée <maandree@operamail.com>2014-05-11 07:02:16 +0200
commitbc190c5ac25155e0c4a53f8ea74e16082de6229d (patch)
tree4d1e1f509e3b9f506dcdea8191a18ce2e1835d4e /src/mds-server/mds-server.c
parentm + release all resources before re-execing (diff)
downloadmds-bc190c5ac25155e0c4a53f8ea74e16082de6229d.tar.gz
mds-bc190c5ac25155e0c4a53f8ea74e16082de6229d.tar.bz2
mds-bc190c5ac25155e0c4a53f8ea74e16082de6229d.tar.xz
a much of multicasting stuff
Signed-off-by: Mattias Andrée <maandree@operamail.com>
Diffstat (limited to 'src/mds-server/mds-server.c')
-rw-r--r--src/mds-server/mds-server.c186
1 files changed, 156 insertions, 30 deletions
diff --git a/src/mds-server/mds-server.c b/src/mds-server/mds-server.c
index 00f8ceb..bc3b20f 100644
--- a/src/mds-server/mds-server.c
+++ b/src/mds-server/mds-server.c
@@ -16,9 +16,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "mds-server.h"
+
#include "interception_condition.h"
#include "client.h"
#include "queued_interception.h"
+#include "multicast.h"
#include <libmdsserver/config.h>
#include <libmdsserver/linked-list.h>
@@ -108,6 +110,11 @@ static linked_list_t client_list;
*/
static uint64_t next_id = 1;
+/**
+ * The next free ID for a message modifications
+ */
+static uint64_t next_modify_id = 1;
+
/**
@@ -457,12 +464,13 @@ void* slave_loop(void* data)
/* Add client to table. */
tmp = fd_table_put(&client_map, socket_fd, (size_t)(void*)information);
- pthread_mutex_unlock(&slave_mutex);
if ((tmp == 0) && errno)
{
perror(*argv);
+ pthread_mutex_unlock(&slave_mutex);
goto fail;
}
+ pthread_mutex_unlock(&slave_mutex);
/* Fill information table. */
information->list_entry = entry;
@@ -503,6 +511,27 @@ void* slave_loop(void* data)
if (information->open)
while (reexecing == 0)
{
+ /* Send queued multicast messages. */
+ if (information->multicasts_count > 0)
+ {
+ multicast_t multicast;
+ with_mutex(information->mutex,
+ if (information->multicasts_count > 0)
+ {
+ size_t c = information->multicasts_count -= 1;
+ multicast = information->multicasts[0];
+ memmove(information->multicasts, information->multicasts + 1, c);
+ if (c == 0)
+ {
+ free(information->multicasts);
+ information->multicasts = NULL;
+ }
+ }
+ );
+ multicast_message(&multicast);
+ multicast_destroy(&multicast);
+ }
+
/* Send queued messages. */
if (information->send_pending_size > 0)
{
@@ -572,7 +601,7 @@ void* slave_loop(void* data)
(uint32_t)(information->id >> 32),
(uint32_t)(information->id >> 0));
n = strlen(msgbuf) + 1;
- multicast_message(msgbuf, n);
+ queue_message_multicast(msgbuf, n, information);
fail: /* The loop does break, this done on success as well. */
@@ -733,13 +762,14 @@ void message_received(client_t* client)
return;
}
mds_message_marshal(&message, msgbuf, 0);
- multicast_message(msgbuf, n / sizeof(char)); /* TODO support re-exec */
- free(msgbuf);
+ queue_message_multicast(msgbuf, n / sizeof(char), client);
/* Send asigned ID. */
if (assign_id)
{
+ char* msgbuf_;
+
/* Construct response. */
n = 2 * 10 + strlen(message_id) + 1;
n += strlen("ID assignment: :\nIn response to: \n\n");
@@ -758,7 +788,14 @@ void message_received(client_t* client)
n = strlen(msgbuf);
/* Multicast the reply. */
- multicast_message(msgbuf, n); /* TODO support re-exec */
+ msgbuf_ = strdup(msgbuf);
+ if (msgbuf_ == NULL)
+ {
+ perror(*argv);
+ free(msgbuf);
+ return;
+ }
+ queue_message_multicast(msgbuf, n, client);
/* Queue message to be sent when this function returns.
This done to simplify `multicast_message` for re-exec. */
@@ -925,12 +962,13 @@ static int cmp_queued_interception(const void* a, const void* b)
/**
- * Multicast a message
+ * Queue a message for multicasting
*
* @param message The message
* @param length The length of the message
+ * @param sender The original sender of the message
*/
-void multicast_message(char* message, size_t length)
+void queue_message_multicast(char* message, size_t length, client_t* sender)
{
size_t header_count = 0;
size_t n = length - 1;
@@ -939,8 +977,12 @@ void multicast_message(char* message, size_t length)
char** header_values = NULL;
queued_interception_t* interceptions = NULL;
size_t interceptions_count = 0;
+ multicast_t* multicast = NULL;
size_t i;
ssize_t node;
+ uint64_t modify_id;
+ char modify_id_header[13 + 3 * sizeof(uint64_t)];
+ char* old_buf;
/* Count the number of headers. */
for (i = 0; i < n; i++)
@@ -954,6 +996,10 @@ void multicast_message(char* message, size_t length)
if (header_count == 0)
return; /* Invalid message. */
+ /* Allocate multicast message. */
+ if (xmalloc(multicast, 1, sizeof(multicast_t))) goto fail;
+ multicast_initialise(multicast);
+
/* Allocate header lists. */
if (xmalloc(hashes, header_count, size_t)) goto fail;
if (xmalloc(headers, header_count, char*)) goto fail;
@@ -1008,7 +1054,6 @@ void multicast_message(char* message, size_t length)
/* Look for a matching condition. */
n = client->interception_conditions_count;
- errno = 0;
if (client->open)
{
with_mutex(mutex,
@@ -1054,48 +1099,127 @@ void multicast_message(char* message, size_t length)
/* Sort interceptors. */
qsort(interceptions, interceptions_count, sizeof(queued_interception_t), cmp_queued_interception);
- /* Send message to interceptors. */
- for (i = 0; i < interceptions_count; i++)
+ /* Add prefix to message with ‘Modify ID’ header. */
+ with_mutex(slave_mutex,
+ modify_id = next_modify_id++;
+ if (next_modify_id == 0)
+ next_modify_id = 1;
+ );
+ xsnprintf(modify_id_header, "Modify ID: %" PRIu64 "\n", modify_id);
+ n = strlen(modify_id_header);
+ old_buf = message;
+ if ((message = realloc(message, (n + length) * sizeof(char))) == NULL)
{
- queued_interception_t client_ = interceptions[i];
+ message = old_buf;
+ goto fail;
+ }
+ memmove(message + n, message, (n + length) * sizeof(char));
+ memcpy(message, modify_id_header, n * sizeof(char));
+
+ /* Store information. */
+ multicast->interceptions = interceptions;
+ multicast->interceptions_count = interceptions_count;
+ multicast->message = message;
+ multicast->message_length = length;
+ multicast->message_prefix = n;
+ message = NULL;
+
+ /* Queue message multicasting. */
+ with_mutex(sender->mutex,
+ if (sender->multicasts_count == 1)
+ {
+ if (xmalloc(sender->multicasts, 1, multicast_t))
+ goto fail_queue;
+ }
+ else
+ {
+ multicast_t* new_buf;
+ new_buf = realloc(sender->multicasts, (sender->multicasts_count + 1) * sizeof(multicast_t));
+ if (new_buf == NULL)
+ goto fail_queue;
+ sender->multicasts = new_buf;
+ }
+ sender->multicasts[sender->multicasts_count++] = *multicast;
+ multicast = NULL;
+ fail_queue:
+ );
+
+ errno = 0;
+ fail: /* This is done before this function returns even if there was no error. */
+ if (errno != 0)
+ perror(*argv);
+ /* Release resources. */
+ xfree(headers, header_count);
+ xfree(header_values, header_count);
+ free(hashes);
+ free(message);
+ if (multicast != NULL)
+ multicast_destroy(multicast);
+ free(multicast);
+}
+
+
+/**
+ * Multicast a message
+ *
+ * @param multicast The multicast message
+ */
+void multicast_message(multicast_t* multicast)
+{
+ for (; multicast->interceptions_ptr < multicast->interceptions_count; multicast->interceptions_ptr++)
+ {
+ queued_interception_t client_ = multicast->interceptions[multicast->interceptions_ptr];
client_t* client = client_.client;
- char* msg = message;
+ char* msg = multicast->message + multicast->message_ptr;
+ size_t n = multicast->message_length - multicast->message_ptr;
size_t sent;
- n = length;
+
+ /* After unmarshalling at re-exec, client will be NULL and must be mapped from its socket. */
+ if (client == NULL)
+ {
+ size_t address;
+ with_mutex(slave_mutex, address = fd_table_get(&client_map, client_.socket_fd););
+ client_.client = client = (client_t*)(void*)address;
+ }
+
+ /* Skip Modify ID header if the interceptors will not perform a modification. */
+ if ((client_.modifying == 0) && (multicast->message_ptr == 0))
+ {
+ n -= multicast->message_prefix;
+ multicast->message_ptr += multicast->message_prefix;
+ }
/* Send the message. */
with_mutex(client->mutex,
+ errno = 0;
if (client->open)
while (n > 0)
{
- sent = send_message(client->socket_fd, msg, n);
- if ((sent < n) && (errno != EINTR)) /* TODO Support reexec */
+ sent = send_message(client->socket_fd, msg + multicast->message_ptr, n);
+ if (sent < n)
{
- perror(*argv);
+ if (errno != EINTR)
+ perror(*argv);
break;
}
n -= sent;
- msg += sent;
+ multicast->message_ptr += sent;
}
);
+ /* Stop if we are re-exec:ing. */
+ if ((n > 0) && (errno == EINTR))
+ return;
+
/* Wait for a reply. */
- if ((n > 0) && client_.modifying)
+ if ((n == 0) && client_.modifying)
{
/* TODO */
}
+
+ /* Reset how much of the message has been sent before we continue with next recipient. */
+ multicast->message_ptr = 0;
}
-
-
- errno = 0;
- fail: /* This is done before this function returns even if there was no error. */
- if (errno != 0)
- perror(*argv);
- /* Release resources. */
- xfree(headers, header_count);
- xfree(header_values, header_count);
- free(interceptions);
- free(hashes);
}
@@ -1226,7 +1350,7 @@ int marshal_server(int fd)
}
/* Add the size of the rest of the program's state. */
- state_n += sizeof(int) + sizeof(sig_atomic_t) + sizeof(uint64_t) + 2 * sizeof(size_t);
+ state_n += sizeof(int) + sizeof(sig_atomic_t) + 2 * sizeof(uint64_t) + 2 * sizeof(size_t);
state_n += list_elements * sizeof(size_t) + list_size + map_size;
/* Allocate a buffer for all data except the client list and the client map. */
@@ -1241,6 +1365,7 @@ int marshal_server(int fd)
/* Marshal the miscellaneous state data. */
buf_set_next(state_buf_, sig_atomic_t, running);
buf_set_next(state_buf_, uint64_t, next_id);
+ buf_set_next(state_buf_, uint64_t, next_modify_id);
/* Tell the program how large the marshalled client list is and how any clients are marshalled. */
buf_set_next(state_buf_, size_t, list_size);
@@ -1338,6 +1463,7 @@ int unmarshal_server(int fd)
/* Unmarshal the miscellaneous state data. */
buf_get_next(state_buf_, sig_atomic_t, running);
buf_get_next(state_buf_, uint64_t, next_id);
+ buf_get_next(state_buf_, uint64_t, next_modify_id);
/* Get the marshalled size of the client list and how any clients that are marshalled. */
buf_get_next(state_buf_, size_t, list_size);