/* See LICENSE file for copyright and license details. */
#include "common.h"
#ifndef TEST
struct reap_context {
pid_t *pids;
size_t npids;
size_t procs_rem;
int *exit_statuses_out; /* nullable */
};
struct reaped {
pid_t pid;
int status;
};
static struct reaped *reaped_elsewhere = NULL;
static size_t nreaped_elsewhere = 0;
static size_t reaped_elsewhere_size = 0;
static void
sigchld_handler(int signo)
{
(void) signo;
}
static int
unlist_reap(pid_t pid, int *status)
{
size_t i;
for (i = 0; i < nreaped_elsewhere; i++)
if (reaped_elsewhere[i].pid == pid)
goto found;
return 0;
found:
*status = reaped_elsewhere[i].status;
reaped_elsewhere[i] = reaped_elsewhere[--nreaped_elsewhere];
if (nreaped_elsewhere <= reaped_elsewhere_size / 4) {
free(reaped_elsewhere);
reaped_elsewhere = NULL;
nreaped_elsewhere = 0;
reaped_elsewhere_size = 0;
}
return 1;
}
static int
reap_child(struct reap_context *ctx, pid_t pid)
{
size_t i;
int status;
if (!ctx->procs_rem)
return 0;
for (i = 0; i < ctx->npids; i++) {
if (ctx->pids[i] == pid) {
if (waitpid(pid, &status, WNOHANG) != pid)
return -1;
if (ctx->exit_statuses_out)
ctx->exit_statuses_out[i] = status;
ctx->procs_rem -= 1;
ctx->pids[i] = -1;
while (ctx->npids && ctx->pids[ctx->npids - 1] == -1)
ctx->npids -= 1;
return 1;
}
}
return 0;
}
static int
reap_children(struct reap_context *reapctx, int flags,
int (*reap_mutex_control)(int action /* -1 = acquire, +1 = release */, void *user4), void *user4,
int (*on_alien_child_death)(pid_t pid, void *user2), void *user2)
{
siginfo_t info;
int r, status;
void *new;
size_t new_size;
size_t i;
if (reap_mutex_control)
if ((*reap_mutex_control)(-1, user4))
goto fail;
if (nreaped_elsewhere) {
for (i = 0; i < reapctx->npids; i++) {
if (unlist_reap(reapctx->pids[i], &status)) {
if (reapctx->exit_statuses_out)
reapctx->exit_statuses_out[i] = status;
reapctx->procs_rem -= 1;
reapctx->pids[i] = -1;
}
}
while (reapctx->npids && reapctx->pids[reapctx->npids - 1] == -1)
reapctx->npids -= 1;
}
for (;;) {
info.si_pid = 0;
if (waitid(P_ALL, 0, &info, flags | WNOWAIT))
goto fail;
if (info.si_pid < 1)
break;
r = reap_child(reapctx, info.si_pid);
if (r < 0)
goto fail;
if (r > 0)
continue;
if (on_alien_child_death) {
r = (*on_alien_child_death)(info.si_pid, user2);
if (r < 0)
goto fail;
if (r > 0)
continue;
}
if (nreaped_elsewhere == reaped_elsewhere_size) {
new_size = reaped_elsewhere_size + 4;
new = realloc(reaped_elsewhere, new_size * sizeof(*reaped_elsewhere));
if (!new)
goto fail;
reaped_elsewhere = new;
reaped_elsewhere_size = new_size;
}
if (waitpid(info.si_pid, &status, WNOHANG) != info.si_pid) {
if (!nreaped_elsewhere) {
free(reaped_elsewhere);
reaped_elsewhere = NULL;
reaped_elsewhere_size = 0;
}
goto fail;
}
reaped_elsewhere[nreaped_elsewhere].pid = info.si_pid;
reaped_elsewhere[nreaped_elsewhere].status = status;
nreaped_elsewhere++;
}
if (reap_mutex_control)
if ((*reap_mutex_control)(+1, user4))
goto fail;
return 0;
fail:
if (reap_mutex_control)
if ((*reap_mutex_control)(+1, user4))
goto fail;
return -1;
}
int
libexec_run_pipeline(int (*on_alien_epoll)(int alien_epoll, uint32_t events, void *user1), int alien_epoll, void *user1,
int (*on_alien_child_death)(pid_t pid, void *user2), void *user2,
int (*after_fork)(struct libexec_command *cmd, int new_fd, void *user3), void *user3,
int (*reap_mutex_control)(int action /* -1 = acquire, +1 = release */, void *user4), void *user4,
int (*on_interrupt)(void *user5), void *user5,
const sigset_t *sigmask /* nullable */,
struct libexec_document *const *docs, size_t ndocs, int docs_1_level,
struct libexec_command *const *cmds, int *exit_statuses_out /* nullable */, size_t ncmds)
{
struct libexec_document *input_docs = NULL;
size_t i, ninput_docs = 0, files_open;
struct epoll_event evs[8];
int epfd, n, sigchld_set = 0, error, r;
struct libexec_document *doc;
struct sigaction sigchld, old_sigchld;
struct reap_context reapctx = {
.pids = NULL,
.npids = 0,
.procs_rem = 0,
.exit_statuses_out = exit_statuses_out
};
if (!ncmds)
return 0;
memset(evs, 0, sizeof(evs));
epfd = epoll_create1(EPOLL_CLOEXEC);
if (epfd < 0)
return -1;
if (sigaction(SIGCHLD, NULL, &old_sigchld))
goto fail;
if (!(old_sigchld.sa_flags & SA_SIGINFO)) {
if (old_sigchld.sa_handler == SIG_IGN || old_sigchld.sa_handler == SIG_DFL) {
memset(&sigchld, 0, sizeof(sigchld));
sigchld.sa_handler = &sigchld_handler;
sigemptyset(&sigchld.sa_mask);
if (sigaction(SIGCHLD, &sigchld, NULL))
goto fail;
sigchld_set = 1;
}
}
if (alien_epoll >= 0) {
evs[0].events = EPOLLIN;
evs[0].data.ptr = NULL;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, alien_epoll, &evs[0]))
goto fail;
}
reapctx.pids = calloc(ncmds, sizeof(*reapctx.pids));
if (!reapctx.pids)
goto fail;
for (i = 0; i < ncmds; i++)
if (libexec_spawn(&reapctx.pids[reapctx.npids++ /* may be set on failure */], cmds[i],
after_fork, user3, &input_docs, &ninput_docs, O_CLOEXEC | O_NONBLOCK))
goto fail;
for (i = 0; i < ninput_docs; i++) {
evs[0].events = EPOLLOUT;
evs[0].data.ptr = &input_docs[i];
if (epoll_ctl(epfd, EPOLL_CTL_ADD, input_docs[i].fd, &evs[0]))
goto fail;
}
for (i = 0; i < ndocs; i++) {
doc = docs_1_level ? &(*docs)[i] : docs[i];
evs[0].events = EPOLLIN;
evs[0].data.ptr = doc;
doc->user = 1;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, doc->fd, &evs[0]))
goto fail;
}
reapctx.procs_rem = ncmds;
files_open = ninput_docs + ndocs + (size_t)(alien_epoll >= 0);
while (files_open) {
if (reap_children(&reapctx, WNOHANG, reap_mutex_control, user4, on_alien_child_death, user2))
goto fail;
n = epoll_pwait(epfd, evs, sizeof(evs) / sizeof(*evs), -1, sigmask);
if (n < 0) {
if (errno != EINTR)
goto fail;
if (on_interrupt)
if ((*on_interrupt)(user5))
goto fail;
continue;
}
for (i = 0; i < (size_t)n; i++) {
if (evs[i].data.ptr == NULL) {
r = (*on_alien_epoll)(alien_epoll, evs[i].events, user1);
if (r < 0) {
goto fail;
} else if (r > 0) {
if (epoll_ctl(epfd, EPOLL_CTL_DEL, alien_epoll, &evs[i]))
goto fail;
files_open -= 1;
}
} else {
doc = evs[i].data.ptr;
send_recv_again:
if (doc->user ? libexec_recv_document(doc) : libexec_send_document(doc)) {
if (errno == EAGAIN)
continue;
if (errno == EINTR) {
if (on_interrupt)
if ((*on_interrupt)(user5))
goto fail;
goto send_recv_again;
}
} else {
files_open -= 1;
if (doc->user) {
close(doc->fd);
doc->fd = -1;
} else {
libexec_destroy_document(doc);
}
}
}
}
}
free(input_docs);
close(epfd);
while (reapctx.procs_rem)
if (reap_children(&reapctx, 0, reap_mutex_control, user4, on_alien_child_death, user2))
goto fail;
free(reapctx.pids);
if (sigchld_set)
if (sigaction(SIGCHLD, &old_sigchld, NULL))
abort();
return 0;
fail:
error = errno;
while (ninput_docs)
libexec_destroy_document(&input_docs[--ninput_docs]);
free(input_docs);
close(epfd);
if (reapctx.npids) {
if (reap_mutex_control)
if ((*reap_mutex_control)(+1, user4))
abort();
for (i = 0; i < reapctx.npids; i++)
if (reapctx.pids[i] >= 1)
if (unlist_reap(reapctx.pids[i], &(int){0}) || kill(reapctx.pids[i], SIGKILL))
reapctx.pids[i] = -1;
for (i = 0; i < reapctx.npids; i++)
if (reapctx.pids[i] >= 1)
waitpid(reapctx.pids[i], &(int){0}, 0);
if (reap_mutex_control)
if ((*reap_mutex_control)(-1, user4))
abort();
}
free(reapctx.pids);
if (sigchld_set)
if (sigaction(SIGCHLD, &old_sigchld, NULL))
abort();
errno = error;
return -1;
}
#else
LIBEXEC_CONST__ int main(void) {return 0;} /* TODO test */
#endif