aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMattias Andrée <maandree@operamail.com>2014-05-07 01:13:31 +0200
committerMattias Andrée <maandree@operamail.com>2014-05-07 01:13:31 +0200
commita33dd2293f619f4bdab65fbb89c29f9ac2279cc4 (patch)
tree9297cdb9f607dc92c3887d522c8e7db292e3b8e8
parentadd a todo (diff)
downloadmds-a33dd2293f619f4bdab65fbb89c29f9ac2279cc4.tar.gz
mds-a33dd2293f619f4bdab65fbb89c29f9ac2279cc4.tar.bz2
mds-a33dd2293f619f4bdab65fbb89c29f9ac2279cc4.tar.xz
misc
Signed-off-by: Mattias Andrée <maandree@operamail.com>
-rw-r--r--doc/messages6
-rw-r--r--src/libmdsserver/util.c39
-rw-r--r--src/libmdsserver/util.h13
-rw-r--r--src/mds-server.c70
-rw-r--r--src/mds-server.h5
5 files changed, 126 insertions, 7 deletions
diff --git a/doc/messages b/doc/messages
index 7236750..48c6b73 100644
--- a/doc/messages
+++ b/doc/messages
@@ -52,7 +52,7 @@ closed’ with the client's ID. Be aware that the ID
may be 0:0, which is not unique.
-Multicasts are sent by to clients that have ask for
+Multicasts are sent to clients that have ask for
the type of message this is being sent. This technique
is also used to intercept message or receive message
as part of a service that a server provides. Servers
@@ -66,10 +66,10 @@ that is an signed 64-bit integer, the default value is
zero. This is done by using the header ‘Priority’. A
higher priority means that the message is sent earlier.
If the client wishes to be able to modify the message
-it most the header ‘Modifying’ with the value ‘yes’.
+it most have the header ‘Modifying’ with the value ‘yes’.
If the client wishes to receive all messages it should
not include a payload, otherwise it which send a
-LF delimited list of headers that it is interested it.
+LF delimited list of headers that it is interested in.
In this list it is possible to limit to exact values
byte appending a colon and blank space (‘: ’) followed
by the value to the header name. You can also request
diff --git a/src/libmdsserver/util.c b/src/libmdsserver/util.c
index 81793ae..b8b61d3 100644
--- a/src/libmdsserver/util.c
+++ b/src/libmdsserver/util.c
@@ -17,12 +17,15 @@
*/
#include "util.h"
#include "config.h"
+#include "macros.h"
#include <stdlib.h>
#include <unistd.h>
#include <limits.h>
#include <string.h>
#include <signal.h>
+#include <sys/socket.h>
+#include <errno.h>
/**
@@ -106,3 +109,39 @@ int xsigaction(int signo, void (*function)(int signo))
return sigaction(signo, &action, NULL);
}
+
+/**
+ * Send a message over a socket
+ *
+ * @param socket The file descriptor of the socket
+ * @param message The message to send
+ * @param length The length of the message
+ * @return The number of bytes that have been sent (even on error)
+ */
+size_t send_message(int socket, const char* message, size_t length)
+{
+ size_t block_size = length;
+ size_t sent = 0;
+ ssize_t just_sent;
+
+ while (length > 0)
+ if ((just_sent = send(socket, message, min(block_size, length), MSG_NOSIGNAL)) < 0)
+ {
+ if (errno == EMSGSIZE)
+ {
+ block_size >>= 1;
+ if (block_size == 0)
+ return sent;
+ }
+ else if (errno != EINTR)
+ return sent;
+ }
+ else
+ {
+ message += (size_t)just_sent;
+ length -= (size_t)just_sent;
+ }
+
+ return sent;
+}
+
diff --git a/src/libmdsserver/util.h b/src/libmdsserver/util.h
index 8bfaa71..3a6e9b1 100644
--- a/src/libmdsserver/util.h
+++ b/src/libmdsserver/util.h
@@ -19,6 +19,9 @@
#define MDS_LIBMDSSERVER_UTIL_H
+#include <stddef.h>
+
+
/**
* Read an environment variable, but handle it as undefined if empty
*
@@ -50,6 +53,16 @@ void reexec_server(int argc, char** argv, int reexeced);
*/
int xsigaction(int signo, void (*function)(int signo));
+/**
+ * Send a message over a socket
+ *
+ * @param socket The file descriptor of the socket
+ * @param message The message to send
+ * @param length The length of the message
+ * @return The number of bytes that have been sent (even on error)
+ */
+size_t send_message(int socket, const char* message, size_t length);
+
#endif
diff --git a/src/mds-server.c b/src/mds-server.c
index c6db625..08bd47e 100644
--- a/src/mds-server.c
+++ b/src/mds-server.c
@@ -42,6 +42,7 @@
#include <sys/stat.h>
#include <sys/types.h>
#include <dirent.h>
+#include <inttypes.h>
@@ -492,6 +493,8 @@ void* slave_loop(void* data)
/* Store the thread so that other threads can kill it. */
information->thread = pthread_self();
+ /* Create mutex to make sure two thread to not try to send messages concurrently. */
+ pthread_mutex_init(&(information->mutex), NULL);
/* Make the server update without all slaves dying on SIGUSR1. */
@@ -583,15 +586,74 @@ void* slave_loop(void* data)
*
* @param client The client has sent a message
*/
-void message_received(client_t* client)
+void message_received(client_t* client) /* TODO */
{
mds_message_t message = client->message;
- size_t i;
+ int assign_id = 0;
+ const char* message_id = NULL;
+ size_t i, n;
+ char* msgbuf;
+ /* Parser headers. */
for (i = 0; i < message.header_count; i++)
{
- char* header = message.headers[i];
- /* TODO */
+ const char* header = message.headers[i];
+ if (strequals(header, "Command: assign-id"))
+ assign_id = 1;
+ else if (startswith(header, "Message ID: "))
+ message_id = header + strlen("Message ID: ");
+ }
+
+ /* Assign ID or reply with current ID. */
+ if (assign_id)
+ {
+ /* Assign ID if not already assigned. */
+ if (client->id == 0)
+ {
+ with_mutex(slave_mutex,
+ client->id = next_id++;
+ if (next_id == 0)
+ {
+ eprint("this is impossible, ID counter has overflowed.");
+ /* If the program ran for a millennium it would
+ take c:a 585 assignments per nanosecond. This
+ cannot possibly happen. (It would require serious
+ dedication by generations of ponies (or just an alicorn)
+ to maintain the process and transfer it new hardware.) */
+ abort();
+ }
+ );
+ /* TODO: add interception:
+ To: $(assign_id)
+ Priority: 0
+ Modifying: no
+ */
+ }
+
+ /* Construct response. */
+ n = 2 * 10 + strlen(message_id) + 1;
+ n += strlen("ID assignment: :\nIn response to: \n\n");
+ msgbuf = malloc(n * sizeof(char));
+ if (msgbuf == NULL)
+ {
+ perror(*argv);
+ return;
+ }
+ snprintf(msgbuf, n,
+ "ID assignment: %" PRIu32 ":%" PRIu32 "\n"
+ "In response to: %s\n"
+ "\n",
+ (uint32_t)(client->id >> 32),
+ (uint32_t)(client->id >> 0),
+ message_id);
+ n = strlen(msgbuf);
+
+ /* Send message. */
+ with_mutex(client->mutex,
+ if (send_message(client->socket_fd, msgbuf, n) < n) /* TODO support EINTR*/
+ perror(*argv);
+ );
+ free(msgbuf);
}
}
diff --git a/src/mds-server.h b/src/mds-server.h
index cf0d24a..b751125 100644
--- a/src/mds-server.h
+++ b/src/mds-server.h
@@ -61,6 +61,11 @@ typedef struct client
*/
uint64_t id;
+ /**
+ * Mutex for sending data
+ */
+ pthread_mutex_t mutex;
+
} client_t;