diff options
Diffstat (limited to '')
-rw-r--r-- | src/mds-registry/slave.c | 677 |
1 files changed, 339 insertions, 338 deletions
diff --git a/src/mds-registry/slave.c b/src/mds-registry/slave.c index b936c29..be77c5b 100644 --- a/src/mds-registry/slave.c +++ b/src/mds-registry/slave.c @@ -31,40 +31,38 @@ #include <inttypes.h> - /** * Notify the waiting client that it may resume * * @param slave The slave * @return Non-zero, `errno` will be set accordingly */ -__attribute__((nonnull)) -static int slave_notify_client(slave_t* slave) +static int __attribute__((nonnull)) +slave_notify_client(slave_t *slave) { - char buf[sizeof("To: %s\nIn response to: %s\nMessage ID: %" PRIu32 "\nOrigin command: register\n\n") - / sizeof(char) + 41]; - size_t ptr = 0, sent, left; - - /* Construct message headers. */ - sprintf(buf, "To: %s\nIn response to: %s\nMessage ID: %" PRIu32 "\nOrigin command: register\n\n", - slave->client_id, slave->message_id, message_id); - - /* Increase message ID. */ - message_id = message_id == UINT32_MAX ? 0 : (message_id + 1); - - /* Send message to client. */ - left = strlen(buf); - while (left > 0) - { - sent = send_message(socket_fd, buf + ptr, left); - fail_if ((sent < left) && errno && (errno != EINTR)); - left -= sent; - ptr += sent; - } - - return 0; - fail: - return -1; + char buf[sizeof("To: %s\nIn response to: %s\nMessage ID: %" PRIu32 "\nOrigin command: register\n\n") + / sizeof(char) + 41]; + size_t ptr = 0, sent, left; + + /* Construct message headers. */ + sprintf(buf, "To: %s\nIn response to: %s\nMessage ID: %" PRIu32 "\nOrigin command: register\n\n", + slave->client_id, slave->message_id, message_id); + + /* Increase message ID. */ + message_id = message_id == UINT32_MAX ? 0 : (message_id + 1); + + /* Send message to client. */ + left = strlen(buf); + while (left > 0) { + sent = send_message(socket_fd, buf + ptr, left); + fail_if ((sent < left) && errno && (errno != EINTR)); + left -= sent; + ptr += sent; + } + + return 0; +fail: + return -1; } @@ -74,60 +72,58 @@ static int slave_notify_client(slave_t* slave) * @param data Input data * @return Output data */ -static void* slave_loop(void* data) +static void * +slave_loop(void *data) { - /* pthread_cond_timedwait is required to handle re-exec and termination because - pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */ - struct timespec timeout = - { - .tv_sec = 1, - .tv_nsec = 0 - }; - slave_t* slave = data; - struct timespec now; - - if (slave->closed) - goto done; - - /* Set up traps for especially handled signals. */ - fail_if (trap_signals() < 0); - - fail_if ((errno = pthread_mutex_lock(&slave_mutex))); - - while (!reexecing && !terminating) - { - if ((slave->wait_set->size == 0) || slave->closed) - break; - if (slave->timed) - { - fail_if (monotone(&now)); - if (now.tv_sec > slave->dethklok.tv_sec) - break; - if (now.tv_sec == slave->dethklok.tv_sec) - if (now.tv_nsec >= slave->dethklok.tv_nsec) - break; + /* pthread_cond_timedwait is required to handle re-exec and termination because + pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */ + struct timespec timeout = { + .tv_sec = 1, + .tv_nsec = 0 + }; + slave_t* slave = data; + struct timespec now; + + if (slave->closed) + goto done; + + /* Set up traps for especially handled signals. */ + fail_if (trap_signals() < 0); + + fail_if ((errno = pthread_mutex_lock(&slave_mutex))); + + while (!reexecing && !terminating) { + if (!slave->wait_set->size || slave->closed) + break; + if (slave->timed) { + fail_if (monotone(&now)); + if (now.tv_sec > slave->dethklok.tv_sec) + break; + if (now.tv_sec == slave->dethklok.tv_sec) + if (now.tv_nsec >= slave->dethklok.tv_nsec) + break; + } + pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout); } - pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout); - } - - if (!(slave->closed) && (slave->wait_set->size == 0)) - slave_notify_client(slave); - - pthread_mutex_unlock(&slave_mutex); - - goto done; - - fail: - xperror(*argv); - done: - with_mutex (slave_mutex, - if (!reexecing) - linked_list_remove(&slave_list, slave->node); - running_slaves--; - if (running_slaves == 0) - pthread_cond_signal(&slave_cond); - ); - return NULL; + + if (!slave->closed && !slave->wait_set->size) + slave_notify_client(slave); + + pthread_mutex_unlock(&slave_mutex); + + goto done; + +fail: + xperror(*argv); +done: + with_mutex (slave_mutex, + if (!reexecing) + linked_list_remove(&slave_list, slave->node); + running_slaves--; + if (!running_slaves) + pthread_cond_signal(&slave_cond); + ); + return NULL; } @@ -137,26 +133,27 @@ static void* slave_loop(void* data) * @param slave The slave * @return Non-zero on error, `errno` will be set accordingly */ -int start_created_slave(slave_t* restrict slave) +int +start_created_slave(slave_t *restrict slave) { - int locked = 0; - - fail_if ((errno = pthread_mutex_lock(&slave_mutex))); - locked = 1; - - fail_if ((errno = pthread_create(&(slave->thread), NULL, slave_loop, (void*)(intptr_t)slave))); - - if ((errno = pthread_detach(slave->thread))) - xperror(*argv); - - running_slaves++; - pthread_mutex_unlock(&slave_mutex); - - return 0; - fail: - if (locked) - pthread_mutex_unlock(&slave_mutex); - return -1; + int locked = 0; + + fail_if ((errno = pthread_mutex_lock(&slave_mutex))); + locked = 1; + + fail_if ((errno = pthread_create(&(slave->thread), NULL, slave_loop, (void*)(intptr_t)slave))); + + if ((errno = pthread_detach(slave->thread))) + xperror(*argv); + + running_slaves++; + pthread_mutex_unlock(&slave_mutex); + + return 0; +fail: + if (locked) + pthread_mutex_unlock(&slave_mutex); + return -1; } @@ -168,49 +165,50 @@ int start_created_slave(slave_t* restrict slave) * @param recv_message_id The ID of the message that triggered the waiting * @return Non-zero on error */ -int start_slave(hash_table_t* restrict wait_set, const char* restrict recv_client_id, - const char* restrict recv_message_id) +int +start_slave(hash_table_t *restrict wait_set, const char *restrict recv_client_id, const char *restrict recv_message_id) { - slave_t* slave = slave_create(wait_set, recv_client_id, recv_message_id); - size_t slave_address, i; - ssize_t node = LINKED_LIST_UNUSED; - int locked = 0; - - fail_if (slave == NULL); - fail_if ((errno = pthread_mutex_lock(&slave_mutex))); - locked = 1; - - slave_address = (size_t)(void*)slave; - slave->node = node = linked_list_insert_end(&slave_list, slave_address); - fail_if (slave->node == LINKED_LIST_UNUSED); - - for (i = 0; i < received.header_count; i++) - if (startswith(received.headers[i], "Time to live: ")) - { - const char* ttl = received.headers[i] + strlen("Time to live: "); - slave->timed = 1; - fail_if (monotone(&(slave->dethklok))); - slave->dethklok.tv_sec += (time_t)atoll(ttl); - /* It should really be `atol`, but we want to be future-proof. */ - break; - } - - fail_if ((errno = pthread_create(&(slave->thread), NULL, slave_loop, (void*)(intptr_t)slave))); - - if ((errno = pthread_detach(slave->thread))) - xperror(*argv); - - running_slaves++; - pthread_mutex_unlock(&slave_mutex); - - return 0; - fail: - xperror(*argv); - if (locked) - pthread_mutex_unlock(&slave_mutex); - if (node != LINKED_LIST_UNUSED) - linked_list_remove(&slave_list, node); - return -1; + slave_t *slave = slave_create(wait_set, recv_client_id, recv_message_id); + size_t slave_address, i; + ssize_t node = LINKED_LIST_UNUSED; + int locked = 0; + const char* ttl; + + fail_if (!slave); + fail_if ((errno = pthread_mutex_lock(&slave_mutex))); + locked = 1; + + slave_address = (size_t)(void*)slave; + slave->node = node = linked_list_insert_end(&slave_list, slave_address); + fail_if (slave->node == LINKED_LIST_UNUSED); + + for (i = 0; i < received.header_count; i++) { + if (startswith(received.headers[i], "Time to live: ")) { + ttl = received.headers[i] + strlen("Time to live: "); + slave->timed = 1; + fail_if (monotone(&(slave->dethklok))); + slave->dethklok.tv_sec += (time_t)atoll(ttl); + /* It should really be `atol`, but we want to be future-proof. */ + break; + } + } + + fail_if ((errno = pthread_create(&(slave->thread), NULL, slave_loop, (void*)(intptr_t)slave))); + + if ((errno = pthread_detach(slave->thread))) + xperror(*argv); + + running_slaves++; + pthread_mutex_unlock(&slave_mutex); + + return 0; +fail: + xperror(*argv); + if (locked) + pthread_mutex_unlock(&slave_mutex); + if (node != LINKED_LIST_UNUSED) + linked_list_remove(&slave_list, node); + return -1; } @@ -219,17 +217,18 @@ int start_slave(hash_table_t* restrict wait_set, const char* restrict recv_clien * * @param client The client's ID */ -void close_slaves(uint64_t client) +void +close_slaves(uint64_t client) { - ssize_t node; - with_mutex (slave_mutex, - foreach_linked_list_node (slave_list, node) - { - slave_t* slave = (slave_t*)(void*)(slave_list.values[node]); - if (slave->client == client) - slave->closed = 1; - } - ); + ssize_t node; + slave_t *slave; + with_mutex (slave_mutex, + foreach_linked_list_node (slave_list, node) { + slave = (slave_t *)(void *)(slave_list.values[node]); + if (slave->client == client) + slave->closed = 1; + } + ); } @@ -239,31 +238,31 @@ void close_slaves(uint64_t client) * @param command The protocol * @return Non-zero on error, `ernno`will be set accordingly */ -int advance_slaves(char* command) +int +advance_slaves(char *command) { - size_t key = (size_t)(void*)command; - int signal_slaves = 0; - ssize_t node; - - fail_if ((errno = pthread_mutex_lock(&slave_mutex))); - - foreach_linked_list_node (slave_list, node) - { - slave_t* slave = (slave_t*)(void*)(slave_list.values[node]); - if (hash_table_contains_key(slave->wait_set, key)) - { - hash_table_remove(slave->wait_set, key); - signal_slaves |= slave->wait_set == 0; + size_t key = (size_t)(void *)command; + int signal_slaves = 0; + ssize_t node; + slave_t *slave; + + fail_if ((errno = pthread_mutex_lock(&slave_mutex))); + + foreach_linked_list_node (slave_list, node) { + slave = (void *)(slave_list.values[node]); + if (hash_table_contains_key(slave->wait_set, key)) { + hash_table_remove(slave->wait_set, key); + signal_slaves |= slave->wait_set == 0; + } } - } - - if (signal_slaves) - pthread_cond_broadcast(&slave_cond); - pthread_mutex_unlock(&slave_mutex); - return 0; - fail: - return -1; + if (signal_slaves) + pthread_cond_broadcast(&slave_cond); + + pthread_mutex_unlock(&slave_mutex); + return 0; +fail: + return -1; } @@ -275,27 +274,27 @@ int advance_slaves(char* command) * @param recv_message_id The ID of the message that triggered the waiting * @return The slave, `NULL` on error, `errno` will be set accordingly */ -slave_t* slave_create(hash_table_t* restrict wait_set, const char* restrict recv_client_id, - const char* restrict recv_message_id) +slave_t * +slave_create(hash_table_t *restrict wait_set, const char *restrict recv_client_id, const char *restrict recv_message_id) { - slave_t* restrict rc = NULL; - int saved_errno; - - fail_if (xmalloc(rc, 1, slave_t)); - - slave_initialise(rc); - rc->wait_set = wait_set; - rc->client = parse_client_id(recv_client_id); - - fail_if (xstrdup_nn(rc->client_id, recv_client_id)); - fail_if (xstrdup_nn(rc->message_id, recv_message_id)); - - return rc; - - fail: - saved_errno = errno; - slave_destroy(rc), free(rc); - return errno = saved_errno, NULL; + slave_t *restrict rc = NULL; + int saved_errno; + + fail_if (xmalloc(rc, 1, slave_t)); + + slave_initialise(rc); + rc->wait_set = wait_set; + rc->client = parse_client_id(recv_client_id); + + fail_if (xstrdup_nn(rc->client_id, recv_client_id)); + fail_if (xstrdup_nn(rc->message_id, recv_message_id)); + + return rc; + +fail: + saved_errno = errno; + slave_destroy(rc), free(rc); + return errno = saved_errno, NULL; } @@ -304,15 +303,16 @@ slave_t* slave_create(hash_table_t* restrict wait_set, const char* restrict recv * * @param this Memory slot in which to store the new slave information */ -void slave_initialise(slave_t* restrict this) +void +slave_initialise(slave_t *restrict this) { - this->wait_set = NULL; - this->client_id = NULL; - this->message_id = NULL; - this->closed = 0; - this->dethklok.tv_sec = 0; - this->dethklok.tv_nsec = 0; - this->timed = 0; + this->wait_set = NULL; + this->client_id = NULL; + this->message_id = NULL; + this->closed = 0; + this->dethklok.tv_sec = 0; + this->dethklok.tv_nsec = 0; + this->timed = 0; } @@ -321,23 +321,23 @@ void slave_initialise(slave_t* restrict this) * * @param this The slave information */ -void slave_destroy(slave_t* restrict this) +void +slave_destroy(slave_t *restrict this) { - if (this == NULL) - return; - - if (this->wait_set != NULL) - { - hash_table_destroy(this->wait_set, (free_func*)reg_table_free_key, NULL); - free(this->wait_set); - this->wait_set = NULL; - } - - free(this->client_id); - this->client_id = NULL; - - free(this->message_id); - this->message_id = NULL; + if (!this) + return; + + if (this->wait_set) { + hash_table_destroy(this->wait_set, (free_func*)reg_table_free_key, NULL); + free(this->wait_set); + this->wait_set = NULL; + } + + free(this->client_id); + this->client_id = NULL; + + free(this->message_id); + this->message_id = NULL; } @@ -347,23 +347,24 @@ void slave_destroy(slave_t* restrict this) * @param this The slave information * @return The number of bytes to allocate to the output buffer */ -size_t slave_marshal_size(const slave_t* restrict this) +size_t +slave_marshal_size(const slave_t *restrict this) { - size_t rc; - hash_entry_t* restrict entry; - size_t n; - - rc = sizeof(int) + sizeof(sig_atomic_t) + sizeof(ssize_t) + sizeof(size_t) + sizeof(uint64_t); - rc += sizeof(int) + sizeof(time_t) + sizeof(long); - rc += (strlen(this->client_id) + strlen(this->message_id) + 2) * sizeof(char); - - foreach_hash_table_entry (*(this->wait_set), n, entry) - { - char* protocol = (char*)(void*)(entry->key); - rc += strlen(protocol) + 1; - } - - return rc; + size_t rc; + hash_entry_t *restrict entry; + size_t n; + char *protocol; + + rc = sizeof(int) + sizeof(sig_atomic_t) + sizeof(ssize_t) + sizeof(size_t) + sizeof(uint64_t); + rc += sizeof(int) + sizeof(time_t) + sizeof(long); + rc += (strlen(this->client_id) + strlen(this->message_id) + 2) * sizeof(char); + + foreach_hash_table_entry (*(this->wait_set), n, entry) { + protocol = (void *)(entry->key); + rc += strlen(protocol) + 1; + } + + return rc; } @@ -374,36 +375,37 @@ size_t slave_marshal_size(const slave_t* restrict this) * @param data Output buffer for the marshalled data * @return The number of bytes that have been written (everything will be written) */ -size_t slave_marshal(const slave_t* restrict this, char* restrict data) +size_t +slave_marshal(const slave_t *restrict this, char *restrict data) { - hash_entry_t* restrict entry; - size_t n; - - buf_set_next(data, int, SLAVE_T_VERSION); - buf_set_next(data, sig_atomic_t, this->closed); - buf_set_next(data, ssize_t, this->node); - buf_set_next(data, uint64_t, this->client); - buf_set_next(data, int, this->timed); - buf_set_next(data, time_t, this->dethklok.tv_sec); - buf_set_next(data, long, this->dethklok.tv_nsec); - - memcpy(data, this->client_id, (strlen(this->client_id) + 1) * sizeof(char)); - data += strlen(this->client_id) + 1; - - memcpy(data, this->message_id, (strlen(this->message_id) + 1) * sizeof(char)); - data += strlen(this->message_id) + 1; - - n = this->wait_set->size; - buf_set_next(data, size_t, n); - - foreach_hash_table_entry (*(this->wait_set), n, entry) - { - char* restrict protocol = (char*)(void*)(entry->key); - memcpy(data, protocol, (strlen(protocol) + 1) * sizeof(char)); - data += strlen(protocol) + 1; - } - - return slave_marshal_size(this); + hash_entry_t *restrict entry; + size_t n; + char *restrict protocol; + + buf_set_next(data, int, SLAVE_T_VERSION); + buf_set_next(data, sig_atomic_t, this->closed); + buf_set_next(data, ssize_t, this->node); + buf_set_next(data, uint64_t, this->client); + buf_set_next(data, int, this->timed); + buf_set_next(data, time_t, this->dethklok.tv_sec); + buf_set_next(data, long, this->dethklok.tv_nsec); + + memcpy(data, this->client_id, (strlen(this->client_id) + 1) * sizeof(char)); + data += strlen(this->client_id) + 1; + + memcpy(data, this->message_id, (strlen(this->message_id) + 1) * sizeof(char)); + data += strlen(this->message_id) + 1; + + n = this->wait_set->size; + buf_set_next(data, size_t, n); + + foreach_hash_table_entry (*(this->wait_set), n, entry) { + protocol = (void *)(entry->key); + memcpy(data, protocol, (strlen(protocol) + 1) * sizeof(char)); + data += strlen(protocol) + 1; + } + + return slave_marshal_size(this); } @@ -415,55 +417,55 @@ size_t slave_marshal(const slave_t* restrict this, char* restrict data) * @return Zero on error, `errno` will be set accordingly, otherwise the * number of read bytes. Destroy the slave information on error. */ -size_t slave_unmarshal(slave_t* restrict this, char* restrict data) +size_t +slave_unmarshal(slave_t *restrict this, char *restrict data) { - size_t key, n, m, rc = 2 * sizeof(int) + sizeof(ssize_t) + sizeof(size_t) + sizeof(uint64_t); - char* protocol = NULL; - int saved_errno; - - this->wait_set = NULL; - this->client_id = NULL; - this->message_id = NULL; - - /* buf_get_next(data, int, SLAVE_T_VERSION); */ - buf_next(data, int, 1); - - buf_get_next(data, sig_atomic_t, this->closed); - buf_get_next(data, ssize_t, this->node); - buf_get_next(data, uint64_t, this->client); - buf_get_next(data, int, this->timed); - buf_get_next(data, time_t, this->dethklok.tv_sec); - buf_get_next(data, long, this->dethklok.tv_nsec); - - n = strlen((char*)data) + 1; - fail_if (xmemdup(this->client_id, data, n, char)); - data += n, rc += n * sizeof(char); - - n = strlen((char*)data) + 1; - fail_if (xmemdup(this->message_id, data, n, char)); - data += n, rc += n * sizeof(char); - - fail_if (xmalloc(this->wait_set, 1, hash_table_t)); - fail_if (hash_table_create(this->wait_set)); - - buf_get_next(data, size_t, m); - - while (m--) - { - n = strlen((char*)data) + 1; - fail_if (xmemdup(protocol, data, n, char)); - data += n, rc += n * sizeof(char); - - key = (size_t)(void*)protocol; - if (hash_table_put(this->wait_set, key, 1) == 0) - fail_if (errno); - } - - return rc; - fail: - saved_errno = errno; - free(protocol); - return errno = saved_errno, (size_t)0; + size_t key, n, m, rc = 2 * sizeof(int) + sizeof(ssize_t) + sizeof(size_t) + sizeof(uint64_t); + char *protocol = NULL; + int saved_errno; + + this->wait_set = NULL; + this->client_id = NULL; + this->message_id = NULL; + + /* buf_get_next(data, int, SLAVE_T_VERSION); */ + buf_next(data, int, 1); + + buf_get_next(data, sig_atomic_t, this->closed); + buf_get_next(data, ssize_t, this->node); + buf_get_next(data, uint64_t, this->client); + buf_get_next(data, int, this->timed); + buf_get_next(data, time_t, this->dethklok.tv_sec); + buf_get_next(data, long, this->dethklok.tv_nsec); + + n = strlen((char *)data) + 1; + fail_if (xmemdup(this->client_id, data, n, char)); + data += n, rc += n * sizeof(char); + + n = strlen((char *)data) + 1; + fail_if (xmemdup(this->message_id, data, n, char)); + data += n, rc += n * sizeof(char); + + fail_if (xmalloc(this->wait_set, 1, hash_table_t)); + fail_if (hash_table_create(this->wait_set)); + + buf_get_next(data, size_t, m); + + while (m--) { + n = strlen((char *)data) + 1; + fail_if (xmemdup(protocol, data, n, char)); + data += n, rc += n * sizeof(char); + + key = (size_t)(void*)protocol; + if (!hash_table_put(this->wait_set, key, 1)) + fail_if (errno); + } + + return rc; +fail: + saved_errno = errno; + free(protocol); + return errno = saved_errno, (size_t)0; } @@ -473,35 +475,34 @@ size_t slave_unmarshal(slave_t* restrict this, char* restrict data) * @param data In buffer with the marshalled data * @return The number of read bytes */ -size_t slave_unmarshal_skip(char* restrict data) +size_t +slave_unmarshal_skip(char *restrict data) { - size_t n, m, rc = 2 * sizeof(int) + sizeof(ssize_t) + sizeof(size_t) + sizeof(uint64_t); - rc += sizeof(int) + sizeof(time_t) + sizeof(long); - - /* buf_get_next(data, int, SLAVE_T_VERSION); */ - buf_next(data, int, 1); - - buf_next(data, sig_atomic_t, 1); - buf_next(data, ssize_t, 1); - buf_next(data, uint64_t, 1); - buf_next(data, int, 1); - buf_next(data, time_t, 1); - buf_next(data, long, 1); - - n = (strlen((char*)data) + 1) * sizeof(char); - data += n, rc += n; - - n = (strlen((char*)data) + 1) * sizeof(char); - data += n, rc += n; - - buf_get_next(data, size_t, m); - - while (m--) - { - n = (strlen((char*)data) + 1) * sizeof(char); - data += n, rc += n; - } + size_t n, m, rc = 2 * sizeof(int) + sizeof(ssize_t) + sizeof(size_t) + sizeof(uint64_t); + rc += sizeof(int) + sizeof(time_t) + sizeof(long); + + /* buf_get_next(data, int, SLAVE_T_VERSION); */ + buf_next(data, int, 1); + + buf_next(data, sig_atomic_t, 1); + buf_next(data, ssize_t, 1); + buf_next(data, uint64_t, 1); + buf_next(data, int, 1); + buf_next(data, time_t, 1); + buf_next(data, long, 1); - return rc; -} + n = (strlen((char *)data) + 1) * sizeof(char); + data += n, rc += n; + + n = (strlen((char *)data) + 1) * sizeof(char); + data += n, rc += n; + + buf_get_next(data, size_t, m); + while (m--) { + n = (strlen((char*)data) + 1) * sizeof(char); + data += n, rc += n; + } + + return rc; +} |