Blob Blame History Raw
/*
   Copyright (c) 2012-2012 Red Hat, Inc. <http://www.redhat.com>
   This file is part of GlusterFS.

   This file is licensed to you under your choice of the GNU Lesser
   General Public License, version 3 or any later version (LGPLv3 or
   later), or the GNU General Public License, version 2 (GPLv2), in all
   cases as published by the Free Software Foundation.
*/
/* rpc related syncops */
#include "rpc-clnt.h"
#include "protocol-common.h"
#include "xdr-generic.h"
#include "glusterd1-xdr.h"
#include "glusterd-syncop.h"
#include "glusterd-mgmt.h"

#include "glusterd.h"
#include "glusterd-op-sm.h"
#include "glusterd-utils.h"
#include "glusterd-server-quorum.h"
#include "glusterd-locks.h"
#include "glusterd-snapshot-utils.h"
#include "glusterd-messages.h"
#include "glusterd-errno.h"

extern glusterd_op_info_t opinfo;

void
gd_synctask_barrier_wait(struct syncargs *args, int count)
{
    glusterd_conf_t *conf = THIS->private;

    synclock_unlock(&conf->big_lock);
    synctask_barrier_wait(args, count);
    synclock_lock(&conf->big_lock);

    syncbarrier_destroy(&args->barrier);
}

static void
gd_collate_errors(struct syncargs *args, int op_ret, int op_errno,
                  char *op_errstr, int op_code, uuid_t peerid, u_char *uuid)
{
    char err_str[PATH_MAX] = "Please check log file for details.";
    char op_err[PATH_MAX] = "";
    int len = -1;
    char *peer_str = NULL;
    glusterd_peerinfo_t *peerinfo = NULL;

    if (op_ret) {
        args->op_ret = op_ret;
        args->op_errno = op_errno;

        RCU_READ_LOCK;
        peerinfo = glusterd_peerinfo_find(peerid, NULL);
        if (peerinfo)
            peer_str = gf_strdup(peerinfo->hostname);
        else
            peer_str = gf_strdup(uuid_utoa(uuid));
        RCU_READ_UNLOCK;

        if (op_errstr && strcmp(op_errstr, "")) {
            len = snprintf(err_str, sizeof(err_str) - 1, "Error: %s",
                           op_errstr);
            err_str[len] = '\0';
        }

        switch (op_code) {
            case GLUSTERD_MGMT_CLUSTER_LOCK: {
                len = snprintf(op_err, sizeof(op_err) - 1,
                               "Locking failed on %s. %s", peer_str, err_str);
                break;
            }
            case GLUSTERD_MGMT_CLUSTER_UNLOCK: {
                len = snprintf(op_err, sizeof(op_err) - 1,
                               "Unlocking failed on %s. %s", peer_str, err_str);
                break;
            }
            case GLUSTERD_MGMT_STAGE_OP: {
                len = snprintf(op_err, sizeof(op_err) - 1,
                               "Staging failed on %s. %s", peer_str, err_str);
                break;
            }
            case GLUSTERD_MGMT_COMMIT_OP: {
                len = snprintf(op_err, sizeof(op_err) - 1,
                               "Commit failed on %s. %s", peer_str, err_str);
                break;
            }
        }

        if (len > 0)
            op_err[len] = '\0';

        if (args->errstr) {
            len = snprintf(err_str, sizeof(err_str) - 1, "%s\n%s", args->errstr,
                           op_err);
            GF_FREE(args->errstr);
            args->errstr = NULL;
        } else
            len = snprintf(err_str, sizeof(err_str) - 1, "%s", op_err);
        err_str[len] = '\0';

        gf_msg("glusterd", GF_LOG_ERROR, 0, GD_MSG_MGMT_OP_FAIL, "%s", op_err);
        args->errstr = gf_strdup(err_str);
    }

    GF_FREE(peer_str);

    return;
}

void
gd_syncargs_init(struct syncargs *args, dict_t *op_ctx)
{
    args->dict = op_ctx;
    pthread_mutex_init(&args->lock_dict, NULL);
}

static void
gd_stage_op_req_free(gd1_mgmt_stage_op_req *req)
{
    if (!req)
        return;

    GF_FREE(req->buf.buf_val);
    GF_FREE(req);
}

static void
gd_commit_op_req_free(gd1_mgmt_commit_op_req *req)
{
    if (!req)
        return;

    GF_FREE(req->buf.buf_val);
    GF_FREE(req);
}

static void
gd_brick_op_req_free(gd1_mgmt_brick_op_req *req)
{
    if (!req)
        return;

    GF_FREE(req->input.input_val);
    GF_FREE(req);
}

int
gd_syncop_submit_request(struct rpc_clnt *rpc, void *req, void *local,
                         void *cookie, rpc_clnt_prog_t *prog, int procnum,
                         fop_cbk_fn_t cbkfn, xdrproc_t xdrproc)
{
    int ret = -1;
    struct iobuf *iobuf = NULL;
    struct iobref *iobref = NULL;
    int count = 0;
    struct iovec iov = {
        0,
    };
    ssize_t req_size = 0;
    call_frame_t *frame = NULL;

    GF_ASSERT(rpc);
    if (!req)
        goto out;

    req_size = xdr_sizeof(xdrproc, req);
    iobuf = iobuf_get2(rpc->ctx->iobuf_pool, req_size);
    if (!iobuf)
        goto out;

    iobref = iobref_new();
    if (!iobref)
        goto out;

    frame = create_frame(THIS, THIS->ctx->pool);
    if (!frame)
        goto out;

    iobref_add(iobref, iobuf);

    iov.iov_base = iobuf->ptr;
    iov.iov_len = iobuf_pagesize(iobuf);

    /* Create the xdr payload */
    ret = xdr_serialize_generic(iov, req, xdrproc);
    if (ret == -1)
        goto out;

    iov.iov_len = ret;
    count = 1;

    frame->local = local;
    frame->cookie = cookie;

    /* Send the msg */
    ret = rpc_clnt_submit(rpc, prog, procnum, cbkfn, &iov, count, NULL, 0,
                          iobref, frame, NULL, 0, NULL, 0, NULL);

    /* TODO: do we need to start ping also? */

out:
    iobref_unref(iobref);
    iobuf_unref(iobuf);

    if (ret && frame)
        STACK_DESTROY(frame->root);
    return ret;
}

/* Defined in glusterd-rpc-ops.c */
extern struct rpc_clnt_program gd_mgmt_prog;
extern struct rpc_clnt_program gd_brick_prog;
extern struct rpc_clnt_program gd_mgmt_v3_prog;

int
glusterd_syncop_aggr_rsp_dict(glusterd_op_t op, dict_t *aggr, dict_t *rsp)
{
    int ret = 0;
    xlator_t *this = NULL;

    this = THIS;
    GF_ASSERT(this);

    switch (op) {
        case GD_OP_CREATE_VOLUME:
        case GD_OP_ADD_BRICK:
        case GD_OP_START_VOLUME:
        case GD_OP_ADD_TIER_BRICK:
            ret = glusterd_aggr_brick_mount_dirs(aggr, rsp);
            if (ret) {
                gf_msg(this->name, GF_LOG_ERROR, 0,
                       GD_MSG_BRICK_MOUNDIRS_AGGR_FAIL,
                       "Failed to "
                       "aggregate brick mount dirs");
                goto out;
            }
            break;

        case GD_OP_REPLACE_BRICK:
        case GD_OP_RESET_BRICK:
            ret = glusterd_rb_use_rsp_dict(aggr, rsp);
            if (ret)
                goto out;
            break;

        case GD_OP_SYNC_VOLUME:
            ret = glusterd_sync_use_rsp_dict(aggr, rsp);
            if (ret)
                goto out;
            break;

        case GD_OP_GSYNC_CREATE:
            break;

        case GD_OP_GSYNC_SET:
            ret = glusterd_gsync_use_rsp_dict(aggr, rsp, NULL);
            if (ret)
                goto out;
            break;

        case GD_OP_STATUS_VOLUME:
            ret = glusterd_volume_status_copy_to_op_ctx_dict(aggr, rsp);
            if (ret)
                goto out;
            break;

        case GD_OP_HEAL_VOLUME:
            ret = glusterd_volume_heal_use_rsp_dict(aggr, rsp);
            if (ret)
                goto out;

            break;

        case GD_OP_CLEARLOCKS_VOLUME:
            ret = glusterd_use_rsp_dict(aggr, rsp);
            if (ret)
                goto out;
            break;

        case GD_OP_QUOTA:
            ret = glusterd_volume_quota_copy_to_op_ctx_dict(aggr, rsp);
            if (ret)
                goto out;
            break;

        case GD_OP_SYS_EXEC:
            ret = glusterd_sys_exec_output_rsp_dict(aggr, rsp);
            if (ret)
                goto out;
            break;

        case GD_OP_SNAP:
            ret = glusterd_snap_use_rsp_dict(aggr, rsp);
            if (ret)
                goto out;
            break;

        case GD_OP_SCRUB_STATUS:
            ret = glusterd_volume_bitrot_scrub_use_rsp_dict(aggr, rsp);
            break;

        case GD_OP_SCRUB_ONDEMAND:
            break;

        case GD_OP_MAX_OPVERSION:
            ret = glusterd_max_opversion_use_rsp_dict(aggr, rsp);
            break;

        case GD_OP_PROFILE_VOLUME:
            ret = glusterd_profile_volume_use_rsp_dict(aggr, rsp);
            break;

        case GD_OP_REBALANCE:
        case GD_OP_DEFRAG_BRICK_VOLUME:
            ret = glusterd_volume_rebalance_use_rsp_dict(aggr, rsp);
            break;

        case GD_OP_TIER_STATUS:
        case GD_OP_DETACH_TIER_STATUS:
        case GD_OP_REMOVE_TIER_BRICK:
            ret = glusterd_volume_tier_use_rsp_dict(aggr, rsp);
        /* FALLTHROUGH */
        default:
            break;
    }
out:
    return ret;
}

int32_t
gd_syncop_mgmt_v3_lock_cbk_fn(struct rpc_req *req, struct iovec *iov, int count,
                              void *myframe)
{
    int ret = -1;
    struct syncargs *args = NULL;
    gd1_mgmt_v3_lock_rsp rsp = {
        {0},
    };
    call_frame_t *frame = NULL;
    int op_ret = -1;
    int op_errno = -1;
    xlator_t *this = NULL;
    uuid_t *peerid = NULL;

    this = THIS;
    GF_ASSERT(this);
    GF_ASSERT(req);
    GF_ASSERT(myframe);

    frame = myframe;
    args = frame->local;
    peerid = frame->cookie;
    frame->local = NULL;
    frame->cookie = NULL;

    if (-1 == req->rpc_status) {
        op_errno = ENOTCONN;
        goto out;
    }

    GF_VALIDATE_OR_GOTO_WITH_ERROR(this->name, iov, out, op_errno, EINVAL);

    ret = xdr_to_generic(*iov, &rsp, (xdrproc_t)xdr_gd1_mgmt_v3_lock_rsp);
    if (ret < 0)
        goto out;

    gf_uuid_copy(args->uuid, rsp.uuid);

    op_ret = rsp.op_ret;
    op_errno = rsp.op_errno;
out:
    gd_mgmt_v3_collate_errors(args, op_ret, op_errno, NULL,
                              GLUSTERD_MGMT_V3_LOCK, *peerid, rsp.uuid);

    GF_FREE(peerid);
    /* req->rpc_status set to -1 means, STACK_DESTROY will be called from
     * the caller function.
     */
    if (req->rpc_status != -1)
        STACK_DESTROY(frame->root);
    synctask_barrier_wake(args);
    return 0;
}

int32_t
gd_syncop_mgmt_v3_lock_cbk(struct rpc_req *req, struct iovec *iov, int count,
                           void *myframe)
{
    return glusterd_big_locked_cbk(req, iov, count, myframe,
                                   gd_syncop_mgmt_v3_lock_cbk_fn);
}

int
gd_syncop_mgmt_v3_lock(glusterd_op_t op, dict_t *op_ctx,
                       glusterd_peerinfo_t *peerinfo, struct syncargs *args,
                       uuid_t my_uuid, uuid_t recv_uuid, uuid_t txn_id)
{
    int ret = -1;
    gd1_mgmt_v3_lock_req req = {
        {0},
    };
    uuid_t *peerid = NULL;

    GF_ASSERT(op_ctx);
    GF_ASSERT(peerinfo);
    GF_ASSERT(args);

    ret = dict_allocate_and_serialize(op_ctx, &req.dict.dict_val,
                                      &req.dict.dict_len);
    if (ret)
        goto out;

    gf_uuid_copy(req.uuid, my_uuid);
    gf_uuid_copy(req.txn_id, txn_id);
    req.op = op;

    GD_ALLOC_COPY_UUID(peerid, peerinfo->uuid, ret);
    if (ret)
        goto out;

    ret = gd_syncop_submit_request(peerinfo->rpc, &req, args, peerid,
                                   &gd_mgmt_v3_prog, GLUSTERD_MGMT_V3_LOCK,
                                   gd_syncop_mgmt_v3_lock_cbk,
                                   (xdrproc_t)xdr_gd1_mgmt_v3_lock_req);
out:
    GF_FREE(req.dict.dict_val);
    gf_msg_debug("glusterd", 0, "Returning %d", ret);
    return ret;
}

int32_t
gd_syncop_mgmt_v3_unlock_cbk_fn(struct rpc_req *req, struct iovec *iov,
                                int count, void *myframe)
{
    int ret = -1;
    struct syncargs *args = NULL;
    gd1_mgmt_v3_unlock_rsp rsp = {
        {0},
    };
    call_frame_t *frame = NULL;
    int op_ret = -1;
    int op_errno = -1;
    xlator_t *this = NULL;
    uuid_t *peerid = NULL;

    this = THIS;
    GF_ASSERT(this);
    GF_ASSERT(req);
    GF_ASSERT(myframe);

    frame = myframe;
    args = frame->local;
    peerid = frame->cookie;
    frame->local = NULL;
    frame->cookie = NULL;

    if (-1 == req->rpc_status) {
        op_errno = ENOTCONN;
        goto out;
    }

    GF_VALIDATE_OR_GOTO_WITH_ERROR(this->name, iov, out, op_errno, EINVAL);

    ret = xdr_to_generic(*iov, &rsp, (xdrproc_t)xdr_gd1_mgmt_v3_unlock_rsp);
    if (ret < 0)
        goto out;

    gf_uuid_copy(args->uuid, rsp.uuid);

    op_ret = rsp.op_ret;
    op_errno = rsp.op_errno;
out:
    gd_mgmt_v3_collate_errors(args, op_ret, op_errno, NULL,
                              GLUSTERD_MGMT_V3_UNLOCK, *peerid, rsp.uuid);

    GF_FREE(peerid);
    /* req->rpc_status set to -1 means, STACK_DESTROY will be called from
     * the caller function.
     */
    if (req->rpc_status != -1)
        STACK_DESTROY(frame->root);
    synctask_barrier_wake(args);
    return 0;
}

int32_t
gd_syncop_mgmt_v3_unlock_cbk(struct rpc_req *req, struct iovec *iov, int count,
                             void *myframe)
{
    return glusterd_big_locked_cbk(req, iov, count, myframe,
                                   gd_syncop_mgmt_v3_unlock_cbk_fn);
}

int
gd_syncop_mgmt_v3_unlock(dict_t *op_ctx, glusterd_peerinfo_t *peerinfo,
                         struct syncargs *args, uuid_t my_uuid,
                         uuid_t recv_uuid, uuid_t txn_id)
{
    int ret = -1;
    gd1_mgmt_v3_unlock_req req = {
        {0},
    };
    uuid_t *peerid = NULL;

    GF_ASSERT(op_ctx);
    GF_ASSERT(peerinfo);
    GF_ASSERT(args);

    ret = dict_allocate_and_serialize(op_ctx, &req.dict.dict_val,
                                      &req.dict.dict_len);
    if (ret)
        goto out;

    gf_uuid_copy(req.uuid, my_uuid);
    gf_uuid_copy(req.txn_id, txn_id);

    GD_ALLOC_COPY_UUID(peerid, peerinfo->uuid, ret);
    if (ret)
        goto out;

    ret = gd_syncop_submit_request(peerinfo->rpc, &req, args, peerid,
                                   &gd_mgmt_v3_prog, GLUSTERD_MGMT_V3_UNLOCK,
                                   gd_syncop_mgmt_v3_unlock_cbk,
                                   (xdrproc_t)xdr_gd1_mgmt_v3_unlock_req);
out:
    GF_FREE(req.dict.dict_val);
    gf_msg_debug("glusterd", 0, "Returning %d", ret);
    return ret;
}

int32_t
_gd_syncop_mgmt_lock_cbk(struct rpc_req *req, struct iovec *iov, int count,
                         void *myframe)
{
    int ret = -1;
    struct syncargs *args = NULL;
    glusterd_peerinfo_t *peerinfo = NULL;
    gd1_mgmt_cluster_lock_rsp rsp = {
        {0},
    };
    call_frame_t *frame = NULL;
    int op_ret = -1;
    int op_errno = -1;
    xlator_t *this = NULL;
    uuid_t *peerid = NULL;

    this = THIS;
    GF_ASSERT(this);

    frame = myframe;
    args = frame->local;
    peerid = frame->cookie;
    frame->local = NULL;
    frame->cookie = NULL;

    if (-1 == req->rpc_status) {
        op_errno = ENOTCONN;
        goto out;
    }

    GF_VALIDATE_OR_GOTO_WITH_ERROR(this->name, iov, out, op_errno, EINVAL);

    ret = xdr_to_generic(*iov, &rsp, (xdrproc_t)xdr_gd1_mgmt_cluster_lock_rsp);
    if (ret < 0)
        goto out;

    gf_uuid_copy(args->uuid, rsp.uuid);

    RCU_READ_LOCK;
    peerinfo = glusterd_peerinfo_find(*peerid, NULL);
    if (peerinfo) {
        /* Set peer as locked, so we unlock only the locked peers */
        if (rsp.op_ret == 0)
            peerinfo->locked = _gf_true;
    } else {
        rsp.op_ret = -1;
        gf_msg(this->name, GF_LOG_ERROR, EINVAL, GD_MSG_PEER_NOT_FOUND,
               "Could not find peer with "
               "ID %s",
               uuid_utoa(*peerid));
    }
    RCU_READ_UNLOCK;

    op_ret = rsp.op_ret;
    op_errno = rsp.op_errno;
out:
    gd_collate_errors(args, op_ret, op_errno, NULL, GLUSTERD_MGMT_CLUSTER_LOCK,
                      *peerid, rsp.uuid);

    GF_FREE(peerid);
    /* req->rpc_status set to -1 means, STACK_DESTROY will be called from
     * the caller function.
     */
    if (req->rpc_status != -1)
        STACK_DESTROY(frame->root);
    synctask_barrier_wake(args);
    return 0;
}

int32_t
gd_syncop_mgmt_lock_cbk(struct rpc_req *req, struct iovec *iov, int count,
                        void *myframe)
{
    return glusterd_big_locked_cbk(req, iov, count, myframe,
                                   _gd_syncop_mgmt_lock_cbk);
}

int
gd_syncop_mgmt_lock(glusterd_peerinfo_t *peerinfo, struct syncargs *args,
                    uuid_t my_uuid, uuid_t recv_uuid)
{
    int ret = -1;
    gd1_mgmt_cluster_lock_req req = {
        {0},
    };
    uuid_t *peerid = NULL;

    gf_uuid_copy(req.uuid, my_uuid);
    GD_ALLOC_COPY_UUID(peerid, peerinfo->uuid, ret);
    if (ret)
        goto out;

    ret = gd_syncop_submit_request(peerinfo->rpc, &req, args, peerid,
                                   &gd_mgmt_prog, GLUSTERD_MGMT_CLUSTER_LOCK,
                                   gd_syncop_mgmt_lock_cbk,
                                   (xdrproc_t)xdr_gd1_mgmt_cluster_lock_req);
out:
    return ret;
}

int32_t
_gd_syncop_mgmt_unlock_cbk(struct rpc_req *req, struct iovec *iov, int count,
                           void *myframe)
{
    int ret = -1;
    struct syncargs *args = NULL;
    glusterd_peerinfo_t *peerinfo = NULL;
    gd1_mgmt_cluster_unlock_rsp rsp = {
        {0},
    };
    call_frame_t *frame = NULL;
    int op_ret = -1;
    int op_errno = -1;
    xlator_t *this = NULL;
    uuid_t *peerid = NULL;

    this = THIS;
    GF_ASSERT(this);

    frame = myframe;
    args = frame->local;
    peerid = frame->cookie;
    frame->local = NULL;
    frame->cookie = NULL;

    if (-1 == req->rpc_status) {
        op_errno = ENOTCONN;
        goto out;
    }

    GF_VALIDATE_OR_GOTO_WITH_ERROR(this->name, iov, out, op_errno, EINVAL);

    ret = xdr_to_generic(*iov, &rsp,
                         (xdrproc_t)xdr_gd1_mgmt_cluster_unlock_rsp);
    if (ret < 0)
        goto out;

    gf_uuid_copy(args->uuid, rsp.uuid);

    RCU_READ_LOCK;
    peerinfo = glusterd_peerinfo_find(*peerid, NULL);
    if (peerinfo) {
        peerinfo->locked = _gf_false;
    } else {
        rsp.op_ret = -1;
        gf_msg(this->name, GF_LOG_ERROR, EINVAL, GD_MSG_PEER_NOT_FOUND,
               "Could not find peer with "
               "ID %s",
               uuid_utoa(*peerid));
    }
    RCU_READ_UNLOCK;

    op_ret = rsp.op_ret;
    op_errno = rsp.op_errno;
out:
    gd_collate_errors(args, op_ret, op_errno, NULL,
                      GLUSTERD_MGMT_CLUSTER_UNLOCK, *peerid, rsp.uuid);

    GF_FREE(peerid);
    /* req->rpc_status set to -1 means, STACK_DESTROY will be called from
     * the caller function.
     */
    if (req->rpc_status != -1)
        STACK_DESTROY(frame->root);
    synctask_barrier_wake(args);
    return 0;
}

int32_t
gd_syncop_mgmt_unlock_cbk(struct rpc_req *req, struct iovec *iov, int count,
                          void *myframe)
{
    return glusterd_big_locked_cbk(req, iov, count, myframe,
                                   _gd_syncop_mgmt_unlock_cbk);
}

int
gd_syncop_mgmt_unlock(glusterd_peerinfo_t *peerinfo, struct syncargs *args,
                      uuid_t my_uuid, uuid_t recv_uuid)
{
    int ret = -1;
    gd1_mgmt_cluster_unlock_req req = {
        {0},
    };
    uuid_t *peerid = NULL;

    gf_uuid_copy(req.uuid, my_uuid);
    GD_ALLOC_COPY_UUID(peerid, peerinfo->uuid, ret);
    if (ret)
        goto out;

    ret = gd_syncop_submit_request(peerinfo->rpc, &req, args, peerid,
                                   &gd_mgmt_prog, GLUSTERD_MGMT_CLUSTER_UNLOCK,
                                   gd_syncop_mgmt_unlock_cbk,
                                   (xdrproc_t)xdr_gd1_mgmt_cluster_lock_req);
out:
    return ret;
}

int32_t
_gd_syncop_stage_op_cbk(struct rpc_req *req, struct iovec *iov, int count,
                        void *myframe)
{
    int ret = -1;
    gd1_mgmt_stage_op_rsp rsp = {
        {0},
    };
    struct syncargs *args = NULL;
    xlator_t *this = NULL;
    dict_t *rsp_dict = NULL;
    call_frame_t *frame = NULL;
    int op_ret = -1;
    int op_errno = -1;
    uuid_t *peerid = NULL;

    this = THIS;
    GF_ASSERT(this);

    frame = myframe;
    args = frame->local;
    peerid = frame->cookie;
    frame->local = NULL;
    frame->cookie = NULL;

    if (-1 == req->rpc_status) {
        op_errno = ENOTCONN;
        goto out;
    }

    GF_VALIDATE_OR_GOTO_WITH_ERROR(this->name, iov, out, op_errno, EINVAL);

    ret = xdr_to_generic(*iov, &rsp, (xdrproc_t)xdr_gd1_mgmt_stage_op_rsp);
    if (ret < 0)
        goto out;

    if (rsp.dict.dict_len) {
        /* Unserialize the dictionary */
        rsp_dict = dict_new();

        ret = dict_unserialize(rsp.dict.dict_val, rsp.dict.dict_len, &rsp_dict);
        if (ret < 0) {
            GF_FREE(rsp.dict.dict_val);
            goto out;
        } else {
            rsp_dict->extra_stdfree = rsp.dict.dict_val;
        }
    }

    RCU_READ_LOCK;
    ret = (glusterd_peerinfo_find(rsp.uuid, NULL) == NULL);
    RCU_READ_UNLOCK;
    if (ret) {
        ret = -1;
        gf_msg(this->name, GF_LOG_CRITICAL, 0, GD_MSG_RESP_FROM_UNKNOWN_PEER,
               "Staging response "
               "for 'Volume %s' received from unknown "
               "peer: %s",
               gd_op_list[rsp.op], uuid_utoa(rsp.uuid));
        goto out;
    }

    gf_uuid_copy(args->uuid, rsp.uuid);
    if (rsp.op == GD_OP_REPLACE_BRICK || rsp.op == GD_OP_QUOTA ||
        rsp.op == GD_OP_CREATE_VOLUME || rsp.op == GD_OP_ADD_BRICK ||
        rsp.op == GD_OP_START_VOLUME) {
        pthread_mutex_lock(&args->lock_dict);
        {
            ret = glusterd_syncop_aggr_rsp_dict(rsp.op, args->dict, rsp_dict);
            if (ret)
                gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_RESP_AGGR_FAIL, "%s",
                       "Failed to aggregate response from "
                       " node/brick");
        }
        pthread_mutex_unlock(&args->lock_dict);
    }

    op_ret = rsp.op_ret;
    op_errno = rsp.op_errno;

out:
    gd_collate_errors(args, op_ret, op_errno, rsp.op_errstr,
                      GLUSTERD_MGMT_STAGE_OP, *peerid, rsp.uuid);

    if (rsp_dict)
        dict_unref(rsp_dict);
    GF_FREE(peerid);
    /* req->rpc_status set to -1 means, STACK_DESTROY will be called from
     * the caller function.
     */
    if (req->rpc_status != -1)
        STACK_DESTROY(frame->root);
    synctask_barrier_wake(args);
    return 0;
}

int32_t
gd_syncop_stage_op_cbk(struct rpc_req *req, struct iovec *iov, int count,
                       void *myframe)
{
    return glusterd_big_locked_cbk(req, iov, count, myframe,
                                   _gd_syncop_stage_op_cbk);
}

int
gd_syncop_mgmt_stage_op(glusterd_peerinfo_t *peerinfo, struct syncargs *args,
                        uuid_t my_uuid, uuid_t recv_uuid, int op,
                        dict_t *dict_out, dict_t *op_ctx)
{
    gd1_mgmt_stage_op_req *req = NULL;
    int ret = -1;
    uuid_t *peerid = NULL;

    req = GF_CALLOC(1, sizeof(*req), gf_gld_mt_mop_stage_req_t);
    if (!req)
        goto out;

    gf_uuid_copy(req->uuid, my_uuid);
    req->op = op;

    ret = dict_allocate_and_serialize(dict_out, &req->buf.buf_val,
                                      &req->buf.buf_len);
    if (ret)
        goto out;

    GD_ALLOC_COPY_UUID(peerid, peerinfo->uuid, ret);
    if (ret)
        goto out;

    ret = gd_syncop_submit_request(
        peerinfo->rpc, req, args, peerid, &gd_mgmt_prog, GLUSTERD_MGMT_STAGE_OP,
        gd_syncop_stage_op_cbk, (xdrproc_t)xdr_gd1_mgmt_stage_op_req);
out:
    gd_stage_op_req_free(req);
    return ret;
}

int32_t
_gd_syncop_brick_op_cbk(struct rpc_req *req, struct iovec *iov, int count,
                        void *myframe)
{
    struct syncargs *args = NULL;
    gd1_mgmt_brick_op_rsp rsp = {
        0,
    };
    int ret = -1;
    call_frame_t *frame = NULL;
    xlator_t *this = NULL;

    this = THIS;
    GF_ASSERT(this);

    frame = myframe;
    args = frame->local;
    frame->local = NULL;

    /* initialize */
    args->op_ret = -1;
    args->op_errno = EINVAL;

    if (-1 == req->rpc_status) {
        args->op_errno = ENOTCONN;
        goto out;
    }

    GF_VALIDATE_OR_GOTO_WITH_ERROR(this->name, iov, out, args->op_errno,
                                   EINVAL);

    ret = xdr_to_generic(*iov, &rsp, (xdrproc_t)xdr_gd1_mgmt_brick_op_rsp);
    if (ret < 0)
        goto out;

    if (rsp.output.output_len) {
        args->dict = dict_new();
        if (!args->dict) {
            ret = -1;
            args->op_errno = ENOMEM;
            goto out;
        }

        ret = dict_unserialize(rsp.output.output_val, rsp.output.output_len,
                               &args->dict);
        if (ret < 0)
            goto out;
    }

    args->op_ret = rsp.op_ret;
    args->op_errno = rsp.op_errno;
    args->errstr = gf_strdup(rsp.op_errstr);

out:
    if ((rsp.op_errstr) && (strcmp(rsp.op_errstr, "") != 0))
        free(rsp.op_errstr);
    free(rsp.output.output_val);

    /* req->rpc_status set to -1 means, STACK_DESTROY will be called from
     * the caller function.
     */
    if (req->rpc_status != -1)
        STACK_DESTROY(frame->root);
    __wake(args);

    return 0;
}

int32_t
gd_syncop_brick_op_cbk(struct rpc_req *req, struct iovec *iov, int count,
                       void *myframe)
{
    return glusterd_big_locked_cbk(req, iov, count, myframe,
                                   _gd_syncop_brick_op_cbk);
}

int
gd_syncop_mgmt_brick_op(struct rpc_clnt *rpc, glusterd_pending_node_t *pnode,
                        int op, dict_t *dict_out, dict_t *op_ctx, char **errstr)
{
    struct syncargs args = {
        0,
    };
    gd1_mgmt_brick_op_req *req = NULL;
    int ret = 0;
    xlator_t *this = NULL;

    this = THIS;
    args.op_ret = -1;
    args.op_errno = ENOTCONN;

    if ((pnode->type == GD_NODE_NFS) || (pnode->type == GD_NODE_QUOTAD) ||
        (pnode->type == GD_NODE_SCRUB) ||
        ((pnode->type == GD_NODE_SHD) && (op == GD_OP_STATUS_VOLUME))) {
        ret = glusterd_node_op_build_payload(op, &req, dict_out);

    } else {
        ret = glusterd_brick_op_build_payload(op, pnode->node, &req, dict_out);
    }

    if (ret)
        goto out;

    GD_SYNCOP(rpc, (&args), NULL, gd_syncop_brick_op_cbk, req, &gd_brick_prog,
              req->op, xdr_gd1_mgmt_brick_op_req);

    if (args.errstr) {
        if ((strlen(args.errstr) > 0) && errstr)
            *errstr = args.errstr;
        else
            GF_FREE(args.errstr);
    }

    if (GD_OP_STATUS_VOLUME == op) {
        ret = dict_set_int32(args.dict, "index", pnode->index);
        if (ret) {
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_SET_FAILED,
                   "Error setting index on brick status"
                   " rsp dict");
            args.op_ret = -1;
            goto out;
        }
    }

    if (req->op == GLUSTERD_BRICK_TERMINATE) {
        if (args.op_ret && (args.op_errno == ENOTCONN)) {
            /*
             * This is actually OK.  It happens when the target
             * brick process exits and we saw the closed connection
             * before we read the response.  If we didn't read the
             * response quickly enough that's kind of our own
             * fault, and the fact that the process exited means
             * that our goal of terminating the brick was achieved.
             */
            args.op_ret = 0;
        }
    }

    if (args.op_ret == 0)
        glusterd_handle_node_rsp(dict_out, pnode->node, op, args.dict, op_ctx,
                                 errstr, pnode->type);

out:
    errno = args.op_errno;
    if (args.dict)
        dict_unref(args.dict);
    if (args.op_ret && errstr && (*errstr == NULL)) {
        if (op == GD_OP_HEAL_VOLUME) {
            gf_asprintf(errstr,
                        "Glusterd Syncop Mgmt brick op '%s' failed."
                        " Please check glustershd log file for details.",
                        gd_op_list[op]);
        } else {
            gf_asprintf(errstr,
                        "Glusterd Syncop Mgmt brick op '%s' failed."
                        " Please check brick log file for details.",
                        gd_op_list[op]);
        }
    }
    gd_brick_op_req_free(req);
    return args.op_ret;
}

int32_t
_gd_syncop_commit_op_cbk(struct rpc_req *req, struct iovec *iov, int count,
                         void *myframe)
{
    int ret = -1;
    gd1_mgmt_commit_op_rsp rsp = {
        {0},
    };
    struct syncargs *args = NULL;
    xlator_t *this = NULL;
    dict_t *rsp_dict = NULL;
    call_frame_t *frame = NULL;
    int op_ret = -1;
    int op_errno = -1;
    int type = GF_QUOTA_OPTION_TYPE_NONE;
    uuid_t *peerid = NULL;

    this = THIS;
    GF_ASSERT(this);

    frame = myframe;
    args = frame->local;
    peerid = frame->cookie;
    frame->local = NULL;
    frame->cookie = NULL;

    if (-1 == req->rpc_status) {
        op_errno = ENOTCONN;
        goto out;
    }

    GF_VALIDATE_OR_GOTO_WITH_ERROR(this->name, iov, out, op_errno, EINVAL);

    ret = xdr_to_generic(*iov, &rsp, (xdrproc_t)xdr_gd1_mgmt_commit_op_rsp);
    if (ret < 0) {
        goto out;
    }

    if (rsp.dict.dict_len) {
        /* Unserialize the dictionary */
        rsp_dict = dict_new();

        ret = dict_unserialize(rsp.dict.dict_val, rsp.dict.dict_len, &rsp_dict);
        if (ret < 0) {
            GF_FREE(rsp.dict.dict_val);
            goto out;
        } else {
            rsp_dict->extra_stdfree = rsp.dict.dict_val;
        }
    }

    RCU_READ_LOCK;
    ret = (glusterd_peerinfo_find(rsp.uuid, NULL) == 0);
    RCU_READ_UNLOCK;
    if (ret) {
        ret = -1;
        gf_msg(this->name, GF_LOG_CRITICAL, 0, GD_MSG_RESP_FROM_UNKNOWN_PEER,
               "Commit response "
               "for 'Volume %s' received from unknown "
               "peer: %s",
               gd_op_list[rsp.op], uuid_utoa(rsp.uuid));
        goto out;
    }

    gf_uuid_copy(args->uuid, rsp.uuid);
    if (rsp.op == GD_OP_QUOTA) {
        ret = dict_get_int32(args->dict, "type", &type);
        if (ret) {
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
                   "Failed to get "
                   "opcode");
            goto out;
        }
    }

    if ((rsp.op != GD_OP_QUOTA) || (type == GF_QUOTA_OPTION_TYPE_LIST)) {
        pthread_mutex_lock(&args->lock_dict);
        {
            ret = glusterd_syncop_aggr_rsp_dict(rsp.op, args->dict, rsp_dict);
            if (ret)
                gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_RESP_AGGR_FAIL, "%s",
                       "Failed to aggregate response from "
                       " node/brick");
        }
        pthread_mutex_unlock(&args->lock_dict);
    }

    op_ret = rsp.op_ret;
    op_errno = rsp.op_errno;

out:
    gd_collate_errors(args, op_ret, op_errno, rsp.op_errstr,
                      GLUSTERD_MGMT_COMMIT_OP, *peerid, rsp.uuid);
    if (rsp_dict)
        dict_unref(rsp_dict);
    GF_FREE(peerid);
    /* req->rpc_status set to -1 means, STACK_DESTROY will be called from
     * the caller function.
     */
    if (req->rpc_status != -1)
        STACK_DESTROY(frame->root);
    synctask_barrier_wake(args);

    return 0;
}

int32_t
gd_syncop_commit_op_cbk(struct rpc_req *req, struct iovec *iov, int count,
                        void *myframe)
{
    return glusterd_big_locked_cbk(req, iov, count, myframe,
                                   _gd_syncop_commit_op_cbk);
}

int
gd_syncop_mgmt_commit_op(glusterd_peerinfo_t *peerinfo, struct syncargs *args,
                         uuid_t my_uuid, uuid_t recv_uuid, int op,
                         dict_t *dict_out, dict_t *op_ctx)
{
    gd1_mgmt_commit_op_req *req = NULL;
    int ret = -1;
    uuid_t *peerid = NULL;

    req = GF_CALLOC(1, sizeof(*req), gf_gld_mt_mop_commit_req_t);
    if (!req)
        goto out;

    gf_uuid_copy(req->uuid, my_uuid);
    req->op = op;

    ret = dict_allocate_and_serialize(dict_out, &req->buf.buf_val,
                                      &req->buf.buf_len);
    if (ret)
        goto out;

    GD_ALLOC_COPY_UUID(peerid, peerinfo->uuid, ret);
    if (ret)
        goto out;

    ret = gd_syncop_submit_request(peerinfo->rpc, req, args, peerid,
                                   &gd_mgmt_prog, GLUSTERD_MGMT_COMMIT_OP,
                                   gd_syncop_commit_op_cbk,
                                   (xdrproc_t)xdr_gd1_mgmt_commit_op_req);
out:
    gd_commit_op_req_free(req);
    return ret;
}

int
gd_lock_op_phase(glusterd_conf_t *conf, glusterd_op_t op, dict_t *op_ctx,
                 char **op_errstr, uuid_t txn_id,
                 glusterd_op_info_t *txn_opinfo, gf_boolean_t cluster_lock)
{
    int ret = -1;
    int peer_cnt = 0;
    uuid_t peer_uuid = {0};
    xlator_t *this = NULL;
    glusterd_peerinfo_t *peerinfo = NULL;
    struct syncargs args = {0};

    this = THIS;
    GF_VALIDATE_OR_GOTO("glusterd", this, out);

    ret = synctask_barrier_init((&args));
    if (ret)
        goto out;

    peer_cnt = 0;

    RCU_READ_LOCK;
    cds_list_for_each_entry_rcu(peerinfo, &conf->peers, uuid_list)
    {
        /* Only send requests to peers who were available before the
         * transaction started
         */
        if (peerinfo->generation > txn_opinfo->txn_generation)
            continue;

        if (!peerinfo->connected)
            continue;
        if (op != GD_OP_SYNC_VOLUME &&
            peerinfo->state.state != GD_FRIEND_STATE_BEFRIENDED)
            continue;

        if (cluster_lock) {
            /* Reset lock status */
            peerinfo->locked = _gf_false;
            gd_syncop_mgmt_lock(peerinfo, &args, MY_UUID, peer_uuid);
        } else
            gd_syncop_mgmt_v3_lock(op, op_ctx, peerinfo, &args, MY_UUID,
                                   peer_uuid, txn_id);
        peer_cnt++;
    }
    RCU_READ_UNLOCK;

    if (0 == peer_cnt) {
        ret = 0;
        goto out;
    }

    gd_synctask_barrier_wait((&args), peer_cnt);

    if (args.op_ret) {
        if (args.errstr)
            *op_errstr = gf_strdup(args.errstr);
        else {
            ret = gf_asprintf(op_errstr,
                              "Another transaction "
                              "could be in progress. Please try "
                              "again after some time.");
            if (ret == -1)
                *op_errstr = NULL;

            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_PEER_LOCK_FAIL,
                   "Failed to acquire lock");
        }
    }

    ret = args.op_ret;

    gf_msg_debug(this->name, 0,
                 "Sent lock op req for 'Volume %s' "
                 "to %d peers. Returning %d",
                 gd_op_list[op], peer_cnt, ret);
out:
    return ret;
}

int
gd_stage_op_phase(glusterd_op_t op, dict_t *op_ctx, dict_t *req_dict,
                  char **op_errstr, glusterd_op_info_t *txn_opinfo)
{
    int ret = -1;
    int peer_cnt = 0;
    dict_t *rsp_dict = NULL;
    char *hostname = NULL;
    xlator_t *this = NULL;
    glusterd_conf_t *conf = NULL;
    glusterd_peerinfo_t *peerinfo = NULL;
    uuid_t tmp_uuid = {0};
    char *errstr = NULL;
    struct syncargs args = {0};
    dict_t *aggr_dict = NULL;

    this = THIS;
    GF_ASSERT(this);
    conf = this->private;
    GF_ASSERT(conf);

    rsp_dict = dict_new();
    if (!rsp_dict)
        goto out;

    if ((op == GD_OP_CREATE_VOLUME) || (op == GD_OP_ADD_BRICK) ||
        (op == GD_OP_START_VOLUME))
        aggr_dict = req_dict;
    else
        aggr_dict = op_ctx;

    ret = glusterd_validate_quorum(this, op, req_dict, op_errstr);
    if (ret) {
        gf_msg(this->name, GF_LOG_CRITICAL, 0, GD_MSG_SERVER_QUORUM_NOT_MET,
               "Server quorum not met. Rejecting operation.");
        goto out;
    }

    ret = glusterd_op_stage_validate(op, req_dict, op_errstr, rsp_dict);
    if (ret) {
        hostname = "localhost";
        goto stage_done;
    }

    if ((op == GD_OP_REPLACE_BRICK || op == GD_OP_QUOTA ||
         op == GD_OP_CREATE_VOLUME || op == GD_OP_ADD_BRICK ||
         op == GD_OP_START_VOLUME)) {
        ret = glusterd_syncop_aggr_rsp_dict(op, aggr_dict, rsp_dict);
        if (ret) {
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_RESP_AGGR_FAIL, "%s",
                   "Failed to aggregate response from node/brick");
            goto out;
        }
    }
    dict_unref(rsp_dict);
    rsp_dict = NULL;

stage_done:
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_VALIDATE_FAILED,
               LOGSTR_STAGE_FAIL, gd_op_list[op], hostname,
               (*op_errstr) ? ":" : " ", (*op_errstr) ? *op_errstr : " ");
        if (*op_errstr == NULL)
            gf_asprintf(op_errstr, OPERRSTR_STAGE_FAIL, hostname);
        goto out;
    }

    gd_syncargs_init(&args, aggr_dict);
    ret = synctask_barrier_init((&args));
    if (ret)
        goto out;

    peer_cnt = 0;

    RCU_READ_LOCK;
    cds_list_for_each_entry_rcu(peerinfo, &conf->peers, uuid_list)
    {
        /* Only send requests to peers who were available before the
         * transaction started
         */
        if (peerinfo->generation > txn_opinfo->txn_generation)
            continue;

        if (!peerinfo->connected)
            continue;
        if (op != GD_OP_SYNC_VOLUME &&
            peerinfo->state.state != GD_FRIEND_STATE_BEFRIENDED)
            continue;

        (void)gd_syncop_mgmt_stage_op(peerinfo, &args, MY_UUID, tmp_uuid, op,
                                      req_dict, op_ctx);
        peer_cnt++;
    }
    RCU_READ_UNLOCK;

    if (0 == peer_cnt) {
        ret = 0;
        goto out;
    }

    gf_msg_debug(this->name, 0,
                 "Sent stage op req for 'Volume %s' "
                 "to %d peers",
                 gd_op_list[op], peer_cnt);

    gd_synctask_barrier_wait((&args), peer_cnt);

    if (args.errstr)
        *op_errstr = gf_strdup(args.errstr);
    else if (dict_get_str(aggr_dict, "errstr", &errstr) == 0)
        *op_errstr = gf_strdup(errstr);

    ret = args.op_ret;

out:
    if ((ret == 0) && (op == GD_OP_QUOTA)) {
        ret = glusterd_validate_and_set_gfid(op_ctx, req_dict, op_errstr);
        if (ret)
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_GFID_VALIDATE_SET_FAIL,
                   "Failed to validate and set gfid");
    }

    if (rsp_dict)
        dict_unref(rsp_dict);
    return ret;
}

int
gd_commit_op_phase(glusterd_op_t op, dict_t *op_ctx, dict_t *req_dict,
                   char **op_errstr, glusterd_op_info_t *txn_opinfo)
{
    dict_t *rsp_dict = NULL;
    int peer_cnt = -1;
    int ret = -1;
    char *hostname = NULL;
    glusterd_peerinfo_t *peerinfo = NULL;
    xlator_t *this = NULL;
    glusterd_conf_t *conf = NULL;
    uuid_t tmp_uuid = {0};
    char *errstr = NULL;
    struct syncargs args = {0};
    int type = GF_QUOTA_OPTION_TYPE_NONE;
    uint32_t cmd = 0;
    gf_boolean_t origin_glusterd = _gf_false;

    this = THIS;
    GF_ASSERT(this);
    conf = this->private;
    GF_ASSERT(conf);

    rsp_dict = dict_new();
    if (!rsp_dict) {
        ret = -1;
        goto out;
    }

    ret = glusterd_op_commit_perform(op, req_dict, op_errstr, rsp_dict);
    if (ret) {
        hostname = "localhost";
        goto commit_done;
    }

    if (op == GD_OP_QUOTA) {
        ret = dict_get_int32(op_ctx, "type", &type);
        if (ret) {
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
                   "Failed to get "
                   "opcode");
            goto out;
        }
    }

    if (((op == GD_OP_QUOTA) &&
         ((type == GF_QUOTA_OPTION_TYPE_LIST) ||
          (type == GF_QUOTA_OPTION_TYPE_LIST_OBJECTS))) ||
        ((op != GD_OP_SYNC_VOLUME) && (op != GD_OP_QUOTA))) {
        ret = glusterd_syncop_aggr_rsp_dict(op, op_ctx, rsp_dict);
        if (ret) {
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_RESP_AGGR_FAIL, "%s",
                   "Failed to aggregate "
                   "response from node/brick");
            goto out;
        }
    }

    dict_unref(rsp_dict);
    rsp_dict = NULL;

commit_done:
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_COMMIT_OP_FAIL,
               LOGSTR_COMMIT_FAIL, gd_op_list[op], hostname,
               (*op_errstr) ? ":" : " ", (*op_errstr) ? *op_errstr : " ");
        if (*op_errstr == NULL)
            gf_asprintf(op_errstr, OPERRSTR_COMMIT_FAIL, hostname);
        goto out;
    }

    gd_syncargs_init(&args, op_ctx);
    ret = synctask_barrier_init((&args));
    if (ret)
        goto out;

    peer_cnt = 0;
    origin_glusterd = is_origin_glusterd(req_dict);

    if (op == GD_OP_STATUS_VOLUME) {
        ret = dict_get_uint32(req_dict, "cmd", &cmd);
        if (ret)
            goto out;

        if (origin_glusterd) {
            if ((cmd & GF_CLI_STATUS_ALL)) {
                ret = 0;
                goto out;
            }
        }
    }

    RCU_READ_LOCK;
    cds_list_for_each_entry_rcu(peerinfo, &conf->peers, uuid_list)
    {
        /* Only send requests to peers who were available before the
         * transaction started
         */
        if (peerinfo->generation > txn_opinfo->txn_generation)
            continue;

        if (!peerinfo->connected)
            continue;
        if (op != GD_OP_SYNC_VOLUME &&
            peerinfo->state.state != GD_FRIEND_STATE_BEFRIENDED)
            continue;

        (void)gd_syncop_mgmt_commit_op(peerinfo, &args, MY_UUID, tmp_uuid, op,
                                       req_dict, op_ctx);
        peer_cnt++;
    }
    RCU_READ_UNLOCK;

    if (0 == peer_cnt) {
        ret = 0;
        goto out;
    }

    gd_synctask_barrier_wait((&args), peer_cnt);
    ret = args.op_ret;
    if (args.errstr)
        *op_errstr = gf_strdup(args.errstr);
    else if (dict_get_str(op_ctx, "errstr", &errstr) == 0)
        *op_errstr = gf_strdup(errstr);

    gf_msg_debug(this->name, 0,
                 "Sent commit op req for 'Volume %s' "
                 "to %d peers",
                 gd_op_list[op], peer_cnt);
out:
    if (!ret)
        glusterd_op_modify_op_ctx(op, op_ctx);

    if (rsp_dict)
        dict_unref(rsp_dict);

    GF_FREE(args.errstr);
    args.errstr = NULL;

    return ret;
}

int
gd_unlock_op_phase(glusterd_conf_t *conf, glusterd_op_t op, int *op_ret,
                   rpcsvc_request_t *req, dict_t *op_ctx, char *op_errstr,
                   char *volname, gf_boolean_t is_acquired, uuid_t txn_id,
                   glusterd_op_info_t *txn_opinfo, gf_boolean_t cluster_lock)
{
    glusterd_peerinfo_t *peerinfo = NULL;
    uuid_t tmp_uuid = {0};
    int peer_cnt = 0;
    int ret = -1;
    xlator_t *this = NULL;
    struct syncargs args = {0};
    int32_t global = 0;
    char *type = NULL;

    this = THIS;
    GF_ASSERT(this);

    /* If the lock has not been held during this
     * transaction, do not send unlock requests */
    if (!is_acquired) {
        ret = 0;
        goto out;
    }

    ret = synctask_barrier_init((&args));
    if (ret)
        goto out;

    peer_cnt = 0;

    if (cluster_lock) {
        RCU_READ_LOCK;
        cds_list_for_each_entry_rcu(peerinfo, &conf->peers, uuid_list)
        {
            /* Only send requests to peers who were available before
             * the transaction started
             */
            if (peerinfo->generation > txn_opinfo->txn_generation)
                continue;

            if (!peerinfo->connected)
                continue;
            if (op != GD_OP_SYNC_VOLUME &&
                peerinfo->state.state != GD_FRIEND_STATE_BEFRIENDED)
                continue;

            /* Only unlock peers that were locked */
            if (peerinfo->locked) {
                gd_syncop_mgmt_unlock(peerinfo, &args, MY_UUID, tmp_uuid);
                peer_cnt++;
            }
        }
        RCU_READ_UNLOCK;
    } else {
        ret = dict_get_int32(op_ctx, "hold_global_locks", &global);
        if (!ret && global)
            type = "global";
        else
            type = "vol";
        if (volname || global) {
            RCU_READ_LOCK;
            cds_list_for_each_entry_rcu(peerinfo, &conf->peers, uuid_list)
            {
                /* Only send requests to peers who were
                 * available before the transaction started
                 */
                if (peerinfo->generation > txn_opinfo->txn_generation)
                    continue;

                if (!peerinfo->connected)
                    continue;
                if (op != GD_OP_SYNC_VOLUME &&
                    peerinfo->state.state != GD_FRIEND_STATE_BEFRIENDED)
                    continue;

                gd_syncop_mgmt_v3_unlock(op_ctx, peerinfo, &args, MY_UUID,
                                         tmp_uuid, txn_id);
                peer_cnt++;
            }
            RCU_READ_UNLOCK;
        }
    }

    if (0 == peer_cnt) {
        ret = 0;
        goto out;
    }

    gd_synctask_barrier_wait((&args), peer_cnt);

    ret = args.op_ret;

    gf_msg_debug(this->name, 0,
                 "Sent unlock op req for 'Volume %s' "
                 "to %d peers. Returning %d",
                 gd_op_list[op], peer_cnt, ret);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_PEER_UNLOCK_FAIL,
               "Failed to unlock "
               "on some peer(s)");
    }

out:
    /* If unlock failed, and op_ret was previously set
     * priority is given to the op_ret. If op_ret was
     * not set, and unlock failed, then set op_ret */
    if (!*op_ret)
        *op_ret = ret;

    if (is_acquired) {
        /* Based on the op-version,
         * we release the cluster or mgmt_v3 lock
         * and clear the op */

        glusterd_op_clear_op(op);
        if (cluster_lock)
            glusterd_unlock(MY_UUID);
        else {
            if (type) {
                ret = glusterd_mgmt_v3_unlock(volname, MY_UUID, type);
                if (ret)
                    gf_msg(this->name, GF_LOG_ERROR, 0,
                           GD_MSG_MGMTV3_UNLOCK_FAIL,
                           "Unable to release lock for %s", volname);
            }
        }
    }

    if (!*op_ret)
        *op_ret = ret;

    /*
     * If there are any quorum events while the OP is in progress, process
     * them.
     */
    if (conf->pending_quorum_action)
        glusterd_do_quorum_action();

    return 0;
}

int
gd_get_brick_count(struct cds_list_head *bricks)
{
    glusterd_pending_node_t *pending_node = NULL;
    int npeers = 0;
    cds_list_for_each_entry(pending_node, bricks, list) { npeers++; }
    return npeers;
}

int
gd_brick_op_phase(glusterd_op_t op, dict_t *op_ctx, dict_t *req_dict,
                  char **op_errstr)
{
    glusterd_pending_node_t *pending_node = NULL;
    glusterd_pending_node_t *tmp = NULL;
    struct cds_list_head selected = {
        0,
    };
    xlator_t *this = NULL;
    int brick_count = 0;
    int ret = -1;
    rpc_clnt_t *rpc = NULL;
    dict_t *rsp_dict = NULL;
    int32_t cmd = GF_OP_CMD_NONE;

    this = THIS;
    rsp_dict = dict_new();
    if (!rsp_dict) {
        ret = -1;
        goto out;
    }

    CDS_INIT_LIST_HEAD(&selected);
    ret = glusterd_op_bricks_select(op, req_dict, op_errstr, &selected,
                                    rsp_dict);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_BRICK_OP_FAIL, "%s",
               (*op_errstr) ? *op_errstr
                            : "Brick op failed. Check "
                              "glusterd log file for more details.");
        goto out;
    }

    if (op == GD_OP_HEAL_VOLUME) {
        ret = glusterd_syncop_aggr_rsp_dict(op, op_ctx, rsp_dict);
        if (ret)
            goto out;
    }
    dict_unref(rsp_dict);
    rsp_dict = NULL;

    brick_count = 0;
    cds_list_for_each_entry_safe(pending_node, tmp, &selected, list)
    {
        rpc = glusterd_pending_node_get_rpc(pending_node);
        if (!rpc) {
            if (pending_node->type == GD_NODE_REBALANCE) {
                ret = 0;
                glusterd_defrag_volume_node_rsp(req_dict, NULL, op_ctx);
                goto out;
            }

            ret = -1;
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_RPC_FAILURE,
                   "Brick Op failed "
                   "due to rpc failure.");
            goto out;
        }

        /* Redirect operation to be detach tier via rebalance flow. */
        ret = dict_get_int32(req_dict, "command", &cmd);
        if (!ret) {
            if (cmd == GF_OP_CMD_DETACH_START) {
                /* this change is left to support backward
                 * compatibility. */
                op = GD_OP_REBALANCE;
                ret = dict_set_int32(req_dict, "rebalance-command",
                                     GF_DEFRAG_CMD_START_DETACH_TIER);
            } else if (cmd == GF_DEFRAG_CMD_DETACH_START) {
                op = GD_OP_REMOVE_TIER_BRICK;
                ret = dict_set_int32(req_dict, "rebalance-command",
                                     GF_DEFRAG_CMD_DETACH_START);
            }
            if (ret)
                goto out;
        }
        ret = gd_syncop_mgmt_brick_op(rpc, pending_node, op, req_dict, op_ctx,
                                      op_errstr);
        if (op == GD_OP_STATUS_VOLUME) {
            /* for client-list its enough to quit the loop
             * once we get the value from one brick
             * */
            ret = dict_get_int32(req_dict, "cmd", &cmd);
            if (!ret && (cmd & GF_CLI_STATUS_CLIENT_LIST)) {
                if (dict_get(op_ctx, "client-count"))
                    break;
            }
            /* coverity[MIXED_ENUMS] */
        } else if (cmd == GF_OP_CMD_DETACH_START) {
            op = GD_OP_REMOVE_BRICK;
            dict_del(req_dict, "rebalance-command");
        } else if (cmd == GF_DEFRAG_CMD_DETACH_START) {
            op = GD_OP_REMOVE_TIER_BRICK;
            dict_del(req_dict, "rebalance-command");
        }
        if (ret)
            goto out;

        brick_count++;
        glusterd_pending_node_put_rpc(pending_node);
        GF_FREE(pending_node);
    }

    pending_node = NULL;
    ret = 0;
out:
    if (pending_node)
        glusterd_pending_node_put_rpc(pending_node);

    if (rsp_dict)
        dict_unref(rsp_dict);
    gf_msg_debug(this->name, 0, "Sent op req to %d bricks", brick_count);
    return ret;
}

void
gd_sync_task_begin(dict_t *op_ctx, rpcsvc_request_t *req)
{
    int ret = -1;
    int op_ret = -1;
    dict_t *req_dict = NULL;
    glusterd_conf_t *conf = NULL;
    glusterd_op_t op = GD_OP_NONE;
    int32_t tmp_op = 0;
    char *op_errstr = NULL;
    char *tmp = NULL;
    char *global = NULL;
    char *volname = NULL;
    xlator_t *this = NULL;
    gf_boolean_t is_acquired = _gf_false;
    gf_boolean_t is_global = _gf_false;
    uuid_t *txn_id = NULL;
    glusterd_op_info_t txn_opinfo = {
        {0},
    };
    uint32_t op_errno = 0;
    gf_boolean_t cluster_lock = _gf_false;
    uint32_t timeout = 0;

    this = THIS;
    GF_ASSERT(this);
    conf = this->private;
    GF_ASSERT(conf);

    ret = dict_get_int32(op_ctx, GD_SYNC_OPCODE_KEY, &tmp_op);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
               "Failed to get volume "
               "operation");
        goto out;
    }
    op = tmp_op;

    /* Generate a transaction-id for this operation and
     * save it in the dict */
    ret = glusterd_generate_txn_id(op_ctx, &txn_id);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_TRANS_IDGEN_FAIL,
               "Failed to generate transaction id");
        goto out;
    }

    /* Save opinfo for this transaction with the transaction id */
    glusterd_txn_opinfo_init(&txn_opinfo, NULL, &op, NULL, NULL);
    ret = glusterd_set_txn_opinfo(txn_id, &txn_opinfo);
    if (ret)
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_TRANS_OPINFO_SET_FAIL,
               "Unable to set transaction's opinfo");

    gf_msg_debug(this->name, 0, "Transaction ID : %s", uuid_utoa(*txn_id));

    /* Save the MY_UUID as the originator_uuid */
    ret = glusterd_set_originator_uuid(op_ctx);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_UUID_SET_FAIL,
               "Failed to set originator_uuid.");
        goto out;
    }

    if (conf->op_version < GD_OP_VERSION_RHS_3_0)
        cluster_lock = _gf_true;

    /* Based on the op_version, acquire a cluster or mgmt_v3 lock */
    if (cluster_lock) {
        ret = glusterd_lock(MY_UUID);
        if (ret) {
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_GLUSTERD_LOCK_FAIL,
                   "Unable to acquire lock");
            gf_asprintf(&op_errstr,
                        "Another transaction is in progress. "
                        "Please try again after some time.");
            goto out;
        }
    } else {
        /* Cli will add timeout key to dict if the default timeout is
         * other than 2 minutes. Here we use this value to check whether
         * mgmt_v3_lock_timeout should be set to default value or we
         * need to change the value according to timeout value
         * i.e, timeout + 120 seconds. */
        ret = dict_get_uint32(op_ctx, "timeout", &timeout);
        if (!ret)
            conf->mgmt_v3_lock_timeout = timeout + 120;

        ret = dict_get_str(op_ctx, "globalname", &global);
        if (!ret) {
            is_global = _gf_true;
            goto global;
        }

        /* If no volname is given as a part of the command, locks will
         * not be held */
        ret = dict_get_str(op_ctx, "volname", &tmp);
        if (ret) {
            gf_msg_debug("glusterd", 0,
                         "Failed to get volume "
                         "name");
            goto local_locking_done;
        } else {
            /* Use a copy of volname, as cli response will be
             * sent before the unlock, and the volname in the
             * dict, might be removed */
            volname = gf_strdup(tmp);
            if (!volname)
                goto out;
        }

        ret = glusterd_mgmt_v3_lock(volname, MY_UUID, &op_errno, "vol");
        if (ret) {
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_MGMTV3_LOCK_GET_FAIL,
                   "Unable to acquire lock for %s", volname);
            gf_asprintf(&op_errstr,
                        "Another transaction is in progress "
                        "for %s. Please try again after some time.",
                        volname);
            goto out;
        }
    }

global:
    if (is_global) {
        ret = glusterd_mgmt_v3_lock(global, MY_UUID, &op_errno, "global");
        if (ret) {
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_MGMTV3_LOCK_GET_FAIL,
                   "Unable to acquire lock for %s", global);
            gf_asprintf(&op_errstr,
                        "Another transaction is in progress "
                        "for %s. Please try again after some time.",
                        global);
            is_global = _gf_false;
            goto out;
        }
    }

    is_acquired = _gf_true;

local_locking_done:

    /* If no volname is given as a part of the command, locks will
     * not be held */
    if (volname || cluster_lock || is_global) {
        ret = gd_lock_op_phase(conf, op, op_ctx, &op_errstr, *txn_id,
                               &txn_opinfo, cluster_lock);
        if (ret) {
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_PEER_LOCK_FAIL,
                   "Locking Peers Failed.");
            goto out;
        }
    }

    ret = glusterd_op_build_payload(&req_dict, &op_errstr, op_ctx);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_BRICK_OP_PAYLOAD_BUILD_FAIL,
               LOGSTR_BUILD_PAYLOAD, gd_op_list[op]);
        if (op_errstr == NULL)
            gf_asprintf(&op_errstr, OPERRSTR_BUILD_PAYLOAD);
        goto out;
    }

    ret = gd_stage_op_phase(op, op_ctx, req_dict, &op_errstr, &txn_opinfo);
    if (ret)
        goto out;

    ret = gd_brick_op_phase(op, op_ctx, req_dict, &op_errstr);
    if (ret)
        goto out;

    ret = gd_commit_op_phase(op, op_ctx, req_dict, &op_errstr, &txn_opinfo);
    if (ret)
        goto out;

    ret = 0;
out:
    op_ret = ret;
    if (txn_id) {
        if (global)
            (void)gd_unlock_op_phase(conf, op, &op_ret, req, op_ctx, op_errstr,
                                     global, is_acquired, *txn_id, &txn_opinfo,
                                     cluster_lock);
        else
            (void)gd_unlock_op_phase(conf, op, &op_ret, req, op_ctx, op_errstr,
                                     volname, is_acquired, *txn_id, &txn_opinfo,
                                     cluster_lock);

        /* Clearing the transaction opinfo */
        ret = glusterd_clear_txn_opinfo(txn_id);
        if (ret)
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_TRANS_OPINFO_CLEAR_FAIL,
                   "Unable to clear transaction's "
                   "opinfo for transaction ID : %s",
                   uuid_utoa(*txn_id));
    }

    if (op_ret && (op_errno == 0))
        op_errno = EG_INTRNL;

    glusterd_op_send_cli_response(op, op_ret, op_errno, req, op_ctx, op_errstr);

    if (volname)
        GF_FREE(volname);

    if (req_dict)
        dict_unref(req_dict);

    if (op_errstr) {
        GF_FREE(op_errstr);
        op_errstr = NULL;
    }

    return;
}

int32_t
glusterd_op_begin_synctask(rpcsvc_request_t *req, glusterd_op_t op, void *dict)
{
    int ret = 0;

    ret = dict_set_int32(dict, GD_SYNC_OPCODE_KEY, op);
    if (ret) {
        gf_msg(THIS->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
               "dict set failed for setting operations");
        goto out;
    }

    gd_sync_task_begin(dict, req);
    ret = 0;
out:

    return ret;
}