/** * mds — A micro-display server * Copyright © 2014, 2015 Mattias Andrée (maandree@member.fsf.org) * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #include "sending.h" #include "globals.h" #include "client.h" #include "queued-interception.h" #include "multicast.h" #include #include #include #include #include #include #include #include #include /** * Get the client by its socket's file descriptor in a synchronised manner * * @param client_fd The file descriptor of the client's socket * @return The client */ static client_t* client_by_socket(int client_fd) { size_t address; with_mutex (slave_mutex, address = fd_table_get(&client_map, client_fd);); return (client_t*)(void*)address; } /** * Send a multicast message to one recipient * * @param multicast The message * @param recipient The recipient * @param modifying Whether the recipient may modify the message * @return Evaluates to true if and only if the entire message was sent */ static int send_multicast_to_recipient(multicast_t* multicast, client_t* recipient, int modifying) { char* msg = multicast->message + multicast->message_ptr; size_t n = multicast->message_length - multicast->message_ptr; size_t sent; /* Skip Modify ID header if the interceptors will not perform a modification. */ if ((modifying == 0) && (multicast->message_ptr == 0)) { n -= multicast->message_prefix; multicast->message_ptr += multicast->message_prefix; } /* Send the message. */ n *= sizeof(char); with_mutex (recipient->mutex, if (recipient->open) { sent = send_message(recipient->socket_fd, msg + multicast->message_ptr, n); n -= sent; multicast->message_ptr += sent / sizeof(char); if ((n > 0) && (errno != EINTR)) xperror(*argv); } ); return n == 0; } /** * Wait for the recipient of a multicast to reply * * @param recipient The recipient * @param modify_id The modify ID of the multicast */ static void wait_for_reply(client_t* recipient, uint64_t modify_id) { /* pthread_cond_timedwait is required to handle re-exec and termination because pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */ struct timespec timeout = { .tv_sec = 1, .tv_nsec = 0 }; with_mutex_if (modify_mutex, recipient->modify_message == NULL, if (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0) { hash_table_put(&modify_map, (size_t)modify_id, (size_t)(void*)recipient); pthread_cond_signal(&slave_cond); } ); with_mutex_if (recipient->modify_mutex, recipient->modify_message == NULL, while ((recipient->modify_message == NULL) && (terminating == 0)) pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout); if (terminating == 0) hash_table_remove(&modify_map, (size_t)modify_id); ); } /** * Multicast a message * * @param multicast The multicast message */ void multicast_message(multicast_t* multicast) { int consumed = 0; uint64_t modify_id = 0; size_t n = strlen("Modify ID: "); if (startswith_n(multicast->message, "Modify ID: ", multicast->message_length, n)) { char* value = multicast->message + n; char* lf = strchr(value, '\n'); *lf = '\0'; modify_id = atou64(value); *lf = '\n'; } for (; multicast->interceptions_ptr < multicast->interceptions_count; multicast->interceptions_ptr++) { queued_interception_t client_ = multicast->interceptions[multicast->interceptions_ptr]; client_t* client = client_.client; int modifying = 0; char* old_buf; size_t i; mds_message_t* mod; /* After unmarshalling at re-exec, client will be NULL and must be mapped from its socket. */ if (client == NULL) client_.client = client = client_by_socket(client_.socket_fd); /* Send the message to the recipient. */ if (send_multicast_to_recipient(multicast, client, client_.modifying) == 0) { /* Stop if we are re-exec:ing or terminating, or continue to next recipient on error. */ if (terminating) return; else continue; } /* Do not wait for a reply if it is non-modifying. */ if (client_.modifying == 0) { /* Reset how much of the message has been sent before we continue with next recipient. */ multicast->message_ptr = 0; continue; } /* Wait for a reply. */ wait_for_reply(client, modify_id); if (terminating) return; /* Act upon the reply. */ mod = client->modify_message; for (i = 0; i < mod->header_count; i++) if (strequals(mod->headers[i], "Modify: yes")) { modifying = 1; consumed = mod->payload_size == 0; break; } if (modifying && !consumed) { n = mod->payload_size; old_buf = multicast->message; if (xrealloc(multicast->message, multicast->message_prefix + n, char)) { xperror(*argv); multicast->message = old_buf; } else memcpy(multicast->message + multicast->message_prefix, mod->payload, n); } /* Free the reply. */ mds_message_destroy(client->modify_message); /* Reset how much of the message has been sent before we continue with next recipient. */ multicast->message_ptr = 0; if (consumed) break; } } /** * Send the next message in a clients multicast queue * * @param client The client */ void send_multicast_queue(client_t* client) { while (client->multicasts_count > 0) { multicast_t multicast; with_mutex_if (client->mutex, client->multicasts_count > 0, size_t c = (client->multicasts_count -= 1) * sizeof(multicast_t); multicast = client->multicasts[0]; memmove(client->multicasts, client->multicasts + 1, c); if (c == 0) { free(client->multicasts); client->multicasts = NULL; } ); multicast_message(&multicast); multicast_destroy(&multicast); } } /** * Send the messages that are in a clients reply queue * * @param client The client */ void send_reply_queue(client_t* client) { char* sendbuf = client->send_pending; char* sendbuf_ = sendbuf; size_t sent; size_t n; if (client->send_pending_size == 0) return; n = client->send_pending_size; client->send_pending_size = 0; client->send_pending = NULL; with_mutex (client->mutex, while (n > 0) { sent = send_message(client->socket_fd, sendbuf_, n); n -= sent; sendbuf_ += sent / sizeof(char); if ((n > 0) && (errno != EINTR)) /* Ignore EINTR */ { xperror(*argv); break; } } free(sendbuf); ); }