/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
* (C) 2001 by Argonne National Laboratory.
* See COPYRIGHT in top-level directory.
*/
#include "mpiimpl.h"
#include "mpi_init.h"
#ifndef MPICH_MPI_FROM_PMPI
#if MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE
static MPIR_Comm *progress_comm_ptr;
static MPID_Thread_id_t progress_thread_id;
static MPID_Thread_mutex_t progress_mutex;
static MPID_Thread_cond_t progress_cond;
static volatile int progress_thread_done = 0;
/* We can use whatever tag we want; we use a different communicator
* for communicating with the progress thread. */
#define WAKE_TAG 100
#undef FUNCNAME
#define FUNCNAME progress_fn
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
static void progress_fn(void *data)
{
int mpi_errno = MPI_SUCCESS;
MPIR_Request *request_ptr = NULL;
MPI_Request request;
MPI_Status status;
/* Explicitly add CS_ENTER/EXIT since this thread is created from
* within an internal function and will call NMPI functions
* directly. */
MPID_THREAD_CS_ENTER(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX);
/* FIXME: We assume that waiting on some request forces progress
* on all requests. With fine-grained threads, will this still
* work as expected? We can imagine an approach where a request on
* a non-conflicting communicator would not touch the remaining
* requests to avoid locking issues. Once the fine-grained threads
* code is fully functional, we need to revisit this and, if
* appropriate, either change what we do in this thread, or delete
* this comment. */
mpi_errno = MPID_Irecv(NULL, 0, MPI_CHAR, 0, WAKE_TAG, progress_comm_ptr,
MPIR_CONTEXT_INTRA_PT2PT, &request_ptr);
MPIR_Assert(!mpi_errno);
request = request_ptr->handle;
mpi_errno = MPIR_Wait(&request, &status);
MPIR_Assert(!mpi_errno);
/* Send a signal to the main thread saying we are done */
MPID_Thread_mutex_lock(&progress_mutex, &mpi_errno);
MPIR_Assert(!mpi_errno);
progress_thread_done = 1;
MPID_Thread_mutex_unlock(&progress_mutex, &mpi_errno);
MPIR_Assert(!mpi_errno);
MPID_Thread_cond_signal(&progress_cond, &mpi_errno);
MPIR_Assert(!mpi_errno);
MPID_THREAD_CS_EXIT(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX);
return;
}
#endif /* MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE */
#undef FUNCNAME
#define FUNCNAME MPIR_Init_async_thread
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPIR_Init_async_thread(void)
{
#if MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE
int mpi_errno = MPI_SUCCESS;
MPIR_Comm *comm_self_ptr;
int err = 0;
MPIR_FUNC_TERSE_STATE_DECL(MPID_STATE_MPIR_INIT_ASYNC_THREAD);
MPIR_FUNC_TERSE_ENTER(MPID_STATE_MPIR_INIT_ASYNC_THREAD);
/* Dup comm world for the progress thread */
MPIR_Comm_get_ptr(MPI_COMM_SELF, comm_self_ptr);
mpi_errno = MPIR_Comm_dup_impl(comm_self_ptr, &progress_comm_ptr);
if (mpi_errno)
MPIR_ERR_POP(mpi_errno);
MPID_Thread_cond_create(&progress_cond, &err);
MPIR_ERR_CHKANDJUMP1(err, mpi_errno, MPI_ERR_OTHER, "**cond_create", "**cond_create %s",
strerror(err));
MPID_Thread_mutex_create(&progress_mutex, &err);
MPIR_ERR_CHKANDJUMP1(err, mpi_errno, MPI_ERR_OTHER, "**mutex_create", "**mutex_create %s",
strerror(err));
MPID_Thread_create((MPID_Thread_func_t) progress_fn, NULL, &progress_thread_id, &err);
MPIR_ERR_CHKANDJUMP1(err, mpi_errno, MPI_ERR_OTHER, "**mutex_create", "**mutex_create %s",
strerror(err));
MPIR_FUNC_TERSE_EXIT(MPID_STATE_MPIR_INIT_ASYNC_THREAD);
fn_exit:
return mpi_errno;
fn_fail:
goto fn_exit;
#else
return MPI_SUCCESS;
#endif /* MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE */
}
#undef FUNCNAME
#define FUNCNAME MPIR_Finalize_async_thread
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPIR_Finalize_async_thread(void)
{
int mpi_errno = MPI_SUCCESS;
#if MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE
MPIR_Request *request_ptr = NULL;
MPI_Request request;
MPI_Status status;
MPIR_FUNC_TERSE_STATE_DECL(MPID_STATE_MPIR_FINALIZE_ASYNC_THREAD);
MPIR_FUNC_TERSE_ENTER(MPID_STATE_MPIR_FINALIZE_ASYNC_THREAD);
mpi_errno = MPID_Isend(NULL, 0, MPI_CHAR, 0, WAKE_TAG, progress_comm_ptr,
MPIR_CONTEXT_INTRA_PT2PT, &request_ptr);
MPIR_Assert(!mpi_errno);
request = request_ptr->handle;
mpi_errno = MPIR_Wait(&request, &status);
MPIR_Assert(!mpi_errno);
/* XXX DJG why is this unlock/lock necessary? Should we just YIELD here or later? */
MPID_THREAD_CS_EXIT(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX);
MPID_Thread_mutex_lock(&progress_mutex, &mpi_errno);
MPIR_Assert(!mpi_errno);
while (!progress_thread_done) {
MPID_Thread_cond_wait(&progress_cond, &progress_mutex, &mpi_errno);
MPIR_Assert(!mpi_errno);
}
MPID_Thread_mutex_unlock(&progress_mutex, &mpi_errno);
MPIR_Assert(!mpi_errno);
mpi_errno = MPIR_Comm_free_impl(progress_comm_ptr);
MPIR_Assert(!mpi_errno);
MPID_THREAD_CS_ENTER(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX);
MPID_Thread_cond_destroy(&progress_cond, &mpi_errno);
MPIR_Assert(!mpi_errno);
MPID_Thread_mutex_destroy(&progress_mutex, &mpi_errno);
MPIR_Assert(!mpi_errno);
MPIR_FUNC_TERSE_EXIT(MPID_STATE_MPIR_FINALIZE_ASYNC_THREAD);
#endif /* MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE */
return mpi_errno;
}
#endif