/*
  Copyright (c) 2008-2013 Red Hat, Inc. <http://www.redhat.com>
  This file is part of GlusterFS.

  This file is licensed to you under your choice of the GNU Lesser
  General Public License, version 3 or any later version (LGPLv3 or
  later), or the GNU General Public License, version 2 (GPLv2), in all
  cases as published by the Free Software Foundation.
*/

/*
 * performance/readdir-ahead preloads a local buffer with directory entries
 * on opendir. The optimization involves using maximum sized gluster rpc
 * requests (128k) to minimize overhead of smaller client requests.
 *
 * For example, fuse currently supports a maximum readdir buffer of 4k
 * (regardless of the filesystem client's buffer size). readdir-ahead should
 * effectively convert these smaller requests into fewer, larger sized requests
 * for simple, sequential workloads (i.e., ls).
 *
 * The translator is currently designed to handle the simple, sequential case
 * only. If a non-sequential directory read occurs, readdir-ahead disables
 * preloads on the directory.
 */

#include <math.h>
#include <glusterfs/glusterfs.h>
#include <glusterfs/xlator.h>
#include <glusterfs/call-stub.h>
#include "readdir-ahead.h"
#include "readdir-ahead-mem-types.h"
#include <glusterfs/defaults.h>
#include "readdir-ahead-messages.h"
static int
rda_fill_fd(call_frame_t *, xlator_t *, fd_t *);

/*
 * Drop the references held by a frame's local. NULL-safe so error
 * paths (e.g. rda_fill_fd() failing before nframe->local is set by
 * mem_get0()) can call it unconditionally without crashing.
 */
static void
rda_local_wipe(struct rda_local *local)
{
    if (!local)
        return;

    if (local->fd)
        fd_unref(local->fd);
    if (local->xattrs)
        dict_unref(local->xattrs);
    if (local->inode)
        inode_unref(local->inode);
}

/*
 * Look up the per-fd context holding the preloaded directory entries,
 * creating and installing it on first access. Returns NULL only on
 * allocation or ctx-set failure.
 */
static struct rda_fd_ctx *
get_rda_fd_ctx(fd_t *fd, xlator_t *this)
{
    struct rda_fd_ctx *ctx = NULL;
    uint64_t ctx_val = 0;

    LOCK(&fd->lock);
    {
        if (__fd_ctx_get(fd, this, &ctx_val) == 0) {
            /* fast path: context already installed on this fd */
            ctx = (struct rda_fd_ctx *)(uintptr_t)ctx_val;
            goto unlock;
        }

        ctx = GF_CALLOC(1, sizeof(*ctx), gf_rda_mt_rda_fd_ctx);
        if (!ctx)
            goto unlock;

        LOCK_INIT(&ctx->lock);
        INIT_LIST_HEAD(&ctx->entries.list);
        ctx->state = RDA_FD_NEW;
        /* remaining offset/size fields start at 0 via GF_CALLOC */
        ctx->xattrs = NULL;

        if (__fd_ctx_set(fd, this, (uint64_t)(uintptr_t)ctx) < 0) {
            GF_FREE(ctx);
            ctx = NULL;
        }
    }
unlock:
    UNLOCK(&fd->lock);
    return ctx;
}

/*
 * Return the readdir-ahead inode context for 'inode', allocating and
 * installing a fresh one (generation initialized to 0, statbuf zeroed)
 * on first use. Caller must hold inode->lock ('__' prefix convention).
 * Returns NULL on allocation or ctx-set failure.
 */
static rda_inode_ctx_t *
__rda_inode_ctx_get(inode_t *inode, xlator_t *this)
{
    int ret = -1;
    uint64_t ctx_uint = 0;
    rda_inode_ctx_t *ctx_p = NULL;

    /* fast path: context already present */
    ret = __inode_ctx_get1(inode, this, &ctx_uint);
    if (ret == 0)
        return (rda_inode_ctx_t *)(uintptr_t)ctx_uint;

    ctx_p = GF_CALLOC(1, sizeof(*ctx_p), gf_rda_mt_inode_ctx_t);
    if (!ctx_p)
        return NULL;

    GF_ATOMIC_INIT(ctx_p->generation, 0);

    /* NOTE(review): __inode_ctx_set1() takes a uint64_t *; casting
     * &ctx_p assumes the pointer occupies the low bits of a 64-bit
     * slot — verify this matches the inode-ctx API contract on all
     * supported platforms. */
    ret = __inode_ctx_set1(inode, this, (uint64_t *)&ctx_p);
    if (ret < 0) {
        GF_FREE(ctx_p);
        return NULL;
    }

    return ctx_p;
}

/*
 * Merge 'stbuf_in' into the iatt cached in the inode context, but only
 * if it is not older than what is already cached. Caller must hold
 * inode->lock.
 *
 * - No stbuf (or zero ctime): a fop modified the file without giving
 *   us a usable post-stat, so invalidate the cached iatt (keep only
 *   gfid and type) and bump the generation counter.
 * - Otherwise: accept the new stat unless its ctime is older than (or
 *   equal with an older nsec than) the cached one, or — when no cached
 *   ctime exists — unless the caller's generation snapshot no longer
 *   matches (i.e. the cache was invalidated after the fop started).
 *   'generation' == (uint64_t)-1 means "don't check generation".
 *
 * On success, *stbuf_out (if non-NULL) receives the resulting cached
 * iatt. Returns 0 on success, -1 if the inode ctx can't be obtained.
 */
static int
__rda_inode_ctx_update_iatts(inode_t *inode, xlator_t *this,
                             struct iatt *stbuf_in, struct iatt *stbuf_out,
                             uint64_t generation)
{
    rda_inode_ctx_t *ctx_p = NULL;
    struct iatt tmp_stat = {
        0,
    };

    ctx_p = __rda_inode_ctx_get(inode, this);
    if (!ctx_p)
        return -1;

    if ((!stbuf_in) || (stbuf_in->ia_ctime == 0)) {
        /* A fop modified a file but valid stbuf is not provided.
         * Can't update iatt to reflect results of fop and hence
         * invalidate the iatt stored in dentry.
         *
         * An example of this case can be response of write request
         * that is cached in write-behind.
         */
        if (stbuf_in)
            tmp_stat = *stbuf_in;
        else
            tmp_stat = ctx_p->statbuf;
        /* wipe everything except identity (gfid + type) */
        memset(&ctx_p->statbuf, 0, sizeof(ctx_p->statbuf));
        gf_uuid_copy(ctx_p->statbuf.ia_gfid, tmp_stat.ia_gfid);
        ctx_p->statbuf.ia_type = tmp_stat.ia_type;
        /* invalidate any in-flight readdirp stats for this inode */
        GF_ATOMIC_INC(ctx_p->generation);
    } else {
        if (ctx_p->statbuf.ia_ctime) {
            /* reject stats older than what we already have */
            if (stbuf_in->ia_ctime < ctx_p->statbuf.ia_ctime) {
                goto out;
            }

            if ((stbuf_in->ia_ctime == ctx_p->statbuf.ia_ctime) &&
                (stbuf_in->ia_ctime_nsec < ctx_p->statbuf.ia_ctime_nsec)) {
                goto out;
            }
        } else {
            /* cache was invalidated; only accept the stat if the
             * caller's generation snapshot is still current */
            if ((generation != -1) &&
                (generation != GF_ATOMIC_GET(ctx_p->generation)))
                goto out;
        }

        ctx_p->statbuf = *stbuf_in;
    }

out:
    if (stbuf_out)
        *stbuf_out = ctx_p->statbuf;

    return 0;
}

/*
 * Locked wrapper around __rda_inode_ctx_update_iatts(); see that
 * function for the merge semantics.
 */
static int
rda_inode_ctx_update_iatts(inode_t *inode, xlator_t *this,
                           struct iatt *stbuf_in, struct iatt *stbuf_out,
                           uint64_t generation)
{
    int ret;

    LOCK(&inode->lock);
    ret = __rda_inode_ctx_update_iatts(inode, this, stbuf_in, stbuf_out,
                                       generation);
    UNLOCK(&inode->lock);

    return ret;
}

/*
 * Return the fd context to its pristine (RDA_FD_NEW) state: drop all
 * preloaded entries, give their bytes back to the global cache
 * accounting, and release the cached xattr request dict.
 */
static void
rda_reset_ctx(xlator_t *this, struct rda_fd_ctx *ctx)
{
    struct rda_priv *priv = this->private;

    ctx->state = RDA_FD_NEW;
    ctx->cur_offset = 0;
    ctx->next_offset = 0;
    ctx->op_errno = 0;

    /* free every preloaded dirent and un-account its size */
    gf_dirent_free(&ctx->entries);
    GF_ATOMIC_SUB(priv->rda_cache_size, ctx->cur_size);
    ctx->cur_size = 0;

    if (ctx->xattrs) {
        dict_unref(ctx->xattrs);
        ctx->xattrs = NULL;
    }
}

/*
 * A modification fop touched 'inode'. If any open fd of the parent
 * directory has a prefetch in flight, record this inode's gfid in that
 * fd's writes_during_prefetch dict so the stats returned by the
 * in-flight readdirp are treated as stale (see rda_fill_fd_cbk()).
 *
 * Improvement over the previous version: the gfid string is
 * loop-invariant, so it is rendered once up front instead of once per
 * open fd (and previously even for fds that were not prefetching).
 */
static void
rda_mark_inode_dirty(xlator_t *this, inode_t *inode)
{
    inode_t *parent = NULL;
    fd_t *fd = NULL;
    uint64_t val = 0;
    int32_t ret = 0;
    struct rda_fd_ctx *fd_ctx = NULL;
    char gfid[GF_UUID_BUF_SIZE] = {0};

    parent = inode_parent(inode, NULL, NULL);
    if (!parent)
        return;

    /* same inode for every fd below — render its gfid only once */
    uuid_utoa_r(inode->gfid, gfid);

    LOCK(&parent->lock);
    {
        list_for_each_entry(fd, &parent->fd_list, inode_list)
        {
            val = 0;
            fd_ctx_get(fd, this, &val);
            if (val == 0)
                continue;

            fd_ctx = (void *)(uintptr_t)val;
            /* cheap unlocked check first ... */
            if (!GF_ATOMIC_GET(fd_ctx->prefetching))
                continue;

            LOCK(&fd_ctx->lock);
            {
                /* ... rechecked under the ctx lock */
                if (GF_ATOMIC_GET(fd_ctx->prefetching)) {
                    if (fd_ctx->writes_during_prefetch == NULL)
                        fd_ctx->writes_during_prefetch = dict_new();

                    ret = dict_set_int8(fd_ctx->writes_during_prefetch,
                                        gfid, 1);
                    if (ret < 0) {
                        gf_log(this->name, GF_LOG_WARNING,
                               "marking to invalidate stats of %s from an "
                               "in progress "
                               "prefetching has failed, might result in "
                               "stale stat to "
                               "application",
                               gfid);
                    }
                }
            }
            UNLOCK(&fd_ctx->lock);
        }
    }
    UNLOCK(&parent->lock);
    inode_unref(parent);

    return;
}

/*
 * Check whether we can handle a request. Offset verification is done by
 * the caller, so we only check whether the preload buffer has reached a
 * terminal state (EOD or error) or holds enough data to answer now.
 */
static gf_boolean_t
rda_can_serve_readdirp(struct rda_fd_ctx *ctx, size_t request_size)
{
    /* terminal states are always servable (possibly with an error) */
    if (ctx->state & (RDA_FD_EOD | RDA_FD_ERROR))
        return _gf_true;

    /* unplugged with any buffered data */
    if (!(ctx->state & RDA_FD_PLUGGED) && ctx->cur_size > 0)
        return _gf_true;

    /* enough buffered data to fill the whole request */
    if (request_size && ctx->cur_size >= request_size)
        return _gf_true;

    return _gf_false;
}

/*
 * Copy the iatt cached in the inode context into *attr. No-op when any
 * argument is NULL; *attr is left untouched if no context exists.
 */
void
rda_inode_ctx_get_iatt(inode_t *inode, xlator_t *this, struct iatt *attr)
{
    rda_inode_ctx_t *ictx = NULL;

    if (!inode || !this || !attr)
        return;

    LOCK(&inode->lock);
    {
        ictx = __rda_inode_ctx_get(inode, this);
        if (ictx)
            *attr = ictx->statbuf;
    }
    UNLOCK(&inode->lock);
}

/*
 * Serve a request from the fd dentry list based on the size of the request
 * buffer. ctx must be locked.
 *
 * Moves as many preloaded dirents as fit in 'request_size' bytes from
 * ctx->entries onto 'entries', refreshing each entry's d_stat from the
 * per-inode iatt cache (except for "." and ".."), and advances
 * ctx->cur_offset to the d_off of the last entry served. Re-plugs the
 * preloader when the buffer drains below the low watermark. Returns
 * the number of entries moved.
 */
static int32_t
__rda_fill_readdirp(xlator_t *this, gf_dirent_t *entries, size_t request_size,
                    struct rda_fd_ctx *ctx)
{
    gf_dirent_t *dirent, *tmp;
    size_t dirent_size, size = 0;
    int32_t count = 0;
    struct rda_priv *priv = NULL;
    struct iatt tmp_stat = {
        0,
    };

    priv = this->private;

    list_for_each_entry_safe(dirent, tmp, &ctx->entries.list, list)
    {
        dirent_size = gf_dirent_size(dirent->d_name);
        /* stop at the first entry that would overflow the request */
        if (size + dirent_size > request_size)
            break;

        memset(&tmp_stat, 0, sizeof(tmp_stat));

        if (dirent->inode && (!((strcmp(dirent->d_name, ".") == 0) ||
                                (strcmp(dirent->d_name, "..") == 0)))) {
            /* serve the freshest cached iatt, not the (possibly
             * stale) one captured at preload time */
            rda_inode_ctx_get_iatt(dirent->inode, this, &tmp_stat);
            dirent->d_stat = tmp_stat;
        }

        size += dirent_size;
        /* transfer ownership of the dirent to the reply list */
        list_del_init(&dirent->list);
        ctx->cur_size -= dirent_size;

        GF_ATOMIC_SUB(priv->rda_cache_size, dirent_size);

        list_add_tail(&dirent->list, &entries->list);
        /* next sequential request must come in at this offset */
        ctx->cur_offset = dirent->d_off;
        count++;
    }

    /* throttle: resume preloading once the buffer runs low */
    if (ctx->cur_size <= priv->rda_low_wmark)
        ctx->state |= RDA_FD_PLUGGED;

    return count;
}

/*
 * Serve up to 'size' bytes of preloaded entries into 'entries' and
 * report the buffered errno. ctx must be locked. Returns the number of
 * entries served, or -1 when nothing could be served because the
 * preloader hit an error (the error is consumed and the fd is switched
 * to bypass from then on).
 */
static int32_t
__rda_serve_readdirp(xlator_t *this, struct rda_fd_ctx *ctx, size_t size,
                     gf_dirent_t *entries, int *op_errno)
{
    int32_t count = __rda_fill_readdirp(this, entries, size, ctx);

    if (count == 0 && (ctx->state & RDA_FD_ERROR)) {
        /*
         * The preload stopped on an error: report it once, then let
         * every future request pass straight through.
         */
        ctx->state &= ~RDA_FD_ERROR;
        ctx->state |= RDA_FD_BYPASS;
        count = -1;
    }

    /*
     * Hand back the errno from the lower layers; xlators above use it
     * to decide whether the directory read is complete.
     */
    *op_errno = ctx->op_errno;

    return count;
}

/*
 * readdirp fop: serve from the preloaded buffer when possible,
 * otherwise park the request in a stub for rda_fill_fd_cbk() to answer.
 * A read at an unexpected offset flips the fd into bypass mode and all
 * subsequent requests are wound straight down.
 */
static int32_t
rda_readdirp(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
             off_t off, dict_t *xdata)
{
    struct rda_fd_ctx *ctx = NULL;
    int fill = 0;                   /* (re)start the preloader after UNLOCK */
    gf_dirent_t entries;
    int ret = 0;
    int op_errno = 0;
    gf_boolean_t serve = _gf_false; /* unwind from the local buffer */

    ctx = get_rda_fd_ctx(fd, this);
    if (!ctx)
        goto err;

    if (ctx->state & RDA_FD_BYPASS)
        goto bypass;

    INIT_LIST_HEAD(&entries.list);
    LOCK(&ctx->lock);

    /* recheck now that we have the lock */
    if (ctx->state & RDA_FD_BYPASS) {
        UNLOCK(&ctx->lock);
        goto bypass;
    }

    /*
     * If a new read comes in at offset 0 and the buffer has been
     * completed, reset the context and kickstart the filler again.
     */
    if (!off && (ctx->state & RDA_FD_EOD) && (ctx->cur_size == 0)) {
        rda_reset_ctx(this, ctx);
        /*
         * Unref and discard the 'list of xattrs to be fetched'
         * stored during opendir call. This is done above - inside
         * rda_reset_ctx().
         * Now, ref the xdata passed by md-cache in actual readdirp()
         * call and use that for all subsequent internal readdirp()
         * requests issued by this xlator.
         */
        ctx->xattrs = dict_ref(xdata);
        fill = 1;
    }

    /*
     * If a readdir occurs at an unexpected offset or we already have a
     * request pending, admit defeat and just get out of the way.
     */
    if (off != ctx->cur_offset || ctx->stub) {
        ctx->state |= RDA_FD_BYPASS;
        UNLOCK(&ctx->lock);
        goto bypass;
    }

    /*
     * If we haven't bypassed the preload, this means we can either serve
     * the request out of the preload or the request that enables us to do
     * so is in flight...
     */
    if (rda_can_serve_readdirp(ctx, size)) {
        ret = __rda_serve_readdirp(this, ctx, size, &entries, &op_errno);
        serve = _gf_true;

        /* suppress end-of-directory (ENOENT) until the preloaded
         * buffer is fully drained */
        if (op_errno == ENOENT &&
            !((ctx->state & RDA_FD_EOD) && (ctx->cur_size == 0)))
            op_errno = 0;
    } else {
        /* park the request; the preload completion will serve it */
        ctx->stub = fop_readdirp_stub(frame, NULL, fd, size, off, xdata);
        if (!ctx->stub) {
            UNLOCK(&ctx->lock);
            goto err;
        }

        if (!(ctx->state & RDA_FD_RUNNING)) {
            fill = 1;
            if (!ctx->xattrs)
                ctx->xattrs = dict_ref(xdata);
            ctx->state |= RDA_FD_RUNNING;
        }
    }

    UNLOCK(&ctx->lock);

    if (serve) {
        STACK_UNWIND_STRICT(readdirp, frame, ret, op_errno, &entries, xdata);
        gf_dirent_free(&entries);
    }

    if (fill)
        rda_fill_fd(frame, this, fd);

    return 0;

bypass:
    STACK_WIND(frame, default_readdirp_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->readdirp, fd, size, off, xdata);
    return 0;

err:
    STACK_UNWIND_STRICT(readdirp, frame, -1, ENOMEM, NULL, NULL);
    return 0;
}

/*
 * Completion of one internal preload readdirp issued by rda_fill_fd().
 * Appends the returned entries to the fd context, merges each entry's
 * stat into the per-inode iatt cache, serves any parked application
 * request, and decides whether to keep preloading or tear down the
 * fill frame.
 */
static int32_t
rda_fill_fd_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                int32_t op_ret, int32_t op_errno, gf_dirent_t *entries,
                dict_t *xdata)
{
    gf_dirent_t *dirent = NULL;
    gf_dirent_t *tmp = NULL;
    gf_dirent_t serve_entries;
    struct rda_local *local = frame->local;
    struct rda_fd_ctx *ctx = local->ctx;
    struct rda_priv *priv = this->private;
    int fill = 1;                  /* issue the next preload request? */
    size_t dirent_size = 0;
    int ret = 0;
    gf_boolean_t serve = _gf_false;
    call_stub_t *stub = NULL;
    char gfid[GF_UUID_BUF_SIZE] = {
        0,
    };
    uint64_t generation = 0;
    call_frame_t *fill_frame = NULL;

    INIT_LIST_HEAD(&serve_entries.list);
    LOCK(&ctx->lock);

    /* Verify that the preload buffer is still pending on this data. */
    if (ctx->next_offset != local->offset) {
        gf_msg(this->name, GF_LOG_ERROR, 0, READDIR_AHEAD_MSG_OUT_OF_SEQUENCE,
               "Out of sequence directory preload.");
        ctx->state |= (RDA_FD_BYPASS | RDA_FD_ERROR);
        ctx->op_errno = EUCLEAN;

        goto out;
    }

    if (entries) {
        list_for_each_entry_safe(dirent, tmp, &entries->list, list)
        {
            list_del_init(&dirent->list);

            /* must preserve entry order */
            list_add_tail(&dirent->list, &ctx->entries.list);
            if (dirent->inode) {
                /* If ctxp->stat is invalidated, don't update it
                 * with dirent->d_stat as we don't have
                 * generation number of the inode when readdirp
                 * request was initiated. So, we pass 0 for
                 * generation number
                 */

                generation = -1;
                if (ctx->writes_during_prefetch) {
                    /* this inode was modified while the preload
                     * was in flight — force the generation check
                     * so its stale stat is rejected */
                    memset(gfid, 0, sizeof(gfid));
                    uuid_utoa_r(dirent->inode->gfid, gfid);
                    if (dict_get(ctx->writes_during_prefetch, gfid))
                        generation = 0;
                }

                if (!((strcmp(dirent->d_name, ".") == 0) ||
                      (strcmp(dirent->d_name, "..") == 0))) {
                    rda_inode_ctx_update_iatts(dirent->inode, this,
                                               &dirent->d_stat, &dirent->d_stat,
                                               generation);
                }
            }

            dirent_size = gf_dirent_size(dirent->d_name);

            ctx->cur_size += dirent_size;

            GF_ATOMIC_ADD(priv->rda_cache_size, dirent_size);

            /* next preload continues from the last entry's offset */
            ctx->next_offset = dirent->d_off;
        }
    }

    if (ctx->writes_during_prefetch) {
        dict_unref(ctx->writes_during_prefetch);
        ctx->writes_during_prefetch = NULL;
    }

    GF_ATOMIC_DEC(ctx->prefetching);

    /* enough data buffered: unplug so requests drain the buffer */
    if (ctx->cur_size >= priv->rda_high_wmark)
        ctx->state &= ~RDA_FD_PLUGGED;

    if (!op_ret || op_errno == ENOENT) {
        /* we've hit eod */
        ctx->state &= ~RDA_FD_RUNNING;
        ctx->state |= RDA_FD_EOD;
        ctx->op_errno = op_errno;
    } else if (op_ret == -1) {
        /* kill the preload and pend the error */
        ctx->state &= ~RDA_FD_RUNNING;
        ctx->state |= RDA_FD_ERROR;
        ctx->op_errno = op_errno;
    }

    /*
     * NOTE: The strict bypass logic in readdirp() means a pending request
     * is always based on ctx->cur_offset.
     */
    if (ctx->stub && rda_can_serve_readdirp(ctx, ctx->stub->args.size)) {
        ret = __rda_serve_readdirp(this, ctx, ctx->stub->args.size,
                                   &serve_entries, &op_errno);
        serve = _gf_true;
        stub = ctx->stub;
        ctx->stub = NULL;
    }

out:
    /*
     * If we have been marked for bypass and have no pending stub, clear the
     * run state so we stop preloading the context with entries.
     */
    if (!ctx->stub &&
        ((ctx->state & RDA_FD_BYPASS) ||
         GF_ATOMIC_GET(priv->rda_cache_size) > priv->rda_cache_limit))
        ctx->state &= ~RDA_FD_RUNNING;

    if (!(ctx->state & RDA_FD_RUNNING)) {
        fill = 0;
        if (ctx->xattrs) {
            /*
             * fill = 0 and hence rda_fill_fd() won't be invoked.
             * unref for ref taken in rda_fill_fd()
             */
            dict_unref(ctx->xattrs);
            ctx->xattrs = NULL;
        }

        /* detach the fill frame so it can be destroyed after UNLOCK */
        fill_frame = ctx->fill_frame;
        ctx->fill_frame = NULL;
    }

    /* suppress end-of-directory until the buffer is fully drained */
    if (op_errno == ENOENT &&
        !((ctx->state & RDA_FD_EOD) && (ctx->cur_size == 0)))
        op_errno = 0;

    UNLOCK(&ctx->lock);
    if (fill_frame) {
        rda_local_wipe(fill_frame->local);
        STACK_DESTROY(fill_frame->root);
    }

    if (serve) {
        STACK_UNWIND_STRICT(readdirp, stub->frame, ret, op_errno,
                            &serve_entries, xdata);
        gf_dirent_free(&serve_entries);
        call_stub_destroy(stub);
    }

    if (fill)
        rda_fill_fd(frame, this, local->fd);

    return 0;
}

/*
 * Start prepopulating the fd context with directory entries.
 *
 * Issues one internal readdirp of priv->rda_req_size bytes from
 * ctx->next_offset, on a dedicated long-lived frame (ctx->fill_frame)
 * that is created on the first call and reused until the preloader
 * stops (see rda_fill_fd_cbk()). Returns 0 on success, -1 on failure.
 */
static int
rda_fill_fd(call_frame_t *frame, xlator_t *this, fd_t *fd)
{
    call_frame_t *nframe = NULL;
    struct rda_local *local = NULL;
    struct rda_local *orig_local = frame->local;
    struct rda_fd_ctx *ctx;
    off_t offset;
    struct rda_priv *priv = this->private;

    ctx = get_rda_fd_ctx(fd, this);
    if (!ctx)
        goto err;

    LOCK(&ctx->lock);

    /* first preload on this fd: NEW -> RUNNING, starting plugged
     * (throttled) when a low watermark is configured */
    if (ctx->state & RDA_FD_NEW) {
        ctx->state &= ~RDA_FD_NEW;
        ctx->state |= RDA_FD_RUNNING;
        if (priv->rda_low_wmark)
            ctx->state |= RDA_FD_PLUGGED;
    }

    offset = ctx->next_offset;

    if (!ctx->fill_frame) {
        nframe = copy_frame(frame);
        if (!nframe) {
            UNLOCK(&ctx->lock);
            goto err;
        }

        local = mem_get0(this->local_pool);
        if (!local) {
            /* NOTE(review): nframe->local is NULL on this path, so
             * the err label calls rda_local_wipe(NULL) — confirm
             * rda_local_wipe() tolerates a NULL local. */
            UNLOCK(&ctx->lock);
            goto err;
        }

        local->ctx = ctx;
        local->fd = fd_ref(fd);
        nframe->local = local;

        ctx->fill_frame = nframe;

        if (!ctx->xattrs && orig_local && orig_local->xattrs) {
            /* when this function is invoked by rda_opendir_cbk */
            ctx->xattrs = dict_ref(orig_local->xattrs);
        }
    } else {
        /* preloader already set up: reuse its frame and local */
        nframe = ctx->fill_frame;
        local = nframe->local;
    }

    local->offset = offset;
    GF_ATOMIC_INC(ctx->prefetching);

    UNLOCK(&ctx->lock);

    /* NOTE(review): ctx->xattrs is read after UNLOCK — this relies on
     * it staying stable while a preload frame exists; verify against
     * the reset/release paths. */
    STACK_WIND(nframe, rda_fill_fd_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->readdirp, fd, priv->rda_req_size,
               offset, ctx->xattrs);

    return 0;

err:
    if (nframe) {
        rda_local_wipe(nframe->local);
        FRAME_DESTROY(nframe);
    }

    return -1;
}

/*
 * opendir completion: on success, kick off the directory preload
 * before unwinding to the application.
 */
static int32_t
rda_opendir_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                int32_t op_ret, int32_t op_errno, fd_t *fd, dict_t *xdata)
{
    if (op_ret == 0)
        rda_fill_fd(frame, this, fd);

    RDA_STACK_UNWIND(opendir, frame, op_ret, op_errno, fd, xdata);
    return 0;
}

/*
 * opendir fop: stash a copy of the xattr request list (set by md-cache
 * in xdata) in frame->local so rda_opendir_cbk()/rda_fill_fd() can use
 * it for the internal preload readdirp requests.
 */
static int32_t
rda_opendir(call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd,
            dict_t *xdata)
{
    struct rda_local *local = NULL;

    if (xdata) {
        local = mem_get0(this->local_pool);
        if (!local)
            goto err;

        /*
         * Retrieve list of keys set by md-cache xlator and store it
         * in local to be consumed in rda_opendir_cbk
         */
        local->xattrs = dict_copy_with_ref(xdata, NULL);
        frame->local = local;
    }

    STACK_WIND(frame, rda_opendir_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->opendir, loc, fd, xdata);
    return 0;

err:
    STACK_UNWIND_STRICT(opendir, frame, -1, ENOMEM, fd, xdata);
    return 0;
}

/*
 * writev completion: on success, flag the parent-dir prefetchers and
 * merge the post-op iatt into the inode cache before unwinding.
 */
static int32_t
rda_writev_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
               int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
               struct iatt *postbuf, dict_t *xdata)
{
    struct iatt new_post = {0};
    struct rda_local *rda_local = frame->local;

    if (op_ret >= 0) {
        rda_mark_inode_dirty(this, rda_local->inode);
        rda_inode_ctx_update_iatts(rda_local->inode, this, postbuf, &new_post,
                                   rda_local->generation);
    }

    RDA_STACK_UNWIND(writev, frame, op_ret, op_errno, prebuf, &new_post,
                     xdata);
    return 0;
}

/*
 * writev fop. RDA_COMMON_MODIFICATION_FOP (readdir-ahead.h) winds the
 * fop to the child; rda_writev_cbk() expects frame->local to carry the
 * inode and its generation snapshot for the iatt-cache update.
 */
static int32_t
rda_writev(call_frame_t *frame, xlator_t *this, fd_t *fd, struct iovec *vector,
           int32_t count, off_t off, uint32_t flags, struct iobref *iobref,
           dict_t *xdata)
{
    RDA_COMMON_MODIFICATION_FOP(writev, frame, this, fd->inode, xdata, fd,
                                vector, count, off, flags, iobref);
    return 0;
}

/*
 * fallocate completion: on success, flag the parent-dir prefetchers and
 * merge the post-op iatt into the inode cache before unwinding.
 */
static int32_t
rda_fallocate_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                  int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
                  struct iatt *postbuf, dict_t *xdata)
{
    struct iatt new_post = {0};
    struct rda_local *rda_local = frame->local;

    if (op_ret >= 0) {
        rda_mark_inode_dirty(this, rda_local->inode);
        rda_inode_ctx_update_iatts(rda_local->inode, this, postbuf, &new_post,
                                   rda_local->generation);
    }

    RDA_STACK_UNWIND(fallocate, frame, op_ret, op_errno, prebuf, &new_post,
                     xdata);
    return 0;
}

/*
 * fallocate fop; see rda_writev() for the RDA_COMMON_MODIFICATION_FOP
 * wrapping convention.
 */
static int32_t
rda_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t keep_size,
              off_t offset, size_t len, dict_t *xdata)
{
    RDA_COMMON_MODIFICATION_FOP(fallocate, frame, this, fd->inode, xdata, fd,
                                keep_size, offset, len);
    return 0;
}

/*
 * zerofill completion: on success, flag the parent-dir prefetchers and
 * merge the post-op iatt into the inode cache before unwinding.
 */
static int32_t
rda_zerofill_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                 int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
                 struct iatt *postbuf, dict_t *xdata)
{
    struct iatt new_post = {0};
    struct rda_local *rda_local = frame->local;

    if (op_ret >= 0) {
        rda_mark_inode_dirty(this, rda_local->inode);
        rda_inode_ctx_update_iatts(rda_local->inode, this, postbuf, &new_post,
                                   rda_local->generation);
    }

    RDA_STACK_UNWIND(zerofill, frame, op_ret, op_errno, prebuf, &new_post,
                     xdata);
    return 0;
}

/*
 * zerofill fop; see rda_writev() for the RDA_COMMON_MODIFICATION_FOP
 * wrapping convention.
 */
static int32_t
rda_zerofill(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset,
             off_t len, dict_t *xdata)
{
    RDA_COMMON_MODIFICATION_FOP(zerofill, frame, this, fd->inode, xdata, fd,
                                offset, len);
    return 0;
}

/*
 * discard completion: on success, flag the parent-dir prefetchers and
 * merge the post-op iatt into the inode cache before unwinding.
 */
static int32_t
rda_discard_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
                struct iatt *postbuf, dict_t *xdata)
{
    struct iatt new_post = {0};
    struct rda_local *rda_local = frame->local;

    if (op_ret >= 0) {
        rda_mark_inode_dirty(this, rda_local->inode);
        rda_inode_ctx_update_iatts(rda_local->inode, this, postbuf, &new_post,
                                   rda_local->generation);
    }

    RDA_STACK_UNWIND(discard, frame, op_ret, op_errno, prebuf, &new_post,
                     xdata);
    return 0;
}

/*
 * discard fop; see rda_writev() for the RDA_COMMON_MODIFICATION_FOP
 * wrapping convention.
 */
static int32_t
rda_discard(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset,
            size_t len, dict_t *xdata)
{
    RDA_COMMON_MODIFICATION_FOP(discard, frame, this, fd->inode, xdata, fd,
                                offset, len);
    return 0;
}

/*
 * ftruncate completion: on success, flag the parent-dir prefetchers and
 * merge the post-op iatt into the inode cache before unwinding.
 */
static int32_t
rda_ftruncate_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                  int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
                  struct iatt *postbuf, dict_t *xdata)
{
    struct iatt new_post = {0};
    struct rda_local *rda_local = frame->local;

    if (op_ret >= 0) {
        rda_mark_inode_dirty(this, rda_local->inode);
        rda_inode_ctx_update_iatts(rda_local->inode, this, postbuf, &new_post,
                                   rda_local->generation);
    }

    RDA_STACK_UNWIND(ftruncate, frame, op_ret, op_errno, prebuf, &new_post,
                     xdata);
    return 0;
}

/*
 * ftruncate fop; see rda_writev() for the RDA_COMMON_MODIFICATION_FOP
 * wrapping convention.
 */
static int32_t
rda_ftruncate(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset,
              dict_t *xdata)
{
    RDA_COMMON_MODIFICATION_FOP(ftruncate, frame, this, fd->inode, xdata, fd,
                                offset);
    return 0;
}

/*
 * truncate completion: on success, flag the parent-dir prefetchers and
 * merge the post-op iatt into the inode cache before unwinding.
 *
 * Fix: this cbk serves rda_truncate() but previously unwound as
 * 'ftruncate'; every sibling cbk unwinds its own fop, so unwind as
 * 'truncate' here.
 */
static int32_t
rda_truncate_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                 int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
                 struct iatt *postbuf, dict_t *xdata)
{
    struct rda_local *local = NULL;
    struct iatt postbuf_out = {
        0,
    };

    if (op_ret < 0)
        goto unwind;

    local = frame->local;
    rda_mark_inode_dirty(this, local->inode);
    rda_inode_ctx_update_iatts(local->inode, this, postbuf, &postbuf_out,
                               local->generation);

unwind:
    RDA_STACK_UNWIND(truncate, frame, op_ret, op_errno, prebuf, &postbuf_out,
                     xdata);
    return 0;
}

/*
 * truncate fop; see rda_writev() for the RDA_COMMON_MODIFICATION_FOP
 * wrapping convention.
 */
static int32_t
rda_truncate(call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset,
             dict_t *xdata)
{
    RDA_COMMON_MODIFICATION_FOP(truncate, frame, this, loc->inode, xdata, loc,
                                offset);
    return 0;
}

/*
 * setxattr completion: no post-op iatt is available, so on success
 * invalidate the cached iatt (NULL stbuf) and flag the prefetchers.
 */
static int32_t
rda_setxattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                 int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
    struct rda_local *rda_local = frame->local;

    if (op_ret >= 0) {
        rda_mark_inode_dirty(this, rda_local->inode);
        rda_inode_ctx_update_iatts(rda_local->inode, this, NULL, NULL,
                                   rda_local->generation);
    }

    RDA_STACK_UNWIND(setxattr, frame, op_ret, op_errno, xdata);
    return 0;
}

/*
 * setxattr fop; see rda_writev() for the RDA_COMMON_MODIFICATION_FOP
 * wrapping convention.
 */
static int32_t
rda_setxattr(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *dict,
             int32_t flags, dict_t *xdata)
{
    RDA_COMMON_MODIFICATION_FOP(setxattr, frame, this, loc->inode, xdata, loc,
                                dict, flags);
    return 0;
}

/*
 * fsetxattr completion: no post-op iatt is available, so on success
 * invalidate the cached iatt (NULL stbuf) and flag the prefetchers.
 */
static int32_t
rda_fsetxattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                  int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
    struct rda_local *rda_local = frame->local;

    if (op_ret >= 0) {
        rda_mark_inode_dirty(this, rda_local->inode);
        rda_inode_ctx_update_iatts(rda_local->inode, this, NULL, NULL,
                                   rda_local->generation);
    }

    RDA_STACK_UNWIND(fsetxattr, frame, op_ret, op_errno, xdata);
    return 0;
}

/*
 * fsetxattr fop; see rda_writev() for the RDA_COMMON_MODIFICATION_FOP
 * wrapping convention.
 */
static int32_t
rda_fsetxattr(call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *dict,
              int32_t flags, dict_t *xdata)
{
    RDA_COMMON_MODIFICATION_FOP(fsetxattr, frame, this, fd->inode, xdata, fd,
                                dict, flags);
    return 0;
}

/*
 * setattr completion: on success, flag the parent-dir prefetchers and
 * merge the post-op iatt into the inode cache before unwinding.
 */
static int32_t
rda_setattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                int32_t op_ret, int32_t op_errno, struct iatt *statpre,
                struct iatt *statpost, dict_t *xdata)
{
    struct iatt new_post = {0};
    struct rda_local *rda_local = frame->local;

    if (op_ret >= 0) {
        rda_mark_inode_dirty(this, rda_local->inode);
        rda_inode_ctx_update_iatts(rda_local->inode, this, statpost, &new_post,
                                   rda_local->generation);
    }

    RDA_STACK_UNWIND(setattr, frame, op_ret, op_errno, statpre, &new_post,
                     xdata);
    return 0;
}

/*
 * setattr fop; see rda_writev() for the RDA_COMMON_MODIFICATION_FOP
 * wrapping convention.
 */
static int32_t
rda_setattr(call_frame_t *frame, xlator_t *this, loc_t *loc, struct iatt *stbuf,
            int32_t valid, dict_t *xdata)
{
    RDA_COMMON_MODIFICATION_FOP(setattr, frame, this, loc->inode, xdata, loc,
                                stbuf, valid);
    return 0;
}

/*
 * fsetattr completion: on success, flag the parent-dir prefetchers and
 * merge the post-op iatt into the inode cache before unwinding.
 */
static int32_t
rda_fsetattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                 int32_t op_ret, int32_t op_errno, struct iatt *statpre,
                 struct iatt *statpost, dict_t *xdata)
{
    struct iatt new_post = {0};
    struct rda_local *rda_local = frame->local;

    if (op_ret >= 0) {
        rda_mark_inode_dirty(this, rda_local->inode);
        rda_inode_ctx_update_iatts(rda_local->inode, this, statpost, &new_post,
                                   rda_local->generation);
    }

    RDA_STACK_UNWIND(fsetattr, frame, op_ret, op_errno, statpre, &new_post,
                     xdata);
    return 0;
}

/*
 * fsetattr fop; see rda_writev() for the RDA_COMMON_MODIFICATION_FOP
 * wrapping convention.
 */
static int32_t
rda_fsetattr(call_frame_t *frame, xlator_t *this, fd_t *fd, struct iatt *stbuf,
             int32_t valid, dict_t *xdata)
{
    RDA_COMMON_MODIFICATION_FOP(fsetattr, frame, this, fd->inode, xdata, fd,
                                stbuf, valid);
    return 0;
}

/*
 * removexattr completion: no post-op iatt is available, so on success
 * invalidate the cached iatt (NULL stbuf) and flag the prefetchers.
 */
static int32_t
rda_removexattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                    int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
    struct rda_local *rda_local = frame->local;

    if (op_ret >= 0) {
        rda_mark_inode_dirty(this, rda_local->inode);
        rda_inode_ctx_update_iatts(rda_local->inode, this, NULL, NULL,
                                   rda_local->generation);
    }

    RDA_STACK_UNWIND(removexattr, frame, op_ret, op_errno, xdata);
    return 0;
}

/*
 * removexattr fop; see rda_writev() for the RDA_COMMON_MODIFICATION_FOP
 * wrapping convention.
 */
static int32_t
rda_removexattr(call_frame_t *frame, xlator_t *this, loc_t *loc,
                const char *name, dict_t *xdata)
{
    RDA_COMMON_MODIFICATION_FOP(removexattr, frame, this, loc->inode, xdata,
                                loc, name);
    return 0;
}

/*
 * fremovexattr completion: no post-op iatt is available, so on success
 * invalidate the cached iatt (NULL stbuf) and flag the prefetchers.
 */
static int32_t
rda_fremovexattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                     int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
    struct rda_local *rda_local = frame->local;

    if (op_ret >= 0) {
        rda_mark_inode_dirty(this, rda_local->inode);
        rda_inode_ctx_update_iatts(rda_local->inode, this, NULL, NULL,
                                   rda_local->generation);
    }

    RDA_STACK_UNWIND(fremovexattr, frame, op_ret, op_errno, xdata);
    return 0;
}

/*
 * fremovexattr fop; see rda_writev() for the RDA_COMMON_MODIFICATION_FOP
 * wrapping convention.
 */
static int32_t
rda_fremovexattr(call_frame_t *frame, xlator_t *this, fd_t *fd,
                 const char *name, dict_t *xdata)
{
    RDA_COMMON_MODIFICATION_FOP(fremovexattr, frame, this, fd->inode, xdata, fd,
                                name);
    return 0;
}

/*
 * Directory release callback: detach and destroy the per-fd preload
 * context created by get_rda_fd_ctx(). Returns -1 if no context was
 * attached to the fd, 0 otherwise.
 */
static int32_t
rda_releasedir(xlator_t *this, fd_t *fd)
{
    uint64_t value = 0;
    struct rda_fd_ctx *fd_ctx = NULL;

    if (fd_ctx_del(fd, this, &value) < 0)
        return -1;

    fd_ctx = (struct rda_fd_ctx *)(uintptr_t)value;
    if (fd_ctx) {
        /* Drop any preloaded entries held by the context. */
        rda_reset_ctx(this, fd_ctx);

        if (fd_ctx->fill_frame)
            STACK_DESTROY(fd_ctx->fill_frame->root);

        /* A stub still parked here means a readdirp was pending when the
         * directory was released -- log it, this should not happen. */
        if (fd_ctx->stub)
            gf_msg(this->name, GF_LOG_ERROR, 0,
                   READDIR_AHEAD_MSG_DIR_RELEASE_PENDING_STUB,
                   "released a directory with a pending stub");

        GF_FREE(fd_ctx);
    }

    return 0;
}

/*
 * Inode forget callback: remove and free the per-inode context, if one
 * was ever attached by this translator. Always returns 0.
 */
static int
rda_forget(xlator_t *this, inode_t *inode)
{
    uint64_t value = 0;

    inode_ctx_del1(inode, this, &value);
    if (value)
        GF_FREE((rda_inode_ctx_t *)(uintptr_t)value);

    return 0;
}

/*
 * Register this translator's memory-accounting types with the core.
 * Returns 0 on success, non-zero (or -1 for a NULL xlator) on failure.
 */
int32_t
mem_acct_init(xlator_t *this)
{
    int ret = -1;

    if (!this)
        goto out;

    ret = xlator_mem_acct_init(this, gf_rda_mt_end + 1);
    if (ret != 0)
        /* Fix: the original adjacent literals concatenated to
         * "Memory accounting initfailed" (missing space). */
        gf_msg(this->name, GF_LOG_ERROR, ENOMEM, READDIR_AHEAD_MSG_NO_MEMORY,
               "Memory accounting init failed");

out:
    return ret;
}

/*
 * Re-read tunables when the volume is reconfigured. Each GF_OPTION_RECONF
 * jumps to the err label if validation of the new value fails; options
 * already applied before the failure keep their new values.
 */
int
reconfigure(xlator_t *this, dict_t *options)
{
    struct rda_priv *priv = this->private;

    GF_OPTION_RECONF("rda-request-size", priv->rda_req_size, options,
                     size_uint64, err);
    GF_OPTION_RECONF("rda-low-wmark", priv->rda_low_wmark, options, size_uint64,
                     err);
    GF_OPTION_RECONF("rda-high-wmark", priv->rda_high_wmark, options,
                     size_uint64, err);
    GF_OPTION_RECONF("rda-cache-limit", priv->rda_cache_limit, options,
                     size_uint64, err);
    GF_OPTION_RECONF("parallel-readdir", priv->parallel_readdir, options, bool,
                     err);
    GF_OPTION_RECONF("pass-through", this->pass_through, options, bool, err);

    return 0;
err:
    return -1;
}

/*
 * Translator init: validate the graph position (exactly one child),
 * allocate private state, create the local pool, and read the options.
 * Returns 0 on success, -1 on failure with all partially-acquired
 * resources released.
 */
int
init(xlator_t *this)
{
    struct rda_priv *priv = NULL;

    GF_VALIDATE_OR_GOTO("readdir-ahead", this, err);

    if (!this->children || this->children->next) {
        gf_msg(this->name, GF_LOG_ERROR, 0,
               READDIR_AHEAD_MSG_XLATOR_CHILD_MISCONFIGURED,
               "FATAL: readdir-ahead not configured with exactly one"
               " child");
        goto err;
    }

    if (!this->parents) {
        gf_msg(this->name, GF_LOG_WARNING, 0,
               READDIR_AHEAD_MSG_VOL_MISCONFIGURED,
               "dangling volume. check volfile ");
    }

    priv = GF_CALLOC(1, sizeof(struct rda_priv), gf_rda_mt_rda_priv);
    if (!priv)
        goto err;
    this->private = priv;

    GF_ATOMIC_INIT(priv->rda_cache_size, 0);

    this->local_pool = mem_pool_new(struct rda_local, 32);
    if (!this->local_pool)
        goto err;

    GF_OPTION_INIT("rda-request-size", priv->rda_req_size, size_uint64, err);
    GF_OPTION_INIT("rda-low-wmark", priv->rda_low_wmark, size_uint64, err);
    GF_OPTION_INIT("rda-high-wmark", priv->rda_high_wmark, size_uint64, err);
    GF_OPTION_INIT("rda-cache-limit", priv->rda_cache_limit, size_uint64, err);
    GF_OPTION_INIT("parallel-readdir", priv->parallel_readdir, bool, err);
    GF_OPTION_INIT("pass-through", this->pass_through, bool, err);

    return 0;

err:
    /* Reset the xlator fields after releasing them so no dangling
     * pointers survive a failed init (e.g. if fini() later frees
     * this->private, it must not double-free). */
    if (this->local_pool) {
        mem_pool_destroy(this->local_pool);
        this->local_pool = NULL;
    }
    if (priv) {
        GF_FREE(priv);
        this->private = NULL;
    }

    return -1;
}

/*
 * Translator teardown: free the private state allocated in init().
 * GF_VALIDATE_OR_GOTO jumps to the out label when this is NULL.
 */
void
fini(xlator_t *this)
{
    GF_VALIDATE_OR_GOTO("readdir-ahead", this, out);

    GF_FREE(this->private);

out:
    return;
}

/* Fop table: readdirp/opendir implement the preload; the write and
 * metadata fops below are intercepted only so cached dirent stats can be
 * invalidated when an inode changes underneath the cache. */
struct xlator_fops fops = {
    .opendir = rda_opendir,
    .readdirp = rda_readdirp,
    /* inode write */
    /* TODO: invalidate a dentry's stats if its pointing to a directory
     * when entry operations happen in that directory
     */
    .writev = rda_writev,
    .truncate = rda_truncate,
    .ftruncate = rda_ftruncate,
    .fallocate = rda_fallocate,
    .discard = rda_discard,
    .zerofill = rda_zerofill,
    /* metadata write */
    .setxattr = rda_setxattr,
    .fsetxattr = rda_fsetxattr,
    .setattr = rda_setattr,
    .fsetattr = rda_fsetattr,
    .removexattr = rda_removexattr,
    .fremovexattr = rda_fremovexattr,
};

/* Lifecycle callbacks: clean up per-fd preload state on directory release
 * and per-inode context on inode forget. */
struct xlator_cbks cbks = {
    .releasedir = rda_releasedir,
    .forget = rda_forget,
};

/* Volume options exposed by readdir-ahead. The wmark options bound when
 * preloading plugs/unplugs; rda-cache-limit caps total memory used by
 * this translator across all cached directories. */
struct volume_options options[] = {
    {
        .key = {"readdir-ahead"},
        .type = GF_OPTION_TYPE_BOOL,
        .default_value = "off",
        .description = "enable/disable readdir-ahead",
        .op_version = {GD_OP_VERSION_6_0},
        .flags = OPT_FLAG_SETTABLE,
    },
    {
        .key = {"rda-request-size"},
        .type = GF_OPTION_TYPE_SIZET,
        .min = 4096,
        .max = 131072,
        .default_value = "131072",
        .description = "size of buffer in readdirp calls initiated by "
                       "readdir-ahead ",
    },
    {
        .key = {"rda-low-wmark"},
        .type = GF_OPTION_TYPE_SIZET,
        .min = 0,
        .max = 10 * GF_UNIT_MB,
        .default_value = "4096",
        .description = "the value under which readdir-ahead plugs",
    },
    {
        .key = {"rda-high-wmark"},
        .type = GF_OPTION_TYPE_SIZET,
        .min = 0,
        .max = 100 * GF_UNIT_MB,
        .default_value = "128KB",
        .description = "the value over which readdir-ahead unplugs",
    },
    {
        .key = {"rda-cache-limit"},
        .type = GF_OPTION_TYPE_SIZET,
        .min = 0,
        /* INFINITY (math.h) is valid here: the field is floating-point,
         * meaning "no upper bound". */
        .max = INFINITY,
        .default_value = "10MB",
        .description = "maximum size of cache consumed by readdir-ahead "
                       "xlator. This value is global and total memory "
                       "consumption by readdir-ahead is capped by this "
                       "value, irrespective of the number/size of "
                       "directories cached",
    },
    {.key = {"parallel-readdir"},
     .type = GF_OPTION_TYPE_BOOL,
     .op_version = {GD_OP_VERSION_3_10_0},
     .flags = OPT_FLAG_SETTABLE | OPT_FLAG_CLIENT_OPT | OPT_FLAG_DOC,
     .default_value = "off",
     .description = "If this option is enabled, the readdir operation "
                    "is performed in parallel on all the bricks, thus "
                    "improving the performance of readdir. Note that "
                    "the performance improvement is higher in large "
                    "clusters"},
    {.key = {"pass-through"},
     .type = GF_OPTION_TYPE_BOOL,
     .default_value = "false",
     .op_version = {GD_OP_VERSION_4_1_0},
     .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC | OPT_FLAG_CLIENT_OPT,
     .tags = {"readdir-ahead"},
     .description = "Enable/Disable readdir ahead translator"},
    {.key = {NULL}},
};

/* Public xlator descriptor consumed by the graph loader. */
xlator_api_t xlator_api = {
    .init = init,
    .fini = fini,
    .reconfigure = reconfigure,
    .mem_acct_init = mem_acct_init,
    .op_version = {1}, /* Present from the initial version */
    .fops = &fops,
    .cbks = &cbks,
    .options = options,
    .identifier = "readdir-ahead",
    .category = GF_MAINTAINED,
};