Blob Blame History Raw
/*
   Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
   This file is part of GlusterFS.

   This file is licensed to you under your choice of the GNU Lesser
   General Public License, version 3 or any later version (LGPLv3 or
   later), or the GNU General Public License, version 2 (GPLv2), in all
   cases as published by the Free Software Foundation.
*/
#include <fnmatch.h>

#include "quota.h"
#include <glusterfs/common-utils.h>
#include <glusterfs/defaults.h>
#include <glusterfs/statedump.h>
#include <glusterfs/quota-common-utils.h>
#include "quota-messages.h"
#include <glusterfs/events.h>

struct volume_options options[];

static int32_t
__quota_init_inode_ctx(inode_t *inode, xlator_t *this,
                       quota_inode_ctx_t **context)
{
    int32_t ret = -1;
    quota_inode_ctx_t *ctx = NULL;

    if (inode == NULL) {
        goto out;
    }

    QUOTA_ALLOC_OR_GOTO(ctx, quota_inode_ctx_t, out);

    LOCK_INIT(&ctx->lock);

    if (context != NULL) {
        *context = ctx;
    }

    INIT_LIST_HEAD(&ctx->parents);

    ret = __inode_ctx_put(inode, this, (uint64_t)(long)ctx);
    if (ret) {
        gf_msg(this->name, GF_LOG_WARNING, 0, Q_MSG_INODE_CTX_SET_FAILED,
               "cannot set quota context "
               "in inode (gfid:%s)",
               uuid_utoa(inode->gfid));
        GF_FREE(ctx);
    }
out:
    return ret;
}

static int32_t
quota_inode_ctx_get(inode_t *inode, xlator_t *this, quota_inode_ctx_t **ctx,
                    char create_if_absent)
{
    int32_t ret = 0;
    uint64_t ctx_int;

    LOCK(&inode->lock);
    {
        ret = __inode_ctx_get(inode, this, &ctx_int);

        if ((ret == 0) && (ctx != NULL)) {
            *ctx = (quota_inode_ctx_t *)(unsigned long)ctx_int;
        } else if (create_if_absent) {
            ret = __quota_init_inode_ctx(inode, this, ctx);
        }
    }
    UNLOCK(&inode->lock);

    return ret;
}

int
quota_loc_fill(loc_t *loc, inode_t *inode, inode_t *parent, char *path)
{
    int ret = -1;

    if (!loc || (inode == NULL))
        return ret;

    if (inode) {
        loc->inode = inode_ref(inode);
        gf_uuid_copy(loc->gfid, inode->gfid);
    }

    if (parent) {
        loc->parent = inode_ref(parent);
    }

    if (path != NULL) {
        loc->path = gf_strdup(path);

        loc->name = strrchr(loc->path, '/');
        if (loc->name) {
            loc->name++;
        }
    }

    ret = 0;

    return ret;
}

int
quota_inode_loc_fill(inode_t *inode, loc_t *loc)
{
    char *resolvedpath = NULL;
    inode_t *parent = NULL;
    int ret = -1;
    xlator_t *this = NULL;

    if ((!inode) || (!loc)) {
        return ret;
    }

    this = THIS;

    if ((inode) && __is_root_gfid(inode->gfid)) {
        loc->parent = NULL;
        goto ignore_parent;
    }

    parent = inode_parent(inode, 0, NULL);
    if (!parent) {
        gf_msg_debug(this->name, 0,
                     "cannot find parent for "
                     "inode (gfid:%s)",
                     uuid_utoa(inode->gfid));
    }

ignore_parent:
    ret = inode_path(inode, NULL, &resolvedpath);
    if (ret < 0) {
        gf_msg_debug(this->name, 0,
                     "cannot construct path for "
                     "inode (gfid:%s)",
                     uuid_utoa(inode->gfid));
    }

    ret = quota_loc_fill(loc, inode, parent, resolvedpath);
    if (ret < 0) {
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
               "cannot fill loc");
        goto err;
    }

err:
    if (parent) {
        inode_unref(parent);
    }

    GF_FREE(resolvedpath);

    return ret;
}

int32_t
quota_local_cleanup(quota_local_t *local)
{
    if (local == NULL) {
        goto out;
    }

    loc_wipe(&local->loc);
    loc_wipe(&local->newloc);
    loc_wipe(&local->oldloc);
    loc_wipe(&local->validate_loc);

    inode_unref(local->inode);

    if (local->xdata)
        dict_unref(local->xdata);

    if (local->validate_xdata)
        dict_unref(local->validate_xdata);

    if (local->stub)
        call_stub_destroy(local->stub);

    LOCK_DESTROY(&local->lock);

    mem_put(local);
out:
    return 0;
}

static quota_local_t *
quota_local_new()
{
    quota_local_t *local = NULL;
    local = mem_get0(THIS->local_pool);
    if (local == NULL)
        goto out;

    LOCK_INIT(&local->lock);
    local->space_available = -1;

out:
    return local;
}

quota_dentry_t *
__quota_dentry_new(quota_inode_ctx_t *ctx, char *name, uuid_t par)
{
    quota_dentry_t *dentry = NULL;
    GF_UNUSED int32_t ret = 0;

    QUOTA_ALLOC_OR_GOTO(dentry, quota_dentry_t, err);

    INIT_LIST_HEAD(&dentry->next);

    dentry->name = gf_strdup(name);
    if (dentry->name == NULL) {
        GF_FREE(dentry);
        dentry = NULL;
        goto err;
    }

    gf_uuid_copy(dentry->par, par);

    if (ctx != NULL)
        list_add_tail(&dentry->next, &ctx->parents);

err:
    return dentry;
}

void
__quota_dentry_free(quota_dentry_t *dentry)
{
    if (dentry == NULL) {
        goto out;
    }

    list_del_init(&dentry->next);

    GF_FREE(dentry->name);
    GF_FREE(dentry);
out:
    return;
}

void
__quota_dentry_del(quota_inode_ctx_t *ctx, const char *name, uuid_t par)
{
    quota_dentry_t *dentry = NULL;
    quota_dentry_t *tmp = NULL;

    list_for_each_entry_safe(dentry, tmp, &ctx->parents, next)
    {
        if ((strcmp(dentry->name, name) == 0) &&
            (gf_uuid_compare(dentry->par, par) == 0)) {
            __quota_dentry_free(dentry);
            break;
        }
    }
}

void
quota_dentry_del(quota_inode_ctx_t *ctx, const char *name, uuid_t par)
{
    LOCK(&ctx->lock);
    {
        __quota_dentry_del(ctx, name, par);
    }
    UNLOCK(&ctx->lock);
}

static inode_t *
__quota_inode_parent(inode_t *inode, uuid_t pargfid, const char *name)
{
    inode_t *parent = NULL;

    parent = inode_parent(inode, pargfid, name);
    inode_unref(inode);
    return parent;
}

static inode_t *
quota_inode_parent(inode_t *inode, uuid_t pargfid, const char *name)
{
    inode_t *parent = NULL;

    parent = __quota_inode_parent(inode, pargfid, name);
    if (!parent)
        gf_msg_callingfn(THIS->name, GF_LOG_ERROR, 0, Q_MSG_PARENT_NULL,
                         "Failed to find "
                         "ancestor for inode (%s)",
                         uuid_utoa(inode->gfid));

    return parent;
}

int32_t
quota_inode_depth(inode_t *inode)
{
    int depth = 0;
    inode_t *cur_inode = NULL;

    cur_inode = inode_ref(inode);
    while (cur_inode && !__is_root_gfid(cur_inode->gfid)) {
        depth++;
        cur_inode = quota_inode_parent(cur_inode, 0, NULL);
        if (!cur_inode)
            depth = -1;
    }

    if (cur_inode)
        inode_unref(cur_inode);

    return depth;
}

int32_t
quota_find_common_ancestor(inode_t *inode1, inode_t *inode2,
                           uuid_t *common_ancestor)
{
    int32_t depth1 = 0;
    int32_t depth2 = 0;
    int32_t ret = -1;
    inode_t *cur_inode1 = NULL;
    inode_t *cur_inode2 = NULL;

    depth1 = quota_inode_depth(inode1);
    if (depth1 < 0)
        goto out;

    depth2 = quota_inode_depth(inode2);
    if (depth2 < 0)
        goto out;

    cur_inode1 = inode_ref(inode1);
    cur_inode2 = inode_ref(inode2);

    while (cur_inode1 && depth1 > depth2) {
        cur_inode1 = quota_inode_parent(cur_inode1, 0, NULL);
        depth1--;
    }

    while (cur_inode2 && depth2 > depth1) {
        cur_inode2 = quota_inode_parent(cur_inode2, 0, NULL);
        depth2--;
    }

    while (depth1 && cur_inode1 && cur_inode2 && cur_inode1 != cur_inode2) {
        cur_inode1 = quota_inode_parent(cur_inode1, 0, NULL);
        cur_inode2 = quota_inode_parent(cur_inode2, 0, NULL);
        depth1--;
    }

    if (cur_inode1 && cur_inode2) {
        gf_uuid_copy(*common_ancestor, cur_inode1->gfid);
        ret = 0;
    }
out:
    if (cur_inode1)
        inode_unref(cur_inode1);

    if (cur_inode2)
        inode_unref(cur_inode2);

    return ret;
}

void
check_ancestory_continue(struct list_head *parents, inode_t *inode,
                         int32_t op_ret, int32_t op_errno, void *data)
{
    call_frame_t *frame = NULL;
    quota_local_t *local = NULL;
    uint32_t link_count = 0;

    frame = data;
    local = frame->local;

    if (parents && list_empty(parents)) {
        gf_msg(THIS->name, GF_LOG_WARNING, EIO, Q_MSG_ANCESTRY_BUILD_FAILED,
               "Couldn't build ancestry for inode (gfid:%s). "
               "Without knowing ancestors till root, quota "
               "cannot be enforced. "
               "Hence, failing fop with EIO",
               uuid_utoa(inode->gfid));
        op_errno = EIO;
        op_ret = -1;
    }

    LOCK(&local->lock);
    {
        link_count = --local->link_count;
        if (op_ret < 0) {
            local->op_ret = op_ret;
            local->op_errno = op_errno;
        }
    }
    UNLOCK(&local->lock);

    if (link_count == 0)
        local->fop_continue_cbk(frame);
}

void
check_ancestory(call_frame_t *frame, inode_t *inode)
{
    inode_t *cur_inode = NULL;
    inode_t *parent = NULL;

    cur_inode = inode_ref(inode);
    while (cur_inode && !__is_root_gfid(cur_inode->gfid)) {
        parent = inode_parent(cur_inode, 0, NULL);
        if (!parent) {
            quota_build_ancestry(cur_inode, check_ancestory_continue, frame);
            inode_unref(cur_inode);
            return;
        }
        inode_unref(cur_inode);
        cur_inode = parent;
    }

    if (cur_inode) {
        inode_unref(cur_inode);
        check_ancestory_continue(NULL, NULL, 0, 0, frame);
    } else {
        check_ancestory_continue(NULL, NULL, -1, ESTALE, frame);
    }
}

void
check_ancestory_2_cbk(struct list_head *parents, inode_t *inode, int32_t op_ret,
                      int32_t op_errno, void *data)
{
    inode_t *this_inode = NULL;
    quota_inode_ctx_t *ctx = NULL;

    this_inode = data;

    if (op_ret < 0)
        goto out;

    if (parents == NULL || list_empty(parents)) {
        gf_msg(THIS->name, GF_LOG_WARNING, 0, Q_MSG_ENFORCEMENT_FAILED,
               "Couldn't build ancestry for inode (gfid:%s). "
               "Without knowing ancestors till root, quota "
               "cannot be enforced.",
               uuid_utoa(this_inode->gfid));
        goto out;
    }

    quota_inode_ctx_get(this_inode, THIS, &ctx, 0);
    if (ctx)
        ctx->ancestry_built = _gf_true;

out:
    inode_unref(this_inode);
}

void
check_ancestory_2(xlator_t *this, quota_local_t *local, inode_t *inode)
{
    inode_t *cur_inode = NULL;
    inode_t *parent = NULL;
    quota_inode_ctx_t *ctx = NULL;
    char *name = NULL;
    uuid_t pgfid = {0};

    name = (char *)local->loc.name;
    if (local->loc.parent) {
        gf_uuid_copy(pgfid, local->loc.parent->gfid);
    }

    cur_inode = inode_ref(inode);
    while (cur_inode && !__is_root_gfid(cur_inode->gfid)) {
        quota_inode_ctx_get(cur_inode, this, &ctx, 0);
        /* build ancestry is required only on the first lookup,
         * so stop crawling when the inode_ctx is set for an inode
         */
        if (ctx && ctx->ancestry_built)
            goto setctx;

        parent = inode_parent(cur_inode, pgfid, name);
        if (!parent) {
            quota_build_ancestry(cur_inode, check_ancestory_2_cbk,
                                 inode_ref(inode));
            goto out;
        }

        if (name != NULL) {
            name = NULL;
            gf_uuid_clear(pgfid);
        }

        inode_unref(cur_inode);
        cur_inode = parent;
    }

setctx:
    if (cur_inode && cur_inode != inode) {
        quota_inode_ctx_get(inode, this, &ctx, 0);
        if (ctx)
            ctx->ancestry_built = _gf_true;
    }
out:
    if (cur_inode)
        inode_unref(cur_inode);
}

static void
quota_link_count_decrement(call_frame_t *frame)
{
    call_frame_t *tmpframe = NULL;
    quota_local_t *local = NULL;
    call_stub_t *stub = NULL;
    int link_count = -1;

    local = frame->local;
    if (local && local->par_frame) {
        local = local->par_frame->local;
        tmpframe = frame;
    }

    if (local == NULL)
        goto out;

    LOCK(&local->lock);
    {
        link_count = --local->link_count;
        if (link_count == 0) {
            stub = local->stub;
            local->stub = NULL;
        }
    }
    UNLOCK(&local->lock);

    if (stub != NULL) {
        call_resume(stub);
    }

out:
    if (tmpframe) {
        local = tmpframe->local;
        tmpframe->local = NULL;

        STACK_DESTROY(frame->root);
        if (local)
            quota_local_cleanup(local);
    }

    return;
}

static void
quota_handle_validate_error(call_frame_t *frame, int32_t op_ret,
                            int32_t op_errno)
{
    quota_local_t *local;

    local = frame->local;
    if (local && local->par_frame)
        local = local->par_frame->local;

    if (local == NULL)
        goto out;

    LOCK(&local->lock);
    {
        if (op_ret < 0) {
            local->op_ret = op_ret;
            local->op_errno = op_errno;
        }
    }
    UNLOCK(&local->lock);

    /* we abort checking limits on this path to root */
    quota_link_count_decrement(frame);
out:
    return;
}

int32_t
quota_validate_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                   int32_t op_ret, int32_t op_errno, inode_t *inode,
                   struct iatt *buf, dict_t *xdata, struct iatt *postparent)
{
    quota_local_t *local = NULL;
    int32_t ret = 0;
    quota_inode_ctx_t *ctx = NULL;
    uint64_t value = 0;
    quota_meta_t size = {
        0,
    };
    struct timeval tv = {
        0,
    };

    local = frame->local;

    if (op_ret < 0) {
        goto unwind;
    }

    GF_ASSERT(local);
    GF_ASSERT(frame);
    GF_VALIDATE_OR_GOTO_WITH_ERROR("quota", this, unwind, op_errno, EINVAL);
    GF_VALIDATE_OR_GOTO_WITH_ERROR(this->name, xdata, unwind, op_errno, EINVAL);

    ret = inode_ctx_get(local->validate_loc.inode, this, &value);

    ctx = (quota_inode_ctx_t *)(unsigned long)value;
    if ((ret == -1) || (ctx == NULL)) {
        gf_msg(this->name, GF_LOG_WARNING, EINVAL, Q_MSG_INODE_CTX_GET_FAILED,
               "quota context is"
               " not present in  inode (gfid:%s)",
               uuid_utoa(local->validate_loc.inode->gfid));
        op_errno = EINVAL;
        goto unwind;
    }

    ret = quota_dict_get_meta(xdata, QUOTA_SIZE_KEY, SLEN(QUOTA_SIZE_KEY),
                              &size);
    if (ret == -1) {
        gf_msg(this->name, GF_LOG_WARNING, EINVAL, Q_MSG_SIZE_KEY_MISSING,
               "quota size key not present "
               "in dict");
        op_errno = EINVAL;
    }

    local->just_validated = 1; /* so that we don't go into infinite
                                * loop of validation and checking
                                * limit when timeout is zero.
                                */
    gettimeofday(&tv, NULL);
    LOCK(&ctx->lock);
    {
        ctx->size = size.size;
        ctx->file_count = size.file_count;
        ctx->dir_count = size.dir_count;
        memcpy(&ctx->tv, &tv, sizeof(struct timeval));
    }
    UNLOCK(&ctx->lock);

    quota_check_limit(frame, local->validate_loc.inode, this);
    return 0;

unwind:
    quota_handle_validate_error(frame, op_ret, op_errno);
    return 0;
}

static uint64_t
quota_time_elapsed(struct timeval *now, struct timeval *then)
{
    return (now->tv_sec - then->tv_sec);
}

int32_t
quota_timeout(struct timeval *tv, int32_t timeout)
{
    struct timeval now = {
        0,
    };
    int32_t timed_out = 0;

    gettimeofday(&now, NULL);

    if (quota_time_elapsed(&now, tv) >= timeout) {
        timed_out = 1;
    }

    return timed_out;
}

/* Return: 1 if new entry added
 *         0 no entry added
 *        -1 on errors
 */
static int32_t
quota_add_parent(struct list_head *list, char *name, uuid_t pgfid)
{
    quota_dentry_t *entry = NULL;
    gf_boolean_t found = _gf_false;
    int ret = 0;

    if (!list_empty(list)) {
        list_for_each_entry(entry, list, next)
        {
            if (gf_uuid_compare(pgfid, entry->par) == 0) {
                found = _gf_true;
                goto out;
            }
        }
    }

    entry = __quota_dentry_new(NULL, name, pgfid);
    if (entry)
        list_add_tail(&entry->next, list);
    else
        ret = -1;

out:
    if (found)
        return 0;
    else if (ret == 0)
        return 1;
    else
        return -1;
}

/* This function iterates the parent list in inode
 * context and add unique parent to the list
 * Returns number of dentry added to the list, or -1 on errors
 */
static int32_t
quota_add_parents_from_ctx(quota_inode_ctx_t *ctx, struct list_head *list)
{
    int ret = 0;
    quota_dentry_t *dentry = NULL;
    int32_t count = 0;

    if (ctx == NULL || list == NULL)
        goto out;

    LOCK(&ctx->lock);
    {
        list_for_each_entry(dentry, &ctx->parents, next)
        {
            ret = quota_add_parent(list, dentry->name, dentry->par);
            if (ret == 1)
                count++;
            else if (ret == -1)
                break;
        }
    }
    UNLOCK(&ctx->lock);

out:
    return (ret == -1) ? -1 : count;
}

int32_t
quota_build_ancestry_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                         int32_t op_ret, int32_t op_errno, gf_dirent_t *entries,
                         dict_t *xdata)
{
    inode_t *parent = NULL;
    inode_t *tmp_parent = NULL;
    inode_t *linked_inode = NULL;
    inode_t *tmp_inode = NULL;
    gf_dirent_t *entry = NULL;
    loc_t loc = {
        0,
    };
    quota_dentry_t *dentry = NULL;
    quota_dentry_t *tmp = NULL;
    quota_inode_ctx_t *ctx = NULL;
    struct list_head parents;
    quota_local_t *local = NULL;
    int ret;

    INIT_LIST_HEAD(&parents);

    local = frame->local;
    frame->local = NULL;

    if (op_ret < 0)
        goto err;

    if ((op_ret > 0) && (entries != NULL)) {
        list_for_each_entry(entry, &entries->list, list)
        {
            if (__is_root_gfid(entry->inode->gfid)) {
                /* The list contains a sub-list for each
                 * possible path to the target inode. Each
                 * sub-list starts with the root entry of the
                 * tree and is followed by the child entries
                 * for a particular path to the target entry.
                 * The root entry is an implied sub-list
                 * delimiter, as it denotes we have started
                 * processing a new path. Reset the parent
                 * pointer and continue
                 */

                tmp_parent = NULL;
            } else {
                /* For a non-root entry, link this inode */
                linked_inode = inode_link(entry->inode, tmp_parent,
                                          entry->d_name, &entry->d_stat);
                if (linked_inode) {
                    tmp_inode = entry->inode;
                    entry->inode = linked_inode;
                    inode_unref(tmp_inode);
                } else {
                    gf_msg(this->name, GF_LOG_WARNING, EINVAL,
                           Q_MSG_PARENT_NULL, "inode link failed");
                    op_errno = EINVAL;
                    goto err;
                }
            }

            gf_uuid_copy(loc.gfid, entry->d_stat.ia_gfid);

            loc.inode = inode_ref(entry->inode);
            loc.parent = inode_ref(tmp_parent);
            loc.name = entry->d_name;

            quota_fill_inodectx(this, entry->inode, entry->dict, &loc,
                                &entry->d_stat, &op_errno);

            /* For non-directory, posix_get_ancestry_non_directory
             * returns all hard-links that are represented by nodes
             * adjacent to each other in the dentry-list.
             * (Unlike the directory case where adjacent nodes
             * either have a parent/child relationship or belong to
             * different paths).
             */
            if (entry->inode->ia_type == IA_IFDIR)
                tmp_parent = entry->inode;

            loc_wipe(&loc);
        }
    }

    parent = inode_parent(local->loc.inode, 0, NULL);
    if (parent == NULL) {
        gf_msg(this->name, GF_LOG_WARNING, EINVAL, Q_MSG_PARENT_NULL,
               "parent is NULL");
        op_errno = EINVAL;
        goto err;
    }

    quota_inode_ctx_get(local->loc.inode, this, &ctx, 0);

    ret = quota_add_parents_from_ctx(ctx, &parents);
    if (ret == -1) {
        op_errno = errno;
        goto err;
    }

    if (list_empty(&parents)) {
        /* we built ancestry for a directory */
        list_for_each_entry(entry, &entries->list, list)
        {
            if (entry->inode == local->loc.inode)
                break;
        }

        /* Getting assertion here, need to investigate
           comment for now
           GF_ASSERT (&entry->list != &entries->list);
        */

        ret = quota_add_parent(&parents, entry->d_name, parent->gfid);
        if (ret == -1) {
            op_errno = errno;
            goto err;
        }
    }

    local->ancestry_cbk(&parents, local->loc.inode, 0, 0, local->ancestry_data);
    goto cleanup;

err:
    local->ancestry_cbk(NULL, NULL, -1, op_errno, local->ancestry_data);

cleanup:
    STACK_DESTROY(frame->root);
    quota_local_cleanup(local);

    if (parent != NULL) {
        inode_unref(parent);
        parent = NULL;
    }

    if (!list_empty(&parents)) {
        list_for_each_entry_safe(dentry, tmp, &parents, next)
        {
            __quota_dentry_free(dentry);
        }
    }

    return 0;
}

int
quota_build_ancestry(inode_t *inode, quota_ancestry_built_t ancestry_cbk,
                     void *data)
{
    fd_t *fd = NULL;
    quota_local_t *local = NULL;
    call_frame_t *new_frame = NULL;
    int op_errno = ENOMEM;
    int op_ret = -1;
    xlator_t *this = NULL;
    dict_t *xdata_req = NULL;

    this = THIS;

    xdata_req = dict_new();
    if (xdata_req == NULL)
        goto err;

    fd = fd_anonymous(inode);
    if (fd == NULL)
        goto err;

    new_frame = create_frame(this, this->ctx->pool);
    if (new_frame == NULL)
        goto err;

    local = quota_local_new();
    if (local == NULL)
        goto err;

    new_frame->root->uid = new_frame->root->gid = 0;
    new_frame->local = local;
    local->ancestry_cbk = ancestry_cbk;
    local->ancestry_data = data;
    local->loc.inode = inode_ref(inode);

    op_ret = dict_set_int8(xdata_req, QUOTA_LIMIT_KEY, 1);
    if (op_ret < 0) {
        op_errno = -op_ret;
        goto err;
    }

    op_ret = dict_set_int8(xdata_req, QUOTA_LIMIT_OBJECTS_KEY, 1);
    if (op_ret < 0) {
        op_errno = -op_ret;
        goto err;
    }

    op_ret = dict_set_int8(xdata_req, GET_ANCESTRY_DENTRY_KEY, 1);
    if (op_ret < 0) {
        op_errno = -op_ret;
        goto err;
    }

    /* This would ask posix layer to construct dentry chain till root
     * We don't need to do a opendir, we can use the anonymous fd
     * here for  the readidrp.
     * avoiding opendir also reduces the window size where another FOP
     * can be executed before completion of build ancestry
     */
    STACK_WIND(new_frame, quota_build_ancestry_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->readdirp, fd, 0, 0, xdata_req);

    op_ret = 0;

err:
    if (fd)
        fd_unref(fd);

    if (xdata_req)
        dict_unref(xdata_req);

    if (op_ret < 0) {
        ancestry_cbk(NULL, NULL, -1, op_errno, data);

        if (new_frame) {
            local = new_frame->local;
            new_frame->local = NULL;
            STACK_DESTROY(new_frame->root);
        }

        if (local)
            quota_local_cleanup(local);
    }

    return 0;
}

int
quota_validate(call_frame_t *frame, inode_t *inode, xlator_t *this,
               fop_lookup_cbk_t cbk_fn)
{
    quota_local_t *local = NULL;
    int ret = 0;
    dict_t *xdata = NULL;
    quota_priv_t *priv = NULL;

    local = frame->local;
    priv = this->private;

    LOCK(&local->lock);
    {
        loc_wipe(&local->validate_loc);

        ret = quota_inode_loc_fill(inode, &local->validate_loc);
        if (ret < 0) {
            gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENFORCEMENT_FAILED,
                   "cannot fill loc for inode (gfid:%s), hence "
                   "aborting quota-checks and continuing with fop",
                   uuid_utoa(inode->gfid));
        }
    }
    UNLOCK(&local->lock);

    if (ret < 0) {
        ret = -ENOMEM;
        goto err;
    }

    xdata = dict_new();
    if (xdata == NULL) {
        ret = -ENOMEM;
        goto err;
    }

    ret = dict_set_int8(xdata, QUOTA_SIZE_KEY, 1);
    if (ret < 0) {
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
               "dict set failed");
        ret = -ENOMEM;
        goto err;
    }

    ret = dict_set_str(xdata, "volume-uuid", priv->volume_uuid);
    if (ret < 0) {
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
               "dict set failed");
        ret = -ENOMEM;
        goto err;
    }

    ret = quota_enforcer_lookup(frame, this, xdata, cbk_fn);
    if (ret < 0) {
        ret = -ENOTCONN;
        goto err;
    }

    ret = 0;
err:
    if (xdata)
        dict_unref(xdata);

    return ret;
}

void
quota_check_limit_continuation(struct list_head *parents, inode_t *inode,
                               int32_t op_ret, int32_t op_errno, void *data)
{
    call_frame_t *frame = NULL;
    xlator_t *this = NULL;
    quota_local_t *local = NULL;
    quota_local_t *par_local = NULL;
    quota_dentry_t *entry = NULL;
    inode_t *parent = NULL;
    int parent_count = 0;

    frame = data;
    local = frame->local;
    this = THIS;

    if (local->par_frame)
        par_local = local->par_frame->local;
    else
        par_local = local;

    if ((op_ret < 0) || list_empty(parents)) {
        if (op_ret >= 0) {
            gf_msg(this->name, GF_LOG_WARNING, EIO, Q_MSG_ANCESTRY_BUILD_FAILED,
                   "Couldn't build ancestry for inode (gfid:%s). "
                   "Without knowing ancestors till root, quota"
                   "cannot be enforced. "
                   "Hence, failing fop with EIO",
                   uuid_utoa(inode->gfid));
            op_errno = EIO;
        }

        quota_handle_validate_error(frame, -1, op_errno);
        goto out;
    }

    list_for_each_entry(entry, parents, next) { parent_count++; }

    LOCK(&par_local->lock);
    {
        par_local->link_count += (parent_count - 1);
    }
    UNLOCK(&par_local->lock);

    if (local->par_frame) {
        list_for_each_entry(entry, parents, next)
        {
            parent = inode_find(inode->table, entry->par);
            quota_check_limit(frame, parent, this);
            inode_unref(parent);
        }
    } else {
        list_for_each_entry(entry, parents, next)
        {
            parent = do_quota_check_limit(frame, inode, this, entry, _gf_true);
            if (parent)
                inode_unref(parent);
            else
                quota_link_count_decrement(frame);
        }
    }

out:
    return;
}

int32_t
quota_check_object_limit(call_frame_t *frame, quota_inode_ctx_t *ctx,
                         quota_priv_t *priv, inode_t *_inode, xlator_t *this,
                         int32_t *op_errno, int just_validated,
                         quota_local_t *local, gf_boolean_t *skip_check)
{
    int32_t ret = -1;
    uint32_t timeout = 0;
    char need_validate = 0;
    gf_boolean_t hard_limit_exceeded = 0;
    int64_t object_aggr_count = 0;

    GF_ASSERT(frame);
    GF_ASSERT(priv);
    GF_ASSERT(_inode);
    GF_ASSERT(this);
    GF_ASSERT(local);

    if (ctx != NULL && (ctx->object_hard_lim > 0 || ctx->object_soft_lim)) {
        LOCK(&ctx->lock);
        {
            timeout = priv->soft_timeout;

            object_aggr_count = ctx->file_count + ctx->dir_count + 1;
            if (((ctx->object_soft_lim >= 0) &&
                 (object_aggr_count) > ctx->object_soft_lim)) {
                timeout = priv->hard_timeout;
            }

            if (!just_validated && quota_timeout(&ctx->tv, timeout)) {
                need_validate = 1;
            } else if ((object_aggr_count) > ctx->object_hard_lim) {
                hard_limit_exceeded = 1;
            }
        }
        UNLOCK(&ctx->lock);

        if (need_validate && *skip_check != _gf_true) {
            *skip_check = _gf_true;
            ret = quota_validate(frame, _inode, this, quota_validate_cbk);
            if (ret < 0) {
                *op_errno = -ret;
                *skip_check = _gf_false;
            }
            goto out;
        }

        if (hard_limit_exceeded) {
            local->op_ret = -1;
            local->op_errno = EDQUOT;
            *op_errno = EDQUOT;
            goto out;
        }

        /*We log usage only if quota limit is configured on
           that inode
        */
        quota_log_usage(this, ctx, _inode, 0);
    }

    ret = 0;

out:
    return ret;
}

int32_t
quota_check_size_limit(call_frame_t *frame, quota_inode_ctx_t *ctx,
                       quota_priv_t *priv, inode_t *_inode, xlator_t *this,
                       int32_t *op_errno, int just_validated, int64_t delta,
                       quota_local_t *local, gf_boolean_t *skip_check)
{
    int32_t ret = -1;
    uint32_t timeout = 0;
    char need_validate = 0;
    gf_boolean_t hard_limit_exceeded = 0;
    int64_t space_available = 0;
    int64_t wouldbe_size = 0;

    GF_ASSERT(frame);
    GF_ASSERT(priv);
    GF_ASSERT(_inode);
    GF_ASSERT(this);
    GF_ASSERT(local);

    if (ctx != NULL && (ctx->hard_lim > 0 || ctx->soft_lim > 0)) {
        wouldbe_size = ctx->size + delta;

        LOCK(&ctx->lock);
        {
            timeout = priv->soft_timeout;

            if ((ctx->soft_lim >= 0) && (wouldbe_size > ctx->soft_lim)) {
                timeout = priv->hard_timeout;
            }

            if (!just_validated && quota_timeout(&ctx->tv, timeout)) {
                need_validate = 1;
            } else if (wouldbe_size >= ctx->hard_lim) {
                hard_limit_exceeded = 1;
            }
        }
        UNLOCK(&ctx->lock);

        if (need_validate && *skip_check != _gf_true) {
            *skip_check = _gf_true;
            ret = quota_validate(frame, _inode, this, quota_validate_cbk);
            if (ret < 0) {
                *op_errno = -ret;
                *skip_check = _gf_false;
            }
            goto out;
        }

        if (hard_limit_exceeded) {
            local->op_ret = -1;
            local->op_errno = EDQUOT;

            space_available = ctx->hard_lim - ctx->size;

            if (space_available < 0)
                space_available = 0;

            if ((local->space_available < 0) ||
                (local->space_available > space_available)) {
                local->space_available = space_available;
            }

            if (space_available == 0) {
                *op_errno = EDQUOT;
                goto out;
            }
        }

        /* We log usage only if quota limit is configured on
           that inode. */
        quota_log_usage(this, ctx, _inode, delta);
    }

    ret = 0;
out:
    return ret;
}

int32_t
quota_check_limit(call_frame_t *frame, inode_t *inode, xlator_t *this)
{
    int32_t ret = -1, op_errno = EINVAL;
    inode_t *_inode = NULL, *parent = NULL;
    quota_inode_ctx_t *ctx = NULL;
    quota_priv_t *priv = NULL;
    quota_local_t *local = NULL;
    quota_local_t *par_local = NULL;
    char just_validated = 0;
    int64_t delta = 0;
    int8_t object_delta = 0;
    uint64_t value = 0;
    gf_boolean_t skip_check = _gf_false;

    GF_VALIDATE_OR_GOTO("quota", this, err);
    GF_VALIDATE_OR_GOTO(this->name, frame, err);
    GF_VALIDATE_OR_GOTO(this->name, inode, err);

    local = frame->local;
    GF_VALIDATE_OR_GOTO(this->name, local, err);

    if (local->par_frame) {
        par_local = local->par_frame->local;
        GF_VALIDATE_OR_GOTO(this->name, par_local, err);
    } else {
        par_local = local;
    }

    delta = par_local->delta;
    object_delta = par_local->object_delta;

    GF_VALIDATE_OR_GOTO(this->name, par_local->stub, err);
    /* Allow all the trusted clients
     * Don't block the gluster internal processes like rebalance, gsyncd,
     * self heal etc from the disk quotas.
     *
     * Method: Allow all the clients with PID negative. This is by the
     * assumption that any kernel assigned pid doesn't have the negative
     * number.
     */
    if (0 > frame->root->pid) {
        ret = 0;
        quota_link_count_decrement(frame);
        goto done;
    }

    priv = this->private;

    inode_ctx_get(inode, this, &value);
    ctx = (quota_inode_ctx_t *)(unsigned long)value;

    _inode = inode_ref(inode);

    LOCK(&local->lock);
    {
        just_validated = local->just_validated;
        local->just_validated = 0;
    }
    UNLOCK(&local->lock);

    do {
        /* In a rename operation, enforce should be stopped at common
           ancestor */
        if (!gf_uuid_is_null(par_local->common_ancestor) &&
            !gf_uuid_compare(_inode->gfid, par_local->common_ancestor)) {
            quota_link_count_decrement(frame);
            break;
        }

        if (object_delta <= 0)
            goto skip_check_object_limit;

        ret = quota_check_object_limit(frame, ctx, priv, _inode, this,
                                       &op_errno, just_validated, par_local,
                                       &skip_check);
        if (skip_check == _gf_true)
            goto done;

        if (ret) {
            if (op_errno != EDQUOT)
                gf_msg(this->name, GF_LOG_ERROR, 0, Q_MSG_ENFORCEMENT_FAILED,
                       "Failed to "
                       "check quota object limit");
            goto err;
        }

    skip_check_object_limit:
        ret = quota_check_size_limit(frame, ctx, priv, _inode, this, &op_errno,
                                     just_validated, delta, par_local,
                                     &skip_check);
        if (skip_check == _gf_true)
            goto done;

        if (ret) {
            if (op_errno != EDQUOT)
                gf_msg(this->name, GF_LOG_ERROR, 0, Q_MSG_ENFORCEMENT_FAILED,
                       "Failed to "
                       "check quota size limit");
            goto err;
        }

        if (__is_root_gfid(_inode->gfid)) {
            quota_link_count_decrement(frame);
            break;
        }

        parent = inode_parent(_inode, 0, NULL);
        if (parent == NULL) {
            ret = quota_build_ancestry(_inode, quota_check_limit_continuation,
                                       frame);
            if (ret < 0) {
                op_errno = -ret;
                goto err;
            }

            break;
        }

        inode_unref(_inode);
        _inode = parent;
        just_validated = 0;

        value = 0;
        inode_ctx_get(_inode, this, &value);
        ctx = (quota_inode_ctx_t *)(unsigned long)value;
    } while (1);

done:
    if (_inode != NULL) {
        inode_unref(_inode);
        _inode = NULL;
    }
    return 0;

err:
    quota_handle_validate_error(frame, -1, op_errno);

    inode_unref(_inode);
    return 0;
}

inode_t *
do_quota_check_limit(call_frame_t *frame, inode_t *inode, xlator_t *this,
                     quota_dentry_t *dentry, gf_boolean_t force)
{
    int32_t ret = -1;
    inode_t *parent = NULL;
    call_frame_t *new_frame = NULL;
    quota_local_t *new_local = NULL;

    parent = inode_parent(inode, dentry->par, dentry->name);
    if (parent == NULL) {
        if (force)
            parent = inode_find(inode->table, dentry->par);
        else
            goto out;
    }
    if (parent == NULL)
        goto out;

    new_frame = copy_frame(frame);
    if (new_frame == NULL)
        goto out;

    new_local = quota_local_new();
    if (new_local == NULL)
        goto out;

    new_frame->local = new_local;
    new_local->par_frame = frame;

    quota_check_limit(new_frame, parent, this);

    ret = 0;
out:
    if (ret < 0) {
        if (parent) {
            /* Caller should decrement link_count, in case parent is
             * NULL
             */
            quota_handle_validate_error(frame, -1, ENOMEM);
        }

        if (new_frame) {
            new_frame->local = NULL;
            STACK_DESTROY(new_frame->root);
        }
    }

    return parent;
}

static int
quota_get_limits(xlator_t *this, dict_t *dict, int64_t *hard_lim,
                 int64_t *soft_lim, int64_t *object_hard_limit,
                 int64_t *object_soft_limit)
{
    quota_limits_t *limit = NULL;
    quota_limits_t *object_limit = NULL;
    quota_priv_t *priv = NULL;
    int64_t soft_lim_percent = 0;
    int64_t *ptr = NULL;
    int ret = 0;

    if ((this == NULL) || (dict == NULL) || (hard_lim == NULL) ||
        (soft_lim == NULL))
        goto out;

    priv = this->private;

    ret = dict_get_bin(dict, QUOTA_LIMIT_KEY, (void **)&ptr);
    limit = (quota_limits_t *)ptr;

    if (limit) {
        *hard_lim = ntoh64(limit->hl);
        soft_lim_percent = ntoh64(limit->sl);
    }

    if (soft_lim_percent < 0) {
        soft_lim_percent = priv->default_soft_lim;
    }

    if ((*hard_lim > 0) && (soft_lim_percent > 0)) {
        *soft_lim = (soft_lim_percent * (*hard_lim)) / 100;
    }

    ret = dict_get_bin(dict, QUOTA_LIMIT_OBJECTS_KEY, (void **)&ptr);
    if (ret)
        return 0;
    object_limit = (quota_limits_t *)ptr;

    if (object_limit) {
        *object_hard_limit = ntoh64(object_limit->hl);
        soft_lim_percent = ntoh64(object_limit->sl);
    }

    if (soft_lim_percent < 0) {
        soft_lim_percent = priv->default_soft_lim;
    }

    if ((*object_hard_limit > 0) && (soft_lim_percent > 0)) {
        *object_soft_limit = (soft_lim_percent * (*object_hard_limit)) / 100;
    }

out:
    return 0;
}

int
quota_fill_inodectx(xlator_t *this, inode_t *inode, dict_t *dict, loc_t *loc,
                    struct iatt *buf, int32_t *op_errno)
{
    int32_t ret = -1;
    char found = 0;
    quota_inode_ctx_t *ctx = NULL;
    quota_dentry_t *dentry = NULL;
    uint64_t value = 0;
    int64_t hard_lim = 0;
    int64_t soft_lim = 0;
    int64_t object_hard_limit = 0;
    int64_t object_soft_limit = 0;

    quota_get_limits(this, dict, &hard_lim, &soft_lim, &object_hard_limit,
                     &object_soft_limit);

    inode_ctx_get(inode, this, &value);
    ctx = (quota_inode_ctx_t *)(unsigned long)value;

    if ((((ctx == NULL) || (ctx->hard_lim == hard_lim)) && (hard_lim < 0) &&
         !QUOTA_REG_OR_LNK_FILE(buf->ia_type))) {
        ret = 0;
        goto out;
    }

    ret = quota_inode_ctx_get(inode, this, &ctx, 1);
    if ((ret == -1) || (ctx == NULL)) {
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_INODE_CTX_GET_FAILED,
               "cannot create quota "
               "context in inode(gfid:%s)",
               uuid_utoa(inode->gfid));
        ret = -1;
        *op_errno = ENOMEM;
        goto out;
    }

    LOCK(&ctx->lock);
    {
        ctx->hard_lim = hard_lim;
        ctx->soft_lim = soft_lim;
        ctx->object_hard_lim = object_hard_limit;
        ctx->object_soft_lim = object_soft_limit;

        ctx->buf = *buf;

        if (!QUOTA_REG_OR_LNK_FILE(buf->ia_type)) {
            goto unlock;
        }

        /* do nothing if it is a nameless lookup */
        if (loc->name == NULL || !loc->parent)
            goto unlock;

        list_for_each_entry(dentry, &ctx->parents, next)
        {
            if ((strcmp(dentry->name, loc->name) == 0) &&
                (gf_uuid_compare(loc->parent->gfid, dentry->par) == 0)) {
                found = 1;
                break;
            }
        }

        if (!found) {
            dentry = __quota_dentry_new(ctx, (char *)loc->name,
                                        loc->parent->gfid);
            if (dentry == NULL) {
                /*
                  gf_msg (this->name, GF_LOG_WARNING, ENOMEM,
                          Q_MSG_ENOMEM,
                          "cannot create a new dentry (par:%"
-                                          PRId64", name:%s) for inode(ino:%"
-                                          PRId64", gfid:%s)",
-                                          uuid_utoa (local->loc.inode->gfid));
                */
                ret = -1;
                *op_errno = ENOMEM;
                goto unlock;
            }
        }
    }
unlock:
    UNLOCK(&ctx->lock);

out:
    return ret;
}

/*
 * return _gf_true if enforcement is needed and _gf_false otherwise
 */
gf_boolean_t
should_quota_enforce(xlator_t *this, dict_t *dict, glusterfs_fop_t fop)
{
    int ret = 0;

    ret = dict_check_flag(dict, GF_INTERNAL_CTX_KEY, GF_DHT_HEAL_DIR);

    if (fop == GF_FOP_MKDIR && ret == DICT_FLAG_SET) {
        return _gf_false;
    } else if (ret == -ENOENT) {
        gf_msg(this->name, GF_LOG_DEBUG, EINVAL, Q_MSG_INTERNAL_FOP_KEY_MISSING,
               "No internal fop context present");
        goto out;
    }
out:
    return _gf_true;
}

int32_t
quota_lookup_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                 int32_t op_ret, int32_t op_errno, inode_t *inode,
                 struct iatt *buf, dict_t *dict, struct iatt *postparent)
{
    quota_local_t *local = NULL;
    inode_t *this_inode = NULL;

    local = frame->local;
    frame->local = NULL;

    if (op_ret >= 0 && inode) {
        this_inode = inode_ref(inode);

        op_ret = quota_fill_inodectx(this, inode, dict, &local->loc, buf,
                                     &op_errno);
        if (op_ret < 0)
            op_errno = ENOMEM;
    }

    QUOTA_STACK_UNWIND(lookup, frame, op_ret, op_errno, inode, buf, dict,
                       postparent);

    if (op_ret < 0 || this_inode == NULL || gf_uuid_is_null(this_inode->gfid))
        goto out;

    check_ancestory_2(this, local, this_inode);

out:
    if (this_inode)
        inode_unref(this_inode);

    quota_local_cleanup(local);

    return 0;
}

int32_t
quota_lookup(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xattr_req)
{
    quota_priv_t *priv = NULL;
    int32_t ret = -1;
    quota_local_t *local = NULL;

    priv = this->private;

    WIND_IF_QUOTAOFF(priv->is_quota_on, off);

    xattr_req = xattr_req ? dict_ref(xattr_req) : dict_new();
    if (!xattr_req)
        goto err;

    local = quota_local_new();
    if (local == NULL) {
        goto err;
    }

    frame->local = local;
    loc_copy(&local->loc, loc);

    ret = dict_set_int8(xattr_req, QUOTA_LIMIT_KEY, 1);
    if (ret < 0) {
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
               "dict set of key for "
               "hard-limit failed");
        goto err;
    }

    ret = dict_set_int8(xattr_req, QUOTA_LIMIT_OBJECTS_KEY, 1);
    if (ret < 0) {
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
               "dict set of key for quota object limit failed");
        goto err;
    }

    STACK_WIND(frame, quota_lookup_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->lookup, loc, xattr_req);

    ret = 0;

err:
    if (xattr_req)
        dict_unref(xattr_req);

    if (ret < 0) {
        QUOTA_STACK_UNWIND(lookup, frame, -1, ENOMEM, NULL, NULL, NULL, NULL);
    }

    return 0;

off:
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->lookup,
                    loc, xattr_req);
    return 0;
}

int32_t
quota_writev_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                 int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
                 struct iatt *postbuf, dict_t *xdata)
{
    int32_t ret = 0;
    uint64_t ctx_int = 0;
    quota_inode_ctx_t *ctx = NULL;
    quota_local_t *local = NULL;

    local = frame->local;

    if ((op_ret < 0) || (local == NULL) || (postbuf == NULL)) {
        goto out;
    }

    ret = inode_ctx_get(local->loc.inode, this, &ctx_int);
    if (ret) {
        gf_msg(this->name, GF_LOG_WARNING, 0, Q_MSG_INODE_CTX_GET_FAILED,
               "%s: failed to get the "
               "context",
               local->loc.path);
        goto out;
    }

    ctx = (quota_inode_ctx_t *)(unsigned long)ctx_int;

    if (ctx == NULL) {
        gf_msg(this->name, GF_LOG_WARNING, 0, Q_MSG_INODE_CTX_GET_FAILED,
               "quota context not set in %s (gfid:%s)", local->loc.path,
               uuid_utoa(local->loc.inode->gfid));
        goto out;
    }

    LOCK(&ctx->lock);
    {
        ctx->buf = *postbuf;
    }
    UNLOCK(&ctx->lock);

out:
    QUOTA_STACK_UNWIND(writev, frame, op_ret, op_errno, prebuf, postbuf, xdata);

    return 0;
}

static int gf_quota_enforcer_log;

int32_t
quota_writev_helper(call_frame_t *frame, xlator_t *this, fd_t *fd,
                    struct iovec *vector, int32_t count, off_t off,
                    uint32_t flags, struct iobref *iobref, dict_t *xdata)
{
    quota_local_t *local = NULL;
    int32_t op_errno = EINVAL;
    struct iovec *new_vector = NULL;
    int32_t new_count = 0;

    local = frame->local;

    GF_VALIDATE_OR_GOTO("quota", local, unwind);

    if (local->op_ret == -1) {
        op_errno = local->op_errno;

        if ((op_errno == EDQUOT) && (local->space_available > 0)) {
            new_count = iov_subset(vector, count, 0, local->space_available,
                                   NULL);

            new_vector = GF_CALLOC(new_count, sizeof(struct iovec),
                                   gf_common_mt_iovec);
            if (new_vector == NULL) {
                local->op_ret = -1;
                local->op_errno = ENOMEM;
                goto unwind;
            }

            new_count = iov_subset(vector, count, 0, local->space_available,
                                   new_vector);

            vector = new_vector;
            count = new_count;
        } else if (op_errno == ENOENT || op_errno == ESTALE) {
            /* We may get ENOENT/ESTALE in case of below scenario
             *     fd = open file.txt
             *     unlink file.txt
             *     write on fd
             * Here build_ancestry can fail as the file is removed.
             * For now ignore ENOENT/ESTALE with writes on active fd
             * We need to re-visit this code once we understand
             * how other file-system behave in this scenario
             */
            gf_msg_debug(this->name, 0,
                         "quota enforcer failed "
                         "with ENOENT/ESTALE on %s, cannot check "
                         "quota limits and allowing writes",
                         uuid_utoa(fd->inode->gfid));
        } else if ((op_errno == EINVAL) &&
                   !inode_parent(local->loc.inode, 0, NULL)) {
            /* We may get INVAL with parent == NULL,
             * in case of below scenario
             *     1. enable quota
             *     2. glusterfsd stop/start
             *     3. nameless lookup
             *     4. write on fd
             * Here build_ancestry can fail as the file's pgfid
             * is't exist.
             * For now ignore EINVAL with writes on active fd
             * untils the pgfid is created at name lookup
             */
            GF_LOG_OCCASIONALLY(gf_quota_enforcer_log, this->name,
                                GF_LOG_CRITICAL,
                                "Quota cannot be enforced as "
                                "parent is not available and writes are being "
                                "allowed without checking whether they are "
                                "within quota limits. This can happen if Quota "
                                "crawl is not complete. If crawl has been "
                                "completed, please file a bug.");
        } else {
            goto unwind;
        }
    }

    STACK_WIND(frame, quota_writev_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->writev, fd, vector, count, off, flags,
               iobref, xdata);

    if (new_vector != NULL)
        GF_FREE(new_vector);

    return 0;

unwind:
    QUOTA_STACK_UNWIND(writev, frame, -1, op_errno, NULL, NULL, NULL);
    return 0;
}

int32_t
quota_writev(call_frame_t *frame, xlator_t *this, fd_t *fd,
             struct iovec *vector, int32_t count, off_t off, uint32_t flags,
             struct iobref *iobref, dict_t *xdata)
{
    quota_priv_t *priv = NULL;
    int32_t op_errno = EINVAL;
    int32_t parents = 0;
    int32_t fail_count = 0;
    uint64_t size = 0;
    quota_local_t *local = NULL;
    quota_inode_ctx_t *ctx = NULL;
    quota_dentry_t *dentry = NULL, *tmp = NULL;
    call_stub_t *stub = NULL;
    struct list_head head;
    inode_t *par_inode = NULL;

    priv = this->private;

    WIND_IF_QUOTAOFF(priv->is_quota_on, off);

    INIT_LIST_HEAD(&head);

    GF_ASSERT(frame);
    GF_VALIDATE_OR_GOTO("quota", this, unwind);
    GF_VALIDATE_OR_GOTO(this->name, fd, unwind);

    local = quota_local_new();
    if (local == NULL) {
        goto unwind;
    }

    frame->local = local;
    local->loc.inode = inode_ref(fd->inode);

    (void)quota_inode_ctx_get(fd->inode, this, &ctx, 0);
    if (ctx == NULL) {
        gf_msg_debug(this->name, 0,
                     "quota context is NULL on inode"
                     " (%s). If quota is not enabled recently and "
                     "crawler has finished crawling, its an error",
                     uuid_utoa(fd->inode->gfid));
    }

    stub = fop_writev_stub(frame, quota_writev_helper, fd, vector, count, off,
                           flags, iobref, xdata);
    if (stub == NULL) {
        op_errno = ENOMEM;
        goto unwind;
    }

    priv = this->private;
    GF_VALIDATE_OR_GOTO(this->name, priv, unwind);

    parents = quota_add_parents_from_ctx(ctx, &head);
    if (parents == -1) {
        op_errno = errno;
        goto unwind;
    }

    size = iov_length(vector, count);

    LOCK(&local->lock);
    {
        local->delta = size;
        local->object_delta = 0;
        local->link_count = (parents != 0) ? parents : 1;
        local->stub = stub;
    }
    UNLOCK(&local->lock);

    if (parents == 0) {
        /* nameless lookup on this inode, allow quota to reconstruct
         * ancestry as part of check_limit.
         */
        quota_check_limit(frame, fd->inode, this);
    } else {
        list_for_each_entry_safe(dentry, tmp, &head, next)
        {
            par_inode = do_quota_check_limit(frame, fd->inode, this, dentry,
                                             _gf_false);
            if (par_inode == NULL) {
                /* remove stale entry from inode ctx */
                quota_dentry_del(ctx, dentry->name, dentry->par);
                parents--;
                fail_count++;
            } else {
                inode_unref(par_inode);
            }
            __quota_dentry_free(dentry);
        }

        if (parents == 0) {
            LOCK(&local->lock);
            {
                local->link_count++;
            }
            UNLOCK(&local->lock);
            quota_check_limit(frame, fd->inode, this);
        }

        while (fail_count != 0) {
            quota_link_count_decrement(frame);
            fail_count--;
        }
    }

    return 0;

unwind:
    QUOTA_STACK_UNWIND(writev, frame, -1, op_errno, NULL, NULL, NULL);
    return 0;

off:
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->writev,
                    fd, vector, count, off, flags, iobref, xdata);
    return 0;
}

int32_t
quota_mkdir_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                int32_t op_ret, int32_t op_errno, inode_t *inode,
                struct iatt *buf, struct iatt *preparent,
                struct iatt *postparent, dict_t *xdata)
{
    QUOTA_STACK_UNWIND(mkdir, frame, op_ret, op_errno, inode, buf, preparent,
                       postparent, xdata);
    return 0;
}

int32_t
quota_mkdir_helper(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
                   mode_t umask, dict_t *xdata)
{
    quota_local_t *local = NULL;
    int32_t op_errno = EINVAL;

    local = frame->local;

    GF_VALIDATE_OR_GOTO("quota", local, unwind);

    op_errno = local->op_errno;

    if (local->op_ret == -1) {
        goto unwind;
    }

    STACK_WIND(frame, quota_mkdir_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->mkdir, loc, mode, umask, xdata);

    return 0;

unwind:
    QUOTA_STACK_UNWIND(mkdir, frame, -1, op_errno, NULL, NULL, NULL, NULL,
                       NULL);
    return 0;
}

int32_t
quota_mkdir(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
            mode_t umask, dict_t *xdata)
{
    quota_priv_t *priv = NULL;
    int32_t ret = 0, op_errno = 0;
    quota_local_t *local = NULL;
    call_stub_t *stub = NULL;

    priv = this->private;
    WIND_IF_QUOTAOFF(priv->is_quota_on, off);

    if (!should_quota_enforce(this, xdata, GF_FOP_MKDIR)) {
        gf_msg(this->name, GF_LOG_DEBUG, 0, Q_MSG_ENFORCEMENT_SKIPPED,
               "Enforcement has been skipped(internal fop).");
        goto off;
    }

    local = quota_local_new();
    if (local == NULL) {
        op_errno = ENOMEM;
        goto err;
    }

    frame->local = local;

    ret = loc_copy(&local->loc, loc);
    if (ret) {
        op_errno = ENOMEM;
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
               "loc_copy failed");
        goto err;
    }

    stub = fop_mkdir_stub(frame, quota_mkdir_helper, loc, mode, umask, xdata);
    if (stub == NULL) {
        op_errno = ENOMEM;
        goto err;
    }

    LOCK(&local->lock);
    {
        local->stub = stub;
        local->delta = 0;
        local->object_delta = 1;
        local->link_count = 1;
    }
    UNLOCK(&local->lock);

    quota_check_limit(frame, loc->parent, this);
    return 0;

err:
    QUOTA_STACK_UNWIND(mkdir, frame, -1, op_errno, NULL, NULL, NULL, NULL,
                       NULL);

    return 0;

off:
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->mkdir,
                    loc, mode, umask, xdata);

    return 0;
}

int32_t
quota_create_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                 int32_t op_ret, int32_t op_errno, fd_t *fd, inode_t *inode,
                 struct iatt *buf, struct iatt *preparent,
                 struct iatt *postparent, dict_t *xdata)
{
    int32_t ret = -1;
    quota_local_t *local = NULL;
    quota_inode_ctx_t *ctx = NULL;
    quota_dentry_t *dentry = NULL;

    local = frame->local;
    if (op_ret < 0) {
        goto unwind;
    }

    ret = quota_inode_ctx_get(inode, this, &ctx, 1);
    if ((ret == -1) || (ctx == NULL)) {
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_INODE_CTX_GET_FAILED,
               "cannot create quota "
               "context in inode(gfid:%s)",
               uuid_utoa(inode->gfid));
        op_ret = -1;
        op_errno = ENOMEM;
        goto unwind;
    }

    LOCK(&ctx->lock);
    {
        ctx->buf = *buf;

        dentry = __quota_dentry_new(ctx, (char *)local->loc.name,
                                    local->loc.parent->gfid);
        if (dentry == NULL) {
            gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
                   "cannot create a new dentry "
                   "(name:%s) for inode(gfid:%s)",
                   local->loc.name, uuid_utoa(local->loc.inode->gfid));
            op_ret = -1;
            op_errno = ENOMEM;
            goto unlock;
        }
    }
unlock:
    UNLOCK(&ctx->lock);

unwind:
    QUOTA_STACK_UNWIND(create, frame, op_ret, op_errno, fd, inode, buf,
                       preparent, postparent, xdata);
    return 0;
}

int32_t
quota_create_helper(call_frame_t *frame, xlator_t *this, loc_t *loc,
                    int32_t flags, mode_t mode, mode_t umask, fd_t *fd,
                    dict_t *xdata)
{
    quota_local_t *local = NULL;
    int32_t op_errno = EINVAL;

    local = frame->local;

    GF_VALIDATE_OR_GOTO("quota", local, unwind);

    if (local->op_ret == -1) {
        op_errno = local->op_errno;
        goto unwind;
    }

    STACK_WIND(frame, quota_create_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->create, loc, flags, mode, umask, fd,
               xdata);
    return 0;

unwind:
    QUOTA_STACK_UNWIND(create, frame, -1, op_errno, NULL, NULL, NULL, NULL,
                       NULL, NULL);
    return 0;
}

int32_t
quota_create(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,
             mode_t mode, mode_t umask, fd_t *fd, dict_t *xdata)
{
    quota_priv_t *priv = NULL;
    int32_t ret = -1;
    quota_local_t *local = NULL;
    int32_t op_errno = 0;
    call_stub_t *stub = NULL;

    priv = this->private;

    WIND_IF_QUOTAOFF(priv->is_quota_on, off);
    QUOTA_WIND_FOR_INTERNAL_FOP(xdata, off);

    local = quota_local_new();
    if (local == NULL) {
        op_errno = ENOMEM;
        goto err;
    }

    frame->local = local;

    ret = loc_copy(&local->loc, loc);
    if (ret) {
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
               "loc_copy failed");
        op_errno = ENOMEM;
        goto err;
    }

    stub = fop_create_stub(frame, quota_create_helper, loc, flags, mode, umask,
                           fd, xdata);
    if (stub == NULL) {
        goto err;
    }

    LOCK(&local->lock);
    {
        local->link_count = 1;
        local->stub = stub;
        local->delta = 0;
        local->object_delta = 1;
    }
    UNLOCK(&local->lock);

    quota_check_limit(frame, loc->parent, this);
    return 0;
err:
    QUOTA_STACK_UNWIND(create, frame, -1, op_errno, NULL, NULL, NULL, NULL,
                       NULL, NULL);

    return 0;

off:
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->create,
                    loc, flags, mode, umask, fd, xdata);
    return 0;
}

int32_t
quota_unlink_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                 int32_t op_ret, int32_t op_errno, struct iatt *preparent,
                 struct iatt *postparent, dict_t *xdata)
{
    quota_local_t *local = NULL;
    quota_inode_ctx_t *ctx = NULL;
    uint64_t value = 0;

    if (op_ret < 0) {
        goto out;
    }

    local = (quota_local_t *)frame->local;

    inode_ctx_get(local->loc.inode, this, &value);
    ctx = (quota_inode_ctx_t *)(unsigned long)value;

    if (ctx == NULL) {
        gf_msg(this->name, GF_LOG_INFO, EINVAL, Q_MSG_INODE_CTX_GET_FAILED,
               "quota context not set inode (gfid:%s)",
               uuid_utoa(local->loc.gfid));
        goto out;
    }

    quota_dentry_del(ctx, local->loc.name, local->loc.parent->gfid);

out:
    QUOTA_STACK_UNWIND(unlink, frame, op_ret, op_errno, preparent, postparent,
                       xdata);
    return 0;
}

int32_t
quota_unlink(call_frame_t *frame, xlator_t *this, loc_t *loc, int xflag,
             dict_t *xdata)
{
    quota_priv_t *priv = NULL;
    int32_t ret = -1;
    quota_local_t *local = NULL;

    priv = this->private;

    WIND_IF_QUOTAOFF(priv->is_quota_on, off);

    local = quota_local_new();
    if (local == NULL) {
        goto err;
    }

    frame->local = local;

    ret = loc_copy(&local->loc, loc);
    if (ret) {
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
               "loc_copy failed");
        goto err;
    }

    STACK_WIND(frame, quota_unlink_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->unlink, loc, xflag, xdata);

    ret = 0;

err:
    if (ret == -1) {
        QUOTA_STACK_UNWIND(unlink, frame, -1, 0, NULL, NULL, NULL);
    }

    return 0;

off:
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->unlink,
                    loc, xflag, xdata);
    return 0;
}

int32_t
quota_link_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
               int32_t op_ret, int32_t op_errno, inode_t *inode,
               struct iatt *buf, struct iatt *preparent,
               struct iatt *postparent, dict_t *xdata)
{
    int32_t ret = -1;
    quota_local_t *local = NULL;
    quota_inode_ctx_t *ctx = NULL;
    quota_dentry_t *dentry = NULL;
    char found = 0;

    if (op_ret < 0) {
        goto out;
    }

    local = (quota_local_t *)frame->local;

    ret = quota_inode_ctx_get(inode, this, &ctx, 0);
    if ((ret == -1) || (ctx == NULL)) {
        gf_msg_debug(this->name, 0,
                     "quota context is NULL on inode"
                     " (%s). If quota is not enabled recently and "
                     "crawler has finished crawling, its an error",
                     uuid_utoa(inode->gfid));
        goto out;
    }

    LOCK(&ctx->lock);
    {
        list_for_each_entry(dentry, &ctx->parents, next)
        {
            if ((strcmp(dentry->name, local->loc.name) == 0) &&
                (gf_uuid_compare(local->loc.parent->gfid, dentry->par) == 0)) {
                found = 1;

                gf_msg_debug(this->name, 0,
                             "new entry being"
                             " linked (name:%s) for inode "
                             "(gfid:%s) is already present "
                             "in inode-dentry-list",
                             dentry->name, uuid_utoa(local->loc.inode->gfid));
                break;
            }
        }

        if (!found) {
            dentry = __quota_dentry_new(ctx, (char *)local->loc.name,
                                        local->loc.parent->gfid);
            if (dentry == NULL) {
                gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
                       "cannot create a new dentry (name:%s)"
                       "for inode(gfid:%s)",
                       local->loc.name, uuid_utoa(local->loc.inode->gfid));
                op_ret = -1;
                op_errno = ENOMEM;
                goto unlock;
            }
        }

        ctx->buf = *buf;
    }
unlock:
    UNLOCK(&ctx->lock);

out:
    QUOTA_STACK_UNWIND(link, frame, op_ret, op_errno, inode, buf, preparent,
                       postparent, xdata);

    return 0;
}

int32_t
quota_link_helper(call_frame_t *frame, xlator_t *this, loc_t *oldloc,
                  loc_t *newloc, dict_t *xdata)
{
    quota_local_t *local = NULL;
    int32_t op_errno = EINVAL;

    local = frame->local;

    GF_VALIDATE_OR_GOTO("quota", local, unwind);

    op_errno = local->op_errno;

    if (local->op_ret == -1) {
        goto unwind;
    }

    STACK_WIND(frame, quota_link_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->link, oldloc, newloc, xdata);
    return 0;

unwind:
    QUOTA_STACK_UNWIND(link, frame, -1, op_errno, NULL, NULL, NULL, NULL, NULL);
    return 0;
}

void
quota_link_continue(call_frame_t *frame)
{
    int32_t ret = -1;
    int32_t op_errno = EIO;
    quota_local_t *local = NULL;
    uuid_t common_ancestor = {0};
    xlator_t *this = NULL;
    quota_inode_ctx_t *ctx = NULL;
    inode_t *src_parent = NULL;
    inode_t *dst_parent = NULL;

    local = frame->local;
    this = THIS;

    if (local->op_ret < 0) {
        op_errno = local->op_errno;
        goto err;
    }

    if (local->xdata && dict_get(local->xdata, GLUSTERFS_INTERNAL_FOP_KEY)) {
        /* Treat link as rename, crawl upwards only till common ancestor
         */
        ret = quota_find_common_ancestor(
            local->oldloc.inode, local->newloc.parent, &common_ancestor);
        if (ret < 0 || gf_uuid_is_null(common_ancestor)) {
            gf_msg(this->name, GF_LOG_ERROR, ESTALE,
                   Q_MSG_ANCESTRY_BUILD_FAILED,
                   "failed to get "
                   "common_ancestor for %s and %s",
                   local->oldloc.path, local->newloc.path);
            op_errno = ESTALE;
            goto err;
        }
    } else {
        /* Treat link as a new file.
         * TODO: Currently marker accounts twice for the links created
         * across directories.
         * This needs re-visit if marker accounts only once
         * for the links created across directories
         */
        if (local->oldloc.parent)
            src_parent = inode_ref(local->oldloc.parent);
        else
            src_parent = inode_parent(local->oldloc.inode, 0, NULL);
        dst_parent = local->newloc.parent;

        /* No need to check quota limit if src and dst parents are same
         */
        if (src_parent == dst_parent ||
            gf_uuid_compare(src_parent->gfid, dst_parent->gfid) == 0) {
            inode_unref(src_parent);
            goto wind;
        }

        inode_unref(src_parent);
    }

    quota_inode_ctx_get(local->oldloc.inode, this, &ctx, 0);
    if (ctx == NULL) {
        gf_msg_debug(this->name, 0,
                     "quota context is NULL on inode"
                     " (%s). If quota is not enabled recently and "
                     "crawler has finished crawling, its an error",
                     uuid_utoa(local->oldloc.inode->gfid));
    }

    LOCK(&local->lock);
    {
        local->link_count = 1;
        local->delta = (ctx != NULL) ? ctx->buf.ia_blocks * 512 : 0;
        local->object_delta = 1;
        gf_uuid_copy(local->common_ancestor, common_ancestor);
    }
    UNLOCK(&local->lock);

    quota_check_limit(frame, local->newloc.parent, this);
    return;

err:
    QUOTA_STACK_UNWIND(link, frame, -1, op_errno, NULL, NULL, NULL, NULL, NULL);
    return;

wind:
    STACK_WIND(frame, quota_link_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->link, &(local->oldloc),
               &(local->newloc), local->xdata);
    return;
}

int32_t
quota_link(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,
           dict_t *xdata)
{
    quota_priv_t *priv = NULL;
    int32_t ret = -1;
    int32_t op_errno = ENOMEM;
    quota_local_t *local = NULL;
    call_stub_t *stub = NULL;

    priv = this->private;

    WIND_IF_QUOTAOFF(priv->is_quota_on, off);

    local = quota_local_new();
    if (local == NULL) {
        goto err;
    }

    frame->local = (void *)local;

    if (xdata)
        local->xdata = dict_ref(xdata);

    ret = loc_copy(&local->loc, newloc);
    if (ret == -1) {
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
               "loc_copy failed");
        goto err;
    }

    ret = loc_copy(&local->oldloc, oldloc);
    if (ret < 0) {
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
               "loc_copy failed");
        goto err;
    }

    ret = loc_copy(&local->newloc, newloc);
    if (ret < 0) {
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
               "loc_copy failed");
        goto err;
    }

    /* No need to check quota limit if src and dst parents are same */
    if (oldloc->parent && newloc->parent &&
        !gf_uuid_compare(oldloc->parent->gfid, newloc->parent->gfid)) {
        gf_msg_debug(this->name, GF_LOG_DEBUG,
                     "link %s -> %s are "
                     "in the same directory, so skip check limit",
                     oldloc->path, newloc->path);
        goto wind;
    }

    stub = fop_link_stub(frame, quota_link_helper, oldloc, newloc, xdata);
    if (stub == NULL) {
        goto err;
    }

    LOCK(&local->lock);
    {
        local->link_count = 2;
        local->fop_continue_cbk = quota_link_continue;
        local->stub = stub;
    }
    UNLOCK(&local->lock);

    check_ancestory(frame, newloc->parent);

    /* source parent can be NULL, so do check_ancestry on a file */
    if (oldloc->parent)
        check_ancestory(frame, oldloc->parent);
    else
        check_ancestory(frame, oldloc->inode);

    return 0;

err:
    QUOTA_STACK_UNWIND(link, frame, -1, op_errno, NULL, NULL, NULL, NULL, NULL);
    return 0;

off:
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->link,
                    oldloc, newloc, xdata);
    return 0;

wind:
    STACK_WIND(frame, quota_link_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->link, oldloc, newloc, xdata);
    return 0;
}

int32_t
quota_rename_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                 int32_t op_ret, int32_t op_errno, struct iatt *buf,
                 struct iatt *preoldparent, struct iatt *postoldparent,
                 struct iatt *prenewparent, struct iatt *postnewparent,
                 dict_t *xdata)
{
    int32_t ret = -1;
    quota_local_t *local = NULL;
    quota_inode_ctx_t *ctx = NULL;
    quota_dentry_t *old_dentry = NULL, *dentry = NULL;
    char new_dentry_found = 0;

    if (op_ret < 0) {
        goto out;
    }

    local = frame->local;

    GF_VALIDATE_OR_GOTO("quota", local, out);

    if (!QUOTA_REG_OR_LNK_FILE(local->oldloc.inode->ia_type))
        goto out;

    ret = quota_inode_ctx_get(local->oldloc.inode, this, &ctx, 0);
    if ((ret == -1) || (ctx == NULL)) {
        gf_msg_debug(this->name, 0,
                     "quota context is NULL on inode"
                     " (%s). If quota is not enabled recently and "
                     "crawler has finished crawling, its an error",
                     uuid_utoa(local->oldloc.inode->gfid));

        goto out;
    }

    LOCK(&ctx->lock);
    {
        list_for_each_entry(dentry, &ctx->parents, next)
        {
            if ((strcmp(dentry->name, local->oldloc.name) == 0) &&
                (gf_uuid_compare(local->oldloc.parent->gfid, dentry->par) ==
                 0)) {
                old_dentry = dentry;
            } else if ((strcmp(dentry->name, local->newloc.name) == 0) &&
                       (gf_uuid_compare(local->newloc.parent->gfid,
                                        dentry->par) == 0)) {
                new_dentry_found = 1;
                gf_msg_debug(this->name, 0,
                             "new entry being "
                             "linked (name:%s) for inode (gfid:%s) "
                             "is in inode-dentry-list",
                             dentry->name,
                             uuid_utoa(local->oldloc.inode->gfid));
            }

            if (old_dentry && new_dentry_found)
                break;
        }

        if (old_dentry != NULL) {
            __quota_dentry_free(old_dentry);
        } else {
            gf_msg_debug(this->name, 0,
                         "dentry corresponding"
                         "the path just renamed (name:%s) is not"
                         " present",
                         local->oldloc.name);
        }

        if (!new_dentry_found) {
            dentry = __quota_dentry_new(ctx, (char *)local->newloc.name,
                                        local->newloc.parent->gfid);
            if (dentry == NULL) {
                gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
                       "cannot create a new dentry (name:%s) "
                       "for inode(gfid:%s)",
                       local->newloc.name,
                       uuid_utoa(local->newloc.inode->gfid));
                op_ret = -1;
                op_errno = ENOMEM;
                goto unlock;
            }
        }

        ctx->buf = *buf;
    }
unlock:
    UNLOCK(&ctx->lock);

out:
    QUOTA_STACK_UNWIND(rename, frame, op_ret, op_errno, buf, preoldparent,
                       postoldparent, prenewparent, postnewparent, xdata);

    return 0;
}

int32_t
quota_rename_helper(call_frame_t *frame, xlator_t *this, loc_t *oldloc,
                    loc_t *newloc, dict_t *xdata)
{
    quota_local_t *local = NULL;
    int32_t op_errno = EINVAL;

    local = frame->local;

    GF_VALIDATE_OR_GOTO("quota", local, unwind);

    op_errno = local->op_errno;

    if (local->op_ret == -1) {
        goto unwind;
    }

    STACK_WIND(frame, quota_rename_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->rename, oldloc, newloc, xdata);

    return 0;

unwind:
    QUOTA_STACK_UNWIND(rename, frame, -1, op_errno, NULL, NULL, NULL, NULL,
                       NULL, NULL);
    return 0;
}

static int32_t
quota_rename_get_size_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                          int32_t op_ret, int32_t op_errno, inode_t *inode,
                          struct iatt *buf, dict_t *xdata,
                          struct iatt *postparent)
{
    quota_local_t *local = NULL;
    int32_t ret = 0;
    int64_t *size = 0;

    GF_ASSERT(frame);
    GF_VALIDATE_OR_GOTO_WITH_ERROR("quota", this, out, op_errno, EINVAL);
    GF_VALIDATE_OR_GOTO_WITH_ERROR(this->name, xdata, out, op_errno, EINVAL);
    local = frame->local;
    GF_ASSERT(local);
    local->link_count = 1;

    if (op_ret < 0)
        goto out;

    ret = dict_get_bin(xdata, QUOTA_SIZE_KEY, (void **)&size);
    if (ret < 0) {
        gf_msg(this->name, GF_LOG_WARNING, EINVAL, Q_MSG_SIZE_KEY_MISSING,
               "size key not present in dict");
        op_errno = EINVAL;
        goto out;
    }
    local->delta = ntoh64(*size);
    local->object_delta = 1;
    quota_check_limit(frame, local->newloc.parent, this);
    return 0;

out:
    quota_handle_validate_error(frame, -1, op_errno);
    return 0;
}

void
quota_rename_continue(call_frame_t *frame)
{
    int32_t ret = -1;
    int32_t op_errno = EIO;
    quota_local_t *local = NULL;
    uuid_t common_ancestor = {0};
    xlator_t *this = NULL;
    quota_inode_ctx_t *ctx = NULL;

    local = frame->local;
    this = THIS;

    if (local->op_ret < 0) {
        op_errno = local->op_errno;
        goto err;
    }

    ret = quota_find_common_ancestor(local->oldloc.parent, local->newloc.parent,
                                     &common_ancestor);
    if (ret < 0 || gf_uuid_is_null(common_ancestor)) {
        gf_msg(this->name, GF_LOG_ERROR, ESTALE, Q_MSG_ANCESTRY_BUILD_FAILED,
               "failed to get "
               "common_ancestor for %s and %s",
               local->oldloc.path, local->newloc.path);
        op_errno = ESTALE;
        goto err;
    }

    LOCK(&local->lock);
    {
        local->link_count = 1;
        gf_uuid_copy(local->common_ancestor, common_ancestor);
    }
    UNLOCK(&local->lock);

    if (QUOTA_REG_OR_LNK_FILE(local->oldloc.inode->ia_type)) {
        ret = quota_inode_ctx_get(local->oldloc.inode, this, &ctx, 0);
        if (ctx == NULL) {
            gf_msg(this->name, GF_LOG_WARNING, 0, Q_MSG_INODE_CTX_GET_FAILED,
                   "quota context not set in inode (gfid:%s), "
                   "considering file size as zero while enforcing "
                   "quota on new ancestry",
                   uuid_utoa(local->oldloc.inode->gfid));

            local->delta = 0;
            local->object_delta = 1;
        } else {
            /* FIXME: We need to account for the size occupied by
             * this inode on the target directory. To avoid double
             * accounting, we need to modify enforcer to perform
             * quota_check_limit only up till the least common
             * ancestor directory inode*/

            /* FIXME: The following code assumes that regular files
             * and link files are present, in their entirety, in a
             * single brick. This *assumption is invalid in the
             * case of stripe.*/

            local->delta = ctx->buf.ia_blocks * 512;
            local->object_delta = 1;
        }

    } else if (IA_ISDIR(local->oldloc.inode->ia_type)) {
        ret = quota_validate(frame, local->oldloc.inode, this,
                             quota_rename_get_size_cbk);
        if (ret) {
            op_errno = -ret;
            goto err;
        }

        return;
    }

    quota_check_limit(frame, local->newloc.parent, this);
    return;

err:
    QUOTA_STACK_UNWIND(rename, frame, -1, op_errno, NULL, NULL, NULL, NULL,
                       NULL, NULL);
    return;
}

int32_t
quota_rename(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,
             dict_t *xdata)
{
    quota_priv_t *priv = NULL;
    int32_t ret = -1;
    int32_t op_errno = ENOMEM;
    quota_local_t *local = NULL;
    call_stub_t *stub = NULL;

    priv = this->private;

    WIND_IF_QUOTAOFF(priv->is_quota_on, off);

    local = quota_local_new();
    if (local == NULL) {
        goto err;
    }

    frame->local = local;

    ret = loc_copy(&local->oldloc, oldloc);
    if (ret < 0) {
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
               "loc_copy failed");
        goto err;
    }

    ret = loc_copy(&local->newloc, newloc);
    if (ret < 0) {
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
               "loc_copy failed");
        goto err;
    }

    /* No need to check quota limit if src and dst parents are same */
    if (oldloc->parent && newloc->parent &&
        !gf_uuid_compare(oldloc->parent->gfid, newloc->parent->gfid)) {
        gf_msg_debug(this->name, 0,
                     "rename %s -> %s are "
                     "in the same directory, so skip check limit",
                     oldloc->path, newloc->path);
        goto wind;
    }

    stub = fop_rename_stub(frame, quota_rename_helper, oldloc, newloc, xdata);
    if (stub == NULL) {
        goto err;
    }

    LOCK(&local->lock);
    {
        /* link_count here tell how many check_ancestry should be done
         * before continuing the FOP
         */
        local->link_count = 2;
        local->stub = stub;
        local->fop_continue_cbk = quota_rename_continue;
    }
    UNLOCK(&local->lock);

    check_ancestory(frame, newloc->parent);
    check_ancestory(frame, oldloc->parent);
    return 0;

err:
    QUOTA_STACK_UNWIND(rename, frame, -1, op_errno, NULL, NULL, NULL, NULL,
                       NULL, NULL);
    return 0;

off:
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->rename,
                    oldloc, newloc, xdata);
    return 0;

wind:
    STACK_WIND(frame, quota_rename_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->rename, oldloc, newloc, xdata);
    return 0;
}

int32_t
quota_symlink_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                  int32_t op_ret, int32_t op_errno, inode_t *inode,
                  struct iatt *buf, struct iatt *preparent,
                  struct iatt *postparent, dict_t *xdata)
{
    quota_local_t *local = NULL;
    quota_inode_ctx_t *ctx = NULL;
    quota_dentry_t *dentry = NULL;
    int32_t ret = -1;

    if (op_ret < 0) {
        goto out;
    }

    local = frame->local;

    ret = quota_inode_ctx_get(local->loc.inode, this, &ctx, 1);
    if ((ret == -1) || (ctx == NULL)) {
        gf_msg_debug(this->name, 0,
                     "quota context is NULL on inode"
                     " (%s). If quota is not enabled recently and "
                     "crawler has finished crawling, its an error",
                     uuid_utoa(local->loc.inode->gfid));

        goto out;
    }

    LOCK(&ctx->lock);
    {
        ctx->buf = *buf;

        dentry = __quota_dentry_new(ctx, (char *)local->loc.name,
                                    local->loc.parent->gfid);
        if (dentry == NULL) {
            gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
                   "cannot create "
                   "a new dentry (name:%s) for inode(gfid:%s)",
                   local->loc.name, uuid_utoa(local->loc.inode->gfid));
            op_ret = -1;
            op_errno = ENOMEM;
        }
    }
    UNLOCK(&ctx->lock);

out:
    QUOTA_STACK_UNWIND(symlink, frame, op_ret, op_errno, inode, buf, preparent,
                       postparent, xdata);

    return 0;
}

int
quota_symlink_helper(call_frame_t *frame, xlator_t *this, const char *linkpath,
                     loc_t *loc, mode_t umask, dict_t *xdata)
{
    quota_local_t *local = NULL;
    int32_t op_errno = EINVAL;

    local = frame->local;

    GF_VALIDATE_OR_GOTO("quota", local, unwind);

    if (local->op_ret == -1) {
        op_errno = local->op_errno;
        goto unwind;
    }

    STACK_WIND(frame, quota_symlink_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->symlink, linkpath, loc, umask, xdata);
    return 0;

unwind:
    QUOTA_STACK_UNWIND(symlink, frame, -1, op_errno, NULL, NULL, NULL, NULL,
                       NULL);
    return 0;
}

int
quota_symlink(call_frame_t *frame, xlator_t *this, const char *linkpath,
              loc_t *loc, mode_t umask, dict_t *xdata)
{
    quota_priv_t *priv = NULL;
    int32_t ret = -1;
    int32_t op_errno = ENOMEM;
    quota_local_t *local = NULL;
    call_stub_t *stub = NULL;

    priv = this->private;

    WIND_IF_QUOTAOFF(priv->is_quota_on, off);

    local = quota_local_new();
    if (local == NULL) {
        goto err;
    }

    frame->local = local;

    ret = loc_copy(&local->loc, loc);
    if (ret < 0) {
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
               "loc_copy failed");
        goto err;
    }

    stub = fop_symlink_stub(frame, quota_symlink_helper, linkpath, loc, umask,
                            xdata);
    if (stub == NULL) {
        goto err;
    }

    LOCK(&local->lock);
    {
        local->stub = stub;
        local->delta = strlen(linkpath);
        local->object_delta = 1;
        local->link_count = 1;
    }
    UNLOCK(&local->lock);

    quota_check_limit(frame, loc->parent, this);
    return 0;

err:
    QUOTA_STACK_UNWIND(symlink, frame, -1, op_errno, NULL, NULL, NULL, NULL,
                       NULL);

    return 0;

off:
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->symlink,
                    linkpath, loc, umask, xdata);
    return 0;
}

int32_t
quota_truncate_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                   int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
                   struct iatt *postbuf, dict_t *xdata)
{
    quota_local_t *local = NULL;
    quota_inode_ctx_t *ctx = NULL;

    if (op_ret < 0) {
        goto out;
    }

    local = frame->local;

    GF_VALIDATE_OR_GOTO("quota", local, out);

    quota_inode_ctx_get(local->loc.inode, this, &ctx, 0);
    if (ctx == NULL) {
        gf_msg_debug(this->name, 0,
                     "quota context is NULL on inode"
                     " (%s). If quota is not enabled recently and "
                     "crawler has finished crawling, its an error",
                     uuid_utoa(local->loc.inode->gfid));
        goto out;
    }

    LOCK(&ctx->lock);
    {
        ctx->buf = *postbuf;
    }
    UNLOCK(&ctx->lock);

out:
    QUOTA_STACK_UNWIND(truncate, frame, op_ret, op_errno, prebuf, postbuf,
                       xdata);
    return 0;
}

int32_t
quota_truncate(call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset,
               dict_t *xdata)
{
    quota_priv_t *priv = NULL;
    int32_t ret = -1;
    quota_local_t *local = NULL;

    priv = this->private;

    WIND_IF_QUOTAOFF(priv->is_quota_on, off);

    local = quota_local_new();
    if (local == NULL) {
        goto err;
    }

    frame->local = local;

    ret = loc_copy(&local->loc, loc);
    if (ret < 0) {
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
               "loc_copy failed");
        goto err;
    }

    STACK_WIND(frame, quota_truncate_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->truncate, loc, offset, xdata);

    return 0;

err:
    QUOTA_STACK_UNWIND(truncate, frame, -1, ENOMEM, NULL, NULL, NULL);

    return 0;
off:
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->truncate,
                    loc, offset, xdata);
    return 0;
}

int32_t
quota_ftruncate_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                    int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
                    struct iatt *postbuf, dict_t *xdata)
{
    quota_local_t *local = NULL;
    quota_inode_ctx_t *ctx = NULL;

    if (op_ret < 0) {
        goto out;
    }

    local = frame->local;

    GF_VALIDATE_OR_GOTO("quota", local, out);

    quota_inode_ctx_get(local->loc.inode, this, &ctx, 0);
    if (ctx == NULL) {
        gf_msg_debug(this->name, 0,
                     "quota context is NULL on inode"
                     " (%s). If quota is not enabled recently and "
                     "crawler has finished crawling, its an error",
                     uuid_utoa(local->loc.inode->gfid));
        goto out;
    }

    LOCK(&ctx->lock);
    {
        ctx->buf = *postbuf;
    }
    UNLOCK(&ctx->lock);

out:
    QUOTA_STACK_UNWIND(ftruncate, frame, op_ret, op_errno, prebuf, postbuf,
                       xdata);
    return 0;
}

int32_t
quota_ftruncate(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset,
                dict_t *xdata)
{
    quota_priv_t *priv = NULL;
    quota_local_t *local = NULL;

    priv = this->private;

    WIND_IF_QUOTAOFF(priv->is_quota_on, off);

    local = quota_local_new();
    if (local == NULL)
        goto err;

    frame->local = local;

    local->loc.inode = inode_ref(fd->inode);

    STACK_WIND(frame, quota_ftruncate_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->ftruncate, fd, offset, xdata);

    return 0;
err:
    QUOTA_STACK_UNWIND(ftruncate, frame, -1, ENOMEM, NULL, NULL, NULL);

    return 0;

off:
    STACK_WIND_TAIL(frame, FIRST_CHILD(this),
                    FIRST_CHILD(this)->fops->ftruncate, fd, offset, xdata);
    return 0;
}

static int32_t
quota_send_dir_limit_to_cli(call_frame_t *frame, xlator_t *this, inode_t *inode,
                            const char *name, const int namelen)
{
    int32_t ret = 0;
    int dir_limit_len = 0;
    char dir_limit[64] = {
        0,
    };
    dict_t *dict = NULL;
    quota_inode_ctx_t *ctx = NULL;
    uint64_t value = 0;
    quota_priv_t *priv = NULL;

    priv = this->private;
    if (!priv->is_quota_on) {
        dir_limit_len = snprintf(dir_limit, sizeof(dir_limit),
                                 "Quota is disabled please turn on");
        goto dict_set;
    }

    ret = inode_ctx_get(inode, this, &value);
    if (ret < 0)
        goto out;

    ctx = (quota_inode_ctx_t *)(unsigned long)value;
    dir_limit_len = snprintf(dir_limit, sizeof(dir_limit),
                             "%" PRId64 ",%" PRId64, ctx->size, ctx->hard_lim);

dict_set:
    dict = dict_new();
    if (dict == NULL) {
        ret = -1;
        goto out;
    }

    ret = dict_set_nstrn(dict, (char *)name, namelen, dir_limit, dir_limit_len);
    if (ret < 0)
        goto out;

    gf_msg_debug(this->name, 0, "str = %s", dir_limit);

    QUOTA_STACK_UNWIND(getxattr, frame, 0, 0, dict, NULL);

    ret = 0;

out:
    if (dict)
        dict_unref(dict);
    return ret;
}

int32_t
quota_fgetxattr(call_frame_t *frame, xlator_t *this, fd_t *fd, const char *name,
                dict_t *xdata)
{
    int32_t ret = 0;

    if (name && strcasecmp(name, "trusted.limit.list") == 0) {
        ret = quota_send_dir_limit_to_cli(frame, this, fd->inode,
                                          "trusted.limit.list",
                                          SLEN("trusted.limit.list"));
        if (ret == 0) {
            return 0;
        }
    }

    STACK_WIND(frame, default_fgetxattr_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->fgetxattr, fd, name, xdata);
    return 0;
}

int32_t
quota_getxattr(call_frame_t *frame, xlator_t *this, loc_t *loc,
               const char *name, dict_t *xdata)
{
    int32_t ret = 0;

    if ((name != NULL) && strcasecmp(name, "trusted.limit.list") == 0) {
        ret = quota_send_dir_limit_to_cli(frame, this, loc->inode,
                                          "trusted.limit.list",
                                          SLEN("trusted.limit.list"));
        if (ret == 0)
            return 0;
    }

    STACK_WIND(frame, default_getxattr_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->getxattr, loc, name, xdata);
    return 0;
}

int32_t
quota_stat_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
               int32_t op_ret, int32_t op_errno, struct iatt *buf,
               dict_t *xdata)
{
    quota_local_t *local = NULL;
    quota_inode_ctx_t *ctx = NULL;

    if (op_ret < 0) {
        goto out;
    }

    local = frame->local;

    GF_VALIDATE_OR_GOTO("quota", local, out);

    quota_inode_ctx_get(local->loc.inode, this, &ctx, 0);
    if (ctx == NULL) {
        if (!IA_ISDIR(buf->ia_type)) {
            gf_msg_debug(this->name, 0,
                         "quota context is NULL on inode"
                         " (%s). If quota is not enabled recently and "
                         "crawler has finished crawling, its an error",
                         uuid_utoa(local->loc.inode->gfid));
        }

        goto out;
    }

    if (buf) {
        LOCK(&ctx->lock);
        ctx->buf = *buf;
        UNLOCK(&ctx->lock);
    }

out:
    QUOTA_STACK_UNWIND(stat, frame, op_ret, op_errno, buf, xdata);
    return 0;
}

int32_t
quota_stat(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)
{
    quota_priv_t *priv = NULL;
    quota_local_t *local = NULL;
    int32_t ret = -1;

    priv = this->private;

    WIND_IF_QUOTAOFF(priv->is_quota_on, off);

    local = quota_local_new();
    if (local == NULL) {
        goto unwind;
    }

    frame->local = local;
    ret = loc_copy(&local->loc, loc);
    if (ret < 0) {
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
               "loc_copy failed");
        goto unwind;
    }

    STACK_WIND(frame, quota_stat_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->stat, loc, xdata);
    return 0;

unwind:
    QUOTA_STACK_UNWIND(stat, frame, -1, ENOMEM, NULL, NULL);
    return 0;

off:
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->stat,
                    loc, xdata);
    return 0;
}

int32_t
quota_fstat_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                int32_t op_ret, int32_t op_errno, struct iatt *buf,
                dict_t *xdata)
{
    quota_local_t *local = NULL;
    quota_inode_ctx_t *ctx = NULL;

    if (op_ret < 0) {
        goto out;
    }

    local = frame->local;

    GF_VALIDATE_OR_GOTO("quota", local, out);

    quota_inode_ctx_get(local->loc.inode, this, &ctx, 0);
    if (ctx == NULL) {
        if (!IA_ISDIR(buf->ia_type)) {
            gf_msg_debug(this->name, 0,
                         "quota context is NULL on inode"
                         " (%s). If quota is not enabled recently and "
                         "crawler has finished crawling, its an error",
                         uuid_utoa(local->loc.inode->gfid));
        }

        goto out;
    }

    if (buf) {
        LOCK(&ctx->lock);
        ctx->buf = *buf;
        UNLOCK(&ctx->lock);
    }

out:
    QUOTA_STACK_UNWIND(fstat, frame, op_ret, op_errno, buf, xdata);
    return 0;
}

int32_t
quota_fstat(call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata)
{
    quota_priv_t *priv = NULL;
    quota_local_t *local = NULL;

    priv = this->private;

    WIND_IF_QUOTAOFF(priv->is_quota_on, off);

    local = quota_local_new();
    if (local == NULL) {
        goto unwind;
    }

    frame->local = local;

    local->loc.inode = inode_ref(fd->inode);

    STACK_WIND(frame, quota_fstat_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->fstat, fd, xdata);
    return 0;

unwind:
    QUOTA_STACK_UNWIND(fstat, frame, -1, ENOMEM, NULL, NULL);
    return 0;

off:
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fstat,
                    fd, xdata);
    return 0;
}

int32_t
quota_readlink_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                   int32_t op_ret, int32_t op_errno, const char *path,
                   struct iatt *buf, dict_t *xdata)
{
    quota_local_t *local = NULL;
    quota_inode_ctx_t *ctx = NULL;

    if (op_ret < 0) {
        goto out;
    }

    local = frame->local;

    GF_VALIDATE_OR_GOTO("quota", local, out);

    quota_inode_ctx_get(local->loc.inode, this, &ctx, 0);
    if (ctx == NULL) {
        gf_msg_debug(this->name, 0,
                     "quota context is NULL on inode"
                     " (%s). If quota is not enabled recently and "
                     "crawler has finished crawling, its an error",
                     uuid_utoa(local->loc.inode->gfid));
        goto out;
    }

    LOCK(&ctx->lock);
    {
        ctx->buf = *buf;
    }
    UNLOCK(&ctx->lock);

out:
    QUOTA_STACK_UNWIND(readlink, frame, op_ret, op_errno, path, buf, xdata);
    return 0;
}

int32_t
quota_readlink(call_frame_t *frame, xlator_t *this, loc_t *loc, size_t size,
               dict_t *xdata)
{
    quota_priv_t *priv = NULL;
    quota_local_t *local = NULL;
    int32_t ret = -1;

    priv = this->private;

    WIND_IF_QUOTAOFF(priv->is_quota_on, off);

    local = quota_local_new();
    if (local == NULL) {
        goto unwind;
    }

    frame->local = local;

    ret = loc_copy(&local->loc, loc);
    if (ret < 0) {
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
               "loc_copy failed");
        goto unwind;
    }

    STACK_WIND(frame, quota_readlink_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->readlink, loc, size, xdata);
    return 0;

unwind:
    QUOTA_STACK_UNWIND(readlink, frame, -1, ENOMEM, NULL, NULL, NULL);
    return 0;

off:
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->readlink,
                    loc, size, xdata);
    return 0;
}

int32_t
quota_readv_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                int32_t op_ret, int32_t op_errno, struct iovec *vector,
                int32_t count, struct iatt *buf, struct iobref *iobref,
                dict_t *xdata)
{
    quota_local_t *local = NULL;
    quota_inode_ctx_t *ctx = NULL;

    if (op_ret < 0) {
        goto out;
    }

    local = frame->local;

    GF_VALIDATE_OR_GOTO("quota", local, out);

    quota_inode_ctx_get(local->loc.inode, this, &ctx, 0);
    if (ctx == NULL) {
        gf_msg_debug(this->name, 0,
                     "quota context is NULL on inode"
                     " (%s). If quota is not enabled recently and "
                     "crawler has finished crawling, its an error",
                     uuid_utoa(local->loc.inode->gfid));
        goto out;
    }

    LOCK(&ctx->lock);
    {
        ctx->buf = *buf;
    }
    UNLOCK(&ctx->lock);

out:
    QUOTA_STACK_UNWIND(readv, frame, op_ret, op_errno, vector, count, buf,
                       iobref, xdata);
    return 0;
}

int32_t
quota_readv(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
            off_t offset, uint32_t flags, dict_t *xdata)
{
    quota_priv_t *priv = NULL;
    quota_local_t *local = NULL;

    priv = this->private;

    WIND_IF_QUOTAOFF(priv->is_quota_on, off);

    local = quota_local_new();
    if (local == NULL) {
        goto unwind;
    }

    frame->local = local;

    local->loc.inode = inode_ref(fd->inode);

    STACK_WIND(frame, quota_readv_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->readv, fd, size, offset, flags, xdata);
    return 0;

unwind:
    QUOTA_STACK_UNWIND(readv, frame, -1, ENOMEM, NULL, -1, NULL, NULL, NULL);
    return 0;

off:
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->readv,
                    fd, size, offset, flags, xdata);
    return 0;
}

int32_t
quota_fsync_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
                struct iatt *postbuf, dict_t *xdata)
{
    quota_local_t *local = NULL;
    quota_inode_ctx_t *ctx = NULL;

    if (op_ret < 0) {
        goto out;
    }

    local = frame->local;

    GF_VALIDATE_OR_GOTO("quota", local, out);

    quota_inode_ctx_get(local->loc.inode, this, &ctx, 0);
    if (ctx == NULL) {
        gf_msg_debug(this->name, 0,
                     "quota context is NULL on inode"
                     " (%s). If quota is not enabled recently and "
                     "crawler has finished crawling, its an error",
                     uuid_utoa(local->loc.inode->gfid));
        goto out;
    }

    LOCK(&ctx->lock);
    {
        ctx->buf = *postbuf;
    }
    UNLOCK(&ctx->lock);

out:
    QUOTA_STACK_UNWIND(fsync, frame, op_ret, op_errno, prebuf, postbuf, xdata);
    return 0;
}

int32_t
quota_fsync(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t flags,
            dict_t *xdata)
{
    quota_priv_t *priv = NULL;
    quota_local_t *local = NULL;

    priv = this->private;

    WIND_IF_QUOTAOFF(priv->is_quota_on, off);

    local = quota_local_new();
    if (local == NULL) {
        goto unwind;
    }

    local->loc.inode = inode_ref(fd->inode);

    frame->local = local;

    STACK_WIND(frame, quota_fsync_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->fsync, fd, flags, xdata);
    return 0;

unwind:
    QUOTA_STACK_UNWIND(fsync, frame, -1, ENOMEM, NULL, NULL, NULL);
    return 0;

off:
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsync,
                    fd, flags, xdata);
    return 0;
}

int32_t
quota_setattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                  int32_t op_ret, int32_t op_errno, struct iatt *statpre,
                  struct iatt *statpost, dict_t *xdata)
{
    quota_local_t *local = NULL;
    quota_inode_ctx_t *ctx = NULL;

    if (op_ret < 0) {
        goto out;
    }

    local = frame->local;

    GF_VALIDATE_OR_GOTO("quota", local, out);

    quota_inode_ctx_get(local->loc.inode, this, &ctx, 0);
    if (ctx == NULL) {
        if (!IA_ISDIR(statpost->ia_type)) {
            gf_msg_debug(this->name, 0,
                         "quota context is NULL on inode"
                         " (%s). If quota is not enabled recently and "
                         "crawler has finished crawling, its an error",
                         uuid_utoa(local->loc.inode->gfid));
        }

        goto out;
    }

    if (statpost) {
        LOCK(&ctx->lock);
        ctx->buf = *statpost;
        UNLOCK(&ctx->lock);
    }

out:
    QUOTA_STACK_UNWIND(setattr, frame, op_ret, op_errno, statpre, statpost,
                       xdata);
    return 0;
}

int32_t
quota_setattr(call_frame_t *frame, xlator_t *this, loc_t *loc,
              struct iatt *stbuf, int32_t valid, dict_t *xdata)
{
    quota_priv_t *priv = NULL;
    quota_local_t *local = NULL;
    int32_t ret = -1;

    priv = this->private;

    WIND_IF_QUOTAOFF(priv->is_quota_on, off);

    local = quota_local_new();
    if (local == NULL) {
        goto unwind;
    }

    frame->local = local;

    ret = loc_copy(&local->loc, loc);
    if (ret < 0) {
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
               "loc_copy failed");
        goto unwind;
    }

    STACK_WIND(frame, quota_setattr_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->setattr, loc, stbuf, valid, xdata);
    return 0;

unwind:
    QUOTA_STACK_UNWIND(setattr, frame, -1, ENOMEM, NULL, NULL, NULL);
    return 0;

off:
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->setattr,
                    loc, stbuf, valid, xdata);
    return 0;
}

int32_t
quota_fsetattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                   int32_t op_ret, int32_t op_errno, struct iatt *statpre,
                   struct iatt *statpost, dict_t *xdata)
{
    quota_local_t *local = NULL;
    quota_inode_ctx_t *ctx = NULL;

    if (op_ret < 0) {
        goto out;
    }

    local = frame->local;

    GF_VALIDATE_OR_GOTO("quota", local, out);

    quota_inode_ctx_get(local->loc.inode, this, &ctx, 0);
    if (ctx == NULL) {
        if (!IA_ISDIR(statpost->ia_type)) {
            gf_msg_debug(this->name, 0,
                         "quota context is NULL on inode"
                         " (%s). If quota is not enabled recently and "
                         "crawler has finished crawling, its an error",
                         uuid_utoa(local->loc.inode->gfid));
        }

        goto out;
    }

    LOCK(&ctx->lock);
    {
        ctx->buf = *statpost;
    }
    UNLOCK(&ctx->lock);

out:
    QUOTA_STACK_UNWIND(fsetattr, frame, op_ret, op_errno, statpre, statpost,
                       xdata);
    return 0;
}

int32_t
quota_fsetattr(call_frame_t *frame, xlator_t *this, fd_t *fd,
               struct iatt *stbuf, int32_t valid, dict_t *xdata)
{
    quota_priv_t *priv = NULL;
    quota_local_t *local = NULL;

    priv = this->private;

    WIND_IF_QUOTAOFF(priv->is_quota_on, off);

    local = quota_local_new();
    if (local == NULL) {
        goto unwind;
    }

    frame->local = local;

    local->loc.inode = inode_ref(fd->inode);

    STACK_WIND(frame, quota_fsetattr_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->fsetattr, fd, stbuf, valid, xdata);
    return 0;

unwind:
    QUOTA_STACK_UNWIND(fsetattr, frame, -1, ENOMEM, NULL, NULL, NULL);
    return 0;

off:
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsetattr,
                    fd, stbuf, valid, xdata);
    return 0;
}

int32_t
quota_mknod_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                int32_t op_ret, int32_t op_errno, inode_t *inode,
                struct iatt *buf, struct iatt *preparent,
                struct iatt *postparent, dict_t *xdata)
{
    int32_t ret = -1;
    quota_local_t *local = NULL;
    quota_inode_ctx_t *ctx = NULL;
    quota_dentry_t *dentry = NULL;

    local = frame->local;
    if (op_ret < 0) {
        goto unwind;
    }

    ret = quota_inode_ctx_get(inode, this, &ctx, 1);
    if ((ret == -1) || (ctx == NULL)) {
        gf_msg(this->name, GF_LOG_WARNING, 0, Q_MSG_INODE_CTX_GET_FAILED,
               "cannot create quota context in "
               "inode(gfid:%s)",
               uuid_utoa(inode->gfid));
        op_ret = -1;
        op_errno = ENOMEM;
        goto unwind;
    }

    LOCK(&ctx->lock);
    {
        ctx->buf = *buf;

        dentry = __quota_dentry_new(ctx, (char *)local->loc.name,
                                    local->loc.parent->gfid);
        if (dentry == NULL) {
            gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
                   "cannot create a new dentry "
                   "(name:%s) for inode(gfid:%s)",
                   local->loc.name, uuid_utoa(local->loc.inode->gfid));
            op_ret = -1;
            op_errno = ENOMEM;
            goto unlock;
        }
    }
unlock:
    UNLOCK(&ctx->lock);

unwind:
    QUOTA_STACK_UNWIND(mknod, frame, op_ret, op_errno, inode, buf, preparent,
                       postparent, xdata);
    return 0;
}

int
quota_mknod_helper(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
                   dev_t rdev, mode_t umask, dict_t *xdata)
{
    quota_local_t *local = NULL;
    int32_t op_errno = EINVAL;

    local = frame->local;

    GF_VALIDATE_OR_GOTO("quota", local, unwind);

    if (local->op_ret == -1) {
        op_errno = local->op_errno;
        goto unwind;
    }

    STACK_WIND(frame, quota_mknod_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->mknod, loc, mode, rdev, umask, xdata);

    return 0;

unwind:
    QUOTA_STACK_UNWIND(mknod, frame, -1, op_errno, NULL, NULL, NULL, NULL,
                       NULL);
    return 0;
}

int
quota_mknod(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
            dev_t rdev, mode_t umask, dict_t *xdata)
{
    quota_priv_t *priv = NULL;
    int32_t ret = -1;
    quota_local_t *local = NULL;
    call_stub_t *stub = NULL;

    priv = this->private;

    WIND_IF_QUOTAOFF(priv->is_quota_on, off);
    QUOTA_WIND_FOR_INTERNAL_FOP(xdata, off);

    local = quota_local_new();
    if (local == NULL) {
        goto err;
    }

    frame->local = local;

    ret = loc_copy(&local->loc, loc);
    if (ret) {
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
               "loc_copy failed");
        goto err;
    }

    stub = fop_mknod_stub(frame, quota_mknod_helper, loc, mode, rdev, umask,
                          xdata);
    if (stub == NULL) {
        goto err;
    }

    LOCK(&local->lock);
    {
        local->link_count = 1;
        local->stub = stub;
        local->delta = 0;
        local->object_delta = 1;
    }
    UNLOCK(&local->lock);

    quota_check_limit(frame, loc->parent, this);
    return 0;

err:
    QUOTA_STACK_UNWIND(mknod, frame, -1, ENOMEM, NULL, NULL, NULL, NULL, NULL);
    return 0;

off:
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->mknod,
                    loc, mode, rdev, umask, xdata);
    return 0;
}

int
quota_setxattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                   int op_ret, int op_errno, dict_t *xdata)
{
    quota_local_t *local = NULL;
    quota_inode_ctx_t *ctx = NULL;
    int ret = 0;

    if (op_ret < 0) {
        goto out;
    }

    local = frame->local;
    if (!local)
        goto out;

    ret = quota_inode_ctx_get(local->loc.inode, this, &ctx, 1);
    if ((ret < 0) || (ctx == NULL)) {
        op_errno = -1;
        goto out;
    }

    LOCK(&ctx->lock);
    {
        ctx->hard_lim = local->limit.hl;
        ctx->soft_lim = local->limit.sl;
        ctx->object_hard_lim = local->object_limit.hl;
        ctx->object_soft_lim = local->object_limit.sl;
    }
    UNLOCK(&ctx->lock);

out:
    QUOTA_STACK_UNWIND(setxattr, frame, op_ret, op_errno, xdata);
    return 0;
}

int
quota_setxattr(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *dict,
               int flags, dict_t *xdata)
{
    quota_priv_t *priv = NULL;
    int op_errno = EINVAL;
    int op_ret = -1;
    int64_t hard_lim = -1;
    int64_t soft_lim = -1;
    int64_t object_hard_limit = -1;
    int64_t object_soft_limit = -1;
    quota_local_t *local = NULL;
    gf_boolean_t internal_fop = _gf_false;

    priv = this->private;

    WIND_IF_QUOTAOFF(priv->is_quota_on, off);

    VALIDATE_OR_GOTO(frame, err);
    VALIDATE_OR_GOTO(this, err);
    VALIDATE_OR_GOTO(loc, err);

    if (xdata && dict_get_sizen(xdata, GLUSTERFS_INTERNAL_FOP_KEY))
        internal_fop = _gf_true;

    if (frame->root->pid >= 0 && internal_fop == _gf_false) {
        GF_IF_INTERNAL_XATTR_GOTO("trusted.glusterfs.quota*", dict, op_errno,
                                  err);
        GF_IF_INTERNAL_XATTR_GOTO("trusted.pgfid*", dict, op_errno, err);
    }

    quota_get_limits(this, dict, &hard_lim, &soft_lim, &object_hard_limit,
                     &object_soft_limit);

    if (hard_lim > 0 || object_hard_limit > 0) {
        local = quota_local_new();
        if (local == NULL) {
            op_errno = ENOMEM;
            goto err;
        }
        frame->local = local;
        loc_copy(&local->loc, loc);
    }

    if (hard_lim > 0) {
        local->limit.hl = hard_lim;
        local->limit.sl = soft_lim;
    }

    if (object_hard_limit > 0) {
        local->object_limit.hl = object_hard_limit;
        local->object_limit.sl = object_soft_limit;
    }

    STACK_WIND(frame, quota_setxattr_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->setxattr, loc, dict, flags, xdata);
    return 0;
err:
    QUOTA_STACK_UNWIND(setxattr, frame, op_ret, op_errno, NULL);
    return 0;

off:
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->setxattr,
                    loc, dict, flags, xdata);
    return 0;
}

int
quota_fsetxattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                    int op_ret, int op_errno, dict_t *xdata)
{
    quota_inode_ctx_t *ctx = NULL;
    quota_local_t *local = NULL;

    if (op_ret < 0)
        goto out;

    local = frame->local;
    if (!local)
        goto out;

    op_ret = quota_inode_ctx_get(local->loc.inode, this, &ctx, 1);
    if ((op_ret < 0) || (ctx == NULL)) {
        op_errno = ENOMEM;
        goto out;
    }

    LOCK(&ctx->lock);
    {
        ctx->hard_lim = local->limit.hl;
        ctx->soft_lim = local->limit.sl;
        ctx->object_hard_lim = local->object_limit.hl;
        ctx->object_soft_lim = local->object_limit.sl;
    }
    UNLOCK(&ctx->lock);

out:
    QUOTA_STACK_UNWIND(fsetxattr, frame, op_ret, op_errno, xdata);
    return 0;
}

int
quota_fsetxattr(call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *dict,
                int flags, dict_t *xdata)
{
    quota_priv_t *priv = NULL;
    int32_t op_ret = -1;
    int32_t op_errno = EINVAL;
    quota_local_t *local = NULL;
    int64_t hard_lim = -1;
    int64_t soft_lim = -1;
    int64_t object_hard_limit = -1;
    int64_t object_soft_limit = -1;

    priv = this->private;

    WIND_IF_QUOTAOFF(priv->is_quota_on, off);

    VALIDATE_OR_GOTO(frame, err);
    VALIDATE_OR_GOTO(this, err);
    VALIDATE_OR_GOTO(fd, err);

    if (0 <= frame->root->pid) {
        GF_IF_INTERNAL_XATTR_GOTO("trusted.glusterfs.quota*", dict, op_errno,
                                  err);
        GF_IF_INTERNAL_XATTR_GOTO("trusted.pgfid*", dict, op_errno, err);
    }

    quota_get_limits(this, dict, &hard_lim, &soft_lim, &object_hard_limit,
                     &object_soft_limit);

    if (hard_lim > 0 || object_hard_limit > 0) {
        local = quota_local_new();
        if (local == NULL) {
            op_errno = ENOMEM;
            goto err;
        }
        frame->local = local;
        local->loc.inode = inode_ref(fd->inode);
    }

    if (hard_lim > 0) {
        local->limit.hl = hard_lim;
        local->limit.sl = soft_lim;
    }

    if (object_hard_limit > 0) {
        local->object_limit.hl = object_hard_limit;
        local->object_limit.sl = object_soft_limit;
    }

    STACK_WIND(frame, quota_fsetxattr_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->fsetxattr, fd, dict, flags, xdata);
    return 0;
err:
    QUOTA_STACK_UNWIND(fsetxattr, frame, op_ret, op_errno, NULL);
    return 0;

off:
    STACK_WIND_TAIL(frame, FIRST_CHILD(this),
                    FIRST_CHILD(this)->fops->fsetxattr, fd, dict, flags, xdata);
    return 0;
}

int
quota_removexattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                      int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
    QUOTA_STACK_UNWIND(removexattr, frame, op_ret, op_errno, xdata);
    return 0;
}

int
quota_removexattr(call_frame_t *frame, xlator_t *this, loc_t *loc,
                  const char *name, dict_t *xdata)
{
    quota_priv_t *priv = NULL;
    int32_t op_errno = EINVAL;

    priv = this->private;

    WIND_IF_QUOTAOFF(priv->is_quota_on, off);

    VALIDATE_OR_GOTO(this, err);

    /* all quota xattrs can be cleaned up by doing setxattr on special key.
     * Hence its ok that we don't allow removexattr on quota keys here.
     */
    if (frame->root->pid >= 0) {
        GF_IF_NATIVE_XATTR_GOTO("trusted.glusterfs.quota*", name, op_errno,
                                err);
        GF_IF_NATIVE_XATTR_GOTO("trusted.pgfid*", name, op_errno, err);
    }

    VALIDATE_OR_GOTO(frame, err);
    VALIDATE_OR_GOTO(loc, err);

    STACK_WIND(frame, quota_removexattr_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->removexattr, loc, name, xdata);
    return 0;

err:
    QUOTA_STACK_UNWIND(removexattr, frame, -1, op_errno, NULL);
    return 0;

off:
    STACK_WIND_TAIL(frame, FIRST_CHILD(this),
                    FIRST_CHILD(this)->fops->removexattr, loc, name, xdata);
    return 0;
}

int
quota_fremovexattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                       int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
    QUOTA_STACK_UNWIND(fremovexattr, frame, op_ret, op_errno, xdata);
    return 0;
}

int
quota_fremovexattr(call_frame_t *frame, xlator_t *this, fd_t *fd,
                   const char *name, dict_t *xdata)
{
    quota_priv_t *priv = NULL;
    int32_t op_ret = -1;
    int32_t op_errno = EINVAL;

    priv = this->private;

    WIND_IF_QUOTAOFF(priv->is_quota_on, off);

    VALIDATE_OR_GOTO(frame, err);
    VALIDATE_OR_GOTO(this, err);
    VALIDATE_OR_GOTO(fd, err);

    if (frame->root->pid >= 0) {
        GF_IF_NATIVE_XATTR_GOTO("trusted.glusterfs.quota*", name, op_errno,
                                err);
        GF_IF_NATIVE_XATTR_GOTO("trusted.pgfid*", name, op_errno, err);
    }
    STACK_WIND(frame, quota_fremovexattr_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->fremovexattr, fd, name, xdata);
    return 0;
err:
    QUOTA_STACK_UNWIND(fremovexattr, frame, op_ret, op_errno, NULL);
    return 0;

off:
    STACK_WIND_TAIL(frame, FIRST_CHILD(this),
                    FIRST_CHILD(this)->fops->fremovexattr, fd, name, xdata);
    return 0;
}

int32_t
quota_statfs_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                 int32_t op_ret, int32_t op_errno, struct statvfs *buf,
                 dict_t *xdata)
{
    inode_t *inode = NULL;
    uint64_t value = 0;
    int64_t usage = -1;
    int64_t avail = -1;
    int64_t blocks = 0;
    quota_inode_ctx_t *ctx = NULL;
    int ret = 0;

    inode = cookie;

    /* This fop will fail mostly in case of client disconnect,
     * which is already logged. Hence, not logging here */
    if (op_ret == -1)
        goto unwind;
    /*
     * We should never get here unless quota_statfs (below) sent us a
     * cookie, and it would only do so if the value was non-NULL.  This
     * check is therefore just routine defensive coding.
     */

    GF_VALIDATE_OR_GOTO("quota", inode, unwind);

    inode_ctx_get(inode, this, &value);
    ctx = (quota_inode_ctx_t *)(unsigned long)value;
    if (!ctx || ctx->hard_lim <= 0)
        goto unwind;

    { /* statfs is adjusted in this code block */
        usage = (ctx->size) / buf->f_bsize;

        blocks = ctx->hard_lim / buf->f_bsize;
        buf->f_blocks = blocks;

        avail = buf->f_blocks - usage;
        avail = max(avail, 0);

        buf->f_bfree = avail;
        /*
         * We have to assume that the total assigned quota
         * won't cause us to dip into the reserved space,
         * because dealing with the overcommitted cases is
         * just too hairy (especially when different bricks
         * might be using different reserved percentages and
         * such).
         */
        buf->f_bavail = buf->f_bfree;
    }

    xdata = xdata ? dict_ref(xdata) : dict_new();
    if (!xdata)
        goto unwind;

    ret = dict_set_int8(xdata, "quota-deem-statfs", 1);
    if (-1 == ret)
        gf_msg(this->name, GF_LOG_ERROR, ENOMEM, Q_MSG_ENOMEM,
               "Dict set failed, deem-statfs option may "
               "have no effect");

unwind:
    QUOTA_STACK_UNWIND(statfs, frame, op_ret, op_errno, buf, xdata);

    if (xdata)
        dict_unref(xdata);

    return 0;
}

int32_t
quota_statfs_helper(call_frame_t *frame, xlator_t *this, loc_t *loc,
                    dict_t *xdata)
{
    quota_local_t *local = frame->local;
    int op_errno = EINVAL;

    GF_VALIDATE_OR_GOTO("quota", local, err);

    if (-1 == local->op_ret) {
        op_errno = local->op_errno;
        goto err;
    }

    STACK_WIND_COOKIE(frame, quota_statfs_cbk, local->inode, FIRST_CHILD(this),
                      FIRST_CHILD(this)->fops->statfs, loc, xdata);
    return 0;
err:
    QUOTA_STACK_UNWIND(statfs, frame, -1, op_errno, NULL, NULL);

    return 0;
}

int32_t
quota_statfs_validate_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                          int32_t op_ret, int32_t op_errno, inode_t *inode,
                          struct iatt *buf, dict_t *xdata,
                          struct iatt *postparent)
{
    quota_local_t *local = NULL;
    int32_t ret = 0;
    quota_inode_ctx_t *ctx = NULL;
    uint64_t value = 0;
    quota_meta_t size = {
        0,
    };
    struct timeval tv = {
        0,
    };

    local = frame->local;

    if (op_ret < 0)
        goto resume;

    GF_ASSERT(local);
    GF_ASSERT(frame);
    GF_VALIDATE_OR_GOTO_WITH_ERROR("quota", this, resume, op_errno, EINVAL);
    GF_VALIDATE_OR_GOTO_WITH_ERROR(this->name, xdata, resume, op_errno, EINVAL);

    ret = inode_ctx_get(local->validate_loc.inode, this, &value);

    ctx = (quota_inode_ctx_t *)(unsigned long)value;
    if ((ret == -1) || (ctx == NULL)) {
        gf_msg(this->name, GF_LOG_WARNING, EINVAL, Q_MSG_INODE_CTX_GET_FAILED,
               "quota context is not present in inode (gfid:%s)",
               uuid_utoa(local->validate_loc.inode->gfid));
        op_errno = EINVAL;
        goto resume;
    }

    ret = quota_dict_get_meta(xdata, QUOTA_SIZE_KEY, SLEN(QUOTA_SIZE_KEY),
                              &size);
    if (ret == -1) {
        gf_msg(this->name, GF_LOG_WARNING, EINVAL, Q_MSG_SIZE_KEY_MISSING,
               "size key not present in "
               "dict");
        op_errno = EINVAL;
    }

    gettimeofday(&tv, NULL);
    LOCK(&ctx->lock);
    {
        ctx->size = size.size;
        ctx->file_count = size.file_count;
        ctx->dir_count = size.dir_count;
        memcpy(&ctx->tv, &tv, sizeof(struct timeval));
    }
    UNLOCK(&ctx->lock);

resume:
    local->op_errno = op_errno;
    quota_link_count_decrement(frame);
    return 0;
}

void
quota_get_limit_dir_continuation(struct list_head *parents, inode_t *inode,
                                 int32_t op_ret, int32_t op_errno, void *data)
{
    call_frame_t *frame = NULL;
    xlator_t *this = NULL;
    quota_dentry_t *entry = NULL;
    inode_t *parent = NULL;

    frame = data;
    this = THIS;

    if ((op_ret < 0) || list_empty(parents)) {
        if (op_ret >= 0) {
            gf_msg(this->name, GF_LOG_WARNING, EIO, Q_MSG_ANCESTRY_BUILD_FAILED,
                   "Couldn't build ancestry for inode (gfid:%s). "
                   "Without knowing ancestors till root, quota "
                   "cannot be enforced. "
                   "Hence, failing fop with EIO",
                   uuid_utoa(inode->gfid));
            op_errno = EIO;
        }

        quota_handle_validate_error(frame, -1, op_errno);
        goto out;
    }

    entry = list_entry(parents, quota_dentry_t, next);
    parent = inode_find(inode->table, entry->par);

    quota_get_limit_dir(frame, parent, this);

    inode_unref(parent);
out:
    return;
}

void
quota_statfs_continue(call_frame_t *frame, xlator_t *this, inode_t *inode)
{
    quota_local_t *local = frame->local;
    int ret = -1;

    LOCK(&local->lock);
    {
        local->inode = inode_ref(inode);
    }
    UNLOCK(&local->lock);

    ret = quota_validate(frame, local->inode, this, quota_statfs_validate_cbk);
    if (0 > ret)
        quota_handle_validate_error(frame, -1, -ret);
}

void
quota_get_limit_dir(call_frame_t *frame, inode_t *cur_inode, xlator_t *this)
{
    inode_t *inode = NULL;
    inode_t *parent = NULL;
    uint64_t value = 0;
    quota_inode_ctx_t *ctx = NULL;
    quota_local_t *local = frame->local;

    if (!cur_inode)
        goto out;

    inode = inode_ref(cur_inode);
    while (inode) {
        value = 0;
        inode_ctx_get(inode, this, &value);

        if (value) {
            ctx = (quota_inode_ctx_t *)(unsigned long)value;
            if (ctx->hard_lim > 0)
                break;
        }

        if (__is_root_gfid(inode->gfid))
            goto off;

        parent = inode_parent(inode, 0, NULL);
        if (!parent) {
            (void)quota_build_ancestry(inode, quota_get_limit_dir_continuation,
                                       frame);
            goto out;
        }

        inode_unref(inode);
        inode = parent;
    }

    quota_statfs_continue(frame, this, inode);
    inode_unref(inode);
    return;

off:
    gf_msg_debug(this->name, 0, "No limit set on the inode or it's parents.");

    QUOTA_STACK_WIND_TAIL(frame, FIRST_CHILD(this),
                          FIRST_CHILD(this)->fops->statfs, &local->loc,
                          local->xdata);
out:
    inode_unref(inode);

    return;
}

int32_t
quota_statfs(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)
{
    int op_errno = 0;
    int ret = -1;
    int8_t ignore_deem_statfs = 0;
    quota_priv_t *priv = NULL;
    quota_local_t *local = NULL;
    call_stub_t *stub = NULL;

    priv = this->private;
    GF_ASSERT(loc);

    WIND_IF_QUOTAOFF(priv->is_quota_on, off);

    ret = dict_get_int8(xdata, GF_INTERNAL_IGNORE_DEEM_STATFS,
                        &ignore_deem_statfs);
    ret = 0;

    if (ignore_deem_statfs)
        goto off;

    if (priv->consider_statfs && loc->inode) {
        local = quota_local_new();
        if (!local) {
            op_errno = ENOMEM;
            goto err;
        }
        frame->local = local;

        ret = loc_copy(&local->loc, loc);
        if (-1 == ret) {
            op_errno = ENOMEM;
            goto err;
        }

        if (xdata)
            local->xdata = dict_ref(xdata);

        stub = fop_statfs_stub(frame, quota_statfs_helper, &local->loc,
                               local->xdata);
        if (!stub) {
            op_errno = ENOMEM;
            goto err;
        }

        LOCK(&local->lock);
        {
            local->link_count = 1;
            local->stub = stub;
        }
        UNLOCK(&local->lock);

        quota_get_limit_dir(frame, loc->inode, this);

        return 0;
    }

    /*
     * We have to make sure that we never get to quota_statfs_cbk
     * with a cookie that points to something other than an inode,
     * which is exactly what would happen with STACK_UNWIND using
     * that as a callback.  Therefore, use default_statfs_cbk in
     * this case instead.
     *
     * Also if the option deem-statfs is not set to "on" don't
     * bother calculating quota limit on / in statfs_cbk.
     */
    if (priv->consider_statfs)
        gf_log(this->name, GF_LOG_ERROR,
               "Missing inode, can't adjust for quota");

off:
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->statfs,
                    loc, xdata);
    return 0;

err:
    QUOTA_STACK_UNWIND(statfs, frame, -1, op_errno, NULL, NULL);

    return 0;
}

int
quota_readdirp_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                   int op_ret, int op_errno, gf_dirent_t *entries,
                   dict_t *xdata)
{
    gf_dirent_t *entry = NULL;
    quota_local_t *local = NULL;
    loc_t loc = {
        0,
    };

    if (op_ret <= 0)
        goto unwind;

    local = frame->local;

    list_for_each_entry(entry, &entries->list, list)
    {
        if ((strcmp(entry->d_name, ".") == 0) ||
            (strcmp(entry->d_name, "..") == 0) || entry->inode == NULL)
            continue;

        gf_uuid_copy(loc.gfid, entry->d_stat.ia_gfid);
        loc.inode = inode_ref(entry->inode);
        loc.parent = inode_ref(local->loc.inode);
        gf_uuid_copy(loc.pargfid, loc.parent->gfid);
        loc.name = entry->d_name;

        quota_fill_inodectx(this, entry->inode, entry->dict, &loc,
                            &entry->d_stat, &op_errno);

        loc_wipe(&loc);
    }

unwind:
    QUOTA_STACK_UNWIND(readdirp, frame, op_ret, op_errno, entries, xdata);

    return 0;
}

int
quota_readdirp(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
               off_t offset, dict_t *dict)
{
    quota_priv_t *priv = NULL;
    int ret = 0;
    gf_boolean_t new_dict = _gf_false;
    quota_local_t *local = NULL;

    priv = this->private;

    WIND_IF_QUOTAOFF(priv->is_quota_on, off);

    local = quota_local_new();

    if (local == NULL) {
        goto err;
    }

    frame->local = local;

    local->loc.inode = inode_ref(fd->inode);

    if (dict == NULL) {
        dict = dict_new();
        new_dict = _gf_true;
    }

    if (dict) {
        ret = dict_set_int8(dict, QUOTA_LIMIT_KEY, 1);
        if (ret < 0) {
            gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
                   "dict set of key for hard-limit");
            goto err;
        }
    }

    if (dict) {
        ret = dict_set_int8(dict, QUOTA_LIMIT_OBJECTS_KEY, 1);
        if (ret < 0) {
            gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
                   "dict set of key for hard-limit "
                   "failed");
            goto err;
        }
    }

    STACK_WIND(frame, quota_readdirp_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->readdirp, fd, size, offset, dict);

    if (new_dict) {
        dict_unref(dict);
    }

    return 0;
err:
    STACK_UNWIND_STRICT(readdirp, frame, -1, EINVAL, NULL, NULL);

    if (new_dict) {
        dict_unref(dict);
    }

    return 0;

off:
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->readdirp,
                    fd, size, offset, dict);
    return 0;
}

int32_t
quota_fallocate_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                    int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
                    struct iatt *postbuf, dict_t *xdata)
{
    int32_t ret = 0;
    uint64_t ctx_int = 0;
    quota_inode_ctx_t *ctx = NULL;
    quota_local_t *local = NULL;

    local = frame->local;

    if ((op_ret < 0) || (local == NULL)) {
        goto out;
    }

    ret = inode_ctx_get(local->loc.inode, this, &ctx_int);
    if (ret) {
        gf_msg(this->name, GF_LOG_WARNING, 0, Q_MSG_INODE_CTX_GET_FAILED,
               "%s: failed to get the context", local->loc.path);
        goto out;
    }

    ctx = (quota_inode_ctx_t *)(unsigned long)ctx_int;

    if (ctx == NULL) {
        gf_msg(this->name, GF_LOG_WARNING, 0, Q_MSG_INODE_CTX_GET_FAILED,
               "quota context not set in %s (gfid:%s)", local->loc.path,
               uuid_utoa(local->loc.inode->gfid));
        goto out;
    }

    LOCK(&ctx->lock);
    {
        ctx->buf = *postbuf;
    }
    UNLOCK(&ctx->lock);

out:
    QUOTA_STACK_UNWIND(fallocate, frame, op_ret, op_errno, prebuf, postbuf,
                       xdata);

    return 0;
}

int32_t
quota_fallocate_helper(call_frame_t *frame, xlator_t *this, fd_t *fd,
                       int32_t mode, off_t offset, size_t len, dict_t *xdata)
{
    quota_local_t *local = NULL;
    int32_t op_errno = EINVAL;

    local = frame->local;

    GF_VALIDATE_OR_GOTO("quota", local, unwind);

    if (local->op_ret == -1) {
        op_errno = local->op_errno;
        if (op_errno == ENOENT || op_errno == ESTALE) {
            /* We may get ENOENT/ESTALE in case of below scenario
             *     fd = open file.txt
             *     unlink file.txt
             *     fallocate on fd
             * Here build_ancestry can fail as the file is removed.
             * For now ignore ENOENT/ESTALE on active fd
             * We need to re-visit this code once we understand
             * how other file-system behave in this scenario
             */
            gf_msg_debug(this->name, 0,
                         "quota enforcer failed "
                         "with ENOENT/ESTALE on %s, cannot check "
                         "quota limits and allowing fallocate",
                         uuid_utoa(fd->inode->gfid));
        } else {
            goto unwind;
        }
    }

    STACK_WIND(frame, quota_fallocate_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->fallocate, fd, mode, offset, len,
               xdata);
    return 0;

unwind:
    QUOTA_STACK_UNWIND(fallocate, frame, -1, op_errno, NULL, NULL, NULL);
    return 0;
}

int32_t
quota_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode,
                off_t offset, size_t len, dict_t *xdata)
{
    int32_t op_errno = EINVAL;
    int32_t parents = 0;
    int32_t fail_count = 0;
    quota_local_t *local = NULL;
    quota_inode_ctx_t *ctx = NULL;
    quota_priv_t *priv = NULL;
    quota_dentry_t *dentry = NULL;
    quota_dentry_t *tmp = NULL;
    call_stub_t *stub = NULL;
    struct list_head head = {
        0,
    };
    inode_t *par_inode = NULL;

    priv = this->private;
    GF_VALIDATE_OR_GOTO(this->name, priv, unwind);

    WIND_IF_QUOTAOFF(priv->is_quota_on, off);

    INIT_LIST_HEAD(&head);

    GF_ASSERT(frame);
    GF_VALIDATE_OR_GOTO("quota", this, unwind);
    GF_VALIDATE_OR_GOTO(this->name, fd, unwind);

    local = quota_local_new();
    if (local == NULL) {
        goto unwind;
    }

    frame->local = local;
    local->loc.inode = inode_ref(fd->inode);

    (void)quota_inode_ctx_get(fd->inode, this, &ctx, 0);
    if (ctx == NULL) {
        gf_msg_debug(this->name, 0,
                     "quota context is NULL on inode"
                     " (%s). If quota is not enabled recently and "
                     "crawler has finished crawling, its an error",
                     uuid_utoa(local->loc.inode->gfid));
    }

    stub = fop_fallocate_stub(frame, quota_fallocate_helper, fd, mode, offset,
                              len, xdata);
    if (stub == NULL) {
        op_errno = ENOMEM;
        goto unwind;
    }

    priv = this->private;
    GF_VALIDATE_OR_GOTO(this->name, priv, unwind);

    parents = quota_add_parents_from_ctx(ctx, &head);
    if (parents == -1) {
        op_errno = errno;
        goto unwind;
    }

    /*
     * Note that by using len as the delta we're assuming the range from
     * offset to offset+len has not already been allocated. This can result
     * in ENOSPC errors attempting to allocate an already allocated range.
     */
    local->delta = len;
    local->object_delta = 0;
    local->stub = stub;
    local->link_count = parents;

    if (parents == 0) {
        local->link_count = 1;
        quota_check_limit(frame, fd->inode, this);
    } else {
        list_for_each_entry_safe(dentry, tmp, &head, next)
        {
            par_inode = do_quota_check_limit(frame, fd->inode, this, dentry,
                                             _gf_false);
            if (par_inode == NULL) {
                /* remove stale entry from inode_ctx */
                quota_dentry_del(ctx, dentry->name, dentry->par);
                parents--;
                fail_count++;
            } else {
                inode_unref(par_inode);
            }
            __quota_dentry_free(dentry);
        }

        if (parents == 0) {
            LOCK(&local->lock);
            {
                local->link_count++;
            }
            UNLOCK(&local->lock);
            quota_check_limit(frame, fd->inode, this);
        }

        while (fail_count != 0) {
            quota_link_count_decrement(frame);
            fail_count--;
        }
    }

    return 0;

unwind:
    QUOTA_STACK_UNWIND(fallocate, frame, -1, op_errno, NULL, NULL, NULL);
    return 0;

off:
    STACK_WIND_TAIL(frame, FIRST_CHILD(this),
                    FIRST_CHILD(this)->fops->fallocate, fd, mode, offset, len,
                    xdata);
    return 0;
}

void
quota_log_helper(char **usage_str, int64_t cur_size, inode_t *inode,
                 char **path, struct timeval *cur_time)
{
    xlator_t *this = THIS;

    if (!usage_str || !inode || !path || !cur_time) {
        gf_log(this->name, GF_LOG_ERROR, "Received null argument");
        return;
    }

    *usage_str = gf_uint64_2human_readable(cur_size);
    if (!(*usage_str))
        gf_msg(this->name, GF_LOG_ERROR, ENOMEM, Q_MSG_ENOMEM,
               "integer to string conversion failed Reason"
               ":\"Cannot allocate memory\"");

    inode_path(inode, NULL, path);
    if (!(*path))
        *path = uuid_utoa(inode->gfid);

    gettimeofday(cur_time, NULL);
}

/* Logs if
 *  i.   Usage crossed soft limit
 *  ii.  Usage above soft limit and alert-time elapsed
 */
void
quota_log_usage(xlator_t *this, quota_inode_ctx_t *ctx, inode_t *inode,
                int64_t delta)
{
    struct timeval cur_time = {
        0,
    };
    char *usage_str = NULL;
    char *path = NULL;
    int64_t cur_size = 0;
    quota_priv_t *priv = NULL;

    priv = this->private;
    cur_size = ctx->size + delta;

    if ((ctx->soft_lim <= 0) || cur_size < ctx->soft_lim)
        return;

    /* Usage crossed/reached soft limit */
    if (DID_REACH_LIMIT(ctx->soft_lim, ctx->size, cur_size)) {
        quota_log_helper(&usage_str, cur_size, inode, &path, &cur_time);

        gf_msg(this->name, GF_LOG_ALERT, 0, Q_MSG_CROSSED_SOFT_LIMIT,
               "Usage crossed soft limit: "
               "%s used by %s",
               usage_str, path);

        gf_event(EVENT_QUOTA_CROSSED_SOFT_LIMIT,
                 "Usage=%s;volume=%s;"
                 "path=%s",
                 usage_str, priv->volume_uuid, path);

        ctx->prev_log = cur_time;

    }
    /* Usage is above soft limit */
    else if (cur_size > ctx->soft_lim &&
             quota_timeout(&ctx->prev_log, priv->log_timeout)) {
        quota_log_helper(&usage_str, cur_size, inode, &path, &cur_time);

        gf_msg(this->name, GF_LOG_ALERT, 0, Q_MSG_CROSSED_SOFT_LIMIT,
               "Usage is above soft limit: %s used by %s", usage_str, path);

        gf_event(EVENT_QUOTA_CROSSED_SOFT_LIMIT,
                 "Usage=%s;volume=%s;"
                 "path=%s",
                 usage_str, priv->volume_uuid, path);

        ctx->prev_log = cur_time;
    }

    if (usage_str)
        GF_FREE(usage_str);
}

int32_t
mem_acct_init(xlator_t *this)
{
    int ret = -1;

    if (!this)
        return ret;

    ret = xlator_mem_acct_init(this, gf_quota_mt_end + 1);

    if (ret != 0) {
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
               "Memory accounting init failed");
        return ret;
    }

    return ret;
}

int32_t
quota_forget(xlator_t *this, inode_t *inode)
{
    int32_t ret = 0;
    uint64_t ctx_int = 0;
    quota_inode_ctx_t *ctx = NULL;
    quota_dentry_t *dentry = NULL, *tmp;

    ret = inode_ctx_del(inode, this, &ctx_int);

    if (ret < 0) {
        return 0;
    }

    ctx = (quota_inode_ctx_t *)(long)ctx_int;

    LOCK(&ctx->lock);
    {
        list_for_each_entry_safe(dentry, tmp, &ctx->parents, next)
        {
            __quota_dentry_free(dentry);
        }
    }
    UNLOCK(&ctx->lock);

    LOCK_DESTROY(&ctx->lock);

    GF_FREE(ctx);

    return 0;
}

int
notify(xlator_t *this, int event, void *data, ...)
{
    quota_priv_t *priv = NULL;
    int ret = 0;
    rpc_clnt_t *rpc = NULL;
    gf_boolean_t conn_status = _gf_true;
    xlator_t *victim = data;

    priv = this->private;
    if (!priv || !priv->is_quota_on)
        goto out;

    if (event == GF_EVENT_PARENT_DOWN) {
        rpc = priv->rpc_clnt;
        if (rpc) {
            rpc_clnt_disable(rpc);
            pthread_mutex_lock(&priv->conn_mutex);
            {
                conn_status = priv->conn_status;
                while (conn_status) {
                    (void)pthread_cond_wait(&priv->conn_cond,
                                            &priv->conn_mutex);
                    conn_status = priv->conn_status;
                }
            }
            pthread_mutex_unlock(&priv->conn_mutex);
            gf_log(this->name, GF_LOG_INFO,
                   "Notify GF_EVENT_PARENT_DOWN for brick %s", victim->name);
        }
    }

out:
    ret = default_notify(this, event, data);
    return ret;
}

int32_t
init(xlator_t *this)
{
    int32_t ret = -1;
    quota_priv_t *priv = NULL;
    rpc_clnt_t *rpc = NULL;

    if ((this->children == NULL) || this->children->next) {
        gf_msg(this->name, GF_LOG_ERROR, 0, Q_MSG_INVALID_VOLFILE,
               "FATAL: quota (%s) not configured with "
               "exactly one child",
               this->name);
        return -1;
    }

    if (this->parents == NULL) {
        gf_msg(this->name, GF_LOG_WARNING, 0, Q_MSG_INVALID_VOLFILE,
               "dangling volume. check volfile");
    }

    QUOTA_ALLOC_OR_GOTO(priv, quota_priv_t, err);

    LOCK_INIT(&priv->lock);

    this->private = priv;

    GF_OPTION_INIT("deem-statfs", priv->consider_statfs, bool, err);
    GF_OPTION_INIT("server-quota", priv->is_quota_on, bool, err);
    GF_OPTION_INIT("default-soft-limit", priv->default_soft_lim, percent, err);
    GF_OPTION_INIT("soft-timeout", priv->soft_timeout, time, err);
    GF_OPTION_INIT("hard-timeout", priv->hard_timeout, time, err);
    GF_OPTION_INIT("alert-time", priv->log_timeout, time, err);
    GF_OPTION_INIT("volume-uuid", priv->volume_uuid, str, err);

    this->local_pool = mem_pool_new(quota_local_t, 64);
    if (!this->local_pool) {
        ret = -1;
        gf_msg(this->name, GF_LOG_ERROR, ENOMEM, Q_MSG_ENOMEM,
               "failed to create local_t's memory pool");
        goto err;
    }

    pthread_mutex_init(&priv->conn_mutex, NULL);
    pthread_cond_init(&priv->conn_cond, NULL);
    priv->conn_status = _gf_false;

    if (priv->is_quota_on) {
        rpc = quota_enforcer_init(this, this->options);
        if (rpc == NULL) {
            ret = -1;
            gf_msg(this->name, GF_LOG_WARNING, 0,
                   Q_MSG_QUOTA_ENFORCER_RPC_INIT_FAILED,
                   "quota enforcer rpc init failed");
            goto err;
        }

        LOCK(&priv->lock);
        {
            priv->rpc_clnt = rpc;
        }
        UNLOCK(&priv->lock);
    }

    ret = 0;
err:
    return ret;
}

int
reconfigure(xlator_t *this, dict_t *options)
{
    int32_t ret = -1;
    quota_priv_t *priv = NULL;
    gf_boolean_t quota_on = _gf_false;
    rpc_clnt_t *rpc = NULL;

    priv = this->private;

    GF_OPTION_RECONF("deem-statfs", priv->consider_statfs, options, bool, out);
    GF_OPTION_RECONF("server-quota", quota_on, options, bool, out);
    GF_OPTION_RECONF("default-soft-limit", priv->default_soft_lim, options,
                     percent, out);
    GF_OPTION_RECONF("alert-time", priv->log_timeout, options, time, out);
    GF_OPTION_RECONF("soft-timeout", priv->soft_timeout, options, time, out);
    GF_OPTION_RECONF("hard-timeout", priv->hard_timeout, options, time, out);

    if (quota_on) {
        priv->rpc_clnt = quota_enforcer_init(this, this->options);
        if (priv->rpc_clnt == NULL) {
            ret = -1;
            gf_msg(this->name, GF_LOG_WARNING, 0,
                   Q_MSG_QUOTA_ENFORCER_RPC_INIT_FAILED,
                   "quota enforcer rpc init failed");
            goto out;
        }

    } else {
        LOCK(&priv->lock);
        {
            rpc = priv->rpc_clnt;
            priv->rpc_clnt = NULL;
        }
        UNLOCK(&priv->lock);

        if (rpc != NULL) {
            // Quotad is shutdown when there is no started volume
            // which has quota enabled. So, we should disable the
            // enforcer client when quota is disabled on a volume,
            // to avoid spurious reconnect attempts to a service
            // (quotad), that is known to be down.
            rpc_clnt_unref(rpc);
        }
    }

    priv->is_quota_on = quota_on;

    ret = 0;
out:
    return ret;
}

int32_t
quota_priv_dump(xlator_t *this)
{
    quota_priv_t *priv = NULL;
    int32_t ret = -1;

    GF_ASSERT(this);

    priv = this->private;
    if (!priv)
        goto out;

    gf_proc_dump_add_section("xlators.features.quota.priv");

    ret = TRY_LOCK(&priv->lock);
    if (ret)
        goto out;
    else {
        gf_proc_dump_write("soft-timeout", "%d", priv->soft_timeout);
        gf_proc_dump_write("hard-timeout", "%d", priv->hard_timeout);
        gf_proc_dump_write("alert-time", "%d", priv->log_timeout);
        gf_proc_dump_write("quota-on", "%d", priv->is_quota_on);
        gf_proc_dump_write("statfs", "%d", priv->consider_statfs);
        gf_proc_dump_write("volume-uuid", "%s", priv->volume_uuid);
        gf_proc_dump_write("validation-count", "%" PRIu64,
                           priv->validation_count);
    }
    UNLOCK(&priv->lock);

out:
    return 0;
}

void
fini(xlator_t *this)
{
    quota_priv_t *priv = NULL;
    rpc_clnt_t *rpc = NULL;

    priv = this->private;
    if (!priv)
        return;
    rpc = priv->rpc_clnt;
    priv->rpc_clnt = NULL;
    if (rpc) {
        rpc_clnt_connection_cleanup(&rpc->conn);
        rpc_clnt_unref(rpc);
    }

    this->private = NULL;
    LOCK_DESTROY(&priv->lock);
    pthread_mutex_destroy(&priv->conn_mutex);
    pthread_cond_destroy(&priv->conn_cond);

    GF_FREE(priv);
    if (this->local_pool) {
        mem_pool_destroy(this->local_pool);
        this->local_pool = NULL;
    }
    return;
}

struct xlator_fops fops = {
    .statfs = quota_statfs,
    .lookup = quota_lookup,
    .writev = quota_writev,
    .create = quota_create,
    .mkdir = quota_mkdir,
    .truncate = quota_truncate,
    .ftruncate = quota_ftruncate,
    .unlink = quota_unlink,
    .symlink = quota_symlink,
    .link = quota_link,
    .rename = quota_rename,
    .getxattr = quota_getxattr,
    .fgetxattr = quota_fgetxattr,
    .stat = quota_stat,
    .fstat = quota_fstat,
    .readlink = quota_readlink,
    .readv = quota_readv,
    .fsync = quota_fsync,
    .setattr = quota_setattr,
    .fsetattr = quota_fsetattr,
    .mknod = quota_mknod,
    .setxattr = quota_setxattr,
    .fsetxattr = quota_fsetxattr,
    .removexattr = quota_removexattr,
    .fremovexattr = quota_fremovexattr,
    .readdirp = quota_readdirp,
    .fallocate = quota_fallocate,
};

struct xlator_cbks cbks = {.forget = quota_forget};

struct xlator_dumpops dumpops = {
    .priv = quota_priv_dump,
};
struct volume_options options[] = {
    {
        .key = {"enable"},
        .type = GF_OPTION_TYPE_BOOL,
        .default_value = "off",
        .description = "enable is the volume option that can be used "
                       "to turn on quota.",
        .op_version = {1},
        .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC,
        .level = OPT_STATUS_BASIC,
        .tags = {},
    },
    {
        .key = {"deem-statfs"},
        .type = GF_OPTION_TYPE_BOOL,
        .default_value = "on",
        .description = "If set to on, it takes quota limits into"
                       " consideration while estimating fs size. (df command)"
                       " (Default is on).",
        .op_version = {2},
        .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC,
        .tags = {},
    },
    {
        .key = {"server-quota"},
        .type = GF_OPTION_TYPE_BOOL,
        .default_value = "off",
        .description = "Skip the quota enforcement if the feature is"
                       " not turned on. This is not a user exposed option.",
        .flags = OPT_FLAG_NONE,
    },
    {
        .key = {"default-soft-limit"},
        .type = GF_OPTION_TYPE_PERCENT,
        .default_value = "80%",
        .op_version = {3},
        .description = "Soft limit is expressed as a proportion of hard limit."
                       " Default-soft-limit is the proportion used when the "
                       " user does not supply any soft limit value.",
        .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC,
        .tags = {},
    },
    {
        .key = {"soft-timeout"},
        .type = GF_OPTION_TYPE_TIME,
        .min = 0,
        .max = 1800,
        .default_value = "60",
        .description = "quota caches the directory sizes on client. "
                       "soft-timeout indicates the timeout for the validity of"
                       " cache before soft-limit has been crossed.",
        .op_version = {3},
        .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC,
        .tags = {},
    },
    {
        .key = {"hard-timeout"},
        .type = GF_OPTION_TYPE_TIME,
        .min = 0,
        .max = 60,
        .default_value = "5",
        .description = "quota caches the directory sizes on client. "
                       "hard-timeout indicates the timeout for the validity of"
                       " cache after soft-limit has been crossed.",
        .op_version = {3},
        .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC,
        .tags = {},
    },
    {.key = {"volume-uuid"},
     .type = GF_OPTION_TYPE_STR,
     .default_value = "{{ volume.id }}",
     .description = "uuid of the volume this brick is part of."},
    {
        .key = {"alert-time"},
        .type = GF_OPTION_TYPE_TIME,
        .min = 0,
        .max = 7 * 86400,
        .default_value = "86400",
        .op_version = {3},
        .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC,
        .description = "Frequency of limit breach messages in log.",
        .tags = {},
    },
    {.key = {NULL}}};

xlator_api_t xlator_api = {
    .init = init,
    .fini = fini,
    .notify = notify,
    .reconfigure = reconfigure,
    .mem_acct_init = mem_acct_init,
    .op_version = {1}, /* Present from the initial version */
    .fops = &fops,
    .cbks = &cbks,
    .options = options,
    .identifier = "quota",
    .category = GF_MAINTAINED,
};