Blob Blame History Raw
/*
   Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
   This file is part of GlusterFS.

   This file is licensed to you under your choice of the GNU Lesser
   General Public License, version 3 or any later version (LGPLv3 or
   later), or the GNU General Public License, version 2 (GPLv2), in all
   cases as published by the Free Software Foundation.
*/

#include "changelog-rpc-common.h"
#include "changelog-messages.h"

#include <glusterfs/syscall.h>
/**
*****************************************************
                  Client Interface
*****************************************************
*/

/**
 * Initialize and return an RPC client object for a given unix
 * domain socket.
 */

void *
changelog_rpc_poller(void *arg)
{
    xlator_t *this = arg;

    (void)event_dispatch(this->ctx->event_pool);
    return NULL;
}

struct rpc_clnt *
changelog_rpc_client_init(xlator_t *this, void *cbkdata, char *sockfile,
                          rpc_clnt_notify_t fn)
{
    int ret = 0;
    struct rpc_clnt *rpc = NULL;
    dict_t *options = NULL;

    if (!cbkdata)
        cbkdata = this;

    options = dict_new();
    if (!options)
        goto error_return;

    ret = rpc_transport_unix_options_build(options, sockfile, 0);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_RPC_BUILD_ERROR,
               "failed to build rpc options");
        goto dealloc_dict;
    }

    rpc = rpc_clnt_new(options, this, this->name, 16);
    if (!rpc)
        goto dealloc_dict;

    ret = rpc_clnt_register_notify(rpc, fn, cbkdata);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0,
               CHANGELOG_MSG_NOTIFY_REGISTER_FAILED,
               "failed to register notify");
        goto dealloc_rpc_clnt;
    }

    ret = rpc_clnt_start(rpc);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_RPC_START_ERROR,
               "failed to start rpc");
        goto dealloc_rpc_clnt;
    }

    dict_unref(options);
    return rpc;

dealloc_rpc_clnt:
    rpc_clnt_unref(rpc);
dealloc_dict:
    dict_unref(options);
error_return:
    return NULL;
}

/**
 * Generic RPC client routine to dispatch a request to an
 * RPC server.
 */
int
changelog_rpc_sumbit_req(struct rpc_clnt *rpc, void *req, call_frame_t *frame,
                         rpc_clnt_prog_t *prog, int procnum,
                         struct iovec *payload, int payloadcnt,
                         struct iobref *iobref, xlator_t *this,
                         fop_cbk_fn_t cbkfn, xdrproc_t xdrproc)
{
    int ret = 0;
    int count = 0;
    struct iovec iov = {
        0,
    };
    struct iobuf *iobuf = NULL;
    char new_iobref = 0;
    ssize_t xdr_size = 0;

    GF_ASSERT(this);

    if (req) {
        xdr_size = xdr_sizeof(xdrproc, req);

        iobuf = iobuf_get2(this->ctx->iobuf_pool, xdr_size);
        if (!iobuf) {
            goto out;
        };

        if (!iobref) {
            iobref = iobref_new();
            if (!iobref) {
                goto out;
            }

            new_iobref = 1;
        }

        iobref_add(iobref, iobuf);

        iov.iov_base = iobuf->ptr;
        iov.iov_len = iobuf_size(iobuf);

        /* Create the xdr payload */
        ret = xdr_serialize_generic(iov, req, xdrproc);
        if (ret == -1) {
            goto out;
        }

        iov.iov_len = ret;
        count = 1;
    }

    ret = rpc_clnt_submit(rpc, prog, procnum, cbkfn, &iov, count, payload,
                          payloadcnt, iobref, frame, NULL, 0, NULL, 0, NULL);

out:
    if (new_iobref)
        iobref_unref(iobref);
    if (iobuf)
        iobuf_unref(iobuf);
    return ret;
}

/**
 * Entry point to perform a remote procedure call
 */
int
changelog_invoke_rpc(xlator_t *this, struct rpc_clnt *rpc,
                     rpc_clnt_prog_t *prog, int procidx, void *arg)
{
    int ret = 0;
    call_frame_t *frame = NULL;
    rpc_clnt_procedure_t *proc = NULL;

    if (!this || !prog)
        goto error_return;

    frame = create_frame(this, this->ctx->pool);
    if (!frame) {
        gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_CREATE_FRAME_FAILED,
               "failed to create frame");
        goto error_return;
    }

    proc = &prog->proctable[procidx];
    if (proc->fn)
        ret = proc->fn(frame, this, arg);

    STACK_DESTROY(frame->root);
    return ret;

error_return:
    return -1;
}

/**
*****************************************************
                  Server Interface
*****************************************************
*/

struct iobuf *
__changelog_rpc_serialize_reply(rpcsvc_request_t *req, void *arg,
                                struct iovec *outmsg, xdrproc_t xdrproc)
{
    struct iobuf *iob = NULL;
    ssize_t retlen = 0;
    ssize_t rsp_size = 0;

    rsp_size = xdr_sizeof(xdrproc, arg);
    iob = iobuf_get2(req->svc->ctx->iobuf_pool, rsp_size);
    if (!iob)
        goto error_return;

    iobuf_to_iovec(iob, outmsg);

    retlen = xdr_serialize_generic(*outmsg, arg, xdrproc);
    if (retlen == -1)
        goto unref_iob;

    outmsg->iov_len = retlen;
    return iob;

unref_iob:
    iobuf_unref(iob);
error_return:
    return NULL;
}

int
changelog_rpc_sumbit_reply(rpcsvc_request_t *req, void *arg,
                           struct iovec *payload, int payloadcount,
                           struct iobref *iobref, xdrproc_t xdrproc)
{
    int ret = -1;
    struct iobuf *iob = NULL;
    struct iovec iov = {
        0,
    };
    char new_iobref = 0;

    if (!req)
        goto return_ret;

    if (!iobref) {
        iobref = iobref_new();
        if (!iobref)
            goto return_ret;
        new_iobref = 1;
    }

    iob = __changelog_rpc_serialize_reply(req, arg, &iov, xdrproc);
    if (!iob)
        gf_msg("", GF_LOG_ERROR, 0, CHANGELOG_MSG_RPC_SUBMIT_REPLY_FAILED,
               "failed to serialize reply");
    else
        iobref_add(iobref, iob);

    ret = rpcsvc_submit_generic(req, &iov, 1, payload, payloadcount, iobref);

    if (new_iobref)
        iobref_unref(iobref);
    if (iob)
        iobuf_unref(iob);
return_ret:
    return ret;
}

void
changelog_rpc_server_destroy(xlator_t *this, rpcsvc_t *rpc, char *sockfile,
                             rpcsvc_notify_t fn, struct rpcsvc_program **progs)
{
    rpcsvc_listener_t *listener = NULL;
    rpcsvc_listener_t *next = NULL;
    struct rpcsvc_program *prog = NULL;
    rpc_transport_t *trans = NULL;

    while (*progs) {
        prog = *progs;
        (void)rpcsvc_program_unregister(rpc, prog);
        progs++;
    }

    list_for_each_entry_safe(listener, next, &rpc->listeners, list)
    {
        if (listener->trans) {
            trans = listener->trans;
            rpc_transport_disconnect(trans, _gf_false);
        }
    }

    (void)rpcsvc_unregister_notify(rpc, fn, this);

    /* TODO Avoid freeing rpc object in case of brick multiplex
       after freeing rpc object svc->rpclock corrupted and it takes
       more time to detach a brick
    */
    if (!this->cleanup_starting) {
        if (rpc->rxpool) {
            mem_pool_destroy(rpc->rxpool);
            rpc->rxpool = NULL;
        }
        GF_FREE(rpc);
    }
}

rpcsvc_t *
changelog_rpc_server_init(xlator_t *this, char *sockfile, void *cbkdata,
                          rpcsvc_notify_t fn, struct rpcsvc_program **progs)
{
    int ret = 0;
    rpcsvc_t *rpc = NULL;
    dict_t *options = NULL;
    struct rpcsvc_program *prog = NULL;

    if (!cbkdata)
        cbkdata = this;

    options = dict_new();
    if (!options)
        return NULL;

    ret = rpcsvc_transport_unix_options_build(options, sockfile);
    if (ret)
        goto dealloc_dict;

    rpc = rpcsvc_init(this, this->ctx, options, 8);
    if (rpc == NULL) {
        gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_RPC_START_ERROR,
               "failed to init rpc");
        goto dealloc_dict;
    }

    ret = rpcsvc_register_notify(rpc, fn, cbkdata);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0,
               CHANGELOG_MSG_NOTIFY_REGISTER_FAILED,
               "failed to register notify function");
        goto dealloc_rpc;
    }

    ret = rpcsvc_create_listeners(rpc, options, this->name);
    if (ret != 1) {
        gf_msg_debug(this->name, 0, "failed to create listeners");
        goto dealloc_rpc;
    }

    while (*progs) {
        prog = *progs;
        ret = rpcsvc_program_register(rpc, prog, _gf_false);
        if (ret) {
            gf_msg(this->name, GF_LOG_ERROR, 0,
                   CHANGELOG_MSG_PROGRAM_NAME_REG_FAILED,
                   "cannot register program "
                   "(name: %s, prognum: %d, pogver: %d)",
                   prog->progname, prog->prognum, prog->progver);
            goto dealloc_rpc;
        }

        progs++;
    }

    dict_unref(options);
    return rpc;

dealloc_rpc:
    GF_FREE(rpc);
dealloc_dict:
    dict_unref(options);
    return NULL;
}