/** * mds — A micro-display server * Copyright © 2014, 2015, 2016, 2017 Mattias Andrée (maandree@kth.se) * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #include "sending.h" #include "globals.h" #include "client.h" #include "queued-interception.h" #include "multicast.h" #include #include #include #include #include #include #include #include #include /** * Get the client by its socket's file descriptor in a synchronised manner * * @param client_fd The file descriptor of the client's socket * @return The client */ static client_t * client_by_socket(int client_fd) { size_t address; with_mutex (slave_mutex, address = fd_table_get(&client_map, client_fd);); return (client_t*)(void*)address; } /** * Send a multicast message to one recipient * * @param multicast The message * @param recipient The recipient * @param modifying Whether the recipient may modify the message * @return Evaluates to true if and only if the entire message was sent */ static int __attribute__((nonnull)) send_multicast_to_recipient(multicast_t *multicast, client_t *recipient, int modifying) { char *msg = multicast->message + multicast->message_ptr; size_t n = multicast->message_length - multicast->message_ptr; size_t sent; /* Skip Modify ID header if the interceptors will not perform a modification. */ if (!modifying && !multicast->message_ptr) { n -= multicast->message_prefix; multicast->message_ptr += multicast->message_prefix; } /* Send the message. */ n *= sizeof(char); with_mutex (recipient->mutex, if (recipient->open) { sent = send_message(recipient->socket_fd, msg + multicast->message_ptr, n); n -= sent; multicast->message_ptr += sent / sizeof(char); if (n > 0 && errno != EINTR) xperror(*argv); } ); return !n; } /** * Wait for the recipient of a multicast to reply * * @param recipient The recipient * @param modify_id The modify ID of the multicast */ static void __attribute__((nonnull)) wait_for_reply(client_t *recipient, uint64_t modify_id) { /* pthread_cond_timedwait is required to handle re-exec and termination because pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */ struct timespec timeout = { .tv_sec = 1, .tv_nsec = 0 }; with_mutex_if (modify_mutex, !recipient->modify_message, if (!hash_table_contains_key(&modify_map, (size_t)modify_id)) { hash_table_put(&modify_map, (size_t)modify_id, (size_t)(void*)recipient); pthread_cond_signal(&slave_cond); } ); with_mutex_if (recipient->modify_mutex, !recipient->modify_message, while (!recipient->modify_message && !terminating) pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout); if (!terminating) hash_table_remove(&modify_map, (size_t)modify_id); ); } /** * Multicast a message * * @param multicast The multicast message */ void multicast_message(multicast_t *multicast) { int consumed = 0, modifying = 0; uint64_t modify_id = 0; size_t i, n = strlen("Modify ID: "); char *value, *lf, *old_buf; mds_message_t* mod; client_t* client; queued_interception_t client_; if (startswith_n(multicast->message, "Modify ID: ", multicast->message_length, n)) { value = multicast->message + n; lf = strchr(value, '\n'); *lf = '\0'; modify_id = atou64(value); *lf = '\n'; } for (; multicast->interceptions_ptr < multicast->interceptions_count; multicast->interceptions_ptr++) { client_ = multicast->interceptions[multicast->interceptions_ptr]; client = client_.client; modifying = 0; /* After unmarshalling at re-exec, client will be NULL and must be mapped from its socket. */ if (!client) client_.client = client = client_by_socket(client_.socket_fd); /* Send the message to the recipient. */ if (!send_multicast_to_recipient(multicast, client, client_.modifying)) { /* Stop if we are re-exec:ing or terminating, or continue to next recipient on error. */ if (terminating) return; else continue; } /* Do not wait for a reply if it is non-modifying. */ if (!client_.modifying) { /* Reset how much of the message has been sent before we continue with next recipient. */ multicast->message_ptr = 0; continue; } /* Wait for a reply. */ wait_for_reply(client, modify_id); if (terminating) return; /* Act upon the reply. */ mod = client->modify_message; for (i = 0; i < mod->header_count; i++) { if (strequals(mod->headers[i], "Modify: yes")) { modifying = 1; consumed = mod->payload_size == 0; break; } } if (modifying && !consumed) { n = mod->payload_size; old_buf = multicast->message; if (xrealloc(multicast->message, multicast->message_prefix + n, char)) { xperror(*argv); multicast->message = old_buf; } else { memcpy(multicast->message + multicast->message_prefix, mod->payload, n); } } /* Free the reply. */ mds_message_destroy(client->modify_message); /* Reset how much of the message has been sent before we continue with next recipient. */ multicast->message_ptr = 0; if (consumed) break; } } /** * Send the next message in a clients multicast queue * * @param client The client */ void send_multicast_queue(client_t *client) { multicast_t multicast; size_t c; while (client->multicasts_count > 0) { with_mutex_if (client->mutex, client->multicasts_count > 0, c = (client->multicasts_count -= 1) * sizeof(multicast_t); multicast = client->multicasts[0]; memmove(client->multicasts, client->multicasts + 1, c); if (c == 0) { free(client->multicasts); client->multicasts = NULL; } ); multicast_message(&multicast); multicast_destroy(&multicast); } } /** * Send the messages that are in a clients reply queue * * @param client The client */ void send_reply_queue(client_t *client) { char *sendbuf = client->send_pending; char *sendbuf_ = sendbuf; size_t sent, n; if (!client->send_pending_size) return; n = client->send_pending_size; client->send_pending_size = 0; client->send_pending = NULL; with_mutex (client->mutex, while (n > 0) { sent = send_message(client->socket_fd, sendbuf_, n); n -= sent; sendbuf_ += sent / sizeof(char); if (n > 0 && errno != EINTR) { /* Ignore EINTR */ xperror(*argv); break; } } free(sendbuf); ); }