|
Packit |
0848f5 |
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
|
|
Packit |
0848f5 |
/*
|
|
Packit |
0848f5 |
*
|
|
Packit |
0848f5 |
* (C) 2013 by Argonne National Laboratory.
|
|
Packit |
0848f5 |
* See COPYRIGHT in top-level directory.
|
|
Packit |
0848f5 |
*/
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
/*
|
|
Packit |
0848f5 |
This program performs a short test of MPI_BSEND in a
|
|
Packit |
0848f5 |
multithreaded environment.
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
It starts a single receiver thread that expects
|
|
Packit |
0848f5 |
NUMSENDS messages and NUMSENDS sender threads, that
|
|
Packit |
0848f5 |
use MPI_Bsend to send a message of size MSGSIZE
|
|
Packit |
0848f5 |
to its right neighbour, or to rank 0 if (my_rank==comm_size-1), i.e.
|
|
Packit |
0848f5 |
target_rank = (my_rank+1)%size .
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
After all messages have been received, the
|
|
Packit |
0848f5 |
receiver thread prints a message, the threads
|
|
Packit |
0848f5 |
are joined into the main thread and the application
|
|
Packit |
0848f5 |
terminates.
|
|
Packit |
0848f5 |
*/
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
#include <stdio.h>
|
|
Packit |
0848f5 |
#include <stdlib.h>
|
|
Packit |
0848f5 |
#include <mpi.h>
|
|
Packit |
0848f5 |
#include <string.h>
|
|
Packit |
0848f5 |
#include "mpitest.h"
|
|
Packit |
0848f5 |
#include "mpithreadtest.h"
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
#define NUMSENDS 32
|
|
Packit |
0848f5 |
#define BUFSIZE 10000000
|
|
Packit |
0848f5 |
#define MSGSIZE 1024
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
int rank, size;
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
void *receiver(void *ptr)
|
|
Packit |
0848f5 |
{
|
|
Packit |
0848f5 |
int k;
|
|
Packit |
0848f5 |
char buf[MSGSIZE];
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
for (k = 0; k < NUMSENDS; k++)
|
|
Packit |
0848f5 |
MPI_Recv(buf, MSGSIZE, MPI_CHAR, MPI_ANY_SOURCE, MPI_ANY_TAG,
|
|
Packit |
0848f5 |
MPI_COMM_WORLD, MPI_STATUS_IGNORE);
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
return NULL;
|
|
Packit |
0848f5 |
}
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
void *sender_bsend(void *ptr)
|
|
Packit |
0848f5 |
{
|
|
Packit |
0848f5 |
char buffer[MSGSIZE];
|
|
Packit |
0848f5 |
MTEST_VG_MEM_INIT(buffer, MSGSIZE * sizeof(char));
|
|
Packit |
0848f5 |
MPI_Bsend(buffer, MSGSIZE, MPI_CHAR, (rank + 1) % size, 0, MPI_COMM_WORLD);
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
return NULL;
|
|
Packit |
0848f5 |
}
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
void *sender_ibsend(void *ptr)
|
|
Packit |
0848f5 |
{
|
|
Packit |
0848f5 |
char buffer[MSGSIZE];
|
|
Packit |
0848f5 |
MPI_Request req;
|
|
Packit |
0848f5 |
MTEST_VG_MEM_INIT(buffer, MSGSIZE * sizeof(char));
|
|
Packit |
0848f5 |
MPI_Ibsend(buffer, MSGSIZE, MPI_CHAR, (rank + 1) % size, 0, MPI_COMM_WORLD, &req;;
|
|
Packit |
0848f5 |
MPI_Wait(&req, MPI_STATUS_IGNORE);
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
return NULL;
|
|
Packit |
0848f5 |
}
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
void *sender_isend(void *ptr)
|
|
Packit |
0848f5 |
{
|
|
Packit |
0848f5 |
char buffer[MSGSIZE];
|
|
Packit |
0848f5 |
MPI_Request req;
|
|
Packit |
0848f5 |
MTEST_VG_MEM_INIT(buffer, MSGSIZE * sizeof(char));
|
|
Packit |
0848f5 |
MPI_Isend(buffer, MSGSIZE, MPI_CHAR, (rank + 1) % size, 0, MPI_COMM_WORLD, &req;;
|
|
Packit |
0848f5 |
MPI_Wait(&req, MPI_STATUS_IGNORE);
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
return NULL;
|
|
Packit |
0848f5 |
}
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
void *sender_send(void *ptr)
|
|
Packit |
0848f5 |
{
|
|
Packit |
0848f5 |
char buffer[MSGSIZE];
|
|
Packit |
0848f5 |
MTEST_VG_MEM_INIT(buffer, MSGSIZE * sizeof(char));
|
|
Packit |
0848f5 |
MPI_Send(buffer, MSGSIZE, MPI_CHAR, (rank + 1) % size, 0, MPI_COMM_WORLD);
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
return NULL;
|
|
Packit |
0848f5 |
}
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
int main(int argc, char *argv[])
|
|
Packit |
0848f5 |
{
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
int provided, i[2], k;
|
|
Packit |
0848f5 |
char *buffer, *ptr_dt;
|
|
Packit |
0848f5 |
buffer = (char *) malloc(BUFSIZE * sizeof(char));
|
|
Packit |
0848f5 |
MTEST_VG_MEM_INIT(buffer, BUFSIZE * sizeof(char));
|
|
Packit |
0848f5 |
MPI_Status status;
|
|
Packit |
0848f5 |
pthread_t receiver_thread, sender_thread[NUMSENDS];
|
|
Packit |
0848f5 |
pthread_attr_t attr;
|
|
Packit |
0848f5 |
MPI_Comm communicator;
|
|
Packit |
0848f5 |
int bs;
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
if (provided != MPI_THREAD_MULTIPLE) {
|
|
Packit |
0848f5 |
printf("Error\n");
|
|
Packit |
0848f5 |
MPI_Abort(911, MPI_COMM_WORLD);
|
|
Packit |
0848f5 |
}
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
MPI_Buffer_attach(buffer, BUFSIZE);
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
MPI_Comm_rank(MPI_COMM_WORLD, &rank;;
|
|
Packit |
0848f5 |
MPI_Comm_size(MPI_COMM_WORLD, &size);
|
|
Packit |
0848f5 |
MPI_Comm_dup(MPI_COMM_WORLD, &communicator); /* We do not use this communicator in this program, but
|
|
Packit |
0848f5 |
* with this call, the problem appears more reliably.
|
|
Packit |
0848f5 |
* If the MPI_Comm_dup() call is commented out, it is still
|
|
Packit |
0848f5 |
* evident but does not appear that often (don't know why) */
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
/* Initialize and set thread detached attribute */
|
|
Packit |
0848f5 |
pthread_attr_init(&attr);
|
|
Packit |
0848f5 |
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
pthread_create(&receiver_thread, &attr, &receiver, NULL);
|
|
Packit |
0848f5 |
for (k = 0; k < NUMSENDS; k++)
|
|
Packit |
0848f5 |
pthread_create(&sender_thread[k], &attr, &sender_bsend, NULL);
|
|
Packit |
0848f5 |
pthread_join(receiver_thread, NULL);
|
|
Packit |
0848f5 |
for (k = 0; k < NUMSENDS; k++)
|
|
Packit |
0848f5 |
pthread_join(sender_thread[k], NULL);
|
|
Packit |
0848f5 |
MPI_Barrier(MPI_COMM_WORLD);
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
pthread_create(&receiver_thread, &attr, &receiver, NULL);
|
|
Packit |
0848f5 |
for (k = 0; k < NUMSENDS; k++)
|
|
Packit |
0848f5 |
pthread_create(&sender_thread[k], &attr, &sender_ibsend, NULL);
|
|
Packit |
0848f5 |
pthread_join(receiver_thread, NULL);
|
|
Packit |
0848f5 |
for (k = 0; k < NUMSENDS; k++)
|
|
Packit |
0848f5 |
pthread_join(sender_thread[k], NULL);
|
|
Packit |
0848f5 |
MPI_Barrier(MPI_COMM_WORLD);
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
pthread_create(&receiver_thread, &attr, &receiver, NULL);
|
|
Packit |
0848f5 |
for (k = 0; k < NUMSENDS; k++)
|
|
Packit |
0848f5 |
pthread_create(&sender_thread[k], &attr, &sender_isend, NULL);
|
|
Packit |
0848f5 |
pthread_join(receiver_thread, NULL);
|
|
Packit |
0848f5 |
for (k = 0; k < NUMSENDS; k++)
|
|
Packit |
0848f5 |
pthread_join(sender_thread[k], NULL);
|
|
Packit |
0848f5 |
MPI_Barrier(MPI_COMM_WORLD);
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
pthread_create(&receiver_thread, &attr, &receiver, NULL);
|
|
Packit |
0848f5 |
for (k = 0; k < NUMSENDS; k++)
|
|
Packit |
0848f5 |
pthread_create(&sender_thread[k], &attr, &sender_send, NULL);
|
|
Packit |
0848f5 |
pthread_join(receiver_thread, NULL);
|
|
Packit |
0848f5 |
for (k = 0; k < NUMSENDS; k++)
|
|
Packit |
0848f5 |
pthread_join(sender_thread[k], NULL);
|
|
Packit |
0848f5 |
MPI_Barrier(MPI_COMM_WORLD);
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
pthread_attr_destroy(&attr);
|
|
Packit |
0848f5 |
if (!rank)
|
|
Packit |
0848f5 |
printf(" No Errors\n");
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
MPI_Comm_free(&communicator);
|
|
Packit |
0848f5 |
MPI_Buffer_detach(&ptr_dt, &bs);
|
|
Packit |
0848f5 |
free(buffer);
|
|
Packit |
0848f5 |
MPI_Finalize();
|
|
Packit |
0848f5 |
return 0;
|
|
Packit |
0848f5 |
}
|