/** * Copyright (C) Mellanox Technologies Ltd. 2001-2018. ALL RIGHTS RESERVED. * * See file LICENSE for terms. */ #ifdef HAVE_CONFIG_H # include "config.h" #endif #include "rma.h" #include "rma.inl" #include #include static size_t ucp_rma_sw_put_pack_cb(void *dest, void *arg) { ucp_request_t *req = arg; ucp_ep_t *ep = req->send.ep; ucp_put_hdr_t *puth = dest; size_t length; puth->address = req->send.rma.remote_addr; puth->ep_ptr = ucp_ep_dest_ep_ptr(ep); ucs_assert(puth->ep_ptr != 0); length = ucs_min(req->send.length, ucp_ep_config(ep)->am.max_bcopy - sizeof(*puth)); memcpy(puth + 1, req->send.buffer, length); return sizeof(*puth) + length; } static ucs_status_t ucp_rma_sw_progress_put(uct_pending_req_t *self) { ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct); ucp_ep_t *ep = req->send.ep; ssize_t packed_len; ucs_status_t status; ucs_assert(req->send.lane == ucp_ep_get_am_lane(ep)); packed_len = uct_ep_am_bcopy(ep->uct_eps[req->send.lane], UCP_AM_ID_PUT, ucp_rma_sw_put_pack_cb, req, 0); if (packed_len > 0) { status = UCS_OK; ucp_ep_rma_remote_request_sent(ep); } else { status = (ucs_status_t)packed_len; } return ucp_rma_request_advance(req, packed_len - sizeof(ucp_put_hdr_t), status); } static size_t ucp_rma_sw_get_req_pack_cb(void *dest, void *arg) { ucp_request_t *req = arg; ucp_get_req_hdr_t *getreqh = dest; getreqh->address = req->send.rma.remote_addr; getreqh->length = req->send.length; getreqh->req.ep_ptr = ucp_ep_dest_ep_ptr(req->send.ep); getreqh->req.reqptr = (uintptr_t)req; ucs_assert(getreqh->req.ep_ptr != 0); return sizeof(*getreqh); } static ucs_status_t ucp_rma_sw_progress_get(uct_pending_req_t *self) { ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct); ucp_ep_t *ep = req->send.ep; ucs_status_t status; ssize_t packed_len; ucs_assert(req->send.lane == ucp_ep_get_am_lane(ep)); packed_len = uct_ep_am_bcopy(ep->uct_eps[req->send.lane], UCP_AM_ID_GET_REQ, ucp_rma_sw_get_req_pack_cb, req, 0); if (packed_len < 0) { status = (ucs_status_t)packed_len; if (status != UCS_ERR_NO_RESOURCE) { ucp_request_complete_send(req, status); } return status; } /* get request packet sent, complete the request object when all data arrives */ ucs_assert(packed_len == sizeof(ucp_get_req_hdr_t)); ucp_ep_rma_remote_request_sent(ep); return UCS_OK; } ucp_rma_proto_t ucp_rma_sw_proto = { .name = "sw_rma", .progress_put = ucp_rma_sw_progress_put, .progress_get = ucp_rma_sw_progress_get }; static size_t ucp_rma_sw_pack_rma_ack(void *dest, void *arg) { ucp_cmpl_hdr_t *hdr = dest; ucp_request_t *req = arg; hdr->ep_ptr = ucp_ep_dest_ep_ptr(req->send.ep); return sizeof(*hdr); } static ucs_status_t ucp_progress_rma_cmpl(uct_pending_req_t *self) { ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct); ucp_ep_t *ep = req->send.ep; ssize_t packed_len; req->send.lane = ucp_ep_get_am_lane(ep); packed_len = uct_ep_am_bcopy(ep->uct_eps[req->send.lane], UCP_AM_ID_CMPL, ucp_rma_sw_pack_rma_ack, req, 0); if (packed_len < 0) { return (ucs_status_t)packed_len; } ucs_assert(packed_len == sizeof(ucp_cmpl_hdr_t)); ucp_request_put(req); return UCS_OK; } void ucp_rma_sw_send_cmpl(ucp_ep_h ep) { ucp_request_t *req; req = ucp_request_get(ep->worker); if (req == NULL) { ucs_error("failed to allocate put completion"); return; } req->send.ep = ep; req->send.uct.func = ucp_progress_rma_cmpl; ucp_request_send(req, 0); } UCS_PROFILE_FUNC(ucs_status_t, ucp_put_handler, (arg, data, length, am_flags), void *arg, void *data, size_t length, unsigned am_flags) { ucp_put_hdr_t *puth = data; ucp_worker_h worker = arg; memcpy((void*)puth->address, puth + 1, length - sizeof(*puth)); ucp_rma_sw_send_cmpl(ucp_worker_get_ep_by_ptr(worker, puth->ep_ptr)); return UCS_OK; } UCS_PROFILE_FUNC(ucs_status_t, ucp_rma_cmpl_handler, (arg, data, length, am_flags), void *arg, void *data, size_t length, unsigned am_flags) { ucp_cmpl_hdr_t *putackh = data; ucp_worker_h worker = arg; ucp_ep_h ep = ucp_worker_get_ep_by_ptr(worker, putackh->ep_ptr); ucp_ep_rma_remote_request_completed(ep); return UCS_OK; } static size_t ucp_rma_sw_pack_get_reply(void *dest, void *arg) { ucp_rma_rep_hdr_t *hdr = dest; ucp_request_t *req = arg; size_t length; length = ucs_min(req->send.length, ucp_ep_config(req->send.ep)->am.max_bcopy - sizeof(*hdr)); hdr->req = req->send.get_reply.req; memcpy(hdr + 1, req->send.buffer, length); return sizeof(*hdr) + length; } static ucs_status_t ucp_progress_get_reply(uct_pending_req_t *self) { ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct); ucp_ep_t *ep = req->send.ep; ssize_t packed_len, payload_len; req->send.lane = ucp_ep_get_am_lane(ep); packed_len = uct_ep_am_bcopy(ep->uct_eps[req->send.lane], UCP_AM_ID_GET_REP, ucp_rma_sw_pack_get_reply, req, 0); if (packed_len < 0) { return (ucs_status_t)packed_len; } payload_len = packed_len - sizeof(ucp_rma_rep_hdr_t); ucs_assert(payload_len >= 0); req->send.buffer = UCS_PTR_BYTE_OFFSET(req->send.buffer, payload_len); req->send.length -= payload_len; if (req->send.length == 0) { ucp_request_put(req); return UCS_OK; } else { return UCS_INPROGRESS; } } UCS_PROFILE_FUNC(ucs_status_t, ucp_get_req_handler, (arg, data, length, am_flags), void *arg, void *data, size_t length, unsigned am_flags) { ucp_get_req_hdr_t *getreqh = data; ucp_worker_h worker = arg; ucp_ep_h ep = ucp_worker_get_ep_by_ptr(worker, getreqh->req.ep_ptr); ucp_request_t *req; req = ucp_request_get(worker); if (req == NULL) { ucs_error("failed to allocate get reply"); return UCS_OK; } req->send.ep = ep; req->send.buffer = (void*)getreqh->address; req->send.length = getreqh->length; req->send.get_reply.req = getreqh->req.reqptr; req->send.uct.func = ucp_progress_get_reply; ucp_request_send(req, 0); return UCS_OK; } UCS_PROFILE_FUNC(ucs_status_t, ucp_get_rep_handler, (arg, data, length, am_flags), void *arg, void *data, size_t length, unsigned am_flags) { ucp_rma_rep_hdr_t *getreph = data; size_t frag_length = length - sizeof(*getreph); ucp_request_t *req = (ucp_request_t*)getreph->req; ucp_ep_h ep = req->send.ep; memcpy(req->send.buffer, getreph + 1, frag_length); /* complete get request on last fragment of the reply */ if (ucp_rma_request_advance(req, frag_length, UCS_OK) == UCS_OK) { ucp_ep_rma_remote_request_completed(ep); } return UCS_OK; } static void ucp_rma_sw_dump_packet(ucp_worker_h worker, uct_am_trace_type_t type, uint8_t id, const void *data, size_t length, char *buffer, size_t max) { const ucp_get_req_hdr_t *geth; const ucp_rma_rep_hdr_t *reph; const ucp_cmpl_hdr_t *cmplh; const ucp_put_hdr_t *puth; size_t header_len; char *p; switch (id) { case UCP_AM_ID_PUT: puth = data; snprintf(buffer, max, "PUT [addr 0x%lx ep_ptr 0x%lx]", puth->address, puth->ep_ptr); header_len = sizeof(*puth); break; case UCP_AM_ID_GET_REQ: geth = data; snprintf(buffer, max, "GET_REQ [addr 0x%lx len %zu reqptr 0x%lx ep 0x%lx]", geth->address, geth->length, geth->req.reqptr, geth->req.ep_ptr); return; case UCP_AM_ID_GET_REP: reph = data; snprintf(buffer, max, "GET_REP [reqptr 0x%lx]", reph->req); header_len = sizeof(*reph); break; case UCP_AM_ID_CMPL: cmplh = data; snprintf(buffer, max, "CMPL [ep_ptr 0x%lx]", cmplh->ep_ptr); return; default: return; } p = buffer + strlen(buffer); ucp_dump_payload(worker->context, p, buffer + max - p, UCS_PTR_BYTE_OFFSET(data, header_len), length - header_len); } UCP_DEFINE_AM(UCP_FEATURE_RMA, UCP_AM_ID_PUT, ucp_put_handler, ucp_rma_sw_dump_packet, 0); UCP_DEFINE_AM(UCP_FEATURE_RMA, UCP_AM_ID_GET_REQ, ucp_get_req_handler, ucp_rma_sw_dump_packet, 0); UCP_DEFINE_AM(UCP_FEATURE_RMA, UCP_AM_ID_GET_REP, ucp_get_rep_handler, ucp_rma_sw_dump_packet, 0); UCP_DEFINE_AM(UCP_FEATURE_RMA|UCP_FEATURE_AMO, UCP_AM_ID_CMPL, ucp_rma_cmpl_handler, ucp_rma_sw_dump_packet, 0); UCP_DEFINE_AM_PROXY(UCP_AM_ID_PUT); UCP_DEFINE_AM_PROXY(UCP_AM_ID_GET_REQ);