Blob Blame History Raw
/*
  Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
  This file is part of GlusterFS.

  This file is licensed to you under your choice of the GNU Lesser
  General Public License, version 3 or any later version (LGPLv3 or
  later), or the GNU General Public License, version 2 (GPLv2), in all
  cases as published by the Free Software Foundation.
*/

#include <glusterfs/call-stub.h>
#include <glusterfs/defaults.h>
#include <glusterfs/glusterfs.h>
#include <glusterfs/logging.h>
#include <glusterfs/dict.h>
#include <glusterfs/xlator.h>
#include "io-threads.h"
#include <signal.h>
#include <stdlib.h>
#include <sys/time.h>
#include <time.h>
#include <glusterfs/locking.h>
#include "io-threads-messages.h"
#include <glusterfs/timespec.h>

/* Worker thread body; defined further down in this file. */
void *
iot_worker(void *arg);
/* Takes conf->mutex and then calls __iot_workers_scale(). */
int
iot_workers_scale(iot_conf_t *conf);
/* Creates worker threads as needed; callers in this file hold conf->mutex. */
int
__iot_workers_scale(iot_conf_t *conf);
/*
 * Tentative declaration of the translator option table.
 * NOTE(review): the definition is not visible in this part of the file.
 */
struct volume_options options[];

/*
 * IOT_FOP(name, frame, this, args...)
 *
 * Common body of every iot_<name>() entry point.  It builds a call stub for
 * fop <name> that will resume in default_<name>_resume, then hands the stub
 * to iot_schedule().
 *
 * If the stub cannot be allocated (-ENOMEM) or iot_schedule() returns a
 * negative value, the fop is failed immediately through
 * default_<name>_failure_cbk() with the positive errno, and any stub that was
 * created is destroyed.
 *
 * The expansion defines a local label named "out", so the macro can be used
 * at most once per function.
 */
#define IOT_FOP(name, frame, this, args...)                                    \
    do {                                                                       \
        call_stub_t *__stub = NULL;                                            \
        int __ret = -1;                                                        \
                                                                               \
        __stub = fop_##name##_stub(frame, default_##name##_resume, args);      \
        if (!__stub) {                                                         \
            __ret = -ENOMEM;                                                   \
            goto out;                                                          \
        }                                                                      \
                                                                               \
        __ret = iot_schedule(frame, this, __stub);                             \
                                                                               \
    out:                                                                       \
        if (__ret < 0) {                                                       \
            default_##name##_failure_cbk(frame, -__ret);                       \
            if (__stub != NULL) {                                              \
                call_stub_destroy(__stub);                                     \
            }                                                                  \
        }                                                                      \
    } while (0)

/*
 * Return the per-client queue array for this translator, creating it on
 * first use.
 *
 * The array holds GF_FOP_PRI_MAX entries (one per priority), each with its
 * list heads initialised.  If client_ctx_set() hands back a different
 * pointer than the one we offered, our freshly allocated array is released
 * and the pointer returned by client_ctx_set() is used instead.
 *
 * Returns NULL when no context exists and the allocation fails.
 */
iot_client_ctx_t *
iot_get_ctx(xlator_t *this, client_t *client)
{
    iot_client_ctx_t *ctx = NULL;
    iot_client_ctx_t *stored = NULL;
    int pri;

    /* Fast path: the client already carries a context for us. */
    if (client_ctx_get(client, this, (void **)&ctx) == 0)
        return ctx;

    ctx = GF_CALLOC(GF_FOP_PRI_MAX, sizeof(*ctx), gf_iot_mt_client_ctx_t);
    if (!ctx)
        return NULL;

    for (pri = 0; pri < GF_FOP_PRI_MAX; ++pri) {
        INIT_LIST_HEAD(&ctx[pri].clients);
        INIT_LIST_HEAD(&ctx[pri].reqs);
    }

    stored = client_ctx_set(client, this, ctx);
    if (stored != ctx) {
        /* A different context is in place; drop the one we built. */
        GF_FREE(ctx);
    }

    return stored;
}

/*
 * Remove and return the next runnable stub, scanning priorities from 0
 * upwards.
 *
 * A priority is skipped when its active count has reached its limit or when
 * it has no client queues.  Within a priority the first client queue is
 * served; if that queue still has requests afterwards the client list is
 * rotated so the next client is served next time, otherwise the queue is
 * unlinked from the client list.
 *
 * On success *pri receives the chosen priority, the active count for that
 * priority is incremented, its queue_marked flag is cleared and the queue
 * size counters are decremented.  When nothing can be dequeued, NULL is
 * returned and *pri is -1.
 */
call_stub_t *
__iot_dequeue(iot_conf_t *conf, int *pri)
{
    call_stub_t *picked = NULL;
    iot_client_ctx_t *queue = NULL;
    int level;

    *pri = -1;

    for (level = 0; level < GF_FOP_PRI_MAX; level++) {
        /* Not eligible: at its thread limit, or nobody is waiting. */
        if (conf->ac_iot_count[level] >= conf->ac_iot_limit[level] ||
            list_empty(&conf->clients[level])) {
            continue;
        }

        /* Head of the per-client queues for this priority. */
        queue = list_first_entry(&conf->clients[level], iot_client_ctx_t,
                                 clients);
        if (!queue || list_empty(&queue->reqs)) {
            continue;
        }

        /* Oldest request on that client's queue. */
        picked = list_first_entry(&queue->reqs, call_stub_t, list);
        list_del_init(&picked->list);

        if (!list_empty(&queue->reqs)) {
            list_rotate_left(&conf->clients[level]);
        } else {
            list_del_init(&queue->clients);
        }

        conf->ac_iot_count[level]++;
        conf->queue_marked[level] = _gf_false;
        conf->queue_size--;
        conf->queue_sizes[level]--;
        *pri = level;
        break;
    }

    return picked;
}

/*
 * Append a stub to the request queue for the given priority.
 *
 * An out-of-range priority is treated as the last (GF_FOP_PRI_MAX - 1) one.
 * The stub goes onto the queue of the client that owns its frame; when the
 * frame has no client, or no per-client context can be obtained, the shared
 * conf->no_client queue is used.  A queue that was empty is linked onto the
 * priority's client list first.  The queue size counters and stub_cnt are
 * incremented.
 */
void
__iot_enqueue(iot_conf_t *conf, call_stub_t *stub, int pri)
{
    client_t *owner = stub->frame->root->client;
    iot_client_ctx_t *per_client = NULL;
    iot_client_ctx_t *queue = NULL;

    if (pri < 0 || pri >= GF_FOP_PRI_MAX)
        pri = GF_FOP_PRI_MAX - 1;

    if (owner)
        per_client = iot_get_ctx(THIS, owner);

    queue = per_client ? &per_client[pri] : &conf->no_client[pri];

    /* First request for this queue: make it visible to the dequeuer. */
    if (list_empty(&queue->reqs)) {
        list_add_tail(&queue->clients, &conf->clients[pri]);
    }
    list_add_tail(&stub->list, &queue->reqs);

    conf->queue_size++;
    GF_ATOMIC_INC(conf->stub_cnt);
    conf->queue_sizes[pri]++;
}

/*
 * Worker thread body.
 *
 * data is the iot_conf_t shared by all workers.  Each loop iteration, under
 * conf->mutex:
 *   - releases the active slot held for the previously executed request,
 *   - waits on conf->cond (for at most conf->idle_time seconds per wait)
 *     while the queue is empty,
 *   - decides whether this thread should exit,
 *   - otherwise tries to dequeue one stub.
 * The stub is then executed (or destroyed if poisoned) outside the lock.
 *
 * Always returns NULL.
 */
void *
iot_worker(void *data)
{
    iot_conf_t *conf = NULL;
    xlator_t *this = NULL;
    call_stub_t *stub = NULL;
    struct timespec sleep_till = {
        0,
    };
    int ret = 0;
    int pri = -1;
    gf_boolean_t bye = _gf_false;

    conf = data;
    this = conf->this;
    THIS = this;

    for (;;) {
        pthread_mutex_lock(&conf->mutex);
        {
            /* Give back the active slot taken by the last dequeue. */
            if (pri != -1) {
                conf->ac_iot_count[pri]--;
                pri = -1;
            }
            while (conf->queue_size == 0) {
                if (conf->down) {
                    bye = _gf_true; /*Avoid sleep*/
                    break;
                }

                /* Absolute deadline: now + idle_time seconds. */
                clock_gettime(CLOCK_REALTIME_COARSE, &sleep_till);
                sleep_till.tv_sec += conf->idle_time;

                conf->sleep_count++;
                ret = pthread_cond_timedwait(&conf->cond, &conf->mutex,
                                             &sleep_till);
                conf->sleep_count--;

                /* Shutdown requested or idle for too long: consider exit. */
                if (conf->down || ret == ETIMEDOUT) {
                    bye = _gf_true;
                    break;
                }
            }

            if (bye) {
                /*
                 * Exit only on shutdown or when more than IOT_MIN_THREADS
                 * workers exist; otherwise an idle timeout is ignored and
                 * the thread keeps running.
                 */
                if (conf->down || conf->curr_count > IOT_MIN_THREADS) {
                    conf->curr_count--;
                    /*
                     * Last worker gone: wake every waiter on conf->cond.
                     * NOTE(review): presumably a shutdown path waits for
                     * curr_count to reach 0 - confirm against the caller.
                     */
                    if (conf->curr_count == 0)
                        pthread_cond_broadcast(&conf->cond);
                    gf_msg_debug(conf->this->name, 0,
                                 "terminated. "
                                 "conf->curr_count=%d",
                                 conf->curr_count);
                } else {
                    bye = _gf_false;
                }
            }

            /* May yield NULL, e.g. when no priority is currently eligible. */
            if (!bye)
                stub = __iot_dequeue(conf, &pri);
        }
        pthread_mutex_unlock(&conf->mutex);

        if (stub) { /* guard against spurious wakeups */
            if (stub->poison) {
                gf_log(this->name, GF_LOG_INFO, "Dropping poisoned request %p.",
                       stub);
                call_stub_destroy(stub);
            } else {
                call_resume(stub);
            }
            /* Balances the increment done in __iot_enqueue(). */
            GF_ATOMIC_DEC(conf->stub_cnt);
        }
        stub = NULL;

        if (bye)
            break;
    }

    return NULL;
}

/*
 * Queue a stub at the given priority and make sure a worker will pick it up.
 *
 * Under conf->mutex the stub is enqueued, one waiter on conf->cond is
 * signalled and the pool is scaled.  The return value is whatever
 * __iot_workers_scale() reports.
 */
int
do_iot_schedule(iot_conf_t *conf, call_stub_t *stub, int pri)
{
    int scale_ret;

    pthread_mutex_lock(&conf->mutex);

    __iot_enqueue(conf, stub, pri);
    pthread_cond_signal(&conf->cond);
    scale_ret = __iot_workers_scale(conf);

    pthread_mutex_unlock(&conf->mutex);

    return scale_ret;
}

/*
 * Map a priority value to the short label used in logs and state dumps.
 *
 * Returns a pointer to a string literal, or NULL for a value that is not one
 * of the enumerators handled below.
 */
char *
iot_get_pri_meaning(gf_fop_pri_t pri)
{
    switch (pri) {
        case GF_FOP_PRI_HI:
            return "fast";
        case GF_FOP_PRI_NORMAL:
            return "normal";
        case GF_FOP_PRI_LO:
            return "slow";
        case GF_FOP_PRI_LEAST:
            return "least";
        case GF_FOP_PRI_MAX:
            return "invalid";
        case GF_FOP_PRI_UNSPEC:
            return "unspecified";
    }

    return NULL;
}

/*
 * Choose a priority for a stub and queue it for the worker threads.
 *
 * Priority selection:
 *   - If the frame's pid is below GF_CLIENT_PID_MAX, is not
 *     GF_CLIENT_PID_NO_ROOT_SQUASH, and the least-priority option is enabled,
 *     the request is always queued at GF_FOP_PRI_LEAST.
 *   - Otherwise the priority depends on the fop type (see the switch below).
 *     FORGET/RELEASE/RELEASEDIR/GETSPEC keep the initial value, which is the
 *     last priority (GF_FOP_PRI_MAX - 1).
 *
 * Returns -EINVAL for fops that are not handled here (including IPC), -1
 * when the translator has no private data, and otherwise the result of
 * do_iot_schedule().
 *
 * The private pointer is checked before every use: the final scheduling step
 * has always tolerated a NULL this->private, so the least-priority test must
 * not dereference it unguarded either.
 */
int
iot_schedule(call_frame_t *frame, xlator_t *this, call_stub_t *stub)
{
    int ret = -1;
    gf_fop_pri_t pri = GF_FOP_PRI_MAX - 1;
    iot_conf_t *conf = this->private;

    if ((frame->root->pid < GF_CLIENT_PID_MAX) &&
        (frame->root->pid != GF_CLIENT_PID_NO_ROOT_SQUASH) && conf &&
        conf->least_priority) {
        pri = GF_FOP_PRI_LEAST;
        goto out;
    }

    switch (stub->fop) {
        case GF_FOP_OPEN:
        case GF_FOP_STAT:
        case GF_FOP_FSTAT:
        case GF_FOP_LOOKUP:
        case GF_FOP_ACCESS:
        case GF_FOP_READLINK:
        case GF_FOP_OPENDIR:
        case GF_FOP_STATFS:
        case GF_FOP_READDIR:
        case GF_FOP_READDIRP:
        case GF_FOP_GETACTIVELK:
        case GF_FOP_SETACTIVELK:
        case GF_FOP_ICREATE:
        case GF_FOP_NAMELINK:
            pri = GF_FOP_PRI_HI;
            break;

        case GF_FOP_CREATE:
        case GF_FOP_FLUSH:
        case GF_FOP_LK:
        case GF_FOP_INODELK:
        case GF_FOP_FINODELK:
        case GF_FOP_ENTRYLK:
        case GF_FOP_FENTRYLK:
        case GF_FOP_LEASE:
        case GF_FOP_UNLINK:
        case GF_FOP_SETATTR:
        case GF_FOP_FSETATTR:
        case GF_FOP_MKNOD:
        case GF_FOP_MKDIR:
        case GF_FOP_RMDIR:
        case GF_FOP_SYMLINK:
        case GF_FOP_RENAME:
        case GF_FOP_LINK:
        case GF_FOP_SETXATTR:
        case GF_FOP_GETXATTR:
        case GF_FOP_FGETXATTR:
        case GF_FOP_FSETXATTR:
        case GF_FOP_REMOVEXATTR:
        case GF_FOP_FREMOVEXATTR:
        case GF_FOP_PUT:
            pri = GF_FOP_PRI_NORMAL;
            break;

        case GF_FOP_READ:
        case GF_FOP_WRITE:
        case GF_FOP_FSYNC:
        case GF_FOP_TRUNCATE:
        case GF_FOP_FTRUNCATE:
        case GF_FOP_FSYNCDIR:
        case GF_FOP_XATTROP:
        case GF_FOP_FXATTROP:
        case GF_FOP_RCHECKSUM:
        case GF_FOP_FALLOCATE:
        case GF_FOP_DISCARD:
        case GF_FOP_ZEROFILL:
        case GF_FOP_SEEK:
            pri = GF_FOP_PRI_LO;
            break;

        case GF_FOP_FORGET:
        case GF_FOP_RELEASE:
        case GF_FOP_RELEASEDIR:
        case GF_FOP_GETSPEC:
            /* Keep the initial (last) priority. */
            break;
        case GF_FOP_IPC:
        default:
            return -EINVAL;
    }
out:
    gf_msg_debug(this->name, 0, "%s scheduled as %s priority fop",
                 gf_fop_list[stub->fop], iot_get_pri_meaning(pri));
    if (conf)
        ret = do_iot_schedule(conf, stub, pri);
    return ret;
}

/*
 * Queue a lookup request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_lookup_failure_cbk().  Always returns 0.
 */
int
iot_lookup(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)
{
    IOT_FOP(lookup, frame, this, loc, xdata);
    return 0;
}

/*
 * Queue a setattr request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_setattr_failure_cbk().  Always returns 0.
 */
int
iot_setattr(call_frame_t *frame, xlator_t *this, loc_t *loc, struct iatt *stbuf,
            int32_t valid, dict_t *xdata)
{
    IOT_FOP(setattr, frame, this, loc, stbuf, valid, xdata);
    return 0;
}

/*
 * Queue an fsetattr request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_fsetattr_failure_cbk().  Always returns 0.
 */
int
iot_fsetattr(call_frame_t *frame, xlator_t *this, fd_t *fd, struct iatt *stbuf,
             int32_t valid, dict_t *xdata)
{
    IOT_FOP(fsetattr, frame, this, fd, stbuf, valid, xdata);
    return 0;
}

/*
 * Queue an access request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_access_failure_cbk().  Always returns 0.
 */
int
iot_access(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t mask,
           dict_t *xdata)
{
    IOT_FOP(access, frame, this, loc, mask, xdata);
    return 0;
}

/*
 * Queue a readlink request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_readlink_failure_cbk().  Always returns 0.
 */
int
iot_readlink(call_frame_t *frame, xlator_t *this, loc_t *loc, size_t size,
             dict_t *xdata)
{
    IOT_FOP(readlink, frame, this, loc, size, xdata);
    return 0;
}

/*
 * Queue a mknod request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_mknod_failure_cbk().  Always returns 0.
 */
int
iot_mknod(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
          dev_t rdev, mode_t umask, dict_t *xdata)
{
    IOT_FOP(mknod, frame, this, loc, mode, rdev, umask, xdata);
    return 0;
}

/*
 * Queue a mkdir request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_mkdir_failure_cbk().  Always returns 0.
 */
int
iot_mkdir(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
          mode_t umask, dict_t *xdata)
{
    IOT_FOP(mkdir, frame, this, loc, mode, umask, xdata);
    return 0;
}

/*
 * Queue an rmdir request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_rmdir_failure_cbk().  Always returns 0.
 */
int
iot_rmdir(call_frame_t *frame, xlator_t *this, loc_t *loc, int flags,
          dict_t *xdata)
{
    IOT_FOP(rmdir, frame, this, loc, flags, xdata);
    return 0;
}

/*
 * Queue a symlink request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_symlink_failure_cbk().  Always returns 0.
 */
int
iot_symlink(call_frame_t *frame, xlator_t *this, const char *linkname,
            loc_t *loc, mode_t umask, dict_t *xdata)
{
    IOT_FOP(symlink, frame, this, linkname, loc, umask, xdata);
    return 0;
}

/*
 * Queue a rename request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_rename_failure_cbk().  Always returns 0.
 */
int
iot_rename(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,
           dict_t *xdata)
{
    IOT_FOP(rename, frame, this, oldloc, newloc, xdata);
    return 0;
}

/*
 * Queue an open request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_open_failure_cbk().  Always returns 0.
 */
int
iot_open(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,
         fd_t *fd, dict_t *xdata)
{
    IOT_FOP(open, frame, this, loc, flags, fd, xdata);
    return 0;
}

/*
 * Queue a create request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_create_failure_cbk().  Always returns 0.
 */
int
iot_create(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,
           mode_t mode, mode_t umask, fd_t *fd, dict_t *xdata)
{
    IOT_FOP(create, frame, this, loc, flags, mode, umask, fd, xdata);
    return 0;
}

/*
 * Queue a put request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_put_failure_cbk().  Always returns 0.
 */
int
iot_put(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
        mode_t umask, uint32_t flags, struct iovec *vector, int32_t count,
        off_t offset, struct iobref *iobref, dict_t *xattr, dict_t *xdata)
{
    IOT_FOP(put, frame, this, loc, mode, umask, flags, vector, count, offset,
            iobref, xattr, xdata);
    return 0;
}

/*
 * Queue a readv request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_readv_failure_cbk().  Always returns 0.
 */
int
iot_readv(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
          off_t offset, uint32_t flags, dict_t *xdata)
{
    IOT_FOP(readv, frame, this, fd, size, offset, flags, xdata);
    return 0;
}

/*
 * Queue a flush request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_flush_failure_cbk().  Always returns 0.
 */
int
iot_flush(call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata)
{
    IOT_FOP(flush, frame, this, fd, xdata);
    return 0;
}

/*
 * Queue an fsync request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_fsync_failure_cbk().  Always returns 0.
 */
int
iot_fsync(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t datasync,
          dict_t *xdata)
{
    IOT_FOP(fsync, frame, this, fd, datasync, xdata);
    return 0;
}

/*
 * Queue a writev request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_writev_failure_cbk().  Always returns 0.
 */
int
iot_writev(call_frame_t *frame, xlator_t *this, fd_t *fd, struct iovec *vector,
           int32_t count, off_t offset, uint32_t flags, struct iobref *iobref,
           dict_t *xdata)
{
    IOT_FOP(writev, frame, this, fd, vector, count, offset, flags, iobref,
            xdata);
    return 0;
}

/*
 * Queue an lk request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_lk_failure_cbk().  Always returns 0.
 */
int
iot_lk(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t cmd,
       struct gf_flock *flock, dict_t *xdata)
{
    IOT_FOP(lk, frame, this, fd, cmd, flock, xdata);
    return 0;
}

/*
 * Queue a stat request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_stat_failure_cbk().  Always returns 0.
 */
int
iot_stat(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)
{
    IOT_FOP(stat, frame, this, loc, xdata);
    return 0;
}

/*
 * Queue an fstat request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_fstat_failure_cbk().  Always returns 0.
 */
int
iot_fstat(call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata)
{
    IOT_FOP(fstat, frame, this, fd, xdata);
    return 0;
}

/*
 * Queue a truncate request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_truncate_failure_cbk().  Always returns 0.
 */
int
iot_truncate(call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset,
             dict_t *xdata)
{
    IOT_FOP(truncate, frame, this, loc, offset, xdata);
    return 0;
}

/*
 * Queue an ftruncate request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_ftruncate_failure_cbk().  Always returns 0.
 */
int
iot_ftruncate(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset,
              dict_t *xdata)
{
    IOT_FOP(ftruncate, frame, this, fd, offset, xdata);
    return 0;
}

/*
 * Queue an unlink request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_unlink_failure_cbk().  Always returns 0.
 */
int
iot_unlink(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t xflag,
           dict_t *xdata)
{
    IOT_FOP(unlink, frame, this, loc, xflag, xdata);
    return 0;
}

/*
 * Queue a link request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_link_failure_cbk().  Always returns 0.
 */
int
iot_link(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,
         dict_t *xdata)
{
    IOT_FOP(link, frame, this, oldloc, newloc, xdata);
    return 0;
}

/*
 * Queue an opendir request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_opendir_failure_cbk().  Always returns 0.
 */
int
iot_opendir(call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd,
            dict_t *xdata)
{
    IOT_FOP(opendir, frame, this, loc, fd, xdata);
    return 0;
}

/*
 * Queue an fsyncdir request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_fsyncdir_failure_cbk().  Always returns 0.
 */
int
iot_fsyncdir(call_frame_t *frame, xlator_t *this, fd_t *fd, int datasync,
             dict_t *xdata)
{
    IOT_FOP(fsyncdir, frame, this, fd, datasync, xdata);
    return 0;
}

/*
 * Queue a statfs request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_statfs_failure_cbk().  Always returns 0.
 */
int
iot_statfs(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)
{
    IOT_FOP(statfs, frame, this, loc, xdata);
    return 0;
}

/*
 * Queue a setxattr request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_setxattr_failure_cbk().  Always returns 0.
 */
int
iot_setxattr(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *dict,
             int32_t flags, dict_t *xdata)
{
    IOT_FOP(setxattr, frame, this, loc, dict, flags, xdata);
    return 0;
}

/*
 * getxattr entry point.
 *
 * A request for the IO_THREADS_QUEUE_SIZE_KEY attribute is answered directly
 * by this translator: the reply dict maps each priority name (as given by
 * fop_pri_to_string()) to the current length of that priority's queue.
 *
 * If the dict cannot be created or filled, the call is unwound with
 * op_ret = -1 and op_errno = ENOMEM and a NULL dict, so the caller never
 * sees a "successful" reply that carries no data.
 *
 * Every other attribute name is queued for asynchronous execution through
 * IOT_FOP like the remaining fops.  Always returns 0.
 */
int
iot_getxattr(call_frame_t *frame, xlator_t *this, loc_t *loc, const char *name,
             dict_t *xdata)
{
    iot_conf_t *conf = NULL;
    dict_t *depths = NULL;
    int i = 0;
    int32_t op_ret = 0;
    int32_t op_errno = 0;

    conf = this->private;

    if (name && strcmp(name, IO_THREADS_QUEUE_SIZE_KEY) == 0) {
        /*
         * We explicitly do not want a reference count
         * for this dict in this translator
         */
        depths = dict_new();
        if (!depths) {
            op_ret = -1;
            op_errno = ENOMEM;
            goto unwind_special_getxattr;
        }

        for (i = 0; i < GF_FOP_PRI_MAX; i++) {
            if (dict_set_int32(depths, (char *)fop_pri_to_string(i),
                               conf->queue_sizes[i]) != 0) {
                /* Partial results are useless: drop them and fail. */
                dict_unref(depths);
                depths = NULL;
                op_ret = -1;
                op_errno = ENOMEM;
                goto unwind_special_getxattr;
            }
        }

    unwind_special_getxattr:
        STACK_UNWIND_STRICT(getxattr, frame, op_ret, op_errno, depths, xdata);
        if (depths)
            dict_unref(depths);
        return 0;
    }

    IOT_FOP(getxattr, frame, this, loc, name, xdata);
    return 0;
}

/*
 * Queue an fgetxattr request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_fgetxattr_failure_cbk().  Always returns 0.
 */
int
iot_fgetxattr(call_frame_t *frame, xlator_t *this, fd_t *fd, const char *name,
              dict_t *xdata)
{
    IOT_FOP(fgetxattr, frame, this, fd, name, xdata);
    return 0;
}

/*
 * Queue an fsetxattr request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_fsetxattr_failure_cbk().  Always returns 0.
 */
int
iot_fsetxattr(call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *dict,
              int32_t flags, dict_t *xdata)
{
    IOT_FOP(fsetxattr, frame, this, fd, dict, flags, xdata);
    return 0;
}

/*
 * Queue a removexattr request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_removexattr_failure_cbk().  Always returns 0.
 */
int
iot_removexattr(call_frame_t *frame, xlator_t *this, loc_t *loc,
                const char *name, dict_t *xdata)
{
    IOT_FOP(removexattr, frame, this, loc, name, xdata);
    return 0;
}

/*
 * Queue an fremovexattr request for asynchronous execution: IOT_FOP wraps
 * the arguments in a call stub and passes it to iot_schedule(); on failure
 * it invokes default_fremovexattr_failure_cbk().  Always returns 0.
 */
int
iot_fremovexattr(call_frame_t *frame, xlator_t *this, fd_t *fd,
                 const char *name, dict_t *xdata)
{
    IOT_FOP(fremovexattr, frame, this, fd, name, xdata);
    return 0;
}

/*
 * Queue a readdirp request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_readdirp_failure_cbk().  Always returns 0.
 */
int
iot_readdirp(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
             off_t offset, dict_t *xdata)
{
    IOT_FOP(readdirp, frame, this, fd, size, offset, xdata);
    return 0;
}

/*
 * Queue a readdir request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_readdir_failure_cbk().  Always returns 0.
 */
int
iot_readdir(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
            off_t offset, dict_t *xdata)
{
    IOT_FOP(readdir, frame, this, fd, size, offset, xdata);
    return 0;
}

/*
 * Queue an inodelk request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_inodelk_failure_cbk().  Always returns 0.
 */
int
iot_inodelk(call_frame_t *frame, xlator_t *this, const char *volume, loc_t *loc,
            int32_t cmd, struct gf_flock *lock, dict_t *xdata)
{
    IOT_FOP(inodelk, frame, this, volume, loc, cmd, lock, xdata);
    return 0;
}

/*
 * Queue a finodelk request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_finodelk_failure_cbk().  Always returns 0.
 */
int
iot_finodelk(call_frame_t *frame, xlator_t *this, const char *volume, fd_t *fd,
             int32_t cmd, struct gf_flock *lock, dict_t *xdata)
{
    IOT_FOP(finodelk, frame, this, volume, fd, cmd, lock, xdata);
    return 0;
}

/*
 * Queue an entrylk request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_entrylk_failure_cbk().  Always returns 0.
 */
int
iot_entrylk(call_frame_t *frame, xlator_t *this, const char *volume, loc_t *loc,
            const char *basename, entrylk_cmd cmd, entrylk_type type,
            dict_t *xdata)
{
    IOT_FOP(entrylk, frame, this, volume, loc, basename, cmd, type, xdata);
    return 0;
}

/*
 * Queue a fentrylk request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_fentrylk_failure_cbk().  Always returns 0.
 */
int
iot_fentrylk(call_frame_t *frame, xlator_t *this, const char *volume, fd_t *fd,
             const char *basename, entrylk_cmd cmd, entrylk_type type,
             dict_t *xdata)
{
    IOT_FOP(fentrylk, frame, this, volume, fd, basename, cmd, type, xdata);
    return 0;
}

/*
 * Queue an xattrop request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_xattrop_failure_cbk().  Always returns 0.
 */
int
iot_xattrop(call_frame_t *frame, xlator_t *this, loc_t *loc,
            gf_xattrop_flags_t optype, dict_t *xattr, dict_t *xdata)
{
    IOT_FOP(xattrop, frame, this, loc, optype, xattr, xdata);
    return 0;
}

/*
 * Queue an fxattrop request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_fxattrop_failure_cbk().  Always returns 0.
 */
int
iot_fxattrop(call_frame_t *frame, xlator_t *this, fd_t *fd,
             gf_xattrop_flags_t optype, dict_t *xattr, dict_t *xdata)
{
    IOT_FOP(fxattrop, frame, this, fd, optype, xattr, xdata);
    return 0;
}

/*
 * Queue an rchecksum request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_rchecksum_failure_cbk().  Always returns 0.
 */
int32_t
iot_rchecksum(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset,
              int32_t len, dict_t *xdata)
{
    IOT_FOP(rchecksum, frame, this, fd, offset, len, xdata);
    return 0;
}

/*
 * Queue a fallocate request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_fallocate_failure_cbk().  Always returns 0.
 */
int
iot_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode,
              off_t offset, size_t len, dict_t *xdata)
{
    IOT_FOP(fallocate, frame, this, fd, mode, offset, len, xdata);
    return 0;
}

/*
 * Queue a discard request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_discard_failure_cbk().  Always returns 0.
 */
int
iot_discard(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset,
            size_t len, dict_t *xdata)
{
    IOT_FOP(discard, frame, this, fd, offset, len, xdata);
    return 0;
}

/*
 * Queue a zerofill request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_zerofill_failure_cbk().  Always returns 0.
 */
int
iot_zerofill(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset,
             off_t len, dict_t *xdata)
{
    IOT_FOP(zerofill, frame, this, fd, offset, len, xdata);
    return 0;
}

/*
 * Queue a seek request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_seek_failure_cbk().  Always returns 0.
 */
int
iot_seek(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset,
         gf_seek_what_t what, dict_t *xdata)
{
    IOT_FOP(seek, frame, this, fd, offset, what, xdata);
    return 0;
}

/*
 * Queue a lease request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_lease_failure_cbk().  Always returns 0.
 */
int
iot_lease(call_frame_t *frame, xlator_t *this, loc_t *loc,
          struct gf_lease *lease, dict_t *xdata)
{
    IOT_FOP(lease, frame, this, loc, lease, xdata);
    return 0;
}

/*
 * Queue a getactivelk request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_getactivelk_failure_cbk().  Always returns 0.
 */
int
iot_getactivelk(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)
{
    IOT_FOP(getactivelk, frame, this, loc, xdata);
    return 0;
}

/*
 * Queue a setactivelk request for asynchronous execution: IOT_FOP wraps the
 * arguments in a call stub and passes it to iot_schedule(); on failure it
 * invokes default_setactivelk_failure_cbk().  Always returns 0.
 */
int
iot_setactivelk(call_frame_t *frame, xlator_t *this, loc_t *loc,
                lock_migration_info_t *locklist, dict_t *xdata)
{
    IOT_FOP(setactivelk, frame, this, loc, locklist, xdata);
    return 0;
}

/*
 * Grow the worker pool to match current demand.
 *
 * Demand is the sum, over all priorities, of the queue length capped at that
 * priority's thread limit.  The target is then raised to at least
 * IOT_MIN_THREADS and finally capped at conf->max_count.  Threads are
 * created one at a time until the target is reached or a creation fails.
 *
 * Returns the number of threads that were still to be attempted when the
 * function stopped: 0 when every needed thread was created.  Callers in this
 * file invoke it with conf->mutex held.
 */
int
__iot_workers_scale(iot_conf_t *conf)
{
    pthread_t tid;
    int wanted = 0;
    int missing = 0;
    int pri;

    for (pri = 0; pri < GF_FOP_PRI_MAX; pri++)
        wanted += min(conf->queue_sizes[pri], conf->ac_iot_limit[pri]);

    if (wanted < IOT_MIN_THREADS)
        wanted = IOT_MIN_THREADS;

    if (wanted > conf->max_count)
        wanted = conf->max_count;

    if (wanted > conf->curr_count)
        missing = wanted - conf->curr_count;

    while (missing != 0) {
        missing--;

        if (gf_thread_create(&tid, &conf->w_attr, iot_worker, conf,
                             "iotwr%03hx", conf->curr_count & 0x3ff) != 0) {
            /* Stop at the first failure; report what is left. */
            break;
        }

        conf->curr_count++;
        gf_msg_debug(conf->this->name, 0,
                     "scaled threads to %d (queue_size=%d/%d)",
                     conf->curr_count, conf->queue_size, wanted);
    }

    return missing;
}

/*
 * Locked wrapper around __iot_workers_scale().
 *
 * Returns -EINVAL when conf is NULL; otherwise takes conf->mutex, scales the
 * pool and returns what __iot_workers_scale() returned.
 */
int
iot_workers_scale(iot_conf_t *conf)
{
    int scale_ret;

    if (!conf)
        return -EINVAL;

    pthread_mutex_lock(&conf->mutex);
    scale_ret = __iot_workers_scale(conf);
    pthread_mutex_unlock(&conf->mutex);

    return scale_ret;
}

/*
 * Initialise conf->w_attr and request IOT_THREAD_STACK_SIZE for workers.
 *
 * If the requested size is rejected with EINVAL, the attribute's current
 * stack size is queried and a warning is logged; that case is not treated
 * as an error.  conf->stack_size records the size held in the local
 * variable at the end (the queried value when the query succeeded).
 *
 * Returns 0 on success, or the error from pthread_attr_init() /
 * pthread_attr_setstacksize() (other than EINVAL from the latter).
 */
int
set_stack_size(iot_conf_t *conf)
{
    xlator_t *this = THIS;
    size_t stacksize = IOT_THREAD_STACK_SIZE;
    int err;

    err = pthread_attr_init(&conf->w_attr);
    if (err != 0) {
        gf_msg(this->name, GF_LOG_ERROR, err, IO_THREADS_MSG_INIT_FAILED,
               "Thread attribute initialization failed");
        return err;
    }

    err = pthread_attr_setstacksize(&conf->w_attr, stacksize);
    if (err == EINVAL) {
        if (pthread_attr_getstacksize(&conf->w_attr, &stacksize) == 0) {
            gf_msg(this->name, GF_LOG_WARNING, 0, IO_THREADS_MSG_SIZE_NOT_SET,
                   "Using default thread stack size %zd", stacksize);
        } else {
            gf_msg(this->name, GF_LOG_WARNING, 0, IO_THREADS_MSG_SIZE_NOT_SET,
                   "Using default thread stack size");
        }
        /* Falling back to the default size is not a failure. */
        err = 0;
    }

    conf->stack_size = stacksize;
    return err;
}

/*
 * Set up memory accounting for this translator with gf_iot_mt_end + 1
 * types.
 *
 * Returns -1 when this is NULL, otherwise the result of
 * xlator_mem_acct_init(); a non-zero result is logged as an error.
 */
int32_t
mem_acct_init(xlator_t *this)
{
    int ret;

    if (this == NULL)
        return -1;

    ret = xlator_mem_acct_init(this, gf_iot_mt_end + 1);
    if (ret != 0) {
        gf_msg(this->name, GF_LOG_ERROR, ENOMEM, IO_THREADS_MSG_NO_MEMORY,
               "Memory accounting init failed");
    }

    return ret;
}

/*
 * Write this translator's private state into the process state dump.
 *
 * Emits a section named "<type>.<name>", followed by the thread counts, idle
 * time, stack size, per-priority thread limits and active counts, and the
 * length of every non-empty priority queue.  Does nothing when this or its
 * private data is NULL.  Always returns 0.
 */
int
iot_priv_dump(xlator_t *this)
{
    iot_conf_t *conf = NULL;
    char section[GF_DUMP_MAX_BUF_LEN];
    char label[GF_DUMP_MAX_BUF_LEN];
    int pri;

    conf = this ? this->private : NULL;
    if (!conf)
        return 0;

    snprintf(section, sizeof(section), "%s.%s", this->type, this->name);
    gf_proc_dump_add_section("%s", section);

    /* Pool-wide settings and counters. */
    gf_proc_dump_write("maximum_threads_count", "%d", conf->max_count);
    gf_proc_dump_write("current_threads_count", "%d", conf->curr_count);
    gf_proc_dump_write("sleep_count", "%d", conf->sleep_count);
    gf_proc_dump_write("idle_time", "%d", conf->idle_time);
    gf_proc_dump_write("stack_size", "%zd", conf->stack_size);

    /* Per-priority thread limits. */
    gf_proc_dump_write("max_high_priority_threads", "%d",
                       conf->ac_iot_limit[GF_FOP_PRI_HI]);
    gf_proc_dump_write("max_normal_priority_threads", "%d",
                       conf->ac_iot_limit[GF_FOP_PRI_NORMAL]);
    gf_proc_dump_write("max_low_priority_threads", "%d",
                       conf->ac_iot_limit[GF_FOP_PRI_LO]);
    gf_proc_dump_write("max_least_priority_threads", "%d",
                       conf->ac_iot_limit[GF_FOP_PRI_LEAST]);

    /* Per-priority active counts. */
    gf_proc_dump_write("current_high_priority_threads", "%d",
                       conf->ac_iot_count[GF_FOP_PRI_HI]);
    gf_proc_dump_write("current_normal_priority_threads", "%d",
                       conf->ac_iot_count[GF_FOP_PRI_NORMAL]);
    gf_proc_dump_write("current_low_priority_threads", "%d",
                       conf->ac_iot_count[GF_FOP_PRI_LO]);
    gf_proc_dump_write("current_least_priority_threads", "%d",
                       conf->ac_iot_count[GF_FOP_PRI_LEAST]);

    /* Only queues that currently hold requests are reported. */
    for (pri = 0; pri < GF_FOP_PRI_MAX; pri++) {
        if (conf->queue_sizes[pri]) {
            snprintf(label, sizeof(label), "%s_priority_queue_length",
                     iot_get_pri_meaning(pri));
            gf_proc_dump_write(label, "%d", conf->queue_sizes[pri]);
        }
    }

    return 0;
}

/*
 * We use a decay model to keep track and make sure we're not spawning new
 * threads too often.  Each increment adds a large value to a counter, and that
 * counter keeps ticking back down to zero over a fairly long period.  For
 * example, let's use ONE_WEEK=604800 seconds, and we want to detect when we
 * have N=3 increments during that time.  Thus, our threshold is
 * (N-1)*ONE_WEEK.  To see how it works, look at three examples.
 *
 *   (a) Two events close together, then one more almost a week later.  The
 *   first two events push our counter to 2*ONE_WEEK plus a bit.  At the third
 *   event, we decay down to ONE_WEEK plus a bit and then add ONE_WEEK for the
 *   new event, exceeding our threshold.
 *
 *   (b) One event, then two more almost a week later.  At the time of the
 *   second and third events, the counter is already non-zero, so when we add
 *   2*ONE_WEEK we exceed again.
 *
 *   (c) Three events, spaced three days apart.  At the time of the second
 *   event, we decay down to approximately ONE_WEEK*4/7 and then add another
 *   ONE_WEEK.  At the third event, we decay again down to ONE_WEEK*8/7 and add
 *   another ONE_WEEK, so boom.
 *
 * Note that in all three cases if that last event came a day later our counter
 * would have decayed a bit more and we would *not* exceed our threshold.  It's
 * not exactly the same as a precise "three in one week" limit, but it's very
 * close and it allows the same kind of tweaking while requiring only constant
 * space - no arrays of variable length N to allocate or maintain.  All we need
 * (for each queue) is the value plus the time of the last update.
 */

/* Decaying event counter, one per priority queue, used by iot_apply_event(). */
typedef struct {
    /*
     * Current counter: THRESH_SECONDS is added per event and the number of
     * seconds elapsed since update_time is subtracted before each addition.
     */
    uint32_t value;
    /* Seconds timestamp (tv_sec) taken when value was last updated. */
    time_t update_time;
} threshold_t;
/*
 * Variables so that I can hack these for testing.
 * TBD: make these tunable?
 */
/* Amount added to a threshold counter per event; also the decay window. */
static uint32_t THRESH_SECONDS = 604800;
/* Number of events within the window that trips the threshold. */
static uint32_t THRESH_EVENTS = 3;
/* Recomputed from the two values above on every iot_apply_event() call. */
static uint32_t THRESH_LIMIT = 1209600; /* SECONDS * (EVENTS-1) */

/*
 * Record one watchdog event against a decaying threshold counter.
 *
 * The counter first decays by the number of seconds elapsed since its last
 * update (never going below zero), then grows by THRESH_SECONDS.  If it has
 * reached THRESH_LIMIT, an emergency message is logged and SIGTRAP is sent
 * to this process.  The update time is refreshed at the end.
 */
static void
iot_apply_event(xlator_t *this, threshold_t *thresh)
{
    struct timespec ts;
    time_t elapsed;

    /* Refresh for manual testing/debugging.  It's cheap. */
    THRESH_LIMIT = THRESH_SECONDS * (THRESH_EVENTS - 1);

    timespec_now(&ts);

    if (thresh->value && thresh->update_time) {
        elapsed = ts.tv_sec - thresh->update_time;
        /* Clamp at zero rather than wrapping the unsigned counter. */
        if (thresh->value > elapsed) {
            thresh->value -= elapsed;
        } else {
            thresh->value = 0;
        }
    }

    thresh->value += THRESH_SECONDS;

    if (thresh->value >= THRESH_LIMIT) {
        gf_log(this->name, GF_LOG_EMERG, "watchdog firing too often");
        /*
         * SIGTRAP dumps core by default, and being distinct from the other
         * signals in use it can also be repurposed (e.g. attach gdb or
         * install a dedicated handler).
         */
        kill(getpid(), SIGTRAP);
    }

    thresh->update_time = ts.tv_sec;
}

/*
 * Watchdog thread body; arg is the translator (xlator_t *).
 *
 * Every max(watchdog_secs / 5, 1) seconds it inspects each priority queue
 * under priv->mutex.  A queue is "marked" at the end of a pass when it is
 * non-empty; __iot_dequeue() clears the mark whenever it takes a request
 * from that priority.  A mark that is still set on the next pass therefore
 * means nothing was dequeued from that priority in between.  After five
 * consecutive such passes the queue is reported as stalled, the event is fed
 * to iot_apply_event(), and the priority's thread limit is raised by one.
 *
 * The loop never exits on its own; the thread is stopped by cancellation.
 */
static void *
iot_watchdog(void *arg)
{
    xlator_t *this = arg;
    iot_conf_t *priv = this->private;
    int i;
    /* Consecutive passes on which each queue was found still marked. */
    int bad_times[GF_FOP_PRI_MAX] = {
        0,
    };
    /* Per-queue decay counters limiting how often stalls may occur. */
    threshold_t thresholds[GF_FOP_PRI_MAX] = {{
        0,
    }};

    for (;;) {
        sleep(max(priv->watchdog_secs / 5, 1));
        /* Do not allow cancellation while the mutex is held. */
        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
        pthread_mutex_lock(&priv->mutex);
        for (i = 0; i < GF_FOP_PRI_MAX; ++i) {
            if (priv->queue_marked[i]) {
                if (++bad_times[i] >= 5) {
                    gf_log(this->name, GF_LOG_WARNING, "queue %d stalled", i);
                    iot_apply_event(this, &thresholds[i]);
                    /*
                     * We might not get here if the event
                     * put us over our threshold.
                     */
                    ++(priv->ac_iot_limit[i]);
                    bad_times[i] = 0;
                }
            } else {
                bad_times[i] = 0;
            }
            /* Re-arm the mark for the next pass if work is pending. */
            priv->queue_marked[i] = (priv->queue_sizes[i] > 0);
        }
        pthread_mutex_unlock(&priv->mutex);
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    }

    /* NOTREACHED */
    return NULL;
}

/*
 * Start the watchdog thread unless it is already running.
 *
 * On success the running flag is set; a creation failure is only logged as
 * a warning and the flag stays clear.
 */
static void
start_iot_watchdog(xlator_t *this)
{
    iot_conf_t *priv = this->private;

    if (priv->watchdog_running)
        return;

    if (pthread_create(&priv->watchdog_thread, NULL, iot_watchdog, this) !=
        0) {
        gf_log(this->name, GF_LOG_WARNING,
               "pthread_create(iot_watchdog) failed");
        return;
    }

    priv->watchdog_running = _gf_true;
}

/*
 * Cancel and join the watchdog thread if it is running.  Failures of
 * pthread_cancel()/pthread_join() are logged but otherwise ignored, and
 * watchdog_running is cleared in every case.
 */
static void
stop_iot_watchdog(xlator_t *this)
{
    iot_conf_t *priv = this->private;
    int rc;

    if (!priv->watchdog_running) {
        return;
    }

    rc = pthread_cancel(priv->watchdog_thread);
    if (rc != 0) {
        gf_log(this->name, GF_LOG_WARNING,
               "pthread_cancel(iot_watchdog) failed");
    }

    rc = pthread_join(priv->watchdog_thread, NULL);
    if (rc != 0) {
        gf_log(this->name, GF_LOG_WARNING, "pthread_join(iot_watchdog) failed");
    }

    /* A failure above most likely means the thread was gone already. */
    priv->watchdog_running = _gf_false;
}

/*
 * Re-read the run-time tunable options into the existing configuration
 * and start or stop the watchdog thread to match the new "watchdog-secs"
 * value.
 *
 * Returns 0 on success, -1 if the xlator has no private configuration or
 * if any option fails to parse (each GF_OPTION_RECONF jumps to 'out').
 */
int
reconfigure(xlator_t *this, dict_t *options)
{
    iot_conf_t *conf = this->private;
    int ret = -1;

    if (conf == NULL) {
        return ret;
    }

    /* Thread limits: overall, then one per priority class. */
    GF_OPTION_RECONF("thread-count", conf->max_count, options, int32, out);
    GF_OPTION_RECONF("high-prio-threads", conf->ac_iot_limit[GF_FOP_PRI_HI],
                     options, int32, out);
    GF_OPTION_RECONF("normal-prio-threads",
                     conf->ac_iot_limit[GF_FOP_PRI_NORMAL], options, int32,
                     out);
    GF_OPTION_RECONF("low-prio-threads", conf->ac_iot_limit[GF_FOP_PRI_LO],
                     options, int32, out);
    GF_OPTION_RECONF("least-prio-threads", conf->ac_iot_limit[GF_FOP_PRI_LEAST],
                     options, int32, out);

    /* Behavioural switches. */
    GF_OPTION_RECONF("enable-least-priority", conf->least_priority, options,
                     bool, out);
    GF_OPTION_RECONF("cleanup-disconnected-reqs",
                     conf->cleanup_disconnected_reqs, options, bool, out);
    GF_OPTION_RECONF("watchdog-secs", conf->watchdog_secs, options, int32, out);
    GF_OPTION_RECONF("pass-through", this->pass_through, options, bool, out);

    /* A positive interval enables the watchdog, anything else stops it. */
    if (conf->watchdog_secs <= 0) {
        stop_iot_watchdog(this);
    } else {
        start_iot_watchdog(this);
    }

    ret = 0;
out:
    return ret;
}

/*
 * Translator initialisation: validate the graph position, allocate and
 * populate the io-threads configuration, start the worker threads and,
 * when "watchdog-secs" is positive, the watchdog thread.
 *
 * Every option, "watchdog-secs" included, is parsed before
 * iot_workers_scale() is called and before this->private is published.
 * A bad option value therefore can no longer free 'conf' while
 * this->private still points at it.
 *
 * Returns 0 on success and a non-zero value on failure.  On failure
 * this->private is left untouched and 'conf' is released.
 */
int
init(xlator_t *this)
{
    iot_conf_t *conf = NULL;
    int ret = -1;
    int i = 0;
    int workers_attempted = 0;

    if (!this->children || this->children->next) {
        gf_msg("io-threads", GF_LOG_ERROR, 0,
               IO_THREADS_MSG_XLATOR_CHILD_MISCONFIGURED,
               "FATAL: iot not configured "
               "with exactly one child");
        goto out;
    }

    if (!this->parents) {
        gf_msg(this->name, GF_LOG_WARNING, 0, IO_THREADS_MSG_VOL_MISCONFIGURED,
               "dangling volume. check volfile ");
    }

    conf = (void *)GF_CALLOC(1, sizeof(*conf), gf_iot_mt_iot_conf_t);
    if (conf == NULL) {
        gf_msg(this->name, GF_LOG_ERROR, ENOMEM, IO_THREADS_MSG_NO_MEMORY,
               "out of memory");
        goto out;
    }

    if ((ret = pthread_cond_init(&conf->cond, NULL)) != 0) {
        gf_msg(this->name, GF_LOG_ERROR, 0, IO_THREADS_MSG_INIT_FAILED,
               "pthread_cond_init failed (%d)", ret);
        goto out;
    }
    conf->cond_inited = _gf_true;

    if ((ret = pthread_mutex_init(&conf->mutex, NULL)) != 0) {
        gf_msg(this->name, GF_LOG_ERROR, 0, IO_THREADS_MSG_INIT_FAILED,
               "pthread_mutex_init failed (%d)", ret);
        goto out;
    }
    conf->mutex_inited = _gf_true;

    ret = set_stack_size(conf);

    if (ret != 0)
        goto out;

    /* From here on any 'goto out' must report failure. */
    ret = -1;

    GF_OPTION_INIT("thread-count", conf->max_count, int32, out);

    GF_OPTION_INIT("high-prio-threads", conf->ac_iot_limit[GF_FOP_PRI_HI],
                   int32, out);

    GF_OPTION_INIT("normal-prio-threads", conf->ac_iot_limit[GF_FOP_PRI_NORMAL],
                   int32, out);

    GF_OPTION_INIT("low-prio-threads", conf->ac_iot_limit[GF_FOP_PRI_LO], int32,
                   out);

    GF_OPTION_INIT("least-prio-threads", conf->ac_iot_limit[GF_FOP_PRI_LEAST],
                   int32, out);

    GF_OPTION_INIT("idle-time", conf->idle_time, int32, out);

    GF_OPTION_INIT("enable-least-priority", conf->least_priority, bool, out);

    GF_OPTION_INIT("cleanup-disconnected-reqs", conf->cleanup_disconnected_reqs,
                   bool, out);

    GF_OPTION_INIT("pass-through", this->pass_through, bool, out);

    /*
     * Parsed here, together with the other options, so that the last
     * step that can fail happens before workers exist and before
     * this->private is set.
     */
    conf->watchdog_secs = 0;
    GF_OPTION_INIT("watchdog-secs", conf->watchdog_secs, int32, out);

    conf->this = this;
    GF_ATOMIC_INIT(conf->stub_cnt, 0);

    for (i = 0; i < GF_FOP_PRI_MAX; i++) {
        INIT_LIST_HEAD(&conf->clients[i]);
        INIT_LIST_HEAD(&conf->no_client[i].clients);
        INIT_LIST_HEAD(&conf->no_client[i].reqs);
    }

    workers_attempted = 1;
    ret = iot_workers_scale(conf);

    if (ret == -1) {
        gf_msg(this->name, GF_LOG_ERROR, 0, IO_THREADS_MSG_INIT_FAILED,
               "cannot initialize worker threads, exiting init");
        goto out;
    }

    /* Nothing below can fail, so it is safe to publish the config now. */
    this->private = conf;

    if (conf->watchdog_secs > 0) {
        start_iot_watchdog(this);
    }

    ret = 0;
out:
    if (ret) {
        /*
         * Release the primitives only when no worker start was attempted.
         * NOTE(review): after a failed iot_workers_scale() some workers
         * may presumably still be using them - confirm against
         * iot_workers_scale()/iot_worker().
         */
        if (conf && !workers_attempted) {
            if (conf->cond_inited)
                pthread_cond_destroy(&conf->cond);
            if (conf->mutex_inited)
                pthread_mutex_destroy(&conf->mutex);
        }
        GF_FREE(conf);
    }

    return ret;
}

/*
 * Flag the translator as going down, wake every thread waiting on
 * conf->cond, and block until conf->curr_count has dropped to zero.
 * All of this happens with conf->mutex held (pthread_cond_wait releases
 * it while sleeping).
 */
static void
iot_exit_threads(iot_conf_t *conf)
{
    pthread_mutex_lock(&conf->mutex);

    conf->down = _gf_true;

    /* Tell every waiter that the xlator is shutting down. */
    pthread_cond_broadcast(&conf->cond);

    /* Sleep until the thread count has reached zero. */
    while (conf->curr_count != 0) {
        pthread_cond_wait(&conf->cond, &conf->mutex);
    }

    pthread_mutex_unlock(&conf->mutex);
}

/*
 * Event hook.  'data' is treated as the xlator the event refers to.
 *
 * GF_EVENT_PARENT_DOWN:
 *   - victim->cleanup_starting set: wait until conf->stub_cnt reads zero
 *     before the event is passed on, then log.
 *   - otherwise: shut the worker threads down via iot_exit_threads().
 * GF_EVENT_CHILD_DOWN:
 *   - victim->cleanup_starting set: shut the worker threads down and log.
 *
 * Every event is finally forwarded through default_notify().
 * Always returns 0.
 */
int
notify(xlator_t *this, int32_t event, void *data, ...)
{
    iot_conf_t *conf = this->private;
    xlator_t *victim = data;
    uint64_t stub_cnt = 0;
    struct timespec sleep_till = {
        0,
    };

    if (GF_EVENT_PARENT_DOWN == event) {
        if (victim->cleanup_starting) {
            /* Wait for draining stub from queue before notify PARENT_DOWN */
            pthread_mutex_lock(&conf->mutex);
            {
                stub_cnt = GF_ATOMIC_GET(conf->stub_cnt);
                while (stub_cnt) {
                    /*
                     * pthread_cond_timedwait() takes an absolute
                     * deadline, so it has to be refreshed on every
                     * pass.  With a single deadline computed up front
                     * each call after the first second would return
                     * immediately and the loop would spin.
                     */
                    clock_gettime(CLOCK_REALTIME, &sleep_till);
                    sleep_till.tv_sec += 1;
                    (void)pthread_cond_timedwait(&conf->cond, &conf->mutex,
                                                 &sleep_till);
                    stub_cnt = GF_ATOMIC_GET(conf->stub_cnt);
                }
            }
            pthread_mutex_unlock(&conf->mutex);

            gf_log(this->name, GF_LOG_INFO,
                   "Notify GF_EVENT_PARENT_DOWN for brick %s", victim->name);
        } else {
            iot_exit_threads(conf);
        }
    }

    if (GF_EVENT_CHILD_DOWN == event) {
        if (victim->cleanup_starting) {
            iot_exit_threads(conf);
            gf_log(this->name, GF_LOG_INFO,
                   "Notify GF_EVENT_CHILD_DOWN for brick %s", victim->name);
        }
    }

    default_notify(this, event, data);

    return 0;
}

/*
 * Translator teardown: stop the worker threads and the watchdog thread,
 * destroy the synchronisation primitives and release the configuration.
 * Does nothing when the xlator has no private configuration.
 *
 * The watchdog is stopped before the condition variable and the mutex
 * are destroyed, because iot_watchdog() locks conf->mutex on every pass
 * and must not be left running against a destroyed mutex.
 */
void
fini(xlator_t *this)
{
    iot_conf_t *conf = this->private;

    if (!conf)
        return;

    if (conf->mutex_inited && conf->cond_inited)
        iot_exit_threads(conf);

    /* Must come before pthread_mutex_destroy(): see the note above. */
    stop_iot_watchdog(this);

    if (conf->cond_inited)
        pthread_cond_destroy(&conf->cond);

    if (conf->mutex_inited)
        pthread_mutex_destroy(&conf->mutex);

    GF_FREE(conf);

    this->private = NULL;
    return;
}

/*
 * client_destroy callback: detach whatever context this xlator stored on
 * 'client' and free it.  Nothing is freed when client_ctx_del() reports
 * failure.  Always returns 0.
 */
int
iot_client_destroy(xlator_t *this, client_t *client)
{
    void *ctx = NULL;

    if (client_ctx_del(client, this, &ctx) != 0) {
        return 0;
    }

    GF_FREE(ctx);
    return 0;
}

/*
 * client_disconnect callback.  When "cleanup-disconnected-reqs" is
 * enabled, walk the no_client request list of every priority under
 * conf->mutex and set the 'poison' flag on each queued stub whose frame
 * belongs to the disconnecting client.  Always returns 0.
 */
static int
iot_disconnect_cbk(xlator_t *this, client_t *client)
{
    iot_conf_t *conf = this->private;
    iot_client_ctx_t *queue;
    call_stub_t *stub;
    call_stub_t *tmp;
    int pri;

    if (conf == NULL || !conf->cleanup_disconnected_reqs) {
        return 0;
    }

    pthread_mutex_lock(&conf->mutex);
    for (pri = 0; pri < GF_FOP_PRI_MAX; pri++) {
        queue = &conf->no_client[pri];
        list_for_each_entry_safe(stub, tmp, &queue->reqs, list)
        {
            if (stub->frame->root->client == client) {
                gf_log(this->name, GF_LOG_INFO,
                       "poisoning %s fop at %p for client %s",
                       gf_fop_list[stub->fop], stub, client->client_uid);
                stub->poison = _gf_true;
            }
        }
    }
    pthread_mutex_unlock(&conf->mutex);

    return 0;
}

/* State-dump hooks: only the private-data dumper is provided. */
struct xlator_dumpops dumpops = {
    .priv = iot_priv_dump,
};

/*
 * File-operation table: each listed fop is routed to its iot_* handler.
 * Fops that are not listed here are not overridden by this table.
 */
struct xlator_fops fops = {
    .open = iot_open,
    .create = iot_create,
    .readv = iot_readv,
    .writev = iot_writev,
    .flush = iot_flush,
    .fsync = iot_fsync,
    .lk = iot_lk,
    .stat = iot_stat,
    .fstat = iot_fstat,
    .truncate = iot_truncate,
    .ftruncate = iot_ftruncate,
    .unlink = iot_unlink,
    .lookup = iot_lookup,
    .setattr = iot_setattr,
    .fsetattr = iot_fsetattr,
    .access = iot_access,
    .readlink = iot_readlink,
    .mknod = iot_mknod,
    .mkdir = iot_mkdir,
    .rmdir = iot_rmdir,
    .symlink = iot_symlink,
    .rename = iot_rename,
    .link = iot_link,
    .opendir = iot_opendir,
    .fsyncdir = iot_fsyncdir,
    .statfs = iot_statfs,
    .setxattr = iot_setxattr,
    .getxattr = iot_getxattr,
    .fgetxattr = iot_fgetxattr,
    .fsetxattr = iot_fsetxattr,
    .removexattr = iot_removexattr,
    .fremovexattr = iot_fremovexattr,
    .readdir = iot_readdir,
    .readdirp = iot_readdirp,
    .inodelk = iot_inodelk,
    .finodelk = iot_finodelk,
    .entrylk = iot_entrylk,
    .fentrylk = iot_fentrylk,
    .xattrop = iot_xattrop,
    .fxattrop = iot_fxattrop,
    .rchecksum = iot_rchecksum,
    .fallocate = iot_fallocate,
    .discard = iot_discard,
    .zerofill = iot_zerofill,
    .seek = iot_seek,
    .lease = iot_lease,
    .getactivelk = iot_getactivelk,
    .setactivelk = iot_setactivelk,
    .put = iot_put,
};

/*
 * Client life-cycle callbacks: per-client context is freed on destroy,
 * and queued requests of a disconnecting client may be poisoned (see
 * iot_disconnect_cbk()).
 */
struct xlator_cbks cbks = {
    .client_destroy = iot_client_destroy,
    .client_disconnect = iot_disconnect_cbk,
};

/*
 * Option table of the io-threads translator.  The keys are the ones read
 * by init() through GF_OPTION_INIT and, except for "idle-time", re-read
 * by reconfigure() through GF_OPTION_RECONF.  The table ends with an
 * entry whose key is NULL.
 */
struct volume_options options[] = {
    {.key = {"thread-count"},
     .type = GF_OPTION_TYPE_INT,
     .min = IOT_MIN_THREADS,
     .max = IOT_MAX_THREADS,
     .default_value = "16",
     .op_version = {1},
     .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC,
     .tags = {"io-threads"},
     /*.option = "thread-count"*/
     .description = "Number of threads in IO threads translator which "
                    "perform concurrent IO operations"

    },
    {.key = {"high-prio-threads"},
     .type = GF_OPTION_TYPE_INT,
     .min = IOT_MIN_THREADS,
     .max = IOT_MAX_THREADS,
     .default_value = "16",
     .op_version = {1},
     .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC,
     .tags = {"io-threads"},
     .description = "Max number of threads in IO threads translator which "
                    "perform high priority IO operations at a given time"

    },
    {.key = {"normal-prio-threads"},
     .type = GF_OPTION_TYPE_INT,
     .min = IOT_MIN_THREADS,
     .max = IOT_MAX_THREADS,
     .default_value = "16",
     .op_version = {1},
     .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC,
     .tags = {"io-threads"},
     .description = "Max number of threads in IO threads translator which "
                    "perform normal priority IO operations at a given time"

    },
    {.key = {"low-prio-threads"},
     .type = GF_OPTION_TYPE_INT,
     .min = IOT_MIN_THREADS,
     .max = IOT_MAX_THREADS,
     .default_value = "16",
     .op_version = {1},
     .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC,
     .tags = {"io-threads"},
     .description = "Max number of threads in IO threads translator which "
                    "perform low priority IO operations at a given time"

    },
    /* Unlike the other per-priority limits, this one defaults to "1". */
    {.key = {"least-prio-threads"},
     .type = GF_OPTION_TYPE_INT,
     .min = IOT_MIN_THREADS,
     .max = IOT_MAX_THREADS,
     .default_value = "1",
     .op_version = {1},
     .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC,
     .tags = {"io-threads"},
     .description = "Max number of threads in IO threads translator which "
                    "perform least priority IO operations at a given time"},
    {.key = {"enable-least-priority"},
     .type = GF_OPTION_TYPE_BOOL,
     .default_value = SITE_H_ENABLE_LEAST_PRIORITY,
     .op_version = {1},
     .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC,
     .tags = {"io-threads"},
     .description = "Enable/Disable least priority"},
    /* Declared without op_version, flags, tags or description. */
    {
        .key = {"idle-time"},
        .type = GF_OPTION_TYPE_INT,
        .min = 1,
        .max = 0x7fffffff,
        .default_value = "120",
    },
    /*
     * A value greater than zero makes init()/reconfigure() start the
     * watchdog thread.
     * NOTE(review): default_value is the integer 0 here, whereas the other
     * entries use string literals - confirm that a null default is
     * intended rather than "0".
     */
    {.key = {"watchdog-secs"},
     .type = GF_OPTION_TYPE_INT,
     .min = 0,
     .default_value = 0,
     .op_version = {GD_OP_VERSION_4_1_0},
     .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC,
     .tags = {"io-threads"},
     .description = "Number of seconds a queue must be stalled before "
                    "starting an 'emergency' thread."},
    {.key = {"cleanup-disconnected-reqs"},
     .type = GF_OPTION_TYPE_BOOL,
     .default_value = "off",
     .op_version = {GD_OP_VERSION_4_1_0},
     .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC | OPT_FLAG_CLIENT_OPT,
     .tags = {"io-threads"},
     .description = "'Poison' queued requests when a client disconnects"},
    {.key = {"pass-through"},
     .type = GF_OPTION_TYPE_BOOL,
     .default_value = "false",
     .op_version = {GD_OP_VERSION_4_1_0},
     .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC | OPT_FLAG_CLIENT_OPT,
     .tags = {"io-threads"},
     .description = "Enable/Disable io threads translator"},
    /* Terminating entry. */
    {
        .key = {NULL},
    },
};

/*
 * Registration record of the translator: ties together the life-cycle
 * entry points (init, fini, notify, reconfigure, mem_acct_init), the
 * dump/fop/callback tables and the option table defined in this file.
 */
xlator_api_t xlator_api = {
    .init = init,
    .fini = fini,
    .notify = notify,
    .reconfigure = reconfigure,
    .mem_acct_init = mem_acct_init,
    .op_version = {1}, /* Present from the initial version */
    .dumpops = &dumpops,
    .fops = &fops,
    .cbks = &cbks,
    .options = options,
    .identifier = "io-threads",
    .category = GF_MAINTAINED,
};