From a33dd2293f619f4bdab65fbb89c29f9ac2279cc4 Mon Sep 17 00:00:00 2001 From: Mattias Andrée Date: Wed, 7 May 2014 01:13:31 +0200 Subject: misc MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Mattias Andrée --- doc/messages | 6 ++--- src/libmdsserver/util.c | 39 +++++++++++++++++++++++++++ src/libmdsserver/util.h | 13 +++++++++ src/mds-server.c | 70 ++++++++++++++++++++++++++++++++++++++++++++++--- src/mds-server.h | 5 ++++ 5 files changed, 126 insertions(+), 7 deletions(-) diff --git a/doc/messages b/doc/messages index 7236750..48c6b73 100644 --- a/doc/messages +++ b/doc/messages @@ -52,7 +52,7 @@ closed’ with the client's ID. Be aware that the ID may be 0:0, which is not unique. -Multicasts are sent by to clients that have ask for +Multicasts are sent to clients that have ask for the type of message this is being sent. This technique is also used to intercept message or receive message as part of a service that a server provides. Servers @@ -66,10 +66,10 @@ that is an signed 64-bit integer, the default value is zero. This is done by using the header ‘Priority’. A higher priority means that the message is sent earlier. If the client wishes to be able to modify the message -it most the header ‘Modifying’ with the value ‘yes’. +it most have the header ‘Modifying’ with the value ‘yes’. If the client wishes to receive all messages it should not include a payload, otherwise it which send a -LF delimited list of headers that it is interested it. +LF delimited list of headers that it is interested in. In this list it is possible to limit to exact values byte appending a colon and blank space (‘: ’) followed by the value to the header name. You can also request diff --git a/src/libmdsserver/util.c b/src/libmdsserver/util.c index 81793ae..b8b61d3 100644 --- a/src/libmdsserver/util.c +++ b/src/libmdsserver/util.c @@ -17,12 +17,15 @@ */ #include "util.h" #include "config.h" +#include "macros.h" #include #include #include #include #include +#include +#include /** @@ -106,3 +109,39 @@ int xsigaction(int signo, void (*function)(int signo)) return sigaction(signo, &action, NULL); } + +/** + * Send a message over a socket + * + * @param socket The file descriptor of the socket + * @param message The message to send + * @param length The length of the message + * @return The number of bytes that have been sent (even on error) + */ +size_t send_message(int socket, const char* message, size_t length) +{ + size_t block_size = length; + size_t sent = 0; + ssize_t just_sent; + + while (length > 0) + if ((just_sent = send(socket, message, min(block_size, length), MSG_NOSIGNAL)) < 0) + { + if (errno == EMSGSIZE) + { + block_size >>= 1; + if (block_size == 0) + return sent; + } + else if (errno != EINTR) + return sent; + } + else + { + message += (size_t)just_sent; + length -= (size_t)just_sent; + } + + return sent; +} + diff --git a/src/libmdsserver/util.h b/src/libmdsserver/util.h index 8bfaa71..3a6e9b1 100644 --- a/src/libmdsserver/util.h +++ b/src/libmdsserver/util.h @@ -19,6 +19,9 @@ #define MDS_LIBMDSSERVER_UTIL_H +#include + + /** * Read an environment variable, but handle it as undefined if empty * @@ -50,6 +53,16 @@ void reexec_server(int argc, char** argv, int reexeced); */ int xsigaction(int signo, void (*function)(int signo)); +/** + * Send a message over a socket + * + * @param socket The file descriptor of the socket + * @param message The message to send + * @param length The length of the message + * @return The number of bytes that have been sent (even on error) + */ +size_t send_message(int socket, const char* message, size_t length); + #endif diff --git a/src/mds-server.c b/src/mds-server.c index c6db625..08bd47e 100644 --- a/src/mds-server.c +++ b/src/mds-server.c @@ -42,6 +42,7 @@ #include #include #include +#include @@ -492,6 +493,8 @@ void* slave_loop(void* data) /* Store the thread so that other threads can kill it. */ information->thread = pthread_self(); + /* Create mutex to make sure two thread to not try to send messages concurrently. */ + pthread_mutex_init(&(information->mutex), NULL); /* Make the server update without all slaves dying on SIGUSR1. */ @@ -583,15 +586,74 @@ void* slave_loop(void* data) * * @param client The client has sent a message */ -void message_received(client_t* client) +void message_received(client_t* client) /* TODO */ { mds_message_t message = client->message; - size_t i; + int assign_id = 0; + const char* message_id = NULL; + size_t i, n; + char* msgbuf; + /* Parser headers. */ for (i = 0; i < message.header_count; i++) { - char* header = message.headers[i]; - /* TODO */ + const char* header = message.headers[i]; + if (strequals(header, "Command: assign-id")) + assign_id = 1; + else if (startswith(header, "Message ID: ")) + message_id = header + strlen("Message ID: "); + } + + /* Assign ID or reply with current ID. */ + if (assign_id) + { + /* Assign ID if not already assigned. */ + if (client->id == 0) + { + with_mutex(slave_mutex, + client->id = next_id++; + if (next_id == 0) + { + eprint("this is impossible, ID counter has overflowed."); + /* If the program ran for a millennium it would + take c:a 585 assignments per nanosecond. This + cannot possibly happen. (It would require serious + dedication by generations of ponies (or just an alicorn) + to maintain the process and transfer it new hardware.) */ + abort(); + } + ); + /* TODO: add interception: + To: $(assign_id) + Priority: 0 + Modifying: no + */ + } + + /* Construct response. */ + n = 2 * 10 + strlen(message_id) + 1; + n += strlen("ID assignment: :\nIn response to: \n\n"); + msgbuf = malloc(n * sizeof(char)); + if (msgbuf == NULL) + { + perror(*argv); + return; + } + snprintf(msgbuf, n, + "ID assignment: %" PRIu32 ":%" PRIu32 "\n" + "In response to: %s\n" + "\n", + (uint32_t)(client->id >> 32), + (uint32_t)(client->id >> 0), + message_id); + n = strlen(msgbuf); + + /* Send message. */ + with_mutex(client->mutex, + if (send_message(client->socket_fd, msgbuf, n) < n) /* TODO support EINTR*/ + perror(*argv); + ); + free(msgbuf); } } diff --git a/src/mds-server.h b/src/mds-server.h index cf0d24a..b751125 100644 --- a/src/mds-server.h +++ b/src/mds-server.h @@ -61,6 +61,11 @@ typedef struct client */ uint64_t id; + /** + * Mutex for sending data + */ + pthread_mutex_t mutex; + } client_t; -- cgit v1.2.3-70-g09d2