/*
Copyright (c) 2006-2012 Red Hat, Inc. <http://www.redhat.com>
This file is part of GlusterFS.
This file is licensed to you under your choice of the GNU Lesser
General Public License, version 3 or any later version (LGPLv3 or
later), or the GNU General Public License, version 2 (GPLv2), in all
cases as published by the Free Software Foundation.
*/
#include <glusterfs/glusterfs.h>
#include <glusterfs/compat.h>
#include <glusterfs/xlator.h>
#include <glusterfs/logging.h>
#include <glusterfs/common-utils.h>
#include <glusterfs/list.h>
#include <glusterfs/upcall-utils.h>
#include "locks.h"
#include "clear.h"
#include "common.h"
#include "pl-messages.h"
void
__pl_entrylk_unref(pl_entry_lock_t *lock)
{
lock->ref--;
if (!lock->ref) {
GF_FREE((char *)lock->basename);
GF_FREE(lock->connection_id);
GF_FREE(lock);
}
}
static void
__pl_entrylk_ref(pl_entry_lock_t *lock)
{
lock->ref++;
}
static pl_entry_lock_t *
new_entrylk_lock(pl_inode_t *pinode, const char *basename, entrylk_type type,
const char *domain, call_frame_t *frame, char *conn_id)
{
pl_entry_lock_t *newlock = NULL;
newlock = GF_CALLOC(1, sizeof(pl_entry_lock_t),
gf_locks_mt_pl_entry_lock_t);
if (!newlock) {
goto out;
}
newlock->basename = basename ? gf_strdup(basename) : NULL;
newlock->type = type;
newlock->client = frame->root->client;
newlock->client_pid = frame->root->pid;
newlock->volume = domain;
newlock->owner = frame->root->lk_owner;
newlock->frame = frame;
newlock->this = frame->this;
if (conn_id) {
newlock->connection_id = gf_strdup(conn_id);
}
INIT_LIST_HEAD(&newlock->domain_list);
INIT_LIST_HEAD(&newlock->blocked_locks);
INIT_LIST_HEAD(&newlock->client_list);
__pl_entrylk_ref(newlock);
out:
return newlock;
}
/**
* all_names - does a basename represent all names?
* @basename: name to check
*/
#define all_names(basename) ((basename == NULL) ? 1 : 0)
/**
* names_conflict - do two names conflict?
* @n1: name
* @n2: name
*/
static int
names_conflict(const char *n1, const char *n2)
{
return all_names(n1) || all_names(n2) || !strcmp(n1, n2);
}
static int
__same_entrylk_owner(pl_entry_lock_t *l1, pl_entry_lock_t *l2)
{
return (is_same_lkowner(&l1->owner, &l2->owner) &&
(l1->client == l2->client));
}
/* Just as in inodelk, allow conflicting name locks from same (lk_owner, conn)*/
static int
__conflicting_entrylks(pl_entry_lock_t *l1, pl_entry_lock_t *l2)
{
if (names_conflict(l1->basename, l2->basename) &&
!__same_entrylk_owner(l1, l2))
return 1;
return 0;
}
/* See comments in inodelk.c for details */
static inline gf_boolean_t
__stale_entrylk(xlator_t *this, pl_entry_lock_t *candidate_lock,
pl_entry_lock_t *requested_lock, time_t *lock_age_sec)
{
posix_locks_private_t *priv = NULL;
struct timeval curr;
priv = this->private;
/* Question: Should we just prune them all given the
* chance? Or just the locks we are attempting to acquire?
*/
if (names_conflict(candidate_lock->basename, requested_lock->basename)) {
gettimeofday(&curr, NULL);
*lock_age_sec = curr.tv_sec - candidate_lock->granted_time.tv_sec;
if (*lock_age_sec > priv->revocation_secs)
return _gf_true;
}
return _gf_false;
}
/* See comments in inodelk.c for details */
static gf_boolean_t
__entrylk_prune_stale(xlator_t *this, pl_inode_t *pinode, pl_dom_list_t *dom,
pl_entry_lock_t *lock)
{
posix_locks_private_t *priv = NULL;
pl_entry_lock_t *tmp = NULL;
pl_entry_lock_t *lk = NULL;
gf_boolean_t revoke_lock = _gf_false;
int bcount = 0;
int gcount = 0;
int op_errno = 0;
clrlk_args args;
args.opts = NULL;
time_t lk_age_sec = 0;
uint32_t max_blocked = 0;
char *reason_str = NULL;
priv = this->private;
args.type = CLRLK_ENTRY;
if (priv->revocation_clear_all == _gf_true)
args.kind = CLRLK_ALL;
else
args.kind = CLRLK_GRANTED;
if (list_empty(&dom->entrylk_list))
goto out;
pthread_mutex_lock(&pinode->mutex);
lock->pinode = pinode;
list_for_each_entry_safe(lk, tmp, &dom->entrylk_list, domain_list)
{
if (__stale_entrylk(this, lk, lock, &lk_age_sec) == _gf_true) {
revoke_lock = _gf_true;
reason_str = "age";
break;
}
}
max_blocked = priv->revocation_max_blocked;
if (max_blocked != 0 && revoke_lock == _gf_false) {
list_for_each_entry_safe(lk, tmp, &dom->blocked_entrylks, blocked_locks)
{
max_blocked--;
if (max_blocked == 0) {
revoke_lock = _gf_true;
reason_str = "max blocked";
break;
}
}
}
pthread_mutex_unlock(&pinode->mutex);
out:
if (revoke_lock == _gf_true) {
clrlk_clear_entrylk(this, pinode, dom, &args, &bcount, &gcount,
&op_errno);
gf_log(this->name, GF_LOG_WARNING,
"Lock revocation [reason: %s; gfid: %s; domain: %s; "
"age: %ld sec] - Entry lock revoked: %d granted & %d "
"blocked locks cleared",
reason_str, uuid_utoa(pinode->gfid), dom->domain, lk_age_sec,
gcount, bcount);
}
return revoke_lock;
}
void
entrylk_contention_notify_check(xlator_t *this, pl_entry_lock_t *lock,
struct timespec *now, struct list_head *contend)
{
posix_locks_private_t *priv;
int64_t elapsed;
priv = this->private;
/* If this lock is in a list, it means that we are about to send a
* notification for it, so no need to do anything else. */
if (!list_empty(&lock->contend)) {
return;
}
elapsed = now->tv_sec;
elapsed -= lock->contention_time.tv_sec;
if (now->tv_nsec < lock->contention_time.tv_nsec) {
elapsed--;
}
if (elapsed < priv->notify_contention_delay) {
return;
}
/* All contention notifications will be sent outside of the locked
* region. This means that currently granted locks might have already
* been unlocked by that time. To avoid the lock or the inode to be
* destroyed before we process them, we take an additional reference
* on both. */
inode_ref(lock->pinode->inode);
__pl_entrylk_ref(lock);
lock->contention_time = *now;
list_add_tail(&lock->contend, contend);
}
void
entrylk_contention_notify(xlator_t *this, struct list_head *contend)
{
struct gf_upcall up;
struct gf_upcall_entrylk_contention lc;
pl_entry_lock_t *lock;
pl_inode_t *pl_inode;
client_t *client;
gf_boolean_t notify;
while (!list_empty(contend)) {
lock = list_first_entry(contend, pl_entry_lock_t, contend);
pl_inode = lock->pinode;
pthread_mutex_lock(&pl_inode->mutex);
/* If the lock has already been released, no notification is
* sent. We clear the notification time in this case. */
notify = !list_empty(&lock->domain_list);
if (!notify) {
lock->contention_time.tv_sec = 0;
lock->contention_time.tv_nsec = 0;
} else {
lc.type = lock->type;
lc.name = lock->basename;
lc.pid = lock->client_pid;
lc.domain = lock->volume;
lc.xdata = NULL;
gf_uuid_copy(up.gfid, lock->pinode->gfid);
client = (client_t *)lock->client;
if (client == NULL) {
/* A NULL client can be found if the entrylk
* was issued by a server side xlator. */
up.client_uid = NULL;
} else {
up.client_uid = client->client_uid;
}
}
pthread_mutex_unlock(&pl_inode->mutex);
if (notify) {
up.event_type = GF_UPCALL_ENTRYLK_CONTENTION;
up.data = &lc;
if (this->notify(this, GF_EVENT_UPCALL, &up) < 0) {
gf_msg_debug(this->name, 0,
"Entrylk contention notification "
"failed");
} else {
gf_msg_debug(this->name, 0,
"Entrylk contention notification "
"sent");
}
}
pthread_mutex_lock(&pl_inode->mutex);
list_del_init(&lock->contend);
__pl_entrylk_unref(lock);
pthread_mutex_unlock(&pl_inode->mutex);
inode_unref(pl_inode->inode);
}
}
/**
* entrylk_grantable - is this lock grantable?
* @inode: inode in which to look
* @basename: name we're trying to lock
* @type: type of lock
*/
static pl_entry_lock_t *
__entrylk_grantable(xlator_t *this, pl_dom_list_t *dom, pl_entry_lock_t *lock,
struct timespec *now, struct list_head *contend)
{
pl_entry_lock_t *tmp = NULL;
pl_entry_lock_t *ret = NULL;
list_for_each_entry(tmp, &dom->entrylk_list, domain_list)
{
if (__conflicting_entrylks(tmp, lock)) {
if (ret == NULL) {
ret = tmp;
if (contend == NULL) {
break;
}
}
entrylk_contention_notify_check(this, tmp, now, contend);
}
}
return ret;
}
static pl_entry_lock_t *
__blocked_entrylk_conflict(pl_dom_list_t *dom, pl_entry_lock_t *lock)
{
pl_entry_lock_t *tmp = NULL;
list_for_each_entry(tmp, &dom->blocked_entrylks, blocked_locks)
{
if (names_conflict(tmp->basename, lock->basename))
return lock;
}
return NULL;
}
static int
__owner_has_lock(pl_dom_list_t *dom, pl_entry_lock_t *newlock)
{
pl_entry_lock_t *lock = NULL;
list_for_each_entry(lock, &dom->entrylk_list, domain_list)
{
if (__same_entrylk_owner(lock, newlock))
return 1;
}
list_for_each_entry(lock, &dom->blocked_entrylks, blocked_locks)
{
if (__same_entrylk_owner(lock, newlock))
return 1;
}
return 0;
}
static int
names_equal(const char *n1, const char *n2)
{
return (n1 == NULL && n2 == NULL) || (n1 && n2 && !strcmp(n1, n2));
}
void
pl_print_entrylk(char *str, int size, entrylk_cmd cmd, entrylk_type type,
const char *basename, const char *domain)
{
char *cmd_str = NULL;
char *type_str = NULL;
switch (cmd) {
case ENTRYLK_LOCK:
cmd_str = "LOCK";
break;
case ENTRYLK_LOCK_NB:
cmd_str = "LOCK_NB";
break;
case ENTRYLK_UNLOCK:
cmd_str = "UNLOCK";
break;
default:
cmd_str = "UNKNOWN";
break;
}
switch (type) {
case ENTRYLK_RDLCK:
type_str = "READ";
break;
case ENTRYLK_WRLCK:
type_str = "WRITE";
break;
default:
type_str = "UNKNOWN";
break;
}
snprintf(str, size,
"lock=ENTRYLK, cmd=%s, type=%s, basename=%s, domain: %s", cmd_str,
type_str, basename, domain);
}
void
entrylk_trace_in(xlator_t *this, call_frame_t *frame, const char *domain,
fd_t *fd, loc_t *loc, const char *basename, entrylk_cmd cmd,
entrylk_type type)
{
posix_locks_private_t *priv = NULL;
char pl_locker[256];
char pl_lockee[256];
char pl_entrylk[256];
priv = this->private;
if (!priv->trace)
return;
pl_print_locker(pl_locker, 256, this, frame);
pl_print_lockee(pl_lockee, 256, fd, loc);
pl_print_entrylk(pl_entrylk, 256, cmd, type, basename, domain);
gf_log(this->name, GF_LOG_INFO,
"[REQUEST] Locker = {%s} Lockee = {%s} Lock = {%s}", pl_locker,
pl_lockee, pl_entrylk);
}
void
entrylk_trace_out(xlator_t *this, call_frame_t *frame, const char *domain,
fd_t *fd, loc_t *loc, const char *basename, entrylk_cmd cmd,
entrylk_type type, int op_ret, int op_errno)
{
posix_locks_private_t *priv = NULL;
char pl_locker[256];
char pl_lockee[256];
char pl_entrylk[256];
char verdict[32];
priv = this->private;
if (!priv->trace)
return;
pl_print_locker(pl_locker, 256, this, frame);
pl_print_lockee(pl_lockee, 256, fd, loc);
pl_print_entrylk(pl_entrylk, 256, cmd, type, basename, domain);
pl_print_verdict(verdict, 32, op_ret, op_errno);
gf_log(this->name, GF_LOG_INFO,
"[%s] Locker = {%s} Lockee = {%s} Lock = {%s}", verdict, pl_locker,
pl_lockee, pl_entrylk);
}
void
entrylk_trace_block(xlator_t *this, call_frame_t *frame, const char *volume,
fd_t *fd, loc_t *loc, const char *basename, entrylk_cmd cmd,
entrylk_type type)
{
posix_locks_private_t *priv = NULL;
char pl_locker[256];
char pl_lockee[256];
char pl_entrylk[256];
priv = this->private;
if (!priv->trace)
return;
pl_print_locker(pl_locker, 256, this, frame);
pl_print_lockee(pl_lockee, 256, fd, loc);
pl_print_entrylk(pl_entrylk, 256, cmd, type, basename, volume);
gf_log(this->name, GF_LOG_INFO,
"[BLOCKED] Locker = {%s} Lockee = {%s} Lock = {%s}", pl_locker,
pl_lockee, pl_entrylk);
}
/**
* __find_most_matching_lock - find the lock struct which most matches in order
* of: lock on the exact basename || an all_names lock
*
*
* @inode: inode in which to look
* @basename: name to search for
*/
static pl_entry_lock_t *
__find_most_matching_lock(pl_dom_list_t *dom, const char *basename)
{
pl_entry_lock_t *lock;
pl_entry_lock_t *all = NULL;
pl_entry_lock_t *exact = NULL;
if (list_empty(&dom->entrylk_list))
return NULL;
list_for_each_entry(lock, &dom->entrylk_list, domain_list)
{
if (all_names(lock->basename))
all = lock;
else if (names_equal(lock->basename, basename))
exact = lock;
}
return (exact ? exact : all);
}
static pl_entry_lock_t *
__find_matching_lock(pl_dom_list_t *dom, pl_entry_lock_t *lock)
{
pl_entry_lock_t *tmp = NULL;
list_for_each_entry(tmp, &dom->entrylk_list, domain_list)
{
if (names_equal(lock->basename, tmp->basename) &&
__same_entrylk_owner(lock, tmp) && (lock->type == tmp->type))
return tmp;
}
return NULL;
}
static int
__lock_blocked_add(xlator_t *this, pl_inode_t *pinode, pl_dom_list_t *dom,
pl_entry_lock_t *lock, int nonblock)
{
struct timeval now;
if (nonblock)
goto out;
gettimeofday(&now, NULL);
lock->blkd_time = now;
list_add_tail(&lock->blocked_locks, &dom->blocked_entrylks);
gf_msg_trace(this->name, 0, "Blocking lock: {pinode=%p, basename=%s}",
pinode, lock->basename);
entrylk_trace_block(this, lock->frame, NULL, NULL, NULL, lock->basename,
ENTRYLK_LOCK, lock->type);
out:
return -EAGAIN;
}
/**
* __lock_entrylk - lock a name in a directory
* @inode: inode for the directory in which to lock
* @basename: name of the entry to lock
* if null, lock the entire directory
*
* the entire directory being locked is represented as: a single
* pl_entry_lock_t present in the entrylk_locks list with its
* basename = NULL
*/
int
__lock_entrylk(xlator_t *this, pl_inode_t *pinode, pl_entry_lock_t *lock,
int nonblock, pl_dom_list_t *dom, struct timespec *now,
struct list_head *contend)
{
pl_entry_lock_t *conf = NULL;
int ret = -EAGAIN;
conf = __entrylk_grantable(this, dom, lock, now, contend);
if (conf) {
ret = __lock_blocked_add(this, pinode, dom, lock, nonblock);
goto out;
}
/* To prevent blocked locks starvation, check if there are any blocked
* locks thay may conflict with this lock. If there is then don't grant
* the lock. BUT grant the lock if the owner already has lock to allow
* nested locks.
* Example: SHD from Machine1 takes (gfid, basename=257-length-name)
* and is granted.
* SHD from machine2 takes (gfid, basename=NULL) and is blocked.
* When SHD from Machine1 takes (gfid, basename=NULL) it needs to be
* granted, without which self-heal can't progress.
* TODO: Find why 'owner_has_lock' is checked even for blocked locks.
*/
if (__blocked_entrylk_conflict(dom, lock) &&
!(__owner_has_lock(dom, lock))) {
if (nonblock == 0) {
gf_log(this->name, GF_LOG_DEBUG,
"Lock is grantable, but blocking to prevent "
"starvation");
}
ret = __lock_blocked_add(this, pinode, dom, lock, nonblock);
goto out;
}
__pl_entrylk_ref(lock);
gettimeofday(&lock->granted_time, NULL);
list_add(&lock->domain_list, &dom->entrylk_list);
ret = 0;
out:
return ret;
}
/**
* __unlock_entrylk - unlock a name in a directory
* @inode: inode for the directory to unlock in
* @basename: name of the entry to unlock
* if null, unlock the entire directory
*/
pl_entry_lock_t *
__unlock_entrylk(pl_dom_list_t *dom, pl_entry_lock_t *lock)
{
pl_entry_lock_t *ret_lock = NULL;
ret_lock = __find_matching_lock(dom, lock);
if (ret_lock) {
list_del_init(&ret_lock->domain_list);
} else {
gf_log("locks", GF_LOG_ERROR,
"unlock on %s "
"(type=ENTRYLK_WRLCK) attempted but no matching lock "
"found",
lock->basename);
}
return ret_lock;
}
int32_t
check_entrylk_on_basename(xlator_t *this, inode_t *parent, char *basename)
{
int32_t entrylk = 0;
pl_dom_list_t *dom = NULL;
pl_entry_lock_t *conf = NULL;
pl_inode_t *pinode = pl_inode_get(this, parent, NULL);
if (!pinode)
goto out;
pthread_mutex_lock(&pinode->mutex);
{
list_for_each_entry(dom, &pinode->dom_list, inode_list)
{
conf = __find_most_matching_lock(dom, basename);
if (conf && conf->basename) {
entrylk = 1;
break;
}
}
}
pthread_mutex_unlock(&pinode->mutex);
out:
return entrylk;
}
void
__grant_blocked_entry_locks(xlator_t *this, pl_inode_t *pl_inode,
pl_dom_list_t *dom, struct list_head *granted,
struct timespec *now, struct list_head *contend)
{
int bl_ret = 0;
pl_entry_lock_t *bl = NULL;
pl_entry_lock_t *tmp = NULL;
struct list_head blocked_list;
INIT_LIST_HEAD(&blocked_list);
list_splice_init(&dom->blocked_entrylks, &blocked_list);
list_for_each_entry_safe(bl, tmp, &blocked_list, blocked_locks)
{
list_del_init(&bl->blocked_locks);
bl_ret = __lock_entrylk(bl->this, pl_inode, bl, 0, dom, now, contend);
if (bl_ret == 0) {
list_add_tail(&bl->blocked_locks, granted);
}
}
}
/* Grants locks if possible which are blocked on a lock */
void
grant_blocked_entry_locks(xlator_t *this, pl_inode_t *pl_inode,
pl_dom_list_t *dom, struct timespec *now,
struct list_head *contend)
{
struct list_head granted_list;
pl_entry_lock_t *tmp = NULL;
pl_entry_lock_t *lock = NULL;
INIT_LIST_HEAD(&granted_list);
pthread_mutex_lock(&pl_inode->mutex);
{
__grant_blocked_entry_locks(this, pl_inode, dom, &granted_list, now,
contend);
}
pthread_mutex_unlock(&pl_inode->mutex);
list_for_each_entry_safe(lock, tmp, &granted_list, blocked_locks)
{
entrylk_trace_out(this, lock->frame, NULL, NULL, NULL, lock->basename,
ENTRYLK_LOCK, lock->type, 0, 0);
STACK_UNWIND_STRICT(entrylk, lock->frame, 0, 0, NULL);
lock->frame = NULL;
}
pthread_mutex_lock(&pl_inode->mutex);
{
list_for_each_entry_safe(lock, tmp, &granted_list, blocked_locks)
{
list_del_init(&lock->blocked_locks);
__pl_entrylk_unref(lock);
}
}
pthread_mutex_unlock(&pl_inode->mutex);
}
/* Common entrylk code called by pl_entrylk and pl_fentrylk */
int
pl_common_entrylk(call_frame_t *frame, xlator_t *this, const char *volume,
inode_t *inode, const char *basename, entrylk_cmd cmd,
entrylk_type type, loc_t *loc, fd_t *fd, dict_t *xdata)
{
int32_t op_ret = -1;
int32_t op_errno = 0;
int ret = -1;
char unwind = 1;
GF_UNUSED int dict_ret = -1;
pl_inode_t *pinode = NULL;
pl_entry_lock_t *reqlock = NULL;
pl_entry_lock_t *unlocked = NULL;
pl_dom_list_t *dom = NULL;
char *conn_id = NULL;
pl_ctx_t *ctx = NULL;
int nonblock = 0;
gf_boolean_t need_inode_unref = _gf_false;
posix_locks_private_t *priv = NULL;
struct list_head *pcontend = NULL;
struct list_head contend;
struct timespec now = {};
priv = this->private;
if (priv->notify_contention) {
pcontend = &contend;
INIT_LIST_HEAD(pcontend);
timespec_now(&now);
}
if (xdata)
dict_ret = dict_get_str(xdata, "connection-id", &conn_id);
pinode = pl_inode_get(this, inode, NULL);
if (!pinode) {
op_errno = ENOMEM;
goto out;
}
if (frame->root->client) {
ctx = pl_ctx_get(frame->root->client, this);
if (!ctx) {
op_errno = ENOMEM;
gf_log(this->name, GF_LOG_INFO, "pl_ctx_get() failed");
goto unwind;
}
}
dom = get_domain(pinode, volume);
if (!dom) {
op_errno = ENOMEM;
goto out;
}
entrylk_trace_in(this, frame, volume, fd, loc, basename, cmd, type);
reqlock = new_entrylk_lock(pinode, basename, type, dom->domain, frame,
conn_id);
if (!reqlock) {
op_ret = -1;
op_errno = ENOMEM;
goto unwind;
}
/* Ideally, AFTER a successful lock (both blocking and non-blocking) or
* an unsuccessful blocking lock operation, the inode needs to be ref'd.
*
* But doing so might give room to a race where the lock-requesting
* client could send a DISCONNECT just before this thread refs the inode
* after the locking is done, and the epoll thread could unref the inode
* in cleanup which means the inode's refcount would come down to 0, and
* the call to pl_forget() at this point destroys @pinode. Now when
* the io-thread executing this function tries to access pinode,
* it could crash on account of illegal memory access.
*
* To get around this problem, the inode is ref'd once even before
* adding the lock into client_list as a precautionary measure.
* This way even if there are DISCONNECTs, there will always be 1 extra
* ref on the inode, so @pinode is still alive until after the
* current stack unwinds.
*/
pinode->inode = inode_ref(inode);
if (priv->revocation_secs != 0) {
if (cmd != ENTRYLK_UNLOCK) {
__entrylk_prune_stale(this, pinode, dom, reqlock);
} else if (priv->monkey_unlocking == _gf_true) {
if (pl_does_monkey_want_stuck_lock()) {
gf_log(this->name, GF_LOG_WARNING,
"MONKEY LOCKING (forcing stuck lock)!");
op_ret = 0;
need_inode_unref = _gf_true;
pthread_mutex_lock(&pinode->mutex);
{
__pl_entrylk_unref(reqlock);
}
pthread_mutex_unlock(&pinode->mutex);
goto out;
}
}
}
switch (cmd) {
case ENTRYLK_LOCK_NB:
nonblock = 1;
/* fall through */
case ENTRYLK_LOCK:
if (ctx)
pthread_mutex_lock(&ctx->lock);
pthread_mutex_lock(&pinode->mutex);
{
reqlock->pinode = pinode;
ret = __lock_entrylk(this, pinode, reqlock, nonblock, dom, &now,
pcontend);
if (ret == 0) {
reqlock->frame = NULL;
op_ret = 0;
} else {
op_errno = -ret;
}
if (ctx && (!ret || !nonblock))
list_add(&reqlock->client_list, &ctx->entrylk_lockers);
if (ret == -EAGAIN && !nonblock) {
/* blocked */
unwind = 0;
} else {
__pl_entrylk_unref(reqlock);
}
/* For all but the case where a non-blocking lock
* attempt fails, the extra ref taken before the switch
* block must be negated.
*/
if ((ret == -EAGAIN) && (nonblock))
need_inode_unref = _gf_true;
}
pthread_mutex_unlock(&pinode->mutex);
if (ctx)
pthread_mutex_unlock(&ctx->lock);
break;
case ENTRYLK_UNLOCK:
if (ctx)
pthread_mutex_lock(&ctx->lock);
pthread_mutex_lock(&pinode->mutex);
{
/* Irrespective of whether unlock succeeds or not,
* the extra inode ref that was done before the switch
* block must be negated. Towards this,
* @need_inode_unref flag is set unconditionally here.
*/
need_inode_unref = _gf_true;
unlocked = __unlock_entrylk(dom, reqlock);
if (unlocked) {
list_del_init(&unlocked->client_list);
__pl_entrylk_unref(unlocked);
op_ret = 0;
} else {
op_errno = EINVAL;
}
__pl_entrylk_unref(reqlock);
}
pthread_mutex_unlock(&pinode->mutex);
if (ctx)
pthread_mutex_unlock(&ctx->lock);
grant_blocked_entry_locks(this, pinode, dom, &now, pcontend);
break;
default:
need_inode_unref = _gf_true;
gf_log(this->name, GF_LOG_ERROR,
"Unexpected case in entrylk (cmd=%d). Please file"
"a bug report at http://bugs.gluster.com",
cmd);
goto out;
}
/* The following (extra) unref corresponds to the ref that
* was done at the time the lock was granted.
*/
if ((cmd == ENTRYLK_UNLOCK) && (op_ret == 0))
inode_unref(pinode->inode);
out:
if (need_inode_unref)
inode_unref(pinode->inode);
if (unwind) {
entrylk_trace_out(this, frame, volume, fd, loc, basename, cmd, type,
op_ret, op_errno);
unwind:
STACK_UNWIND_STRICT(entrylk, frame, op_ret, op_errno, NULL);
}
if (pcontend != NULL) {
entrylk_contention_notify(this, pcontend);
}
return 0;
}
/**
* pl_entrylk:
*
* Locking on names (directory entries)
*/
int
pl_entrylk(call_frame_t *frame, xlator_t *this, const char *volume, loc_t *loc,
const char *basename, entrylk_cmd cmd, entrylk_type type,
dict_t *xdata)
{
pl_common_entrylk(frame, this, volume, loc->inode, basename, cmd, type, loc,
NULL, xdata);
return 0;
}
/**
* pl_fentrylk:
*
* Locking on names (directory entries)
*/
int
pl_fentrylk(call_frame_t *frame, xlator_t *this, const char *volume, fd_t *fd,
const char *basename, entrylk_cmd cmd, entrylk_type type,
dict_t *xdata)
{
pl_common_entrylk(frame, this, volume, fd->inode, basename, cmd, type, NULL,
fd, xdata);
return 0;
}
static void
pl_entrylk_log_cleanup(pl_entry_lock_t *lock)
{
pl_inode_t *pinode = NULL;
pinode = lock->pinode;
gf_log(THIS->name, GF_LOG_WARNING,
"releasing lock on %s held by "
"{client=%p, pid=%" PRId64 " lk-owner=%s}",
uuid_utoa(pinode->gfid), lock->client, (uint64_t)lock->client_pid,
lkowner_utoa(&lock->owner));
}
/* Release all entrylks from this client */
int
pl_entrylk_client_cleanup(xlator_t *this, pl_ctx_t *ctx)
{
posix_locks_private_t *priv;
pl_entry_lock_t *tmp = NULL;
pl_entry_lock_t *l = NULL;
pl_dom_list_t *dom = NULL;
pl_inode_t *pinode = NULL;
struct list_head *pcontend = NULL;
struct list_head released;
struct list_head unwind;
struct list_head contend;
struct timespec now = {};
INIT_LIST_HEAD(&released);
INIT_LIST_HEAD(&unwind);
priv = this->private;
if (priv->notify_contention) {
pcontend = &contend;
INIT_LIST_HEAD(pcontend);
timespec_now(&now);
}
pthread_mutex_lock(&ctx->lock);
{
list_for_each_entry_safe(l, tmp, &ctx->entrylk_lockers, client_list)
{
pl_entrylk_log_cleanup(l);
pinode = l->pinode;
pthread_mutex_lock(&pinode->mutex);
{
/* If the entrylk object is part of granted list but not
* blocked list, then perform the following actions:
* i. delete the object from granted list;
* ii. grant other locks (from other clients) that may
* have been blocked on this entrylk; and
* iii. unref the object.
*
* If the entrylk object (L1) is part of both granted
* and blocked lists, then this means that a parallel
* unlock on another entrylk (L2 say) may have 'granted'
* L1 and added it to 'granted' list in
* __grant_blocked_entry_locks() (although using the
* 'blocked_locks' member). In that case, the cleanup
* codepath must try and grant other overlapping
* blocked entrylks from other clients, now that L1 is
* out of their way and then unref L1 in the end, and
* leave it to the other thread (the one executing
* unlock codepath) to unwind L1's frame, delete it from
* blocked_locks list, and perform the last unref on L1.
*
* If the entrylk object (L1) is part of blocked list
* only, the cleanup code path must:
* i. delete it from the blocked_locks list inside
* this critical section,
* ii. unwind its frame with EAGAIN,
* iii. try and grant blocked entry locks from other
* clients that were otherwise grantable, but were
* blocked to avoid leaving L1 to starve forever.
* iv. unref the object.
*/
list_del_init(&l->client_list);
if (!list_empty(&l->domain_list)) {
list_del_init(&l->domain_list);
list_add_tail(&l->client_list, &released);
} else {
list_del_init(&l->blocked_locks);
list_add_tail(&l->client_list, &unwind);
}
}
pthread_mutex_unlock(&pinode->mutex);
}
}
pthread_mutex_unlock(&ctx->lock);
if (!list_empty(&unwind)) {
list_for_each_entry_safe(l, tmp, &unwind, client_list)
{
list_del_init(&l->client_list);
if (l->frame)
STACK_UNWIND_STRICT(entrylk, l->frame, -1, EAGAIN, NULL);
list_add_tail(&l->client_list, &released);
}
}
if (!list_empty(&released)) {
list_for_each_entry_safe(l, tmp, &released, client_list)
{
list_del_init(&l->client_list);
pinode = l->pinode;
dom = get_domain(pinode, l->volume);
grant_blocked_entry_locks(this, pinode, dom, &now, pcontend);
pthread_mutex_lock(&pinode->mutex);
{
__pl_entrylk_unref(l);
}
pthread_mutex_unlock(&pinode->mutex);
inode_unref(pinode->inode);
}
}
if (pcontend != NULL) {
entrylk_contention_notify(this, pcontend);
}
return 0;
}
int32_t
__get_entrylk_count(xlator_t *this, pl_inode_t *pl_inode)
{
int32_t count = 0;
pl_entry_lock_t *lock = NULL;
pl_dom_list_t *dom = NULL;
list_for_each_entry(dom, &pl_inode->dom_list, inode_list)
{
list_for_each_entry(lock, &dom->entrylk_list, domain_list) { count++; }
list_for_each_entry(lock, &dom->blocked_entrylks, blocked_locks)
{
count++;
}
}
return count;
}
int32_t
get_entrylk_count(xlator_t *this, inode_t *inode)
{
pl_inode_t *pl_inode = NULL;
uint64_t tmp_pl_inode = 0;
int ret = 0;
int32_t count = 0;
ret = inode_ctx_get(inode, this, &tmp_pl_inode);
if (ret != 0) {
goto out;
}
pl_inode = (pl_inode_t *)(long)tmp_pl_inode;
pthread_mutex_lock(&pl_inode->mutex);
{
count = __get_entrylk_count(this, pl_inode);
}
pthread_mutex_unlock(&pl_inode->mutex);
out:
return count;
}