/** * Copyright (C) Mellanox Technologies Ltd. 2001-2018. ALL RIGHTS RESERVED. * * See file LICENSE for terms. */ #ifdef HAVE_CONFIG_H # include "config.h" #endif #include "rma.h" #include "rma.inl" #include #include static size_t ucp_amo_sw_pack(void *dest, void *arg, uint8_t fetch) { ucp_request_t *req = arg; ucp_atomic_req_hdr_t *atomich = dest; ucp_ep_t *ep = req->send.ep; size_t size = req->send.length; size_t length; atomich->address = req->send.rma.remote_addr; atomich->req.ep_ptr = ucp_ep_dest_ep_ptr(ep); atomich->req.reqptr = fetch ? (uintptr_t)req : 0; atomich->length = size; atomich->opcode = req->send.amo.uct_op; memcpy(atomich + 1, &req->send.amo.value, size); length = sizeof(*atomich) + size; if (req->send.amo.uct_op == UCT_ATOMIC_OP_CSWAP) { /* compare-swap has two arguments */ memcpy(UCS_PTR_BYTE_OFFSET(atomich + 1, size), req->send.buffer, size); length += size; } return length; } static size_t ucp_amo_sw_post_pack_cb(void *dest, void *arg) { return ucp_amo_sw_pack(dest, arg, 0); } static size_t ucp_amo_sw_fetch_pack_cb(void *dest, void *arg) { return ucp_amo_sw_pack(dest, arg, 1); } static ucs_status_t ucp_amo_sw_progress(uct_pending_req_t *self, uct_pack_callback_t pack_cb, int fetch) { ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct); ucp_ep_t *ep = req->send.ep; ucs_status_t status; ssize_t packed_len; req->send.lane = ucp_ep_get_am_lane(ep); packed_len = uct_ep_am_bcopy(ep->uct_eps[req->send.lane], UCP_AM_ID_ATOMIC_REQ, pack_cb, req, 0); if (packed_len > 0) { ucp_ep_rma_remote_request_sent(ep); if (!fetch) { ucp_request_complete_send(req, UCS_OK); } return UCS_OK; } else { status = (ucs_status_t)packed_len; if (status != UCS_ERR_NO_RESOURCE) { /* failure */ ucp_request_complete_send(req, status); } return status; } } static ucs_status_t ucp_amo_sw_progress_post(uct_pending_req_t *self) { return ucp_amo_sw_progress(self, ucp_amo_sw_post_pack_cb, 0); } static ucs_status_t ucp_amo_sw_progress_fetch(uct_pending_req_t *self) { return ucp_amo_sw_progress(self, ucp_amo_sw_fetch_pack_cb, 1); } ucp_amo_proto_t ucp_amo_sw_proto = { .name = "sw_amo", .progress_fetch = ucp_amo_sw_progress_fetch, .progress_post = ucp_amo_sw_progress_post }; static size_t ucp_amo_sw_pack_atomic_reply(void *dest, void *arg) { ucp_rma_rep_hdr_t *hdr = dest; ucp_request_t *req = arg; hdr->req = req->send.get_reply.req; switch (req->send.length) { case sizeof(uint32_t): *(uint32_t*)(hdr + 1) = req->send.atomic_reply.data.reply32; break; case sizeof(uint64_t): *(uint64_t*)(hdr + 1) = req->send.atomic_reply.data.reply64; break; default: ucs_fatal("invalid atomic length: %zu", req->send.length); } return sizeof(*hdr) + req->send.length; } static ucs_status_t ucp_progress_atomic_reply(uct_pending_req_t *self) { ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct); ucp_ep_t *ep = req->send.ep; ssize_t packed_len; req->send.lane = ucp_ep_get_am_lane(ep); packed_len = uct_ep_am_bcopy(ep->uct_eps[req->send.lane], UCP_AM_ID_ATOMIC_REP, ucp_amo_sw_pack_atomic_reply, req, 0); if (packed_len < 0) { return (ucs_status_t)packed_len; } ucs_assert(packed_len == sizeof(ucp_rma_rep_hdr_t) + req->send.length); ucp_request_put(req); return UCS_OK; } #define DEFINE_AMO_SW_OP(_bits) \ static void ucp_amo_sw_do_op##_bits(const ucp_atomic_req_hdr_t *atomicreqh) \ { \ uint##_bits##_t *ptr = (void*)atomicreqh->address; \ uint##_bits##_t *args = (void*)(atomicreqh + 1); \ \ switch (atomicreqh->opcode) { \ case UCT_ATOMIC_OP_ADD: \ ucs_atomic_add##_bits(ptr, args[0]); \ break; \ case UCT_ATOMIC_OP_AND: \ ucs_atomic_and##_bits(ptr, args[0]); \ break; \ case UCT_ATOMIC_OP_OR: \ ucs_atomic_or##_bits(ptr, args[0]); \ break; \ case UCT_ATOMIC_OP_XOR: \ ucs_atomic_xor##_bits(ptr, args[0]); \ break; \ default: \ ucs_fatal("invalid opcode: %d", atomicreqh->opcode); \ } \ } #define DEFINE_AMO_SW_FOP(_bits) \ static void ucp_amo_sw_do_fop##_bits(const ucp_atomic_req_hdr_t *atomicreqh, \ ucp_atomic_reply_t *result) \ { \ uint##_bits##_t *ptr = (void*)atomicreqh->address; \ uint##_bits##_t *args = (void*)(atomicreqh + 1); \ \ switch (atomicreqh->opcode) { \ case UCT_ATOMIC_OP_ADD: \ result->reply##_bits = ucs_atomic_fadd##_bits(ptr, args[0]); \ break; \ case UCT_ATOMIC_OP_AND: \ result->reply##_bits = ucs_atomic_fand##_bits(ptr, args[0]); \ break; \ case UCT_ATOMIC_OP_OR: \ result->reply##_bits = ucs_atomic_for##_bits(ptr, args[0]); \ break; \ case UCT_ATOMIC_OP_XOR: \ result->reply##_bits = ucs_atomic_fxor##_bits(ptr, args[0]); \ break; \ case UCT_ATOMIC_OP_SWAP: \ result->reply##_bits = ucs_atomic_swap##_bits(ptr, args[0]); \ break; \ case UCT_ATOMIC_OP_CSWAP: \ result->reply##_bits = ucs_atomic_cswap##_bits(ptr, args[0], args[1]); \ break; \ default: \ ucs_fatal("invalid opcode: %d", atomicreqh->opcode); \ } \ } DEFINE_AMO_SW_OP(32) DEFINE_AMO_SW_OP(64) DEFINE_AMO_SW_FOP(32) DEFINE_AMO_SW_FOP(64) UCS_PROFILE_FUNC(ucs_status_t, ucp_atomic_req_handler, (arg, data, length, am_flags), void *arg, void *data, size_t length, unsigned am_flags) { ucp_atomic_req_hdr_t *atomicreqh = data; ucp_worker_h worker = arg; ucp_ep_h ep = ucp_worker_get_ep_by_ptr(worker, atomicreqh->req.ep_ptr); ucp_rsc_index_t amo_rsc_idx = ucs_ffs64_safe(worker->atomic_tls); ucp_request_t *req; if (ucs_unlikely((amo_rsc_idx != UCP_MAX_RESOURCES) && (ucp_worker_iface_get_attr(worker, amo_rsc_idx)->cap.flags & UCT_IFACE_FLAG_ATOMIC_DEVICE))) { ucs_error("Unsupported: got software atomic request while device atomics are selected on worker %p", worker); /* TODO: this situation will be possible then CM wireup is implemented * and CM lane is bound to suboptimal device, then need to execute * AMO on fastest resource from worker->atomic_tls using loopback * EP and continue SW AMO protocol */ } if (atomicreqh->req.reqptr == 0) { /* atomic operation without result */ switch (atomicreqh->length) { case sizeof(uint32_t): ucp_amo_sw_do_op32(atomicreqh); break; case sizeof(uint64_t): ucp_amo_sw_do_op64(atomicreqh); break; default: ucs_fatal("invalid atomic length: %u", atomicreqh->length); } ucp_rma_sw_send_cmpl(ep); } else { /* atomic operation with result */ req = ucp_request_get(worker); if (req == NULL) { ucs_error("failed to allocate atomic reply"); return UCS_OK; } switch (atomicreqh->length) { case sizeof(uint32_t): ucp_amo_sw_do_fop32(atomicreqh, &req->send.atomic_reply.data); break; case sizeof(uint64_t): ucp_amo_sw_do_fop64(atomicreqh, &req->send.atomic_reply.data); break; default: ucs_fatal("invalid atomic length: %u", atomicreqh->length); } req->send.ep = ep; req->send.atomic_reply.req = atomicreqh->req.reqptr; req->send.length = atomicreqh->length; req->send.uct.func = ucp_progress_atomic_reply; ucp_request_send(req, 0); } return UCS_OK; } UCS_PROFILE_FUNC(ucs_status_t, ucp_atomic_rep_handler, (arg, data, length, am_flags), void *arg, void *data, size_t length, unsigned am_flags) { ucp_rma_rep_hdr_t *hdr = data; size_t frag_length = length - sizeof(*hdr); ucp_request_t *req = (ucp_request_t*)hdr->req; ucp_ep_h ep = req->send.ep; memcpy(req->send.buffer, hdr + 1, frag_length); ucp_request_complete_send(req, UCS_OK); ucp_ep_rma_remote_request_completed(ep); return UCS_OK; } static void ucp_amo_sw_dump_packet(ucp_worker_h worker, uct_am_trace_type_t type, uint8_t id, const void *data, size_t length, char *buffer, size_t max) { const ucp_atomic_req_hdr_t *atomich; const ucp_rma_rep_hdr_t *reph; size_t header_len; char *p; switch (id) { case UCP_AM_ID_ATOMIC_REQ: atomich = data; snprintf(buffer, max, "ATOMIC_REQ [addr 0x%lx len %u reqptr 0x%lx ep 0x%lx op %d]", atomich->address, atomich->length, atomich->req.reqptr, atomich->req.ep_ptr, atomich->opcode); header_len = sizeof(*atomich);; break; case UCP_AM_ID_ATOMIC_REP: reph = data; snprintf(buffer, max, "ATOMIC_REP [reqptr 0x%lx]", reph->req); header_len = sizeof(*reph); break; default: return; } p = buffer + strlen(buffer); ucp_dump_payload(worker->context, p, buffer + max - p, UCS_PTR_BYTE_OFFSET(data, header_len), length - header_len); } UCP_DEFINE_AM(UCP_FEATURE_AMO, UCP_AM_ID_ATOMIC_REQ, ucp_atomic_req_handler, ucp_amo_sw_dump_packet, 0); UCP_DEFINE_AM(UCP_FEATURE_AMO, UCP_AM_ID_ATOMIC_REP, ucp_atomic_rep_handler, ucp_amo_sw_dump_packet, 0); UCP_DEFINE_AM_PROXY(UCP_AM_ID_ATOMIC_REQ);