Blob Blame History Raw
/*
  Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
  This file is part of GlusterFS.

  This file is licensed to you under your choice of the GNU Lesser
  General Public License, version 3 or any later version (LGPLv3 or
  later), or the GNU General Public License, version 2 (GPLv2), in all
  cases as published by the Free Software Foundation.
*/

#include <glusterfs/xlator.h>
#include <glusterfs/defaults.h>
#include <glusterfs/compat-errno.h>
#include "ec.h"
#include "ec-messages.h"
#include "ec-heald.h"
#include "ec-mem-types.h"
#include <glusterfs/syncop.h>
#include <glusterfs/syncop-utils.h>
#include "protocol-common.h"

#define NTH_INDEX_HEALER(this, n)                                              \
    (&((((ec_t *)this->private))->shd.index_healers[n]))
#define NTH_FULL_HEALER(this, n)                                               \
    (&((((ec_t *)this->private))->shd.full_healers[n]))

gf_boolean_t
ec_shd_is_subvol_local(xlator_t *this, int subvol)
{
    ec_t *ec = NULL;
    gf_boolean_t is_local = _gf_false;
    loc_t loc = {
        0,
    };

    ec = this->private;
    loc.inode = this->itable->root;
    syncop_is_subvol_local(ec->xl_list[subvol], &loc, &is_local);
    return is_local;
}

char *
ec_subvol_name(xlator_t *this, int subvol)
{
    ec_t *ec = NULL;

    ec = this->private;
    if (subvol < 0 || subvol > ec->nodes)
        return NULL;

    return ec->xl_list[subvol]->name;
}

int
__ec_shd_healer_wait(struct subvol_healer *healer)
{
    ec_t *ec = NULL;
    struct timespec wait_till = {
        0,
    };
    int ret = 0;

    ec = healer->this->private;

disabled_loop:
    wait_till.tv_sec = time(NULL) + 60;

    while (!healer->rerun) {
        ret = pthread_cond_timedwait(&healer->cond, &healer->mutex, &wait_till);
        if (ret == ETIMEDOUT)
            break;
    }

    ret = healer->rerun;
    healer->rerun = 0;

    if (!ec->shd.enabled || !ec->up)
        goto disabled_loop;

    return ret;
}

int
ec_shd_healer_wait(struct subvol_healer *healer)
{
    int ret = 0;

    pthread_mutex_lock(&healer->mutex);
    {
        ret = __ec_shd_healer_wait(healer);
    }
    pthread_mutex_unlock(&healer->mutex);

    return ret;
}

int
ec_shd_index_inode(xlator_t *this, xlator_t *subvol, inode_t **inode)
{
    loc_t rootloc = {
        0,
    };
    int ret = 0;
    dict_t *xattr = NULL;
    void *index_gfid = NULL;

    *inode = NULL;
    rootloc.inode = inode_ref(this->itable->root);
    gf_uuid_copy(rootloc.gfid, rootloc.inode->gfid);

    ret = syncop_getxattr(subvol, &rootloc, &xattr, GF_XATTROP_INDEX_GFID, NULL,
                          NULL);
    if (ret < 0)
        goto out;
    if (!xattr) {
        ret = -EINVAL;
        goto out;
    }

    ret = dict_get_ptr(xattr, GF_XATTROP_INDEX_GFID, &index_gfid);
    if (ret)
        goto out;

    gf_msg_debug(this->name, 0, "index-dir gfid for %s: %s", subvol->name,
                 uuid_utoa(index_gfid));

    ret = syncop_inode_find(this, subvol, index_gfid, inode, NULL, NULL);

out:
    loc_wipe(&rootloc);

    if (xattr)
        dict_unref(xattr);

    return ret;
}

int
ec_shd_index_purge(xlator_t *subvol, inode_t *inode, char *name)
{
    loc_t loc = {
        0,
    };
    int ret = 0;

    loc.parent = inode_ref(inode);
    loc.name = name;

    ret = syncop_unlink(subvol, &loc, NULL, NULL);

    loc_wipe(&loc);
    return ret;
}

int
ec_shd_selfheal(struct subvol_healer *healer, int child, loc_t *loc,
                gf_boolean_t full)
{
    dict_t *xdata = NULL;
    uint32_t count;
    int32_t ret;

    ret = syncop_getxattr(healer->this, loc, NULL, EC_XATTR_HEAL, NULL, &xdata);
    if (!full && (loc->inode->ia_type == IA_IFDIR)) {
        /* If we have just healed a directory, it's possible that
         * other index entries have appeared to be healed. */
        if ((xdata != NULL) &&
            (dict_get_uint32(xdata, EC_XATTR_HEAL_NEW, &count) == 0) &&
            (count > 0)) {
            /* Force a rerun of the index healer. */
            gf_msg_debug(healer->this->name, 0, "%d more entries to heal",
                         count);

            healer->rerun = _gf_true;
        }
    }

    if (xdata != NULL) {
        dict_unref(xdata);
    }

    return ret;
}

int
ec_shd_index_heal(xlator_t *subvol, gf_dirent_t *entry, loc_t *parent,
                  void *data)
{
    struct subvol_healer *healer = data;
    ec_t *ec = NULL;
    loc_t loc = {0};
    int ret = 0;

    ec = healer->this->private;
    if (ec->xl_up_count <= ec->fragments) {
        return -ENOTCONN;
    }
    if (!ec->shd.enabled)
        return -EBUSY;

    gf_msg_debug(healer->this->name, 0, "got entry: %s", entry->d_name);

    ret = gf_uuid_parse(entry->d_name, loc.gfid);
    if (ret)
        return 0;

    /* If this fails with ENOENT/ESTALE index is stale */
    ret = syncop_gfid_to_path(healer->this->itable, subvol, loc.gfid,
                              (char **)&loc.path);
    if (ret < 0)
        goto out;

    ret = syncop_inode_find(healer->this, healer->this, loc.gfid, &loc.inode,
                            NULL, NULL);
    if (ret < 0)
        goto out;

    ec_shd_selfheal(healer, healer->subvol, &loc, _gf_false);
out:
    if (ret == -ENOENT || ret == -ESTALE) {
        gf_msg(healer->this->name, GF_LOG_DEBUG, 0, EC_MSG_HEAL_FAIL,
               "Purging index for gfid %s:", uuid_utoa(loc.gfid));
        ec_shd_index_purge(subvol, parent->inode, entry->d_name);
    }
    loc_wipe(&loc);

    return 0;
}

int
ec_shd_index_sweep(struct subvol_healer *healer)
{
    loc_t loc = {0};
    ec_t *ec = NULL;
    int ret = 0;
    xlator_t *subvol = NULL;
    dict_t *xdata = NULL;

    ec = healer->this->private;
    subvol = ec->xl_list[healer->subvol];

    ret = ec_shd_index_inode(healer->this, subvol, &loc.inode);
    if (ret < 0) {
        gf_msg(healer->this->name, GF_LOG_WARNING, errno,
               EC_MSG_INDEX_DIR_GET_FAIL, "unable to get index-dir on %s",
               subvol->name);
        goto out;
    }

    xdata = dict_new();
    if (!xdata || dict_set_int32(xdata, "get-gfid-type", 1)) {
        ret = -ENOMEM;
        goto out;
    }

    ret = syncop_mt_dir_scan(NULL, subvol, &loc, GF_CLIENT_PID_SELF_HEALD,
                             healer, ec_shd_index_heal, xdata,
                             ec->shd.max_threads, ec->shd.wait_qlength);
out:
    if (xdata)
        dict_unref(xdata);
    loc_wipe(&loc);

    return ret;
}

int
ec_shd_full_heal(xlator_t *subvol, gf_dirent_t *entry, loc_t *parent,
                 void *data)
{
    struct subvol_healer *healer = data;
    xlator_t *this = healer->this;
    ec_t *ec = NULL;
    loc_t loc = {0};
    int ret = 0;

    ec = this->private;
    if (ec->xl_up_count <= ec->fragments) {
        return -ENOTCONN;
    }
    if (!ec->shd.enabled)
        return -EBUSY;

    if (gf_uuid_is_null(entry->d_stat.ia_gfid)) {
        /* It's possible that an entry has been removed just after
         * being seen in a directory but before getting its stat info.
         * In this case we'll receive a NULL gfid here. Since the file
         * doesn't exist anymore, we can safely ignore it. */
        return 0;
    }

    loc.parent = inode_ref(parent->inode);
    loc.name = entry->d_name;
    gf_uuid_copy(loc.gfid, entry->d_stat.ia_gfid);

    /* If this fails with ENOENT/ESTALE index is stale */
    ret = syncop_gfid_to_path(this->itable, subvol, loc.gfid,
                              (char **)&loc.path);
    if (ret < 0)
        goto out;

    ret = syncop_inode_find(this, this, loc.gfid, &loc.inode, NULL, NULL);
    if (ret < 0)
        goto out;

    ec_shd_selfheal(healer, healer->subvol, &loc, _gf_true);

    ret = 0;

out:
    loc_wipe(&loc);
    return ret;
}

int
ec_shd_full_sweep(struct subvol_healer *healer, inode_t *inode)
{
    ec_t *ec = NULL;
    loc_t loc = {0};

    ec = healer->this->private;
    loc.inode = inode;
    return syncop_ftw(ec->xl_list[healer->subvol], &loc,
                      GF_CLIENT_PID_SELF_HEALD, healer, ec_shd_full_heal);
}

void *
ec_shd_index_healer(void *data)
{
    struct subvol_healer *healer = NULL;
    xlator_t *this = NULL;

    healer = data;
    THIS = this = healer->this;
    ec_t *ec = this->private;

    for (;;) {
        ec_shd_healer_wait(healer);

        if (ec->xl_up_count > ec->fragments) {
            gf_msg_debug(this->name, 0, "starting index sweep on subvol %s",
                         ec_subvol_name(this, healer->subvol));
            ec_shd_index_sweep(healer);
        }
        gf_msg_debug(this->name, 0, "finished index sweep on subvol %s",
                     ec_subvol_name(this, healer->subvol));
    }

    return NULL;
}

void *
ec_shd_full_healer(void *data)
{
    struct subvol_healer *healer = NULL;
    xlator_t *this = NULL;
    loc_t rootloc = {0};

    int run = 0;

    healer = data;
    THIS = this = healer->this;
    ec_t *ec = this->private;

    rootloc.inode = this->itable->root;
    for (;;) {
        pthread_mutex_lock(&healer->mutex);
        {
            run = __ec_shd_healer_wait(healer);
            if (!run)
                healer->running = _gf_false;
        }
        pthread_mutex_unlock(&healer->mutex);

        if (!run)
            break;

        if (ec->xl_up_count > ec->fragments) {
            gf_msg(this->name, GF_LOG_INFO, 0, EC_MSG_FULL_SWEEP_START,
                   "starting full sweep on subvol %s",
                   ec_subvol_name(this, healer->subvol));

            ec_shd_selfheal(healer, healer->subvol, &rootloc, _gf_true);
            ec_shd_full_sweep(healer, this->itable->root);
        }

        gf_msg(this->name, GF_LOG_INFO, 0, EC_MSG_FULL_SWEEP_STOP,
               "finished full sweep on subvol %s",
               ec_subvol_name(this, healer->subvol));
    }

    return NULL;
}

int
ec_shd_healer_init(xlator_t *this, struct subvol_healer *healer)
{
    int ret = 0;

    ret = pthread_mutex_init(&healer->mutex, NULL);
    if (ret)
        goto out;

    ret = pthread_cond_init(&healer->cond, NULL);
    if (ret)
        goto out;

    healer->this = this;
    healer->running = _gf_false;
    healer->rerun = _gf_false;
out:
    return ret;
}

int
ec_shd_healer_spawn(xlator_t *this, struct subvol_healer *healer,
                    void *(threadfn)(void *))
{
    int ret = 0;

    pthread_mutex_lock(&healer->mutex);
    {
        if (healer->running) {
            pthread_cond_signal(&healer->cond);
        } else {
            ret = gf_thread_create(&healer->thread, NULL, threadfn, healer,
                                   "ecshd");
            if (ret)
                goto unlock;
            healer->running = 1;
        }

        healer->rerun = 1;
    }
unlock:
    pthread_mutex_unlock(&healer->mutex);

    return ret;
}

int
ec_shd_full_healer_spawn(xlator_t *this, int subvol)
{
    return ec_shd_healer_spawn(this, NTH_FULL_HEALER(this, subvol),
                               ec_shd_full_healer);
}

int
ec_shd_index_healer_spawn(xlator_t *this, int subvol)
{
    return ec_shd_healer_spawn(this, NTH_INDEX_HEALER(this, subvol),
                               ec_shd_index_healer);
}

void
ec_shd_index_healer_wake(ec_t *ec)
{
    int32_t i;

    for (i = 0; i < ec->nodes; i++) {
        if (((ec->xl_up >> i) & 1) != 0) {
            ec_shd_index_healer_spawn(ec->xl, i);
        }
    }
}

int
ec_selfheal_daemon_init(xlator_t *this)
{
    ec_t *ec = NULL;
    ec_self_heald_t *shd = NULL;
    int ret = -1;
    int i = 0;

    ec = this->private;
    shd = &ec->shd;

    shd->index_healers = GF_CALLOC(sizeof(*shd->index_healers), ec->nodes,
                                   ec_mt_subvol_healer_t);
    if (!shd->index_healers)
        goto out;

    for (i = 0; i < ec->nodes; i++) {
        shd->index_healers[i].subvol = i;
        ret = ec_shd_healer_init(this, &shd->index_healers[i]);
        if (ret)
            goto out;
    }

    shd->full_healers = GF_CALLOC(sizeof(*shd->full_healers), ec->nodes,
                                  ec_mt_subvol_healer_t);
    if (!shd->full_healers)
        goto out;

    for (i = 0; i < ec->nodes; i++) {
        shd->full_healers[i].subvol = i;
        ret = ec_shd_healer_init(this, &shd->full_healers[i]);
        if (ret)
            goto out;
    }

    ret = 0;
out:
    return ret;
}

int
ec_heal_op(xlator_t *this, dict_t *output, gf_xl_afr_op_t op, int xl_id)
{
    char key[64] = {0};
    int op_ret = 0;
    ec_t *ec = NULL;
    int i = 0;
    GF_UNUSED int ret = 0;

    ec = this->private;

    op_ret = -1;
    for (i = 0; i < ec->nodes; i++) {
        snprintf(key, sizeof(key), "%d-%d-status", xl_id, i);

        if (((ec->xl_up >> i) & 1) == 0) {
            ret = dict_set_str(output, key, "Brick is not connected");
        } else if (!ec->up) {
            ret = dict_set_str(output, key, "Disperse subvolume is not up");
        } else if (!ec_shd_is_subvol_local(this, i)) {
            ret = dict_set_str(output, key, "Brick is remote");
        } else {
            ret = dict_set_str(output, key, "Started self-heal");
            if (op == GF_SHD_OP_HEAL_FULL) {
                ec_shd_full_healer_spawn(this, i);
            } else if (op == GF_SHD_OP_HEAL_INDEX) {
                ec_shd_index_healer_spawn(this, i);
            }
            op_ret = 0;
        }
    }
    return op_ret;
}

int
ec_xl_op(xlator_t *this, dict_t *input, dict_t *output)
{
    gf_xl_afr_op_t op = GF_SHD_OP_INVALID;
    int ret = 0;
    int xl_id = 0;

    ret = dict_get_int32(input, "xl-op", (int32_t *)&op);
    if (ret)
        goto out;

    ret = dict_get_int32(input, this->name, &xl_id);
    if (ret)
        goto out;

    ret = dict_set_int32(output, this->name, xl_id);
    if (ret)
        goto out;

    switch (op) {
        case GF_SHD_OP_HEAL_FULL:
            ret = ec_heal_op(this, output, op, xl_id);
            break;

        case GF_SHD_OP_HEAL_INDEX:
            ret = ec_heal_op(this, output, op, xl_id);
            break;

        default:
            ret = -1;
            break;
    }
out:
    dict_del(output, this->name);
    return ret;
}