/*
Copyright (c) 2012-2014 DataLab, s.l. <http://www.datalab.es>
This file is part of GlusterFS.
This file is licensed to you under your choice of the GNU Lesser
General Public License, version 3 or any later version (LGPLv3 or
later), or the GNU General Public License, version 2 (GPLv2), in all
cases as published by the Free Software Foundation.
*/
#include <glusterfs/xlator.h>
#include <glusterfs/defaults.h>
#include <glusterfs/compat-errno.h>
#include <glusterfs/byte-order.h>
#include <glusterfs/syncop.h>
#include <glusterfs/syncop-utils.h>
#include <glusterfs/cluster-syncop.h>
#include "ec.h"
#include "ec-mem-types.h"
#include "ec-types.h"
#include "ec-messages.h"
#include "ec-helpers.h"
#include "ec-common.h"
#include "ec-combine.h"
#include "ec-method.h"
#include "ec-fops.h"
#include "ec-heald.h"
/*
 * EC_COUNT(array, max): evaluates to the number of non-zero elements among
 * the first 'max' entries of 'array' (an int).
 *
 * Both arguments are parenthesized so that expressions such as 'buf + 1' or
 * 'cond ? a : b' can be passed safely. 'array' and 'max' are evaluated on
 * every loop iteration, so they must be free of side effects.
 */
#define EC_COUNT(array, max)                                                   \
    ({                                                                         \
        int __i;                                                               \
        int __res = 0;                                                         \
        for (__i = 0; __i < (max); __i++)                                      \
            if ((array)[__i])                                                  \
                __res++;                                                       \
        __res;                                                                 \
    })
/*
 * EC_INTERSECT(dst, src1, src2, max): for each of the first 'max' indexes,
 * stores into dst[i] the logical AND of src1[i] and src2[i] (so dst[i] is
 * always 0 or 1). 'dst' may alias either source.
 *
 * All arguments are parenthesized so pointer expressions like 'buf + 1' can
 * be used. Arguments are evaluated repeatedly; avoid side effects.
 */
#define EC_INTERSECT(dst, src1, src2, max)                                     \
    ({                                                                         \
        int __i;                                                               \
        for (__i = 0; __i < (max); __i++)                                      \
            (dst)[__i] = (src1)[__i] && (src2)[__i];                           \
    })
/*
 * EC_ADJUST_SOURCE(source, sources, max): if sources[source] is zero, picks
 * a replacement. 'source' (which must be a modifiable lvalue) is set to the
 * highest index below 'max' whose flag is set, or to -1 when no flag is set.
 * If sources[source] is already non-zero, nothing changes.
 *
 * 'source' must be a valid index on entry; it is used to subscript 'sources'
 * before any check. Arguments are parenthesized so that expressions can be
 * passed safely; they are evaluated more than once.
 */
#define EC_ADJUST_SOURCE(source, sources, max)                                 \
    ({                                                                         \
        int __i;                                                               \
        if ((sources)[(source)] == 0) {                                        \
            (source) = -1;                                                     \
            for (__i = 0; __i < (max); __i++)                                  \
                if ((sources)[__i])                                            \
                    (source) = __i;                                            \
        }                                                                      \
    })
/*
 * IA_EQUAL(f, s, field): non-zero when member 'ia_<field>' of the two
 * structures compares byte-for-byte equal. The comparison length is the size
 * of the member in 's'.
 *
 * 'f' and 's' are parenthesized so that expressions such as '*ptr' or
 * 'array[i]' can be passed directly.
 */
#define IA_EQUAL(f, s, field)                                                  \
    (memcmp(&((f).ia_##field), &((s).ia_##field), sizeof((s).ia_##field)) == 0)
/*
 * EC_REPLIES_ALLOC(replies, numsubvols): assigns to 'replies' an array of
 * 'numsubvols' elements obtained from alloca0() and initializes the
 * 'entries.list' list head of each element.
 *
 * Because the storage comes from alloca0(), it must not be used after the
 * calling function returns. Arguments are parenthesized so that expressions
 * such as 'a + b' yield the intended size; they are evaluated more than once.
 */
#define EC_REPLIES_ALLOC(replies, numsubvols)                                  \
    do {                                                                       \
        int __i = 0;                                                           \
        (replies) = alloca0((numsubvols) * sizeof(*(replies)));                \
        for (__i = 0; __i < (numsubvols); __i++)                               \
            INIT_LIST_HEAD(&(replies)[__i].entries.list);                      \
    } while (0)
/*
 * Context handed to the name-heal callbacks in this file (passed as the
 * opaque 'data' pointer to dict_foreach callbacks and to
 * ec_name_heal_handler). The unsigned char arrays are used as one flag per
 * subvolume, indexed 0..ec->nodes-1.
 */
struct ec_name_data {
    /* Frame used to issue the cluster fops; frame->this->private is the ec_t */
    call_frame_t *frame;
    /* Subvolumes still taking part in the heal; cleared when an op fails */
    unsigned char *participants;
    /* Set by ec_name_heal_handler for participants a name heal failed on */
    unsigned char *failed_on;
    /* Supplied by ec_delete_stale_names from its 'gfidless' argument */
    unsigned char *gfidless;
    /* Subvolumes on which the name is absent (also set after a stale
     * name has been removed) */
    unsigned char *enoent;
    /* Flag array stored as a dict value, filled in by _assign_same */
    unsigned char *same;
    /* Entry name being healed */
    char *name;
    /* Directory inode that contains 'name' */
    inode_t *parent;
    /* Lookup replies supplied by the caller of ec_delete_stale_names */
    default_args_cbk_t *replies;
};
/* NULL-terminated list of xattr keys that ec_ignorable_key_match() treats as
 * ignorable, in addition to every key starting with EC_XATTR_PREFIX. */
static char *ec_ignore_xattrs[] = {GF_SELINUX_XATTR_KEY, QUOTA_SIZE_KEY, NULL};
/*
 * Dict match callback: tells whether 'key' names an xattr that heal should
 * ignore.
 *
 * Returns _gf_true when 'key' starts with EC_XATTR_PREFIX or is exactly one of
 * the entries of ec_ignore_xattrs; returns _gf_false otherwise, including
 * when 'key' is NULL. 'dict', 'val' and 'mdata' are not used.
 */
static gf_boolean_t
ec_ignorable_key_match(dict_t *dict, char *key, data_t *val, void *mdata)
{
    char **ignored = NULL;

    if (key == NULL)
        return _gf_false;

    if (strncmp(key, EC_XATTR_PREFIX, SLEN(EC_XATTR_PREFIX)) == 0)
        return _gf_true;

    for (ignored = ec_ignore_xattrs; *ignored != NULL; ignored++) {
        if (strcmp(key, *ignored) == 0)
            return _gf_true;
    }

    return _gf_false;
}
/*
 * Dict match callback selecting the keys that matter for self-heal: the
 * logical negation of ec_ignorable_key_match() for the same arguments.
 */
static gf_boolean_t
ec_sh_key_match(dict_t *dict, char *key, data_t *val, void *mdata)
{
    gf_boolean_t ignorable = ec_ignorable_key_match(dict, key, val, mdata);

    return !ignorable;
}
/* FOP: heal */
/*
 * Increments the heal counter kept in the EC inode context of fop->loc[0].
 *
 * Does nothing when 'fop' is NULL or when __ec_inode_get() returns no
 * context. The counter is updated while holding the inode lock.
 */
void
ec_set_entry_healing(ec_fop_data_t *fop)
{
    loc_t *target = NULL;
    ec_inode_t *ictx = NULL;

    if (fop == NULL)
        return;

    target = &fop->loc[0];

    LOCK(&target->inode->lock);
    {
        ictx = __ec_inode_get(target->inode, fop->xl);
        if (ictx != NULL)
            ictx->heal_count++;
    }
    UNLOCK(&target->inode->lock);
}
/*
 * Decrements the heal counter kept in the EC inode context of fop->loc[0]
 * (the counterpart of ec_set_entry_healing()).
 *
 * Does nothing when 'fop' is NULL. After releasing the inode lock it asserts
 * that the counter did not become negative; if no context was found the
 * asserted value is 0.
 */
void
ec_reset_entry_healing(ec_fop_data_t *fop)
{
    loc_t *target = NULL;
    ec_inode_t *ictx = NULL;
    int32_t heal_count = 0;

    if (fop == NULL)
        return;

    target = &fop->loc[0];

    LOCK(&target->inode->lock);
    {
        ictx = __ec_inode_get(target->inode, fop->xl);
        if (ictx != NULL) {
            ictx->heal_count--;
            heal_count = ictx->heal_count;
        }
    }
    UNLOCK(&target->inode->lock);

    GF_ASSERT(heal_count >= 0);
}
/*
 * Splits the answers collected in fop->cbk_list into two subvolume masks.
 *
 * Returns the OR of the masks of every answer whose op_ret is negative. When
 * 'pgood' is not NULL, *pgood receives the OR of the masks of every answer
 * whose op_ret is zero or positive.
 */
uintptr_t
ec_heal_check(ec_fop_data_t *fop, uintptr_t *pgood)
{
    ec_cbk_data_t *answer;
    uintptr_t succeeded = 0;
    uintptr_t failed = 0;

    list_for_each_entry(answer, &fop->cbk_list, list)
    {
        if (answer->op_ret >= 0)
            succeeded |= answer->mask;
        else
            failed |= answer->mask;
    }

    if (pgood != NULL)
        *pgood = succeeded;

    return failed;
}
/*
 * Folds the outcome of 'fop' into the heal state stored in fop->data.
 *
 * Subvolumes reported as failed by ec_heal_check() are removed from
 * heal->bad; when 'is_open' is non-zero the successful ones are added to
 * heal->open. Both updates happen under heal->lock. fop->error is then
 * cleared.
 */
void
ec_heal_update(ec_fop_data_t *fop, int32_t is_open)
{
    ec_heal_t *heal = fop->data;
    uintptr_t succeeded = 0;
    uintptr_t failed = 0;

    failed = ec_heal_check(fop, &succeeded);

    LOCK(&heal->lock);
    {
        heal->bad &= ~failed;
        if (is_open)
            heal->open |= succeeded;
    }
    UNLOCK(&heal->lock);

    fop->error = 0;
}
/*
 * Removes from heal->good (under heal->lock) every subvolume that
 * ec_heal_check() reports as having failed 'fop'. The heal state is taken
 * from fop->data.
 */
void
ec_heal_avoid(ec_fop_data_t *fop)
{
    ec_heal_t *heal = fop->data;
    uintptr_t failed = ec_heal_check(fop, NULL);

    LOCK(&heal->lock);
    {
        heal->good &= ~failed;
    }
    UNLOCK(&heal->lock);
}
/*
 * Callback used by ec_heal_lock() for lock requests (everything except
 * F_UNLCK).
 *
 * 'cookie' is the ec_fop_data_t of the lock fop; its 'data' member is the
 * ec_heal_t. When the lock succeeded (op_ret >= 0) the inode size
 * information is set to heal->total_size through ec_set_inode_size().
 * Always returns 0.
 *
 * The call to ec_set_inode_size() is made outside the assertion macro so the
 * update is performed even in a build where assertions expand to nothing;
 * only its result is asserted.
 */
int32_t
ec_heal_lock_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                 int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
    ec_fop_data_t *fop = cookie;
    ec_heal_t *heal = fop->data;
    int size_set = 0;

    if (op_ret >= 0) {
        size_set = ec_set_inode_size(heal->fop, heal->fd->inode,
                                     heal->total_size);
        GF_ASSERT(size_set);
        /* Keeps the variable "used" if GF_ASSERT compiles to nothing. */
        (void)size_set;
    }

    return 0;
}
/*
 * Issues an inode lock or unlock request for a heal operation.
 *
 * heal   - heal state; supplies the frame, xlator, target mask and domain
 *          (heal->xl->name).
 * type   - lock type stored in the flock (F_UNLCK releases the lock).
 * fd     - when not NULL the request is sent with ec_finodelk() on this fd,
 *          otherwise ec_inodelk() is used on 'loc'.
 * loc    - location used for the path-based request.
 * offset - start of the locked range (from the beginning of the file).
 * size   - length of the locked range.
 *
 * For F_UNLCK the cached inode information is cleared before the request is
 * sent and ec_lock_unlocked is used as callback; for any other type
 * ec_heal_lock_cbk is used so the size information gets updated. The request
 * is always sent with the F_SETLKW command and EC_MINIMUM_ALL.
 */
void
ec_heal_lock(ec_heal_t *heal, int32_t type, fd_t *fd, loc_t *loc, off_t offset,
             size_t size)
{
    struct gf_flock flock;
    fop_inodelk_cbk_t cbk = NULL;

    flock.l_type = type;
    flock.l_whence = SEEK_SET;
    flock.l_start = offset;
    flock.l_len = size;
    flock.l_pid = 0;
    flock.l_owner.len = 0;

    if (type != F_UNLCK) {
        /* Locking: the callback records the size information. */
        cbk = ec_heal_lock_cbk;
    } else {
        /* Unlocking: drop the inode size information first. */
        ec_clear_inode_info(heal->fop,
                            (fd != NULL) ? heal->fd->inode : heal->loc.inode);
        cbk = ec_lock_unlocked;
    }

    if (fd == NULL) {
        ec_inodelk(heal->fop->frame, heal->xl,
                   &heal->fop->frame->root->lk_owner, heal->fop->mask,
                   EC_MINIMUM_ALL, cbk, heal, heal->xl->name, loc, F_SETLKW,
                   &flock, NULL);
        return;
    }

    ec_finodelk(heal->fop->frame, heal->xl, &heal->fop->frame->root->lk_owner,
                heal->fop->mask, EC_MINIMUM_ALL, cbk, heal, heal->xl->name, fd,
                F_SETLKW, &flock, NULL);
}
/*
 * Convenience wrapper around ec_heal_lock(): locks or unlocks the range
 * [offset, offset + size) using heal->fd when 'use_fd' is non-zero, or the
 * path in heal->loc otherwise.
 */
void
ec_heal_inodelk(ec_heal_t *heal, int32_t type, int32_t use_fd, off_t offset,
                size_t size)
{
    if (use_fd) {
        ec_heal_lock(heal, type, heal->fd, &heal->loc, offset, size);
    } else {
        ec_heal_lock(heal, type, NULL, &heal->loc, offset, size);
    }
}
/*
 * dict_foreach callback that prunes 'dict' against a reference dict.
 *
 * 'arg' is the reference dict. 'key' is deleted from 'dict' when it is an
 * ignorable key (see ec_ignorable_key_match) or when the reference dict also
 * contains it. Keys left in 'dict' are therefore the non-ignorable ones that
 * are missing from the reference. Always returns 0.
 */
int32_t
ec_heal_xattr_clean(dict_t *dict, char *key, data_t *data, void *arg)
{
    dict_t *reference = arg;

    if (ec_ignorable_key_match(NULL, key, NULL, NULL) ||
        (dict_get(reference, key) != NULL)) {
        dict_del(dict, key);
    }

    return 0;
}
/********************************************************************
 * ec_wind_xattrop_parallel:
 * Helper function to update the extended attributes
 * in parallel.
 *
 * Winds one xattrop to 'subvol' for the child at 'child_index'.
 *
 * frame       - frame the fop is wound from.
 * subvol      - child xlator that receives the xattrop.
 * child_index - index of the child; it selects the dict to send and is
 *               passed as the cookie (cast to a pointer) for
 *               cluster_xattrop_cbk.
 * loc, flags  - forwarded unchanged to the xattrop.
 * dict        - array of dicts with one entry per child; only
 *               dict[child_index] is sent.
 * xdata       - forwarded unchanged to the xattrop.
 *******************************************************************/
void
ec_wind_xattrop_parallel(call_frame_t *frame, xlator_t *subvol, int child_index,
                         loc_t *loc, gf_xattrop_flags_t flags, dict_t **dict,
                         dict_t *xdata)
{
    gf_msg_debug("EC", 0, "WIND: on child %d ", child_index);
    STACK_WIND_COOKIE(
        frame, cluster_xattrop_cbk, (void *)(uintptr_t)child_index, subvol,
        subvol->fops->xattrop, loc, flags, dict[child_index], xdata);
}
/*
 * Callback for the write that ec_heal_readv_cbk() issues towards the bad
 * subvolumes.
 *
 * 'cookie' is the ec_fop_data_t of the write; its 'data' member is the
 * ec_heal_t. The result is logged and then merged into the heal state with
 * ec_heal_update(fop, 0). Always returns 0.
 */
int32_t
ec_heal_writev_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                   int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
                   struct iatt *postbuf, dict_t *xdata)
{
    ec_fop_data_t *write_fop = cookie;
    ec_heal_t *heal = write_fop->data;

    ec_trace("WRITE_CBK", write_fop, "ret=%d, errno=%d", op_ret, op_errno);

    gf_msg_debug(write_fop->xl->name, 0,
                 "%s: write op_ret %d, op_errno %s at %" PRIu64,
                 uuid_utoa(heal->fd->inode->gfid), op_ret, strerror(op_errno),
                 heal->offset);

    ec_heal_update(write_fop, 0);

    return 0;
}
/*
 * Callback for the read issued by ec_heal_data_block().
 *
 * 'cookie' is the ec_fop_data_t of the read; its 'data' member is the
 * ec_heal_t. Subvolumes that failed the read are first dropped from the good
 * set through ec_heal_avoid(). Then:
 *   - op_ret > 0: the data just read is written to the bad subvolumes at
 *     heal->offset (completion handled by ec_heal_writev_cbk);
 *   - op_ret == 0: the heal is flagged as done;
 *   - op_ret < 0: heal->bad is cleared and the heal is flagged as done.
 * Always returns 0.
 */
int32_t
ec_heal_readv_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                  int32_t op_ret, int32_t op_errno, struct iovec *vector,
                  int32_t count, struct iatt *stbuf, struct iobref *iobref,
                  dict_t *xdata)
{
    ec_fop_data_t *read_fop = cookie;
    ec_heal_t *heal = read_fop->data;

    ec_trace("READ_CBK", read_fop, "ret=%d, errno=%d", op_ret, op_errno);

    ec_heal_avoid(read_fop);

    if (op_ret > 0) {
        gf_msg_debug(read_fop->xl->name, 0,
                     "%s: read succeeded, proceeding to write at %" PRIu64,
                     uuid_utoa(heal->fd->inode->gfid), heal->offset);
        ec_writev(heal->fop->frame, heal->xl, heal->bad, EC_MINIMUM_ONE,
                  ec_heal_writev_cbk, heal, heal->fd, vector, count,
                  heal->offset, 0, iobref, NULL);
        return 0;
    }

    if (op_ret < 0) {
        gf_msg_debug(read_fop->xl->name, 0,
                     "%s: read failed %s, failing to heal block at %" PRIu64,
                     uuid_utoa(heal->fd->inode->gfid), strerror(op_errno),
                     heal->offset);
        heal->bad = 0;
    }

    /* Nothing was read (or the read failed): no further blocks to heal. */
    heal->done = 1;

    return 0;
}
/*
 * Starts the heal of one data block: reads heal->size bytes at heal->offset
 * from the good subvolumes; ec_heal_readv_cbk() continues the work.
 *
 * Nothing is issued unless there is at least one good subvolume, at least
 * one bad subvolume, and the file is a regular file.
 */
void
ec_heal_data_block(ec_heal_t *heal)
{
    ec_trace("DATA", heal->fop, "good=%lX, bad=%lX", heal->good, heal->bad);

    if ((heal->good == 0) || (heal->bad == 0) ||
        (heal->iatt.ia_type != IA_IFREG)) {
        return;
    }

    ec_readv(heal->fop->frame, heal->xl, heal->good, EC_MINIMUM_MIN,
             ec_heal_readv_cbk, heal, heal->fd, heal->size, heal->offset, 0,
             NULL);
}
/* FOP: fheal */
/*
 * fd-based entry point for heal: looks up the EC context of 'fd' and
 * forwards the request to ec_heal() using the location stored in that
 * context.
 *
 * When ec_fd_get() returns no context, nothing is done (in particular,
 * 'func' is not invoked by this function).
 */
void
ec_fheal(call_frame_t *frame, xlator_t *this, uintptr_t target,
         uint32_t fop_flags, fop_fheal_cbk_t func, void *data, fd_t *fd,
         int32_t partial, dict_t *xdata)
{
    ec_fd_t *fd_ctx = ec_fd_get(fd, this);

    if (fd_ctx == NULL)
        return;

    gf_msg_trace("ec", 0, "FHEAL ctx: flags=%X, open=%" PRIXPTR, fd_ctx->flags,
                 fd_ctx->open);

    ec_heal(frame, this, target, fop_flags, func, data, &fd_ctx->loc, partial,
            xdata);
}
/* Common heal code */
/*
 * Expands a bit mask into a flag array: for every index below 'numsubvols',
 * array[index] is set to 1 when that bit is set in 'mask' and to 0 otherwise.
 * Entries at or beyond 'numsubvols' are left untouched.
 */
void
ec_mask_to_char_array(uintptr_t mask, unsigned char *array, int numsubvols)
{
    int bit = 0;

    while (bit < numsubvols) {
        array[bit] = (unsigned char)((mask >> bit) & 1);
        bit++;
    }
}
/*
 * Packs a flag array into a bit mask: bit 'i' of the result is set when
 * array[i] is non-zero, for every i below 'numsubvols'.
 *
 * Returns 0 when 'array' is NULL.
 */
uintptr_t
ec_char_array_to_mask(unsigned char *array, int numsubvols)
{
    uintptr_t packed = 0;
    int bit = 0;

    if (array == NULL)
        return packed;

    while (bit < numsubvols) {
        if (array[bit] != 0)
            packed |= (1ULL << bit);
        bit++;
    }

    return packed;
}
/*
 * Decides the direction of an entry heal from a set of lookup replies.
 *
 * For every valid reply whose op_ret is not -1, the EC_DATA_TXN slot of the
 * version and dirty xattrs found in the reply's xdata is stored in
 * versions[i] and dirty[i] (entries are left untouched when the xattr cannot
 * be read). The source is the first usable reply, replaced by any later
 * reply carrying a strictly higher version.
 *
 * Usable subvolumes whose version equals the source's get sources[i] = 1;
 * the remaining usable ones get healed_sinks[i] = 1.
 *
 * Returns the index of the source, or -1 when no reply was usable (in which
 * case 'sources' and 'healed_sinks' are not modified).
 */
int
ec_heal_entry_find_direction(ec_t *ec, default_args_cbk_t *replies,
                             uint64_t *versions, uint64_t *dirty,
                             unsigned char *sources,
                             unsigned char *healed_sinks)
{
    uint64_t xattr[EC_VERSION_SIZE] = {0};
    uint64_t highest = 0;
    int source = -1;
    int idx = 0;

    for (idx = 0; idx < ec->nodes; idx++) {
        if (!replies[idx].valid || (replies[idx].op_ret == -1))
            continue;

        if (source == -1)
            source = idx;

        if (ec_dict_get_array(replies[idx].xdata, EC_XATTR_VERSION, xattr,
                              EC_VERSION_SIZE) == 0) {
            versions[idx] = xattr[EC_DATA_TXN];
            if (versions[idx] > highest) {
                highest = versions[idx];
                source = idx;
            }
        }

        memset(xattr, 0, sizeof(xattr));
        if (ec_dict_get_array(replies[idx].xdata, EC_XATTR_DIRTY, xattr,
                              EC_VERSION_SIZE) == 0) {
            dirty[idx] = xattr[EC_DATA_TXN];
        }
    }

    if (source < 0)
        return source;

    for (idx = 0; idx < ec->nodes; idx++) {
        if (!replies[idx].valid || (replies[idx].op_ret == -1))
            continue;

        if (versions[idx] != versions[source])
            healed_sinks[idx] = 1;
        else
            sources[idx] = 1;
    }

    return source;
}
/*
 * Brings the version xattr (and, when possible, the dirty xattr) of the
 * subvolumes that took part in a heal in line with the heal source.
 *
 * frame        - frame used to wind the xattrops.
 * ec           - EC instance (ec->nodes subvolumes in ec->xl_list).
 * type         - slot of the version/dirty arrays to adjust.
 * inode        - inode being healed; a gfid-based loc is built from it.
 * source       - index of the heal source in 'versions'.
 * sources      - per-subvolume flags: subvolume was a source.
 * healed_sinks - per-subvolume flags: subvolume was healed.
 * versions     - per-subvolume current version for 'type'.
 * dirty        - per-subvolume current dirty count for 'type'.
 *
 * For every subvolume flagged in 'sources' or 'healed_sinks' a dict is
 * built holding the delta (versions[source] - versions[i]) for
 * EC_XATTR_VERSION and, only when every subvolume is either a source or a
 * healed sink, the negated dirty count for EC_XATTR_DIRTY. Subvolumes whose
 * deltas are all zero are skipped. The dicts are applied in parallel with a
 * GF_XATTROP_ADD_ARRAY64 xattrop.
 *
 * Returns 0 on success, -ENOMEM on allocation failure, or -ENOTCONN when
 * not every subvolume could be healed or when fewer xattrops succeeded than
 * were sent.
 */
int
ec_adjust_versions(call_frame_t *frame, ec_t *ec, ec_txn_t type, inode_t *inode,
                   int source, unsigned char *sources,
                   unsigned char *healed_sinks, uint64_t *versions,
                   uint64_t *dirty)
{
    int i = 0;
    int ret = 0;
    int call_count = 0;
    dict_t **xattr = NULL;
    int op_ret = 0;
    loc_t loc = {0};
    gf_boolean_t erase_dirty = _gf_false;
    uint64_t *versions_xattr = NULL;
    uint64_t *dirty_xattr = NULL;
    /* NOTE(review): compared below over EC_VERSION_SIZE elements; this
     * assumes EC_VERSION_SIZE is 2 - confirm. */
    uint64_t allzero[2] = {0};
    unsigned char *on = NULL;
    unsigned char *output = NULL;
    default_args_cbk_t *replies = NULL;
    /* Allocate the required memory */
    loc.inode = inode_ref(inode);
    gf_uuid_copy(loc.gfid, inode->gfid);
    on = alloca0(ec->nodes);
    output = alloca0(ec->nodes);
    EC_REPLIES_ALLOC(replies, ec->nodes);
    xattr = GF_CALLOC(ec->nodes, sizeof(*xattr), gf_common_mt_pointer);
    if (!xattr) {
        op_ret = -ENOMEM;
        goto out;
    }
    /* One dict per subvolume, even for those that end up not being used */
    for (i = 0; i < ec->nodes; i++) {
        xattr[i] = dict_new();
        if (!xattr[i]) {
            op_ret = -ENOMEM;
            goto out;
        }
    }
    /* dirty xattr represents if the file/dir needs heal. Unless all the
     * copies are healed, don't erase it */
    if (EC_COUNT(sources, ec->nodes) + EC_COUNT(healed_sinks, ec->nodes) ==
        ec->nodes)
        erase_dirty = _gf_true;
    else
        op_ret = -ENOTCONN;
    /* Populate the xattr array */
    for (i = 0; i < ec->nodes; i++) {
        if (!sources[i] && !healed_sinks[i])
            continue;
        /* Heap buffer handed to the dict below.
         * NOTE(review): presumably the dict owns it after a successful
         * dict_set_bin; ownership on failure is not visible here - confirm. */
        versions_xattr = GF_CALLOC(EC_VERSION_SIZE, sizeof(*versions_xattr),
                                   gf_common_mt_pointer);
        if (!versions_xattr) {
            op_ret = -ENOMEM;
            continue;
        }
        /* Delta that raises this subvolume's version to the source's */
        versions_xattr[type] = hton64(versions[source] - versions[i]);
        ret = dict_set_bin(xattr[i], EC_XATTR_VERSION, versions_xattr,
                           (sizeof(*versions_xattr) * EC_VERSION_SIZE));
        if (ret < 0) {
            op_ret = -ENOMEM;
            continue;
        }
        if (erase_dirty) {
            dirty_xattr = GF_CALLOC(EC_VERSION_SIZE, sizeof(*dirty_xattr),
                                    gf_common_mt_pointer);
            if (!dirty_xattr) {
                op_ret = -ENOMEM;
                continue;
            }
            /* Adding the negated count brings the dirty value back to 0 */
            dirty_xattr[type] = hton64(-dirty[i]);
            ret = dict_set_bin(xattr[i], EC_XATTR_DIRTY, dirty_xattr,
                               (sizeof(*dirty_xattr) * EC_VERSION_SIZE));
            if (ret < 0) {
                op_ret = -ENOMEM;
                continue;
            }
        }
        /* Skip the subvolume when there is nothing to change on it: the
         * version delta is zero and the dirty delta is either not being
         * sent or zero as well. */
        if (memcmp(versions_xattr, allzero,
                   (sizeof(*versions_xattr) * EC_VERSION_SIZE)) == 0) {
            if (!erase_dirty) {
                continue;
            }
            if (memcmp(dirty_xattr, allzero,
                       (sizeof(*dirty_xattr) * EC_VERSION_SIZE)) == 0) {
                continue;
            }
        }
        on[i] = 1;
        call_count++;
    }
    /* Update the bricks with xattr */
    if (call_count) {
        PARALLEL_FOP_ONLIST(ec->xl_list, on, ec->nodes, replies, frame,
                            ec_wind_xattrop_parallel, &loc,
                            GF_XATTROP_ADD_ARRAY64, xattr, NULL);
        ret = cluster_fop_success_fill(replies, ec->nodes, output);
    }
    /* When no xattrop was sent, 'ret' still holds the result of the last
     * dict_set_bin call made above. */
    if (ret < call_count) {
        op_ret = -ENOTCONN;
        goto out;
    }
out:
    /* Cleanup */
    if (xattr) {
        for (i = 0; i < ec->nodes; i++) {
            if (xattr[i])
                dict_unref(xattr[i]);
        }
        GF_FREE(xattr);
    }
    cluster_replies_wipe(replies, ec->nodes);
    loc_wipe(&loc);
    return op_ret;
}
/*
 * Decides the direction of a metadata heal from a set of replies.
 *
 * Usable replies (valid, op_ret >= 0) are partitioned into groups whose
 * gfid, type, permissions, uid and gid are identical and whose xdata dicts
 * are equal on the keys selected by ec_sh_key_match. The largest group
 * becomes the set of sources (sources[i] = 1); every other usable subvolume
 * becomes a sink (healed_sinks[i] = 1).
 *
 * versions[i] and dirty[i] receive the EC_METADATA_TXN slot of the version
 * and dirty xattrs of each usable reply (left untouched when the xattr
 * cannot be read).
 *
 * Returns the index of the source with the highest version inside the
 * winning group (the group's first member when no version is above zero),
 * or -EIO when the largest group has fewer than ec->fragments members.
 */
int
ec_heal_metadata_find_direction(ec_t *ec, default_args_cbk_t *replies,
                                uint64_t *versions, uint64_t *dirty,
                                unsigned char *sources,
                                unsigned char *healed_sinks)
{
    uint64_t xattr[EC_VERSION_SIZE] = {0};
    uint64_t max_version = 0;
    int same_count = 0;
    int max_same_count = 0;
    int same_source = -1;
    int ret = 0;
    int i = 0;
    int j = 0;
    int *groups = NULL;
    struct iatt source_ia = {0};
    struct iatt child_ia = {0};
    /* groups[i] is the index of the first member of i's group; -1 means
     * "not assigned to a group" */
    groups = alloca0(ec->nodes * sizeof(*groups));
    for (i = 0; i < ec->nodes; i++)
        groups[i] = -1;
    for (i = 0; i < ec->nodes; i++) {
        if (!replies[i].valid)
            continue;
        if (replies[i].op_ret < 0)
            continue;
        ret = ec_dict_get_array(replies[i].xdata, EC_XATTR_VERSION, xattr,
                                EC_VERSION_SIZE);
        if (ret == 0) {
            versions[i] = xattr[EC_METADATA_TXN];
        }
        memset(xattr, 0, sizeof(xattr));
        ret = ec_dict_get_array(replies[i].xdata, EC_XATTR_DIRTY, xattr,
                                EC_VERSION_SIZE);
        if (ret == 0) {
            dirty[i] = xattr[EC_METADATA_TXN];
        }
        if (groups[i] >= 0) /*Already part of group*/
            continue;
        /* Start a new group led by i and collect every later usable reply
         * that matches it */
        groups[i] = i;
        same_count = 1;
        source_ia = replies[i].stat;
        for (j = i + 1; j < ec->nodes; j++) {
            if (!replies[j].valid || replies[j].op_ret < 0)
                continue;
            child_ia = replies[j].stat;
            if (!IA_EQUAL(source_ia, child_ia, gfid) ||
                !IA_EQUAL(source_ia, child_ia, type) ||
                !IA_EQUAL(source_ia, child_ia, prot) ||
                !IA_EQUAL(source_ia, child_ia, uid) ||
                !IA_EQUAL(source_ia, child_ia, gid))
                continue;
            if (!are_dicts_equal(replies[i].xdata, replies[j].xdata,
                                 ec_sh_key_match, NULL))
                continue;
            groups[j] = i;
            same_count++;
        }
        /* Strict comparison: on a tie the earlier group wins */
        if (max_same_count < same_count) {
            max_same_count = same_count;
            same_source = i;
        }
    }
    /* Too few matching copies to trust any group */
    if (max_same_count < ec->fragments) {
        ret = -EIO;
        goto out;
    }
    for (i = 0; i < ec->nodes; i++) {
        if (groups[i] == groups[same_source])
            sources[i] = 1;
        else if (replies[i].valid && replies[i].op_ret >= 0)
            healed_sinks[i] = 1;
    }
    /* Among the sources prefer the one with the highest version */
    for (i = 0; i < ec->nodes; i++) {
        if (sources[i] && (versions[i] > max_version)) {
            same_source = i;
            max_version = versions[i];
        }
    }
    ret = same_source;
out:
    return ret;
}
/*
 * Gathers the information needed for a metadata heal and picks the source.
 *
 * A lookup is sent to the subvolumes flagged in 'locked_on', filling
 * 'replies'. More than ec->fragments of them must succeed, otherwise
 * -ENOTCONN is returned. A getxattr is then sent to the subvolumes that
 * answered the lookup:
 *   - a subvolume that answered the lookup but failed the getxattr has its
 *     reply invalidated;
 *   - for the others, a non-NULL lookup xdata is replaced by the getxattr
 *     result (or left NULL when getxattr returned no dict).
 * Finally ec_heal_metadata_find_direction() fills versions, dirty, sources
 * and healed_sinks.
 *
 * Returns the index of the source, -ENOTCONN as described above, or -EIO
 * when no direction could be determined.
 */
int
__ec_heal_metadata_prepare(call_frame_t *frame, ec_t *ec, inode_t *inode,
                           unsigned char *locked_on,
                           default_args_cbk_t *replies, uint64_t *versions,
                           uint64_t *dirty, unsigned char *sources,
                           unsigned char *healed_sinks)
{
    loc_t loc = {0};
    default_args_cbk_t *greplies = NULL;
    unsigned char *lookup_on = NULL;
    unsigned char *output = NULL;
    int source = 0;
    int ret = 0;
    int i = 0;

    EC_REPLIES_ALLOC(greplies, ec->nodes);
    output = alloca0(ec->nodes);
    lookup_on = alloca0(ec->nodes);

    loc.inode = inode_ref(inode);
    gf_uuid_copy(loc.gfid, inode->gfid);

    ret = cluster_lookup(ec->xl_list, locked_on, ec->nodes, replies, output,
                         frame, ec->xl, &loc, NULL);
    if (ret <= ec->fragments) {
        ret = -ENOTCONN;
        goto out;
    }

    /* Remember who answered the lookup; 'output' is reused below. */
    memcpy(lookup_on, output, ec->nodes);

    /*Use getxattr to get the filtered xattrs which filter internal xattrs*/
    (void)cluster_getxattr(ec->xl_list, lookup_on, ec->nodes, greplies, output,
                           frame, ec->xl, &loc, NULL, NULL);

    for (i = 0; i < ec->nodes; i++) {
        if (lookup_on[i] && !output[i]) {
            replies[i].valid = 0;
        } else if (replies[i].xdata != NULL) {
            dict_unref(replies[i].xdata);
            replies[i].xdata = NULL;
            if (greplies[i].xattr != NULL)
                replies[i].xdata = dict_ref(greplies[i].xattr);
        }
    }

    source = ec_heal_metadata_find_direction(ec, replies, versions, dirty,
                                             sources, healed_sinks);
    ret = (source < 0) ? -EIO : source;

out:
    cluster_replies_wipe(greplies, ec->nodes);
    loc_wipe(&loc);
    return ret;
}
/* Metadata heal */
/*
 * Removes from the sinks every xattr that the source does not have.
 *
 * For each subvolume other than 'source' that is flagged in 'sources' or
 * 'healed_sinks', its reply dict is reduced (ec_heal_xattr_clean) to the
 * non-ignorable keys missing from the source's dict:
 *   - if the reduction fails, the subvolume is dropped from both sets;
 *   - if nothing is left, the subvolume needs no change;
 *   - otherwise the leftover keys are removed with syncop_removexattr(). A
 *     subvolume that was considered a source is demoted to a sink first,
 *     and a failed removal drops the subvolume from 'healed_sinks'.
 *
 * Returns 0 when at least one healed sink remains, -ENOTCONN otherwise.
 */
int
__ec_removexattr_sinks(call_frame_t *frame, ec_t *ec, inode_t *inode,
                       int source, unsigned char *sources,
                       unsigned char *healed_sinks, default_args_cbk_t *replies)
{
    loc_t loc = {0};
    int idx = 0;
    int ret = 0;

    loc.inode = inode_ref(inode);
    gf_uuid_copy(loc.gfid, inode->gfid);

    for (idx = 0; idx < ec->nodes; idx++) {
        if ((idx == source) || (!sources[idx] && !healed_sinks[idx]))
            continue;

        ret = dict_foreach(replies[idx].xdata, ec_heal_xattr_clean,
                           replies[source].xdata);
        if (ret < 0) {
            sources[idx] = 0;
            healed_sinks[idx] = 0;
            continue;
        }

        if (replies[idx].xdata->count == 0)
            continue;

        if (sources[idx]) {
            /* This can happen if setxattr/removexattr succeeds on
             * the bricks but fails to update the version. This
             * will make sure that the xattrs are made equal after
             * heal*/
            sources[idx] = 0;
            healed_sinks[idx] = 1;
        }

        ret = syncop_removexattr(ec->xl_list[idx], &loc, "", replies[idx].xdata,
                                 NULL);
        if (ret < 0)
            healed_sinks[idx] = 0;
    }

    loc_wipe(&loc);

    return (EC_COUNT(healed_sinks, ec->nodes) == 0) ? -ENOTCONN : 0;
}
/*
 * Heals the metadata of 'inode'. The name suggests the caller already holds
 * the lock (ec_heal_metadata() in this file calls it between
 * cluster_inodelk and cluster_uninodelk).
 *
 * Steps:
 *   1. __ec_heal_metadata_prepare() picks the source and fills 'sources',
 *      'healed_sinks', versions and dirty.
 *   2. If every subvolume is a source, or there are no sinks, only the
 *      version/dirty xattrs are adjusted.
 *   3. Otherwise mode, uid and gid of the source are set on the sinks,
 *      xattrs unknown to the source are removed from them, and the source's
 *      non-ignorable xattrs are set on them. After each step the sinks are
 *      narrowed to those on which the operation succeeded.
 *   4. ec_adjust_versions() updates the version/dirty xattrs.
 *
 * On return 'sources' and 'healed_sinks' describe the final roles.
 *
 * Returns the result of ec_adjust_versions() when the heal got that far,
 * -EIO when no source could be determined, -ENOTCONN when no sink is left
 * after a step, or -ENOMEM when filtering the source dict fails.
 */
int
__ec_heal_metadata(call_frame_t *frame, ec_t *ec, inode_t *inode,
                   unsigned char *locked_on, unsigned char *sources,
                   unsigned char *healed_sinks)
{
    loc_t loc = {0};
    int ret = 0;
    int source = 0;
    default_args_cbk_t *replies = NULL;
    default_args_cbk_t *sreplies = NULL;
    uint64_t *versions = NULL;
    uint64_t *dirty = NULL;
    unsigned char *output = NULL;
    dict_t *source_dict = NULL;
    struct iatt source_buf = {0};
    EC_REPLIES_ALLOC(replies, ec->nodes);
    EC_REPLIES_ALLOC(sreplies, ec->nodes);
    loc.inode = inode_ref(inode);
    gf_uuid_copy(loc.gfid, inode->gfid);
    output = alloca0(ec->nodes);
    versions = alloca0(ec->nodes * sizeof(*versions));
    dirty = alloca0(ec->nodes * sizeof(*dirty));
    source = __ec_heal_metadata_prepare(frame, ec, inode, locked_on, replies,
                                        versions, dirty, sources, healed_sinks);
    if (source < 0) {
        /* Any failure of the prepare step is reported as -EIO */
        ret = -EIO;
        goto out;
    }
    /* Nothing to copy: go straight to the version/dirty adjustment */
    if ((EC_COUNT(sources, ec->nodes) == ec->nodes) ||
        (EC_COUNT(healed_sinks, ec->nodes) == 0)) {
        ret = 0;
        goto erase_dirty;
    }
    source_buf = replies[source].stat;
    ret = cluster_setattr(ec->xl_list, healed_sinks, ec->nodes, sreplies,
                          output, frame, ec->xl, &loc, &source_buf,
                          GF_SET_ATTR_MODE | GF_SET_ATTR_UID | GF_SET_ATTR_GID,
                          NULL);
    /*In case the operation fails on some of the subvols*/
    memcpy(healed_sinks, output, ec->nodes);
    if (EC_COUNT(healed_sinks, ec->nodes) == 0) {
        ret = -ENOTCONN;
        goto out;
    }
    ret = __ec_removexattr_sinks(frame, ec, inode, source, sources,
                                 healed_sinks, replies);
    if (ret < 0)
        goto out;
    /* Extra reference on the source's dict; the ignorable keys are removed
     * from it before it is sent to the sinks */
    source_dict = dict_ref(replies[source].xdata);
    if (dict_foreach_match(source_dict, ec_ignorable_key_match, NULL,
                           dict_remove_foreach_fn, NULL) == -1) {
        ret = -ENOMEM;
        goto out;
    }
    /* 'replies' is reused here as the reply buffer of the setxattr */
    ret = cluster_setxattr(ec->xl_list, healed_sinks, ec->nodes, replies,
                           output, frame, ec->xl, &loc, source_dict, 0, NULL);
    EC_INTERSECT(healed_sinks, healed_sinks, output, ec->nodes);
    if (EC_COUNT(healed_sinks, ec->nodes) == 0) {
        ret = -ENOTCONN;
        goto out;
    }
erase_dirty:
    ret = ec_adjust_versions(frame, ec, EC_METADATA_TXN, inode, source, sources,
                             healed_sinks, versions, dirty);
out:
    if (source_dict)
        dict_unref(source_dict);
    loc_wipe(&loc);
    cluster_replies_wipe(replies, ec->nodes);
    cluster_replies_wipe(sreplies, ec->nodes);
    return ret;
}
/*
 * Heals the metadata of 'inode' under an inode lock.
 *
 * A full-range inodelk (in the domain ec->xl->name) is requested on every
 * subvolume that is up according to ec->xl_up. The heal only runs when more
 * than ec->fragments subvolumes were locked; otherwise -ENOTCONN is
 * returned. The locks that were obtained are always released before
 * returning.
 *
 * 'sources' and 'healed_sinks' are outputs filled by __ec_heal_metadata().
 * Returns its result, or -ENOTCONN as described above.
 */
int
ec_heal_metadata(call_frame_t *frame, ec_t *ec, inode_t *inode,
                 unsigned char *sources, unsigned char *healed_sinks)
{
    default_args_cbk_t *replies = NULL;
    unsigned char *up_subvols = NULL;
    unsigned char *locked_on = NULL;
    unsigned char *output = NULL;
    int ret = 0;

    EC_REPLIES_ALLOC(replies, ec->nodes);
    locked_on = alloca0(ec->nodes);
    output = alloca0(ec->nodes);
    up_subvols = alloca0(ec->nodes);

    ec_mask_to_char_array(ec->xl_up, up_subvols, ec->nodes);

    ret = cluster_inodelk(ec->xl_list, up_subvols, ec->nodes, replies,
                          locked_on, frame, ec->xl, ec->xl->name, inode, 0, 0);
    if (ret <= ec->fragments) {
        gf_msg_debug(ec->xl->name, 0,
                     "%s: Skipping heal "
                     "as only %d number of subvolumes could "
                     "be locked",
                     uuid_utoa(inode->gfid), ret);
        ret = -ENOTCONN;
    } else {
        ret = __ec_heal_metadata(frame, ec, inode, locked_on, sources,
                                 healed_sinks);
    }

    cluster_uninodelk(ec->xl_list, locked_on, ec->nodes, replies, output, frame,
                      ec->xl, ec->xl->name, inode, 0, 0);
    cluster_replies_wipe(replies, ec->nodes);

    return ret;
}
/*entry heal*/
/*
 * Gathers the information needed for an entry heal and picks the source.
 *
 * A lookup requesting the EC version and dirty xattrs is sent to the
 * subvolumes flagged in 'locked_on'. More than ec->fragments of them must
 * succeed. ec_heal_entry_find_direction() then fills versions, dirty,
 * sources and healed_sinks.
 *
 * Returns the index of the source, -ENOMEM when the request dict cannot be
 * built, -ENOTCONN when too few lookups succeeded, or -EIO when no source
 * could be determined.
 */
int
__ec_heal_entry_prepare(call_frame_t *frame, ec_t *ec, inode_t *inode,
                        unsigned char *locked_on, uint64_t *versions,
                        uint64_t *dirty, unsigned char *sources,
                        unsigned char *healed_sinks)
{
    loc_t loc = {0};
    default_args_cbk_t *replies = NULL;
    unsigned char *output = NULL;
    dict_t *xdata = NULL;
    int source = 0;
    int ret = 0;

    EC_REPLIES_ALLOC(replies, ec->nodes);

    loc.inode = inode_ref(inode);
    gf_uuid_copy(loc.gfid, inode->gfid);

    /* Ask the bricks to return both xattrs with the lookup reply. */
    xdata = dict_new();
    if ((xdata == NULL) || dict_set_uint64(xdata, EC_XATTR_VERSION, 0) ||
        dict_set_uint64(xdata, EC_XATTR_DIRTY, 0)) {
        ret = -ENOMEM;
        goto out;
    }

    output = alloca0(ec->nodes);
    ret = cluster_lookup(ec->xl_list, locked_on, ec->nodes, replies, output,
                         frame, ec->xl, &loc, xdata);
    if (ret <= ec->fragments) {
        ret = -ENOTCONN;
        goto out;
    }

    source = ec_heal_entry_find_direction(ec, replies, versions, dirty, sources,
                                          healed_sinks);
    ret = (source < 0) ? -EIO : source;

out:
    if (xdata != NULL)
        dict_unref(xdata);
    loc_wipe(&loc);
    cluster_replies_wipe(replies, ec->nodes);
    return ret;
}
/*
 * Adds to the dirty xattr of an entry on the subvolumes flagged in 'on'.
 *
 * The increment is {1, 1}, except for symbolic links, where the
 * EC_DATA_TXN slot is 0. The xattrop is sent on a gfid-based location built
 * from loc->inode and ia->ia_gfid.
 *
 * Returns the value returned by cluster_xattrop() when it is at least
 * ec->fragments, -ENOTCONN when it is lower, -ENOMEM when the dict cannot be
 * allocated, or the error returned by ec_dict_set_array(). 'this' is not
 * used.
 */
int32_t
ec_set_new_entry_dirty(ec_t *ec, loc_t *loc, struct iatt *ia,
                       call_frame_t *frame, xlator_t *this, unsigned char *on)
{
    uint64_t dirty[EC_VERSION_SIZE] = {1, 1};
    default_args_cbk_t *replies = NULL;
    unsigned char *output = NULL;
    dict_t *xattr = NULL;
    loc_t newloc = {0};
    int32_t ret = -1;

    /*Symlinks don't have any data to be healed*/
    if (ia->ia_type == IA_IFLNK)
        dirty[EC_DATA_TXN] = 0;

    newloc.inode = inode_ref(loc->inode);
    gf_uuid_copy(newloc.gfid, ia->ia_gfid);

    EC_REPLIES_ALLOC(replies, ec->nodes);
    output = alloca0(ec->nodes);

    xattr = dict_new();
    if (xattr == NULL) {
        ret = -ENOMEM;
        goto out;
    }

    ret = ec_dict_set_array(xattr, EC_XATTR_DIRTY, dirty, EC_VERSION_SIZE);
    if (ret != 0)
        goto out;

    ret = cluster_xattrop(ec->xl_list, on, ec->nodes, replies, output, frame,
                          ec->xl, &newloc, GF_XATTROP_ADD_ARRAY64, xattr, NULL);
    if (ret < ec->fragments)
        ret = -ENOTCONN;

out:
    if (xattr != NULL)
        dict_unref(xattr);
    cluster_replies_wipe(replies, ec->nodes);
    loc_wipe(&newloc);
    return ret;
}
/*Name heal*/
/*
 * dict_foreach callback run by ec_delete_stale_names() for every gfid found
 * for a name.
 *
 * gfid_db - dict mapping a gfid string to a per-subvolume flag array.
 * key     - gfid in string form.
 * d       - dict value; its payload is the flag array ('same') of the
 *           subvolumes on which the name has this gfid.
 * data    - struct ec_name_data describing the name being healed.
 *
 * If the name with this gfid is present on at least ec->fragments
 * subvolumes, nothing is done. Otherwise the gfid itself is looked up on the
 * participants; when more than ec->redundancy of them report ESTALE or
 * ENOENT, the name is removed (rmdir for directories, unlink otherwise) from
 * the subvolumes flagged in 'same' and the key is deleted from gfid_db.
 *
 * Side effects on the ec_name_data arrays: participants[i] is cleared for
 * subvolumes whose gfid lookup failed with another error or whose removal
 * failed; enoent[i] is set for subvolumes where the removal succeeded.
 *
 * Returns 0, -ENOMEM when the inode cannot be allocated, or -ENOTCONN when
 * none of the 'same' subvolumes produced a successful lookup reply.
 */
int
ec_delete_stale_name(dict_t *gfid_db, char *key, data_t *d, void *data)
{
    struct ec_name_data *name_data = data;
    struct iatt *ia = NULL;
    ec_t *ec = NULL;
    loc_t loc = {0};
    unsigned char *same = data_to_bin(d);
    default_args_cbk_t *replies = NULL;
    unsigned char *output = NULL;
    int ret = 0;
    int estale_count = 0;
    int i = 0;
    call_frame_t *frame = name_data->frame;
    ec = name_data->frame->this->private;
    EC_REPLIES_ALLOC(replies, ec->nodes);
    /* Enough copies of the name with this gfid exist: it is not stale */
    if (EC_COUNT(same, ec->nodes) >= ec->fragments) {
        ret = 0;
        goto out;
    }
    /* Look the gfid up directly (gfid-only loc on a fresh inode) */
    loc.inode = inode_new(name_data->parent->table);
    if (!loc.inode) {
        ret = -ENOMEM;
        goto out;
    }
    gf_uuid_parse(key, loc.gfid);
    output = alloca0(ec->nodes);
    ret = cluster_lookup(ec->xl_list, name_data->participants, ec->nodes,
                         replies, output, name_data->frame, ec->xl, &loc, NULL);
    for (i = 0; i < ec->nodes; i++) {
        if (!replies[i].valid)
            continue;
        if (replies[i].op_ret == -1) {
            if (replies[i].op_errno == ESTALE || replies[i].op_errno == ENOENT)
                estale_count++;
            else
                name_data->participants[i] = 0;
        }
    }
    if (estale_count <= ec->redundancy) {
        /* We have at least ec->fragments number of fragments, so the
         * file is recoverable, so don't delete it*/
        /* Please note that the lookup call above could fail with
         * ENOTCONN on all subvolumes and still this branch will be
         * true, but in those cases conservatively we decide to not
         * delete the file until we are sure*/
        ret = 0;
        goto out;
    }
    /*Noway to recover, delete the name*/
    loc_wipe(&loc);
    loc.parent = inode_ref(name_data->parent);
    gf_uuid_copy(loc.pargfid, loc.parent->gfid);
    loc.name = name_data->name;
    /* The file type is taken from the first successful reply of the gfid
     * lookup on a subvolume that has the name */
    for (i = 0; i < ec->nodes; i++) {
        if (same[i] && replies[i].valid && (replies[i].op_ret == 0)) {
            ia = &replies[i].stat;
            break;
        }
    }
    if (!ia) {
        ret = -ENOTCONN;
        goto out;
    }
    /* 'replies' and 'output' are reused for the removal */
    if (IA_ISDIR(ia->ia_type)) {
        ret = cluster_rmdir(ec->xl_list, same, ec->nodes, replies, output,
                            frame, ec->xl, &loc, 1, NULL);
        gf_msg_debug(ec->xl->name, 0,
                     "cluster rmdir succeeded on %d "
                     "nodes",
                     ret);
    } else {
        ret = cluster_unlink(ec->xl_list, same, ec->nodes, replies, output,
                             frame, ec->xl, &loc, 0, NULL);
        gf_msg_debug(ec->xl->name, 0,
                     "cluster unlink succeeded on %d "
                     "nodes",
                     ret);
    }
    for (i = 0; i < ec->nodes; i++) {
        if (output[i]) {
            /* Removed: the name no longer exists on this subvolume */
            same[i] = 0;
            name_data->enoent[i] = 1;
        } else {
            /*op failed*/
            if (same[i])
                name_data->participants[i] = 0;
        }
    }
    ret = 0;
    /*This will help in making decisions about creating names*/
    dict_del(gfid_db, key);
out:
    if (ret < 0) {
        gf_msg_debug(ec->xl->name, 0, "%s/%s: heal failed %s",
                     uuid_utoa(name_data->parent->gfid), name_data->name,
                     strerror(-ret));
    }
    cluster_replies_wipe(replies, ec->nodes);
    loc_wipe(&loc);
    return ret;
}
/*
 * Runs ec_delete_stale_name() on every gfid stored in 'gfid_db' for the
 * entry 'name' of directory 'parent'.
 *
 * The remaining arguments are bundled into a struct ec_name_data and handed
 * to the callback; 'enoent' and 'participants' may be modified by it.
 * Returns the result of dict_foreach().
 */
int
ec_delete_stale_names(call_frame_t *frame, ec_t *ec, inode_t *parent,
                      char *name, default_args_cbk_t *replies, dict_t *gfid_db,
                      unsigned char *enoent, unsigned char *gfidless,
                      unsigned char *participants)
{
    struct ec_name_data name_data = {
        .frame = frame,
        .participants = participants,
        .gfidless = gfidless,
        .enoent = enoent,
        .name = name,
        .parent = parent,
        .replies = replies,
    };

    return dict_foreach(gfid_db, ec_delete_stale_name, &name_data);
}
/*
 * dict_foreach callback: stores the payload of 'value' in the 'same' member
 * of the struct ec_name_data passed as 'data'. When run over several keys,
 * the last one visited wins. Always returns 0.
 */
int
_assign_same(dict_t *dict, char *key, data_t *value, void *data)
{
    ((struct ec_name_data *)data)->same = data_to_bin(value);

    return 0;
}
/*
 * Creates the entry 'name' of directory 'parent' on the subvolumes where it
 * is missing.
 *
 * frame          - frame used for the cluster fops.
 * ec             - EC instance.
 * parent, name   - directory inode and entry name to create.
 * lookup_replies - replies of the name lookup; the iatt of the first
 *                  subvolume flagged in the gfid's 'same' array is used as
 *                  the template (type, permissions, gfid, device).
 * gfid_db        - must hold exactly one gfid; its value is the 'same'
 *                  flag array.
 * enoent         - subvolumes on which the name has to be created.
 * participants   - cleared for subvolumes in 'enoent' on which the creation
 *                  did not succeed.
 *
 * Depending on the file type the entry is created with mkdir, with
 * link/symlink (symbolic links), or with mknod (regular files, which also
 * get the EC config xattr, and every other type).
 *
 * Returns 0 once the creation has been attempted, -EINVAL when gfid_db does
 * not hold exactly one usable gfid, -ENOMEM on allocation failure,
 * -ENOTCONN when the symlink target cannot be read, or the error of a
 * failed dict operation.
 */
int
ec_create_name(call_frame_t *frame, ec_t *ec, inode_t *parent, char *name,
               default_args_cbk_t *lookup_replies, dict_t *gfid_db,
               unsigned char *enoent, unsigned char *participants)
{
    int ret = 0;
    int i = 0;
    struct ec_name_data name_data = {0};
    struct iatt *ia = NULL;
    unsigned char *output = 0;
    unsigned char *output1 = 0;
    unsigned char *on = NULL;
    default_args_cbk_t *replies = NULL;
    loc_t loc = {0};
    loc_t srcloc = {0};
    unsigned char *link = NULL;
    unsigned char *create = NULL;
    dict_t *xdata = NULL;
    char *linkname = NULL;
    ec_config_t config;
    /* There should be just one gfid key */
    EC_REPLIES_ALLOC(replies, ec->nodes);
    if (gfid_db->count != 1) {
        ret = -EINVAL;
        goto out;
    }
    /* Fetch the flag array of that single gfid into name_data.same */
    ret = dict_foreach(gfid_db, _assign_same, &name_data);
    if (ret < 0)
        goto out;
    /*There should at least be one valid success reply with gfid*/
    for (i = 0; i < ec->nodes; i++)
        if (name_data.same[i])
            break;
    if (i == ec->nodes) {
        ret = -EINVAL;
        goto out;
    }
    ia = &lookup_replies[i].stat;
    /* 'loc' names the entry to create (parent + name); 'srcloc' refers to
     * the existing file by gfid. Both share the same new inode. */
    xdata = dict_new();
    loc.parent = inode_ref(parent);
    gf_uuid_copy(loc.pargfid, parent->gfid);
    loc.inode = inode_new(parent->table);
    if (loc.inode)
        srcloc.inode = inode_ref(loc.inode);
    gf_uuid_copy(srcloc.gfid, ia->ia_gfid);
    if (!loc.inode || !xdata ||
        dict_set_static_bin(xdata, "gfid-req", ia->ia_gfid,
                            sizeof(ia->ia_gfid))) {
        ret = -ENOMEM;
        goto out;
    }
    loc.name = name;
    link = alloca0(ec->nodes);
    create = alloca0(ec->nodes);
    on = alloca0(ec->nodes);
    output = alloca0(ec->nodes);
    output1 = alloca0(ec->nodes);
    /* 'on': subvolumes whose name lookup succeeded; they receive the dirty
     * mark set by ec_set_new_entry_dirty() */
    for (i = 0; i < ec->nodes; i++) {
        if (!lookup_replies[i].valid)
            continue;
        if (lookup_replies[i].op_ret)
            continue;
        on[i] = 1;
    }
    /* NOTE(review): the result of ec_set_new_entry_dirty() is ignored in
     * every case below - confirm this is intended best-effort behavior. */
    switch (ia->ia_type) {
        case IA_IFDIR:
            ec_set_new_entry_dirty(ec, &loc, ia, frame, ec->xl, on);
            (void)cluster_mkdir(
                ec->xl_list, enoent, ec->nodes, replies, output, frame, ec->xl,
                &loc, st_mode_from_ia(ia->ia_prot, ia->ia_type), 0, xdata);
            break;
        case IA_IFLNK:
            /*Check for hard links and create/link*/
            ret = cluster_lookup(ec->xl_list, enoent, ec->nodes, replies,
                                 output, frame, ec->xl, &srcloc, NULL);
            /* gfid already present: add a hard link. gfid missing
             * (ENOENT/ESTALE): the symlink must be created. */
            for (i = 0; i < ec->nodes; i++) {
                if (output[i]) {
                    link[i] = 1;
                } else {
                    if (replies[i].op_errno == ENOENT ||
                        replies[i].op_errno == ESTALE) {
                        create[i] = 1;
                    }
                }
            }
            if (EC_COUNT(link, ec->nodes)) {
                cluster_link(ec->xl_list, link, ec->nodes, replies, output1,
                             frame, ec->xl, &srcloc, &loc, NULL);
            }
            if (EC_COUNT(create, ec->nodes)) {
                /* Read the link target from a subvolume that has the file */
                cluster_readlink(ec->xl_list, name_data.same, ec->nodes,
                                 replies, output, frame, ec->xl, &srcloc, 4096,
                                 NULL);
                if (EC_COUNT(output, ec->nodes) == 0) {
                    ret = -ENOTCONN;
                    goto out;
                }
                for (i = 0; i < ec->nodes; i++) {
                    if (output[i])
                        break;
                }
                /* Copy the target: 'replies' is reused by the symlink call */
                linkname = alloca0(strlen(replies[i].buf) + 1);
                strcpy(linkname, replies[i].buf);
                ec_set_new_entry_dirty(ec, &loc, ia, frame, ec->xl, on);
                cluster_symlink(ec->xl_list, create, ec->nodes, replies, output,
                                frame, ec->xl, linkname, &loc, 0, xdata);
            }
            /* Merge the successful hard links into 'output' */
            for (i = 0; i < ec->nodes; i++)
                if (output1[i])
                    output[i] = 1;
            break;
        case IA_IFREG:
            ec_set_new_entry_dirty(ec, &loc, ia, frame, ec->xl, on);
            config.version = EC_CONFIG_VERSION;
            config.algorithm = EC_CONFIG_ALGORITHM;
            config.gf_word_size = EC_GF_BITS;
            config.bricks = ec->nodes;
            config.redundancy = ec->redundancy;
            config.chunk_size = EC_METHOD_CHUNK_SIZE;
            ret = ec_dict_set_config(xdata, EC_XATTR_CONFIG, &config);
            if (ret != 0) {
                goto out;
            }
            /* Fall through */
        default:
            ret = dict_set_int32(xdata, GLUSTERFS_INTERNAL_FOP_KEY, 1);
            if (ret)
                goto out;
            ret = cluster_mknod(
                ec->xl_list, enoent, ec->nodes, replies, output, frame, ec->xl,
                &loc, st_mode_from_ia(ia->ia_prot, ia->ia_type),
                makedev(ia_major(ia->ia_rdev), ia_minor(ia->ia_rdev)), 0,
                xdata);
            break;
    }
    /* Subvolumes that needed the name but did not get it leave the heal */
    for (i = 0; i < ec->nodes; i++) {
        if (enoent[i] && !output[i])
            participants[i] = 0;
    }
    /* Partial failures are reported through 'participants', not 'ret' */
    ret = 0;
out:
    if (ret < 0)
        gf_msg_debug(ec->xl->name, 0, "%s/%s: heal failed %s",
                     uuid_utoa(parent->gfid), name, strerror(-ret));
    cluster_replies_wipe(replies, ec->nodes);
    loc_wipe(&loc);
    loc_wipe(&srcloc);
    if (xdata)
        dict_unref(xdata);
    return ret;
}
/*
 * Heals one directory entry ('name' inside 'parent'). ec_heal_name() in
 * this file calls it while holding the inodelk on the parent.
 *
 * The name is looked up on every participant and the replies are
 * classified:
 *   - ENOENT: the name is missing there (enoent[i] = 1);
 *   - any other error: the subvolume leaves the heal (participants[i] = 0);
 *   - success with a null gfid: flagged in 'gfidless' when it is a
 *     directory or an empty file, otherwise the subvolume leaves the heal;
 *   - success with a gfid: recorded in gfid_db, a dict mapping the gfid
 *     string to a per-subvolume flag array.
 * Stale names are then deleted (ec_delete_stale_names). If exactly one gfid
 * remains and the name is missing on some participant, it is created there
 * (ec_create_name). If more than one gfid remains the entry cannot be
 * healed and 'participants' is cleared completely.
 *
 * 'participants' is both input and output. Returns 0 or a negative value on
 * failure (-ENOMEM on allocation failure, or the result of the helper that
 * was called last).
 */
int
__ec_heal_name(call_frame_t *frame, ec_t *ec, inode_t *parent, char *name,
               unsigned char *participants)
{
    unsigned char *output = NULL;
    unsigned char *enoent = NULL;
    default_args_cbk_t *replies = NULL;
    dict_t *xdata = NULL;
    dict_t *gfid_db = NULL;
    int ret = 0;
    loc_t loc = {0};
    int i = 0;
    struct iatt *ia = NULL;
    char gfid[64] = {0};
    unsigned char *same = NULL;
    unsigned char *gfidless = NULL;
    EC_REPLIES_ALLOC(replies, ec->nodes);
    loc.parent = inode_ref(parent);
    loc.inode = inode_new(parent->table);
    gf_uuid_copy(loc.pargfid, parent->gfid);
    loc.name = name;
    xdata = dict_new();
    gfid_db = dict_new();
    if (!xdata || !gfid_db || !loc.inode) {
        ret = -ENOMEM;
        goto out;
    }
    ret = dict_set_int32(xdata, GF_GFIDLESS_LOOKUP, 1);
    if (ret) {
        ret = -ENOMEM;
        goto out;
    }
    output = alloca0(ec->nodes);
    gfidless = alloca0(ec->nodes);
    enoent = alloca0(ec->nodes);
    /* NOTE(review): 'xdata' is populated with GF_GFIDLESS_LOOKUP above but
     * NULL is passed to the lookup - confirm whether this is intentional. */
    ret = cluster_lookup(ec->xl_list, participants, ec->nodes, replies, output,
                         frame, ec->xl, &loc, NULL);
    for (i = 0; i < ec->nodes; i++) {
        if (!replies[i].valid)
            continue;
        if (replies[i].op_ret == -1) {
            /*If ESTALE comes here, that means parent dir is not
             * present, nothing to do there, so reset participants
             * for that brick*/
            if (replies[i].op_errno == ENOENT)
                enoent[i] = 1;
            else
                participants[i] = 0;
            continue;
        }
        ia = &replies[i].stat;
        if (gf_uuid_is_null(ia->ia_gfid)) {
            if (IA_ISDIR(ia->ia_type) || ia->ia_size == 0)
                gfidless[i] = 1;
            else
                participants[i] = 0;
        } else {
            uuid_utoa_r(ia->ia_gfid, gfid);
            /* First time this gfid is seen: create its flag array. The
             * array comes from alloca0() and is stored in the dict as
             * static data, so it is only valid while this function has
             * not returned. */
            ret = dict_get_bin(gfid_db, gfid, (void **)&same);
            if (ret < 0) {
                same = alloca0(ec->nodes);
            }
            same[i] = 1;
            if (ret < 0) {
                ret = dict_set_static_bin(gfid_db, gfid, same, ec->nodes);
            }
            if (ret < 0)
                goto out;
        }
    }
    ret = ec_delete_stale_names(frame, ec, parent, name, replies, gfid_db,
                                enoent, gfidless, participants);
    if (gfid_db->count == 0) {
        /* All entries seem to be stale entries and deleted,
         * nothing more to do.*/
        goto out;
    }
    if (gfid_db->count > 1) {
        /* Conflicting gfids for the same name: give up on every subvolume */
        gf_msg(ec->xl->name, GF_LOG_INFO, 0, EC_MSG_HEAL_FAIL,
               "%s/%s: Not able to heal", uuid_utoa(parent->gfid), name);
        memset(participants, 0, ec->nodes);
        goto out;
    }
    /* Only create the name on subvolumes that are still participating */
    EC_INTERSECT(enoent, enoent, participants, ec->nodes);
    if (EC_COUNT(enoent, ec->nodes) == 0) {
        ret = 0;
        goto out;
    }
    ret = ec_create_name(frame, ec, parent, name, replies, gfid_db, enoent,
                         participants);
out:
    cluster_replies_wipe(replies, ec->nodes);
    loc_wipe(&loc);
    if (xdata)
        dict_unref(xdata);
    if (gfid_db)
        dict_unref(gfid_db);
    return ret;
}
/*
 * Heals the entry 'name' of directory 'parent' under an inode lock.
 *
 * frame        - frame used for the cluster fops.
 * ec           - EC instance.
 * parent, name - directory inode and entry name to heal.
 * participants - in: subvolumes to involve; out: narrowed to the subvolumes
 *                that were locked and on which __ec_heal_name() did not
 *                fail.
 *
 * A full-range inodelk (in the domain ec->xl->name) is taken on the parent
 * on every participant. The heal only runs when more than ec->fragments
 * subvolumes were locked. The locks obtained are released before
 * returning.
 *
 * Returns the result of __ec_heal_name(), -ENOTCONN when too few
 * subvolumes could be locked, or -ENOMEM when the inode cannot be
 * allocated.
 */
int
ec_heal_name(call_frame_t *frame, ec_t *ec, inode_t *parent, char *name,
             unsigned char *participants)
{
    int ret = 0;
    default_args_cbk_t *replies = NULL;
    unsigned char *output = NULL;
    unsigned char *locked_on = NULL;
    loc_t loc = {0};

    /* Allocate the reply array before the first failure path so that the
     * cleanup at 'out' always receives an initialized array (as every other
     * function in this file does) instead of a NULL pointer. */
    EC_REPLIES_ALLOC(replies, ec->nodes);
    output = alloca0(ec->nodes);
    locked_on = alloca0(ec->nodes);

    loc.parent = inode_ref(parent);
    loc.name = name;
    loc.inode = inode_new(parent->table);
    if (!loc.inode) {
        ret = -ENOMEM;
        goto out;
    }

    ret = cluster_inodelk(ec->xl_list, participants, ec->nodes, replies,
                          locked_on, frame, ec->xl, ec->xl->name, parent, 0, 0);
    {
        if (ret <= ec->fragments) {
            gf_msg_debug(ec->xl->name, 0,
                         "%s/%s: Skipping "
                         "heal as only %d number of subvolumes could "
                         "be locked",
                         uuid_utoa(parent->gfid), name, ret);
            ret = -ENOTCONN;
            goto unlock;
        }
        /* Only heal on the subvolumes that are actually locked */
        EC_INTERSECT(participants, participants, locked_on, ec->nodes);
        ret = __ec_heal_name(frame, ec, parent, name, participants);
    }
unlock:
    cluster_uninodelk(ec->xl_list, locked_on, ec->nodes, replies, output, frame,
                      ec->xl, ec->xl->name, parent, 0, 0);
out:
    cluster_replies_wipe(replies, ec->nodes);
    loc_wipe(&loc);
    return ret;
}
/*
 * Per-entry callback handed to syncop_dir_scan() by ec_heal_names().
 *
 * data is a struct ec_name_data. The entry is healed with ec_heal_name() on
 * a private copy of the participants array; every participant that is no
 * longer set in that copy afterwards (all of them when the heal returns an
 * error) is recorded in failed_on.
 *
 * Returns -ENOTCONN when EC is shutting down, 0 otherwise.
 */
int
ec_name_heal_handler(xlator_t *subvol, gf_dirent_t *entry, loc_t *parent,
                     void *data)
{
    xlator_t *this = THIS;
    ec_t *ec = this->private;
    struct ec_name_data *scan = data;
    unsigned char *healed_on = alloca0(ec->nodes);
    int brick = 0;
    int rc = 0;

    if (ec->shutdown) {
        gf_msg_debug(this->name, 0,
                     "Cancelling directory heal "
                     "because EC is stopping.");
        return -ENOTCONN;
    }

    memcpy(healed_on, scan->participants, ec->nodes);
    rc = ec_heal_name(scan->frame, ec, parent->inode, entry->d_name,
                      healed_on);
    if (rc < 0) {
        /* A failed heal counts as a failure on every participant. */
        memset(healed_on, 0, ec->nodes);
    }

    for (brick = 0; brick < ec->nodes; brick++) {
        if (scan->participants[brick] && !healed_on[brick]) {
            scan->failed_on[brick] = 1;
        }
    }

    return 0;
}
/*
 * Heal the names found in directory `inode` by scanning it on each
 * participating brick with ec_name_heal_handler().
 *
 * After each brick's scan, bricks recorded in failed_on are removed from
 * participants. The loop stops with -ENOTCONN as soon as no more than
 * ec->fragments participants remain, or with the scan's own error when
 * syncop_dir_scan() fails.
 */
int
ec_heal_names(call_frame_t *frame, ec_t *ec, inode_t *inode,
              unsigned char *participants)
{
    struct ec_name_data scan = {0};
    loc_t dir_loc = {0};
    int brick = 0;
    int k = 0;
    int ret = 0;

    dir_loc.inode = inode_ref(inode);
    gf_uuid_copy(dir_loc.gfid, inode->gfid);

    scan.frame = frame;
    scan.participants = participants;
    scan.failed_on = alloca0(ec->nodes);

    for (brick = 0; brick < ec->nodes; brick++) {
        if (participants[brick] == 0) {
            continue;
        }

        ret = syncop_dir_scan(ec->xl_list[brick], &dir_loc,
                              GF_CLIENT_PID_SELF_HEALD, &scan,
                              ec_name_heal_handler);
        if (ret < 0) {
            break;
        }

        /* Drop every brick on which some name could not be healed. */
        for (k = 0; k < ec->nodes; k++) {
            if (scan.failed_on[k]) {
                participants[k] = 0;
            }
        }

        if (EC_COUNT(participants, ec->nodes) <= ec->fragments) {
            ret = -ENOTCONN;
            break;
        }
    }

    loc_wipe(&dir_loc);
    return ret;
}
/*
 * Entry heal of directory `inode` on the bricks flagged in heal_on.
 *
 * Step 1 (under inodelk): __ec_heal_entry_prepare() fills versions, dirty,
 * sources and healed_sinks; its return value is kept as the source index.
 * Step 2 (lock released): ec_heal_names() heals the names on all sources
 * and sinks, possibly clearing participants.
 * Step 3: bricks that dropped out are removed from sources/healed_sinks and
 * ec_adjust_versions() is called, unless no more than ec->fragments
 * participants are left.
 *
 * Returns a negative errno from the locking/prepare step, otherwise the
 * result of ec_heal_names().
 */
int
__ec_heal_entry(call_frame_t *frame, ec_t *ec, inode_t *inode,
                unsigned char *heal_on, unsigned char *sources,
                unsigned char *healed_sinks)
{
    default_args_cbk_t *replies = NULL;
    unsigned char *locked_on = alloca0(ec->nodes);
    unsigned char *output = alloca0(ec->nodes);
    unsigned char *participants = NULL;
    uint64_t *versions = alloca0(ec->nodes * sizeof(*versions));
    uint64_t *dirty = alloca0(ec->nodes * sizeof(*dirty));
    int lock_count = 0;
    int source = 0;
    int ret = 0;
    int n = 0;

    EC_REPLIES_ALLOC(replies, ec->nodes);

    lock_count = cluster_inodelk(ec->xl_list, heal_on, ec->nodes, replies,
                                 locked_on, frame, ec->xl, ec->xl->name, inode,
                                 0, 0);
    if (lock_count <= ec->fragments) {
        gf_msg_debug(ec->xl->name, 0,
                     "%s: Skipping heal "
                     "as only %d number of subvolumes could "
                     "be locked",
                     uuid_utoa(inode->gfid), lock_count);
        ret = -ENOTCONN;
    } else {
        ret = __ec_heal_entry_prepare(frame, ec, inode, locked_on, versions,
                                      dirty, sources, healed_sinks);
        source = ret;
    }

    /* The unlock is issued on both the success and the failure path. */
    cluster_uninodelk(ec->xl_list, locked_on, ec->nodes, replies, output, frame,
                      ec->xl, ec->xl->name, inode, 0, 0);
    if (ret < 0) {
        goto out;
    }

    participants = alloca0(ec->nodes);
    for (n = 0; n < ec->nodes; n++) {
        if (sources[n] || healed_sinks[n]) {
            participants[n] = 1;
        }
    }

    ret = ec_heal_names(frame, ec, inode, participants);
    if (EC_COUNT(participants, ec->nodes) <= ec->fragments) {
        goto out;
    }

    for (n = 0; n < ec->nodes; n++) {
        if (participants[n]) {
            continue;
        }
        sources[n] = 0;
        healed_sinks[n] = 0;
    }

    ec_adjust_versions(frame, ec, EC_DATA_TXN, inode, source, sources,
                       healed_sinks, versions, dirty);

out:
    cluster_replies_wipe(replies, ec->nodes);
    return ret;
}
/*
 * Entry (directory) heal entry point.
 *
 * Takes a non-blocking tie-breaker inodelk in the "<xlator-name>:self-heal"
 * domain on all bricks that are up, runs __ec_heal_entry() on the bricks
 * that were locked and releases the lock again.
 *
 * sources and healed_sinks are one-byte-per-brick output arrays filled by
 * __ec_heal_entry().
 *
 * Returns the result of __ec_heal_entry(), or -ENOTCONN when no more than
 * ec->fragments bricks could be locked.
 */
int
ec_heal_entry(call_frame_t *frame, ec_t *ec, inode_t *inode,
              unsigned char *sources, unsigned char *healed_sinks)
{
    unsigned char *locked_on = NULL;
    unsigned char *up_subvols = NULL;
    unsigned char *output = NULL;
    char selfheal_domain[1024] = {0};
    int ret = 0;
    default_args_cbk_t *replies = NULL;

    EC_REPLIES_ALLOC(replies, ec->nodes);
    locked_on = alloca0(ec->nodes);
    output = alloca0(ec->nodes);
    up_subvols = alloca0(ec->nodes);

    /* Bounded formatting: the domain buffer has a fixed size while the
     * xlator name length is not limited here. */
    snprintf(selfheal_domain, sizeof(selfheal_domain), "%s:self-heal",
             ec->xl->name);
    ec_mask_to_char_array(ec->xl_up, up_subvols, ec->nodes);

    /*If other processes are already doing the heal, don't block*/
    ret = cluster_tiebreaker_inodelk(ec->xl_list, up_subvols, ec->nodes,
                                     replies, locked_on, frame, ec->xl,
                                     selfheal_domain, inode, 0, 0);
    if (ret <= ec->fragments) {
        gf_msg_debug(ec->xl->name, 0,
                     "%s: Skipping heal "
                     "as only %d number of subvolumes could "
                     "be locked",
                     uuid_utoa(inode->gfid), ret);
        ret = -ENOTCONN;
        goto unlock;
    }

    ret = __ec_heal_entry(frame, ec, inode, locked_on, sources, healed_sinks);

unlock:
    cluster_uninodelk(ec->xl_list, locked_on, ec->nodes, replies, output, frame,
                      ec->xl, selfheal_domain, inode, 0, 0);
    cluster_replies_wipe(replies, ec->nodes);
    return ret;
}
/*
 * Find direction for data heal and heal info.
 *
 * For every valid, successful reply the data version, dirty counter and size
 * are read from the reply's dictionary (xdata when which == EC_COMBINE_XDATA,
 * xattr otherwise) into data_versions[], dirty[] and size[]. Bricks are then
 * grouped by "<data version>-<size>"; the largest group becomes the sources
 * and every other brick with a successful reply becomes a healed sink.
 *
 * When check_ondisksize is set, sources whose on-disk ia_size differs from
 * the adjusted source size are turned into sinks as well.
 *
 * Returns the index of a source brick, -ENOMEM on allocation/dict failure,
 * -EIO when fewer than ec->fragments bricks agree, or the negative result of
 * a failed dict_get_bin() lookup.
 */
int
ec_heal_data_find_direction(ec_t *ec, default_args_cbk_t *replies,
uint64_t *data_versions, uint64_t *dirty,
uint64_t *size, unsigned char *sources,
unsigned char *healed_sinks,
gf_boolean_t check_ondisksize, int which)
{
uint64_t xattr[EC_VERSION_SIZE] = {0};
char version_size[128] = {0};
dict_t *version_size_db = NULL;
unsigned char *same = NULL;
int max_same_count = 0;
int source = 0;
int i = 0;
int ret = 0;
dict_t *dict = NULL;
uint64_t source_size = 0;
version_size_db = dict_new();
if (!version_size_db) {
ret = -ENOMEM;
goto out;
}
for (i = 0; i < ec->nodes; i++) {
/* Only bricks that answered successfully take part in the vote. */
if (!replies[i].valid)
continue;
if (replies[i].op_ret < 0)
continue;
dict = (which == EC_COMBINE_XDATA) ? replies[i].xdata
: replies[i].xattr;
/* A missing version/dirty xattr leaves the caller-provided value
 * in data_versions[i] / dirty[i] untouched. */
ret = ec_dict_get_array(dict, EC_XATTR_VERSION, xattr, EC_VERSION_SIZE);
if (ret == 0) {
data_versions[i] = xattr[EC_DATA_TXN];
}
memset(xattr, 0, sizeof(xattr));
ret = ec_dict_get_array(dict, EC_XATTR_DIRTY, xattr, EC_VERSION_SIZE);
if (ret == 0) {
dirty[i] = xattr[EC_DATA_TXN];
}
/* The result of this lookup is not checked; ret is overwritten by
 * the dict_get_bin() call below. */
ret = ec_dict_del_number(dict, EC_XATTR_SIZE, &size[i]);
/* Build a db keyed by data version and size; each value is a
 * per-brick byte array marking the bricks sharing that key. */
snprintf(version_size, sizeof(version_size), "%" PRIu64 "-%" PRIu64,
data_versions[i], size[i]);
ret = dict_get_bin(version_size_db, version_size, (void **)&same);
if (ret < 0) {
/* First brick with this key: start a new group. */
same = alloca0(ec->nodes);
}
same[i] = 1;
/* Track a member of the largest group seen so far. */
if (max_same_count < EC_COUNT(same, ec->nodes)) {
max_same_count = EC_COUNT(same, ec->nodes);
source = i;
}
if (ret < 0) {
ret = dict_set_static_bin(version_size_db, version_size, same,
ec->nodes);
}
if (ret < 0) {
ret = -ENOMEM;
goto out;
}
}
/* If we don't have ec->fragments number of same version,size it is not
 * recoverable*/
if (max_same_count < ec->fragments) {
ret = -EIO;
goto out;
} else {
/* Fetch the winning group again by the key of its recorded member. */
snprintf(version_size, sizeof(version_size), "%" PRIu64 "-%" PRIu64,
data_versions[source], size[source]);
ret = dict_get_bin(version_size_db, version_size, (void **)&same);
if (ret < 0)
goto out;
memcpy(sources, same, ec->nodes);
for (i = 0; i < ec->nodes; i++) {
if (replies[i].valid && (replies[i].op_ret == 0) && !sources[i])
healed_sinks[i] = 1;
}
}
/* There could be files with versions, size same but on disk ia_size
 * could be different because of disk crashes, mark them as sinks as
 * well*/
if (check_ondisksize) {
source_size = size[source];
ec_adjust_size_up(ec, &source_size, _gf_true);
for (i = 0; i < ec->nodes; i++) {
if (sources[i]) {
if (replies[i].stat.ia_size != source_size) {
sources[i] = 0;
healed_sinks[i] = 1;
max_same_count--;
} else {
/* The returned source ends up being the last brick that
 * passes the on-disk size check. */
source = i;
}
}
}
if (max_same_count < ec->fragments) {
ret = -EIO;
goto out;
}
}
ret = source;
out:
if (version_size_db)
dict_unref(version_size_db);
return ret;
}
/*
 * Collect the state needed for a data heal of fd on the bricks flagged in
 * locked_on and decide the heal direction.
 *
 * An fxattrop and an fstat are sent to those bricks; only bricks that
 * answered both are considered. ec_heal_data_find_direction() then fills
 * versions, dirty, size, sources and healed_sinks. trim[i] is set for every
 * sink whose reported ia_size is non-zero, and *stbuf (when non-NULL)
 * receives the stat of the chosen source.
 *
 * Returns the source index on success, -ENOMEM on dict failures, -ENOTCONN
 * when too few bricks answered or too few sources remain, or the error
 * returned by ec_heal_data_find_direction().
 */
int
__ec_heal_data_prepare(call_frame_t *frame, ec_t *ec, fd_t *fd,
unsigned char *locked_on, uint64_t *versions,
uint64_t *dirty, uint64_t *size, unsigned char *sources,
unsigned char *healed_sinks, unsigned char *trim,
struct iatt *stbuf)
{
default_args_cbk_t *replies = NULL;
default_args_cbk_t *fstat_replies = NULL;
unsigned char *output = NULL;
unsigned char *fstat_output = NULL;
dict_t *xattrs = NULL;
uint64_t zero_array[2] = {0};
int source = 0;
int ret = 0;
uint64_t zero_value = 0;
int i = 0;
EC_REPLIES_ALLOC(replies, ec->nodes);
EC_REPLIES_ALLOC(fstat_replies, ec->nodes);
output = alloca0(ec->nodes);
fstat_output = alloca0(ec->nodes);
xattrs = dict_new();
/* The xattrop payload is all zeros for version, dirty and size.
 * NOTE(review): presumably adding zero is used to read the current
 * values back without modifying them - confirm against the brick side. */
if (!xattrs ||
dict_set_static_bin(xattrs, EC_XATTR_VERSION, zero_array,
sizeof(zero_array)) ||
dict_set_static_bin(xattrs, EC_XATTR_DIRTY, zero_array,
sizeof(zero_array)) ||
dict_set_static_bin(xattrs, EC_XATTR_SIZE, &zero_value,
sizeof(zero_value))) {
ret = -ENOMEM;
goto out;
}
/* The return values of both cluster calls are not used; success is
 * judged from the per-brick output arrays below. */
ret = cluster_fxattrop(ec->xl_list, locked_on, ec->nodes, replies, output,
frame, ec->xl, fd, GF_XATTROP_ADD_ARRAY64, xattrs,
NULL);
ret = cluster_fstat(ec->xl_list, locked_on, ec->nodes, fstat_replies,
fstat_output, frame, ec->xl, fd, NULL);
for (i = 0; i < ec->nodes; i++) {
/* Keep only bricks that answered both requests and merge the fstat
 * result into the fxattrop reply. */
output[i] = output[i] && fstat_output[i];
replies[i].valid = output[i];
if (output[i])
replies[i].stat = fstat_replies[i].stat;
}
if (EC_COUNT(output, ec->nodes) <= ec->fragments) {
ret = -ENOTCONN;
goto out;
}
source = ec_heal_data_find_direction(ec, replies, versions, dirty, size,
sources, healed_sinks, _gf_true,
EC_COMBINE_DICT);
ret = source;
if (ret < 0)
goto out;
if (stbuf)
*stbuf = replies[source].stat;
for (i = 0; i < ec->nodes; i++) {
if (healed_sinks[i]) {
/* Sinks that already contain data are flagged for trimming. */
if (replies[i].stat.ia_size)
trim[i] = 1;
}
}
if (EC_COUNT(sources, ec->nodes) < ec->fragments) {
ret = -ENOTCONN;
goto out;
}
ret = source;
out:
if (xattrs)
dict_unref(xattrs);
cluster_replies_wipe(replies, ec->nodes);
cluster_replies_wipe(fstat_replies, ec->nodes);
if (ret < 0) {
gf_msg_debug(ec->xl->name, 0, "%s: heal failed %s",
uuid_utoa(fd->inode->gfid), strerror(-ret));
} else {
gf_msg_debug(ec->xl->name, 0,
"%s: sources: %d, sinks: "
"%d",
uuid_utoa(fd->inode->gfid), EC_COUNT(sources, ec->nodes),
EC_COUNT(healed_sinks, ec->nodes));
}
return ret;
}
/*
 * Flag the sinks of a data heal by adding the EC_SELFHEAL_BIT to the data
 * version stored on each sink that does not carry it yet.
 *
 * versions      - per-brick data versions; the bit is OR-ed into the entry
 *                 of every brick whose fxattrop succeeded.
 * healed_sinks  - per-brick sink flags; a sink whose marking failed is
 *                 cleared.
 *
 * Returns 0 on success (including when nothing needs marking), -ENOMEM on
 * dict failures and -ENOTCONN when no sink is left.
 */
int
__ec_heal_mark_sinks(call_frame_t *frame, ec_t *ec, fd_t *fd,
                     uint64_t *versions, unsigned char *healed_sinks)
{
    int i = 0;
    int ret = 0;
    unsigned char *mark = NULL;
    dict_t *xattrs = NULL;
    default_args_cbk_t *replies = NULL;
    unsigned char *output = NULL;
    uint64_t versions_xattr[2] = {0};

    EC_REPLIES_ALLOC(replies, ec->nodes);
    xattrs = dict_new();
    if (!xattrs) {
        ret = -ENOMEM;
        goto out;
    }

    /* Only sinks that do not already have the self-heal bit need marking. */
    mark = alloca0(ec->nodes);
    for (i = 0; i < ec->nodes; i++) {
        if (!healed_sinks[i])
            continue;
        if ((versions[i] >> EC_SELFHEAL_BIT) & 1)
            continue;
        mark[i] = 1;
    }

    if (EC_COUNT(mark, ec->nodes) == 0) {
        /* Nothing to mark. Leave through the common exit so the dict
         * created above is released instead of being leaked. */
        ret = 0;
        goto out;
    }

    versions_xattr[EC_DATA_TXN] = hton64(1ULL << EC_SELFHEAL_BIT);
    if (dict_set_static_bin(xattrs, EC_XATTR_VERSION, versions_xattr,
                            sizeof(versions_xattr))) {
        ret = -ENOMEM;
        goto out;
    }

    output = alloca0(ec->nodes);
    ret = cluster_fxattrop(ec->xl_list, mark, ec->nodes, replies, output, frame,
                           ec->xl, fd, GF_XATTROP_ADD_ARRAY64, xattrs, NULL);
    for (i = 0; i < ec->nodes; i++) {
        if (!output[i]) {
            /* A sink that could not be marked is not healed. */
            if (mark[i])
                healed_sinks[i] = 0;
            continue;
        }
        versions[i] |= (1ULL << EC_SELFHEAL_BIT);
    }

    if (EC_COUNT(healed_sinks, ec->nodes) == 0) {
        ret = -ENOTCONN;
        goto out;
    }
    ret = 0;

out:
    cluster_replies_wipe(replies, ec->nodes);
    if (xattrs)
        dict_unref(xattrs);
    if (ret < 0)
        gf_msg_debug(ec->xl->name, 0, "%s: heal failed %s",
                     uuid_utoa(fd->inode->gfid), strerror(-ret));
    return ret;
}
/*
 * State machine driving the heal of one data block.
 *
 * fop->data is the ec_heal_t describing the block. The states are: take a
 * write inodelk, copy the block (ec_heal_data_block()), release the
 * inodelk, then report through fop->cbks.heal. Negative states are the
 * error variants: they still release the lock and report -1 with
 * fop->error.
 *
 * Returns the next state for the fop manager.
 */
int32_t
ec_manager_heal_block(ec_fop_data_t *fop, int32_t state)
{
ec_heal_t *heal = fop->data;
heal->fop = fop;
switch (state) {
case EC_STATE_INIT:
ec_owner_set(fop->frame, fop->frame->root);
ec_heal_inodelk(heal, F_WRLCK, 1, 0, 0);
return EC_STATE_HEAL_DATA_COPY;
case EC_STATE_HEAL_DATA_COPY:
gf_msg_debug(fop->xl->name, 0, "%s: read/write starting",
uuid_utoa(heal->fd->inode->gfid));
ec_heal_data_block(heal);
return EC_STATE_HEAL_DATA_UNLOCK;
/* Both error variants fall through to the regular unlock step. */
case -EC_STATE_HEAL_DATA_COPY:
case -EC_STATE_HEAL_DATA_UNLOCK:
case EC_STATE_HEAL_DATA_UNLOCK:
ec_heal_inodelk(heal, F_UNLCK, 1, 0, 0);
return EC_STATE_REPORT;
case EC_STATE_REPORT:
/* Success: report the good and bad masks kept in the heal object. */
if (fop->cbks.heal) {
fop->cbks.heal(fop->req_frame, fop, fop->xl, 0, 0,
(heal->good | heal->bad), heal->good, heal->bad,
NULL);
}
return EC_STATE_END;
case -EC_STATE_REPORT:
/* Failure: report fop->error with empty masks. */
if (fop->cbks.heal) {
fop->cbks.heal(fop->req_frame, fop, fop->xl, -1, fop->error, 0,
0, 0, NULL);
}
return EC_STATE_END;
default:
gf_msg(fop->xl->name, GF_LOG_ERROR, 0, EC_MSG_UNHANDLED_STATE,
"Unhandled state %d for %s", state, ec_fop_name(fop->id));
return EC_STATE_END;
}
}
/*
 * Start the heal of one block (takes the inodelk as part of the
 * ec_manager_heal_block() state machine).
 *
 * A heal fop bound to `heal` is allocated and handed to ec_manager(). When
 * validation or allocation fails, func is called directly with -1/ENOMEM.
 */
void
ec_heal_block(call_frame_t *frame, xlator_t *this, uintptr_t target,
              uint32_t fop_flags, fop_heal_cbk_t func, ec_heal_t *heal)
{
    ec_cbk_t callback = {.heal = func};
    ec_fop_data_t *fop = NULL;
    int32_t error = ENOMEM;

    gf_msg_trace("ec", 0, "EC(HEAL) %p", frame);

    VALIDATE_OR_GOTO(this, out);
    GF_VALIDATE_OR_GOTO(this->name, this->private, out);

    fop = ec_fop_data_allocate(frame, this, EC_FOP_HEAL, 0, target, fop_flags,
                               NULL, ec_manager_heal_block, callback, heal);
    if (fop != NULL) {
        error = 0;
    }

out:
    if (fop == NULL) {
        func(frame, NULL, this, -1, error, 0, 0, 0, NULL);
        return;
    }

    ec_manager(fop, error);
}
/*
 * Completion callback of a block heal started by ec_sync_heal_block().
 *
 * cookie is the heal fop. The fop and its ec_heal_t are detached from each
 * other, the outcome is stored in heal->error (op_errno on failure, 0 on
 * success) and the barrier stored in heal->data is woken.
 */
int32_t
ec_heal_block_done(call_frame_t *frame, void *cookie, xlator_t *this,
                   int32_t op_ret, int32_t op_errno, uintptr_t mask,
                   uintptr_t good, uintptr_t bad, dict_t *xdata)
{
    ec_fop_data_t *block_fop = cookie;
    ec_heal_t *heal = block_fop->data;

    block_fop->heal = NULL;
    heal->fop = NULL;

    if (op_ret < 0) {
        heal->error = op_errno;
    } else {
        heal->error = 0;
    }

    syncbarrier_wake(heal->data);

    return 0;
}
/*
 * Heal one block synchronously: start it with ec_heal_block() and wait on
 * the barrier in heal->data until ec_heal_block_done() wakes it.
 *
 * Returns -heal->error when the block heal failed, -ENOTCONN when no bad
 * brick is left in heal->bad, 0 otherwise.
 */
int
ec_sync_heal_block(call_frame_t *frame, xlator_t *this, ec_heal_t *heal)
{
    ec_heal_block(frame, this, heal->bad | heal->good, EC_MINIMUM_ONE,
                  ec_heal_block_done, heal);
    syncbarrier_wait(heal->data, 1);

    if (heal->error != 0) {
        return -heal->error;
    }

    return (heal->bad == 0) ? -ENOTCONN : 0;
}
/*
 * Rebuild the contents of fd on the sinks, one block at a time.
 *
 * size is the file size to rebuild (adjusted with ec_adjust_size_up()).
 * Each block is 128KB * ec->self_heal_window_size, rounded down to a
 * multiple of ec->stripe_size, and is healed with ec_sync_heal_block().
 * On return healed_sinks is rewritten from the bad mask left in the heal
 * object.
 *
 * Returns 0 on success, -ENOMEM when the barrier cannot be initialised,
 * -ENOTCONN when EC is shutting down, or the error of the failing block.
 */
int
ec_rebuild_data(call_frame_t *frame, ec_t *ec, fd_t *fd, uint64_t size,
                unsigned char *sources, unsigned char *healed_sinks)
{
    syncbarrier_t barrier;
    ec_heal_t *heal = NULL;
    int ret = 0;

    if (syncbarrier_init(&barrier)) {
        return -ENOMEM;
    }

    heal = alloca0(sizeof(*heal));
    heal->fd = fd_ref(fd);
    heal->xl = ec->xl;
    heal->data = &barrier;

    ec_adjust_size_up(ec, &size, _gf_false);
    heal->total_size = size;

    /* We need to adjust the size to a multiple of the stripe size of the
     * volume. Otherwise writes would need to fill gaps (head and/or tail)
     * with existent data from the bad bricks. This could be garbage on a
     * damaged file or it could fail if there aren't enough bricks. */
    heal->size = (128 * GF_UNIT_KB * (ec->self_heal_window_size));
    heal->size -= heal->size % ec->stripe_size;

    heal->bad = ec_char_array_to_mask(healed_sinks, ec->nodes);
    heal->good = ec_char_array_to_mask(sources, ec->nodes);
    heal->iatt.ia_type = IA_IFREG;
    LOCK_INIT(&heal->lock);

    for (heal->offset = 0; (heal->offset < size) && !heal->done;
         heal->offset += heal->size) {
        /* We immediately abort any heal if a shutdown request has been
         * received to avoid delays. The healing of this file will be
         * restarted by another SHD or other client that accesses the
         * file. */
        if (ec->shutdown) {
            gf_msg_debug(ec->xl->name, 0,
                         "Cancelling heal because "
                         "EC is stopping.");
            ret = -ENOTCONN;
            break;
        }

        gf_msg_debug(ec->xl->name, 0,
                     "%s: sources: %d, sinks: "
                     "%d, offset: %" PRIu64 " bsize: %" PRIu64,
                     uuid_utoa(fd->inode->gfid), EC_COUNT(sources, ec->nodes),
                     EC_COUNT(healed_sinks, ec->nodes), heal->offset,
                     heal->size);

        ret = ec_sync_heal_block(frame, ec->xl, heal);
        if (ret < 0) {
            break;
        }
    }

    /* Report back which sinks are still considered bad after the copy. */
    memset(healed_sinks, 0, ec->nodes);
    ec_mask_to_char_array(heal->bad, healed_sinks, ec->nodes);

    fd_unref(heal->fd);
    LOCK_DESTROY(&heal->lock);
    syncbarrier_destroy(heal->data);

    if (ret < 0) {
        gf_msg_debug(ec->xl->name, 0, "%s: heal failed %s",
                     uuid_utoa(fd->inode->gfid), strerror(-ret));
    }

    return ret;
}
/*
 * Truncate the sinks flagged in trim before their data is rebuilt.
 *
 * size is adjusted with ec_adjust_offset_up() and used as the ftruncate
 * offset. A flagged sink whose ftruncate did not succeed is removed from
 * healed_sinks.
 *
 * Returns 0 when nothing needs trimming, -ENOTCONN when no sink is left,
 * otherwise the value returned by cluster_ftruncate().
 */
int
__ec_heal_trim_sinks(call_frame_t *frame, ec_t *ec, fd_t *fd,
                     unsigned char *healed_sinks, unsigned char *trim,
                     uint64_t size)
{
    default_args_cbk_t *replies = NULL;
    unsigned char *done_on = NULL;
    off_t trim_offset = 0;
    int ret = 0;
    int n = 0;

    EC_REPLIES_ALLOC(replies, ec->nodes);
    done_on = alloca0(ec->nodes);

    if (EC_COUNT(trim, ec->nodes) != 0) {
        trim_offset = size;
        ec_adjust_offset_up(ec, &trim_offset, _gf_true);

        ret = cluster_ftruncate(ec->xl_list, trim, ec->nodes, replies, done_on,
                                frame, ec->xl, fd, trim_offset, NULL);

        for (n = 0; n < ec->nodes; n++) {
            if (trim[n] && !done_on[n]) {
                healed_sinks[n] = 0;
            }
        }

        if (EC_COUNT(healed_sinks, ec->nodes) == 0) {
            ret = -ENOTCONN;
        }
    }

    cluster_replies_wipe(replies, ec->nodes);
    if (ret < 0) {
        gf_msg_debug(ec->xl->name, 0, "%s: heal failed %s",
                     uuid_utoa(fd->inode->gfid), strerror(-ret));
    }

    return ret;
}
/*
 * Bring the version/size (and optionally dirty) xattrs of brick `idx` in
 * line with those of brick `source`.
 *
 * The differences versions[source] - versions[idx] and
 * size[source] - size[idx] are stored in `xattr`; with erase_dirty the
 * negated dirty[idx] is stored as well. When every delta is zero nothing is
 * sent, otherwise the dict is applied on the brick through an
 * ADD_ARRAY64 fxattrop.
 *
 * Returns 0 when there is nothing to do, a negative value from the dict
 * setters, or the result of syncop_fxattrop().
 */
int
ec_data_undo_pending(call_frame_t *frame, ec_t *ec, fd_t *fd, dict_t *xattr,
                     uint64_t *versions, uint64_t *dirty, uint64_t *size,
                     int source, gf_boolean_t erase_dirty, int idx)
{
    const uint64_t zeros[2] = {0};
    uint64_t version_delta[2] = {0};
    uint64_t dirty_delta[2] = {0};
    uint64_t size_delta = 0;
    int rc = 0;

    version_delta[EC_DATA_TXN] = hton64(versions[source] - versions[idx]);
    rc = dict_set_static_bin(xattr, EC_XATTR_VERSION, version_delta,
                             sizeof(version_delta));
    if (rc < 0) {
        return rc;
    }

    size_delta = hton64(size[source] - size[idx]);
    rc = dict_set_static_bin(xattr, EC_XATTR_SIZE, &size_delta,
                             sizeof(size_delta));
    if (rc < 0) {
        return rc;
    }

    if (erase_dirty) {
        dirty_delta[EC_DATA_TXN] = hton64(-dirty[idx]);
        rc = dict_set_static_bin(xattr, EC_XATTR_DIRTY, dirty_delta,
                                 sizeof(dirty_delta));
        if (rc < 0) {
            return rc;
        }
    }

    /* Skip the network round trip when no xattr would change. */
    if ((size_delta == 0) &&
        (memcmp(version_delta, zeros, sizeof(zeros)) == 0) &&
        (memcmp(dirty_delta, zeros, sizeof(zeros)) == 0)) {
        return 0;
    }

    return syncop_fxattrop(ec->xl_list[idx], fd, GF_XATTROP_ADD_ARRAY64, xattr,
                           NULL, NULL, NULL);
}
/*
 * Update the version/size/dirty xattrs after a data heal.
 *
 * The first brick flagged in sources is used as reference. Every healed
 * sink is updated with ec_data_undo_pending(); the sources are updated too,
 * but only when sources and healed sinks together cover all ec->nodes
 * bricks (which is also the condition for erasing the dirty xattr).
 *
 * Returns -ENOMEM when the dict cannot be created, -ENOTCONN when there is
 * no source, 0 otherwise.
 * NOTE(review): a failure while updating a sink stops the processing but is
 * not reflected in the return value - confirm whether this is intended.
 */
int
__ec_fd_data_adjust_versions(call_frame_t *frame, ec_t *ec, fd_t *fd,
                             unsigned char *sources,
                             unsigned char *healed_sinks, uint64_t *versions,
                             uint64_t *dirty, uint64_t *size)
{
    gf_boolean_t erase_dirty = _gf_false;
    dict_t *xattr = NULL;
    int op_ret = 0;
    int source = -1;
    int n = 0;

    xattr = dict_new();
    if (!xattr) {
        return -ENOMEM;
    }

    /* dirty xattr represents if the file needs heal. Unless all the
     * copies are healed, don't erase it */
    if (EC_COUNT(sources, ec->nodes) + EC_COUNT(healed_sinks, ec->nodes) ==
        ec->nodes) {
        erase_dirty = _gf_true;
    }

    /* The first source found is the reference brick. */
    n = 0;
    while ((n < ec->nodes) && (source == -1)) {
        if (sources[n]) {
            source = n;
        }
        n++;
    }
    if (source == -1) {
        op_ret = -ENOTCONN;
        goto out;
    }

    for (n = 0; n < ec->nodes; n++) {
        if (!healed_sinks[n]) {
            continue;
        }
        if (ec_data_undo_pending(frame, ec, fd, xattr, versions, dirty, size,
                                 source, erase_dirty, n) < 0) {
            goto out;
        }
    }

    if (erase_dirty) {
        /* Failures on individual sources are ignored here. */
        for (n = 0; n < ec->nodes; n++) {
            if (sources[n]) {
                (void)ec_data_undo_pending(frame, ec, fd, xattr, versions,
                                           dirty, size, source, erase_dirty,
                                           n);
            }
        }
    }

out:
    dict_unref(xattr);
    return op_ret;
}
/*
 * Final step of a data heal: copy the times of the source to the healed
 * sinks and fix up the version xattrs.
 *
 * Under an inodelk on all sources and healed sinks, the heal state is
 * collected again with __ec_heal_data_prepare() to obtain the stat of the
 * source; atime/mtime/ctime are then set on the healed sinks with
 * cluster_setattr(), sinks where that did not succeed are dropped, and
 * __ec_fd_data_adjust_versions() is run with the caller's versions, dirty
 * and size arrays.
 *
 * Returns the result of __ec_fd_data_adjust_versions(), -ENOTCONN when too
 * few bricks were locked or no sink is left, or the error of the prepare
 * step.
 */
int
ec_restore_time_and_adjust_versions(call_frame_t *frame, ec_t *ec, fd_t *fd,
                                    unsigned char *sources,
                                    unsigned char *healed_sinks,
                                    uint64_t *versions, uint64_t *dirty,
                                    uint64_t *size)
{
    default_args_cbk_t *replies = NULL;
    struct iatt source_buf = {0};
    loc_t loc = {0};
    int lock_count = 0;
    int ret = 0;
    int n = 0;

    unsigned char *locked_on = alloca0(ec->nodes);
    unsigned char *output = alloca0(ec->nodes);
    unsigned char *participants = alloca0(ec->nodes);

    /* Scratch state for the second prepare pass; only source_buf is used
     * afterwards. */
    unsigned char *postsh_sources = alloca0(ec->nodes);
    unsigned char *postsh_healed_sinks = alloca0(ec->nodes);
    unsigned char *postsh_trim = alloca0(ec->nodes);
    uint64_t *postsh_versions = alloca0(ec->nodes * sizeof(*postsh_versions));
    uint64_t *postsh_dirty = alloca0(ec->nodes * sizeof(*postsh_dirty));
    uint64_t *postsh_size = alloca0(ec->nodes * sizeof(*postsh_size));

    for (n = 0; n < ec->nodes; n++) {
        if (sources[n] || healed_sinks[n]) {
            participants[n] = 1;
        }
    }

    EC_REPLIES_ALLOC(replies, ec->nodes);

    lock_count = cluster_inodelk(ec->xl_list, participants, ec->nodes, replies,
                                 locked_on, frame, ec->xl, ec->xl->name,
                                 fd->inode, 0, 0);
    if (lock_count <= ec->fragments) {
        gf_msg_debug(ec->xl->name, 0,
                     "%s: Skipping heal "
                     "as only %d number of subvolumes could "
                     "be locked",
                     uuid_utoa(fd->inode->gfid), lock_count);
        ret = -ENOTCONN;
        goto unlock;
    }

    ret = __ec_heal_data_prepare(frame, ec, fd, locked_on, postsh_versions,
                                 postsh_dirty, postsh_size, postsh_sources,
                                 postsh_healed_sinks, postsh_trim, &source_buf);
    if (ret < 0) {
        goto unlock;
    }

    loc.inode = inode_ref(fd->inode);
    gf_uuid_copy(loc.gfid, fd->inode->gfid);

    /* Success is judged per brick from output, not from the return value. */
    (void)cluster_setattr(
        ec->xl_list, healed_sinks, ec->nodes, replies, output, frame, ec->xl,
        &loc, &source_buf,
        GF_SET_ATTR_ATIME | GF_SET_ATTR_MTIME | GF_SET_ATTR_CTIME, NULL);
    EC_INTERSECT(healed_sinks, healed_sinks, output, ec->nodes);
    if (EC_COUNT(healed_sinks, ec->nodes) == 0) {
        ret = -ENOTCONN;
        goto unlock;
    }

    ret = __ec_fd_data_adjust_versions(frame, ec, fd, sources, healed_sinks,
                                       versions, dirty, size);

unlock:
    cluster_uninodelk(ec->xl_list, locked_on, ec->nodes, replies, output, frame,
                      ec->xl, ec->xl->name, fd->inode, 0, 0);
    cluster_replies_wipe(replies, ec->nodes);
    loc_wipe(&loc);
    return ret;
}
/*
 * Data heal of fd on the bricks flagged in heal_on.
 *
 * Under an inodelk: collect the heal state (__ec_heal_data_prepare()); if
 * there are no sinks only the versions are adjusted, otherwise the sinks
 * are marked (__ec_heal_mark_sinks()) and trimmed (__ec_heal_trim_sinks()).
 * After the lock is released the data is rebuilt (ec_rebuild_data()) and
 * times/versions are restored (ec_restore_time_and_adjust_versions()).
 *
 * sources and healed_sinks are per-brick output arrays.
 * Returns a negative errno from the first failing step, otherwise the result
 * of the last step executed.
 */
int
__ec_heal_data(call_frame_t *frame, ec_t *ec, fd_t *fd, unsigned char *heal_on,
               unsigned char *sources, unsigned char *healed_sinks)
{
    default_args_cbk_t *replies = NULL;
    unsigned char *locked_on = alloca0(ec->nodes);
    unsigned char *output = alloca0(ec->nodes);
    unsigned char *trim = alloca0(ec->nodes);
    uint64_t *versions = alloca0(ec->nodes * sizeof(*versions));
    uint64_t *dirty = alloca0(ec->nodes * sizeof(*dirty));
    uint64_t *size = alloca0(ec->nodes * sizeof(*size));
    int lock_count = 0;
    int source = 0;
    int ret = 0;

    EC_REPLIES_ALLOC(replies, ec->nodes);

    lock_count = cluster_inodelk(ec->xl_list, heal_on, ec->nodes, replies,
                                 locked_on, frame, ec->xl, ec->xl->name,
                                 fd->inode, 0, 0);
    if (lock_count <= ec->fragments) {
        gf_msg_debug(ec->xl->name, 0,
                     "%s: Skipping heal "
                     "as only %d number of subvolumes could "
                     "be locked",
                     uuid_utoa(fd->inode->gfid), lock_count);
        ret = -ENOTCONN;
        goto unlock;
    }

    ret = __ec_heal_data_prepare(frame, ec, fd, locked_on, versions, dirty,
                                 size, sources, healed_sinks, trim, NULL);
    if (ret < 0) {
        goto unlock;
    }

    if (EC_COUNT(healed_sinks, ec->nodes) == 0) {
        /* Nothing to rebuild: only the xattrs need fixing. */
        ret = __ec_fd_data_adjust_versions(frame, ec, fd, sources,
                                           healed_sinks, versions, dirty, size);
        goto unlock;
    }

    source = ret;
    ret = __ec_heal_mark_sinks(frame, ec, fd, versions, healed_sinks);
    if (ret < 0) {
        goto unlock;
    }

    ret = __ec_heal_trim_sinks(frame, ec, fd, healed_sinks, trim, size[source]);

unlock:
    cluster_uninodelk(ec->xl_list, locked_on, ec->nodes, replies, output, frame,
                      ec->xl, ec->xl->name, fd->inode, 0, 0);

    if ((ret < 0) || (EC_COUNT(healed_sinks, ec->nodes) == 0)) {
        goto out;
    }

    gf_msg_debug(ec->xl->name, 0,
                 "%s: sources: %d, sinks: "
                 "%d",
                 uuid_utoa(fd->inode->gfid), EC_COUNT(sources, ec->nodes),
                 EC_COUNT(healed_sinks, ec->nodes));

    ret = ec_rebuild_data(frame, ec, fd, size[source], sources, healed_sinks);
    if (ret >= 0) {
        ret = ec_restore_time_and_adjust_versions(
            frame, ec, fd, sources, healed_sinks, versions, dirty, size);
    }

out:
    cluster_replies_wipe(replies, ec->nodes);
    return ret;
}
/*
 * Data heal entry point for a regular file.
 *
 * Opens the inode on all bricks that are up, takes an inodelk in the
 * "<xlator-name>:self-heal" domain on the bricks where the open succeeded
 * (blocking when `block` is set, tie-breaker otherwise) and runs
 * __ec_heal_data() on the locked bricks.
 *
 * sources and healed_sinks are per-brick output arrays filled by
 * __ec_heal_data().
 *
 * Returns the result of __ec_heal_data(), -ENOMEM when the fd cannot be
 * created, or -ENOTCONN when the open or the lock succeeded on no more than
 * ec->fragments bricks.
 */
int
ec_heal_data(call_frame_t *frame, ec_t *ec, gf_boolean_t block, inode_t *inode,
             unsigned char *sources, unsigned char *healed_sinks)
{
    unsigned char *locked_on = NULL;
    unsigned char *up_subvols = NULL;
    unsigned char *output = NULL;
    default_args_cbk_t *replies = NULL;
    fd_t *fd = NULL;
    loc_t loc = {0};
    char selfheal_domain[1024] = {0};
    int ret = 0;

    EC_REPLIES_ALLOC(replies, ec->nodes);

    locked_on = alloca0(ec->nodes);
    output = alloca0(ec->nodes);
    up_subvols = alloca0(ec->nodes);
    loc.inode = inode_ref(inode);
    gf_uuid_copy(loc.gfid, inode->gfid);

    fd = fd_create(inode, 0);
    if (!fd) {
        ret = -ENOMEM;
        goto out;
    }

    ec_mask_to_char_array(ec->xl_up, up_subvols, ec->nodes);

    ret = cluster_open(ec->xl_list, up_subvols, ec->nodes, replies, output,
                       frame, ec->xl, &loc, O_RDWR | O_LARGEFILE, fd, NULL);
    if (ret <= ec->fragments) {
        ret = -ENOTCONN;
        goto out;
    }

    fd_bind(fd);

    /* Bounded formatting: the domain buffer has a fixed size while the
     * xlator name length is not limited here. */
    snprintf(selfheal_domain, sizeof(selfheal_domain), "%s:self-heal",
             ec->xl->name);

    /*If other processes are already doing the heal, don't block*/
    if (block) {
        ret = cluster_inodelk(ec->xl_list, output, ec->nodes, replies,
                              locked_on, frame, ec->xl, selfheal_domain, inode,
                              0, 0);
    } else {
        ret = cluster_tiebreaker_inodelk(ec->xl_list, output, ec->nodes,
                                         replies, locked_on, frame, ec->xl,
                                         selfheal_domain, inode, 0, 0);
    }
    if (ret <= ec->fragments) {
        gf_msg_debug(ec->xl->name, 0,
                     "%s: Skipping heal "
                     "as only %d number of subvolumes could "
                     "be locked",
                     uuid_utoa(inode->gfid), ret);
        ret = -ENOTCONN;
        goto unlock;
    }

    ret = __ec_heal_data(frame, ec, fd, locked_on, sources, healed_sinks);

unlock:
    cluster_uninodelk(ec->xl_list, locked_on, ec->nodes, replies, output, frame,
                      ec->xl, selfheal_domain, inode, 0, 0);
out:
    if (fd)
        fd_unref(fd);
    loc_wipe(&loc);
    cluster_replies_wipe(replies, ec->nodes);
    return ret;
}
/*
 * Perform a complete heal of loc: optional name heal, then data heal
 * (regular files) or entry heal (directories, unless `partial` is set),
 * and finally metadata heal.
 *
 * data is the heal fop (ec_fop_data_t). The outcome is reported through
 * fop->cbks.heal, when set, with the participants mask and the
 * intersection of the good/bad masks of the data-or-entry heal and the
 * metadata heal. op_ret is -1 and op_errno the last error seen when any
 * step failed, including the allocation of the internal frame.
 */
void
ec_heal_do(xlator_t *this, void *data, loc_t *loc, int32_t partial)
{
    call_frame_t *frame = NULL;
    unsigned char *participants = NULL;
    unsigned char *msources = NULL;
    unsigned char *mhealed_sinks = NULL;
    unsigned char *sources = NULL;
    unsigned char *healed_sinks = NULL;
    ec_t *ec = NULL;
    int ret = 0;
    int op_ret = 0;
    int op_errno = 0;
    intptr_t mgood = 0;
    intptr_t mbad = 0;
    intptr_t good = 0;
    intptr_t bad = 0;
    ec_fop_data_t *fop = data;
    gf_boolean_t blocking = _gf_false;
    ec_heal_need_t need_heal = EC_HEAL_NONEED;
    unsigned char *up_subvols = NULL;
    char up_bricks[32];

    ec = this->private;

    /* If it is heal request from getxattr, complete the heal and then
     * unwind, if it is ec_heal with NULL as frame then no need to block
     * the heal as the caller doesn't care about its completion. In case
     * of heald whichever gets tiebreaking inodelk will take care of the
     * heal, so no need to block*/
    if (fop->req_frame && !ec->shd.iamshd)
        blocking = _gf_true;

    /* The brick arrays are built before anything can jump to "out",
     * because the exit path converts participants into a mask. */
    participants = alloca0(ec->nodes);
    ec_mask_to_char_array(ec->xl_up, participants, ec->nodes);

    up_subvols = alloca0(ec->nodes);
    ec_mask_to_char_array(ec->xl_up, up_subvols, ec->nodes);

    frame = create_frame(this, this->ctx->pool);
    if (!frame) {
        /* Without a frame no heal was attempted: report the failure
         * instead of an apparently successful heal. */
        op_ret = -1;
        op_errno = ENOMEM;
        goto out;
    }

    ec_owner_set(frame, frame->root);
    /*Do heal as root*/
    frame->root->uid = 0;
    frame->root->gid = 0;
    /*Mark the fops as internal*/
    frame->root->pid = GF_CLIENT_PID_SELF_HEALD;

    if (loc->name && strlen(loc->name)) {
        ret = ec_heal_name(frame, ec, loc->parent, (char *)loc->name,
                           participants);
        if (ret == 0) {
            gf_msg_debug(this->name, 0,
                         "%s: name heal "
                         "successful on %" PRIXPTR,
                         loc->path,
                         ec_char_array_to_mask(participants, ec->nodes));
        } else {
            gf_msg_debug(
                this->name, 0,
                "%s: name heal "
                "failed. ret = %d, subvolumes up = %s",
                loc->path, ret,
                ec_bin(up_bricks, sizeof(up_bricks), ec->xl_up, ec->nodes));
        }
    }

    /* Mount triggers heal only when it detects that it must need heal, shd
     * triggers heals periodically which need not be thorough*/
    if (ec->shd.iamshd) {
        ec_heal_inspect(frame, ec, loc->inode, up_subvols, _gf_false, _gf_false,
                        &need_heal);

        if (need_heal == EC_HEAL_NONEED) {
            gf_msg(ec->xl->name, GF_LOG_DEBUG, 0, EC_MSG_HEAL_FAIL,
                   "Heal is not required for : %s ", uuid_utoa(loc->gfid));
            goto out;
        }
    }

    sources = alloca0(ec->nodes);
    healed_sinks = alloca0(ec->nodes);
    if (IA_ISREG(loc->inode->ia_type)) {
        ret = ec_heal_data(frame, ec, blocking, loc->inode, sources,
                           healed_sinks);
    } else if (IA_ISDIR(loc->inode->ia_type) && !partial) {
        ret = ec_heal_entry(frame, ec, loc->inode, sources, healed_sinks);
    } else {
        /* No data/entry heal applies: every participant counts as both
         * source and sink so the metadata result alone decides. */
        ret = 0;
        memcpy(sources, participants, ec->nodes);
        memcpy(healed_sinks, participants, ec->nodes);
    }

    if (ret == 0) {
        good = ec_char_array_to_mask(sources, ec->nodes);
        bad = ec_char_array_to_mask(healed_sinks, ec->nodes);
    } else {
        op_ret = -1;
        op_errno = -ret;
    }

    /* Metadata heal runs even when the data/entry heal failed. */
    msources = alloca0(ec->nodes);
    mhealed_sinks = alloca0(ec->nodes);
    ret = ec_heal_metadata(frame, ec, loc->inode, msources, mhealed_sinks);
    if (ret == 0) {
        mgood = ec_char_array_to_mask(msources, ec->nodes);
        mbad = ec_char_array_to_mask(mhealed_sinks, ec->nodes);
    } else {
        op_ret = -1;
        op_errno = -ret;
    }

out:
    ec_reset_entry_healing(fop);
    if (fop->cbks.heal) {
        fop->cbks.heal(fop->req_frame, fop, fop->xl, op_ret, op_errno,
                       ec_char_array_to_mask(participants, ec->nodes),
                       mgood & good, mbad & bad, NULL);
    }
    if (frame)
        STACK_DESTROY(frame->root);
    return;
}
int
ec_synctask_heal_wrap(void *opaque)
{
ec_fop_data_t *fop = opaque;
ec_heal_do(fop->xl, fop, &fop->loc[0], fop->int32);
return 0;
}
/*
 * Synctask completion callback of a heal: releases the heal fop passed as
 * opaque, if any. Always returns 0.
 */
int
ec_heal_done(int ret, call_frame_t *heal, void *opaque)
{
    if (opaque == NULL) {
        return 0;
    }

    ec_fop_data_release(opaque);
    return 0;
}
/*
 * Move the first waiting heal fop from ec->heal_waiting to ec->healing and
 * update the heal_waiters/healers counters.
 *
 * Returns the fop, or NULL when nothing is waiting or when background_heals
 * is positive and already reached by ec->healers.
 * NOTE(review): callers in this file invoke it with ec->lock held.
 */
ec_fop_data_t *
__ec_dequeue_heals(ec_t *ec)
{
    ec_fop_data_t *next = NULL;

    if (list_empty(&ec->heal_waiting) ||
        ((ec->background_heals > 0) && (ec->healers >= ec->background_heals))) {
        gf_msg_debug(ec->xl->name, 0, "Num healers: %d, Num Waiters: %d",
                     ec->healers, ec->heal_waiters);
        return NULL;
    }

    next = list_entry(ec->heal_waiting.next, ec_fop_data_t, healer);
    ec->heal_waiters--;
    list_del_init(&next->healer);

    list_add(&next->healer, &ec->healing);
    ec->healers++;

    return next;
}
/*
 * Fail a heal fop that will not be executed: its heal callback (if any) is
 * called with -1 and fop->error, then the fop is released.
 */
void
ec_heal_fail(ec_t *ec, ec_fop_data_t *fop)
{
    fop_heal_cbk_t notify = fop->cbks.heal;

    if (notify != NULL) {
        notify(fop->req_frame, NULL, ec->xl, -1, fop->error, 0, 0, 0, NULL);
    }

    ec_fop_data_release(fop);
}
/*
 * Run the heal described by fop in a new synctask.
 *
 * A root-owned internal frame (pid GF_CLIENT_PID_SELF_HEALD) is created for
 * synctask_new(). When the frame or the task cannot be created the fop is
 * failed with ENOMEM through ec_heal_fail(), which also releases it.
 */
void
ec_launch_heal(ec_t *ec, ec_fop_data_t *fop)
{
    int ret = 0;
    call_frame_t *frame = NULL;

    frame = create_frame(ec->xl, ec->xl->ctx->pool);
    if (!frame) {
        /* The error must be recorded before jumping, otherwise the fop
         * would be neither failed nor released. */
        ret = -1;
        goto out;
    }

    ec_owner_set(frame, frame->root);
    /*Do heal as root*/
    frame->root->uid = 0;
    frame->root->gid = 0;
    /*Mark the fops as internal*/
    frame->root->pid = GF_CLIENT_PID_SELF_HEALD;

    ret = synctask_new(ec->xl->ctx->env, ec_synctask_heal_wrap, ec_heal_done,
                       frame, fop);
out:
    if (ret < 0) {
        ec_fop_set_error(fop, ENOMEM);
        ec_heal_fail(ec, fop);
    }

    if (frame)
        STACK_DESTROY(frame->root);
}
/*
 * Bookkeeping performed when a heal fop that is linked in a healer list
 * finishes: the fop is unlinked, the healer count is decremented and the
 * next waiting heal, if any, is launched.
 *
 * While EC is shutting down, dequeued heals are not launched; each one is
 * failed with ENOTCONN instead and the loop continues with the next one.
 */
void
ec_handle_healers_done(ec_fop_data_t *fop)
{
    ec_t *ec = fop->xl->private;
    ec_fop_data_t *heal_fop = NULL;

    if (list_empty(&fop->healer))
        return;

    LOCK(&ec->lock);

    list_del_init(&fop->healer);

    do {
        ec->healers--;
        heal_fop = __ec_dequeue_heals(ec);

        if ((heal_fop != NULL) && ec->shutdown) {
            /* This will prevent ec_handle_healers_done() to be
             * called recursively. That would be problematic if
             * the queue is too big. */
            list_del_init(&heal_fop->healer);

            UNLOCK(&ec->lock);

            /* The error belongs to the heal being cancelled, since that
             * is the fop whose error ec_heal_fail() reports. */
            ec_fop_set_error(heal_fop, ENOTCONN);
            ec_heal_fail(ec, heal_fop);

            LOCK(&ec->lock);
        }
    } while ((heal_fop != NULL) && ec->shutdown);

    UNLOCK(&ec->lock);

    if (heal_fop)
        ec_launch_heal(ec, heal_fop);
}
/*
 * Tell whether the inode of fop->loc[0] has a heal accounted for.
 *
 * Reads heal_count from the inode's EC context under the inode lock (0 when
 * there is no context) and returns it converted to gf_boolean_t.
 */
gf_boolean_t
ec_is_entry_healing(ec_fop_data_t *fop)
{
    loc_t *target = &fop->loc[0];
    ec_inode_t *inode_ctx = NULL;
    int32_t pending = 0;

    LOCK(&target->inode->lock);
    inode_ctx = __ec_inode_get(target->inode, fop->xl);
    if (inode_ctx != NULL) {
        pending = inode_ctx->heal_count;
    }
    UNLOCK(&target->inode->lock);

    GF_ASSERT(pending >= 0);

    return pending;
}
/*
 * Decide whether the heal described by fop runs now, waits, or is rejected.
 *
 * Heals with a request frame are launched directly. Heals without one
 * (background heals) are queued on ec->heal_waiting as long as
 * heal_wait_qlen + background_heals exceeds heal_waiters + healers, and the
 * next heal allowed to run is dequeued; otherwise the heal is failed with
 * EBUSY. A background heal for an inode already being healed is not queued
 * again and is simply released at the end.
 */
void
ec_heal_throttle(xlator_t *this, ec_fop_data_t *fop)
{
gf_boolean_t can_heal = _gf_true;
ec_t *ec = this->private;
ec_fop_data_t *fop_rel = NULL;
if (fop->req_frame == NULL) {
LOCK(&ec->lock);
{
if ((ec->background_heals > 0) &&
(ec->heal_wait_qlen + ec->background_heals) >
(ec->heal_waiters + ec->healers)) {
if (!ec_is_entry_healing(fop)) {
list_add_tail(&fop->healer, &ec->heal_waiting);
ec->heal_waiters++;
ec_set_entry_healing(fop);
} else {
/* Duplicate request: remember it so it is released once the
 * lock has been dropped. */
fop_rel = fop;
}
/* From here on fop is the heal to launch (possibly NULL), which
 * is not necessarily the one passed in. */
fop = __ec_dequeue_heals(ec);
} else {
can_heal = _gf_false;
}
}
UNLOCK(&ec->lock);
}
if (can_heal) {
if (fop) {
/* Foreground heals were not queued above, so they are flagged as
 * healing here. */
if (fop->req_frame != NULL) {
ec_set_entry_healing(fop);
}
ec_launch_heal(ec, fop);
}
} else {
gf_msg_debug(this->name, 0,
"Max number of heals are "
"pending, background self-heal rejected");
ec_fop_set_error(fop, EBUSY);
ec_heal_fail(ec, fop);
}
if (fop_rel) {
ec_heal_done(0, NULL, fop_rel);
}
}
/*
 * Entry point of the heal fop.
 *
 * Validates the arguments, allocates a heal fop carrying a copy of loc,
 * `partial` (in fop->int32) and a reference to xdata, and passes it to
 * ec_heal_throttle().
 *
 * On failure func, when set, is called with -1 and EINVAL (bad arguments,
 * or a frame that already has local data) or ENOMEM (allocation or
 * loc_copy() failure).
 */
void
ec_heal(call_frame_t *frame, xlator_t *this, uintptr_t target,
        uint32_t fop_flags, fop_heal_cbk_t func, void *data, loc_t *loc,
        int32_t partial, dict_t *xdata)
{
    ec_cbk_t callback = {.heal = func};
    ec_fop_data_t *fop = NULL;
    int32_t err = EINVAL;

    gf_msg_trace("ec", 0, "EC(HEAL) %p", frame);

    VALIDATE_OR_GOTO(this, fail);
    GF_VALIDATE_OR_GOTO(this->name, this->private, fail);

    if (!loc || !loc->inode) {
        goto fail;
    }
    if (gf_uuid_is_null(loc->inode->gfid)) {
        goto fail;
    }
    if (frame && frame->local) {
        goto fail;
    }

    fop = ec_fop_data_allocate(frame, this, EC_FOP_HEAL, 0, target, fop_flags,
                               NULL, NULL, callback, data);
    /* Every failure from here on is reported as an allocation failure. */
    err = ENOMEM;
    if (fop == NULL) {
        goto fail;
    }

    fop->int32 = partial;

    /* loc was checked against NULL above. */
    if (loc_copy(&fop->loc[0], loc) != 0) {
        goto fail;
    }

    if (xdata) {
        fop->xdata = dict_ref(xdata);
    }

    ec_heal_throttle(this, fop);
    return;

fail:
    if (fop) {
        ec_fop_data_release(fop);
    }
    if (func) {
        func(frame, NULL, this, -1, err, 0, 0, 0, NULL);
    }
}
/*
 * Completion callback of the replace-brick heal synctask: logs the task's
 * return value at debug level. opaque is the ec_t. Always returns 0.
 */
int
ec_replace_heal_done(int ret, call_frame_t *heal, void *opaque)
{
    ec_t *ec = opaque;
    const char *xl_name = ec->xl->name;

    gf_msg_debug(xl_name, 0, "getxattr on bricks is done ret %d", ret);

    return 0;
}
/*
 * Request a heal of `inode` by issuing a getxattr of EC_XATTR_HEAL on the
 * EC xlator itself, then wake the index healers.
 *
 * Returns the result of syncop_getxattr().
 */
int32_t
ec_replace_heal(ec_t *ec, inode_t *inode)
{
    loc_t target = {0};
    int rc = 0;

    target.inode = inode_ref(inode);
    gf_uuid_copy(target.gfid, inode->gfid);

    rc = syncop_getxattr(ec->xl, &target, NULL, EC_XATTR_HEAL, NULL, NULL);
    if (rc < 0) {
        gf_msg_debug(ec->xl->name, 0, "Heal failed for replace brick ret = %d",
                     rc);
    }

    /* Once the root inode has been checked, it might have triggered a
     * self-heal on it after a replace brick command or for some other
     * reason. It can also happen that the volume already had damaged
     * files in the index, even if the heal on the root directory failed.
     * In both cases we need to wake all index healers to continue
     * healing remaining entries that are marked as dirty. */
    ec_shd_index_healer_wake(ec);

    loc_wipe(&target);
    return rc;
}
/*
 * Task body that runs ec_replace_heal() on the root inode of the xlator's
 * inode table. 'opaque' is interpreted as the ec_t instance.
 *
 * Returns -1 when the xlator has no inode table, otherwise the result of
 * ec_replace_heal().
 */
int32_t
ec_replace_brick_heal_wrap(void *opaque)
{
    ec_t *ec = opaque;
    inode_table_t *table = ec->xl->itable;

    if (!table) {
        return -1;
    }

    return ec_replace_heal(ec, table->root);
}
/*
 * Schedules ec_replace_brick_heal_wrap() as a synctask on the context's
 * environment, with ec_replace_heal_done() as its completion callback.
 *
 * Returns -1 when 'ec' is NULL, otherwise the value returned by
 * synctask_new() (a negative value is logged at debug level).
 */
int32_t
ec_launch_replace_heal(ec_t *ec)
{
    int rc = -1;

    if (ec == NULL) {
        return rc;
    }

    rc = synctask_new(ec->xl->ctx->env, ec_replace_brick_heal_wrap,
                      ec_replace_heal_done, NULL, ec);
    if (rc < 0) {
        gf_msg_debug(ec->xl->name, 0, "Heal failed for replace brick ret = %d",
                     rc);
    }

    return rc;
}
/*
 * Builds a new dictionary holding the key "heal-info" set to 'status' and
 * stores it in *dict_rsp.
 *
 * Returns 0 on success. If the dictionary cannot be created, -ENOMEM is
 * returned and *dict_rsp is left untouched. If the key cannot be set, the
 * dictionary is dropped, *dict_rsp is set to NULL and the dict_set_str()
 * result is returned.
 */
int32_t
ec_set_heal_info(dict_t **dict_rsp, char *status)
{
    dict_t *rsp = NULL;
    int rc = 0;

    rsp = dict_new();
    if (rsp == NULL) {
        return -ENOMEM;
    }

    rc = dict_set_str(rsp, "heal-info", status);
    if (rc != 0) {
        gf_msg(THIS->name, GF_LOG_WARNING, -rc, EC_MSG_HEAL_FAIL,
               "Failed to set heal-info key to "
               "%s",
               status);
        dict_unref(rsp);
        rsp = NULL;
    }

    *dict_rsp = rsp;

    return rc;
}
/*
 * Decides whether a heal is needed from the per-brick 'dirty' counters,
 * 'versions' and the 'sources' map, storing the verdict in *need_heal.
 *
 *  - Not every brick is a source: EC_HEAL_MUST.
 *  - Every brick is a source and the inode is self-locked or nobody holds
 *    a lock (lock_count == 0): EC_HEAL_MUST if any brick is dirty or has a
 *    version different from brick 0, otherwise EC_HEAL_NONEED.
 *  - Every brick is a source but other locks exist: EC_HEAL_MUST as soon as
 *    a dirty counter greater than 1 is found; otherwise EC_HEAL_MAYBE if
 *    any inspected brick differs from brick 0 in dirty or version, else
 *    EC_HEAL_NONEED.
 *
 * Returns the number of bricks flagged in 'sources'.
 */
static int32_t
_need_heal_calculate(ec_t *ec, uint64_t *dirty, unsigned char *sources,
                     gf_boolean_t self_locked, int32_t lock_count,
                     ec_heal_need_t *need_heal, uint64_t *versions)
{
    int idx = 0;
    int good = 0;

    good = EC_COUNT(sources, ec->nodes);

    if (good != ec->nodes) {
        *need_heal = EC_HEAL_MUST;
        return good;
    }

    *need_heal = EC_HEAL_NONEED;

    if (self_locked || lock_count == 0) {
        for (idx = 0; idx < ec->nodes; idx++) {
            if (dirty[idx] || (versions[idx] != versions[0])) {
                *need_heal = EC_HEAL_MUST;
                break;
            }
        }
        return good;
    }

    for (idx = 0; idx < ec->nodes; idx++) {
        /* A single lock can bump the dirty counter only once, so a value
         * above 1 means some other operation left it set, which points to
         * a problem in the inode. */
        if (dirty[idx] > 1) {
            *need_heal = EC_HEAL_MUST;
            break;
        }
        if ((dirty[idx] != dirty[0]) || (versions[idx] != versions[0])) {
            *need_heal = EC_HEAL_MAYBE;
        }
    }

    return good;
}
/*
 * Evaluates whether metadata heal is needed, based on the lookup 'replies'.
 *
 * Runs ec_heal_metadata_find_direction() and, unless it fails with an error
 * other than -EIO, feeds its dirty/version/source output to
 * _need_heal_calculate(). 'inode' and 'thorough' are not used here.
 *
 * Returns the negative error from the direction search, or the value
 * returned by _need_heal_calculate().
 */
static int32_t
ec_need_metadata_heal(ec_t *ec, inode_t *inode, default_args_cbk_t *replies,
                      int32_t lock_count, gf_boolean_t self_locked,
                      gf_boolean_t thorough, ec_heal_need_t *need_heal)
{
    unsigned char *sources = alloca0(ec->nodes);
    unsigned char *healed_sinks = alloca0(ec->nodes);
    uint64_t *dirty = alloca0(ec->nodes * sizeof(*dirty));
    uint64_t *meta_versions = alloca0(ec->nodes * sizeof(*meta_versions));
    int rc = 0;

    rc = ec_heal_metadata_find_direction(ec, replies, meta_versions, dirty,
                                         sources, healed_sinks);
    if ((rc < 0) && (rc != -EIO)) {
        return rc;
    }

    return _need_heal_calculate(ec, dirty, sources, self_locked, lock_count,
                                need_heal, meta_versions);
}
/*
 * Evaluates whether data heal is needed, based on the lookup 'replies'.
 *
 * Runs ec_heal_data_find_direction() and, unless it fails with an error
 * other than -EIO, feeds its dirty/version/source output to
 * _need_heal_calculate(). 'inode' is not used here.
 *
 * Returns the negative error from the direction search, or the value
 * returned by _need_heal_calculate().
 */
static int32_t
ec_need_data_heal(ec_t *ec, inode_t *inode, default_args_cbk_t *replies,
                  int32_t lock_count, gf_boolean_t self_locked,
                  gf_boolean_t thorough, ec_heal_need_t *need_heal)
{
    unsigned char *sources = alloca0(ec->nodes);
    unsigned char *healed_sinks = alloca0(ec->nodes);
    uint64_t *dirty = alloca0(ec->nodes * sizeof(*dirty));
    uint64_t *data_versions = alloca0(ec->nodes * sizeof(*data_versions));
    uint64_t *size = alloca0(ec->nodes * sizeof(*size));
    int rc = 0;

    /* While something like dd is writing and heal info is requested, the
     * on-disk sizes are very likely to differ even though nothing is wrong,
     * so the on-disk size check is not wanted in that case. It is only
     * performed when the file is self-locked or the caller asked for a
     * thorough check. */
    rc = ec_heal_data_find_direction(ec, replies, data_versions, dirty, size,
                                     sources, healed_sinks,
                                     self_locked || thorough,
                                     EC_COMBINE_XDATA);
    if ((rc < 0) && (rc != -EIO)) {
        return rc;
    }

    return _need_heal_calculate(ec, dirty, sources, self_locked, lock_count,
                                need_heal, data_versions);
}
/*
 * Evaluates whether entry heal is needed, based on the lookup 'replies'.
 *
 * Runs ec_heal_entry_find_direction() and, unless it fails with an error
 * other than -EIO, feeds its dirty/version/source output to
 * _need_heal_calculate(). 'inode' and 'thorough' are not used here.
 *
 * Returns the negative error from the direction search, or the value
 * returned by _need_heal_calculate().
 */
static int32_t
ec_need_entry_heal(ec_t *ec, inode_t *inode, default_args_cbk_t *replies,
                   int32_t lock_count, gf_boolean_t self_locked,
                   gf_boolean_t thorough, ec_heal_need_t *need_heal)
{
    unsigned char *sources = alloca0(ec->nodes);
    unsigned char *healed_sinks = alloca0(ec->nodes);
    uint64_t *dirty = alloca0(ec->nodes * sizeof(*dirty));
    uint64_t *data_versions = alloca0(ec->nodes * sizeof(*data_versions));
    int rc = 0;

    rc = ec_heal_entry_find_direction(ec, replies, data_versions, dirty,
                                      sources, healed_sinks);
    if ((rc < 0) && (rc != -EIO)) {
        return rc;
    }

    return _need_heal_calculate(ec, dirty, sources, self_locked, lock_count,
                                need_heal, data_versions);
}
/*
 * Combines the per-kind checks into a single verdict stored in *need_heal.
 *
 * Metadata is checked first. If that check fails or already yields
 * EC_HEAL_MUST, its result is returned directly. Otherwise regular files go
 * through the data check and directories through the entry check; other
 * inode types keep the metadata result.
 *
 * Returns the value of the last check performed.
 */
static int32_t
ec_need_heal(ec_t *ec, inode_t *inode, default_args_cbk_t *replies,
             int32_t lock_count, gf_boolean_t self_locked,
             gf_boolean_t thorough, ec_heal_need_t *need_heal)
{
    int rc = 0;

    rc = ec_need_metadata_heal(ec, inode, replies, lock_count, self_locked,
                               thorough, need_heal);
    if ((rc < 0) || (*need_heal == EC_HEAL_MUST)) {
        return rc;
    }

    switch (inode->ia_type) {
        case IA_IFREG:
            rc = ec_need_data_heal(ec, inode, replies, lock_count,
                                   self_locked, thorough, need_heal);
            break;
        case IA_IFDIR:
            rc = ec_need_entry_heal(ec, inode, replies, lock_count,
                                    self_locked, thorough, need_heal);
            break;
        default:
            break;
    }

    return rc;
}
/*
 * Looks up 'inode' on the bricks flagged in 'locked_on' and decides whether
 * it needs heal, storing the verdict in *need_heal.
 *
 * The lookup xdata is seeded with zeroed EC_XATTR_VERSION, EC_XATTR_DIRTY
 * and EC_XATTR_SIZE entries. When the caller does not hold the lock itself
 * (!self_locked), GLUSTERFS_INODELK_DOM_COUNT is also set to the xlator name
 * and the first positive GLUSTERFS_INODELK_COUNT found in the replies is
 * used as the lock count passed to ec_need_heal().
 *
 * Returns:
 *   -ENOMEM     if the request dictionary could not be built,
 *   ec->nodes   (with *need_heal = EC_HEAL_MUST) if the lookup did not
 *               return ec->nodes,
 *   otherwise the value returned by ec_need_heal().
 */
int32_t
ec_heal_inspect(call_frame_t *frame, ec_t *ec, inode_t *inode,
                unsigned char *locked_on, gf_boolean_t self_locked,
                gf_boolean_t thorough, ec_heal_need_t *need_heal)
{
    loc_t loc = {0};
    int idx = 0;
    int ret = 0;
    dict_t *req = NULL;
    uint64_t zero_array[2] = {0};
    uint64_t zero_value = 0;
    unsigned char *answered = NULL;
    default_args_cbk_t *replies = NULL;
    int32_t lock_count = 0;

    EC_REPLIES_ALLOC(replies, ec->nodes);
    answered = alloca0(ec->nodes);

    loc.inode = inode_ref(inode);
    gf_uuid_copy(loc.gfid, inode->gfid);

    req = dict_new();
    if (req == NULL) {
        ret = -ENOMEM;
        goto out;
    }

    if (dict_set_static_bin(req, EC_XATTR_VERSION, zero_array,
                            sizeof(zero_array)) ||
        dict_set_static_bin(req, EC_XATTR_DIRTY, zero_array,
                            sizeof(zero_array)) ||
        dict_set_static_bin(req, EC_XATTR_SIZE, &zero_value,
                            sizeof(zero_value))) {
        ret = -ENOMEM;
        goto out;
    }

    if (!self_locked &&
        dict_set_str(req, GLUSTERFS_INODELK_DOM_COUNT, ec->xl->name)) {
        ret = -ENOMEM;
        goto out;
    }

    ret = cluster_lookup(ec->xl_list, locked_on, ec->nodes, replies, answered,
                         frame, ec->xl, &loc, req);
    if (ret != ec->nodes) {
        /* Anything other than a full set of answers is reported as a
         * mandatory heal, with ec->nodes as the return value. */
        ret = ec->nodes;
        *need_heal = EC_HEAL_MUST;
        goto out;
    }

    if (!self_locked) {
        /* Stop at the first brick that reports a positive lock count. */
        for (idx = 0; idx < ec->nodes; idx++) {
            if (answered[idx] && replies[idx].xdata &&
                (dict_get_int32(replies[idx].xdata, GLUSTERFS_INODELK_COUNT,
                                &lock_count) == 0) &&
                (lock_count > 0)) {
                break;
            }
        }
    }

    ret = ec_need_heal(ec, inode, replies, lock_count, self_locked, thorough,
                       need_heal);

out:
    cluster_replies_wipe(replies, ec->nodes);
    loc_wipe(&loc);
    if (req) {
        dict_unref(req);
    }

    return ret;
}
/*
 * Same check as ec_heal_inspect(), but performed while holding an inodelk
 * (domain ec->xl->name) on the subvolumes that are currently up.
 *
 * If the lock is not obtained on ec->nodes bricks, *need_heal is set to
 * EC_HEAL_MUST and the cluster_inodelk() result is returned. Otherwise
 * ec_heal_inspect() is run with both 'self_locked' and 'thorough' enabled
 * and its result is returned. The unlock is issued in both cases.
 */
int32_t
ec_heal_locked_inspect(call_frame_t *frame, ec_t *ec, inode_t *inode,
                       ec_heal_need_t *need_heal)
{
    unsigned char *locked_on = NULL;
    unsigned char *up_subvols = NULL;
    unsigned char *unlocked = NULL;
    default_args_cbk_t *replies = NULL;
    int ret = 0;

    EC_REPLIES_ALLOC(replies, ec->nodes);
    locked_on = alloca0(ec->nodes);
    unlocked = alloca0(ec->nodes);
    up_subvols = alloca0(ec->nodes);

    ec_mask_to_char_array(ec->xl_up, up_subvols, ec->nodes);

    ret = cluster_inodelk(ec->xl_list, up_subvols, ec->nodes, replies,
                          locked_on, frame, ec->xl, ec->xl->name, inode, 0, 0);
    if (ret == ec->nodes) {
        ret = ec_heal_inspect(frame, ec, inode, locked_on, _gf_true, _gf_true,
                              need_heal);
    } else {
        *need_heal = EC_HEAL_MUST;
    }

    cluster_uninodelk(ec->xl_list, locked_on, ec->nodes, replies, unlocked,
                      frame, ec->xl, ec->xl->name, inode, 0, 0);
    cluster_replies_wipe(replies, ec->nodes);

    return ret;
}
/*
 * Computes the heal status of 'entry_loc' and returns it in *dict_rsp as a
 * dictionary built by ec_set_heal_info() ("heal" or "no-heal").
 *
 * Steps:
 *  1. If not all subvolumes are up, report "heal" right away.
 *  2. Otherwise inspect the inode without taking locks. If that inspection
 *     returned ec->nodes and the verdict is not EC_HEAL_MAYBE, use it.
 *  3. Otherwise repeat the inspection under lock and use that verdict.
 *
 * Returns the result of ec_set_heal_info(), or a negative value when
 * validation, frame creation, loc copy, inode resolution or the locked
 * inspection fails (in which case *dict_rsp is not touched here).
 */
int32_t
ec_get_heal_info(xlator_t *this, loc_t *entry_loc, dict_t **dict_rsp)
{
    int ret = -ENOMEM;
    ec_heal_need_t need_heal = EC_HEAL_NONEED;
    call_frame_t *frame = NULL;
    ec_t *ec = NULL;
    unsigned char *up = NULL;
    loc_t target = {
        0,
    };

    VALIDATE_OR_GOTO(this, out);
    GF_VALIDATE_OR_GOTO(this->name, entry_loc, out);

    ec = this->private;
    up = alloca0(ec->nodes);
    ec_mask_to_char_array(ec->xl_up, up, ec->nodes);

    if (EC_COUNT(up, ec->nodes) != ec->nodes) {
        need_heal = EC_HEAL_MUST;
        goto set_heal;
    }

    frame = create_frame(this, this->ctx->pool);
    if (frame == NULL) {
        goto out;
    }
    ec_owner_set(frame, frame->root);
    frame->root->uid = 0;
    frame->root->gid = 0;
    frame->root->pid = GF_CLIENT_PID_SELF_HEALD;

    if (loc_copy(&target, entry_loc) != 0) {
        gf_msg(this->name, GF_LOG_ERROR, ENOMEM, EC_MSG_LOC_COPY_FAIL,
               "Failed to copy a location.");
        goto out;
    }

    if (target.inode == NULL) {
        ret = syncop_inode_find(this, this, target.gfid, &target.inode, NULL,
                                NULL);
        if (ret < 0) {
            goto out;
        }
    }

    ret = ec_heal_inspect(frame, ec, target.inode, up, _gf_false, _gf_false,
                          &need_heal);
    if ((ret != ec->nodes) || (need_heal == EC_HEAL_MAYBE)) {
        /* The lock-free inspection was not conclusive: start over from a
         * clean verdict and inspect again while holding the lock. */
        need_heal = EC_HEAL_NONEED;
        ret = ec_heal_locked_inspect(frame, ec, target.inode, &need_heal);
        if (ret < 0) {
            goto out;
        }
    }

set_heal:
    ret = ec_set_heal_info(dict_rsp,
                           (need_heal == EC_HEAL_MUST) ? "heal" : "no-heal");

out:
    if (frame) {
        STACK_DESTROY(frame->root);
    }
    loc_wipe(&target);

    return ret;
}