diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/mds-server.c | 117 | ||||
| -rw-r--r-- | src/mds-server.h | 10 | 
2 files changed, 97 insertions, 30 deletions
| diff --git a/src/mds-server.c b/src/mds-server.c index 3416777..715b826 100644 --- a/src/mds-server.c +++ b/src/mds-server.c @@ -417,6 +417,7 @@ void* slave_loop(void* data)        /* NULL-out pointers. */        information->interception_conditions = NULL; +      information->send_pending = NULL;        /* Add to list of clients. */        pthread_mutex_lock(&slave_mutex); @@ -443,6 +444,7 @@ void* slave_loop(void* data)        information->open = 1;        information->id = 0;        information->interception_conditions_count = 0; +      information->send_pending_size = 0;        if (mds_message_initialise(&(information->message)))  	{  	  perror(*argv); @@ -471,6 +473,32 @@ void* slave_loop(void* data)    if (information->open)      while (reexecing == 0)        { +	/* Send queued messages. */ +	if (information->send_pending_size > 0) +	  { +	    char* sendbuf = information->send_pending; +	    char* sendbuf_ = sendbuf; +	    size_t sent; +	    n = information->send_pending_size; +	    information->send_pending_size = 0; +	    information->send_pending = NULL; +	    with_mutex(information->mutex, +		       while (n > 0) +			 { +			   sent = send_message(information->socket_fd, sendbuf_, n); +			   if ((sent < n) && (errno != EINTR)) /* Ignore EINTR */ +			     { +			       perror(*argv); +			       break; +			     } +			   n -= sent; +			   sendbuf_ += sent; +			 } +		       free(sendbuf); +		       ); +	  } +	 +	/* Fetch message.*/  	r = mds_message_read(&(information->message), socket_fd);  	if (r == 0)  	  message_received(information); @@ -533,6 +561,7 @@ void* slave_loop(void* data)        if (mutex_created)  	pthread_mutex_destroy(&(information->mutex));        mds_message_destroy(&(information->message)); +      free(information->send_pending);        free(information);      } @@ -687,15 +716,13 @@ void message_received(client_t* client)        return;      }    mds_message_marshal(&message, msgbuf, 0); -  multicast_message(msgbuf, n / sizeof(char)); +  multicast_message(msgbuf, n / sizeof(char)); /* TODO support re-exec */    free(msgbuf);    /* Send asigned ID. */    if (assign_id)      { -      size_t sent; -              /* Construct response. */        n = 2 * 10 + strlen(message_id) + 1;        n += strlen("ID assignment: :\nIn response to: \n\n"); @@ -714,23 +741,33 @@ void message_received(client_t* client)        n = strlen(msgbuf);        /* Multicast the reply. */ -      multicast_message(msgbuf, n); +      multicast_message(msgbuf, n); /* TODO support re-exec */ -      /* Send message. */ +      /* Queue message to be sent when this function returns. +         This done to simplify `multicast_message` for re-exec. */        with_mutex(client->mutex, -		 while (n > 0) +		 if (client->send_pending_size == 0) +		   { +		     /* Set the pending message. */ +		     client->send_pending = msgbuf; +		     client->send_pending_size = n; +		   } +		 else  		   { -		     sent = send_message(client->socket_fd, msgbuf, n); -		     if ((sent < n) && (errno != EINTR)) /* Ignore EINTR */ +		     /* Concatenate message to already pending messages. */ +		     size_t new_len = client->send_pending_size + n; +		     char* msg_new = realloc(client->send_pending, new_len * sizeof(char)); +		     if (msg_new != NULL)  		       { -			 perror(*argv); -			 break; +			 memcpy(msg_new + client->send_pending_size, msgbuf, n * sizeof(char)); +			 client->send_pending = msg_new; +			 client->send_pending_size = new_len;  		       } -		     n -= sent; -		     msgbuf += sent; +		     else +		       perror(*argv); +		     free(msgbuf);  		   }  		 ); -      free(msgbuf);      }  } @@ -1001,23 +1038,20 @@ void multicast_message(char* message, size_t length)        size_t sent;        n = length; -      /* Skip if the client has closed. */ -      if (client->open == 0) -	continue; -              /* Send the message. */        with_mutex(client->mutex, -		 while (n > 0) -		   { -		     sent = send_message(client->socket_fd, msg, n); -		     if ((sent < n) && (errno != EINTR)) /* TODO Support reexec */ -		       { -			 perror(*argv); -			 break; +		 if (client->open) +		   while (n > 0) +		     { +		       sent = send_message(client->socket_fd, msg, n); +		       if ((sent < n) && (errno != EINTR)) /* TODO Support reexec */ +			 { +			   perror(*argv); +			   break;  		       } -		     n -= sent; -		     msg += sent; -		   } +		       n -= sent; +		       msg += sent; +		     }  		 );        /* Wait for a reply. */ @@ -1174,13 +1208,14 @@ int marshal_server(int fd)        message = value->message;        msg_size += mds_message_marshal_size(&message, 1);        msg_size += n * (sizeof(size_t) + sizeof(int64_t) + sizeof(int)); +      msg_size += value->send_pending_size * sizeof(char);        for (j = 0; j < n; j++)  	msg_size += (strlen(value->interception_conditions[j].condition) + 1) * sizeof(char);      }    /* Calculate the grand size of all client information. */ -  state_n = 5 + sizeof(size_t) + 2 * sizeof(int) + 1 * sizeof(uint64_t); +  state_n = 6 + sizeof(size_t) + 2 * sizeof(int) + 1 * sizeof(uint64_t);    state_n *= list_elements;    state_n += msg_size; @@ -1224,6 +1259,13 @@ int marshal_server(int fd)        buf_set_next(state_buf_, int, value->socket_fd);        buf_set_next(state_buf_, int, value->open);        buf_set_next(state_buf_, uint64_t, value->id); +      /* Marshal the pending messages. */ +      buf_set_next(state_buf_, size_t, value->send_pending_size); +      if (value->send_pending_size > 0) +	{ +	  memcpy(state_buf_, value->send_pending, value->send_pending_size * sizeof(char)); +	  state_buf_ += value->send_pending_size; +	}        /* Marshal interception conditions. */        buf_set_next(state_buf_, size_t, n = value->interception_conditions_count);        for (j = 0; j < n; j++) @@ -1413,10 +1455,24 @@ int unmarshal_server(int fd)        buf_get_next(state_buf_, ssize_t, value->list_entry);        buf_get_next(state_buf_, int, value->socket_fd);        buf_get_next(state_buf_, int, value->open); -      buf_set_next(state_buf_, uint64_t, value->id); +      buf_get_next(state_buf_, uint64_t, value->id); +      /* Unmarshal the pending messages. */ +      buf_get_next(state_buf_, size_t, value->send_pending_size); +      if (value->send_pending_size > 0) +	{ +	  if (xmalloc(value->send_pending, value->send_pending_size, char)) +	    { +	      perror(*argv); +	      goto clients_fail; +	    } +	  memcpy(value->send_pending, state_buf_, value->send_pending_size * sizeof(char)); +	  state_buf_ += value->send_pending_size; +	} +      else +	value->send_pending = NULL;        /* Unmarshal interception conditions. */        buf_get_next(state_buf_, size_t, value->interception_conditions_count = n); -      seek = 5 * sizeof(size_t) + 2 * sizeof(int) + 1 * sizeof(uint64_t); +      seek = 6 * sizeof(size_t) + 2 * sizeof(int) + 1 * sizeof(uint64_t);        if (xmalloc(value->interception_conditions, n, interception_condition_t))  	{  	  perror(*argv); @@ -1463,6 +1519,7 @@ int unmarshal_server(int fd)  		free(value->interception_conditions[j].condition);  	      free(value->interception_conditions);  	    } +	  free(value->send_pending);  	  free(value);  	}        state_buf_ -= seek / sizeof(char); diff --git a/src/mds-server.h b/src/mds-server.h index 6269e9b..1e3869f 100644 --- a/src/mds-server.h +++ b/src/mds-server.h @@ -110,6 +110,16 @@ typedef struct client     */    size_t interception_conditions_count; +  /** +   * Messages pending to be sent (concatenated) +   */ +  char* send_pending; +   +  /** +   * The character length of the messages pending to be sent +   */ +  size_t send_pending_size; +    } client_t;  /** | 
