diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mds-server/mds-server.c | 219 |
1 files changed, 109 insertions, 110 deletions
diff --git a/src/mds-server/mds-server.c b/src/mds-server/mds-server.c index de60228..96bee5b 100644 --- a/src/mds-server/mds-server.c +++ b/src/mds-server/mds-server.c @@ -584,6 +584,7 @@ void* slave_loop(void* data) if (information->multicasts_count > 0) { size_t c = information->multicasts_count -= 1; + c *= sizeof(multicast_t); multicast = information->multicasts[0]; memmove(information->multicasts, information->multicasts + 1, c); if (c == 0) @@ -756,50 +757,50 @@ int message_received(client_t* client) client_t* recipient; mds_message_t* multicast; - with_mutex(modify_mutex, - while (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0) - { - if (reexecing) - { - pthread_mutex_unlock(&(modify_mutex)); - return 1; - } - pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout); - } - address = hash_table_get(&modify_map, (size_t)modify_id); - recipient = (client_t*)(void*)address; - if (xmalloc(multicast = recipient->modify_message, 1, mds_message_t)) - goto fail; - multicast->headers = NULL; - multicast->header_count = 0; - multicast->payload = NULL; - multicast->payload_size = 0; - multicast->payload_ptr = 0; - multicast->buffer = NULL; - multicast->buffer_size = 0; - multicast->buffer_ptr = 0; - multicast->stage = 0; - if (xmalloc(multicast->payload, message.payload_size, char)) - goto fail; - memcpy(multicast->payload, message.payload, message.payload_size * sizeof(char)); - if (xmalloc(multicast->headers, message.header_count, char*)) - goto fail; - for (i = 0; i < message.header_count; i++, multicast->header_count++) - { - multicast->headers[i] = strdup(message.headers[i]); - if (multicast->headers[i] == NULL) - goto fail; - } - goto done; - fail: - if (multicast != NULL) - { - mds_message_destroy(multicast); - free(multicast); - recipient->modify_message = NULL; - } - done: - ); + pthread_mutex_lock(&(modify_mutex)); + while (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0) + { + if (reexecing) + { + pthread_mutex_unlock(&(modify_mutex)); + return 1; + } + pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout); + } + address = hash_table_get(&modify_map, (size_t)modify_id); + recipient = (client_t*)(void*)address; + if (xmalloc(multicast = recipient->modify_message, 1, mds_message_t)) + goto fail; + multicast->headers = NULL; + multicast->header_count = 0; + multicast->payload = NULL; + multicast->payload_size = 0; + multicast->payload_ptr = 0; + multicast->buffer = NULL; + multicast->buffer_size = 0; + multicast->buffer_ptr = 0; + multicast->stage = 0; + if (xmalloc(multicast->payload, message.payload_size, char)) + goto fail; + memcpy(multicast->payload, message.payload, message.payload_size * sizeof(char)); + if (xmalloc(multicast->headers, message.header_count, char*)) + goto fail; + for (i = 0; i < message.header_count; i++, multicast->header_count++) + { + multicast->headers[i] = strdup(message.headers[i]); + if (multicast->headers[i] == NULL) + goto fail; + } + goto done; + fail: + if (multicast != NULL) + { + mds_message_destroy(multicast); + free(multicast); + recipient->modify_message = NULL; + } + done: + pthread_mutex_unlock(&(modify_mutex)); with_mutex(client->modify_mutex, pthread_cond_signal(&(client->modify_cond));); /* Do nothing more, not not even multicast this message. */ @@ -1176,69 +1177,66 @@ void queue_message_multicast(char* message, size_t length, client_t* sender) } /* Get intercepting clients. */ - with_mutex(slave_mutex, - /* Count clients. */ - n = 0; - foreach_linked_list_node (client_list, node) - n++; - - /* Allocate interceptor list. */ - interceptions = malloc(n * sizeof(queued_interception_t*)); - - /* Search clients. */ - if (interceptions != NULL) - foreach_linked_list_node (client_list, node) - { - client_t* client = (client_t*)(void*)(client_list.values[node]); - pthread_mutex_t mutex = client->mutex; - interception_condition_t* conds = client->interception_conditions; - int64_t priority = 0; /* Initialise to stop incorrect warning. */ - int modifying = 0; /* Initialise to stop incorrect warning. */ - size_t j; - - /* Look for a matching condition. */ - n = client->interception_conditions_count; - if (client->open && (client != sender)) - { - with_mutex(mutex, - if (errno || (client->open == 0)) - n = 0; - for (i = 0; i < n; i++) - { - interception_condition_t* cond = conds + i; - for (j = 0; j < header_count; j++) - { - if (*(cond->condition) == '\0') - break; - if (cond->header_hash == hashes[j]) - if (strequals(cond->condition, headers[j]) || - strequals(cond->condition, header_values[j])) - break; - } - if (j < header_count) - { - priority = cond->priority; - modifying = cond->modifying; - break; - } - } - ); - } - else - n = 0; - - /* List client of there was a matching condition. */ - if (i < n) - { - interceptions[interceptions_count].client = client; - interceptions[interceptions_count].priority = priority; - interceptions[interceptions_count].modifying = modifying; - interceptions_count++; - } - } - ); - if (interceptions == NULL) - goto fail; + pthread_mutex_lock(&(slave_mutex)); + /* Count clients. */ + n = 0; + foreach_linked_list_node (client_list, node) + n++; + + /* Allocate interceptor list. */ + if (xmalloc(interceptions, n, queued_interception_t)) goto fail; + + /* Search clients. */ + foreach_linked_list_node (client_list, node) + { + client_t* client = (client_t*)(void*)(client_list.values[node]); + pthread_mutex_t mutex = client->mutex; + interception_condition_t* conds = client->interception_conditions; + int64_t priority = 0; /* Initialise to stop incorrect warning. */ + int modifying = 0; /* Initialise to stop incorrect warning. */ + size_t j; + + /* Look for a matching condition. */ + n = client->interception_conditions_count; + if (client->open && (client != sender)) + { + pthread_mutex_lock(&(mutex)); + if (errno || (client->open == 0)) + n = 0; + for (i = 0; i < n; i++) + { + interception_condition_t* cond = conds + i; + for (j = 0; j < header_count; j++) + { + if (*(cond->condition) == '\0') + break; + if (cond->header_hash == hashes[j]) + if (strequals(cond->condition, headers[j]) || + strequals(cond->condition, header_values[j])) + break; + } + if (j < header_count) + { + priority = cond->priority; + modifying = cond->modifying; + break; + } + } + pthread_mutex_unlock(&(mutex)); + } + else + n = 0; + + /* List client of there was a matching condition. */ + if (i < n) + { + interceptions[interceptions_count].client = client; + interceptions[interceptions_count].priority = priority; + interceptions[interceptions_count].modifying = modifying; + interceptions_count++; + } + } + pthread_mutex_unlock(&(slave_mutex)); /* Sort interceptors. */ qsort(interceptions, interceptions_count, sizeof(queued_interception_t), cmp_queued_interception); @@ -1311,9 +1309,10 @@ void queue_message_multicast(char* message, size_t length, client_t* sender) void multicast_message(multicast_t* multicast) { uint64_t modify_id = 0; - if (strstr(multicast->message, "Modify ID: ") == multicast->message) + size_t n = min(strlen("Modify ID: "), multicast->message_length); + if (!strncmp(multicast->message, "Modify ID: ", n)) { - char* value = multicast->message + strlen("Modify ID: "); + char* value = multicast->message + n; char* lf = strchr(value, '\n'); *lf = '\0'; modify_id = (uint64_t)atoll(value); @@ -1325,8 +1324,8 @@ void multicast_message(multicast_t* multicast) queued_interception_t client_ = multicast->interceptions[multicast->interceptions_ptr]; client_t* client = client_.client; char* msg = multicast->message + multicast->message_ptr; - size_t n = multicast->message_length - multicast->message_ptr; size_t sent; + n = multicast->message_length - multicast->message_ptr; /* After unmarshalling at re-exec, client will be NULL and must be mapped from its socket. */ if (client == NULL) |