aboutsummaryrefslogtreecommitdiffstats
path: root/src/mds-server/sending.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/mds-server/sending.c204
1 files changed, 204 insertions, 0 deletions
diff --git a/src/mds-server/sending.c b/src/mds-server/sending.c
new file mode 100644
index 0000000..93fba5a
--- /dev/null
+++ b/src/mds-server/sending.c
@@ -0,0 +1,204 @@
+/**
+ * mds — A micro-display server
+ * Copyright © 2014 Mattias Andrée (maandree@member.fsf.org)
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include "sending.h"
+
+#include "globals.h"
+#include "client.h"
+#include "queued_interception.h"
+#include "multicast.h"
+
+#include <libmdsserver/mds-message.h>
+#include <libmdsserver/macros.h>
+#include <libmdsserver/util.h>
+
+#include <stddef.h>
+#include <stdint.h>
+#include <errno.h>
+#include <stdio.h>
+#include <string.h>
+
+
+
+/**
+ * Get the client by its socket's file descriptor in a synchronised manner
+ *
+ * @param socket_fd The file descriptor of the client's socket
+ * @return The client
+ */
+static client_t* client_by_socket(int socket_fd)
+{
+ size_t address;
+ with_mutex (slave_mutex, address = fd_table_get(&client_map, socket_fd););
+ return (client_t*)(void*)address;
+}
+
+
+/**
+ * Send a multicast message to one recipient
+ *
+ * @param multicast The message
+ * @param recipient The recipient
+ * @param modifying Whether the recipient may modify the message
+ * @return Evaluates to true if and only if the entire message was sent
+ */
+static int send_multicast_to_recipient(multicast_t* multicast, client_t* recipient, int modifying)
+{
+ char* msg = multicast->message + multicast->message_ptr;
+ size_t n = multicast->message_length - multicast->message_ptr;
+ size_t sent;
+
+ /* Skip Modify ID header if the interceptors will not perform a modification. */
+ if ((modifying == 0) && (multicast->message_ptr == 0))
+ {
+ n -= multicast->message_prefix;
+ multicast->message_ptr += multicast->message_prefix;
+ }
+
+ /* Send the message. */
+ n *= sizeof(char);
+ with_mutex (recipient->mutex,
+ if (recipient->open)
+ {
+ sent = send_message(recipient->socket_fd, msg + multicast->message_ptr, n);
+ n -= sent;
+ multicast->message_ptr += sent / sizeof(char);
+ if ((n > 0) && (errno != EINTR))
+ perror(*argv);
+ }
+ );
+
+ return n == 0;
+}
+
+
+/**
+ * Wait for the recipient of a multicast to reply
+ *
+ * @param recipient The recipient
+ * @param modify_id The modify ID of the multicast
+ */
+static void wait_for_reply(client_t* recipient, uint64_t modify_id)
+{
+ /* pthread_cond_timedwait is required to handle re-exec and termination because
+ pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */
+ struct timespec timeout =
+ {
+ .tv_sec = 1,
+ .tv_nsec = 0
+ };
+
+ with_mutex_if (modify_mutex, recipient->modify_message == NULL,
+ if (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0)
+ {
+ hash_table_put(&modify_map, (size_t)modify_id, (size_t)(void*)recipient);
+ pthread_cond_signal(&slave_cond);
+ }
+ );
+
+ with_mutex_if (recipient->modify_mutex, recipient->modify_message == NULL,
+ while ((recipient->modify_message == NULL) && (terminating == 0))
+ pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout);
+ if (terminating == 0)
+ hash_table_remove(&modify_map, (size_t)modify_id);
+ );
+}
+
+
+/**
+ * Multicast a message
+ *
+ * @param multicast The multicast message
+ */
+void multicast_message(multicast_t* multicast)
+{
+ uint64_t modify_id = 0;
+ size_t n = strlen("Modify ID: ");
+ if (startswith_n(multicast->message, "Modify ID: ", multicast->message_length, n))
+ {
+ char* value = multicast->message + n;
+ char* lf = strchr(value, '\n');
+ *lf = '\0';
+ modify_id = (uint64_t)atoll(value);
+ *lf = '\n';
+ }
+
+ for (; multicast->interceptions_ptr < multicast->interceptions_count; multicast->interceptions_ptr++)
+ {
+ queued_interception_t client_ = multicast->interceptions[multicast->interceptions_ptr];
+ client_t* client = client_.client;
+ int modifying = 0;
+ char* old_buf;
+ size_t i;
+ mds_message_t* mod;
+
+ /* After unmarshalling at re-exec, client will be NULL and must be mapped from its socket. */
+ if (client == NULL)
+ client_.client = client = client_by_socket(client_.socket_fd);
+
+ /* Send the message to the recipient. */
+ if (send_multicast_to_recipient(multicast, client, client_.modifying) == 0)
+ {
+ /* Stop if we are re-exec:ing or terminating, or continue to next recipient on error. */
+ if (terminating)
+ return;
+ else
+ continue;
+ }
+
+ /* Do not wait for a reply if it is non-modifying. */
+ if (client_.modifying == 0)
+ {
+ /* Reset how much of the message has been sent before we continue with next recipient. */
+ multicast->message_ptr = 0;
+ continue;
+ }
+
+ /* Wait for a reply. */
+ wait_for_reply(client, modify_id);
+ if (terminating)
+ return;
+
+ /* Act upon the reply. */
+ mod = client->modify_message;
+ for (i = 0; i < mod->header_count; i++)
+ if (strequals(mod->headers[i], "Modify: yes"))
+ {
+ modifying = 1;
+ break;
+ }
+ if (modifying)
+ {
+ n = mod->payload_size;
+ old_buf = multicast->message;
+ if (xrealloc(multicast->message, multicast->message_prefix + n, char))
+ {
+ perror(*argv);
+ multicast->message = old_buf;
+ }
+ else
+ memcpy(multicast->message + multicast->message_prefix, mod->payload, n);
+ }
+
+ /* Free the reply. */
+ mds_message_destroy(client->modify_message);
+
+ /* Reset how much of the message has been sent before we continue with next recipient. */
+ multicast->message_ptr = 0;
+ }
+}
+