/*
Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
This file is part of GlusterFS.
This file is licensed to you under your choice of the GNU Lesser
General Public License, version 3 or any later version (LGPLv3 or
later), or the GNU General Public License, version 2 (GPLv2), in all
cases as published by the Free Software Foundation.
*/
#include <glusterfs/dict.h>
#include <glusterfs/glusterfs.h>
#include <glusterfs/iobuf.h>
#include <glusterfs/logging.h>
#include "rdma.h"
#include "name.h"
#include <glusterfs/byte-order.h>
#include <glusterfs/xlator.h>
#include "xdr-rpc.h"
#include "rpc-lib-messages.h"
#include "rpc-trans-rdma-messages.h"
#include <signal.h>
#define GF_RDMA_LOG_NAME "rpc-transport/rdma"
static int32_t
__gf_rdma_ioq_churn(gf_rdma_peer_t *peer);
gf_rdma_post_t *
gf_rdma_post_ref(gf_rdma_post_t *post);
int
gf_rdma_post_unref(gf_rdma_post_t *post);
static void *
gf_rdma_send_completion_proc(void *data);
static void *
gf_rdma_recv_completion_proc(void *data);
void *
gf_rdma_async_event_thread(void *context);
static int32_t
gf_rdma_create_qp(rpc_transport_t *this);
static int32_t
__gf_rdma_teardown(rpc_transport_t *this);
static int32_t
gf_rdma_teardown(rpc_transport_t *this);
static int32_t
gf_rdma_disconnect(rpc_transport_t *this, gf_boolean_t wait);
static void
gf_rdma_cm_handle_disconnect(rpc_transport_t *this);
static int
gf_rdma_cm_handle_connect_init(struct rdma_cm_event *event);
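/* Return a post to its queue's passive (free) list: unlink it from the
 * active list (if linked), decrement the active count and link it at the
 * head of the passive list.
 */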
static void
gf_rdma_put_post(gf_rdma_queue_t *queue, gf_rdma_post_t *post)
{
post->ctx.is_request = 0;
pthread_mutex_lock(&queue->lock);
{
if (post->prev) {
queue->active_count--;
post->prev->next = post->next;
}
if (post->next) {
post->next->prev = post->prev;
}
post->prev = &queue->passive_posts;
post->next = post->prev->next;
post->prev->next = post;
post->next->prev = post;
queue->passive_count++;
}
pthread_mutex_unlock(&queue->lock);
}
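/* Allocate a new post with a page-aligned buffer of 'len' bytes and
 * register that buffer with the device's protection domain for local
 * writes. Returns NULL on allocation or registration failure.
 */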
static gf_rdma_post_t *
gf_rdma_new_post(rpc_transport_t *this, gf_rdma_device_t *device, int32_t len,
gf_rdma_post_type_t type)
{
gf_rdma_post_t *post = NULL;
int ret = -1;
post = (gf_rdma_post_t *)GF_CALLOC(1, sizeof(*post),
gf_common_mt_rdma_post_t);
if (post == NULL) {
goto out;
}
pthread_mutex_init(&post->lock, NULL);
post->buf_size = len;
post->buf = valloc(len);
if (!post->buf) {
gf_msg_nomem(GF_RDMA_LOG_NAME, GF_LOG_ERROR, len);
goto out;
}
post->mr = ibv_reg_mr(device->pd, post->buf, post->buf_size,
IBV_ACCESS_LOCAL_WRITE);
if (!post->mr) {
gf_msg(this->name, GF_LOG_WARNING, errno, RDMA_MSG_MR_ALOC_FAILED,
"memory registration failed");
goto out;
}
post->device = device;
post->type = type;
ret = 0;
out:
if (ret != 0 && post) {
free(post->buf);
GF_FREE(post);
post = NULL;
}
return post;
}
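/* Take a post off the queue's passive list and move it to the active
 * list, bumping its reuse counter. Returns NULL if no post is free.
 */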
static gf_rdma_post_t *
gf_rdma_get_post(gf_rdma_queue_t *queue)
{
gf_rdma_post_t *post = NULL;
pthread_mutex_lock(&queue->lock);
{
post = queue->passive_posts.next;
if (post == &queue->passive_posts)
post = NULL;
if (post) {
if (post->prev)
post->prev->next = post->next;
if (post->next)
post->next->prev = post->prev;
post->prev = &queue->active_posts;
post->next = post->prev->next;
post->prev->next = post;
post->next->prev = post;
post->reused++;
queue->active_count++;
}
}
pthread_mutex_unlock(&queue->lock);
return post;
}
void
gf_rdma_destroy_post(gf_rdma_post_t *post)
{
ibv_dereg_mr(post->mr);
free(post->buf);
GF_FREE(post);
}
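/* Consume one unit of send quota (flow-control credit) from the peer.
 * Returns the quota value before the decrement, or -1 if the transport
 * is not connected or no quota is left. Callers are expected to hold
 * priv->write_mutex.
 */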
static int32_t
__gf_rdma_quota_get(gf_rdma_peer_t *peer)
{
int32_t ret = -1;
gf_rdma_private_t *priv = NULL;
priv = peer->trans->private;
if (priv->connected && peer->quota > 0) {
ret = peer->quota--;
}
return ret;
}
static void
__gf_rdma_ioq_entry_free(gf_rdma_ioq_t *entry)
{
list_del_init(&entry->list);
if (entry->iobref) {
iobref_unref(entry->iobref);
entry->iobref = NULL;
}
if (entry->msg.request.rsp_iobref) {
iobref_unref(entry->msg.request.rsp_iobref);
entry->msg.request.rsp_iobref = NULL;
}
mem_put(entry);
}
static void
__gf_rdma_ioq_flush(gf_rdma_peer_t *peer)
{
gf_rdma_ioq_t *entry = NULL, *dummy = NULL;
list_for_each_entry_safe(entry, dummy, &peer->ioq, list)
{
__gf_rdma_ioq_entry_free(entry);
}
}
static int32_t
__gf_rdma_disconnect(rpc_transport_t *this)
{
gf_rdma_private_t *priv = NULL;
priv = this->private;
if (priv->connected) {
rdma_disconnect(priv->peer.cm_id);
}
return 0;
}
static void
gf_rdma_queue_init(gf_rdma_queue_t *queue)
{
pthread_mutex_init(&queue->lock, NULL);
queue->active_posts.next = &queue->active_posts;
queue->active_posts.prev = &queue->active_posts;
queue->passive_posts.next = &queue->passive_posts;
queue->passive_posts.prev = &queue->passive_posts;
}
static void
__gf_rdma_destroy_queue(gf_rdma_post_t *post)
{
gf_rdma_post_t *tmp = NULL;
while (post->next != post) {
tmp = post->next;
post->next = post->next->next;
post->next->prev = post;
gf_rdma_destroy_post(tmp);
}
}
static void
gf_rdma_destroy_queue(gf_rdma_queue_t *queue)
{
if (queue == NULL) {
goto out;
}
pthread_mutex_lock(&queue->lock);
{
if (queue->passive_count > 0) {
__gf_rdma_destroy_queue(&queue->passive_posts);
queue->passive_count = 0;
}
if (queue->active_count > 0) {
__gf_rdma_destroy_queue(&queue->active_posts);
queue->active_count = 0;
}
}
pthread_mutex_unlock(&queue->lock);
out:
return;
}
static void
gf_rdma_destroy_posts(rpc_transport_t *this)
{
gf_rdma_device_t *device = NULL;
gf_rdma_private_t *priv = NULL;
if (this == NULL) {
goto out;
}
priv = this->private;
device = priv->device;
gf_rdma_destroy_queue(&device->sendq);
gf_rdma_destroy_queue(&device->recvq);
out:
return;
}
static int32_t
__gf_rdma_create_posts(rpc_transport_t *this, int32_t count, int32_t size,
gf_rdma_queue_t *q, gf_rdma_post_type_t type)
{
int32_t i = 0;
int32_t ret = 0;
gf_rdma_private_t *priv = NULL;
gf_rdma_device_t *device = NULL;
priv = this->private;
device = priv->device;
for (i = 0; i < count; i++) {
gf_rdma_post_t *post = NULL;
post = gf_rdma_new_post(this, device, size + 2048, type);
if (!post) {
gf_msg(this->name, GF_LOG_ERROR, 0, RDMA_MSG_POST_CREATION_FAILED,
"post creation failed");
ret = -1;
break;
}
gf_rdma_put_post(q, post);
}
return ret;
}
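/* Post the post's buffer as a receive work request on the shared
 * receive queue, taking a reference on the post before handing it to
 * the HCA.
 */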
static int32_t
gf_rdma_post_recv(struct ibv_srq *srq, gf_rdma_post_t *post)
{
struct ibv_sge list = {.addr = (unsigned long)post->buf,
.length = post->buf_size,
.lkey = post->mr->lkey};
struct ibv_recv_wr wr =
{
.wr_id = (unsigned long)post,
.sg_list = &list,
.num_sge = 1,
},
*bad_wr;
gf_rdma_post_ref(post);
return ibv_post_srq_recv(srq, &wr, &bad_wr);
}
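/* Walk all rdma devices and deregister every memory region that was
 * registered for an iobuf arena, freeing the bookkeeping entries.
 */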
static void
gf_rdma_deregister_iobuf_pool(gf_rdma_device_t *device)
{
gf_rdma_arena_mr *arena_mr = NULL;
gf_rdma_arena_mr *tmp = NULL;
while (device) {
pthread_mutex_lock(&device->all_mr_lock);
{
if (!list_empty(&device->all_mr)) {
list_for_each_entry_safe(arena_mr, tmp, &device->all_mr, list)
{
if (ibv_dereg_mr(arena_mr->mr)) {
gf_msg("rdma", GF_LOG_WARNING, 0,
RDMA_MSG_DEREGISTER_ARENA_FAILED,
"deallocation of memory region "
"failed");
pthread_mutex_unlock(&device->all_mr_lock);
return;
}
list_del(&arena_mr->list);
GF_FREE(arena_mr);
}
}
}
pthread_mutex_unlock(&device->all_mr_lock);
device = device->next;
}
}
int
gf_rdma_deregister_arena(struct list_head **mr_list,
struct iobuf_arena *iobuf_arena)
{
gf_rdma_arena_mr *tmp = NULL;
gf_rdma_arena_mr *dummy = NULL;
gf_rdma_device_t *device = NULL;
int count = 0, i = 0;
count = iobuf_arena->iobuf_pool->rdma_device_count;
for (i = 0; i < count; i++) {
device = iobuf_arena->iobuf_pool->device[i];
pthread_mutex_lock(&device->all_mr_lock);
{
list_for_each_entry_safe(tmp, dummy, mr_list[i], list)
{
if (tmp->iobuf_arena == iobuf_arena) {
if (ibv_dereg_mr(tmp->mr)) {
gf_msg("rdma", GF_LOG_WARNING, 0,
RDMA_MSG_DEREGISTER_ARENA_FAILED,
"deallocation of memory region "
"failed");
pthread_mutex_unlock(&device->all_mr_lock);
return -1;
}
list_del(&tmp->list);
GF_FREE(tmp);
break;
}
}
}
pthread_mutex_unlock(&device->all_mr_lock);
}
return 0;
}
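/* Register a newly added iobuf arena with every known rdma device so
 * that its buffers can be used directly for RDMA transfers; one
 * gf_rdma_arena_mr entry per device is added to that device's all_mr
 * list.
 */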
int
gf_rdma_register_arena(void **arg1, void *arg2)
{
struct ibv_mr *mr = NULL;
gf_rdma_arena_mr *new = NULL;
struct iobuf_pool *iobuf_pool = NULL;
gf_rdma_device_t **device = (gf_rdma_device_t **)arg1;
struct iobuf_arena *iobuf_arena = arg2;
int count = 0, i = 0;
iobuf_pool = iobuf_arena->iobuf_pool;
count = iobuf_pool->rdma_device_count;
for (i = 0; i < count; i++) {
new = GF_CALLOC(1, sizeof(gf_rdma_arena_mr),
gf_common_mt_rdma_arena_mr);
if (new == NULL) {
gf_msg("rdma", GF_LOG_INFO, ENOMEM, RDMA_MSG_MR_ALOC_FAILED,
"Out of "
"memory: registering pre allocated buffer "
"with rdma device failed.");
return -1;
}
INIT_LIST_HEAD(&new->list);
new->iobuf_arena = iobuf_arena;
mr = ibv_reg_mr(device[i]->pd, iobuf_arena->mem_base,
iobuf_arena->arena_size,
IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE |
IBV_ACCESS_REMOTE_WRITE);
if (!mr) {
gf_msg("rdma", GF_LOG_WARNING, 0, RDMA_MSG_MR_ALOC_FAILED,
"registration of memory region "
"failed");
GF_FREE(new);
return -1;
}
new->mr = mr;
pthread_mutex_lock(&device[i]->all_mr_lock);
{
list_add(&new->list, &device[i]->all_mr);
}
pthread_mutex_unlock(&device[i]->all_mr_lock);
new = NULL;
}
return 0;
}
static void
gf_rdma_register_iobuf_pool(gf_rdma_device_t *device,
struct iobuf_pool *iobuf_pool)
{
struct iobuf_arena *tmp = NULL;
struct iobuf_arena *dummy = NULL;
struct ibv_mr *mr = NULL;
gf_rdma_arena_mr *new = NULL;
if (!list_empty(&iobuf_pool->all_arenas)) {
list_for_each_entry_safe(tmp, dummy, &iobuf_pool->all_arenas, all_list)
{
new = GF_CALLOC(1, sizeof(gf_rdma_arena_mr),
gf_common_mt_rdma_arena_mr);
if (new == NULL) {
gf_msg("rdma", GF_LOG_INFO, ENOMEM, RDMA_MSG_MR_ALOC_FAILED,
"Out of "
"memory: registering pre allocated "
"buffer with rdma device failed.");
return;
}
INIT_LIST_HEAD(&new->list);
new->iobuf_arena = tmp;
mr = ibv_reg_mr(device->pd, tmp->mem_base, tmp->arena_size,
IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE |
IBV_ACCESS_REMOTE_WRITE);
if (!mr) {
gf_msg("rdma", GF_LOG_WARNING, 0, RDMA_MSG_MR_ALOC_FAILED,
"failed"
" to pre-register buffers with rdma "
"devices.");
GF_FREE(new);
return;
}
new->mr = mr;
pthread_mutex_lock(&device->all_mr_lock);
{
list_add(&new->list, &device->all_mr);
}
pthread_mutex_unlock(&device->all_mr_lock);
new = NULL;
}
}
return;
}
static void
gf_rdma_register_iobuf_pool_with_device(gf_rdma_device_t *device,
struct iobuf_pool *iobuf_pool)
{
while (device) {
gf_rdma_register_iobuf_pool(device, iobuf_pool);
device = device->next;
}
}
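/* Look up a pre-registered memory region whose iobuf arena contains the
 * given pointer. Returns NULL when no arena covers it, in which case
 * callers fall back to registering the buffer on the fly.
 */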
static struct ibv_mr *
gf_rdma_get_pre_registred_mr(rpc_transport_t *this, void *ptr, int size)
{
gf_rdma_arena_mr *tmp = NULL;
gf_rdma_arena_mr *dummy = NULL;
gf_rdma_private_t *priv = NULL;
gf_rdma_device_t *device = NULL;
priv = this->private;
device = priv->device;
pthread_mutex_lock(&device->all_mr_lock);
{
if (!list_empty(&device->all_mr)) {
list_for_each_entry_safe(tmp, dummy, &device->all_mr, list)
{
if (tmp->iobuf_arena->mem_base <= ptr &&
ptr < tmp->iobuf_arena->mem_base +
tmp->iobuf_arena->arena_size) {
pthread_mutex_unlock(&device->all_mr_lock);
return tmp->mr;
}
}
}
}
pthread_mutex_unlock(&device->all_mr_lock);
return NULL;
}
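/* Create the pools of send and receive posts for the device and
 * pre-post every receive buffer on the shared receive queue; on failure
 * all posts created so far are destroyed.
 */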
static int32_t
gf_rdma_create_posts(rpc_transport_t *this)
{
int32_t i = 0, ret = 0;
gf_rdma_post_t *post = NULL;
gf_rdma_private_t *priv = NULL;
gf_rdma_options_t *options = NULL;
gf_rdma_device_t *device = NULL;
priv = this->private;
options = &priv->options;
device = priv->device;
ret = __gf_rdma_create_posts(this, options->send_count, options->send_size,
&device->sendq, GF_RDMA_SEND_POST);
if (!ret)
ret = __gf_rdma_create_posts(this, options->recv_count,
options->recv_size, &device->recvq,
GF_RDMA_RECV_POST);
if (!ret) {
for (i = 0; i < options->recv_count; i++) {
post = gf_rdma_get_post(&device->recvq);
if (gf_rdma_post_recv(device->srq, post) != 0) {
ret = -1;
break;
}
}
}
if (ret)
gf_rdma_destroy_posts(this);
return ret;
}
static void
gf_rdma_destroy_cq(rpc_transport_t *this)
{
gf_rdma_private_t *priv = NULL;
gf_rdma_device_t *device = NULL;
priv = this->private;
device = priv->device;
if (device->recv_cq)
ibv_destroy_cq(device->recv_cq);
device->recv_cq = NULL;
if (device->send_cq)
ibv_destroy_cq(device->send_cq);
device->send_cq = NULL;
return;
}
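/* Create the receive and send completion queues for the device and
 * request completion notifications on both. The send CQ is sized as
 * send_count * 128, capped at the device's max_cqe.
 */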
static int32_t
gf_rdma_create_cq(rpc_transport_t *this)
{
gf_rdma_private_t *priv = NULL;
gf_rdma_options_t *options = NULL;
gf_rdma_device_t *device = NULL;
uint64_t send_cqe = 0;
int32_t ret = 0;
struct ibv_device_attr device_attr = {
{0},
};
priv = this->private;
options = &priv->options;
device = priv->device;
device->recv_cq = ibv_create_cq(priv->device->context,
options->recv_count * 2, device,
device->recv_chan, 0);
if (!device->recv_cq) {
gf_msg(this->name, GF_LOG_ERROR, 0, RDMA_MSG_CQ_CREATION_FAILED,
"creation of CQ for "
"device %s failed",
device->device_name);
ret = -1;
goto out;
} else if (ibv_req_notify_cq(device->recv_cq, 0)) {
gf_msg(this->name, GF_LOG_ERROR, 0, RDMA_MSG_REQ_NOTIFY_CQ_REVQ_FAILED,
"ibv_req_notify_"
"cq on recv CQ of device %s failed",
device->device_name);
ret = -1;
goto out;
}
do {
ret = ibv_query_device(priv->device->context, &device_attr);
if (ret != 0) {
gf_msg(this->name, GF_LOG_ERROR, 0, RDMA_MSG_QUERY_DEVICE_FAILED,
"ibv_query_"
"device on %s returned %d (%s)",
priv->device->device_name, ret,
(ret > 0) ? strerror(ret) : "");
ret = -1;
goto out;
}
send_cqe = (uint64_t)options->send_count * 128;
send_cqe = (send_cqe > device_attr.max_cqe) ? device_attr.max_cqe
: send_cqe;
/* TODO: make send_cq size dynamically adaptive */
device->send_cq = ibv_create_cq(priv->device->context, send_cqe, device,
device->send_chan, 0);
if (!device->send_cq) {
gf_msg(this->name, GF_LOG_ERROR, 0, RDMA_MSG_CQ_CREATION_FAILED,
"creation of send_cq "
"for device %s failed",
device->device_name);
ret = -1;
goto out;
}
if (ibv_req_notify_cq(device->send_cq, 0)) {
gf_msg(this->name, GF_LOG_ERROR, 0,
RDMA_MSG_REQ_NOTIFY_CQ_SENDQ_FAILED,
"ibv_req_notify_cq on send_cq for device %s"
" failed",
device->device_name);
ret = -1;
goto out;
}
} while (0);
out:
if (ret != 0)
gf_rdma_destroy_cq(this);
return ret;
}
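/* Find the gf_rdma_device_t for device_name in the process-wide rdma
 * context, creating and initialising it on first use: completion
 * channels and CQs, protection domain, SRQ, send/recv post queues,
 * iobuf-pool registration, completion threads and the QP registry.
 * Returns NULL on failure.
 */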
static gf_rdma_device_t *
gf_rdma_get_device(rpc_transport_t *this, struct ibv_context *ibctx,
char *device_name)
{
glusterfs_ctx_t *ctx = NULL;
gf_rdma_private_t *priv = NULL;
gf_rdma_options_t *options = NULL;
int32_t ret = 0;
int32_t i = 0;
gf_rdma_device_t *trav = NULL, *device = NULL;
gf_rdma_ctx_t *rdma_ctx = NULL;
struct iobuf_pool *iobuf_pool = NULL;
priv = this->private;
options = &priv->options;
ctx = this->ctx;
rdma_ctx = ctx->ib;
iobuf_pool = ctx->iobuf_pool;
trav = rdma_ctx->device;
while (trav) {
if (!strcmp(trav->device_name, device_name))
break;
trav = trav->next;
}
if (!trav) {
trav = GF_CALLOC(1, sizeof(*trav), gf_common_mt_rdma_device_t);
if (trav == NULL) {
goto out;
}
priv->device = trav;
trav->context = ibctx;
trav->next = rdma_ctx->device;
rdma_ctx->device = trav;
iobuf_pool->device[iobuf_pool->rdma_device_count] = trav;
iobuf_pool->mr_list[iobuf_pool->rdma_device_count++] = &trav->all_mr;
trav->request_ctx_pool = mem_pool_new(gf_rdma_request_context_t,
GF_RDMA_POOL_SIZE);
if (trav->request_ctx_pool == NULL) {
goto out;
}
trav->ioq_pool = mem_pool_new(gf_rdma_ioq_t, GF_RDMA_POOL_SIZE);
if (trav->ioq_pool == NULL) {
goto out;
}
trav->reply_info_pool = mem_pool_new(gf_rdma_reply_info_t,
GF_RDMA_POOL_SIZE);
if (trav->reply_info_pool == NULL) {
goto out;
}
trav->device_name = gf_strdup(device_name);
trav->send_chan = ibv_create_comp_channel(trav->context);
if (!trav->send_chan) {
gf_msg(this->name, GF_LOG_ERROR, 0, RDMA_MSG_SEND_COMP_CHAN_FAILED,
"could not "
"create send completion channel for "
"device (%s)",
device_name);
goto out;
}
trav->recv_chan = ibv_create_comp_channel(trav->context);
if (!trav->recv_chan) {
gf_msg(this->name, GF_LOG_ERROR, 0, RDMA_MSG_RECV_COMP_CHAN_FAILED,
"could not "
"create recv completion channel for "
"device (%s)",
device_name);
/* TODO: cleanup current mess */
goto out;
}
if (gf_rdma_create_cq(this) < 0) {
gf_msg(this->name, GF_LOG_ERROR, 0, RDMA_MSG_CQ_CREATION_FAILED,
"could not create CQ for device (%s)", device_name);
goto out;
}
/* protection domain */
trav->pd = ibv_alloc_pd(trav->context);
if (!trav->pd) {
gf_msg(this->name, GF_LOG_ERROR, 0, RDMA_MSG_ALOC_PROT_DOM_FAILED,
"could not "
"allocate protection domain for device (%s)",
device_name);
goto out;
}
struct ibv_srq_init_attr attr = {.attr = {.max_wr = options->recv_count,
.max_sge = 1,
.srq_limit = 10}};
trav->srq = ibv_create_srq(trav->pd, &attr);
if (!trav->srq) {
gf_msg(this->name, GF_LOG_ERROR, 0, RDMA_MSG_CRE_SRQ_FAILED,
"could not create SRQ"
" for device (%s)",
device_name);
goto out;
}
/* queue init */
gf_rdma_queue_init(&trav->sendq);
gf_rdma_queue_init(&trav->recvq);
INIT_LIST_HEAD(&trav->all_mr);
pthread_mutex_init(&trav->all_mr_lock, NULL);
gf_rdma_register_iobuf_pool(trav, iobuf_pool);
if (gf_rdma_create_posts(this) < 0) {
gf_msg(this->name, GF_LOG_ERROR, 0, RDMA_MSG_ALOC_POST_FAILED,
"could not allocate"
"posts for device (%s)",
device_name);
goto out;
}
/* completion threads */
ret = gf_thread_create(&trav->send_thread, NULL,
gf_rdma_send_completion_proc, trav->send_chan,
"rdmascom");
if (ret) {
gf_msg(this->name, GF_LOG_ERROR, 0,
RDMA_MSG_SEND_COMP_THREAD_FAILED,
"could not create send completion thread for "
"device (%s)",
device_name);
goto out;
}
ret = gf_thread_create(&trav->recv_thread, NULL,
gf_rdma_recv_completion_proc, trav->recv_chan,
"rdmarcom");
if (ret) {
gf_msg(this->name, GF_LOG_ERROR, 0,
RDMA_MSG_RECV_COMP_THREAD_FAILED,
"could not create recv completion thread "
"for device (%s)",
device_name);
return NULL;
}
ret = gf_thread_create(&trav->async_event_thread, NULL,
gf_rdma_async_event_thread, ibctx, "rdmaAsyn");
if (ret) {
gf_msg(this->name, GF_LOG_ERROR, 0,
RDMA_MSG_ASYNC_EVENT_THEAD_FAILED,
"could not create async_event_thread");
return NULL;
}
/* qpreg */
pthread_mutex_init(&trav->qpreg.lock, NULL);
for (i = 0; i < 42; i++) {
trav->qpreg.ents[i].next = &trav->qpreg.ents[i];
trav->qpreg.ents[i].prev = &trav->qpreg.ents[i];
}
}
device = trav;
trav = NULL;
out:
if (trav != NULL) {
rdma_ctx->device = trav->next;
gf_rdma_destroy_posts(this);
mem_pool_destroy(trav->ioq_pool);
mem_pool_destroy(trav->request_ctx_pool);
mem_pool_destroy(trav->reply_info_pool);
if (trav->pd != NULL) {
ibv_dealloc_pd(trav->pd);
}
gf_rdma_destroy_cq(this);
ibv_destroy_comp_channel(trav->recv_chan);
ibv_destroy_comp_channel(trav->send_chan);
GF_FREE((char *)trav->device_name);
GF_FREE(trav);
}
return device;
}
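/* Build a server-side transport for an incoming connection: it inherits
 * ops, notify and options from the listener and is bound to the
 * connection's rdma_cm_id.
 */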
static rpc_transport_t *
gf_rdma_transport_new(rpc_transport_t *listener, struct rdma_cm_id *cm_id)
{
gf_rdma_private_t *listener_priv = NULL, *priv = NULL;
rpc_transport_t *this = NULL, *new = NULL;
gf_rdma_options_t *options = NULL;
char *device_name = NULL;
listener_priv = listener->private;
this = GF_CALLOC(1, sizeof(rpc_transport_t), gf_common_mt_rpc_transport_t);
if (this == NULL) {
goto out;
}
this->listener = listener;
priv = GF_CALLOC(1, sizeof(gf_rdma_private_t), gf_common_mt_rdma_private_t);
if (priv == NULL) {
goto out;
}
this->private = priv;
priv->options = listener_priv->options;
priv->listener = listener;
priv->entity = GF_RDMA_SERVER;
options = &priv->options;
this->ops = listener->ops;
this->init = listener->init;
this->fini = listener->fini;
this->ctx = listener->ctx;
this->name = gf_strdup(listener->name);
this->notify = listener->notify;
this->mydata = listener->mydata;
this->xl = listener->xl;
this->myinfo.sockaddr_len = sizeof(cm_id->route.addr.src_addr);
memcpy(&this->myinfo.sockaddr, &cm_id->route.addr.src_addr,
this->myinfo.sockaddr_len);
this->peerinfo.sockaddr_len = sizeof(cm_id->route.addr.dst_addr);
memcpy(&this->peerinfo.sockaddr, &cm_id->route.addr.dst_addr,
this->peerinfo.sockaddr_len);
priv->peer.trans = this;
gf_rdma_get_transport_identifiers(this);
device_name = (char *)ibv_get_device_name(cm_id->verbs->device);
if (device_name == NULL) {
gf_msg(listener->name, GF_LOG_WARNING, 0,
RDMA_MSG_GET_DEVICE_NAME_FAILED,
"cannot get device "
"name (peer:%s me:%s)",
this->peerinfo.identifier, this->myinfo.identifier);
goto out;
}
priv->device = gf_rdma_get_device(this, cm_id->verbs, device_name);
if (priv->device == NULL) {
gf_msg(listener->name, GF_LOG_WARNING, 0, RDMA_MSG_GET_IB_DEVICE_FAILED,
"cannot get infiniband"
" device %s (peer:%s me:%s)",
device_name, this->peerinfo.identifier, this->myinfo.identifier);
goto out;
}
priv->peer.send_count = options->send_count;
priv->peer.recv_count = options->recv_count;
priv->peer.send_size = options->send_size;
priv->peer.recv_size = options->recv_size;
priv->peer.cm_id = cm_id;
INIT_LIST_HEAD(&priv->peer.ioq);
pthread_mutex_init(&priv->write_mutex, NULL);
pthread_mutex_init(&priv->recv_mutex, NULL);
cm_id->context = this;
new = rpc_transport_ref(this);
this = NULL;
out:
if (this != NULL) {
if (this->private != NULL) {
GF_FREE(this->private);
}
if (this->name != NULL) {
GF_FREE(this->name);
}
GF_FREE(this);
}
return new;
}
static int
gf_rdma_cm_handle_connect_request(struct rdma_cm_event *event)
{
int ret = -1;
rpc_transport_t *this = NULL, *listener = NULL;
struct rdma_cm_id *child_cm_id = NULL, *listener_cm_id = NULL;
struct rdma_conn_param conn_param = {
0,
};
gf_rdma_private_t *priv = NULL;
gf_rdma_options_t *options = NULL;
child_cm_id = event->id;
listener_cm_id = event->listen_id;
listener = listener_cm_id->context;
priv = listener->private;
options = &priv->options;
this = gf_rdma_transport_new(listener, child_cm_id);
if (this == NULL) {
gf_msg(listener->name, GF_LOG_WARNING, 0,
RDMA_MSG_CREAT_INC_TRANS_FAILED,
"could not create "
"a transport for incoming connection"
" (me.name:%s me.identifier:%s)",
listener->name, listener->myinfo.identifier);
rdma_destroy_id(child_cm_id);
goto out;
}
gf_msg_trace(listener->name, 0,
"got a connect request (me:%s peer:"
"%s)",
listener->myinfo.identifier, this->peerinfo.identifier);
ret = gf_rdma_create_qp(this);
if (ret < 0) {
gf_msg(listener->name, GF_LOG_WARNING, 0, RDMA_MSG_CREAT_QP_FAILED,
"could not create QP "
"(peer:%s me:%s)",
this->peerinfo.identifier, this->myinfo.identifier);
gf_rdma_cm_handle_disconnect(this);
goto out;
}
conn_param.responder_resources = 1;
conn_param.initiator_depth = 1;
conn_param.retry_count = options->attr_retry_cnt;
conn_param.rnr_retry_count = options->attr_rnr_retry;
ret = rdma_accept(child_cm_id, &conn_param);
if (ret < 0) {
gf_msg(listener->name, GF_LOG_WARNING, errno, RDMA_MSG_ACCEPT_FAILED,
"rdma_accept failed peer:%s "
"me:%s",
this->peerinfo.identifier, this->myinfo.identifier);
gf_rdma_cm_handle_disconnect(this);
goto out;
}
gf_rdma_cm_handle_connect_init(event);
ret = 0;
out:
return ret;
}
static int
gf_rdma_cm_handle_route_resolved(struct rdma_cm_event *event)
{
struct rdma_conn_param conn_param = {
0,
};
int ret = 0;
rpc_transport_t *this = NULL;
gf_rdma_private_t *priv = NULL;
gf_rdma_peer_t *peer = NULL;
gf_rdma_options_t *options = NULL;
if (event == NULL) {
goto out;
}
this = event->id->context;
priv = this->private;
peer = &priv->peer;
options = &priv->options;
ret = gf_rdma_create_qp(this);
if (ret != 0) {
gf_msg(this->name, GF_LOG_WARNING, 0, RDMA_MSG_CREAT_QP_FAILED,
"could not create QP "
"(peer:%s me:%s)",
this->peerinfo.identifier, this->myinfo.identifier);
gf_rdma_cm_handle_disconnect(this);
goto out;
}
memset(&conn_param, 0, sizeof conn_param);
conn_param.responder_resources = 1;
conn_param.initiator_depth = 1;
conn_param.retry_count = options->attr_retry_cnt;
conn_param.rnr_retry_count = options->attr_rnr_retry;
ret = rdma_connect(peer->cm_id, &conn_param);
if (ret != 0) {
gf_msg(this->name, GF_LOG_WARNING, errno, RDMA_MSG_CONNECT_FAILED,
"rdma_connect failed");
gf_rdma_cm_handle_disconnect(this);
goto out;
}
gf_msg_trace(this->name, 0, "route resolved (me:%s peer:%s)",
this->myinfo.identifier, this->peerinfo.identifier);
ret = 0;
out:
return ret;
}
static int
gf_rdma_cm_handle_addr_resolved(struct rdma_cm_event *event)
{
rpc_transport_t *this = NULL;
gf_rdma_peer_t *peer = NULL;
gf_rdma_private_t *priv = NULL;
int ret = 0;
this = event->id->context;
priv = this->private;
peer = &priv->peer;
GF_ASSERT(peer->cm_id == event->id);
this->myinfo.sockaddr_len = sizeof(peer->cm_id->route.addr.src_addr);
memcpy(&this->myinfo.sockaddr, &peer->cm_id->route.addr.src_addr,
this->myinfo.sockaddr_len);
this->peerinfo.sockaddr_len = sizeof(peer->cm_id->route.addr.dst_addr);
memcpy(&this->peerinfo.sockaddr, &peer->cm_id->route.addr.dst_addr,
this->peerinfo.sockaddr_len);
gf_rdma_get_transport_identifiers(this);
ret = rdma_resolve_route(peer->cm_id, 2000);
if (ret != 0) {
gf_msg(this->name, GF_LOG_WARNING, errno, RDMA_MSG_ROUTE_RESOLVE_FAILED,
"rdma_resolve_route "
"failed (me:%s peer:%s)",
this->myinfo.identifier, this->peerinfo.identifier);
gf_rdma_cm_handle_disconnect(this);
return ret;
}
gf_msg_trace(this->name, 0, "Address resolved (me:%s peer:%s)",
this->myinfo.identifier, this->peerinfo.identifier);
return ret;
}
static void
gf_rdma_cm_handle_disconnect(rpc_transport_t *this)
{
gf_rdma_private_t *priv = NULL;
char need_unref = 0;
priv = this->private;
gf_msg_debug(this->name, 0, "peer disconnected, cleaning up");
pthread_mutex_lock(&priv->write_mutex);
{
if (priv->peer.cm_id != NULL) {
need_unref = 1;
priv->connected = 0;
}
__gf_rdma_teardown(this);
}
pthread_mutex_unlock(&priv->write_mutex);
rpc_transport_notify(this, RPC_TRANSPORT_DISCONNECT, this);
if (need_unref)
rpc_transport_unref(this);
}
static int
gf_rdma_cm_handle_connect_init(struct rdma_cm_event *event)
{
rpc_transport_t *this = NULL;
gf_rdma_private_t *priv = NULL;
struct rdma_cm_id *cm_id = NULL;
int ret = 0;
cm_id = event->id;
this = cm_id->context;
priv = this->private;
if (priv->connected == 1) {
gf_msg_trace(this->name, 0,
"received event "
"RDMA_CM_EVENT_ESTABLISHED (me:%s peer:%s)",
this->myinfo.identifier, this->peerinfo.identifier);
return ret;
}
priv->connected = 1;
pthread_mutex_lock(&priv->write_mutex);
{
priv->peer.quota = 1;
priv->peer.quota_set = 0;
}
pthread_mutex_unlock(&priv->write_mutex);
if (priv->entity == GF_RDMA_CLIENT) {
gf_msg_trace(this->name, 0,
"received event "
"RDMA_CM_EVENT_ESTABLISHED (me:%s peer:%s)",
this->myinfo.identifier, this->peerinfo.identifier);
ret = rpc_transport_notify(this, RPC_TRANSPORT_CONNECT, this);
} else if (priv->entity == GF_RDMA_SERVER) {
ret = rpc_transport_notify(priv->listener, RPC_TRANSPORT_ACCEPT, this);
}
if (ret < 0) {
gf_rdma_disconnect(this, _gf_false);
}
return ret;
}
static int
gf_rdma_cm_handle_event_error(rpc_transport_t *this)
{
gf_rdma_private_t *priv = NULL;
priv = this->private;
if (priv->entity != GF_RDMA_SERVER_LISTENER) {
gf_rdma_cm_handle_disconnect(this);
}
return 0;
}
static int
gf_rdma_cm_handle_device_removal(struct rdma_cm_event *event)
{
return 0;
}
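/* Per event-channel thread: block in rdma_get_cm_event() and dispatch
 * connection-manager events to the handlers above; every event is
 * acknowledged before the next one is fetched.
 */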
static void *
gf_rdma_cm_event_handler(void *data)
{
struct rdma_cm_event *event = NULL;
int ret = 0;
rpc_transport_t *this = NULL;
struct rdma_event_channel *event_channel = NULL;
event_channel = data;
while (1) {
ret = rdma_get_cm_event(event_channel, &event);
if (ret != 0) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, errno,
RDMA_MSG_CM_EVENT_FAILED, "rdma_cm_get_event failed");
break;
}
switch (event->event) {
case RDMA_CM_EVENT_ADDR_RESOLVED:
gf_rdma_cm_handle_addr_resolved(event);
break;
case RDMA_CM_EVENT_ROUTE_RESOLVED:
gf_rdma_cm_handle_route_resolved(event);
break;
case RDMA_CM_EVENT_CONNECT_REQUEST:
gf_rdma_cm_handle_connect_request(event);
break;
case RDMA_CM_EVENT_ESTABLISHED:
gf_rdma_cm_handle_connect_init(event);
break;
case RDMA_CM_EVENT_ADDR_ERROR:
case RDMA_CM_EVENT_ROUTE_ERROR:
case RDMA_CM_EVENT_CONNECT_ERROR:
case RDMA_CM_EVENT_UNREACHABLE:
case RDMA_CM_EVENT_REJECTED:
this = event->id->context;
gf_msg(this->name, GF_LOG_WARNING, 0, RDMA_MSG_CM_EVENT_FAILED,
"cma event %s, "
"error %d (me:%s peer:%s)\n",
rdma_event_str(event->event), event->status,
this->myinfo.identifier, this->peerinfo.identifier);
rdma_ack_cm_event(event);
event = NULL;
gf_rdma_cm_handle_event_error(this);
continue;
case RDMA_CM_EVENT_DISCONNECTED:
this = event->id->context;
gf_msg_debug(this->name, 0,
"received disconnect "
"(me:%s peer:%s)\n",
this->myinfo.identifier,
this->peerinfo.identifier);
rdma_ack_cm_event(event);
event = NULL;
gf_rdma_cm_handle_disconnect(this);
continue;
case RDMA_CM_EVENT_DEVICE_REMOVAL:
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_CM_EVENT_FAILED,
"device "
"removed");
gf_rdma_cm_handle_device_removal(event);
break;
default:
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_CM_EVENT_FAILED,
"unhandled event: %s, ignoring",
rdma_event_str(event->event));
break;
}
rdma_ack_cm_event(event);
}
return NULL;
}
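/* Post 'len' bytes of the post's registered buffer as a signalled
 * IBV_WR_SEND on the queue pair. Returns 0 on success or an errno-style
 * value (EINVAL if the QP is already gone).
 */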
static int32_t
gf_rdma_post_send(struct ibv_qp *qp, gf_rdma_post_t *post, int32_t len)
{
struct ibv_sge list = {.addr = (unsigned long)post->buf,
.length = len,
.lkey = post->mr->lkey};
struct ibv_send_wr wr =
{
.wr_id = (unsigned long)post,
.sg_list = &list,
.num_sge = 1,
.opcode = IBV_WR_SEND,
.send_flags = IBV_SEND_SIGNALED,
},
*bad_wr;
if (!qp)
return EINVAL;
return ibv_post_send(qp, &wr, &bad_wr);
}
int
__gf_rdma_encode_error(gf_rdma_peer_t *peer, gf_rdma_reply_info_t *reply_info,
struct iovec *rpchdr, gf_rdma_header_t *hdr,
gf_rdma_errcode_t err)
{
struct rpc_msg *rpc_msg = NULL;
if (reply_info != NULL) {
hdr->rm_xid = hton32(reply_info->rm_xid);
} else {
rpc_msg = rpchdr[0].iov_base; /* assume rpchdr contains
* only one vector.
* (which is true)
*/
hdr->rm_xid = rpc_msg->rm_xid;
}
hdr->rm_vers = hton32(GF_RDMA_VERSION);
hdr->rm_credit = hton32(peer->send_count);
hdr->rm_type = hton32(GF_RDMA_ERROR);
hdr->rm_body.rm_error.rm_type = hton32(err);
if (err == ERR_VERS) {
hdr->rm_body.rm_error.rm_version.gf_rdma_vers_low = hton32(
GF_RDMA_VERSION);
hdr->rm_body.rm_error.rm_version.gf_rdma_vers_high = hton32(
GF_RDMA_VERSION);
}
return sizeof(*hdr);
}
int32_t
__gf_rdma_send_error(gf_rdma_peer_t *peer, gf_rdma_ioq_t *entry,
gf_rdma_post_t *post, gf_rdma_reply_info_t *reply_info,
gf_rdma_errcode_t err)
{
int32_t ret = -1, len = 0;
len = __gf_rdma_encode_error(peer, reply_info, entry->rpchdr,
(gf_rdma_header_t *)post->buf, err);
if (len == -1) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_ERROR, 0, RDMA_MSG_ENCODE_ERROR,
"encode error returned -1");
goto out;
}
gf_rdma_post_ref(post);
ret = gf_rdma_post_send(peer->qp, post, len);
if (!ret) {
ret = len;
} else {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0, RDMA_MSG_POST_SEND_FAILED,
"gf_rdma_post_send (to %s) failed with ret = %d (%s)",
peer->trans->peerinfo.identifier, ret,
(ret > 0) ? strerror(ret) : "");
gf_rdma_post_unref(post);
__gf_rdma_disconnect(peer->trans);
ret = -1;
}
out:
return ret;
}
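/* Encode one read chunk per iovec at *readch_ptr. Each chunk carries
 * the rkey, length and address of a buffer registered (or
 * pre-registered) for remote reads; *pos tracks the XDR position the
 * chunk belongs to and is advanced past each vector.
 */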
int32_t
__gf_rdma_create_read_chunks_from_vector(gf_rdma_peer_t *peer,
gf_rdma_read_chunk_t **readch_ptr,
int32_t *pos, struct iovec *vector,
int count,
gf_rdma_request_context_t *request_ctx)
{
int i = 0;
gf_rdma_private_t *priv = NULL;
gf_rdma_device_t *device = NULL;
struct ibv_mr *mr = NULL;
gf_rdma_read_chunk_t *readch = NULL;
int32_t ret = -1;
GF_VALIDATE_OR_GOTO(GF_RDMA_LOG_NAME, peer, out);
GF_VALIDATE_OR_GOTO(GF_RDMA_LOG_NAME, readch_ptr, out);
GF_VALIDATE_OR_GOTO(GF_RDMA_LOG_NAME, *readch_ptr, out);
GF_VALIDATE_OR_GOTO(GF_RDMA_LOG_NAME, request_ctx, out);
GF_VALIDATE_OR_GOTO(GF_RDMA_LOG_NAME, vector, out);
priv = peer->trans->private;
device = priv->device;
readch = *readch_ptr;
for (i = 0; i < count; i++) {
readch->rc_discrim = hton32(1);
readch->rc_position = hton32(*pos);
mr = gf_rdma_get_pre_registred_mr(
peer->trans, (void *)vector[i].iov_base, vector[i].iov_len);
if (!mr) {
mr = ibv_reg_mr(device->pd, vector[i].iov_base, vector[i].iov_len,
IBV_ACCESS_REMOTE_READ);
}
if (!mr) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, errno,
RDMA_MSG_MR_ALOC_FAILED,
"memory registration failed (peer:%s)",
peer->trans->peerinfo.identifier);
goto out;
}
request_ctx->mr[request_ctx->mr_count++] = mr;
readch->rc_target.rs_handle = hton32(mr->rkey);
readch->rc_target.rs_length = hton32(vector[i].iov_len);
readch->rc_target.rs_offset = hton64(
(uint64_t)(unsigned long)vector[i].iov_base);
*pos = *pos + vector[i].iov_len;
readch++;
}
*readch_ptr = readch;
ret = 0;
out:
return ret;
}
int32_t
__gf_rdma_create_read_chunks(gf_rdma_peer_t *peer, gf_rdma_ioq_t *entry,
gf_rdma_chunktype_t type, uint32_t **ptr,
gf_rdma_request_context_t *request_ctx)
{
int32_t ret = -1;
int pos = 0;
GF_VALIDATE_OR_GOTO(GF_RDMA_LOG_NAME, peer, out);
GF_VALIDATE_OR_GOTO(GF_RDMA_LOG_NAME, entry, out);
GF_VALIDATE_OR_GOTO(GF_RDMA_LOG_NAME, ptr, out);
GF_VALIDATE_OR_GOTO(GF_RDMA_LOG_NAME, *ptr, out);
GF_VALIDATE_OR_GOTO(GF_RDMA_LOG_NAME, request_ctx, out);
request_ctx->iobref = iobref_ref(entry->iobref);
if (type == gf_rdma_areadch) {
pos = 0;
ret = __gf_rdma_create_read_chunks_from_vector(
peer, (gf_rdma_read_chunk_t **)ptr, &pos, entry->rpchdr,
entry->rpchdr_count, request_ctx);
if (ret == -1) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_READ_CHUNK_VECTOR_FAILED,
"cannot create read chunks from vector "
"entry->rpchdr");
goto out;
}
ret = __gf_rdma_create_read_chunks_from_vector(
peer, (gf_rdma_read_chunk_t **)ptr, &pos, entry->proghdr,
entry->proghdr_count, request_ctx);
if (ret == -1) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_READ_CHUNK_VECTOR_FAILED,
"cannot create read chunks from vector "
"entry->proghdr");
}
if (entry->prog_payload_count != 0) {
ret = __gf_rdma_create_read_chunks_from_vector(
peer, (gf_rdma_read_chunk_t **)ptr, &pos, entry->prog_payload,
entry->prog_payload_count, request_ctx);
if (ret == -1) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_READ_CHUNK_VECTOR_FAILED,
"cannot create read chunks from vector"
" entry->prog_payload");
}
}
} else {
pos = iov_length(entry->rpchdr, entry->rpchdr_count);
ret = __gf_rdma_create_read_chunks_from_vector(
peer, (gf_rdma_read_chunk_t **)ptr, &pos, entry->prog_payload,
entry->prog_payload_count, request_ctx);
if (ret == -1) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_READ_CHUNK_VECTOR_FAILED,
"cannot create read chunks from vector "
"entry->prog_payload");
}
}
/* terminate read-chunk list*/
**ptr = 0;
*ptr = *ptr + 1;
out:
return ret;
}
int32_t
__gf_rdma_create_write_chunks_from_vector(
gf_rdma_peer_t *peer, gf_rdma_write_chunk_t **writech_ptr,
struct iovec *vector, int count, gf_rdma_request_context_t *request_ctx)
{
int i = 0;
gf_rdma_private_t *priv = NULL;
gf_rdma_device_t *device = NULL;
struct ibv_mr *mr = NULL;
gf_rdma_write_chunk_t *writech = NULL;
int32_t ret = -1;
GF_VALIDATE_OR_GOTO(GF_RDMA_LOG_NAME, peer, out);
GF_VALIDATE_OR_GOTO(GF_RDMA_LOG_NAME, writech_ptr, out);
GF_VALIDATE_OR_GOTO(GF_RDMA_LOG_NAME, *writech_ptr, out);
GF_VALIDATE_OR_GOTO(GF_RDMA_LOG_NAME, request_ctx, out);
GF_VALIDATE_OR_GOTO(GF_RDMA_LOG_NAME, vector, out);
writech = *writech_ptr;
priv = peer->trans->private;
device = priv->device;
for (i = 0; i < count; i++) {
mr = gf_rdma_get_pre_registred_mr(
peer->trans, (void *)vector[i].iov_base, vector[i].iov_len);
if (!mr) {
mr = ibv_reg_mr(device->pd, vector[i].iov_base, vector[i].iov_len,
IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
}
if (!mr) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, errno,
RDMA_MSG_MR_ALOC_FAILED,
"memory "
"registration failed (peer:%s)",
peer->trans->peerinfo.identifier);
goto out;
}
request_ctx->mr[request_ctx->mr_count++] = mr;
writech->wc_target.rs_handle = hton32(mr->rkey);
writech->wc_target.rs_length = hton32(vector[i].iov_len);
writech->wc_target.rs_offset = hton64(
((uint64_t)(unsigned long)vector[i].iov_base));
writech++;
}
*writech_ptr = writech;
ret = 0;
out:
return ret;
}
int32_t
__gf_rdma_create_write_chunks(gf_rdma_peer_t *peer, gf_rdma_ioq_t *entry,
gf_rdma_chunktype_t chunk_type, uint32_t **ptr,
gf_rdma_request_context_t *request_ctx)
{
int32_t ret = -1;
gf_rdma_write_array_t *warray = NULL;
GF_VALIDATE_OR_GOTO(GF_RDMA_LOG_NAME, peer, out);
GF_VALIDATE_OR_GOTO(GF_RDMA_LOG_NAME, ptr, out);
GF_VALIDATE_OR_GOTO(GF_RDMA_LOG_NAME, *ptr, out);
GF_VALIDATE_OR_GOTO(GF_RDMA_LOG_NAME, request_ctx, out);
GF_VALIDATE_OR_GOTO(GF_RDMA_LOG_NAME, entry, out);
if ((chunk_type == gf_rdma_replych) &&
((entry->msg.request.rsphdr_count != 1) ||
(entry->msg.request.rsphdr_vec[0].iov_base == NULL))) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0, RDMA_MSG_BUFFER_ERROR,
(entry->msg.request.rsphdr_count == 1)
? "chunktype specified as reply chunk but the vector "
"specifying the buffer to be used for holding reply"
" header is not correct"
: "chunktype specified as reply chunk, but more than one "
"buffer provided for holding reply");
goto out;
}
/*
if ((chunk_type == gf_rdma_writech)
&& ((entry->msg.request.rsphdr_count == 0)
|| (entry->msg.request.rsphdr_vec[0].iov_base == NULL))) {
gf_msg_debug (GF_RDMA_LOG_NAME, 0,
"vector specifying buffer to hold the program's reply "
"header should also be provided when buffers are "
"provided for holding the program's payload in reply");
goto out;
}
*/
if (chunk_type == gf_rdma_writech) {
warray = (gf_rdma_write_array_t *)*ptr;
warray->wc_discrim = hton32(1);
warray->wc_nchunks = hton32(entry->msg.request.rsp_payload_count);
*ptr = (uint32_t *)&warray->wc_array[0];
ret = __gf_rdma_create_write_chunks_from_vector(
peer, (gf_rdma_write_chunk_t **)ptr, entry->msg.request.rsp_payload,
entry->msg.request.rsp_payload_count, request_ctx);
if (ret == -1) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_WRITE_CHUNK_VECTOR_FAILED,
"cannot create write chunks from vector "
"entry->rpc_payload");
goto out;
}
/* terminate write chunklist */
**ptr = 0;
*ptr = *ptr + 1;
/* no reply chunklist */
**ptr = 0;
*ptr = *ptr + 1;
} else {
/* no write chunklist */
**ptr = 0;
*ptr = *ptr + 1;
warray = (gf_rdma_write_array_t *)*ptr;
warray->wc_discrim = hton32(1);
warray->wc_nchunks = hton32(entry->msg.request.rsphdr_count);
*ptr = (uint32_t *)&warray->wc_array[0];
ret = __gf_rdma_create_write_chunks_from_vector(
peer, (gf_rdma_write_chunk_t **)ptr, entry->msg.request.rsphdr_vec,
entry->msg.request.rsphdr_count, request_ctx);
if (ret == -1) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_WRITE_CHUNK_VECTOR_FAILED,
"cannot create write chunks from vector "
"entry->rpchdr");
goto out;
}
/* terminate reply chunklist */
**ptr = 0;
*ptr = *ptr + 1;
}
out:
return ret;
}
static void
__gf_rdma_deregister_mr(gf_rdma_device_t *device, struct ibv_mr **mr, int count)
{
gf_rdma_arena_mr *tmp = NULL;
gf_rdma_arena_mr *dummy = NULL;
int i = 0;
int found = 0;
if (mr == NULL) {
goto out;
}
for (i = 0; i < count; i++) {
found = 0;
pthread_mutex_lock(&device->all_mr_lock);
{
if (!list_empty(&device->all_mr)) {
list_for_each_entry_safe(tmp, dummy, &device->all_mr, list)
{
if (tmp->mr == mr[i]) {
found = 1;
break;
}
}
}
}
pthread_mutex_unlock(&device->all_mr_lock);
if (!found)
ibv_dereg_mr(mr[i]);
}
out:
return;
}
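/* Return one unit of send quota to the peer and, if entries are queued
 * waiting for quota, try to churn the ioq. Callers hold
 * priv->write_mutex.
 */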
static int32_t
__gf_rdma_quota_put(gf_rdma_peer_t *peer)
{
int32_t ret = 0;
peer->quota++;
ret = peer->quota;
if (!list_empty(&peer->ioq)) {
ret = __gf_rdma_ioq_churn(peer);
}
return ret;
}
static int32_t
gf_rdma_quota_put(gf_rdma_peer_t *peer)
{
int32_t ret = 0;
gf_rdma_private_t *priv = NULL;
priv = peer->trans->private;
pthread_mutex_lock(&priv->write_mutex);
{
ret = __gf_rdma_quota_put(peer);
}
pthread_mutex_unlock(&priv->write_mutex);
return ret;
}
/* to be called with priv->write_mutex held */
void
__gf_rdma_request_context_destroy(gf_rdma_request_context_t *context)
{
gf_rdma_peer_t *peer = NULL;
gf_rdma_private_t *priv = NULL;
gf_rdma_device_t *device = NULL;
int32_t ret = 0;
if (context == NULL) {
goto out;
}
peer = context->peer;
priv = peer->trans->private;
device = priv->device;
__gf_rdma_deregister_mr(device, context->mr, context->mr_count);
if (priv->connected) {
ret = __gf_rdma_quota_put(peer);
if (ret < 0) {
gf_msg_debug("rdma", 0, "failed to send message");
mem_put(context);
__gf_rdma_disconnect(peer->trans);
goto out;
}
}
if (context->iobref != NULL) {
iobref_unref(context->iobref);
context->iobref = NULL;
}
if (context->rsp_iobref != NULL) {
iobref_unref(context->rsp_iobref);
context->rsp_iobref = NULL;
}
mem_put(context);
out:
return;
}
void
gf_rdma_post_context_destroy(gf_rdma_device_t *device,
gf_rdma_post_context_t *ctx)
{
if (ctx == NULL) {
goto out;
}
__gf_rdma_deregister_mr(device, ctx->mr, ctx->mr_count);
if (ctx->iobref != NULL) {
iobref_unref(ctx->iobref);
}
if (ctx->hdr_iobuf != NULL) {
iobuf_unref(ctx->hdr_iobuf);
}
memset(ctx, 0, sizeof(*ctx));
out:
return;
}
int
gf_rdma_post_unref(gf_rdma_post_t *post)
{
int refcount = -1;
if (post == NULL) {
goto out;
}
pthread_mutex_lock(&post->lock);
{
refcount = --post->refcount;
}
pthread_mutex_unlock(&post->lock);
if (refcount == 0) {
gf_rdma_post_context_destroy(post->device, &post->ctx);
if (post->type == GF_RDMA_SEND_POST) {
gf_rdma_put_post(&post->device->sendq, post);
} else {
gf_rdma_post_recv(post->device->srq, post);
}
}
out:
return refcount;
}
int
gf_rdma_post_get_refcount(gf_rdma_post_t *post)
{
int refcount = -1;
if (post == NULL) {
goto out;
}
pthread_mutex_lock(&post->lock);
{
refcount = post->refcount;
}
pthread_mutex_unlock(&post->lock);
out:
return refcount;
}
gf_rdma_post_t *
gf_rdma_post_ref(gf_rdma_post_t *post)
{
if (post == NULL) {
goto out;
}
pthread_mutex_lock(&post->lock);
{
post->refcount++;
}
pthread_mutex_unlock(&post->lock);
out:
return post;
}
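/* Encode and transmit one queued request. Based on the message sizes it
 * picks between sending everything inline, sending the payload through
 * read chunks, or sending the whole message via read chunks
 * (GF_RDMA_NOMSG), and advertises write or reply chunks when a large
 * response is expected; the rdma header and any inline parts are copied
 * into the post buffer and posted as a send.
 */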
int32_t
__gf_rdma_ioq_churn_request(gf_rdma_peer_t *peer, gf_rdma_ioq_t *entry,
gf_rdma_post_t *post)
{
gf_rdma_chunktype_t rtype = gf_rdma_noch;
gf_rdma_chunktype_t wtype = gf_rdma_noch;
uint64_t send_size = 0;
gf_rdma_header_t *hdr = NULL;
struct rpc_msg *rpc_msg = NULL;
uint32_t *chunkptr = NULL;
char *buf = NULL;
int32_t ret = 0;
gf_rdma_private_t *priv = NULL;
gf_rdma_device_t *device = NULL;
int chunk_count = 0;
gf_rdma_request_context_t *request_ctx = NULL;
uint32_t prog_payload_length = 0, len = 0;
struct rpc_req *rpc_req = NULL;
GF_VALIDATE_OR_GOTO(GF_RDMA_LOG_NAME, peer, out);
GF_VALIDATE_OR_GOTO(GF_RDMA_LOG_NAME, entry, out);
GF_VALIDATE_OR_GOTO(GF_RDMA_LOG_NAME, post, out);
if ((entry->msg.request.rsphdr_count != 0) &&
(entry->msg.request.rsp_payload_count != 0)) {
ret = -1;
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_WRITE_REPLY_CHUNCK_CONFLICT,
"both write-chunklist and reply-chunk cannot be "
"present");
goto out;
}
post->ctx.is_request = 1;
priv = peer->trans->private;
device = priv->device;
hdr = (gf_rdma_header_t *)post->buf;
send_size = iov_length(entry->rpchdr, entry->rpchdr_count) +
iov_length(entry->proghdr, entry->proghdr_count) +
GLUSTERFS_RDMA_MAX_HEADER_SIZE;
if (entry->prog_payload_count != 0) {
prog_payload_length = iov_length(entry->prog_payload,
entry->prog_payload_count);
}
if (send_size > GLUSTERFS_RDMA_INLINE_THRESHOLD) {
rtype = gf_rdma_areadch;
} else if ((send_size + prog_payload_length) <
GLUSTERFS_RDMA_INLINE_THRESHOLD) {
rtype = gf_rdma_noch;
} else if (entry->prog_payload_count != 0) {
rtype = gf_rdma_readch;
}
if (entry->msg.request.rsphdr_count != 0) {
wtype = gf_rdma_replych;
} else if (entry->msg.request.rsp_payload_count != 0) {
wtype = gf_rdma_writech;
}
if (rtype == gf_rdma_readch) {
chunk_count += entry->prog_payload_count;
} else if (rtype == gf_rdma_areadch) {
chunk_count += entry->rpchdr_count;
chunk_count += entry->proghdr_count;
}
if (wtype == gf_rdma_writech) {
chunk_count += entry->msg.request.rsp_payload_count;
} else if (wtype == gf_rdma_replych) {
chunk_count += entry->msg.request.rsphdr_count;
}
if (chunk_count > GF_RDMA_MAX_SEGMENTS) {
ret = -1;
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_CHUNK_COUNT_GREAT_MAX_SEGMENTS,
"chunk count(%d) exceeding maximum allowed RDMA "
"segment count(%d)",
chunk_count, GF_RDMA_MAX_SEGMENTS);
goto out;
}
if ((wtype != gf_rdma_noch) || (rtype != gf_rdma_noch)) {
request_ctx = mem_get(device->request_ctx_pool);
if (request_ctx == NULL) {
ret = -1;
goto out;
}
memset(request_ctx, 0, sizeof(*request_ctx));
request_ctx->pool = device->request_ctx_pool;
request_ctx->peer = peer;
entry->msg.request.rpc_req->conn_private = request_ctx;
if (entry->msg.request.rsp_iobref != NULL) {
request_ctx->rsp_iobref = iobref_ref(entry->msg.request.rsp_iobref);
}
}
rpc_msg = (struct rpc_msg *)entry->rpchdr[0].iov_base;
hdr->rm_xid = rpc_msg->rm_xid; /* no need of hton32(rpc_msg->rm_xid),
* since rpc_msg->rm_xid is already
* hton32ed value of actual xid
*/
hdr->rm_vers = hton32(GF_RDMA_VERSION);
hdr->rm_credit = hton32(peer->send_count);
if (rtype == gf_rdma_areadch) {
hdr->rm_type = hton32(GF_RDMA_NOMSG);
} else {
hdr->rm_type = hton32(GF_RDMA_MSG);
}
chunkptr = &hdr->rm_body.rm_chunks[0];
if (rtype != gf_rdma_noch) {
ret = __gf_rdma_create_read_chunks(peer, entry, rtype, &chunkptr,
request_ctx);
if (ret != 0) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_CREATE_READ_CHUNK_FAILED,
"creation of read chunks failed");
goto out;
}
} else {
*chunkptr++ = 0; /* no read chunks */
}
if (wtype != gf_rdma_noch) {
ret = __gf_rdma_create_write_chunks(peer, entry, wtype, &chunkptr,
request_ctx);
if (ret != 0) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_CREATE_WRITE_REPLAY_FAILED,
"creation of write/reply chunk failed");
goto out;
}
} else {
*chunkptr++ = 0; /* no write chunks */
*chunkptr++ = 0; /* no reply chunk */
}
buf = (char *)chunkptr;
if (rtype != gf_rdma_areadch) {
iov_unload(buf, entry->rpchdr, entry->rpchdr_count);
buf += iov_length(entry->rpchdr, entry->rpchdr_count);
iov_unload(buf, entry->proghdr, entry->proghdr_count);
buf += iov_length(entry->proghdr, entry->proghdr_count);
if (rtype != gf_rdma_readch) {
iov_unload(buf, entry->prog_payload, entry->prog_payload_count);
buf += iov_length(entry->prog_payload, entry->prog_payload_count);
}
}
len = buf - post->buf;
gf_rdma_post_ref(post);
ret = gf_rdma_post_send(peer->qp, post, len);
if (!ret) {
ret = len;
} else {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0, RDMA_MSG_POST_SEND_FAILED,
"gf_rdma_post_send (to %s) failed with ret = %d (%s)",
peer->trans->peerinfo.identifier, ret,
(ret > 0) ? strerror(ret) : "");
gf_rdma_post_unref(post);
__gf_rdma_disconnect(peer->trans);
ret = -1;
}
out:
if (ret == -1) {
rpc_req = entry->msg.request.rpc_req;
if (request_ctx != NULL) {
__gf_rdma_request_context_destroy(rpc_req->conn_private);
}
rpc_req->conn_private = NULL;
}
return ret;
}
static void
__gf_rdma_fill_reply_header(gf_rdma_header_t *header, struct iovec *rpchdr,
gf_rdma_reply_info_t *reply_info, int credits)
{
struct rpc_msg *rpc_msg = NULL;
if (reply_info != NULL) {
header->rm_xid = hton32(reply_info->rm_xid);
} else {
rpc_msg = rpchdr[0].iov_base; /* assume rpchdr contains
* only one vector.
* (which is true)
*/
header->rm_xid = rpc_msg->rm_xid;
}
header->rm_type = hton32(GF_RDMA_MSG);
header->rm_vers = hton32(GF_RDMA_VERSION);
header->rm_credit = hton32(credits);
header->rm_body.rm_chunks[0] = 0; /* no read chunks */
header->rm_body.rm_chunks[1] = 0; /* no write chunks */
header->rm_body.rm_chunks[2] = 0; /* no reply chunks */
return;
}
int32_t
__gf_rdma_send_reply_inline(gf_rdma_peer_t *peer, gf_rdma_ioq_t *entry,
gf_rdma_post_t *post,
gf_rdma_reply_info_t *reply_info)
{
gf_rdma_header_t *header = NULL;
int32_t send_size = 0, ret = 0;
char *buf = NULL;
send_size = iov_length(entry->rpchdr, entry->rpchdr_count) +
iov_length(entry->proghdr, entry->proghdr_count) +
iov_length(entry->prog_payload, entry->prog_payload_count) +
sizeof(gf_rdma_header_t); /*
* remember, no chunklists in the
* reply
*/
if (send_size > GLUSTERFS_RDMA_INLINE_THRESHOLD) {
ret = __gf_rdma_send_error(peer, entry, post, reply_info, ERR_CHUNK);
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_SEND_SIZE_GREAT_INLINE_THRESHOLD,
"msg size (%d) is greater than maximum size "
"of msg that can be sent inlined (%d)",
send_size, GLUSTERFS_RDMA_INLINE_THRESHOLD);
goto out;
}
header = (gf_rdma_header_t *)post->buf;
__gf_rdma_fill_reply_header(header, entry->rpchdr, reply_info,
peer->send_count);
buf = (char *)&header->rm_body.rm_chunks[3];
if (entry->rpchdr_count != 0) {
iov_unload(buf, entry->rpchdr, entry->rpchdr_count);
buf += iov_length(entry->rpchdr, entry->rpchdr_count);
}
if (entry->proghdr_count != 0) {
iov_unload(buf, entry->proghdr, entry->proghdr_count);
buf += iov_length(entry->proghdr, entry->proghdr_count);
}
if (entry->prog_payload_count != 0) {
iov_unload(buf, entry->prog_payload, entry->prog_payload_count);
buf += iov_length(entry->prog_payload, entry->prog_payload_count);
}
gf_rdma_post_ref(post);
ret = gf_rdma_post_send(peer->qp, post, (buf - post->buf));
if (!ret) {
ret = send_size;
} else {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0, RDMA_MSG_POST_SEND_FAILED,
"posting send (to %s) "
"failed with ret = %d (%s)",
peer->trans->peerinfo.identifier, ret,
(ret > 0) ? strerror(ret) : "");
gf_rdma_post_unref(post);
__gf_rdma_disconnect(peer->trans);
ret = -1;
}
out:
return ret;
}
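/* Fill the write-chunk list of a reply header from the chunks the
 * client advertised in its request, trimming each chunk's length to
 * what is actually written. Fails when the advertised chunks cannot
 * hold the whole payload.
 */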
int32_t
__gf_rdma_reply_encode_write_chunks(gf_rdma_peer_t *peer, uint32_t payload_size,
gf_rdma_post_t *post,
gf_rdma_reply_info_t *reply_info,
uint32_t **ptr)
{
uint32_t chunk_size = 0;
int32_t ret = -1;
gf_rdma_write_array_t *target_array = NULL;
int i = 0;
target_array = (gf_rdma_write_array_t *)*ptr;
for (i = 0; i < reply_info->wc_array->wc_nchunks; i++) {
chunk_size += reply_info->wc_array->wc_array[i].wc_target.rs_length;
}
if (chunk_size < payload_size) {
gf_msg_debug(GF_RDMA_LOG_NAME, 0,
"length of payload (%d) is "
"exceeding the total write chunk length (%d)",
payload_size, chunk_size);
goto out;
}
target_array->wc_discrim = hton32(1);
for (i = 0; (i < reply_info->wc_array->wc_nchunks) && (payload_size != 0);
i++) {
target_array->wc_array[i].wc_target.rs_offset = hton64(
reply_info->wc_array->wc_array[i].wc_target.rs_offset);
target_array->wc_array[i].wc_target.rs_length = hton32(
min(payload_size,
reply_info->wc_array->wc_array[i].wc_target.rs_length));
}
target_array->wc_nchunks = hton32(i);
target_array->wc_array[i].wc_target.rs_handle = 0; /* terminate
chunklist */
ret = 0;
*ptr = &target_array->wc_array[i].wc_target.rs_length;
out:
return ret;
}
static int32_t
__gf_rdma_register_local_mr_for_rdma(gf_rdma_peer_t *peer, struct iovec *vector,
int count, gf_rdma_post_context_t *ctx)
{
int i = 0;
int32_t ret = -1;
gf_rdma_private_t *priv = NULL;
gf_rdma_device_t *device = NULL;
GF_VALIDATE_OR_GOTO(GF_RDMA_LOG_NAME, ctx, out);
GF_VALIDATE_OR_GOTO(GF_RDMA_LOG_NAME, vector, out);
priv = peer->trans->private;
device = priv->device;
for (i = 0; i < count; i++) {
/* what if the memory is registered more than once?
* Assume that a single write buffer is passed to afr, which
* then passes it to its children. If more than one child
* happens to use rdma, then the buffer is registered more than
* once.
* Ib-verbs specification says that multiple registrations of
* same memory location is allowed. Refer to 10.6.3.8 of
* Infiniband Architecture Specification Volume 1
* (Release 1.2.1)
*/
ctx->mr[ctx->mr_count] = gf_rdma_get_pre_registred_mr(
peer->trans, (void *)vector[i].iov_base, vector[i].iov_len);
if (!ctx->mr[ctx->mr_count]) {
ctx->mr[ctx->mr_count] = ibv_reg_mr(device->pd, vector[i].iov_base,
vector[i].iov_len,
IBV_ACCESS_LOCAL_WRITE);
}
if (ctx->mr[ctx->mr_count] == NULL) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, errno,
RDMA_MSG_MR_ALOC_FAILED,
"registering memory for IBV_ACCESS_LOCAL_WRITE"
" failed");
goto out;
}
ctx->mr_count++;
}
ret = 0;
out:
return ret;
}
/* 1. assumes xfer_len bytes of data are pointed to by vector(s) starting from vec[*idx]
* 2. modifies vec
*/
int32_t
__gf_rdma_write(gf_rdma_peer_t *peer, gf_rdma_post_t *post, struct iovec *vec,
uint32_t xfer_len, int *idx, gf_rdma_write_chunk_t *writech)
{
int size = 0, num_sge = 0, i = 0;
int32_t ret = -1;
struct ibv_sge *sg_list = NULL;
struct ibv_send_wr wr =
{
.opcode = IBV_WR_RDMA_WRITE,
.send_flags = IBV_SEND_SIGNALED,
},
*bad_wr;
if ((peer == NULL) || (writech == NULL) || (idx == NULL) ||
(post == NULL) || (vec == NULL) || (xfer_len == 0)) {
goto out;
}
for (i = *idx; size < xfer_len; i++) {
size += vec[i].iov_len;
}
num_sge = i - *idx;
sg_list = GF_CALLOC(num_sge, sizeof(struct ibv_sge), gf_common_mt_sge);
if (sg_list == NULL) {
ret = -1;
goto out;
}
for ((i = *idx), (num_sge = 0); (xfer_len != 0); i++, num_sge++) {
size = min(xfer_len, vec[i].iov_len);
sg_list[num_sge].addr = (unsigned long)vec[i].iov_base;
sg_list[num_sge].length = size;
sg_list[num_sge].lkey = post->ctx.mr[i]->lkey;
xfer_len -= size;
}
*idx = i;
if (size < vec[i - 1].iov_len) {
vec[i - 1].iov_base += size;
vec[i - 1].iov_len -= size;
*idx = i - 1;
}
wr.sg_list = sg_list;
wr.num_sge = num_sge;
wr.wr_id = (unsigned long)gf_rdma_post_ref(post);
wr.wr.rdma.rkey = writech->wc_target.rs_handle;
wr.wr.rdma.remote_addr = writech->wc_target.rs_offset;
ret = ibv_post_send(peer->qp, &wr, &bad_wr);
if (ret) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0, RDMA_MSG_WRITE_CLIENT_ERROR,
"rdma write to "
"client (%s) failed with ret = %d (%s)",
peer->trans->peerinfo.identifier, ret,
(ret > 0) ? strerror(ret) : "");
ret = -1;
}
GF_FREE(sg_list);
out:
return ret;
}
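/* RDMA-write the reply payload into the client's advertised write
 * chunks: register the local buffers, then issue one RDMA write per
 * chunk until the payload is exhausted.
 */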
int32_t
__gf_rdma_do_gf_rdma_write(gf_rdma_peer_t *peer, gf_rdma_post_t *post,
struct iovec *vector, int count,
struct iobref *iobref,
gf_rdma_reply_info_t *reply_info)
{
int i = 0, payload_idx = 0;
uint32_t payload_size = 0, xfer_len = 0;
int32_t ret = -1;
if (count != 0) {
payload_size = iov_length(vector, count);
}
if (payload_size == 0) {
ret = 0;
goto out;
}
ret = __gf_rdma_register_local_mr_for_rdma(peer, vector, count, &post->ctx);
if (ret == -1) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0, RDMA_MSG_MR_ALOC_FAILED,
"registering memory region for rdma failed");
goto out;
}
post->ctx.iobref = iobref_ref(iobref);
for (i = 0; (i < reply_info->wc_array->wc_nchunks) && (payload_size != 0);
i++) {
xfer_len = min(payload_size,
reply_info->wc_array->wc_array[i].wc_target.rs_length);
ret = __gf_rdma_write(peer, post, vector, xfer_len, &payload_idx,
&reply_info->wc_array->wc_array[i]);
if (ret == -1) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_WRITE_CLIENT_ERROR,
"rdma write to "
"client (%s) failed",
peer->trans->peerinfo.identifier);
goto out;
}
payload_size -= xfer_len;
}
ret = 0;
out:
return ret;
}
int32_t
__gf_rdma_send_reply_type_nomsg(gf_rdma_peer_t *peer, gf_rdma_ioq_t *entry,
gf_rdma_post_t *post,
gf_rdma_reply_info_t *reply_info)
{
gf_rdma_header_t *header = NULL;
char *buf = NULL;
uint32_t payload_size = 0;
int count = 0, i = 0;
int32_t ret = 0;
struct iovec vector[MAX_IOVEC];
header = (gf_rdma_header_t *)post->buf;
__gf_rdma_fill_reply_header(header, entry->rpchdr, reply_info,
peer->send_count);
header->rm_type = hton32(GF_RDMA_NOMSG);
payload_size = iov_length(entry->rpchdr, entry->rpchdr_count) +
iov_length(entry->proghdr, entry->proghdr_count);
/* encode reply chunklist */
buf = (char *)&header->rm_body.rm_chunks[2];
ret = __gf_rdma_reply_encode_write_chunks(peer, payload_size, post,
reply_info, (uint32_t **)&buf);
if (ret == -1) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0, RDMA_MSG_ENCODE_ERROR,
"encoding write chunks failed");
ret = __gf_rdma_send_error(peer, entry, post, reply_info, ERR_CHUNK);
goto out;
}
gf_rdma_post_ref(post);
for (i = 0; i < entry->rpchdr_count; i++) {
vector[count++] = entry->rpchdr[i];
}
for (i = 0; i < entry->proghdr_count; i++) {
vector[count++] = entry->proghdr[i];
}
ret = __gf_rdma_do_gf_rdma_write(peer, post, vector, count, entry->iobref,
reply_info);
if (ret == -1) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0, RDMA_MSG_WRITE_PEER_FAILED,
"rdma write to peer "
"(%s) failed",
peer->trans->peerinfo.identifier);
gf_rdma_post_unref(post);
goto out;
}
ret = gf_rdma_post_send(peer->qp, post, (buf - post->buf));
if (ret) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0, RDMA_MSG_POST_SEND_FAILED,
"posting a send request "
"to client (%s) failed with ret = %d (%s)",
peer->trans->peerinfo.identifier, ret,
(ret > 0) ? strerror(ret) : "");
ret = -1;
gf_rdma_post_unref(post);
} else {
ret = payload_size;
}
out:
return ret;
}
int32_t
__gf_rdma_send_reply_type_msg(gf_rdma_peer_t *peer, gf_rdma_ioq_t *entry,
gf_rdma_post_t *post,
gf_rdma_reply_info_t *reply_info)
{
gf_rdma_header_t *header = NULL;
int32_t send_size = 0, ret = 0;
char *ptr = NULL;
uint32_t payload_size = 0;
send_size = iov_length(entry->rpchdr, entry->rpchdr_count) +
iov_length(entry->proghdr, entry->proghdr_count) +
GLUSTERFS_RDMA_MAX_HEADER_SIZE;
if (send_size > GLUSTERFS_RDMA_INLINE_THRESHOLD) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_SEND_SIZE_GREAT_INLINE_THRESHOLD,
"client has provided only write chunks, but the "
"combined size of rpc and program header (%d) is "
"exceeding the size of msg that can be sent using "
"RDMA send (%d)",
send_size, GLUSTERFS_RDMA_INLINE_THRESHOLD);
ret = __gf_rdma_send_error(peer, entry, post, reply_info, ERR_CHUNK);
goto out;
}
header = (gf_rdma_header_t *)post->buf;
__gf_rdma_fill_reply_header(header, entry->rpchdr, reply_info,
peer->send_count);
payload_size = iov_length(entry->prog_payload, entry->prog_payload_count);
ptr = (char *)&header->rm_body.rm_chunks[1];
ret = __gf_rdma_reply_encode_write_chunks(peer, payload_size, post,
reply_info, (uint32_t **)&ptr);
if (ret == -1) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0, RDMA_MSG_ENCODE_ERROR,
"encoding write chunks failed");
ret = __gf_rdma_send_error(peer, entry, post, reply_info, ERR_CHUNK);
goto out;
}
*(uint32_t *)ptr = 0; /* terminate reply chunklist */
ptr += sizeof(uint32_t);
gf_rdma_post_ref(post);
ret = __gf_rdma_do_gf_rdma_write(peer, post, entry->prog_payload,
entry->prog_payload_count, entry->iobref,
reply_info);
if (ret == -1) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0, RDMA_MSG_WRITE_PEER_FAILED,
"rdma write to peer "
"(%s) failed",
peer->trans->peerinfo.identifier);
gf_rdma_post_unref(post);
goto out;
}
iov_unload(ptr, entry->rpchdr, entry->rpchdr_count);
ptr += iov_length(entry->rpchdr, entry->rpchdr_count);
iov_unload(ptr, entry->proghdr, entry->proghdr_count);
ptr += iov_length(entry->proghdr, entry->proghdr_count);
ret = gf_rdma_post_send(peer->qp, post, (ptr - post->buf));
if (ret) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0, RDMA_MSG_SEND_CLIENT_ERROR,
"rdma send to client (%s) failed with ret = %d (%s)",
peer->trans->peerinfo.identifier, ret,
(ret > 0) ? strerror(ret) : "");
gf_rdma_post_unref(post);
ret = -1;
} else {
ret = send_size + payload_size;
}
out:
return ret;
}
void
gf_rdma_reply_info_destroy(gf_rdma_reply_info_t *reply_info)
{
if (reply_info == NULL) {
goto out;
}
if (reply_info->wc_array != NULL) {
GF_FREE(reply_info->wc_array);
reply_info->wc_array = NULL;
}
mem_put(reply_info);
out:
return;
}
gf_rdma_reply_info_t *
gf_rdma_reply_info_alloc(gf_rdma_peer_t *peer)
{
gf_rdma_reply_info_t *reply_info = NULL;
gf_rdma_private_t *priv = NULL;
priv = peer->trans->private;
reply_info = mem_get(priv->device->reply_info_pool);
if (reply_info == NULL) {
goto out;
}
memset(reply_info, 0, sizeof(*reply_info));
reply_info->pool = priv->device->reply_info_pool;
out:
return reply_info;
}
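/*
 * Dispatch a queued reply based on the chunk type the peer asked for:
 * a plain inline send (gf_rdma_noch), RDMA_NOMSG with a reply chunk
 * (gf_rdma_replych) or an inline header with RDMA-written payload
 * (gf_rdma_writech). reply_info is destroyed once the reply has been
 * handed to the appropriate sender.
 */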
int32_t
__gf_rdma_ioq_churn_reply(gf_rdma_peer_t *peer, gf_rdma_ioq_t *entry,
gf_rdma_post_t *post)
{
gf_rdma_reply_info_t *reply_info = NULL;
int32_t ret = -1;
gf_rdma_chunktype_t type = gf_rdma_noch;
GF_VALIDATE_OR_GOTO(GF_RDMA_LOG_NAME, peer, out);
GF_VALIDATE_OR_GOTO(GF_RDMA_LOG_NAME, entry, out);
GF_VALIDATE_OR_GOTO(GF_RDMA_LOG_NAME, post, out);
reply_info = entry->msg.reply_info;
if (reply_info != NULL) {
type = reply_info->type;
}
switch (type) {
case gf_rdma_noch:
ret = __gf_rdma_send_reply_inline(peer, entry, post, reply_info);
if (ret < 0) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_SEND_REPLY_FAILED,
"failed to send reply to peer (%s) as an "
"inlined rdma msg",
peer->trans->peerinfo.identifier);
}
break;
case gf_rdma_replych:
ret = __gf_rdma_send_reply_type_nomsg(peer, entry, post,
reply_info);
if (ret < 0) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_SEND_REPLY_FAILED,
"failed to send reply to peer (%s) as "
"RDMA_NOMSG",
peer->trans->peerinfo.identifier);
}
break;
case gf_rdma_writech:
ret = __gf_rdma_send_reply_type_msg(peer, entry, post, reply_info);
if (ret < 0) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_SEND_REPLY_FAILED,
"failed to send reply with write chunks "
"to peer (%s)",
peer->trans->peerinfo.identifier);
}
break;
default:
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_INVALID_CHUNK_TYPE,
"invalid chunktype (%d) specified for sending reply "
" (peer:%s)",
type, peer->trans->peerinfo.identifier);
break;
}
if (reply_info != NULL) {
gf_rdma_reply_info_destroy(reply_info);
}
out:
return ret;
}
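/*
 * Try to transmit a single ioq entry. A unit of send-quota is consumed
 * first; if none is available the entry stays queued and 0 is
 * returned. Otherwise a send post is taken from the device sendq (or a
 * new one is allocated) and the entry is processed through the request
 * or reply path; the entry is freed once churning returns non-zero.
 */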
int32_t
__gf_rdma_ioq_churn_entry(gf_rdma_peer_t *peer, gf_rdma_ioq_t *entry)
{
int32_t ret = 0, quota = 0;
gf_rdma_private_t *priv = NULL;
gf_rdma_device_t *device = NULL;
gf_rdma_options_t *options = NULL;
gf_rdma_post_t *post = NULL;
priv = peer->trans->private;
options = &priv->options;
device = priv->device;
quota = __gf_rdma_quota_get(peer);
if (quota > 0) {
post = gf_rdma_get_post(&device->sendq);
if (post == NULL) {
post = gf_rdma_new_post(peer->trans, device,
(options->send_size + 2048),
GF_RDMA_SEND_POST);
}
if (post == NULL) {
ret = -1;
gf_msg_callingfn(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_POST_SEND_FAILED,
"not able to get a post to send msg");
goto out;
}
if (entry->is_request) {
ret = __gf_rdma_ioq_churn_request(peer, entry, post);
if (ret < 0) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_PROC_IOQ_ENTRY_FAILED,
"failed to process request ioq entry "
"to peer(%s)",
peer->trans->peerinfo.identifier);
}
} else {
ret = __gf_rdma_ioq_churn_reply(peer, entry, post);
if (ret < 0) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_PROC_IOQ_ENTRY_FAILED,
"failed to process reply ioq entry "
"to peer (%s)",
peer->trans->peerinfo.identifier);
}
}
if (ret != 0) {
__gf_rdma_ioq_entry_free(entry);
}
} else {
ret = 0;
}
out:
return ret;
}
static int32_t
__gf_rdma_ioq_churn(gf_rdma_peer_t *peer)
{
gf_rdma_ioq_t *entry = NULL;
int32_t ret = 0;
while (!list_empty(&peer->ioq)) {
/* pick next entry */
entry = peer->ioq_next;
ret = __gf_rdma_ioq_churn_entry(peer, entry);
if (ret <= 0)
break;
}
/*
list_for_each_entry_safe (entry, dummy, &peer->ioq, list) {
ret = __gf_rdma_ioq_churn_entry (peer, entry);
if (ret <= 0) {
break;
}
}
*/
return ret;
}
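/*
 * Entry point for outgoing messages. Under write_mutex: if the ioq is
 * empty the entry is churned right away; it is appended to the ioq
 * only when other messages are already queued or when churning
 * returned 0 (for example, no send-quota was available).
 */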
static int32_t
gf_rdma_writev(rpc_transport_t *this, gf_rdma_ioq_t *entry)
{
int32_t ret = 0, need_append = 1;
gf_rdma_private_t *priv = NULL;
gf_rdma_peer_t *peer = NULL;
priv = this->private;
pthread_mutex_lock(&priv->write_mutex);
{
if (!priv->connected) {
gf_msg(this->name, GF_LOG_WARNING, 0, RDMA_MSG_PEER_DISCONNECTED,
"rdma is not connected to peer (%s)",
this->peerinfo.identifier);
ret = -1;
goto unlock;
}
peer = &priv->peer;
if (list_empty(&peer->ioq)) {
ret = __gf_rdma_ioq_churn_entry(peer, entry);
if (ret != 0) {
need_append = 0;
if (ret < 0) {
gf_msg(this->name, GF_LOG_WARNING, 0,
RDMA_MSG_PROC_IOQ_ENTRY_FAILED,
"processing ioq entry destined"
" to (%s) failed",
this->peerinfo.identifier);
}
}
}
if (need_append) {
list_add_tail(&entry->list, &peer->ioq);
}
}
unlock:
pthread_mutex_unlock(&priv->write_mutex);
return ret;
}
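/*
 * Build an ioq entry from a transport request or reply: the rpc-header,
 * program-header and program-payload iovecs are copied into the entry,
 * references are taken on the associated iobrefs, and for requests the
 * buffers meant to receive the response (rsphdr/rsp_payload) are
 * recorded as well.
 */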
gf_rdma_ioq_t *
gf_rdma_ioq_new(rpc_transport_t *this, rpc_transport_data_t *data)
{
gf_rdma_ioq_t *entry = NULL;
int count = 0, i = 0;
rpc_transport_msg_t *msg = NULL;
gf_rdma_private_t *priv = NULL;
if ((data == NULL) || (this == NULL)) {
goto out;
}
priv = this->private;
entry = mem_get(priv->device->ioq_pool);
if (entry == NULL) {
goto out;
}
memset(entry, 0, sizeof(*entry));
entry->pool = priv->device->ioq_pool;
if (data->is_request) {
msg = &data->data.req.msg;
if (data->data.req.rsp.rsphdr_count != 0) {
for (i = 0; i < data->data.req.rsp.rsphdr_count; i++) {
entry->msg.request.rsphdr_vec[i] = data->data.req.rsp.rsphdr[i];
}
entry->msg.request.rsphdr_count = data->data.req.rsp.rsphdr_count;
}
if (data->data.req.rsp.rsp_payload_count != 0) {
for (i = 0; i < data->data.req.rsp.rsp_payload_count; i++) {
entry->msg.request.rsp_payload[i] = data->data.req.rsp
.rsp_payload[i];
}
entry->msg.request.rsp_payload_count = data->data.req.rsp
.rsp_payload_count;
}
entry->msg.request.rpc_req = data->data.req.rpc_req;
if (data->data.req.rsp.rsp_iobref != NULL) {
entry->msg.request.rsp_iobref = iobref_ref(
data->data.req.rsp.rsp_iobref);
}
} else {
msg = &data->data.reply.msg;
entry->msg.reply_info = data->data.reply.private;
}
entry->is_request = data->is_request;
count = msg->rpchdrcount + msg->proghdrcount + msg->progpayloadcount;
GF_ASSERT(count <= MAX_IOVEC);
if (msg->rpchdr != NULL) {
memcpy(&entry->rpchdr[0], msg->rpchdr,
sizeof(struct iovec) * msg->rpchdrcount);
entry->rpchdr_count = msg->rpchdrcount;
}
if (msg->proghdr != NULL) {
memcpy(&entry->proghdr[0], msg->proghdr,
sizeof(struct iovec) * msg->proghdrcount);
entry->proghdr_count = msg->proghdrcount;
}
if (msg->progpayload != NULL) {
memcpy(&entry->prog_payload[0], msg->progpayload,
sizeof(struct iovec) * msg->progpayloadcount);
entry->prog_payload_count = msg->progpayloadcount;
}
if (msg->iobref != NULL) {
entry->iobref = iobref_ref(msg->iobref);
}
INIT_LIST_HEAD(&entry->list);
out:
return entry;
}
int32_t
gf_rdma_submit_request(rpc_transport_t *this, rpc_transport_req_t *req)
{
int32_t ret = 0;
gf_rdma_ioq_t *entry = NULL;
rpc_transport_data_t data = {
0,
};
gf_rdma_private_t *priv = NULL;
gf_rdma_peer_t *peer = NULL;
if (req == NULL) {
goto out;
}
priv = this->private;
if (priv == NULL) {
ret = -1;
goto out;
}
peer = &priv->peer;
data.is_request = 1;
data.data.req = *req;
/*
* When the first message is received on a transport, the quota
* variable gets initialized and quota_set is set to one. In gluster,
* the client process is the one that sends the first message on a
* transport. If a submit request arrives on the server before
* quota_set has been set, the message must not be sent.
*/
if (priv->entity == GF_RDMA_SERVER && peer->quota_set == 0) {
ret = 0;
goto out;
}
entry = gf_rdma_ioq_new(this, &data);
if (entry == NULL) {
gf_msg(this->name, GF_LOG_WARNING, 0, RDMA_MSG_NEW_IOQ_ENTRY_FAILED,
"getting a new ioq entry failed (peer:%s)",
this->peerinfo.identifier);
goto out;
}
ret = gf_rdma_writev(this, entry);
if (ret > 0) {
ret = 0;
} else if (ret < 0) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0, RDMA_MSG_WRITE_PEER_FAILED,
"sending request to peer (%s) failed",
this->peerinfo.identifier);
rpc_transport_disconnect(this, _gf_false);
}
out:
return ret;
}
int32_t
gf_rdma_submit_reply(rpc_transport_t *this, rpc_transport_reply_t *reply)
{
int32_t ret = 0;
gf_rdma_ioq_t *entry = NULL;
rpc_transport_data_t data = {
0,
};
if (reply == NULL) {
goto out;
}
data.data.reply = *reply;
entry = gf_rdma_ioq_new(this, &data);
if (entry == NULL) {
gf_msg(this->name, GF_LOG_WARNING, 0, RDMA_MSG_NEW_IOQ_ENTRY_FAILED,
"getting a new ioq entry failed (peer:%s)",
this->peerinfo.identifier);
goto out;
}
ret = gf_rdma_writev(this, entry);
if (ret > 0) {
ret = 0;
} else if (ret < 0) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0, RDMA_MSG_WRITE_PEER_FAILED,
"sending request to peer (%s) failed",
this->peerinfo.identifier);
rpc_transport_disconnect(this, _gf_false);
}
out:
return ret;
}
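/*
 * The device keeps a registry of peers hashed by qp_num so that
 * completion handlers can map a work completion back to the transport
 * that owns the queue pair. gf_rdma_register_peer adds an entry (a
 * no-op if the qp_num is already registered), gf_rdma_unregister_peer
 * removes it, and __gf_rdma_lookup_peer (called with qpreg->lock held)
 * resolves a qp_num back to its peer.
 */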
static int
gf_rdma_register_peer(gf_rdma_device_t *device, int32_t qp_num,
gf_rdma_peer_t *peer)
{
struct _qpent *ent = NULL;
gf_rdma_qpreg_t *qpreg = NULL;
int32_t hash = 0;
int ret = -1;
qpreg = &device->qpreg;
hash = qp_num % 42;
pthread_mutex_lock(&qpreg->lock);
{
ent = qpreg->ents[hash].next;
while ((ent != &qpreg->ents[hash]) && (ent->qp_num != qp_num)) {
ent = ent->next;
}
if (ent->qp_num == qp_num) {
ret = 0;
goto unlock;
}
ent = (struct _qpent *)GF_CALLOC(1, sizeof(*ent), gf_common_mt_qpent);
if (ent == NULL) {
goto unlock;
}
/* TODO: ref reg->peer */
ent->peer = peer;
ent->next = &qpreg->ents[hash];
ent->prev = ent->next->prev;
ent->next->prev = ent;
ent->prev->next = ent;
ent->qp_num = qp_num;
qpreg->count++;
ret = 0;
}
unlock:
pthread_mutex_unlock(&qpreg->lock);
return ret;
}
static void
gf_rdma_unregister_peer(gf_rdma_device_t *device, int32_t qp_num)
{
struct _qpent *ent = NULL;
gf_rdma_qpreg_t *qpreg = NULL;
int32_t hash = 0;
qpreg = &device->qpreg;
hash = qp_num % 42;
pthread_mutex_lock(&qpreg->lock);
{
ent = qpreg->ents[hash].next;
while ((ent != &qpreg->ents[hash]) && (ent->qp_num != qp_num))
ent = ent->next;
if ((ent->qp_num != qp_num) || (ent == &qpreg->ents[hash])) {
pthread_mutex_unlock(&qpreg->lock);
return;
}
ent->prev->next = ent->next;
ent->next->prev = ent->prev;
/* TODO: unref reg->peer */
GF_FREE(ent);
qpreg->count--;
}
pthread_mutex_unlock(&qpreg->lock);
}
static gf_rdma_peer_t *
__gf_rdma_lookup_peer(gf_rdma_device_t *device, int32_t qp_num)
{
struct _qpent *ent = NULL;
gf_rdma_peer_t *peer = NULL;
gf_rdma_qpreg_t *qpreg = NULL;
int32_t hash = 0;
qpreg = &device->qpreg;
hash = qp_num % 42;
ent = qpreg->ents[hash].next;
while ((ent != &qpreg->ents[hash]) && (ent->qp_num != qp_num))
ent = ent->next;
if (ent != &qpreg->ents[hash]) {
peer = ent->peer;
}
return peer;
}
static void
__gf_rdma_destroy_qp(rpc_transport_t *this)
{
gf_rdma_private_t *priv = NULL;
priv = this->private;
if (priv->peer.qp) {
gf_rdma_unregister_peer(priv->device, priv->peer.qp->qp_num);
rdma_destroy_qp(priv->peer.cm_id);
}
priv->peer.qp = NULL;
return;
}
static int32_t
gf_rdma_create_qp(rpc_transport_t *this)
{
gf_rdma_private_t *priv = NULL;
gf_rdma_device_t *device = NULL;
int32_t ret = 0;
gf_rdma_peer_t *peer = NULL;
char *device_name = NULL;
priv = this->private;
peer = &priv->peer;
device_name = (char *)ibv_get_device_name(peer->cm_id->verbs->device);
if (device_name == NULL) {
ret = -1;
gf_msg(this->name, GF_LOG_WARNING, 0, RDMA_MSG_GET_DEVICE_NAME_FAILED,
"cannot get "
"device_name");
goto out;
}
device = gf_rdma_get_device(this, peer->cm_id->verbs, device_name);
if (device == NULL) {
ret = -1;
gf_msg(this->name, GF_LOG_WARNING, 0, RDMA_MSG_GET_DEVICE_FAILED,
"cannot get device for "
"device %s",
device_name);
goto out;
}
if (priv->device == NULL) {
priv->device = device;
}
struct ibv_qp_init_attr init_attr = {
.send_cq = device->send_cq,
.recv_cq = device->recv_cq,
.srq = device->srq,
.cap = {.max_send_wr = peer->send_count,
.max_recv_wr = peer->recv_count,
.max_send_sge = 2,
.max_recv_sge = 1},
.qp_type = IBV_QPT_RC};
ret = rdma_create_qp(peer->cm_id, device->pd, &init_attr);
if (ret != 0) {
gf_msg(peer->trans->name, GF_LOG_CRITICAL, errno,
RDMA_MSG_CREAT_QP_FAILED, "%s: could not create QP", this->name);
ret = -1;
goto out;
}
peer->qp = peer->cm_id->qp;
ret = gf_rdma_register_peer(device, peer->qp->qp_num, peer);
out:
if (ret == -1)
__gf_rdma_destroy_qp(this);
return ret;
}
static int32_t
__gf_rdma_teardown(rpc_transport_t *this)
{
gf_rdma_private_t *priv = NULL;
gf_rdma_peer_t *peer = NULL;
priv = this->private;
peer = &priv->peer;
if (peer->cm_id && peer->cm_id->qp != NULL) {
__gf_rdma_destroy_qp(this);
}
if (!list_empty(&priv->peer.ioq)) {
__gf_rdma_ioq_flush(peer);
}
if (peer->cm_id != NULL) {
rdma_destroy_id(peer->cm_id);
peer->cm_id = NULL;
}
/* TODO: decrement cq size */
return 0;
}
static int32_t
gf_rdma_teardown(rpc_transport_t *this)
{
int32_t ret = 0;
gf_rdma_private_t *priv = NULL;
if (this == NULL) {
goto out;
}
priv = this->private;
pthread_mutex_lock(&priv->write_mutex);
{
ret = __gf_rdma_teardown(this);
}
pthread_mutex_unlock(&priv->write_mutex);
out:
return ret;
}
/*
* allocates new memory to hold write-chunklist. New memory is needed since
* write-chunklist will be used while sending reply and the post holding initial
* write-chunklist sent from client will be put back to srq before a pollin
* event is sent to upper layers.
*/
int32_t
gf_rdma_get_write_chunklist(char **ptr, gf_rdma_write_array_t **write_ary)
{
gf_rdma_write_array_t *from = NULL, *to = NULL;
int32_t ret = -1, size = 0, i = 0;
from = (gf_rdma_write_array_t *)*ptr;
if (from->wc_discrim == 0) {
ret = 0;
goto out;
}
from->wc_nchunks = ntoh32(from->wc_nchunks);
size = sizeof(*from) + (sizeof(gf_rdma_write_chunk_t) * from->wc_nchunks);
to = GF_CALLOC(1, size, gf_common_mt_char);
if (to == NULL) {
ret = -1;
goto out;
}
to->wc_discrim = ntoh32(from->wc_discrim);
to->wc_nchunks = from->wc_nchunks;
for (i = 0; i < to->wc_nchunks; i++) {
to->wc_array[i].wc_target.rs_handle = ntoh32(
from->wc_array[i].wc_target.rs_handle);
to->wc_array[i].wc_target.rs_length = ntoh32(
from->wc_array[i].wc_target.rs_length);
to->wc_array[i].wc_target.rs_offset = ntoh64(
from->wc_array[i].wc_target.rs_offset);
}
*write_ary = to;
ret = 0;
*ptr = (char *)&from->wc_array[i].wc_target.rs_handle;
out:
return ret;
}
/*
* does not allocate new memory to hold read-chunklist. New memory is not
* needed, since post is not put back to srq till we've completed all the
* rdma-reads and hence readchunk-list can point to memory held by post.
*/
int32_t
gf_rdma_get_read_chunklist(char **ptr, gf_rdma_read_chunk_t **readch)
{
int32_t ret = -1;
gf_rdma_read_chunk_t *chunk = NULL;
int i = 0;
chunk = (gf_rdma_read_chunk_t *)*ptr;
if (chunk[0].rc_discrim == 0) {
ret = 0;
goto out;
}
for (i = 0; chunk[i].rc_discrim != 0; i++) {
chunk[i].rc_discrim = ntoh32(chunk[i].rc_discrim);
chunk[i].rc_position = ntoh32(chunk[i].rc_position);
chunk[i].rc_target.rs_handle = ntoh32(chunk[i].rc_target.rs_handle);
chunk[i].rc_target.rs_length = ntoh32(chunk[i].rc_target.rs_length);
chunk[i].rc_target.rs_offset = ntoh64(chunk[i].rc_target.rs_offset);
}
*readch = &chunk[0];
ret = 0;
*ptr = (char *)&chunk[i].rc_discrim;
out:
return ret;
}
static int32_t
gf_rdma_decode_error_msg(gf_rdma_peer_t *peer, gf_rdma_post_t *post,
size_t bytes_in_post)
{
gf_rdma_header_t *header = NULL;
struct iobuf *iobuf = NULL;
struct iobref *iobref = NULL;
int32_t ret = -1;
struct rpc_msg rpc_msg = {
0,
};
header = (gf_rdma_header_t *)post->buf;
header->rm_body.rm_error.rm_type = ntoh32(header->rm_body.rm_error.rm_type);
if (header->rm_body.rm_error.rm_type == ERR_VERS) {
header->rm_body.rm_error.rm_version.gf_rdma_vers_low = ntoh32(
header->rm_body.rm_error.rm_version.gf_rdma_vers_low);
header->rm_body.rm_error.rm_version.gf_rdma_vers_high = ntoh32(
header->rm_body.rm_error.rm_version.gf_rdma_vers_high);
}
rpc_msg.rm_xid = header->rm_xid;
rpc_msg.rm_direction = REPLY;
rpc_msg.rm_reply.rp_stat = MSG_DENIED;
iobuf = iobuf_get2(peer->trans->ctx->iobuf_pool, bytes_in_post);
if (iobuf == NULL) {
ret = -1;
goto out;
}
post->ctx.iobref = iobref = iobref_new();
if (iobref == NULL) {
ret = -1;
goto out;
}
ret = rpc_reply_to_xdr(&rpc_msg, iobuf_ptr(iobuf), iobuf_pagesize(iobuf),
&post->ctx.vector[0]);
if (ret == -1) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_RPC_REPLY_CREATE_FAILED,
"Failed to create "
"RPC reply");
goto out;
}
iobref_add(iobref, iobuf);
iobuf_unref(iobuf);
post->ctx.count = 1;
iobuf = NULL;
iobref = NULL;
out:
if (ret == -1) {
if (iobuf != NULL) {
iobuf_unref(iobuf);
}
if (iobref != NULL) {
iobref_unref(iobref);
}
}
return 0;
}
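/*
 * Decode the chunk lists that follow the transport header: first the
 * read-chunk list, then the write-chunk list and finally the reply
 * chunk. If the peer supplied write chunks or a reply chunk, a
 * reply_info is allocated so the eventual reply can be RDMA-written
 * back. For GF_RDMA_MSG the bytes following the chunk lists (the
 * inline rpc message) are copied into a freshly allocated iobuf.
 */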
int32_t
gf_rdma_decode_msg(gf_rdma_peer_t *peer, gf_rdma_post_t *post,
gf_rdma_read_chunk_t **readch, size_t bytes_in_post)
{
int32_t ret = -1;
gf_rdma_header_t *header = NULL;
gf_rdma_reply_info_t *reply_info = NULL;
char *ptr = NULL;
gf_rdma_write_array_t *write_ary = NULL;
size_t header_len = 0;
header = (gf_rdma_header_t *)post->buf;
ptr = (char *)&header->rm_body.rm_chunks[0];
ret = gf_rdma_get_read_chunklist(&ptr, readch);
if (ret == -1) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_GET_READ_CHUNK_FAILED,
"cannot get read "
"chunklist from msg");
goto out;
}
/* skip terminator of read-chunklist */
ptr = ptr + sizeof(uint32_t);
ret = gf_rdma_get_write_chunklist(&ptr, &write_ary);
if (ret == -1) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_GET_WRITE_CHUNK_FAILED,
"cannot get write "
"chunklist from msg");
goto out;
}
/* skip terminator of write-chunklist */
ptr = ptr + sizeof(uint32_t);
if (write_ary != NULL) {
reply_info = gf_rdma_reply_info_alloc(peer);
if (reply_info == NULL) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_REPLY_INFO_ALLOC_FAILED, "reply_info_alloc failed");
ret = -1;
goto out;
}
reply_info->type = gf_rdma_writech;
reply_info->wc_array = write_ary;
reply_info->rm_xid = header->rm_xid;
} else {
ret = gf_rdma_get_write_chunklist(&ptr, &write_ary);
if (ret == -1) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_CHUNKLIST_ERROR,
"cannot get reply "
"chunklist from msg");
goto out;
}
if (write_ary != NULL) {
reply_info = gf_rdma_reply_info_alloc(peer);
if (reply_info == NULL) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_REPLY_INFO_ALLOC_FAILED,
"reply_info_alloc_failed");
ret = -1;
goto out;
}
reply_info->type = gf_rdma_replych;
reply_info->wc_array = write_ary;
reply_info->rm_xid = header->rm_xid;
}
}
/* skip terminator of reply chunk */
ptr = ptr + sizeof(uint32_t);
if (header->rm_type != GF_RDMA_NOMSG) {
header_len = (long)ptr - (long)post->buf;
post->ctx.vector[0].iov_len = (bytes_in_post - header_len);
post->ctx.hdr_iobuf = iobuf_get2(peer->trans->ctx->iobuf_pool,
(bytes_in_post - header_len));
if (post->ctx.hdr_iobuf == NULL) {
ret = -1;
goto out;
}
post->ctx.vector[0].iov_base = iobuf_ptr(post->ctx.hdr_iobuf);
memcpy(post->ctx.vector[0].iov_base, ptr, post->ctx.vector[0].iov_len);
post->ctx.count = 1;
}
post->ctx.reply_info = reply_info;
out:
if (ret == -1) {
if (*readch != NULL) {
GF_FREE(*readch);
*readch = NULL;
}
if (reply_info)
GF_FREE(reply_info);
GF_FREE(write_ary);
}
return ret;
}
/* Assumes only one of either write-chunklist or a reply chunk is present */
int32_t
gf_rdma_decode_header(gf_rdma_peer_t *peer, gf_rdma_post_t *post,
gf_rdma_read_chunk_t **readch, size_t bytes_in_post)
{
int32_t ret = -1;
gf_rdma_header_t *header = NULL;
header = (gf_rdma_header_t *)post->buf;
header->rm_xid = ntoh32(header->rm_xid);
header->rm_vers = ntoh32(header->rm_vers);
header->rm_credit = ntoh32(header->rm_credit);
header->rm_type = ntoh32(header->rm_type);
switch (header->rm_type) {
case GF_RDMA_MSG:
case GF_RDMA_NOMSG:
ret = gf_rdma_decode_msg(peer, post, readch, bytes_in_post);
if (ret < 0) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_ENCODE_ERROR,
"cannot decode msg of "
"type (%d)",
header->rm_type);
}
break;
case GF_RDMA_MSGP:
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0, RDMA_MSG_INVALID_ENTRY,
"rdma msg of msg-type "
"GF_RDMA_MSGP should not have been received");
ret = -1;
break;
case GF_RDMA_DONE:
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0, RDMA_MSG_INVALID_ENTRY,
"rdma msg of msg-type "
"GF_RDMA_DONE should not have been received");
ret = -1;
break;
case GF_RDMA_ERROR:
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_RDMA_ERROR_RECEIVED,
"received a msg of type"
" RDMA_ERROR");
ret = gf_rdma_decode_error_msg(peer, post, bytes_in_post);
break;
default:
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0, RDMA_MSG_INVALID_ENTRY,
"unknown rdma msg-type (%d)", header->rm_type);
}
return ret;
}
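/*
 * Issue one RDMA READ work request per read chunk advertised by the
 * peer. The data lands in a single iobuf large enough for all chunks,
 * registered with the local device; the post is ref'd once per read so
 * that send-completion handling knows when the last read has finished.
 * On a partial ibv_post_send failure, the refs taken for work requests
 * that were never posted are dropped again.
 */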
int32_t
gf_rdma_do_reads(gf_rdma_peer_t *peer, gf_rdma_post_t *post,
gf_rdma_read_chunk_t *readch)
{
int32_t ret = -1, i = 0, count = 0;
size_t size = 0;
char *ptr = NULL;
struct iobuf *iobuf = NULL;
gf_rdma_private_t *priv = NULL;
struct ibv_sge *list = NULL;
struct ibv_send_wr *wr = NULL, *bad_wr = NULL;
int total_ref = 0;
priv = peer->trans->private;
for (i = 0; readch[i].rc_discrim != 0; i++) {
size += readch[i].rc_target.rs_length;
}
if (i == 0) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0, RDMA_MSG_INVALID_CHUNK_TYPE,
"message type specified "
"as rdma-read but there are no rdma read-chunks "
"present");
goto out;
}
post->ctx.gf_rdma_reads = i;
i = 0;
iobuf = iobuf_get2(peer->trans->ctx->iobuf_pool, size);
if (iobuf == NULL) {
goto out;
}
if (post->ctx.iobref == NULL) {
post->ctx.iobref = iobref_new();
if (post->ctx.iobref == NULL) {
iobuf_unref(iobuf);
iobuf = NULL;
goto out;
}
}
ptr = iobuf_ptr(iobuf);
iobref_add(post->ctx.iobref, iobuf);
iobuf_unref(iobuf);
iobuf = NULL;
pthread_mutex_lock(&priv->write_mutex);
{
if (!priv->connected) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_PEER_DISCONNECTED,
"transport not "
"connected to peer (%s), not doing rdma reads",
peer->trans->peerinfo.identifier);
goto unlock;
}
list = GF_CALLOC(post->ctx.gf_rdma_reads, sizeof(struct ibv_sge),
gf_common_mt_sge);
if (list == NULL) {
errno = ENOMEM;
ret = -1;
goto unlock;
}
wr = GF_CALLOC(post->ctx.gf_rdma_reads, sizeof(struct ibv_send_wr),
gf_common_mt_wr);
if (wr == NULL) {
errno = ENOMEM;
ret = -1;
goto unlock;
}
for (i = 0; readch[i].rc_discrim != 0; i++) {
count = post->ctx.count++;
post->ctx.vector[count].iov_base = ptr;
post->ctx.vector[count].iov_len = readch[i].rc_target.rs_length;
ret = __gf_rdma_register_local_mr_for_rdma(
peer, &post->ctx.vector[count], 1, &post->ctx);
if (ret == -1) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_MR_ALOC_FAILED,
"registering local memory"
" for rdma read failed");
goto unlock;
}
list[i].addr = (unsigned long)post->ctx.vector[count].iov_base;
list[i].length = post->ctx.vector[count].iov_len;
list[i].lkey = post->ctx.mr[post->ctx.mr_count - 1]->lkey;
wr[i].wr_id = (unsigned long)gf_rdma_post_ref(post);
wr[i].sg_list = &list[i];
wr[i].next = &wr[i + 1];
wr[i].num_sge = 1;
wr[i].opcode = IBV_WR_RDMA_READ;
wr[i].send_flags = IBV_SEND_SIGNALED;
wr[i].wr.rdma.remote_addr = readch[i].rc_target.rs_offset;
wr[i].wr.rdma.rkey = readch[i].rc_target.rs_handle;
ptr += readch[i].rc_target.rs_length;
total_ref++;
}
wr[i - 1].next = NULL;
ret = ibv_post_send(peer->qp, wr, &bad_wr);
if (ret) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_READ_CLIENT_ERROR,
"rdma read from "
"client (%s) failed with ret = %d (%s)",
peer->trans->peerinfo.identifier, ret,
(ret > 0) ? strerror(ret) : "");
if (!bad_wr) {
ret = -1;
goto unlock;
}
for (i = 0; i < post->ctx.gf_rdma_reads; i++) {
if (&wr[i] != bad_wr)
total_ref--;
else
break;
}
ret = -1;
}
}
unlock:
pthread_mutex_unlock(&priv->write_mutex);
out:
if (list)
GF_FREE(list);
if (wr)
GF_FREE(wr);
if (ret == -1) {
while (total_ref-- > 0)
gf_rdma_post_unref(post);
}
return ret;
}
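/*
 * Hand a fully assembled message to the rpc layer. For replies, the
 * xid is first mapped back to the originating rpc request so that the
 * request context can be destroyed (or, if there is none, the quota it
 * held can be returned) before RPC_TRANSPORT_MSG_RECEIVED is raised.
 */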
int32_t
gf_rdma_pollin_notify(gf_rdma_peer_t *peer, gf_rdma_post_t *post)
{
int32_t ret = -1;
enum msg_type msg_type = 0;
struct rpc_req *rpc_req = NULL;
gf_rdma_request_context_t *request_context = NULL;
rpc_request_info_t request_info = {
0,
};
gf_rdma_private_t *priv = NULL;
uint32_t *ptr = NULL;
rpc_transport_pollin_t *pollin = NULL;
if ((peer == NULL) || (post == NULL)) {
goto out;
}
if (post->ctx.iobref == NULL) {
post->ctx.iobref = iobref_new();
if (post->ctx.iobref == NULL) {
goto out;
}
/* handle the case where both hdr and payload of
* GF_FOP_READ_CBK were received in a single iobuf
* because the server sent the entire msg inline without
* doing rdma writes.
*/
if (post->ctx.hdr_iobuf)
iobref_add(post->ctx.iobref, post->ctx.hdr_iobuf);
}
pollin = rpc_transport_pollin_alloc(peer->trans, post->ctx.vector,
post->ctx.count, post->ctx.hdr_iobuf,
post->ctx.iobref, post->ctx.reply_info);
if (pollin == NULL) {
goto out;
}
ptr = (uint32_t *)pollin->vector[0].iov_base;
request_info.xid = ntoh32(*ptr);
msg_type = ntoh32(*(ptr + 1));
if (msg_type == REPLY) {
ret = rpc_transport_notify(peer->trans, RPC_TRANSPORT_MAP_XID_REQUEST,
&request_info);
if (ret == -1) {
gf_msg_debug(GF_RDMA_LOG_NAME, 0,
"cannot get request"
"information from rpc layer");
goto out;
}
rpc_req = request_info.rpc_req;
if (rpc_req == NULL) {
gf_msg_debug(GF_RDMA_LOG_NAME, 0,
"rpc request "
"structure not found");
ret = -1;
goto out;
}
request_context = rpc_req->conn_private;
rpc_req->conn_private = NULL;
priv = peer->trans->private;
if (request_context != NULL) {
pthread_mutex_lock(&priv->write_mutex);
{
__gf_rdma_request_context_destroy(request_context);
}
pthread_mutex_unlock(&priv->write_mutex);
} else {
gf_rdma_quota_put(peer);
}
pollin->is_reply = 1;
}
ret = rpc_transport_notify(peer->trans, RPC_TRANSPORT_MSG_RECEIVED, pollin);
if (ret < 0) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0, TRANS_MSG_TRANSPORT_ERROR,
"transport_notify failed");
}
out:
if (pollin != NULL) {
pollin->private = NULL;
rpc_transport_pollin_destroy(pollin);
}
return ret;
}
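/*
 * Complete reception of a reply that used a reply chunk or write
 * chunks. The chunk list echoed back by the peer describes buffers
 * this side supplied in the original request, so its offsets are used
 * directly as local addresses for the post's iovecs before the message
 * is handed to pollin notify.
 */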
int32_t
gf_rdma_recv_reply(gf_rdma_peer_t *peer, gf_rdma_post_t *post)
{
int32_t ret = -1;
gf_rdma_header_t *header = NULL;
gf_rdma_reply_info_t *reply_info = NULL;
gf_rdma_write_array_t *wc_array = NULL;
int i = 0;
uint32_t *ptr = NULL;
gf_rdma_request_context_t *ctx = NULL;
rpc_request_info_t request_info = {
0,
};
struct rpc_req *rpc_req = NULL;
header = (gf_rdma_header_t *)post->buf;
reply_info = post->ctx.reply_info;
/* no write chunklist, just notify upper layers */
if (reply_info == NULL) {
ret = 0;
goto out;
}
wc_array = reply_info->wc_array;
if (header->rm_type == GF_RDMA_NOMSG) {
post->ctx.vector[0].iov_base = (void *)(long)wc_array->wc_array[0]
.wc_target.rs_offset;
post->ctx.vector[0].iov_len = wc_array->wc_array[0].wc_target.rs_length;
post->ctx.count = 1;
} else {
for (i = 0; i < wc_array->wc_nchunks; i++) {
post->ctx.vector[i + 1].iov_base =
(void *)(long)wc_array->wc_array[i].wc_target.rs_offset;
post->ctx.vector[i + 1].iov_len = wc_array->wc_array[i]
.wc_target.rs_length;
}
post->ctx.count += wc_array->wc_nchunks;
}
ptr = (uint32_t *)post->ctx.vector[0].iov_base;
request_info.xid = ntoh32(*ptr);
ret = rpc_transport_notify(peer->trans, RPC_TRANSPORT_MAP_XID_REQUEST,
&request_info);
if (ret == -1) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0, TRANS_MSG_TRANSPORT_ERROR,
"cannot get request "
"information (peer:%s) from rpc layer",
peer->trans->peerinfo.identifier);
goto out;
}
rpc_req = request_info.rpc_req;
if (rpc_req == NULL) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0, RDMA_MSG_RPC_ST_ERROR,
"rpc request structure not "
"found");
ret = -1;
goto out;
}
ctx = rpc_req->conn_private;
if ((post->ctx.iobref == NULL) && ctx->rsp_iobref) {
post->ctx.iobref = iobref_ref(ctx->rsp_iobref);
}
ret = 0;
gf_rdma_reply_info_destroy(reply_info);
out:
if (ret == 0) {
ret = gf_rdma_pollin_notify(peer, post);
if (ret < 0) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_POLL_IN_NOTIFY_FAILED, "pollin notify failed");
}
}
return ret;
}
static int32_t
gf_rdma_recv_request(gf_rdma_peer_t *peer, gf_rdma_post_t *post,
gf_rdma_read_chunk_t *readch)
{
int32_t ret = -1;
if (readch != NULL) {
ret = gf_rdma_do_reads(peer, post, readch);
if (ret < 0) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_PEER_READ_FAILED, "rdma read from peer (%s) failed",
peer->trans->peerinfo.identifier);
}
} else {
ret = gf_rdma_pollin_notify(peer, post);
if (ret == -1) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_POLL_IN_NOTIFY_FAILED,
"pollin notification failed");
}
}
return ret;
}
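/*
 * Handle a successful receive completion: decode the transport header,
 * derive the peer's send-quota from rm_credit the first time a message
 * arrives, and route the message to the request path (rdma-reads
 * followed by pollin) or the reply path. Any error here disconnects
 * the transport.
 */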
void
gf_rdma_process_recv(gf_rdma_peer_t *peer, struct ibv_wc *wc)
{
gf_rdma_post_t *post = NULL;
gf_rdma_read_chunk_t *readch = NULL;
int ret = -1;
uint32_t *ptr = NULL;
enum msg_type msg_type = 0;
gf_rdma_header_t *header = NULL;
gf_rdma_private_t *priv = NULL;
post = (gf_rdma_post_t *)(long)wc->wr_id;
if (post == NULL) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0, RDMA_MSG_POST_MISSING,
"no post found in successful "
"work completion element");
goto out;
}
ret = gf_rdma_decode_header(peer, post, &readch, wc->byte_len);
if (ret == -1) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_HEADER_DECODE_FAILED,
"decoding of header "
"failed");
goto out;
}
header = (gf_rdma_header_t *)post->buf;
priv = peer->trans->private;
pthread_mutex_lock(&priv->write_mutex);
{
if (!priv->peer.quota_set) {
priv->peer.quota_set = 1;
/* Initially peer.quota is set to 1 as per RFC 5666. We
* have to account for the quota used while sending
* first msg (which may or may not be returned to pool
* at this point) while deriving peer.quota from
* header->rm_credit. Hence the arithmetic below,
* instead of directly setting it to header->rm_credit.
*/
priv->peer.quota = header->rm_credit - (1 - priv->peer.quota);
}
}
pthread_mutex_unlock(&priv->write_mutex);
switch (header->rm_type) {
case GF_RDMA_MSG:
ptr = (uint32_t *)post->ctx.vector[0].iov_base;
msg_type = ntoh32(*(ptr + 1));
break;
case GF_RDMA_NOMSG:
if (readch != NULL) {
msg_type = CALL;
} else {
msg_type = REPLY;
}
break;
case GF_RDMA_ERROR:
if (header->rm_body.rm_error.rm_type == ERR_CHUNK) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_RDMA_ERROR_RECEIVED,
"peer (%s), couldn't encode or decode the msg "
"properly or write chunks were not provided "
"for replies that were bigger than "
"RDMA_INLINE_THRESHOLD (%d)",
peer->trans->peerinfo.identifier,
GLUSTERFS_RDMA_INLINE_THRESHOLD);
ret = gf_rdma_pollin_notify(peer, post);
if (ret == -1) {
gf_msg_debug(GF_RDMA_LOG_NAME, 0,
"pollin "
"notification failed");
}
goto out;
} else {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_ERROR, 0,
TRANS_MSG_TRANSPORT_ERROR,
"an error has "
"happened while transmission of msg, "
"disconnecting the transport");
ret = -1;
goto out;
}
default:
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0, RDMA_MSG_INVALID_ENTRY,
"invalid rdma msg-type (%d)", header->rm_type);
goto out;
}
if (msg_type == CALL) {
ret = gf_rdma_recv_request(peer, post, readch);
if (ret < 0) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_PEER_REQ_FAILED,
"receiving a request"
" from peer (%s) failed",
peer->trans->peerinfo.identifier);
}
} else {
ret = gf_rdma_recv_reply(peer, post);
if (ret < 0) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_PEER_REP_FAILED,
"receiving a reply "
"from peer (%s) failed",
peer->trans->peerinfo.identifier);
}
}
out:
if (ret == -1) {
rpc_transport_disconnect(peer->trans, _gf_false);
}
return;
}
void *
gf_rdma_async_event_thread(void *context)
{
struct ibv_async_event event;
int ret;
while (1) {
do {
ret = ibv_get_async_event((struct ibv_context *)context, &event);
if (ret && errno != EINTR) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, errno,
RDMA_MSG_EVENT_ERROR,
"Error getting "
"event");
}
} while (ret && errno == EINTR);
switch (event.event_type) {
case IBV_EVENT_SRQ_LIMIT_REACHED:
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_EVENT_SRQ_LIMIT_REACHED,
"received "
"srq_limit reached");
break;
default:
gf_msg_debug(GF_RDMA_LOG_NAME, 0,
"event (%d) "
"received",
event.event_type);
break;
}
ibv_ack_async_event(&event);
}
return 0;
}
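/*
 * Per-device thread servicing the recv completion channel: wait for a
 * CQ event, re-arm notification, poll completions in batches of 10 and
 * feed each successful one to gf_rdma_process_recv. A reference is
 * held on the owning transport for the duration of processing so that
 * an error reported in wc.status cannot free it underneath us.
 */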
static void *
gf_rdma_recv_completion_proc(void *data)
{
struct ibv_comp_channel *chan = NULL;
gf_rdma_device_t *device = NULL;
gf_rdma_post_t *post = NULL;
gf_rdma_peer_t *peer = NULL;
struct ibv_cq *event_cq = NULL;
struct ibv_wc wc[10] = {
{0},
};
void *event_ctx = NULL;
int32_t ret = 0;
int32_t num_wr = 0, index = 0;
uint8_t failed = 0;
chan = data;
while (1) {
failed = 0;
ret = ibv_get_cq_event(chan, &event_cq, &event_ctx);
if (ret) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_ERROR, errno,
RDMA_MSG_IBV_GET_CQ_FAILED,
"ibv_get_cq_event failed, terminating recv "
"thread %d (%d)",
ret, errno);
continue;
}
device = event_ctx;
ret = ibv_req_notify_cq(event_cq, 0);
if (ret) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_ERROR, errno,
RDMA_MSG_IBV_REQ_NOTIFY_CQ_FAILED,
"ibv_req_notify_cq on %s failed, terminating "
"recv thread: %d (%d)",
device->device_name, ret, errno);
continue;
}
while (!failed && (num_wr = ibv_poll_cq(event_cq, 10, wc)) > 0) {
for (index = 0; index < num_wr && !failed; index++) {
post = (gf_rdma_post_t *)(long)wc[index].wr_id;
pthread_mutex_lock(&device->qpreg.lock);
{
peer = __gf_rdma_lookup_peer(device, wc[index].qp_num);
/*
* keep a refcount on transport so that it
* does not get freed because of some error
* indicated by wc.status till we are done
* with usage of peer and thereby that of
* trans.
*/
if (peer != NULL) {
rpc_transport_ref(peer->trans);
}
}
pthread_mutex_unlock(&device->qpreg.lock);
if (wc[index].status != IBV_WC_SUCCESS) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_ERROR, 0,
RDMA_MSG_RECV_ERROR,
"recv work "
"request on `%s' returned error (%d)",
device->device_name, wc[index].status);
failed = 1;
if (peer) {
ibv_ack_cq_events(event_cq, num_wr);
rpc_transport_disconnect(peer->trans, _gf_false);
rpc_transport_unref(peer->trans);
}
if (post) {
gf_rdma_post_unref(post);
}
continue;
}
if (peer) {
gf_rdma_process_recv(peer, &wc[index]);
rpc_transport_unref(peer->trans);
} else {
gf_msg_debug(GF_RDMA_LOG_NAME, 0,
"could not lookup peer "
"for qp_num: %d",
wc[index].qp_num);
}
gf_rdma_post_unref(post);
}
}
if (ret < 0) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_ERROR, errno,
RDMA_MSG_IBV_POLL_CQ_ERROR,
"ibv_poll_cq on `%s' returned error "
"(ret = %d, errno = %d)",
device->device_name, ret, errno);
continue;
}
if (!failed)
ibv_ack_cq_events(event_cq, num_wr);
}
return NULL;
}
void
gf_rdma_handle_failed_send_completion(gf_rdma_peer_t *peer, struct ibv_wc *wc)
{
gf_rdma_post_t *post = NULL;
gf_rdma_device_t *device = NULL;
gf_rdma_private_t *priv = NULL;
if (peer != NULL) {
priv = peer->trans->private;
if (priv != NULL) {
device = priv->device;
}
}
post = (gf_rdma_post_t *)(long)wc->wr_id;
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0, RDMA_MSG_RDMA_HANDLE_FAILED,
"send work request on `%s' returned error "
"wc.status = %d, wc.vendor_err = %d, post->buf = %p, "
"wc.byte_len = %d, post->reused = %d",
(device != NULL) ? device->device_name : NULL, wc->status,
wc->vendor_err, post->buf, wc->byte_len, post->reused);
if (wc->status == IBV_WC_RETRY_EXC_ERR) {
gf_msg("rdma", GF_LOG_ERROR, 0, TRANS_MSG_TIMEOUT_EXCEEDED,
"connection between client and server not working. "
"check by running 'ibv_srq_pingpong'. also make sure "
"subnet manager is running (eg: 'opensm'), or check "
"if rdma port is valid (or active) by running "
"'ibv_devinfo'. contact Gluster Support Team if the "
"problem persists.");
}
if (peer) {
rpc_transport_disconnect(peer->trans, _gf_false);
}
return;
}
void
gf_rdma_handle_successful_send_completion(gf_rdma_peer_t *peer,
struct ibv_wc *wc)
{
gf_rdma_post_t *post = NULL;
int reads = 0, ret = 0;
gf_rdma_header_t *header = NULL;
if (wc->opcode != IBV_WC_RDMA_READ) {
goto out;
}
post = (gf_rdma_post_t *)(long)wc->wr_id;
pthread_mutex_lock(&post->lock);
{
reads = --post->ctx.gf_rdma_reads;
}
pthread_mutex_unlock(&post->lock);
if (reads != 0) {
/* if it is not the last rdma read, we've got nothing to do */
goto out;
}
header = (gf_rdma_header_t *)post->buf;
if (header->rm_type == GF_RDMA_NOMSG) {
post->ctx.count = 1;
post->ctx.vector[0].iov_len += post->ctx.vector[1].iov_len;
}
/*
* if the reads were performed as vectored rdma-reads, all the buffers
* are actually contiguous memory, so we can present them as a single
* vector instead of multiple ones.
*/
while (post->ctx.count > 2) {
post->ctx.vector[1].iov_len += post->ctx.vector[post->ctx.count - 1]
.iov_len;
post->ctx.count--;
}
ret = gf_rdma_pollin_notify(peer, post);
if ((ret == -1) && (peer != NULL)) {
rpc_transport_disconnect(peer->trans, _gf_false);
}
out:
return;
}
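/*
 * Per-device thread servicing the send completion channel. Successful
 * completions are passed to gf_rdma_handle_successful_send_completion
 * (which finishes pending rdma-reads), failed ones to the failure
 * handler above. When the completed post was a reply sent on a
 * GF_RDMA_SEND_POST, the send-quota it consumed is returned to the
 * peer.
 */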
static void *
gf_rdma_send_completion_proc(void *data)
{
struct ibv_comp_channel *chan = NULL;
gf_rdma_post_t *post = NULL;
gf_rdma_peer_t *peer = NULL;
struct ibv_cq *event_cq = NULL;
void *event_ctx = NULL;
gf_rdma_device_t *device = NULL;
struct ibv_wc wc[10] = {
{0},
};
char is_request = 0;
int32_t ret = 0, quota_ret = 0, num_wr = 0;
int32_t index = 0, failed = 0;
chan = data;
while (1) {
failed = 0;
ret = ibv_get_cq_event(chan, &event_cq, &event_ctx);
if (ret) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_ERROR, errno,
RDMA_MSG_IBV_GET_CQ_FAILED,
"ibv_get_cq_event on failed, terminating "
"send thread: %d (%d)",
ret, errno);
continue;
}
device = event_ctx;
ret = ibv_req_notify_cq(event_cq, 0);
if (ret) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_ERROR, errno,
RDMA_MSG_IBV_REQ_NOTIFY_CQ_FAILED,
"ibv_req_notify_cq on %s failed, terminating "
"send thread: %d (%d)",
device->device_name, ret, errno);
continue;
}
while (!failed && (num_wr = ibv_poll_cq(event_cq, 10, wc)) > 0) {
for (index = 0; index < num_wr && !failed; index++) {
post = (gf_rdma_post_t *)(long)wc[index].wr_id;
pthread_mutex_lock(&device->qpreg.lock);
{
peer = __gf_rdma_lookup_peer(device, wc[index].qp_num);
/*
* keep a refcount on transport so that it
* does not get freed because of some error
* indicated by wc.status, till we are done
* with usage of peer and thereby that of trans.
*/
if (peer != NULL) {
rpc_transport_ref(peer->trans);
}
}
pthread_mutex_unlock(&device->qpreg.lock);
if (wc[index].status != IBV_WC_SUCCESS) {
ibv_ack_cq_events(event_cq, num_wr);
failed = 1;
gf_rdma_handle_failed_send_completion(peer, &wc[index]);
} else {
gf_rdma_handle_successful_send_completion(peer, &wc[index]);
}
if (post) {
is_request = post->ctx.is_request;
ret = gf_rdma_post_unref(post);
if ((ret == 0) && (wc[index].status == IBV_WC_SUCCESS) &&
!is_request && (post->type == GF_RDMA_SEND_POST) &&
(peer != NULL)) {
/* A GF_RDMA_RECV_POST can end up in
* gf_rdma_send_completion_proc for
* rdma-reads, and we do not take
* quota for getting a GF_RDMA_RECV_POST.
*/
/*
* if it is request, quota is returned
* after reply has come.
*/
quota_ret = gf_rdma_quota_put(peer);
if (quota_ret < 0) {
gf_msg_debug("rdma", 0,
"failed to send "
"message");
}
}
}
if (peer) {
rpc_transport_unref(peer->trans);
} else {
gf_msg_debug(GF_RDMA_LOG_NAME, 0,
"could not lookup peer for qp_num: %d",
wc[index].qp_num);
}
}
}
if (ret < 0) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_ERROR, errno,
RDMA_MSG_IBV_POLL_CQ_ERROR,
"ibv_poll_cq on `%s' returned error (ret = %d,"
" errno = %d)",
device->device_name, ret, errno);
continue;
}
if (!failed)
ibv_ack_cq_events(event_cq, num_wr);
}
return NULL;
}
static void
gf_rdma_options_init(rpc_transport_t *this)
{
gf_rdma_private_t *priv = NULL;
gf_rdma_options_t *options = NULL;
int32_t mtu = 0;
data_t *temp = NULL;
/* TODO: validate arguments from options below */
priv = this->private;
options = &priv->options;
options->send_size =
GLUSTERFS_RDMA_INLINE_THRESHOLD; /*this->ctx->page_size * 4; 512 KB*/
options->recv_size =
GLUSTERFS_RDMA_INLINE_THRESHOLD; /*this->ctx->page_size * 4; 512 KB*/
options->send_count = 4096;
options->recv_count = 4096;
options->attr_timeout = GF_RDMA_TIMEOUT;
options->attr_retry_cnt = GF_RDMA_RETRY_CNT;
options->attr_rnr_retry = GF_RDMA_RNR_RETRY;
temp = dict_get(this->options, "transport.listen-backlog");
if (temp)
options->backlog = data_to_uint32(temp);
else
options->backlog = GLUSTERFS_SOCKET_LISTEN_BACKLOG;
temp = dict_get(this->options, "transport.rdma.work-request-send-count");
if (temp)
options->send_count = data_to_int32(temp);
temp = dict_get(this->options, "transport.rdma.work-request-recv-count");
if (temp)
options->recv_count = data_to_int32(temp);
temp = dict_get(this->options, "transport.rdma.attr-timeout");
if (temp)
options->attr_timeout = data_to_uint8(temp);
temp = dict_get(this->options, "transport.rdma.attr-retry-cnt");
if (temp)
options->attr_retry_cnt = data_to_uint8(temp);
temp = dict_get(this->options, "transport.rdma.attr-rnr-retry");
if (temp)
options->attr_rnr_retry = data_to_uint8(temp);
options->port = 1;
temp = dict_get(this->options, "transport.rdma.port");
if (temp)
options->port = data_to_uint64(temp);
options->mtu = mtu = IBV_MTU_2048;
temp = dict_get(this->options, "transport.rdma.mtu");
if (temp)
mtu = data_to_int32(temp);
switch (mtu) {
case 256:
options->mtu = IBV_MTU_256;
break;
case 512:
options->mtu = IBV_MTU_512;
break;
case 1024:
options->mtu = IBV_MTU_1024;
break;
case 2048:
options->mtu = IBV_MTU_2048;
break;
case 4096:
options->mtu = IBV_MTU_4096;
break;
default:
if (temp)
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, 0,
RDMA_MSG_UNRECG_MTU_VALUE,
"%s: unrecognized "
"MTU value '%s', defaulting to '2048'",
this->name, data_to_str(temp));
else
gf_msg_trace(GF_RDMA_LOG_NAME, 0,
"%s: defaulting "
"MTU to '2048'",
this->name);
options->mtu = IBV_MTU_2048;
break;
}
temp = dict_get(this->options, "transport.rdma.device-name");
if (temp)
options->device_name = gf_strdup(temp->data);
return;
}
gf_rdma_ctx_t *
__gf_rdma_ctx_create(void)
{
gf_rdma_ctx_t *rdma_ctx = NULL;
int ret = -1;
rdma_ctx = GF_CALLOC(1, sizeof(*rdma_ctx), gf_common_mt_char);
if (rdma_ctx == NULL) {
goto out;
}
pthread_mutex_init(&rdma_ctx->lock, NULL);
rdma_ctx->rdma_cm_event_channel = rdma_create_event_channel();
if (rdma_ctx->rdma_cm_event_channel == NULL) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, errno,
RDMA_MSG_CM_EVENT_FAILED,
"rdma_cm event channel "
"creation failed");
goto out;
}
ret = gf_thread_create(&rdma_ctx->rdma_cm_thread, NULL,
gf_rdma_cm_event_handler,
rdma_ctx->rdma_cm_event_channel, "rdmaehan");
if (ret != 0) {
gf_msg(GF_RDMA_LOG_NAME, GF_LOG_WARNING, ret, RDMA_MSG_CM_EVENT_FAILED,
"creation of thread to "
"handle rdma-cm events failed");
goto out;
}
out:
if (ret < 0 && rdma_ctx) {
if (rdma_ctx->rdma_cm_event_channel != NULL) {
rdma_destroy_event_channel(rdma_ctx->rdma_cm_event_channel);
}
GF_FREE(rdma_ctx);
rdma_ctx = NULL;
}
return rdma_ctx;
}
static int32_t
gf_rdma_init(rpc_transport_t *this)
{
gf_rdma_private_t *priv = NULL;
int32_t ret = 0;
glusterfs_ctx_t *ctx = NULL;
gf_rdma_options_t *options = NULL;
ctx = this->ctx;
priv = this->private;
ibv_fork_init();
gf_rdma_options_init(this);
options = &priv->options;
priv->peer.send_count = options->send_count;
priv->peer.recv_count = options->recv_count;
priv->peer.send_size = options->send_size;
priv->peer.recv_size = options->recv_size;
priv->backlog = options->backlog;
priv->peer.trans = this;
INIT_LIST_HEAD(&priv->peer.ioq);
pthread_mutex_init(&priv->write_mutex, NULL);
pthread_mutex_init(&priv->recv_mutex, NULL);
pthread_cond_init(&priv->recv_cond, NULL);
LOCK(&ctx->lock);
{
if (ctx->ib == NULL) {
ctx->ib = __gf_rdma_ctx_create();
if (ctx->ib == NULL) {
ret = -1;
}
}
}
UNLOCK(&ctx->lock);
return ret;
}
static int32_t
gf_rdma_disconnect(rpc_transport_t *this, gf_boolean_t wait)
{
gf_rdma_private_t *priv = NULL;
int32_t ret = 0;
priv = this->private;
gf_msg_callingfn(this->name, GF_LOG_DEBUG, 0, RDMA_MSG_PEER_DISCONNECTED,
"disconnect called (peer:%s)", this->peerinfo.identifier);
pthread_mutex_lock(&priv->write_mutex);
{
ret = __gf_rdma_disconnect(this);
}
pthread_mutex_unlock(&priv->write_mutex);
return ret;
}
static int32_t
gf_rdma_connect(struct rpc_transport *this, int port)
{
gf_rdma_private_t *priv = NULL;
int32_t ret = 0;
union gf_sock_union sock_union = {
{
0,
},
};
socklen_t sockaddr_len = 0;
gf_rdma_peer_t *peer = NULL;
gf_rdma_ctx_t *rdma_ctx = NULL;
gf_boolean_t connected = _gf_false;
priv = this->private;
peer = &priv->peer;
rpc_transport_ref(this);
ret = gf_rdma_client_get_remote_sockaddr(this, &sock_union.sa,
&sockaddr_len, port);
if (ret != 0) {
gf_msg_debug(this->name, 0,
"cannot get remote address to "
"connect");
goto out;
}
rdma_ctx = this->ctx->ib;
pthread_mutex_lock(&priv->write_mutex);
{
if (peer->cm_id != NULL) {
ret = -1;
errno = EINPROGRESS;
connected = _gf_true;
goto unlock;
}
priv->entity = GF_RDMA_CLIENT;
ret = rdma_create_id(rdma_ctx->rdma_cm_event_channel, &peer->cm_id,
this, RDMA_PS_TCP);
if (ret != 0) {
gf_msg(this->name, GF_LOG_ERROR, errno, RDMA_MSG_CM_EVENT_FAILED,
"creation of "
"rdma_cm_id failed");
ret = -errno;
goto unlock;
}
memcpy(&this->peerinfo.sockaddr, &sock_union.storage, sockaddr_len);
this->peerinfo.sockaddr_len = sockaddr_len;
if (port > 0)
sock_union.sin.sin_port = htons(port);
((struct sockaddr *)&this->myinfo.sockaddr)->sa_family =
((struct sockaddr *)&this->peerinfo.sockaddr)->sa_family;
ret = gf_rdma_client_bind(this,
(struct sockaddr *)&this->myinfo.sockaddr,
&this->myinfo.sockaddr_len, peer->cm_id);
if (ret != 0) {
gf_msg(this->name, GF_LOG_WARNING, errno,
RDMA_MSG_CLIENT_BIND_FAILED, "client bind failed");
goto unlock;
}
ret = rdma_resolve_addr(peer->cm_id, NULL, &sock_union.sa, 2000);
if (ret != 0) {
gf_msg(this->name, GF_LOG_WARNING, errno,
RDMA_MSG_RDMA_RESOLVE_ADDR_FAILED,
"rdma_resolve_addr failed");
goto unlock;
}
priv->connected = 0;
}
unlock:
pthread_mutex_unlock(&priv->write_mutex);
out:
if (ret != 0) {
if (!connected) {
gf_rdma_teardown(this);
}
rpc_transport_unref(this);
}
return ret;
}
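/*
 * Server side: create an rdma_cm id, bind it to the configured address
 * and start listening. Incoming connection requests are delivered to
 * the rdma-cm event thread. If no brick port has been recorded yet,
 * the port this listener bound to is published via
 * cmd_args->brick_port2.
 */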
static int32_t
gf_rdma_listen(rpc_transport_t *this)
{
union gf_sock_union sock_union = {
{
0,
},
};
socklen_t sockaddr_len = 0;
gf_rdma_private_t *priv = NULL;
gf_rdma_peer_t *peer = NULL;
int ret = 0;
gf_rdma_ctx_t *rdma_ctx = NULL;
cmd_args_t *cmd_args = NULL;
char service[NI_MAXSERV], host[NI_MAXHOST];
int optval = 2;
priv = this->private;
peer = &priv->peer;
priv->entity = GF_RDMA_SERVER_LISTENER;
rdma_ctx = this->ctx->ib;
ret = gf_rdma_server_get_local_sockaddr(this, &sock_union.sa,
&sockaddr_len);
if (ret != 0) {
gf_msg(this->name, GF_LOG_WARNING, 0, RDMA_MSG_NW_ADDR_UNKNOWN,
"cannot find network address of server to bind to");
goto err;
}
ret = rdma_create_id(rdma_ctx->rdma_cm_event_channel, &peer->cm_id, this,
RDMA_PS_TCP);
if (ret != 0) {
gf_msg(this->name, GF_LOG_WARNING, errno, RDMA_MSG_CM_EVENT_FAILED,
"creation of rdma_cm_id "
"failed");
goto err;
}
memcpy(&this->myinfo.sockaddr, &sock_union.storage, sockaddr_len);
this->myinfo.sockaddr_len = sockaddr_len;
ret = getnameinfo((struct sockaddr *)&this->myinfo.sockaddr,
this->myinfo.sockaddr_len, host, sizeof(host), service,
sizeof(service), NI_NUMERICHOST);
if (ret != 0) {
gf_msg(this->name, GF_LOG_ERROR, ret, TRANS_MSG_GET_NAME_INFO_FAILED,
"getnameinfo failed");
goto err;
}
if (snprintf(this->myinfo.identifier, UNIX_PATH_MAX, "%s:%s", host,
service) >= UNIX_PATH_MAX) {
gf_msg(this->name, GF_LOG_WARNING, 0, RDMA_MSG_BUFFER_ERROR,
"host and service name too large");
goto err;
}
ret = rdma_set_option(peer->cm_id, RDMA_OPTION_ID, RDMA_OPTION_ID_REUSEADDR,
(void *)&optval, sizeof(optval));
if (ret != 0) {
gf_msg(this->name, GF_LOG_WARNING, errno, RDMA_MSG_OPTION_SET_FAILED,
"rdma option set failed");
goto err;
}
ret = rdma_bind_addr(peer->cm_id, &sock_union.sa);
if (ret != 0) {
gf_msg(this->name, GF_LOG_WARNING, errno,
RDMA_MSG_RDMA_BIND_ADDR_FAILED, "rdma_bind_addr failed");
goto err;
}
ret = rdma_listen(peer->cm_id, priv->backlog);
if (ret != 0) {
gf_msg(this->name, GF_LOG_WARNING, errno, RDMA_MSG_LISTEN_FAILED,
"rdma_listen failed");
goto err;
}
cmd_args = &(this->ctx->cmd_args);
if (!cmd_args->brick_port2) {
cmd_args->brick_port2 = rdma_get_src_port(peer->cm_id);
gf_log(this->name, GF_LOG_INFO,
"process started listening on port (%d)", cmd_args->brick_port2);
}
rpc_transport_ref(this);
ret = 0;
err:
if (ret < 0) {
if (peer->cm_id != NULL) {
rdma_destroy_id(peer->cm_id);
peer->cm_id = NULL;
}
}
return ret;
}
struct rpc_transport_ops tops = {
.submit_request = gf_rdma_submit_request,
.submit_reply = gf_rdma_submit_reply,
.connect = gf_rdma_connect,
.disconnect = gf_rdma_disconnect,
.listen = gf_rdma_listen,
};
int32_t
init(rpc_transport_t *this)
{
gf_rdma_private_t *priv = NULL;
gf_rdma_ctx_t *rdma_ctx = NULL;
struct iobuf_pool *iobuf_pool = NULL;
priv = GF_CALLOC(1, sizeof(*priv), gf_common_mt_rdma_private_t);
if (!priv)
return -1;
this->private = priv;
if (gf_rdma_init(this)) {
gf_msg(this->name, GF_LOG_WARNING, 0, RDMA_MSG_INIT_IB_DEVICE_FAILED,
"Failed to initialize IB Device");
this->private = NULL;
GF_FREE(priv);
return -1;
}
rdma_ctx = this->ctx->ib;
if (!rdma_ctx)
return -1;
pthread_mutex_lock(&rdma_ctx->lock);
{
if (this->dl_handle && (++(rdma_ctx->dlcount)) == 1) {
iobuf_pool = this->ctx->iobuf_pool;
iobuf_pool->rdma_registration = gf_rdma_register_arena;
iobuf_pool->rdma_deregistration = gf_rdma_deregister_arena;
gf_rdma_register_iobuf_pool_with_device(rdma_ctx->device,
iobuf_pool);
}
}
pthread_mutex_unlock(&rdma_ctx->lock);
return 0;
}
int
reconfigure(rpc_transport_t *this, dict_t *options)
{
gf_rdma_private_t *priv = NULL;
uint32_t backlog = 0;
int ret = -1;
GF_VALIDATE_OR_GOTO("rdma", this, out);
GF_VALIDATE_OR_GOTO("rdma", this->private, out);
priv = this->private;
if (dict_get_uint32(options, "transport.listen-backlog", &backlog) == 0) {
priv->backlog = backlog;
gf_log(this->name, GF_LOG_DEBUG,
"Reconfigued "
"transport.listen-backlog=%d",
priv->backlog);
}
ret = 0;
out:
return ret;
}
void
fini(struct rpc_transport *this)
{
/* TODO: verify this function does graceful finish */
gf_rdma_private_t *priv = NULL;
struct iobuf_pool *iobuf_pool = NULL;
gf_rdma_ctx_t *rdma_ctx = NULL;
priv = this->private;
this->private = NULL;
if (priv) {
pthread_mutex_destroy(&priv->recv_mutex);
pthread_mutex_destroy(&priv->write_mutex);
gf_msg_trace(this->name, 0, "called fini on transport: %p", this);
GF_FREE(priv);
}
rdma_ctx = this->ctx->ib;
if (!rdma_ctx)
return;
pthread_mutex_lock(&rdma_ctx->lock);
{
if (this->dl_handle && (--(rdma_ctx->dlcount)) == 0) {
iobuf_pool = this->ctx->iobuf_pool;
gf_rdma_deregister_iobuf_pool(rdma_ctx->device);
iobuf_pool->rdma_registration = NULL;
iobuf_pool->rdma_deregistration = NULL;
}
}
pthread_mutex_unlock(&rdma_ctx->lock);
return;
}
/* TODO: expand each option */
struct volume_options options[] = {
{.key = {"transport.rdma.port", "rdma-port"},
.type = GF_OPTION_TYPE_INT,
.min = 1,
.max = 4,
.description = "check the option by 'ibv_devinfo'"},
{
.key = {"transport.rdma.mtu", "rdma-mtu"},
.type = GF_OPTION_TYPE_INT,
},
{.key = {"transport.rdma.device-name", "rdma-device-name"},
.type = GF_OPTION_TYPE_ANY,
.description = "check by 'ibv_devinfo'"},
{
.key = {"transport.rdma.work-request-send-count",
"rdma-work-request-send-count"},
.type = GF_OPTION_TYPE_INT,
},
{
.key = {"transport.rdma.work-request-recv-count",
"rdma-work-request-recv-count"},
.type = GF_OPTION_TYPE_INT,
},
{.key = {"remote-port", "transport.remote-port",
"transport.rdma.remote-port"},
.type = GF_OPTION_TYPE_INT},
{.key = {"transport.rdma.attr-timeout", "rdma-attr-timeout"},
.type = GF_OPTION_TYPE_INT},
{.key = {"transport.rdma.attr-retry-cnt", "rdma-attr-retry-cnt"},
.type = GF_OPTION_TYPE_INT},
{.key = {"transport.rdma.attr-rnr-retry", "rdma-attr-rnr-retry"},
.type = GF_OPTION_TYPE_INT},
{.key = {"transport.rdma.listen-port", "listen-port"},
.type = GF_OPTION_TYPE_INT},
{.key = {"transport.rdma.connect-path", "connect-path"},
.type = GF_OPTION_TYPE_ANY},
{.key = {"transport.rdma.bind-path", "bind-path"},
.type = GF_OPTION_TYPE_ANY},
{.key = {"transport.rdma.listen-path", "listen-path"},
.type = GF_OPTION_TYPE_ANY},
{.key = {"transport.address-family", "address-family"},
.value = {"inet", "inet6", "inet/inet6", "inet6/inet", "unix", "inet-sdp"},
.type = GF_OPTION_TYPE_STR},
{.key = {"transport.socket.lowlat"}, .type = GF_OPTION_TYPE_BOOL},
{.key = {NULL}}};