aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMattias Andrée <maandree@kth.se>2016-07-13 13:29:39 +0200
committerMattias Andrée <maandree@kth.se>2016-07-13 13:29:39 +0200
commit879492ae4e15c02beebaaa163a9fd9959807e752 (patch)
treec7fe0c943dbbbf55bb85fe0cf3b0e4da48024dd1 /src
parentAdd ring[.ch] (diff)
downloadcoopgammad-879492ae4e15c02beebaaa163a9fd9959807e752.tar.gz
coopgammad-879492ae4e15c02beebaaa163a9fd9959807e752.tar.bz2
coopgammad-879492ae4e15c02beebaaa163a9fd9959807e752.tar.xz
Handle outbound messages
Signed-off-by: Mattias Andrée <maandree@kth.se>
Diffstat (limited to '')
-rw-r--r--src/ring.h12
-rw-r--r--src/server.c246
-rw-r--r--src/server.h10
3 files changed, 219 insertions, 49 deletions
diff --git a/src/ring.h b/src/ring.h
index bf22226..7281d46 100644
--- a/src/ring.h
+++ b/src/ring.h
@@ -113,3 +113,15 @@ void* ring_peek(struct ring* this, size_t* n);
*/
void ring_pop(struct ring* this, size_t n);
+/**
+ * Check whether there is more data waiting
+ * in a ring buffer
+ *
+ * @param this The ring buffer
+ * @return 1 if there is more data, 0 otherwise
+ */
+static inline int ring_have_more(struct ring* this)
+{
+ return this->buffer != NULL;
+}
+
diff --git a/src/server.c b/src/server.c
index 16a9dcf..6cce72b 100644
--- a/src/server.c
+++ b/src/server.c
@@ -55,9 +55,14 @@ size_t connections_ptr = 0;
size_t connections_used = 0;
/**
- * The clients' connections' message buffers
+ * The clients' connections' inbound-message buffers
*/
-struct message* client_messages = NULL;
+struct message* inbound = NULL;
+
+/**
+ * The clients' connections' outbound-message buffers
+ */
+struct ring* outbound = NULL;
/**
@@ -110,9 +115,11 @@ void server_destroy(int disconnect)
shutdown(connections[i], SHUT_RDWR);
close(connections[i]);
}
- message_destroy(client_messages + i);
+ message_destroy(inbound + i);
+ ring_destroy(outbound + i);
}
- free(client_messages);
+ free(inbound);
+ free(outbound);
free(connections);
}
@@ -144,7 +151,10 @@ size_t server_marshal(void* buf)
for (i = 0; i < connections_used; i++)
if (connections[i] >= 0)
- off += message_marshal(client_messages + i, bs ? bs + off : NULL);
+ {
+ off += message_marshal(inbound + i, bs ? bs + off : NULL);
+ off += ring_marshal(outbound + i, bs ? bs + off : NULL);
+ }
return off;
}
@@ -162,7 +172,7 @@ size_t server_unmarshal(const void* buf)
const char* bs = buf;
connections = NULL;
- client_messages = NULL;
+ inbound = NULL;
connections_ptr = *(const size_t*)(bs + off);
off += sizeof(size_t);
@@ -177,15 +187,18 @@ size_t server_unmarshal(const void* buf)
return 0;
off += connections_used * sizeof(*connections);
- client_messages = malloc(connections_alloc * sizeof(*client_messages));
- if (client_messages == NULL)
+ inbound = malloc(connections_alloc * sizeof(*inbound));
+ if (inbound == NULL)
return 0;
}
for (i = 0; i < connections_used; i++)
if (connections[i] >= 0)
{
- off += n = message_unmarshal(client_messages + i, bs ? bs + off : NULL);
+ off += n = message_unmarshal(inbound + i, bs + off);
+ if (n == 0)
+ return 0;
+ off += n = ring_unmarshal(outbound + i, bs + off);
if (n == 0)
return 0;
}
@@ -257,12 +270,18 @@ static int handle_server(void)
connections = new;
connections[connections_ptr] = fd;
- new = realloc(client_messages, (connections_alloc + 10) * sizeof(*client_messages));
+ new = realloc(outbound, (connections_alloc + 10) * sizeof(*outbound));
if (new == NULL)
goto fail;
- client_messages = new;
+ outbound = new;
+ ring_initialise(outbound + connections_ptr);
+
+ new = realloc(inbound, (connections_alloc + 10) * sizeof(*inbound));
+ if (new == NULL)
+ goto fail;
+ inbound = new;
connections_alloc += 10;
- if (message_initialise(client_messages + connections_ptr))
+ if (message_initialise(inbound + connections_ptr))
goto fail;
}
@@ -314,7 +333,8 @@ static int connection_closed(int client)
output->table_sums[j] = output->table_sums[k];
}
if (updated >= 0)
- flush_filters(output, (size_t)updated);
+ if (flush_filters(output, (size_t)updated) < 0)
+ return -1;
}
return 0;
@@ -322,11 +342,118 @@ static int connection_closed(int client)
/**
+ * Send a message
+ *
+ * @param conn The index of the connection
+ * @param buf The data to send
+ * @param n The size of `buf`
+ * @return Zero on success, -1 on error, 1 if disconncted
+ * EINTR, EAGAIN, EWOULDBLOCK, and ECONNRESET count
+ * as success (ECONNRESET cause 1 to be returned),
+ * and are handled appropriately.
+ */
+static int send_message(size_t conn, char* buf, size_t n)
+{
+ struct ring* ring = outbound + conn;
+ int fd = connections[conn];
+ int saved_errno;
+ size_t ptr = 0;
+ ssize_t sent;
+ size_t chunksize = n;
+ size_t sendsize;
+ size_t old_n;
+ char* old_buf;
+
+ while ((old_buf = ring_peek(ring, &old_n)))
+ {
+ size_t old_ptr = 0;
+ while (old_ptr < n)
+ {
+ sendsize = old_n - old_ptr < chunksize ? old_n - old_ptr : chunksize;
+ sent = send(fd, old_buf + old_ptr, sendsize, 0);
+ if (sent < 0)
+ {
+ if (errno != EMSGSIZE)
+ goto fail;
+ chunksize >>= 1;
+ if (chunksize == 0)
+ goto fail;
+ continue;
+ }
+ old_ptr += (size_t)sent;
+ ring_pop(ring, (size_t)sent);
+ }
+ }
+
+ while (ptr < n)
+ {
+ sendsize = n - ptr < chunksize ? n - ptr : chunksize;
+ sent = send(fd, buf + ptr, sendsize, 0);
+ if (sent < 0)
+ {
+ if (errno != EMSGSIZE)
+ goto fail;
+ chunksize >>= 1;
+ if (chunksize == 0)
+ goto fail;
+ continue;
+ }
+ ptr += (size_t)sent;
+ }
+
+ free(buf);
+ return 0;
+
+ fail:
+ switch (errno)
+ {
+ case EINTR:
+ case EAGAIN:
+#if EAGAIN != EWOULDBLOCK
+ case EWOULDBLOCK:
+#endif
+ if (ring_push(ring, buf + ptr, n - ptr) < 0)
+ goto proper_fail;
+ free(buf);
+ return 0;
+ case ECONNRESET:
+ free(buf);
+ if (connection_closed(fd) < 0)
+ return -1;
+ return 1;
+ default:
+ break;
+ }
+ proper_fail:
+ saved_errno = errno;
+ free(buf);
+ errno = saved_errno;
+ return -1;
+}
+
+
+/**
+ * Continue sending the queued messages
+ *
+ * @param conn The index of the connection
+ * @return Zero on success, -1 on error, 1 if disconncted
+ * EINTR, EAGAIN, EWOULDBLOCK, and ECONNRESET count
+ * as success (ECONNRESET cause 1 to be returned),
+ * and are handled appropriately.
+ */
+static inline int continue_send(size_t conn)
+{
+ return send_message(conn, NULL, 0);
+}
+
+
+/**
* Handle a ‘Command: enumerate-crtcs’ message
*
* @param conn The index of the connection
* @param message_id The value of the ‘Message ID’ header
- * @return Zero on success (even if ignored), -1 on error
+ * @return Zero on success (even if ignored), -1 on error,
+ * 1 if connection closed
*/
static int enumerate_crtcs(size_t conn, char* message_id)
{
@@ -375,7 +502,8 @@ static int enumerate_crtcs(size_t conn, char* message_id)
* @param conn The index of the connection
* @param message_id The value of the ‘Message ID’ header
* @param crtc The value of the ‘CRTC’ header
- * @return Zero on success (even if ignored), -1 on error
+ * @return Zero on success (even if ignored), -1 on error,
+ * 1 if connection closed
*/
static int get_gamma_info(size_t conn, char* message_id, char* crtc)
{
@@ -475,7 +603,8 @@ static int get_gamma_info(size_t conn, char* message_id, char* crtc)
* @param coalesce The value of the ‘Coalesce’ header
* @param high_priority The value of the ‘High priority’ header
* @param low_priority The value of the ‘Low priority’ header
- * @return Zero on success (even if ignored), -1 on error
+ * @return Zero on success (even if ignored), -1 on error,
+ * 1 if connection closed
*/
static int get_gamma(size_t conn, char* message_id, char* crtc, char* coalesce,
char* high_priority, char* low_priority)
@@ -647,11 +776,12 @@ static int get_gamma(size_t conn, char* message_id, char* crtc, char* coalesce,
* @param priority The value of the ‘Priority’ header
* @param class The value of the ‘Class’ header
* @param lifespan The value of the ‘Lifespan’ header
- * @return Zero on success (even if ignored), -1 on error
+ * @return Zero on success (even if ignored), -1 on error,
+ * 1 if connection closed
*/
static int set_gamma(size_t conn, char* crtc, char* priority, char* class, char* lifespan)
{
- struct message* msg = client_messages + conn;
+ struct message* msg = inbound + conn;
struct output* output = NULL;
struct filter filter;
char* p;
@@ -746,7 +876,7 @@ static int set_gamma(size_t conn, char* crtc, char* priority, char* class, char*
*/
static int handle_connection(size_t conn)
{
- struct message* msg = client_messages + conn;
+ struct message* msg = inbound + conn;
int r, fd = connections[conn];
char* command = NULL;
char* crtc = NULL;
@@ -787,7 +917,8 @@ static int handle_connection(size_t conn)
if (conn == connections_used)
connections_used -= 1;
message_destroy(msg);
- connection_closed(fd);
+ if (connection_closed(fd) < 0)
+ return -1;
return 1;
}
@@ -836,8 +967,8 @@ static int handle_connection(size_t conn)
}
else
fprintf(stderr, "%s: ignoring unrecognised command: Command: %s\n", argv0, command);
- if (r < 0)
- return -1;
+ if (r)
+ return r;
goto again;
}
@@ -850,15 +981,21 @@ static int handle_connection(size_t conn)
*/
int main_loop(void)
{
- fd_set fds_orig, fds_read, fds_ex;
+ fd_set fds_orig, fds_rd, fds_wr, fds_ex;
int i, r, update, fdn = update_fdset(&fds_orig);
size_t j;
while (!reexec && !terminate)
{
- memcpy(&fds_read, &fds_orig, sizeof(fd_set));
- memcpy(&fds_ex, &fds_orig, sizeof(fd_set));
- if (select(fdn, &fds_read, NULL, &fds_ex, NULL) < 0)
+ memcpy(&fds_rd, &fds_orig, sizeof(fd_set));
+ memcpy(&fds_ex, &fds_orig, sizeof(fd_set));
+
+ FD_ZERO(&fds_wr);
+ for (j = 0; j < connections_used; j++)
+ if ((connections[j] >= 0) && ring_have_more(outbound + j))
+ FD_SET(connections[j], &fds_wr);
+
+ if (select(fdn, &fds_rd, &fds_wr, &fds_ex, NULL) < 0)
{
if (errno == EINTR)
continue;
@@ -867,28 +1004,43 @@ int main_loop(void)
update = 0;
for (i = 0; i < fdn; i++)
- if (FD_ISSET(i, &fds_read) || FD_ISSET(i, &fds_ex))
- {
- if (i == socketfd)
- r = handle_server();
- else
- {
- for (j = 0;; j++)
- if (connections[j] == i)
+ {
+ int do_read = FD_ISSET(i, &fds_rd) || FD_ISSET(i, &fds_ex);
+ int do_write = FD_ISSET(i, &fds_wr);
+ if (do_read || do_write)
+ {
+ if (i == socketfd)
+ r = handle_server();
+ else
+ {
+ for (j = 0;; j++)
+ if (connections[j] == i)
+ break;
+ r = do_read ? handle_connection(j) : 0;
+ }
+ switch (r)
+ {
+ case 0:
+ break;
+ case 1:
+ update = 1;
+ break;
+ default:
+ return -1;
+ }
+ if (do_write)
+ switch (continue_send(j))
+ {
+ case 0:
break;
- r = handle_connection(j);
- }
- switch (r)
- {
- case 0:
- break;
- case 1:
- update = 1;
- break;
- default:
- return -1;
- }
- }
+ case 1:
+ update = 1;
+ break;
+ default:
+ return -1;
+ }
+ }
+ }
if (update)
update_fdset(&fds_orig);
}
diff --git a/src/server.h b/src/server.h
index 31b7914..e70df7a 100644
--- a/src/server.h
+++ b/src/server.h
@@ -16,6 +16,7 @@
* along with this library. If not, see <http://www.gnu.org/licenses/>.
*/
#include "message.h"
+#include "ring.h"
#include <stddef.h>
@@ -45,9 +46,14 @@ extern size_t connections_ptr;
extern size_t connections_used;
/**
- * The clients' connections' message buffers
+ * The clients' connections' inbound-message buffers
*/
-extern struct message* client_messages;
+extern struct message* inbound;
+
+/**
+ * The clients' connections' outbound-message buffers
+ */
+extern struct ring* outbound;