/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
* (C) 2006 by Argonne National Laboratory.
* See COPYRIGHT in top-level directory.
*
* Portions of this code were written by Intel Corporation.
* Copyright (C) 2011-2016 Intel Corporation. Intel provides this material
* to Argonne National Laboratory subject to Software Grant and Corporate
* Contributor License Agreement dated February 8, 2012.
*/
#ifndef CH4_SPAWN_H_INCLUDED
#define CH4_SPAWN_H_INCLUDED
#include "ch4_impl.h"
/* System headers come after ch4_impl.h so the MPICH configuration macros are
 * already in effect when they are processed. */
#include <limits.h>
#include <stdlib.h>
/*
=== BEGIN_MPI_T_CVAR_INFO_BLOCK ===
cvars:
- name : MPIR_CVAR_CH4_COMM_CONNECT_TIMEOUT
category : CH4
type : int
default : 180
class : none
verbosity : MPI_T_VERBOSITY_USER_BASIC
scope : MPI_T_SCOPE_GROUP_EQ
description : >-
The default time out period in seconds for a connection attempt to the
server communicator where the named port exists but no pending accept.
User can change the value for a specified connection through its info
argument.
=== END_MPI_T_CVAR_INFO_BLOCK ===
*/
#if defined(USE_PMIX_API) || defined(USE_PMI2_API)
#undef FUNCNAME
#define FUNCNAME MPID_Comm_spawn_multiple
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/*
 * Dynamic process spawning is not implemented for the PMIx / PMI2 builds.
 *
 * The assertion traps when MPIR_Assert is active.  If MPIR_Assert is compiled
 * out, the function must still return a value: falling off the end of a
 * non-void function is undefined behavior once the caller reads the result.
 * So the operation is reported as unsupported.
 */
MPL_STATIC_INLINE_PREFIX int MPID_Comm_spawn_multiple(int count,
                                                      char *commands[],
                                                      char **argvs[],
                                                      const int maxprocs[],
                                                      MPIR_Info * info_ptrs[],
                                                      int root,
                                                      MPIR_Comm * comm_ptr,
                                                      MPIR_Comm ** intercomm, int errcodes[])
{
    MPIR_Assert(0);
    return MPI_ERR_UNSUPPORTED_OPERATION;
}
#else
#undef FUNCNAME
#define FUNCNAME MPIDI_mpi_to_pmi_keyvals
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/*
 * Convert an MPI info object into a newly allocated array of PMI key/value
 * pairs.
 *
 * info_ptr  - info object to convert; NULL or MPI_INFO_NULL yields no pairs
 * kv_ptr    - out: array of *nkeys_ptr pairs, or NULL when there are none.
 *             Each key and value string is heap-allocated; the whole result
 *             is meant to be released with MPIDI_free_pmi_keyvals.
 * nkeys_ptr - out: number of pairs stored in *kv_ptr
 *
 * Returns MPI_SUCCESS or an MPI error code.  On failure everything allocated
 * here is released and the outputs are set to NULL / 0, so the caller never
 * sees a partially initialized array.
 */
static inline int MPIDI_mpi_to_pmi_keyvals(MPIR_Info * info_ptr,
                                           PMI_keyval_t ** kv_ptr, int *nkeys_ptr)
{
    char key[MPI_MAX_INFO_KEY];
    PMI_keyval_t *kv = NULL;
    int i, nkeys = 0, vallen, flag, mpi_errno = MPI_SUCCESS;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_MPI_TO_PMI_KEYVALS);
    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_MPI_TO_PMI_KEYVALS);

    if (!info_ptr || info_ptr->handle == MPI_INFO_NULL)
        goto fn_exit;

    MPIR_Info_get_nkeys_impl(info_ptr, &nkeys);
    if (nkeys == 0)
        goto fn_exit;

    kv = (PMI_keyval_t *) MPL_malloc(nkeys * sizeof(PMI_keyval_t), MPL_MEM_BUFFER);
    MPIR_ERR_CHKANDJUMP(!kv, mpi_errno, MPI_ERR_OTHER, "**nomem");

    /* NULL every slot before filling any, so the failure path can tell which
     * strings were actually allocated. */
    for (i = 0; i < nkeys; i++) {
        kv[i].key = NULL;
        kv[i].val = NULL;
    }

    for (i = 0; i < nkeys; i++) {
        mpi_errno = MPIR_Info_get_nthkey_impl(info_ptr, i, key);
        if (mpi_errno)
            MPIR_ERR_POP(mpi_errno);
        MPIR_Info_get_valuelen_impl(info_ptr, key, &vallen, &flag);

        kv[i].key = (const char *) MPL_strdup(key);
        MPIR_ERR_CHKANDJUMP(!kv[i].key, mpi_errno, MPI_ERR_OTHER, "**nomem");

        /* +1 for the terminating NUL of the copied value */
        kv[i].val = (char *) MPL_malloc(vallen + 1, MPL_MEM_BUFFER);
        MPIR_ERR_CHKANDJUMP(!kv[i].val, mpi_errno, MPI_ERR_OTHER, "**nomem");
        MPIR_Info_get_impl(info_ptr, key, vallen + 1, kv[i].val, &flag);
    }

  fn_exit:
    *kv_ptr = kv;
    *nkeys_ptr = nkeys;
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_MPI_TO_PMI_KEYVALS);
    return mpi_errno;
  fn_fail:
    /* Undo partial work: the caller must not receive uninitialized entries. */
    if (kv != NULL) {
        for (i = 0; i < nkeys; i++) {
            if (kv[i].key != NULL)
                MPL_free((char *) kv[i].key);
            if (kv[i].val != NULL)
                MPL_free(kv[i].val);
        }
        MPL_free(kv);
    }
    kv = NULL;
    nkeys = 0;
    goto fn_exit;
}
/*
 * Release key/value vectors of the kind produced by MPIDI_mpi_to_pmi_keyvals.
 *
 * kv     - array of `size` vectors; NULL vectors are skipped
 * size   - number of vectors in kv
 * counts - counts[i] is the number of entries in kv[i]
 *
 * Frees every entry's key and value, then each vector, and clears the slot in
 * kv.  The outer kv array itself is left for the caller to free.
 */
static inline void MPIDI_free_pmi_keyvals(PMI_keyval_t ** kv, int size, int *counts)
{
    int i, j;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_FREE_PMI_KEYVALS);
    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_FREE_PMI_KEYVALS);
    for (i = 0; i < size; i++) {
        /* Test the vector before indexing into it; the original checked for
         * NULL only after already dereferencing kv[i][j]. */
        if (kv[i] == NULL)
            continue;
        for (j = 0; j < counts[i]; j++) {
            if (kv[i][j].key != NULL)
                MPL_free((char *) kv[i][j].key);
            if (kv[i][j].val != NULL)
                MPL_free(kv[i][j].val);
        }
        MPL_free(kv[i]);
        kv[i] = NULL;
    }
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_FREE_PMI_KEYVALS);
}
#undef FUNCNAME
#define FUNCNAME MPID_Comm_spawn_multiple
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/*
 * Spawn the processes described by commands/argvs/maxprocs/info_ptrs with
 * PMI_Spawn_multiple and connect them to comm_ptr through a temporary port.
 *
 * Root-only work:
 * - Open the port.
 * - Convert the info objects to PMI key/value vectors.
 * - Call PMI, publishing the port name to the children under
 *   MPIDI_PARENT_PORT_KVSKEY.
 *
 * When the caller asked for per-process error codes, the root broadcasts the
 * accept decision, the PMI return code, the process count and the codes.
 * Unless every spawn failed, all ranks then accept on the port to build
 * *intercomm.  The root closes the port afterwards, and also on error paths.
 *
 * Returns MPI_SUCCESS or an MPI error code.
 */
MPL_STATIC_INLINE_PREFIX int MPID_Comm_spawn_multiple(int count,
                                                      char *commands[],
                                                      char **argvs[],
                                                      const int maxprocs[],
                                                      MPIR_Info * info_ptrs[],
                                                      int root,
                                                      MPIR_Comm * comm_ptr,
                                                      MPIR_Comm ** intercomm, int errcodes[])
{
    char port_name[MPI_MAX_PORT_NAME];
    int *info_keyval_sizes = 0, i, mpi_errno = MPI_SUCCESS;
    PMI_keyval_t **info_keyval_vectors = 0, preput_keyval_vector;
    int *pmi_errcodes = 0, pmi_errno = 0;
    int total_num_processes = 0, should_accept = 1;
    int port_opened = 0;        /* nonzero while this rank owns an open port */
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPID_COMM_SPAWN_MULTIPLE);
    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPID_COMM_SPAWN_MULTIPLE);

    memset(port_name, 0, sizeof(port_name));

    if (comm_ptr->rank == root) {
        for (i = 0; i < count; i++)
            total_num_processes += maxprocs[i];

        pmi_errcodes = (int *) MPL_malloc(sizeof(int) * total_num_processes, MPL_MEM_BUFFER);
        MPIR_ERR_CHKANDJUMP(!pmi_errcodes, mpi_errno, MPI_ERR_OTHER, "**nomem");
        for (i = 0; i < total_num_processes; i++)
            pmi_errcodes[i] = 0;

        mpi_errno = MPID_Open_port(NULL, port_name);
        if (mpi_errno)
            MPIR_ERR_POP(mpi_errno);
        port_opened = 1;

        info_keyval_sizes = (int *) MPL_malloc(count * sizeof(int), MPL_MEM_BUFFER);
        MPIR_ERR_CHKANDJUMP(!info_keyval_sizes, mpi_errno, MPI_ERR_OTHER, "**nomem");
        info_keyval_vectors =
            (PMI_keyval_t **) MPL_malloc(count * sizeof(PMI_keyval_t *), MPL_MEM_BUFFER);
        MPIR_ERR_CHKANDJUMP(!info_keyval_vectors, mpi_errno, MPI_ERR_OTHER, "**nomem");

        /* Clear every slot up front.  If a conversion fails part-way, the
         * cleanup in fn_exit must not touch uninitialized entries. */
        for (i = 0; i < count; i++) {
            info_keyval_vectors[i] = 0;
            info_keyval_sizes[i] = 0;
        }
        if (info_ptrs) {
            for (i = 0; i < count; i++) {
                mpi_errno = MPIDI_mpi_to_pmi_keyvals(info_ptrs[i],
                                                     &info_keyval_vectors[i],
                                                     &info_keyval_sizes[i]);
                if (mpi_errno)
                    MPIR_ERR_POP(mpi_errno);
            }
        }

        /* The children find the parent's port through this preput key. */
        preput_keyval_vector.key = MPIDI_PARENT_PORT_KVSKEY;
        preput_keyval_vector.val = port_name;
        pmi_errno = PMI_Spawn_multiple(count, (const char **)
                                       commands,
                                       (const char ***) argvs,
                                       maxprocs, info_keyval_sizes, (const PMI_keyval_t **)
                                       info_keyval_vectors, 1, &preput_keyval_vector,
                                       pmi_errcodes);
        /* NOTE(review): this jump happens on the root only.  The other ranks
         * still enter the broadcasts / accept below and can block there.
         * Consider broadcasting the failure instead. */
        if (pmi_errno != PMI_SUCCESS)
            MPIR_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER,
                                 "**pmi_spawn_multiple", "**pmi_spawn_multiple %d", pmi_errno);

        if (errcodes != MPI_ERRCODES_IGNORE) {
            /* Each process gets its own PMI code (0 == success).  Accept
             * unless every spawn failed:
             * should_accept = !(e_0 && e_1 && ... && e_n). */
            for (i = 0; i < total_num_processes; i++) {
                errcodes[i] = pmi_errcodes[i];
                should_accept = should_accept && errcodes[i];
            }
            should_accept = !should_accept;
        }
    }

    if (errcodes != MPI_ERRCODES_IGNORE) {
        MPIR_Errflag_t errflag = MPIR_ERR_NONE;
        mpi_errno = MPIR_Bcast(&should_accept, 1, MPI_INT, root, comm_ptr, &errflag);
        if (mpi_errno)
            MPIR_ERR_POP(mpi_errno);
        mpi_errno = MPIR_Bcast(&pmi_errno, 1, MPI_INT, root, comm_ptr, &errflag);
        if (mpi_errno)
            MPIR_ERR_POP(mpi_errno);
        mpi_errno = MPIR_Bcast(&total_num_processes, 1, MPI_INT, root, comm_ptr, &errflag);
        if (mpi_errno)
            MPIR_ERR_POP(mpi_errno);
        mpi_errno = MPIR_Bcast(errcodes, total_num_processes, MPI_INT, root, comm_ptr, &errflag);
        if (mpi_errno)
            MPIR_ERR_POP(mpi_errno);
    }

    if (should_accept) {
        mpi_errno = MPID_Comm_accept(port_name, NULL, root, comm_ptr, intercomm);
        if (mpi_errno)
            MPIR_ERR_POP(mpi_errno);
    } else {
        /* Reached only when errcodes were requested and every spawn failed. */
        if ((pmi_errno == PMI_SUCCESS) && (errcodes[0] != 0)) {
            mpi_errno = MPIR_Comm_create(intercomm);
            if (mpi_errno)
                MPIR_ERR_POP(mpi_errno);
        }
    }

    if (port_opened) {
        /* Clear first so fn_fail does not try to close the port again. */
        port_opened = 0;
        mpi_errno = MPID_Close_port(port_name);
        if (mpi_errno)
            MPIR_ERR_POP(mpi_errno);
    }

  fn_exit:
    if (info_keyval_vectors) {
        MPIDI_free_pmi_keyvals(info_keyval_vectors, count, info_keyval_sizes);
        MPL_free(info_keyval_vectors);
    }
    if (info_keyval_sizes)
        MPL_free(info_keyval_sizes);
    if (pmi_errcodes)
        MPL_free(pmi_errcodes);
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPID_COMM_SPAWN_MULTIPLE);
    return mpi_errno;
  fn_fail:
    /* Best effort: release the port opened above.  The error already
     * recorded in mpi_errno is what gets reported. */
    if (port_opened) {
        port_opened = 0;
        (void) MPID_Close_port(port_name);
    }
    goto fn_exit;
}
#endif
#undef FUNCNAME
#define FUNCNAME MPID_Comm_connect
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/*
 * Connect comm to the server listening on port_name, rooted at `root`.  The
 * netmod (MPIDI_NM_mpi_comm_connect) does the actual work.
 *
 * The timeout passed to the netmod defaults to
 * MPIR_CVAR_CH4_COMM_CONNECT_TIMEOUT.  A "timeout" key in `info` overrides
 * it.  The override is ignored, and the default kept, if the value does not
 * start with an integer that fits in an int.
 *
 * Returns MPI_SUCCESS or an MPI error code.
 */
MPL_STATIC_INLINE_PREFIX int MPID_Comm_connect(const char *port_name,
                                               MPIR_Info * info,
                                               int root, MPIR_Comm * comm, MPIR_Comm ** newcomm_ptr)
{
    int mpi_errno = MPI_SUCCESS;
    int timeout = MPIR_CVAR_CH4_COMM_CONNECT_TIMEOUT;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPID_COMM_CONNECT);
    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPID_COMM_CONNECT);

    if (info != NULL) {
        int info_flag = 0;
        char info_value[MPI_MAX_INFO_VAL + 1];
        MPIR_Info_get_impl(info, "timeout", MPI_MAX_INFO_VAL, info_value, &info_flag);
        if (info_flag) {
            /* atoi() cannot report a malformed value (it silently yields 0)
             * and is undefined on overflow.  Use strtol, and accept the
             * result only if digits were consumed and it fits in an int. */
            char *end = NULL;
            long parsed = strtol(info_value, &end, 10);
            if (end != info_value && parsed >= INT_MIN && parsed <= INT_MAX)
                timeout = (int) parsed;
        }
    }

    mpi_errno = MPIDI_NM_mpi_comm_connect(port_name, info, root, timeout, comm, newcomm_ptr);
    if (mpi_errno != MPI_SUCCESS) {
        MPIR_ERR_POP(mpi_errno);
    }

  fn_exit:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPID_COMM_CONNECT);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME MPID_Comm_disconnect
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/*
 * Disconnect comm_ptr.  The work is done by the netmod
 * (MPIDI_NM_mpi_comm_disconnect).  This wrapper only adds entry/exit
 * instrumentation and routes a failure through MPIR_ERR_POP.
 *
 * Returns MPI_SUCCESS or an MPI error code.
 */
MPL_STATIC_INLINE_PREFIX int MPID_Comm_disconnect(MPIR_Comm * comm_ptr)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPID_COMM_DISCONNECT);
    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPID_COMM_DISCONNECT);

    mpi_errno = MPIDI_NM_mpi_comm_disconnect(comm_ptr);
    if (mpi_errno)
        MPIR_ERR_POP(mpi_errno);

  fn_exit:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPID_COMM_DISCONNECT);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME MPID_Open_port
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/*
 * Open a port and write its name into port_name.  This delegates to
 * MPIDI_NM_mpi_open_port; info_ptr is passed through unchanged.
 *
 * Returns MPI_SUCCESS or an MPI error code.
 */
MPL_STATIC_INLINE_PREFIX int MPID_Open_port(MPIR_Info * info_ptr, char *port_name)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPID_OPEN_PORT);
    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPID_OPEN_PORT);

    mpi_errno = MPIDI_NM_mpi_open_port(info_ptr, port_name);
    if (mpi_errno)
        MPIR_ERR_POP(mpi_errno);

  fn_exit:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPID_OPEN_PORT);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME MPID_Close_port
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/*
 * Close the port named port_name.  This delegates to
 * MPIDI_NM_mpi_close_port.
 *
 * Returns MPI_SUCCESS or an MPI error code.
 */
MPL_STATIC_INLINE_PREFIX int MPID_Close_port(const char *port_name)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPID_CLOSE_PORT);
    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPID_CLOSE_PORT);

    mpi_errno = MPIDI_NM_mpi_close_port(port_name);
    if (mpi_errno)
        MPIR_ERR_POP(mpi_errno);

  fn_exit:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPID_CLOSE_PORT);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME MPID_Comm_accept
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/*
 * Accept a connection on port_name for comm, rooted at `root`, storing the
 * resulting communicator in *newcomm_ptr.  This delegates to
 * MPIDI_NM_mpi_comm_accept; unlike MPID_Comm_connect, no timeout is passed.
 *
 * Returns MPI_SUCCESS or an MPI error code.
 */
MPL_STATIC_INLINE_PREFIX int MPID_Comm_accept(const char *port_name,
                                              MPIR_Info * info,
                                              int root, MPIR_Comm * comm, MPIR_Comm ** newcomm_ptr)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPID_COMM_ACCEPT);
    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPID_COMM_ACCEPT);

    mpi_errno = MPIDI_NM_mpi_comm_accept(port_name, info, root, comm, newcomm_ptr);
    if (mpi_errno)
        MPIR_ERR_POP(mpi_errno);

  fn_exit:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPID_COMM_ACCEPT);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
#endif /* CH4_SPAWN_H_INCLUDED */