/**
 * Copyright (C) Mellanox Technologies Ltd. 2001-2019.  ALL RIGHTS RESERVED.
 *
 * See file LICENSE for terms.
 */

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include "cm.h"

#include <uct/ib/base/ib_iface.h>
#include <ucs/arch/atomic.h>
#include <ucs/async/async.h>
#include <ucs/debug/log.h>
#include <ucs/sys/math.h>
#include <ucs/sys/sys.h>
#include <poll.h>


static ucs_config_field_t uct_cm_iface_config_table[] = {
  {"IB_", "RX_INLINE=0", NULL,
   ucs_offsetof(uct_cm_iface_config_t, super),
   UCS_CONFIG_TYPE_TABLE(uct_ib_iface_config_table)},

  {"TIMEOUT", "300ms", "Timeout for MAD layer",
   ucs_offsetof(uct_cm_iface_config_t, timeout), UCS_CONFIG_TYPE_TIME},

  {"RETRY_COUNT", "100", "Number of retries for MAD layer",
   ucs_offsetof(uct_cm_iface_config_t, retry_count), UCS_CONFIG_TYPE_UINT},

  {"MAX_OP", "1024", "Maximal number of outstanding SIDR operations",
   ucs_offsetof(uct_cm_iface_config_t, max_outstanding), UCS_CONFIG_TYPE_UINT},

  {NULL}
};

static uct_ib_iface_ops_t uct_cm_iface_ops;


static unsigned uct_cm_iface_progress(void *arg)
{
    uct_cm_pending_req_priv_t *priv;
    uct_cm_iface_t *iface = arg;
    uct_cm_iface_op_t *op;
    unsigned count;

    uct_cm_enter(iface);

    /* Invoke flush completions at the head of the queue - the sends which
     * started before them were already completed.
     */
    count = 0;
    ucs_queue_for_each_extract(op, &iface->outstanding_q, queue, !op->is_id) {
        uct_invoke_completion(op->comp, UCS_OK);
        ucs_free(op);
        ++count;
    }

    /* We are in the progress() context, so now it is safe to release
     * resources. */
    iface->num_outstanding -= iface->num_completions;
    iface->num_completions  = 0;

    /* Dispatch pending operations */
    uct_pending_queue_dispatch(priv, &iface->notify_q,
                               iface->num_outstanding <
                               iface->config.max_outstanding);

    /* Remove the progress callback only if there is no user completion at the
     * head of the queue. It could be added by the progress callback.
     */
    if (ucs_queue_is_empty(&iface->outstanding_q) ||
        ucs_queue_head_elem_non_empty(&iface->outstanding_q, uct_cm_iface_op_t,
                                      queue)->is_id)
    {
        uct_worker_progress_unregister_safe(&uct_cm_iface_worker(iface)->super,
                                            &iface->slow_prog_id);
    }

    uct_cm_leave(iface);

    return count;
}

ucs_status_t uct_cm_iface_flush_do(uct_cm_iface_t *iface, uct_completion_t *comp)
{
    uct_cm_iface_op_t *op;

    if (iface->num_outstanding == 0) {
        return UCS_OK;
    }

    /* If the user requested a completion callback, allocate a new operation
     * and put it at the tail of the queue. It will be invoked when all
     * operations sent before it have completed.
     */
    if (comp != NULL) {
        op = ucs_malloc(sizeof(*op), "cm_op");
        if (op == NULL) {
            return UCS_ERR_NO_MEMORY;
        }

        op->is_id = 0;
        op->comp  = comp;
        ucs_queue_push(&iface->outstanding_q, &op->queue);
    }

    /* Give other threads (e.g. the async event handler) a chance to run */
    sched_yield();
    return UCS_INPROGRESS;
}
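/* The iface_flush handler below wraps uct_cm_iface_flush_do() with the iface
 * lock and statistics. An illustrative caller loop through the generic UCT
 * API (the names below are standard UCT entry points, not part of this file):
 *
 *     while (uct_iface_flush(iface, 0, NULL) == UCS_INPROGRESS) {
 *         uct_worker_progress(worker);
 *     }
 */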
ucs_status_t uct_cm_iface_flush(uct_iface_h tl_iface, unsigned flags,
                                uct_completion_t *comp)
{
    uct_cm_iface_t *iface = ucs_derived_of(tl_iface, uct_cm_iface_t);
    ucs_status_t status;

    uct_cm_enter(iface);
    status = uct_cm_iface_flush_do(iface, comp);
    if (status == UCS_OK) {
        UCT_TL_IFACE_STAT_FLUSH(ucs_derived_of(tl_iface, uct_base_iface_t));
    } else if (status == UCS_INPROGRESS) {
        UCT_TL_IFACE_STAT_FLUSH_WAIT(ucs_derived_of(tl_iface, uct_base_iface_t));
    }
    uct_cm_leave(iface);

    return status;
}

static void uct_cm_iface_handle_sidr_req(uct_cm_iface_t *iface,
                                         struct ib_cm_event *event)
{
    uct_cm_hdr_t *hdr = event->private_data;
    struct ib_cm_sidr_rep_param rep;
    int ret;

    VALGRIND_MAKE_MEM_DEFINED(hdr, sizeof(*hdr));
    VALGRIND_MAKE_MEM_DEFINED(hdr + 1, hdr->length);

    uct_cm_iface_trace_data(iface, UCT_AM_TRACE_TYPE_RECV, hdr, "RX: SIDR_REQ");

    /* Send reply */
    ucs_trace_data("TX: SIDR_REP [id %p{%u}]", event->cm_id,
                   event->cm_id->handle);
    memset(&rep, 0, sizeof(rep));
    rep.status = IB_SIDR_SUCCESS;
    ret = ib_cm_send_sidr_rep(event->cm_id, &rep);
    if (ret) {
        ucs_error("ib_cm_send_sidr_rep() failed: %m");
    }

    uct_iface_invoke_am(&iface->super.super, hdr->am_id, hdr + 1, hdr->length,
                        0);
}

static void uct_cm_iface_outstanding_remove(uct_cm_iface_t* iface,
                                            struct ib_cm_id* id)
{
    uct_cm_iface_op_t *op;
    ucs_queue_iter_t iter;

    ucs_queue_for_each_safe(op, iter, &iface->outstanding_q, queue) {
        if (op->is_id && (op->id == id)) {
            ucs_queue_del_iter(&iface->outstanding_q, iter);
            /* Must not release resources from the async context, because it
             * would break pending op ordering. For example, bcopy() may
             * succeed while there are queued pending ops:
             *   bcopy()       -> no resources
             *   pending_add() -> ok
             *   <-- async event: resources available
             *   bcopy()       -> ok; oops, this is an out-of-order send
             *
             * So only save the count here, and do the actual release in the
             * progress() context.
             */
            ++iface->num_completions;
            ucs_free(op);
            return;
        }
    }

    ucs_fatal("outstanding cm id %p not found", id);
}
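/* Cancel everything left on the outstanding queue: CM ids are destroyed, and
 * user flush completions are invoked with UCS_ERR_CANCELED. Called from the
 * interface cleanup path, with the iface lock held.
 */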
static void uct_cm_iface_outstanding_purge(uct_cm_iface_t *iface)
{
    uct_cm_iface_op_t *op;

    ucs_queue_for_each_extract(op, &iface->outstanding_q, queue, 1) {
        if (op->is_id) {
            ib_cm_destroy_id(op->id);
        } else {
            uct_invoke_completion(op->comp, UCS_ERR_CANCELED);
        }
        ucs_free(op);
    }
    iface->num_outstanding = 0;
}

static void uct_cm_iface_event_handler(int fd, void *arg)
{
    uct_cm_iface_t *iface = arg;
    struct ib_cm_event *event;
    struct ib_cm_id *id;
    int destroy_id;
    int ret;

    ucs_trace_func("");

    for (;;) {
        /* Fetch all events */
        ret = ib_cm_get_event(iface->cmdev, &event);
        if (ret) {
            if (errno != EAGAIN) {
                ucs_warn("ib_cm_get_event() failed: %m");
            }
            return;
        }

        id = event->cm_id;

        /* Handle the event */
        switch (event->event) {
        case IB_CM_SIDR_REQ_ERROR:
            ucs_error("SIDR request error, status: %s",
                      ibv_wc_status_str(event->param.send_status));
            destroy_id = 1;
            break;
        case IB_CM_SIDR_REQ_RECEIVED:
            uct_cm_iface_handle_sidr_req(iface, event);
            destroy_id = 1; /* Destroy the ID created by the driver */
            break;
        case IB_CM_SIDR_REP_RECEIVED:
            ucs_trace_data("RX: SIDR_REP [id %p{%u}]", id, id->handle);
            uct_cm_iface_outstanding_remove(iface, id);
            destroy_id = 1; /* Destroy the ID which was used for sending */
            break;
        default:
            ucs_warn("Unexpected CM event: %d", event->event);
            destroy_id = 0;
            break;
        }

        /* Acknowledge the CM event; the id was saved above in case we need
         * to destroy it after the ack. */
        ret = ib_cm_ack_event(event);
        if (ret) {
            ucs_warn("ib_cm_ack_event() failed: %m");
        }

        /* If there is an id which should be destroyed, do it now, after
         * acknowledging all events. */
        if (destroy_id) {
            ret = ib_cm_destroy_id(id);
            if (ret) {
                ucs_error("ib_cm_destroy_id() failed: %m");
            }
        }

        uct_worker_progress_register_safe(&uct_cm_iface_worker(iface)->super,
                                          uct_cm_iface_progress, iface, 0,
                                          &iface->slow_prog_id);
    }
}

static void uct_cm_iface_release_desc(uct_recv_desc_t *self, void *desc)
{
    uct_ib_iface_t *iface = ucs_container_of(self, uct_ib_iface_t,
                                             release_desc);

    ucs_free(desc - iface->config.rx_headroom_offset);
}
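/* Interface constructor. The initialization sequence is:
 *   1. Initialize the IB base iface with a minimal TX CQ - SIDR completions
 *      are reported through CM events rather than through the CQ.
 *   2. Open the ib_cm device and make its fd non-blocking.
 *   3. Create a listening CM id bound to a randomly generated service id,
 *      regenerating the id while it is already in use (EBUSY).
 *   4. Register the fd with the async context, so that CM events are
 *      delivered to uct_cm_iface_event_handler().
 */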
static UCS_CLASS_INIT_FUNC(uct_cm_iface_t, uct_md_h md, uct_worker_h worker,
                           const uct_iface_params_t *params,
                           const uct_iface_config_t *tl_config)
{
    uct_cm_iface_config_t *config = ucs_derived_of(tl_config,
                                                   uct_cm_iface_config_t);
    uct_ib_iface_init_attr_t init_attr = {};
    ucs_status_t status;
    int ret;

    ucs_trace_func("");

    init_attr.tx_cq_len = 1;
    init_attr.rx_cq_len = config->super.rx.queue_len;
    init_attr.seg_size  = ucs_min(IB_CM_SIDR_REQ_PRIVATE_DATA_SIZE,
                                  config->super.seg_size);

    UCS_CLASS_CALL_SUPER_INIT(uct_ib_iface_t, &uct_cm_iface_ops, md, worker,
                              params, &config->super, &init_attr);

    if (self->super.super.worker->async == NULL) {
        ucs_error("cm must have async!=NULL");
        return UCS_ERR_INVALID_PARAM;
    }

    self->num_outstanding        = 0;
    self->num_completions        = 0;
    self->service_id             = 0;
    self->config.timeout_ms      = (int)(config->timeout * 1e3 + 0.5);
    self->config.max_outstanding = config->max_outstanding;
    self->config.retry_count     = ucs_min(config->retry_count, UINT8_MAX);
    self->notify_q.head          = NULL;
    self->slow_prog_id           = UCS_CALLBACKQ_ID_NULL;
    ucs_queue_head_init(&self->notify_q);
    ucs_queue_head_init(&self->outstanding_q);

    /* Redefine receive desc release callback */
    self->super.release_desc.cb = uct_cm_iface_release_desc;

    self->cmdev = ib_cm_open_device(uct_ib_iface_device(&self->super)->ibv_context);
    if (self->cmdev == NULL) {
        ucs_error("ib_cm_open_device() failed: %m. "
                  "Check if the ib_ucm.ko module is loaded.");
        status = UCS_ERR_NO_DEVICE;
        goto err;
    }

    status = ucs_sys_fcntl_modfl(self->cmdev->fd, O_NONBLOCK, 0);
    if (status != UCS_OK) {
        goto err_close_device;
    }

    ret = ib_cm_create_id(self->cmdev, &self->listen_id, self);
    if (ret) {
        ucs_error("ib_cm_create_id() failed: %m");
        status = UCS_ERR_NO_DEVICE;
        goto err_close_device;
    }

    do {
        self->service_id = (uint32_t)(ucs_generate_uuid((uintptr_t)self) &
                                      (~IB_CM_ASSIGN_SERVICE_ID_MASK));
        ret = ib_cm_listen(self->listen_id, self->service_id, 0);
        if (ret) {
            if (errno == EBUSY) {
                /* The generated service id is already in use - try to
                 * generate another one.
                 */
                ucs_debug("ib_cm service id 0x%x already in use, "
                          "trying another one", self->service_id);
                continue;
            } else {
                ucs_error("ib_cm_listen(service_id=0x%x) failed: %m",
                          self->service_id);
                status = UCS_ERR_INVALID_ADDR;
                goto err_destroy_id;
            }
        }
    } while (ret);

    if (self->super.super.worker->async->mode == UCS_ASYNC_MODE_SIGNAL) {
        ucs_warn("ib_cm fd does not support SIGIO");
    }

    status = ucs_async_set_event_handler(self->super.super.worker->async->mode,
                                         self->cmdev->fd, UCS_EVENT_SET_EVREAD,
                                         uct_cm_iface_event_handler, self,
                                         self->super.super.worker->async);
    if (status != UCS_OK) {
        ucs_error("failed to set event handler");
        goto err_destroy_id;
    }

    ucs_debug("listening for SIDR service_id 0x%x on fd %d", self->service_id,
              self->cmdev->fd);
    return UCS_OK;

err_destroy_id:
    ib_cm_destroy_id(self->listen_id);
err_close_device:
    ib_cm_close_device(self->cmdev);
err:
    return status;
}

static UCS_CLASS_CLEANUP_FUNC(uct_cm_iface_t)
{
    ucs_trace_func("");

    ucs_async_remove_handler(self->cmdev->fd, 1);

    uct_cm_enter(self);
    uct_cm_iface_outstanding_purge(self);
    ib_cm_destroy_id(self->listen_id);
    ib_cm_close_device(self->cmdev);
    uct_worker_progress_unregister_safe(&uct_cm_iface_worker(self)->super,
                                        &self->slow_prog_id);
    uct_cm_leave(self);

    /* At this point all outstanding operations have been removed, and no
     * further events can be added.
     */
}
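/* Class boilerplate: uct_cm_iface_t derives from uct_ib_iface_t, and the
 * generated new/delete functions are referenced by the ops table below.
 */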
UCS_CLASS_DEFINE(uct_cm_iface_t, uct_ib_iface_t);
static UCS_CLASS_DEFINE_NEW_FUNC(uct_cm_iface_t, uct_iface_t, uct_md_h,
                                 uct_worker_h, const uct_iface_params_t*,
                                 const uct_iface_config_t*);
static UCS_CLASS_DEFINE_DELETE_FUNC(uct_cm_iface_t, uct_iface_t);

static ucs_status_t uct_cm_iface_query(uct_iface_h tl_iface,
                                       uct_iface_attr_t *iface_attr)
{
    uct_cm_iface_t *iface = ucs_derived_of(tl_iface, uct_cm_iface_t);
    ucs_status_t status;
    size_t mtu;

    status = uct_ib_iface_query(&iface->super, 32 /* TODO */, iface_attr);
    if (status != UCS_OK) {
        return status;
    }

    iface_attr->overhead = 1200e-9; /* 1.2 usec */

    mtu = ucs_min(IB_CM_SIDR_REQ_PRIVATE_DATA_SIZE - sizeof(uct_cm_hdr_t),
                  UINT8_MAX);

    iface_attr->cap.am.max_bcopy = mtu;
    iface_attr->iface_addr_len   = sizeof(uint32_t);
    iface_attr->ep_addr_len      = 0;
    iface_attr->max_conn_priv    = 0;
    iface_attr->cap.flags        = UCT_IFACE_FLAG_AM_BCOPY |
                                   UCT_IFACE_FLAG_AM_DUP |
                                   UCT_IFACE_FLAG_PENDING |
                                   UCT_IFACE_FLAG_CB_ASYNC |
                                   UCT_IFACE_FLAG_CONNECT_TO_IFACE;
    return UCS_OK;
}

static ucs_status_t uct_cm_iface_get_address(uct_iface_h tl_iface,
                                             uct_iface_addr_t *iface_addr)
{
    uct_cm_iface_t *iface = ucs_derived_of(tl_iface, uct_cm_iface_t);

    *(uint32_t*)iface_addr = iface->service_id;
    return UCS_OK;
}

static uct_ib_iface_ops_t uct_cm_iface_ops = {
    {
    .ep_am_bcopy              = uct_cm_ep_am_bcopy,
    .ep_pending_add           = uct_cm_ep_pending_add,
    .ep_pending_purge         = uct_cm_ep_pending_purge,
    .ep_flush                 = uct_cm_ep_flush,
    .ep_fence                 = uct_base_ep_fence,
    .ep_create                = UCS_CLASS_NEW_FUNC_NAME(uct_cm_ep_t),
    .ep_destroy               = UCS_CLASS_DELETE_FUNC_NAME(uct_cm_ep_t),
    .iface_flush              = uct_cm_iface_flush,
    .iface_fence              = uct_base_iface_fence,
    .iface_progress_enable    = ucs_empty_function,
    .iface_progress_disable   = ucs_empty_function,
    .iface_progress           = ucs_empty_function_return_zero,
    .iface_close              = UCS_CLASS_DELETE_FUNC_NAME(uct_cm_iface_t),
    .iface_query              = uct_cm_iface_query,
    .iface_get_device_address = uct_ib_iface_get_device_address,
    .iface_get_address        = uct_cm_iface_get_address,
    .iface_is_reachable       = uct_ib_iface_is_reachable
    },
    .create_cq                = uct_ib_verbs_create_cq,
    .arm_cq                   = (void*)ucs_empty_function_return_success,
};

static int uct_cm_is_module_loaded(uct_ib_md_t *ib_md)
{
    struct ib_cm_device *cmdev = NULL;

    cmdev = ib_cm_open_device(ib_md->dev.ibv_context);
    if (cmdev == NULL) {
        ucs_debug("ib_cm_open_device() for %s failed: %m. "
                  "Check if the ib_ucm.ko module is loaded.",
                  uct_ib_device_name(&ib_md->dev));
        return 0;
    }

    ib_cm_close_device(cmdev);
    return 1;
}

static ucs_status_t
uct_cm_query_tl_devices(uct_md_h md, uct_tl_device_resource_t **tl_devices_p,
                        unsigned *num_tl_devices_p)
{
    uct_ib_md_t *ib_md = ucs_derived_of(md, uct_ib_md_t);

    if (!uct_cm_is_module_loaded(ib_md)) {
        *num_tl_devices_p = 0;
        *tl_devices_p     = NULL;
        return UCS_OK;
    }

    return uct_ib_device_query_ports(&ib_md->dev, UCT_IB_DEVICE_FLAG_LINK_IB,
                                     tl_devices_p, num_tl_devices_p);
}

UCT_TL_DEFINE(&uct_ib_component, cm, uct_cm_query_tl_devices, uct_cm_iface_t,
              "CM_", uct_cm_iface_config_table, uct_cm_iface_config_t);
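/* Registers the "cm" transport with the IB component. The "CM_" prefix means
 * the configuration table above is parsed from UCX_CM_* environment variables
 * (e.g. UCX_CM_TIMEOUT, UCX_CM_RETRY_COUNT), assuming the default "UCX_"
 * global prefix.
 */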