aboutsummaryrefslogtreecommitdiffstats
path: root/src/mds-server/mds-server.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/mds-server/mds-server.c259
1 files changed, 1 insertions, 258 deletions
diff --git a/src/mds-server/mds-server.c b/src/mds-server/mds-server.c
index a8f3836..4eefe31 100644
--- a/src/mds-server/mds-server.c
+++ b/src/mds-server/mds-server.c
@@ -27,32 +27,24 @@
#include "sending.h"
#include "slavery.h"
#include "reexec.h"
+#include "receiving.h"
#include <libmdsserver/config.h>
#include <libmdsserver/linked-list.h>
#include <libmdsserver/hash-table.h>
#include <libmdsserver/fd-table.h>
-#include <libmdsserver/mds-message.h>
#include <libmdsserver/macros.h>
#include <libmdsserver/util.h>
#include <libmdsserver/hash-help.h>
#include <stdio.h>
-#include <string.h>
-#include <stdlib.h>
#include <limits.h>
#include <unistd.h>
#include <pwd.h>
#include <errno.h>
-#include <pthread.h>
#include <sys/socket.h>
-#include <fcntl.h>
-#include <sys/stat.h>
-#include <sys/types.h>
-#include <sys/mman.h>
#include <dirent.h>
#include <inttypes.h>
-#include <time.h>
@@ -345,255 +337,6 @@ void* slave_loop(void* data)
/**
- * Perform actions that should be taken when
- * a message has been received from a client
- *
- * @param client The client has sent a message
- * @return Normally zero, but 1 if exited because of re-exec or termination
- */
-int message_received(client_t* client)
-{
- mds_message_t message = client->message;
- int assign_id = 0;
- int modifying = 0;
- int intercept = 0;
- int64_t priority = 0;
- int stop = 0;
- const char* message_id = NULL;
- uint64_t modify_id = 0;
- size_t i, n;
- char* msgbuf;
-
-
- /* Parser headers. */
- for (i = 0; i < message.header_count; i++)
- {
- const char* h = message.headers[i];
- if (strequals(h, "Command: assign-id")) assign_id = 1;
- else if (strequals(h, "Command: intercept")) intercept = 1;
- else if (strequals(h, "Modifying: yes")) modifying = 1;
- else if (strequals(h, "Stop: yes")) stop = 1;
- else if (startswith(h, "Message ID: ")) message_id = strstr(h, ": ") + 2;
- else if (startswith(h, "Priority: ")) priority = atoll(strstr(h, ": ") + 2);
- else if (startswith(h, "Modify ID: ")) modify_id = (uint64_t)atoll(strstr(h, ": ") + 2);
- }
-
-
- /* Notify waiting client about a received message modification. */
- if (modifying)
- {
- /* pthread_cond_timedwait is required to handle re-exec and termination because
- pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */
- struct timespec timeout =
- {
- .tv_sec = 1,
- .tv_nsec = 0
- };
- size_t address;
- client_t* recipient;
- mds_message_t* multicast;
-
- pthread_mutex_lock(&(modify_mutex));
- while (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0)
- {
- if (terminating)
- {
- pthread_mutex_unlock(&(modify_mutex));
- return 1;
- }
- pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout);
- }
- address = hash_table_get(&modify_map, (size_t)modify_id);
- recipient = (client_t*)(void*)address;
- fail_if (xmalloc(multicast = recipient->modify_message, 1, mds_message_t));
- mds_message_zero_initialise(multicast);
- fail_if (xmalloc(multicast->payload, message.payload_size, char));
- memcpy(multicast->payload, message.payload, message.payload_size * sizeof(char));
- fail_if (xmalloc(multicast->headers, message.header_count, char*));
- for (i = 0; i < message.header_count; i++, multicast->header_count++)
- {
- multicast->headers[i] = strdup(message.headers[i]);
- fail_if (multicast->headers[i] == NULL);
- }
- done:
- pthread_mutex_unlock(&(modify_mutex));
- with_mutex (client->modify_mutex, pthread_cond_signal(&(client->modify_cond)););
-
- /* Do nothing more, not not even multicast this message. */
- return 0;
- pfail:
- perror(*argv);
- if (multicast != NULL)
- {
- mds_message_destroy(multicast);
- free(multicast);
- recipient->modify_message = NULL;
- }
- goto done;
- }
-
-
- if (message_id == NULL)
- {
- eprint("received message with a message ID, ignoring.");
- return 0;
- }
-
- /* Assign ID if not already assigned. */
- if (assign_id && (client->id == 0))
- {
- intercept |= 2;
- with_mutex_if (slave_mutex, (client->id = next_client_id++) == 0,
- eprint("this is impossible, ID counter has overflowed.");
- /* If the program ran for a millennium it would
- take c:a 585 assignments per nanosecond. This
- cannot possibly happen. (It would require serious
- dedication by generations of ponies (or just an alicorn)
- to maintain the process and transfer it new hardware.) */
- abort();
- );
- }
-
- /* Make the client listen for messages addressed to it. */
- if (intercept)
- {
- size_t size = 64; /* Atleast 25, otherwise we get into problems at ((intercept & 2)) */
- char* buf;
- if (xmalloc(buf, size + 1, char))
- {
- perror(*argv);
- return 0;
- }
-
- pthread_mutex_lock(&(client->mutex));
- if ((intercept & 1)) /* from payload */
- {
- char* payload = client->message.payload;
- size_t payload_size = client->message.payload_size;
- if (client->message.payload_size == 0) /* All messages */
- {
- *buf = '\0';
- add_intercept_condition(client, buf, priority, modifying, stop);
- }
- else /* Filtered messages */
- for (;;)
- {
- char* end = memchr(payload, '\n', payload_size);
- size_t len = end == NULL ? payload_size : (size_t)(end - payload);
- if (len == 0)
- {
- payload++;
- payload_size--;
- break;
- }
- if (len > size)
- {
- char* old_buf = buf;
- if (xrealloc(buf, (size <<= 1) + 1, char))
- {
- perror(*argv);
- free(old_buf);
- pthread_mutex_unlock(&(client->mutex));
- return 0;
- }
- }
- memcpy(buf, payload, len);
- buf[len] = '\0';
- add_intercept_condition(client, buf, priority, modifying, stop);
- if (end == NULL)
- break;
- payload = end + 1;
- payload_size -= len + 1;
- }
- }
- if ((intercept & 2)) /* "To: $(client->id)" */
- {
- snprintf(buf, size, "To: %" PRIu32 ":%" PRIu32,
- (uint32_t)(client->id >> 32),
- (uint32_t)(client->id >> 0));
- add_intercept_condition(client, buf, priority, modifying, 0);
- }
- pthread_mutex_unlock(&(client->mutex));
-
- free(buf);
- }
-
-
- /* Multicast the message. */
- n = mds_message_compose_size(&message);
- if ((msgbuf = malloc(n)) == NULL)
- {
- perror(*argv);
- return 0;
- }
- mds_message_compose(&message, msgbuf);
- queue_message_multicast(msgbuf, n / sizeof(char), client);
-
-
- /* Send asigned ID. */
- if (assign_id)
- {
- char* msgbuf_;
-
- /* Construct response. */
- n = 2 * 10 + strlen(message_id) + 1;
- n += strlen("ID assignment: :\nIn response to: \n\n");
- if (xmalloc(msgbuf, n, char))
- {
- perror(*argv);
- return 0;
- }
- snprintf(msgbuf, n,
- "ID assignment: %" PRIu32 ":%" PRIu32 "\n"
- "In response to: %s\n"
- "\n",
- (uint32_t)(client->id >> 32),
- (uint32_t)(client->id >> 0),
- message_id == NULL ? "" : message_id);
- n = strlen(msgbuf);
-
- /* Multicast the reply. */
- msgbuf_ = strdup(msgbuf);
- if (msgbuf_ == NULL)
- {
- perror(*argv);
- free(msgbuf);
- return 0;
- }
- queue_message_multicast(msgbuf_, n, client);
-
- /* Queue message to be sent when this function returns.
- This done to simplify `multicast_message` for re-exec and termination. */
- with_mutex (client->mutex,
- if (client->send_pending_size == 0)
- {
- /* Set the pending message. */
- client->send_pending = msgbuf;
- client->send_pending_size = n;
- }
- else
- {
- /* Concatenate message to already pending messages. */
- size_t new_len = client->send_pending_size + n;
- char* msg_new = client->send_pending;
- if (xrealloc(msg_new, new_len, char))
- perror(*argv);
- else
- {
- memcpy(msg_new + client->send_pending_size, msgbuf, n * sizeof(char));
- client->send_pending = msg_new;
- client->send_pending_size = new_len;
- }
- free(msgbuf);
- }
- );
- }
-
- return 0;
-}
-
-
-/**
* Compare two queued interceptors by priority
*
* @param a:const queued_interception_t* One of the interceptors