Blob Blame History Raw
/*
   Copyright (c) 2014 Red Hat, Inc. <http://www.redhat.com>
   This file is part of GlusterFS.

   This file is licensed to you under your choice of the GNU Lesser
   General Public License, version 3 or any later version (LGPLv3 or
   later), or the GNU General Public License, version 2 (GPLv2), in all
   cases as published by the Free Software Foundation.
*/

#include "barrier.h"
#include <glusterfs/defaults.h>
#include <glusterfs/call-stub.h>

#include <glusterfs/statedump.h>

/*
 * Keep a heap copy of @gfid in frame->local.
 *
 * barrier_dump_stub() prints this copy in statedumps and
 * barrier_local_free_gfid() releases it. When the allocation fails only a
 * warning is logged and frame->local is left as it was.
 */
void
barrier_local_set_gfid(call_frame_t *frame, uuid_t gfid, xlator_t *this)
{
    uuid_t *copy = NULL;

    if (!gfid)
        return;

    copy = GF_MALLOC(sizeof(uuid_t), gf_common_mt_uuid_t);
    if (copy == NULL) {
        gf_log(this->name, GF_LOG_WARNING,
               "Could not set gfid"
               ". gfid will not be dumped in statedump file.");
        return;
    }

    gf_uuid_copy(*copy, gfid);
    frame->local = copy;
}

/*
 * Release the gfid copy stored by barrier_local_set_gfid(), if any, and
 * clear frame->local so it is not freed twice.
 */
void
barrier_local_free_gfid(call_frame_t *frame)
{
    if (!frame->local)
        return;

    GF_FREE(frame->local);
    frame->local = NULL;
}

/*
 * Resume hook for a barriered truncate reply: drop the gfid copy kept in
 * frame->local, then unwind to the parent with the reply arguments.
 */
int32_t
barrier_truncate_cbk_resume(call_frame_t *frame, void *cookie, xlator_t *this,
                            int32_t op_ret, int32_t op_errno,
                            struct iatt *prebuf, struct iatt *postbuf,
                            dict_t *xdata)
{
    barrier_local_free_gfid(frame);

    STACK_UNWIND_STRICT(truncate, frame, op_ret, op_errno,
                        prebuf, postbuf, xdata);

    return 0;
}

/*
 * Resume hook for a barriered ftruncate reply: drop the gfid copy kept in
 * frame->local, then unwind to the parent with the reply arguments.
 */
int32_t
barrier_ftruncate_cbk_resume(call_frame_t *frame, void *cookie, xlator_t *this,
                             int32_t op_ret, int32_t op_errno,
                             struct iatt *prebuf, struct iatt *postbuf,
                             dict_t *xdata)
{
    barrier_local_free_gfid(frame);

    STACK_UNWIND_STRICT(ftruncate, frame, op_ret, op_errno,
                        prebuf, postbuf, xdata);

    return 0;
}

/*
 * Resume hook for a barriered unlink reply: drop the gfid copy kept in
 * frame->local, then unwind to the parent with the reply arguments.
 */
int32_t
barrier_unlink_cbk_resume(call_frame_t *frame, void *cookie, xlator_t *this,
                          int32_t op_ret, int32_t op_errno,
                          struct iatt *preparent, struct iatt *postparent,
                          dict_t *xdata)
{
    barrier_local_free_gfid(frame);

    STACK_UNWIND_STRICT(unlink, frame, op_ret, op_errno,
                        preparent, postparent, xdata);

    return 0;
}

/*
 * Resume hook for a barriered rmdir reply: drop the gfid copy kept in
 * frame->local, then unwind to the parent with the reply arguments.
 */
int32_t
barrier_rmdir_cbk_resume(call_frame_t *frame, void *cookie, xlator_t *this,
                         int32_t op_ret, int32_t op_errno,
                         struct iatt *preparent, struct iatt *postparent,
                         dict_t *xdata)
{
    barrier_local_free_gfid(frame);

    STACK_UNWIND_STRICT(rmdir, frame, op_ret, op_errno,
                        preparent, postparent, xdata);

    return 0;
}

/*
 * Resume hook for a barriered rename reply: drop the gfid copy kept in
 * frame->local, then unwind to the parent with the reply arguments.
 */
int32_t
barrier_rename_cbk_resume(call_frame_t *frame, void *cookie, xlator_t *this,
                          int32_t op_ret, int32_t op_errno, struct iatt *buf,
                          struct iatt *preoldparent, struct iatt *postoldparent,
                          struct iatt *prenewparent, struct iatt *postnewparent,
                          dict_t *xdata)
{
    barrier_local_free_gfid(frame);

    STACK_UNWIND_STRICT(rename, frame, op_ret, op_errno,
                        buf,
                        preoldparent, postoldparent,
                        prenewparent, postnewparent,
                        xdata);

    return 0;
}

/*
 * Resume hook for a barriered writev reply: drop the gfid copy kept in
 * frame->local, then unwind to the parent with the reply arguments.
 */
int32_t
barrier_writev_cbk_resume(call_frame_t *frame, void *cookie, xlator_t *this,
                          int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
                          struct iatt *postbuf, dict_t *xdata)
{
    barrier_local_free_gfid(frame);

    STACK_UNWIND_STRICT(writev, frame, op_ret, op_errno,
                        prebuf, postbuf, xdata);

    return 0;
}

/*
 * Resume hook for a barriered fsync reply: drop the gfid copy kept in
 * frame->local, then unwind to the parent with the reply arguments.
 */
int32_t
barrier_fsync_cbk_resume(call_frame_t *frame, void *cookie, xlator_t *this,
                         int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
                         struct iatt *postbuf, dict_t *xdata)
{
    barrier_local_free_gfid(frame);

    STACK_UNWIND_STRICT(fsync, frame, op_ret, op_errno,
                        prebuf, postbuf, xdata);

    return 0;
}

/*
 * Resume hook for a barriered removexattr reply: drop the gfid copy kept in
 * frame->local, then unwind to the parent with the reply arguments.
 */
int32_t
barrier_removexattr_cbk_resume(call_frame_t *frame, void *cookie,
                               xlator_t *this, int32_t op_ret, int32_t op_errno,
                               dict_t *xdata)
{
    barrier_local_free_gfid(frame);

    STACK_UNWIND_STRICT(removexattr, frame,
                        op_ret, op_errno, xdata);

    return 0;
}

/*
 * Resume hook for a barriered fremovexattr reply: drop the gfid copy kept in
 * frame->local, then unwind to the parent with the reply arguments.
 */
int32_t
barrier_fremovexattr_cbk_resume(call_frame_t *frame, void *cookie,
                                xlator_t *this, int32_t op_ret,
                                int32_t op_errno, dict_t *xdata)
{
    barrier_local_free_gfid(frame);

    STACK_UNWIND_STRICT(fremovexattr, frame,
                        op_ret, op_errno, xdata);

    return 0;
}

/*
 * Reply callback for synchronous (O_SYNC/O_DSYNC) writes wound by
 * barrier_writev(); other writes are tail-wound and never reach here.
 * The reply is handed to BARRIER_FOP_CBK, which is defined outside this
 * file. It presumably queues the reply while the barrier is enabled
 * (resumed later via barrier_writev_cbk_resume()) and unwinds it otherwise.
 * NOTE(review): the macro takes `out` as a jump target — confirm its exact
 * semantics in barrier.h before changing this function.
 */
int32_t
barrier_writev_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                   int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
                   struct iatt *postbuf, dict_t *xdata)
{
    BARRIER_FOP_CBK(writev, out, frame, this, op_ret, op_errno, prebuf, postbuf,
                    xdata);
out:
    return 0;
}

/*
 * Reply callback for fremovexattr wound by barrier_fremovexattr().
 * BARRIER_FOP_CBK (defined outside this file) presumably holds the reply
 * while the barrier is enabled, for later resumption via
 * barrier_fremovexattr_cbk_resume(), and unwinds it otherwise.
 * NOTE(review): the macro uses `out` as a jump target — confirm in barrier.h.
 */
int32_t
barrier_fremovexattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                         int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
    BARRIER_FOP_CBK(fremovexattr, out, frame, this, op_ret, op_errno, xdata);
out:
    return 0;
}

/*
 * Reply callback for removexattr wound by barrier_removexattr().
 * BARRIER_FOP_CBK (defined outside this file) presumably holds the reply
 * while the barrier is enabled, for later resumption via
 * barrier_removexattr_cbk_resume(), and unwinds it otherwise.
 * NOTE(review): the macro uses `out` as a jump target — confirm in barrier.h.
 */
int32_t
barrier_removexattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                        int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
    BARRIER_FOP_CBK(removexattr, out, frame, this, op_ret, op_errno, xdata);
out:
    return 0;
}

/*
 * Reply callback for truncate wound by barrier_truncate().
 * BARRIER_FOP_CBK (defined outside this file) presumably holds the reply
 * while the barrier is enabled, for later resumption via
 * barrier_truncate_cbk_resume(), and unwinds it otherwise.
 * NOTE(review): the macro uses `out` as a jump target — confirm in barrier.h.
 */
int32_t
barrier_truncate_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                     int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
                     struct iatt *postbuf, dict_t *xdata)
{
    BARRIER_FOP_CBK(truncate, out, frame, this, op_ret, op_errno, prebuf,
                    postbuf, xdata);
out:
    return 0;
}

/*
 * Reply callback for ftruncate wound by barrier_ftruncate().
 * BARRIER_FOP_CBK (defined outside this file) presumably holds the reply
 * while the barrier is enabled, for later resumption via
 * barrier_ftruncate_cbk_resume(), and unwinds it otherwise.
 * NOTE(review): the macro uses `out` as a jump target — confirm in barrier.h.
 */
int32_t
barrier_ftruncate_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                      int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
                      struct iatt *postbuf, dict_t *xdata)
{
    BARRIER_FOP_CBK(ftruncate, out, frame, this, op_ret, op_errno, prebuf,
                    postbuf, xdata);
out:
    return 0;
}

/*
 * Reply callback for rename wound by barrier_rename().
 * BARRIER_FOP_CBK (defined outside this file) presumably holds the reply
 * while the barrier is enabled, for later resumption via
 * barrier_rename_cbk_resume(), and unwinds it otherwise.
 * NOTE(review): the macro uses `out` as a jump target — confirm in barrier.h.
 */
int32_t
barrier_rename_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                   int32_t op_ret, int32_t op_errno, struct iatt *buf,
                   struct iatt *preoldparent, struct iatt *postoldparent,
                   struct iatt *prenewparent, struct iatt *postnewparent,
                   dict_t *xdata)
{
    BARRIER_FOP_CBK(rename, out, frame, this, op_ret, op_errno, buf,
                    preoldparent, postoldparent, prenewparent, postnewparent,
                    xdata);
out:
    return 0;
}

/*
 * Reply callback for rmdir wound by barrier_rmdir().
 * BARRIER_FOP_CBK (defined outside this file) presumably holds the reply
 * while the barrier is enabled, for later resumption via
 * barrier_rmdir_cbk_resume(), and unwinds it otherwise.
 * NOTE(review): the macro uses `out` as a jump target — confirm in barrier.h.
 */
int32_t
barrier_rmdir_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                  int32_t op_ret, int32_t op_errno, struct iatt *preparent,
                  struct iatt *postparent, dict_t *xdata)
{
    BARRIER_FOP_CBK(rmdir, out, frame, this, op_ret, op_errno, preparent,
                    postparent, xdata);
out:
    return 0;
}

/*
 * Reply callback for unlink wound by barrier_unlink().
 * BARRIER_FOP_CBK (defined outside this file) presumably holds the reply
 * while the barrier is enabled, for later resumption via
 * barrier_unlink_cbk_resume(), and unwinds it otherwise.
 * NOTE(review): the macro uses `out` as a jump target — confirm in barrier.h.
 */
int32_t
barrier_unlink_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                   int32_t op_ret, int32_t op_errno, struct iatt *preparent,
                   struct iatt *postparent, dict_t *xdata)
{
    BARRIER_FOP_CBK(unlink, out, frame, this, op_ret, op_errno, preparent,
                    postparent, xdata);
out:
    return 0;
}

/*
 * Reply callback for fsync wound by barrier_fsync().
 * BARRIER_FOP_CBK (defined outside this file) presumably holds the reply
 * while the barrier is enabled, for later resumption via
 * barrier_fsync_cbk_resume(), and unwinds it otherwise.
 * NOTE(review): the macro uses `out` as a jump target — confirm in barrier.h.
 */
int32_t
barrier_fsync_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                  int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
                  struct iatt *postbuf, dict_t *xdata)
{
    BARRIER_FOP_CBK(fsync, out, frame, this, op_ret, op_errno, prebuf, postbuf,
                    xdata);
out:
    return 0;
}

/*
 * writev entry point.
 *
 * Only synchronous writes (O_SYNC or O_DSYNC set in either the per-call flags
 * or the fd's open flags) are subject to the barrier. They record the inode
 * gfid for statedump and wind with barrier_writev_cbk. Every other write is
 * passed straight to the child with STACK_WIND_TAIL.
 */
int32_t
barrier_writev(call_frame_t *frame, xlator_t *this, fd_t *fd,
               struct iovec *vector, int32_t count, off_t off, uint32_t flags,
               struct iobref *iobref, dict_t *xdata)
{
    const int sync_mask = O_SYNC | O_DSYNC;

    if ((flags | fd->flags) & sync_mask) {
        barrier_local_set_gfid(frame, fd->inode->gfid, this);
        STACK_WIND(frame, barrier_writev_cbk, FIRST_CHILD(this),
                   FIRST_CHILD(this)->fops->writev,
                   fd, vector, count, off, flags, iobref, xdata);
        return 0;
    }

    STACK_WIND_TAIL(frame, FIRST_CHILD(this),
                    FIRST_CHILD(this)->fops->writev,
                    fd, vector, count, off, flags, iobref, xdata);
    return 0;
}

/*
 * fremovexattr entry point: remember the inode gfid for statedump, then wind
 * to the child with barrier_fremovexattr_cbk as the reply handler.
 */
int32_t
barrier_fremovexattr(call_frame_t *frame, xlator_t *this, fd_t *fd,
                     const char *name, dict_t *xdata)
{
    barrier_local_set_gfid(frame, fd->inode->gfid, this);

    STACK_WIND(frame, barrier_fremovexattr_cbk,
               FIRST_CHILD(this), FIRST_CHILD(this)->fops->fremovexattr,
               fd, name, xdata);

    return 0;
}

/*
 * removexattr entry point: remember the inode gfid for statedump, then wind
 * to the child with barrier_removexattr_cbk as the reply handler.
 */
int32_t
barrier_removexattr(call_frame_t *frame, xlator_t *this, loc_t *loc,
                    const char *name, dict_t *xdata)
{
    barrier_local_set_gfid(frame, loc->inode->gfid, this);

    STACK_WIND(frame, barrier_removexattr_cbk,
               FIRST_CHILD(this), FIRST_CHILD(this)->fops->removexattr,
               loc, name, xdata);

    return 0;
}

/*
 * truncate entry point: remember the inode gfid for statedump, then wind to
 * the child with barrier_truncate_cbk as the reply handler.
 */
int32_t
barrier_truncate(call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset,
                 dict_t *xdata)
{
    barrier_local_set_gfid(frame, loc->inode->gfid, this);

    STACK_WIND(frame, barrier_truncate_cbk,
               FIRST_CHILD(this), FIRST_CHILD(this)->fops->truncate,
               loc, offset, xdata);

    return 0;
}

/*
 * rename entry point: remember the source inode's gfid for statedump, then
 * wind to the child with barrier_rename_cbk as the reply handler.
 */
int32_t
barrier_rename(call_frame_t *frame, xlator_t *this, loc_t *oldloc,
               loc_t *newloc, dict_t *xdata)
{
    barrier_local_set_gfid(frame, oldloc->inode->gfid, this);

    STACK_WIND(frame, barrier_rename_cbk,
               FIRST_CHILD(this), FIRST_CHILD(this)->fops->rename,
               oldloc, newloc, xdata);

    return 0;
}

/*
 * rmdir entry point: remember the inode gfid for statedump, then wind to the
 * child with barrier_rmdir_cbk as the reply handler.
 */
int
barrier_rmdir(call_frame_t *frame, xlator_t *this, loc_t *loc, int flags,
              dict_t *xdata)
{
    barrier_local_set_gfid(frame, loc->inode->gfid, this);

    STACK_WIND(frame, barrier_rmdir_cbk,
               FIRST_CHILD(this), FIRST_CHILD(this)->fops->rmdir,
               loc, flags, xdata);

    return 0;
}

/*
 * unlink entry point: remember the inode gfid for statedump, then wind to the
 * child with barrier_unlink_cbk as the reply handler.
 */
int32_t
barrier_unlink(call_frame_t *frame, xlator_t *this, loc_t *loc, int xflag,
               dict_t *xdata)
{
    barrier_local_set_gfid(frame, loc->inode->gfid, this);

    STACK_WIND(frame, barrier_unlink_cbk,
               FIRST_CHILD(this), FIRST_CHILD(this)->fops->unlink,
               loc, xflag, xdata);

    return 0;
}

/*
 * ftruncate entry point: remember the inode gfid for statedump, then wind to
 * the child with barrier_ftruncate_cbk as the reply handler.
 */
int32_t
barrier_ftruncate(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset,
                  dict_t *xdata)
{
    barrier_local_set_gfid(frame, fd->inode->gfid, this);

    STACK_WIND(frame, barrier_ftruncate_cbk,
               FIRST_CHILD(this), FIRST_CHILD(this)->fops->ftruncate,
               fd, offset, xdata);

    return 0;
}

/*
 * fsync entry point: remember the inode gfid for statedump, then wind to the
 * child with barrier_fsync_cbk as the reply handler.
 */
int32_t
barrier_fsync(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t flags,
              dict_t *xdata)
{
    barrier_local_set_gfid(frame, fd->inode->gfid, this);

    STACK_WIND(frame, barrier_fsync_cbk,
               FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsync,
               fd, flags, xdata);

    return 0;
}

/*
 * Detach and return the first stub on @queue, or NULL when it is empty.
 * The caller is responsible for any locking @queue needs.
 */
call_stub_t *
__barrier_dequeue(xlator_t *this, struct list_head *queue)
{
    barrier_priv_t *priv = this->private;
    call_stub_t *head = NULL;

    GF_ASSERT(priv);

    if (!list_empty(queue)) {
        head = list_entry(queue->next, call_stub_t, list);
        list_del_init(&head->list);
    }

    return head;
}

/*
 * Resume every stub on @queue in FIFO order, emptying it.
 * Callers in this file pass a private list that was spliced off
 * priv->queue, and they call this without holding priv->lock.
 */
void
barrier_dequeue_all(xlator_t *this, struct list_head *queue)
{
    call_stub_t *next = NULL;

    gf_log(this->name, GF_LOG_INFO, "Dequeuing all the barriered fops");

    /* TODO: Start the below task in a new thread */
    for (;;) {
        next = __barrier_dequeue(this, queue);
        if (next == NULL)
            break;
        call_resume(next);
    }

    gf_log(this->name, GF_LOG_INFO,
           "Dequeuing the barriered fops is "
           "finished");
}

void
barrier_timeout(void *data)
{
    xlator_t *this = NULL;
    barrier_priv_t *priv = NULL;
    struct list_head queue = {
        0,
    };

    this = data;
    THIS = this;
    priv = this->private;

    INIT_LIST_HEAD(&queue);

    gf_log(this->name, GF_LOG_CRITICAL,
           "Disabling barrier because of "
           "the barrier timeout.");

    LOCK(&priv->lock);
    {
        __barrier_disable(this, &queue);
    }
    UNLOCK(&priv->lock);

    barrier_dequeue_all(this, &queue);

    return;
}

/*
 * Append @stub to priv->queue and bump the queued count.
 * Callers must serialize access to priv->queue (presumably via priv->lock).
 */
void
__barrier_enqueue(xlator_t *this, call_stub_t *stub)
{
    barrier_priv_t *priv = this->private;

    GF_ASSERT(priv);

    priv->queue_size++;
    list_add_tail(&stub->list, &priv->queue);
}

/*
 * Turn the barrier off.
 *
 * Cancels the timeout timer if one is armed, moves every queued stub onto
 * @queue (so the caller can replay them after dropping the lock), resets the
 * queued count and clears barrier_enabled. Every caller in this file holds
 * priv->lock around this call.
 */
void
__barrier_disable(xlator_t *this, struct list_head *queue)
{
    barrier_priv_t *priv = this->private;

    GF_ASSERT(priv);

    if (priv->timer != NULL) {
        /* The cancellation result is deliberately ignored. */
        (void)gf_timer_call_cancel(this->ctx, priv->timer);
        priv->timer = NULL;
    }

    priv->barrier_enabled = _gf_false;
    list_splice_init(&priv->queue, queue);
    priv->queue_size = 0;
}

/*
 * Turn the barrier on.
 *
 * Arms a timer that calls barrier_timeout() after priv->timeout, then marks
 * the barrier enabled. Returns 0 on success. Returns -1 if the timer could
 * not be registered, in which case barrier_enabled is left untouched.
 */
int
__barrier_enable(xlator_t *this, barrier_priv_t *priv)
{
    priv->timer = gf_timer_call_after(this->ctx, priv->timeout, barrier_timeout,
                                      (void *)this);
    if (priv->timer == NULL) {
        gf_log(this->name, GF_LOG_CRITICAL,
               "Couldn't add barrier "
               "timeout event.");
        return -1;
    }

    priv->barrier_enabled = _gf_true;
    return 0;
}

/*
 * Translator event handler.
 *
 * For GF_EVENT_TRANSLATOR_OP, @data is a dict whose boolean "barrier" key
 * requests enabling (true) or disabling (false) the barrier. Returns:
 *   0  when the state was changed;
 *  -1  when the key is missing, when the barrier is already in the requested
 *      state, or when __barrier_enable() fails.
 * All other events are forwarded to default_notify() and report 0.
 */
int
notify(xlator_t *this, int event, void *data, ...)
{
    barrier_priv_t *priv = this->private;
    dict_t *dict = NULL;
    int ret = -1;
    /* int, not gf_boolean_t: dict_get_str_boolean() returns -1 when absent. */
    int barrier_enabled = _gf_false;
    struct list_head queue = {
        0,
    };

    GF_ASSERT(priv);
    INIT_LIST_HEAD(&queue);

    switch (event) {
        case GF_EVENT_TRANSLATOR_OP: {
            dict = data;
            barrier_enabled = dict_get_str_boolean(dict, "barrier", -1);

            if (barrier_enabled == -1) {
                gf_log(this->name, GF_LOG_ERROR,
                       "Could not fetch "
                       " barrier key from the dictionary.");
                goto out;
            }

            LOCK(&priv->lock);
            {
                if (!priv->barrier_enabled) {
                    if (barrier_enabled) {
                        ret = __barrier_enable(this, priv);
                    } else {
                        /* No-op request: release the lock here and skip the
                         * common UNLOCK below so it is not released twice. */
                        UNLOCK(&priv->lock);
                        gf_log(this->name, GF_LOG_ERROR, "Already disabled.");
                        goto post_unlock;
                    }
                } else {
                    if (!barrier_enabled) {
                        /* Queued stubs are moved to the local list and
                         * replayed after the lock is dropped. */
                        __barrier_disable(this, &queue);
                        ret = 0;
                    } else {
                        /* Same early-unlock pattern as above. */
                        UNLOCK(&priv->lock);
                        gf_log(this->name, GF_LOG_ERROR, "Already enabled");
                        goto post_unlock;
                    }
                }
            }
            UNLOCK(&priv->lock);
        post_unlock:
            if (!list_empty(&queue))
                barrier_dequeue_all(this, &queue);

            break;
        }
        default: {
            default_notify(this, event, data);
            ret = 0;
            goto out;
        }
    }
out:
    return ret;
}

/*
 * Apply new "barrier" / "barrier-timeout" option values at runtime.
 *
 * The new timeout is stored before any state change, so a reconfigure that
 * enables the barrier arms its timer with the new value. A barrier that is
 * already enabled keeps its currently armed timer; the new timeout takes
 * effect the next time the barrier is enabled. Disabling replays all queued
 * fops after the lock is dropped.
 *
 * Returns 0 on success. Returns -1 if an option could not be read or the
 * barrier could not be enabled.
 */
int
reconfigure(xlator_t *this, dict_t *options)
{
    barrier_priv_t *priv = NULL;
    int ret = -1;
    gf_boolean_t barrier_enabled = _gf_false;
    uint32_t timeout = 0;
    struct list_head queue = {
        0,
    };

    priv = this->private;
    GF_ASSERT(priv);

    GF_OPTION_RECONF("barrier", barrier_enabled, options, bool, out);
    GF_OPTION_RECONF("barrier-timeout", timeout, options, time, out);

    INIT_LIST_HEAD(&queue);

    LOCK(&priv->lock);
    {
        /* Must precede __barrier_enable(): it arms the timer with
         * priv->timeout, so updating afterwards would use the stale value. */
        priv->timeout.tv_sec = timeout;

        if (!priv->barrier_enabled) {
            if (barrier_enabled) {
                ret = __barrier_enable(this, priv);
                if (ret)
                    goto unlock;
            }
        } else if (!barrier_enabled) {
            __barrier_disable(this, &queue);
        }
        ret = 0;
    }
unlock:
    UNLOCK(&priv->lock);

    /* Replay previously barriered fops outside the lock. */
    if (!list_empty(&queue))
        barrier_dequeue_all(this, &queue);

out:
    return ret;
}

/*
 * Register this translator's memory-accounting types, up to and including
 * gf_barrier_mt_end. Returns the result of xlator_mem_acct_init(); a
 * non-zero value is also logged.
 */
int32_t
mem_acct_init(xlator_t *this)
{
    int ret = xlator_mem_acct_init(this, gf_barrier_mt_end + 1);

    if (ret != 0)
        gf_log(this->name, GF_LOG_ERROR,
               "Memory accounting "
               "initialization failed.");

    return ret;
}

/*
 * Translator initialisation.
 *
 * Requires exactly one child. Reads the "barrier" and "barrier-timeout"
 * options into freshly allocated private state and, if the barrier starts
 * enabled, arms its timeout timer.
 *
 * Returns 0 on success. Returns -1 on any failure, in which case all private
 * state (including the lock) is released and this->private is left NULL.
 */
int
init(xlator_t *this)
{
    int ret = -1;
    barrier_priv_t *priv = NULL;
    uint32_t timeout = 0;

    if (!this->children || this->children->next) {
        gf_log(this->name, GF_LOG_ERROR,
               "'barrier' not configured with exactly one child");
        goto out;
    }

    if (!this->parents)
        gf_log(this->name, GF_LOG_WARNING, "dangling volume. check volfile ");

    priv = GF_CALLOC(1, sizeof(*priv), gf_barrier_mt_priv_t);
    if (!priv)
        goto out;

    LOCK_INIT(&priv->lock);

    GF_OPTION_INIT("barrier", priv->barrier_enabled, bool, out);
    GF_OPTION_INIT("barrier-timeout", timeout, time, out);
    priv->timeout.tv_sec = timeout;

    INIT_LIST_HEAD(&priv->queue);

    /* Publish priv before arming the timer: barrier_timeout() dereferences
     * this->private, and with a short barrier-timeout the timer may fire
     * before init() returns. */
    this->private = priv;

    if (priv->barrier_enabled) {
        /* Hold the lock like every other __barrier_enable() caller, so a
         * fast-firing timer cannot race with the timer/state assignment. */
        LOCK(&priv->lock);
        {
            ret = __barrier_enable(this, priv);
        }
        UNLOCK(&priv->lock);
        if (ret == -1)
            goto out;
    }

    ret = 0;
out:
    if (ret && priv) {
        if (this->private == priv)
            this->private = NULL;
        LOCK_DESTROY(&priv->lock);
        GF_FREE(priv);
    }

    return ret;
}

/*
 * Translator teardown: disable the barrier (cancelling its timer), replay
 * every fop still queued, then release the private state. Does nothing when
 * there is no private state.
 */
void
fini(xlator_t *this)
{
    barrier_priv_t *priv = this->private;
    struct list_head pending;

    if (priv == NULL)
        return;

    INIT_LIST_HEAD(&pending);

    gf_log(this->name, GF_LOG_INFO,
           "Disabling barriering and dequeuing "
           "all the queued fops");
    LOCK(&priv->lock);
    {
        __barrier_disable(this, &pending);
    }
    UNLOCK(&priv->lock);

    if (!list_empty(&pending))
        barrier_dequeue_all(this, &pending);

    this->private = NULL;

    LOCK_DESTROY(&priv->lock);
    GF_FREE(priv);
}

/*
 * Write one queued stub into the statedump under @prefix: its fop name,
 * the gfid stored in frame->local (when present), and the loc path/name
 * (when set).
 */
static void
barrier_dump_stub(call_stub_t *stub, char *prefix)
{
    char key[GF_DUMP_MAX_BUF_LEN] = {
        0,
    };
    void *gfid = NULL;

    gf_proc_dump_build_key(key, prefix, "fop");
    gf_proc_dump_write(key, "%s", gf_fop_list[stub->fop]);

    /* frame->local holds the uuid_t copy made by barrier_local_set_gfid(). */
    gfid = stub->frame->local;
    if (gfid != NULL) {
        gf_proc_dump_build_key(key, prefix, "gfid");
        gf_proc_dump_write(key, "%s", uuid_utoa(*(uuid_t *)gfid));
    }

    if (stub->args.loc.path != NULL) {
        gf_proc_dump_build_key(key, prefix, "path");
        gf_proc_dump_write(key, "%s", stub->args.loc.path);
    }

    if (stub->args.loc.name != NULL) {
        gf_proc_dump_build_key(key, prefix, "name");
        gf_proc_dump_write(key, "%s", stub->args.loc.name);
    }
}

/*
 * Dump every stub on priv->queue as its own section "stub.<n>", numbered
 * from 0 in queue order. The caller (barrier_dump_priv) holds priv->lock.
 */
static void
__barrier_dump_queue(barrier_priv_t *priv)
{
    call_stub_t *stub = NULL;
    char section[GF_DUMP_MAX_BUF_LEN] = {
        0,
    };
    int idx = 0;

    GF_VALIDATE_OR_GOTO("barrier", priv, out);

    list_for_each_entry(stub, &priv->queue, list)
    {
        snprintf(section, sizeof(section), "stub.%d", idx);
        idx++;
        gf_proc_dump_add_section("%s", section);
        barrier_dump_stub(stub, section);
    }

out:
    return;
}

/*
 * Statedump hook for the barrier translator's private state.
 *
 * Writes the enabled flag and the timeout. While the barrier is enabled it
 * also writes the queue size and one section per queued stub. The state is
 * read under priv->lock.
 *
 * Returns 0 on success, including when there is no private state yet.
 * Returns -1 only if @this is NULL.
 */
int
barrier_dump_priv(xlator_t *this)
{
    int ret = -1;
    char key[GF_DUMP_MAX_BUF_LEN] = {
        0,
    };
    barrier_priv_t *priv = NULL;

    GF_VALIDATE_OR_GOTO("barrier", this, out);

    priv = this->private;
    if (!priv) {
        ret = 0;
        goto out;
    }

    gf_proc_dump_build_key(key, "xlator.features.barrier", "priv");
    gf_proc_dump_add_section("%s", key);

    LOCK(&priv->lock);
    {
        gf_proc_dump_build_key(key, "barrier", "enabled");
        gf_proc_dump_write(key, "%d", priv->barrier_enabled);
        gf_proc_dump_build_key(key, "barrier", "timeout");
        /* tv_sec is a time_t; cast so the argument always matches %ld. */
        gf_proc_dump_write(key, "%ld", (long)priv->timeout.tv_sec);
        if (priv->barrier_enabled) {
            gf_proc_dump_build_key(key, "barrier", "queue_size");
            gf_proc_dump_write(key, "%d", priv->queue_size);
            __barrier_dump_queue(priv);
        }
    }
    UNLOCK(&priv->lock);

    /* The dump completed; report success rather than the initial -1. */
    ret = 0;
out:
    return ret;
}

/*
 * File operations intercepted by the barrier. Every other fop is left unset
 * in this table.
 */
struct xlator_fops fops = {
    /* Synchronous writes only: barrier_writev() barriers writes carrying
     * O_SYNC or O_DSYNC and passes every other write straight through. */
    .writev = barrier_writev,

    /* Barrier-class fops */
    .fsync = barrier_fsync,
    .ftruncate = barrier_ftruncate,
    .truncate = barrier_truncate,
    .fremovexattr = barrier_fremovexattr,
    .removexattr = barrier_removexattr,
    .rename = barrier_rename,
    .unlink = barrier_unlink,
    .rmdir = barrier_rmdir,
};

/* Statedump hooks: only the private-state dumper is provided. */
struct xlator_dumpops dumpops = {
    .priv = barrier_dump_priv,
};

/* No callbacks are registered; file-scope storage leaves this zeroed. */
struct xlator_cbks cbks;

/*
 * Options understood by this translator.
 *
 * "barrier" toggles the barrier. "barrier-timeout" bounds how long
 * acknowledgements may be held before barrier_timeout() disables the barrier
 * automatically.
 */
struct volume_options options[] = {
    {.key = {"barrier"},
     .type = GF_OPTION_TYPE_BOOL,
     .default_value = "disable",
     .op_version = {GD_OP_VERSION_3_6_0},
     .flags = OPT_FLAG_SETTABLE,
     .description = "When \"enabled\", blocks acknowledgements to application "
                    "for file operations such as rmdir, rename, unlink, "
                    "removexattr, fremovexattr, truncate, ftruncate, "
                    "write (with O_SYNC or O_DSYNC), fsync. It is turned "
                    "\"off\" by default."},
    {.key = {"barrier-timeout"},
     .type = GF_OPTION_TYPE_TIME,
     .default_value = BARRIER_TIMEOUT,
     .op_version = {GD_OP_VERSION_3_6_0},
     .flags = OPT_FLAG_SETTABLE,
     .description = "After 'timeout' seconds since the time 'barrier' "
                    "option was set to \"on\", acknowledgements to file "
                    "operations are no longer blocked and previously "
                    "blocked acknowledgements are sent to the application"},
    {.key = {NULL}},
};

xlator_api_t xlator_api = {
    .init = init,
    .fini = fini,
    .notify = notify,
    .reconfigure = reconfigure,
    .mem_acct_init = mem_acct_init,
    .op_version = {1}, /* Present from the initial version */
    .dumpops = &dumpops,
    .fops = &fops,
    .cbks = &cbks,
    .options = options,
    .identifier = "barrier",
    .category = GF_MAINTAINED,
};