/*
Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
This file is part of GlusterFS.
This file is licensed to you under your choice of the GNU Lesser
General Public License, version 3 or any later version (LGPLv3 or
later), or the GNU General Public License, version 2 (GPLv2), in all
cases as published by the Free Software Foundation.
*/
#include "afr.h"
#include "afr-self-heal.h"
#include <glusterfs/byte-order.h>
#include "protocol-common.h"
#include "afr-messages.h"
#include <glusterfs/events.h>
/* True when the iatt pointed to by `i` occupies fewer bytes on disk
 * (ia_blocks counted as 512-byte units) than its logical size, i.e. the
 * file is sparse. The argument is parenthesised so that any pointer
 * expression (e.g. `&st` or `arr + n`) expands correctly. */
#define HAS_HOLES(i) ((((i)->ia_blocks) * 512) < ((i)->ia_size))
/*
 * Callback for the rchecksum fop wound by __afr_can_skip_data_block_heal().
 *
 * Stores the result for the child identified by `cookie` in
 * local->replies[], copies the strong checksum (SHA256- or MD5-sized,
 * depending on the "fips-mode-rchecksum" flag in xdata) and wakes the
 * barrier the caller is waiting on. Always returns 0.
 */
static int
__checksum_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int op_ret,
               int op_errno, uint32_t weak, uint8_t *strong, dict_t *xdata)
{
    afr_local_t *local = frame->local;
    int child = (long)cookie;
    struct afr_reply *reply = &local->replies[child];

    reply->valid = 1;
    reply->op_ret = op_ret;
    reply->op_errno = op_errno;

    /* The flags are only refreshed when the brick sent xdata back. */
    if (xdata) {
        reply->buf_has_zeroes = dict_get_str_boolean(xdata, "buf-has-zeroes",
                                                     _gf_false);
        reply->fips_mode_rchecksum = dict_get_str_boolean(
            xdata, "fips-mode-rchecksum", _gf_false);
    }

    if (strong) {
        /* Digest length follows the algorithm the brick reported. */
        memcpy(reply->checksum, strong,
               reply->fips_mode_rchecksum ? SHA256_DIGEST_LENGTH
                                          : MD5_DIGEST_LENGTH);
    }

    syncbarrier_wake(&local->barrier);
    return 0;
}
/*
 * Decide whether the block [offset, offset + size) needs no healing.
 *
 * An rchecksum (with "check-zero-filled" set in xdata) is wound to the
 * source and to every healed sink. The block can be skipped when the
 * source replied successfully and every other valid reply carries the same
 * checksum, unless the source file is not sparse and the source reported
 * the buffer as zero-filled (in which case the zeroes are written out).
 *
 * Returns _gf_true if the block can be skipped, _gf_false otherwise
 * (including on any setup failure).
 */
static gf_boolean_t
__afr_can_skip_data_block_heal(call_frame_t *frame, xlator_t *this, fd_t *fd,
                               int source, unsigned char *healed_sinks,
                               off_t offset, size_t size, struct iatt *poststat)
{
    afr_private_t *priv = this->private;
    afr_local_t *local = frame->local;
    struct afr_reply *replies = local->replies;
    unsigned char *targets = NULL;
    dict_t *req = NULL;
    int digest_len = 0;
    int child = 0;

    req = dict_new();
    if (!req)
        return _gf_false;

    if (dict_set_int32_sizen(req, "check-zero-filled", 1)) {
        dict_unref(req);
        return _gf_false;
    }

    /* Checksum the source plus every sink still considered healable. */
    targets = alloca0(priv->child_count);
    for (child = 0; child < priv->child_count; child++) {
        if (healed_sinks[child] || child == source)
            targets[child] = 1;
    }

    AFR_ONLIST(targets, frame, __checksum_cbk, rchecksum, fd, offset, size,
               req);
    dict_unref(req);

    /* Without a good checksum from the source nothing can be compared. */
    if (!replies[source].valid || replies[source].op_ret != 0)
        return _gf_false;

    digest_len = replies[source].fips_mode_rchecksum ? SHA256_DIGEST_LENGTH
                                                     : MD5_DIGEST_LENGTH;

    for (child = 0; child < priv->child_count; child++) {
        if (child == source || !replies[child].valid)
            continue;
        if (memcmp(replies[source].checksum, replies[child].checksum,
                   digest_len) != 0)
            return _gf_false;
    }

    /* All checksums agree. */
    if (HAS_HOLES(poststat))
        return _gf_true;

    /* For non-sparse files, we might be better off writing the zeroes to
     * the sinks to avoid a mismatch of disk usage between bricks. */
    return local->replies[source].buf_has_zeroes ? _gf_false : _gf_true;
}
/*
 * Read `size` bytes at `offset` from child `sink` and report whether the
 * data read consists only of zeroes.
 *
 * Returns _gf_true when the read succeeded and iov_0filled() returned 0
 * for the buffers, _gf_false otherwise (including a failed read).
 */
static gf_boolean_t
__afr_is_sink_zero_filled(xlator_t *this, fd_t *fd, size_t size, off_t offset,
                          int sink)
{
    afr_private_t *priv = this->private;
    struct iovec *vec = NULL;
    struct iobref *bufref = NULL;
    gf_boolean_t all_zero = _gf_false;
    int nvec = 0;
    int rc = 0;

    rc = syncop_readv(priv->children[sink], fd, size, offset, 0, &vec, &nvec,
                      &bufref, NULL, NULL, NULL);
    if (rc >= 0) {
        rc = iov_0filled(vec, nvec);
        if (rc == 0)
            all_zero = _gf_true;
    }

    /* Release whatever the read handed back, on success and failure. */
    if (vec)
        GF_FREE(vec);
    if (bufref)
        iobref_unref(bufref);

    return all_zero;
}
/*
 * Read one block from the source child and write it to every sink that is
 * still marked in healed_sinks[].
 *
 * Sinks whose write does not return the full buffer length are cleared in
 * healed_sinks[] so the caller no longer treats them as healed.
 *
 * Returns the syncop_readv() result when it is <= 0. Otherwise returns the
 * result of the last syncop_writev() attempted, or the number of bytes
 * read if every sink was skipped.
 */
static int
__afr_selfheal_data_read_write(call_frame_t *frame, xlator_t *this, fd_t *fd,
                               int source, unsigned char *healed_sinks,
                               off_t offset, size_t size,
                               struct afr_reply *replies, int type)
{
    struct iovec *iovec = NULL;
    int count = 0;
    struct iobref *iobref = NULL;
    int ret = 0;
    int i = 0;
    afr_private_t *priv = NULL;

    priv = this->private;

    ret = syncop_readv(priv->children[source], fd, size, offset, 0, &iovec,
                       &count, &iobref, NULL, NULL, NULL);
    /* Leave through the common cleanup path instead of returning directly:
     * if the read handed back an iovec/iobref (NOTE(review): believed
     * possible for a zero-byte read -- confirm against syncop_readv()),
     * a bare return would leak them. On paths where nothing was handed
     * back both pointers are still NULL and the cleanup is a no-op. */
    if (ret <= 0)
        goto out;

    for (i = 0; i < priv->child_count; i++) {
        if (!healed_sinks[i])
            continue;

        /*
         * TODO: Use fiemap() and discard() to heal holes
         * in the future.
         *
         * For now,
         *
         * - if the source had any holes at all,
         * AND
         * - if we are writing past the original file size
         *   of the sink
         * AND
         * - is NOT the last block of the source file. if
         *   the block contains EOF, it has to be written
         *   in order to set the file size even if the
         *   last block is 0-filled.
         * AND
         * - if the read buffer is filled with only 0's
         *
         * then, skip writing to this sink. We don't depend
         * on the write to happen to update the size as we
         * have performed an ftruncate() upfront anyways.
         */
#define is_last_block(o, b, s) ((s >= o) && (s <= (o + b)))
        if (HAS_HOLES((&replies[source].poststat)) &&
            offset >= replies[i].poststat.ia_size &&
            !is_last_block(offset, size, replies[source].poststat.ia_size) &&
            (iov_0filled(iovec, count) == 0))
            continue;

        /* Avoid filling up sparse regions of the sink with 0-filled
         * writes. The sink is only read back (last operand) when all the
         * cheaper checks have already passed. */
        if (type == AFR_SELFHEAL_DATA_FULL &&
            HAS_HOLES((&replies[source].poststat)) &&
            ((offset + size) <= replies[i].poststat.ia_size) &&
            (iov_0filled(iovec, count) == 0) &&
            __afr_is_sink_zero_filled(this, fd, size, offset, i)) {
            continue;
        }

        ret = syncop_writev(priv->children[i], fd, iovec, count, offset, iobref,
                            0, NULL, NULL, NULL, NULL);
        if (ret != iov_length(iovec, count)) {
            /* write() failed on this sink. unset the corresponding
               member in sinks[] (which is healed_sinks[] in the
               caller) so that this server does NOT get considered
               as successfully healed.
            */
            healed_sinks[i] = 0;
        }
    }

out:
    if (iovec)
        GF_FREE(iovec);
    if (iobref)
        iobref_unref(iobref);

    return ret;
}
/*
 * Heal a single block [offset, offset + size) under an inodelk on that
 * range in the this->name domain.
 *
 * Returns -ENOTCONN if fewer locks than the number of healed sinks were
 * obtained, 0 if a DIFF heal found the block can be skipped, otherwise
 * the result of __afr_selfheal_data_read_write(). The range lock is
 * released on every path.
 */
static int
afr_selfheal_data_block(call_frame_t *frame, xlator_t *this, fd_t *fd,
                        int source, unsigned char *healed_sinks, off_t offset,
                        size_t size, int type, struct afr_reply *replies)
{
    afr_private_t *priv = this->private;
    unsigned char *data_lock = NULL;
    int locks_needed = 0;
    int ret = -1;

    locks_needed = AFR_COUNT(healed_sinks, priv->child_count);
    data_lock = alloca0(priv->child_count);

    ret = afr_selfheal_inodelk(frame, this, fd->inode, this->name, offset, size,
                               data_lock);
    if (ret < locks_needed) {
        ret = -ENOTCONN;
    } else if (type == AFR_SELFHEAL_DATA_DIFF &&
               __afr_can_skip_data_block_heal(frame, this, fd, source,
                                              healed_sinks, offset, size,
                                              &replies[source].poststat)) {
        /* Checksums already agree; nothing to copy for this block. */
        ret = 0;
    } else {
        ret = __afr_selfheal_data_read_write(frame, this, fd, source,
                                             healed_sinks, offset, size,
                                             replies, type);
    }

    afr_selfheal_uninodelk(frame, this, fd->inode, this->name, offset, size,
                           data_lock);
    return ret;
}
/*
 * When priv->ensure_durability is set, fsync the fd on every healed sink
 * and clear healed_sinks[] for each sink whose fsync reply has a non-zero
 * op_ret. Always returns 0.
 */
static int
afr_selfheal_data_fsync(call_frame_t *frame, xlator_t *this, fd_t *fd,
                        unsigned char *healed_sinks)
{
    afr_private_t *priv = this->private;
    afr_local_t *local = frame->local;
    int child = 0;

    if (priv->ensure_durability) {
        AFR_ONLIST(healed_sinks, frame, afr_sh_generic_fop_cbk, fsync, fd, 0,
                   NULL);

        for (child = 0; child < priv->child_count; child++) {
            if (!healed_sinks[child])
                continue;
            /* fsync() failed. Do NOT consider this server as
             * successfully healed. */
            if (local->replies[child].op_ret != 0)
                healed_sinks[child] = 0;
        }
    }

    return 0;
}
/*
 * Pick the data self-heal algorithm.
 *
 * A configured algorithm other than AFR_SELFHEAL_DATA_DYNAMIC is returned
 * as is. In dynamic mode the result is AFR_SELFHEAL_DATA_DIFF as soon as
 * the source or any healed sink reports a non-zero file size, and
 * AFR_SELFHEAL_DATA_FULL otherwise.
 */
static int
afr_data_self_heal_type_get(afr_private_t *priv, unsigned char *healed_sinks,
                            int source, struct afr_reply *replies)
{
    int child = 0;

    if (priv->data_self_heal_algorithm != AFR_SELFHEAL_DATA_DYNAMIC)
        return priv->data_self_heal_algorithm;

    for (child = 0; child < priv->child_count; child++) {
        /* Only the source and the healed sinks take part in the decision. */
        if ((healed_sinks[child] || child == source) &&
            replies[child].poststat.ia_size)
            return AFR_SELFHEAL_DATA_DIFF;
    }

    return AFR_SELFHEAL_DATA_FULL;
}
/*
 * Heal the file block by block from `source` to the healed sinks, then
 * fsync the sinks.
 *
 * The arbiter entry of healed_sinks[] is cleared for the duration of the
 * copy (when the volume has an arbiter) and restored before returning if
 * it was set. Each block is healed on a copy of `frame` that is reset
 * between iterations.
 *
 * Returns a negative errno on failure (-ENOMEM if the frame copy fails,
 * -ENOTCONN if no sink is left or the frame reset loses its local, or the
 * negative result of a block heal), otherwise the result of
 * afr_selfheal_data_fsync().
 */
static int
afr_selfheal_data_do(call_frame_t *frame, xlator_t *this, fd_t *fd, int source,
                     unsigned char *healed_sinks, struct afr_reply *replies)
{
    afr_private_t *priv = this->private;
    call_frame_t *iter_frame = NULL;
    unsigned char saved_arbiter_sink = 0;
    size_t chunk = 0;
    off_t pos = 0;
    int heal_type = AFR_SELFHEAL_DATA_FULL;
    int ret = -1;

    if (priv->arbiter_count) {
        saved_arbiter_sink = healed_sinks[ARBITER_BRICK_INDEX];
        healed_sinks[ARBITER_BRICK_INDEX] = 0;
    }

    chunk = 128 * 1024 * priv->data_self_heal_window_size;
    heal_type = afr_data_self_heal_type_get(priv, healed_sinks, source,
                                            replies);

    iter_frame = afr_copy_frame(frame);
    if (!iter_frame) {
        ret = -ENOMEM;
        goto out;
    }

    pos = 0;
    while (pos < replies[source].poststat.ia_size) {
        /* Every sink has dropped out; nothing left to heal onto. */
        if (AFR_COUNT(healed_sinks, priv->child_count) == 0) {
            ret = -ENOTCONN;
            goto out;
        }

        ret = afr_selfheal_data_block(iter_frame, this, fd, source,
                                      healed_sinks, pos, chunk, heal_type,
                                      replies);
        if (ret < 0)
            goto out;

        AFR_STACK_RESET(iter_frame);
        if (iter_frame->local == NULL) {
            ret = -ENOTCONN;
            goto out;
        }

        pos += chunk;
    }

    ret = afr_selfheal_data_fsync(frame, this, fd, healed_sinks);

out:
    if (saved_arbiter_sink)
        healed_sinks[ARBITER_BRICK_INDEX] = saved_arbiter_sink;
    if (iter_frame)
        AFR_STACK_DESTROY(iter_frame);

    return ret;
}
/*
 * ftruncate the fd to `size` on every healed sink and clear healed_sinks[]
 * for each sink whose reply has op_ret == -1. Always returns 0.
 */
static int
__afr_selfheal_truncate_sinks(call_frame_t *frame, xlator_t *this, fd_t *fd,
                              unsigned char *healed_sinks, uint64_t size)
{
    afr_private_t *priv = this->private;
    afr_local_t *local = frame->local;
    int child = 0;

    /* The truncate also goes to the arbiter brick when it is marked as a
     * sink. If changelog is enabled on the volume, it records the truncate
     * as a data transaction on the arbiter brick. This helps geo-rep to
     * sync the data properly from master to slave if the arbiter is the
     * ACTIVE brick during syncing and had entries healed for data as part
     * of self heal.
     */
    AFR_ONLIST(healed_sinks, frame, afr_sh_generic_fop_cbk, ftruncate, fd, size,
               NULL);

    for (child = 0; child < priv->child_count; child++) {
        if (!healed_sinks[child])
            continue;
        /* truncate() failed. Do NOT consider this server as
         * successfully healed. */
        if (local->replies[child].op_ret == -1)
            healed_sinks[child] = 0;
    }

    return 0;
}
/*
 * Return _gf_true if at least one child is marked in sources[] and has a
 * non-zero witness count, _gf_false otherwise.
 */
gf_boolean_t
afr_has_source_witnesses(xlator_t *this, unsigned char *sources,
                         uint64_t *witness)
{
    afr_private_t *priv = this->private;
    gf_boolean_t found = _gf_false;
    int child = 0;

    for (child = 0; child < priv->child_count && !found; child++) {
        if (sources[child] && witness[child])
            found = _gf_true;
    }

    return found;
}
/*
 * Return _gf_true if the sources disagree on file size.
 *
 * Only children with a valid, successful reply that are marked in
 * sources[] are considered; an arbiter brick reporting size 0 is ignored.
 * With no eligible child the answer is _gf_false.
 */
static gf_boolean_t
afr_does_size_mismatch(xlator_t *this, unsigned char *sources,
                       struct afr_reply *replies)
{
    afr_private_t *priv = this->private;
    struct iatt *smallest = NULL;
    struct iatt *largest = NULL;
    struct iatt *cur = NULL;
    int child = 0;

    for (child = 0; child < priv->child_count; child++) {
        if (!replies[child].valid || replies[child].op_ret < 0 ||
            !sources[child])
            continue;

        cur = &replies[child].poststat;
        if (AFR_IS_ARBITER_BRICK(priv, child) && (cur->ia_size == 0))
            continue;

        if (!smallest || smallest->ia_size > cur->ia_size)
            smallest = cur;
        if (!largest || largest->ia_size < cur->ia_size)
            largest = cur;
    }

    if (!smallest || !largest)
        return _gf_false;

    return (smallest->ia_size != largest->ia_size) ? _gf_true : _gf_false;
}
/*
 * Keep only the sources with the highest witness count: every child marked
 * in sources[] whose witness[] value is below the maximum among sources is
 * unmarked.
 */
static void
afr_mark_biggest_witness_as_source(xlator_t *this, unsigned char *sources,
                                   uint64_t *witness)
{
    afr_private_t *priv = this->private;
    uint64_t top = 0;
    int child = 0;

    /* Pass 1: highest witness count among the current sources. */
    for (child = 0; child < priv->child_count; child++) {
        if (sources[child] && witness[child] > top)
            top = witness[child];
    }

    /* Pass 2: drop every source that falls short of it. */
    for (child = 0; child < priv->child_count; child++) {
        if (sources[child] && witness[child] < top)
            sources[child] = 0;
    }
}
/*
 * Tie breaker: reduce sources[] to exactly one entry, the child whose
 * reply carries the latest poststat.ia_ctime. Among children with equal
 * ctime the highest index wins.
 *
 * If no entry of sources[] is set on entry, the array is left untouched.
 */
static void
afr_mark_newest_file_as_source(xlator_t *this, unsigned char *sources,
                               struct afr_reply *replies)
{
    afr_private_t *priv = this->private;
    int newest = -1;
    int i = 0;

    /* Find the source with the latest ctime. The comparison is made against
     * the current winner's own ia_ctime rather than a separately typed
     * local copy, so the timestamp cannot be truncated or change sign on
     * assignment. "<=" keeps the later index on a tie. */
    for (i = 0; i < priv->child_count; i++) {
        if (!sources[i])
            continue;
        if (newest < 0 ||
            replies[newest].poststat.ia_ctime <= replies[i].poststat.ia_ctime)
            newest = i;
    }

    /* No source was marked at all: writing sources[newest] below would
     * index the array at -1, so there is nothing to do. */
    if (newest < 0)
        return;

    /* Only mark one of the files as source to break ties. */
    memset(sources, 0, sizeof(*sources) * priv->child_count);
    sources[newest] = 1;
}
/*
 * Settle on the final source for a data self-heal.
 *
 * When every locked child is a healed sink, or no child is marked as a
 * source, the file is treated as being in split brain and
 * afr_mark_split_brain_source_sinks() is asked to resolve it; failure
 * raises an EVENT_AFR_SPLIT_BRAIN event and returns -EIO. Otherwise, if
 * the sources disagree on size or have witnesses, sources[] is narrowed by
 * largest file, biggest witness count and newest ctime (and, with an
 * arbiter, by the empty-file rule).
 *
 * Returns -EIO on unresolved split brain or when a split-brain heal
 * operation was requested but no split brain exists; otherwise the value
 * of afr_choose_source_by_policy().
 */
static int
__afr_selfheal_data_finalize_source(
    call_frame_t *frame, xlator_t *this, inode_t *inode, unsigned char *sources,
    unsigned char *sinks, unsigned char *healed_sinks, unsigned char *locked_on,
    unsigned char *undid_pending, struct afr_reply *replies, uint64_t *witness)
{
    afr_private_t *priv = this->private;
    int nsources = 0;
    int source = -1;

    nsources = AFR_COUNT(sources, priv->child_count);

    if ((AFR_CMP(locked_on, healed_sinks, priv->child_count) == 0) ||
        !nsources) {
        /* split brain */
        source = afr_mark_split_brain_source_sinks(
            frame, this, inode, sources, sinks, healed_sinks, locked_on,
            replies, AFR_DATA_TRANSACTION);
        if (source < 0) {
            gf_event(EVENT_AFR_SPLIT_BRAIN,
                     "subvol=%s;type=data;"
                     "file=%s",
                     this->name, uuid_utoa(inode->gfid));
            return -EIO;
        }

        _afr_fav_child_reset_sink_xattrs(
            frame, this, inode, source, healed_sinks, undid_pending,
            AFR_DATA_TRANSACTION, locked_on, replies);
    } else {
        /* No split brain at this point. If we were called from
         * afr_heal_splitbrain_file(), abort. */
        if (afr_dict_contains_heal_op(frame))
            return -EIO;

        /* Narrow the sources only when they disagree on size or have
         * witnesses; otherwise they are already final. */
        if (afr_does_size_mismatch(this, sources, replies) ||
            afr_has_source_witnesses(this, sources, witness)) {
            afr_mark_largest_file_as_source(this, sources, replies);
            afr_mark_biggest_witness_as_source(this, sources, witness);
            afr_mark_newest_file_as_source(this, sources, replies);
            if (priv->arbiter_count) {
                /* Choose non-arbiter brick as source for empty files. */
                afr_mark_source_sinks_if_file_empty(
                    this, sources, sinks, healed_sinks, locked_on, replies,
                    AFR_DATA_TRANSACTION);
            }
        }
    }

    afr_mark_active_sinks(this, sources, locked_on, healed_sinks);
    source = afr_choose_source_by_policy(priv, sources, AFR_DATA_TRANSACTION);

    return source;
}
/*
 * __afr_selfheal_data_prepare:
 *
 * Runs an unlocked discover on the inode, works out the heal direction for
 * the data transaction (filling sources[], sinks[] and the witness counts)
 * and finalizes the source.
 *
 * healed_sinks[] is initialized optimistically to the intersection of
 * sinks[] and locked_on[]; later stages unmark entries as they fail.
 *
 * Returns the index of the subvolume to use as the heal source. On failure
 * returns the non-zero result of the discover or find-direction step, or
 * -EIO when no source could be finalized.
 */
int
__afr_selfheal_data_prepare(call_frame_t *frame, xlator_t *this, inode_t *inode,
                            unsigned char *locked_on, unsigned char *sources,
                            unsigned char *sinks, unsigned char *healed_sinks,
                            unsigned char *undid_pending,
                            struct afr_reply *replies, unsigned char *pflag)
{
    afr_private_t *priv = this->private;
    uint64_t *witness = NULL;
    int rc = -1;

    rc = afr_selfheal_unlocked_discover(frame, inode, inode->gfid, replies);
    if (rc != 0)
        return rc;

    witness = alloca0(priv->child_count * sizeof(*witness));

    rc = afr_selfheal_find_direction(frame, this, replies,
                                     AFR_DATA_TRANSACTION, locked_on, sources,
                                     sinks, witness, pflag);
    if (rc != 0)
        return rc;

    /* Start from "every sink that is up" and prune on failure later. */
    AFR_INTERSECT(healed_sinks, sinks, locked_on, priv->child_count);

    rc = __afr_selfheal_data_finalize_source(
        frame, this, inode, sources, sinks, healed_sinks, locked_on,
        undid_pending, replies, witness);

    return (rc < 0) ? -EIO : rc;
}
/*
 * Perform a complete data self-heal of the file behind `fd`.
 *
 * Sequence, as implemented below:
 *   1. Take an inodelk in the this->name domain (offset 0, size 0) on all
 *      children, pick source/sinks and truncate the sinks to the source
 *      size.
 *   2. Drop the lock and heal the file block by block
 *      (afr_selfheal_data_do()).
 *   3. Restore timestamps, re-take the lock, undo the pending markers and
 *      unlock.
 * Two shortcuts jump straight from step 1 to "restore_time" while the
 * lock from step 1 is still held: the source is the arbiter and the file
 * is empty on all children, or the arbiter is the only healed sink.
 *
 * Returns 1 when no heal was attempted or completed (did_sh is false),
 * otherwise the result of the last step executed (negative errno on
 * failure).
 *
 * Note: the `locked_on` parameter is not referenced in this function.
 */
static int
__afr_selfheal_data(call_frame_t *frame, xlator_t *this, fd_t *fd,
unsigned char *locked_on)
{
afr_private_t *priv = NULL;
int ret = -1;
unsigned char *sources = NULL;
unsigned char *sinks = NULL;
unsigned char *data_lock = NULL;
unsigned char *healed_sinks = NULL;
unsigned char *undid_pending = NULL;
struct afr_reply *locked_replies = NULL;
int source = -1;
/* did_sh decides at "out" whether the result is logged or replaced by 1. */
gf_boolean_t did_sh = _gf_true;
gf_boolean_t is_arbiter_the_only_sink = _gf_false;
gf_boolean_t empty_file = _gf_false;
priv = this->private;
/* Per-child scratch arrays; stack allocated, so nothing to free. */
sources = alloca0(priv->child_count);
sinks = alloca0(priv->child_count);
healed_sinks = alloca0(priv->child_count);
data_lock = alloca0(priv->child_count);
undid_pending = alloca0(priv->child_count);
locked_replies = alloca0(sizeof(*locked_replies) * priv->child_count);
ret = afr_selfheal_inodelk(frame, this, fd->inode, this->name, 0, 0,
data_lock);
{
/* The lock must have been obtained on every child to proceed. */
if (ret < priv->child_count) {
gf_msg_debug(this->name, 0,
"%s: Skipping "
"self-heal as only %d number "
"of subvolumes "
"could be locked",
uuid_utoa(fd->inode->gfid), ret);
ret = -ENOTCONN;
goto unlock;
}
ret = __afr_selfheal_data_prepare(frame, this, fd->inode, data_lock,
sources, sinks, healed_sinks,
undid_pending, locked_replies, NULL);
if (ret < 0)
goto unlock;
/* No sink to heal: not an error, reported as 1 at "out". */
if (AFR_COUNT(healed_sinks, priv->child_count) == 0) {
did_sh = _gf_false;
goto unlock;
}
source = ret;
if (AFR_IS_ARBITER_BRICK(priv, source)) {
empty_file = afr_is_file_empty_on_all_children(priv,
locked_replies);
/* Empty everywhere: skip the data copy and go straight to
 * restore_time. The lock taken above is still held on this path. */
if (empty_file)
goto restore_time;
/* Arbiter chosen as source for a non-empty file: no data heal is
 * attempted from it. */
did_sh = _gf_false;
goto unlock;
}
/* Bring every sink to the source's size before copying data. */
ret = __afr_selfheal_truncate_sinks(
frame, this, fd, healed_sinks,
locked_replies[source].poststat.ia_size);
if (ret < 0)
goto unlock;
/* Only the arbiter needs healing: skip the data copy and go to
 * restore_time with the lock still held. */
if (priv->arbiter_count &&
AFR_COUNT(healed_sinks, priv->child_count) == 1 &&
healed_sinks[ARBITER_BRICK_INDEX]) {
is_arbiter_the_only_sink = _gf_true;
goto restore_time;
}
ret = 0;
}
unlock:
afr_selfheal_uninodelk(frame, this, fd->inode, this->name, 0, 0, data_lock);
if (ret < 0)
goto out;
if (!did_sh)
goto out;
/* Block-by-block copy; it takes its own per-block range locks. */
ret = afr_selfheal_data_do(frame, this, fd, source, healed_sinks,
locked_replies);
if (ret)
goto out;
restore_time:
afr_selfheal_restore_time(frame, this, fd->inode, source, healed_sinks,
locked_replies);
/* Re-take the lock only on the normal path; the two shortcut paths
 * arrive here with the initial lock still held. */
if (!is_arbiter_the_only_sink && !empty_file) {
ret = afr_selfheal_inodelk(frame, this, fd->inode, this->name, 0, 0,
data_lock);
if (ret < priv->child_count) {
ret = -ENOTCONN;
did_sh = _gf_false;
goto skip_undo_pending;
}
}
ret = afr_selfheal_undo_pending(
frame, this, fd->inode, sources, sinks, healed_sinks, undid_pending,
AFR_DATA_TRANSACTION, locked_replies, data_lock);
skip_undo_pending:
afr_selfheal_uninodelk(frame, this, fd->inode, this->name, 0, 0, data_lock);
out:
/* Log only when a heal was actually carried through; otherwise the
 * result is reported as 1. */
if (did_sh)
afr_log_selfheal(fd->inode->gfid, this, ret, "data", source, sources,
healed_sinks);
else
ret = 1;
if (locked_replies)
afr_replies_wipe(locked_replies, priv->child_count);
return ret;
}
/*
 * Callback for the open fop wound by afr_selfheal_data_open(): records the
 * result for the child identified by `cookie` in local->replies[] and
 * wakes the barrier the caller is waiting on. Always returns 0.
 */
int
afr_selfheal_data_open_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                           int32_t op_ret, int32_t op_errno, fd_t *fd,
                           dict_t *xdata)
{
    afr_local_t *local = frame->local;
    int child = (long)cookie;

    local->replies[child].valid = 1;
    local->replies[child].op_ret = op_ret;
    local->replies[child].op_errno = op_errno;

    syncbarrier_wake(&local->barrier);
    return 0;
}
/*
 * Create an fd for `inode` and open it (O_RDWR | O_LARGEFILE) on every
 * child listed in local->child_up.
 *
 * On success (the first valid reply with op_ret >= 0 ends the scan) the fd
 * is bound, stored in *fd and 0 is returned. Otherwise the fd is released
 * and a negative errno is returned: -ENOMEM if the fd could not be
 * created, the negated error from afr_frame_create(), -ENOTCONN if no
 * child replied, or the negated op_errno of the last failed reply seen.
 */
int
afr_selfheal_data_open(xlator_t *this, inode_t *inode, fd_t **fd)
{
    afr_private_t *priv = this->private;
    afr_local_t *local = NULL;
    call_frame_t *frame = NULL;
    fd_t *new_fd = NULL;
    loc_t loc = {
        0,
    };
    int child = 0;
    int ret = 0;

    new_fd = fd_create(inode, 0);
    if (!new_fd)
        return -ENOMEM;

    loc.inode = inode_ref(inode);
    gf_uuid_copy(loc.gfid, inode->gfid);

    frame = afr_frame_create(this, &ret);
    if (!frame) {
        ret = -ret;
        goto drop_fd;
    }
    local = frame->local;

    AFR_ONLIST(local->child_up, frame, afr_selfheal_data_open_cbk, open, &loc,
               O_RDWR | O_LARGEFILE, new_fd, NULL);

    /* Default when not a single child sent a valid reply. */
    ret = -ENOTCONN;
    for (child = 0; child < priv->child_count; child++) {
        if (!local->replies[child].valid)
            continue;
        if (local->replies[child].op_ret >= 0) {
            ret = 0;
            break;
        }
        ret = -local->replies[child].op_errno;
    }
    if (ret < 0)
        goto drop_fd;

    fd_bind(new_fd);
    *fd = new_fd;
    goto out;

drop_fd:
    fd_unref(new_fd);
out:
    loc_wipe(&loc);
    if (frame)
        AFR_STACK_DESTROY(frame);
    return ret;
}
/*
 * Entry point for data self-heal of the file behind `fd`.
 *
 * Takes the tie-breaker inodelk in priv->sh_domain on all children; the
 * heal (__afr_selfheal_data()) runs only if every child could be locked,
 * otherwise -ENOTCONN is returned. The lock is released on both paths.
 */
int
afr_selfheal_data(call_frame_t *frame, xlator_t *this, fd_t *fd)
{
    afr_private_t *priv = this->private;
    inode_t *inode = fd->inode;
    unsigned char *locked_on = NULL;
    int ret = 0;

    locked_on = alloca0(priv->child_count);

    ret = afr_selfheal_tie_breaker_inodelk(frame, this, inode, priv->sh_domain,
                                           0, 0, locked_on);
    if (ret >= priv->child_count) {
        ret = __afr_selfheal_data(frame, this, fd, locked_on);
    } else {
        gf_msg_debug(this->name, 0,
                     "%s: Skipping "
                     "self-heal as only %d number of "
                     "subvolumes could be locked",
                     uuid_utoa(fd->inode->gfid), ret);
        /* Either less than two subvols are available, or another
         * self-heal (from another server) is in progress. Skip for now;
         * in any case there isn't anything to do.
         */
        ret = -ENOTCONN;
    }

    afr_selfheal_uninodelk(frame, this, inode, priv->sh_domain, 0, 0,
                           locked_on);
    return ret;
}