/*
Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
This file is part of GlusterFS.
This file is licensed to you under your choice of the GNU Lesser
General Public License, version 3 or any later version (LGPLv3 or
later), or the GNU General Public License, version 2 (GPLv2), in all
cases as published by the Free Software Foundation.
*/
/* TODO: link(oldpath, newpath) fails if newpath already exists. DHT should
* delete the newpath if it gets EEXISTS from link() call.
*/
#include <glusterfs/glusterfs.h>
#include <glusterfs/xlator.h>
#include "dht-common.h"
#include "dht-lock.h"
#include <glusterfs/defaults.h>
/* Forward declaration: defined later in this file; releases the locks taken
 * for a file rename and then unwinds through dht_rename_unlock_cbk(). */
int
dht_rename_unlock(call_frame_t *frame, xlator_t *this);
/* Forward declaration of a lock callback; its definition is not in the part
 * of the file reviewed here. */
int32_t
dht_rename_lock_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, dict_t *xdata);
/*
 * Final step shared by the rename paths in this file: unwind the rename fop
 * to the caller.
 *
 * The four parent iatts kept in the frame's local are passed through
 * dht_set_fixed_dir_stat(), and for a regular file the phase-1 flags are
 * stripped from the file's iatt, before the unwind.
 *
 * The op_ret/op_errno/xdata handed to this callback are not consulted; the
 * values stored in the local (local->op_ret, local->op_errno, local->xattr)
 * are what get unwound.
 *
 * Always returns 0.
 */
int
dht_rename_unlock_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                      int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
    dht_local_t *local = frame->local;

    /* Old parent (pre/post) first, then the new parent (pre/post). */
    dht_set_fixed_dir_stat(&local->preoldparent);
    dht_set_fixed_dir_stat(&local->postoldparent);
    dht_set_fixed_dir_stat(&local->preparent);
    dht_set_fixed_dir_stat(&local->postparent);

    if (IA_ISREG(local->stbuf.ia_type)) {
        DHT_STRIP_PHASE1_FLAGS(&local->stbuf);
    }

    DHT_STACK_UNWIND(rename, frame, local->op_ret, local->op_errno,
                     &local->stbuf, &local->preoldparent, &local->postoldparent,
                     &local->preparent, &local->postparent, local->xattr);

    return 0;
}
/*
 * Release the namespace lock recorded in local->lock[0] (the slot
 * dht_order_rename_lock() associates with the source of the rename).
 */
static void
dht_rename_dir_unlock_src(call_frame_t *frame, xlator_t *this)
{
    dht_local_t *local = frame->local;

    dht_unlock_namespace(frame, &local->lock[0]);
}
/*
 * Release the locks recorded in local->lock[1] (the destination slot):
 * first the entrylk, then the parent-layout inodelk.
 *
 * dht_rename_unlock_cbk is handed to the inodelk unlock as its callback.
 * When winding that unlock fails (negative return), a warning is logged and
 * dht_rename_unlock_cbk is invoked directly so the rename is still unwound.
 */
static void
dht_rename_dir_unlock_dst(call_frame_t *frame, xlator_t *this)
{
    dht_local_t *local = frame->local;
    char src_gfid[GF_UUID_BUF_SIZE] = {0};
    char dst_gfid[GF_UUID_BUF_SIZE] = {0};
    int rc = -1;

    /* Unlock entrylk */
    dht_unlock_entrylk_wrapper(frame, &local->lock[1].ns.directory_ns);

    /* Unlock inodelk */
    rc = dht_unlock_inodelk(frame, local->lock[1].ns.parent_layout.locks,
                            local->lock[1].ns.parent_layout.lk_count,
                            dht_rename_unlock_cbk);
    if (rc >= 0) {
        /* The unlock was wound; its callback takes care of the unwind. */
        return;
    }

    uuid_utoa_r(local->loc.inode->gfid, src_gfid);
    if (local->loc2.inode) {
        uuid_utoa_r(local->loc2.inode->gfid, dst_gfid);
    }

    if (IA_ISREG(local->stbuf.ia_type)) {
        gf_msg(this->name, GF_LOG_WARNING, 0, DHT_MSG_UNLOCKING_FAILED,
               "winding unlock inodelk failed "
               "rename (%s:%s:%s %s:%s:%s), "
               "stale locks left on bricks",
               local->loc.path, src_gfid, local->src_cached->name,
               local->loc2.path, dst_gfid,
               local->dst_cached ? local->dst_cached->name : NULL);
    } else {
        gf_msg(this->name, GF_LOG_WARNING, 0, DHT_MSG_UNLOCKING_FAILED,
               "winding unlock inodelk failed "
               "rename (%s:%s %s:%s), "
               "stale locks left on bricks",
               local->loc.path, src_gfid, local->loc2.path, dst_gfid);
    }

    /* Nobody else will call the callback now, so do it ourselves. */
    dht_rename_unlock_cbk(frame, NULL, this, 0, 0, NULL);
}
/*
 * Release both lock slots used by a directory rename.
 *
 * The source slot goes first; the destination-side helper is the one that
 * passes dht_rename_unlock_cbk to the inodelk unlock (or calls it directly
 * on failure), so it must run last.
 *
 * Always returns 0.
 */
static int
dht_rename_dir_unlock(call_frame_t *frame, xlator_t *this)
{
    dht_rename_dir_unlock_src(frame, this);

    dht_rename_dir_unlock_dst(frame, this);

    return 0;
}
/*
 * Callback for the directory rename wound to the individual subvolumes, and
 * for the compensating (reverse) renames wound from this function itself.
 *
 * cookie is expected to be the subvolume (xlator_t *) the rename was wound
 * to. Its per-subvolume result is stored in local->ret_cache[]; the extra
 * last slot, ret_cache[conf->subvolume_cnt], holds the number of subvolumes
 * that failed.
 *
 * When the last outstanding reply arrives:
 *   - if some subvolume failed, the rename is reversed (loc2 -> loc) on every
 *     subvolume whose slot is 0, and this function is the callback again;
 *   - otherwise (or once the reverse renames have all returned) the parent
 *     iatts are wiped and the locks are released, which unwinds the fop.
 *
 * Always returns 0.
 */
int
dht_rename_dir_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                   int32_t op_ret, int32_t op_errno, struct iatt *stbuf,
                   struct iatt *preoldparent, struct iatt *postoldparent,
                   struct iatt *prenewparent, struct iatt *postnewparent,
                   dict_t *xdata)
{
    dht_conf_t *conf = NULL;
    dht_local_t *local = NULL;
    int this_call_cnt = 0;
    xlator_t *prev = NULL;
    int i = 0;
    int undo_cnt = 0;
    char gfid[GF_UUID_BUF_SIZE] = {0};
    int subvol_cnt = -1;

    conf = this->private;
    local = frame->local;
    prev = cookie;

    subvol_cnt = dht_subvol_cnt(this, prev);
    /* NOTE(review): assumes dht_subvol_cnt() reports an unknown subvolume
     * with a negative value - confirm. Never index ret_cache with it. */
    if (subvol_cnt >= 0)
        local->ret_cache[subvol_cnt] = op_ret;

    if (op_ret == -1) {
        gf_uuid_unparse(local->loc.inode->gfid, gfid);
        gf_msg(this->name, GF_LOG_INFO, op_errno, DHT_MSG_RENAME_FAILED,
               "Rename %s -> %s on %s failed, (gfid = %s)", local->loc.path,
               local->loc2.path, prev->name, gfid);
        local->op_ret = op_ret;
        local->op_errno = op_errno;
        goto unwind;
    }

    /* TODO: construct proper stbuf for dir */
    /*
     * FIXME: is this the correct way to build stbuf and
     * parent bufs?
     */
    dht_iatt_merge(this, &local->stbuf, stbuf);
    dht_iatt_merge(this, &local->preoldparent, preoldparent);
    dht_iatt_merge(this, &local->postoldparent, postoldparent);
    dht_iatt_merge(this, &local->preparent, prenewparent);
    dht_iatt_merge(this, &local->postparent, postnewparent);

unwind:
    this_call_cnt = dht_frame_return(frame);
    if (is_last_call(this_call_cnt)) {
        /* We get here with local->call_cnt == 0. Which means
         * we are the only one executing this code, there is
         * no contention. Therefore it's safe to manipulate or
         * deref local->call_cnt directly (without locking).
         */
        if (local->ret_cache[conf->subvolume_cnt] == 0) {
            /* count errant subvols in last field of ret_cache */
            for (i = 0; i < conf->subvolume_cnt; i++) {
                if (local->ret_cache[i] != 0)
                    ++local->ret_cache[conf->subvolume_cnt];
            }

            if (local->ret_cache[conf->subvolume_cnt]) {
                /* undoing the damage:
                 * for all subvolumes, where rename
                 * succeeded, we perform the reverse operation
                 */
                for (i = 0; i < conf->subvolume_cnt; i++) {
                    if (local->ret_cache[i] == 0)
                        ++local->call_cnt;
                }

                /* Private copy: once the last wind is issued the frame may
                 * complete, so local must not be read after that. */
                undo_cnt = local->call_cnt;

                for (i = 0; i < conf->subvolume_cnt; i++) {
                    if (local->ret_cache[i])
                        continue;

                    /* Pass the subvolume as cookie: this callback reads
                     * cookie as an xlator_t * (dht_subvol_cnt(), prev->name),
                     * exactly like the forward renames wound to it. */
                    STACK_WIND_COOKIE(frame, dht_rename_dir_cbk,
                                      conf->subvolumes[i], conf->subvolumes[i],
                                      conf->subvolumes[i]->fops->rename,
                                      &local->loc2, &local->loc, NULL);

                    if (!--undo_cnt)
                        break;
                }

                return 0;
            }
        }

        WIPE(&local->preoldparent);
        WIPE(&local->postoldparent);
        WIPE(&local->preparent);
        WIPE(&local->postparent);

        dht_rename_dir_unlock(frame, this);
    }

    return 0;
}
/*
 * Callback for the directory rename wound to the destination's hashed
 * subvolume (see dht_rename_dir_do()).
 *
 * On failure the error is recorded in the local and the locks are released.
 * On success the returned iatts are merged into the local and the same
 * rename is wound to every subvolume other than local->dst_hashed, with
 * dht_rename_dir_cbk as the callback. If there is no other subvolume, the
 * locks are released right away.
 *
 * Always returns 0.
 */
int
dht_rename_hashed_dir_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                          int32_t op_ret, int32_t op_errno, struct iatt *stbuf,
                          struct iatt *preoldparent, struct iatt *postoldparent,
                          struct iatt *prenewparent, struct iatt *postnewparent,
                          dict_t *xdata)
{
    dht_conf_t *conf = this->private;
    dht_local_t *local = frame->local;
    xlator_t *subvol = cookie;
    char gfid_str[GF_UUID_BUF_SIZE] = {0};
    int remaining = 0;
    int idx = 0;

    if (op_ret == -1) {
        gf_uuid_unparse(local->loc.inode->gfid, gfid_str);
        gf_msg(this->name, GF_LOG_INFO, op_errno, DHT_MSG_RENAME_FAILED,
               "rename %s -> %s on %s failed, (gfid = %s) ", local->loc.path,
               local->loc2.path, subvol->name, gfid_str);
        local->op_ret = op_ret;
        local->op_errno = op_errno;
        goto finish;
    }

    /* TODO: construct proper stbuf for dir */
    /*
     * FIXME: is this the correct way to build stbuf and
     * parent bufs?
     */
    dht_iatt_merge(this, &local->stbuf, stbuf);
    dht_iatt_merge(this, &local->preoldparent, preoldparent);
    dht_iatt_merge(this, &local->postoldparent, postoldparent);
    dht_iatt_merge(this, &local->preparent, prenewparent);
    dht_iatt_merge(this, &local->postparent, postnewparent);

    /* One reply is expected from each of the remaining subvolumes. */
    local->call_cnt = conf->subvolume_cnt - 1;
    remaining = local->call_cnt;
    if (remaining == 0) {
        goto finish;
    }

    for (idx = 0; idx < conf->subvolume_cnt; idx++) {
        if (conf->subvolumes[idx] == local->dst_hashed) {
            continue;
        }

        STACK_WIND_COOKIE(frame, dht_rename_dir_cbk, conf->subvolumes[idx],
                          conf->subvolumes[idx],
                          conf->subvolumes[idx]->fops->rename, &local->loc,
                          &local->loc2, NULL);

        /* Stop as soon as the last wind is out; the private counter keeps
         * us from reading local after that. */
        if (--remaining == 0) {
            break;
        }
    }

    return 0;

finish:
    WIPE(&local->preoldparent);
    WIPE(&local->postoldparent);
    WIPE(&local->preparent);
    WIPE(&local->postparent);

    dht_rename_dir_unlock(frame, this);
    return 0;
}
/*
 * Start the actual directory rename once the preliminary checks are done.
 *
 * If an earlier step recorded a failure (local->op_ret == -1) the locks are
 * released, which unwinds the fop with that error. Otherwise the rename is
 * wound to local->dst_hashed first, with dht_rename_hashed_dir_cbk as the
 * callback.
 *
 * Always returns 0.
 */
int
dht_rename_dir_do(call_frame_t *frame, xlator_t *this)
{
    dht_local_t *local = frame->local;

    if (local->op_ret == -1) {
        dht_rename_dir_unlock(frame, this);
        return 0;
    }

    local->op_ret = 0;

    STACK_WIND_COOKIE(frame, dht_rename_hashed_dir_cbk, local->dst_hashed,
                      local->dst_hashed, local->dst_hashed->fops->rename,
                      &local->loc, &local->loc2, NULL);

    return 0;
}
/*
 * Callback for the readdir wound from dht_rename_opendir_cbk().
 *
 * A reply with more than two entries marks the rename as failed with
 * ENOTEMPTY. When the last outstanding reply has arrived, control passes to
 * dht_rename_dir_do().
 *
 * Always returns 0.
 */
int
dht_rename_readdir_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                       int op_ret, int op_errno, gf_dirent_t *entries,
                       dict_t *xdata)
{
    dht_local_t *local = frame->local;
    xlator_t *subvol = cookie;
    int pending = -1;

    if (op_ret > 2) {
        gf_msg_trace(this->name, 0, "readdir on %s for %s returned %d entries",
                     subvol->name, local->loc.path, op_ret);
        local->op_ret = -1;
        local->op_errno = ENOTEMPTY;
    }

    pending = dht_frame_return(frame);
    if (!is_last_call(pending)) {
        return 0;
    }

    dht_rename_dir_do(frame, this);
    return 0;
}
/*
 * Callback for the opendir wound to each subvolume from
 * dht_rename_dir_lock2_cbk().
 *
 * On success the fd is bound and a readdir is wound to the same subvolume
 * using local->fd. On failure the error is only logged; this reply is then
 * accounted for, and if it was the last one dht_rename_dir_do() is called.
 *
 * Always returns 0.
 */
int
dht_rename_opendir_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                       int op_ret, int op_errno, fd_t *fd, dict_t *xdata)
{
    dht_local_t *local = frame->local;
    xlator_t *subvol = cookie;
    char gfid_str[GF_UUID_BUF_SIZE] = {0};
    int pending = -1;

    if (op_ret != -1) {
        fd_bind(fd);
        STACK_WIND_COOKIE(frame, dht_rename_readdir_cbk, subvol, subvol,
                          subvol->fops->readdir, local->fd, 4096, 0, NULL);
        return 0;
    }

    gf_uuid_unparse(local->loc.inode->gfid, gfid_str);
    gf_msg(this->name, GF_LOG_INFO, op_errno, DHT_MSG_OPENDIR_FAILED,
           "opendir on %s for %s failed,(gfid = %s) ", subvol->name,
           local->loc.path, gfid_str);

    /* No readdir will follow for this subvolume, so count the reply here. */
    pending = dht_frame_return(frame);
    if (is_last_call(pending)) {
        dht_rename_dir_do(frame, this);
    }

    return 0;
}
/*
 * Callback for the second namespace lock of a directory rename (requested
 * from dht_rename_dir_lock1_cbk()).
 *
 * On success an fd is created on the source inode. If the destination does
 * not exist (no local->dst_cached) the rename starts immediately through
 * dht_rename_dir_do(); otherwise an opendir on the destination is wound to
 * every subvolume, with dht_rename_opendir_cbk as the callback.
 *
 * Every failure is recorded in local->op_ret/local->op_errno before the
 * locks are released, because the unwind that follows
 * (dht_rename_unlock_cbk) reports the values stored in the local.
 *
 * Always returns 0.
 */
int
dht_rename_dir_lock2_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                         int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
    dht_local_t *local = NULL;
    char src_gfid[GF_UUID_BUF_SIZE] = {0};
    char dst_gfid[GF_UUID_BUF_SIZE] = {0};
    dht_conf_t *conf = NULL;
    int i = 0;

    local = frame->local;
    conf = this->private;

    if (op_ret < 0) {
        uuid_utoa_r(local->loc.inode->gfid, src_gfid);

        if (local->loc2.inode)
            uuid_utoa_r(local->loc2.inode->gfid, dst_gfid);

        gf_msg(this->name, GF_LOG_WARNING, op_errno, DHT_MSG_INODE_LK_ERROR,
               "acquiring entrylk after inodelk failed "
               "rename (%s:%s:%s %s:%s:%s)",
               local->loc.path, src_gfid, local->src_cached->name,
               local->loc2.path, dst_gfid,
               local->dst_cached ? local->dst_cached->name : NULL);

        local->op_ret = -1;
        local->op_errno = op_errno;
        goto err;
    }

    local->fd = fd_create(local->loc.inode, frame->root->pid);
    if (!local->fd) {
        /* Record the failure where the unwind will look for it. */
        local->op_ret = -1;
        local->op_errno = ENOMEM;
        goto err;
    }

    local->op_ret = 0;

    if (!local->dst_cached) {
        dht_rename_dir_do(frame, this);
        return 0;
    }

    for (i = 0; i < conf->subvolume_cnt; i++) {
        STACK_WIND_COOKIE(frame, dht_rename_opendir_cbk, conf->subvolumes[i],
                          conf->subvolumes[i],
                          conf->subvolumes[i]->fops->opendir, &local->loc2,
                          local->fd, NULL);
    }

    return 0;

err:
    /* No harm in calling an extra unlock */
    dht_rename_dir_unlock(frame, this);
    return 0;
}
/*
 * Callback for the first namespace lock of a directory rename (requested
 * from dht_rename_dir()).
 *
 * On success the other side is locked: if the first lock used
 * local->lock[0] (source), the destination (loc2 on dst_hashed) is protected
 * next using local->lock[1], and vice versa. dht_rename_dir_lock2_cbk
 * continues from there.
 *
 * Every failure is recorded in local->op_ret/local->op_errno before the
 * locks are released, because the unwind that follows
 * (dht_rename_unlock_cbk) reports the values stored in the local.
 *
 * Always returns 0.
 */
int
dht_rename_dir_lock1_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                         int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
    dht_local_t *local = NULL;
    char src_gfid[GF_UUID_BUF_SIZE] = {0};
    char dst_gfid[GF_UUID_BUF_SIZE] = {0};
    int ret = 0;
    loc_t *loc = NULL;
    xlator_t *subvol = NULL;

    local = frame->local;

    if (op_ret < 0) {
        uuid_utoa_r(local->loc.inode->gfid, src_gfid);

        if (local->loc2.inode)
            uuid_utoa_r(local->loc2.inode->gfid, dst_gfid);

        gf_msg(this->name, GF_LOG_WARNING, op_errno, DHT_MSG_INODE_LK_ERROR,
               "acquiring entrylk after inodelk failed "
               "rename (%s:%s:%s %s:%s:%s)",
               local->loc.path, src_gfid, local->src_cached->name,
               local->loc2.path, dst_gfid,
               local->dst_cached ? local->dst_cached->name : NULL);

        local->op_ret = -1;
        local->op_errno = op_errno;
        goto err;
    }

    /* Switch to whichever lock slot was not taken by the first request. */
    if (local->current == &local->lock[0]) {
        loc = &local->loc2;
        subvol = local->dst_hashed;
        local->current = &local->lock[1];
    } else {
        loc = &local->loc;
        subvol = local->src_hashed;
        local->current = &local->lock[0];
    }

    ret = dht_protect_namespace(frame, loc, subvol, &local->current->ns,
                                dht_rename_dir_lock2_cbk);
    if (ret < 0) {
        /* Record the failure where the unwind will look for it. */
        local->op_ret = -1;
        local->op_errno = EINVAL;
        goto err;
    }

    return 0;

err:
    /* No harm in calling an extra unlock */
    dht_rename_dir_unlock(frame, this);
    return 0;
}
/*
* If the hashed subvolumes of the source and the destination are different,
* lock in dictionary order of the hashed subvol->name. This is important
* when the parent directory is the same for both src and dst, to
* prevent inodelk deadlocks when racing with a fix-layout op on the parent.
*
* If the hashed subvols are the same, use the parent gfid followed by the
* basename to determine the order of taking locks, to prevent entrylk
* deadlocks when the parent dirs are the same.
*
*/
/*
 * Decide which side of the rename must be locked first.
 *
 * The hashed subvolume names of source and destination are compared; when
 * they are equal, the strings "<parent gfid><basename>" of both sides are
 * compared instead. If the source sorts first (or both are equal),
 * local->current is set to &local->lock[0] and *loc / *subvol are set to the
 * source loc and its hashed subvolume; otherwise lock[1] and the destination
 * are selected.
 *
 * Returns 0 on success, -1 if a scratch buffer could not be obtained.
 */
static int
dht_order_rename_lock(call_frame_t *frame, loc_t **loc, xlator_t **subvol)
{
    int ret = 0;
    int op_ret = 0;
    dht_local_t *local = NULL;
    char *src = NULL;
    char *dst = NULL;

    local = frame->local;

    if (local->src_hashed->name == local->dst_hashed->name) {
        ret = 0;
    } else {
        ret = strcmp(local->src_hashed->name, local->dst_hashed->name);
    }

    if (ret == 0) {
        /* hashed subvols are the same for src and dst */
        /* Entrylks need to be ordered */

        src = alloca(GF_UUID_BNAME_BUF_SIZE + strlen(local->loc.name) + 1);
        if (!src) {
            gf_msg(frame->this->name, GF_LOG_ERROR, ENOMEM, 0,
                   "Insufficient memory for src");
            op_ret = -1;
            goto out;
        }

        /* alloca() memory is uninitialised. Start from an empty string so
         * the strcat() below is well defined even when neither the parent
         * gfid nor the parent inode is available. */
        src[0] = '\0';

        if (!gf_uuid_is_null(local->loc.pargfid))
            uuid_utoa_r(local->loc.pargfid, src);
        else if (local->loc.parent)
            uuid_utoa_r(local->loc.parent->gfid, src);

        strcat(src, local->loc.name);

        dst = alloca(GF_UUID_BNAME_BUF_SIZE + strlen(local->loc2.name) + 1);
        if (!dst) {
            gf_msg(frame->this->name, GF_LOG_ERROR, ENOMEM, 0,
                   "Insufficient memory for dst");
            op_ret = -1;
            goto out;
        }

        /* Same reasoning as for src. */
        dst[0] = '\0';

        if (!gf_uuid_is_null(local->loc2.pargfid))
            uuid_utoa_r(local->loc2.pargfid, dst);
        else if (local->loc2.parent)
            uuid_utoa_r(local->loc2.parent->gfid, dst);

        strcat(dst, local->loc2.name);

        ret = strcmp(src, dst);
    }

    if (ret <= 0) {
        /*inodelk in dictionary order of hashed subvol names*/
        /*entrylk in dictionary order of gfid/basename */
        local->current = &local->lock[0];
        *loc = &local->loc;
        *subvol = local->src_hashed;
    } else {
        local->current = &local->lock[1];
        *loc = &local->loc2;
        *subvol = local->dst_hashed;
    }

    op_ret = 0;

out:
    return op_ret;
}
/*
 * Entry point for renaming a directory.
 *
 * Allocates the per-subvolume result cache (one int per subvolume plus one
 * extra slot), refuses the rename if any subvolume is marked down, decides
 * the lock order and requests the first namespace lock; the rest of the
 * operation continues in dht_rename_dir_lock1_cbk().
 *
 * Any failure here unwinds the rename with -1 and ENOMEM, ENOTCONN or EINVAL.
 *
 * Always returns 0.
 */
int
dht_rename_dir(call_frame_t *frame, xlator_t *this)
{
    dht_conf_t *conf = frame->this->private;
    dht_local_t *local = frame->local;
    loc_t *first_loc = NULL;
    xlator_t *first_subvol = NULL;
    int idx = 0;
    int rc = 0;
    int op_errno = -1;

    local->ret_cache = GF_CALLOC(conf->subvolume_cnt + 1, sizeof(int),
                                 gf_dht_ret_cache_t);
    if (!local->ret_cache) {
        op_errno = ENOMEM;
        goto err;
    }

    local->call_cnt = conf->subvolume_cnt;

    /* Every subvolume has to be reachable for a directory rename. */
    for (idx = 0; idx < conf->subvolume_cnt; idx++) {
        if (conf->subvolume_status[idx]) {
            continue;
        }

        gf_msg(this->name, GF_LOG_INFO, 0, DHT_MSG_RENAME_FAILED,
               "Rename dir failed: subvolume down (%s)",
               conf->subvolumes[idx]->name);
        op_errno = ENOTCONN;
        goto err;
    }

    /* Locks on src and dst needs to ordered which otherwise might cause
     * deadlocks when rename (src, dst) and rename (dst, src) is done from
     * two different clients
     */
    rc = dht_order_rename_lock(frame, &first_loc, &first_subvol);
    if (rc != 0) {
        op_errno = ENOMEM;
        goto err;
    }

    /* Rename must take locks on src to avoid lookup selfheal from
     * recreating src on those subvols where the rename was successful.
     * The locks can't be issued parallel as two different clients might
     * attempt same rename command and be in dead lock.
     */
    rc = dht_protect_namespace(frame, first_loc, first_subvol,
                               &local->current->ns, dht_rename_dir_lock1_cbk);
    if (rc < 0) {
        op_errno = EINVAL;
        goto err;
    }

    return 0;

err:
    DHT_STACK_UNWIND(rename, frame, -1, op_errno, NULL, NULL, NULL, NULL, NULL,
                     NULL);
    return 0;
}
/*
 * Attach rename-tracking information to xattr under
 * DHT_CHANGELOG_RENAME_OP_KEY.
 *
 * The value is a dht_changelog_rename_info_t followed by the old and the new
 * basename, each stored with its terminating NUL; the recorded name lengths
 * include that NUL. Both parent gfids are copied in as well.
 *
 * Returns -1 if any argument is NULL or the allocation fails; otherwise the
 * result of dict_set_bin(). The buffer is freed here when dict_set_bin()
 * reports a failure.
 */
static int
dht_rename_track_for_changelog(xlator_t *this, dict_t *xattr, loc_t *oldloc,
                               loc_t *newloc)
{
    dht_changelog_rename_info_t *info = NULL;
    int old_len = 0;
    int new_len = 0;
    int total = 0;
    int ret = -1;

    if (!this || !xattr || !oldloc || !newloc) {
        return ret;
    }

    /* Lengths include the terminating NUL. */
    old_len = strlen(oldloc->name) + 1;
    new_len = strlen(newloc->name) + 1;
    total = sizeof(dht_changelog_rename_info_t) + old_len + new_len;

    info = GF_CALLOC(total, sizeof(char), gf_common_mt_char);
    if (info == NULL) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_DICT_SET_FAILED,
               "Failed to calloc memory");
        return ret;
    }

    gf_uuid_copy(info->old_pargfid, oldloc->pargfid);
    gf_uuid_copy(info->new_pargfid, newloc->pargfid);

    info->oldname_len = old_len;
    info->newname_len = new_len;

    /* Old name first, new name right behind it. */
    memcpy(info->buffer, oldloc->name, old_len);
    memcpy(info->buffer + old_len, newloc->name, new_len);

    ret = dict_set_bin(xattr, DHT_CHANGELOG_RENAME_OP_KEY, info, total);
    if (ret != 0) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_DICT_SET_FAILED,
               "Failed to set dictionary value: key = %s,"
               " path = %s",
               DHT_CHANGELOG_RENAME_OP_KEY, oldloc->name);
        GF_FREE(info);
    }

    return ret;
}
/*
 * DHT_MARKER_DONT_ACCOUNT(xattr)
 *
 * Sets GLUSTERFS_MARKER_DONT_ACCOUNT_KEY to "yes" in xattr, creating the
 * dictionary first when xattr is NULL (xattr must therefore be an lvalue).
 * If the dictionary cannot be created the macro does nothing; a failure to
 * set the key is only logged.
 *
 * The expansion refers to `this` and `local` by name, so both must be in
 * scope at the point of use.
 */
#define DHT_MARKER_DONT_ACCOUNT(xattr) \
do { \
int tmp = -1; \
if (!xattr) { \
xattr = dict_new(); \
if (!xattr) \
break; \
} \
tmp = dict_set_str(xattr, GLUSTERFS_MARKER_DONT_ACCOUNT_KEY, "yes"); \
if (tmp) { \
gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_DICT_SET_FAILED, \
"Failed to set dictionary value: key = %s," \
" path = %s", \
GLUSTERFS_MARKER_DONT_ACCOUNT_KEY, local->loc.path); \
} \
} while (0)
/*
 * DHT_CHANGELOG_TRACK_AS_RENAME(xattr, oldloc, newloc)
 *
 * Adds the rename-tracking value for oldloc -> newloc to xattr through
 * dht_rename_track_for_changelog(), creating the dictionary first when xattr
 * is NULL (xattr must therefore be an lvalue). Failures are only logged.
 *
 * The expansion refers to `this` by name, so it must be in scope at the
 * point of use.
 */
#define DHT_CHANGELOG_TRACK_AS_RENAME(xattr, oldloc, newloc) \
do { \
int tmp = -1; \
if (!xattr) { \
xattr = dict_new(); \
if (!xattr) { \
gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_DICT_SET_FAILED, \
"Failed to create dictionary to " \
"track rename"); \
break; \
} \
} \
\
tmp = dht_rename_track_for_changelog(this, xattr, oldloc, newloc); \
\
if (tmp) { \
gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_DICT_SET_FAILED, \
"Failed to set dictionary value: key = %s," \
" path = %s", \
DHT_CHANGELOG_RENAME_OP_KEY, (oldloc)->path); \
} \
} while (0)
/*
 * Release every lock held for a file rename and unwind the fop.
 *
 * The backward-compatible inodelks recorded in the local are released
 * first (a failure to wind that unlock is only logged), then both namespace
 * lock slots, and finally dht_rename_unlock_cbk() is called directly with
 * the result stored in the local.
 *
 * Always returns 0.
 */
int
dht_rename_unlock(call_frame_t *frame, xlator_t *this)
{
    dht_local_t *local = frame->local;
    dht_ilock_wrap_t bc_locks = {
        0,
    };
    char src_gfid[GF_UUID_BUF_SIZE] = {0};
    char dst_gfid[GF_UUID_BUF_SIZE] = {0};
    int rc = -1;

    bc_locks.locks = local->rename_inodelk_backward_compatible;
    bc_locks.lk_count = local->rename_inodelk_bc_count;

    rc = dht_unlock_inodelk_wrapper(frame, &bc_locks);
    if (rc < 0) {
        uuid_utoa_r(local->loc.inode->gfid, src_gfid);
        if (local->loc2.inode) {
            uuid_utoa_r(local->loc2.inode->gfid, dst_gfid);
        }

        if (IA_ISREG(local->stbuf.ia_type)) {
            gf_msg(this->name, GF_LOG_WARNING, 0, DHT_MSG_UNLOCKING_FAILED,
                   "winding unlock inodelk failed "
                   "rename (%s:%s:%s %s:%s:%s), "
                   "stale locks left on bricks",
                   local->loc.path, src_gfid, local->src_cached->name,
                   local->loc2.path, dst_gfid,
                   local->dst_cached ? local->dst_cached->name : NULL);
        } else {
            gf_msg(this->name, GF_LOG_WARNING, 0, DHT_MSG_UNLOCKING_FAILED,
                   "winding unlock inodelk failed "
                   "rename (%s:%s %s:%s), "
                   "stale locks left on bricks",
                   local->loc.path, src_gfid, local->loc2.path, dst_gfid);
        }
    }

    dht_unlock_namespace(frame, &local->lock[0]);
    dht_unlock_namespace(frame, &local->lock[1]);

    dht_rename_unlock_cbk(frame, NULL, this, local->op_ret, local->op_errno,
                          NULL);

    return 0;
}
/*
 * Finish a file rename: if a linkto file was created on the way
 * (local->linked), clear the flag and trigger dht_linkfile_attr_heal();
 * then release the locks, which unwinds the fop.
 *
 * Always returns 0.
 */
int
dht_rename_done(call_frame_t *frame, xlator_t *this)
{
    dht_local_t *local = frame->local;

    if (local->linked == _gf_true) {
        /* Reset first so the heal is requested only once. */
        local->linked = _gf_false;
        dht_linkfile_attr_heal(frame, this);
    }

    dht_rename_unlock(frame, this);

    return 0;
}
/*
 * Callback for the unlinks wound by dht_rename_unlink() and
 * dht_rename_cleanup().
 *
 * A failed unlink is only logged. The parent iatts in the local are wiped on
 * every reply, and once the last outstanding reply has arrived
 * dht_rename_done() takes over.
 *
 * Always returns 0.
 */
int
dht_rename_unlink_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                      int32_t op_ret, int32_t op_errno, struct iatt *preparent,
                      struct iatt *postparent, dict_t *xdata)
{
    dht_local_t *local = NULL;
    xlator_t *prev = NULL;
    int this_call_cnt = 0;

    local = frame->local;
    prev = cookie;

    /* Check the local before anything uses it. FRAME_SU_UNDO is handed the
     * local's type and presumably reads the saved credentials from
     * frame->local (TODO confirm), so it must not run without one. */
    if (!local) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_INVALID_VALUE,
               "!local, should not happen");
        goto out;
    }

    FRAME_SU_UNDO(frame, dht_local_t);

    this_call_cnt = dht_frame_return(frame);

    if (op_ret == -1) {
        gf_msg(this->name, GF_LOG_WARNING, op_errno, DHT_MSG_UNLINK_FAILED,
               "%s: Rename: unlink on %s failed ", local->loc.path, prev->name);
    }

    WIPE(&local->preoldparent);
    WIPE(&local->postoldparent);
    WIPE(&local->preparent);
    WIPE(&local->postparent);

    if (is_last_call(this_call_cnt)) {
        dht_rename_done(frame, this);
    }

out:
    return 0;
}
/*
 * Failure path of a file rename: remove what earlier steps created.
 *
 * Up to two unlinks are wound, both with dht_rename_unlink_cbk as callback:
 *   - the linkfile for the old path on dst_hashed, when local->linked is set
 *     and dst_hashed differs from both src_hashed and src_cached;
 *   - the new name (loc2) on src_cached, when local->added_link is set and
 *     src_cached differs from dst_hashed.
 * When neither applies, or src_cached == dst_cached, the parent iatts are
 * wiped and the locks are released directly through dht_rename_unlock().
 *
 * Always returns 0.
 */
int
dht_rename_cleanup(call_frame_t *frame)
{
dht_local_t *local = NULL;
xlator_t *this = NULL;
xlator_t *src_hashed = NULL;
xlator_t *src_cached = NULL;
xlator_t *dst_hashed = NULL;
xlator_t *dst_cached = NULL;
int call_cnt = 0;
dict_t *xattr = NULL;
char gfid[GF_UUID_BUF_SIZE] = {0};
local = frame->local;
this = frame->this;
src_hashed = local->src_hashed;
src_cached = local->src_cached;
dst_hashed = local->dst_hashed;
dst_cached = local->dst_cached;
if (src_cached == dst_cached)
goto nolinks;
/* Count the unlinks first: local->call_cnt has to hold the final number
 * before the first one is wound. The conditions are repeated below. */
if (local->linked && (dst_hashed != src_hashed) &&
(dst_hashed != src_cached)) {
call_cnt++;
}
if (local->added_link && (src_cached != dst_hashed)) {
call_cnt++;
}
local->call_cnt = call_cnt;
if (!call_cnt)
goto nolinks;
DHT_MARK_FOP_INTERNAL(xattr);
gf_uuid_unparse(local->loc.inode->gfid, gfid);
if (local->linked && (dst_hashed != src_hashed) &&
(dst_hashed != src_cached)) {
dict_t *xattr_new = NULL;
gf_msg_trace(this->name, 0,
"unlinking linkfile %s @ %s => %s, (gfid = %s)",
local->loc.path, dst_hashed->name, src_cached->name, gfid);
/* Each unlink gets its own copy of the request dictionary. */
xattr_new = dict_copy_with_ref(xattr, NULL);
DHT_MARKER_DONT_ACCOUNT(xattr_new);
FRAME_SU_DO(frame, dht_local_t);
STACK_WIND_COOKIE(frame, dht_rename_unlink_cbk, dst_hashed, dst_hashed,
dst_hashed->fops->unlink, &local->loc, 0, xattr_new);
dict_unref(xattr_new);
xattr_new = NULL;
}
if (local->added_link && (src_cached != dst_hashed)) {
dict_t *xattr_new = NULL;
gf_msg_trace(this->name, 0, "unlinking link %s => %s (%s), (gfid = %s)",
local->loc.path, local->loc2.path, src_cached->name, gfid);
xattr_new = dict_copy_with_ref(xattr, NULL);
/* The don't-account marker is added only when old and new parent are
 * the same directory. */
if (gf_uuid_compare(local->loc.pargfid, local->loc2.pargfid) == 0) {
DHT_MARKER_DONT_ACCOUNT(xattr_new);
}
/* *
* The link to file is created using root permission.
* Hence deletion should happen using root. Otherwise
* it will fail.
*/
FRAME_SU_DO(frame, dht_local_t);
STACK_WIND_COOKIE(frame, dht_rename_unlink_cbk, src_cached, src_cached,
src_cached->fops->unlink, &local->loc2, 0, xattr_new);
dict_unref(xattr_new);
xattr_new = NULL;
}
if (xattr)
dict_unref(xattr);
return 0;
nolinks:
/* Nothing to remove: clear the parent iatts and unwind via the unlock. */
WIPE(&local->preoldparent);
WIPE(&local->postoldparent);
WIPE(&local->preparent);
WIPE(&local->postparent);
dht_rename_unlock(frame, this);
return 0;
}
/*
 * After a successful rename, remove the files that are now obsolete.
 *
 * Up to three unlinks are wound, all with dht_rename_unlink_cbk as callback:
 *   - the old source data file on src_cached,
 *   - the old source linkfile on src_hashed,
 *   - the old destination data file on dst_cached.
 * The subvolume on which the rename itself was performed (rename_subvol) is
 * skipped for the old path. If nothing has to be removed, the parent iatts
 * are wiped and dht_rename_done() is called directly.
 *
 * Always returns 0.
 */
int
dht_rename_unlink(call_frame_t *frame, xlator_t *this)
{
dht_local_t *local = NULL;
xlator_t *src_hashed = NULL;
xlator_t *src_cached = NULL;
xlator_t *dst_hashed = NULL;
xlator_t *dst_cached = NULL;
xlator_t *rename_subvol = NULL;
dict_t *xattr = NULL;
local = frame->local;
src_hashed = local->src_hashed;
src_cached = local->src_cached;
dst_hashed = local->dst_hashed;
dst_cached = local->dst_cached;
local->call_cnt = 0;
/* NOTE: rename_subvol is the same subvolume from which dht_rename_cbk
* is called. since rename has already happened on rename_subvol,
* unlink shouldn't be sent for oldpath (either linkfile or cached-file)
* on rename_subvol. */
if (src_cached == dst_cached)
rename_subvol = src_cached;
else
rename_subvol = dst_hashed;
/* TODO: delete files in background */
/* Count the unlinks first: local->call_cnt has to hold the final number
 * before the first one is wound. The conditions are repeated below. */
if (src_cached != dst_hashed && src_cached != dst_cached)
local->call_cnt++;
if (src_hashed != rename_subvol && src_hashed != src_cached)
local->call_cnt++;
if (dst_cached && dst_cached != dst_hashed && dst_cached != src_cached)
local->call_cnt++;
if (local->call_cnt == 0)
goto unwind;
DHT_MARK_FOP_INTERNAL(xattr);
if (src_cached != dst_hashed && src_cached != dst_cached) {
dict_t *xattr_new = NULL;
xattr_new = dict_copy_with_ref(xattr, NULL);
gf_msg_trace(this->name, 0, "deleting old src datafile %s @ %s",
local->loc.path, src_cached->name);
/* The don't-account marker is added only when old and new parent are
 * the same directory. */
if (gf_uuid_compare(local->loc.pargfid, local->loc2.pargfid) == 0) {
DHT_MARKER_DONT_ACCOUNT(xattr_new);
}
/* This unlink carries the rename-tracking value for the changelog. */
DHT_CHANGELOG_TRACK_AS_RENAME(xattr_new, &local->loc, &local->loc2);
STACK_WIND_COOKIE(frame, dht_rename_unlink_cbk, src_cached, src_cached,
src_cached->fops->unlink, &local->loc, 0, xattr_new);
dict_unref(xattr_new);
xattr_new = NULL;
}
if (src_hashed != rename_subvol && src_hashed != src_cached) {
dict_t *xattr_new = NULL;
xattr_new = dict_copy_with_ref(xattr, NULL);
gf_msg_trace(this->name, 0, "deleting old src linkfile %s @ %s",
local->loc.path, src_hashed->name);
DHT_MARKER_DONT_ACCOUNT(xattr_new);
STACK_WIND_COOKIE(frame, dht_rename_unlink_cbk, src_hashed, src_hashed,
src_hashed->fops->unlink, &local->loc, 0, xattr_new);
dict_unref(xattr_new);
xattr_new = NULL;
}
if (dst_cached && (dst_cached != dst_hashed) &&
(dst_cached != src_cached)) {
gf_msg_trace(this->name, 0, "deleting old dst datafile %s @ %s",
local->loc2.path, dst_cached->name);
/* Unlike the two unlinks above, this one is sent with the shared
 * xattr dictionary rather than a private copy. */
STACK_WIND_COOKIE(frame, dht_rename_unlink_cbk, dst_cached, dst_cached,
dst_cached->fops->unlink, &local->loc2, 0, xattr);
}
if (xattr)
dict_unref(xattr);
return 0;
unwind:
/* Nothing to remove: clear the parent iatts and finish the rename. */
WIPE(&local->preoldparent);
WIPE(&local->postoldparent);
WIPE(&local->preparent);
WIPE(&local->postparent);
dht_rename_done(frame, this);
return 0;
}
/*
 * Callback for the linkto creation started from dht_rename_cbk() on a
 * separate frame.
 *
 * A failure is only logged. If this frame's local has the linked flag set,
 * it is cleared and dht_linkfile_attr_heal() is triggered. The rename then
 * continues on the main frame with dht_rename_unlink(), and the helper frame
 * is destroyed.
 *
 * Always returns 0.
 */
int
dht_rename_links_create_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                            int32_t op_ret, int32_t op_errno, inode_t *inode,
                            struct iatt *stbuf, struct iatt *preparent,
                            struct iatt *postparent, dict_t *xdata)
{
    dht_local_t *local = frame->local;
    call_frame_t *main_frame = local->main_frame;
    xlator_t *subvol = cookie;

    /* TODO: Handle this case in lookup-optimize */
    if (op_ret == -1) {
        gf_msg(this->name, GF_LOG_WARNING, op_errno, DHT_MSG_CREATE_LINK_FAILED,
               "link/file %s on %s failed", local->loc.path, subvol->name);
    }

    if (local->linked == _gf_true) {
        local->linked = _gf_false;
        dht_linkfile_attr_heal(frame, this);
    }

    /* Resume the rename on the original frame before dropping this one. */
    dht_rename_unlink(main_frame, this);

    DHT_STACK_DESTROY(frame);

    return 0;
}
/*
 * Callback for the rename wound by dht_do_rename(); cookie is the subvolume
 * the rename was sent to.
 *
 * - Failure on src_cached is critical: the error is recorded in the local
 *   and dht_rename_cleanup() undoes the links created so far.
 * - Failure on any other subvolume (the linkto file) is only logged.
 * - Otherwise the reply's xdata and, for src_cached only, its iatts are
 *   kept in the local. If source and destination share the cached subvolume
 *   but the destination hashes elsewhere, a linkto file for the new name is
 *   created on a copied frame (continuing in dht_rename_links_create_cbk);
 *   in every other case dht_rename_unlink() removes the obsolete files.
 *
 * Always returns 0.
 */
int
dht_rename_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, struct iatt *stbuf,
struct iatt *preoldparent, struct iatt *postoldparent,
struct iatt *prenewparent, struct iatt *postnewparent,
dict_t *xdata)
{
dht_local_t *local = NULL;
xlator_t *prev = NULL;
xlator_t *src_cached = NULL;
xlator_t *dst_hashed = NULL;
xlator_t *dst_cached = NULL;
call_frame_t *link_frame = NULL;
dht_local_t *link_local = NULL;
local = frame->local;
prev = cookie;
src_cached = local->src_cached;
dst_hashed = local->dst_hashed;
dst_cached = local->dst_cached;
/* Mirrors the conditional FRAME_SU_DO in dht_do_rename(). */
if (local->linked == _gf_true)
FRAME_SU_UNDO(frame, dht_local_t);
/* It is a critical failure iff we fail to rename the cached file
* if the rename of the linkto failed, it is not a critical failure,
* and we do not want to lose the created hard link for the new
* name as that could have been read by other clients.
*
* NOTE: If another client is attempting the same oldname -> newname
* rename, and finds both file names as existing, and are hard links
* to each other, then FUSE would send in an unlink for oldname. In
* this time duration if we treat the linkto as a critical error and
* unlink the newname we created, we would have effectively lost the
* file to rename operations.
*
* Repercussions of treating this as a non-critical error is that
* we could leave behind a stale linkto file and/or not create the new
* linkto file, the second case would be rectified by a subsequent
* lookup, the first case by a rebalance, like for all stale linkto
* files */
if (op_ret == -1) {
/* Critical failure: unable to rename the cached file */
if (prev == src_cached) {
gf_msg(this->name, GF_LOG_WARNING, op_errno, DHT_MSG_RENAME_FAILED,
"%s: Rename on %s failed, (gfid = %s) ", local->loc.path,
prev->name,
local->loc.inode ? uuid_utoa(local->loc.inode->gfid) : "");
local->op_ret = op_ret;
local->op_errno = op_errno;
goto cleanup;
} else {
/* Non-critical failure, unable to rename the linkto
* file
*/
gf_msg(this->name, GF_LOG_INFO, op_errno, DHT_MSG_RENAME_FAILED,
"%s: Rename (linkto file) on %s failed, "
"(gfid = %s) ",
local->loc.path, prev->name,
local->loc.inode ? uuid_utoa(local->loc.inode->gfid) : "");
}
}
/* Keep the reply's xdata: take a reference if the local has none yet,
 * otherwise copy its entries into the dictionary already held. */
if (xdata) {
if (!local->xattr)
local->xattr = dict_ref(xdata);
else
local->xattr = dict_copy_with_ref(xdata, local->xattr);
}
/* Merge attrs only from src_cached. In case src_cached !=
* dst_hashed, this ignores linkfile attrs. */
if (prev == src_cached) {
dht_iatt_merge(this, &local->stbuf, stbuf);
dht_iatt_merge(this, &local->preoldparent, preoldparent);
dht_iatt_merge(this, &local->postoldparent, postoldparent);
dht_iatt_merge(this, &local->preparent, prenewparent);
dht_iatt_merge(this, &local->postparent, postnewparent);
}
/* Create the linkto file for the dst file */
if ((src_cached == dst_cached) && (dst_hashed != dst_cached)) {
link_frame = copy_frame(frame);
if (!link_frame) {
goto unlink;
}
/* fop value sent as maxvalue because it is not used
* anywhere in this case */
link_local = dht_local_init(link_frame, &local->loc2, NULL,
GF_FOP_MAXVALUE);
if (!link_local) {
goto unlink;
}
/* The helper local is initialised from loc2 but must carry the inode
 * of the file being renamed. */
if (link_local->loc.inode)
inode_unref(link_local->loc.inode);
link_local->loc.inode = inode_ref(local->loc.inode);
link_local->main_frame = frame;
link_local->stbuf = local->stbuf;
gf_uuid_copy(link_local->gfid, local->loc.inode->gfid);
dht_linkfile_create(link_frame, dht_rename_links_create_cbk, this,
src_cached, dst_hashed, &link_local->loc);
return 0;
}
unlink:
/* Reached directly, or after a failed attempt to set up the helper frame
 * (in which case a frame that was already copied is destroyed here). */
if (link_frame) {
DHT_STACK_DESTROY(link_frame);
}
dht_rename_unlink(frame, this);
return 0;
cleanup:
dht_rename_cleanup(frame);
return 0;
}
/*
 * Wind the actual rename of a file.
 *
 * The rename goes to src_cached when source and destination share the cached
 * subvolume, otherwise to dst_hashed. The request dictionary gets the
 * don't-account marker when the rename is done on dst_hashed and that is not
 * where the data file lives, and the changelog rename-tracking value when the
 * rename is done on src_cached. If a linkto file was created earlier
 * (local->linked), FRAME_SU_DO is applied before the wind.
 *
 * dht_rename_cbk handles the reply. Always returns 0.
 */
int
dht_do_rename(call_frame_t *frame)
{
    dht_local_t *local = frame->local;
    xlator_t *this = frame->this;
    xlator_t *dst_hashed = local->dst_hashed;
    xlator_t *dst_cached = local->dst_cached;
    xlator_t *src_cached = local->src_cached;
    xlator_t *rename_subvol = NULL;

    rename_subvol = (src_cached == dst_cached) ? src_cached : dst_hashed;

    if ((src_cached != dst_hashed) && (rename_subvol == dst_hashed)) {
        DHT_MARKER_DONT_ACCOUNT(local->xattr_req);
    }

    if (rename_subvol == src_cached) {
        DHT_CHANGELOG_TRACK_AS_RENAME(local->xattr_req, &local->loc,
                                      &local->loc2);
    }

    gf_msg_trace(this->name, 0, "renaming %s => %s (%s)", local->loc.path,
                 local->loc2.path, rename_subvol->name);

    if (local->linked == _gf_true) {
        FRAME_SU_DO(frame, dht_local_t);
    }

    STACK_WIND_COOKIE(frame, dht_rename_cbk, rename_subvol, rename_subvol,
                      rename_subvol->fops->rename, &local->loc, &local->loc2,
                      local->xattr_req);

    return 0;
}
/*
 * Callback for the hard link (old name -> new name) created on src_cached
 * before the rename.
 *
 * On success the returned iatt is merged into the local. On failure the
 * error is recorded and added_link is cleared, since there is no link to
 * remove. If the local carries a failure at this point the cleanup path is
 * taken; otherwise the rename itself is wound.
 *
 * Always returns 0.
 */
int
dht_rename_link_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                    int32_t op_ret, int32_t op_errno, inode_t *inode,
                    struct iatt *stbuf, struct iatt *preparent,
                    struct iatt *postparent, dict_t *xdata)
{
    dht_local_t *local = frame->local;
    xlator_t *subvol = cookie;

    if (op_ret != -1) {
        dht_iatt_merge(this, &local->stbuf, stbuf);
    } else {
        gf_msg_debug(this->name, 0, "link/file on %s failed (%s)", subvol->name,
                     strerror(op_errno));
        local->op_ret = -1;
        local->op_errno = op_errno;
        local->added_link = _gf_false;
    }

    if (local->op_ret == -1) {
        dht_rename_cleanup(frame);
    } else {
        dht_do_rename(frame);
    }

    return 0;
}
/*
 * Callback for the linkto file created on dst_hashed before the rename.
 *
 * If the creation failed, or the local already carries a non-zero op_ret,
 * the cleanup path is taken. Otherwise the hard link old name -> new name is
 * wound to src_cached (marked as an internal fop, and flagged don't-account
 * when both names share the parent directory), with dht_rename_link_cbk as
 * the callback.
 *
 * Always returns 0.
 */
int
dht_rename_linkto_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                      int32_t op_ret, int32_t op_errno, inode_t *inode,
                      struct iatt *stbuf, struct iatt *preparent,
                      struct iatt *postparent, dict_t *xdata)
{
    dht_local_t *local = frame->local;
    xlator_t *subvol = NULL;
    xlator_t *src_cached = NULL;
    dict_t *xattr = NULL;

    DHT_MARK_FOP_INTERNAL(xattr);

    subvol = cookie;
    src_cached = local->src_cached;

    if (op_ret == -1) {
        gf_msg_debug(this->name, 0, "link/file on %s failed (%s)", subvol->name,
                     strerror(op_errno));
        local->op_ret = -1;
        local->op_errno = op_errno;
    }

    /* If linkto creation failed move to failure cleanup code,
     * instead of continuing with creating the link file */
    if (local->op_ret != 0) {
        dht_rename_cleanup(frame);
        goto release;
    }

    gf_msg_trace(this->name, 0, "link %s => %s (%s)", local->loc.path,
                 local->loc2.path, src_cached->name);

    if (gf_uuid_compare(local->loc.pargfid, local->loc2.pargfid) == 0) {
        DHT_MARKER_DONT_ACCOUNT(xattr);
    }

    /* Set before the wind so the cleanup path knows a link may exist. */
    local->added_link = _gf_true;

    STACK_WIND_COOKIE(frame, dht_rename_link_cbk, src_cached, src_cached,
                      src_cached->fops->link, &local->loc, &local->loc2, xattr);

release:
    if (xattr) {
        dict_unref(xattr);
    }

    return 0;
}
/*
 * Callback for the unlink of the destination linkfile that is sent before
 * the rename when source and destination share the cached subvolume.
 *
 * ENOENT is tolerated; any other failure is recorded in the local. If the
 * local carries a failure at this point the cleanup path is taken; otherwise
 * the rename itself is wound.
 *
 * Always returns 0.
 */
int
dht_rename_unlink_links_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                            int32_t op_ret, int32_t op_errno,
                            struct iatt *preparent, struct iatt *postparent,
                            dict_t *xdata)
{
    dht_local_t *local = frame->local;
    xlator_t *subvol = cookie;

    if ((op_ret == -1) && (op_errno != ENOENT)) {
        gf_msg_debug(this->name, 0, "unlink of %s on %s failed (%s)",
                     local->loc2.path, subvol->name, strerror(op_errno));
        local->op_ret = -1;
        local->op_errno = op_errno;
    }

    if (local->op_ret == -1) {
        dht_rename_cleanup(frame);
    } else {
        dht_do_rename(frame);
    }

    return 0;
}
/*
 * Prepare the destination side of a file rename before the rename itself
 * is issued.
 *
 * Depending on how the hashed/cached subvolumes of source and destination
 * relate, this either
 *   - unlinks the destination linkfile on dst_hashed (src and dst share a
 *     cached subvolume that is not dst_hashed),
 *   - creates a linkto file on dst_hashed and lets dht_rename_linkto_cbk
 *     continue,
 *   - winds a hard link oldname -> newname on src_cached, or
 *   - goes straight to dht_do_rename() when nothing has to be created.
 *
 * Always returns 0; progress and failures are reported through the
 * callbacks of the wound operations.
 */
int
dht_rename_create_links(call_frame_t *frame)
{
    dht_local_t *local = NULL;
    xlator_t *this = NULL;
    xlator_t *src_hashed = NULL;
    xlator_t *src_cached = NULL;
    xlator_t *dst_hashed = NULL;
    xlator_t *dst_cached = NULL;
    int call_cnt = 0;
    dict_t *xattr = NULL;
    dict_t *xattr_new = NULL;

    local = frame->local;
    this = frame->this;

    src_hashed = local->src_hashed;
    src_cached = local->src_cached;
    dst_hashed = local->dst_hashed;
    dst_cached = local->dst_cached;

    DHT_MARK_FOP_INTERNAL(xattr);

    if (src_cached == dst_cached) {
        if (dst_hashed == dst_cached)
            goto nolinks;

        xattr_new = dict_copy_with_ref(xattr, NULL);

        gf_msg_trace(this->name, 0, "unlinking dst linkfile %s @ %s",
                     local->loc2.path, dst_hashed->name);

        DHT_MARKER_DONT_ACCOUNT(xattr_new);

        STACK_WIND_COOKIE(frame, dht_rename_unlink_links_cbk, dst_hashed,
                          dst_hashed, dst_hashed->fops->unlink, &local->loc2, 0,
                          xattr_new);

        /* The copy may have failed; guard the unref the same way the
         * unref of xattr is guarded. Only stack variables are touched
         * after the wind. */
        if (xattr_new)
            dict_unref(xattr_new);
        if (xattr)
            dict_unref(xattr);

        return 0;
    }

    if (src_cached != dst_hashed) {
        /* needed to create the link file */
        call_cnt++;
        if (dst_hashed != src_hashed)
            /* needed to create the linkto file */
            call_cnt++;
    }

    /* We should not have any failures post the link creation, as this
     * introduces the newname into the namespace. Clients could have cached
     * the existence of the newname and may start taking actions based on
     * the same. Hence create the linkto first, and then attempt the link.
     *
     * NOTE: If another client is attempting the same oldname -> newname
     * rename, and finds both file names as existing, and are hard links
     * to each other, then FUSE would send in an unlink for oldname. In
     * this time duration if we treat the linkto as a critical error and
     * unlink the newname we created, we would have effectively lost the
     * file to rename operations. */
    if (dst_hashed != src_hashed && src_cached != dst_hashed) {
        gf_msg_trace(this->name, 0, "linkfile %s @ %s => %s", local->loc.path,
                     dst_hashed->name, src_cached->name);

        /* Same copy as before, without the hard-coded uuid length. */
        gf_uuid_copy(local->gfid, local->loc.inode->gfid);

        dht_linkfile_create(frame, dht_rename_linkto_cbk, this, src_cached,
                            dst_hashed, &local->loc);
    } else if (src_cached != dst_hashed) {
        xattr_new = dict_copy_with_ref(xattr, NULL);

        gf_msg_trace(this->name, 0, "link %s => %s (%s)", local->loc.path,
                     local->loc2.path, src_cached->name);

        /* Source and destination share a parent directory. */
        if (gf_uuid_compare(local->loc.pargfid, local->loc2.pargfid) == 0) {
            DHT_MARKER_DONT_ACCOUNT(xattr_new);
        }

        /* Record that newname was introduced by us before winding. */
        local->added_link = _gf_true;

        STACK_WIND_COOKIE(frame, dht_rename_link_cbk, src_cached, src_cached,
                          src_cached->fops->link, &local->loc, &local->loc2,
                          xattr_new);

        if (xattr_new)
            dict_unref(xattr_new);
    }

nolinks:
    if (!call_cnt) {
        /* skip to next step */
        dht_do_rename(frame);
    }

    if (xattr)
        dict_unref(xattr);

    return 0;
}
/*
 * Callback for the two post-lock lookups wound by dht_rename_lock_cbk().
 *
 * cookie carries the lookup index: 0 is the lookup of the source
 * (local->loc), anything else is the fresh lookup of the destination
 * path (local->loc2_copy).
 *
 * Source:      refreshes local->src_cached; a failed lookup, a linkto file
 *              or a gfid mismatch marks the rename as failed through
 *              local->is_linkfile.
 * Destination: refreshes local->dst_cached and, when the destination is
 *              new or now has a different gfid, relinks local->loc2 to the
 *              freshly looked-up inode.
 *
 * When the last of the two replies arrives the rename either continues
 * with dht_rename_create_links() or is failed through dht_rename_unlock().
 * Always returns 0.
 */
int
dht_rename_lookup_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                      int op_ret, int op_errno, inode_t *inode,
                      struct iatt *stbuf, dict_t *xattr,
                      struct iatt *postparent)
{
    dht_local_t *local = NULL;
    int call_cnt = 0;
    dht_conf_t *conf = NULL;
    char gfid_local[GF_UUID_BUF_SIZE] = {0};
    char gfid_server[GF_UUID_BUF_SIZE] = {0};
    int child_index = -1;
    gf_boolean_t is_src = _gf_false;
    loc_t *loc = NULL;

    child_index = (long)cookie;
    local = frame->local;
    conf = this->private;

    is_src = (child_index == 0);
    if (is_src)
        loc = &local->loc;
    else
        loc = &local->loc2;

    if (op_ret >= 0) {
        if (is_src)
            local->src_cached = dht_subvol_get_cached(this, local->loc.inode);
        else {
            if (loc->inode)
                gf_uuid_unparse(loc->inode->gfid, gfid_local);

            /* Arguments follow the order of the labels in the format
             * string: cached subvol, path, gfid. */
            gf_msg_debug(this->name, 0,
                         "dst_cached before lookup: %s, "
                         "(path:%s)(gfid:%s),",
                         local->dst_cached ? local->dst_cached->name : "<nul>",
                         local->loc2.path,
                         local->dst_cached ? gfid_local : "<nul>");

            local->dst_cached = dht_subvol_get_cached(this,
                                                      local->loc2_copy.inode);

            gf_uuid_unparse(stbuf->ia_gfid, gfid_local);

            /* Second argument of gf_msg_debug is an errnum, not a level. */
            gf_msg_debug(this->name, 0,
                         "dst_cached after lookup: %s, "
                         "(path:%s)(gfid:%s)",
                         local->dst_cached ? local->dst_cached->name : "<nul>",
                         local->loc2.path,
                         local->dst_cached ? gfid_local : "<nul>");

            /* Destination did not exist before, or it was replaced by a
             * different file: switch loc2 over to the inode that was
             * just looked up. */
            if ((local->loc2.inode == NULL) ||
                gf_uuid_compare(stbuf->ia_gfid, local->loc2.inode->gfid)) {
                if (local->loc2.inode != NULL) {
                    inode_unlink(local->loc2.inode, local->loc2.parent,
                                 local->loc2.name);
                    inode_unref(local->loc2.inode);
                }

                local->loc2.inode = inode_link(local->loc2_copy.inode,
                                               local->loc2_copy.parent,
                                               local->loc2_copy.name, stbuf);
                gf_uuid_copy(local->loc2.gfid, stbuf->ia_gfid);
            }
        }
    }

    if (op_ret < 0) {
        if (is_src) {
            /* The meaning of is_linkfile is overloaded here. For locking
             * to work properly both rebalance and rename should acquire
             * lock on datafile. The reason for sending this lookup is to
             * find out whether we've acquired a lock on data file.
             * Between the lookup before rename and this rename, the
             * file could be migrated by a rebalance process and now this
             * file might be a linkto file. We verify that by sending
             * this lookup. However, if this lookup fails we cannot really
             * say whether we've acquired lock on a datafile or linkto file.
             * So, we act conservatively and _assume_
             * that this is a linkfile and fail the rename operation.
             */
            local->is_linkfile = _gf_true;
            local->op_errno = op_errno;
        } else {
            if (local->dst_cached)
                gf_msg_debug(this->name, op_errno,
                             "file %s (gfid:%s) was present "
                             "(hashed-subvol=%s, "
                             "cached-subvol=%s) before rename,"
                             " but lookup failed",
                             local->loc2.path,
                             uuid_utoa(local->loc2.inode->gfid),
                             local->dst_hashed->name, local->dst_cached->name);
            /* Destination is gone: rename onto a non-existing name. */
            if (dht_inode_missing(op_errno))
                local->dst_cached = NULL;
        }
    } else if (is_src && xattr &&
               check_is_linkfile(inode, stbuf, xattr, conf->link_xattr_name)) {
        local->is_linkfile = _gf_true;
        /* Found linkto file instead of data file, passdown ENOENT
         * based on the above comment */
        local->op_errno = ENOENT;
    }

    if (!local->is_linkfile && (op_ret >= 0) &&
        gf_uuid_compare(loc->gfid, stbuf->ia_gfid)) {
        gf_uuid_unparse(loc->gfid, gfid_local);
        gf_uuid_unparse(stbuf->ia_gfid, gfid_server);

        /* Report the path this reply belongs to (source or destination). */
        gf_msg(this->name, GF_LOG_WARNING, 0, DHT_MSG_GFID_MISMATCH,
               "path:%s, received a different gfid, local_gfid= %s"
               " server_gfid: %s",
               loc->path, gfid_local, gfid_server);

        /* Will passdown ENOENT anyway since the file we sent on
         * rename is replaced with a different file */
        local->op_errno = ENOENT;
        /* Since local->is_linkfile is used here to detect failure,
         * marking this to true */
        local->is_linkfile = _gf_true;
    }

    call_cnt = dht_frame_return(frame);
    if (is_last_call(call_cnt)) {
        if (local->is_linkfile) {
            local->op_ret = -1;
            goto fail;
        }

        dht_rename_create_links(frame);
    }

    return 0;

fail:
    dht_rename_unlock(frame, this);
    return 0;
}
/*
 * Callback for the first of the two namespace locks taken for a file
 * rename (wound from dht_rename_file_protect_namespace()).
 *
 * On success it switches local->current to the other lock slot and
 * protects the namespace of the remaining entry (destination if the
 * source was locked first, and vice versa), continuing in
 * dht_rename_lock_cbk(). On any failure the error is recorded in
 * local->op_ret / local->op_errno and the rename is aborted through
 * dht_rename_unlock(). Always returns 0.
 */
int
dht_rename_file_lock1_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                          int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
    dht_local_t *local = NULL;
    char src_gfid[GF_UUID_BUF_SIZE] = {0};
    char dst_gfid[GF_UUID_BUF_SIZE] = {0};
    int ret = 0;
    loc_t *loc = NULL;
    xlator_t *subvol = NULL;

    local = frame->local;

    if (op_ret < 0) {
        uuid_utoa_r(local->loc.inode->gfid, src_gfid);

        if (local->loc2.inode)
            uuid_utoa_r(local->loc2.inode->gfid, dst_gfid);

        gf_msg(this->name, GF_LOG_WARNING, op_errno, DHT_MSG_INODE_LK_ERROR,
               "protecting namespace of %s failed. "
               "rename (%s:%s:%s %s:%s:%s)",
               local->current == &local->lock[0] ? local->loc.path
                                                 : local->loc2.path,
               local->loc.path, src_gfid, local->src_hashed->name,
               local->loc2.path, dst_gfid,
               local->dst_hashed ? local->dst_hashed->name : "<nul>");

        local->op_ret = -1;
        local->op_errno = op_errno;
        goto err;
    }

    /* Pick the entry that has not been protected yet. */
    if (local->current == &local->lock[0]) {
        loc = &local->loc2;
        subvol = local->dst_hashed;
        local->current = &local->lock[1];
    } else {
        loc = &local->loc;
        subvol = local->src_hashed;
        local->current = &local->lock[0];
    }

    ret = dht_protect_namespace(frame, loc, subvol, &local->current->ns,
                                dht_rename_lock_cbk);
    if (ret < 0) {
        /* The failure has to be stored in local: the unwind reports
         * local->op_ret / local->op_errno (see dht_rename_unlock_cbk),
         * and op_ret was preset to 0 when the rename started. */
        local->op_ret = -1;
        local->op_errno = EINVAL;
        goto err;
    }

    return 0;

err:
    /* No harm in calling an extra unlock */
    dht_rename_unlock(frame, this);
    return 0;
}
/*
 * Callback of the backward-compatible inodelks taken in dht_rename_lock().
 *
 * Once the inodelks are held, the order of the two namespace locks is
 * decided by dht_order_rename_lock() and the first of them is requested;
 * dht_rename_file_lock1_cbk() takes the second one. On any failure the
 * error is recorded in local->op_ret / local->op_errno and the rename is
 * aborted through dht_rename_unlock(). Always returns 0.
 */
int32_t
dht_rename_file_protect_namespace(call_frame_t *frame, void *cookie,
                                  xlator_t *this, int32_t op_ret,
                                  int32_t op_errno, dict_t *xdata)
{
    dht_local_t *local = NULL;
    char src_gfid[GF_UUID_BUF_SIZE] = {0};
    char dst_gfid[GF_UUID_BUF_SIZE] = {0};
    int ret = 0;
    loc_t *loc = NULL;
    xlator_t *subvol = NULL;

    local = frame->local;

    if (op_ret < 0) {
        uuid_utoa_r(local->loc.inode->gfid, src_gfid);

        if (local->loc2.inode)
            uuid_utoa_r(local->loc2.inode->gfid, dst_gfid);

        gf_msg(this->name, GF_LOG_WARNING, op_errno, DHT_MSG_INODE_LK_ERROR,
               "acquiring inodelk failed "
               "rename (%s:%s:%s %s:%s:%s)",
               local->loc.path, src_gfid, local->src_cached->name,
               local->loc2.path, dst_gfid,
               local->dst_cached ? local->dst_cached->name : "<nul>");

        local->op_ret = -1;
        local->op_errno = op_errno;

        goto err;
    }

    /* Locks on src and dst needs to ordered which otherwise might cause
     * deadlocks when rename (src, dst) and rename (dst, src) is done from
     * two different clients
     */
    ret = dht_order_rename_lock(frame, &loc, &subvol);
    if (ret) {
        /* op_ret was preset to 0 when the rename started; without
         * flipping it the unwind would report success. */
        local->op_ret = -1;
        local->op_errno = ENOMEM;
        goto err;
    }

    ret = dht_protect_namespace(frame, loc, subvol, &local->current->ns,
                                dht_rename_file_lock1_cbk);
    if (ret < 0) {
        /* Store the failure in local, not in the by-value parameter. */
        local->op_ret = -1;
        local->op_errno = EINVAL;
        goto err;
    }

    return 0;

err:
    /* Its fine to call unlock even when no locks are acquired, as we check
     * for lock->locked before winding a unlock call.
     */
    dht_rename_unlock(frame, this);

    return 0;
}
/*
 * Callback of the last namespace lock taken for a file rename.
 *
 * With all locks held, source and destination are looked up again to
 * detect changes that happened before the locks were granted:
 *   - the source is looked up on the subvolume of the inodelk that
 *     matches its gfid,
 *   - the destination path is looked up on "this" with a brand new inode
 *     (local->loc2_copy).
 * Both replies are handled by dht_rename_lookup_cbk(). On any failure the
 * error is recorded in local->op_ret / local->op_errno and the rename is
 * aborted through dht_rename_unlock(). Always returns 0.
 */
int32_t
dht_rename_lock_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                    int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
    dht_local_t *local = NULL;
    char src_gfid[GF_UUID_BUF_SIZE] = {0};
    char dst_gfid[GF_UUID_BUF_SIZE] = {0};
    dict_t *xattr_req = NULL;
    dht_conf_t *conf = NULL;
    int i = 0;
    int ret = 0;
    xlator_t *subvol = NULL;
    dht_lock_t *lock = NULL;

    local = frame->local;
    conf = this->private;

    if (op_ret < 0) {
        uuid_utoa_r(local->loc.inode->gfid, src_gfid);

        if (local->loc2.inode)
            uuid_utoa_r(local->loc2.inode->gfid, dst_gfid);

        gf_msg(this->name, GF_LOG_WARNING, op_errno, DHT_MSG_INODE_LK_ERROR,
               "protecting namespace of %s failed. "
               "rename (%s:%s:%s %s:%s:%s)",
               local->current == &local->lock[0] ? local->loc.path
                                                 : local->loc2.path,
               local->loc.path, src_gfid, local->src_hashed->name,
               local->loc2.path, dst_gfid,
               local->dst_hashed ? local->dst_hashed->name : "<nul>");

        local->op_ret = -1;
        local->op_errno = op_errno;

        goto done;
    }

    xattr_req = dict_new();
    if (xattr_req == NULL) {
        local->op_ret = -1;
        local->op_errno = ENOMEM;
        goto done;
    }

    /* Request the linkto xattr so the lookup callback can recognise a
     * linkto file. */
    op_ret = dict_set_uint32(xattr_req, conf->link_xattr_name, 256);
    if (op_ret < 0) {
        local->op_ret = -1;
        local->op_errno = -op_ret;
        goto done;
    }

    /* dst_cached might've changed. This normally happens for two reasons:
     * 1. rebalance migrated dst
     * 2. Another parallel rename was done overwriting dst
     *
     * Doing a lookup on local->loc2 when dst exists, but is associated
     * with a different gfid will result in an ESTALE error. So, do a fresh
     * lookup with a new inode on dst-path and handle change of dst-cached
     * in the cbk. Also, to identify dst-cached changes we do a lookup on
     * "this" rather than the subvol.
     */
    ret = loc_copy(&local->loc2_copy, &local->loc2);
    if (ret == -1) {
        /* Without a usable copy there is nothing valid to look up. */
        local->op_ret = -1;
        local->op_errno = ENOMEM;
        goto done;
    }

    /* Replace the copied inode (if any) with a fresh one. */
    if (local->loc2_copy.inode)
        inode_unref(local->loc2_copy.inode);
    local->loc2_copy.inode = inode_new(local->loc.inode->table);
    if (local->loc2_copy.inode == NULL) {
        /* Do not wind a lookup on a loc without an inode. */
        local->op_ret = -1;
        local->op_errno = ENOMEM;
        goto done;
    }

    /* Why not use local->lock.locks[?].loc for lookup post lock phase
     * ---------------------------------------------------------------
     * "layout.parent_layout.locks[?].loc" does not have the name and pargfid
     * populated.
     * Reason: If we had populated the name and pargfid, server might
     * resolve to a successful lookup even if there is a file with same name
     * with a different gfid(unlink & create) as server does name based
     * resolution on first priority. And this can result in operating on a
     * different inode entirely.
     *
     * Now consider a scenario where source file was renamed by some other
     * client to a new name just before this lock was granted. So if a
     * lookup would be done on local->lock[0].layout.parent_layout.locks[?].loc,
     * server will send success even if the entry was renamed (since server will
     * do a gfid based resolution). So once a lock is granted, make sure the
     * file exists with the name that the client requested with.
     * */

    local->call_cnt = 2;
    for (i = 0; i < 2; i++) {
        if (i == 0) {
            /* Source: use the subvolume of the inodelk that matches
             * the gfid of the source. */
            lock = local->rename_inodelk_backward_compatible[0];
            if (gf_uuid_compare(local->loc.gfid, lock->loc.gfid) == 0)
                subvol = lock->xl;
            else {
                lock = local->rename_inodelk_backward_compatible[1];
                subvol = lock->xl;
            }
        } else {
            subvol = this;
        }

        STACK_WIND_COOKIE(frame, dht_rename_lookup_cbk, (void *)(long)i, subvol,
                          subvol->fops->lookup,
                          (i == 0) ? &local->loc : &local->loc2_copy,
                          xattr_req);
    }

    dict_unref(xattr_req);
    return 0;

done:
    /* Its fine to call unlock even when no locks are acquired, as we check
     * for lock->locked before winding a unlock call.
     */
    dht_rename_unlock(frame, this);

    if (xattr_req)
        dict_unref(xattr_req);

    return 0;
}
/*
 * Start the locking phase of a file rename.
 *
 * Builds an array with a write inodelk on the source (on src_cached) and,
 * when a destination file exists, one on the destination (on dst_cached),
 * then requests them with dht_blocking_inodelk(); the rename continues in
 * dht_rename_file_protect_namespace().
 *
 * Returns 0 when the lock request was issued, -1 otherwise (the lock array
 * is released in that case).
 */
int
dht_rename_lock(call_frame_t *frame)
{
    dht_local_t *local = frame->local;
    dht_lock_t **locks = NULL;
    int nlocks = (local->dst_cached != NULL) ? 2 : 1;
    int populated = 0;

    locks = GF_CALLOC(nlocks, sizeof(*locks), gf_common_mt_pointer);
    if (locks == NULL)
        return -1;

    locks[0] = dht_lock_new(frame->this, local->src_cached, &local->loc,
                            F_WRLCK, DHT_FILE_MIGRATE_DOMAIN, NULL,
                            FAIL_ON_ANY_ERROR);
    if (locks[0] == NULL)
        goto release;

    if (local->dst_cached) {
        /* dst might be removed by the time inodelk reaches bricks,
         * which can result in ESTALE errors. POSIX imposes no
         * restriction for dst to be present for renames to be
         * successful. So, we'll ignore ESTALE errors. As far as
         * synchronization on dst goes, we'll achieve the same by
         * holding entrylk on parent directory of dst in the namespace
         * of basename(dst). Also, there might not be quorum in cluster
         * xlators like EC/disperse on errno, in which case they return
         * EIO. For eg., in a disperse (4 + 2), 3 might return success
         * and three might return ESTALE. Disperse, having no Quorum
         * unwinds inodelk with EIO. So, ignore EIO too.
         */
        locks[1] = dht_lock_new(frame->this, local->dst_cached, &local->loc2,
                                F_WRLCK, DHT_FILE_MIGRATE_DOMAIN, NULL,
                                IGNORE_ENOENT_ESTALE_EIO);
        if (locks[1] == NULL)
            goto release;
    }

    /* Publish the array in local before the locks are requested. */
    local->rename_inodelk_backward_compatible = locks;
    local->rename_inodelk_bc_count = nlocks;

    /* retaining inodelks for the sake of backward compatibility. Please
     * make sure to remove this inodelk once all of 3.10, 3.12 and 3.13
     * reach EOL. Better way of getting synchronization would be to acquire
     * entrylks on src and dst parent directories in the namespace of
     * basenames of src and dst
     */
    if (dht_blocking_inodelk(frame, locks, nlocks,
                             dht_rename_file_protect_namespace) < 0) {
        local->rename_inodelk_backward_compatible = NULL;
        local->rename_inodelk_bc_count = 0;
        goto release;
    }

    return 0;

release:
    /* Free only the leading slots that were actually allocated. */
    while ((populated < nlocks) && (locks[populated] != NULL))
        populated++;

    dht_lock_array_free(locks, populated);
    GF_FREE(locks);

    return -1;
}
/*
 * Entry point of the rename fop in DHT.
 *
 * Resolves the hashed and cached subvolumes of the source and the hashed
 * (and, if the destination exists, cached) subvolume of the destination,
 * stores them in a freshly initialised local and then hands over to
 * dht_rename_dir() for directories or to the locking phase
 * (dht_rename_lock()) for everything else.
 *
 * Failures detected here are unwound immediately with op_ret -1 and the
 * matching errno. Always returns 0.
 */
int
dht_rename(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,
           dict_t *xdata)
{
    xlator_t *src_cached = NULL;
    xlator_t *src_hashed = NULL;
    xlator_t *dst_cached = NULL;
    xlator_t *dst_hashed = NULL;
    int op_errno = -1;
    int ret = -1;
    dht_local_t *local = NULL;
    char gfid[GF_UUID_BUF_SIZE] = {0};
    char newgfid[GF_UUID_BUF_SIZE] = {0};

    VALIDATE_OR_GOTO(frame, err);
    VALIDATE_OR_GOTO(this, err);
    VALIDATE_OR_GOTO(oldloc, err);
    /* The source inode is dereferenced unconditionally below. */
    VALIDATE_OR_GOTO(oldloc->inode, err);
    VALIDATE_OR_GOTO(newloc, err);

    gf_uuid_unparse(oldloc->inode->gfid, gfid);

    src_hashed = dht_subvol_get_hashed(this, oldloc);
    if (!src_hashed) {
        gf_msg(this->name, GF_LOG_INFO, 0, DHT_MSG_RENAME_FAILED,
               "No hashed subvolume in layout for path=%s,"
               "(gfid = %s)",
               oldloc->path, gfid);
        op_errno = EINVAL;
        goto err;
    }

    src_cached = dht_subvol_get_cached(this, oldloc->inode);
    if (!src_cached) {
        gf_msg(this->name, GF_LOG_INFO, 0, DHT_MSG_RENAME_FAILED,
               "No cached subvolume for path = %s,"
               "(gfid = %s)",
               oldloc->path, gfid);

        op_errno = EINVAL;
        goto err;
    }

    dst_hashed = dht_subvol_get_hashed(this, newloc);
    if (!dst_hashed) {
        gf_msg(this->name, GF_LOG_INFO, 0, DHT_MSG_RENAME_FAILED,
               "No hashed subvolume in layout for path=%s", newloc->path);
        op_errno = EINVAL;
        goto err;
    }

    /* The destination has an inode only when it already exists. */
    if (newloc->inode)
        dst_cached = dht_subvol_get_cached(this, newloc->inode);

    local = dht_local_init(frame, oldloc, NULL, GF_FOP_RENAME);
    if (!local) {
        op_errno = ENOMEM;
        goto err;
    }
    /* cached_subvol will be set from dht_local_init, reset it to NULL,
       as the logic of handling rename is different */
    local->cached_subvol = NULL;

    ret = loc_copy(&local->loc2, newloc);
    if (ret == -1) {
        op_errno = ENOMEM;
        goto err;
    }

    local->src_hashed = src_hashed;
    local->src_cached = src_cached;
    local->dst_hashed = dst_hashed;
    local->dst_cached = dst_cached;
    if (xdata)
        local->xattr_req = dict_ref(xdata);

    if (newloc->inode)
        gf_uuid_unparse(newloc->inode->gfid, newgfid);

    /* Never hand NULL to %s; use the same placeholder for both optional
     * values. */
    gf_msg(this->name, GF_LOG_INFO, 0, DHT_MSG_RENAME_INFO,
           "renaming %s (%s) (hash=%s/cache=%s) => %s (%s) "
           "(hash=%s/cache=%s) ",
           oldloc->path, gfid, src_hashed->name, src_cached->name, newloc->path,
           newloc->inode ? newgfid : "<nul>", dst_hashed->name,
           dst_cached ? dst_cached->name : "<nul>");

    if (IA_ISDIR(oldloc->inode->ia_type)) {
        dht_rename_dir(frame, this);
    } else {
        local->op_ret = 0;
        ret = dht_rename_lock(frame);
        if (ret < 0) {
            op_errno = ENOMEM;
            goto err;
        }
    }

    return 0;

err:
    /* op_errno is still -1 only on the validation paths, which are
     * expected to have set errno. */
    op_errno = (op_errno == -1) ? errno : op_errno;
    DHT_STACK_UNWIND(rename, frame, -1, op_errno, NULL, NULL, NULL, NULL, NULL,
                     NULL);

    return 0;
}
/*
 * Pass-through variant of rename: the call is forwarded unchanged with
 * default_rename().
 *
 * For non-directories the request dictionary is tagged through
 * DHT_CHANGELOG_TRACK_AS_RENAME first. If the caller supplied no xdata and
 * one exists after the macro ran (presumably created by the macro), it is
 * released here after the wind. Always returns 0.
 */
int
dht_pt_rename(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,
              dict_t *xdata)
{
    /* True when xdata did not come from the caller. */
    gf_boolean_t owns_xdata = _gf_false;

    /* Just a pass through */
    if (!IA_ISDIR(oldloc->inode->ia_type)) {
        owns_xdata = (xdata == NULL) ? _gf_true : _gf_false;
        DHT_CHANGELOG_TRACK_AS_RENAME(xdata, oldloc, newloc);
    }

    default_rename(frame, this, oldloc, newloc, xdata);

    if (xdata && owns_xdata) {
        dict_unref(xdata);
        xdata = NULL;
    }

    return 0;
}