Blob Blame History Raw
/*
  Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
  This file is part of GlusterFS.

  This file is licensed to you under your choice of the GNU Lesser
  General Public License, version 3 or any later version (LGPLv3 or
  later), or the GNU General Public License, version 2 (GPLv2), in all
  cases as published by the Free Software Foundation.
*/

#include <dlfcn.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/poll.h>
#include <fnmatch.h>
#include <stdint.h>

#include <glusterfs/logging.h>
#include "rpc-transport.h"
#include <glusterfs/glusterfs.h>
/* FIXME: xlator.h is needed for volume_option_t, need to define the datatype
 * in some other header
 */
#include <glusterfs/xlator.h>
#include <glusterfs/list.h>

/* Evaluates to true when the first element of _opt->value is NULL.
 * The argument is parenthesised so that any pointer expression (for example
 * "opts + 1") expands correctly.
 */
#ifndef GF_OPTION_LIST_EMPTY
#define GF_OPTION_LIST_EMPTY(_opt) ((_opt)->value[0] == NULL)
#endif

/*
 * Count the comma-separated names in @transport_type.
 *
 * Empty fields are not counted: leading, trailing and repeated commas
 * produce no entries, which is the tokenisation strtok_r() with a ","
 * delimiter would give. The string is only read, never copied or modified,
 * so the function cannot fail because of memory pressure.
 *
 * Returns the number of non-empty names (0 for an empty string), or -1 when
 * @transport_type is NULL.
 */
int32_t
rpc_transport_count(const char *transport_type)
{
    const char *cursor = NULL;
    int32_t count = 0;
    int in_token = 0;

    if (transport_type == NULL)
        return -1;

    for (cursor = transport_type; *cursor != '\0'; cursor++) {
        if (*cursor == ',') {
            /* A delimiter always ends the current name, if any. */
            in_token = 0;
        } else if (!in_token) {
            /* First character of a new name. */
            in_token = 1;
            count++;
        }
    }

    return count;
}

/*
 * Forward a "my address" query to the transport implementation.
 *
 * All arguments are handed unchanged to this->ops->get_myaddr() and its
 * result is returned. Returns -1 when @this is NULL.
 */
int
rpc_transport_get_myaddr(rpc_transport_t *this, char *peeraddr, int addrlen,
                         struct sockaddr_storage *sa, size_t salen)
{
    GF_VALIDATE_OR_GOTO("rpc", this, invalid);

    return this->ops->get_myaddr(this, peeraddr, addrlen, sa, salen);

invalid:
    return -1;
}

/*
 * Forward a "my name" query to the transport implementation.
 *
 * @hostname and @hostlen are handed unchanged to this->ops->get_myname()
 * and its result is returned. Returns -1 when @this is NULL.
 */
int32_t
rpc_transport_get_myname(rpc_transport_t *this, char *hostname, int hostlen)
{
    GF_VALIDATE_OR_GOTO("rpc", this, invalid);

    return this->ops->get_myname(this, hostname, hostlen);

invalid:
    return -1;
}

/*
 * Forward a "peer name" query to the transport implementation.
 *
 * @hostname and @hostlen are handed unchanged to this->ops->get_peername()
 * and its result is returned. Returns -1 when @this is NULL.
 */
int32_t
rpc_transport_get_peername(rpc_transport_t *this, char *hostname, int hostlen)
{
    GF_VALIDATE_OR_GOTO("rpc", this, invalid);

    return this->ops->get_peername(this, hostname, hostlen);

invalid:
    return -1;
}

/*
 * Pass a throttle request to the transport implementation.
 *
 * The throttle operation is optional: when the implementation does not
 * provide one, -ENOSYS is returned. Otherwise the result of
 * this->ops->throttle() is returned as is.
 *
 * Note that @this is dereferenced without a NULL check.
 */
int
rpc_transport_throttle(rpc_transport_t *this, gf_boolean_t onoff)
{
    if (this->ops->throttle == NULL)
        return -ENOSYS;

    return this->ops->throttle(this, onoff);
}

/*
 * Forward a "peer address" query to the transport implementation.
 *
 * All arguments are handed unchanged to this->ops->get_peeraddr() and its
 * result is returned. Returns -1 when @this is NULL.
 */
int32_t
rpc_transport_get_peeraddr(rpc_transport_t *this, char *peeraddr, int addrlen,
                           struct sockaddr_storage *sa, size_t salen)
{
    GF_VALIDATE_OR_GOTO("rpc", this, invalid);

    return this->ops->get_peeraddr(this, peeraddr, addrlen, sa, salen);

invalid:
    return -1;
}

/*
 * Release a pollin message.
 *
 * Drops the message's iobref reference (if it holds one), frees the
 * attached private data (if any) and finally frees the message itself.
 * A NULL @pollin is rejected and nothing is released.
 */
void
rpc_transport_pollin_destroy(rpc_transport_pollin_t *pollin)
{
    GF_VALIDATE_OR_GOTO("rpc", pollin, done);

    if (pollin->iobref != NULL) {
        iobref_unref(pollin->iobref);
    }

    if (pollin->private != NULL) {
        /* The private pointer is freed together with the message. */
        GF_FREE(pollin->private);
    }

    GF_FREE(pollin);

done:
    return;
}

/*
 * Allocate a pollin message describing received data.
 *
 * @this:      unused in this function.
 * @vector:    @count iovec entries, copied into the message.
 * @count:     number of entries in @vector; must fit in the message's
 *             embedded vector array. More than one entry marks the message
 *             as vectored.
 * @hdr_iobuf: optional buffer that is added to @iobref when non-NULL.
 * @iobref:    buffer reference set; the message takes its own reference.
 * @private:   stored in the message as is; rpc_transport_pollin_destroy()
 *             frees it together with the message.
 *
 * Returns the new message, or NULL when the arguments are invalid or the
 * allocation fails. On failure nothing is referenced and @private is left
 * untouched.
 */
rpc_transport_pollin_t *
rpc_transport_pollin_alloc(rpc_transport_t *this, struct iovec *vector,
                           int count, struct iobuf *hdr_iobuf,
                           struct iobref *iobref, void *private)
{
    rpc_transport_pollin_t *msg = NULL;
    /* sizeof does not evaluate its operand, so msg being NULL is fine. */
    const size_t max_vectors = sizeof(msg->vector) / sizeof(msg->vector[0]);

    /* Reject counts that would overflow the embedded array (a negative
     * count would turn into a huge size_t in the copy below) and a missing
     * source vector.
     */
    if ((count < 0) || ((size_t)count > max_vectors) ||
        ((count > 0) && (vector == NULL))) {
        gf_log("rpc-transport", GF_LOG_ERROR,
               "invalid vector (count=%d) for pollin message", count);
        goto out;
    }

    msg = GF_CALLOC(1, sizeof(*msg), gf_common_mt_rpc_trans_pollin_t);
    if (!msg) {
        goto out;
    }

    if (count > 1) {
        msg->vectored = 1;
    }

    if (count > 0) {
        memcpy(msg->vector, vector, (size_t)count * sizeof(*vector));
    }
    msg->count = count;
    msg->iobref = iobref_ref(iobref);
    msg->private = private;
    if (hdr_iobuf)
        iobref_add(iobref, hdr_iobuf);

out:
    return msg;
}

/*
 * Tear down a transport object; a NULL @trans is a no-op.
 *
 * In order: runs the module's fini hook when one is set, drops the option
 * dictionary reference, frees the name, destroys the lock, closes the
 * shared-object handle and frees the object itself.
 */
void
rpc_transport_cleanup(rpc_transport_t *trans)
{
    if (trans == NULL) {
        return;
    }

    if (trans->fini != NULL) {
        trans->fini(trans);
    }

    if (trans->options != NULL) {
        dict_unref(trans->options);
        trans->options = NULL;
    }

    GF_FREE(trans->name);

    /* rpc_transport_load() sets xl immediately after initialising the
     * lock, so a non-NULL xl is used here as "the lock exists".
     */
    if (trans->xl != NULL) {
        pthread_mutex_destroy(&trans->lock);
    }

    if (trans->dl_handle != NULL) {
        dlclose(trans->dl_handle);
    }

    GF_FREE(trans);
}

/*
 * Load the transport module selected by the "transport-type" option and
 * return an initialised transport object for it.
 *
 * @ctx:        stored in the new transport.
 * @options:    option dictionary. It may be modified here: a missing
 *              "transport-type" defaults to "socket", the legacy values
 *              "tcp", "unix" and "ib-sdp" are rewritten to "socket" (the
 *              latter two also set "transport.address-family"), and a
 *              "/suffix" in the stored transport-type string is cut off in
 *              place. On success the transport holds a reference to it.
 * @trans_name: copied into the transport and used in log messages.
 *
 * Returns the new transport, or NULL on any failure. On failure everything
 * acquired so far is released through rpc_transport_cleanup().
 *
 * NOTE(review): rpc_transport_cleanup() runs the module's fini hook whenever
 * it has been resolved, including failures that happen before init was
 * called - confirm the transport modules tolerate that.
 */
rpc_transport_t *
rpc_transport_load(glusterfs_ctx_t *ctx, dict_t *options, char *trans_name)
{
    struct rpc_transport *trans = NULL, *return_trans = NULL;
    char *name = NULL;
    void *handle = NULL;
    char *type = NULL;
    char str[] = "ERROR";
    int32_t ret = -1;
    /* These hold strcmp() results, so 0 means "matches". */
    int is_tcp = 0, is_unix = 0, is_ibsdp = 0;
    volume_opt_list_t *vol_opt = NULL;
    gf_boolean_t bind_insecure = _gf_false;
    xlator_t *this = NULL;
    gf_boolean_t success = _gf_false;

    GF_VALIDATE_OR_GOTO("rpc-transport", options, fail);
    GF_VALIDATE_OR_GOTO("rpc-transport", ctx, fail);
    GF_VALIDATE_OR_GOTO("rpc-transport", trans_name, fail);

    trans = GF_CALLOC(1, sizeof(struct rpc_transport),
                      gf_common_mt_rpc_trans_t);
    if (!trans)
        goto fail;

    trans->name = gf_strdup(trans_name);
    if (!trans->name)
        goto fail;

    trans->ctx = ctx;
    type = str;

    /* Backward compatibility */
    ret = dict_get_str(options, "transport-type", &type);
    if (ret < 0) {
        ret = dict_set_str(options, "transport-type", "socket");
        if (ret < 0)
            gf_log("dict", GF_LOG_DEBUG, "setting transport-type failed");
        else
            gf_log("rpc-transport", GF_LOG_DEBUG,
                   "missing 'option transport-type'. defaulting to "
                   "\"socket\"");
    } else {
        {
            /* Backward compatibility: strip a legacy "/client" or
             * "/server" suffix. This writes into the string returned by
             * dict_get_str().
             */
            char *tmp = strchr(type, '/');
            if (tmp)
                *tmp = '\0';
        }

        is_tcp = strcmp(type, "tcp");
        is_unix = strcmp(type, "unix");
        is_ibsdp = strcmp(type, "ib-sdp");
        if ((is_tcp == 0) || (is_unix == 0) || (is_ibsdp == 0)) {
            if (is_unix == 0)
                ret = dict_set_str(options, "transport.address-family", "unix");
            if (is_ibsdp == 0)
                ret = dict_set_str(options, "transport.address-family",
                                   "inet-sdp");

            if (ret < 0)
                gf_log("dict", GF_LOG_DEBUG, "setting address-family failed");

            ret = dict_set_str(options, "transport-type", "socket");
            if (ret < 0)
                gf_log("dict", GF_LOG_DEBUG, "setting transport-type failed");
        }
    }

    /* client-bind-insecure is for clients protocol, and
     * bind-insecure for glusterd. Both mutually exclusive
     */
    ret = dict_get_str(options, "client-bind-insecure", &type);
    if (ret)
        ret = dict_get_str(options, "bind-insecure", &type);
    if (ret == 0) {
        ret = gf_string2boolean(type, &bind_insecure);
        if (ret < 0) {
            gf_log("rpc-transport", GF_LOG_WARNING,
                   "bind-insecure option %s is not a"
                   " valid bool option",
                   type);
            goto fail;
        }
        if (_gf_true == bind_insecure)
            trans->bind_insecure = 1;
        else
            trans->bind_insecure = 0;
    } else {
        /* Turning off bind insecure by default*/
        trans->bind_insecure = 0;
    }

    /* Re-read the (possibly rewritten) transport type; it names the module. */
    ret = dict_get_str(options, "transport-type", &type);
    if (ret < 0) {
        gf_log("rpc-transport", GF_LOG_ERROR,
               "'option transport-type <xx>' missing in volume '%s'",
               trans_name);
        goto fail;
    }

    ret = gf_asprintf(&name, "%s/%s.so", RPC_TRANSPORTDIR, type);
    if (-1 == ret) {
        goto fail;
    }

    if (dict_get(options, "notify-poller-death")) {
        trans->notify_poller_death = 1;
    }

    gf_log("rpc-transport", GF_LOG_DEBUG, "attempt to load file %s", name);

    handle = dlopen(name, RTLD_NOW);
    if (handle == NULL) {
        gf_log("rpc-transport", GF_LOG_ERROR, "%s", dlerror());
        gf_log("rpc-transport", GF_LOG_WARNING,
               "volume '%s': transport-type '%s' is not valid or "
               "not found on this machine",
               trans_name, type);
        goto fail;
    }

    trans->dl_handle = handle;

    /* "tops", "init" and "fini" are mandatory module symbols;
     * "reconfigure" and "options" are optional.
     */
    trans->ops = dlsym(handle, "tops");
    if (trans->ops == NULL) {
        gf_log("rpc-transport", GF_LOG_ERROR, "dlsym (rpc_transport_ops) on %s",
               dlerror());
        goto fail;
    }

    *VOID(&(trans->init)) = dlsym(handle, "init");
    if (trans->init == NULL) {
        gf_log("rpc-transport", GF_LOG_ERROR,
               "dlsym (gf_rpc_transport_init) on %s", dlerror());
        goto fail;
    }

    *VOID(&(trans->fini)) = dlsym(handle, "fini");
    if (trans->fini == NULL) {
        gf_log("rpc-transport", GF_LOG_ERROR,
               "dlsym (gf_rpc_transport_fini) on %s", dlerror());
        goto fail;
    }

    *VOID(&(trans->reconfigure)) = dlsym(handle, "reconfigure");
    if (trans->reconfigure == NULL) {
        gf_log("rpc-transport", GF_LOG_DEBUG,
               "dlsym (gf_rpc_transport_reconfigure) on %s", dlerror());
    }

    vol_opt = GF_CALLOC(1, sizeof(volume_opt_list_t),
                        gf_common_mt_volume_opt_list_t);
    if (!vol_opt) {
        goto fail;
    }

    /* Initialise the list head unconditionally: the exit path below calls
     * list_empty()/list_del_init() on it even when the module exports no
     * "options" symbol, and a zero-filled head must not be unlinked.
     */
    INIT_LIST_HEAD(&vol_opt->list);

    this = THIS;
    vol_opt->given_opt = dlsym(handle, "options");
    if (vol_opt->given_opt == NULL) {
        gf_log("rpc-transport", GF_LOG_DEBUG,
               "volume option validation not specified");
    } else {
        /* Linked into the xlator's option list only for the duration of
         * the validation; it is unlinked again on the way out.
         */
        list_add_tail(&vol_opt->list, &(this->volume_options));
        if (xlator_options_validate_list(this, options, vol_opt, NULL)) {
            gf_log("rpc-transport", GF_LOG_ERROR,
                   "volume option validation failed");
            goto fail;
        }
    }

    trans->options = dict_ref(options);

    /* xl is set right after the lock is initialised;
     * rpc_transport_cleanup() uses it to decide whether to destroy the lock.
     */
    pthread_mutex_init(&trans->lock, NULL);
    trans->xl = this;

    ret = trans->init(trans);
    if (ret != 0) {
        gf_log("rpc-transport", GF_LOG_WARNING, "'%s' initialization failed",
               type);
        goto fail;
    }

    INIT_LIST_HEAD(&trans->list);
    GF_ATOMIC_INIT(trans->disconnect_progress, 0);

    return_trans = trans;

    GF_FREE(name);

    success = _gf_true;

fail:
    /* Reached on success as well; "success" tells the two cases apart. */
    if (!success) {
        rpc_transport_cleanup(trans);
        GF_FREE(name);

        return_trans = NULL;
    }

    if (vol_opt) {
        if (!list_empty(&vol_opt->list)) {
            list_del_init(&vol_opt->list);
        }
        GF_FREE(vol_opt);
    }

    return return_trans;
}

/*
 * Hand a request to the transport implementation for sending.
 *
 * Returns the result of this->ops->submit_request(), or -1 when @this or
 * its operations table is NULL.
 */
int32_t
rpc_transport_submit_request(rpc_transport_t *this, rpc_transport_req_t *req)
{
    GF_VALIDATE_OR_GOTO("rpc_transport", this, invalid);
    GF_VALIDATE_OR_GOTO("rpc_transport", this->ops, invalid);

    return this->ops->submit_request(this, req);

invalid:
    return -1;
}

/*
 * Hand a reply to the transport implementation for sending.
 *
 * Returns the result of this->ops->submit_reply(), or -1 when @this or its
 * operations table is NULL.
 */
int32_t
rpc_transport_submit_reply(rpc_transport_t *this, rpc_transport_reply_t *reply)
{
    GF_VALIDATE_OR_GOTO("rpc_transport", this, invalid);
    GF_VALIDATE_OR_GOTO("rpc_transport", this->ops, invalid);

    return this->ops->submit_reply(this, reply);

invalid:
    return -1;
}

/*
 * Ask the transport implementation to connect, passing @port through.
 *
 * Returns the result of this->ops->connect(), or -1 when @this is NULL.
 */
int32_t
rpc_transport_connect(rpc_transport_t *this, int port)
{
    GF_VALIDATE_OR_GOTO("rpc_transport", this, invalid);

    return this->ops->connect(this, port);

invalid:
    return -1;
}

/*
 * Ask the transport implementation to start listening.
 *
 * Returns the result of this->ops->listen(), or -1 when @this is NULL.
 */
int32_t
rpc_transport_listen(rpc_transport_t *this)
{
    GF_VALIDATE_OR_GOTO("rpc_transport", this, invalid);

    return this->ops->listen(this);

invalid:
    return -1;
}

/*
 * Ask the transport implementation to disconnect, passing @wait through.
 *
 * Returns the result of this->ops->disconnect(), or -1 when @this is NULL.
 */
int32_t
rpc_transport_disconnect(rpc_transport_t *this, gf_boolean_t wait)
{
    GF_VALIDATE_OR_GOTO("rpc_transport", this, invalid);

    return this->ops->disconnect(this, wait);

invalid:
    return -1;
}

/*
 * Destroy a transport object and everything it owns.
 *
 * In order: drops the client-option and option dictionary references, runs
 * the module's fini hook, destroys the lock, frees the name, closes the
 * shared-object handle, frees the SSL name and the DNS cache (including its
 * addrinfo list), then frees the object itself.
 *
 * Returns 0 on success, or -1 when @this is NULL.
 */
int32_t
rpc_transport_destroy(rpc_transport_t *this)
{
    struct dnscache6 *dns = NULL;

    GF_VALIDATE_OR_GOTO("rpc_transport", this, invalid);

    if (this->clnt_options != NULL)
        dict_unref(this->clnt_options);

    if (this->options != NULL)
        dict_unref(this->options);

    if (this->fini != NULL)
        this->fini(this);

    pthread_mutex_destroy(&this->lock);

    GF_FREE(this->name);

    if (this->dl_handle != NULL)
        dlclose(this->dl_handle);

    if (this->ssl_name != NULL) {
        GF_FREE(this->ssl_name);
    }

    if (this->dnscache != NULL) {
        dns = this->dnscache;
        /* The cached addrinfo list has to be released separately. */
        if (dns->first != NULL)
            freeaddrinfo(dns->first);
        GF_FREE(this->dnscache);
    }

    GF_FREE(this);

    return 0;

invalid:
    return -1;
}

/*
 * Take a reference on a transport.
 *
 * Atomically increments the reference count and returns @this, or returns
 * NULL when @this is NULL.
 */
rpc_transport_t *
rpc_transport_ref(rpc_transport_t *this)
{
    GF_VALIDATE_OR_GOTO("rpc_transport", this, invalid);

    GF_ATOMIC_INC(this->refcount);

    return this;

invalid:
    return NULL;
}

/*
 * Drop a reference on a transport.
 *
 * When the count reaches zero the registered callback (if any) is sent
 * RPC_TRANSPORT_CLEANUP, the callback registration is cleared and the
 * transport is destroyed; @this must not be used afterwards.
 *
 * Returns 0 on success, or -1 when @this is NULL.
 */
int32_t
rpc_transport_unref(rpc_transport_t *this)
{
    int32_t refcount = 0;
    int32_t ret = -1;

    GF_VALIDATE_OR_GOTO("rpc_transport", this, fail);

    refcount = GF_ATOMIC_DEC(this->refcount);

    if (refcount == 0) {
        /* Callback data may have been registered without a callback
         * (rpc_transport_notify() treats notify as optional as well), so
         * check the function pointer before calling through it.
         */
        if (this->mydata && this->notify)
            this->notify(this, this->mydata, RPC_TRANSPORT_CLEANUP, NULL);
        this->mydata = NULL;
        this->notify = NULL;
        rpc_transport_destroy(this);
    }

    ret = 0;
fail:
    return ret;
}

/*
 * Deliver @event and @data to the callback registered on the transport.
 *
 * Returns the callback's result, 0 when no callback is registered, or -1
 * when @this is NULL. Variadic arguments after @data are not used here.
 */
int32_t
rpc_transport_notify(rpc_transport_t *this, rpc_transport_event_t event,
                     void *data, ...)
{
    GF_VALIDATE_OR_GOTO("rpc", this, invalid);

    if (this->notify == NULL)
        return 0;

    return this->notify(this, this->mydata, event, data);

invalid:
    return -1;
}

/*
 * Register the event callback and its opaque data on a transport,
 * replacing whatever was registered before.
 *
 * Returns 0 on success, or -1 when @trans is NULL.
 */
int
rpc_transport_register_notify(rpc_transport_t *trans,
                              rpc_transport_notify_t notify, void *mydata)
{
    GF_VALIDATE_OR_GOTO("rpc", trans, invalid);

    trans->notify = notify;
    trans->mydata = mydata;

    return 0;

invalid:
    return -1;
}

/*
 * Store the keepalive interval, keepalive time and TCP user timeout in
 * @options under "transport.socket.keepalive-interval",
 * "transport.socket.keepalive-time" and "transport.tcp-user-timeout".
 *
 * All three values are written unconditionally; this function does not
 * skip negative values itself.
 * NOTE(review): the previous comment said negative values skip a setting -
 * presumably the consumer of these options ignores them; confirm.
 *
 * Asserts that @options is set and that at least one of @interval and
 * @time is positive.
 *
 * Returns 0 on success, or the first non-zero dict_set_int32() result; the
 * remaining options are not written after a failure.
 */
int
rpc_transport_keepalive_options_set(dict_t *options, int32_t interval,
                                    int32_t time, int32_t timeout)
{
    int status = -1;

    GF_ASSERT(options);
    GF_ASSERT((interval > 0) || (time > 0));

    status = dict_set_int32(options, "transport.socket.keepalive-interval",
                            interval);
    if (status != 0)
        return status;

    status = dict_set_int32(options, "transport.socket.keepalive-time", time);
    if (status != 0)
        return status;

    return dict_set_int32(options, "transport.tcp-user-timeout", timeout);
}

/*
 * Fill @dict with the options for a unix-domain socket transport.
 *
 * A copy of @filepath is stored as "transport.socket.connect-path"; the
 * address family is set to "unix", the transport type to "socket", and
 * nodelay and keepalive are set to "off". "frame-timeout" is stored only
 * when @frame_timeout is positive.
 *
 * Returns 0 on success, -1 when @dict is NULL or the path cannot be
 * copied, or the first non-zero result of a dict_set_*() call. Options
 * written before a failure stay in @dict.
 */
int
rpc_transport_unix_options_build(dict_t *dict, char *filepath,
                                 int frame_timeout)
{
    char *path_copy = NULL;
    int status = -1;

    GF_ASSERT(filepath);
    GF_VALIDATE_OR_GOTO("rpc-transport", dict, invalid);

    path_copy = gf_strdup(filepath);
    if (path_copy == NULL)
        return -1;

    status = dict_set_dynstr(dict, "transport.socket.connect-path", path_copy);
    if (status != 0) {
        /* The copy is released here when it could not be stored. */
        GF_FREE(path_copy);
        return status;
    }

    status = dict_set_str(dict, "transport.address-family", "unix");
    if (status != 0)
        return status;

    status = dict_set_str(dict, "transport.socket.nodelay", "off");
    if (status != 0)
        return status;

    status = dict_set_str(dict, "transport-type", "socket");
    if (status != 0)
        return status;

    status = dict_set_str(dict, "transport.socket.keepalive", "off");
    if (status != 0)
        return status;

    if (frame_timeout > 0)
        return dict_set_int32(dict, "frame-timeout", frame_timeout);

    return 0;

invalid:
    return -1;
}

/*
 * Fill @dict with the options for an inet socket transport.
 *
 * @dict:     dictionary to populate.
 * @hostname: remote host; a copy is stored as "remote-host".
 * @port:     stored as "remote-port"; asserted to be at least 1024.
 * @af:       address family to store as "address-family"; when NULL the
 *            compile-time default is used ("inet6" if IPV6_DEFAULT is
 *            defined, "inet" otherwise).
 *
 * "transport-type" is always set to "socket".
 *
 * Returns 0 on success, -1 when @dict is NULL or the hostname cannot be
 * copied, or the first non-zero result of a dict_set_*() call. Options
 * written before a failure stay in @dict.
 */
int
rpc_transport_inet_options_build(dict_t *dict, const char *hostname, int port,
                                 char *af)
{
    char *host = NULL;
    char *family = NULL;
    int ret = -1;
#ifdef IPV6_DEFAULT
    char *addr_family = "inet6";
#else
    char *addr_family = "inet";
#endif

    GF_ASSERT(hostname);
    GF_ASSERT(port >= 1024);
    GF_VALIDATE_OR_GOTO("rpc-transport", dict, out);

    host = gf_strdup((char *)hostname);
    if (!host) {
        ret = -1;
        goto out;
    }

    ret = dict_set_dynstr(dict, "remote-host", host);
    if (ret) {
        gf_log(THIS->name, GF_LOG_WARNING, "failed to set remote-host with %s",
               host);
        /* The copy is released here when it could not be stored. */
        GF_FREE(host);
        goto out;
    }

    ret = dict_set_int32(dict, "remote-port", port);
    if (ret) {
        gf_log(THIS->name, GF_LOG_WARNING, "failed to set remote-port with %d",
               port);
        goto out;
    }

    /* Resolve the family once so the failure log reports the value that
     * was actually being stored, not just the compile-time default.
     */
    family = (af != NULL) ? af : addr_family;
    ret = dict_set_str(dict, "address-family", family);
    if (ret) {
        gf_log(THIS->name, GF_LOG_WARNING, "failed to set address-family to %s",
               family);
        goto out;
    }

    ret = dict_set_str(dict, "transport-type", "socket");
    if (ret) {
        gf_log(THIS->name, GF_LOG_WARNING,
               "failed to set trans-type with socket");
        goto out;
    }
out:
    return ret;
}