diff options
-rw-r--r-- | Makefile | 15 | ||||
-rw-r--r-- | src/libmdsserver/macros.h | 4 | ||||
-rw-r--r-- | src/mds-server/client.c | 40 | ||||
-rw-r--r-- | src/mds-server/client.h | 15 | ||||
-rw-r--r-- | src/mds-server/mds-server.c | 186 | ||||
-rw-r--r-- | src/mds-server/mds-server.h | 13 | ||||
-rw-r--r-- | src/mds-server/multicast.c | 156 | ||||
-rw-r--r-- | src/mds-server/multicast.h | 120 | ||||
-rw-r--r-- | src/mds-server/queued_interception.c | 78 | ||||
-rw-r--r-- | src/mds-server/queued_interception.h | 42 |
10 files changed, 623 insertions, 46 deletions
@@ -69,31 +69,32 @@ LIBOBJ = linked-list hash-table fd-table mds-message util all: bin/mds bin/mds-server/mds-server bin/libmdsserver.so -MDS_SERVER_OBJ = mds-server/mds-server mds-server/interception_condition mds-server/client +MDS_SERVER_OBJ = mds-server/mds-server mds-server/interception_condition mds-server/client \ + mds-server/multicast mds-server/queued_interception bin/mds-server/mds-server: $(foreach O,$(MDS_SERVER_OBJ),obj/$(O).o) bin/libmdsserver.so mkdir -p $(shell dirname $@) - gcc $(C_FLAGS) -o $@ -Lbin -lmdsserver -lrt $(foreach O,$(MDS_SERVER_OBJ),obj/$(O).o) + $(CC) $(C_FLAGS) -o $@ -Lbin -lmdsserver -lrt $(foreach O,$(MDS_SERVER_OBJ),obj/$(O).o) bin/%: obj/%.o bin/libmdsserver.so mkdir -p $(shell dirname $@) - gcc $(C_FLAGS) -o $@ -Lbin -lmdsserver -lrt obj/$*.o + $(CC) $(C_FLAGS) -o $@ -Lbin -lmdsserver -lrt obj/$*.o obj/mds-server/%.o: src/mds-server/%.c src/mds-server/*.h src/libmdsserver/*.h mkdir -p $(shell dirname $@) - gcc $(C_FLAGS) -Isrc -c -o $@ $< + $(CC) $(C_FLAGS) -Isrc -c -o $@ $< obj/%.o: src/%.c src/*.h src/libmdsserver/*.h mkdir -p $(shell dirname $@) - gcc $(C_FLAGS) -Isrc -c -o $@ $< + $(CC) $(C_FLAGS) -Isrc -c -o $@ $< bin/libmdsserver.so: $(foreach O,$(LIBOBJ),obj/libmdsserver/$(O).o) mkdir -p $(shell dirname $@) - gcc $(C_FLAGS) -shared -o $@ $^ + $(CC) $(C_FLAGS) -shared -o $@ $^ obj/libmdsserver/%.o: src/libmdsserver/%.c src/libmdsserver/*.h mkdir -p $(shell dirname $@) - gcc $(C_FLAGS) -fPIC -c -o $@ $< + $(CC) $(C_FLAGS) -fPIC -c -o $@ $< diff --git a/src/libmdsserver/macros.h b/src/libmdsserver/macros.h index 32024a3..d70afc9 100644 --- a/src/libmdsserver/macros.h +++ b/src/libmdsserver/macros.h @@ -73,9 +73,9 @@ * @param instructions The instructions to run while the mutex is locked */ #define with_mutex(mutex, instructions) \ - pthread_mutex_lock(&(mutex)); \ + errno = pthread_mutex_lock(&(mutex)); \ instructions \ - pthread_mutex_unlock(&(mutex)) + errno = pthread_mutex_unlock(&(mutex)) /** diff --git a/src/mds-server/client.c b/src/mds-server/client.c index 0e8e0b5..0a8b107 100644 --- a/src/mds-server/client.c +++ b/src/mds-server/client.c @@ -17,6 +17,8 @@ */ #include "client.h" +#include "multicast.h" + #include <libmdsserver/macros.h> #include <stdlib.h> @@ -42,6 +44,13 @@ void client_destroy(client_t* restrict this) if (this->mutex_created) pthread_mutex_destroy(&(this->mutex)); mds_message_destroy(&(this->message)); + if (this->multicasts != NULL) + { + size_t i; + for (i = 0; i < this->multicasts_count; i++) + multicast_destroy(this->multicasts + i); + free(this->multicasts); + } free(this->send_pending); free(this); } @@ -55,12 +64,14 @@ void client_destroy(client_t* restrict this) */ size_t client_marshal_size(const client_t* restrict this) { - size_t n = sizeof(ssize_t) + 2 * sizeof(int) + sizeof(uint64_t) + 3 * sizeof(size_t); + size_t n = sizeof(ssize_t) + 2 * sizeof(int) + sizeof(uint64_t) + 4 * sizeof(size_t); size_t i; n += mds_message_marshal_size(&(this->message), 1); for (i = 0; i < this->interception_conditions_count; i++) n += interception_condition_marshal_size(this->interception_conditions + i); + for (i = 0; i < this->multicasts_count; i++) + n += multicast_marshal_size(this->multicasts + i); n += this->send_pending_size * sizeof(char); return n; @@ -88,6 +99,9 @@ size_t client_marshal(const client_t* restrict this, char* restrict data) buf_set_next(data, size_t, this->interception_conditions_count); for (i = 0; i < this->interception_conditions_count; i++) data += interception_condition_marshal(this->interception_conditions + i, data) / sizeof(char); + buf_set_next(data, size_t, this->multicasts_count); + for (i = 0; i < this->multicasts_count; i++) + data += multicast_marshal(this->multicasts + i, data) / sizeof(char); buf_set_next(data, size_t, this->send_pending_size); if (this->send_pending_size > 0) memcpy(data, this->send_pending, this->send_pending_size * sizeof(char)); @@ -104,10 +118,12 @@ size_t client_marshal(const client_t* restrict this, char* restrict data) */ size_t client_unmarshal(client_t* restrict this, char* restrict data) { - size_t i, n, rc = sizeof(ssize_t) + 2 * sizeof(int) + sizeof(uint64_t) + 3 * sizeof(size_t); + size_t i, n, rc = sizeof(ssize_t) + 2 * sizeof(int) + sizeof(uint64_t) + 4 * sizeof(size_t); this->interception_conditions = NULL; + this->multicasts = NULL; this->send_pending = NULL; this->mutex_created = 0; + this->multicasts_count = 0; buf_get_next(data, ssize_t, this->list_entry); buf_get_next(data, int, this->socket_fd); buf_get_next(data, int, this->open); @@ -131,6 +147,16 @@ size_t client_unmarshal(client_t* restrict this, char* restrict data) data += n / sizeof(char); rc += n; } + buf_get_next(data, size_t, n); + if (xmalloc(this->multicasts, n, sizeof(multicast_t))) + goto fail; + for (i = 0; i < n; i++, this->multicasts_count++) + { + size_t m = multicast_unmarshal(this->multicasts + i, data); + if (m == 0) + goto fail; + data += m / sizeof(char); + } buf_get_next(data, size_t, this->send_pending_size); if (this->send_pending_size > 0) { @@ -145,6 +171,9 @@ size_t client_unmarshal(client_t* restrict this, char* restrict data) for (i = 0; i < this->interception_conditions_count; i++) free(this->interception_conditions[i].condition); free(this->interception_conditions); + for (i = 0; i < this->multicasts_count; i++) + multicast_destroy(this->multicasts + i); + free(this->multicasts); free(this->send_pending); return 0; } @@ -167,6 +196,13 @@ size_t client_unmarshal_skip(char* restrict data) buf_get_next(data, size_t, c); while (c--) { + n = multicast_unmarshal_skip(data); + data += n / sizeof(char); + rc += n; + } + buf_get_next(data, size_t, c); + while (c--) + { n = interception_condition_unmarshal_skip(data); data += n / sizeof(char); rc += n; diff --git a/src/mds-server/client.h b/src/mds-server/client.h index 6b98ade..da5d14f 100644 --- a/src/mds-server/client.h +++ b/src/mds-server/client.h @@ -20,6 +20,7 @@ #include "interception_condition.h" +#include "multicast.h" #include <libmdsserver/mds-message.h> @@ -52,7 +53,7 @@ typedef struct client /** * Message read buffer for the client */ - mds_message_t message; + struct mds_message message; /** * The read thread for the client @@ -79,7 +80,7 @@ typedef struct client * The messages interception conditions conditions * for the client */ - interception_condition_t* interception_conditions; + struct interception_condition* interception_conditions; /** * The number of interception conditions @@ -87,6 +88,16 @@ typedef struct client size_t interception_conditions_count; /** + * Pending multicast messages + */ + struct multicast* multicasts; + + /** + * The number of pending multicast messages + */ + size_t multicasts_count; + + /** * Messages pending to be sent (concatenated) */ char* send_pending; diff --git a/src/mds-server/mds-server.c b/src/mds-server/mds-server.c index 00f8ceb..bc3b20f 100644 --- a/src/mds-server/mds-server.c +++ b/src/mds-server/mds-server.c @@ -16,9 +16,11 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. */ #include "mds-server.h" + #include "interception_condition.h" #include "client.h" #include "queued_interception.h" +#include "multicast.h" #include <libmdsserver/config.h> #include <libmdsserver/linked-list.h> @@ -108,6 +110,11 @@ static linked_list_t client_list; */ static uint64_t next_id = 1; +/** + * The next free ID for a message modifications + */ +static uint64_t next_modify_id = 1; + /** @@ -457,12 +464,13 @@ void* slave_loop(void* data) /* Add client to table. */ tmp = fd_table_put(&client_map, socket_fd, (size_t)(void*)information); - pthread_mutex_unlock(&slave_mutex); if ((tmp == 0) && errno) { perror(*argv); + pthread_mutex_unlock(&slave_mutex); goto fail; } + pthread_mutex_unlock(&slave_mutex); /* Fill information table. */ information->list_entry = entry; @@ -503,6 +511,27 @@ void* slave_loop(void* data) if (information->open) while (reexecing == 0) { + /* Send queued multicast messages. */ + if (information->multicasts_count > 0) + { + multicast_t multicast; + with_mutex(information->mutex, + if (information->multicasts_count > 0) + { + size_t c = information->multicasts_count -= 1; + multicast = information->multicasts[0]; + memmove(information->multicasts, information->multicasts + 1, c); + if (c == 0) + { + free(information->multicasts); + information->multicasts = NULL; + } + } + ); + multicast_message(&multicast); + multicast_destroy(&multicast); + } + /* Send queued messages. */ if (information->send_pending_size > 0) { @@ -572,7 +601,7 @@ void* slave_loop(void* data) (uint32_t)(information->id >> 32), (uint32_t)(information->id >> 0)); n = strlen(msgbuf) + 1; - multicast_message(msgbuf, n); + queue_message_multicast(msgbuf, n, information); fail: /* The loop does break, this done on success as well. */ @@ -733,13 +762,14 @@ void message_received(client_t* client) return; } mds_message_marshal(&message, msgbuf, 0); - multicast_message(msgbuf, n / sizeof(char)); /* TODO support re-exec */ - free(msgbuf); + queue_message_multicast(msgbuf, n / sizeof(char), client); /* Send asigned ID. */ if (assign_id) { + char* msgbuf_; + /* Construct response. */ n = 2 * 10 + strlen(message_id) + 1; n += strlen("ID assignment: :\nIn response to: \n\n"); @@ -758,7 +788,14 @@ void message_received(client_t* client) n = strlen(msgbuf); /* Multicast the reply. */ - multicast_message(msgbuf, n); /* TODO support re-exec */ + msgbuf_ = strdup(msgbuf); + if (msgbuf_ == NULL) + { + perror(*argv); + free(msgbuf); + return; + } + queue_message_multicast(msgbuf, n, client); /* Queue message to be sent when this function returns. This done to simplify `multicast_message` for re-exec. */ @@ -925,12 +962,13 @@ static int cmp_queued_interception(const void* a, const void* b) /** - * Multicast a message + * Queue a message for multicasting * * @param message The message * @param length The length of the message + * @param sender The original sender of the message */ -void multicast_message(char* message, size_t length) +void queue_message_multicast(char* message, size_t length, client_t* sender) { size_t header_count = 0; size_t n = length - 1; @@ -939,8 +977,12 @@ void multicast_message(char* message, size_t length) char** header_values = NULL; queued_interception_t* interceptions = NULL; size_t interceptions_count = 0; + multicast_t* multicast = NULL; size_t i; ssize_t node; + uint64_t modify_id; + char modify_id_header[13 + 3 * sizeof(uint64_t)]; + char* old_buf; /* Count the number of headers. */ for (i = 0; i < n; i++) @@ -954,6 +996,10 @@ void multicast_message(char* message, size_t length) if (header_count == 0) return; /* Invalid message. */ + /* Allocate multicast message. */ + if (xmalloc(multicast, 1, sizeof(multicast_t))) goto fail; + multicast_initialise(multicast); + /* Allocate header lists. */ if (xmalloc(hashes, header_count, size_t)) goto fail; if (xmalloc(headers, header_count, char*)) goto fail; @@ -1008,7 +1054,6 @@ void multicast_message(char* message, size_t length) /* Look for a matching condition. */ n = client->interception_conditions_count; - errno = 0; if (client->open) { with_mutex(mutex, @@ -1054,48 +1099,127 @@ void multicast_message(char* message, size_t length) /* Sort interceptors. */ qsort(interceptions, interceptions_count, sizeof(queued_interception_t), cmp_queued_interception); - /* Send message to interceptors. */ - for (i = 0; i < interceptions_count; i++) + /* Add prefix to message with ‘Modify ID’ header. */ + with_mutex(slave_mutex, + modify_id = next_modify_id++; + if (next_modify_id == 0) + next_modify_id = 1; + ); + xsnprintf(modify_id_header, "Modify ID: %" PRIu64 "\n", modify_id); + n = strlen(modify_id_header); + old_buf = message; + if ((message = realloc(message, (n + length) * sizeof(char))) == NULL) { - queued_interception_t client_ = interceptions[i]; + message = old_buf; + goto fail; + } + memmove(message + n, message, (n + length) * sizeof(char)); + memcpy(message, modify_id_header, n * sizeof(char)); + + /* Store information. */ + multicast->interceptions = interceptions; + multicast->interceptions_count = interceptions_count; + multicast->message = message; + multicast->message_length = length; + multicast->message_prefix = n; + message = NULL; + + /* Queue message multicasting. */ + with_mutex(sender->mutex, + if (sender->multicasts_count == 1) + { + if (xmalloc(sender->multicasts, 1, multicast_t)) + goto fail_queue; + } + else + { + multicast_t* new_buf; + new_buf = realloc(sender->multicasts, (sender->multicasts_count + 1) * sizeof(multicast_t)); + if (new_buf == NULL) + goto fail_queue; + sender->multicasts = new_buf; + } + sender->multicasts[sender->multicasts_count++] = *multicast; + multicast = NULL; + fail_queue: + ); + + errno = 0; + fail: /* This is done before this function returns even if there was no error. */ + if (errno != 0) + perror(*argv); + /* Release resources. */ + xfree(headers, header_count); + xfree(header_values, header_count); + free(hashes); + free(message); + if (multicast != NULL) + multicast_destroy(multicast); + free(multicast); +} + + +/** + * Multicast a message + * + * @param multicast The multicast message + */ +void multicast_message(multicast_t* multicast) +{ + for (; multicast->interceptions_ptr < multicast->interceptions_count; multicast->interceptions_ptr++) + { + queued_interception_t client_ = multicast->interceptions[multicast->interceptions_ptr]; client_t* client = client_.client; - char* msg = message; + char* msg = multicast->message + multicast->message_ptr; + size_t n = multicast->message_length - multicast->message_ptr; size_t sent; - n = length; + + /* After unmarshalling at re-exec, client will be NULL and must be mapped from its socket. */ + if (client == NULL) + { + size_t address; + with_mutex(slave_mutex, address = fd_table_get(&client_map, client_.socket_fd);); + client_.client = client = (client_t*)(void*)address; + } + + /* Skip Modify ID header if the interceptors will not perform a modification. */ + if ((client_.modifying == 0) && (multicast->message_ptr == 0)) + { + n -= multicast->message_prefix; + multicast->message_ptr += multicast->message_prefix; + } /* Send the message. */ with_mutex(client->mutex, + errno = 0; if (client->open) while (n > 0) { - sent = send_message(client->socket_fd, msg, n); - if ((sent < n) && (errno != EINTR)) /* TODO Support reexec */ + sent = send_message(client->socket_fd, msg + multicast->message_ptr, n); + if (sent < n) { - perror(*argv); + if (errno != EINTR) + perror(*argv); break; } n -= sent; - msg += sent; + multicast->message_ptr += sent; } ); + /* Stop if we are re-exec:ing. */ + if ((n > 0) && (errno == EINTR)) + return; + /* Wait for a reply. */ - if ((n > 0) && client_.modifying) + if ((n == 0) && client_.modifying) { /* TODO */ } + + /* Reset how much of the message has been sent before we continue with next recipient. */ + multicast->message_ptr = 0; } - - - errno = 0; - fail: /* This is done before this function returns even if there was no error. */ - if (errno != 0) - perror(*argv); - /* Release resources. */ - xfree(headers, header_count); - xfree(header_values, header_count); - free(interceptions); - free(hashes); } @@ -1226,7 +1350,7 @@ int marshal_server(int fd) } /* Add the size of the rest of the program's state. */ - state_n += sizeof(int) + sizeof(sig_atomic_t) + sizeof(uint64_t) + 2 * sizeof(size_t); + state_n += sizeof(int) + sizeof(sig_atomic_t) + 2 * sizeof(uint64_t) + 2 * sizeof(size_t); state_n += list_elements * sizeof(size_t) + list_size + map_size; /* Allocate a buffer for all data except the client list and the client map. */ @@ -1241,6 +1365,7 @@ int marshal_server(int fd) /* Marshal the miscellaneous state data. */ buf_set_next(state_buf_, sig_atomic_t, running); buf_set_next(state_buf_, uint64_t, next_id); + buf_set_next(state_buf_, uint64_t, next_modify_id); /* Tell the program how large the marshalled client list is and how any clients are marshalled. */ buf_set_next(state_buf_, size_t, list_size); @@ -1338,6 +1463,7 @@ int unmarshal_server(int fd) /* Unmarshal the miscellaneous state data. */ buf_get_next(state_buf_, sig_atomic_t, running); buf_get_next(state_buf_, uint64_t, next_id); + buf_get_next(state_buf_, uint64_t, next_modify_id); /* Get the marshalled size of the client list and how any clients that are marshalled. */ buf_get_next(state_buf_, size_t, list_size); diff --git a/src/mds-server/mds-server.h b/src/mds-server/mds-server.h index b2dafd4..4dd3c4a 100644 --- a/src/mds-server/mds-server.h +++ b/src/mds-server/mds-server.h @@ -20,6 +20,7 @@ #include "client.h" +#include "multicast.h" #include <stddef.h> @@ -53,12 +54,20 @@ void message_received(client_t* client); void add_intercept_condition(client_t* client, char* condition, int64_t priority, int modifying, int stop); /** - * Multicast a message + * Queue a message for multicasting * * @param message The message * @param length The length of the message + * @param sender The original sender of the message + */ +void queue_message_multicast(char* message, size_t length, client_t* sender); + +/** + * Multicast a message + * + * @param multicast The multicast message */ -void multicast_message(char* message, size_t length); +void multicast_message(multicast_t* multicast); /** * Exec into the mdsinitrc script diff --git a/src/mds-server/multicast.c b/src/mds-server/multicast.c new file mode 100644 index 0000000..9be0e8d --- /dev/null +++ b/src/mds-server/multicast.c @@ -0,0 +1,156 @@ +/** + * mds — A micro-display server + * Copyright © 2014 Mattias Andrée (maandree@member.fsf.org) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +#include "multicast.h" + +#include "interception_condition.h" + +#include <libmdsserver/macros.h> + +#include <stdlib.h> +#include <string.h> + + +/** + * Initialise a message multicast state + * + * @param this The message multicast state + */ +void multicast_initialise(multicast_t* restrict this) +{ + this->interceptions = NULL; + this->interceptions_count = 0; + this->interceptions_ptr = 0; + this->message = NULL; + this->message_length = 0; + this->message_ptr = 0; + this->message_prefix = 0; +} + + +/** + * Destroy a message multicast state + * + * @param this The message multicast state + */ +void multicast_destroy(multicast_t* restrict this) +{ + free(this->interceptions); + free(this->message); +} + + +/** + * Calculate the buffer size need to marshal a message multicast state + * + * @param this The client information + * @return The number of bytes to allocate to the output buffer + */ +size_t multicast_marshal_size(const multicast_t* restrict this) +{ + size_t rc = 5 * sizeof(size_t) + this->message_length * sizeof(char); + size_t i; + for (i = 0; i < this->interceptions_count; i++) + rc += queued_interception_marshal_size(); + return rc; +} + + +/** + * Marshals a message multicast state + * + * @param this The message multicast state + * @param data Output buffer for the marshalled data + * @return The number of bytes that have been written (everything will be written) + */ +size_t multicast_marshal(const multicast_t* restrict this, char* restrict data) +{ + size_t rc = 5 * sizeof(size_t); + size_t i, n; + buf_set_next(data, size_t, this->interceptions_count); + buf_set_next(data, size_t, this->interceptions_ptr); + buf_set_next(data, size_t, this->message_length); + buf_set_next(data, size_t, this->message_ptr); + buf_set_next(data, size_t, this->message_prefix); + for (i = 0; i < this->interceptions_count; i++) + { + n = queued_interception_marshal(this->interceptions + i, data); + data += n / sizeof(char); + rc += n; + } + memcpy(data, this->message, this->message_length * sizeof(char)); + rc += this->message_length * sizeof(char); + return rc; +} + + +/** + * Unmarshals a message multicast state + * + * @param this Memory slot in which to store the new message multicast state + * @param data In buffer with the marshalled data + * @return Zero on error, errno will be set accordingly, otherwise the number of read bytes. + * Destroy the client information on error. + */ +size_t multicast_unmarshal(multicast_t* restrict this, char* restrict data) +{ + size_t rc = 5 * sizeof(size_t); + size_t i, n; + this->interceptions = NULL; + this->message = NULL; + buf_get_next(data, size_t, this->interceptions_count); + buf_get_next(data, size_t, this->interceptions_ptr); + buf_get_next(data, size_t, this->message_length); + buf_get_next(data, size_t, this->message_ptr); + buf_get_next(data, size_t, this->message_prefix); + if (xmalloc(this->interceptions, this->interceptions_count, queued_interception_t)) + return 0; + for (i = 0; i < this->interceptions_count; i++) + { + n = queued_interception_unmarshal(this->interceptions + i, data); + data += n / sizeof(char); + rc += n; + } + if (xmalloc(this->message, this->message_length, char)) + return 0; + memcpy(this->message, data, this->message_length * sizeof(char)); + rc += this->message_length * sizeof(char); + return rc; +} + + +/** + * Pretend to unmarshal a message multicast state + * + * @param data In buffer with the marshalled data + * @return The number of read bytes + */ +size_t multicast_unmarshal_skip(char* restrict data) +{ + size_t interceptions_count = buf_cast(data, size_t, 0); + size_t message_length = buf_cast(data, size_t, 2); + size_t rc = 5 * sizeof(size_t) + message_length * sizeof(char); + size_t n; + while (interceptions_count--) + { + n = queued_interception_unmarshal_skip(); + data += n / sizeof(char); + rc += n; + } + return rc; +} + diff --git a/src/mds-server/multicast.h b/src/mds-server/multicast.h new file mode 100644 index 0000000..3ea1e07 --- /dev/null +++ b/src/mds-server/multicast.h @@ -0,0 +1,120 @@ +/** + * mds — A micro-display server + * Copyright © 2014 Mattias Andrée (maandree@member.fsf.org) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +#ifndef MDS_MDS_SERVER_MULTICAST_H +#define MDS_MDS_SERVER_MULTICAST_H + + +#include "queued_interception.h" + + +/** + * Message multicast state + */ +typedef struct multicast +{ + /** + * Queue of clients that is listening this message + */ + struct queued_interception* interceptions; + + /** + * The number of clients in `interceptions` + */ + size_t interceptions_count; + + /** + * The index of the current/next client in `interceptions` to whom to send the message + */ + size_t interceptions_ptr; + + /** + * The message to send + */ + char* message; + + /** + * The length of `message` + */ + size_t message_length; + + /** + * How much of the message that has already been sent to the current recipient + */ + size_t message_ptr; + + /** + * How much of the message to skip if the recipient is not a modifier + */ + size_t message_prefix; + +} multicast_t; + + +/** + * Initialise a message multicast state + * + * @param this The message multicast state + */ +void multicast_initialise(multicast_t* restrict this); + +/** + * Destroy a message multicast state + * + * @param this The message multicast state + */ +void multicast_destroy(multicast_t* restrict this); + +/** + * Calculate the buffer size need to marshal a message multicast state + * + * @param this The client information + * @return The number of bytes to allocate to the output buffer + */ +size_t multicast_marshal_size(const multicast_t* restrict this) __attribute__((pure)); + +/** + * Marshals a message multicast state + * + * @param this The message multicast state + * @param data Output buffer for the marshalled data + * @return The number of bytes that have been written (everything will be written) + */ +size_t multicast_marshal(const multicast_t* restrict this, char* restrict data); + +/** + * Unmarshals a message multicast state + * + * @param this Memory slot in which to store the new message multicast state + * @param data In buffer with the marshalled data + * @return Zero on error, errno will be set accordingly, otherwise the number of read bytes. + * Destroy the message multicast state on error. + */ +size_t multicast_unmarshal(multicast_t* restrict this, char* restrict data); + +/** + * Pretend to unmarshal a message multicast state + * + * @param data In buffer with the marshalled data + * @return The number of read bytes + */ +size_t multicast_unmarshal_skip(char* restrict data) __attribute__((pure)); + + + +#endif + diff --git a/src/mds-server/queued_interception.c b/src/mds-server/queued_interception.c new file mode 100644 index 0000000..56d58c3 --- /dev/null +++ b/src/mds-server/queued_interception.c @@ -0,0 +1,78 @@ +/** + * mds — A micro-display server + * Copyright © 2014 Mattias Andrée (maandree@member.fsf.org) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +#include "queued_interception.h" + +#include <libmdsserver/macros.h> + + +/** + * Calculate the buffer size need to marshal a queued interception + * + * @param this The client information + * @return The number of bytes to allocate to the output buffer + */ +size_t queued_interception_marshal_size(void) +{ + return sizeof(int64_t) + 2 * sizeof(int); +} + + +/** + * Marshals a queued interception + * + * @param this The queued interception + * @param data Output buffer for the marshalled data + * @return The number of bytes that have been written (everything will be written) + */ +size_t queued_interception_marshal(const queued_interception_t* restrict this, char* restrict data) +{ + buf_set_next(data, int64_t, this->priority); + buf_set_next(data, int, this->modifying); + buf_set_next(data, int, this->client->socket_fd); + return queued_interception_marshal_size(); +} + + +/** + * Unmarshals a queued interception + * + * @param this Memory slot in which to store the new queued interception + * @param data In buffer with the marshalled data + * @return Zero on error, errno will be set accordingly, otherwise the number of read bytes. + */ +size_t queued_interception_unmarshal(queued_interception_t* restrict this, char* restrict data) +{ + this->client = NULL; + buf_get_next(data, int64_t, this->priority); + buf_get_next(data, int, this->modifying); + buf_get_next(data, int, this->socket_fd); + return queued_interception_marshal_size(); +} + + +/** + * Pretend to unmarshal a queued interception + * + * @param data In buffer with the marshalled data + * @return The number of read bytes + */ +size_t queued_interception_unmarshal_skip(void) +{ + return queued_interception_marshal_size(); +} + diff --git a/src/mds-server/queued_interception.h b/src/mds-server/queued_interception.h index 6f08a8a..ddab418 100644 --- a/src/mds-server/queued_interception.h +++ b/src/mds-server/queued_interception.h @@ -32,7 +32,7 @@ typedef struct queued_interception /** * The intercepting client */ - client_t* client; + struct client* client; /** * The interception priority @@ -44,8 +44,48 @@ typedef struct queued_interception */ int modifying; + /** + * The file descriptor of the intercepting client's socket (used for unmarshalling) + */ + int socket_fd; + } queued_interception_t; +/** + * Calculate the buffer size need to marshal a queued interception + * + * @param this The client information + * @return The number of bytes to allocate to the output buffer + */ +size_t queued_interception_marshal_size(void) __attribute__((const)); + +/** + * Marshals a queued interception + * + * @param this The queued interception + * @param data Output buffer for the marshalled data + * @return The number of bytes that have been written (everything will be written) + */ +size_t queued_interception_marshal(const queued_interception_t* restrict this, char* restrict data); + +/** + * Unmarshals a queued interception + * + * @param this Memory slot in which to store the new queued interception + * @param data In buffer with the marshalled data + * @return Zero on error, errno will be set accordingly, otherwise the number of read bytes. + */ +size_t queued_interception_unmarshal(queued_interception_t* restrict this, char* restrict data); + +/** + * Pretend to unmarshal a queued interception + * + * @param data In buffer with the marshalled data + * @return The number of read bytes + */ +size_t queued_interception_unmarshal_skip(void) __attribute__((const)); + + #endif |