diff options
| author | Mattias Andrée <maandree@operamail.com> | 2014-05-11 20:38:35 +0200 | 
|---|---|---|
| committer | Mattias Andrée <maandree@operamail.com> | 2014-05-11 20:38:35 +0200 | 
| commit | 4ccb5bfda23550315a89d625b99b3cc48c3e42fa (patch) | |
| tree | e8ff3675874c8b9bb01d3ad8036dff743ae3bb2a | |
| parent | m doc (diff) | |
| download | mds-4ccb5bfda23550315a89d625b99b3cc48c3e42fa.tar.gz mds-4ccb5bfda23550315a89d625b99b3cc48c3e42fa.tar.bz2 mds-4ccb5bfda23550315a89d625b99b3cc48c3e42fa.tar.xz  | |
receive modifications
Signed-off-by: Mattias Andrée <maandree@operamail.com>
Diffstat (limited to '')
| -rw-r--r-- | src/mds-server/client.c | 6 | ||||
| -rw-r--r-- | src/mds-server/client.h | 20 | ||||
| -rw-r--r-- | src/mds-server/mds-server.c | 129 | 
3 files changed, 134 insertions, 21 deletions
diff --git a/src/mds-server/client.c b/src/mds-server/client.c index 6c46c90..3c8fb2d 100644 --- a/src/mds-server/client.c +++ b/src/mds-server/client.c @@ -57,6 +57,10 @@ void client_destroy(client_t* restrict this)        mds_message_destroy(this->modify_message);        free(this->modify_message);      } +  if (this->modify_mutex_created) +    pthread_mutex_destroy(&(this->modify_mutex)); +  if (this->modify_cond_created) +    pthread_cond_destroy(&(this->modify_cond));    free(this);  } @@ -132,6 +136,8 @@ size_t client_unmarshal(client_t* restrict this, char* restrict data)    this->multicasts = NULL;    this->send_pending = NULL;    this->mutex_created = 0; +  this->modify_mutex_created = 0; +  this->modify_cond_created = 0;    this->multicasts_count = 0;    buf_get_next(data, ssize_t, this->list_entry);    buf_get_next(data, int, this->socket_fd); diff --git a/src/mds-server/client.h b/src/mds-server/client.h index 972ff84..2aeb813 100644 --- a/src/mds-server/client.h +++ b/src/mds-server/client.h @@ -112,6 +112,26 @@ typedef struct client     */    struct mds_message* modify_message; +  /** +   * Mutex for `modify_message`  +   */ +  pthread_mutex_t modify_mutex; +   +  /** +   * Condidition for `modify_message`  +   */ +  pthread_cond_t modify_cond; +   +  /** +   * Whether `modify_mutex` has been initialised +   */ +  int modify_mutex_created; +   +  /** +   * Whether `modify_cond` has been initialised +   */ +  int modify_cond_created; +    } client_t; diff --git a/src/mds-server/mds-server.c b/src/mds-server/mds-server.c index 8fbcd44..0c8ba92 100644 --- a/src/mds-server/mds-server.c +++ b/src/mds-server/mds-server.c @@ -493,6 +493,8 @@ void* slave_loop(void* data)        information->send_pending = NULL;        information->modify_message = NULL;        information->mutex_created = 0; +      information->modify_mutex_created = 0; +      information->modify_cond_created = 0;        /* Add to list of clients. */        pthread_mutex_lock(&slave_mutex); @@ -540,6 +542,20 @@ void* slave_loop(void* data)      }    information->mutex_created = 1; +  /* Create mutex and codition for multicast interception replies. */ +  if ((errno = pthread_mutex_init(&(information->modify_mutex), NULL)) != 0) +    { +      perror(*argv); +      goto fail; +    } +  information->modify_mutex_created = 1; +  if ((errno = pthread_cond_init(&(information->modify_cond), NULL)) != 0) +    { +      perror(*argv); +      goto fail; +    } +  information->modify_cond_created = 1; +      /* Make the server update without all slaves dying on SIGUSR1. */    if (xsigaction(SIGUSR1, sigusr1_trap) < 0) @@ -683,7 +699,7 @@ void* slave_loop(void* data)   *    * @param  client  The client has sent a message   */ -void message_received(client_t* client) /* TODO Modify ID */ +void message_received(client_t* client)  {    mds_message_t message = client->message;    int assign_id = 0; @@ -692,6 +708,7 @@ void message_received(client_t* client) /* TODO Modify ID */    int64_t priority = 0;    int stop = 0;    const char* message_id = NULL; +  uint64_t modify_id = 0;    size_t i, n;    char* msgbuf; @@ -706,9 +723,71 @@ void message_received(client_t* client) /* TODO Modify ID */        else if (strequals(h,  "Stop: yes"))           stop       = 1;        else if (startswith(h, "Message ID: "))        message_id = strstr(h, ": ") + 2;        else if (startswith(h, "Priority: "))          priority   = atoll(strstr(h, ": ") + 2); +      else if (startswith(h, "Modify ID: "))         modify_id  = (uint64_t)atoll(strstr(h, ": ") + 2); +    } +   +   +  /* Notify waiting client about a received message modification. */ +  if (modifying != 0) +    { +      /* pthread_cond_timedwait is required to handle re-exec because +	 pthread_cond_timedwait and pthread_cond_wait ignore interruptions via signals. */ +      struct timespec timeout = +	{ +	  .tv_sec = 1, +	  .tv_nsec = 0 +	}; +      size_t address; +      client_t* recipient; +      mds_message_t* multicast; +       +      with_mutex(modify_mutex, +		 while (hash_table_contains_key(&modify_map, (size_t)modify_id) == 0) +		   { +		     /* TODO support re-exec */ +		     pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout); +		   } +		 address = hash_table_get(&modify_map, (size_t)modify_id); +		 recipient = (client_t*)(void*)address; +		 if (xmalloc(multicast = recipient->modify_message, 1, mds_message_t)) +		   goto fail; +		 multicast->headers = NULL; +		 multicast->header_count = 0; +		 multicast->payload = NULL; +		 multicast->payload_size = 0; +		 multicast->payload_ptr = 0; +		 multicast->buffer = NULL; +		 multicast->buffer_size = 0; +		 multicast->buffer_ptr = 0; +		 multicast->stage = 0; +		 if (xmalloc(multicast->payload, message.payload_size, char)) +		   goto fail; +		 memcpy(multicast->payload, message.payload, message.payload_size * sizeof(char)); +		 if (xmalloc(multicast->headers, message.header_count, char*)) +		   goto fail; +		 for (i = 0; i < message.header_count; i++, multicast->header_count++) +		   { +		     multicast->headers[i] = strdup(message.headers[i]); +		     if (multicast->headers[i] == NULL) +		       goto fail; +		   } +		 goto done; +	       fail: +		 if (multicast != NULL) +		   { +		     mds_message_destroy(multicast); +		     free(multicast); +		     recipient->modify_message = NULL; +		   } +	       done: +		 ); +      with_mutex(client->modify_mutex, pthread_cond_signal(&(client->modify_cond));); +       +      /* Do nothing more, not not even multicast this message. */ +      return;      } -  /* Ignore message if not labelled with a message ID. */ +      if (message_id == NULL)      {        eprint("received message with a message ID, ignoring."); @@ -1292,6 +1371,11 @@ void multicast_message(multicast_t* multicast)  			     hash_table_put(&modify_map, (size_t)modify_id, (size_t)(void*)client);  			     pthread_cond_signal(&slave_cond);  			   } +		       } +		     ); +	  with_mutex(client->modify_mutex, +		     if (client->modify_message == NULL) +		       {  			 for (;;)  			   {  			     pthread_cond_timedwait(&slave_cond, &slave_mutex, &timeout); @@ -1306,29 +1390,32 @@ void multicast_message(multicast_t* multicast)  	    return;  	  /* Act upon the reply. */ -	  mod = client->modify_message; -	  for (i = 0; i < mod->header_count; i++) -	    if (!strcmp(mod->headers[i], "Modify: yes")) -	      { -		modifying = 1; -		break; -	      } -	  if (modifying) +	  if (client->modify_message != NULL)  	    { -	      old_buf = multicast->message; -	      n = mod->payload_size; -	      multicast->message = realloc(old_buf, (multicast->message_prefix + n) * sizeof(char)); -	      if (multicast->message == NULL) +	      mod = client->modify_message; +	      for (i = 0; i < mod->header_count; i++) +		if (!strcmp(mod->headers[i], "Modify: yes")) +		  { +		    modifying = 1; +		    break; +		  } +	      if (modifying)  		{ -		  perror(*argv); -		  multicast->message = old_buf; +		  old_buf = multicast->message; +		  n = mod->payload_size; +		  multicast->message = realloc(old_buf, (multicast->message_prefix + n) * sizeof(char)); +		  if (multicast->message == NULL) +		    { +		      perror(*argv); +		      multicast->message = old_buf; +		    } +		  else +		    memcpy(multicast->message + multicast->message_prefix, mod->payload, n);  		} -	      else -		memcpy(multicast->message + multicast->message_prefix, mod->payload, n); +	       +	      /* Free the reply. */ +	      mds_message_destroy(client->modify_message);  	    } -	   -	  /* Free the reply. */ -	  mds_message_destroy(client->modify_message);  	}        /* Reset how much of the message has been sent before we continue with next recipient. */  | 
