aboutsummaryrefslogtreecommitdiffstats
path: root/src/mds-server/mds-server.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/mds-server/mds-server.c')
-rw-r--r--src/mds-server/mds-server.c222
1 files changed, 107 insertions, 115 deletions
diff --git a/src/mds-server/mds-server.c b/src/mds-server/mds-server.c
index 0d669ad..4d146b7 100644
--- a/src/mds-server/mds-server.c
+++ b/src/mds-server/mds-server.c
@@ -551,19 +551,16 @@ void send_multicast_queue(client_t* client)
while (client->multicasts_count > 0)
{
multicast_t multicast;
- with_mutex (client->mutex,
- if (client->multicasts_count > 0)
- {
- size_t c = (client->multicasts_count -= 1) * sizeof(multicast_t);
- multicast = client->multicasts[0];
- memmove(client->multicasts, client->multicasts + 1, c);
- if (c == 0)
- {
- free(client->multicasts);
- client->multicasts = NULL;
- }
- }
- );
+ with_mutex_if (client->mutex, client->multicasts_count > 0,
+ size_t c = (client->multicasts_count -= 1) * sizeof(multicast_t);
+ multicast = client->multicasts[0];
+ memmove(client->multicasts, client->multicasts + 1, c);
+ if (c == 0)
+ {
+ free(client->multicasts);
+ client->multicasts = NULL;
+ }
+ );
multicast_message(&multicast);
multicast_destroy(&multicast);
}
@@ -592,13 +589,13 @@ void send_reply_queue(client_t* client)
while (n > 0)
{
sent = send_message(client->socket_fd, sendbuf_, n);
- if ((sent < n) && (errno != EINTR)) /* Ignore EINTR */
+ n -= sent;
+ sendbuf_ += sent / sizeof(char);
+ if ((n > 0) && (errno != EINTR)) /* Ignore EINTR */
{
perror(*argv);
break;
}
- n -= sent;
- sendbuf_ += sent / sizeof(char);
}
free(sendbuf);
);
@@ -745,10 +742,7 @@ int message_received(client_t* client)
if (assign_id && (client->id == 0))
{
intercept |= 2;
- with_mutex (slave_mutex,
- client->id = next_id++;
- if (next_id == 0)
- {
+ with_mutex_if (slave_mutex, (client->id = ++next_id) == 0,
eprint("this is impossible, ID counter has overflowed.");
/* If the program ran for a millennium it would
take c:a 585 assignments per nanosecond. This
@@ -756,8 +750,7 @@ int message_received(client_t* client)
dedication by generations of ponies (or just an alicorn)
to maintain the process and transfer it new hardware.) */
abort();
- }
- );
+ );
}
/* Make the client listen for messages addressed to it. */
@@ -795,8 +788,7 @@ int message_received(client_t* client)
if (len > size)
{
char* old_buf = buf;
- buf = realloc(buf, ((size <<= 1) + 1) * sizeof(char));
- if (buf == NULL)
+ if (xrealloc(buf, (size <<= 1) + 1, char))
{
perror(*argv);
free(old_buf);
@@ -901,6 +893,35 @@ int message_received(client_t* client)
/**
+ * Remove interception condition by index
+ *
+ * @param client The intercepting client
+ * @param index The index of the condition
+ */
+static void remove_intercept_condition(client_t* client, size_t index)
+{
+ interception_condition_t* conds = client->interception_conditions;
+ size_t n = client->interception_conditions_count;
+
+ /* Remove the condition from the list. */
+ memmove(conds + index, conds + index + 1, --n - index);
+ client->interception_conditions_count--;
+
+ /* Shrink the list. */
+ if (client->interception_conditions_count == 0)
+ {
+ free(conds);
+ client->interception_conditions = NULL;
+ }
+ else
+ if (xrealloc(conds, n, interception_condition_t))
+ perror(*argv);
+ else
+ client->interception_conditions = conds;
+}
+
+
+/**
* Add an interception condition for a client
*
* @param client The client
@@ -934,51 +955,38 @@ void add_intercept_condition(client_t* client, char* condition, int64_t priority
with for optimisation. */
for (i = 0; i < n; i++)
{
- if ((conds[i].header_hash == hash) && strequals(conds[i].condition, condition))
+ if ((conds[i].header_hash != hash) || !strequals(conds[i].condition, condition))
{
- if (stop)
- {
- /* Remove the condition from the list. */
- memmove(conds + i, conds + i + 1, --n - i);
- client->interception_conditions_count--;
- /* Shrink the list. */
- if (n == 0)
- {
- free(conds);
- client->interception_conditions = NULL;
- }
- else
- if ((conds = realloc(conds, n * sizeof(interception_condition_t))) == NULL)
- perror(*argv);
- else
- client->interception_conditions = conds;
- }
- else
+ /* Look for the first non-modifying, this is a part of the
+ optimisation where we put all modifying conditions at the
+ beginning. */
+ if ((nonmodifying < 0) && conds[i].modifying)
+ nonmodifying = (ssize_t)i;
+ continue;
+ }
+
+ if (stop)
+ remove_intercept_condition(client, i);
+ else
+ {
+ /* Update parameters. */
+ conds[i].priority = priority;
+ conds[i].modifying = modifying;
+
+ if (modifying && (nonmodifying >= 0))
{
- /* Update parameters. */
- conds[i].priority = priority;
- conds[i].modifying = modifying;
-
- if (modifying && (nonmodifying >= 0))
- {
- /* Optimisation: put conditions that are modifying
- at the beginning. When a client is intercepting
- we most know if any satisfying condition is
- modifying. With this optimisation the first
- satisfying condition will tell us if there is
- any satisfying condition that is modifying. */
- interception_condition_t temp = conds[nonmodifying];
- conds[nonmodifying] = conds[i];
- conds[i] = temp;
- }
+ /* Optimisation: put conditions that are modifying
+ at the beginning. When a client is intercepting
+ we most know if any satisfying condition is
+ modifying. With this optimisation the first
+ satisfying condition will tell us if there is
+ any satisfying condition that is modifying. */
+ interception_condition_t temp = conds[nonmodifying];
+ conds[nonmodifying] = conds[i];
+ conds[i] = temp;
}
- return;
}
- /* Look for the first non-modifying, this is a part of the
- optimisation where we put all modifying conditions at the
- beginning. */
- if ((nonmodifying < 0) && conds[i].modifying)
- nonmodifying = (ssize_t)i;
+ return;
}
if (stop)
@@ -993,11 +1001,7 @@ void add_intercept_condition(client_t* client, char* condition, int64_t priority
}
/* Grow the interception condition list. */
- if (conds == NULL)
- conds = malloc(1 * sizeof(interception_condition_t));
- else
- conds = realloc(conds, (n + 1) * sizeof(interception_condition_t));
- if (conds == NULL)
+ if (xrealloc(conds, n + 1, interception_condition_t))
{
perror(*argv);
free(condition);
@@ -1188,7 +1192,7 @@ void queue_message_multicast(char* message, size_t length, client_t* sender)
xsnprintf(modify_id_header, "Modify ID: %" PRIu64 "\n", modify_id);
n = strlen(modify_id_header);
old_buf = message;
- if ((message = realloc(message, (n + length) * sizeof(char))) == NULL)
+ if (xrealloc(message, n + length, char))
{
message = old_buf;
goto fail;
@@ -1247,8 +1251,8 @@ void queue_message_multicast(char* message, size_t length, client_t* sender)
void multicast_message(multicast_t* multicast)
{
uint64_t modify_id = 0;
- size_t n = min(strlen("Modify ID: "), multicast->message_length);
- if (!strncmp(multicast->message, "Modify ID: ", n))
+ size_t n = strlen("Modify ID: ");
+ if (startswith_n(multicast->message, "Modify ID: ", multicast->message_length, n))
{
char* value = multicast->message + n;
char* lf = strchr(value, '\n');
@@ -1281,22 +1285,16 @@ void multicast_message(multicast_t* multicast)
}
/* Send the message. */
+ n *= sizeof(char);
with_mutex (client->mutex,
- errno = 0;
- n *= sizeof(char);
if (client->open)
- while (n > 0)
- {
- sent = send_message(client->socket_fd, msg + multicast->message_ptr, n);
- if (sent < n)
- {
- if (errno != EINTR)
- perror(*argv);
- break;
- }
- n -= sent;
- multicast->message_ptr += sent / sizeof(char);
- }
+ {
+ sent = send_message(client->socket_fd, msg + multicast->message_ptr, n);
+ n -= sent;
+ multicast->message_ptr += sent / sizeof(char);
+ if ((n > 0) && (errno != EINTR))
+ perror(*argv);
+ }
);
/* Stop if we are re-exec:ing or terminating, or continue to next recipient on error. */
@@ -1331,42 +1329,35 @@ void multicast_message(multicast_t* multicast)
mds_message_t* mod;
/* Wait for a reply. */
- with_mutex (modify_mutex,
- if (client->modify_message == NULL)
- {
- if (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0)
- {
- hash_table_put(&modify_map, (size_t)modify_id, (size_t)(void*)client);
- pthread_cond_signal(&slave_cond);
- }
- }
- );
- with_mutex (client->modify_mutex,
- if (client->modify_message == NULL)
- {
- while ((client->modify_message == NULL) && (terminating == 0))
- pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout);
- if (terminating == 0)
- hash_table_remove(&modify_map, (size_t)modify_id);
- }
- );
+ with_mutex_if (modify_mutex, client->modify_message == NULL,
+ if (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0)
+ {
+ hash_table_put(&modify_map, (size_t)modify_id, (size_t)(void*)client);
+ pthread_cond_signal(&slave_cond);
+ }
+ );
+ with_mutex_if (client->modify_mutex, client->modify_message == NULL,
+ while ((client->modify_message == NULL) && (terminating == 0))
+ pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout);
+ if (terminating == 0)
+ hash_table_remove(&modify_map, (size_t)modify_id);
+ );
if (terminating)
return;
/* Act upon the reply. */
mod = client->modify_message;
for (i = 0; i < mod->header_count; i++)
- if (!strcmp(mod->headers[i], "Modify: yes"))
+ if (strequals(mod->headers[i], "Modify: yes"))
{
modifying = 1;
break;
}
if (modifying)
{
- old_buf = multicast->message;
n = mod->payload_size;
- multicast->message = realloc(old_buf, (multicast->message_prefix + n) * sizeof(char));
- if (multicast->message == NULL)
+ old_buf = multicast->message;
+ if (xrealloc(multicast->message, multicast->message_prefix + n, char))
{
perror(*argv);
multicast->message = old_buf;
@@ -1507,11 +1498,12 @@ void signal_all(int signo)
with_mutex (slave_mutex,
foreach_linked_list_node (client_list, node)
- {
- client_t* value = (client_t*)(void*)(client_list.values[node]);
- if (pthread_equal(current_thread, value->thread) == 0)
- pthread_kill(value->thread, signo);
- });
+ {
+ client_t* value = (client_t*)(void*)(client_list.values[node]);
+ if (pthread_equal(current_thread, value->thread) == 0)
+ pthread_kill(value->thread, signo);
+ }
+ );
}