/*
 * Copyright (c) 2005 Hannes Reinecke, Suse
 */
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/ioctl.h>
#include <linux/fs.h>
#include <errno.h>
#include <unistd.h>
#include <libaio.h>

#include "checkers.h"
#include "../libmultipath/debug.h"
#include "../libmultipath/time-util.h"

#define AIO_GROUP_SIZE 1024

/* Note: This checker type relies on the fact that only one checker can be run
 * at a time, since multiple checkers share the same aio_group, and must be
 * able to modify other checkers' async_reqs. If multiple checkers become able
 * to be run at the same time, this checker will need to add locking, and
 * probably polling on event fds, to deal with that */

/*
 * One kernel AIO context shared by up to AIO_GROUP_SIZE requests.
 */
struct aio_group {
	/* entry in the global aio_grp_list */
	struct list_head node;
	/*
	 * number of requests accounted to this group: incremented when a
	 * checker is attached, decremented when its request is released
	 * (for an orphan, only once its completion event has been reaped)
	 */
	int holders;
	/* AIO context created with io_setup(AIO_GROUP_SIZE, ...) */
	io_context_t ioctx;
	/*
	 * async_reqs whose checker was freed while the I/O was still
	 * outstanding and could not be cancelled
	 */
	struct list_head orphans;
};

/*
 * A single direct I/O read request together with its buffer.
 */
struct async_req {
	/* control block handed to io_submit(); completion events point back here */
	struct iocb io;
	/* number of bytes to read; taken from BLKBSZGET and capped at 4096 */
	unsigned int blksize;
	/* read buffer of blksize bytes, aligned to the page size */
	unsigned char *	buf;
	/* entry in aio_group->orphans once the owning checker has been freed */
	struct list_head node;
	/*
	 * PATH_PENDING while the I/O is outstanding, PATH_UP or PATH_DOWN once
	 * the completion event has been processed
	 */
	int state; /* PATH_REMOVED means this is an orphan */
};

/* All existing aio_groups; add_aio_group() links every new group in here. */
static LIST_HEAD(aio_grp_list);

/* Checker-specific message ids, numbered upwards from CHECKER_FIRST_MSGID. */
enum {
	MSG_DIRECTIO_UNKNOWN = CHECKER_FIRST_MSGID,
	MSG_DIRECTIO_PENDING,
	MSG_DIRECTIO_BLOCKSIZE,
};

/* Map MSG_DIRECTIO_<x> to its zero-based slot in libcheck_msgtable. */
#define _IDX(x) (MSG_DIRECTIO_##x - CHECKER_FIRST_MSGID)
/*
 * Message text for each checker-specific message id, indexed by
 * (msgid - CHECKER_FIRST_MSGID) and terminated by a NULL entry.
 */
const char *libcheck_msgtable[] = {
	[_IDX(UNKNOWN)] = " is not available",
	[_IDX(PENDING)] = " is waiting on aio",
	[_IDX(BLOCKSIZE)] = " cannot get blocksize, set default",
	NULL,
};

#define LOG(prio, fmt, args...) condlog(prio, "directio: " fmt, ##args)

/*
 * Per-checker state, stored in checker->context by libcheck_init().
 */
struct directio_context {
	/*
	 * 0 when no request is outstanding; otherwise incremented on each
	 * check_state() call that finds or leaves the request pending
	 */
	int		running;
	/* set when libcheck_init() added O_DIRECT to the fd, so that
	 * libcheck_free() clears it again */
	int		reset_flags;
	/* group whose AIO context this checker submits to */
	struct aio_group *aio_grp;
	/* the request (iocb + buffer) this checker reuses for every check */
	struct async_req *req;
};

/*
 * Allocate a new aio_group, create its AIO context and link the group into
 * aio_grp_list.
 *
 * Returns the new group (holders == 0, empty orphan list), or NULL if the
 * allocation or io_setup() fails.
 */
static struct aio_group *
add_aio_group(void)
{
	struct aio_group *aio_grp;
	int rc;

	aio_grp = calloc(1, sizeof(*aio_grp));
	if (!aio_grp)
		return NULL;
	INIT_LIST_HEAD(&aio_grp->orphans);

	/*
	 * The libaio wrapper reports failure by returning a negated errno
	 * value; it does not set errno, so the return value must be tested.
	 */
	rc = io_setup(AIO_GROUP_SIZE, &aio_grp->ioctx);
	if (rc != 0) {
		LOG(1, "io_setup failed");
		if (rc == -EAGAIN)
			LOG(1, "global number of io events too small. Increase fs.aio-max-nr with sysctl");
		free(aio_grp);
		return NULL;
	}
	list_add(&aio_grp->node, &aio_grp_list);
	return aio_grp;
}

static int
set_aio_group(struct directio_context *ct)
{
	struct aio_group *aio_grp = NULL;

	list_for_each_entry(aio_grp, &aio_grp_list, node)
		if (aio_grp->holders < AIO_GROUP_SIZE)
			goto found;
	aio_grp = add_aio_group();
	if (!aio_grp) {
		ct->aio_grp = NULL;
		return -1;
	}
found:
	aio_grp->holders++;
	ct->aio_grp = aio_grp;
	return 0;
}

/*
 * Destroy an aio_group: tear down its AIO context, unlink it from
 * aio_grp_list, release every orphaned request still attached to it and
 * free the group itself.
 */
static void
remove_aio_group(struct aio_group *aio_grp)
{
	struct async_req *orphan, *next;

	/*
	 * io_destroy() is called before any orphan buffer is released;
	 * presumably so that no outstanding I/O can still target them.
	 */
	io_destroy(aio_grp->ioctx);
	list_del(&aio_grp->node);

	list_for_each_entry_safe(orphan, next, &aio_grp->orphans, node) {
		list_del(&orphan->node);
		free(orphan->buf);
		free(orphan);
	}

	free(aio_grp);
}

/* If an aio_group is completely full of orphans, then no checkers can
 * use it, which means that no checkers can clear out the orphans. To
 * avoid keeping the useless group around, simply remove remove the
 * group */
static void
check_orphaned_group(struct aio_group *aio_grp)
{
	int count = 0;
	struct list_head *item;

	if (aio_grp->holders < AIO_GROUP_SIZE)
		return;
	list_for_each(item, &aio_grp->orphans)
		count++;
	if (count >= AIO_GROUP_SIZE)
		remove_aio_group(aio_grp);
}

void libcheck_reset (void)
{
	struct aio_group *aio_grp, *tmp;

	list_for_each_entry_safe(aio_grp, tmp, &aio_grp_list, node)
		remove_aio_group(aio_grp);
}

/*
 * Set up the directio checker state for c.
 *
 * Attaches the checker to an aio_group, allocates the request and a
 * page-aligned read buffer sized from the device's block size (falling back
 * to / capped at 4096 bytes), and makes sure the file descriptor has
 * O_DIRECT set.
 *
 * Returns 0 on success with c->context pointing to the new context, or 1 on
 * failure, in which case everything allocated here is released and the
 * group's holder count is restored.
 */
int libcheck_init (struct checker * c)
{
	unsigned long page_sz = getpagesize();
	struct directio_context *ct;
	struct async_req *req = NULL;
	long fl;

	ct = calloc(1, sizeof(*ct));
	if (!ct)
		return 1;

	if (set_aio_group(ct) < 0)
		goto fail;

	req = calloc(1, sizeof(*req));
	if (!req)
		goto fail;
	INIT_LIST_HEAD(&req->node);

	if (ioctl(c->fd, BLKBSZGET, &req->blksize) < 0) {
		c->msgid = MSG_DIRECTIO_BLOCKSIZE;
		req->blksize = 4096;
	}
	/* Sanity check for DASD; BSZGET is broken */
	if (req->blksize > 4096)
		req->blksize = 4096;
	if (req->blksize == 0)
		goto fail;

	if (posix_memalign((void **)&req->buf, page_sz, req->blksize) != 0)
		goto fail;

	fl = fcntl(c->fd, F_GETFL);
	if (fl < 0)
		goto fail;
	if ((fl & O_DIRECT) == 0) {
		fl |= O_DIRECT;
		if (fcntl(c->fd, F_SETFL, fl) < 0)
			goto fail;
		/* remember to drop O_DIRECT again in libcheck_free() */
		ct->reset_flags = 1;
	}

	/* Successfully initialized, return the context. */
	ct->req = req;
	c->context = (void *) ct;
	return 0;

fail:
	if (req) {
		/* req was zeroed, so buf is NULL unless it was allocated */
		free(req->buf);
		free(req);
	}
	if (ct->aio_grp)
		ct->aio_grp->holders--;
	free(ct);
	return 1;
}

/*
 * Release the checker state attached to c.
 *
 * Clears O_DIRECT again if libcheck_init() had set it. If no I/O is
 * outstanding (or the outstanding one could be cancelled), the request is
 * freed and the group's holder count dropped. Otherwise the request is
 * marked PATH_REMOVED and parked on the group's orphan list so that it can
 * be released once its completion event shows up.
 */
void libcheck_free (struct checker * c)
{
	struct directio_context *ct = (struct directio_context *)c->context;
	struct io_event event;
	long fl;

	if (!ct)
		return;

	if (ct->reset_flags) {
		fl = fcntl(c->fd, F_GETFL);
		if (fl >= 0) {
			int ret __attribute__ ((unused));

			fl &= ~O_DIRECT;
			/* No point in checking for errors */
			ret = fcntl(c->fd, F_SETFL, fl);
		}
	}

	if (ct->running) {
		/*
		 * The request is no longer in flight if it already completed,
		 * or if the kernel lets us cancel it now.
		 */
		if (ct->req->state != PATH_PENDING)
			ct->running = 0;
		else if (io_cancel(ct->aio_grp->ioctx, &ct->req->io, &event) == 0)
			ct->running = 0;
	}

	if (ct->running) {
		/* still in flight: keep the buffer alive as an orphan */
		ct->req->state = PATH_REMOVED;
		list_add(&ct->req->node, &ct->aio_grp->orphans);
		check_orphaned_group(ct->aio_grp);
	} else {
		free(ct->req->buf);
		free(ct->req);
		ct->aio_grp->holders--;
	}

	free(ct);
	c->context = NULL;
}

/*
 * Reap completion events from aio_grp's AIO context.
 *
 * The first io_getevents() call waits for up to *timeout; as long as a call
 * returns a full batch, further calls are made with a zero timeout to drain
 * whatever else is ready. For each event the owning async_req is updated:
 * an orphan (state PATH_REMOVED) is unlinked and freed and the group's
 * holder count dropped; any other request gets PATH_UP if exactly blksize
 * bytes were read, PATH_DOWN otherwise.
 *
 * Returns non-zero if at least one event was reaped, 0 otherwise.
 */
static int
get_events(struct aio_group *aio_grp, struct timespec *timeout)
{
	enum { EVENTS_PER_CALL = 128 };
	struct io_event events[EVENTS_PER_CALL];
	int i, nr, got_events = 0;
	struct timespec zero_timeout = {0};
	struct timespec *timep = timeout;

	do {
		nr = io_getevents(aio_grp->ioctx, 1, EVENTS_PER_CALL, events,
				  timep);
		got_events |= (nr > 0);

		for (i = 0; i < nr; i++) {
			struct async_req *req = container_of(events[i].obj, struct async_req, io);

			LOG(3, "io finished %lu/%lu", events[i].res,
			    events[i].res2);

			/* got an orphaned request */
			if (req->state == PATH_REMOVED) {
				list_del(&req->node);
				free(req->buf);
				free(req);
				aio_grp->holders--;
			} else
				req->state = (events[i].res == req->blksize) ?
					      PATH_UP : PATH_DOWN;
		}
		/* only the first call may block */
		timep = &zero_timeout;
	} while (nr == EVENTS_PER_CALL); /* assume there are more events and try again */

	/*
	 * The libaio wrapper returns a negated errno value on failure and
	 * does not set errno, so decode the return value itself.
	 */
	if (nr < 0)
		LOG(3, "async io getevents returned %i (errno=%s)",
		    nr, strerror(-nr));

	return got_events;
}

/*
 * Run one check cycle for the request owned by ct.
 *
 * If no request is outstanding, a read of blksize bytes at offset 0 is
 * submitted on fd. The function then waits for completion events: up to
 * timeout_secs seconds when sync > 0, otherwise 1000 ns.
 *
 * Returns
 *   - the request's state (PATH_UP / PATH_DOWN) once it has completed,
 *   - PATH_UNCHECKED if the request could not be submitted,
 *   - PATH_DOWN if the request is still pending and either sync is set or
 *     ct->running exceeds timeout_secs; an io_cancel() is attempted first,
 *   - PATH_PENDING otherwise.
 *
 * ct->running is incremented once per call while the request is outstanding
 * and compared against timeout_secs.
 * NOTE(review): this looks like it assumes roughly one call per second --
 * confirm against the caller.
 */
static int
check_state(int fd, struct directio_context *ct, int sync, int timeout_secs)
{
	struct timespec	timeout = { .tv_nsec = 1000 };
	struct stat	sb;
	int		rc;
	long		r;
	struct timespec currtime, endtime;

	if (fstat(fd, &sb) == 0) {
		LOG(4, "called for %x", (unsigned) sb.st_rdev);
	}
	if (sync > 0) {
		LOG(4, "called in synchronous mode");
		timeout.tv_sec  = timeout_secs;
		timeout.tv_nsec = 0;
	}

	if (ct->running) {
		/* the request may have been completed by another checker's
		 * get_events() call on the shared group */
		if (ct->req->state != PATH_PENDING) {
			ct->running = 0;
			return ct->req->state;
		}
	} else {
		struct iocb *ios[1] = { &ct->req->io };

		LOG(3, "starting new request");
		memset(&ct->req->io, 0, sizeof(struct iocb));
		io_prep_pread(&ct->req->io, fd, ct->req->buf,
			      ct->req->blksize, 0);
		ct->req->state = PATH_PENDING;
		/*
		 * The libaio wrapper returns the number of submitted iocbs or
		 * a negated errno value; it does not set errno.
		 */
		rc = io_submit(ct->aio_grp->ioctx, 1, ios);
		if (rc != 1) {
			LOG(3, "io_submit error %i", -rc);
			return PATH_UNCHECKED;
		}
	}
	ct->running++;

	/* absolute deadline, so repeated waits never exceed the timeout */
	get_monotonic_time(&endtime);
	endtime.tv_sec += timeout.tv_sec;
	endtime.tv_nsec += timeout.tv_nsec;
	normalize_timespec(&endtime);
	while(1) {
		r = get_events(ct->aio_grp, &timeout);

		if (ct->req->state != PATH_PENDING) {
			ct->running = 0;
			return ct->req->state;
		} else if (r == 0 ||
			   (timeout.tv_sec == 0 && timeout.tv_nsec == 0))
			break;

		/* events arrived, but not ours: wait for the remaining time */
		get_monotonic_time(&currtime);
		timespecsub(&endtime, &currtime, &timeout);
		if (timeout.tv_sec < 0)
			timeout.tv_sec = timeout.tv_nsec = 0;
	}
	if (ct->running > timeout_secs || sync) {
		struct io_event event;

		LOG(3, "abort check on timeout");

		r = io_cancel(ct->aio_grp->ioctx, &ct->req->io, &event);
		/*
		 * Only reset ct->running if we really
		 * could abort the pending I/O
		 */
		if (!r)
			ct->running = 0;
		rc = PATH_DOWN;
	} else {
		LOG(3, "async io pending");
		rc = PATH_PENDING;
	}

	return rc;
}

/*
 * Checker entry point: run check_state() for c and set c->msgid to match
 * the resulting path state.
 *
 * Returns PATH_UNCHECKED if the checker has no context, otherwise the state
 * reported by check_state(). States other than PATH_UNCHECKED, PATH_DOWN,
 * PATH_UP and PATH_PENDING leave c->msgid untouched.
 */
int libcheck_check (struct checker * c)
{
	struct directio_context *ct = (struct directio_context *)c->context;
	int state;

	if (ct == NULL)
		return PATH_UNCHECKED;

	state = check_state(c->fd, ct, checker_is_sync(c), c->timeout);

	if (state == PATH_UNCHECKED)
		c->msgid = MSG_DIRECTIO_UNKNOWN;
	else if (state == PATH_DOWN)
		c->msgid = CHECKER_MSGID_DOWN;
	else if (state == PATH_UP)
		c->msgid = CHECKER_MSGID_UP;
	else if (state == PATH_PENDING)
		c->msgid = MSG_DIRECTIO_PENDING;

	return state;
}