diff options
Diffstat (limited to '')
-rw-r--r-- | src/libmdsserver/util.c | 39 | ||||
-rw-r--r-- | src/libmdsserver/util.h | 13 | ||||
-rw-r--r-- | src/mds-server.c | 70 | ||||
-rw-r--r-- | src/mds-server.h | 5 |
4 files changed, 123 insertions, 4 deletions
diff --git a/src/libmdsserver/util.c b/src/libmdsserver/util.c index 81793ae..b8b61d3 100644 --- a/src/libmdsserver/util.c +++ b/src/libmdsserver/util.c @@ -17,12 +17,15 @@ */ #include "util.h" #include "config.h" +#include "macros.h" #include <stdlib.h> #include <unistd.h> #include <limits.h> #include <string.h> #include <signal.h> +#include <sys/socket.h> +#include <errno.h> /** @@ -106,3 +109,39 @@ int xsigaction(int signo, void (*function)(int signo)) return sigaction(signo, &action, NULL); } + +/** + * Send a message over a socket + * + * @param socket The file descriptor of the socket + * @param message The message to send + * @param length The length of the message + * @return The number of bytes that have been sent (even on error) + */ +size_t send_message(int socket, const char* message, size_t length) +{ + size_t block_size = length; + size_t sent = 0; + ssize_t just_sent; + + while (length > 0) + if ((just_sent = send(socket, message, min(block_size, length), MSG_NOSIGNAL)) < 0) + { + if (errno == EMSGSIZE) + { + block_size >>= 1; + if (block_size == 0) + return sent; + } + else if (errno != EINTR) + return sent; + } + else + { + message += (size_t)just_sent; + length -= (size_t)just_sent; + } + + return sent; +} + diff --git a/src/libmdsserver/util.h b/src/libmdsserver/util.h index 8bfaa71..3a6e9b1 100644 --- a/src/libmdsserver/util.h +++ b/src/libmdsserver/util.h @@ -19,6 +19,9 @@ #define MDS_LIBMDSSERVER_UTIL_H +#include <stddef.h> + + /** * Read an environment variable, but handle it as undefined if empty * @@ -50,6 +53,16 @@ void reexec_server(int argc, char** argv, int reexeced); */ int xsigaction(int signo, void (*function)(int signo)); +/** + * Send a message over a socket + * + * @param socket The file descriptor of the socket + * @param message The message to send + * @param length The length of the message + * @return The number of bytes that have been sent (even on error) + */ +size_t send_message(int socket, const char* message, size_t length); + #endif diff --git a/src/mds-server.c b/src/mds-server.c index c6db625..08bd47e 100644 --- a/src/mds-server.c +++ b/src/mds-server.c @@ -42,6 +42,7 @@ #include <sys/stat.h> #include <sys/types.h> #include <dirent.h> +#include <inttypes.h> @@ -492,6 +493,8 @@ void* slave_loop(void* data) /* Store the thread so that other threads can kill it. */ information->thread = pthread_self(); + /* Create mutex to make sure two thread to not try to send messages concurrently. */ + pthread_mutex_init(&(information->mutex), NULL); /* Make the server update without all slaves dying on SIGUSR1. */ @@ -583,15 +586,74 @@ void* slave_loop(void* data) * * @param client The client has sent a message */ -void message_received(client_t* client) +void message_received(client_t* client) /* TODO */ { mds_message_t message = client->message; - size_t i; + int assign_id = 0; + const char* message_id = NULL; + size_t i, n; + char* msgbuf; + /* Parser headers. */ for (i = 0; i < message.header_count; i++) { - char* header = message.headers[i]; - /* TODO */ + const char* header = message.headers[i]; + if (strequals(header, "Command: assign-id")) + assign_id = 1; + else if (startswith(header, "Message ID: ")) + message_id = header + strlen("Message ID: "); + } + + /* Assign ID or reply with current ID. */ + if (assign_id) + { + /* Assign ID if not already assigned. */ + if (client->id == 0) + { + with_mutex(slave_mutex, + client->id = next_id++; + if (next_id == 0) + { + eprint("this is impossible, ID counter has overflowed."); + /* If the program ran for a millennium it would + take c:a 585 assignments per nanosecond. This + cannot possibly happen. (It would require serious + dedication by generations of ponies (or just an alicorn) + to maintain the process and transfer it new hardware.) */ + abort(); + } + ); + /* TODO: add interception: + To: $(assign_id) + Priority: 0 + Modifying: no + */ + } + + /* Construct response. */ + n = 2 * 10 + strlen(message_id) + 1; + n += strlen("ID assignment: :\nIn response to: \n\n"); + msgbuf = malloc(n * sizeof(char)); + if (msgbuf == NULL) + { + perror(*argv); + return; + } + snprintf(msgbuf, n, + "ID assignment: %" PRIu32 ":%" PRIu32 "\n" + "In response to: %s\n" + "\n", + (uint32_t)(client->id >> 32), + (uint32_t)(client->id >> 0), + message_id); + n = strlen(msgbuf); + + /* Send message. */ + with_mutex(client->mutex, + if (send_message(client->socket_fd, msgbuf, n) < n) /* TODO support EINTR*/ + perror(*argv); + ); + free(msgbuf); } } diff --git a/src/mds-server.h b/src/mds-server.h index cf0d24a..b751125 100644 --- a/src/mds-server.h +++ b/src/mds-server.h @@ -61,6 +61,11 @@ typedef struct client */ uint64_t id; + /** + * Mutex for sending data + */ + pthread_mutex_t mutex; + } client_t; |