Blob Blame History Raw
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
 *  (C) 2012 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpidimpl.h"

/*
 * Optional hook used by MPID_Improbe for MPI_ANY_SOURCE probes.
 *
 * It is NULL by default.  When it is non-NULL (and ENABLE_COMM_OVERRIDES is
 * defined), MPID_Improbe calls it after a lookup in the receive queue found
 * nothing, passing the tag, communicator and context offset along with the
 * caller's flag, message and status output arguments.  The return value is
 * treated as an MPI error code.
 *
 * NOTE(review): presumably installed by a network module that does its own
 * message matching -- confirm against the netmod initialization code.
 */
int (*MPIDI_Anysource_improbe_fn)(int tag, MPID_Comm * comm, int context_offset,
                                  int *flag, MPID_Request **message,
                                  MPI_Status * status) = NULL;

#undef FUNCNAME
#define FUNCNAME MPID_Improbe
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/*
 * MPID_Improbe - nonblocking matched probe.
 *
 * Looks for a message matching (source, tag, context) without blocking.  If
 * the first lookup finds nothing, progress is poked once and the lookup is
 * repeated before giving up.
 *
 * Parameters:
 *   source         - rank to match, MPI_ANY_SOURCE, or MPI_PROC_NULL
 *   tag            - tag to match
 *   comm           - communicator to probe on
 *   context_offset - added to comm->recvcontext_id to form the context id
 *   flag           - out: set to TRUE when a match was found (or source is
 *                    MPI_PROC_NULL); otherwise set by the last lookup
 *   message        - out: the matched request, or NULL.  NULL together with
 *                    *flag == TRUE means MPI_MESSAGE_NO_PROC.
 *   status         - out: status of the matched message
 *
 * Returns MPI_SUCCESS or an MPI error code.  MPIX_ERR_REVOKED is reported if
 * the communicator has been revoked; errors from the progress engine and from
 * the netmod hooks are propagated.  On the revoked path *flag is not written.
 *
 * A message matched in the receive queue is marked as MPID_REQUEST_MPROBE and
 * its status is extracted into 'status'.  Matches produced by the netmod
 * hooks (MPIDI_Anysource_improbe_fn, vc->comm_ops->improbe) are returned as
 * the hook left them.
 */
int MPID_Improbe(int source, int tag, MPID_Comm *comm, int context_offset,
                 int *flag, MPID_Request **message, MPI_Status *status)
{
    int mpi_errno = MPI_SUCCESS;
    int context_id = comm->recvcontext_id + context_offset;

    *message = NULL;

    if (source == MPI_PROC_NULL)
    {
        MPIR_Status_set_procnull(status);
        *flag = TRUE;
        *message = NULL; /* should be interpreted as MPI_MESSAGE_NO_PROC */
        goto fn_exit;
    }

    /* Check to make sure the communicator hasn't already been revoked */
    if (comm->revoked) {
        MPIR_ERR_SETANDJUMP(mpi_errno,MPIX_ERR_REVOKED,"**revoked");
    }

#ifdef ENABLE_COMM_OVERRIDES
    if (MPIDI_Anysource_improbe_fn) {
        if (source == MPI_ANY_SOURCE) {
            /* if it's anysource, check shm, then check the network.
               If still not found, call progress, and check again. */

            /* Nonzero when the match came from the receive queue rather
               than from the netmod hook. */
            int recvq_hit;

            /* check shm*/
            MPID_THREAD_CS_ENTER(POBJ, MPIR_THREAD_POBJ_MSGQ_MUTEX);
            *message = MPIDI_CH3U_Recvq_FDU_matchonly(source, tag, context_id, comm, flag);
            MPID_THREAD_CS_EXIT(POBJ, MPIR_THREAD_POBJ_MSGQ_MUTEX);
            recvq_hit = *flag;
            if (!*flag) {
                /* not found, check network */
                mpi_errno = MPIDI_Anysource_improbe_fn(tag, comm, context_offset, flag, message, status);
                if (mpi_errno) MPIR_ERR_POP(mpi_errno);
                if (!*flag) {
                    /* still not found, make some progress*/
                    mpi_errno = MPIDI_CH3_Progress_poke();
                    if (mpi_errno) MPIR_ERR_POP(mpi_errno);
                    /* check shm again */
                    MPID_THREAD_CS_ENTER(POBJ, MPIR_THREAD_POBJ_MSGQ_MUTEX);
                    *message = MPIDI_CH3U_Recvq_FDU_matchonly(source, tag, context_id, comm, flag);
                    MPID_THREAD_CS_EXIT(POBJ, MPIR_THREAD_POBJ_MSGQ_MUTEX);
                    recvq_hit = *flag;
                    if (!*flag) {
                        /* check network again */
                        mpi_errno = MPIDI_Anysource_improbe_fn(tag, comm, context_offset, flag, message, status);
                        if (mpi_errno) MPIR_ERR_POP(mpi_errno);
                    }
                }
            }

            /* A receive-queue match must get the same treatment as in the
               default path below: without it the request is not marked as
               an mprobe message and 'status' is never filled in.  Matches
               reported by the netmod hook are left untouched. */
            if (recvq_hit && *message) {
                (*message)->kind = MPID_REQUEST_MPROBE;
                MPIR_Request_extract_status((*message), status);
            }
            goto fn_exit;
        }
        else {
            /* it's not anysource, check if the netmod has overridden it */
            MPIDI_VC_t * vc;
            MPIDI_Comm_get_vc_set_active(comm, source, &vc);
            if (vc->comm_ops && vc->comm_ops->improbe) {
                mpi_errno = vc->comm_ops->improbe(vc, source, tag, comm, context_offset, flag, message, status);
                if (mpi_errno) MPIR_ERR_POP(mpi_errno);
                goto fn_exit;
            }
            /* fall-through to shm case */
        }
    }
#endif

    MPID_THREAD_CS_ENTER(POBJ, MPIR_THREAD_POBJ_MSGQ_MUTEX);
    *message = MPIDI_CH3U_Recvq_FDU_matchonly(source, tag, context_id, comm, flag);
    MPID_THREAD_CS_EXIT(POBJ, MPIR_THREAD_POBJ_MSGQ_MUTEX);

    if (!*flag) {
        /* Always try to advance progress before returning failure
           from the improbe test. */
        /* FIXME: It would be helpful to know if the Progress_poke
           operation causes any change in state; we could then avoid
           a second test of the receive queue if we knew that nothing
           had changed */
        mpi_errno = MPID_Progress_poke();
        if (mpi_errno) MPIR_ERR_POP(mpi_errno);
        MPID_THREAD_CS_ENTER(POBJ, MPIR_THREAD_POBJ_MSGQ_MUTEX);
        *message = MPIDI_CH3U_Recvq_FDU_matchonly(source, tag, context_id, comm, flag);
        MPID_THREAD_CS_EXIT(POBJ, MPIR_THREAD_POBJ_MSGQ_MUTEX);
    }

    /* Mark the matched request as an mprobe message and report its status */
    if (*flag && *message) {
        (*message)->kind = MPID_REQUEST_MPROBE;
        MPIR_Request_extract_status((*message), status);
    }

fn_exit:
    return mpi_errno;
fn_fail:
    goto fn_exit;
}