/*
Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
This file is part of GlusterFS.
This file is licensed to you under your choice of the GNU Lesser
General Public License, version 3 or any later version (LGPLv3 or
later), or the GNU General Public License, version 2 (GPLv2), in all
cases as published by the Free Software Foundation.
*/
#include <glusterfs/dict.h>
#include <glusterfs/byte-order.h>
#include <glusterfs/common-utils.h>
#include "afr.h"
#include "afr-transaction.h"
#include "afr-messages.h"
#include <signal.h>
#define LOCKED_NO 0x0 /* no lock held */
#define LOCKED_YES 0x1 /* for DATA, METADATA, ENTRY and higher_path */
#define LOCKED_LOWER 0x2 /* for lower path */
/*
 * Release everything a single lockee holds.
 *
 * A lockee that carries an fd drops its fd reference; otherwise its loc is
 * wiped. The basename string and the locked_nodes array are freed in both
 * cases, and every released pointer is reset to NULL.
 */
void
afr_lockee_cleanup(afr_lockee_t *lockee)
{
    if (!lockee->fd) {
        loc_wipe(&lockee->loc);
    } else {
        fd_unref(lockee->fd);
        lockee->fd = NULL;
    }

    GF_FREE(lockee->locked_nodes);
    lockee->locked_nodes = NULL;

    GF_FREE(lockee->basename);
    lockee->basename = NULL;
}
/*
 * Run afr_lockee_cleanup() on each of the first lockee_count lockees of
 * int_lock. lockee_count itself is left untouched.
 */
void
afr_lockees_cleanup(afr_internal_lock_t *int_lock)
{
    int idx = 0;

    while (idx < int_lock->lockee_count) {
        afr_lockee_cleanup(int_lock->lockee + idx);
        idx++;
    }
}
/*
 * Ordering function for entry lockees (both arguments point to afr_lockee_t).
 *
 * Lockees are ordered first by the gfid obtained from their loc, then by
 * basename, where a NULL basename sorts before any non-NULL one.
 *
 * Returns -1 or 1 only.
 * NOTE(review): the function never returns 0; lockees that compare equal
 * (and the case where both basenames are NULL) yield -1.
 */
int
afr_entry_lockee_cmp(const void *l1, const void *l2)
{
    const afr_lockee_t *lhs = l1;
    const afr_lockee_t *rhs = l2;
    uuid_t lhs_gfid = {0};
    uuid_t rhs_gfid = {0};
    int order = 0;

    loc_gfid((loc_t *)&lhs->loc, lhs_gfid);
    loc_gfid((loc_t *)&rhs->loc, rhs_gfid);

    order = gf_uuid_compare(lhs_gfid, rhs_gfid);
    if (order != 0)
        return (order < 0) ? -1 : 1;

    /* Same gfid: entrylks with NULL basename are the 'smallest' */
    if (!lhs->basename)
        return -1;
    if (!rhs->basename)
        return 1;

    return (strcmp(lhs->basename, rhs->basename) <= 0) ? -1 : 1;
}
/* Forward declaration: afr_lock_cbk() calls afr_lock_blocking() before its
 * definition appears later in this file. The third argument is named
 * 'cookie' in the definition, where it is split into a child index and a
 * lockee index using priv->child_count. */
int
afr_lock_blocking(call_frame_t *frame, xlator_t *this, int child_index);
/*
 * Set the lk-owner of the frame's root from the pointer value lk_owner and
 * emit a trace message with that value printed as an unsigned integer.
 */
void
afr_set_lk_owner(call_frame_t *frame, xlator_t *this, void *lk_owner)
{
    unsigned long long owner_val = (unsigned long long)(unsigned long)lk_owner;

    gf_msg_trace(this->name, 0, "Setting lk-owner=%llu", owner_val);

    set_lk_owner_from_ptr(&frame->root->lk_owner, lk_owner);
}
/*
 * Count the children that local->child_up marks as up, looking at the first
 * priv->child_count entries.
 */
int32_t
internal_lock_count(call_frame_t *frame, xlator_t *this)
{
    afr_local_t *local = frame->local;
    afr_private_t *priv = this->private;
    int32_t up_children = 0;
    int child = 0;

    for (child = 0; child < priv->child_count; child++)
        up_children += local->child_up[child] ? 1 : 0;

    return up_children;
}
/*
 * Append an entry lockee (parent loc + optional basename) to the internal
 * lock of 'local'.
 *
 * local       - transaction local whose internal_lock receives the lockee
 * loc         - location copied into the new lockee
 * basename    - entry name to lock; may be NULL, in which case the lockee
 *               keeps a NULL basename
 * child_count - number of elements allocated for the per-child locked_nodes
 *               array
 *
 * Returns 0 on success, in which case lockee_count is incremented.
 * Returns -ENOMEM on failure; the partially filled slot is cleaned up with
 * afr_lockee_cleanup() and lockee_count is left unchanged.
 */
int
afr_add_entry_lockee(afr_local_t *local, loc_t *loc, char *basename,
                     int child_count)
{
    int ret = -ENOMEM;
    afr_internal_lock_t *int_lock = &local->internal_lock;
    afr_lockee_t *lockee = NULL;

    /* Check the slot index before forming a pointer into the array. */
    GF_ASSERT(int_lock->lockee_count < AFR_LOCKEE_COUNT_MAX);
    lockee = &int_lock->lockee[int_lock->lockee_count];

    /* A failed copy must not be registered as a usable lockee. */
    if (loc_copy(&lockee->loc, loc) != 0)
        goto out;

    if (basename) {
        lockee->basename = gf_strdup(basename);
        if (!lockee->basename)
            goto out;
    } else {
        lockee->basename = NULL;
    }

    lockee->locked_count = 0;
    lockee->locked_nodes = GF_CALLOC(child_count, sizeof(*lockee->locked_nodes),
                                     gf_afr_mt_afr_node_character);
    if (!lockee->locked_nodes)
        goto out;

    ret = 0;
    int_lock->lockee_count++;
out:
    if (ret) {
        afr_lockee_cleanup(lockee);
    }
    return ret;
}
/*
 * Append an inode lockee to the internal lock of 'local'.
 *
 * When local->fd is set the lockee takes a reference on that fd; otherwise
 * local->loc is copied into the lockee.
 *
 * local       - transaction local whose internal_lock receives the lockee
 * child_count - number of elements allocated for the per-child locked_nodes
 *               array
 *
 * Returns 0 on success, in which case lockee_count is incremented.
 * Returns -ENOMEM on failure; the partially filled slot is cleaned up with
 * afr_lockee_cleanup() and lockee_count is left unchanged.
 */
int
afr_add_inode_lockee(afr_local_t *local, int child_count)
{
    int ret = -ENOMEM;
    afr_internal_lock_t *int_lock = &local->internal_lock;
    afr_lockee_t *lockee = &int_lock->lockee[int_lock->lockee_count];

    if (local->fd) {
        lockee->fd = fd_ref(local->fd);
    } else if (loc_copy(&lockee->loc, &local->loc) != 0) {
        /* A failed copy must not be registered as a usable lockee. */
        goto out;
    }

    lockee->locked_count = 0;
    lockee->locked_nodes = GF_CALLOC(child_count, sizeof(*lockee->locked_nodes),
                                     gf_afr_mt_afr_node_character);
    if (!lockee->locked_nodes)
        goto out;

    ret = 0;
    int_lock->lockee_count++;
out:
    if (ret) {
        afr_lockee_cleanup(lockee);
    }
    return ret;
}
/*
 * Reset the per-attempt state of the frame's internal lock.
 *
 * Clears lock_count, lock_op_errno and lk_attempted_count, sets lock_op_ret
 * to -1, and zeroes locked_count and the locked_nodes array (child_count
 * elements) of every lockee up to the first one without a locked_nodes
 * array, or AFR_LOCKEE_COUNT_MAX lockees at most.
 *
 * Always returns 0.
 */
static int
initialize_internal_lock_variables(call_frame_t *frame, xlator_t *this)
{
    afr_private_t *priv = this->private;
    afr_local_t *local = frame->local;
    afr_internal_lock_t *int_lock = &local->internal_lock;
    int idx = 0;

    int_lock->lk_attempted_count = 0;
    int_lock->lock_op_errno = 0;
    int_lock->lock_op_ret = -1;
    int_lock->lock_count = 0;

    while (idx < AFR_LOCKEE_COUNT_MAX && int_lock->lockee[idx].locked_nodes) {
        memset(int_lock->lockee[idx].locked_nodes, 0,
               sizeof(*int_lock->lockee[idx].locked_nodes) * priv->child_count);
        int_lock->lockee[idx].locked_count = 0;
        idx++;
    }

    return 0;
}
/*
 * Sum the locked_count of the first lockee_count lockees of int_lock.
 */
int
afr_lockee_locked_nodes_count(afr_internal_lock_t *int_lock)
{
    int total = 0;
    int idx = int_lock->lockee_count;

    while (idx-- > 0)
        total += int_lock->lockee[idx].locked_count;

    return total;
}
/*
 * Count how many of the first child_count entries of locked_nodes have the
 * LOCKED_YES bit set.
 */
int
afr_locked_nodes_count(unsigned char *locked_nodes, int child_count)
{
    int locked = 0;
    int child = 0;

    for (child = 0; child < child_count; child++)
        locked += (locked_nodes[child] & LOCKED_YES) ? 1 : 0;

    return locked;
}
/*
 * Log a warning about a failed internal lock or unlock.
 *
 * frame    - frame of the transaction; supplies the lk-owner and the local
 * where    - text naming where the operation failed (e.g. a child's name)
 * what     - text naming the operation ("lock" / "unlock" in this file)
 * op_errno - error number attached to the log message
 *
 * Entry and entry-rename transactions are reported with the parent gfid and
 * name (taken from newloc for GF_FOP_LINK, from loc otherwise); data and
 * metadata transactions are reported with the inode's gfid. Any other
 * transaction type logs nothing.
 */
static void
afr_log_locks_failure(call_frame_t *frame, char *where, char *what,
                      int op_errno)
{
    afr_local_t *local = frame->local;
    xlator_t *this = frame->this;
    gf_lkowner_t *lk_owner = &frame->root->lk_owner;
    const char *fop = gf_fop_list[local->op];
    const char *name = NULL;
    char *gfid = NULL;

    switch (local->transaction.type) {
        case AFR_ENTRY_TRANSACTION:
        case AFR_ENTRY_RENAME_TRANSACTION:
            if (local->op == GF_FOP_LINK) {
                gfid = uuid_utoa(local->newloc.pargfid);
                name = local->newloc.name;
            } else {
                gfid = uuid_utoa(local->loc.pargfid);
                name = local->loc.name;
            }
            gf_msg(this->name, GF_LOG_WARNING, op_errno,
                   AFR_MSG_INTERNAL_LKS_FAILED,
                   "Unable to do entry %s with lk-owner:%s on %s "
                   "while attempting %s on {pgfid:%s, name:%s}.",
                   what, lkowner_utoa(lk_owner), where, fop, gfid, name);
            break;

        case AFR_METADATA_TRANSACTION:
        case AFR_DATA_TRANSACTION:
            gfid = uuid_utoa(local->inode->gfid);
            gf_msg(this->name, GF_LOG_WARNING, op_errno,
                   AFR_MSG_INTERNAL_LKS_FAILED,
                   "Unable to do inode %s with lk-owner:%s on %s "
                   "while attempting %s on gfid:%s.",
                   what, lkowner_utoa(lk_owner), where, fop, gfid);
            break;
    }
}
/*
 * Reply handler for the unlock requests wound by afr_unlock_now().
 *
 * The cookie is a flat index: cookie / child_count selects the lockee and
 * cookie % child_count selects the child. Failures other than ENOTCONN and
 * EBADFD are logged. The child's locked_nodes entry for that lockee is
 * cleared, and when the last outstanding reply arrives (lk_call_count
 * reaches zero) the internal lock's lock_cbk is invoked.
 *
 * Returns the result of afr_write_subvol_reset() when it was called,
 * otherwise 0.
 */
static int32_t
afr_unlock_common_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                      int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
    afr_local_t *local = frame->local;
    afr_internal_lock_t *int_lock = &local->internal_lock;
    afr_private_t *priv = this->private;
    int slot = (int)((long)cookie);
    int lockee_num = slot / priv->child_count;
    int child_index = slot % priv->child_count;
    int pending = 0;
    int ret = 0;

    if ((op_ret < 0) && !(op_errno == ENOTCONN || op_errno == EBADFD)) {
        afr_log_locks_failure(frame, priv->children[child_index]->name,
                              "unlock", op_errno);
    }

    int_lock->lockee[lockee_num].locked_nodes[child_index] &= LOCKED_NO;

    /* NOTE(review): the test is 'op_ret != 1', not '!= -1' — confirm that
     * resetting on every reply of a data transaction is intended. */
    if (local->transaction.type == AFR_DATA_TRANSACTION && op_ret != 1)
        ret = afr_write_subvol_reset(frame, this);

    LOCK(&frame->lock);
    {
        pending = --int_lock->lk_call_count;
    }
    UNLOCK(&frame->lock);

    if (pending == 0)
        int_lock->lock_cbk(frame, this);

    return ret;
}
/*
 * Wind a single internal lock or unlock request to one child for one lockee.
 *
 * frame      - frame whose local (afr_local_t) carries the internal-lock state
 * cbk        - callback handed to STACK_WIND_COOKIE for the reply
 * cookie     - opaque value handed back to cbk; the callers in this file
 *              pass the flat index (lockee_num * child_count + child)
 * child      - index into priv->children of the subvolume to wind to
 * lockee_num - index into int_lock->lockee of the object being locked
 * blocking   - when true (and unlock is false) a blocking request is sent
 *              (ENTRYLK_LOCK / F_SETLKW) instead of the non-blocking default
 *              (ENTRYLK_LOCK_NB / F_SETLK)
 * unlock     - when true an unlock is sent (ENTRYLK_UNLOCK, or l_type set to
 *              F_UNLCK); it takes precedence over 'blocking'
 *
 * Entry and entry-rename transactions use entrylk/fentrylk; data and metadata
 * transactions use inodelk/finodelk. Any other transaction type winds nothing.
 */
void
afr_internal_lock_wind(call_frame_t *frame,
int32_t (*cbk)(call_frame_t *, void *, xlator_t *,
int32_t, int32_t, dict_t *),
void *cookie, int child, int lockee_num,
gf_boolean_t blocking, gf_boolean_t unlock)
{
afr_local_t *local = frame->local;
xlator_t *this = frame->this;
afr_private_t *priv = this->private;
afr_internal_lock_t *int_lock = &local->internal_lock;
/* Defaults describe a non-blocking lock request. */
entrylk_cmd cmd = ENTRYLK_LOCK_NB;
int32_t cmd1 = F_SETLK;
struct gf_flock flock = {
0,
};
switch (local->transaction.type) {
case AFR_ENTRY_TRANSACTION:
case AFR_ENTRY_RENAME_TRANSACTION:
if (unlock) {
cmd = ENTRYLK_UNLOCK;
} else if (blocking) { /*Doesn't make sense to have blocking
unlock*/
cmd = ENTRYLK_LOCK;
}
/* The fd-based variant is selected by local->fd, while the fd that is
 * actually passed is the lockee's own fd.
 * NOTE(review): afr_add_entry_lockee() does not set lockee->fd —
 * confirm local->fd is never set for entry transactions. */
if (local->fd) {
STACK_WIND_COOKIE(frame, cbk, cookie, priv->children[child],
priv->children[child]->fops->fentrylk,
int_lock->domain,
int_lock->lockee[lockee_num].fd,
int_lock->lockee[lockee_num].basename, cmd,
ENTRYLK_WRLCK, NULL);
} else {
STACK_WIND_COOKIE(frame, cbk, cookie, priv->children[child],
priv->children[child]->fops->entrylk,
int_lock->domain,
&int_lock->lockee[lockee_num].loc,
int_lock->lockee[lockee_num].basename, cmd,
ENTRYLK_WRLCK, NULL);
}
break;
case AFR_DATA_TRANSACTION:
case AFR_METADATA_TRANSACTION:
/* Work on a copy so the lockee's stored flock is not modified when the
 * type is switched to F_UNLCK below. */
flock = int_lock->lockee[lockee_num].flock;
if (unlock) {
flock.l_type = F_UNLCK;
} else if (blocking) { /*Doesn't make sense to have blocking
unlock*/
cmd1 = F_SETLKW;
}
if (local->fd) {
STACK_WIND_COOKIE(
frame, cbk, cookie, priv->children[child],
priv->children[child]->fops->finodelk, int_lock->domain,
int_lock->lockee[lockee_num].fd, cmd1, &flock, NULL);
} else {
STACK_WIND_COOKIE(
frame, cbk, cookie, priv->children[child],
priv->children[child]->fops->inodelk, int_lock->domain,
&int_lock->lockee[lockee_num].loc, cmd1, &flock, NULL);
}
break;
}
}
/*
 * Send an unlock for every (lockee, child) pair currently marked LOCKED_YES.
 *
 * lk_call_count is set to the number of unlocks to send (the sum of the
 * lockees' locked_count). When that number is zero the internal lock's
 * lock_cbk is invoked directly; otherwise replies are handled by
 * afr_unlock_common_cbk(), with the flat index of the pair as cookie.
 *
 * Always returns 0.
 */
static int
afr_unlock_now(call_frame_t *frame, xlator_t *this)
{
    afr_local_t *local = frame->local;
    afr_internal_lock_t *int_lock = &local->internal_lock;
    afr_private_t *priv = this->private;
    int remaining = 0;
    int lockee_num = 0;
    int child_index = 0;
    int slot = 0;

    remaining = afr_lockee_locked_nodes_count(int_lock);
    int_lock->lk_call_count = remaining;

    if (remaining == 0) {
        gf_msg_trace(this->name, 0, "No internal locks unlocked");
        int_lock->lock_cbk(frame, this);
        return 0;
    }

    for (slot = 0; slot < int_lock->lockee_count * priv->child_count; slot++) {
        lockee_num = slot / priv->child_count;
        child_index = slot % priv->child_count;

        if (!(int_lock->lockee[lockee_num].locked_nodes[child_index] &
              LOCKED_YES))
            continue;

        afr_internal_lock_wind(frame, afr_unlock_common_cbk,
                               (void *)(long)slot, child_index, lockee_num,
                               _gf_false, _gf_true);

        /* Leave the loop right after the last expected wind, without
         * evaluating the loop condition again. */
        if (--remaining == 0)
            break;
    }

    return 0;
}
/*
 * Reply handler for one blocking lock request wound by afr_lock_blocking().
 *
 * The cookie is a flat index: cookie / child_count selects the lockee and
 * cookie % child_count selects the child. On failure the errno is recorded
 * in both local and the internal lock; ENOSYS additionally records op_ret
 * and aborts the sequence by unlocking whatever is held. On success the
 * (lockee, child) pair is marked LOCKED_YES and the counters are bumped.
 * Unless aborted, the sequence continues with the next flat index.
 *
 * Always returns 0.
 */
static int32_t
afr_lock_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
             int32_t op_errno, dict_t *xdata)
{
    afr_private_t *priv = this->private;
    afr_local_t *local = frame->local;
    afr_internal_lock_t *int_lock = &local->internal_lock;
    int cky = (long)cookie;
    int child_index = cky % priv->child_count;
    int lockee_num = cky / priv->child_count;

    LOCK(&frame->lock);
    {
        if (op_ret == -1) {
            if (op_errno == ENOSYS) {
                /* return ENOTSUP */
                gf_msg(this->name, GF_LOG_ERROR, ENOSYS,
                       AFR_MSG_LOCK_XLATOR_NOT_LOADED,
                       "subvolume does not support locking. "
                       "please load features/locks xlator on server");
                local->op_ret = op_ret;
                int_lock->lock_op_ret = op_ret;
            }
            local->op_errno = op_errno;
            int_lock->lock_op_errno = op_errno;
        }
        int_lock->lk_attempted_count++;
    }
    UNLOCK(&frame->lock);

    /* Locking is unsupported on this subvolume: give up and release. */
    if ((op_ret == -1) && (op_errno == ENOSYS)) {
        afr_unlock_now(frame, this);
        return 0;
    }

    if (op_ret == 0) {
        int_lock->lockee[lockee_num].locked_nodes[child_index] |= LOCKED_YES;
        int_lock->lockee[lockee_num].locked_count++;
        int_lock->lock_count++;

        if (local->transaction.type == AFR_DATA_TRANSACTION) {
            LOCK(&local->inode->lock);
            {
                local->inode_ctx->lock_count++;
            }
            UNLOCK(&local->inode->lock);
        }
    }

    afr_lock_blocking(frame, this, cky + 1);
    return 0;
}
/*
 * A lock needs to be wound to a child only when local->child_up marks that
 * child as up.
 */
static gf_boolean_t
_is_lock_wind_needed(afr_local_t *local, int child_index)
{
    return local->child_up[child_index] ? _gf_true : _gf_false;
}
/*
 * Decide whether the blocking lock phase acquired enough locks to proceed.
 *
 * Returns _gf_false (after logging) when no lock at all was acquired, or
 * when no single child holds the locks of every lockee. Returns _gf_true as
 * soon as one child is found on which all lockees are marked LOCKED_YES.
 */
static gf_boolean_t
is_blocking_locks_count_sufficient(call_frame_t *frame, xlator_t *this)
{
    afr_local_t *local = frame->local;
    afr_private_t *priv = this->private;
    afr_internal_lock_t *int_lock = &local->internal_lock;
    int lockee_count = int_lock->lockee_count;
    gf_boolean_t all_locked = _gf_true;
    int child = 0;
    int nlockee = 0;

    if (int_lock->lock_count == 0) {
        afr_log_locks_failure(frame, "any subvolume", "lock",
                              int_lock->lock_op_errno);
        return _gf_false;
    }

    /* For FOPS that take multiple sets of locks (mkdir, rename),
     * there must be at least one brick on which the locks from
     * all lock sets were successful. */
    for (child = 0; child < priv->child_count; child++) {
        all_locked = _gf_true;

        /* Stop scanning this child at its first missing lock. */
        for (nlockee = 0; all_locked && nlockee < lockee_count; nlockee++) {
            if (!(int_lock->lockee[nlockee].locked_nodes[child] & LOCKED_YES))
                all_locked = _gf_false;
        }

        if (all_locked)
            return all_locked;
    }

    if (!all_locked)
        afr_log_locks_failure(frame, "all", "lock", int_lock->lock_op_errno);

    return all_locked;
}
/*
 * Drive one step of the serial, blocking lock sequence.
 *
 * cookie is a flat index: cookie / child_count selects the lockee and
 * cookie % child_count selects the child to lock next.
 *
 *  - If local->fd is set and its fd ctx cannot be fetched, the attempt fails:
 *    op_ret/lock_op_ret are set to -1, op_errno/lock_op_errno to EINVAL (the
 *    same values afr_lock_nonblocking() reports for this failure), and held
 *    locks are released.
 *  - Once every expected lock has been attempted, the attempt either fails
 *    (not enough locks; held locks are released) or completes (lock_op_ret
 *    set to 0 and lock_cbk invoked).
 *  - A child that is not up is skipped by moving on to the next flat index.
 *  - Otherwise a blocking lock is wound; afr_lock_cbk() continues the
 *    sequence when the reply arrives.
 *
 * Always returns 0.
 */
int
afr_lock_blocking(call_frame_t *frame, xlator_t *this, int cookie)
{
    afr_local_t *local = frame->local;
    afr_internal_lock_t *int_lock = &local->internal_lock;
    afr_private_t *priv = this->private;
    uint64_t ctx = 0;
    int child_index = cookie % priv->child_count;
    int lockee_num = cookie / priv->child_count;

    if (local->fd && fd_ctx_get(local->fd, this, &ctx) < 0) {
        gf_msg(this->name, GF_LOG_INFO, 0, AFR_MSG_FD_CTX_GET_FAILED,
               "unable to get fd ctx for fd=%p", local->fd);
        local->op_ret = -1;
        int_lock->lock_op_ret = -1;
        /* Report a definite errno together with the failure. */
        local->op_errno = EINVAL;
        int_lock->lock_op_errno = EINVAL;
        afr_unlock_now(frame, this);
        return 0;
    }

    if (int_lock->lk_expected_count == int_lock->lk_attempted_count) {
        if (!is_blocking_locks_count_sufficient(frame, this)) {
            local->op_ret = -1;
            int_lock->lock_op_ret = -1;
            afr_unlock_now(frame, this);
            return 0;
        }

        /* we're done locking */
        gf_msg_debug(this->name, 0, "we're done locking");
        int_lock->lock_op_ret = 0;
        int_lock->lock_cbk(frame, this);
        return 0;
    }

    if (!_is_lock_wind_needed(local, child_index)) {
        afr_lock_blocking(frame, this, cookie + 1);
        return 0;
    }

    afr_internal_lock_wind(frame, afr_lock_cbk, (void *)(long)cookie,
                           child_index, lockee_num, _gf_true, _gf_false);
    return 0;
}
/*
 * Start the serial, blocking lock sequence for the frame.
 *
 * The number of expected locks (and lk_call_count) is set to
 * lockee_count * number of up children, the per-attempt lock state is reset,
 * and the sequence is started at flat index 0.
 *
 * Always returns 0.
 */
int32_t
afr_blocking_lock(call_frame_t *frame, xlator_t *this)
{
    afr_private_t *priv = this->private;
    afr_local_t *local = frame->local;
    afr_internal_lock_t *int_lock = &local->internal_lock;
    int up_count = 0;

    up_count = AFR_COUNT(local->child_up, priv->child_count);

    int_lock->lk_expected_count = (int_lock->lockee_count * up_count);
    int_lock->lk_call_count = int_lock->lk_expected_count;

    initialize_internal_lock_variables(frame, this);

    afr_lock_blocking(frame, this, 0);

    return 0;
}
/*
 * Reply handler for the non-blocking lock requests wound by
 * afr_lock_nonblocking().
 *
 * The cookie is a flat index: cookie / child_count selects the lockee and
 * cookie % child_count selects the child. A successful reply marks the pair
 * LOCKED_YES and bumps the counters; an ENOSYS failure records op_ret and
 * op_errno in both local and the internal lock. When the last reply arrives,
 * lock_cbk is invoked if every expected lock was obtained; otherwise the
 * locks that were obtained are released via afr_unlock_now().
 *
 * Always returns 0.
 */
static int32_t
afr_nb_internal_lock_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                         int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
    afr_private_t *priv = this->private;
    afr_local_t *local = frame->local;
    afr_internal_lock_t *int_lock = &local->internal_lock;
    long slot = (long)cookie;
    int child_index = slot % priv->child_count;
    int lockee_num = slot / priv->child_count;
    int pending = 0;

    if (op_ret == 0 && local->transaction.type == AFR_DATA_TRANSACTION) {
        LOCK(&local->inode->lock);
        {
            local->inode_ctx->lock_count++;
        }
        UNLOCK(&local->inode->lock);
    }

    LOCK(&frame->lock);
    {
        if (op_ret < 0) {
            if (op_errno == ENOSYS) {
                /* return ENOTSUP */
                gf_msg(this->name, GF_LOG_ERROR, ENOSYS,
                       AFR_MSG_LOCK_XLATOR_NOT_LOADED,
                       "subvolume does not support "
                       "locking. please load features/locks"
                       " xlator on server");
                local->op_ret = op_ret;
                int_lock->lock_op_ret = op_ret;
                int_lock->lock_op_errno = op_errno;
                local->op_errno = op_errno;
            }
        } else if (op_ret == 0) {
            int_lock->lockee[lockee_num].locked_nodes[child_index] |=
                LOCKED_YES;
            int_lock->lockee[lockee_num].locked_count++;
            int_lock->lock_count++;
        }
        pending = --int_lock->lk_call_count;
    }
    UNLOCK(&frame->lock);

    /* Replies are still outstanding: nothing more to do for this one. */
    if (pending != 0)
        return 0;

    gf_msg_trace(this->name, 0, "Last locking reply received");

    if (int_lock->lock_count == int_lock->lk_expected_count) {
        /* all locks successful. Proceed to call FOP */
        gf_msg_trace(this->name, 0, "All servers locked. Calling the cbk");
        int_lock->lock_op_ret = 0;
        int_lock->lock_cbk(frame, this);
    } else {
        /* Not all locks were successful. Unlock and try locking
           again, this time with serially blocking locks */
        gf_msg_trace(this->name, 0,
                     "%d servers locked. Trying again "
                     "with blocking calls",
                     int_lock->lock_count);
        afr_unlock_now(frame, this);
    }

    return 0;
}
/*
 * Send non-blocking lock requests for every lockee to every up child.
 *
 * The per-attempt lock state is reset first. If local->fd is set but has no
 * AFR fd ctx, the attempt fails with EINVAL, afr_unlock_now() is called and
 * -1 is returned. If there is nothing to lock (no lockees or no up
 * children), afr_unlock_now() is called and 0 is returned. Otherwise one
 * request per (lockee, up child) pair is wound with
 * afr_nb_internal_lock_cbk() as the reply handler, and 0 is returned.
 */
int
afr_lock_nonblocking(call_frame_t *frame, xlator_t *this)
{
    afr_local_t *local = frame->local;
    afr_internal_lock_t *int_lock = &local->internal_lock;
    afr_private_t *priv = this->private;
    afr_fd_ctx_t *fd_ctx = NULL;
    int32_t remaining = 0;
    int lockee_num = 0;
    int child = 0;
    int slot = 0;

    initialize_internal_lock_variables(frame, this);

    if (local->fd) {
        fd_ctx = afr_fd_ctx_get(local->fd, this);
        if (!fd_ctx) {
            gf_msg(this->name, GF_LOG_INFO, 0, AFR_MSG_FD_CTX_GET_FAILED,
                   "unable to get fd ctx for fd=%p", local->fd);
            local->op_ret = -1;
            int_lock->lock_op_ret = -1;
            local->op_errno = EINVAL;
            int_lock->lock_op_errno = EINVAL;
            afr_unlock_now(frame, this);
            return -1;
        }
    }

    remaining = int_lock->lockee_count * internal_lock_count(frame, this);
    int_lock->lk_call_count = remaining;
    int_lock->lk_expected_count = remaining;

    if (remaining == 0) {
        gf_msg(this->name, GF_LOG_INFO, 0, AFR_MSG_INFO_COMMON,
               "fd not open on any subvolumes. aborting.");
        afr_unlock_now(frame, this);
        return 0;
    }

    /* Send non-blocking lock calls only on up children
       and where the fd has been opened */
    for (slot = 0; slot < int_lock->lockee_count * priv->child_count; slot++) {
        child = slot % priv->child_count;
        lockee_num = slot / priv->child_count;

        if (!local->child_up[child])
            continue;

        afr_internal_lock_wind(frame, afr_nb_internal_lock_cbk,
                               (void *)(long)slot, child, lockee_num,
                               _gf_false, _gf_false);

        /* Leave the loop right after the last expected wind, without
         * evaluating the loop condition again. */
        if (--remaining == 0)
            break;
    }

    return 0;
}
/*
 * Release the internal locks of a transaction, honouring eager locking.
 *
 * Without eager locking the locks are released immediately. With eager
 * locking the transaction is removed from the lock's owner list (under the
 * inode lock); if no owners and no post-op entries remain, the transaction
 * is flagged for eager unlock. A transaction flagged for eager unlock
 * releases the locks; any other transaction just invokes lock_cbk.
 *
 * Always returns 0.
 */
int32_t
afr_unlock(call_frame_t *frame, xlator_t *this)
{
    afr_local_t *local = frame->local;
    afr_lock_t *lock = NULL;

    if (!local->transaction.eager_lock_on) {
        afr_unlock_now(frame, this);
        return 0;
    }

    lock = &local->inode_ctx->lock[local->transaction.type];

    LOCK(&local->inode->lock);
    {
        list_del_init(&local->transaction.owner_list);
        if (list_empty(&lock->owners) && list_empty(&lock->post_op)) {
            local->transaction.do_eager_unlock = _gf_true;
            /*TODO: Need to get metadata use on_disk and inherit/uninherit
             *GF_ASSERT (!local->inode_ctx->on_disk[local->transaction.type]);
             *GF_ASSERT (!local->inode_ctx->inherited[local->transaction.type]);
             */
            GF_ASSERT(lock->release);
        }
    }
    UNLOCK(&local->inode->lock);

    if (local->transaction.do_eager_unlock)
        afr_unlock_now(frame, this);
    else
        local->internal_lock.lock_cbk(frame, this);

    return 0;
}