aboutsummaryrefslogtreecommitdiffstats
path: root/src/mds-registry.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/mds-registry.c249
1 files changed, 245 insertions, 4 deletions
diff --git a/src/mds-registry.c b/src/mds-registry.c
index e0d3966..8bd7057 100644
--- a/src/mds-registry.c
+++ b/src/mds-registry.c
@@ -20,6 +20,9 @@
#include <libmdsserver/macros.h>
#include <libmdsserver/util.h>
#include <libmdsserver/mds-message.h>
+#include <libmdsserver/hash-table.h>
+#include <libmdsserver/hash-help.h>
+#include <libmdsserver/client-list.h>
#include <errno.h>
#include <inttypes.h>
@@ -53,7 +56,7 @@ server_characteristics_t server_characteristics =
/**
* Value of the ‘Message ID’ header for the next message
*/
-static int32_t message_id = 1;
+static int32_t message_id = 2;
/**
* Buffer for received messages
@@ -65,6 +68,11 @@ static mds_message_t received;
*/
static int connected = 1;
+/**
+ * Protocol registry table
+ */
+static hash_table_t reg_table;
+
/**
@@ -90,12 +98,25 @@ int initialise_server(void)
const char* const message =
"Command: intercept\n"
"Message ID: 0\n"
- "Length: 18\n"
+ "Length: 32\n"
"\n"
- "Command: register\n";
+ "Command: register\n"
+ "Client closed\n" /* TODO support not implemented yet */
+
+ "Command: reregister\n"
+ "Message ID: 1\n"
+ "\n";
if (full_send(message, strlen(message)))
return 1;
+ if (hash_table_create_tuned(&reg_table, 32))
+ {
+ perror(*argv);
+ hash_table_destroy(&reg_table, NULL, NULL);
+ return 1;
+ }
+ reg_table.key_comparator = (compare_func*)string_comparator;
+ reg_table.hasher = (hash_func*)string_hash;
server_initialised();
mds_message_initialise(&received);
return 0;
@@ -173,6 +194,8 @@ int unmarshal_server(char* state_buf)
r = mds_message_unmarshal(&received, state_buf);
if (r)
mds_message_destroy(&received);
+ reg_table.key_comparator = (compare_func*)string_comparator;
+ reg_table.hasher = (hash_func*)string_hash;
return r;
}
@@ -201,7 +224,7 @@ int master_loop(void)
int r = mds_message_read(&received, socket_fd);
if (r == 0)
{
- r = 0; /* TODO handle message */
+ r = handle_message();
if (r == 0)
continue;
}
@@ -225,6 +248,8 @@ int master_loop(void)
connected = 1;
}
+ /* TODO if !reexecing or failing, cleanup reg_table */
+
mds_message_destroy(&received);
return 0;
pfail:
@@ -236,6 +261,222 @@ int master_loop(void)
/**
+ * Handle the received message
+ *
+ * @return Zero on success -1 on error or interruption,
+ * errno will be set accordingly
+ */
+int handle_message(void)
+{
+ const char* recv_client_id = NULL;
+ const char* recv_message_id = NULL;
+ const char* recv_length = NULL;
+ const char* recv_action = NULL;
+ size_t i, length = 0;
+
+#define __get_header(storage, header) \
+ (startswith(received.headers[i], header)) \
+ storage = received.headers[i] + strlen(header)
+
+ for (i = 0; i < received.header_count; i++)
+ {
+ if __get_header(recv_client_id, "Client ID: ");
+ else if __get_header(recv_message_id, "Message ID: ");
+ else if __get_header(recv_length, "Length: ");
+ else if __get_header(recv_action, "Action: ");
+ else
+ continue;
+ if (recv_client_id && recv_message_id && recv_length && recv_action)
+ break;
+ }
+
+#undef __get_header
+
+
+ if ((recv_client_id == NULL) || (strequals(recv_client_id, "0:0")))
+ {
+ eprint("received message from anonymous sender, ignoring.");
+ return 0;
+ }
+ else if (strchr(recv_client_id, ':') == NULL)
+ {
+ eprint("received message from sender without a colon it its ID, ignoring, invalid ID.");
+ return 0;
+ }
+ else if ((recv_length == NULL) && ((recv_action == NULL) || !strequals(recv_action, "list")))
+ {
+ eprint("received empty message without `Action: list`, ignoring, has no effect.");
+ return 0;
+ }
+ else if (recv_message_id == NULL)
+ {
+ eprint("received message with ID, ignoring, master server is misbehaving.");
+ return 0;
+ }
+
+
+ if (recv_length != NULL)
+ length = (size_t)atoll(recv_length);
+ if (recv_action != NULL)
+ recv_action = "add";
+
+#define __registry_action(action) registry_action(length, action, recv_client_id, recv_message_id)
+
+ if (strequals(recv_action, "add")) return __registry_action(1);
+ else if (strequals(recv_action, "remove")) return __registry_action(-1);
+ else if (strequals(recv_action, "wait")) return __registry_action(0);
+ else if (strequals(recv_action, "list")) return list_registry(recv_client_id, recv_message_id);
+ else
+ {
+ eprint("received invalid action, ignoring.");
+ return 0;
+ }
+
+#undef __registry_action
+}
+
+
+/**
+ * Perform an action over the registry
+ *
+ * @param length The length of the received message
+ * @param action -1 to remove command, +1 to add commands, 0 to
+ * wait until the message commnds are registered
+ * @param recv_client_id The ID of the client
+ * @param recv_message_id The ID of the received message
+ * @return Zero on success -1 on error or interruption,
+ * errno will be set accordingly
+ */
+int registry_action(size_t length, int action, const char* recv_client_id, const char* recv_message_id)
+{
+ char* payload = received.payload;
+ uint64_t client = 0;
+ hash_table_t* wait_set = NULL;
+ size_t begin;
+ char client_words[22];
+ char* client_high;
+ char* client_low;
+
+ if (action)
+ {
+ strcpy(client_high = client_words, recv_client_id);
+ client_low = rawmemchr(client_words, ':');
+ *client_low++ = '\0';
+ client = (uint64_t)atoll(client_high);
+ client <<= 32;
+ client |= (uint64_t)atoll(client_low);
+ }
+ else
+ {
+ wait_set = malloc(sizeof(hash_table_t));
+ if (hash_table_create(wait_set))
+ {
+ perror(*argv);
+ hash_table_destroy(wait_set, NULL, NULL);
+ free(wait_set);
+ return -1;
+ }
+ wait_set->key_comparator = (compare_func*)string_comparator;
+ wait_set->hasher = (hash_func*)string_hash;
+ }
+
+ if (received.payload_size == length)
+ {
+ if (xrealloc(received.payload, received.payload_size <<= 1, char))
+ {
+ received.payload = payload;
+ received.payload_size >>= 1;
+ perror(*argv);
+ if (wait_set != NULL)
+ {
+ hash_table_destroy(wait_set, NULL, NULL);
+ free(wait_set);
+ }
+ return -1;
+ }
+ else
+ payload = received.payload;
+ }
+
+ payload[length] = '\n';
+
+ for (begin = 0; begin < length;)
+ {
+ char* end = rawmemchr(payload + begin, '\n');
+ size_t len = (size_t)(end - payload) - begin - 1;
+ char* command = payload + begin;
+ size_t command_key = (size_t)(void*)command;
+
+ command[len] = '\0';
+ begin += len + 1;
+
+ if (action == 1)
+ if (hash_table_contains_key(&reg_table, command_key))
+ {
+ size_t address = hash_table_get(&reg_table, command_key);
+ client_list_t* list = (client_list_t*)(void*)address;
+ if (client_list_add(list, client) < 0)
+ return -1;
+ }
+ else
+ {
+ client_list_t* list = malloc(sizeof(client_list_t));
+ void* address = list;
+ if (list == NULL)
+ return perror(*argv), -1;
+ if (client_list_create(list, 1) ||
+ client_list_add(list, client) ||
+ (hash_table_put(&reg_table, command_key, (size_t)address) == 0))
+ {
+ perror(*argv);
+ client_list_destroy(list);
+ free(list);
+ return -1;
+ }
+ }
+ else if ((action == -1) && hash_table_contains_key(&reg_table, command_key))
+ {
+ size_t address = hash_table_get(&reg_table, command_key);
+ client_list_t* list = (client_list_t*)(void*)address;
+ client_list_remove(list, client);
+ }
+ else if ((action == 0) && !hash_table_contains_key(&reg_table, command_key))
+ {
+ if (hash_table_put(wait_set, command_key, 1) == 0)
+ if (errno)
+ {
+ perror(*argv);
+ hash_table_destroy(wait_set, NULL, NULL);
+ free(wait_set);
+ return -1;
+ }
+ }
+ }
+
+ if (action == 0)
+ {
+ /* TODO */
+ }
+
+ return 0;
+}
+
+
+/**
+ * Send a list of all registered commands to a client
+ *
+ * @param recv_client_id The ID of the client
+ * @param recv_message_id The ID of the received message
+ * @return Zero on success -1 on error or interruption,
+ * errno will be set accordingly
+ */
+int list_registry(const char* recv_client_id, const char* recv_message_id)
+{
+ /* TODO */
+}
+
+
+/**
* Send a full message even if interrupted
*
* @param message The message to send