/*
Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
This file is part of GlusterFS.
This file is licensed to you under your choice of the GNU Lesser
General Public License, version 3 or any later version (LGPLv3 or
later), or the GNU General Public License, version 2 (GPLv2), in all
cases as published by the Free Software Foundation.
*/
#include <glusterfs/xlator.h>
#include <glusterfs/defaults.h>
#include <glusterfs/syscall.h>
#include <glusterfs/logging.h>
#include <glusterfs/iobuf.h>
#include "changelog-rt.h"
#include "changelog-encoders.h"
#include "changelog-mem-types.h"
#include "changelog-messages.h"
#include <pthread.h>
#include <signal.h>
#include "changelog-rpc.h"
#include "errno.h"
static struct changelog_bootstrap cb_bootstrap[] = {
{
.mode = CHANGELOG_MODE_RT,
.ctor = changelog_rt_init,
.dtor = changelog_rt_fini,
},
};
/* Entry operations - TYPE III */
/**
* entry operations do not undergo inode version checking.
*/
/* {{{ */
/* rmdir */
int32_t
changelog_rmdir_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, struct iatt *preparent,
struct iatt *postparent, dict_t *xdata)
{
changelog_priv_t *priv = NULL;
changelog_local_t *local = NULL;
priv = this->private;
local = frame->local;
CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !local), unwind);
changelog_update(this, priv, local, CHANGELOG_TYPE_ENTRY);
unwind:
changelog_dec_fop_cnt(this, priv, local);
CHANGELOG_STACK_UNWIND(rmdir, frame, op_ret, op_errno, preparent,
postparent, xdata);
return 0;
}
int32_t
changelog_rmdir_resume(call_frame_t *frame, xlator_t *this, loc_t *loc,
int xflags, dict_t *xdata)
{
changelog_priv_t *priv = NULL;
priv = this->private;
gf_msg_debug(this->name, 0, "Dequeue rmdir");
changelog_color_fop_and_inc_cnt(this, priv, frame->local);
STACK_WIND(frame, changelog_rmdir_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->rmdir, loc, xflags, xdata);
return 0;
}
int32_t
changelog_rmdir(call_frame_t *frame, xlator_t *this, loc_t *loc, int xflags,
dict_t *xdata)
{
size_t xtra_len = 0;
changelog_priv_t *priv = NULL;
changelog_opt_t *co = NULL;
call_stub_t *stub = NULL;
struct list_head queue = {
0,
};
gf_boolean_t barrier_enabled = _gf_false;
INIT_LIST_HEAD(&queue);
priv = this->private;
CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);
CHANGELOG_INIT_NOCHECK(this, frame->local, NULL, loc->inode->gfid, 2);
co = changelog_get_usable_buffer(frame->local);
if (!co)
goto wind;
CHANGLOG_FILL_FOP_NUMBER(co, frame->root->op, fop_fn, xtra_len);
co++;
if (priv->capture_del_path) {
CHANGELOG_FILL_ENTRY_DIR_PATH(co, loc->pargfid, loc->name, del_entry_fn,
del_entry_free_fn, xtra_len, wind,
_gf_true);
} else {
CHANGELOG_FILL_ENTRY_DIR_PATH(co, loc->pargfid, loc->name, del_entry_fn,
del_entry_free_fn, xtra_len, wind,
_gf_false);
}
changelog_set_usable_record_and_length(frame->local, xtra_len, 2);
/* changelog barrier */
/* Color assignment and increment of fop_cnt for rmdir/unlink/rename
* should be made with in priv lock if changelog barrier is not enabled.
* Because if counter is not incremented yet, draining wakes up and
* publishes the changelog but later these fops might hit the disk and
* present in snapped volume but where as the intention is these fops
* should not be present in snapped volume.
*/
LOCK(&priv->lock);
{
if ((barrier_enabled = priv->barrier_enabled)) {
stub = fop_rmdir_stub(frame, changelog_rmdir_resume, loc, xflags,
xdata);
if (!stub)
__chlog_barrier_disable(this, &queue);
else
__chlog_barrier_enqueue(this, stub);
} else {
((changelog_local_t *)frame->local)->color = priv->current_color;
changelog_inc_fop_cnt(this, priv, frame->local);
}
}
UNLOCK(&priv->lock);
if (barrier_enabled && stub) {
gf_msg_debug(this->name, 0, "Enqueue rmdir");
goto out;
}
if (barrier_enabled && !stub) {
gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, CHANGELOG_MSG_NO_MEMORY,
"Failed to barrier FOPs, disabling changelog barrier",
"fop=rmdir", NULL);
chlog_barrier_dequeue_all(this, &queue);
}
/* changelog barrier */
wind:
STACK_WIND(frame, changelog_rmdir_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->rmdir, loc, xflags, xdata);
out:
return 0;
}
/* unlink */
int32_t
changelog_unlink_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, struct iatt *preparent,
struct iatt *postparent, dict_t *xdata)
{
changelog_priv_t *priv = NULL;
changelog_local_t *local = NULL;
priv = this->private;
local = frame->local;
CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !local), unwind);
changelog_update(this, priv, local, CHANGELOG_TYPE_ENTRY);
unwind:
changelog_dec_fop_cnt(this, priv, local);
CHANGELOG_STACK_UNWIND(unlink, frame, op_ret, op_errno, preparent,
postparent, xdata);
return 0;
}
int32_t
changelog_unlink_resume(call_frame_t *frame, xlator_t *this, loc_t *loc,
int xflags, dict_t *xdata)
{
changelog_priv_t *priv = NULL;
priv = this->private;
gf_msg_debug(this->name, 0, "Dequeue unlink");
changelog_color_fop_and_inc_cnt(this, priv, frame->local);
STACK_WIND(frame, changelog_unlink_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->unlink, loc, xflags, xdata);
return 0;
}
int32_t
changelog_unlink(call_frame_t *frame, xlator_t *this, loc_t *loc, int xflags,
dict_t *xdata)
{
size_t xtra_len = 0;
changelog_priv_t *priv = NULL;
changelog_opt_t *co = NULL;
call_stub_t *stub = NULL;
struct list_head queue = {
0,
};
gf_boolean_t barrier_enabled = _gf_false;
dht_changelog_rename_info_t *info = NULL;
int ret = 0;
char *old_name = NULL;
char *new_name = NULL;
char *nname = NULL;
INIT_LIST_HEAD(&queue);
priv = this->private;
CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);
ret = dict_get_bin(xdata, DHT_CHANGELOG_RENAME_OP_KEY, (void **)&info);
if (!ret) { /* special case: unlink considered as rename */
/* 3 == fop + oldloc + newloc */
old_name = alloca(info->oldname_len);
new_name = alloca(info->newname_len);
CHANGELOG_INIT_NOCHECK(this, frame->local, NULL, loc->inode->gfid, 3);
co = changelog_get_usable_buffer(frame->local);
if (!co)
goto wind;
CHANGLOG_FILL_FOP_NUMBER(co, GF_FOP_RENAME, fop_fn, xtra_len);
co++;
strncpy(old_name, info->buffer, info->oldname_len);
CHANGELOG_FILL_ENTRY(co, info->old_pargfid, old_name, entry_fn,
entry_free_fn, xtra_len, wind);
co++;
/* new name resides just after old name */
nname = info->buffer + info->oldname_len;
strncpy(new_name, nname, info->newname_len);
CHANGELOG_FILL_ENTRY(co, info->new_pargfid, new_name, entry_fn,
entry_free_fn, xtra_len, wind);
changelog_set_usable_record_and_length(frame->local, xtra_len, 3);
} else { /* default unlink */
CHANGELOG_IF_INTERNAL_FOP_THEN_GOTO(frame, xdata, wind);
CHANGELOG_INIT_NOCHECK(this, frame->local, NULL, loc->inode->gfid, 2);
co = changelog_get_usable_buffer(frame->local);
if (!co)
goto wind;
CHANGLOG_FILL_FOP_NUMBER(co, frame->root->op, fop_fn, xtra_len);
co++;
if (priv->capture_del_path) {
CHANGELOG_FILL_ENTRY_DIR_PATH(co, loc->pargfid, loc->name,
del_entry_fn, del_entry_free_fn,
xtra_len, wind, _gf_true);
} else {
CHANGELOG_FILL_ENTRY_DIR_PATH(co, loc->pargfid, loc->name,
del_entry_fn, del_entry_free_fn,
xtra_len, wind, _gf_false);
}
changelog_set_usable_record_and_length(frame->local, xtra_len, 2);
}
/* changelog barrier */
LOCK(&priv->lock);
{
if ((barrier_enabled = priv->barrier_enabled)) {
stub = fop_unlink_stub(frame, changelog_unlink_resume, loc, xflags,
xdata);
if (!stub)
__chlog_barrier_disable(this, &queue);
else
__chlog_barrier_enqueue(this, stub);
} else {
((changelog_local_t *)frame->local)->color = priv->current_color;
changelog_inc_fop_cnt(this, priv, frame->local);
}
}
UNLOCK(&priv->lock);
if (barrier_enabled && stub) {
gf_msg_debug(this->name, 0, "Enqueue unlink");
goto out;
}
if (barrier_enabled && !stub) {
gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, CHANGELOG_MSG_NO_MEMORY,
"Failed to barrier FOPs, disabling changelog barrier",
"fop=unlink", NULL);
chlog_barrier_dequeue_all(this, &queue);
}
/* changelog barrier */
wind:
STACK_WIND(frame, changelog_unlink_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->unlink, loc, xflags, xdata);
out:
return 0;
}
/* rename */
int32_t
changelog_rename_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, struct iatt *buf,
struct iatt *preoldparent, struct iatt *postoldparent,
struct iatt *prenewparent, struct iatt *postnewparent,
dict_t *xdata)
{
changelog_priv_t *priv = NULL;
changelog_local_t *local = NULL;
priv = this->private;
local = frame->local;
CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !local), unwind);
changelog_update(this, priv, local, CHANGELOG_TYPE_ENTRY);
unwind:
changelog_dec_fop_cnt(this, priv, local);
CHANGELOG_STACK_UNWIND(rename, frame, op_ret, op_errno, buf, preoldparent,
postoldparent, prenewparent, postnewparent, xdata);
return 0;
}
int32_t
changelog_rename_resume(call_frame_t *frame, xlator_t *this, loc_t *oldloc,
loc_t *newloc, dict_t *xdata)
{
changelog_priv_t *priv = NULL;
priv = this->private;
gf_msg_debug(this->name, 0, "Dequeue rename");
changelog_color_fop_and_inc_cnt(this, priv, frame->local);
STACK_WIND(frame, changelog_rename_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->rename, oldloc, newloc, xdata);
return 0;
}
int32_t
changelog_rename(call_frame_t *frame, xlator_t *this, loc_t *oldloc,
loc_t *newloc, dict_t *xdata)
{
size_t xtra_len = 0;
changelog_priv_t *priv = NULL;
changelog_opt_t *co = NULL;
call_stub_t *stub = NULL;
struct list_head queue = {
0,
};
gf_boolean_t barrier_enabled = _gf_false;
dht_changelog_rename_info_t *info = NULL;
int ret = 0;
INIT_LIST_HEAD(&queue);
priv = this->private;
CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);
ret = dict_get_bin(xdata, DHT_CHANGELOG_RENAME_OP_KEY, (void **)&info);
if (ret && oldloc->inode->ia_type != IA_IFDIR) {
/* xdata "NOT" set for a non-directory,
* Special rename => avoid logging */
goto wind;
}
/* 3 == fop + oldloc + newloc */
CHANGELOG_INIT_NOCHECK(this, frame->local, NULL, oldloc->inode->gfid, 3);
co = changelog_get_usable_buffer(frame->local);
if (!co)
goto wind;
CHANGLOG_FILL_FOP_NUMBER(co, frame->root->op, fop_fn, xtra_len);
co++;
CHANGELOG_FILL_ENTRY(co, oldloc->pargfid, oldloc->name, entry_fn,
entry_free_fn, xtra_len, wind);
co++;
CHANGELOG_FILL_ENTRY(co, newloc->pargfid, newloc->name, entry_fn,
entry_free_fn, xtra_len, wind);
changelog_set_usable_record_and_length(frame->local, xtra_len, 3);
/* changelog barrier */
LOCK(&priv->lock);
{
if ((barrier_enabled = priv->barrier_enabled)) {
stub = fop_rename_stub(frame, changelog_rename_resume, oldloc,
newloc, xdata);
if (!stub)
__chlog_barrier_disable(this, &queue);
else
__chlog_barrier_enqueue(this, stub);
} else {
((changelog_local_t *)frame->local)->color = priv->current_color;
changelog_inc_fop_cnt(this, priv, frame->local);
}
}
UNLOCK(&priv->lock);
if (barrier_enabled && stub) {
gf_msg_debug(this->name, 0, "Enqueue rename");
goto out;
}
if (barrier_enabled && !stub) {
gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, CHANGELOG_MSG_NO_MEMORY,
"Failed to barrier FOPs, disabling changelog barrier",
"fop=rename", NULL);
chlog_barrier_dequeue_all(this, &queue);
}
/* changelog barrier */
wind:
STACK_WIND(frame, changelog_rename_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->rename, oldloc, newloc, xdata);
out:
return 0;
}
/* link */
int32_t
changelog_link_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, inode_t *inode,
struct iatt *buf, struct iatt *preparent,
struct iatt *postparent, dict_t *xdata)
{
changelog_priv_t *priv = NULL;
changelog_local_t *local = NULL;
priv = this->private;
local = frame->local;
CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !local), unwind);
changelog_update(this, priv, local, CHANGELOG_TYPE_ENTRY);
unwind:
changelog_dec_fop_cnt(this, priv, local);
CHANGELOG_STACK_UNWIND(link, frame, op_ret, op_errno, inode, buf, preparent,
postparent, xdata);
return 0;
}
int32_t
changelog_link_resume(call_frame_t *frame, xlator_t *this, loc_t *oldloc,
loc_t *newloc, dict_t *xdata)
{
changelog_priv_t *priv = NULL;
GF_VALIDATE_OR_GOTO("changelog", this, out);
GF_VALIDATE_OR_GOTO("changelog", this->fops, out);
GF_VALIDATE_OR_GOTO("changelog", frame, out);
priv = this->private;
gf_msg_debug(this->name, 0, "Dequeuing link");
changelog_color_fop_and_inc_cnt(this, priv, frame->local);
STACK_WIND(frame, changelog_link_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->link, oldloc, newloc, xdata);
return 0;
out:
return -1;
}
int32_t
changelog_link(call_frame_t *frame, xlator_t *this, loc_t *oldloc,
loc_t *newloc, dict_t *xdata)
{
size_t xtra_len = 0;
changelog_priv_t *priv = NULL;
changelog_opt_t *co = NULL;
call_stub_t *stub = NULL;
struct list_head queue = {
0,
};
gf_boolean_t barrier_enabled = _gf_false;
priv = this->private;
CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);
CHANGELOG_IF_INTERNAL_FOP_THEN_GOTO(frame, xdata, wind);
CHANGELOG_INIT_NOCHECK(this, frame->local, NULL, oldloc->gfid, 2);
co = changelog_get_usable_buffer(frame->local);
if (!co)
goto wind;
CHANGLOG_FILL_FOP_NUMBER(co, frame->root->op, fop_fn, xtra_len);
co++;
CHANGELOG_FILL_ENTRY(co, newloc->pargfid, newloc->name, entry_fn,
entry_free_fn, xtra_len, wind);
changelog_set_usable_record_and_length(frame->local, xtra_len, 2);
LOCK(&priv->lock);
{
if ((barrier_enabled = priv->barrier_enabled)) {
stub = fop_link_stub(frame, changelog_link_resume, oldloc, newloc,
xdata);
if (!stub)
__chlog_barrier_disable(this, &queue);
else
__chlog_barrier_enqueue(this, stub);
} else {
((changelog_local_t *)frame->local)->color = priv->current_color;
changelog_inc_fop_cnt(this, priv, frame->local);
}
}
UNLOCK(&priv->lock);
if (barrier_enabled && stub) {
gf_msg_debug(this->name, 0, "Enqueued link");
goto out;
}
if (barrier_enabled && !stub) {
gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_NO_MEMORY,
"Failed to barrier FOPs, disabling changelog barrier",
"fop=link", NULL);
chlog_barrier_dequeue_all(this, &queue);
}
wind:
STACK_WIND(frame, changelog_link_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->link, oldloc, newloc, xdata);
out:
return 0;
}
/* mkdir */
int32_t
changelog_mkdir_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, inode_t *inode,
struct iatt *buf, struct iatt *preparent,
struct iatt *postparent, dict_t *xdata)
{
changelog_priv_t *priv = NULL;
changelog_local_t *local = NULL;
priv = this->private;
local = frame->local;
CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !local), unwind);
changelog_update(this, priv, local, CHANGELOG_TYPE_ENTRY);
unwind:
changelog_dec_fop_cnt(this, priv, local);
CHANGELOG_STACK_UNWIND(mkdir, frame, op_ret, op_errno, inode, buf,
preparent, postparent, xdata);
return 0;
}
int32_t
changelog_mkdir_resume(call_frame_t *frame, xlator_t *this, loc_t *loc,
mode_t mode, mode_t umask, dict_t *xdata)
{
changelog_priv_t *priv = NULL;
GF_VALIDATE_OR_GOTO("changelog", this, out);
GF_VALIDATE_OR_GOTO("changelog", this->fops, out);
GF_VALIDATE_OR_GOTO("changelog", frame, out);
priv = this->private;
gf_msg_debug(this->name, 0, "Dequeuing mkdir");
changelog_color_fop_and_inc_cnt(this, priv, frame->local);
STACK_WIND(frame, changelog_mkdir_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->mkdir, loc, mode, umask, xdata);
return 0;
out:
return -1;
}
int32_t
changelog_mkdir(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
mode_t umask, dict_t *xdata)
{
int ret = -1;
uuid_t gfid = {
0,
};
size_t xtra_len = 0;
changelog_priv_t *priv = NULL;
changelog_opt_t *co = NULL;
call_stub_t *stub = NULL;
struct list_head queue = {
0,
};
gf_boolean_t barrier_enabled = _gf_false;
priv = this->private;
CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);
ret = dict_get_gfuuid(xdata, "gfid-req", &gfid);
if (ret) {
gf_msg_debug(this->name, 0, "failed to get gfid from dict");
goto wind;
}
CHANGELOG_INIT_NOCHECK(this, frame->local, NULL, gfid, 5);
co = changelog_get_usable_buffer(frame->local);
if (!co)
goto wind;
CHANGLOG_FILL_FOP_NUMBER(co, frame->root->op, fop_fn, xtra_len);
co++;
CHANGELOG_FILL_UINT32(co, S_IFDIR | mode, number_fn, xtra_len);
co++;
CHANGELOG_FILL_UINT32(co, frame->root->uid, number_fn, xtra_len);
co++;
CHANGELOG_FILL_UINT32(co, frame->root->gid, number_fn, xtra_len);
co++;
CHANGELOG_FILL_ENTRY(co, loc->pargfid, loc->name, entry_fn, entry_free_fn,
xtra_len, wind);
changelog_set_usable_record_and_length(frame->local, xtra_len, 5);
LOCK(&priv->lock);
{
if ((barrier_enabled = priv->barrier_enabled)) {
stub = fop_mkdir_stub(frame, changelog_mkdir_resume, loc, mode,
umask, xdata);
if (!stub)
__chlog_barrier_disable(this, &queue);
else
__chlog_barrier_enqueue(this, stub);
} else {
((changelog_local_t *)frame->local)->color = priv->current_color;
changelog_inc_fop_cnt(this, priv, frame->local);
}
}
UNLOCK(&priv->lock);
if (barrier_enabled && stub) {
gf_msg_debug(this->name, 0, "Enqueued mkdir");
goto out;
}
if (barrier_enabled && !stub) {
gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, CHANGELOG_MSG_NO_MEMORY,
"Failed to barrier FOPs, disabling changelog barrier",
"fop=mkdir", NULL);
chlog_barrier_dequeue_all(this, &queue);
}
wind:
STACK_WIND(frame, changelog_mkdir_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->mkdir, loc, mode, umask, xdata);
out:
return 0;
}
/* symlink */
int32_t
changelog_symlink_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, inode_t *inode,
struct iatt *buf, struct iatt *preparent,
struct iatt *postparent, dict_t *xdata)
{
changelog_priv_t *priv = NULL;
changelog_local_t *local = NULL;
priv = this->private;
local = frame->local;
CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !local), unwind);
changelog_update(this, priv, local, CHANGELOG_TYPE_ENTRY);
unwind:
changelog_dec_fop_cnt(this, priv, local);
CHANGELOG_STACK_UNWIND(symlink, frame, op_ret, op_errno, inode, buf,
preparent, postparent, xdata);
return 0;
}
int32_t
changelog_symlink_resume(call_frame_t *frame, xlator_t *this,
const char *linkname, loc_t *loc, mode_t umask,
dict_t *xdata)
{
changelog_priv_t *priv = NULL;
GF_VALIDATE_OR_GOTO("changelog", this, out);
GF_VALIDATE_OR_GOTO("changelog", this->fops, out);
GF_VALIDATE_OR_GOTO("changelog", frame, out);
priv = this->private;
gf_msg_debug(this->name, 0, "Dequeuing symlink");
changelog_color_fop_and_inc_cnt(this, priv, frame->local);
STACK_WIND(frame, changelog_symlink_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->symlink, linkname, loc, umask, xdata);
return 0;
out:
return -1;
}
int32_t
changelog_symlink(call_frame_t *frame, xlator_t *this, const char *linkname,
loc_t *loc, mode_t umask, dict_t *xdata)
{
int ret = -1;
size_t xtra_len = 0;
uuid_t gfid = {
0,
};
changelog_priv_t *priv = NULL;
changelog_opt_t *co = NULL;
call_stub_t *stub = NULL;
struct list_head queue = {
0,
};
gf_boolean_t barrier_enabled = _gf_false;
priv = this->private;
CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);
ret = dict_get_gfuuid(xdata, "gfid-req", &gfid);
if (ret) {
gf_msg_debug(this->name, 0, "failed to get gfid from dict");
goto wind;
}
CHANGELOG_INIT_NOCHECK(this, frame->local, NULL, gfid, 2);
co = changelog_get_usable_buffer(frame->local);
if (!co)
goto wind;
CHANGLOG_FILL_FOP_NUMBER(co, frame->root->op, fop_fn, xtra_len);
co++;
CHANGELOG_FILL_ENTRY(co, loc->pargfid, loc->name, entry_fn, entry_free_fn,
xtra_len, wind);
changelog_set_usable_record_and_length(frame->local, xtra_len, 2);
LOCK(&priv->lock);
{
if ((barrier_enabled = priv->barrier_enabled)) {
stub = fop_symlink_stub(frame, changelog_symlink_resume, linkname,
loc, umask, xdata);
if (!stub)
__chlog_barrier_disable(this, &queue);
else
__chlog_barrier_enqueue(this, stub);
} else {
((changelog_local_t *)frame->local)->color = priv->current_color;
changelog_inc_fop_cnt(this, priv, frame->local);
}
}
UNLOCK(&priv->lock);
if (barrier_enabled && stub) {
gf_msg_debug(this->name, 0, "Enqueued symlink");
goto out;
}
if (barrier_enabled && !stub) {
gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, CHANGELOG_MSG_NO_MEMORY,
"Failed to barrier FOPs, disabling changelog barrier",
"fop=symlink", NULL);
chlog_barrier_dequeue_all(this, &queue);
}
wind:
STACK_WIND(frame, changelog_symlink_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->symlink, linkname, loc, umask, xdata);
out:
return 0;
}
/* mknod */
int32_t
changelog_mknod_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, inode_t *inode,
struct iatt *buf, struct iatt *preparent,
struct iatt *postparent, dict_t *xdata)
{
changelog_priv_t *priv = NULL;
changelog_local_t *local = NULL;
priv = this->private;
local = frame->local;
CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !local), unwind);
changelog_update(this, priv, local, CHANGELOG_TYPE_ENTRY);
unwind:
changelog_dec_fop_cnt(this, priv, local);
CHANGELOG_STACK_UNWIND(mknod, frame, op_ret, op_errno, inode, buf,
preparent, postparent, xdata);
return 0;
}
int32_t
changelog_mknod_resume(call_frame_t *frame, xlator_t *this, loc_t *loc,
mode_t mode, dev_t rdev, mode_t umask, dict_t *xdata)
{
changelog_priv_t *priv = NULL;
GF_VALIDATE_OR_GOTO("changelog", this, out);
GF_VALIDATE_OR_GOTO("changelog", this->fops, out);
GF_VALIDATE_OR_GOTO("changelog", frame, out);
priv = this->private;
gf_msg_debug(this->name, 0, "Dequeuing mknod");
changelog_color_fop_and_inc_cnt(this, priv, frame->local);
STACK_WIND(frame, changelog_mknod_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->mknod, loc, mode, rdev, umask, xdata);
return 0;
out:
return -1;
}
int32_t
changelog_mknod(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
dev_t dev, mode_t umask, dict_t *xdata)
{
int ret = -1;
uuid_t gfid = {
0,
};
size_t xtra_len = 0;
changelog_priv_t *priv = NULL;
changelog_opt_t *co = NULL;
call_stub_t *stub = NULL;
struct list_head queue = {
0,
};
gf_boolean_t barrier_enabled = _gf_false;
priv = this->private;
/* Check whether changelog active */
if (!(priv->active))
goto wind;
/* Check whether rebalance activity */
if (frame->root->pid == GF_CLIENT_PID_DEFRAG)
goto wind;
/* If tier-dht linkto is SET, ignore about verifiying :
* 1. Whether internal fop AND
* 2. Whether tier rebalance process activity (this will help in
* recording mknod if tier rebalance process calls this mknod) */
if (!(dict_get(xdata, "trusted.tier.tier-dht.linkto"))) {
CHANGELOG_IF_INTERNAL_FOP_THEN_GOTO(frame, xdata, wind);
if (frame->root->pid == GF_CLIENT_PID_TIER_DEFRAG)
goto wind;
}
ret = dict_get_gfuuid(xdata, "gfid-req", &gfid);
if (ret) {
gf_msg_debug(this->name, 0, "failed to get gfid from dict");
goto wind;
}
CHANGELOG_INIT_NOCHECK(this, frame->local, NULL, gfid, 5);
co = changelog_get_usable_buffer(frame->local);
if (!co)
goto wind;
CHANGLOG_FILL_FOP_NUMBER(co, frame->root->op, fop_fn, xtra_len);
co++;
CHANGELOG_FILL_UINT32(co, mode, number_fn, xtra_len);
co++;
CHANGELOG_FILL_UINT32(co, frame->root->uid, number_fn, xtra_len);
co++;
CHANGELOG_FILL_UINT32(co, frame->root->gid, number_fn, xtra_len);
co++;
CHANGELOG_FILL_ENTRY(co, loc->pargfid, loc->name, entry_fn, entry_free_fn,
xtra_len, wind);
changelog_set_usable_record_and_length(frame->local, xtra_len, 5);
LOCK(&priv->lock);
{
if ((barrier_enabled = priv->barrier_enabled)) {
stub = fop_mknod_stub(frame, changelog_mknod_resume, loc, mode, dev,
umask, xdata);
if (!stub)
__chlog_barrier_disable(this, &queue);
else
__chlog_barrier_enqueue(this, stub);
} else {
((changelog_local_t *)frame->local)->color = priv->current_color;
changelog_inc_fop_cnt(this, priv, frame->local);
}
}
UNLOCK(&priv->lock);
if (barrier_enabled && stub) {
gf_msg_debug(this->name, 0, "Enqueued mknod");
goto out;
}
if (barrier_enabled && !stub) {
gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, CHANGELOG_MSG_NO_MEMORY,
"Failed to barrier FOPs, disabling changelog barrier",
"fop=mknod", NULL);
chlog_barrier_dequeue_all(this, &queue);
}
wind:
STACK_WIND(frame, changelog_mknod_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->mknod, loc, mode, dev, umask, xdata);
out:
return 0;
}
/* create */
int32_t
changelog_create_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, fd_t *fd, inode_t *inode,
struct iatt *buf, struct iatt *preparent,
struct iatt *postparent, dict_t *xdata)
{
int32_t ret = 0;
changelog_priv_t *priv = NULL;
changelog_local_t *local = NULL;
changelog_event_t ev = {
0,
};
priv = this->private;
local = frame->local;
CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !local), unwind);
/* fill the event structure.. similar to open() */
ev.ev_type = CHANGELOG_OP_TYPE_CREATE;
gf_uuid_copy(ev.u.create.gfid, buf->ia_gfid);
ev.u.create.flags = fd->flags;
changelog_dispatch_event(this, priv, &ev);
if (changelog_ev_selected(this, &priv->ev_selection,
CHANGELOG_OP_TYPE_RELEASE)) {
ret = fd_ctx_set(fd, this, (uint64_t)(long)0x1);
if (ret)
gf_msg(this->name, GF_LOG_WARNING, 0, CHANGELOG_MSG_SET_FD_CONTEXT,
"could not set fd context (for release cbk)");
}
changelog_update(this, priv, local, CHANGELOG_TYPE_ENTRY);
unwind:
changelog_dec_fop_cnt(this, priv, local);
CHANGELOG_STACK_UNWIND(create, frame, op_ret, op_errno, fd, inode, buf,
preparent, postparent, xdata);
return 0;
}
int32_t
changelog_create_resume(call_frame_t *frame, xlator_t *this, loc_t *loc,
int32_t flags, mode_t mode, mode_t umask, fd_t *fd,
dict_t *xdata)
{
changelog_priv_t *priv = NULL;
GF_VALIDATE_OR_GOTO("changelog", this, out);
GF_VALIDATE_OR_GOTO("changelog", this->fops, out);
GF_VALIDATE_OR_GOTO("changelog", frame, out);
priv = this->private;
gf_msg_debug(this->name, 0, "Dequeuing create");
changelog_color_fop_and_inc_cnt(this, priv, frame->local);
STACK_WIND(frame, changelog_create_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->create, loc, flags, mode, umask, fd,
xdata);
return 0;
out:
return -1;
}
int32_t
changelog_create(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,
mode_t mode, mode_t umask, fd_t *fd, dict_t *xdata)
{
int ret = -1;
uuid_t gfid = {
0,
};
changelog_opt_t *co = NULL;
changelog_priv_t *priv = NULL;
size_t xtra_len = 0;
call_stub_t *stub = NULL;
struct list_head queue = {
0,
};
gf_boolean_t barrier_enabled = _gf_false;
priv = this->private;
CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);
ret = dict_get_gfuuid(xdata, "gfid-req", &gfid);
if (ret) {
gf_msg_debug(this->name, 0, "failed to get gfid from dict");
goto wind;
}
/* init with two extra records */
CHANGELOG_INIT_NOCHECK(this, frame->local, NULL, gfid, 5);
if (!frame->local)
goto wind;
co = changelog_get_usable_buffer(frame->local);
if (!co)
goto wind;
CHANGLOG_FILL_FOP_NUMBER(co, frame->root->op, fop_fn, xtra_len);
co++;
CHANGELOG_FILL_UINT32(co, mode, number_fn, xtra_len);
co++;
CHANGELOG_FILL_UINT32(co, frame->root->uid, number_fn, xtra_len);
co++;
CHANGELOG_FILL_UINT32(co, frame->root->gid, number_fn, xtra_len);
co++;
CHANGELOG_FILL_ENTRY(co, loc->pargfid, loc->name, entry_fn, entry_free_fn,
xtra_len, wind);
changelog_set_usable_record_and_length(frame->local, xtra_len, 5);
LOCK(&priv->lock);
{
if ((barrier_enabled = priv->barrier_enabled)) {
stub = fop_create_stub(frame, changelog_create_resume, loc, flags,
mode, umask, fd, xdata);
if (!stub)
__chlog_barrier_disable(this, &queue);
else
__chlog_barrier_enqueue(this, stub);
} else {
((changelog_local_t *)frame->local)->color = priv->current_color;
changelog_inc_fop_cnt(this, priv, frame->local);
}
}
UNLOCK(&priv->lock);
if (barrier_enabled && stub) {
gf_msg_debug(this->name, 0, "Enqueued create");
goto out;
}
if (barrier_enabled && !stub) {
gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, CHANGELOG_MSG_NO_MEMORY,
"Failed to barrier FOPs, disabling changelog barrier",
"fop=create", NULL);
chlog_barrier_dequeue_all(this, &queue);
}
wind:
STACK_WIND(frame, changelog_create_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->create, loc, flags, mode, umask, fd,
xdata);
out:
return 0;
}
/* }}} */
/* Metadata modification fops - TYPE II */
/* {{{ */
/* {f}setattr */
int32_t
changelog_fsetattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno,
struct iatt *preop_stbuf, struct iatt *postop_stbuf,
dict_t *xdata)
{
changelog_priv_t *priv = NULL;
changelog_local_t *local = NULL;
priv = this->private;
local = frame->local;
CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !local), unwind);
changelog_update(this, priv, local, CHANGELOG_TYPE_METADATA);
unwind:
changelog_dec_fop_cnt(this, priv, local);
CHANGELOG_STACK_UNWIND(fsetattr, frame, op_ret, op_errno, preop_stbuf,
postop_stbuf, xdata);
return 0;
}
int32_t
changelog_fsetattr(call_frame_t *frame, xlator_t *this, fd_t *fd,
struct iatt *stbuf, int32_t valid, dict_t *xdata)
{
changelog_priv_t *priv = NULL;
changelog_opt_t *co = NULL;
size_t xtra_len = 0;
priv = this->private;
CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);
CHANGELOG_OP_BOUNDARY_CHECK(frame, wind);
CHANGELOG_INIT(this, frame->local, fd->inode, fd->inode->gfid, 1);
if (!frame->local)
goto wind;
co = changelog_get_usable_buffer(frame->local);
if (!co)
goto wind;
CHANGLOG_FILL_FOP_NUMBER(co, frame->root->op, fop_fn, xtra_len);
changelog_set_usable_record_and_length(frame->local, xtra_len, 1);
wind:
changelog_color_fop_and_inc_cnt(this, priv, frame->local);
STACK_WIND(frame, changelog_fsetattr_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->fsetattr, fd, stbuf, valid, xdata);
return 0;
}
int32_t
changelog_setattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno,
struct iatt *preop_stbuf, struct iatt *postop_stbuf,
dict_t *xdata)
{
changelog_priv_t *priv = NULL;
changelog_local_t *local = NULL;
priv = this->private;
local = frame->local;
CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !local), unwind);
changelog_update(this, priv, local, CHANGELOG_TYPE_METADATA);
unwind:
changelog_dec_fop_cnt(this, priv, local);
CHANGELOG_STACK_UNWIND(setattr, frame, op_ret, op_errno, preop_stbuf,
postop_stbuf, xdata);
return 0;
}
int32_t
changelog_setattr(call_frame_t *frame, xlator_t *this, loc_t *loc,
struct iatt *stbuf, int32_t valid, dict_t *xdata)
{
changelog_priv_t *priv = NULL;
changelog_opt_t *co = NULL;
size_t xtra_len = 0;
uuid_t shard_root_gfid = {
0,
};
priv = this->private;
CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);
CHANGELOG_IF_INTERNAL_FOP_THEN_GOTO(frame, xdata, wind);
/* Do not record META on .shard */
gf_uuid_parse(SHARD_ROOT_GFID, shard_root_gfid);
if (gf_uuid_compare(loc->gfid, shard_root_gfid) == 0) {
goto wind;
}
CHANGELOG_OP_BOUNDARY_CHECK(frame, wind);
CHANGELOG_INIT(this, frame->local, loc->inode, loc->inode->gfid, 1);
if (!frame->local)
goto wind;
co = changelog_get_usable_buffer(frame->local);
if (!co)
goto wind;
CHANGLOG_FILL_FOP_NUMBER(co, frame->root->op, fop_fn, xtra_len);
changelog_set_usable_record_and_length(frame->local, xtra_len, 1);
wind:
changelog_color_fop_and_inc_cnt(this, priv, frame->local);
STACK_WIND(frame, changelog_setattr_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->setattr, loc, stbuf, valid, xdata);
return 0;
}
/* {f}removexattr */
int32_t
changelog_fremovexattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
changelog_priv_t *priv = NULL;
changelog_local_t *local = NULL;
priv = this->private;
local = frame->local;
CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !local), unwind);
changelog_update(this, priv, local, CHANGELOG_TYPE_METADATA_XATTR);
unwind:
changelog_dec_fop_cnt(this, priv, local);
CHANGELOG_STACK_UNWIND(fremovexattr, frame, op_ret, op_errno, xdata);
return 0;
}
int32_t
changelog_fremovexattr(call_frame_t *frame, xlator_t *this, fd_t *fd,
const char *name, dict_t *xdata)
{
changelog_priv_t *priv = NULL;
changelog_opt_t *co = NULL;
size_t xtra_len = 0;
priv = this->private;
CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);
CHANGELOG_OP_BOUNDARY_CHECK(frame, wind);
CHANGELOG_INIT(this, frame->local, fd->inode, fd->inode->gfid, 1);
co = changelog_get_usable_buffer(frame->local);
if (!co)
goto wind;
CHANGLOG_FILL_FOP_NUMBER(co, frame->root->op, fop_fn, xtra_len);
changelog_set_usable_record_and_length(frame->local, xtra_len, 1);
wind:
changelog_color_fop_and_inc_cnt(this, priv, frame->local);
STACK_WIND(frame, changelog_fremovexattr_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->fremovexattr, fd, name, xdata);
return 0;
}
int32_t
changelog_removexattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
changelog_priv_t *priv = NULL;
changelog_local_t *local = NULL;
priv = this->private;
local = frame->local;
CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !local), unwind);
changelog_update(this, priv, local, CHANGELOG_TYPE_METADATA_XATTR);
unwind:
changelog_dec_fop_cnt(this, priv, local);
CHANGELOG_STACK_UNWIND(removexattr, frame, op_ret, op_errno, xdata);
return 0;
}
int32_t
changelog_removexattr(call_frame_t *frame, xlator_t *this, loc_t *loc,
const char *name, dict_t *xdata)
{
changelog_priv_t *priv = NULL;
changelog_opt_t *co = NULL;
size_t xtra_len = 0;
priv = this->private;
CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);
CHANGELOG_OP_BOUNDARY_CHECK(frame, wind);
CHANGELOG_INIT(this, frame->local, loc->inode, loc->inode->gfid, 1);
co = changelog_get_usable_buffer(frame->local);
if (!co)
goto wind;
CHANGLOG_FILL_FOP_NUMBER(co, frame->root->op, fop_fn, xtra_len);
changelog_set_usable_record_and_length(frame->local, xtra_len, 1);
wind:
changelog_color_fop_and_inc_cnt(this, priv, frame->local);
STACK_WIND(frame, changelog_removexattr_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->removexattr, loc, name, xdata);
return 0;
}
/* {f}setxattr */
int32_t
changelog_setxattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
changelog_priv_t *priv = NULL;
changelog_local_t *local = NULL;
priv = this->private;
local = frame->local;
CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !local), unwind);
changelog_update(this, priv, local, CHANGELOG_TYPE_METADATA_XATTR);
unwind:
changelog_dec_fop_cnt(this, priv, local);
CHANGELOG_STACK_UNWIND(setxattr, frame, op_ret, op_errno, xdata);
return 0;
}
/* changelog_handle_virtual_xattr:
* Handles virtual setxattr 'glusterfs.geo-rep.trigger-sync' on files.
* Following is the behaviour based on the value of xattr.
* 1: Captures only DATA entry in changelog.
* 2: Tries to captures both ENTRY and DATA entry in
* changelog. If failed to get pargfid, only DATA
* entry is captured.
* any other value: ENOTSUP is returned.
*/
static void
changelog_handle_virtual_xattr(call_frame_t *frame, xlator_t *this, loc_t *loc,
dict_t *dict)
{
changelog_priv_t *priv = NULL;
changelog_local_t *local = NULL;
int32_t value = 0;
int ret = 0;
int dict_ret = 0;
gf_boolean_t valid = _gf_false;
priv = this->private;
GF_ASSERT(priv);
dict_ret = dict_get_int32(dict, GF_XATTR_TRIGGER_SYNC, &value);
if ((dict_ret == 0 && value == 1) && ((loc->inode->ia_type == IA_IFDIR) ||
(loc->inode->ia_type == IA_IFREG)))
valid = _gf_true;
if (valid) {
ret = changelog_fill_entry_buf(frame, this, loc, &local);
if (ret) {
gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_ENTRY_BUF_INFO,
"Entry cannot be"
" captured for gfid, Capturing DATA"
" entry.",
"gfid=%s", uuid_utoa(loc->inode->gfid), NULL);
goto unwind;
}
changelog_update(this, priv, local, CHANGELOG_TYPE_ENTRY);
unwind:
/* Capture DATA only if it's a file. */
if (loc->inode->ia_type != IA_IFDIR)
changelog_update(this, priv, frame->local, CHANGELOG_TYPE_DATA);
/* Assign local to prev_entry, so unwind will take
* care of cleanup. */
((changelog_local_t *)(frame->local))->prev_entry = local;
CHANGELOG_STACK_UNWIND(setxattr, frame, 0, 0, NULL);
return;
} else {
CHANGELOG_STACK_UNWIND(setxattr, frame, -1, ENOTSUP, NULL);
return;
}
}
int32_t
changelog_setxattr(call_frame_t *frame, xlator_t *this, loc_t *loc,
dict_t *dict, int32_t flags, dict_t *xdata)
{
changelog_priv_t *priv = NULL;
changelog_opt_t *co = NULL;
size_t xtra_len = 0;
priv = this->private;
CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);
CHANGELOG_OP_BOUNDARY_CHECK(frame, wind);
CHANGELOG_INIT(this, frame->local, loc->inode, loc->inode->gfid, 1);
/* On setting this virtual xattr on a file, an explicit data
* sync is triggered from geo-rep as CREATE|DATA entry is
* recorded in changelog based on xattr value.
*/
if (dict_get(dict, GF_XATTR_TRIGGER_SYNC)) {
changelog_handle_virtual_xattr(frame, this, loc, dict);
return 0;
}
co = changelog_get_usable_buffer(frame->local);
if (!co)
goto wind;
CHANGLOG_FILL_FOP_NUMBER(co, frame->root->op, fop_fn, xtra_len);
changelog_set_usable_record_and_length(frame->local, xtra_len, 1);
wind:
changelog_color_fop_and_inc_cnt(this, priv, frame->local);
STACK_WIND(frame, changelog_setxattr_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->setxattr, loc, dict, flags, xdata);
return 0;
}
int32_t
changelog_fsetxattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
changelog_priv_t *priv = NULL;
changelog_local_t *local = NULL;
priv = this->private;
local = frame->local;
CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !local), unwind);
changelog_update(this, priv, local, CHANGELOG_TYPE_METADATA_XATTR);
unwind:
changelog_dec_fop_cnt(this, priv, local);
CHANGELOG_STACK_UNWIND(fsetxattr, frame, op_ret, op_errno, xdata);
return 0;
}
int32_t
changelog_fsetxattr(call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *dict,
int32_t flags, dict_t *xdata)
{
changelog_priv_t *priv = NULL;
changelog_opt_t *co = NULL;
size_t xtra_len = 0;
priv = this->private;
CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);
CHANGELOG_IF_INTERNAL_FOP_THEN_GOTO(frame, xdata, wind);
CHANGELOG_OP_BOUNDARY_CHECK(frame, wind);
CHANGELOG_INIT(this, frame->local, fd->inode, fd->inode->gfid, 1);
co = changelog_get_usable_buffer(frame->local);
if (!co)
goto wind;
CHANGLOG_FILL_FOP_NUMBER(co, frame->root->op, fop_fn, xtra_len);
changelog_set_usable_record_and_length(frame->local, xtra_len, 1);
wind:
changelog_color_fop_and_inc_cnt(this, priv, frame->local);
STACK_WIND(frame, changelog_fsetxattr_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->fsetxattr, fd, dict, flags, xdata);
return 0;
}
int32_t
changelog_xattrop_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, dict_t *xattr,
dict_t *xdata)
{
changelog_priv_t *priv = NULL;
changelog_local_t *local = NULL;
priv = this->private;
local = frame->local;
CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !local), unwind);
changelog_update(this, priv, local, CHANGELOG_TYPE_METADATA);
unwind:
changelog_dec_fop_cnt(this, priv, local);
CHANGELOG_STACK_UNWIND(xattrop, frame, op_ret, op_errno, xattr, xdata);
return 0;
}
int32_t
changelog_xattrop(call_frame_t *frame, xlator_t *this, loc_t *loc,
gf_xattrop_flags_t optype, dict_t *xattr, dict_t *xdata)
{
changelog_priv_t *priv = NULL;
changelog_opt_t *co = NULL;
size_t xtra_len = 0;
int ret = 0;
void *size_attr = NULL;
priv = this->private;
CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);
ret = dict_get_ptr(xattr, GF_XATTR_SHARD_FILE_SIZE, &size_attr);
if (ret)
goto wind;
CHANGELOG_OP_BOUNDARY_CHECK(frame, wind);
CHANGELOG_INIT(this, frame->local, loc->inode, loc->inode->gfid, 1);
co = changelog_get_usable_buffer(frame->local);
if (!co)
goto wind;
CHANGLOG_FILL_FOP_NUMBER(co, frame->root->op, fop_fn, xtra_len);
changelog_set_usable_record_and_length(frame->local, xtra_len, 1);
wind:
changelog_color_fop_and_inc_cnt(this, priv, frame->local);
STACK_WIND(frame, changelog_xattrop_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->xattrop, loc, optype, xattr, xdata);
return 0;
}
int32_t
changelog_fxattrop_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, dict_t *xattr,
dict_t *xdata)
{
changelog_priv_t *priv = NULL;
changelog_local_t *local = NULL;
priv = this->private;
local = frame->local;
CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !local), unwind);
changelog_update(this, priv, local, CHANGELOG_TYPE_METADATA_XATTR);
unwind:
changelog_dec_fop_cnt(this, priv, local);
CHANGELOG_STACK_UNWIND(fxattrop, frame, op_ret, op_errno, xattr, xdata);
return 0;
}
int32_t
changelog_fxattrop(call_frame_t *frame, xlator_t *this, fd_t *fd,
gf_xattrop_flags_t optype, dict_t *xattr, dict_t *xdata)
{
changelog_priv_t *priv = NULL;
changelog_opt_t *co = NULL;
size_t xtra_len = 0;
void *size_attr = NULL;
int ret = 0;
priv = this->private;
CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);
ret = dict_get_ptr(xattr, GF_XATTR_SHARD_FILE_SIZE, &size_attr);
if (ret)
goto wind;
CHANGELOG_OP_BOUNDARY_CHECK(frame, wind);
CHANGELOG_INIT(this, frame->local, fd->inode, fd->inode->gfid, 1);
co = changelog_get_usable_buffer(frame->local);
if (!co)
goto wind;
CHANGLOG_FILL_FOP_NUMBER(co, frame->root->op, fop_fn, xtra_len);
changelog_set_usable_record_and_length(frame->local, xtra_len, 1);
wind:
changelog_color_fop_and_inc_cnt(this, priv, frame->local);
STACK_WIND(frame, changelog_fxattrop_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->fxattrop, fd, optype, xattr, xdata);
return 0;
}
/* }}} */
/* Data modification fops - TYPE I */
/* {{{ */
/* {f}truncate() */
int32_t
changelog_truncate_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
struct iatt *postbuf, dict_t *xdata)
{
changelog_priv_t *priv = NULL;
changelog_local_t *local = NULL;
priv = this->private;
local = frame->local;
CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !local), unwind);
changelog_update(this, priv, local, CHANGELOG_TYPE_DATA);
unwind:
changelog_dec_fop_cnt(this, priv, local);
CHANGELOG_STACK_UNWIND(truncate, frame, op_ret, op_errno, prebuf, postbuf,
xdata);
return 0;
}
int32_t
changelog_truncate(call_frame_t *frame, xlator_t *this, loc_t *loc,
off_t offset, dict_t *xdata)
{
changelog_priv_t *priv = NULL;
priv = this->private;
CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);
CHANGELOG_INIT(this, frame->local, loc->inode, loc->inode->gfid, 0);
LOCK(&priv->c_snap_lock);
{
if (priv->c_snap_fd != -1 && priv->barrier_enabled == _gf_true) {
changelog_snap_handle_ascii_change(
this, &(((changelog_local_t *)(frame->local))->cld));
}
}
UNLOCK(&priv->c_snap_lock);
wind:
changelog_color_fop_and_inc_cnt(this, priv, frame->local);
STACK_WIND(frame, changelog_truncate_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->truncate, loc, offset, xdata);
return 0;
}
int32_t
changelog_ftruncate_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
struct iatt *postbuf, dict_t *xdata)
{
changelog_priv_t *priv = NULL;
changelog_local_t *local = NULL;
priv = this->private;
local = frame->local;
CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !local), unwind);
changelog_update(this, priv, local, CHANGELOG_TYPE_DATA);
unwind:
changelog_dec_fop_cnt(this, priv, local);
CHANGELOG_STACK_UNWIND(ftruncate, frame, op_ret, op_errno, prebuf, postbuf,
xdata);
return 0;
}
int32_t
changelog_ftruncate(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset,
dict_t *xdata)
{
changelog_priv_t *priv = NULL;
priv = this->private;
CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);
CHANGELOG_INIT(this, frame->local, fd->inode, fd->inode->gfid, 0);
LOCK(&priv->c_snap_lock);
{
if (priv->c_snap_fd != -1 && priv->barrier_enabled == _gf_true) {
changelog_snap_handle_ascii_change(
this, &(((changelog_local_t *)(frame->local))->cld));
}
}
UNLOCK(&priv->c_snap_lock);
wind:
changelog_color_fop_and_inc_cnt(this, priv, frame->local);
STACK_WIND(frame, changelog_ftruncate_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->ftruncate, fd, offset, xdata);
return 0;
}
/* writev() */
int32_t
changelog_writev_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
struct iatt *postbuf, dict_t *xdata)
{
changelog_priv_t *priv = NULL;
changelog_local_t *local = NULL;
priv = this->private;
local = frame->local;
CHANGELOG_COND_GOTO(priv, ((op_ret <= 0) || !local), unwind);
changelog_update(this, priv, local, CHANGELOG_TYPE_DATA);
unwind:
changelog_dec_fop_cnt(this, priv, local);
CHANGELOG_STACK_UNWIND(writev, frame, op_ret, op_errno, prebuf, postbuf,
xdata);
return 0;
}
int32_t
changelog_writev(call_frame_t *frame, xlator_t *this, fd_t *fd,
struct iovec *vector, int32_t count, off_t offset,
uint32_t flags, struct iobref *iobref, dict_t *xdata)
{
changelog_priv_t *priv = NULL;
priv = this->private;
CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);
CHANGELOG_INIT(this, frame->local, fd->inode, fd->inode->gfid, 0);
LOCK(&priv->c_snap_lock);
{
if (priv->c_snap_fd != -1 && priv->barrier_enabled == _gf_true) {
changelog_snap_handle_ascii_change(
this, &(((changelog_local_t *)(frame->local))->cld));
}
}
UNLOCK(&priv->c_snap_lock);
wind:
changelog_color_fop_and_inc_cnt(this, priv, frame->local);
STACK_WIND(frame, changelog_writev_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->writev, fd, vector, count, offset,
flags, iobref, xdata);
return 0;
}
/* }}} */
/* open, release and other beasts */
/* {{{ */
int
changelog_open_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
int op_ret, int op_errno, fd_t *fd, dict_t *xdata)
{
int ret = 0;
changelog_priv_t *priv = NULL;
changelog_event_t ev = {
0,
};
gf_boolean_t logopen = _gf_false;
priv = this->private;
if (frame->local) {
frame->local = NULL;
logopen = _gf_true;
}
CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !logopen), unwind);
/* fill the event structure */
ev.ev_type = CHANGELOG_OP_TYPE_OPEN;
gf_uuid_copy(ev.u.open.gfid, fd->inode->gfid);
ev.u.open.flags = fd->flags;
changelog_dispatch_event(this, priv, &ev);
if (changelog_ev_selected(this, &priv->ev_selection,
CHANGELOG_OP_TYPE_RELEASE)) {
ret = fd_ctx_set(fd, this, (uint64_t)(long)0x1);
if (ret)
gf_msg(this->name, GF_LOG_WARNING, 0, CHANGELOG_MSG_SET_FD_CONTEXT,
"could not set fd context (for release cbk)");
}
unwind:
CHANGELOG_STACK_UNWIND(open, frame, op_ret, op_errno, fd, xdata);
return 0;
}
int
changelog_open(call_frame_t *frame, xlator_t *this, loc_t *loc, int flags,
fd_t *fd, dict_t *xdata)
{
changelog_priv_t *priv = NULL;
priv = this->private;
CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);
frame->local = (void *)0x1; /* do not dereference in ->cbk */
wind:
STACK_WIND(frame, changelog_open_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->open, loc, flags, fd, xdata);
return 0;
}
/* }}} */
/* {{{ */
/* }}} */
int32_t
_changelog_generic_dispatcher(dict_t *dict, char *key, data_t *value,
void *data)
{
xlator_t *this = NULL;
changelog_priv_t *priv = NULL;
this = data;
priv = this->private;
changelog_dispatch_event(this, priv, (changelog_event_t *)value->data);
return 0;
}
/**
* changelog ipc dispatches events, pointers of which are passed in
* @xdata. Dispatching is orderless (whatever order dict_foreach()
* traverses the dictionary).
*/
int32_t
changelog_ipc(call_frame_t *frame, xlator_t *this, int32_t op, dict_t *xdata)
{
if (op != GF_IPC_TARGET_CHANGELOG)
goto wind;
/* it's for us, do the job */
if (xdata)
(void)dict_foreach(xdata, _changelog_generic_dispatcher, this);
STACK_UNWIND_STRICT(ipc, frame, 0, 0, NULL);
return 0;
wind:
STACK_WIND(frame, default_ipc_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->ipc, op, xdata);
return 0;
}
/* {{{ */
int32_t
changelog_release(xlator_t *this, fd_t *fd)
{
changelog_event_t ev = {
0,
};
changelog_priv_t *priv = NULL;
priv = this->private;
ev.ev_type = CHANGELOG_OP_TYPE_RELEASE;
gf_uuid_copy(ev.u.release.gfid, fd->inode->gfid);
changelog_dispatch_event(this, priv, &ev);
(void)fd_ctx_del(fd, this, NULL);
return 0;
}
/* }}} */
/**
* The
* - @init ()
* - @fini ()
* - @reconfigure ()
* ... and helper routines
*/
/**
* needed if there are more operation modes in the future.
*/
static void
changelog_assign_opmode(changelog_priv_t *priv, char *mode)
{
if (strncmp(mode, "realtime", 8) == 0) {
priv->op_mode = CHANGELOG_MODE_RT;
}
}
static void
changelog_assign_encoding(changelog_priv_t *priv, char *enc)
{
if (strncmp(enc, "binary", 6) == 0) {
priv->encode_mode = CHANGELOG_ENCODE_BINARY;
} else if (strncmp(enc, "ascii", 5) == 0) {
priv->encode_mode = CHANGELOG_ENCODE_ASCII;
}
}
static void
changelog_assign_barrier_timeout(changelog_priv_t *priv, uint32_t timeout)
{
LOCK(&priv->lock);
{
priv->timeout.tv_sec = timeout;
}
UNLOCK(&priv->lock);
}
/* cleanup any helper threads that are running */
static void
changelog_cleanup_helper_threads(xlator_t *this, changelog_priv_t *priv)
{
if (priv->cr.rollover_th) {
(void)changelog_thread_cleanup(this, priv->cr.rollover_th);
priv->cr.rollover_th = 0;
}
if (priv->cf.fsync_th) {
(void)changelog_thread_cleanup(this, priv->cf.fsync_th);
priv->cf.fsync_th = 0;
}
}
/* spawn helper thread; cleaning up in case of errors */
static int
changelog_spawn_helper_threads(xlator_t *this, changelog_priv_t *priv)
{
int ret = 0;
/* Geo-Rep snapshot dependency:
*
* To implement explicit rollover of changlog journal on barrier
* notification, a pipe is created to communicate between
* 'changelog_rollover' thread and changelog main thread. The select
* call used to wait till roll-over time in changelog_rollover thread
* is modified to wait on read end of the pipe. When barrier
* notification comes (i.e, in 'reconfigure'), select in
* changelog_rollover thread is woken up explicitly by writing into
* the write end of the pipe in 'reconfigure'.
*/
priv->cr.notify = _gf_false;
priv->cr.this = this;
ret = gf_thread_create(&priv->cr.rollover_th, NULL, changelog_rollover,
priv, "clogro");
if (ret)
goto out;
if (priv->fsync_interval) {
priv->cf.this = this;
ret = gf_thread_create(&priv->cf.fsync_th, NULL, changelog_fsync_thread,
priv, "clogfsyn");
}
if (ret)
changelog_cleanup_helper_threads(this, priv);
out:
return ret;
}
int
notify(xlator_t *this, int event, void *data, ...)
{
changelog_priv_t *priv = NULL;
dict_t *dict = NULL;
char buf[1] = {1};
int barrier = DICT_DEFAULT;
gf_boolean_t bclean_req = _gf_false;
int ret = 0;
int ret1 = 0;
struct list_head queue = {
0,
};
uint64_t xprtcnt = 0;
uint64_t clntcnt = 0;
changelog_clnt_t *conn = NULL;
gf_boolean_t cleanup_notify = _gf_false;
INIT_LIST_HEAD(&queue);
priv = this->private;
if (!priv)
goto out;
if (event == GF_EVENT_PARENT_DOWN) {
priv->victim = data;
gf_log(this->name, GF_LOG_INFO,
"cleanup changelog rpc connection of brick %s",
priv->victim->name);
this->cleanup_starting = 1;
changelog_destroy_rpc_listner(this, priv);
conn = &priv->connections;
if (conn)
changelog_ev_cleanup_connections(this, conn);
xprtcnt = GF_ATOMIC_GET(priv->xprtcnt);
clntcnt = GF_ATOMIC_GET(priv->clntcnt);
if (!xprtcnt && !clntcnt) {
LOCK(&priv->lock);
{
cleanup_notify = priv->notify_down;
priv->notify_down = _gf_true;
}
UNLOCK(&priv->lock);
if (!cleanup_notify)
default_notify(this, GF_EVENT_PARENT_DOWN, data);
}
goto out;
}
if (event == GF_EVENT_TRANSLATOR_OP) {
dict = data;
barrier = dict_get_str_boolean(dict, "barrier", DICT_DEFAULT);
switch (barrier) {
case DICT_ERROR:
gf_msg(this->name, GF_LOG_ERROR, 0,
CHANGELOG_MSG_DICT_GET_FAILED,
"Barrier dict_get_str_boolean failed");
ret = -1;
goto out;
case BARRIER_OFF:
gf_msg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_BARRIER_INFO,
"Barrier off notification");
CHANGELOG_NOT_ON_THEN_GOTO(priv, ret, out);
LOCK(&priv->c_snap_lock);
{
changelog_snap_logging_stop(this, priv);
}
UNLOCK(&priv->c_snap_lock);
LOCK(&priv->bflags.lock);
{
if (priv->bflags.barrier_ext == _gf_false)
ret = -1;
}
UNLOCK(&priv->bflags.lock);
if (ret == -1) {
gf_msg(this->name, GF_LOG_ERROR, 0,
CHANGELOG_MSG_BARRIER_ERROR,
"Received another barrier off"
" notification while already off");
goto out;
}
/* Stop changelog barrier and dequeue all fops */
LOCK(&priv->lock);
{
if (priv->barrier_enabled == _gf_true)
__chlog_barrier_disable(this, &queue);
else
ret = -1;
}
UNLOCK(&priv->lock);
/* If ret = -1, then changelog barrier is already
* disabled because of error or timeout.
*/
if (ret == 0) {
chlog_barrier_dequeue_all(this, &queue);
gf_msg(this->name, GF_LOG_INFO, 0,
CHANGELOG_MSG_BARRIER_INFO,
"Disabled changelog barrier");
} else {
gf_msg(this->name, GF_LOG_ERROR, 0,
CHANGELOG_MSG_BARRIER_ERROR,
"Changelog barrier already disabled");
}
LOCK(&priv->bflags.lock);
{
priv->bflags.barrier_ext = _gf_false;
}
UNLOCK(&priv->bflags.lock);
goto out;
case BARRIER_ON:
gf_msg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_BARRIER_INFO,
"Barrier on notification");
CHANGELOG_NOT_ON_THEN_GOTO(priv, ret, out);
LOCK(&priv->c_snap_lock);
{
changelog_snap_logging_start(this, priv);
}
UNLOCK(&priv->c_snap_lock);
LOCK(&priv->bflags.lock);
{
if (priv->bflags.barrier_ext == _gf_true)
ret = -1;
else
priv->bflags.barrier_ext = _gf_true;
}
UNLOCK(&priv->bflags.lock);
if (ret == -1) {
gf_msg(this->name, GF_LOG_ERROR, 0,
CHANGELOG_MSG_BARRIER_ERROR,
"Received another barrier on"
"notification when last one is"
"not served yet");
goto out;
}
ret = pthread_mutex_lock(&priv->bn.bnotify_mutex);
CHANGELOG_PTHREAD_ERROR_HANDLE_1(ret, out, bclean_req);
{
priv->bn.bnotify = _gf_true;
}
ret = pthread_mutex_unlock(&priv->bn.bnotify_mutex);
CHANGELOG_PTHREAD_ERROR_HANDLE_1(ret, out, bclean_req);
/* Start changelog barrier */
LOCK(&priv->lock);
{
ret = __chlog_barrier_enable(this, priv);
}
UNLOCK(&priv->lock);
if (ret == -1) {
changelog_barrier_cleanup(this, priv, &queue);
goto out;
}
gf_msg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_BARRIER_INFO,
"Enabled changelog barrier");
ret = changelog_barrier_notify(priv, buf);
if (ret) {
gf_msg(this->name, GF_LOG_ERROR, 0,
CHANGELOG_MSG_WRITE_FAILED,
"Explicit roll over: write failed");
changelog_barrier_cleanup(this, priv, &queue);
ret = -1;
goto out;
}
ret = pthread_mutex_lock(&priv->bn.bnotify_mutex);
CHANGELOG_PTHREAD_ERROR_HANDLE_1(ret, out, bclean_req);
{
/* The while condition check is required here to
* handle spurious wakeup of cond wait that can
* happen with pthreads. See man page */
while (priv->bn.bnotify == _gf_true) {
ret = pthread_cond_wait(&priv->bn.bnotify_cond,
&priv->bn.bnotify_mutex);
CHANGELOG_PTHREAD_ERROR_HANDLE_1(ret, out, bclean_req);
}
if (priv->bn.bnotify_error == _gf_true) {
ret = -1;
priv->bn.bnotify_error = _gf_false;
}
}
ret1 = pthread_mutex_unlock(&priv->bn.bnotify_mutex);
CHANGELOG_PTHREAD_ERROR_HANDLE_1(ret1, out, bclean_req);
gf_msg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_BNOTIFY_INFO,
"Woke up: bnotify conditional wait");
goto out;
case DICT_DEFAULT:
gf_msg(this->name, GF_LOG_ERROR, 0,
CHANGELOG_MSG_DICT_GET_FAILED, "barrier key not found");
ret = -1;
goto out;
default:
gf_msg(this->name, GF_LOG_ERROR, EINVAL,
CHANGELOG_MSG_DICT_GET_FAILED,
"Something went bad in dict_get_str_boolean");
ret = -1;
goto out;
}
} else {
ret = default_notify(this, event, data);
}
out:
if (bclean_req)
changelog_barrier_cleanup(this, priv, &queue);
return ret;
}
int32_t
mem_acct_init(xlator_t *this)
{
int ret = -1;
if (!this)
return ret;
ret = xlator_mem_acct_init(this, gf_changelog_mt_end + 1);
if (ret != 0) {
gf_msg(this->name, GF_LOG_WARNING, ENOMEM, CHANGELOG_MSG_NO_MEMORY,
"Memory accounting"
" init failed");
return ret;
}
return ret;
}
static int
changelog_init(xlator_t *this, changelog_priv_t *priv)
{
int i = 0;
int ret = -1;
struct timeval tv = {
0,
};
changelog_log_data_t cld = {
0,
};
ret = gettimeofday(&tv, NULL);
if (ret) {
gf_msg(this->name, GF_LOG_ERROR, errno,
CHANGELOG_MSG_GET_TIME_OP_FAILED, "gettimeofday() failure");
goto out;
}
priv->slice.tv_start = tv;
priv->maps[CHANGELOG_TYPE_DATA] = "D ";
priv->maps[CHANGELOG_TYPE_METADATA] = "M ";
priv->maps[CHANGELOG_TYPE_METADATA_XATTR] = "M ";
priv->maps[CHANGELOG_TYPE_ENTRY] = "E ";
for (; i < CHANGELOG_MAX_TYPE; i++) {
/* start with version 1 */
priv->slice.changelog_version[i] = 1;
}
if (!priv->active)
return ret;
/**
* start with a fresh changelog file every time. this is done
* in case there was an encoding change. so... things are kept
* simple here.
*/
ret = changelog_fill_rollover_data(&cld, _gf_false);
if (ret)
goto out;
ret = htime_open(this, priv, cld.cld_roll_time);
/* call htime open with cld's rollover_time */
if (ret)
goto out;
LOCK(&priv->lock);
{
ret = changelog_inject_single_event(this, priv, &cld);
}
UNLOCK(&priv->lock);
/* ... and finally spawn the helpers threads */
ret = changelog_spawn_helper_threads(this, priv);
out:
return ret;
}
/**
* Init barrier related condition variables and locks
*/
static int
changelog_barrier_pthread_init(xlator_t *this, changelog_priv_t *priv)
{
gf_boolean_t bn_mutex_init = _gf_false;
gf_boolean_t bn_cond_init = _gf_false;
gf_boolean_t dm_mutex_black_init = _gf_false;
gf_boolean_t dm_cond_black_init = _gf_false;
gf_boolean_t dm_mutex_white_init = _gf_false;
gf_boolean_t dm_cond_white_init = _gf_false;
gf_boolean_t cr_mutex_init = _gf_false;
gf_boolean_t cr_cond_init = _gf_false;
int ret = 0;
if ((ret = pthread_mutex_init(&priv->bn.bnotify_mutex, NULL)) != 0) {
gf_smsg(this->name, GF_LOG_ERROR, errno,
CHANGELOG_MSG_PTHREAD_MUTEX_INIT_FAILED,
"bnotify pthread_mutex_init failed", "ret=%d", ret, NULL);
ret = -1;
goto out;
}
bn_mutex_init = _gf_true;
if ((ret = pthread_cond_init(&priv->bn.bnotify_cond, NULL)) != 0) {
gf_smsg(this->name, GF_LOG_ERROR, errno,
CHANGELOG_MSG_PTHREAD_COND_INIT_FAILED,
"bnotify pthread_cond_init failed", "ret=%d", ret, NULL);
ret = -1;
goto out;
}
bn_cond_init = _gf_true;
if ((ret = pthread_mutex_init(&priv->dm.drain_black_mutex, NULL)) != 0) {
gf_smsg(this->name, GF_LOG_ERROR, errno,
CHANGELOG_MSG_PTHREAD_MUTEX_INIT_FAILED,
"drain_black pthread_mutex_init failed", "ret=%d", ret, NULL);
ret = -1;
goto out;
}
dm_mutex_black_init = _gf_true;
if ((ret = pthread_cond_init(&priv->dm.drain_black_cond, NULL)) != 0) {
gf_smsg(this->name, GF_LOG_ERROR, errno,
CHANGELOG_MSG_PTHREAD_COND_INIT_FAILED,
"drain_black pthread_cond_init failed", "ret=%d", ret, NULL);
ret = -1;
goto out;
}
dm_cond_black_init = _gf_true;
if ((ret = pthread_mutex_init(&priv->dm.drain_white_mutex, NULL)) != 0) {
gf_smsg(this->name, GF_LOG_ERROR, errno,
CHANGELOG_MSG_PTHREAD_MUTEX_INIT_FAILED,
"drain_white pthread_mutex_init failed", "ret=%d", ret, NULL);
ret = -1;
goto out;
}
dm_mutex_white_init = _gf_true;
if ((ret = pthread_cond_init(&priv->dm.drain_white_cond, NULL)) != 0) {
gf_smsg(this->name, GF_LOG_ERROR, errno,
CHANGELOG_MSG_PTHREAD_COND_INIT_FAILED,
"drain_white pthread_cond_init failed", "ret=%d", ret, NULL);
ret = -1;
goto out;
}
dm_cond_white_init = _gf_true;
if ((pthread_mutex_init(&priv->cr.lock, NULL)) != 0) {
gf_smsg(this->name, GF_LOG_ERROR, errno,
CHANGELOG_MSG_PTHREAD_MUTEX_INIT_FAILED,
"changelog_rollover lock init failed", "ret=%d", ret, NULL);
ret = -1;
goto out;
}
cr_mutex_init = _gf_true;
if ((pthread_cond_init(&priv->cr.cond, NULL)) != 0) {
gf_smsg(this->name, GF_LOG_ERROR, errno,
CHANGELOG_MSG_PTHREAD_COND_INIT_FAILED,
"changelog_rollover cond init failed", "ret=%d", ret, NULL);
ret = -1;
goto out;
}
cr_cond_init = _gf_true;
out:
if (ret) {
if (bn_mutex_init)
pthread_mutex_destroy(&priv->bn.bnotify_mutex);
if (bn_cond_init)
pthread_cond_destroy(&priv->bn.bnotify_cond);
if (dm_mutex_black_init)
pthread_mutex_destroy(&priv->dm.drain_black_mutex);
if (dm_cond_black_init)
pthread_cond_destroy(&priv->dm.drain_black_cond);
if (dm_mutex_white_init)
pthread_mutex_destroy(&priv->dm.drain_white_mutex);
if (dm_cond_white_init)
pthread_cond_destroy(&priv->dm.drain_white_cond);
if (cr_mutex_init)
pthread_mutex_destroy(&priv->cr.lock);
if (cr_cond_init)
pthread_cond_destroy(&priv->cr.cond);
}
return ret;
}
/* Destroy barrier related condition variables and locks */
static void
changelog_barrier_pthread_destroy(changelog_priv_t *priv)
{
pthread_mutex_destroy(&priv->bn.bnotify_mutex);
pthread_cond_destroy(&priv->bn.bnotify_cond);
pthread_mutex_destroy(&priv->dm.drain_black_mutex);
pthread_cond_destroy(&priv->dm.drain_black_cond);
pthread_mutex_destroy(&priv->dm.drain_white_mutex);
pthread_cond_destroy(&priv->dm.drain_white_cond);
pthread_mutex_destroy(&priv->cr.lock);
pthread_cond_destroy(&priv->cr.cond);
LOCK_DESTROY(&priv->bflags.lock);
}
int
reconfigure(xlator_t *this, dict_t *options)
{
int ret = 0;
char *tmp = NULL;
changelog_priv_t *priv = NULL;
gf_boolean_t active_earlier = _gf_true;
gf_boolean_t active_now = _gf_true;
changelog_time_slice_t *slice = NULL;
changelog_log_data_t cld = {
0,
};
char htime_dir[PATH_MAX] = {
0,
};
char csnap_dir[PATH_MAX] = {
0,
};
struct timeval tv = {
0,
};
uint32_t timeout = 0;
priv = this->private;
if (!priv)
goto out;
ret = -1;
active_earlier = priv->active;
/* first stop the rollover and the fsync thread */
changelog_cleanup_helper_threads(this, priv);
GF_OPTION_RECONF("changelog-dir", tmp, options, str, out);
if (!tmp) {
gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_DIR_OPTIONS_NOT_SET,
"\"changelog-dir\" option is not set");
goto out;
}
GF_FREE(priv->changelog_dir);
priv->changelog_dir = gf_strdup(tmp);
if (!priv->changelog_dir)
goto out;
ret = mkdir_p(priv->changelog_dir, 0600, _gf_true);
if (ret)
goto out;
CHANGELOG_FILL_HTIME_DIR(priv->changelog_dir, htime_dir);
ret = mkdir_p(htime_dir, 0600, _gf_true);
if (ret)
goto out;
CHANGELOG_FILL_CSNAP_DIR(priv->changelog_dir, csnap_dir);
ret = mkdir_p(csnap_dir, 0600, _gf_true);
if (ret)
goto out;
GF_OPTION_RECONF("changelog", active_now, options, bool, out);
/**
* changelog_handle_change() handles changes that could possibly
* have been submit changes before changelog deactivation.
*/
if (!active_now)
priv->active = _gf_false;
GF_OPTION_RECONF("op-mode", tmp, options, str, out);
changelog_assign_opmode(priv, tmp);
tmp = NULL;
GF_OPTION_RECONF("encoding", tmp, options, str, out);
changelog_assign_encoding(priv, tmp);
GF_OPTION_RECONF("rollover-time", priv->rollover_time, options, int32, out);
GF_OPTION_RECONF("fsync-interval", priv->fsync_interval, options, int32,
out);
GF_OPTION_RECONF("changelog-barrier-timeout", timeout, options, time, out);
changelog_assign_barrier_timeout(priv, timeout);
GF_OPTION_RECONF("capture-del-path", priv->capture_del_path, options, bool,
out);
if (active_now || active_earlier) {
ret = changelog_fill_rollover_data(&cld, !active_now);
if (ret)
goto out;
slice = &priv->slice;
LOCK(&priv->lock);
{
ret = changelog_inject_single_event(this, priv, &cld);
if (!ret && active_now)
SLICE_VERSION_UPDATE(slice);
}
UNLOCK(&priv->lock);
if (ret)
goto out;
if (active_now) {
if (!active_earlier) {
gf_msg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_HTIME_INFO,
"Reconfigure: Changelog Enable");
if (gettimeofday(&tv, NULL)) {
gf_msg(this->name, GF_LOG_ERROR, 0,
CHANGELOG_MSG_HTIME_ERROR, "unable to fetch htime");
ret = -1;
goto out;
}
htime_create(this, priv, tv.tv_sec);
}
ret = changelog_spawn_helper_threads(this, priv);
}
}
out:
if (ret) {
/* TODO */
} else {
gf_msg_debug(this->name, 0, "changelog reconfigured");
if (active_now && priv)
priv->active = _gf_true;
}
return ret;
}
static void
changelog_freeup_options(xlator_t *this, changelog_priv_t *priv)
{
int ret = 0;
ret = priv->cb->dtor(this, &priv->cd);
if (ret)
gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_FREEUP_FAILED,
"could not cleanup bootstrapper");
GF_FREE(priv->changelog_brick);
GF_FREE(priv->changelog_dir);
}
static int
changelog_init_options(xlator_t *this, changelog_priv_t *priv)
{
int ret = 0;
char *tmp = NULL;
uint32_t timeout = 0;
char htime_dir[PATH_MAX] = {
0,
};
char csnap_dir[PATH_MAX] = {
0,
};
GF_OPTION_INIT("changelog-brick", tmp, str, error_return);
priv->changelog_brick = gf_strdup(tmp);
if (!priv->changelog_brick)
goto error_return;
tmp = NULL;
GF_OPTION_INIT("changelog-dir", tmp, str, dealloc_1);
priv->changelog_dir = gf_strdup(tmp);
if (!priv->changelog_dir)
goto dealloc_1;
tmp = NULL;
/**
* create the directory even if change-logging would be inactive
* so that consumers can _look_ into it (finding nothing...)
*/
ret = mkdir_p(priv->changelog_dir, 0600, _gf_true);
if (ret)
goto dealloc_2;
CHANGELOG_FILL_HTIME_DIR(priv->changelog_dir, htime_dir);
ret = mkdir_p(htime_dir, 0600, _gf_true);
if (ret)
goto dealloc_2;
CHANGELOG_FILL_CSNAP_DIR(priv->changelog_dir, csnap_dir);
ret = mkdir_p(csnap_dir, 0600, _gf_true);
if (ret)
goto dealloc_2;
GF_OPTION_INIT("changelog", priv->active, bool, dealloc_2);
GF_OPTION_INIT("capture-del-path", priv->capture_del_path, bool, dealloc_2);
GF_OPTION_INIT("op-mode", tmp, str, dealloc_2);
changelog_assign_opmode(priv, tmp);
tmp = NULL;
GF_OPTION_INIT("encoding", tmp, str, dealloc_2);
changelog_assign_encoding(priv, tmp);
changelog_encode_change(priv);
GF_OPTION_INIT("rollover-time", priv->rollover_time, int32, dealloc_2);
GF_OPTION_INIT("fsync-interval", priv->fsync_interval, int32, dealloc_2);
GF_OPTION_INIT("changelog-barrier-timeout", timeout, time, dealloc_2);
changelog_assign_barrier_timeout(priv, timeout);
GF_ASSERT(cb_bootstrap[priv->op_mode].mode == priv->op_mode);
priv->cb = &cb_bootstrap[priv->op_mode];
/* ... now bootstrap the logger */
ret = priv->cb->ctor(this, &priv->cd);
if (ret)
goto dealloc_2;
priv->changelog_fd = -1;
return 0;
dealloc_2:
GF_FREE(priv->changelog_dir);
dealloc_1:
GF_FREE(priv->changelog_brick);
error_return:
return -1;
}
static void
changelog_cleanup_rpc(xlator_t *this, changelog_priv_t *priv)
{
/* terminate rpc server */
if (!this->cleanup_starting)
changelog_destroy_rpc_listner(this, priv);
(void)changelog_cleanup_rpc_threads(this, priv);
/* cleanup rot buffs */
rbuf_dtor(priv->rbuf);
/* cleanup poller thread */
if (priv->poller)
(void)changelog_thread_cleanup(this, priv->poller);
}
static int
changelog_init_rpc(xlator_t *this, changelog_priv_t *priv)
{
rpcsvc_t *rpc = NULL;
changelog_ev_selector_t *selection = NULL;
selection = &priv->ev_selection;
/* initialize event selection */
changelog_init_event_selection(this, selection);
priv->rbuf = rbuf_init(NR_ROTT_BUFFS);
if (!priv->rbuf)
goto cleanup_thread;
rpc = changelog_init_rpc_listener(this, priv, priv->rbuf, NR_DISPATCHERS);
if (!rpc)
goto cleanup_rbuf;
priv->rpc = rpc;
return 0;
cleanup_rbuf:
rbuf_dtor(priv->rbuf);
cleanup_thread:
if (priv->poller)
(void)changelog_thread_cleanup(this, priv->poller);
return -1;
}
int32_t
init(xlator_t *this)
{
int ret = -1;
changelog_priv_t *priv = NULL;
GF_VALIDATE_OR_GOTO("changelog", this, error_return);
if (!this->children || this->children->next) {
gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_CHILD_MISCONFIGURED,
"translator needs a single subvolume");
goto error_return;
}
if (!this->parents) {
gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_VOL_MISCONFIGURED,
"dangling volume. please check volfile");
goto error_return;
}
priv = GF_CALLOC(1, sizeof(*priv), gf_changelog_mt_priv_t);
if (!priv)
goto error_return;
this->local_pool = mem_pool_new(changelog_local_t, 64);
if (!this->local_pool) {
gf_msg(this->name, GF_LOG_ERROR, ENOMEM, CHANGELOG_MSG_NO_MEMORY,
"failed to create local memory pool");
goto cleanup_priv;
}
LOCK_INIT(&priv->lock);
LOCK_INIT(&priv->c_snap_lock);
GF_ATOMIC_INIT(priv->listnercnt, 0);
GF_ATOMIC_INIT(priv->clntcnt, 0);
GF_ATOMIC_INIT(priv->xprtcnt, 0);
INIT_LIST_HEAD(&priv->xprt_list);
ret = changelog_init_options(this, priv);
if (ret)
goto cleanup_mempool;
/* snap dependency changes */
priv->dm.black_fop_cnt = 0;
priv->dm.white_fop_cnt = 0;
priv->dm.drain_wait_black = _gf_false;
priv->dm.drain_wait_white = _gf_false;
priv->current_color = FOP_COLOR_BLACK;
priv->explicit_rollover = _gf_false;
priv->cr.notify = _gf_false;
/* Mutex is not needed as threads are not spawned yet */
priv->bn.bnotify = _gf_false;
priv->bn.bnotify_error = _gf_false;
ret = changelog_barrier_pthread_init(this, priv);
if (ret)
goto cleanup_options;
LOCK_INIT(&priv->bflags.lock);
priv->bflags.barrier_ext = _gf_false;
/* Changelog barrier init */
INIT_LIST_HEAD(&priv->queue);
priv->barrier_enabled = _gf_false;
/* RPC ball rolling.. */
ret = changelog_init_rpc(this, priv);
if (ret)
goto cleanup_barrier;
ret = changelog_init(this, priv);
if (ret)
goto cleanup_rpc;
gf_msg_debug(this->name, 0, "changelog translator loaded");
this->private = priv;
return 0;
cleanup_rpc:
changelog_cleanup_rpc(this, priv);
cleanup_barrier:
changelog_barrier_pthread_destroy(priv);
cleanup_options:
changelog_freeup_options(this, priv);
cleanup_mempool:
mem_pool_destroy(this->local_pool);
cleanup_priv:
GF_FREE(priv);
error_return:
this->private = NULL;
return -1;
}
void
fini(xlator_t *this)
{
changelog_priv_t *priv = NULL;
struct list_head queue = {
0,
};
priv = this->private;
if (priv) {
/* terminate RPC server/threads */
changelog_cleanup_rpc(this, priv);
/* call barrier_disable to cancel timer */
if (priv->barrier_enabled)
__chlog_barrier_disable(this, &queue);
/* cleanup barrier related objects */
changelog_barrier_pthread_destroy(priv);
/* cleanup helper threads */
changelog_cleanup_helper_threads(this, priv);
/* cleanup allocated options */
changelog_freeup_options(this, priv);
/* deallocate mempool */
mem_pool_destroy(this->local_pool);
if (priv->htime_fd != -1) {
sys_close(priv->htime_fd);
}
/* finally, dealloac private variable */
GF_FREE(priv);
}
this->private = NULL;
this->local_pool = NULL;
return;
}
struct xlator_fops fops = {
.open = changelog_open,
.mknod = changelog_mknod,
.mkdir = changelog_mkdir,
.create = changelog_create,
.symlink = changelog_symlink,
.writev = changelog_writev,
.truncate = changelog_truncate,
.ftruncate = changelog_ftruncate,
.link = changelog_link,
.rename = changelog_rename,
.unlink = changelog_unlink,
.rmdir = changelog_rmdir,
.setattr = changelog_setattr,
.fsetattr = changelog_fsetattr,
.setxattr = changelog_setxattr,
.fsetxattr = changelog_fsetxattr,
.removexattr = changelog_removexattr,
.fremovexattr = changelog_fremovexattr,
.ipc = changelog_ipc,
.xattrop = changelog_xattrop,
.fxattrop = changelog_fxattrop,
};
struct xlator_cbks cbks = {
.forget = changelog_forget,
.release = changelog_release,
};
struct volume_options options[] = {
{.key = {"changelog"},
.type = GF_OPTION_TYPE_BOOL,
.default_value = "off",
.description = "enable/disable change-logging",
.op_version = {3},
.flags = OPT_FLAG_SETTABLE,
.level = OPT_STATUS_BASIC,
.tags = {"journal", "georep", "glusterfind"}},
{.key = {"changelog-brick"},
.type = GF_OPTION_TYPE_PATH,
.description = "brick path to generate unique socket file name."
" should be the export directory of the volume strictly.",
.default_value = "{{ brick.path }}",
.op_version = {3},
.tags = {"journal"}},
{.key = {"changelog-dir"},
.type = GF_OPTION_TYPE_PATH,
.description = "directory for the changelog files",
.default_value = "{{ brick.path }}/.glusterfs/changelogs",
.op_version = {3},
.flags = OPT_FLAG_SETTABLE,
.level = OPT_STATUS_ADVANCED,
.tags = {"journal", "georep", "glusterfind"}},
{.key = {"op-mode"},
.type = GF_OPTION_TYPE_STR,
.default_value = "realtime",
.value = {"realtime"},
.description = "operation mode - futuristic operation modes",
.op_version = {3},
.tags = {"journal"}},
{.key = {"encoding"},
.type = GF_OPTION_TYPE_STR,
.default_value = "ascii",
.value = {"binary", "ascii"},
.description = "encoding type for changelogs",
.op_version = {3},
.flags = OPT_FLAG_SETTABLE,
.level = OPT_STATUS_ADVANCED,
.tags = {"journal"}},
{.key = {"rollover-time"},
.default_value = "15",
.type = GF_OPTION_TYPE_TIME,
.description = "time to switch to a new changelog file (in seconds)",
.op_version = {3},
.flags = OPT_FLAG_SETTABLE,
.level = OPT_STATUS_ADVANCED,
.tags = {"journal", "georep", "glusterfind"}},
{.key = {"fsync-interval"},
.type = GF_OPTION_TYPE_TIME,
.default_value = "5",
.description = "do not open CHANGELOG file with O_SYNC mode."
" instead perform fsync() at specified intervals",
.op_version = {3},
.flags = OPT_FLAG_SETTABLE,
.level = OPT_STATUS_ADVANCED,
.tags = {"journal"}},
{.key = {"changelog-barrier-timeout"},
.type = GF_OPTION_TYPE_TIME,
.default_value = BARRIER_TIMEOUT,
.description = "After 'timeout' seconds since the time 'barrier' "
"option was set to \"on\", unlink/rmdir/rename "
"operations are no longer blocked and previously "
"blocked fops are allowed to go through",
.op_version = {3},
.flags = OPT_FLAG_SETTABLE,
.level = OPT_STATUS_ADVANCED,
.tags = {"journal"}},
{.key = {"capture-del-path"},
.type = GF_OPTION_TYPE_BOOL,
.default_value = "off",
.description = "enable/disable capturing paths of deleted entries",
.op_version = {3},
.flags = OPT_FLAG_SETTABLE,
.level = OPT_STATUS_BASIC,
.tags = {"journal", "glusterfind"}},
{.key = {NULL}},
};
xlator_api_t xlator_api = {
.init = init,
.fini = fini,
.notify = notify,
.reconfigure = reconfigure,
.mem_acct_init = mem_acct_init,
.op_version = {1}, /* Present from the initial version */
.fops = &fops,
.cbks = &cbks,
.options = options,
.identifier = "changelog",
.category = GF_MAINTAINED,
};