diff options
Diffstat (limited to '')
-rw-r--r-- | src/mds-server/mds-server.c | 617 | ||||
-rw-r--r-- | src/mds-server/mds-server.h | 9 |
2 files changed, 277 insertions, 349 deletions
diff --git a/src/mds-server/mds-server.c b/src/mds-server/mds-server.c index 5b81222..8bb7b83 100644 --- a/src/mds-server/mds-server.c +++ b/src/mds-server/mds-server.c @@ -137,7 +137,6 @@ static pthread_cond_t modify_cond; static hash_table_t modify_map; - /** * Entry point of the server * @@ -160,24 +159,21 @@ int main(int argc_, char** argv_) #endif +#define exit_if(CONDITION, INSTRUCTION) if (CONDITION) { INSTRUCTION return 1; } + + argc = argc_; argv = argv_; /* Drop privileges like it's hot. */ if (drop_privileges()) - { - perror(*argv); - return 1; - } + goto pfail; /* Sanity check the number of command line arguments. */ - if (argc > ARGC_LIMIT + LIBEXEC_ARGC_EXTRA_LIMIT) - { - eprint("that number of arguments is ridiculous, I will not allow it."); - return 1; - } + exit_if (argc > ARGC_LIMIT + LIBEXEC_ARGC_EXTRA_LIMIT, + eprint("that number of arguments is ridiculous, I will not allow it.");); /* Parse command line arguments. */ @@ -187,26 +183,18 @@ int main(int argc_, char** argv_) int v; if ((v = strequals(arg, "--initial-spawn")) || /* Initial spawn? */ strequals(arg, "--respawn")) /* Respawning after crash? */ - if (is_respawn == v) - { - eprintf("conflicting arguments %s and %s cannot be combined.", - "--initial-spawn", "--respawn"); - return 1; - } - else + { + exit_if (is_respawn == v, + eprintf("conflicting arguments %s and %s cannot be combined.", + "--initial-spawn", "--respawn");); is_respawn = !v; + } else if (startswith(arg, "--socket-fd=")) /* Socket file descriptor. */ { - if (socket_fd != -1) - { - eprintf("duplicate declaration of %s.", "--socket-fd"); - return 1; - } - if (strict_atoi(arg += strlen("--socket-fd="), &socket_fd, 0, INT_MAX) < 0) - { - eprintf("invalid value for %s: %s.", "--socket-fd", arg); - return 1; - } + exit_if (socket_fd != -1, + eprintf("duplicate declaration of %s.", "--socket-fd");); + exit_if (strict_atoi(arg += strlen("--socket-fd="), &socket_fd, 0, INT_MAX) < 0, + eprintf("invalid value for %s: %s.", "--socket-fd", arg);); } else if (strequals(arg, "--re-exec")) /* Re-exec state-marshal. */ reexec = 1; @@ -223,17 +211,14 @@ int main(int argc_, char** argv_) /* Check that manditory arguments have been specified. */ - if (is_respawn < 0) - { - eprintf("missing state argument, require either %s or %s.", - "--initial-spawn", "--respawn"); - return 1; - } - if (socket_fd < 0) - { - eprint("missing socket file descriptor argument."); - return 1; - } + exit_if (is_respawn < 0, + eprintf("missing state argument, require either %s or %s.", + "--initial-spawn", "--respawn");); + exit_if (socket_fd < 0, + eprint("missing socket file descriptor argument.");); + + +#undef exit_if /* Run mdsinitrc. */ @@ -241,16 +226,13 @@ int main(int argc_, char** argv_) { pid_t pid = fork(); if (pid == (pid_t)-1) - { - perror(*argv); - return 1; - } + goto pfail; if (pid == 0) /* Child process exec:s, the parent continues without waiting for it. */ { /* Close all files except stdin, stdout and stderr. */ close_files((fd > 2) || (fd == socket_fd)); - /* Run initrc */ + /* Run mdsinitrc. */ run_initrc(unparsed_args); return 1; } @@ -265,78 +247,36 @@ int main(int argc_, char** argv_) if (I >= 5) pthread_cond_destroy(&modify_cond); \ if (I >= 6) hash_table_destroy(&modify_map, NULL, NULL) +#define __error(I) perror(*argv); __free(I); return 1 + /* Create list and table of clients. */ if (reexec == 0) { - if (fd_table_create(&client_map)) - { - perror(*argv); - __free(0); - return 1; - } - if (linked_list_create(&client_list, 32)) - { - perror(*argv); - __free(1); - return 1; - } + if (fd_table_create(&client_map)) { __error(0); } + if (linked_list_create(&client_list, 32)) { __error(1); } } /* Store the current thread so it can be killed from elsewhere. */ master_thread = pthread_self(); - /* Make the server update without all slaves dying on SIGUSR1. */ - if (xsigaction(SIGUSR1, sigusr1_trap) < 0) - { - perror(*argv); - __free(1); - return 1; - } + if (xsigaction(SIGUSR1, sigusr1_trap) < 0) { __error(1); } /* Implement clean exit on SIGTERM. */ - if (xsigaction(SIGTERM, sigterm_trap) < 0) - { - perror(*argv); - __free(1); - return 1; - } + if (xsigaction(SIGTERM, sigterm_trap) < 0) { __error(1); } /* Create mutex and condition for slave counter. */ - if ((errno = pthread_mutex_init(&slave_mutex, NULL)) != 0) - { - perror(*argv); - __free(1); - return 1; - } - if ((errno = pthread_cond_init(&slave_cond, NULL)) != 0) - { - perror(*argv); - __free(2); - return 1; - } + if ((errno = pthread_mutex_init(&slave_mutex, NULL))) { __error(1); } + if ((errno = pthread_cond_init(&slave_cond, NULL))) { __error(2); } /* Create mutex, condition and map for message modification. */ - if ((errno = pthread_mutex_init(&modify_mutex, NULL)) != 0) - { - perror(*argv); - __free(3); - return 1; - } - if ((errno = pthread_cond_init(&modify_cond, NULL)) != 0) - { - perror(*argv); - __free(4); - return 1; - } - if (hash_table_create(&modify_map)) - { - perror(*argv); - __free(5); - return 1; - } + if ((errno = pthread_mutex_init(&modify_mutex, NULL))) { __error(3); } + if ((errno = pthread_cond_init(&modify_cond, NULL))) { __error(4); } + if (hash_table_create(&modify_map)) { __error(5); } + +#undef __error /* Unmarshal the state of the server. */ @@ -400,15 +340,7 @@ int main(int argc_, char** argv_) with_mutex(slave_mutex, running_slaves++;); /* Start slave thread. */ - errno = pthread_create(&slave_thread, NULL, slave_loop, (void*)(intptr_t)client_fd); - if (errno) - { - perror(*argv); - with_mutex(slave_mutex, running_slaves--;); - } - errno = pthread_detach(slave_thread); - if (errno) - perror(*argv); + create_slave(&slave_thread, client_fd); } terminate: @@ -424,10 +356,10 @@ int main(int argc_, char** argv_) /* Release resources. */ __free(9999); -#undef __free - return 0; +#undef __free + reexec: { @@ -452,10 +384,7 @@ int main(int argc_, char** argv_) xsnprintf(shm_path, SHM_PATH_PATTERN, (unsigned long int)pid); reexec_fd = shm_open(shm_path, O_RDWR | O_CREAT | O_EXCL, S_IRWXU); if (reexec_fd < 0) - { - perror(*argv); - return 1; - } + goto pfail; if (marshal_server(reexec_fd) < 0) goto reexec_fail; close(reexec_fd); @@ -482,6 +411,10 @@ int main(int argc_, char** argv_) be respawn if the re-exec fails. */ return 1; } + + pfail: + perror(*argv); + return 1; } @@ -507,10 +440,7 @@ void* slave_loop(void* data) { /* Create information table. */ if (xmalloc(information, 1, client_t)) - { - perror(*argv); - goto fail; - } + goto pfail; /* NULL-out pointers and initialisation markers. */ information->interception_conditions = NULL; @@ -550,10 +480,7 @@ void* slave_loop(void* data) information->send_pending_size = 0; information->multicasts_count = 0; if (mds_message_initialise(&(information->message))) - { - perror(*argv); - goto fail; - } + goto pfail; } @@ -561,129 +488,102 @@ void* slave_loop(void* data) information->thread = pthread_self(); /* Create mutex to make sure two thread to not try to send messages concurrently, and other slave local actions. */ - if ((errno = pthread_mutex_init(&(information->mutex), NULL)) != 0) - { - perror(*argv); - goto fail; - } + if ((errno = pthread_mutex_init(&(information->mutex), NULL))) goto pfail; information->mutex_created = 1; /* Create mutex and codition for multicast interception replies. */ - if ((errno = pthread_mutex_init(&(information->modify_mutex), NULL)) != 0) - { - perror(*argv); - goto fail; - } + if ((errno = pthread_mutex_init(&(information->modify_mutex), NULL))) goto pfail; information->modify_mutex_created = 1; - if ((errno = pthread_cond_init(&(information->modify_cond), NULL)) != 0) - { - perror(*argv); - goto fail; - } + if ((errno = pthread_cond_init(&(information->modify_cond), NULL))) goto pfail; information->modify_cond_created = 1; /* Make the server update without all slaves dying on SIGUSR1. */ - if (xsigaction(SIGUSR1, sigusr1_trap) < 0) - { - perror(*argv); - goto fail; - } + if (xsigaction(SIGUSR1, sigusr1_trap) < 0) goto pfail; /* Implement clean exit on SIGTERM. */ - if (xsigaction(SIGTERM, sigterm_trap) < 0) - { - perror(*argv); - goto fail; - } + if (xsigaction(SIGTERM, sigterm_trap) < 0) goto pfail; /* Fetch messages from the slave. */ - if (information->open) - while (terminating == 0) - { - /* Send queued multicast messages. */ - if (information->multicasts_count > 0) + while ((terminating == 0) && (information->open == 0)) + { + /* Send queued multicast messages. */ + if (information->multicasts_count > 0) + { + multicast_t multicast; + with_mutex(information->mutex, + if (information->multicasts_count > 0) + { + size_t c = (information->multicasts_count -= 1) * sizeof(multicast_t); + multicast = information->multicasts[0]; + memmove(information->multicasts, information->multicasts + 1, c); + if (c == 0) + { + free(information->multicasts); + information->multicasts = NULL; + } + } + ); + multicast_message(&multicast); + multicast_destroy(&multicast); + } + + /* Send queued messages. */ + if (information->send_pending_size > 0) + { + char* sendbuf = information->send_pending; + char* sendbuf_ = sendbuf; + size_t sent; + n = information->send_pending_size; + information->send_pending_size = 0; + information->send_pending = NULL; + with_mutex(information->mutex, + while (n > 0) + { + sent = send_message(information->socket_fd, sendbuf_, n); + if ((sent < n) && (errno != EINTR)) /* Ignore EINTR */ + { + perror(*argv); + break; + } + n -= sent; + sendbuf_ += sent / sizeof(char); + } + free(sendbuf); + ); + } + + /* Fetch message. */ + r = mds_message_read(&(information->message), socket_fd); + if (r == 0) + { + if (message_received(information) == 1) + goto terminate; + } + else + if (r == -2) { - multicast_t multicast; - with_mutex(information->mutex, - if (information->multicasts_count > 0) - { - size_t c = information->multicasts_count -= 1; - c *= sizeof(multicast_t); - multicast = information->multicasts[0]; - memmove(information->multicasts, information->multicasts + 1, c); - if (c == 0) - { - free(information->multicasts); - information->multicasts = NULL; - } - } - ); - multicast_message(&multicast); - multicast_destroy(&multicast); + eprint("corrupt message received."); + goto fail; } - - /* Send queued messages. */ - if (information->send_pending_size > 0) + else if (errno == ECONNRESET) { - char* sendbuf = information->send_pending; - char* sendbuf_ = sendbuf; - size_t sent; - n = information->send_pending_size; - information->send_pending_size = 0; - information->send_pending = NULL; - with_mutex(information->mutex, - while (n > 0) - { - sent = send_message(information->socket_fd, sendbuf_, n); - if ((sent < n) && (errno != EINTR)) /* Ignore EINTR */ - { - perror(*argv); - break; - } - n -= sent; - sendbuf_ += sent / sizeof(char); - } - free(sendbuf); - ); + r = mds_message_read(&(information->message), socket_fd); + if ((r == 0) && message_received(information)) + goto terminate; + information->open = 0; + /* Connection closed. */ } - - /* Fetch message. */ - r = mds_message_read(&(information->message), socket_fd); - if (r == 0) + else if (errno == EINTR) { - if (message_received(information) == 1) + /* Stop the thread if we are re-exec:ing the server. */ + if (terminating) goto terminate; } else - if (r == -2) - { - eprint("corrupt message received."); - goto fail; - } - else if (errno == ECONNRESET) - { - r = mds_message_read(&(information->message), socket_fd); - if (r == 0) - if (message_received(information)) - goto terminate; - information->open = 0; - /* Connection closed. */ - break; - } - else if (errno == EINTR) - { - /* Stop the thread if we are re-exec:ing the server. */ - if (terminating) - goto terminate; - } - else - { - perror(*argv); - goto fail; - } - } + goto pfail; + } /* Stop the thread if we are re-exec:ing the server. */ if (terminating) goto terminate; @@ -725,6 +625,11 @@ void* slave_loop(void* data) return NULL; + pfail: + perror(*argv); + goto fail; + + reexec: /* Tell the master thread that the slave has closed, this is done because re-exec causes a race-condition @@ -1037,46 +942,46 @@ void add_intercept_condition(client_t* client, char* condition, int64_t priority with for optimisation. */ for (i = 0; i < n; i++) { - if (conds[i].header_hash == hash) - if (strequals(conds[i].condition, condition)) - { - if (stop) - { - /* Remove the condition from the list. */ - memmove(conds + i, conds + i + 1, --n - i); - client->interception_conditions_count--; - /* Shrink the list. */ - if (n == 0) - { - free(conds); - client->interception_conditions = NULL; - } + if ((conds[i].header_hash == hash) && strequals(conds[i].condition, condition)) + { + if (stop) + { + /* Remove the condition from the list. */ + memmove(conds + i, conds + i + 1, --n - i); + client->interception_conditions_count--; + /* Shrink the list. */ + if (n == 0) + { + free(conds); + client->interception_conditions = NULL; + } + else + if ((conds = realloc(conds, n * sizeof(interception_condition_t))) == NULL) + perror(*argv); else - if ((conds = realloc(conds, n * sizeof(interception_condition_t))) == NULL) - perror(*argv); - else - client->interception_conditions = conds; - } - else - { - /* Update parameters. */ - conds[i].priority = priority; - conds[i].modifying = modifying; - if (modifying && (nonmodifying >= 0)) - { - /* Optimisation: put conditions that are modifying - at the beginning. When a client is intercepting - we most know if any satisfying condition is - modifying. With this optimisation the first - satisfying condition will tell us if there is - any satisfying condition that is modifying. */ - interception_condition_t temp = conds[nonmodifying]; - conds[nonmodifying] = conds[i]; - conds[i] = temp; - } - } - return; - } + client->interception_conditions = conds; + } + else + { + /* Update parameters. */ + conds[i].priority = priority; + conds[i].modifying = modifying; + + if (modifying && (nonmodifying >= 0)) + { + /* Optimisation: put conditions that are modifying + at the beginning. When a client is intercepting + we most know if any satisfying condition is + modifying. With this optimisation the first + satisfying condition will tell us if there is + any satisfying condition that is modifying. */ + interception_condition_t temp = conds[nonmodifying]; + conds[nonmodifying] = conds[i]; + conds[i] = temp; + } + } + return; + } /* Look for the first non-modifying, this is a part of the optimisation where we put all modifying conditions at the beginning. */ @@ -1411,79 +1316,79 @@ void multicast_message(multicast_t* multicast) continue; } - /* Wait for a reply and act upon it. */ - if ((n == 0) && client_.modifying) + /* Do not wait for a reply if it is non-modifying. */ + if (client_.modifying) { - /* pthread_cond_timedwait is required to handle re-exec because - pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */ - struct timespec timeout = - { - .tv_sec = 1, - .tv_nsec = 0 - }; - int modifying = 0; - char* old_buf; - size_t i; - mds_message_t* mod; - - /* Wait for a reply. */ - with_mutex(modify_mutex, - if (client->modify_message == NULL) - { - if (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0) - { - hash_table_put(&modify_map, (size_t)modify_id, (size_t)(void*)client); - pthread_cond_signal(&slave_cond); - } - } - ); - with_mutex(client->modify_mutex, - if (client->modify_message == NULL) - { - for (;;) - { - pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout); - if ((client->modify_message != NULL) && terminating) - break; - } - if (terminating == 0) - hash_table_remove(&modify_map, (size_t)modify_id); - } - ); - if (terminating) - return; - - /* Act upon the reply. */ - if (client->modify_message != NULL) - { - mod = client->modify_message; - for (i = 0; i < mod->header_count; i++) - if (!strcmp(mod->headers[i], "Modify: yes")) - { - modifying = 1; - break; - } - if (modifying) - { - old_buf = multicast->message; - n = mod->payload_size; - multicast->message = realloc(old_buf, (multicast->message_prefix + n) * sizeof(char)); - if (multicast->message == NULL) - { - perror(*argv); - multicast->message = old_buf; - } - else - memcpy(multicast->message + multicast->message_prefix, mod->payload, n); - } - - /* Free the reply. */ - mds_message_destroy(client->modify_message); - } + /* Reset how much of the message has been sent before we continue with next recipient. */ + multicast->message_ptr = 0; + continue; } - /* Reset how much of the message has been sent before we continue with next recipient. */ - multicast->message_ptr = 0; + /* Wait for a reply and act upon it. */ + { + /* pthread_cond_timedwait is required to handle re-exec because + pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */ + struct timespec timeout = + { + .tv_sec = 1, + .tv_nsec = 0 + }; + int modifying = 0; + char* old_buf; + size_t i; + mds_message_t* mod; + + /* Wait for a reply. */ + with_mutex(modify_mutex, + if (client->modify_message == NULL) + { + if (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0) + { + hash_table_put(&modify_map, (size_t)modify_id, (size_t)(void*)client); + pthread_cond_signal(&slave_cond); + } + } + ); + with_mutex(client->modify_mutex, + if (client->modify_message == NULL) + { + while ((client->modify_message == NULL) && (terminating == 0)) + pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout); + if (terminating == 0) + hash_table_remove(&modify_map, (size_t)modify_id); + } + ); + if (terminating) + return; + + /* Act upon the reply. */ + mod = client->modify_message; + for (i = 0; i < mod->header_count; i++) + if (!strcmp(mod->headers[i], "Modify: yes")) + { + modifying = 1; + break; + } + if (modifying) + { + old_buf = multicast->message; + n = mod->payload_size; + multicast->message = realloc(old_buf, (multicast->message_prefix + n) * sizeof(char)); + if (multicast->message == NULL) + { + perror(*argv); + multicast->message = old_buf; + } + else + memcpy(multicast->message + multicast->message_prefix, mod->payload, n); + } + + /* Free the reply. */ + mds_message_destroy(client->modify_message); + + /* Reset how much of the message has been sent before we continue with next recipient. */ + multicast->message_ptr = 0; + } } } @@ -1809,18 +1714,12 @@ int unmarshal_server(int fd) /* Unmarshal the client list. */ if (linked_list_unmarshal(&client_list, state_buf_)) - { - perror(*argv); - abort(); /* Critical. */ - } + goto critical_fail; state_buf_ += list_size / sizeof(char); /* Unmarshal the client map. */ if (fd_table_unmarshal(&client_map, state_buf_, unmarshal_remapper)) - { - perror(*argv); - abort(); /* Critical. */ - } + goto critical_fail; /* Release the raw data. */ free(state_buf); @@ -1851,15 +1750,7 @@ int unmarshal_server(int fd) with_mutex(slave_mutex, running_slaves++;); /* Start slave thread. */ - errno = pthread_create(&slave_thread, NULL, slave_loop, (void*)(intptr_t)socket_fd); - if (errno) - { - perror(*argv); - with_mutex(slave_mutex, running_slaves--;); - } - errno = pthread_detach(slave_thread); - if (errno) - perror(*argv); + create_slave(&slave_thread, socket_fd); } } @@ -1867,5 +1758,33 @@ int unmarshal_server(int fd) hash_table_destroy(&unmarshal_remap_map, NULL, NULL); return -with_error; + + critical_fail: + perror(*argv); + abort(); +} + + +/** + * Create, start and detache a slave thread + * + * @param thread The address at where to store the thread + * @param socket_fd The file descriptor of the slave's socket + * @return Zero on success, -1 on error, error message will have been printed + */ +int create_slave(pthread_t* thread_slot, int socket_fd) +{ + if ((errno = pthread_create(thread_slot, NULL, slave_loop, (void*)(intptr_t)socket_fd))) + { + perror(*argv); + with_mutex(slave_mutex, running_slaves--;); + return -1; + } + if ((errno = pthread_detach(*thread_slot))) + { + perror(*argv); + return -1; + } + return 0; } diff --git a/src/mds-server/mds-server.h b/src/mds-server/mds-server.h index 43662d7..08f7fd4 100644 --- a/src/mds-server/mds-server.h +++ b/src/mds-server/mds-server.h @@ -116,6 +116,15 @@ int marshal_server(int fd); */ int unmarshal_server(int fd); +/** + * Create, start and detache a slave thread + * + * @param thread The address at where to store the thread + * @param socket_fd The file descriptor of the slave's socket + * @return Zero on success, -1 on error, error message will have been printed + */ +int create_slave(pthread_t* thread_slot, int socket_fd); + #endif |