Blame test/mpi/rma/lockall_dt_flushlocal.c

Packit 0848f5
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
Packit 0848f5
/*
Packit 0848f5
 *
Packit 0848f5
 *  (C) 2015 by Argonne National Laboratory.
Packit 0848f5
 *      See COPYRIGHT in top-level directory.
Packit 0848f5
 */
Packit 0848f5
#include "mpi.h"
Packit 0848f5
#include <stdio.h>
Packit 0848f5
#include <stdlib.h>
Packit 0848f5
#include <string.h>
Packit 0848f5
#include "mpitest.h"
Packit 0848f5
Packit 0848f5
/*
Packit 0848f5
static char MTEST_Descrip[] = "Test for streaming ACC-like operations with lock_all+flush_local";
Packit 0848f5
*/
Packit 0848f5
Packit 0848f5
int main(int argc, char *argv[])
Packit 0848f5
{
Packit 0848f5
    int errs = 0;
Packit 0848f5
    int rank, size;
Packit 0848f5
    int minsize = 2, count;
Packit 0848f5
    MPI_Comm comm;
Packit 0848f5
    MPI_Win win;
Packit 0848f5
    MPI_Aint lb, extent;
Packit 0848f5
    MTestDatatype sendtype, recvtype;
Packit 0848f5
Packit 0848f5
    MTest_Init(&argc, &argv);
Packit 0848f5
Packit 0848f5
    while (MTestGetIntracommGeneral(&comm, minsize, 1)) {
Packit 0848f5
        if (comm == MPI_COMM_NULL)
Packit 0848f5
            continue;
Packit 0848f5
Packit 0848f5
        MPI_Comm_rank(comm, &rank;;
Packit 0848f5
        MPI_Comm_size(comm, &size);
Packit 0848f5
        int source = 0;
Packit 0848f5
Packit 0848f5
        MTEST_DATATYPE_FOR_EACH_COUNT(count) {
Packit 0848f5
            while (MTestGetDatatypes(&sendtype, &recvtype, count)) {
Packit 0848f5
                recvtype.printErrors = 1;
Packit 0848f5
                recvtype.InitBuf(&recvtype);
Packit 0848f5
                MPI_Type_get_extent(recvtype.datatype, &lb, &extent);
Packit 0848f5
Packit 0848f5
                MPI_Win_create(recvtype.buf, lb + recvtype.count * extent,
Packit 0848f5
                               (int) extent, MPI_INFO_NULL, comm, &win);
Packit 0848f5
                if (rank == source) {
Packit 0848f5
                    int dest;
Packit 0848f5
                    MPI_Aint slb, sextent;
Packit 0848f5
                    MPI_Type_get_extent(sendtype.datatype, &slb, &sextent);
Packit 0848f5
                    sendtype.InitBuf(&sendtype);
Packit 0848f5
Packit 0848f5
                    MPI_Win_lock_all(0, win);
Packit 0848f5
Packit 0848f5
                    for (dest = 0; dest < size; dest++)
Packit 0848f5
                        if (dest != source) {
Packit 0848f5
                            MPI_Accumulate(sendtype.buf, sendtype.count,
Packit 0848f5
                                           sendtype.datatype, dest, 0,
Packit 0848f5
                                           recvtype.count, recvtype.datatype, MPI_REPLACE, win);
Packit 0848f5
                            MPI_Win_flush_local(dest, win);
Packit 0848f5
                        }
Packit 0848f5
                    /* reset the send buffer to test local completion */
Packit 0848f5
                    memset(sendtype.buf, 0, slb + sextent * sendtype.count);
Packit 0848f5
                    MPI_Win_unlock_all(win);
Packit 0848f5
                    MPI_Barrier(comm);
Packit 0848f5
Packit 0848f5
                    sendtype.InitBuf(&sendtype);
Packit 0848f5
                    char *resbuf = (char *) calloc(lb + extent * recvtype.count, sizeof(char));
Packit 0848f5
Packit 0848f5
                    /*wait for the destinations to finish checking and reinitializing the buffers */
Packit 0848f5
                    MPI_Barrier(comm);
Packit 0848f5
Packit 0848f5
                    MPI_Win_lock_all(0, win);
Packit 0848f5
                    for (dest = 0; dest < size; dest++)
Packit 0848f5
                        if (dest != source) {
Packit 0848f5
                            MPI_Get_accumulate(sendtype.buf, sendtype.count,
Packit 0848f5
                                               sendtype.datatype, resbuf, recvtype.count,
Packit 0848f5
                                               recvtype.datatype, dest, 0, recvtype.count,
Packit 0848f5
                                               recvtype.datatype, MPI_REPLACE, win);
Packit 0848f5
                            MPI_Win_flush_local(dest, win);
Packit 0848f5
                        }
Packit 0848f5
                    /* reset the send buffer to test local completion */
Packit 0848f5
                    memset(sendtype.buf, 0, slb + sextent * sendtype.count);
Packit 0848f5
                    MPI_Win_unlock_all(win);
Packit 0848f5
                    MPI_Barrier(comm);
Packit 0848f5
                    free(resbuf);
Packit 0848f5
                }
Packit 0848f5
                else {
Packit 0848f5
                    int err;
Packit 0848f5
                    MPI_Barrier(comm);
Packit 0848f5
                    MPI_Win_lock(MPI_LOCK_SHARED, rank, 0, win);
Packit 0848f5
                    err = MTestCheckRecv(0, &recvtype);
Packit 0848f5
                    if (err)
Packit 0848f5
                        errs++;
Packit 0848f5
                    recvtype.InitBuf(&recvtype);
Packit 0848f5
                    MPI_Win_unlock(rank, win);
Packit 0848f5
Packit 0848f5
                    /*signal the source that checking and reinitialization is done */
Packit 0848f5
                    MPI_Barrier(comm);
Packit 0848f5
Packit 0848f5
                    MPI_Barrier(comm);
Packit 0848f5
                    MPI_Win_lock(MPI_LOCK_SHARED, rank, 0, win);
Packit 0848f5
                    err = MTestCheckRecv(0, &recvtype);
Packit 0848f5
                    if (err)
Packit 0848f5
                        errs++;
Packit 0848f5
                    MPI_Win_unlock(rank, win);
Packit 0848f5
                }
Packit 0848f5
Packit 0848f5
                MPI_Win_free(&win);
Packit 0848f5
                MTestFreeDatatype(&sendtype);
Packit 0848f5
                MTestFreeDatatype(&recvtype);
Packit 0848f5
            }
Packit 0848f5
        }
Packit 0848f5
        MTestFreeComm(&comm);
Packit 0848f5
    }
Packit 0848f5
    MTest_Finalize(errs);
Packit 0848f5
    MPI_Finalize();
Packit 0848f5
    return 0;
Packit 0848f5
}