Blame src/mpi/init/async.c

Packit 0848f5
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
Packit 0848f5
/*
Packit 0848f5
 *  (C) 2001 by Argonne National Laboratory.
Packit 0848f5
 *      See COPYRIGHT in top-level directory.
Packit 0848f5
 */
Packit 0848f5
Packit 0848f5
#include "mpiimpl.h"
Packit 0848f5
#include "mpi_init.h"
Packit 0848f5
#include "mpiu_thread.h"
Packit 0848f5
Packit 0848f5
#ifndef MPICH_MPI_FROM_PMPI
Packit 0848f5
Packit 0848f5
#if MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE
Packit 0848f5
static MPID_Comm *progress_comm_ptr;
Packit 0848f5
static MPID_Thread_id_t progress_thread_id;
Packit 0848f5
static MPID_Thread_mutex_t progress_mutex;
Packit 0848f5
static MPID_Thread_cond_t progress_cond;
Packit 0848f5
static volatile int progress_thread_done = 0;
Packit 0848f5
Packit 0848f5
/* We can use whatever tag we want; we use a different communicator
Packit 0848f5
 * for communicating with the progress thread. */
Packit 0848f5
#define WAKE_TAG 100
Packit 0848f5
Packit 0848f5
#undef FUNCNAME
Packit 0848f5
#define FUNCNAME progress_fn
Packit 0848f5
#undef FCNAME
Packit 0848f5
#define FCNAME MPL_QUOTE(FUNCNAME)
Packit 0848f5
static void progress_fn(void * data)
Packit 0848f5
{
Packit 0848f5
    int mpi_errno = MPI_SUCCESS;
Packit 0848f5
    MPID_Request *request_ptr = NULL;
Packit 0848f5
    MPI_Request request;
Packit 0848f5
    MPI_Status status;
Packit 0848f5
Packit 0848f5
    /* Explicitly add CS_ENTER/EXIT since this thread is created from
Packit 0848f5
     * within an internal function and will call NMPI functions
Packit 0848f5
     * directly. */
Packit 0848f5
    MPID_THREAD_CS_ENTER(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX);
Packit 0848f5
Packit 0848f5
    /* FIXME: We assume that waiting on some request forces progress
Packit 0848f5
     * on all requests. With fine-grained threads, will this still
Packit 0848f5
     * work as expected? We can imagine an approach where a request on
Packit 0848f5
     * a non-conflicting communicator would not touch the remaining
Packit 0848f5
     * requests to avoid locking issues. Once the fine-grained threads
Packit 0848f5
     * code is fully functional, we need to revisit this and, if
Packit 0848f5
     * appropriate, either change what we do in this thread, or delete
Packit 0848f5
     * this comment. */
Packit 0848f5
Packit 0848f5
    mpi_errno = MPID_Irecv(NULL, 0, MPI_CHAR, 0, WAKE_TAG, progress_comm_ptr,
Packit 0848f5
                           MPID_CONTEXT_INTRA_PT2PT, &request_ptr);
Packit 0848f5
    MPIU_Assert(!mpi_errno);
Packit 0848f5
    request = request_ptr->handle;
Packit 0848f5
    mpi_errno = MPIR_Wait_impl(&request, &status);
Packit 0848f5
    MPIU_Assert(!mpi_errno);
Packit 0848f5
Packit 0848f5
    /* Send a signal to the main thread saying we are done */
Packit 0848f5
    MPID_Thread_mutex_lock(&progress_mutex, &mpi_errno);
Packit 0848f5
    MPIU_Assert(!mpi_errno);
Packit 0848f5
Packit 0848f5
    progress_thread_done = 1;
Packit 0848f5
Packit 0848f5
    MPID_Thread_mutex_unlock(&progress_mutex, &mpi_errno);
Packit 0848f5
    MPIU_Assert(!mpi_errno);
Packit 0848f5
Packit 0848f5
    MPID_Thread_cond_signal(&progress_cond, &mpi_errno);
Packit 0848f5
    MPIU_Assert(!mpi_errno);
Packit 0848f5
Packit 0848f5
    MPID_THREAD_CS_EXIT(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX);
Packit 0848f5
Packit 0848f5
    return;
Packit 0848f5
}
Packit 0848f5
Packit 0848f5
#endif /* MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE */
Packit 0848f5
Packit 0848f5
#undef FUNCNAME
Packit 0848f5
#define FUNCNAME MPIR_Init_async_thread
Packit 0848f5
#undef FCNAME
Packit 0848f5
#define FCNAME MPL_QUOTE(FUNCNAME)
Packit 0848f5
int MPIR_Init_async_thread(void)
Packit 0848f5
{
Packit 0848f5
#if MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE
Packit 0848f5
    int mpi_errno = MPI_SUCCESS;
Packit 0848f5
    MPID_Comm *comm_self_ptr;
Packit 0848f5
    int err = 0;
Packit 0848f5
    MPID_MPI_STATE_DECL(MPID_STATE_MPIR_INIT_ASYNC_THREAD);
Packit 0848f5
Packit 0848f5
    MPID_MPI_FUNC_ENTER(MPID_STATE_MPIR_INIT_ASYNC_THREAD);
Packit 0848f5
Packit 0848f5
Packit 0848f5
    /* Dup comm world for the progress thread */
Packit 0848f5
    MPID_Comm_get_ptr(MPI_COMM_SELF, comm_self_ptr);
Packit 0848f5
    mpi_errno = MPIR_Comm_dup_impl(comm_self_ptr, &progress_comm_ptr);
Packit 0848f5
    if (mpi_errno) MPIR_ERR_POP(mpi_errno);
Packit 0848f5
Packit 0848f5
    MPID_Thread_cond_create(&progress_cond, &err;;
Packit 0848f5
    MPIR_ERR_CHKANDJUMP1(err, mpi_errno, MPI_ERR_OTHER, "**cond_create", "**cond_create %s", strerror(err));
Packit 0848f5
    
Packit 0848f5
    MPID_Thread_mutex_create(&progress_mutex, &err;;
Packit 0848f5
    MPIR_ERR_CHKANDJUMP1(err, mpi_errno, MPI_ERR_OTHER, "**mutex_create", "**mutex_create %s", strerror(err));
Packit 0848f5
    
Packit 0848f5
    MPID_Thread_create((MPID_Thread_func_t) progress_fn, NULL, &progress_thread_id, &err;;
Packit 0848f5
    MPIR_ERR_CHKANDJUMP1(err, mpi_errno, MPI_ERR_OTHER, "**mutex_create", "**mutex_create %s", strerror(err));
Packit 0848f5
    
Packit 0848f5
    MPID_MPI_FUNC_EXIT(MPID_STATE_MPIR_INIT_ASYNC_THREAD);
Packit 0848f5
Packit 0848f5
 fn_exit:
Packit 0848f5
    return mpi_errno;
Packit 0848f5
 fn_fail:
Packit 0848f5
    goto fn_exit;
Packit 0848f5
#else
Packit 0848f5
    return MPI_SUCCESS;
Packit 0848f5
#endif /* MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE */
Packit 0848f5
}
Packit 0848f5
Packit 0848f5
#undef FUNCNAME
Packit 0848f5
#define FUNCNAME MPIR_Finalize_async_thread
Packit 0848f5
#undef FCNAME
Packit 0848f5
#define FCNAME MPL_QUOTE(FUNCNAME)
Packit 0848f5
int MPIR_Finalize_async_thread(void)
Packit 0848f5
{
Packit 0848f5
    int mpi_errno = MPI_SUCCESS;
Packit 0848f5
#if MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE
Packit 0848f5
    MPID_Request *request_ptr = NULL;
Packit 0848f5
    MPI_Request request;
Packit 0848f5
    MPI_Status status;
Packit 0848f5
    MPID_MPI_STATE_DECL(MPID_STATE_MPIR_FINALIZE_ASYNC_THREAD);
Packit 0848f5
Packit 0848f5
    MPID_MPI_FUNC_ENTER(MPID_STATE_MPIR_FINALIZE_ASYNC_THREAD);
Packit 0848f5
Packit 0848f5
    mpi_errno = MPID_Isend(NULL, 0, MPI_CHAR, 0, WAKE_TAG, progress_comm_ptr,
Packit 0848f5
                           MPID_CONTEXT_INTRA_PT2PT, &request_ptr);
Packit 0848f5
    MPIU_Assert(!mpi_errno);
Packit 0848f5
    request = request_ptr->handle;
Packit 0848f5
    mpi_errno = MPIR_Wait_impl(&request, &status);
Packit 0848f5
    MPIU_Assert(!mpi_errno);
Packit 0848f5
Packit 0848f5
    /* XXX DJG why is this unlock/lock necessary?  Should we just YIELD here or later?  */
Packit 0848f5
    MPID_THREAD_CS_EXIT(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX);
Packit 0848f5
Packit 0848f5
    MPID_Thread_mutex_lock(&progress_mutex, &mpi_errno);
Packit 0848f5
    MPIU_Assert(!mpi_errno);
Packit 0848f5
Packit 0848f5
    while (!progress_thread_done) {
Packit 0848f5
        MPID_Thread_cond_wait(&progress_cond, &progress_mutex, &mpi_errno);
Packit 0848f5
        MPIU_Assert(!mpi_errno);
Packit 0848f5
    }
Packit 0848f5
Packit 0848f5
    MPID_Thread_mutex_unlock(&progress_mutex, &mpi_errno);
Packit 0848f5
    MPIU_Assert(!mpi_errno);
Packit 0848f5
Packit 0848f5
    mpi_errno = MPIR_Comm_free_impl(progress_comm_ptr);
Packit 0848f5
    MPIU_Assert(!mpi_errno);
Packit 0848f5
Packit 0848f5
    MPID_THREAD_CS_ENTER(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX);
Packit 0848f5
Packit 0848f5
    MPID_Thread_cond_destroy(&progress_cond, &mpi_errno);
Packit 0848f5
    MPIU_Assert(!mpi_errno);
Packit 0848f5
Packit 0848f5
    MPID_Thread_mutex_destroy(&progress_mutex, &mpi_errno);
Packit 0848f5
    MPIU_Assert(!mpi_errno);
Packit 0848f5
Packit 0848f5
    MPID_MPI_FUNC_EXIT(MPID_STATE_MPIR_FINALIZE_ASYNC_THREAD);
Packit 0848f5
Packit 0848f5
#endif /* MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE */
Packit 0848f5
    return mpi_errno;
Packit 0848f5
}
Packit 0848f5
Packit 0848f5
#endif