aboutsummaryrefslogblamecommitdiffstats
path: root/libexec_run_pipeline.c
blob: 616f172e2986d7277a5950f3b0a017ab8357df6a (plain) (tree)






















































































































































































































































































































































                                                                                                                        
/* See LICENSE file for copyright and license details. */
#include "common.h"
#ifndef TEST


struct reap_context {
	pid_t *pids;
	size_t npids;
	size_t procs_rem;
	int *exit_statuses_out; /* nullable */
};


struct reaped {
	pid_t pid;
	int status;
};


static struct reaped *reaped_elsewhere = NULL;
static size_t nreaped_elsewhere = 0;
static size_t reaped_elsewhere_size = 0;


static void
sigchld_handler(int signo)
{
	(void) signo;
}


static int
unlist_reap(pid_t pid, int *status)
{
	size_t i;

	for (i = 0; i < nreaped_elsewhere; i++)
		if (reaped_elsewhere[i].pid == pid)
			goto found;
	return 0;

found:
	*status = reaped_elsewhere[i].status;
	reaped_elsewhere[i] = reaped_elsewhere[--nreaped_elsewhere];

	if (nreaped_elsewhere <= reaped_elsewhere_size / 4) {
		free(reaped_elsewhere);
		reaped_elsewhere = NULL;
		nreaped_elsewhere = 0;
		reaped_elsewhere_size = 0;
	}

	return 1;
}


static int
reap_child(struct reap_context *ctx, pid_t pid)
{
	size_t i;
	int status;

	if (!ctx->procs_rem)
		return 0;

	for (i = 0; i < ctx->npids; i++) {
		if (ctx->pids[i] == pid) {
			if (waitpid(pid, &status, WNOHANG) != pid)
				return -1;
			if (ctx->exit_statuses_out)
				ctx->exit_statuses_out[i] = status;
			ctx->procs_rem -= 1;
			ctx->pids[i] = -1;
			while (ctx->npids && ctx->pids[ctx->npids - 1] == -1)
				ctx->npids -= 1;
			return 1;
		}
	}

	return 0;
}


static int
reap_children(struct reap_context *reapctx, int flags,
              int (*reap_mutex_control)(int action /* -1 = acquire, +1 = release */, void *user4), void *user4,
              int (*on_alien_child_death)(pid_t pid, void *user2), void *user2)
{
	siginfo_t info;
	int r, status;
	void *new;
	size_t new_size;
	size_t i;

	if (reap_mutex_control)
		if ((*reap_mutex_control)(-1, user4))
			goto fail;

	if (nreaped_elsewhere) {
		for (i = 0; i < reapctx->npids; i++) {
			if (unlist_reap(reapctx->pids[i], &status)) {
				if (reapctx->exit_statuses_out)
					reapctx->exit_statuses_out[i] = status;
				reapctx->procs_rem -= 1;
				reapctx->pids[i] = -1;
			}
		}
		while (reapctx->npids && reapctx->pids[reapctx->npids - 1] == -1)
			reapctx->npids -= 1;
	}

	for (;;) {
		info.si_pid = 0;
		if (waitid(P_ALL, 0, &info, flags | WNOWAIT))
			goto fail;
		if (info.si_pid < 1)
			break;

		r = reap_child(reapctx, info.si_pid);
		if (r < 0)
			goto fail;
		if (r > 0)
			continue;

		if (on_alien_child_death) {
			r = (*on_alien_child_death)(info.si_pid, user2);
			if (r < 0)
				goto fail;
			if (r > 0)
				continue;
		}

		if (nreaped_elsewhere == reaped_elsewhere_size) {
			new_size = reaped_elsewhere_size + 4;
			new = realloc(reaped_elsewhere, new_size * sizeof(*reaped_elsewhere));
			if (!new)
				goto fail;
			reaped_elsewhere = new;
			reaped_elsewhere_size = new_size;
		}

		if (waitpid(info.si_pid, &status, WNOHANG) != info.si_pid) {
			if (!nreaped_elsewhere) {
				free(reaped_elsewhere);
				reaped_elsewhere = NULL;
				reaped_elsewhere_size = 0;
			}
			goto fail;
		}

		reaped_elsewhere[nreaped_elsewhere].pid = info.si_pid;
		reaped_elsewhere[nreaped_elsewhere].status = status;
		nreaped_elsewhere++;
	}

	if (reap_mutex_control)
		if ((*reap_mutex_control)(+1, user4))
			goto fail;
	return 0;

fail:
	if (reap_mutex_control)
		if ((*reap_mutex_control)(+1, user4))
			goto fail;
	return -1;
}


int
libexec_run_pipeline(int (*on_alien_epoll)(int alien_epoll, uint32_t events, void *user1), int alien_epoll, void *user1,
                     int (*on_alien_child_death)(pid_t pid, void *user2), void *user2,
                     int (*after_fork)(struct libexec_command *cmd, int new_fd, void *user3), void *user3,
                     int (*reap_mutex_control)(int action /* -1 = acquire, +1 = release */, void *user4), void *user4,
		     int (*on_interrupt)(void *user5), void *user5,
                     const sigset_t *sigmask /* nullable */,
                     struct libexec_document *const *docs, size_t ndocs, int docs_1_level,
                     struct libexec_command *const *cmds, int *exit_statuses_out /* nullable */, size_t ncmds)
{
	struct libexec_document *input_docs = NULL;
	size_t i, ninput_docs = 0, files_open;
	struct epoll_event evs[8];
	int epfd, n, sigchld_set = 0, error, r;
	struct libexec_document *doc;
	struct sigaction sigchld, old_sigchld;
	struct reap_context reapctx = {
		.pids = NULL,
		.npids = 0,
		.procs_rem = 0,
		.exit_statuses_out = exit_statuses_out
	};

	if (!ncmds)
		return 0;

	memset(evs, 0, sizeof(evs));

	epfd = epoll_create1(EPOLL_CLOEXEC);
	if (epfd < 0)
		return -1;

	if (sigaction(SIGCHLD, NULL, &old_sigchld))
		goto fail;
	if (!(old_sigchld.sa_flags & SA_SIGINFO)) {
		if (old_sigchld.sa_handler == SIG_IGN || old_sigchld.sa_handler == SIG_DFL) {
			memset(&sigchld, 0, sizeof(sigchld));
			sigchld.sa_handler = &sigchld_handler;
			sigemptyset(&sigchld.sa_mask);
			if (sigaction(SIGCHLD, &sigchld, NULL))
				goto fail;
			sigchld_set = 1;
		}
	}

	if (alien_epoll >= 0) {
		evs[0].events = EPOLLIN;
		evs[0].data.ptr = NULL;
		if (epoll_ctl(epfd, EPOLL_CTL_ADD, alien_epoll, &evs[0]))
			goto fail;
	}

	reapctx.pids = calloc(ncmds, sizeof(*reapctx.pids));
	if (!reapctx.pids)
		goto fail;

	for (i = 0; i < ncmds; i++)
		if (libexec_spawn(&reapctx.pids[reapctx.npids++ /* may be set on failure */], cmds[i],
		                  after_fork, user3, &input_docs, &ninput_docs, O_CLOEXEC | O_NONBLOCK))
			goto fail;

	for (i = 0; i < ninput_docs; i++) {
		evs[0].events = EPOLLOUT;
		evs[0].data.ptr = &input_docs[i];
		if (epoll_ctl(epfd, EPOLL_CTL_ADD, input_docs[i].fd, &evs[0]))
			goto fail;
	}

	for (i = 0; i < ndocs; i++) {
		doc = docs_1_level ? &(*docs)[i] : docs[i];
		evs[0].events = EPOLLIN;
		evs[0].data.ptr = doc;
		doc->user = 1;
		if (epoll_ctl(epfd, EPOLL_CTL_ADD, doc->fd, &evs[0]))
			goto fail;
	}

	reapctx.procs_rem = ncmds;

	files_open = ninput_docs + ndocs + (size_t)(alien_epoll >= 0);
	while (files_open) {
		if (reap_children(&reapctx, WNOHANG, reap_mutex_control, user4, on_alien_child_death, user2))
			goto fail;

		n = epoll_pwait(epfd, evs, sizeof(evs) / sizeof(*evs), -1, sigmask);
		if (n < 0) {
			if (errno != EINTR)
				goto fail;
			if (on_interrupt)
				if ((*on_interrupt)(user5))
					goto fail;
			continue;
		}

		for (i = 0; i < (size_t)n; i++) {
			if (evs[i].data.ptr == NULL) {
				r = (*on_alien_epoll)(alien_epoll, evs[i].events, user1);
				if (r < 0) {
					goto fail;
				} else if (r > 0) {
					if (epoll_ctl(epfd, EPOLL_CTL_DEL, alien_epoll, &evs[i]))
						goto fail;
					files_open -= 1;
				}
			} else {
				doc = evs[i].data.ptr;
			send_recv_again:
				if (doc->user ? libexec_recv_document(doc) : libexec_send_document(doc)) {
					if (errno == EAGAIN)
						continue;
					if (errno == EINTR) {
						if (on_interrupt)
							if ((*on_interrupt)(user5))
								goto fail;
						goto send_recv_again;
					}
				} else {
					files_open -= 1;
					if (doc->user) {
						close(doc->fd);
						doc->fd = -1;
					} else {
						libexec_destroy_document(doc);
					}
				}
			}
		}
	}
	free(input_docs);
	close(epfd);

	while (reapctx.procs_rem)
		if (reap_children(&reapctx, 0, reap_mutex_control, user4, on_alien_child_death, user2))
			goto fail;

	free(reapctx.pids);
	if (sigchld_set)
		if (sigaction(SIGCHLD, &old_sigchld, NULL))
			abort();

	return 0;

fail:
	error = errno;
	while (ninput_docs)
		libexec_destroy_document(&input_docs[--ninput_docs]);
	free(input_docs);
	close(epfd);
	if (reapctx.npids) {
		if (reap_mutex_control)
			if ((*reap_mutex_control)(+1, user4))
				abort();
		for (i = 0; i < reapctx.npids; i++)
			if (reapctx.pids[i] >= 1)
				if (unlist_reap(reapctx.pids[i], &(int){0}) || kill(reapctx.pids[i], SIGKILL))
					reapctx.pids[i] = -1;
		for (i = 0; i < reapctx.npids; i++)
			if (reapctx.pids[i] >= 1)
				waitpid(reapctx.pids[i], &(int){0}, 0);
		if (reap_mutex_control)
			if ((*reap_mutex_control)(-1, user4))
				abort();
	}
	free(reapctx.pids);
	if (sigchld_set)
		if (sigaction(SIGCHLD, &old_sigchld, NULL))
			abort();
	errno = error;
	return -1;
}


#else
LIBEXEC_CONST__ int main(void) {return 0;} /* TODO test */
#endif