Blob Blame History Raw
#include <stdlib.h>
#include <glusterfs/xlator.h>
#include <glusterfs/glusterfs.h>
#include "libcvlt.h"
#include "cloudsync-common.h"
#include "cvlt-messages.h"

#define LIBARCHIVE_SO "libopenarchive.so"
#define ALIGN_SIZE 4096
#define CVLT_TRAILER "cvltv1"

store_methods_t store_ops = {
    .fop_download = cvlt_download,
    .fop_init = cvlt_init,
    .fop_reconfigure = cvlt_reconfigure,
    .fop_fini = cvlt_fini,
    .fop_remote_read = cvlt_read,
};

static const int32_t num_req = 32;
static const int32_t num_iatt = 32;
static char *plugin = "cvlt_cloudSync";

int32_t
mem_acct_init(xlator_t *this)
{
    int ret = -1;

    if (!this)
        return ret;

    ret = xlator_mem_acct_init(this, gf_libcvlt_mt_end + 1);

    if (ret != 0) {
        return ret;
    }

    return ret;
}

static void
cvlt_free_resources(archive_t *arch)
{
    /*
     * We will release all the resources that were allocated by the xlator.
     * Check whether there are any buffers which have not been released
     * back to a mempool.
     */

    if (arch->handle) {
        dlclose(arch->handle);
    }

    if (arch->iobuf_pool) {
        iobuf_pool_destroy(arch->iobuf_pool);
    }

    if (arch->req_pool) {
        mem_pool_destroy(arch->req_pool);
        arch->req_pool = NULL;
    }

    return;
}

static int32_t
cvlt_extract_store_fops(xlator_t *this, archive_t *arch)
{
    int32_t op_ret = -1;
    get_archstore_methods_t get_archstore_methods;

    /*
     * libopenarchive.so defines methods for performing data management
     * operations. We will extract the methods from library and these
     * methods will be invoked for moving data between glusterfs volume
     * and the data management product.
     */

    VALIDATE_OR_GOTO(arch, err);

    arch->handle = dlopen(LIBARCHIVE_SO, RTLD_NOW);
    if (!arch->handle) {
        gf_msg(plugin, GF_LOG_ERROR, 0, CVLT_DLOPEN_FAILED,
               " failed to open %s ", LIBARCHIVE_SO);
        return op_ret;
    }

    dlerror(); /* Clear any existing error */

    get_archstore_methods = dlsym(arch->handle, "get_archstore_methods");
    if (!get_archstore_methods) {
        gf_msg(plugin, GF_LOG_ERROR, 0, CVLT_EXTRACTION_FAILED,
               " Error extracting get_archstore_methods()");
        dlclose(arch->handle);
        arch->handle = NULL;
        return op_ret;
    }

    op_ret = get_archstore_methods(&(arch->fops));
    if (op_ret) {
        gf_msg(plugin, GF_LOG_ERROR, 0, CVLT_EXTRACTION_FAILED,
               " Failed to extract methods in get_archstore_methods");
        dlclose(arch->handle);
        arch->handle = NULL;
        return op_ret;
    }

err:
    return op_ret;
}

static int32_t
cvlt_alloc_resources(xlator_t *this, archive_t *arch, int num_req, int num_iatt)
{
    /*
     * Initialize information about all the memory pools that will be
     * used by this xlator.
     */
    arch->nreqs = 0;

    arch->req_pool = NULL;

    arch->handle = NULL;
    arch->xl = this;

    arch->req_pool = mem_pool_new(cvlt_request_t, num_req);
    if (!arch->req_pool) {
        goto err;
    }

    arch->iobuf_pool = iobuf_pool_new();
    if (!arch->iobuf_pool) {
        goto err;
    }

    if (cvlt_extract_store_fops(this, arch)) {
        goto err;
    }

    return 0;

err:

    return -1;
}

static void
cvlt_req_init(cvlt_request_t *req)
{
    sem_init(&(req->sem), 0, 0);

    return;
}

static void
cvlt_req_destroy(cvlt_request_t *req)
{
    if (req->iobuf) {
        iobuf_unref(req->iobuf);
    }

    if (req->iobref) {
        iobref_unref(req->iobref);
    }

    sem_destroy(&(req->sem));

    return;
}

static cvlt_request_t *
cvlt_alloc_req(archive_t *arch)
{
    cvlt_request_t *reqptr = NULL;

    if (!arch) {
        goto err;
    }

    if (arch->req_pool) {
        reqptr = mem_get0(arch->req_pool);
        if (reqptr) {
            cvlt_req_init(reqptr);
        }
    }

    if (reqptr) {
        LOCK(&(arch->lock));
        arch->nreqs++;
        UNLOCK(&(arch->lock));
    }

err:
    return reqptr;
}

static int32_t
cvlt_free_req(archive_t *arch, cvlt_request_t *reqptr)
{
    if (!reqptr) {
        goto err;
    }

    if (!arch) {
        goto err;
    }

    if (arch->req_pool) {
        /*
         * Free the request resources if they exist.
         */

        cvlt_req_destroy(reqptr);
        mem_put(reqptr);

        LOCK(&(arch->lock));
        arch->nreqs--;
        UNLOCK(&(arch->lock));
    }

    return 0;

err:
    return -1;
}

static int32_t
cvlt_init_xlator(xlator_t *this, archive_t *arch, int num_req, int num_iatt)
{
    int32_t ret = -1;
    int32_t errnum = -1;
    int32_t locked = 0;

    /*
     * Perform all the initializations needed for brining up the xlator.
     */
    if (!arch) {
        goto err;
    }

    LOCK_INIT(&(arch->lock));
    LOCK(&(arch->lock));

    locked = 1;

    ret = cvlt_alloc_resources(this, arch, num_req, num_iatt);

    if (ret) {
        goto err;
    }

    /*
     * Now that the fops have been extracted initialize the store
     */
    ret = arch->fops.init(&(arch->descinfo), &errnum, plugin);
    if (ret) {
        goto err;
    }

    UNLOCK(&(arch->lock));
    locked = 0;
    ret = 0;

    return ret;

err:
    cvlt_free_resources(arch);

    if (locked) {
        UNLOCK(&(arch->lock));
    }

    return ret;
}

static int32_t
cvlt_term_xlator(archive_t *arch)
{
    int32_t errnum = -1;

    if (!arch) {
        goto err;
    }

    LOCK(&(arch->lock));

    /*
     * Release the resources that have been allocated inside store
     */
    arch->fops.fini(&(arch->descinfo), &errnum);

    cvlt_free_resources(arch);

    UNLOCK(&(arch->lock));

    GF_FREE(arch);

    return 0;

err:
    return -1;
}

static int32_t
cvlt_init_store_info(archive_t *priv, archstore_info_t *store_info)
{
    if (!store_info) {
        return -1;
    }

    store_info->prod = priv->product_id;
    store_info->prodlen = strlen(priv->product_id);

    store_info->id = priv->store_id;
    store_info->idlen = strlen(priv->store_id);

    return 0;
}

static int32_t
cvlt_init_file_info(cs_loc_xattr_t *xattr, archstore_fileinfo_t *file_info)
{
    if (!xattr || !file_info) {
        return -1;
    }

    gf_uuid_copy(file_info->uuid, xattr->uuid);
    file_info->path = xattr->file_path;
    file_info->pathlength = strlen(xattr->file_path);

    return 0;
}

static int32_t
cvlt_init_gluster_store_info(cs_loc_xattr_t *xattr,
                             archstore_info_t *store_info)
{
    static char *product = "glusterfs";

    if (!xattr || !store_info) {
        return -1;
    }

    store_info->prod = product;
    store_info->prodlen = strlen(product);

    store_info->id = xattr->volname;
    store_info->idlen = strlen(xattr->volname);

    return 0;
}

static int32_t
cvlt_init_gluster_file_info(cs_loc_xattr_t *xattr,
                            archstore_fileinfo_t *file_info)
{
    if (!xattr || !file_info) {
        return -1;
    }

    gf_uuid_copy(file_info->uuid, xattr->gfid);
    file_info->path = xattr->file_path;
    file_info->pathlength = strlen(xattr->file_path);

    return 0;
}

static void
cvlt_copy_stat_info(struct iatt *buf, cs_size_xattr_t *xattrs)
{
    /*
     * If the file was archived then the reported size will not be a
     * correct one. We need to fix this.
     */
    if (buf && xattrs) {
        buf->ia_size = xattrs->size;
        buf->ia_blksize = xattrs->blksize;
        buf->ia_blocks = xattrs->blocks;
    }

    return;
}

static void
cvlt_readv_complete(archstore_desc_t *desc, app_callback_info_t *cbkinfo,
                    void *cookie, int64_t op_ret, int32_t op_errno)
{
    struct iovec iov;
    xlator_t *this = NULL;
    struct iatt postbuf = {
        0,
    };
    call_frame_t *frame = NULL;
    cvlt_request_t *req = (cvlt_request_t *)cookie;
    cs_local_t *local = NULL;
    cs_private_t *cspriv = NULL;
    archive_t *priv = NULL;

    frame = req->frame;
    this = frame->this;
    local = frame->local;

    cspriv = this->private;
    priv = (archive_t *)cspriv->stores->config;

    if (strcmp(priv->trailer, CVLT_TRAILER)) {
        op_ret = -1;
        op_errno = EINVAL;
        goto out;
    }

    gf_msg_debug(plugin, 0,
                 " Read callback invoked offset:%" PRIu64 "bytes: %" PRIu64
                 " op : %d ret : %" PRId64 " errno : %d",
                 req->offset, req->bytes, req->op_type, op_ret, op_errno);

    if (op_ret < 0) {
        goto out;
    }

    req->iobref = iobref_new();
    if (!req->iobref) {
        op_ret = -1;
        op_errno = ENOMEM;
        goto out;
    }

    iobref_add(req->iobref, req->iobuf);
    iov.iov_base = iobuf_ptr(req->iobuf);
    iov.iov_len = op_ret;

    cvlt_copy_stat_info(&postbuf, &(req->szxattr));

    /*
     * Hack to notify higher layers of EOF.
     */
    if (!postbuf.ia_size || (req->offset + iov.iov_len >= postbuf.ia_size)) {
        gf_msg_debug(plugin, 0, " signalling end-of-file for uuid=%s",
                     uuid_utoa(req->file_info.uuid));
        op_errno = ENOENT;
    }

out:

    STACK_UNWIND_STRICT(readv, frame, op_ret, op_errno, &iov, 1, &postbuf,
                        req->iobref, local->xattr_rsp);

    if (req) {
        cvlt_free_req(priv, req);
    }

    return;
}

static void
cvlt_download_complete(archstore_desc_t *store, app_callback_info_t *cbk_info,
                       void *cookie, int64_t ret, int errcode)
{
    cvlt_request_t *req = (cvlt_request_t *)cookie;

    gf_msg_debug(plugin, 0,
                 " Download callback invoked  ret : %" PRId64 " errno : %d",
                 ret, errcode);

    req->op_ret = ret;
    req->op_errno = errcode;
    sem_post(&(req->sem));

    return;
}

void *
cvlt_init(xlator_t *this)
{
    int ret = 0;
    archive_t *priv = NULL;

    if (!this->children || this->children->next) {
        gf_msg(plugin, GF_LOG_ERROR, ENOMEM, 0,
               "should have exactly one child");
        ret = -1;
        goto out;
    }

    if (!this->parents) {
        gf_msg(plugin, GF_LOG_ERROR, ENOMEM, 0,
               "dangling volume. check volfile");
        ret = -1;
        goto out;
    }

    priv = GF_CALLOC(1, sizeof(archive_t), gf_libcvlt_mt_cvlt_private_t);
    if (!priv) {
        ret = -1;
        goto out;
    }

    priv->trailer = CVLT_TRAILER;
    if (cvlt_init_xlator(this, priv, num_req, num_iatt)) {
        gf_msg(plugin, GF_LOG_ERROR, ENOMEM, 0, "xlator init failed");
        ret = -1;
        goto out;
    }

    GF_OPTION_INIT("cloudsync-store-id", priv->store_id, str, out);
    GF_OPTION_INIT("cloudsync-product-id", priv->product_id, str, out);

    gf_msg(plugin, GF_LOG_INFO, 0, 0,
           "store id is : %s "
           "product id is : %s.",
           priv->store_id, priv->product_id);
out:
    if (ret == -1) {
        cvlt_term_xlator(priv);
        return (NULL);
    }
    return priv;
}

int
cvlt_reconfigure(xlator_t *this, dict_t *options)
{
    cs_private_t *cspriv = NULL;
    archive_t *priv = NULL;

    cspriv = this->private;
    priv = (archive_t *)cspriv->stores->config;

    if (strcmp(priv->trailer, CVLT_TRAILER))
        goto out;

    GF_OPTION_RECONF("cloudsync-store-id", priv->store_id, options, str, out);

    GF_OPTION_RECONF("cloudsync-product-id", priv->product_id, options, str,
                     out);
    gf_msg_debug(plugin, 0,
                 "store id is : %s "
                 "product id is : %s.",
                 priv->store_id, priv->product_id);
    return 0;
out:
    return -1;
}

void
cvlt_fini(void *config)
{
    archive_t *priv = NULL;

    priv = (archive_t *)config;

    if (strcmp(priv->trailer, CVLT_TRAILER))
        return;

    cvlt_term_xlator(priv);
    gf_msg(plugin, GF_LOG_INFO, 0, CVLT_FREE, " released xlator resources");
    return;
}

int
cvlt_download(call_frame_t *frame, void *config)
{
    archive_t *parch = NULL;
    cs_local_t *local = frame->local;
    cs_loc_xattr_t *locxattr = local->xattrinfo.lxattr;
    cvlt_request_t *req = NULL;
    archstore_info_t dest_storeinfo;
    archstore_fileinfo_t dest_fileinfo;
    int32_t op_ret, op_errno;

    parch = (archive_t *)config;

    if (strcmp(parch->trailer, CVLT_TRAILER)) {
        op_ret = -1;
        op_errno = EINVAL;
        goto err;
    }

    gf_msg_debug(plugin, 0, " download invoked for uuid = %s  gfid=%s ",
                 locxattr->uuid, uuid_utoa(locxattr->gfid));

    if (!(parch->fops.restore)) {
        op_errno = ELIBBAD;
        goto err;
    }

    /*
     * Download needs to be processed. Allocate a request.
     */
    req = cvlt_alloc_req(parch);

    if (!req) {
        gf_msg(plugin, GF_LOG_ERROR, ENOMEM, CVLT_RESOURCE_ALLOCATION_FAILED,
               " failed to allocated request for gfid=%s",
               uuid_utoa(locxattr->gfid));
        op_errno = ENOMEM;
        goto err;
    }

    /*
     * Initialize the request object.
     */
    req->op_type = CVLT_RESTORE_OP;
    req->frame = frame;

    /*
     * The file is currently residing inside a data management store.
     * To restore the file contents we need to provide the information
     * about data management store.
     */
    op_ret = cvlt_init_store_info(parch, &(req->store_info));
    if (op_ret < 0) {
        gf_msg(plugin, GF_LOG_ERROR, 0, CVLT_EXTRACTION_FAILED,
               " failed to extract store info for gfid=%s",
               uuid_utoa(locxattr->gfid));
        goto err;
    }

    op_ret = cvlt_init_file_info(locxattr, &(req->file_info));
    if (op_ret < 0) {
        gf_msg(plugin, GF_LOG_ERROR, 0, CVLT_EXTRACTION_FAILED,
               " failed to extract file info for gfid=%s",
               uuid_utoa(locxattr->gfid));
        goto err;
    }

    /*
     * We need t perform in-place restore of the file from data managment
     * store to gusterfs volume.
     */
    op_ret = cvlt_init_gluster_store_info(locxattr, &dest_storeinfo);
    if (op_ret < 0) {
        gf_msg(plugin, GF_LOG_ERROR, 0, CVLT_EXTRACTION_FAILED,
               " failed to extract destination store info for gfid=%s",
               uuid_utoa(locxattr->gfid));
        goto err;
    }

    op_ret = cvlt_init_gluster_file_info(locxattr, &dest_fileinfo);
    if (op_ret < 0) {
        gf_msg(plugin, GF_LOG_ERROR, 0, CVLT_EXTRACTION_FAILED,
               " failed to extract file info for gfid=%s",
               uuid_utoa(locxattr->gfid));
        goto err;
    }

    /*
     * Submit the restore request.
     */
    op_ret = parch->fops.restore(&(parch->descinfo), &(req->store_info),
                                 &(req->file_info), &dest_storeinfo,
                                 &dest_fileinfo, &op_errno,
                                 cvlt_download_complete, req);
    if (op_ret < 0) {
        gf_msg(plugin, GF_LOG_ERROR, 0, CVLT_RESTORE_FAILED,
               " failed to restore file gfid=%s from data managment store",
               uuid_utoa(locxattr->gfid));
        goto err;
    }

    /*
     * Wait for the restore to complete.
     */
    sem_wait(&(req->sem));

    if (req->op_ret < 0) {
        gf_msg(plugin, GF_LOG_ERROR, 0, CVLT_RESTORE_FAILED,
               " restored failed for gfid=%s", uuid_utoa(locxattr->gfid));
        goto err;
    }

    if (req) {
        cvlt_free_req(parch, req);
    }

    return 0;

err:

    if (req) {
        cvlt_free_req(parch, req);
    }

    return -1;
}

int
cvlt_read(call_frame_t *frame, void *config)
{
    int32_t op_ret = -1;
    int32_t op_errno = 0;
    archive_t *parch = NULL;
    cvlt_request_t *req = NULL;
    struct iovec iov = {
        0,
    };
    struct iobref *iobref;
    size_t size = 0;
    off_t off = 0;

    cs_local_t *local = frame->local;
    cs_loc_xattr_t *locxattr = local->xattrinfo.lxattr;

    size = local->xattrinfo.size;
    off = local->xattrinfo.offset;

    parch = (archive_t *)config;

    if (strcmp(parch->trailer, CVLT_TRAILER)) {
        op_ret = -1;
        op_errno = EINVAL;
        goto err;
    }

    gf_msg_debug(plugin, 0,
                 " read invoked for gfid = %s offset = %" PRIu64
                 " file_size = %" PRIu64,
                 uuid_utoa(locxattr->gfid), off, local->stbuf.ia_size);

    if (off >= local->stbuf.ia_size) {
        /*
         * Hack to notify higher layers of EOF.
         */

        op_errno = ENOENT;
        op_ret = 0;

        gf_msg(plugin, GF_LOG_ERROR, 0, CVLT_READ_FAILED,
               " reporting end-of-file for gfid=%s", uuid_utoa(locxattr->gfid));

        goto err;
    }

    if (!size) {
        op_errno = EINVAL;

        gf_msg(plugin, GF_LOG_ERROR, 0, CVLT_READ_FAILED,
               " zero size read attempted on gfid=%s",
               uuid_utoa(locxattr->gfid));
        goto err;
    }

    if (!(parch->fops.read)) {
        op_errno = ELIBBAD;
        goto err;
    }

    /*
     * The read request need to be processed. Allocate a request.
     */
    req = cvlt_alloc_req(parch);

    if (!req) {
        gf_msg(plugin, GF_LOG_ERROR, ENOMEM, CVLT_NO_MEMORY,
               " failed to allocated request for gfid=%s",
               uuid_utoa(locxattr->gfid));
        op_errno = ENOMEM;
        goto err;
    }

    req->iobuf = iobuf_get_page_aligned(parch->iobuf_pool, size, ALIGN_SIZE);
    if (!req->iobuf) {
        op_errno = ENOMEM;
        goto err;
    }

    /*
     * Initialize the request object.
     */
    req->op_type = CVLT_READ_OP;
    req->offset = off;
    req->bytes = size;
    req->frame = frame;
    req->szxattr.size = local->stbuf.ia_size;
    req->szxattr.blocks = local->stbuf.ia_blocks;
    req->szxattr.blksize = local->stbuf.ia_blksize;

    /*
     * The file is currently residing inside a data management store.
     * To read the file contents we need to provide the information
     * about data management store.
     */
    op_ret = cvlt_init_store_info(parch, &(req->store_info));
    if (op_ret < 0) {
        gf_msg(plugin, GF_LOG_ERROR, 0, CVLT_EXTRACTION_FAILED,
               " failed to extract store info for gfid=%s"
               " offset=%" PRIu64 " size=%" GF_PRI_SIZET
               ", "
               " buf=%p",
               uuid_utoa(locxattr->gfid), off, size, req->iobuf->ptr);
        goto err;
    }

    op_ret = cvlt_init_file_info(locxattr, &(req->file_info));
    if (op_ret < 0) {
        gf_msg(plugin, GF_LOG_ERROR, 0, CVLT_EXTRACTION_FAILED,
               " failed to extract file info for gfid=%s"
               " offset=%" PRIu64 " size=%" GF_PRI_SIZET
               ", "
               " buf=%p",
               uuid_utoa(locxattr->gfid), off, size, req->iobuf->ptr);
        goto err;
    }

    /*
     * Submit the read request.
     */
    op_ret = parch->fops.read(&(parch->descinfo), &(req->store_info),
                              &(req->file_info), off, req->iobuf->ptr, size,
                              &op_errno, cvlt_readv_complete, req);

    if (op_ret < 0) {
        gf_msg(plugin, GF_LOG_ERROR, 0, CVLT_EXTRACTION_FAILED,
               " read failed on gfid=%s"
               " offset=%" PRIu64 " size=%" GF_PRI_SIZET
               ", "
               " buf=%p",
               uuid_utoa(locxattr->gfid), off, size, req->iobuf->ptr);
        goto err;
    }

    return 0;

err:

    iobref = iobref_new();
    gf_msg_debug(plugin, 0, " read unwinding stack op_ret = %d, op_errno = %d",
                 op_ret, op_errno);

    STACK_UNWIND_STRICT(readv, frame, op_ret, op_errno, &iov, 1,
                        &(local->stbuf), iobref, local->xattr_rsp);

    if (iobref) {
        iobref_unref(iobref);
    }

    if (req) {
        cvlt_free_req(parch, req);
    }

    return 0;
}