Blob Blame History Raw
/**
* Copyright (C) Mellanox Technologies Ltd. 2019.  ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/

#ifdef HAVE_CONFIG_H
#  include "config.h" /* Defines HAVE_RDMACM_QP_LESS */
#endif

#include "rdmacm_cm_ep.h"
#include <uct/ib/base/ib_iface.h>
#include <ucs/async/async.h>

#include <poll.h>
#include <rdma/rdma_cma.h>


ucs_status_t uct_rdmacm_cm_destroy_id(struct rdma_cm_id *id)
{
    ucs_trace("destroying cm_id %p", id);

    if (rdma_destroy_id(id)) {
        ucs_warn("rdma_destroy_id() failed: %m");
        return UCS_ERR_IO_ERROR;
    }

    return UCS_OK;
}

ucs_status_t uct_rdmacm_cm_ack_event(struct rdma_cm_event *event)
{
    ucs_trace("ack event %p, cm_id %p", event, event->id);

    if (rdma_ack_cm_event(event)) {
        ucs_warn("rdma_ack_cm_event failed on event %s: %m",
                 rdma_event_str(event->event));
        return UCS_ERR_IO_ERROR;
    }

    return UCS_OK;
}

ucs_status_t uct_rdmacm_cm_reject(struct rdma_cm_id *id)
{
    ucs_trace("reject on cm_id %p", id);

    if (rdma_reject(id, NULL, 0)) {
        ucs_error("rdma_reject (id=%p) failed with error: %m", id);
        return UCS_ERR_IO_ERROR;
    }

    return UCS_OK;
}

size_t uct_rdmacm_cm_get_max_conn_priv()
{
    return UCT_RDMACM_TCP_PRIV_DATA_LEN - sizeof(uct_rdmacm_priv_data_hdr_t);
}

static ucs_status_t uct_rdmacm_cm_query(uct_cm_h cm, uct_cm_attr_t *cm_attr)
{
    if (cm_attr->field_mask & UCT_CM_ATTR_FIELD_MAX_CONN_PRIV) {
        cm_attr->max_conn_priv = uct_rdmacm_cm_get_max_conn_priv();
    }
    return UCS_OK;
}

static void uct_rdmacm_cm_handle_event_addr_resolved(struct rdma_cm_event *event)
{
    struct sockaddr    *remote_addr = rdma_get_peer_addr(event->id);
    uct_rdmacm_cm_ep_t *cep         = (uct_rdmacm_cm_ep_t *)event->id->context;
    char ip_port_str[UCS_SOCKADDR_STRING_LEN];
    char ep_str[UCT_RDMACM_EP_STRING_LEN];
    uct_cm_remote_data_t remote_data;

    ucs_assert(event->id == cep->id);

    ucs_trace("%s: rdma_resolve_route on cm_id %p",
              uct_rdmacm_cm_ep_str(cep, ep_str, UCT_RDMACM_EP_STRING_LEN),
              event->id);

    if (rdma_resolve_route(event->id, 1000 /* TODO */)) {
        ucs_error("%s: rdma_resolve_route(to addr=%s) failed: %m",
                  uct_rdmacm_cm_ep_str(cep, ep_str, UCT_RDMACM_EP_STRING_LEN),
                  ucs_sockaddr_str(remote_addr, ip_port_str,
                                   UCS_SOCKADDR_STRING_LEN));
        remote_data.field_mask = 0;
        uct_rdmacm_cm_ep_set_failed(cep, &remote_data, UCS_ERR_IO_ERROR);
    }
}

static void uct_rdmacm_cm_handle_event_route_resolved(struct rdma_cm_event *event)
{
    struct sockaddr        *remote_addr = rdma_get_peer_addr(event->id);
    uct_rdmacm_cm_ep_t     *cep         = (uct_rdmacm_cm_ep_t *)event->id->context;
    uct_cm_remote_data_t   remote_data;
    ucs_status_t           status;
    struct rdma_conn_param conn_param;
    char                   ip_port_str[UCS_SOCKADDR_STRING_LEN];
    char                   ep_str[UCT_RDMACM_EP_STRING_LEN];

    ucs_assert(event->id == cep->id);

    memset(&conn_param, 0, sizeof(conn_param));
    conn_param.private_data = ucs_alloca(uct_rdmacm_cm_get_max_conn_priv() +
                                         sizeof(uct_rdmacm_priv_data_hdr_t));

    status = uct_rdmacm_cm_ep_conn_param_init(cep, &conn_param);
    if (status != UCS_OK) {
        remote_data.field_mask = 0;
        uct_rdmacm_cm_ep_set_failed(cep, &remote_data, status);
        return;
    }

    ucs_trace("%s: rdma_connect, cm_id %p",
              uct_rdmacm_cm_ep_str(cep, ep_str, UCT_RDMACM_EP_STRING_LEN), cep->id);

    if (rdma_connect(cep->id, &conn_param)) {
        ucs_error("%s: rdma_connect(to addr=%s) failed: %m",
                  uct_rdmacm_cm_ep_str(cep, ep_str, UCT_RDMACM_EP_STRING_LEN),
                  ucs_sockaddr_str(remote_addr, ip_port_str,
                                   UCS_SOCKADDR_STRING_LEN));
        remote_data.field_mask = 0;
        uct_rdmacm_cm_ep_set_failed(cep, &remote_data, UCS_ERR_IO_ERROR);
    }
}

static ucs_status_t uct_rdmacm_cm_id_to_dev_addr(struct rdma_cm_id *cm_id,
                                                 uct_device_addr_t **dev_addr_p,
                                                 size_t *dev_addr_len_p)
{
    struct ibv_port_attr port_attr;
    uct_ib_address_t *dev_addr;
    struct ibv_qp_attr qp_attr;
    size_t addr_length;
    int qp_attr_mask;
    char dev_name[UCT_DEVICE_NAME_MAX];
    unsigned address_pack_flags;

    /* get the qp attributes in order to modify the qp state.
     * the ah_attr fields from them are required to extract the device address
     * of the remote peer.
     */
    qp_attr.qp_state = IBV_QPS_RTR;
    if (rdma_init_qp_attr(cm_id, &qp_attr, &qp_attr_mask)) {
        ucs_error("rdma_init_qp_attr (id=%p, qp_state=%d) failed: %m",
                  cm_id, qp_attr.qp_state);
        return UCS_ERR_IO_ERROR;
     }

    if (ibv_query_port(cm_id->verbs, cm_id->port_num, &port_attr)) {
        uct_rdmacm_cm_id_to_dev_name(cm_id, dev_name);
        ucs_error("ibv_query_port (%s) failed: %m", dev_name);
        return UCS_ERR_IO_ERROR;
    }

    if (IBV_PORT_IS_LINK_LAYER_ETHERNET(&port_attr)) {
        /* Ethernet address */
        ucs_assert(qp_attr.ah_attr.is_global);
        address_pack_flags = UCT_IB_ADDRESS_PACK_FLAG_ETH;
    } else if (qp_attr.ah_attr.is_global) {
        /* IB global address */
        address_pack_flags = UCT_IB_ADDRESS_PACK_FLAG_INTERFACE_ID |
                             UCT_IB_ADDRESS_PACK_FLAG_SUBNET_PREFIX;
    } else {
        /* IB local address - need just LID */
        address_pack_flags = 0;
    }

    addr_length = uct_ib_address_size(&qp_attr.ah_attr.grh.dgid,
                                      address_pack_flags);

    dev_addr = ucs_malloc(addr_length, "IB device address");
    if (dev_addr == NULL) {
        ucs_error("failed to allocate IB device address");
        return UCS_ERR_NO_MEMORY;
    }

    uct_ib_address_pack(&qp_attr.ah_attr.grh.dgid, qp_attr.ah_attr.dlid,
                        address_pack_flags, dev_addr);

    *dev_addr_p     = (uct_device_addr_t *)dev_addr;
    *dev_addr_len_p = addr_length;
    return UCS_OK;
}

static void uct_rdmacm_cm_handle_event_connect_request(struct rdma_cm_event *event)
{
    uct_rdmacm_priv_data_hdr_t *hdr      = (uct_rdmacm_priv_data_hdr_t *)
                                           event->param.conn.private_data;
    uct_rdmacm_listener_t      *listener = event->listen_id->context;
    char                       dev_name[UCT_DEVICE_NAME_MAX];
    uct_device_addr_t          *dev_addr;
    size_t                     addr_length;
    uct_cm_remote_data_t       remote_data;
    ucs_status_t               status;

    ucs_assert(hdr->status == UCS_OK);

    uct_rdmacm_cm_id_to_dev_name(event->id, dev_name);

    status = uct_rdmacm_cm_id_to_dev_addr(event->id, &dev_addr, &addr_length);
    if (status != UCS_OK) {
        uct_rdmacm_cm_reject(event->id);
        uct_rdmacm_cm_destroy_id(event->id);
        return;
    }

    remote_data.field_mask            = UCT_CM_REMOTE_DATA_FIELD_DEV_ADDR        |
                                        UCT_CM_REMOTE_DATA_FIELD_DEV_ADDR_LENGTH |
                                        UCT_CM_REMOTE_DATA_FIELD_CONN_PRIV_DATA  |
                                        UCT_CM_REMOTE_DATA_FIELD_CONN_PRIV_DATA_LENGTH;
    remote_data.dev_addr              = dev_addr;
    remote_data.dev_addr_length       = addr_length;
    remote_data.conn_priv_data        = hdr + 1;
    remote_data.conn_priv_data_length = hdr->length;

    listener->conn_request_cb(&listener->super, listener->user_data,
                              dev_name, event, &remote_data);
    ucs_free(dev_addr);
}

static void uct_rdmacm_cm_handle_event_connect_response(struct rdma_cm_event *event)
{
    struct sockaddr            *remote_addr = rdma_get_peer_addr(event->id);
    uct_rdmacm_priv_data_hdr_t *hdr         = (uct_rdmacm_priv_data_hdr_t *)
                                              event->param.conn.private_data;
    uct_rdmacm_cm_ep_t         *cep         = event->id->context;
    char                       ip_port_str[UCS_SOCKADDR_STRING_LEN];
    uct_device_addr_t          *dev_addr;
    size_t                     addr_length;
    uct_cm_remote_data_t       remote_data;
    ucs_status_t               status;

    ucs_assert(event->id == cep->id);

    /* Do not notify user on disconnected EP, RDMACM out of order case */
    if (cep->flags & UCT_RDMACM_CM_EP_GOT_DISCONNECT) {
        return;
    }

    remote_data.field_mask            = UCT_CM_REMOTE_DATA_FIELD_CONN_PRIV_DATA |
                                        UCT_CM_REMOTE_DATA_FIELD_CONN_PRIV_DATA_LENGTH;
    remote_data.conn_priv_data        = hdr + 1;
    remote_data.conn_priv_data_length = hdr->length;

    status = uct_rdmacm_cm_id_to_dev_addr(event->id, &dev_addr, &addr_length);
    if (status != UCS_OK) {
        ucs_error("client (ep=%p id=%p) failed to process a connect response "
                  "from server %s.", cep, event->id,
                  ucs_sockaddr_str(remote_addr, ip_port_str,
                                   UCS_SOCKADDR_STRING_LEN));
        uct_rdmacm_cm_ep_set_failed(cep, &remote_data, status);
        /* notify remote side about local error */
        rdma_disconnect(cep->id);
        return;
    }

    remote_data.field_mask       |= UCT_CM_REMOTE_DATA_FIELD_DEV_ADDR |
                                    UCT_CM_REMOTE_DATA_FIELD_DEV_ADDR_LENGTH;
    remote_data.dev_addr          = dev_addr;
    remote_data.dev_addr_length   = addr_length;

    uct_rdmacm_cm_ep_client_connect_cb(cep, &remote_data,
                                       (ucs_status_t)hdr->status);
    ucs_free(dev_addr);

    if (rdma_establish(event->id)) {
        ucs_error("rdma_establish on ep %p (to server addr=%s) failed: %m",
                  cep, ucs_sockaddr_str(remote_addr, ip_port_str,
                                        UCS_SOCKADDR_STRING_LEN));
        uct_rdmacm_cm_ep_set_failed(cep, &remote_data, UCS_ERR_IO_ERROR);
    }
}

static void uct_rdmacm_cm_handle_event_established(struct rdma_cm_event *event)
{
    uct_rdmacm_cm_ep_t *cep = event->id->context;

    ucs_assert(event->id == cep->id);
    /* do not call connect callback again, RDMACM out of order case */
    if (cep->flags & UCT_RDMACM_CM_EP_GOT_DISCONNECT) {
        return;
    }

    uct_rdmacm_cm_ep_server_connect_cb(cep, UCS_OK);
}

static void uct_rdmacm_cm_handle_event_disconnected(struct rdma_cm_event *event)
{
    uct_rdmacm_cm_ep_t   *cep         = event->id->context;
    struct sockaddr      *remote_addr = rdma_get_peer_addr(event->id);
    char                 ip_port_str[UCS_SOCKADDR_STRING_LEN];
    char                 ep_str[UCT_RDMACM_EP_STRING_LEN];
    uct_cm_remote_data_t remote_data;

    ucs_debug("%s: got disconnect event, status %d peer %s",
              uct_rdmacm_cm_ep_str(cep, ep_str, UCT_RDMACM_EP_STRING_LEN),
              event->status, ucs_sockaddr_str(remote_addr, ip_port_str,
                                             UCS_SOCKADDR_STRING_LEN));

    cep->flags |= UCT_RDMACM_CM_EP_GOT_DISCONNECT;
    /* calling error_cb instead of disconnect CB directly handles out-of-order
     * disconnect event prior connect_response/connect_established event */
    remote_data.field_mask = 0;
    uct_rdmacm_cm_ep_error_cb(cep, &remote_data, UCS_ERR_CONNECTION_RESET);
}

static void uct_rdmacm_cm_handle_error_event(struct rdma_cm_event *event)
{
    uct_rdmacm_cm_ep_t *cep      = event->id->context;
    struct sockaddr *remote_addr = rdma_get_peer_addr(event->id);
    char ip_port_str[UCS_SOCKADDR_STRING_LEN];
    char ep_str[UCT_RDMACM_EP_STRING_LEN];
    uct_cm_remote_data_t remote_data;
    ucs_log_level_t log_level;
    ucs_status_t status;

    if (event->event == RDMA_CM_EVENT_REJECTED) {
        if (cep->flags & UCT_RDMACM_CM_EP_ON_SERVER) {
            /* response was rejected by the client in the middle of
             * connection establishment, so report connection reset */
            status = UCS_ERR_CONNECTION_RESET;
        } else {
            ucs_assert(cep->flags & UCT_RDMACM_CM_EP_ON_CLIENT);
            status = UCS_ERR_REJECTED;
        }

        log_level = UCS_LOG_LEVEL_DEBUG;
    } else {
        status    = UCS_ERR_IO_ERROR;
        log_level = UCS_LOG_LEVEL_ERROR;
    }

    ucs_log(log_level, "%s: got error event %s, status %d peer %s",
            uct_rdmacm_cm_ep_str(cep, ep_str, UCT_RDMACM_EP_STRING_LEN),
            rdma_event_str(event->event), event->status,
            ucs_sockaddr_str(remote_addr, ip_port_str,
                             UCS_SOCKADDR_STRING_LEN));

    remote_data.field_mask = 0;
    uct_rdmacm_cm_ep_set_failed(cep, &remote_data, status);
}

static void
uct_rdmacm_cm_process_event(uct_rdmacm_cm_t *cm, struct rdma_cm_event *event)
{
    struct sockaddr *remote_addr = rdma_get_peer_addr(event->id);
    uint8_t         ack_event    = 1;
    char            ip_port_str[UCS_SOCKADDR_STRING_LEN];

    ucs_trace("rdmacm event (fd=%d cm_id %p cm %p event_channel %p): %s. Peer: %s.",
              cm->ev_ch->fd, event->id, cm, cm->ev_ch, rdma_event_str(event->event),
              ucs_sockaddr_str(remote_addr, ip_port_str, UCS_SOCKADDR_STRING_LEN));

    /* The following applies for rdma_cm_id of type RDMA_PS_TCP only */
    ucs_assert(event->id->ps == RDMA_PS_TCP);

    /* Using https://linux.die.net/man/3/rdma_get_cm_event to distinguish
     * between client and server events */
    switch (event->event) {
    case RDMA_CM_EVENT_ADDR_RESOLVED:
        /* Client side event */
        uct_rdmacm_cm_handle_event_addr_resolved(event);
        break;
    case RDMA_CM_EVENT_ROUTE_RESOLVED:
        /* Client side event */
        uct_rdmacm_cm_handle_event_route_resolved(event);
        break;
    case RDMA_CM_EVENT_CONNECT_REQUEST:
        /* Server side event */
        uct_rdmacm_cm_handle_event_connect_request(event);
        /* The server will ack the event after accepting/rejecting the request
         * (in ep_create). */
        ack_event = 0;
        break;
    case RDMA_CM_EVENT_CONNECT_RESPONSE:
        /* Client side event */
        uct_rdmacm_cm_handle_event_connect_response(event);
        break;
    case RDMA_CM_EVENT_ESTABLISHED:
        /* Server side event */
        uct_rdmacm_cm_handle_event_established(event);
        break;
    case RDMA_CM_EVENT_DISCONNECTED:
        /* Client and Server side event */
        uct_rdmacm_cm_handle_event_disconnected(event);
        break;
    case RDMA_CM_EVENT_TIMEWAIT_EXIT:
        /* This event is generated when the QP associated with the connection
         * has exited its timewait state and is now ready to be re-used.
         * After a QP has been disconnected, it is maintained in a timewait
         * state to allow any in flight packets to exit the network.
         * After the timewait state has completed, the rdma_cm will report this event.*/
        break;
        /* client error events */
    case RDMA_CM_EVENT_REJECTED:
    case RDMA_CM_EVENT_UNREACHABLE:
    case RDMA_CM_EVENT_ADDR_ERROR:
    case RDMA_CM_EVENT_ROUTE_ERROR:
    case RDMA_CM_EVENT_DEVICE_REMOVAL:
    case RDMA_CM_EVENT_ADDR_CHANGE:
        /* client and server error events */
    case RDMA_CM_EVENT_CONNECT_ERROR:
        uct_rdmacm_cm_handle_error_event(event);
        break;
    default:
        ucs_warn("unexpected RDMACM event: %s", rdma_event_str(event->event));
        break;
    }

    if (ack_event) {
        uct_rdmacm_cm_ack_event(event);
    }
}

static void uct_rdmacm_cm_event_handler(int fd, void *arg)
{
    uct_rdmacm_cm_t      *cm = (uct_rdmacm_cm_t *)arg;
    struct rdma_cm_event *event;
    int                  ret;

    for (;;) {
        /* Fetch an event */
        ret = rdma_get_cm_event(cm->ev_ch, &event);
        if (ret) {
            /* EAGAIN (in a non-blocking rdma_get_cm_event) means that
             * there are no more events */
            if ((errno != EAGAIN) && (errno != EINTR)) {
                ucs_warn("rdma_get_cm_event() failed: %m");
            }

            return;
        }

        UCS_ASYNC_BLOCK(uct_rdmacm_cm_get_async(cm));
        uct_rdmacm_cm_process_event(cm, event);
        UCS_ASYNC_UNBLOCK(uct_rdmacm_cm_get_async(cm));
    }
}

static uct_cm_ops_t uct_rdmacm_cm_ops = {
    .close            = UCS_CLASS_DELETE_FUNC_NAME(uct_rdmacm_cm_t),
    .cm_query         = uct_rdmacm_cm_query,
    .listener_create  = UCS_CLASS_NEW_FUNC_NAME(uct_rdmacm_listener_t),
    .listener_reject  = uct_rdmacm_listener_reject,
    .listener_query   = uct_rdmacm_listener_query,
    .listener_destroy = UCS_CLASS_DELETE_FUNC_NAME(uct_rdmacm_listener_t),
    .ep_create        = UCS_CLASS_NEW_FUNC_NAME(uct_rdmacm_cm_ep_t)
};

static uct_iface_ops_t uct_rdmacm_cm_iface_ops = {
    .ep_pending_purge         = ucs_empty_function,
    .ep_disconnect            = uct_rdmacm_cm_ep_disconnect,
    .ep_destroy               = UCS_CLASS_DELETE_FUNC_NAME(uct_rdmacm_cm_ep_t),
    .ep_put_short             = (uct_ep_put_short_func_t)ucs_empty_function_return_unsupported,
    .ep_put_bcopy             = (uct_ep_put_bcopy_func_t)ucs_empty_function_return_unsupported,
    .ep_get_bcopy             = (uct_ep_get_bcopy_func_t)ucs_empty_function_return_unsupported,
    .ep_am_short              = (uct_ep_am_short_func_t)ucs_empty_function_return_unsupported,
    .ep_am_bcopy              = (uct_ep_am_bcopy_func_t)ucs_empty_function_return_unsupported,
    .ep_atomic_cswap64        = (uct_ep_atomic_cswap64_func_t)ucs_empty_function_return_unsupported,
    .ep_atomic64_post         = (uct_ep_atomic64_post_func_t)ucs_empty_function_return_unsupported,
    .ep_atomic64_fetch        = (uct_ep_atomic64_fetch_func_t)ucs_empty_function_return_unsupported,
    .ep_atomic_cswap32        = (uct_ep_atomic_cswap32_func_t)ucs_empty_function_return_unsupported,
    .ep_atomic32_post         = (uct_ep_atomic32_post_func_t)ucs_empty_function_return_unsupported,
    .ep_atomic32_fetch        = (uct_ep_atomic32_fetch_func_t)ucs_empty_function_return_unsupported,
    .ep_pending_add           = (uct_ep_pending_add_func_t)ucs_empty_function_return_unsupported,
    .ep_flush                 = (uct_ep_flush_func_t)ucs_empty_function_return_success,
    .ep_fence                 = (uct_ep_fence_func_t)ucs_empty_function_return_unsupported,
    .ep_check                 = (uct_ep_check_func_t)ucs_empty_function_return_unsupported,
    .ep_create                = (uct_ep_create_func_t)ucs_empty_function_return_unsupported,
    .iface_flush              = (uct_iface_flush_func_t)ucs_empty_function_return_unsupported,
    .iface_fence              = (uct_iface_fence_func_t)ucs_empty_function_return_unsupported,
    .iface_progress_enable    = ucs_empty_function,
    .iface_progress_disable   = ucs_empty_function,
    .iface_progress           = (uct_iface_progress_func_t)ucs_empty_function_return_zero,
    .iface_event_fd_get       = (uct_iface_event_fd_get_func_t)ucs_empty_function_return_unsupported,
    .iface_event_arm          = (uct_iface_event_arm_func_t)ucs_empty_function_return_unsupported,
    .iface_close              = ucs_empty_function,
    .iface_query              = (uct_iface_query_func_t)ucs_empty_function_return_unsupported,
    .iface_get_device_address = (uct_iface_get_device_address_func_t)ucs_empty_function_return_unsupported,
    .iface_get_address        = (uct_iface_get_address_func_t)ucs_empty_function_return_unsupported,
    .iface_is_reachable       = (uct_iface_is_reachable_func_t)ucs_empty_function_return_zero
};

UCS_CLASS_INIT_FUNC(uct_rdmacm_cm_t, uct_component_h component,
                    uct_worker_h worker, const uct_cm_config_t *config)
{
    uct_priv_worker_t *worker_priv;
    ucs_status_t status;

    UCS_CLASS_CALL_SUPER_INIT(uct_cm_t, &uct_rdmacm_cm_ops,
                              &uct_rdmacm_cm_iface_ops, worker, component);

    self->ev_ch  = rdma_create_event_channel();
    if (self->ev_ch == NULL) {
        ucs_error("rdma_create_event_channel failed: %m");
        status = UCS_ERR_IO_ERROR;
        goto err;
    }

    /* Set the event_channel fd to non-blocking mode
     * (so that rdma_get_cm_event won't be blocking) */
    status = ucs_sys_fcntl_modfl(self->ev_ch->fd, O_NONBLOCK, 0);
    if (status != UCS_OK) {
        status = UCS_ERR_IO_ERROR;
        goto err_destroy_ev_ch;
    }

    worker_priv = ucs_derived_of(worker, uct_priv_worker_t);
    status = ucs_async_set_event_handler(worker_priv->async->mode,
                                         self->ev_ch->fd, UCS_EVENT_SET_EVREAD,
                                         uct_rdmacm_cm_event_handler, self,
                                         worker_priv->async);
    if (status != UCS_OK) {
        goto err_destroy_ev_ch;
    }

    ucs_debug("created rdmacm_cm %p with event_channel %p (fd=%d)",
              self, self->ev_ch, self->ev_ch->fd);

    return UCS_OK;

err_destroy_ev_ch:
    rdma_destroy_event_channel(self->ev_ch);
err:
    return status;
}

UCS_CLASS_CLEANUP_FUNC(uct_rdmacm_cm_t)
{
    ucs_status_t status;

    status = ucs_async_remove_handler(self->ev_ch->fd, 1);
    if (status != UCS_OK) {
        ucs_warn("failed to remove event handler for fd %d: %s",
                 self->ev_ch->fd, ucs_status_string(status));
    }

    ucs_trace("destroying event_channel %p on cm %p", self->ev_ch, self);
    rdma_destroy_event_channel(self->ev_ch);
}

UCS_CLASS_DEFINE(uct_rdmacm_cm_t, uct_cm_t);
UCS_CLASS_DEFINE_NEW_FUNC(uct_rdmacm_cm_t, uct_cm_t, uct_component_h,
                          uct_worker_h, const uct_cm_config_t *);
UCS_CLASS_DEFINE_DELETE_FUNC(uct_rdmacm_cm_t, uct_cm_t);