aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMattias Andrée <maandree@operamail.com>2014-05-19 01:22:37 +0200
committerMattias Andrée <maandree@operamail.com>2014-05-19 01:22:37 +0200
commitb1d19d091446197a64f540137e7c5259014f7cee (patch)
tree417e7206ba10582e07e6ca09c18602a5f29c56f9 /src
parentm (diff)
downloadmds-b1d19d091446197a64f540137e7c5259014f7cee.tar.gz
mds-b1d19d091446197a64f540137e7c5259014f7cee.tar.bz2
mds-b1d19d091446197a64f540137e7c5259014f7cee.tar.xz
reduce code complexity
Signed-off-by: Mattias Andrée <maandree@operamail.com>
Diffstat (limited to 'src')
-rw-r--r--src/mds-server/mds-server.c259
-rw-r--r--src/mds-server/mds-server.h10
-rw-r--r--src/mds-server/receiving.c334
-rw-r--r--src/mds-server/receiving.h36
-rw-r--r--src/mds-server/slavery.c6
5 files changed, 377 insertions, 268 deletions
diff --git a/src/mds-server/mds-server.c b/src/mds-server/mds-server.c
index a8f3836..4eefe31 100644
--- a/src/mds-server/mds-server.c
+++ b/src/mds-server/mds-server.c
@@ -27,32 +27,24 @@
#include "sending.h"
#include "slavery.h"
#include "reexec.h"
+#include "receiving.h"
#include <libmdsserver/config.h>
#include <libmdsserver/linked-list.h>
#include <libmdsserver/hash-table.h>
#include <libmdsserver/fd-table.h>
-#include <libmdsserver/mds-message.h>
#include <libmdsserver/macros.h>
#include <libmdsserver/util.h>
#include <libmdsserver/hash-help.h>
#include <stdio.h>
-#include <string.h>
-#include <stdlib.h>
#include <limits.h>
#include <unistd.h>
#include <pwd.h>
#include <errno.h>
-#include <pthread.h>
#include <sys/socket.h>
-#include <fcntl.h>
-#include <sys/stat.h>
-#include <sys/types.h>
-#include <sys/mman.h>
#include <dirent.h>
#include <inttypes.h>
-#include <time.h>
@@ -345,255 +337,6 @@ void* slave_loop(void* data)
/**
- * Perform actions that should be taken when
- * a message has been received from a client
- *
- * @param client The client has sent a message
- * @return Normally zero, but 1 if exited because of re-exec or termination
- */
-int message_received(client_t* client)
-{
- mds_message_t message = client->message;
- int assign_id = 0;
- int modifying = 0;
- int intercept = 0;
- int64_t priority = 0;
- int stop = 0;
- const char* message_id = NULL;
- uint64_t modify_id = 0;
- size_t i, n;
- char* msgbuf;
-
-
- /* Parser headers. */
- for (i = 0; i < message.header_count; i++)
- {
- const char* h = message.headers[i];
- if (strequals(h, "Command: assign-id")) assign_id = 1;
- else if (strequals(h, "Command: intercept")) intercept = 1;
- else if (strequals(h, "Modifying: yes")) modifying = 1;
- else if (strequals(h, "Stop: yes")) stop = 1;
- else if (startswith(h, "Message ID: ")) message_id = strstr(h, ": ") + 2;
- else if (startswith(h, "Priority: ")) priority = atoll(strstr(h, ": ") + 2);
- else if (startswith(h, "Modify ID: ")) modify_id = (uint64_t)atoll(strstr(h, ": ") + 2);
- }
-
-
- /* Notify waiting client about a received message modification. */
- if (modifying)
- {
- /* pthread_cond_timedwait is required to handle re-exec and termination because
- pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */
- struct timespec timeout =
- {
- .tv_sec = 1,
- .tv_nsec = 0
- };
- size_t address;
- client_t* recipient;
- mds_message_t* multicast;
-
- pthread_mutex_lock(&(modify_mutex));
- while (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0)
- {
- if (terminating)
- {
- pthread_mutex_unlock(&(modify_mutex));
- return 1;
- }
- pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout);
- }
- address = hash_table_get(&modify_map, (size_t)modify_id);
- recipient = (client_t*)(void*)address;
- fail_if (xmalloc(multicast = recipient->modify_message, 1, mds_message_t));
- mds_message_zero_initialise(multicast);
- fail_if (xmalloc(multicast->payload, message.payload_size, char));
- memcpy(multicast->payload, message.payload, message.payload_size * sizeof(char));
- fail_if (xmalloc(multicast->headers, message.header_count, char*));
- for (i = 0; i < message.header_count; i++, multicast->header_count++)
- {
- multicast->headers[i] = strdup(message.headers[i]);
- fail_if (multicast->headers[i] == NULL);
- }
- done:
- pthread_mutex_unlock(&(modify_mutex));
- with_mutex (client->modify_mutex, pthread_cond_signal(&(client->modify_cond)););
-
- /* Do nothing more, not not even multicast this message. */
- return 0;
- pfail:
- perror(*argv);
- if (multicast != NULL)
- {
- mds_message_destroy(multicast);
- free(multicast);
- recipient->modify_message = NULL;
- }
- goto done;
- }
-
-
- if (message_id == NULL)
- {
- eprint("received message with a message ID, ignoring.");
- return 0;
- }
-
- /* Assign ID if not already assigned. */
- if (assign_id && (client->id == 0))
- {
- intercept |= 2;
- with_mutex_if (slave_mutex, (client->id = next_client_id++) == 0,
- eprint("this is impossible, ID counter has overflowed.");
- /* If the program ran for a millennium it would
- take c:a 585 assignments per nanosecond. This
- cannot possibly happen. (It would require serious
- dedication by generations of ponies (or just an alicorn)
- to maintain the process and transfer it new hardware.) */
- abort();
- );
- }
-
- /* Make the client listen for messages addressed to it. */
- if (intercept)
- {
- size_t size = 64; /* Atleast 25, otherwise we get into problems at ((intercept & 2)) */
- char* buf;
- if (xmalloc(buf, size + 1, char))
- {
- perror(*argv);
- return 0;
- }
-
- pthread_mutex_lock(&(client->mutex));
- if ((intercept & 1)) /* from payload */
- {
- char* payload = client->message.payload;
- size_t payload_size = client->message.payload_size;
- if (client->message.payload_size == 0) /* All messages */
- {
- *buf = '\0';
- add_intercept_condition(client, buf, priority, modifying, stop);
- }
- else /* Filtered messages */
- for (;;)
- {
- char* end = memchr(payload, '\n', payload_size);
- size_t len = end == NULL ? payload_size : (size_t)(end - payload);
- if (len == 0)
- {
- payload++;
- payload_size--;
- break;
- }
- if (len > size)
- {
- char* old_buf = buf;
- if (xrealloc(buf, (size <<= 1) + 1, char))
- {
- perror(*argv);
- free(old_buf);
- pthread_mutex_unlock(&(client->mutex));
- return 0;
- }
- }
- memcpy(buf, payload, len);
- buf[len] = '\0';
- add_intercept_condition(client, buf, priority, modifying, stop);
- if (end == NULL)
- break;
- payload = end + 1;
- payload_size -= len + 1;
- }
- }
- if ((intercept & 2)) /* "To: $(client->id)" */
- {
- snprintf(buf, size, "To: %" PRIu32 ":%" PRIu32,
- (uint32_t)(client->id >> 32),
- (uint32_t)(client->id >> 0));
- add_intercept_condition(client, buf, priority, modifying, 0);
- }
- pthread_mutex_unlock(&(client->mutex));
-
- free(buf);
- }
-
-
- /* Multicast the message. */
- n = mds_message_compose_size(&message);
- if ((msgbuf = malloc(n)) == NULL)
- {
- perror(*argv);
- return 0;
- }
- mds_message_compose(&message, msgbuf);
- queue_message_multicast(msgbuf, n / sizeof(char), client);
-
-
- /* Send asigned ID. */
- if (assign_id)
- {
- char* msgbuf_;
-
- /* Construct response. */
- n = 2 * 10 + strlen(message_id) + 1;
- n += strlen("ID assignment: :\nIn response to: \n\n");
- if (xmalloc(msgbuf, n, char))
- {
- perror(*argv);
- return 0;
- }
- snprintf(msgbuf, n,
- "ID assignment: %" PRIu32 ":%" PRIu32 "\n"
- "In response to: %s\n"
- "\n",
- (uint32_t)(client->id >> 32),
- (uint32_t)(client->id >> 0),
- message_id == NULL ? "" : message_id);
- n = strlen(msgbuf);
-
- /* Multicast the reply. */
- msgbuf_ = strdup(msgbuf);
- if (msgbuf_ == NULL)
- {
- perror(*argv);
- free(msgbuf);
- return 0;
- }
- queue_message_multicast(msgbuf_, n, client);
-
- /* Queue message to be sent when this function returns.
- This done to simplify `multicast_message` for re-exec and termination. */
- with_mutex (client->mutex,
- if (client->send_pending_size == 0)
- {
- /* Set the pending message. */
- client->send_pending = msgbuf;
- client->send_pending_size = n;
- }
- else
- {
- /* Concatenate message to already pending messages. */
- size_t new_len = client->send_pending_size + n;
- char* msg_new = client->send_pending;
- if (xrealloc(msg_new, new_len, char))
- perror(*argv);
- else
- {
- memcpy(msg_new + client->send_pending_size, msgbuf, n * sizeof(char));
- client->send_pending = msg_new;
- client->send_pending_size = new_len;
- }
- free(msgbuf);
- }
- );
- }
-
- return 0;
-}
-
-
-/**
* Compare two queued interceptors by priority
*
* @param a:const queued_interception_t* One of the interceptors
diff --git a/src/mds-server/mds-server.h b/src/mds-server/mds-server.h
index c4d166f..8154807 100644
--- a/src/mds-server/mds-server.h
+++ b/src/mds-server/mds-server.h
@@ -20,7 +20,6 @@
#include "client.h"
-#include "multicast.h"
#include <stddef.h>
@@ -34,15 +33,6 @@
void* slave_loop(void* data);
/**
- * Perform actions that should be taken when
- * a message has been received from a client
- *
- * @param client The client has sent a message
- * @return Normally zero, but 1 if exited because of re-exec or termination
- */
-int message_received(client_t* client);
-
-/**
* Queue a message for multicasting
*
* @param message The message
diff --git a/src/mds-server/receiving.c b/src/mds-server/receiving.c
new file mode 100644
index 0000000..e4dbfe3
--- /dev/null
+++ b/src/mds-server/receiving.c
@@ -0,0 +1,334 @@
+/**
+ * mds — A micro-display server
+ * Copyright © 2014 Mattias Andrée (maandree@member.fsf.org)
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include "receiving.h"
+
+#include "globals.h"
+#include "client.h"
+#include "interceptors.h"
+
+#include <libmdsserver/hash-table.h>
+#include <libmdsserver/mds-message.h>
+#include <libmdsserver/macros.h>
+
+#include <stddef.h>
+#include <inttypes.h>
+#include <errno.h>
+#include <string.h>
+#include <stdio.h>
+
+
+/**
+ * Queue a message for multicasting
+ *
+ * @param message The message
+ * @param length The length of the message
+ * @param sender The original sender of the message
+ */
+void queue_message_multicast(char* message, size_t length, client_t* sender);
+
+
+/**
+ * Notify waiting client about a received message modification
+ *
+ * @param client The client whom sent the message
+ * @param message The message
+ * @param modify_id The modify ID of the message
+ * @return Normally zero, but 1 if exited because of re-exec or termination
+ */
+static int modifying_notify(client_t* client, mds_message_t message, uint64_t modify_id)
+{
+ /* pthread_cond_timedwait is required to handle re-exec and termination because
+ pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */
+ struct timespec timeout =
+ {
+ .tv_sec = 1,
+ .tv_nsec = 0
+ };
+ size_t address;
+ client_t* recipient;
+ mds_message_t* multicast;
+ size_t i;
+
+ pthread_mutex_lock(&(modify_mutex));
+ while (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0)
+ {
+ if (terminating)
+ {
+ pthread_mutex_unlock(&(modify_mutex));
+ return 1;
+ }
+ pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout);
+ }
+ address = hash_table_get(&modify_map, (size_t)modify_id);
+ recipient = (client_t*)(void*)address;
+ fail_if (xmalloc(multicast = recipient->modify_message, 1, mds_message_t));
+ mds_message_zero_initialise(multicast);
+ fail_if (xmalloc(multicast->payload, message.payload_size, char));
+ memcpy(multicast->payload, message.payload, message.payload_size * sizeof(char));
+ fail_if (xmalloc(multicast->headers, message.header_count, char*));
+ for (i = 0; i < message.header_count; i++, multicast->header_count++)
+ {
+ multicast->headers[i] = strdup(message.headers[i]);
+ fail_if (multicast->headers[i] == NULL);
+ }
+ done:
+ pthread_mutex_unlock(&(modify_mutex));
+ with_mutex (client->modify_mutex, pthread_cond_signal(&(client->modify_cond)););
+
+ return 0;
+
+
+ pfail:
+ perror(*argv);
+ if (multicast != NULL)
+ {
+ mds_message_destroy(multicast);
+ free(multicast);
+ recipient->modify_message = NULL;
+ }
+ goto done;
+}
+
+
+/**
+ * Add intercept conditions listed in the payload of a message
+ *
+ * @param client The intercepting client
+ * @param modifying Whether then client may modify the messages
+ * @param priority The client's interception priority
+ * @param stop Whether to stop listening rather than start or reconfigure
+ * @return Zero on success, -1 on error
+ */
+static int add_intercept_conditions_from_message(client_t* client, int modifying, int64_t priority, int stop)
+{
+ int errno_ = 0;
+ char* payload = client->message.payload;
+ size_t payload_size = client->message.payload_size;
+ size_t size = 64;
+ char* buf;
+
+ if (xmalloc(buf, size + 1, char))
+ return -1;
+
+ /* All messages */
+ if (client->message.payload_size == 0)
+ {
+ *buf = '\0';
+ add_intercept_condition(client, buf, priority, modifying, stop);
+ goto done;
+ }
+
+ /* Filtered messages */
+ for (;;)
+ {
+ char* end = memchr(payload, '\n', payload_size);
+ size_t len = end == NULL ? payload_size : (size_t)(end - payload);
+ if (len == 0)
+ {
+ payload++;
+ payload_size--;
+ break;
+ }
+ if (len > size)
+ {
+ char* old_buf = buf;
+ if (xrealloc(buf, (size <<= 1) + 1, char))
+ {
+ errno_ = errno;
+ free(old_buf);
+ pthread_mutex_unlock(&(client->mutex));
+ break;
+ }
+ }
+ memcpy(buf, payload, len);
+ buf[len] = '\0';
+ add_intercept_condition(client, buf, priority, modifying, stop);
+ if (end == NULL)
+ break;
+ payload = end + 1;
+ payload_size -= len + 1;
+ }
+
+ done:
+ free(buf);
+ errno = errno_;
+ return errno_ ? -1 : 0;
+}
+
+
+/**
+ * Assign and ID to a client, if not already assigned, and send it to that client
+ *
+ * @param client The client to who an ID should be assigned
+ * @param message_id The message ID of the ID request
+ * @return Zero on success, -1 on error
+ */
+static int assign_and_send_id(client_t* client, const char* message_id)
+{
+ char* msgbuf = NULL;
+ char* msgbuf_;
+ size_t n;
+
+ /* Construct response. */
+ n = 2 * 10 + strlen(message_id) + 1;
+ n += strlen("ID assignment: :\nIn response to: \n\n");
+ fail_if (xmalloc(msgbuf, n, char));
+ snprintf(msgbuf, n,
+ "ID assignment: %" PRIu32 ":%" PRIu32 "\n"
+ "In response to: %s\n"
+ "\n",
+ (uint32_t)(client->id >> 32),
+ (uint32_t)(client->id >> 0),
+ message_id == NULL ? "" : message_id);
+ n = strlen(msgbuf);
+
+ /* Multicast the reply. */
+ msgbuf_ = strdup(msgbuf);
+ fail_if (msgbuf_ == NULL);
+ queue_message_multicast(msgbuf_, n, client);
+
+ /* Queue message to be sent when this function returns.
+ This done to simplify `multicast_message` for re-exec and termination. */
+ with_mutex (client->mutex,
+ if (client->send_pending_size == 0)
+ {
+ /* Set the pending message. */
+ client->send_pending = msgbuf;
+ client->send_pending_size = n;
+ }
+ else
+ {
+ /* Concatenate message to already pending messages. */
+ size_t new_len = client->send_pending_size + n;
+ char* msg_new = client->send_pending;
+ if (xrealloc(msg_new, new_len, char))
+ goto fail;
+ memcpy(msg_new + client->send_pending_size, msgbuf, n * sizeof(char));
+ client->send_pending = msg_new;
+ client->send_pending_size = new_len;
+ }
+ fail:
+ );
+
+ return 0;
+
+ pfail:
+ free(msgbuf);
+ return -1;
+}
+
+
+/**
+ * Perform actions that should be taken when
+ * a message has been received from a client
+ *
+ * @param client The client whom sent the message
+ * @return Normally zero, but 1 if exited because of re-exec or termination
+ */
+int message_received(client_t* client)
+{
+ mds_message_t message = client->message;
+ int assign_id = 0;
+ int modifying = 0;
+ int intercept = 0;
+ int64_t priority = 0;
+ int stop = 0;
+ const char* message_id = NULL;
+ uint64_t modify_id = 0;
+ char* msgbuf = NULL;
+ size_t i, n;
+
+
+ /* Parser headers. */
+ for (i = 0; i < message.header_count; i++)
+ {
+ const char* h = message.headers[i];
+ if (strequals(h, "Command: assign-id")) assign_id = 1;
+ else if (strequals(h, "Command: intercept")) intercept = 1;
+ else if (strequals(h, "Modifying: yes")) modifying = 1;
+ else if (strequals(h, "Stop: yes")) stop = 1;
+ else if (startswith(h, "Message ID: ")) message_id = strstr(h, ": ") + 2;
+ else if (startswith(h, "Priority: ")) priority = atoll(strstr(h, ": ") + 2);
+ else if (startswith(h, "Modify ID: ")) modify_id = (uint64_t)atoll(strstr(h, ": ") + 2);
+ }
+
+
+ /* Notify waiting client about a received message modification. */
+ if (modifying)
+ return modifying_notify(client, message, modify_id);
+ /* Do nothing more, not not even multicast this message. */
+
+
+ if (message_id == NULL)
+ {
+ eprint("received message without a message ID, ignoring.");
+ return 0;
+ }
+
+ /* Assign ID if not already assigned. */
+ if (assign_id && (client->id == 0))
+ {
+ intercept |= 2;
+ with_mutex_if (slave_mutex, (client->id = next_client_id++) == 0,
+ eprint("this is impossible, ID counter has overflowed.");
+ /* If the program ran for a millennium it would
+ take c:a 585 assignments per nanosecond. This
+ cannot possibly happen. (It would require serious
+ dedication by generations of ponies (or just an alicorn)
+ to maintain the process and transfer it new hardware.) */
+ abort();
+ );
+ }
+
+ /* Make the client listen for messages addressed to it. */
+ if (intercept)
+ {
+ pthread_mutex_lock(&(client->mutex));
+ if ((intercept & 1)) /* from payload */
+ fail_if (add_intercept_conditions_from_message(client, modifying, priority, stop) < 0);
+ if ((intercept & 2)) /* "To: $(client->id)" */
+ {
+ char buf[26];
+ xsnprintf(buf, "To: %" PRIu32 ":%" PRIu32,
+ (uint32_t)(client->id >> 32),
+ (uint32_t)(client->id >> 0));
+ add_intercept_condition(client, buf, priority, modifying, 0);
+ }
+ pthread_mutex_unlock(&(client->mutex));
+ }
+
+
+ /* Multicast the message. */
+ n = mds_message_compose_size(&message);
+ fail_if ((msgbuf = malloc(n)) == NULL);
+ mds_message_compose(&message, msgbuf);
+ queue_message_multicast(msgbuf, n / sizeof(char), client);
+ msgbuf = NULL;
+
+
+ /* Send asigned ID. */
+ if (assign_id)
+ fail_if (assign_and_send_id(client, message_id) < 0);
+
+ return 0;
+
+ pfail:
+ perror(*argv);
+ free(msgbuf);
+ return 0;
+}
diff --git a/src/mds-server/receiving.h b/src/mds-server/receiving.h
new file mode 100644
index 0000000..9f84423
--- /dev/null
+++ b/src/mds-server/receiving.h
@@ -0,0 +1,36 @@
+/**
+ * mds — A micro-display server
+ * Copyright © 2014 Mattias Andrée (maandree@member.fsf.org)
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#ifndef MDS_MDS_SERVER_RECEIVING_H
+#define MDS_MDS_SERVER_RECEIVING_H
+
+
+#include "client.h"
+
+
+/**
+ * Perform actions that should be taken when
+ * a message has been received from a client
+ *
+ * @param client The client whom sent the message
+ * @return Normally zero, but 1 if exited because of re-exec or termination
+ */
+int message_received(client_t* client);
+
+
+#endif
+
diff --git a/src/mds-server/slavery.c b/src/mds-server/slavery.c
index 1a5555e..627bde0 100644
--- a/src/mds-server/slavery.c
+++ b/src/mds-server/slavery.c
@@ -29,6 +29,12 @@
#include <stdio.h>
+/**
+ * Master function for slave threads
+ *
+ * @param data Input data
+ * @return Outout data
+ */
void* slave_loop(void*);