Blob Blame History Raw
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
 *
 *  (C) 2006 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

/* Test of MPI_Comm_join.  This should work even when each process
 * is started as a singleton process.
 *
 * To run
 *
 * Start the server:
 *
 * ./singjoin -s -p 54321
 *
 * In a separate shell, start the client:
 *
 * ./singjoin -c -p 54321
 *
 * where "54321" is some available port number.
 */

/* Include mpitestconf to determine if we can use h_addr or need h_addr_list */
#include "mpitestconf.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifndef HAVE_WINDOWS_H
#include <strings.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#else
#include <windows.h>
#endif
#include <mpi.h>

/* Portable names for a socket handle and its "invalid" value.  Without
 * HAVE_WINDOWS_H a socket is an int and -1 marks an invalid one; with it, the
 * SOCKET and INVALID_SOCKET names are used instead. */
#ifndef HAVE_WINDOWS_H
#define SOCKET_FD_TYPE int
#define INVALID_SOCKET_FD -1
#else
#define SOCKET_FD_TYPE  SOCKET
#define INVALID_SOCKET_FD INVALID_SOCKET
/* bcopy(src, dst, n) is mapped onto memmove(dst, src, n); note that the
 * source and destination arguments swap places. */
#define bcopy(s1, s2, n)    memmove(s2, s1, n)
#endif


/* Parse the command line into is_server, is_client and opt_port.
 * Returns 0 on success and -1 on a usage error. */
int parse_args(int argc, char **argv);
/* Print the command-line synopsis to stderr; name is the program name. */
void usage(char *name);
/* Listen on portnum and return the socket of the accepted connection.
 * main() compares the result against INVALID_SOCKET_FD to detect failure. */
SOCKET_FD_TYPE server_routine(int portnum);
/* Connect to portnum and return the connected socket.
 * main() compares the result against INVALID_SOCKET_FD to detect failure. */
SOCKET_FD_TYPE client_routine(int portnum);


/* Command-line state, filled in by parse_args() and read by main(). */
int is_server = 0;              /* set to 1 when -s is given */
int is_client = 0;              /* set to 1 when -c is given */
int opt_port = 0;               /* value of -p; parse_args() rejects 0 */

/* Open a listening TCP socket on portnum (bound to INADDR_ANY), wait for one
 * client to connect, and return the connected socket.
 *
 * portnum - TCP port to listen on, in host byte order.
 *
 * Returns the socket of the accepted connection, or INVALID_SOCKET_FD if any
 * step fails (a diagnostic is printed with perror).  The listening socket is
 * only needed to accept that single connection, so it is closed before
 * returning on every path. */
SOCKET_FD_TYPE server_routine(int portnum)
{
    SOCKET_FD_TYPE listenfd;
    SOCKET_FD_TYPE peer_fd = INVALID_SOCKET_FD;
    struct sockaddr_in server_addr, peer_addr;
#ifdef HAVE_WINDOWS_H
    int peer_addr_size;         /* Winsock accept() takes an int * */
#else
    socklen_t peer_addr_size;   /* POSIX accept() takes a socklen_t * */
#endif

    listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd == INVALID_SOCKET_FD) {
        perror("server socket");
        return INVALID_SOCKET_FD;
    }

    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY;
    server_addr.sin_port = htons((unsigned short) portnum);
    if (bind(listenfd, (struct sockaddr *) &server_addr, sizeof(server_addr)) < 0) {
        perror("server binding");
        goto cleanup;
    }

    if (listen(listenfd, 1024) < 0) {
        perror("server listen");
        goto cleanup;
    }

    peer_addr_size = sizeof(peer_addr);
    peer_fd = accept(listenfd, (struct sockaddr *) &peer_addr, &peer_addr_size);
    /* Compare against the sentinel rather than "< 0": the socket type is
     * unsigned on Windows, so a sign test would never detect failure there. */
    if (peer_fd == INVALID_SOCKET_FD) {
        perror("server accept");
    }

cleanup:
    /* peer_fd is still INVALID_SOCKET_FD here unless accept() succeeded. */
#ifdef HAVE_WINDOWS_H
    closesocket(listenfd);
#else
    close(listenfd);
#endif
    return peer_fd;
}

/* Open a TCP socket and connect it to portnum on the host whose name is
 * reported by MPI_Get_processor_name, i.e. the host this process runs on.
 *
 * portnum - TCP port of the server, in host byte order.
 *
 * Returns the connected socket, or INVALID_SOCKET_FD if any step fails (a
 * diagnostic is printed to stderr).  The socket is closed on every failure
 * path so that no descriptor is leaked. */
SOCKET_FD_TYPE client_routine(int portnum)
{
    SOCKET_FD_TYPE sockfd;
    struct sockaddr_in server_addr;
    struct hostent *server_ent;
    const char *host_addr;
    char hostname[MPI_MAX_PROCESSOR_NAME];
    int hostnamelen;

    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd == INVALID_SOCKET_FD) {
        perror("client socket");
        return INVALID_SOCKET_FD;
    }

    /* Try to get the processor name from MPI */
    MPI_Get_processor_name(hostname, &hostnamelen);
    server_ent = gethostbyname(hostname);
    if (server_ent == NULL) {
        fprintf(stderr, "gethostbyname fails\n");
        goto fail;
    }

    /* POSIX might define h_addr_list only and not define h_addr */
#ifdef HAVE_H_ADDR_LIST
    host_addr = server_ent->h_addr_list[0];
#else
    host_addr = server_ent->h_addr;
#endif

    /* The address is copied into a struct in_addr, so refuse anything that
     * is not an IPv4 address of exactly that size instead of overrunning it. */
    if (host_addr == NULL || server_ent->h_addrtype != AF_INET ||
        server_ent->h_length != (int) sizeof(server_addr.sin_addr)) {
        fprintf(stderr, "gethostbyname did not return an IPv4 address\n");
        goto fail;
    }

    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons((unsigned short) portnum);
    memcpy(&server_addr.sin_addr, host_addr, sizeof(server_addr.sin_addr));

    if (connect(sockfd, (struct sockaddr *) &server_addr, sizeof(server_addr)) < 0) {
        perror("client connect");
        goto fail;
    }
    return sockfd;

fail:
#ifdef HAVE_WINDOWS_H
    closesocket(sockfd);
#else
    close(sockfd);
#endif
    return INVALID_SOCKET_FD;
}

/* Print the command-line synopsis for this test to stderr.
 *
 * name - program name to show in the synopsis. */
void usage(char *name)
{
    /* The trailing notes contain no conversions, so they are emitted as-is. */
    static const char notes[] =
        "      specify one and only one of -s or -c\n"
        "      port is mandatory\n";

    fprintf(stderr, "usage: %s [-s|-c] -p <PORT>\n", name);
    fputs(notes, stderr);
}

#ifdef HAVE_WINDOWS_H
/* Return the option letter of argv[arg_cnt].
 *
 * argc    - number of entries in argv.
 * argv    - argument vector.
 * arg_cnt - index of the argument to inspect.
 *
 * Returns the character following the leading '-' of argv[arg_cnt], or -1
 * when argv is missing, arg_cnt is out of range, or the argument is not of
 * the form "-x...".  Requiring the leading '-' keeps a stray word such as
 * "xs" from being taken for the option 's'. */
static int get_option(int argc, char **argv, int arg_cnt)
{
    const char *arg;

    if (argv == NULL || *argv == NULL) {
        return -1;
    }
    if (arg_cnt < 0 || arg_cnt >= argc) {
        return -1;
    }

    arg = argv[arg_cnt];
    /* An option is a '-' followed by at least one more character. */
    if (arg == NULL || arg[0] != '-' || arg[1] == '\0') {
        return -1;
    }
    return arg[1];
}
#endif

/* Convert a decimal port argument to an int.
 * Returns the port (1..65535), or 0 if text is NULL, is not entirely a
 * decimal number, or is outside the valid TCP port range. */
static int parse_port(const char *text)
{
    char *end;
    long value;

    if (text == NULL) {
        return 0;
    }
    value = strtol(text, &end, 10);
    /* strtol saturates at LONG_MIN/LONG_MAX on overflow, which the range
     * test below rejects as well. */
    if (end == text || *end != '\0' || value < 1 || value > 65535) {
        return 0;
    }
    return (int) value;
}

/* Parse the command line into the globals is_server, is_client and opt_port.
 *
 * Recognized options: -s (act as server), -c (act as client) and
 * -p <PORT> (TCP port, 1..65535).  Exactly one of -s and -c must be given and
 * the port is mandatory, as described by usage().
 *
 * Returns 0 on success.  On any error the usage text is printed and -1 is
 * returned. */
int parse_args(int argc, char **argv)
{
    int c;
#ifndef HAVE_WINDOWS_H
    extern char *optarg;

    while ((c = getopt(argc, argv, "csp:")) != -1) {
        switch (c) {
        case 's':
            is_server = 1;
            break;
        case 'c':
            is_client = 1;
            break;
        case 'p':
            opt_port = parse_port(optarg);
            break;
        case '?':
        case ':':
        default:
            usage(argv[0]);
            return -1;
        }
    }
#else
    int arg_cnt = 1;

    while ((c = get_option(argc, argv, arg_cnt++)) > 0) {
        switch (c) {
        case 's':
            is_server = 1;
            break;
        case 'c':
            is_client = 1;
            break;
        case 'p':
            /* -p needs a value; without this check a trailing "-p" would
             * read argv[argc], which is NULL. */
            if (arg_cnt >= argc) {
                usage(argv[0]);
                return -1;
            }
            opt_port = parse_port(argv[arg_cnt++]);
            break;
        default:
            usage(argv[0]);
            return -1;
        }
    }
#endif

    /* Exactly one role must be selected: reject both "neither" and "both". */
    if ((is_client != 0) == (is_server != 0)) {
        usage(argv[0]);
        return -1;
    }
    /* parse_port() yields 0 for a bad value, so this also covers a missing,
     * malformed or out-of-range port. */
    if (opt_port == 0) {
        usage(argv[0]);
        return -1;
    }
    return 0;
}


/* Test driver for MPI_Comm_join.
 *
 * Each process must be alone in its MPI_COMM_WORLD.  Depending on the command
 * line it acts as socket server or client, joins the peer over the connected
 * socket with MPI_Comm_join, exchanges one zero-length message over the
 * resulting intercommunicator, merges it into an intracommunicator and
 * prints its rank and size there.
 *
 * Returns 0 on success; every failure detected here ends in MPI_Abort. */
int main(int argc, char **argv)
{
    int rank, size;
    SOCKET_FD_TYPE fd = INVALID_SOCKET_FD;
    MPI_Comm intercomm, intracomm;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    if (size != 1) {
        fprintf(stderr, "This test requires that only one process be in each comm_world\n");
        MPI_Abort(MPI_COMM_WORLD, 1);
    }
    if (parse_args(argc, argv) == -1) {
        fprintf(stderr, "Unable to parse the command line arguments\n");
        MPI_Abort(MPI_COMM_WORLD, 1);
    }

    if (is_server) {
        fd = server_routine(opt_port);
    }
    else if (is_client) {
        fd = client_routine(opt_port);
    }

    if (fd == INVALID_SOCKET_FD) {
        /* MPI is already initialized, so do not simply return: terminate the
         * same way as the other error paths above. */
        fprintf(stderr, "Unable to establish the socket connection\n");
        MPI_Abort(MPI_COMM_WORLD, 1);
    }

#ifdef SINGLETON_KICK
/* #warning isn't standard C, so we comment out this directive */
/* #warning  using singleton workaround */
    {
        int *usize, aflag;
        MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_UNIVERSE_SIZE, &usize, &aflag);
    }
#endif

    MPI_Comm_join(fd, &intercomm);

    /* Rank 0 here names the single process in the remote group. */
    if (is_server) {
        MPI_Send(MPI_BOTTOM, 0, MPI_INT, 0, 0, intercomm);
    }
    else {
        MPI_Recv(MPI_BOTTOM, 0, MPI_INT, 0, 0, intercomm, MPI_STATUS_IGNORE);
        printf("Completed receive on intercomm\n");
        fflush(stdout);
    }

    MPI_Intercomm_merge(intercomm, 0, &intracomm);
    MPI_Comm_rank(intracomm, &rank);
    MPI_Comm_size(intracomm, &size);

    printf("[%d/%d] after Intercomm_merge\n", rank, size);

    MPI_Comm_free(&intracomm);
    MPI_Comm_disconnect(&intercomm);

    /* The socket was created by this program, so release it here. */
#ifdef HAVE_WINDOWS_H
    closesocket(fd);
#else
    close(fd);
#endif

    MPI_Finalize();
    return 0;
}