/*
Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
This file is part of GlusterFS.
This file is licensed to you under your choice of the GNU Lesser
General Public License, version 3 or any later version (LGPLv3 or
later), or the GNU General Public License, version 2 (GPLv2), in all
cases as published by the Free Software Foundation.
*/
#include <ctype.h>
#include <sys/uio.h>
#include <signal.h>
#include <glusterfs/glusterfs.h>
#include <glusterfs/xlator.h>
#include <glusterfs/logging.h>
#include "changelog.h"
#include <glusterfs/compat-errno.h>
#include <glusterfs/call-stub.h>
#include "bit-rot-stub.h"
#include "bit-rot-stub-mem-types.h"
#include "bit-rot-stub-messages.h"
#include "bit-rot-common.h"
#define BR_STUB_REQUEST_COOKIE 0x1
/*
 * Cancellation cleanup handler: unlocks the mutex handed in through @arg.
 * The signer thread registers it with pthread_cleanup_push() so that the
 * mutex is released if the thread is cancelled while holding it.
 */
void
br_stub_lock_cleaner(void *arg)
{
    pthread_mutex_unlock((pthread_mutex_t *)arg);
}
/* Entry point of the signer thread; the definition appears later in this
 * file. Declared here because init() and reconfigure() spawn it. */
void *
br_stub_signth(void *);
/*
 * One queued object-signing request. Entries are linked on priv->squeue by
 * br_stub_handle_object_signature() and consumed by br_stub_signth().
 */
struct br_stub_signentry {
unsigned long v; /* ntohl() of the request's signed version; sort key used by orderq() */
call_stub_t *stub; /* fsetxattr stub resumed by the signer thread */
struct list_head list; /* linkage into priv->squeue */
};
/*
 * Set up memory accounting for this translator.
 *
 * Returns -1 when @this is NULL, otherwise whatever xlator_mem_acct_init()
 * returned (a warning is logged when that value is non-zero).
 */
int32_t
mem_acct_init(xlator_t *this)
{
    int32_t rc;

    if (this == NULL)
        return -1;

    rc = xlator_mem_acct_init(this, gf_br_stub_mt_end + 1);
    if (rc != 0) {
        gf_msg(this->name, GF_LOG_WARNING, 0, BRS_MSG_MEM_ACNT_FAILED,
               "Memory accounting init failed");
    }

    return rc;
}
/*
 * Initialise the bad-object container: its condition variable, mutex and
 * queue, the on-disk directory (via br_stub_dir_create()) and the worker
 * thread running br_stub_worker().
 *
 * Returns 0 on success and -1 on any failure. On failure everything that
 * was initialised here is torn down again, including the thread attribute
 * object.
 */
int
br_stub_bad_object_container_init(xlator_t *this, br_stub_private_t *priv)
{
    pthread_attr_t w_attr;
    int ret = -1;

    ret = pthread_cond_init(&priv->container.bad_cond, NULL);
    if (ret != 0) {
        gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_BAD_OBJ_THREAD_FAIL,
               "pthread_cond_init failed (%d)", ret);
        goto out;
    }

    ret = pthread_mutex_init(&priv->container.bad_lock, NULL);
    if (ret != 0) {
        gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_BAD_OBJ_THREAD_FAIL,
               "pthread_mutex_init failed (%d)", ret);
        goto cleanup_cond;
    }

    ret = pthread_attr_init(&w_attr);
    if (ret != 0) {
        gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_BAD_OBJ_THREAD_FAIL,
               "pthread_attr_init failed (%d)", ret);
        goto cleanup_lock;
    }

    /* A rejected stack size is not fatal; the default size is used. */
    ret = pthread_attr_setstacksize(&w_attr, BAD_OBJECT_THREAD_STACK_SIZE);
    if (ret == EINVAL) {
        gf_msg(this->name, GF_LOG_WARNING, 0, BRS_MSG_BAD_OBJ_THREAD_FAIL,
               "Using default thread stack size");
    }

    INIT_LIST_HEAD(&priv->container.bad_queue);

    /* w_attr is live from here on, so every failure must destroy it. */
    ret = br_stub_dir_create(this, priv);
    if (ret < 0)
        goto cleanup_attr;

    ret = gf_thread_create(&priv->container.thread, &w_attr, br_stub_worker,
                           this, "brswrker");
    if (ret)
        goto cleanup_attr;

    /* The attribute object is only needed while creating the thread. */
    pthread_attr_destroy(&w_attr);
    return 0;

cleanup_attr:
    pthread_attr_destroy(&w_attr);
cleanup_lock:
    pthread_mutex_destroy(&priv->container.bad_lock);
cleanup_cond:
    pthread_cond_destroy(&priv->container.bad_cond);
out:
    return -1;
}
/*
 * Translator init: allocate the private structure and local pool, read the
 * "bitrot" and "export" options, record the boot time and, when versioning
 * is enabled, spawn the signer thread and the bad-object container.
 *
 * Returns 0 on success, -1 on failure. On failure this->private is reset
 * to NULL and everything allocated here is released.
 */
int32_t
init(xlator_t *this)
{
    int ret = 0;
    char *tmp = NULL;
    struct timeval tv = {
        0,
    };
    br_stub_private_t *priv = NULL;

    if (!this->children) {
        gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_NO_CHILD,
               "FATAL: no children");
        goto error_return;
    }

    priv = GF_CALLOC(1, sizeof(*priv), gf_br_stub_mt_private_t);
    if (!priv)
        goto error_return;

    priv->local_pool = mem_pool_new(br_stub_local_t, 512);
    if (!priv->local_pool)
        goto free_priv;

    GF_OPTION_INIT("bitrot", priv->do_versioning, bool, free_mempool);

    GF_OPTION_INIT("export", tmp, str, free_mempool);

    /* Reject export paths that would be truncated. */
    if (snprintf(priv->export, PATH_MAX, "%s", tmp) >= PATH_MAX)
        goto free_mempool;

    if (snprintf(priv->stub_basepath, sizeof(priv->stub_basepath), "%s/%s",
                 priv->export,
                 BR_STUB_QUARANTINE_DIR) >= sizeof(priv->stub_basepath))
        goto free_mempool;

    (void)gettimeofday(&tv, NULL);

    /* boot time is in network endian format */
    priv->boot[0] = htonl(tv.tv_sec);
    priv->boot[1] = htonl(tv.tv_usec);

    pthread_mutex_init(&priv->lock, NULL);
    pthread_cond_init(&priv->cond, NULL);
    INIT_LIST_HEAD(&priv->squeue);

    /* Thread creations need 'this' to be passed so that THIS can be
     * assigned inside the thread. So setting this->private here.
     */
    this->private = priv;

    if (!priv->do_versioning)
        return 0;

    ret = gf_thread_create(&priv->signth, NULL, br_stub_signth, this,
                           "brssign");
    if (ret != 0) {
        gf_msg(this->name, GF_LOG_WARNING, 0, BRS_MSG_SPAWN_SIGN_THRD_FAILED,
               "failed to create the new thread for signer");
        goto cleanup_lock;
    }

    ret = br_stub_bad_object_container_init(this, priv);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_BAD_CONTAINER_FAIL,
               "failed to launch the thread for storing bad gfids");
        goto cleanup_signth;
    }

    gf_msg_debug(this->name, 0, "bit-rot stub loaded");

    return 0;

cleanup_signth:
    /* The signer thread is already running and uses priv->lock, priv->cond
     * and priv->squeue. Stop it before those are destroyed and priv is
     * freed; otherwise it would operate on freed memory.
     */
    if (gf_thread_cleanup_xint(priv->signth)) {
        gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_CANCEL_SIGN_THREAD_FAILED,
               "Could not cancel sign serializer thread");
    }
    priv->signth = 0;
cleanup_lock:
    pthread_cond_destroy(&priv->cond);
    pthread_mutex_destroy(&priv->lock);
free_mempool:
    mem_pool_destroy(priv->local_pool);
free_priv:
    GF_FREE(priv);
    this->private = NULL;
error_return:
    return -1;
}
/* TODO:
* As of now enabling bitrot option does 2 things.
* 1) Start the Bitrot Daemon which signs the objects (currently files only)
* upon getting notified by the stub.
* 2) Enable versioning of the objects. Object versions (again files only) are
* incremented upon modification.
* So object versioning is tied to bitrot daemon's signing. In future, object
* versioning might be necessary for other things as well apart from bit-rot
* detection (well that's the objective of bringing in object-versioning :)).
* In that case, it would be better to make versioning a separate option so
* that it can be enabled even when bit-rot detection is not needed.
* Ex: ICAP.
*/
/*
 * Apply a changed "bitrot" option.
 *
 *  - versioning enabled and no signer thread yet: spawn the signer thread
 *    and initialise the bad-object container;
 *  - versioning enabled and the signer thread already running: nothing to
 *    do (the running threads must not be disturbed);
 *  - versioning disabled: cancel the signer and container threads if they
 *    exist.
 *
 * Returns 0 on success. On any failure both threads are cancelled and -1
 * is returned.
 */
int32_t
reconfigure(xlator_t *this, dict_t *options)
{
    int32_t ret = -1;
    br_stub_private_t *priv = NULL;

    priv = this->private;

    GF_OPTION_RECONF("bitrot", priv->do_versioning, options, bool, err);

    if (priv->do_versioning && !priv->signth) {
        ret = gf_thread_create(&priv->signth, NULL, br_stub_signth, this,
                               "brssign");
        if (ret != 0) {
            gf_msg(this->name, GF_LOG_WARNING, 0,
                   BRS_MSG_SPAWN_SIGN_THRD_FAILED,
                   "failed to create the new thread for signer");
            goto err;
        }

        ret = br_stub_bad_object_container_init(this, priv);
        if (ret) {
            gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_BAD_CONTAINER_FAIL,
                   "failed to launch the thread for storing bad gfids");
            goto err;
        }
    } else if (!priv->do_versioning) {
        /* Tear down only when versioning is actually switched off. A
         * reconfigure that leaves bitrot enabled must keep the threads.
         */
        if (priv->signth) {
            if (gf_thread_cleanup_xint(priv->signth)) {
                gf_msg(this->name, GF_LOG_ERROR, 0,
                       BRS_MSG_CANCEL_SIGN_THREAD_FAILED,
                       "Could not cancel sign serializer thread");
            } else {
                gf_msg(this->name, GF_LOG_INFO, 0, BRS_MSG_KILL_SIGN_THREAD,
                       "killed the signer thread");
                priv->signth = 0;
            }
        }

        if (priv->container.thread) {
            if (gf_thread_cleanup_xint(priv->container.thread)) {
                gf_msg(this->name, GF_LOG_ERROR, 0,
                       BRS_MSG_CANCEL_SIGN_THREAD_FAILED,
                       "Could not cancel sign serializer thread");
            }
            priv->container.thread = 0;
        }
    }

    ret = 0;
    return ret;

err:
    if (priv->signth) {
        if (gf_thread_cleanup_xint(priv->signth)) {
            gf_msg(this->name, GF_LOG_ERROR, 0,
                   BRS_MSG_CANCEL_SIGN_THREAD_FAILED,
                   "Could not cancel sign serializer thread");
        }
        priv->signth = 0;
    }

    if (priv->container.thread) {
        if (gf_thread_cleanup_xint(priv->container.thread)) {
            gf_msg(this->name, GF_LOG_ERROR, 0,
                   BRS_MSG_CANCEL_SIGN_THREAD_FAILED,
                   "Could not cancel sign serializer thread");
        }
        priv->container.thread = 0;
    }

    ret = -1;
    return ret;
}
/*
 * Forward the event to default_notify(), but only once the translator has
 * a private structure. Always returns 0.
 */
int
notify(xlator_t *this, int event, void *data, ...)
{
    if (this != NULL && this->private != NULL)
        default_notify(this, event, data);

    return 0;
}
/*
 * Translator teardown.
 *
 * With versioning enabled, the signer and container threads are cancelled
 * and whatever is still queued for them is destroyed. If either thread
 * cannot be cancelled the function logs and returns without freeing
 * anything further. Finally the locks, the local pool and the private
 * structure are released.
 */
void
fini(xlator_t *this)
{
    br_stub_private_t *priv = this->private;
    struct br_stub_signentry *entry = NULL;
    call_stub_t *pending = NULL;

    if (priv == NULL)
        return;

    if (priv->do_versioning) {
        if (gf_thread_cleanup_xint(priv->signth) != 0) {
            gf_msg(this->name, GF_LOG_ERROR, 0,
                   BRS_MSG_CANCEL_SIGN_THREAD_FAILED,
                   "Could not cancel sign serializer thread");
            return;
        }
        priv->signth = 0;

        /* drop signing requests nobody will process any more */
        while (!list_empty(&priv->squeue)) {
            entry = list_first_entry(&priv->squeue, struct br_stub_signentry,
                                     list);
            list_del_init(&entry->list);
            call_stub_destroy(entry->stub);
            GF_FREE(entry);
        }

        if (gf_thread_cleanup_xint(priv->container.thread) != 0) {
            gf_msg(this->name, GF_LOG_ERROR, 0,
                   BRS_MSG_CANCEL_SIGN_THREAD_FAILED,
                   "Could not cancel sign serializer thread");
            return;
        }
        priv->container.thread = 0;

        /* same for the bad-object queue */
        while (!list_empty(&priv->container.bad_queue)) {
            pending = list_first_entry(&priv->container.bad_queue, call_stub_t,
                                       list);
            list_del_init(&pending->list);
            call_stub_destroy(pending);
        }

        pthread_mutex_destroy(&priv->container.bad_lock);
        pthread_cond_destroy(&priv->container.bad_cond);
    }

    pthread_mutex_destroy(&priv->lock);
    pthread_cond_destroy(&priv->cond);

    if (priv->local_pool) {
        mem_pool_destroy(priv->local_pool);
        priv->local_pool = NULL;
    }

    this->private = NULL;
    GF_FREE(priv);
}
/*
 * Allocate one zeroed buffer holding a br_version_t (when @obuf is given)
 * followed by a br_signature_t plus @signaturelen bytes (when @sbuf is
 * given). The requested out-pointers are set to their part of the buffer.
 *
 * Returns 0 on success, -1 when the allocation fails.
 */
static int
br_stub_alloc_versions(br_version_t **obuf, br_signature_t **sbuf,
                       size_t signaturelen)
{
    size_t total = 0;
    char *cursor = NULL;

    total += obuf ? sizeof(br_version_t) : 0;
    total += sbuf ? sizeof(br_signature_t) + signaturelen : 0;

    cursor = GF_CALLOC(1, total, gf_br_stub_mt_version_t);
    if (cursor == NULL)
        return -1;

    if (obuf) {
        *obuf = (br_version_t *)cursor;
        /* the signature, if any, sits right behind the version */
        cursor += sizeof(br_version_t);
    }

    if (sbuf)
        *sbuf = (br_signature_t *)cursor;

    return 0;
}
/* Free a buffer; callers in this file pass what br_stub_alloc_versions()
 * handed out. */
static void
br_stub_dealloc_versions(void *mem)
{
    GF_FREE(mem);
}
/* Fetch a zeroed br_stub_local_t from the translator's local pool. */
static br_stub_local_t *
br_stub_alloc_local(xlator_t *this)
{
    return mem_get0(((br_stub_private_t *)this->private)->local_pool);
}
/* Hand a local back to its pool; a NULL pointer is ignored. */
static void
br_stub_dealloc_local(br_stub_local_t *ptr)
{
    if (ptr != NULL)
        mem_put(ptr);
}
/*
 * Fill @obuf with @oversion and the translator's boot time, then attach it
 * to @dict under BITROT_CURRENT_VERSION_KEY as a static binary value.
 * Returns the result of dict_set_static_bin().
 */
static int
br_stub_prepare_version_request(xlator_t *this, dict_t *dict,
                                br_version_t *obuf, unsigned long oversion)
{
    br_stub_private_t *conf = this->private;

    br_set_ongoingversion(obuf, oversion, conf->boot);

    return dict_set_static_bin(dict, BITROT_CURRENT_VERSION_KEY, (void *)obuf,
                               sizeof(*obuf));
}
/*
 * Fill @sbuf from @sign through br_set_signature() and attach it to @dict
 * under BITROT_SIGNING_VERSION_KEY, using the size br_set_signature()
 * reported. Returns the result of dict_set_static_bin().
 */
static int
br_stub_prepare_signing_request(dict_t *dict, br_signature_t *sbuf,
                                br_isignature_t *sign, size_t signaturelen)
{
    size_t xattr_len = 0;

    br_set_signature(sbuf, sign, signaturelen, &xattr_len);

    return dict_set_static_bin(dict, BITROT_SIGNING_VERSION_KEY, (void *)sbuf,
                               xattr_len);
}
/**
 * Create and install a bit-rot inode context for @inode.
 *
 * The new context starts with ongoing version @version, is marked dirty or
 * synced according to @markdirty, and is flagged bad when @bad_object is
 * set. When @fd is given it is added to the context's fd list. On success
 * the context address is stored in @ctx_addr (if non-NULL).
 *
 * Returns 0 on success, -1 on failure (the context is freed again).
 */
static int
br_stub_init_inode_versions(xlator_t *this, fd_t *fd, inode_t *inode,
                            unsigned long version, gf_boolean_t markdirty,
                            gf_boolean_t bad_object, uint64_t *ctx_addr)
{
    br_stub_inode_ctx_t *ictx = NULL;

    ictx = GF_CALLOC(1, sizeof(*ictx), gf_br_stub_mt_inode_ctx_t);
    if (ictx == NULL)
        return -1;

    INIT_LIST_HEAD(&ictx->fd_list);

    if (markdirty) {
        __br_stub_mark_inode_dirty(ictx);
    } else {
        __br_stub_mark_inode_synced(ictx);
    }

    __br_stub_set_ongoing_version(ictx, version);

    if (bad_object) {
        __br_stub_mark_object_bad(ictx);
    }

    if (fd != NULL && br_stub_add_fd_to_inode(this, fd, ictx) != 0)
        goto discard;

    if (br_stub_set_inode_ctx(this, inode, ictx) != 0)
        goto discard;

    if (ctx_addr != NULL)
        *ctx_addr = (uint64_t)(uintptr_t)ictx;

    return 0;

discard:
    GF_FREE(ictx);
    return -1;
}
/**
 * Record @version as the ongoing version of @inode and mark the inode
 * synced, but only if its context is currently dirty. Runs under the
 * inode lock.
 *
 * Returns 0 when the inode has a context, -1 otherwise.
 */
static int
br_stub_mod_inode_versions(xlator_t *this, fd_t *fd, inode_t *inode,
                           unsigned long version)
{
    int32_t rc = -1;
    br_stub_inode_ctx_t *ictx = NULL;

    LOCK(&inode->lock);
    {
        ictx = __br_stub_get_ongoing_version_ctx(this, inode, NULL);
        if (ictx != NULL) {
            if (__br_stub_is_inode_dirty(ictx)) {
                __br_stub_set_ongoing_version(ictx, version);
                __br_stub_mark_inode_synced(ictx);
            }
            rc = 0;
        }
    }
    UNLOCK(&inode->lock);

    return rc;
}
/*
 * Populate @local for a versioning operation. References are taken on @fd
 * and @inode when they are non-NULL; br_stub_cleanup_local() drops them.
 */
static void
br_stub_fill_local(br_stub_local_t *local, call_stub_t *stub, fd_t *fd,
                   inode_t *inode, uuid_t gfid, int versioningtype,
                   unsigned long memversion)
{
    local->fopstub = stub;
    local->versioningtype = versioningtype;

    local->u.context.version = memversion;
    gf_uuid_copy(local->u.context.gfid, gfid);

    if (fd != NULL)
        local->u.context.fd = fd_ref(fd);

    if (inode != NULL)
        local->u.context.inode = inode_ref(inode);
}
/*
 * Reset @local to its pristine state, dropping the fd and inode references
 * it holds. A NULL @local is ignored. The memory itself is not released.
 */
static void
br_stub_cleanup_local(br_stub_local_t *local)
{
    if (local == NULL)
        return;

    local->fopstub = NULL;
    local->versioningtype = 0;
    local->u.context.version = 0;

    if (local->u.context.fd != NULL) {
        fd_unref(local->u.context.fd);
        local->u.context.fd = NULL;
    }

    if (local->u.context.inode != NULL) {
        inode_unref(local->u.context.inode);
        local->u.context.inode = NULL;
    }

    memset(local->u.context.gfid, 0, sizeof(uuid_t));
}
/*
 * Report whether the inode behind @fd is dirty (*versioning) and whether
 * it has been modified (*modified). Both flags are read under the inode
 * lock. The inode context is created on the fly when it does not exist
 * yet, and is returned through @ctx when that pointer is non-NULL.
 *
 * Returns 0 on success, -1 when the context could not be created.
 */
static int
br_stub_need_versioning(xlator_t *this, fd_t *fd, gf_boolean_t *versioning,
                        gf_boolean_t *modified, br_stub_inode_ctx_t **ctx)
{
    uint64_t addr = 0;
    br_stub_inode_ctx_t *ictx = NULL;

    *versioning = _gf_false;
    *modified = _gf_false;

    /* The inode context is normally set up in the lookup, create and
     * mknod callbacks. Versioning is optional, though, and may get
     * switched on while I/O is in flight; a fop such as writev can then
     * arrive for an inode that never went through those callbacks with
     * versioning enabled. So a missing context is not an error: create it
     * here. The same fallback is used wherever the context is fetched.
     */
    if (br_stub_get_inode_ctx(this, fd->inode, &addr) < 0) {
        if (br_stub_init_inode_versions(this, fd, fd->inode,
                                        BITROT_DEFAULT_CURRENT_VERSION,
                                        _gf_true, _gf_false, &addr)) {
            gf_msg(this->name, GF_LOG_ERROR, 0,
                   BRS_MSG_GET_INODE_CONTEXT_FAILED,
                   "failed to "
                   " init the inode context for the inode %s",
                   uuid_utoa(fd->inode->gfid));
            return -1;
        }
    }

    ictx = (br_stub_inode_ctx_t *)(long)addr;

    LOCK(&fd->inode->lock);
    {
        if (__br_stub_is_inode_dirty(ictx))
            *versioning = _gf_true;
        if (__br_stub_is_inode_modified(ictx))
            *modified = _gf_true;
    }
    UNLOCK(&fd->inode->lock);

    if (ctx != NULL)
        *ctx = ictx;

    return 0;
}
/*
 * Ensure @fd has a bit-rot fd context; if it has none, add the fd to the
 * inode context @ctx. Returns 0 on success, otherwise the error returned
 * by br_stub_add_fd_to_inode().
 */
static int32_t
br_stub_anon_fd_ctx(xlator_t *this, fd_t *fd, br_stub_inode_ctx_t *ctx)
{
    int32_t rc = 0;

    if (br_stub_fd_ctx_get(this, fd) != NULL)
        return 0;

    rc = br_stub_add_fd_to_inode(this, fd, ctx);
    if (rc) {
        gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_ADD_FD_TO_INODE,
               "failed to add fd to "
               "the inode (gfid: %s)",
               uuid_utoa(fd->inode->gfid));
    }

    return rc;
}
/*
 * Prepare @frame for a versioning operation: allocate a local and, for an
 * anonymous fd, make sure the fd is tracked in the inode context. On
 * success the local is stored in frame->local.
 *
 * Returns 0 on success, -1 on failure (frame->local is left untouched).
 */
static int
br_stub_versioning_prep(call_frame_t *frame, xlator_t *this, fd_t *fd,
                        br_stub_inode_ctx_t *ctx)
{
    br_stub_local_t *fresh = br_stub_alloc_local(this);

    if (fresh == NULL) {
        gf_msg(this->name, GF_LOG_ERROR, ENOMEM, BRS_MSG_NO_MEMORY,
               "local allocation failed (gfid: %s)",
               uuid_utoa(fd->inode->gfid));
        return -1;
    }

    if (fd_is_anonymous(fd) && br_stub_anon_fd_ctx(this, fd, ctx) != 0) {
        br_stub_dealloc_local(fresh);
        return -1;
    }

    frame->local = fresh;
    return 0;
}
/*
 * Flag the inode of the fd stored in @local as modified, creating the
 * inode context first if it does not exist. The flag is set under the
 * inode lock.
 *
 * Returns 0 on success, -1 when the context could not be created.
 */
static int
br_stub_mark_inode_modified(xlator_t *this, br_stub_local_t *local)
{
    fd_t *fd = local->u.context.fd;
    uint64_t addr = 0;
    br_stub_inode_ctx_t *ictx = NULL;

    if (br_stub_get_inode_ctx(this, fd->inode, &addr) < 0) {
        if (br_stub_init_inode_versions(this, fd, fd->inode,
                                        BITROT_DEFAULT_CURRENT_VERSION,
                                        _gf_true, _gf_false, &addr))
            return -1;
    }

    ictx = (br_stub_inode_ctx_t *)(long)addr;

    LOCK(&fd->inode->lock);
    {
        __br_stub_set_inode_modified(ictx);
    }
    UNLOCK(&fd->inode->lock);

    return 0;
}
/**
 * Check @inode against br_stub_is_bad_object(), whose result is handled
 * here as follows:
 *   0  => the inode context says the object is not bad
 *  -1  => the inode context could not be fetched; a fresh context is
 *         created, and only if that fails is an error reported
 *         (*op_ret = -1, *op_errno = EINVAL)
 *  -2  => the inode context says the object is bad
 *         (*op_ret = -1, *op_errno = EIO)
 *
 * A non-zero return value means the calling fop must fail with the values
 * stored in @op_ret / @op_errno.
 */
static int
br_stub_check_bad_object(xlator_t *this, inode_t *inode, int32_t *op_ret,
                         int32_t *op_errno)
{
    int status = br_stub_is_bad_object(this, inode);

    switch (status) {
        case -2:
            gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_BAD_OBJECT_ACCESS,
                   "%s is a bad object. Returning", uuid_utoa(inode->gfid));
            *op_ret = -1;
            *op_errno = EIO;
            break;

        case -1:
            status = br_stub_init_inode_versions(
                this, NULL, inode, BITROT_DEFAULT_CURRENT_VERSION, _gf_true,
                _gf_false, NULL);
            if (status) {
                gf_msg(this->name, GF_LOG_ERROR, 0,
                       BRS_MSG_GET_INODE_CONTEXT_FAILED,
                       "failed to init inode context for %s",
                       uuid_utoa(inode->gfid));
                *op_ret = -1;
                *op_errno = EINVAL;
            }
            break;

        default:
            break;
    }

    return status;
}
/**
 * Callback of the fsetxattr that persists an incremented version.
 *
 * On success the in-memory version of the inode is updated and the parked
 * fop (local->fopstub) is resumed. On any failure the parked fop is
 * unwound with an error and the frame's local is released.
 */
int
br_stub_fd_incversioning_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                             int op_ret, int op_errno, dict_t *xdata)
{
    br_stub_local_t *local = (br_stub_local_t *)frame->local;

    if (op_ret >= 0) {
        op_ret = br_stub_mod_inode_versions(this, local->u.context.fd,
                                            local->u.context.inode,
                                            local->u.context.version);
        if (op_ret < 0)
            op_errno = EINVAL;
    }

    if (op_ret >= 0) {
        call_resume(local->fopstub);
        return 0;
    }

    frame->local = NULL;
    call_unwind_error(local->fopstub, -1, op_errno);
    br_stub_cleanup_local(local);
    br_stub_dealloc_local(local);

    return 0;
}
/**
* Initial object versioning
*
* Version persists two (2) extended attributes as explained below:
* 1. Current (ongoing) version: This is incremented on an writev ()
* or truncate () and is the running version for an object.
* 2. Signing version: This is the version against which an object
* was signed (checksummed).
*
* During initial versioning, the ongoing and signing versions are
* set to one and zero respectively. A write() call increments the
* ongoing version as an indication of modification to the object.
* Additionally this needs to be persisted on disk and needs to be
* durable: fsync().. :-/
* As an optimization only the first write() synchronizes the ongoing
* version to disk, subsequent write()s before the *last* release()
* are no-op's.
*
* create(), just like lookup() initializes the object versions to
* the default. As an optimization this is not a durable operation:
* in case of a crash, hard reboot etc.. absence of versioning xattrs
* is ignored in scrubber along with the one time crawler explicitly
* triggering signing for such objects.
*
* c.f. br_stub_writev() / br_stub_truncate()
*/
/**
 * Perform full or incremental versioning on the inode pointed to by an
 * fd. Incremental versioning is done when an inode is dirty and a
 * writeback is triggered.
 *
 * The version xattrs in @dict are sent down as an internal fsetxattr
 * (GLUSTERFS_INTERNAL_FOP_KEY; GLUSTERFS_DURABLE_OP is added when @durable
 * is set) and @callback receives the result. frame->local must already
 * hold a br_stub_local_t; it is filled in here.
 *
 * Returns 0 once the fop has been wound, non-zero if the xdata could not
 * be set up (nothing is wound in that case).
 */
int
br_stub_fd_versioning(xlator_t *this, call_frame_t *frame, call_stub_t *stub,
                      dict_t *dict, fd_t *fd, br_stub_version_cbk *callback,
                      unsigned long memversion, int versioningtype, int durable)
{
    int32_t rc = -1;
    int flags = 0;
    dict_t *xdata = dict_new();

    if (xdata == NULL)
        return rc;

    rc = dict_set_int32(xdata, GLUSTERFS_INTERNAL_FOP_KEY, 1);
    if (rc == 0 && durable)
        rc = dict_set_int32(xdata, GLUSTERFS_DURABLE_OP, 0);

    if (rc == 0) {
        br_stub_fill_local(frame->local, stub, fd, fd->inode, fd->inode->gfid,
                           versioningtype, memversion);

        STACK_WIND(frame, callback, FIRST_CHILD(this),
                   FIRST_CHILD(this)->fops->fsetxattr, fd, dict, flags, xdata);
    }

    dict_unref(xdata);
    return rc;
}
/*
 * Persist the writeback version of @ctx through an incremental versioning
 * fsetxattr; @stub is the parked fop resumed from the callback.
 *
 * If the request cannot be sent, @stub is unwound here with ENOMEM and the
 * frame's local (if any) is released. Returns 0 when the request was
 * wound, non-zero otherwise.
 */
static int
br_stub_perform_incversioning(xlator_t *this, call_frame_t *frame,
                              call_stub_t *stub, fd_t *fd,
                              br_stub_inode_ctx_t *ctx)
{
    int32_t rc = -1;
    int op_errno = ENOMEM;
    br_stub_local_t *local = frame->local;
    unsigned long wb_version = __br_stub_writeback_version(ctx);
    br_version_t *vbuf = NULL;
    dict_t *req = dict_new();

    if (req != NULL) {
        rc = br_stub_alloc_versions(&vbuf, NULL, 0);
        if (rc == 0) {
            rc = br_stub_prepare_version_request(this, req, vbuf, wb_version);
            if (rc == 0) {
                rc = br_stub_fd_versioning(this, frame, stub, req, fd,
                                           br_stub_fd_incversioning_cbk,
                                           wb_version,
                                           BR_STUB_INCREMENTAL_VERSIONING,
                                           !WRITEBACK_DURABLE);
            }
            br_stub_dealloc_versions(vbuf);
        }
        dict_unref(req);
    }

    if (rc) {
        if (local)
            frame->local = NULL;

        call_unwind_error(stub, -1, op_errno);

        if (local) {
            br_stub_cleanup_local(local);
            br_stub_dealloc_local(local);
        }
    }

    return rc;
}
/** {{{ */
/* fsetxattr() */
/*
 * Resume function of a queued signing request: wind the fsetxattr that
 * stores the signature, then drop the reference on @xdata. Always
 * returns 0.
 */
int32_t
br_stub_perform_objsign(call_frame_t *frame, xlator_t *this, fd_t *fd,
                        dict_t *dict, int flags, dict_t *xdata)
{
    STACK_WIND(frame, default_fsetxattr_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->fsetxattr, fd, dict, flags, xdata);

    dict_unref(xdata);

    return 0;
}
/*
 * Signer thread: dequeues signing requests from priv->squeue one at a time
 * and resumes their stubs. The loop has no exit condition; per the comment
 * below, the thread ends through cancellation.
 *
 * @arg is the xlator_t this thread serves; THIS is set from it.
 */
void *
br_stub_signth(void *arg)
{
xlator_t *this = arg;
br_stub_private_t *priv = this->private;
struct br_stub_signentry *sigstub = NULL;
THIS = this;
while (1) {
/*
* Disabling bit-rot feature leads to this particular thread
* getting cleaned up by reconfigure via a call to the function
* gf_thread_cleanup_xint (which in turn calls pthread_cancel
* and pthread_join). But, if this thread had held the mutex
* &priv->lock at the time of cancellation, then it leads to
* deadlock in future when bit-rot feature is enabled (which
* again spawns this thread which cant hold the lock as the
* mutex is still held by the previous instance of the thread
* which got killed). Also, the br_stub_handle_object_signature
* function which is called whenever file has to be signed
* also gets blocked as it too attempts to acquire &priv->lock.
*
* So, arrange for the lock to be unlocked as part of the
* cleanup of this thread using pthread_cleanup_push and
* pthread_cleanup_pop.
*/
pthread_cleanup_push(br_stub_lock_cleaner, &priv->lock);
pthread_mutex_lock(&priv->lock);
{
/* sleep until at least one request is queued */
while (list_empty(&priv->squeue))
pthread_cond_wait(&priv->cond, &priv->lock);
/* take the head of the queue */
sigstub = list_first_entry(&priv->squeue, struct br_stub_signentry,
list);
list_del_init(&sigstub->list);
}
pthread_mutex_unlock(&priv->lock);
/* the lock is already released, so pop the handler without running it */
pthread_cleanup_pop(0);
/* resume the queued stub outside the lock, then free the queue entry */
call_resume(sigstub->stub);
GF_FREE(sigstub);
}
return NULL;
}
/*
 * Tell whether @dict carries any of the keys this translator treats as
 * internal (signature set/get, reopen hint, bad-object marker, signing
 * and current version xattrs).
 */
static gf_boolean_t
br_stub_internal_xattr(dict_t *dict)
{
    gf_boolean_t found = _gf_false;

    if (dict_get(dict, GLUSTERFS_SET_OBJECT_SIGNATURE))
        found = _gf_true;
    else if (dict_get(dict, GLUSTERFS_GET_OBJECT_SIGNATURE))
        found = _gf_true;
    else if (dict_get(dict, BR_REOPEN_SIGN_HINT_KEY))
        found = _gf_true;
    else if (dict_get(dict, BITROT_OBJECT_BAD_KEY))
        found = _gf_true;
    else if (dict_get(dict, BITROT_SIGNING_VERSION_KEY))
        found = _gf_true;
    else if (dict_get(dict, BITROT_CURRENT_VERSION_KEY))
        found = _gf_true;

    return found;
}
/*
 * Ordering predicate passed to list_add_order() for the signing queue:
 * returns 1 when the version of @elem1's entry is greater than that of
 * @elem2's entry, 0 otherwise.
 */
int
orderq(struct list_head *elem1, struct list_head *elem2)
{
    struct br_stub_signentry *lhs = list_entry(elem1, struct br_stub_signentry,
                                               list);
    struct br_stub_signentry *rhs = list_entry(elem2, struct br_stub_signentry,
                                               list);

    return (lhs->v > rhs->v);
}
/*
 * Compare the version a signature was computed against (@sbuf) with the
 * inode's current in-memory version.
 *
 *  - signed version  > current version: invalid request, returns -1;
 *  - signed version  < current version: the signature is stale; the
 *    caller is told to fake success through *fakesuccess = 1;
 *  - versions equal: the signature may be stored.
 *
 * Returns 0 unless an argument is NULL, the inode context cannot be
 * fetched (BITROT_SIGNING_VERSION_KEY is then removed from @dict), or the
 * signed version exceeds the current one.
 */
static int
br_stub_compare_sign_version(xlator_t *this, inode_t *inode,
                             br_signature_t *sbuf, dict_t *dict,
                             int *fakesuccess)
{
    int32_t ret = -1;
    uint64_t tmp_ctx = 0;
    gf_boolean_t invalid = _gf_false;
    br_stub_inode_ctx_t *ctx = NULL;

    GF_VALIDATE_OR_GOTO("bit-rot-stub", this, out);
    GF_VALIDATE_OR_GOTO(this->name, inode, out);
    GF_VALIDATE_OR_GOTO(this->name, sbuf, out);
    GF_VALIDATE_OR_GOTO(this->name, dict, out);

    ret = br_stub_get_inode_ctx(this, inode, &tmp_ctx);
    if (ret) {
        dict_del(dict, BITROT_SIGNING_VERSION_KEY);
        goto out;
    }

    ctx = (br_stub_inode_ctx_t *)(long)tmp_ctx;

    LOCK(&inode->lock);
    {
        if (ctx->currentversion < sbuf->signedversion) {
            invalid = _gf_true;
        } else if (ctx->currentversion > sbuf->signedversion) {
            /* The arguments follow the order of the format string:
             * signing version first, current version second.
             */
            gf_msg_debug(this->name, 0,
                         "\"Signing version\" "
                         "(%lu) lower than \"Current version \" "
                         "(%lu)",
                         sbuf->signedversion, ctx->currentversion);
            *fakesuccess = 1;
        }
    }
    UNLOCK(&inode->lock);

    if (invalid) {
        ret = -1;
        gf_msg(this->name, GF_LOG_WARNING, 0, BRS_MSG_SIGN_VERSION_ERROR,
               "Signing version exceeds "
               "current version [%lu > %lu]",
               sbuf->signedversion, ctx->currentversion);
    }

out:
    return ret;
}
/*
 * Validate @sign, build the on-disk signature buffer from it, attach that
 * buffer to @dict and check it against the inode's current version.
 *
 * Returns 0 on success, -1 on any failure (the buffer is freed again if
 * it had been allocated).
 *
 * NOTE(review): on success the buffer stays referenced from @dict as a
 * static binary value and is not freed here — confirm who releases it.
 */
static int
br_stub_prepare_signature(xlator_t *this, dict_t *dict, inode_t *inode,
                          br_isignature_t *sign, int *fakesuccess)
{
    size_t siglen = 0;
    br_signature_t *sigbuf = NULL;

    if (!br_is_signature_type_valid(sign->signaturetype))
        return -1;

    siglen = sign->signaturelen;

    if (br_stub_alloc_versions(NULL, &sigbuf, siglen))
        return -1;

    if (br_stub_prepare_signing_request(dict, sigbuf, sign, siglen) ||
        br_stub_compare_sign_version(this, inode, sigbuf, dict, fakesuccess)) {
        br_stub_dealloc_versions(sigbuf);
        return -1;
    }

    return 0;
}
/*
 * Handle a "set object signature" fsetxattr.
 *
 * Only requests whose PID is GF_CLIENT_PID_BITD are accepted. The
 * signature is prepared and, unless it is stale (fake success), an
 * fsetxattr stub is queued on priv->squeue ordered by signed version; the
 * signer thread resumes it later. Every other outcome unwinds the fop
 * right here.
 *
 * NOTE(review): @sign is read after GLUSTERFS_SET_OBJECT_SIGNATURE has
 * been deleted from @dict — confirm @sign does not point into that dict
 * value.
 */
static void
br_stub_handle_object_signature(call_frame_t *frame, xlator_t *this, fd_t *fd,
                                dict_t *dict, br_isignature_t *sign,
                                dict_t *xdata)
{
    int32_t rc = -1;
    int32_t op_ret = -1;
    int32_t op_errno = EINVAL;
    int stale = 0;
    br_stub_private_t *priv = this->private;
    struct br_stub_signentry *entry = NULL;

    if (frame->root->pid != GF_CLIENT_PID_BITD) {
        gf_msg(this->name, GF_LOG_WARNING, op_errno, BRS_MSG_NON_BITD_PID,
               "PID %d from where signature request"
               "came, does not belong to bit-rot daemon."
               "Unwinding the fop",
               frame->root->pid);
        goto unwind;
    }

    rc = br_stub_prepare_signature(this, dict, fd->inode, sign, &stale);
    if (rc) {
        gf_msg(this->name, GF_LOG_WARNING, 0, BRS_MSG_SIGN_PREPARE_FAIL,
               "failed to prepare the signature for %s. Unwinding the fop",
               uuid_utoa(fd->inode->gfid));
        goto unwind;
    }

    if (stale) {
        /* signature is older than the object: report success, store nothing */
        op_ret = op_errno = 0;
        goto unwind;
    }

    dict_del(dict, GLUSTERFS_SET_OBJECT_SIGNATURE);

    rc = -1;

    /* hold our own reference on xdata, creating it if the caller had none */
    if (xdata) {
        dict_ref(xdata);
    } else {
        xdata = dict_new();
        if (!xdata)
            goto unwind;
    }

    rc = dict_set_int32(xdata, GLUSTERFS_DURABLE_OP, 0);
    if (rc)
        goto drop_xdata;

    /* prepare dispatch stub to order object signing */
    entry = GF_CALLOC(1, sizeof(*entry), gf_br_stub_mt_sigstub_t);
    if (!entry)
        goto drop_xdata;

    INIT_LIST_HEAD(&entry->list);
    entry->v = ntohl(sign->signedversion);
    entry->stub = fop_fsetxattr_stub(frame, br_stub_perform_objsign, fd, dict,
                                     0, xdata);
    if (!entry->stub)
        goto free_entry;

    pthread_mutex_lock(&priv->lock);
    {
        list_add_order(&entry->list, &priv->squeue, orderq);
        pthread_cond_signal(&priv->cond);
    }
    pthread_mutex_unlock(&priv->lock);

    return;

free_entry:
    GF_FREE(entry);
drop_xdata:
    dict_unref(xdata);
unwind:
    STACK_UNWIND_STRICT(fsetxattr, frame, op_ret, op_errno, NULL);
}
/*
 * Resume point of a reopen fsetxattr after versioning: mark the inode
 * modified, unwind the fop (with EINVAL if the marking failed) and release
 * the frame's local. Always returns 0.
 */
int32_t
br_stub_fsetxattr_resume(call_frame_t *frame, void *cookie, xlator_t *this,
                         int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
    br_stub_local_t *saved = frame->local;

    frame->local = NULL;

    if (br_stub_mark_inode_modified(this, saved)) {
        op_ret = -1;
        op_errno = EINVAL;
    }

    STACK_UNWIND_STRICT(fsetxattr, frame, op_ret, op_errno, xdata);

    br_stub_cleanup_local(saved);
    br_stub_dealloc_local(saved);

    return 0;
}
/**
* Handles object reopens. Object reopens can be of 3 types. 2 are from
* oneshot crawler and 1 from the regular signer.
* ONESHOT CRAWLER:
* For objects that were created before bitrot was enabled, the oneshot
* crawler crawls the namespace and signs all the objects. It has to do
* the versioning before making bit-rot-stub send a sign notification.
* So it sends fsetxattr with BR_OBJECT_REOPEN as the value. And bit-rot-stub
* upon getting BR_OBJECT_REOPEN value checks if the version has to be
* increased or not. By default the version will be increased. But if the
* object is modified before BR_OBJECT_REOPEN from oneshot crawler, then
* versioning need not be done. In that case simply a success is returned.
* SIGNER:
* The signer waits for 2 minutes upon getting the notification from bit-rot-stub
* and then it sends a dummy write (in reality a fsetxattr) call, to change
* the state of the inode from REOPEN_WAIT to SIGN_QUICK. The funny part here
* is though the inode's state is REOPEN_WAIT, the call sent by signer is
* BR_OBJECT_RESIGN. Once the state is changed to SIGN_QUICK, then yet another
* notification is sent upon release (RESIGN would have happened via fsetxattr,
* so a fd is needed) and the object is signed truly this time.
* There is a challenge in the above RESIGN method by signer. After sending
* the 1st notification, the inode could be forgotten before RESIGN request
* is received. In that case, the inode's context (the newly looked up inode)
* would not indicate the inode as being modified (it would be in the default
* state) and because of this, a SIGN_QUICK notification to truly sign the
* object would not be sent. So, this is how its handled.
* if (request == RESIGN) {
* if (inode->sign_info == NORMAL) {
* mark_inode_non_dirty;
* mark_inode_modified;
* }
* GOBACK (means unwind without doing versioning)
* }
*/
/*
 * Handle a reopen/resign hint sent as fsetxattr with value @val.
 *
 * Versioning is performed only for BR_OBJECT_REOPEN on a dirty inode; in
 * every other case the fop is unwound with success without touching the
 * version. For BR_OBJECT_RESIGN on an inode whose sign state is
 * BR_SIGN_NORMAL the inode is marked synced and modified first.
 */
static void
br_stub_handle_object_reopen(call_frame_t *frame, xlator_t *this, fd_t *fd,
                             uint32_t val)
{
    int32_t rc = -1;
    int32_t op_ret = -1;
    int32_t op_errno = EINVAL;
    call_stub_t *resume_stub = NULL;
    gf_boolean_t dirty = _gf_false;
    gf_boolean_t modified = _gf_false;
    gf_boolean_t goback = _gf_true;
    br_stub_inode_ctx_t *ictx = NULL;
    br_stub_local_t *local = NULL;

    rc = br_stub_need_versioning(this, fd, &dirty, &modified, &ictx);
    if (rc)
        goto unwind;

    LOCK(&fd->inode->lock);
    {
        if ((val == BR_OBJECT_REOPEN) && dirty)
            goback = _gf_false;

        if (val == BR_OBJECT_RESIGN && ictx->info_sign == BR_SIGN_NORMAL) {
            __br_stub_mark_inode_synced(ictx);
            __br_stub_set_inode_modified(ictx);
        }

        (void)__br_stub_inode_sign_state(ictx, GF_FOP_FSETXATTR, fd);
    }
    UNLOCK(&fd->inode->lock);

    if (goback) {
        /* nothing to version: plain success */
        op_ret = op_errno = 0;
        goto unwind;
    }

    rc = br_stub_versioning_prep(frame, this, fd, ictx);
    if (rc)
        goto unwind;

    local = frame->local;

    resume_stub = fop_fsetxattr_cbk_stub(frame, br_stub_fsetxattr_resume, 0, 0,
                                         NULL);
    if (!resume_stub) {
        gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_STUB_ALLOC_FAILED,
               "failed to allocate stub for fsetxattr fop (gfid: %s),"
               " unwinding",
               uuid_utoa(fd->inode->gfid));
        goto release_local;
    }

    /* on failure the stub is unwound inside br_stub_perform_incversioning */
    (void)br_stub_perform_incversioning(this, frame, resume_stub, fd, ictx);
    return;

release_local:
    br_stub_cleanup_local(local);
    br_stub_dealloc_local(local);
unwind:
    frame->local = NULL;
    STACK_UNWIND_STRICT(fsetxattr, frame, op_ret, op_errno, NULL);
}
/**
 * Callback of the fsetxattr with which the scrubber marks an object bad.
 *
 * Catching the marking here lets fops such as open, readv and writev
 * recognise a bad object from the inode context instead of issuing a
 * getxattr each time. This callback only runs for the scrubber's
 * bad-object fsetxattr.
 */
int
br_stub_fsetxattr_bad_object_cbk(call_frame_t *frame, void *cookie,
                                 xlator_t *this, int32_t op_ret,
                                 int32_t op_errno, dict_t *xdata)
{
    br_stub_local_t *saved = frame->local;

    frame->local = NULL;

    if (op_ret >= 0) {
        /*
         * The xattr is on disk at this point. If the in-memory marking
         * fails there is little the scrubber could do about it, and
         * returning an error might make it assume the on-disk setxattr
         * failed. So the failure is only logged and success is still
         * returned. Should a policy for handling this appear later, the
         * result reported to the scrubber can be changed accordingly.
         */
        if (br_stub_mark_object_bad(this, saved->u.context.inode)) {
            gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_BAD_OBJ_MARK_FAIL,
                   "failed to mark object %s as bad",
                   uuid_utoa(saved->u.context.inode->gfid));
        }

        (void)br_stub_add(this, saved->u.context.inode->gfid);
    }

    STACK_UNWIND_STRICT(fsetxattr, frame, op_ret, op_errno, xdata);

    br_stub_cleanup_local(saved);
    br_stub_dealloc_local(saved);

    return 0;
}
/*
 * Handle an fsetxattr carrying BITROT_OBJECT_BAD_KEY.
 *
 * The request is accepted only when the caller's pid is
 * GF_CLIENT_PID_SCRUB; any other caller is unwound with EINVAL. For the
 * scrubber, a local describing the fd/inode is attached to the frame and
 * the fop is wound to the first child with
 * br_stub_fsetxattr_bad_object_cbk() as the callback.
 *
 * Always returns 0; errors are reported through the unwind.
 */
static int32_t
br_stub_handle_bad_object_key(call_frame_t *frame, xlator_t *this, fd_t *fd,
                              dict_t *dict, int flags, dict_t *xdata)
{
    br_stub_local_t *local = NULL;
    int32_t op_errno = EINVAL;

    if (frame->root->pid != GF_CLIENT_PID_SCRUB) {
        gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_NON_SCRUB_BAD_OBJ_MARK,
               "bad object marking "
               "on %s is not from the scrubber",
               uuid_utoa(fd->inode->gfid));
        goto fail;
    }

    local = br_stub_alloc_local(this);
    if (local == NULL) {
        gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_NO_MEMORY,
               "failed to allocate memory for fsetxattr on %s",
               uuid_utoa(fd->inode->gfid));
        op_errno = ENOMEM;
        goto fail;
    }

    br_stub_fill_local(local, NULL, fd, fd->inode, fd->inode->gfid,
                       BR_STUB_NO_VERSIONING, 0);
    frame->local = local;

    STACK_WIND(frame, br_stub_fsetxattr_bad_object_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->fsetxattr, fd, dict, flags, xdata);
    return 0;

fail:
    STACK_UNWIND_STRICT(fsetxattr, frame, -1, op_errno, NULL);
    return 0;
}
/**
* As of now, versioning is done by the stub (though as a setxattr
* operation) as part of inode modification operations such as writev,
* truncate, ftruncate. And signing is done by BitD by a fsetxattr call.
* So any kind of setxattr coming on the versioning and the signing xattr is
* not allowed (i.e. BITROT_CURRENT_VERSION_KEY and BITROT_SIGNING_VERSION_KEY).
* In future if BitD/scrubber are allowed to change the versioning
* xattrs (though I cannot see a reason for it as of now), then the below
* function can be modified to block setxattr on version for only applications.
*
* NOTE: BitD sends sign request on GLUSTERFS_SET_OBJECT_SIGNATURE key.
* BITROT_SIGNING_VERSION_KEY is the xattr used to save the signature.
*
*/
static int32_t
br_stub_handle_internal_xattr(call_frame_t *frame, xlator_t *this, fd_t *fd,
                              char *key)
{
    /* Log the offending key and reject the request outright with EINVAL. */
    gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_SET_INTERNAL_XATTR,
           "setxattr called"
           " on the internal xattr %s for inode %s",
           key, uuid_utoa(fd->inode->gfid));

    STACK_UNWIND_STRICT(fsetxattr, frame, -1, EINVAL, NULL);

    return 0;
}
/*
 * Log the contents of @dict (as "(key:value)" pairs) at error level.
 *
 * A scratch buffer of BR_STUB_DUMP_STR_SIZE bytes is allocated for the
 * dump. If that allocation fails, *op_errno is set to ENOMEM and nothing is
 * logged; otherwise *op_errno is left untouched.
 */
static void
br_stub_dump_xattr(xlator_t *this, dict_t *dict, int *op_errno)
{
    char *pair_fmt = "(%s:%s)";
    char *buf = GF_CALLOC(1, BR_STUB_DUMP_STR_SIZE, gf_br_stub_mt_misc);

    if (buf == NULL) {
        *op_errno = ENOMEM;
        return;
    }

    dict_dump_to_str(dict, buf, BR_STUB_DUMP_STR_SIZE, pair_fmt);
    gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_SET_INTERNAL_XATTR,
           "fsetxattr called on "
           "internal xattr %s",
           buf);

    GF_FREE(buf);
}
/*
 * fsetxattr entry point.
 *
 * Requests touching internal xattrs from callers other than BitD or the
 * scrubber are logged and rejected. When versioning is enabled and the
 * inode is a regular file, the dictionary is checked (in this order) for:
 * a signature request, the internal signing/version/get-signature keys
 * (rejected), a reopen hint, and the bad-object key; each is handed to its
 * dedicated handler, which takes over the frame. Everything else is wound
 * to the first child unchanged.
 */
int
br_stub_fsetxattr(call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *dict,
                  int flags, dict_t *xdata)
{
    br_stub_private_t *priv = this->private;
    br_isignature_t *sign = NULL;
    uint32_t reopen_hint = 0;
    int32_t op_errno = EINVAL;
    size_t i = 0;
    /* Keys nobody may set through fsetxattr; checked in this order. */
    char *blocked_keys[] = {
        BITROT_SIGNING_VERSION_KEY,
        BITROT_CURRENT_VERSION_KEY,
        GLUSTERFS_GET_OBJECT_SIGNATURE,
    };

    if (frame->root->pid != GF_CLIENT_PID_BITD &&
        frame->root->pid != GF_CLIENT_PID_SCRUB &&
        br_stub_internal_xattr(dict)) {
        br_stub_dump_xattr(this, dict, &op_errno);
        STACK_UNWIND_STRICT(fsetxattr, frame, -1, op_errno, NULL);
        return 0;
    }

    if (!priv->do_versioning || !IA_ISREG(fd->inode->ia_type))
        goto wind;

    /* object signature request */
    if (dict_get_bin(dict, GLUSTERFS_SET_OBJECT_SIGNATURE, (void **)&sign) ==
        0) {
        gf_msg_debug(this->name, 0, "got SIGNATURE request on %s",
                     uuid_utoa(fd->inode->gfid));
        br_stub_handle_object_signature(frame, this, fd, dict, sign, xdata);
        return 0;
    }

    /* signing xattr, version xattr and the get-signature key */
    for (i = 0; i < sizeof(blocked_keys) / sizeof(blocked_keys[0]); i++) {
        if (dict_get(dict, blocked_keys[i])) {
            br_stub_handle_internal_xattr(frame, this, fd, blocked_keys[i]);
            return 0;
        }
    }

    /* object reopen request */
    if (dict_get_uint32(dict, BR_REOPEN_SIGN_HINT_KEY, &reopen_hint) == 0) {
        br_stub_handle_object_reopen(frame, this, fd, reopen_hint);
        return 0;
    }

    /* handle bad object */
    if (dict_get(dict, BITROT_OBJECT_BAD_KEY)) {
        br_stub_handle_bad_object_key(frame, this, fd, dict, flags, xdata);
        return 0;
    }

wind:
    STACK_WIND(frame, default_fsetxattr_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->fsetxattr, fd, dict, flags, xdata);
    return 0;
}
/**
* Currently BitD and scrubber are doing fsetxattr to either sign the object
* or to mark it as bad. Hence setxattr on any of those keys is denied directly
* without checking from where the fop is coming.
* Later, if BitD or Scrubber does setxattr of those keys, then appropriate
* check has to be added below.
*/
int
br_stub_setxattr(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *dict,
                 int flags, dict_t *xdata)
{
    int32_t op_errno = EINVAL;

    if (!br_stub_internal_xattr(dict)) {
        /* Nothing internal in the request: pass it straight down. */
        STACK_WIND_TAIL(frame, FIRST_CHILD(this),
                        FIRST_CHILD(this)->fops->setxattr, loc, dict, flags,
                        xdata);
        return 0;
    }

    /* Internal xattr: log the dictionary and refuse the request. */
    br_stub_dump_xattr(this, dict, &op_errno);
    STACK_UNWIND_STRICT(setxattr, frame, -1, op_errno, NULL);
    return 0;
}
/** }}} */
/** {{{ */
/* {f}removexattr() */
/*
 * removexattr entry point: removal of the bad-object, signing and current
 * version xattrs is refused with EINVAL (and logged); any other name is
 * passed to the first child.
 */
int32_t
br_stub_removexattr(call_frame_t *frame, xlator_t *this, loc_t *loc,
                    const char *name, dict_t *xdata)
{
    int is_internal = (strcmp(name, BITROT_OBJECT_BAD_KEY) == 0) ||
                      (strcmp(name, BITROT_SIGNING_VERSION_KEY) == 0) ||
                      (strcmp(name, BITROT_CURRENT_VERSION_KEY) == 0);

    if (!is_internal) {
        STACK_WIND_TAIL(frame, FIRST_CHILD(this),
                        FIRST_CHILD(this)->fops->removexattr, loc, name, xdata);
        return 0;
    }

    gf_msg(this->name, GF_LOG_WARNING, 0, BRS_MSG_REMOVE_INTERNAL_XATTR,
           "removexattr called"
           " on internal xattr %s for file %s",
           name, loc->path);

    STACK_UNWIND_STRICT(removexattr, frame, -1, EINVAL, NULL);
    return 0;
}
/*
 * fremovexattr entry point: removal of the bad-object, signing and current
 * version xattrs is refused with EINVAL (and logged); any other name is
 * passed to the first child.
 */
int32_t
br_stub_fremovexattr(call_frame_t *frame, xlator_t *this, fd_t *fd,
                     const char *name, dict_t *xdata)
{
    int is_internal = (strcmp(name, BITROT_OBJECT_BAD_KEY) == 0) ||
                      (strcmp(name, BITROT_SIGNING_VERSION_KEY) == 0) ||
                      (strcmp(name, BITROT_CURRENT_VERSION_KEY) == 0);

    if (!is_internal) {
        STACK_WIND_TAIL(frame, FIRST_CHILD(this),
                        FIRST_CHILD(this)->fops->fremovexattr, fd, name, xdata);
        return 0;
    }

    gf_msg(this->name, GF_LOG_WARNING, 0, BRS_MSG_REMOVE_INTERNAL_XATTR,
           "removexattr called"
           " on internal xattr %s for inode %s",
           name, uuid_utoa(fd->inode->gfid));

    STACK_UNWIND_STRICT(fremovexattr, frame, -1, EINVAL, NULL);
    return 0;
}
/** }}} */
/** {{{ */
/* {f}getxattr() */
/*
 * Callback used when getxattr/fgetxattr is issued without a name (i.e. a
 * listing): on success the versioning xattrs are stripped from the reply
 * before it is unwound.
 */
int
br_stub_listxattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                      int op_ret, int op_errno, dict_t *xattr, dict_t *xdata)
{
    if (op_ret >= 0)
        br_stub_remove_vxattrs(xattr, _gf_true);

    STACK_UNWIND_STRICT(getxattr, frame, op_ret, op_errno, xattr, xdata);
    return 0;
}
/**
* ONE SHOT CRAWLER from BitD signs the objects that it encounters while
* crawling, if the object is identified as stale by the stub. Stub follows
* the below logic to mark an object as stale or not.
* If the ongoing version and the signed_version match, then the object is not
* stale. Just return. Otherwise, if they do not match, then it means one
* of the below things.
* 1) If the inode does not need write back of the version and the sign state
* is NORMAL, then some active i/o is going on the object. So skip it.
* A notification will be sent to trigger the sign once the release is
* received on the object.
* 2) If inode does not need writeback of the version and the sign state is
* either reopen wait or quick sign, then it means:
* A) BitD restarted and it is not sure whether the object it encountered
* while crawling is in its timer wheel or not. Since there is no way to
* scan the timer wheel as of now, ONE SHOT CRAWLER just goes ahead and
* signs the object. Since the inode does not need writeback, version will
* not be incremented and directly the object will be signed.
* 3) If the inode needs writeback, then it means the inode was forgotten after
* the versioning and it has to be signed now.
*
* This is the algorithm followed:
* if (ongoing_version == signed_version); then
* object_is_not_stale;
* return;
* else; then
* if (!inode_needs_writeback && inode_sign_state != NORMAL); then
* object_is_stale;
* if (inode_needs_writeback); then
* object_is_stale;
*
* For SCRUBBER, no need to check for the sign state and inode writeback.
* If the ondisk ongoingversion and the ondisk signed version do not match,
* then treat the object as stale.
*/
char
br_stub_is_object_stale(xlator_t *this, call_frame_t *frame, inode_t *inode,
                        br_version_t *obuf, br_signature_t *sbuf)
{
    br_stub_inode_ctx_t *ctx = NULL;
    uint64_t ctx_addr = 0;
    char stale = 0;

    /* Versions agree: the object cannot be stale. */
    if (obuf->ongoingversion == sbuf->signedversion)
        return 0;

    /* For the scrubber a version mismatch alone is enough. */
    if (frame->root->pid == GF_CLIENT_PID_SCRUB)
        return 1;

    if (br_stub_get_inode_ctx(this, inode, &ctx_addr)) {
        gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_GET_INODE_CONTEXT_FAILED,
               "failed to get the "
               "inode context for %s",
               uuid_utoa(inode->gfid));
        return 0;
    }

    ctx = (br_stub_inode_ctx_t *)(long)ctx_addr;

    LOCK(&inode->lock);
    {
        /*
         * Stale when the inode is dirty, or when it is clean but its sign
         * state is anything other than NORMAL.
         */
        if (__br_stub_is_inode_dirty(ctx))
            stale = 1;
        else if (ctx->info_sign != BR_SIGN_NORMAL)
            stale = 1;
    }
    UNLOCK(&inode->lock);

    return stale;
}
/*
 * Callback for getxattr/fgetxattr.
 *
 * Failed replies and replies to requests that were not tagged with
 * BR_STUB_REQUEST_COOKIE are unwound as they are (the versioning xattrs are
 * stripped first when the BR_STUB_VER_COND_GOTO() check diverts to
 * "delkeys"). For a tagged request, i.e. an object signature query, the
 * on-disk version and signature xattrs found in @xattr are converted into a
 * br_isignature_out_t which is stored in @xattr under
 * GLUSTERFS_GET_OBJECT_SIGNATURE, and op_ret becomes the size of that
 * structure.
 *
 * Errors reported for a signature query:
 *   EINVAL  - no local on the frame, invalid xattr state, missing or
 *             too-small signing xattr size, or dict_set_bin() failure
 *   EIO     - the object is marked bad
 *   ENODATA - version/signature xattrs missing, or object unsigned
 *   ENOMEM  - allocation of the reply structure failed
 *
 * Always returns 0.
 */
int
br_stub_getxattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                     int op_ret, int op_errno, dict_t *xattr, dict_t *xdata)
{
    int32_t ret = 0;
    size_t totallen = 0;
    size_t signaturelen = 0;
    uint32_t xattrsize = 0;
    br_stub_private_t *priv = NULL;
    br_version_t *obuf = NULL;
    br_signature_t *sbuf = NULL;
    br_isignature_out_t *sign = NULL;
    br_vxattr_status_t status;
    br_stub_local_t *local = NULL;
    inode_t *inode = NULL;
    gf_boolean_t bad_object = _gf_false;
    gf_boolean_t ver_enabled = _gf_false;

    BR_STUB_VER_ENABLED_IN_CALLPATH(frame, ver_enabled);
    priv = this->private;

    if (op_ret < 0)
        goto unwind;
    BR_STUB_VER_COND_GOTO(priv, (!ver_enabled), delkeys);

    if (cookie != (void *)BR_STUB_REQUEST_COOKIE)
        goto unwind;

    local = frame->local;
    frame->local = NULL;
    if (!local) {
        op_ret = -1;
        op_errno = EINVAL;
        goto unwind;
    }
    inode = local->u.context.inode;

    /* Every early exit below reports failure with the errno set just
     * before the corresponding check. */
    op_ret = -1;
    status = br_version_xattr_state(xattr, &obuf, &sbuf, &bad_object);

    op_errno = EIO;
    if (bad_object)
        goto delkeys;

    op_errno = EINVAL;
    if (status == BR_VXATTR_STATUS_INVALID)
        goto delkeys;

    op_errno = ENODATA;
    if ((status == BR_VXATTR_STATUS_MISSING) ||
        (status == BR_VXATTR_STATUS_UNSIGNED))
        goto delkeys;

    /**
     * okay.. we have enough information to satisfy the request,
     * namely: version and signing extended attribute. what's
     * pending is the signature length -- that's figured out
     * indirectly via the size of the _whole_ xattr and the
     * on-disk signing xattr header size.
     */
    op_errno = EINVAL;
    /*
     * Read the size into a real uint32_t. Writing 32 bits through a
     * pointer to a size_t would only fill half of the variable, which
     * yields a bogus length on big-endian 64-bit hosts.
     */
    ret = dict_get_uint32(xattr, BITROT_SIGNING_XATTR_SIZE_KEY, &xattrsize);
    if (ret)
        goto delkeys;

    /* An xattr smaller than its own header would make the subtraction
     * below wrap around to a huge length. */
    if (xattrsize < sizeof(br_signature_t))
        goto delkeys;

    signaturelen = xattrsize - sizeof(br_signature_t);
    totallen = sizeof(br_isignature_out_t) + signaturelen;

    op_errno = ENOMEM;
    sign = GF_CALLOC(1, totallen, gf_br_stub_mt_signature_t);
    if (!sign)
        goto delkeys;

    sign->time[0] = obuf->timebuf[0];
    sign->time[1] = obuf->timebuf[1];

    /* Object's dirty state & current signed version */
    sign->version = sbuf->signedversion;
    sign->stale = br_stub_is_object_stale(this, frame, inode, obuf, sbuf);

    /* Object's signature */
    sign->signaturelen = signaturelen;
    sign->signaturetype = sbuf->signaturetype;
    (void)memcpy(sign->signature, sbuf->signature, signaturelen);

    op_errno = EINVAL;
    ret = dict_set_bin(xattr, GLUSTERFS_GET_OBJECT_SIGNATURE, (void *)sign,
                       totallen);
    if (ret < 0) {
        /* The dict did not take the buffer; release it here. */
        GF_FREE(sign);
        goto delkeys;
    }

    op_errno = 0;
    op_ret = totallen;

delkeys:
    br_stub_remove_vxattrs(xattr, _gf_true);

unwind:
    STACK_UNWIND_STRICT(getxattr, frame, op_ret, op_errno, xattr, xdata);
    br_stub_cleanup_local(local);
    br_stub_dealloc_local(local);
    return 0;
}
/*
 * Answer a GLUSTERFS_GET_BR_STUB_INIT_TIME query: reply with a
 * br_stub_init_t carrying priv->boot[] and priv->export, stored in a fresh
 * dictionary. The frame is unwound in every case; on success op_ret is
 * sizeof(br_stub_init_t).
 *
 * The reply structure lives on this function's stack and is attached with
 * dict_set_static_bin(); the dictionary reference taken here is dropped
 * right after the unwind.
 */
static void
br_stub_send_stub_init_time(call_frame_t *frame, xlator_t *this)
{
    br_stub_private_t *priv = this->private;
    br_stub_init_t stub = {
        {
            0,
        },
    };
    int op_ret = 0;
    int op_errno = 0;
    dict_t *reply = dict_new();

    if (reply == NULL) {
        op_ret = -1;
        op_errno = ENOMEM;
    } else {
        stub.timebuf[0] = priv->boot[0];
        stub.timebuf[1] = priv->boot[1];
        memcpy(stub.export, priv->export, strlen(priv->export) + 1);

        op_ret = dict_set_static_bin(reply, GLUSTERFS_GET_BR_STUB_INIT_TIME,
                                     (void *)&stub, sizeof(br_stub_init_t));
        if (op_ret < 0)
            op_errno = EINVAL;
        else
            op_ret = sizeof(br_stub_init_t);
    }

    STACK_UNWIND_STRICT(getxattr, frame, op_ret, op_errno, reply, NULL);

    if (reply != NULL)
        dict_unref(reply);
}
/*
 * getxattr entry point.
 *
 *  - no name (listing): wound with br_stub_listxattr_cbk()
 *  - internal xattr name: refused with EINVAL
 *  - node-uuid on a regular file for which br_stub_check_bad_object()
 *    reports a problem: unwound with the op_ret/op_errno it filled in
 *  - GLUSTERFS_GET_BR_STUB_INIT_TIME on the root gfid: answered locally
 *  - GLUSTERFS_GET_OBJECT_SIGNATURE on a regular file: wound with
 *    BR_STUB_REQUEST_COOKIE and a local holding the inode
 *  - anything else: wound with br_stub_getxattr_cbk() and no cookie
 */
int
br_stub_getxattr(call_frame_t *frame, xlator_t *this, loc_t *loc,
                 const char *name, dict_t *xdata)
{
    br_stub_private_t *priv = NULL;
    br_stub_local_t *local = NULL;
    fop_getxattr_cbk_t cbk = br_stub_getxattr_cbk;
    void *cookie = NULL;
    int32_t op_ret = -1;
    int32_t op_errno = EINVAL;
    uuid_t root_gfid = {
        0,
    };

    GF_VALIDATE_OR_GOTO("bit-rot-stub", this, unwind);
    GF_VALIDATE_OR_GOTO(this->name, loc, unwind);
    GF_VALIDATE_OR_GOTO(this->name, this->private, unwind);
    GF_VALIDATE_OR_GOTO(this->name, loc->inode, unwind);

    root_gfid[15] = 1;

    if (name == NULL) {
        cbk = br_stub_listxattr_cbk;
        goto wind;
    }

    if (br_stub_is_internal_xattr(name))
        goto unwind;

    priv = this->private;
    BR_STUB_VER_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);

    /**
     * If xattr is node-uuid and the inode is marked bad, return EIO.
     * Returning EIO would result in AFR to choose correct node-uuid
     * corresponding to the subvolume where the good copy of the
     * file resides.
     */
    if (IA_ISREG(loc->inode->ia_type) && XATTR_IS_NODE_UUID(name) &&
        br_stub_check_bad_object(this, loc->inode, &op_ret, &op_errno))
        goto unwind;

    /* The stub init time query is honoured on the root only. */
    if ((strncmp(name, GLUSTERFS_GET_BR_STUB_INIT_TIME,
                 sizeof(GLUSTERFS_GET_BR_STUB_INIT_TIME) - 1) == 0) &&
        ((gf_uuid_compare(loc->gfid, root_gfid) == 0) ||
         (gf_uuid_compare(loc->inode->gfid, root_gfid) == 0))) {
        BR_STUB_RESET_LOCAL_NULL(frame);
        br_stub_send_stub_init_time(frame, this);
        return 0;
    }

    /* Signature queries on regular files get a cookie and a local so the
     * callback can recognise the request and find the inode. */
    if (IA_ISREG(loc->inode->ia_type) &&
        (strncmp(name, GLUSTERFS_GET_OBJECT_SIGNATURE,
                 sizeof(GLUSTERFS_GET_OBJECT_SIGNATURE) - 1) == 0)) {
        cookie = (void *)BR_STUB_REQUEST_COOKIE;

        local = br_stub_alloc_local(this);
        if (local == NULL) {
            op_ret = -1;
            op_errno = ENOMEM;
            goto unwind;
        }

        br_stub_fill_local(local, NULL, NULL, loc->inode, loc->inode->gfid,
                           BR_STUB_NO_VERSIONING, 0);
        frame->local = local;
    }

wind:
    STACK_WIND_COOKIE(frame, cbk, cookie, FIRST_CHILD(this),
                      FIRST_CHILD(this)->fops->getxattr, loc, name, xdata);
    return 0;

unwind:
    BR_STUB_RESET_LOCAL_NULL(frame);
    STACK_UNWIND_STRICT(getxattr, frame, op_ret, op_errno, NULL, NULL);
    return 0;
}
/*
 * fgetxattr entry point; the fd based twin of br_stub_getxattr().
 *
 *  - no name (listing): wound with br_stub_listxattr_cbk()
 *  - internal xattr name: refused with EINVAL
 *  - node-uuid on a regular file for which br_stub_check_bad_object()
 *    reports a problem: unwound with the op_ret/op_errno it filled in
 *  - GLUSTERFS_GET_BR_STUB_INIT_TIME on the root gfid: answered locally
 *  - GLUSTERFS_GET_OBJECT_SIGNATURE on a regular file: wound with
 *    BR_STUB_REQUEST_COOKIE and a local holding the fd and inode
 *  - anything else: wound with br_stub_getxattr_cbk() and no cookie
 */
int
br_stub_fgetxattr(call_frame_t *frame, xlator_t *this, fd_t *fd,
                  const char *name, dict_t *xdata)
{
    br_stub_private_t *priv = this->private;
    br_stub_local_t *local = NULL;
    fop_fgetxattr_cbk_t cbk = br_stub_getxattr_cbk;
    void *cookie = NULL;
    int32_t op_ret = -1;
    int32_t op_errno = EINVAL;
    uuid_t root_gfid = {
        0,
    };

    root_gfid[15] = 1;

    if (name == NULL) {
        cbk = br_stub_listxattr_cbk;
        goto wind;
    }

    if (br_stub_is_internal_xattr(name))
        goto unwind;

    BR_STUB_VER_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);

    /**
     * If xattr is node-uuid and the inode is marked bad, return EIO.
     * Returning EIO would result in AFR to choose correct node-uuid
     * corresponding to the subvolume where the good copy of the
     * file resides.
     */
    if (IA_ISREG(fd->inode->ia_type) && XATTR_IS_NODE_UUID(name) &&
        br_stub_check_bad_object(this, fd->inode, &op_ret, &op_errno))
        goto unwind;

    /* The stub init time query is honoured on the root only. */
    if ((strncmp(name, GLUSTERFS_GET_BR_STUB_INIT_TIME,
                 sizeof(GLUSTERFS_GET_BR_STUB_INIT_TIME) - 1) == 0) &&
        (gf_uuid_compare(fd->inode->gfid, root_gfid) == 0)) {
        BR_STUB_RESET_LOCAL_NULL(frame);
        br_stub_send_stub_init_time(frame, this);
        return 0;
    }

    /* Signature queries on regular files get a cookie and a local so the
     * callback can recognise the request and find the inode. */
    if (IA_ISREG(fd->inode->ia_type) &&
        (strncmp(name, GLUSTERFS_GET_OBJECT_SIGNATURE,
                 sizeof(GLUSTERFS_GET_OBJECT_SIGNATURE) - 1) == 0)) {
        cookie = (void *)BR_STUB_REQUEST_COOKIE;

        local = br_stub_alloc_local(this);
        if (local == NULL) {
            op_ret = -1;
            op_errno = ENOMEM;
            goto unwind;
        }

        br_stub_fill_local(local, NULL, fd, fd->inode, fd->inode->gfid,
                           BR_STUB_NO_VERSIONING, 0);
        frame->local = local;
    }

wind:
    STACK_WIND_COOKIE(frame, cbk, cookie, FIRST_CHILD(this),
                      FIRST_CHILD(this)->fops->fgetxattr, fd, name, xdata);
    return 0;

unwind:
    BR_STUB_RESET_LOCAL_NULL(frame);
    STACK_UNWIND_STRICT(fgetxattr, frame, op_ret, op_errno, NULL, NULL);
    return 0;
}
/*
 * readv entry point. With versioning enabled, the read is refused (using
 * the op_ret/op_errno filled in by br_stub_check_bad_object()) when that
 * check reports a problem with the inode; otherwise the read is passed to
 * the first child untouched.
 */
int32_t
br_stub_readv(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
              off_t offset, uint32_t flags, dict_t *xdata)
{
    br_stub_private_t *priv = NULL;
    int32_t op_ret = -1;
    int32_t op_errno = EINVAL;

    GF_VALIDATE_OR_GOTO("bit-rot-stub", this, unwind);
    GF_VALIDATE_OR_GOTO(this->name, frame, unwind);
    GF_VALIDATE_OR_GOTO(this->name, this->private, unwind);
    GF_VALIDATE_OR_GOTO(this->name, fd, unwind);
    GF_VALIDATE_OR_GOTO(this->name, fd->inode, unwind);

    priv = this->private;

    if (priv->do_versioning &&
        br_stub_check_bad_object(this, fd->inode, &op_ret, &op_errno))
        goto unwind;

    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->readv,
                    fd, size, offset, flags, xdata);
    return 0;

unwind:
    STACK_UNWIND_STRICT(readv, frame, op_ret, op_errno, NULL, 0, NULL, NULL,
                        NULL);
    return 0;
}
/**
* The first write response on the first fd in the list of fds will set
* the flag to indicate that the inode is modified. The subsequent write
* responses coming on either the first fd or some other fd will not change
* the fd. The inode-modified flag is unset only upon release of all the
* fds.
*/
int32_t
br_stub_writev_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                   int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
                   struct iatt *postbuf, dict_t *xdata)
{
    br_stub_local_t *local = frame->local;

    frame->local = NULL;

    /* A successful write that cannot be recorded as a modification is
     * reported to the caller as EINVAL. */
    if (op_ret >= 0 && br_stub_mark_inode_modified(this, local)) {
        op_ret = -1;
        op_errno = EINVAL;
    }

    STACK_UNWIND_STRICT(writev, frame, op_ret, op_errno, prebuf, postbuf,
                        xdata);

    br_stub_cleanup_local(local);
    br_stub_dealloc_local(local);

    return 0;
}
/*
 * Resume function handed to fop_writev_stub() by br_stub_writev(): winds
 * the write to the first child with br_stub_writev_cbk() as the callback.
 * NOTE(review): presumably invoked once object versioning has completed --
 * confirm against br_stub_perform_incversioning().
 *
 * Always returns 0.
 */
int32_t
br_stub_writev_resume(call_frame_t *frame, xlator_t *this, fd_t *fd,
struct iovec *vector, int32_t count, off_t offset,
uint32_t flags, struct iobref *iobref, dict_t *xdata)
{
STACK_WIND(frame, br_stub_writev_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->writev, fd, vector, count, offset,
flags, iobref, xdata);
return 0;
}
/**
* This is probably the most crucial part about the whole versioning thing.
* There's absolutely no differentiation as such between an anonymous fd
* and a regular fd except the fd context initialization. Object versioning
* is performed when the inode is dirty. Parallel write operations are no
* special with each write performing object versioning followed by marking
* the inode as non-dirty (synced). This is followed by the actual operation
* (writev() in this case) which on a success marks the inode as modified.
* This prevents signing of objects that have not been modified.
*/
int32_t
br_stub_writev(call_frame_t *frame, xlator_t *this, fd_t *fd,
               struct iovec *vector, int32_t count, off_t offset,
               uint32_t flags, struct iobref *iobref, dict_t *xdata)
{
    br_stub_private_t *priv = NULL;
    br_stub_inode_ctx_t *ctx = NULL;
    br_stub_local_t *local = NULL;
    call_stub_t *stub = NULL;
    fop_writev_cbk_t cbk = default_writev_cbk;
    gf_boolean_t inc_version = _gf_false;
    gf_boolean_t modified = _gf_false;
    int32_t op_ret = -1;
    int32_t op_errno = EINVAL;

    GF_VALIDATE_OR_GOTO("bit-rot-stub", this, unwind);
    GF_VALIDATE_OR_GOTO(this->name, this->private, unwind);
    GF_VALIDATE_OR_GOTO(this->name, frame, unwind);
    GF_VALIDATE_OR_GOTO(this->name, fd, unwind);

    priv = this->private;
    if (!priv->do_versioning)
        goto wind;

    if (br_stub_need_versioning(this, fd, &inc_version, &modified, &ctx))
        goto unwind;

    if (br_stub_check_bad_object(this, fd->inode, &op_ret, &op_errno))
        goto unwind;

    /**
     * The inode is not dirty and has already witnessed at least one
     * successful modification. Subsequent operations therefore need no
     * special tracking.
     */
    if (modified && !inc_version)
        goto wind;

    /**
     * Either the inode needs versioning or the modification needs to be
     * tracked; ->cbk is pointed at the appropriate callback for that.
     * NOTE: ->local needs to be deallocated on failures from here on.
     */
    if (br_stub_versioning_prep(frame, this, fd, ctx))
        goto unwind;

    local = frame->local;

    if (inc_version) {
        stub = fop_writev_stub(frame, br_stub_writev_resume, fd, vector, count,
                               offset, flags, iobref, xdata);
        if (stub) {
            /* Perform Versioning */
            return br_stub_perform_incversioning(this, frame, stub, fd, ctx);
        }

        gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_STUB_ALLOC_FAILED,
               "failed to allocate stub for write fop (gfid: %s), "
               "unwinding",
               uuid_utoa(fd->inode->gfid));
        br_stub_cleanup_local(local);
        br_stub_dealloc_local(local);
        goto unwind;
    }

    /* No version bump needed: only track the modification. */
    br_stub_fill_local(local, NULL, fd, fd->inode, fd->inode->gfid,
                       BR_STUB_NO_VERSIONING, 0);
    cbk = br_stub_writev_cbk;

wind:
    STACK_WIND(frame, cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->writev,
               fd, vector, count, offset, flags, iobref, xdata);
    return 0;

unwind:
    frame->local = NULL;
    STACK_UNWIND_STRICT(writev, frame, op_ret, op_errno, NULL, NULL, NULL);
    return 0;
}
/*
 * Callback for an ftruncate that needs modification tracking: on success
 * the inode is marked as modified (a failure to do so turns the reply into
 * EINVAL), the result is unwound and the local is released.
 */
int32_t
br_stub_ftruncate_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                      int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
                      struct iatt *postbuf, dict_t *xdata)
{
    br_stub_local_t *local = frame->local;

    frame->local = NULL;

    if (op_ret >= 0 && br_stub_mark_inode_modified(this, local)) {
        op_ret = -1;
        op_errno = EINVAL;
    }

    STACK_UNWIND_STRICT(ftruncate, frame, op_ret, op_errno, prebuf, postbuf,
                        xdata);

    br_stub_cleanup_local(local);
    br_stub_dealloc_local(local);

    return 0;
}
/*
 * Resume function handed to fop_ftruncate_stub() by br_stub_ftruncate():
 * winds the ftruncate to the first child with br_stub_ftruncate_cbk() as
 * the callback.
 * NOTE(review): presumably invoked once object versioning has completed --
 * confirm against br_stub_perform_incversioning().
 *
 * Always returns 0.
 */
int32_t
br_stub_ftruncate_resume(call_frame_t *frame, xlator_t *this, fd_t *fd,
off_t offset, dict_t *xdata)
{
STACK_WIND(frame, br_stub_ftruncate_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->ftruncate, fd, offset, xdata);
return 0;
}
/* c.f. br_stub_writev() for explanation */
int32_t
br_stub_ftruncate(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset,
                  dict_t *xdata)
{
    br_stub_private_t *priv = NULL;
    br_stub_inode_ctx_t *ctx = NULL;
    br_stub_local_t *local = NULL;
    call_stub_t *stub = NULL;
    fop_ftruncate_cbk_t cbk = default_ftruncate_cbk;
    gf_boolean_t inc_version = _gf_false;
    gf_boolean_t modified = _gf_false;
    int32_t op_ret = -1;
    int32_t op_errno = EINVAL;

    GF_VALIDATE_OR_GOTO("bit-rot-stub", this, unwind);
    GF_VALIDATE_OR_GOTO(this->name, this->private, unwind);
    GF_VALIDATE_OR_GOTO(this->name, frame, unwind);
    GF_VALIDATE_OR_GOTO(this->name, fd, unwind);

    priv = this->private;
    if (!priv->do_versioning)
        goto wind;

    if (br_stub_need_versioning(this, fd, &inc_version, &modified, &ctx))
        goto unwind;

    if (br_stub_check_bad_object(this, fd->inode, &op_ret, &op_errno))
        goto unwind;

    /* Already versioned and already recorded as modified: nothing to track. */
    if (modified && !inc_version)
        goto wind;

    if (br_stub_versioning_prep(frame, this, fd, ctx))
        goto unwind;

    local = frame->local;

    if (inc_version) {
        stub = fop_ftruncate_stub(frame, br_stub_ftruncate_resume, fd, offset,
                                  xdata);
        if (stub)
            return br_stub_perform_incversioning(this, frame, stub, fd, ctx);

        gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_STUB_ALLOC_FAILED,
               "failed to allocate stub for ftruncate fop (gfid: %s),"
               " unwinding",
               uuid_utoa(fd->inode->gfid));
        br_stub_cleanup_local(local);
        br_stub_dealloc_local(local);
        goto unwind;
    }

    /* No version bump needed: only track the modification. */
    br_stub_fill_local(local, NULL, fd, fd->inode, fd->inode->gfid,
                       BR_STUB_NO_VERSIONING, 0);
    cbk = br_stub_ftruncate_cbk;

wind:
    STACK_WIND(frame, cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->ftruncate, fd, offset, xdata);
    return 0;

unwind:
    frame->local = NULL;
    STACK_UNWIND_STRICT(ftruncate, frame, op_ret, op_errno, NULL, NULL, NULL);
    return 0;
}
/*
 * Callback for a truncate that needs modification tracking: on success the
 * inode is marked as modified (a failure to do so turns the reply into
 * EINVAL), the result is unwound and the local is released.
 */
int32_t
br_stub_truncate_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                     int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
                     struct iatt *postbuf, dict_t *xdata)
{
    br_stub_local_t *local = frame->local;

    frame->local = NULL;

    if (op_ret >= 0 && br_stub_mark_inode_modified(this, local)) {
        op_ret = -1;
        op_errno = EINVAL;
    }

    STACK_UNWIND_STRICT(truncate, frame, op_ret, op_errno, prebuf, postbuf,
                        xdata);

    br_stub_cleanup_local(local);
    br_stub_dealloc_local(local);

    return 0;
}
/*
 * Resume function handed to fop_truncate_stub() by br_stub_truncate().
 *
 * Drops one reference on the fd recorded in the frame's local and winds
 * the truncate to the first child. The reply is handled by
 * br_stub_truncate_cbk() so that the fop is unwound as a truncate, in line
 * with the non-versioning path of br_stub_truncate().
 *
 * Always returns 0.
 */
int32_t
br_stub_truncate_resume(call_frame_t *frame, xlator_t *this, loc_t *loc,
                        off_t offset, dict_t *xdata)
{
    br_stub_local_t *local = frame->local;

    fd_unref(local->u.context.fd);
    STACK_WIND(frame, br_stub_truncate_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->truncate, loc, offset, xdata);
    return 0;
}
/**
* Bit-rot-stub depends heavily on the fd based operations for doing
* versioning and sending notification. It starts tracking the operation
* upon getting first fd based modify operation by doing versioning and
* sends notification when last fd using which the inode was modified is
* released.
* But for truncate there is no fd and hence it becomes difficult to do
* the versioning and send notification. It is handled by doing versioning
* on an anonymous fd. The fd will be valid till the completion of the
* truncate call. It guarantees that release on this anonymous fd will happen
* after the truncate call and notification is sent after the truncate call.
*
* c.f. br_stub_writev() for explanation
*/
int32_t
br_stub_truncate(call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset,
                 dict_t *xdata)
{
    br_stub_local_t *local = NULL;
    call_stub_t *stub = NULL;
    int32_t op_ret = -1;
    int32_t op_errno = EINVAL;
    gf_boolean_t inc_version = _gf_false;
    gf_boolean_t modified = _gf_false;
    br_stub_inode_ctx_t *ctx = NULL;
    int32_t ret = -1;
    fd_t *fd = NULL;
    fop_truncate_cbk_t cbk = default_truncate_cbk;
    br_stub_private_t *priv = NULL;

    GF_VALIDATE_OR_GOTO("bit-rot-stub", this, unwind);
    GF_VALIDATE_OR_GOTO(this->name, this->private, unwind);
    GF_VALIDATE_OR_GOTO(this->name, frame, unwind);
    GF_VALIDATE_OR_GOTO(this->name, loc, unwind);
    GF_VALIDATE_OR_GOTO(this->name, loc->inode, unwind);

    priv = this->private;
    if (!priv->do_versioning)
        goto wind;

    /* Versioning works on fds; use an anonymous one for this path based
     * fop. The reference obtained here must be dropped on every exit. */
    fd = fd_anonymous(loc->inode);
    if (!fd) {
        gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_CREATE_ANONYMOUS_FD_FAILED,
               "failed to create "
               "anonymous fd for the inode %s",
               uuid_utoa(loc->inode->gfid));
        goto unwind;
    }

    ret = br_stub_need_versioning(this, fd, &inc_version, &modified, &ctx);
    if (ret)
        goto cleanup_fd;

    /* Release the anonymous fd on this failure as well; jumping straight
     * to "unwind" would leak its reference. */
    ret = br_stub_check_bad_object(this, fd->inode, &op_ret, &op_errno);
    if (ret)
        goto cleanup_fd;

    /* Already versioned and already recorded as modified: nothing to track. */
    if (!inc_version && modified)
        goto wind;

    ret = br_stub_versioning_prep(frame, this, fd, ctx);
    if (ret)
        goto cleanup_fd;

    local = frame->local;
    if (!inc_version) {
        /* No version bump needed: only track the modification. */
        br_stub_fill_local(local, NULL, fd, fd->inode, fd->inode->gfid,
                           BR_STUB_NO_VERSIONING, 0);
        cbk = br_stub_truncate_cbk;
        goto wind;
    }

    stub = fop_truncate_stub(frame, br_stub_truncate_resume, loc, offset,
                             xdata);
    if (!stub) {
        gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_STUB_ALLOC_FAILED,
               "failed to allocate stub for truncate fop (gfid: %s), "
               "unwinding",
               uuid_utoa(fd->inode->gfid));
        goto cleanup_local;
    }

    /* The fd reference taken above is dropped by
     * br_stub_truncate_resume() on this path. */
    return br_stub_perform_incversioning(this, frame, stub, fd, ctx);

wind:
    STACK_WIND(frame, cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->truncate,
               loc, offset, xdata);
    /* fd is NULL when versioning is disabled. */
    if (fd)
        fd_unref(fd);
    return 0;

cleanup_local:
    br_stub_cleanup_local(local);
    br_stub_dealloc_local(local);
cleanup_fd:
    fd_unref(fd);
unwind:
    frame->local = NULL;
    STACK_UNWIND_STRICT(truncate, frame, op_ret, op_errno, NULL, NULL, NULL);
    return 0;
}
/** }}} */
/** {{{ */
/* open() */
/**
* It's probably worth mentioning a bit about why some of the housekeeping
* work is done in open() call path, rather than the callback path.
* Two (or more) open()'s in parallel can race and lead to a situation
* where a release() gets triggered (possibly after a series of write()
* calls) when *other* open()'s have still not reached callback path
* thereby having an active fd on an inode that is in process of getting
* signed with the current version.
*
* Maintaining fd list in the call path ensures that a release() would
* not be triggered if an open() call races ahead (followed by a close())
* thereby finding non-empty fd list.
*/
int
br_stub_open(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,
             fd_t *fd, dict_t *xdata)
{
    br_stub_private_t *priv = NULL;
    br_stub_inode_ctx_t *ctx = NULL;
    uint64_t ctx_addr = 0;
    unsigned long initial_version = BITROT_DEFAULT_CURRENT_VERSION;
    int32_t op_ret = -1;
    int32_t op_errno = EINVAL;

    GF_VALIDATE_OR_GOTO("bit-rot-stub", this, unwind);
    GF_VALIDATE_OR_GOTO(this->name, this->private, unwind);
    GF_VALIDATE_OR_GOTO(this->name, loc, unwind);
    GF_VALIDATE_OR_GOTO(this->name, fd, unwind);
    GF_VALIDATE_OR_GOTO(this->name, fd->inode, unwind);

    priv = this->private;
    if (!priv->do_versioning)
        goto wind;

    /* Set up the inode context on the fly if the inode has none yet. */
    if (br_stub_get_inode_ctx(this, fd->inode, &ctx_addr) &&
        br_stub_init_inode_versions(this, fd, fd->inode, initial_version,
                                    _gf_true, _gf_false, &ctx_addr)) {
        gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_GET_INODE_CONTEXT_FAILED,
               "failed to init the inode context for "
               "the file %s (gfid: %s)",
               loc->path, uuid_utoa(fd->inode->gfid));
        goto unwind;
    }

    ctx = (br_stub_inode_ctx_t *)(long)ctx_addr;

    if (br_stub_check_bad_object(this, fd->inode, &op_ret, &op_errno))
        goto unwind;

    /* Opens from the scrubber and read-only opens are not tracked. */
    if (frame->root->pid == GF_CLIENT_PID_SCRUB || flags == O_RDONLY)
        goto wind;

    if (br_stub_add_fd_to_inode(this, fd, ctx)) {
        gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_ADD_FD_TO_LIST_FAILED,
               "failed add fd to the list (gfid: %s)",
               uuid_utoa(fd->inode->gfid));
        goto unwind;
    }

wind:
    STACK_WIND(frame, default_open_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->open, loc, flags, fd, xdata);
    return 0;

unwind:
    STACK_UNWIND_STRICT(open, frame, op_ret, op_errno, NULL, NULL);
    return 0;
}
/** }}} */
/** {{{ */
/* creat() */
/**
* This routine registers a release callback for the given fd and adds the
* fd to the inode context fd tracking list.
*/
int32_t
br_stub_add_fd_to_inode(xlator_t *this, fd_t *fd, br_stub_inode_ctx_t *ctx)
{
    int32_t ret = -1;
    br_stub_fd_t *br_stub_fd = NULL;

    ret = br_stub_require_release_call(this, fd, &br_stub_fd);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_SET_FD_CONTEXT_FAILED,
               "failed to set the fd "
               "context for the file (gfid: %s)",
               uuid_utoa(fd->inode->gfid));
        goto out;
    }

    LOCK(&fd->inode->lock);
    {
        /*
         * list_add_tail(new, head): the fd entry is the new element and
         * the inode context's fd_list is the list head. With the arguments
         * the other way round the head itself is re-linked around the new
         * entry, which drops every fd that was already being tracked.
         */
        list_add_tail(&br_stub_fd->list, &ctx->fd_list);
    }
    UNLOCK(&fd->inode->lock);

    ret = 0;

out:
    return ret;
}
/*
 * Callback for create. On a successful create with versioning enabled:
 *  - if the inode has no context yet, one is initialised with the default
 *    version (a failure turns the reply into EINVAL);
 *  - otherwise the new fd is added to the existing context's fd list (the
 *    outcome of that step does not alter the reply).
 * The reply is then unwound.
 */
int
br_stub_create_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                   int op_ret, int op_errno, fd_t *fd, inode_t *inode,
                   struct iatt *stbuf, struct iatt *preparent,
                   struct iatt *postparent, dict_t *xdata)
{
    br_stub_private_t *priv = this->private;
    br_stub_inode_ctx_t *ctx = NULL;
    uint64_t ctx_addr = 0;
    unsigned long initial_version = BITROT_DEFAULT_CURRENT_VERSION;

    if (op_ret >= 0 && priv->do_versioning) {
        if (br_stub_get_inode_ctx(this, fd->inode, &ctx_addr) < 0) {
            if (br_stub_init_inode_versions(this, fd, inode, initial_version,
                                            _gf_true, _gf_false, &ctx_addr)) {
                op_ret = -1;
                op_errno = EINVAL;
            }
        } else {
            ctx = (br_stub_inode_ctx_t *)(long)ctx_addr;
            (void)br_stub_add_fd_to_inode(this, fd, ctx);
        }
    }

    STACK_UNWIND_STRICT(create, frame, op_ret, op_errno, fd, inode, stbuf,
                        preparent, postparent, xdata);
    return 0;
}
/*
 * create entry point: validates this, loc, loc->inode, fd and fd->inode,
 * then winds the create to the first child with br_stub_create_cbk() as
 * the callback. If any validation fails the fop is unwound with -1/EINVAL.
 *
 * Always returns 0.
 */
int
br_stub_create(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,
mode_t mode, mode_t umask, fd_t *fd, dict_t *xdata)
{
GF_VALIDATE_OR_GOTO("bit-rot-stub", this, unwind);
GF_VALIDATE_OR_GOTO(this->name, loc, unwind);
GF_VALIDATE_OR_GOTO(this->name, loc->inode, unwind);
GF_VALIDATE_OR_GOTO(this->name, fd, unwind);
GF_VALIDATE_OR_GOTO(this->name, fd->inode, unwind);
/* Inode context setup happens in the callback, not here. */
STACK_WIND(frame, br_stub_create_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->create, loc, flags, mode, umask, fd,
xdata);
return 0;
unwind:
STACK_UNWIND_STRICT(create, frame, -1, EINVAL, NULL, NULL, NULL, NULL, NULL,
NULL);
return 0;
}
/*
 * Callback for mknod. On a successful mknod with versioning enabled, the
 * inode context of the new inode is initialised with the default version
 * (no fd is involved). As with lookup, a failure to initialise it turns
 * the reply into EINVAL. The reply is then unwound.
 */
int
br_stub_mknod_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int op_ret,
                  int op_errno, inode_t *inode, struct iatt *stbuf,
                  struct iatt *preparent, struct iatt *postparent,
                  dict_t *xdata)
{
    br_stub_private_t *priv = this->private;
    unsigned long initial_version = BITROT_DEFAULT_CURRENT_VERSION;

    if (op_ret >= 0 && priv->do_versioning &&
        br_stub_init_inode_versions(this, NULL, inode, initial_version,
                                    _gf_true, _gf_false, NULL)) {
        op_ret = -1;
        op_errno = EINVAL;
    }

    STACK_UNWIND_STRICT(mknod, frame, op_ret, op_errno, inode, stbuf, preparent,
                        postparent, xdata);
    return 0;
}
/**
 * mknod() entry point: validate @this, @loc and @loc->inode, then wind the
 * fop to the first child with br_stub_mknod_cbk() as the callback.
 *
 * A missing argument unwinds the fop with -1/EINVAL. Always returns 0.
 */
int
br_stub_mknod(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
              dev_t dev, mode_t umask, dict_t *xdata)
{
    /* @this must be checked first: the later checks log with this->name */
    GF_VALIDATE_OR_GOTO("bit-rot-stub", this, invalid);
    GF_VALIDATE_OR_GOTO(this->name, loc, invalid);
    GF_VALIDATE_OR_GOTO(this->name, loc->inode, invalid);

    STACK_WIND(frame, br_stub_mknod_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->mknod, loc, mode, dev, umask, xdata);
    goto done;

invalid:
    STACK_UNWIND_STRICT(mknod, frame, -1, EINVAL, NULL, NULL, NULL, NULL, NULL);
done:
    return 0;
}
/** }}} */
/**
 * As of now, only lookup searches for the bad object xattr and marks the
 * object as bad in its inode context if the xattr is present. But there
 * is a possibility that, at the time of the lookup, the object was not
 * marked bad (i.e. the bad object xattr was not set), and it is marked
 * as bad only later. In that case the inode context does not record the
 * object as bad, so when a fop such as open, readv or writev comes on the
 * object, the fop will be sent downward instead of being failed upwards.
 * One solution for this is to do a getxattr for the below list of fops:
 * lookup, readdirp, open, readv, writev.
 * But doing a getxattr for each of the above fops might be costly.
 * So the method followed instead is to catch the bad file marking done by
 * the scrubber and set that info within the object's inode context. In this
 * way getxattr calls can be avoided and bad objects can be caught instantly.
 * Fetching the xattr is needed only in lookups after a brick restart or an
 * inode forget.
 *
 * If the dict (@xattr) is NULL, then how should that be handled? Fail the
 * lookup operation? Or let it continue with the version being initialized to
 * BITROT_DEFAULT_CURRENT_VERSION? But what if the version was different
 * on disk (and a correct signature was also there), but posix failed to
 * successfully allocate the dict? Posix does not treat callback xdata
 * creation failure as a lookup failure.
 */
/**
 * Initialise the bit-rot inode context of @inode from the versioning
 * xattrs found in @xattr.
 *
 * The versioning xattrs were requested from the lower layer. Depending on
 * what br_version_xattr_state() reports:
 *  - INVALID (signature without version): the context is not initialised
 *    and -1 is returned, so that no wrong version ends up in the context.
 *  - FULL or UNSIGNED: the on-disk ongoing version is used.
 *  - anything else: BITROT_DEFAULT_CURRENT_VERSION is used.
 *
 * The context is then set up through br_stub_init_inode_versions(), with
 * the bad-object flag reported by br_version_xattr_state(); its return
 * value is returned. @gfid is currently unused.
 */
static int32_t
br_stub_lookup_version(xlator_t *this, uuid_t gfid, inode_t *inode,
                       dict_t *xattr)
{
    br_version_t *vbuf = NULL;
    br_signature_t *sigbuf = NULL;
    gf_boolean_t is_bad = _gf_false;
    unsigned long start_version = BITROT_DEFAULT_CURRENT_VERSION;
    br_vxattr_status_t state;

    state = br_version_xattr_state(xattr, &vbuf, &sigbuf, &is_bad);

    switch (state) {
        case BR_VXATTR_STATUS_INVALID:
            /* signature present but no version: refuse to guess */
            return -1;
        case BR_VXATTR_STATUS_FULL:
        case BR_VXATTR_STATUS_UNSIGNED:
            start_version = vbuf->ongoingversion;
            break;
        default:
            /* no usable version xattr: start from the default */
            break;
    }

    return br_stub_init_inode_versions(this, NULL, inode, start_version,
                                       _gf_true, is_bad, NULL);
}
/** {{{ */
/**
 * opendir() entry point.
 *
 * Directories other than the bad-object directory (identified by
 * priv->bad_object_dir_gfid) are simply wound to the first child.
 *
 * For the bad-object directory the request is answered here: a new
 * br_stub_fd_t is allocated, priv->stub_basepath is opened with
 * sys_opendir() and the result is stored in the fd context of @fd.
 * Errors: ENOMEM when the fd context cannot be allocated, the errno of
 * sys_opendir() when the directory cannot be opened; if storing the fd
 * context fails the directory is closed again and the context freed.
 *
 * Always returns 0.
 */
int32_t
br_stub_opendir(call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd,
                dict_t *xdata)
{
    br_stub_private_t *conf = this->private;
    br_stub_fd_t *bad_fd = NULL;
    int32_t op_ret = -1;
    int32_t op_errno = EINVAL;

    if (gf_uuid_compare(fd->inode->gfid, conf->bad_object_dir_gfid) != 0) {
        /* an ordinary directory: nothing for the stub to do */
        STACK_WIND(frame, default_opendir_cbk, FIRST_CHILD(this),
                   FIRST_CHILD(this)->fops->opendir, loc, fd, xdata);
        return 0;
    }

    bad_fd = br_stub_fd_new();
    if (bad_fd == NULL) {
        op_errno = ENOMEM;
        goto unwind;
    }

    bad_fd->bad_object.dir_eof = -1;
    bad_fd->bad_object.dir = sys_opendir(conf->stub_basepath);
    if (bad_fd->bad_object.dir == NULL) {
        op_errno = errno; /* capture before anything can clobber it */
        GF_FREE(bad_fd);
        goto unwind;
    }

    op_ret = br_stub_fd_ctx_set(this, fd, bad_fd);
    if (op_ret != 0) {
        /* the context could not be attached: undo everything */
        sys_closedir(bad_fd->bad_object.dir);
        GF_FREE(bad_fd);
    }

unwind:
    STACK_UNWIND_STRICT(opendir, frame, op_ret, op_errno, fd, NULL);
    return 0;
}
/**
 * readdir() entry point.
 *
 * When versioning is disabled, or @fd does not refer to the bad-object
 * directory, the fop is wound to the first child unchanged. Otherwise a
 * call stub resuming in br_stub_readdir_wrapper() is created and handed to
 * br_stub_worker_enqueue(); failure to create the stub unwinds the fop
 * with -1/ENOMEM.
 *
 * Always returns 0.
 */
int32_t
br_stub_readdir(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
                off_t off, dict_t *xdata)
{
    br_stub_private_t *conf = this->private;
    call_stub_t *rd_stub = NULL;

    if (!conf->do_versioning ||
        gf_uuid_compare(fd->inode->gfid, conf->bad_object_dir_gfid)) {
        STACK_WIND(frame, default_readdir_cbk, FIRST_CHILD(this),
                   FIRST_CHILD(this)->fops->readdir, fd, size, off, xdata);
        return 0;
    }

    rd_stub = fop_readdir_stub(frame, br_stub_readdir_wrapper, fd, size, off,
                               xdata);
    if (rd_stub)
        br_stub_worker_enqueue(this, rd_stub);
    else
        STACK_UNWIND_STRICT(readdir, frame, -1, ENOMEM, NULL, NULL);

    return 0;
}
/**
 * Callback for readdirp().
 *
 * Nothing is done when versioning was not enabled in the call path (or is
 * disabled now) or when the child failed. Otherwise every regular-file
 * entry (other than "." and "..") is processed:
 *  - if its inode already has a bit-rot context, only the stub's virtual
 *    xattrs are stripped from the entry's dict;
 *  - otherwise the context is initialised from the entry's dict via
 *    br_stub_lookup_version() and the virtual xattrs are stripped.
 *
 * br_stub_remove_vxattrs() is called with _gf_true, i.e. all stub related
 * xattrs, the bad-object one included, are removed from the entry's dict.
 * Readdirp is essentially a bulk lookup, so ideally each entry would be
 * handled like a lookup callback (which sometimes keeps the bad-object
 * xattr); that is a possible future change.
 *
 * There is no per-entry error reporting: the first initialisation failure
 * stops the scan and fails the whole request with -1/EINVAL.
 *
 * Always unwinds the readdirp fop and returns 0.
 */
int
br_stub_readdirp_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                     int op_ret, int op_errno, gf_dirent_t *entries,
                     dict_t *dict)
{
    br_stub_private_t *conf = NULL;
    gf_dirent_t *dent = NULL;
    uint64_t ictx_addr = 0;
    int32_t rc = 0;
    gf_boolean_t in_callpath = _gf_false;
    gf_boolean_t has_ctx = _gf_false;

    BR_STUB_VER_ENABLED_IN_CALLPATH(frame, in_callpath);
    conf = this->private;
    BR_STUB_VER_COND_GOTO(conf, (!in_callpath), unwind);

    if (op_ret < 0)
        goto unwind;

    list_for_each_entry(dent, &entries->list, list)
    {
        if (!strcmp(dent->d_name, ".") || !strcmp(dent->d_name, ".."))
            continue;

        if (!IA_ISREG(dent->d_stat.ia_type))
            continue;

        rc = br_stub_get_inode_ctx(this, dent->inode, &ictx_addr);
        if (rc < 0)
            ictx_addr = 0;

        has_ctx = (ictx_addr != 0) ? _gf_true : _gf_false;

        if (!has_ctx)
            rc = br_stub_lookup_version(this, dent->inode->gfid, dent->inode,
                                        dent->dict);

        /* the virtual xattrs never travel upwards from here */
        br_stub_remove_vxattrs(dent->dict, _gf_true);

        /* no per-file granularity on failure: give up on the first error */
        if (!has_ctx && rc)
            break;
    }

    if (rc) {
        op_ret = -1;
        op_errno = EINVAL;
    }

unwind:
    STACK_UNWIND_STRICT(readdirp, frame, op_ret, op_errno, entries, dict);
    return 0;
}
/**
 * readdirp() entry point.
 *
 * When versioning is active the request dict is extended with the three
 * bit-rot keys (current version, signing version, bad-object marker) so
 * that br_stub_readdirp_cbk() can initialise inode contexts from the
 * reply. A private reference on the dict (freshly created when the caller
 * passed none) is held for the duration of this call and dropped on every
 * exit path, including the error path.
 *
 * Errors: -1/ENOMEM if no dict could be allocated, -1/EINVAL if one of the
 * keys could not be set.
 *
 * Always returns 0.
 */
int
br_stub_readdirp(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
                 off_t offset, dict_t *dict)
{
    int32_t ret = -1;
    int op_errno = 0;
    gf_boolean_t xref = _gf_false;
    br_stub_private_t *priv = NULL;

    priv = this->private;
    BR_STUB_VER_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);

    op_errno = ENOMEM;
    if (!dict) {
        dict = dict_new();
        if (!dict)
            goto unwind;
    } else {
        dict = dict_ref(dict);
    }

    /* from here on we own one reference on @dict */
    xref = _gf_true;

    op_errno = EINVAL;
    ret = dict_set_uint32(dict, BITROT_CURRENT_VERSION_KEY, 0);
    if (ret)
        goto unwind;
    ret = dict_set_uint32(dict, BITROT_SIGNING_VERSION_KEY, 0);
    if (ret)
        goto unwind;
    ret = dict_set_uint32(dict, BITROT_OBJECT_BAD_KEY, 0);
    if (ret)
        goto unwind;

wind:
    STACK_WIND(frame, br_stub_readdirp_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->readdirp, fd, size, offset, dict);
    goto unref_dict;

unwind:
    /* clear the "versioning active" marker before the frame goes away */
    if (frame->local == (void *)0x1)
        frame->local = NULL;
    STACK_UNWIND_STRICT(readdirp, frame, -1, op_errno, NULL, NULL);
    /*
     * Fall through: the reference taken above must be dropped on the
     * error path as well, otherwise the dict leaks whenever one of the
     * dict_set_uint32() calls fails.
     */

unref_dict:
    if (xref)
        dict_unref(dict);
    return 0;
}
/** }}} */
/** {{{ */
/* lookup() */
/**
 * Handle a lookup that failed with ENOENT on a bad object.
 *
 * br_stub_forget() also deals with bad objects that go away, but it is
 * better to handle it in lookup as well: a lookup failing with ENOENT on a
 * linked inode that is marked bad triggers removal of the object's entry
 * (br_stub_del()) and drops the bit-rot inode context, since the file is
 * no longer present (it might have been deleted for recovery). Whoever
 * comes first, forget() or lookup(), takes care of the cleanup.
 *
 * Nothing is done for errors other than ENOENT, for inodes that are not
 * linked, or for inodes without a bit-rot context.
 *
 * The bad-object flag is sampled exactly once, under inode->lock, and that
 * value drives both the removal of the entry and the deletion of the
 * context; the context is not read again after the lock is released.
 */
void
br_stub_handle_lookup_error(xlator_t *this, inode_t *inode, int32_t op_errno)
{
    uint64_t ctx_addr = 0;
    br_stub_inode_ctx_t *ctx = NULL;
    int is_bad = 0;

    if (op_errno != ENOENT)
        return;

    if (!inode_is_linked(inode))
        return;

    if (br_stub_get_inode_ctx(this, inode, &ctx_addr))
        return;

    ctx = (br_stub_inode_ctx_t *)(long)ctx_addr;

    LOCK(&inode->lock);
    {
        is_bad = __br_stub_is_bad_object(ctx);
        if (is_bad)
            (void)br_stub_del(this, inode->gfid);
    }
    UNLOCK(&inode->lock);

    if (!is_bad)
        return;

    /*
     * File is not present, might be deleted for recovery:
     * delete the bit-rot inode context.
     */
    ctx_addr = 0;
    inode_ctx_del(inode, this, &ctx_addr);
    if (ctx_addr) {
        ctx = (br_stub_inode_ctx_t *)(long)ctx_addr;
        GF_FREE(ctx);
    }
}
/**
 * Callback for lookup().
 *
 * @cookie tells how the lookup was wound: BR_STUB_REQUEST_COOKIE means the
 * stub asked the lower layer for the versioning xattrs, so @xattr is used
 * to initialise the inode context; any other value means only the bad
 * object marker has to be added to the reply.
 *
 * Exit paths:
 *  - delkey: the stub's virtual xattrs are stripped from @xattr before
 *    unwinding; remove_bad_file_marker decides whether the bad-object
 *    xattr is stripped as well.
 *  - unwind: taken directly for successful lookups on non regular files;
 *    @xattr is passed up untouched in that case.
 *
 * Always unwinds the lookup fop and returns 0.
 */
int
br_stub_lookup_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
int op_ret, int op_errno, inode_t *inode, struct iatt *stbuf,
dict_t *xattr, struct iatt *postparent)
{
int32_t ret = 0;
br_stub_private_t *priv = NULL;
gf_boolean_t ver_enabled = _gf_false;
gf_boolean_t remove_bad_file_marker = _gf_true;
BR_STUB_VER_ENABLED_IN_CALLPATH(frame, ver_enabled);
priv = this->private;
if (op_ret < 0) {
/* for ENOENT on a bad object this cleans up the stub's state */
(void)br_stub_handle_lookup_error(this, inode, op_errno);
/*
 * If the lookup error is not ENOENT, then it is better
 * to send the bad file marker to the higher layer (if
 * it has been set)
 */
if (op_errno != ENOENT)
remove_bad_file_marker = _gf_false;
goto delkey;
}
/* versioning disabled now, or not enabled when the fop was wound */
BR_STUB_VER_COND_GOTO(priv, (!ver_enabled), delkey);
/* only regular files are versioned; note that this skips delkey */
if (!IA_ISREG(stbuf->ia_type))
goto unwind;
/**
 * If the object is bad, then "bad inode" marker has to be sent back
 * in response, for revalidated lookups as well. Some xlators such as
 * quick-read might cache the data in revalidated lookup as fresh
 * lookup would anyway have sent "bad inode" marker.
 * In general send bad inode marker for every lookup operation on the
 * bad object.
 */
if (cookie != (void *)BR_STUB_REQUEST_COOKIE) {
ret = br_stub_mark_xdata_bad_object(this, inode, xattr);
if (ret) {
op_ret = -1;
op_errno = EIO;
/*
 * This flag ensures that in the label @delkey below,
 * bad file marker is not removed from the dictionary,
 * but other virtual xattrs (such as version, signature)
 * are removed.
 */
remove_bad_file_marker = _gf_false;
}
goto delkey;
}
/* version keys were requested: initialise the inode context from them */
ret = br_stub_lookup_version(this, stbuf->ia_gfid, inode, xattr);
if (ret < 0) {
op_ret = -1;
op_errno = EINVAL;
goto delkey;
}
/**
 * If the object is bad, send "bad inode" marker back in response
 * for xlator(s) to act accordingly (such as quick-read, etc..)
 */
ret = br_stub_mark_xdata_bad_object(this, inode, xattr);
if (ret) {
/**
 * aaha! bad object, but sorry we would not
 * satisfy the request on allocation failures.
 */
op_ret = -1;
op_errno = EIO;
goto delkey;
}
delkey:
br_stub_remove_vxattrs(xattr, remove_bad_file_marker);
unwind:
STACK_UNWIND_STRICT(lookup, frame, op_ret, op_errno, inode, stbuf, xattr,
postparent);
return 0;
}
/**
 * lookup() entry point.
 *
 *  - Versioning not active: wind unchanged.
 *  - Lookup on the bad-object directory or on one of its entries (by
 *    gfid / parent gfid): hand the request over to the worker via a call
 *    stub resuming in br_stub_lookup_wrapper().
 *  - Inode already has a bit-rot context: wind unchanged (revalidation).
 *  - Fresh lookup: request the version, signature and bad-object keys
 *    from the lower layer and tag the wind with BR_STUB_REQUEST_COOKIE so
 *    the callback knows the keys were asked for. Requesting all of them
 *    lets the callback sanity check the object by looking at which ones
 *    are missing.
 *
 * A reference on @xdata taken here is released before returning, on the
 * wind as well as on the unwind path. Always returns 0.
 */
int
br_stub_lookup(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)
{
    br_stub_private_t *conf = NULL;
    call_stub_t *lk_stub = NULL;
    void *wind_cookie = NULL;
    uint64_t ictx_addr = 0;
    gf_boolean_t took_ref = _gf_false;
    int op_errno = 0;

    GF_VALIDATE_OR_GOTO("bit-rot-stub", this, unwind);
    GF_VALIDATE_OR_GOTO(this->name, loc, unwind);
    GF_VALIDATE_OR_GOTO(this->name, loc->inode, unwind);

    conf = this->private;
    BR_STUB_VER_NOT_ACTIVE_THEN_GOTO(frame, conf, wind);

    if (!gf_uuid_compare(loc->gfid, conf->bad_object_dir_gfid) ||
        !gf_uuid_compare(loc->pargfid, conf->bad_object_dir_gfid)) {
        lk_stub = fop_lookup_stub(frame, br_stub_lookup_wrapper, loc, xdata);
        if (!lk_stub) {
            op_errno = ENOMEM;
            goto unwind;
        }
        br_stub_worker_enqueue(this, lk_stub);
        return 0;
    }

    if (br_stub_get_inode_ctx(this, loc->inode, &ictx_addr) < 0)
        ictx_addr = 0;
    if (ictx_addr != 0)
        goto wind; /* context already there: plain revalidation */

    /* fresh lookup: request version keys from the lower layer */
    if (xdata) {
        xdata = dict_ref(xdata);
    } else {
        xdata = dict_new();
        if (!xdata) {
            op_errno = ENOMEM;
            goto unwind;
        }
    }
    took_ref = _gf_true;

    op_errno = EINVAL;
    if (dict_set_uint32(xdata, BITROT_CURRENT_VERSION_KEY, 0) ||
        dict_set_uint32(xdata, BITROT_SIGNING_VERSION_KEY, 0) ||
        dict_set_uint32(xdata, BITROT_OBJECT_BAD_KEY, 0))
        goto unwind;

    wind_cookie = (void *)BR_STUB_REQUEST_COOKIE;

wind:
    STACK_WIND_COOKIE(frame, br_stub_lookup_cbk, wind_cookie, FIRST_CHILD(this),
                      FIRST_CHILD(this)->fops->lookup, loc, xdata);
    goto dealloc_dict;

unwind:
    if (frame->local == (void *)0x1)
        frame->local = NULL;
    STACK_UNWIND_STRICT(lookup, frame, -1, op_errno, NULL, NULL, NULL, NULL);

dealloc_dict:
    if (took_ref)
        dict_unref(xdata);
    return 0;
}
/** }}} */
/** {{{ */
/* stat */
/**
 * stat() entry point.
 *
 * With versioning enabled, a regular file for which
 * br_stub_check_bad_object() returns non-zero is not passed down: the fop
 * is unwound with the op_ret/op_errno filled in by that check. In every
 * other case the fop is tail-wound to the first child.
 *
 * Always returns 0.
 */
int
br_stub_stat(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)
{
    br_stub_private_t *conf = this->private;
    int32_t op_ret = -1;
    int32_t op_errno = EINVAL;

    if (conf->do_versioning && IA_ISREG(loc->inode->ia_type) &&
        br_stub_check_bad_object(this, loc->inode, &op_ret, &op_errno)) {
        STACK_UNWIND_STRICT(stat, frame, op_ret, op_errno, NULL, NULL);
        return 0;
    }

    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->stat,
                    loc, xdata);
    return 0;
}
/* fstat */
/**
 * fstat() entry point.
 *
 * With versioning enabled, a regular file for which
 * br_stub_check_bad_object() returns non-zero is not passed down: the fop
 * is unwound with the op_ret/op_errno filled in by that check. In every
 * other case the fop is tail-wound to the first child.
 *
 * Always returns 0.
 */
int
br_stub_fstat(call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata)
{
    br_stub_private_t *conf = this->private;
    int32_t op_ret = -1;
    int32_t op_errno = EINVAL;

    if (conf->do_versioning && IA_ISREG(fd->inode->ia_type) &&
        br_stub_check_bad_object(this, fd->inode, &op_ret, &op_errno)) {
        STACK_UNWIND_STRICT(fstat, frame, op_ret, op_errno, NULL, NULL);
        return 0;
    }

    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fstat,
                    fd, xdata);
    return 0;
}
/** }}} */
/** {{{ */
/* unlink() */
/**
 * Callback for unlink().
 *
 * When versioning was enabled in the call path, the unlink succeeded and
 * the removed object (taken from the frame's local) is a regular file that
 * is marked bad, its entry is removed through br_stub_del(). The result of
 * br_stub_del() is ignored: not much can be done if that fails.
 *
 * A missing local or a missing inode context is only logged as a warning;
 * the unlink itself is never failed here since the object is already gone.
 * When the context is missing the entry, if any, has to be removed
 * manually.
 *
 * The unlink fop is always unwound; afterwards the local taken from the
 * frame (if any) is cleaned up and released. Returns 0.
 */
int
br_stub_unlink_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                   int32_t op_ret, int32_t op_errno, struct iatt *preparent,
                   struct iatt *postparent, dict_t *xdata)
{
    br_stub_private_t *conf = NULL;
    br_stub_local_t *stub_local = NULL;
    br_stub_inode_ctx_t *ictx = NULL;
    inode_t *victim = NULL;
    uint64_t ictx_addr = 0;
    gf_boolean_t in_callpath = _gf_false;

    BR_STUB_VER_ENABLED_IN_CALLPATH(frame, in_callpath);
    conf = this->private;
    BR_STUB_VER_COND_GOTO(conf, (!in_callpath), unwind);

    /* take ownership of the local; it is released after the unwind */
    stub_local = frame->local;
    frame->local = NULL;

    if (op_ret < 0)
        goto unwind;

    if (stub_local == NULL) {
        gf_msg(this->name, GF_LOG_WARNING, 0, BRS_MSG_NULL_LOCAL,
               "local is NULL");
        goto unwind;
    }

    victim = stub_local->u.context.inode;
    if (!IA_ISREG(victim->ia_type))
        goto unwind;

    if (br_stub_get_inode_ctx(this, victim, &ictx_addr)) {
        /*
         * Without a context it is unknown whether the object was bad.
         * If it was, its entry stays listed among the bad objects and
         * has to be removed by hand.
         */
        gf_msg(this->name, GF_LOG_WARNING, 0, BRS_MSG_GET_INODE_CONTEXT_FAILED,
               "failed to get the context for the inode %s",
               uuid_utoa(victim->gfid));
        goto unwind;
    }

    ictx = (br_stub_inode_ctx_t *)(long)ictx_addr;

    LOCK(&victim->lock);
    {
        if (__br_stub_is_bad_object(ictx))
            (void)br_stub_del(this, victim->gfid);
    }
    UNLOCK(&victim->lock);

unwind:
    STACK_UNWIND_STRICT(unlink, frame, op_ret, op_errno, preparent, postparent,
                        xdata);
    br_stub_cleanup_local(stub_local);
    br_stub_dealloc_local(stub_local);
    return 0;
}
/**
 * unlink() entry point.
 *
 * When versioning is active a local is allocated and filled with the
 * inode and gfid of the object being removed, so that
 * br_stub_unlink_cbk() can clean up after a bad object. If the local
 * cannot be allocated the fop is unwound with -1/ENOMEM.
 *
 * Always returns 0.
 */
int
br_stub_unlink(call_frame_t *frame, xlator_t *this, loc_t *loc, int flag,
               dict_t *xdata)
{
    br_stub_private_t *conf = this->private;
    br_stub_local_t *stub_local = NULL;

    BR_STUB_VER_NOT_ACTIVE_THEN_GOTO(frame, conf, wind);

    stub_local = br_stub_alloc_local(this);
    if (stub_local == NULL) {
        gf_msg(this->name, GF_LOG_ERROR, ENOMEM, BRS_MSG_NO_MEMORY,
               "failed to allocate memory for local (path: %s, gfid: %s)",
               loc->path, uuid_utoa(loc->inode->gfid));

        /* clear the "versioning active" marker before unwinding */
        if (frame->local == (void *)0x1)
            frame->local = NULL;
        STACK_UNWIND_STRICT(unlink, frame, -1, ENOMEM, NULL, NULL, NULL);
        return 0;
    }

    br_stub_fill_local(stub_local, NULL, NULL, loc->inode, loc->inode->gfid,
                       BR_STUB_NO_VERSIONING, 0);
    frame->local = stub_local;

wind:
    STACK_WIND(frame, br_stub_unlink_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->unlink, loc, flag, xdata);
    return 0;
}
/** }}} */
/** {{{ */
/* forget() */
/**
 * forget() callback: detach this translator's context from @inode and
 * free it, if there was one. Always returns 0.
 */
int
br_stub_forget(xlator_t *this, inode_t *inode)
{
    uint64_t addr = 0;

    inode_ctx_del(inode, this, &addr);
    if (addr != 0)
        GF_FREE((br_stub_inode_ctx_t *)(long)addr);

    return 0;
}
/** }}} */
/** {{{ */
/**
 * Callback that ignores the outcome of the fop and just destroys the call
 * stack the frame belongs to. Used for the fire-and-forget IPC fop sent by
 * br_stub_send_ipc_fop(). Always returns 0.
 */
int32_t
br_stub_noop(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
             int32_t op_errno, dict_t *xdata)
{
    call_stack_t *stack = frame->root;

    STACK_DESTROY(stack);
    return 0;
}
/**
 * Send a release event for @fd's inode to changelog through an IPC fop.
 *
 * A CHANGELOG_OP_TYPE_BR_RELEASE event carrying @releaseversion,
 * @sign_info and the inode's gfid is put into a dict under the key
 * "RELEASE-EVENT" and wound on a newly created frame to the first child's
 * ipc fop (target GF_IPC_TARGET_CHANGELOG), with br_stub_noop() as the
 * callback.
 *
 * Failures (dict allocation, setting the event, frame creation) are logged
 * as warnings and the event is dropped. The dict is unreferenced before
 * returning in all cases.
 *
 * NOTE(review): the event lives on this function's stack and is stored
 * with dict_set_static_bin(); presumably the dict does not copy it, so the
 * receiver must be done with it before this function returns - confirm.
 */
static void
br_stub_send_ipc_fop(xlator_t *this, fd_t *fd, unsigned long releaseversion,
                     int sign_info)
{
    changelog_event_t event = {
        0,
    };
    call_frame_t *ipc_frame = NULL;
    dict_t *payload = NULL;
    int32_t target = GF_IPC_TARGET_CHANGELOG;

    event.ev_type = CHANGELOG_OP_TYPE_BR_RELEASE;
    event.u.releasebr.version = releaseversion;
    event.u.releasebr.sign_info = sign_info;
    gf_uuid_copy(event.u.releasebr.gfid, fd->inode->gfid);

    payload = dict_new();
    if (payload == NULL) {
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, BRS_MSG_NO_MEMORY,
               "dict allocation failed: cannot send IPC FOP "
               "to changelog");
        return;
    }

    if (dict_set_static_bin(payload, "RELEASE-EVENT", &event,
                            CHANGELOG_EV_SIZE) != 0) {
        gf_msg(this->name, GF_LOG_WARNING, 0, BRS_MSG_SET_EVENT_FAILED,
               "cannot set release event in dict");
        goto drop_payload;
    }

    ipc_frame = create_frame(this, this->ctx->pool);
    if (ipc_frame == NULL) {
        gf_msg(this->name, GF_LOG_WARNING, 0, BRS_MSG_CREATE_FRAME_FAILED,
               "create_frame() failure");
        goto drop_payload;
    }

    STACK_WIND(ipc_frame, br_stub_noop, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->ipc, target, payload);

drop_payload:
    dict_unref(payload);
}
/**
 * Advance the sign-info state machine of an inode context and report the
 * state that applies to the event being handled.
 *
 * States:
 *   BR_SIGN_NORMAL      => the default state of the inode
 *   BR_SIGN_REOPEN_WAIT => a release has been sent and a reopen is awaited
 *   BR_SIGN_QUICK       => reopen has happened and the next release should
 *                          trigger signing
 *
 * Events handled here (selected by @fop):
 *   GF_FOP_FSETXATTR:
 *       state is set to BR_SIGN_QUICK and BR_SIGN_QUICK is returned.
 *   GF_FOP_RELEASE:
 *       state is asserted not to be BR_SIGN_REOPEN_WAIT;
 *       if state == BR_SIGN_NORMAL, state becomes BR_SIGN_REOPEN_WAIT and
 *       that new state is returned;
 *       otherwise the current state is returned and the state is reset to
 *       BR_SIGN_NORMAL.
 *   any other fop:
 *       state is left alone and BR_SIGN_INVALID is returned.
 *
 * The leading double underscore suggests the caller is expected to hold
 * the inode lock (br_stub_release() calls this under inode->lock).
 * @fd is currently unused.
 */
br_sign_state_t
__br_stub_inode_sign_state(br_stub_inode_ctx_t *ctx, glusterfs_fop_t fop,
                           fd_t *fd)
{
    br_sign_state_t reported = BR_SIGN_INVALID;

    if (fop == GF_FOP_FSETXATTR) {
        ctx->info_sign = BR_SIGN_QUICK;
        reported = ctx->info_sign;
    } else if (fop == GF_FOP_RELEASE) {
        GF_ASSERT(ctx->info_sign != BR_SIGN_REOPEN_WAIT);

        if (ctx->info_sign == BR_SIGN_NORMAL) {
            ctx->info_sign = BR_SIGN_REOPEN_WAIT;
            reported = ctx->info_sign;
        } else {
            /* report the state we were in, then fall back to normal */
            reported = ctx->info_sign;
            ctx->info_sign = BR_SIGN_NORMAL;
        }
    }

    return reported;
}
/**
 * release() callback for @fd.
 *
 * Under the inode lock: the fd is unlinked from the list it is on (if it
 * has a stub fd context), and __br_stub_can_trigger_release() decides
 * whether a release event has to be sent. If so, the sign state machine is
 * advanced for GF_FOP_RELEASE and, when the inode ends up in
 * BR_SIGN_NORMAL, the inode is marked dirty and its modified flag cleared.
 *
 * Outside the lock: the release event is sent through
 * br_stub_send_ipc_fop() when one was triggered, and the fd context is
 * deleted and freed in any case.
 *
 * Always returns 0.
 */
int32_t
br_stub_release(xlator_t *this, fd_t *fd)
{
/* ret doubles as the "release must be signalled" flag after the lock */
int32_t ret = 0;
/* never modified in this function; only printed in the debug message */
int32_t flags = 0;
inode_t *inode = NULL;
unsigned long releaseversion = 0;
br_stub_inode_ctx_t *ctx = NULL;
uint64_t tmp = 0;
br_stub_fd_t *br_stub_fd = NULL;
int32_t signinfo = 0;
inode = fd->inode;
LOCK(&inode->lock);
{
ctx = __br_stub_get_ongoing_version_ctx(this, inode, NULL);
/* nothing to signal; ret is still 0 here */
if (ctx == NULL)
goto unblock;
br_stub_fd = br_stub_fd_ctx_get(this, fd);
if (br_stub_fd) {
list_del_init(&br_stub_fd->list);
}
ret = __br_stub_can_trigger_release(inode, ctx, &releaseversion);
if (!ret)
goto unblock;
signinfo = __br_stub_inode_sign_state(ctx, GF_FOP_RELEASE, fd);
/* the sign info is handed on in network byte order */
signinfo = htonl(signinfo);
/* inode back to initial state: mark dirty */
if (ctx->info_sign == BR_SIGN_NORMAL) {
__br_stub_mark_inode_dirty(ctx);
__br_stub_unset_inode_modified(ctx);
}
}
unblock:
UNLOCK(&inode->lock);
/* the IPC fop is sent only after the inode lock has been dropped */
if (ret) {
gf_msg_debug(this->name, 0,
"releaseversion: %lu | flags: %d "
"| signinfo: %d",
(unsigned long)ntohl(releaseversion), flags,
ntohl(signinfo));
br_stub_send_ipc_fop(this, fd, releaseversion, signinfo);
}
/* tmp stays 0 if nothing was stored, making the GF_FREE below a no-op
 * (assuming fd_ctx_del() leaves it untouched on failure - TODO confirm) */
ret = fd_ctx_del(fd, this, &tmp);
br_stub_fd = (br_stub_fd_t *)(long)tmp;
GF_FREE(br_stub_fd);
return 0;
}
/**
 * releasedir() handler: drop the stub's fd context from a directory @fd.
 *
 * If @fd carries a context, the directory stream stored in it (if any) is
 * closed with sys_closedir() - a failure is only logged - and the context
 * is freed. Nothing is done when @fd has no context. Always returns 0.
 */
int32_t
br_stub_releasedir(xlator_t *this, fd_t *fd)
{
    br_stub_fd_t *bad_fd = NULL;
    uint64_t addr = 0;

    if (fd_ctx_del(fd, this, &addr) < 0)
        return 0;

    bad_fd = (br_stub_fd_t *)(long)addr;

    if (bad_fd->bad_object.dir && sys_closedir(bad_fd->bad_object.dir)) {
        gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_BAD_OBJ_DIR_CLOSE_FAIL,
               "closedir error: %s", strerror(errno));
    }

    GF_FREE(bad_fd);
    return 0;
}
/** }}} */
/** {{{ */
/* ictxmerge */
/**
 * ictxmerge() callback: @inode turned out to be a duplicate of
 * @linked_inode, so the fd tracked in @inode's bit-rot context is moved
 * over to the context of @linked_inode.
 *
 * Nothing is done when either inode has no bit-rot context, or when the
 * fd list of @inode's context is empty. The list is expected to hold
 * exactly one entry, the one for @fd; both expectations are asserted.
 *
 * The move is done under @linked_inode's lock.
 */
void
br_stub_ictxmerge(xlator_t *this, fd_t *fd, inode_t *inode,
                  inode_t *linked_inode)
{
    int32_t ret = 0;
    uint64_t ctxaddr = 0;
    uint64_t lctxaddr = 0;
    br_stub_inode_ctx_t *ctx = NULL;
    br_stub_inode_ctx_t *lctx = NULL;
    br_stub_fd_t *br_stub_fd = NULL;

    ret = br_stub_get_inode_ctx(this, inode, &ctxaddr);
    if (ret < 0)
        goto done;
    ctx = (br_stub_inode_ctx_t *)(uintptr_t)ctxaddr;

    LOCK(&linked_inode->lock);
    {
        ret = __br_stub_get_inode_ctx(this, linked_inode, &lctxaddr);
        if (ret < 0)
            goto unblock;
        lctx = (br_stub_inode_ctx_t *)(uintptr_t)lctxaddr;

        GF_ASSERT(list_is_singular(&ctx->fd_list));

        /*
         * list_first_entry() never yields NULL: on an empty list it
         * returns a pointer computed from the list head itself, and
         * moving that "entry" would splice the head of ctx->fd_list
         * into lctx->fd_list. Check for emptiness explicitly instead.
         */
        if (list_empty(&ctx->fd_list))
            goto unblock;

        br_stub_fd = list_first_entry(&ctx->fd_list, br_stub_fd_t, list);
        GF_ASSERT(br_stub_fd->fd == fd);
        list_move_tail(&br_stub_fd->list, &lctx->fd_list);
    }
unblock:
    UNLOCK(&linked_inode->lock);

done:
    return;
}
/** }}} */
struct xlator_fops fops = {
.lookup = br_stub_lookup,
.stat = br_stub_stat,
.fstat = br_stub_fstat,
.open = br_stub_open,
.create = br_stub_create,
.readdirp = br_stub_readdirp,
.getxattr = br_stub_getxattr,
.fgetxattr = br_stub_fgetxattr,
.fsetxattr = br_stub_fsetxattr,
.writev = br_stub_writev,
.truncate = br_stub_truncate,
.ftruncate = br_stub_ftruncate,
.mknod = br_stub_mknod,
.readv = br_stub_readv,
.removexattr = br_stub_removexattr,
.fremovexattr = br_stub_fremovexattr,
.setxattr = br_stub_setxattr,
.opendir = br_stub_opendir,
.readdir = br_stub_readdir,
.unlink = br_stub_unlink,
};
struct xlator_cbks cbks = {
.forget = br_stub_forget,
.release = br_stub_release,
.ictxmerge = br_stub_ictxmerge,
};
/* Volume options understood by the bit-rot stub translator. */
struct volume_options options[] = {
    {
        /* switches the stub on or off */
        .key = {"bitrot"},
        .type = GF_OPTION_TYPE_BOOL,
        .default_value = "off",
        .description = "enable/disable bitrot stub",
        .op_version = {GD_OP_VERSION_3_7_0},
        .flags = OPT_FLAG_SETTABLE | OPT_FLAG_FORCE,
        .tags = {"bitrot"},
    },
    {
        /* brick path used for versioning */
        .key = {"export"},
        .type = GF_OPTION_TYPE_PATH,
        .default_value = "{{ brick.path }}",
        .description = "brick path for versioning",
        .op_version = {GD_OP_VERSION_3_7_0},
        .tags = {"bitrot"},
    },
    /* terminator */
    {.key = {NULL}},
};
xlator_api_t xlator_api = {
.init = init,
.fini = fini,
.notify = notify,
.reconfigure = reconfigure,
.mem_acct_init = mem_acct_init,
.op_version = {1}, /* Present from the initial version */
.fops = &fops,
.cbks = &cbks,
.options = options,
.identifier = "bitrot-stub",
.category = GF_MAINTAINED,
};