aboutsummaryrefslogtreecommitdiffstats
path: root/src/mds-server/mds-server.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/mds-server/mds-server.c468
1 files changed, 34 insertions, 434 deletions
diff --git a/src/mds-server/mds-server.c b/src/mds-server/mds-server.c
index 2f587a7..7da5f39 100644
--- a/src/mds-server/mds-server.c
+++ b/src/mds-server/mds-server.c
@@ -25,6 +25,8 @@
#include "signals.h"
#include "interceptors.h"
#include "sending.h"
+#include "slavery.h"
+#include "reexec.h"
#include <libmdsserver/config.h>
#include <libmdsserver/linked-list.h>
@@ -44,10 +46,10 @@
#include <errno.h>
#include <pthread.h>
#include <sys/socket.h>
-#include <sys/mman.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
+#include <sys/mman.h>
#include <dirent.h>
#include <inttypes.h>
#include <time.h>
@@ -185,25 +187,7 @@ int main(int argc_, char** argv_)
/* Unmarshal the state of the server. */
if (reexec)
- {
- pid_t pid = getpid();
- int reexec_fd, r;
- char shm_path[NAME_MAX + 1];
-
- /* Acquire access to marshalled data. */
- xsnprintf(shm_path, SHM_PATH_PATTERN, (unsigned long int)pid);
- reexec_fd = shm_open(shm_path, O_RDONLY, S_IRWXU);
- fail_if (reexec_fd < 0); /* Critical. */
- /* Unmarshal state. */
- r = unmarshal_server(reexec_fd);
- close(reexec_fd);
- shm_unlink(shm_path);
- if (r < 0)
- {
- /* Close all files (hopefully sockets) we do not know what they are. */
- close_files((fd > 2) && (fd != socket_fd) && (fd_table_contains_key(&client_map, fd) == 0));
- }
- }
+ complete_reexec(socket_fd);
/* Accepting incoming connections. */
while (running && (terminating == 0))
@@ -229,7 +213,6 @@ int main(int argc_, char** argv_)
}
terminate:
-
if (reexecing)
goto reexec;
@@ -247,54 +230,9 @@ int main(int argc_, char** argv_)
reexec:
- {
- pid_t pid = getpid();
- int reexec_fd;
- char shm_path[NAME_MAX + 1];
- ssize_t node;
-
- /* Join with all slaves threads. */
- with_mutex (slave_mutex,
- while (running_slaves > 0)
- pthread_cond_wait(&slave_cond, &slave_mutex););
-
- /* Release resources. */
- pthread_mutex_destroy(&slave_mutex);
- pthread_cond_destroy(&slave_cond);
- pthread_mutex_destroy(&modify_mutex);
- pthread_cond_destroy(&modify_cond);
- hash_table_destroy(&modify_map, NULL, NULL);
-
- /* Marshal the state of the server. */
- xsnprintf(shm_path, SHM_PATH_PATTERN, (unsigned long int)pid);
- reexec_fd = shm_open(shm_path, O_RDWR | O_CREAT | O_EXCL, S_IRWXU);
- fail_if (reexec_fd < 0);
- if (marshal_server(reexec_fd) < 0)
- goto reexec_fail;
- close(reexec_fd);
- reexec_fd = -1;
-
- /* Release resources. */
- foreach_linked_list_node (client_list, node)
- {
- client_t* client = (client_t*)(void*)(client_list.values[node]);
- client_destroy(client);
- }
- fd_table_destroy(&client_map, NULL, NULL);
- linked_list_destroy(&client_list);
-
- /* Re-exec the server. */
- reexec_server(argc, argv, reexec);
-
- reexec_fail:
- perror(*argv);
- if (reexec_fd >= 0)
- close(reexec_fd);
- shm_unlink(shm_path);
- /* Returning non-zero is important, otherwise the server cannot
- be respawn if the re-exec fails. */
- return 1;
- }
+ perform_reexec(reexec);
+ /* Returning non-zero is important, otherwise the server cannot
+ be respawn if the re-exec fails. */
pfail:
perror(*argv);
@@ -311,47 +249,16 @@ int main(int argc_, char** argv_)
void* slave_loop(void* data)
{
int socket_fd = (int)(intptr_t)data;
- ssize_t entry = LINKED_LIST_UNUSED;
size_t information_address = fd_table_get(&client_map, (size_t)socket_fd);
client_t* information = (client_t*)(void*)information_address;
char* msgbuf = NULL;
size_t n;
- size_t tmp;
int r;
+ /* Intiailsie the client. */
if (information == NULL)
- {
- /* Create information table. */
- fail_if (xmalloc(information, 1, client_t));
- client_initialise(information);
-
- /* Add to list of clients. */
- pthread_mutex_lock(&slave_mutex);
- entry = linked_list_insert_end(&client_list, (size_t)(void*)information);
- if (entry == LINKED_LIST_UNUSED)
- {
- perror(*argv);
- pthread_mutex_unlock(&slave_mutex);
- goto fail;
- }
-
- /* Add client to table. */
- tmp = fd_table_put(&client_map, socket_fd, (size_t)(void*)information);
- if ((tmp == 0) && errno)
- {
- perror(*argv);
- pthread_mutex_unlock(&slave_mutex);
- goto fail;
- }
- pthread_mutex_unlock(&slave_mutex);
-
- /* Fill information table. */
- information->list_entry = entry;
- information->socket_fd = socket_fd;
- information->open = 1;
- fail_if (mds_message_initialise(&(information->message)));
- }
+ fail_if ((information = initialise_client(socket_fd)) == NULL);
/* Store slave thread and create mutexes and conditions. */
fail_if (client_initialise_threading(information));
@@ -406,13 +313,15 @@ void* slave_loop(void* data)
close(socket_fd);
free(msgbuf);
if (information != NULL)
- client_destroy(information);
+ {
+ /* Unlist and free client. */
+ with_mutex (slave_mutex, linked_list_remove(&client_list, information->list_entry););
+ client_destroy(information);
+ }
- /* Unlist client and decrease the slave count. */
+ /* Unmap client and decrease the slave count. */
with_mutex (slave_mutex,
fd_table_remove(&client_map, socket_fd);
- if (entry != LINKED_LIST_UNUSED)
- linked_list_remove(&client_list, entry);
running_slaves--;
pthread_cond_signal(&slave_cond););
return NULL;
@@ -436,36 +345,6 @@ void* slave_loop(void* data)
/**
- * Receive a full message and update open status if the client closes
- *
- * @param client The client
- * @return Zero on success, -2 on failure, otherwise -1
- */
-int fetch_message(client_t* client)
-{
- int r = mds_message_read(&(client->message), client->socket_fd);
- if (r == 0)
- return 0;
-
- if (r == -2)
- eprint("corrupt message received.");
- else if (errno == ECONNRESET)
- {
- r = mds_message_read(&(client->message), client->socket_fd);
- client->open = 0;
- /* Connection closed. */
- }
- else if (errno != EINTR)
- {
- r = -2;
- perror(*argv);
- }
-
- return r;
-}
-
-
-/**
* Perform actions that should be taken when
* a message has been received from a client
*
@@ -501,7 +380,7 @@ int message_received(client_t* client)
/* Notify waiting client about a received message modification. */
- if (modifying != 0)
+ if (modifying)
{
/* pthread_cond_timedwait is required to handle re-exec and termination because
pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */
@@ -526,35 +405,15 @@ int message_received(client_t* client)
}
address = hash_table_get(&modify_map, (size_t)modify_id);
recipient = (client_t*)(void*)address;
- if (xmalloc(multicast = recipient->modify_message, 1, mds_message_t))
- goto fail;
- multicast->headers = NULL;
- multicast->header_count = 0;
- multicast->payload = NULL;
- multicast->payload_size = 0;
- multicast->payload_ptr = 0;
- multicast->buffer = NULL;
- multicast->buffer_size = 0;
- multicast->buffer_ptr = 0;
- multicast->stage = 0;
- if (xmalloc(multicast->payload, message.payload_size, char))
- goto fail;
+ fail_if (xmalloc(multicast = recipient->modify_message, 1, mds_message_t));
+ mds_message_zero_initialise(multicast);
+ fail_if (xmalloc(multicast->payload, message.payload_size, char));
memcpy(multicast->payload, message.payload, message.payload_size * sizeof(char));
- if (xmalloc(multicast->headers, message.header_count, char*))
- goto fail;
+ fail_if (xmalloc(multicast->headers, message.header_count, char*));
for (i = 0; i < message.header_count; i++, multicast->header_count++)
{
multicast->headers[i] = strdup(message.headers[i]);
- if (multicast->headers[i] == NULL)
- goto fail;
- }
- goto done;
- fail:
- if (multicast != NULL)
- {
- mds_message_destroy(multicast);
- free(multicast);
- recipient->modify_message = NULL;
+ fail_if (multicast->headers[i] == NULL);
}
done:
pthread_mutex_unlock(&(modify_mutex));
@@ -562,6 +421,15 @@ int message_received(client_t* client)
/* Do nothing more, not not even multicast this message. */
return 0;
+ pfail:
+ perror(*argv);
+ if (multicast != NULL)
+ {
+ mds_message_destroy(multicast);
+ free(multicast);
+ recipient->modify_message = NULL;
+ }
+ goto done;
}
@@ -707,15 +575,15 @@ int message_received(client_t* client)
{
/* Concatenate message to already pending messages. */
size_t new_len = client->send_pending_size + n;
- char* msg_new = realloc(client->send_pending, new_len * sizeof(char));
- if (msg_new != NULL)
+ char* msg_new = client->send_pending;
+ if (xrealloc(msg_new, new_len, char))
+ perror(*argv);
+ else
{
memcpy(msg_new + client->send_pending_size, msgbuf, n * sizeof(char));
client->send_pending = msg_new;
client->send_pending_size = new_len;
}
- else
- perror(*argv);
free(msgbuf);
}
);
@@ -940,271 +808,3 @@ void run_initrc(char** args)
eprintf("unable to run %s file, you might as well kill me.", INITRC_FILE);
}
-
-/**
- * Marshal the server's state into a file
- *
- * @param fd The file descriptor
- * @return Negative on error
- */
-int marshal_server(int fd)
-{
- size_t list_size = linked_list_marshal_size(&client_list);
- size_t map_size = fd_table_marshal_size(&client_map);
- size_t list_elements = 0;
- size_t state_n = 0;
- char* state_buf = NULL;
- char* state_buf_;
- ssize_t node;
-
-
- /* Calculate the grand size of all client information. */
- for (node = client_list.edge;; list_elements++)
- {
- if ((node = client_list.next[node]) == client_list.edge)
- break;
- state_n += client_marshal_size((client_t*)(void*)(client_list.values[node]));
- }
-
- /* Add the size of the rest of the program's state. */
- state_n += sizeof(int) + sizeof(sig_atomic_t) + 2 * sizeof(uint64_t) + 2 * sizeof(size_t);
- state_n += list_elements * sizeof(size_t) + list_size + map_size;
-
- /* Allocate a buffer for all data except the client list and the client map. */
- state_buf = state_buf_ = malloc(state_n);
- if (state_buf == NULL)
- goto fail;
-
-
- /* Tell the new version of the program what version of the program it is marshalling. */
- buf_set_next(state_buf_, int, MDS_SERVER_VARS_VERSION);
-
- /* Marshal the miscellaneous state data. */
- buf_set_next(state_buf_, sig_atomic_t, running);
- buf_set_next(state_buf_, uint64_t, next_client_id);
- buf_set_next(state_buf_, uint64_t, next_modify_id);
-
- /* Tell the program how large the marshalled client list is and how any clients are marshalled. */
- buf_set_next(state_buf_, size_t, list_size);
- buf_set_next(state_buf_, size_t, list_elements);
-
- /* Marshal the clients. */
- foreach_linked_list_node (client_list, node)
- {
- /* Get the memory address of the client. */
- size_t value_address = client_list.values[node];
- /* Get the client's information. */
- client_t* value = (client_t*)(void*)value_address;
-
- /* Marshal the address, it is used the the client list and the client map, that will be marshalled. */
- buf_set_next(state_buf_, size_t, value_address);
- /* Marshal the client informationation. */
- state_buf_ += client_marshal(value, state_buf_) / sizeof(char);
- }
-
- /* Marshal the client list. */
- linked_list_marshal(&client_list, state_buf_);
- state_buf_ += list_size / sizeof(char);
- /* Marshal the client map. */
- fd_table_marshal(&client_map, state_buf_);
-
- /* Send the marshalled data into the file. */
- if (full_write(fd, state_buf, state_n) < 0)
- goto fail;
- free(state_buf);
-
- return 0;
-
- fail:
- perror(*argv);
- free(state_buf);
- return -1;
-}
-
-
-/**
- * Address translation table used by `unmarshal_server` and `remapper`
- */
-static hash_table_t unmarshal_remap_map;
-
-/**
- * Address translator for `unmarshal_server`
- *
- * @param old The old address
- * @return The new address
- */
-static size_t unmarshal_remapper(size_t old)
-{
- return hash_table_get(&unmarshal_remap_map, old);
-}
-
-/**
- * Unmarshal the server's state from a file
- *
- * @param fd The file descriptor
- * @return Negative on error
- */
-int unmarshal_server(int fd)
-{
- int with_error = 0;
- char* state_buf;
- char* state_buf_;
- size_t list_size;
- size_t list_elements;
- size_t i;
- ssize_t node;
- pthread_t slave_thread;
-
-
- /* Read the file. */
- if ((state_buf = state_buf_ = full_read(fd)) == NULL)
- {
- perror(*argv);
- return -1;
- }
-
- /* Create memory address remapping table. */
- if (hash_table_create(&unmarshal_remap_map))
- {
- perror(*argv);
- free(state_buf);
- hash_table_destroy(&unmarshal_remap_map, NULL, NULL);
- return -1;
- }
-
-
- /* Get the marshal protocal version. Not needed, there is only the one version right now. */
- /* buf_get(state_buf_, int, 0, MDS_SERVER_VARS_VERSION); */
- buf_next(state_buf_, int, 1);
-
- /* Unmarshal the miscellaneous state data. */
- buf_get_next(state_buf_, sig_atomic_t, running);
- buf_get_next(state_buf_, uint64_t, next_client_id);
- buf_get_next(state_buf_, uint64_t, next_modify_id);
-
- /* Get the marshalled size of the client list and how any clients that are marshalled. */
- buf_get_next(state_buf_, size_t, list_size);
- buf_get_next(state_buf_, size_t, list_elements);
-
- /* Unmarshal the clients. */
- for (i = 0; i < list_elements; i++)
- {
- size_t n;
- size_t value_address;
- client_t* value;
-
- /* Allocate the client's information. */
- if (xmalloc(value, 1, client_t))
- goto clients_fail;
-
- /* Unmarshal the address, it is used the the client list and the client map, that are also marshalled. */
- buf_get_next(state_buf_, size_t, value_address);
- /* Unmarshal the client information. */
- n = client_unmarshal(value, state_buf_);
- if (n == 0)
- goto clients_fail;
-
- /* Populate the remapping table. */
- if (hash_table_put(&unmarshal_remap_map, value_address, (size_t)(void*)value) == 0)
- if (errno)
- goto clients_fail;
-
- /* Delayed seeking. */
- state_buf_ += n / sizeof(char);
-
-
- /* On error, seek past all clients. */
- continue;
- clients_fail:
- perror(*argv);
- with_error = 1;
- if (value != NULL)
- {
- buf_prev(state_buf_, size_t, 1);
- free(value);
- }
- for (; i < list_elements; i++)
- /* There is not need to close the sockets, it is done by
- the caller because there are conditions where we cannot
- get here anyway. */
- state_buf_ += client_unmarshal_skip(state_buf_) / sizeof(char);
- break;
- }
-
- /* Unmarshal the client list. */
- if (linked_list_unmarshal(&client_list, state_buf_))
- goto critical_fail;
- state_buf_ += list_size / sizeof(char);
-
- /* Unmarshal the client map. */
- if (fd_table_unmarshal(&client_map, state_buf_, unmarshal_remapper))
- goto critical_fail;
-
- /* Release the raw data. */
- free(state_buf);
-
- /* Remove non-found elements from the fd table. */
-#define __bit(I, _OP_) client_map.used[I / 64] _OP_ ((uint64_t)1 << (I % 64))
- if (with_error)
- for (i = 0; i < client_map.capacity; i++)
- if ((__bit(i, &)) && (client_map.values[i] == 0))
- /* Lets not presume that fd-table actually initialise its allocations. */
- __bit(i, &= ~);
-#undef __bit
-
- /* Remap the linked list and remove non-found elements, and start the clients. */
- foreach_linked_list_node (client_list, node)
- {
- /* Remap the linked list and remove non-found elements. */
- size_t new_address = unmarshal_remapper(client_list.values[node]);
- client_list.values[node] = new_address;
- if (new_address == 0) /* Returned if missing (or if the address is the invalid NULL.) */
- linked_list_remove(&client_list, node);
- else
- {
- /* Start the clients. (Errors do not need to be reported.) */
- client_t* client = (client_t*)(void*)new_address;
- int socket_fd = client->socket_fd;
-
- /* Increase number of running slaves. */
- with_mutex (slave_mutex, running_slaves++;);
-
- /* Start slave thread. */
- create_slave(&slave_thread, socket_fd);
- }
- }
-
- /* Release the remapping table's resources. */
- hash_table_destroy(&unmarshal_remap_map, NULL, NULL);
-
- return -with_error;
-
- critical_fail:
- perror(*argv);
- abort();
-}
-
-
-/**
- * Create, start and detache a slave thread
- *
- * @param thread The address at where to store the thread
- * @param socket_fd The file descriptor of the slave's socket
- * @return Zero on success, -1 on error, error message will have been printed
- */
-int create_slave(pthread_t* thread_slot, int socket_fd)
-{
- if ((errno = pthread_create(thread_slot, NULL, slave_loop, (void*)(intptr_t)socket_fd)))
- {
- perror(*argv);
- with_mutex (slave_mutex, running_slaves--;);
- return -1;
- }
- if ((errno = pthread_detach(*thread_slot)))
- {
- perror(*argv);
- return -1;
- }
- return 0;
-}
-