/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
* (C) 2006 by Argonne National Laboratory.
* See COPYRIGHT in top-level directory.
*/
#include "tcp_impl.h"
/* Number of send-queue elements that MPID_nem_tcp_send_init() preallocates
   onto the free_buffers list. */
#define NUM_PREALLOC_SENDQ 10
/* NOTE(review): MAX_SEND_IOV is not referenced in this part of the file;
   confirm whether it is still used anywhere before relying on it. */
#define MAX_SEND_IOV 10
/* A send-queue element.  Elements are recycled through the free_buffers list
   (see FREE_Q_ELEMENT / ALLOC_Q_ELEMENT) instead of being freed after use;
   the `next` field presumably links them on that list. */
typedef struct MPID_nem_tcp_send_q_element
{
struct MPID_nem_tcp_send_q_element *next;
size_t len; /* number of bytes left to send */
char *start; /* pointer to next byte to send */
MPID_nem_cell_ptr_t cell;
/* char buf[MPID_NEM_MAX_PACKET_LEN];*/ /* data to be sent */
} MPID_nem_tcp_send_q_element_t;
/* Free list of send-queue elements, manipulated with the S_* macros
   (presumably a LIFO stack, given the `top` field and S_PUSH/S_POP).
   Filled by MPID_nem_tcp_send_init() and emptied by
   MPID_nem_tcp_send_finalize(). */
static struct {MPID_nem_tcp_send_q_element_t *top;} free_buffers = {0};
/* ALLOC_Q_ELEMENT(e): obtain a send-queue element and store it in *(e);
   e is therefore a pointer to an element pointer.  An element is taken from
   free_buffers when one is available; otherwise a new one is allocated with
   MPIU_CHKPMEM_MALLOC, so the calling scope must provide mpi_errno, an
   MPIU_CHKPMEM_DECL and the fn_fail label that macro jumps to on failure
   (the same pattern MPID_nem_tcp_send_init() uses). */
#define ALLOC_Q_ELEMENT(e) do { \
if (S_EMPTY (free_buffers)) \
{ \
MPIU_CHKPMEM_MALLOC (*(e), MPID_nem_tcp_send_q_element_t *, sizeof(MPID_nem_tcp_send_q_element_t), \
mpi_errno, "send queue element"); \
} \
else \
{ \
S_POP (&free_buffers, e); \
} \
} while (0)
/* FREE_Q_ELEMENTS() returns a list of elements, from e0 through e1, to the
   free_buffers list rather than releasing their memory. */
#define FREE_Q_ELEMENTS(e0, e1) S_PUSH_MULTIPLE (&free_buffers, e0, e1)
/* FREE_Q_ELEMENT() returns the single element e to the free_buffers list. */
#define FREE_Q_ELEMENT(e) S_PUSH (&free_buffers, e)
#undef FUNCNAME
#define FUNCNAME MPID_nem_tcp_send_init
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/* MPID_nem_tcp_send_init -- preallocate NUM_PREALLOC_SENDQ send-queue
   elements and push them onto the free_buffers list.

   Returns MPI_SUCCESS, or an error code if an allocation fails.  On failure
   the elements allocated by this call are released again and free_buffers is
   restored to the state it had on entry.  Without that restoration the list
   would keep pointers to memory freed by MPIU_CHKPMEM_REAP(), and
   MPID_nem_tcp_send_finalize() would free those elements a second time. */
int MPID_nem_tcp_send_init(void)
{
    int mpi_errno = MPI_SUCCESS;
    int i;
    /* S_PUSH only prepends to the list, so putting this value back unlinks
       every element pushed by the loop below. */
    MPID_nem_tcp_send_q_element_t *const saved_top = free_buffers.top;
    MPIU_CHKPMEM_DECL (NUM_PREALLOC_SENDQ);

    /* preallocate sendq elements */
    for (i = 0; i < NUM_PREALLOC_SENDQ; ++i)
    {
        MPID_nem_tcp_send_q_element_t *e;

        MPIU_CHKPMEM_MALLOC (e, MPID_nem_tcp_send_q_element_t *,
                             sizeof(MPID_nem_tcp_send_q_element_t), mpi_errno, "send queue element");
        S_PUSH (&free_buffers, e);
    }
    MPIU_CHKPMEM_COMMIT();

 fn_exit:
    return mpi_errno;
 fn_fail:
    /* MPIU_CHKPMEM_REAP() frees every element allocated above, including the
       ones already pushed onto free_buffers: unlink them first. */
    free_buffers.top = saved_top;
    MPIU_CHKPMEM_REAP();
    goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME MPID_nem_tcp_send_queued
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/* MPID_nem_tcp_send_queued -- write as much of send_queue to vc's socket as
   it will accept.

   Requests are processed from the head of the queue.  For each one, the iov
   entries sreq->dev.iov[iov_offset .. iov_offset + iov_count) are written
   with MPL_large_writev().  A request whose iov is fully written is either
   completed and dequeued (no OnDataAvail handler), or passed to its
   OnDataAvail handler.  The handler may complete it (it is then dequeued) or
   leave more to send (it then stays at the head and is written again).
   The loop stops on EAGAIN or on a partial write; the unsent remainder is
   then described by the request's updated iov_offset / iov_count.

   A 0-byte write is treated as the peer having closed the socket, and any
   writev failure other than EAGAIN as a communication failure.  In both
   cases the VC is passed to MPID_nem_tcp_cleanup_on_error() and the function
   returns immediately.  When the queue ends up empty, the write flag on the
   VC's pollfd is cleared with UNSET_PLFD().

   Returns MPI_SUCCESS or an MPI error code. */
int MPID_nem_tcp_send_queued(MPIDI_VC_t *vc, MPIDI_nem_tcp_request_queue_t *send_queue)
{
int mpi_errno = MPI_SUCCESS;
MPID_Request *sreq;
MPIDI_msg_sz_t offset;
MPL_IOV *iov;
int complete;
MPID_nem_tcp_vc_area *vc_tcp = VC_TCP(vc);
MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_TCP_SEND_QUEUED);
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_TCP_SEND_QUEUED);
MPIU_DBG_MSG_P(CH3_CHANNEL, VERBOSE, "vc = %p", vc);
MPIU_Assert(vc != NULL);
/* nothing queued: return without touching the pollfd flags */
if (MPIDI_CH3I_Sendq_empty(*send_queue))
goto fn_exit;
while (!MPIDI_CH3I_Sendq_empty(*send_queue))
{
sreq = MPIDI_CH3I_Sendq_head(*send_queue);
MPIU_DBG_MSG_P(CH3_CHANNEL, VERBOSE, "Sending %p", sreq);
/* resume at the first iov entry that still has unsent bytes */
iov = &sreq->dev.iov[sreq->dev.iov_offset];
offset = MPL_large_writev(vc_tcp->sc->fd, iov, sreq->dev.iov_count);
/* a 0-byte write is treated as the peer having closed the connection */
if (offset == 0) {
int req_errno = MPI_SUCCESS;
MPIR_ERR_SET(req_errno, MPI_ERR_OTHER, "**sock_closed");
MPIR_ERR_SET1(req_errno, MPIX_ERR_PROC_FAILED, "**comm_fail", "**comm_fail %d", vc->pg_rank);
mpi_errno = MPID_nem_tcp_cleanup_on_error(vc, req_errno);
if (mpi_errno) MPIR_ERR_POP(mpi_errno);
goto fn_exit; /* this vc is closed now, just bail out */
}
if (offset == -1)
{
/* EAGAIN: the write would block; leave the request queued for later */
if (errno == EAGAIN)
{
offset = 0;
MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "EAGAIN");
break;
} else {
int req_errno = MPI_SUCCESS;
MPIR_ERR_SET1(req_errno, MPI_ERR_OTHER, "**writev", "**writev %s", MPIU_Strerror(errno));
MPIR_ERR_SET1(req_errno, MPIX_ERR_PROC_FAILED, "**comm_fail", "**comm_fail %d", vc->pg_rank);
mpi_errno = MPID_nem_tcp_cleanup_on_error(vc, req_errno);
if (mpi_errno) MPIR_ERR_POP(mpi_errno);
goto fn_exit; /* this vc is closed now, just bail out */
}
}
MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, "write " MPIDI_MSG_SZ_FMT, offset);
/* Consume `offset` written bytes entry by entry.  The first entry that was
   not completely written is trimmed in place and becomes the new
   iov_offset; iov_count drops by the number of entries fully sent. */
complete = 1;
for (iov = &sreq->dev.iov[sreq->dev.iov_offset]; iov < &sreq->dev.iov[sreq->dev.iov_offset + sreq->dev.iov_count]; ++iov)
{
if (offset < iov->MPL_IOV_LEN)
{
iov->MPL_IOV_BUF = (char *)iov->MPL_IOV_BUF + offset;
iov->MPL_IOV_LEN -= offset;
/* iov_count should be equal to the number of iov's remaining */
sreq->dev.iov_count -= ((iov - sreq->dev.iov) - sreq->dev.iov_offset);
sreq->dev.iov_offset = iov - sreq->dev.iov;
complete = 0;
break;
}
offset -= iov->MPL_IOV_LEN;
}
if (!complete)
{
/* writev couldn't write the entire iov, give up for now */
break;
}
else
{
/* sent whole message */
int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *);
reqFn = sreq->dev.OnDataAvail;
if (!reqFn)
{
/* no handler: GET_RESP requests are asserted never to end up here */
MPIU_Assert(MPIDI_Request_get_type(sreq) != MPIDI_REQUEST_TYPE_GET_RESP);
mpi_errno = MPID_Request_complete(sreq);
if (mpi_errno != MPI_SUCCESS) {
MPIR_ERR_POP(mpi_errno);
}
MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");
MPIDI_CH3I_Sendq_dequeue(send_queue, &sreq);
continue;
}
complete = 0;
mpi_errno = reqFn(vc, sreq, &complete);
if (mpi_errno) MPIR_ERR_POP(mpi_errno);
if (complete)
{
MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");
MPIDI_CH3I_Sendq_dequeue(send_queue, &sreq);
continue;
}
/* The handler left more to send.  It presumably reloaded sreq->dev.iov
   starting at entry 0 (TODO confirm against the OnDataAvail handlers), so
   the next write starts from there. */
sreq->dev.iov_offset = 0;
}
}
/* everything was sent: stop polling this socket for writability */
if (MPIDI_CH3I_Sendq_empty(*send_queue))
UNSET_PLFD(vc_tcp);
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_TCP_SEND_QUEUED);
return mpi_errno;
fn_fail:
goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME MPID_nem_tcp_send_finalize
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/* MPID_nem_tcp_send_finalize -- release every send-queue element still
   parked on the free_buffers list.  Always returns MPI_SUCCESS. */
int MPID_nem_tcp_send_finalize(void)
{
    MPID_nem_tcp_send_q_element_t *elem;

    for (;;) {
        if (S_EMPTY (free_buffers))
            break;
        S_POP (&free_buffers, &elem);
        MPIU_Free (elem);
    }

    return MPI_SUCCESS;
}
/* MPID_nem_tcp_conn_est -- called once the connection on vc is established:
   mark an inactive VC active and start flushing any sends queued while the
   connection was being set up. */
#undef FUNCNAME
#define FUNCNAME MPID_nem_tcp_conn_est
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPID_nem_tcp_conn_est (MPIDI_VC_t *vc)
{
    int mpi_errno = MPI_SUCCESS;
    MPID_nem_tcp_vc_area *vc_tcp = VC_TCP(vc);
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_TCP_CONN_EST);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_TCP_CONN_EST);

    /* Leave a VC that is being closed in its current state.  An INACTIVE VC
       (e.g. the passive/server side of a dynamic-process connection) still
       needs to become ACTIVE here. */
    if (vc->state == MPIDI_VC_STATE_INACTIVE)
        MPIDI_CHANGE_VC_STATE(vc, ACTIVE);

    /* no pending sends: nothing else to do */
    if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue))
        goto fn_exit;

    /* watch the socket for writability, then push out what we can now */
    SET_PLFD(vc_tcp);
    mpi_errno = MPID_nem_tcp_send_queued(vc, &vc_tcp->send_queue);
    if (mpi_errno) MPIR_ERR_POP (mpi_errno);

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_TCP_CONN_EST);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME MPID_nem_tcp_iStartContigMsg
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/* MPID_nem_tcp_iStartContigMsg -- start sending a packet header followed by
   a contiguous data buffer on vc.

   vc       - destination VC
   hdr      - packet header; sizeof(MPIDI_CH3_Pkt_t) bytes are always sent
              from it (hdr_sz may be smaller but must not be larger)
   hdr_sz   - size of the meaningful part of the header
   data     - payload; must stay valid until the returned request completes
   data_sz  - payload size in bytes (may be 0)
   sreq_ptr - [out] NULL when no request is needed: the whole message was
              written immediately, or the connection was found closed or
              broken and cleaned up.  Otherwise it receives a new send request
              (reference count 2) that describes the unsent remainder and has
              been queued on the VC (on paused_send_queue while paused).

   The message is written immediately only if the VC is connected, not
   paused and has nothing queued; a disconnected VC is asked to connect.
   A header that is not fully written is copied into the request, so hdr
   need not outlive the call.

   Returns MPI_SUCCESS or an MPI error code. */
int MPID_nem_tcp_iStartContigMsg(MPIDI_VC_t *vc, void *hdr, MPIDI_msg_sz_t hdr_sz, void *data, MPIDI_msg_sz_t data_sz,
                                 MPID_Request **sreq_ptr)
{
    int mpi_errno = MPI_SUCCESS;
    MPID_Request * sreq = NULL;
    MPIDI_msg_sz_t offset = 0;
    MPID_nem_tcp_vc_area *vc_tcp = VC_TCP(vc);
    sockconn_t *sc = vc_tcp->sc;
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_TCP_ISTARTCONTIGMSG);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_TCP_ISTARTCONTIGMSG);

    /* The connection-closed paths below return MPI_SUCCESS without creating
       a request; never leave *sreq_ptr unassigned on those (or any) paths. */
    *sreq_ptr = NULL;

    MPIU_Assert(hdr_sz <= sizeof(MPIDI_CH3_Pkt_t));
    MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "tcp_iStartContigMsg");
    MPIDI_DBG_Print_packet((MPIDI_CH3_Pkt_t *)hdr);

    if (!MPID_nem_tcp_vc_send_paused(vc_tcp)) {
        if (MPID_nem_tcp_vc_is_connected(vc_tcp))
        {
            if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue))
            {
                /* nothing queued ahead of this message: try to write it now */
                MPL_IOV iov[2];

                iov[0].MPL_IOV_BUF = hdr;
                iov[0].MPL_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t);
                iov[1].MPL_IOV_BUF = data;
                iov[1].MPL_IOV_LEN = data_sz;

                offset = MPL_large_writev(sc->fd, iov, 2);
                if (offset == 0) {
                    /* a 0-byte write means the peer closed the connection */
                    int req_errno = MPI_SUCCESS;

                    MPIR_ERR_SET(req_errno, MPI_ERR_OTHER, "**sock_closed");
                    MPIR_ERR_SET1(req_errno, MPIX_ERR_PROC_FAILED, "**comm_fail", "**comm_fail %d", vc->pg_rank);
                    mpi_errno = MPID_nem_tcp_cleanup_on_error(vc, req_errno);
                    if (mpi_errno) MPIR_ERR_POP(mpi_errno);
                    goto fn_fail;
                }
                if (offset == -1)
                {
                    if (errno == EAGAIN)
                        offset = 0;     /* would block: queue the whole message */
                    else {
                        int req_errno = MPI_SUCCESS;

                        MPIR_ERR_SET1(req_errno, MPI_ERR_OTHER, "**writev", "**writev %s", MPIU_Strerror(errno));
                        MPIR_ERR_SET1(req_errno, MPIX_ERR_PROC_FAILED, "**comm_fail", "**comm_fail %d", vc->pg_rank);
                        mpi_errno = MPID_nem_tcp_cleanup_on_error(vc, req_errno);
                        if (mpi_errno) MPIR_ERR_POP(mpi_errno);
                        goto fn_fail;
                    }
                }
                MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, "write " MPIDI_MSG_SZ_FMT, offset);

                if (offset == sizeof(MPIDI_CH3_Pkt_t) + data_sz)
                {
                    /* sent whole message */
                    *sreq_ptr = NULL;
                    goto fn_exit;
                }
            }
        }
        else
        {
            /* state may be DISCONNECTED or ERROR.  Calling tcp_connect in an ERROR state will return an
               appropriate error code. */
            mpi_errno = MPID_nem_tcp_connect(vc);
            if (mpi_errno) MPIR_ERR_POP(mpi_errno);
        }
    }

    /* create and enqueue request */
    MPIU_DBG_MSG (CH3_CHANNEL, VERBOSE, "enqueuing");

    /* create a request; MPIU_Assert may be compiled out, so report an
       allocation failure as an error instead of dereferencing NULL */
    sreq = MPID_Request_create();
    MPIR_ERR_CHKANDJUMP(sreq == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
    MPIU_Object_set_ref (sreq, 2);
    sreq->kind = MPID_REQUEST_SEND;

    sreq->dev.OnDataAvail = 0;
    sreq->ch.vc = vc;
    sreq->dev.iov_offset = 0;

    if (offset < sizeof(MPIDI_CH3_Pkt_t))
    {
        /* header not (fully) sent: keep a private copy and resume inside it */
        sreq->dev.pending_pkt = *(MPIDI_CH3_Pkt_t *)hdr;
        sreq->dev.iov[0].MPL_IOV_BUF = (char *)&sreq->dev.pending_pkt + offset;
        sreq->dev.iov[0].MPL_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t) - offset ;
        if (data_sz)
        {
            sreq->dev.iov[1].MPL_IOV_BUF = data;
            sreq->dev.iov[1].MPL_IOV_LEN = data_sz;
            sreq->dev.iov_count = 2;
        }
        else
            sreq->dev.iov_count = 1;
    }
    else
    {
        /* header sent: resume inside the payload */
        sreq->dev.iov[0].MPL_IOV_BUF = (char *)data + (offset - sizeof(MPIDI_CH3_Pkt_t));
        sreq->dev.iov[0].MPL_IOV_LEN = data_sz - (offset - sizeof(MPIDI_CH3_Pkt_t));
        sreq->dev.iov_count = 1;
    }

    MPIU_Assert(sreq->dev.iov_count >= 1 && sreq->dev.iov[0].MPL_IOV_LEN > 0);

    if (MPID_nem_tcp_vc_send_paused(vc_tcp)) {
        MPIDI_CH3I_Sendq_enqueue(&vc_tcp->paused_send_queue, sreq);
    } else {
        if (MPID_nem_tcp_vc_is_connected(vc_tcp)) {
            if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue)) {
                /* this will be the first send on the queue: queue it and set the write flag on the pollfd */
                MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
                SET_PLFD(vc_tcp);
            } else {
                /* there are other sends in the queue before this one: try to send from the queue */
                MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
                mpi_errno = MPID_nem_tcp_send_queued(vc, &vc_tcp->send_queue);
                if (mpi_errno) MPIR_ERR_POP(mpi_errno);
            }
        } else {
            MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
        }
    }

    *sreq_ptr = sreq;

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_TCP_ISTARTCONTIGMSG);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}
/* This sends the message even if the vc is in a paused state */
#undef FUNCNAME
#define FUNCNAME MPID_nem_tcp_iStartContigMsg_paused
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/* MPID_nem_tcp_iStartContigMsg_paused -- like MPID_nem_tcp_iStartContigMsg,
   but ignores the VC's paused state: the message is written immediately when
   the VC is connected and its send_queue is empty, and any remainder is
   always queued on send_queue (never on paused_send_queue).

   hdr      - packet header; sizeof(MPIDI_CH3_Pkt_t) bytes are always sent
   data     - payload; must stay valid until the returned request completes
   sreq_ptr - [out] NULL when no request is needed: the whole message was
              written, or the connection was found closed or broken and
              cleaned up.  Otherwise it receives a new queued send request
              (reference count 2) describing the unsent remainder.

   Returns MPI_SUCCESS or an MPI error code. */
int MPID_nem_tcp_iStartContigMsg_paused(MPIDI_VC_t *vc, void *hdr, MPIDI_msg_sz_t hdr_sz, void *data, MPIDI_msg_sz_t data_sz,
                                        MPID_Request **sreq_ptr)
{
    int mpi_errno = MPI_SUCCESS;
    MPID_Request * sreq = NULL;
    MPIDI_msg_sz_t offset = 0;
    MPID_nem_tcp_vc_area *vc_tcp = VC_TCP(vc);
    sockconn_t *sc = vc_tcp->sc;
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_TCP_ISTARTCONTIGMSG_PAUSED);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_TCP_ISTARTCONTIGMSG_PAUSED);

    /* The connection-closed paths below return MPI_SUCCESS without creating
       a request; never leave *sreq_ptr unassigned on those (or any) paths. */
    *sreq_ptr = NULL;

    MPIU_Assert(hdr_sz <= sizeof(MPIDI_CH3_Pkt_t));
    MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "tcp_iStartContigMsg");
    MPIDI_DBG_Print_packet((MPIDI_CH3_Pkt_t *)hdr);

    if (MPID_nem_tcp_vc_is_connected(vc_tcp))
    {
        if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue))
        {
            /* nothing queued ahead of this message: try to write it now */
            MPL_IOV iov[2];

            iov[0].MPL_IOV_BUF = hdr;
            iov[0].MPL_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t);
            iov[1].MPL_IOV_BUF = data;
            iov[1].MPL_IOV_LEN = data_sz;

            offset = MPL_large_writev(sc->fd, iov, 2);
            if (offset == 0) {
                /* a 0-byte write means the peer closed the connection */
                int req_errno = MPI_SUCCESS;

                MPIR_ERR_SET(req_errno, MPI_ERR_OTHER, "**sock_closed");
                MPIR_ERR_SET1(req_errno, MPIX_ERR_PROC_FAILED, "**comm_fail", "**comm_fail %d", vc->pg_rank);
                mpi_errno = MPID_nem_tcp_cleanup_on_error(vc, req_errno);
                if (mpi_errno) MPIR_ERR_POP(mpi_errno);
                goto fn_fail;
            }
            if (offset == -1)
            {
                if (errno == EAGAIN)
                    offset = 0;     /* would block: queue the whole message */
                else {
                    int req_errno = MPI_SUCCESS;

                    MPIR_ERR_SET1(req_errno, MPI_ERR_OTHER, "**writev", "**writev %s", MPIU_Strerror(errno));
                    MPIR_ERR_SET1(req_errno, MPIX_ERR_PROC_FAILED, "**comm_fail", "**comm_fail %d", vc->pg_rank);
                    mpi_errno = MPID_nem_tcp_cleanup_on_error(vc, req_errno);
                    if (mpi_errno) MPIR_ERR_POP(mpi_errno);
                    goto fn_fail;
                }
            }
            MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, "write " MPIDI_MSG_SZ_FMT, offset);

            if (offset == sizeof(MPIDI_CH3_Pkt_t) + data_sz)
            {
                /* sent whole message */
                *sreq_ptr = NULL;
                goto fn_exit;
            }
        }
    }
    else
    {
        /* state may be DISCONNECTED or ERROR.  Calling tcp_connect in an ERROR state will return an
           appropriate error code. */
        mpi_errno = MPID_nem_tcp_connect(vc);
        if (mpi_errno) MPIR_ERR_POP(mpi_errno);
    }

    /* create and enqueue request */
    MPIU_DBG_MSG (CH3_CHANNEL, VERBOSE, "enqueuing");

    /* create a request; MPIU_Assert may be compiled out, so report an
       allocation failure as an error instead of dereferencing NULL */
    sreq = MPID_Request_create();
    MPIR_ERR_CHKANDJUMP(sreq == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
    MPIU_Object_set_ref (sreq, 2);
    sreq->kind = MPID_REQUEST_SEND;

    sreq->dev.OnDataAvail = 0;
    sreq->ch.vc = vc;
    sreq->dev.iov_offset = 0;

    if (offset < sizeof(MPIDI_CH3_Pkt_t))
    {
        /* header not (fully) sent: keep a private copy and resume inside it */
        sreq->dev.pending_pkt = *(MPIDI_CH3_Pkt_t *)hdr;
        sreq->dev.iov[0].MPL_IOV_BUF = (char *)&sreq->dev.pending_pkt + offset;
        sreq->dev.iov[0].MPL_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t) - offset ;
        if (data_sz)
        {
            sreq->dev.iov[1].MPL_IOV_BUF = data;
            sreq->dev.iov[1].MPL_IOV_LEN = data_sz;
            sreq->dev.iov_count = 2;
        }
        else
            sreq->dev.iov_count = 1;
    }
    else
    {
        /* header sent: resume inside the payload */
        sreq->dev.iov[0].MPL_IOV_BUF = (char *)data + (offset - sizeof(MPIDI_CH3_Pkt_t));
        sreq->dev.iov[0].MPL_IOV_LEN = data_sz - (offset - sizeof(MPIDI_CH3_Pkt_t));
        sreq->dev.iov_count = 1;
    }

    MPIU_Assert(sreq->dev.iov_count >= 1 && sreq->dev.iov[0].MPL_IOV_LEN > 0);

    if (MPID_nem_tcp_vc_is_connected(vc_tcp)) {
        if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue)) {
            /* this will be the first send on the queue: queue it and set the write flag on the pollfd */
            MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
            SET_PLFD(vc_tcp);
        } else {
            /* there are other sends in the queue before this one: try to send from the queue */
            MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
            mpi_errno = MPID_nem_tcp_send_queued(vc, &vc_tcp->send_queue);
            if (mpi_errno) MPIR_ERR_POP(mpi_errno);
        }
    } else {
        MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
    }

    *sreq_ptr = sreq;

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_TCP_ISTARTCONTIGMSG_PAUSED);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME MPID_nem_tcp_iSendContig
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/* MPID_nem_tcp_iSendContig -- send a packet header, the request's optional
   extended header (sreq->dev.ext_hdr_ptr / ext_hdr_sz) and a contiguous data
   buffer on vc, using the caller-supplied request sreq.

   The message is written immediately only if the VC is connected, not paused
   and has nothing queued.  If everything is written, sreq is completed, or
   its OnDataAvail handler is run; if the handler leaves more to send, sreq
   is queued.  Otherwise the unsent remainder is described in sreq->dev.iov,
   resuming exactly at the first unsent byte of whichever part (packet
   header, extended header or data) the write stopped in.  A header that was
   not fully written is copied into sreq->dev.pending_pkt.  sreq is then
   queued on send_queue, or on paused_send_queue while sending is paused.
   A disconnected VC is asked to connect first.

   NOTE(review): when the write finds the connection closed or broken, the VC
   is cleaned up with MPID_nem_tcp_cleanup_on_error() but sreq is neither
   completed nor queued here -- confirm the caller / cleanup path handles it.

   Returns MPI_SUCCESS or an MPI error code. */
int MPID_nem_tcp_iSendContig(MPIDI_VC_t *vc, MPID_Request *sreq, void *hdr, MPIDI_msg_sz_t hdr_sz,
                             void *data, MPIDI_msg_sz_t data_sz)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_msg_sz_t offset = 0;
    MPID_nem_tcp_vc_area *vc_tcp = VC_TCP(vc);
    sockconn_t *sc = vc_tcp->sc;
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_TCP_ISENDCONTIGMSG);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_TCP_ISENDCONTIGMSG);

    MPIU_Assert(hdr_sz <= sizeof(MPIDI_CH3_Pkt_t));
    MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "tcp_iSendContig");
    MPIDI_DBG_Print_packet((MPIDI_CH3_Pkt_t *)hdr);

    if (!MPID_nem_tcp_vc_send_paused(vc_tcp)) {
        if (MPID_nem_tcp_vc_is_connected(vc_tcp))
        {
            if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue))
            {
                /* nothing queued ahead of this message: try to write it now */
                MPL_IOV iov[3];
                int iov_n = 0;

                iov[iov_n].MPL_IOV_BUF = hdr;
                iov[iov_n].MPL_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t);
                iov_n++;

                if (sreq->dev.ext_hdr_sz != 0) {
                    iov[iov_n].MPL_IOV_BUF = sreq->dev.ext_hdr_ptr;
                    iov[iov_n].MPL_IOV_LEN = sreq->dev.ext_hdr_sz;
                    iov_n++;
                }

                iov[iov_n].MPL_IOV_BUF = data;
                iov[iov_n].MPL_IOV_LEN = data_sz;
                iov_n++;

                offset = MPL_large_writev(sc->fd, iov, iov_n);
                if (offset == 0) {
                    /* a 0-byte write means the peer closed the connection */
                    int req_errno = MPI_SUCCESS;

                    MPIR_ERR_SET(req_errno, MPI_ERR_OTHER, "**sock_closed");
                    MPIR_ERR_SET1(req_errno, MPIX_ERR_PROC_FAILED, "**comm_fail", "**comm_fail %d", vc->pg_rank);
                    mpi_errno = MPID_nem_tcp_cleanup_on_error(vc, req_errno);
                    if (mpi_errno) MPIR_ERR_POP(mpi_errno);
                    goto fn_fail;
                }
                if (offset == -1)
                {
                    if (errno == EAGAIN)
                        offset = 0;     /* would block: queue the whole message */
                    else {
                        int req_errno = MPI_SUCCESS;

                        MPIR_ERR_SET1(req_errno, MPI_ERR_OTHER, "**writev", "**writev %s", MPIU_Strerror(errno));
                        MPIR_ERR_SET1(req_errno, MPIX_ERR_PROC_FAILED, "**comm_fail", "**comm_fail %d", vc->pg_rank);
                        mpi_errno = MPID_nem_tcp_cleanup_on_error(vc, req_errno);
                        if (mpi_errno) MPIR_ERR_POP(mpi_errno);
                        goto fn_fail;
                    }
                }
                MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, "write " MPIDI_MSG_SZ_FMT, offset);

                if (offset == sizeof(MPIDI_CH3_Pkt_t) + sreq->dev.ext_hdr_sz + data_sz)
                {
                    /* sent whole message */
                    int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *);

                    reqFn = sreq->dev.OnDataAvail;
                    if (!reqFn)
                    {
                        MPIU_Assert(MPIDI_Request_get_type(sreq) != MPIDI_REQUEST_TYPE_GET_RESP);
                        mpi_errno = MPID_Request_complete(sreq);
                        if (mpi_errno != MPI_SUCCESS) {
                            MPIR_ERR_POP(mpi_errno);
                        }
                        MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");
                        goto fn_exit;
                    }
                    else
                    {
                        int complete = 0;

                        mpi_errno = reqFn(vc, sreq, &complete);
                        if (mpi_errno) MPIR_ERR_POP(mpi_errno);

                        if (complete)
                        {
                            MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");
                            goto fn_exit;
                        }

                        /* not completed: more to send (the handler has set up
                           sreq->dev.iov for it) */
                        goto enqueue_request;
                    }
                }
            }
        }
        else
        {
            /* state may be DISCONNECTED or ERROR.  Calling tcp_connect in an ERROR state will return an
               appropriate error code. */
            mpi_errno = MPID_nem_tcp_connect(vc);
            if (mpi_errno) MPIR_ERR_POP(mpi_errno);
        }
    }

    /* save iov: describe exactly the bytes that have not been written yet */
    sreq->dev.iov_count = 0;
    if (offset < sizeof(MPIDI_CH3_Pkt_t))
    {
        /* stopped inside the packet header: keep a private copy of it */
        sreq->dev.pending_pkt = *(MPIDI_CH3_Pkt_t *)hdr;
        sreq->dev.iov[sreq->dev.iov_count].MPL_IOV_BUF = (char *)&sreq->dev.pending_pkt + offset;
        sreq->dev.iov[sreq->dev.iov_count].MPL_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t) - offset;
        sreq->dev.iov_count++;

        if (sreq->dev.ext_hdr_sz > 0) {
            sreq->dev.iov[sreq->dev.iov_count].MPL_IOV_BUF = sreq->dev.ext_hdr_ptr;
            sreq->dev.iov[sreq->dev.iov_count].MPL_IOV_LEN = sreq->dev.ext_hdr_sz;
            sreq->dev.iov_count++;
        }

        if (data_sz)
        {
            sreq->dev.iov[sreq->dev.iov_count].MPL_IOV_BUF = data;
            sreq->dev.iov[sreq->dev.iov_count].MPL_IOV_LEN = data_sz;
            sreq->dev.iov_count++;
        }
    }
    else if (offset < sizeof(MPIDI_CH3_Pkt_t) + sreq->dev.ext_hdr_sz) {
        /* Stopped inside the extended header.  Skip the
           (offset - sizeof(MPIDI_CH3_Pkt_t)) bytes of it that were already
           written; re-sending them would corrupt the byte stream. */
        MPIU_Assert(sreq->dev.ext_hdr_sz > 0);
        sreq->dev.iov[sreq->dev.iov_count].MPL_IOV_BUF =
            (char *)sreq->dev.ext_hdr_ptr + (offset - sizeof(MPIDI_CH3_Pkt_t));
        sreq->dev.iov[sreq->dev.iov_count].MPL_IOV_LEN =
            sreq->dev.ext_hdr_sz - (offset - sizeof(MPIDI_CH3_Pkt_t));
        sreq->dev.iov_count++;

        if (data_sz) {
            sreq->dev.iov[sreq->dev.iov_count].MPL_IOV_BUF = data;
            sreq->dev.iov[sreq->dev.iov_count].MPL_IOV_LEN = data_sz;
            sreq->dev.iov_count++;
        }
    }
    else
    {
        /* both headers written: resume inside the payload */
        sreq->dev.iov[sreq->dev.iov_count].MPL_IOV_BUF = (char *)data + (offset - sizeof(MPIDI_CH3_Pkt_t) - sreq->dev.ext_hdr_sz);
        sreq->dev.iov[sreq->dev.iov_count].MPL_IOV_LEN = data_sz - (offset - sizeof(MPIDI_CH3_Pkt_t) - sreq->dev.ext_hdr_sz);
        sreq->dev.iov_count++;
    }

 enqueue_request:
    /* enqueue request */
    MPIU_DBG_MSG (CH3_CHANNEL, VERBOSE, "enqueuing");
    MPIU_Assert(sreq->dev.iov_count >= 1 && sreq->dev.iov[0].MPL_IOV_LEN > 0);

    sreq->ch.vc = vc;
    sreq->dev.iov_offset = 0;

    if (MPID_nem_tcp_vc_send_paused(vc_tcp)) {
        MPIDI_CH3I_Sendq_enqueue(&vc_tcp->paused_send_queue, sreq);
    } else {
        if (MPID_nem_tcp_vc_is_connected(vc_tcp)) {
            if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue)) {
                /* this will be the first send on the queue: queue it and set the write flag on the pollfd */
                MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
                SET_PLFD(vc_tcp);
            } else {
                /* there are other sends in the queue before this one: try to send from the queue */
                MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
                mpi_errno = MPID_nem_tcp_send_queued(vc, &vc_tcp->send_queue);
                if (mpi_errno) MPIR_ERR_POP(mpi_errno);
            }
        } else {
            MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
        }
    }

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_TCP_ISENDCONTIGMSG);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME MPID_nem_tcp_SendNoncontig
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/* MPID_nem_tcp_SendNoncontig -- send a packet header, the request's optional
   extended header, and the data described by sreq (loaded chunk by chunk
   with MPIDI_CH3U_Request_load_send_iov()) on vc.

   The header(s) plus as much data as fits in an MPL_IOV_LIMIT-entry iov are
   written immediately if the VC is connected, not paused and has nothing
   queued.  If all of it was written, sreq is completed or passed to its
   OnDataAvail handler.  Any unsent part is copied into sreq->dev.iov (with
   the packet header saved in sreq->dev.pending_pkt) and sreq is queued, on
   paused_send_queue while sending is paused.  A disconnected VC is asked to
   connect first.

   Whenever the fn_fail path is taken, including the connection-closed case
   after a successful cleanup, a reference to sreq is released with
   MPID_Request_release().

   Returns MPI_SUCCESS or an MPI error code. */
int MPID_nem_tcp_SendNoncontig(MPIDI_VC_t *vc, MPID_Request *sreq, void *header, MPIDI_msg_sz_t hdr_sz)
{
    int mpi_errno = MPI_SUCCESS;
    int iov_n;
    MPL_IOV iov[MPL_IOV_LIMIT];
    MPL_IOV *iov_p;
    MPIDI_msg_sz_t offset;
    int complete;
    MPID_nem_tcp_vc_area *vc_tcp = VC_TCP(vc);
    int iov_offset;
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_TCP_SENDNONCONTIG);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_TCP_SENDNONCONTIG);
    MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "tcp_SendNoncontig");
    MPIU_Assert(hdr_sz <= sizeof(MPIDI_CH3_Pkt_t));

    iov_n = 0;

    iov[iov_n].MPL_IOV_BUF = header;
    iov[iov_n].MPL_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t);
    iov_n++;

    if (sreq->dev.ext_hdr_ptr != NULL) {
        iov[iov_n].MPL_IOV_BUF = sreq->dev.ext_hdr_ptr;
        iov[iov_n].MPL_IOV_LEN = sreq->dev.ext_hdr_sz;
        iov_n++;
    }

    iov_offset = iov_n;

    /* iov_n is in/out for MPIDI_CH3U_Request_load_send_iov(): on input the
       number of free entries at &iov[iov_offset], on output the number it
       filled.  Offer every entry left after the header(s), not just
       iov_offset (1 or 2) of them, which limited each write to one or two
       data entries. */
    iov_n = MPL_IOV_LIMIT - iov_offset;
    mpi_errno = MPIDI_CH3U_Request_load_send_iov(sreq, &iov[iov_offset], &iov_n);
    MPIR_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|loadsendiov");

    iov_n += iov_offset;
    offset = 0;

    if (!MPID_nem_tcp_vc_send_paused(vc_tcp)) {
        if (MPID_nem_tcp_vc_is_connected(vc_tcp))
        {
            if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue))
            {
                offset = MPL_large_writev(vc_tcp->sc->fd, iov, iov_n);
                if (offset == 0) {
                    /* a 0-byte write means the peer closed the connection */
                    int req_errno = MPI_SUCCESS;

                    MPIR_ERR_SET(req_errno, MPI_ERR_OTHER, "**sock_closed");
                    MPIR_ERR_SET1(req_errno, MPIX_ERR_PROC_FAILED, "**comm_fail", "**comm_fail %d", vc->pg_rank);
                    mpi_errno = MPID_nem_tcp_cleanup_on_error(vc, req_errno);
                    if (mpi_errno) MPIR_ERR_POP(mpi_errno);
                    goto fn_fail;
                }
                if (offset == -1)
                {
                    if (errno == EAGAIN)
                        offset = 0;     /* would block: queue everything */
                    else {
                        int req_errno = MPI_SUCCESS;

                        MPIR_ERR_SET1(req_errno, MPI_ERR_OTHER, "**writev", "**writev %s", MPIU_Strerror(errno));
                        MPIR_ERR_SET1(req_errno, MPIX_ERR_PROC_FAILED, "**comm_fail", "**comm_fail %d", vc->pg_rank);
                        mpi_errno = MPID_nem_tcp_cleanup_on_error(vc, req_errno);
                        if (mpi_errno) MPIR_ERR_POP(mpi_errno);
                        goto fn_fail;
                    }
                }
                MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, "write noncontig " MPIDI_MSG_SZ_FMT, offset);
            }
        }
        else
        {
            /* state may be DISCONNECTED or ERROR.  Calling tcp_connect in an ERROR state will return an
               appropriate error code. */
            mpi_errno = MPID_nem_tcp_connect(vc);
            if (mpi_errno) MPIR_ERR_POP(mpi_errno);
        }
    }

    if (offset < iov[0].MPL_IOV_LEN)
    {
        /* header was not yet sent, save it in req; the copy loop below skips
           the bytes of it that were already written */
        sreq->dev.pending_pkt = *(MPIDI_CH3_Pkt_t *)header;
        iov[0].MPL_IOV_BUF = (MPL_IOV_BUF_CAST)&sreq->dev.pending_pkt;
        iov[0].MPL_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t);
    }

    /* check if whole iov was sent, and save any unsent portion of iov */
    sreq->dev.iov_count = 0;
    complete = 1;
    for (iov_p = &iov[0]; iov_p < &iov[iov_n]; ++iov_p)
    {
        if (offset < iov_p->MPL_IOV_LEN)
        {
            sreq->dev.iov[sreq->dev.iov_count].MPL_IOV_BUF = (MPL_IOV_BUF_CAST)((char *)iov_p->MPL_IOV_BUF + offset);
            sreq->dev.iov[sreq->dev.iov_count].MPL_IOV_LEN = iov_p->MPL_IOV_LEN - offset;
            offset = 0;
            ++sreq->dev.iov_count;
            complete = 0;
        }
        else
            offset -= iov_p->MPL_IOV_LEN;
    }

    if (complete)
    {
        /* sent whole iov */
        int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *);

        reqFn = sreq->dev.OnDataAvail;
        if (!reqFn)
        {
            mpi_errno = MPID_Request_complete(sreq);
            if (mpi_errno != MPI_SUCCESS) {
                MPIR_ERR_POP(mpi_errno);
            }
            MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");
            goto fn_exit;
        }

        complete = 0;
        mpi_errno = reqFn(vc, sreq, &complete);
        if (mpi_errno) MPIR_ERR_POP(mpi_errno);

        if (complete)
        {
            MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");
            goto fn_exit;
        }
    }

    /* enqueue request */
    MPIU_DBG_MSG (CH3_CHANNEL, VERBOSE, "enqueuing");
    MPIU_Assert(sreq->dev.iov_count >= 1 && sreq->dev.iov[0].MPL_IOV_LEN > 0);

    sreq->ch.vc = vc;
    sreq->dev.iov_offset = 0;

    if (MPID_nem_tcp_vc_send_paused(vc_tcp)) {
        MPIDI_CH3I_Sendq_enqueue(&vc_tcp->paused_send_queue, sreq);
    } else {
        if (MPID_nem_tcp_vc_is_connected(vc_tcp)) {
            if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue)) {
                /* this will be the first send on the queue: queue it and set the write flag on the pollfd */
                MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
                SET_PLFD(vc_tcp);
            } else {
                /* there are other sends in the queue before this one: try to send from the queue */
                MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
                mpi_errno = MPID_nem_tcp_send_queued(vc, &vc_tcp->send_queue);
                if (mpi_errno) MPIR_ERR_POP(mpi_errno);
            }
        } else {
            MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
        }
    }

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_TCP_SENDNONCONTIG);
    return mpi_errno;
 fn_fail:
    MPID_Request_release(sreq);
    goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME MPID_nem_tcp_error_out_send_queue
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/* MPID_nem_tcp_error_out_send_queue -- complete every request waiting on
   vc's send_queue, then every request on its paused_send_queue, with
   status.MPI_ERROR set to req_errno.

   The requests' OnDataAvail / OnFinal handlers are deliberately not run:
   this is an error path, and the requests are only marked complete.  If
   completing a request fails, that error is returned at once and any
   requests not yet visited remain queued. */
int MPID_nem_tcp_error_out_send_queue(struct MPIDI_VC *const vc, int req_errno)
{
    int mpi_errno = MPI_SUCCESS;
    MPID_nem_tcp_vc_area *const vc_tcp = VC_TCP(vc);
    MPID_Request *failed_req;
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_TCP_ERROR_OUT_SEND_QUEUE);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_TCP_ERROR_OUT_SEND_QUEUE);

    /* regular send queue first ... */
    for (;;) {
        if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue))
            break;
        MPIDI_CH3I_Sendq_dequeue(&vc_tcp->send_queue, &failed_req);
        failed_req->status.MPI_ERROR = req_errno;
        mpi_errno = MPID_Request_complete(failed_req);
        if (mpi_errno) MPIR_ERR_POP(mpi_errno);
    }

    /* ... then the requests held back while sending was paused */
    for (;;) {
        if (MPIDI_CH3I_Sendq_empty(vc_tcp->paused_send_queue))
            break;
        MPIDI_CH3I_Sendq_dequeue(&vc_tcp->paused_send_queue, &failed_req);
        failed_req->status.MPI_ERROR = req_errno;
        mpi_errno = MPID_Request_complete(failed_req);
        if (mpi_errno) MPIR_ERR_POP(mpi_errno);
    }

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_TCP_ERROR_OUT_SEND_QUEUE);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}