Blame ptl_am/am_reqrep_shmem.c

Packit 961e70
/*
Packit 961e70
Packit 961e70
  This file is provided under a dual BSD/GPLv2 license.  When using or
Packit 961e70
  redistributing this file, you may do so under either license.
Packit 961e70
Packit 961e70
  GPL LICENSE SUMMARY
Packit 961e70
Packit 961e70
  Copyright(c) 2016 Intel Corporation.
Packit 961e70
Packit 961e70
  This program is free software; you can redistribute it and/or modify
Packit 961e70
  it under the terms of version 2 of the GNU General Public License as
Packit 961e70
  published by the Free Software Foundation.
Packit 961e70
Packit 961e70
  This program is distributed in the hope that it will be useful, but
Packit 961e70
  WITHOUT ANY WARRANTY; without even the implied warranty of
Packit 961e70
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
Packit 961e70
  General Public License for more details.
Packit 961e70
Packit 961e70
  Contact Information:
Packit 961e70
  Intel Corporation, www.intel.com
Packit 961e70
Packit 961e70
  BSD LICENSE
Packit 961e70
Packit 961e70
  Copyright(c) 2016 Intel Corporation.
Packit 961e70
Packit 961e70
  Redistribution and use in source and binary forms, with or without
Packit 961e70
  modification, are permitted provided that the following conditions
Packit 961e70
  are met:
Packit 961e70
Packit 961e70
    * Redistributions of source code must retain the above copyright
Packit 961e70
      notice, this list of conditions and the following disclaimer.
Packit 961e70
    * Redistributions in binary form must reproduce the above copyright
Packit 961e70
      notice, this list of conditions and the following disclaimer in
Packit 961e70
      the documentation and/or other materials provided with the
Packit 961e70
      distribution.
Packit 961e70
    * Neither the name of Intel Corporation nor the names of its
Packit 961e70
      contributors may be used to endorse or promote products derived
Packit 961e70
      from this software without specific prior written permission.
Packit 961e70
Packit 961e70
  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
Packit 961e70
  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
Packit 961e70
  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
Packit 961e70
  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
Packit 961e70
  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
Packit 961e70
  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
Packit 961e70
  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
Packit 961e70
  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
Packit 961e70
  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
Packit 961e70
  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
Packit 961e70
  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Packit 961e70
Packit 961e70
*/
Packit 961e70
Packit 961e70
/* Copyright (c) 2003-2016 Intel Corporation. All rights reserved. */
Packit 961e70
Packit 961e70
#include <sys/types.h>		/* shm_open and signal handling */
Packit 961e70
#include <sys/mman.h>
Packit 961e70
#include <sys/stat.h>
Packit 961e70
#include <fcntl.h>
Packit 961e70
#include <signal.h>
Packit 961e70
Packit 961e70
#include "psm_user.h"
Packit 961e70
#include "psm_mq_internal.h"
Packit 961e70
#include "psm_am_internal.h"
Packit 961e70
#include "cmarw.h"
Packit 961e70
#include "psmi_wrappers.h"
Packit 961e70
Packit 961e70
#ifdef PSM_CUDA
Packit 961e70
#include "am_cuda_memhandle_cache.h"
Packit 961e70
#endif
Packit 961e70
Packit 961e70
int psmi_shm_mq_rv_thresh = PSMI_MQ_RV_THRESH_NO_KASSIST;
Packit 961e70
Packit 961e70
static const amsh_qinfo_t amsh_qcounts = {
Packit 961e70
	.qreqFifoShort = 1024,
Packit 961e70
	.qreqFifoLong = 256,
Packit 961e70
	.qrepFifoShort = 1024,
Packit 961e70
	.qrepFifoLong = 256
Packit 961e70
};
Packit 961e70
Packit 961e70
static const amsh_qinfo_t amsh_qelemsz = {
Packit 961e70
	.qreqFifoShort = sizeof(am_pkt_short_t),
Packit 961e70
	.qreqFifoLong = AMLONG_SZ,
Packit 961e70
	.qrepFifoShort = sizeof(am_pkt_short_t),
Packit 961e70
	.qrepFifoLong = AMLONG_SZ
Packit 961e70
};
Packit 961e70
Packit 961e70
ustatic struct {
Packit 961e70
	void *addr;
Packit 961e70
	size_t len;
Packit 961e70
	struct sigaction SIGSEGV_old_act;
Packit 961e70
	struct sigaction SIGBUS_old_act;
Packit 961e70
} action_stash;
Packit 961e70
Packit 961e70
static psm2_error_t amsh_poll(ptl_t *ptl, int replyonly);
Packit 961e70
static void process_packet(ptl_t *ptl, am_pkt_short_t *pkt, int isreq);
Packit 961e70
static void amsh_conn_handler(void *toki, psm2_amarg_t *args, int narg,
Packit 961e70
			      void *buf, size_t len);
Packit 961e70
Packit 961e70
/* Kassist helper functions */
Packit 961e70
#if _HFI_DEBUGGING
Packit 961e70
static const char *psmi_kassist_getmode(int mode);
Packit 961e70
#endif
Packit 961e70
static int psmi_get_kassist_mode();
Packit 961e70
int psmi_epaddr_pid(psm2_epaddr_t epaddr);
Packit 961e70
Packit 961e70
static inline void
Packit 961e70
am_ctl_qhdr_init(volatile am_ctl_qhdr_t *q, int elem_cnt, int elem_sz)
Packit 961e70
{
Packit 961e70
	pthread_spin_init(&q->lock, PTHREAD_PROCESS_SHARED);
Packit 961e70
	q->head = 0;
Packit 961e70
	q->tail = 0;
Packit 961e70
	q->elem_cnt = elem_cnt;
Packit 961e70
	q->elem_sz = elem_sz;
Packit 961e70
}
Packit 961e70
Packit 961e70
static void
Packit 961e70
am_ctl_bulkpkt_init(am_pkt_bulk_t *base_ptr, size_t elemsz, int nelems)
Packit 961e70
{
Packit 961e70
	int i;
Packit 961e70
	am_pkt_bulk_t *bulkpkt;
Packit 961e70
	uintptr_t bulkptr = (uintptr_t) base_ptr;
Packit 961e70
Packit 961e70
	for (i = 0; i < nelems; i++, bulkptr += elemsz) {
Packit 961e70
		bulkpkt = (am_pkt_bulk_t *) bulkptr;
Packit 961e70
		bulkpkt->idx = i;
Packit 961e70
	}
Packit 961e70
}
Packit 961e70
Packit 961e70
#define _PA(type) PSMI_ALIGNUP(amsh_qcounts.q ## type * amsh_qelemsz.q ## type, \
Packit 961e70
			       PSMI_PAGESIZE)
Packit 961e70
static inline uintptr_t am_ctl_sizeof_block()
Packit 961e70
{
Packit 961e70
	return PSMI_ALIGNUP(
Packit 961e70
			PSMI_ALIGNUP(AMSH_BLOCK_HEADER_SIZE, PSMI_PAGESIZE) +
Packit 961e70
			/* reqctrl block */
Packit 961e70
			PSMI_ALIGNUP(sizeof(am_ctl_blockhdr_t), PSMI_PAGESIZE) +
Packit 961e70
			_PA(reqFifoShort) + _PA(reqFifoLong) +
Packit 961e70
			/*reqctrl block */
Packit 961e70
			PSMI_ALIGNUP(sizeof(am_ctl_blockhdr_t), PSMI_PAGESIZE) +
Packit 961e70
			/* align to page size */
Packit 961e70
			_PA(repFifoShort) + _PA(repFifoLong), PSMI_PAGESIZE);
Packit 961e70
}
Packit 961e70
Packit 961e70
#undef _PA
Packit 961e70
Packit Service 7ed5cc
/*
 * Pack this process' identity into 32 bits: the PID in the low 22 bits and,
 * in PSM_CUDA builds, my_gpu_device shifted up to bit 22.
 * read_extra_ep_data() performs the inverse split.
 */
static uint32_t create_extra_ep_data(void)
{
	uint32_t ret = (uint32_t) getpid();

#ifdef PSM_CUDA
	/*
	 * PID is at maximum 22 bits.  Shift as unsigned: shifting a signed
	 * int device index into bit 31, or shifting a negative one, is UB.
	 */
	ret |= (uint32_t) my_gpu_device << 22;
#endif

	return ret;
}
Packit Service 7ed5cc
Packit Service 7ed5cc
/*
 * Split a word produced by create_extra_ep_data(): the low 22 bits are the
 * PID, the remaining high bits the GPU index.
 */
static void read_extra_ep_data(uint32_t data, uint32_t *pid, uint32_t *gpu)
{
	const unsigned int pid_bits = 22;
	const uint32_t low_mask = ((uint32_t) 1 << pid_bits) - 1;

	/* pid is written before gpu, as callers may observe through aliases */
	*pid = data & low_mask;
	*gpu = data >> pid_bits;
}
Packit Service 7ed5cc
Packit 961e70
/* Computes nodeinfo->qdir from its shared-memory base; defined below. */
static void am_update_directory(struct am_ctl_nodeinfo *nodeinfo);
Packit 961e70
Packit 961e70
static
Packit 961e70
void amsh_atexit()
Packit 961e70
{
Packit Service 7ed5cc
	static ips_atomic_t atexit_once = { 0 };
Packit 961e70
	psm2_ep_t ep;
Packit 961e70
	struct ptl_am *ptl;
Packit 961e70
Packit Service 7ed5cc
	/* bail out if previous value is non-zero */
Packit Service 7ed5cc
	if (ips_atomic_cmpxchg(&atexit_once, 0, 1) != 0)
Packit 961e70
		return;
Packit 961e70
Packit 961e70
	ep = psmi_opened_endpoint;
Packit 961e70
	while (ep) {
Packit 961e70
		ptl = (struct ptl_am *)(ep->ptl_amsh.ptl);
Packit 961e70
		if (ptl->self_nodeinfo &&
Packit 961e70
		    ptl->amsh_keyname != NULL) {
Packit 961e70
			_HFI_VDBG("unlinking shm file %s\n",
Packit 961e70
				  ptl->amsh_keyname);
Packit 961e70
			shm_unlink(ptl->amsh_keyname);
Packit 961e70
		}
Packit 961e70
		ep = ep->user_ep_next;
Packit 961e70
	}
Packit 961e70
Packit 961e70
	return;
Packit 961e70
}
Packit 961e70
Packit 961e70
ustatic
Packit 961e70
void amsh_mmap_fault(int signo, siginfo_t *siginfo, void *context)
Packit 961e70
{
Packit 961e70
	if ((unsigned long int) siginfo->si_addr >= (unsigned long int) action_stash.addr &&
Packit 961e70
	    (unsigned long int) siginfo->si_addr <  (unsigned long int) action_stash.addr + (unsigned long int) action_stash.len) {
Packit 961e70
Packit 961e70
		static char shm_errmsg[256];
Packit 961e70
Packit 961e70
		snprintf(shm_errmsg, sizeof(shm_errmsg),
Packit 961e70
			 "%s: Unable to allocate shared memory for intra-node messaging.\n"
Packit 961e70
			 "%s: Delete stale shared memory files in /dev/shm.\n",
Packit 961e70
			 psmi_gethostname(), psmi_gethostname());
Packit 961e70
		amsh_atexit();
Packit 961e70
		if (psmi_write(2, shm_errmsg, strlen(shm_errmsg) + 1) == -1)
Packit 961e70
			psmi_exit(2);
Packit 961e70
		else
Packit 961e70
			psmi_exit(1); /* XXX revisit this... there's probably a better way to exit */
Packit 961e70
	} else {
Packit 961e70
		if (signo == SIGSEGV) {
Packit 961e70
			if (action_stash.SIGSEGV_old_act.sa_sigaction == (void*) SIG_DFL) {
Packit 961e70
				psmi_sigaction(SIGSEGV, &action_stash.SIGSEGV_old_act, NULL);
Packit 961e70
				raise(SIGSEGV);
Packit 961e70
				struct sigaction act;
Packit 961e70
				act.sa_sigaction = amsh_mmap_fault;
Packit 961e70
				act.sa_flags = SA_SIGINFO;
Packit 961e70
				psmi_sigaction(SIGSEGV, &act, NULL);
Packit 961e70
			} else if (action_stash.SIGSEGV_old_act.sa_sigaction == (void*) SIG_IGN) {
Packit 961e70
				return;
Packit 961e70
			} else {
Packit 961e70
				action_stash.SIGSEGV_old_act.sa_sigaction(signo, siginfo, context);
Packit 961e70
			}
Packit 961e70
		} else if (signo == SIGBUS) {
Packit 961e70
			if (action_stash.SIGBUS_old_act.sa_sigaction == (void*) SIG_DFL) {
Packit 961e70
				psmi_sigaction(SIGBUS, &action_stash.SIGBUS_old_act, NULL);
Packit 961e70
				raise(SIGBUS);
Packit 961e70
				struct sigaction act;
Packit 961e70
				act.sa_sigaction = amsh_mmap_fault;
Packit 961e70
				act.sa_flags = SA_SIGINFO;
Packit 961e70
				psmi_sigaction(SIGBUS, &act, NULL);
Packit 961e70
			} else if (action_stash.SIGBUS_old_act.sa_sigaction == (void*) SIG_IGN) {
Packit 961e70
				return;
Packit 961e70
			} else {
Packit 961e70
				action_stash.SIGBUS_old_act.sa_sigaction(signo, siginfo, context);
Packit 961e70
			}
Packit 961e70
		} else {
Packit 961e70
			psmi_exit(signo);
Packit 961e70
		}
Packit 961e70
	}
Packit 961e70
}
Packit 961e70
Packit 961e70
/**
Packit 961e70
 * Create endpoint shared-memory object, containing ep's info
Packit 961e70
 * and message queues.
Packit 961e70
 */
Packit 961e70
psm2_error_t psmi_shm_create(ptl_t *ptl_gen)
Packit 961e70
{
Packit 961e70
	struct ptl_am *ptl = (struct ptl_am *)ptl_gen;
Packit 961e70
	psm2_ep_t ep = ptl->ep;
Packit 961e70
	char shmbuf[256];
Packit 961e70
	void *mapptr;
Packit 961e70
	size_t segsz;
Packit 961e70
	psm2_error_t err = PSM2_OK;
Packit 961e70
	int shmfd = -1;
Packit Service 7ed5cc
	char *amsh_keyname = NULL;
Packit 961e70
	int iterator;
Packit 961e70
	/* Get which kassist mode to use. */
Packit 961e70
	ptl->psmi_kassist_mode = psmi_get_kassist_mode();
Packit 961e70
Packit 961e70
	if (_HFI_PRDBG_ON) {
Packit 961e70
		_HFI_PRDBG_ALWAYS
Packit 961e70
			("kassist_mode %d %s use_kassist %d\n",
Packit 961e70
			ptl->psmi_kassist_mode,
Packit 961e70
			psmi_kassist_getmode(ptl->psmi_kassist_mode),
Packit 961e70
			(ptl->psmi_kassist_mode != PSMI_KASSIST_OFF));
Packit 961e70
	}
Packit 961e70
Packit 961e70
	segsz = am_ctl_sizeof_block();
Packit 961e70
	for (iterator = 0; iterator <= INT_MAX; iterator++) {
Packit 961e70
		snprintf(shmbuf,
Packit 961e70
			 sizeof(shmbuf),
Packit 961e70
			 "/psm2_shm.%ld%016lx%d",
Packit 961e70
			 (long int) getuid(),
Packit 961e70
			 ep->epid,
Packit 961e70
			 iterator);
Packit 961e70
		amsh_keyname = psmi_strdup(NULL, shmbuf);
Packit 961e70
		if (amsh_keyname == NULL) {
Packit 961e70
			err = PSM2_NO_MEMORY;
Packit 961e70
			goto fail;
Packit 961e70
		}
Packit 961e70
		shmfd =
Packit 961e70
		    shm_open(amsh_keyname, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
Packit 961e70
		if (shmfd < 0) {
Packit Service 7ed5cc
			psmi_free(amsh_keyname);
Packit Service 7ed5cc
			amsh_keyname = NULL;
Packit 961e70
			if (errno == EACCES && iterator < INT_MAX)
Packit 961e70
				continue;
Packit 961e70
			else {
Packit 961e70
				err = psmi_handle_error(NULL,
Packit 961e70
							PSM2_SHMEM_SEGMENT_ERR,
Packit 961e70
							"Error creating shared "
Packit 961e70
							"memory object in "
Packit 961e70
							"shm_open: %s",
Packit 961e70
							strerror(errno));
Packit 961e70
				goto fail;
Packit 961e70
			}
Packit 961e70
		} else {
Packit 961e70
			struct stat st;
Packit 961e70
			if (fstat(shmfd, &st) == -1) {
Packit 961e70
				err = psmi_handle_error(NULL,
Packit 961e70
							PSM2_SHMEM_SEGMENT_ERR,
Packit 961e70
							"Error validating "
Packit 961e70
							"shared memory object "
Packit 961e70
							"with fstat: %s",
Packit 961e70
							strerror(errno));
Packit 961e70
				goto fail;
Packit 961e70
			}
Packit 961e70
			if (getuid() == st.st_uid) {
Packit 961e70
				err = PSM2_OK;
Packit 961e70
				break;
Packit 961e70
			} else {
Packit 961e70
				err = PSM2_SHMEM_SEGMENT_ERR;
Packit 961e70
				close(shmfd);
Packit 961e70
			}
Packit 961e70
		}
Packit 961e70
	}
Packit 961e70
	if (err) {
Packit Service 7ed5cc
		if (amsh_keyname) psmi_free(amsh_keyname);
Packit 961e70
		err = psmi_handle_error(NULL,
Packit 961e70
					PSM2_SHMEM_SEGMENT_ERR,
Packit 961e70
					"Error creating shared memory object "
Packit 961e70
					"in shm_open: namespace exhausted.");
Packit 961e70
		goto fail;
Packit 961e70
	}
Packit 961e70
Packit 961e70
	/* Now register the atexit handler for cleanup, whether master or slave */
Packit 961e70
	atexit(amsh_atexit);
Packit 961e70
Packit 961e70
	_HFI_PRDBG("Opened shmfile %s\n", amsh_keyname);
Packit 961e70
Packit 961e70
	if (ftruncate(shmfd, segsz) != 0) {
Packit 961e70
		err = psmi_handle_error(NULL, PSM2_SHMEM_SEGMENT_ERR,
Packit 961e70
					"Error setting size of shared memory object to %u bytes in "
Packit 961e70
					"ftruncate: %s\n",
Packit 961e70
					(uint32_t) segsz,
Packit 961e70
					strerror(errno));
Packit 961e70
		goto fail;
Packit 961e70
	}
Packit 961e70
Packit 961e70
	mapptr = mmap(NULL, segsz,
Packit 961e70
		      PROT_READ | PROT_WRITE, MAP_SHARED, shmfd, 0);
Packit 961e70
	if (mapptr == MAP_FAILED) {
Packit 961e70
		err = psmi_handle_error(NULL, PSM2_SHMEM_SEGMENT_ERR,
Packit 961e70
					"Error mmapping shared memory: %s",
Packit 961e70
					strerror(errno));
Packit Service 7ed5cc
		psmi_free(amsh_keyname);
Packit 961e70
		goto fail;
Packit 961e70
	}
Packit 961e70
Packit 961e70
	memset((void *) mapptr, 0, segsz); /* touch all of my pages */
Packit 961e70
Packit 961e70
	/* Our own ep's info for ptl_am resides at the start of the
Packit 961e70
	   shm object.  Other processes need some of this info to
Packit 961e70
	   understand the rest of the queue structure and other details. */
Packit 961e70
	ptl->self_nodeinfo = (struct am_ctl_nodeinfo *) mapptr;
Packit 961e70
	ptl->amsh_keyname = amsh_keyname;
Packit 961e70
	ptl->self_nodeinfo->amsh_shmbase = (uintptr_t) mapptr;
Packit 961e70
Packit 961e70
fail:
Packit 961e70
	if (shmfd >= 0) close(shmfd);
Packit 961e70
	return err;
Packit 961e70
}
Packit 961e70
Packit 961e70
/*
 * Grow the endpoint directory ptl->am_ep by AMSH_DIRBLOCK_SIZE entries.
 * Existing entries are copied into a new 64-byte-aligned array, the added
 * tail is zeroed, and the old array is freed.
 *
 * @return PSM2_OK, or PSM2_NO_MEMORY with the directory left unchanged.
 */
psm2_error_t psmi_epdir_extend(ptl_t *ptl_gen)
{
	struct ptl_am *ptl = (struct ptl_am *)ptl_gen;
	struct am_ctl_nodeinfo *grown;

	grown = (struct am_ctl_nodeinfo *)
		psmi_memalign(ptl->ep, PER_PEER_ENDPOINT, 64,
			      (ptl->am_ep_size + AMSH_DIRBLOCK_SIZE) *
			      sizeof(struct am_ctl_nodeinfo));
	if (!grown)
		return PSM2_NO_MEMORY;

	/* carry over the populated prefix */
	memcpy(grown, ptl->am_ep,
	       ptl->am_ep_size * sizeof(struct am_ctl_nodeinfo));

	/* the newly added block starts out empty (epid == 0) */
	memset(grown + ptl->am_ep_size, 0,
	       AMSH_DIRBLOCK_SIZE * sizeof(struct am_ctl_nodeinfo));

	psmi_free(ptl->am_ep);
	ptl->am_ep = grown;
	ptl->am_ep_size += AMSH_DIRBLOCK_SIZE;

	return PSM2_OK;
}
Packit 961e70
Packit 961e70
/**
Packit 961e70
 * Unmap shm regions upon proper disconnect with other processes
Packit 961e70
 */
Packit 961e70
psm2_error_t psmi_do_unmap(uintptr_t shmbase)
Packit 961e70
{
Packit 961e70
	psm2_error_t err = PSM2_OK;
Packit 961e70
	if (munmap((void *)shmbase, am_ctl_sizeof_block())) {
Packit 961e70
		err =
Packit 961e70
		    psmi_handle_error(NULL, PSM2_SHMEM_SEGMENT_ERR,
Packit 961e70
				      "Error with munmap of shared segment: %s",
Packit 961e70
				      strerror(errno));
Packit 961e70
	}
Packit 961e70
	return err;
Packit 961e70
}
Packit 961e70
Packit 961e70
/**
Packit 961e70
 * Map a remote process' shared memory object.
Packit 961e70
 *
Packit 961e70
 * If the remote process has a shared memory object available, add it to our own
Packit 961e70
 * directory and return the shmidx.  If the shared memory object does not exist,
Packit 961e70
 * return -1, and the connect poll function will try to map again later.
Packit 961e70
 *
Packit 961e70
 * If force_remap is true, then clear the entry that matches the epid.
Packit 961e70
 */
Packit 961e70
psm2_error_t psmi_shm_map_remote(ptl_t *ptl_gen, psm2_epid_t epid, uint16_t *shmidx_o, int force_remap)
Packit 961e70
{
Packit 961e70
	struct ptl_am *ptl = (struct ptl_am *)ptl_gen;
Packit 961e70
	int i;
Packit 961e70
	int use_kassist;
Packit 961e70
	uint16_t shmidx;
Packit 961e70
	char shmbuf[256];
Packit 961e70
	void *dest_mapptr;
Packit 961e70
	size_t segsz;
Packit 961e70
	psm2_error_t err = PSM2_OK;
Packit 961e70
	int dest_shmfd;
Packit 961e70
	struct am_ctl_nodeinfo *dest_nodeinfo;
Packit 961e70
	int iterator;
Packit 961e70
Packit 961e70
	shmidx = *shmidx_o = -1;
Packit 961e70
Packit 961e70
	for (i = 0; i <= ptl->max_ep_idx; i++) {
Packit 961e70
		if (ptl->am_ep[i].epid == epid) {
Packit 961e70
			if (force_remap) {
Packit 961e70
				ptl->am_ep[i].epaddr = NULL;
Packit 961e70
				ptl->am_ep[i].epid = 0;
Packit 961e70
				break;
Packit 961e70
			}
Packit 961e70
			*shmidx_o = shmidx = i;
Packit 961e70
			return err;
Packit 961e70
		}
Packit 961e70
	}
Packit 961e70
Packit 961e70
Packit 961e70
	use_kassist = (ptl->psmi_kassist_mode != PSMI_KASSIST_OFF);
Packit 961e70
Packit 961e70
	segsz = am_ctl_sizeof_block();
Packit 961e70
	for (iterator = 0; iterator <= INT_MAX; iterator++) {
Packit 961e70
		snprintf(shmbuf,
Packit 961e70
			 sizeof(shmbuf),
Packit 961e70
			 "/psm2_shm.%ld%016lx%d",
Packit 961e70
			 (long int) getuid(),
Packit 961e70
			 epid,
Packit 961e70
			 iterator);
Packit 961e70
		dest_shmfd = shm_open(shmbuf, O_RDWR, S_IRWXU);
Packit 961e70
		if (dest_shmfd < 0) {
Packit 961e70
			if (errno == EACCES && iterator < INT_MAX)
Packit 961e70
				continue;
Packit 961e70
			else {
Packit 961e70
				err = psmi_handle_error(NULL,
Packit 961e70
							PSM2_SHMEM_SEGMENT_ERR,
Packit 961e70
							"Error opening remote "
Packit 961e70
							"shared memory object "
Packit 961e70
							"in shm_open: %s",
Packit 961e70
							strerror(errno));
Packit 961e70
				goto fail;
Packit 961e70
			}
Packit 961e70
		} else {
Packit 961e70
			struct stat st;
Packit 961e70
			if (fstat(dest_shmfd, &st) == -1) {
Packit 961e70
				err = psmi_handle_error(NULL,
Packit 961e70
							PSM2_SHMEM_SEGMENT_ERR,
Packit 961e70
							"Error validating "
Packit 961e70
							"shared memory object "
Packit 961e70
							"with fstat: %s",
Packit 961e70
							strerror(errno));
Packit Service 7ed5cc
				close(dest_shmfd);
Packit 961e70
				goto fail;
Packit 961e70
			}
Packit 961e70
			if (getuid() == st.st_uid) {
Packit 961e70
				err = PSM2_OK;
Packit 961e70
				break;
Packit 961e70
			} else {
Packit 961e70
				err = PSM2_SHMEM_SEGMENT_ERR;
Packit 961e70
				close(dest_shmfd);
Packit 961e70
			}
Packit 961e70
		}
Packit 961e70
	}
Packit 961e70
	if (err) {
Packit 961e70
		err = psmi_handle_error(NULL,
Packit 961e70
					PSM2_SHMEM_SEGMENT_ERR,
Packit 961e70
					"Error opening remote shared "
Packit 961e70
					"memory object in shm_open: "
Packit 961e70
					"namespace exhausted.");
Packit 961e70
		goto fail;
Packit 961e70
	}
Packit 961e70
Packit 961e70
	dest_mapptr = mmap(NULL, segsz,
Packit 961e70
		      PROT_READ | PROT_WRITE, MAP_SHARED, dest_shmfd, 0);
Packit 961e70
	if (dest_mapptr == MAP_FAILED) {
Packit 961e70
		err = psmi_handle_error(NULL, PSM2_SHMEM_SEGMENT_ERR,
Packit 961e70
					"Error mmapping remote shared memory: %s",
Packit 961e70
					strerror(errno));
Packit Service 7ed5cc
		close(dest_shmfd);
Packit 961e70
		goto fail;
Packit 961e70
	}
Packit 961e70
	close(dest_shmfd);
Packit 961e70
	dest_nodeinfo = (struct am_ctl_nodeinfo *)dest_mapptr;
Packit 961e70
Packit 961e70
	/* We core dump right after here if we don't check the mmap */
Packit 961e70
	action_stash.addr = dest_mapptr;
Packit 961e70
	action_stash.len = segsz;
Packit 961e70
Packit 961e70
	struct sigaction act = { .sa_sigaction = amsh_mmap_fault, .sa_flags = SA_SIGINFO };
Packit 961e70
Packit 961e70
	sigaction(SIGSEGV, &act, &action_stash.SIGSEGV_old_act);
Packit 961e70
	sigaction(SIGBUS, &act, &action_stash.SIGBUS_old_act);
Packit 961e70
Packit 961e70
	{
Packit 961e70
		volatile uint16_t *is_init = &dest_nodeinfo->is_init;
Packit 961e70
		while (*is_init == 0)
Packit 961e70
			usleep(1);
Packit 961e70
		ips_sync_reads();
Packit 961e70
		_HFI_PRDBG("Got a published remote dirpage page at "
Packit 961e70
			   "%p, size=%dn", dest_mapptr, (int)segsz);
Packit 961e70
	}
Packit 961e70
Packit 961e70
	shmidx = -1;
Packit 961e70
	if ((ptl->max_ep_idx + 1) == ptl->am_ep_size) {
Packit 961e70
		err = psmi_epdir_extend(ptl_gen);
Packit 961e70
		if (err)
Packit 961e70
			goto fail;
Packit 961e70
Packit 961e70
		for (i = 0; i <= ptl->max_ep_idx; i++) {
Packit 961e70
			if (ptl->am_ep[i].epid != 0)
Packit 961e70
				am_update_directory(&ptl->am_ep[i]);
Packit 961e70
		}
Packit 961e70
	}
Packit 961e70
	for (i = 0; i < ptl->am_ep_size; i++) {
Packit 961e70
		psmi_assert(ptl->am_ep[i].epid != epid);
Packit 961e70
		if (ptl->am_ep[i].epid == 0) {
Packit 961e70
			ptl->am_ep[i].epid = epid;
Packit 961e70
			ptl->am_ep[i].psm_verno = dest_nodeinfo->psm_verno;
Packit 961e70
			ptl->am_ep[i].pid = dest_nodeinfo->pid;
Packit 961e70
			if (use_kassist) {
Packit 961e70
				/* If we are able to use CMA assume everyone
Packit 961e70
				 * else on the node can also use it.
Packit 961e70
				 * Advertise that CMA is active via the
Packit 961e70
				 * feature flag.
Packit 961e70
				 */
Packit 961e70
Packit 961e70
				if (cma_available()) {
Packit 961e70
					ptl->am_ep[i].amsh_features |=
Packit 961e70
					    AMSH_HAVE_CMA;
Packit 961e70
					psmi_shm_mq_rv_thresh =
Packit 961e70
					    PSMI_MQ_RV_THRESH_CMA;
Packit 961e70
				} else {
Packit 961e70
					ptl->psmi_kassist_mode =
Packit 961e70
					    PSMI_KASSIST_OFF;
Packit 961e70
					use_kassist = 0;
Packit 961e70
					psmi_shm_mq_rv_thresh =
Packit 961e70
					    PSMI_MQ_RV_THRESH_NO_KASSIST;
Packit 961e70
				}
Packit 961e70
			} else
Packit 961e70
				psmi_shm_mq_rv_thresh =
Packit 961e70
				    PSMI_MQ_RV_THRESH_NO_KASSIST;
Packit 961e70
			_HFI_PRDBG("KASSIST MODE: %s\n",
Packit 961e70
				   psmi_kassist_getmode(ptl->psmi_kassist_mode));
Packit 961e70
			shmidx = *shmidx_o = i;
Packit 961e70
			_HFI_PRDBG("Mapped epid %lx into shmidx %d\n", epid, shmidx);
Packit 961e70
			ptl->am_ep[i].amsh_shmbase = (uintptr_t) dest_mapptr;
Packit 961e70
			ptl->am_ep[i].amsh_qsizes = dest_nodeinfo->amsh_qsizes;
Packit 961e70
			if (i > ptl->max_ep_idx)
Packit 961e70
				ptl->max_ep_idx = i;
Packit 961e70
			break;
Packit 961e70
		}
Packit 961e70
	}
Packit 961e70
Packit 961e70
	/* install the old sighandler back */
Packit 961e70
	sigaction(SIGSEGV, &action_stash.SIGSEGV_old_act, NULL);
Packit 961e70
	sigaction(SIGBUS, &action_stash.SIGBUS_old_act, NULL);
Packit 961e70
Packit 961e70
	if (shmidx == (uint16_t)-1)
Packit 961e70
		err = psmi_handle_error(NULL, PSM2_SHMEM_SEGMENT_ERR,
Packit Service 7ed5cc
					"Could not connect to local endpoint");
Packit Service 7ed5cc
fail:
Packit 961e70
	return err;
Packit 961e70
}
Packit 961e70
Packit 961e70
/**
Packit 961e70
 * Initialize pointer structure and locks for endpoint shared-memory AM.
Packit 961e70
 */
Packit 961e70
Packit 961e70
#define AMSH_QSIZE(type)                                                \
Packit 961e70
	PSMI_ALIGNUP(amsh_qelemsz.q ## type * amsh_qcounts.q ## type,   \
Packit 961e70
		     PSMI_PAGESIZE)
Packit 961e70
Packit 961e70
static psm2_error_t amsh_init_segment(ptl_t *ptl_gen)
Packit 961e70
{
Packit 961e70
	struct ptl_am *ptl = (struct ptl_am *)ptl_gen;
Packit 961e70
	psm2_error_t err = PSM2_OK;
Packit 961e70
Packit 961e70
	/* Preconditions */
Packit 961e70
	psmi_assert_always(ptl != NULL);
Packit 961e70
	psmi_assert_always(ptl->ep != NULL);
Packit 961e70
	psmi_assert_always(ptl->epaddr != NULL);
Packit 961e70
	psmi_assert_always(ptl->ep->epid != 0);
Packit 961e70
Packit 961e70
	if ((err = psmi_shm_create(ptl_gen)))
Packit 961e70
		goto fail;
Packit 961e70
Packit 961e70
	ptl->self_nodeinfo->amsh_qsizes.qreqFifoShort = AMSH_QSIZE(reqFifoShort);
Packit 961e70
	ptl->self_nodeinfo->amsh_qsizes.qreqFifoLong = AMSH_QSIZE(reqFifoLong);
Packit 961e70
	ptl->self_nodeinfo->amsh_qsizes.qrepFifoShort = AMSH_QSIZE(repFifoShort);
Packit 961e70
	ptl->self_nodeinfo->amsh_qsizes.qrepFifoLong = AMSH_QSIZE(repFifoLong);
Packit 961e70
Packit 961e70
	/* We core dump right after here if we don't check the mmap */
Packit 961e70
Packit Service 7ed5cc
	struct sigaction act = {
Packit Service 7ed5cc
		.sa_sigaction = amsh_mmap_fault,
Packit Service 7ed5cc
		.sa_flags = SA_SIGINFO
Packit Service 7ed5cc
	};
Packit 961e70
Packit 961e70
	sigaction(SIGSEGV, &act, &action_stash.SIGSEGV_old_act);
Packit 961e70
	sigaction(SIGBUS, &act, &action_stash.SIGBUS_old_act);
Packit 961e70
Packit 961e70
	/*
Packit 961e70
	 * Now that we know our epid, update it in the shmidx array
Packit 961e70
	 */
Packit 961e70
	ptl->reqH.base = ptl->reqH.head = ptl->reqH.end = NULL;
Packit 961e70
	ptl->repH.base = ptl->repH.head = ptl->repH.end = NULL;
Packit 961e70
Packit 961e70
	am_update_directory(ptl->self_nodeinfo);
Packit 961e70
Packit 961e70
	ptl->reqH.head = ptl->reqH.base = (am_pkt_short_t *)
Packit 961e70
		(((uintptr_t)ptl->self_nodeinfo->qdir.qreqFifoShort));
Packit 961e70
	ptl->reqH.end = (am_pkt_short_t *)
Packit 961e70
		(((uintptr_t)ptl->self_nodeinfo->qdir.qreqFifoShort) +
Packit 961e70
		 amsh_qcounts.qreqFifoShort * amsh_qelemsz.qreqFifoShort);
Packit 961e70
Packit 961e70
	ptl->repH.head = ptl->repH.base = (am_pkt_short_t *)
Packit 961e70
		(((uintptr_t)ptl->self_nodeinfo->qdir.qrepFifoShort));
Packit 961e70
	ptl->repH.end = (am_pkt_short_t *)
Packit 961e70
		(((uintptr_t)ptl->self_nodeinfo->qdir.qrepFifoShort) +
Packit 961e70
		 amsh_qcounts.qrepFifoShort * amsh_qelemsz.qrepFifoShort);
Packit 961e70
Packit 961e70
	am_ctl_qhdr_init(&ptl->self_nodeinfo->qdir.qreqH->shortq,
Packit 961e70
			 amsh_qcounts.qreqFifoShort,
Packit 961e70
			 amsh_qelemsz.qreqFifoShort);
Packit 961e70
	am_ctl_qhdr_init(&ptl->self_nodeinfo->qdir.qreqH->longbulkq,
Packit 961e70
			 amsh_qcounts.qreqFifoLong, amsh_qelemsz.qreqFifoLong);
Packit 961e70
	am_ctl_qhdr_init(&ptl->self_nodeinfo->qdir.qrepH->shortq,
Packit 961e70
			 amsh_qcounts.qrepFifoShort,
Packit 961e70
			 amsh_qelemsz.qrepFifoShort);
Packit 961e70
	am_ctl_qhdr_init(&ptl->self_nodeinfo->qdir.qrepH->longbulkq,
Packit 961e70
			 amsh_qcounts.qrepFifoLong, amsh_qelemsz.qrepFifoLong);
Packit 961e70
Packit 961e70
	/* Set bulkidx in every bulk packet */
Packit 961e70
	am_ctl_bulkpkt_init(ptl->self_nodeinfo->qdir.qreqFifoLong,
Packit 961e70
			    amsh_qelemsz.qreqFifoLong,
Packit 961e70
			    amsh_qcounts.qreqFifoLong);
Packit 961e70
	am_ctl_bulkpkt_init(ptl->self_nodeinfo->qdir.qrepFifoLong,
Packit 961e70
			    amsh_qelemsz.qrepFifoLong,
Packit 961e70
			    amsh_qcounts.qrepFifoLong);
Packit 961e70
Packit 961e70
	/* install the old sighandler back */
Packit 961e70
	sigaction(SIGSEGV, &action_stash.SIGSEGV_old_act, NULL);
Packit 961e70
	sigaction(SIGBUS, &action_stash.SIGBUS_old_act, NULL);
Packit 961e70
Packit 961e70
fail:
Packit 961e70
	return err;
Packit 961e70
}
Packit 961e70
Packit 961e70
/**
 * Tear down this endpoint's own shared-memory segment: unlink its name,
 * free the stored key name and unmap the segment.
 *
 * @param ptl_gen The amsh PTL.
 * @return PSM2_OK on success or when no segment exists; the
 *         PSM2_SHMEM_SEGMENT_ERR result of psmi_do_unmap() if munmap fails,
 *         in which case ptl->self_nodeinfo is left set.
 */
psm2_error_t psmi_shm_detach(ptl_t *ptl_gen)
{
	struct ptl_am *ptl = (struct ptl_am *)ptl_gen;
	psm2_error_t err;
	uintptr_t shmbase;

	if (ptl->self_nodeinfo == NULL)
		return PSM2_OK;

	_HFI_VDBG("unlinking shm file %s\n", ptl->amsh_keyname + 1);
	shmbase = ptl->self_nodeinfo->amsh_shmbase;
	shm_unlink(ptl->amsh_keyname);
	psmi_free(ptl->amsh_keyname);
	/*
	 * amsh_atexit() passes amsh_keyname to shm_unlink() while
	 * self_nodeinfo is non-NULL (which it stays if munmap fails below);
	 * never leave it pointing at freed memory.
	 */
	ptl->amsh_keyname = NULL;

	err = psmi_do_unmap(shmbase);
	if (err != PSM2_OK)
		return err;

	ptl->self_nodeinfo = NULL;
	return PSM2_OK;
}
Packit 961e70
Packit 961e70
/**
Packit 961e70
 * Update locally shared-pointer directory.  The directory must be
Packit 961e70
 * updated when a new epaddr is connected to or on every epaddr already
Packit 961e70
 * connected to whenever the shared memory segment is relocated via mremap.
Packit 961e70
 *
Packit 961e70
 * @param epaddr Endpoint address for which to update local directory.
Packit 961e70
 */
Packit 961e70
Packit 961e70
static
Packit 961e70
void am_update_directory(struct am_ctl_nodeinfo *nodeinfo)
Packit 961e70
{
Packit 961e70
	uintptr_t base_this;
Packit 961e70
Packit 961e70
	base_this = nodeinfo->amsh_shmbase +
Packit 961e70
		AMSH_BLOCK_HEADER_SIZE;
Packit 961e70
Packit 961e70
	/* Request queues */
Packit 961e70
	nodeinfo->qdir.qreqH = (am_ctl_blockhdr_t *) base_this;
Packit 961e70
	nodeinfo->qdir.qreqFifoShort = (am_pkt_short_t *)
Packit 961e70
	    ((uintptr_t) nodeinfo->qdir.qreqH +
Packit 961e70
	     PSMI_ALIGNUP(sizeof(am_ctl_blockhdr_t), PSMI_PAGESIZE));
Packit 961e70
Packit 961e70
	nodeinfo->qdir.qreqFifoLong = (am_pkt_bulk_t *)
Packit 961e70
	    ((uintptr_t) nodeinfo->qdir.qreqFifoShort +
Packit 961e70
	     nodeinfo->amsh_qsizes.qreqFifoShort);
Packit 961e70
Packit 961e70
	/* Reply queues */
Packit 961e70
	nodeinfo->qdir.qrepH = (am_ctl_blockhdr_t *)
Packit 961e70
	    ((uintptr_t) nodeinfo->qdir.qreqFifoLong +
Packit 961e70
	     nodeinfo->amsh_qsizes.qreqFifoLong);
Packit 961e70
Packit 961e70
	nodeinfo->qdir.qrepFifoShort = (am_pkt_short_t *)
Packit 961e70
	    ((uintptr_t) nodeinfo->qdir.qrepH +
Packit 961e70
	     PSMI_ALIGNUP(sizeof(am_ctl_blockhdr_t), PSMI_PAGESIZE));
Packit 961e70
	nodeinfo->qdir.qrepFifoLong = (am_pkt_bulk_t *)
Packit 961e70
	    ((uintptr_t) nodeinfo->qdir.qrepFifoShort +
Packit 961e70
	     nodeinfo->amsh_qsizes.qrepFifoShort);
Packit 961e70
Packit 961e70
	_HFI_VDBG("epaddr=%p Request Hdr=%p,Pkt=%p,Long=%p\n",
Packit 961e70
		  nodeinfo->epaddr,
Packit 961e70
		  nodeinfo->qdir.qreqH,
Packit 961e70
		  nodeinfo->qdir.qreqFifoShort,
Packit 961e70
		  nodeinfo->qdir.qreqFifoLong);
Packit 961e70
	_HFI_VDBG("epaddr=%p Reply   Hdr=%p,Pkt=%p,Long=%p\n",
Packit 961e70
		  nodeinfo->epaddr,
Packit 961e70
		  nodeinfo->qdir.qrepH,
Packit 961e70
		  nodeinfo->qdir.qrepFifoShort,
Packit 961e70
		  nodeinfo->qdir.qrepFifoLong);
Packit 961e70
Packit 961e70
	/* Sanity check */
Packit 961e70
	uintptr_t base_next =
Packit 961e70
	    (uintptr_t) nodeinfo->qdir.qrepFifoLong +
Packit 961e70
	    nodeinfo->amsh_qsizes.qrepFifoLong;
Packit 961e70
Packit 961e70
	psmi_assert_always(base_next - base_this <= am_ctl_sizeof_block());
Packit 961e70
}
Packit 961e70
Packit 961e70
Packit 961e70
/* ep_epid_share_memory wrapper */
Packit 961e70
static
Packit 961e70
int amsh_epid_reachable(ptl_t *ptl_gen, psm2_epid_t epid)
Packit 961e70
{
Packit 961e70
	struct ptl_am *ptl = (struct ptl_am *)ptl_gen;
Packit 961e70
	int result;
Packit 961e70
	psm2_error_t err;
Packit 961e70
	err = psm2_ep_epid_share_memory(ptl->ep, epid, &result);
Packit 961e70
	psmi_assert_always(err == PSM2_OK);
Packit 961e70
	return result;
Packit 961e70
}
Packit 961e70
Packit 961e70
static
Packit 961e70
psm2_error_t
Packit 961e70
amsh_epaddr_add(ptl_t *ptl_gen, psm2_epid_t epid, uint16_t shmidx, psm2_epaddr_t *epaddr_o)
Packit 961e70
{
Packit 961e70
	struct ptl_am *ptl = (struct ptl_am *)ptl_gen;
Packit 961e70
	psm2_epaddr_t epaddr;
Packit 961e70
	am_epaddr_t *amaddr;
Packit 961e70
	psm2_error_t err = PSM2_OK;
Packit 961e70
Packit 961e70
	psmi_assert(psmi_epid_lookup(ptl->ep, epid) == NULL);
Packit 961e70
Packit 961e70
	/* The self PTL handles loopback communication. */
Packit 961e70
	psmi_assert(epid != ptl->epid);
Packit 961e70
Packit 961e70
	/* note the size of the memory is am_epaddr_t */
Packit 961e70
	epaddr = (psm2_epaddr_t) psmi_calloc(ptl->ep,
Packit 961e70
					    PER_PEER_ENDPOINT, 1,
Packit 961e70
					    sizeof(am_epaddr_t));
Packit 961e70
	if (epaddr == NULL) {
Packit 961e70
		return PSM2_NO_MEMORY;
Packit 961e70
	}
Packit 961e70
	psmi_assert_always(ptl->am_ep[shmidx].epaddr == NULL);
Packit 961e70
Packit 961e70
	if ((err = psmi_epid_set_hostname(psm2_epid_nid(epid),
Packit 961e70
					  psmi_gethostname(), 0)))
Packit 961e70
		goto fail;
Packit 961e70
Packit 961e70
	epaddr->ptlctl = ptl->ctl;
Packit 961e70
	epaddr->epid = epid;
Packit 961e70
Packit 961e70
	/* convert to am_epaddr_t */
Packit 961e70
	amaddr = (am_epaddr_t *) epaddr;
Packit 961e70
	/* tell the other endpoint their location in our directory */
Packit 961e70
	amaddr->shmidx = shmidx;
Packit 961e70
	/* we haven't connected yet, so we can't give them the same hint */
Packit 961e70
	amaddr->return_shmidx = -1;
Packit 961e70
	amaddr->cstate_outgoing = AMSH_CSTATE_OUTGOING_NONE;
Packit 961e70
	amaddr->cstate_incoming = AMSH_CSTATE_INCOMING_NONE;
Packit 961e70
Packit 961e70
	/* other setup */
Packit 961e70
	ptl->am_ep[shmidx].epaddr = epaddr;
Packit 961e70
	am_update_directory(&ptl->am_ep[shmidx]);
Packit 961e70
	/* Finally, add to table */
Packit 961e70
	if ((err = psmi_epid_add(ptl->ep, epid, epaddr)))
Packit 961e70
		goto fail;
Packit 961e70
	_HFI_VDBG("epaddr=%s added to ptl=%p\n",
Packit 961e70
		  psmi_epaddr_get_name(epid), ptl);
Packit 961e70
	*epaddr_o = epaddr;
Packit 961e70
	return PSM2_OK;
Packit 961e70
fail:
Packit 961e70
	if (epaddr != ptl->epaddr)
Packit 961e70
		psmi_free(epaddr);
Packit 961e70
	return err;
Packit 961e70
}
Packit 961e70
Packit 961e70
static
Packit 961e70
void
Packit 961e70
amsh_epaddr_update(ptl_t *ptl_gen, psm2_epaddr_t epaddr)
Packit 961e70
{
Packit 961e70
	struct ptl_am *ptl = (struct ptl_am *)ptl_gen;
Packit 961e70
	am_epaddr_t *amaddr;
Packit 961e70
	uint16_t shmidx;
Packit 961e70
	struct am_ctl_nodeinfo *nodeinfo;
Packit 961e70
Packit 961e70
	amaddr = (am_epaddr_t *) epaddr;
Packit 961e70
	shmidx = amaddr->shmidx;
Packit 961e70
	nodeinfo = (struct am_ctl_nodeinfo *) ptl->am_ep[shmidx].amsh_shmbase;
Packit 961e70
Packit 961e70
	/* restart the connection process */
Packit 961e70
	amaddr->return_shmidx = -1;
Packit 961e70
	amaddr->cstate_outgoing = AMSH_CSTATE_OUTGOING_NONE;
Packit 961e70
Packit 961e70
	/* wait for the other process to init again */
Packit 961e70
	{
Packit 961e70
		volatile uint16_t *is_init = &nodeinfo->is_init;
Packit 961e70
		while (*is_init == 0)
Packit 961e70
			usleep(1);
Packit 961e70
		ips_sync_reads();
Packit 961e70
	}
Packit 961e70
Packit 961e70
	/* get the updated values from the new nodeinfo page */
Packit 961e70
	ptl->am_ep[shmidx].psm_verno = nodeinfo->psm_verno;
Packit 961e70
	ptl->am_ep[shmidx].pid = nodeinfo->pid;
Packit 961e70
	ptl->am_ep[shmidx].amsh_qsizes = nodeinfo->amsh_qsizes;
Packit 961e70
	am_update_directory(&ptl->am_ep[shmidx]);
Packit 961e70
	return;
Packit 961e70
}
Packit 961e70
Packit 961e70
struct ptl_connection_req {
Packit 961e70
	int isdone;
Packit 961e70
	int op;			/* connect or disconnect */
Packit 961e70
	int numep;
Packit 961e70
	int numep_left;
Packit 961e70
	int phase;
Packit 961e70
Packit 961e70
	int *epid_mask;
Packit 961e70
	const psm2_epid_t *epids;	/* input epid list */
Packit 961e70
	psm2_epaddr_t *epaddr;
Packit 961e70
	psm2_error_t *errors;	/* inout errors */
Packit 961e70
Packit 961e70
	/* Used for connect/disconnect */
Packit 961e70
	psm2_amarg_t args[4];
Packit 961e70
};
Packit 961e70
Packit 961e70
static
Packit 961e70
void amsh_free_epaddr(psm2_epaddr_t epaddr)
Packit 961e70
{
Packit 961e70
	psmi_epid_remove(epaddr->ptlctl->ep, epaddr->epid);
Packit 961e70
	psmi_free(epaddr);
Packit 961e70
	return;
Packit 961e70
}
Packit 961e70
Packit 961e70
/* Operation codes carried in ptl_connection_req.op. */
#define PTL_OP_CONNECT		0
#define PTL_OP_DISCONNECT	1
#define PTL_OP_ABORT		2
Packit 961e70
Packit 961e70
static
Packit 961e70
psm2_error_t
Packit 961e70
amsh_ep_connreq_init(ptl_t *ptl_gen, int op, /* connect, disconnect or abort */
Packit 961e70
		     int numep, const psm2_epid_t *array_of_epid, /* non-NULL on connect */
Packit 961e70
		     const int array_of_epid_mask[],
Packit 961e70
		     psm2_error_t *array_of_errors,
Packit 961e70
		     psm2_epaddr_t *array_of_epaddr,
Packit 961e70
		     struct ptl_connection_req **req_o)
Packit 961e70
{
Packit 961e70
	struct ptl_am *ptl = (struct ptl_am *)ptl_gen;
Packit 961e70
	int i, cstate;
Packit 961e70
	psm2_epaddr_t epaddr;
Packit 961e70
	psm2_epid_t epid;
Packit 961e70
	struct ptl_connection_req *req = NULL;
Packit 961e70
Packit 961e70
	req = (struct ptl_connection_req *)
Packit 961e70
	    psmi_calloc(ptl->ep, PER_PEER_ENDPOINT, 1,
Packit 961e70
			sizeof(struct ptl_connection_req));
Packit 961e70
	if (req == NULL)
Packit 961e70
		return PSM2_NO_MEMORY;
Packit 961e70
	req->isdone = 0;
Packit 961e70
	req->op = op;
Packit 961e70
	req->numep = numep;
Packit 961e70
	req->numep_left = 0;
Packit 961e70
	req->phase = ptl->connect_phase;
Packit 961e70
	req->epid_mask = (int *)
Packit 961e70
	    psmi_calloc(ptl->ep, PER_PEER_ENDPOINT, numep, sizeof(int));
Packit 961e70
	if (req->epid_mask == NULL) {
Packit 961e70
		psmi_free(req);
Packit 961e70
		return PSM2_NO_MEMORY;
Packit 961e70
	}
Packit 961e70
	req->epaddr = array_of_epaddr;
Packit 961e70
	req->epids = array_of_epid;
Packit 961e70
	req->errors = array_of_errors;
Packit 961e70
Packit 961e70
	/* First check if there's really something to connect/disconnect
Packit 961e70
	 * for this PTL */
Packit 961e70
	for (i = 0; i < numep; i++) {
Packit 961e70
		req->epid_mask[i] = AMSH_CMASK_NONE;	/* no connect by default */
Packit 961e70
		if (!array_of_epid_mask[i])
Packit 961e70
			continue;
Packit 961e70
		if (op == PTL_OP_CONNECT) {
Packit 961e70
			epid = array_of_epid[i];
Packit 961e70
Packit 961e70
			/* Connect only to other processes reachable by shared memory.
Packit 961e70
			   The self PTL handles loopback communication, so explicitly
Packit 961e70
			   refuse to connect to self. */
Packit 961e70
			if (!amsh_epid_reachable(ptl_gen, epid)
Packit 961e70
			    || epid == ptl->epid) {
Packit 961e70
				array_of_errors[i] = PSM2_EPID_UNREACHABLE;
Packit 961e70
				array_of_epaddr[i] = NULL;
Packit 961e70
				continue;
Packit 961e70
			}
Packit 961e70
Packit 961e70
			_HFI_VDBG("looking at epid %llx\n",
Packit 961e70
				  (unsigned long long)epid);
Packit 961e70
			epaddr = psmi_epid_lookup(ptl->ep, epid);
Packit 961e70
			if (epaddr != NULL) {
Packit 961e70
				if (epaddr->ptlctl->ptl != ptl_gen) {
Packit 961e70
					array_of_errors[i] =
Packit 961e70
					    PSM2_EPID_UNREACHABLE;
Packit 961e70
					array_of_epaddr[i] = NULL;
Packit 961e70
					continue;
Packit 961e70
				}
Packit 961e70
				cstate = ((am_epaddr_t *) epaddr)->cstate_outgoing;
Packit 961e70
				if (cstate == AMSH_CSTATE_OUTGOING_ESTABLISHED) {
Packit 961e70
					array_of_epaddr[i] = epaddr;
Packit 961e70
					array_of_errors[i] = PSM2_OK;
Packit 961e70
				} else {
Packit 961e70
					psmi_assert(cstate ==
Packit 961e70
						    AMSH_CSTATE_OUTGOING_NONE);
Packit 961e70
					array_of_errors[i] = PSM2_TIMEOUT;
Packit 961e70
					array_of_epaddr[i] = epaddr;
Packit 961e70
					req->epid_mask[i] = AMSH_CMASK_PREREQ;
Packit 961e70
				}
Packit 961e70
			} else {
Packit 961e70
				req->epid_mask[i] = AMSH_CMASK_PREREQ;
Packit 961e70
				array_of_epaddr[i] = NULL;
Packit 961e70
			}
Packit 961e70
		} else {	/* disc or abort */
Packit 961e70
			epaddr = array_of_epaddr[i];
Packit 961e70
			if (epaddr->ptlctl->ptl != ptl_gen)
Packit 961e70
				continue;
Packit 961e70
Packit 961e70
			psmi_assert(epaddr != NULL);
Packit 961e70
			cstate = ((am_epaddr_t *) epaddr)->cstate_outgoing;
Packit 961e70
			if (cstate == AMSH_CSTATE_OUTGOING_ESTABLISHED) {
Packit 961e70
				req->epid_mask[i] = AMSH_CMASK_PREREQ;
Packit 961e70
				_HFI_VDBG
Packit 961e70
				    ("Just set index %d to AMSH_CMASK_PREREQ\n",
Packit 961e70
				     i);
Packit 961e70
			}
Packit 961e70
			/* XXX undef ? */
Packit 961e70
		}
Packit 961e70
		if (req->epid_mask[i] != AMSH_CMASK_NONE)
Packit 961e70
			req->numep_left++;
Packit 961e70
	}
Packit 961e70
Packit 961e70
	if (req->numep_left == 0) {	/* nothing to do */
Packit 961e70
		psmi_free(req->epid_mask);
Packit 961e70
		psmi_free(req);
Packit 961e70
		_HFI_VDBG("Nothing to connect, bump up phase\n");
Packit 961e70
		ptl->connect_phase++;
Packit 961e70
		*req_o = NULL;
Packit 961e70
		return PSM2_OK;
Packit 961e70
	} else {
Packit 961e70
		*req_o = req;
Packit 961e70
		return PSM2_OK_NO_PROGRESS;
Packit 961e70
	}
Packit 961e70
}
Packit 961e70
Packit 961e70
static
Packit 961e70
psm2_error_t
Packit 961e70
amsh_ep_connreq_poll(ptl_t *ptl_gen, struct ptl_connection_req *req)
Packit 961e70
{
Packit 961e70
	struct ptl_am *ptl = (struct ptl_am *)ptl_gen;
Packit 961e70
	int i, j, cstate;
Packit 961e70
	uint16_t shmidx = (uint16_t)-1;
Packit 961e70
	psm2_error_t err = PSM2_OK;
Packit 961e70
	psm2_epid_t epid;
Packit 961e70
	psm2_epaddr_t epaddr;
Packit 961e70
Packit 961e70
	if (req == NULL || req->isdone)
Packit 961e70
		return PSM2_OK;
Packit 961e70
Packit 961e70
	psmi_assert_always(ptl->connect_phase == req->phase);
Packit 961e70
Packit 961e70
	if (req->op == PTL_OP_DISCONNECT || req->op == PTL_OP_ABORT) {
Packit 961e70
		for (i = 0; i < req->numep; i++) {
Packit 961e70
			if (req->epid_mask[i] == AMSH_CMASK_NONE ||
Packit 961e70
			    req->epid_mask[i] == AMSH_CMASK_DONE)
Packit 961e70
				continue;
Packit 961e70
Packit 961e70
			epaddr = req->epaddr[i];
Packit 961e70
			psmi_assert(epaddr != NULL);
Packit 961e70
			if (req->epid_mask[i] == AMSH_CMASK_PREREQ) {
Packit 961e70
				shmidx = ((am_epaddr_t *) epaddr)->shmidx;
Packit 961e70
				/* Make sure the target of the disconnect is still there */
Packit 961e70
				if (ptl->am_ep[shmidx].
Packit 961e70
				    epid != epaddr->epid) {
Packit 961e70
					req->numep_left--;
Packit 961e70
					req->epid_mask[i] = AMSH_CMASK_DONE;
Packit 961e70
					((am_epaddr_t *) epaddr)->cstate_outgoing =
Packit 961e70
						AMSH_CSTATE_OUTGOING_NONE;
Packit 961e70
				}
Packit 961e70
			}
Packit 961e70
Packit 961e70
			if (req->epid_mask[i] == AMSH_CMASK_PREREQ) {
Packit 961e70
				req->args[0].u16w0 = PSMI_AM_DISC_REQ;
Packit 961e70
				req->args[0].u16w1 = shmidx;
Packit 961e70
				req->args[0].u32w1 = ptl->connect_phase;
Packit 961e70
				req->args[1].u64w0 = (uint64_t) ptl->epid;
Packit 961e70
				psmi_assert(shmidx != (uint16_t)-1);
Packit Service 7ed5cc
				req->args[2].u32w0 = create_extra_ep_data();
Packit 961e70
				req->args[2].u32w1 = PSM2_OK;
Packit 961e70
				req->args[3].u64w0 =
Packit 961e70
				    (uint64_t) (uintptr_t) &req->errors[i];
Packit 961e70
				psmi_amsh_short_request(ptl_gen, epaddr,
Packit 961e70
							amsh_conn_handler_hidx,
Packit 961e70
							req->args, 4, NULL, 0,
Packit 961e70
							0);
Packit 961e70
				((am_epaddr_t *) epaddr)->cstate_outgoing =
Packit 961e70
					AMSH_CSTATE_OUTGOING_DISC_REQUESTED;
Packit 961e70
				/**
Packit 961e70
				* Only munmap if we have nothing more to
Packit 961e70
				* communicate with the other node, i.e. we
Packit 961e70
				* already recieved a disconnect req from the
Packit 961e70
				* other node.
Packit 961e70
				*/
Packit 961e70
				if (((am_epaddr_t *) epaddr)->cstate_incoming ==
Packit 961e70
					AMSH_CSTATE_INCOMING_DISC_REQUESTED)
Packit 961e70
					err = psmi_do_unmap(ptl->am_ep[shmidx].amsh_shmbase);
Packit 961e70
				req->epid_mask[i] = AMSH_CMASK_POSTREQ;
Packit 961e70
			} else if (req->epid_mask[i] == AMSH_CMASK_POSTREQ) {
Packit 961e70
				cstate = ((am_epaddr_t *) epaddr)->cstate_outgoing;
Packit 961e70
				if (cstate == AMSH_CSTATE_OUTGOING_DISC_REPLIED) {
Packit 961e70
					req->numep_left--;
Packit 961e70
					req->epid_mask[i] = AMSH_CMASK_DONE;
Packit 961e70
					((am_epaddr_t *) epaddr)->cstate_outgoing =
Packit 961e70
						AMSH_CSTATE_OUTGOING_NONE;
Packit 961e70
				}
Packit 961e70
			}
Packit 961e70
		}
Packit 961e70
	} else {
Packit 961e70
		/* First see if we've made progress on any postreqs */
Packit 961e70
		int n_prereq = 0;
Packit 961e70
		for (i = 0; i < req->numep; i++) {
Packit 961e70
			int cstate;
Packit 961e70
			if (req->epid_mask[i] != AMSH_CMASK_POSTREQ) {
Packit 961e70
				if (req->epid_mask[i] == AMSH_CMASK_PREREQ)
Packit 961e70
					n_prereq++;
Packit 961e70
				continue;
Packit 961e70
			}
Packit 961e70
			epaddr = req->epaddr[i];
Packit 961e70
			psmi_assert(epaddr != NULL);
Packit 961e70
Packit 961e70
			/* detect if a race has occurred on due to re-using an
Packit 961e70
			 * old shm file - if so, restart the connection */
Packit 961e70
			shmidx = ((am_epaddr_t *) epaddr)->shmidx;
Packit 961e70
			if (ptl->am_ep[shmidx].pid !=
Packit 961e70
			    ((struct am_ctl_nodeinfo *) ptl->am_ep[shmidx].amsh_shmbase)->pid) {
Packit 961e70
				req->epid_mask[i] = AMSH_CMASK_PREREQ;
Packit 961e70
				((am_epaddr_t *) epaddr)->cstate_outgoing =
Packit 961e70
					AMSH_CSTATE_OUTGOING_NONE;
Packit 961e70
				n_prereq++;
Packit 961e70
				amsh_epaddr_update(ptl_gen, epaddr);
Packit 961e70
				continue;
Packit 961e70
			}
Packit 961e70
Packit 961e70
			cstate = ((am_epaddr_t *) epaddr)->cstate_outgoing;
Packit 961e70
			if (cstate == AMSH_CSTATE_OUTGOING_REPLIED) {
Packit 961e70
				req->numep_left--;
Packit 961e70
				((am_epaddr_t *) epaddr)->cstate_outgoing =
Packit 961e70
					AMSH_CSTATE_OUTGOING_ESTABLISHED;
Packit 961e70
				req->epid_mask[i] = AMSH_CMASK_DONE;
Packit 961e70
				continue;
Packit 961e70
			}
Packit 961e70
		}
Packit 961e70
		if (n_prereq > 0) {
Packit 961e70
			psmi_assert(req->numep_left > 0);
Packit 961e70
			/* Go through the list of peers we need to connect to and find out
Packit 961e70
			 * if they each shared ep is mapped into shm */
Packit 961e70
			for (i = 0; i < req->numep; i++) {
Packit 961e70
				if (req->epid_mask[i] != AMSH_CMASK_PREREQ)
Packit 961e70
					continue;
Packit 961e70
				epid = req->epids[i];
Packit 961e70
				epaddr = req->epaddr[i];
Packit 961e70
				/* Go through mapped epids and find the epid we're looking for */
Packit 961e70
				for (shmidx = -1, j = 0;
Packit 961e70
				     j <= ptl->max_ep_idx; j++) {
Packit 961e70
					/* epid is connected and ready to go */
Packit 961e70
					if (ptl->am_ep[j].
Packit 961e70
					    epid == epid) {
Packit 961e70
						shmidx = j;
Packit 961e70
						break;
Packit 961e70
					}
Packit 961e70
				}
Packit 961e70
				if (shmidx == (uint16_t)-1) {
Packit 961e70
					/* Couldn't find peer's epid in dirpage.
Packit 961e70
					   Check shmdir to see if epid is up now. */
Packit 961e70
					if ((err = psmi_shm_map_remote(ptl_gen, epid, &shmidx, 0))) {
Packit 961e70
						return err;
Packit 961e70
					}
Packit 961e70
					continue;
Packit 961e70
				}
Packit 961e70
				/* Before we even send the request out, check to see if
Packit 961e70
				 * versions are interoperable */
Packit 961e70
				if (!psmi_verno_isinteroperable
Packit 961e70
				    (ptl->am_ep[shmidx].
Packit 961e70
				     psm_verno)) {
Packit 961e70
					char buf[32];
Packit 961e70
					uint16_t their_verno =
Packit 961e70
					    ptl->am_ep[shmidx].
Packit 961e70
					    psm_verno;
Packit 961e70
					snprintf(buf, sizeof(buf), "%d.%d",
Packit 961e70
						 PSMI_VERNO_GET_MAJOR
Packit 961e70
						 (their_verno),
Packit 961e70
						 PSMI_VERNO_GET_MINOR
Packit 961e70
						 (their_verno));
Packit 961e70
Packit 961e70
					_HFI_INFO("Local endpoint id %" PRIx64
Packit 961e70
						  " has version %s "
Packit 961e70
						  "which is not supported by library version %d.%d",
Packit 961e70
						  epid, buf, PSM2_VERNO_MAJOR,
Packit 961e70
						  PSM2_VERNO_MINOR);
Packit 961e70
					req->errors[i] =
Packit 961e70
					    PSM2_EPID_INVALID_VERSION;
Packit 961e70
					req->numep_left--;
Packit 961e70
					req->epid_mask[i] = AMSH_CMASK_DONE;
Packit 961e70
					continue;
Packit 961e70
				}
Packit 961e70
				if (epaddr != NULL) {
Packit 961e70
					psmi_assert(((am_epaddr_t *) epaddr)->
Packit 961e70
						    shmidx == shmidx);
Packit 961e70
				} else
Packit 961e70
				    if ((epaddr =
Packit 961e70
					 psmi_epid_lookup(ptl->ep,
Packit 961e70
							  epid)) == NULL) {
Packit 961e70
					if ((err =
Packit 961e70
					     amsh_epaddr_add(ptl_gen, epid, shmidx,
Packit 961e70
							     &epaddr))) {
Packit 961e70
						return err;
Packit 961e70
					}
Packit 961e70
					/* Remote pid is unknown at the moment */
Packit 961e70
					((am_epaddr_t *) epaddr)->pid =
Packit 961e70
						AMSH_PID_UNKNOWN;
Packit 961e70
				}
Packit 961e70
				req->epaddr[i] = epaddr;
Packit 961e70
				req->args[0].u16w0 = PSMI_AM_CONN_REQ;
Packit 961e70
				/* tell the other process its shmidx here */
Packit 961e70
				req->args[0].u16w1 = shmidx;
Packit 961e70
				req->args[0].u32w1 = ptl->connect_phase;
Packit 961e70
				req->args[1].u64w0 = (uint64_t) ptl->epid;
Packit Service 7ed5cc
				req->args[2].u32w0 = create_extra_ep_data();
Packit 961e70
				req->args[2].u32w1 = PSM2_OK;
Packit 961e70
				req->args[3].u64w0 =
Packit 961e70
				    (uint64_t) (uintptr_t) &req->errors[i];
Packit 961e70
				req->epid_mask[i] = AMSH_CMASK_POSTREQ;
Packit 961e70
				psmi_amsh_short_request(ptl_gen, epaddr,
Packit 961e70
							amsh_conn_handler_hidx,
Packit 961e70
							req->args, 4, NULL, 0,
Packit 961e70
							0);
Packit 961e70
				_HFI_PRDBG("epaddr=%p, epid=%" PRIx64
Packit 961e70
					   " at shmidx=%d\n", epaddr, epid,
Packit 961e70
					   shmidx);
Packit 961e70
			}
Packit 961e70
		}
Packit 961e70
	}
Packit 961e70
Packit 961e70
	if (req->numep_left == 0) {	/* we're all done */
Packit 961e70
		req->isdone = 1;
Packit 961e70
		return PSM2_OK;
Packit 961e70
	} else {
Packit 961e70
		sched_yield();
Packit 961e70
		return PSM2_OK_NO_PROGRESS;
Packit 961e70
	}
Packit 961e70
}
Packit 961e70
Packit 961e70
static
Packit 961e70
psm2_error_t
Packit 961e70
amsh_ep_connreq_fini(ptl_t *ptl_gen, struct ptl_connection_req *req)
Packit 961e70
{
Packit 961e70
	struct ptl_am *ptl = (struct ptl_am *)ptl_gen;
Packit 961e70
	psm2_error_t err = PSM2_OK;
Packit 961e70
	int i;
Packit 961e70
Packit 961e70
	/* Wherever we are at in our connect process, we've been instructed to
Packit 961e70
	 * finish the connection process */
Packit 961e70
	if (req == NULL)
Packit 961e70
		return PSM2_OK;
Packit 961e70
Packit 961e70
	/* This prevents future connect replies from referencing data structures
Packit 961e70
	 * that disappeared */
Packit 961e70
	ptl->connect_phase++;
Packit 961e70
Packit 961e70
	/* First process any leftovers in postreq or prereq */
Packit 961e70
	for (i = 0; i < req->numep; i++) {
Packit 961e70
		if (req->epid_mask[i] == AMSH_CMASK_NONE)
Packit 961e70
			continue;
Packit 961e70
		else if (req->epid_mask[i] == AMSH_CMASK_POSTREQ) {
Packit 961e70
			int cstate;
Packit 961e70
			req->epid_mask[i] = AMSH_CMASK_DONE;
Packit 961e70
			cstate = ((am_epaddr_t *) req->epaddr[i])->cstate_outgoing;
Packit 961e70
			if (cstate == AMSH_CSTATE_OUTGOING_REPLIED) {
Packit 961e70
				req->numep_left--;
Packit 961e70
				((am_epaddr_t *) req->epaddr[i])->cstate_outgoing =
Packit 961e70
					AMSH_CSTATE_OUTGOING_ESTABLISHED;
Packit 961e70
			} else {	/* never actually got reply */
Packit 961e70
				req->errors[i] = PSM2_TIMEOUT;
Packit 961e70
			}
Packit 961e70
		}
Packit 961e70
		/* If we couldn't go from prereq to postreq, that means we couldn't
Packit 961e70
		 * find the shmidx for an epid in time.  This can only be a case of
Packit 961e70
		 * time out */
Packit 961e70
		else if (req->epid_mask[i] == AMSH_CMASK_PREREQ) {
Packit 961e70
			req->errors[i] = PSM2_TIMEOUT;
Packit 961e70
			req->numep_left--;
Packit 961e70
			req->epid_mask[i] = AMSH_CMASK_DONE;
Packit 961e70
		}
Packit 961e70
	}
Packit 961e70
Packit 961e70
	/* Whatever is left can only be in DONE or NONE state */
Packit 961e70
	for (i = 0; i < req->numep; i++) {
Packit 961e70
		if (req->epid_mask[i] == AMSH_CMASK_NONE)
Packit 961e70
			continue;
Packit 961e70
		psmi_assert(req->epid_mask[i] == AMSH_CMASK_DONE);
Packit 961e70
Packit 961e70
		err = psmi_error_cmp(err, req->errors[i]);
Packit 961e70
		/* XXX TODO: Report errors in connection. */
Packit 961e70
		/* Only free epaddr if they have disconnected from us */
Packit 961e70
		int cstate = ((am_epaddr_t *) req->epaddr[i])->cstate_incoming;
Packit 961e70
		if (cstate == AMSH_CSTATE_INCOMING_DISC_REQUESTED) {
Packit 961e70
			if (req->op == PTL_OP_DISCONNECT || req->op == PTL_OP_ABORT) {
Packit 961e70
				psmi_assert(req->epaddr[i] != NULL);
Packit 961e70
				amsh_free_epaddr(req->epaddr[i]);
Packit 961e70
				req->epaddr[i] = NULL;
Packit 961e70
			}
Packit 961e70
		}
Packit 961e70
	}
Packit 961e70
Packit 961e70
	psmi_free(req->epid_mask);
Packit 961e70
	psmi_free(req);
Packit 961e70
Packit 961e70
	return err;
Packit 961e70
}
Packit 961e70
Packit 961e70
/* Wrapper for 2.0's use of connect/disconnect.  The plan is to move the
Packit 961e70
 * init/poll/fini interface up to the PTL level for 2.2 */
Packit 961e70
#define CONNREQ_ZERO_POLLS_BEFORE_YIELD  20
Packit 961e70
static
Packit 961e70
psm2_error_t
Packit 961e70
amsh_ep_connreq_wrap(ptl_t *ptl_gen, int op,
Packit 961e70
		     int numep,
Packit 961e70
		     const psm2_epid_t *array_of_epid,
Packit 961e70
		     const int array_of_epid_mask[],
Packit 961e70
		     psm2_error_t *array_of_errors,
Packit 961e70
		     psm2_epaddr_t *array_of_epaddr, uint64_t timeout_ns)
Packit 961e70
{
Packit 961e70
	struct ptl_am *ptl = (struct ptl_am *)ptl_gen;
Packit 961e70
	psm2_error_t err;
Packit 961e70
	uint64_t t_start;
Packit 961e70
	struct ptl_connection_req *req;
Packit 961e70
	int num_polls_noprogress = 0;
Packit 961e70
	static int shm_polite_attach = -1;
Packit 961e70
Packit 961e70
	if (shm_polite_attach == -1) {
Packit 961e70
		char *p = getenv("PSM2_SHM_POLITE_ATTACH");
Packit 961e70
		if (p && *p && atoi(p) != 0) {
Packit 961e70
			fprintf(stderr, "%s: Using Polite SHM segment attach\n",
Packit 961e70
				psmi_gethostname());
Packit 961e70
			shm_polite_attach = 1;
Packit 961e70
		}
Packit 961e70
		shm_polite_attach = 0;
Packit 961e70
	}
Packit 961e70
Packit 961e70
	/* Initialize */
Packit 961e70
	err = amsh_ep_connreq_init(ptl_gen, op, numep,
Packit 961e70
				   array_of_epid, array_of_epid_mask,
Packit 961e70
				   array_of_errors, array_of_epaddr, &req;;
Packit 961e70
	if (err != PSM2_OK_NO_PROGRESS)	/* Either we're all done with connect or
Packit 961e70
					 * there was an error */
Packit 961e70
		return err;
Packit 961e70
Packit 961e70
	/* Poll until either
Packit 961e70
	 * 1. We time out
Packit 961e70
	 * 2. We are done with connecting
Packit 961e70
	 */
Packit 961e70
	t_start = get_cycles();
Packit 961e70
	do {
Packit 961e70
		psmi_poll_internal(ptl->ep, 1);
Packit 961e70
		err = amsh_ep_connreq_poll(ptl_gen, req);
Packit 961e70
		if (err == PSM2_OK)
Packit 961e70
			break;	/* Finished before timeout */
Packit 961e70
		else if (err != PSM2_OK_NO_PROGRESS) {
Packit 961e70
			psmi_free(req->epid_mask);
Packit 961e70
			psmi_free(req);
Packit 961e70
			goto fail;
Packit 961e70
		} else if (shm_polite_attach &&
Packit 961e70
			   ++num_polls_noprogress ==
Packit 961e70
			   CONNREQ_ZERO_POLLS_BEFORE_YIELD) {
Packit 961e70
			num_polls_noprogress = 0;
Packit 961e70
			PSMI_YIELD(ptl->ep->mq->progress_lock);
Packit 961e70
		}
Packit 961e70
	}
Packit 961e70
	while (psmi_cycles_left(t_start, timeout_ns));
Packit 961e70
Packit 961e70
	err = amsh_ep_connreq_fini(ptl_gen, req);
Packit 961e70
Packit 961e70
fail:
Packit 961e70
	return err;
Packit 961e70
}
Packit 961e70
Packit 961e70
static
Packit 961e70
psm2_error_t
Packit 961e70
amsh_ep_connect(ptl_t *ptl,
Packit 961e70
		int numep,
Packit 961e70
		const psm2_epid_t *array_of_epid,
Packit 961e70
		const int array_of_epid_mask[],
Packit 961e70
		psm2_error_t *array_of_errors,
Packit 961e70
		psm2_epaddr_t *array_of_epaddr, uint64_t timeout_ns)
Packit 961e70
{
Packit 961e70
	return amsh_ep_connreq_wrap(ptl, PTL_OP_CONNECT, numep, array_of_epid,
Packit 961e70
				    array_of_epid_mask, array_of_errors,
Packit 961e70
				    array_of_epaddr, timeout_ns);
Packit 961e70
}
Packit 961e70
Packit 961e70
static
Packit 961e70
psm2_error_t
Packit 961e70
amsh_ep_disconnect(ptl_t *ptl, int force, int numep,
Packit 961e70
		   psm2_epaddr_t array_of_epaddr[],
Packit 961e70
		   const int array_of_epaddr_mask[],
Packit 961e70
		   psm2_error_t array_of_errors[], uint64_t timeout_ns)
Packit 961e70
{
Packit 961e70
	return amsh_ep_connreq_wrap(ptl,
Packit 961e70
				    force ? PTL_OP_ABORT : PTL_OP_DISCONNECT,
Packit 961e70
				    numep, NULL, array_of_epaddr_mask,
Packit 961e70
				    array_of_errors,
Packit 961e70
				    array_of_epaddr,
Packit 961e70
				    timeout_ns);
Packit 961e70
}
Packit 961e70
Packit 961e70
#undef CSWAP
Packit 961e70
PSMI_ALWAYS_INLINE(
Packit 961e70
int32_t
Packit 961e70
cswap(volatile int32_t *p, int32_t old_value, int32_t new_value))
Packit 961e70
{
Packit 961e70
	asm volatile ("lock cmpxchg %2, %0" :
Packit 961e70
		      "+m" (*p), "+a"(old_value) : "r"(new_value) : "memory");
Packit 961e70
	return old_value;
Packit 961e70
}
Packit 961e70
Packit 961e70
PSMI_ALWAYS_INLINE(
Packit 961e70
am_pkt_short_t *
Packit 961e70
am_ctl_getslot_pkt_inner(volatile am_ctl_qhdr_t *shq, am_pkt_short_t *pkt0))
Packit 961e70
{
Packit 961e70
	am_pkt_short_t *pkt;
Packit 961e70
	uint32_t idx;
Packit 961e70
#ifndef CSWAP
Packit 961e70
	pthread_spin_lock(&shq->lock);
Packit 961e70
	idx = shq->tail;
Packit 961e70
	pkt = (am_pkt_short_t *) ((uintptr_t) pkt0 + idx * shq->elem_sz);
Packit 961e70
	if (pkt->flag == QFREE) {
Packit 961e70
		ips_sync_reads();
Packit 961e70
		pkt->flag = QUSED;
Packit 961e70
		shq->tail += 1;
Packit 961e70
		if (shq->tail == shq->elem_cnt)
Packit 961e70
			shq->tail = 0;
Packit 961e70
	} else {
Packit 961e70
		pkt = 0;
Packit 961e70
	}
Packit 961e70
	pthread_spin_unlock(&shq->lock);
Packit 961e70
#else
Packit 961e70
	uint32_t idx_next;
Packit 961e70
	do {
Packit 961e70
		idx = shq->tail;
Packit 961e70
		idx_next = (idx + 1 == shq->elem_cnt) ? 0 : idx + 1;
Packit 961e70
	} while (cswap(&shq->tail, idx, idx_next) != idx);
Packit 961e70
Packit 961e70
	pkt = (am_pkt_short_t *) ((uintptr_t) pkt0 + idx * shq->elem_sz);
Packit 961e70
	while (cswap(&pkt->flag, QFREE, QUSED) != QFREE);
Packit 961e70
#endif
Packit 961e70
	return pkt;
Packit 961e70
}
Packit 961e70
Packit 961e70
/*
 * Bulk-packet flavour of am_ctl_getslot_pkt_inner().  This is safe because
 * 'flag' is at the same offset on both pkt and bulkpkt.  The inner routine
 * only reads and writes 'flag' and computes slot addresses from
 * shq->elem_sz.
 */
#define am_ctl_getslot_bulkpkt_inner(shq, pkt0)				\
	((am_pkt_bulk_t *)						\
	 am_ctl_getslot_pkt_inner((shq), (am_pkt_short_t *)(pkt0)))
Packit 961e70
Packit 961e70
PSMI_ALWAYS_INLINE(
Packit 961e70
am_pkt_short_t *
Packit 961e70
am_ctl_getslot_pkt(ptl_t *ptl_gen, uint16_t shmidx, int is_reply))
Packit 961e70
{
Packit 961e70
	struct ptl_am *ptl = (struct ptl_am *)ptl_gen;
Packit 961e70
	volatile am_ctl_qhdr_t *shq;
Packit 961e70
	am_pkt_short_t *pkt0;
Packit 961e70
	if (!is_reply) {
Packit 961e70
		shq = &(ptl->am_ep[shmidx].qdir.qreqH->shortq);
Packit 961e70
		pkt0 = ptl->am_ep[shmidx].qdir.qreqFifoShort;
Packit 961e70
	} else {
Packit 961e70
		shq = &(ptl->am_ep[shmidx].qdir.qrepH->shortq);
Packit 961e70
		pkt0 = ptl->am_ep[shmidx].qdir.qrepFifoShort;
Packit 961e70
	}
Packit 961e70
	return am_ctl_getslot_pkt_inner(shq, pkt0);
Packit 961e70
}
Packit 961e70
Packit 961e70
PSMI_ALWAYS_INLINE(
Packit 961e70
am_pkt_bulk_t *
Packit 961e70
am_ctl_getslot_long(ptl_t *ptl_gen, uint16_t shmidx, int is_reply))
Packit 961e70
{
Packit 961e70
	struct ptl_am *ptl = (struct ptl_am *)ptl_gen;
Packit 961e70
	volatile am_ctl_qhdr_t *shq;
Packit 961e70
	am_pkt_bulk_t *pkt0;
Packit 961e70
	if (!is_reply) {
Packit 961e70
		shq = &(ptl->am_ep[shmidx].qdir.qreqH->longbulkq);
Packit 961e70
		pkt0 = ptl->am_ep[shmidx].qdir.qreqFifoLong;
Packit 961e70
	} else {
Packit 961e70
		shq = &(ptl->am_ep[shmidx].qdir.qrepH->longbulkq);
Packit 961e70
		pkt0 = ptl->am_ep[shmidx].qdir.qrepFifoLong;
Packit 961e70
	}
Packit 961e70
	return am_ctl_getslot_bulkpkt_inner(shq, pkt0);
Packit 961e70
}
Packit 961e70
Packit 961e70
psmi_handlertab_t psmi_allhandlers[] = {
Packit 961e70
	{0}
Packit 961e70
	,
Packit 961e70
	{amsh_conn_handler}
Packit 961e70
	,
Packit 961e70
	{psmi_am_mq_handler}
Packit 961e70
	,
Packit 961e70
	{psmi_am_mq_handler_data}
Packit 961e70
	,
Packit 961e70
	{psmi_am_mq_handler_rtsmatch}
Packit 961e70
	,
Packit 961e70
	{psmi_am_mq_handler_rtsdone}
Packit 961e70
	,
Packit 961e70
	{psmi_am_handler}
Packit 961e70
};
Packit 961e70
Packit 961e70
PSMI_ALWAYS_INLINE(void advance_head(volatile am_ctl_qshort_cache_t *hdr))
Packit 961e70
{
Packit 961e70
	QMARKFREE(hdr->head);
Packit 961e70
	hdr->head++;
Packit 961e70
	if (hdr->head == hdr->end)
Packit 961e70
		hdr->head = hdr->base;
Packit 961e70
}
Packit 961e70
Packit 961e70
/* Consecutive no-progress internal polls before calling sched_yield(). */
#define AMSH_ZERO_POLLS_BEFORE_YIELD	64
/* Internal amsh polls between calls to psmi_poll_internal(). */
#define AMSH_POLLS_BEFORE_PSM_POLL	16
Packit 961e70
Packit 961e70
/* XXX this can be made faster.  Instead of checking the flag of the head, keep
Packit 961e70
 * a cached copy of the integer value of the tail and compare it against the
Packit 961e70
 * previous one we saw.
Packit 961e70
 */
Packit 961e70
PSMI_ALWAYS_INLINE(
Packit 961e70
psm2_error_t
Packit 961e70
amsh_poll_internal_inner(ptl_t *ptl_gen, int replyonly,
Packit 961e70
			 int is_internal))
Packit 961e70
{
Packit 961e70
	struct ptl_am *ptl = (struct ptl_am *)ptl_gen;
Packit 961e70
	psm2_error_t err = PSM2_OK_NO_PROGRESS;
Packit 961e70
	/* poll replies */
Packit 961e70
	if (!QISEMPTY(ptl->repH.head->flag)) {
Packit 961e70
		do {
Packit 961e70
			ips_sync_reads();
Packit 961e70
			process_packet(ptl_gen, (am_pkt_short_t *) ptl->repH.head,
Packit 961e70
				       0);
Packit 961e70
			advance_head(&ptl->repH);
Packit 961e70
			err = PSM2_OK;
Packit 961e70
		} while (!QISEMPTY(ptl->repH.head->flag));
Packit 961e70
	}
Packit 961e70
Packit 961e70
	if (!replyonly) {
Packit 961e70
		/* Request queue not enable for 2.0, will be re-enabled to support long
Packit 961e70
		 * replies */
Packit 961e70
		if (!is_internal && ptl->psmi_am_reqq_fifo.first != NULL) {
Packit 961e70
			psmi_am_reqq_drain(ptl_gen);
Packit 961e70
			err = PSM2_OK;
Packit 961e70
		}
Packit 961e70
		if (!QISEMPTY(ptl->reqH.head->flag)) {
Packit 961e70
			do {
Packit 961e70
				ips_sync_reads();
Packit 961e70
				process_packet(ptl_gen,
Packit 961e70
					       (am_pkt_short_t *) ptl->reqH.
Packit 961e70
					       head, 1);
Packit 961e70
				advance_head(&ptl->reqH);
Packit 961e70
				err = PSM2_OK;
Packit 961e70
			} while (!QISEMPTY(ptl->reqH.head->flag));
Packit 961e70
		}
Packit 961e70
	}
Packit 961e70
Packit 961e70
	if (is_internal) {
Packit 961e70
		if (err == PSM2_OK)	/* some progress, no yields */
Packit 961e70
			ptl->zero_polls = 0;
Packit 961e70
		else if (++ptl->zero_polls == AMSH_ZERO_POLLS_BEFORE_YIELD) {
Packit 961e70
			/* no progress for AMSH_ZERO_POLLS_BEFORE_YIELD */
Packit 961e70
			sched_yield();
Packit 961e70
			ptl->zero_polls = 0;
Packit 961e70
		}
Packit 961e70
Packit 961e70
		if (++ptl->amsh_only_polls == AMSH_POLLS_BEFORE_PSM_POLL) {
Packit 961e70
			psmi_poll_internal(ptl->ep, 0);
Packit 961e70
			ptl->amsh_only_polls = 0;
Packit 961e70
		}
Packit 961e70
	}
Packit 961e70
	return err;		/* if we actually did something */
Packit 961e70
}
Packit 961e70
Packit 961e70
/* non-inlined version */
Packit 961e70
static
Packit 961e70
psm2_error_t
Packit 961e70
amsh_poll_internal(ptl_t *ptl, int replyonly)
Packit 961e70
{
Packit 961e70
	return amsh_poll_internal_inner(ptl, replyonly, 1);
Packit 961e70
}
Packit 961e70
Packit 961e70
/*
 * AMSH_POLL_UNTIL(ptl, isreply, cond): spin on amsh_poll_internal() until
 * `cond` becomes true.
 *
 * `cond` is evaluated before every poll and may have side effects; callers
 * use it to grab a free queue slot. With PSM_PROFILE the wait is bracketed
 * by PSMI_PROFILE_BLOCK/UNBLOCK, and whether each poll made no progress is
 * passed to PSMI_PROFILE_REBLOCK.
 */
#ifdef PSM_PROFILE
#define AMSH_POLL_UNTIL(ptl, isreply, cond)				\
	do {								\
		PSMI_PROFILE_BLOCK();					\
		for (;;) {						\
			if (cond)					\
				break;					\
			PSMI_PROFILE_REBLOCK(				\
				amsh_poll_internal(ptl, isreply) ==	\
					PSM2_OK_NO_PROGRESS);		\
		}							\
		PSMI_PROFILE_UNBLOCK();					\
	} while (0)
#else
#define AMSH_POLL_UNTIL(ptl, isreply, cond)			\
	do {							\
		for (;;) {					\
			if (cond)				\
				break;				\
			amsh_poll_internal(ptl, isreply);	\
		}						\
	} while (0)
#endif
Packit 961e70
Packit 961e70
/*
 * Public poll entry: is_internal = 0, so (unless replyonly) the inner
 * poller also drains the deferred request queue, and it never yields or
 * calls back into psmi_poll_internal().
 */
static psm2_error_t amsh_poll(ptl_t *ptl, int replyonly)
{
	const int is_internal = 0;

	return amsh_poll_internal_inner(ptl, replyonly, is_internal);
}
Packit 961e70
Packit 961e70
PSMI_ALWAYS_INLINE(
Packit 961e70
void
Packit 961e70
am_send_pkt_short(ptl_t *ptl, uint32_t destidx, uint32_t returnidx,
Packit 961e70
		  uint32_t bulkidx, uint16_t fmt, uint16_t nargs,
Packit 961e70
		  uint16_t handleridx, psm2_amarg_t *args,
Packit 961e70
		  const void *src, uint32_t len, int isreply))
Packit 961e70
{
Packit 961e70
	int i;
Packit 961e70
	volatile am_pkt_short_t *pkt;
Packit 961e70
	int copy_nargs;
Packit 961e70
Packit 961e70
	AMSH_POLL_UNTIL(ptl, isreply,
Packit 961e70
			(pkt =
Packit 961e70
			 am_ctl_getslot_pkt(ptl, destidx, isreply)) != NULL);
Packit 961e70
Packit 961e70
	/* got a free pkt... fill it in */
Packit 961e70
	pkt->bulkidx = bulkidx;
Packit 961e70
	pkt->shmidx = returnidx;
Packit 961e70
	pkt->type = fmt;
Packit 961e70
	pkt->nargs = nargs;
Packit 961e70
	pkt->handleridx = handleridx;
Packit 961e70
Packit 961e70
	/* Limit the number of args copied here to NSHORT_ARGS.  Additional args
Packit 961e70
	   are carried in the bulkpkt. */
Packit 961e70
	copy_nargs = nargs;
Packit 961e70
	if (copy_nargs > NSHORT_ARGS) {
Packit 961e70
		copy_nargs = NSHORT_ARGS;
Packit 961e70
	}
Packit 961e70
Packit 961e70
	for (i = 0; i < copy_nargs; i++)
Packit 961e70
		pkt->args[i] = args[i];
Packit 961e70
Packit 961e70
	if (fmt == AMFMT_SHORT_INLINE)
Packit 961e70
		mq_copy_tiny((uint32_t *) &pkt->args[nargs], (uint32_t *) src,
Packit 961e70
			     len);
Packit 961e70
Packit 961e70
	_HFI_VDBG("pkt=%p fmt=%d bulkidx=%d,flag=%d,nargs=%d,"
Packit 961e70
		  "buf=%p,len=%d,hidx=%d,value=%d\n", pkt, (int)fmt, bulkidx,
Packit 961e70
		  pkt->flag, pkt->nargs, src, (int)len, (int)handleridx,
Packit 961e70
		  src != NULL ? *((uint32_t *) src) : 0);
Packit 961e70
	QMARKREADY(pkt);
Packit 961e70
}
Packit 961e70
Packit 961e70
/* Payload copies into/out of shared-memory bulk packets. The short (bulk
 * payload) and long (fragmented payload) paths currently share the same
 * MTU-sized copy routine. */
#define amsh_shm_copy_long  psmi_mq_mtucpy
#define amsh_shm_copy_short psmi_mq_mtucpy
Packit 961e70
Packit 961e70
PSMI_ALWAYS_INLINE(
Packit 961e70
int
Packit 961e70
psmi_amsh_generic_inner(uint32_t amtype, ptl_t *ptl_gen, psm2_epaddr_t epaddr,
Packit 961e70
			psm2_handler_t handler, psm2_amarg_t *args, int nargs,
Packit 961e70
			const void *src, size_t len, void *dst, int flags))
Packit 961e70
{
Packit 961e70
#ifdef PSM_DEBUG
Packit 961e70
	struct ptl_am *ptl = (struct ptl_am *)ptl_gen;
Packit 961e70
#endif
Packit 961e70
	uint16_t type;
Packit 961e70
	uint32_t bulkidx;
Packit 961e70
	uint16_t hidx = (uint16_t) handler;
Packit 961e70
	int destidx = ((am_epaddr_t *) epaddr)->shmidx;
Packit 961e70
	int returnidx = ((am_epaddr_t *) epaddr)->return_shmidx;
Packit 961e70
	int is_reply = AM_IS_REPLY(amtype);
Packit 961e70
	volatile am_pkt_bulk_t *bulkpkt;
Packit 961e70
Packit 961e70
	_HFI_VDBG("%s epaddr=%s, shmidx=%d, type=%d\n",
Packit 961e70
		  is_reply ? "reply" : "request",
Packit 961e70
		  psmi_epaddr_get_name(epaddr->epid),
Packit 961e70
		  ((am_epaddr_t *) epaddr)->shmidx, amtype);
Packit 961e70
	psmi_assert(epaddr != ptl->epaddr);
Packit 961e70
Packit 961e70
	switch (amtype) {
Packit 961e70
	case AMREQUEST_SHORT:
Packit 961e70
	case AMREPLY_SHORT:
Packit 961e70
		if (len + (nargs << 3) <= (NSHORT_ARGS << 3)) {
Packit 961e70
			/* Payload fits in args packet */
Packit 961e70
			type = AMFMT_SHORT_INLINE;
Packit 961e70
			bulkidx = len;
Packit 961e70
		} else {
Packit 961e70
			int i;
Packit 961e70
Packit 961e70
			psmi_assert(len < amsh_qelemsz.qreqFifoLong);
Packit 961e70
			psmi_assert(src != NULL || nargs > NSHORT_ARGS);
Packit 961e70
			type = AMFMT_SHORT;
Packit 961e70
Packit 961e70
			AMSH_POLL_UNTIL(ptl_gen, is_reply,
Packit 961e70
					(bulkpkt =
Packit 961e70
					 am_ctl_getslot_long(ptl_gen, destidx,
Packit 961e70
							     is_reply)) !=
Packit 961e70
					NULL);
Packit 961e70
Packit 961e70
			bulkidx = bulkpkt->idx;
Packit 961e70
			bulkpkt->len = len;
Packit 961e70
			_HFI_VDBG("bulkpkt %p flag is %d from idx %d\n",
Packit 961e70
				  bulkpkt, bulkpkt->flag, destidx);
Packit 961e70
Packit 961e70
			for (i = 0; i < nargs - NSHORT_ARGS; i++) {
Packit 961e70
				bulkpkt->args[i] = args[i + NSHORT_ARGS];
Packit 961e70
			}
Packit 961e70
Packit 961e70
			amsh_shm_copy_short((void *)bulkpkt->payload, src,
Packit 961e70
					    (uint32_t) len);
Packit 961e70
			QMARKREADY(bulkpkt);
Packit 961e70
		}
Packit 961e70
		am_send_pkt_short(ptl_gen, destidx, returnidx, bulkidx, type,
Packit 961e70
				  nargs, hidx, args, src, len, is_reply);
Packit 961e70
		break;
Packit 961e70
Packit 961e70
	case AMREQUEST_LONG:
Packit 961e70
	case AMREPLY_LONG:
Packit 961e70
		{
Packit 961e70
			uint32_t bytes_left = len;
Packit 961e70
			uint8_t *src_this = (uint8_t *) src;
Packit 961e70
			uint8_t *dst_this = (uint8_t *) dst;
Packit 961e70
			uint32_t bytes_this;
Packit 961e70
Packit 961e70
			type = AMFMT_LONG;
Packit 961e70
Packit 961e70
			_HFI_VDBG("[long][%s] src=%p,dest=%p,len=%d,hidx=%d\n",
Packit 961e70
				  is_reply ? "rep" : "req", src, dst,
Packit 961e70
				  (uint32_t) len, hidx);
Packit 961e70
			while (bytes_left) {
Packit 961e70
				bytes_this = min(bytes_left, AMLONG_MTU);
Packit 961e70
				AMSH_POLL_UNTIL(ptl_gen, is_reply,
Packit 961e70
						(bulkpkt =
Packit 961e70
						 am_ctl_getslot_long(ptl_gen,
Packit 961e70
								     destidx,
Packit 961e70
								     is_reply))
Packit 961e70
						!= NULL);
Packit 961e70
				bytes_left -= bytes_this;
Packit 961e70
				if (bytes_left == 0)
Packit 961e70
					type = AMFMT_LONG_END;
Packit 961e70
				bulkidx = bulkpkt->idx;
Packit 961e70
				amsh_shm_copy_long((void *)bulkpkt->payload,
Packit 961e70
						   src_this, bytes_this);
Packit 961e70
Packit 961e70
				bulkpkt->dest = (uintptr_t) dst;
Packit 961e70
				bulkpkt->dest_off =
Packit 961e70
				    (uint32_t) ((uintptr_t) dst_this -
Packit 961e70
						(uintptr_t) dst);
Packit 961e70
				bulkpkt->len = bytes_this;
Packit 961e70
				QMARKREADY(bulkpkt);
Packit 961e70
				am_send_pkt_short(ptl_gen, destidx, returnidx,
Packit 961e70
						  bulkidx, type, nargs, hidx,
Packit 961e70
						  args, NULL, 0, is_reply);
Packit 961e70
				src_this += bytes_this;
Packit 961e70
				dst_this += bytes_this;
Packit 961e70
			}
Packit 961e70
			break;
Packit 961e70
		}
Packit 961e70
	default:
Packit 961e70
		break;
Packit 961e70
	}
Packit 961e70
	return 1;
Packit 961e70
}
Packit 961e70
Packit 961e70
/* A generic version that's not inlined */
Packit 961e70
int
Packit 961e70
psmi_amsh_generic(uint32_t amtype, ptl_t *ptl, psm2_epaddr_t epaddr,
Packit 961e70
		  psm2_handler_t handler, psm2_amarg_t *args, int nargs,
Packit 961e70
		  const void *src, size_t len, void *dst, int flags)
Packit 961e70
{
Packit 961e70
	return psmi_amsh_generic_inner(amtype, ptl, epaddr, handler, args,
Packit 961e70
				       nargs, src, len, dst, flags);
Packit 961e70
}
Packit 961e70
Packit 961e70
int
Packit 961e70
psmi_amsh_short_request(ptl_t *ptl, psm2_epaddr_t epaddr,
Packit 961e70
			psm2_handler_t handler, psm2_amarg_t *args, int nargs,
Packit 961e70
			const void *src, size_t len, int flags)
Packit 961e70
{
Packit 961e70
	return psmi_amsh_generic_inner(AMREQUEST_SHORT, ptl, epaddr, handler,
Packit 961e70
				       args, nargs, src, len, NULL, flags);
Packit 961e70
}
Packit 961e70
Packit 961e70
int
Packit 961e70
psmi_amsh_long_request(ptl_t *ptl, psm2_epaddr_t epaddr,
Packit 961e70
		       psm2_handler_t handler, psm2_amarg_t *args, int nargs,
Packit 961e70
		       const void *src, size_t len, void *dest, int flags)
Packit 961e70
{
Packit 961e70
	return psmi_amsh_generic_inner(AMREQUEST_LONG, ptl, epaddr, handler,
Packit 961e70
				       args, nargs, src, len, dest, flags);
Packit 961e70
}
Packit 961e70
Packit 961e70
void
Packit 961e70
psmi_amsh_short_reply(amsh_am_token_t *tok,
Packit 961e70
		      psm2_handler_t handler, psm2_amarg_t *args, int nargs,
Packit 961e70
		      const void *src, size_t len, int flags)
Packit 961e70
{
Packit 961e70
	psmi_amsh_generic_inner(AMREPLY_SHORT, tok->ptl, tok->tok.epaddr_incoming,
Packit 961e70
				handler, args, nargs, src, len, NULL, flags);
Packit 961e70
	return;
Packit 961e70
}
Packit 961e70
Packit 961e70
void
Packit 961e70
psmi_amsh_long_reply(amsh_am_token_t *tok,
Packit 961e70
		     psm2_handler_t handler, psm2_amarg_t *args, int nargs,
Packit 961e70
		     const void *src, size_t len, void *dest, int flags)
Packit 961e70
{
Packit 961e70
	psmi_amsh_generic_inner(AMREPLY_LONG, tok->ptl, tok->tok.epaddr_incoming,
Packit 961e70
				handler, args, nargs, src, len, dest, flags);
Packit 961e70
	return;
Packit 961e70
}
Packit 961e70
Packit 961e70
/* Make the deferred AM request FIFO empty: no head entry, and the tail
 * link pointer aimed at the head slot so the next add becomes the head. */
void psmi_am_reqq_init(ptl_t *ptl_gen)
{
	struct ptl_am *am = (struct ptl_am *)ptl_gen;

	am->psmi_am_reqq_fifo.lastp = &am->psmi_am_reqq_fifo.first;
	am->psmi_am_reqq_fifo.first = NULL;
}
Packit 961e70
Packit 961e70
/*
 * Send every AM request queued by psmi_am_reqq_add() and free the entries
 * (plus any temporary payload copy flagged AM_FLAG_SRC_TEMP).
 *
 * Returns PSM2_OK if at least one entry was sent, PSM2_OK_NO_PROGRESS if
 * the queue was empty.
 */
psm2_error_t psmi_am_reqq_drain(ptl_t *ptl_gen)
{
	struct ptl_am *ptl = (struct ptl_am *)ptl_gen;
	am_reqq_t *cur = ptl->psmi_am_reqq_fifo.first;
	psm2_error_t err = (cur != NULL) ? PSM2_OK : PSM2_OK_NO_PROGRESS;

	/* Detach the whole list first: sending an entry can run handlers that
	 * enqueue new requests, and those must go onto a fresh list rather
	 * than the one being walked. */
	ptl->psmi_am_reqq_fifo.first = NULL;
	ptl->psmi_am_reqq_fifo.lastp = &ptl->psmi_am_reqq_fifo.first;

	for (; cur != NULL;) {
		am_reqq_t *next = cur->next;

		_HFI_VDBG
		    ("push of reqq=%p epaddr=%s localreq=%p remotereq=%p\n",
		     cur, psmi_epaddr_get_hostname(cur->epaddr->epid),
		     (void *)(uintptr_t) cur->args[1].u64w0,
		     (void *)(uintptr_t) cur->args[0].u64w0);
		psmi_amsh_generic(cur->amtype, cur->ptl, cur->epaddr,
				  cur->handler, cur->args, cur->nargs, cur->src,
				  cur->len, cur->dest, cur->amflags);
		if (cur->flags & AM_FLAG_SRC_TEMP)
			psmi_free(cur->src);
		psmi_free(cur);
		cur = next;
	}
	return err;
}
Packit 961e70
Packit 961e70
void
Packit 961e70
psmi_am_reqq_add(int amtype, ptl_t *ptl_gen, psm2_epaddr_t epaddr,
Packit 961e70
		 psm2_handler_t handler, psm2_amarg_t *args, int nargs,
Packit 961e70
		 void *src, size_t len, void *dest, int amflags)
Packit 961e70
{
Packit 961e70
	struct ptl_am *ptl = (struct ptl_am *)ptl_gen;
Packit 961e70
	int i;
Packit 961e70
	int flags = 0;
Packit 961e70
	am_reqq_t *nreq =
Packit 961e70
	    (am_reqq_t *) psmi_malloc(ptl->ep, UNDEFINED, sizeof(am_reqq_t));
Packit 961e70
	psmi_assert_always(nreq != NULL);
Packit 961e70
	_HFI_VDBG("alloc of reqq=%p, to epaddr=%s, ptr=%p, len=%d, "
Packit 961e70
		  "localreq=%p, remotereq=%p\n", nreq,
Packit 961e70
		  psmi_epaddr_get_hostname(epaddr->epid), dest,
Packit 961e70
		  (int)len, (void *)(uintptr_t) args[1].u64w0,
Packit 961e70
		  (void *)(uintptr_t) args[0].u64w0);
Packit 961e70
Packit 961e70
	psmi_assert(nargs <= 8);
Packit 961e70
	nreq->next = NULL;
Packit 961e70
	nreq->amtype = amtype;
Packit 961e70
	nreq->ptl = ptl_gen;
Packit 961e70
	nreq->epaddr = epaddr;
Packit 961e70
	nreq->handler = handler;
Packit 961e70
	for (i = 0; i < nargs; i++)
Packit 961e70
		nreq->args[i] = args[i];
Packit 961e70
	nreq->nargs = nargs;
Packit 961e70
	if (AM_IS_LONG(amtype) && src != NULL &&
Packit 961e70
	    len > 0 && !(amflags & AM_FLAG_SRC_ASYNC)) {
Packit 961e70
		abort();
Packit 961e70
		flags |= AM_FLAG_SRC_TEMP;
Packit 961e70
		nreq->src = psmi_malloc(ptl->ep, UNDEFINED, len);
Packit 961e70
		psmi_assert_always(nreq->src != NULL);	/* XXX mem */
Packit 961e70
		amsh_shm_copy_short(nreq->src, src, len);
Packit 961e70
	} else
Packit 961e70
		nreq->src = src;
Packit 961e70
	nreq->len = len;
Packit 961e70
	nreq->dest = dest;
Packit 961e70
	nreq->amflags = amflags;
Packit 961e70
	nreq->flags = flags;
Packit 961e70
Packit 961e70
	nreq->next = NULL;
Packit 961e70
	*(ptl->psmi_am_reqq_fifo.lastp) = nreq;
Packit 961e70
	ptl->psmi_am_reqq_fifo.lastp = &nreq->next;
Packit 961e70
}
Packit 961e70
Packit 961e70
static
Packit 961e70
void process_packet(ptl_t *ptl_gen, am_pkt_short_t *pkt, int isreq)
Packit 961e70
{
Packit 961e70
	struct ptl_am *ptl = (struct ptl_am *)ptl_gen;
Packit 961e70
	amsh_am_token_t tok;
Packit 961e70
	psmi_handler_fn_t fn;
Packit 961e70
	psm2_amarg_t *args = pkt->args;
Packit 961e70
	uint16_t shmidx = pkt->shmidx;
Packit 961e70
	int nargs = pkt->nargs;
Packit 961e70
Packit 961e70
	tok.tok.epaddr_incoming = ((shmidx != (uint16_t)-1) ? ptl->am_ep[shmidx].epaddr : 0);
Packit 961e70
	tok.ptl = ptl_gen;
Packit 961e70
	tok.mq = ptl->ep->mq;
Packit 961e70
	tok.shmidx = shmidx;
Packit 961e70
Packit 961e70
	uint16_t hidx = (uint16_t) pkt->handleridx;
Packit 961e70
	uint32_t bulkidx = pkt->bulkidx;
Packit 961e70
	uintptr_t bulkptr;
Packit 961e70
	am_pkt_bulk_t *bulkpkt;
Packit 961e70
Packit 961e70
	fn = (psmi_handler_fn_t) psmi_allhandlers[hidx].fn;
Packit 961e70
	psmi_assert(fn != NULL);
Packit 961e70
	psmi_assert((uintptr_t) pkt > ptl->self_nodeinfo->amsh_shmbase);
Packit 961e70
Packit 961e70
	if (pkt->type == AMFMT_SHORT_INLINE) {
Packit 961e70
		_HFI_VDBG
Packit 961e70
		    ("%s inline flag=%d nargs=%d from_idx=%d pkt=%p hidx=%d\n",
Packit 961e70
		     isreq ? "request" : "reply", pkt->flag, nargs, shmidx, pkt,
Packit 961e70
		     hidx);
Packit 961e70
Packit 961e70
		fn(&tok, args, nargs, pkt->length > 0 ?
Packit 961e70
		   (void *)&args[nargs] : NULL, pkt->length);
Packit 961e70
	} else {
Packit 961e70
		int isend = 0;
Packit 961e70
		switch (pkt->type) {
Packit 961e70
		case AMFMT_LONG_END:
Packit 961e70
			isend = 1;
Packit 961e70
		case AMFMT_LONG:
Packit 961e70
		case AMFMT_SHORT:
Packit 961e70
			if (isreq) {
Packit 961e70
				bulkptr =
Packit 961e70
				    (uintptr_t) ptl->self_nodeinfo->qdir.
Packit 961e70
				    qreqFifoLong;
Packit 961e70
				bulkptr += bulkidx * amsh_qelemsz.qreqFifoLong;
Packit 961e70
			} else {
Packit 961e70
				bulkptr =
Packit 961e70
				    (uintptr_t) ptl->self_nodeinfo->qdir.
Packit 961e70
				    qrepFifoLong;
Packit 961e70
				bulkptr += bulkidx * amsh_qelemsz.qrepFifoLong;
Packit 961e70
			}
Packit 961e70
			break;
Packit 961e70
		default:
Packit 961e70
			bulkptr = 0;
Packit 961e70
			psmi_handle_error(PSMI_EP_NORETURN, PSM2_INTERNAL_ERR,
Packit 961e70
					  "Unknown/unhandled packet type 0x%x",
Packit 961e70
					  pkt->type);
Packit 961e70
			return;
Packit 961e70
		}
Packit 961e70
Packit 961e70
		bulkpkt = (am_pkt_bulk_t *) bulkptr;
Packit 961e70
		_HFI_VDBG("ep=%p mq=%p type=%d bulkidx=%d flag=%d/%d nargs=%d "
Packit 961e70
			  "from_idx=%d pkt=%p/%p hidx=%d\n",
Packit 961e70
			  ptl->ep, ptl->ep->mq, pkt->type, bulkidx, pkt->flag,
Packit 961e70
			  bulkpkt->flag, nargs, shmidx, pkt, bulkpkt, hidx);
Packit 961e70
		psmi_assert(bulkpkt->flag == QREADY);
Packit 961e70
Packit 961e70
		if (nargs > NSHORT_ARGS || isend == 1) {
Packit 961e70
			/* Either there are more args in the bulkpkt, or this is the last
Packit 961e70
			   packet of a long payload.  In either case, copy the args. */
Packit 961e70
			int i;
Packit 961e70
			args =
Packit 961e70
			    alloca((NSHORT_ARGS +
Packit 961e70
				    NBULK_ARGS) * sizeof(psm2_amarg_t));
Packit 961e70
Packit 961e70
			for (i = 0; i < NSHORT_ARGS; i++) {
Packit 961e70
				args[i] = pkt->args[i];
Packit 961e70
			}
Packit 961e70
Packit 961e70
			for (; i < nargs; i++) {
Packit 961e70
				args[i] = bulkpkt->args[i - NSHORT_ARGS];
Packit 961e70
			}
Packit 961e70
		}
Packit 961e70
Packit 961e70
		if (pkt->type == AMFMT_SHORT) {
Packit 961e70
			fn(&tok, args, nargs,
Packit 961e70
			   (void *)bulkpkt->payload, bulkpkt->len);
Packit 961e70
			QMARKFREE(bulkpkt);
Packit 961e70
		} else {
Packit 961e70
			amsh_shm_copy_long((void *)(bulkpkt->dest +
Packit 961e70
						    bulkpkt->dest_off),
Packit 961e70
					   bulkpkt->payload, bulkpkt->len);
Packit 961e70
Packit 961e70
			/* If this is the last packet, copy args before running the
Packit 961e70
			 * handler */
Packit 961e70
			if (isend) {
Packit 961e70
				void *dest = (void *)bulkpkt->dest;
Packit 961e70
				size_t len =
Packit 961e70
				    (size_t) (bulkpkt->dest_off + bulkpkt->len);
Packit 961e70
				QMARKFREE(bulkpkt);
Packit 961e70
				fn(&tok, args, nargs, dest, len);
Packit 961e70
			} else
Packit 961e70
				QMARKFREE(bulkpkt);
Packit 961e70
		}
Packit 961e70
	}
Packit 961e70
	return;
Packit 961e70
}
Packit 961e70
Packit 961e70
static
Packit 961e70
psm2_error_t
Packit 961e70
amsh_mq_rndv(ptl_t *ptl, psm2_mq_t mq, psm2_mq_req_t req,
Packit 961e70
	     psm2_epaddr_t epaddr, psm2_mq_tag_t *tag, const void *buf,
Packit 961e70
	     uint32_t len)
Packit 961e70
{
Packit 961e70
	psm2_amarg_t args[5];
Packit 961e70
	psm2_error_t err = PSM2_OK;
Packit 961e70
Packit 961e70
	args[0].u32w0 = MQ_MSG_LONGRTS;
Packit 961e70
	args[0].u32w1 = len;
Packit 961e70
	args[1].u32w1 = tag->tag[0];
Packit 961e70
	args[1].u32w0 = tag->tag[1];
Packit 961e70
	args[2].u32w1 = tag->tag[2];
Packit 961e70
	args[3].u64w0 = (uint64_t) (uintptr_t) req;
Packit 961e70
	args[4].u64w0 = (uint64_t) (uintptr_t) buf;
Packit 961e70
Packit 961e70
	psmi_assert(req != NULL);
Packit 961e70
	req->type = MQE_TYPE_SEND;
Packit 961e70
	req->req_data.buf = (void *)buf;
Packit 961e70
	req->req_data.buf_len = len;
Packit 961e70
	req->req_data.send_msglen = len;
Packit 961e70
	req->send_msgoff = 0;
Packit 961e70
Packit 961e70
#ifdef PSM_CUDA
Packit Service 7ed5cc
	/* If the send buffer is on gpu, we create a cuda IPC
Packit Service 7ed5cc
	 * handle and send it as payload in the RTS */
Packit Service 7ed5cc
	if (req->is_buf_gpu_mem) {
Packit Service 7ed5cc
		CUdeviceptr buf_base_ptr;
Packit Service 7ed5cc
		PSMI_CUDA_CALL(cuMemGetAddressRange, &buf_base_ptr, NULL, (CUdeviceptr)buf);
Packit Service 7ed5cc
Packit Service 7ed5cc
		/* Offset in GPU buffer from which we copy data, we have to
Packit Service 7ed5cc
			* send it separetly because this offset is lost
Packit Service 7ed5cc
			* when cuIpcGetMemHandle  is called */
Packit Service 7ed5cc
		req->cuda_ipc_offset = buf - (void*)buf_base_ptr;
Packit Service 7ed5cc
		args[2].u32w0 = (uint32_t)req->cuda_ipc_offset;
Packit Service 7ed5cc
Packit Service 7ed5cc
		PSMI_CUDA_CALL(cuIpcGetMemHandle,
Packit Service 7ed5cc
				&req->cuda_ipc_handle,
Packit Service 7ed5cc
				(CUdeviceptr) buf);
Packit 961e70
		if (req->flags_internal & PSMI_REQ_FLAG_FASTPATH) {
Packit 961e70
			psmi_am_reqq_add(AMREQUEST_SHORT, ptl,
Packit Service 7ed5cc
						epaddr, mq_handler_hidx,
Packit Service 7ed5cc
						args, 5, (void*)&req->cuda_ipc_handle,
Packit Service 7ed5cc
						sizeof(CUipcMemHandle), NULL, 0);
Packit 961e70
		} else {
Packit 961e70
			psmi_amsh_short_request(ptl, epaddr, mq_handler_hidx,
Packit Service 7ed5cc
						args, 5, (void*)&req->cuda_ipc_handle,
Packit Service 7ed5cc
						sizeof(CUipcMemHandle), 0);
Packit 961e70
		}
Packit Service 7ed5cc
		req->cuda_ipc_handle_attached = 1;
Packit Service 7ed5cc
	} else
Packit Service 7ed5cc
#endif
Packit Service 7ed5cc
	if (req->flags_internal & PSMI_REQ_FLAG_FASTPATH) {
Packit Service 7ed5cc
		psmi_am_reqq_add(AMREQUEST_SHORT, ptl, epaddr, mq_handler_hidx,
Packit Service 7ed5cc
					args, 5, NULL, 0, NULL, 0);
Packit Service 7ed5cc
	} else {
Packit Service 7ed5cc
		psmi_amsh_short_request(ptl, epaddr, mq_handler_hidx,
Packit Service 7ed5cc
					args, 5, NULL, 0, 0);
Packit Service 7ed5cc
	}
Packit Service 7ed5cc
Packit Service 7ed5cc
	mq->stats.tx_num++;
Packit Service 7ed5cc
	mq->stats.tx_shm_num++;
Packit Service 7ed5cc
	mq->stats.tx_rndv_num++;
Packit Service 7ed5cc
	mq->stats.tx_rndv_bytes += len;
Packit 961e70
Packit 961e70
	return err;
Packit 961e70
}
Packit 961e70
Packit 961e70
PSMI_ALWAYS_INLINE(
Packit 961e70
psm2_error_t
Packit Service 7ed5cc
amsh_mq_send_inner_eager(psm2_mq_t mq, psm2_mq_req_t req, psm2_epaddr_t epaddr,
Packit Service 7ed5cc
			psm2_amarg_t *args, uint32_t flags_user, uint32_t flags_internal,
Packit Service 7ed5cc
			psm2_mq_tag_t *tag, const void *ubuf, uint32_t len))
Packit 961e70
{
Packit Service 7ed5cc
	uint32_t bytes_left = len;
Packit Service 7ed5cc
	uint32_t bytes_this = 0;
Packit 961e70
Packit Service 7ed5cc
	psm2_handler_t handler = mq_handler_hidx;
Packit Service 7ed5cc
Packit Service 7ed5cc
	args[1].u32w1 = tag->tag[0];
Packit Service 7ed5cc
	args[1].u32w0 = tag->tag[1];
Packit Service 7ed5cc
	args[2].u32w1 = tag->tag[2];
Packit Service 7ed5cc
	args[2].u32w0 = 0;
Packit 961e70
Packit 961e70
	if (!flags_user && len <= AMLONG_MTU) {
Packit 961e70
		if (len <= 32)
Packit 961e70
			args[0].u32w0 = MQ_MSG_TINY;
Packit 961e70
		else
Packit 961e70
			args[0].u32w0 = MQ_MSG_SHORT;
Packit Service 7ed5cc
	} else {
Packit 961e70
		args[0].u32w0 = MQ_MSG_EAGER;
Packit 961e70
		args[0].u32w1 = len;
Packit Service 7ed5cc
	}
Packit Service 7ed5cc
Packit Service 7ed5cc
	do {
Packit Service 7ed5cc
		args[2].u32w0 += bytes_this;
Packit Service 7ed5cc
		bytes_this = min(bytes_left, AMLONG_MTU);
Packit Service 7ed5cc
Packit Service 7ed5cc
		/* Assume that shared-memory active messages are delivered in order */
Packit 961e70
		if (flags_internal & PSMI_REQ_FLAG_FASTPATH) {
Packit 961e70
			psmi_am_reqq_add(AMREQUEST_SHORT, epaddr->ptlctl->ptl,
Packit Service 7ed5cc
					epaddr, handler, args, 3, (void *)ubuf,
Packit Service 7ed5cc
					bytes_this, NULL, 0);
Packit 961e70
		} else {
Packit 961e70
			psmi_amsh_short_request(epaddr->ptlctl->ptl, epaddr,
Packit Service 7ed5cc
						handler, args, 3, ubuf, bytes_this, 0);
Packit 961e70
		}
Packit 961e70
Packit Service 7ed5cc
		ubuf += bytes_this;
Packit Service 7ed5cc
		bytes_left -= bytes_this;
Packit Service 7ed5cc
		handler = mq_handler_data_hidx;
Packit Service 7ed5cc
	} while(bytes_left);
Packit 961e70
Packit 961e70
	/* All eager async sends are always "all done" */
Packit 961e70
	if (req != NULL) {
Packit 961e70
		req->state = MQ_STATE_COMPLETE;
Packit 961e70
		mq_qq_append(&mq->completed_q, req);
Packit 961e70
	}
Packit 961e70
Packit 961e70
	mq->stats.tx_num++;
Packit 961e70
	mq->stats.tx_shm_num++;
Packit 961e70
	mq->stats.tx_eager_num++;
Packit 961e70
	mq->stats.tx_eager_bytes += len;
Packit 961e70
Packit Service 7ed5cc
	return PSM2_OK;
Packit Service 7ed5cc
}
Packit Service 7ed5cc
Packit Service 7ed5cc
/*
Packit Service 7ed5cc
 * All shared am mq sends, req can be NULL
Packit Service 7ed5cc
 */
Packit Service 7ed5cc
PSMI_ALWAYS_INLINE(
Packit Service 7ed5cc
psm2_error_t
Packit Service 7ed5cc
amsh_mq_send_inner(psm2_mq_t mq, psm2_mq_req_t req, psm2_epaddr_t epaddr,
Packit Service 7ed5cc
		   uint32_t flags_user, uint32_t flags_internal, psm2_mq_tag_t *tag,
Packit Service 7ed5cc
		   const void *ubuf, uint32_t len))
Packit Service 7ed5cc
{
Packit Service 7ed5cc
	psm2_amarg_t args[3];
Packit Service 7ed5cc
	psm2_error_t err = PSM2_OK;
Packit Service 7ed5cc
	int is_blocking = (req == NULL);
Packit Service 7ed5cc
Packit Service 7ed5cc
#ifdef PSM_CUDA
Packit Service 7ed5cc
	int gpu_mem = 0;
Packit Service 7ed5cc
	int ep_supports_p2p = (1 << ((am_epaddr_t *) epaddr)->gpuid) & gpu_p2p_supported;
Packit Service 7ed5cc
Packit Service 7ed5cc
	if (PSMI_IS_CUDA_ENABLED && PSMI_IS_CUDA_MEM(ubuf)) {
Packit Service 7ed5cc
		gpu_mem = 1;
Packit Service 7ed5cc
Packit Service 7ed5cc
		/* All sends from a gpu buffer use the rendezvous protocol if p2p is supported */
Packit Service 7ed5cc
		if (ep_supports_p2p) {
Packit Service 7ed5cc
			goto do_rendezvous;
Packit Service 7ed5cc
		}
Packit Service 7ed5cc
Packit Service 7ed5cc
		/*
Packit Service 7ed5cc
		 * Use eager messages if P2P is unsupported between endpoints.
Packit Service 7ed5cc
		 * Potentially use rendezvous with blocking requests only.
Packit Service 7ed5cc
		 */
Packit Service 7ed5cc
		if (!is_blocking)
Packit Service 7ed5cc
			goto do_eager;
Packit Service 7ed5cc
	}
Packit Service 7ed5cc
#endif
Packit Service 7ed5cc
	if (flags_user & PSM2_MQ_FLAG_SENDSYNC)
Packit Service 7ed5cc
		goto do_rendezvous;
Packit Service 7ed5cc
Packit Service 7ed5cc
	if (len <= mq->shm_thresh_rv)
Packit Service 7ed5cc
#ifdef PSM_CUDA
Packit Service 7ed5cc
do_eager:
Packit Service 7ed5cc
#endif
Packit Service 7ed5cc
		return amsh_mq_send_inner_eager(mq, req, epaddr, args, flags_user,
Packit Service 7ed5cc
						flags_internal, tag, ubuf, len);
Packit Service 7ed5cc
do_rendezvous:
Packit Service 7ed5cc
	if (is_blocking) {
Packit Service 7ed5cc
		req = psmi_mq_req_alloc(mq, MQE_TYPE_SEND);
Packit Service 7ed5cc
		if_pf(req == NULL)
Packit Service 7ed5cc
			return PSM2_NO_MEMORY;
Packit Service 7ed5cc
		req->req_data.send_msglen = len;
Packit Service 7ed5cc
		req->req_data.tag = *tag;
Packit Service 7ed5cc
Packit Service 7ed5cc
		/* Since SEND command is blocking, this request is
Packit Service 7ed5cc
		 * entirely internal and we will not be exposed to user.
Packit Service 7ed5cc
		 * Setting as internal so it will not be added to
Packit Service 7ed5cc
		 * mq->completed_q */
Packit Service 7ed5cc
		req->flags_internal |= (flags_internal | PSMI_REQ_FLAG_IS_INTERNAL);
Packit Service 7ed5cc
	}
Packit Service 7ed5cc
#ifdef PSM_CUDA
Packit Service 7ed5cc
	void *host_buf = NULL;
Packit Service 7ed5cc
Packit Service 7ed5cc
	req->is_buf_gpu_mem = gpu_mem;
Packit Service 7ed5cc
	if (req->is_buf_gpu_mem) {
Packit Service 7ed5cc
		psmi_cuda_set_attr_sync_memops(ubuf);
Packit Service 7ed5cc
Packit Service 7ed5cc
		/* Use host buffer for blocking requests if GPU P2P is
Packit Service 7ed5cc
		 * unsupported between endpoints.
Packit Service 7ed5cc
		 * This will be only used with blocking requests. */
Packit Service 7ed5cc
		if (!ep_supports_p2p) {
Packit Service 7ed5cc
			host_buf = psmi_malloc(epaddr->ptlctl->ep, UNDEFINED, len);
Packit Service 7ed5cc
			PSMI_CUDA_CALL(cuMemcpyDtoH, host_buf, (CUdeviceptr)ubuf, len);
Packit Service 7ed5cc
Packit Service 7ed5cc
			/* Reset is_buf_gpu_mem since host buffer is being used
Packit Service 7ed5cc
			 * instead of one from GPU. */
Packit Service 7ed5cc
			ubuf = host_buf;
Packit Service 7ed5cc
			req->is_buf_gpu_mem = 0;
Packit Service 7ed5cc
		}
Packit Service 7ed5cc
	}
Packit Service 7ed5cc
#endif
Packit Service 7ed5cc
Packit Service 7ed5cc
	err = amsh_mq_rndv(epaddr->ptlctl->ptl, mq, req, epaddr, tag, ubuf, len);
Packit Service 7ed5cc
Packit Service 7ed5cc
	if (err == PSM2_OK && is_blocking) {	/* wait... */
Packit Service 7ed5cc
		err = psmi_mq_wait_internal(&req;;
Packit Service 7ed5cc
	}
Packit Service 7ed5cc
Packit Service 7ed5cc
#ifdef PSM_CUDA
Packit Service 7ed5cc
	if (err == PSM2_OK && host_buf)
Packit Service 7ed5cc
		psmi_free(host_buf);
Packit Service 7ed5cc
#endif
Packit Service 7ed5cc
Packit 961e70
	return err;
Packit 961e70
}
Packit 961e70
Packit 961e70
static
Packit 961e70
psm2_error_t
Packit 961e70
amsh_mq_isend(psm2_mq_t mq, psm2_epaddr_t epaddr, uint32_t flags_user,
Packit 961e70
	      uint32_t flags_internal, psm2_mq_tag_t *tag, const void *ubuf,
Packit 961e70
	      uint32_t len, void *context, psm2_mq_req_t *req_o)
Packit 961e70
{
Packit 961e70
	psm2_mq_req_t req = psmi_mq_req_alloc(mq, MQE_TYPE_SEND);
Packit 961e70
	if_pf(req == NULL)
Packit 961e70
	    return PSM2_NO_MEMORY;
Packit 961e70
Packit 961e70
	req->req_data.send_msglen = len;
Packit 961e70
	req->req_data.tag = *tag;
Packit 961e70
	req->req_data.context = context;
Packit 961e70
	req->flags_user = flags_user;
Packit 961e70
	req->flags_internal = flags_internal;
Packit 961e70
	_HFI_VDBG("[ishrt][%s->%s][n=0][b=%p][l=%d][t=%08x.%08x.%08x]\n",
Packit 961e70
		  psmi_epaddr_get_name(epaddr->ptlctl->ep->epid),
Packit 961e70
		  psmi_epaddr_get_name(epaddr->epid), ubuf, len,
Packit 961e70
		  tag->tag[0], tag->tag[1], tag->tag[2]);
Packit 961e70
Packit 961e70
	amsh_mq_send_inner(mq, req, epaddr, flags_user, flags_internal, tag, ubuf, len);
Packit 961e70
Packit 961e70
	*req_o = req;
Packit 961e70
	return PSM2_OK;
Packit 961e70
}
Packit 961e70
Packit 961e70
static
Packit 961e70
psm2_error_t
Packit 961e70
amsh_mq_send(psm2_mq_t mq, psm2_epaddr_t epaddr, uint32_t flags,
Packit 961e70
	     psm2_mq_tag_t *tag, const void *ubuf, uint32_t len)
Packit 961e70
{
Packit 961e70
	_HFI_VDBG("[shrt][%s->%s][n=0][b=%p][l=%d][t=%08x.%08x.%08x]\n",
Packit 961e70
		  psmi_epaddr_get_name(epaddr->ptlctl->ep->epid),
Packit 961e70
		  psmi_epaddr_get_name(epaddr->epid), ubuf, len,
Packit 961e70
		  tag->tag[0], tag->tag[1], tag->tag[2]);
Packit 961e70
Packit 961e70
	amsh_mq_send_inner(mq, NULL, epaddr, flags, PSMI_REQ_FLAG_NORMAL, tag, ubuf, len);
Packit 961e70
Packit 961e70
	return PSM2_OK;
Packit 961e70
}
Packit 961e70
Packit 961e70
/* kassist-related handling */
Packit 961e70
int psmi_epaddr_pid(psm2_epaddr_t epaddr)
Packit 961e70
{
Packit 961e70
	uint16_t shmidx = ((am_epaddr_t *) epaddr)->shmidx;
Packit 961e70
	return ((struct ptl_am *)(epaddr->ptlctl->ptl))->am_ep[shmidx].pid;
Packit 961e70
}
Packit 961e70
#if _HFI_DEBUGGING
/* Printable name of a PSMI_KASSIST_* mode, for debug output; any value not
 * listed maps to "unknown". */
static
const char *psmi_kassist_getmode(int mode)
{
	const char *name = "unknown";

	if (mode == PSMI_KASSIST_OFF)
		name = "kassist off";
	else if (mode == PSMI_KASSIST_CMA_GET)
		name = "cma get";
	else if (mode == PSMI_KASSIST_CMA_PUT)
		name = "cma put";

	return name;
}
#endif
Packit 961e70
Packit 961e70
static
Packit 961e70
int psmi_get_kassist_mode()
Packit 961e70
{
Packit Service 7ed5cc
	/* Cuda PSM2 supports only KASSIST_CMA_GET */
Packit Service 7ed5cc
	int mode = PSMI_KASSIST_CMA_GET;
Packit Service 7ed5cc
#ifndef PSM_CUDA
Packit 961e70
	union psmi_envvar_val env_kassist;
Packit 961e70
Packit 961e70
	if (!psmi_getenv("PSM2_KASSIST_MODE",
Packit 961e70
			 "PSM Shared memory kernel assist mode "
Packit 961e70
			 "(cma-put, cma-get, none)",
Packit 961e70
			 PSMI_ENVVAR_LEVEL_USER, PSMI_ENVVAR_TYPE_STR,
Packit 961e70
			 (union psmi_envvar_val)
Packit 961e70
			 PSMI_KASSIST_MODE_DEFAULT_STRING, &env_kassist)) {
Packit 961e70
		char *s = env_kassist.e_str;
Packit 961e70
		if (strcasecmp(s, "cma-put") == 0)
Packit 961e70
			mode = PSMI_KASSIST_CMA_PUT;
Packit 961e70
		else if (strcasecmp(s, "cma-get") == 0)
Packit 961e70
			mode = PSMI_KASSIST_CMA_GET;
Packit 961e70
		else
Packit 961e70
			mode = PSMI_KASSIST_OFF;
Packit 961e70
	}
Packit 961e70
#endif
Packit 961e70
	return mode;
Packit 961e70
}
Packit 961e70
Packit 961e70
/*
 * Active-message handler for shared-memory connection management.
 *
 * The op in args[0].u16w0 selects one of:
 *   PSMI_AM_CONN_REQ - a peer asks to connect: map its segment if our token
 *                      has no shmidx yet, create or refresh its epaddr, then
 *                      rewrite args into a PSMI_AM_CONN_REP and reply.
 *   PSMI_AM_CONN_REP - reply to our CONN_REQ: store the result in *perr and
 *                      move the outgoing state to REPLIED.
 *   PSMI_AM_DISC_REQ - a peer asks to disconnect: rewrite args into a
 *                      PSMI_AM_DISC_REP, reply, and unmap the peer's segment
 *                      if we already requested a disconnect ourselves.
 *   PSMI_AM_DISC_REP - reply to our DISC_REQ: store the result in *perr and
 *                      move the outgoing state to DISC_REPLIED.
 *
 * Argument layout, as read below:
 *   arg0 => u16w0: conn_op, u16w1: shmidx handed back by the sender (stored
 *           as epaddr->return_shmidx), u32w1: connect phase
 *   arg1 => epid of the sender (always)
 *   arg2 => u32w0: packed extra ep data (pid, gpuid; see
 *           read_extra_ep_data), u32w1: result (PSM error type) for replies
 *   arg3 => pointer to an error slot, written through when a reply arrives
 *
 * buf/len must be NULL/0 (asserted).  Out-of-phase replies are dropped.
 */
Packit 961e70
static
Packit 961e70
void
Packit 961e70
amsh_conn_handler(void *toki, psm2_amarg_t *args, int narg, void *buf,
Packit 961e70
		  size_t len)
Packit 961e70
{
Packit 961e70
	int op = args[0].u16w0;
Packit 961e70
	int phase = args[0].u32w1;
Packit 961e70
	psm2_epid_t epid = args[1].u64w0;
Packit 961e70
	int16_t return_shmidx = args[0].u16w1;
Packit 961e70
	psm2_error_t err = (psm2_error_t) args[2].u32w1;
Packit 961e70
	psm2_error_t *perr = (psm2_error_t *) (uintptr_t) args[3].u64w0;
Packit Service 7ed5cc
	unsigned int pid;
Packit Service 7ed5cc
	unsigned int gpuid;
Packit 961e70
	int force_remap = 0;
Packit 961e70
Packit 961e70
	psm2_epaddr_t epaddr;
Packit 961e70
	amsh_am_token_t *tok = (amsh_am_token_t *) toki;
Packit 961e70
	uint16_t shmidx = tok->shmidx;
Packit 961e70
	int is_valid;
Packit 961e70
	struct ptl_am *ptl = (struct ptl_am *)(tok->ptl);
Packit 961e70
	ptl_t *ptl_gen = tok->ptl;
Packit 961e70
	int cstate;
Packit 961e70
Packit 961e70
	/* We do this because it's an assumption below */
Packit 961e70
	psmi_assert_always(buf == NULL && len == 0);
Packit Service 7ed5cc
	/* Unpack the sender's pid and gpuid from args[2].u32w0. */
	read_extra_ep_data(args[2].u32w0, &pid, &gpuid);
Packit 961e70
Packit 961e70
	_HFI_VDBG("Conn op=%d, phase=%d, epid=%llx, err=%d\n",
Packit 961e70
		  op, phase, (unsigned long long)epid, err);
Packit 961e70
Packit 961e70
	switch (op) {
Packit 961e70
	case PSMI_AM_CONN_REQ:
Packit 961e70
		_HFI_VDBG("Connect from %d:%d\n",
Packit 961e70
			  (int)psm2_epid_nid(epid), (int)psm2_epid_context(epid));
Packit 961e70
		epaddr = psmi_epid_lookup(ptl->ep, epid);
Packit 961e70
		if (epaddr && ((am_epaddr_t *) epaddr)->pid != pid) {
Packit 961e70
			/* If old pid is unknown consider new pid the correct one */
Packit 961e70
			if (((am_epaddr_t *) epaddr)->pid == AMSH_PID_UNKNOWN) {
Packit 961e70
				((am_epaddr_t *) epaddr)->pid = pid;
Packit Service 7ed5cc
				((am_epaddr_t *) epaddr)->gpuid = gpuid;
Packit 961e70
			} else {
Packit 961e70
				/* A different, known pid owns this epid: drop
				 * the stale entry and force the segment to be
				 * re-mapped below. */
				psmi_epid_remove(ptl->ep, epid);
Packit 961e70
				epaddr = NULL;
Packit 961e70
				force_remap = 1;
Packit 961e70
			}
Packit 961e70
		}
Packit 961e70
Packit 961e70
		if (shmidx == (uint16_t)-1) {
Packit 961e70
			/* incoming packet will never be from our shmidx slot 0
Packit 961e70
			   thus the other process doesn't know our return info.
Packit 961e70
			   attach_to will lookup or create the proper shmidx */
Packit 961e70
			if ((err = psmi_shm_map_remote(ptl_gen, epid, &shmidx, force_remap))) {
Packit 961e70
				psmi_handle_error(PSMI_EP_NORETURN, err,
Packit 961e70
						  "Fatal error in "
Packit 961e70
						  "connecting to shm segment");
Packit 961e70
			}
Packit 961e70
			am_update_directory(&ptl->am_ep[shmidx]);
Packit 961e70
			tok->shmidx = shmidx;
Packit 961e70
		}
Packit 961e70
Packit 961e70
		/* No usable epaddr: create one.  args is saved as an offset
		 * from our own amsh_shmbase and re-derived afterwards --
		 * NOTE(review): presumably because amsh_epaddr_add() may
		 * remap our segment; confirm. */
		if (epaddr == NULL) {
Packit 961e70
			uintptr_t args_segoff =
Packit 961e70
				(uintptr_t) args - ptl->self_nodeinfo->amsh_shmbase;
Packit 961e70
			if ((err = amsh_epaddr_add(ptl_gen, epid, shmidx, &epaddr)))
Packit 961e70
				/* Unfortunately, no way out of here yet */
Packit 961e70
				psmi_handle_error(PSMI_EP_NORETURN, err,
Packit 961e70
						  "Fatal error "
Packit 961e70
						  "in connecting to shm segment");
Packit 961e70
			args =
Packit 961e70
			    (psm2_amarg_t *) (ptl->self_nodeinfo->amsh_shmbase +
Packit 961e70
					     args_segoff);
Packit 961e70
Packit 961e70
			((am_epaddr_t *) epaddr)->pid = pid;
Packit Service 7ed5cc
			((am_epaddr_t *) epaddr)->gpuid = gpuid;
Packit 961e70
		}
Packit 961e70
Packit 961e70
		/* Rewrite args */
Packit 961e70
		ptl->connect_incoming++;
Packit 961e70
		args[0].u16w0 = PSMI_AM_CONN_REP;
Packit 961e70
		/* and return our shmidx for the connecting process */
Packit 961e70
		args[0].u16w1 = shmidx;
Packit 961e70
		args[1].u64w0 = (psm2_epid_t) ptl->epid;
Packit Service 7ed5cc
		args[2].u32w0 = create_extra_ep_data();
Packit 961e70
		args[2].u32w1 = PSM2_OK;
Packit 961e70
		((am_epaddr_t *) epaddr)->cstate_incoming =
Packit 961e70
			AMSH_CSTATE_INCOMING_ESTABLISHED;
Packit 961e70
		((am_epaddr_t *) epaddr)->return_shmidx = return_shmidx;
Packit 961e70
		tok->tok.epaddr_incoming = epaddr;	/* adjust token */
Packit 961e70
		psmi_amsh_short_reply(tok, amsh_conn_handler_hidx,
Packit 961e70
				      args, narg, NULL, 0, 0);
Packit 961e70
		break;
Packit 961e70
Packit 961e70
	case PSMI_AM_CONN_REP:
Packit 961e70
		if (ptl->connect_phase != phase) {
Packit 961e70
			_HFI_VDBG("Out of phase connect reply\n");
Packit 961e70
			return;
Packit 961e70
		}
Packit 961e70
		/* NOTE(review): epaddr is dereferenced below without a NULL
		 * check; this assumes am_ep[shmidx].epaddr was populated when
		 * we sent the CONN_REQ -- confirm. */
		epaddr = ptl->am_ep[shmidx].epaddr;
Packit 961e70
		/* check if a race has occurred on shm-file reuse.
Packit 961e70
		 * if so, don't transition to the next state.
Packit 961e70
		 * the next call to connreq_poll() will restart the
Packit 961e70
		 * connection.
Packit 961e70
		*/
Packit 961e70
		if (ptl->am_ep[shmidx].pid !=
Packit 961e70
		    ((struct am_ctl_nodeinfo *) ptl->am_ep[shmidx].amsh_shmbase)->pid)
Packit 961e70
			break;
Packit 961e70
Packit 961e70
		*perr = err;
Packit 961e70
		((am_epaddr_t *) epaddr)->cstate_outgoing
Packit 961e70
			= AMSH_CSTATE_OUTGOING_REPLIED;
Packit 961e70
		((am_epaddr_t *) epaddr)->return_shmidx = return_shmidx;
Packit 961e70
		ptl->connect_outgoing++;
Packit 961e70
		_HFI_VDBG("CCC epaddr=%s connected to ptl=%p\n",
Packit 961e70
			  psmi_epaddr_get_name(epaddr->epid), ptl);
Packit 961e70
		break;
Packit 961e70
Packit 961e70
	case PSMI_AM_DISC_REQ:
Packit 961e70
		epaddr = psmi_epid_lookup(ptl->ep, epid);
Packit 961e70
		if (!epaddr) {
Packit 961e70
			_HFI_VDBG("Dropping disconnect request from an epid that we are not connected to\n");
Packit 961e70
			return;
Packit 961e70
		}
Packit 961e70
		args[0].u16w0 = PSMI_AM_DISC_REP;
Packit 961e70
		args[2].u32w1 = PSM2_OK;
Packit 961e70
		((am_epaddr_t *) epaddr)->cstate_incoming =
Packit 961e70
			AMSH_CSTATE_INCOMING_DISC_REQUESTED;
Packit 961e70
		ptl->connect_incoming--;
Packit 961e70
		/* Before sending the reply, make sure the process
Packit 961e70
		 * is still connected */
Packit 961e70
Packit 961e70
		/* NOTE(review): unlike CONN_REQ, shmidx (from the token) is
		 * not checked against (uint16_t)-1 before indexing am_ep[]
		 * here -- confirm it is always valid for DISC_REQ. */
		if (ptl->am_ep[shmidx].epid != epaddr->epid)
Packit 961e70
			is_valid = 0;
Packit 961e70
		else
Packit 961e70
			is_valid = 1;
Packit 961e70
Packit 961e70
		if (is_valid) {
Packit 961e70
			psmi_amsh_short_reply(tok, amsh_conn_handler_hidx,
Packit 961e70
					      args, narg, NULL, 0, 0);
Packit 961e70
			/**
Packit 961e70
			* Only munmap if we have nothing more to
Packit 961e70
			* communicate with the other node, i.e. we are
Packit 961e70
			* already disconnected with the other node
Packit 961e70
			* or have sent a disconnect request.
Packit 961e70
			*/
Packit 961e70
			cstate = ((am_epaddr_t *) epaddr)->cstate_outgoing;
Packit 961e70
			if (cstate == AMSH_CSTATE_OUTGOING_DISC_REQUESTED) {
Packit 961e70
				/* NOTE(review): the unmap result stored in err
				 * is never checked or reported. */
				err = psmi_do_unmap(ptl->am_ep[shmidx].amsh_shmbase);
Packit 961e70
				psmi_epid_remove(epaddr->ptlctl->ep, epaddr->epid);
Packit 961e70
			}
Packit 961e70
		}
Packit 961e70
		break;
Packit 961e70
Packit 961e70
	case PSMI_AM_DISC_REP:
Packit 961e70
		if (ptl->connect_phase != phase) {
Packit 961e70
			_HFI_VDBG("Out of phase disconnect reply\n");
Packit 961e70
			return;
Packit 961e70
		}
Packit 961e70
		*perr = err;
Packit 961e70
		/* The replying peer's epaddr comes from the token rather than
		 * from an epid lookup. */
		epaddr = tok->tok.epaddr_incoming;
Packit 961e70
		((am_epaddr_t *) epaddr)->cstate_outgoing =
Packit 961e70
			AMSH_CSTATE_OUTGOING_DISC_REPLIED;
Packit 961e70
		ptl->connect_outgoing--;
Packit 961e70
		break;
Packit 961e70
Packit 961e70
	default:
Packit 961e70
		psmi_handle_error(PSMI_EP_NORETURN, PSM2_INTERNAL_ERR,
Packit 961e70
				  "Unknown/unhandled connect handler op=%d",
Packit 961e70
				  op);
Packit 961e70
		break;
Packit 961e70
	}
Packit 961e70
	return;
Packit 961e70
}
Packit 961e70
Packit 961e70
static
Packit 961e70
size_t amsh_sizeof(void)
Packit 961e70
{
Packit 961e70
	return sizeof(struct ptl_am);
Packit 961e70
}
Packit 961e70
Packit 961e70
/* Fill in AM capabilities parameters */
Packit 961e70
psm2_error_t
Packit 961e70
psmi_amsh_am_get_parameters(psm2_ep_t ep, struct psm2_am_parameters *parameters)
Packit 961e70
{
Packit 961e70
	if (parameters == NULL) {
Packit 961e70
		return PSM2_PARAM_ERR;
Packit 961e70
	}
Packit 961e70
Packit 961e70
	parameters->max_handlers = PSMI_AM_NUM_HANDLERS;
Packit 961e70
	parameters->max_nargs = PSMI_AM_MAX_ARGS;
Packit 961e70
	parameters->max_request_short = AMLONG_MTU;
Packit 961e70
	parameters->max_reply_short = AMLONG_MTU;
Packit 961e70
Packit 961e70
	return PSM2_OK;
Packit 961e70
}
Packit 961e70
Packit 961e70
/**
Packit 961e70
 * @param ep PSM Endpoint, guaranteed to have initialized epaddr and epid.
Packit 961e70
 * @param ptl Pointer to caller-allocated space for PTL (fill in)
Packit 961e70
 * @param ctl Pointer to caller-allocated space for PTL-control
Packit 961e70
 *            structure (fill in)
Packit 961e70
 */
Packit 961e70
static
Packit 961e70
psm2_error_t
Packit 961e70
amsh_init(psm2_ep_t ep, ptl_t *ptl_gen, ptl_ctl_t *ctl)
Packit 961e70
{
Packit 961e70
	struct ptl_am *ptl = (struct ptl_am *)ptl_gen;
Packit 961e70
	psm2_error_t err = PSM2_OK;
Packit 961e70
Packit 961e70
	/* Preconditions */
Packit 961e70
	psmi_assert_always(ep != NULL);
Packit 961e70
	psmi_assert_always(ep->epaddr != NULL);
Packit 961e70
	psmi_assert_always(ep->epid != 0);
Packit 961e70
Packit 961e70
	ptl->ep = ep;		/* back pointer */
Packit 961e70
	ptl->epid = ep->epid;	/* cache epid */
Packit 961e70
	ptl->epaddr = ep->epaddr;	/* cache a copy */
Packit 961e70
	ptl->ctl = ctl;
Packit 961e70
	ptl->zero_polls = 0;
Packit 961e70
Packit 961e70
	ptl->connect_phase = 0;
Packit 961e70
	ptl->connect_incoming = 0;
Packit 961e70
	ptl->connect_outgoing = 0;
Packit 961e70
Packit 961e70
	memset(&ptl->amsh_empty_shortpkt, 0, sizeof(ptl->amsh_empty_shortpkt));
Packit 961e70
	memset(&ptl->psmi_am_reqq_fifo, 0, sizeof(ptl->psmi_am_reqq_fifo));
Packit 961e70
Packit 961e70
	ptl->max_ep_idx = -1;
Packit 961e70
	ptl->am_ep_size = AMSH_DIRBLOCK_SIZE;
Packit 961e70
Packit 961e70
	ptl->am_ep = (struct am_ctl_nodeinfo *)
Packit 961e70
		psmi_memalign(ptl->ep, PER_PEER_ENDPOINT, 64,
Packit 961e70
			      ptl->am_ep_size * sizeof(struct am_ctl_nodeinfo));
Packit 961e70
Packit 961e70
	if (ptl->am_ep == NULL) {
Packit 961e70
		err = PSM2_NO_MEMORY;
Packit 961e70
		goto fail;
Packit 961e70
	}
Packit 961e70
	memset(ptl->am_ep, 0, ptl->am_ep_size * sizeof(struct am_ctl_nodeinfo));
Packit 961e70
Packit 961e70
	if ((err = amsh_init_segment(ptl_gen)))
Packit 961e70
		goto fail;
Packit 961e70
Packit 961e70
	ptl->self_nodeinfo->psm_verno = PSMI_VERNO;
Packit 961e70
	if (ptl->psmi_kassist_mode != PSMI_KASSIST_OFF) {
Packit 961e70
		if (cma_available()) {
Packit 961e70
			ptl->self_nodeinfo->amsh_features |=
Packit 961e70
				AMSH_HAVE_CMA;
Packit 961e70
			psmi_shm_mq_rv_thresh =
Packit 961e70
				PSMI_MQ_RV_THRESH_CMA;
Packit 961e70
		} else {
Packit 961e70
			ptl->psmi_kassist_mode =
Packit 961e70
				PSMI_KASSIST_OFF;
Packit 961e70
			psmi_shm_mq_rv_thresh =
Packit 961e70
				PSMI_MQ_RV_THRESH_NO_KASSIST;
Packit 961e70
		}
Packit 961e70
	} else {
Packit 961e70
		psmi_shm_mq_rv_thresh =
Packit 961e70
			PSMI_MQ_RV_THRESH_NO_KASSIST;
Packit 961e70
	}
Packit 961e70
	ptl->self_nodeinfo->pid = getpid();
Packit 961e70
	ptl->self_nodeinfo->epid = ep->epid;
Packit 961e70
	ptl->self_nodeinfo->epaddr = ep->epaddr;
Packit 961e70
Packit 961e70
	ips_mb();
Packit 961e70
	ptl->self_nodeinfo->is_init = 1;
Packit 961e70
Packit 961e70
	psmi_am_reqq_init(ptl_gen);
Packit 961e70
	memset(ctl, 0, sizeof(*ctl));
Packit 961e70
Packit 961e70
	/* Fill in the control structure */
Packit 961e70
	ctl->ep = ep;
Packit 961e70
	ctl->ptl = ptl_gen;
Packit 961e70
	ctl->ep_poll = amsh_poll;
Packit 961e70
	ctl->ep_connect = amsh_ep_connect;
Packit 961e70
	ctl->ep_disconnect = amsh_ep_disconnect;
Packit 961e70
Packit 961e70
	ctl->mq_send = amsh_mq_send;
Packit 961e70
	ctl->mq_isend = amsh_mq_isend;
Packit 961e70
Packit 961e70
	ctl->am_get_parameters = psmi_amsh_am_get_parameters;
Packit 961e70
	ctl->am_short_request = psmi_amsh_am_short_request;
Packit 961e70
	ctl->am_short_reply = psmi_amsh_am_short_reply;
Packit 961e70
Packit 961e70
	/* No stats in shm (for now...) */
Packit 961e70
	ctl->epaddr_stats_num = NULL;
Packit 961e70
	ctl->epaddr_stats_init = NULL;
Packit 961e70
	ctl->epaddr_stats_get = NULL;
Packit 961e70
#ifdef PSM_CUDA
Packit 961e70
	union psmi_envvar_val env_memcache_enabled;
Packit 961e70
	psmi_getenv("PSM2_CUDA_MEMCACHE_ENABLED",
Packit 961e70
		    "PSM cuda ipc memhandle cache enabled (default is enabled)",
Packit 961e70
		     PSMI_ENVVAR_LEVEL_USER, PSMI_ENVVAR_TYPE_UINT,
Packit 961e70
		     (union psmi_envvar_val)
Packit 961e70
		      1, &env_memcache_enabled);
Packit 961e70
	if (PSMI_IS_CUDA_ENABLED && env_memcache_enabled.e_uint) {
Packit 961e70
		union psmi_envvar_val env_memcache_size;
Packit 961e70
		psmi_getenv("PSM2_CUDA_MEMCACHE_SIZE",
Packit 961e70
			    "Size of the cuda ipc memhandle cache ",
Packit 961e70
			    PSMI_ENVVAR_LEVEL_USER, PSMI_ENVVAR_TYPE_UINT,
Packit 961e70
			    (union psmi_envvar_val)
Packit 961e70
			    CUDA_MEMHANDLE_CACHE_SIZE, &env_memcache_size);
Packit Service 7ed5cc
		if ((err = am_cuda_memhandle_cache_init(env_memcache_size.e_uint) != PSM2_OK))
Packit 961e70
			goto fail;
Packit 961e70
	}
Packit 961e70
#endif
Packit 961e70
fail:
Packit 961e70
	return err;
Packit 961e70
}
Packit 961e70
Packit 961e70
/*
 * Tear down the shared-memory AM PTL.
 *
 * Disconnects every peer of this PTL whose outgoing connection is still
 * ESTABLISHED, then polls until connect_incoming and connect_outgoing drop
 * to zero or timeout_ns expires.  After that it detaches the shared segment,
 * points the request/reply queue heads at an empty packet, and frees the
 * per-peer directory.
 *
 * Returns PSM2_NO_MEMORY if the disconnect bookkeeping cannot be allocated,
 * or the psmi_shm_detach() error.  Otherwise it returns PSM2_OK.
 * NOTE(review): a disconnect error or PSM2_TIMEOUT held in err is not
 * propagated once the detach succeeds -- unchanged behavior.
 */
static psm2_error_t amsh_fini(ptl_t *ptl_gen, int force, uint64_t timeout_ns)
{
	struct ptl_am *ptl = (struct ptl_am *)ptl_gen;
	struct psmi_eptab_iterator itor;
	psm2_epaddr_t epaddr;
	psm2_error_t err = PSM2_OK;
	psm2_error_t err_seg;
	uint64_t t_start = get_cycles();
	int i = 0;

	/* Close whatever has been left open -- this will be factored out for 2.1 */
	if (ptl->connect_outgoing > 0) {
		int num_disc = 0;

		/* Count peers of this PTL whose outgoing side is established. */
		psmi_epid_itor_init(&itor, ptl->ep);
		while ((epaddr = psmi_epid_itor_next(&itor))) {
			if (epaddr->ptlctl->ptl != ptl_gen)
				continue;
			if (((am_epaddr_t *) epaddr)->cstate_outgoing ==
			    AMSH_CSTATE_OUTGOING_ESTABLISHED)
				num_disc++;
		}
		psmi_epid_itor_fini(&itor);

		/*
		 * connect_outgoing is already incremented in the REPLIED state,
		 * so it can be non-zero while no peer is ESTABLISHED.  A zero-length
		 * calloc may then return NULL and be misreported as
		 * PSM2_NO_MEMORY, so only run the disconnect when there is
		 * something to disconnect.
		 */
		if (num_disc > 0) {
			int *mask;
			psm2_error_t *errs;
			psm2_epaddr_t *epaddr_array;

			mask = (int *)psmi_calloc(ptl->ep, UNDEFINED, num_disc,
						  sizeof(int));
			errs = (psm2_error_t *)
			    psmi_calloc(ptl->ep, UNDEFINED, num_disc,
					sizeof(psm2_error_t));
			epaddr_array = (psm2_epaddr_t *)
			    psmi_calloc(ptl->ep, UNDEFINED, num_disc,
					sizeof(psm2_epaddr_t));

			if (errs == NULL || epaddr_array == NULL || mask == NULL) {
				if (epaddr_array)
					psmi_free(epaddr_array);
				if (errs)
					psmi_free(errs);
				if (mask)
					psmi_free(mask);
				err = PSM2_NO_MEMORY;
				goto fail;
			}

			/* Collect the same set of peers.  Stop at num_disc so the
			 * arrays sized above can never be overrun. */
			psmi_epid_itor_init(&itor, ptl->ep);
			while (i < num_disc &&
			       (epaddr = psmi_epid_itor_next(&itor))) {
				if (epaddr->ptlctl->ptl != ptl_gen)
					continue;
				if (((am_epaddr_t *) epaddr)->cstate_outgoing ==
				    AMSH_CSTATE_OUTGOING_ESTABLISHED) {
					mask[i] = 1;
					epaddr_array[i] = epaddr;
					i++;
				}
			}
			psmi_epid_itor_fini(&itor);
			psmi_assert(i == num_disc);

			err = amsh_ep_disconnect(ptl_gen, force, num_disc,
						 epaddr_array, mask, errs,
						 timeout_ns);
			psmi_free(mask);
			psmi_free(errs);
			psmi_free(epaddr_array);
		}
	}

	/* Let in-flight connect/disconnect traffic drain until timeout. */
	if (ptl->connect_incoming > 0 || ptl->connect_outgoing > 0) {
		while (ptl->connect_incoming > 0 || ptl->connect_outgoing > 0) {
			if (!psmi_cycles_left(t_start, timeout_ns)) {
				err = PSM2_TIMEOUT;
				_HFI_VDBG("CCC timed out with from=%d,to=%d\n",
					  ptl->connect_incoming, ptl->connect_outgoing);
				break;
			}
			psmi_poll_internal(ptl->ep, 1);
		}
	} else
		_HFI_VDBG("CCC complete disconnect from=%d,to=%d\n",
			  ptl->connect_incoming, ptl->connect_outgoing);

	if ((err_seg = psmi_shm_detach(ptl_gen))) {
		err = err_seg;
		goto fail;
	}

	/* This prevents poll calls between now and the point where the endpoint is
	 * deallocated to reference memory that disappeared */
	ptl->repH.head = &ptl->amsh_empty_shortpkt;
	ptl->reqH.head = &ptl->amsh_empty_shortpkt;

	if (ptl->am_ep) {
		psmi_free(ptl->am_ep);
		/* Do not leave a dangling directory pointer in the PTL. */
		ptl->am_ep = NULL;
	}

#ifdef PSM_CUDA
	if (PSMI_IS_CUDA_ENABLED)
		am_cuda_memhandle_cache_map_fini();
#endif
	return PSM2_OK;
fail:
	return err;
}
Packit 961e70
Packit 961e70
static
Packit 961e70
psm2_error_t
Packit 961e70
amsh_setopt(const void *component_obj, int optname,
Packit 961e70
	    const void *optval, uint64_t optlen)
Packit 961e70
{
Packit 961e70
	/* No options for AM PTL at the moment */
Packit 961e70
	return psmi_handle_error(NULL, PSM2_PARAM_ERR,
Packit 961e70
				 "Unknown AM ptl option %u.", optname);
Packit 961e70
}
Packit 961e70
Packit 961e70
static
Packit 961e70
psm2_error_t
Packit 961e70
amsh_getopt(const void *component_obj, int optname,
Packit 961e70
	    void *optval, uint64_t *optlen)
Packit 961e70
{
Packit 961e70
	/* No options for AM PTL at the moment */
Packit 961e70
	return psmi_handle_error(NULL, PSM2_PARAM_ERR,
Packit 961e70
				 "Unknown AM ptl option %u.", optname);
Packit 961e70
}
Packit 961e70
Packit 961e70
/* Only symbol we expose out of here */
Packit 961e70
struct ptl_ctl_init
Packit 961e70
psmi_ptl_amsh = {
Packit 961e70
	amsh_sizeof, amsh_init, amsh_fini, amsh_setopt, amsh_getopt
Packit 961e70
};