diff options
author | Mattias Andrée <maandree@kth.se> | 2024-05-05 10:24:40 +0200 |
---|---|---|
committer | Mattias Andrée <maandree@kth.se> | 2024-05-05 10:24:40 +0200 |
commit | 3dfd6c11ed5599ab8baf4a6114f4ccb34150de6e (patch) | |
tree | b7717583b99f29028c85c3423cf43b104dfa8943 /libexec_run_pipeline.c | |
download | libexec-3dfd6c11ed5599ab8baf4a6114f4ccb34150de6e.tar.gz libexec-3dfd6c11ed5599ab8baf4a6114f4ccb34150de6e.tar.bz2 libexec-3dfd6c11ed5599ab8baf4a6114f4ccb34150de6e.tar.xz |
First commit
Signed-off-by: Mattias Andrée <maandree@kth.se>
Diffstat (limited to 'libexec_run_pipeline.c')
-rw-r--r-- | libexec_run_pipeline.c | 343 |
1 files changed, 343 insertions, 0 deletions
diff --git a/libexec_run_pipeline.c b/libexec_run_pipeline.c new file mode 100644 index 0000000..616f172 --- /dev/null +++ b/libexec_run_pipeline.c @@ -0,0 +1,343 @@ +/* See LICENSE file for copyright and license details. */ +#include "common.h" +#ifndef TEST + + +struct reap_context { + pid_t *pids; + size_t npids; + size_t procs_rem; + int *exit_statuses_out; /* nullable */ +}; + + +struct reaped { + pid_t pid; + int status; +}; + + +static struct reaped *reaped_elsewhere = NULL; +static size_t nreaped_elsewhere = 0; +static size_t reaped_elsewhere_size = 0; + + +static void +sigchld_handler(int signo) +{ + (void) signo; +} + + +static int +unlist_reap(pid_t pid, int *status) +{ + size_t i; + + for (i = 0; i < nreaped_elsewhere; i++) + if (reaped_elsewhere[i].pid == pid) + goto found; + return 0; + +found: + *status = reaped_elsewhere[i].status; + reaped_elsewhere[i] = reaped_elsewhere[--nreaped_elsewhere]; + + if (nreaped_elsewhere <= reaped_elsewhere_size / 4) { + free(reaped_elsewhere); + reaped_elsewhere = NULL; + nreaped_elsewhere = 0; + reaped_elsewhere_size = 0; + } + + return 1; +} + + +static int +reap_child(struct reap_context *ctx, pid_t pid) +{ + size_t i; + int status; + + if (!ctx->procs_rem) + return 0; + + for (i = 0; i < ctx->npids; i++) { + if (ctx->pids[i] == pid) { + if (waitpid(pid, &status, WNOHANG) != pid) + return -1; + if (ctx->exit_statuses_out) + ctx->exit_statuses_out[i] = status; + ctx->procs_rem -= 1; + ctx->pids[i] = -1; + while (ctx->npids && ctx->pids[ctx->npids - 1] == -1) + ctx->npids -= 1; + return 1; + } + } + + return 0; +} + + +static int +reap_children(struct reap_context *reapctx, int flags, + int (*reap_mutex_control)(int action /* -1 = acquire, +1 = release */, void *user4), void *user4, + int (*on_alien_child_death)(pid_t pid, void *user2), void *user2) +{ + siginfo_t info; + int r, status; + void *new; + size_t new_size; + size_t i; + + if (reap_mutex_control) + if ((*reap_mutex_control)(-1, user4)) + goto fail; + + if (nreaped_elsewhere) { + for (i = 0; i < reapctx->npids; i++) { + if (unlist_reap(reapctx->pids[i], &status)) { + if (reapctx->exit_statuses_out) + reapctx->exit_statuses_out[i] = status; + reapctx->procs_rem -= 1; + reapctx->pids[i] = -1; + } + } + while (reapctx->npids && reapctx->pids[reapctx->npids - 1] == -1) + reapctx->npids -= 1; + } + + for (;;) { + info.si_pid = 0; + if (waitid(P_ALL, 0, &info, flags | WNOWAIT)) + goto fail; + if (info.si_pid < 1) + break; + + r = reap_child(reapctx, info.si_pid); + if (r < 0) + goto fail; + if (r > 0) + continue; + + if (on_alien_child_death) { + r = (*on_alien_child_death)(info.si_pid, user2); + if (r < 0) + goto fail; + if (r > 0) + continue; + } + + if (nreaped_elsewhere == reaped_elsewhere_size) { + new_size = reaped_elsewhere_size + 4; + new = realloc(reaped_elsewhere, new_size * sizeof(*reaped_elsewhere)); + if (!new) + goto fail; + reaped_elsewhere = new; + reaped_elsewhere_size = new_size; + } + + if (waitpid(info.si_pid, &status, WNOHANG) != info.si_pid) { + if (!nreaped_elsewhere) { + free(reaped_elsewhere); + reaped_elsewhere = NULL; + reaped_elsewhere_size = 0; + } + goto fail; + } + + reaped_elsewhere[nreaped_elsewhere].pid = info.si_pid; + reaped_elsewhere[nreaped_elsewhere].status = status; + nreaped_elsewhere++; + } + + if (reap_mutex_control) + if ((*reap_mutex_control)(+1, user4)) + goto fail; + return 0; + +fail: + if (reap_mutex_control) + if ((*reap_mutex_control)(+1, user4)) + goto fail; + return -1; +} + + +int +libexec_run_pipeline(int (*on_alien_epoll)(int alien_epoll, uint32_t events, void *user1), int alien_epoll, void *user1, + int (*on_alien_child_death)(pid_t pid, void *user2), void *user2, + int (*after_fork)(struct libexec_command *cmd, int new_fd, void *user3), void *user3, + int (*reap_mutex_control)(int action /* -1 = acquire, +1 = release */, void *user4), void *user4, + int (*on_interrupt)(void *user5), void *user5, + const sigset_t *sigmask /* nullable */, + struct libexec_document *const *docs, size_t ndocs, int docs_1_level, + struct libexec_command *const *cmds, int *exit_statuses_out /* nullable */, size_t ncmds) +{ + struct libexec_document *input_docs = NULL; + size_t i, ninput_docs = 0, files_open; + struct epoll_event evs[8]; + int epfd, n, sigchld_set = 0, error, r; + struct libexec_document *doc; + struct sigaction sigchld, old_sigchld; + struct reap_context reapctx = { + .pids = NULL, + .npids = 0, + .procs_rem = 0, + .exit_statuses_out = exit_statuses_out + }; + + if (!ncmds) + return 0; + + memset(evs, 0, sizeof(evs)); + + epfd = epoll_create1(EPOLL_CLOEXEC); + if (epfd < 0) + return -1; + + if (sigaction(SIGCHLD, NULL, &old_sigchld)) + goto fail; + if (!(old_sigchld.sa_flags & SA_SIGINFO)) { + if (old_sigchld.sa_handler == SIG_IGN || old_sigchld.sa_handler == SIG_DFL) { + memset(&sigchld, 0, sizeof(sigchld)); + sigchld.sa_handler = &sigchld_handler; + sigemptyset(&sigchld.sa_mask); + if (sigaction(SIGCHLD, &sigchld, NULL)) + goto fail; + sigchld_set = 1; + } + } + + if (alien_epoll >= 0) { + evs[0].events = EPOLLIN; + evs[0].data.ptr = NULL; + if (epoll_ctl(epfd, EPOLL_CTL_ADD, alien_epoll, &evs[0])) + goto fail; + } + + reapctx.pids = calloc(ncmds, sizeof(*reapctx.pids)); + if (!reapctx.pids) + goto fail; + + for (i = 0; i < ncmds; i++) + if (libexec_spawn(&reapctx.pids[reapctx.npids++ /* may be set on failure */], cmds[i], + after_fork, user3, &input_docs, &ninput_docs, O_CLOEXEC | O_NONBLOCK)) + goto fail; + + for (i = 0; i < ninput_docs; i++) { + evs[0].events = EPOLLOUT; + evs[0].data.ptr = &input_docs[i]; + if (epoll_ctl(epfd, EPOLL_CTL_ADD, input_docs[i].fd, &evs[0])) + goto fail; + } + + for (i = 0; i < ndocs; i++) { + doc = docs_1_level ? &(*docs)[i] : docs[i]; + evs[0].events = EPOLLIN; + evs[0].data.ptr = doc; + doc->user = 1; + if (epoll_ctl(epfd, EPOLL_CTL_ADD, doc->fd, &evs[0])) + goto fail; + } + + reapctx.procs_rem = ncmds; + + files_open = ninput_docs + ndocs + (size_t)(alien_epoll >= 0); + while (files_open) { + if (reap_children(&reapctx, WNOHANG, reap_mutex_control, user4, on_alien_child_death, user2)) + goto fail; + + n = epoll_pwait(epfd, evs, sizeof(evs) / sizeof(*evs), -1, sigmask); + if (n < 0) { + if (errno != EINTR) + goto fail; + if (on_interrupt) + if ((*on_interrupt)(user5)) + goto fail; + continue; + } + + for (i = 0; i < (size_t)n; i++) { + if (evs[i].data.ptr == NULL) { + r = (*on_alien_epoll)(alien_epoll, evs[i].events, user1); + if (r < 0) { + goto fail; + } else if (r > 0) { + if (epoll_ctl(epfd, EPOLL_CTL_DEL, alien_epoll, &evs[i])) + goto fail; + files_open -= 1; + } + } else { + doc = evs[i].data.ptr; + send_recv_again: + if (doc->user ? libexec_recv_document(doc) : libexec_send_document(doc)) { + if (errno == EAGAIN) + continue; + if (errno == EINTR) { + if (on_interrupt) + if ((*on_interrupt)(user5)) + goto fail; + goto send_recv_again; + } + } else { + files_open -= 1; + if (doc->user) { + close(doc->fd); + doc->fd = -1; + } else { + libexec_destroy_document(doc); + } + } + } + } + } + free(input_docs); + close(epfd); + + while (reapctx.procs_rem) + if (reap_children(&reapctx, 0, reap_mutex_control, user4, on_alien_child_death, user2)) + goto fail; + + free(reapctx.pids); + if (sigchld_set) + if (sigaction(SIGCHLD, &old_sigchld, NULL)) + abort(); + + return 0; + +fail: + error = errno; + while (ninput_docs) + libexec_destroy_document(&input_docs[--ninput_docs]); + free(input_docs); + close(epfd); + if (reapctx.npids) { + if (reap_mutex_control) + if ((*reap_mutex_control)(+1, user4)) + abort(); + for (i = 0; i < reapctx.npids; i++) + if (reapctx.pids[i] >= 1) + if (unlist_reap(reapctx.pids[i], &(int){0}) || kill(reapctx.pids[i], SIGKILL)) + reapctx.pids[i] = -1; + for (i = 0; i < reapctx.npids; i++) + if (reapctx.pids[i] >= 1) + waitpid(reapctx.pids[i], &(int){0}, 0); + if (reap_mutex_control) + if ((*reap_mutex_control)(-1, user4)) + abort(); + } + free(reapctx.pids); + if (sigchld_set) + if (sigaction(SIGCHLD, &old_sigchld, NULL)) + abort(); + errno = error; + return -1; +} + + +#else +LIBEXEC_CONST__ int main(void) {return 0;} /* TODO test */ +#endif |