aboutsummaryrefslogtreecommitdiffstats
path: root/src/mds-registry
diff options
context:
space:
mode:
authorMattias Andrée <maandree@kth.se>2017-11-05 00:09:50 +0100
committerMattias Andrée <maandree@kth.se>2017-11-05 00:09:50 +0100
commit9e8dec188d55ca1f0a3b33acab702ced8ed07a18 (patch)
treecbb43c22e72674dc672e645e6596358e3868568e /src/mds-registry
parenttypo (diff)
downloadmds-9e8dec188d55ca1f0a3b33acab702ced8ed07a18.tar.gz
mds-9e8dec188d55ca1f0a3b33acab702ced8ed07a18.tar.bz2
mds-9e8dec188d55ca1f0a3b33acab702ced8ed07a18.tar.xz
Work on changing style, and an important typo fix
Signed-off-by: Mattias Andrée <maandree@kth.se>
Diffstat (limited to '')
-rw-r--r--src/mds-registry/globals.c6
-rw-r--r--src/mds-registry/globals.h8
-rw-r--r--src/mds-registry/mds-registry.c246
-rw-r--r--src/mds-registry/mds-registry.h1
-rw-r--r--src/mds-registry/reexec.c296
-rw-r--r--src/mds-registry/reexec.h1
-rw-r--r--src/mds-registry/registry.c699
-rw-r--r--src/mds-registry/registry.h1
-rw-r--r--src/mds-registry/signals.c33
-rw-r--r--src/mds-registry/signals.h1
-rw-r--r--src/mds-registry/slave.c677
-rw-r--r--src/mds-registry/slave.h119
-rw-r--r--src/mds-registry/util.c11
-rw-r--r--src/mds-registry/util.h2
14 files changed, 1029 insertions, 1072 deletions
diff --git a/src/mds-registry/globals.c b/src/mds-registry/globals.c
index 6a93f3f..98d54d5 100644
--- a/src/mds-registry/globals.c
+++ b/src/mds-registry/globals.c
@@ -18,7 +18,6 @@
#include "globals.h"
-
/**
* Value of the ‘Message ID’ header for the next message
*/
@@ -42,7 +41,7 @@ hash_table_t reg_table;
/**
* Reusable buffer for data to send
*/
-char* send_buffer = NULL;
+char *send_buffer = NULL;
/**
* The size of `send_buffer`
@@ -52,7 +51,7 @@ size_t send_buffer_size = 0;
/**
* Used to temporarily store the old value when reallocating heap-allocations
*/
-char* old;
+char *old;
/**
* The master thread
@@ -78,4 +77,3 @@ pthread_mutex_t slave_mutex;
* Condition for slave data
*/
pthread_cond_t slave_cond;
-
diff --git a/src/mds-registry/globals.h b/src/mds-registry/globals.h
index 68bd5c6..e1d7aab 100644
--- a/src/mds-registry/globals.h
+++ b/src/mds-registry/globals.h
@@ -28,8 +28,7 @@
#include <pthread.h>
-#define MDS_REGISTRY_VARS_VERSION 0
-
+#define MDS_REGISTRY_VARS_VERSION 0
/**
@@ -55,7 +54,7 @@ extern hash_table_t reg_table;
/**
* Reusable buffer for data to send
*/
-extern char* send_buffer;
+extern char *send_buffer;
/**
* The size of `send_buffer`
@@ -65,7 +64,7 @@ extern size_t send_buffer_size;
/**
* Used to temporarily store the old value when reallocating heap-allocations
*/
-extern char* old;
+extern char *old;
/**
* The number of running slaves
@@ -89,4 +88,3 @@ extern pthread_cond_t slave_cond;
#endif
-
diff --git a/src/mds-registry/mds-registry.c b/src/mds-registry/mds-registry.c
index 6d93517..8927a62 100644
--- a/src/mds-registry/mds-registry.c
+++ b/src/mds-registry/mds-registry.c
@@ -38,15 +38,14 @@
*
* This tells the server-base how to behave
*/
-server_characteristics_t server_characteristics =
- {
- .require_privileges = 0,
- .require_display = 1,
- .require_respawn_info = 0,
- .sanity_check_argc = 1,
- .fork_for_safety = 0,
- .danger_is_deadly = 0
- };
+server_characteristics_t server_characteristics = {
+ .require_privileges = 0,
+ .require_display = 1,
+ .require_respawn_info = 0,
+ .sanity_check_argc = 1,
+ .fork_for_safety = 0,
+ .danger_is_deadly = 0
+};
@@ -57,8 +56,8 @@ server_characteristics_t server_characteristics =
* @param length:size_t The length of the message
* @return :int Zero on success, -1 on error
*/
-#define full_send(message, length) \
- ((full_send)(socket_fd, message, length))
+#define full_send(message, length)\
+ ((full_send)(socket_fd, message, length))
/**
@@ -69,20 +68,20 @@ server_characteristics_t server_characteristics =
*/
int preinitialise_server(void)
{
- int stage = 0;
-
- fail_if ((errno = pthread_mutex_init(&slave_mutex, NULL))); stage++;
- fail_if ((errno = pthread_cond_init(&slave_cond, NULL))); stage++;
-
- linked_list_create(&slave_list, 2);
-
- return 0;
-
- fail:
- xperror(*argv);
- if (stage >= 1) pthread_mutex_destroy(&slave_mutex);
- if (stage >= 2) pthread_cond_destroy(&slave_cond);
- return 1;
+ int stage = 0;
+
+ fail_if ((errno = pthread_mutex_init(&slave_mutex, NULL))); stage++;
+ fail_if ((errno = pthread_cond_init(&slave_cond, NULL))); stage++;
+
+ linked_list_create(&slave_list, 2);
+
+ return 0;
+
+fail:
+ xperror(*argv);
+ if (stage >= 1) pthread_mutex_destroy(&slave_mutex);
+ if (stage >= 2) pthread_cond_destroy(&slave_cond);
+ return 1;
}
@@ -92,43 +91,44 @@ int preinitialise_server(void)
*
* @return Non-zero on error
*/
-int initialise_server(void)
+int
+initialise_server(void)
{
- int stage = 0;
- const char* const message =
- "Command: intercept\n"
- "Message ID: 0\n"
- "Length: 32\n"
- "\n"
- "Command: register\n"
- "Client closed\n"
- /* -- NEXT MESSAGE -- */
- "Command: reregister\n"
- "Message ID: 1\n"
- "\n";
-
- /* We are asking all servers to reregister their
- protocols for two reasons:
-
- 1) The server would otherwise not get registrations
- from servers started before this server.
- 2) If this server crashes we may miss registrations
- that happen between the crash and the recovery.
- */
-
- fail_if (full_send(message, strlen(message))); stage++;
- fail_if (hash_table_create_tuned(&reg_table, 32));
- reg_table.key_comparator = (compare_func*)string_comparator;
- reg_table.hasher = (hash_func*)string_hash;
- fail_if (server_initialised() < 0); stage++;
- fail_if (mds_message_initialise(&received));
-
- return 0;
- fail:
- xperror(*argv);
- if (stage >= 1) hash_table_destroy(&reg_table, NULL, NULL);
- if (stage >= 2) mds_message_destroy(&received);
- return 1;
+ int stage = 0;
+ const char *const message =
+ "Command: intercept\n"
+ "Message ID: 0\n"
+ "Length: 32\n"
+ "\n"
+ "Command: register\n"
+ "Client closed\n"
+ /* -- NEXT MESSAGE -- */
+ "Command: reregister\n"
+ "Message ID: 1\n"
+ "\n";
+
+ /* We are asking all servers to reregister their
+ protocols for two reasons:
+
+ 1) The server would otherwise not get registrations
+ from servers started before this server.
+ 2) If this server crashes we may miss registrations
+ that happen between the crash and the recovery.
+ */
+
+ fail_if (full_send(message, strlen(message))); stage++;
+ fail_if (hash_table_create_tuned(&reg_table, 32));
+ reg_table.key_comparator = (compare_func*)string_comparator;
+ reg_table.hasher = (hash_func*)string_hash;
+ fail_if (server_initialised() < 0); stage++;
+ fail_if (mds_message_initialise(&received));
+
+ return 0;
+fail:
+ xperror(*argv);
+ if (stage >= 1) hash_table_destroy(&reg_table, NULL, NULL);
+ if (stage >= 2) mds_message_destroy(&received);
+ return 1;
}
@@ -138,17 +138,18 @@ int initialise_server(void)
*
* @return Non-zero on error
*/
-int postinitialise_server(void)
+int
+postinitialise_server(void)
{
- if (connected)
- return 0;
-
- fail_if (reconnect_to_display());
- connected = 1;
- return 0;
- fail:
- mds_message_destroy(&received);
- return 1;
+ if (connected)
+ return 0;
+
+ fail_if (reconnect_to_display());
+ connected = 1;
+ return 0;
+fail:
+ mds_message_destroy(&received);
+ return 1;
}
@@ -157,60 +158,57 @@ int postinitialise_server(void)
*
* @return Non-zero on error
*/
-int master_loop(void)
+int
+master_loop(void)
{
- int rc = 1, r;
-
- while (!reexecing && !terminating)
- {
- if (danger)
- {
- danger = 0;
- free(send_buffer), send_buffer = NULL;
- send_buffer_size = 0;
- with_mutex (slave_mutex, linked_list_pack(&slave_list););
+ int rc = 1, r;
+
+ while (!reexecing && !terminating) {
+ if (danger) {
+ danger = 0;
+ free(send_buffer);
+ send_buffer = NULL;
+ send_buffer_size = 0;
+ with_mutex (slave_mutex, linked_list_pack(&slave_list););
+ }
+
+ if (!(r = mds_message_read(&received, socket_fd)))
+ if (!(r = handle_message()))
+ continue;
+
+ if (r == -2) {
+ eprint("corrupt message received, aborting.");
+ goto done;
+ } else if (errno == EINTR) {
+ continue;
+ } else {
+ fail_if (errno != ECONNRESET);
+ }
+
+ eprint("lost connection to server.");
+ mds_message_destroy(&received);
+ mds_message_initialise(&received);
+ connected = 0;
+ fail_if (reconnect_to_display());
+ connected = 1;
}
-
- if (r = mds_message_read(&received, socket_fd), r == 0)
- if (r = handle_message(), r == 0)
- continue;
-
- if (r == -2)
- {
- eprint("corrupt message received, aborting.");
- goto done;
- }
- else if (errno == EINTR)
- continue;
- else
- fail_if (errno != ECONNRESET);
-
- eprint("lost connection to server.");
- mds_message_destroy(&received);
- mds_message_initialise(&received);
- connected = 0;
- fail_if (reconnect_to_display());
- connected = 1;
- }
-
- rc = 0;
- goto done;
- fail:
- xperror(*argv);
+
+ rc = 0;
+ goto done;
+fail:
+ xperror(*argv);
done:
- /* Join with all slaves threads. */
- with_mutex (slave_mutex,
- while (running_slaves > 0)
- pthread_cond_wait(&slave_cond, &slave_mutex););
-
- if (rc || !reexecing)
- {
- hash_table_destroy(&reg_table, (free_func*)reg_table_free_key, (free_func*)reg_table_free_value);
- mds_message_destroy(&received);
- }
- pthread_mutex_destroy(&slave_mutex);
- pthread_cond_destroy(&slave_cond);
- free(send_buffer);
- return rc;
+ /* Join with all slaves threads. */
+ with_mutex (slave_mutex,
+ while (running_slaves > 0)
+ pthread_cond_wait(&slave_cond, &slave_mutex););
+
+ if (rc || !reexecing) {
+ hash_table_destroy(&reg_table, (free_func*)reg_table_free_key, (free_func*)reg_table_free_value);
+ mds_message_destroy(&received);
+ }
+ pthread_mutex_destroy(&slave_mutex);
+ pthread_cond_destroy(&slave_cond);
+ free(send_buffer);
+ return rc;
}
-
diff --git a/src/mds-registry/mds-registry.h b/src/mds-registry/mds-registry.h
index 75b5ddf..88c0734 100644
--- a/src/mds-registry/mds-registry.h
+++ b/src/mds-registry/mds-registry.h
@@ -23,4 +23,3 @@
#endif
-
diff --git a/src/mds-registry/reexec.c b/src/mds-registry/reexec.c
index bd70381..e7280af 100644
--- a/src/mds-registry/reexec.c
+++ b/src/mds-registry/reexec.c
@@ -40,31 +40,34 @@
*
* @return The number of bytes that will be stored by `marshal_server`
*/
-size_t marshal_server_size(void)
+size_t
+marshal_server_size(void)
{
- size_t i, rc = 2 * sizeof(int) + sizeof(uint32_t) + 4 * sizeof(size_t);
- hash_entry_t* entry;
- ssize_t node;
-
- rc += mds_message_marshal_size(&received);
- rc += linked_list_marshal_size(&slave_list);
-
- foreach_hash_table_entry (reg_table, i, entry)
- {
- char* command = (char*)(void*)(entry->key);
- size_t len = strlen(command) + 1;
- client_list_t* list = (client_list_t*)(void*)(entry->value);
-
- rc += len + sizeof(size_t) + client_list_marshal_size(list);
- }
-
- foreach_linked_list_node (slave_list, node)
- {
- slave_t* slave = (slave_t*)(void*)slave_list.values[node];
- rc += slave_marshal_size(slave);
- }
-
- return rc;
+ size_t i, rc = 2 * sizeof(int) + sizeof(uint32_t) + 4 * sizeof(size_t);
+ hash_entry_t *entry;
+ ssize_t node;
+ char *command;
+ size_t len;
+ client_list_t *list;
+ slave_t *slave;
+
+ rc += mds_message_marshal_size(&received);
+ rc += linked_list_marshal_size(&slave_list);
+
+ foreach_hash_table_entry (reg_table, i, entry) {
+ command = (void *)entry->key;
+ len = strlen(command) + 1;
+ list = (void *)entry->value;
+
+ rc += len + sizeof(size_t) + client_list_marshal_size(list);
+ }
+
+ foreach_linked_list_node (slave_list, node) {
+ slave = (void *)slave_list.values[node];
+ rc += slave_marshal_size(slave);
+ }
+
+ return rc;
}
@@ -74,52 +77,55 @@ size_t marshal_server_size(void)
* @param state_buf The buffer for the marshalled data
* @return Non-zero on error
*/
-int marshal_server(char* state_buf)
+int
+marshal_server(char *state_buf)
{
- size_t i, n = mds_message_marshal_size(&received);
- hash_entry_t* entry;
- ssize_t node;
-
- buf_set_next(state_buf, int, MDS_REGISTRY_VARS_VERSION);
- buf_set_next(state_buf, int, connected);
- buf_set_next(state_buf, uint32_t, message_id);
- buf_set_next(state_buf, size_t, n);
- mds_message_marshal(&received, state_buf);
- state_buf += n / sizeof(char);
-
- buf_set_next(state_buf, size_t, reg_table.capacity);
- buf_set_next(state_buf, size_t, reg_table.size);
- foreach_hash_table_entry (reg_table, i, entry)
- {
- char* command = (char*)(void*)(entry->key);
- size_t len = strlen(command) + 1;
- client_list_t* list = (client_list_t*)(void*)(entry->value);
-
- memcpy(state_buf, command, len * sizeof(char));
- state_buf += len;
-
- n = client_list_marshal_size(list);
- buf_set_next(state_buf, size_t, n);
- client_list_marshal(list, state_buf);
- state_buf += n / sizeof(char);
- }
-
- n = linked_list_marshal_size(&slave_list);
- buf_set_next(state_buf, size_t, n);
- linked_list_marshal(&slave_list, state_buf);
- state_buf += n / sizeof(char);
-
- foreach_linked_list_node (slave_list, node)
- {
- slave_t* slave = (slave_t*)(void*)(slave_list.values[node]);
- state_buf += slave_marshal(slave, state_buf) / sizeof(char);
- slave_destroy(slave);
- }
-
- hash_table_destroy(&reg_table, (free_func*)reg_table_free_key, (free_func*)reg_table_free_value);
- mds_message_destroy(&received);
- linked_list_destroy(&slave_list);
- return 0;
+ size_t i, n = mds_message_marshal_size(&received);
+ hash_entry_t *entry;
+ ssize_t node;
+ char *command;
+ size_t len;
+ client_list_t *list;
+ slave_t *slave;
+
+ buf_set_next(state_buf, int, MDS_REGISTRY_VARS_VERSION);
+ buf_set_next(state_buf, int, connected);
+ buf_set_next(state_buf, uint32_t, message_id);
+ buf_set_next(state_buf, size_t, n);
+ mds_message_marshal(&received, state_buf);
+ state_buf += n / sizeof(char);
+
+ buf_set_next(state_buf, size_t, reg_table.capacity);
+ buf_set_next(state_buf, size_t, reg_table.size);
+ foreach_hash_table_entry (reg_table, i, entry) {
+ command = (void *)(entry->key);
+ len = strlen(command) + 1;
+ list = (void *)(entry->value);
+
+ memcpy(state_buf, command, len * sizeof(char));
+ state_buf += len;
+
+ n = client_list_marshal_size(list);
+ buf_set_next(state_buf, size_t, n);
+ client_list_marshal(list, state_buf);
+ state_buf += n / sizeof(char);
+ }
+
+ n = linked_list_marshal_size(&slave_list);
+ buf_set_next(state_buf, size_t, n);
+ linked_list_marshal(&slave_list, state_buf);
+ state_buf += n / sizeof(char);
+
+ foreach_linked_list_node (slave_list, node) {
+ slave = (void *)(slave_list.values[node]);
+ state_buf += slave_marshal(slave, state_buf) / sizeof(char);
+ slave_destroy(slave);
+ }
+
+ hash_table_destroy(&reg_table, (free_func *)reg_table_free_key, (free_func *)reg_table_free_value);
+ mds_message_destroy(&received);
+ linked_list_destroy(&slave_list);
+ return 0;
}
@@ -133,81 +139,79 @@ int marshal_server(char* state_buf)
* @param state_buf The marshalled data that as not been read already
* @return Non-zero on error
*/
-int unmarshal_server(char* state_buf)
+int
+unmarshal_server(char *state_buf)
{
- char* command;
- client_list_t* list;
- slave_t* slave;
- size_t i, n, m;
- ssize_t node;
- int stage = 0;
-
- /* buf_get_next(state_buf, int, MDS_REGISTRY_VARS_VERSION); */
- buf_next(state_buf, int, 1);
- buf_get_next(state_buf, int, connected);
- buf_get_next(state_buf, uint32_t, message_id);
- buf_get_next(state_buf, size_t, n);
- fail_if (mds_message_unmarshal(&received, state_buf));
- state_buf += n / sizeof(char);
- stage = 1;
-
- buf_get_next(state_buf, size_t, n);
- fail_if (hash_table_create_tuned(&reg_table, n));
- buf_get_next(state_buf, size_t, n);
- for (i = 0; i < n; i++)
- {
- stage = 1;
- fail_if (xstrdup(command, state_buf));
- state_buf += strlen(command) + 1;
-
- stage = 2;
- fail_if (xmalloc(list, 1, client_list_t));
- buf_get_next(state_buf, size_t, m);
- stage = 3;
- fail_if (client_list_unmarshal(list, state_buf));
- state_buf += m / sizeof(char);
-
- hash_table_put(&reg_table, (size_t)(void*)command, (size_t)(void*)list);
- fail_if (errno);
- }
- command = NULL;
- stage = 4;
-
- reg_table.key_comparator = (compare_func*)string_comparator;
- reg_table.hasher = (hash_func*)string_hash;
-
- buf_get_next(state_buf, size_t, n);
- fail_if (linked_list_unmarshal(&slave_list, state_buf));
- state_buf += n / sizeof(char);
-
- foreach_linked_list_node (slave_list, node)
- {
- stage = 5;
- fail_if (xmalloc(slave, 1, slave_t));
- stage = 6;
- fail_if ((n = slave_unmarshal(slave, state_buf)) == 0);
- state_buf += n / sizeof(char);
- slave_list.values[node] = (size_t)(void*)slave;
- }
-
- foreach_linked_list_node (slave_list, node)
- {
- slave = (slave_t*)(void*)(slave_list.values[node]);
- fail_if (start_created_slave(slave));
- }
-
- return 0;
- fail:
- xperror(*argv);
- mds_message_destroy(&received);
- if (stage >= 1)
- hash_table_destroy(&reg_table, (free_func*)reg_table_free_key, (free_func*)reg_table_free_value);
- if (stage >= 2) free(command);
- if (stage >= 3) client_list_destroy(list), free(list);
- if (stage >= 5) linked_list_destroy(&slave_list);
- if (stage >= 6) slave_destroy(slave), free(slave);
- abort();
- return -1;
+ char *command;
+ client_list_t *list;
+ slave_t *slave;
+ size_t i, n, m;
+ ssize_t node;
+ int stage = 0;
+
+ /* buf_get_next(state_buf, int, MDS_REGISTRY_VARS_VERSION); */
+ buf_next(state_buf, int, 1);
+ buf_get_next(state_buf, int, connected);
+ buf_get_next(state_buf, uint32_t, message_id);
+ buf_get_next(state_buf, size_t, n);
+ fail_if (mds_message_unmarshal(&received, state_buf));
+ state_buf += n / sizeof(char);
+ stage = 1;
+
+ buf_get_next(state_buf, size_t, n);
+ fail_if (hash_table_create_tuned(&reg_table, n));
+ buf_get_next(state_buf, size_t, n);
+ for (i = 0; i < n; i++) {
+ stage = 1;
+ fail_if (xstrdup(command, state_buf));
+ state_buf += strlen(command) + 1;
+
+ stage = 2;
+ fail_if (xmalloc(list, 1, client_list_t));
+ buf_get_next(state_buf, size_t, m);
+ stage = 3;
+ fail_if (client_list_unmarshal(list, state_buf));
+ state_buf += m / sizeof(char);
+
+ hash_table_put(&reg_table, (size_t)(void *)command, (size_t)(void *)list);
+ fail_if (errno);
+ }
+ command = NULL;
+ stage = 4;
+
+ reg_table.key_comparator = (compare_func*)string_comparator;
+ reg_table.hasher = (hash_func*)string_hash;
+
+ buf_get_next(state_buf, size_t, n);
+ fail_if (linked_list_unmarshal(&slave_list, state_buf));
+ state_buf += n / sizeof(char);
+
+ foreach_linked_list_node (slave_list, node) {
+ stage = 5;
+ fail_if (xmalloc(slave, 1, slave_t));
+ stage = 6;
+ fail_if ((n = slave_unmarshal(slave, state_buf)) == 0);
+ state_buf += n / sizeof(char);
+ slave_list.values[node] = (size_t)(void *)slave;
+ }
+
+ foreach_linked_list_node (slave_list, node) {
+ slave = (slave_t *)(void *)(slave_list.values[node]);
+ fail_if (start_created_slave(slave));
+ }
+
+ return 0;
+fail:
+ xperror(*argv);
+ mds_message_destroy(&received);
+ if (stage >= 1)
+ hash_table_destroy(&reg_table, (free_func *)reg_table_free_key, (free_func *)reg_table_free_value);
+ if (stage >= 2) free(command);
+ if (stage >= 3) client_list_destroy(list), free(list);
+ if (stage >= 5) linked_list_destroy(&slave_list);
+ if (stage >= 6) slave_destroy(slave), free(slave);
+ abort();
+ return -1;
}
@@ -217,8 +221,8 @@ int unmarshal_server(char* state_buf)
*
* @return Non-zero on error
*/
-int __attribute__((const)) reexec_failure_recover(void)
+int __attribute__((const))
+reexec_failure_recover(void)
{
- return -1;
+ return -1;
}
-
diff --git a/src/mds-registry/reexec.h b/src/mds-registry/reexec.h
index 7b95466..4238c9a 100644
--- a/src/mds-registry/reexec.h
+++ b/src/mds-registry/reexec.h
@@ -23,4 +23,3 @@
#endif
-
diff --git a/src/mds-registry/registry.c b/src/mds-registry/registry.c
index 77cd463..28b38d1 100644
--- a/src/mds-registry/registry.c
+++ b/src/mds-registry/registry.c
@@ -44,8 +44,8 @@
* @param length:size_t The length of the message
* @return :int Zero on success, -1 on error
*/
-#define full_send(message, length) \
- ((full_send)(socket_fd, message, length))
+#define full_send(message, length)\
+ ((full_send)(socket_fd, message, length))
/**
@@ -54,74 +54,69 @@
* @return Zero on success -1 on error or interruption,
* `errno` will be set accordingly
*/
-static int handle_close_message(void)
+static int
+handle_close_message(void)
{
- /* Servers do not close too often, there is no need to
- optimise this with another hash table. Doing so would
- also require some caution because the keys are 32-bit
- on 32-bit computers, and the client ID:s are 64-bit. */
-
- size_t i, j, ptr = 0, size = 1;
- size_t* keys = NULL;
- size_t* old_keys;
-
-
- /* Remove server for all protocols. */
-
- for (i = 0; i < received.header_count; i++)
- if (startswith(received.headers[i], "Client closed: "))
- {
- uint64_t client = parse_client_id(received.headers[i] + strlen("Client closed: "));
- hash_entry_t* entry;
-
- foreach_hash_table_entry (reg_table, j, entry)
- {
- /* Remove server from list of servers that support the protocol,
- once, if it is in the list. */
- client_list_t* list = (client_list_t*)(void*)(entry->value);
- client_list_remove(list, client);
- if (list->size)
- continue;
-
- /* If no servers support the protocol, list the protocol for removal. */
- fail_if ((keys == NULL) && xmalloc(keys, size, size_t));
- fail_if (ptr == size ? growalloc(old_keys, keys, size, size_t) : 0);
- keys[ptr++] = entry->key;
- }
-
-
- /* Mark client as closed. */
+ /* Servers do not close too often, there is no need to
+ optimise this with another hash table. Doing so would
+ also require some caution because the keys are 32-bit
+ on 32-bit computers, and the client ID:s are 64-bit. */
+
+ size_t i, j, ptr = 0, size = 1;
+ size_t *keys = NULL;
+ size_t *old_keys;
+ uint64_t client;
+ hash_entry_t *entry;
+ client_list_t *list;
+ char *command;
+
+ /* Remove server for all protocols. */
+ for (i = 0; i < received.header_count; i++) {
+ if (startswith(received.headers[i], "Client closed: ")) {
+ client = parse_client_id(received.headers[i] + strlen("Client closed: "));
+
+ foreach_hash_table_entry (reg_table, j, entry) {
+ /* Remove server from list of servers that support the protocol,
+ once, if it is in the list. */
+ list = (void *)(entry->value);
+ client_list_remove(list, client);
+ if (list->size)
+ continue;
+
+ /* If no servers support the protocol, list the protocol for removal. */
+ fail_if (!keys && xmalloc(keys, size, size_t));
+ fail_if (ptr == size ? growalloc(old_keys, keys, size, size_t) : 0);
+ keys[ptr++] = entry->key;
+ }
+
- close_slaves(client);
- }
-
-
- /* Close slaves those clients have closed. */
-
- with_mutex (slave_mutex, pthread_cond_broadcast(&slave_cond););
-
-
- /* Remove protocol that no longer have any supporting servers. */
-
- for (i = 0; i < ptr; i++)
- {
- hash_entry_t* entry = hash_table_get_entry(&reg_table, keys[i]);
- client_list_t* list = (client_list_t*)(void*)(entry->value);
- char* command = (char*)(void*)(entry->key);
-
- hash_table_remove(&reg_table, entry->key);
-
- client_list_destroy(list);
- free(list);
- free(command);
- }
-
- free(keys);
- return 0;
- fail:
- xperror(*argv);
- free(keys);
- return -1;
+ /* Mark client as closed. */
+ close_slaves(client);
+ }
+ }
+
+ /* Close slaves those clients have closed. */
+ with_mutex (slave_mutex, pthread_cond_broadcast(&slave_cond););
+
+ /* Remove protocol that no longer have any supporting servers. */
+ for (i = 0; i < ptr; i++) {
+ entry = hash_table_get_entry(&reg_table, keys[i]);
+ list = (void *)(entry->value);
+ command = (void *)(entry->key);
+
+ hash_table_remove(&reg_table, entry->key);
+
+ client_list_destroy(list);
+ free(list);
+ free(command);
+ }
+
+ free(keys);
+ return 0;
+fail:
+ xperror(*argv);
+ free(keys);
+ return -1;
}
@@ -134,54 +129,50 @@ static int handle_close_message(void)
* @param client The ID of the client that implements the server-side of the protocol
* @return Non-zero on error
*/
-__attribute__((nonnull))
-static int registry_action_add(int has_key, char* command, size_t command_key, uint64_t client)
+static int __attribute__((nonnull))
+registry_action_add(int has_key, char *command, size_t command_key, uint64_t client)
{
- int saved_errno;
-
- if (has_key)
- {
- /* Add server to protocol if the protocol is already in the table. */
- size_t address = hash_table_get(&reg_table, command_key);
- client_list_t* list = (client_list_t*)(void*)address;
- fail_if (client_list_add(list, client) < 0);
- }
- else
- {
- /* If the protocol is not already in the table. */
-
- /* Allocate list of servers for the protocol. */
- client_list_t* list;
- void* address;
- fail_if (xmalloc(address = list, 1, client_list_t));
- /* Duplicate the protocol name so it can be accessed later. */
- if (xstrdup_nn(command, command))
- {
- saved_errno = errno, free(list), errno = saved_errno;
- fail_if (1);
- }
- /* Create list of servers, add server to list and add the protocol to the table. */
- command_key = (size_t)(void*)command;
- if (client_list_create(list, 1) ||
- client_list_add(list, client) ||
- (hash_table_put(&reg_table, command_key, (size_t)address) == 0))
- {
- saved_errno = errno;
- client_list_destroy(list);
- free(list);
- free(command);
- errno = saved_errno;
- fail_if (1);
+ int saved_errno;
+ client_list_t *list;
+ size_t address;
+ void *paddress;
+
+ if (has_key) {
+ /* Add server to protocol if the protocol is already in the table. */
+ address = hash_table_get(&reg_table, command_key);
+ list = (void *)address;
+ fail_if (client_list_add(list, client) < 0);
+ } else {
+ /* If the protocol is not already in the table. */
+
+ /* Allocate list of servers for the protocol. */
+ fail_if (xmalloc(paddress = list, 1, client_list_t));
+ /* Duplicate the protocol name so it can be accessed later. */
+ if (xstrdup_nn(command, command)) {
+ saved_errno = errno, free(list), errno = saved_errno;
+ fail_if (1);
+ }
+ /* Create list of servers, add server to list and add the protocol to the table. */
+ command_key = (size_t)(void*)command;
+ if (client_list_create(list, 1) ||
+ client_list_add(list, client) ||
+ !hash_table_put(&reg_table, command_key, (size_t)paddress)) {
+ saved_errno = errno;
+ client_list_destroy(list);
+ free(list);
+ free(command);
+ errno = saved_errno;
+ fail_if (1);
+ }
}
- }
-
- /* Notify slaves. */
- fail_if (advance_slaves(command));
-
- return 0;
- fail:
- xperror(*argv);
- return -1;
+
+ /* Notify slaves. */
+ fail_if (advance_slaves(command));
+
+ return 0;
+fail:
+ xperror(*argv);
+ return -1;
}
@@ -192,23 +183,23 @@ static int registry_action_add(int has_key, char* command, size_t command_key, u
* @param client The ID of the client that implements the server-side of the protocol
* @return Non-zero on error
*/
-static void registry_action_remove(size_t command_key, uint64_t client)
+static void
+registry_action_remove(size_t command_key, uint64_t client)
{
- hash_entry_t* entry = hash_table_get_entry(&reg_table, command_key);
- size_t address = entry->value;
- client_list_t* list = (client_list_t*)(void*)address;
-
- /* Remove server from protocol. */
- client_list_remove(list, client);
-
- /* Remove protocol if no servers support it anymore. */
- if (list->size == 0)
- {
- client_list_destroy(list);
- free(list);
- hash_table_remove(&reg_table, command_key);
- reg_table_free_key(entry->key);
- }
+ hash_entry_t *entry = hash_table_get_entry(&reg_table, command_key);
+ size_t address = entry->value;
+ client_list_t *list = (void *)address;
+
+ /* Remove server from protocol. */
+ client_list_remove(list, client);
+
+ /* Remove protocol if no servers support it anymore. */
+ if (!list->size) {
+ client_list_destroy(list);
+ free(list);
+ hash_table_remove(&reg_table, command_key);
+ reg_table_free_key(entry->key);
+ }
}
@@ -222,40 +213,46 @@ static void registry_action_remove(size_t command_key, uint64_t client)
* @param wait_set Table to fill with missing protocols if `action == 0`
* @return Non-zero on error
*/
-__attribute__((nonnull))
-static int registry_action_act(char* command, int action, uint64_t client, hash_table_t* wait_set)
+static int __attribute__((nonnull))
+registry_action_act(char *command, int action, uint64_t client, hash_table_t *wait_set)
{
- size_t command_key = (size_t)(void*)command;
- int has_key = hash_table_contains_key(&reg_table, command_key);
- int saved_errno;
-
- if (action == 1)
- {
- /* Register server to protocol. */
- fail_if (registry_action_add(has_key, command, command_key, client));
- }
- else if ((action == -1) && has_key)
- /* Unregister server from protocol. */
- registry_action_remove(command_key, client);
- else if ((action == 0) && !has_key)
- {
- /* Add protocol to wait set of not present in the protocol table. */
- fail_if (xstrdup_nn(command, command));
- command_key = (size_t)(void*)command;
- if (hash_table_put(wait_set, command_key, 1) == 0)
- if (errno)
- {
- saved_errno = errno, free(command), errno = saved_errno;
- fail_if (1);
- }
- }
-
- return 0;
- fail:
- xperror(*argv);
- if (action != 1)
- hash_table_destroy(wait_set, (free_func*)reg_table_free_key, NULL), free(wait_set);
- return -1;
+ size_t command_key = (size_t)(void *)command;
+ int has_key = hash_table_contains_key(&reg_table, command_key);
+ int saved_errno;
+
+ switch (action) {
+ case 1:
+ /* Register server to protocol. */
+ fail_if (registry_action_add(has_key, command, command_key, client));
+ break;
+ case -1:
+ if (has_key)
+ /* Unregister server from protocol. */
+ registry_action_remove(command_key, client);
+ break;
+ case 0:
+ if (has_key)
+ break;
+ /* Add protocol to wait set of not present in the protocol table. */
+ fail_if (xstrdup_nn(command, command));
+ command_key = (size_t)(void*)command;
+ if (!hash_table_put(wait_set, command_key, 1) && errno) {
+ saved_errno = errno, free(command), errno = saved_errno;
+ fail_if (1);
+ }
+ break;
+ default:
+ break;
+ }
+
+ return 0;
+fail:
+ xperror(*argv);
+ if (action != 1) {
+ hash_table_destroy(wait_set, (free_func *)reg_table_free_key, NULL);
+ free(wait_set);
+ }
+ return -1;
}
@@ -270,71 +267,57 @@ static int registry_action_act(char* command, int action, uint64_t client, hash_
* @return Zero on success -1 on error or interruption,
* `errno` will be set accordingly
*/
-__attribute__((nonnull))
-static int registry_action(size_t length, int action, const char* recv_client_id, const char* recv_message_id)
+static int __attribute__((nonnull))
+registry_action(size_t length, int action, const char *recv_client_id, const char *recv_message_id)
{
- char* payload = received.payload;
- uint64_t client = action ? parse_client_id(recv_client_id) : 0;
- hash_table_t* wait_set = NULL;
- size_t begin;
- int saved_errno;
-
-
- /* If ‘Action: wait’, create a set for the protocols that are not already available. */
-
- if (action == 0)
- {
- fail_if (xmalloc(wait_set, 1, hash_table_t));
- fail_if (hash_table_create(wait_set));
- wait_set->key_comparator = (compare_func*)string_comparator;
- wait_set->hasher = (hash_func*)string_hash;
- }
-
-
- /* If the payload buffer is full, increase it so we can fit another character. */
-
- if (received.payload_size == length)
- {
- fail_if (growalloc(old, received.payload, received.payload_size, char));
- payload = received.payload;
- }
-
-
- /* LF-terminate the payload, perhaps it did not have a terminal LF. */
-
- payload[length] = '\n';
-
-
- /* For all protocols in the payload, either add or remove
- them from or to the protocl table or the wait set. */
-
- for (begin = 0; begin < length;)
- {
- char* end = rawmemchr(payload + begin, '\n');
- size_t len = (size_t)(end - payload) - begin - 1;
- char* command = payload + begin;
-
- command[len] = '\0';
- begin += len + 1;
-
- if (len > 0)
- if (registry_action_act(command, action, client, wait_set))
- fail_if (wait_set = NULL, 1);
- }
-
-
- /* If ‘Action: wait’, start a new thread that waits for the protocols and the responds. */
-
- if (action == 0)
- if (start_slave(wait_set, recv_client_id, recv_message_id))
- fail_if (wait_set = NULL, 1);
-
- return 0;
- fail:
- saved_errno = errno;
- if (wait_set != NULL)
- hash_table_destroy(wait_set, NULL, NULL), free(wait_set);
- return errno = saved_errno, -1;
+ char *payload = received.payload;
+ uint64_t client = action ? parse_client_id(recv_client_id) : 0;
+ hash_table_t *wait_set = NULL;
+ size_t begin, len;
+ int saved_errno;
+ char *end, *command;
+
+ /* If ‘Action: wait’, create a set for the protocols that are not already available. */
+ if (!action) {
+ fail_if (xmalloc(wait_set, 1, hash_table_t));
+ fail_if (hash_table_create(wait_set));
+ wait_set->key_comparator = (compare_func*)string_comparator;
+ wait_set->hasher = (hash_func*)string_hash;
+ }
+
+ /* If the payload buffer is full, increase it so we can fit another character. */
+ if (received.payload_size == length) {
+ fail_if (growalloc(old, received.payload, received.payload_size, char));
+ payload = received.payload;
+ }
+
+ /* LF-terminate the payload, perhaps it did not have a terminal LF. */
+ payload[length] = '\n';
+
+ /* For all protocols in the payload, either add or remove
+ them from or to the protocl table or the wait set. */
+ for (begin = 0; begin < length;) {
+ end = rawmemchr(payload + begin, '\n');
+ len = (size_t)(end - payload) - begin - 1;
+ command = payload + begin;
+
+ command[len] = '\0';
+ begin += len + 1;
+
+ if (len > 0 && registry_action_act(command, action, client, wait_set))
+ fail_if (wait_set = NULL, 1);
+ }
+
+ /* If ‘Action: wait’, start a new thread that waits for the protocols and the responds. */
+ if (!action && start_slave(wait_set, recv_client_id, recv_message_id))
+ fail_if (wait_set = NULL, 1);
+
+ return 0;
+fail:
+ saved_errno = errno;
+ if (wait_set)
+ hash_table_destroy(wait_set, NULL, NULL), free(wait_set);
+ return errno = saved_errno, -1;
}
@@ -346,73 +329,66 @@ static int registry_action(size_t length, int action, const char* recv_client_id
* @return Zero on success, -1 on error or interruption,
* `errno` will be set accordingly
*/
-__attribute__((nonnull))
-static int list_registry(const char* recv_client_id, const char* recv_message_id)
+static int __attribute__((nonnull))
+list_registry(const char *recv_client_id, const char *recv_message_id)
{
- size_t ptr = 0, i;
- hash_entry_t* entry;
-
-
- /* Allocate the send buffer for the first time, it cannot be doubled if it is zero. */
-
- if (send_buffer_size == 0)
- {
- fail_if (xmalloc(send_buffer, 256, char));
- send_buffer_size = 256;
- }
-
-
- /* Add all protocols to the send buffer. */
-
- foreach_hash_table_entry (reg_table, i, entry)
- {
- size_t key = entry->key;
- char* command = (char*)(void*)key;
- size_t len = strlen(command);
-
- /* Make sure the send buffer can fit all protocols. */
- while (ptr + len + 1 >= send_buffer_size)
- fail_if (growalloc(old, send_buffer, send_buffer_size, char));
-
- memcpy(send_buffer + ptr, command, len * sizeof(char));
- ptr += len;
- send_buffer[ptr++] = '\n';
- }
-
-
- /* Make sure the message headers can fit the send buffer. */
-
- i = sizeof("To: \n"
- "In response to: \n"
- "Message ID: \n"
- "Origin command: register\n"
- "Length: \n"
- "\n") / sizeof(char) - 1;
- i += strlen(recv_message_id) + strlen(recv_client_id) + 10 + 19;
-
- while (ptr + i >= send_buffer_size)
- fail_if (growalloc(old, send_buffer, send_buffer_size, char));
-
-
- /* Construct message headers. */
- sprintf(send_buffer + ptr,
- "To: %s\n"
- "In response to: %s\n"
- "Message ID: %" PRIu32 "\n"
- "Origin command: register\n"
- "Length: %" PRIu64 "\n"
- "\n",
- recv_client_id, recv_message_id, message_id, ptr);
-
- /* Increase message ID. */
- with_mutex (slave_mutex, message_id = message_id == UINT32_MAX ? 0 : (message_id + 1););
-
- /* Send message. */
- fail_if (full_send(send_buffer + ptr, strlen(send_buffer + ptr)));
- fail_if (full_send(send_buffer, ptr));
- return 0;
- fail:
- return -1;
+ size_t ptr = 0, i, key, len;
+ hash_entry_t *entry;
+ char *command;
+
+ /* Allocate the send buffer for the first time, it cannot be doubled if it is zero. */
+ if (!send_buffer_size) {
+ fail_if (xmalloc(send_buffer, 256, char));
+ send_buffer_size = 256;
+ }
+
+ /* Add all protocols to the send buffer. */
+
+ foreach_hash_table_entry (reg_table, i, entry) {
+ key = entry->key;
+ command = (char*)(void*)key;
+ len = strlen(command);
+
+ /* Make sure the send buffer can fit all protocols. */
+ while (ptr + len + 1 >= send_buffer_size)
+ fail_if (growalloc(old, send_buffer, send_buffer_size, char));
+
+ memcpy(send_buffer + ptr, command, len * sizeof(char));
+ ptr += len;
+ send_buffer[ptr++] = '\n';
+ }
+
+ /* Make sure the message headers can fit the send buffer. */
+ i = sizeof("To: \n"
+ "In response to: \n"
+ "Message ID: \n"
+ "Origin command: register\n"
+ "Length: \n"
+ "\n") / sizeof(char) - 1;
+ i += strlen(recv_message_id) + strlen(recv_client_id) + 10 + 19;
+
+ while (ptr + i >= send_buffer_size)
+ fail_if (growalloc(old, send_buffer, send_buffer_size, char));
+
+ /* Construct message headers. */
+ sprintf(send_buffer + ptr,
+ "To: %s\n"
+ "In response to: %s\n"
+ "Message ID: %" PRIu32 "\n"
+ "Origin command: register\n"
+ "Length: %" PRIu64 "\n"
+ "\n",
+ recv_client_id, recv_message_id, message_id, ptr);
+
+ /* Increase message ID. */
+ with_mutex (slave_mutex, message_id = message_id == UINT32_MAX ? 0 : (message_id + 1););
+
+ /* Send message. */
+ fail_if (full_send(send_buffer + ptr, strlen(send_buffer + ptr)));
+ fail_if (full_send(send_buffer, ptr));
+ return 0;
+fail:
+ return -1;
}
@@ -422,71 +398,63 @@ static int list_registry(const char* recv_client_id, const char* recv_message_id
* @return Zero on success -1 on error or interruption,
* `errno` will be set accordingly
*/
-static int handle_register_message(void)
+static int
+handle_register_message(void)
{
- /* Fetch message headers. */
-
- const char* recv_client_id = NULL;
- const char* recv_message_id = NULL;
- const char* recv_length = NULL;
- const char* recv_action = NULL;
- size_t i, length = 0;
-
-#define __get_header(storage, header) \
- (startswith(received.headers[i], header)) \
- storage = received.headers[i] + strlen(header)
-
- for (i = 0; i < received.header_count; i++)
- {
- if __get_header(recv_client_id, "Client ID: ");
- else if __get_header(recv_message_id, "Message ID: ");
- else if __get_header(recv_length, "Length: ");
- else if __get_header(recv_action, "Action: ");
- else
- continue;
-
- /* Stop if we got all headers we recognised, except ‘Time to live’. */
- if (recv_client_id && recv_message_id && recv_length && recv_action)
- break;
- }
-
+ /* Fetch message headers. */
+ const char *recv_client_id = NULL;
+ const char *recv_message_id = NULL;
+ const char *recv_length = NULL;
+ const char *recv_action = NULL;
+ size_t i, length = 0;
+
+#define __get_header(storage, header)\
+ (startswith(received.headers[i], header))\
+ storage = received.headers[i] + strlen(header)
+
+ for (i = 0; i < received.header_count; i++) {
+ if __get_header(recv_client_id, "Client ID: ");
+ else if __get_header(recv_message_id, "Message ID: ");
+ else if __get_header(recv_length, "Length: ");
+ else if __get_header(recv_action, "Action: ");
+ else
+ continue;
+
+ /* Stop if we got all headers we recognised, except ‘Time to live’. */
+ if (recv_client_id && recv_message_id && recv_length && recv_action)
+ break;
+ }
+
#undef __get_header
-
-
- /* Validate headers. */
-
- if ((recv_client_id == NULL) || (strequals(recv_client_id, "0:0")))
- return eprint("received message from anonymous sender, ignoring."), 0;
- else if (strchr(recv_client_id, ':') == NULL)
- return eprint("received message from sender without a colon it its ID, ignoring, invalid ID."), 0;
- else if ((recv_length == NULL) && ((recv_action == NULL) || !strequals(recv_action, "list")))
- return eprint("received empty message without `Action: list`, ignoring, has no effect."), 0;
- else if (recv_message_id == NULL)
- return eprint("received message without ID, ignoring, master server is misbehaving."), 0;
-
-
- /* Get message length, and make sure the action is defined. */
-
- if (recv_length != NULL)
- length = atoz(recv_length);
- if (recv_action != NULL)
- recv_action = "add";
-
-
- /* Perform action. */
-
-#define __registry_action(action) registry_action(length, action, recv_client_id, recv_message_id)
-
- if (strequals(recv_action, "add")) return __registry_action(1);
- else if (strequals(recv_action, "remove")) return __registry_action(-1);
- else if (strequals(recv_action, "wait")) return __registry_action(0);
- else if (strequals(recv_action, "list")) return list_registry(recv_client_id, recv_message_id);
- else
- {
- eprint("received invalid action, ignoring.");
- return 0;
- }
-
+
+ /* Validate headers. */
+ if (!recv_client_id || strequals(recv_client_id, "0:0"))
+ return eprint("received message from anonymous sender, ignoring."), 0;
+ else if (!strchr(recv_client_id, ':'))
+ return eprint("received message from sender without a colon it its ID, ignoring, invalid ID."), 0;
+ else if (!recv_length && (!recv_action || !strequals(recv_action, "list")))
+ return eprint("received empty message without `Action: list`, ignoring, has no effect."), 0;
+ else if (!recv_message_id)
+ return eprint("received message without ID, ignoring, master server is misbehaving."), 0;
+
+ /* Get message length, and make sure the action is defined. */
+ if (recv_length)
+ length = atoz(recv_length);
+ if (recv_action)
+ recv_action = "add";
+
+ /* Perform action. */
+#define __registry_action(action) registry_action(length, action, recv_client_id, recv_message_id)
+
+ if (strequals(recv_action, "add")) return __registry_action(1);
+ else if (strequals(recv_action, "remove")) return __registry_action(-1);
+ else if (strequals(recv_action, "wait")) return __registry_action(0);
+ else if (strequals(recv_action, "list")) return list_registry(recv_client_id, recv_message_id);
+ else {
+ eprint("received invalid action, ignoring.");
+ return 0;
+ }
+
#undef __registry_action
}
@@ -499,16 +467,15 @@ static int handle_register_message(void)
*/
int handle_message(void)
{
- size_t i;
- for (i = 0; i < received.header_count; i++)
- if (strequals(received.headers[i], "Command: register"))
- {
- fail_if (handle_register_message());
+ size_t i;
+ for (i = 0; i < received.header_count; i++) {
+ if (strequals(received.headers[i], "Command: register")) {
+ fail_if (handle_register_message());
+ return 0;
+ }
+ }
+ fail_if (handle_close_message());
return 0;
- }
- fail_if (handle_close_message());
- return 0;
fail:
- return -1;
+ return -1;
}
-
diff --git a/src/mds-registry/registry.h b/src/mds-registry/registry.h
index 319d615..e35b14f 100644
--- a/src/mds-registry/registry.h
+++ b/src/mds-registry/registry.h
@@ -29,4 +29,3 @@ int handle_message(void);
#endif
-
diff --git a/src/mds-registry/signals.c b/src/mds-registry/signals.c
index 024e2f7..22be1aa 100644
--- a/src/mds-registry/signals.c
+++ b/src/mds-registry/signals.c
@@ -36,21 +36,20 @@
*/
void signal_all(int signo)
{
- pthread_t current_thread;
- ssize_t node;
-
- current_thread = pthread_self();
-
- if (pthread_equal(current_thread, master_thread) == 0)
- pthread_kill(master_thread, signo);
-
- with_mutex (slave_mutex,
- foreach_linked_list_node (slave_list, node)
- {
- slave_t* value = (slave_t*)(void*)(slave_list.values[node]);
- if (pthread_equal(current_thread, value->thread) == 0)
- pthread_kill(value->thread, signo);
- }
- );
+ pthread_t current_thread;
+ ssize_t node;
+ slave_t *value;
+
+ current_thread = pthread_self();
+
+ if (!pthread_equal(current_thread, master_thread))
+ pthread_kill(master_thread, signo);
+
+ with_mutex (slave_mutex,
+ foreach_linked_list_node (slave_list, node) {
+ value = (slave_t*)(void*)(slave_list.values[node]);
+ if (pthread_equal(current_thread, value->thread) == 0)
+ pthread_kill(value->thread, signo);
+ }
+ );
}
-
diff --git a/src/mds-registry/signals.h b/src/mds-registry/signals.h
index 38d3121..f2b5ccd 100644
--- a/src/mds-registry/signals.h
+++ b/src/mds-registry/signals.h
@@ -23,4 +23,3 @@
#endif
-
diff --git a/src/mds-registry/slave.c b/src/mds-registry/slave.c
index b936c29..be77c5b 100644
--- a/src/mds-registry/slave.c
+++ b/src/mds-registry/slave.c
@@ -31,40 +31,38 @@
#include <inttypes.h>
-
/**
* Notify the waiting client that it may resume
*
* @param slave The slave
* @return Non-zero, `errno` will be set accordingly
*/
-__attribute__((nonnull))
-static int slave_notify_client(slave_t* slave)
+static int __attribute__((nonnull))
+slave_notify_client(slave_t *slave)
{
- char buf[sizeof("To: %s\nIn response to: %s\nMessage ID: %" PRIu32 "\nOrigin command: register\n\n")
- / sizeof(char) + 41];
- size_t ptr = 0, sent, left;
-
- /* Construct message headers. */
- sprintf(buf, "To: %s\nIn response to: %s\nMessage ID: %" PRIu32 "\nOrigin command: register\n\n",
- slave->client_id, slave->message_id, message_id);
-
- /* Increase message ID. */
- message_id = message_id == UINT32_MAX ? 0 : (message_id + 1);
-
- /* Send message to client. */
- left = strlen(buf);
- while (left > 0)
- {
- sent = send_message(socket_fd, buf + ptr, left);
- fail_if ((sent < left) && errno && (errno != EINTR));
- left -= sent;
- ptr += sent;
- }
-
- return 0;
- fail:
- return -1;
+ char buf[sizeof("To: %s\nIn response to: %s\nMessage ID: %" PRIu32 "\nOrigin command: register\n\n")
+ / sizeof(char) + 41];
+ size_t ptr = 0, sent, left;
+
+ /* Construct message headers. */
+ sprintf(buf, "To: %s\nIn response to: %s\nMessage ID: %" PRIu32 "\nOrigin command: register\n\n",
+ slave->client_id, slave->message_id, message_id);
+
+ /* Increase message ID. */
+ message_id = message_id == UINT32_MAX ? 0 : (message_id + 1);
+
+ /* Send message to client. */
+ left = strlen(buf);
+ while (left > 0) {
+ sent = send_message(socket_fd, buf + ptr, left);
+ fail_if ((sent < left) && errno && (errno != EINTR));
+ left -= sent;
+ ptr += sent;
+ }
+
+ return 0;
+fail:
+ return -1;
}
@@ -74,60 +72,58 @@ static int slave_notify_client(slave_t* slave)
* @param data Input data
* @return Output data
*/
-static void* slave_loop(void* data)
+static void *
+slave_loop(void *data)
{
- /* pthread_cond_timedwait is required to handle re-exec and termination because
- pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */
- struct timespec timeout =
- {
- .tv_sec = 1,
- .tv_nsec = 0
- };
- slave_t* slave = data;
- struct timespec now;
-
- if (slave->closed)
- goto done;
-
- /* Set up traps for especially handled signals. */
- fail_if (trap_signals() < 0);
-
- fail_if ((errno = pthread_mutex_lock(&slave_mutex)));
-
- while (!reexecing && !terminating)
- {
- if ((slave->wait_set->size == 0) || slave->closed)
- break;
- if (slave->timed)
- {
- fail_if (monotone(&now));
- if (now.tv_sec > slave->dethklok.tv_sec)
- break;
- if (now.tv_sec == slave->dethklok.tv_sec)
- if (now.tv_nsec >= slave->dethklok.tv_nsec)
- break;
+ /* pthread_cond_timedwait is required to handle re-exec and termination because
+ pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */
+ struct timespec timeout = {
+ .tv_sec = 1,
+ .tv_nsec = 0
+ };
+ slave_t* slave = data;
+ struct timespec now;
+
+ if (slave->closed)
+ goto done;
+
+ /* Set up traps for especially handled signals. */
+ fail_if (trap_signals() < 0);
+
+ fail_if ((errno = pthread_mutex_lock(&slave_mutex)));
+
+ while (!reexecing && !terminating) {
+ if (!slave->wait_set->size || slave->closed)
+ break;
+ if (slave->timed) {
+ fail_if (monotone(&now));
+ if (now.tv_sec > slave->dethklok.tv_sec)
+ break;
+ if (now.tv_sec == slave->dethklok.tv_sec)
+ if (now.tv_nsec >= slave->dethklok.tv_nsec)
+ break;
+ }
+ pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout);
}
- pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout);
- }
-
- if (!(slave->closed) && (slave->wait_set->size == 0))
- slave_notify_client(slave);
-
- pthread_mutex_unlock(&slave_mutex);
-
- goto done;
-
- fail:
- xperror(*argv);
- done:
- with_mutex (slave_mutex,
- if (!reexecing)
- linked_list_remove(&slave_list, slave->node);
- running_slaves--;
- if (running_slaves == 0)
- pthread_cond_signal(&slave_cond);
- );
- return NULL;
+
+ if (!slave->closed && !slave->wait_set->size)
+ slave_notify_client(slave);
+
+ pthread_mutex_unlock(&slave_mutex);
+
+ goto done;
+
+fail:
+ xperror(*argv);
+done:
+ with_mutex (slave_mutex,
+ if (!reexecing)
+ linked_list_remove(&slave_list, slave->node);
+ running_slaves--;
+ if (!running_slaves)
+ pthread_cond_signal(&slave_cond);
+ );
+ return NULL;
}
@@ -137,26 +133,27 @@ static void* slave_loop(void* data)
* @param slave The slave
* @return Non-zero on error, `errno` will be set accordingly
*/
-int start_created_slave(slave_t* restrict slave)
+int
+start_created_slave(slave_t *restrict slave)
{
- int locked = 0;
-
- fail_if ((errno = pthread_mutex_lock(&slave_mutex)));
- locked = 1;
-
- fail_if ((errno = pthread_create(&(slave->thread), NULL, slave_loop, (void*)(intptr_t)slave)));
-
- if ((errno = pthread_detach(slave->thread)))
- xperror(*argv);
-
- running_slaves++;
- pthread_mutex_unlock(&slave_mutex);
-
- return 0;
- fail:
- if (locked)
- pthread_mutex_unlock(&slave_mutex);
- return -1;
+ int locked = 0;
+
+ fail_if ((errno = pthread_mutex_lock(&slave_mutex)));
+ locked = 1;
+
+ fail_if ((errno = pthread_create(&(slave->thread), NULL, slave_loop, (void*)(intptr_t)slave)));
+
+ if ((errno = pthread_detach(slave->thread)))
+ xperror(*argv);
+
+ running_slaves++;
+ pthread_mutex_unlock(&slave_mutex);
+
+ return 0;
+fail:
+ if (locked)
+ pthread_mutex_unlock(&slave_mutex);
+ return -1;
}
@@ -168,49 +165,50 @@ int start_created_slave(slave_t* restrict slave)
* @param recv_message_id The ID of the message that triggered the waiting
* @return Non-zero on error
*/
-int start_slave(hash_table_t* restrict wait_set, const char* restrict recv_client_id,
- const char* restrict recv_message_id)
+int
+start_slave(hash_table_t *restrict wait_set, const char *restrict recv_client_id, const char *restrict recv_message_id)
{
- slave_t* slave = slave_create(wait_set, recv_client_id, recv_message_id);
- size_t slave_address, i;
- ssize_t node = LINKED_LIST_UNUSED;
- int locked = 0;
-
- fail_if (slave == NULL);
- fail_if ((errno = pthread_mutex_lock(&slave_mutex)));
- locked = 1;
-
- slave_address = (size_t)(void*)slave;
- slave->node = node = linked_list_insert_end(&slave_list, slave_address);
- fail_if (slave->node == LINKED_LIST_UNUSED);
-
- for (i = 0; i < received.header_count; i++)
- if (startswith(received.headers[i], "Time to live: "))
- {
- const char* ttl = received.headers[i] + strlen("Time to live: ");
- slave->timed = 1;
- fail_if (monotone(&(slave->dethklok)));
- slave->dethklok.tv_sec += (time_t)atoll(ttl);
- /* It should really be `atol`, but we want to be future-proof. */
- break;
- }
-
- fail_if ((errno = pthread_create(&(slave->thread), NULL, slave_loop, (void*)(intptr_t)slave)));
-
- if ((errno = pthread_detach(slave->thread)))
- xperror(*argv);
-
- running_slaves++;
- pthread_mutex_unlock(&slave_mutex);
-
- return 0;
- fail:
- xperror(*argv);
- if (locked)
- pthread_mutex_unlock(&slave_mutex);
- if (node != LINKED_LIST_UNUSED)
- linked_list_remove(&slave_list, node);
- return -1;
+ slave_t *slave = slave_create(wait_set, recv_client_id, recv_message_id);
+ size_t slave_address, i;
+ ssize_t node = LINKED_LIST_UNUSED;
+ int locked = 0;
+ const char* ttl;
+
+ fail_if (!slave);
+ fail_if ((errno = pthread_mutex_lock(&slave_mutex)));
+ locked = 1;
+
+ slave_address = (size_t)(void*)slave;
+ slave->node = node = linked_list_insert_end(&slave_list, slave_address);
+ fail_if (slave->node == LINKED_LIST_UNUSED);
+
+ for (i = 0; i < received.header_count; i++) {
+ if (startswith(received.headers[i], "Time to live: ")) {
+ ttl = received.headers[i] + strlen("Time to live: ");
+ slave->timed = 1;
+ fail_if (monotone(&(slave->dethklok)));
+ slave->dethklok.tv_sec += (time_t)atoll(ttl);
+ /* It should really be `atol`, but we want to be future-proof. */
+ break;
+ }
+ }
+
+ fail_if ((errno = pthread_create(&(slave->thread), NULL, slave_loop, (void*)(intptr_t)slave)));
+
+ if ((errno = pthread_detach(slave->thread)))
+ xperror(*argv);
+
+ running_slaves++;
+ pthread_mutex_unlock(&slave_mutex);
+
+ return 0;
+fail:
+ xperror(*argv);
+ if (locked)
+ pthread_mutex_unlock(&slave_mutex);
+ if (node != LINKED_LIST_UNUSED)
+ linked_list_remove(&slave_list, node);
+ return -1;
}
@@ -219,17 +217,18 @@ int start_slave(hash_table_t* restrict wait_set, const char* restrict recv_clien
*
* @param client The client's ID
*/
-void close_slaves(uint64_t client)
+void
+close_slaves(uint64_t client)
{
- ssize_t node;
- with_mutex (slave_mutex,
- foreach_linked_list_node (slave_list, node)
- {
- slave_t* slave = (slave_t*)(void*)(slave_list.values[node]);
- if (slave->client == client)
- slave->closed = 1;
- }
- );
+ ssize_t node;
+ slave_t *slave;
+ with_mutex (slave_mutex,
+ foreach_linked_list_node (slave_list, node) {
+ slave = (slave_t *)(void *)(slave_list.values[node]);
+ if (slave->client == client)
+ slave->closed = 1;
+ }
+ );
}
@@ -239,31 +238,31 @@ void close_slaves(uint64_t client)
* @param command The protocol
* @return Non-zero on error, `ernno`will be set accordingly
*/
-int advance_slaves(char* command)
+int
+advance_slaves(char *command)
{
- size_t key = (size_t)(void*)command;
- int signal_slaves = 0;
- ssize_t node;
-
- fail_if ((errno = pthread_mutex_lock(&slave_mutex)));
-
- foreach_linked_list_node (slave_list, node)
- {
- slave_t* slave = (slave_t*)(void*)(slave_list.values[node]);
- if (hash_table_contains_key(slave->wait_set, key))
- {
- hash_table_remove(slave->wait_set, key);
- signal_slaves |= slave->wait_set == 0;
+ size_t key = (size_t)(void *)command;
+ int signal_slaves = 0;
+ ssize_t node;
+ slave_t *slave;
+
+ fail_if ((errno = pthread_mutex_lock(&slave_mutex)));
+
+ foreach_linked_list_node (slave_list, node) {
+ slave = (void *)(slave_list.values[node]);
+ if (hash_table_contains_key(slave->wait_set, key)) {
+ hash_table_remove(slave->wait_set, key);
+ signal_slaves |= slave->wait_set == 0;
+ }
}
- }
-
- if (signal_slaves)
- pthread_cond_broadcast(&slave_cond);
- pthread_mutex_unlock(&slave_mutex);
- return 0;
- fail:
- return -1;
+ if (signal_slaves)
+ pthread_cond_broadcast(&slave_cond);
+
+ pthread_mutex_unlock(&slave_mutex);
+ return 0;
+fail:
+ return -1;
}
@@ -275,27 +274,27 @@ int advance_slaves(char* command)
* @param recv_message_id The ID of the message that triggered the waiting
* @return The slave, `NULL` on error, `errno` will be set accordingly
*/
-slave_t* slave_create(hash_table_t* restrict wait_set, const char* restrict recv_client_id,
- const char* restrict recv_message_id)
+slave_t *
+slave_create(hash_table_t *restrict wait_set, const char *restrict recv_client_id, const char *restrict recv_message_id)
{
- slave_t* restrict rc = NULL;
- int saved_errno;
-
- fail_if (xmalloc(rc, 1, slave_t));
-
- slave_initialise(rc);
- rc->wait_set = wait_set;
- rc->client = parse_client_id(recv_client_id);
-
- fail_if (xstrdup_nn(rc->client_id, recv_client_id));
- fail_if (xstrdup_nn(rc->message_id, recv_message_id));
-
- return rc;
-
- fail:
- saved_errno = errno;
- slave_destroy(rc), free(rc);
- return errno = saved_errno, NULL;
+ slave_t *restrict rc = NULL;
+ int saved_errno;
+
+ fail_if (xmalloc(rc, 1, slave_t));
+
+ slave_initialise(rc);
+ rc->wait_set = wait_set;
+ rc->client = parse_client_id(recv_client_id);
+
+ fail_if (xstrdup_nn(rc->client_id, recv_client_id));
+ fail_if (xstrdup_nn(rc->message_id, recv_message_id));
+
+ return rc;
+
+fail:
+ saved_errno = errno;
+ slave_destroy(rc), free(rc);
+ return errno = saved_errno, NULL;
}
@@ -304,15 +303,16 @@ slave_t* slave_create(hash_table_t* restrict wait_set, const char* restrict recv
*
* @param this Memory slot in which to store the new slave information
*/
-void slave_initialise(slave_t* restrict this)
+void
+slave_initialise(slave_t *restrict this)
{
- this->wait_set = NULL;
- this->client_id = NULL;
- this->message_id = NULL;
- this->closed = 0;
- this->dethklok.tv_sec = 0;
- this->dethklok.tv_nsec = 0;
- this->timed = 0;
+ this->wait_set = NULL;
+ this->client_id = NULL;
+ this->message_id = NULL;
+ this->closed = 0;
+ this->dethklok.tv_sec = 0;
+ this->dethklok.tv_nsec = 0;
+ this->timed = 0;
}
@@ -321,23 +321,23 @@ void slave_initialise(slave_t* restrict this)
*
* @param this The slave information
*/
-void slave_destroy(slave_t* restrict this)
+void
+slave_destroy(slave_t *restrict this)
{
- if (this == NULL)
- return;
-
- if (this->wait_set != NULL)
- {
- hash_table_destroy(this->wait_set, (free_func*)reg_table_free_key, NULL);
- free(this->wait_set);
- this->wait_set = NULL;
- }
-
- free(this->client_id);
- this->client_id = NULL;
-
- free(this->message_id);
- this->message_id = NULL;
+ if (!this)
+ return;
+
+ if (this->wait_set) {
+ hash_table_destroy(this->wait_set, (free_func*)reg_table_free_key, NULL);
+ free(this->wait_set);
+ this->wait_set = NULL;
+ }
+
+ free(this->client_id);
+ this->client_id = NULL;
+
+ free(this->message_id);
+ this->message_id = NULL;
}
@@ -347,23 +347,24 @@ void slave_destroy(slave_t* restrict this)
* @param this The slave information
* @return The number of bytes to allocate to the output buffer
*/
-size_t slave_marshal_size(const slave_t* restrict this)
+size_t
+slave_marshal_size(const slave_t *restrict this)
{
- size_t rc;
- hash_entry_t* restrict entry;
- size_t n;
-
- rc = sizeof(int) + sizeof(sig_atomic_t) + sizeof(ssize_t) + sizeof(size_t) + sizeof(uint64_t);
- rc += sizeof(int) + sizeof(time_t) + sizeof(long);
- rc += (strlen(this->client_id) + strlen(this->message_id) + 2) * sizeof(char);
-
- foreach_hash_table_entry (*(this->wait_set), n, entry)
- {
- char* protocol = (char*)(void*)(entry->key);
- rc += strlen(protocol) + 1;
- }
-
- return rc;
+ size_t rc;
+ hash_entry_t *restrict entry;
+ size_t n;
+ char *protocol;
+
+ rc = sizeof(int) + sizeof(sig_atomic_t) + sizeof(ssize_t) + sizeof(size_t) + sizeof(uint64_t);
+ rc += sizeof(int) + sizeof(time_t) + sizeof(long);
+ rc += (strlen(this->client_id) + strlen(this->message_id) + 2) * sizeof(char);
+
+ foreach_hash_table_entry (*(this->wait_set), n, entry) {
+ protocol = (void *)(entry->key);
+ rc += strlen(protocol) + 1;
+ }
+
+ return rc;
}
@@ -374,36 +375,37 @@ size_t slave_marshal_size(const slave_t* restrict this)
* @param data Output buffer for the marshalled data
* @return The number of bytes that have been written (everything will be written)
*/
-size_t slave_marshal(const slave_t* restrict this, char* restrict data)
+size_t
+slave_marshal(const slave_t *restrict this, char *restrict data)
{
- hash_entry_t* restrict entry;
- size_t n;
-
- buf_set_next(data, int, SLAVE_T_VERSION);
- buf_set_next(data, sig_atomic_t, this->closed);
- buf_set_next(data, ssize_t, this->node);
- buf_set_next(data, uint64_t, this->client);
- buf_set_next(data, int, this->timed);
- buf_set_next(data, time_t, this->dethklok.tv_sec);
- buf_set_next(data, long, this->dethklok.tv_nsec);
-
- memcpy(data, this->client_id, (strlen(this->client_id) + 1) * sizeof(char));
- data += strlen(this->client_id) + 1;
-
- memcpy(data, this->message_id, (strlen(this->message_id) + 1) * sizeof(char));
- data += strlen(this->message_id) + 1;
-
- n = this->wait_set->size;
- buf_set_next(data, size_t, n);
-
- foreach_hash_table_entry (*(this->wait_set), n, entry)
- {
- char* restrict protocol = (char*)(void*)(entry->key);
- memcpy(data, protocol, (strlen(protocol) + 1) * sizeof(char));
- data += strlen(protocol) + 1;
- }
-
- return slave_marshal_size(this);
+ hash_entry_t *restrict entry;
+ size_t n;
+ char *restrict protocol;
+
+ buf_set_next(data, int, SLAVE_T_VERSION);
+ buf_set_next(data, sig_atomic_t, this->closed);
+ buf_set_next(data, ssize_t, this->node);
+ buf_set_next(data, uint64_t, this->client);
+ buf_set_next(data, int, this->timed);
+ buf_set_next(data, time_t, this->dethklok.tv_sec);
+ buf_set_next(data, long, this->dethklok.tv_nsec);
+
+ memcpy(data, this->client_id, (strlen(this->client_id) + 1) * sizeof(char));
+ data += strlen(this->client_id) + 1;
+
+ memcpy(data, this->message_id, (strlen(this->message_id) + 1) * sizeof(char));
+ data += strlen(this->message_id) + 1;
+
+ n = this->wait_set->size;
+ buf_set_next(data, size_t, n);
+
+ foreach_hash_table_entry (*(this->wait_set), n, entry) {
+ protocol = (void *)(entry->key);
+ memcpy(data, protocol, (strlen(protocol) + 1) * sizeof(char));
+ data += strlen(protocol) + 1;
+ }
+
+ return slave_marshal_size(this);
}
@@ -415,55 +417,55 @@ size_t slave_marshal(const slave_t* restrict this, char* restrict data)
* @return Zero on error, `errno` will be set accordingly, otherwise the
* number of read bytes. Destroy the slave information on error.
*/
-size_t slave_unmarshal(slave_t* restrict this, char* restrict data)
+size_t
+slave_unmarshal(slave_t *restrict this, char *restrict data)
{
- size_t key, n, m, rc = 2 * sizeof(int) + sizeof(ssize_t) + sizeof(size_t) + sizeof(uint64_t);
- char* protocol = NULL;
- int saved_errno;
-
- this->wait_set = NULL;
- this->client_id = NULL;
- this->message_id = NULL;
-
- /* buf_get_next(data, int, SLAVE_T_VERSION); */
- buf_next(data, int, 1);
-
- buf_get_next(data, sig_atomic_t, this->closed);
- buf_get_next(data, ssize_t, this->node);
- buf_get_next(data, uint64_t, this->client);
- buf_get_next(data, int, this->timed);
- buf_get_next(data, time_t, this->dethklok.tv_sec);
- buf_get_next(data, long, this->dethklok.tv_nsec);
-
- n = strlen((char*)data) + 1;
- fail_if (xmemdup(this->client_id, data, n, char));
- data += n, rc += n * sizeof(char);
-
- n = strlen((char*)data) + 1;
- fail_if (xmemdup(this->message_id, data, n, char));
- data += n, rc += n * sizeof(char);
-
- fail_if (xmalloc(this->wait_set, 1, hash_table_t));
- fail_if (hash_table_create(this->wait_set));
-
- buf_get_next(data, size_t, m);
-
- while (m--)
- {
- n = strlen((char*)data) + 1;
- fail_if (xmemdup(protocol, data, n, char));
- data += n, rc += n * sizeof(char);
-
- key = (size_t)(void*)protocol;
- if (hash_table_put(this->wait_set, key, 1) == 0)
- fail_if (errno);
- }
-
- return rc;
- fail:
- saved_errno = errno;
- free(protocol);
- return errno = saved_errno, (size_t)0;
+ size_t key, n, m, rc = 2 * sizeof(int) + sizeof(ssize_t) + sizeof(size_t) + sizeof(uint64_t);
+ char *protocol = NULL;
+ int saved_errno;
+
+ this->wait_set = NULL;
+ this->client_id = NULL;
+ this->message_id = NULL;
+
+ /* buf_get_next(data, int, SLAVE_T_VERSION); */
+ buf_next(data, int, 1);
+
+ buf_get_next(data, sig_atomic_t, this->closed);
+ buf_get_next(data, ssize_t, this->node);
+ buf_get_next(data, uint64_t, this->client);
+ buf_get_next(data, int, this->timed);
+ buf_get_next(data, time_t, this->dethklok.tv_sec);
+ buf_get_next(data, long, this->dethklok.tv_nsec);
+
+ n = strlen((char *)data) + 1;
+ fail_if (xmemdup(this->client_id, data, n, char));
+ data += n, rc += n * sizeof(char);
+
+ n = strlen((char *)data) + 1;
+ fail_if (xmemdup(this->message_id, data, n, char));
+ data += n, rc += n * sizeof(char);
+
+ fail_if (xmalloc(this->wait_set, 1, hash_table_t));
+ fail_if (hash_table_create(this->wait_set));
+
+ buf_get_next(data, size_t, m);
+
+ while (m--) {
+ n = strlen((char *)data) + 1;
+ fail_if (xmemdup(protocol, data, n, char));
+ data += n, rc += n * sizeof(char);
+
+ key = (size_t)(void*)protocol;
+ if (!hash_table_put(this->wait_set, key, 1))
+ fail_if (errno);
+ }
+
+ return rc;
+fail:
+ saved_errno = errno;
+ free(protocol);
+ return errno = saved_errno, (size_t)0;
}
@@ -473,35 +475,34 @@ size_t slave_unmarshal(slave_t* restrict this, char* restrict data)
* @param data In buffer with the marshalled data
* @return The number of read bytes
*/
-size_t slave_unmarshal_skip(char* restrict data)
+size_t
+slave_unmarshal_skip(char *restrict data)
{
- size_t n, m, rc = 2 * sizeof(int) + sizeof(ssize_t) + sizeof(size_t) + sizeof(uint64_t);
- rc += sizeof(int) + sizeof(time_t) + sizeof(long);
-
- /* buf_get_next(data, int, SLAVE_T_VERSION); */
- buf_next(data, int, 1);
-
- buf_next(data, sig_atomic_t, 1);
- buf_next(data, ssize_t, 1);
- buf_next(data, uint64_t, 1);
- buf_next(data, int, 1);
- buf_next(data, time_t, 1);
- buf_next(data, long, 1);
-
- n = (strlen((char*)data) + 1) * sizeof(char);
- data += n, rc += n;
-
- n = (strlen((char*)data) + 1) * sizeof(char);
- data += n, rc += n;
-
- buf_get_next(data, size_t, m);
-
- while (m--)
- {
- n = (strlen((char*)data) + 1) * sizeof(char);
- data += n, rc += n;
- }
+ size_t n, m, rc = 2 * sizeof(int) + sizeof(ssize_t) + sizeof(size_t) + sizeof(uint64_t);
+ rc += sizeof(int) + sizeof(time_t) + sizeof(long);
+
+ /* buf_get_next(data, int, SLAVE_T_VERSION); */
+ buf_next(data, int, 1);
+
+ buf_next(data, sig_atomic_t, 1);
+ buf_next(data, ssize_t, 1);
+ buf_next(data, uint64_t, 1);
+ buf_next(data, int, 1);
+ buf_next(data, time_t, 1);
+ buf_next(data, long, 1);
- return rc;
-}
+ n = (strlen((char *)data) + 1) * sizeof(char);
+ data += n, rc += n;
+
+ n = (strlen((char *)data) + 1) * sizeof(char);
+ data += n, rc += n;
+
+ buf_get_next(data, size_t, m);
+ while (m--) {
+ n = (strlen((char*)data) + 1) * sizeof(char);
+ data += n, rc += n;
+ }
+
+ return rc;
+}
diff --git a/src/mds-registry/slave.h b/src/mds-registry/slave.h
index 14ff04b..616575d 100644
--- a/src/mds-registry/slave.h
+++ b/src/mds-registry/slave.h
@@ -30,59 +30,59 @@
-#define SLAVE_T_VERSION 0
+#define SLAVE_T_VERSION 0
/**
* Slave information, a thread waiting for protocols to become available
*/
typedef struct slave
{
- /**
- * Set of protocols for which to wait that they become available
- */
- hash_table_t* wait_set;
-
- /**
- * The ID of the waiting client
- */
- uint64_t client;
-
- /**
- * The ID of the waiting client
- */
- char* client_id;
-
- /**
- * The ID of the message that triggered the waiting
- */
- char* message_id;
-
- /**
- * The slave's node in the linked list of slaves
- */
- ssize_t node;
-
- /**
- * Whether the client has been closed
- */
- volatile sig_atomic_t closed;
-
- /**
- * The slave thread
- */
- pthread_t thread;
-
- /**
- * The time slave should die if its condition
- * has not be meet at that time
- */
- struct timespec dethklok;
-
- /**
- * Whether `dethklok` should apply
- */
- int timed;
-
+ /**
+ * Set of protocols for which to wait that they become available
+ */
+ hash_table_t *wait_set;
+
+ /**
+ * The ID of the waiting client
+ */
+ uint64_t client;
+
+ /**
+ * The ID of the waiting client
+ */
+ char *client_id;
+
+ /**
+ * The ID of the message that triggered the waiting
+ */
+ char *message_id;
+
+ /**
+ * The slave's node in the linked list of slaves
+ */
+ ssize_t node;
+
+ /**
+ * Whether the client has been closed
+ */
+ volatile sig_atomic_t closed;
+
+ /**
+ * The slave thread
+ */
+ pthread_t thread;
+
+ /**
+ * The time slave should die if its condition
+ * has not be meet at that time
+ */
+ struct timespec dethklok;
+
+ /**
+ * Whether `dethklok` should apply
+ */
+ int timed;
+
} slave_t;
@@ -94,7 +94,7 @@ typedef struct slave
* @return Non-zero on error, `errno` will be set accordingly
*/
__attribute__((nonnull))
-int start_created_slave(slave_t* restrict slave);
+int start_created_slave(slave_t *restrict slave);
/**
* Start a slave thread
@@ -105,8 +105,8 @@ int start_created_slave(slave_t* restrict slave);
* @return Non-zero on error
*/
__attribute__((nonnull))
-int start_slave(hash_table_t* restrict wait_set, const char* restrict recv_client_id,
- const char* restrict recv_message_id);
+int start_slave(hash_table_t *restrict wait_set, const char *restrict recv_client_id,
+ const char *restrict recv_message_id);
/**
* Close all slaves associated with a client
@@ -122,7 +122,7 @@ void close_slaves(uint64_t client);
* @return Non-zero on error, `ernno`will be set accordingly
*/
__attribute__((nonnull))
-int advance_slaves(char* command);
+int advance_slaves(char *command);
/**
* Create a slave
@@ -133,8 +133,8 @@ int advance_slaves(char* command);
* @return The slave, `NULL` on error, `errno` will be set accordingly
*/
__attribute__((nonnull))
-slave_t* slave_create(hash_table_t* restrict wait_set, const char* restrict recv_client_id,
- const char* restrict recv_message_id);
+slave_t *slave_create(hash_table_t *restrict wait_set, const char *restrict recv_client_id,
+ const char *restrict recv_message_id);
/**
@@ -143,14 +143,14 @@ slave_t* slave_create(hash_table_t* restrict wait_set, const char* restrict recv
* @param this Memory slot in which to store the new slave information
*/
__attribute__((nonnull))
-void slave_initialise(slave_t* restrict this);
+void slave_initialise(slave_t *restrict this);
/**
* Release all resources assoicated with a slave
*
* @param this The slave information
*/
-void slave_destroy(slave_t* restrict this);
+void slave_destroy(slave_t *restrict this);
/**
* Calculate the buffer size need to marshal slave information
@@ -159,7 +159,7 @@ void slave_destroy(slave_t* restrict this);
* @return The number of bytes to allocate to the output buffer
*/
__attribute__((pure, nonnull))
-size_t slave_marshal_size(const slave_t* restrict this);
+size_t slave_marshal_size(const slave_t *restrict this);
/**
* Marshals slave information
@@ -169,7 +169,7 @@ size_t slave_marshal_size(const slave_t* restrict this);
* @return The number of bytes that have been written (everything will be written)
*/
__attribute__((nonnull))
-size_t slave_marshal(const slave_t* restrict this, char* restrict data);
+size_t slave_marshal(const slave_t *restrict this, char *restrict data);
/**
* Unmarshals slave information
@@ -180,7 +180,7 @@ size_t slave_marshal(const slave_t* restrict this, char* restrict data);
* number of read bytes. Destroy the slave information on error.
*/
__attribute__((nonnull))
-size_t slave_unmarshal(slave_t* restrict this, char* restrict data);
+size_t slave_unmarshal(slave_t *restrict this, char *restrict data);
/**
* Pretend to unmarshal slave information
@@ -189,8 +189,7 @@ size_t slave_unmarshal(slave_t* restrict this, char* restrict data);
* @return The number of read bytes
*/
__attribute__((pure, nonnull))
-size_t slave_unmarshal_skip(char* restrict data);
+size_t slave_unmarshal_skip(char *restrict data);
#endif
-
diff --git a/src/mds-registry/util.c b/src/mds-registry/util.c
index e49867f..4889a09 100644
--- a/src/mds-registry/util.c
+++ b/src/mds-registry/util.c
@@ -39,8 +39,8 @@
*/
void reg_table_free_key(size_t obj)
{
- char* command = (char*)(void*)obj;
- free(command);
+ char *command = (void *)obj;
+ free(command);
}
@@ -51,8 +51,7 @@ void reg_table_free_key(size_t obj)
*/
void reg_table_free_value(size_t obj)
{
- client_list_t* list = (client_list_t*)(void*)obj;
- client_list_destroy(list);
- free(list);
+ client_list_t *list = (void *)obj;
+ client_list_destroy(list);
+ free(list);
}
-
diff --git a/src/mds-registry/util.h b/src/mds-registry/util.h
index c4be42a..222b887 100644
--- a/src/mds-registry/util.h
+++ b/src/mds-registry/util.h
@@ -38,5 +38,3 @@ void reg_table_free_value(size_t obj);
#endif
-
-