/**
* Copyright (C) Mellanox Technologies Ltd. 2017-2019. ALL RIGHTS RESERVED.
* See file LICENSE for terms.
*/
#include "rdmacm_ep.h"
/* Validate user-supplied sockaddr callback flags: run the generic UCT
 * callback-flags check, and additionally require UCT_CB_FLAG_ASYNC, since
 * rdmacm connection events are delivered from the async event channel.
 * NOTE: expands to a 'return UCS_ERR_UNSUPPORTED' from the *calling*
 * function when the ASYNC flag is missing. */
#define UCT_RDMACM_CB_FLAGS_CHECK(_flags) \
    do { \
        UCT_CB_FLAGS_CHECK(_flags); \
        if (!((_flags) & UCT_CB_FLAG_ASYNC)) { \
            return UCS_ERR_UNSUPPORTED; \
        } \
    } while (0)
/* Start resolving the endpoint's remote address on its cm_id.
 * The call is serialized against the async event thread via the worker's
 * async lock. Returns the status reported by uct_rdmacm_resolve_addr(). */
ucs_status_t uct_rdmacm_ep_resolve_addr(uct_rdmacm_ep_t *ep)
{
    uct_rdmacm_iface_t *iface;
    ucs_status_t       status;

    iface = ucs_derived_of(ep->super.super.iface, uct_rdmacm_iface_t);

    UCS_ASYNC_BLOCK(iface->super.worker->async);
    /* timeout is configured in seconds; the resolver expects milliseconds */
    status = uct_rdmacm_resolve_addr(ep->cm_id_ctx->cm_id,
                                     (struct sockaddr *)&ep->remote_addr,
                                     iface->config.addr_resolve_timeout *
                                     UCS_MSEC_PER_SEC,
                                     UCS_LOG_LEVEL_ERROR);
    UCS_ASYNC_UNBLOCK(iface->super.worker->async);

    return status;
}
/* Allocate a cm_id context for a client endpoint, subject to the interface's
 * cm_id quota, and create the rdmacm id on the interface's event channel.
 *
 * Returns:
 *  UCS_OK              - ep->cm_id_ctx is valid and on the used list.
 *  UCS_ERR_NO_RESOURCE - quota exhausted; caller should queue the ep as
 *                        pending (ep->cm_id_ctx is set to NULL).
 *  UCS_ERR_NO_MEMORY / UCS_ERR_IO_ERROR - allocation or rdma_create_id
 *                        failure; ep->cm_id_ctx is NULL.
 */
ucs_status_t uct_rdmacm_ep_set_cm_id(uct_rdmacm_iface_t *iface, uct_rdmacm_ep_t *ep)
{
    ucs_status_t status;

    UCS_ASYNC_BLOCK(iface->super.worker->async);

    /* create a cm_id for the client side */
    if (iface->cm_id_quota <= 0) {
        /* no cm_id available; report NO_RESOURCE so the ep can be queued */
        ep->cm_id_ctx = NULL;
        status        = UCS_ERR_NO_RESOURCE;
        goto out;
    }

    /* Create an id for this interface. Events associated with this id will be
     * reported on the event_channel that was created on iface init. */
    ep->cm_id_ctx = ucs_malloc(sizeof(*ep->cm_id_ctx), "client cm_id_ctx");
    if (ep->cm_id_ctx == NULL) {
        status = UCS_ERR_NO_MEMORY;
        goto out;
    }

    if (rdma_create_id(iface->event_ch, &ep->cm_id_ctx->cm_id,
                       ep->cm_id_ctx, RDMA_PS_UDP)) {
        ucs_error("rdma_create_id() failed: %m");
        status = UCS_ERR_IO_ERROR;
        goto out_free;
    }

    ep->cm_id_ctx->ep = ep;
    ucs_list_add_tail(&iface->used_cm_ids_list, &ep->cm_id_ctx->list);
    iface->cm_id_quota--;
    ucs_debug("ep %p, new cm_id %p. cm_id_in_quota %d", ep,
              ep->cm_id_ctx->cm_id, iface->cm_id_quota);
    status = UCS_OK;
    goto out;

out_free:
    ucs_free(ep->cm_id_ctx);
    /* don't leave a dangling pointer behind: the class cleanup function
     * dereferences ep->cm_id_ctx whenever it is non-NULL */
    ep->cm_id_ctx = NULL;
out:
    UCS_ASYNC_UNBLOCK(iface->super.worker->async);
    return status;
}
/* Append the endpoint to the interface's pending-endpoints list and mark it
 * as pending. Done under the worker async lock, since the list is also
 * manipulated from the async event thread. */
static inline void uct_rdmacm_ep_add_to_pending(uct_rdmacm_iface_t *iface,
                                                uct_rdmacm_ep_t *ep)
{
    UCS_ASYNC_BLOCK(iface->super.worker->async);
    ep->is_on_pending = 1;
    ucs_list_add_tail(&iface->pending_eps_list, &ep->list_elem);
    UCS_ASYNC_UNBLOCK(iface->super.worker->async);
}
/* Client-side endpoint constructor.
 *
 * Validates the parameters, stores the remote address and user callbacks,
 * then either starts address resolution right away (when a cm_id is
 * available) or queues the endpoint on the interface's pending list until
 * another endpoint releases its cm_id.
 *
 * Returns UCS_OK when the (asynchronous) connection flow was started;
 * self->status stays UCS_INPROGRESS until it completes. On error, returns
 * UCS_ERR_UNSUPPORTED / UCS_ERR_INVALID_PARAM / UCS_ERR_IO_ERROR or the
 * failure from cm_id creation / address resolution. */
static UCS_CLASS_INIT_FUNC(uct_rdmacm_ep_t, const uct_ep_params_t *params)
{
    uct_rdmacm_iface_t *iface = ucs_derived_of(params->iface,
                                               uct_rdmacm_iface_t);
    const ucs_sock_addr_t *sockaddr = params->sockaddr;
    char ip_port_str[UCS_SOCKADDR_STRING_LEN];
    ucs_status_t status;

    UCS_CLASS_CALL_SUPER_INIT(uct_base_ep_t, &iface->super);

    if (iface->is_server) {
        /* TODO allow an interface to be used both for server and client */
        return UCS_ERR_UNSUPPORTED;
    }

    /* a client endpoint needs a remote address to connect to */
    if (!(params->field_mask & UCT_EP_PARAM_FIELD_SOCKADDR)) {
        return UCS_ERR_INVALID_PARAM;
    }

    /* macro returns UCS_ERR_UNSUPPORTED if UCT_CB_FLAG_ASYNC is missing */
    UCT_RDMACM_CB_FLAGS_CHECK((params->field_mask &
                               UCT_EP_PARAM_FIELD_SOCKADDR_CB_FLAGS) ?
                              params->sockaddr_cb_flags : 0);

    /* Initialize these fields before calling rdma_resolve_addr to avoid a race
     * where they are used before being initialized (from the async thread
     * - after an RDMA_CM_EVENT_ROUTE_RESOLVED event) */
    self->pack_cb       = (params->field_mask &
                           UCT_EP_PARAM_FIELD_SOCKADDR_PACK_CB) ?
                          params->sockaddr_pack_cb : NULL;
    self->pack_cb_arg   = (params->field_mask &
                           UCT_EP_PARAM_FIELD_USER_DATA) ?
                          params->user_data : NULL;
    self->pack_cb_flags = (params->field_mask &
                           UCT_EP_PARAM_FIELD_SOCKADDR_CB_FLAGS) ?
                          params->sockaddr_cb_flags : 0;
    pthread_mutex_init(&self->ops_mutex, NULL);
    ucs_queue_head_init(&self->ops);

    /* Save the remote address (IPv4 and IPv6 are supported) */
    if (sockaddr->addr->sa_family == AF_INET) {
        memcpy(&self->remote_addr, sockaddr->addr, sizeof(struct sockaddr_in));
    } else if (sockaddr->addr->sa_family == AF_INET6) {
        memcpy(&self->remote_addr, sockaddr->addr, sizeof(struct sockaddr_in6));
    } else {
        ucs_error("rdmacm ep: unknown remote sa_family=%d", sockaddr->addr->sa_family);
        status = UCS_ERR_IO_ERROR;
        goto err;
    }

    self->slow_prog_id = UCS_CALLBACKQ_ID_NULL;

    status = uct_rdmacm_ep_set_cm_id(iface, self);
    if (status == UCS_ERR_NO_RESOURCE) {
        /* quota exhausted - wait on the pending list for a free cm_id */
        goto add_to_pending;
    } else if (status != UCS_OK) {
        goto err;
    }

    self->is_on_pending = 0;

    /* After rdma_resolve_addr(), the client will wait for an
     * RDMA_CM_EVENT_ADDR_RESOLVED event on the event_channel
     * to proceed with the connection establishment.
     * This event will be retrieved from the event_channel by the async thread.
     * All endpoints share the interface's event_channel. */
    status = uct_rdmacm_ep_resolve_addr(self);
    if (status != UCS_OK) {
        goto err;
    }

    goto out;

add_to_pending:
    /* Add the ep to the pending queue of eps since there is no
     * available cm_id for it */
    uct_rdmacm_ep_add_to_pending(iface, self);
out:
    ucs_debug("created an RDMACM endpoint on iface %p. event_channel: %p, "
              "iface cm_id: %p remote addr: %s",
              iface, iface->event_ch, iface->cm_id,
              ucs_sockaddr_str((struct sockaddr *)sockaddr->addr,
                               ip_port_str, UCS_SOCKADDR_STRING_LEN));
    /* connection establishment continues asynchronously */
    self->status = UCS_INPROGRESS;
    return UCS_OK;

err:
    /* the mutex was initialized above; release it before failing init */
    pthread_mutex_destroy(&self->ops_mutex);
    return status;
}
/* Endpoint destructor. Runs under the worker async lock so it cannot race
 * with event handling on the async thread. Detaches the ep from the pending
 * list and from its cm_id context (the cm_id itself is owned by the iface
 * via used_cm_ids_list and is not destroyed here). */
static UCS_CLASS_CLEANUP_FUNC(uct_rdmacm_ep_t)
{
    uct_rdmacm_iface_t *iface = ucs_derived_of(self->super.super.iface, uct_rdmacm_iface_t);
    uct_rdmacm_ctx_t *cm_id_ctx;

    ucs_debug("rdmacm_ep %p: destroying", self);

    UCS_ASYNC_BLOCK(iface->super.worker->async);

    /* if the ep never got a cm_id, it is still queued as pending */
    if (self->is_on_pending) {
        ucs_list_del(&self->list_elem);
        self->is_on_pending = 0;
    }

    /* remove the slow progress function in case it was placed on the slow progress
     * chain but wasn't invoked yet */
    uct_worker_progress_unregister_safe(&iface->super.worker->super,
                                        &self->slow_prog_id);

    pthread_mutex_destroy(&self->ops_mutex);
    if (!ucs_queue_is_empty(&self->ops)) {
        ucs_warn("destroying endpoint %p with not completed operations", self);
    }

    /* mark this ep as destroyed so that arriving events on it won't try to
     * use it */
    if (self->cm_id_ctx != NULL) {
        cm_id_ctx     = self->cm_id_ctx->cm_id->context;
        cm_id_ctx->ep = NULL;
        ucs_debug("ep destroy: cm_id %p", cm_id_ctx->cm_id);
    }
    UCS_ASYNC_UNBLOCK(iface->super.worker->async);
}
/* Instantiate the uct_rdmacm_ep_t class (derived from uct_base_ep_t) and
 * generate its public create/destroy entry points. */
UCS_CLASS_DEFINE(uct_rdmacm_ep_t, uct_base_ep_t)
UCS_CLASS_DEFINE_NEW_FUNC(uct_rdmacm_ep_t, uct_ep_t, const uct_ep_params_t *);
UCS_CLASS_DEFINE_DELETE_FUNC(uct_rdmacm_ep_t, uct_ep_t);
/* Slow-path progress callback that reports the endpoint's stored error
 * status to the user's error handler from the main (progress) thread.
 * Registered as a oneshot callback by uct_rdmacm_ep_set_failed() when the
 * user's error handler is not async-capable. Returns 0 (no events). */
static unsigned uct_rdmacm_client_err_handle_progress(void *arg)
{
    uct_rdmacm_ep_t *rdmacm_ep = arg;
    uct_rdmacm_iface_t *iface = ucs_derived_of(rdmacm_ep->super.super.iface,
                                               uct_rdmacm_iface_t);

    ucs_trace_func("err_handle ep=%p", rdmacm_ep);

    UCS_ASYNC_BLOCK(iface->super.worker->async);
    /* the oneshot callback has fired; clear the id so that ep cleanup does
     * not try to unregister it again */
    rdmacm_ep->slow_prog_id = UCS_CALLBACKQ_ID_NULL;
    uct_set_ep_failed(&UCS_CLASS_NAME(uct_rdmacm_ep_t), &rdmacm_ep->super.super,
                      rdmacm_ep->super.super.iface, rdmacm_ep->status);
    UCS_ASYNC_UNBLOCK(iface->super.worker->async);
    return 0;
}
/* Report a fatal error on the endpoint to the user's error handler.
 * If the handler was registered with UCT_CB_FLAG_ASYNC it is invoked
 * directly; otherwise the error is stored on the ep and delivered later
 * from the main thread via a oneshot slow-progress callback. */
void uct_rdmacm_ep_set_failed(uct_iface_t *iface, uct_ep_h ep, ucs_status_t status)
{
    uct_rdmacm_iface_t *rdmacm_iface = ucs_derived_of(iface, uct_rdmacm_iface_t);
    uct_rdmacm_ep_t    *rdmacm_ep    = ucs_derived_of(ep, uct_rdmacm_ep_t);

    if (!(rdmacm_iface->super.err_handler_flags & UCT_CB_FLAG_ASYNC)) {
        /* invoke the error handling flow from the main thread */
        rdmacm_ep->status = status;
        uct_worker_progress_register_safe(&rdmacm_iface->super.worker->super,
                                          uct_rdmacm_client_err_handle_progress,
                                          rdmacm_ep, UCS_CALLBACKQ_FLAG_ONESHOT,
                                          &rdmacm_ep->slow_prog_id);
        return;
    }

    /* async-capable handler - safe to invoke from the current context */
    uct_set_ep_failed(&UCS_CLASS_NAME(uct_rdmacm_ep_t), &rdmacm_ep->super.super,
                      &rdmacm_iface->super.super, status);
}
/**
 * Drain the endpoint's queue of pending operations, invoking each one's
 * user completion callback with the given status and freeing the op.
 *
 * Caller must lock ep->ops_mutex (asserted via trylock returning EBUSY).
 * The lock is dropped around each user callback so the callback may safely
 * re-enter endpoint code, and re-taken before extracting the next op.
 * Returns with ops_mutex still held, as it was on entry.
 */
void uct_rdmacm_ep_invoke_completions(uct_rdmacm_ep_t *ep, ucs_status_t status)
{
    uct_rdmacm_ep_op_t *op;

    /* trylock on an already-held (non-recursive) mutex returns EBUSY */
    ucs_assert(pthread_mutex_trylock(&ep->ops_mutex) == EBUSY);

    ucs_queue_for_each_extract(op, &ep->ops, queue_elem, 1) {
        /* release the lock while running the user's completion callback */
        pthread_mutex_unlock(&ep->ops_mutex);
        uct_invoke_completion(op->user_comp, status);
        ucs_free(op);
        pthread_mutex_lock(&ep->ops_mutex);
    }
    /* lock intentionally held on return - contract is lock-in/lock-out */
    /* coverity[missing_unlock] */
}