From 32bf8aea8f3a46ee6c2808cd2369a7558b7b6bc7 Mon Sep 17 00:00:00 2001
From: Mattias Andrée <maandree@operamail.com>
Date: Sat, 2 Aug 2014 20:01:03 +0200
Subject: misc
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Mattias Andrée <maandree@operamail.com>
---
 src/libmdsserver/linked-list.h  |   1 -
 src/mds-base.c                  |  15 ++
 src/mds-base.h                  |   8 +
 src/mds-registry/globals.c      |  27 ++-
 src/mds-registry/globals.h      |  23 ++-
 src/mds-registry/mds-registry.c |  32 ++--
 src/mds-registry/registry.c     |  68 +++----
 src/mds-registry/signals.c      |  96 ++++++++++
 src/mds-registry/signals.h      |  26 +++
 src/mds-registry/slave.c        | 403 ++++++++++++++++++++++++++++++++++++++++
 src/mds-registry/slave.h        | 160 ++++++++++++++++
 src/mds-server/mds-server.c     |   2 +-
 src/mds-server/mds-server.h     |   2 +-
 13 files changed, 795 insertions(+), 68 deletions(-)
 create mode 100644 src/mds-registry/signals.c
 create mode 100644 src/mds-registry/signals.h
 create mode 100644 src/mds-registry/slave.c
 create mode 100644 src/mds-registry/slave.h

(limited to 'src')

diff --git a/src/libmdsserver/linked-list.h b/src/libmdsserver/linked-list.h
index 6ed7076..e77f33d 100644
--- a/src/libmdsserver/linked-list.h
+++ b/src/libmdsserver/linked-list.h
@@ -276,7 +276,6 @@ int linked_list_unmarshal(linked_list_t* restrict this, char* restrict data);
 #define foreach_linked_list_node(list, node)  \
   for (node = (list).edge; node = (list).next[node], node != (list).edge;)
 
-
 /**
  * Print the content of the list
  * 
diff --git a/src/mds-base.c b/src/mds-base.c
index 9baa42e..04ae25b 100644
--- a/src/mds-base.c
+++ b/src/mds-base.c
@@ -207,6 +207,18 @@ void __attribute__((weak)) server_initialised(void)
 }
 
 
+/**
+ * This function is called when an intraprocess signal
+ * that used to send a notification to a thread
+ * 
+ * @param  signo  The signal that has been received
+ */
+void received_noop(int signo)
+{
+  (void) signo;
+}
+
+
 /**
  * This function is called when a signal that
  * signals the server to re-exec has been received
@@ -473,6 +485,9 @@ int trap_signals(void)
   /* Implement clean exit on SIGINT. */
   fail_if (xsigaction(SIGINT, received_terminate) < 0);
   
+  /* Implement clean exit on SIGRTMIN. */
+  fail_if (xsigaction(SIGRTMIN, received_noop) < 0);
+  
   return 0;
  pfail:
   perror(*argv);
diff --git a/src/mds-base.h b/src/mds-base.h
index c8666a1..edc42fa 100644
--- a/src/mds-base.h
+++ b/src/mds-base.h
@@ -158,6 +158,14 @@ int connect_to_display(void); /* __attribute__((weak)) */
 void server_initialised(void); /* __attribute__((weak)) */
 
 
+/**
+ * This function is called when an intraprocess signal
+ * that used to send a notification to a thread
+ * 
+ * @param  signo  The signal that has been received
+ */
+void received_noop(int signo) __attribute__((weak, const));
+
 /**
  * This function should be implemented by the actual server implementation
  * if the server is multithreaded
diff --git a/src/mds-registry/globals.c b/src/mds-registry/globals.c
index 981cad1..e275eb7 100644
--- a/src/mds-registry/globals.c
+++ b/src/mds-registry/globals.c
@@ -50,17 +50,32 @@ char* send_buffer = NULL;
 size_t send_buffer_size = 0;
 
 /**
- * General mutex
+ * Used to temporarily store the old value when reallocating heap-allocations
  */
-pthread_mutex_t reg_mutex;
+char* old;
 
 /**
- * General condition
+ * The master thread
  */
-pthread_cond_t reg_cond;
+pthread_t master_thread;
 
 /**
- * Used to temporarily store the old value when reallocating heap-allocations
+ * The number of running slaves
  */
-char* old;
+size_t running_slaves = 0;
+
+/**
+ * List of running slaves
+ */
+linked_list_t slave_list;
+
+/**
+ * Mutex for slave data
+ */
+pthread_mutex_t slave_mutex;
+
+/**
+ * Condition for slave data
+ */
+pthread_cond_t slave_cond;
 
diff --git a/src/mds-registry/globals.h b/src/mds-registry/globals.h
index 9c4b42f..dcbf91d 100644
--- a/src/mds-registry/globals.h
+++ b/src/mds-registry/globals.h
@@ -21,6 +21,7 @@
 
 #include <libmdsserver/mds-message.h>
 #include <libmdsserver/hash-table.h>
+#include <libmdsserver/linked-list.h>
 
 #include <stdint.h>
 #include <stddef.h>
@@ -62,19 +63,29 @@ extern char* send_buffer;
 extern size_t send_buffer_size;
 
 /**
- * General mutex
+ * Used to temporarily store the old value when reallocating heap-allocations
  */
-extern pthread_mutex_t reg_mutex;
+extern char* old;
 
 /**
- * General condition
+ * The number of running slaves
  */
-extern pthread_cond_t reg_cond;
+extern size_t running_slaves;
 
 /**
- * Used to temporarily store the old value when reallocating heap-allocations
+ * List of running slaves
  */
-extern char* old;
+extern linked_list_t slave_list; /* TODO (un)marshal */
+
+/**
+ * Mutex for slave data
+ */
+extern pthread_mutex_t slave_mutex;
+
+/**
+ * Condition for slave data
+ */
+extern pthread_cond_t slave_cond;
 
 
 #endif
diff --git a/src/mds-registry/mds-registry.c b/src/mds-registry/mds-registry.c
index 96cf05d..aa759c8 100644
--- a/src/mds-registry/mds-registry.c
+++ b/src/mds-registry/mds-registry.c
@@ -23,6 +23,7 @@
 
 #include <libmdsserver/macros.h>
 #include <libmdsserver/hash-help.h>
+#include <libmdsserver/linked-list.h>
 
 #include <errno.h>
 #include <stdio.h>
@@ -54,20 +55,20 @@ server_characteristics_t server_characteristics =
  */
 int preinitialise_server(void)
 {
-  if ((errno = pthread_mutex_init(&reg_mutex, NULL)))
-    {
-      perror(*argv);
-      return 1;
-    }
+  int stage = 0;
   
-  if ((errno = pthread_cond_init(&reg_cond, NULL)))
-    {
-      perror(*argv);
-      pthread_mutex_destroy(&reg_mutex);
-      return 1;
-    }
+  fail_if ((errno = pthread_mutex_init(&slave_mutex, NULL)));  stage++;
+  fail_if ((errno = pthread_cond_init(&slave_cond, NULL)));  stage++;
+  
+  linked_list_create(&slave_list, 2);
   
   return 0;
+  
+ pfail:
+  perror(*argv);
+  if (stage >= 1)  pthread_mutex_destroy(&slave_mutex);
+  if (stage >= 2)  pthread_cond_destroy(&slave_cond);
+  return 1;
 }
 
 
@@ -177,13 +178,18 @@ int master_loop(void)
  pfail:
   perror(*argv);
  fail:
+  /* Join with all slaves threads. */
+  with_mutex (slave_mutex,
+              while (running_slaves > 0)
+                pthread_cond_wait(&slave_cond, &slave_mutex););
+  
   if (rc || !reexecing)
     {
       hash_table_destroy(&reg_table, (free_func*)reg_table_free_key, (free_func*)reg_table_free_value);
       mds_message_destroy(&received);
     }
-  pthread_mutex_destroy(&reg_mutex);
-  pthread_cond_destroy(&reg_cond);
+  pthread_mutex_destroy(&slave_mutex);
+  pthread_cond_destroy(&slave_cond);
   free(send_buffer);
   return rc;
 }
diff --git a/src/mds-registry/registry.c b/src/mds-registry/registry.c
index 6fed005..8c0132e 100644
--- a/src/mds-registry/registry.c
+++ b/src/mds-registry/registry.c
@@ -19,6 +19,7 @@
 
 #include "util.h"
 #include "globals.h"
+#include "slave.h"
 
 #include "../mds-base.h"
 
@@ -31,6 +32,7 @@
 #include <string.h>
 #include <stdio.h>
 #include <stdlib.h>
+#include <pthread.h>
 
 
 
@@ -43,14 +45,14 @@
 static int handle_close_message(void)
 {
   /* Servers do not close too often, there is no need to
-     optimise this with another hash table. */
+     optimise this with another hash table. Doing so would
+     also require some caution because the keys are 32-bit
+     on 32-bit computers, and the client ID:s are 64-bit. */
   
   size_t i, j, ptr = 0, size = 1;
   size_t* keys = NULL;
   size_t* old_keys;
   
-  fail_if ((errno = pthread_mutex_lock(&reg_mutex)));
-  
   
   /* Remove server for all protocols. */
   
@@ -75,9 +77,19 @@ static int handle_close_message(void)
 	      goto fail;
 	    keys[ptr++] = entry->key;
 	  }
+	
+	
+	/* Mark client as closed. */
+	
+	close_slaves(client);
       }
   
   
+  /* Close slaves those clients have closed. */
+  
+  with_mutex (slave_mutex, pthread_cond_broadcast(&slave_cond););
+  
+  
   /* Remove protocol that no longer have any supporting servers. */
   
   for (i = 0; i < ptr; i++)
@@ -93,15 +105,12 @@ static int handle_close_message(void)
       free(command);
     }
   
-  pthread_mutex_unlock(&reg_mutex);
-  
   free(keys);
   return 0;
  pfail:
   perror(*argv);
  fail:
   free(keys);
-  pthread_mutex_unlock(&reg_mutex);
   return -1;
 }
 
@@ -153,6 +162,9 @@ static int registry_action_add(int has_key, char* command, size_t command_key, u
 	}
     }
   
+  /* Notify slaves. */
+  fail_if (advance_slaves(command));
+  
   return 0;
  pfail:
   perror(*argv);
@@ -258,11 +270,13 @@ static int registry_action(size_t length, int action, const char* recv_client_id
   if (action == 0)
     {
       wait_set = malloc(sizeof(hash_table_t));
+      if (wait_set == NULL)
+	return -1;
       if (hash_table_create(wait_set))
 	{
 	  hash_table_destroy(wait_set, NULL, NULL);
 	  free(wait_set);
-	  goto pfail;
+	  return -1;
 	}
       wait_set->key_comparator = (compare_func*)string_comparator;
       wait_set->hasher = (hash_func*)string_hash;
@@ -292,8 +306,6 @@ static int registry_action(size_t length, int action, const char* recv_client_id
   /* For all protocols in the payload, either add or remove
      them from or to the protocl table or the wait set. */
   
-  fail_if ((errno = pthread_mutex_lock(&reg_mutex)));
-  
   for (begin = 0; begin < length;)
     {
       char* end = rawmemchr(payload + begin, '\n');
@@ -305,28 +317,16 @@ static int registry_action(size_t length, int action, const char* recv_client_id
       
       if (len > 0)
 	if (registry_action_act(command, action, client, wait_set))
-	  goto fail_in_mutex;
+	  return -1;
     }
   
-  pthread_mutex_unlock(&reg_mutex);
-    
   
   /* If ‘Action: wait’, start a new thread that waits for the protocols and the responds. */
   
   if (action == 0)
-    {
-      /* FIXME */
-    }
+    return start_slave(wait_set, recv_client_id, recv_message_id);
   
   return 0;
-  
-  
- pfail:
-  perror(*argv);
-  return -1;
- fail_in_mutex:
-  pthread_mutex_unlock(&reg_mutex);
-  return -1;
 }
 
 
@@ -348,12 +348,11 @@ static int list_registry(const char* recv_client_id, const char* recv_message_id
   
   if (send_buffer_size == 0)
     {
-      fail_if (xmalloc(send_buffer, 256, char));
+      if (xmalloc(send_buffer, 256, char))
+	return -1;
       send_buffer_size = 256;
     }
   
-  fail_if ((errno = pthread_mutex_lock(&reg_mutex)));
-  
   
   /* Add all protocols to the send buffer. */
   
@@ -366,7 +365,7 @@ static int list_registry(const char* recv_client_id, const char* recv_message_id
       /* Make sure the send buffer can fit all protocols. */
       while (ptr + len + 1 >= send_buffer_size)
 	if (growalloc(old, send_buffer, send_buffer_size, char))
-	  goto fail_in_mutex;
+	  return -1;
       
       memcpy(send_buffer + ptr, command, len * sizeof(char));
       ptr += len;
@@ -381,7 +380,7 @@ static int list_registry(const char* recv_client_id, const char* recv_message_id
   
   while (ptr + i >= send_buffer_size)
     if (growalloc(old, send_buffer, send_buffer_size, char))
-      goto fail_in_mutex;
+      return -1;
   
   
   /* Construct message headers. */
@@ -391,21 +390,10 @@ static int list_registry(const char* recv_client_id, const char* recv_message_id
   /* Increase message ID. */
   message_id = message_id == INT32_MAX ? 0 : (message_id + 1);
   
-  pthread_mutex_unlock(&reg_mutex);
-  
   /* Send message. */
   if (full_send(send_buffer + ptr, strlen(send_buffer + ptr)))
     return 1;
   return full_send(send_buffer, ptr);
-  
-  
- fail_in_mutex:
-  pthread_mutex_unlock(&reg_mutex);
-  return -1;
-  
- pfail:
-  perror(*argv);
-  return -1;
 }
 
 
@@ -413,7 +401,7 @@ static int list_registry(const char* recv_client_id, const char* recv_message_id
  * Handle the received message containing ‘Command: register’-header–value
  * 
  * @return  Zero on success -1 on error or interruption,
- *          errno will be set accordingly
+ *          `errno` will be set accordingly
  */
 static int handle_register_message(void)
 {
diff --git a/src/mds-registry/signals.c b/src/mds-registry/signals.c
new file mode 100644
index 0000000..7d1f5b4
--- /dev/null
+++ b/src/mds-registry/signals.c
@@ -0,0 +1,96 @@
+/**
+ * mds — A micro-display server
+ * Copyright © 2014  Mattias Andrée (maandree@member.fsf.org)
+ * 
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ * 
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+#include "signals.h"
+
+#include "globals.h"
+#include "slave.h"
+
+#include "../mds-base.h"
+
+#include <libmdsserver/linked-list.h>
+#include <libmdsserver/macros.h>
+
+#include <stdio.h>
+#include <pthread.h>
+#include <errno.h>
+
+
+
+/**
+ * Send a singal to all threads except the current thread
+ * 
+ * @param  signo  The signal
+ */
+static void signal_all(int signo)
+{      
+  pthread_t current_thread;
+  ssize_t node;
+  
+  current_thread = pthread_self();
+  
+  if (pthread_equal(current_thread, master_thread) == 0)
+    pthread_kill(master_thread, signo);
+  
+  with_mutex (slave_mutex,
+	      foreach_linked_list_node (slave_list, node)
+	        {
+		  slave_t* value = (slave_t*)(void*)(slave_list.values[node]);
+		  if (pthread_equal(current_thread, value->thread) == 0)
+		    pthread_kill(value->thread, signo);
+		}
+	      );
+}
+
+
+/**
+ * This function is called when a signal that
+ * signals the server to re-exec has been received
+ * 
+ * When this function is invoked, it should set `reexecing` to a non-zero value
+ * 
+ * @param  signo  The signal that has been received
+ */
+void received_reexec(int signo)
+{
+  if (reexecing == 0)
+    {
+      terminating = reexecing = 1;
+      eprint("re-exec signal received.");
+      signal_all(signo);
+    }
+}
+
+
+/**
+ * This function is called when a signal that
+ * signals the server to re-exec has been received
+ * 
+ * When this function is invoked, it should set `terminating` to a non-zero value
+ * 
+ * @param  signo  The signal that has been received
+ */
+void received_terminate(int signo)
+{
+  if (terminating == 0)
+    {
+      terminating = 1;
+      eprint("terminate signal received.");
+      signal_all(signo);
+    }
+}
+
diff --git a/src/mds-registry/signals.h b/src/mds-registry/signals.h
new file mode 100644
index 0000000..65070e9
--- /dev/null
+++ b/src/mds-registry/signals.h
@@ -0,0 +1,26 @@
+/**
+ * mds — A micro-display server
+ * Copyright © 2014  Mattias Andrée (maandree@member.fsf.org)
+ * 
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ * 
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+#ifndef MDS_MDS_REGISTRY_SIGNALS_H
+#define MDS_MDS_REGISTRY_SIGNALS_H
+
+
+#include "../mds-base.h"
+
+
+#endif
+
diff --git a/src/mds-registry/slave.c b/src/mds-registry/slave.c
new file mode 100644
index 0000000..9bc2fd1
--- /dev/null
+++ b/src/mds-registry/slave.c
@@ -0,0 +1,403 @@
+/**
+ * mds — A micro-display server
+ * Copyright © 2014  Mattias Andrée (maandree@member.fsf.org)
+ * 
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ * 
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+#include "slave.h"
+
+#include "util.h"
+#include "globals.h"
+
+#include "../mds-base.h"
+
+#include <libmdsserver/macros.h>
+
+#include <string.h>
+#include <errno.h>
+#include <pthread.h>
+
+
+
+/**
+ * Master function for slave threads
+ * 
+ * @param   data  Input data
+ * @return        Output data
+ */
+static void* slave_loop(void* data)
+{
+  slave_t* slave = data;
+  
+  if (slave->closed)
+    goto done;
+  
+  /* Set up traps for especially handled signals. */
+  fail_if (trap_signals() < 0);
+  
+  fail_if ((errno = pthread_mutex_lock(&slave_mutex)));
+  
+  while (!reexecing && !terminating)
+    {
+      if ((slave->wait_set->size == 0) || slave->closed)
+	break;
+      pthread_cond_wait(&slave_cond, &slave_mutex);
+    }
+  
+  pthread_mutex_unlock(&slave_mutex);
+  
+  if (!(slave->closed) && slave->wait_set->size)
+    ; /* FIXME send inside slave_mutex */
+  
+  goto done;
+  
+ pfail:
+  perror(*argv);
+ done:
+  with_mutex (slave_mutex,
+	      if (!reexecing)
+		linked_list_remove(&slave_list, slave->node);
+	      running_slaves--;
+	      if (running_slaves == 0)
+		pthread_cond_signal(&slave_cond);
+	     );
+  return NULL;
+}
+
+
+/**
+ * Start a slave thread
+ * 
+ * @param   wait_set         Set of protocols for which to wait that they become available
+ * @param   recv_client_id   The ID of the waiting client
+ * @param   recv_message_id  The ID of the message that triggered the waiting
+ * @return                   Non-zero on error
+ */
+int start_slave(hash_table_t* restrict wait_set, const char* restrict recv_client_id, const char* restrict recv_message_id)
+{
+  slave_t* slave = slave_create(wait_set, recv_client_id, recv_message_id);
+  size_t slave_address;
+  ssize_t node = LINKED_LIST_UNUSED;
+  
+  fail_if (slave == NULL);
+  fail_if ((errno = pthread_mutex_lock(&slave_mutex)));
+  
+  slave_address = (size_t)(void*)slave;
+  slave->node = node = linked_list_insert_end(&slave_list, slave_address);
+  if (slave->node == LINKED_LIST_UNUSED)
+    goto pfail_in_mutex;
+  
+  if ((errno = pthread_create(&(slave->thread), NULL, slave_loop, (void*)(intptr_t)slave)))
+    goto pfail_in_mutex;
+  
+  if ((errno = pthread_detach(slave->thread)))
+    perror(*argv);
+  
+  running_slaves++;
+  pthread_mutex_unlock(&slave_mutex);
+  
+  return 0;
+ pfail:
+  perror(*argv);
+  goto more_fail;
+ pfail_in_mutex:
+  perror(*argv);
+  pthread_mutex_unlock(&slave_mutex);
+ more_fail:
+  if (node != LINKED_LIST_UNUSED)
+    linked_list_remove(&slave_list, node);
+  return -1;
+}
+
+
+/**
+ * Close all slaves associated with a client
+ * 
+ * @param  client  The client's ID
+ */
+void close_slaves(uint64_t client)
+{
+  ssize_t node;
+  with_mutex (slave_mutex,
+	      foreach_linked_list_node (slave_list, node)
+	        {
+		  slave_t* slave = (slave_t*)(void*)(slave_list.values[node]);
+		  if (slave->client == client)
+		    slave->closed = 1;
+		}
+	     );
+}
+
+
+/**
+ * Notify slaves that a protocol has become available
+ * 
+ * @param   command  The protocol
+ * @return           Non-zero on error, `ernno`will be set accordingly
+ */
+int advance_slaves(char* command)
+{
+  size_t key = (size_t)(void*)command;
+  int signal_slaves = 0;
+  ssize_t node;
+  
+  if ((errno = pthread_mutex_lock(&slave_mutex)))
+    return -1;
+  
+  foreach_linked_list_node (slave_list, node)
+    {
+      slave_t* slave = (slave_t*)(void*)(slave_list.values[node]);
+      if (hash_table_contains_key(slave->wait_set, key))
+	{
+	  hash_table_remove(slave->wait_set, key);
+	  signal_slaves |= slave->wait_set == 0;
+	}
+    }
+  
+  if (signal_slaves)
+    pthread_cond_broadcast(&slave_cond);
+
+  pthread_mutex_unlock(&slave_mutex);
+  return 0;
+}
+
+
+/**
+ * Create a slave
+ * 
+ * @param   wait_set         Set of protocols for which to wait that they become available
+ * @param   recv_client_id   The ID of the waiting client
+ * @param   recv_message_id  The ID of the message that triggered the waiting
+ * @return                   The slave, `NULL` on error, `errno` will be set accordingly
+ */
+slave_t* slave_create(hash_table_t* restrict wait_set, const char* restrict recv_client_id, const char* restrict recv_message_id)
+{
+  slave_t* restrict rc = NULL;
+  
+  if (xmalloc(rc, 1, slave_t))
+    return NULL;
+  
+  slave_initialise(rc);
+  rc->wait_set = wait_set;
+  rc->client = parse_client_id(recv_client_id);
+  
+  if ((rc->client_id = strdup(recv_client_id)) == NULL)
+    goto fail;
+  
+  if ((rc->message_id = strdup(recv_message_id)) == NULL)
+    goto fail;
+  
+  return rc;
+  
+ fail:
+  slave_destroy(rc);
+  free(rc);
+  return NULL;
+}
+
+
+/**
+ * Initialise a slave
+ * 
+ * @param  this  Memory slot in which to store the new slave information
+ */
+void slave_initialise(slave_t* restrict this)
+{
+  this->wait_set = NULL;
+  this->client_id = NULL;
+  this->message_id = NULL;
+  this->closed = 0;
+}
+
+
+/**
+ * Release all resources assoicated with a slave
+ * 
+ * @param  this  The slave information
+ */
+void slave_destroy(slave_t* restrict this)
+{
+  if (this->wait_set != NULL)
+    {
+      hash_table_destroy(this->wait_set, (free_func*)reg_table_free_key, NULL);
+      free(this->wait_set);
+      this->wait_set = NULL;
+    }
+  
+  free(this->client_id);
+  this->client_id = NULL;
+  
+  free(this->message_id);
+  this->message_id = NULL;
+}
+
+
+/**
+ * Calculate the buffer size need to marshal slave information
+ * 
+ * @param   this  The slave information
+ * @return        The number of bytes to allocate to the output buffer
+ */
+size_t slave_marshal_size(const slave_t* restrict this)
+{
+  size_t rc = 2 * sizeof(int) + sizeof(ssize_t) + sizeof(size_t) + sizeof(uint64_t);
+  hash_entry_t* restrict entry;
+  size_t n;
+  
+  rc += (strlen(this->client_id) + strlen(this->message_id) + 2) * sizeof(char);
+  
+  foreach_hash_table_entry (*(this->wait_set), n, entry)
+    {
+      char* protocol = (char*)(void*)(entry->key);
+      rc += strlen(protocol) + 1;
+    }
+  
+  return rc;
+}
+
+
+/**
+ * Marshals slave information
+ * 
+ * @param   this  The slave information
+ * @param   data  Output buffer for the marshalled data
+ * @return        The number of bytes that have been written (everything will be written)
+ */
+size_t slave_marshal(const slave_t* restrict this, char* restrict data)
+{
+  hash_entry_t* restrict entry;
+  size_t n;
+  
+  buf_set_next(data, int, SLAVE_T_VERSION);
+  buf_set_next(data, int, this->closed);
+  buf_set_next(data, ssize_t, this->node);
+  buf_set_next(data, uint64_t, this->client);
+  
+  memcpy(data, this->client_id, (strlen(this->client_id) + 1) * sizeof(char));
+  data += strlen(this->client_id) + 1;
+  
+  memcpy(data, this->message_id, (strlen(this->message_id) + 1) * sizeof(char));
+  data += strlen(this->message_id) + 1;
+  
+  n = this->wait_set->size;
+  buf_set_next(data, size_t, n);
+  
+  foreach_hash_table_entry (*(this->wait_set), n, entry)
+    {
+      char* restrict protocol = (char*)(void*)(entry->key);
+      memcpy(data, protocol, (strlen(protocol) + 1) * sizeof(char));
+      data += strlen(protocol) + 1;
+    }
+  
+  return slave_marshal_size(this);
+}
+
+
+/**
+ * Unmarshals slave information
+ * 
+ * @param   this  Memory slot in which to store the new slave information
+ * @param   data  In buffer with the marshalled data
+ * @return        Zero on error, errno will be set accordingly, otherwise the number of read bytes.
+ *                Destroy the slave information on error.
+ */
+size_t slave_unmarshal(slave_t* restrict this, char* restrict data)
+{
+  size_t key, n, m, rc = 2 * sizeof(int) + sizeof(ssize_t) + sizeof(size_t) + sizeof(uint64_t);
+  char* protocol;
+  
+  this->wait_set = NULL;
+  this->client_id = NULL;
+  this->message_id = NULL;
+  
+  /* buf_get_next(data, int, SLAVE_T_VERSION); */
+  buf_next(data, int, 1);
+  
+  buf_get_next(data, int, this->closed);
+  buf_get_next(data, ssize_t, this->node);
+  buf_get_next(data, uint64_t, this->client);
+  
+  n = (strlen((char*)data) + 1) * sizeof(char);
+  if ((this->client_id = malloc(n)) == NULL)
+    return 0;
+  memcpy(this->client_id, data, n);
+  data += n, rc += n;
+  
+  n = (strlen((char*)data) + 1) * sizeof(char);
+  if ((this->message_id = malloc(n)) == NULL)
+    return 0;
+  memcpy(this->message_id, data, n);
+  data += n, rc += n;
+  
+  if ((this->wait_set = malloc(sizeof(hash_table_t))) == NULL)
+    return 0;
+  if (hash_table_create(this->wait_set))
+    return 0;
+  
+  buf_get_next(data, size_t, m);
+  
+  while (m--)
+    {
+      n = (strlen((char*)data) + 1) * sizeof(char);
+      if ((protocol = malloc(n)) == NULL)
+	return 0;
+      memcpy(protocol, data, n);
+      data += n, rc += n;
+      
+      key = (size_t)(void*)protocol;
+      if (hash_table_put(this->wait_set, key, 1) == 0)
+	if (errno)
+	  {
+	    free(protocol);
+	    return 0;
+	  }
+    }
+  
+  return rc;
+}
+
+
+/**
+ * Pretend to unmarshal slave information
+ * 
+ * @param   data  In buffer with the marshalled data
+ * @return        The number of read bytes
+ */
+size_t slave_unmarshal_skip(char* restrict data)
+{
+  size_t n, m, rc = 2 * sizeof(int) + sizeof(ssize_t) + sizeof(size_t) + sizeof(uint64_t);
+  
+  /* buf_get_next(data, int, SLAVE_T_VERSION); */
+  buf_next(data, int, 1);
+  
+  buf_next(data, int, 1);
+  buf_next(data, ssize_t, 1);
+  
+  n = (strlen((char*)data) + 1) * sizeof(char);
+  data += n, rc += n;
+  
+  n = (strlen((char*)data) + 1) * sizeof(char);
+  data += n, rc += n;
+  
+  buf_get_next(data, size_t, m);
+  
+  while (m--)
+    {
+      n = (strlen((char*)data) + 1) * sizeof(char);
+      data += n, rc += n;
+    }
+  
+  return rc;
+}
+
diff --git a/src/mds-registry/slave.h b/src/mds-registry/slave.h
new file mode 100644
index 0000000..ae8ae09
--- /dev/null
+++ b/src/mds-registry/slave.h
@@ -0,0 +1,160 @@
+/**
+ * mds — A micro-display server
+ * Copyright © 2014  Mattias Andrée (maandree@member.fsf.org)
+ * 
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ * 
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+#ifndef MDS_MDS_REGISTRY_SLAVE_H
+#define MDS_MDS_REGISTRY_SLAVE_H
+
+
+#include <libmdsserver/hash-table.h>
+
+#include <stdlib.h>
+#include <stdint.h>
+#include <pthread.h>
+
+
+
+#define SLAVE_T_VERSION  0
+
+/**
+ * Slave information, a thread waiting for protocols to become available
+ */
+typedef struct slave /* TODO: add time-to-live */
+{
+  /**
+   * Set of protocols for which to wait that they become available
+   */
+  hash_table_t* wait_set;
+  
+  /**
+   * The ID of the waiting client
+   */
+  uint64_t client;
+  
+  /**
+   * The ID of the waiting client
+   */
+  char* client_id;
+  
+  /**
+   * The ID of the message that triggered the waiting
+   */
+  char* message_id;
+  
+  /**
+   * The slave's node in the linked list of slaves
+   */
+  ssize_t node;
+  
+  /**
+   * Whether the client has been closed
+   */
+  volatile int closed;
+  
+  /**
+   * The slave thread
+   */
+  pthread_t thread;
+  
+} slave_t;
+
+
+
+/**
+ * Start a slave thread
+ * 
+ * @param   wait_set         Set of protocols for which to wait that they become available
+ * @param   recv_client_id   The ID of the waiting client
+ * @param   recv_message_id  The ID of the message that triggered the waiting
+ * @return                   Non-zero on error
+ */
+int start_slave(hash_table_t* restrict wait_set, const char* restrict recv_client_id, const char* restrict recv_message_id);
+
+/**
+ * Close all slaves associated with a client
+ * 
+ * @param  client  The client's ID
+ */
+void close_slaves(uint64_t client);
+
+/**
+ * Notify slaves that a protocol has become available
+ * 
+ * @param   command  The protocol
+ * @return           Non-zero on error, `ernno`will be set accordingly
+ */
+int advance_slaves(char* command);
+
+/**
+ * Create a slave
+ * 
+ * @return  The slave
+ */
+slave_t* slave_create(hash_table_t* restrict wait_set, const char* restrict recv_client_id, const char* restrict recv_message_id);
+
+
+/**
+ * Initialise a slave
+ * 
+ * @param  this  Memory slot in which to store the new slave information
+ */
+void slave_initialise(slave_t* restrict this);
+
+/**
+ * Release all resources assoicated with a slave
+ * 
+ * @param  this  The slave information
+ */
+void slave_destroy(slave_t* restrict this);
+
+/**
+ * Calculate the buffer size need to marshal slave information
+ * 
+ * @param   this  The slave information
+ * @return        The number of bytes to allocate to the output buffer
+ */
+size_t slave_marshal_size(const slave_t* restrict this) __attribute__((pure));
+
+/**
+ * Marshals slave information
+ * 
+ * @param   this  The slave information
+ * @param   data  Output buffer for the marshalled data
+ * @return        The number of bytes that have been written (everything will be written)
+ */
+size_t slave_marshal(const slave_t* restrict this, char* restrict data);
+
+/**
+ * Unmarshals slave information
+ * 
+ * @param   this  Memory slot in which to store the new slave information
+ * @param   data  In buffer with the marshalled data
+ * @return        Zero on error, errno will be set accordingly, otherwise the number of read bytes.
+ *                Destroy the slave information on error.
+ */
+size_t slave_unmarshal(slave_t* restrict this, char* restrict data);
+
+/**
+ * Pretend to unmarshal slave information
+ * 
+ * @param   data  In buffer with the marshalled data
+ * @return        The number of read bytes
+ */
+size_t slave_unmarshal_skip(char* restrict data) __attribute__((pure));
+
+
+#endif
+
diff --git a/src/mds-server/mds-server.c b/src/mds-server/mds-server.c
index d6846ec..d0b6b87 100644
--- a/src/mds-server/mds-server.c
+++ b/src/mds-server/mds-server.c
@@ -248,7 +248,7 @@ int accept_connection(void)
  * Master function for slave threads
  * 
  * @param   data  Input data
- * @return        Outout data
+ * @return        Output data
  */
 void* slave_loop(void* data)
 {
diff --git a/src/mds-server/mds-server.h b/src/mds-server/mds-server.h
index 3bce7ed..4d6c5e7 100644
--- a/src/mds-server/mds-server.h
+++ b/src/mds-server/mds-server.h
@@ -35,7 +35,7 @@ int accept_connection(void);
  * Master function for slave threads
  * 
  * @param   data  Input data
- * @return        Outout data
+ * @return        Output data
  */
 void* slave_loop(void* data);
 
-- 
cgit v1.2.3-70-g09d2