/** * Copyright (C) Mellanox Technologies Ltd. 2001-2016. ALL RIGHTS RESERVED. * * See file LICENSE for terms. */ #ifdef HAVE_CONFIG_H # include "config.h" #endif #include "proto_am.inl" #include static inline size_t ucp_proto_max_packed_size() { return ucs_max(sizeof(ucp_reply_hdr_t), sizeof(ucp_offload_ssend_hdr_t)); } static size_t ucp_proto_pack(void *dest, void *arg) { ucp_request_t *req = arg; ucp_reply_hdr_t *rep_hdr; ucp_offload_ssend_hdr_t *off_rep_hdr; switch (req->send.proto.am_id) { case UCP_AM_ID_EAGER_SYNC_ACK: case UCP_AM_ID_RNDV_ATS: case UCP_AM_ID_RNDV_ATP: rep_hdr = dest; rep_hdr->reqptr = req->send.proto.remote_request; rep_hdr->status = req->send.proto.status; return sizeof(*rep_hdr); case UCP_AM_ID_OFFLOAD_SYNC_ACK: off_rep_hdr = dest; off_rep_hdr->sender_tag = req->send.proto.sender_tag; off_rep_hdr->ep_ptr = ucp_request_get_dest_ep_ptr(req); return sizeof(*off_rep_hdr); } ucs_fatal("unexpected am_id"); return 0; } ucs_status_t ucp_do_am_single(uct_pending_req_t *self, uint8_t am_id, uct_pack_callback_t pack_cb, ssize_t max_packed_size) { ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct); ucp_ep_t *ep = req->send.ep; ssize_t packed_len; uint64_t *buffer; /* if packed data can fit short active message, use it, because it should * be faster than bcopy. */ if ((max_packed_size <= UCS_ALLOCA_MAX_SIZE) && (max_packed_size <= ucp_ep_config(ep)->am.max_short)) { req->send.lane = ucp_ep_get_am_lane(ep); buffer = ucs_alloca(max_packed_size); packed_len = pack_cb(buffer, req); ucs_assertv((packed_len >= 0) && (packed_len <= max_packed_size), "packed_len=%zd max_packed_size=%zu", packed_len, max_packed_size); return uct_ep_am_short(ep->uct_eps[req->send.lane], am_id, buffer[0], &buffer[1], packed_len - sizeof(uint64_t)); } else { return ucp_do_am_bcopy_single(self, am_id, pack_cb); } } ucs_status_t ucp_proto_progress_am_single(uct_pending_req_t *self) { ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct); ucs_status_t status = ucp_do_am_single(self, req->send.proto.am_id, ucp_proto_pack, ucp_proto_max_packed_size()); if (status == UCS_OK) { req->send.proto.comp_cb(req); } return status; } void ucp_proto_am_zcopy_req_complete(ucp_request_t *req, ucs_status_t status) { ucs_assert(req->send.state.uct_comp.count == 0); ucp_request_send_buffer_dereg(req); /* TODO register+lane change */ ucp_request_complete_send(req, status); } void ucp_proto_am_zcopy_completion(uct_completion_t *self, ucs_status_t status) { ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.state.uct_comp); if (req->send.state.dt.offset == req->send.length) { ucp_proto_am_zcopy_req_complete(req, status); } else if (status != UCS_OK) { ucs_assert(req->send.state.uct_comp.count == 0); ucs_assert(status != UCS_INPROGRESS); /* NOTE: the request is in pending queue if data was not completely sent, * just dereg the buffer here and complete request on purge * pending later. */ ucp_request_send_buffer_dereg(req); req->send.state.uct_comp.func = NULL; } }