/**
 * Copyright (C) Mellanox Technologies Ltd. 2017-2019.  ALL RIGHTS RESERVED.
 * See file LICENSE for terms.
 */

#include "rdmacm_ep.h"


#define UCT_RDMACM_CB_FLAGS_CHECK(_flags) \
    do { \
        UCT_CB_FLAGS_CHECK(_flags); \
        if (!((_flags) & UCT_CB_FLAG_ASYNC)) { \
            return UCS_ERR_UNSUPPORTED; \
        } \
    } while (0)


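/* Start resolving the endpoint's remote address. Completion is reported
 * asynchronously as an RDMA_CM_EVENT_ADDR_RESOLVED event on the interface's
 * event channel. */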
ucs_status_t uct_rdmacm_ep_resolve_addr(uct_rdmacm_ep_t *ep)
{
    uct_rdmacm_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_rdmacm_iface_t);
    ucs_status_t status;

    UCS_ASYNC_BLOCK(iface->super.worker->async);

    status = uct_rdmacm_resolve_addr(ep->cm_id_ctx->cm_id,
                                     (struct sockaddr *)&ep->remote_addr,
                                     UCS_MSEC_PER_SEC * iface->config.addr_resolve_timeout,
                                     UCS_LOG_LEVEL_ERROR);

    UCS_ASYNC_UNBLOCK(iface->super.worker->async);
    return status;
}

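/* Allocate a cm_id for a client endpoint, provided the interface still has
 * cm_id quota left. Returns UCS_ERR_NO_RESOURCE when the quota is exhausted,
 * in which case the endpoint is expected to wait on the pending list. */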
ucs_status_t uct_rdmacm_ep_set_cm_id(uct_rdmacm_iface_t *iface, uct_rdmacm_ep_t *ep)
{
    ucs_status_t status;

    UCS_ASYNC_BLOCK(iface->super.worker->async);

    /* create a cm_id for the client side */
    if (iface->cm_id_quota > 0) {
        /* Create an id for this interface. Events associated with this id will be
         * reported on the event_channel that was created on iface init. */
        ep->cm_id_ctx = ucs_malloc(sizeof(*ep->cm_id_ctx), "client cm_id_ctx");
        if (ep->cm_id_ctx == NULL) {
            status = UCS_ERR_NO_MEMORY;
            goto out;
        }

        if (rdma_create_id(iface->event_ch, &ep->cm_id_ctx->cm_id,
                           ep->cm_id_ctx, RDMA_PS_UDP)) {
            ucs_error("rdma_create_id() failed: %m");
            status = UCS_ERR_IO_ERROR;
            goto out_free;
        }

        ep->cm_id_ctx->ep = ep;
        ucs_list_add_tail(&iface->used_cm_ids_list, &ep->cm_id_ctx->list);
        iface->cm_id_quota--;
        ucs_debug("ep %p, new cm_id %p. cm_id_in_quota %d", ep,
                   ep->cm_id_ctx->cm_id, iface->cm_id_quota);
        status = UCS_OK;
        goto out;
    } else {
        ep->cm_id_ctx = NULL;
        status = UCS_ERR_NO_RESOURCE;
        goto out;
    }

out_free:
    ucs_free(ep->cm_id_ctx);
out:
    UCS_ASYNC_UNBLOCK(iface->super.worker->async);
    return status;
}

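/* Queue the endpoint on the interface's pending list until a cm_id becomes
 * available for it. */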
static inline void uct_rdmacm_ep_add_to_pending(uct_rdmacm_iface_t *iface, uct_rdmacm_ep_t *ep)
{
    UCS_ASYNC_BLOCK(iface->super.worker->async);
    ucs_list_add_tail(&iface->pending_eps_list, &ep->list_elem);
    ep->is_on_pending = 1;
    UCS_ASYNC_UNBLOCK(iface->super.worker->async);
}

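/* Client-side endpoint constructor: store the user callbacks and the remote
 * address, then try to allocate a cm_id and start address resolution, or queue
 * the endpoint if no cm_id is currently available. */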
static UCS_CLASS_INIT_FUNC(uct_rdmacm_ep_t, const uct_ep_params_t *params)
{
    uct_rdmacm_iface_t *iface       = ucs_derived_of(params->iface,
                                                     uct_rdmacm_iface_t);
    const ucs_sock_addr_t *sockaddr = params->sockaddr;
    char ip_port_str[UCS_SOCKADDR_STRING_LEN];
    ucs_status_t status;

    UCS_CLASS_CALL_SUPER_INIT(uct_base_ep_t, &iface->super);

    if (iface->is_server) {
        /* TODO allow an interface to be used both for server and client */
        return UCS_ERR_UNSUPPORTED;
    }

    if (!(params->field_mask & UCT_EP_PARAM_FIELD_SOCKADDR)) {
        return UCS_ERR_INVALID_PARAM;
    }

    UCT_RDMACM_CB_FLAGS_CHECK((params->field_mask &
                               UCT_EP_PARAM_FIELD_SOCKADDR_CB_FLAGS) ?
                              params->sockaddr_cb_flags : 0);

    /* Initialize these fields before calling rdma_resolve_addr to avoid a race
     * where they are used before being initialized (from the async thread
     * - after an RDMA_CM_EVENT_ROUTE_RESOLVED event) */
    self->pack_cb       = (params->field_mask &
                           UCT_EP_PARAM_FIELD_SOCKADDR_PACK_CB) ?
                          params->sockaddr_pack_cb : NULL;
    self->pack_cb_arg   = (params->field_mask &
                           UCT_EP_PARAM_FIELD_USER_DATA) ?
                          params->user_data : NULL;
    self->pack_cb_flags = (params->field_mask &
                           UCT_EP_PARAM_FIELD_SOCKADDR_CB_FLAGS) ?
                          params->sockaddr_cb_flags : 0;
    pthread_mutex_init(&self->ops_mutex, NULL);
    ucs_queue_head_init(&self->ops);

    /* Save the remote address */
    if (sockaddr->addr->sa_family == AF_INET) {
        memcpy(&self->remote_addr, sockaddr->addr, sizeof(struct sockaddr_in));
    } else if (sockaddr->addr->sa_family == AF_INET6) {
        memcpy(&self->remote_addr, sockaddr->addr, sizeof(struct sockaddr_in6));
    } else {
        ucs_error("rdmacm ep: unknown remote sa_family=%d", sockaddr->addr->sa_family);
        status = UCS_ERR_IO_ERROR;
        goto err;
    }

    self->slow_prog_id = UCS_CALLBACKQ_ID_NULL;

    status = uct_rdmacm_ep_set_cm_id(iface, self);
    if (status == UCS_ERR_NO_RESOURCE) {
        goto add_to_pending;
    } else if (status != UCS_OK) {
        goto err;
    }

    self->is_on_pending = 0;

    /* After rdma_resolve_addr(), the client will wait for an
     * RDMA_CM_EVENT_ADDR_RESOLVED event on the event_channel
     * to proceed with the connection establishment.
     * This event will be retrieved from the event_channel by the async thread.
     * All endpoints share the interface's event_channel. */
    status = uct_rdmacm_ep_resolve_addr(self);
    if (status != UCS_OK) {
        goto err;
    }

    goto out;

add_to_pending:
    /* Add the ep to the pending queue of eps since there is no
     * available cm_id for it */
    uct_rdmacm_ep_add_to_pending(iface, self);
out:
    ucs_debug("created an RDMACM endpoint on iface %p. event_channel: %p, "
              "iface cm_id: %p remote addr: %s",
               iface, iface->event_ch, iface->cm_id,
               ucs_sockaddr_str((struct sockaddr *)sockaddr->addr,
                                ip_port_str, UCS_SOCKADDR_STRING_LEN));
    self->status = UCS_INPROGRESS;
    return UCS_OK;

err:
    pthread_mutex_destroy(&self->ops_mutex);

    return status;
}

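/* Endpoint destructor: remove the endpoint from the pending list, unregister a
 * not-yet-invoked error-handling callback, and detach the endpoint from its
 * cm_id context so that late events on that cm_id are ignored. */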
static UCS_CLASS_CLEANUP_FUNC(uct_rdmacm_ep_t)
{
    uct_rdmacm_iface_t *iface = ucs_derived_of(self->super.super.iface, uct_rdmacm_iface_t);
    uct_rdmacm_ctx_t *cm_id_ctx;

    ucs_debug("rdmacm_ep %p: destroying", self);

    UCS_ASYNC_BLOCK(iface->super.worker->async);
    if (self->is_on_pending) {
        ucs_list_del(&self->list_elem);
        self->is_on_pending = 0;
    }

    /* remove the slow progress function in case it was placed on the slow progress
     * chain but wasn't invoked yet */
    uct_worker_progress_unregister_safe(&iface->super.worker->super,
                                        &self->slow_prog_id);

    pthread_mutex_destroy(&self->ops_mutex);
    if (!ucs_queue_is_empty(&self->ops)) {
        ucs_warn("destroying endpoint %p with not completed operations", self);
    }

    /* mark this ep as destroyed so that arriving events on it won't try to
     * use it */
    if (self->cm_id_ctx != NULL) {
        cm_id_ctx     = self->cm_id_ctx->cm_id->context;
        cm_id_ctx->ep = NULL;
        ucs_debug("ep destroy: cm_id %p", cm_id_ctx->cm_id);
    }
    UCS_ASYNC_UNBLOCK(iface->super.worker->async);
}

UCS_CLASS_DEFINE(uct_rdmacm_ep_t, uct_base_ep_t)
UCS_CLASS_DEFINE_NEW_FUNC(uct_rdmacm_ep_t, uct_ep_t, const uct_ep_params_t *);
UCS_CLASS_DEFINE_DELETE_FUNC(uct_rdmacm_ep_t, uct_ep_t);

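/* Slow-path progress callback that invokes the error handling flow for a
 * failed endpoint from the main thread context. */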
static unsigned uct_rdmacm_client_err_handle_progress(void *arg)
{
    uct_rdmacm_ep_t *rdmacm_ep = arg;
    uct_rdmacm_iface_t *iface = ucs_derived_of(rdmacm_ep->super.super.iface,
                                               uct_rdmacm_iface_t);

    ucs_trace_func("err_handle ep=%p", rdmacm_ep);
    UCS_ASYNC_BLOCK(iface->super.worker->async);

    rdmacm_ep->slow_prog_id = UCS_CALLBACKQ_ID_NULL;
    uct_set_ep_failed(&UCS_CLASS_NAME(uct_rdmacm_ep_t), &rdmacm_ep->super.super,
                      rdmacm_ep->super.super.iface, rdmacm_ep->status);

    UCS_ASYNC_UNBLOCK(iface->super.worker->async);
    return 0;
}

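/* Report an endpoint failure to the user. If the error handler may be invoked
 * from the async context, call it directly; otherwise defer it to the main
 * thread via a oneshot slow-path progress callback. */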
void uct_rdmacm_ep_set_failed(uct_iface_t *iface, uct_ep_h ep, ucs_status_t status)
{
    uct_rdmacm_iface_t *rdmacm_iface = ucs_derived_of(iface, uct_rdmacm_iface_t);
    uct_rdmacm_ep_t *rdmacm_ep       = ucs_derived_of(ep, uct_rdmacm_ep_t);

    if (rdmacm_iface->super.err_handler_flags & UCT_CB_FLAG_ASYNC) {
        uct_set_ep_failed(&UCS_CLASS_NAME(uct_rdmacm_ep_t), &rdmacm_ep->super.super,
                          &rdmacm_iface->super.super, status);
    } else {
        /* invoke the error handling flow from the main thread */
        rdmacm_ep->status = status;
        uct_worker_progress_register_safe(&rdmacm_iface->super.worker->super,
                                          uct_rdmacm_client_err_handle_progress,
                                          rdmacm_ep, UCS_CALLBACKQ_FLAG_ONESHOT,
                                          &rdmacm_ep->slow_prog_id);
    }
}

/**
 * Caller must lock ep->ops_mutex
 */
void uct_rdmacm_ep_invoke_completions(uct_rdmacm_ep_t *ep, ucs_status_t status)
{
    uct_rdmacm_ep_op_t *op;

    ucs_assert(pthread_mutex_trylock(&ep->ops_mutex) == EBUSY);

    ucs_queue_for_each_extract(op, &ep->ops, queue_elem, 1) {
        pthread_mutex_unlock(&ep->ops_mutex);
        uct_invoke_completion(op->user_comp, status);
        ucs_free(op);
        pthread_mutex_lock(&ep->ops_mutex);
    }
    /* coverity[missing_unlock] */
}