diff options
Diffstat (limited to 'src/mds-registry/slave.c')
-rw-r--r-- | src/mds-registry/slave.c | 403 |
1 files changed, 403 insertions, 0 deletions
diff --git a/src/mds-registry/slave.c b/src/mds-registry/slave.c new file mode 100644 index 0000000..9bc2fd1 --- /dev/null +++ b/src/mds-registry/slave.c @@ -0,0 +1,403 @@ +/** + * mds — A micro-display server + * Copyright © 2014 Mattias Andrée (maandree@member.fsf.org) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +#include "slave.h" + +#include "util.h" +#include "globals.h" + +#include "../mds-base.h" + +#include <libmdsserver/macros.h> + +#include <string.h> +#include <errno.h> +#include <pthread.h> + + + +/** + * Master function for slave threads + * + * @param data Input data + * @return Output data + */ +static void* slave_loop(void* data) +{ + slave_t* slave = data; + + if (slave->closed) + goto done; + + /* Set up traps for especially handled signals. */ + fail_if (trap_signals() < 0); + + fail_if ((errno = pthread_mutex_lock(&slave_mutex))); + + while (!reexecing && !terminating) + { + if ((slave->wait_set->size == 0) || slave->closed) + break; + pthread_cond_wait(&slave_cond, &slave_mutex); + } + + pthread_mutex_unlock(&slave_mutex); + + if (!(slave->closed) && slave->wait_set->size) + ; /* FIXME send inside slave_mutex */ + + goto done; + + pfail: + perror(*argv); + done: + with_mutex (slave_mutex, + if (!reexecing) + linked_list_remove(&slave_list, slave->node); + running_slaves--; + if (running_slaves == 0) + pthread_cond_signal(&slave_cond); + ); + return NULL; +} + + +/** + * Start a slave thread + * + * @param wait_set Set of protocols for which to wait that they become available + * @param recv_client_id The ID of the waiting client + * @param recv_message_id The ID of the message that triggered the waiting + * @return Non-zero on error + */ +int start_slave(hash_table_t* restrict wait_set, const char* restrict recv_client_id, const char* restrict recv_message_id) +{ + slave_t* slave = slave_create(wait_set, recv_client_id, recv_message_id); + size_t slave_address; + ssize_t node = LINKED_LIST_UNUSED; + + fail_if (slave == NULL); + fail_if ((errno = pthread_mutex_lock(&slave_mutex))); + + slave_address = (size_t)(void*)slave; + slave->node = node = linked_list_insert_end(&slave_list, slave_address); + if (slave->node == LINKED_LIST_UNUSED) + goto pfail_in_mutex; + + if ((errno = pthread_create(&(slave->thread), NULL, slave_loop, (void*)(intptr_t)slave))) + goto pfail_in_mutex; + + if ((errno = pthread_detach(slave->thread))) + perror(*argv); + + running_slaves++; + pthread_mutex_unlock(&slave_mutex); + + return 0; + pfail: + perror(*argv); + goto more_fail; + pfail_in_mutex: + perror(*argv); + pthread_mutex_unlock(&slave_mutex); + more_fail: + if (node != LINKED_LIST_UNUSED) + linked_list_remove(&slave_list, node); + return -1; +} + + +/** + * Close all slaves associated with a client + * + * @param client The client's ID + */ +void close_slaves(uint64_t client) +{ + ssize_t node; + with_mutex (slave_mutex, + foreach_linked_list_node (slave_list, node) + { + slave_t* slave = (slave_t*)(void*)(slave_list.values[node]); + if (slave->client == client) + slave->closed = 1; + } + ); +} + + +/** + * Notify slaves that a protocol has become available + * + * @param command The protocol + * @return Non-zero on error, `ernno`will be set accordingly + */ +int advance_slaves(char* command) +{ + size_t key = (size_t)(void*)command; + int signal_slaves = 0; + ssize_t node; + + if ((errno = pthread_mutex_lock(&slave_mutex))) + return -1; + + foreach_linked_list_node (slave_list, node) + { + slave_t* slave = (slave_t*)(void*)(slave_list.values[node]); + if (hash_table_contains_key(slave->wait_set, key)) + { + hash_table_remove(slave->wait_set, key); + signal_slaves |= slave->wait_set == 0; + } + } + + if (signal_slaves) + pthread_cond_broadcast(&slave_cond); + + pthread_mutex_unlock(&slave_mutex); + return 0; +} + + +/** + * Create a slave + * + * @param wait_set Set of protocols for which to wait that they become available + * @param recv_client_id The ID of the waiting client + * @param recv_message_id The ID of the message that triggered the waiting + * @return The slave, `NULL` on error, `errno` will be set accordingly + */ +slave_t* slave_create(hash_table_t* restrict wait_set, const char* restrict recv_client_id, const char* restrict recv_message_id) +{ + slave_t* restrict rc = NULL; + + if (xmalloc(rc, 1, slave_t)) + return NULL; + + slave_initialise(rc); + rc->wait_set = wait_set; + rc->client = parse_client_id(recv_client_id); + + if ((rc->client_id = strdup(recv_client_id)) == NULL) + goto fail; + + if ((rc->message_id = strdup(recv_message_id)) == NULL) + goto fail; + + return rc; + + fail: + slave_destroy(rc); + free(rc); + return NULL; +} + + +/** + * Initialise a slave + * + * @param this Memory slot in which to store the new slave information + */ +void slave_initialise(slave_t* restrict this) +{ + this->wait_set = NULL; + this->client_id = NULL; + this->message_id = NULL; + this->closed = 0; +} + + +/** + * Release all resources assoicated with a slave + * + * @param this The slave information + */ +void slave_destroy(slave_t* restrict this) +{ + if (this->wait_set != NULL) + { + hash_table_destroy(this->wait_set, (free_func*)reg_table_free_key, NULL); + free(this->wait_set); + this->wait_set = NULL; + } + + free(this->client_id); + this->client_id = NULL; + + free(this->message_id); + this->message_id = NULL; +} + + +/** + * Calculate the buffer size need to marshal slave information + * + * @param this The slave information + * @return The number of bytes to allocate to the output buffer + */ +size_t slave_marshal_size(const slave_t* restrict this) +{ + size_t rc = 2 * sizeof(int) + sizeof(ssize_t) + sizeof(size_t) + sizeof(uint64_t); + hash_entry_t* restrict entry; + size_t n; + + rc += (strlen(this->client_id) + strlen(this->message_id) + 2) * sizeof(char); + + foreach_hash_table_entry (*(this->wait_set), n, entry) + { + char* protocol = (char*)(void*)(entry->key); + rc += strlen(protocol) + 1; + } + + return rc; +} + + +/** + * Marshals slave information + * + * @param this The slave information + * @param data Output buffer for the marshalled data + * @return The number of bytes that have been written (everything will be written) + */ +size_t slave_marshal(const slave_t* restrict this, char* restrict data) +{ + hash_entry_t* restrict entry; + size_t n; + + buf_set_next(data, int, SLAVE_T_VERSION); + buf_set_next(data, int, this->closed); + buf_set_next(data, ssize_t, this->node); + buf_set_next(data, uint64_t, this->client); + + memcpy(data, this->client_id, (strlen(this->client_id) + 1) * sizeof(char)); + data += strlen(this->client_id) + 1; + + memcpy(data, this->message_id, (strlen(this->message_id) + 1) * sizeof(char)); + data += strlen(this->message_id) + 1; + + n = this->wait_set->size; + buf_set_next(data, size_t, n); + + foreach_hash_table_entry (*(this->wait_set), n, entry) + { + char* restrict protocol = (char*)(void*)(entry->key); + memcpy(data, protocol, (strlen(protocol) + 1) * sizeof(char)); + data += strlen(protocol) + 1; + } + + return slave_marshal_size(this); +} + + +/** + * Unmarshals slave information + * + * @param this Memory slot in which to store the new slave information + * @param data In buffer with the marshalled data + * @return Zero on error, errno will be set accordingly, otherwise the number of read bytes. + * Destroy the slave information on error. + */ +size_t slave_unmarshal(slave_t* restrict this, char* restrict data) +{ + size_t key, n, m, rc = 2 * sizeof(int) + sizeof(ssize_t) + sizeof(size_t) + sizeof(uint64_t); + char* protocol; + + this->wait_set = NULL; + this->client_id = NULL; + this->message_id = NULL; + + /* buf_get_next(data, int, SLAVE_T_VERSION); */ + buf_next(data, int, 1); + + buf_get_next(data, int, this->closed); + buf_get_next(data, ssize_t, this->node); + buf_get_next(data, uint64_t, this->client); + + n = (strlen((char*)data) + 1) * sizeof(char); + if ((this->client_id = malloc(n)) == NULL) + return 0; + memcpy(this->client_id, data, n); + data += n, rc += n; + + n = (strlen((char*)data) + 1) * sizeof(char); + if ((this->message_id = malloc(n)) == NULL) + return 0; + memcpy(this->message_id, data, n); + data += n, rc += n; + + if ((this->wait_set = malloc(sizeof(hash_table_t))) == NULL) + return 0; + if (hash_table_create(this->wait_set)) + return 0; + + buf_get_next(data, size_t, m); + + while (m--) + { + n = (strlen((char*)data) + 1) * sizeof(char); + if ((protocol = malloc(n)) == NULL) + return 0; + memcpy(protocol, data, n); + data += n, rc += n; + + key = (size_t)(void*)protocol; + if (hash_table_put(this->wait_set, key, 1) == 0) + if (errno) + { + free(protocol); + return 0; + } + } + + return rc; +} + + +/** + * Pretend to unmarshal slave information + * + * @param data In buffer with the marshalled data + * @return The number of read bytes + */ +size_t slave_unmarshal_skip(char* restrict data) +{ + size_t n, m, rc = 2 * sizeof(int) + sizeof(ssize_t) + sizeof(size_t) + sizeof(uint64_t); + + /* buf_get_next(data, int, SLAVE_T_VERSION); */ + buf_next(data, int, 1); + + buf_next(data, int, 1); + buf_next(data, ssize_t, 1); + + n = (strlen((char*)data) + 1) * sizeof(char); + data += n, rc += n; + + n = (strlen((char*)data) + 1) * sizeof(char); + data += n, rc += n; + + buf_get_next(data, size_t, m); + + while (m--) + { + n = (strlen((char*)data) + 1) * sizeof(char); + data += n, rc += n; + } + + return rc; +} + |