Blob Blame History Raw
/*
 *   Copyright (c) 2018 Red Hat, Inc. <http://www.redhat.com>
 *   This file is part of GlusterFS.
 *
 *   This file is licensed to you under your choice of the GNU Lesser
 *   General Public License, version 3 or any later version (LGPLv3 or
 *   later), or the GNU General Public License, version 2 (GPLv2), in all
 *   cases as published by the Free Software Foundation.
 */

#include <glusterfs/glusterfs.h>
#include <glusterfs/xlator.h>
#include <glusterfs/defaults.h>
#include "cloudsync.h"
#include "cloudsync-common.h"
#include <glusterfs/call-stub.h>
#include "cloudsync-autogen-fops.h"

#include <string.h>
#include <dlfcn.h>

void
cs_cleanup_private(cs_private_t *priv)
{
    if (priv) {
        if (priv->stores) {
            priv->stores->fini(priv->stores->config);
            GF_FREE(priv->stores);
        }

        pthread_spin_destroy(&priv->lock);
        GF_FREE(priv);
    }

    return;
}

struct cs_plugin plugins[] = {
    {.name = "cloudsyncs3",
     .library = "cloudsyncs3.so",
     .description = "cloudsync s3 store."},
#if defined(__linux__)
    {.name = "cvlt",
     .library = "cloudsynccvlt.so",
     .description = "Commvault content store."},
#endif
    {.name = NULL},
};

int
cs_init(xlator_t *this)
{
    cs_private_t *priv = NULL;
    gf_boolean_t per_vol = _gf_false;
    int ret = 0;
    char *libpath = NULL;
    store_methods_t *store_methods = NULL;
    void *handle = NULL;
    char *temp_str = NULL;
    int index = 0;
    char *libname = NULL;

    priv = GF_CALLOC(1, sizeof(*priv), gf_cs_mt_cs_private_t);
    if (!priv) {
        gf_msg(this->name, GF_LOG_ERROR, 0, 0, "insufficient memory");
        goto out;
    }

    priv->this = this;

    this->local_pool = mem_pool_new(cs_local_t, 512);
    if (!this->local_pool) {
        gf_msg(this->name, GF_LOG_ERROR, 0, ENOMEM, "initialisation failed.");
        ret = -1;
        goto out;
    }

    this->private = priv;

    GF_OPTION_INIT("cloudsync-remote-read", priv->remote_read, bool, out);

    /* temp workaround. Should be configurable through glusterd*/
    per_vol = _gf_true;

    if (per_vol) {
        if (dict_get_str(this->options, "cloudsync-storetype", &temp_str) ==
            0) {
            for (index = 0; plugins[index].name; index++) {
                if (!strcmp(temp_str, plugins[index].name)) {
                    libname = plugins[index].library;
                    break;
                }
            }
        } else {
            ret = 0;
        }

        if (!libname) {
            gf_msg(this->name, GF_LOG_WARNING, 0, 0, "no plugin enabled");
            ret = 0;
            goto out;
        }

        ret = gf_asprintf(&libpath, "%s/%s", CS_PLUGINDIR, libname);
        if (ret == -1) {
            goto out;
        }

        handle = dlopen(libpath, RTLD_NOW);
        if (!handle) {
            gf_msg(this->name, GF_LOG_WARNING, 0, 0,
                   "could not "
                   "load the required library. %s",
                   dlerror());
            ret = 0;
            goto out;
        } else {
            gf_msg(this->name, GF_LOG_INFO, 0, 0,
                   "loading library:%s successful", libname);
        }

        priv->stores = GF_CALLOC(1, sizeof(struct cs_remote_stores),
                                 gf_cs_mt_cs_remote_stores_t);
        if (!priv->stores) {
            gf_msg(this->name, GF_LOG_ERROR, 0, 0,
                   "Could not "
                   "allocate memory for priv->stores");
            ret = -1;
            goto out;
        }

        (void)dlerror(); /* clear out previous error string */

        /* load library methods */
        store_methods = (store_methods_t *)dlsym(handle, "store_ops");
        if (!store_methods) {
            gf_msg(this->name, GF_LOG_ERROR, 0, 0, "null store_methods %s",
                   dlerror());
            ret = -1;
            goto out;
        }

        (void)dlerror();

        if (priv->remote_read) {
            priv->stores->rdfop = store_methods->fop_remote_read;
            if (!priv->stores->rdfop) {
                gf_msg(this->name, GF_LOG_ERROR, 0, 0,
                       "failed to get"
                       " read fop %s",
                       dlerror());
                ret = -1;
                goto out;
            }
        }

        priv->stores->dlfop = store_methods->fop_download;
        if (!priv->stores->dlfop) {
            gf_msg(this->name, GF_LOG_ERROR, 0, 0,
                   "failed to get"
                   " download fop %s",
                   dlerror());
            ret = -1;
            goto out;
        }

        (void)dlerror();
        priv->stores->init = store_methods->fop_init;
        if (!priv->stores->init) {
            gf_msg(this->name, GF_LOG_ERROR, 0, 0,
                   "failed to get"
                   " init fop %s",
                   dlerror());
            ret = -1;
            goto out;
        }

        (void)dlerror();
        priv->stores->reconfigure = store_methods->fop_reconfigure;
        if (!priv->stores->reconfigure) {
            gf_msg(this->name, GF_LOG_ERROR, 0, 0,
                   "failed to get"
                   " reconfigure fop %s",
                   dlerror());
            ret = -1;
            goto out;
        }

        priv->stores->handle = handle;

        priv->stores->config = (void *)((priv->stores->init)(this));
        if (!priv->stores->config) {
            gf_msg(this->name, GF_LOG_ERROR, 0, 0, "null config");
            ret = -1;
            goto out;
        }
    }

    ret = 0;

out:
    if (ret == -1) {
        if (this->local_pool)
            mem_pool_destroy(this->local_pool);

        cs_cleanup_private(priv);

        if (handle) {
            dlclose(handle);
        }
    }

    GF_FREE(libpath);

    return ret;
}

int
cs_forget(xlator_t *this, inode_t *inode)
{
    uint64_t ctx_int = 0;
    cs_inode_ctx_t *ctx = NULL;

    inode_ctx_del(inode, this, &ctx_int);
    if (!ctx_int)
        return 0;

    ctx = (cs_inode_ctx_t *)(uintptr_t)ctx_int;

    GF_FREE(ctx);
    return 0;
}

void
cs_fini(xlator_t *this)
{
    cs_private_t *priv = NULL;
    priv = this->private;

    cs_cleanup_private(priv);
}

int
cs_reconfigure(xlator_t *this, dict_t *options)
{
    cs_private_t *priv = NULL;
    int ret = 0;

    priv = this->private;
    if (!priv) {
        ret = -1;
        goto out;
    }

    GF_OPTION_RECONF("cloudsync-remote-read", priv->remote_read, options, bool,
                     out);

    /* needed only for per volume configuration*/
    ret = priv->stores->reconfigure(this, options);

out:
    return ret;
}

int32_t
cs_mem_acct_init(xlator_t *this)
{
    int ret = -1;

    GF_VALIDATE_OR_GOTO("cloudsync", this, out);

    ret = xlator_mem_acct_init(this, gf_cs_mt_end + 1);

    if (ret != 0) {
        gf_msg(this->name, GF_LOG_ERROR, 0, 0, "Memory accounting init failed");
        return ret;
    }
out:
    return ret;
}

int32_t
cs_readdirp(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
            off_t off, dict_t *xdata)
{
    int ret = 0;
    int op_errno = ENOMEM;

    if (!xdata) {
        xdata = dict_new();
        if (!xdata) {
            gf_msg(this->name, GF_LOG_ERROR, 0, ENOMEM,
                   "failed to create "
                   "dict");
            goto err;
        }
    }

    ret = dict_set_uint32(xdata, GF_CS_OBJECT_STATUS, 1);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, 0,
               "dict_set failed key:"
               " %s",
               GF_CS_OBJECT_STATUS);
        goto err;
    }

    STACK_WIND(frame, default_readdirp_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->readdirp, fd, size, off, xdata);
    return 0;
err:
    STACK_UNWIND_STRICT(readdirp, frame, -1, op_errno, NULL, NULL);
    return 0;
}

int32_t
cs_truncate_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
                struct iatt *postbuf, dict_t *xdata)
{
    cs_local_t *local = NULL;
    int ret = 0;
    uint64_t val = 0;

    local = frame->local;

    local->call_cnt++;

    if (op_ret == -1) {
        gf_msg(this->name, GF_LOG_ERROR, 0, 0, "truncate failed");
        ret = dict_get_uint64(xdata, GF_CS_OBJECT_STATUS, &val);
        if (ret == 0) {
            if (val == GF_CS_ERROR) {
                gf_msg(this->name, GF_LOG_ERROR, 0, 0,
                       "could not get file state, unwinding");
                op_ret = -1;
                op_errno = EIO;
                goto unwind;
            } else {
                __cs_inode_ctx_update(this, local->loc.inode, val);
                gf_msg(this->name, GF_LOG_INFO, 0, 0, " state = %" PRIu64, val);

                if (local->call_cnt == 1 &&
                    (val == GF_CS_REMOTE || val == GF_CS_DOWNLOADING)) {
                    gf_msg(this->name, GF_LOG_WARNING, 0, 0,
                           "will repair and download "
                           "the file, current state : %" PRIu64,
                           val);
                    goto repair;
                } else {
                    gf_msg(this->name, GF_LOG_ERROR, 0, 0,
                           "second truncate, Unwinding");
                    goto unwind;
                }
            }
        } else {
            gf_msg(this->name, GF_LOG_ERROR, 0, 0,
                   "file state "
                   "could not be figured, unwinding");
            goto unwind;
        }
    } else {
        /* successful write => file is local */
        __cs_inode_ctx_update(this, local->loc.inode, GF_CS_LOCAL);
        gf_msg(this->name, GF_LOG_INFO, 0, 0,
               "state : GF_CS_LOCAL"
               ", truncate successful");

        goto unwind;
    }

repair:
    ret = locate_and_execute(frame);
    if (ret) {
        goto unwind;
    }

    return 0;

unwind:
    CS_STACK_UNWIND(truncate, frame, op_ret, op_errno, prebuf, postbuf, xdata);
    return 0;
}

int32_t
cs_truncate(call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset,
            dict_t *xdata)
{
    int op_errno = -1;
    cs_local_t *local = NULL;
    int ret = 0;
    cs_inode_ctx_t *ctx = NULL;
    gf_cs_obj_state state = -1;

    VALIDATE_OR_GOTO(frame, err);
    VALIDATE_OR_GOTO(this, err);
    VALIDATE_OR_GOTO(loc, err);

    local = cs_local_init(this, frame, loc, NULL, GF_FOP_TRUNCATE);
    if (!local) {
        gf_msg(this->name, GF_LOG_ERROR, 0, 0, "local init failed");
        op_errno = ENOMEM;
        goto err;
    }

    __cs_inode_ctx_get(this, loc->inode, &ctx);

    if (ctx)
        state = __cs_get_file_state(this, loc->inode, ctx);
    else
        state = GF_CS_LOCAL;

    local->xattr_req = xdata ? dict_ref(xdata) : (xdata = dict_new());

    ret = dict_set_uint32(local->xattr_req, GF_CS_OBJECT_STATUS, 1);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, 0,
               "dict_set failed key:"
               " %s",
               GF_CS_OBJECT_STATUS);
        goto err;
    }

    local->stub = fop_truncate_stub(frame, cs_resume_truncate, loc, offset,
                                    xdata);
    if (!local->stub) {
        gf_msg(this->name, GF_LOG_ERROR, 0, 0, "insufficient memory");
        op_errno = ENOMEM;
        goto err;
    }

    if (state == GF_CS_LOCAL) {
        STACK_WIND(frame, cs_truncate_cbk, FIRST_CHILD(this),
                   FIRST_CHILD(this)->fops->truncate, loc, offset, xdata);

    } else {
        local->call_cnt++;
        ret = locate_and_execute(frame);
        if (ret) {
            op_errno = ENOMEM;
            goto err;
        }
    }

    return 0;
err:
    CS_STACK_UNWIND(truncate, frame, -1, op_errno, NULL, NULL, NULL);
    return 0;
}

int32_t
cs_statfs_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
              int32_t op_errno, struct statvfs *buf, dict_t *xdata)
{
    STACK_UNWIND_STRICT(statfs, frame, op_ret, op_errno, buf, xdata);
    return 0;
}

int32_t
cs_statfs(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)
{
    STACK_WIND(frame, cs_statfs_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->statfs, loc, xdata);
    return 0;
}

int32_t
cs_getxattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                int32_t op_ret, int32_t op_errno, dict_t *dict, dict_t *xdata)
{
    STACK_UNWIND_STRICT(getxattr, frame, op_ret, op_errno, dict, xdata);
    return 0;
}

int32_t
cs_getxattr(call_frame_t *frame, xlator_t *this, loc_t *loc, const char *name,
            dict_t *xattr_req)
{
    STACK_WIND(frame, cs_getxattr_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->getxattr, loc, name, xattr_req);
    return 0;
}

int32_t
cs_setxattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
    cs_local_t *local = NULL;

    local = frame->local;

    if (local->locked)
        cs_inodelk_unlock(frame);

    CS_STACK_UNWIND(setxattr, frame, op_ret, op_errno, xdata);

    return 0;
}

int32_t
cs_setxattr(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *dict,
            int32_t flags, dict_t *xdata)
{
    data_t *tmp = NULL;
    cs_local_t *local = NULL;
    int ret = 0;

    VALIDATE_OR_GOTO(frame, err);
    VALIDATE_OR_GOTO(this, err);

    local = cs_local_init(this, frame, loc, NULL, GF_FOP_SETXATTR);
    if (!local) {
        ret = -1;
        goto err;
    }

    local->xattr_req = xdata ? dict_ref(xdata) : (xdata = dict_new());

    tmp = dict_get(dict, GF_CS_OBJECT_UPLOAD_COMPLETE);
    if (tmp) {
        /* Value of key should be the atime */
        local->stub = fop_setxattr_stub(frame, cs_resume_setxattr, loc, dict,
                                        flags, xdata);

        if (!local->stub)
            goto err;

        ret = locate_and_execute(frame);
        if (ret) {
            goto err;
        }

        return 0;
    }

    STACK_WIND(frame, cs_setxattr_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->setxattr, loc, dict, flags, xdata);
    return 0;
err:
    CS_STACK_UNWIND(setxattr, frame, -1, errno, NULL);
    return 0;
}

int32_t
cs_fgetxattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                 int32_t op_ret, int32_t op_errno, dict_t *dict, dict_t *xdata)
{
    STACK_UNWIND_STRICT(fgetxattr, frame, op_ret, op_errno, dict, xdata);
    return 0;
}

int32_t
cs_fgetxattr(call_frame_t *frame, xlator_t *this, fd_t *fd, const char *name,
             dict_t *xdata)
{
    STACK_WIND(frame, cs_fgetxattr_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->fgetxattr, fd, name, xdata);
    return 0;
}

int32_t
cs_fsetxattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                 int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
    STACK_UNWIND_STRICT(fsetxattr, frame, op_ret, op_errno, xdata);
    return 0;
}

int32_t
cs_fsetxattr(call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *dict,
             int32_t flags, dict_t *xdata)
{
    STACK_WIND(frame, cs_fsetxattr_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->fsetxattr, fd, dict, flags, xdata);
    return 0;
}

int32_t
cs_unlink_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
              int32_t op_errno, struct iatt *preparent, struct iatt *postparent,
              dict_t *xdata)
{
    STACK_UNWIND_STRICT(unlink, frame, op_ret, op_errno, preparent, postparent,
                        xdata);
    return 0;
}

int32_t
cs_unlink(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,
          dict_t *xattr_req)
{
    cs_local_t *local = NULL;
    int ret = 0;

    local = cs_local_init(this, frame, loc, NULL, GF_FOP_UNLINK);
    if (!local)
        goto err;

    local->xattr_req = xattr_req ? dict_ref(xattr_req) : dict_new();

    ret = dict_set_uint32(local->xattr_req, GF_CS_OBJECT_STATUS, 1);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, 0,
               "dict_set failed key:"
               " %s",
               GF_CS_OBJECT_STATUS);
        goto err;
    }
    STACK_WIND(frame, cs_unlink_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->unlink, loc, flags, local->xattr_req);
    return 0;
err:
    CS_STACK_UNWIND(unlink, frame, -1, errno, NULL, NULL, NULL);
    return 0;
}

int32_t
cs_open_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
            int32_t op_errno, fd_t *fd, dict_t *xdata)
{
    int ret = 0;
    uint64_t val = 0;

    if (op_ret == 0) {
        ret = dict_get_uint64(xdata, GF_CS_OBJECT_STATUS, &val);
        if (!ret) {
            ret = __cs_inode_ctx_update(this, fd->inode, val);
            if (ret) {
                gf_msg(this->name, GF_LOG_ERROR, 0, 0, "ctx update failed");
            }
        }
    } else {
        cs_inode_ctx_reset(this, fd->inode);
    }

    CS_STACK_UNWIND(open, frame, op_ret, op_errno, fd, xdata);
    return 0;
}

int32_t
cs_open(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,
        fd_t *fd, dict_t *xattr_req)
{
    cs_local_t *local = NULL;
    int ret = 0;

    local = cs_local_init(this, frame, NULL, fd, GF_FOP_OPEN);
    if (!local)
        goto err;

    local->xattr_req = xattr_req ? dict_ref(xattr_req) : dict_new();

    ret = dict_set_uint32(local->xattr_req, GF_CS_OBJECT_STATUS, 1);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, 0,
               "dict_set failed key:"
               " %s",
               GF_CS_OBJECT_STATUS);
        goto err;
    }

    STACK_WIND(frame, cs_open_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->open, loc, flags, fd, local->xattr_req);
    return 0;
err:
    CS_STACK_UNWIND(open, frame, -1, errno, NULL, NULL);
    return 0;
}

int32_t
cs_fstat_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
             int32_t op_errno, struct iatt *buf, dict_t *xdata)
{
    int ret = 0;
    uint64_t val = 0;
    fd_t *fd = NULL;
    cs_local_t *local = NULL;

    local = frame->local;

    fd = local->fd;

    if (op_ret == 0) {
        ret = dict_get_uint64(xdata, GF_CS_OBJECT_STATUS, &val);
        if (!ret) {
            gf_msg_debug(this->name, 0, "state %" PRIu64, val);
            ret = __cs_inode_ctx_update(this, fd->inode, val);
            if (ret) {
                gf_msg(this->name, GF_LOG_ERROR, 0, 0, "ctx update failed");
            }
        }
    } else {
        cs_inode_ctx_reset(this, fd->inode);
    }

    CS_STACK_UNWIND(fstat, frame, op_ret, op_errno, buf, xdata);

    return 0;
}

int32_t
cs_fstat(call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xattr_req)
{
    cs_local_t *local = NULL;
    int ret = 0;

    local = cs_local_init(this, frame, NULL, fd, GF_FOP_FSTAT);
    if (!local)
        goto err;

    if (fd->inode->ia_type == IA_IFDIR)
        goto wind;

    local->xattr_req = xattr_req ? dict_ref(xattr_req) : dict_new();

    ret = dict_set_uint32(local->xattr_req, GF_CS_OBJECT_STATUS, 1);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, 0,
               "dict_set failed key:"
               " %s",
               GF_CS_OBJECT_STATUS);
        goto err;
    }

wind:
    STACK_WIND(frame, cs_fstat_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->fstat, fd, local->xattr_req);
    return 0;
err:
    CS_STACK_UNWIND(fstat, frame, -1, errno, NULL, NULL);
    return 0;
}

cs_local_t *
cs_local_init(xlator_t *this, call_frame_t *frame, loc_t *loc, fd_t *fd,
              glusterfs_fop_t fop)
{
    cs_local_t *local = NULL;
    int ret = 0;

    local = mem_get0(this->local_pool);
    if (!local)
        goto out;

    if (loc) {
        ret = loc_copy(&local->loc, loc);
        if (ret)
            goto out;
    }

    if (fd) {
        local->fd = fd_ref(fd);
    }

    local->op_ret = -1;
    local->op_errno = EUCLEAN;
    local->fop = fop;
    local->dloffset = 0;
    frame->local = local;
    local->locked = _gf_false;
    local->call_cnt = 0;
out:
    if (ret) {
        if (local)
            mem_put(local);
        local = NULL;
    }

    return local;
}

call_frame_t *
cs_lock_frame(call_frame_t *parent_frame)
{
    call_frame_t *lock_frame = NULL;

    lock_frame = copy_frame(parent_frame);

    if (lock_frame == NULL)
        goto out;

    set_lk_owner_from_ptr(&lock_frame->root->lk_owner, parent_frame->root);

out:
    return lock_frame;
}

void
cs_lock_wipe(call_frame_t *lock_frame)
{
    CS_STACK_DESTROY(lock_frame);
}

int32_t
cs_inodelk_unlock_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                      int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
    cs_lock_wipe(frame);

    return 0;
}

int
cs_inodelk_unlock(call_frame_t *main_frame)
{
    xlator_t *this = NULL;
    struct gf_flock flock = {
        0,
    };
    call_frame_t *lock_frame = NULL;
    cs_local_t *lock_local = NULL;
    cs_local_t *main_local = NULL;
    int ret = 0;

    this = main_frame->this;
    main_local = main_frame->local;

    lock_frame = cs_lock_frame(main_frame);
    if (!lock_frame)
        goto out;

    lock_local = cs_local_init(this, lock_frame, NULL, NULL, 0);
    if (!lock_local)
        goto out;

    ret = cs_build_loc(&lock_local->loc, main_frame);
    if (ret) {
        goto out;
    }

    flock.l_type = F_UNLCK;

    main_local->locked = _gf_false;

    STACK_WIND(lock_frame, cs_inodelk_unlock_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->inodelk, CS_LOCK_DOMAIN,
               &lock_local->loc, F_SETLKW, &flock, NULL);

    return 0;

out:
    gf_msg(this->name, GF_LOG_ERROR, 0, 0,
           "Stale lock would be found on"
           " server");

    if (lock_frame)
        cs_lock_wipe(lock_frame);

    return 0;
}

int
cs_download_task(void *arg)
{
    call_frame_t *frame = NULL;
    xlator_t *this = NULL;
    cs_private_t *priv = NULL;
    int ret = -1;
    char *sign_req = NULL;
    fd_t *fd = NULL;
    cs_local_t *local = NULL;
    dict_t *dict = NULL;

    frame = (call_frame_t *)arg;

    this = frame->this;

    priv = this->private;

    if (!priv->stores) {
        gf_msg(this->name, GF_LOG_ERROR, 0, 0,
               "No remote store "
               "plugins found");
        ret = -1;
        goto out;
    }

    local = frame->local;

    if (local->fd)
        fd = fd_anonymous(local->fd->inode);
    else
        fd = fd_anonymous(local->loc.inode);

    if (!fd) {
        gf_msg("CS", GF_LOG_ERROR, 0, 0, "fd creation failed");
        ret = -1;
        goto out;
    }

    local->dlfd = fd;
    local->dloffset = 0;

    dict = dict_new();
    if (!dict) {
        gf_msg(this->name, GF_LOG_ERROR, 0, ENOMEM,
               "failed to create "
               "dict");
        ret = -1;
        goto out;
    }

    ret = dict_set_uint32(dict, GF_CS_OBJECT_DOWNLOADING, 1);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, 0, "dict_set failed");
        ret = -1;
        goto out;
    }

    ret = syncop_fsetxattr(this, local->fd, dict, 0, NULL, NULL);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, 0,
               "fsetxattr failed "
               "key %s",
               GF_CS_OBJECT_DOWNLOADING);
        ret = -1;
        goto out;
    }
    /*this calling method is for per volume setting */
    ret = priv->stores->dlfop(frame, priv->stores->config);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, 0,
               "download failed"
               ", remotepath: %s",
               local->remotepath);

        /*using dlfd as it is anonymous and have RDWR flag*/
        ret = syncop_ftruncate(FIRST_CHILD(this), local->dlfd, 0, NULL, NULL,
                               NULL, NULL);
        if (ret) {
            gf_msg(this->name, GF_LOG_ERROR, 0, -ret, "ftruncate failed");
        } else {
            gf_msg_debug(this->name, 0, "ftruncate succeed");
        }

        ret = -1;
        goto out;
    } else {
        gf_msg(this->name, GF_LOG_INFO, 0, 0,
               "download success, path"
               " : %s",
               local->remotepath);

        ret = syncop_fremovexattr(this, local->fd, GF_CS_OBJECT_REMOTE, NULL,
                                  NULL);
        if (ret) {
            gf_msg(this->name, GF_LOG_ERROR, 0, -ret,
                   "removexattr failed, remotexattr");
            ret = -1;
            goto out;
        } else {
            gf_msg_debug(this->name, 0,
                         "fremovexattr success, "
                         "path : %s",
                         local->remotepath);
        }

        ret = syncop_fremovexattr(this, local->fd, GF_CS_OBJECT_DOWNLOADING,
                                  NULL, NULL);
        if (ret) {
            gf_msg(this->name, GF_LOG_ERROR, 0, -ret,
                   "removexattr failed, downloading xattr, path %s",
                   local->remotepath);
            ret = -1;
            goto out;
        } else {
            gf_msg_debug(this->name, 0,
                         "fremovexattr success"
                         " path  %s",
                         local->remotepath);
        }
    }

out:
    GF_FREE(sign_req);

    if (dict)
        dict_unref(dict);

    if (fd) {
        fd_unref(fd);
        local->dlfd = NULL;
    }

    return ret;
}

int
cs_download(call_frame_t *frame)
{
    int ret = 0;
    cs_local_t *local = NULL;
    xlator_t *this = NULL;

    local = frame->local;
    this = frame->this;

    if (!local->remotepath) {
        ret = -1;
        gf_msg(this->name, GF_LOG_ERROR, 0, 0,
               "remote path not"
               " available. Check posix logs to resolve");
        goto out;
    }

    ret = cs_download_task((void *)frame);
out:
    return ret;
}

int
cs_set_xattr_req(call_frame_t *frame)
{
    cs_local_t *local = NULL;
    GF_UNUSED int ret = 0;

    local = frame->local;

    /* When remote reads are performed (i.e. reads on remote store),
     * there needs to be a way to associate a file on gluster volume
     * with its correspnding file on the remote store. In order to do
     * that, a unique key can be maintained as an xattr
     * (GF_CS_XATTR_ARCHIVE_UUID)on the stub file on gluster bricks.
     * This xattr should be provided to the plugin to
     * perform the read fop on the correct file. This assumes that the file
     * hierarchy and name need not be the same on remote store as that of
     * the gluster volume.
     */
    ret = dict_set_str(local->xattr_req, GF_CS_XATTR_ARCHIVE_UUID, "1");

    return 0;
}

int
cs_update_xattrs(call_frame_t *frame, dict_t *xdata)
{
    cs_local_t *local = NULL;
    xlator_t *this = NULL;
    int size = -1;
    GF_UNUSED int ret = 0;

    local = frame->local;
    this = frame->this;

    local->xattrinfo.lxattr = GF_CALLOC(1, sizeof(cs_loc_xattr_t),
                                        gf_cs_mt_cs_lxattr_t);
    if (!local->xattrinfo.lxattr) {
        local->op_ret = -1;
        local->op_errno = ENOMEM;
        goto err;
    }

    gf_uuid_copy(local->xattrinfo.lxattr->gfid, local->loc.gfid);

    if (local->remotepath) {
        local->xattrinfo.lxattr->file_path = gf_strdup(local->remotepath);
        if (!local->xattrinfo.lxattr->file_path) {
            local->op_ret = -1;
            local->op_errno = ENOMEM;
            goto err;
        }
    }

    ret = dict_get_gfuuid(xdata, GF_CS_XATTR_ARCHIVE_UUID,
                          &(local->xattrinfo.lxattr->uuid));

    if (ret) {
        gf_uuid_clear(local->xattrinfo.lxattr->uuid);
    }
    size = strlen(this->name) - strlen("-cloudsync") + 1;
    local->xattrinfo.lxattr->volname = GF_CALLOC(1, size, gf_common_mt_char);
    if (!local->xattrinfo.lxattr->volname) {
        local->op_ret = -1;
        local->op_errno = ENOMEM;
        goto err;
    }
    strncpy(local->xattrinfo.lxattr->volname, this->name, size - 1);
    local->xattrinfo.lxattr->volname[size - 1] = '\0';

    return 0;
err:
    cs_xattrinfo_wipe(local);
    return -1;
}

int
cs_serve_readv(call_frame_t *frame, off_t offset, size_t size, uint32_t flags)
{
    xlator_t *this = NULL;
    cs_private_t *priv = NULL;
    int ret = -1;
    fd_t *fd = NULL;
    cs_local_t *local = NULL;

    local = frame->local;
    this = frame->this;
    priv = this->private;

    if (!local->remotepath) {
        ret = -1;
        gf_msg(this->name, GF_LOG_ERROR, 0, 0,
               "remote path not"
               " available. Check posix logs to resolve");
        goto out;
    }

    if (!priv->stores) {
        gf_msg(this->name, GF_LOG_ERROR, 0, 0,
               "No remote store "
               "plugins found");
        ret = -1;
        goto out;
    }

    if (local->fd) {
        fd = fd_anonymous(local->fd->inode);
    } else {
        fd = fd_anonymous(local->loc.inode);
    }

    local->xattrinfo.size = size;
    local->xattrinfo.offset = offset;
    local->xattrinfo.flags = flags;

    if (!fd) {
        gf_msg("CS", GF_LOG_ERROR, 0, 0, "fd creation failed");
        ret = -1;
        goto out;
    }

    local->dlfd = fd;
    local->dloffset = offset;

    /*this calling method is for per volume setting */
    ret = priv->stores->rdfop(frame, priv->stores->config);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, 0,
               "read failed"
               ", remotepath: %s",
               local->remotepath);
        ret = -1;
        goto out;
    } else {
        gf_msg(this->name, GF_LOG_INFO, 0, 0,
               "read success, path"
               " : %s",
               local->remotepath);
    }

out:
    if (fd) {
        fd_unref(fd);
        local->dlfd = NULL;
    }
    return ret;
}

int32_t
cs_readv_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
             int32_t op_errno, struct iovec *vector, int32_t count,
             struct iatt *stbuf, struct iobref *iobref, dict_t *xdata)
{
    cs_local_t *local = NULL;
    int ret = 0;
    uint64_t val = 0;
    fd_t *fd = NULL;

    local = frame->local;
    fd = local->fd;

    local->call_cnt++;

    if (op_ret == -1) {
        ret = dict_get_uint64(xdata, GF_CS_OBJECT_STATUS, &val);
        if (ret == 0) {
            if (val == GF_CS_ERROR) {
                gf_msg(this->name, GF_LOG_ERROR, 0, 0,
                       "could not get file state, unwinding");
                op_ret = -1;
                op_errno = EIO;
                goto unwind;
            } else {
                __cs_inode_ctx_update(this, fd->inode, val);
                gf_msg(this->name, GF_LOG_INFO, 0, 0, " state = %" PRIu64, val);

                if (local->call_cnt == 1 &&
                    (val == GF_CS_REMOTE || val == GF_CS_DOWNLOADING)) {
                    gf_msg(this->name, GF_LOG_INFO, 0, 0,
                           " will read from remote : %" PRIu64, val);
                    goto repair;
                } else {
                    gf_msg(this->name, GF_LOG_ERROR, 0, 0,
                           "second readv, Unwinding");
                    goto unwind;
                }
            }
        } else {
            gf_msg(this->name, GF_LOG_ERROR, 0, 0,
                   "file state "
                   "could not be figured, unwinding");
            goto unwind;
        }
    } else {
        /* successful readv => file is local */
        __cs_inode_ctx_update(this, fd->inode, GF_CS_LOCAL);
        gf_msg(this->name, GF_LOG_INFO, 0, 0,
               "state : GF_CS_LOCAL"
               ", readv successful");

        goto unwind;
    }

repair:
    ret = locate_and_execute(frame);
    if (ret) {
        goto unwind;
    }

    return 0;

unwind:
    CS_STACK_UNWIND(readv, frame, op_ret, op_errno, vector, count, stbuf,
                    iobref, xdata);

    return 0;
}

int32_t
cs_resume_readv(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
                off_t offset, uint32_t flags, dict_t *xdata)
{
    int ret = 0;

    ret = cs_resume_postprocess(this, frame, fd->inode);
    if (ret) {
        goto unwind;
    }

    cs_inodelk_unlock(frame);

    STACK_WIND(frame, cs_readv_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->readv, fd, size, offset, flags, xdata);

    return 0;

unwind:
    cs_inodelk_unlock(frame);

    cs_common_cbk(frame);

    return 0;
}

int32_t
cs_resume_remote_readv(call_frame_t *frame, xlator_t *this, fd_t *fd,
                       size_t size, off_t offset, uint32_t flags, dict_t *xdata)
{
    int ret = 0;
    cs_local_t *local = NULL;
    gf_cs_obj_state state = -1;
    cs_inode_ctx_t *ctx = NULL;

    cs_inodelk_unlock(frame);

    local = frame->local;
    if (!local) {
        ret = -1;
        goto unwind;
    }

    __cs_inode_ctx_get(this, fd->inode, &ctx);

    state = __cs_get_file_state(this, fd->inode, ctx);
    if (state == GF_CS_ERROR) {
        gf_msg(this->name, GF_LOG_ERROR, 0, 0,
               "status is GF_CS_ERROR."
               " Aborting readv");
        local->op_ret = -1;
        local->op_errno = EREMOTE;
        ret = -1;
        goto unwind;
    }

    /* Serve readv from remote store only if it is remote. */
    gf_msg_debug(this->name, 0, "status of file %s is %d",
                 local->remotepath ? local->remotepath : "", state);

    /* We will reach this condition if local inode ctx had REMOTE
     * state when the control was in cs_readv but after stat
     * we got an updated state saying that the file is LOCAL.
     */
    if (state == GF_CS_LOCAL) {
        STACK_WIND(frame, cs_readv_cbk, FIRST_CHILD(this),
                   FIRST_CHILD(this)->fops->readv, fd, size, offset, flags,
                   xdata);
    } else if (state == GF_CS_REMOTE) {
        ret = cs_resume_remote_readv_postprocess(this, frame, fd->inode, offset,
                                                 size, flags);
        /* Failed to submit the remote readv fop to plugin */
        if (ret) {
            local->op_ret = -1;
            local->op_errno = EREMOTE;
            goto unwind;
        }
        /* When the file is in any other intermediate state,
         * we should not perform remote reads.
         */
    } else {
        local->op_ret = -1;
        local->op_errno = EINVAL;
        goto unwind;
    }

    return 0;

unwind:
    cs_common_cbk(frame);

    return 0;
}

int32_t
cs_readv(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
         off_t offset, uint32_t flags, dict_t *xdata)
{
    int op_errno = -1;
    cs_local_t *local = NULL;
    int ret = 0;
    cs_inode_ctx_t *ctx = NULL;
    gf_cs_obj_state state = -1;
    cs_private_t *priv = NULL;

    VALIDATE_OR_GOTO(frame, err);
    VALIDATE_OR_GOTO(this, err);
    VALIDATE_OR_GOTO(fd, err);

    priv = this->private;

    local = cs_local_init(this, frame, NULL, fd, GF_FOP_READ);
    if (!local) {
        gf_msg(this->name, GF_LOG_ERROR, 0, 0, "local init failed");
        op_errno = ENOMEM;
        goto err;
    }

    __cs_inode_ctx_get(this, fd->inode, &ctx);

    if (ctx)
        state = __cs_get_file_state(this, fd->inode, ctx);
    else
        state = GF_CS_LOCAL;

    local->xattr_req = xdata ? dict_ref(xdata) : (xdata = dict_new());

    ret = dict_set_uint32(local->xattr_req, GF_CS_OBJECT_STATUS, 1);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, 0,
               "dict_set failed key:"
               " %s",
               GF_CS_OBJECT_STATUS);
        goto err;
    }

    if (priv->remote_read) {
        local->stub = fop_readv_stub(frame, cs_resume_remote_readv, fd, size,
                                     offset, flags, xdata);
    } else {
        local->stub = fop_readv_stub(frame, cs_resume_readv, fd, size, offset,
                                     flags, xdata);
    }
    if (!local->stub) {
        gf_msg(this->name, GF_LOG_ERROR, 0, 0, "insufficient memory");
        op_errno = ENOMEM;
        goto err;
    }

    if (state == GF_CS_LOCAL) {
        STACK_WIND(frame, cs_readv_cbk, FIRST_CHILD(this),
                   FIRST_CHILD(this)->fops->readv, fd, size, offset, flags,
                   xdata);
    } else {
        local->call_cnt++;
        ret = locate_and_execute(frame);
        if (ret) {
            op_errno = ENOMEM;
            goto err;
        }
    }

    return 0;

err:
    CS_STACK_UNWIND(readv, frame, -1, op_errno, NULL, -1, NULL, NULL, NULL);

    return 0;
}

int
cs_resume_remote_readv_postprocess(xlator_t *this, call_frame_t *frame,
                                   inode_t *inode, off_t offset, size_t size,
                                   uint32_t flags)
{
    int ret = 0;

    ret = cs_serve_readv(frame, offset, size, flags);

    return ret;
}

int
cs_stat_check_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int op_ret,
                  int op_errno, struct iatt *stbuf, dict_t *xdata)
{
    cs_local_t *local = NULL;
    call_stub_t *stub = NULL;
    char *filepath = NULL;
    int ret = 0;
    inode_t *inode = NULL;
    uint64_t val = 0;

    local = frame->local;

    if (op_ret == -1) {
        local->op_ret = op_ret;
        local->op_errno = op_errno;
        gf_msg(this->name, GF_LOG_ERROR, 0, op_errno, "stat check failed");
        goto err;
    } else {
        if (local->fd)
            inode = local->fd->inode;
        else
            inode = local->loc.inode;

        if (!inode) {
            local->op_ret = -1;
            local->op_errno = EINVAL;
            gf_msg(this->name, GF_LOG_ERROR, 0, 0,
                   "null inode "
                   "returned");
            goto err;
        }

        ret = dict_get_uint64(xdata, GF_CS_OBJECT_STATUS, &val);
        if (ret == 0) {
            if (val == GF_CS_ERROR) {
                cs_inode_ctx_reset(this, inode);
                local->op_ret = -1;
                local->op_errno = EIO;
                gf_msg(this->name, GF_LOG_ERROR, 0, 0,
                       "status = GF_CS_ERROR. failed to get "
                       " file state");
                goto err;
            } else {
                ret = __cs_inode_ctx_update(this, inode, val);
                gf_msg_debug(this->name, 0, "status : %" PRIu64, val);
                if (ret) {
                    gf_msg(this->name, GF_LOG_ERROR, 0, 0, "ctx update failed");
                    local->op_ret = -1;
                    local->op_errno = ENOMEM;
                    goto err;
                }
            }
        } else {
            gf_msg_debug(this->name, 0, "status not found in dict");
            local->op_ret = -1;
            local->op_errno = ENOMEM;
            goto err;
        }

        ret = dict_get_str(xdata, GF_CS_OBJECT_REMOTE, &filepath);
        if (filepath) {
            gf_msg_debug(this->name, 0, "filepath returned %s", filepath);
            local->remotepath = gf_strdup(filepath);
            if (!local->remotepath) {
                local->op_ret = -1;
                local->op_errno = ENOMEM;
                goto err;
            }
        } else {
            gf_msg_debug(this->name, 0, "NULL filepath");
        }

        ret = cs_update_xattrs(frame, xdata);
        if (ret)
            goto err;

        local->op_ret = 0;
        local->xattr_rsp = dict_ref(xdata);
        memcpy(&local->stbuf, stbuf, sizeof(struct iatt));
    }

    stub = local->stub;
    local->stub = NULL;
    call_resume(stub);

    return 0;
err:
    cs_inodelk_unlock(frame);

    cs_common_cbk(frame);

    return 0;
}

int
cs_do_stat_check(call_frame_t *main_frame)
{
    cs_local_t *local = NULL;
    xlator_t *this = NULL;
    int ret = 0;

    local = main_frame->local;
    this = main_frame->this;

    ret = dict_set_uint32(local->xattr_req, GF_CS_OBJECT_REPAIR, 256);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, 0, "dict_set failed");
        goto err;
    }

    cs_set_xattr_req(main_frame);

    if (local->fd) {
        STACK_WIND(main_frame, cs_stat_check_cbk, FIRST_CHILD(this),
                   FIRST_CHILD(this)->fops->fstat, local->fd, local->xattr_req);
    } else {
        STACK_WIND(main_frame, cs_stat_check_cbk, FIRST_CHILD(this),
                   FIRST_CHILD(this)->fops->stat, &local->loc,
                   local->xattr_req);
    }

    return 0;

err:
    cs_inodelk_unlock(main_frame);

    cs_common_cbk(main_frame);

    return 0;
}

void
cs_common_cbk(call_frame_t *frame)
{
    glusterfs_fop_t fop = -1;
    cs_local_t *local = NULL;

    local = frame->local;

    fop = local->fop;

    /*Note: Only the failure case needs to be handled here. Since for
     * successful stat check the fop will resume anyway. The unwind can
     * happen from the fop_cbk and each cbk can unlock the inodelk in case
     * a lock was taken before. The lock status can be stored in frame */

    /* for failure case  */

    /*TODO: add other fops*/
    switch (fop) {
        case GF_FOP_WRITE:
            CS_STACK_UNWIND(writev, frame, local->op_ret, local->op_errno, NULL,
                            NULL, NULL);
            break;

        case GF_FOP_SETXATTR:
            CS_STACK_UNWIND(setxattr, frame, local->op_ret, local->op_errno,
                            NULL);
            break;
        case GF_FOP_READ:
            CS_STACK_UNWIND(readv, frame, local->op_ret, local->op_errno, NULL,
                            0, NULL, NULL, NULL);
            break;
        case GF_FOP_FTRUNCATE:
            CS_STACK_UNWIND(ftruncate, frame, local->op_ret, local->op_errno,
                            NULL, NULL, NULL);
            break;

        case GF_FOP_TRUNCATE:
            CS_STACK_UNWIND(truncate, frame, local->op_ret, local->op_errno,
                            NULL, NULL, NULL);
            break;
        default:
            break;
    }

    return;
}

int
cs_blocking_inodelk_cbk(call_frame_t *lock_frame, void *cookie, xlator_t *this,
                        int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
    cs_local_t *main_local = NULL;
    call_frame_t *main_frame = NULL;
    cs_local_t *lock_local = NULL;

    lock_local = lock_frame->local;

    main_frame = lock_local->main_frame;
    main_local = main_frame->local;

    if (op_ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, 0, "inodelk failed");
        main_local->op_errno = op_errno;
        main_local->op_ret = op_ret;
        goto err;
    }

    main_local->locked = _gf_true;

    cs_lock_wipe(lock_frame);

    cs_do_stat_check(main_frame);

    return 0;
err:
    cs_common_cbk(main_frame);

    cs_lock_wipe(lock_frame);

    return 0;
}

int
cs_build_loc(loc_t *loc, call_frame_t *frame)
{
    cs_local_t *local = NULL;
    int ret = -1;

    local = frame->local;

    if (local->fd) {
        loc->inode = inode_ref(local->fd->inode);
        if (loc->inode) {
            gf_uuid_copy(loc->gfid, loc->inode->gfid);
            ret = 0;
            goto out;
        } else {
            ret = -1;
            goto out;
        }
    } else {
        loc->inode = inode_ref(local->loc.inode);
        if (loc->inode) {
            gf_uuid_copy(loc->gfid, loc->inode->gfid);
            ret = 0;
            goto out;
        } else {
            ret = -1;
            goto out;
        }
    }
out:
    return ret;
}

int
cs_blocking_inodelk(call_frame_t *parent_frame)
{
    call_frame_t *lock_frame = NULL;
    cs_local_t *lock_local = NULL;
    xlator_t *this = NULL;
    struct gf_flock flock = {
        0,
    };
    int ret = 0;

    this = parent_frame->this;

    lock_frame = cs_lock_frame(parent_frame);
    if (!lock_frame) {
        gf_msg(this->name, GF_LOG_ERROR, 0, 0, "insuffcient memory");
        goto err;
    }

    lock_local = cs_local_init(this, lock_frame, NULL, NULL, 0);
    if (!lock_local) {
        gf_msg(this->name, GF_LOG_ERROR, 0, 0, "local init failed");
        goto err;
    }

    lock_local->main_frame = parent_frame;

    flock.l_type = F_WRLCK;

    ret = cs_build_loc(&lock_local->loc, parent_frame);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, 0, "build_loc failed");
        goto err;
    }

    STACK_WIND(lock_frame, cs_blocking_inodelk_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->inodelk, CS_LOCK_DOMAIN,
               &lock_local->loc, F_SETLKW, &flock, NULL);

    return 0;
err:
    if (lock_frame)
        cs_lock_wipe(lock_frame);

    return -1;
}

int
locate_and_execute(call_frame_t *frame)
{
    int ret = 0;

    ret = cs_blocking_inodelk(frame);

    if (ret)
        return -1;
    else
        return 0;
}

int32_t
cs_resume_truncate(call_frame_t *frame, xlator_t *this, loc_t *loc,
                   off_t offset, dict_t *xattr_req)
{
    cs_local_t *local = NULL;
    int ret = 0;

    local = frame->local;

    ret = cs_resume_postprocess(this, frame, loc->inode);
    if (ret) {
        goto unwind;
    }

    cs_inodelk_unlock(frame);

    STACK_WIND(frame, cs_truncate_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->truncate, loc, offset,
               local->xattr_req);

    return 0;

unwind:
    cs_inodelk_unlock(frame);

    cs_common_cbk(frame);

    return 0;
}

int32_t
cs_resume_setxattr(call_frame_t *frame, xlator_t *this, loc_t *loc,
                   dict_t *dict, int32_t flags, dict_t *xdata)
{
    cs_local_t *local = NULL;
    cs_inode_ctx_t *ctx = NULL;
    gf_cs_obj_state state = GF_CS_ERROR;

    local = frame->local;

    __cs_inode_ctx_get(this, loc->inode, &ctx);

    state = __cs_get_file_state(this, loc->inode, ctx);

    if (state == GF_CS_ERROR) {
        /* file is already remote */
        local->op_ret = -1;
        local->op_errno = EINVAL;
        gf_msg(this->name, GF_LOG_WARNING, 0, 0,
               "file %s , could not figure file state", loc->path);
        goto unwind;
    }

    if (state == GF_CS_REMOTE) {
        /* file is already remote */
        local->op_ret = -1;
        local->op_errno = EINVAL;
        gf_msg(this->name, GF_LOG_WARNING, 0, EINVAL,
               "file %s is already remote", loc->path);
        goto unwind;
    }

    if (state == GF_CS_DOWNLOADING) {
        gf_msg(this->name, GF_LOG_WARNING, 0, 0,
               " file is in downloading state.");
        local->op_ret = -1;
        local->op_errno = EINVAL;
        goto unwind;
    }

    STACK_WIND(frame, cs_setxattr_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->setxattr, loc, dict, flags,
               local->xattr_req);

    return 0;
unwind:
    cs_inodelk_unlock(frame);

    cs_common_cbk(frame);

    return 0;
}

gf_cs_obj_state
__cs_get_file_state(xlator_t *this, inode_t *inode, cs_inode_ctx_t *ctx)
{
    gf_cs_obj_state state = -1;

    if (!ctx)
        return GF_CS_ERROR;

    LOCK(&inode->lock);
    {
        state = ctx->state;
    }
    UNLOCK(&inode->lock);

    return state;
}

void
__cs_inode_ctx_get(xlator_t *this, inode_t *inode, cs_inode_ctx_t **ctx)
{
    uint64_t ctxint = 0;
    int ret = 0;

    LOCK(&inode->lock);
    {
        ret = __inode_ctx_get(inode, this, &ctxint);
    }
    UNLOCK(&inode->lock);

    if (ret)
        *ctx = NULL;
    else
        *ctx = (cs_inode_ctx_t *)(uintptr_t)ctxint;

    return;
}

int
__cs_inode_ctx_update(xlator_t *this, inode_t *inode, uint64_t val)
{
    cs_inode_ctx_t *ctx = NULL;
    uint64_t ctxint = 0;
    int ret = 0;

    LOCK(&inode->lock);
    {
        ret = __inode_ctx_get(inode, this, &ctxint);
        if (ret) {
            ctx = GF_CALLOC(1, sizeof(*ctx), gf_cs_mt_cs_inode_ctx_t);
            if (!ctx) {
                gf_msg(this->name, GF_LOG_ERROR, 0, 0, "ctx allocation failed");
                ret = -1;
                goto out;
            }

            ctx->state = val;

            ctxint = (uint64_t)(uintptr_t)ctx;

            ret = __inode_ctx_set(inode, this, &ctxint);
            if (ret) {
                GF_FREE(ctx);
                goto out;
            }
        } else {
            ctx = (cs_inode_ctx_t *)(uintptr_t)ctxint;

            ctx->state = val;
        }
    }

out:
    UNLOCK(&inode->lock);

    return ret;
}

int
cs_inode_ctx_reset(xlator_t *this, inode_t *inode)
{
    cs_inode_ctx_t *ctx = NULL;
    uint64_t ctxint = 0;

    inode_ctx_del(inode, this, &ctxint);
    if (!ctxint) {
        return 0;
    }

    ctx = (cs_inode_ctx_t *)(uintptr_t)ctxint;

    GF_FREE(ctx);
    return 0;
}

int
cs_resume_postprocess(xlator_t *this, call_frame_t *frame, inode_t *inode)
{
    cs_local_t *local = NULL;
    gf_cs_obj_state state = -1;
    cs_inode_ctx_t *ctx = NULL;
    int ret = 0;

    local = frame->local;
    if (!local) {
        ret = -1;
        goto out;
    }

    __cs_inode_ctx_get(this, inode, &ctx);

    state = __cs_get_file_state(this, inode, ctx);
    if (state == GF_CS_ERROR) {
        gf_msg(this->name, GF_LOG_ERROR, 0, 0,
               "status is GF_CS_ERROR."
               " Aborting write");
        local->op_ret = -1;
        local->op_errno = EREMOTE;
        ret = -1;
        goto out;
    }

    if (state == GF_CS_REMOTE || state == GF_CS_DOWNLOADING) {
        gf_msg_debug(this->name, 0, "status is %d", state);
        ret = cs_download(frame);
        if (ret == 0) {
            gf_msg_debug(this->name, 0, "Winding for Final Write");
        } else {
            gf_msg(this->name, GF_LOG_ERROR, 0, 0,
                   " download failed, unwinding writev");
            local->op_ret = -1;
            local->op_errno = EREMOTE;
            ret = -1;
        }
    }
out:
    return ret;
}

int32_t
__cs_get_dict_str(char **str, dict_t *xattr, const char *name, int *errnum)
{
    data_t *data = NULL;
    int ret = -1;

    assert(str != NULL);

    data = dict_get(xattr, (char *)name);
    if (!data) {
        *errnum = ENODATA;
        goto out;
    }

    *str = GF_CALLOC(data->len + 1, sizeof(char), gf_common_mt_char);
    if (!(*str)) {
        *errnum = ENOMEM;
        goto out;
    }

    memcpy(*str, data->data, sizeof(char) * (data->len));
    return 0;

out:
    return ret;
}

int32_t
__cs_get_dict_uuid(uuid_t uuid, dict_t *xattr, const char *name, int *errnum)
{
    data_t *data = NULL;
    int ret = -1;

    assert(uuid != NULL);

    data = dict_get(xattr, (char *)name);
    if (!data) {
        *errnum = ENODATA;
        goto out;
    }

    assert(data->len == sizeof(uuid_t));

    gf_uuid_copy(uuid, (unsigned char *)data->data);
    return 0;

out:
    return ret;
}

int32_t
cs_fdctx_to_dict(xlator_t *this, fd_t *fd, dict_t *dict)
{
    return 0;
}

int32_t
cs_inode(xlator_t *this)
{
    return 0;
}

int32_t
cs_inode_to_dict(xlator_t *this, dict_t *dict)
{
    return 0;
}

int32_t
cs_history(xlator_t *this)
{
    return 0;
}

int32_t
cs_fd(xlator_t *this)
{
    return 0;
}

int32_t
cs_fd_to_dict(xlator_t *this, dict_t *dict)
{
    return 0;
}

int32_t
cs_fdctx(xlator_t *this, fd_t *fd)
{
    return 0;
}

int32_t
cs_inodectx(xlator_t *this, inode_t *ino)
{
    return 0;
}

int32_t
cs_inodectx_to_dict(xlator_t *this, inode_t *ino, dict_t *dict)
{
    return 0;
}

int32_t
cs_priv_to_dict(xlator_t *this, dict_t *dict, char *brickname)
{
    return 0;
}

int32_t
cs_priv(xlator_t *this)
{
    return 0;
}

int
cs_notify(xlator_t *this, int event, void *data, ...)
{
    return default_notify(this, event, data);
}

struct xlator_fops cs_fops = {
    .stat = cs_stat,
    .readdirp = cs_readdirp,
    .truncate = cs_truncate,
    .seek = cs_seek,
    .statfs = cs_statfs,
    .fallocate = cs_fallocate,
    .discard = cs_discard,
    .getxattr = cs_getxattr,
    .writev = cs_writev,
    .setxattr = cs_setxattr,
    .fgetxattr = cs_fgetxattr,
    .lookup = cs_lookup,
    .fsetxattr = cs_fsetxattr,
    .readv = cs_readv,
    .ftruncate = cs_ftruncate,
    .rchecksum = cs_rchecksum,
    .unlink = cs_unlink,
    .open = cs_open,
    .fstat = cs_fstat,
    .zerofill = cs_zerofill,
};

struct xlator_cbks cs_cbks = {
    .forget = cs_forget,
};

struct xlator_dumpops cs_dumpops = {
    .fdctx_to_dict = cs_fdctx_to_dict,
    .inode = cs_inode,
    .inode_to_dict = cs_inode_to_dict,
    .history = cs_history,
    .fd = cs_fd,
    .fd_to_dict = cs_fd_to_dict,
    .fdctx = cs_fdctx,
    .inodectx = cs_inodectx,
    .inodectx_to_dict = cs_inodectx_to_dict,
    .priv_to_dict = cs_priv_to_dict,
    .priv = cs_priv,
};

struct volume_options cs_options[] = {
    {.key = {"cloudsync-storetype"},
     .type = GF_OPTION_TYPE_STR,
     .description = "Defines which remote store is enabled"},
    {.key = {"cloudsync-remote-read"},
     .type = GF_OPTION_TYPE_BOOL,
     .description = "Defines a remote read fop when on"},
    {.key = {"cloudsync-store-id"},
     .type = GF_OPTION_TYPE_STR,
     .description = "Defines a volume wide store id"},
    {.key = {"cloudsync-product-id"},
     .type = GF_OPTION_TYPE_STR,
     .description = "Defines a volume wide product id"},
    {.key = {NULL}},
};

xlator_api_t xlator_api = {
    .init = cs_init,
    .fini = cs_fini,
    .notify = cs_notify,
    .reconfigure = cs_reconfigure,
    .mem_acct_init = cs_mem_acct_init,
    .dumpops = &cs_dumpops,
    .fops = &cs_fops,
    .cbks = &cs_cbks,
    .options = cs_options,
    .identifier = "cloudsync",
    .category = GF_TECH_PREVIEW,
};