/* See LICENSE file for copyright and license details. */ #include "common.h" #ifndef TEST struct reap_context { pid_t *pids; size_t npids; size_t procs_rem; int *exit_statuses_out; /* nullable */ }; struct reaped { pid_t pid; int status; }; static struct reaped *reaped_elsewhere = NULL; static size_t nreaped_elsewhere = 0; static size_t reaped_elsewhere_size = 0; static void sigchld_handler(int signo) { (void) signo; } static int unlist_reap(pid_t pid, int *status) { size_t i; for (i = 0; i < nreaped_elsewhere; i++) if (reaped_elsewhere[i].pid == pid) goto found; return 0; found: *status = reaped_elsewhere[i].status; reaped_elsewhere[i] = reaped_elsewhere[--nreaped_elsewhere]; if (nreaped_elsewhere <= reaped_elsewhere_size / 4) { free(reaped_elsewhere); reaped_elsewhere = NULL; nreaped_elsewhere = 0; reaped_elsewhere_size = 0; } return 1; } static int reap_child(struct reap_context *ctx, pid_t pid) { size_t i; int status; if (!ctx->procs_rem) return 0; for (i = 0; i < ctx->npids; i++) { if (ctx->pids[i] == pid) { if (waitpid(pid, &status, WNOHANG) != pid) return -1; if (ctx->exit_statuses_out) ctx->exit_statuses_out[i] = status; ctx->procs_rem -= 1; ctx->pids[i] = -1; while (ctx->npids && ctx->pids[ctx->npids - 1] == -1) ctx->npids -= 1; return 1; } } return 0; } static int reap_children(struct reap_context *reapctx, int flags, int (*reap_mutex_control)(int action /* -1 = acquire, +1 = release */, void *user4), void *user4, int (*on_alien_child_death)(pid_t pid, void *user2), void *user2) { siginfo_t info; int r, status; void *new; size_t new_size; size_t i; if (reap_mutex_control) if ((*reap_mutex_control)(-1, user4)) goto fail; if (nreaped_elsewhere) { for (i = 0; i < reapctx->npids; i++) { if (unlist_reap(reapctx->pids[i], &status)) { if (reapctx->exit_statuses_out) reapctx->exit_statuses_out[i] = status; reapctx->procs_rem -= 1; reapctx->pids[i] = -1; } } while (reapctx->npids && reapctx->pids[reapctx->npids - 1] == -1) reapctx->npids -= 1; } for (;;) { info.si_pid = 0; if (waitid(P_ALL, 0, &info, flags | WNOWAIT)) goto fail; if (info.si_pid < 1) break; r = reap_child(reapctx, info.si_pid); if (r < 0) goto fail; if (r > 0) continue; if (on_alien_child_death) { r = (*on_alien_child_death)(info.si_pid, user2); if (r < 0) goto fail; if (r > 0) continue; } if (nreaped_elsewhere == reaped_elsewhere_size) { new_size = reaped_elsewhere_size + 4; new = realloc(reaped_elsewhere, new_size * sizeof(*reaped_elsewhere)); if (!new) goto fail; reaped_elsewhere = new; reaped_elsewhere_size = new_size; } if (waitpid(info.si_pid, &status, WNOHANG) != info.si_pid) { if (!nreaped_elsewhere) { free(reaped_elsewhere); reaped_elsewhere = NULL; reaped_elsewhere_size = 0; } goto fail; } reaped_elsewhere[nreaped_elsewhere].pid = info.si_pid; reaped_elsewhere[nreaped_elsewhere].status = status; nreaped_elsewhere++; } if (reap_mutex_control) if ((*reap_mutex_control)(+1, user4)) goto fail; return 0; fail: if (reap_mutex_control) if ((*reap_mutex_control)(+1, user4)) goto fail; return -1; } int libexec_run_pipeline(int (*on_alien_epoll)(int alien_epoll, uint32_t events, void *user1), int alien_epoll, void *user1, int (*on_alien_child_death)(pid_t pid, void *user2), void *user2, int (*after_fork)(struct libexec_command *cmd, int new_fd, void *user3), void *user3, int (*reap_mutex_control)(int action /* -1 = acquire, +1 = release */, void *user4), void *user4, int (*on_interrupt)(void *user5), void *user5, const sigset_t *sigmask /* nullable */, struct libexec_document *const *docs, size_t ndocs, int docs_1_level, struct libexec_command *const *cmds, int *exit_statuses_out /* nullable */, size_t ncmds) { struct libexec_document *input_docs = NULL; size_t i, ninput_docs = 0, files_open; struct epoll_event evs[8]; int epfd, n, sigchld_set = 0, error, r; struct libexec_document *doc; struct sigaction sigchld, old_sigchld; struct reap_context reapctx = { .pids = NULL, .npids = 0, .procs_rem = 0, .exit_statuses_out = exit_statuses_out }; if (!ncmds) return 0; memset(evs, 0, sizeof(evs)); epfd = epoll_create1(EPOLL_CLOEXEC); if (epfd < 0) return -1; if (sigaction(SIGCHLD, NULL, &old_sigchld)) goto fail; if (!(old_sigchld.sa_flags & SA_SIGINFO)) { if (old_sigchld.sa_handler == SIG_IGN || old_sigchld.sa_handler == SIG_DFL) { memset(&sigchld, 0, sizeof(sigchld)); sigchld.sa_handler = &sigchld_handler; sigemptyset(&sigchld.sa_mask); if (sigaction(SIGCHLD, &sigchld, NULL)) goto fail; sigchld_set = 1; } } if (alien_epoll >= 0) { evs[0].events = EPOLLIN; evs[0].data.ptr = NULL; if (epoll_ctl(epfd, EPOLL_CTL_ADD, alien_epoll, &evs[0])) goto fail; } reapctx.pids = calloc(ncmds, sizeof(*reapctx.pids)); if (!reapctx.pids) goto fail; for (i = 0; i < ncmds; i++) if (libexec_spawn(&reapctx.pids[reapctx.npids++ /* may be set on failure */], cmds[i], after_fork, user3, &input_docs, &ninput_docs, O_CLOEXEC | O_NONBLOCK)) goto fail; for (i = 0; i < ninput_docs; i++) { evs[0].events = EPOLLOUT; evs[0].data.ptr = &input_docs[i]; if (epoll_ctl(epfd, EPOLL_CTL_ADD, input_docs[i].fd, &evs[0])) goto fail; } for (i = 0; i < ndocs; i++) { doc = docs_1_level ? &(*docs)[i] : docs[i]; evs[0].events = EPOLLIN; evs[0].data.ptr = doc; doc->user = 1; if (epoll_ctl(epfd, EPOLL_CTL_ADD, doc->fd, &evs[0])) goto fail; } reapctx.procs_rem = ncmds; files_open = ninput_docs + ndocs + (size_t)(alien_epoll >= 0); while (files_open) { if (reap_children(&reapctx, WNOHANG, reap_mutex_control, user4, on_alien_child_death, user2)) goto fail; n = epoll_pwait(epfd, evs, sizeof(evs) / sizeof(*evs), -1, sigmask); if (n < 0) { if (errno != EINTR) goto fail; if (on_interrupt) if ((*on_interrupt)(user5)) goto fail; continue; } for (i = 0; i < (size_t)n; i++) { if (evs[i].data.ptr == NULL) { r = (*on_alien_epoll)(alien_epoll, evs[i].events, user1); if (r < 0) { goto fail; } else if (r > 0) { if (epoll_ctl(epfd, EPOLL_CTL_DEL, alien_epoll, &evs[i])) goto fail; files_open -= 1; } } else { doc = evs[i].data.ptr; send_recv_again: if (doc->user ? libexec_recv_document(doc) : libexec_send_document(doc)) { if (errno == EAGAIN) continue; if (errno == EINTR) { if (on_interrupt) if ((*on_interrupt)(user5)) goto fail; goto send_recv_again; } } else { files_open -= 1; if (doc->user) { close(doc->fd); doc->fd = -1; } else { libexec_destroy_document(doc); } } } } } free(input_docs); close(epfd); while (reapctx.procs_rem) if (reap_children(&reapctx, 0, reap_mutex_control, user4, on_alien_child_death, user2)) goto fail; free(reapctx.pids); if (sigchld_set) if (sigaction(SIGCHLD, &old_sigchld, NULL)) abort(); return 0; fail: error = errno; while (ninput_docs) libexec_destroy_document(&input_docs[--ninput_docs]); free(input_docs); close(epfd); if (reapctx.npids) { if (reap_mutex_control) if ((*reap_mutex_control)(+1, user4)) abort(); for (i = 0; i < reapctx.npids; i++) if (reapctx.pids[i] >= 1) if (unlist_reap(reapctx.pids[i], &(int){0}) || kill(reapctx.pids[i], SIGKILL)) reapctx.pids[i] = -1; for (i = 0; i < reapctx.npids; i++) if (reapctx.pids[i] >= 1) waitpid(reapctx.pids[i], &(int){0}, 0); if (reap_mutex_control) if ((*reap_mutex_control)(-1, user4)) abort(); } free(reapctx.pids); if (sigchld_set) if (sigaction(SIGCHLD, &old_sigchld, NULL)) abort(); errno = error; return -1; } #else LIBEXEC_CONST__ int main(void) {return 0;} /* TODO test */ #endif