/*
Copyright (c) 2014 Red Hat, Inc. <http://www.redhat.com>
This file is part of GlusterFS.
This file is licensed to you under your choice of the GNU Lesser
General Public License, version 3 or any later version (LGPLv3 or
later), or the GNU General Public License, version 2 (GPLv2), in all
cases as published by the Free Software Foundation.
*/
#include "barrier.h"
#include <glusterfs/defaults.h>
#include <glusterfs/call-stub.h>
#include <glusterfs/statedump.h>
/*
 * Attach a heap copy of @gfid to frame->local. barrier_dump_stub() prints
 * it for every stub sitting on the barrier queue, so statedump can show which
 * inode a held-back fop targets. An allocation failure is not fatal: a
 * warning is logged and frame->local is left as it was.
 */
void
barrier_local_set_gfid(call_frame_t *frame, uuid_t gfid, xlator_t *this)
{
    uuid_t *copy = NULL;

    if (!gfid)
        return;

    copy = GF_MALLOC(sizeof(uuid_t), gf_common_mt_uuid_t);
    if (copy == NULL) {
        gf_log(this->name, GF_LOG_WARNING,
               "Could not set gfid"
               ". gfid will not be dumped in statedump file.");
        return;
    }

    gf_uuid_copy(*copy, gfid);
    frame->local = copy;
}
/*
 * Release the gfid copy (if any) that barrier_local_set_gfid() attached to
 * @frame, and clear frame->local so no stale pointer is left behind.
 */
void
barrier_local_free_gfid(call_frame_t *frame)
{
    if (frame->local == NULL)
        return;

    GF_FREE(frame->local);
    frame->local = NULL;
}
/*
 * Resume path for a truncate reply: frees the gfid copy that
 * barrier_truncate() stored in frame->local, then unwinds the reply to the
 * parent unchanged.
 * NOTE(review): presumably selected by BARRIER_FOP_CBK (barrier.h) when the
 * reply is released, directly or via call_resume() in barrier_dequeue_all();
 * confirm against barrier.h. The GF_MALLOC'd local is freed before the
 * unwind, presumably so generic frame teardown never sees it; confirm.
 */
int32_t
barrier_truncate_cbk_resume(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno,
struct iatt *prebuf, struct iatt *postbuf,
dict_t *xdata)
{
barrier_local_free_gfid(frame);
STACK_UNWIND_STRICT(truncate, frame, op_ret, op_errno, prebuf, postbuf,
xdata);
return 0;
}
/*
 * Resume path for an ftruncate reply: frees the gfid copy that
 * barrier_ftruncate() stored in frame->local, then unwinds the reply to the
 * parent unchanged.
 * NOTE(review): presumably selected by BARRIER_FOP_CBK (barrier.h); the
 * local is freed before the unwind, presumably so generic frame teardown
 * never sees the GF_MALLOC'd copy; confirm.
 */
int32_t
barrier_ftruncate_cbk_resume(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno,
struct iatt *prebuf, struct iatt *postbuf,
dict_t *xdata)
{
barrier_local_free_gfid(frame);
STACK_UNWIND_STRICT(ftruncate, frame, op_ret, op_errno, prebuf, postbuf,
xdata);
return 0;
}
/*
 * Resume path for an unlink reply: frees the gfid copy that
 * barrier_unlink() stored in frame->local, then unwinds the reply to the
 * parent unchanged.
 * NOTE(review): presumably selected by BARRIER_FOP_CBK (barrier.h); the
 * local is freed before the unwind, presumably so generic frame teardown
 * never sees the GF_MALLOC'd copy; confirm.
 */
int32_t
barrier_unlink_cbk_resume(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno,
struct iatt *preparent, struct iatt *postparent,
dict_t *xdata)
{
barrier_local_free_gfid(frame);
STACK_UNWIND_STRICT(unlink, frame, op_ret, op_errno, preparent, postparent,
xdata);
return 0;
}
/*
 * Resume path for an rmdir reply: frees the gfid copy that barrier_rmdir()
 * stored in frame->local, then unwinds the reply to the parent unchanged.
 * NOTE(review): presumably selected by BARRIER_FOP_CBK (barrier.h); the
 * local is freed before the unwind, presumably so generic frame teardown
 * never sees the GF_MALLOC'd copy; confirm.
 */
int32_t
barrier_rmdir_cbk_resume(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno,
struct iatt *preparent, struct iatt *postparent,
dict_t *xdata)
{
barrier_local_free_gfid(frame);
STACK_UNWIND_STRICT(rmdir, frame, op_ret, op_errno, preparent, postparent,
xdata);
return 0;
}
/*
 * Resume path for a rename reply: frees the gfid copy (of the source inode)
 * that barrier_rename() stored in frame->local, then unwinds the reply to
 * the parent unchanged.
 * NOTE(review): presumably selected by BARRIER_FOP_CBK (barrier.h); the
 * local is freed before the unwind, presumably so generic frame teardown
 * never sees the GF_MALLOC'd copy; confirm.
 */
int32_t
barrier_rename_cbk_resume(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, struct iatt *buf,
struct iatt *preoldparent, struct iatt *postoldparent,
struct iatt *prenewparent, struct iatt *postnewparent,
dict_t *xdata)
{
barrier_local_free_gfid(frame);
STACK_UNWIND_STRICT(rename, frame, op_ret, op_errno, buf, preoldparent,
postoldparent, prenewparent, postnewparent, xdata);
return 0;
}
/*
 * Resume path for a synchronous (O_SYNC/O_DSYNC) writev reply: frees the
 * gfid copy that barrier_writev() stored in frame->local, then unwinds the
 * reply to the parent unchanged.
 * NOTE(review): presumably selected by BARRIER_FOP_CBK (barrier.h); the
 * local is freed before the unwind, presumably so generic frame teardown
 * never sees the GF_MALLOC'd copy; confirm.
 */
int32_t
barrier_writev_cbk_resume(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
struct iatt *postbuf, dict_t *xdata)
{
barrier_local_free_gfid(frame);
STACK_UNWIND_STRICT(writev, frame, op_ret, op_errno, prebuf, postbuf,
xdata);
return 0;
}
/*
 * Resume path for an fsync reply: frees the gfid copy that barrier_fsync()
 * stored in frame->local, then unwinds the reply to the parent unchanged.
 * NOTE(review): presumably selected by BARRIER_FOP_CBK (barrier.h); the
 * local is freed before the unwind, presumably so generic frame teardown
 * never sees the GF_MALLOC'd copy; confirm.
 */
int32_t
barrier_fsync_cbk_resume(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
struct iatt *postbuf, dict_t *xdata)
{
barrier_local_free_gfid(frame);
STACK_UNWIND_STRICT(fsync, frame, op_ret, op_errno, prebuf, postbuf, xdata);
return 0;
}
/*
 * Resume path for a removexattr reply: frees the gfid copy that
 * barrier_removexattr() stored in frame->local, then unwinds the reply to
 * the parent unchanged.
 * NOTE(review): presumably selected by BARRIER_FOP_CBK (barrier.h); the
 * local is freed before the unwind, presumably so generic frame teardown
 * never sees the GF_MALLOC'd copy; confirm.
 */
int32_t
barrier_removexattr_cbk_resume(call_frame_t *frame, void *cookie,
xlator_t *this, int32_t op_ret, int32_t op_errno,
dict_t *xdata)
{
barrier_local_free_gfid(frame);
STACK_UNWIND_STRICT(removexattr, frame, op_ret, op_errno, xdata);
return 0;
}
/*
 * Resume path for an fremovexattr reply: frees the gfid copy that
 * barrier_fremovexattr() stored in frame->local, then unwinds the reply to
 * the parent unchanged.
 * NOTE(review): presumably selected by BARRIER_FOP_CBK (barrier.h); the
 * local is freed before the unwind, presumably so generic frame teardown
 * never sees the GF_MALLOC'd copy; confirm.
 */
int32_t
barrier_fremovexattr_cbk_resume(call_frame_t *frame, void *cookie,
xlator_t *this, int32_t op_ret,
int32_t op_errno, dict_t *xdata)
{
barrier_local_free_gfid(frame);
STACK_UNWIND_STRICT(fremovexattr, frame, op_ret, op_errno, xdata);
return 0;
}
/*
 * Child reply for a synchronous writev. The reply is handed to
 * BARRIER_FOP_CBK (barrier.h), which presumably queues it as a call stub
 * while the barrier is enabled and otherwise unwinds it via
 * barrier_writev_cbk_resume(); confirm against barrier.h.
 * The label name `out` is passed to the macro as a jump target, so the
 * label must stay.
 */
int32_t
barrier_writev_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
struct iatt *postbuf, dict_t *xdata)
{
BARRIER_FOP_CBK(writev, out, frame, this, op_ret, op_errno, prebuf, postbuf,
xdata);
out:
return 0;
}
/*
 * Child reply for fremovexattr, handed to BARRIER_FOP_CBK (barrier.h),
 * which presumably queues it while the barrier is enabled and otherwise
 * unwinds it; confirm against barrier.h. `out` is passed to the macro as a
 * jump target.
 */
int32_t
barrier_fremovexattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
BARRIER_FOP_CBK(fremovexattr, out, frame, this, op_ret, op_errno, xdata);
out:
return 0;
}
/*
 * Child reply for removexattr, handed to BARRIER_FOP_CBK (barrier.h),
 * which presumably queues it while the barrier is enabled and otherwise
 * unwinds it; confirm against barrier.h. `out` is passed to the macro as a
 * jump target.
 */
int32_t
barrier_removexattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
BARRIER_FOP_CBK(removexattr, out, frame, this, op_ret, op_errno, xdata);
out:
return 0;
}
/*
 * Child reply for truncate, handed to BARRIER_FOP_CBK (barrier.h), which
 * presumably queues it while the barrier is enabled and otherwise unwinds
 * it; confirm against barrier.h. `out` is passed to the macro as a jump
 * target.
 */
int32_t
barrier_truncate_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
struct iatt *postbuf, dict_t *xdata)
{
BARRIER_FOP_CBK(truncate, out, frame, this, op_ret, op_errno, prebuf,
postbuf, xdata);
out:
return 0;
}
/*
 * Child reply for ftruncate, handed to BARRIER_FOP_CBK (barrier.h), which
 * presumably queues it while the barrier is enabled and otherwise unwinds
 * it; confirm against barrier.h. `out` is passed to the macro as a jump
 * target.
 */
int32_t
barrier_ftruncate_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
struct iatt *postbuf, dict_t *xdata)
{
BARRIER_FOP_CBK(ftruncate, out, frame, this, op_ret, op_errno, prebuf,
postbuf, xdata);
out:
return 0;
}
/*
 * Child reply for rename, handed to BARRIER_FOP_CBK (barrier.h), which
 * presumably queues it while the barrier is enabled and otherwise unwinds
 * it; confirm against barrier.h. `out` is passed to the macro as a jump
 * target.
 */
int32_t
barrier_rename_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, struct iatt *buf,
struct iatt *preoldparent, struct iatt *postoldparent,
struct iatt *prenewparent, struct iatt *postnewparent,
dict_t *xdata)
{
BARRIER_FOP_CBK(rename, out, frame, this, op_ret, op_errno, buf,
preoldparent, postoldparent, prenewparent, postnewparent,
xdata);
out:
return 0;
}
/*
 * Child reply for rmdir, handed to BARRIER_FOP_CBK (barrier.h), which
 * presumably queues it while the barrier is enabled and otherwise unwinds
 * it; confirm against barrier.h. `out` is passed to the macro as a jump
 * target.
 */
int32_t
barrier_rmdir_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, struct iatt *preparent,
struct iatt *postparent, dict_t *xdata)
{
BARRIER_FOP_CBK(rmdir, out, frame, this, op_ret, op_errno, preparent,
postparent, xdata);
out:
return 0;
}
/*
 * Child reply for unlink, handed to BARRIER_FOP_CBK (barrier.h), which
 * presumably queues it while the barrier is enabled and otherwise unwinds
 * it; confirm against barrier.h. `out` is passed to the macro as a jump
 * target.
 */
int32_t
barrier_unlink_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, struct iatt *preparent,
struct iatt *postparent, dict_t *xdata)
{
BARRIER_FOP_CBK(unlink, out, frame, this, op_ret, op_errno, preparent,
postparent, xdata);
out:
return 0;
}
/*
 * Child reply for fsync, handed to BARRIER_FOP_CBK (barrier.h), which
 * presumably queues it while the barrier is enabled and otherwise unwinds
 * it; confirm against barrier.h. `out` is passed to the macro as a jump
 * target.
 */
int32_t
barrier_fsync_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
struct iatt *postbuf, dict_t *xdata)
{
BARRIER_FOP_CBK(fsync, out, frame, this, op_ret, op_errno, prebuf, postbuf,
xdata);
out:
return 0;
}
/*
 * writev entry point. Only writes where either the call's @flags or the
 * fd's open flags include O_SYNC or O_DSYNC are subject to the barrier.
 * Those get the target gfid recorded in frame->local (for statedump) and
 * are wound with barrier_writev_cbk as the reply handler. All other writes
 * are tail-wound straight to the child with no callback in this translator.
 */
int32_t
barrier_writev(call_frame_t *frame, xlator_t *this, fd_t *fd,
struct iovec *vector, int32_t count, off_t off, uint32_t flags,
struct iobref *iobref, dict_t *xdata)
{
if (!((flags | fd->flags) & (O_SYNC | O_DSYNC))) {
STACK_WIND_TAIL(frame, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->writev, fd, vector, count, off,
flags, iobref, xdata);
return 0;
}
barrier_local_set_gfid(frame, fd->inode->gfid, this);
STACK_WIND(frame, barrier_writev_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->writev, fd, vector, count, off, flags,
iobref, xdata);
return 0;
}
/*
 * fremovexattr entry point: records the fd's inode gfid in frame->local
 * (printed by statedump for queued stubs) and winds to the single child
 * with barrier_fremovexattr_cbk as the reply handler. If the gfid copy
 * cannot be allocated, the fop is still wound; only a warning is logged.
 */
int32_t
barrier_fremovexattr(call_frame_t *frame, xlator_t *this, fd_t *fd,
const char *name, dict_t *xdata)
{
barrier_local_set_gfid(frame, fd->inode->gfid, this);
STACK_WIND(frame, barrier_fremovexattr_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->fremovexattr, fd, name, xdata);
return 0;
}
/*
 * removexattr entry point: records the target inode's gfid in frame->local
 * (printed by statedump for queued stubs) and winds to the single child
 * with barrier_removexattr_cbk as the reply handler.
 */
int32_t
barrier_removexattr(call_frame_t *frame, xlator_t *this, loc_t *loc,
const char *name, dict_t *xdata)
{
barrier_local_set_gfid(frame, loc->inode->gfid, this);
STACK_WIND(frame, barrier_removexattr_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->removexattr, loc, name, xdata);
return 0;
}
/*
 * truncate entry point: records the target inode's gfid in frame->local
 * (printed by statedump for queued stubs) and winds to the single child
 * with barrier_truncate_cbk as the reply handler.
 */
int32_t
barrier_truncate(call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset,
dict_t *xdata)
{
barrier_local_set_gfid(frame, loc->inode->gfid, this);
STACK_WIND(frame, barrier_truncate_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->truncate, loc, offset, xdata);
return 0;
}
/*
 * rename entry point: records the gfid of the source inode (@oldloc) in
 * frame->local (printed by statedump for queued stubs) and winds to the
 * single child with barrier_rename_cbk as the reply handler.
 */
int32_t
barrier_rename(call_frame_t *frame, xlator_t *this, loc_t *oldloc,
loc_t *newloc, dict_t *xdata)
{
barrier_local_set_gfid(frame, oldloc->inode->gfid, this);
STACK_WIND(frame, barrier_rename_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->rename, oldloc, newloc, xdata);
return 0;
}
/*
 * rmdir entry point: records the target directory's gfid in frame->local
 * (printed by statedump for queued stubs) and winds to the single child
 * with barrier_rmdir_cbk as the reply handler.
 */
int
barrier_rmdir(call_frame_t *frame, xlator_t *this, loc_t *loc, int flags,
dict_t *xdata)
{
barrier_local_set_gfid(frame, loc->inode->gfid, this);
STACK_WIND(frame, barrier_rmdir_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->rmdir, loc, flags, xdata);
return 0;
}
/*
 * unlink entry point: records the target inode's gfid in frame->local
 * (printed by statedump for queued stubs) and winds to the single child
 * with barrier_unlink_cbk as the reply handler.
 */
int32_t
barrier_unlink(call_frame_t *frame, xlator_t *this, loc_t *loc, int xflag,
dict_t *xdata)
{
barrier_local_set_gfid(frame, loc->inode->gfid, this);
STACK_WIND(frame, barrier_unlink_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->unlink, loc, xflag, xdata);
return 0;
}
/*
 * ftruncate entry point: records the fd's inode gfid in frame->local
 * (printed by statedump for queued stubs) and winds to the single child
 * with barrier_ftruncate_cbk as the reply handler.
 */
int32_t
barrier_ftruncate(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset,
dict_t *xdata)
{
barrier_local_set_gfid(frame, fd->inode->gfid, this);
STACK_WIND(frame, barrier_ftruncate_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->ftruncate, fd, offset, xdata);
return 0;
}
/*
 * fsync entry point: records the fd's inode gfid in frame->local (printed
 * by statedump for queued stubs) and winds to the single child with
 * barrier_fsync_cbk as the reply handler.
 */
int32_t
barrier_fsync(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t flags,
dict_t *xdata)
{
barrier_local_set_gfid(frame, fd->inode->gfid, this);
STACK_WIND(frame, barrier_fsync_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->fsync, fd, flags, xdata);
return 0;
}
/*
 * Detach and return the first stub on @queue, or NULL when it is empty.
 * __barrier_enqueue() appends at the tail, so stubs come out in arrival
 * order. No locking is done here; callers in this file pass a list they
 * have already detached from priv->queue.
 */
call_stub_t *
__barrier_dequeue(xlator_t *this, struct list_head *queue)
{
    barrier_priv_t *priv = this->private;
    call_stub_t *head = NULL;

    GF_ASSERT(priv);

    if (!list_empty(queue)) {
        head = list_entry(queue->next, call_stub_t, list);
        list_del_init(&head->list);
    }

    return head;
}
/*
 * Resume every stub on @queue in order, logging before and after.
 * Every caller in this file passes a list already spliced off priv->queue
 * and calls this after dropping priv->lock.
 */
void
barrier_dequeue_all(xlator_t *this, struct list_head *queue)
{
    call_stub_t *stub = NULL;

    gf_log(this->name, GF_LOG_INFO, "Dequeuing all the barriered fops");

    /* TODO: Start the below task in a new thread */
    for (stub = __barrier_dequeue(this, queue); stub != NULL;
         stub = __barrier_dequeue(this, queue))
        call_resume(stub);

    gf_log(this->name, GF_LOG_INFO,
           "Dequeuing the barriered fops is "
           "finished");
}
void
barrier_timeout(void *data)
{
xlator_t *this = NULL;
barrier_priv_t *priv = NULL;
struct list_head queue = {
0,
};
this = data;
THIS = this;
priv = this->private;
INIT_LIST_HEAD(&queue);
gf_log(this->name, GF_LOG_CRITICAL,
"Disabling barrier because of "
"the barrier timeout.");
LOCK(&priv->lock);
{
__barrier_disable(this, &queue);
}
UNLOCK(&priv->lock);
barrier_dequeue_all(this, &queue);
return;
}
/*
 * Append @stub to the tail of priv->queue and bump the queue size.
 * Takes no lock itself.
 * NOTE(review): the caller (presumably BARRIER_FOP_CBK in barrier.h) is
 * expected to hold priv->lock; confirm.
 */
void
__barrier_enqueue(xlator_t *this, call_stub_t *stub)
{
    barrier_priv_t *priv = this->private;

    GF_ASSERT(priv);

    list_add_tail(&stub->list, &priv->queue);
    priv->queue_size++;
}
/*
 * Turn the barrier off: cancel the pending timeout (if any), move every
 * queued stub onto @queue for the caller to resume, and reset the queue
 * size. Every caller in this file holds priv->lock around this call.
 */
void
__barrier_disable(xlator_t *this, struct list_head *queue)
{
    barrier_priv_t *priv = this->private;

    GF_ASSERT(priv);

    if (priv->timer != NULL) {
        /* Result deliberately ignored: the timer may already be firing
         * (this is also reached from barrier_timeout() itself). */
        (void)gf_timer_call_cancel(this->ctx, priv->timer);
        priv->timer = NULL;
    }

    list_splice_init(&priv->queue, queue);
    priv->queue_size = 0;
    priv->barrier_enabled = _gf_false;
}
/*
 * Arm the barrier timeout (priv->timeout, firing barrier_timeout()) and
 * mark the barrier enabled.
 * Returns 0 on success. Returns -1 if the timer could not be created; the
 * barrier then stays disabled.
 */
int
__barrier_enable(xlator_t *this, barrier_priv_t *priv)
{
    priv->timer = gf_timer_call_after(this->ctx, priv->timeout,
                                      barrier_timeout, (void *)this);
    if (priv->timer == NULL) {
        gf_log(this->name, GF_LOG_CRITICAL,
               "Couldn't add barrier "
               "timeout event.");
        return -1;
    }

    priv->barrier_enabled = _gf_true;
    return 0;
}
/*
 * Translator event handler.
 *
 * GF_EVENT_TRANSLATOR_OP carries a dict whose boolean "barrier" key switches
 * barriering on or off. When it is switched off, the fops that were held back
 * are resumed after priv->lock is dropped. A request that would not change
 * the current state is logged and returns -1, as does a missing key.
 * Every other event is forwarded to default_notify() and returns 0.
 */
int
notify(xlator_t *this, int event, void *data, ...)
{
    enum { NOOP_NONE, NOOP_ALREADY_DISABLED, NOOP_ALREADY_ENABLED };
    barrier_priv_t *priv = this->private;
    dict_t *dict = NULL;
    int ret = -1;
    int want_enabled = _gf_false;
    int noop = NOOP_NONE;
    struct list_head queue;

    GF_ASSERT(priv);
    INIT_LIST_HEAD(&queue);

    if (event != GF_EVENT_TRANSLATOR_OP) {
        default_notify(this, event, data);
        return 0;
    }

    dict = data;
    want_enabled = dict_get_str_boolean(dict, "barrier", -1);
    if (want_enabled == -1) {
        gf_log(this->name, GF_LOG_ERROR,
               "Could not fetch "
               " barrier key from the dictionary.");
        return ret;
    }

    LOCK(&priv->lock);
    {
        if (!priv->barrier_enabled) {
            if (want_enabled)
                ret = __barrier_enable(this, priv);
            else
                noop = NOOP_ALREADY_DISABLED;
        } else if (!want_enabled) {
            __barrier_disable(this, &queue);
            ret = 0;
        } else {
            noop = NOOP_ALREADY_ENABLED;
        }
    }
    UNLOCK(&priv->lock);

    /* Logging happens outside the lock, as before. */
    if (noop == NOOP_ALREADY_DISABLED)
        gf_log(this->name, GF_LOG_ERROR, "Already disabled.");
    else if (noop == NOOP_ALREADY_ENABLED)
        gf_log(this->name, GF_LOG_ERROR, "Already enabled");

    if (!list_empty(&queue))
        barrier_dequeue_all(this, &queue);

    return ret;
}
/*
 * Apply new "barrier" / "barrier-timeout" option values.
 *
 * Turning the barrier on arms the timeout timer. Turning it off resumes every
 * held-back fop after priv->lock is dropped. The new timeout is stored first,
 * so a timer armed by this same call uses it.
 *
 * Returns 0 on success. Returns -1 if an option cannot be read or the timer
 * cannot be armed.
 */
int
reconfigure(xlator_t *this, dict_t *options)
{
    barrier_priv_t *priv = NULL;
    int ret = -1;
    gf_boolean_t barrier_enabled = _gf_false;
    uint32_t timeout = 0;
    struct list_head queue = {
        0,
    };

    priv = this->private;
    GF_ASSERT(priv);

    GF_OPTION_RECONF("barrier", barrier_enabled, options, bool, out);
    GF_OPTION_RECONF("barrier-timeout", timeout, options, time, out);

    INIT_LIST_HEAD(&queue);

    LOCK(&priv->lock);
    {
        /* Must precede __barrier_enable(): it arms the timer with
         * priv->timeout, and previously the old value was used when
         * enabling and changing the timeout in one reconfigure. */
        priv->timeout.tv_sec = timeout;

        if (!priv->barrier_enabled) {
            if (barrier_enabled) {
                ret = __barrier_enable(this, priv);
                if (ret)
                    goto unlock;
            }
        } else if (!barrier_enabled) {
            __barrier_disable(this, &queue);
        }
        ret = 0;
    }
unlock:
    UNLOCK(&priv->lock);

    if (!list_empty(&queue))
        barrier_dequeue_all(this, &queue);

out:
    return ret;
}
/*
 * Register this translator's memory-accounting types (up to
 * gf_barrier_mt_end). Returns the xlator_mem_acct_init() result and logs
 * an error when it is non-zero.
 */
int32_t
mem_acct_init(xlator_t *this)
{
    int ret = xlator_mem_acct_init(this, gf_barrier_mt_end + 1);

    if (ret != 0)
        gf_log(this->name, GF_LOG_ERROR,
               "Memory accounting "
               "initialization failed.");

    return ret;
}
/*
 * Translator init.
 *
 * - Requires exactly one child; warns if the translator has no parent.
 * - Allocates the private state and reads the "barrier" and
 *   "barrier-timeout" options.
 * - Arms the timeout timer when the barrier starts enabled.
 *
 * Returns 0 on success. Returns -1 on failure, in which case the private
 * state is released and this->private is left NULL.
 */
int
init(xlator_t *this)
{
    int ret = -1;
    barrier_priv_t *priv = NULL;
    uint32_t timeout = 0;

    if (!this->children || this->children->next) {
        gf_log(this->name, GF_LOG_ERROR,
               "'barrier' not configured with exactly one child");
        goto out;
    }

    if (!this->parents)
        gf_log(this->name, GF_LOG_WARNING, "dangling volume. check volfile ");

    priv = GF_CALLOC(1, sizeof(*priv), gf_barrier_mt_priv_t);
    if (!priv)
        goto out;

    LOCK_INIT(&priv->lock);

    GF_OPTION_INIT("barrier", priv->barrier_enabled, bool, out);
    GF_OPTION_INIT("barrier-timeout", timeout, time, out);
    priv->timeout.tv_sec = timeout;

    INIT_LIST_HEAD(&priv->queue);

    /* Publish priv before arming the timer. barrier_timeout() dereferences
     * this->private, and with a small barrier-timeout it could otherwise
     * fire before init() has stored it. */
    this->private = priv;

    if (priv->barrier_enabled) {
        ret = __barrier_enable(this, priv);
        if (ret == -1)
            goto out;
    }

    ret = 0;
out:
    if (ret && priv) {
        /* Nothing is armed on these paths (__barrier_enable() failed or
         * was never reached), so priv can be torn down safely. */
        this->private = NULL;
        LOCK_DESTROY(&priv->lock);
        GF_FREE(priv);
    }
    return ret;
}
/*
 * Translator teardown: disable the barrier (cancelling its timer), resume
 * any fops still held back, then release the private state.
 */
void
fini(xlator_t *this)
{
    barrier_priv_t *priv = this->private;
    struct list_head pending;

    if (priv == NULL)
        return;

    INIT_LIST_HEAD(&pending);

    gf_log(this->name, GF_LOG_INFO,
           "Disabling barriering and dequeuing "
           "all the queued fops");

    LOCK(&priv->lock);
    {
        __barrier_disable(this, &pending);
    }
    UNLOCK(&priv->lock);

    if (!list_empty(&pending))
        barrier_dequeue_all(this, &pending);

    this->private = NULL;
    LOCK_DESTROY(&priv->lock);
    GF_FREE(priv);
}
/*
 * Write one queued stub into statedump under @prefix. It always writes the
 * fop name. It also writes the gfid saved in frame->local (when present),
 * and the loc path and name (when set).
 */
static void
barrier_dump_stub(call_stub_t *stub, char *prefix)
{
    char key[GF_DUMP_MAX_BUF_LEN] = {0};
    void *saved_gfid = stub->frame->local;
    loc_t *loc = &stub->args.loc;

    gf_proc_dump_build_key(key, prefix, "fop");
    gf_proc_dump_write(key, "%s", gf_fop_list[stub->fop]);

    if (saved_gfid != NULL) {
        gf_proc_dump_build_key(key, prefix, "gfid");
        gf_proc_dump_write(key, "%s", uuid_utoa(*(uuid_t *)saved_gfid));
    }

    if (loc->path != NULL) {
        gf_proc_dump_build_key(key, prefix, "path");
        gf_proc_dump_write(key, "%s", loc->path);
    }

    if (loc->name != NULL) {
        gf_proc_dump_build_key(key, prefix, "name");
        gf_proc_dump_write(key, "%s", loc->name);
    }
}
/*
 * Dump every stub on priv->queue as its own "stub.<n>" statedump section,
 * numbered from 0 in queue order. The only caller, barrier_dump_priv(),
 * holds priv->lock.
 */
static void
__barrier_dump_queue(barrier_priv_t *priv)
{
    call_stub_t *stub = NULL;
    char section[GF_DUMP_MAX_BUF_LEN] = {0};
    int idx = 0;

    GF_VALIDATE_OR_GOTO("barrier", priv, out);

    list_for_each_entry(stub, &priv->queue, list)
    {
        snprintf(section, sizeof(section), "stub.%d", idx);
        idx++;
        gf_proc_dump_add_section("%s", section);
        barrier_dump_stub(stub, section);
    }

out:
    return;
}
/*
 * Statedump hook for the private state. It writes the enabled flag and the
 * timeout in seconds. While the barrier is enabled, it also writes the queue
 * size and every queued stub. This is done under priv->lock.
 *
 * Returns 0 on success (including when there is no private state yet), or
 * -1 if @this is NULL. Previously a successful dump also returned -1.
 */
int
barrier_dump_priv(xlator_t *this)
{
    int ret = -1;
    char key[GF_DUMP_MAX_BUF_LEN] = {
        0,
    };
    barrier_priv_t *priv = NULL;

    GF_VALIDATE_OR_GOTO("barrier", this, out);

    priv = this->private;
    if (!priv)
        return 0;

    gf_proc_dump_build_key(key, "xlator.features.barrier", "priv");
    gf_proc_dump_add_section("%s", key);

    LOCK(&priv->lock);
    {
        gf_proc_dump_build_key(key, "barrier", "enabled");
        gf_proc_dump_write(key, "%d", priv->barrier_enabled);
        gf_proc_dump_build_key(key, "barrier", "timeout");
        /* time_t has no portable printf conversion; widen explicitly so it
         * matches %ld. */
        gf_proc_dump_write(key, "%ld", (long)priv->timeout.tv_sec);
        if (priv->barrier_enabled) {
            gf_proc_dump_build_key(key, "barrier", "queue_size");
            gf_proc_dump_write(key, "%d", priv->queue_size);
            __barrier_dump_queue(priv);
        }
    }
    UNLOCK(&priv->lock);

    ret = 0;
out:
    return ret;
}
/*
 * Fops intercepted by the barrier. Each one records the target gfid and
 * winds to the child with a barrier callback.
 */
struct xlator_fops fops = {
    /* Barrier class fops */
    .unlink = barrier_unlink,
    .rmdir = barrier_rmdir,
    .rename = barrier_rename,
    .truncate = barrier_truncate,
    .ftruncate = barrier_ftruncate,
    .removexattr = barrier_removexattr,
    .fremovexattr = barrier_fremovexattr,
    .fsync = barrier_fsync,
    /* Only writes with O_SYNC or O_DSYNC (in the call flags or the fd's
     * open flags) are barriered; barrier_writev() tail-winds the rest. */
    .writev = barrier_writev,
};
/* Statedump hooks: only the private-state section is provided. */
struct xlator_dumpops dumpops = {.priv = barrier_dump_priv};
/* No translator-level callbacks; referenced from xlator_api.cbks.
 * Left as a tentative definition (zero-initialized at file scope). */
struct xlator_cbks cbks;
/*
 * Volume options understood by this translator. Both are read in init()
 * and reconfigure().
 */
struct volume_options options[] = {
    {
        .key = {"barrier"},
        .type = GF_OPTION_TYPE_BOOL,
        .flags = OPT_FLAG_SETTABLE,
        .op_version = {GD_OP_VERSION_3_6_0},
        .default_value = "disable",
        /* NOTE(review): barrier_writev() also barriers O_DSYNC writes,
         * which this description does not mention. */
        .description = "When \"enabled\", blocks acknowledgements to application "
                       "for file operations such as rmdir, rename, unlink, "
                       "removexattr, fremovexattr, truncate, ftruncate, "
                       "write (with O_SYNC), fsync. It is turned \"off\" by "
                       "default.",
    },
    {
        .key = {"barrier-timeout"},
        .type = GF_OPTION_TYPE_TIME,
        .flags = OPT_FLAG_SETTABLE,
        .op_version = {GD_OP_VERSION_3_6_0},
        .default_value = BARRIER_TIMEOUT,
        .description = "After 'timeout' seconds since the time 'barrier' "
                       "option was set to \"on\", acknowledgements to file "
                       "operations are no longer blocked and previously "
                       "blocked acknowledgements are sent to the application",
    },
    /* End-of-table sentinel. */
    {.key = {NULL}},
};
xlator_api_t xlator_api = {
.init = init,
.fini = fini,
.notify = notify,
.reconfigure = reconfigure,
.mem_acct_init = mem_acct_init,
.op_version = {1}, /* Present from the initial version */
.dumpops = &dumpops,
.fops = &fops,
.cbks = &cbks,
.options = options,
.identifier = "barrier",
.category = GF_MAINTAINED,
};