/** * Copyright (C) Mellanox Technologies Ltd. 2001-2018. ALL RIGHTS RESERVED. * Copyright (C) UT-Battelle, LLC. 2016. ALL RIGHTS RESERVED. * * See file LICENSE for terms. */ #ifdef HAVE_CONFIG_H # include "config.h" #endif #include "rma.h" #include "rma.inl" #include #include #include #include #include #include #define UCP_AMO_CHECK_PARAM(_context, _remote_addr, _size, _opcode, \ _last_opcode, _action) \ { \ if (ENABLE_PARAMS_CHECK && \ ucs_unlikely(((_remote_addr) % (_size)) != 0)) { \ ucs_error("atomic variable must be naturally aligned " \ "(remote address 0x%"PRIx64", size %zu)", (_remote_addr), \ (_size)); \ _action; \ } \ \ if (ENABLE_PARAMS_CHECK && \ ucs_unlikely(((_size) != 4) && (_size != 8))) { \ ucs_error("invalid atomic operation size: %zu", (_size)); \ _action; \ } \ \ UCP_CONTEXT_CHECK_FEATURE_FLAGS((_context), ((_size) == 4) ? \ UCP_FEATURE_AMO32 : UCP_FEATURE_AMO64, \ _action); \ \ if (ENABLE_PARAMS_CHECK && \ (ucs_unlikely((_opcode) >= (_last_opcode)))) { \ ucs_error("invalid atomic opcode %d ", _opcode); \ _action; \ } \ } static uct_atomic_op_t ucp_uct_op_table[] = { [UCP_ATOMIC_POST_OP_ADD] = UCT_ATOMIC_OP_ADD, [UCP_ATOMIC_POST_OP_AND] = UCT_ATOMIC_OP_AND, [UCP_ATOMIC_POST_OP_OR] = UCT_ATOMIC_OP_OR, [UCP_ATOMIC_POST_OP_XOR] = UCT_ATOMIC_OP_XOR }; static uct_atomic_op_t ucp_uct_fop_table[] = { [UCP_ATOMIC_FETCH_OP_FADD] = UCT_ATOMIC_OP_ADD, [UCP_ATOMIC_FETCH_OP_FAND] = UCT_ATOMIC_OP_AND, [UCP_ATOMIC_FETCH_OP_FOR] = UCT_ATOMIC_OP_OR, [UCP_ATOMIC_FETCH_OP_FXOR] = UCT_ATOMIC_OP_XOR, [UCP_ATOMIC_FETCH_OP_SWAP] = UCT_ATOMIC_OP_SWAP, [UCP_ATOMIC_FETCH_OP_CSWAP] = UCT_ATOMIC_OP_CSWAP, }; static void ucp_amo_completed_single(uct_completion_t *self, ucs_status_t status) { ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.state.uct_comp); ucs_trace("invoking completion on AMO request %p", req); ucp_request_complete_send(req, status); } static UCS_F_ALWAYS_INLINE void ucp_amo_init_common(ucp_request_t *req, ucp_ep_h ep, uct_atomic_op_t op, uint64_t remote_addr, ucp_rkey_h rkey, uint64_t value, size_t size) { req->flags = 0; req->send.ep = ep; req->send.length = size; req->send.amo.uct_op = op; req->send.amo.remote_addr = remote_addr; req->send.amo.rkey = rkey; req->send.amo.value = value; #if UCS_ENABLE_ASSERT req->send.lane = UCP_NULL_LANE; #endif } static UCS_F_ALWAYS_INLINE void ucp_amo_init_fetch(ucp_request_t *req, ucp_ep_h ep, void *buffer, uct_atomic_op_t op, size_t op_size, uint64_t remote_addr, ucp_rkey_h rkey, uint64_t value, const ucp_amo_proto_t *proto) { ucp_amo_init_common(req, ep, op, remote_addr, rkey, value, op_size); req->send.state.uct_comp.count = 1; req->send.state.uct_comp.func = ucp_amo_completed_single; req->send.uct.func = proto->progress_fetch; req->send.buffer = buffer; } static UCS_F_ALWAYS_INLINE void ucp_amo_init_post(ucp_request_t *req, ucp_ep_h ep, uct_atomic_op_t op, size_t op_size, uint64_t remote_addr, ucp_rkey_h rkey, uint64_t value, const ucp_amo_proto_t *proto) { ucp_amo_init_common(req, ep, op, remote_addr, rkey, value, op_size); req->send.uct.func = proto->progress_post; } ucs_status_ptr_t ucp_atomic_fetch_nb(ucp_ep_h ep, ucp_atomic_fetch_op_t opcode, uint64_t value, void *result, size_t op_size, uint64_t remote_addr, ucp_rkey_h rkey, ucp_send_callback_t cb) { ucs_status_ptr_t status_p; ucs_status_t status; ucp_request_t *req; UCP_AMO_CHECK_PARAM(ep->worker->context, remote_addr, op_size, opcode, UCP_ATOMIC_FETCH_OP_LAST, return UCS_STATUS_PTR(UCS_ERR_INVALID_PARAM)); UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(ep->worker); ucs_trace_req("atomic_fetch_nb opcode %d value %"PRIu64" buffer %p size %zu" " remote_addr %"PRIx64" rkey %p to %s cb %p", opcode, value, result, op_size, remote_addr, rkey, ucp_ep_peer_name(ep), cb); status = UCP_RKEY_RESOLVE(rkey, ep, amo); if (status != UCS_OK) { status_p = UCS_STATUS_PTR(UCS_ERR_UNREACHABLE); goto out; } req = ucp_request_get(ep->worker); if (ucs_unlikely(NULL == req)) { status_p = UCS_STATUS_PTR(UCS_ERR_NO_MEMORY); goto out; } ucp_amo_init_fetch(req, ep, result, ucp_uct_fop_table[opcode], op_size, remote_addr, rkey, value, rkey->cache.amo_proto); status_p = ucp_rma_send_request_cb(req, cb); out: UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(ep->worker); return status_p; } ucs_status_t ucp_atomic_post(ucp_ep_h ep, ucp_atomic_post_op_t opcode, uint64_t value, size_t op_size, uint64_t remote_addr, ucp_rkey_h rkey) { ucs_status_ptr_t status_p; ucs_status_t status; ucp_request_t *req; UCP_AMO_CHECK_PARAM(ep->worker->context, remote_addr, op_size, opcode, UCP_ATOMIC_POST_OP_LAST, return UCS_ERR_INVALID_PARAM); UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(ep->worker); ucs_trace_req("atomic_post opcode %d value %"PRIu64" size %zu " "remote_addr %"PRIx64" rkey %p to %s", opcode, value, op_size, remote_addr, rkey, ucp_ep_peer_name(ep)); status = UCP_RKEY_RESOLVE(rkey, ep, amo); if (status != UCS_OK) { goto out; } req = ucp_request_get(ep->worker); if (ucs_unlikely(NULL == req)) { status = UCS_ERR_NO_MEMORY; goto out; } ucp_amo_init_post(req, ep, ucp_uct_op_table[opcode], op_size, remote_addr, rkey, value, rkey->cache.amo_proto); status_p = ucp_rma_send_request_cb(req, (ucp_send_callback_t)ucs_empty_function); if (UCS_PTR_IS_PTR(status_p)) { ucp_request_release(status_p); status = UCS_OK; } else { status = UCS_PTR_STATUS(status_p); } out: UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(ep->worker); return status; } static inline ucs_status_t ucp_atomic_fetch_b(ucp_ep_h ep, ucp_atomic_fetch_op_t opcode, uint64_t value, void *result, size_t size, uint64_t remote_addr, ucp_rkey_h rkey, const char *op_name) { void *request; request = ucp_atomic_fetch_nb(ep, opcode, value, result, size, remote_addr, rkey, (ucp_send_callback_t)ucs_empty_function); return ucp_rma_wait(ep->worker, request, op_name); } UCS_PROFILE_FUNC(ucs_status_t, ucp_atomic_add32, (ep, add, remote_addr, rkey), ucp_ep_h ep, uint32_t add, uint64_t remote_addr, ucp_rkey_h rkey) { return ucp_atomic_post(ep, UCP_ATOMIC_POST_OP_ADD, add, sizeof(add), remote_addr, rkey); } UCS_PROFILE_FUNC(ucs_status_t, ucp_atomic_add64, (ep, add, remote_addr, rkey), ucp_ep_h ep, uint64_t add, uint64_t remote_addr, ucp_rkey_h rkey) { return ucp_atomic_post(ep, UCP_ATOMIC_POST_OP_ADD, add, sizeof(add), remote_addr, rkey); } UCS_PROFILE_FUNC(ucs_status_t, ucp_atomic_fadd32, (ep, add, remote_addr, rkey, result), ucp_ep_h ep, uint32_t add, uint64_t remote_addr, ucp_rkey_h rkey, uint32_t *result) { return ucp_atomic_fetch_b(ep, UCP_ATOMIC_FETCH_OP_FADD, add, result, sizeof(add), remote_addr, rkey, "atomic_fadd32"); } UCS_PROFILE_FUNC(ucs_status_t, ucp_atomic_fadd64, (ep, add, remote_addr, rkey, result), ucp_ep_h ep, uint64_t add, uint64_t remote_addr, ucp_rkey_h rkey, uint64_t *result) { return ucp_atomic_fetch_b(ep, UCP_ATOMIC_FETCH_OP_FADD, add, result, sizeof(add), remote_addr, rkey, "atomic_fadd64"); } UCS_PROFILE_FUNC(ucs_status_t, ucp_atomic_swap32, (ep, swap, remote_addr, rkey, result), ucp_ep_h ep, uint32_t swap, uint64_t remote_addr, ucp_rkey_h rkey, uint32_t *result) { return ucp_atomic_fetch_b(ep, UCP_ATOMIC_FETCH_OP_SWAP, swap, result, sizeof(swap), remote_addr, rkey, "atomic_swap32"); } UCS_PROFILE_FUNC(ucs_status_t, ucp_atomic_swap64, (ep, swap, remote_addr, rkey, result), ucp_ep_h ep, uint64_t swap, uint64_t remote_addr, ucp_rkey_h rkey, uint64_t *result) { return ucp_atomic_fetch_b(ep, UCP_ATOMIC_FETCH_OP_SWAP, swap, result, sizeof(swap), remote_addr, rkey, "atomic_swap64"); } static UCS_F_ALWAYS_INLINE ucs_status_t ucp_atomic_cswap_b(ucp_ep_h ep, uint64_t compare, uint64_t swap, size_t size, uint64_t remote_addr, ucp_rkey_h rkey, void *result, const char *op_name) { char tmp[sizeof(swap)]; /* sufficient storage for maximal operand size */ ucs_status_t status; memcpy(tmp, &swap, size); status = ucp_atomic_fetch_b(ep, UCP_ATOMIC_FETCH_OP_CSWAP, compare, &tmp, size, remote_addr, rkey, op_name); if (status == UCS_OK) { memcpy(result, tmp, size); } return status; } UCS_PROFILE_FUNC(ucs_status_t, ucp_atomic_cswap32, (ep, compare, swap, remote_addr, rkey, result), ucp_ep_h ep, uint32_t compare, uint32_t swap, uint64_t remote_addr, ucp_rkey_h rkey, uint32_t *result) { return ucp_atomic_cswap_b(ep, compare, swap, sizeof(swap), remote_addr, rkey, result, "atomic_cswap32"); } UCS_PROFILE_FUNC(ucs_status_t, ucp_atomic_cswap64, (ep, compare, swap, remote_addr, rkey, result), ucp_ep_h ep, uint64_t compare, uint64_t swap, uint64_t remote_addr, ucp_rkey_h rkey, uint64_t *result) { return ucp_atomic_cswap_b(ep, compare, swap, sizeof(swap), remote_addr, rkey, result, "atomic_cswap64"); }