aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMattias Andrée <maandree@operamail.com>2014-05-11 07:02:16 +0200
committerMattias Andrée <maandree@operamail.com>2014-05-11 07:02:16 +0200
commitbc190c5ac25155e0c4a53f8ea74e16082de6229d (patch)
tree4d1e1f509e3b9f506dcdea8191a18ce2e1835d4e
parentm + release all resources before re-execing (diff)
downloadmds-bc190c5ac25155e0c4a53f8ea74e16082de6229d.tar.gz
mds-bc190c5ac25155e0c4a53f8ea74e16082de6229d.tar.bz2
mds-bc190c5ac25155e0c4a53f8ea74e16082de6229d.tar.xz
a much of multicasting stuff
Signed-off-by: Mattias Andrée <maandree@operamail.com>
-rw-r--r--Makefile15
-rw-r--r--src/libmdsserver/macros.h4
-rw-r--r--src/mds-server/client.c40
-rw-r--r--src/mds-server/client.h15
-rw-r--r--src/mds-server/mds-server.c186
-rw-r--r--src/mds-server/mds-server.h13
-rw-r--r--src/mds-server/multicast.c156
-rw-r--r--src/mds-server/multicast.h120
-rw-r--r--src/mds-server/queued_interception.c78
-rw-r--r--src/mds-server/queued_interception.h42
10 files changed, 623 insertions, 46 deletions
diff --git a/Makefile b/Makefile
index acea291..d891b85 100644
--- a/Makefile
+++ b/Makefile
@@ -69,31 +69,32 @@ LIBOBJ = linked-list hash-table fd-table mds-message util
all: bin/mds bin/mds-server/mds-server bin/libmdsserver.so
-MDS_SERVER_OBJ = mds-server/mds-server mds-server/interception_condition mds-server/client
+MDS_SERVER_OBJ = mds-server/mds-server mds-server/interception_condition mds-server/client \
+ mds-server/multicast mds-server/queued_interception
bin/mds-server/mds-server: $(foreach O,$(MDS_SERVER_OBJ),obj/$(O).o) bin/libmdsserver.so
mkdir -p $(shell dirname $@)
- gcc $(C_FLAGS) -o $@ -Lbin -lmdsserver -lrt $(foreach O,$(MDS_SERVER_OBJ),obj/$(O).o)
+ $(CC) $(C_FLAGS) -o $@ -Lbin -lmdsserver -lrt $(foreach O,$(MDS_SERVER_OBJ),obj/$(O).o)
bin/%: obj/%.o bin/libmdsserver.so
mkdir -p $(shell dirname $@)
- gcc $(C_FLAGS) -o $@ -Lbin -lmdsserver -lrt obj/$*.o
+ $(CC) $(C_FLAGS) -o $@ -Lbin -lmdsserver -lrt obj/$*.o
obj/mds-server/%.o: src/mds-server/%.c src/mds-server/*.h src/libmdsserver/*.h
mkdir -p $(shell dirname $@)
- gcc $(C_FLAGS) -Isrc -c -o $@ $<
+ $(CC) $(C_FLAGS) -Isrc -c -o $@ $<
obj/%.o: src/%.c src/*.h src/libmdsserver/*.h
mkdir -p $(shell dirname $@)
- gcc $(C_FLAGS) -Isrc -c -o $@ $<
+ $(CC) $(C_FLAGS) -Isrc -c -o $@ $<
bin/libmdsserver.so: $(foreach O,$(LIBOBJ),obj/libmdsserver/$(O).o)
mkdir -p $(shell dirname $@)
- gcc $(C_FLAGS) -shared -o $@ $^
+ $(CC) $(C_FLAGS) -shared -o $@ $^
obj/libmdsserver/%.o: src/libmdsserver/%.c src/libmdsserver/*.h
mkdir -p $(shell dirname $@)
- gcc $(C_FLAGS) -fPIC -c -o $@ $<
+ $(CC) $(C_FLAGS) -fPIC -c -o $@ $<
diff --git a/src/libmdsserver/macros.h b/src/libmdsserver/macros.h
index 32024a3..d70afc9 100644
--- a/src/libmdsserver/macros.h
+++ b/src/libmdsserver/macros.h
@@ -73,9 +73,9 @@
* @param instructions The instructions to run while the mutex is locked
*/
#define with_mutex(mutex, instructions) \
- pthread_mutex_lock(&(mutex)); \
+ errno = pthread_mutex_lock(&(mutex)); \
instructions \
- pthread_mutex_unlock(&(mutex))
+ errno = pthread_mutex_unlock(&(mutex))
/**
diff --git a/src/mds-server/client.c b/src/mds-server/client.c
index 0e8e0b5..0a8b107 100644
--- a/src/mds-server/client.c
+++ b/src/mds-server/client.c
@@ -17,6 +17,8 @@
*/
#include "client.h"
+#include "multicast.h"
+
#include <libmdsserver/macros.h>
#include <stdlib.h>
@@ -42,6 +44,13 @@ void client_destroy(client_t* restrict this)
if (this->mutex_created)
pthread_mutex_destroy(&(this->mutex));
mds_message_destroy(&(this->message));
+ if (this->multicasts != NULL)
+ {
+ size_t i;
+ for (i = 0; i < this->multicasts_count; i++)
+ multicast_destroy(this->multicasts + i);
+ free(this->multicasts);
+ }
free(this->send_pending);
free(this);
}
@@ -55,12 +64,14 @@ void client_destroy(client_t* restrict this)
*/
size_t client_marshal_size(const client_t* restrict this)
{
- size_t n = sizeof(ssize_t) + 2 * sizeof(int) + sizeof(uint64_t) + 3 * sizeof(size_t);
+ size_t n = sizeof(ssize_t) + 2 * sizeof(int) + sizeof(uint64_t) + 4 * sizeof(size_t);
size_t i;
n += mds_message_marshal_size(&(this->message), 1);
for (i = 0; i < this->interception_conditions_count; i++)
n += interception_condition_marshal_size(this->interception_conditions + i);
+ for (i = 0; i < this->multicasts_count; i++)
+ n += multicast_marshal_size(this->multicasts + i);
n += this->send_pending_size * sizeof(char);
return n;
@@ -88,6 +99,9 @@ size_t client_marshal(const client_t* restrict this, char* restrict data)
buf_set_next(data, size_t, this->interception_conditions_count);
for (i = 0; i < this->interception_conditions_count; i++)
data += interception_condition_marshal(this->interception_conditions + i, data) / sizeof(char);
+ buf_set_next(data, size_t, this->multicasts_count);
+ for (i = 0; i < this->multicasts_count; i++)
+ data += multicast_marshal(this->multicasts + i, data) / sizeof(char);
buf_set_next(data, size_t, this->send_pending_size);
if (this->send_pending_size > 0)
memcpy(data, this->send_pending, this->send_pending_size * sizeof(char));
@@ -104,10 +118,12 @@ size_t client_marshal(const client_t* restrict this, char* restrict data)
*/
size_t client_unmarshal(client_t* restrict this, char* restrict data)
{
- size_t i, n, rc = sizeof(ssize_t) + 2 * sizeof(int) + sizeof(uint64_t) + 3 * sizeof(size_t);
+ size_t i, n, rc = sizeof(ssize_t) + 2 * sizeof(int) + sizeof(uint64_t) + 4 * sizeof(size_t);
this->interception_conditions = NULL;
+ this->multicasts = NULL;
this->send_pending = NULL;
this->mutex_created = 0;
+ this->multicasts_count = 0;
buf_get_next(data, ssize_t, this->list_entry);
buf_get_next(data, int, this->socket_fd);
buf_get_next(data, int, this->open);
@@ -131,6 +147,16 @@ size_t client_unmarshal(client_t* restrict this, char* restrict data)
data += n / sizeof(char);
rc += n;
}
+ buf_get_next(data, size_t, n);
+ if (xmalloc(this->multicasts, n, sizeof(multicast_t)))
+ goto fail;
+ for (i = 0; i < n; i++, this->multicasts_count++)
+ {
+ size_t m = multicast_unmarshal(this->multicasts + i, data);
+ if (m == 0)
+ goto fail;
+ data += m / sizeof(char);
+ }
buf_get_next(data, size_t, this->send_pending_size);
if (this->send_pending_size > 0)
{
@@ -145,6 +171,9 @@ size_t client_unmarshal(client_t* restrict this, char* restrict data)
for (i = 0; i < this->interception_conditions_count; i++)
free(this->interception_conditions[i].condition);
free(this->interception_conditions);
+ for (i = 0; i < this->multicasts_count; i++)
+ multicast_destroy(this->multicasts + i);
+ free(this->multicasts);
free(this->send_pending);
return 0;
}
@@ -167,6 +196,13 @@ size_t client_unmarshal_skip(char* restrict data)
buf_get_next(data, size_t, c);
while (c--)
{
+ n = multicast_unmarshal_skip(data);
+ data += n / sizeof(char);
+ rc += n;
+ }
+ buf_get_next(data, size_t, c);
+ while (c--)
+ {
n = interception_condition_unmarshal_skip(data);
data += n / sizeof(char);
rc += n;
diff --git a/src/mds-server/client.h b/src/mds-server/client.h
index 6b98ade..da5d14f 100644
--- a/src/mds-server/client.h
+++ b/src/mds-server/client.h
@@ -20,6 +20,7 @@
#include "interception_condition.h"
+#include "multicast.h"
#include <libmdsserver/mds-message.h>
@@ -52,7 +53,7 @@ typedef struct client
/**
* Message read buffer for the client
*/
- mds_message_t message;
+ struct mds_message message;
/**
* The read thread for the client
@@ -79,7 +80,7 @@ typedef struct client
* The messages interception conditions conditions
* for the client
*/
- interception_condition_t* interception_conditions;
+ struct interception_condition* interception_conditions;
/**
* The number of interception conditions
@@ -87,6 +88,16 @@ typedef struct client
size_t interception_conditions_count;
/**
+ * Pending multicast messages
+ */
+ struct multicast* multicasts;
+
+ /**
+ * The number of pending multicast messages
+ */
+ size_t multicasts_count;
+
+ /**
* Messages pending to be sent (concatenated)
*/
char* send_pending;
diff --git a/src/mds-server/mds-server.c b/src/mds-server/mds-server.c
index 00f8ceb..bc3b20f 100644
--- a/src/mds-server/mds-server.c
+++ b/src/mds-server/mds-server.c
@@ -16,9 +16,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "mds-server.h"
+
#include "interception_condition.h"
#include "client.h"
#include "queued_interception.h"
+#include "multicast.h"
#include <libmdsserver/config.h>
#include <libmdsserver/linked-list.h>
@@ -108,6 +110,11 @@ static linked_list_t client_list;
*/
static uint64_t next_id = 1;
+/**
+ * The next free ID for a message modifications
+ */
+static uint64_t next_modify_id = 1;
+
/**
@@ -457,12 +464,13 @@ void* slave_loop(void* data)
/* Add client to table. */
tmp = fd_table_put(&client_map, socket_fd, (size_t)(void*)information);
- pthread_mutex_unlock(&slave_mutex);
if ((tmp == 0) && errno)
{
perror(*argv);
+ pthread_mutex_unlock(&slave_mutex);
goto fail;
}
+ pthread_mutex_unlock(&slave_mutex);
/* Fill information table. */
information->list_entry = entry;
@@ -503,6 +511,27 @@ void* slave_loop(void* data)
if (information->open)
while (reexecing == 0)
{
+ /* Send queued multicast messages. */
+ if (information->multicasts_count > 0)
+ {
+ multicast_t multicast;
+ with_mutex(information->mutex,
+ if (information->multicasts_count > 0)
+ {
+ size_t c = information->multicasts_count -= 1;
+ multicast = information->multicasts[0];
+ memmove(information->multicasts, information->multicasts + 1, c);
+ if (c == 0)
+ {
+ free(information->multicasts);
+ information->multicasts = NULL;
+ }
+ }
+ );
+ multicast_message(&multicast);
+ multicast_destroy(&multicast);
+ }
+
/* Send queued messages. */
if (information->send_pending_size > 0)
{
@@ -572,7 +601,7 @@ void* slave_loop(void* data)
(uint32_t)(information->id >> 32),
(uint32_t)(information->id >> 0));
n = strlen(msgbuf) + 1;
- multicast_message(msgbuf, n);
+ queue_message_multicast(msgbuf, n, information);
fail: /* The loop does break, this done on success as well. */
@@ -733,13 +762,14 @@ void message_received(client_t* client)
return;
}
mds_message_marshal(&message, msgbuf, 0);
- multicast_message(msgbuf, n / sizeof(char)); /* TODO support re-exec */
- free(msgbuf);
+ queue_message_multicast(msgbuf, n / sizeof(char), client);
/* Send asigned ID. */
if (assign_id)
{
+ char* msgbuf_;
+
/* Construct response. */
n = 2 * 10 + strlen(message_id) + 1;
n += strlen("ID assignment: :\nIn response to: \n\n");
@@ -758,7 +788,14 @@ void message_received(client_t* client)
n = strlen(msgbuf);
/* Multicast the reply. */
- multicast_message(msgbuf, n); /* TODO support re-exec */
+ msgbuf_ = strdup(msgbuf);
+ if (msgbuf_ == NULL)
+ {
+ perror(*argv);
+ free(msgbuf);
+ return;
+ }
+ queue_message_multicast(msgbuf, n, client);
/* Queue message to be sent when this function returns.
This done to simplify `multicast_message` for re-exec. */
@@ -925,12 +962,13 @@ static int cmp_queued_interception(const void* a, const void* b)
/**
- * Multicast a message
+ * Queue a message for multicasting
*
* @param message The message
* @param length The length of the message
+ * @param sender The original sender of the message
*/
-void multicast_message(char* message, size_t length)
+void queue_message_multicast(char* message, size_t length, client_t* sender)
{
size_t header_count = 0;
size_t n = length - 1;
@@ -939,8 +977,12 @@ void multicast_message(char* message, size_t length)
char** header_values = NULL;
queued_interception_t* interceptions = NULL;
size_t interceptions_count = 0;
+ multicast_t* multicast = NULL;
size_t i;
ssize_t node;
+ uint64_t modify_id;
+ char modify_id_header[13 + 3 * sizeof(uint64_t)];
+ char* old_buf;
/* Count the number of headers. */
for (i = 0; i < n; i++)
@@ -954,6 +996,10 @@ void multicast_message(char* message, size_t length)
if (header_count == 0)
return; /* Invalid message. */
+ /* Allocate multicast message. */
+ if (xmalloc(multicast, 1, sizeof(multicast_t))) goto fail;
+ multicast_initialise(multicast);
+
/* Allocate header lists. */
if (xmalloc(hashes, header_count, size_t)) goto fail;
if (xmalloc(headers, header_count, char*)) goto fail;
@@ -1008,7 +1054,6 @@ void multicast_message(char* message, size_t length)
/* Look for a matching condition. */
n = client->interception_conditions_count;
- errno = 0;
if (client->open)
{
with_mutex(mutex,
@@ -1054,48 +1099,127 @@ void multicast_message(char* message, size_t length)
/* Sort interceptors. */
qsort(interceptions, interceptions_count, sizeof(queued_interception_t), cmp_queued_interception);
- /* Send message to interceptors. */
- for (i = 0; i < interceptions_count; i++)
+ /* Add prefix to message with ‘Modify ID’ header. */
+ with_mutex(slave_mutex,
+ modify_id = next_modify_id++;
+ if (next_modify_id == 0)
+ next_modify_id = 1;
+ );
+ xsnprintf(modify_id_header, "Modify ID: %" PRIu64 "\n", modify_id);
+ n = strlen(modify_id_header);
+ old_buf = message;
+ if ((message = realloc(message, (n + length) * sizeof(char))) == NULL)
{
- queued_interception_t client_ = interceptions[i];
+ message = old_buf;
+ goto fail;
+ }
+ memmove(message + n, message, (n + length) * sizeof(char));
+ memcpy(message, modify_id_header, n * sizeof(char));
+
+ /* Store information. */
+ multicast->interceptions = interceptions;
+ multicast->interceptions_count = interceptions_count;
+ multicast->message = message;
+ multicast->message_length = length;
+ multicast->message_prefix = n;
+ message = NULL;
+
+ /* Queue message multicasting. */
+ with_mutex(sender->mutex,
+ if (sender->multicasts_count == 1)
+ {
+ if (xmalloc(sender->multicasts, 1, multicast_t))
+ goto fail_queue;
+ }
+ else
+ {
+ multicast_t* new_buf;
+ new_buf = realloc(sender->multicasts, (sender->multicasts_count + 1) * sizeof(multicast_t));
+ if (new_buf == NULL)
+ goto fail_queue;
+ sender->multicasts = new_buf;
+ }
+ sender->multicasts[sender->multicasts_count++] = *multicast;
+ multicast = NULL;
+ fail_queue:
+ );
+
+ errno = 0;
+ fail: /* This is done before this function returns even if there was no error. */
+ if (errno != 0)
+ perror(*argv);
+ /* Release resources. */
+ xfree(headers, header_count);
+ xfree(header_values, header_count);
+ free(hashes);
+ free(message);
+ if (multicast != NULL)
+ multicast_destroy(multicast);
+ free(multicast);
+}
+
+
+/**
+ * Multicast a message
+ *
+ * @param multicast The multicast message
+ */
+void multicast_message(multicast_t* multicast)
+{
+ for (; multicast->interceptions_ptr < multicast->interceptions_count; multicast->interceptions_ptr++)
+ {
+ queued_interception_t client_ = multicast->interceptions[multicast->interceptions_ptr];
client_t* client = client_.client;
- char* msg = message;
+ char* msg = multicast->message + multicast->message_ptr;
+ size_t n = multicast->message_length - multicast->message_ptr;
size_t sent;
- n = length;
+
+ /* After unmarshalling at re-exec, client will be NULL and must be mapped from its socket. */
+ if (client == NULL)
+ {
+ size_t address;
+ with_mutex(slave_mutex, address = fd_table_get(&client_map, client_.socket_fd););
+ client_.client = client = (client_t*)(void*)address;
+ }
+
+ /* Skip Modify ID header if the interceptors will not perform a modification. */
+ if ((client_.modifying == 0) && (multicast->message_ptr == 0))
+ {
+ n -= multicast->message_prefix;
+ multicast->message_ptr += multicast->message_prefix;
+ }
/* Send the message. */
with_mutex(client->mutex,
+ errno = 0;
if (client->open)
while (n > 0)
{
- sent = send_message(client->socket_fd, msg, n);
- if ((sent < n) && (errno != EINTR)) /* TODO Support reexec */
+ sent = send_message(client->socket_fd, msg + multicast->message_ptr, n);
+ if (sent < n)
{
- perror(*argv);
+ if (errno != EINTR)
+ perror(*argv);
break;
}
n -= sent;
- msg += sent;
+ multicast->message_ptr += sent;
}
);
+ /* Stop if we are re-exec:ing. */
+ if ((n > 0) && (errno == EINTR))
+ return;
+
/* Wait for a reply. */
- if ((n > 0) && client_.modifying)
+ if ((n == 0) && client_.modifying)
{
/* TODO */
}
+
+ /* Reset how much of the message has been sent before we continue with next recipient. */
+ multicast->message_ptr = 0;
}
-
-
- errno = 0;
- fail: /* This is done before this function returns even if there was no error. */
- if (errno != 0)
- perror(*argv);
- /* Release resources. */
- xfree(headers, header_count);
- xfree(header_values, header_count);
- free(interceptions);
- free(hashes);
}
@@ -1226,7 +1350,7 @@ int marshal_server(int fd)
}
/* Add the size of the rest of the program's state. */
- state_n += sizeof(int) + sizeof(sig_atomic_t) + sizeof(uint64_t) + 2 * sizeof(size_t);
+ state_n += sizeof(int) + sizeof(sig_atomic_t) + 2 * sizeof(uint64_t) + 2 * sizeof(size_t);
state_n += list_elements * sizeof(size_t) + list_size + map_size;
/* Allocate a buffer for all data except the client list and the client map. */
@@ -1241,6 +1365,7 @@ int marshal_server(int fd)
/* Marshal the miscellaneous state data. */
buf_set_next(state_buf_, sig_atomic_t, running);
buf_set_next(state_buf_, uint64_t, next_id);
+ buf_set_next(state_buf_, uint64_t, next_modify_id);
/* Tell the program how large the marshalled client list is and how any clients are marshalled. */
buf_set_next(state_buf_, size_t, list_size);
@@ -1338,6 +1463,7 @@ int unmarshal_server(int fd)
/* Unmarshal the miscellaneous state data. */
buf_get_next(state_buf_, sig_atomic_t, running);
buf_get_next(state_buf_, uint64_t, next_id);
+ buf_get_next(state_buf_, uint64_t, next_modify_id);
/* Get the marshalled size of the client list and how any clients that are marshalled. */
buf_get_next(state_buf_, size_t, list_size);
diff --git a/src/mds-server/mds-server.h b/src/mds-server/mds-server.h
index b2dafd4..4dd3c4a 100644
--- a/src/mds-server/mds-server.h
+++ b/src/mds-server/mds-server.h
@@ -20,6 +20,7 @@
#include "client.h"
+#include "multicast.h"
#include <stddef.h>
@@ -53,12 +54,20 @@ void message_received(client_t* client);
void add_intercept_condition(client_t* client, char* condition, int64_t priority, int modifying, int stop);
/**
- * Multicast a message
+ * Queue a message for multicasting
*
* @param message The message
* @param length The length of the message
+ * @param sender The original sender of the message
+ */
+void queue_message_multicast(char* message, size_t length, client_t* sender);
+
+/**
+ * Multicast a message
+ *
+ * @param multicast The multicast message
*/
-void multicast_message(char* message, size_t length);
+void multicast_message(multicast_t* multicast);
/**
* Exec into the mdsinitrc script
diff --git a/src/mds-server/multicast.c b/src/mds-server/multicast.c
new file mode 100644
index 0000000..9be0e8d
--- /dev/null
+++ b/src/mds-server/multicast.c
@@ -0,0 +1,156 @@
+/**
+ * mds — A micro-display server
+ * Copyright © 2014 Mattias Andrée (maandree@member.fsf.org)
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include "multicast.h"
+
+#include "interception_condition.h"
+
+#include <libmdsserver/macros.h>
+
+#include <stdlib.h>
+#include <string.h>
+
+
+/**
+ * Initialise a message multicast state
+ *
+ * @param this The message multicast state
+ */
+void multicast_initialise(multicast_t* restrict this)
+{
+ this->interceptions = NULL;
+ this->interceptions_count = 0;
+ this->interceptions_ptr = 0;
+ this->message = NULL;
+ this->message_length = 0;
+ this->message_ptr = 0;
+ this->message_prefix = 0;
+}
+
+
+/**
+ * Destroy a message multicast state
+ *
+ * @param this The message multicast state
+ */
+void multicast_destroy(multicast_t* restrict this)
+{
+ free(this->interceptions);
+ free(this->message);
+}
+
+
+/**
+ * Calculate the buffer size need to marshal a message multicast state
+ *
+ * @param this The client information
+ * @return The number of bytes to allocate to the output buffer
+ */
+size_t multicast_marshal_size(const multicast_t* restrict this)
+{
+ size_t rc = 5 * sizeof(size_t) + this->message_length * sizeof(char);
+ size_t i;
+ for (i = 0; i < this->interceptions_count; i++)
+ rc += queued_interception_marshal_size();
+ return rc;
+}
+
+
+/**
+ * Marshals a message multicast state
+ *
+ * @param this The message multicast state
+ * @param data Output buffer for the marshalled data
+ * @return The number of bytes that have been written (everything will be written)
+ */
+size_t multicast_marshal(const multicast_t* restrict this, char* restrict data)
+{
+ size_t rc = 5 * sizeof(size_t);
+ size_t i, n;
+ buf_set_next(data, size_t, this->interceptions_count);
+ buf_set_next(data, size_t, this->interceptions_ptr);
+ buf_set_next(data, size_t, this->message_length);
+ buf_set_next(data, size_t, this->message_ptr);
+ buf_set_next(data, size_t, this->message_prefix);
+ for (i = 0; i < this->interceptions_count; i++)
+ {
+ n = queued_interception_marshal(this->interceptions + i, data);
+ data += n / sizeof(char);
+ rc += n;
+ }
+ memcpy(data, this->message, this->message_length * sizeof(char));
+ rc += this->message_length * sizeof(char);
+ return rc;
+}
+
+
+/**
+ * Unmarshals a message multicast state
+ *
+ * @param this Memory slot in which to store the new message multicast state
+ * @param data In buffer with the marshalled data
+ * @return Zero on error, errno will be set accordingly, otherwise the number of read bytes.
+ * Destroy the client information on error.
+ */
+size_t multicast_unmarshal(multicast_t* restrict this, char* restrict data)
+{
+ size_t rc = 5 * sizeof(size_t);
+ size_t i, n;
+ this->interceptions = NULL;
+ this->message = NULL;
+ buf_get_next(data, size_t, this->interceptions_count);
+ buf_get_next(data, size_t, this->interceptions_ptr);
+ buf_get_next(data, size_t, this->message_length);
+ buf_get_next(data, size_t, this->message_ptr);
+ buf_get_next(data, size_t, this->message_prefix);
+ if (xmalloc(this->interceptions, this->interceptions_count, queued_interception_t))
+ return 0;
+ for (i = 0; i < this->interceptions_count; i++)
+ {
+ n = queued_interception_unmarshal(this->interceptions + i, data);
+ data += n / sizeof(char);
+ rc += n;
+ }
+ if (xmalloc(this->message, this->message_length, char))
+ return 0;
+ memcpy(this->message, data, this->message_length * sizeof(char));
+ rc += this->message_length * sizeof(char);
+ return rc;
+}
+
+
+/**
+ * Pretend to unmarshal a message multicast state
+ *
+ * @param data In buffer with the marshalled data
+ * @return The number of read bytes
+ */
+size_t multicast_unmarshal_skip(char* restrict data)
+{
+ size_t interceptions_count = buf_cast(data, size_t, 0);
+ size_t message_length = buf_cast(data, size_t, 2);
+ size_t rc = 5 * sizeof(size_t) + message_length * sizeof(char);
+ size_t n;
+ while (interceptions_count--)
+ {
+ n = queued_interception_unmarshal_skip();
+ data += n / sizeof(char);
+ rc += n;
+ }
+ return rc;
+}
+
diff --git a/src/mds-server/multicast.h b/src/mds-server/multicast.h
new file mode 100644
index 0000000..3ea1e07
--- /dev/null
+++ b/src/mds-server/multicast.h
@@ -0,0 +1,120 @@
+/**
+ * mds — A micro-display server
+ * Copyright © 2014 Mattias Andrée (maandree@member.fsf.org)
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#ifndef MDS_MDS_SERVER_MULTICAST_H
+#define MDS_MDS_SERVER_MULTICAST_H
+
+
+#include "queued_interception.h"
+
+
+/**
+ * Message multicast state
+ */
+typedef struct multicast
+{
+ /**
+ * Queue of clients that is listening this message
+ */
+ struct queued_interception* interceptions;
+
+ /**
+ * The number of clients in `interceptions`
+ */
+ size_t interceptions_count;
+
+ /**
+ * The index of the current/next client in `interceptions` to whom to send the message
+ */
+ size_t interceptions_ptr;
+
+ /**
+ * The message to send
+ */
+ char* message;
+
+ /**
+ * The length of `message`
+ */
+ size_t message_length;
+
+ /**
+ * How much of the message that has already been sent to the current recipient
+ */
+ size_t message_ptr;
+
+ /**
+ * How much of the message to skip if the recipient is not a modifier
+ */
+ size_t message_prefix;
+
+} multicast_t;
+
+
+/**
+ * Initialise a message multicast state
+ *
+ * @param this The message multicast state
+ */
+void multicast_initialise(multicast_t* restrict this);
+
+/**
+ * Destroy a message multicast state
+ *
+ * @param this The message multicast state
+ */
+void multicast_destroy(multicast_t* restrict this);
+
+/**
+ * Calculate the buffer size need to marshal a message multicast state
+ *
+ * @param this The client information
+ * @return The number of bytes to allocate to the output buffer
+ */
+size_t multicast_marshal_size(const multicast_t* restrict this) __attribute__((pure));
+
+/**
+ * Marshals a message multicast state
+ *
+ * @param this The message multicast state
+ * @param data Output buffer for the marshalled data
+ * @return The number of bytes that have been written (everything will be written)
+ */
+size_t multicast_marshal(const multicast_t* restrict this, char* restrict data);
+
+/**
+ * Unmarshals a message multicast state
+ *
+ * @param this Memory slot in which to store the new message multicast state
+ * @param data In buffer with the marshalled data
+ * @return Zero on error, errno will be set accordingly, otherwise the number of read bytes.
+ * Destroy the message multicast state on error.
+ */
+size_t multicast_unmarshal(multicast_t* restrict this, char* restrict data);
+
+/**
+ * Pretend to unmarshal a message multicast state
+ *
+ * @param data In buffer with the marshalled data
+ * @return The number of read bytes
+ */
+size_t multicast_unmarshal_skip(char* restrict data) __attribute__((pure));
+
+
+
+#endif
+
diff --git a/src/mds-server/queued_interception.c b/src/mds-server/queued_interception.c
new file mode 100644
index 0000000..56d58c3
--- /dev/null
+++ b/src/mds-server/queued_interception.c
@@ -0,0 +1,78 @@
+/**
+ * mds — A micro-display server
+ * Copyright © 2014 Mattias Andrée (maandree@member.fsf.org)
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include "queued_interception.h"
+
+#include <libmdsserver/macros.h>
+
+
+/**
+ * Calculate the buffer size need to marshal a queued interception
+ *
+ * @param this The client information
+ * @return The number of bytes to allocate to the output buffer
+ */
+size_t queued_interception_marshal_size(void)
+{
+ return sizeof(int64_t) + 2 * sizeof(int);
+}
+
+
+/**
+ * Marshals a queued interception
+ *
+ * @param this The queued interception
+ * @param data Output buffer for the marshalled data
+ * @return The number of bytes that have been written (everything will be written)
+ */
+size_t queued_interception_marshal(const queued_interception_t* restrict this, char* restrict data)
+{
+ buf_set_next(data, int64_t, this->priority);
+ buf_set_next(data, int, this->modifying);
+ buf_set_next(data, int, this->client->socket_fd);
+ return queued_interception_marshal_size();
+}
+
+
+/**
+ * Unmarshals a queued interception
+ *
+ * @param this Memory slot in which to store the new queued interception
+ * @param data In buffer with the marshalled data
+ * @return Zero on error, errno will be set accordingly, otherwise the number of read bytes.
+ */
+size_t queued_interception_unmarshal(queued_interception_t* restrict this, char* restrict data)
+{
+ this->client = NULL;
+ buf_get_next(data, int64_t, this->priority);
+ buf_get_next(data, int, this->modifying);
+ buf_get_next(data, int, this->socket_fd);
+ return queued_interception_marshal_size();
+}
+
+
+/**
+ * Pretend to unmarshal a queued interception
+ *
+ * @param data In buffer with the marshalled data
+ * @return The number of read bytes
+ */
+size_t queued_interception_unmarshal_skip(void)
+{
+ return queued_interception_marshal_size();
+}
+
diff --git a/src/mds-server/queued_interception.h b/src/mds-server/queued_interception.h
index 6f08a8a..ddab418 100644
--- a/src/mds-server/queued_interception.h
+++ b/src/mds-server/queued_interception.h
@@ -32,7 +32,7 @@ typedef struct queued_interception
/**
* The intercepting client
*/
- client_t* client;
+ struct client* client;
/**
* The interception priority
@@ -44,8 +44,48 @@ typedef struct queued_interception
*/
int modifying;
+ /**
+ * The file descriptor of the intercepting client's socket (used for unmarshalling)
+ */
+ int socket_fd;
+
} queued_interception_t;
+/**
+ * Calculate the buffer size need to marshal a queued interception
+ *
+ * @param this The client information
+ * @return The number of bytes to allocate to the output buffer
+ */
+size_t queued_interception_marshal_size(void) __attribute__((const));
+
+/**
+ * Marshals a queued interception
+ *
+ * @param this The queued interception
+ * @param data Output buffer for the marshalled data
+ * @return The number of bytes that have been written (everything will be written)
+ */
+size_t queued_interception_marshal(const queued_interception_t* restrict this, char* restrict data);
+
+/**
+ * Unmarshals a queued interception
+ *
+ * @param this Memory slot in which to store the new queued interception
+ * @param data In buffer with the marshalled data
+ * @return Zero on error, errno will be set accordingly, otherwise the number of read bytes.
+ */
+size_t queued_interception_unmarshal(queued_interception_t* restrict this, char* restrict data);
+
+/**
+ * Pretend to unmarshal a queued interception
+ *
+ * @param data In buffer with the marshalled data
+ * @return The number of read bytes
+ */
+size_t queued_interception_unmarshal_skip(void) __attribute__((const));
+
+
#endif