diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mds-server.c | 117 | ||||
-rw-r--r-- | src/mds-server.h | 10 |
2 files changed, 97 insertions, 30 deletions
diff --git a/src/mds-server.c b/src/mds-server.c index 3416777..715b826 100644 --- a/src/mds-server.c +++ b/src/mds-server.c @@ -417,6 +417,7 @@ void* slave_loop(void* data) /* NULL-out pointers. */ information->interception_conditions = NULL; + information->send_pending = NULL; /* Add to list of clients. */ pthread_mutex_lock(&slave_mutex); @@ -443,6 +444,7 @@ void* slave_loop(void* data) information->open = 1; information->id = 0; information->interception_conditions_count = 0; + information->send_pending_size = 0; if (mds_message_initialise(&(information->message))) { perror(*argv); @@ -471,6 +473,32 @@ void* slave_loop(void* data) if (information->open) while (reexecing == 0) { + /* Send queued messages. */ + if (information->send_pending_size > 0) + { + char* sendbuf = information->send_pending; + char* sendbuf_ = sendbuf; + size_t sent; + n = information->send_pending_size; + information->send_pending_size = 0; + information->send_pending = NULL; + with_mutex(information->mutex, + while (n > 0) + { + sent = send_message(information->socket_fd, sendbuf_, n); + if ((sent < n) && (errno != EINTR)) /* Ignore EINTR */ + { + perror(*argv); + break; + } + n -= sent; + sendbuf_ += sent; + } + free(sendbuf); + ); + } + + /* Fetch message.*/ r = mds_message_read(&(information->message), socket_fd); if (r == 0) message_received(information); @@ -533,6 +561,7 @@ void* slave_loop(void* data) if (mutex_created) pthread_mutex_destroy(&(information->mutex)); mds_message_destroy(&(information->message)); + free(information->send_pending); free(information); } @@ -687,15 +716,13 @@ void message_received(client_t* client) return; } mds_message_marshal(&message, msgbuf, 0); - multicast_message(msgbuf, n / sizeof(char)); + multicast_message(msgbuf, n / sizeof(char)); /* TODO support re-exec */ free(msgbuf); /* Send asigned ID. */ if (assign_id) { - size_t sent; - /* Construct response. */ n = 2 * 10 + strlen(message_id) + 1; n += strlen("ID assignment: :\nIn response to: \n\n"); @@ -714,23 +741,33 @@ void message_received(client_t* client) n = strlen(msgbuf); /* Multicast the reply. */ - multicast_message(msgbuf, n); + multicast_message(msgbuf, n); /* TODO support re-exec */ - /* Send message. */ + /* Queue message to be sent when this function returns. + This done to simplify `multicast_message` for re-exec. */ with_mutex(client->mutex, - while (n > 0) + if (client->send_pending_size == 0) + { + /* Set the pending message. */ + client->send_pending = msgbuf; + client->send_pending_size = n; + } + else { - sent = send_message(client->socket_fd, msgbuf, n); - if ((sent < n) && (errno != EINTR)) /* Ignore EINTR */ + /* Concatenate message to already pending messages. */ + size_t new_len = client->send_pending_size + n; + char* msg_new = realloc(client->send_pending, new_len * sizeof(char)); + if (msg_new != NULL) { - perror(*argv); - break; + memcpy(msg_new + client->send_pending_size, msgbuf, n * sizeof(char)); + client->send_pending = msg_new; + client->send_pending_size = new_len; } - n -= sent; - msgbuf += sent; + else + perror(*argv); + free(msgbuf); } ); - free(msgbuf); } } @@ -1001,23 +1038,20 @@ void multicast_message(char* message, size_t length) size_t sent; n = length; - /* Skip if the client has closed. */ - if (client->open == 0) - continue; - /* Send the message. */ with_mutex(client->mutex, - while (n > 0) - { - sent = send_message(client->socket_fd, msg, n); - if ((sent < n) && (errno != EINTR)) /* TODO Support reexec */ - { - perror(*argv); - break; + if (client->open) + while (n > 0) + { + sent = send_message(client->socket_fd, msg, n); + if ((sent < n) && (errno != EINTR)) /* TODO Support reexec */ + { + perror(*argv); + break; } - n -= sent; - msg += sent; - } + n -= sent; + msg += sent; + } ); /* Wait for a reply. */ @@ -1174,13 +1208,14 @@ int marshal_server(int fd) message = value->message; msg_size += mds_message_marshal_size(&message, 1); msg_size += n * (sizeof(size_t) + sizeof(int64_t) + sizeof(int)); + msg_size += value->send_pending_size * sizeof(char); for (j = 0; j < n; j++) msg_size += (strlen(value->interception_conditions[j].condition) + 1) * sizeof(char); } /* Calculate the grand size of all client information. */ - state_n = 5 + sizeof(size_t) + 2 * sizeof(int) + 1 * sizeof(uint64_t); + state_n = 6 + sizeof(size_t) + 2 * sizeof(int) + 1 * sizeof(uint64_t); state_n *= list_elements; state_n += msg_size; @@ -1224,6 +1259,13 @@ int marshal_server(int fd) buf_set_next(state_buf_, int, value->socket_fd); buf_set_next(state_buf_, int, value->open); buf_set_next(state_buf_, uint64_t, value->id); + /* Marshal the pending messages. */ + buf_set_next(state_buf_, size_t, value->send_pending_size); + if (value->send_pending_size > 0) + { + memcpy(state_buf_, value->send_pending, value->send_pending_size * sizeof(char)); + state_buf_ += value->send_pending_size; + } /* Marshal interception conditions. */ buf_set_next(state_buf_, size_t, n = value->interception_conditions_count); for (j = 0; j < n; j++) @@ -1413,10 +1455,24 @@ int unmarshal_server(int fd) buf_get_next(state_buf_, ssize_t, value->list_entry); buf_get_next(state_buf_, int, value->socket_fd); buf_get_next(state_buf_, int, value->open); - buf_set_next(state_buf_, uint64_t, value->id); + buf_get_next(state_buf_, uint64_t, value->id); + /* Unmarshal the pending messages. */ + buf_get_next(state_buf_, size_t, value->send_pending_size); + if (value->send_pending_size > 0) + { + if (xmalloc(value->send_pending, value->send_pending_size, char)) + { + perror(*argv); + goto clients_fail; + } + memcpy(value->send_pending, state_buf_, value->send_pending_size * sizeof(char)); + state_buf_ += value->send_pending_size; + } + else + value->send_pending = NULL; /* Unmarshal interception conditions. */ buf_get_next(state_buf_, size_t, value->interception_conditions_count = n); - seek = 5 * sizeof(size_t) + 2 * sizeof(int) + 1 * sizeof(uint64_t); + seek = 6 * sizeof(size_t) + 2 * sizeof(int) + 1 * sizeof(uint64_t); if (xmalloc(value->interception_conditions, n, interception_condition_t)) { perror(*argv); @@ -1463,6 +1519,7 @@ int unmarshal_server(int fd) free(value->interception_conditions[j].condition); free(value->interception_conditions); } + free(value->send_pending); free(value); } state_buf_ -= seek / sizeof(char); diff --git a/src/mds-server.h b/src/mds-server.h index 6269e9b..1e3869f 100644 --- a/src/mds-server.h +++ b/src/mds-server.h @@ -110,6 +110,16 @@ typedef struct client */ size_t interception_conditions_count; + /** + * Messages pending to be sent (concatenated) + */ + char* send_pending; + + /** + * The character length of the messages pending to be sent + */ + size_t send_pending_size; + } client_t; /** |