/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2015.  ALL RIGHTS RESERVED.
* Copyright (C) ARM Ltd. 2016-2017.  ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include "ucp_am.h"
#include "ucp_worker.h"
#include "ucp_mm.h"
#include "ucp_request.inl"

#include <ucp/wireup/address.h>
#include <ucp/wireup/wireup_cm.h>
#include <ucp/wireup/wireup_ep.h>
#include <ucp/tag/eager.h>
#include <ucp/tag/offload.h>
#include <ucp/stream/stream.h>
#include <ucs/config/parser.h>
#include <ucs/datastruct/mpool.inl>
#include <ucs/datastruct/queue.h>
#include <ucs/type/cpu_set.h>
#include <ucs/sys/string.h>
#include <ucs/arch/atomic.h>
#include <sys/poll.h>
#include <sys/eventfd.h>
#include <sys/epoll.h>


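/* Headroom reserved in front of every UCT receive buffer: space for the UCP
 * receive descriptor plus a private area for protocol headers (a static
 * assert below checks it can hold at least a ucp_eager_sync_hdr_t). */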
#define UCP_WORKER_HEADROOM_SIZE \
    (sizeof(ucp_recv_desc_t) + UCP_WORKER_HEADROOM_PRIV_SIZE)

typedef enum ucp_worker_event_fd_op {
    UCP_WORKER_EPFD_OP_ADD,
    UCP_WORKER_EPFD_OP_DEL
} ucp_worker_event_fd_op_t;

#if ENABLE_STATS
static ucs_stats_class_t ucp_worker_tm_offload_stats_class = {
    .name           = "tag_offload",
    .num_counters   = UCP_WORKER_STAT_TAG_OFFLOAD_LAST,
    .counter_names  = {
        [UCP_WORKER_STAT_TAG_OFFLOAD_POSTED]           = "posted",
        [UCP_WORKER_STAT_TAG_OFFLOAD_MATCHED]          = "matched",
        [UCP_WORKER_STAT_TAG_OFFLOAD_MATCHED_SW_RNDV]  = "matched_sw_rndv",
        [UCP_WORKER_STAT_TAG_OFFLOAD_CANCELED]         = "canceled",
        [UCP_WORKER_STAT_TAG_OFFLOAD_BLOCK_TAG_EXCEED] = "block_tag_exceed",
        [UCP_WORKER_STAT_TAG_OFFLOAD_BLOCK_NON_CONTIG] = "block_non_contig",
        [UCP_WORKER_STAT_TAG_OFFLOAD_BLOCK_WILDCARD]   = "block_wildcard",
        [UCP_WORKER_STAT_TAG_OFFLOAD_BLOCK_SW_PEND]    = "block_sw_pend",
        [UCP_WORKER_STAT_TAG_OFFLOAD_BLOCK_NO_IFACE]   = "block_no_iface",
        [UCP_WORKER_STAT_TAG_OFFLOAD_BLOCK_MEM_REG]    = "block_mem_reg",
        [UCP_WORKER_STAT_TAG_OFFLOAD_RX_UNEXP_EGR]     = "rx_unexp_egr",
        [UCP_WORKER_STAT_TAG_OFFLOAD_RX_UNEXP_RNDV]    = "rx_unexp_rndv",
        [UCP_WORKER_STAT_TAG_OFFLOAD_RX_UNEXP_SW_RNDV] = "rx_unexp_sw_rndv"
    }
};

static ucs_stats_class_t ucp_worker_stats_class = {
    .name           = "ucp_worker",
    .num_counters   = UCP_WORKER_STAT_LAST,
    .counter_names  = {
        [UCP_WORKER_STAT_TAG_RX_EAGER_MSG]         = "rx_eager_msg",
        [UCP_WORKER_STAT_TAG_RX_EAGER_SYNC_MSG]    = "rx_sync_msg",
        [UCP_WORKER_STAT_TAG_RX_EAGER_CHUNK_EXP]   = "rx_eager_chunk_exp",
        [UCP_WORKER_STAT_TAG_RX_EAGER_CHUNK_UNEXP] = "rx_eager_chunk_unexp",
        [UCP_WORKER_STAT_TAG_RX_RNDV_EXP]          = "rx_rndv_rts_exp",
        [UCP_WORKER_STAT_TAG_RX_RNDV_UNEXP]        = "rx_rndv_rts_unexp"
    }
};
#endif


ucs_mpool_ops_t ucp_am_mpool_ops = {
    .chunk_alloc   = ucs_mpool_hugetlb_malloc,
    .chunk_release = ucs_mpool_hugetlb_free,
    .obj_init      = ucs_empty_function,
    .obj_cleanup   = ucs_empty_function
};


ucs_mpool_ops_t ucp_reg_mpool_ops = {
    .chunk_alloc   = ucp_reg_mpool_malloc,
    .chunk_release = ucp_reg_mpool_free,
    .obj_init      = ucp_mpool_obj_init,
    .obj_cleanup   = ucs_empty_function
};

ucs_mpool_ops_t ucp_frag_mpool_ops = {
    .chunk_alloc   = ucp_frag_mpool_malloc,
    .chunk_release = ucp_frag_mpool_free,
    .obj_init      = ucp_mpool_obj_init,
    .obj_cleanup   = ucs_empty_function
};

void ucp_worker_iface_check_events(ucp_worker_iface_t *wiface, int force);

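/* Estimated interface latency: the fixed per-message overhead plus a growth
 * term that scales with the configured estimated number of endpoints. For
 * example (illustrative numbers), with overhead = 1us, growth = 2ns and
 * est_num_eps = 64, the estimate is 1us + 2ns * 64 = 1.128us. */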
static UCS_F_ALWAYS_INLINE double
ucp_worker_iface_latency(ucp_worker_h worker, ucp_worker_iface_t *wiface)
{
    return wiface->attr.latency.overhead +
           wiface->attr.latency.growth * worker->context->config.est_num_eps;
}

static ucs_status_t ucp_worker_wakeup_ctl_fd(ucp_worker_h worker,
                                             ucp_worker_event_fd_op_t op,
                                             int event_fd)
{
    ucs_event_set_type_t events = UCS_EVENT_SET_EVREAD;
    ucs_status_t status;

    if (!(worker->context->config.features & UCP_FEATURE_WAKEUP)) {
        return UCS_OK;
    }

    if (worker->flags & UCP_WORKER_FLAG_EDGE_TRIGGERED) {
        events |= UCS_EVENT_SET_EDGE_TRIGGERED;
    }

    switch (op) {
    case UCP_WORKER_EPFD_OP_ADD:
        status = ucs_event_set_add(worker->event_set, event_fd,
                                   events, worker->user_data);
        break;
    case UCP_WORKER_EPFD_OP_DEL:
        status = ucs_event_set_del(worker->event_set, event_fd);
        break;
    default:
        ucs_bug("Unknown operation (%d) was passed", op);
        status = UCS_ERR_INVALID_PARAM;
        break;
    }

    return status;
}

static void ucp_worker_set_am_handlers(ucp_worker_iface_t *wiface, int is_proxy)
{
    ucp_worker_h worker   = wiface->worker;
    ucp_context_h context = worker->context;
    ucs_status_t status;
    unsigned am_id;

    ucs_trace_func("iface=%p is_proxy=%d", wiface->iface, is_proxy);

    for (am_id = 0; am_id < UCP_AM_ID_LAST; ++am_id) {
        if (!(wiface->attr.cap.flags & (UCT_IFACE_FLAG_AM_SHORT |
                                        UCT_IFACE_FLAG_AM_BCOPY |
                                        UCT_IFACE_FLAG_AM_ZCOPY))) {
            continue;
        }

        if (!(context->config.features & ucp_am_handlers[am_id].features)) {
            continue;
        }

        if (!(ucp_am_handlers[am_id].flags & UCT_CB_FLAG_ASYNC) &&
            !(wiface->attr.cap.flags & UCT_IFACE_FLAG_CB_SYNC))
        {
            /* Do not register a sync callback on an interface which does not
             * support it. The transport selection logic should not use async
             * transports for protocols with sync active message handlers.
             */
            continue;
        }

        if (is_proxy && (ucp_am_handlers[am_id].proxy_cb != NULL)) {
            /* We care only about sync active messages, and this also makes
             * sure the counter is not accessed from another thread.
             */
            ucs_assert(!(ucp_am_handlers[am_id].flags & UCT_CB_FLAG_ASYNC));
            status = uct_iface_set_am_handler(wiface->iface, am_id,
                                              ucp_am_handlers[am_id].proxy_cb,
                                              wiface,
                                              ucp_am_handlers[am_id].flags);
        } else {
            status = uct_iface_set_am_handler(wiface->iface, am_id,
                                              ucp_am_handlers[am_id].cb,
                                              worker,
                                              ucp_am_handlers[am_id].flags);
        }
        if (status != UCS_OK) {
            ucs_fatal("failed to set active message handler id %d: %s", am_id,
                      ucs_status_string(status));
        }
    }
}

static ucs_status_t ucp_stub_am_handler(void *arg, void *data, size_t length,
                                        unsigned flags)
{
    ucp_worker_h worker = arg;
    ucs_trace("worker %p: drop message", worker);
    return UCS_OK;
}

static void ucp_worker_remove_am_handlers(ucp_worker_h worker)
{
    ucp_context_h context = worker->context;
    ucp_worker_iface_t *wiface;
    ucp_rsc_index_t iface_id;
    unsigned am_id;

    ucs_debug("worker %p: remove active message handlers", worker);

    for (iface_id = 0; iface_id < worker->num_ifaces; ++iface_id) {
        wiface = worker->ifaces[iface_id];
        if (!(wiface->attr.cap.flags & (UCT_IFACE_FLAG_AM_SHORT |
                                        UCT_IFACE_FLAG_AM_BCOPY |
                                        UCT_IFACE_FLAG_AM_ZCOPY))) {
            continue;
        }
        for (am_id = 0; am_id < UCP_AM_ID_LAST; ++am_id) {
            if (context->config.features & ucp_am_handlers[am_id].features) {
                (void)uct_iface_set_am_handler(wiface->iface,
                                               am_id, ucp_stub_am_handler,
                                               worker, UCT_CB_FLAG_ASYNC);
            }
        }
    }
}

static void ucp_worker_am_tracer(void *arg, uct_am_trace_type_t type,
                                 uint8_t id, const void *data, size_t length,
                                 char *buffer, size_t max)
{
    ucp_worker_h worker = arg;
    ucp_am_tracer_t tracer;

    if (id < UCP_AM_ID_LAST) {
        tracer = ucp_am_handlers[id].tracer;
        if (tracer != NULL) {
            tracer(worker, type, id, data, length, buffer, max);
        }
    }
}

static ucs_status_t ucp_worker_wakeup_init(ucp_worker_h worker,
                                           const ucp_worker_params_t *params)
{
    ucp_context_h context = worker->context;
    unsigned events;
    ucs_status_t status;

    if (!(context->config.features & UCP_FEATURE_WAKEUP)) {
        worker->event_fd   = -1;
        worker->event_set  = NULL;
        worker->eventfd    = -1;
        worker->uct_events = 0;
        status = UCS_OK;
        goto out;
    }

    if (params->field_mask & UCP_WORKER_PARAM_FIELD_EVENTS) {
        events = params->events;
    } else {
        events = UCP_WAKEUP_RMA | UCP_WAKEUP_AMO | UCP_WAKEUP_TAG_SEND |
                 UCP_WAKEUP_TAG_RECV | UCP_WAKEUP_TX | UCP_WAKEUP_RX;
    }

    if (params->field_mask & UCP_WORKER_PARAM_FIELD_EVENT_FD) {
        worker->flags |= UCP_WORKER_FLAG_EXTERNAL_EVENT_FD;
        status = ucs_event_set_create_from_fd(&worker->event_set,
                                              params->event_fd);
    } else {
        status = ucs_event_set_create(&worker->event_set);
    }
    if (status != UCS_OK) {
        goto out;
    }

    status = ucs_event_set_fd_get(worker->event_set, &worker->event_fd);
    if (status != UCS_OK) {
        goto err_cleanup_event_set;
    }

    if (events & UCP_WAKEUP_EDGE) {
        worker->flags |= UCP_WORKER_FLAG_EDGE_TRIGGERED;
    }

    worker->eventfd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
    if (worker->eventfd == -1) {
        ucs_error("Failed to create event fd: %m");
        status = UCS_ERR_IO_ERROR;
        goto err_cleanup_event_set;
    }

    status = ucp_worker_wakeup_ctl_fd(worker, UCP_WORKER_EPFD_OP_ADD,
                                      worker->eventfd);
    if (status != UCS_OK) {
        goto err_close_eventfd;
    }

    worker->uct_events = 0;

    /* FIXME: any TAG flag enables all types of completion events because of
     *        possible issues in the RNDV protocol. This may be optimized by
     *        using separate UCP descriptors or manual signaling in RNDV and
     *        similar cases; see the conversation in PR #1277.
     */
    if ((events & UCP_WAKEUP_TAG_SEND) ||
        ((events & UCP_WAKEUP_TAG_RECV) &&
         (context->config.ext.rndv_thresh != UCS_MEMUNITS_INF)))
    {
        worker->uct_events |= UCT_EVENT_SEND_COMP;
    }

    if (events & (UCP_WAKEUP_TAG_RECV | UCP_WAKEUP_RX)) {
        worker->uct_events |= UCT_EVENT_RECV;
    }

    if (events & (UCP_WAKEUP_RMA | UCP_WAKEUP_AMO | UCP_WAKEUP_TX)) {
        worker->uct_events |= UCT_EVENT_SEND_COMP;
    }

    return UCS_OK;

err_close_eventfd:
    close(worker->eventfd);
    worker->eventfd = -1;
err_cleanup_event_set:
    ucs_event_set_cleanup(worker->event_set);
    worker->event_set = NULL;
    worker->event_fd  = -1;
out:
    return status;
}
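
/*
 * Usage sketch (illustrative, not part of this file): an application that
 * enabled UCP_FEATURE_WAKEUP typically retrieves the worker's event fd and
 * blocks on it, re-arming the worker before each wait:
 *
 *     int efd;
 *     ucp_worker_get_efd(worker, &efd);
 *     while (ucp_worker_progress(worker)) {
 *     }
 *     if (ucp_worker_arm(worker) == UCS_OK) {
 *         struct pollfd pfd = {.fd = efd, .events = POLLIN};
 *         poll(&pfd, 1, -1);              // wait for the next event
 *     }                                   // UCS_ERR_BUSY: progress again
 */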

static void ucp_worker_wakeup_cleanup(ucp_worker_h worker)
{
    if (worker->event_set != NULL) {
        ucs_assert(worker->event_fd != -1);
        ucs_event_set_cleanup(worker->event_set);
        worker->event_set = NULL;
        worker->event_fd  = -1;
    }
    if (worker->eventfd != -1) {
        close(worker->eventfd);
    }
}

static void ucp_worker_iface_disarm(ucp_worker_iface_t *wiface)
{
    ucs_status_t status;

    if (wiface->flags & UCP_WORKER_IFACE_FLAG_ON_ARM_LIST) {
        status = ucp_worker_wakeup_ctl_fd(wiface->worker,
                                          UCP_WORKER_EPFD_OP_DEL,
                                          wiface->event_fd);
        ucs_assert_always(status == UCS_OK);
        ucs_list_del(&wiface->arm_list);
        wiface->flags &= ~UCP_WORKER_IFACE_FLAG_ON_ARM_LIST;
    }
}

static ucs_status_t ucp_worker_wakeup_signal_fd(ucp_worker_h worker)
{
    uint64_t dummy = 1;
    int ret;

    ucs_trace_func("worker=%p fd=%d", worker, worker->eventfd);

    do {
        ret = write(worker->eventfd, &dummy, sizeof(dummy));
        if (ret == sizeof(dummy)) {
            return UCS_OK;
        } else if (ret == -1) {
            if (errno == EAGAIN) {
                return UCS_OK;
            } else if (errno != EINTR) {
                ucs_error("Signaling wakeup failed: %m");
                return UCS_ERR_IO_ERROR;
            }
            /* fall through to retry on EINTR */
        } else {
            ucs_assert(ret == 0);
        }
    } while ((ret == 0) || ((ret == -1) && (errno == EINTR)));

    return UCS_OK;
}

void ucp_worker_signal_internal(ucp_worker_h worker)
{
    if (worker->context->config.features & UCP_FEATURE_WAKEUP) {
        ucp_worker_wakeup_signal_fd(worker);
    }
}

static unsigned ucp_worker_iface_err_handle_progress(void *arg)
{
    ucp_worker_err_handle_arg_t *err_handle_arg = arg;
    ucp_worker_h worker                         = err_handle_arg->worker;
    ucp_ep_h ucp_ep                             = err_handle_arg->ucp_ep;
    uct_ep_h uct_ep                             = err_handle_arg->uct_ep;
    ucs_status_t status                         = err_handle_arg->status;
    ucp_lane_index_t failed_lane                = err_handle_arg->failed_lane;
    ucp_lane_index_t lane;
    ucp_ep_config_key_t key;

    UCS_ASYNC_BLOCK(&worker->async);

    ucs_debug("ep %p: handle error on lane[%d]=%p: %s",
              ucp_ep, failed_lane, uct_ep, ucs_status_string(status));

    ucs_assert(ucp_ep->flags & UCP_EP_FLAG_FAILED);

    /* Destroy all lanes except the failed one, since the ucp_ep becomes
     * unusable as well */
    for (lane = 0; lane < ucp_ep_num_lanes(ucp_ep); ++lane) {
        if (ucp_ep->uct_eps[lane] == NULL) {
            continue;
        }

        /* Purge pending queue */
        ucs_trace("ep %p: purge pending on uct_ep[%d]=%p", ucp_ep, lane,
                  ucp_ep->uct_eps[lane]);
        uct_ep_pending_purge(ucp_ep->uct_eps[lane], ucp_ep_err_pending_purge,
                             UCS_STATUS_PTR(status));

        if (lane != failed_lane) {
            ucs_trace("ep %p: destroy uct_ep[%d]=%p", ucp_ep, lane,
                      ucp_ep->uct_eps[lane]);
            uct_ep_destroy(ucp_ep->uct_eps[lane]);
            ucp_ep->uct_eps[lane] = NULL;
        }
    }

    /* Move failed lane to index 0 */
    if ((failed_lane != 0) && (failed_lane != UCP_NULL_LANE)) {
        ucp_ep->uct_eps[0] = ucp_ep->uct_eps[failed_lane];
        ucp_ep->uct_eps[failed_lane] = NULL;
    }

    /* NOTE: if the failed ep is a wireup auxiliary/sockaddr ep, we need to
     *       replace the lane with the failed ep and destroy the wireup ep
     */
    if (ucp_ep->uct_eps[0] != uct_ep) {
        ucs_assert(ucp_wireup_ep_is_owner(ucp_ep->uct_eps[0], uct_ep));
        ucp_wireup_ep_disown(ucp_ep->uct_eps[0], uct_ep);
        ucs_trace("ep %p: destroy failed wireup ep %p", ucp_ep, ucp_ep->uct_eps[0]);
        uct_ep_destroy(ucp_ep->uct_eps[0]);
        ucp_ep->uct_eps[0] = uct_ep;
    }

    /* Redirect all lanes to failed one */
    key                    = ucp_ep_config(ucp_ep)->key;
    key.am_lane            = 0;
    key.wireup_lane        = 0;
    key.tag_lane           = 0;
    key.rma_lanes[0]       = 0;
    key.rma_bw_lanes[0]    = 0;
    key.amo_lanes[0]       = 0;
    key.lanes[0].rsc_index = UCP_NULL_RESOURCE;
    key.num_lanes          = 1;
    key.status             = status;

    status = ucp_worker_get_ep_config(worker, &key, 0, &ucp_ep->cfg_index);
    if (status != UCS_OK) {
        ucs_fatal("ep %p: could not change configuration to error state: %s",
                  ucp_ep, ucs_status_string(status));
    }

    ucp_ep->am_lane = 0;

    if (!(ucp_ep->flags & UCP_EP_FLAG_USED)) {
        ucs_debug("ep %p: destroy internal endpoint due to peer failure", ucp_ep);
        ucp_ep_disconnected(ucp_ep, 1);
    } else {
        ucp_ep_invoke_err_cb(ucp_ep, key.status);
    }

    ucs_free(err_handle_arg);
    UCS_ASYNC_UNBLOCK(&worker->async);
    return 1;
}

int ucp_worker_err_handle_remove_filter(const ucs_callbackq_elem_t *elem,
                                        void *arg)
{
    ucp_worker_err_handle_arg_t *err_handle_arg = elem->arg;

    if ((elem->cb == ucp_worker_iface_err_handle_progress) &&
        (err_handle_arg->ucp_ep == arg)) {
        /* release err handling argument to avoid memory leak */
        ucs_free(err_handle_arg);
        return 1;
    }

    return 0;
}

ucs_status_t ucp_worker_set_ep_failed(ucp_worker_h worker, ucp_ep_h ucp_ep,
                                      uct_ep_h uct_ep, ucp_lane_index_t lane,
                                      ucs_status_t status)
{
    uct_worker_cb_id_t          prog_id    = UCS_CALLBACKQ_ID_NULL;
    ucs_status_t                ret_status = UCS_OK;
    ucp_rsc_index_t             rsc_index;
    uct_tl_resource_desc_t      *tl_rsc;
    ucp_worker_err_handle_arg_t *err_handle_arg;

    if (ucp_ep->flags & UCP_EP_FLAG_FAILED) {
        goto out_ok;
    }

    /* If this is a local failure, we need to notify the remote side */
    if (ucp_ep_is_cm_local_connected(ucp_ep)) {
        ucp_ep_cm_disconnect_cm_lane(ucp_ep);
    }

    /* set endpoint to failed to prevent wireup_ep switch */
    ucp_ep->flags |= UCP_EP_FLAG_FAILED;

    if (ucp_ep_config(ucp_ep)->key.err_mode == UCP_ERR_HANDLING_MODE_NONE) {
        /* NOTE: if the user has not requested error handling on the endpoint,
         *       the failure is considered unhandled */
        ret_status = status;
        goto out;
    }

    err_handle_arg = ucs_malloc(sizeof(*err_handle_arg), "ucp_worker_err_handle_arg");
    if (err_handle_arg == NULL) {
        ucs_error("failed to allocate ucp_worker_err_handle_arg");
        ret_status = UCS_ERR_NO_MEMORY;
        goto out;
    }

    err_handle_arg->worker      = worker;
    err_handle_arg->ucp_ep      = ucp_ep;
    err_handle_arg->uct_ep      = uct_ep;
    err_handle_arg->status      = status;
    err_handle_arg->failed_lane = lane;

    /* invoke the rest of the error handling flow from the main thread */
    uct_worker_progress_register_safe(worker->uct,
                                      ucp_worker_iface_err_handle_progress,
                                      err_handle_arg, UCS_CALLBACKQ_FLAG_ONESHOT,
                                      &prog_id);

    if ((ucp_ep_ext_gen(ucp_ep)->err_cb == NULL) &&
        (ucp_ep->flags & UCP_EP_FLAG_USED)) {
        if (lane != UCP_NULL_LANE) {
            rsc_index = ucp_ep_get_rsc_index(ucp_ep, lane);
            tl_rsc    = &worker->context->tl_rscs[rsc_index].tl_rsc;
            ucs_error("error '%s' will not be handled for ep %p - "
                      UCT_TL_RESOURCE_DESC_FMT " since no error callback is installed",
                      ucs_status_string(status), ucp_ep,
                      UCT_TL_RESOURCE_DESC_ARG(tl_rsc));
        } else {
            ucs_assert(uct_ep == NULL);
            ucs_error("error '%s' occurred on wireup will not be handled for ep %p "
                      "since no error callback is installed",
                      ucs_status_string(status), ucp_ep);
        }
        ret_status = status;
        goto out;
    }

out_ok:
    ret_status = UCS_OK;

out:
    /* If the worker supports the UCP_FEATURE_WAKEUP feature, signal the user
     * so that the application can wake up on this event */
    ucp_worker_signal_internal(worker);

    return ret_status;
}

static ucs_status_t
ucp_worker_iface_error_handler(void *arg, uct_ep_h uct_ep, ucs_status_t status)
{
    ucp_worker_h worker         = (ucp_worker_h)arg;
    ucp_lane_index_t lane;
    ucs_status_t ret_status;
    ucp_ep_ext_gen_t *ep_ext;
    ucp_ep_h ucp_ep;

    UCS_ASYNC_BLOCK(&worker->async);

    ucs_debug("worker %p: error handler called for uct_ep %p: %s",
              worker, uct_ep, ucs_status_string(status));

    /* TODO: need to optimize uct_ep -> ucp_ep lookup */
    ucs_list_for_each(ep_ext, &worker->all_eps, ep_list) {
        ucp_ep = ucp_ep_from_ext_gen(ep_ext);
        for (lane = 0; lane < ucp_ep_num_lanes(ucp_ep); ++lane) {
            if ((uct_ep == ucp_ep->uct_eps[lane]) ||
                ucp_wireup_ep_is_owner(ucp_ep->uct_eps[lane], uct_ep)) {
                ret_status = ucp_worker_set_ep_failed(worker, ucp_ep, uct_ep,
                                                      lane, status);
                UCS_ASYNC_UNBLOCK(&worker->async);
                return ret_status;
            }
        }
    }

    ucs_error("no uct_ep_h %p associated with ucp_ep_h on ucp_worker_h %p",
              uct_ep, worker);
    UCS_ASYNC_UNBLOCK(&worker->async);

    return UCS_ERR_NO_ELEM;
}

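/* Adaptive progress scheme: an interface that supports receive events may
 * start deactivated (see ucp_worker_iface_init), with proxy AM handlers
 * installed that count incoming messages. Once a message actually arrives,
 * the proxy handler activates the interface, and it is polled from then on.
 * Activation is reference-counted via activate_count, so each activate call
 * needs a matching deactivate. */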
void ucp_worker_iface_activate(ucp_worker_iface_t *wiface, unsigned uct_flags)
{
    ucp_worker_h worker = wiface->worker;
    ucs_status_t status;

    ucs_trace("activate iface %p acount=%u aifaces=%u", wiface->iface,
              wiface->activate_count, worker->num_active_ifaces);

    if (wiface->activate_count++ > 0) {
        return; /* was already activated */
    }

    /* Stop an ongoing activation process, if one exists */
    uct_worker_progress_unregister_safe(worker->uct, &wiface->check_events_id);

    /* Set default active message handlers */
    ucp_worker_set_am_handlers(wiface, 0);

    /* Add to user wakeup */
    if (wiface->attr.cap.flags & UCP_WORKER_UCT_ALL_EVENT_CAP_FLAGS) {
        status = ucp_worker_wakeup_ctl_fd(worker, UCP_WORKER_EPFD_OP_ADD,
                                          wiface->event_fd);
        ucs_assert_always(status == UCS_OK);
        wiface->flags |= UCP_WORKER_IFACE_FLAG_ON_ARM_LIST;
        ucs_list_add_tail(&worker->arm_ifaces, &wiface->arm_list);
    }

    ++worker->num_active_ifaces;

    uct_iface_progress_enable(wiface->iface,
                              UCT_PROGRESS_SEND | UCT_PROGRESS_RECV | uct_flags);
}

static void ucp_worker_iface_deactivate(ucp_worker_iface_t *wiface, int force)
{
    ucs_trace("deactivate iface %p force=%d acount=%u aifaces=%u",
              wiface->iface, force, wiface->activate_count,
              wiface->worker->num_active_ifaces);

    if (!force) {
        ucs_assert(wiface->activate_count > 0);
        if (--wiface->activate_count > 0) {
            return; /* not completely deactivated yet */
        }
        --wiface->worker->num_active_ifaces;
    }

    /* Avoid progress on the interface to reduce overhead */
    uct_iface_progress_disable(wiface->iface,
                               UCT_PROGRESS_SEND | UCT_PROGRESS_RECV);

    /* Remove from user wakeup */
    ucp_worker_iface_disarm(wiface);

    /* Set proxy active message handlers to count receives */
    ucp_worker_set_am_handlers(wiface, 1);

    /* Prepare for next receive event */
    ucp_worker_iface_check_events(wiface, force);
}

void ucp_worker_iface_progress_ep(ucp_worker_iface_t *wiface)
{
    ucs_trace_func("iface=%p", wiface->iface);

    UCS_ASYNC_BLOCK(&wiface->worker->async);

    /* This function may be called from the progress thread (e.g. when
     * processing wireup messages), so ask UCT to be thread-safe.
     */
    ucp_worker_iface_activate(wiface, UCT_PROGRESS_THREAD_SAFE);

    UCS_ASYNC_UNBLOCK(&wiface->worker->async);
}

void ucp_worker_iface_unprogress_ep(ucp_worker_iface_t *wiface)
{
    ucs_trace_func("iface=%p", wiface->iface);

    UCS_ASYNC_BLOCK(&wiface->worker->async);
    ucp_worker_iface_deactivate(wiface, 0);
    UCS_ASYNC_UNBLOCK(&wiface->worker->async);
}

/*
 * If active messages were received by the AM proxy handler, activate the
 * interface. Otherwise, arm the interface event and make sure that when an
 * active message is received in the future, the interface will be activated.
 */
static ucs_status_t ucp_worker_iface_check_events_do(ucp_worker_iface_t *wiface,
                                                     unsigned *progress_count)
{
    unsigned prev_recv_count;
    ucs_status_t status;

    ucs_trace_func("wiface=%p iface=%p", wiface, wiface->iface);

    if (wiface->activate_count > 0) {
        ucs_trace("iface %p already activated", wiface->iface);
        *progress_count = 0;
        return UCS_OK;
    }

    prev_recv_count = wiface->proxy_recv_count;

    *progress_count = uct_iface_progress(wiface->iface);
    if (prev_recv_count != wiface->proxy_recv_count) {
        /* Received relevant active messages, activate the interface */
        ucp_worker_iface_activate(wiface, 0);
        return UCS_OK;
    } else if (*progress_count == 0) {
        /* Arm the interface to wait for next event */
        ucs_assert(wiface->attr.cap.flags & UCP_WORKER_UCT_RECV_EVENT_CAP_FLAGS);
        status = uct_iface_event_arm(wiface->iface,
                                     UCP_WORKER_UCT_RECV_EVENT_ARM_FLAGS);
        if (status == UCS_OK) {
            ucs_trace("armed iface %p", wiface->iface);

            /* Re-enable events, which were disabled by ucp_worker_iface_event() */
            status = ucs_async_modify_handler(wiface->event_fd,
                                              UCS_EVENT_SET_EVREAD);
            if (status != UCS_OK) {
                ucs_fatal("failed to modify %d event handler to UCS_EVENT_SET_EVREAD: %s",
                          wiface->event_fd, ucs_status_string(status));
            }

            return UCS_OK;
        } else if (status != UCS_ERR_BUSY) {
            ucs_fatal("failed to arm iface %p: %s", wiface->iface,
                      ucs_status_string(status));
        } else {
            ucs_trace("arm iface %p returned BUSY", wiface->iface);
            return UCS_ERR_BUSY;
        }
    } else {
        ucs_trace("wiface %p progress returned %u, but no active messages were received",
                  wiface, *progress_count);
        return UCS_ERR_BUSY;
    }
}

static unsigned ucp_worker_iface_check_events_progress(void *arg)
{
    ucp_worker_iface_t *wiface = arg;
    ucp_worker_h worker = wiface->worker;
    unsigned progress_count;
    ucs_status_t status;

    ucs_trace_func("iface=%p, worker=%p", wiface->iface, worker);

    /* Check if we either had active messages or were able to arm the interface.
     * In these cases, the work is done and this progress callback can be removed.
     */
    UCS_ASYNC_BLOCK(&worker->async);
    status = ucp_worker_iface_check_events_do(wiface, &progress_count);
    if (status == UCS_OK) {
        uct_worker_progress_unregister_safe(worker->uct, &wiface->check_events_id);
    }
    UCS_ASYNC_UNBLOCK(&worker->async);

    return progress_count;
}

void ucp_worker_iface_check_events(ucp_worker_iface_t *wiface, int force)
{
    unsigned progress_count;
    ucs_status_t status;

    ucs_trace_func("iface=%p, force=%d", wiface->iface, force);

    if (force) {
        do {
            /* Coverity wrongly resolves the progress callback to
             * ucp_listener_conn_request_progress, which releases wiface->iface;
             * this leads it to assume that ucp_worker_iface_check_events_do()
             * dereferences a freed pointer in the subsequent loop iteration. */
            /* coverity[freed_arg] */
            status = ucp_worker_iface_check_events_do(wiface, &progress_count);
            ucs_assert(progress_count == 0);
        } while (status == UCS_ERR_BUSY);
        ucs_assert(status == UCS_OK);
    } else {
        /* Check events on the main progress loop, to make this function safe to
         * call from async context, and avoid starvation of other progress callbacks.
         */
        uct_worker_progress_register_safe(wiface->worker->uct,
                                          ucp_worker_iface_check_events_progress,
                                          wiface, 0, &wiface->check_events_id);
    }
}

void ucp_worker_iface_event(int fd, void *arg)
{
    ucp_worker_iface_t *wiface = arg;
    ucp_worker_h worker = wiface->worker;
    ucs_status_t status;

    ucs_trace_func("fd=%d iface=%p", fd, wiface->iface);

    status = ucs_async_modify_handler(wiface->event_fd, 0);
    if (status != UCS_OK) {
        ucs_fatal("failed to modify %d event handler to <empty>: %s",
                  wiface->event_fd, ucs_status_string(status));
    }

    /* Do more work on the main thread */
    ucp_worker_iface_check_events(wiface, 0);

    /* Signal user wakeup, to report the first message on the interface */
    ucp_worker_signal_internal(worker);
}

static void ucp_worker_uct_iface_close(ucp_worker_iface_t *wiface)
{
    if (wiface->iface != NULL) {
        uct_iface_close(wiface->iface);
        wiface->iface = NULL;
    }
}

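/* Check whether some other opened interface on the same device supersedes
 * @wiface: it must support at least the same capabilities (ignoring the
 * CONNECT_TO_IFACE/CONNECT_TO_EP caps) with equal or better overhead,
 * bandwidth, latency/priority score, and scalability. For example
 * (illustrative), this is what would prune a plain verbs-based tl in favor
 * of an accelerated tl on the same device when the latter matches all caps. */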
static int ucp_worker_iface_find_better(ucp_worker_h worker,
                                        ucp_worker_iface_t *wiface,
                                        ucp_rsc_index_t *better_index)
{
    ucp_context_h ctx = worker->context;
    ucp_rsc_index_t rsc_index;
    ucp_worker_iface_t *if_iter;
    uint64_t test_flags;
    double latency_iter, latency_cur, bw_cur;

    ucs_assert(wiface != NULL);

    latency_cur = ucp_worker_iface_latency(worker, wiface);
    bw_cur      = ucp_tl_iface_bandwidth(ctx, &wiface->attr.bandwidth);

    test_flags = wiface->attr.cap.flags & ~(UCT_IFACE_FLAG_CONNECT_TO_IFACE |
                                            UCT_IFACE_FLAG_CONNECT_TO_EP);

    for (rsc_index = 0; rsc_index < ctx->num_tls; ++rsc_index) {
        if_iter = worker->ifaces[rsc_index];

        /* Only check resources which belong to the same device */
        if ((ctx->tl_rscs[rsc_index].dev_index != ctx->tl_rscs[wiface->rsc_index].dev_index) ||
            (if_iter->flags & UCP_WORKER_IFACE_FLAG_UNUSED) ||
            (rsc_index == wiface->rsc_index)) {
            continue;
        }

        latency_iter = ucp_worker_iface_latency(worker, if_iter);

        /* Check that another iface: */
        if (/* 1. Supports all capabilities of the target iface (at least),
             *    except ...CONNECT_TO... caps. */
            ucs_test_all_flags(if_iter->attr.cap.flags, test_flags) &&
            /* 2. Has the same or better performance characteristics */
            (if_iter->attr.overhead <= wiface->attr.overhead) &&
            (ucp_tl_iface_bandwidth(ctx, &if_iter->attr.bandwidth) >= bw_cur) &&
            /* swap latencies in args list since less is better */
            (ucp_score_prio_cmp(latency_cur,  if_iter->attr.priority,
                                latency_iter, wiface->attr.priority) >= 0) &&
            /* 3. The found transport is scalable enough, or both
             *    transports are unscalable */
            (ucp_is_scalable_transport(ctx, if_iter->attr.max_num_eps) ||
             !ucp_is_scalable_transport(ctx, wiface->attr.max_num_eps)))
        {
            *better_index = rsc_index;
            /* Do not consider this iface anymore, because a better one
             * exists. This avoids the case where two interfaces with the
             * same caps and performance exclude each other. */
            wiface->flags |= UCP_WORKER_IFACE_FLAG_UNUSED;
            return 1;
        }
    }

    /* A better resource was not found */
    *better_index = 0;
    return 0;
}

/**
 * @brief Find the minimal possible set of tl interfaces for each device
 *
 * @param [in]  worker       UCP worker.
 * @param [out] tl_bitmap_p  Filled with the map of the selected tl resources.
 */
static void ucp_worker_select_best_ifaces(ucp_worker_h worker,
                                          uint64_t *tl_bitmap_p)
{
    ucp_context_h context = worker->context;
    uint64_t tl_bitmap    = 0;
    ucp_rsc_index_t repl_ifaces[UCP_MAX_RESOURCES];
    ucp_worker_iface_t *wiface;
    ucp_rsc_index_t tl_id, iface_id;

    /* For each iface check whether there is another iface, which:
     * 1. Supports at least the same capabilities
     * 2. Provides equivalent or better performance
     */
    for (tl_id = 0; tl_id < context->num_tls; ++tl_id) {
        wiface = worker->ifaces[tl_id];
        if (!ucp_worker_iface_find_better(worker, wiface, &repl_ifaces[tl_id])) {
            tl_bitmap |= UCS_BIT(tl_id);
        }
    }

    *tl_bitmap_p       = tl_bitmap;
    worker->num_ifaces = ucs_popcount(tl_bitmap);
    ucs_assert(worker->num_ifaces <= context->num_tls);

    if (worker->num_ifaces == context->num_tls) {
        return;
    }

    ucs_assert(worker->num_ifaces < context->num_tls);

    /* Some ifaces need to be closed */
    for (tl_id = 0, iface_id = 0; tl_id < context->num_tls; ++tl_id) {
        wiface = worker->ifaces[tl_id];
        if (tl_bitmap & UCS_BIT(tl_id)) {
            if (iface_id != tl_id) {
                worker->ifaces[iface_id] = wiface;
            }
            ++iface_id;
        } else {
            ucs_debug("closing resource[%d] "UCT_TL_RESOURCE_DESC_FMT
                      ", since resource[%d] "UCT_TL_RESOURCE_DESC_FMT
                      " is better, worker %p",
                      tl_id, UCT_TL_RESOURCE_DESC_ARG(&context->tl_rscs[tl_id].tl_rsc),
                      repl_ifaces[tl_id],
                      UCT_TL_RESOURCE_DESC_ARG(&context->tl_rscs[repl_ifaces[tl_id]].tl_rsc),
                      worker);
            /* Ifaces are not initialized yet, so just close them
             * (no need for cleanup) */
            ucp_worker_uct_iface_close(wiface);
            ucs_free(wiface);
        }
    }
}

/**
 * @brief  Open all resources as interfaces on this worker
 *
 * This routine opens interfaces on the tl resources according to the
 * bitmap in the context. If the bitmap is not set, the routine opens
 * interfaces on all available resources and selects the best ones. It then
 * caches the obtained bitmap on the context, so that subsequent workers can
 * use it instead of constructing it themselves.
 *
 * @param [in]  worker     UCP worker.
 *
 * @return Error code as defined by @ref ucs_status_t
 */
static ucs_status_t ucp_worker_add_resource_ifaces(ucp_worker_h worker)
{
    ucp_context_h context = worker->context;
    ucp_tl_resource_desc_t *resource;
    uct_iface_params_t iface_params;
    ucp_rsc_index_t tl_id, iface_id;
    ucp_worker_iface_t *wiface;
    uint64_t ctx_tl_bitmap, tl_bitmap;
    unsigned num_ifaces;
    ucs_status_t status;

    /* If tl_bitmap is already set, just use it. Otherwise open ifaces on all
     * available resources and then select the best ones. */
    ctx_tl_bitmap  = context->tl_bitmap;
    if (ctx_tl_bitmap) {
        num_ifaces = ucs_popcount(ctx_tl_bitmap);
        tl_bitmap  = ctx_tl_bitmap;
    } else {
        num_ifaces = context->num_tls;
        tl_bitmap  = UCS_MASK(context->num_tls);
    }

    worker->ifaces = ucs_calloc(num_ifaces, sizeof(*worker->ifaces),
                                "ucp ifaces array");
    if (worker->ifaces == NULL) {
        ucs_error("failed to allocate worker ifaces");
        return UCS_ERR_NO_MEMORY;
    }

    worker->num_ifaces = num_ifaces;
    iface_id           = 0;

    ucs_for_each_bit(tl_id, tl_bitmap) {
        iface_params.field_mask = UCT_IFACE_PARAM_FIELD_OPEN_MODE;
        resource = &context->tl_rscs[tl_id];

        if (resource->flags & UCP_TL_RSC_FLAG_SOCKADDR) {
            iface_params.open_mode            = UCT_IFACE_OPEN_MODE_SOCKADDR_CLIENT;
        } else {
            iface_params.open_mode            = UCT_IFACE_OPEN_MODE_DEVICE;
            iface_params.field_mask          |= UCT_IFACE_PARAM_FIELD_DEVICE;
            iface_params.mode.device.tl_name  = resource->tl_rsc.tl_name;
            iface_params.mode.device.dev_name = resource->tl_rsc.dev_name;
        }

        status = ucp_worker_iface_open(worker, tl_id, &iface_params,
                                       &worker->ifaces[iface_id++]);
        if (status != UCS_OK) {
            return status;
        }
    }

    if (!ctx_tl_bitmap) {
        /* Context bitmap is not set, need to select the best tl resources */
        tl_bitmap = 0;
        ucp_worker_select_best_ifaces(worker, &tl_bitmap);
        ucs_assert(tl_bitmap);

        /* Cache tl_bitmap on the context, so that subsequent workers will not
         * need to select the best ifaces again. */
        context->tl_bitmap = tl_bitmap;
        ucs_debug("selected tl bitmap: 0x%lx (%d tls)",
                  tl_bitmap, ucs_popcount(tl_bitmap));
    }

    worker->scalable_tl_bitmap = 0;
    ucs_for_each_bit(tl_id, context->tl_bitmap) {
        wiface = ucp_worker_iface(worker, tl_id);

        if (ucp_is_scalable_transport(context, wiface->attr.max_num_eps)) {
            worker->scalable_tl_bitmap |= UCS_BIT(tl_id);
        }
    }

    ucs_debug("selected scalable tl bitmap: 0x%lx (%d tls)",
              worker->scalable_tl_bitmap,
              ucs_popcount(worker->scalable_tl_bitmap));

    iface_id = 0;
    ucs_for_each_bit(tl_id, tl_bitmap) {
        status = ucp_worker_iface_init(worker, tl_id,
                                       worker->ifaces[iface_id++]);
        if (status != UCS_OK) {
            return status;
        }
    }

    return UCS_OK;
}

static void ucp_worker_close_ifaces(ucp_worker_h worker)
{
    ucp_rsc_index_t iface_id;
    ucp_worker_iface_t *wiface;

    UCS_ASYNC_BLOCK(&worker->async);
    for (iface_id = 0; iface_id < worker->num_ifaces; ++iface_id) {
        wiface = worker->ifaces[iface_id];
        if (wiface != NULL) {
            ucp_worker_iface_cleanup(wiface);
        }
    }
    ucs_free(worker->ifaces);
    UCS_ASYNC_UNBLOCK(&worker->async);
}

ucs_status_t ucp_worker_iface_open(ucp_worker_h worker, ucp_rsc_index_t tl_id,
                                   uct_iface_params_t *iface_params,
                                   ucp_worker_iface_t **wiface_p)
{
    ucp_context_h context            = worker->context;
    ucp_tl_resource_desc_t *resource = &context->tl_rscs[tl_id];
    uct_md_h md                      = context->tl_mds[resource->md_index].md;
    uct_iface_config_t *iface_config;
    const char *cfg_tl_name;
    ucp_worker_iface_t *wiface;
    ucs_status_t status;

    wiface = ucs_calloc(1, sizeof(*wiface), "ucp_iface");
    if (wiface == NULL) {
        return UCS_ERR_NO_MEMORY;
    }

    wiface->rsc_index        = tl_id;
    wiface->worker           = worker;
    wiface->event_fd         = -1;
    wiface->activate_count   = 0;
    wiface->check_events_id  = UCS_CALLBACKQ_ID_NULL;
    wiface->proxy_recv_count = 0;
    wiface->post_count       = 0;
    wiface->flags            = 0;

    /* Read interface or md configuration */
    if (resource->flags & UCP_TL_RSC_FLAG_SOCKADDR) {
        cfg_tl_name = NULL;
    } else {
        cfg_tl_name = resource->tl_rsc.tl_name;
    }
    status = uct_md_iface_config_read(md, cfg_tl_name, NULL, NULL, &iface_config);
    if (status != UCS_OK) {
        goto err_free_iface;
    }

    UCS_STATIC_ASSERT(UCP_WORKER_HEADROOM_PRIV_SIZE >= sizeof(ucp_eager_sync_hdr_t));

    /* Fill the rest of the uct_iface params (the caller fills the
     * mode-specific fields) */
    iface_params->field_mask       |= UCT_IFACE_PARAM_FIELD_STATS_ROOT        |
                                      UCT_IFACE_PARAM_FIELD_RX_HEADROOM       |
                                      UCT_IFACE_PARAM_FIELD_ERR_HANDLER_ARG   |
                                      UCT_IFACE_PARAM_FIELD_ERR_HANDLER       |
                                      UCT_IFACE_PARAM_FIELD_ERR_HANDLER_FLAGS |
                                      UCT_IFACE_PARAM_FIELD_HW_TM_EAGER_ARG   |
                                      UCT_IFACE_PARAM_FIELD_HW_TM_RNDV_ARG    |
                                      UCT_IFACE_PARAM_FIELD_HW_TM_RNDV_CB     |
                                      UCT_IFACE_PARAM_FIELD_HW_TM_EAGER_CB    |
                                      UCT_IFACE_PARAM_FIELD_CPU_MASK;
    iface_params->stats_root        = UCS_STATS_RVAL(worker->stats);
    iface_params->rx_headroom       = UCP_WORKER_HEADROOM_SIZE;
    iface_params->err_handler_arg   = worker;
    iface_params->err_handler       = ucp_worker_iface_error_handler;
    iface_params->err_handler_flags = UCT_CB_FLAG_ASYNC;
    iface_params->eager_arg         = iface_params->rndv_arg = wiface;
    iface_params->eager_cb          = ucp_tag_offload_unexp_eager;
    iface_params->rndv_cb           = ucp_tag_offload_unexp_rndv;
    iface_params->cpu_mask          = worker->cpu_mask;

    /* Open UCT interface */
    status = uct_iface_open(md, worker->uct, iface_params, iface_config,
                            &wiface->iface);
    uct_config_release(iface_config);

    if (status != UCS_OK) {
        goto err_free_iface;
    }

    VALGRIND_MAKE_MEM_UNDEFINED(&wiface->attr, sizeof(wiface->attr));

    status = uct_iface_query(wiface->iface, &wiface->attr);
    if (status != UCS_OK) {
        goto err_close_iface;
    }

    ucs_debug("created interface[%d]=%p using "UCT_TL_RESOURCE_DESC_FMT" on worker %p",
              tl_id, wiface->iface, UCT_TL_RESOURCE_DESC_ARG(&resource->tl_rsc),
              worker);

    *wiface_p = wiface;

    return UCS_OK;

err_close_iface:
    uct_iface_close(wiface->iface);
err_free_iface:
    ucs_free(wiface);
    return status;
}

ucs_status_t ucp_worker_iface_init(ucp_worker_h worker, ucp_rsc_index_t tl_id,
                                   ucp_worker_iface_t *wiface)
{
    ucp_context_h context            = worker->context;
    ucp_tl_resource_desc_t *resource = &context->tl_rscs[tl_id];
    ucs_status_t status;

    ucs_assert(wiface != NULL);

    /* Set wake-up handlers */
    if (wiface->attr.cap.flags & UCP_WORKER_UCT_ALL_EVENT_CAP_FLAGS) {
        status = uct_iface_event_fd_get(wiface->iface, &wiface->event_fd);
        if (status != UCS_OK) {
            goto out_close_iface;
        }

        /* Register the event handler without actual events, so we can modify
         * it later. */
        status = ucs_async_set_event_handler(worker->async.mode, wiface->event_fd,
                                             0, ucp_worker_iface_event, wiface,
                                             &worker->async);
        if (status != UCS_OK) {
            ucs_fatal("failed to register event handler: %s",
                      ucs_status_string(status));
        }
    }

    /* Set active message handlers */
    if ((wiface->attr.cap.flags & (UCT_IFACE_FLAG_AM_SHORT|
                                   UCT_IFACE_FLAG_AM_BCOPY|
                                   UCT_IFACE_FLAG_AM_ZCOPY)))
    {
        status = uct_iface_set_am_tracer(wiface->iface, ucp_worker_am_tracer,
                                         worker);
        if (status != UCS_OK) {
            goto out_close_iface;
        }

        if (context->config.ext.adaptive_progress &&
            (wiface->attr.cap.flags & UCP_WORKER_UCT_RECV_EVENT_CAP_FLAGS))
        {
            ucp_worker_iface_deactivate(wiface, 1);
        } else {
            ucp_worker_iface_activate(wiface, 0);
        }
    }

    context->mem_type_access_tls[context->tl_mds[resource->md_index].
                                 attr.cap.access_mem_type] |= UCS_BIT(tl_id);

    return UCS_OK;

out_close_iface:
    ucp_worker_uct_iface_close(wiface);
    return status;
}

void ucp_worker_iface_cleanup(ucp_worker_iface_t *wiface)
{
    ucs_status_t status;

    uct_worker_progress_unregister_safe(wiface->worker->uct,
                                        &wiface->check_events_id);

    ucp_worker_iface_disarm(wiface);

    if ((wiface->event_fd != -1) &&
        (wiface->attr.cap.flags & UCP_WORKER_UCT_ALL_EVENT_CAP_FLAGS)) {
        status = ucs_async_remove_handler(wiface->event_fd, 1);
        if (status != UCS_OK) {
            ucs_warn("failed to remove event handler for fd %d: %s",
                     wiface->event_fd, ucs_status_string(status));
        }
    }

    ucp_worker_uct_iface_close(wiface);
    ucs_free(wiface);
}

static void ucp_worker_close_cms(ucp_worker_h worker)
{
    const ucp_rsc_index_t num_cms = ucp_worker_num_cm_cmpts(worker);
    ucp_rsc_index_t i;

    for (i = 0; (i < num_cms) && (worker->cms[i].cm != NULL); ++i) {
        uct_cm_close(worker->cms[i].cm);
    }

    ucs_free(worker->cms);
    worker->cms = NULL;
}

static ucs_status_t ucp_worker_add_resource_cms(ucp_worker_h worker)
{
    ucp_context_h   context = worker->context;
    uct_cm_config_t *cm_config;
    uct_component_h cmpt;
    ucp_rsc_index_t cmpt_index, cm_cmpt_index, i;
    ucs_status_t    status;

    if (!ucp_worker_sockaddr_is_cm_proto(worker)) {
        worker->cms = NULL;
        return UCS_OK;
    }

    UCS_ASYNC_BLOCK(&worker->async);

    worker->cms = ucs_calloc(ucp_worker_num_cm_cmpts(worker),
                             sizeof(*worker->cms), "ucp cms");
    if (worker->cms == NULL) {
        ucs_error("can't allocate CMs array");
        status = UCS_ERR_NO_MEMORY;
        goto out;
    }

    for (i = 0, cm_cmpt_index = 0; cm_cmpt_index < context->config.num_cm_cmpts;
         ++cm_cmpt_index) {
        cmpt_index = context->config.cm_cmpt_idxs[cm_cmpt_index];
        cmpt       = context->tl_cmpts[cmpt_index].cmpt;

        status = uct_cm_config_read(cmpt, NULL, NULL, &cm_config);
        if (status != UCS_OK) {
            ucs_error("failed to read cm configuration on component %s",
                      context->tl_cmpts[cmpt_index].attr.name);
            goto err_free_cms;
        }

        status = uct_cm_open(cmpt, worker->uct, cm_config, &worker->cms[i].cm);
        if (status != UCS_OK) {
            ucs_error("failed to open CM on component %s with status %s",
                      context->tl_cmpts[cmpt_index].attr.name,
                      ucs_status_string(status));
            goto err_free_cms;
        }

        uct_config_release(cm_config);
        worker->cms[i++].cmpt_idx = cmpt_index;
    }

    status = UCS_OK;
    goto out;

err_free_cms:
    ucp_worker_close_cms(worker);
out:
    UCS_ASYNC_UNBLOCK(&worker->async);
    return status;
}

static void ucp_worker_enable_atomic_tl(ucp_worker_h worker, const char *mode,
                                        ucp_rsc_index_t rsc_index)
{
    ucs_assert(rsc_index != UCP_NULL_RESOURCE);
    ucs_trace("worker %p: using %s atomics on iface[%d]=" UCT_TL_RESOURCE_DESC_FMT,
              worker, mode, rsc_index,
              UCT_TL_RESOURCE_DESC_ARG(&worker->context->tl_rscs[rsc_index].tl_rsc));
    worker->atomic_tls |= UCS_BIT(rsc_index);
}

static void ucp_worker_init_cpu_atomics(ucp_worker_h worker)
{
    ucp_rsc_index_t iface_id;
    ucp_worker_iface_t *wiface;

    ucs_debug("worker %p: using cpu atomics", worker);

    /* Enable all interfaces which have host-based atomics */
    for (iface_id = 0; iface_id < worker->num_ifaces; ++iface_id) {
        wiface = worker->ifaces[iface_id];
        if (wiface->attr.cap.flags & UCT_IFACE_FLAG_ATOMIC_CPU) {
            ucp_worker_enable_atomic_tl(worker, "cpu", wiface->rsc_index);
        }
    }
}

static void ucp_worker_init_device_atomics(ucp_worker_h worker)
{
    ucp_context_h context = worker->context;
    ucp_address_iface_attr_t dummy_iface_attr;
    ucp_tl_resource_desc_t *rsc, *best_rsc;
    uct_iface_attr_t *iface_attr;
    ucp_rsc_index_t rsc_index;
    ucp_rsc_index_t iface_id;
    uint64_t iface_cap_flags;
    double score, best_score;
    ucp_rsc_index_t md_index;
    ucp_worker_iface_t *wiface;
    uct_md_attr_t *md_attr;
    uint64_t supp_tls;
    uint8_t priority, best_priority;
    ucp_tl_iface_atomic_flags_t atomic;

    ucp_context_uct_atomic_iface_flags(context, &atomic);

    iface_cap_flags                      = UCT_IFACE_FLAG_ATOMIC_DEVICE;

    dummy_iface_attr.bandwidth.dedicated = 1e12;
    dummy_iface_attr.bandwidth.shared    = 0;
    dummy_iface_attr.cap_flags           = UINT64_MAX;
    dummy_iface_attr.overhead            = 0;
    dummy_iface_attr.priority            = 0;
    dummy_iface_attr.lat_ovh             = 0;

    supp_tls                             = 0;
    best_score                           = -1;
    best_rsc                             = NULL;
    best_priority                        = 0;

    /* Select the best interface for device atomics */
    for (iface_id = 0; iface_id < worker->num_ifaces; ++iface_id) {
        wiface     = worker->ifaces[iface_id];
        rsc_index  = wiface->rsc_index;
        rsc        = &context->tl_rscs[rsc_index];
        md_index   = rsc->md_index;
        md_attr    = &context->tl_mds[md_index].attr;
        iface_attr = &wiface->attr;

        if (!(md_attr->cap.flags & UCT_MD_FLAG_REG) ||
            !ucs_test_all_flags(iface_attr->cap.flags, iface_cap_flags)                        ||
            !ucs_test_all_flags(iface_attr->cap.atomic32.op_flags, atomic.atomic32.op_flags)   ||
            !ucs_test_all_flags(iface_attr->cap.atomic32.fop_flags, atomic.atomic32.fop_flags) ||
            !ucs_test_all_flags(iface_attr->cap.atomic64.op_flags, atomic.atomic64.op_flags)   ||
            !ucs_test_all_flags(iface_attr->cap.atomic64.fop_flags, atomic.atomic64.fop_flags))
        {
            continue;
        }

        supp_tls |= UCS_BIT(rsc_index);
        priority  = iface_attr->priority;

        score = ucp_wireup_amo_score_func(context, md_attr, iface_attr,
                                          &dummy_iface_attr);
        if (ucp_is_scalable_transport(worker->context,
                                      iface_attr->max_num_eps) &&
            ((score > best_score) ||
             ((score == best_score) && (priority > best_priority))))
        {
            best_rsc      = rsc;
            best_score    = score;
            best_priority = priority;
        }
    }

    if (best_rsc == NULL) {
        ucs_debug("worker %p: no support for atomics", worker);
        return;
    }

    ucs_debug("worker %p: using device atomics", worker);

    /* Enable atomics on all resources that use the same device as the "best"
     * resource */
    ucs_for_each_bit(rsc_index, context->tl_bitmap) {
        rsc = &context->tl_rscs[rsc_index];
        if ((supp_tls & UCS_BIT(rsc_index)) &&
            (rsc->md_index == best_rsc->md_index) &&
            !strncmp(rsc->tl_rsc.dev_name, best_rsc->tl_rsc.dev_name,
                     UCT_DEVICE_NAME_MAX))
        {
            ucp_worker_enable_atomic_tl(worker, "device", rsc_index);
        }
    }
}

static void ucp_worker_init_guess_atomics(ucp_worker_h worker)
{
    uint64_t accumulated_flags = 0;
    ucp_rsc_index_t iface_id;

    for (iface_id = 0; iface_id < worker->num_ifaces; ++iface_id) {
        if (ucp_is_scalable_transport(worker->context,
                                      worker->ifaces[iface_id]->attr.max_num_eps)) {
            accumulated_flags |= worker->ifaces[iface_id]->attr.cap.flags;
        }
    }

    if (accumulated_flags & UCT_IFACE_FLAG_ATOMIC_DEVICE) {
        ucp_worker_init_device_atomics(worker);
    } else {
        ucp_worker_init_cpu_atomics(worker);
    }
}

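/* Choose the transports on which UCP executes atomic operations, according
 * to the configured atomic mode (set via the UCX_ATOMIC_MODE configuration
 * variable, assuming default variable naming): "cpu" uses host-based
 * atomics, "device" offloads to HW-capable transports, and "guess" picks
 * device atomics only if a scalable transport advertises them, falling back
 * to cpu atomics otherwise. */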
static void ucp_worker_init_atomic_tls(ucp_worker_h worker)
{
    ucp_context_h context = worker->context;

    worker->atomic_tls = 0;

    if (context->config.features & UCP_FEATURE_AMO) {
        switch(context->config.ext.atomic_mode) {
        case UCP_ATOMIC_MODE_CPU:
            ucp_worker_init_cpu_atomics(worker);
            break;
        case UCP_ATOMIC_MODE_DEVICE:
            ucp_worker_init_device_atomics(worker);
            break;
        case UCP_ATOMIC_MODE_GUESS:
            ucp_worker_init_guess_atomics(worker);
            break;
        default:
            ucs_fatal("unsupported atomic mode: %d",
                      context->config.ext.atomic_mode);
        }
    }
}

static char* ucp_worker_add_feature_rsc(ucp_context_h context,
                                        const ucp_ep_config_key_t *key,
                                        ucp_lane_map_t lanes_bitmap,
                                        const char *feature_str,
                                        char *buf, size_t max)
{
    char *p    = buf;
    char *endp = buf + max;
    int   sep  = 0;
    ucp_rsc_index_t rsc_idx;
    ucp_lane_index_t lane;

    if (!lanes_bitmap) {
        return p;
    }

    snprintf(p, endp - p, "%s(", feature_str);
    p += strlen(p);

    ucs_for_each_bit(lane, lanes_bitmap) {
        ucs_assert(lane < UCP_MAX_LANES); /* make coverity happy */
        rsc_idx = key->lanes[lane].rsc_index;
        snprintf(p, endp - p, "%*s"UCT_TL_RESOURCE_DESC_FMT, sep, "",
                 UCT_TL_RESOURCE_DESC_ARG(&context->tl_rscs[rsc_idx].tl_rsc));
        p  += strlen(p);
        sep = 1; /* add space between tl names */
    }

    snprintf(p, endp - p, "); ");
    p += strlen(p);

    return p;
}

static void ucp_worker_print_used_tls(const ucp_ep_config_key_t *key,
                                      ucp_context_h context,
                                      ucp_ep_cfg_index_t config_idx)
{
    char info[256]                  = {0};
    ucp_lane_map_t tag_lanes_map    = 0;
    ucp_lane_map_t rma_lanes_map    = 0;
    ucp_lane_map_t amo_lanes_map    = 0;
    ucp_lane_map_t stream_lanes_map = 0;
    ucp_lane_index_t lane;
    char *p, *endp;

    if (!ucs_log_is_enabled(UCS_LOG_LEVEL_INFO)) {
        return;
    }

    p    = info;
    endp = p + sizeof(info);

    snprintf(p, endp - p,  "ep_cfg[%d]: ", config_idx);
    p += strlen(p);

    for (lane = 0; lane < key->num_lanes; ++lane) {
        if (((key->am_lane == lane) || (lane == key->tag_lane) ||
            (ucp_ep_config_get_multi_lane_prio(key->am_bw_lanes, lane) >= 0)  ||
            (ucp_ep_config_get_multi_lane_prio(key->rma_bw_lanes, lane) >= 0)) &&
            (context->config.features & UCP_FEATURE_TAG)) {
            tag_lanes_map |= UCS_BIT(lane);
        }

        if ((key->am_lane == lane) &&
            (context->config.features & UCP_FEATURE_STREAM)) {
            stream_lanes_map |= UCS_BIT(lane);
        }

        if ((ucp_ep_config_get_multi_lane_prio(key->rma_lanes, lane) >= 0)) {
            rma_lanes_map |= UCS_BIT(lane);
        }

        if ((ucp_ep_config_get_multi_lane_prio(key->amo_lanes, lane) >= 0)) {
            amo_lanes_map |= UCS_BIT(lane);
        }
    }

    p = ucp_worker_add_feature_rsc(context, key, tag_lanes_map, "tag",
                                   p, endp - p);
    p = ucp_worker_add_feature_rsc(context, key, rma_lanes_map, "rma",
                                   p, endp - p);
    p = ucp_worker_add_feature_rsc(context, key, amo_lanes_map, "amo",
                                   p, endp - p);
    ucp_worker_add_feature_rsc(context, key, stream_lanes_map, "stream",
                               p, endp - p);
    ucs_info("%s", info);
}

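/* Create the worker's memory pools: am_mp holds bounce buffers for received
 * active messages (sized for the largest AM segment plus headroom), reg_mp
 * holds pre-registered buffers of seg_size for sends, and rndv_frag_mp holds
 * the fragments used by the pipelined rendezvous protocol. */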
static ucs_status_t ucp_worker_init_mpools(ucp_worker_h worker)
{
    size_t           max_mp_entry_size = 0;
    ucp_context_t    *context          = worker->context;
    uct_iface_attr_t *if_attr;
    ucp_rsc_index_t  iface_id;
    ucs_status_t     status;

    for (iface_id = 0; iface_id < worker->num_ifaces; ++iface_id) {
        if_attr           = &worker->ifaces[iface_id]->attr;
        max_mp_entry_size = ucs_max(max_mp_entry_size,
                                    if_attr->cap.am.max_short);
        max_mp_entry_size = ucs_max(max_mp_entry_size,
                                    if_attr->cap.am.max_bcopy);
        max_mp_entry_size = ucs_max(max_mp_entry_size,
                                    if_attr->cap.am.max_zcopy);
    }

    status = ucs_mpool_init(&worker->am_mp, 0,
                            max_mp_entry_size + UCP_WORKER_HEADROOM_SIZE,
                            0, UCS_SYS_CACHE_LINE_SIZE, 128, UINT_MAX,
                            &ucp_am_mpool_ops, "ucp_am_bufs");
    if (status != UCS_OK) {
        goto out;
    }

    status = ucs_mpool_init(&worker->reg_mp, 0,
                            context->config.ext.seg_size + sizeof(ucp_mem_desc_t),
                            sizeof(ucp_mem_desc_t), UCS_SYS_CACHE_LINE_SIZE,
                            128, UINT_MAX, &ucp_reg_mpool_ops, "ucp_reg_bufs");
    if (status != UCS_OK) {
        goto err_release_am_mpool;
    }

    status = ucs_mpool_init(&worker->rndv_frag_mp, 0,
                            context->config.ext.rndv_frag_size + sizeof(ucp_mem_desc_t),
                            sizeof(ucp_mem_desc_t), UCS_SYS_CACHE_LINE_SIZE, 128,
                            UINT_MAX, &ucp_frag_mpool_ops, "ucp_rndv_frags");
    if (status != UCS_OK) {
        goto err_release_reg_mpool;
    }

    return UCS_OK;

err_release_reg_mpool:
    ucs_mpool_cleanup(&worker->reg_mp, 0);
err_release_am_mpool:
    ucs_mpool_cleanup(&worker->am_mp, 0);
out:
    return status;
}

/* All ucp endpoints share the configurations. There is no need for every ep
 * to have its own configuration (this saves memory footprint). The same
 * config can be used by different eps.
 * A 'key' identifies an entry in the ep_config array. An entry holds the key
 * and additional configuration parameters and thresholds.
 */
ucs_status_t ucp_worker_get_ep_config(ucp_worker_h worker,
                                      const ucp_ep_config_key_t *key,
                                      int print_cfg,
                                      ucp_ep_cfg_index_t *config_idx_p)
{
    ucp_ep_cfg_index_t config_idx;
    ucs_status_t status;

    /* Search for the given key in the ep_config array */
    for (config_idx = 0; config_idx < worker->ep_config_count; ++config_idx) {
        if (ucp_ep_config_is_equal(&worker->ep_config[config_idx].key, key)) {
            goto out;
        }
    }

    if (worker->ep_config_count >= worker->ep_config_max) {
        /* TODO support larger number of configurations */
        ucs_fatal("too many ep configurations: %d", worker->ep_config_count);
    }

    /* Create new configuration */
    config_idx = worker->ep_config_count++;
    status = ucp_ep_config_init(worker, &worker->ep_config[config_idx], key);
    if (status != UCS_OK) {
        return status;
    }

    if (print_cfg) {
        ucp_worker_print_used_tls(key, worker->context, config_idx);
    }

out:
    *config_idx_p = config_idx;
    return UCS_OK;
}
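
/*
 * Illustrative usage sketch for the lookup above (not part of this file;
 * assumes a fully populated ucp_ep_config_key_t named "key"):
 *
 *     ucp_ep_cfg_index_t cfg_index;
 *     ucs_status_t       status;
 *
 *     status = ucp_worker_get_ep_config(worker, &key, 1, &cfg_index);
 *     if (status == UCS_OK) {
 *         ep->cfg_index = cfg_index;  // endpoints store the index, not a copy
 *     }
 */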

static ucs_mpool_ops_t ucp_rkey_mpool_ops = {
    .chunk_alloc   = ucs_mpool_chunk_malloc,
    .chunk_release = ucs_mpool_chunk_free,
    .obj_init      = NULL,
    .obj_cleanup   = NULL
};

ucs_status_t ucp_worker_create(ucp_context_h context,
                               const ucp_worker_params_t *params,
                               ucp_worker_h *worker_p)
{
    ucs_thread_mode_t uct_thread_mode;
    unsigned config_count;
    unsigned name_length;
    ucp_worker_h worker;
    ucs_status_t status;

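    /* Heuristic upper bound on the number of distinct endpoint
     * configurations this worker can hold: (num_tls + 1)^2 * num_tls lane
     * combinations, capped at UINT8_MAX since a config is referenced by a
     * one-byte index (ucp_ep_cfg_index_t). */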
    config_count = ucs_min((context->num_tls + 1) * (context->num_tls + 1) * context->num_tls,
                           UINT8_MAX);

    worker = ucs_calloc(1, sizeof(*worker) +
                           sizeof(*worker->ep_config) * config_count,
                        "ucp worker");
    if (worker == NULL) {
        return UCS_ERR_NO_MEMORY;
    }

    uct_thread_mode = UCS_THREAD_MODE_SINGLE;
    worker->flags   = 0;

    if (params->field_mask & UCP_WORKER_PARAM_FIELD_THREAD_MODE) {
#if ENABLE_MT
        if (params->thread_mode != UCS_THREAD_MODE_SINGLE) {
            /* UCT is serialized by UCP lock or by UCP user */
            uct_thread_mode = UCS_THREAD_MODE_SERIALIZED;
        }

        if (params->thread_mode == UCS_THREAD_MODE_MULTI) {
            worker->flags |= UCP_WORKER_FLAG_MT;
        }
#else
        if (params->thread_mode != UCS_THREAD_MODE_SINGLE) {
            ucs_debug("forced single thread mode on worker create");
        }
#endif
    }

    worker->context           = context;
    worker->uuid              = ucs_generate_uuid((uintptr_t)worker);
    worker->flush_ops_count   = 0;
    worker->inprogress        = 0;
    worker->ep_config_max     = config_count;
    worker->ep_config_count   = 0;
    worker->num_active_ifaces = 0;
    worker->num_ifaces        = 0;
    worker->am_message_id     = ucs_generate_uuid(0);
    ucs_list_head_init(&worker->arm_ifaces);
    ucs_list_head_init(&worker->stream_ready_eps);
    ucs_list_head_init(&worker->all_eps);
    ucp_ep_match_init(&worker->ep_match_ctx);

    UCS_STATIC_ASSERT(sizeof(ucp_ep_ext_gen_t) <= sizeof(ucp_ep_t));
    if (context->config.features & (UCP_FEATURE_STREAM | UCP_FEATURE_AM)) {
        UCS_STATIC_ASSERT(sizeof(ucp_ep_ext_proto_t) <= sizeof(ucp_ep_t));
        ucs_strided_alloc_init(&worker->ep_alloc, sizeof(ucp_ep_t), 3);
    } else {
        ucs_strided_alloc_init(&worker->ep_alloc, sizeof(ucp_ep_t), 2);
    }

    if (params->field_mask & UCP_WORKER_PARAM_FIELD_USER_DATA) {
        worker->user_data = params->user_data;
    } else {
        worker->user_data = NULL;
    }

    if (context->config.features & UCP_FEATURE_AM) {
        worker->am_cbs            = NULL;
        worker->am_cb_array_len   = 0;
    }
    name_length = ucs_min(UCP_WORKER_NAME_MAX,
                          context->config.ext.max_worker_name + 1);
    ucs_snprintf_zero(worker->name, name_length, "%s:%d", ucs_get_host_name(),
                      getpid());

    /* Create statistics */
    status = UCS_STATS_NODE_ALLOC(&worker->stats, &ucp_worker_stats_class,
                                  ucs_stats_get_root(), "-%p", worker);
    if (status != UCS_OK) {
        goto err_free;
    }

    status = UCS_STATS_NODE_ALLOC(&worker->tm_offload_stats,
                                  &ucp_worker_tm_offload_stats_class,
                                  worker->stats);
    if (status != UCS_OK) {
        goto err_free_stats;
    }

    status = ucs_async_context_init(&worker->async,
                                    context->config.ext.use_mt_mutex ?
                                    UCS_ASYNC_MODE_THREAD_MUTEX :
                                    UCS_ASYNC_THREAD_LOCK_TYPE);
    if (status != UCS_OK) {
        goto err_free_tm_offload_stats;
    }

    /* Create the underlying UCT worker */
    status = uct_worker_create(&worker->async, uct_thread_mode, &worker->uct);
    if (status != UCS_OK) {
        goto err_destroy_async;
    }

    /* Create memory pool for requests */
    status = ucs_mpool_init(&worker->req_mp, 0,
                            sizeof(ucp_request_t) + context->config.request.size,
                            0, UCS_SYS_CACHE_LINE_SIZE, 128, UINT_MAX,
                            &ucp_request_mpool_ops, "ucp_requests");
    if (status != UCS_OK) {
        goto err_destroy_uct_worker;
    }

    /* Create memory pool for small rkeys */
    status = ucs_mpool_init(&worker->rkey_mp, 0,
                            sizeof(ucp_rkey_t) +
                            sizeof(ucp_tl_rkey_t) * UCP_RKEY_MPOOL_MAX_MD,
                            0, UCS_SYS_CACHE_LINE_SIZE, 128, UINT_MAX,
                            &ucp_rkey_mpool_ops, "ucp_rkeys");
    if (status != UCS_OK) {
        goto err_req_mp_cleanup;
    }

    /* Create UCS event set which combines events from all transports */
    status = ucp_worker_wakeup_init(worker, params);
    if (status != UCS_OK) {
        goto err_rkey_mp_cleanup;
    }

    if (params->field_mask & UCP_WORKER_PARAM_FIELD_CPU_MASK) {
        worker->cpu_mask = params->cpu_mask;
    } else {
        UCS_CPU_ZERO(&worker->cpu_mask);
    }

    /* Initialize tag matching */
    status = ucp_tag_match_init(&worker->tm);
    if (status != UCS_OK) {
        goto err_wakeup_cleanup;
    }

    /* Open all resources as interfaces on this worker */
    status = ucp_worker_add_resource_ifaces(worker);
    if (status != UCS_OK) {
        goto err_close_ifaces;
    }

    /* Open all resources as connection managers on this worker */
    status = ucp_worker_add_resource_cms(worker);
    if (status != UCS_OK) {
        goto err_close_cms;
    }

    /* Create memory type endpoints */
    status = ucp_worker_create_mem_type_endpoints(worker);
    if (status != UCS_OK) {
        goto err_close_cms;
    }

    /* Init AM and registered memory pools */
    status = ucp_worker_init_mpools(worker);
    if (status != UCS_OK) {
        goto err_close_cms;
    }

    /* Select atomic resources */
    ucp_worker_init_atomic_tls(worker);

    /* At this point all UCT memory domains and interfaces are already created
     * so warn about unused environment variables.
     */
    ucs_config_parser_warn_unused_env_vars_once();

    *worker_p = worker;
    return UCS_OK;

err_close_cms:
    ucp_worker_close_cms(worker);
err_close_ifaces:
    ucp_worker_close_ifaces(worker);
    ucp_tag_match_cleanup(&worker->tm);
err_wakeup_cleanup:
    ucp_worker_wakeup_cleanup(worker);
err_rkey_mp_cleanup:
    ucs_mpool_cleanup(&worker->rkey_mp, 1);
err_req_mp_cleanup:
    ucs_mpool_cleanup(&worker->req_mp, 1);
err_destroy_uct_worker:
    uct_worker_destroy(worker->uct);
err_destroy_async:
    ucs_async_context_cleanup(&worker->async);
err_free_tm_offload_stats:
    UCS_STATS_NODE_FREE(worker->tm_offload_stats);
err_free_stats:
    UCS_STATS_NODE_FREE(worker->stats);
err_free:
    ucs_strided_alloc_cleanup(&worker->ep_alloc);
    ucs_free(worker);
    return status;
}
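
/*
 * Illustrative caller-side sketch for the public entry point above (not part
 * of this file):
 *
 *     ucp_worker_params_t params = {0};
 *     ucp_worker_h        worker;
 *     ucs_status_t        status;
 *
 *     params.field_mask  = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
 *     params.thread_mode = UCS_THREAD_MODE_SINGLE;
 *
 *     status = ucp_worker_create(context, &params, &worker);
 *     if (status == UCS_OK) {
 *         ...                          // use the worker
 *         ucp_worker_destroy(worker);
 *     }
 */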

static void ucp_worker_destroy_eps(ucp_worker_h worker)
{
    ucp_ep_ext_gen_t *ep_ext, *tmp;

    ucs_debug("worker %p: destroy all endpoints", worker);
    ucs_list_for_each_safe(ep_ext, tmp, &worker->all_eps, ep_list) {
        ucp_ep_disconnected(ucp_ep_from_ext_gen(ep_ext), 1);
    }
}

static void ucp_worker_destroy_ep_configs(ucp_worker_h worker)
{
    unsigned i;

    for (i = 0; i < worker->ep_config_count; ++i) {
        ucp_ep_config_cleanup(worker, &worker->ep_config[i]);
    }

    worker->ep_config_count = 0;
}

void ucp_worker_destroy(ucp_worker_h worker)
{
    ucs_trace_func("worker=%p", worker);

    UCS_ASYNC_BLOCK(&worker->async);
    ucs_free(worker->am_cbs);
    ucp_worker_destroy_eps(worker);
    ucp_worker_remove_am_handlers(worker);
    ucp_worker_close_cms(worker);
    UCS_ASYNC_UNBLOCK(&worker->async);

    ucp_worker_destroy_ep_configs(worker);
    ucs_mpool_cleanup(&worker->am_mp, 1);
    ucs_mpool_cleanup(&worker->reg_mp, 1);
    ucs_mpool_cleanup(&worker->rndv_frag_mp, 1);
    ucp_worker_close_ifaces(worker);
    ucp_tag_match_cleanup(&worker->tm);
    ucp_worker_wakeup_cleanup(worker);
    ucs_mpool_cleanup(&worker->rkey_mp, 1);
    ucs_mpool_cleanup(&worker->req_mp, 1);
    uct_worker_destroy(worker->uct);
    ucs_async_context_cleanup(&worker->async);
    ucp_ep_match_cleanup(&worker->ep_match_ctx);
    ucs_strided_alloc_cleanup(&worker->ep_alloc);
    UCS_STATS_NODE_FREE(worker->tm_offload_stats);
    UCS_STATS_NODE_FREE(worker->stats);
    ucs_free(worker);
}

ucs_status_t ucp_worker_query(ucp_worker_h worker,
                              ucp_worker_attr_t *attr)
{
    ucp_context_h context = worker->context;
    ucs_status_t status   = UCS_OK;
    uint64_t tl_bitmap;
    ucp_rsc_index_t tl_id;

    if (attr->field_mask & UCP_WORKER_ATTR_FIELD_THREAD_MODE) {
        if (worker->flags & UCP_WORKER_FLAG_MT) {
            attr->thread_mode = UCS_THREAD_MODE_MULTI;
        } else {
            attr->thread_mode = UCS_THREAD_MODE_SINGLE;
        }
    }

    if (attr->field_mask & UCP_WORKER_ATTR_FIELD_ADDRESS) {
        /* If UCP_WORKER_ATTR_FIELD_ADDRESS_FLAGS is not set,
         * pack all tl addresses */
        tl_bitmap = UINT64_MAX;

        if (attr->field_mask & UCP_WORKER_ATTR_FIELD_ADDRESS_FLAGS) {
            if (attr->address_flags & UCP_WORKER_ADDRESS_FLAG_NET_ONLY) {
                tl_bitmap = 0;
                ucs_for_each_bit(tl_id, context->tl_bitmap) {
                    if (context->tl_rscs[tl_id].tl_rsc.dev_type == UCT_DEVICE_TYPE_NET) {
                        tl_bitmap |= UCS_BIT(tl_id);
                    }
                }
            }
        }

        status = ucp_address_pack(worker, NULL, tl_bitmap,
                                  UCP_ADDRESS_PACK_FLAG_ALL, NULL,
                                  &attr->address_length,
                                  (void**)&attr->address);
    }

    return status;
}

unsigned ucp_worker_progress(ucp_worker_h worker)
{
    unsigned count;

    /* worker->inprogress is used only for assertion check.
     * coverity[assert_side_effect]
     */
    UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker);

    /* check that ucp_worker_progress is not called from within ucp_worker_progress */
    ucs_assert(worker->inprogress++ == 0);
    count = uct_worker_progress(worker->uct);
    ucs_async_check_miss(&worker->async);

    /* coverity[assert_side_effect] */
    ucs_assert(--worker->inprogress == 0);

    UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker);

    return count;
}
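
/*
 * Illustrative sketch of the typical caller-side progress loop (not part of
 * this file; request_is_completed() is a hypothetical completion check):
 *
 *     while (!request_is_completed(req)) {
 *         ucp_worker_progress(worker);
 *     }
 */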

ssize_t ucp_stream_worker_poll(ucp_worker_h worker,
                               ucp_stream_poll_ep_t *poll_eps,
                               size_t max_eps, unsigned flags)
{
    ssize_t            count = 0;
    ucp_ep_ext_proto_t *ep_ext;
    ucp_ep_h           ep;

    UCP_CONTEXT_CHECK_FEATURE_FLAGS(worker->context, UCP_FEATURE_STREAM,
                                    return UCS_ERR_INVALID_PARAM);

    UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker);

    while (((size_t)count < max_eps) &&
           !ucs_list_is_empty(&worker->stream_ready_eps)) {
        ep_ext                    = ucp_stream_worker_dequeue_ep_head(worker);
        ep                        = ucp_ep_from_ext_proto(ep_ext);
        poll_eps[count].ep        = ep;
        poll_eps[count].user_data = ucp_ep_ext_gen(ep)->user_data;
        ++count;
    }

    UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker);

    return count;
}
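
/*
 * Illustrative usage sketch for the poll call above (not part of this file;
 * MAX_EPS and process_stream_ep() are hypothetical):
 *
 *     ucp_stream_poll_ep_t poll_eps[MAX_EPS];
 *     ssize_t              i, n;
 *
 *     n = ucp_stream_worker_poll(worker, poll_eps, MAX_EPS, 0);
 *     for (i = 0; i < n; ++i) {
 *         process_stream_ep(poll_eps[i].ep, poll_eps[i].user_data);
 *     }
 */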

ucs_status_t ucp_worker_get_efd(ucp_worker_h worker, int *fd)
{
    ucs_status_t status;

    UCP_CONTEXT_CHECK_FEATURE_FLAGS(worker->context, UCP_FEATURE_WAKEUP,
                                    return UCS_ERR_INVALID_PARAM);

    UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker);
    if (worker->flags & UCP_WORKER_FLAG_EXTERNAL_EVENT_FD) {
        status = UCS_ERR_UNSUPPORTED;
    } else {
        *fd    = worker->event_fd;
        status = UCS_OK;
    }
    UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker);
    return status;
}

ucs_status_t ucp_worker_arm(ucp_worker_h worker)
{
    ucp_worker_iface_t *wiface;
    ucs_status_t status;
    uint64_t dummy;
    int ret;

    ucs_trace_func("worker=%p", worker);

    UCP_CONTEXT_CHECK_FEATURE_FLAGS(worker->context, UCP_FEATURE_WAKEUP,
                                    return UCS_ERR_INVALID_PARAM);

    /* Read from the internal event fd. If any events are pending, return
     * BUSY; otherwise, continue to arm the transport interfaces.
     */
    do {
        ret = read(worker->eventfd, &dummy, sizeof(dummy));
        if (ret == sizeof(dummy)) {
            status = UCS_ERR_BUSY;
            goto out;
        } else if (ret == -1) {
            if (errno == EAGAIN) {
                break; /* No more events */
            } else if (errno != EINTR) {
                ucs_error("Read from internal event fd failed: %m");
                status = UCS_ERR_IO_ERROR;
                goto out;
            }
        } else {
            ucs_assert(ret == 0);
        }
    } while (ret != 0);

    UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker);

    /* Go over the arm_list of active interfaces which support events,
     * and arm them */
    ucs_list_for_each(wiface, &worker->arm_ifaces, arm_list) {
        ucs_assert(wiface->activate_count > 0);
        status = uct_iface_event_arm(wiface->iface, worker->uct_events);
        ucs_trace("arm iface %p returned %s", wiface->iface,
                  ucs_status_string(status));
        if (status != UCS_OK) {
            goto out_unlock;
        }
    }

    status = UCS_OK;

out_unlock:
    UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker);
out:
    ucs_trace("ucp_worker_arm returning %s", ucs_status_string(status));
    return status;
}
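
/*
 * Illustrative sketch of the wakeup pattern built around the arm call above
 * (not part of this file; epoll_fd is assumed to watch the fd returned by
 * ucp_worker_get_efd()):
 *
 *     struct epoll_event event;
 *     ucs_status_t       status;
 *
 *     while (ucp_worker_progress(worker)) {
 *         ;                               // drain all outstanding work first
 *     }
 *     status = ucp_worker_arm(worker);
 *     if (status == UCS_OK) {
 *         epoll_wait(epoll_fd, &event, 1, -1); // sleep until the next event
 *     } else if (status != UCS_ERR_BUSY) {
 *         ...                                  // fatal error
 *     }
 *     // on UCS_ERR_BUSY: events are already pending - progress and re-arm
 */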

void ucp_worker_wait_mem(ucp_worker_h worker, void *address)
{
    ucs_arch_wait_mem(address);
}

ucs_status_t ucp_worker_wait(ucp_worker_h worker)
{
    ucp_worker_iface_t *wiface;
    struct pollfd *pfd;
    ucs_status_t status;
    nfds_t nfds;
    int ret;

    ucs_trace_func("worker %p", worker);

    UCP_CONTEXT_CHECK_FEATURE_FLAGS(worker->context, UCP_FEATURE_WAKEUP,
                                    return UCS_ERR_INVALID_PARAM);

    UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker);

    status = ucp_worker_arm(worker);
    if (status == UCS_ERR_BUSY) { /* UCS_ERR_BUSY means poll() must not be called */
        status = UCS_OK;
        goto out_unlock;
    } else if (status != UCS_OK) {
        goto out_unlock;
    }

    if (worker->flags & UCP_WORKER_FLAG_EXTERNAL_EVENT_FD) {
        pfd = ucs_alloca(sizeof(*pfd) * worker->context->num_tls);
        nfds = 0;
        ucs_list_for_each(wiface, &worker->arm_ifaces, arm_list) {
            pfd[nfds].fd     = wiface->event_fd;
            pfd[nfds].events = POLLIN;
            ++nfds;
        }
    } else {
        pfd = ucs_alloca(sizeof(*pfd));
        pfd->fd     = worker->event_fd;
        pfd->events = POLLIN;
        nfds        = 1;
    }

    UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker);

    /* poll() is a thread-safe system call, though it can produce
     * unpredictable results when the same descriptor is polled from multiple
     * threads.
     */
    for (;;) {
        ret = poll(pfd, nfds, -1);
        if (ret >= 0) {
            ucs_assertv(ret == 1, "ret=%d", ret);
            status = UCS_OK;
            goto out;
        } else {
            if (errno != EINTR) {
                ucs_error("poll(nfds=%d) returned %d: %m", (int)nfds, ret);
                status = UCS_ERR_IO_ERROR;
                goto out;
            }
        }
    }

out_unlock:
    UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker);
out:
    return status;
}

ucs_status_t ucp_worker_signal(ucp_worker_h worker)
{
    ucs_trace_func("worker %p", worker);
    UCP_CONTEXT_CHECK_FEATURE_FLAGS(worker->context, UCP_FEATURE_WAKEUP,
                                    return UCS_ERR_INVALID_PARAM);
    return ucp_worker_wakeup_signal_fd(worker);
}
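
/*
 * Illustrative sketch (not part of this file): one thread interrupting
 * another that is blocked in ucp_worker_wait() on the same worker:
 *
 *     // thread A: block until a transport event or a signal arrives
 *     status = ucp_worker_wait(worker);
 *
 *     // thread B: wake thread A without any transport event
 *     ucp_worker_signal(worker);
 */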

ucs_status_t ucp_worker_get_address(ucp_worker_h worker, ucp_address_t **address_p,
                                    size_t *address_length_p)
{
    ucs_status_t status;

    UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker);

    status = ucp_address_pack(worker, NULL, UINT64_MAX,
                              UCP_ADDRESS_PACK_FLAG_ALL, NULL,
                              address_length_p, (void**)address_p);

    UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker);

    return status;
}

void ucp_worker_release_address(ucp_worker_h worker, ucp_address_t *address)
{
    ucs_free(address);
}
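
/*
 * Illustrative usage sketch for the two address calls above (not part of
 * this file; exchange_oob() is a hypothetical out-of-band transport):
 *
 *     ucp_address_t *address;
 *     size_t        length;
 *
 *     status = ucp_worker_get_address(worker, &address, &length);
 *     if (status == UCS_OK) {
 *         exchange_oob(address, length);  // send to the peer out-of-band
 *         ucp_worker_release_address(worker, address);
 *     }
 */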


void ucp_worker_print_info(ucp_worker_h worker, FILE *stream)
{
    ucp_context_h context = worker->context;
    ucp_address_t *address;
    size_t address_length;
    ucs_status_t status;
    ucp_rsc_index_t rsc_index;
    int first;

    UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker);

    fprintf(stream, "#\n");
    fprintf(stream, "# UCP worker '%s'\n", ucp_worker_get_name(worker));
    fprintf(stream, "#\n");

    status = ucp_worker_get_address(worker, &address, &address_length);
    if (status == UCS_OK) {
        ucp_worker_release_address(worker, address);
        fprintf(stream, "#                 address: %zu bytes\n", address_length);
    } else {
        fprintf(stream, "# <failed to get address>\n");
    }

    if (context->config.features & UCP_FEATURE_AMO) {
        fprintf(stream, "#                 atomics: ");
        first = 1;
        for (rsc_index = 0; rsc_index < worker->context->num_tls; ++rsc_index) {
            if (worker->atomic_tls & UCS_BIT(rsc_index)) {
                if (!first) {
                    fprintf(stream, ", ");
                }
                fprintf(stream, "%d:"UCT_TL_RESOURCE_DESC_FMT, rsc_index,
                        UCT_TL_RESOURCE_DESC_ARG(&context->tl_rscs[rsc_index].tl_rsc));
                first = 0;
            }
        }
        fprintf(stream, "\n");
    }

    fprintf(stream, "#\n");

    UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker);
}