Blob Blame History Raw
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpidimpl.h"
#ifdef USE_PMI2_API
#include "pmi2.h"
#else
#include "pmi.h"
#endif

#define MAX_JOBID_LEN 1024

/* FIXME: These routines need a description.  What is their purpose?  Who
   calls them and why?  What does each one do?
*/
static MPIDI_PG_t * MPIDI_PG_list = NULL;
static MPIDI_PG_t * MPIDI_PG_iterator_next = NULL;
static MPIDI_PG_Compare_ids_fn_t MPIDI_PG_Compare_ids_fn;
static MPIDI_PG_Destroy_fn_t MPIDI_PG_Destroy_fn;

/* Set verbose to 1 to record changes to the process group structure. */
static int verbose = 0;

/* Key track of the process group corresponding to the MPI_COMM_WORLD 
   of this process */
static MPIDI_PG_t *pg_world = NULL;

#define MPIDI_MAX_KVS_KEY_LEN      256

#undef FUNCNAME
#define FUNCNAME MPIDI_PG_Init
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPIDI_PG_Init(int *argc_p, char ***argv_p, 
		  MPIDI_PG_Compare_ids_fn_t compare_ids_fn, 
		  MPIDI_PG_Destroy_fn_t destroy_fn)
{
    int mpi_errno = MPI_SUCCESS;
    char *p;
    
    MPIDI_PG_Compare_ids_fn = compare_ids_fn;
    MPIDI_PG_Destroy_fn     = destroy_fn;

    /* Check for debugging options.  We use MPICHD_DBG and -mpichd-dbg 
       to avoid confusion with the code in src/util/dbg/dbg_printf.c */
    p = getenv( "MPICHD_DBG_PG" );
    if (p && ( strcmp( p, "YES" ) == 0 || strcmp( p, "yes" ) == 0) )
	verbose = 1;
    if (argc_p && argv_p) {
	int argc = *argc_p, i;
	char **argv = *argv_p;
        /* applied patch from Juha Jeronen, req #3920 */
	for (i=1; i<argc && argv[i]; i++) {
	    if (strcmp( "-mpichd-dbg-pg", argv[i] ) == 0) {
		int j;
		verbose = 1;
		for (j=i; j<argc-1; j++) {
		    argv[j] = argv[j+1];
		}
		argv[argc-1] = NULL;
		*argc_p = argc - 1;
		break;
	    }
	}
    }

    return mpi_errno;
}

#undef FUNCNAME
#define FUNCNAME MPIDI_PG_Finalize
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/*@ 
   MPIDI_PG_Finalize - Finalize the process groups, including freeing all
   process group structures
  @*/
int MPIDI_PG_Finalize(void)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_PG_t *pg, *pgNext;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_PG_FINALIZE);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_PG_FINALIZE);

    /* Print the state of the process groups */
    if (verbose) {
	MPIU_PG_Printall( stdout );
    }

    /* FIXME - straighten out the use of PMI_Finalize - no use after 
       PG_Finalize */
    if (pg_world->connData) {
#ifdef USE_PMI2_API
        mpi_errno = PMI2_Finalize();
        if (mpi_errno) MPIR_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**ch3|pmi_finalize");
#else
	int rc;
	rc = PMI_Finalize();
	if (rc) {
	    MPIR_ERR_SET1(mpi_errno,MPI_ERR_OTHER, 
			  "**ch3|pmi_finalize", 
			  "**ch3|pmi_finalize %d", rc);
	}
#endif
    }

    /* Free the storage associated with the process groups */
    pg = MPIDI_PG_list;
    while (pg) {
	pgNext = pg->next;
	
	/* In finalize, we free all process group information, even if
	   the ref count is not zero.  This can happen if the user
	   fails to use MPI_Comm_disconnect on communicators that
	   were created with the dynamic process routines.*/
        /* XXX DJG FIXME-MT should we be checking this? */
	if (MPIU_Object_get_ref(pg) == 0 || 1) {
	    if (pg == MPIDI_Process.my_pg)
		MPIDI_Process.my_pg = NULL;

	    MPIU_Object_set_ref(pg, 0); /* satisfy assertions in PG_Destroy */
	    MPIDI_PG_Destroy( pg );
	}
	pg     = pgNext;
    }

    /* If COMM_WORLD is still around (it normally should be), 
       try to free it here.  The reason that we need to free it at this 
       point is that comm_world (and comm_self) still exist, and 
       hence the usual process to free the related VC structures will
       not be invoked. */
    if (MPIDI_Process.my_pg) {
	MPIDI_PG_Destroy(MPIDI_Process.my_pg);
    } 
    MPIDI_Process.my_pg = NULL;

    /* ifdefing out this check because the list will not be NULL in 
       Ch3_finalize because
       one additional reference is retained in MPIDI_Process.my_pg. 
       That reference is released
       only after ch3_finalize returns. If I release it before ch3_finalize, 
       the ssm channel crashes. */
#if 0
    if (MPIDI_PG_list != NULL)
    { 
	
	/* --BEGIN ERROR HANDLING-- */
	mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_INTERN,
        "**dev|pg_finalize|list_not_empty", NULL); 
	/* --END ERROR HANDLING-- */
    }
#endif

    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_PG_FINALIZE);
    return mpi_errno;
}

/* FIXME: This routine needs to make it clear that the pg_id, for example
   is saved; thus, if the pg_id is a string, then that string is not 
   copied and must be freed by a PG_Destroy routine */

/* This routine creates a new process group description and appends it to 
   the list of the known process groups.  The pg_id is saved, not copied.
   The PG_Destroy routine that was set with MPIDI_PG_Init is responsible for
   freeing any storage associated with the pg_id. 

   The new process group is returned in pg_ptr 
*/
#undef FUNCNAME
#define FUNCNAME MPIDI_PG_Create
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPIDI_PG_Create(int vct_sz, void * pg_id, MPIDI_PG_t ** pg_ptr)
{
    MPIDI_PG_t * pg = NULL, *pgnext;
    int p;
    int mpi_errno = MPI_SUCCESS;
    MPIU_CHKPMEM_DECL(2);
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_PG_CREATE);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_PG_CREATE);
    
    MPIU_CHKPMEM_MALLOC(pg,MPIDI_PG_t*,sizeof(MPIDI_PG_t),mpi_errno,"pg");
    MPIU_CHKPMEM_MALLOC(pg->vct,MPIDI_VC_t *,sizeof(MPIDI_VC_t)*vct_sz,
			mpi_errno,"pg->vct");

    if (verbose) {
	fprintf( stdout, "Creating a process group of size %d\n", vct_sz );
	fflush(stdout);
    }

    pg->handle = 0;
    /* The reference count indicates the number of vc's that are or 
       have been in use and not disconnected. It starts at zero,
       except for MPI_COMM_WORLD. */
    MPIU_Object_set_ref(pg, 0);
    pg->size = vct_sz;
    pg->id   = pg_id;
    pg->finalize = 0;
    /* Initialize the connection information to null.  Use
       the appropriate MPIDI_PG_InitConnXXX routine to set up these 
       fields */
    pg->connData           = 0;
    pg->getConnInfo        = 0;
    pg->connInfoToString   = 0;
    pg->connInfoFromString = 0;
    pg->freeConnInfo       = 0;

    for (p = 0; p < vct_sz; p++)
    {
	/* Initialize device fields in the VC object */
	MPIDI_VC_Init(&pg->vct[p], pg, p);
    }

    /* We may first need to initialize the channel before calling the channel 
       VC init functions.  This routine may be a no-op; look in the 
       ch3_init.c file in each channel */
    MPIDI_CH3_PG_Init(pg);

    /* These are now done in MPIDI_VC_Init */
#if 0
    for (p = 0; p < vct_sz; p++)
    {
	/* Initialize the channel fields in the VC object */
	MPIDI_CH3_VC_Init( &pg->vct[p] );
    }
#endif

    /* The first process group is always the world group */
    if (!pg_world) { pg_world = pg; }

    /* Add pg's at the tail so that comm world is always the first pg */
    pg->next = 0;
    if (!MPIDI_PG_list)
    {
	MPIDI_PG_list = pg;
    }
    else
    {
	pgnext = MPIDI_PG_list; 
	while (pgnext->next)
	{
	    pgnext = pgnext->next;
	}
	pgnext->next = pg;
    }
    *pg_ptr = pg;
    
  fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_PG_CREATE);
    return mpi_errno;
    
  fn_fail:
    MPIU_CHKPMEM_REAP();
    goto fn_exit;
}

#undef FUNCNAME
#define FUNCNAME MPIDI_PG_Destroy
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPIDI_PG_Destroy(MPIDI_PG_t * pg)
{
    MPIDI_PG_t * pg_prev;
    MPIDI_PG_t * pg_cur;
    int i;
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_PG_DESTROY);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_PG_DESTROY);

    MPIU_Assert(MPIU_Object_get_ref(pg) == 0);

    pg_prev = NULL;
    pg_cur = MPIDI_PG_list;
    while(pg_cur != NULL)
    {
	if (pg_cur == pg)
	{
	    if (MPIDI_PG_iterator_next == pg)
	    { 
		MPIDI_PG_iterator_next = MPIDI_PG_iterator_next->next;
	    }

            if (pg_prev == NULL)
                MPIDI_PG_list = pg->next; 
            else
                pg_prev->next = pg->next;

            MPIU_DBG_MSG_FMT(CH3_DISCONNECT, VERBOSE, (MPIU_DBG_FDEST, "destroying pg=%p pg->id=%s", pg, (char *)pg->id));

            for (i = 0; i < pg->size; ++i) {
                /* FIXME it would be good if we could make this assertion.
                   Unfortunately, either:
                   1) We're not being disciplined and some caller of this
                      function doesn't bother to manage all the refcounts
                      because he thinks he knows better.  Annoying, but not
                      strictly a bug.
		      (wdg - actually, that is a bug - managing the ref
		      counts IS required and missing one is a bug.)
                   2) There is a real bug lurking out there somewhere and we
                      just haven't hit it in the tests yet.  */
                /*MPIU_Assert(MPIU_Object_get_ref(pg->vct[i]) == 0);*/

                MPIU_DBG_MSG_FMT(CH3_DISCONNECT, VERBOSE, (MPIU_DBG_FDEST, "about to free pg->vct=%p which contains vc=%p", pg->vct, &pg->vct[i]));

                /* This used to be handled in MPIDI_VCRT_Release, but that was
                   not the right place to do this.  The VC should only be freed
                   when the PG that it belongs to is freed, not just when the
                   VC's refcount drops to zero. [goodell@ 2008-06-13] */
		/* In that case, the fact that the VC is in the PG should
		   increment the ref count - reflecting the fact that the
		   use in the PG constitutes a reference-count-incrementing
		   use.  Alternately, if the PG is able to recreate a VC, 
		   and can thus free unused (or idle) VCs, it should be allowed
		   to do so.  [wdg 2008-08-31] */
                mpi_errno = MPIDI_CH3_VC_Destroy(&(pg->vct[i]));
                if (mpi_errno) { MPIR_ERR_POP(mpi_errno); }
            }

	    MPIDI_PG_Destroy_fn(pg);
	    MPIU_Free(pg->vct);
	    if (pg->connData) {
		if (pg->freeConnInfo) {
		    (*pg->freeConnInfo)( pg );
		}
		else {
		    MPIU_Free(pg->connData);
		}
	    }
	    mpi_errno = MPIDI_CH3_PG_Destroy(pg);
	    MPIU_Free(pg);

	    goto fn_exit;
	}

	pg_prev = pg_cur;
	pg_cur = pg_cur->next;
    }

    /* PG not found if we got here */
    MPIR_ERR_SET1(mpi_errno,MPI_ERR_OTHER,
		  "**dev|pg_not_found", "**dev|pg_not_found %p", pg);

  fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_PG_DESTROY);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}

#undef FUNCNAME
#define FUNCNAME MPIDI_PG_Find
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPIDI_PG_Find(void * id, MPIDI_PG_t ** pg_ptr)
{
    MPIDI_PG_t * pg;
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_PG_FIND);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_PG_FIND);
    
    pg = MPIDI_PG_list;
    while (pg != NULL)
    {
	if (MPIDI_PG_Compare_ids_fn(id, pg->id) != FALSE)
	{
	    *pg_ptr = pg;
	    goto fn_exit;
	}

	pg = pg->next;
    }

    *pg_ptr = NULL;

  fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_PG_FIND);
    return mpi_errno;
}


#undef FUNCNAME
#define FUNCNAME MPIDI_PG_Id_compare
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPIDI_PG_Id_compare(void * id1, void *id2)
{
    return MPIDI_PG_Compare_ids_fn(id1, id2);
}

/* iter always points at the next element */
#undef FUNCNAME
#define FUNCNAME MPIDI_PG_Get_next
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPIDI_PG_Get_next(MPIDI_PG_iterator *iter, MPIDI_PG_t ** pg_ptr)
{
    *pg_ptr = (*iter);
    if ((*iter) != NULL) {
        (*iter) = (*iter)->next;
    }

    return MPI_SUCCESS;
}

#undef FUNCNAME
#define FUNCNAME MPIDI_PG_Has_next
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPIDI_PG_Has_next(MPIDI_PG_iterator *iter)
{
    return (*iter != NULL);
}

#undef FUNCNAME
#define FUNCNAME MPIDI_PG_Get_iterator
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPIDI_PG_Get_iterator(MPIDI_PG_iterator *iter)
{
    *iter = MPIDI_PG_list;
    return MPI_SUCCESS;
}

/* FIXME: What does DEV_IMPLEMENTS_KVS mean?  Why is it used?  Who uses 
   PG_To_string and why?  */

#ifdef MPIDI_DEV_IMPLEMENTS_KVS

/* PG_To_string is used in the implementation of connect/accept (and 
   hence in spawn) in ch3u_port.c */
/* Note: Allocated memory that is returned in str_ptr.  The user of
   this routine must free that data */
#undef FUNCNAME
#define FUNCNAME MPIDI_PG_To_string
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPIDI_PG_To_string(MPIDI_PG_t *pg_ptr, char **str_ptr, int *lenStr)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_PG_TO_STRING);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_PG_TO_STRING);

    /* Replace this with the new string */
    if (pg_ptr->connInfoToString) {
	(*pg_ptr->connInfoToString)( str_ptr, lenStr, pg_ptr );
    }
    else {
	MPIR_ERR_SETANDJUMP(mpi_errno,MPI_ERR_INTERN,"**noConnInfoToString");
    }

    /*printf( "PgToString: Pg string is %s\n", *str_ptr ); fflush(stdout);*/
fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_PG_TO_STRING);
    return mpi_errno;
fn_fail:
    goto fn_exit;
}

/* This routine takes a string description of a process group (created with 
   MPIDI_PG_To_string, usually on a different process) and returns a pointer to
   the matching process group.  If the group already exists, flag is set to 
   false.  If the group does not exist, it is created with MPIDI_PG_Create (and
   hence is added to the list of active process groups) and flag is set to 
   true.  In addition, the connection information is set up using the 
   information in the input string.
*/
#undef FUNCNAME
#define FUNCNAME MPIDI_PG_Create_from_string
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPIDI_PG_Create_from_string(const char * str, MPIDI_PG_t ** pg_pptr, 
				int *flag)
{
    int mpi_errno = MPI_SUCCESS;
    const char *p;
    int vct_sz;
    MPIDI_PG_t *existing_pg, *pg_ptr=0;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_PG_CREATE_FROM_STRING);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_PG_CREATE_FROM_STRING);

    /*printf( "PgCreateFromString: Creating pg from %s\n", str ); 
      fflush(stdout); */
    /* The pg_id is at the beginning of the string, so we can just pass
       it to the find routine */
    /* printf( "Looking for pg with id %s\n", str );fflush(stdout); */
    mpi_errno = MPIDI_PG_Find((void *)str, &existing_pg);
    if (mpi_errno) MPIR_ERR_POP(mpi_errno);

    if (existing_pg != NULL) {
	/* return the existing PG */
	*pg_pptr = existing_pg;
	*flag = 0;
	/* Note that the memory for the pg_id is freed in the exit */
	goto fn_exit;
    }
    *flag = 1;

    /* Get the size from the string */
    p = str;
    while (*p) p++; p++;
    vct_sz = atoi(p);

    mpi_errno = MPIDI_PG_Create(vct_sz, (void *)str, pg_pptr);
    if (mpi_errno != MPI_SUCCESS) {
	MPIR_ERR_POP(mpi_errno);
    }
    
    pg_ptr = *pg_pptr;
    pg_ptr->id = MPIU_Strdup( str );
    
    /* Set up the functions to use strings to manage connection information */
    MPIDI_PG_InitConnString( pg_ptr );
    (*pg_ptr->connInfoFromString)( str, pg_ptr );

fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_PG_CREATE_FROM_STRING);
    return mpi_errno;
fn_fail:
    goto fn_exit;
}

#ifdef HAVE_CTYPE_H
/* Needed for isdigit */
#include <ctype.h>
#endif

/* Convert a process group id into a number.  This is a hash-based approach,
 * which has the potential for some collisions.  This is an alternative to the
 * previous approach that caused req#3930, which was to sum up the values of the
 * characters.  The summing approach worked OK when the id's were all similar
 * but with an incrementing prefix or suffix, but terrible for a 32 hex-character
 * UUID type of id.
 *
 * FIXME It would really be best if the PM could give us this value.
 */
void MPIDI_PG_IdToNum( MPIDI_PG_t *pg, int *id )
{
    const char *p = (const char *)pg->id;
    unsigned int pgid = 0;

    while (*p) {
        pgid += *p++;
        pgid += (pgid << 10);
        pgid ^= (pgid >> 6);
    }
    pgid += (pgid << 3);
    pgid ^= (pgid >> 11);
    pgid += (pgid << 15);

    /* restrict to 31 bits */
    *id = (pgid & 0x7fffffff);
}
#else
/* FIXME: This is a temporary hack for devices that do not define
   MPIDI_DEV_IMPLEMENTS_KVS
   FIXME: MPIDI_DEV_IMPLEMENTS_KVS should be removed
 */
void MPIDI_PG_IdToNum( MPIDI_PG_t *pg, int *id )
{
    *id = 0;
}
#endif

/*
 * Managing connection information for process groups
 * 
 *
 */

/* Setting a process's connection information 
   
   This is a collective call (for scalability) over all of the processes in 
   the same MPI_COMM_WORLD.
*/
#undef FUNCNAME
#define FUNCNAME MPIDI_PG_SetConnInfo
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPIDI_PG_SetConnInfo( int rank, const char *connString )
{
#ifdef USE_PMI2_API
    int mpi_errno = MPI_SUCCESS;
    int len;
    char key[PMI2_MAX_KEYLEN];
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_PG_SetConnInfo);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_PG_SetConnInfo);

    len = MPL_snprintf(key, sizeof(key), "P%d-businesscard", rank);
    MPIR_ERR_CHKANDJUMP1(len < 0 || len > sizeof(key), mpi_errno, MPI_ERR_OTHER, "**snprintf", "**snprintf %d", len);

    mpi_errno = PMI2_KVS_Put(key, connString);
    if (mpi_errno) MPIR_ERR_POP(mpi_errno);

    mpi_errno = PMI2_KVS_Fence();
    if (mpi_errno) MPIR_ERR_POP(mpi_errno);
    
 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_PG_SetConnInfo);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
#else
    int mpi_errno = MPI_SUCCESS;
    int pmi_errno;
    int len;
    char key[128];
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_PG_SetConnInfo);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_PG_SetConnInfo);

    MPIU_Assert(pg_world->connData);
    
    len = MPL_snprintf(key, sizeof(key), "P%d-businesscard", rank);
    if (len < 0 || len > sizeof(key)) {
	MPIR_ERR_SETANDJUMP1(mpi_errno,MPI_ERR_OTHER, "**snprintf",
			     "**snprintf %d", len);
    }
    pmi_errno = PMI_KVS_Put(pg_world->connData, key, connString );
    if (pmi_errno != PMI_SUCCESS) {
	MPIR_ERR_SETANDJUMP1(mpi_errno,MPI_ERR_OTHER, "**pmi_kvs_put",
			     "**pmi_kvs_put %d", pmi_errno);
    }
    pmi_errno = PMI_KVS_Commit(pg_world->connData);
    if (pmi_errno != PMI_SUCCESS) {
	MPIR_ERR_SETANDJUMP1(mpi_errno,MPI_ERR_OTHER, "**pmi_kvs_commit",
			     "**pmi_kvs_commit %d", pmi_errno);
    }
    
    pmi_errno = PMI_Barrier();
    if (pmi_errno != PMI_SUCCESS) {
	MPIR_ERR_SETANDJUMP1(mpi_errno,MPI_ERR_OTHER, "**pmi_barrier",
			     "**pmi_barrier %d", pmi_errno);
    }
 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_PG_SetConnInfo);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
#endif
}

/* For all of these routines, the format of the process group description
   that is created and used by the connTo/FromString routines is this:
   (All items are strings, terminated by null)

   process group id string
   sizeof process group (as string)
   conninfo for rank 0
   conninfo for rank 1
   ...

   The "conninfo for rank 0" etc. for the original (MPI_COMM_WORLD)
   process group are stored in the PMI_KVS space with the keys 
   p<rank>-businesscard .  

   Fixme: Add a routine to publish the connection info to this file so that
   the key for the businesscard is defined in just this one file.
*/


/* The "KVS" versions are for the process group to which the calling 
   process belongs.  These use the PMI_KVS routines to access the
   process information */
#undef FUNCNAME
#define FUNCNAME getConnInfoKVS
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
static int getConnInfoKVS( int rank, char *buf, int bufsize, MPIDI_PG_t *pg )
{
#ifdef USE_PMI2_API
    char key[MPIDI_MAX_KVS_KEY_LEN];
    int  mpi_errno = MPI_SUCCESS, rc;
    int vallen;

    rc = MPL_snprintf(key, MPIDI_MAX_KVS_KEY_LEN, "P%d-businesscard", rank );
    if (rc < 0 || rc > MPIDI_MAX_KVS_KEY_LEN) {
	MPIR_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,"**nomem");
    }

    mpi_errno = PMI2_KVS_Get(pg->connData, PMI2_ID_NULL, key, buf, bufsize, &vallen);
    if (mpi_errno) {
	MPIDI_PG_CheckForSingleton();
	mpi_errno = PMI2_KVS_Get(pg->connData, PMI2_ID_NULL, key, buf, bufsize, &vallen);
        if (mpi_errno) MPIR_ERR_POP(mpi_errno);
    }
 fn_exit:
    return mpi_errno;
 fn_fail:
    goto fn_exit;
#else
    char key[MPIDI_MAX_KVS_KEY_LEN];
    int  mpi_errno = MPI_SUCCESS, rc, pmi_errno;

    rc = MPL_snprintf(key, MPIDI_MAX_KVS_KEY_LEN, "P%d-businesscard", rank );
    if (rc < 0 || rc > MPIDI_MAX_KVS_KEY_LEN) {
	MPIR_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,"**nomem");
    }

    MPID_THREAD_CS_ENTER(POBJ, MPIR_THREAD_POBJ_PMI_MUTEX);
    pmi_errno = PMI_KVS_Get(pg->connData, key, buf, bufsize );
    if (pmi_errno) {
	MPIDI_PG_CheckForSingleton();
	pmi_errno = PMI_KVS_Get(pg->connData, key, buf, bufsize );
    }
    MPID_THREAD_CS_EXIT(POBJ, MPIR_THREAD_POBJ_PMI_MUTEX);
    if (pmi_errno) {
	MPIR_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,"**pmi_kvs_get");
    }

 fn_exit:
    return mpi_errno;
 fn_fail:
    goto fn_exit;
#endif
}

/* *slen is the length of the string, including the null terminator.  So if the
   resulting string is |foo\0bar\0|, then *slen == 8. */
static int connToStringKVS( char **buf_p, int *slen, MPIDI_PG_t *pg )
{
    char *string = 0;
    char *pg_idStr = (char *)pg->id;      /* In the PMI/KVS space,
					     the pg id is a string */
    char   buf[MPIDI_MAX_KVS_VALUE_LEN];
    int    i, j, rc, mpi_errno = MPI_SUCCESS, len;
    size_t vallen, curSlen;

    /* Make an initial allocation of a string with an estimate of the
       needed space */
    len = 0;
    curSlen = 10 + pg->size * 128;
    string = (char *)MPIU_Malloc( curSlen );

    /* Start with the id of the pg */
    while (*pg_idStr && len < curSlen) 
	string[len++] = *pg_idStr++;
    string[len++] = 0;
    
    /* Add the size of the pg */
    MPL_snprintf( &string[len], curSlen - len, "%d", pg->size );
    while (string[len]) len++;
    len++;

    for (i=0; i<pg->size; i++) {
	rc = getConnInfoKVS( i, buf, MPIDI_MAX_KVS_VALUE_LEN, pg );
	if (rc) {
	    MPL_internal_error_printf( 
		    "Panic: getConnInfoKVS failed for %s (rc=%d)\n", 
		    (char *)pg->id, rc );
	}
#ifndef USE_PERSISTENT_SHARED_MEMORY
	/* FIXME: This is a hack to avoid including shared-memory 
	   queue names in the business card that may be used
	   by processes that were not part of the same COMM_WORLD. 
	   To fix this, the shared memory channels should look at the
	   returned connection info and decide whether to use 
	   sockets or shared memory by determining whether the
	   process is in the same MPI_COMM_WORLD. */
	/* FIXME: The more general problem is that the connection information
	   needs to include some information on the range of validity (e.g.,
	   all processes, same comm world, particular ranks), and that
	   representation needs to be scalable */
/*	printf( "Adding key %s value %s\n", key, val ); */
	{
	char *p = strstr( buf, "$shm_host" );
	if (p) p[1] = 0;
	/*	    printf( "(fixed) Adding key %s value %s\n", key, val ); */
	}
#endif
	/* Add the information to the output buffer */
	vallen = strlen(buf);
	/* Check that this will fix in the remaining space */
	if (len + vallen + 1 >= curSlen) {
	    char *nstring = 0;
            curSlen += (pg->size - i) * (vallen + 1 );
	    nstring = MPIU_Realloc( string, curSlen );
	    if (!nstring) {
		MPIR_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,"**nomem");
	    }
	    string = nstring;
	}
	/* Append to string */
	for (j=0; j<vallen+1; j++) {
	    string[len++] = buf[j];
	}
    }

    MPIU_Assert(len <= curSlen);

    *buf_p = string;
    *slen  = len;
 fn_exit:
    return mpi_errno;
 fn_fail:
    if (string) MPIU_Free(string);
    goto fn_exit;
}
static int connFromStringKVS( const char *buf ATTRIBUTE((unused)), 
			      MPIDI_PG_t *pg ATTRIBUTE((unused)) )
{
    /* Fixme: this should be a failure to call this routine */
    return MPI_SUCCESS;
}
static int connFreeKVS( MPIDI_PG_t *pg )
{
    if (pg->connData) {
	MPIU_Free( pg->connData );
    }
    return MPI_SUCCESS;
}


#undef FUNCNAME
#define FUNCNAME MPIDI_PG_InitConnKVS
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPIDI_PG_InitConnKVS( MPIDI_PG_t *pg )
{
#ifdef USE_PMI2_API
    int mpi_errno = MPI_SUCCESS;
    
    pg->connData = (char *)MPIU_Malloc(MAX_JOBID_LEN);
    if (pg->connData == NULL) {
	MPIR_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**nomem");
    }
    
    mpi_errno = PMI2_Job_GetId(pg->connData, MAX_JOBID_LEN);
    if (mpi_errno) MPIR_ERR_POP(mpi_errno);
#else
    int pmi_errno, kvs_name_sz;
    int mpi_errno = MPI_SUCCESS;

    pmi_errno = PMI_KVS_Get_name_length_max( &kvs_name_sz );
    if (pmi_errno != PMI_SUCCESS) {
	MPIR_ERR_SETANDJUMP1(mpi_errno,MPI_ERR_OTHER, 
			     "**pmi_kvs_get_name_length_max", 
			     "**pmi_kvs_get_name_length_max %d", pmi_errno);
    }
    
    pg->connData = (char *)MPIU_Malloc(kvs_name_sz + 1);
    if (pg->connData == NULL) {
	MPIR_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**nomem");
    }
    
    pmi_errno = PMI_KVS_Get_my_name(pg->connData, kvs_name_sz);
    if (pmi_errno != PMI_SUCCESS) {
	MPIR_ERR_SETANDJUMP1(mpi_errno,MPI_ERR_OTHER, 
			     "**pmi_kvs_get_my_name", 
			     "**pmi_kvs_get_my_name %d", pmi_errno);
    }
#endif
    pg->getConnInfo        = getConnInfoKVS;
    pg->connInfoToString   = connToStringKVS;
    pg->connInfoFromString = connFromStringKVS;
    pg->freeConnInfo       = connFreeKVS;

 fn_exit:
    return mpi_errno;
 fn_fail:
    if (pg->connData) { MPIU_Free(pg->connData); }
    goto fn_exit;
}

/* Return the kvsname associated with the MPI_COMM_WORLD of this process. */
int MPIDI_PG_GetConnKVSname( char ** kvsname )
{
    *kvsname = pg_world->connData;
    return MPI_SUCCESS;
}

/* For process groups that are not our MPI_COMM_WORLD, store the connection
   information in an array of strings.  These routines and structure
   implement the access to this information. */
typedef struct {
    int     toStringLen;   /* Length needed to encode this connection info */
    char ** connStrings;   /* pointer to an array, indexed by rank, containing
			      connection information */
} MPIDI_ConnInfo;

static int getConnInfo( int rank, char *buf, int bufsize, MPIDI_PG_t *pg )
{
    MPIDI_ConnInfo *connInfo = (MPIDI_ConnInfo *)pg->connData;

    /* printf( "Entering getConnInfo\n" ); fflush(stdout); */
    if (!connInfo || !connInfo->connStrings || !connInfo->connStrings[rank]) {
	/* FIXME: Turn this into a valid error code create/return */
	printf( "Fatal error in getConnInfo (rank = %d)\n", rank );
	printf( "connInfo = %p\n", connInfo );fflush(stdout);
	if (connInfo) {
	    printf( "connInfo->connStrings = %p\n", connInfo->connStrings );
	}
	/* Fatal error.  Connection information missing */
	fflush(stdout);   
    }

    /* printf( "Copying %s to buf\n", connInfo->connStrings[rank] ); fflush(stdout); */
    
    MPIU_Strncpy( buf, connInfo->connStrings[rank], bufsize );
    return MPI_SUCCESS;
}

#undef FUNCNAME
#define FUNCNAME connToString
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
static int connToString( char **buf_p, int *slen, MPIDI_PG_t *pg )
{
    int mpi_errno = MPI_SUCCESS;
    char *str = NULL, *pg_id;
    int  i, len=0;
    MPIU_CHKPMEM_DECL(1);
    MPIDI_ConnInfo *connInfo = (MPIDI_ConnInfo *)pg->connData;

    /* Create this from the string array */
    MPIU_CHKPMEM_MALLOC(str, char *, connInfo->toStringLen, mpi_errno, "str");

#if defined(MPICH_DEBUG_MEMINIT)
    memset(str, 0, connInfo->toStringLen);
#endif

    pg_id = pg->id;
    /* FIXME: This is a hack, and it doesn't even work */
    /*    MPIDI_PrintConnStrToFile( stdout, __FILE__, __LINE__, 
	  "connToString: pg id is", (char *)pg_id );*/
    /* This is intended to cause a process to transition from a singleton
       to a non-singleton. */
    /* XXX DJG TODO figure out what this little bit is all about. */
    if (strstr( pg_id, "singinit_kvs" ) == pg_id) {
#ifdef USE_PMI2_API
        MPIU_Assertp(0); /* don't know what to do here for pmi2 yet.  DARIUS */
#else
	PMI_KVS_Get_my_name( pg->id, 256 );
#endif
    }
    
    while (*pg_id) str[len++] = *pg_id++;
    str[len++] = 0;
    
    MPL_snprintf( &str[len], 20, "%d", pg->size);
    /* Skip over the length */
    while (str[len++]);

    /* Copy each connection string */
    for (i=0; i<pg->size; i++) {
	char *p = connInfo->connStrings[i];
	while (*p) { str[len++] = *p++; }
	str[len++] = 0;
    }

    if (len > connInfo->toStringLen) {
	*buf_p = 0;
	*slen  = 0;
        MPIR_ERR_INTERNALANDJUMP(mpi_errno, "len > connInfo->toStringLen");
    }

    *buf_p = str;
    *slen = len;

fn_exit:
    MPIU_CHKPMEM_COMMIT();
    return mpi_errno;
fn_fail:
    MPIU_CHKPMEM_REAP();
    goto fn_exit;
    
}
static int connFromString( const char *buf, MPIDI_PG_t *pg )
{
    MPIDI_ConnInfo *conninfo = 0;
    int i, mpi_errno = MPI_SUCCESS;
    const char *buf0 = buf;   /* save the start of buf */

    /* printf( "Starting with buf = %s\n", buf );fflush(stdout); */

    /* Skip the pg id */
    while (*buf) buf++; buf++;

    /* Determine the size of the pg */
    pg->size = atoi( buf );
    while (*buf) buf++; buf++;

    conninfo = (MPIDI_ConnInfo *)MPIU_Malloc( sizeof(MPIDI_ConnInfo) );
    conninfo->connStrings = (char **)MPIU_Malloc( pg->size * sizeof(char *));

    /* For now, make a copy of each item */
    for (i=0; i<pg->size; i++) {
	/* printf( "Adding conn[%d] = %s\n", i, buf );fflush(stdout); */
	conninfo->connStrings[i] = MPIU_Strdup( buf );
	while (*buf) buf++;
	buf++;
    }
    pg->connData = conninfo;
	
    /* Save the length of the string needed to encode the connection
       information */
    conninfo->toStringLen = (int)(buf - buf0) + 1;

    return mpi_errno;
}
static int connFree( MPIDI_PG_t *pg )
{
    MPIDI_ConnInfo *conninfo = (MPIDI_ConnInfo *)pg->connData;
    int i;

    for (i=0; i<pg->size; i++) {
	MPIU_Free( conninfo->connStrings[i] );
    }
    MPIU_Free( conninfo->connStrings );
    MPIU_Free( conninfo );

    return MPI_SUCCESS;
}

#ifdef USE_DBG_LOGGING
/* This is a temporary routine that is used to print out the pg string.
   A better approach may be to convert it into a single (long) string
   with no nulls. */
int MPIDI_PrintConnStr( const char *file, int line, 
			const char *label, const char *str )
{
    int pg_size, i;

    MPIU_DBG_Outevent( file, line, MPIU_DBG_CH3_CONNECT, 0, "%s", label );
    MPIU_DBG_Outevent( file, line, MPIU_DBG_CH3_CONNECT, 0, "%s", str );
    
    /* Skip the pg id */
    while (*str) str++; str++;

    /* Determine the size of the pg */
    pg_size = atoi( str );
    while (*str) str++; str++;

    for (i=0; i<pg_size; i++) {
	MPIU_DBG_Outevent( file, line, MPIU_DBG_CH3_CONNECT, 0, "%s", str );
	while (*str) str++;
	str++;
    }
    return 0;
}
int MPIDI_PrintConnStrToFile( FILE *fd, const char *file, int line, 
			      const char *label, const char *str )
{
    int pg_size, i;

    fprintf( fd, "ConnStr from %s(%d); %s\n\t%s\n", file, line, label, str );
    
    /* Skip the pg id */
    while (*str) str++; str++;

    fprintf( fd, "\t%s\n", str );
    /* Determine the size of the pg */
    pg_size = atoi( str );
    while (*str) str++; str++;

    for (i=0; i<pg_size; i++) {
	fprintf( fd, "\t%s\n", str ); 
	while (*str) str++;
	str++;
    }
    fflush(stdout);
    return 0;
}
#endif

int MPIDI_PG_InitConnString( MPIDI_PG_t *pg )
{
    int mpi_errno = MPI_SUCCESS;

    pg->connData           = 0;
    pg->getConnInfo        = getConnInfo;
    pg->connInfoToString   = connToString;
    pg->connInfoFromString = connFromString;
    pg->freeConnInfo       = connFree;

    return mpi_errno;
}

/* Temp to get connection value for rank r */
#undef FUNCNAME
#define FUNCNAME MPIDI_PG_GetConnString
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPIDI_PG_GetConnString( MPIDI_PG_t *pg, int rank, char *val, int vallen )
{
    int mpi_errno = MPI_SUCCESS;

    if (pg->getConnInfo) {
	mpi_errno = (*pg->getConnInfo)( rank, val, vallen, pg );
    }
    else {
	MPL_internal_error_printf( "Panic: no getConnInfo defined!\n" );
    }

    return mpi_errno;
}


#undef FUNCNAME
#define FUNCNAME MPIDI_PG_Dup_vcr
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/*@
  MPIDI_PG_Dup_vcr - Duplicate a virtual connection from a process group

  Notes:
  This routine provides a dup of a virtual connection given a process group
  and a rank in that group.  This routine is used only in initializing
  the MPI-1 communicators 'MPI_COMM_WORLD' and 'MPI_COMM_SELF', and in creating
  the initial intercommunicator after an 'MPI_Comm_spawn', 
  'MPI_Comm_spawn_multiple', or 'MPI_Comm_connect/MPI_Comm_accept'.  

  In addition to returning a dup of the virtual connection, it manages the
  reference count of the process group, which is always the number of inuse
  virtual connections.
  @*/
int MPIDI_PG_Dup_vcr( MPIDI_PG_t *pg, int rank, MPIDI_VC_t **vc_p )
{
    MPIDI_VC_t *vc;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_PG_DUP_VCR);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_PG_DUP_VCR);

    vc = &pg->vct[rank];
    /* Increase the reference count of the vc.  If the reference count 
       increases from 0 to 1, increase the reference count of the 
       process group *and* the reference count of the vc (this
       allows us to distinquish between Comm_free and Comm_disconnect) */
    /* FIXME-MT: This should be a fetch and increment for thread-safety */
    if (MPIU_Object_get_ref(vc) == 0) {
	MPIDI_PG_add_ref(pg);
	MPIDI_VC_add_ref(vc);
    }
    MPIDI_VC_add_ref(vc);
    *vc_p = vc;

    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_PG_DUP_VCR);
    return MPI_SUCCESS;
}

/* FIXME: This routine should invoke a close method on the connection,
   rather than have all of the code here */
#undef FUNCNAME
#define FUNCNAME MPIDI_PG_Close_VCs
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/*@
  MPIDI_PG_Close_VCs - Close all virtual connections on all process groups.
  
  Note:
  This routine is used in MPID_Finalize.  It is here to keep the process-group
  related functions together.
  @*/
int MPIDI_PG_Close_VCs( void )
{
    MPIDI_PG_t * pg = MPIDI_PG_list;
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_PG_CLOSE_VCS);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_PG_CLOSE_VCS);

    while (pg) {
	int i, inuse, n, i_start;

	MPIU_DBG_MSG_S(CH3_DISCONNECT,VERBOSE,"Closing vcs for pg %s",
		       (char *)pg->id );

        /* We want to reduce the chance of having all processes send
           close requests to the same process at once.  We do this by
           having processes start at different indices, namely
           (my_pg_rank+1) mod pg->size. */
        i_start = (MPIDI_Process.my_pg_rank+1) % pg->size;
	for (n = 0; n < pg->size; n++)
	{
            MPIDI_VC_t * vc;
            i = (n + i_start) % pg->size;
	    vc = &pg->vct[i];
	    /* If the VC is myself then skip the close message */
	    if (pg == MPIDI_Process.my_pg && i == MPIDI_Process.my_pg_rank) {
                /* XXX DJG FIXME-MT should we be checking this? */
                if (MPIU_Object_get_ref(vc) != 0) {
                    MPIDI_PG_release_ref(pg, &inuse);
                }
		continue;
	    }

	    if (vc->state == MPIDI_VC_STATE_ACTIVE ||
		vc->state == MPIDI_VC_STATE_REMOTE_CLOSE) {
		mpi_errno = MPIDI_CH3U_VC_SendClose( vc, i );
                if (mpi_errno) MPIR_ERR_POP(mpi_errno);
	    } else if (vc->state == MPIDI_VC_STATE_INACTIVE ||
                       vc->state == MPIDI_VC_STATE_MORIBUND) {
                /* XXX DJG FIXME-MT should we be checking this? */
                if (MPIU_Object_get_ref(vc) != 0) {
		    /* FIXME: If the reference count for the vc is not 0,
		       something is wrong */
                    MPIDI_PG_release_ref(pg, &inuse);
                }
                /* Inactive connections need to be marked
                   INACTIVE_CLOSED, so that if a connection request
                   comes in during the close protocol, we know to
                   reject it. */
                if (vc->state == MPIDI_VC_STATE_INACTIVE)
                    MPIDI_CHANGE_VC_STATE(vc, INACTIVE_CLOSED);
            } else {
		MPIU_DBG_MSG_FMT(CH3_DISCONNECT,VERBOSE,(MPIU_DBG_FDEST,
		     "vc=%p: not sending a close to %d, vc in state %s", vc,i,
		     MPIDI_VC_GetStateString(vc->state)));
	    }
	}
        pg->finalize = 1;
	pg = pg->next;
    }
    /* Note that we do not free the process groups within this routine, even
       if the reference counts have gone to zero. That is done once the 
       connections are in fact closed (by the final progress loop that
       handles any close requests that this code generates) */

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_PG_CLOSE_VCS);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}

/*
 * This routine may be called to print the contents (including states and
 * reference counts) for process groups.
 */
int MPIU_PG_Printall( FILE *fp )
{
    MPIDI_PG_t *pg;
    int         i;
		  
    pg = MPIDI_PG_list;

    fprintf( fp, "Process groups:\n" );
    while (pg) {
        /* XXX DJG FIXME-MT should we be checking this? */
	fprintf( fp, "size = %d, refcount = %d, id = %s\n", 
		 pg->size, MPIU_Object_get_ref(pg), (char *)pg->id );
	for (i=0; i<pg->size; i++) {
	    fprintf( fp, "\tVCT rank = %d, refcount = %d, lpid = %d, state = %d \n", 
		     pg->vct[i].pg_rank, MPIU_Object_get_ref(&pg->vct[i]),
		     pg->vct[i].lpid, (int)pg->vct[i].state );
	}
	fflush(fp);
	pg = pg->next;
    }

    return 0;
}

int MPIDI_PG_CheckForSingleton( void )
{

#ifdef USE_PMI2_API
    /* PMI2 FIXME for now we just always assume we aren't doing singleton init */
#else
    if (strstr((char*)pg_world->id,"singinit_kvs") == (char *)pg_world->id) {
	char buf[256];
	/* Force an enroll */
	PMI_KVS_Get( "foobar", "foobar", buf, sizeof(buf) );
	PMI_KVS_Get_my_name( pg_world->id, 256 );
	PMI_KVS_Get_my_name( pg_world->connData, 256 );
    }
#endif
    return MPI_SUCCESS;
}