aboutsummaryrefslogblamecommitdiffstats
path: root/src/mds-server/receiving.c
blob: 60e86b7c73648bd55fec846b1580330f33db3e8b (plain) (tree)
1
2
3

                                 
                                                                         





































                                                                        
                        
                                                                             









                                                                                       

                                                                             
 

















                                                                                            
         





















                                                                                           











                                                                                 

                                                                                                  
 












                                                                                
         

























                                                                                
         





                  









                                                                                 

                                                            
 










































                                                                                                  
           
  



                         









                                                                                    

                                  
 























                                                                                                       
         



























































                                                                                                               
 
/**
 * mds — A micro-display server
 * Copyright © 2014, 2015, 2016, 2017  Mattias Andrée (maandree@kth.se)
 * 
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
#include "receiving.h"

#include "globals.h"
#include "client.h"
#include "interceptors.h"

#include <libmdsserver/hash-table.h>
#include <libmdsserver/mds-message.h>
#include <libmdsserver/macros.h>

#include <stddef.h>
#include <inttypes.h>
#include <errno.h>
#include <string.h>
#include <stdio.h>


/**
 * Queue a message for multicasting
 * 
 * @param  message  The message
 * @param  length   The length of the message
 * @param  sender   The original sender of the message
 */
__attribute__((nonnull))
void queue_message_multicast(char *message, size_t length, client_t *sender);


/**
 * Notify waiting client about a received message modification
 * 
 * @param   client     The client whom sent the message
 * @param   message    The message
 * @param   modify_id  The modify ID of the message
 * @return             Normally zero, but 1 if exited because of re-exec or termination
 */
static int __attribute__((nonnull))
modifying_notify(client_t *client, mds_message_t message, uint64_t modify_id)
{
	/* pthread_cond_timedwait is required to handle re-exec and termination because
	   pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */
	struct timespec timeout = {
		.tv_sec = 1,
		.tv_nsec = 0
	};
	size_t address;
	client_t *recipient;
	mds_message_t *multicast;
	size_t i;

	pthread_mutex_lock(&(modify_mutex));
	while (!hash_table_contains_key(&modify_map, (size_t)modify_id)) {
		if (terminating) {
			pthread_mutex_unlock(&(modify_mutex));
			return 1;
		}
		pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout);
	}
	address = hash_table_get(&modify_map, (size_t)modify_id);
	recipient = (void *)address;
	fail_if (xmalloc(multicast = recipient->modify_message, 1, mds_message_t));
	mds_message_zero_initialise(multicast);
	fail_if (xmemdup(multicast->payload, message.payload, message.payload_size, char));
	fail_if (xmalloc(multicast->headers, message.header_count, char*));
	for (i = 0; i < message.header_count; i++, multicast->header_count++)
		fail_if (xstrdup(multicast->headers[i], message.headers[i]));
done:
	pthread_mutex_unlock(&(modify_mutex));
	with_mutex (client->modify_mutex, pthread_cond_signal(&(client->modify_cond)););

	return 0;

fail:
	xperror(*argv);
	if (multicast) {
		mds_message_destroy(multicast);
		free(multicast);
		recipient->modify_message = NULL;
	}
	goto done;
}


/**
 * Add intercept conditions listed in the payload of a message
 * 
 * @param   client     The intercepting client
 * @param   modifying  Whether then client may modify the messages
 * @param   priority   The client's interception priority
 * @param   stop       Whether to stop listening rather than start or reconfigure
 * @return             Zero on success, -1 on error
 */
static int __attribute__((nonnull))
add_intercept_conditions_from_message(client_t *client, int modifying, int64_t priority, int stop)
{
	int saved_errno;
	char *payload = client->message.payload;
	size_t payload_size = client->message.payload_size;
	size_t size = 64, len;
	char *buf, *end, *old_buf;

	fail_if (xmalloc(buf, size + 1, char));

	/* All messages. */
	if (!client->message.payload_size) {
		*buf = '\0';
		add_intercept_condition(client, buf, priority, modifying, stop);
		goto done;
	}

	/* Filtered messages. */
	for (;;) {
		end = memchr(payload, '\n', payload_size);
		len = !end ? payload_size : (size_t)(end - payload);
		if (len == 0) {
			payload++;
			payload_size--;
			break;
		}
		if (len > size) {
			old_buf = buf;
			if (xrealloc(buf, (size <<= 1) + 1, char)) {
				saved_errno = errno;
				free(old_buf);
				pthread_mutex_unlock(&(client->mutex));
				fail_if (errno = saved_errno, 1);
			}
		}
		memcpy(buf, payload, len);
		buf[len] = '\0';
		add_intercept_condition(client, buf, priority, modifying, stop);
		if (!end)
			break;
		payload = end + 1;
		payload_size -= len + 1;
	}

done:
	free(buf);
	return 0;
fail:
	return -1;
}


/**
 * Assign and ID to a client, if not already assigned, and send it to that client
 * 
 * @param   client      The client to who an ID should be assigned
 * @param   message_id  The message ID of the ID request
 * @return              Zero on success, -1 on error
 */
static int __attribute__((nonnull(1)))
assign_and_send_id(client_t *client, const char *message_id)
{
	char *msgbuf = NULL;
	char *msgbuf_;
	char *msg_new;
	size_t n, new_len;
	int rc = -1;

	/* Construct response. */
	n = 2 * 10 + strlen(message_id);
	n += sizeof("ID assignment: :\nIn response to: \n\n") / sizeof(char);
	fail_if (xmalloc(msgbuf, n, char));
	snprintf(msgbuf, n,
	         "ID assignment: %" PRIu32 ":%" PRIu32 "\n"
	         "In response to: %s\n"
	         "\n",
	         (uint32_t)(client->id >> 32),
	         (uint32_t)(client->id >>  0),
	         !message_id ? "" : message_id);
	n = strlen(msgbuf);

	/* Multicast the reply. */
	fail_if (xstrdup(msgbuf_, msgbuf));
	queue_message_multicast(msgbuf_, n, client);

	/* Queue message to be sent when this function returns.
	   This done to simplify `multicast_message` for re-exec and termination. */
#define fail fail_in_mutex
	with_mutex (client->mutex,
	            if (!client->send_pending_size) {
	                    /* Set the pending message. */
	                    client->send_pending = msgbuf;
	                    client->send_pending_size = n;
	            } else {
	                    /* Concatenate message to already pending messages. */
	                    new_len = client->send_pending_size + n;
	                    msg_new = client->send_pending;
	                    fail_if (xrealloc(msg_new, new_len, char));
	                    memcpy(msg_new + client->send_pending_size, msgbuf, n * sizeof(char));
	                    client->send_pending = msg_new;
	                    client->send_pending_size = new_len;
	            }
	            (msgbuf = NULL, rc = 0, errno = 0);
	fail_in_mutex:
	           );
#undef fail
  
fail: /* Also success. */
	xperror(*argv);
	free(msgbuf);
	return rc;
}


/**
 * Perform actions that should be taken when
 * a message has been received from a client
 * 
 * @param   client  The client whom sent the message
 * @return          Normally zero, but 1 if exited because of re-exec or termination
 */
int
message_received(client_t *client)
{
	mds_message_t message = client->message;
	int assign_id = 0;
	int modifying = 0;
	int intercept = 0;
	int64_t priority = 0;
	int stop = 0;
	const char *message_id = NULL;
	uint64_t modify_id = 0;
	char *msgbuf = NULL;
	size_t i, n;
	const char *h;
	char buf[26];


	/* Parser headers. */
	for (i = 0; i < message.header_count; i++) {
		h = message.headers[i];
		if      (strequals(h,  "Command: assign-id")) assign_id  = 1;
		else if (strequals(h,  "Command: intercept")) intercept  = 1;
		else if (strequals(h,  "Modifying: yes"))     modifying  = 1;
		else if (strequals(h,  "Stop: yes"))          stop       = 1;
		else if (startswith(h, "Message ID: "))       message_id = strstr(h, ": ") + 2;
		else if (startswith(h, "Priority: "))         priority   = ato64(strstr(h, ": ") + 2);
		else if (startswith(h, "Modify ID: "))        modify_id  = atou64(strstr(h, ": ") + 2);
	}


	/* Notify waiting client about a received message modification. */
	if (modifying)
		return modifying_notify(client, message, modify_id);
	/* Do nothing more, not not even multicast this message. */


	if (!message_id) {
		eprint("received message without a message ID, ignoring.");
		return 0;
	}

	/* Assign ID if not already assigned. */
	if (assign_id && !client->id) {
		intercept |= 2;
		with_mutex_if (slave_mutex, !(client->id = next_client_id++),
		               eprint("this is impossible, ID counter has overflowed.");
		               /* If the program ran for a millennium it would
		                  take c:a 585 assignments per nanosecond. This
		                  cannot possibly happen. (It would require serious
		                  dedication by generations of ponies (or just an alicorn)
		                  to maintain the process and transfer it new hardware.) */
		               abort();
		              );
	}

	/* Make the client listen for messages addressed to it. */
	if (intercept) {
		pthread_mutex_lock(&(client->mutex));
		if ((intercept & 1)) /* from payload */
			fail_if (add_intercept_conditions_from_message(client, modifying, priority, stop) < 0);
		if ((intercept & 2)) { /* "To: $(client->id)" */
			xsnprintf(buf, "To: %" PRIu32 ":%" PRIu32,
			          (uint32_t)(client->id >> 32),
			          (uint32_t)(client->id >>  0));
			add_intercept_condition(client, buf, priority, modifying, 0);
		}
		pthread_mutex_unlock(&(client->mutex));
	}


	/* Multicast the message. */
	n = mds_message_compose_size(&message);
	fail_if (xbmalloc(msgbuf, n));
	mds_message_compose(&message, msgbuf);
	queue_message_multicast(msgbuf, n / sizeof(char), client);
	msgbuf = NULL;


	/* Send asigned ID. */
	if (assign_id)
		fail_if (assign_and_send_id(client, message_id) < 0);

	return 0;

fail:
	xperror(*argv);
	free(msgbuf);
	return 0;
}