Blob Blame History Raw
/*
  Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
  This file is part of GlusterFS.

  This file is licensed to you under your choice of the GNU Lesser
  General Public License, version 3 or any later version (LGPLv3 or
  later), or the GNU General Public License, version 2 (GPLv2), in all
  cases as published by the Free Software Foundation.
*/

#include <glusterfs/fd-lk.h>
#include "client.h"
#include <glusterfs/xlator.h>
#include <glusterfs/defaults.h>
#include <glusterfs/glusterfs.h>
#include <glusterfs/statedump.h>
#include <glusterfs/compat-errno.h>

#include "glusterfs3.h"
#include "portmap-xdr.h"
#include "rpc-common-xdr.h"
#include "client-messages.h"
#include "xdr-rpc.h"

#define CLIENT_REOPEN_MAX_ATTEMPTS 1024
extern rpc_clnt_prog_t clnt3_3_fop_prog;
extern rpc_clnt_prog_t clnt4_0_fop_prog;
extern rpc_clnt_prog_t clnt_pmap_prog;

typedef struct client_fd_lk_local {
    gf_atomic_t ref;
    gf_boolean_t error;
    gf_lock_t lock;
    clnt_fd_ctx_t *fdctx;
} clnt_fd_lk_local_t;


int32_t
client3_getspec(call_frame_t *frame, xlator_t *this, void *data)
{
    CLIENT_STACK_UNWIND(getspec, frame, -1, ENOSYS, NULL);
    return 0;
}

int
client_notify_parents_child_up(xlator_t *this)
{
    clnt_conf_t *conf = NULL;
    int ret = 0;

    GF_VALIDATE_OR_GOTO("client", this, out);
    conf = this->private;
    GF_VALIDATE_OR_GOTO(this->name, conf, out);

    if (conf->child_up) {
        ret = client_notify_dispatch_uniq(this, GF_EVENT_CHILD_UP, NULL);
        if (ret) {
            gf_msg(this->name, GF_LOG_INFO, 0, PC_MSG_CHILD_UP_NOTIFY_FAILED,
                   "notify of CHILD_UP failed");
            goto out;
        }
    } else {
        gf_msg(this->name, GF_LOG_INFO, 0, PC_MSG_CHILD_STATUS,
               "Defering sending CHILD_UP message as the client "
               "translators are not yet ready to serve.");
    }

out:
    return 0;
}

int
clnt_fd_lk_reacquire_failed(xlator_t *this, clnt_fd_ctx_t *fdctx,
                            clnt_conf_t *conf)
{
    int ret = -1;

    GF_VALIDATE_OR_GOTO("client", this, out);
    GF_VALIDATE_OR_GOTO(this->name, conf, out);
    GF_VALIDATE_OR_GOTO(this->name, fdctx, out);

    pthread_spin_lock(&conf->fd_lock);
    {
        fdctx->remote_fd = -1;
    }
    pthread_spin_unlock(&conf->fd_lock);

    ret = 0;
out:
    return ret;
}

int
client_fd_lk_count(fd_lk_ctx_t *lk_ctx)
{
    int count = 0;
    fd_lk_ctx_node_t *fd_lk = NULL;

    GF_VALIDATE_OR_GOTO("client", lk_ctx, err);

    LOCK(&lk_ctx->lock);
    {
        list_for_each_entry(fd_lk, &lk_ctx->lk_list, next) count++;
    }
    UNLOCK(&lk_ctx->lock);

    return count;
err:
    return -1;
}

clnt_fd_lk_local_t *
clnt_fd_lk_local_ref(xlator_t *this, clnt_fd_lk_local_t *local)
{
    GF_VALIDATE_OR_GOTO(this->name, local, out);

    GF_ATOMIC_INC(local->ref);
out:
    return local;
}

int
clnt_fd_lk_local_unref(xlator_t *this, clnt_fd_lk_local_t *local)
{
    int ref = -1;

    GF_VALIDATE_OR_GOTO(this->name, local, out);

    ref = GF_ATOMIC_DEC(local->ref);

    if (ref == 0) {
        LOCK_DESTROY(&local->lock);
        GF_FREE(local);
    }
out:
    return ref;
}

clnt_fd_lk_local_t *
clnt_fd_lk_local_create(clnt_fd_ctx_t *fdctx)
{
    clnt_fd_lk_local_t *local = NULL;

    local = GF_CALLOC(1, sizeof(clnt_fd_lk_local_t),
                      gf_client_mt_clnt_fd_lk_local_t);
    if (!local)
        goto out;

    GF_ATOMIC_INIT(local->ref, 1);
    local->error = _gf_false;
    local->fdctx = fdctx;

    LOCK_INIT(&local->lock);
out:
    return local;
}

int
clnt_release_reopen_fd_cbk(struct rpc_req *req, struct iovec *iov, int count,
                           void *myframe)
{
    xlator_t *this = NULL;
    call_frame_t *frame = NULL;
    clnt_conf_t *conf = NULL;
    clnt_fd_ctx_t *fdctx = NULL;

    frame = myframe;
    this = frame->this;
    fdctx = (clnt_fd_ctx_t *)frame->local;
    conf = (clnt_conf_t *)this->private;

    clnt_fd_lk_reacquire_failed(this, fdctx, conf);

    fdctx->reopen_done(fdctx, fdctx->remote_fd, this);

    frame->local = NULL;
    STACK_DESTROY(frame->root);

    return 0;
}

int
clnt_release_reopen_fd(xlator_t *this, clnt_fd_ctx_t *fdctx)
{
    int ret = -1;
    clnt_conf_t *conf = NULL;
    call_frame_t *frame = NULL;
    gfs3_release_req req = {
        {
            0,
        },
    };

    conf = (clnt_conf_t *)this->private;

    frame = create_frame(this, this->ctx->pool);
    if (!frame)
        goto out;

    frame->local = (void *)fdctx;
    req.fd = fdctx->remote_fd;

    ret = client_submit_request(this, &req, frame, conf->fops, GFS3_OP_RELEASE,
                                clnt_release_reopen_fd_cbk, NULL, NULL, 0, NULL,
                                0, NULL, (xdrproc_t)xdr_gfs3_releasedir_req);
out:
    if (ret) {
        clnt_fd_lk_reacquire_failed(this, fdctx, conf);
        fdctx->reopen_done(fdctx, fdctx->remote_fd, this);
    }
    return 0;
}

int
clnt_reacquire_lock_error(xlator_t *this, clnt_fd_ctx_t *fdctx,
                          clnt_conf_t *conf)
{
    int32_t ret = -1;

    GF_VALIDATE_OR_GOTO("client", this, out);
    GF_VALIDATE_OR_GOTO(this->name, fdctx, out);
    GF_VALIDATE_OR_GOTO(this->name, conf, out);

    clnt_release_reopen_fd(this, fdctx);

    ret = 0;
out:
    return ret;
}

gf_boolean_t
clnt_fd_lk_local_error_status(xlator_t *this, clnt_fd_lk_local_t *local)
{
    gf_boolean_t error = _gf_false;

    LOCK(&local->lock);
    {
        error = local->error;
    }
    UNLOCK(&local->lock);

    return error;
}

int
clnt_fd_lk_local_mark_error(xlator_t *this, clnt_fd_lk_local_t *local)
{
    int32_t ret = -1;
    clnt_conf_t *conf = NULL;
    gf_boolean_t error = _gf_false;

    GF_VALIDATE_OR_GOTO("client", this, out);
    GF_VALIDATE_OR_GOTO(this->name, local, out);

    conf = (clnt_conf_t *)this->private;

    LOCK(&local->lock);
    {
        error = local->error;
        local->error = _gf_true;
    }
    UNLOCK(&local->lock);

    if (!error)
        clnt_reacquire_lock_error(this, local->fdctx, conf);
    ret = 0;
out:
    return ret;
}

void
client_default_reopen_done(clnt_fd_ctx_t *fdctx, int64_t rfd, xlator_t *this)
{
    gf_log_callingfn(this->name, GF_LOG_WARNING,
                     "This function should never be called");
}

void
client_reopen_done(clnt_fd_ctx_t *fdctx, int64_t rfd, xlator_t *this)
{
    clnt_conf_t *conf = NULL;
    gf_boolean_t destroy = _gf_false;

    conf = this->private;

    pthread_spin_lock(&conf->fd_lock);
    {
        fdctx->remote_fd = rfd;
        fdctx->reopen_attempts = 0;
        fdctx->reopen_done = client_default_reopen_done;
        if (!fdctx->released)
            list_add_tail(&fdctx->sfd_pos, &conf->saved_fds);
        else
            destroy = _gf_true;
    }
    pthread_spin_unlock(&conf->fd_lock);

    if (destroy)
        client_fdctx_destroy(this, fdctx);
}

void
client_child_up_reopen_done(clnt_fd_ctx_t *fdctx, int64_t rfd, xlator_t *this)
{
    clnt_conf_t *conf = NULL;
    uint64_t fd_count = 0;

    conf = this->private;

    LOCK(&conf->rec_lock);
    {
        fd_count = --(conf->reopen_fd_count);
    }
    UNLOCK(&conf->rec_lock);

    client_reopen_done(fdctx, rfd, this);
    if (fd_count == 0) {
        gf_msg(this->name, GF_LOG_INFO, 0, PC_MSG_CHILD_UP_NOTIFY,
               "last fd open'd/lock-self-heal'd - notifying CHILD-UP");
        client_notify_parents_child_up(this);
    }
}

int
client3_3_reopen_cbk(struct rpc_req *req, struct iovec *iov, int count,
                     void *myframe)
{
    int32_t ret = -1;
    gfs3_open_rsp rsp = {
        0,
    };
    clnt_local_t *local = NULL;
    clnt_fd_ctx_t *fdctx = NULL;
    call_frame_t *frame = NULL;
    xlator_t *this = NULL;

    frame = myframe;
    this = frame->this;
    local = frame->local;
    fdctx = local->fdctx;

    if (-1 == req->rpc_status) {
        gf_msg(frame->this->name, GF_LOG_WARNING, ENOTCONN,
               PC_MSG_RPC_STATUS_ERROR,
               "received RPC status error, "
               "returning ENOTCONN");
        rsp.op_ret = -1;
        rsp.op_errno = ENOTCONN;
        goto out;
    }

    ret = xdr_to_generic(*iov, &rsp, (xdrproc_t)xdr_gfs3_open_rsp);
    if (ret < 0) {
        gf_msg(frame->this->name, GF_LOG_ERROR, EINVAL,
               PC_MSG_XDR_DECODING_FAILED, "XDR decoding failed");
        rsp.op_ret = -1;
        rsp.op_errno = EINVAL;
        goto out;
    }

    if (rsp.op_ret < 0) {
        gf_msg(frame->this->name, GF_LOG_WARNING, rsp.op_errno,
               PC_MSG_DIR_OP_SUCCESS, "reopen on %s failed.", local->loc.path);
    } else {
        gf_msg_debug(frame->this->name, 0,
                     "reopen on %s succeeded (remote-fd = %" PRId64 ")",
                     local->loc.path, rsp.fd);
    }

    if (rsp.op_ret == -1) {
        ret = -1;
        goto out;
    }

    ret = 0;

out:
    fdctx->reopen_done(fdctx, (rsp.op_ret) ? -1 : rsp.fd, this);

    frame->local = NULL;
    STACK_DESTROY(frame->root);

    client_local_wipe(local);

    return 0;
}

int
client3_3_reopendir_cbk(struct rpc_req *req, struct iovec *iov, int count,
                        void *myframe)
{
    int32_t ret = -1;
    gfs3_open_rsp rsp = {
        0,
    };
    clnt_local_t *local = NULL;
    clnt_fd_ctx_t *fdctx = NULL;
    call_frame_t *frame = NULL;

    frame = myframe;
    local = frame->local;
    fdctx = local->fdctx;

    if (-1 == req->rpc_status) {
        gf_msg(frame->this->name, GF_LOG_WARNING, ENOTCONN,
               PC_MSG_RPC_STATUS_ERROR,
               "received RPC status error, "
               "returning ENOTCONN");
        rsp.op_ret = -1;
        rsp.op_errno = ENOTCONN;
        goto out;
    }

    ret = xdr_to_generic(*iov, &rsp, (xdrproc_t)xdr_gfs3_opendir_rsp);
    if (ret < 0) {
        gf_msg(frame->this->name, GF_LOG_ERROR, EINVAL,
               PC_MSG_XDR_DECODING_FAILED, "XDR decoding failed");
        rsp.op_ret = -1;
        rsp.op_errno = EINVAL;
        goto out;
    }

    if (rsp.op_ret < 0) {
        gf_msg(frame->this->name, GF_LOG_WARNING, rsp.op_errno,
               PC_MSG_DIR_OP_FAILED, "reopendir on %s failed", local->loc.path);
    } else {
        gf_msg(frame->this->name, GF_LOG_INFO, 0, PC_MSG_DIR_OP_SUCCESS,
               "reopendir on %s succeeded "
               "(fd = %" PRId64 ")",
               local->loc.path, rsp.fd);
    }

    if (-1 == rsp.op_ret) {
        ret = -1;
        goto out;
    }

out:
    fdctx->reopen_done(fdctx, (rsp.op_ret) ? -1 : rsp.fd, frame->this);

    frame->local = NULL;
    STACK_DESTROY(frame->root);
    client_local_wipe(local);

    return 0;
}

static int
protocol_client_reopendir(clnt_fd_ctx_t *fdctx, xlator_t *this)
{
    int ret = -1;
    gfs3_opendir_req req = {
        {
            0,
        },
    };
    clnt_local_t *local = NULL;
    call_frame_t *frame = NULL;
    clnt_conf_t *conf = NULL;

    conf = this->private;

    local = mem_get0(this->local_pool);
    if (!local) {
        ret = -1;
        goto out;
    }
    local->fdctx = fdctx;

    gf_uuid_copy(local->loc.gfid, fdctx->gfid);
    ret = loc_path(&local->loc, NULL);
    if (ret < 0)
        goto out;

    frame = create_frame(this, this->ctx->pool);
    if (!frame) {
        ret = -1;
        goto out;
    }

    memcpy(req.gfid, fdctx->gfid, 16);

    gf_msg_debug(frame->this->name, 0, "attempting reopen on %s",
                 local->loc.path);

    frame->local = local;

    ret = client_submit_request(this, &req, frame, conf->fops, GFS3_OP_OPENDIR,
                                client3_3_reopendir_cbk, NULL, NULL, 0, NULL, 0,
                                NULL, (xdrproc_t)xdr_gfs3_opendir_req);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, PC_MSG_DIR_OP_FAILED,
               "failed to send the re-opendir request");
    }

    return 0;

out:
    if (local)
        client_local_wipe(local);

    fdctx->reopen_done(fdctx, fdctx->remote_fd, this);

    return 0;
}

static int
protocol_client_reopenfile(clnt_fd_ctx_t *fdctx, xlator_t *this)
{
    int ret = -1;
    gfs3_open_req req = {
        {
            0,
        },
    };
    clnt_local_t *local = NULL;
    call_frame_t *frame = NULL;
    clnt_conf_t *conf = NULL;

    conf = this->private;

    frame = create_frame(this, this->ctx->pool);
    if (!frame) {
        ret = -1;
        goto out;
    }

    local = mem_get0(this->local_pool);
    if (!local) {
        ret = -1;
        goto out;
    }

    local->fdctx = fdctx;
    gf_uuid_copy(local->loc.gfid, fdctx->gfid);
    ret = loc_path(&local->loc, NULL);
    if (ret < 0)
        goto out;

    frame->local = local;

    memcpy(req.gfid, fdctx->gfid, 16);
    req.flags = gf_flags_from_flags(fdctx->flags);
    req.flags = req.flags & (~(O_TRUNC | O_CREAT | O_EXCL));

    gf_msg_debug(frame->this->name, 0, "attempting reopen on %s",
                 local->loc.path);

    ret = client_submit_request(this, &req, frame, conf->fops, GFS3_OP_OPEN,
                                client3_3_reopen_cbk, NULL, NULL, 0, NULL, 0,
                                NULL, (xdrproc_t)xdr_gfs3_open_req);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, PC_MSG_DIR_OP_FAILED,
               "failed to send the re-open request");
    }

    return 0;

out:
    if (frame) {
        frame->local = NULL;
        STACK_DESTROY(frame->root);
    }

    if (local)
        client_local_wipe(local);

    fdctx->reopen_done(fdctx, fdctx->remote_fd, this);

    return 0;
}

static void
protocol_client_reopen(clnt_fd_ctx_t *fdctx, xlator_t *this)
{
    if (fdctx->is_dir)
        protocol_client_reopendir(fdctx, this);
    else
        protocol_client_reopenfile(fdctx, this);
}

/* v4.x +  */
int
client4_0_reopen_cbk(struct rpc_req *req, struct iovec *iov, int count,
                     void *myframe)
{
    int32_t ret = -1;
    gfx_open_rsp rsp = {
        0,
    };
    clnt_local_t *local = NULL;
    clnt_fd_ctx_t *fdctx = NULL;
    call_frame_t *frame = NULL;
    xlator_t *this = NULL;

    frame = myframe;
    this = frame->this;
    local = frame->local;
    fdctx = local->fdctx;

    if (-1 == req->rpc_status) {
        gf_msg(frame->this->name, GF_LOG_WARNING, ENOTCONN,
               PC_MSG_RPC_STATUS_ERROR,
               "received RPC status error, "
               "returning ENOTCONN");
        rsp.op_ret = -1;
        rsp.op_errno = ENOTCONN;
        goto out;
    }

    ret = xdr_to_generic(*iov, &rsp, (xdrproc_t)xdr_gfx_open_rsp);
    if (ret < 0) {
        gf_msg(frame->this->name, GF_LOG_ERROR, EINVAL,
               PC_MSG_XDR_DECODING_FAILED, "XDR decoding failed");
        rsp.op_ret = -1;
        rsp.op_errno = EINVAL;
        goto out;
    }

    if (rsp.op_ret < 0) {
        gf_msg(frame->this->name, GF_LOG_WARNING, rsp.op_errno,
               PC_MSG_DIR_OP_SUCCESS, "reopen on %s failed.", local->loc.path);
    } else {
        gf_msg_debug(frame->this->name, 0,
                     "reopen on %s succeeded (remote-fd = %" PRId64 ")",
                     local->loc.path, rsp.fd);
    }

    if (rsp.op_ret == -1) {
        ret = -1;
        goto out;
    }

    ret = 0;

out:
    fdctx->reopen_done(fdctx, (rsp.op_ret) ? -1 : rsp.fd, this);

    frame->local = NULL;
    STACK_DESTROY(frame->root);

    client_local_wipe(local);

    return 0;
}

int
client4_0_reopendir_cbk(struct rpc_req *req, struct iovec *iov, int count,
                        void *myframe)
{
    int32_t ret = -1;
    gfx_open_rsp rsp = {
        0,
    };
    clnt_local_t *local = NULL;
    clnt_fd_ctx_t *fdctx = NULL;
    call_frame_t *frame = NULL;

    frame = myframe;
    local = frame->local;
    fdctx = local->fdctx;

    if (-1 == req->rpc_status) {
        gf_msg(frame->this->name, GF_LOG_WARNING, ENOTCONN,
               PC_MSG_RPC_STATUS_ERROR,
               "received RPC status error, "
               "returning ENOTCONN");
        rsp.op_ret = -1;
        rsp.op_errno = ENOTCONN;
        goto out;
    }

    ret = xdr_to_generic(*iov, &rsp, (xdrproc_t)xdr_gfx_open_rsp);
    if (ret < 0) {
        gf_msg(frame->this->name, GF_LOG_ERROR, EINVAL,
               PC_MSG_XDR_DECODING_FAILED, "XDR decoding failed");
        rsp.op_ret = -1;
        rsp.op_errno = EINVAL;
        goto out;
    }

    if (rsp.op_ret < 0) {
        gf_msg(frame->this->name, GF_LOG_WARNING, rsp.op_errno,
               PC_MSG_DIR_OP_FAILED, "reopendir on %s failed", local->loc.path);
    } else {
        gf_msg(frame->this->name, GF_LOG_INFO, 0, PC_MSG_DIR_OP_SUCCESS,
               "reopendir on %s succeeded "
               "(fd = %" PRId64 ")",
               local->loc.path, rsp.fd);
    }

    if (-1 == rsp.op_ret) {
        ret = -1;
        goto out;
    }

out:
    fdctx->reopen_done(fdctx, (rsp.op_ret) ? -1 : rsp.fd, frame->this);

    frame->local = NULL;
    STACK_DESTROY(frame->root);
    client_local_wipe(local);

    return 0;
}

static int
protocol_client_reopendir_v2(clnt_fd_ctx_t *fdctx, xlator_t *this)
{
    int ret = -1;
    gfx_opendir_req req = {
        {
            0,
        },
    };
    clnt_local_t *local = NULL;
    call_frame_t *frame = NULL;
    clnt_conf_t *conf = NULL;

    conf = this->private;

    local = mem_get0(this->local_pool);
    if (!local) {
        ret = -1;
        goto out;
    }
    local->fdctx = fdctx;

    gf_uuid_copy(local->loc.gfid, fdctx->gfid);
    ret = loc_path(&local->loc, NULL);
    if (ret < 0)
        goto out;

    frame = create_frame(this, this->ctx->pool);
    if (!frame) {
        ret = -1;
        goto out;
    }

    memcpy(req.gfid, fdctx->gfid, 16);

    gf_msg_debug(frame->this->name, 0, "attempting reopen on %s",
                 local->loc.path);

    frame->local = local;

    ret = client_submit_request(this, &req, frame, conf->fops, GFS3_OP_OPENDIR,
                                client4_0_reopendir_cbk, NULL, NULL, 0, NULL, 0,
                                NULL, (xdrproc_t)xdr_gfx_opendir_req);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, PC_MSG_DIR_OP_FAILED,
               "failed to send the re-opendir request");
    }

    return 0;

out:
    if (local)
        client_local_wipe(local);

    fdctx->reopen_done(fdctx, fdctx->remote_fd, this);

    return 0;
}

static int
protocol_client_reopenfile_v2(clnt_fd_ctx_t *fdctx, xlator_t *this)
{
    int ret = -1;
    gfx_open_req req = {
        {
            0,
        },
    };
    clnt_local_t *local = NULL;
    call_frame_t *frame = NULL;
    clnt_conf_t *conf = NULL;

    conf = this->private;

    frame = create_frame(this, this->ctx->pool);
    if (!frame) {
        ret = -1;
        goto out;
    }

    local = mem_get0(this->local_pool);
    if (!local) {
        ret = -1;
        goto out;
    }

    local->fdctx = fdctx;
    gf_uuid_copy(local->loc.gfid, fdctx->gfid);
    ret = loc_path(&local->loc, NULL);
    if (ret < 0)
        goto out;

    frame->local = local;

    memcpy(req.gfid, fdctx->gfid, 16);
    req.flags = gf_flags_from_flags(fdctx->flags);
    req.flags = req.flags & (~(O_TRUNC | O_CREAT | O_EXCL));

    gf_msg_debug(frame->this->name, 0, "attempting reopen on %s",
                 local->loc.path);

    ret = client_submit_request(this, &req, frame, conf->fops, GFS3_OP_OPEN,
                                client4_0_reopen_cbk, NULL, NULL, 0, NULL, 0,
                                NULL, (xdrproc_t)xdr_gfx_open_req);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, PC_MSG_DIR_OP_FAILED,
               "failed to send the re-open request");
    }

    return 0;

out:
    if (frame) {
        frame->local = NULL;
        STACK_DESTROY(frame->root);
    }

    if (local)
        client_local_wipe(local);

    fdctx->reopen_done(fdctx, fdctx->remote_fd, this);

    return 0;
}

static void
protocol_client_reopen_v2(clnt_fd_ctx_t *fdctx, xlator_t *this)
{
    if (fdctx->is_dir)
        protocol_client_reopendir_v2(fdctx, this);
    else
        protocol_client_reopenfile_v2(fdctx, this);
}

gf_boolean_t
__is_fd_reopen_in_progress(clnt_fd_ctx_t *fdctx)
{
    if (fdctx->reopen_done == client_default_reopen_done)
        return _gf_false;
    return _gf_true;
}

void
client_attempt_reopen(fd_t *fd, xlator_t *this)
{
    clnt_conf_t *conf = NULL;
    clnt_fd_ctx_t *fdctx = NULL;
    gf_boolean_t reopen = _gf_false;

    if (!fd || !this)
        goto out;

    conf = this->private;

    pthread_spin_lock(&conf->fd_lock);
    {
        fdctx = this_fd_get_ctx(fd, this);
        if (!fdctx) {
            pthread_spin_unlock(&conf->fd_lock);
            goto out;
        }

        if (__is_fd_reopen_in_progress(fdctx))
            goto unlock;
        if (fdctx->remote_fd != -1)
            goto unlock;

        if (fdctx->reopen_attempts == CLIENT_REOPEN_MAX_ATTEMPTS) {
            reopen = _gf_true;
            fdctx->reopen_done = client_reopen_done;
            list_del_init(&fdctx->sfd_pos);
        } else {
            fdctx->reopen_attempts++;
        }
    }
unlock:
    pthread_spin_unlock(&conf->fd_lock);
    if (reopen) {
        if (conf->fops->progver == GLUSTER_FOP_VERSION_v2)
            protocol_client_reopen_v2(fdctx, this);
        else
            protocol_client_reopen(fdctx, this);
    }
out:
    return;
}

int
client_post_handshake(call_frame_t *frame, xlator_t *this)
{
    clnt_conf_t *conf = NULL;
    clnt_fd_ctx_t *tmp = NULL;
    clnt_fd_ctx_t *fdctx = NULL;
    struct list_head reopen_head;

    int count = 0;

    if (!this || !this->private)
        goto out;

    conf = this->private;
    INIT_LIST_HEAD(&reopen_head);

    pthread_spin_lock(&conf->fd_lock);
    {
        list_for_each_entry_safe(fdctx, tmp, &conf->saved_fds, sfd_pos)
        {
            if (fdctx->remote_fd != -1)
                continue;

            fdctx->reopen_done = client_child_up_reopen_done;
            list_del_init(&fdctx->sfd_pos);
            list_add_tail(&fdctx->sfd_pos, &reopen_head);
            count++;
        }
    }
    pthread_spin_unlock(&conf->fd_lock);

    /* Delay notifying CHILD_UP to parents
       until all locks are recovered */
    if (count > 0) {
        gf_msg(this->name, GF_LOG_INFO, 0, PC_MSG_CHILD_UP_NOTIFY_DELAY,
               "%d fds open - Delaying "
               "child_up until they are re-opened",
               count);
        client_save_number_fds(conf, count);

        list_for_each_entry_safe(fdctx, tmp, &reopen_head, sfd_pos)
        {
            list_del_init(&fdctx->sfd_pos);

            if (conf->fops->progver == GLUSTER_FOP_VERSION_v2)
                protocol_client_reopen_v2(fdctx, this);
            else
                protocol_client_reopen(fdctx, this);
        }
    } else {
        gf_msg_debug(this->name, 0,
                     "No fds to open - notifying all parents child "
                     "up");
        client_notify_parents_child_up(this);
    }
out:
    return 0;
}

int
client_setvolume_cbk(struct rpc_req *req, struct iovec *iov, int count,
                     void *myframe)
{
    call_frame_t *frame = NULL;
    clnt_conf_t *conf = NULL;
    xlator_t *this = NULL;
    dict_t *reply = NULL;
    char *process_uuid = NULL;
    char *remote_error = NULL;
    char *remote_subvol = NULL;
    gf_setvolume_rsp rsp = {
        0,
    };
    int ret = 0;
    int32_t op_ret = 0;
    int32_t op_errno = 0;
    gf_boolean_t auth_fail = _gf_false;
    glusterfs_ctx_t *ctx = NULL;

    frame = myframe;
    this = frame->this;
    conf = this->private;
    GF_VALIDATE_OR_GOTO(this->name, conf, out);
    ctx = this->ctx;
    GF_VALIDATE_OR_GOTO(this->name, ctx, out);

    if (-1 == req->rpc_status) {
        gf_msg(frame->this->name, GF_LOG_WARNING, ENOTCONN,
               PC_MSG_RPC_STATUS_ERROR, "received RPC status error");
        op_ret = -1;
        goto out;
    }

    ret = xdr_to_generic(*iov, &rsp, (xdrproc_t)xdr_gf_setvolume_rsp);
    if (ret < 0) {
        gf_msg(this->name, GF_LOG_ERROR, EINVAL, PC_MSG_XDR_DECODING_FAILED,
               "XDR decoding failed");
        op_ret = -1;
        goto out;
    }
    op_ret = rsp.op_ret;
    op_errno = gf_error_to_errno(rsp.op_errno);
    if (-1 == rsp.op_ret) {
        gf_msg(frame->this->name, GF_LOG_WARNING, op_errno, PC_MSG_VOL_SET_FAIL,
               "failed to set the volume");
    }

    reply = dict_new();
    if (!reply)
        goto out;

    if (rsp.dict.dict_len) {
        ret = dict_unserialize(rsp.dict.dict_val, rsp.dict.dict_len, &reply);
        if (ret < 0) {
            gf_msg(frame->this->name, GF_LOG_WARNING, 0,
                   PC_MSG_DICT_UNSERIALIZE_FAIL,
                   "failed to "
                   "unserialize buffer to dict");
            goto out;
        }
    }

    ret = dict_get_str(reply, "ERROR", &remote_error);
    if (ret < 0) {
        gf_msg(this->name, GF_LOG_WARNING, EINVAL, PC_MSG_DICT_GET_FAILED,
               "failed to get ERROR "
               "string from reply dict");
    }

    ret = dict_get_str(reply, "process-uuid", &process_uuid);
    if (ret < 0) {
        gf_msg(this->name, GF_LOG_WARNING, EINVAL, PC_MSG_DICT_GET_FAILED,
               "failed to get "
               "'process-uuid' from reply dict");
    }

    if (op_ret < 0) {
        gf_msg(this->name, GF_LOG_ERROR, op_errno, PC_MSG_SETVOLUME_FAIL,
               "SETVOLUME on remote-host failed: %s", remote_error);

        errno = op_errno;
        if (remote_error &&
            (strcmp("Authentication failed", remote_error) == 0)) {
            auth_fail = _gf_true;
            op_ret = 0;
        }
        if ((op_errno == ENOENT) && this->ctx->cmd_args.subdir_mount &&
            (ctx->graph_id <= 1)) {
            /* A case of subdir not being present at the moment,
               ride on auth_fail framework to notify the error */
            /* Make sure this case is handled only in the new
               graph, so mount may fail in this case. In case
               of 'add-brick' etc, we need to continue retry */
            auth_fail = _gf_true;
            op_ret = 0;
        }
        if (op_errno == ESTALE) {
            ret = client_notify_dispatch(this, GF_EVENT_VOLFILE_MODIFIED, NULL);
            if (ret)
                gf_msg(this->name, GF_LOG_INFO, 0, PC_MSG_VOLFILE_NOTIFY_FAILED,
                       "notify of VOLFILE_MODIFIED failed");
        }
        goto out;
    }

    ret = dict_get_str(this->options, "remote-subvolume", &remote_subvol);
    if (ret || !remote_subvol) {
        gf_msg(this->name, GF_LOG_WARNING, 0, PC_MSG_DICT_GET_FAILED,
               "failed to find key 'remote-subvolume' in the options");
        goto out;
    }

    uint32_t child_up_int;
    ret = dict_get_uint32(reply, "child_up", &child_up_int);
    if (ret) {
        /*
         * This would happen in cases where the server trying to     *
         * connect to this client is running an older version. Hence *
         * setting the child_up to _gf_true in this case.            *
         */
        gf_msg(this->name, GF_LOG_WARNING, 0, PC_MSG_DICT_GET_FAILED,
               "failed to find key 'child_up' in the options");
        conf->child_up = _gf_true;
    } else {
        conf->child_up = (child_up_int != 0);
    }

    /* TODO: currently setpeer path is broken */
    /*
    if (process_uuid && req->conn &&
        !strcmp (this->ctx->process_uuid, process_uuid)) {
            rpc_transport_t      *peer_trans    = NULL;
            uint64_t              peertrans_int = 0;

            ret = dict_get_uint64 (reply, "transport-ptr",
                                   &peertrans_int);
            if (ret)
                    goto out;

            gf_log (this->name, GF_LOG_WARNING,
                    "attaching to the local volume '%s'",
                    remote_subvol);

            peer_trans = (void *) (long) (peertrans_int);

            rpc_transport_setpeer (req->conn->trans, peer_trans);
    }
    */

    conf->client_id = glusterfs_leaf_position(this);

    gf_msg(this->name, GF_LOG_INFO, 0, PC_MSG_REMOTE_VOL_CONNECTED,
           "Connected to %s, attached to remote volume '%s'.",
           conf->rpc->conn.name, remote_subvol);

    op_ret = 0;
    conf->connected = 1;

    client_post_handshake(frame, frame->this);
out:
    if (auth_fail) {
        gf_msg(this->name, GF_LOG_INFO, 0, PC_MSG_AUTH_FAILED,
               "sending AUTH_FAILED event");
        ret = client_notify_dispatch(this, GF_EVENT_AUTH_FAILED, NULL);
        if (ret)
            gf_msg(this->name, GF_LOG_INFO, 0, PC_MSG_AUTH_FAILED_NOTIFY_FAILED,
                   "notify of "
                   "AUTH_FAILED failed");
        conf->connected = 0;
        ret = -1;
    }
    if (-1 == op_ret) {
        /* Let the connection/re-connection happen in
         * background, for now, don't hang here,
         * tell the parents that i am all ok..
         */
        gf_msg(this->name, GF_LOG_INFO, 0, PC_MSG_CHILD_CONNECTING_EVENT,
               "sending "
               "CHILD_CONNECTING event");
        ret = client_notify_dispatch(this, GF_EVENT_CHILD_CONNECTING, NULL);
        if (ret)
            gf_msg(this->name, GF_LOG_INFO, 0,
                   PC_MSG_CHILD_CONNECTING_NOTIFY_FAILED,
                   "notify of CHILD_CONNECTING failed");
        /*
         * The reconnection *won't* happen in the background (see
         * previous comment) unless we kill the current connection.
         */
        rpc_transport_disconnect(conf->rpc->conn.trans, _gf_false);
        ret = 0;
    }

    free(rsp.dict.dict_val);

    STACK_DESTROY(frame->root);

    if (reply)
        dict_unref(reply);

    return ret;
}

int
client_setvolume(xlator_t *this, struct rpc_clnt *rpc)
{
    int ret = 0;
    gf_setvolume_req req = {
        {
            0,
        },
    };
    call_frame_t *fr = NULL;
    char *process_uuid_xl = NULL;
    clnt_conf_t *conf = NULL;
    dict_t *options = NULL;
    char counter_str[32] = {0};
    char hostname[256] = {
        0,
    };

    options = this->options;
    conf = this->private;

    if (conf->fops) {
        ret = dict_set_int32(options, "fops-version", conf->fops->prognum);
        if (ret < 0) {
            gf_msg(this->name, GF_LOG_ERROR, 0, PC_MSG_DICT_SET_FAILED,
                   "failed to set "
                   "version-fops(%d) in handshake msg",
                   conf->fops->prognum);
            goto fail;
        }
    }

    if (conf->mgmt) {
        ret = dict_set_int32(options, "mgmt-version", conf->mgmt->prognum);
        if (ret < 0) {
            gf_msg(this->name, GF_LOG_ERROR, 0, PC_MSG_DICT_SET_FAILED,
                   "failed to set "
                   "version-mgmt(%d) in handshake msg",
                   conf->mgmt->prognum);
            goto fail;
        }
    }

    /*
     * Connection-id should always be unique so that server never gets to
     * reuse the previous connection resources so it cleans up the resources
     * on every disconnect. Otherwise it may lead to stale resources, i.e.
     * leaked file descriptors, inode/entry locks
     */

    snprintf(counter_str, sizeof(counter_str), "-%" PRIu64, conf->setvol_count);
    conf->setvol_count++;

    if (gethostname(hostname, 256) == -1) {
        gf_msg(this->name, GF_LOG_ERROR, errno, LG_MSG_GETHOSTNAME_FAILED,
               "gethostname: failed");

        goto fail;
    }

    ret = gf_asprintf(&process_uuid_xl, GLUSTER_PROCESS_UUID_FMT,
                      this->ctx->process_uuid, this->graph->id, getpid(),
                      hostname, this->name, counter_str);
    if (-1 == ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, PC_MSG_PROCESS_UUID_SET_FAIL,
               "asprintf failed while "
               "setting process_uuid");
        goto fail;
    }

    ret = dict_set_dynstr(options, "process-uuid", process_uuid_xl);
    if (ret < 0) {
        gf_msg(this->name, GF_LOG_ERROR, 0, PC_MSG_DICT_SET_FAILED,
               "failed to set process-uuid(%s) in handshake msg",
               process_uuid_xl);
        goto fail;
    }

    ret = dict_set_str(options, "process-name",
                       this->ctx->cmd_args.process_name);
    if (ret < 0) {
        gf_msg(this->name, GF_LOG_INFO, 0, PC_MSG_DICT_SET_FAILED,
               "failed to set process-name in handshake msg");
    }

    ret = dict_set_str(options, "client-version", PACKAGE_VERSION);
    if (ret < 0) {
        gf_msg(this->name, GF_LOG_WARNING, 0, PC_MSG_DICT_SET_FAILED,
               "failed to set client-version(%s) in handshake msg",
               PACKAGE_VERSION);
    }

    if (this->ctx->cmd_args.volfile_server) {
        if (this->ctx->cmd_args.volfile_id) {
            ret = dict_set_str(options, "volfile-key",
                               this->ctx->cmd_args.volfile_id);
            if (ret)
                gf_msg(this->name, GF_LOG_ERROR, 0, PC_MSG_DICT_SET_FAILED,
                       "failed to "
                       "set 'volfile-key'");
        }
        ret = dict_set_uint32(options, "volfile-checksum",
                              this->graph->volfile_checksum);
        if (ret)
            gf_msg(this->name, GF_LOG_ERROR, 0, PC_MSG_DICT_SET_FAILED,
                   "failed to set "
                   "'volfile-checksum'");
    }

    if (this->ctx->cmd_args.subdir_mount) {
        ret = dict_set_str(options, "subdir-mount",
                           this->ctx->cmd_args.subdir_mount);
        if (ret) {
            gf_log(THIS->name, GF_LOG_ERROR, "Failed to set subdir_mount");
            /* It makes sense to fail, as per the CLI, we
               should be doing a subdir_mount */
            goto fail;
        }
    }

    /* Insert a dummy key value pair to avoid failure at server side for
     * clnt-lk-version with new clients.
     */
    ret = dict_set_uint32(options, "clnt-lk-version", 1);
    if (ret < 0) {
        gf_msg(this->name, GF_LOG_WARNING, 0, PC_MSG_DICT_SET_FAILED,
               "failed to set clnt-lk-version(1) in handshake msg");
    }

    ret = dict_set_int32(options, "opversion", GD_OP_VERSION_MAX);
    if (ret < 0) {
        gf_msg(this->name, GF_LOG_ERROR, 0, PC_MSG_DICT_SET_FAILED,
               "Failed to set client opversion in handshake message");
    }

    ret = dict_serialized_length(options);
    if (ret < 0) {
        gf_msg(this->name, GF_LOG_ERROR, 0, PC_MSG_DICT_ERROR,
               "failed to get serialized length of dict");
        ret = -1;
        goto fail;
    }
    req.dict.dict_len = ret;
    req.dict.dict_val = GF_CALLOC(1, req.dict.dict_len,
                                  gf_client_mt_clnt_req_buf_t);
    ret = dict_serialize(options, req.dict.dict_val);
    if (ret < 0) {
        gf_msg(this->name, GF_LOG_ERROR, 0, PC_MSG_DICT_SERIALIZE_FAIL,
               "failed to serialize "
               "dictionary");
        goto fail;
    }

    fr = create_frame(this, this->ctx->pool);
    if (!fr)
        goto fail;

    ret = client_submit_request(this, &req, fr, conf->handshake,
                                GF_HNDSK_SETVOLUME, client_setvolume_cbk, NULL,
                                NULL, 0, NULL, 0, NULL,
                                (xdrproc_t)xdr_gf_setvolume_req);

fail:
    GF_FREE(req.dict.dict_val);

    return ret;
}

int
select_server_supported_programs(xlator_t *this, gf_prog_detail *prog)
{
    gf_prog_detail *trav = NULL;
    clnt_conf_t *conf = NULL;
    int ret = -1;

    if (!this || !prog) {
        gf_msg(THIS->name, GF_LOG_WARNING, 0, PC_MSG_PGM_NOT_FOUND,
               "xlator not found OR RPC program not found");
        goto out;
    }

    conf = this->private;
    trav = prog;

    while (trav) {
        /* Select 'programs' */
        if ((clnt3_3_fop_prog.prognum == trav->prognum) &&
            (clnt3_3_fop_prog.progver == trav->progver) && !conf->fops) {
            conf->fops = &clnt3_3_fop_prog;
            if (conf->rpc)
                conf->rpc->auth_value = AUTH_GLUSTERFS_v2;
            ret = 0;
        }

        if ((clnt4_0_fop_prog.prognum == trav->prognum) &&
            (clnt4_0_fop_prog.progver == trav->progver)) {
            conf->fops = &clnt4_0_fop_prog;
            if (conf->rpc)
                conf->rpc->auth_value = AUTH_GLUSTERFS_v3;
            ret = 0;
            /* this is latest program, lets use it */
            goto out;
        }

        if (ret) {
            gf_msg_debug(this->name, 0, "%s (%" PRId64 ") not supported",
                         trav->progname, trav->progver);
        }
        trav = trav->next;
    }

    if (!ret)
        gf_msg(this->name, GF_LOG_INFO, 0, PC_MSG_VERSION_INFO,
               "Using Program %s,"
               " Num (%d), Version (%d)",
               conf->fops->progname, conf->fops->prognum, conf->fops->progver);

out:
    return ret;
}

int
server_has_portmap(xlator_t *this, gf_prog_detail *prog)
{
    gf_prog_detail *trav = NULL;
    int ret = -1;

    if (!this || !prog) {
        gf_msg(THIS->name, GF_LOG_WARNING, 0, PC_MSG_PGM_NOT_FOUND,
               "xlator not found OR RPC program not found");
        goto out;
    }

    trav = prog;

    while (trav) {
        if ((trav->prognum == GLUSTER_PMAP_PROGRAM) &&
            (trav->progver == GLUSTER_PMAP_VERSION)) {
            gf_msg_debug(this->name, 0, "detected portmapper on server");
            ret = 0;
            break;
        }
        trav = trav->next;
    }

out:
    return ret;
}

int
client_query_portmap_cbk(struct rpc_req *req, struct iovec *iov, int count,
                         void *myframe)
{
    struct pmap_port_by_brick_rsp rsp = {
        0,
    };
    call_frame_t *frame = NULL;
    clnt_conf_t *conf = NULL;
    int ret = -1;
    struct rpc_clnt_config config = {
        0,
    };
    xlator_t *this = NULL;

    frame = myframe;
    if (!frame || !frame->this || !frame->this->private) {
        gf_msg(THIS->name, GF_LOG_WARNING, EINVAL, PC_MSG_INVALID_ENTRY,
               "frame not found with rpc "
               "request");
        goto out;
    }
    this = frame->this;
    conf = frame->this->private;

    if (-1 == req->rpc_status) {
        gf_msg(this->name, GF_LOG_WARNING, ENOTCONN, PC_MSG_RPC_STATUS_ERROR,
               "received RPC status error, "
               "try again later");
        goto out;
    }

    ret = xdr_to_generic(*iov, &rsp, (xdrproc_t)xdr_pmap_port_by_brick_rsp);
    if (ret < 0) {
        gf_msg(this->name, GF_LOG_ERROR, EINVAL, PC_MSG_XDR_DECODING_FAILED,
               "XDR decoding failed");
        goto out;
    }

    if (-1 == rsp.op_ret) {
        ret = -1;
        if (!conf->portmap_err_logged) {
            gf_msg(this->name, GF_LOG_ERROR, 0, PC_MSG_PORT_NUM_ERROR,
                   "failed to get the "
                   "port number for remote subvolume. Please run "
                   "'gluster volume status' on server to see if "
                   "brick process is running.");
        } else {
            gf_msg_debug(this->name, 0,
                         "failed to get the port number for "
                         "remote subvolume. Please run 'gluster "
                         "volume status' on server to see "
                         "if brick process is running.");
        }
        conf->portmap_err_logged = 1;
        goto out;
    }

    conf->portmap_err_logged = 0;
    conf->disconnect_err_logged = 0;
    config.remote_port = rsp.port;
    rpc_clnt_reconfig(conf->rpc, &config);

    conf->skip_notify = 1;
    conf->quick_reconnect = 1;

out:
    if (frame)
        STACK_DESTROY(frame->root);

    if (conf) {
        /* Need this to connect the same transport on different port */
        /* ie, glusterd to glusterfsd */
        rpc_transport_disconnect(conf->rpc->conn.trans, _gf_false);
    }

    return ret;
}

int
client_query_portmap(xlator_t *this, struct rpc_clnt *rpc)
{
    int ret = -1;
    pmap_port_by_brick_req req = {
        0,
    };
    call_frame_t *fr = NULL;
    dict_t *options = NULL;
    char *remote_subvol = NULL;
    char *xprt = NULL;
    char brick_name[PATH_MAX] = {
        0,
    };

    options = this->options;

    ret = dict_get_str(options, "remote-subvolume", &remote_subvol);
    if (ret < 0) {
        gf_msg(this->name, GF_LOG_ERROR, 0, PC_MSG_VOL_SET_FAIL,
               "remote-subvolume not set in volfile");
        goto fail;
    }

    req.brick = remote_subvol;

    if (!dict_get_str(options, "transport-type", &xprt)) {
        if (!strcmp(xprt, "rdma")) {
            snprintf(brick_name, sizeof(brick_name), "%s.rdma", remote_subvol);
            req.brick = brick_name;
        }
    }

    fr = create_frame(this, this->ctx->pool);
    if (!fr) {
        ret = -1;
        goto fail;
    }

    ret = client_submit_request(this, &req, fr, &clnt_pmap_prog,
                                GF_PMAP_PORTBYBRICK, client_query_portmap_cbk,
                                NULL, NULL, 0, NULL, 0, NULL,
                                (xdrproc_t)xdr_pmap_port_by_brick_req);

fail:
    return ret;
}

int
client_dump_version_cbk(struct rpc_req *req, struct iovec *iov, int count,
                        void *myframe)
{
    gf_dump_rsp rsp = {
        0,
    };
    gf_prog_detail *trav = NULL;
    gf_prog_detail *next = NULL;
    call_frame_t *frame = NULL;
    clnt_conf_t *conf = NULL;
    int ret = 0;

    frame = myframe;
    conf = frame->this->private;

    if (-1 == req->rpc_status) {
        gf_msg(frame->this->name, GF_LOG_WARNING, ENOTCONN,
               PC_MSG_RPC_STATUS_ERROR, "received RPC status error");
        goto out;
    }

    ret = xdr_to_generic(*iov, &rsp, (xdrproc_t)xdr_gf_dump_rsp);
    if (ret < 0) {
        gf_msg(frame->this->name, GF_LOG_ERROR, EINVAL,
               PC_MSG_XDR_DECODING_FAILED, "XDR decoding failed");
        goto out;
    }
    if (-1 == rsp.op_ret) {
        gf_msg(frame->this->name, GF_LOG_WARNING, 0, PC_MSG_VERSION_ERROR,
               "failed to get the 'versions' "
               "from server");
        goto out;
    }

    if (server_has_portmap(frame->this, rsp.prog) == 0) {
        ret = client_query_portmap(frame->this, conf->rpc);
        goto out;
    }

    /* Check for the proper version string */
    /* Reply in "Name:Program-Number:Program-Version,..." format */
    ret = select_server_supported_programs(frame->this, rsp.prog);
    if (ret) {
        gf_msg(frame->this->name, GF_LOG_ERROR, 0, PC_MSG_VERSION_ERROR,
               "server doesn't support the "
               "version");
        goto out;
    }

    client_setvolume(frame->this, conf->rpc);

out:
    /* don't use GF_FREE, buffer was allocated by libc */
    if (rsp.prog) {
        trav = rsp.prog;
        while (trav) {
            next = trav->next;
            free(trav->progname);
            free(trav);
            trav = next;
        }
    }

    STACK_DESTROY(frame->root);

    if (ret != 0)
        rpc_transport_disconnect(conf->rpc->conn.trans, _gf_false);

    return ret;
}

int
client_handshake(xlator_t *this, struct rpc_clnt *rpc)
{
    call_frame_t *frame = NULL;
    clnt_conf_t *conf = NULL;
    gf_dump_req req = {
        0,
    };
    int ret = 0;

    conf = this->private;
    if (!conf->handshake) {
        gf_msg(this->name, GF_LOG_WARNING, 0, PC_MSG_PGM_NOT_FOUND,
               "handshake program not found");
        goto out;
    }

    frame = create_frame(this, this->ctx->pool);
    if (!frame)
        goto out;

    req.gfs_id = 0xbabe;
    ret = client_submit_request(this, &req, frame, conf->dump, GF_DUMP_DUMP,
                                client_dump_version_cbk, NULL, NULL, 0, NULL, 0,
                                NULL, (xdrproc_t)xdr_gf_dump_req);

out:
    return ret;
}

char *clnt_handshake_procs[GF_HNDSK_MAXVALUE] = {
    [GF_HNDSK_NULL] = "NULL",
    [GF_HNDSK_SETVOLUME] = "SETVOLUME",
    [GF_HNDSK_GETSPEC] = "GETSPEC",
    [GF_HNDSK_PING] = "PING",
};

rpc_clnt_prog_t clnt_handshake_prog = {
    .progname = "GlusterFS Handshake",
    .prognum = GLUSTER_HNDSK_PROGRAM,
    .progver = GLUSTER_HNDSK_VERSION,
    .procnames = clnt_handshake_procs,
};

char *clnt_dump_proc[GF_DUMP_MAXVALUE] = {
    [GF_DUMP_NULL] = "NULL",
    [GF_DUMP_DUMP] = "DUMP",
};

rpc_clnt_prog_t clnt_dump_prog = {
    .progname = "GF-DUMP",
    .prognum = GLUSTER_DUMP_PROGRAM,
    .progver = GLUSTER_DUMP_VERSION,
    .procnames = clnt_dump_proc,
};

char *clnt_pmap_procs[GF_PMAP_MAXVALUE] = {
    [GF_PMAP_PORTBYBRICK] = "PORTBYBRICK",
};

rpc_clnt_prog_t clnt_pmap_prog = {
    .progname = "PORTMAP",
    .prognum = GLUSTER_PMAP_PROGRAM,
    .progver = GLUSTER_PMAP_VERSION,
    .procnames = clnt_pmap_procs,
};