/* * Copyright (c) 2005 Hannes Reinecke, Suse */ #define _GNU_SOURCE #include #include #include #include #include #include #include #include #include #include #include #include #include "checkers.h" #include "../libmultipath/debug.h" #include "../libmultipath/time-util.h" #define AIO_GROUP_SIZE 1024 /* Note: This checker type relies on the fact that only one checker can be run * at a time, since multiple checkers share the same aio_group, and must be * able to modify other checker's async_reqs. If multple checkers become able * to be run at the same time, this checker will need to add locking, and * probably polling on event fds, to deal with that */ struct aio_group { struct list_head node; int holders; io_context_t ioctx; struct list_head orphans; }; struct async_req { struct iocb io; unsigned int blksize; unsigned char * buf; struct list_head node; int state; /* PATH_REMOVED means this is an orphan */ }; static LIST_HEAD(aio_grp_list); enum { MSG_DIRECTIO_UNKNOWN = CHECKER_FIRST_MSGID, MSG_DIRECTIO_PENDING, MSG_DIRECTIO_BLOCKSIZE, }; #define _IDX(x) (MSG_DIRECTIO_##x - CHECKER_FIRST_MSGID) const char *libcheck_msgtable[] = { [_IDX(UNKNOWN)] = " is not available", [_IDX(PENDING)] = " is waiting on aio", [_IDX(BLOCKSIZE)] = " cannot get blocksize, set default", NULL, }; #define LOG(prio, fmt, args...) condlog(prio, "directio: " fmt, ##args) struct directio_context { int running; int reset_flags; struct aio_group *aio_grp; struct async_req *req; }; static struct aio_group * add_aio_group(void) { struct aio_group *aio_grp; aio_grp = malloc(sizeof(struct aio_group)); if (!aio_grp) return NULL; memset(aio_grp, 0, sizeof(struct aio_group)); INIT_LIST_HEAD(&aio_grp->orphans); if (io_setup(AIO_GROUP_SIZE, &aio_grp->ioctx) != 0) { LOG(1, "io_setup failed"); if (errno == EAGAIN) LOG(1, "global number of io events too small. Increase fs.aio-max-nr with sysctl"); free(aio_grp); return NULL; } list_add(&aio_grp->node, &aio_grp_list); return aio_grp; } static int set_aio_group(struct directio_context *ct) { struct aio_group *aio_grp = NULL; list_for_each_entry(aio_grp, &aio_grp_list, node) if (aio_grp->holders < AIO_GROUP_SIZE) goto found; aio_grp = add_aio_group(); if (!aio_grp) { ct->aio_grp = NULL; return -1; } found: aio_grp->holders++; ct->aio_grp = aio_grp; return 0; } static void remove_aio_group(struct aio_group *aio_grp) { struct async_req *req, *tmp; io_destroy(aio_grp->ioctx); list_for_each_entry_safe(req, tmp, &aio_grp->orphans, node) { list_del(&req->node); free(req->buf); free(req); } list_del(&aio_grp->node); free(aio_grp); } /* If an aio_group is completely full of orphans, then no checkers can * use it, which means that no checkers can clear out the orphans. To * avoid keeping the useless group around, simply remove remove the * group */ static void check_orphaned_group(struct aio_group *aio_grp) { int count = 0; struct list_head *item; if (aio_grp->holders < AIO_GROUP_SIZE) return; list_for_each(item, &aio_grp->orphans) count++; if (count >= AIO_GROUP_SIZE) remove_aio_group(aio_grp); } void libcheck_reset (void) { struct aio_group *aio_grp, *tmp; list_for_each_entry_safe(aio_grp, tmp, &aio_grp_list, node) remove_aio_group(aio_grp); } int libcheck_init (struct checker * c) { unsigned long pgsize = getpagesize(); struct directio_context * ct; struct async_req *req = NULL; long flags; ct = malloc(sizeof(struct directio_context)); if (!ct) return 1; memset(ct, 0, sizeof(struct directio_context)); if (set_aio_group(ct) < 0) goto out; req = malloc(sizeof(struct async_req)); if (!req) { goto out; } memset(req, 0, sizeof(struct async_req)); INIT_LIST_HEAD(&req->node); if (ioctl(c->fd, BLKBSZGET, &req->blksize) < 0) { c->msgid = MSG_DIRECTIO_BLOCKSIZE; req->blksize = 4096; } if (req->blksize > 4096) { /* * Sanity check for DASD; BSZGET is broken */ req->blksize = 4096; } if (!req->blksize) goto out; if (posix_memalign((void **)&req->buf, pgsize, req->blksize) != 0) goto out; flags = fcntl(c->fd, F_GETFL); if (flags < 0) goto out; if (!(flags & O_DIRECT)) { flags |= O_DIRECT; if (fcntl(c->fd, F_SETFL, flags) < 0) goto out; ct->reset_flags = 1; } /* Successfully initialized, return the context. */ ct->req = req; c->context = (void *) ct; return 0; out: if (req) { if (req->buf) free(req->buf); free(req); } if (ct->aio_grp) ct->aio_grp->holders--; free(ct); return 1; } void libcheck_free (struct checker * c) { struct directio_context * ct = (struct directio_context *)c->context; struct io_event event; long flags; if (!ct) return; if (ct->reset_flags) { if ((flags = fcntl(c->fd, F_GETFL)) >= 0) { int ret __attribute__ ((unused)); flags &= ~O_DIRECT; /* No point in checking for errors */ ret = fcntl(c->fd, F_SETFL, flags); } } if (ct->running && (ct->req->state != PATH_PENDING || io_cancel(ct->aio_grp->ioctx, &ct->req->io, &event) == 0)) ct->running = 0; if (!ct->running) { free(ct->req->buf); free(ct->req); ct->aio_grp->holders--; } else { ct->req->state = PATH_REMOVED; list_add(&ct->req->node, &ct->aio_grp->orphans); check_orphaned_group(ct->aio_grp); } free(ct); c->context = NULL; } static int get_events(struct aio_group *aio_grp, struct timespec *timeout) { struct io_event events[128]; int i, nr, got_events = 0; struct timespec zero_timeout = {0}; struct timespec *timep = timeout; do { errno = 0; nr = io_getevents(aio_grp->ioctx, 1, 128, events, timep); got_events |= (nr > 0); for (i = 0; i < nr; i++) { struct async_req *req = container_of(events[i].obj, struct async_req, io); LOG(3, "io finished %lu/%lu", events[i].res, events[i].res2); /* got an orphaned request */ if (req->state == PATH_REMOVED) { list_del(&req->node); free(req->buf); free(req); aio_grp->holders--; } else req->state = (events[i].res == req->blksize) ? PATH_UP : PATH_DOWN; } timep = &zero_timeout; } while (nr == 128); /* assume there are more events and try again */ if (nr < 0) LOG(3, "async io getevents returned %i (errno=%s)", nr, strerror(errno)); return got_events; } static int check_state(int fd, struct directio_context *ct, int sync, int timeout_secs) { struct timespec timeout = { .tv_nsec = 1000 }; struct stat sb; int rc; long r; struct timespec currtime, endtime; if (fstat(fd, &sb) == 0) { LOG(4, "called for %x", (unsigned) sb.st_rdev); } if (sync > 0) { LOG(4, "called in synchronous mode"); timeout.tv_sec = timeout_secs; timeout.tv_nsec = 0; } if (ct->running) { if (ct->req->state != PATH_PENDING) { ct->running = 0; return ct->req->state; } } else { struct iocb *ios[1] = { &ct->req->io }; LOG(3, "starting new request"); memset(&ct->req->io, 0, sizeof(struct iocb)); io_prep_pread(&ct->req->io, fd, ct->req->buf, ct->req->blksize, 0); ct->req->state = PATH_PENDING; if (io_submit(ct->aio_grp->ioctx, 1, ios) != 1) { LOG(3, "io_submit error %i", errno); return PATH_UNCHECKED; } } ct->running++; get_monotonic_time(&endtime); endtime.tv_sec += timeout.tv_sec; endtime.tv_nsec += timeout.tv_nsec; normalize_timespec(&endtime); while(1) { r = get_events(ct->aio_grp, &timeout); if (ct->req->state != PATH_PENDING) { ct->running = 0; return ct->req->state; } else if (r == 0 || (timeout.tv_sec == 0 && timeout.tv_nsec == 0)) break; get_monotonic_time(&currtime); timespecsub(&endtime, &currtime, &timeout); if (timeout.tv_sec < 0) timeout.tv_sec = timeout.tv_nsec = 0; } if (ct->running > timeout_secs || sync) { struct io_event event; LOG(3, "abort check on timeout"); r = io_cancel(ct->aio_grp->ioctx, &ct->req->io, &event); /* * Only reset ct->running if we really * could abort the pending I/O */ if (!r) ct->running = 0; rc = PATH_DOWN; } else { LOG(3, "async io pending"); rc = PATH_PENDING; } return rc; } int libcheck_check (struct checker * c) { int ret; struct directio_context * ct = (struct directio_context *)c->context; if (!ct) return PATH_UNCHECKED; ret = check_state(c->fd, ct, checker_is_sync(c), c->timeout); switch (ret) { case PATH_UNCHECKED: c->msgid = MSG_DIRECTIO_UNKNOWN; break; case PATH_DOWN: c->msgid = CHECKER_MSGID_DOWN; break; case PATH_UP: c->msgid = CHECKER_MSGID_UP; break; case PATH_PENDING: c->msgid = MSG_DIRECTIO_PENDING; break; default: break; } return ret; }