Blob Blame History Raw
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpidimpl.h"
#include "mpidi_recvq_statistics.h"

/* MPIDI_POSTED_RECV_ENQUEUE_HOOK(req): Notifies the channel that req has
   been enqueued on the posted recv queue.  Returns void.  A channel may
   define this macro before this point; the default below does nothing. */
#ifndef MPIDI_POSTED_RECV_ENQUEUE_HOOK
#define MPIDI_POSTED_RECV_ENQUEUE_HOOK(req) do{}while(0)
#endif
/* MPIDI_POSTED_RECV_DEQUEUE_HOOK(req): Notifies the channel that req has
   been dequeued from the posted recv queue.  Returns non-zero if the
   channel has already matched the request; 0 otherwise.  This happens
   when the channel supports shared-memory and network communication
   with a network capable of matching, and the same request is matched
   by the network and, e.g., shared-memory.  When that happens the
   dequeue functions below should either search for the next matching
   request or report that no request was found.  The default below
   always evaluates to 0 (the channel never matches on its own). */
#ifndef MPIDI_POSTED_RECV_DEQUEUE_HOOK
#define MPIDI_POSTED_RECV_DEQUEUE_HOOK(req) 0
#endif

/* FIXME: 
 * Recvq_lock/unlock removed because it is not needed for the SINGLE_CS
 * approach and we might want a different, non-lock-based approach in 
 * a finer-grained thread-sync version.  For example, 
 * some routines can be implemented in a lock-free
 * fashion (because the user is required to guarantee non-conflicting 
 * accesses, such as doing a probe and a receive that matches in different 
 * threads).  
 *
 * There are a lot of routines here.  Do we really need them all?
 * 
 * The search criteria can be implemented in a single 64-bit compare on
 * systems with efficient 64-bit operations.  The rank and contextid can also
 * in many cases be combined into a single 32-bit word for the comparison
 * (in which case the message info should be stored in the queue in a
 * naturally aligned, 64-bit word).
 * 
 */

/* The posted and unexpected receive queues.  Each is a singly linked list
 * chained through dev.next, tracked by a head pointer (where searches
 * start) and a tail pointer (where new requests are appended).  An empty
 * queue has both pointers NULL. */
static MPID_Request *recvq_posted_head = NULL;
static MPID_Request *recvq_posted_tail = NULL;
static MPID_Request *recvq_unexpected_head = NULL;
static MPID_Request *recvq_unexpected_tail = NULL;

/* When debugger support is enabled, publish the addresses of the queue
 * heads.  The queue code keeps using the file-local variables above while
 * the debugger reaches the same storage through these exported pointers.
 * See src/mpi/debugger/dll_mpich.c for how they are used to access the
 * message queues.
 */
#ifdef HAVE_DEBUGGER_SUPPORT
MPID_Request ** const MPID_Recvq_posted_head_ptr     = &recvq_posted_head;
MPID_Request ** const MPID_Recvq_unexpected_head_ptr = &recvq_unexpected_head;
#endif

/* Matching macros.  Each one evaluates to non-zero when the two match
 * values agree.  If the MPIDI_Message_match structure fits into a pointer
 * size, we can directly work on it: the comparison is done on the `whole`
 * member in one operation; otherwise the rank, tag and context_id parts are
 * compared individually.  The sizeof test is a constant expression, so only
 * one of the two arms is ever evaluated for a given build. */
/* MATCH_WITH_NO_MASK compares the match values without masking
 * them. This is useful for the case where there are no ANY_TAG or
 * ANY_SOURCE wild cards. */
#define MATCH_WITH_NO_MASK(match1, match2)                              \
    ((sizeof(MPIDI_Message_match) == SIZEOF_VOID_P) ? ((match1).whole == (match2).whole) : \
     (((match1).parts.rank == (match2).parts.rank) &&                   \
      ((match1).parts.tag == (match2).parts.tag) &&                     \
      ((match1).parts.context_id == (match2).parts.context_id)))

/* MATCH_WITH_LEFT_MASK compares the match values after masking only
 * the left field. This is useful for the case where the right match
 * is a part of the unexpected queue and has no ANY_TAG or ANY_SOURCE
 * wild cards, but the left match might have them.
 *
 * Note: in the per-part arm context_id is compared unmasked, whereas in
 * the `whole` arm the context_id bits of the mask are applied too, so the
 * mask's context_id must be all ones for the two arms to agree. */
#define MATCH_WITH_LEFT_MASK(match1, match2, mask)                      \
    ((sizeof(MPIDI_Message_match) == SIZEOF_VOID_P) ?                   \
     (((match1).whole & (mask).whole) == (match2).whole) :              \
     ((((match1).parts.rank & (mask).parts.rank) == (match2).parts.rank) && \
      (((match1).parts.tag & (mask).parts.tag) == (match2).parts.tag) && \
      ((match1).parts.context_id == (match2).parts.context_id)))

/* This is the most general case where both matches have to be
 * masked. Both matches are masked with the same value. There doesn't
 * seem to be a need for two different masks at this time.  As with
 * MATCH_WITH_LEFT_MASK, context_id is only masked in the `whole` arm. */
#define MATCH_WITH_LEFT_RIGHT_MASK(match1, match2, mask)                \
    ((sizeof(MPIDI_Message_match) == SIZEOF_VOID_P) ?                   \
     (((match1).whole & (mask).whole) == ((match2).whole & (mask).whole)) : \
     ((((match1).parts.rank & (mask).parts.rank) == ((match2).parts.rank & (mask).parts.rank)) && \
      (((match1).parts.tag & (mask).parts.tag) == ((match2).parts.tag & (mask).parts.tag)) && \
      ((match1).parts.context_id == (match2).parts.context_id)))


/* Performance variables of the RECVQ module.  They are registered in
 * MPIDI_CH3U_Recvq_init() and updated by the queue routines in this file:
 *   - posted_recvq_length / unexpected_recvq_length: current queue lengths;
 *   - posted_recvq_match_attempts / unexpected_recvq_match_attempts:
 *     number of queue elements examined while searching;
 *   - time_failed_matching_postedq / time_matching_unexpectedq: timers
 *     around the queue searches. */
MPIR_T_PVAR_UINT_LEVEL_DECL_STATIC(RECVQ, posted_recvq_length);
MPIR_T_PVAR_UINT_LEVEL_DECL_STATIC(RECVQ, unexpected_recvq_length);
MPIR_T_PVAR_ULONG2_COUNTER_DECL_STATIC(RECVQ, posted_recvq_match_attempts);
MPIR_T_PVAR_ULONG2_COUNTER_DECL_STATIC(RECVQ, unexpected_recvq_match_attempts);
MPIR_T_PVAR_DOUBLE_TIMER_DECL_STATIC(RECVQ, time_failed_matching_postedq);
MPIR_T_PVAR_DOUBLE_TIMER_DECL_STATIC(RECVQ, time_matching_unexpectedq);

/* used in ch3u_eager.c and ch3u_handle_recv_pkt.c, hence not declared
 * with the _STATIC variant */
MPIR_T_PVAR_ULONG2_LEVEL_DECL(RECVQ, unexpected_recvq_buffer_size);

/*
 * MPIDI_CH3U_Recvq_init()
 *
 * Registers the RECVQ performance variables declared above (queue lengths,
 * match-attempt counters, search timers and the unexpected-queue buffer
 * size), all in the "CH3" category, read-only and continuous.
 *
 * Returns mpi_errno, which is initialized to MPI_SUCCESS and is not
 * modified by any statement visible in this routine.
 */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3U_Recvq_init
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPIDI_CH3U_Recvq_init(void)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_T_PVAR_LEVEL_REGISTER_STATIC(
        RECVQ,
        MPI_UNSIGNED,
        posted_recvq_length,
        0, /* init value */
        MPI_T_VERBOSITY_USER_DETAIL,
        MPI_T_BIND_NO_OBJECT,
        (MPIR_T_PVAR_FLAG_READONLY | MPIR_T_PVAR_FLAG_CONTINUOUS),
        "CH3", /* category name */
        "length of the posted message receive queue");

    MPIR_T_PVAR_LEVEL_REGISTER_STATIC(
        RECVQ,
        MPI_UNSIGNED,
        unexpected_recvq_length,
        0, /* init value */
        MPI_T_VERBOSITY_USER_DETAIL,
        MPI_T_BIND_NO_OBJECT,
        (MPIR_T_PVAR_FLAG_READONLY | MPIR_T_PVAR_FLAG_CONTINUOUS),
        "CH3", /* category name */
        "length of the unexpected message receive queue");

    MPIR_T_PVAR_COUNTER_REGISTER_STATIC(
        RECVQ,
        MPI_UNSIGNED_LONG_LONG,
        posted_recvq_match_attempts,
        MPI_T_VERBOSITY_USER_DETAIL,
        MPI_T_BIND_NO_OBJECT,
        (MPIR_T_PVAR_FLAG_READONLY | MPIR_T_PVAR_FLAG_CONTINUOUS),
        "CH3", /* category name */
        "number of search passes on the posted message receive queue");

    MPIR_T_PVAR_COUNTER_REGISTER_STATIC(
        RECVQ,
        MPI_UNSIGNED_LONG_LONG,
        unexpected_recvq_match_attempts,
        MPI_T_VERBOSITY_USER_DETAIL,
        MPI_T_BIND_NO_OBJECT,
        (MPIR_T_PVAR_FLAG_READONLY | MPIR_T_PVAR_FLAG_CONTINUOUS),
        "CH3", /* category name */
        "number of search passes on the unexpected message receive queue");

    MPIR_T_PVAR_TIMER_REGISTER_STATIC(
        RECVQ,
        MPI_DOUBLE,
        time_failed_matching_postedq,
        MPI_T_VERBOSITY_USER_DETAIL,
        MPI_T_BIND_NO_OBJECT,
        (MPIR_T_PVAR_FLAG_READONLY | MPIR_T_PVAR_FLAG_CONTINUOUS),
        "CH3", /* category name */
        "total time spent on unsuccessful search passes on the posted receives queue");

    MPIR_T_PVAR_TIMER_REGISTER_STATIC(
        RECVQ,
        MPI_DOUBLE,
        time_matching_unexpectedq,
        MPI_T_VERBOSITY_USER_DETAIL,
        MPI_T_BIND_NO_OBJECT,
        (MPIR_T_PVAR_FLAG_READONLY | MPIR_T_PVAR_FLAG_CONTINUOUS),
        "CH3", /* category name */
        "total time spent on search passes on the unexpected receive queue");

    /* This one is declared non-static above because other files update it. */
    MPIR_T_PVAR_LEVEL_REGISTER_STATIC(
        RECVQ,
        MPI_UNSIGNED_LONG_LONG,
        unexpected_recvq_buffer_size,
        0, /* init value */
        MPI_T_VERBOSITY_USER_DETAIL,
        MPI_T_BIND_NO_OBJECT,
        (MPIR_T_PVAR_FLAG_READONLY | MPIR_T_PVAR_FLAG_CONTINUOUS),
        "CH3", /* category name */
        "total buffer size allocated in the unexpected receive queue");

    /* NOTE(review): no statement visible here jumps to fn_fail; presumably
     * the label is kept for the registration macros or by convention --
     * confirm before removing it. */
fn_fail:
    return mpi_errno;
}

/* FIXME: If this routine is only used by probe/iprobe, then we don't need
   to set the cancelled field in status (only set for nonblocking requests) */
/*
 * MPIDI_CH3U_Recvq_FU()
 *
 * Find (without dequeuing) the first request in the unexpected receive
 * queue that matches (source, tag, context_id).  MPI_ANY_SOURCE and
 * MPI_ANY_TAG act as wild cards.  Returns 1 if a request was found and 0
 * otherwise.  When a request is found and s is not MPI_STATUS_IGNORE, the
 * source, tag, count and cancel bit of the request's status are copied
 * into *s; other status fields are left alone.
 *
 * Multithread - As this is a read-only routine, it need not
 * require an external critical section (careful organization of the
 * queue updates would not even require a critical section within this
 * routine).  However, this routine is used both from within the progress
 * engine and from without it.  To make that work with the current
 * design for MSGQUEUE and the brief-global mode, the critical section
 * is *outside* of this routine.
 *
 * This routine is used only in mpid_iprobe and mpid_probe
 *
 */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3U_Recvq_FU
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPIDI_CH3U_Recvq_FU(int source, int tag, int context_id, MPI_Status *s)
{
    MPID_Request *cur;
    int found;
    MPIDI_Message_match match, mask;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_RECVQ_FU);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_RECVQ_FU);

    /* What we are looking for. */
    match.parts.context_id = context_id;
    match.parts.tag = tag;
    match.parts.rank = source;

    /* Start from an all-ones mask, then drop the error bit that might be
     * set on incoming messages.  It is assumed that the local receive
     * operation won't have the error bit set (or it is masked away at some
     * other level). */
    mask.parts.context_id = mask.parts.rank = mask.parts.tag = ~0;
    MPIR_TAG_CLEAR_ERROR_BITS(mask.parts.tag);

    /* A wild card zeroes both the wanted value and its mask, so every
     * queued value compares equal for that field. */
    if (tag == MPI_ANY_TAG)
        match.parts.tag = mask.parts.tag = 0;
    if (source == MPI_ANY_SOURCE)
        match.parts.rank = mask.parts.rank = 0;

    MPIR_T_PVAR_TIMER_START(RECVQ, time_matching_unexpectedq);
    for (cur = recvq_unexpected_head; cur != NULL; cur = cur->dev.next) {
        MPIR_T_PVAR_COUNTER_INC(RECVQ, unexpected_recvq_match_attempts, 1);
        if (MATCH_WITH_LEFT_MASK(cur->dev.match, match, mask))
            break;
    }
    MPIR_T_PVAR_TIMER_END(RECVQ, time_matching_unexpectedq);

    /* cur is the first matching request, or NULL when the whole queue was
     * scanned without a match. */
    found = (cur != NULL);
    if (found && s != MPI_STATUS_IGNORE) {
        /* Avoid setting "extra" fields like MPI_ERROR */
        s->MPI_SOURCE = cur->status.MPI_SOURCE;
        s->MPI_TAG    = cur->status.MPI_TAG;
        MPIR_STATUS_SET_COUNT(*s, MPIR_STATUS_GET_COUNT(cur->status));
        MPIR_STATUS_SET_CANCEL_BIT(*s, MPIR_STATUS_GET_CANCEL_BIT(cur->status));
    }

    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_RECVQ_FU);
    return found;
}

/*
 * MPIDI_CH3U_Recvq_FDU()
 *
 * Search the unexpected queue for a request whose sender_req_id equals
 * sreq_id and whose match information equals *match; dequeue and return
 * the LAST such request, or return NULL if there is none.
 *
 * Multithread - This routine must be atomic (since it dequeues a
 * request).  However, once the request is dequeued, no other thread can
 * see it, so this routine provides its own atomicity.
 *
 * This routine is used only in the case of send_cancel.  However, it is used both
 * within mpid_send_cancel and within a packet handler.
 */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3U_Recvq_FDU
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
MPID_Request * MPIDI_CH3U_Recvq_FDU(MPI_Request sreq_id, 
				    MPIDI_Message_match * match)
{
    MPID_Request *cur;
    MPID_Request *prev = NULL;
    MPID_Request *hit = NULL;         /* last matching request seen */
    MPID_Request *hit_prev = NULL;    /* its predecessor in the queue */
    MPIDI_Message_match mask;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_RECVQ_FDU);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_RECVQ_FDU);

    mask.parts.context_id = mask.parts.rank = mask.parts.tag = ~0;
    /* Mask the error bit that might be set on incoming messages. It is
     * assumed that the local receive operation won't have the error bit set
     * (or it is masked away at some other level). */
    MPIR_TAG_CLEAR_ERROR_BITS(mask.parts.tag);

    /* Since this routine is used only for send_cancel, at most one queued
     * message can be the one being cancelled.  The scan nevertheless runs
     * to the end of the queue because send request handles are reused: if
     * several queued messages carry the same sender_req_id, the earlier
     * ones belong to operations the sender has already tested or waited
     * on (which is what freed the handle for reuse), and completed
     * operations cannot be cancelled.  Only the last one is cancellable.
     */
    for (cur = recvq_unexpected_head; cur != NULL; prev = cur, cur = cur->dev.next) {
        MPIR_T_PVAR_TIMER_START(RECVQ, time_matching_unexpectedq);

        if (cur->dev.sender_req_id == sreq_id) {
            MPIR_T_PVAR_COUNTER_INC(RECVQ, unexpected_recvq_match_attempts, 1);
            if (MATCH_WITH_LEFT_MASK(cur->dev.match, *match, mask)) {
                hit_prev = prev;
                hit = cur;
            }
        }

        MPIR_T_PVAR_TIMER_END(RECVQ, time_matching_unexpectedq);
    }

    if (hit != NULL) {
        /* Unlink hit, fixing up head and tail as needed. */
        if (hit_prev == NULL) {
            recvq_unexpected_head = hit->dev.next;
        }
        else {
            hit_prev->dev.next = hit->dev.next;
        }

        if (hit->dev.next == NULL) {
            recvq_unexpected_tail = hit_prev;
        }

        MPIR_T_PVAR_LEVEL_DEC(RECVQ, unexpected_recvq_length, 1);
        MPIR_T_PVAR_LEVEL_DEC(RECVQ, unexpected_recvq_buffer_size, hit->dev.tmpbuf_sz);
    }

    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_RECVQ_FDU);
    return hit;
}

/* TODO rename the old FDU and use that name for this one */
/* This is the routine that you expect to be named "_FDU".  It implements the
 * behavior needed for improbe; specifically, searching the receive queue for
 * the first matching request and dequeueing it.
 *
 * MPI_ANY_SOURCE / MPI_ANY_TAG are honoured as wild cards.  On a match the
 * request is removed from the unexpected queue, its comm field is set to
 * comm (with a reference added), *foundp is set to TRUE and the request is
 * returned.  Otherwise *foundp is set to FALSE and NULL is returned. */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3U_Recvq_FDU_matchonly
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
MPID_Request * MPIDI_CH3U_Recvq_FDU_matchonly(int source, int tag, int context_id, MPID_Comm *comm, int *foundp)
{
    int found = FALSE;
    MPID_Request *cur;
    MPID_Request *prev = NULL;
    MPIDI_Message_match match;
    MPIDI_Message_match mask;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_RECVQ_FDU_MATCHONLY);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_RECVQ_FDU_MATCHONLY);

    /* Store how much time is spent traversing the queue */
    MPIR_T_PVAR_TIMER_START(RECVQ, time_matching_unexpectedq);

    /* Skip all of the setup when the unexpected queue is empty */
    cur = recvq_unexpected_head;
    if (cur != NULL) {
        match.parts.context_id = context_id;
        match.parts.tag = tag;
        match.parts.rank = source;

        mask.parts.context_id = mask.parts.rank = mask.parts.tag = ~0;
        /* Mask the error bit that might be set on incoming messages. It is
         * assumed that the local receive operation won't have the error bit set
         * (or it is masked away at some other level). */
        MPIR_TAG_CLEAR_ERROR_BITS(mask.parts.tag);

        /* A wild card zeroes both the wanted value and its mask. */
        if (tag == MPI_ANY_TAG)
            match.parts.tag = mask.parts.tag = 0;
        if (source == MPI_ANY_SOURCE)
            match.parts.rank = mask.parts.rank = 0;

        for (; cur != NULL; prev = cur, cur = cur->dev.next) {
            MPIR_T_PVAR_COUNTER_INC(RECVQ, unexpected_recvq_match_attempts, 1);
            if (!MATCH_WITH_LEFT_MASK(cur->dev.match, match, mask))
                continue;

            /* First match: unlink it, fixing up head and tail as needed. */
            if (prev == NULL) {
                recvq_unexpected_head = cur->dev.next;
            }
            else {
                prev->dev.next = cur->dev.next;
            }
            if (cur->dev.next == NULL) {
                recvq_unexpected_tail = prev;
            }
            MPIR_T_PVAR_LEVEL_DEC(RECVQ, unexpected_recvq_length, 1);
            MPIR_T_PVAR_LEVEL_DEC(RECVQ, unexpected_recvq_buffer_size, cur->dev.tmpbuf_sz);

            cur->comm = comm;
            MPIR_Comm_add_ref(comm);
            /* don't have the (buf,count,type) info right now, can't add
             * it to the request */
            found = TRUE;
            break;
        }
        /* If the loop ran off the end of the queue, cur is NULL here. */
    }

    MPIR_T_PVAR_TIMER_END(RECVQ, time_matching_unexpectedq);

    *foundp = found;

    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_RECVQ_FDU_MATCHONLY);
    return cur;
}

/*
 * MPIDI_CH3U_Recvq_FDU_or_AEP()
 *
 * Atomically find a request in the unexpected queue and dequeue it, or
 * allocate a new request and enqueue it in the posted queue.
 *
 * In both cases the request's comm (with a reference added), user_buf,
 * user_count and datatype are filled in from the arguments.  *foundp is
 * TRUE when an unexpected request was dequeued and FALSE otherwise.  If
 * the receive names a specific source whose VC is in the
 * MPIDI_VC_STATE_MORIBUND state, the new request is completed with an
 * MPIX_ERR_PROC_FAILED error and returned without being enqueued.
 *
 * Multithread - This routine must be called from within a MSGQUEUE
 * critical section.  If a request is allocated, it must not release
 * the MSGQUEUE until the request is completely valid, as another thread
 * may then find it and dequeue it.
 *
 * This routine is used in mpid_irecv and mpid_recv.
 *
 */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3U_Recvq_FDU_or_AEP
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
MPID_Request * MPIDI_CH3U_Recvq_FDU_or_AEP(int source, int tag, 
                                           int context_id, MPID_Comm *comm, void *user_buf,
                                           MPI_Aint user_count, MPI_Datatype datatype, int * foundp)
{
    int mpi_errno = MPI_SUCCESS;
    int found = FALSE;
    MPID_Request *rreq;
    MPID_Request *prev = NULL;
    MPIDI_Message_match match;
    MPIDI_Message_match mask;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_RECVQ_FDU_OR_AEP);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_RECVQ_FDU_OR_AEP);

    /* Store how much time is spent traversing the queue */
    MPIR_T_PVAR_TIMER_START(RECVQ, time_matching_unexpectedq);

    /* Skip all of the setup when the unexpected queue is empty */
    rreq = recvq_unexpected_head;
    if (rreq != NULL) {
        match.parts.context_id = context_id;
        match.parts.tag = tag;
        match.parts.rank = source;

        mask.parts.context_id = mask.parts.rank = mask.parts.tag = ~0;
        /* Mask the error bit that might be set on incoming messages. It is
         * assumed that the local receive operation won't have the error bit set
         * (or it is masked away at some other level). */
        MPIR_TAG_CLEAR_ERROR_BITS(mask.parts.tag);

        /* A wild card zeroes both the wanted value and its mask. */
        if (tag == MPI_ANY_TAG)
            match.parts.tag = mask.parts.tag = 0;
        if (source == MPI_ANY_SOURCE)
            match.parts.rank = mask.parts.rank = 0;

        for (; rreq != NULL; prev = rreq, rreq = rreq->dev.next) {
            MPIR_T_PVAR_COUNTER_INC(RECVQ, unexpected_recvq_match_attempts, 1);
            if (!MATCH_WITH_LEFT_MASK(rreq->dev.match, match, mask))
                continue;

            /* First match: unlink it, fixing up head and tail as needed. */
            if (prev == NULL) {
                recvq_unexpected_head = rreq->dev.next;
            }
            else {
                prev->dev.next = rreq->dev.next;
            }
            if (rreq->dev.next == NULL) {
                recvq_unexpected_tail = prev;
            }
            MPIR_T_PVAR_LEVEL_DEC(RECVQ, unexpected_recvq_length, 1);

            /* The buffer-size level is only adjusted for eager messages. */
            if (MPIDI_Request_get_msg_type(rreq) == MPIDI_REQUEST_EAGER_MSG) {
                MPIR_T_PVAR_LEVEL_DEC(RECVQ, unexpected_recvq_buffer_size, rreq->dev.tmpbuf_sz);
            }

            rreq->comm = comm;
            MPIR_Comm_add_ref(comm);
            rreq->dev.user_buf = user_buf;
            rreq->dev.user_count = user_count;
            rreq->dev.datatype = datatype;
            found = TRUE;
            /* The traversal timer is stopped at lock_exit on this path. */
            goto lock_exit;
        }
    }
    MPIR_T_PVAR_TIMER_END(RECVQ, time_matching_unexpectedq);

    /* A matching request was not found in the unexpected queue, so we
       need to allocate a new request and add it to the posted queue.
       found is still FALSE here. */
    MPIDI_Request_create_rreq( rreq, mpi_errno, goto lock_exit );
    rreq->dev.match.parts.tag        = tag;
    rreq->dev.match.parts.rank       = source;
    rreq->dev.match.parts.context_id = context_id;

    /* Added a mask for faster search on 64-bit capable
     * platforms: a wild-card field gets an all-zero mask, any other
     * field an all-ones mask. */
    rreq->dev.mask.parts.context_id = ~0;
    rreq->dev.mask.parts.rank = (rreq->dev.match.parts.rank == MPI_ANY_SOURCE) ? 0 : ~0;
    rreq->dev.mask.parts.tag = (rreq->dev.match.parts.tag == MPI_ANY_TAG) ? 0 : ~0;

    rreq->comm = comm;
    MPIR_Comm_add_ref(comm);
    rreq->dev.user_buf = user_buf;
    rreq->dev.user_count = user_count;
    rreq->dev.datatype = datatype;

    /* check whether VC has failed, or this is an ANY_SOURCE in a
       failed communicator */
    if (source != MPI_ANY_SOURCE) {
        MPIDI_VC_t *vc;
        MPIDI_Comm_get_vc(comm, source, &vc);
        if (vc->state == MPIDI_VC_STATE_MORIBUND) {
            /* Complete the request with an error instead of posting it. */
            MPIR_ERR_SET1(mpi_errno, MPIX_ERR_PROC_FAILED, "**comm_fail", "**comm_fail %d", vc->pg_rank);
            rreq->status.MPI_ERROR = mpi_errno;
            MPID_Request_complete(rreq);
            goto lock_exit;
        }
    }

    /* Append to the posted queue. */
    rreq->dev.next = NULL;
    if (recvq_posted_tail == NULL) {
        recvq_posted_head = rreq;
    }
    else {
        recvq_posted_tail->dev.next = rreq;
    }
    recvq_posted_tail = rreq;
    MPIR_T_PVAR_LEVEL_INC(RECVQ, posted_recvq_length, 1);
    MPIDI_POSTED_RECV_ENQUEUE_HOOK(rreq);

  lock_exit:
    *foundp = found;

    /* If a match was not found, the timer was stopped after the traversal */
    if (found)
        MPIR_T_PVAR_TIMER_END(RECVQ, time_matching_unexpectedq);

    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_RECVQ_FDU_OR_AEP);
    return rreq;
}


/*
 * MPIDI_CH3U_Recvq_DP()
 *
 * Given an existing request, dequeue that request from the posted queue.
 * Returns TRUE if the request was found, removed, and the channel had not
 * already matched it; returns FALSE if the request was not in the posted
 * queue, or if it was removed but MPIDI_POSTED_RECV_DEQUEUE_HOOK reported
 * that the channel had already matched it.
 *
 * Multithread - This routine is atomic: the search and removal run between
 * MPID_THREAD_CS_ENTER and MPID_THREAD_CS_EXIT on MPIR_THREAD_POBJ_MSGQ_MUTEX.
 */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3U_Recvq_DP
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPIDI_CH3U_Recvq_DP(MPID_Request * rreq)
{
    int found = FALSE;
    int channel_matched;
    MPID_Request *cur;
    MPID_Request *prev = NULL;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_RECVQ_DP);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_RECVQ_DP);

    /* MT FIXME is this right? or should the caller do this? */
    MPID_THREAD_CS_ENTER(POBJ, MPIR_THREAD_POBJ_MSGQ_MUTEX);
    MPIR_T_PVAR_TIMER_START(RECVQ, time_failed_matching_postedq);

    for (cur = recvq_posted_head; cur != NULL; prev = cur, cur = cur->dev.next) {
        MPIR_T_PVAR_COUNTER_INC(RECVQ, posted_recvq_match_attempts, 1);
        if (cur != rreq)
            continue;

        /* Found it: unlink, fixing up head and tail as needed. */
        if (prev == NULL) {
            recvq_posted_head = cur->dev.next;
        }
        else {
            prev->dev.next = cur->dev.next;
        }
        if (cur->dev.next == NULL) {
            recvq_posted_tail = prev;
        }
        MPIR_T_PVAR_LEVEL_DEC(RECVQ, posted_recvq_length, 1);

        /* Notify channel that rreq has been dequeued and check if
           it has already matched rreq, fail if so */
        channel_matched = MPIDI_POSTED_RECV_DEQUEUE_HOOK(rreq);
        if (!channel_matched)
            found = TRUE;
        break;
    }

    /* The timer is only stopped for an unsuccessful dequeue, so a
     * successful one adds nothing to time_failed_matching_postedq. */
    if (!found)
        MPIR_T_PVAR_TIMER_END(RECVQ, time_failed_matching_postedq);

    MPID_THREAD_CS_EXIT(POBJ, MPIR_THREAD_POBJ_MSGQ_MUTEX);

    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_RECVQ_DP);
    return found;
}

/*
 * MPIDI_CH3U_Recvq_FDP_or_AEU()
 *
 * Locate a request in the posted queue and dequeue it, or allocate a new
 * request and enqueue it in the unexpected queue.
 *
 * match  - match information of the incoming message.  Its error bits are
 *          cleared while the posted queue is searched and are restored
 *          before the routine returns, on every path.
 * foundp - set to TRUE if a posted request was matched and dequeued,
 *          FALSE otherwise.
 *
 * Returns the dequeued posted request; or, when nothing matched, a newly
 * created request that has been appended to the unexpected queue; or, when
 * nothing matched and the message's communicator has been revoked (and the
 * tag is neither MPIR_AGREE_TAG nor MPIR_SHRINK_TAG), a request obtained
 * from MPIDI_Request_create_null_rreq that is NOT enqueued.  If request
 * creation fails the creation macro jumps to the exit path with *foundp
 * FALSE (the returned pointer is then presumably NULL -- confirm against
 * the macro definitions).
 *
 * Multithread - This routine must be called from within a MSGQUEUE
 * critical section.  If a request is allocated, it must not release
 * the MSGQUEUE until the request is completely valid, as another thread
 * may then find it and dequeue it.
 *
 * This routine is used in ch3u_eager, ch3u_eagersync, ch3u_handle_recv_pkt,
 * ch3u_rndv, and mpidi_isend_self.  Routines within the progress engine
 * will need to be careful to avoid nested critical sections.
 *
 * FIXME: Currently, the routines called from within the progress engine
 * do not use the MSGQUEUE CS, because in the brief-global mode, that
 * simply uses the global_mutex .
 */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3U_Recvq_FDP_or_AEU
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
MPID_Request * MPIDI_CH3U_Recvq_FDP_or_AEU(MPIDI_Message_match * match, 
					   int * foundp)
{
    int found = FALSE;
    MPID_Request * rreq;
    MPID_Request * prev_rreq;
    int channel_matched;
    int error_bit_masked = 0, proc_failure_bit_masked = 0;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_RECVQ_FDP_OR_AEU);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_RECVQ_FDP_OR_AEU);

    /* Unset the error bit if it is set on the incoming packet so we don't
     * have to mask it every time. It will get reset at lock_exit, and also
     * before the request is added to the unexpected queue, if it was set
     * here. */
    if (MPIR_TAG_CHECK_ERROR_BIT(match->parts.tag)) {
        error_bit_masked = 1;
        if (MPIR_TAG_CHECK_PROC_FAILURE_BIT(match->parts.tag)) proc_failure_bit_masked = 1;
        MPIR_TAG_CLEAR_ERROR_BITS(match->parts.tag);
    }

    prev_rreq = NULL;

    rreq = recvq_posted_head;

    /* The timer is only stopped when the search fails; a successful match
     * jumps straight to lock_exit. */
    MPIR_T_PVAR_TIMER_START(RECVQ, time_failed_matching_postedq);
    while (rreq != NULL) {
        MPIR_T_PVAR_COUNTER_INC(RECVQ, posted_recvq_match_attempts, 1);
        if (MATCH_WITH_LEFT_RIGHT_MASK(rreq->dev.match, *match, rreq->dev.mask)) {
            if (prev_rreq != NULL) {
                prev_rreq->dev.next = rreq->dev.next;
            }
            else {
                recvq_posted_head = rreq->dev.next;
            }
            if (rreq->dev.next == NULL) {
                recvq_posted_tail = prev_rreq;
            }
            MPIR_T_PVAR_LEVEL_DEC(RECVQ, posted_recvq_length, 1);

            /* Give channel a chance to match the request */
            channel_matched = MPIDI_POSTED_RECV_DEQUEUE_HOOK(rreq);
            if (!channel_matched) {
                /* If the channel did not match the request, the match here is
                 * valid and we can stop searching for the request. */
                found = TRUE;
                goto lock_exit;
            } else {
                /* If the channel did match the request, then it's already
                 * matched in the channel and the request here should be
                 * discarded. Continue searching.  prev_rreq is left alone
                 * because the discarded request is no longer in the queue. */
                rreq = rreq->dev.next;
            }
        } else {
            prev_rreq = rreq;
            rreq = rreq->dev.next;
        }
    }
    MPIR_T_PVAR_TIMER_END(RECVQ, time_failed_matching_postedq);

    /* If we didn't match the request, look to see if the communicator is
     * revoked. If so, just throw this request away since it won't be used
     * anyway. */
    {
        MPID_Comm *comm_ptr;
        int mpi_errno ATTRIBUTE((unused)) = MPI_SUCCESS;

        MPIDI_CH3I_Comm_find(match->parts.context_id, &comm_ptr);

        if (comm_ptr && comm_ptr->revoked &&
            MPIR_TAG_MASK_ERROR_BITS(match->parts.tag) != MPIR_AGREE_TAG &&
            MPIR_TAG_MASK_ERROR_BITS(match->parts.tag) != MPIR_SHRINK_TAG) {
            MPIDI_Request_create_null_rreq( rreq, mpi_errno, goto lock_exit );
            MPIU_Assert(mpi_errno == MPI_SUCCESS);

            MPIU_DBG_MSG_FMT(CH3_OTHER, VERBOSE,
                (MPIU_DBG_FDEST, "RECEIVED MESSAGE FOR REVOKED COMM (tag=%d,src=%d,cid=%d)\n", MPIR_TAG_MASK_ERROR_BITS(match->parts.tag), match->parts.rank, comm_ptr->context_id));

            /* Leave through the common exit (found is still FALSE) so that
             * the caller's tag gets its error bits back and the function
             * exit hook runs, rather than returning directly from here. */
            goto lock_exit;
        }
    }

    /* A matching request was not found in the posted queue, so we
       need to allocate a new request and add it to the unexpected queue */
    {
        int mpi_errno ATTRIBUTE((unused)) = 0;
        MPIDI_Request_create_rreq( rreq, mpi_errno, goto lock_exit );
        MPIU_Assert(mpi_errno == 0);
        rreq->dev.recv_pending_count = 1;
        /* Reset the error bits if we unset it earlier, so that the match
         * stored in the queued request carries them. */
        if (error_bit_masked) MPIR_TAG_SET_ERROR_BIT(match->parts.tag);
        if (proc_failure_bit_masked) MPIR_TAG_SET_PROC_FAILURE_BIT(match->parts.tag);
        rreq->dev.match = *match;
        rreq->dev.next  = NULL;
        if (recvq_unexpected_tail != NULL) {
            recvq_unexpected_tail->dev.next = rreq;
        }
        else {
            recvq_unexpected_head = rreq;
        }
        recvq_unexpected_tail = rreq;
        MPIR_T_PVAR_LEVEL_INC(RECVQ, unexpected_recvq_length, 1);
    }

  lock_exit:

    /* Reset the error bits if we unset it earlier. */
    if (error_bit_masked) MPIR_TAG_SET_ERROR_BIT(match->parts.tag);
    if (proc_failure_bit_masked) MPIR_TAG_SET_PROC_FAILURE_BIT(match->parts.tag);

    *foundp = found;

    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_RECVQ_FDP_OR_AEU);
    return rreq;
}

/* Returns TRUE iff vc is the VC that MPIDI_Comm_get_vc yields for the
 * request's communicator and the rank stored in its match information. */
static inline int req_uses_vc(const MPID_Request* req, const MPIDI_VC_t *vc)
{
    MPIDI_VC_t *req_vc;

    MPIDI_Comm_get_vc(req->comm, req->dev.match.parts.rank, &req_vc);
    return req_vc == vc;
}

#undef FUNCNAME
#define FUNCNAME dequeue_and_set_error
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/* Unlinks *req from the singly linked receive queue described by *head and
   *tail, stores *error in the request's status, completes the request, and
   advances *req to the element that followed it so the caller can keep
   iterating.

   req       - in/out: request to remove; on return, its former successor
               (NULL if it was the last element)
   prev_req  - element preceding *req in the queue; only consulted when *req
               is not the head, so it may be NULL when removing the head
   head/tail - addresses of the queue's head and tail pointers
   error     - error code stored in the request; it is used exactly as given,
               this routine never creates an error code itself
   rank      - unused; retained so existing call sites need not change */
static inline void dequeue_and_set_error(MPID_Request **req,  MPID_Request *prev_req, MPID_Request **head, MPID_Request **tail, int *error, int rank)
{
    MPID_Request *next = (*req)->dev.next;

    /* Keep the queue-length performance variables in step with the queue.
       The queue is identified by the address of its head pointer, so the
       level is adjusted wherever in the queue the request sits, and for the
       unexpected queue as well as the posted one. */
    if (head == &recvq_posted_head) {
        MPIR_T_PVAR_LEVEL_DEC(RECVQ, posted_recvq_length, 1);
    }
    else if (head == &recvq_unexpected_head) {
        MPIR_T_PVAR_LEVEL_DEC(RECVQ, unexpected_recvq_length, 1);
    }

    /* remove from queue */
    if (*head == *req) {
        *head = next;
    }
    else {
        prev_req->dev.next = next;
    }

    /* prev_req is NULL when the only element is removed, which leaves the
       tail NULL as well */
    if (*tail == *req) {
        *tail = prev_req;
    }

    /* set error and complete */
    (*req)->status.MPI_ERROR = *error;
    /* Log while the request is certainly still valid.
       NOTE(review): completion may drop the last reference to the request --
       confirm against MPID_Request_complete. */
    MPIU_DBG_MSG_FMT(CH3_OTHER, VERBOSE,
                     (MPIU_DBG_FDEST, "set error of req %p (%#08x) to %#x and completing.",
                      *req, (*req)->handle, *error));
    MPID_Request_complete(*req);

    *req = next;
}

/*
 * MPIDI_CH3U_Clean_recvq()
 *
 * Looks through the entire unexpected recv queue and the posted recv queues.
 * If a request is found that involved the provided communicator (comm_ptr),
 * it is dequeed and marked as failed via MPIX_ERR_REVOKED.
 *
 * Multithread - This routine must be called from within a MSGQUEUE
 * critical section.  If a request is allocated, it must not release
 * the MSGQUEUE until the request is completely valid, as another thread
 * may then find it and dequeue it.
 *
 */
int MPIDI_CH3U_Clean_recvq(MPID_Comm *comm_ptr)
{
    int mpi_errno = MPI_SUCCESS;
    int error = MPI_SUCCESS;
    MPID_Request *rreq, *prev_rreq = NULL;
    MPIDI_Message_match match;
    MPIDI_Message_match mask;
    MPIDI_STATE_DECL(MPIDI_CH3U_CLEAN_RECVQ);

    MPIDI_FUNC_ENTER(MPIDI_CH3U_CLEAN_RECVQ);

    MPIR_ERR_SETSIMPLE(error, MPIX_ERR_REVOKED, "**revoked");

    rreq = recvq_unexpected_head;
    mask.parts.context_id = ~0;
    mask.parts.rank = mask.parts.tag = 0;

    /* Clear the error bit in the tag since we don't care about whether or
     * not we're trying to report an error anymore. */
    MPIR_TAG_CLEAR_ERROR_BITS(mask.parts.tag);

    while (NULL != rreq) {
        /* We'll have to do this matching twice. Once for the pt2pt context id
         * and once for the collective context id */
        match.parts.context_id = comm_ptr->recvcontext_id + MPID_CONTEXT_INTRA_PT2PT;

        if (MATCH_WITH_LEFT_RIGHT_MASK(rreq->dev.match, match, mask)) {
            MPIU_DBG_MSG_FMT(CH3_OTHER,VERBOSE,(MPIU_DBG_FDEST,
                        "cleaning up unexpected pt2pt pkt rank=%d tag=%d contextid=%d",
                        rreq->dev.match.parts.rank, rreq->dev.match.parts.tag, rreq->dev.match.parts.context_id));
            dequeue_and_set_error(&rreq, prev_rreq, &recvq_unexpected_head, &recvq_unexpected_tail, &error, MPI_PROC_NULL);
            continue;
        }

        match.parts.context_id = comm_ptr->recvcontext_id + MPID_CONTEXT_INTRA_COLL;

        if (MATCH_WITH_LEFT_RIGHT_MASK(rreq->dev.match, match, mask)) {
            if (MPIR_TAG_MASK_ERROR_BITS(rreq->dev.match.parts.tag) != MPIR_AGREE_TAG &&
                MPIR_TAG_MASK_ERROR_BITS(rreq->dev.match.parts.tag) != MPIR_SHRINK_TAG) {
                MPIU_DBG_MSG_FMT(CH3_OTHER,VERBOSE,(MPIU_DBG_FDEST,
                            "cleaning up unexpected collective pkt rank=%d tag=%d contextid=%d",
                            rreq->dev.match.parts.rank, rreq->dev.match.parts.tag, rreq->dev.match.parts.context_id));
                dequeue_and_set_error(&rreq, prev_rreq, &recvq_unexpected_head, &recvq_unexpected_tail, &error, MPI_PROC_NULL);
                continue;
            }
        }

        if (MPIR_Comm_is_node_aware(comm_ptr)) {
            int offset;
            offset = (comm_ptr->comm_kind == MPID_INTRACOMM) ?  MPID_CONTEXT_INTRA_PT2PT : MPID_CONTEXT_INTER_PT2PT;
            match.parts.context_id = comm_ptr->recvcontext_id + MPID_CONTEXT_INTRANODE_OFFSET + offset;

            if (MATCH_WITH_LEFT_RIGHT_MASK(rreq->dev.match, match, mask)) {
                if (MPIR_TAG_MASK_ERROR_BITS(rreq->dev.match.parts.tag) != MPIR_AGREE_TAG &&
                    MPIR_TAG_MASK_ERROR_BITS(rreq->dev.match.parts.tag) != MPIR_SHRINK_TAG) {
                    MPIU_DBG_MSG_FMT(CH3_OTHER,VERBOSE,(MPIU_DBG_FDEST,
                                "cleaning up unexpected pt2pt pkt rank=%d tag=%d contextid=%d",
                                rreq->dev.match.parts.rank, rreq->dev.match.parts.tag, rreq->dev.match.parts.context_id));
                    dequeue_and_set_error(&rreq, prev_rreq, &recvq_unexpected_head, &recvq_unexpected_tail, &error, MPI_PROC_NULL);
                    continue;
                }
            }

            offset = (comm_ptr->comm_kind == MPID_INTRACOMM) ?  MPID_CONTEXT_INTRA_COLL : MPID_CONTEXT_INTER_COLL;
            match.parts.context_id = comm_ptr->recvcontext_id + MPID_CONTEXT_INTRANODE_OFFSET + offset;

            if (MATCH_WITH_LEFT_RIGHT_MASK(rreq->dev.match, match, mask)) {
                if (MPIR_TAG_MASK_ERROR_BITS(rreq->dev.match.parts.tag) != MPIR_AGREE_TAG &&
                    MPIR_TAG_MASK_ERROR_BITS(rreq->dev.match.parts.tag) != MPIR_SHRINK_TAG) {
                    MPIU_DBG_MSG_FMT(CH3_OTHER,VERBOSE,(MPIU_DBG_FDEST,
                                "cleaning up unexpected collective pkt rank=%d tag=%d contextid=%d",
                                rreq->dev.match.parts.rank, rreq->dev.match.parts.tag, rreq->dev.match.parts.context_id));
                    dequeue_and_set_error(&rreq, prev_rreq, &recvq_unexpected_head, &recvq_unexpected_tail, &error, MPI_PROC_NULL);
                    continue;
                }
            }

            offset = (comm_ptr->comm_kind == MPID_INTRACOMM) ?  MPID_CONTEXT_INTRA_PT2PT : MPID_CONTEXT_INTER_PT2PT;
            match.parts.context_id = comm_ptr->recvcontext_id + MPID_CONTEXT_INTERNODE_OFFSET + offset;

            if (MATCH_WITH_LEFT_RIGHT_MASK(rreq->dev.match, match, mask)) {
                if (MPIR_TAG_MASK_ERROR_BITS(rreq->dev.match.parts.tag) != MPIR_AGREE_TAG &&
                    MPIR_TAG_MASK_ERROR_BITS(rreq->dev.match.parts.tag) != MPIR_SHRINK_TAG) {
                    MPIU_DBG_MSG_FMT(CH3_OTHER,VERBOSE,(MPIU_DBG_FDEST,
                                "cleaning up unexpected pt2pt pkt rank=%d tag=%d contextid=%d",
                                rreq->dev.match.parts.rank, rreq->dev.match.parts.tag, rreq->dev.match.parts.context_id));
                    dequeue_and_set_error(&rreq, prev_rreq, &recvq_unexpected_head, &recvq_unexpected_tail, &error, MPI_PROC_NULL);
                    continue;
                }
            }

            offset = (comm_ptr->comm_kind == MPID_INTRACOMM) ?  MPID_CONTEXT_INTRA_COLL : MPID_CONTEXT_INTER_COLL;
            match.parts.context_id = comm_ptr->recvcontext_id + MPID_CONTEXT_INTERNODE_OFFSET + offset;

            if (MATCH_WITH_LEFT_RIGHT_MASK(rreq->dev.match, match, mask)) {
                if (MPIR_TAG_MASK_ERROR_BITS(rreq->dev.match.parts.tag) != MPIR_AGREE_TAG &&
                    MPIR_TAG_MASK_ERROR_BITS(rreq->dev.match.parts.tag) != MPIR_SHRINK_TAG) {
                    MPIU_DBG_MSG_FMT(CH3_OTHER,VERBOSE,(MPIU_DBG_FDEST,
                                "cleaning up unexpected collective pkt rank=%d tag=%d contextid=%d",
                                rreq->dev.match.parts.rank, rreq->dev.match.parts.tag, rreq->dev.match.parts.context_id));
                    dequeue_and_set_error(&rreq, prev_rreq, &recvq_unexpected_head, &recvq_unexpected_tail, &error, MPI_PROC_NULL);
                    continue;
                }
            }
        }

        prev_rreq = rreq;
        rreq = rreq->dev.next;
    }

    rreq = recvq_posted_head;
    prev_rreq = NULL;

    while (NULL != rreq) {
        /* We'll have to do this matching twice. Once for the pt2pt context id
         * and once for the collective context id */
        match.parts.context_id = comm_ptr->recvcontext_id + MPID_CONTEXT_INTRA_PT2PT;

        if (MATCH_WITH_LEFT_RIGHT_MASK(rreq->dev.match, match, mask)) {
            MPIU_DBG_MSG_FMT(CH3_OTHER,VERBOSE,(MPIU_DBG_FDEST,
                        "cleaning up posted pt2pt pkt rank=%d tag=%d contextid=%d",
                        rreq->dev.match.parts.rank, rreq->dev.match.parts.tag, rreq->dev.match.parts.context_id));
            dequeue_and_set_error(&rreq, prev_rreq, &recvq_posted_head, &recvq_posted_tail, &error, MPI_PROC_NULL);
            continue;
        }

        match.parts.context_id = comm_ptr->recvcontext_id + MPID_CONTEXT_INTRA_COLL;

        if (MATCH_WITH_LEFT_RIGHT_MASK(rreq->dev.match, match, mask)) {
            if (MPIR_TAG_MASK_ERROR_BITS(rreq->dev.match.parts.tag) != MPIR_AGREE_TAG &&
                MPIR_TAG_MASK_ERROR_BITS(rreq->dev.match.parts.tag) != MPIR_SHRINK_TAG) {
                MPIU_DBG_MSG_FMT(CH3_OTHER,VERBOSE,(MPIU_DBG_FDEST,
                            "cleaning up posted collective pkt rank=%d tag=%d contextid=%d",
                            rreq->dev.match.parts.rank, rreq->dev.match.parts.tag, rreq->dev.match.parts.context_id));
                dequeue_and_set_error(&rreq, prev_rreq, &recvq_posted_head, &recvq_posted_tail, &error, MPI_PROC_NULL);
                continue;
            }
        }

        if (MPIR_Comm_is_node_aware(comm_ptr)) {
            int offset;
            offset = (comm_ptr->comm_kind == MPID_INTRACOMM) ?  MPID_CONTEXT_INTRA_PT2PT : MPID_CONTEXT_INTER_PT2PT;
            match.parts.context_id = comm_ptr->recvcontext_id + MPID_CONTEXT_INTRANODE_OFFSET + offset;

            if (MATCH_WITH_LEFT_RIGHT_MASK(rreq->dev.match, match, mask)) {
                if (MPIR_TAG_MASK_ERROR_BITS(rreq->dev.match.parts.tag) != MPIR_AGREE_TAG &&
                    MPIR_TAG_MASK_ERROR_BITS(rreq->dev.match.parts.tag) != MPIR_SHRINK_TAG) {
                    MPIU_DBG_MSG_FMT(CH3_OTHER,VERBOSE,(MPIU_DBG_FDEST,
                                "cleaning up posted pt2pt pkt rank=%d tag=%d contextid=%d",
                                rreq->dev.match.parts.rank, rreq->dev.match.parts.tag, rreq->dev.match.parts.context_id));
                    dequeue_and_set_error(&rreq, prev_rreq, &recvq_posted_head, &recvq_posted_tail, &error, MPI_PROC_NULL);
                    continue;
                }
            }

            offset = (comm_ptr->comm_kind == MPID_INTRACOMM) ?  MPID_CONTEXT_INTRA_COLL : MPID_CONTEXT_INTER_COLL;
            match.parts.context_id = comm_ptr->recvcontext_id + MPID_CONTEXT_INTRANODE_OFFSET + offset;

            if (MATCH_WITH_LEFT_RIGHT_MASK(rreq->dev.match, match, mask)) {
                if (MPIR_TAG_MASK_ERROR_BITS(rreq->dev.match.parts.tag) != MPIR_AGREE_TAG &&
                    MPIR_TAG_MASK_ERROR_BITS(rreq->dev.match.parts.tag) != MPIR_SHRINK_TAG) {
                    MPIU_DBG_MSG_FMT(CH3_OTHER,VERBOSE,(MPIU_DBG_FDEST,
                                "cleaning up posted collective pkt rank=%d tag=%d contextid=%d",
                                rreq->dev.match.parts.rank, rreq->dev.match.parts.tag, rreq->dev.match.parts.context_id));
                    dequeue_and_set_error(&rreq, prev_rreq, &recvq_posted_head, &recvq_posted_tail, &error, MPI_PROC_NULL);
                    continue;
                }
            }

            offset = (comm_ptr->comm_kind == MPID_INTRACOMM) ?  MPID_CONTEXT_INTRA_PT2PT : MPID_CONTEXT_INTER_PT2PT;
            match.parts.context_id = comm_ptr->recvcontext_id + MPID_CONTEXT_INTERNODE_OFFSET + offset;

            if (MATCH_WITH_LEFT_RIGHT_MASK(rreq->dev.match, match, mask)) {
                if (MPIR_TAG_MASK_ERROR_BITS(rreq->dev.match.parts.tag) != MPIR_AGREE_TAG &&
                    MPIR_TAG_MASK_ERROR_BITS(rreq->dev.match.parts.tag) != MPIR_SHRINK_TAG) {
                    MPIU_DBG_MSG_FMT(CH3_OTHER,VERBOSE,(MPIU_DBG_FDEST,
                                "cleaning up posted pt2pt pkt rank=%d tag=%d contextid=%d",
                                rreq->dev.match.parts.rank, rreq->dev.match.parts.tag, rreq->dev.match.parts.context_id));
                    dequeue_and_set_error(&rreq, prev_rreq, &recvq_posted_head, &recvq_posted_tail, &error, MPI_PROC_NULL);
                    continue;
                }
            }

            offset = (comm_ptr->comm_kind == MPID_INTRACOMM) ?  MPID_CONTEXT_INTRA_COLL : MPID_CONTEXT_INTER_COLL;
            match.parts.context_id = comm_ptr->recvcontext_id + MPID_CONTEXT_INTERNODE_OFFSET + offset;

            if (MATCH_WITH_LEFT_RIGHT_MASK(rreq->dev.match, match, mask)) {
                if (MPIR_TAG_MASK_ERROR_BITS(rreq->dev.match.parts.tag) != MPIR_AGREE_TAG &&
                    MPIR_TAG_MASK_ERROR_BITS(rreq->dev.match.parts.tag) != MPIR_SHRINK_TAG) {
                    MPIU_DBG_MSG_FMT(CH3_OTHER,VERBOSE,(MPIU_DBG_FDEST,
                                "cleaning up posted collective pkt rank=%d tag=%d contextid=%d",
                                rreq->dev.match.parts.rank, rreq->dev.match.parts.tag, rreq->dev.match.parts.context_id));
                    dequeue_and_set_error(&rreq, prev_rreq, &recvq_posted_head, &recvq_posted_tail, &error, MPI_PROC_NULL);
                    continue;
                }
            }
        }

        prev_rreq = rreq;
        rreq = rreq->dev.next;
    }

    MPIDI_FUNC_EXIT(MPIDI_CH3U_CLEAN_RECVQ);

    return mpi_errno;
}

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3U_Complete_posted_with_error
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/* Completes, with an MPIX_ERR_PROC_FAILED error, every request in the posted
   receive queue that names a specific source rank whose VC is vc.  Receives
   posted with MPI_ANY_SOURCE are left in the queue.  The walk is done while
   holding the message-queue mutex.

   Returns mpi_errno, which this routine never changes from MPI_SUCCESS. */
int MPIDI_CH3U_Complete_posted_with_error(MPIDI_VC_t *vc)
{
    int mpi_errno = MPI_SUCCESS;
    MPID_Request *req, *prev_req;
    int error = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPIDU_COMPLETE_POSTED_WITH_ERROR);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_COMPLETE_POSTED_WITH_ERROR);

    MPID_THREAD_CS_ENTER(POBJ, MPIR_THREAD_POBJ_MSGQ_MUTEX);

    /* one error code is built up front and shared by all failed requests */
    MPIR_ERR_SETSIMPLE(error, MPIX_ERR_PROC_FAILED, "**proc_failed");

    /* check each req in the posted queue and complete-with-error any requests
       using this VC. */
    req = recvq_posted_head;
    prev_req = NULL;
    while (req) {
        if (req->dev.match.parts.rank != MPI_ANY_SOURCE && req_uses_vc(req, vc)) {
            /* advances req to its successor; prev_req must stay put */
            dequeue_and_set_error(&req, prev_req, &recvq_posted_head, &recvq_posted_tail, &error, MPI_PROC_NULL);
        } else {
            prev_req = req;
            req = req->dev.next;
        }
    }
    
 fn_exit:
    MPID_THREAD_CS_EXIT(POBJ, MPIR_THREAD_POBJ_MSGQ_MUTEX);

    MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_COMPLETE_POSTED_WITH_ERROR);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}


/* --BEGIN ERROR HANDLING-- */
/* Renders a tag for diagnostic output: MPI_ANY_TAG is spelled out by name,
   every other value is printed in decimal.  max is handed to the copy/format
   routine as the size of out.  Returns out so the call can be used directly
   as an argument. */
static char *tag_val_to_str(int tag, char *out, int max)
{
    if (tag != MPI_ANY_TAG) {
        MPL_snprintf(out, max, "%d", tag);
        return out;
    }

    MPIU_Strncpy(out, "MPI_ANY_TAG", max);
    return out;
}

/* Renders a rank for diagnostic output: MPI_ANY_SOURCE is spelled out by
   name, every other value is printed in decimal.  max is handed to the
   copy/format routine as the size of out.  Returns out so the call can be
   used directly as an argument. */
static char *rank_val_to_str(int rank, char *out, int max)
{
    if (rank != MPI_ANY_SOURCE) {
        MPL_snprintf(out, max, "%d", rank);
        return out;
    }

    MPIU_Strncpy(out, "MPI_ANY_SOURCE", max);
    return out;
}
/* --END ERROR HANDLING-- */

/* --BEGIN DEBUG-- */
/* satisfy the compiler */
void MPIDI_CH3U_Dbg_print_recvq(FILE *stream);

/* This function can be called by a debugger to dump the recvq state to the
 * given stream.
 *
 * Output, in order: the context ids of MPI_COMM_WORLD, MPI_COMM_SELF and (if
 * present) the parent communicator, then one line per request in the posted
 * queue, then two lines per request in the unexpected queue (matching info
 * followed by the source/tag recorded in the request's status).  The queues
 * are only read, never modified, and no lock is taken here. */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3U_Dbg_print_recvq
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
void MPIDI_CH3U_Dbg_print_recvq(FILE *stream)
{
    MPID_Request * rreq;
    int i;
    /* scratch space for the pretty-printed tag and rank; two separate
       buffers because both appear in the same fprintf call */
    char tag_buf[128];
    char rank_buf[128];

    fprintf(stream, "========================================\n");
    fprintf(stream, "MPI_COMM_WORLD  ctx=%#x rank=%d\n", MPIR_Process.comm_world->context_id, MPIR_Process.comm_world->rank);
    fprintf(stream, "MPI_COMM_SELF   ctx=%#x\n", MPIR_Process.comm_self->context_id);
    if (MPIR_Process.comm_parent) {
        /* report the parent communicator's own context id, not comm_self's */
        fprintf(stream, "MPI_COMM_PARENT ctx=%#x recvctx=%#x\n",
                MPIR_Process.comm_parent->context_id,
                MPIR_Process.comm_parent->recvcontext_id);
    }
    else {
        fprintf(stream, "MPI_COMM_PARENT (NULL)\n");
    }

    fprintf(stream, "CH3 Posted RecvQ:\n");
    rreq = recvq_posted_head;
    i = 0;
    while (rreq != NULL) {
        /* %p requires a void pointer argument */
        fprintf(stream, "..[%d] rreq=%p ctx=%#x rank=%s tag=%s\n", i, (void *) rreq,
                        rreq->dev.match.parts.context_id,
                        rank_val_to_str(rreq->dev.match.parts.rank, rank_buf, sizeof(rank_buf)),
                        tag_val_to_str(rreq->dev.match.parts.tag, tag_buf, sizeof(tag_buf)));
        ++i;
        rreq = rreq->dev.next;
    }

    fprintf(stream, "CH3 Unexpected RecvQ:\n");
    rreq = recvq_unexpected_head;
    i = 0;
    while (rreq != NULL) {
        fprintf(stream, "..[%d] rreq=%p ctx=%#x rank=%s tag=%s\n", i, (void *) rreq,
                        rreq->dev.match.parts.context_id,
                        rank_val_to_str(rreq->dev.match.parts.rank, rank_buf, sizeof(rank_buf)),
                        tag_val_to_str(rreq->dev.match.parts.tag, tag_buf, sizeof(tag_buf)));
        fprintf(stream, "..    status.src=%s status.tag=%s\n",
                        rank_val_to_str(rreq->status.MPI_SOURCE, rank_buf, sizeof(rank_buf)),
                        tag_val_to_str(rreq->status.MPI_TAG, tag_buf, sizeof(tag_buf)));
        ++i;
        rreq = rreq->dev.next;
    }
    fprintf(stream, "========================================\n");
}
/* --END DEBUG-- */

/* returns the number of elements in the unexpected queue */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3U_Recvq_count_unexp
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPIDI_CH3U_Recvq_count_unexp(void)
{
    int count = 0;
    MPID_Request *req = recvq_unexpected_head;

    while (req)
    {
        ++count;
        req = req->dev.next;
    }

    return count;
}