aboutsummaryrefslogtreecommitdiffstats
path: root/libexec_run_pipeline.c
diff options
context:
space:
mode:
authorMattias Andrée <maandree@kth.se>2024-05-05 10:24:40 +0200
committerMattias Andrée <maandree@kth.se>2024-05-05 10:24:40 +0200
commit3dfd6c11ed5599ab8baf4a6114f4ccb34150de6e (patch)
treeb7717583b99f29028c85c3423cf43b104dfa8943 /libexec_run_pipeline.c
downloadlibexec-3dfd6c11ed5599ab8baf4a6114f4ccb34150de6e.tar.gz
libexec-3dfd6c11ed5599ab8baf4a6114f4ccb34150de6e.tar.bz2
libexec-3dfd6c11ed5599ab8baf4a6114f4ccb34150de6e.tar.xz
First commit
Signed-off-by: Mattias Andrée <maandree@kth.se>
Diffstat (limited to 'libexec_run_pipeline.c')
-rw-r--r--libexec_run_pipeline.c343
1 files changed, 343 insertions, 0 deletions
diff --git a/libexec_run_pipeline.c b/libexec_run_pipeline.c
new file mode 100644
index 0000000..616f172
--- /dev/null
+++ b/libexec_run_pipeline.c
@@ -0,0 +1,343 @@
+/* See LICENSE file for copyright and license details. */
+#include "common.h"
+#ifndef TEST
+
+
+struct reap_context {
+ pid_t *pids;
+ size_t npids;
+ size_t procs_rem;
+ int *exit_statuses_out; /* nullable */
+};
+
+
+struct reaped {
+ pid_t pid;
+ int status;
+};
+
+
+static struct reaped *reaped_elsewhere = NULL;
+static size_t nreaped_elsewhere = 0;
+static size_t reaped_elsewhere_size = 0;
+
+
+static void
+sigchld_handler(int signo)
+{
+ (void) signo;
+}
+
+
+static int
+unlist_reap(pid_t pid, int *status)
+{
+ size_t i;
+
+ for (i = 0; i < nreaped_elsewhere; i++)
+ if (reaped_elsewhere[i].pid == pid)
+ goto found;
+ return 0;
+
+found:
+ *status = reaped_elsewhere[i].status;
+ reaped_elsewhere[i] = reaped_elsewhere[--nreaped_elsewhere];
+
+ if (nreaped_elsewhere <= reaped_elsewhere_size / 4) {
+ free(reaped_elsewhere);
+ reaped_elsewhere = NULL;
+ nreaped_elsewhere = 0;
+ reaped_elsewhere_size = 0;
+ }
+
+ return 1;
+}
+
+
+static int
+reap_child(struct reap_context *ctx, pid_t pid)
+{
+ size_t i;
+ int status;
+
+ if (!ctx->procs_rem)
+ return 0;
+
+ for (i = 0; i < ctx->npids; i++) {
+ if (ctx->pids[i] == pid) {
+ if (waitpid(pid, &status, WNOHANG) != pid)
+ return -1;
+ if (ctx->exit_statuses_out)
+ ctx->exit_statuses_out[i] = status;
+ ctx->procs_rem -= 1;
+ ctx->pids[i] = -1;
+ while (ctx->npids && ctx->pids[ctx->npids - 1] == -1)
+ ctx->npids -= 1;
+ return 1;
+ }
+ }
+
+ return 0;
+}
+
+
+static int
+reap_children(struct reap_context *reapctx, int flags,
+ int (*reap_mutex_control)(int action /* -1 = acquire, +1 = release */, void *user4), void *user4,
+ int (*on_alien_child_death)(pid_t pid, void *user2), void *user2)
+{
+ siginfo_t info;
+ int r, status;
+ void *new;
+ size_t new_size;
+ size_t i;
+
+ if (reap_mutex_control)
+ if ((*reap_mutex_control)(-1, user4))
+ goto fail;
+
+ if (nreaped_elsewhere) {
+ for (i = 0; i < reapctx->npids; i++) {
+ if (unlist_reap(reapctx->pids[i], &status)) {
+ if (reapctx->exit_statuses_out)
+ reapctx->exit_statuses_out[i] = status;
+ reapctx->procs_rem -= 1;
+ reapctx->pids[i] = -1;
+ }
+ }
+ while (reapctx->npids && reapctx->pids[reapctx->npids - 1] == -1)
+ reapctx->npids -= 1;
+ }
+
+ for (;;) {
+ info.si_pid = 0;
+ if (waitid(P_ALL, 0, &info, flags | WNOWAIT))
+ goto fail;
+ if (info.si_pid < 1)
+ break;
+
+ r = reap_child(reapctx, info.si_pid);
+ if (r < 0)
+ goto fail;
+ if (r > 0)
+ continue;
+
+ if (on_alien_child_death) {
+ r = (*on_alien_child_death)(info.si_pid, user2);
+ if (r < 0)
+ goto fail;
+ if (r > 0)
+ continue;
+ }
+
+ if (nreaped_elsewhere == reaped_elsewhere_size) {
+ new_size = reaped_elsewhere_size + 4;
+ new = realloc(reaped_elsewhere, new_size * sizeof(*reaped_elsewhere));
+ if (!new)
+ goto fail;
+ reaped_elsewhere = new;
+ reaped_elsewhere_size = new_size;
+ }
+
+ if (waitpid(info.si_pid, &status, WNOHANG) != info.si_pid) {
+ if (!nreaped_elsewhere) {
+ free(reaped_elsewhere);
+ reaped_elsewhere = NULL;
+ reaped_elsewhere_size = 0;
+ }
+ goto fail;
+ }
+
+ reaped_elsewhere[nreaped_elsewhere].pid = info.si_pid;
+ reaped_elsewhere[nreaped_elsewhere].status = status;
+ nreaped_elsewhere++;
+ }
+
+ if (reap_mutex_control)
+ if ((*reap_mutex_control)(+1, user4))
+ goto fail;
+ return 0;
+
+fail:
+ if (reap_mutex_control)
+ if ((*reap_mutex_control)(+1, user4))
+ goto fail;
+ return -1;
+}
+
+
+int
+libexec_run_pipeline(int (*on_alien_epoll)(int alien_epoll, uint32_t events, void *user1), int alien_epoll, void *user1,
+ int (*on_alien_child_death)(pid_t pid, void *user2), void *user2,
+ int (*after_fork)(struct libexec_command *cmd, int new_fd, void *user3), void *user3,
+ int (*reap_mutex_control)(int action /* -1 = acquire, +1 = release */, void *user4), void *user4,
+ int (*on_interrupt)(void *user5), void *user5,
+ const sigset_t *sigmask /* nullable */,
+ struct libexec_document *const *docs, size_t ndocs, int docs_1_level,
+ struct libexec_command *const *cmds, int *exit_statuses_out /* nullable */, size_t ncmds)
+{
+ struct libexec_document *input_docs = NULL;
+ size_t i, ninput_docs = 0, files_open;
+ struct epoll_event evs[8];
+ int epfd, n, sigchld_set = 0, error, r;
+ struct libexec_document *doc;
+ struct sigaction sigchld, old_sigchld;
+ struct reap_context reapctx = {
+ .pids = NULL,
+ .npids = 0,
+ .procs_rem = 0,
+ .exit_statuses_out = exit_statuses_out
+ };
+
+ if (!ncmds)
+ return 0;
+
+ memset(evs, 0, sizeof(evs));
+
+ epfd = epoll_create1(EPOLL_CLOEXEC);
+ if (epfd < 0)
+ return -1;
+
+ if (sigaction(SIGCHLD, NULL, &old_sigchld))
+ goto fail;
+ if (!(old_sigchld.sa_flags & SA_SIGINFO)) {
+ if (old_sigchld.sa_handler == SIG_IGN || old_sigchld.sa_handler == SIG_DFL) {
+ memset(&sigchld, 0, sizeof(sigchld));
+ sigchld.sa_handler = &sigchld_handler;
+ sigemptyset(&sigchld.sa_mask);
+ if (sigaction(SIGCHLD, &sigchld, NULL))
+ goto fail;
+ sigchld_set = 1;
+ }
+ }
+
+ if (alien_epoll >= 0) {
+ evs[0].events = EPOLLIN;
+ evs[0].data.ptr = NULL;
+ if (epoll_ctl(epfd, EPOLL_CTL_ADD, alien_epoll, &evs[0]))
+ goto fail;
+ }
+
+ reapctx.pids = calloc(ncmds, sizeof(*reapctx.pids));
+ if (!reapctx.pids)
+ goto fail;
+
+ for (i = 0; i < ncmds; i++)
+ if (libexec_spawn(&reapctx.pids[reapctx.npids++ /* may be set on failure */], cmds[i],
+ after_fork, user3, &input_docs, &ninput_docs, O_CLOEXEC | O_NONBLOCK))
+ goto fail;
+
+ for (i = 0; i < ninput_docs; i++) {
+ evs[0].events = EPOLLOUT;
+ evs[0].data.ptr = &input_docs[i];
+ if (epoll_ctl(epfd, EPOLL_CTL_ADD, input_docs[i].fd, &evs[0]))
+ goto fail;
+ }
+
+ for (i = 0; i < ndocs; i++) {
+ doc = docs_1_level ? &(*docs)[i] : docs[i];
+ evs[0].events = EPOLLIN;
+ evs[0].data.ptr = doc;
+ doc->user = 1;
+ if (epoll_ctl(epfd, EPOLL_CTL_ADD, doc->fd, &evs[0]))
+ goto fail;
+ }
+
+ reapctx.procs_rem = ncmds;
+
+ files_open = ninput_docs + ndocs + (size_t)(alien_epoll >= 0);
+ while (files_open) {
+ if (reap_children(&reapctx, WNOHANG, reap_mutex_control, user4, on_alien_child_death, user2))
+ goto fail;
+
+ n = epoll_pwait(epfd, evs, sizeof(evs) / sizeof(*evs), -1, sigmask);
+ if (n < 0) {
+ if (errno != EINTR)
+ goto fail;
+ if (on_interrupt)
+ if ((*on_interrupt)(user5))
+ goto fail;
+ continue;
+ }
+
+ for (i = 0; i < (size_t)n; i++) {
+ if (evs[i].data.ptr == NULL) {
+ r = (*on_alien_epoll)(alien_epoll, evs[i].events, user1);
+ if (r < 0) {
+ goto fail;
+ } else if (r > 0) {
+ if (epoll_ctl(epfd, EPOLL_CTL_DEL, alien_epoll, &evs[i]))
+ goto fail;
+ files_open -= 1;
+ }
+ } else {
+ doc = evs[i].data.ptr;
+ send_recv_again:
+ if (doc->user ? libexec_recv_document(doc) : libexec_send_document(doc)) {
+ if (errno == EAGAIN)
+ continue;
+ if (errno == EINTR) {
+ if (on_interrupt)
+ if ((*on_interrupt)(user5))
+ goto fail;
+ goto send_recv_again;
+ }
+ } else {
+ files_open -= 1;
+ if (doc->user) {
+ close(doc->fd);
+ doc->fd = -1;
+ } else {
+ libexec_destroy_document(doc);
+ }
+ }
+ }
+ }
+ }
+ free(input_docs);
+ close(epfd);
+
+ while (reapctx.procs_rem)
+ if (reap_children(&reapctx, 0, reap_mutex_control, user4, on_alien_child_death, user2))
+ goto fail;
+
+ free(reapctx.pids);
+ if (sigchld_set)
+ if (sigaction(SIGCHLD, &old_sigchld, NULL))
+ abort();
+
+ return 0;
+
+fail:
+ error = errno;
+ while (ninput_docs)
+ libexec_destroy_document(&input_docs[--ninput_docs]);
+ free(input_docs);
+ close(epfd);
+ if (reapctx.npids) {
+ if (reap_mutex_control)
+ if ((*reap_mutex_control)(+1, user4))
+ abort();
+ for (i = 0; i < reapctx.npids; i++)
+ if (reapctx.pids[i] >= 1)
+ if (unlist_reap(reapctx.pids[i], &(int){0}) || kill(reapctx.pids[i], SIGKILL))
+ reapctx.pids[i] = -1;
+ for (i = 0; i < reapctx.npids; i++)
+ if (reapctx.pids[i] >= 1)
+ waitpid(reapctx.pids[i], &(int){0}, 0);
+ if (reap_mutex_control)
+ if ((*reap_mutex_control)(-1, user4))
+ abort();
+ }
+ free(reapctx.pids);
+ if (sigchld_set)
+ if (sigaction(SIGCHLD, &old_sigchld, NULL))
+ abort();
+ errno = error;
+ return -1;
+}
+
+
+#else
+LIBEXEC_CONST__ int main(void) {return 0;} /* TODO test */
+#endif