/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2014. ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/
#ifndef UCT_UD_IFACE_H
#define UCT_UD_IFACE_H
#include <uct/base/uct_worker.h>
#include <uct/ib/base/ib_iface.h>
#include <ucs/datastruct/sglib_wrapper.h>
#include <ucs/datastruct/ptr_array.h>
#include <ucs/datastruct/sglib.h>
#include <ucs/datastruct/list.h>
#include <ucs/datastruct/arbiter.h>
#include <ucs/async/async.h>
#include <ucs/time/timer_wheel.h>
#include <ucs/sys/compiler_def.h>
#include "ud_def.h"
#include "ud_ep.h"
#include "ud_iface_common.h"
BEGIN_C_DECLS
#define UCT_UD_MIN_TIMER_TIMER_BACKOFF 1.0
/** @file ud_iface.h */
enum {
UCT_UD_IFACE_STAT_RX_DROP,
UCT_UD_IFACE_STAT_LAST
};
/* TODO: maybe tx_moderation can be defined at compile time, since TX completions are used only to know how much space is available in the TX QP */
typedef struct uct_ud_iface_config {
uct_ib_iface_config_t super;
uct_ud_iface_common_config_t ud_common;
double peer_timeout;
double slow_timer_tick;
double slow_timer_backoff;
int dgid_check;
unsigned max_window;
} uct_ud_iface_config_t;
struct uct_ud_iface_peer {
uct_ud_iface_peer_t *next;
union ibv_gid dgid;
uint16_t dlid;
uint32_t dst_qpn;
uint32_t conn_id_last;
ucs_list_link_t ep_list; /* ep list ordered by connection id */
};
/* Returns 0 iff the peers are equal; a nonzero result only indicates
 * inequality. The boolean result of || is sufficient because this
 * comparator is used as an equality test, not for ordering. */
static inline int uct_ud_iface_peer_cmp(uct_ud_iface_peer_t *a, uct_ud_iface_peer_t *b) {
    return (int)a->dst_qpn - (int)b->dst_qpn ||
           memcmp(a->dgid.raw, b->dgid.raw, sizeof(union ibv_gid)) ||
           (int)a->dlid - (int)b->dlid;
}
/* Hash the peer address over the dlid and both 64-bit halves of the GID */
static inline int uct_ud_iface_peer_hash(uct_ud_iface_peer_t *a) {
    return (a->dlid + a->dgid.global.interface_id + a->dgid.global.subnet_prefix)
           % UCT_UD_HASH_SIZE;
}
SGLIB_DEFINE_LIST_PROTOTYPES(uct_ud_iface_peer_t, uct_ud_iface_peer_cmp, next)
SGLIB_DEFINE_HASHED_CONTAINER_PROTOTYPES(uct_ud_iface_peer_t, UCT_UD_HASH_SIZE,
uct_ud_iface_peer_hash)
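/*
 * Example (illustrative sketch, not part of the API): with the SGLIB
 * prototypes above, a peer keyed by (dst_qpn, dgid, dlid) could be looked
 * up in the hash table roughly as follows. The function name is
 * hypothetical; only the fields read by the cmp/hash functions need to be
 * set in the key.
 */
static inline uct_ud_iface_peer_t *
uct_ud_iface_peer_example_find(uct_ud_iface_peer_t **peers_hash,
                               uint16_t dlid, const union ibv_gid *dgid,
                               uint32_t dst_qpn)
{
    uct_ud_iface_peer_t key;

    key.dlid    = dlid;
    key.dgid    = *dgid;
    key.dst_qpn = dst_qpn;
    /* find_member matches with uct_ud_iface_peer_cmp(), which returns 0
     * iff the keys are equal */
    return sglib_hashed_uct_ud_iface_peer_t_find_member(peers_hash, &key);
}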
#if UCT_UD_EP_DEBUG_HOOKS
typedef ucs_status_t (*uct_ud_iface_hook_t)(uct_ud_iface_t *iface, uct_ud_neth_t *neth);
#define UCT_UD_IFACE_HOOK_DECLARE(_name) \
uct_ud_iface_hook_t _name;
#define UCT_UD_IFACE_HOOK_CALL_RX(_iface, _neth, _len) \
if ((_iface)->rx.hook(_iface, _neth) != UCS_OK) { \
ucs_trace_data("RX: dropping packet"); \
return; \
}
#define UCT_UD_IFACE_HOOK_INIT(_iface) { \
(_iface)->rx.hook = uct_ud_iface_null_hook; \
}
static inline ucs_status_t uct_ud_iface_null_hook(uct_ud_iface_t *iface,
uct_ud_neth_t *neth)
{
return UCS_OK;
}
#else
#define UCT_UD_IFACE_HOOK_DECLARE(_name)
#define UCT_UD_IFACE_HOOK_CALL_RX(_iface, _neth, _len)
#define UCT_UD_IFACE_HOOK_INIT(_iface)
#endif
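/*
 * Example (illustrative, only meaningful when built with
 * UCT_UD_EP_DEBUG_HOOKS): a debug hook that drops every 16th received
 * packet to exercise the retransmission path. Any status other than UCS_OK
 * makes UCT_UD_IFACE_HOOK_CALL_RX drop the packet. It would be installed
 * with: iface->rx.hook = uct_ud_iface_example_drop_hook;
 */
#if UCT_UD_EP_DEBUG_HOOKS
static inline ucs_status_t
uct_ud_iface_example_drop_hook(uct_ud_iface_t *iface, uct_ud_neth_t *neth)
{
    static unsigned count = 0; /* per-compilation-unit counter; fine for a
                                  debug sketch */

    return ((++count % 16) == 0) ? UCS_ERR_CANCELED : UCS_OK;
}
#endif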
typedef struct uct_ud_iface_ops {
uct_ib_iface_ops_t super;
unsigned (*async_progress)(uct_ud_iface_t *iface);
void (*tx_skb)(uct_ud_ep_t *ep, uct_ud_send_skb_t *skb,
int solicited);
void (*ep_free)(uct_ep_h ep);
ucs_status_t (*create_qp)(uct_ib_iface_t *iface, uct_ib_qp_attr_t *attr,
struct ibv_qp **qp_p);
} uct_ud_iface_ops_t;
struct uct_ud_iface {
uct_ib_iface_t super;
struct ibv_qp *qp;
struct {
ucs_mpool_t mp;
unsigned available;
unsigned quota;
ucs_queue_head_t pending_q;
UCT_UD_IFACE_HOOK_DECLARE(hook)
} rx;
struct {
uct_ud_send_skb_t *skb; /* ready to use skb */
uct_ud_send_skb_inl_t skb_inl;
ucs_mpool_t mp;
/* async events arrived, but the pending queue has not been dispatched yet */
uint8_t async_before_pending;
int16_t available;
unsigned unsignaled;
/* pool of skbs that are reserved for retransmissions */
ucs_queue_head_t resend_skbs;
unsigned resend_skbs_quota;
ucs_arbiter_t pending_q;
ucs_queue_head_t async_comp_q;
} tx;
struct {
ucs_time_t peer_timeout;
double slow_timer_backoff;
unsigned tx_qp_len;
unsigned max_inline;
int check_grh_dgid;
unsigned gid_len;
unsigned max_window;
} config;
UCS_STATS_NODE_DECLARE(stats)
ucs_ptr_array_t eps;
uct_ud_iface_peer_t *peers[UCT_UD_HASH_SIZE];
struct {
ucs_twheel_t slow_timer;
ucs_time_t slow_tick;
int timer_id;
} async;
};
UCS_CLASS_DECLARE(uct_ud_iface_t, uct_ud_iface_ops_t*, uct_md_h,
uct_worker_h, const uct_iface_params_t*,
const uct_ud_iface_config_t*,
uct_ib_iface_init_attr_t*)
struct uct_ud_ctl_hdr {
uint8_t type;
uint8_t reserved[3];
union {
struct {
uct_ud_ep_addr_t ep_addr;
uint32_t conn_id;
} conn_req;
struct {
uint32_t src_ep_id;
} conn_rep;
uint32_t data;
};
uct_ud_peer_name_t peer;
/* For CREQ packet, IB address follows */
} UCS_S_PACKED;
extern ucs_config_field_t uct_ud_iface_config_table[];
ucs_status_t uct_ud_iface_query(uct_ud_iface_t *iface, uct_iface_attr_t *iface_attr);
void uct_ud_iface_release_desc(uct_recv_desc_t *self, void *desc);
ucs_status_t uct_ud_iface_get_address(uct_iface_h tl_iface, uct_iface_addr_t *addr);
void uct_ud_iface_add_ep(uct_ud_iface_t *iface, uct_ud_ep_t *ep);
void uct_ud_iface_remove_ep(uct_ud_iface_t *iface, uct_ud_ep_t *ep);
void uct_ud_iface_replace_ep(uct_ud_iface_t *iface, uct_ud_ep_t *old_ep, uct_ud_ep_t *new_ep);
ucs_status_t uct_ud_iface_flush(uct_iface_h tl_iface, unsigned flags,
uct_completion_t *comp);
ucs_status_t uct_ud_iface_complete_init(uct_ud_iface_t *iface);
void uct_ud_iface_remove_async_handlers(uct_ud_iface_t *iface);
void uct_ud_dump_packet(uct_base_iface_t *iface, uct_am_trace_type_t type,
void *data, size_t length, size_t valid_length,
char *buffer, size_t max);
static UCS_F_ALWAYS_INLINE int uct_ud_iface_can_tx(uct_ud_iface_t *iface)
{
return iface->tx.available > 0;
}
static UCS_F_ALWAYS_INLINE int uct_ud_iface_has_skbs(uct_ud_iface_t *iface)
{
return iface->tx.skb || !ucs_mpool_is_empty(&iface->tx.mp);
}
uct_ud_send_skb_t *uct_ud_iface_resend_skb_get(uct_ud_iface_t *iface);
static inline void
uct_ud_iface_resend_skb_put(uct_ud_iface_t *iface, uct_ud_send_skb_t *skb)
{
if (skb != ucs_unaligned_ptr(&iface->tx.skb_inl.super)) {
ucs_queue_push(&iface->tx.resend_skbs, &skb->queue);
}
}
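/*
 * Example (illustrative): the typical pairing of the resend-skb helpers
 * during a retransmission. The real logic lives in the ep/iface
 * implementation; this only shows the intended get/put discipline, and the
 * function name is hypothetical.
 */
static inline void uct_ud_iface_example_resend(uct_ud_iface_t *iface)
{
    uct_ud_send_skb_t *skb = uct_ud_iface_resend_skb_get(iface);

    /* ... copy the unacknowledged packet into skb and post it ... */

    /* once the send completes, the skb goes back to the reserved pool
     * (unless it is the interface's inline skb) */
    uct_ud_iface_resend_skb_put(iface, skb);
}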
static inline uct_ib_address_t* uct_ud_creq_ib_addr(uct_ud_ctl_hdr_t *conn_req)
{
ucs_assert(conn_req->type == UCT_UD_PACKET_CREQ);
return (uct_ib_address_t*)(conn_req + 1);
}
static UCS_F_ALWAYS_INLINE void uct_ud_enter(uct_ud_iface_t *iface)
{
UCS_ASYNC_BLOCK(iface->super.super.worker->async);
}
static UCS_F_ALWAYS_INLINE void uct_ud_leave(uct_ud_iface_t *iface)
{
UCS_ASYNC_UNBLOCK(iface->super.super.worker->async);
}
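/*
 * Example (illustrative): any path that mutates interface or endpoint
 * state which the async (timer) context can also touch must be bracketed
 * by uct_ud_enter()/uct_ud_leave(). The function name is hypothetical.
 */
static inline void uct_ud_iface_example_locked_op(uct_ud_iface_t *iface)
{
    uct_ud_enter(iface);
    /* ... modify tx queues, endpoint state, etc. ... */
    uct_ud_leave(iface);
}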
static UCS_F_ALWAYS_INLINE int
uct_ud_iface_check_grh(uct_ud_iface_t *iface, void *grh_end, int is_grh_present)
{
    void *dest_gid, *local_gid;

    if (!iface->config.check_grh_dgid) {
        return 1;
    }

    if (ucs_unlikely(!is_grh_present)) {
        ucs_warn("RoCE packet does not contain GRH");
        return 1;
    }

    /* Compare the last config.gid_len bytes of the destination GID in the
     * GRH against the same tail of the local GID; drop the packet on a
     * mismatch. */
    local_gid = (char*)iface->super.gid.raw + (16 - iface->config.gid_len);
    dest_gid  = (char*)grh_end - iface->config.gid_len;
    if (memcmp(local_gid, dest_gid, iface->config.gid_len)) {
        UCS_STATS_UPDATE_COUNTER(iface->stats, UCT_UD_IFACE_STAT_RX_DROP, 1);
        ucs_trace_data("Drop packet with wrong dgid");
        return 0;
    }

    return 1;
}
/*
   Management of connecting endpoints (cep)

   Such endpoints are created either by explicitly calling
   ep_create_connected() or implicitly as a result of the UD connection
   protocol. Calling ep_create_connected() may reuse an already existing
   endpoint that was implicitly created.

   UD connection protocol

   The protocol allows connection establishment in an environment where UD
   packets can be dropped, duplicated or reordered. The connection is done
   as a 3-way handshake:

   1: CREQ (src_if_addr, src_ep_addr, conn_id)
      Connection request. It includes the source interface address, the
      source ep address and the connection id.

      The connection id is essentially a counter of the endpoints created
      by ep_create_connected(). The counter is kept per destination
      interface. The purpose of conn_id is to ensure ordering between
      multiple CREQ packets and to handle simultaneous connection
      establishment, that is, the case when both sides call
      ep_create_connected(). The rule is that connected endpoints must have
      the same conn_id.

   2: CREP (dest_ep_id)
      Connection reply. It includes the id of the destination endpoint and
      optionally an ACK request flag. From this point on, reliability is
      handled by the UD protocol, as the source and destination endpoint
      ids are known.

      An endpoint may be created upon reception of a CREQ. It is possible
      that the endpoint already exists, either because the CREQ was
      retransmitted or because of a simultaneous connection. In any case
      the endpoint's connection id must be equal to the connection id in
      the CREQ.

   3: ACK
      Ack on the connection reply. It may be sent as part of a data packet.

   Implicit endpoint reuse

   Endpoints created upon reception of a CREQ can be reused when the
   application calls ep_create_connected().

   Data structure

   Hash table and doubly linked sorted list:
     hash(src_if_addr) -> peer -> ep (list sorted in descending order)

   The list is used to save memory (8 bytes instead of the 500-1000 bytes
   of a hash table). In many cases the list provides fast lookup and
   insertion: most connection requests are expected to arrive in order, in
   which case insertion is O(1) because it is done at the head of the list.
   Lookup is O(number of 'passive' eps), which is expected to be small.

   TODO: add and maintain a pointer to the list element whose conn_id is
   equal to conn_last_id. This will allow O(1) endpoint lookup.

   Connection id assignment:

     0 1 ... conn_last_id, +1, +2, ... UCT_UD_EP_CONN_ID_MAX

   Ids up to (but not including) conn_last_id are already assigned to
   endpoints. Any endpoint with conn_id >= conn_last_id was created upon
   reception of a CREQ; there may be holes because CREQs are not received
   in order.

   A call to ep_create_connected() will try to reuse the endpoint with
   conn_id == conn_last_id. If there is no such endpoint, a new endpoint
   with id conn_last_id is created. In both cases conn_last_id is then
   incremented.
 */
void uct_ud_iface_cep_init(uct_ud_iface_t *iface);
/* Find the ep that is connected to (src_ib_addr, src_if_addr). If
 * conn_id == UCT_UD_EP_CONN_ID_MAX, try to reuse the ep with
 * conn_id == conn_last_id.
 */
uct_ud_ep_t *uct_ud_iface_cep_lookup(uct_ud_iface_t *iface,
const uct_ib_address_t *src_ib_addr,
const uct_ud_iface_addr_t *src_if_addr,
uint32_t conn_id);
/* remove ep */
void uct_ud_iface_cep_remove(uct_ud_ep_t *ep);
/*
* rollback last ordered insert (conn_id == UCT_UD_EP_CONN_ID_MAX).
*/
void uct_ud_iface_cep_rollback(uct_ud_iface_t *iface,
const uct_ib_address_t *src_ib_addr,
const uct_ud_iface_addr_t *src_if_addr,
uct_ud_ep_t *ep);
/* insert new ep that is connected to src_if_addr */
ucs_status_t uct_ud_iface_cep_insert(uct_ud_iface_t *iface,
const uct_ib_address_t *src_ib_addr,
const uct_ud_iface_addr_t *src_if_addr,
uct_ud_ep_t *ep, uint32_t conn_id);
void uct_ud_iface_cep_cleanup(uct_ud_iface_t *iface);
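/*
 * Example (illustrative sketch of the scheme described above): how
 * ep_create_connected() might combine the cep calls. Error handling and
 * endpoint construction are elided; all names other than the cep API are
 * hypothetical.
 */
static inline ucs_status_t
uct_ud_iface_example_get_or_create(uct_ud_iface_t *iface,
                                   const uct_ib_address_t *src_ib_addr,
                                   const uct_ud_iface_addr_t *src_if_addr,
                                   uct_ud_ep_t *new_ep, uct_ud_ep_t **ep_p)
{
    uct_ud_ep_t *ep;

    /* try to reuse an endpoint implicitly created by a CREQ with
     * conn_id == conn_last_id */
    ep = uct_ud_iface_cep_lookup(iface, src_ib_addr, src_if_addr,
                                 UCT_UD_EP_CONN_ID_MAX);
    if (ep != NULL) {
        *ep_p = ep;
        return UCS_OK;
    }

    /* otherwise insert a newly created endpoint; passing
     * UCT_UD_EP_CONN_ID_MAX requests ordered conn_id assignment */
    *ep_p = new_ep;
    return uct_ud_iface_cep_insert(iface, src_ib_addr, src_if_addr, new_ep,
                                   UCT_UD_EP_CONN_ID_MAX);
}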
/* get time of the last async wakeup */
static UCS_F_ALWAYS_INLINE ucs_time_t
uct_ud_iface_get_async_time(uct_ud_iface_t *iface)
{
return iface->super.super.worker->async->last_wakeup;
}
static UCS_F_ALWAYS_INLINE void
uct_ud_iface_progress_pending(uct_ud_iface_t *iface, const uintptr_t is_async)
{
if (!is_async) {
iface->tx.async_before_pending = 0;
}
if (!uct_ud_iface_can_tx(iface)) {
return;
}
ucs_arbiter_dispatch(&iface->tx.pending_q, 1, uct_ud_ep_do_pending,
(void *)is_async);
}
static UCS_F_ALWAYS_INLINE int
uct_ud_iface_has_pending_async_ev(uct_ud_iface_t *iface)
{
return iface->tx.async_before_pending;
}
static UCS_F_ALWAYS_INLINE void
uct_ud_iface_raise_pending_async_ev(uct_ud_iface_t *iface)
{
if (!ucs_arbiter_is_empty(&iface->tx.pending_q)) {
iface->tx.async_before_pending = 1;
}
}
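/*
 * Example (illustrative): the synchronous side of the helpers above. The
 * async context raises the flag via uct_ud_iface_raise_pending_async_ev();
 * the next synchronous progress call clears it when it dispatches the
 * pending queue. The function name is hypothetical.
 */
static inline void uct_ud_iface_example_sync_progress(uct_ud_iface_t *iface)
{
    uct_ud_enter(iface);
    /* dispatch pending sends; is_async == 0 also clears the
     * async_before_pending flag raised by the async context */
    uct_ud_iface_progress_pending(iface, 0);
    uct_ud_leave(iface);
}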
/* Go over all active eps and delete them. It is done this way because the
 * class destructors are not virtual.
 */
#define UCT_UD_IFACE_DELETE_EPS(_iface, _ep_type_t) \
{ \
int _i; \
_ep_type_t *_ep; \
ucs_ptr_array_for_each(_ep, _i, &(_iface)->eps) { \
UCS_CLASS_DELETE(_ep_type_t, _ep); \
} \
}
ucs_status_t uct_ud_iface_dispatch_pending_rx_do(uct_ud_iface_t *iface);
void uct_ud_iface_handle_failure(uct_ib_iface_t *iface, void *arg,
ucs_status_t status);
ucs_status_t uct_ud_iface_event_arm(uct_iface_h tl_iface, unsigned events);
void uct_ud_iface_progress_enable(uct_iface_h tl_iface, unsigned flags);
static UCS_F_ALWAYS_INLINE ucs_status_t
uct_ud_iface_dispatch_pending_rx(uct_ud_iface_t *iface)
{
if (ucs_likely(ucs_queue_is_empty(&iface->rx.pending_q))) {
return UCS_OK;
}
return uct_ud_iface_dispatch_pending_rx_do(iface);
}
void uct_ud_iface_dispatch_async_comps_do(uct_ud_iface_t *iface);
static UCS_F_ALWAYS_INLINE void
uct_ud_iface_dispatch_zcopy_comps(uct_ud_iface_t *iface)
{
if (ucs_likely(ucs_queue_is_empty(&iface->tx.async_comp_q))) {
return;
}
uct_ud_iface_dispatch_async_comps_do(iface);
}
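/*
 * Example (illustrative): a progress path would typically drain work
 * deferred from the async context before polling for new completions. Only
 * the calls to the two dispatch helpers above are real; the rest is
 * sketched and the function name is hypothetical.
 */
static inline void uct_ud_iface_example_progress(uct_ud_iface_t *iface)
{
    uct_ud_enter(iface);
    /* completions that finished in the async context */
    uct_ud_iface_dispatch_zcopy_comps(iface);
    if (uct_ud_iface_dispatch_pending_rx(iface) == UCS_OK) {
        /* ... poll the receive CQ and handle new packets ... */
    }
    uct_ud_leave(iface);
}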
#if ENABLE_PARAMS_CHECK
#define UCT_UD_CHECK_LENGTH(iface, header_len, payload_len, msg) \
    do { \
        int mtu; \
        mtu = uct_ib_mtu_value(uct_ib_iface_port_attr(&(iface)->super)->active_mtu); \
        UCT_CHECK_LENGTH(sizeof(uct_ud_neth_t) + payload_len + header_len, \
                         0, mtu, msg); \
    } while (0)
#define UCT_UD_CHECK_BCOPY_LENGTH(iface, len) \
UCT_UD_CHECK_LENGTH(iface, 0, len, "am_bcopy length")
#define UCT_UD_CHECK_ZCOPY_LENGTH(iface, header_len, payload_len) \
UCT_UD_CHECK_LENGTH(iface, header_len, payload_len, "am_zcopy payload")
#else
#define UCT_UD_CHECK_ZCOPY_LENGTH(iface, header_len, payload_len)
#define UCT_UD_CHECK_BCOPY_LENGTH(iface, len)
#endif
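/*
 * Example (illustrative): use of the length check in a hypothetical bcopy
 * send path. With ENABLE_PARAMS_CHECK the macro validates that the UD
 * network header plus payload fits into the active MTU and fails the call
 * otherwise; without it, the check compiles away entirely.
 */
static inline ucs_status_t
uct_ud_iface_example_check_bcopy(uct_ud_iface_t *iface, size_t length)
{
    UCT_UD_CHECK_BCOPY_LENGTH(iface, length);
    /* ... build the packet and post it ... */
    return UCS_OK;
}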
END_C_DECLS
#endif