/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
 *  (C) 2006 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "tcp_impl.h"

/* Number of send-queue elements that MPID_nem_tcp_send_init() allocates up
   front and pushes onto the free_buffers stack. */
#define NUM_PREALLOC_SENDQ 10
/* NOTE(review): not referenced in the visible part of this file -- presumably
   an upper bound on iov entries per send; confirm before relying on it. */
#define MAX_SEND_IOV 10

/* One element of the send-queue element pool.  Elements are allocated by
   MPID_nem_tcp_send_init() / ALLOC_Q_ELEMENT() and kept on the free_buffers
   stack while unused. */
typedef struct MPID_nem_tcp_send_q_element
{
    struct MPID_nem_tcp_send_q_element *next; /* link to the next element (presumably used by the S_* stack macros -- confirm) */
    size_t len;                        /* number of bytes left to send */
    char *start;                       /* pointer to next byte to send */
    MPID_nem_cell_ptr_t cell;          /* cell associated with this element (not used in the visible code) */
    /*     char buf[MPID_NEM_MAX_PACKET_LEN];*/ /* data to be sent */
} MPID_nem_tcp_send_q_element_t;

/* Stack of unused send-queue elements.  Starts out empty; filled by
   MPID_nem_tcp_send_init() and drained by MPID_nem_tcp_send_finalize(). */
static struct {MPID_nem_tcp_send_q_element_t *top;} free_buffers = {0};

/* ALLOC_Q_ELEMENT(e) -- obtain a send-queue element.
   e is a pointer to an element pointer; the element is stored in *e.
   If free_buffers is empty a new element is allocated with
   MPIU_CHKPMEM_MALLOC, otherwise one is popped from free_buffers.
   The expansion references a variable named mpi_errno, and the
   MPIU_CHKPMEM_MALLOC call presumably requires a matching MPIU_CHKPMEM_DECL
   (and error label) in the calling function -- confirm against the macro
   definition. */
#define ALLOC_Q_ELEMENT(e) do {                                                                                                         \
        if (S_EMPTY (free_buffers))                                                                                                     \
        {                                                                                                                               \
            MPIU_CHKPMEM_MALLOC (*(e), MPID_nem_tcp_send_q_element_t *, sizeof(MPID_nem_tcp_send_q_element_t),      \
                                 mpi_errno, "send queue element");                                                                      \
        }                                                                                                                               \
        else                                                                                                                            \
        {                                                                                                                               \
            S_POP (&free_buffers, e);                                                                                                   \
        }                                                                                                                               \
    } while (0)

/* FREE_Q_ELEMENTS() returns a list of elements, starting at e0 and running
   through e1, to the free_buffers stack.  The elements are pushed back for
   reuse, not released to the allocator. */
#define FREE_Q_ELEMENTS(e0, e1) S_PUSH_MULTIPLE (&free_buffers, e0, e1)
/* FREE_Q_ELEMENT() returns the single element e to the free_buffers stack. */
#define FREE_Q_ELEMENT(e) S_PUSH (&free_buffers, e)

#undef FUNCNAME
#define FUNCNAME MPID_nem_tcp_send_init
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/* MPID_nem_tcp_send_init -- preallocate NUM_PREALLOC_SENDQ send-queue
   elements and push them onto the free_buffers stack.

   Returns MPI_SUCCESS, or the error code set by the allocation macro.  On
   failure every element allocated by this call is freed again and
   free_buffers is restored to the state it had on entry. */
int MPID_nem_tcp_send_init(void)
{
    int mpi_errno = MPI_SUCCESS;
    int i;
    /* remember the stack top so a failed init can unlink what it pushed */
    MPID_nem_tcp_send_q_element_t *const saved_top = free_buffers.top;
    MPIU_CHKPMEM_DECL (NUM_PREALLOC_SENDQ);

    /* preallocate sendq elements */
    for (i = 0; i < NUM_PREALLOC_SENDQ; ++i)
    {
        MPID_nem_tcp_send_q_element_t *e;

        MPIU_CHKPMEM_MALLOC (e, MPID_nem_tcp_send_q_element_t *,
                             sizeof(MPID_nem_tcp_send_q_element_t), mpi_errno, "send queue element");
        S_PUSH (&free_buffers, e);
    }

    MPIU_CHKPMEM_COMMIT();
    return mpi_errno;
 fn_fail:
    /* The reap below frees every element allocated in the loop, including the
       ones already pushed.  Drop them from the free stack first so that
       free_buffers never refers to freed memory. */
    free_buffers.top = saved_top;
    MPIU_CHKPMEM_REAP();
    return mpi_errno;
}

#undef FUNCNAME
#define FUNCNAME MPID_nem_tcp_send_queued
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/* MPID_nem_tcp_send_queued -- push as much of send_queue as possible onto
   the socket of vc.

   vc          virtual connection whose socket (VC_TCP(vc)->sc->fd) is written
   send_queue  queue of pending send requests; requests are sent in order
               starting at the head

   For each request the remaining iov is written with MPL_large_writev().
   A fully sent request is completed (or handed to its OnDataAvail handler)
   and dequeued; a partially sent request has its iov adjusted and stays at
   the head.  The loop stops on EAGAIN, on a partial write, or when the queue
   is empty; in the last case the write flag on the pollfd is cleared.  A
   closed socket or write error triggers MPID_nem_tcp_cleanup_on_error().

   Returns MPI_SUCCESS or an MPI error code. */
int MPID_nem_tcp_send_queued(MPIDI_VC_t *vc, MPIDI_nem_tcp_request_queue_t *send_queue)
{
    int mpi_errno = MPI_SUCCESS;
    MPID_Request *sreq;
    MPIDI_msg_sz_t offset;
    MPL_IOV *iov;
    int complete;
    MPID_nem_tcp_vc_area *vc_tcp = VC_TCP(vc);
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_TCP_SEND_QUEUED);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_TCP_SEND_QUEUED);

    MPIU_DBG_MSG_P(CH3_CHANNEL, VERBOSE, "vc = %p", vc);
    MPIU_Assert(vc != NULL);

    /* nothing queued: leave without touching the pollfd flags */
    if (MPIDI_CH3I_Sendq_empty(*send_queue))
        goto fn_exit;

    while (!MPIDI_CH3I_Sendq_empty(*send_queue))
    {
        sreq = MPIDI_CH3I_Sendq_head(*send_queue);
        MPIU_DBG_MSG_P(CH3_CHANNEL, VERBOSE, "Sending %p", sreq);

        iov = &sreq->dev.iov[sreq->dev.iov_offset];

        offset = MPL_large_writev(vc_tcp->sc->fd, iov, sreq->dev.iov_count);
        if (offset == 0) {
            /* a zero-byte write is treated as "peer closed the socket" */
            int req_errno = MPI_SUCCESS;

            MPIR_ERR_SET(req_errno, MPI_ERR_OTHER, "**sock_closed");
            MPIR_ERR_SET1(req_errno, MPIX_ERR_PROC_FAILED, "**comm_fail", "**comm_fail %d", vc->pg_rank);
            mpi_errno = MPID_nem_tcp_cleanup_on_error(vc, req_errno);
            if (mpi_errno) MPIR_ERR_POP(mpi_errno);
            goto fn_exit; /* this vc is closed now, just bail out */
        }
        if (offset == -1)
        {
            if (errno == EAGAIN)
            {
                /* socket buffer full: nothing was written, retry later */
                offset = 0;
                MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "EAGAIN");
                break;
            } else {
                int req_errno = MPI_SUCCESS;
                MPIR_ERR_SET1(req_errno, MPI_ERR_OTHER, "**writev", "**writev %s", MPIU_Strerror(errno));
                MPIR_ERR_SET1(req_errno, MPIX_ERR_PROC_FAILED, "**comm_fail", "**comm_fail %d", vc->pg_rank);
                mpi_errno = MPID_nem_tcp_cleanup_on_error(vc, req_errno);
                if (mpi_errno) MPIR_ERR_POP(mpi_errno);
                goto fn_exit; /* this vc is closed now, just bail out */
            }
        }
        MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, "write " MPIDI_MSG_SZ_FMT, offset);

        /* walk the iov, consuming the bytes that were written; stop at the
           first entry that was only partially sent */
        complete = 1;
        for (iov = &sreq->dev.iov[sreq->dev.iov_offset]; iov < &sreq->dev.iov[sreq->dev.iov_offset + sreq->dev.iov_count]; ++iov)
        {
            if (offset < iov->MPL_IOV_LEN)
            {
                iov->MPL_IOV_BUF = (char *)iov->MPL_IOV_BUF + offset;
                iov->MPL_IOV_LEN -= offset;
                /* iov_count should be equal to the number of iov's remaining */
                sreq->dev.iov_count -= ((iov - sreq->dev.iov) - sreq->dev.iov_offset);
                sreq->dev.iov_offset = iov - sreq->dev.iov;
                complete = 0;
                break;
            }
            offset -= iov->MPL_IOV_LEN;
        }
        if (!complete)
        {
            /* writev couldn't write the entire iov, give up for now */
            break;
        }
        else
        {
            /* sent whole message */
            int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *);

            reqFn = sreq->dev.OnDataAvail;
            if (!reqFn)
            {
                MPIU_Assert(MPIDI_Request_get_type(sreq) != MPIDI_REQUEST_TYPE_GET_RESP);
                /* Unlink the request before completing it, as
                   MPID_nem_tcp_error_out_send_queue() does: the dequeue
                   reads the request, so it must not run after the
                   completion call (which may be the last user of the
                   request), and a failed completion must not leave a
                   finished request at the head of the queue. */
                MPIDI_CH3I_Sendq_dequeue(send_queue, &sreq);
                mpi_errno = MPID_Request_complete(sreq);
                if (mpi_errno != MPI_SUCCESS) {
                    MPIR_ERR_POP(mpi_errno);
                }
                MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");
                continue;
            }

            /* the handler decides whether the request is finished or has
               more data to send */
            complete = 0;
            mpi_errno = reqFn(vc, sreq, &complete);
            if (mpi_errno) MPIR_ERR_POP(mpi_errno);

            if (complete)
            {
                MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");
                MPIDI_CH3I_Sendq_dequeue(send_queue, &sreq);
                continue;
            }
            /* not complete: the request stays at the head and is sent again
               from the start of its iov */
            sreq->dev.iov_offset = 0;
        }
    }

    /* nothing left to write: stop polling the socket for writability */
    if (MPIDI_CH3I_Sendq_empty(*send_queue))
        UNSET_PLFD(vc_tcp);

fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_TCP_SEND_QUEUED);
    return mpi_errno;
fn_fail:
    goto fn_exit;
}

#undef FUNCNAME
#define FUNCNAME MPID_nem_tcp_send_finalize
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/* MPID_nem_tcp_send_finalize -- pop every element off the free_buffers
   stack and release it with MPIU_Free.  Always returns MPI_SUCCESS. */
int MPID_nem_tcp_send_finalize(void)
{
    MPID_nem_tcp_send_q_element_t *elem;

    for (;;)
    {
        if (S_EMPTY (free_buffers))
        {
            break;
        }
        S_POP (&free_buffers, &elem);
        MPIU_Free (elem);
    }

    return MPI_SUCCESS;
}

/* MPID_nem_tcp_conn_est -- invoked once the connection for vc has been
   established.  Marks an INACTIVE vc as ACTIVE and, if sends were queued
   while connecting, enables write polling and starts draining the queue.
   Returns MPI_SUCCESS or the error from MPID_nem_tcp_send_queued(). */
#undef FUNCNAME
#define FUNCNAME MPID_nem_tcp_conn_est
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPID_nem_tcp_conn_est (MPIDI_VC_t *vc)
{
    MPID_nem_tcp_vc_area *vc_tcp = VC_TCP(vc);
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_TCP_CONN_EST);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_TCP_CONN_EST);

    /* A vc that is being closed keeps its state; only an INACTIVE one is
     * promoted.  The promotion is also needed for a passively connected vc
     * (i.e., the server side of a dynamic process connection). */
    if (vc->state == MPIDI_VC_STATE_INACTIVE)
    {
        MPIDI_CHANGE_VC_STATE(vc, ACTIVE);
    }

    /* no pending sends: nothing more to do */
    if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue))
    {
        goto fn_exit;
    }

    SET_PLFD(vc_tcp);
    mpi_errno = MPID_nem_tcp_send_queued(vc, &vc_tcp->send_queue);
    if (mpi_errno)
    {
        MPIR_ERR_POP (mpi_errno);
    }

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_TCP_CONN_EST);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}

#undef FUNCNAME
#define FUNCNAME MPID_nem_tcp_iStartContigMsg
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/* MPID_nem_tcp_iStartContigMsg -- start sending a header plus an optional
   contiguous data buffer on vc.

   vc        virtual connection to send on
   hdr       packet header; sizeof(MPIDI_CH3_Pkt_t) bytes are sent (and copied
             into the request if the send has to be queued)
   hdr_sz    only checked against sizeof(MPIDI_CH3_Pkt_t)
   data      contiguous payload, may be empty
   data_sz   payload size in bytes
   sreq_ptr  out: NULL if the whole message was written immediately,
             otherwise the request created for the unsent remainder.  It is
             also set to NULL on every failure path, so the caller never
             reads an indeterminate pointer.

   If the vc is not paused, is connected and has an empty send queue, the
   message is written directly.  Whatever could not be written is stored in
   a new request, which is put on the paused queue (vc paused) or the send
   queue.  An unconnected vc triggers MPID_nem_tcp_connect().

   Returns MPI_SUCCESS or an MPI error code.  Note that the socket-closed and
   writev-error paths return the result of MPID_nem_tcp_cleanup_on_error(),
   which may be MPI_SUCCESS. */
int MPID_nem_tcp_iStartContigMsg(MPIDI_VC_t *vc, void *hdr, MPIDI_msg_sz_t hdr_sz, void *data, MPIDI_msg_sz_t data_sz,
                                 MPID_Request **sreq_ptr)
{
    int mpi_errno = MPI_SUCCESS;
    MPID_Request * sreq = NULL;
    MPIDI_msg_sz_t offset = 0;
    MPID_nem_tcp_vc_area *vc_tcp = VC_TCP(vc);
    sockconn_t *sc = vc_tcp->sc;
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_TCP_ISTARTCONTIGMSG);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_TCP_ISTARTCONTIGMSG);

    MPIU_Assert(hdr_sz <= sizeof(MPIDI_CH3_Pkt_t));

    MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "tcp_iStartContigMsg");
    MPIDI_DBG_Print_packet((MPIDI_CH3_Pkt_t *)hdr);

    if (!MPID_nem_tcp_vc_send_paused(vc_tcp)) {
        if (MPID_nem_tcp_vc_is_connected(vc_tcp))
        {
            /* write directly only when nothing is queued ahead of us */
            if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue))
            {
                MPL_IOV iov[2];

                iov[0].MPL_IOV_BUF = hdr;
                iov[0].MPL_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t);
                iov[1].MPL_IOV_BUF = data;
                iov[1].MPL_IOV_LEN = data_sz;

                offset = MPL_large_writev(sc->fd, iov, 2);
                if (offset == 0) {
                    /* a zero-byte write is treated as "peer closed the socket" */
                    int req_errno = MPI_SUCCESS;

                    MPIR_ERR_SET(req_errno, MPI_ERR_OTHER, "**sock_closed");
                    MPIR_ERR_SET1(req_errno, MPIX_ERR_PROC_FAILED, "**comm_fail", "**comm_fail %d", vc->pg_rank);
                    mpi_errno = MPID_nem_tcp_cleanup_on_error(vc, req_errno);
                    if (mpi_errno) MPIR_ERR_POP(mpi_errno);
                    goto fn_fail;
                }
                if (offset == -1)
                {
                    if (errno == EAGAIN)
                        offset = 0; /* nothing written; queue the whole message */
                    else {
                        int req_errno = MPI_SUCCESS;
                        MPIR_ERR_SET1(req_errno, MPI_ERR_OTHER, "**writev", "**writev %s", MPIU_Strerror(errno));
                        MPIR_ERR_SET1(req_errno, MPIX_ERR_PROC_FAILED, "**comm_fail", "**comm_fail %d", vc->pg_rank);
                        mpi_errno = MPID_nem_tcp_cleanup_on_error(vc, req_errno);
                        if (mpi_errno) MPIR_ERR_POP(mpi_errno);
                        goto fn_fail;
                    }
                }
                MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, "write " MPIDI_MSG_SZ_FMT, offset);

                if (offset == sizeof(MPIDI_CH3_Pkt_t) + data_sz)
                {
                    /* sent whole message */
                    *sreq_ptr = NULL;
                    goto fn_exit;
                }
            }
        }
        else
        {
            /* state may be DISCONNECTED or ERROR.  Calling tcp_connect in an ERROR state will return an
               appropriate error code. */
            mpi_errno = MPID_nem_tcp_connect(vc);
            if (mpi_errno) MPIR_ERR_POP(mpi_errno);
        }
    }

    /* create and enqueue request */
    MPIU_DBG_MSG (CH3_CHANNEL, VERBOSE, "enqueuing");

    /* create a request */
    sreq = MPID_Request_create();
    MPIU_Assert (sreq != NULL);
    MPIU_Object_set_ref (sreq, 2);
    sreq->kind = MPID_REQUEST_SEND;

    sreq->dev.OnDataAvail = 0;
    sreq->ch.vc = vc;
    sreq->dev.iov_offset = 0;

    /* offset = number of bytes already written; describe the rest in the
       request's iov */
    if (offset < sizeof(MPIDI_CH3_Pkt_t))
    {
        /* header (partly) unsent: keep a private copy, the caller's header
           buffer is not referenced after return */
        sreq->dev.pending_pkt = *(MPIDI_CH3_Pkt_t *)hdr;
        sreq->dev.iov[0].MPL_IOV_BUF = (char *)&sreq->dev.pending_pkt + offset;
        sreq->dev.iov[0].MPL_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t) - offset ;
        if (data_sz)
        {
            sreq->dev.iov[1].MPL_IOV_BUF = data;
            sreq->dev.iov[1].MPL_IOV_LEN = data_sz;
            sreq->dev.iov_count = 2;
        }
        else
            sreq->dev.iov_count = 1;
    }
    else
    {
        /* header done: only the tail of the data remains */
        sreq->dev.iov[0].MPL_IOV_BUF = (char *)data + (offset - sizeof(MPIDI_CH3_Pkt_t));
        sreq->dev.iov[0].MPL_IOV_LEN = data_sz - (offset - sizeof(MPIDI_CH3_Pkt_t));
        sreq->dev.iov_count = 1;
    }

    MPIU_Assert(sreq->dev.iov_count >= 1 && sreq->dev.iov[0].MPL_IOV_LEN > 0);

    if (MPID_nem_tcp_vc_send_paused(vc_tcp)) {
        MPIDI_CH3I_Sendq_enqueue(&vc_tcp->paused_send_queue, sreq);
    } else {
        if (MPID_nem_tcp_vc_is_connected(vc_tcp)) {
            if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue)) {
                /* this will be the first send on the queue: queue it and set the write flag on the pollfd */
                MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
                SET_PLFD(vc_tcp);
            } else {
                /* there are other sends in the queue before this one: try to send from the queue */
                MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
                mpi_errno = MPID_nem_tcp_send_queued(vc, &vc_tcp->send_queue);
                if (mpi_errno) MPIR_ERR_POP(mpi_errno);
            }
        } else {
            /* not connected yet: the queue is drained once the connection is established */
            MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
        }
    }

    *sreq_ptr = sreq;

fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_TCP_ISTARTCONTIGMSG);
    return mpi_errno;
fn_fail:
    /* never hand an indeterminate request pointer back to the caller; some
       failure paths return MPI_SUCCESS after cleaning up the connection */
    *sreq_ptr = NULL;
    goto fn_exit;
}

/* MPID_nem_tcp_iStartContigMsg_paused -- same as
   MPID_nem_tcp_iStartContigMsg(), but sends the message even if the vc is
   in a paused state: the paused check is skipped and a queued request always
   goes onto the regular send queue.

   vc        virtual connection to send on
   hdr       packet header; sizeof(MPIDI_CH3_Pkt_t) bytes are sent (and copied
             into the request if the send has to be queued)
   hdr_sz    only checked against sizeof(MPIDI_CH3_Pkt_t)
   data      contiguous payload, may be empty
   data_sz   payload size in bytes
   sreq_ptr  out: NULL if the whole message was written immediately,
             otherwise the request created for the unsent remainder.  It is
             also set to NULL on every failure path, so the caller never
             reads an indeterminate pointer.

   Returns MPI_SUCCESS or an MPI error code.  Note that the socket-closed and
   writev-error paths return the result of MPID_nem_tcp_cleanup_on_error(),
   which may be MPI_SUCCESS. */
#undef FUNCNAME
#define FUNCNAME MPID_nem_tcp_iStartContigMsg_paused
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPID_nem_tcp_iStartContigMsg_paused(MPIDI_VC_t *vc, void *hdr, MPIDI_msg_sz_t hdr_sz, void *data, MPIDI_msg_sz_t data_sz,
                                        MPID_Request **sreq_ptr)
{
    int mpi_errno = MPI_SUCCESS;
    MPID_Request * sreq = NULL;
    MPIDI_msg_sz_t offset = 0;
    MPID_nem_tcp_vc_area *vc_tcp = VC_TCP(vc);
    sockconn_t *sc = vc_tcp->sc;
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_TCP_ISTARTCONTIGMSG_PAUSED);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_TCP_ISTARTCONTIGMSG_PAUSED);

    MPIU_Assert(hdr_sz <= sizeof(MPIDI_CH3_Pkt_t));

    MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "tcp_iStartContigMsg");
    MPIDI_DBG_Print_packet((MPIDI_CH3_Pkt_t *)hdr);

    if (MPID_nem_tcp_vc_is_connected(vc_tcp))
    {
        /* write directly only when nothing is queued ahead of us */
        if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue))
        {
            MPL_IOV iov[2];

            iov[0].MPL_IOV_BUF = hdr;
            iov[0].MPL_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t);
            iov[1].MPL_IOV_BUF = data;
            iov[1].MPL_IOV_LEN = data_sz;

            offset = MPL_large_writev(sc->fd, iov, 2);
            if (offset == 0) {
                /* a zero-byte write is treated as "peer closed the socket" */
                int req_errno = MPI_SUCCESS;

                MPIR_ERR_SET(req_errno, MPI_ERR_OTHER, "**sock_closed");
                MPIR_ERR_SET1(req_errno, MPIX_ERR_PROC_FAILED, "**comm_fail", "**comm_fail %d", vc->pg_rank);
                mpi_errno = MPID_nem_tcp_cleanup_on_error(vc, req_errno);
                if (mpi_errno) MPIR_ERR_POP(mpi_errno);
                goto fn_fail;
            }
            if (offset == -1)
            {
                if (errno == EAGAIN)
                    offset = 0; /* nothing written; queue the whole message */
                else {
                    int req_errno = MPI_SUCCESS;
                    MPIR_ERR_SET1(req_errno, MPI_ERR_OTHER, "**writev", "**writev %s", MPIU_Strerror(errno));
                    MPIR_ERR_SET1(req_errno, MPIX_ERR_PROC_FAILED, "**comm_fail", "**comm_fail %d", vc->pg_rank);

                    mpi_errno = MPID_nem_tcp_cleanup_on_error(vc, req_errno);
                    if (mpi_errno) MPIR_ERR_POP(mpi_errno);
                    goto fn_fail;
                }
            }
            MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, "write " MPIDI_MSG_SZ_FMT, offset);

            if (offset == sizeof(MPIDI_CH3_Pkt_t) + data_sz)
            {
                /* sent whole message */
                *sreq_ptr = NULL;
                goto fn_exit;
            }
        }
    }
    else
    {
        /* state may be DISCONNECTED or ERROR.  Calling tcp_connect in an ERROR state will return an
           appropriate error code. */
        mpi_errno = MPID_nem_tcp_connect(vc);
        if (mpi_errno) MPIR_ERR_POP(mpi_errno);
    }

    /* create and enqueue request */
    MPIU_DBG_MSG (CH3_CHANNEL, VERBOSE, "enqueuing");

    /* create a request */
    sreq = MPID_Request_create();
    MPIU_Assert (sreq != NULL);
    MPIU_Object_set_ref (sreq, 2);
    sreq->kind = MPID_REQUEST_SEND;

    sreq->dev.OnDataAvail = 0;
    sreq->ch.vc = vc;
    sreq->dev.iov_offset = 0;

    /* offset = number of bytes already written; describe the rest in the
       request's iov */
    if (offset < sizeof(MPIDI_CH3_Pkt_t))
    {
        /* header (partly) unsent: keep a private copy, the caller's header
           buffer is not referenced after return */
        sreq->dev.pending_pkt = *(MPIDI_CH3_Pkt_t *)hdr;
        sreq->dev.iov[0].MPL_IOV_BUF = (char *)&sreq->dev.pending_pkt + offset;
        sreq->dev.iov[0].MPL_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t) - offset ;
        if (data_sz)
        {
            sreq->dev.iov[1].MPL_IOV_BUF = data;
            sreq->dev.iov[1].MPL_IOV_LEN = data_sz;
            sreq->dev.iov_count = 2;
        }
        else
            sreq->dev.iov_count = 1;
    }
    else
    {
        /* header done: only the tail of the data remains */
        sreq->dev.iov[0].MPL_IOV_BUF = (char *)data + (offset - sizeof(MPIDI_CH3_Pkt_t));
        sreq->dev.iov[0].MPL_IOV_LEN = data_sz - (offset - sizeof(MPIDI_CH3_Pkt_t));
        sreq->dev.iov_count = 1;
    }

    MPIU_Assert(sreq->dev.iov_count >= 1 && sreq->dev.iov[0].MPL_IOV_LEN > 0);

    if (MPID_nem_tcp_vc_is_connected(vc_tcp)) {
        if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue)) {
            /* this will be the first send on the queue: queue it and set the write flag on the pollfd */
            MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
            SET_PLFD(vc_tcp);
        } else {
            /* there are other sends in the queue before this one: try to send from the queue */
            MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
            mpi_errno = MPID_nem_tcp_send_queued(vc, &vc_tcp->send_queue);
            if (mpi_errno) MPIR_ERR_POP(mpi_errno);
        }
    } else {
        /* not connected yet: the queue is drained once the connection is established */
        MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
    }

    *sreq_ptr = sreq;

fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_TCP_ISTARTCONTIGMSG_PAUSED);
    return mpi_errno;
fn_fail:
    /* never hand an indeterminate request pointer back to the caller; some
       failure paths return MPI_SUCCESS after cleaning up the connection */
    *sreq_ptr = NULL;
    goto fn_exit;
}

#undef FUNCNAME
#define FUNCNAME MPID_nem_tcp_iSendContig
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/* MPID_nem_tcp_iSendContig -- send a header, the request's optional extended
   header, and a contiguous data buffer on vc, using the caller's request.

   vc       virtual connection to send on
   sreq     request tracking this send; its iov is filled in with whatever
            could not be written immediately
   hdr      packet header; sizeof(MPIDI_CH3_Pkt_t) bytes are sent
   hdr_sz   only checked against sizeof(MPIDI_CH3_Pkt_t)
   data     contiguous payload
   data_sz  payload size in bytes

   If the vc is not paused, is connected and has an empty send queue, the
   message is written directly.  When everything was written, the request is
   completed or its OnDataAvail handler is called; if the handler reports more
   data, the request is queued as is.  Otherwise the unsent remainder is
   recorded in sreq->dev.iov and the request is put on the paused queue or the
   send queue.

   Returns MPI_SUCCESS or an MPI error code. */
int MPID_nem_tcp_iSendContig(MPIDI_VC_t *vc, MPID_Request *sreq, void *hdr, MPIDI_msg_sz_t hdr_sz,
                             void *data, MPIDI_msg_sz_t data_sz)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_msg_sz_t offset = 0;
    MPID_nem_tcp_vc_area *vc_tcp = VC_TCP(vc);
    sockconn_t *sc = vc_tcp->sc;
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_TCP_ISENDCONTIGMSG);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_TCP_ISENDCONTIGMSG);

    MPIU_Assert(hdr_sz <= sizeof(MPIDI_CH3_Pkt_t));

    MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "tcp_iSendContig");

    MPIDI_DBG_Print_packet((MPIDI_CH3_Pkt_t *)hdr);

    if (!MPID_nem_tcp_vc_send_paused(vc_tcp)) {
        if (MPID_nem_tcp_vc_is_connected(vc_tcp))
        {
            /* write directly only when nothing is queued ahead of us */
            if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue))
            {
                MPL_IOV iov[3];
                int iov_n = 0;

                iov[iov_n].MPL_IOV_BUF = hdr;
                iov[iov_n].MPL_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t);
                iov_n++;

                if (sreq->dev.ext_hdr_sz != 0) {
                    iov[iov_n].MPL_IOV_BUF = sreq->dev.ext_hdr_ptr;
                    iov[iov_n].MPL_IOV_LEN = sreq->dev.ext_hdr_sz;
                    iov_n++;
                }

                iov[iov_n].MPL_IOV_BUF = data;
                iov[iov_n].MPL_IOV_LEN = data_sz;
                iov_n++;

                offset = MPL_large_writev(sc->fd, iov, iov_n);
                if (offset == 0) {
                    /* a zero-byte write is treated as "peer closed the socket" */
                    int req_errno = MPI_SUCCESS;

                    MPIR_ERR_SET(req_errno, MPI_ERR_OTHER, "**sock_closed");
                    MPIR_ERR_SET1(req_errno, MPIX_ERR_PROC_FAILED, "**comm_fail", "**comm_fail %d", vc->pg_rank);
                    mpi_errno = MPID_nem_tcp_cleanup_on_error(vc, req_errno);
                    if (mpi_errno) MPIR_ERR_POP(mpi_errno);
                    goto fn_fail;
                }
                if (offset == -1)
                {
                    if (errno == EAGAIN)
                        offset = 0; /* nothing written; queue the whole message */
                    else {
                        int req_errno = MPI_SUCCESS;
                        MPIR_ERR_SET1(req_errno, MPI_ERR_OTHER, "**writev", "**writev %s", MPIU_Strerror(errno));
                        MPIR_ERR_SET1(req_errno, MPIX_ERR_PROC_FAILED, "**comm_fail", "**comm_fail %d", vc->pg_rank);
                        mpi_errno = MPID_nem_tcp_cleanup_on_error(vc, req_errno);
                        if (mpi_errno) MPIR_ERR_POP(mpi_errno);
                        goto fn_fail;
                    }
                }
                MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, "write " MPIDI_MSG_SZ_FMT, offset);

                if (offset == sizeof(MPIDI_CH3_Pkt_t) + sreq->dev.ext_hdr_sz + data_sz)
                {
                    /* sent whole message */
                    int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *);

                    reqFn = sreq->dev.OnDataAvail;
                    if (!reqFn)
                    {
                        MPIU_Assert(MPIDI_Request_get_type(sreq) != MPIDI_REQUEST_TYPE_GET_RESP);
                        mpi_errno = MPID_Request_complete(sreq);
                        if (mpi_errno != MPI_SUCCESS) {
                            MPIR_ERR_POP(mpi_errno);
                        }
                        MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");
                        goto fn_exit;
                    }
                    else
                    {
                        int complete = 0;

                        mpi_errno = reqFn(vc, sreq, &complete);
                        if (mpi_errno) MPIR_ERR_POP(mpi_errno);

                        if (complete)
                        {
                            MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");
                            goto fn_exit;
                        }

                        /* not completed: more to send */
                        goto enqueue_request;
                    }
                }
            }
        }
        else
        {
            /* state may be DISCONNECTED or ERROR.  Calling tcp_connect in an ERROR state will return an
               appropriate error code. */
            mpi_errno = MPID_nem_tcp_connect(vc);
            if (mpi_errno) MPIR_ERR_POP(mpi_errno);
        }
    }


    /* save iov: offset is the number of bytes already written, counted over
       packet header, extended header and data in that order */
    sreq->dev.iov_count = 0;
    if (offset < sizeof(MPIDI_CH3_Pkt_t))
    {
        /* packet header (partly) unsent: keep a private copy in the request */
        sreq->dev.pending_pkt = *(MPIDI_CH3_Pkt_t *)hdr;

        sreq->dev.iov[sreq->dev.iov_count].MPL_IOV_BUF = (char *)&sreq->dev.pending_pkt + offset;
        sreq->dev.iov[sreq->dev.iov_count].MPL_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t) - offset;
        sreq->dev.iov_count++;

        if (sreq->dev.ext_hdr_sz > 0) {
            sreq->dev.iov[sreq->dev.iov_count].MPL_IOV_BUF = sreq->dev.ext_hdr_ptr;
            sreq->dev.iov[sreq->dev.iov_count].MPL_IOV_LEN = sreq->dev.ext_hdr_sz;
            sreq->dev.iov_count++;
        }

        if (data_sz)
        {
            sreq->dev.iov[sreq->dev.iov_count].MPL_IOV_BUF = data;
            sreq->dev.iov[sreq->dev.iov_count].MPL_IOV_LEN = data_sz;
            sreq->dev.iov_count++;
        }
    }
    else if (offset < sizeof(MPIDI_CH3_Pkt_t) + sreq->dev.ext_hdr_sz) {
        /* packet header done, extended header (partly) unsent: skip the
           extended-header bytes that are already on the wire so they are not
           sent twice */
        MPIU_Assert(sreq->dev.ext_hdr_sz > 0);
        sreq->dev.iov[sreq->dev.iov_count].MPL_IOV_BUF = (char *)sreq->dev.ext_hdr_ptr + (offset - sizeof(MPIDI_CH3_Pkt_t));
        sreq->dev.iov[sreq->dev.iov_count].MPL_IOV_LEN = sreq->dev.ext_hdr_sz - (offset - sizeof(MPIDI_CH3_Pkt_t));
        sreq->dev.iov_count++;

        if (data_sz) {
            sreq->dev.iov[sreq->dev.iov_count].MPL_IOV_BUF = data;
            sreq->dev.iov[sreq->dev.iov_count].MPL_IOV_LEN = data_sz;
            sreq->dev.iov_count++;
        }
    }
    else
    {
        /* both headers done: only the tail of the data remains */
        sreq->dev.iov[sreq->dev.iov_count].MPL_IOV_BUF = (char *)data + (offset - sizeof(MPIDI_CH3_Pkt_t) - sreq->dev.ext_hdr_sz);
        sreq->dev.iov[sreq->dev.iov_count].MPL_IOV_LEN = data_sz - (offset - sizeof(MPIDI_CH3_Pkt_t) - sreq->dev.ext_hdr_sz);
        sreq->dev.iov_count++;
    }

enqueue_request:
    /* enqueue request */
    MPIU_DBG_MSG (CH3_CHANNEL, VERBOSE, "enqueuing");
    MPIU_Assert(sreq->dev.iov_count >= 1 && sreq->dev.iov[0].MPL_IOV_LEN > 0);

    sreq->ch.vc = vc;
    sreq->dev.iov_offset = 0;

    if (MPID_nem_tcp_vc_send_paused(vc_tcp)) {
        MPIDI_CH3I_Sendq_enqueue(&vc_tcp->paused_send_queue, sreq);
    } else {
        if (MPID_nem_tcp_vc_is_connected(vc_tcp)) {
            if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue)) {
                /* this will be the first send on the queue: queue it and set the write flag on the pollfd */
                MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
                SET_PLFD(vc_tcp);
            } else {
                /* there are other sends in the queue before this one: try to send from the queue */
                MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
                mpi_errno = MPID_nem_tcp_send_queued(vc, &vc_tcp->send_queue);
                if (mpi_errno) MPIR_ERR_POP(mpi_errno);
            }
        } else {
            /* not connected yet: the queue is drained once the connection is established */
            MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
        }
    }

fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_TCP_ISENDCONTIGMSG);
    return mpi_errno;
fn_fail:
    goto fn_exit;
}


/* MPID_nem_tcp_SendNoncontig -- send a header, the request's optional
   extended header, and the request's noncontiguous data on vc.

   vc      virtual connection to send on
   sreq    request describing the data; its iov receives the unsent remainder
   header  packet header; sizeof(MPIDI_CH3_Pkt_t) bytes are sent
   hdr_sz  only checked against sizeof(MPIDI_CH3_Pkt_t)

   A local iov is built from the header(s) plus whatever
   MPIDI_CH3U_Request_load_send_iov() loads.  If the vc is not paused, is
   connected and has an empty send queue, the iov is written directly.  Any
   unsent part is copied into sreq->dev.iov and the request is queued; a fully
   sent iov completes the request or calls its OnDataAvail handler.

   Returns MPI_SUCCESS or an MPI error code.  Every path through fn_fail
   releases sreq. */
#undef FUNCNAME
#define FUNCNAME MPID_nem_tcp_SendNoncontig
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPID_nem_tcp_SendNoncontig(MPIDI_VC_t *vc, MPID_Request *sreq, void *header, MPIDI_msg_sz_t hdr_sz)
{
    int mpi_errno = MPI_SUCCESS;
    int iov_n;
    MPL_IOV iov[MPL_IOV_LIMIT];
    MPL_IOV *iov_p;
    MPIDI_msg_sz_t offset;
    int complete;
    MPID_nem_tcp_vc_area *vc_tcp = VC_TCP(vc);
    int iov_offset;

    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_TCP_SENDNONCONTIG);
    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_TCP_SENDNONCONTIG);
    MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "tcp_SendNoncontig");
    MPIU_Assert(hdr_sz <= sizeof(MPIDI_CH3_Pkt_t));

    iov_n = 0;

    /* entry 0: packet header (always the full packet size) */
    iov[iov_n].MPL_IOV_BUF = header;
    iov[iov_n].MPL_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t);
    iov_n++;

    /* optional entry 1: extended header.  Note this tests the pointer, while
       MPID_nem_tcp_iSendContig() tests ext_hdr_sz. */
    if (sreq->dev.ext_hdr_ptr != NULL) {
        iov[iov_n].MPL_IOV_BUF = sreq->dev.ext_hdr_ptr;
        iov[iov_n].MPL_IOV_LEN = sreq->dev.ext_hdr_sz;
        iov_n++;
    }

    /* data entries start after the header entries */
    iov_offset = iov_n;

    /* NOTE(review): iov_n still holds the header-entry count (1 or 2) when it
       is passed in here.  If load_send_iov treats *iov_n as the number of
       available slots on input, this limits the data to 1-2 iov entries
       instead of MPL_IOV_LIMIT - iov_offset -- confirm against the callee. */
    mpi_errno = MPIDI_CH3U_Request_load_send_iov(sreq, &iov[iov_offset], &iov_n);
    MPIR_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|loadsendiov");

    /* total entries = header entries + loaded data entries */
    iov_n += iov_offset;
    offset = 0;

    if (!MPID_nem_tcp_vc_send_paused(vc_tcp)) {
        if (MPID_nem_tcp_vc_is_connected(vc_tcp))
        {
            /* write directly only when nothing is queued ahead of us */
            if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue))
            {
                offset = MPL_large_writev(vc_tcp->sc->fd, iov, iov_n);
                if (offset == 0) {
                    /* a zero-byte write is treated as "peer closed the socket" */
                    int req_errno = MPI_SUCCESS;

                    MPIR_ERR_SET(req_errno, MPI_ERR_OTHER, "**sock_closed");
                    MPIR_ERR_SET1(req_errno, MPIX_ERR_PROC_FAILED, "**comm_fail", "**comm_fail %d", vc->pg_rank);
                    mpi_errno = MPID_nem_tcp_cleanup_on_error(vc, req_errno);
                    if (mpi_errno) MPIR_ERR_POP(mpi_errno);
                    goto fn_fail;
                }
                if (offset == -1)
                {
                    if (errno == EAGAIN)
                        offset = 0;
                    else {
                        int req_errno = MPI_SUCCESS;
                        MPIR_ERR_SET1(req_errno, MPI_ERR_OTHER, "**writev", "**writev %s", MPIU_Strerror(errno));
                        MPIR_ERR_SET1(req_errno, MPIX_ERR_PROC_FAILED, "**comm_fail", "**comm_fail %d", vc->pg_rank);
                        mpi_errno = MPID_nem_tcp_cleanup_on_error(vc, req_errno);
                        if (mpi_errno) MPIR_ERR_POP(mpi_errno);
                        goto fn_fail;
                    }
                }
                
                MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, "write noncontig " MPIDI_MSG_SZ_FMT, offset);
            }
        }
        else
        {
            /* state may be DISCONNECTED or ERROR.  Calling tcp_connect in an ERROR state will return an
               appropriate error code. */
            mpi_errno = MPID_nem_tcp_connect(vc);
            if (mpi_errno) MPIR_ERR_POP(mpi_errno);
        }
    }
    
    if (offset < iov[0].MPL_IOV_LEN)
    {
        /* header was not yet sent, save it in req */
        sreq->dev.pending_pkt = *(MPIDI_CH3_Pkt_t *)header;
        iov[0].MPL_IOV_BUF = (MPL_IOV_BUF_CAST)&sreq->dev.pending_pkt;
        iov[0].MPL_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t);
    }

    /* check if whole iov was sent, and save any unsent portion of iov */
    sreq->dev.iov_count = 0;
    complete = 1;
    for (iov_p = &iov[0]; iov_p < &iov[iov_n]; ++iov_p)
    {
        if (offset < iov_p->MPL_IOV_LEN)
        {
            /* first unsent entry is copied minus the bytes already written;
               offset is then zero, so later entries are copied whole */
            sreq->dev.iov[sreq->dev.iov_count].MPL_IOV_BUF = (MPL_IOV_BUF_CAST)((char *)iov_p->MPL_IOV_BUF + offset);
            sreq->dev.iov[sreq->dev.iov_count].MPL_IOV_LEN = iov_p->MPL_IOV_LEN - offset;
            offset = 0;
            ++sreq->dev.iov_count;
            complete = 0;
        }
        else
            offset -= iov_p->MPL_IOV_LEN; /* entry fully sent: skip it */
    }
        
    if (complete)
    {
        /* sent whole iov */
        int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *);

        reqFn = sreq->dev.OnDataAvail;
        if (!reqFn)
        {
            mpi_errno = MPID_Request_complete(sreq);
            if (mpi_errno != MPI_SUCCESS) {
                MPIR_ERR_POP(mpi_errno);
            }
            MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");
            goto fn_exit;
        }

        /* the handler decides whether the request is finished; if not, the
           request is queued below */
        complete = 0;
        mpi_errno = reqFn(vc, sreq, &complete);
        if (mpi_errno) MPIR_ERR_POP(mpi_errno);
            
        if (complete)
        {
            MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");
            goto fn_exit;
        }
    }
        
    /* enqueue request */
    MPIU_DBG_MSG (CH3_CHANNEL, VERBOSE, "enqueuing");
    MPIU_Assert(sreq->dev.iov_count >= 1 && sreq->dev.iov[0].MPL_IOV_LEN > 0);
        
    sreq->ch.vc = vc;
    sreq->dev.iov_offset = 0;
        
    if (MPID_nem_tcp_vc_send_paused(vc_tcp)) {
        MPIDI_CH3I_Sendq_enqueue(&vc_tcp->paused_send_queue, sreq);
    } else {
        if (MPID_nem_tcp_vc_is_connected(vc_tcp)) {
            if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue)) {
                /* this will be the first send on the queue: queue it and set the write flag on the pollfd */
                MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
                SET_PLFD(vc_tcp);
            } else {
                /* there are other sends in the queue before this one: try to send from the queue */
                MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
                mpi_errno = MPID_nem_tcp_send_queued(vc, &vc_tcp->send_queue);
                if (mpi_errno) MPIR_ERR_POP(mpi_errno);
            }
        } else {
            MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
        }
    }
    
fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_TCP_SENDNONCONTIG);
    return mpi_errno;
fn_fail:
    /* NOTE(review): this release also runs when MPID_nem_tcp_send_queued()
       fails after sreq was already enqueued above -- confirm that is intended. */
    MPID_Request_release(sreq);
    goto fn_exit;
}

#undef FUNCNAME
#define FUNCNAME MPID_nem_tcp_error_out_send_queue
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/* MPID_nem_tcp_error_out_send_queue -- fail every pending send on vc.
   Each request on the send queue and then on the paused send queue is
   dequeued, gets req_errno stored in status.MPI_ERROR, and is completed.
   Returns MPI_SUCCESS, or the first error reported by
   MPID_Request_complete() (remaining requests are then left queued). */
int MPID_nem_tcp_error_out_send_queue(struct MPIDI_VC *const vc, int req_errno)
{
    MPID_nem_tcp_vc_area *const vc_tcp = VC_TCP(vc);
    MPID_Request *failed_req;
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_TCP_ERROR_OUT_SEND_QUEUE);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_TCP_ERROR_OUT_SEND_QUEUE);

    /* This is an error path: the requests are only marked complete, their
       onDataAvail / onFinal handlers are intentionally not invoked. */

    /* drain the regular send queue */
    for (;;) {
        if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue))
            break;

        MPIDI_CH3I_Sendq_dequeue(&vc_tcp->send_queue, &failed_req);
        failed_req->status.MPI_ERROR = req_errno;

        mpi_errno = MPID_Request_complete(failed_req);
        if (mpi_errno != MPI_SUCCESS)
            MPIR_ERR_POP(mpi_errno);
    }

    /* drain the paused send queue */
    for (;;) {
        if (MPIDI_CH3I_Sendq_empty(vc_tcp->paused_send_queue))
            break;

        MPIDI_CH3I_Sendq_dequeue(&vc_tcp->paused_send_queue, &failed_req);
        failed_req->status.MPI_ERROR = req_errno;

        mpi_errno = MPID_Request_complete(failed_req);
        if (mpi_errno != MPI_SUCCESS)
            MPIR_ERR_POP(mpi_errno);
    }

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_TCP_ERROR_OUT_SEND_QUEUE);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}