/**
 * Copyright (C) Mellanox Technologies Ltd. 2001-2015.  ALL RIGHTS RESERVED.
 * Copyright (C) ARM Ltd. 2016.  ALL RIGHTS RESERVED.
 * Copyright (C) Advanced Micro Devices, Inc. 2019. ALL RIGHTS RESERVED.
 *
 * See file LICENSE for terms.
 */


#ifndef UCP_CONTEXT_H_
#define UCP_CONTEXT_H_

#include "ucp_types.h"
#include "ucp_thread.h"

#include <ucp/api/ucp.h>
#include <uct/api/uct.h>
#include <ucs/datastruct/mpool.h>
#include <ucs/datastruct/queue_types.h>
#include <ucs/memory/memtype_cache.h>
#include <ucs/type/spinlock.h>
#include <ucs/sys/string.h>


/* Usage flags stored in ucp_tl_resource_desc_t::flags, describing how a
 * communication resource may be used. */
enum {
    /* The flag indicates that the resource may be used for auxiliary
     * wireup communications only */
    UCP_TL_RSC_FLAG_AUX      = UCS_BIT(0),
    /* The flag indicates that the resource may be used for client-server
     * connection establishment with a sockaddr */
    UCP_TL_RSC_FLAG_SOCKADDR = UCS_BIT(1)
};


/**
 * Tunable UCP context configuration. Stored both inside the user-visible
 * configuration (ucp_config_t::ctx) and in the context itself
 * (ucp_context_t::config.ext).
 */
typedef struct ucp_context_config {
    /** Threshold for switching UCP to buffered copy(bcopy) protocol */
    size_t                                 bcopy_thresh;
    /** Threshold for switching UCP to rendezvous protocol */
    size_t                                 rndv_thresh;
    /** Threshold for switching UCP to rendezvous protocol
     *  in ucp_tag_send_nbr() */
    size_t                                 rndv_send_nbr_thresh;
    /** Threshold for switching UCP to rendezvous protocol in case the calculated
     *  threshold is zero or negative */
    size_t                                 rndv_thresh_fallback;
    /** The percentage allowed for performance difference between rendezvous
     *  and the eager_zcopy protocol */
    double                                 rndv_perf_diff;
    /** Threshold for switching UCP to zero copy protocol */
    size_t                                 zcopy_thresh;
    /** Communication scheme in RNDV protocol */
    ucp_rndv_mode_t                        rndv_mode;
    /** Estimation of bcopy bandwidth */
    double                                 bcopy_bw;
    /** Segment size in the worker pre-registered memory pool */
    size_t                                 seg_size;
    /** RNDV pipeline fragment size */
    size_t                                 rndv_frag_size;
    /** Threshold for using tag matching offload capabilities. Smaller buffers
     *  will not be posted to the transport. */
    size_t                                 tm_thresh;
    /** Threshold for forcing tag matching offload capabilities */
    size_t                                 tm_force_thresh;
    /** Upper bound for posting tm offload receives with internal UCP
     *  preregistered bounce buffers. */
    size_t                                 tm_max_bb_size;
    /** Enabling SW rndv protocol with tag offload mode */
    int                                    tm_sw_rndv;
    /** Maximal size of worker name for debugging */
    unsigned                               max_worker_name;
    /** Atomic mode */
    ucp_atomic_mode_t                      atomic_mode;
    /** If use mutex for MT support or not */
    int                                    use_mt_mutex;
    /** On-demand progress */
    int                                    adaptive_progress;
    /** Eager-am multi-lane support */
    unsigned                               max_eager_lanes;
    /** Rendezvous-get multi-lane support */
    unsigned                               max_rndv_lanes;
    /** Estimated number of endpoints */
    size_t                                 estimated_num_eps;
    /** Estimated number of processes per node */
    size_t                                 estimated_num_ppn;
    /** Memtype cache */
    int                                    enable_memtype_cache;
    /** Enable flushing endpoints while flushing a worker */
    int                                    flush_worker_eps;
    /** Enable optimizations suitable for homogeneous systems */
    int                                    unified_mode;
    /** Enable cm wireup-and-close protocol for client-server connections */
    ucs_ternary_value_t                    sockaddr_cm_enable;
} ucp_context_config_t;


/**
 * Full user-visible UCP configuration, as parsed from the environment /
 * configuration file (see ucp_config_read() in the UCP API).
 */
struct ucp_config {
    /** Array of device lists names to use.
     *  This array holds three lists - network devices, shared memory devices
     *  and acceleration devices */
    ucs_config_names_array_t               devices[UCT_DEVICE_TYPE_LAST];
    /** Array of transport names to use */
    ucs_config_names_array_t               tls;
    /** Array of memory allocation methods */
    UCS_CONFIG_STRING_ARRAY_FIELD(methods) alloc_prio;
    /** Array of transports for partial worker address to pack */
    UCS_CONFIG_STRING_ARRAY_FIELD(aux_tls) sockaddr_aux_tls;
    /** Array of transports for client-server transports and port selection */
    UCS_CONFIG_STRING_ARRAY_FIELD(cm_tls)  sockaddr_cm_tls;
    /** Warn on invalid configuration */
    int                                    warn_invalid_config;
    /** Configuration saved directly in the context */
    ucp_context_config_t                   ctx;
};


/**
 * UCP communication resource descriptor: one UCT transport resource together
 * with the indices that tie it back to its memory domain and device.
 */
typedef struct ucp_tl_resource_desc {
    uct_tl_resource_desc_t        tl_rsc;     /* UCT resource descriptor */
    uint16_t                      tl_name_csum; /* Checksum of transport name */
    ucp_rsc_index_t               md_index;   /* Memory domain index (within the context) */
    ucp_rsc_index_t               dev_index;  /* Arbitrary device index. Resources
                                                 with same index have same device name. */
    uint8_t                       flags;      /* Flags that describe resource specifics
                                                 (UCP_TL_RSC_FLAG_*) */
} ucp_tl_resource_desc_t;


/**
 * Transport alias: maps a short alias name to the list of real transport
 * names it selects. The tls array is NULL-terminated when fewer than 8
 * transports are listed (unspecified initializers are null pointers).
 */
typedef struct ucp_tl_alias {
    const char                    *alias;   /* Alias name */
    const char                    *tls[8];  /* Transports which are selected
                                               by the alias */
} ucp_tl_alias_t;


/**
 * UCT component handle together with its cached attributes.
 */
typedef struct ucp_tl_cmpt {
    uct_component_h               cmpt;      /* UCT component handle */
    uct_component_attr_t          attr;      /* UCT component attributes */
} ucp_tl_cmpt_t;


/**
 * Memory domain: an opened UCT MD plus its resource descriptor, cached
 * attributes, and the index of the component it was opened from.
 */
typedef struct ucp_tl_md {
    uct_md_h                      md;         /* Memory domain handle */
    ucp_rsc_index_t               cmpt_index; /* Index of owning component
                                                 (into ucp_context_t::tl_cmpts) */
    uct_md_resource_desc_t        rsc;        /* Memory domain resource */
    uct_md_attr_t                 attr;       /* Memory domain attributes */
} ucp_tl_md_t;


/**
 * UCP context: top-level object holding the discovered UCT components,
 * memory domains and transport resources, plus the effective configuration.
 */
typedef struct ucp_context {

    ucp_tl_cmpt_t                 *tl_cmpts;  /* UCT components */
    ucp_rsc_index_t               num_cmpts;  /* Number of UCT components */

    ucp_tl_md_t                   *tl_mds;    /* Memory domain resources */
    ucp_rsc_index_t               num_mds;    /* Number of memory domains */

    /* List of MDs which detect non host memory type */
    ucp_rsc_index_t               mem_type_detect_mds[UCS_MEMORY_TYPE_LAST];
    ucp_rsc_index_t               num_mem_type_detect_mds;  /* Number of mem type MDs */
    ucs_memtype_cache_t           *memtype_cache;           /* mem type allocation cache
                                                               (NULL when disabled) */

    ucp_tl_resource_desc_t        *tl_rscs;   /* Array of communication resources */
    uint64_t                      tl_bitmap;  /* Cached map of tl resources used by workers.
                                               * Not all resources may be used if unified
                                               * mode is enabled. */
    ucp_rsc_index_t               num_tls;    /* Number of resources in the array */

    /* Mask of memory type communication resources */
    uint64_t                      mem_type_access_tls[UCS_MEMORY_TYPE_LAST];

    struct {

        /* Bitmap of features supported by the context */
        uint64_t                  features;
        /* NOTE(review): presumably the tag sender mask requested via
         * ucp_params at ucp_init() - confirm against context init code */
        uint64_t                  tag_sender_mask;

        /* How many endpoints are expected to be created */
        int                       est_num_eps;

        /* How many endpoints are expected to be created on single node */
        int                       est_num_ppn;

        struct {
            size_t                         size;    /* Request size for user */
            ucp_request_init_callback_t    init;    /* Initialization user callback */
            ucp_request_cleanup_callback_t cleanup; /* Cleanup user callback */
        } request;

        /* Array of allocation methods, a mix of MD allocation methods and non-MD */
        struct {
            /* Allocation method */
            uct_alloc_method_t    method;

            /* Component name to use, if method is MD */
            char                  cmpt_name[UCT_COMPONENT_NAME_MAX];
        } *alloc_methods;
        unsigned                  num_alloc_methods;

        /* Cached map of components which support CM capability */
        uint64_t                  cm_cmpts_bitmap;

        /* Bitmap of sockaddr auxiliary transports to pack for client/server flow */
        uint64_t                  sockaddr_aux_rscs_bitmap;

        /* Array of sockaddr transports indexes.
         * The indexes appear in the configured priority order */
        ucp_rsc_index_t           sockaddr_tl_ids[UCP_MAX_RESOURCES];
        ucp_rsc_index_t           num_sockaddr_tls;
        /* Array of CMs indexes. The indexes appear in the configured priority
         * order. */
        ucp_rsc_index_t           cm_cmpt_idxs[UCP_MAX_RESOURCES];
        ucp_rsc_index_t           num_cm_cmpts;

        /* Configuration supplied by the user */
        ucp_context_config_t      ext;

    } config;

    /* All configurations about multithreading support */
    ucp_mt_lock_t                 mt_lock;

} ucp_context_t;


/**
 * Active message handler entry; populated at static-init time by the
 * UCP_DEFINE_AM() and UCP_DEFINE_AM_PROXY() macros below.
 */
typedef struct ucp_am_handler {
    uint64_t                      features;  /* Feature flags this AM belongs to */
    uct_am_callback_t             cb;        /* Receive callback */
    ucp_am_tracer_t               tracer;    /* Tracing/dump callback */
    uint32_t                      flags;     /* UCT AM callback flags */
    uct_am_callback_t             proxy_cb;  /* Counting proxy callback, set by
                                                UCP_DEFINE_AM_PROXY() */
} ucp_am_handler_t;

/**
 * Required atomic capabilities for 32-bit and 64-bit operands, as bitmaps of
 * UCT_ATOMIC_OP_* values (see ucp_context_uct_atomic_iface_flags()).
 */
typedef struct ucp_tl_iface_atomic_flags {
    struct {
        uint64_t                  op_flags;  /**< Attributes for atomic-post operations */
        uint64_t                  fop_flags; /**< Attributes for atomic-fetch operations */
    } atomic32, atomic64;
} ucp_tl_iface_atomic_flags_t;

/* Bitmap of UCT atomic-post operations that UCP relies on */
#define UCP_ATOMIC_OP_MASK  (UCS_BIT(UCT_ATOMIC_OP_ADD)  | \
                             UCS_BIT(UCT_ATOMIC_OP_AND)  | \
                             UCS_BIT(UCT_ATOMIC_OP_OR)   | \
                             UCS_BIT(UCT_ATOMIC_OP_XOR))

/* Bitmap of UCT atomic-fetch operations that UCP relies on (the post set
 * plus swap and compare-and-swap) */
#define UCP_ATOMIC_FOP_MASK (UCS_BIT(UCT_ATOMIC_OP_ADD)  | \
                             UCS_BIT(UCT_ATOMIC_OP_AND)  | \
                             UCS_BIT(UCT_ATOMIC_OP_OR)   | \
                             UCS_BIT(UCT_ATOMIC_OP_XOR)  | \
                             UCS_BIT(UCT_ATOMIC_OP_SWAP) | \
                             UCS_BIT(UCT_ATOMIC_OP_CSWAP))


/*
 * Define UCP active message handler: registers, at static initialization
 * time, the callback/tracer/flags for AM id _id in the global
 * ucp_am_handlers[] table. The proxy_cb field is set separately by
 * UCP_DEFINE_AM_PROXY().
 */
#define UCP_DEFINE_AM(_features, _id, _cb, _tracer, _flags) \
    UCS_STATIC_INIT { \
        ucp_am_handlers[_id].features = _features; \
        ucp_am_handlers[_id].cb       = _cb; \
        ucp_am_handlers[_id].tracer   = _tracer; \
        ucp_am_handlers[_id].flags    = _flags; \
    }


/**
 * Defines a proxy handler which counts received messages on ucp_worker_iface_t
 * context. It's used to determine if there is activity on a transport interface.
 * The generated function bumps wiface->proxy_recv_count and then delegates to
 * the real handler registered by UCP_DEFINE_AM() for the same id.
 */
#define UCP_DEFINE_AM_PROXY(_id) \
    \
    static ucs_status_t \
    ucp_am_##_id##_counting_proxy(void *arg, void *data, size_t length, \
                                  unsigned flags) \
    { \
        ucp_worker_iface_t *wiface = arg; \
        wiface->proxy_recv_count++; \
        return ucp_am_handlers[_id].cb(wiface->worker, data, length, flags); \
    } \
    \
    UCS_STATIC_INIT { \
        ucp_am_handlers[_id].proxy_cb = ucp_am_##_id##_counting_proxy; \
    }


/**
 * Check that a pointer parameter is non-NULL; on violation, log an error,
 * set _status to UCS_ERR_INVALID_PARAM, and execute _action (e.g. "return"
 * or "goto err_out").
 *
 * Wrapped in do { } while (0) so the macro expands to a single statement:
 * the previous bare "if { }" followed by a stray ';' broke compilation when
 * the macro was used as the body of an if/else.
 */
#define UCP_CHECK_PARAM_NON_NULL(_param, _status, _action) \
    do { \
        if ((_param) == NULL) { \
            ucs_error("the parameter %s must not be NULL", #_param); \
            (_status) = UCS_ERR_INVALID_PARAM; \
            _action; \
        } \
    } while (0)


/**
 * Check that at least one feature flag from @a _flags was requested for the
 * context at ucp_init(); otherwise log an error naming the missing flags and
 * execute @a _action. The whole check compiles away unless
 * ENABLE_PARAMS_CHECK is set.
 */
#define UCP_CONTEXT_CHECK_FEATURE_FLAGS(_context, _flags, _action) \
    do { \
        if (ENABLE_PARAMS_CHECK && \
            ucs_unlikely(!((_context)->config.features & (_flags)))) {  \
            size_t feature_list_str_max = 512; \
            char *feature_list_str = ucs_alloca(feature_list_str_max);  \
            ucs_error("feature flags %s were not set for ucp_init()", \
                      ucs_flags_str(feature_list_str, feature_list_str_max,  \
                                    (_flags) & ~(_context)->config.features, \
                                    ucp_feature_str)); \
            _action; \
        } \
    } while (0)


/**
 * Read field @a _name from a UCP parameters struct, falling back to
 * @a _default when the UCP_<_obj>_PARAM_FIELD_<_flag> bit is not set in the
 * struct's field_mask.
 */
#define UCP_PARAM_VALUE(_obj, _params, _name, _flag, _default) \
    (((_params)->field_mask & (UCP_##_obj##_PARAM_FIELD_##_flag)) ? \
                    (_params)->_name : (_default))


/* Assert (debug builds only) that the detected memory type of the buffer
 * matches the expected _mem_type */
#define ucp_assert_memtype(_context, _buffer, _length, _mem_type) \
    ucs_assert(ucp_memory_type_detect(_context, _buffer, _length) == (_mem_type))


/* Global AM handler table, indexed by AM id (filled by UCP_DEFINE_AM) */
extern ucp_am_handler_t ucp_am_handlers[];
/* Feature flag names, used with ucs_flags_str() for error reporting */
extern const char       *ucp_feature_str[];

/* Dump a packet payload into a string buffer for debugging/tracing */
void ucp_dump_payload(ucp_context_h context, char *buffer, size_t max,
                      const void *data, size_t length);

/* Enable tag-matching offload support on the context */
void ucp_context_tag_offload_enable(ucp_context_h context);

/* Fill @atomic with the UCT atomic capabilities the context requires */
void ucp_context_uct_atomic_iface_flags(ucp_context_h context,
                                        ucp_tl_iface_atomic_flags_t *atomic);

/* Reverse-lookup a transport name from its checksum
 * (see ucp_tl_resource_desc_t::tl_name_csum); NOTE(review): presumably
 * returns NULL when not found - confirm in the implementation */
const char * ucp_find_tl_name_by_csum(ucp_context_t *context, uint16_t tl_name_csum);

/* Render a TL resource bitmap as a human-readable string into @str */
const char* ucp_tl_bitmap_str(ucp_context_h context, uint64_t tl_bitmap,
                              char *str, size_t max_str_len);

/* Render feature flags as a human-readable string into @str */
const char* ucp_feature_flags_str(unsigned feature_flags, char *str,
                                  size_t max_str_len);

/* Detect the memory type of a buffer by querying the memory-type-detecting
 * MDs (slow path used by ucp_memory_type_detect()) */
ucs_memory_type_t
ucp_memory_type_detect_mds(ucp_context_h context, const void *address, size_t length);

/**
 * Compute a comparison tolerance for two floating-point values: a tiny
 * fraction (1e-6) of their sum, used to absorb float imprecision when
 * comparing scores.
 */
static UCS_F_ALWAYS_INLINE
double ucp_calc_epsilon(double val1, double val2)
{
    double sum = val1 + val2;

    return sum * (1e-6);
}

/**
 * Compare two scores with an epsilon tolerance (see ucp_calc_epsilon()).
 *
 * @return  0 if the scores are equal within the tolerance,
 *         -1 if score1 < score2,
 *          1 if score1 > score2.
 */
static UCS_F_ALWAYS_INLINE
int ucp_score_cmp(double score1, double score2)
{
    double delta = score1 - score2;

    if (fabs(delta) < ucp_calc_epsilon(score1, score2)) {
        return 0;
    }

    return ucs_signum(delta);
}

/**
 * Compare two scores, breaking ties by priority: the score comparison
 * (with epsilon tolerance) wins when it is decisive, otherwise the sign of
 * (prio1 - prio2) decides.
 */
static UCS_F_ALWAYS_INLINE
int ucp_score_prio_cmp(double score1, int prio1, double score2, int prio2)
{
    int by_score = ucp_score_cmp(score1, score2);

    if (by_score != 0) {
        return by_score;
    }

    return ucs_signum(prio1 - prio2);
}

/* A transport is considered scalable when it can hold at least as many
 * endpoints as the context expects to create (config.est_num_eps) */
static UCS_F_ALWAYS_INLINE
int ucp_is_scalable_transport(ucp_context_h context, size_t max_num_eps)
{
    size_t est_num_eps = (size_t)context->config.est_num_eps;

    return max_num_eps >= est_num_eps;
}

/* Estimated one-message latency of an interface: the fixed overhead plus
 * the per-endpoint growth term scaled by the expected endpoint count */
static UCS_F_ALWAYS_INLINE double
ucp_tl_iface_latency(ucp_context_h context, const uct_iface_attr_t *iface_attr)
{
    double growth_term = iface_attr->latency.growth *
                         context->config.est_num_eps;

    return iface_attr->latency.overhead + growth_term;
}

/* Effective per-process bandwidth: the dedicated part plus the shared part
 * divided among the expected number of processes per node */
static UCS_F_ALWAYS_INLINE double
ucp_tl_iface_bandwidth(ucp_context_h context, const uct_ppn_bandwidth_t *bandwidth)
{
    double shared_part = bandwidth->shared / context->config.est_num_ppn;

    return bandwidth->dedicated + shared_part;
}

/* Return nonzero when the memtype cache exists but currently tracks no
 * regions (i.e. no non-host memory has been registered) */
static UCS_F_ALWAYS_INLINE int ucp_memory_type_cache_is_empty(ucp_context_h context)
{
    if (context->memtype_cache == NULL) {
        return 0;
    }

    return context->memtype_cache->pgtable.num_regions == 0;
}

/**
 * Detect the memory type of the buffer [address, address+length).
 *
 * Fast paths: if no MD can detect non-host memory, or the memtype cache
 * exists and rules the address out, the buffer is host memory. Otherwise
 * fall back to querying the memory-type-detecting MDs.
 */
static UCS_F_ALWAYS_INLINE ucs_memory_type_t
ucp_memory_type_detect(ucp_context_h context, const void *address, size_t length)
{
    ucs_memory_type_t mem_type;
    ucs_status_t lookup_status;

    if (ucs_likely(context->num_mem_type_detect_mds == 0)) {
        return UCS_MEMORY_TYPE_HOST;
    }

    if (ucs_likely(context->memtype_cache != NULL)) {
        /* An empty cache means no non-host regions are registered at all */
        if (context->memtype_cache->pgtable.num_regions == 0) {
            return UCS_MEMORY_TYPE_HOST;
        }

        lookup_status = ucs_memtype_cache_lookup(context->memtype_cache,
                                                 address, length, &mem_type);
        if (lookup_status != UCS_OK) {
            /* Address not found in the cache - treat as host memory */
            ucs_assert(lookup_status == UCS_ERR_NO_ELEM);
            return UCS_MEMORY_TYPE_HOST;
        }

        if (mem_type != UCS_MEMORY_TYPE_LAST) {
            return mem_type;
        }

        /* mem_type is UCS_MEMORY_TYPE_LAST: the region is known but its
         * exact type is not - fall through to detection by the UCT MDs */
    }

    return ucp_memory_type_detect_mds(context, address, length);
}

/* Bitmap of TL resources (indexes into context->tl_rscs) that belong to the
 * device with the given name */
uint64_t ucp_context_dev_tl_bitmap(ucp_context_h context, const char *dev_name);

/* Bitmap of TL resources that belong to the device with the given
 * dev_index (see ucp_tl_resource_desc_t::dev_index) */
uint64_t ucp_context_dev_idx_tl_bitmap(ucp_context_h context,
                                       ucp_rsc_index_t dev_idx);

#endif