Blob Blame History Raw
/**
 * Copyright (C) Mellanox Technologies Ltd. 2001-2016.  ALL RIGHTS RESERVED.
 * Copyright (C) Los Alamos National Security, LLC. 2019 ALL RIGHTS RESERVED.
 *
 * See file LICENSE for terms.
 */

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include "wireup.h"
#include "address.h"

#include <ucs/algorithm/qsort_r.h>
#include <ucs/datastruct/queue.h>
#include <ucs/sys/sock.h>
#include <ucp/core/ucp_ep.inl>
#include <string.h>
#include <inttypes.h>

#define UCP_WIREUP_RMA_BW_TEST_MSG_SIZE       262144

#define UCP_WIREUP_CHECK_AMO_FLAGS(_ae, _criteria, _context, _addr_index, _op, _size)      \
    if (!ucs_test_all_flags((_ae)->iface_attr.atomic.atomic##_size._op##_flags,            \
                            (_criteria)->remote_atomic_flags.atomic##_size._op##_flags)) { \
        char desc[256];                                                                    \
        ucs_trace("addr[%d] %s: no %s", (_addr_index),                                     \
                  ucp_find_tl_name_by_csum((_context), (_ae)->tl_name_csum),               \
                  ucp_wireup_get_missing_amo_flag_desc_##_op(                              \
                      (_ae)->iface_attr.atomic.atomic##_size._op##_flags,                  \
                      (_criteria)->remote_atomic_flags.atomic##_size._op##_flags,          \
                      (_size), desc, sizeof(desc)));                                       \
        continue;                                                                          \
    }

typedef struct ucp_wireup_atomic_flag {
    const char *name;
    const char *fetch;
} ucp_wireup_atomic_flag_t;


enum {
    UCP_WIREUP_LANE_USAGE_AM     = UCS_BIT(0), /* Active messages */
    UCP_WIREUP_LANE_USAGE_AM_BW  = UCS_BIT(1), /* High-BW active messages */
    UCP_WIREUP_LANE_USAGE_RMA    = UCS_BIT(2), /* Remote memory access */
    UCP_WIREUP_LANE_USAGE_RMA_BW = UCS_BIT(3), /* High-BW remote memory access */
    UCP_WIREUP_LANE_USAGE_AMO    = UCS_BIT(4), /* Atomic memory access */
    UCP_WIREUP_LANE_USAGE_TAG    = UCS_BIT(5), /* Tag matching offload */
    UCP_WIREUP_LANE_USAGE_CM     = UCS_BIT(6)  /* CM wireup */
};


typedef struct {
    ucp_rsc_index_t   rsc_index;
    unsigned          addr_index;
    ucp_lane_index_t  proxy_lane;
    ucp_rsc_index_t   dst_md_index;
    uint32_t          usage;
    double            am_bw_score;
    double            rma_score;
    double            rma_bw_score;
    double            amo_score;
} ucp_wireup_lane_desc_t;


typedef struct {
    ucp_wireup_criteria_t criteria;
    uint64_t              local_dev_bitmap;
    uint64_t              remote_dev_bitmap;
    ucp_md_map_t          md_map;
    uint32_t              usage;
    unsigned              max_lanes;
} ucp_wireup_select_bw_info_t;


/**
 * Global parameters for lanes selection during UCP wireup procedure
 */
typedef struct {
    ucp_ep_h                      ep;               /* UCP Endpoint */
    unsigned                      ep_init_flags;    /* Endpoint init flags */
    uint64_t                      tl_bitmap;        /* TLs bitmap which can be selected */
    const ucp_unpacked_address_t  *address;         /* Remote addresses */
    int                           allow_am;         /* Shows whether emulation over AM
                                                     * is allowed or not for RMA/AMO */
    int                           show_error;       /* Global flag that controls showing
                                                     * errors from a selecting transport
                                                     * procedure */
} ucp_wireup_select_params_t;

/**
 * Context for lanes selection during UCP wireup procedure
 */
typedef struct {
    ucp_wireup_lane_desc_t    lane_descs[UCP_MAX_LANES]; /* Array of active lanes that are
                                                          * found during selection */
    ucp_lane_index_t          num_lanes;                 /* Number of active lanes */
    unsigned                  ucp_ep_init_flags;         /* Endpoint init extra flags */
} ucp_wireup_select_context_t;

static const char *ucp_wireup_md_flags[] = {
    [ucs_ilog2(UCT_MD_FLAG_ALLOC)]               = "memory allocation",
    [ucs_ilog2(UCT_MD_FLAG_REG)]                 = "memory registration",
};

static const char *ucp_wireup_iface_flags[] = {
    [ucs_ilog2(UCT_IFACE_FLAG_AM_SHORT)]         = "am short",
    [ucs_ilog2(UCT_IFACE_FLAG_AM_BCOPY)]         = "am bcopy",
    [ucs_ilog2(UCT_IFACE_FLAG_AM_ZCOPY)]         = "am zcopy",
    [ucs_ilog2(UCT_IFACE_FLAG_PUT_SHORT)]        = "put short",
    [ucs_ilog2(UCT_IFACE_FLAG_PUT_BCOPY)]        = "put bcopy",
    [ucs_ilog2(UCT_IFACE_FLAG_PUT_ZCOPY)]        = "put zcopy",
    [ucs_ilog2(UCT_IFACE_FLAG_GET_SHORT)]        = "get short",
    [ucs_ilog2(UCT_IFACE_FLAG_GET_BCOPY)]        = "get bcopy",
    [ucs_ilog2(UCT_IFACE_FLAG_GET_ZCOPY)]        = "get zcopy",
    [ucs_ilog2(UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE)] = "peer failure handler",
    [ucs_ilog2(UCT_IFACE_FLAG_CONNECT_TO_IFACE)] = "connect to iface",
    [ucs_ilog2(UCT_IFACE_FLAG_CONNECT_TO_EP)]    = "connect to ep",
    [ucs_ilog2(UCT_IFACE_FLAG_AM_DUP)]           = "full reliability",
    [ucs_ilog2(UCT_IFACE_FLAG_CB_SYNC)]          = "sync callback",
    [ucs_ilog2(UCT_IFACE_FLAG_CB_ASYNC)]         = "async callback",
    [ucs_ilog2(UCT_IFACE_FLAG_EVENT_SEND_COMP)]  = "send completion event",
    [ucs_ilog2(UCT_IFACE_FLAG_EVENT_RECV)]       = "tag or active message event",
    [ucs_ilog2(UCT_IFACE_FLAG_EVENT_RECV_SIG)]   = "signaled message event",
    [ucs_ilog2(UCT_IFACE_FLAG_PENDING)]          = "pending",
    [ucs_ilog2(UCT_IFACE_FLAG_TAG_EAGER_SHORT)]  = "tag eager short",
    [ucs_ilog2(UCT_IFACE_FLAG_TAG_EAGER_BCOPY)]  = "tag eager bcopy",
    [ucs_ilog2(UCT_IFACE_FLAG_TAG_EAGER_ZCOPY)]  = "tag eager zcopy",
    [ucs_ilog2(UCT_IFACE_FLAG_TAG_RNDV_ZCOPY)]   = "tag rndv zcopy"
};

static ucp_wireup_atomic_flag_t ucp_wireup_atomic_desc[] = {
     [UCT_ATOMIC_OP_ADD]   = {.name = "add",   .fetch = "fetch-"},
     [UCT_ATOMIC_OP_AND]   = {.name = "and",   .fetch = "fetch-"},
     [UCT_ATOMIC_OP_OR]    = {.name = "or",    .fetch = "fetch-"},
     [UCT_ATOMIC_OP_XOR]   = {.name = "xor",   .fetch = "fetch-"},
     [UCT_ATOMIC_OP_SWAP]  = {.name = "swap",  .fetch = ""},
     [UCT_ATOMIC_OP_CSWAP] = {.name = "cscap", .fetch = ""}
};


static double ucp_wireup_aux_score_func(ucp_context_h context,
                                        const uct_md_attr_t *md_attr,
                                        const uct_iface_attr_t *iface_attr,
                                        const ucp_address_iface_attr_t *remote_iface_attr);

static const char *
ucp_wireup_get_missing_flag_desc(uint64_t flags, uint64_t required_flags,
                                 const char ** flag_descs)
{
    ucs_assert((required_flags & (~flags)) != 0);
    return flag_descs[ucs_ffs64(required_flags & (~flags))];
}

static const char *
ucp_wireup_get_missing_amo_flag_desc(uint64_t flags, uint64_t required_flags,
                                     int op_size, int fetch, char *buf, size_t len)
{
    int idx;

    ucs_assert((required_flags & (~flags)) != 0);

    idx = ucs_ffs64(required_flags & (~flags));

    snprintf(buf, len, "%d-bit atomic %s%s", op_size,
             fetch ? ucp_wireup_atomic_desc[idx].fetch : "",
             ucp_wireup_atomic_desc[idx].name);

    return buf;
}

static const char *
ucp_wireup_get_missing_amo_flag_desc_op(uint64_t flags, uint64_t required_flags,
                                        int op_size, char *buf, size_t len)
{
    return ucp_wireup_get_missing_amo_flag_desc(flags, required_flags, op_size, 0, buf, len);
}

static const char *
ucp_wireup_get_missing_amo_flag_desc_fop(uint64_t flags, uint64_t required_flags,
                                         int op_size, char *buf, size_t len)
{
    return ucp_wireup_get_missing_amo_flag_desc(flags, required_flags, op_size, 1, buf, len);
}

static int ucp_wireup_check_flags(const uct_tl_resource_desc_t *resource,
                                  uint64_t flags, uint64_t required_flags,
                                  const char *title, const char ** flag_descs,
                                  char *reason, size_t max)
{
    const char *missing_flag_desc;

    if (ucs_test_all_flags(flags, required_flags)) {
        return 1;
    }

    if (required_flags) {
        missing_flag_desc = ucp_wireup_get_missing_flag_desc(flags, required_flags,
                                                             flag_descs);
        ucs_trace(UCT_TL_RESOURCE_DESC_FMT " : not suitable for %s, no %s",
                  UCT_TL_RESOURCE_DESC_ARG(resource), title,
                  missing_flag_desc);
        snprintf(reason, max, UCT_TL_RESOURCE_DESC_FMT" - no %s",
                 UCT_TL_RESOURCE_DESC_ARG(resource), missing_flag_desc);
    }
    return 0;
}

static int ucp_wireup_check_amo_flags(const uct_tl_resource_desc_t *resource,
                                      uint64_t flags, uint64_t required_flags,
                                      int op_size, int fetch,
                                      const char *title, char *reason, size_t max)
{
    char missing_flag_desc[256];

    if (ucs_test_all_flags(flags, required_flags)) {
        return 1;
    }

    if (required_flags) {
        ucp_wireup_get_missing_amo_flag_desc(flags, required_flags,
                                             op_size, fetch, missing_flag_desc,
                                             sizeof(missing_flag_desc));
        ucs_trace(UCT_TL_RESOURCE_DESC_FMT " : not suitable for %s, no %s",
                  UCT_TL_RESOURCE_DESC_ARG(resource), title,
                  missing_flag_desc);
        snprintf(reason, max, UCT_TL_RESOURCE_DESC_FMT" - no %s",
                 UCT_TL_RESOURCE_DESC_ARG(resource), missing_flag_desc);
    }
    return 0;
}

static void
ucp_wireup_init_select_info(ucp_context_h context, double score,
                            unsigned addr_index, ucp_rsc_index_t rsc_index,
                            uint8_t priority, const char *title,
                            ucp_wireup_select_info_t *select_info)
{
    ucs_assert(score >= 0.0);

    ucs_trace(UCT_TL_RESOURCE_DESC_FMT "->addr[%u] : %s score %.2f priority %d",
              UCT_TL_RESOURCE_DESC_ARG(&context->tl_rscs[rsc_index].tl_rsc),
              addr_index, title, score, priority);

    select_info->score      = score;
    select_info->addr_index = addr_index;
    select_info->rsc_index  = rsc_index;
    select_info->priority   = priority;
}

/**
 * Select a local and remote transport
 */
static UCS_F_NOINLINE ucs_status_t
ucp_wireup_select_transport(const ucp_wireup_select_params_t *select_params,
                            const ucp_wireup_criteria_t *criteria,
                            uint64_t tl_bitmap, uint64_t remote_md_map,
                            uint64_t local_dev_bitmap,
                            uint64_t remote_dev_bitmap,
                            int show_error,
                            ucp_wireup_select_info_t *select_info)
{
    ucp_ep_h ep                    = select_params->ep;
    ucp_worker_h worker            = ep->worker;
    ucp_context_h context          = worker->context;
    ucp_wireup_select_info_t sinfo = {0};
    int found                      = 0;
    unsigned addr_index;
    uct_tl_resource_desc_t *resource;
    const ucp_address_entry_t *ae;
    ucp_rsc_index_t rsc_index;
    char tls_info[256];
    char *p, *endp;
    uct_iface_attr_t *iface_attr;
    uct_md_attr_t *md_attr;
    uint64_t addr_index_map;
    double score;
    uint8_t priority;

    p            = tls_info;
    endp         = tls_info + sizeof(tls_info) - 1;
    tls_info[0]  = '\0';
    tl_bitmap   &= select_params->tl_bitmap;
    show_error   = (select_params->show_error && show_error);

    /* Check which remote addresses satisfy the criteria */
    addr_index_map = 0;
    ucp_unpacked_address_for_each(ae, select_params->address) {
        addr_index = ucp_unpacked_address_index(select_params->address, ae);
        if (!(remote_dev_bitmap & UCS_BIT(ae->dev_index))) {
            ucs_trace("addr[%d]: not in use, because on device[%d]",
                      addr_index, ae->dev_index);
            continue;
        } else if (!(remote_md_map & UCS_BIT(ae->md_index))) {
            ucs_trace("addr[%d]: not in use, because on md[%d]", addr_index,
                      ae->md_index);
            continue;
        } else if (!ucs_test_all_flags(ae->md_flags,
                                       criteria->remote_md_flags)) {
            ucs_trace("addr[%d] %s: no %s", addr_index,
                      ucp_find_tl_name_by_csum(context, ae->tl_name_csum),
                      ucp_wireup_get_missing_flag_desc(ae->md_flags,
                                                       criteria->remote_md_flags,
                                                       ucp_wireup_md_flags));
            continue;
        }

        /* Make sure we are indeed passing all flags required by the criteria in
         * ucp packed address */
        ucs_assert(ucs_test_all_flags(UCP_ADDRESS_IFACE_FLAGS,
                                      criteria->remote_iface_flags));

        if (!ucs_test_all_flags(ae->iface_attr.cap_flags, criteria->remote_iface_flags)) {
            ucs_trace("addr[%d] %s: no %s", addr_index,
                      ucp_find_tl_name_by_csum(context, ae->tl_name_csum),
                      ucp_wireup_get_missing_flag_desc(ae->iface_attr.cap_flags,
                                                       criteria->remote_iface_flags,
                                                       ucp_wireup_iface_flags));
            continue;
        }

        UCP_WIREUP_CHECK_AMO_FLAGS(ae, criteria, context, addr_index, op, 32);
        UCP_WIREUP_CHECK_AMO_FLAGS(ae, criteria, context, addr_index, op, 64);
        UCP_WIREUP_CHECK_AMO_FLAGS(ae, criteria, context, addr_index, fop, 32);
        UCP_WIREUP_CHECK_AMO_FLAGS(ae, criteria, context, addr_index, fop, 64);

        addr_index_map |= UCS_BIT(addr_index);
    }

    if (!addr_index_map) {
         snprintf(p, endp - p, "%s  ", ucs_status_string(UCS_ERR_UNSUPPORTED));
         p += strlen(p);
         goto out;
    }

    /* For each local resource try to find the best remote address to connect to.
     * Pick the best local resource to satisfy the criteria.
     * best one has the highest score (from the dedicated score_func) and
     * has a reachable tl on the remote peer */
    ucs_for_each_bit(rsc_index, context->tl_bitmap) {
        resource   = &context->tl_rscs[rsc_index].tl_rsc;
        iface_attr = ucp_worker_iface_get_attr(worker, rsc_index);
        md_attr    = &context->tl_mds[context->tl_rscs[rsc_index].md_index].attr;

        if ((context->tl_rscs[rsc_index].flags & UCP_TL_RSC_FLAG_AUX) &&
            !(criteria->tl_rsc_flags & UCP_TL_RSC_FLAG_AUX)) {
            continue;
        }

        /* Check that local md and interface satisfy the criteria */
        if (!ucp_wireup_check_flags(resource, md_attr->cap.flags,
                                    criteria->local_md_flags, criteria->title,
                                    ucp_wireup_md_flags, p, endp - p) ||
            !ucp_wireup_check_flags(resource, iface_attr->cap.flags,
                                    criteria->local_iface_flags, criteria->title,
                                    ucp_wireup_iface_flags, p, endp - p) ||
            !ucp_wireup_check_amo_flags(resource, iface_attr->cap.atomic32.op_flags,
                                        criteria->local_atomic_flags.atomic32.op_flags,
                                        32, 0, criteria->title, p, endp - p) ||
            !ucp_wireup_check_amo_flags(resource, iface_attr->cap.atomic64.op_flags,
                                        criteria->local_atomic_flags.atomic64.op_flags,
                                        64, 0, criteria->title, p, endp - p) ||
            !ucp_wireup_check_amo_flags(resource, iface_attr->cap.atomic32.fop_flags,
                                        criteria->local_atomic_flags.atomic32.fop_flags,
                                        32, 1, criteria->title, p, endp - p) ||
            !ucp_wireup_check_amo_flags(resource, iface_attr->cap.atomic64.fop_flags,
                                        criteria->local_atomic_flags.atomic64.fop_flags,
                                        64, 1, criteria->title, p, endp - p))
        {
            p += strlen(p);
            snprintf(p, endp - p, ", ");
            p += strlen(p);
            continue;
        }

        /* Check supplied tl & device bitmap */
        if (!(tl_bitmap & UCS_BIT(rsc_index))) {
            ucs_trace(UCT_TL_RESOURCE_DESC_FMT " : disabled by tl_bitmap",
                      UCT_TL_RESOURCE_DESC_ARG(resource));
            snprintf(p, endp - p, UCT_TL_RESOURCE_DESC_FMT" - disabled for %s, ",
                     UCT_TL_RESOURCE_DESC_ARG(resource), criteria->title);
            p += strlen(p);
            continue;
        } else if (!(local_dev_bitmap & UCS_BIT(context->tl_rscs[rsc_index].dev_index))) {
            ucs_trace(UCT_TL_RESOURCE_DESC_FMT " : disabled by device bitmap",
                      UCT_TL_RESOURCE_DESC_ARG(resource));
            snprintf(p, endp - p, UCT_TL_RESOURCE_DESC_FMT" - disabled for %s, ",
                     UCT_TL_RESOURCE_DESC_ARG(resource), criteria->title);
            p += strlen(p);
            continue;
        }

        ucp_unpacked_address_for_each(ae, select_params->address) {
            addr_index = ucp_unpacked_address_index(select_params->address, ae);
            if (!(addr_index_map & UCS_BIT(addr_index)) ||
                !ucp_wireup_is_reachable(worker, rsc_index, ae))
            {
                /* Must be reachable device address, on same transport */
                continue;
            }

            score        = criteria->calc_score(context, md_attr, iface_attr,
                                                &ae->iface_attr);
            priority     = iface_attr->priority + ae->iface_attr.priority;

            if (!found || (ucp_score_prio_cmp(score, priority, sinfo.score,
                                              sinfo.priority) > 0)) {
                ucp_wireup_init_select_info(context, score, addr_index,
                                            rsc_index, priority,
                                            criteria->title, &sinfo);
                found = 1;
            }
        }

        /* If a local resource cannot reach any of the remote addresses,
         * generate debug message. */
        if (!found) {
            snprintf(p, endp - p, UCT_TL_RESOURCE_DESC_FMT" - %s, ",
                     UCT_TL_RESOURCE_DESC_ARG(resource),
                     ucs_status_string(UCS_ERR_UNREACHABLE));
            p += strlen(p);
        }
    }

out:
    if (p >= tls_info + 2) {
        *(p - 2) = '\0'; /* trim last "," */
    }

    if (!found) {
        if (show_error) {
            ucs_error("no %s transport to %s: %s", criteria->title,
                      ucp_ep_peer_name(ep), tls_info);
        }

        return UCS_ERR_UNREACHABLE;
    }

    ucs_trace("ep %p: selected for %s: " UCT_TL_RESOURCE_DESC_FMT " md[%d]"
              " -> '%s' address[%d],md[%d] score %.2f", ep, criteria->title,
              UCT_TL_RESOURCE_DESC_ARG(&context->tl_rscs[sinfo.rsc_index].tl_rsc),
              context->tl_rscs[sinfo.rsc_index].md_index, ucp_ep_peer_name(ep),
              sinfo.addr_index,
              select_params->address->address_list[sinfo.addr_index].md_index,
              sinfo.score);

    *select_info = sinfo;
    return UCS_OK;
}

static inline double ucp_wireup_tl_iface_latency(ucp_context_h context,
                                                 const uct_iface_attr_t *iface_attr,
                                                 const ucp_address_iface_attr_t *remote_iface_attr)
{
    return ucs_max(iface_attr->latency.overhead, remote_iface_attr->lat_ovh) +
           (iface_attr->latency.growth * context->config.est_num_eps);
}

static UCS_F_NOINLINE void
ucp_wireup_add_lane_desc(const ucp_wireup_select_info_t *select_info,
                         ucp_rsc_index_t dst_md_index,
                         uint32_t usage, int is_proxy,
                         ucp_wireup_select_context_t *select_ctx)
{
    ucp_wireup_lane_desc_t *lane_desc;
    ucp_lane_index_t lane, proxy_lane;
    int proxy_changed;

    /* Add a new lane, but try to reuse already added lanes which are selected
     * on the same transport resources.
     */
    proxy_changed = 0;
    for (lane_desc = select_ctx->lane_descs;
         lane_desc < select_ctx->lane_descs + select_ctx->num_lanes; ++lane_desc) {
        if ((lane_desc->rsc_index == select_info->rsc_index) &&
            (lane_desc->addr_index == select_info->addr_index))
        {
            lane = lane_desc - select_ctx->lane_descs;
            ucs_assertv_always(dst_md_index == lane_desc->dst_md_index,
                               "lane[%d].dst_md_index=%d, dst_md_index=%d",
                               lane, lane_desc->dst_md_index, dst_md_index);
            ucs_assertv_always(!(lane_desc->usage & usage), "lane[%d]=0x%x |= 0x%x",
                               lane, lane_desc->usage, usage);
            if (is_proxy && (lane_desc->proxy_lane == UCP_NULL_LANE)) {
                /* New lane is a proxy, and found existing non-proxy lane with
                 * same resource. So that lane should be used by the proxy.
                 */
                proxy_lane = lane;
                goto out_add_lane;
            } else if (!is_proxy && (lane_desc->proxy_lane == lane)) {
                /* New lane is not a proxy, but found existing proxy lane which
                 * could use the new lane. It also means we should be able to
                 * add our new lane.
                 */
                lane_desc->proxy_lane = select_ctx->num_lanes;
                proxy_changed = 1;
            } else if (!is_proxy && (lane_desc->proxy_lane == UCP_NULL_LANE)) {
                /* Found non-proxy lane with same resource - don't add */
                ucs_assert_always(!proxy_changed);
                lane_desc->usage |= usage;
                goto out_update_score;
            }
        }
    }

    /* If a proxy cannot find other lane with same resource, proxy to self */
    proxy_lane = is_proxy ? select_ctx->num_lanes : UCP_NULL_LANE;

out_add_lane:
    lane_desc = &select_ctx->lane_descs[select_ctx->num_lanes];
    ++select_ctx->num_lanes;

    lane_desc->rsc_index    = select_info->rsc_index;
    lane_desc->addr_index   = select_info->addr_index;
    lane_desc->proxy_lane   = proxy_lane;
    lane_desc->dst_md_index = dst_md_index;
    lane_desc->usage        = usage;
    lane_desc->am_bw_score  = 0.0;
    lane_desc->rma_score    = 0.0;
    lane_desc->rma_bw_score = 0.0;
    lane_desc->amo_score    = 0.0;

out_update_score:
    if (usage & UCP_WIREUP_LANE_USAGE_AM_BW) {
        lane_desc->am_bw_score = select_info->score;
    }
    if (usage & UCP_WIREUP_LANE_USAGE_RMA) {
        lane_desc->rma_score = select_info->score;
    }
    if (usage & UCP_WIREUP_LANE_USAGE_RMA_BW) {
        lane_desc->rma_bw_score = select_info->score;
    }
    if (usage & UCP_WIREUP_LANE_USAGE_AMO) {
        lane_desc->amo_score = select_info->score;
    }
}

static int ucp_wireup_is_lane_proxy(ucp_ep_h ep, ucp_rsc_index_t rsc_index,
                                    uint64_t remote_cap_flags)
{
    return !ucp_worker_is_tl_p2p(ep->worker, rsc_index) &&
           ((remote_cap_flags & UCP_WORKER_UCT_RECV_EVENT_CAP_FLAGS) ==
            UCT_IFACE_FLAG_EVENT_RECV_SIG);
}

static UCS_F_NOINLINE void
ucp_wireup_add_lane(const ucp_wireup_select_params_t *select_params,
                    const ucp_wireup_select_info_t *select_info,
                    uint32_t usage, ucp_wireup_select_context_t *select_ctx)
{
    int is_proxy = 0;
    ucp_rsc_index_t dst_md_index;
    uint64_t remote_cap_flags;

    if (usage & (UCP_WIREUP_LANE_USAGE_AM |
                 UCP_WIREUP_LANE_USAGE_AM_BW |
                 UCP_WIREUP_LANE_USAGE_TAG)) {
        /* If the remote side is not p2p and has only signaled-am wakeup, it may
         * deactivate its interface and wait for signaled active message to wake up.
         * Use a proxy lane which would send the first active message as signaled to
         * make sure the remote interface will indeed wake up. */
        remote_cap_flags = select_params->address->address_list
                                [select_info->addr_index].iface_attr.cap_flags;
        is_proxy         = ucp_wireup_is_lane_proxy(select_params->ep,
                                                    select_info->rsc_index,
                                                    remote_cap_flags);
    }

    dst_md_index = select_params->address->address_list
                                [select_info->addr_index].md_index;
    ucp_wireup_add_lane_desc(select_info, dst_md_index,
                             usage, is_proxy, select_ctx);
}

#define UCP_WIREUP_COMPARE_SCORE(_elem1, _elem2, _arg, _token) \
    ({ \
        const ucp_lane_index_t *lane1 = (_elem1); \
        const ucp_lane_index_t *lane2 = (_elem2); \
        const ucp_wireup_lane_desc_t *lanes = (_arg); \
        double score1, score2; \
        \
        score1 = (*lane1 == UCP_NULL_LANE) ? 0.0 : lanes[*lane1]._token##_score; \
        score2 = (*lane2 == UCP_NULL_LANE) ? 0.0 : lanes[*lane2]._token##_score; \
        /* sort from highest score to lowest */ \
        (score1 < score2) ? 1 : ((score1 > score2) ? -1 : 0); \
    })

static int ucp_wireup_compare_lane_am_bw_score(const void *elem1, const void *elem2,
                                               void *arg)
{
    return UCP_WIREUP_COMPARE_SCORE(elem1, elem2, arg, am_bw);
}

static int ucp_wireup_compare_lane_rma_score(const void *elem1, const void *elem2,
                                             void *arg)
{
    return UCP_WIREUP_COMPARE_SCORE(elem1, elem2, arg, rma);
}

static int ucp_wireup_compare_lane_rma_bw_score(const void *elem1, const void *elem2,
                                             void *arg)
{
    return UCP_WIREUP_COMPARE_SCORE(elem1, elem2, arg, rma_bw);
}

static int ucp_wireup_compare_lane_amo_score(const void *elem1, const void *elem2,
                                             void *arg)
{
    return UCP_WIREUP_COMPARE_SCORE(elem1, elem2, arg, amo);
}

static void
ucp_wireup_unset_tl_by_md(const ucp_wireup_select_params_t *sparams,
                          const ucp_wireup_select_info_t *sinfo,
                          uint64_t *tl_bitmap, uint64_t *remote_md_map)
{
    ucp_context_h context         = sparams->ep->worker->context;
    const ucp_address_entry_t *ae = &sparams->address->
                                        address_list[sinfo->addr_index];
    ucp_rsc_index_t md_index      = context->tl_rscs[sinfo->rsc_index].md_index;
    ucp_rsc_index_t dst_md_index  = ae->md_index;
    ucp_rsc_index_t i;

    *remote_md_map &= ~UCS_BIT(dst_md_index);

    ucs_for_each_bit(i, context->tl_bitmap) {
        if (context->tl_rscs[i].md_index == md_index) {
            *tl_bitmap &= ~UCS_BIT(i);
        }
    }
}

static UCS_F_NOINLINE ucs_status_t
ucp_wireup_add_memaccess_lanes(const ucp_wireup_select_params_t *select_params,
                               const ucp_wireup_criteria_t *criteria,
                               uint64_t tl_bitmap, uint32_t usage,
                               ucp_wireup_select_context_t *select_ctx)
{
    ucp_wireup_criteria_t mem_criteria   = *criteria;
    ucp_wireup_select_info_t select_info = {0};
    int show_error                       = !select_params->allow_am;
    double reg_score;
    uint64_t remote_md_map;
    ucs_status_t status;
    char title[64];

    remote_md_map = UINT64_MAX;

    /* Select best transport which can reach registered memory */
    snprintf(title, sizeof(title), criteria->title, "registered");
    mem_criteria.title           = title;
    mem_criteria.remote_md_flags = UCT_MD_FLAG_REG | criteria->remote_md_flags;
    status = ucp_wireup_select_transport(select_params, &mem_criteria,
                                         tl_bitmap, remote_md_map,
                                         UINT64_MAX, UINT64_MAX,
                                         show_error, &select_info);
    if (status != UCS_OK) {
        goto out;
    }

    reg_score = select_info.score;

    /* Add to the list of lanes and remove all occurrences of the remote md
     * from the address list, to avoid selecting the same remote md again. */
    ucp_wireup_add_lane(select_params, &select_info, usage, select_ctx);
    ucp_wireup_unset_tl_by_md(select_params, &select_info, &tl_bitmap,
                              &remote_md_map);

    /* Select additional transports which can access allocated memory, but
     * only if their scores are better. We need this because a remote memory
     * block can be potentially allocated using one of them, and we might get
     * better performance than the transports which support only registered
     * remote memory. */
    snprintf(title, sizeof(title), criteria->title, "allocated");
    mem_criteria.title           = title;
    mem_criteria.remote_md_flags = UCT_MD_FLAG_ALLOC |
                                   criteria->remote_md_flags;

    for (;;) {
        status = ucp_wireup_select_transport(select_params, &mem_criteria,
                                             tl_bitmap, remote_md_map,
                                             UINT64_MAX, UINT64_MAX, 0,
                                             &select_info);
        /* Break if: */
        /* - transport selection wasn't OK */
        if ((status != UCS_OK) ||
            /* - the selected transport is worse than
             *   the transport selected above */
            (ucp_score_cmp(select_info.score, reg_score) <= 0)) {
            break;
        }

        /* Add lane description and remove all occurrences of the remote md. */
        ucp_wireup_add_lane(select_params, &select_info, usage, select_ctx);
        ucp_wireup_unset_tl_by_md(select_params, &select_info, &tl_bitmap,
                                  &remote_md_map);
    }

    status = UCS_OK;

out:
    if ((status != UCS_OK) && select_params->allow_am) {
        /* using emulation over active messages */
        select_ctx->ucp_ep_init_flags |= UCP_EP_INIT_CREATE_AM_LANE;
        status                         = UCS_OK;
    }

    return status;
}

static uint64_t ucp_ep_get_context_features(const ucp_ep_h ep)
{
    return ep->worker->context->config.features;
}

static double ucp_wireup_rma_score_func(ucp_context_h context,
                                        const uct_md_attr_t *md_attr,
                                        const uct_iface_attr_t *iface_attr,
                                        const ucp_address_iface_attr_t *remote_iface_attr)
{
    /* best for 4k messages */
    return 1e-3 / (ucp_wireup_tl_iface_latency(context, iface_attr, remote_iface_attr) +
                   iface_attr->overhead +
                   (4096.0 / ucs_min(ucp_tl_iface_bandwidth(context, &iface_attr->bandwidth),
                                     ucp_tl_iface_bandwidth(context, &remote_iface_attr->bandwidth))));
}

static void ucp_wireup_fill_peer_err_criteria(ucp_wireup_criteria_t *criteria,
                                              unsigned ep_init_flags)
{
    if (ep_init_flags & UCP_EP_INIT_ERR_MODE_PEER_FAILURE) {
        criteria->local_iface_flags |= UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE;
    }
}

static void ucp_wireup_fill_aux_criteria(ucp_wireup_criteria_t *criteria,
                                         unsigned ep_init_flags)
{
    criteria->title              = "auxiliary";
    criteria->local_md_flags     = 0;
    criteria->remote_md_flags    = 0;
    criteria->local_iface_flags  = UCT_IFACE_FLAG_CONNECT_TO_IFACE |
                                   UCT_IFACE_FLAG_AM_BCOPY |
                                   UCT_IFACE_FLAG_PENDING;
    criteria->remote_iface_flags = UCT_IFACE_FLAG_CONNECT_TO_IFACE |
                                   UCT_IFACE_FLAG_AM_BCOPY |
                                   UCT_IFACE_FLAG_CB_ASYNC;
    criteria->calc_score         = ucp_wireup_aux_score_func;
    criteria->tl_rsc_flags       = UCP_TL_RSC_FLAG_AUX; /* Can use aux transports */

    ucp_wireup_fill_peer_err_criteria(criteria, ep_init_flags);
}

static void ucp_wireup_clean_amo_criteria(ucp_wireup_criteria_t *criteria)
{
    memset(&criteria->remote_atomic_flags, 0,
           sizeof(criteria->remote_atomic_flags));
    memset(&criteria->local_atomic_flags, 0,
           sizeof(criteria->local_atomic_flags));
}

/**
 * Check whether emulation over AM is allowed for RMA/AMO lanes
 */
static int ucp_wireup_allow_am_emulation_layer(unsigned ep_init_flags)
{
    /* disable emulation layer if err handling is required due to lack of
     * keep alive protocol */
    return !(ep_init_flags & (UCP_EP_INIT_FLAG_MEM_TYPE |
                              UCP_EP_INIT_ERR_MODE_PEER_FAILURE));
}

static unsigned
ucp_wireup_ep_init_flags(const ucp_wireup_select_params_t *select_params,
                         const ucp_wireup_select_context_t *select_ctx)
{
    return select_params->ep_init_flags | select_ctx->ucp_ep_init_flags;
}

static ucs_status_t
ucp_wireup_add_cm_lane(const ucp_wireup_select_params_t *select_params,
                       ucp_wireup_select_context_t *select_ctx)
{
    ucp_wireup_select_info_t select_info;

    if (!(select_params->ep_init_flags & (UCP_EP_INIT_CM_WIREUP_CLIENT |
                                          UCP_EP_INIT_CM_WIREUP_SERVER))) {
        return UCS_OK;
    }

    select_info.priority   = 0;  /**< Currently we have only 1 CM
                                      implementation */
    select_info.rsc_index  = UCP_NULL_RESOURCE; /**< RSC doesn't matter for CM */
    select_info.addr_index = 0;  /**< This makes sense only for transport
                                      lanes */
    select_info.score      = 0.; /**< TODO: when we have > 1 CM implementation */

    /* server is not a proxy because it can create all lanes connected */
    ucp_wireup_add_lane_desc(&select_info, select_info.rsc_index,
                             UCP_WIREUP_LANE_USAGE_CM, 0, select_ctx);
    return UCS_OK;
}

static ucs_status_t
ucp_wireup_add_rma_lanes(const ucp_wireup_select_params_t *select_params,
                         ucp_wireup_select_context_t *select_ctx)
{
    ucp_wireup_criteria_t criteria = {0};
    unsigned ep_init_flags         = ucp_wireup_ep_init_flags(select_params,
                                                              select_ctx);

    if (!(ucp_ep_get_context_features(select_params->ep) & UCP_FEATURE_RMA) &&
        !(ep_init_flags & UCP_EP_INIT_FLAG_MEM_TYPE)) {
        return UCS_OK;
    }

    if (ep_init_flags & UCP_EP_INIT_FLAG_MEM_TYPE) {
        criteria.title              = "copy across memory types";
        criteria.remote_iface_flags = UCT_IFACE_FLAG_PUT_SHORT;
        criteria.local_iface_flags  = criteria.remote_iface_flags;
    } else {
        criteria.title              = "remote %s memory access";
        criteria.remote_iface_flags = UCT_IFACE_FLAG_PUT_SHORT |
                                      UCT_IFACE_FLAG_PUT_BCOPY |
                                      UCT_IFACE_FLAG_GET_BCOPY;
        criteria.local_iface_flags  = criteria.remote_iface_flags |
                                      UCT_IFACE_FLAG_PENDING;
    }
    criteria.calc_score             = ucp_wireup_rma_score_func;
    criteria.tl_rsc_flags           = 0;
    ucp_wireup_fill_peer_err_criteria(&criteria, ep_init_flags);

    return ucp_wireup_add_memaccess_lanes(select_params, &criteria, UINT64_MAX,
                                          UCP_WIREUP_LANE_USAGE_RMA,
                                          select_ctx);
}

double ucp_wireup_amo_score_func(ucp_context_h context,
                                 const uct_md_attr_t *md_attr,
                                 const uct_iface_attr_t *iface_attr,
                                 const ucp_address_iface_attr_t *remote_iface_attr)
{
    /* best one-sided latency */
    return 1e-3 / (ucp_wireup_tl_iface_latency(context, iface_attr, remote_iface_attr) +
                   iface_attr->overhead);
}

static ucs_status_t
ucp_wireup_add_amo_lanes(const ucp_wireup_select_params_t *select_params,
                         ucp_wireup_select_context_t *select_ctx)
{
    ucp_worker_h worker            = select_params->ep->worker;
    ucp_context_h context          = worker->context;
    ucp_wireup_criteria_t criteria = {0};
    unsigned ep_init_flags         = ucp_wireup_ep_init_flags(select_params,
                                                              select_ctx);
    ucp_rsc_index_t rsc_index;
    uint64_t tl_bitmap;

    if (!ucs_test_flags(context->config.features,
                        UCP_FEATURE_AMO32, UCP_FEATURE_AMO64) ||
        (ep_init_flags & UCP_EP_INIT_FLAG_MEM_TYPE)) {
        return UCS_OK;
    }

    ucp_context_uct_atomic_iface_flags(context, &criteria.remote_atomic_flags);

    criteria.title              = "atomic operations on %s memory";
    criteria.local_iface_flags  = criteria.remote_iface_flags |
                                  UCT_IFACE_FLAG_PENDING;
    criteria.local_atomic_flags = criteria.remote_atomic_flags;
    criteria.calc_score         = ucp_wireup_amo_score_func;
    ucp_wireup_fill_peer_err_criteria(&criteria, ep_init_flags);

    /* We can use only non-p2p resources or resources which are explicitly
     * selected for atomics. Otherwise, the remote peer would not be able to
     * connect back on p2p transport.
     */
    tl_bitmap = worker->atomic_tls;
    ucs_for_each_bit(rsc_index, context->tl_bitmap) {
        if (!ucp_worker_is_tl_p2p(worker, rsc_index)) {
            tl_bitmap |= UCS_BIT(rsc_index);
        }
    }

    return ucp_wireup_add_memaccess_lanes(select_params, &criteria, tl_bitmap,
                                          UCP_WIREUP_LANE_USAGE_AMO,
                                          select_ctx);
}

static double ucp_wireup_am_score_func(ucp_context_h context,
                                       const uct_md_attr_t *md_attr,
                                       const uct_iface_attr_t *iface_attr,
                                       const ucp_address_iface_attr_t *remote_iface_attr)
{
    /* best end-to-end latency */
    return 1e-3 / (ucp_wireup_tl_iface_latency(context, iface_attr, remote_iface_attr) +
                   iface_attr->overhead + remote_iface_attr->overhead);
}

static double ucp_wireup_rma_bw_score_func(ucp_context_h context,
                                           const uct_md_attr_t *md_attr,
                                           const uct_iface_attr_t *iface_attr,
                                           const ucp_address_iface_attr_t *remote_iface_attr)
{
    /* highest bandwidth with lowest overhead - test a message size of 256KB,
     * a size which is likely to be used for high-bw memory access protocol, for
     * how long it would take to transfer it with a certain transport. */
    return 1 / ((UCP_WIREUP_RMA_BW_TEST_MSG_SIZE /
                ucs_min(ucp_tl_iface_bandwidth(context, &iface_attr->bandwidth),
                        ucp_tl_iface_bandwidth(context, &remote_iface_attr->bandwidth))) +
                ucp_wireup_tl_iface_latency(context, iface_attr, remote_iface_attr) +
                iface_attr->overhead + md_attr->reg_cost.overhead +
                (UCP_WIREUP_RMA_BW_TEST_MSG_SIZE * md_attr->reg_cost.growth));
}

static inline int
ucp_wireup_is_am_required(const ucp_wireup_select_params_t *select_params,
                          const ucp_wireup_select_context_t *select_ctx)
{
    ucp_ep_h ep            = select_params->ep;
    unsigned ep_init_flags = ucp_wireup_ep_init_flags(select_params,
                                                      select_ctx);
    ucp_lane_index_t lane;

    /* Check if we need active messages from the configurations, for wireup.
     * If not, check if am is required due to p2p transports */

    if (ep_init_flags & UCP_EP_INIT_CREATE_AM_LANE) {
        return 1;
    }

    if (!(ep_init_flags & UCP_EP_INIT_FLAG_MEM_TYPE) &&
        (ucp_ep_get_context_features(ep) & (UCP_FEATURE_TAG | 
                                            UCP_FEATURE_STREAM | 
                                            UCP_FEATURE_AM))) {
        return 1;
    }

    for (lane = 0; lane < select_ctx->num_lanes; ++lane) {
        if (ucp_worker_is_tl_p2p(ep->worker,
                                 select_ctx->lane_descs[lane].rsc_index)) {
            return 1;
        }
    }

    return 0;
}

static ucs_status_t
ucp_wireup_add_am_lane(const ucp_wireup_select_params_t *select_params,
                       ucp_wireup_select_info_t *am_info,
                       ucp_wireup_select_context_t *select_ctx)
{
    ucp_wireup_criteria_t criteria = {0};
    ucs_status_t status;

    if (!ucp_wireup_is_am_required(select_params, select_ctx)) {
        memset(am_info, 0, sizeof(*am_info));
        return UCS_OK;
    }

    /* Select one lane for active messages */
    criteria.title              = "active messages";
    criteria.remote_iface_flags = UCT_IFACE_FLAG_AM_BCOPY |
                                  UCT_IFACE_FLAG_CB_SYNC;
    criteria.local_iface_flags  = UCT_IFACE_FLAG_AM_BCOPY;
    criteria.calc_score         = ucp_wireup_am_score_func;
    ucp_wireup_fill_peer_err_criteria(&criteria,
                                      ucp_wireup_ep_init_flags(select_params,
                                                               select_ctx));

    if (ucs_test_all_flags(ucp_ep_get_context_features(select_params->ep),
                           UCP_FEATURE_TAG | UCP_FEATURE_WAKEUP)) {
        criteria.local_iface_flags |= UCP_WORKER_UCT_UNSIG_EVENT_CAP_FLAGS;
    }

    status = ucp_wireup_select_transport(select_params, &criteria,
                                         select_params->tl_bitmap, UINT64_MAX,
                                         UINT64_MAX, UINT64_MAX, 1, am_info);
    if (status != UCS_OK) {
        return status;
    }

    ucp_wireup_add_lane(select_params, am_info, UCP_WIREUP_LANE_USAGE_AM,
                        select_ctx);
    return UCS_OK;
}

static double ucp_wireup_am_bw_score_func(ucp_context_h context,
                                          const uct_md_attr_t *md_attr,
                                          const uct_iface_attr_t *iface_attr,
                                          const ucp_address_iface_attr_t *remote_iface_attr)
{
    /* best single MTU bandwidth */
    double size = iface_attr->cap.am.max_bcopy;
    double time = (size / ucs_min(ucp_tl_iface_bandwidth(context, &iface_attr->bandwidth),
                                  ucp_tl_iface_bandwidth(context, &remote_iface_attr->bandwidth))) +
                  iface_attr->overhead + remote_iface_attr->overhead +
                  ucp_wireup_tl_iface_latency(context, iface_attr, remote_iface_attr);

    return size / time * 1e-5;
}

int ucp_wireup_is_rsc_self_or_shm(ucp_ep_h ep, ucp_rsc_index_t rsc_index)
{
    return (ep->worker->context->tl_rscs[rsc_index].tl_rsc.dev_type == UCT_DEVICE_TYPE_SHM) ||
           (ep->worker->context->tl_rscs[rsc_index].tl_rsc.dev_type == UCT_DEVICE_TYPE_SELF);
}

static unsigned
ucp_wireup_add_bw_lanes(const ucp_wireup_select_params_t *select_params,
                        const ucp_wireup_select_bw_info_t *bw_info,
                        uint64_t tl_bitmap,
                        ucp_wireup_select_context_t *select_ctx)
{
    ucp_ep_h ep                    = select_params->ep;
    ucp_context_h context          = ep->worker->context;
    ucp_wireup_select_info_t sinfo = {0};
    const ucp_address_entry_t *ae;
    ucs_status_t status;
    unsigned num_lanes;
    uint64_t local_dev_bitmap;
    uint64_t remote_dev_bitmap;
    ucp_md_map_t md_map;

    num_lanes         = 0;
    md_map            = bw_info->md_map;
    local_dev_bitmap  = bw_info->local_dev_bitmap;
    remote_dev_bitmap = bw_info->remote_dev_bitmap;

    /* lookup for requested number of lanes or limit of MD map
     * (we have to limit MD's number to avoid malloc in
     * memory registration) */
    while ((num_lanes < bw_info->max_lanes) &&
           (ucs_popcount(md_map) < UCP_MAX_OP_MDS)) {
        status = ucp_wireup_select_transport(select_params, &bw_info->criteria,
                                             tl_bitmap, UINT64_MAX,
                                             local_dev_bitmap, remote_dev_bitmap,
                                             0, &sinfo);
        if (status != UCS_OK) {
            break;
        }

        ucp_wireup_add_lane(select_params, &sinfo, bw_info->usage, select_ctx);

        md_map |= UCS_BIT(context->tl_rscs[sinfo.rsc_index].md_index);
        num_lanes++;

        local_dev_bitmap  &= ~UCS_BIT(context->tl_rscs[sinfo.rsc_index].dev_index);
        ae                 = &select_params->address->address_list[sinfo.addr_index];
        remote_dev_bitmap &= ~UCS_BIT(ae->dev_index);

        if (ucp_wireup_is_rsc_self_or_shm(ep, sinfo.rsc_index)) {
            /* special case for SHM: do not try to lookup additional lanes when
             * SHM transport detected (another transport will be significantly
             * slower) */
            break;
        }
    }

    return num_lanes;
}

static ucs_status_t
ucp_wireup_add_am_bw_lanes(const ucp_wireup_select_params_t *select_params,
                           ucp_wireup_select_context_t *select_ctx)
{
    ucp_ep_h ep            = select_params->ep; 
    ucp_context_h context  = ep->worker->context;
    unsigned ep_init_flags = ucp_wireup_ep_init_flags(select_params,
                                                      select_ctx);
            
    ucp_wireup_select_bw_info_t bw_info;
    ucp_lane_index_t lane_desc_idx;
    ucp_rsc_index_t rsc_index;
    unsigned addr_index;

    /* Check if we need active messages, for wireup */
    if (!(ucp_ep_get_context_features(ep) & UCP_FEATURE_TAG) ||
        (ep_init_flags & UCP_EP_INIT_FLAG_MEM_TYPE) ||
        (context->config.ext.max_eager_lanes < 2)) {
        return UCS_OK;
    }

    /* Select one lane for active messages */
    bw_info.criteria.title              = "high-bw active messages";
    bw_info.criteria.local_md_flags     = 0;
    bw_info.criteria.remote_md_flags    = 0;
    bw_info.criteria.remote_iface_flags = UCT_IFACE_FLAG_AM_BCOPY |
                                          UCT_IFACE_FLAG_CB_SYNC;
    bw_info.criteria.local_iface_flags  = UCT_IFACE_FLAG_AM_BCOPY;
    bw_info.criteria.calc_score         = ucp_wireup_am_bw_score_func;
    bw_info.criteria.tl_rsc_flags       = 0;
    ucp_wireup_clean_amo_criteria(&bw_info.criteria);
    ucp_wireup_fill_peer_err_criteria(&bw_info.criteria, ep_init_flags);

    if (ucs_test_all_flags(ucp_ep_get_context_features(ep),
                           UCP_FEATURE_TAG | UCP_FEATURE_WAKEUP)) {
        bw_info.criteria.local_iface_flags |= UCP_WORKER_UCT_UNSIG_EVENT_CAP_FLAGS;
    }

    bw_info.local_dev_bitmap  = UINT64_MAX;
    bw_info.remote_dev_bitmap = UINT64_MAX;
    bw_info.md_map            = 0;
    bw_info.max_lanes         = context->config.ext.max_eager_lanes - 1;
    bw_info.usage             = UCP_WIREUP_LANE_USAGE_AM_BW;

    /* am_bw_lane[0] is am_lane, so don't re-select it here */
    for (lane_desc_idx = 0; lane_desc_idx < select_ctx->num_lanes; ++lane_desc_idx) {
        if (select_ctx->lane_descs[lane_desc_idx].usage & UCP_WIREUP_LANE_USAGE_AM) {
            addr_index                 = select_ctx->lane_descs[lane_desc_idx].addr_index;
            rsc_index                  = select_ctx->lane_descs[lane_desc_idx].rsc_index;
            bw_info.md_map            |= UCS_BIT(context->tl_rscs[rsc_index].md_index);
            bw_info.local_dev_bitmap  &= ~UCS_BIT(context->tl_rscs[rsc_index].dev_index);
            bw_info.remote_dev_bitmap &= ~UCS_BIT(select_params->address->
                                                  address_list[addr_index].dev_index);
            if (ucp_wireup_is_rsc_self_or_shm(ep, rsc_index)) {
                /* if AM lane is SELF or SHMEM - then do not use more lanes */
                return UCS_OK;
            } else {
                break; /* do not continue searching due to we found
                          AM lane (and there is only one lane) */
            }
        }
    }

    /* don't check returned number of lanes from the function below,
     * since we already have one AM BW lane - AM lane */
    ucp_wireup_add_bw_lanes(select_params, &bw_info, UINT64_MAX, select_ctx);

    return UCS_OK;
}

static uint64_t ucp_wireup_get_rma_bw_iface_flags(ucp_rndv_mode_t rndv_mode)
{
    switch (rndv_mode) {
    case UCP_RNDV_MODE_AUTO:
        return (UCT_IFACE_FLAG_GET_ZCOPY | UCT_IFACE_FLAG_PUT_ZCOPY);
    case UCP_RNDV_MODE_GET_ZCOPY:
        return UCT_IFACE_FLAG_GET_ZCOPY;
    case UCP_RNDV_MODE_PUT_ZCOPY:
        return UCT_IFACE_FLAG_PUT_ZCOPY;
    default:
        return 0;
    }
}

static ucs_status_t
ucp_wireup_add_rma_bw_lanes(const ucp_wireup_select_params_t *select_params,
                            ucp_wireup_select_context_t *select_ctx)
{
    ucp_ep_h ep                  = select_params->ep;
    ucp_context_h context        = ep->worker->context;
    unsigned ep_init_flags       = ucp_wireup_ep_init_flags(select_params,
                                                            select_ctx);
    uint64_t iface_rma_flags     = 0;
    ucp_rndv_mode_t rndv_modes[] = {
        context->config.ext.rndv_mode,
        UCP_RNDV_MODE_GET_ZCOPY,
        UCP_RNDV_MODE_PUT_ZCOPY
    };
    ucp_wireup_select_bw_info_t bw_info;
    ucs_memory_type_t mem_type;
    size_t added_lanes;
    uint64_t md_reg_flag;
    uint8_t i;

    if (ep_init_flags & UCP_EP_INIT_FLAG_MEM_TYPE) {
        md_reg_flag = 0;
    } else if (ucp_ep_get_context_features(ep) & UCP_FEATURE_TAG) {
        /* if needed for RNDV, need only access for remote registered memory */
        md_reg_flag = UCT_MD_FLAG_REG;
    } else {
        return UCS_OK;
    }

    bw_info.usage                       = UCP_WIREUP_LANE_USAGE_RMA_BW;
    bw_info.criteria.title              = "high-bw remote memory access";
    bw_info.criteria.remote_iface_flags = 0;
    bw_info.criteria.local_iface_flags  = UCT_IFACE_FLAG_PENDING;
    bw_info.criteria.calc_score         = ucp_wireup_rma_bw_score_func;
    bw_info.criteria.tl_rsc_flags       = 0;
    bw_info.criteria.remote_md_flags    = md_reg_flag;
    ucp_wireup_clean_amo_criteria(&bw_info.criteria);
    ucp_wireup_fill_peer_err_criteria(&bw_info.criteria, ep_init_flags);

    if (ucs_test_all_flags(ucp_ep_get_context_features(ep),
                           UCP_FEATURE_TAG | UCP_FEATURE_WAKEUP)) {
        bw_info.criteria.local_iface_flags |= UCP_WORKER_UCT_UNSIG_EVENT_CAP_FLAGS;
    }

    bw_info.local_dev_bitmap  = UINT64_MAX;
    bw_info.remote_dev_bitmap = UINT64_MAX;
    bw_info.md_map            = 0;

    /* check rkey_ptr */
    if (!(ep_init_flags & UCP_EP_INIT_FLAG_MEM_TYPE) &&
         (context->config.ext.rndv_mode == UCP_RNDV_MODE_AUTO)) {

        /* We require remote memory registration and local ability to obtain
         * a pointer to the remote key. Only one is needed since we are doing
         * memory copy on the CPU.
         * Allow selecting additional lanes in case the remote memory will not be
         * registered with this memory domain, i.e with GPU memory.
         */
        bw_info.criteria.local_md_flags  = UCT_MD_FLAG_RKEY_PTR;
        bw_info.max_lanes                = 1;

        ucp_wireup_add_bw_lanes(select_params, &bw_info,
                                context->mem_type_access_tls[UCS_MEMORY_TYPE_HOST],
                                select_ctx);
    }

    /* First checked RNDV mode has to be a mode specified in config */
    bw_info.criteria.local_md_flags  = md_reg_flag;
    bw_info.max_lanes                = context->config.ext.max_rndv_lanes;
    ucs_assert(rndv_modes[0] == context->config.ext.rndv_mode);

    /* RNDV protocol can't mix different schemes, i.e. wireup has to
     * select lanes with the same iface flags depends on a requested
     * RNDV scheme.
     * First of all, try to select lanes with RNDV scheme requested
     * by user. If no lanes were selected and RNDV scheme in the
     * configuration is AUTO, try other schemes. */
    UCS_STATIC_ASSERT(UCS_MEMORY_TYPE_HOST == 0);
    for (i = 0; i < ucs_array_size(rndv_modes); i++) {
        /* Remove the previous iface RMA flags */
        bw_info.criteria.remote_iface_flags &= ~iface_rma_flags;
        bw_info.criteria.local_iface_flags  &= ~iface_rma_flags;

        iface_rma_flags = ucp_wireup_get_rma_bw_iface_flags(rndv_modes[i]);

        /* Set the new iface RMA flags */
        bw_info.criteria.remote_iface_flags |= iface_rma_flags;
        bw_info.criteria.local_iface_flags  |= iface_rma_flags;

        added_lanes = 0;

        for (mem_type = UCS_MEMORY_TYPE_HOST;
             mem_type < UCS_MEMORY_TYPE_LAST; mem_type++) {
            if (!context->mem_type_access_tls[mem_type]) {
                continue;
            }

            added_lanes += ucp_wireup_add_bw_lanes(select_params, &bw_info,
                                                   context->mem_type_access_tls[mem_type],
                                                   select_ctx);
        }

        if (added_lanes /* There are selected lanes */ ||
            /* There are no selected lanes, but a user requested
             * the exact RNDV scheme, so there is no other choice */
            (context->config.ext.rndv_mode != UCP_RNDV_MODE_AUTO)) {
            break;
        }
    }

    return UCS_OK;
}

/* Lane for transport offloaded tag interface */
static ucs_status_t
ucp_wireup_add_tag_lane(const ucp_wireup_select_params_t *select_params,
                        const ucp_wireup_select_info_t *am_info,
                        ucp_err_handling_mode_t err_mode,
                        ucp_wireup_select_context_t *select_ctx)
{
    ucp_ep_h ep                          = select_params->ep;
    ucp_wireup_criteria_t criteria       = {0};
    ucp_wireup_select_info_t select_info = {0};
    ucs_status_t status;

    if (!(ucp_ep_get_context_features(ep) & UCP_FEATURE_TAG) ||
        /* TODO: remove check below when UCP_ERR_HANDLING_MODE_PEER supports
         *       RNDV-protocol or HW TM supports fragmented protocols
         */
        (err_mode != UCP_ERR_HANDLING_MODE_NONE)) {
        return UCS_OK;
    }

    criteria.title              = "tag_offload";
    criteria.local_md_flags     = UCT_MD_FLAG_REG; /* needed for posting tags to HW */
    criteria.remote_md_flags    = UCT_MD_FLAG_REG; /* needed for posting tags to HW */
    criteria.remote_iface_flags = /* the same as local_iface_flags */
    criteria.local_iface_flags  = UCT_IFACE_FLAG_TAG_EAGER_BCOPY |
                                  UCT_IFACE_FLAG_TAG_RNDV_ZCOPY  |
                                  UCT_IFACE_FLAG_GET_ZCOPY       |
                                  UCT_IFACE_FLAG_PENDING;
    criteria.calc_score         = ucp_wireup_am_score_func;

    if (ucs_test_all_flags(ucp_ep_get_context_features(ep),
                           UCP_FEATURE_WAKEUP)) {
        criteria.local_iface_flags |= UCP_WORKER_UCT_UNSIG_EVENT_CAP_FLAGS;
    }

    /* Do not add tag offload lane, if selected tag lane score is lower
     * than AM score. In this case AM will be used for tag macthing. */
    status = ucp_wireup_select_transport(select_params, &criteria,
                                         UINT64_MAX, UINT64_MAX, UINT64_MAX,
                                         UINT64_MAX, 0, &select_info);
    if ((status == UCS_OK) &&
        (ucp_score_cmp(select_info.score,
                       am_info->score) >= 0)) {
        ucp_wireup_add_lane(select_params, &select_info,
                            UCP_WIREUP_LANE_USAGE_TAG, select_ctx);
    }

    return UCS_OK;
}

static ucp_lane_index_t
ucp_wireup_select_wireup_msg_lane(ucp_worker_h worker,
                                  unsigned ep_init_flags,
                                  const ucp_address_entry_t *address_list,
                                  const ucp_wireup_lane_desc_t *lane_descs,
                                  ucp_lane_index_t num_lanes)
{
    ucp_context_h context          = worker->context;
    ucp_lane_index_t p2p_lane      = UCP_NULL_LANE;
    ucp_wireup_criteria_t criteria = {0};
    uct_tl_resource_desc_t *resource;
    ucp_rsc_index_t rsc_index;
    uct_iface_attr_t *attrs;
    ucp_lane_index_t lane;
    unsigned addr_index;

    ucp_wireup_fill_aux_criteria(&criteria, ep_init_flags);
    for (lane = 0; lane < num_lanes; ++lane) {
        rsc_index  = lane_descs[lane].rsc_index;
        addr_index = lane_descs[lane].addr_index;
        resource   = &context->tl_rscs[rsc_index].tl_rsc;
        attrs      = ucp_worker_iface_get_attr(worker, rsc_index);

        /* if the current lane satisfies the wireup criteria, choose it for wireup.
         * if it doesn't take a lane with a p2p transport */
        if (ucp_wireup_check_flags(resource,
                                   attrs->cap.flags,
                                   criteria.local_iface_flags, criteria.title,
                                   ucp_wireup_iface_flags, NULL, 0) &&
            ucp_wireup_check_flags(resource,
                                   address_list[addr_index].iface_attr.cap_flags,
                                   criteria.remote_iface_flags, criteria.title,
                                   ucp_wireup_iface_flags, NULL, 0))
         {
             return lane;
         } else if (ucp_worker_is_tl_p2p(worker, rsc_index)) {
             p2p_lane = lane;
         }
    }

    return p2p_lane;
}

static UCS_F_NOINLINE void
ucp_wireup_select_params_init(ucp_wireup_select_params_t *select_params,
                              ucp_ep_h ep, unsigned ep_init_flags,
                              const ucp_unpacked_address_t *remote_address,
                              uint64_t tl_bitmap, int show_error)
{
    select_params->ep            = ep;
    select_params->ep_init_flags = ep_init_flags;
    select_params->tl_bitmap     = tl_bitmap;
    select_params->address       = remote_address;
    select_params->allow_am      =
            ucp_wireup_allow_am_emulation_layer(ep_init_flags);
    select_params->show_error    = show_error;
}

static UCS_F_NOINLINE ucs_status_t
ucp_wireup_search_lanes(const ucp_wireup_select_params_t *select_params,
                        ucp_err_handling_mode_t err_mode,
                        ucp_wireup_select_context_t *select_ctx)
{
    ucp_wireup_select_info_t am_info;
    ucs_status_t status;

    memset(select_ctx, 0, sizeof(*select_ctx));

    status = ucp_wireup_add_cm_lane(select_params, select_ctx);
    if (status != UCS_OK) {
        return status;
    }

    status = ucp_wireup_add_rma_lanes(select_params, select_ctx);
    if (status != UCS_OK) {
        return status;
    }

    status = ucp_wireup_add_amo_lanes(select_params, select_ctx);
    if (status != UCS_OK) {
        return status;
    }

    /* Add AM lane only after RMA/AMO was selected to be aware
     * about whether they need emulation over AM or not */
    status = ucp_wireup_add_am_lane(select_params, &am_info, select_ctx);
    if (status != UCS_OK) {
        return status;
    }

    status = ucp_wireup_add_rma_bw_lanes(select_params, select_ctx);
    if (status != UCS_OK) {
        return status;
    }

    status = ucp_wireup_add_tag_lane(select_params, &am_info, err_mode,
                                     select_ctx);
    if (status != UCS_OK) {
        return status;
    }

    /* call ucp_wireup_add_am_bw_lanes after ucp_wireup_add_am_lane to
     * allow exclude AM lane from AM_BW list */
    status = ucp_wireup_add_am_bw_lanes(select_params, select_ctx);
    if (status != UCS_OK) {
        return status;
    }

    /* User should not create endpoints unless requested communication features */
    if (select_ctx->num_lanes == 0) {
        ucs_error("No transports selected to %s (features: 0x%lx)",
                  ucp_ep_peer_name(select_params->ep),
                  ucp_ep_get_context_features(select_params->ep));
        return UCS_ERR_UNREACHABLE;
    }

    return UCS_OK;
}

static UCS_F_NOINLINE void
ucp_wireup_construct_lanes(const ucp_wireup_select_params_t *select_params,
                           ucp_wireup_select_context_t *select_ctx,
                           unsigned *addr_indices, ucp_ep_config_key_t *key)
{
    ucp_ep_h ep           = select_params->ep;
    ucp_worker_h worker   = ep->worker;
    ucp_context_h context = worker->context;
    ucp_rsc_index_t rsc_index;
    ucp_md_index_t md_index;
    ucp_lane_index_t lane;
    ucp_lane_index_t i;

    key->num_lanes = select_ctx->num_lanes;
    /* Construct the endpoint configuration key:
     * - arrange lane description in the EP configuration
     * - create remote MD bitmap
     * - if AM lane exists and fits for wireup messages, select it for this purpose.
     */
    for (lane = 0; lane < key->num_lanes; ++lane) {
        ucs_assert(select_ctx->lane_descs[lane].usage != 0);
        key->lanes[lane].rsc_index    = select_ctx->lane_descs[lane].rsc_index;
        key->lanes[lane].proxy_lane   = select_ctx->lane_descs[lane].proxy_lane;
        key->lanes[lane].dst_md_index = select_ctx->lane_descs[lane].dst_md_index;
        addr_indices[lane]            = select_ctx->lane_descs[lane].addr_index;

        if (select_ctx->lane_descs[lane].usage & UCP_WIREUP_LANE_USAGE_CM) {
            ucs_assert(key->cm_lane == UCP_NULL_LANE);
            key->cm_lane = lane;
            /* CM lane can't be shared with TL usage */
            ucs_assert(select_ctx->lane_descs[lane].usage ==
                       UCP_WIREUP_LANE_USAGE_CM);
            continue;
        }
        if (select_ctx->lane_descs[lane].usage & UCP_WIREUP_LANE_USAGE_AM) {
            ucs_assert(key->am_lane == UCP_NULL_LANE);
            key->am_lane = lane;
        }
        if ((select_ctx->lane_descs[lane].usage & UCP_WIREUP_LANE_USAGE_AM_BW) &&
            (lane < UCP_MAX_LANES - 1)) {
            key->am_bw_lanes[lane + 1] = lane;
        }
        if (select_ctx->lane_descs[lane].usage & UCP_WIREUP_LANE_USAGE_RMA) {
            key->rma_lanes[lane] = lane;
        }
        if (select_ctx->lane_descs[lane].usage & UCP_WIREUP_LANE_USAGE_RMA_BW) {
            key->rma_bw_lanes[lane] = lane;
        }
        if (select_ctx->lane_descs[lane].usage & UCP_WIREUP_LANE_USAGE_AMO) {
            key->amo_lanes[lane] = lane;
        }
        if (select_ctx->lane_descs[lane].usage & UCP_WIREUP_LANE_USAGE_TAG) {
            ucs_assert(key->tag_lane == UCP_NULL_LANE);
            key->tag_lane = lane;
        }
    }

    /* Sort AM, RMA and AMO lanes according to score */
    ucs_qsort_r(key->am_bw_lanes + 1, UCP_MAX_LANES - 1, sizeof(ucp_lane_index_t),
                ucp_wireup_compare_lane_am_bw_score, select_ctx->lane_descs);
    ucs_qsort_r(key->rma_lanes, UCP_MAX_LANES, sizeof(ucp_lane_index_t),
                ucp_wireup_compare_lane_rma_score, select_ctx->lane_descs);
    ucs_qsort_r(key->rma_bw_lanes, UCP_MAX_LANES, sizeof(ucp_lane_index_t),
                ucp_wireup_compare_lane_rma_bw_score, select_ctx->lane_descs);
    ucs_qsort_r(key->amo_lanes, UCP_MAX_LANES, sizeof(ucp_lane_index_t),
                ucp_wireup_compare_lane_amo_score, select_ctx->lane_descs);

    if (!(select_params->ep_init_flags & (UCP_EP_INIT_CM_WIREUP_CLIENT |
                                          UCP_EP_INIT_CM_WIREUP_SERVER))) {
        /* Select lane for wireup messages */
        key->wireup_lane =
        ucp_wireup_select_wireup_msg_lane(worker,
                                          ucp_wireup_ep_init_flags(select_params,
                                                                   select_ctx),
                                          select_params->address->address_list,
                                          select_ctx->lane_descs,
                                          key->num_lanes);
    }

    /* add to map first UCP_MAX_OP_MDS fastest MD's */
    for (i = 0;
         (key->rma_bw_lanes[i] != UCP_NULL_LANE) &&
         (ucs_popcount(key->rma_bw_md_map) < UCP_MAX_OP_MDS); i++) {
        lane = key->rma_bw_lanes[i];
        rsc_index = select_ctx->lane_descs[lane].rsc_index;
        md_index  = context->tl_rscs[rsc_index].md_index;

        /* Pack remote key only if needed for RMA.
         * FIXME a temporary workaround to prevent the ugni uct from using rndv. */
        if ((context->tl_mds[md_index].attr.cap.flags & UCT_MD_FLAG_NEED_RKEY) &&
            !(strstr(context->tl_rscs[rsc_index].tl_rsc.tl_name, "ugni"))) {
            key->rma_bw_md_map |= UCS_BIT(md_index);
        }
    }

    /* use AM lane first for eager AM transport to simplify processing single/middle
     * msg packets */
    key->am_bw_lanes[0] = key->am_lane;
}

ucs_status_t
ucp_wireup_select_lanes(ucp_ep_h ep, unsigned ep_init_flags, uint64_t tl_bitmap,
                        const ucp_unpacked_address_t *remote_address,
                        unsigned *addr_indices, ucp_ep_config_key_t *key)
{
    ucp_worker_h worker         = ep->worker;
    uint64_t scalable_tl_bitmap = worker->scalable_tl_bitmap & tl_bitmap;
    ucp_wireup_select_context_t select_ctx;
    ucp_wireup_select_params_t select_params;
    ucs_status_t status;

    if (scalable_tl_bitmap) {
        ucp_wireup_select_params_init(&select_params, ep, ep_init_flags,
                                      remote_address, scalable_tl_bitmap, 0);
        status = ucp_wireup_search_lanes(&select_params, key->err_mode,
                                         &select_ctx);
        if (status == UCS_OK) {
            goto out;
        }

        /* If the transport selection based on the scalable TL bitmap wasn't
         * successful, repeat the selection procedure with full TL bitmap in
         * order to select best transports based on their scores only */
    }

    ucp_wireup_select_params_init(&select_params, ep, ep_init_flags,
                                  remote_address, tl_bitmap, 1);
    status = ucp_wireup_search_lanes(&select_params, key->err_mode,
                                     &select_ctx);
    if (status != UCS_OK) {
        return status;
    }

out:
    ucp_wireup_construct_lanes(&select_params, &select_ctx, addr_indices, key);
    return UCS_OK;
}

static double ucp_wireup_aux_score_func(ucp_context_h context,
                                        const uct_md_attr_t *md_attr,
                                        const uct_iface_attr_t *iface_attr,
                                        const ucp_address_iface_attr_t *remote_iface_attr)
{
    /* best end-to-end latency and larger bcopy size */
    return (1e-3 / (ucp_wireup_tl_iface_latency(context, iface_attr, remote_iface_attr) +
            iface_attr->overhead + remote_iface_attr->overhead));
}

ucs_status_t
ucp_wireup_select_aux_transport(ucp_ep_h ep, unsigned ep_init_flags,
                                const ucp_unpacked_address_t *remote_address,
                                ucp_wireup_select_info_t *select_info)
{
    ucp_wireup_criteria_t criteria = {0};
    ucp_wireup_select_params_t select_params;

    ucp_wireup_select_params_init(&select_params, ep, ep_init_flags,
                                  remote_address, UINT64_MAX, 1);
    ucp_wireup_fill_aux_criteria(&criteria, ep_init_flags);
    return ucp_wireup_select_transport(&select_params, &criteria,
                                       UINT64_MAX, UINT64_MAX, UINT64_MAX,
                                       UINT64_MAX, 1, select_info);
}

ucs_status_t
ucp_wireup_select_sockaddr_transport(const ucp_context_h context,
                                     const ucs_sock_addr_t *sockaddr,
                                     ucp_rsc_index_t *rsc_index_p)
{
    char saddr_str[UCS_SOCKADDR_STRING_LEN];
    ucp_tl_resource_desc_t *resource;
    ucp_rsc_index_t tl_id;
    ucp_md_index_t md_index;
    uct_md_h md;
    int i;

    /* Go over the sockaddr transports priority array and try to use the transports
     * one by one for the client side */
    for (i = 0; i < context->config.num_sockaddr_tls; i++) {
        tl_id    = context->config.sockaddr_tl_ids[i];
        resource = &context->tl_rscs[tl_id];
        md_index = resource->md_index;
        md       = context->tl_mds[md_index].md;

        ucs_assert(context->tl_mds[md_index].attr.cap.flags &
                   UCT_MD_FLAG_SOCKADDR);

        /* The client selects the transport for sockaddr according to the
         * configuration. We rely on the server having this transport available
         * as well */
        if (uct_md_is_sockaddr_accessible(md, sockaddr,
                                          UCT_SOCKADDR_ACC_REMOTE)) {
            *rsc_index_p = tl_id;
            ucs_debug("sockaddr transport selected: %s", resource->tl_rsc.tl_name);
            return UCS_OK;
        }

        ucs_debug("md %s cannot reach %s",
                  context->tl_mds[md_index].rsc.md_name,
                  ucs_sockaddr_str(sockaddr->addr, saddr_str,
                                   sizeof(saddr_str)));
    }

    return UCS_ERR_UNREACHABLE;
}