aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Makefile3
-rw-r--r--TODO1
-rw-r--r--src/libmdsserver/linked-list.h1
-rw-r--r--src/mds-base.c15
-rw-r--r--src/mds-base.h8
-rw-r--r--src/mds-registry/globals.c27
-rw-r--r--src/mds-registry/globals.h23
-rw-r--r--src/mds-registry/mds-registry.c32
-rw-r--r--src/mds-registry/registry.c68
-rw-r--r--src/mds-registry/signals.c96
-rw-r--r--src/mds-registry/signals.h26
-rw-r--r--src/mds-registry/slave.c403
-rw-r--r--src/mds-registry/slave.h160
-rw-r--r--src/mds-server/mds-server.c2
-rw-r--r--src/mds-server/mds-server.h2
15 files changed, 798 insertions, 69 deletions
diff --git a/Makefile b/Makefile
index d760f1e..a41bc22 100644
--- a/Makefile
+++ b/Makefile
@@ -20,7 +20,8 @@ OBJ_mds-server_ = mds-server interception-condition client multicast \
queued-interception globals signals interceptors \
sending slavery reexec receiving
-OBJ_mds-registry_ = mds-registry util globals reexec registry
+OBJ_mds-registry_ = mds-registry util globals reexec registry signals \
+ slave
OBJ_mds-server = $(foreach O,$(OBJ_mds-server_),obj/mds-server/$(O).o)
OBJ_mds-registry = $(foreach O,$(OBJ_mds-registry_),obj/mds-registry/$(O).o)
diff --git a/TODO b/TODO
index 5a7ba8c..6bf4e34 100644
--- a/TODO
+++ b/TODO
@@ -39,4 +39,5 @@ Missing servers:
cmd-registry Command line interface for the protocol registry
Fast lanes
+Optimise use of mutexe
diff --git a/src/libmdsserver/linked-list.h b/src/libmdsserver/linked-list.h
index 6ed7076..e77f33d 100644
--- a/src/libmdsserver/linked-list.h
+++ b/src/libmdsserver/linked-list.h
@@ -276,7 +276,6 @@ int linked_list_unmarshal(linked_list_t* restrict this, char* restrict data);
#define foreach_linked_list_node(list, node) \
for (node = (list).edge; node = (list).next[node], node != (list).edge;)
-
/**
* Print the content of the list
*
diff --git a/src/mds-base.c b/src/mds-base.c
index 9baa42e..04ae25b 100644
--- a/src/mds-base.c
+++ b/src/mds-base.c
@@ -208,6 +208,18 @@ void __attribute__((weak)) server_initialised(void)
/**
+ * This function is called when an intraprocess signal
+ * that used to send a notification to a thread
+ *
+ * @param signo The signal that has been received
+ */
+void received_noop(int signo)
+{
+ (void) signo;
+}
+
+
+/**
* This function is called when a signal that
* signals the server to re-exec has been received
*
@@ -473,6 +485,9 @@ int trap_signals(void)
/* Implement clean exit on SIGINT. */
fail_if (xsigaction(SIGINT, received_terminate) < 0);
+ /* Implement clean exit on SIGRTMIN. */
+ fail_if (xsigaction(SIGRTMIN, received_noop) < 0);
+
return 0;
pfail:
perror(*argv);
diff --git a/src/mds-base.h b/src/mds-base.h
index c8666a1..edc42fa 100644
--- a/src/mds-base.h
+++ b/src/mds-base.h
@@ -159,6 +159,14 @@ void server_initialised(void); /* __attribute__((weak)) */
/**
+ * This function is called when an intraprocess signal
+ * that used to send a notification to a thread
+ *
+ * @param signo The signal that has been received
+ */
+void received_noop(int signo) __attribute__((weak, const));
+
+/**
* This function should be implemented by the actual server implementation
* if the server is multithreaded
*
diff --git a/src/mds-registry/globals.c b/src/mds-registry/globals.c
index 981cad1..e275eb7 100644
--- a/src/mds-registry/globals.c
+++ b/src/mds-registry/globals.c
@@ -50,17 +50,32 @@ char* send_buffer = NULL;
size_t send_buffer_size = 0;
/**
- * General mutex
+ * Used to temporarily store the old value when reallocating heap-allocations
*/
-pthread_mutex_t reg_mutex;
+char* old;
/**
- * General condition
+ * The master thread
*/
-pthread_cond_t reg_cond;
+pthread_t master_thread;
/**
- * Used to temporarily store the old value when reallocating heap-allocations
+ * The number of running slaves
*/
-char* old;
+size_t running_slaves = 0;
+
+/**
+ * List of running slaves
+ */
+linked_list_t slave_list;
+
+/**
+ * Mutex for slave data
+ */
+pthread_mutex_t slave_mutex;
+
+/**
+ * Condition for slave data
+ */
+pthread_cond_t slave_cond;
diff --git a/src/mds-registry/globals.h b/src/mds-registry/globals.h
index 9c4b42f..dcbf91d 100644
--- a/src/mds-registry/globals.h
+++ b/src/mds-registry/globals.h
@@ -21,6 +21,7 @@
#include <libmdsserver/mds-message.h>
#include <libmdsserver/hash-table.h>
+#include <libmdsserver/linked-list.h>
#include <stdint.h>
#include <stddef.h>
@@ -62,19 +63,29 @@ extern char* send_buffer;
extern size_t send_buffer_size;
/**
- * General mutex
+ * Used to temporarily store the old value when reallocating heap-allocations
*/
-extern pthread_mutex_t reg_mutex;
+extern char* old;
/**
- * General condition
+ * The number of running slaves
*/
-extern pthread_cond_t reg_cond;
+extern size_t running_slaves;
/**
- * Used to temporarily store the old value when reallocating heap-allocations
+ * List of running slaves
*/
-extern char* old;
+extern linked_list_t slave_list; /* TODO (un)marshal */
+
+/**
+ * Mutex for slave data
+ */
+extern pthread_mutex_t slave_mutex;
+
+/**
+ * Condition for slave data
+ */
+extern pthread_cond_t slave_cond;
#endif
diff --git a/src/mds-registry/mds-registry.c b/src/mds-registry/mds-registry.c
index 96cf05d..aa759c8 100644
--- a/src/mds-registry/mds-registry.c
+++ b/src/mds-registry/mds-registry.c
@@ -23,6 +23,7 @@
#include <libmdsserver/macros.h>
#include <libmdsserver/hash-help.h>
+#include <libmdsserver/linked-list.h>
#include <errno.h>
#include <stdio.h>
@@ -54,20 +55,20 @@ server_characteristics_t server_characteristics =
*/
int preinitialise_server(void)
{
- if ((errno = pthread_mutex_init(&reg_mutex, NULL)))
- {
- perror(*argv);
- return 1;
- }
+ int stage = 0;
- if ((errno = pthread_cond_init(&reg_cond, NULL)))
- {
- perror(*argv);
- pthread_mutex_destroy(&reg_mutex);
- return 1;
- }
+ fail_if ((errno = pthread_mutex_init(&slave_mutex, NULL))); stage++;
+ fail_if ((errno = pthread_cond_init(&slave_cond, NULL))); stage++;
+
+ linked_list_create(&slave_list, 2);
return 0;
+
+ pfail:
+ perror(*argv);
+ if (stage >= 1) pthread_mutex_destroy(&slave_mutex);
+ if (stage >= 2) pthread_cond_destroy(&slave_cond);
+ return 1;
}
@@ -177,13 +178,18 @@ int master_loop(void)
pfail:
perror(*argv);
fail:
+ /* Join with all slaves threads. */
+ with_mutex (slave_mutex,
+ while (running_slaves > 0)
+ pthread_cond_wait(&slave_cond, &slave_mutex););
+
if (rc || !reexecing)
{
hash_table_destroy(&reg_table, (free_func*)reg_table_free_key, (free_func*)reg_table_free_value);
mds_message_destroy(&received);
}
- pthread_mutex_destroy(&reg_mutex);
- pthread_cond_destroy(&reg_cond);
+ pthread_mutex_destroy(&slave_mutex);
+ pthread_cond_destroy(&slave_cond);
free(send_buffer);
return rc;
}
diff --git a/src/mds-registry/registry.c b/src/mds-registry/registry.c
index 6fed005..8c0132e 100644
--- a/src/mds-registry/registry.c
+++ b/src/mds-registry/registry.c
@@ -19,6 +19,7 @@
#include "util.h"
#include "globals.h"
+#include "slave.h"
#include "../mds-base.h"
@@ -31,6 +32,7 @@
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
+#include <pthread.h>
@@ -43,14 +45,14 @@
static int handle_close_message(void)
{
/* Servers do not close too often, there is no need to
- optimise this with another hash table. */
+ optimise this with another hash table. Doing so would
+ also require some caution because the keys are 32-bit
+ on 32-bit computers, and the client ID:s are 64-bit. */
size_t i, j, ptr = 0, size = 1;
size_t* keys = NULL;
size_t* old_keys;
- fail_if ((errno = pthread_mutex_lock(&reg_mutex)));
-
/* Remove server for all protocols. */
@@ -75,9 +77,19 @@ static int handle_close_message(void)
goto fail;
keys[ptr++] = entry->key;
}
+
+
+ /* Mark client as closed. */
+
+ close_slaves(client);
}
+ /* Close slaves those clients have closed. */
+
+ with_mutex (slave_mutex, pthread_cond_broadcast(&slave_cond););
+
+
/* Remove protocol that no longer have any supporting servers. */
for (i = 0; i < ptr; i++)
@@ -93,15 +105,12 @@ static int handle_close_message(void)
free(command);
}
- pthread_mutex_unlock(&reg_mutex);
-
free(keys);
return 0;
pfail:
perror(*argv);
fail:
free(keys);
- pthread_mutex_unlock(&reg_mutex);
return -1;
}
@@ -153,6 +162,9 @@ static int registry_action_add(int has_key, char* command, size_t command_key, u
}
}
+ /* Notify slaves. */
+ fail_if (advance_slaves(command));
+
return 0;
pfail:
perror(*argv);
@@ -258,11 +270,13 @@ static int registry_action(size_t length, int action, const char* recv_client_id
if (action == 0)
{
wait_set = malloc(sizeof(hash_table_t));
+ if (wait_set == NULL)
+ return -1;
if (hash_table_create(wait_set))
{
hash_table_destroy(wait_set, NULL, NULL);
free(wait_set);
- goto pfail;
+ return -1;
}
wait_set->key_comparator = (compare_func*)string_comparator;
wait_set->hasher = (hash_func*)string_hash;
@@ -292,8 +306,6 @@ static int registry_action(size_t length, int action, const char* recv_client_id
/* For all protocols in the payload, either add or remove
them from or to the protocl table or the wait set. */
- fail_if ((errno = pthread_mutex_lock(&reg_mutex)));
-
for (begin = 0; begin < length;)
{
char* end = rawmemchr(payload + begin, '\n');
@@ -305,28 +317,16 @@ static int registry_action(size_t length, int action, const char* recv_client_id
if (len > 0)
if (registry_action_act(command, action, client, wait_set))
- goto fail_in_mutex;
+ return -1;
}
- pthread_mutex_unlock(&reg_mutex);
-
/* If ‘Action: wait’, start a new thread that waits for the protocols and the responds. */
if (action == 0)
- {
- /* FIXME */
- }
+ return start_slave(wait_set, recv_client_id, recv_message_id);
return 0;
-
-
- pfail:
- perror(*argv);
- return -1;
- fail_in_mutex:
- pthread_mutex_unlock(&reg_mutex);
- return -1;
}
@@ -348,12 +348,11 @@ static int list_registry(const char* recv_client_id, const char* recv_message_id
if (send_buffer_size == 0)
{
- fail_if (xmalloc(send_buffer, 256, char));
+ if (xmalloc(send_buffer, 256, char))
+ return -1;
send_buffer_size = 256;
}
- fail_if ((errno = pthread_mutex_lock(&reg_mutex)));
-
/* Add all protocols to the send buffer. */
@@ -366,7 +365,7 @@ static int list_registry(const char* recv_client_id, const char* recv_message_id
/* Make sure the send buffer can fit all protocols. */
while (ptr + len + 1 >= send_buffer_size)
if (growalloc(old, send_buffer, send_buffer_size, char))
- goto fail_in_mutex;
+ return -1;
memcpy(send_buffer + ptr, command, len * sizeof(char));
ptr += len;
@@ -381,7 +380,7 @@ static int list_registry(const char* recv_client_id, const char* recv_message_id
while (ptr + i >= send_buffer_size)
if (growalloc(old, send_buffer, send_buffer_size, char))
- goto fail_in_mutex;
+ return -1;
/* Construct message headers. */
@@ -391,21 +390,10 @@ static int list_registry(const char* recv_client_id, const char* recv_message_id
/* Increase message ID. */
message_id = message_id == INT32_MAX ? 0 : (message_id + 1);
- pthread_mutex_unlock(&reg_mutex);
-
/* Send message. */
if (full_send(send_buffer + ptr, strlen(send_buffer + ptr)))
return 1;
return full_send(send_buffer, ptr);
-
-
- fail_in_mutex:
- pthread_mutex_unlock(&reg_mutex);
- return -1;
-
- pfail:
- perror(*argv);
- return -1;
}
@@ -413,7 +401,7 @@ static int list_registry(const char* recv_client_id, const char* recv_message_id
* Handle the received message containing ‘Command: register’-header–value
*
* @return Zero on success -1 on error or interruption,
- * errno will be set accordingly
+ * `errno` will be set accordingly
*/
static int handle_register_message(void)
{
diff --git a/src/mds-registry/signals.c b/src/mds-registry/signals.c
new file mode 100644
index 0000000..7d1f5b4
--- /dev/null
+++ b/src/mds-registry/signals.c
@@ -0,0 +1,96 @@
+/**
+ * mds — A micro-display server
+ * Copyright © 2014 Mattias Andrée (maandree@member.fsf.org)
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include "signals.h"
+
+#include "globals.h"
+#include "slave.h"
+
+#include "../mds-base.h"
+
+#include <libmdsserver/linked-list.h>
+#include <libmdsserver/macros.h>
+
+#include <stdio.h>
+#include <pthread.h>
+#include <errno.h>
+
+
+
+/**
+ * Send a singal to all threads except the current thread
+ *
+ * @param signo The signal
+ */
+static void signal_all(int signo)
+{
+ pthread_t current_thread;
+ ssize_t node;
+
+ current_thread = pthread_self();
+
+ if (pthread_equal(current_thread, master_thread) == 0)
+ pthread_kill(master_thread, signo);
+
+ with_mutex (slave_mutex,
+ foreach_linked_list_node (slave_list, node)
+ {
+ slave_t* value = (slave_t*)(void*)(slave_list.values[node]);
+ if (pthread_equal(current_thread, value->thread) == 0)
+ pthread_kill(value->thread, signo);
+ }
+ );
+}
+
+
+/**
+ * This function is called when a signal that
+ * signals the server to re-exec has been received
+ *
+ * When this function is invoked, it should set `reexecing` to a non-zero value
+ *
+ * @param signo The signal that has been received
+ */
+void received_reexec(int signo)
+{
+ if (reexecing == 0)
+ {
+ terminating = reexecing = 1;
+ eprint("re-exec signal received.");
+ signal_all(signo);
+ }
+}
+
+
+/**
+ * This function is called when a signal that
+ * signals the server to re-exec has been received
+ *
+ * When this function is invoked, it should set `terminating` to a non-zero value
+ *
+ * @param signo The signal that has been received
+ */
+void received_terminate(int signo)
+{
+ if (terminating == 0)
+ {
+ terminating = 1;
+ eprint("terminate signal received.");
+ signal_all(signo);
+ }
+}
+
diff --git a/src/mds-registry/signals.h b/src/mds-registry/signals.h
new file mode 100644
index 0000000..65070e9
--- /dev/null
+++ b/src/mds-registry/signals.h
@@ -0,0 +1,26 @@
+/**
+ * mds — A micro-display server
+ * Copyright © 2014 Mattias Andrée (maandree@member.fsf.org)
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#ifndef MDS_MDS_REGISTRY_SIGNALS_H
+#define MDS_MDS_REGISTRY_SIGNALS_H
+
+
+#include "../mds-base.h"
+
+
+#endif
+
diff --git a/src/mds-registry/slave.c b/src/mds-registry/slave.c
new file mode 100644
index 0000000..9bc2fd1
--- /dev/null
+++ b/src/mds-registry/slave.c
@@ -0,0 +1,403 @@
+/**
+ * mds — A micro-display server
+ * Copyright © 2014 Mattias Andrée (maandree@member.fsf.org)
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include "slave.h"
+
+#include "util.h"
+#include "globals.h"
+
+#include "../mds-base.h"
+
+#include <libmdsserver/macros.h>
+
+#include <string.h>
+#include <errno.h>
+#include <pthread.h>
+
+
+
+/**
+ * Master function for slave threads
+ *
+ * @param data Input data
+ * @return Output data
+ */
+static void* slave_loop(void* data)
+{
+ slave_t* slave = data;
+
+ if (slave->closed)
+ goto done;
+
+ /* Set up traps for especially handled signals. */
+ fail_if (trap_signals() < 0);
+
+ fail_if ((errno = pthread_mutex_lock(&slave_mutex)));
+
+ while (!reexecing && !terminating)
+ {
+ if ((slave->wait_set->size == 0) || slave->closed)
+ break;
+ pthread_cond_wait(&slave_cond, &slave_mutex);
+ }
+
+ pthread_mutex_unlock(&slave_mutex);
+
+ if (!(slave->closed) && slave->wait_set->size)
+ ; /* FIXME send inside slave_mutex */
+
+ goto done;
+
+ pfail:
+ perror(*argv);
+ done:
+ with_mutex (slave_mutex,
+ if (!reexecing)
+ linked_list_remove(&slave_list, slave->node);
+ running_slaves--;
+ if (running_slaves == 0)
+ pthread_cond_signal(&slave_cond);
+ );
+ return NULL;
+}
+
+
+/**
+ * Start a slave thread
+ *
+ * @param wait_set Set of protocols for which to wait that they become available
+ * @param recv_client_id The ID of the waiting client
+ * @param recv_message_id The ID of the message that triggered the waiting
+ * @return Non-zero on error
+ */
+int start_slave(hash_table_t* restrict wait_set, const char* restrict recv_client_id, const char* restrict recv_message_id)
+{
+ slave_t* slave = slave_create(wait_set, recv_client_id, recv_message_id);
+ size_t slave_address;
+ ssize_t node = LINKED_LIST_UNUSED;
+
+ fail_if (slave == NULL);
+ fail_if ((errno = pthread_mutex_lock(&slave_mutex)));
+
+ slave_address = (size_t)(void*)slave;
+ slave->node = node = linked_list_insert_end(&slave_list, slave_address);
+ if (slave->node == LINKED_LIST_UNUSED)
+ goto pfail_in_mutex;
+
+ if ((errno = pthread_create(&(slave->thread), NULL, slave_loop, (void*)(intptr_t)slave)))
+ goto pfail_in_mutex;
+
+ if ((errno = pthread_detach(slave->thread)))
+ perror(*argv);
+
+ running_slaves++;
+ pthread_mutex_unlock(&slave_mutex);
+
+ return 0;
+ pfail:
+ perror(*argv);
+ goto more_fail;
+ pfail_in_mutex:
+ perror(*argv);
+ pthread_mutex_unlock(&slave_mutex);
+ more_fail:
+ if (node != LINKED_LIST_UNUSED)
+ linked_list_remove(&slave_list, node);
+ return -1;
+}
+
+
+/**
+ * Close all slaves associated with a client
+ *
+ * @param client The client's ID
+ */
+void close_slaves(uint64_t client)
+{
+ ssize_t node;
+ with_mutex (slave_mutex,
+ foreach_linked_list_node (slave_list, node)
+ {
+ slave_t* slave = (slave_t*)(void*)(slave_list.values[node]);
+ if (slave->client == client)
+ slave->closed = 1;
+ }
+ );
+}
+
+
+/**
+ * Notify slaves that a protocol has become available
+ *
+ * @param command The protocol
+ * @return Non-zero on error, `ernno`will be set accordingly
+ */
+int advance_slaves(char* command)
+{
+ size_t key = (size_t)(void*)command;
+ int signal_slaves = 0;
+ ssize_t node;
+
+ if ((errno = pthread_mutex_lock(&slave_mutex)))
+ return -1;
+
+ foreach_linked_list_node (slave_list, node)
+ {
+ slave_t* slave = (slave_t*)(void*)(slave_list.values[node]);
+ if (hash_table_contains_key(slave->wait_set, key))
+ {
+ hash_table_remove(slave->wait_set, key);
+ signal_slaves |= slave->wait_set == 0;
+ }
+ }
+
+ if (signal_slaves)
+ pthread_cond_broadcast(&slave_cond);
+
+ pthread_mutex_unlock(&slave_mutex);
+ return 0;
+}
+
+
+/**
+ * Create a slave
+ *
+ * @param wait_set Set of protocols for which to wait that they become available
+ * @param recv_client_id The ID of the waiting client
+ * @param recv_message_id The ID of the message that triggered the waiting
+ * @return The slave, `NULL` on error, `errno` will be set accordingly
+ */
+slave_t* slave_create(hash_table_t* restrict wait_set, const char* restrict recv_client_id, const char* restrict recv_message_id)
+{
+ slave_t* restrict rc = NULL;
+
+ if (xmalloc(rc, 1, slave_t))
+ return NULL;
+
+ slave_initialise(rc);
+ rc->wait_set = wait_set;
+ rc->client = parse_client_id(recv_client_id);
+
+ if ((rc->client_id = strdup(recv_client_id)) == NULL)
+ goto fail;
+
+ if ((rc->message_id = strdup(recv_message_id)) == NULL)
+ goto fail;
+
+ return rc;
+
+ fail:
+ slave_destroy(rc);
+ free(rc);
+ return NULL;
+}
+
+
+/**
+ * Initialise a slave
+ *
+ * @param this Memory slot in which to store the new slave information
+ */
+void slave_initialise(slave_t* restrict this)
+{
+ this->wait_set = NULL;
+ this->client_id = NULL;
+ this->message_id = NULL;
+ this->closed = 0;
+}
+
+
+/**
+ * Release all resources assoicated with a slave
+ *
+ * @param this The slave information
+ */
+void slave_destroy(slave_t* restrict this)
+{
+ if (this->wait_set != NULL)
+ {
+ hash_table_destroy(this->wait_set, (free_func*)reg_table_free_key, NULL);
+ free(this->wait_set);
+ this->wait_set = NULL;
+ }
+
+ free(this->client_id);
+ this->client_id = NULL;
+
+ free(this->message_id);
+ this->message_id = NULL;
+}
+
+
+/**
+ * Calculate the buffer size need to marshal slave information
+ *
+ * @param this The slave information
+ * @return The number of bytes to allocate to the output buffer
+ */
+size_t slave_marshal_size(const slave_t* restrict this)
+{
+ size_t rc = 2 * sizeof(int) + sizeof(ssize_t) + sizeof(size_t) + sizeof(uint64_t);
+ hash_entry_t* restrict entry;
+ size_t n;
+
+ rc += (strlen(this->client_id) + strlen(this->message_id) + 2) * sizeof(char);
+
+ foreach_hash_table_entry (*(this->wait_set), n, entry)
+ {
+ char* protocol = (char*)(void*)(entry->key);
+ rc += strlen(protocol) + 1;
+ }
+
+ return rc;
+}
+
+
+/**
+ * Marshals slave information
+ *
+ * @param this The slave information
+ * @param data Output buffer for the marshalled data
+ * @return The number of bytes that have been written (everything will be written)
+ */
+size_t slave_marshal(const slave_t* restrict this, char* restrict data)
+{
+ hash_entry_t* restrict entry;
+ size_t n;
+
+ buf_set_next(data, int, SLAVE_T_VERSION);
+ buf_set_next(data, int, this->closed);
+ buf_set_next(data, ssize_t, this->node);
+ buf_set_next(data, uint64_t, this->client);
+
+ memcpy(data, this->client_id, (strlen(this->client_id) + 1) * sizeof(char));
+ data += strlen(this->client_id) + 1;
+
+ memcpy(data, this->message_id, (strlen(this->message_id) + 1) * sizeof(char));
+ data += strlen(this->message_id) + 1;
+
+ n = this->wait_set->size;
+ buf_set_next(data, size_t, n);
+
+ foreach_hash_table_entry (*(this->wait_set), n, entry)
+ {
+ char* restrict protocol = (char*)(void*)(entry->key);
+ memcpy(data, protocol, (strlen(protocol) + 1) * sizeof(char));
+ data += strlen(protocol) + 1;
+ }
+
+ return slave_marshal_size(this);
+}
+
+
+/**
+ * Unmarshals slave information
+ *
+ * @param this Memory slot in which to store the new slave information
+ * @param data In buffer with the marshalled data
+ * @return Zero on error, errno will be set accordingly, otherwise the number of read bytes.
+ * Destroy the slave information on error.
+ */
+size_t slave_unmarshal(slave_t* restrict this, char* restrict data)
+{
+ size_t key, n, m, rc = 2 * sizeof(int) + sizeof(ssize_t) + sizeof(size_t) + sizeof(uint64_t);
+ char* protocol;
+
+ this->wait_set = NULL;
+ this->client_id = NULL;
+ this->message_id = NULL;
+
+ /* buf_get_next(data, int, SLAVE_T_VERSION); */
+ buf_next(data, int, 1);
+
+ buf_get_next(data, int, this->closed);
+ buf_get_next(data, ssize_t, this->node);
+ buf_get_next(data, uint64_t, this->client);
+
+ n = (strlen((char*)data) + 1) * sizeof(char);
+ if ((this->client_id = malloc(n)) == NULL)
+ return 0;
+ memcpy(this->client_id, data, n);
+ data += n, rc += n;
+
+ n = (strlen((char*)data) + 1) * sizeof(char);
+ if ((this->message_id = malloc(n)) == NULL)
+ return 0;
+ memcpy(this->message_id, data, n);
+ data += n, rc += n;
+
+ if ((this->wait_set = malloc(sizeof(hash_table_t))) == NULL)
+ return 0;
+ if (hash_table_create(this->wait_set))
+ return 0;
+
+ buf_get_next(data, size_t, m);
+
+ while (m--)
+ {
+ n = (strlen((char*)data) + 1) * sizeof(char);
+ if ((protocol = malloc(n)) == NULL)
+ return 0;
+ memcpy(protocol, data, n);
+ data += n, rc += n;
+
+ key = (size_t)(void*)protocol;
+ if (hash_table_put(this->wait_set, key, 1) == 0)
+ if (errno)
+ {
+ free(protocol);
+ return 0;
+ }
+ }
+
+ return rc;
+}
+
+
+/**
+ * Pretend to unmarshal slave information
+ *
+ * @param data In buffer with the marshalled data
+ * @return The number of read bytes
+ */
+size_t slave_unmarshal_skip(char* restrict data)
+{
+ size_t n, m, rc = 2 * sizeof(int) + sizeof(ssize_t) + sizeof(size_t) + sizeof(uint64_t);
+
+ /* buf_get_next(data, int, SLAVE_T_VERSION); */
+ buf_next(data, int, 1);
+
+ buf_next(data, int, 1);
+ buf_next(data, ssize_t, 1);
+
+ n = (strlen((char*)data) + 1) * sizeof(char);
+ data += n, rc += n;
+
+ n = (strlen((char*)data) + 1) * sizeof(char);
+ data += n, rc += n;
+
+ buf_get_next(data, size_t, m);
+
+ while (m--)
+ {
+ n = (strlen((char*)data) + 1) * sizeof(char);
+ data += n, rc += n;
+ }
+
+ return rc;
+}
+
diff --git a/src/mds-registry/slave.h b/src/mds-registry/slave.h
new file mode 100644
index 0000000..ae8ae09
--- /dev/null
+++ b/src/mds-registry/slave.h
@@ -0,0 +1,160 @@
+/**
+ * mds — A micro-display server
+ * Copyright © 2014 Mattias Andrée (maandree@member.fsf.org)
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#ifndef MDS_MDS_REGISTRY_SLAVE_H
+#define MDS_MDS_REGISTRY_SLAVE_H
+
+
+#include <libmdsserver/hash-table.h>
+
+#include <stdlib.h>
+#include <stdint.h>
+#include <pthread.h>
+
+
+
+#define SLAVE_T_VERSION 0
+
+/**
+ * Slave information, a thread waiting for protocols to become available
+ */
+typedef struct slave /* TODO: add time-to-live */
+{
+ /**
+ * Set of protocols for which to wait that they become available
+ */
+ hash_table_t* wait_set;
+
+ /**
+ * The ID of the waiting client
+ */
+ uint64_t client;
+
+ /**
+ * The ID of the waiting client
+ */
+ char* client_id;
+
+ /**
+ * The ID of the message that triggered the waiting
+ */
+ char* message_id;
+
+ /**
+ * The slave's node in the linked list of slaves
+ */
+ ssize_t node;
+
+ /**
+ * Whether the client has been closed
+ */
+ volatile int closed;
+
+ /**
+ * The slave thread
+ */
+ pthread_t thread;
+
+} slave_t;
+
+
+
+/**
+ * Start a slave thread
+ *
+ * @param wait_set Set of protocols for which to wait that they become available
+ * @param recv_client_id The ID of the waiting client
+ * @param recv_message_id The ID of the message that triggered the waiting
+ * @return Non-zero on error
+ */
+int start_slave(hash_table_t* restrict wait_set, const char* restrict recv_client_id, const char* restrict recv_message_id);
+
+/**
+ * Close all slaves associated with a client
+ *
+ * @param client The client's ID
+ */
+void close_slaves(uint64_t client);
+
+/**
+ * Notify slaves that a protocol has become available
+ *
+ * @param command The protocol
+ * @return Non-zero on error, `ernno`will be set accordingly
+ */
+int advance_slaves(char* command);
+
+/**
+ * Create a slave
+ *
+ * @return The slave
+ */
+slave_t* slave_create(hash_table_t* restrict wait_set, const char* restrict recv_client_id, const char* restrict recv_message_id);
+
+
+/**
+ * Initialise a slave
+ *
+ * @param this Memory slot in which to store the new slave information
+ */
+void slave_initialise(slave_t* restrict this);
+
+/**
+ * Release all resources assoicated with a slave
+ *
+ * @param this The slave information
+ */
+void slave_destroy(slave_t* restrict this);
+
+/**
+ * Calculate the buffer size need to marshal slave information
+ *
+ * @param this The slave information
+ * @return The number of bytes to allocate to the output buffer
+ */
+size_t slave_marshal_size(const slave_t* restrict this) __attribute__((pure));
+
+/**
+ * Marshals slave information
+ *
+ * @param this The slave information
+ * @param data Output buffer for the marshalled data
+ * @return The number of bytes that have been written (everything will be written)
+ */
+size_t slave_marshal(const slave_t* restrict this, char* restrict data);
+
+/**
+ * Unmarshals slave information
+ *
+ * @param this Memory slot in which to store the new slave information
+ * @param data In buffer with the marshalled data
+ * @return Zero on error, errno will be set accordingly, otherwise the number of read bytes.
+ * Destroy the slave information on error.
+ */
+size_t slave_unmarshal(slave_t* restrict this, char* restrict data);
+
+/**
+ * Pretend to unmarshal slave information
+ *
+ * @param data In buffer with the marshalled data
+ * @return The number of read bytes
+ */
+size_t slave_unmarshal_skip(char* restrict data) __attribute__((pure));
+
+
+#endif
+
diff --git a/src/mds-server/mds-server.c b/src/mds-server/mds-server.c
index d6846ec..d0b6b87 100644
--- a/src/mds-server/mds-server.c
+++ b/src/mds-server/mds-server.c
@@ -248,7 +248,7 @@ int accept_connection(void)
* Master function for slave threads
*
* @param data Input data
- * @return Outout data
+ * @return Output data
*/
void* slave_loop(void* data)
{
diff --git a/src/mds-server/mds-server.h b/src/mds-server/mds-server.h
index 3bce7ed..4d6c5e7 100644
--- a/src/mds-server/mds-server.h
+++ b/src/mds-server/mds-server.h
@@ -35,7 +35,7 @@ int accept_connection(void);
* Master function for slave threads
*
* @param data Input data
- * @return Outout data
+ * @return Output data
*/
void* slave_loop(void* data);