/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2019.  ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include "cm.h"

#include <uct/api/uct.h>
#include <uct/ib/base/ib_iface.h>
#include <uct/base/uct_md.h>
#include <ucs/arch/atomic.h>
#include <ucs/async/async.h>
#include <ucs/debug/log.h>
#include <poll.h>


/*
 * Configuration fields of the CM interface. The table is passed to
 * UCT_TL_DEFINE() at the end of this file together with the "CM_" prefix.
 * Each entry stores its parsed value at the given offset inside
 * uct_cm_iface_config_t.
 */
static ucs_config_field_t uct_cm_iface_config_table[] = {
  /* Nested generic IB interface configuration (stored in the 'super' member).
   * NOTE(review): "RX_INLINE=0" looks like a default override for the nested
   * table - confirm against the config parser. */
  {"IB_", "RX_INLINE=0", NULL,
   ucs_offsetof(uct_cm_iface_config_t, super), UCS_CONFIG_TYPE_TABLE(uct_ib_iface_config_table)},

  /* Time value; converted to milliseconds in the interface init function */
  {"TIMEOUT", "300ms", "Timeout for MAD layer",
   ucs_offsetof(uct_cm_iface_config_t, timeout), UCS_CONFIG_TYPE_TIME},

  /* Clamped to UINT8_MAX in the interface init function */
  {"RETRY_COUNT", "100", "Number of retries for MAD layer",
   ucs_offsetof(uct_cm_iface_config_t, retry_count), UCS_CONFIG_TYPE_UINT},

  /* Limit checked by the progress function before dispatching pending
   * requests */
  {"MAX_OP", "1024", "Maximal number of outstanding SIDR operations",
   ucs_offsetof(uct_cm_iface_config_t, max_outstanding), UCS_CONFIG_TYPE_UINT},

  /* Table terminator */
  {NULL}
};

/* Forward declaration: the table is defined further down in this file, but the
 * interface init function needs its address. */
static uct_ib_iface_ops_t uct_cm_iface_ops;


/*
 * Worker progress callback of the CM interface.
 *
 * Completes the flush operations found at the head of the outstanding queue,
 * releases the resources of operations that were completed by the event
 * handler, dispatches pending requests, and unregisters itself when no flush
 * operation is left at the head of the queue.
 *
 * @param arg  The CM interface (uct_cm_iface_t*).
 *
 * @return Number of flush completions that were invoked.
 */
static unsigned uct_cm_iface_progress(void *arg)
{
    uct_cm_iface_t *cm_iface = arg;
    uct_cm_pending_req_priv_t *pending_priv;
    uct_cm_iface_op_t *flush_op;
    unsigned num_flushed = 0;
    int flush_at_head;

    uct_cm_enter(cm_iface);

    /* Every non-id element sitting at the head of the queue is a flush
     * request whose preceding sends have all completed - fire its callback.
     */
    ucs_queue_for_each_extract(flush_op, &cm_iface->outstanding_q, queue,
                               !flush_op->is_id) {
        uct_invoke_completion(flush_op->comp, UCS_OK);
        ucs_free(flush_op);
        ++num_flushed;
    }

    /* Completions were only counted by the event handler; running in the
     * progress() context makes it safe to actually give the resources back.
     */
    cm_iface->num_outstanding -= cm_iface->num_completions;
    cm_iface->num_completions  = 0;

    /* Run pending requests for as long as there is room for more operations */
    uct_pending_queue_dispatch(pending_priv, &cm_iface->notify_q,
                               cm_iface->num_outstanding <
                               cm_iface->config.max_outstanding);

    /* A flush request may have been queued by one of the callbacks above.
     * Keep the progress callback registered only in that case.
     */
    flush_at_head = !ucs_queue_is_empty(&cm_iface->outstanding_q) &&
                    !ucs_queue_head_elem_non_empty(&cm_iface->outstanding_q,
                                                   uct_cm_iface_op_t,
                                                   queue)->is_id;
    if (!flush_at_head) {
        uct_worker_progress_unregister_safe(&uct_cm_iface_worker(cm_iface)->super,
                                            &cm_iface->slow_prog_id);
    }

    uct_cm_leave(cm_iface);

    return num_flushed;
}

/*
 * Flush implementation of the CM interface.
 *
 * @param iface  CM interface to flush.
 * @param comp   Optional completion; when given, it is queued behind all the
 *               currently outstanding operations.
 *
 * @return UCS_OK if nothing is outstanding, UCS_ERR_NO_MEMORY if the flush
 *         operation could not be allocated, UCS_INPROGRESS otherwise.
 */
ucs_status_t uct_cm_iface_flush_do(uct_cm_iface_t *iface, uct_completion_t *comp)
{
    uct_cm_iface_op_t *flush_op;

    /* Nothing in flight - the flush is already done */
    if (!iface->num_outstanding) {
        return UCS_OK;
    }

    if (comp != NULL) {
        /* The caller wants a callback: append a marker operation at the tail
         * of the outstanding queue. It is completed once every operation
         * queued ahead of it has finished.
         */
        flush_op = ucs_malloc(sizeof(*flush_op), "cm_op");
        if (flush_op == NULL) {
            return UCS_ERR_NO_MEMORY;
        }

        flush_op->is_id = 0;
        flush_op->comp  = comp;
        ucs_queue_push(&iface->outstanding_q, &flush_op->queue);
    }

    sched_yield();
    return UCS_INPROGRESS;
}

/*
 * iface_flush entry point: runs uct_cm_iface_flush_do() between
 * uct_cm_enter()/uct_cm_leave() and updates the flush statistics according to
 * the result.
 *
 * @param tl_iface  Interface to flush.
 * @param flags     Not used by this function.
 * @param comp      Optional completion, forwarded to uct_cm_iface_flush_do().
 *
 * @return The status returned by uct_cm_iface_flush_do().
 */
ucs_status_t uct_cm_iface_flush(uct_iface_h tl_iface, unsigned flags,
                                uct_completion_t *comp)
{
    uct_cm_iface_t *cm_iface = ucs_derived_of(tl_iface, uct_cm_iface_t);
    ucs_status_t result;

    uct_cm_enter(cm_iface);

    result = uct_cm_iface_flush_do(cm_iface, comp);
    switch (result) {
    case UCS_OK:
        UCT_TL_IFACE_STAT_FLUSH(ucs_derived_of(tl_iface, uct_base_iface_t));
        break;
    case UCS_INPROGRESS:
        UCT_TL_IFACE_STAT_FLUSH_WAIT(ucs_derived_of(tl_iface, uct_base_iface_t));
        break;
    default:
        /* Errors are not counted */
        break;
    }

    uct_cm_leave(cm_iface);

    return result;
}

/*
 * Handle a received SIDR request: trace it, send a SIDR reply with a success
 * status, and pass the payload that follows the CM header to the active
 * message handler selected by the header's am_id.
 *
 * @param iface  CM interface which received the event.
 * @param event  CM event; its private data starts with a uct_cm_hdr_t which
 *               is followed by hdr->length bytes of payload.
 */
static void uct_cm_iface_handle_sidr_req(uct_cm_iface_t *iface,
                                         struct ib_cm_event *event)
{
    uct_cm_hdr_t *hdr = event->private_data;
    struct ib_cm_sidr_rep_param rep;
    int ret;

    /* Mark the header as defined first (size of the header structure, not of
     * the pointer), since its 'length' field is read to size the payload.
     */
    VALGRIND_MAKE_MEM_DEFINED(hdr, sizeof(*hdr));
    /* NOTE(review): hdr->length comes from the wire and is not validated
     * against the size of the private data area here - confirm whether a
     * bound check is needed.
     */
    VALGRIND_MAKE_MEM_DEFINED(hdr + 1, hdr->length);

    uct_cm_iface_trace_data(iface, UCT_AM_TRACE_TYPE_RECV, hdr, "RX: SIDR_REQ");

    /* Send reply */
    ucs_trace_data("TX: SIDR_REP [id %p{%u}]", event->cm_id,
                   event->cm_id->handle);
    memset(&rep, 0, sizeof rep);
    rep.status = IB_SIDR_SUCCESS;
    ret = ib_cm_send_sidr_rep(event->cm_id, &rep);
    if (ret) {
        /* A failed reply is only logged; the message is still delivered */
        ucs_error("ib_cm_send_sidr_rep() failed: %m");
    }

    uct_iface_invoke_am(&iface->super.super, hdr->am_id, hdr + 1, hdr->length, 0);
}

/*
 * Remove the outstanding operation which was sent on the given CM id.
 * Aborts with ucs_fatal() if no such operation is queued.
 *
 * @param iface  CM interface which owns the outstanding queue.
 * @param id     CM id whose operation has completed.
 */
static void uct_cm_iface_outstanding_remove(uct_cm_iface_t* iface,
                                            struct ib_cm_id* id)
{
    uct_cm_iface_op_t *cur;
    ucs_queue_iter_t pos;

    ucs_queue_for_each_safe(cur, pos, &iface->outstanding_q, queue) {
        if (cur->is_id && (cur->id == id)) {
            ucs_queue_del_iter(&iface->outstanding_q, pos);

            /* Only count the completion here. Releasing the resource from the
             * async context would break the ordering of pending operations:
             *
             *   bcopy()        -> no resources
             *   pending_add()  -> ok
             *   <async event: resources become available>
             *   bcopy()        -> ok, but sent ahead of the pending request
             *
             * The progress() context subtracts the counted completions from
             * the number of outstanding operations.
             */
            ++iface->num_completions;
            ucs_free(cur);
            return;
        }
    }

    ucs_fatal("outstanding cm id %p not found", id);
}

/*
 * Drop everything from the outstanding queue: flush requests are completed
 * with UCS_ERR_CANCELED, CM ids of in-flight sends are destroyed, and the
 * outstanding counter is reset to zero.
 *
 * @param iface  CM interface whose outstanding queue is emptied.
 */
static void uct_cm_iface_outstanding_purge(uct_cm_iface_t *iface)
{
    uct_cm_iface_op_t *cur;

    ucs_queue_for_each_extract(cur, &iface->outstanding_q, queue, 1) {
        if (!cur->is_id) {
            /* Flush request - notify the user that it was canceled */
            uct_invoke_completion(cur->comp, UCS_ERR_CANCELED);
        } else {
            /* Send operation - release its CM id */
            ib_cm_destroy_id(cur->id);
        }
        ucs_free(cur);
    }

    iface->num_outstanding = 0;
}

/*
 * Event handler for the CM device file descriptor. It is installed by the
 * interface init function through ucs_async_set_event_handler().
 *
 * Fetches CM events one by one until ib_cm_get_event() fails, handles each of
 * them according to its type, acknowledges it, destroys the associated CM id
 * when the event type requires it, and makes sure the progress callback is
 * registered.
 *
 * @param fd   File descriptor which triggered the handler (not used here; the
 *             events are read from iface->cmdev).
 * @param arg  The CM interface (uct_cm_iface_t*).
 */
static void uct_cm_iface_event_handler(int fd, void *arg)
{
    uct_cm_iface_t *iface = arg;
    struct ib_cm_event *event;
    struct ib_cm_id *id;
    int destroy_id;
    int ret;

    ucs_trace_func("");

    for (;;) {
        /* Fetch all events */
        ret = ib_cm_get_event(iface->cmdev, &event);
        if (ret) {
            /* EAGAIN is the expected way out of the loop and is not reported;
             * any other error is logged as a warning. Either way, stop here. */
            if (errno != EAGAIN) {
                ucs_warn("ib_cm_get_event() failed: %m");
            }
            return;
        }

        /* Save the id now: it is used after the event is acknowledged */
        id  = event->cm_id;

        /* Handle the event */
        switch (event->event) {
        case IB_CM_SIDR_REQ_ERROR:
            ucs_error("SIDR request error, status: %s",
                      ibv_wc_status_str(event->param.send_status));
            destroy_id = 1;
            break;
        case IB_CM_SIDR_REQ_RECEIVED:
            uct_cm_iface_handle_sidr_req(iface, event);
            destroy_id = 1; /* Destroy the ID created by the driver */
            break;
        case IB_CM_SIDR_REP_RECEIVED:
            ucs_trace_data("RX: SIDR_REP [id %p{%u}]", id, id->handle);
            uct_cm_iface_outstanding_remove(iface, id);
            destroy_id = 1; /* Destroy the ID which was used for sending */
            break;
        default:
            /* Unknown event types are only reported; their id is left alone */
            ucs_warn("Unexpected CM event: %d", event->event);
            destroy_id = 0;
            break;
        }

        /* Acknowledge the CM event. The id was saved above, so 'event' is not
         * accessed after this point.
         */
        ret = ib_cm_ack_event(event);
        if (ret) {
            ucs_warn("ib_cm_ack_event() failed: %m");
        }

        /* If there is an id which should be destroyed, do it now, after
         * acknowledging the event.
         */
        if (destroy_id) {
            ret = ib_cm_destroy_id(id);
            if (ret) {
                ucs_error("ib_cm_destroy_id() failed: %m");
            }
        }

        /* Make sure uct_cm_iface_progress() is registered on the worker; it
         * releases the counted completions and dispatches pending requests.
         */
        uct_worker_progress_register_safe(&uct_cm_iface_worker(iface)->super,
                                          uct_cm_iface_progress, iface, 0,
                                          &iface->slow_prog_id);
    }
}

/*
 * Receive descriptor release callback of the CM interface (installed by the
 * interface init function in place of the default one).
 *
 * @param self  Release descriptor embedded in the owning uct_ib_iface_t.
 * @param desc  Descriptor pointer to release. The buffer passed to ucs_free()
 *              starts rx_headroom_offset bytes before this address.
 */
static void uct_cm_iface_release_desc(uct_recv_desc_t *self, void *desc)
{
    uct_ib_iface_t *iface = ucs_container_of(self, uct_ib_iface_t, release_desc);

    /* Do the byte arithmetic on a char pointer: arithmetic on 'void *' is a
     * compiler extension and not valid standard C.
     */
    ucs_free((char*)desc - iface->config.rx_headroom_offset);
}

/*
 * Constructor of the CM interface.
 *
 * Initializes the IB interface base class, opens the CM device, switches its
 * file descriptor to non-blocking mode, creates a CM id and starts listening
 * on a generated service id, and finally installs the async event handler on
 * the CM device file descriptor.
 *
 * @param md         Memory domain, forwarded to the base class.
 * @param worker     Worker, forwarded to the base class. It must have an
 *                   async context.
 * @param params     Interface parameters, forwarded to the base class.
 * @param tl_config  Interface configuration (a uct_cm_iface_config_t).
 *
 * @return UCS_OK on success, or an error status. On failure, the resources
 *         acquired by this function are released before returning.
 */
static UCS_CLASS_INIT_FUNC(uct_cm_iface_t, uct_md_h md, uct_worker_h worker,
                           const uct_iface_params_t *params,
                           const uct_iface_config_t *tl_config)
{
    uct_cm_iface_config_t *config = ucs_derived_of(tl_config, uct_cm_iface_config_t);
    uct_ib_iface_init_attr_t init_attr = {};
    ucs_status_t status;
    int ret;

    ucs_trace_func("");

    /* The segment size cannot exceed the SIDR request private data size */
    init_attr.tx_cq_len = 1;
    init_attr.rx_cq_len = config->super.rx.queue_len;
    init_attr.seg_size  = ucs_min(IB_CM_SIDR_REQ_PRIVATE_DATA_SIZE,
                                  config->super.seg_size);

    UCS_CLASS_CALL_SUPER_INIT(uct_ib_iface_t, &uct_cm_iface_ops, md, worker,
                              params, &config->super, &init_attr);

    /* CM events are delivered through the async event handler installed
     * below, so an async context is mandatory. */
    if (self->super.super.worker->async == NULL) {
        ucs_error("cm must have async!=NULL");
        return UCS_ERR_INVALID_PARAM;
    }

    self->num_outstanding     = 0;
    self->num_completions     = 0;
    self->service_id          = 0;
    /* Scale by 1000 and round to nearest.
     * NOTE(review): presumably config->timeout is in seconds - confirm. */
    self->config.timeout_ms   = (int)(config->timeout * 1e3 + 0.5);
    self->config.max_outstanding = config->max_outstanding;
    /* Retry count is capped at UINT8_MAX */
    self->config.retry_count  = ucs_min(config->retry_count, UINT8_MAX);
    self->notify_q.head       = NULL;
    self->slow_prog_id        = UCS_CALLBACKQ_ID_NULL;
    ucs_queue_head_init(&self->notify_q);
    ucs_queue_head_init(&self->outstanding_q);

    /* Redefine receive desc release callback */
    self->super.release_desc.cb = uct_cm_iface_release_desc;

    self->cmdev = ib_cm_open_device(uct_ib_iface_device(&self->super)->ibv_context);
    if (self->cmdev == NULL) {
        ucs_error("ib_cm_open_device() failed: %m. Check if ib_ucm.ko module is loaded.");
        status = UCS_ERR_NO_DEVICE;
        goto err;
    }

    /* The event handler reads events until ib_cm_get_event() reports EAGAIN.
     * NOTE(review): presumably this call adds O_NONBLOCK to the fd flags -
     * confirm against ucs_sys_fcntl_modfl(). */
    status = ucs_sys_fcntl_modfl(self->cmdev->fd, O_NONBLOCK, 0);
    if (status != UCS_OK) {
        goto err_close_device;
    }

    ret = ib_cm_create_id(self->cmdev, &self->listen_id, self);
    if (ret) {
        ucs_error("ib_cm_create_id() failed: %m");
        status = UCS_ERR_NO_DEVICE;
        goto err_close_device;
    }

    /* Generate service ids until ib_cm_listen() accepts one. The 'continue'
     * below jumps to the loop condition, and since 'ret' is non-zero there,
     * another id is tried. */
    do {
        self->service_id = (uint32_t)(ucs_generate_uuid((uintptr_t)self) &
                                      (~IB_CM_ASSIGN_SERVICE_ID_MASK));
        ret = ib_cm_listen(self->listen_id, self->service_id, 0);
        if (ret) {
            if (errno == EBUSY) {
                /* The generated service id is already in use - try to
                 * generate another one.
                 */
                ucs_debug("ib_cm service id 0x%x already in use, "
                          "trying another one", self->service_id);
                continue;
            } else {
                ucs_error("ib_cm_listen(service_id=0x%x) failed: %m",
                          self->service_id);
                status = UCS_ERR_INVALID_ADDR;
                goto err_destroy_id;
            }
        }
    } while (ret);

    /* Only a warning: initialization proceeds in signal mode as well */
    if (self->super.super.worker->async->mode == UCS_ASYNC_MODE_SIGNAL) {
        ucs_warn("ib_cm fd does not support SIGIO");
    }

    status = ucs_async_set_event_handler(self->super.super.worker->async->mode,
                                         self->cmdev->fd, UCS_EVENT_SET_EVREAD,
                                         uct_cm_iface_event_handler, self,
                                         self->super.super.worker->async);
    if (status != UCS_OK) {
        ucs_error("failed to set event handler");
        goto err_destroy_id;
    }

    ucs_debug("listening for SIDR service_id 0x%x on fd %d", self->service_id,
              self->cmdev->fd);
    return UCS_OK;

/* Error path: release resources in reverse order of acquisition */
err_destroy_id:
    ib_cm_destroy_id(self->listen_id);
err_close_device:
    ib_cm_close_device(self->cmdev);
err:
    return status;
}

/*
 * Destructor of the CM interface: removes the async event handler, purges the
 * outstanding operations, destroys the listening CM id, closes the CM device
 * and unregisters the progress callback.
 */
static UCS_CLASS_CLEANUP_FUNC(uct_cm_iface_t)
{

    ucs_trace_func("");

    /* Remove the event handler before the CM resources are released.
     * NOTE(review): the second argument is presumably a "synchronous" flag -
     * confirm against ucs_async_remove_handler(). */
    ucs_async_remove_handler(self->cmdev->fd, 1);

    uct_cm_enter(self);
    /* Cancels queued flush requests and destroys the ids of in-flight sends */
    uct_cm_iface_outstanding_purge(self);
    ib_cm_destroy_id(self->listen_id);
    ib_cm_close_device(self->cmdev);
    uct_worker_progress_unregister_safe(&uct_cm_iface_worker(self)->super,
                                        &self->slow_prog_id);
    uct_cm_leave(self);

    /* At this point all outstanding have been removed, and no further events
     * can be added.
     */
}

/* Define the uct_cm_iface_t class with uct_ib_iface_t as its base class */
UCS_CLASS_DEFINE(uct_cm_iface_t, uct_ib_iface_t);
/* File-local new/delete functions for the class. The delete function is used
 * as the iface_close operation in uct_cm_iface_ops. */
static UCS_CLASS_DEFINE_NEW_FUNC(uct_cm_iface_t, uct_iface_t, uct_md_h, uct_worker_h,
                                 const uct_iface_params_t*, const uct_iface_config_t*);
static UCS_CLASS_DEFINE_DELETE_FUNC(uct_cm_iface_t, uct_iface_t);

/*
 * iface_query entry point: fills the generic IB attributes and then sets the
 * CM-specific overhead, address lengths, maximal bcopy size and capability
 * flags.
 *
 * @param tl_iface    Interface to query.
 * @param iface_attr  Filled with the interface attributes.
 *
 * @return UCS_OK, or the error returned by uct_ib_iface_query().
 */
static ucs_status_t uct_cm_iface_query(uct_iface_h tl_iface,
                                       uct_iface_attr_t *iface_attr)
{
    uct_cm_iface_t *cm_iface = ucs_derived_of(tl_iface, uct_cm_iface_t);
    ucs_status_t rc;
    size_t max_payload;

    rc = uct_ib_iface_query(&cm_iface->super, 32 /* TODO */, iface_attr);
    if (rc != UCS_OK) {
        return rc;
    }

    /* The payload shares the SIDR private data area with the CM header, and
     * is additionally capped at UINT8_MAX.
     */
    max_payload = ucs_min(IB_CM_SIDR_REQ_PRIVATE_DATA_SIZE - sizeof(uct_cm_hdr_t),
                          UINT8_MAX);

    iface_attr->overhead         = 1200e-9;
    iface_attr->cap.am.max_bcopy = max_payload;

    /* The interface address is the 32-bit service id; endpoints have no
     * address of their own.
     */
    iface_attr->iface_addr_len   = sizeof(uint32_t);
    iface_attr->ep_addr_len      = 0;
    iface_attr->max_conn_priv    = 0;

    iface_attr->cap.flags        = UCT_IFACE_FLAG_AM_BCOPY |
                                   UCT_IFACE_FLAG_AM_DUP   |
                                   UCT_IFACE_FLAG_PENDING  |
                                   UCT_IFACE_FLAG_CB_ASYNC |
                                   UCT_IFACE_FLAG_CONNECT_TO_IFACE;

    return UCS_OK;
}

/*
 * iface_get_address entry point: stores the 32-bit service id of the
 * interface into the caller's address buffer.
 *
 * @param tl_iface    Interface whose address is requested.
 * @param iface_addr  Buffer which receives the service id as a uint32_t.
 *
 * @return UCS_OK always.
 */
static ucs_status_t uct_cm_iface_get_address(uct_iface_h tl_iface,
                                             uct_iface_addr_t *iface_addr)
{
    uct_cm_iface_t *cm_iface = ucs_derived_of(tl_iface, uct_cm_iface_t);
    uint32_t *service_id_p   = (uint32_t*)iface_addr;

    *service_id_p = cm_iface->service_id;
    return UCS_OK;
}


/*
 * Operations table of the CM interface. The inner braces initialize the first
 * member of uct_ib_iface_ops_t (the transport operations); the remaining
 * members are the IB-specific operations.
 */
static uct_ib_iface_ops_t uct_cm_iface_ops = {
    {
    /* Active messages are supported through bcopy only */
    .ep_am_bcopy              = uct_cm_ep_am_bcopy,
    .ep_pending_add           = uct_cm_ep_pending_add,
    .ep_pending_purge         = uct_cm_ep_pending_purge,
    .ep_flush                 = uct_cm_ep_flush,
    .ep_fence                 = uct_base_ep_fence,
    .ep_create                = UCS_CLASS_NEW_FUNC_NAME(uct_cm_ep_t),
    .ep_destroy               = UCS_CLASS_DELETE_FUNC_NAME(uct_cm_ep_t),
    .iface_flush              = uct_cm_iface_flush,
    .iface_fence              = uct_base_iface_fence,
    /* The explicit progress entry points are no-ops: uct_cm_iface_progress()
     * is registered on the worker by the event handler instead. */
    .iface_progress_enable    = ucs_empty_function,
    .iface_progress_disable   = ucs_empty_function,
    .iface_progress           = ucs_empty_function_return_zero,
    .iface_close              = UCS_CLASS_DELETE_FUNC_NAME(uct_cm_iface_t),
    .iface_query              = uct_cm_iface_query,
    .iface_get_device_address = uct_ib_iface_get_device_address,
    .iface_get_address        = uct_cm_iface_get_address,
    .iface_is_reachable       = uct_ib_iface_is_reachable
    },
    .create_cq                = uct_ib_verbs_create_cq,
    /* Arming the CQ does nothing and reports success */
    .arm_cq                   = (void*)ucs_empty_function_return_success,
};

/*
 * Check whether the CM device can be opened for the given IB memory domain.
 * The device is opened and closed right away; only the outcome matters.
 *
 * @param ib_md  IB memory domain whose device context is probed.
 *
 * @return 1 if ib_cm_open_device() succeeded, 0 otherwise.
 */
static int uct_cm_is_module_loaded(uct_ib_md_t *ib_md)
{
    struct ib_cm_device *probe_dev = ib_cm_open_device(ib_md->dev.ibv_context);

    if (probe_dev != NULL) {
        ib_cm_close_device(probe_dev);
        return 1;
    }

    ucs_debug("ib_cm_open_device() for %s failed: %m. "
              "Check if ib_ucm.ko module is loaded.",
              uct_ib_device_name(&ib_md->dev));
    return 0;
}

/*
 * Report the transport devices available for the CM transport.
 *
 * @param md                Memory domain (a uct_ib_md_t).
 * @param tl_devices_p      Filled with the device list, or NULL when the CM
 *                          device cannot be opened.
 * @param num_tl_devices_p  Filled with the number of devices in the list.
 *
 * @return UCS_OK with an empty list when the CM device cannot be opened,
 *         otherwise the result of uct_ib_device_query_ports().
 */
static ucs_status_t
uct_cm_query_tl_devices(uct_md_h md, uct_tl_device_resource_t **tl_devices_p,
                        unsigned *num_tl_devices_p)
{
    uct_ib_md_t *ib_md = ucs_derived_of(md, uct_ib_md_t);

    if (uct_cm_is_module_loaded(ib_md)) {
        return uct_ib_device_query_ports(&ib_md->dev,
                                         UCT_IB_DEVICE_FLAG_LINK_IB,
                                         tl_devices_p, num_tl_devices_p);
    }

    /* No usable CM device: report an empty device list rather than an error */
    *num_tl_devices_p = 0;
    *tl_devices_p     = NULL;
    return UCS_OK;
}

/* Define the "cm" transport on the IB component, with its device query
 * function, interface class, and the "CM_" configuration prefix and table. */
UCT_TL_DEFINE(&uct_ib_component, cm, uct_cm_query_tl_devices, uct_cm_iface_t,
              "CM_", uct_cm_iface_config_table, uct_cm_iface_config_t);