aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/mds-registry/registry.c64
1 files changed, 58 insertions, 6 deletions
diff --git a/src/mds-registry/registry.c b/src/mds-registry/registry.c
index 98f5ad1..6fed005 100644
--- a/src/mds-registry/registry.c
+++ b/src/mds-registry/registry.c
@@ -51,6 +51,9 @@ static int handle_close_message(void)
fail_if ((errno = pthread_mutex_lock(&reg_mutex)));
+
+ /* Remove server for all protocols. */
+
for (i = 0; i < received.header_count; i++)
if (startswith(received.headers[i], "Client closed: "))
{
@@ -59,11 +62,14 @@ static int handle_close_message(void)
foreach_hash_table_entry (reg_table, j, entry)
{
+ /* Remove server from list of servers that support the protocol,
+ once, if it is in the list. */
client_list_t* list = (client_list_t*)(void*)(entry->value);
client_list_remove(list, client);
if (list->size)
continue;
+ /* If no servers support the protocol, list the protocol for removal. */
fail_if ((keys == NULL) && xmalloc(keys, size, size_t));
if (ptr == size ? growalloc(old_keys, keys, size, size_t) : 0)
goto fail;
@@ -71,6 +77,9 @@ static int handle_close_message(void)
}
}
+
+ /* Remove protocol that no longer have any supporting servers. */
+
for (i = 0; i < ptr; i++)
{
hash_entry_t* entry = hash_table_get_entry(&reg_table, keys[i]);
@@ -92,6 +101,7 @@ static int handle_close_message(void)
perror(*argv);
fail:
free(keys);
+ pthread_mutex_unlock(&reg_mutex);
return -1;
}
@@ -109,6 +119,7 @@ static int registry_action_add(int has_key, char* command, size_t command_key, u
{
if (has_key)
{
+ /* Add server to protocol if the protocol is already in the table. */
size_t address = hash_table_get(&reg_table, command_key);
client_list_t* list = (client_list_t*)(void*)address;
if (client_list_add(list, client) < 0)
@@ -116,15 +127,20 @@ static int registry_action_add(int has_key, char* command, size_t command_key, u
}
else
{
+ /* If the protocol is not already in the table. */
+
+ /* Allocate list of servers for the protocol. */
client_list_t* list = malloc(sizeof(client_list_t));
void* address = list;
if (list == NULL)
goto pfail;
+ /* Duplicate the protocol name so it can be accessed later. */
if ((command = strdup(command)) == NULL)
{
free(list);
goto pfail;
}
+ /* Create list of servers, add server to list and add the protocol to the table. */
command_key = (size_t)(void*)command;
if (client_list_create(list, 1) ||
client_list_add(list, client) ||
@@ -156,7 +172,11 @@ static void registry_action_remove(size_t command_key, uint64_t client)
hash_entry_t* entry = hash_table_get_entry(&reg_table, command_key);
size_t address = entry->value;
client_list_t* list = (client_list_t*)(void*)address;
+
+ /* Remove server from protocol. */
client_list_remove(list, client);
+
+ /* Remove protocol if no servers support it anymore. */
if (list->size == 0)
{
client_list_destroy(list);
@@ -184,13 +204,16 @@ static int registry_action_act(char* command, int action, uint64_t client, hash_
if (action == 1)
{
+ /* Register server to protocol. */
if (registry_action_add(has_key, command, command_key, client))
return -1;
}
else if ((action == -1) && has_key)
+ /* Unregister server from protocol. */
registry_action_remove(command_key, client);
else if ((action == 0) && !has_key)
{
+ /* Add protocl to wait set of not present in the protocol table. */
if ((command = strdup(command)) == NULL)
goto pfail_wait;
command_key = (size_t)(void*)command;
@@ -229,6 +252,9 @@ static int registry_action(size_t length, int action, const char* recv_client_id
hash_table_t* wait_set = NULL;
size_t begin;
+
+ /* If ‘Action: wait’, create a set for the protocols that are not already available. */
+
if (action == 0)
{
wait_set = malloc(sizeof(hash_table_t));
@@ -242,6 +268,9 @@ static int registry_action(size_t length, int action, const char* recv_client_id
wait_set->hasher = (hash_func*)string_hash;
}
+
+ /* If the payload buffer is full, increase it so we can fit another character. */
+
if (received.payload_size == length)
{
if (growalloc(old, received.payload, received.payload_size, char))
@@ -254,8 +283,15 @@ static int registry_action(size_t length, int action, const char* recv_client_id
payload = received.payload;
}
+
+ /* LF-terminate the payload, perhaps it did not have a terminal LF. */
+
payload[length] = '\n';
+
+ /* For all protocols in the payload, either add or remove
+ them from or to the protocl table or the wait set. */
+
fail_if ((errno = pthread_mutex_lock(&reg_mutex)));
for (begin = 0; begin < length;)
@@ -267,11 +303,15 @@ static int registry_action(size_t length, int action, const char* recv_client_id
command[len] = '\0';
begin += len + 1;
- if (registry_action_act(command, action, client, wait_set))
- goto fail_in_mutex;
+ if (len > 0)
+ if (registry_action_act(command, action, client, wait_set))
+ goto fail_in_mutex;
}
pthread_mutex_unlock(&reg_mutex);
+
+
+ /* If ‘Action: wait’, start a new thread that waits for the protocols and the responds. */
if (action == 0)
{
@@ -303,6 +343,9 @@ static int list_registry(const char* recv_client_id, const char* recv_message_id
size_t ptr = 0, i;
hash_entry_t* entry;
+
+ /* Allocate the send buffer for the first time, it cannot be doubled if it is zero. */
+
if (send_buffer_size == 0)
{
fail_if (xmalloc(send_buffer, 256, char));
@@ -311,12 +354,16 @@ static int list_registry(const char* recv_client_id, const char* recv_message_id
fail_if ((errno = pthread_mutex_lock(&reg_mutex)));
+
+ /* Add all protocols to the send buffer. */
+
foreach_hash_table_entry (reg_table, i, entry)
{
size_t key = entry->key;
char* command = (char*)(void*)key;
size_t len = strlen(command);
+ /* Make sure the send buffer can fit all protocols. */
while (ptr + len + 1 >= send_buffer_size)
if (growalloc(old, send_buffer, send_buffer_size, char))
goto fail_in_mutex;
@@ -326,22 +373,27 @@ static int list_registry(const char* recv_client_id, const char* recv_message_id
send_buffer[ptr++] = '\n';
}
+
+ /* Make sure the message headers can fit the send buffer. */
+
i = strlen(recv_message_id) + strlen(recv_client_id) + 10 + 19;
i += strlen("To: %s\nIn response to: %s\nMessage ID: %" PRIi32 "\nLength: %" PRIu64 "\n\n");
while (ptr + i >= send_buffer_size)
- {
- if (growalloc(old, send_buffer, send_buffer_size, char))
- goto fail_in_mutex;
- }
+ if (growalloc(old, send_buffer, send_buffer_size, char))
+ goto fail_in_mutex;
+
+ /* Construct message headers. */
sprintf(send_buffer + ptr, "To: %s\nIn response to: %s\nMessage ID: %" PRIi32 "\nLength: %" PRIu64 "\n\n",
recv_message_id, recv_client_id, message_id, ptr);
+ /* Increase message ID. */
message_id = message_id == INT32_MAX ? 0 : (message_id + 1);
pthread_mutex_unlock(&reg_mutex);
+ /* Send message. */
if (full_send(send_buffer + ptr, strlen(send_buffer + ptr)))
return 1;
return full_send(send_buffer, ptr);