diff options
author | Mattias Andrée <maandree@operamail.com> | 2014-08-02 20:01:03 +0200 |
---|---|---|
committer | Mattias Andrée <maandree@operamail.com> | 2014-08-02 20:01:03 +0200 |
commit | 32bf8aea8f3a46ee6c2808cd2369a7558b7b6bc7 (patch) | |
tree | fcbdc048f48642c033d2c61bd9ca98ab1d02d56d | |
parent | m (diff) | |
download | mds-32bf8aea8f3a46ee6c2808cd2369a7558b7b6bc7.tar.gz mds-32bf8aea8f3a46ee6c2808cd2369a7558b7b6bc7.tar.bz2 mds-32bf8aea8f3a46ee6c2808cd2369a7558b7b6bc7.tar.xz |
misc
Signed-off-by: Mattias Andrée <maandree@operamail.com>
-rw-r--r-- | Makefile | 3 | ||||
-rw-r--r-- | TODO | 1 | ||||
-rw-r--r-- | src/libmdsserver/linked-list.h | 1 | ||||
-rw-r--r-- | src/mds-base.c | 15 | ||||
-rw-r--r-- | src/mds-base.h | 8 | ||||
-rw-r--r-- | src/mds-registry/globals.c | 27 | ||||
-rw-r--r-- | src/mds-registry/globals.h | 23 | ||||
-rw-r--r-- | src/mds-registry/mds-registry.c | 32 | ||||
-rw-r--r-- | src/mds-registry/registry.c | 68 | ||||
-rw-r--r-- | src/mds-registry/signals.c | 96 | ||||
-rw-r--r-- | src/mds-registry/signals.h | 26 | ||||
-rw-r--r-- | src/mds-registry/slave.c | 403 | ||||
-rw-r--r-- | src/mds-registry/slave.h | 160 | ||||
-rw-r--r-- | src/mds-server/mds-server.c | 2 | ||||
-rw-r--r-- | src/mds-server/mds-server.h | 2 |
15 files changed, 798 insertions, 69 deletions
@@ -20,7 +20,8 @@ OBJ_mds-server_ = mds-server interception-condition client multicast \ queued-interception globals signals interceptors \ sending slavery reexec receiving -OBJ_mds-registry_ = mds-registry util globals reexec registry +OBJ_mds-registry_ = mds-registry util globals reexec registry signals \ + slave OBJ_mds-server = $(foreach O,$(OBJ_mds-server_),obj/mds-server/$(O).o) OBJ_mds-registry = $(foreach O,$(OBJ_mds-registry_),obj/mds-registry/$(O).o) @@ -39,4 +39,5 @@ Missing servers: cmd-registry Command line interface for the protocol registry Fast lanes +Optimise use of mutexe diff --git a/src/libmdsserver/linked-list.h b/src/libmdsserver/linked-list.h index 6ed7076..e77f33d 100644 --- a/src/libmdsserver/linked-list.h +++ b/src/libmdsserver/linked-list.h @@ -276,7 +276,6 @@ int linked_list_unmarshal(linked_list_t* restrict this, char* restrict data); #define foreach_linked_list_node(list, node) \ for (node = (list).edge; node = (list).next[node], node != (list).edge;) - /** * Print the content of the list * diff --git a/src/mds-base.c b/src/mds-base.c index 9baa42e..04ae25b 100644 --- a/src/mds-base.c +++ b/src/mds-base.c @@ -208,6 +208,18 @@ void __attribute__((weak)) server_initialised(void) /** + * This function is called when an intraprocess signal + * that used to send a notification to a thread + * + * @param signo The signal that has been received + */ +void received_noop(int signo) +{ + (void) signo; +} + + +/** * This function is called when a signal that * signals the server to re-exec has been received * @@ -473,6 +485,9 @@ int trap_signals(void) /* Implement clean exit on SIGINT. */ fail_if (xsigaction(SIGINT, received_terminate) < 0); + /* Implement clean exit on SIGRTMIN. */ + fail_if (xsigaction(SIGRTMIN, received_noop) < 0); + return 0; pfail: perror(*argv); diff --git a/src/mds-base.h b/src/mds-base.h index c8666a1..edc42fa 100644 --- a/src/mds-base.h +++ b/src/mds-base.h @@ -159,6 +159,14 @@ void server_initialised(void); /* __attribute__((weak)) */ /** + * This function is called when an intraprocess signal + * that used to send a notification to a thread + * + * @param signo The signal that has been received + */ +void received_noop(int signo) __attribute__((weak, const)); + +/** * This function should be implemented by the actual server implementation * if the server is multithreaded * diff --git a/src/mds-registry/globals.c b/src/mds-registry/globals.c index 981cad1..e275eb7 100644 --- a/src/mds-registry/globals.c +++ b/src/mds-registry/globals.c @@ -50,17 +50,32 @@ char* send_buffer = NULL; size_t send_buffer_size = 0; /** - * General mutex + * Used to temporarily store the old value when reallocating heap-allocations */ -pthread_mutex_t reg_mutex; +char* old; /** - * General condition + * The master thread */ -pthread_cond_t reg_cond; +pthread_t master_thread; /** - * Used to temporarily store the old value when reallocating heap-allocations + * The number of running slaves */ -char* old; +size_t running_slaves = 0; + +/** + * List of running slaves + */ +linked_list_t slave_list; + +/** + * Mutex for slave data + */ +pthread_mutex_t slave_mutex; + +/** + * Condition for slave data + */ +pthread_cond_t slave_cond; diff --git a/src/mds-registry/globals.h b/src/mds-registry/globals.h index 9c4b42f..dcbf91d 100644 --- a/src/mds-registry/globals.h +++ b/src/mds-registry/globals.h @@ -21,6 +21,7 @@ #include <libmdsserver/mds-message.h> #include <libmdsserver/hash-table.h> +#include <libmdsserver/linked-list.h> #include <stdint.h> #include <stddef.h> @@ -62,19 +63,29 @@ extern char* send_buffer; extern size_t send_buffer_size; /** - * General mutex + * Used to temporarily store the old value when reallocating heap-allocations */ -extern pthread_mutex_t reg_mutex; +extern char* old; /** - * General condition + * The number of running slaves */ -extern pthread_cond_t reg_cond; +extern size_t running_slaves; /** - * Used to temporarily store the old value when reallocating heap-allocations + * List of running slaves */ -extern char* old; +extern linked_list_t slave_list; /* TODO (un)marshal */ + +/** + * Mutex for slave data + */ +extern pthread_mutex_t slave_mutex; + +/** + * Condition for slave data + */ +extern pthread_cond_t slave_cond; #endif diff --git a/src/mds-registry/mds-registry.c b/src/mds-registry/mds-registry.c index 96cf05d..aa759c8 100644 --- a/src/mds-registry/mds-registry.c +++ b/src/mds-registry/mds-registry.c @@ -23,6 +23,7 @@ #include <libmdsserver/macros.h> #include <libmdsserver/hash-help.h> +#include <libmdsserver/linked-list.h> #include <errno.h> #include <stdio.h> @@ -54,20 +55,20 @@ server_characteristics_t server_characteristics = */ int preinitialise_server(void) { - if ((errno = pthread_mutex_init(®_mutex, NULL))) - { - perror(*argv); - return 1; - } + int stage = 0; - if ((errno = pthread_cond_init(®_cond, NULL))) - { - perror(*argv); - pthread_mutex_destroy(®_mutex); - return 1; - } + fail_if ((errno = pthread_mutex_init(&slave_mutex, NULL))); stage++; + fail_if ((errno = pthread_cond_init(&slave_cond, NULL))); stage++; + + linked_list_create(&slave_list, 2); return 0; + + pfail: + perror(*argv); + if (stage >= 1) pthread_mutex_destroy(&slave_mutex); + if (stage >= 2) pthread_cond_destroy(&slave_cond); + return 1; } @@ -177,13 +178,18 @@ int master_loop(void) pfail: perror(*argv); fail: + /* Join with all slaves threads. */ + with_mutex (slave_mutex, + while (running_slaves > 0) + pthread_cond_wait(&slave_cond, &slave_mutex);); + if (rc || !reexecing) { hash_table_destroy(®_table, (free_func*)reg_table_free_key, (free_func*)reg_table_free_value); mds_message_destroy(&received); } - pthread_mutex_destroy(®_mutex); - pthread_cond_destroy(®_cond); + pthread_mutex_destroy(&slave_mutex); + pthread_cond_destroy(&slave_cond); free(send_buffer); return rc; } diff --git a/src/mds-registry/registry.c b/src/mds-registry/registry.c index 6fed005..8c0132e 100644 --- a/src/mds-registry/registry.c +++ b/src/mds-registry/registry.c @@ -19,6 +19,7 @@ #include "util.h" #include "globals.h" +#include "slave.h" #include "../mds-base.h" @@ -31,6 +32,7 @@ #include <string.h> #include <stdio.h> #include <stdlib.h> +#include <pthread.h> @@ -43,14 +45,14 @@ static int handle_close_message(void) { /* Servers do not close too often, there is no need to - optimise this with another hash table. */ + optimise this with another hash table. Doing so would + also require some caution because the keys are 32-bit + on 32-bit computers, and the client ID:s are 64-bit. */ size_t i, j, ptr = 0, size = 1; size_t* keys = NULL; size_t* old_keys; - fail_if ((errno = pthread_mutex_lock(®_mutex))); - /* Remove server for all protocols. */ @@ -75,9 +77,19 @@ static int handle_close_message(void) goto fail; keys[ptr++] = entry->key; } + + + /* Mark client as closed. */ + + close_slaves(client); } + /* Close slaves those clients have closed. */ + + with_mutex (slave_mutex, pthread_cond_broadcast(&slave_cond);); + + /* Remove protocol that no longer have any supporting servers. */ for (i = 0; i < ptr; i++) @@ -93,15 +105,12 @@ static int handle_close_message(void) free(command); } - pthread_mutex_unlock(®_mutex); - free(keys); return 0; pfail: perror(*argv); fail: free(keys); - pthread_mutex_unlock(®_mutex); return -1; } @@ -153,6 +162,9 @@ static int registry_action_add(int has_key, char* command, size_t command_key, u } } + /* Notify slaves. */ + fail_if (advance_slaves(command)); + return 0; pfail: perror(*argv); @@ -258,11 +270,13 @@ static int registry_action(size_t length, int action, const char* recv_client_id if (action == 0) { wait_set = malloc(sizeof(hash_table_t)); + if (wait_set == NULL) + return -1; if (hash_table_create(wait_set)) { hash_table_destroy(wait_set, NULL, NULL); free(wait_set); - goto pfail; + return -1; } wait_set->key_comparator = (compare_func*)string_comparator; wait_set->hasher = (hash_func*)string_hash; @@ -292,8 +306,6 @@ static int registry_action(size_t length, int action, const char* recv_client_id /* For all protocols in the payload, either add or remove them from or to the protocl table or the wait set. */ - fail_if ((errno = pthread_mutex_lock(®_mutex))); - for (begin = 0; begin < length;) { char* end = rawmemchr(payload + begin, '\n'); @@ -305,28 +317,16 @@ static int registry_action(size_t length, int action, const char* recv_client_id if (len > 0) if (registry_action_act(command, action, client, wait_set)) - goto fail_in_mutex; + return -1; } - pthread_mutex_unlock(®_mutex); - /* If ‘Action: wait’, start a new thread that waits for the protocols and the responds. */ if (action == 0) - { - /* FIXME */ - } + return start_slave(wait_set, recv_client_id, recv_message_id); return 0; - - - pfail: - perror(*argv); - return -1; - fail_in_mutex: - pthread_mutex_unlock(®_mutex); - return -1; } @@ -348,12 +348,11 @@ static int list_registry(const char* recv_client_id, const char* recv_message_id if (send_buffer_size == 0) { - fail_if (xmalloc(send_buffer, 256, char)); + if (xmalloc(send_buffer, 256, char)) + return -1; send_buffer_size = 256; } - fail_if ((errno = pthread_mutex_lock(®_mutex))); - /* Add all protocols to the send buffer. */ @@ -366,7 +365,7 @@ static int list_registry(const char* recv_client_id, const char* recv_message_id /* Make sure the send buffer can fit all protocols. */ while (ptr + len + 1 >= send_buffer_size) if (growalloc(old, send_buffer, send_buffer_size, char)) - goto fail_in_mutex; + return -1; memcpy(send_buffer + ptr, command, len * sizeof(char)); ptr += len; @@ -381,7 +380,7 @@ static int list_registry(const char* recv_client_id, const char* recv_message_id while (ptr + i >= send_buffer_size) if (growalloc(old, send_buffer, send_buffer_size, char)) - goto fail_in_mutex; + return -1; /* Construct message headers. */ @@ -391,21 +390,10 @@ static int list_registry(const char* recv_client_id, const char* recv_message_id /* Increase message ID. */ message_id = message_id == INT32_MAX ? 0 : (message_id + 1); - pthread_mutex_unlock(®_mutex); - /* Send message. */ if (full_send(send_buffer + ptr, strlen(send_buffer + ptr))) return 1; return full_send(send_buffer, ptr); - - - fail_in_mutex: - pthread_mutex_unlock(®_mutex); - return -1; - - pfail: - perror(*argv); - return -1; } @@ -413,7 +401,7 @@ static int list_registry(const char* recv_client_id, const char* recv_message_id * Handle the received message containing ‘Command: register’-header–value * * @return Zero on success -1 on error or interruption, - * errno will be set accordingly + * `errno` will be set accordingly */ static int handle_register_message(void) { diff --git a/src/mds-registry/signals.c b/src/mds-registry/signals.c new file mode 100644 index 0000000..7d1f5b4 --- /dev/null +++ b/src/mds-registry/signals.c @@ -0,0 +1,96 @@ +/** + * mds — A micro-display server + * Copyright © 2014 Mattias Andrée (maandree@member.fsf.org) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +#include "signals.h" + +#include "globals.h" +#include "slave.h" + +#include "../mds-base.h" + +#include <libmdsserver/linked-list.h> +#include <libmdsserver/macros.h> + +#include <stdio.h> +#include <pthread.h> +#include <errno.h> + + + +/** + * Send a singal to all threads except the current thread + * + * @param signo The signal + */ +static void signal_all(int signo) +{ + pthread_t current_thread; + ssize_t node; + + current_thread = pthread_self(); + + if (pthread_equal(current_thread, master_thread) == 0) + pthread_kill(master_thread, signo); + + with_mutex (slave_mutex, + foreach_linked_list_node (slave_list, node) + { + slave_t* value = (slave_t*)(void*)(slave_list.values[node]); + if (pthread_equal(current_thread, value->thread) == 0) + pthread_kill(value->thread, signo); + } + ); +} + + +/** + * This function is called when a signal that + * signals the server to re-exec has been received + * + * When this function is invoked, it should set `reexecing` to a non-zero value + * + * @param signo The signal that has been received + */ +void received_reexec(int signo) +{ + if (reexecing == 0) + { + terminating = reexecing = 1; + eprint("re-exec signal received."); + signal_all(signo); + } +} + + +/** + * This function is called when a signal that + * signals the server to re-exec has been received + * + * When this function is invoked, it should set `terminating` to a non-zero value + * + * @param signo The signal that has been received + */ +void received_terminate(int signo) +{ + if (terminating == 0) + { + terminating = 1; + eprint("terminate signal received."); + signal_all(signo); + } +} + diff --git a/src/mds-registry/signals.h b/src/mds-registry/signals.h new file mode 100644 index 0000000..65070e9 --- /dev/null +++ b/src/mds-registry/signals.h @@ -0,0 +1,26 @@ +/** + * mds — A micro-display server + * Copyright © 2014 Mattias Andrée (maandree@member.fsf.org) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +#ifndef MDS_MDS_REGISTRY_SIGNALS_H +#define MDS_MDS_REGISTRY_SIGNALS_H + + +#include "../mds-base.h" + + +#endif + diff --git a/src/mds-registry/slave.c b/src/mds-registry/slave.c new file mode 100644 index 0000000..9bc2fd1 --- /dev/null +++ b/src/mds-registry/slave.c @@ -0,0 +1,403 @@ +/** + * mds — A micro-display server + * Copyright © 2014 Mattias Andrée (maandree@member.fsf.org) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +#include "slave.h" + +#include "util.h" +#include "globals.h" + +#include "../mds-base.h" + +#include <libmdsserver/macros.h> + +#include <string.h> +#include <errno.h> +#include <pthread.h> + + + +/** + * Master function for slave threads + * + * @param data Input data + * @return Output data + */ +static void* slave_loop(void* data) +{ + slave_t* slave = data; + + if (slave->closed) + goto done; + + /* Set up traps for especially handled signals. */ + fail_if (trap_signals() < 0); + + fail_if ((errno = pthread_mutex_lock(&slave_mutex))); + + while (!reexecing && !terminating) + { + if ((slave->wait_set->size == 0) || slave->closed) + break; + pthread_cond_wait(&slave_cond, &slave_mutex); + } + + pthread_mutex_unlock(&slave_mutex); + + if (!(slave->closed) && slave->wait_set->size) + ; /* FIXME send inside slave_mutex */ + + goto done; + + pfail: + perror(*argv); + done: + with_mutex (slave_mutex, + if (!reexecing) + linked_list_remove(&slave_list, slave->node); + running_slaves--; + if (running_slaves == 0) + pthread_cond_signal(&slave_cond); + ); + return NULL; +} + + +/** + * Start a slave thread + * + * @param wait_set Set of protocols for which to wait that they become available + * @param recv_client_id The ID of the waiting client + * @param recv_message_id The ID of the message that triggered the waiting + * @return Non-zero on error + */ +int start_slave(hash_table_t* restrict wait_set, const char* restrict recv_client_id, const char* restrict recv_message_id) +{ + slave_t* slave = slave_create(wait_set, recv_client_id, recv_message_id); + size_t slave_address; + ssize_t node = LINKED_LIST_UNUSED; + + fail_if (slave == NULL); + fail_if ((errno = pthread_mutex_lock(&slave_mutex))); + + slave_address = (size_t)(void*)slave; + slave->node = node = linked_list_insert_end(&slave_list, slave_address); + if (slave->node == LINKED_LIST_UNUSED) + goto pfail_in_mutex; + + if ((errno = pthread_create(&(slave->thread), NULL, slave_loop, (void*)(intptr_t)slave))) + goto pfail_in_mutex; + + if ((errno = pthread_detach(slave->thread))) + perror(*argv); + + running_slaves++; + pthread_mutex_unlock(&slave_mutex); + + return 0; + pfail: + perror(*argv); + goto more_fail; + pfail_in_mutex: + perror(*argv); + pthread_mutex_unlock(&slave_mutex); + more_fail: + if (node != LINKED_LIST_UNUSED) + linked_list_remove(&slave_list, node); + return -1; +} + + +/** + * Close all slaves associated with a client + * + * @param client The client's ID + */ +void close_slaves(uint64_t client) +{ + ssize_t node; + with_mutex (slave_mutex, + foreach_linked_list_node (slave_list, node) + { + slave_t* slave = (slave_t*)(void*)(slave_list.values[node]); + if (slave->client == client) + slave->closed = 1; + } + ); +} + + +/** + * Notify slaves that a protocol has become available + * + * @param command The protocol + * @return Non-zero on error, `ernno`will be set accordingly + */ +int advance_slaves(char* command) +{ + size_t key = (size_t)(void*)command; + int signal_slaves = 0; + ssize_t node; + + if ((errno = pthread_mutex_lock(&slave_mutex))) + return -1; + + foreach_linked_list_node (slave_list, node) + { + slave_t* slave = (slave_t*)(void*)(slave_list.values[node]); + if (hash_table_contains_key(slave->wait_set, key)) + { + hash_table_remove(slave->wait_set, key); + signal_slaves |= slave->wait_set == 0; + } + } + + if (signal_slaves) + pthread_cond_broadcast(&slave_cond); + + pthread_mutex_unlock(&slave_mutex); + return 0; +} + + +/** + * Create a slave + * + * @param wait_set Set of protocols for which to wait that they become available + * @param recv_client_id The ID of the waiting client + * @param recv_message_id The ID of the message that triggered the waiting + * @return The slave, `NULL` on error, `errno` will be set accordingly + */ +slave_t* slave_create(hash_table_t* restrict wait_set, const char* restrict recv_client_id, const char* restrict recv_message_id) +{ + slave_t* restrict rc = NULL; + + if (xmalloc(rc, 1, slave_t)) + return NULL; + + slave_initialise(rc); + rc->wait_set = wait_set; + rc->client = parse_client_id(recv_client_id); + + if ((rc->client_id = strdup(recv_client_id)) == NULL) + goto fail; + + if ((rc->message_id = strdup(recv_message_id)) == NULL) + goto fail; + + return rc; + + fail: + slave_destroy(rc); + free(rc); + return NULL; +} + + +/** + * Initialise a slave + * + * @param this Memory slot in which to store the new slave information + */ +void slave_initialise(slave_t* restrict this) +{ + this->wait_set = NULL; + this->client_id = NULL; + this->message_id = NULL; + this->closed = 0; +} + + +/** + * Release all resources assoicated with a slave + * + * @param this The slave information + */ +void slave_destroy(slave_t* restrict this) +{ + if (this->wait_set != NULL) + { + hash_table_destroy(this->wait_set, (free_func*)reg_table_free_key, NULL); + free(this->wait_set); + this->wait_set = NULL; + } + + free(this->client_id); + this->client_id = NULL; + + free(this->message_id); + this->message_id = NULL; +} + + +/** + * Calculate the buffer size need to marshal slave information + * + * @param this The slave information + * @return The number of bytes to allocate to the output buffer + */ +size_t slave_marshal_size(const slave_t* restrict this) +{ + size_t rc = 2 * sizeof(int) + sizeof(ssize_t) + sizeof(size_t) + sizeof(uint64_t); + hash_entry_t* restrict entry; + size_t n; + + rc += (strlen(this->client_id) + strlen(this->message_id) + 2) * sizeof(char); + + foreach_hash_table_entry (*(this->wait_set), n, entry) + { + char* protocol = (char*)(void*)(entry->key); + rc += strlen(protocol) + 1; + } + + return rc; +} + + +/** + * Marshals slave information + * + * @param this The slave information + * @param data Output buffer for the marshalled data + * @return The number of bytes that have been written (everything will be written) + */ +size_t slave_marshal(const slave_t* restrict this, char* restrict data) +{ + hash_entry_t* restrict entry; + size_t n; + + buf_set_next(data, int, SLAVE_T_VERSION); + buf_set_next(data, int, this->closed); + buf_set_next(data, ssize_t, this->node); + buf_set_next(data, uint64_t, this->client); + + memcpy(data, this->client_id, (strlen(this->client_id) + 1) * sizeof(char)); + data += strlen(this->client_id) + 1; + + memcpy(data, this->message_id, (strlen(this->message_id) + 1) * sizeof(char)); + data += strlen(this->message_id) + 1; + + n = this->wait_set->size; + buf_set_next(data, size_t, n); + + foreach_hash_table_entry (*(this->wait_set), n, entry) + { + char* restrict protocol = (char*)(void*)(entry->key); + memcpy(data, protocol, (strlen(protocol) + 1) * sizeof(char)); + data += strlen(protocol) + 1; + } + + return slave_marshal_size(this); +} + + +/** + * Unmarshals slave information + * + * @param this Memory slot in which to store the new slave information + * @param data In buffer with the marshalled data + * @return Zero on error, errno will be set accordingly, otherwise the number of read bytes. + * Destroy the slave information on error. + */ +size_t slave_unmarshal(slave_t* restrict this, char* restrict data) +{ + size_t key, n, m, rc = 2 * sizeof(int) + sizeof(ssize_t) + sizeof(size_t) + sizeof(uint64_t); + char* protocol; + + this->wait_set = NULL; + this->client_id = NULL; + this->message_id = NULL; + + /* buf_get_next(data, int, SLAVE_T_VERSION); */ + buf_next(data, int, 1); + + buf_get_next(data, int, this->closed); + buf_get_next(data, ssize_t, this->node); + buf_get_next(data, uint64_t, this->client); + + n = (strlen((char*)data) + 1) * sizeof(char); + if ((this->client_id = malloc(n)) == NULL) + return 0; + memcpy(this->client_id, data, n); + data += n, rc += n; + + n = (strlen((char*)data) + 1) * sizeof(char); + if ((this->message_id = malloc(n)) == NULL) + return 0; + memcpy(this->message_id, data, n); + data += n, rc += n; + + if ((this->wait_set = malloc(sizeof(hash_table_t))) == NULL) + return 0; + if (hash_table_create(this->wait_set)) + return 0; + + buf_get_next(data, size_t, m); + + while (m--) + { + n = (strlen((char*)data) + 1) * sizeof(char); + if ((protocol = malloc(n)) == NULL) + return 0; + memcpy(protocol, data, n); + data += n, rc += n; + + key = (size_t)(void*)protocol; + if (hash_table_put(this->wait_set, key, 1) == 0) + if (errno) + { + free(protocol); + return 0; + } + } + + return rc; +} + + +/** + * Pretend to unmarshal slave information + * + * @param data In buffer with the marshalled data + * @return The number of read bytes + */ +size_t slave_unmarshal_skip(char* restrict data) +{ + size_t n, m, rc = 2 * sizeof(int) + sizeof(ssize_t) + sizeof(size_t) + sizeof(uint64_t); + + /* buf_get_next(data, int, SLAVE_T_VERSION); */ + buf_next(data, int, 1); + + buf_next(data, int, 1); + buf_next(data, ssize_t, 1); + + n = (strlen((char*)data) + 1) * sizeof(char); + data += n, rc += n; + + n = (strlen((char*)data) + 1) * sizeof(char); + data += n, rc += n; + + buf_get_next(data, size_t, m); + + while (m--) + { + n = (strlen((char*)data) + 1) * sizeof(char); + data += n, rc += n; + } + + return rc; +} + diff --git a/src/mds-registry/slave.h b/src/mds-registry/slave.h new file mode 100644 index 0000000..ae8ae09 --- /dev/null +++ b/src/mds-registry/slave.h @@ -0,0 +1,160 @@ +/** + * mds — A micro-display server + * Copyright © 2014 Mattias Andrée (maandree@member.fsf.org) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +#ifndef MDS_MDS_REGISTRY_SLAVE_H +#define MDS_MDS_REGISTRY_SLAVE_H + + +#include <libmdsserver/hash-table.h> + +#include <stdlib.h> +#include <stdint.h> +#include <pthread.h> + + + +#define SLAVE_T_VERSION 0 + +/** + * Slave information, a thread waiting for protocols to become available + */ +typedef struct slave /* TODO: add time-to-live */ +{ + /** + * Set of protocols for which to wait that they become available + */ + hash_table_t* wait_set; + + /** + * The ID of the waiting client + */ + uint64_t client; + + /** + * The ID of the waiting client + */ + char* client_id; + + /** + * The ID of the message that triggered the waiting + */ + char* message_id; + + /** + * The slave's node in the linked list of slaves + */ + ssize_t node; + + /** + * Whether the client has been closed + */ + volatile int closed; + + /** + * The slave thread + */ + pthread_t thread; + +} slave_t; + + + +/** + * Start a slave thread + * + * @param wait_set Set of protocols for which to wait that they become available + * @param recv_client_id The ID of the waiting client + * @param recv_message_id The ID of the message that triggered the waiting + * @return Non-zero on error + */ +int start_slave(hash_table_t* restrict wait_set, const char* restrict recv_client_id, const char* restrict recv_message_id); + +/** + * Close all slaves associated with a client + * + * @param client The client's ID + */ +void close_slaves(uint64_t client); + +/** + * Notify slaves that a protocol has become available + * + * @param command The protocol + * @return Non-zero on error, `ernno`will be set accordingly + */ +int advance_slaves(char* command); + +/** + * Create a slave + * + * @return The slave + */ +slave_t* slave_create(hash_table_t* restrict wait_set, const char* restrict recv_client_id, const char* restrict recv_message_id); + + +/** + * Initialise a slave + * + * @param this Memory slot in which to store the new slave information + */ +void slave_initialise(slave_t* restrict this); + +/** + * Release all resources assoicated with a slave + * + * @param this The slave information + */ +void slave_destroy(slave_t* restrict this); + +/** + * Calculate the buffer size need to marshal slave information + * + * @param this The slave information + * @return The number of bytes to allocate to the output buffer + */ +size_t slave_marshal_size(const slave_t* restrict this) __attribute__((pure)); + +/** + * Marshals slave information + * + * @param this The slave information + * @param data Output buffer for the marshalled data + * @return The number of bytes that have been written (everything will be written) + */ +size_t slave_marshal(const slave_t* restrict this, char* restrict data); + +/** + * Unmarshals slave information + * + * @param this Memory slot in which to store the new slave information + * @param data In buffer with the marshalled data + * @return Zero on error, errno will be set accordingly, otherwise the number of read bytes. + * Destroy the slave information on error. + */ +size_t slave_unmarshal(slave_t* restrict this, char* restrict data); + +/** + * Pretend to unmarshal slave information + * + * @param data In buffer with the marshalled data + * @return The number of read bytes + */ +size_t slave_unmarshal_skip(char* restrict data) __attribute__((pure)); + + +#endif + diff --git a/src/mds-server/mds-server.c b/src/mds-server/mds-server.c index d6846ec..d0b6b87 100644 --- a/src/mds-server/mds-server.c +++ b/src/mds-server/mds-server.c @@ -248,7 +248,7 @@ int accept_connection(void) * Master function for slave threads * * @param data Input data - * @return Outout data + * @return Output data */ void* slave_loop(void* data) { diff --git a/src/mds-server/mds-server.h b/src/mds-server/mds-server.h index 3bce7ed..4d6c5e7 100644 --- a/src/mds-server/mds-server.h +++ b/src/mds-server/mds-server.h @@ -35,7 +35,7 @@ int accept_connection(void); * Master function for slave threads * * @param data Input data - * @return Outout data + * @return Output data */ void* slave_loop(void* data); |