Blob Blame History Raw
/* SPDX-License-Identifier: MIT */
/*
 * gcc -Wall -O2 -D_GNU_SOURCE -o ucontext-cp ucontext-cp.c -luring
 */
#define _POSIX_C_SOURCE 199309L
#include <stdio.h>
#include <fcntl.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <errno.h>
#include <ucontext.h>
#include <signal.h>
#include <inttypes.h>
#include <sys/types.h>
#include <sys/ioctl.h>
#include <sys/timerfd.h>
#include <sys/poll.h>
#include "liburing.h"

#define QD	64
#define BS	1024

#ifndef SIGSTKSZ
#define SIGSTKSZ 8192
#endif

typedef struct {
	struct io_uring *ring;
	unsigned char stack_buf[SIGSTKSZ];
	ucontext_t ctx_main, ctx_fnew;
} async_context;

typedef struct {
	async_context *pctx;
	int *psuccess;
	int *pfailure;
	int infd;
	int outfd;
} arguments_bundle;

#define DEFINE_AWAIT_OP(operation) 					\
static ssize_t await_##operation(					\
	async_context *pctx,						\
	int fd,								\
	const struct iovec *ioves,					\
	unsigned int nr_vecs,						\
	off_t offset)							\
{									\
	struct io_uring_sqe *sqe = io_uring_get_sqe(pctx->ring);	\
	struct io_uring_cqe *cqe;					\
									\
	if (!sqe)							\
		return -1;						\
									\
	io_uring_prep_##operation(sqe, fd, ioves, nr_vecs, offset);	\
	io_uring_sqe_set_data(sqe, pctx);				\
	swapcontext(&pctx->ctx_fnew, &pctx->ctx_main);			\
	io_uring_peek_cqe(pctx->ring, &cqe);				\
	assert(cqe);							\
	io_uring_cqe_seen(pctx->ring, cqe);				\
									\
	return cqe->res;						\
}

DEFINE_AWAIT_OP(readv)
DEFINE_AWAIT_OP(writev)
#undef DEFINE_AWAIT_OP

int await_poll(async_context *pctx, int fd, short poll_mask) {
	struct io_uring_sqe *sqe = io_uring_get_sqe(pctx->ring);
	struct io_uring_cqe *cqe;
	if (!sqe)
		return -1;

	io_uring_prep_poll_add(sqe, fd, poll_mask);
	io_uring_sqe_set_data(sqe, pctx);
	swapcontext(&pctx->ctx_fnew, &pctx->ctx_main);
	io_uring_peek_cqe(pctx->ring, &cqe);
	assert(cqe);
	io_uring_cqe_seen(pctx->ring, cqe);

	return cqe->res;
}

int await_delay(async_context *pctx, time_t seconds) {
	struct io_uring_sqe *sqe = io_uring_get_sqe(pctx->ring);
	struct io_uring_cqe *cqe;
	struct __kernel_timespec ts = {
		.tv_sec = seconds,
		.tv_nsec = 0
	};

	if (!sqe)
		return -1;

	io_uring_prep_timeout(sqe, &ts, 0, 0);
	io_uring_sqe_set_data(sqe, pctx);
	swapcontext(&pctx->ctx_fnew, &pctx->ctx_main);
	io_uring_peek_cqe(pctx->ring, &cqe);
	assert(cqe);
	io_uring_cqe_seen(pctx->ring, cqe);

	return 0;
}

static int setup_context(async_context *pctx, struct io_uring *ring)
{
	int ret;

	pctx->ring = ring;
	ret = getcontext(&pctx->ctx_fnew);
	if (ret < 0) {
		perror("getcontext");
		return -1;
	}
	pctx->ctx_fnew.uc_stack.ss_sp = &pctx->stack_buf;
	pctx->ctx_fnew.uc_stack.ss_size = sizeof(pctx->stack_buf);
	pctx->ctx_fnew.uc_link = &pctx->ctx_main;

	return 0;
}

static int copy_file(async_context *pctx, int infd, int outfd, struct iovec* piov)
{
	off_t offset = 0;

	for (;;) {
		ssize_t bytes_read;

		printf("%d->%d: readv %ld bytes from %ld\n", infd, outfd, (long) piov->iov_len, (long) offset);
		if ((bytes_read = await_readv(pctx, infd, piov, 1, offset)) < 0) {
			perror("await_readv");
			return 1;
		}
		if (bytes_read == 0)
			return 0;

		piov->iov_len = bytes_read;

		printf("%d->%d: writev %ld bytes from %ld\n", infd, outfd, (long) piov->iov_len, (long) offset);
		if (await_writev(pctx, outfd, piov, 1, offset) != bytes_read) {
			perror("await_writev");
			return 1;
		}
		if (bytes_read < BS)
			return 0;
		offset += bytes_read;

		printf("%d->%d: wait %ds\n", infd, outfd, 1);
		await_delay(pctx, 1);
	}
}

static void copy_file_wrapper(arguments_bundle *pbundle)
{
	struct iovec iov = {
		.iov_base = malloc(BS),
		.iov_len = BS,
	};
	async_context *pctx = pbundle->pctx;

	int ret = copy_file(pctx, pbundle->infd, pbundle->outfd, &iov);

	printf("%d->%d: done with ret code %d\n", pbundle->infd, pbundle->outfd, ret);

	if (ret == 0) {
		++*pbundle->psuccess;
	} else {
		++*pbundle->pfailure;
	}

	free(iov.iov_base);
	close(pbundle->infd);
	close(pbundle->outfd);
	free(pbundle->pctx);
	free(pbundle);

	swapcontext(&pctx->ctx_fnew, &pctx->ctx_main);
}

int main(int argc, char *argv[])
{
	struct io_uring ring;
	int i, req_count, ret;
	int success = 0, failure = 0;

	if (argc < 3) {
		fprintf(stderr, "%s: infile1 outfile1 [infile2 outfile2 [...]]\n", argv[0]);
		return 1;
	}

	ret = io_uring_queue_init(QD, &ring, 0);
	if (ret < 0) {
		fprintf(stderr, "queue_init: %s\n", strerror(-ret));
		return -1;
	}

	req_count = (argc - 1) / 2;
	printf("copying %d files...\n", req_count);

	for (i = 1; i < argc; i += 2) {
		int infd, outfd;

		async_context *pctx = malloc(sizeof(*pctx));

		if (!pctx || setup_context(pctx, &ring))
			return 1;

		infd = open(argv[i], O_RDONLY);
		if (infd < 0) {
			perror("open infile");
			return 1;
		}
		outfd = open(argv[i + 1], O_WRONLY | O_CREAT | O_TRUNC, 0644);
		if (outfd < 0) {
			perror("open outfile");
			return 1;
		}

		arguments_bundle *pbundle = malloc(sizeof(*pbundle));
		pbundle->pctx = pctx;
		pbundle->psuccess = &success;
		pbundle->pfailure = &failure;
		pbundle->infd = infd;
		pbundle->outfd = outfd;

		makecontext(&pctx->ctx_fnew, (void (*)(void)) copy_file_wrapper, 1, pbundle);

		if (swapcontext(&pctx->ctx_main, &pctx->ctx_fnew)) {
			perror("swapcontext");
			return 1;
		}
	}

	/* event loop */
	while (success + failure < req_count) {
		struct io_uring_cqe *cqe;

		/* usually be timed waiting */
		ret = io_uring_submit_and_wait(&ring, 1);
		if (ret < 0) {
			fprintf(stderr, "submit_and_wait: %s\n", strerror(-ret));
			return 1;
		}

		ret = io_uring_wait_cqe(&ring, &cqe);
		if (ret < 0) {
			fprintf(stderr, "wait_cqe: %s\n", strerror(-ret));
			return 1;
		}

		async_context *pctx = io_uring_cqe_get_data(cqe);

		if (swapcontext(&pctx->ctx_main, &pctx->ctx_fnew)) {
			perror("swapcontext");
			return 1;
		}
	}

	io_uring_queue_exit(&ring);

	printf("finished with %d success(es) and %d failure(s)\n", success, failure);

	return failure > 0;
}