|
Packit Service |
c5cf8c |
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
|
|
Packit Service |
c5cf8c |
/*
|
|
Packit Service |
c5cf8c |
* (C) 2001 by Argonne National Laboratory.
|
|
Packit Service |
c5cf8c |
* See COPYRIGHT in top-level directory.
|
|
Packit Service |
c5cf8c |
*/
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
#include "mpiimpl.h"
|
|
Packit Service |
c5cf8c |
#include "mpi_init.h"
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
#ifndef MPICH_MPI_FROM_PMPI
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
#if MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE
|
|
Packit Service |
c5cf8c |
static MPIR_Comm *progress_comm_ptr;
|
|
Packit Service |
c5cf8c |
static MPID_Thread_id_t progress_thread_id;
|
|
Packit Service |
c5cf8c |
static MPID_Thread_mutex_t progress_mutex;
|
|
Packit Service |
c5cf8c |
static MPID_Thread_cond_t progress_cond;
|
|
Packit Service |
c5cf8c |
static volatile int progress_thread_done = 0;
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
/* We can use whatever tag we want; we use a different communicator
|
|
Packit Service |
c5cf8c |
* for communicating with the progress thread. */
|
|
Packit Service |
c5cf8c |
#define WAKE_TAG 100
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
#undef FUNCNAME
|
|
Packit Service |
c5cf8c |
#define FUNCNAME progress_fn
|
|
Packit Service |
c5cf8c |
#undef FCNAME
|
|
Packit Service |
c5cf8c |
#define FCNAME MPL_QUOTE(FUNCNAME)
|
|
Packit Service |
c5cf8c |
static void progress_fn(void *data)
|
|
Packit Service |
c5cf8c |
{
|
|
Packit Service |
c5cf8c |
int mpi_errno = MPI_SUCCESS;
|
|
Packit Service |
c5cf8c |
MPIR_Request *request_ptr = NULL;
|
|
Packit Service |
c5cf8c |
MPI_Request request;
|
|
Packit Service |
c5cf8c |
MPI_Status status;
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
/* Explicitly add CS_ENTER/EXIT since this thread is created from
|
|
Packit Service |
c5cf8c |
* within an internal function and will call NMPI functions
|
|
Packit Service |
c5cf8c |
* directly. */
|
|
Packit Service |
c5cf8c |
MPID_THREAD_CS_ENTER(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
/* FIXME: We assume that waiting on some request forces progress
|
|
Packit Service |
c5cf8c |
* on all requests. With fine-grained threads, will this still
|
|
Packit Service |
c5cf8c |
* work as expected? We can imagine an approach where a request on
|
|
Packit Service |
c5cf8c |
* a non-conflicting communicator would not touch the remaining
|
|
Packit Service |
c5cf8c |
* requests to avoid locking issues. Once the fine-grained threads
|
|
Packit Service |
c5cf8c |
* code is fully functional, we need to revisit this and, if
|
|
Packit Service |
c5cf8c |
* appropriate, either change what we do in this thread, or delete
|
|
Packit Service |
c5cf8c |
* this comment. */
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
mpi_errno = MPID_Irecv(NULL, 0, MPI_CHAR, 0, WAKE_TAG, progress_comm_ptr,
|
|
Packit Service |
c5cf8c |
MPIR_CONTEXT_INTRA_PT2PT, &request_ptr);
|
|
Packit Service |
c5cf8c |
MPIR_Assert(!mpi_errno);
|
|
Packit Service |
c5cf8c |
request = request_ptr->handle;
|
|
Packit Service |
c5cf8c |
mpi_errno = MPIR_Wait(&request, &status);
|
|
Packit Service |
c5cf8c |
MPIR_Assert(!mpi_errno);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
/* Send a signal to the main thread saying we are done */
|
|
Packit Service |
c5cf8c |
MPID_Thread_mutex_lock(&progress_mutex, &mpi_errno);
|
|
Packit Service |
c5cf8c |
MPIR_Assert(!mpi_errno);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
progress_thread_done = 1;
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
MPID_Thread_mutex_unlock(&progress_mutex, &mpi_errno);
|
|
Packit Service |
c5cf8c |
MPIR_Assert(!mpi_errno);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
MPID_Thread_cond_signal(&progress_cond, &mpi_errno);
|
|
Packit Service |
c5cf8c |
MPIR_Assert(!mpi_errno);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
MPID_THREAD_CS_EXIT(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
return;
|
|
Packit Service |
c5cf8c |
}
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
#endif /* MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE */
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
#undef FUNCNAME
|
|
Packit Service |
c5cf8c |
#define FUNCNAME MPIR_Init_async_thread
|
|
Packit Service |
c5cf8c |
#undef FCNAME
|
|
Packit Service |
c5cf8c |
#define FCNAME MPL_QUOTE(FUNCNAME)
|
|
Packit Service |
c5cf8c |
int MPIR_Init_async_thread(void)
|
|
Packit Service |
c5cf8c |
{
|
|
Packit Service |
c5cf8c |
#if MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE
|
|
Packit Service |
c5cf8c |
int mpi_errno = MPI_SUCCESS;
|
|
Packit Service |
c5cf8c |
MPIR_Comm *comm_self_ptr;
|
|
Packit Service |
c5cf8c |
int err = 0;
|
|
Packit Service |
c5cf8c |
MPIR_FUNC_TERSE_STATE_DECL(MPID_STATE_MPIR_INIT_ASYNC_THREAD);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
MPIR_FUNC_TERSE_ENTER(MPID_STATE_MPIR_INIT_ASYNC_THREAD);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
/* Dup comm world for the progress thread */
|
|
Packit Service |
c5cf8c |
MPIR_Comm_get_ptr(MPI_COMM_SELF, comm_self_ptr);
|
|
Packit Service |
c5cf8c |
mpi_errno = MPIR_Comm_dup_impl(comm_self_ptr, &progress_comm_ptr);
|
|
Packit Service |
c5cf8c |
if (mpi_errno)
|
|
Packit Service |
c5cf8c |
MPIR_ERR_POP(mpi_errno);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
MPID_Thread_cond_create(&progress_cond, &err;;
|
|
Packit Service |
c5cf8c |
MPIR_ERR_CHKANDJUMP1(err, mpi_errno, MPI_ERR_OTHER, "**cond_create", "**cond_create %s",
|
|
Packit Service |
c5cf8c |
strerror(err));
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
MPID_Thread_mutex_create(&progress_mutex, &err;;
|
|
Packit Service |
c5cf8c |
MPIR_ERR_CHKANDJUMP1(err, mpi_errno, MPI_ERR_OTHER, "**mutex_create", "**mutex_create %s",
|
|
Packit Service |
c5cf8c |
strerror(err));
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
MPID_Thread_create((MPID_Thread_func_t) progress_fn, NULL, &progress_thread_id, &err;;
|
|
Packit Service |
c5cf8c |
MPIR_ERR_CHKANDJUMP1(err, mpi_errno, MPI_ERR_OTHER, "**mutex_create", "**mutex_create %s",
|
|
Packit Service |
c5cf8c |
strerror(err));
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
MPIR_FUNC_TERSE_EXIT(MPID_STATE_MPIR_INIT_ASYNC_THREAD);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
fn_exit:
|
|
Packit Service |
c5cf8c |
return mpi_errno;
|
|
Packit Service |
c5cf8c |
fn_fail:
|
|
Packit Service |
c5cf8c |
goto fn_exit;
|
|
Packit Service |
c5cf8c |
#else
|
|
Packit Service |
c5cf8c |
return MPI_SUCCESS;
|
|
Packit Service |
c5cf8c |
#endif /* MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE */
|
|
Packit Service |
c5cf8c |
}
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
#undef FUNCNAME
|
|
Packit Service |
c5cf8c |
#define FUNCNAME MPIR_Finalize_async_thread
|
|
Packit Service |
c5cf8c |
#undef FCNAME
|
|
Packit Service |
c5cf8c |
#define FCNAME MPL_QUOTE(FUNCNAME)
|
|
Packit Service |
c5cf8c |
int MPIR_Finalize_async_thread(void)
|
|
Packit Service |
c5cf8c |
{
|
|
Packit Service |
c5cf8c |
int mpi_errno = MPI_SUCCESS;
|
|
Packit Service |
c5cf8c |
#if MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE
|
|
Packit Service |
c5cf8c |
MPIR_Request *request_ptr = NULL;
|
|
Packit Service |
c5cf8c |
MPI_Request request;
|
|
Packit Service |
c5cf8c |
MPI_Status status;
|
|
Packit Service |
c5cf8c |
MPIR_FUNC_TERSE_STATE_DECL(MPID_STATE_MPIR_FINALIZE_ASYNC_THREAD);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
MPIR_FUNC_TERSE_ENTER(MPID_STATE_MPIR_FINALIZE_ASYNC_THREAD);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
mpi_errno = MPID_Isend(NULL, 0, MPI_CHAR, 0, WAKE_TAG, progress_comm_ptr,
|
|
Packit Service |
c5cf8c |
MPIR_CONTEXT_INTRA_PT2PT, &request_ptr);
|
|
Packit Service |
c5cf8c |
MPIR_Assert(!mpi_errno);
|
|
Packit Service |
c5cf8c |
request = request_ptr->handle;
|
|
Packit Service |
c5cf8c |
mpi_errno = MPIR_Wait(&request, &status);
|
|
Packit Service |
c5cf8c |
MPIR_Assert(!mpi_errno);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
/* XXX DJG why is this unlock/lock necessary? Should we just YIELD here or later? */
|
|
Packit Service |
c5cf8c |
MPID_THREAD_CS_EXIT(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
MPID_Thread_mutex_lock(&progress_mutex, &mpi_errno);
|
|
Packit Service |
c5cf8c |
MPIR_Assert(!mpi_errno);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
while (!progress_thread_done) {
|
|
Packit Service |
c5cf8c |
MPID_Thread_cond_wait(&progress_cond, &progress_mutex, &mpi_errno);
|
|
Packit Service |
c5cf8c |
MPIR_Assert(!mpi_errno);
|
|
Packit Service |
c5cf8c |
}
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
MPID_Thread_mutex_unlock(&progress_mutex, &mpi_errno);
|
|
Packit Service |
c5cf8c |
MPIR_Assert(!mpi_errno);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
mpi_errno = MPIR_Comm_free_impl(progress_comm_ptr);
|
|
Packit Service |
c5cf8c |
MPIR_Assert(!mpi_errno);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
MPID_THREAD_CS_ENTER(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
MPID_Thread_cond_destroy(&progress_cond, &mpi_errno);
|
|
Packit Service |
c5cf8c |
MPIR_Assert(!mpi_errno);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
MPID_Thread_mutex_destroy(&progress_mutex, &mpi_errno);
|
|
Packit Service |
c5cf8c |
MPIR_Assert(!mpi_errno);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
MPIR_FUNC_TERSE_EXIT(MPID_STATE_MPIR_FINALIZE_ASYNC_THREAD);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
#endif /* MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE */
|
|
Packit Service |
c5cf8c |
return mpi_errno;
|
|
Packit Service |
c5cf8c |
}
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
#endif
|