/** * Copyright (C) Mellanox Technologies Ltd. 2001-2019. ALL RIGHTS RESERVED. * * See file LICENSE for terms. */ #ifdef HAVE_CONFIG_H # include "config.h" #endif #include "ucp_proxy_ep.h" #include "ucp_ep.inl" #include #define UCP_PROXY_EP_PASTE_ARG_NAME(_, _index) \ , UCS_PP_TOKENPASTE(arg, _index) #define UCP_PROXY_EP_PASTE_ARG_TYPE(_, _bundle) \ , UCS_PP_TUPLE_1 _bundle UCS_PP_TOKENPASTE(arg, UCS_PP_TUPLE_0 _bundle) /* Generate list of typed arguments for a proxy function prototype */ #define UCP_PROXY_EP_FUNC_ARGS(...) \ uct_ep_h ep \ UCS_PP_FOREACH(UCP_PROXY_EP_PASTE_ARG_TYPE, _, \ UCS_PP_ZIP((UCS_PP_SEQ(UCS_PP_NUM_ARGS(__VA_ARGS__))), \ (__VA_ARGS__))) /* Generate a list of arguments passed to function call */ #define UCP_PROXY_EP_FUNC_CALL(...) \ proxy_ep->uct_ep \ UCS_PP_FOREACH(UCP_PROXY_EP_PASTE_ARG_NAME, _, \ UCS_PP_SEQ(UCS_PP_NUM_ARGS(__VA_ARGS__))) /* Generate a return statement based on return type */ #define UCP_PROXY_EP_RETURN(_retval) \ UCS_PP_TOKENPASTE(UCP_PROXY_EP_RETURN_, _retval) #define UCP_PROXY_EP_RETURN_ucs_status_t return #define UCP_PROXY_EP_RETURN_ucs_status_ptr_t return #define UCP_PROXY_EP_RETURN_ssize_t return #define UCP_PROXY_EP_RETURN_void /* * Define a proxy endpoint operation, which redirects the call to the underlying * transport endpoint. */ #define UCP_PROXY_EP_DEFINE_OP(_retval, _name, ...) \ static _retval ucp_proxy_ep_##_name(UCP_PROXY_EP_FUNC_ARGS(__VA_ARGS__)) \ { \ ucp_proxy_ep_t *proxy_ep = ucs_derived_of(ep, ucp_proxy_ep_t); \ UCP_PROXY_EP_RETURN(_retval) \ uct_ep_##_name(UCP_PROXY_EP_FUNC_CALL(__VA_ARGS__)); \ } UCP_PROXY_EP_DEFINE_OP(ucs_status_t, put_short, const void*, unsigned, uint64_t, uct_rkey_t) UCP_PROXY_EP_DEFINE_OP(ssize_t, put_bcopy, uct_pack_callback_t, void*, uint64_t, uct_rkey_t) UCP_PROXY_EP_DEFINE_OP(ucs_status_t, put_zcopy, const uct_iov_t*, size_t, uint64_t, uct_rkey_t, uct_completion_t*) UCP_PROXY_EP_DEFINE_OP(ucs_status_t, get_bcopy, uct_unpack_callback_t, void*, size_t, uint64_t, uct_rkey_t, uct_completion_t*) UCP_PROXY_EP_DEFINE_OP(ucs_status_t, get_zcopy, const uct_iov_t*, size_t, uint64_t, uct_rkey_t, uct_completion_t*) UCP_PROXY_EP_DEFINE_OP(ucs_status_t, am_short, uint8_t, uint64_t, const void*, unsigned) UCP_PROXY_EP_DEFINE_OP(ssize_t, am_bcopy, uint8_t, uct_pack_callback_t, void*, unsigned) UCP_PROXY_EP_DEFINE_OP(ucs_status_t, am_zcopy, uint8_t, const void*, unsigned, const uct_iov_t*, size_t, unsigned, uct_completion_t*) UCP_PROXY_EP_DEFINE_OP(ucs_status_t, atomic_cswap64, uint64_t, uint64_t, uint64_t, uct_rkey_t, uint64_t*, uct_completion_t*) UCP_PROXY_EP_DEFINE_OP(ucs_status_t, atomic_cswap32, uint32_t, uint32_t, uint64_t, uct_rkey_t, uint32_t*, uct_completion_t*) UCP_PROXY_EP_DEFINE_OP(ucs_status_t, atomic64_post, uct_atomic_op_t, uint64_t, uint64_t, uct_rkey_t) UCP_PROXY_EP_DEFINE_OP(ucs_status_t, atomic32_post, uct_atomic_op_t, uint32_t, uint64_t, uct_rkey_t) UCP_PROXY_EP_DEFINE_OP(ucs_status_t, atomic64_fetch, uct_atomic_op_t, uint64_t, uint64_t*, uint64_t, uct_rkey_t, uct_completion_t*) UCP_PROXY_EP_DEFINE_OP(ucs_status_t, atomic32_fetch, uct_atomic_op_t, uint32_t, uint32_t*, uint64_t, uct_rkey_t, uct_completion_t*) UCP_PROXY_EP_DEFINE_OP(ucs_status_t, tag_eager_short, uct_tag_t, const void*, size_t) UCP_PROXY_EP_DEFINE_OP(ssize_t, tag_eager_bcopy, uct_tag_t, uint64_t, uct_pack_callback_t, void*, unsigned) UCP_PROXY_EP_DEFINE_OP(ucs_status_t, tag_eager_zcopy, uct_tag_t, uint64_t, const uct_iov_t*, size_t, unsigned, uct_completion_t*) UCP_PROXY_EP_DEFINE_OP(ucs_status_ptr_t, tag_rndv_zcopy, uct_tag_t, const void*, unsigned, const uct_iov_t*, size_t, unsigned, uct_completion_t*) UCP_PROXY_EP_DEFINE_OP(ucs_status_t, tag_rndv_cancel, void*) UCP_PROXY_EP_DEFINE_OP(ucs_status_t, tag_rndv_request, uct_tag_t, const void*, unsigned, unsigned) UCP_PROXY_EP_DEFINE_OP(ucs_status_t, pending_add, uct_pending_req_t*, unsigned) UCP_PROXY_EP_DEFINE_OP(void, pending_purge, uct_pending_purge_callback_t, void*) UCP_PROXY_EP_DEFINE_OP(ucs_status_t, flush, unsigned, uct_completion_t*) UCP_PROXY_EP_DEFINE_OP(ucs_status_t, fence, unsigned) UCP_PROXY_EP_DEFINE_OP(ucs_status_t, check, unsigned, uct_completion_t*) UCP_PROXY_EP_DEFINE_OP(ucs_status_t, get_address, uct_ep_addr_t*) UCP_PROXY_EP_DEFINE_OP(ucs_status_t, connect_to_ep, const uct_device_addr_t*, const uct_ep_addr_t*) static UCS_CLASS_DEFINE_NAMED_DELETE_FUNC(ucp_proxy_ep_destroy, ucp_proxy_ep_t, uct_ep_t); static ucs_status_t ucp_proxy_ep_fatal(uct_iface_h iface, ...) { ucs_bug("unsupported function on proxy endpoint"); return UCS_ERR_UNSUPPORTED; } UCS_CLASS_INIT_FUNC(ucp_proxy_ep_t, const uct_iface_ops_t *ops, ucp_ep_h ucp_ep, uct_ep_h uct_ep, int is_owner) { #define UCP_PROXY_EP_SET_OP(_name) \ self->iface.ops._name = (ops->_name != NULL) ? ops->_name : ucp_proxy_##_name self->super.iface = &self->iface; self->ucp_ep = ucp_ep; self->uct_ep = uct_ep; self->is_owner = is_owner; UCP_PROXY_EP_SET_OP(ep_put_short); UCP_PROXY_EP_SET_OP(ep_put_short); UCP_PROXY_EP_SET_OP(ep_put_bcopy); UCP_PROXY_EP_SET_OP(ep_put_zcopy); UCP_PROXY_EP_SET_OP(ep_get_bcopy); UCP_PROXY_EP_SET_OP(ep_get_zcopy); UCP_PROXY_EP_SET_OP(ep_am_short); UCP_PROXY_EP_SET_OP(ep_am_bcopy); UCP_PROXY_EP_SET_OP(ep_am_zcopy); UCP_PROXY_EP_SET_OP(ep_atomic_cswap64); UCP_PROXY_EP_SET_OP(ep_atomic_cswap32); UCP_PROXY_EP_SET_OP(ep_atomic64_post); UCP_PROXY_EP_SET_OP(ep_atomic32_post); UCP_PROXY_EP_SET_OP(ep_atomic64_fetch); UCP_PROXY_EP_SET_OP(ep_atomic32_fetch); UCP_PROXY_EP_SET_OP(ep_tag_eager_short); UCP_PROXY_EP_SET_OP(ep_tag_eager_bcopy); UCP_PROXY_EP_SET_OP(ep_tag_eager_zcopy); UCP_PROXY_EP_SET_OP(ep_tag_rndv_zcopy); UCP_PROXY_EP_SET_OP(ep_tag_rndv_cancel); UCP_PROXY_EP_SET_OP(ep_tag_rndv_request); UCP_PROXY_EP_SET_OP(ep_pending_add); UCP_PROXY_EP_SET_OP(ep_pending_purge); UCP_PROXY_EP_SET_OP(ep_flush); UCP_PROXY_EP_SET_OP(ep_fence); UCP_PROXY_EP_SET_OP(ep_check); UCP_PROXY_EP_SET_OP(ep_destroy); UCP_PROXY_EP_SET_OP(ep_get_address); UCP_PROXY_EP_SET_OP(ep_connect_to_ep); self->iface.ops.iface_tag_recv_zcopy = (uct_iface_tag_recv_zcopy_func_t)ucp_proxy_ep_fatal; self->iface.ops.iface_tag_recv_cancel = (uct_iface_tag_recv_cancel_func_t)ucp_proxy_ep_fatal; self->iface.ops.ep_create = (uct_ep_create_func_t)ucp_proxy_ep_fatal; self->iface.ops.iface_flush = (uct_iface_flush_func_t)ucp_proxy_ep_fatal; self->iface.ops.iface_fence = (uct_iface_fence_func_t)ucp_proxy_ep_fatal; self->iface.ops.iface_progress_enable = (uct_iface_progress_enable_func_t)ucp_proxy_ep_fatal; self->iface.ops.iface_progress_disable = (uct_iface_progress_disable_func_t)ucp_proxy_ep_fatal; self->iface.ops.iface_progress = (uct_iface_progress_func_t)ucp_proxy_ep_fatal; self->iface.ops.iface_event_fd_get = (uct_iface_event_fd_get_func_t)ucp_proxy_ep_fatal; self->iface.ops.iface_event_arm = (uct_iface_event_arm_func_t)ucp_proxy_ep_fatal; self->iface.ops.iface_close = (uct_iface_close_func_t)ucp_proxy_ep_fatal; self->iface.ops.iface_query = (uct_iface_query_func_t)ucp_proxy_ep_fatal; self->iface.ops.iface_get_device_address = (uct_iface_get_device_address_func_t)ucp_proxy_ep_fatal; self->iface.ops.iface_get_address = (uct_iface_get_address_func_t)ucp_proxy_ep_fatal; self->iface.ops.iface_is_reachable = (uct_iface_is_reachable_func_t)ucp_proxy_ep_fatal; return UCS_OK; } static UCS_CLASS_CLEANUP_FUNC(ucp_proxy_ep_t) { if ((self->uct_ep != NULL) && self->is_owner) { uct_ep_destroy(self->uct_ep); } } int ucp_proxy_ep_test(uct_ep_h uct_ep) { return uct_ep->iface->ops.ep_destroy == ucp_proxy_ep_destroy; } uct_ep_h ucp_proxy_ep_extract(uct_ep_h ep) { ucp_proxy_ep_t *proxy_ep = ucs_derived_of(ep, ucp_proxy_ep_t); uct_ep_h uct_ep; uct_ep = proxy_ep->uct_ep; proxy_ep->uct_ep = NULL; return uct_ep; } static void ucp_proxy_ep_replace_if_owned(uct_ep_h uct_ep, uct_ep_h owned_ep, uct_ep_h replacement_ep) { ucp_proxy_ep_t *proxy_ep; if (ucp_proxy_ep_test(uct_ep)) { proxy_ep = ucs_derived_of(uct_ep, ucp_proxy_ep_t); if (proxy_ep->uct_ep == owned_ep) { proxy_ep->uct_ep = replacement_ep; } ucs_assert(replacement_ep != NULL); } } void ucp_proxy_ep_replace(ucp_proxy_ep_t *proxy_ep) { ucp_ep_h ucp_ep = proxy_ep->ucp_ep; ucp_lane_index_t lane; uct_ep_h tl_ep = NULL; ucs_assert(proxy_ep->uct_ep != NULL); for (lane = 0; lane < ucp_ep_num_lanes(ucp_ep); ++lane) { if (ucp_ep->uct_eps[lane] == &proxy_ep->super) { ucs_assert(proxy_ep->uct_ep != NULL); /* make sure there is only one match */ ucp_ep->uct_eps[lane] = proxy_ep->uct_ep; tl_ep = ucp_ep->uct_eps[lane]; proxy_ep->uct_ep = NULL; } } /* go through the lanes and check if the proxy ep that is being destroyed, * is pointed to by another proxy ep. if so, redirect that other proxy ep * to point to the underlying uct ep. */ for (lane = 0; lane < ucp_ep_num_lanes(ucp_ep); ++lane) { ucp_proxy_ep_replace_if_owned(ucp_ep->uct_eps[lane], &proxy_ep->super, tl_ep); } uct_ep_destroy(&proxy_ep->super); } void ucp_proxy_ep_set_uct_ep(ucp_proxy_ep_t *proxy_ep, uct_ep_h uct_ep, int is_owner) { proxy_ep->uct_ep = uct_ep; proxy_ep->is_owner = is_owner; } UCS_CLASS_DEFINE(ucp_proxy_ep_t, void);