/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */ /* * See COPYRIGHT in top-level directory. */ #include #include #include #include #include "mpi.h" #define DEFAULT_NUM_THREADS 2 #define NUM_LOOP 10 #define BUF_SIZE 16384 #define HANDLE_ERROR(ret,msg) \ if (ret != ABT_SUCCESS) { \ fprintf(stderr, "ERROR[%d]: %s\n", ret, msg); \ exit(EXIT_FAILURE); \ } int rank; int size; int buf_size; int *send_buf; int *recv_buf; int num_loop; int sum; int verbose = 0; int profiling = 1; double t1, t2; #define FIELD_WIDTH 16 void thread_send_func(void *arg) { ABT_thread ult_self__; ABT_thread_self(&ult_self__); size_t my_id = (size_t) arg; if (verbose) printf("[rank %d, TH%lu]: send\n", rank, my_id); int i, j; for (i = 0; i < num_loop; i++) { for (j = 1; j < size; j++) { int dest = (rank + j) % size; send_buf[0] = rank; if (verbose) printf("[rank %d, TH%lu]: MPI_Send to %d\n", rank, my_id, dest); MPI_Send(send_buf, buf_size, MPI_INT, dest, 0, MPI_COMM_WORLD); } } } void thread_recv_func(void *arg) { ABT_thread ult_self__; ABT_thread_self(&ult_self__); size_t my_id = (size_t) arg; if (verbose) printf("[rank %d, TH%lu]: receive\n", rank, my_id); int i, j; for (i = 0; i < num_loop; i++) { for (j = 1; j < size; j++) { int source = (rank + size - j) % size; if (verbose) printf("[rank %d, TH%lu]: MPI_Recv from %d\n", rank, my_id, source); MPI_Recv(recv_buf, buf_size, MPI_INT, source, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); sum += recv_buf[0]; } } } int main(int argc, char *argv[]) { int i, provided; int ret; int num_threads = DEFAULT_NUM_THREADS; if (argc > 1) num_threads = atoi(argv[1]); if (num_threads % 2 != 0) { if (rank == 0) printf("number of user level threads should be even\n"); exit(0); } assert(num_threads >= 0); num_loop = NUM_LOOP; if (argc > 2) num_loop = atoi(argv[2]); assert(num_loop >= 0); buf_size = BUF_SIZE; if (argc > 3) buf_size = atoi(argv[3]); assert(buf_size >= 0); /* Initialize */ ret = ABT_init(argc, argv); if (ret != ABT_SUCCESS) { printf("Failed to initialize Argobots\n"); return EXIT_FAILURE; } ret = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided); if (provided != MPI_THREAD_MULTIPLE) { printf("Cannot initialize with MPI_THREAD_MULTIPLE\n"); return EXIT_FAILURE; } send_buf = (int *) malloc(buf_size * sizeof(int)); recv_buf = (int *) malloc(buf_size * sizeof(int)); MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPI_Comm_size(MPI_COMM_WORLD, &size); if (profiling) t1 = MPI_Wtime(); ABT_xstream xstream; ABT_pool pools; ABT_thread *threads = (ABT_thread *) malloc(num_threads * sizeof(ABT_thread)); ABT_xstream_self(&xstream); ABT_xstream_get_main_pools(xstream, 1, &pools); for (i = 0; i < num_threads; i++) { size_t tid = i + 1; if (i % 2 == 0) { ret = ABT_thread_create(pools, thread_send_func, (void *) tid, ABT_THREAD_ATTR_NULL, &threads[i]); } else { ret = ABT_thread_create(pools, thread_recv_func, (void *) tid, ABT_THREAD_ATTR_NULL, &threads[i]); } HANDLE_ERROR(ret, "ABT_thread_create"); } /* Join and free the ULT children */ for (i = 0; i < num_threads; i++) ABT_thread_free(&threads[i]); if (profiling) { t2 = MPI_Wtime(); if (rank == 0) { fprintf(stdout, "%*s%*f\n", FIELD_WIDTH, "Time", FIELD_WIDTH, t2 - t1); } } /* Finalize */ free(threads); free(send_buf); free(recv_buf); MPI_Finalize(); ABT_finalize(); return EXIT_SUCCESS; }