From f29845d3bba7032c7f61d25a34441d9ab7ff8a4e Mon Sep 17 00:00:00 2001 From: Mattias Andrée Date: Sat, 14 Jan 2017 06:33:00 +0100 Subject: Add definition checks for posix_fadvise flags + blind-concat: add -j MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Mattias Andrée --- src/blind-concat.c | 137 ++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 130 insertions(+), 7 deletions(-) (limited to 'src/blind-concat.c') diff --git a/src/blind-concat.c b/src/blind-concat.c index d789a5a..63a9b21 100644 --- a/src/blind-concat.c +++ b/src/blind-concat.c @@ -2,16 +2,21 @@ #include "stream.h" #include "util.h" +#if defined(HAVE_SYS_EPOLL_H) +# include +#endif #include +#include #include #include +#include #include #include -USAGE("[-o output-file] first-stream ... last-stream") +USAGE("[-o output-file [-j jobs]] first-stream ... last-stream") static void -concat_to_stdout(int argc, char *argv[]) +concat_to_stdout(int argc, char *argv[], const char *fname) { struct stream *streams; size_t frames = 0; @@ -32,11 +37,11 @@ concat_to_stdout(int argc, char *argv[]) streams->frames = frames; fprint_stream_head(stdout, streams); - efflush(stdout, ""); + efflush(stdout, fname); for (i = 0; i < argc; i++) { for (; eread_stream(streams + i, SIZE_MAX); streams[i].ptr = 0) - ewriteall(STDOUT_FILENO, streams[i].buf, streams[i].ptr, ""); + ewriteall(STDOUT_FILENO, streams[i].buf, streams[i].ptr, fname); close(streams[i].fd); } @@ -90,26 +95,144 @@ concat_to_file(int argc, char *argv[], char *output_file) close(fd); } +static void +concat_to_file_parallel(int argc, char *argv[], char *output_file, size_t jobs) +{ + struct epoll_event *events; + struct stream *streams; + size_t *ptrs; + char head[STREAM_HEAD_MAX]; + size_t ptr, frame_size, frames = 0, next = 0, j; + ssize_t headlen; + int fd, i, n, pollfd; + +#if !defined(HAVE_SYS_EPOLL_H) + fd = eopen(output_file, O_WRONLY | O_CREAT | O_TRUNC, 0666); + if (fd != STDOUT_FILENO && dup2(fd, STDOUT_FILENO) == -1) + eprintf("dup2:"); + concat_to_stdout(argc, argv, output_file); + return; +#else + + if (jobs > (size_t)argc) + jobs = (size_t)argc; + + fd = eopen(output_file, O_RDWR | O_CREAT | O_TRUNC, 0666); + events = emalloc(jobs * sizeof(*events)); + streams = emalloc((size_t)argc * sizeof(*streams)); + ptrs = emalloc((size_t)argc * sizeof(*ptrs)); + + for (i = 0; i < argc; i++) { + streams[i].file = argv[i]; + streams[i].fd = eopen(streams[i].file, O_RDONLY); + einit_stream(streams + i); + if (i) + echeck_compat(streams + i, streams); + if (streams[i].frames > SIZE_MAX - frames) + eprintf("resulting video is too long\n"); + frames += streams[i].frames; + } + + sprintf(head, "%zu %zu %zu %s\n%cuivf%zn", + frames, streams->width, streams->height, streams->pixfmt, 0, &headlen); + + echeck_frame_size(streams->width, streams->height, streams->pixel_size, 0, output_file); + frame_size = streams->width * streams->height * streams->pixel_size; + ptr = (size_t)headlen; + for (i = 0; i < argc; i++) { + ptrs[i] = ptr; + ptr += streams->frames * frame_size; + } + if (ftruncate(fd, (off_t)ptr)) + eprintf("ftruncate %s:", output_file); +#if defined(POSIX_FADV_RANDOM) + posix_fadvise(fd, (size_t)headlen, 0, POSIX_FADV_RANDOM); +#endif + + pollfd = epoll_create1(0); + if (pollfd == -1) + eprintf("epoll_create1:"); + + epwriteall(fd, head, (size_t)head_len, 0, output_file); + for (i = 0; i < argc; i++) { + epwriteall(fd, streams[i].buf, streams[i].ptr, ptrs[i], output_file); + ptrs[i] += streams[i].ptr; + streams[i].ptr = 0; + } + + for (j = 0; j < jobs; j++, next++) { + events->events = EPOLLIN; + events->data.u64 = next; + if (epoll_ctl(pollfd, EPOLL_CTL_ADD, streams[next].fd, events) == -1) { + if ((errno == ENOMEM || errno == ENOSPC) && j) + break; + eprintf("epoll_ctl:"); + } + } + jobs = j; + + while (jobs) { + n = epoll_wait(pollfd, events, jobs, -1); + if (n < 0) + eprintf("epoll_wait:"); + for (i = 0; i < n; i++) { + j = events[i].data.u64; + if (!eread_stream(streams + j)) { + close(streams[j].fd); + if (next < (size_t)argc) { + events->events = EPOLLIN; + events->data.u64 = next; + if (epoll_ctl(pollfd, EPOLL_CTL_ADD, streams[next].fd, events) == -1) { + if ((errno == ENOMEM || errno == ENOSPC) && j) + break; + eprintf("epoll_ctl:"); + } + next++; + } else { + jobs--; + } + } else { + epwriteall(fd, streams[j].buf, streams[j].ptr, ptrs[j], output_file); + ptrs[j] += streams[j].ptr; + streams[j].ptr = 0; + } + } + } + + close(pollfd); + free(events); + free(streams); + free(ptrs); + +#endif +} + int main(int argc, char *argv[]) { char *output_file = NULL; + size_t jobs = 0; ARGBEGIN { case 'o': output_file = EARG(); break; + case 'j': + jobs = etozu_flag('j', EARG(), 1, SHRT_MAX); + break; default: usage(); } ARGEND; - if (argc < 2) + if (argc < 2 || (jobs && !output_file)) usage(); - if (output_file) + if (jobs) + concat_to_file_parallel(argc, argv, output_file, jobs); + else if (output_file) concat_to_file(argc, argv, output_file); else - concat_to_stdout(argc, argv); + concat_to_stdout(argc, argv, ""); return 0; } -- cgit v1.2.3-70-g09d2