diff options
author | Mattias Andrée <maandree@kth.se> | 2016-07-13 13:29:39 +0200 |
---|---|---|
committer | Mattias Andrée <maandree@kth.se> | 2016-07-13 13:29:39 +0200 |
commit | 879492ae4e15c02beebaaa163a9fd9959807e752 (patch) | |
tree | c7fe0c943dbbbf55bb85fe0cf3b0e4da48024dd1 | |
parent | Add ring[.ch] (diff) | |
download | coopgammad-879492ae4e15c02beebaaa163a9fd9959807e752.tar.gz coopgammad-879492ae4e15c02beebaaa163a9fd9959807e752.tar.bz2 coopgammad-879492ae4e15c02beebaaa163a9fd9959807e752.tar.xz |
Handle outbound messages
Signed-off-by: Mattias Andrée <maandree@kth.se>
Diffstat (limited to '')
-rw-r--r-- | src/ring.h | 12 | ||||
-rw-r--r-- | src/server.c | 246 | ||||
-rw-r--r-- | src/server.h | 10 |
3 files changed, 219 insertions, 49 deletions
@@ -113,3 +113,15 @@ void* ring_peek(struct ring* this, size_t* n); */ void ring_pop(struct ring* this, size_t n); +/** + * Check whether there is more data waiting + * in a ring buffer + * + * @param this The ring buffer + * @return 1 if there is more data, 0 otherwise + */ +static inline int ring_have_more(struct ring* this) +{ + return this->buffer != NULL; +} + diff --git a/src/server.c b/src/server.c index 16a9dcf..6cce72b 100644 --- a/src/server.c +++ b/src/server.c @@ -55,9 +55,14 @@ size_t connections_ptr = 0; size_t connections_used = 0; /** - * The clients' connections' message buffers + * The clients' connections' inbound-message buffers */ -struct message* client_messages = NULL; +struct message* inbound = NULL; + +/** + * The clients' connections' outbound-message buffers + */ +struct ring* outbound = NULL; /** @@ -110,9 +115,11 @@ void server_destroy(int disconnect) shutdown(connections[i], SHUT_RDWR); close(connections[i]); } - message_destroy(client_messages + i); + message_destroy(inbound + i); + ring_destroy(outbound + i); } - free(client_messages); + free(inbound); + free(outbound); free(connections); } @@ -144,7 +151,10 @@ size_t server_marshal(void* buf) for (i = 0; i < connections_used; i++) if (connections[i] >= 0) - off += message_marshal(client_messages + i, bs ? bs + off : NULL); + { + off += message_marshal(inbound + i, bs ? bs + off : NULL); + off += ring_marshal(outbound + i, bs ? bs + off : NULL); + } return off; } @@ -162,7 +172,7 @@ size_t server_unmarshal(const void* buf) const char* bs = buf; connections = NULL; - client_messages = NULL; + inbound = NULL; connections_ptr = *(const size_t*)(bs + off); off += sizeof(size_t); @@ -177,15 +187,18 @@ size_t server_unmarshal(const void* buf) return 0; off += connections_used * sizeof(*connections); - client_messages = malloc(connections_alloc * sizeof(*client_messages)); - if (client_messages == NULL) + inbound = malloc(connections_alloc * sizeof(*inbound)); + if (inbound == NULL) return 0; } for (i = 0; i < connections_used; i++) if (connections[i] >= 0) { - off += n = message_unmarshal(client_messages + i, bs ? bs + off : NULL); + off += n = message_unmarshal(inbound + i, bs + off); + if (n == 0) + return 0; + off += n = ring_unmarshal(outbound + i, bs + off); if (n == 0) return 0; } @@ -257,12 +270,18 @@ static int handle_server(void) connections = new; connections[connections_ptr] = fd; - new = realloc(client_messages, (connections_alloc + 10) * sizeof(*client_messages)); + new = realloc(outbound, (connections_alloc + 10) * sizeof(*outbound)); if (new == NULL) goto fail; - client_messages = new; + outbound = new; + ring_initialise(outbound + connections_ptr); + + new = realloc(inbound, (connections_alloc + 10) * sizeof(*inbound)); + if (new == NULL) + goto fail; + inbound = new; connections_alloc += 10; - if (message_initialise(client_messages + connections_ptr)) + if (message_initialise(inbound + connections_ptr)) goto fail; } @@ -314,7 +333,8 @@ static int connection_closed(int client) output->table_sums[j] = output->table_sums[k]; } if (updated >= 0) - flush_filters(output, (size_t)updated); + if (flush_filters(output, (size_t)updated) < 0) + return -1; } return 0; @@ -322,11 +342,118 @@ static int connection_closed(int client) /** + * Send a message + * + * @param conn The index of the connection + * @param buf The data to send + * @param n The size of `buf` + * @return Zero on success, -1 on error, 1 if disconncted + * EINTR, EAGAIN, EWOULDBLOCK, and ECONNRESET count + * as success (ECONNRESET cause 1 to be returned), + * and are handled appropriately. + */ +static int send_message(size_t conn, char* buf, size_t n) +{ + struct ring* ring = outbound + conn; + int fd = connections[conn]; + int saved_errno; + size_t ptr = 0; + ssize_t sent; + size_t chunksize = n; + size_t sendsize; + size_t old_n; + char* old_buf; + + while ((old_buf = ring_peek(ring, &old_n))) + { + size_t old_ptr = 0; + while (old_ptr < n) + { + sendsize = old_n - old_ptr < chunksize ? old_n - old_ptr : chunksize; + sent = send(fd, old_buf + old_ptr, sendsize, 0); + if (sent < 0) + { + if (errno != EMSGSIZE) + goto fail; + chunksize >>= 1; + if (chunksize == 0) + goto fail; + continue; + } + old_ptr += (size_t)sent; + ring_pop(ring, (size_t)sent); + } + } + + while (ptr < n) + { + sendsize = n - ptr < chunksize ? n - ptr : chunksize; + sent = send(fd, buf + ptr, sendsize, 0); + if (sent < 0) + { + if (errno != EMSGSIZE) + goto fail; + chunksize >>= 1; + if (chunksize == 0) + goto fail; + continue; + } + ptr += (size_t)sent; + } + + free(buf); + return 0; + + fail: + switch (errno) + { + case EINTR: + case EAGAIN: +#if EAGAIN != EWOULDBLOCK + case EWOULDBLOCK: +#endif + if (ring_push(ring, buf + ptr, n - ptr) < 0) + goto proper_fail; + free(buf); + return 0; + case ECONNRESET: + free(buf); + if (connection_closed(fd) < 0) + return -1; + return 1; + default: + break; + } + proper_fail: + saved_errno = errno; + free(buf); + errno = saved_errno; + return -1; +} + + +/** + * Continue sending the queued messages + * + * @param conn The index of the connection + * @return Zero on success, -1 on error, 1 if disconncted + * EINTR, EAGAIN, EWOULDBLOCK, and ECONNRESET count + * as success (ECONNRESET cause 1 to be returned), + * and are handled appropriately. + */ +static inline int continue_send(size_t conn) +{ + return send_message(conn, NULL, 0); +} + + +/** * Handle a ‘Command: enumerate-crtcs’ message * * @param conn The index of the connection * @param message_id The value of the ‘Message ID’ header - * @return Zero on success (even if ignored), -1 on error + * @return Zero on success (even if ignored), -1 on error, + * 1 if connection closed */ static int enumerate_crtcs(size_t conn, char* message_id) { @@ -375,7 +502,8 @@ static int enumerate_crtcs(size_t conn, char* message_id) * @param conn The index of the connection * @param message_id The value of the ‘Message ID’ header * @param crtc The value of the ‘CRTC’ header - * @return Zero on success (even if ignored), -1 on error + * @return Zero on success (even if ignored), -1 on error, + * 1 if connection closed */ static int get_gamma_info(size_t conn, char* message_id, char* crtc) { @@ -475,7 +603,8 @@ static int get_gamma_info(size_t conn, char* message_id, char* crtc) * @param coalesce The value of the ‘Coalesce’ header * @param high_priority The value of the ‘High priority’ header * @param low_priority The value of the ‘Low priority’ header - * @return Zero on success (even if ignored), -1 on error + * @return Zero on success (even if ignored), -1 on error, + * 1 if connection closed */ static int get_gamma(size_t conn, char* message_id, char* crtc, char* coalesce, char* high_priority, char* low_priority) @@ -647,11 +776,12 @@ static int get_gamma(size_t conn, char* message_id, char* crtc, char* coalesce, * @param priority The value of the ‘Priority’ header * @param class The value of the ‘Class’ header * @param lifespan The value of the ‘Lifespan’ header - * @return Zero on success (even if ignored), -1 on error + * @return Zero on success (even if ignored), -1 on error, + * 1 if connection closed */ static int set_gamma(size_t conn, char* crtc, char* priority, char* class, char* lifespan) { - struct message* msg = client_messages + conn; + struct message* msg = inbound + conn; struct output* output = NULL; struct filter filter; char* p; @@ -746,7 +876,7 @@ static int set_gamma(size_t conn, char* crtc, char* priority, char* class, char* */ static int handle_connection(size_t conn) { - struct message* msg = client_messages + conn; + struct message* msg = inbound + conn; int r, fd = connections[conn]; char* command = NULL; char* crtc = NULL; @@ -787,7 +917,8 @@ static int handle_connection(size_t conn) if (conn == connections_used) connections_used -= 1; message_destroy(msg); - connection_closed(fd); + if (connection_closed(fd) < 0) + return -1; return 1; } @@ -836,8 +967,8 @@ static int handle_connection(size_t conn) } else fprintf(stderr, "%s: ignoring unrecognised command: Command: %s\n", argv0, command); - if (r < 0) - return -1; + if (r) + return r; goto again; } @@ -850,15 +981,21 @@ static int handle_connection(size_t conn) */ int main_loop(void) { - fd_set fds_orig, fds_read, fds_ex; + fd_set fds_orig, fds_rd, fds_wr, fds_ex; int i, r, update, fdn = update_fdset(&fds_orig); size_t j; while (!reexec && !terminate) { - memcpy(&fds_read, &fds_orig, sizeof(fd_set)); - memcpy(&fds_ex, &fds_orig, sizeof(fd_set)); - if (select(fdn, &fds_read, NULL, &fds_ex, NULL) < 0) + memcpy(&fds_rd, &fds_orig, sizeof(fd_set)); + memcpy(&fds_ex, &fds_orig, sizeof(fd_set)); + + FD_ZERO(&fds_wr); + for (j = 0; j < connections_used; j++) + if ((connections[j] >= 0) && ring_have_more(outbound + j)) + FD_SET(connections[j], &fds_wr); + + if (select(fdn, &fds_rd, &fds_wr, &fds_ex, NULL) < 0) { if (errno == EINTR) continue; @@ -867,28 +1004,43 @@ int main_loop(void) update = 0; for (i = 0; i < fdn; i++) - if (FD_ISSET(i, &fds_read) || FD_ISSET(i, &fds_ex)) - { - if (i == socketfd) - r = handle_server(); - else - { - for (j = 0;; j++) - if (connections[j] == i) + { + int do_read = FD_ISSET(i, &fds_rd) || FD_ISSET(i, &fds_ex); + int do_write = FD_ISSET(i, &fds_wr); + if (do_read || do_write) + { + if (i == socketfd) + r = handle_server(); + else + { + for (j = 0;; j++) + if (connections[j] == i) + break; + r = do_read ? handle_connection(j) : 0; + } + switch (r) + { + case 0: + break; + case 1: + update = 1; + break; + default: + return -1; + } + if (do_write) + switch (continue_send(j)) + { + case 0: break; - r = handle_connection(j); - } - switch (r) - { - case 0: - break; - case 1: - update = 1; - break; - default: - return -1; - } - } + case 1: + update = 1; + break; + default: + return -1; + } + } + } if (update) update_fdset(&fds_orig); } diff --git a/src/server.h b/src/server.h index 31b7914..e70df7a 100644 --- a/src/server.h +++ b/src/server.h @@ -16,6 +16,7 @@ * along with this library. If not, see <http://www.gnu.org/licenses/>. */ #include "message.h" +#include "ring.h" #include <stddef.h> @@ -45,9 +46,14 @@ extern size_t connections_ptr; extern size_t connections_used; /** - * The clients' connections' message buffers + * The clients' connections' inbound-message buffers */ -extern struct message* client_messages; +extern struct message* inbound; + +/** + * The clients' connections' outbound-message buffers + */ +extern struct ring* outbound; |