aboutsummaryrefslogtreecommitdiffstats
path: root/src/mds-server
diff options
context:
space:
mode:
Diffstat (limited to 'src/mds-server')
-rw-r--r--src/mds-server/mds-server.c219
1 files changed, 109 insertions, 110 deletions
diff --git a/src/mds-server/mds-server.c b/src/mds-server/mds-server.c
index de60228..96bee5b 100644
--- a/src/mds-server/mds-server.c
+++ b/src/mds-server/mds-server.c
@@ -584,6 +584,7 @@ void* slave_loop(void* data)
if (information->multicasts_count > 0)
{
size_t c = information->multicasts_count -= 1;
+ c *= sizeof(multicast_t);
multicast = information->multicasts[0];
memmove(information->multicasts, information->multicasts + 1, c);
if (c == 0)
@@ -756,50 +757,50 @@ int message_received(client_t* client)
client_t* recipient;
mds_message_t* multicast;
- with_mutex(modify_mutex,
- while (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0)
- {
- if (reexecing)
- {
- pthread_mutex_unlock(&(modify_mutex));
- return 1;
- }
- pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout);
- }
- address = hash_table_get(&modify_map, (size_t)modify_id);
- recipient = (client_t*)(void*)address;
- if (xmalloc(multicast = recipient->modify_message, 1, mds_message_t))
- goto fail;
- multicast->headers = NULL;
- multicast->header_count = 0;
- multicast->payload = NULL;
- multicast->payload_size = 0;
- multicast->payload_ptr = 0;
- multicast->buffer = NULL;
- multicast->buffer_size = 0;
- multicast->buffer_ptr = 0;
- multicast->stage = 0;
- if (xmalloc(multicast->payload, message.payload_size, char))
- goto fail;
- memcpy(multicast->payload, message.payload, message.payload_size * sizeof(char));
- if (xmalloc(multicast->headers, message.header_count, char*))
- goto fail;
- for (i = 0; i < message.header_count; i++, multicast->header_count++)
- {
- multicast->headers[i] = strdup(message.headers[i]);
- if (multicast->headers[i] == NULL)
- goto fail;
- }
- goto done;
- fail:
- if (multicast != NULL)
- {
- mds_message_destroy(multicast);
- free(multicast);
- recipient->modify_message = NULL;
- }
- done:
- );
+ pthread_mutex_lock(&(modify_mutex));
+ while (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0)
+ {
+ if (reexecing)
+ {
+ pthread_mutex_unlock(&(modify_mutex));
+ return 1;
+ }
+ pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout);
+ }
+ address = hash_table_get(&modify_map, (size_t)modify_id);
+ recipient = (client_t*)(void*)address;
+ if (xmalloc(multicast = recipient->modify_message, 1, mds_message_t))
+ goto fail;
+ multicast->headers = NULL;
+ multicast->header_count = 0;
+ multicast->payload = NULL;
+ multicast->payload_size = 0;
+ multicast->payload_ptr = 0;
+ multicast->buffer = NULL;
+ multicast->buffer_size = 0;
+ multicast->buffer_ptr = 0;
+ multicast->stage = 0;
+ if (xmalloc(multicast->payload, message.payload_size, char))
+ goto fail;
+ memcpy(multicast->payload, message.payload, message.payload_size * sizeof(char));
+ if (xmalloc(multicast->headers, message.header_count, char*))
+ goto fail;
+ for (i = 0; i < message.header_count; i++, multicast->header_count++)
+ {
+ multicast->headers[i] = strdup(message.headers[i]);
+ if (multicast->headers[i] == NULL)
+ goto fail;
+ }
+ goto done;
+ fail:
+ if (multicast != NULL)
+ {
+ mds_message_destroy(multicast);
+ free(multicast);
+ recipient->modify_message = NULL;
+ }
+ done:
+ pthread_mutex_unlock(&(modify_mutex));
with_mutex(client->modify_mutex, pthread_cond_signal(&(client->modify_cond)););
/* Do nothing more, not not even multicast this message. */
@@ -1176,69 +1177,66 @@ void queue_message_multicast(char* message, size_t length, client_t* sender)
}
/* Get intercepting clients. */
- with_mutex(slave_mutex,
- /* Count clients. */
- n = 0;
- foreach_linked_list_node (client_list, node)
- n++;
-
- /* Allocate interceptor list. */
- interceptions = malloc(n * sizeof(queued_interception_t*));
-
- /* Search clients. */
- if (interceptions != NULL)
- foreach_linked_list_node (client_list, node)
- {
- client_t* client = (client_t*)(void*)(client_list.values[node]);
- pthread_mutex_t mutex = client->mutex;
- interception_condition_t* conds = client->interception_conditions;
- int64_t priority = 0; /* Initialise to stop incorrect warning. */
- int modifying = 0; /* Initialise to stop incorrect warning. */
- size_t j;
-
- /* Look for a matching condition. */
- n = client->interception_conditions_count;
- if (client->open && (client != sender))
- {
- with_mutex(mutex,
- if (errno || (client->open == 0))
- n = 0;
- for (i = 0; i < n; i++)
- {
- interception_condition_t* cond = conds + i;
- for (j = 0; j < header_count; j++)
- {
- if (*(cond->condition) == '\0')
- break;
- if (cond->header_hash == hashes[j])
- if (strequals(cond->condition, headers[j]) ||
- strequals(cond->condition, header_values[j]))
- break;
- }
- if (j < header_count)
- {
- priority = cond->priority;
- modifying = cond->modifying;
- break;
- }
- }
- );
- }
- else
- n = 0;
-
- /* List client of there was a matching condition. */
- if (i < n)
- {
- interceptions[interceptions_count].client = client;
- interceptions[interceptions_count].priority = priority;
- interceptions[interceptions_count].modifying = modifying;
- interceptions_count++;
- }
- }
- );
- if (interceptions == NULL)
- goto fail;
+ pthread_mutex_lock(&(slave_mutex));
+ /* Count clients. */
+ n = 0;
+ foreach_linked_list_node (client_list, node)
+ n++;
+
+ /* Allocate interceptor list. */
+ if (xmalloc(interceptions, n, queued_interception_t)) goto fail;
+
+ /* Search clients. */
+ foreach_linked_list_node (client_list, node)
+ {
+ client_t* client = (client_t*)(void*)(client_list.values[node]);
+ pthread_mutex_t mutex = client->mutex;
+ interception_condition_t* conds = client->interception_conditions;
+ int64_t priority = 0; /* Initialise to stop incorrect warning. */
+ int modifying = 0; /* Initialise to stop incorrect warning. */
+ size_t j;
+
+ /* Look for a matching condition. */
+ n = client->interception_conditions_count;
+ if (client->open && (client != sender))
+ {
+ pthread_mutex_lock(&(mutex));
+ if (errno || (client->open == 0))
+ n = 0;
+ for (i = 0; i < n; i++)
+ {
+ interception_condition_t* cond = conds + i;
+ for (j = 0; j < header_count; j++)
+ {
+ if (*(cond->condition) == '\0')
+ break;
+ if (cond->header_hash == hashes[j])
+ if (strequals(cond->condition, headers[j]) ||
+ strequals(cond->condition, header_values[j]))
+ break;
+ }
+ if (j < header_count)
+ {
+ priority = cond->priority;
+ modifying = cond->modifying;
+ break;
+ }
+ }
+ pthread_mutex_unlock(&(mutex));
+ }
+ else
+ n = 0;
+
+ /* List client of there was a matching condition. */
+ if (i < n)
+ {
+ interceptions[interceptions_count].client = client;
+ interceptions[interceptions_count].priority = priority;
+ interceptions[interceptions_count].modifying = modifying;
+ interceptions_count++;
+ }
+ }
+ pthread_mutex_unlock(&(slave_mutex));
/* Sort interceptors. */
qsort(interceptions, interceptions_count, sizeof(queued_interception_t), cmp_queued_interception);
@@ -1311,9 +1309,10 @@ void queue_message_multicast(char* message, size_t length, client_t* sender)
void multicast_message(multicast_t* multicast)
{
uint64_t modify_id = 0;
- if (strstr(multicast->message, "Modify ID: ") == multicast->message)
+ size_t n = min(strlen("Modify ID: "), multicast->message_length);
+ if (!strncmp(multicast->message, "Modify ID: ", n))
{
- char* value = multicast->message + strlen("Modify ID: ");
+ char* value = multicast->message + n;
char* lf = strchr(value, '\n');
*lf = '\0';
modify_id = (uint64_t)atoll(value);
@@ -1325,8 +1324,8 @@ void multicast_message(multicast_t* multicast)
queued_interception_t client_ = multicast->interceptions[multicast->interceptions_ptr];
client_t* client = client_.client;
char* msg = multicast->message + multicast->message_ptr;
- size_t n = multicast->message_length - multicast->message_ptr;
size_t sent;
+ n = multicast->message_length - multicast->message_ptr;
/* After unmarshalling at re-exec, client will be NULL and must be mapped from its socket. */
if (client == NULL)