/**
 * Copyright (c) UT-Battelle, LLC. 2014-2015. ALL RIGHTS RESERVED.
 * Copyright (C) Mellanox Technologies Ltd. 2001-2019.  ALL RIGHTS RESERVED.
 * See file LICENSE for terms.
 */

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include "mm_iface.h"
#include "mm_ep.h"

#include <uct/base/uct_worker.h>
#include <uct/sm/base/sm_ep.h>
#include <ucs/arch/atomic.h>
#include <ucs/arch/bitops.h>
#include <ucs/async/async.h>
#include <ucs/sys/string.h>
#include <sys/poll.h>


/*
 * Size, in bytes, of the scratch buffer used to drain pending wakeup
 * notifications from the interface's signal socket in a single receive call.
 */
#define UCT_MM_IFACE_MAX_SIG_EVENTS  32


/*
 * Configuration table of the memory-mapped (MM) shared-memory interface.
 * Every entry binds a configuration variable name and its default value to a
 * field of uct_mm_iface_config_t. The first entry embeds the generic
 * shared-memory interface table; the table is terminated by a {NULL} entry.
 */
ucs_config_field_t uct_mm_iface_config_table[] = {
    {"SM_", "ALLOC=md,mmap,heap", NULL,
     ucs_offsetof(uct_mm_iface_config_t, super),
     UCS_CONFIG_TYPE_TABLE(uct_sm_iface_config_table)},

    {"FIFO_SIZE", "64",
     "Size of the receive FIFO in the memory-map UCTs.",
     ucs_offsetof(uct_mm_iface_config_t, fifo_size), UCS_CONFIG_TYPE_UINT},

    {"SEG_SIZE", "8256",
     "Size of send/receive buffers for copy-out sends.",
     ucs_offsetof(uct_mm_iface_config_t, seg_size), UCS_CONFIG_TYPE_MEMUNITS},

    /* The interface initialization rejects values outside of [0, 1) */
    {"FIFO_RELEASE_FACTOR", "0.5",
     "Frequency of resource releasing on the receiver's side in the MM UCT.\n"
     "This value refers to a fraction of the FIFO size (must be >= 0 and < 1).",
     ucs_offsetof(uct_mm_iface_config_t, release_fifo_factor), UCS_CONFIG_TYPE_DOUBLE},

    UCT_IFACE_MPOOL_CONFIG_FIELDS("RX_", -1, 512, "receive",
                                  ucs_offsetof(uct_mm_iface_config_t, mp), ""),

    /* Adjacent string literals are concatenated by the compiler, so every
     * sentence needs its own trailing separator */
    {"FIFO_HUGETLB", "no",
     "Enable using huge pages for internal shared memory buffers.\n"
     "Possible values are:\n"
     " y   - Allocate memory using huge pages only.\n"
     " n   - Allocate memory using regular pages only.\n"
     " try - Try to allocate memory using huge pages and if it fails, allocate regular pages.",
     ucs_offsetof(uct_mm_iface_config_t, hugetlb_mode), UCS_CONFIG_TYPE_TERNARY},

    {"FIFO_ELEM_SIZE", "128",
     "Size of a FIFO element (data + header) in the MM UCTs.",
     ucs_offsetof(uct_mm_iface_config_t, fifo_elem_size), UCS_CONFIG_TYPE_UINT},

    {NULL}
};

/*
 * Fill in the address of this interface: the id of the segment which backs
 * the receive FIFO, immediately followed by whatever the memory-domain mapper
 * packs for its own use. Returns the status of the mapper's pack operation.
 */
static ucs_status_t uct_mm_iface_get_address(uct_iface_t *tl_iface,
                                             uct_iface_addr_t *addr)
{
    uct_mm_iface_t *mm_iface    = ucs_derived_of(tl_iface, uct_mm_iface_t);
    uct_mm_iface_addr_t *packed = (void*)addr;
    uct_mm_seg_t *fifo_seg      = mm_iface->recv_fifo_mem.memh;
    uct_mm_md_t *mm_md;

    mm_md = ucs_derived_of(mm_iface->super.super.md, uct_mm_md_t);

    packed->fifo_seg_id = fifo_seg->seg_id;

    /* the mapper-specific part is stored right after the fixed header */
    return uct_mm_md_mapper_ops(mm_md)->iface_addr_pack(mm_md, packed + 1);
}

/*
 * Check whether a remote interface can be reached from this one.
 * The generic shared-memory check must pass first; after that the decision is
 * delegated to the memory-domain mapper, which receives the remote FIFO
 * segment id and the mapper-specific part of the remote address.
 * Returns non-zero if reachable, 0 otherwise.
 */
static int
uct_mm_iface_is_reachable(const uct_iface_h tl_iface,
                          const uct_device_addr_t *dev_addr,
                          const uct_iface_addr_t *tl_iface_addr)
{
    uct_mm_iface_t *mm_iface;
    uct_mm_md_t *mm_md;
    uct_mm_iface_addr_t *remote_addr;

    if (!uct_sm_iface_is_reachable(tl_iface, dev_addr, tl_iface_addr)) {
        return 0;
    }

    mm_iface    = ucs_derived_of(tl_iface, uct_mm_iface_t);
    mm_md       = ucs_derived_of(mm_iface->super.super.md, uct_mm_md_t);
    remote_addr = (void*)tl_iface_addr;

    return uct_mm_md_mapper_ops(mm_md)->is_reachable(mm_md,
                                                     remote_addr->fifo_seg_id,
                                                     remote_addr + 1);
}

/*
 * Release callback of a receive descriptor.
 * 'desc' points just past the uct_mm_recv_desc_t header, so stepping one
 * header back yields the object which is returned to its memory pool.
 */
void uct_mm_iface_release_desc(uct_recv_desc_t *self, void *desc)
{
    uct_mm_recv_desc_t *mm_desc = (uct_mm_recv_desc_t*)desc - 1;

    ucs_mpool_put(mm_desc);
}

/*
 * Flush the interface.
 * A completion object is not supported: when 'comp' is non-NULL the call
 * fails with UCS_ERR_UNSUPPORTED. Otherwise a CPU store fence is issued, the
 * flush statistics counter is updated, and UCS_OK is returned.
 */
ucs_status_t uct_mm_iface_flush(uct_iface_h tl_iface, unsigned flags,
                                uct_completion_t *comp)
{
    if (comp == NULL) {
        ucs_memory_cpu_store_fence();
        UCT_TL_IFACE_STAT_FLUSH(ucs_derived_of(tl_iface, uct_base_iface_t));
        return UCS_OK;
    }

    return UCS_ERR_UNSUPPORTED;
}

/*
 * Report the capabilities and attributes of the MM interface.
 * The base interface query runs first; the transport-specific limits,
 * capability flags, address lengths and performance estimations are filled
 * in afterwards. Always returns UCS_OK.
 */
static ucs_status_t uct_mm_iface_query(uct_iface_h tl_iface,
                                       uct_iface_attr_t *iface_attr)
{
    uct_mm_iface_t *mm_iface   = ucs_derived_of(tl_iface, uct_mm_iface_t);
    uct_mm_md_t    *mm_md      = ucs_derived_of(mm_iface->super.super.md,
                                                uct_mm_md_t);
    /* every operation type advertises cache-line alignment */
    const size_t   cl_align    = UCS_SYS_CACHE_LINE_SIZE;
    /* atomic operations which do not return a value */
    const uint64_t post_ops    = UCS_BIT(UCT_ATOMIC_OP_ADD) |
                                 UCS_BIT(UCT_ATOMIC_OP_AND) |
                                 UCS_BIT(UCT_ATOMIC_OP_OR)  |
                                 UCS_BIT(UCT_ATOMIC_OP_XOR);
    /* fetching atomics additionally support swap and compare-and-swap */
    const uint64_t fetch_ops   = post_ops                    |
                                 UCS_BIT(UCT_ATOMIC_OP_SWAP) |
                                 UCS_BIT(UCT_ATOMIC_OP_CSWAP);

    uct_base_iface_query(&mm_iface->super.super, iface_attr);

    /* PUT: default values for all shared memory transports */
    iface_attr->cap.put.max_short       = UINT_MAX;
    iface_attr->cap.put.max_bcopy       = SIZE_MAX;
    iface_attr->cap.put.min_zcopy       = 0;
    iface_attr->cap.put.max_zcopy       = SIZE_MAX;
    iface_attr->cap.put.opt_zcopy_align = cl_align;
    iface_attr->cap.put.align_mtu       = cl_align;
    iface_attr->cap.put.max_iov         = 1;

    /* GET */
    iface_attr->cap.get.max_bcopy       = SIZE_MAX;
    iface_attr->cap.get.min_zcopy       = 0;
    iface_attr->cap.get.max_zcopy       = SIZE_MAX;
    iface_attr->cap.get.opt_zcopy_align = cl_align;
    iface_attr->cap.get.align_mtu       = cl_align;
    iface_attr->cap.get.max_iov         = 1;

    /* AM: a short message must fit in a FIFO element after its header, a
     * bcopy message must fit in a receive segment */
    iface_attr->cap.am.max_short        = mm_iface->config.fifo_elem_size -
                                          sizeof(uct_mm_fifo_element_t);
    iface_attr->cap.am.max_bcopy        = mm_iface->config.seg_size;
    iface_attr->cap.am.min_zcopy        = 0;
    iface_attr->cap.am.max_zcopy        = 0;
    iface_attr->cap.am.opt_zcopy_align  = cl_align;
    iface_attr->cap.am.align_mtu        = cl_align;
    iface_attr->cap.am.max_iov          = 1;

    /* address lengths */
    iface_attr->iface_addr_len          = sizeof(uct_mm_iface_addr_t) +
                                          mm_md->iface_addr_len;
    iface_attr->device_addr_len         = uct_sm_iface_get_device_addr_len();
    iface_attr->ep_addr_len             = 0;
    iface_attr->max_conn_priv           = 0;

    /* capability flags */
    iface_attr->cap.flags               = UCT_IFACE_FLAG_PUT_SHORT       |
                                          UCT_IFACE_FLAG_PUT_BCOPY       |
                                          UCT_IFACE_FLAG_ATOMIC_CPU      |
                                          UCT_IFACE_FLAG_GET_BCOPY       |
                                          UCT_IFACE_FLAG_AM_SHORT        |
                                          UCT_IFACE_FLAG_AM_BCOPY        |
                                          UCT_IFACE_FLAG_PENDING         |
                                          UCT_IFACE_FLAG_CB_SYNC         |
                                          UCT_IFACE_FLAG_EVENT_SEND_COMP |
                                          UCT_IFACE_FLAG_EVENT_RECV_SIG  |
                                          UCT_IFACE_FLAG_CONNECT_TO_IFACE;

    /* 32-bit and 64-bit atomics support the same set of operations */
    iface_attr->cap.atomic32.op_flags   = post_ops;
    iface_attr->cap.atomic64.op_flags   = post_ops;
    iface_attr->cap.atomic32.fop_flags  = fetch_ops;
    iface_attr->cap.atomic64.fop_flags  = fetch_ops;

    /* performance estimations */
    iface_attr->latency.overhead        = 80e-9; /* 80 ns */
    iface_attr->latency.growth          = 0;
    iface_attr->bandwidth.dedicated     = mm_iface->super.config.bandwidth;
    iface_attr->bandwidth.shared        = 0;
    iface_attr->overhead                = 10e-9; /* 10 ns */
    iface_attr->priority                = 0;

    return UCS_OK;
}

/*
 * Publish the local read index as the FIFO tail.
 * The tail is written only when the read index has no bits in common with
 * the release-factor mask, so the update happens in batches rather than
 * once per consumed element.
 */
static inline void uct_mm_progress_fifo_tail(uct_mm_iface_t *iface)
{
    if ((iface->read_index & iface->fifo_release_factor_mask) == 0) {
        iface->recv_fifo_ctl->tail = iface->read_index;
    }
}

/*
 * Attach a receive descriptor to a FIFO element.
 *
 * When 'need_new_desc' is zero, the descriptor cached in
 * iface->last_recv_desc is used; otherwise a descriptor is taken from the
 * receive memory pool, and UCS_ERR_NO_RESOURCE is returned if none could be
 * obtained. On success, the element receives the descriptor info and a data
 * pointer which skips the descriptor header and the rx headroom.
 */
ucs_status_t uct_mm_assign_desc_to_fifo_elem(uct_mm_iface_t *iface,
                                             uct_mm_fifo_element_t *elem,
                                             unsigned need_new_desc)
{
    uct_mm_recv_desc_t *rx_desc = iface->last_recv_desc;

    if (need_new_desc) {
        UCT_TL_IFACE_GET_RX_DESC(&iface->super.super, &iface->recv_desc_mp,
                                 rx_desc, return UCS_ERR_NO_RESOURCE);
    }

    elem->desc_data = UCS_PTR_BYTE_OFFSET(rx_desc + 1, iface->rx_headroom);
    elem->desc      = rx_desc->info;
    return UCS_OK;
}

/*
 * Deliver the message held by a FIFO element to the active message handler.
 *
 * Inline (short) messages are read directly from the FIFO element, right
 * after its header. Other (bcopy) messages are read from the receive
 * descriptor attached to the element and are passed with
 * UCT_CB_PARAM_FLAG_DESC; if the handler returns anything other than UCS_OK
 * for such a message, the element is re-armed with iface->last_recv_desc.
 *
 * Returns the status reported by the active message invocation.
 */
static inline ucs_status_t uct_mm_iface_process_recv(uct_mm_iface_t *iface,
                                                     uct_mm_fifo_element_t* elem)
{
    ucs_status_t am_status;
    void         *payload;

    if (ucs_likely(elem->flags & UCT_MM_FIFO_ELEM_FLAG_INLINE)) {
        /* short message: the payload is stored inside the FIFO element */
        uct_iface_trace_am(&iface->super.super, UCT_AM_TRACE_TYPE_RECV,
                           elem->am_id, elem + 1, elem->length, "RX: AM_SHORT");
        return uct_mm_iface_invoke_am(iface, elem->am_id, elem + 1,
                                      elem->length, 0);
    }

    /* bcopy message: the payload is stored in the receive descriptor */
    payload = elem->desc_data;
    VALGRIND_MAKE_MEM_DEFINED(payload, elem->length);

    uct_iface_trace_am(&iface->super.super, UCT_AM_TRACE_TYPE_RECV,
                       elem->am_id, payload, elem->length, "RX: AM_BCOPY");

    am_status = uct_mm_iface_invoke_am(iface, elem->am_id, payload,
                                       elem->length, UCT_CB_PARAM_FLAG_DESC);
    if (am_status != UCS_OK) {
        /* hand the cached spare descriptor over to this FIFO element */
        uct_mm_assign_desc_to_fifo_elem(iface, elem, 0);
    }

    return am_status;
}

/*
 * Try to consume a single element from the receive FIFO.
 *
 * A spare descriptor (iface->last_recv_desc) must be available before
 * anything is consumed; when it is missing and cannot be obtained from the
 * memory pool, nothing is processed. An element is ready when its owner bit
 * (bit 0 of the flags) equals the bit of the read index located just above
 * the FIFO index bits.
 *
 * Returns 1 if an element was processed, 0 otherwise.
 */
static inline unsigned uct_mm_iface_poll_fifo(uct_mm_iface_t *iface)
{
    uct_mm_fifo_element_t *cur_elem;
    uint64_t cur_index, cur_slot;
    ucs_status_t recv_status;

    /* make sure a spare descriptor exists before reading from the FIFO */
    if (ucs_unlikely(iface->last_recv_desc == NULL)) {
        UCT_TL_IFACE_GET_RX_DESC(&iface->super.super, &iface->recv_desc_mp,
                                 iface->last_recv_desc, return 0);
    }

    /* locate the FIFO element the read index refers to */
    cur_index = iface->read_index;
    cur_slot  = cur_index & iface->fifo_mask;
    cur_elem  = UCT_MM_IFACE_GET_FIFO_ELEM(iface, iface->recv_fifo_elems,
                                           cur_slot);

    /* owner bit mismatch - there is nothing new to read */
    if (((cur_index >> iface->fifo_shift) & 1) != ((cur_elem->flags) & 1)) {
        return 0;
    }

    /* read the element contents only after its owner bit was observed */
    ucs_memory_cpu_load_fence();
    ucs_assert(iface->read_index <= iface->recv_fifo_ctl->head);

    recv_status = uct_mm_iface_process_recv(iface, cur_elem);
    if (recv_status != UCS_OK) {
        /* the spare descriptor was consumed - try to fetch a replacement */
        UCT_TL_IFACE_GET_RX_DESC(&iface->super.super, &iface->recv_desc_mp,
                                 iface->last_recv_desc, ucs_debug("recv mpool is empty"));
    }

    /* advance to the next element, and release FIFO space if it is time */
    iface->read_index++;
    uct_mm_progress_fifo_tail(iface);

    return 1;
}

/*
 * Progress function of the interface; 'arg' is the uct_mm_iface_t to
 * progress. Polls the receive FIFO once and then dispatches the arbiter of
 * pending sends. Returns the value reported by the receive FIFO poll.
 */
unsigned uct_mm_iface_progress(void *arg)
{
    uct_mm_iface_t *mm_iface = arg;
    unsigned num_received    = uct_mm_iface_poll_fifo(mm_iface);

    /* pending sends are dispatched regardless of the receive result */
    ucs_arbiter_dispatch(&mm_iface->arbiter, 1, uct_mm_ep_process_pending, NULL);

    return num_received;
}

/*
 * Store the file descriptor of the interface's signal socket in '*fd_p'.
 * Always returns UCS_OK.
 */
static ucs_status_t uct_mm_iface_event_fd_get(uct_iface_h tl_iface, int *fd_p)
{
    uct_mm_iface_t *mm_iface = ucs_derived_of(tl_iface, uct_mm_iface_t);

    *fd_p = mm_iface->signal_fd;
    return UCS_OK;
}

/*
 * Arm the interface for event notification by attempting one receive on the
 * signal socket.
 *
 * Returns:
 *  UCS_ERR_BUSY     - data was received, or the call was interrupted (EINTR).
 *  UCS_OK           - nothing was pending (EAGAIN), or 0 bytes were received.
 *  UCS_ERR_IO_ERROR - the receive failed with any other error.
 */
static ucs_status_t uct_mm_iface_event_fd_arm(uct_iface_h tl_iface,
                                              unsigned events)
{
    uct_mm_iface_t *iface = ucs_derived_of(tl_iface, uct_mm_iface_t);
    char dummy[UCT_MM_IFACE_MAX_SIG_EVENTS]; /* scratch space, content ignored */
    int ret;

    ret = recvfrom(iface->signal_fd, &dummy, sizeof(dummy), 0, NULL, 0);
    if (ret > 0) {
        return UCS_ERR_BUSY;
    }

    if (ret == -1) {
        switch (errno) {
        case EAGAIN:
            return UCS_OK;
        case EINTR:
            return UCS_ERR_BUSY;
        default:
            ucs_error("failed to retrieve message from signal pipe: %m");
            return UCS_ERR_IO_ERROR;
        }
    }

    ucs_assert(ret == 0);
    return UCS_OK;
}

/* Forward declaration of the interface delete function; it is referenced by
 * the operations table below and defined at the end of this file. */
static UCS_CLASS_DECLARE_DELETE_FUNC(uct_mm_iface_t, uct_iface_t);

/*
 * Operations table of the MM interface.
 * Put/get/atomic/fence operations use the common shared-memory (uct_sm_*)
 * implementation; active messages, pending requests, endpoint flush and the
 * interface-level operations are implemented by the MM transport.
 */
static uct_iface_ops_t uct_mm_iface_ops = {
    .ep_put_short             = uct_sm_ep_put_short,
    .ep_put_bcopy             = uct_sm_ep_put_bcopy,
    .ep_get_bcopy             = uct_sm_ep_get_bcopy,
    .ep_am_short              = uct_mm_ep_am_short,
    .ep_am_bcopy              = uct_mm_ep_am_bcopy,
    .ep_atomic_cswap64        = uct_sm_ep_atomic_cswap64,
    .ep_atomic64_post         = uct_sm_ep_atomic64_post,
    .ep_atomic64_fetch        = uct_sm_ep_atomic64_fetch,
    .ep_atomic_cswap32        = uct_sm_ep_atomic_cswap32,
    .ep_atomic32_post         = uct_sm_ep_atomic32_post,
    .ep_atomic32_fetch        = uct_sm_ep_atomic32_fetch,
    .ep_pending_add           = uct_mm_ep_pending_add,
    .ep_pending_purge         = uct_mm_ep_pending_purge,
    .ep_flush                 = uct_mm_ep_flush,
    .ep_fence                 = uct_sm_ep_fence,
    .ep_create                = UCS_CLASS_NEW_FUNC_NAME(uct_mm_ep_t),
    .ep_destroy               = UCS_CLASS_DELETE_FUNC_NAME(uct_mm_ep_t),
    .iface_flush              = uct_mm_iface_flush,
    .iface_fence              = uct_sm_iface_fence,
    .iface_progress_enable    = uct_base_iface_progress_enable,
    .iface_progress_disable   = uct_base_iface_progress_disable,
    /* uct_mm_iface_progress takes a void* argument, hence the cast */
    .iface_progress           = (uct_iface_progress_func_t)uct_mm_iface_progress,
    .iface_event_fd_get       = uct_mm_iface_event_fd_get,
    .iface_event_arm          = uct_mm_iface_event_fd_arm,
    .iface_close              = UCS_CLASS_DELETE_FUNC_NAME(uct_mm_iface_t),
    .iface_query              = uct_mm_iface_query,
    .iface_get_device_address = uct_sm_iface_get_device_address,
    .iface_get_address        = uct_mm_iface_get_address,
    .iface_is_reachable       = uct_mm_iface_is_reachable
};

/*
 * Initialization callback for receive descriptors (registered with the
 * receive descriptor memory pool).
 *
 * Records in the descriptor the id and the length of the segment 'memh' it
 * belongs to, together with the offset of its payload (past the descriptor
 * header and the rx headroom) from the segment base address. If the segment
 * is longer than UINT_MAX, an error is logged and the descriptor info is set
 * to seg_id UINT64_MAX with zero size and zero offset.
 */
static void uct_mm_iface_recv_desc_init(uct_iface_h tl_iface, void *obj,
                                        uct_mem_h memh)
{
    uct_mm_iface_t     *mm_iface = ucs_derived_of(tl_iface, uct_mm_iface_t);
    uct_mm_recv_desc_t *rx_desc  = obj;
    uct_mm_seg_t       *mem_seg  = memh;
    size_t offset;

    if (mem_seg->length <= UINT_MAX) {
        offset = UCS_PTR_BYTE_DIFF(mem_seg->address, rx_desc + 1) +
                 mm_iface->rx_headroom;
        ucs_assert(offset <= UINT_MAX);

        rx_desc->info.seg_id   = mem_seg->seg_id;
        rx_desc->info.seg_size = mem_seg->length;
        rx_desc->info.offset   = offset;
        return;
    }

    /* the segment is too large - store values which mark the failure */
    ucs_error("mm: shared memory segment length cannot exceed %u", UINT_MAX);
    rx_desc->info.seg_id   = UINT64_MAX;
    rx_desc->info.seg_size = 0;
    rx_desc->info.offset   = 0;
}

/*
 * Return to the memory pool the receive descriptors which are attached to
 * the first 'num_elems' elements of the receive FIFO.
 * The descriptor is recovered from the element's data pointer by stepping
 * back over the rx headroom and then over one descriptor header.
 */
static void uct_mm_iface_free_rx_descs(uct_mm_iface_t *iface, unsigned num_elems)
{
    uct_mm_fifo_element_t *fifo_elem;
    void *after_header;
    unsigned idx;

    for (idx = 0; idx < num_elems; ++idx) {
        fifo_elem    = UCT_MM_IFACE_GET_FIFO_ELEM(iface, iface->recv_fifo_elems,
                                                  idx);
        after_header = UCS_PTR_BYTE_OFFSET(fifo_elem->desc_data,
                                           -iface->rx_headroom);
        ucs_mpool_put((uct_mm_recv_desc_t*)after_header - 1);
    }
}

/*
 * Derive the FIFO control structure and the FIFO elements array from the
 * raw FIFO memory.
 *
 * @param [in]  fifo_mem      Start of the memory which holds the FIFO.
 * @param [out] fifo_ctl_p    Set to the control structure, which is placed at
 *                            'fifo_mem' rounded up to a cache line boundary.
 * @param [out] fifo_elems_p  Set to the first FIFO element, which is placed
 *                            UCT_MM_FIFO_CTL_SIZE bytes after the control
 *                            structure.
 */
void uct_mm_iface_set_fifo_ptrs(void *fifo_mem, uct_mm_fifo_ctl_t **fifo_ctl_p,
                                void **fifo_elems_p)
{
    /* the control structure, holding the head and the tail, comes first */
    uct_mm_fifo_ctl_t *fifo_ctl =
            (uct_mm_fifo_ctl_t*)ucs_align_up_pow2((uintptr_t)fifo_mem,
                                                  UCS_SYS_CACHE_LINE_SIZE);

    /* To avoid false sharing, the head and the tail must each start on a
     * cache line boundary and be at least one cache line apart.
     */
    ucs_assert_always(
        (((uintptr_t)&fifo_ctl->head) % UCS_SYS_CACHE_LINE_SIZE) == 0);
    ucs_assert_always(
        (((uintptr_t)&fifo_ctl->tail) % UCS_SYS_CACHE_LINE_SIZE) == 0);
    ucs_assert_always(
        ((uintptr_t)&fifo_ctl->tail - (uintptr_t)&fifo_ctl->head) >= UCS_SYS_CACHE_LINE_SIZE);

    *fifo_ctl_p   = fifo_ctl;
    /* the elements start right after the control area */
    *fifo_elems_p = UCS_PTR_BYTE_OFFSET(fifo_ctl, UCT_MM_FIFO_CTL_SIZE);
}

/*
 * Create the socket through which the interface receives wakeup signals.
 *
 * A non-blocking UNIX datagram socket is created and bound by passing only
 * the address family (no path). The address which the socket ended up with
 * is then stored, together with its length, in the receive FIFO control
 * area. On any failure the socket is closed and an error status is returned.
 */
static ucs_status_t uct_mm_iface_create_signal_fd(uct_mm_iface_t *iface)
{
    struct sockaddr_un autobind_addr;
    socklen_t sock_len;
    ucs_status_t status;
    int rc;

    /* UNIX domain socket to send and receive wakeup signal from remote
     * processes */
    iface->signal_fd = socket(AF_UNIX, SOCK_DGRAM, 0);
    if (iface->signal_fd < 0) {
        ucs_error("Failed to create unix domain socket for signal: %m");
        return UCS_ERR_IO_ERROR;
    }

    /* receiving from the socket must never block */
    status = ucs_sys_fcntl_modfl(iface->signal_fd, O_NONBLOCK, 0);
    if (status != UCS_OK) {
        goto err_close;
    }

    /* bind with the address family only, leaving the path empty */
    memset(&autobind_addr, 0, sizeof(autobind_addr));
    autobind_addr.sun_family = AF_UNIX;
    rc = bind(iface->signal_fd, (struct sockaddr*)&autobind_addr,
              sizeof(sa_family_t));
    if (rc < 0) {
        ucs_error("Failed to auto-bind unix domain socket: %m");
        status = UCS_ERR_IO_ERROR;
        goto err_close;
    }

    /* Publish the bound address through the FIFO control area, so the
     * interface address does not have to grow.
     */
    sock_len = sizeof(struct sockaddr_un);
    memset(&iface->recv_fifo_ctl->signal_sockaddr, 0, sock_len);
    rc = getsockname(iface->signal_fd,
                     (struct sockaddr *)ucs_unaligned_ptr(&iface->recv_fifo_ctl->signal_sockaddr),
                     &sock_len);
    if (rc < 0) {
        ucs_error("Failed to retrieve unix domain socket address: %m");
        status = UCS_ERR_IO_ERROR;
        goto err_close;
    }

    iface->recv_fifo_ctl->signal_addrlen = sock_len;
    return UCS_OK;

err_close:
    close(iface->signal_fd);
    return status;
}

/*
 * Emit a debug message which describes a newly created interface: the id,
 * address and length of the receive FIFO segment, and the FIFO geometry.
 */
static void uct_mm_iface_log_created(uct_mm_iface_t *iface)
{
    /* marked as unused since it is referenced only by the debug message */
    uct_mm_seg_t UCS_V_UNUSED *fifo_seg = iface->recv_fifo_mem.memh;

    ucs_debug("created mm iface %p FIFO id 0x%lx va %p size %zu (%u x %u elems)",
              iface, fifo_seg->seg_id, fifo_seg->address, fifo_seg->length,
              iface->config.fifo_elem_size, iface->config.fifo_size);
}

/*
 * Initialize a memory-mapped (MM) shared-memory interface.
 *
 * Steps, in order:
 *  1. Initialize the shared-memory superclass.
 *  2. Validate the worker thread mode and the user configuration (FIFO size,
 *     FIFO release factor, FIFO element size).
 *  3. Allocate the receive FIFO and reset its head, tail and read index.
 *  4. Create the socket used to receive wakeup signals.
 *  5. Create the receive descriptor memory pool, cache one spare descriptor
 *     in last_recv_desc, and attach a descriptor to every FIFO element.
 *  6. Initialize the arbiter of pending sends.
 *
 * Returns UCS_OK on success. On failure, everything which was set up by this
 * function so far is released (in reverse order) and an error status is
 * returned.
 */
static UCS_CLASS_INIT_FUNC(uct_mm_iface_t, uct_md_h md, uct_worker_h worker,
                           const uct_iface_params_t *params,
                           const uct_iface_config_t *tl_config)
{
    uct_mm_iface_config_t *mm_config =
                    ucs_derived_of(tl_config, uct_mm_iface_config_t);
    uct_mm_fifo_element_t* fifo_elem_p;
    ucs_status_t status;
    unsigned i;

    UCS_CLASS_CALL_SUPER_INIT(uct_sm_iface_t, &uct_mm_iface_ops, md,
                              worker, params, tl_config);

    if (ucs_derived_of(worker, uct_priv_worker_t)->thread_mode == UCS_THREAD_MODE_MULTI) {
        ucs_error("Shared memory transport does not support multi-threaded worker");
        status = UCS_ERR_INVALID_PARAM;
        goto err;
    }

    /* check that the fifo size, from the user, is a power of two and bigger than 1 */
    if ((mm_config->fifo_size <= 1) || ucs_is_pow2(mm_config->fifo_size) != 1) {
        ucs_error("The MM FIFO size must be a power of two and bigger than 1.");
        status = UCS_ERR_INVALID_PARAM;
        goto err;
    }

    /* check the value defining the FIFO batch release */
    if ((mm_config->release_fifo_factor < 0) || (mm_config->release_fifo_factor >= 1)) {
        ucs_error("The MM release FIFO factor must be: (0 =< factor < 1).");
        status = UCS_ERR_INVALID_PARAM;
        goto err;
    }

    /* check the value defining the size of the FIFO element */
    if (mm_config->fifo_elem_size <= sizeof(uct_mm_fifo_element_t)) {
        /* sizeof yields a size_t, which must be printed with %zu */
        ucs_error("The UCX_MM_FIFO_ELEM_SIZE parameter (%u) must be larger "
                  "than the FIFO element header size (%zu bytes).",
                  mm_config->fifo_elem_size, sizeof(uct_mm_fifo_element_t));
        status = UCS_ERR_INVALID_PARAM;
        goto err;
    }

    self->config.fifo_size         = mm_config->fifo_size;
    self->config.fifo_elem_size    = mm_config->fifo_elem_size;
    self->config.seg_size          = mm_config->seg_size;
    /* The tail is released once per power-of-two batch of elements; the batch
     * size is derived from fifo_size * release_fifo_factor, and is at least 1.
     */
    /* cppcheck-suppress internalAstError */
    self->fifo_release_factor_mask = UCS_MASK(ucs_ilog2(ucs_max((int)
                                     (mm_config->fifo_size * mm_config->release_fifo_factor),
                                     1)));
    self->fifo_mask                = mm_config->fifo_size - 1;
    self->fifo_shift               = ucs_count_trailing_zero_bits(mm_config->fifo_size);
    /* rx_headroom is optional in the interface parameters */
    self->rx_headroom              = (params->field_mask &
                                      UCT_IFACE_PARAM_FIELD_RX_HEADROOM) ?
                                     params->rx_headroom : 0;
    self->release_desc.cb          = uct_mm_iface_release_desc;

    /* Allocate the receive FIFO */
    status = uct_iface_mem_alloc(&self->super.super.super,
                                 UCT_MM_GET_FIFO_SIZE(self),
                                 UCT_MD_MEM_ACCESS_ALL, "mm_recv_fifo",
                                 &self->recv_fifo_mem);
    if (status != UCS_OK) {
        ucs_error("mm_iface failed to allocate receive FIFO");
        goto err;
    }

    uct_mm_iface_set_fifo_ptrs(self->recv_fifo_mem.address,
                               &self->recv_fifo_ctl, &self->recv_fifo_elems);
    self->recv_fifo_ctl->head = 0;
    self->recv_fifo_ctl->tail = 0;
    self->read_index          = 0;

    /* create a unix file descriptor to receive event notifications */
    status = uct_mm_iface_create_signal_fd(self);
    if (status != UCS_OK) {
        goto err_free_fifo;
    }

    /* create a memory pool for receive descriptors */
    status = uct_iface_mpool_init(&self->super.super,
                                  &self->recv_desc_mp,
                                  sizeof(uct_mm_recv_desc_t) + self->rx_headroom +
                                  self->config.seg_size,
                                  sizeof(uct_mm_recv_desc_t),
                                  UCS_SYS_CACHE_LINE_SIZE,
                                  &mm_config->mp,
                                  512,
                                  uct_mm_iface_recv_desc_init,
                                  "mm_recv_desc");
    if (status != UCS_OK) {
        ucs_error("failed to create a receive descriptor memory pool for the MM transport");
        goto err_close_signal_fd;
    }

    ucs_mpool_grow(&self->recv_desc_mp, mm_config->fifo_size * 2);

    /* set the first receive descriptor */
    self->last_recv_desc = ucs_mpool_get(&self->recv_desc_mp);
    if (self->last_recv_desc == NULL) {
        ucs_error("failed to get the first receive descriptor");
        status = UCS_ERR_NO_RESOURCE;
        goto destroy_recv_mpool;
    }

    /* the descriptor is annotated only after it is known to be valid */
    VALGRIND_MAKE_MEM_DEFINED(self->last_recv_desc, sizeof(*(self->last_recv_desc)));

    /* initiate the owner bit in all the FIFO elements and assign a receive descriptor
     * per every FIFO element */
    for (i = 0; i < mm_config->fifo_size; i++) {
        fifo_elem_p = UCT_MM_IFACE_GET_FIFO_ELEM(self, self->recv_fifo_elems, i);
        fifo_elem_p->flags = UCT_MM_FIFO_ELEM_FLAG_OWNER;

        status = uct_mm_assign_desc_to_fifo_elem(self, fifo_elem_p, 1);
        if (status != UCS_OK) {
            ucs_error("failed to allocate a descriptor for MM");
            goto destroy_descs;
        }
    }

    ucs_arbiter_init(&self->arbiter);
    uct_mm_iface_log_created(self);

    return UCS_OK;

destroy_descs:
    /* only the first 'i' FIFO elements have a descriptor attached */
    uct_mm_iface_free_rx_descs(self, i);
    ucs_mpool_put(self->last_recv_desc);
destroy_recv_mpool:
    ucs_mpool_cleanup(&self->recv_desc_mp, 1);
err_close_signal_fd:
    close(self->signal_fd);
err_free_fifo:
    uct_iface_mem_free(&self->recv_fifo_mem);
err:
    return status;
}

/*
 * Release the resources held by the MM interface.
 * Progress is disabled first. The descriptors are returned to the receive
 * memory pool before the pool itself is cleaned up.
 */
static UCS_CLASS_CLEANUP_FUNC(uct_mm_iface_t)
{
    /* stop both send and receive progress before tearing anything down */
    uct_base_iface_progress_disable(&self->super.super.super,
                                    UCT_PROGRESS_SEND | UCT_PROGRESS_RECV);

    /* return all the descriptors that are now 'assigned' to the FIFO,
     * to their mpool */
    uct_mm_iface_free_rx_descs(self, self->config.fifo_size);

    /* the cached spare descriptor is returned to the pool as well */
    ucs_mpool_put(self->last_recv_desc);
    ucs_mpool_cleanup(&self->recv_desc_mp, 1);
    close(self->signal_fd);
    uct_iface_mem_free(&self->recv_fifo_mem);
    ucs_arbiter_cleanup(&self->arbiter);
}

/*
 * Class definition of the MM interface.
 * The superclass named here must be the one whose constructor is invoked by
 * UCS_CLASS_CALL_SUPER_INIT() in the init function, which is uct_sm_iface_t
 * (the type of the 'super' member), not its parent uct_base_iface_t.
 */
UCS_CLASS_DEFINE(uct_mm_iface_t, uct_sm_iface_t);

/* Constructor and destructor wrappers which operate on a uct_iface_t handle */
UCS_CLASS_DEFINE_NEW_FUNC(uct_mm_iface_t, uct_iface_t, uct_md_h, uct_worker_h,
                          const uct_iface_params_t*, const uct_iface_config_t*);
static UCS_CLASS_DEFINE_DELETE_FUNC(uct_mm_iface_t, uct_iface_t);