/** * Copyright (c) UT-Battelle, LLC. 2014-2015. ALL RIGHTS RESERVED. * Copyright (C) Mellanox Technologies Ltd. 2001-2019. ALL RIGHTS RESERVED. * See file LICENSE for terms. */ #ifdef HAVE_CONFIG_H # include "config.h" #endif #include "mm_iface.h" #include "mm_ep.h" #include #include #include #include #include #include #include /* Maximal number of events to clear from the signaling pipe in single call */ #define UCT_MM_IFACE_MAX_SIG_EVENTS 32 ucs_config_field_t uct_mm_iface_config_table[] = { {"SM_", "ALLOC=md,mmap,heap", NULL, ucs_offsetof(uct_mm_iface_config_t, super), UCS_CONFIG_TYPE_TABLE(uct_sm_iface_config_table)}, {"FIFO_SIZE", "64", "Size of the receive FIFO in the memory-map UCTs.", ucs_offsetof(uct_mm_iface_config_t, fifo_size), UCS_CONFIG_TYPE_UINT}, {"SEG_SIZE", "8256", "Size of send/receive buffers for copy-out sends.", ucs_offsetof(uct_mm_iface_config_t, seg_size), UCS_CONFIG_TYPE_MEMUNITS}, {"FIFO_RELEASE_FACTOR", "0.5", "Frequency of resource releasing on the receiver's side in the MM UCT.\n" "This value refers to the percentage of the FIFO size. (must be >= 0 and < 1).", ucs_offsetof(uct_mm_iface_config_t, release_fifo_factor), UCS_CONFIG_TYPE_DOUBLE}, UCT_IFACE_MPOOL_CONFIG_FIELDS("RX_", -1, 512, "receive", ucs_offsetof(uct_mm_iface_config_t, mp), ""), {"FIFO_HUGETLB", "no", "Enable using huge pages for internal shared memory buffers." "Possible values are:\n" " y - Allocate memory using huge pages only.\n" " n - Allocate memory using regular pages only.\n" " try - Try to allocate memory using huge pages and if it fails, allocate regular pages.", ucs_offsetof(uct_mm_iface_config_t, hugetlb_mode), UCS_CONFIG_TYPE_TERNARY}, {"FIFO_ELEM_SIZE", "128", "Size of the FIFO element size (data + header) in the MM UCTs.", ucs_offsetof(uct_mm_iface_config_t, fifo_elem_size), UCS_CONFIG_TYPE_UINT}, {NULL} }; static ucs_status_t uct_mm_iface_get_address(uct_iface_t *tl_iface, uct_iface_addr_t *addr) { uct_mm_iface_t *iface = ucs_derived_of(tl_iface, uct_mm_iface_t); uct_mm_md_t *md = ucs_derived_of(iface->super.super.md, uct_mm_md_t); uct_mm_iface_addr_t *iface_addr = (void*)addr; uct_mm_seg_t *seg = iface->recv_fifo_mem.memh; iface_addr->fifo_seg_id = seg->seg_id; return uct_mm_md_mapper_ops(md)->iface_addr_pack(md, iface_addr + 1); } static int uct_mm_iface_is_reachable(const uct_iface_h tl_iface, const uct_device_addr_t *dev_addr, const uct_iface_addr_t *tl_iface_addr) { uct_mm_iface_t *iface = ucs_derived_of(tl_iface, uct_mm_iface_t); uct_mm_md_t *md = ucs_derived_of(iface->super.super.md, uct_mm_md_t); uct_mm_iface_addr_t *iface_addr = (void*)tl_iface_addr; if (!uct_sm_iface_is_reachable(tl_iface, dev_addr, tl_iface_addr)) { return 0; } return uct_mm_md_mapper_ops(md)->is_reachable(md, iface_addr->fifo_seg_id, iface_addr + 1); } void uct_mm_iface_release_desc(uct_recv_desc_t *self, void *desc) { void *mm_desc; mm_desc = UCS_PTR_BYTE_OFFSET(desc, -sizeof(uct_mm_recv_desc_t)); ucs_mpool_put(mm_desc); } ucs_status_t uct_mm_iface_flush(uct_iface_h tl_iface, unsigned flags, uct_completion_t *comp) { if (comp != NULL) { return UCS_ERR_UNSUPPORTED; } ucs_memory_cpu_store_fence(); UCT_TL_IFACE_STAT_FLUSH(ucs_derived_of(tl_iface, uct_base_iface_t)); return UCS_OK; } static ucs_status_t uct_mm_iface_query(uct_iface_h tl_iface, uct_iface_attr_t *iface_attr) { uct_mm_iface_t *iface = ucs_derived_of(tl_iface, uct_mm_iface_t); uct_mm_md_t *md = ucs_derived_of(iface->super.super.md, uct_mm_md_t); uct_base_iface_query(&iface->super.super, iface_attr); /* default values for all shared memory transports */ iface_attr->cap.put.max_short = UINT_MAX; iface_attr->cap.put.max_bcopy = SIZE_MAX; iface_attr->cap.put.min_zcopy = 0; iface_attr->cap.put.max_zcopy = SIZE_MAX; iface_attr->cap.put.opt_zcopy_align = UCS_SYS_CACHE_LINE_SIZE; iface_attr->cap.put.align_mtu = iface_attr->cap.put.opt_zcopy_align; iface_attr->cap.put.max_iov = 1; iface_attr->cap.get.max_bcopy = SIZE_MAX; iface_attr->cap.get.min_zcopy = 0; iface_attr->cap.get.max_zcopy = SIZE_MAX; iface_attr->cap.get.opt_zcopy_align = UCS_SYS_CACHE_LINE_SIZE; iface_attr->cap.get.align_mtu = iface_attr->cap.get.opt_zcopy_align; iface_attr->cap.get.max_iov = 1; iface_attr->cap.am.max_short = iface->config.fifo_elem_size - sizeof(uct_mm_fifo_element_t); iface_attr->cap.am.max_bcopy = iface->config.seg_size; iface_attr->cap.am.min_zcopy = 0; iface_attr->cap.am.max_zcopy = 0; iface_attr->cap.am.opt_zcopy_align = UCS_SYS_CACHE_LINE_SIZE; iface_attr->cap.am.align_mtu = iface_attr->cap.am.opt_zcopy_align; iface_attr->cap.am.max_iov = 1; iface_attr->iface_addr_len = sizeof(uct_mm_iface_addr_t) + md->iface_addr_len; iface_attr->device_addr_len = uct_sm_iface_get_device_addr_len(); iface_attr->ep_addr_len = 0; iface_attr->max_conn_priv = 0; iface_attr->cap.flags = UCT_IFACE_FLAG_PUT_SHORT | UCT_IFACE_FLAG_PUT_BCOPY | UCT_IFACE_FLAG_ATOMIC_CPU | UCT_IFACE_FLAG_GET_BCOPY | UCT_IFACE_FLAG_AM_SHORT | UCT_IFACE_FLAG_AM_BCOPY | UCT_IFACE_FLAG_PENDING | UCT_IFACE_FLAG_CB_SYNC | UCT_IFACE_FLAG_EVENT_SEND_COMP | UCT_IFACE_FLAG_EVENT_RECV_SIG | UCT_IFACE_FLAG_CONNECT_TO_IFACE; iface_attr->cap.atomic32.op_flags = iface_attr->cap.atomic64.op_flags = UCS_BIT(UCT_ATOMIC_OP_ADD) | UCS_BIT(UCT_ATOMIC_OP_AND) | UCS_BIT(UCT_ATOMIC_OP_OR) | UCS_BIT(UCT_ATOMIC_OP_XOR); iface_attr->cap.atomic32.fop_flags = iface_attr->cap.atomic64.fop_flags = UCS_BIT(UCT_ATOMIC_OP_ADD) | UCS_BIT(UCT_ATOMIC_OP_AND) | UCS_BIT(UCT_ATOMIC_OP_OR) | UCS_BIT(UCT_ATOMIC_OP_XOR) | UCS_BIT(UCT_ATOMIC_OP_SWAP) | UCS_BIT(UCT_ATOMIC_OP_CSWAP); iface_attr->latency.overhead = 80e-9; /* 80 ns */ iface_attr->latency.growth = 0; iface_attr->bandwidth.dedicated = iface->super.config.bandwidth; iface_attr->bandwidth.shared = 0; iface_attr->overhead = 10e-9; /* 10 ns */ iface_attr->priority = 0; return UCS_OK; } static inline void uct_mm_progress_fifo_tail(uct_mm_iface_t *iface) { /* don't progress the tail every time - release in batches. improves performance */ if (iface->read_index & iface->fifo_release_factor_mask) { return; } iface->recv_fifo_ctl->tail = iface->read_index; } ucs_status_t uct_mm_assign_desc_to_fifo_elem(uct_mm_iface_t *iface, uct_mm_fifo_element_t *elem, unsigned need_new_desc) { uct_mm_recv_desc_t *desc; if (!need_new_desc) { desc = iface->last_recv_desc; } else { UCT_TL_IFACE_GET_RX_DESC(&iface->super.super, &iface->recv_desc_mp, desc, return UCS_ERR_NO_RESOURCE); } elem->desc = desc->info; elem->desc_data = UCS_PTR_BYTE_OFFSET(desc + 1, iface->rx_headroom); return UCS_OK; } static inline ucs_status_t uct_mm_iface_process_recv(uct_mm_iface_t *iface, uct_mm_fifo_element_t* elem) { ucs_status_t status; void *data; if (ucs_likely(elem->flags & UCT_MM_FIFO_ELEM_FLAG_INLINE)) { /* read short (inline) messages from the FIFO elements */ uct_iface_trace_am(&iface->super.super, UCT_AM_TRACE_TYPE_RECV, elem->am_id, elem + 1, elem->length, "RX: AM_SHORT"); status = uct_mm_iface_invoke_am(iface, elem->am_id, elem + 1, elem->length, 0); } else { /* read bcopy messages from the receive descriptors */ data = elem->desc_data; VALGRIND_MAKE_MEM_DEFINED(data, elem->length); uct_iface_trace_am(&iface->super.super, UCT_AM_TRACE_TYPE_RECV, elem->am_id, data, elem->length, "RX: AM_BCOPY"); status = uct_mm_iface_invoke_am(iface, elem->am_id, data, elem->length, UCT_CB_PARAM_FLAG_DESC); if (status != UCS_OK) { /* assign a new receive descriptor to this FIFO element.*/ uct_mm_assign_desc_to_fifo_elem(iface, elem, 0); } } return status; } static inline unsigned uct_mm_iface_poll_fifo(uct_mm_iface_t *iface) { uint64_t read_index_loc, read_index; uct_mm_fifo_element_t* read_index_elem; ucs_status_t status; /* check the memory pool to make sure that there is a new descriptor available */ if (ucs_unlikely(iface->last_recv_desc == NULL)) { UCT_TL_IFACE_GET_RX_DESC(&iface->super.super, &iface->recv_desc_mp, iface->last_recv_desc, return 0); } read_index = iface->read_index; read_index_loc = (read_index & iface->fifo_mask); /* the fifo_element which the read_index points to */ read_index_elem = UCT_MM_IFACE_GET_FIFO_ELEM(iface, iface->recv_fifo_elems, read_index_loc); /* check the read_index to see if there is a new item to read (checking the owner bit) */ if (((read_index >> iface->fifo_shift) & 1) == ((read_index_elem->flags) & 1)) { /* read from read_index_elem */ ucs_memory_cpu_load_fence(); ucs_assert(iface->read_index <= iface->recv_fifo_ctl->head); status = uct_mm_iface_process_recv(iface, read_index_elem); if (status != UCS_OK) { /* the last_recv_desc is in use. get a new descriptor for it */ UCT_TL_IFACE_GET_RX_DESC(&iface->super.super, &iface->recv_desc_mp, iface->last_recv_desc, ucs_debug("recv mpool is empty")); } /* raise the read_index. */ iface->read_index++; uct_mm_progress_fifo_tail(iface); return 1; } else { return 0; } } unsigned uct_mm_iface_progress(void *arg) { uct_mm_iface_t *iface = arg; unsigned count; /* progress receive */ count = uct_mm_iface_poll_fifo(iface); /* progress the pending sends (if there are any) */ ucs_arbiter_dispatch(&iface->arbiter, 1, uct_mm_ep_process_pending, NULL); return count; } static ucs_status_t uct_mm_iface_event_fd_get(uct_iface_h tl_iface, int *fd_p) { *fd_p = ucs_derived_of(tl_iface, uct_mm_iface_t)->signal_fd; return UCS_OK; } static ucs_status_t uct_mm_iface_event_fd_arm(uct_iface_h tl_iface, unsigned events) { uct_mm_iface_t *iface = ucs_derived_of(tl_iface, uct_mm_iface_t); char dummy[UCT_MM_IFACE_MAX_SIG_EVENTS]; /* pop multiple signals at once */ int ret; ret = recvfrom(iface->signal_fd, &dummy, sizeof(dummy), 0, NULL, 0); if (ret > 0) { return UCS_ERR_BUSY; } else if (ret == -1) { if (errno == EAGAIN) { return UCS_OK; } else if (errno == EINTR) { return UCS_ERR_BUSY; } else { ucs_error("failed to retrieve message from signal pipe: %m"); return UCS_ERR_IO_ERROR; } } else { ucs_assert(ret == 0); return UCS_OK; } } static UCS_CLASS_DECLARE_DELETE_FUNC(uct_mm_iface_t, uct_iface_t); static uct_iface_ops_t uct_mm_iface_ops = { .ep_put_short = uct_sm_ep_put_short, .ep_put_bcopy = uct_sm_ep_put_bcopy, .ep_get_bcopy = uct_sm_ep_get_bcopy, .ep_am_short = uct_mm_ep_am_short, .ep_am_bcopy = uct_mm_ep_am_bcopy, .ep_atomic_cswap64 = uct_sm_ep_atomic_cswap64, .ep_atomic64_post = uct_sm_ep_atomic64_post, .ep_atomic64_fetch = uct_sm_ep_atomic64_fetch, .ep_atomic_cswap32 = uct_sm_ep_atomic_cswap32, .ep_atomic32_post = uct_sm_ep_atomic32_post, .ep_atomic32_fetch = uct_sm_ep_atomic32_fetch, .ep_pending_add = uct_mm_ep_pending_add, .ep_pending_purge = uct_mm_ep_pending_purge, .ep_flush = uct_mm_ep_flush, .ep_fence = uct_sm_ep_fence, .ep_create = UCS_CLASS_NEW_FUNC_NAME(uct_mm_ep_t), .ep_destroy = UCS_CLASS_DELETE_FUNC_NAME(uct_mm_ep_t), .iface_flush = uct_mm_iface_flush, .iface_fence = uct_sm_iface_fence, .iface_progress_enable = uct_base_iface_progress_enable, .iface_progress_disable = uct_base_iface_progress_disable, .iface_progress = (uct_iface_progress_func_t)uct_mm_iface_progress, .iface_event_fd_get = uct_mm_iface_event_fd_get, .iface_event_arm = uct_mm_iface_event_fd_arm, .iface_close = UCS_CLASS_DELETE_FUNC_NAME(uct_mm_iface_t), .iface_query = uct_mm_iface_query, .iface_get_device_address = uct_sm_iface_get_device_address, .iface_get_address = uct_mm_iface_get_address, .iface_is_reachable = uct_mm_iface_is_reachable }; static void uct_mm_iface_recv_desc_init(uct_iface_h tl_iface, void *obj, uct_mem_h memh) { uct_mm_iface_t *iface = ucs_derived_of(tl_iface, uct_mm_iface_t); uct_mm_recv_desc_t *desc = obj; uct_mm_seg_t *seg = memh; size_t offset; if (seg->length > UINT_MAX) { ucs_error("mm: shared memory segment length cannot exceed %u", UINT_MAX); desc->info.seg_id = UINT64_MAX; desc->info.seg_size = 0; desc->info.offset = 0; return; } offset = UCS_PTR_BYTE_DIFF(seg->address, desc + 1) + iface->rx_headroom; ucs_assert(offset <= UINT_MAX); desc->info.seg_id = seg->seg_id; desc->info.seg_size = seg->length; desc->info.offset = offset; } static void uct_mm_iface_free_rx_descs(uct_mm_iface_t *iface, unsigned num_elems) { uct_mm_fifo_element_t *elem; uct_mm_recv_desc_t *desc; unsigned i; for (i = 0; i < num_elems; i++) { elem = UCT_MM_IFACE_GET_FIFO_ELEM(iface, iface->recv_fifo_elems, i); desc = (uct_mm_recv_desc_t*)UCS_PTR_BYTE_OFFSET(elem->desc_data, -iface->rx_headroom) - 1; ucs_mpool_put(desc); } } void uct_mm_iface_set_fifo_ptrs(void *fifo_mem, uct_mm_fifo_ctl_t **fifo_ctl_p, void **fifo_elems_p) { uct_mm_fifo_ctl_t *fifo_ctl; /* initiate the the uct_mm_fifo_ctl struct, holding the head and the tail */ fifo_ctl = (uct_mm_fifo_ctl_t*)ucs_align_up_pow2 ((uintptr_t)fifo_mem, UCS_SYS_CACHE_LINE_SIZE); /* Make sure head and tail are cache-aligned, and not on same cacheline, to * avoid false-sharing. */ ucs_assert_always( (((uintptr_t)&fifo_ctl->head) % UCS_SYS_CACHE_LINE_SIZE) == 0); ucs_assert_always( (((uintptr_t)&fifo_ctl->tail) % UCS_SYS_CACHE_LINE_SIZE) == 0); ucs_assert_always( ((uintptr_t)&fifo_ctl->tail - (uintptr_t)&fifo_ctl->head) >= UCS_SYS_CACHE_LINE_SIZE); /* initiate the pointer to the beginning of the first FIFO element */ *fifo_ctl_p = fifo_ctl; *fifo_elems_p = UCS_PTR_BYTE_OFFSET(fifo_ctl, UCT_MM_FIFO_CTL_SIZE); } static ucs_status_t uct_mm_iface_create_signal_fd(uct_mm_iface_t *iface) { ucs_status_t status; socklen_t addrlen; struct sockaddr_un bind_addr; int ret; /* Create a UNIX domain socket to send and receive wakeup signal from remote processes */ iface->signal_fd = socket(AF_UNIX, SOCK_DGRAM, 0); if (iface->signal_fd < 0) { ucs_error("Failed to create unix domain socket for signal: %m"); status = UCS_ERR_IO_ERROR; goto err; } /* Set the signal socket to non-blocking mode */ status = ucs_sys_fcntl_modfl(iface->signal_fd, O_NONBLOCK, 0); if (status != UCS_OK) { goto err_close; } /* Bind the signal socket to automatic address */ bind_addr.sun_family = AF_UNIX; memset(bind_addr.sun_path, 0, sizeof(bind_addr.sun_path)); ret = bind(iface->signal_fd, (struct sockaddr*)&bind_addr, sizeof(sa_family_t)); if (ret < 0) { ucs_error("Failed to auto-bind unix domain socket: %m"); status = UCS_ERR_IO_ERROR; goto err_close; } /* Share the socket address on the FIFO control area, so we would not have * to enlarge the interface address size. */ addrlen = sizeof(struct sockaddr_un); memset(&iface->recv_fifo_ctl->signal_sockaddr, 0, addrlen); ret = getsockname(iface->signal_fd, (struct sockaddr *)ucs_unaligned_ptr(&iface->recv_fifo_ctl->signal_sockaddr), &addrlen); if (ret < 0) { ucs_error("Failed to retrieve unix domain socket address: %m"); status = UCS_ERR_IO_ERROR; goto err_close; } iface->recv_fifo_ctl->signal_addrlen = addrlen; return UCS_OK; err_close: close(iface->signal_fd); err: return status; } static void uct_mm_iface_log_created(uct_mm_iface_t *iface) { uct_mm_seg_t UCS_V_UNUSED *seg = iface->recv_fifo_mem.memh; ucs_debug("created mm iface %p FIFO id 0x%lx va %p size %zu (%u x %u elems)", iface, seg->seg_id, seg->address, seg->length, iface->config.fifo_elem_size, iface->config.fifo_size); } static UCS_CLASS_INIT_FUNC(uct_mm_iface_t, uct_md_h md, uct_worker_h worker, const uct_iface_params_t *params, const uct_iface_config_t *tl_config) { uct_mm_iface_config_t *mm_config = ucs_derived_of(tl_config, uct_mm_iface_config_t); uct_mm_fifo_element_t* fifo_elem_p; ucs_status_t status; unsigned i; UCS_CLASS_CALL_SUPER_INIT(uct_sm_iface_t, &uct_mm_iface_ops, md, worker, params, tl_config); if (ucs_derived_of(worker, uct_priv_worker_t)->thread_mode == UCS_THREAD_MODE_MULTI) { ucs_error("Shared memory transport does not support multi-threaded worker"); return UCS_ERR_INVALID_PARAM; } /* check that the fifo size, from the user, is a power of two and bigger than 1 */ if ((mm_config->fifo_size <= 1) || ucs_is_pow2(mm_config->fifo_size) != 1) { ucs_error("The MM FIFO size must be a power of two and bigger than 1."); status = UCS_ERR_INVALID_PARAM; goto err; } /* check the value defining the FIFO batch release */ if ((mm_config->release_fifo_factor < 0) || (mm_config->release_fifo_factor >= 1)) { ucs_error("The MM release FIFO factor must be: (0 =< factor < 1)."); status = UCS_ERR_INVALID_PARAM; goto err; } /* check the value defining the size of the FIFO element */ if (mm_config->fifo_elem_size <= sizeof(uct_mm_fifo_element_t)) { ucs_error("The UCX_MM_FIFO_ELEM_SIZE parameter (%u) must be larger " "than the FIFO element header size (%ld bytes).", mm_config->fifo_elem_size, sizeof(uct_mm_fifo_element_t)); status = UCS_ERR_INVALID_PARAM; goto err; } self->config.fifo_size = mm_config->fifo_size; self->config.fifo_elem_size = mm_config->fifo_elem_size; self->config.seg_size = mm_config->seg_size; /* cppcheck-suppress internalAstError */ self->fifo_release_factor_mask = UCS_MASK(ucs_ilog2(ucs_max((int) (mm_config->fifo_size * mm_config->release_fifo_factor), 1))); self->fifo_mask = mm_config->fifo_size - 1; self->fifo_shift = ucs_count_trailing_zero_bits(mm_config->fifo_size); self->rx_headroom = (params->field_mask & UCT_IFACE_PARAM_FIELD_RX_HEADROOM) ? params->rx_headroom : 0; self->release_desc.cb = uct_mm_iface_release_desc; /* Allocate the receive FIFO */ status = uct_iface_mem_alloc(&self->super.super.super, UCT_MM_GET_FIFO_SIZE(self), UCT_MD_MEM_ACCESS_ALL, "mm_recv_fifo", &self->recv_fifo_mem); if (status != UCS_OK) { ucs_error("mm_iface failed to allocate receive FIFO"); return status; } uct_mm_iface_set_fifo_ptrs(self->recv_fifo_mem.address, &self->recv_fifo_ctl, &self->recv_fifo_elems); self->recv_fifo_ctl->head = 0; self->recv_fifo_ctl->tail = 0; self->read_index = 0; /* create a unix file descriptor to receive event notifications */ status = uct_mm_iface_create_signal_fd(self); if (status != UCS_OK) { goto err_free_fifo; } /* create a memory pool for receive descriptors */ status = uct_iface_mpool_init(&self->super.super, &self->recv_desc_mp, sizeof(uct_mm_recv_desc_t) + self->rx_headroom + self->config.seg_size, sizeof(uct_mm_recv_desc_t), UCS_SYS_CACHE_LINE_SIZE, &mm_config->mp, 512, uct_mm_iface_recv_desc_init, "mm_recv_desc"); if (status != UCS_OK) { ucs_error("failed to create a receive descriptor memory pool for the MM transport"); goto err_close_signal_fd; } ucs_mpool_grow(&self->recv_desc_mp, mm_config->fifo_size * 2); /* set the first receive descriptor */ self->last_recv_desc = ucs_mpool_get(&self->recv_desc_mp); VALGRIND_MAKE_MEM_DEFINED(self->last_recv_desc, sizeof(*(self->last_recv_desc))); if (self->last_recv_desc == NULL) { ucs_error("failed to get the first receive descriptor"); status = UCS_ERR_NO_RESOURCE; goto destroy_recv_mpool; } /* initiate the owner bit in all the FIFO elements and assign a receive descriptor * per every FIFO element */ for (i = 0; i < mm_config->fifo_size; i++) { fifo_elem_p = UCT_MM_IFACE_GET_FIFO_ELEM(self, self->recv_fifo_elems, i); fifo_elem_p->flags = UCT_MM_FIFO_ELEM_FLAG_OWNER; status = uct_mm_assign_desc_to_fifo_elem(self, fifo_elem_p, 1); if (status != UCS_OK) { ucs_error("failed to allocate a descriptor for MM"); goto destroy_descs; } } ucs_arbiter_init(&self->arbiter); uct_mm_iface_log_created(self); return UCS_OK; destroy_descs: uct_mm_iface_free_rx_descs(self, i); ucs_mpool_put(self->last_recv_desc); destroy_recv_mpool: ucs_mpool_cleanup(&self->recv_desc_mp, 1); err_close_signal_fd: close(self->signal_fd); err_free_fifo: uct_iface_mem_free(&self->recv_fifo_mem); err: return status; } static UCS_CLASS_CLEANUP_FUNC(uct_mm_iface_t) { uct_base_iface_progress_disable(&self->super.super.super, UCT_PROGRESS_SEND | UCT_PROGRESS_RECV); /* return all the descriptors that are now 'assigned' to the FIFO, * to their mpool */ uct_mm_iface_free_rx_descs(self, self->config.fifo_size); ucs_mpool_put(self->last_recv_desc); ucs_mpool_cleanup(&self->recv_desc_mp, 1); close(self->signal_fd); uct_iface_mem_free(&self->recv_fifo_mem); ucs_arbiter_cleanup(&self->arbiter); } UCS_CLASS_DEFINE(uct_mm_iface_t, uct_base_iface_t); UCS_CLASS_DEFINE_NEW_FUNC(uct_mm_iface_t, uct_iface_t, uct_md_h, uct_worker_h, const uct_iface_params_t*, const uct_iface_config_t*); static UCS_CLASS_DEFINE_DELETE_FUNC(uct_mm_iface_t, uct_iface_t);