aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/libmdsserver/util.c39
-rw-r--r--src/libmdsserver/util.h13
-rw-r--r--src/mds-server.c70
-rw-r--r--src/mds-server.h5
4 files changed, 123 insertions, 4 deletions
diff --git a/src/libmdsserver/util.c b/src/libmdsserver/util.c
index 81793ae..b8b61d3 100644
--- a/src/libmdsserver/util.c
+++ b/src/libmdsserver/util.c
@@ -17,12 +17,15 @@
*/
#include "util.h"
#include "config.h"
+#include "macros.h"
#include <stdlib.h>
#include <unistd.h>
#include <limits.h>
#include <string.h>
#include <signal.h>
+#include <sys/socket.h>
+#include <errno.h>
/**
@@ -106,3 +109,39 @@ int xsigaction(int signo, void (*function)(int signo))
return sigaction(signo, &action, NULL);
}
+
+/**
+ * Send a message over a socket
+ *
+ * @param socket The file descriptor of the socket
+ * @param message The message to send
+ * @param length The length of the message
+ * @return The number of bytes that have been sent (even on error)
+ */
+size_t send_message(int socket, const char* message, size_t length)
+{
+ size_t block_size = length;
+ size_t sent = 0;
+ ssize_t just_sent;
+
+ while (length > 0)
+ if ((just_sent = send(socket, message, min(block_size, length), MSG_NOSIGNAL)) < 0)
+ {
+ if (errno == EMSGSIZE)
+ {
+ block_size >>= 1;
+ if (block_size == 0)
+ return sent;
+ }
+ else if (errno != EINTR)
+ return sent;
+ }
+ else
+ {
+ message += (size_t)just_sent;
+ length -= (size_t)just_sent;
+ }
+
+ return sent;
+}
+
diff --git a/src/libmdsserver/util.h b/src/libmdsserver/util.h
index 8bfaa71..3a6e9b1 100644
--- a/src/libmdsserver/util.h
+++ b/src/libmdsserver/util.h
@@ -19,6 +19,9 @@
#define MDS_LIBMDSSERVER_UTIL_H
+#include <stddef.h>
+
+
/**
* Read an environment variable, but handle it as undefined if empty
*
@@ -50,6 +53,16 @@ void reexec_server(int argc, char** argv, int reexeced);
*/
int xsigaction(int signo, void (*function)(int signo));
+/**
+ * Send a message over a socket
+ *
+ * @param socket The file descriptor of the socket
+ * @param message The message to send
+ * @param length The length of the message
+ * @return The number of bytes that have been sent (even on error)
+ */
+size_t send_message(int socket, const char* message, size_t length);
+
#endif
diff --git a/src/mds-server.c b/src/mds-server.c
index c6db625..08bd47e 100644
--- a/src/mds-server.c
+++ b/src/mds-server.c
@@ -42,6 +42,7 @@
#include <sys/stat.h>
#include <sys/types.h>
#include <dirent.h>
+#include <inttypes.h>
@@ -492,6 +493,8 @@ void* slave_loop(void* data)
/* Store the thread so that other threads can kill it. */
information->thread = pthread_self();
+ /* Create mutex to make sure two thread to not try to send messages concurrently. */
+ pthread_mutex_init(&(information->mutex), NULL);
/* Make the server update without all slaves dying on SIGUSR1. */
@@ -583,15 +586,74 @@ void* slave_loop(void* data)
*
* @param client The client has sent a message
*/
-void message_received(client_t* client)
+void message_received(client_t* client) /* TODO */
{
mds_message_t message = client->message;
- size_t i;
+ int assign_id = 0;
+ const char* message_id = NULL;
+ size_t i, n;
+ char* msgbuf;
+ /* Parser headers. */
for (i = 0; i < message.header_count; i++)
{
- char* header = message.headers[i];
- /* TODO */
+ const char* header = message.headers[i];
+ if (strequals(header, "Command: assign-id"))
+ assign_id = 1;
+ else if (startswith(header, "Message ID: "))
+ message_id = header + strlen("Message ID: ");
+ }
+
+ /* Assign ID or reply with current ID. */
+ if (assign_id)
+ {
+ /* Assign ID if not already assigned. */
+ if (client->id == 0)
+ {
+ with_mutex(slave_mutex,
+ client->id = next_id++;
+ if (next_id == 0)
+ {
+ eprint("this is impossible, ID counter has overflowed.");
+ /* If the program ran for a millennium it would
+ take c:a 585 assignments per nanosecond. This
+ cannot possibly happen. (It would require serious
+ dedication by generations of ponies (or just an alicorn)
+ to maintain the process and transfer it new hardware.) */
+ abort();
+ }
+ );
+ /* TODO: add interception:
+ To: $(assign_id)
+ Priority: 0
+ Modifying: no
+ */
+ }
+
+ /* Construct response. */
+ n = 2 * 10 + strlen(message_id) + 1;
+ n += strlen("ID assignment: :\nIn response to: \n\n");
+ msgbuf = malloc(n * sizeof(char));
+ if (msgbuf == NULL)
+ {
+ perror(*argv);
+ return;
+ }
+ snprintf(msgbuf, n,
+ "ID assignment: %" PRIu32 ":%" PRIu32 "\n"
+ "In response to: %s\n"
+ "\n",
+ (uint32_t)(client->id >> 32),
+ (uint32_t)(client->id >> 0),
+ message_id);
+ n = strlen(msgbuf);
+
+ /* Send message. */
+ with_mutex(client->mutex,
+ if (send_message(client->socket_fd, msgbuf, n) < n) /* TODO support EINTR*/
+ perror(*argv);
+ );
+ free(msgbuf);
}
}
diff --git a/src/mds-server.h b/src/mds-server.h
index cf0d24a..b751125 100644
--- a/src/mds-server.h
+++ b/src/mds-server.h
@@ -61,6 +61,11 @@ typedef struct client
*/
uint64_t id;
+ /**
+ * Mutex for sending data
+ */
+ pthread_mutex_t mutex;
+
} client_t;