/* "Blob Blame History Raw" -- repository web-viewer residue, not part of the header */
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 *
 */

#ifndef MPIR_THREAD_H_INCLUDED
#define MPIR_THREAD_H_INCLUDED

#include "mpichconfconst.h"
#include "mpichconf.h"
#include "utlist.h"

/* Process-wide threading state.  Exactly one instance is declared below as
 * MPIR_ThreadInfo.  Which members exist depends on the build configuration,
 * so code touching them must use the same preprocessor guards. */
typedef struct {
    int thread_provided;        /* Provided level of thread support */

    /* This is a special case for is_thread_main, which must be
     * implemented even if MPICH itself is single threaded.
     * NOTE(review): the guard below still omits this member when
     * MPICH_THREAD_LEVEL is lower than MPI_THREAD_SERIALIZED -- confirm that
     * this matches the intent stated above. */
#if MPICH_THREAD_LEVEL >= MPI_THREAD_SERIALIZED
    MPID_Thread_id_t master_thread;     /* Thread that started MPI */
#endif

#if defined MPICH_IS_THREADED
    int isThreaded;             /* Set to true if user requested
                                 * THREAD_MULTIPLE; tested at run time by
                                 * MPIR_THREAD_CHECK_BEGIN */
#endif                          /* MPICH_IS_THREADED */
} MPIR_Thread_info_t;
/* The single global instance; defined in another translation unit. */
extern MPIR_Thread_info_t MPIR_ThreadInfo;

#ifdef MPICH_THREAD_USE_MDTA
/* Per-thread synchronization object used by the MPIR_Thread_sync_* helpers
 * in this header.  One instance is embedded in MPIR_Per_thread_t. */
typedef struct MPIR_Thread_sync {
    /* Doubly-linked list links; the object is inserted into and removed from
     * sync_wait_list with the utlist DL_APPEND / DL_DELETE macros. */
    struct MPIR_Thread_sync *next;
    struct MPIR_Thread_sync *prev;
    int is_server;              /* TRUE while this thread is counted in
                                 * num_server_thread (set in
                                 * MPIR_Thread_sync_wait, cleared in
                                 * MPIR_Thread_sync_alloc/_free) */
    int is_initialized;         /* TRUE once 'cond' has been created by
                                 * MPIR_Thread_sync_alloc */
    OPA_int_t count;            /* Stored by MPIR_Thread_sync_alloc and
                                 * decremented by MPIR_Thread_sync_signal;
                                 * the waiter is signalled when it reads 0 */
    MPID_Thread_cond_t cond;    /* Condition variable the owning thread
                                 * blocks on in MPIR_Thread_sync_wait */
} MPIR_Thread_sync_t;

/* Head of the doubly-linked list of sync objects whose threads are blocked
 * in MPIR_Thread_sync_wait.  A NULL head means the list is empty (this is
 * the condition MPIR_Thread_sync_free tests before signalling). */
typedef struct MPIR_Thread_sync_list {
    MPIR_Thread_sync_t *head;
} MPIR_Thread_sync_list_t;

/* List of sync objects currently blocked in MPIR_Thread_sync_wait. */
extern MPIR_Thread_sync_list_t sync_wait_list;
/* Number of threads currently flagged is_server; incremented in
 * MPIR_Thread_sync_wait and decremented in MPIR_Thread_sync_free. */
extern OPA_int_t num_server_thread;
#endif

/* ------------------------------------------------------------------------- */
/* thread-local storage macros */
/* Size in bytes of the strerrbuf array in MPIR_Per_thread_t.  The value
 * itself is arbitrary; a fixed-size array is used so that no heap-allocated
 * memory has to be cleaned up at thread destruction time. */
#define MPIR_STRERROR_BUF_SIZE (1024)

/* This structure contains all thread-local variables and will be zeroed at
 * allocation time.
 *
 * Note that any pointers to dynamically allocated memory stored in this
 * structure must be externally cleaned up.
 *
 * The set of members depends on MPICH_THREAD_LEVEL and
 * MPICH_THREAD_USE_MDTA, so the layout differs between build configurations.
 * */
typedef struct {
    int op_errno;               /* For errors in predefined MPI_Ops */

    /* error string storage for MPIR_Strerror */
    char strerrbuf[MPIR_STRERROR_BUF_SIZE];

#if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE)
    int lock_depth;             /* not referenced elsewhere in this header */

#ifdef MPICH_THREAD_USE_MDTA
    /* Sync object handed out by MPIR_Thread_sync_alloc.
     * NOTE(review): MPIR_Thread_sync_alloc accesses this member under
     * MPICH_THREAD_USE_MDTA alone; presumably MDTA is only enabled in
     * MPI_THREAD_MULTIPLE builds -- confirm. */
    MPIR_Thread_sync_t sync;
#endif
#endif
} MPIR_Per_thread_t;

/* The per-thread data object.  In a threaded build where MPL_TLS is defined
 * the declaration carries the MPL_TLS qualifier (presumably a compiler
 * thread-local storage specifier -- confirm against MPL); otherwise it is
 * declared as an ordinary global. */
#if defined(MPICH_IS_THREADED) && defined(MPL_TLS)
extern MPL_TLS MPIR_Per_thread_t MPIR_Per_thread;
#else
extern MPIR_Per_thread_t MPIR_Per_thread;
#endif

/* Key passed together with MPIR_Per_thread to MPID_THREADPRIV_KEY_GET_ADDR
 * (see MPIR_Thread_sync_alloc) to obtain the calling thread's
 * MPIR_Per_thread_t. */
extern MPID_Thread_tls_t MPIR_Per_thread_key;

/* Bracket code that should run only when threading is active at run time.
 *
 * In a threaded build, MPIR_THREAD_CHECK_BEGIN opens an
 * `if (MPIR_ThreadInfo.isThreaded) {` block and MPIR_THREAD_CHECK_END closes
 * it, so the two must always be used as a matched pair within one scope.
 * In a non-threaded build both expand to nothing, and the enclosed code is
 * executed unconditionally. */
#if defined(MPICH_IS_THREADED)
#define MPIR_THREAD_CHECK_BEGIN if (MPIR_ThreadInfo.isThreaded) {
#define MPIR_THREAD_CHECK_END   }
#else
#define MPIR_THREAD_CHECK_BEGIN
#define MPIR_THREAD_CHECK_END
#endif /* MPICH_IS_THREADED */

/* Global mutex, declared for the GLOBAL, POBJ and VNI granularities.  Within
 * this header it is the mutex handed to MPID_Thread_cond_wait by
 * MPIR_Thread_sync_wait. */
#if MPICH_THREAD_GRANULARITY == MPICH_THREAD_GRANULARITY__GLOBAL || \
    MPICH_THREAD_GRANULARITY == MPICH_THREAD_GRANULARITY__POBJ   || \
    MPICH_THREAD_GRANULARITY == MPICH_THREAD_GRANULARITY__VNI
extern MPID_Thread_mutex_t MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX;
#endif

#if MPICH_THREAD_GRANULARITY == MPICH_THREAD_GRANULARITY__POBJ
/* Additional mutexes declared only for per-object (POBJ) granularity.  Their
 * definitions and the data each one protects are not visible in this header;
 * judging by the names they presumably cover handle management, message
 * queues, request completion, context ids and PMI calls -- verify against
 * the users of each mutex. */
extern MPID_Thread_mutex_t MPIR_THREAD_POBJ_HANDLE_MUTEX;
extern MPID_Thread_mutex_t MPIR_THREAD_POBJ_MSGQ_MUTEX;
extern MPID_Thread_mutex_t MPIR_THREAD_POBJ_COMPLETION_MUTEX;
extern MPID_Thread_mutex_t MPIR_THREAD_POBJ_CTX_MUTEX;
extern MPID_Thread_mutex_t MPIR_THREAD_POBJ_PMI_MUTEX;

/* Yield the `mutex` member of the object a pointer refers to.
 *
 * _comm_ptr / _win_ptr: any pointer expression whose pointee has a member
 * named `mutex`.  The result is an lvalue, so `&MACRO(ptr)` and assignment
 * both work.
 *
 * The argument and the whole expansion are parenthesized so that compound
 * arguments such as `base + 1` or `cond ? a : b` bind to `->` as a unit
 * instead of expanding to e.g. `base + 1->mutex`. */
#define MPIR_THREAD_POBJ_COMM_MUTEX(_comm_ptr) ((_comm_ptr)->mutex)
#define MPIR_THREAD_POBJ_WIN_MUTEX(_win_ptr)   ((_win_ptr)->mutex)
#endif

#ifdef MPICH_THREAD_USE_MDTA

/* Signal the condition variable of `sync`, either unconditionally or once
 * its counter has dropped to zero.
 *
 * sync:  sync object whose waiter may be woken.
 * force: non-zero -> signal immediately and leave `count` untouched;
 *        zero     -> decrement `count` and signal only if it then reads 0.
 *
 * The status written by MPID_Thread_cond_signal into `rc` is not examined.
 * NOTE(review): the decrement and the following load are two separate
 * atomic operations; presumably callers are serialized by a lock -- confirm. */
MPL_STATIC_INLINE_PREFIX void MPIR_Thread_sync_signal(MPIR_Thread_sync_t * sync, const int force)
{
    int rc;

    if (!force) {
        OPA_decr_int(&(sync->count));
        if (OPA_load_int(&(sync->count)) != 0) {
            /* still outstanding work for this waiter: nothing to do yet */
            return;
        }
    }

    MPID_Thread_cond_signal(&sync->cond, &rc);
}

/* Hand out the calling thread's sync object, reset for a new wait.
 *
 * sync:  out parameter; receives the address of the `sync` member of the
 *        MPIR_Per_thread_t obtained via MPID_THREADPRIV_KEY_GET_ADDR.
 * count: initial value stored into the object's counter.
 *
 * The object is marked as not being a server, and its condition variable is
 * created the first time the object is handed out (tracked through
 * is_initialized).  Nothing is heap-allocated here.
 *
 * NOTE(review): `rc` is never checked, and per_thread is dereferenced
 * without a NULL test after the lookup -- confirm the lookup cannot fail. */
MPL_STATIC_INLINE_PREFIX void MPIR_Thread_sync_alloc(MPIR_Thread_sync_t ** sync, const int count)
{
    int rc;
    MPIR_Per_thread_t *per_thread = NULL;
    MPIR_Thread_sync_t *self;

    MPID_THREADPRIV_KEY_GET_ADDR(MPIR_ThreadInfo.isThreaded, MPIR_Per_thread_key,
                                 MPIR_Per_thread, per_thread, &rc);

    self = &per_thread->sync;
    *sync = self;

    self->is_server = FALSE;
    OPA_store_int(&(self->count), count);

    /* lazily create the condition variable on first use */
    if (unlikely(!self->is_initialized)) {
        MPID_Thread_cond_create(&(self->cond), &rc);
        self->is_initialized = TRUE;
    }
}

/* Release a sync object obtained from MPIR_Thread_sync_alloc.
 *
 * If the object was flagged as a server, the flag is cleared and
 * num_server_thread is decremented.  Afterwards, if no server thread is
 * left and sync_wait_list is non-empty, the thread at the head of the list
 * is force-signalled.
 *
 * No memory is freed: the object is embedded in the per-thread data. */
MPL_STATIC_INLINE_PREFIX void MPIR_Thread_sync_free(MPIR_Thread_sync_t * sync)
{
    if (sync->is_server) {
        OPA_decr_int(&num_server_thread);
        sync->is_server = FALSE;
    }

    if (OPA_load_int(&num_server_thread) != 0) {
        return;
    }

    if (sync_wait_list.head != NULL) {
        /* wake the first waiter regardless of its counter */
        MPIR_Thread_sync_signal(sync_wait_list.head, /* force */ 1);
    }
}

/* Either promote the calling thread to server or block it until signalled.
 *
 *  - A thread that is already a server returns immediately.
 *  - If no server thread exists, the caller becomes one (num_server_thread
 *    is incremented, is_server is set) and returns without blocking.
 *  - Otherwise, if the object's counter is still positive, the object is
 *    appended to sync_wait_list, the thread waits on its condition variable
 *    with MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX, and removes itself from the list
 *    after waking up.
 *
 * The wait is performed once, not in a loop, so the function returns after
 * any wakeup (including the forced signal from MPIR_Thread_sync_free); the
 * counter is not re-checked here.  The status in `rc` is not examined.
 * NOTE(review): presumably the caller already holds
 * MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX -- confirm. */
MPL_STATIC_INLINE_PREFIX void MPIR_Thread_sync_wait(MPIR_Thread_sync_t * sync)
{
    int rc;

    if (sync->is_server) {
        return;
    }

    if (OPA_load_int(&num_server_thread) == 0) {
        /* nobody is serving: take over that role instead of sleeping */
        OPA_incr_int(&num_server_thread);
        sync->is_server = TRUE;
        return;
    }

    if (OPA_load_int(&sync->count) <= 0) {
        return;
    }

    DL_APPEND(sync_wait_list.head, sync);
    MPID_Thread_cond_wait(&sync->cond, &MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX, &rc);
    DL_DELETE(sync_wait_list.head, sync);
}

#endif /* MPICH_THREAD_USE_MDTA */

/* VNI granularity also declares MPIR_THREAD_POBJ_HANDLE_MUTEX.  The same
 * name is declared in the POBJ-only section above; the two guards compare
 * MPICH_THREAD_GRANULARITY against different constants, so (assuming those
 * constants are distinct) only one of the declarations is active in a given
 * build. */
#if MPICH_THREAD_GRANULARITY == MPICH_THREAD_GRANULARITY__VNI
extern MPID_Thread_mutex_t MPIR_THREAD_POBJ_HANDLE_MUTEX;
#endif

#endif /* MPIR_THREAD_H_INCLUDED */