Blame src/mpi/init/async.c

Packit Service c5cf8c
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
Packit Service c5cf8c
/*
Packit Service c5cf8c
 *  (C) 2001 by Argonne National Laboratory.
Packit Service c5cf8c
 *      See COPYRIGHT in top-level directory.
Packit Service c5cf8c
 */
Packit Service c5cf8c
Packit Service c5cf8c
#include "mpiimpl.h"
Packit Service c5cf8c
#include "mpi_init.h"
Packit Service c5cf8c
Packit Service c5cf8c
#ifndef MPICH_MPI_FROM_PMPI
Packit Service c5cf8c
Packit Service c5cf8c
#if MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE
Packit Service c5cf8c
static MPIR_Comm *progress_comm_ptr;
Packit Service c5cf8c
static MPID_Thread_id_t progress_thread_id;
Packit Service c5cf8c
static MPID_Thread_mutex_t progress_mutex;
Packit Service c5cf8c
static MPID_Thread_cond_t progress_cond;
Packit Service c5cf8c
static volatile int progress_thread_done = 0;
Packit Service c5cf8c
Packit Service c5cf8c
/* We can use whatever tag we want; we use a different communicator
Packit Service c5cf8c
 * for communicating with the progress thread. */
Packit Service c5cf8c
#define WAKE_TAG 100
Packit Service c5cf8c
Packit Service c5cf8c
#undef FUNCNAME
Packit Service c5cf8c
#define FUNCNAME progress_fn
Packit Service c5cf8c
#undef FCNAME
Packit Service c5cf8c
#define FCNAME MPL_QUOTE(FUNCNAME)
Packit Service c5cf8c
static void progress_fn(void *data)
Packit Service c5cf8c
{
Packit Service c5cf8c
    int mpi_errno = MPI_SUCCESS;
Packit Service c5cf8c
    MPIR_Request *request_ptr = NULL;
Packit Service c5cf8c
    MPI_Request request;
Packit Service c5cf8c
    MPI_Status status;
Packit Service c5cf8c
Packit Service c5cf8c
    /* Explicitly add CS_ENTER/EXIT since this thread is created from
Packit Service c5cf8c
     * within an internal function and will call NMPI functions
Packit Service c5cf8c
     * directly. */
Packit Service c5cf8c
    MPID_THREAD_CS_ENTER(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX);
Packit Service c5cf8c
Packit Service c5cf8c
    /* FIXME: We assume that waiting on some request forces progress
Packit Service c5cf8c
     * on all requests. With fine-grained threads, will this still
Packit Service c5cf8c
     * work as expected? We can imagine an approach where a request on
Packit Service c5cf8c
     * a non-conflicting communicator would not touch the remaining
Packit Service c5cf8c
     * requests to avoid locking issues. Once the fine-grained threads
Packit Service c5cf8c
     * code is fully functional, we need to revisit this and, if
Packit Service c5cf8c
     * appropriate, either change what we do in this thread, or delete
Packit Service c5cf8c
     * this comment. */
Packit Service c5cf8c
Packit Service c5cf8c
    mpi_errno = MPID_Irecv(NULL, 0, MPI_CHAR, 0, WAKE_TAG, progress_comm_ptr,
Packit Service c5cf8c
                           MPIR_CONTEXT_INTRA_PT2PT, &request_ptr);
Packit Service c5cf8c
    MPIR_Assert(!mpi_errno);
Packit Service c5cf8c
    request = request_ptr->handle;
Packit Service c5cf8c
    mpi_errno = MPIR_Wait(&request, &status);
Packit Service c5cf8c
    MPIR_Assert(!mpi_errno);
Packit Service c5cf8c
Packit Service c5cf8c
    /* Send a signal to the main thread saying we are done */
Packit Service c5cf8c
    MPID_Thread_mutex_lock(&progress_mutex, &mpi_errno);
Packit Service c5cf8c
    MPIR_Assert(!mpi_errno);
Packit Service c5cf8c
Packit Service c5cf8c
    progress_thread_done = 1;
Packit Service c5cf8c
Packit Service c5cf8c
    MPID_Thread_mutex_unlock(&progress_mutex, &mpi_errno);
Packit Service c5cf8c
    MPIR_Assert(!mpi_errno);
Packit Service c5cf8c
Packit Service c5cf8c
    MPID_Thread_cond_signal(&progress_cond, &mpi_errno);
Packit Service c5cf8c
    MPIR_Assert(!mpi_errno);
Packit Service c5cf8c
Packit Service c5cf8c
    MPID_THREAD_CS_EXIT(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX);
Packit Service c5cf8c
Packit Service c5cf8c
    return;
Packit Service c5cf8c
}
Packit Service c5cf8c
Packit Service c5cf8c
#endif /* MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE */
Packit Service c5cf8c
Packit Service c5cf8c
#undef FUNCNAME
Packit Service c5cf8c
#define FUNCNAME MPIR_Init_async_thread
Packit Service c5cf8c
#undef FCNAME
Packit Service c5cf8c
#define FCNAME MPL_QUOTE(FUNCNAME)
Packit Service c5cf8c
int MPIR_Init_async_thread(void)
Packit Service c5cf8c
{
Packit Service c5cf8c
#if MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE
Packit Service c5cf8c
    int mpi_errno = MPI_SUCCESS;
Packit Service c5cf8c
    MPIR_Comm *comm_self_ptr;
Packit Service c5cf8c
    int err = 0;
Packit Service c5cf8c
    MPIR_FUNC_TERSE_STATE_DECL(MPID_STATE_MPIR_INIT_ASYNC_THREAD);
Packit Service c5cf8c
Packit Service c5cf8c
    MPIR_FUNC_TERSE_ENTER(MPID_STATE_MPIR_INIT_ASYNC_THREAD);
Packit Service c5cf8c
Packit Service c5cf8c
Packit Service c5cf8c
    /* Dup comm world for the progress thread */
Packit Service c5cf8c
    MPIR_Comm_get_ptr(MPI_COMM_SELF, comm_self_ptr);
Packit Service c5cf8c
    mpi_errno = MPIR_Comm_dup_impl(comm_self_ptr, &progress_comm_ptr);
Packit Service c5cf8c
    if (mpi_errno)
Packit Service c5cf8c
        MPIR_ERR_POP(mpi_errno);
Packit Service c5cf8c
Packit Service c5cf8c
    MPID_Thread_cond_create(&progress_cond, &err;;
Packit Service c5cf8c
    MPIR_ERR_CHKANDJUMP1(err, mpi_errno, MPI_ERR_OTHER, "**cond_create", "**cond_create %s",
Packit Service c5cf8c
                         strerror(err));
Packit Service c5cf8c
Packit Service c5cf8c
    MPID_Thread_mutex_create(&progress_mutex, &err;;
Packit Service c5cf8c
    MPIR_ERR_CHKANDJUMP1(err, mpi_errno, MPI_ERR_OTHER, "**mutex_create", "**mutex_create %s",
Packit Service c5cf8c
                         strerror(err));
Packit Service c5cf8c
Packit Service c5cf8c
    MPID_Thread_create((MPID_Thread_func_t) progress_fn, NULL, &progress_thread_id, &err;;
Packit Service c5cf8c
    MPIR_ERR_CHKANDJUMP1(err, mpi_errno, MPI_ERR_OTHER, "**mutex_create", "**mutex_create %s",
Packit Service c5cf8c
                         strerror(err));
Packit Service c5cf8c
Packit Service c5cf8c
    MPIR_FUNC_TERSE_EXIT(MPID_STATE_MPIR_INIT_ASYNC_THREAD);
Packit Service c5cf8c
Packit Service c5cf8c
  fn_exit:
Packit Service c5cf8c
    return mpi_errno;
Packit Service c5cf8c
  fn_fail:
Packit Service c5cf8c
    goto fn_exit;
Packit Service c5cf8c
#else
Packit Service c5cf8c
    return MPI_SUCCESS;
Packit Service c5cf8c
#endif /* MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE */
Packit Service c5cf8c
}
Packit Service c5cf8c
Packit Service c5cf8c
#undef FUNCNAME
Packit Service c5cf8c
#define FUNCNAME MPIR_Finalize_async_thread
Packit Service c5cf8c
#undef FCNAME
Packit Service c5cf8c
#define FCNAME MPL_QUOTE(FUNCNAME)
Packit Service c5cf8c
int MPIR_Finalize_async_thread(void)
Packit Service c5cf8c
{
Packit Service c5cf8c
    int mpi_errno = MPI_SUCCESS;
Packit Service c5cf8c
#if MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE
Packit Service c5cf8c
    MPIR_Request *request_ptr = NULL;
Packit Service c5cf8c
    MPI_Request request;
Packit Service c5cf8c
    MPI_Status status;
Packit Service c5cf8c
    MPIR_FUNC_TERSE_STATE_DECL(MPID_STATE_MPIR_FINALIZE_ASYNC_THREAD);
Packit Service c5cf8c
Packit Service c5cf8c
    MPIR_FUNC_TERSE_ENTER(MPID_STATE_MPIR_FINALIZE_ASYNC_THREAD);
Packit Service c5cf8c
Packit Service c5cf8c
    mpi_errno = MPID_Isend(NULL, 0, MPI_CHAR, 0, WAKE_TAG, progress_comm_ptr,
Packit Service c5cf8c
                           MPIR_CONTEXT_INTRA_PT2PT, &request_ptr);
Packit Service c5cf8c
    MPIR_Assert(!mpi_errno);
Packit Service c5cf8c
    request = request_ptr->handle;
Packit Service c5cf8c
    mpi_errno = MPIR_Wait(&request, &status);
Packit Service c5cf8c
    MPIR_Assert(!mpi_errno);
Packit Service c5cf8c
Packit Service c5cf8c
    /* XXX DJG why is this unlock/lock necessary?  Should we just YIELD here or later?  */
Packit Service c5cf8c
    MPID_THREAD_CS_EXIT(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX);
Packit Service c5cf8c
Packit Service c5cf8c
    MPID_Thread_mutex_lock(&progress_mutex, &mpi_errno);
Packit Service c5cf8c
    MPIR_Assert(!mpi_errno);
Packit Service c5cf8c
Packit Service c5cf8c
    while (!progress_thread_done) {
Packit Service c5cf8c
        MPID_Thread_cond_wait(&progress_cond, &progress_mutex, &mpi_errno);
Packit Service c5cf8c
        MPIR_Assert(!mpi_errno);
Packit Service c5cf8c
    }
Packit Service c5cf8c
Packit Service c5cf8c
    MPID_Thread_mutex_unlock(&progress_mutex, &mpi_errno);
Packit Service c5cf8c
    MPIR_Assert(!mpi_errno);
Packit Service c5cf8c
Packit Service c5cf8c
    mpi_errno = MPIR_Comm_free_impl(progress_comm_ptr);
Packit Service c5cf8c
    MPIR_Assert(!mpi_errno);
Packit Service c5cf8c
Packit Service c5cf8c
    MPID_THREAD_CS_ENTER(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX);
Packit Service c5cf8c
Packit Service c5cf8c
    MPID_Thread_cond_destroy(&progress_cond, &mpi_errno);
Packit Service c5cf8c
    MPIR_Assert(!mpi_errno);
Packit Service c5cf8c
Packit Service c5cf8c
    MPID_Thread_mutex_destroy(&progress_mutex, &mpi_errno);
Packit Service c5cf8c
    MPIR_Assert(!mpi_errno);
Packit Service c5cf8c
Packit Service c5cf8c
    MPIR_FUNC_TERSE_EXIT(MPID_STATE_MPIR_FINALIZE_ASYNC_THREAD);
Packit Service c5cf8c
Packit Service c5cf8c
#endif /* MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE */
Packit Service c5cf8c
    return mpi_errno;
Packit Service c5cf8c
}
Packit Service c5cf8c
Packit Service c5cf8c
#endif