Blob Blame History Raw
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
 *
 *  (C) 2006 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

/*
   Define macro to override gcc strict flags,
   -D_POSIX_C_SOURCE=199506L, -std=c89 and -std=c99,
   that disallow pthread_barrier_t and friends.
*/
#if defined(_POSIX_C_SOURCE) && _POSIX_C_SOURCE < 200112L
#undef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200112L
#endif

#include "mpi.h"
#include <stdio.h>
#include <stdlib.h>
#include "mpitest.h"
#include "mpithreadtest.h"

/* This file provides a portability layer for using threads.  Currently,
   it supports POSIX threads (pthreads) and Windows threads.  Testing has
   been performed for pthreads.
 */

/* We remember all of the threads we create; this similifies terminating
   (joining) them. */
#ifndef MTEST_MAX_THREADS
#define MTEST_MAX_THREADS 16
#endif

static MTEST_THREAD_HANDLE threads[MTEST_MAX_THREADS];
/* access w/o a lock is broken, but "volatile" should help reduce the amount of
 * speculative loading/storing */
static volatile int nthreads = 0;

#ifdef HAVE_WINDOWS_H
int MTest_Start_thread(MTEST_THREAD_RETURN_TYPE(*fn) (void *p), void *arg)
{
    int errs = 0;
    if (nthreads >= MTEST_MAX_THREADS) {
        fprintf(stderr, "Too many threads already created: max is %d\n", MTEST_MAX_THREADS);
        return 1;
    }
    threads[nthreads] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE) fn, (LPVOID) arg, 0, NULL);
    if (threads[nthreads] == NULL) {
        return GetLastError();
    } else {
        nthreads++;
    }
    return MTestReturnValue(errs);
}

int MTest_Join_threads(void)
{
    int i, err = 0;
    for (i = 0; i < nthreads; i++) {
        if (threads[i] != INVALID_HANDLE_VALUE) {
            if (WaitForSingleObject(threads[i], INFINITE) == WAIT_FAILED) {
                err = GetLastError();
                fprintf(stderr, "Error WaitForSingleObject(), err = %d\n", err);
            } else {
                CloseHandle(threads[i]);
            }
        }
    }
    nthreads = 0;
    return err;
}

int MTest_thread_lock_create(MTEST_THREAD_LOCK_TYPE * lock)
{
    int errs = 0;
    if (lock == NULL)
        return -1;

    /* Create an unnamed uninheritable mutex */
    *lock = CreateMutex(NULL, FALSE, NULL);
    if (*lock == NULL)
        return -1;

    return MTestReturnValue(errs);
}

int MTest_thread_lock(MTEST_THREAD_LOCK_TYPE * lock)
{
    int errs = 0;
    if (lock == NULL)
        return -1;

    /* Wait infinitely for the mutex */
    if (WaitForSingleObject(*lock, INFINITE) != WAIT_OBJECT_0) {
        return -1;
    }
    return MTestReturnValue(errs);
}

int MTest_thread_unlock(MTEST_THREAD_LOCK_TYPE * lock)
{
    int errs = 0;
    if (lock == NULL)
        return -1;
    if (ReleaseMutex(*lock) == 0) {
        return -1;
    }
    return MTestReturnValue(errs);
}

int MTest_thread_lock_free(MTEST_THREAD_LOCK_TYPE * lock)
{
    int errs = 0;
    if (lock != NULL) {
        if (CloseHandle(*lock) == 0) {
            return -1;
        }
    }
    return MTestReturnValue(errs);
}

#else
int MTest_Start_thread(MTEST_THREAD_RETURN_TYPE(*fn) (void *p), void *arg)
{
    int err;
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    if (nthreads >= MTEST_MAX_THREADS) {
        fprintf(stderr, "Too many threads already created: max is %d\n", MTEST_MAX_THREADS);
        return 1;
    }
    err = pthread_create(threads + nthreads, &attr, fn, arg);
    if (!err) {
        nthreads++;
    }
    pthread_attr_destroy(&attr);
    return err;
}

int MTest_Join_threads(void)
{
    int i, rc, err = 0;
    for (i = 0; i < nthreads; i++) {
        rc = pthread_join(threads[i], 0);
        if (rc)
            err = rc;
    }
    nthreads = 0;
    return err;
}

int MTest_thread_lock_create(MTEST_THREAD_LOCK_TYPE * lock)
{
    int err;
    err = pthread_mutex_init(lock, NULL);
    if (err) {
        perror("Failed to initialize lock:");
    }
    return err;
}

int MTest_thread_lock(MTEST_THREAD_LOCK_TYPE * lock)
{
    int err;
    err = pthread_mutex_lock(lock);
    if (err) {
        perror("Failed to acquire lock:");
    }
    return err;
}

int MTest_thread_unlock(MTEST_THREAD_LOCK_TYPE * lock)
{
    int err;
    err = pthread_mutex_unlock(lock);
    if (err) {
        perror("Failed to release lock:");
    }
    return err;
}

int MTest_thread_lock_free(MTEST_THREAD_LOCK_TYPE * lock)
{
    int err;
    err = pthread_mutex_destroy(lock);
    if (err) {
        perror("Failed to free lock:");
    }
    return err;
}
#endif

#if defined(HAVE_PTHREAD_H) && defined(HAVE_PTHREAD_BARRIER_INIT)
static MTEST_THREAD_LOCK_TYPE barrierLock;
static pthread_barrier_t barrier;
static int bcount = -1;
int MTest_thread_barrier_init(void)
{
    bcount = -1;        /* must reset to force barrier re-creation */
    return MTest_thread_lock_create(&barrierLock);
}

int MTest_thread_barrier_free(void)
{
    MTest_thread_lock_free(&barrierLock);
    return pthread_barrier_destroy(&barrier);
}

/* FIXME this barrier interface should be changed to more closely match the
 * pthread interface.  Specifically, nt should not be a barrier-time
 * parameter but an init-time parameter.  The double-checked locking below
 * isn't valid according to pthreads, and it isn't guaranteed to be robust
 * in the presence of aggressive CPU/compiler optimization. */
int MTest_thread_barrier(int nt)
{
    int err;
    if (nt < 0)
        nt = nthreads;
    if (bcount != nt) {
        /* One thread needs to initialize the barrier */
        MTest_thread_lock(&barrierLock);
        /* Test again in case another thread already fixed the problem */
        if (bcount != nt) {
            if (bcount > 0) {
                err = pthread_barrier_destroy(&barrier);
                if (err)
                    return err;
            }
            err = pthread_barrier_init(&barrier, NULL, nt);
            if (err)
                return err;
            bcount = nt;
        }
        err = MTest_thread_unlock(&barrierLock);
        if (err)
            return err;
    }
    return pthread_barrier_wait(&barrier);
}
#else
static MTEST_THREAD_LOCK_TYPE barrierLock;
static volatile int phase = 0;
static volatile int c[2] = { -1, -1 };

int MTest_thread_barrier_init(void)
{
    return MTest_thread_lock_create(&barrierLock);
}

int MTest_thread_barrier_free(void)
{
    return MTest_thread_lock_free(&barrierLock);
}

/* This is a generic barrier implementation.  To ensure that tests don't
   silently fail, this both prints an error message and returns an error
   result on any failure. */
int MTest_thread_barrier(int nt)
{
    volatile int *cntP;
    int err = 0;

    if (nt < 0)
        nt = nthreads;
    /* Force a write barrier by using lock/unlock */
    err = MTest_thread_lock(&barrierLock);
    if (err) {
        fprintf(stderr, "Lock failed in barrier!\n");
        return err;
    }
    cntP = &c[phase];
    err = MTest_thread_unlock(&barrierLock);
    if (err) {
        fprintf(stderr, "Unlock failed in barrier!\n");
        return err;
    }

    /* printf("[%d] cnt = %d, phase = %d\n", pthread_self(), *cntP, phase); */
    err = MTest_thread_lock(&barrierLock);
    if (err) {
        fprintf(stderr, "Lock failed in barrier!\n");
        return err;
    }
    /* The first thread to enter will reset the counter */
    if (*cntP < 0)
        *cntP = nt;
    /* printf("phase = %d, cnt = %d\n", phase, *cntP); */
    /* The last thread to enter will force the counter to be negative */
    if (*cntP == 1) {
        /* printf("[%d] changing phase from %d\n", pthread_self(), phase); */
        phase = !phase;
        c[phase] = -1;
        *cntP = 0;
    }
    /* Really need a write barrier here */
    *cntP = *cntP - 1;
    err = MTest_thread_unlock(&barrierLock);
    if (err) {
        fprintf(stderr, "Unlock failed in barrier!\n");
        return err;
    }
    while (*cntP > 0);

    return err;
}
#endif /* Default barrier routine */