aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/mds-server.c117
-rw-r--r--src/mds-server.h10
2 files changed, 97 insertions, 30 deletions
diff --git a/src/mds-server.c b/src/mds-server.c
index 3416777..715b826 100644
--- a/src/mds-server.c
+++ b/src/mds-server.c
@@ -417,6 +417,7 @@ void* slave_loop(void* data)
/* NULL-out pointers. */
information->interception_conditions = NULL;
+ information->send_pending = NULL;
/* Add to list of clients. */
pthread_mutex_lock(&slave_mutex);
@@ -443,6 +444,7 @@ void* slave_loop(void* data)
information->open = 1;
information->id = 0;
information->interception_conditions_count = 0;
+ information->send_pending_size = 0;
if (mds_message_initialise(&(information->message)))
{
perror(*argv);
@@ -471,6 +473,32 @@ void* slave_loop(void* data)
if (information->open)
while (reexecing == 0)
{
+ /* Send queued messages. */
+ if (information->send_pending_size > 0)
+ {
+ char* sendbuf = information->send_pending;
+ char* sendbuf_ = sendbuf;
+ size_t sent;
+ n = information->send_pending_size;
+ information->send_pending_size = 0;
+ information->send_pending = NULL;
+ with_mutex(information->mutex,
+ while (n > 0)
+ {
+ sent = send_message(information->socket_fd, sendbuf_, n);
+ if ((sent < n) && (errno != EINTR)) /* Ignore EINTR */
+ {
+ perror(*argv);
+ break;
+ }
+ n -= sent;
+ sendbuf_ += sent;
+ }
+ free(sendbuf);
+ );
+ }
+
+ /* Fetch message.*/
r = mds_message_read(&(information->message), socket_fd);
if (r == 0)
message_received(information);
@@ -533,6 +561,7 @@ void* slave_loop(void* data)
if (mutex_created)
pthread_mutex_destroy(&(information->mutex));
mds_message_destroy(&(information->message));
+ free(information->send_pending);
free(information);
}
@@ -687,15 +716,13 @@ void message_received(client_t* client)
return;
}
mds_message_marshal(&message, msgbuf, 0);
- multicast_message(msgbuf, n / sizeof(char));
+ multicast_message(msgbuf, n / sizeof(char)); /* TODO support re-exec */
free(msgbuf);
/* Send asigned ID. */
if (assign_id)
{
- size_t sent;
-
/* Construct response. */
n = 2 * 10 + strlen(message_id) + 1;
n += strlen("ID assignment: :\nIn response to: \n\n");
@@ -714,23 +741,33 @@ void message_received(client_t* client)
n = strlen(msgbuf);
/* Multicast the reply. */
- multicast_message(msgbuf, n);
+ multicast_message(msgbuf, n); /* TODO support re-exec */
- /* Send message. */
+ /* Queue message to be sent when this function returns.
+ This done to simplify `multicast_message` for re-exec. */
with_mutex(client->mutex,
- while (n > 0)
+ if (client->send_pending_size == 0)
+ {
+ /* Set the pending message. */
+ client->send_pending = msgbuf;
+ client->send_pending_size = n;
+ }
+ else
{
- sent = send_message(client->socket_fd, msgbuf, n);
- if ((sent < n) && (errno != EINTR)) /* Ignore EINTR */
+ /* Concatenate message to already pending messages. */
+ size_t new_len = client->send_pending_size + n;
+ char* msg_new = realloc(client->send_pending, new_len * sizeof(char));
+ if (msg_new != NULL)
{
- perror(*argv);
- break;
+ memcpy(msg_new + client->send_pending_size, msgbuf, n * sizeof(char));
+ client->send_pending = msg_new;
+ client->send_pending_size = new_len;
}
- n -= sent;
- msgbuf += sent;
+ else
+ perror(*argv);
+ free(msgbuf);
}
);
- free(msgbuf);
}
}
@@ -1001,23 +1038,20 @@ void multicast_message(char* message, size_t length)
size_t sent;
n = length;
- /* Skip if the client has closed. */
- if (client->open == 0)
- continue;
-
/* Send the message. */
with_mutex(client->mutex,
- while (n > 0)
- {
- sent = send_message(client->socket_fd, msg, n);
- if ((sent < n) && (errno != EINTR)) /* TODO Support reexec */
- {
- perror(*argv);
- break;
+ if (client->open)
+ while (n > 0)
+ {
+ sent = send_message(client->socket_fd, msg, n);
+ if ((sent < n) && (errno != EINTR)) /* TODO Support reexec */
+ {
+ perror(*argv);
+ break;
}
- n -= sent;
- msg += sent;
- }
+ n -= sent;
+ msg += sent;
+ }
);
/* Wait for a reply. */
@@ -1174,13 +1208,14 @@ int marshal_server(int fd)
message = value->message;
msg_size += mds_message_marshal_size(&message, 1);
msg_size += n * (sizeof(size_t) + sizeof(int64_t) + sizeof(int));
+ msg_size += value->send_pending_size * sizeof(char);
for (j = 0; j < n; j++)
msg_size += (strlen(value->interception_conditions[j].condition) + 1) * sizeof(char);
}
/* Calculate the grand size of all client information. */
- state_n = 5 + sizeof(size_t) + 2 * sizeof(int) + 1 * sizeof(uint64_t);
+ state_n = 6 + sizeof(size_t) + 2 * sizeof(int) + 1 * sizeof(uint64_t);
state_n *= list_elements;
state_n += msg_size;
@@ -1224,6 +1259,13 @@ int marshal_server(int fd)
buf_set_next(state_buf_, int, value->socket_fd);
buf_set_next(state_buf_, int, value->open);
buf_set_next(state_buf_, uint64_t, value->id);
+ /* Marshal the pending messages. */
+ buf_set_next(state_buf_, size_t, value->send_pending_size);
+ if (value->send_pending_size > 0)
+ {
+ memcpy(state_buf_, value->send_pending, value->send_pending_size * sizeof(char));
+ state_buf_ += value->send_pending_size;
+ }
/* Marshal interception conditions. */
buf_set_next(state_buf_, size_t, n = value->interception_conditions_count);
for (j = 0; j < n; j++)
@@ -1413,10 +1455,24 @@ int unmarshal_server(int fd)
buf_get_next(state_buf_, ssize_t, value->list_entry);
buf_get_next(state_buf_, int, value->socket_fd);
buf_get_next(state_buf_, int, value->open);
- buf_set_next(state_buf_, uint64_t, value->id);
+ buf_get_next(state_buf_, uint64_t, value->id);
+ /* Unmarshal the pending messages. */
+ buf_get_next(state_buf_, size_t, value->send_pending_size);
+ if (value->send_pending_size > 0)
+ {
+ if (xmalloc(value->send_pending, value->send_pending_size, char))
+ {
+ perror(*argv);
+ goto clients_fail;
+ }
+ memcpy(value->send_pending, state_buf_, value->send_pending_size * sizeof(char));
+ state_buf_ += value->send_pending_size;
+ }
+ else
+ value->send_pending = NULL;
/* Unmarshal interception conditions. */
buf_get_next(state_buf_, size_t, value->interception_conditions_count = n);
- seek = 5 * sizeof(size_t) + 2 * sizeof(int) + 1 * sizeof(uint64_t);
+ seek = 6 * sizeof(size_t) + 2 * sizeof(int) + 1 * sizeof(uint64_t);
if (xmalloc(value->interception_conditions, n, interception_condition_t))
{
perror(*argv);
@@ -1463,6 +1519,7 @@ int unmarshal_server(int fd)
free(value->interception_conditions[j].condition);
free(value->interception_conditions);
}
+ free(value->send_pending);
free(value);
}
state_buf_ -= seek / sizeof(char);
diff --git a/src/mds-server.h b/src/mds-server.h
index 6269e9b..1e3869f 100644
--- a/src/mds-server.h
+++ b/src/mds-server.h
@@ -110,6 +110,16 @@ typedef struct client
*/
size_t interception_conditions_count;
+ /**
+ * Messages pending to be sent (concatenated)
+ */
+ char* send_pending;
+
+ /**
+ * The character length of the messages pending to be sent
+ */
+ size_t send_pending_size;
+
} client_t;
/**