/*
* Copyright (c) 2009-2014 Intel Corporation. All rights reserved.
* Copyright (c) 2013 Mellanox Technologies LTD. All rights reserved.
*
* This software is available to you under the OpenIB.org BSD license
* below:
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above
* copyright notice, this list of conditions and the following
* disclaimer.
*
* - Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials
* provided with the distribution.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AWV
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#include <config.h>
#include <endian.h>
#include <stdio.h>
#include <stdarg.h>
#include <string.h>
#include <osd.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <fcntl.h>
#include <dirent.h>
#include <infiniband/acm.h>
#include <infiniband/acm_prov.h>
#include <infiniband/umad.h>
#include <infiniband/verbs.h>
#include <infiniband/umad_sa.h>
#include <infiniband/umad_sa_mcm.h>
#include <ifaddrs.h>
#include <dlfcn.h>
#include <search.h>
#include <netdb.h>
#include <net/if.h>
#include <sys/ioctl.h>
#include <net/if_arp.h>
#include <netinet/in.h>
#include <linux/netlink.h>
#include <linux/rtnetlink.h>
#include <inttypes.h>
#include <ccan/list.h>
#include "acm_util.h"
#include "acm_mad.h"
#define IB_LID_MCAST_START 0xc000
#define MAX_EP_ADDR 4
#define MAX_EP_MC 2
enum acmp_state {
ACMP_INIT,
ACMP_QUERY_ADDR,
ACMP_ADDR_RESOLVED,
ACMP_QUERY_ROUTE,
ACMP_READY
};
enum acmp_addr_prot {
ACMP_ADDR_PROT_ACM
};
enum acmp_route_prot {
ACMP_ROUTE_PROT_ACM,
ACMP_ROUTE_PROT_SA
};
enum acmp_loopback_prot {
ACMP_LOOPBACK_PROT_NONE,
ACMP_LOOPBACK_PROT_LOCAL
};
enum acmp_route_preload {
ACMP_ROUTE_PRELOAD_NONE,
ACMP_ROUTE_PRELOAD_OSM_FULL_V1
};
enum acmp_addr_preload {
ACMP_ADDR_PRELOAD_NONE,
ACMP_ADDR_PRELOAD_HOSTS
};
/*
* Nested locking order: dest -> ep, dest -> port
*/
struct acmp_ep;
struct acmp_dest {
uint8_t address[ACM_MAX_ADDRESS]; /* keep first */
char name[ACM_MAX_ADDRESS];
struct ibv_ah *ah;
struct ibv_ah_attr av;
struct ibv_path_record path;
union ibv_gid mgid;
__be64 req_id;
struct list_head req_queue;
uint32_t remote_qpn;
pthread_mutex_t lock;
enum acmp_state state;
atomic_t refcnt;
uint64_t addr_timeout;
uint64_t route_timeout;
uint8_t addr_type;
struct acmp_ep *ep;
};
struct acmp_device;
struct acmp_port {
struct acmp_device *dev;
const struct acm_port *port;
struct list_head ep_list;
pthread_mutex_t lock;
struct acmp_dest sa_dest;
enum ibv_port_state state;
enum ibv_mtu mtu;
enum ibv_rate rate;
int subnet_timeout;
uint16_t default_pkey_ix;
uint16_t lid;
uint16_t lid_mask;
uint8_t port_num;
};
struct acmp_device {
struct ibv_context *verbs;
const struct acm_device *device;
struct ibv_comp_channel *channel;
struct ibv_pd *pd;
__be64 guid;
struct list_node entry;
pthread_t comp_thread_id;
int port_cnt;
struct acmp_port port[0];
};
/* Maintain separate virtual send queues to avoid deadlock */
struct acmp_send_queue {
int credits;
struct list_head pending;
};
struct acmp_addr {
uint16_t type;
union acm_ep_info info;
struct acm_address addr;
struct acmp_ep *ep;
};
struct acmp_addr_ctx {
struct acmp_ep *ep;
int addr_inx;
};
struct acmp_ep {
struct acmp_port *port;
struct ibv_cq *cq;
struct ibv_qp *qp;
struct ibv_mr *mr;
uint8_t *recv_bufs;
struct list_node entry;
char id_string[IBV_SYSFS_NAME_MAX + 11];
void *dest_map[ACM_ADDRESS_RESERVED - 1];
struct acmp_dest mc_dest[MAX_EP_MC];
int mc_cnt;
uint16_t pkey_index;
uint16_t pkey;
const struct acm_endpoint *endpoint;
pthread_mutex_t lock;
struct acmp_send_queue resolve_queue;
struct acmp_send_queue resp_queue;
struct list_head active_queue;
struct list_head wait_queue;
enum acmp_state state;
/* This lock protects nmbr_ep_addrs and addr_info */
pthread_rwlock_t rwlock;
int nmbr_ep_addrs;
struct acmp_addr *addr_info;
atomic_t counters[ACM_MAX_COUNTER];
};
struct acmp_send_msg {
struct list_node entry;
struct acmp_ep *ep;
struct acmp_dest *dest;
struct ibv_ah *ah;
void *context;
void (*resp_handler)(struct acmp_send_msg *req,
struct ibv_wc *wc, struct acm_mad *resp);
struct acmp_send_queue *req_queue;
struct ibv_mr *mr;
struct ibv_send_wr wr;
struct ibv_sge sge;
uint64_t expires;
int tries;
uint8_t data[ACM_SEND_SIZE];
};
struct acmp_request {
uint64_t id;
struct list_node entry;
struct acm_msg msg;
struct acmp_ep *ep;
};
static int acmp_open_dev(const struct acm_device *device, void **dev_context);
static void acmp_close_dev(void *dev_context);
static int acmp_open_port(const struct acm_port *port, void *dev_context,
void **port_context);
static void acmp_close_port(void *port_context);
static int acmp_open_endpoint(const struct acm_endpoint *endpoint,
void *port_context, void **ep_context);
static void acmp_close_endpoint(void *ep_context);
static int acmp_add_addr(const struct acm_address *addr, void *ep_context,
void **addr_context);
static void acmp_remove_addr(void *addr_context);
static int acmp_resolve(void *addr_context, struct acm_msg *msg, uint64_t id);
static int acmp_query(void *addr_context, struct acm_msg *msg, uint64_t id);
static int acmp_handle_event(void *port_context, enum ibv_event_type type);
static void acmp_query_perf(void *ep_context, uint64_t *values, uint8_t *cnt);
static struct acm_provider def_prov = {
.size = sizeof(struct acm_provider),
.version = ACM_PROV_VERSION,
.name = "ibacmp",
.open_device = acmp_open_dev,
.close_device = acmp_close_dev,
.open_port = acmp_open_port,
.close_port = acmp_close_port,
.open_endpoint = acmp_open_endpoint,
.close_endpoint = acmp_close_endpoint,
.add_address = acmp_add_addr,
.remove_address = acmp_remove_addr,
.resolve = acmp_resolve,
.query = acmp_query,
.handle_event = acmp_handle_event,
.query_perf = acmp_query_perf,
};
static LIST_HEAD(acmp_dev_list);
static pthread_mutex_t acmp_dev_lock;
static atomic_t g_tid;
static LIST_HEAD(timeout_list);
static event_t timeout_event;
static atomic_t wait_cnt;
static pthread_t retry_thread_id;
static int retry_thread_started = 0;
static __thread char log_data[ACM_MAX_ADDRESS];
/*
* Service options - may be set through ibacm_opts.cfg file.
*/
static char route_data_file[128] = ACM_CONF_DIR "/ibacm_route.data";
static char addr_data_file[128] = ACM_CONF_DIR "/ibacm_hosts.data";
static enum acmp_addr_prot addr_prot = ACMP_ADDR_PROT_ACM;
static int addr_timeout = 1440;
static enum acmp_route_prot route_prot = ACMP_ROUTE_PROT_SA;
static int route_timeout = -1;
static enum acmp_loopback_prot loopback_prot = ACMP_LOOPBACK_PROT_LOCAL;
static int timeout = 2000;
static int retries = 2;
static int resolve_depth = 1;
static int send_depth = 1;
static int recv_depth = 1024;
static uint8_t min_mtu = IBV_MTU_2048;
static uint8_t min_rate = IBV_RATE_10_GBPS;
static enum acmp_route_preload route_preload;
static enum acmp_addr_preload addr_preload;
static int acmp_initialized = 0;
static int acmp_compare_dest(const void *dest1, const void *dest2)
{
return memcmp(dest1, dest2, ACM_MAX_ADDRESS);
}
static void
acmp_set_dest_addr(struct acmp_dest *dest, uint8_t addr_type,
const uint8_t *addr, size_t size)
{
memcpy(dest->address, addr, size);
dest->addr_type = addr_type;
acm_format_name(0, dest->name, sizeof dest->name, addr_type, addr, size);
}
static void
acmp_init_dest(struct acmp_dest *dest, uint8_t addr_type,
const uint8_t *addr, size_t size)
{
list_head_init(&dest->req_queue);
atomic_init(&dest->refcnt);
atomic_set(&dest->refcnt, 1);
pthread_mutex_init(&dest->lock, NULL);
if (size)
acmp_set_dest_addr(dest, addr_type, addr, size);
dest->state = ACMP_INIT;
}
static struct acmp_dest *
acmp_alloc_dest(uint8_t addr_type, const uint8_t *addr)
{
struct acmp_dest *dest;
dest = calloc(1, sizeof *dest);
if (!dest) {
acm_log(0, "ERROR - unable to allocate dest\n");
return NULL;
}
acmp_init_dest(dest, addr_type, addr, ACM_MAX_ADDRESS);
acm_log(1, "%s\n", dest->name);
return dest;
}
/* Caller must hold ep lock. */
static struct acmp_dest *
acmp_get_dest(struct acmp_ep *ep, uint8_t addr_type, const uint8_t *addr)
{
struct acmp_dest *dest, **tdest;
tdest = tfind(addr, &ep->dest_map[addr_type - 1], acmp_compare_dest);
if (tdest) {
dest = *tdest;
(void) atomic_inc(&dest->refcnt);
acm_log(2, "%s\n", dest->name);
} else {
dest = NULL;
acm_format_name(2, log_data, sizeof log_data,
addr_type, addr, ACM_MAX_ADDRESS);
acm_log(2, "%s not found\n", log_data);
}
return dest;
}
static void
acmp_put_dest(struct acmp_dest *dest)
{
acm_log(2, "%s\n", dest->name);
if (atomic_dec(&dest->refcnt) == 0) {
free(dest);
}
}
/* Caller must hold ep lock. */
static void
acmp_remove_dest(struct acmp_ep *ep, struct acmp_dest *dest)
{
acm_log(2, "%s\n", dest->name);
if (!tdelete(dest->address, &ep->dest_map[dest->addr_type - 1],
acmp_compare_dest))
acm_log(0, "ERROR: %s not found!!\n", dest->name);
acmp_put_dest(dest);
}
static struct acmp_dest *
acmp_acquire_dest(struct acmp_ep *ep, uint8_t addr_type, const uint8_t *addr)
{
struct acmp_dest *dest;
int64_t rec_expr_minutes;
acm_format_name(2, log_data, sizeof log_data,
addr_type, addr, ACM_MAX_ADDRESS);
acm_log(2, "%s\n", log_data);
pthread_mutex_lock(&ep->lock);
dest = acmp_get_dest(ep, addr_type, addr);
if (dest && dest->state == ACMP_READY &&
dest->addr_timeout != (uint64_t)~0ULL) {
rec_expr_minutes = dest->addr_timeout - time_stamp_min();
if (rec_expr_minutes <= 0) {
acm_log(2, "Record expired\n");
acmp_remove_dest(ep, dest);
dest = NULL;
} else {
acm_log(2, "Record valid for the next %" PRId64 " minute(s)\n",
rec_expr_minutes);
}
}
if (!dest) {
dest = acmp_alloc_dest(addr_type, addr);
if (dest) {
dest->ep = ep;
tsearch(dest, &ep->dest_map[addr_type - 1], acmp_compare_dest);
(void) atomic_inc(&dest->refcnt);
}
}
pthread_mutex_unlock(&ep->lock);
return dest;
}
static struct acmp_request *acmp_alloc_req(uint64_t id, struct acm_msg *msg)
{
struct acmp_request *req;
req = calloc(1, sizeof *req);
if (!req) {
acm_log(0, "ERROR - unable to alloc client request\n");
return NULL;
}
req->id = id;
memcpy(&req->msg, msg, sizeof(req->msg));
acm_log(2, "id %" PRIu64 ", req %p\n", id, req);
return req;
}
static void acmp_free_req(struct acmp_request *req)
{
acm_log(2, "%p\n", req);
free(req);
}
static struct acmp_send_msg *
acmp_alloc_send(struct acmp_ep *ep, struct acmp_dest *dest, size_t size)
{
struct acmp_send_msg *msg;
msg = (struct acmp_send_msg *) calloc(1, sizeof *msg);
if (!msg) {
acm_log(0, "ERROR - unable to allocate send buffer\n");
return NULL;
}
msg->ep = ep;
msg->mr = ibv_reg_mr(ep->port->dev->pd, msg->data, size, 0);
if (!msg->mr) {
acm_log(0, "ERROR - failed to register send buffer\n");
goto err1;
}
if (!dest->ah) {
msg->ah = ibv_create_ah(ep->port->dev->pd, &dest->av);
if (!msg->ah) {
acm_log(0, "ERROR - unable to create ah\n");
goto err2;
}
msg->wr.wr.ud.ah = msg->ah;
} else {
msg->wr.wr.ud.ah = dest->ah;
}
acm_log(2, "get dest %s\n", dest->name);
(void) atomic_inc(&dest->refcnt);
msg->dest = dest;
msg->wr.next = NULL;
msg->wr.sg_list = &msg->sge;
msg->wr.num_sge = 1;
msg->wr.opcode = IBV_WR_SEND;
msg->wr.send_flags = IBV_SEND_SIGNALED;
msg->wr.wr_id = (uintptr_t) msg;
msg->wr.wr.ud.remote_qpn = dest->remote_qpn;
msg->wr.wr.ud.remote_qkey = ACM_QKEY;
msg->sge.length = size;
msg->sge.lkey = msg->mr->lkey;
msg->sge.addr = (uintptr_t) msg->data;
acm_log(2, "%p\n", msg);
return msg;
err2:
ibv_dereg_mr(msg->mr);
err1:
free(msg);
return NULL;
}
static void
acmp_init_send_req(struct acmp_send_msg *msg, void *context,
void (*resp_handler)(struct acmp_send_msg *req,
struct ibv_wc *wc, struct acm_mad *resp))
{
acm_log(2, "%p\n", msg);
msg->tries = retries + 1;
msg->context = context;
msg->resp_handler = resp_handler;
}
static void acmp_free_send(struct acmp_send_msg *msg)
{
acm_log(2, "%p\n", msg);
if (msg->ah)
ibv_destroy_ah(msg->ah);
ibv_dereg_mr(msg->mr);
acmp_put_dest(msg->dest);
free(msg);
}
static void acmp_post_send(struct acmp_send_queue *queue, struct acmp_send_msg *msg)
{
struct acmp_ep *ep = msg->ep;
struct ibv_send_wr *bad_wr;
msg->req_queue = queue;
pthread_mutex_lock(&ep->lock);
if (queue->credits) {
acm_log(2, "posting send to QP\n");
queue->credits--;
list_add_tail(&ep->active_queue, &msg->entry);
ibv_post_send(ep->qp, &msg->wr, &bad_wr);
} else {
acm_log(2, "no sends available, queuing message\n");
list_add_tail(&queue->pending, &msg->entry);
}
pthread_mutex_unlock(&ep->lock);
}
static void acmp_post_recv(struct acmp_ep *ep, uint64_t address)
{
struct ibv_recv_wr wr, *bad_wr;
struct ibv_sge sge;
wr.next = NULL;
wr.sg_list = &sge;
wr.num_sge = 1;
wr.wr_id = address;
sge.length = ACM_RECV_SIZE;
sge.lkey = ep->mr->lkey;
sge.addr = address;
ibv_post_recv(ep->qp, &wr, &bad_wr);
}
/* Caller must hold ep lock */
static void acmp_send_available(struct acmp_ep *ep, struct acmp_send_queue *queue)
{
struct acmp_send_msg *msg;
struct ibv_send_wr *bad_wr;
msg = list_pop(&queue->pending, struct acmp_send_msg, entry);
if (msg) {
acm_log(2, "posting queued send message\n");
list_add_tail(&ep->active_queue, &msg->entry);
ibv_post_send(ep->qp, &msg->wr, &bad_wr);
} else {
queue->credits++;
}
}
static void acmp_complete_send(struct acmp_send_msg *msg)
{
struct acmp_ep *ep = msg->ep;
pthread_mutex_lock(&ep->lock);
list_del(&msg->entry);
if (msg->tries) {
acm_log(2, "waiting for response\n");
msg->expires = time_stamp_ms() + ep->port->subnet_timeout + timeout;
list_add_tail(&ep->wait_queue, &msg->entry);
if (atomic_inc(&wait_cnt) == 1)
event_signal(&timeout_event);
} else {
acm_log(2, "freeing\n");
acmp_send_available(ep, msg->req_queue);
acmp_free_send(msg);
}
pthread_mutex_unlock(&ep->lock);
}
static struct acmp_send_msg *acmp_get_request(struct acmp_ep *ep, __be64 tid, int *free)
{
struct acmp_send_msg *msg, *next, *req = NULL;
struct acm_mad *mad;
acm_log(2, "\n");
pthread_mutex_lock(&ep->lock);
list_for_each_safe(&ep->wait_queue, msg, next, entry) {
mad = (struct acm_mad *) msg->data;
if (mad->tid == tid) {
acm_log(2, "match found in wait queue\n");
req = msg;
list_del(&msg->entry);
(void) atomic_dec(&wait_cnt);
acmp_send_available(ep, msg->req_queue);
*free = 1;
goto unlock;
}
}
list_for_each(&ep->active_queue, msg, entry) {
mad = (struct acm_mad *) msg->data;
if (mad->tid == tid && msg->tries) {
acm_log(2, "match found in active queue\n");
req = msg;
req->tries = 0;
*free = 0;
break;
}
}
unlock:
pthread_mutex_unlock(&ep->lock);
return req;
}
static int acmp_mc_index(struct acmp_ep *ep, union ibv_gid *gid)
{
int i;
for (i = 0; i < ep->mc_cnt; i++) {
if (!memcmp(&ep->mc_dest[i].address, gid, sizeof(*gid)))
return i;
}
return -1;
}
/* Multicast groups are ordered lowest to highest preference. */
static int acmp_best_mc_index(struct acmp_ep *ep, struct acm_resolve_rec *rec)
{
int i, index;
for (i = min_t(int, rec->gid_cnt, ACM_MAX_GID_COUNT) - 1; i >= 0; i--) {
index = acmp_mc_index(ep, &rec->gid[i]);
if (index >= 0) {
return index;
}
}
return -1;
}
static void
acmp_record_mc_av(struct acmp_port *port, struct ib_mc_member_rec *mc_rec,
struct acmp_dest *dest)
{
uint32_t sl_flow_hop;
sl_flow_hop = be32toh(mc_rec->sl_flow_hop);
dest->av.dlid = be16toh(mc_rec->mlid);
dest->av.sl = (uint8_t) (sl_flow_hop >> 28);
dest->av.src_path_bits = port->sa_dest.av.src_path_bits;
dest->av.static_rate = mc_rec->rate & 0x3F;
dest->av.port_num = port->port_num;
dest->av.is_global = 1;
dest->av.grh.dgid = mc_rec->mgid;
dest->av.grh.flow_label = (sl_flow_hop >> 8) & 0xFFFFF;
dest->av.grh.sgid_index = acm_gid_index((struct acm_port *) port->port,
&mc_rec->port_gid);
dest->av.grh.hop_limit = (uint8_t) sl_flow_hop;
dest->av.grh.traffic_class = mc_rec->tclass;
dest->path.dgid = mc_rec->mgid;
dest->path.sgid = mc_rec->port_gid;
dest->path.dlid = mc_rec->mlid;
dest->path.slid = htobe16(port->lid | port->sa_dest.av.src_path_bits);
dest->path.flowlabel_hoplimit = htobe32(sl_flow_hop & 0xFFFFFFF);
dest->path.tclass = mc_rec->tclass;
dest->path.reversible_numpath = IBV_PATH_RECORD_REVERSIBLE | 1;
dest->path.pkey = mc_rec->pkey;
dest->path.qosclass_sl = htobe16((uint16_t) (sl_flow_hop >> 28));
dest->path.mtu = mc_rec->mtu;
dest->path.rate = mc_rec->rate;
dest->path.packetlifetime = mc_rec->packet_lifetime;
}
/* Always send the GRH to transfer GID data to remote side */
static void
acmp_init_path_av(struct acmp_port *port, struct acmp_dest *dest)
{
uint32_t flow_hop;
dest->av.dlid = be16toh(dest->path.dlid);
dest->av.sl = be16toh(dest->path.qosclass_sl) & 0xF;
dest->av.src_path_bits = be16toh(dest->path.slid) & 0x7F;
dest->av.static_rate = dest->path.rate & 0x3F;
dest->av.port_num = port->port_num;
flow_hop = be32toh(dest->path.flowlabel_hoplimit);
dest->av.is_global = 1;
dest->av.grh.flow_label = (flow_hop >> 8) & 0xFFFFF;
pthread_mutex_lock(&port->lock);
if (port->port)
dest->av.grh.sgid_index = acm_gid_index(
(struct acm_port *) port->port, &dest->path.sgid);
else
dest->av.grh.sgid_index = 0;
pthread_mutex_unlock(&port->lock);
dest->av.grh.hop_limit = (uint8_t) flow_hop;
dest->av.grh.traffic_class = dest->path.tclass;
}
static void acmp_process_join_resp(struct acm_sa_mad *sa_mad)
{
struct acmp_dest *dest;
struct ib_mc_member_rec *mc_rec;
struct ib_sa_mad *mad;
int index, ret;
struct acmp_ep *ep = sa_mad->context;
mad = (struct ib_sa_mad *) &sa_mad->sa_mad;
acm_log(1, "response status: 0x%x, mad status: 0x%x\n",
sa_mad->umad.status, mad->status);
pthread_mutex_lock(&ep->lock);
if (sa_mad->umad.status) {
acm_log(0, "ERROR - send join failed 0x%x\n", sa_mad->umad.status);
goto out;
}
if (mad->status) {
acm_log(0, "ERROR - join response status 0x%x\n", mad->status);
goto out;
}
mc_rec = (struct ib_mc_member_rec *) mad->data;
index = acmp_mc_index(ep, &mc_rec->mgid);
if (index < 0) {
acm_log(0, "ERROR - MGID in join response not found\n");
goto out;
}
dest = &ep->mc_dest[index];
dest->remote_qpn = IB_MC_QPN;
dest->mgid = mc_rec->mgid;
acmp_record_mc_av(ep->port, mc_rec, dest);
if (index == 0) {
dest->ah = ibv_create_ah(ep->port->dev->pd, &dest->av);
if (!dest->ah) {
acm_log(0, "ERROR - unable to create ah\n");
goto out;
}
ret = ibv_attach_mcast(ep->qp, &dest->mgid, dest->av.dlid);
if (ret) {
acm_log(0, "ERROR - unable to attach QP to multicast group\n");
ibv_destroy_ah(dest->ah);
dest->ah = NULL;
goto out;
}
ep->state = ACMP_READY;
}
atomic_set(&dest->refcnt, 1);
dest->state = ACMP_READY;
acm_log(1, "join successful\n");
out:
acm_free_sa_mad(sa_mad);
pthread_mutex_unlock(&ep->lock);
}
static uint8_t
acmp_record_acm_route(struct acmp_ep *ep, struct acmp_dest *dest)
{
int i;
acm_log(2, "\n");
for (i = 0; i < MAX_EP_MC; i++) {
if (!memcmp(&dest->mgid, &ep->mc_dest[i].mgid, sizeof dest->mgid))
break;
}
if (i == MAX_EP_MC) {
acm_log(0, "ERROR - cannot match mgid\n");
return ACM_STATUS_EINVAL;
}
dest->path = ep->mc_dest[i].path;
dest->path.dgid = dest->av.grh.dgid;
dest->path.dlid = htobe16(dest->av.dlid);
dest->addr_timeout = time_stamp_min() + (unsigned) addr_timeout;
dest->route_timeout = time_stamp_min() + (unsigned) route_timeout;
dest->state = ACMP_READY;
return ACM_STATUS_SUCCESS;
}
static void acmp_init_path_query(struct ib_sa_mad *mad)
{
acm_log(2, "\n");
mad->base_version = 1;
mad->mgmt_class = IB_MGMT_CLASS_SA;
mad->class_version = 2;
mad->method = IB_METHOD_GET;
mad->tid = htobe64((uint64_t) atomic_inc(&g_tid));
mad->attr_id = IB_SA_ATTR_PATH_REC;
}
/* Caller must hold dest lock */
static uint8_t acmp_resolve_path_sa(struct acmp_ep *ep, struct acmp_dest *dest,
void (*handler)(struct acm_sa_mad *))
{
struct ib_sa_mad *mad;
uint8_t ret;
struct acm_sa_mad *sa_mad;
acm_log(2, "%s\n", dest->name);
sa_mad = acm_alloc_sa_mad(ep->endpoint, dest, handler);
if (!sa_mad) {
acm_log(0, "Error - failed to allocate sa_mad\n");
ret = ACM_STATUS_ENOMEM;
goto err;
}
mad = (struct ib_sa_mad *) &sa_mad->sa_mad;
acmp_init_path_query(mad);
memcpy(mad->data, &dest->path, sizeof(dest->path));
mad->comp_mask = acm_path_comp_mask(&dest->path);
acm_increment_counter(ACM_CNTR_ROUTE_QUERY);
atomic_inc(&ep->counters[ACM_CNTR_ROUTE_QUERY]);
dest->state = ACMP_QUERY_ROUTE;
if (acm_send_sa_mad(sa_mad)) {
acm_log(0, "Error - Failed to send sa mad\n");
ret = ACM_STATUS_ENODATA;
goto free_mad;
}
return ACM_STATUS_SUCCESS;
free_mad:
acm_free_sa_mad(sa_mad);
err:
dest->state = ACMP_INIT;
return ret;
}
static uint8_t
acmp_record_acm_addr(struct acmp_ep *ep, struct acmp_dest *dest, struct ibv_wc *wc,
struct acm_resolve_rec *rec)
{
int index;
acm_log(2, "%s\n", dest->name);
index = acmp_best_mc_index(ep, rec);
if (index < 0) {
acm_log(0, "ERROR - no shared multicast groups\n");
dest->state = ACMP_INIT;
return ACM_STATUS_ENODATA;
}
acm_log(2, "selecting MC group at index %d\n", index);
dest->av = ep->mc_dest[index].av;
dest->av.dlid = wc->slid;
dest->av.src_path_bits = wc->dlid_path_bits;
dest->av.grh.dgid = ((struct ibv_grh *) (uintptr_t) wc->wr_id)->sgid;
dest->mgid = ep->mc_dest[index].mgid;
dest->path.sgid = ep->mc_dest[index].path.sgid;
dest->path.dgid = dest->av.grh.dgid;
dest->path.tclass = ep->mc_dest[index].path.tclass;
dest->path.pkey = ep->mc_dest[index].path.pkey;
dest->remote_qpn = wc->src_qp;
dest->state = ACMP_ADDR_RESOLVED;
return ACM_STATUS_SUCCESS;
}
static void
acmp_record_path_addr(struct acmp_ep *ep, struct acmp_dest *dest,
struct ibv_path_record *path)
{
acm_log(2, "%s\n", dest->name);
dest->path.pkey = htobe16(ep->pkey);
dest->path.dgid = path->dgid;
if (path->slid) {
dest->path.slid = path->slid;
} else {
dest->path.slid = htobe16(ep->port->lid);
}
if (!ib_any_gid(&path->sgid)) {
dest->path.sgid = path->sgid;
} else {
dest->path.sgid = ep->mc_dest[0].path.sgid;
}
dest->path.dlid = path->dlid;
dest->state = ACMP_ADDR_RESOLVED;
}
static uint8_t acmp_validate_addr_req(struct acm_mad *mad)
{
struct acm_resolve_rec *rec;
if (mad->method != IB_METHOD_GET) {
acm_log(0, "ERROR - invalid method 0x%x\n", mad->method);
return ACM_STATUS_EINVAL;
}
rec = (struct acm_resolve_rec *) mad->data;
if (!rec->src_type || rec->src_type >= ACM_ADDRESS_RESERVED) {
acm_log(0, "ERROR - unknown src type 0x%x\n", rec->src_type);
return ACM_STATUS_EINVAL;
}
return ACM_STATUS_SUCCESS;
}
static void
acmp_send_addr_resp(struct acmp_ep *ep, struct acmp_dest *dest)
{
struct acm_resolve_rec *rec;
struct acmp_send_msg *msg;
struct acm_mad *mad;
acm_log(2, "%s\n", dest->name);
msg = acmp_alloc_send(ep, dest, sizeof (*mad));
if (!msg) {
acm_log(0, "ERROR - failed to allocate message\n");
return;
}
mad = (struct acm_mad *) msg->data;
rec = (struct acm_resolve_rec *) mad->data;
mad->base_version = 1;
mad->mgmt_class = ACM_MGMT_CLASS;
mad->class_version = 1;
mad->method = IB_METHOD_GET | IB_METHOD_RESP;
mad->status = ACM_STATUS_SUCCESS;
mad->control = ACM_CTRL_RESOLVE;
mad->tid = dest->req_id;
rec->gid_cnt = 1;
memcpy(rec->gid, dest->mgid.raw, sizeof(union ibv_gid));
acmp_post_send(&ep->resp_queue, msg);
}
static int
acmp_resolve_response(uint64_t id, struct acm_msg *req_msg,
struct acmp_dest *dest, uint8_t status)
{
struct acm_msg msg;
acm_log(2, "client %" PRIu64 ", status 0x%x\n", id, status);
memset(&msg, 0, sizeof msg);
if (dest) {
if (status == ACM_STATUS_ENODATA)
atomic_inc(&dest->ep->counters[ACM_CNTR_NODATA]);
else if (status)
atomic_inc(&dest->ep->counters[ACM_CNTR_ERROR]);
}
msg.hdr = req_msg->hdr;
msg.hdr.status = status;
msg.hdr.length = ACM_MSG_HDR_LENGTH;
memset(msg.hdr.data, 0, sizeof(msg.hdr.data));
if (status == ACM_STATUS_SUCCESS) {
msg.hdr.length += ACM_MSG_EP_LENGTH;
msg.resolve_data[0].flags = IBV_PATH_FLAG_GMP |
IBV_PATH_FLAG_PRIMARY | IBV_PATH_FLAG_BIDIRECTIONAL;
msg.resolve_data[0].type = ACM_EP_INFO_PATH;
msg.resolve_data[0].info.path = dest->path;
if (req_msg->hdr.src_out) {
msg.hdr.length += ACM_MSG_EP_LENGTH;
memcpy(&msg.resolve_data[1],
&req_msg->resolve_data[req_msg->hdr.src_index],
ACM_MSG_EP_LENGTH);
}
}
return acm_resolve_response(id, &msg);
}
static void
acmp_complete_queued_req(struct acmp_dest *dest, uint8_t status)
{
struct acmp_request *req;
acm_log(2, "status %d\n", status);
pthread_mutex_lock(&dest->lock);
while ((req = list_pop(&dest->req_queue, struct acmp_request, entry))) {
pthread_mutex_unlock(&dest->lock);
acm_log(2, "completing request, client %" PRIu64 "\n", req->id);
acmp_resolve_response(req->id, &req->msg, dest, status);
acmp_free_req(req);
pthread_mutex_lock(&dest->lock);
}
pthread_mutex_unlock(&dest->lock);
}
static void
acmp_dest_sa_resp(struct acm_sa_mad *mad)
{
struct acmp_dest *dest = (struct acmp_dest *) mad->context;
struct ib_sa_mad *sa_mad = (struct ib_sa_mad *) &mad->sa_mad;
uint8_t status;
if (!mad->umad.status) {
status = (uint8_t) (be16toh(sa_mad->status) >> 8);
} else {
status = ACM_STATUS_ETIMEDOUT;
}
acm_log(2, "%s status=0x%x\n", dest->name, status);
pthread_mutex_lock(&dest->lock);
if (dest->state != ACMP_QUERY_ROUTE) {
acm_log(1, "notice - discarding SA response\n");
pthread_mutex_unlock(&dest->lock);
goto out;
}
if (!status) {
memcpy(&dest->path, sa_mad->data, sizeof(dest->path));
acmp_init_path_av(dest->ep->port, dest);
dest->addr_timeout = time_stamp_min() + (unsigned) addr_timeout;
dest->route_timeout = time_stamp_min() + (unsigned) route_timeout;
acm_log(2, "timeout addr %" PRIu64 " route %" PRIu64 "\n",
dest->addr_timeout, dest->route_timeout);
dest->state = ACMP_READY;
} else {
dest->state = ACMP_INIT;
}
pthread_mutex_unlock(&dest->lock);
acmp_complete_queued_req(dest, status);
out:
acm_free_sa_mad(mad);
}
static void
acmp_resolve_sa_resp(struct acm_sa_mad *mad)
{
struct acmp_dest *dest = (struct acmp_dest *) mad->context;
int send_resp;
acm_log(2, "\n");
acmp_dest_sa_resp(mad);
pthread_mutex_lock(&dest->lock);
send_resp = (dest->state == ACMP_READY);
pthread_mutex_unlock(&dest->lock);
if (send_resp)
acmp_send_addr_resp(dest->ep, dest);
}
static struct acmp_addr *
acmp_addr_lookup(struct acmp_ep *ep, uint8_t *addr, uint16_t type)
{
struct acmp_addr *ret = NULL;
int i;
pthread_rwlock_rdlock(&ep->rwlock);
for (i = 0; i < ep->nmbr_ep_addrs; i++) {
if (ep->addr_info[i].type != type)
continue;
if ((type == ACM_ADDRESS_NAME &&
!strncasecmp((char *) ep->addr_info[i].info.name,
(char *) addr, ACM_MAX_ADDRESS)) ||
!memcmp(ep->addr_info[i].info.addr, addr,
ACM_MAX_ADDRESS)) {
ret = ep->addr_info + i;
break;
}
}
pthread_rwlock_unlock(&ep->rwlock);
return ret;
}
static void
acmp_process_addr_req(struct acmp_ep *ep, struct ibv_wc *wc, struct acm_mad *mad)
{
struct acm_resolve_rec *rec;
struct acmp_dest *dest;
uint8_t status;
struct acmp_addr *addr;
acm_log(2, "\n");
if ((status = acmp_validate_addr_req(mad))) {
acm_log(0, "ERROR - invalid request\n");
return;
}
rec = (struct acm_resolve_rec *) mad->data;
dest = acmp_acquire_dest(ep, rec->src_type, rec->src);
if (!dest) {
acm_log(0, "ERROR - unable to add source\n");
return;
}
addr = acmp_addr_lookup(ep, rec->dest, rec->dest_type);
if (addr)
dest->req_id = mad->tid;
pthread_mutex_lock(&dest->lock);
acm_log(2, "dest state %d\n", dest->state);
switch (dest->state) {
case ACMP_READY:
if (dest->remote_qpn == wc->src_qp)
break;
acm_log(2, "src service has new qp, resetting\n");
/* fall through */
case ACMP_INIT:
case ACMP_QUERY_ADDR:
status = acmp_record_acm_addr(ep, dest, wc, rec);
if (status)
break;
/* fall through */
case ACMP_ADDR_RESOLVED:
if (route_prot == ACMP_ROUTE_PROT_ACM) {
status = acmp_record_acm_route(ep, dest);
break;
}
if (addr || !list_empty(&dest->req_queue)) {
status = acmp_resolve_path_sa(ep, dest, acmp_resolve_sa_resp);
if (status)
break;
}
/* fall through */
default:
pthread_mutex_unlock(&dest->lock);
acmp_put_dest(dest);
return;
}
pthread_mutex_unlock(&dest->lock);
acmp_complete_queued_req(dest, status);
if (addr && !status) {
acmp_send_addr_resp(ep, dest);
}
acmp_put_dest(dest);
}
static void
acmp_process_addr_resp(struct acmp_send_msg *msg, struct ibv_wc *wc, struct acm_mad *mad)
{
struct acm_resolve_rec *resp_rec;
struct acmp_dest *dest = (struct acmp_dest *) msg->context;
uint8_t status;
if (mad) {
status = acm_class_status(mad->status);
resp_rec = (struct acm_resolve_rec *) mad->data;
} else {
status = ACM_STATUS_ETIMEDOUT;
resp_rec = NULL;
}
acm_log(2, "resp status 0x%x\n", status);
pthread_mutex_lock(&dest->lock);
if (dest->state != ACMP_QUERY_ADDR) {
pthread_mutex_unlock(&dest->lock);
goto put;
}
if (!status) {
status = acmp_record_acm_addr(msg->ep, dest, wc, resp_rec);
if (!status) {
if (route_prot == ACMP_ROUTE_PROT_ACM) {
status = acmp_record_acm_route(msg->ep, dest);
} else {
status = acmp_resolve_path_sa(msg->ep, dest, acmp_dest_sa_resp);
if (!status) {
pthread_mutex_unlock(&dest->lock);
goto put;
}
}
}
} else {
dest->state = ACMP_INIT;
}
pthread_mutex_unlock(&dest->lock);
acmp_complete_queued_req(dest, status);
put:
acmp_put_dest(dest);
}
static void acmp_process_acm_recv(struct acmp_ep *ep, struct ibv_wc *wc, struct acm_mad *mad)
{
struct acmp_send_msg *req;
struct acm_resolve_rec *rec;
int free;
acm_log(2, "\n");
if (mad->base_version != 1 || mad->class_version != 1) {
acm_log(0, "ERROR - invalid version %d %d\n",
mad->base_version, mad->class_version);
return;
}
if (mad->control != ACM_CTRL_RESOLVE) {
acm_log(0, "ERROR - invalid control 0x%x\n", mad->control);
return;
}
rec = (struct acm_resolve_rec *) mad->data;
acm_format_name(2, log_data, sizeof log_data,
rec->src_type, rec->src, sizeof rec->src);
acm_log(2, "src %s\n", log_data);
acm_format_name(2, log_data, sizeof log_data,
rec->dest_type, rec->dest, sizeof rec->dest);
acm_log(2, "dest %s\n", log_data);
if (mad->method & IB_METHOD_RESP) {
acm_log(2, "received response\n");
req = acmp_get_request(ep, mad->tid, &free);
if (!req) {
acm_log(1, "notice - response did not match active request\n");
return;
}
acm_log(2, "found matching request\n");
req->resp_handler(req, wc, mad);
if (free)
acmp_free_send(req);
} else {
acm_log(2, "unsolicited request\n");
acmp_process_addr_req(ep, wc, mad);
}
}
static void
acmp_sa_resp(struct acm_sa_mad *mad)
{
struct acmp_request *req = (struct acmp_request *) mad->context;
struct ib_sa_mad *sa_mad = (struct ib_sa_mad *) &mad->sa_mad;
req->msg.hdr.opcode |= ACM_OP_ACK;
if (!mad->umad.status) {
req->msg.hdr.status = (uint8_t) (be16toh(sa_mad->status) >> 8);
memcpy(&req->msg.resolve_data[0].info.path, sa_mad->data,
sizeof(struct ibv_path_record));
} else {
req->msg.hdr.status = ACM_STATUS_ETIMEDOUT;
}
acm_log(2, "status 0x%x\n", req->msg.hdr.status);
if (req->msg.hdr.status)
atomic_inc(&req->ep->counters[ACM_CNTR_ERROR]);
acm_query_response(req->id, &req->msg);
acm_free_sa_mad(mad);
acmp_free_req(req);
}
static void acmp_process_sa_recv(struct acmp_ep *ep, struct ibv_wc *wc, struct acm_mad *mad)
{
struct ib_sa_mad *sa_mad = (struct ib_sa_mad *) mad;
struct acmp_send_msg *req;
int free;
acm_log(2, "\n");
if (mad->base_version != 1 || mad->class_version != 2 ||
!(mad->method & IB_METHOD_RESP) || sa_mad->attr_id != IB_SA_ATTR_PATH_REC) {
acm_log(0, "ERROR - unexpected SA MAD %d %d\n",
mad->base_version, mad->class_version);
return;
}
req = acmp_get_request(ep, mad->tid, &free);
if (!req) {
acm_log(1, "notice - response did not match active request\n");
return;
}
acm_log(2, "found matching request\n");
req->resp_handler(req, wc, mad);
if (free)
acmp_free_send(req);
}
static void acmp_process_recv(struct acmp_ep *ep, struct ibv_wc *wc)
{
struct acm_mad *mad;
acm_log(2, "base endpoint name %s\n", ep->id_string);
mad = (struct acm_mad *) (uintptr_t) (wc->wr_id + sizeof(struct ibv_grh));
switch (mad->mgmt_class) {
case IB_MGMT_CLASS_SA:
acmp_process_sa_recv(ep, wc, mad);
break;
case ACM_MGMT_CLASS:
acmp_process_acm_recv(ep, wc, mad);
break;
default:
acm_log(0, "ERROR - invalid mgmt class 0x%x\n", mad->mgmt_class);
break;
}
acmp_post_recv(ep, wc->wr_id);
}
static void acmp_process_comp(struct acmp_ep *ep, struct ibv_wc *wc)
{
if (wc->status) {
acm_log(0, "ERROR - work completion error\n"
"\topcode %d, completion status %d\n",
wc->opcode, wc->status);
return;
}
if (wc->opcode & IBV_WC_RECV)
acmp_process_recv(ep, wc);
else
acmp_complete_send((struct acmp_send_msg *) (uintptr_t) wc->wr_id);
}
static void *acmp_comp_handler(void *context)
{
struct acmp_device *dev = (struct acmp_device *) context;
struct acmp_ep *ep;
struct ibv_cq *cq;
struct ibv_wc wc;
int cnt;
acm_log(1, "started\n");
if (pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL)) {
acm_log(0, "Error: failed to set cancel type for dev %s\n",
dev->verbs->device->name);
pthread_exit(NULL);
}
if (pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL)) {
acm_log(0, "Error: failed to set cancel state for dev %s\n",
dev->verbs->device->name);
pthread_exit(NULL);
}
while (1) {
pthread_testcancel();
ibv_get_cq_event(dev->channel, &cq, (void *) &ep);
cnt = 0;
while (ibv_poll_cq(cq, 1, &wc) > 0) {
cnt++;
acmp_process_comp(ep, &wc);
}
ibv_req_notify_cq(cq, 0);
while (ibv_poll_cq(cq, 1, &wc) > 0) {
cnt++;
acmp_process_comp(ep, &wc);
}
ibv_ack_cq_events(cq, cnt);
}
return NULL;
}
static void acmp_format_mgid(union ibv_gid *mgid, uint16_t pkey, uint8_t tos,
uint8_t rate, uint8_t mtu)
{
mgid->raw[0] = 0xFF;
mgid->raw[1] = 0x10 | 0x05;
mgid->raw[2] = 0x40;
mgid->raw[3] = 0x01;
mgid->raw[4] = (uint8_t) (pkey >> 8);
mgid->raw[5] = (uint8_t) pkey;
mgid->raw[6] = tos;
mgid->raw[7] = rate;
mgid->raw[8] = mtu;
mgid->raw[9] = 0;
mgid->raw[10] = 0;
mgid->raw[11] = 0;
mgid->raw[12] = 0;
mgid->raw[13] = 0;
mgid->raw[14] = 0;
mgid->raw[15] = 0;
}
static void acmp_init_join(struct ib_sa_mad *mad, union ibv_gid *port_gid,
uint16_t pkey, uint8_t tos, uint8_t tclass, uint8_t sl, uint8_t rate, uint8_t mtu)
{
struct ib_mc_member_rec *mc_rec;
acm_log(2, "\n");
mad->base_version = 1;
mad->mgmt_class = IB_MGMT_CLASS_SA;
mad->class_version = 2;
mad->method = IB_METHOD_SET;
mad->tid = htobe64((uint64_t) atomic_inc(&g_tid));
mad->attr_id = IB_SA_ATTR_MC_MEMBER_REC;
mad->comp_mask =
IB_COMP_MASK_MC_MGID | IB_COMP_MASK_MC_PORT_GID |
IB_COMP_MASK_MC_QKEY | IB_COMP_MASK_MC_MTU_SEL| IB_COMP_MASK_MC_MTU |
IB_COMP_MASK_MC_TCLASS | IB_COMP_MASK_MC_PKEY | IB_COMP_MASK_MC_RATE_SEL |
IB_COMP_MASK_MC_RATE | IB_COMP_MASK_MC_SL | IB_COMP_MASK_MC_FLOW |
IB_COMP_MASK_MC_SCOPE | IB_COMP_MASK_MC_JOIN_STATE;
mc_rec = (struct ib_mc_member_rec *) mad->data;
acmp_format_mgid(&mc_rec->mgid, pkey | IB_PKEY_FULL_MEMBER, tos, rate, mtu);
mc_rec->port_gid = *port_gid;
mc_rec->qkey = htobe32(ACM_QKEY);
mc_rec->mtu = umad_sa_set_rate_mtu_or_life(UMAD_SA_SELECTOR_EXACTLY, mtu);
mc_rec->tclass = tclass;
mc_rec->pkey = htobe16(pkey);
mc_rec->rate = umad_sa_set_rate_mtu_or_life(UMAD_SA_SELECTOR_EXACTLY, rate);
mc_rec->sl_flow_hop = umad_sa_mcm_set_sl_flow_hop(sl, 0, 0);
mc_rec->scope_state = umad_sa_mcm_set_scope_state(UMAD_SA_MCM_ADDR_SCOPE_SITE_LOCAL,
UMAD_SA_MCM_JOIN_STATE_FULL_MEMBER);
}
static void acmp_join_group(struct acmp_ep *ep, union ibv_gid *port_gid,
uint8_t tos, uint8_t tclass, uint8_t sl, uint8_t rate, uint8_t mtu)
{
struct ib_sa_mad *mad;
struct ib_mc_member_rec *mc_rec;
struct acm_sa_mad *sa_mad;
acm_log(2, "\n");
sa_mad = acm_alloc_sa_mad(ep->endpoint, ep, acmp_process_join_resp);
if (!sa_mad) {
acm_log(0, "Error - failed to allocate sa_mad\n");
return;
}
acm_log(0, "%s %d pkey 0x%x, sl 0x%x, rate 0x%x, mtu 0x%x\n",
ep->port->dev->verbs->device->name,
ep->port->port_num, ep->pkey, sl, rate, mtu);
mad = (struct ib_sa_mad *) &sa_mad->sa_mad;
acmp_init_join(mad, port_gid, ep->pkey, tos, tclass, sl, rate, mtu);
mc_rec = (struct ib_mc_member_rec *) mad->data;
acmp_set_dest_addr(&ep->mc_dest[ep->mc_cnt++], ACM_ADDRESS_GID,
mc_rec->mgid.raw, sizeof(mc_rec->mgid));
ep->mc_dest[ep->mc_cnt - 1].state = ACMP_INIT;
if (acm_send_sa_mad(sa_mad)) {
acm_log(0, "Error - Failed to send sa mad\n");
acm_free_sa_mad(sa_mad);
}
}
static void acmp_ep_join(struct acmp_ep *ep)
{
struct acmp_port *port;
union ibv_gid gid;
port = ep->port;
acm_log(1, "%s\n", ep->id_string);
if (ep->mc_dest[0].state == ACMP_READY && ep->mc_dest[0].ah) {
ibv_detach_mcast(ep->qp, &ep->mc_dest[0].mgid,
ep->mc_dest[0].av.dlid);
ibv_destroy_ah(ep->mc_dest[0].ah);
ep->mc_dest[0].ah = NULL;
}
ep->mc_cnt = 0;
ep->state = ACMP_INIT;
acm_get_gid((struct acm_port *)ep->port->port, 0, &gid);
acmp_join_group(ep, &gid, 0, 0, 0, min_rate, min_mtu);
if ((route_prot == ACMP_ROUTE_PROT_ACM) &&
(port->rate != min_rate || port->mtu != min_mtu))
acmp_join_group(ep, &gid, 0, 0, 0, port->rate, port->mtu);
acm_log(1, "join for %s complete\n", ep->id_string);
}
static int acmp_port_join(void *port_context)
{
struct acmp_ep *ep;
struct acmp_port *port = port_context;
acm_log(1, "device %s port %d\n", port->dev->verbs->device->name,
port->port_num);
list_for_each(&port->ep_list, ep, entry) {
if (!ep->endpoint) {
/* Stale endpoint */
continue;
}
acmp_ep_join(ep);
}
acm_log(1, "joins for device %s port %d complete\n",
port->dev->verbs->device->name, port->port_num);
return 0;
}
static int acmp_handle_event(void *port_context, enum ibv_event_type type)
{
int ret = 0;
acm_log(2, "event %s\n", ibv_event_type_str(type));
switch (type) {
case IBV_EVENT_CLIENT_REREGISTER:
ret = acmp_port_join(port_context);
break;
default:
break;
}
return ret;
}
static void acmp_process_timeouts(void)
{
struct acmp_send_msg *msg;
struct acm_resolve_rec *rec;
struct acm_mad *mad;
while ((msg = list_pop(&timeout_list, struct acmp_send_msg, entry))) {
mad = (struct acm_mad *) &msg->data[0];
rec = (struct acm_resolve_rec *) mad->data;
acm_format_name(0, log_data, sizeof log_data,
rec->dest_type, rec->dest, sizeof rec->dest);
acm_log(0, "notice - dest %s\n", log_data);
msg->resp_handler(msg, NULL, NULL);
acmp_free_send(msg);
}
}
static void acmp_process_wait_queue(struct acmp_ep *ep, uint64_t *next_expire)
{
struct acmp_send_msg *msg, *next;
struct ibv_send_wr *bad_wr;
list_for_each_safe(&ep->wait_queue, msg, next, entry) {
if (msg->expires <= time_stamp_ms()) {
list_del(&msg->entry);
(void) atomic_dec(&wait_cnt);
if (--msg->tries) {
acm_log(1, "notice - retrying request\n");
list_add_tail(&ep->active_queue, &msg->entry);
ibv_post_send(ep->qp, &msg->wr, &bad_wr);
} else {
acm_log(0, "notice - failing request\n");
acmp_send_available(ep, msg->req_queue);
list_add_tail(&timeout_list, &msg->entry);
}
} else {
*next_expire = min(*next_expire, msg->expires);
break;
}
}
}
/* While the device/port/ep will not be freed, we need to be careful of
* their addition while walking the link lists. Therefore, we need to acquire
* the appropriate locks.
*/
static void *acmp_retry_handler(void *context)
{
struct acmp_device *dev;
struct acmp_port *port;
struct acmp_ep *ep;
uint64_t next_expire;
int i, wait;
acm_log(0, "started\n");
if (pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL)) {
acm_log(0, "Error: failed to set cancel type \n");
pthread_exit(NULL);
}
if (pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL)) {
acm_log(0, "Error: failed to set cancel state\n");
pthread_exit(NULL);
}
retry_thread_started = 1;
while (1) {
while (!atomic_get(&wait_cnt)) {
pthread_testcancel();
event_wait(&timeout_event, -1);
}
next_expire = -1;
pthread_mutex_lock(&acmp_dev_lock);
list_for_each(&acmp_dev_list, dev, entry) {
pthread_mutex_unlock(&acmp_dev_lock);
for (i = 0; i < dev->port_cnt; i++) {
port = &dev->port[i];
pthread_mutex_lock(&port->lock);
list_for_each(&port->ep_list, ep, entry) {
pthread_mutex_unlock(&port->lock);
pthread_mutex_lock(&ep->lock);
if (!list_empty(&ep->wait_queue))
acmp_process_wait_queue(ep, &next_expire);
pthread_mutex_unlock(&ep->lock);
pthread_mutex_lock(&port->lock);
}
pthread_mutex_unlock(&port->lock);
}
pthread_mutex_lock(&acmp_dev_lock);
}
pthread_mutex_unlock(&acmp_dev_lock);
acmp_process_timeouts();
if (next_expire != -1) {
wait = (int) (next_expire - time_stamp_ms());
if (wait > 0 && atomic_get(&wait_cnt)) {
pthread_testcancel();
event_wait(&timeout_event, wait);
}
}
}
retry_thread_started = 0;
return NULL;
}
/* rwlock must be held read-locked */
static int
__acmp_query(struct acmp_ep *ep, struct acm_msg *msg, uint64_t id)
{
struct acmp_request *req;
struct ib_sa_mad *mad;
uint8_t status;
struct acm_sa_mad *sa_mad;
if (ep->state != ACMP_READY) {
status = ACM_STATUS_ENODATA;
goto resp;
}
req = acmp_alloc_req(id, msg);
if (!req) {
status = ACM_STATUS_ENOMEM;
goto resp;
}
req->ep = ep;
sa_mad = acm_alloc_sa_mad(ep->endpoint, req, acmp_sa_resp);
if (!sa_mad) {
acm_log(0, "Error - failed to allocate sa_mad\n");
status = ACM_STATUS_ENOMEM;
goto free_req;
}
mad = (struct ib_sa_mad *) &sa_mad->sa_mad;
acmp_init_path_query(mad);
memcpy(mad->data, &msg->resolve_data[0].info.path,
sizeof(struct ibv_path_record));
mad->comp_mask = acm_path_comp_mask(&msg->resolve_data[0].info.path);
acm_increment_counter(ACM_CNTR_ROUTE_QUERY);
atomic_inc(&ep->counters[ACM_CNTR_ROUTE_QUERY]);
if (acm_send_sa_mad(sa_mad)) {
acm_log(0, "Error - Failed to send sa mad\n");
status = ACM_STATUS_ENODATA;
goto free_mad;
}
return ACM_STATUS_SUCCESS;
free_mad:
acm_free_sa_mad(sa_mad);
free_req:
acmp_free_req(req);
resp:
msg->hdr.opcode |= ACM_OP_ACK;
msg->hdr.status = status;
if (status == ACM_STATUS_ENODATA)
atomic_inc(&ep->counters[ACM_CNTR_NODATA]);
else
atomic_inc(&ep->counters[ACM_CNTR_ERROR]);
return acm_query_response(id, msg);
}
static int
acmp_query(void *addr_context, struct acm_msg *msg, uint64_t id)
{
struct acmp_addr_ctx *addr_ctx = addr_context;
struct acmp_addr *address;
int ret;
pthread_rwlock_rdlock(&addr_ctx->ep->rwlock);
address = addr_ctx->ep->addr_info + addr_ctx->addr_inx;
ret = __acmp_query(address->ep, msg, id);
pthread_rwlock_unlock(&addr_ctx->ep->rwlock);
return ret;
}
static uint8_t
acmp_send_resolve(struct acmp_ep *ep, struct acmp_dest *dest,
struct acm_ep_addr_data *saddr)
{
struct acmp_send_msg *msg;
struct acm_mad *mad;
struct acm_resolve_rec *rec;
int i;
acm_log(2, "\n");
msg = acmp_alloc_send(ep, &ep->mc_dest[0], sizeof(*mad));
if (!msg) {
acm_log(0, "ERROR - cannot allocate send msg\n");
return ACM_STATUS_ENOMEM;
}
acmp_init_send_req(msg, (void *) dest, acmp_process_addr_resp);
(void) atomic_inc(&dest->refcnt);
mad = (struct acm_mad *) msg->data;
mad->base_version = 1;
mad->mgmt_class = ACM_MGMT_CLASS;
mad->class_version = 1;
mad->method = IB_METHOD_GET;
mad->control = ACM_CTRL_RESOLVE;
mad->tid = htobe64((uint64_t) atomic_inc(&g_tid));
rec = (struct acm_resolve_rec *) mad->data;
rec->src_type = (uint8_t) saddr->type;
rec->src_length = ACM_MAX_ADDRESS;
memcpy(rec->src, saddr->info.addr, ACM_MAX_ADDRESS);
rec->dest_type = dest->addr_type;
rec->dest_length = ACM_MAX_ADDRESS;
memcpy(rec->dest, dest->address, ACM_MAX_ADDRESS);
rec->gid_cnt = (uint8_t) ep->mc_cnt;
for (i = 0; i < ep->mc_cnt; i++)
memcpy(&rec->gid[i], ep->mc_dest[i].address, 16);
acm_increment_counter(ACM_CNTR_ADDR_QUERY);
atomic_inc(&ep->counters[ACM_CNTR_ADDR_QUERY]);
acmp_post_send(&ep->resolve_queue, msg);
return 0;
}
/* Caller must hold dest lock */
static uint8_t acmp_queue_req(struct acmp_dest *dest, uint64_t id, struct acm_msg *msg)
{
struct acmp_request *req;
acm_log(2, "id %" PRIu64 "\n", id);
req = acmp_alloc_req(id, msg);
if (!req) {
return ACM_STATUS_ENOMEM;
}
req->ep = dest->ep;
list_add_tail(&dest->req_queue, &req->entry);
return ACM_STATUS_SUCCESS;
}
static int acmp_dest_timeout(struct acmp_dest *dest)
{
uint64_t timestamp = time_stamp_min();
if (timestamp > dest->addr_timeout) {
acm_log(2, "%s address timed out\n", dest->name);
dest->state = ACMP_INIT;
return 1;
} else if (timestamp > dest->route_timeout) {
acm_log(2, "%s route timed out\n", dest->name);
dest->state = ACMP_ADDR_RESOLVED;
return 1;
}
return 0;
}
static int
acmp_check_addr_match(struct ifaddrs *iap, struct acm_ep_addr_data *saddr,
unsigned int d_family)
{
char sip[INET6_ADDRSTRLEN] = {0};
char dip[INET6_ADDRSTRLEN] = {0};
const char *tmp;
size_t sock_size;
unsigned int s_family;
int ret;
s_family = iap->ifa_addr->sa_family;
if (!(iap->ifa_flags & IFF_UP) ||
(s_family != d_family))
return -1;
sock_size = (s_family == AF_INET) ? sizeof(struct sockaddr_in) :
sizeof(struct sockaddr_in6);
ret = getnameinfo(iap->ifa_addr, sock_size,
sip, sizeof(sip),
NULL, 0, NI_NUMERICHOST);
if (ret)
return ret;
tmp = inet_ntop(d_family, (void *)saddr->info.addr, dip,
sizeof(dip));
if (!tmp)
return -1;
ret = memcmp(sip, dip, strlen(dip));
return ret;
}
static void
acmp_acquire_sgid(struct acm_ep_addr_data *saddr,
struct acmp_dest *dest)
{
struct ifaddrs *addrs, *iap;
unsigned int d_family;
int ret;
if (!ib_any_gid(&dest->path.sgid))
return;
if (dest->addr_type != ACM_ADDRESS_IP6 &&
dest->addr_type != ACM_ADDRESS_IP)
return;
if (getifaddrs(&addrs))
return;
d_family = (dest->addr_type == ACM_ADDRESS_IP) ? AF_INET : AF_INET6;
for (iap = addrs; iap != NULL; iap = iap->ifa_next) {
ret = acmp_check_addr_match(iap, saddr, d_family);
if (!ret) {
ret = acm_if_get_sgid(iap->ifa_name,
&dest->path.sgid);
if (!ret)
break;
}
}
freeifaddrs(addrs);
}
static int
acmp_resolve_dest(struct acmp_ep *ep, struct acm_msg *msg, uint64_t id)
{
struct acmp_dest *dest;
struct acm_ep_addr_data *saddr, *daddr;
uint8_t status;
int ret;
saddr = &msg->resolve_data[msg->hdr.src_index];
daddr = &msg->resolve_data[msg->hdr.dst_index];
acm_format_name(2, log_data, sizeof log_data,
daddr->type, daddr->info.addr, sizeof daddr->info.addr);
acm_log(2, "dest %s\n", log_data);
dest = acmp_acquire_dest(ep, daddr->type, daddr->info.addr);
if (!dest) {
acm_log(0, "ERROR - unable to allocate destination in request\n");
atomic_inc(&ep->counters[ACM_CNTR_ERROR]);
return acmp_resolve_response(id, msg, NULL, ACM_STATUS_ENOMEM);
}
pthread_mutex_lock(&dest->lock);
test:
switch (dest->state) {
case ACMP_READY:
if (acmp_dest_timeout(dest))
goto test;
acm_log(2, "request satisfied from local cache\n");
acm_increment_counter(ACM_CNTR_ROUTE_CACHE);
atomic_inc(&ep->counters[ACM_CNTR_ROUTE_CACHE]);
status = ACM_STATUS_SUCCESS;
break;
case ACMP_ADDR_RESOLVED:
acm_log(2, "have address, resolving route\n");
acm_increment_counter(ACM_CNTR_ADDR_CACHE);
atomic_inc(&ep->counters[ACM_CNTR_ADDR_CACHE]);
acmp_acquire_sgid(saddr, dest);
status = acmp_resolve_path_sa(ep, dest, acmp_dest_sa_resp);
if (status) {
break;
}
goto queue;
case ACMP_INIT:
acm_log(2, "sending resolve msg to dest\n");
status = acmp_send_resolve(ep, dest, saddr);
if (status) {
break;
}
dest->state = ACMP_QUERY_ADDR;
/* fall through */
default:
queue:
if (daddr->flags & ACM_FLAGS_NODELAY) {
acm_log(2, "lookup initiated, but client wants no delay\n");
status = ACM_STATUS_ENODATA;
break;
}
status = acmp_queue_req(dest, id, msg);
if (status) {
break;
}
ret = 0;
pthread_mutex_unlock(&dest->lock);
goto put;
}
pthread_mutex_unlock(&dest->lock);
ret = acmp_resolve_response(id, msg, dest, status);
put:
acmp_put_dest(dest);
return ret;
}
static int
acmp_resolve_path(struct acmp_ep *ep, struct acm_msg *msg, uint64_t id)
{
struct acmp_dest *dest;
struct ibv_path_record *path;
uint8_t *addr;
uint8_t status;
int ret;
path = &msg->resolve_data[0].info.path;
addr = msg->resolve_data[1].info.addr;
memset(addr, 0, ACM_MAX_ADDRESS);
if (path->dlid) {
* ((__be16 *) addr) = path->dlid;
dest = acmp_acquire_dest(ep, ACM_ADDRESS_LID, addr);
} else {
memcpy(addr, &path->dgid, sizeof path->dgid);
dest = acmp_acquire_dest(ep, ACM_ADDRESS_GID, addr);
}
if (!dest) {
acm_log(0, "ERROR - unable to allocate destination in request\n");
atomic_inc(&ep->counters[ACM_CNTR_ERROR]);
return acmp_resolve_response(id, msg, NULL, ACM_STATUS_ENOMEM);
}
pthread_mutex_lock(&dest->lock);
test:
switch (dest->state) {
case ACMP_READY:
if (acmp_dest_timeout(dest))
goto test;
acm_log(2, "request satisfied from local cache\n");
acm_increment_counter(ACM_CNTR_ROUTE_CACHE);
atomic_inc(&ep->counters[ACM_CNTR_ROUTE_CACHE]);
status = ACM_STATUS_SUCCESS;
break;
case ACMP_INIT:
acm_log(2, "have path, bypassing address resolution\n");
acmp_record_path_addr(ep, dest, path);
/* fall through */
case ACMP_ADDR_RESOLVED:
acm_log(2, "have address, resolving route\n");
status = acmp_resolve_path_sa(ep, dest, acmp_dest_sa_resp);
if (status) {
break;
}
/* fall through */
default:
if (msg->resolve_data[0].flags & ACM_FLAGS_NODELAY) {
acm_log(2, "lookup initiated, but client wants no delay\n");
status = ACM_STATUS_ENODATA;
break;
}
status = acmp_queue_req(dest, id, msg);
if (status) {
break;
}
ret = 0;
pthread_mutex_unlock(&dest->lock);
goto put;
}
pthread_mutex_unlock(&dest->lock);
ret = acmp_resolve_response(id, msg, dest, status);
put:
acmp_put_dest(dest);
return ret;
}
static int
acmp_resolve(void *addr_context, struct acm_msg *msg, uint64_t id)
{
struct acmp_addr_ctx *addr_ctx = addr_context;
struct acmp_addr *address = addr_ctx->ep->addr_info + addr_ctx->addr_inx;
struct acmp_ep *ep = address->ep;
if (ep->state != ACMP_READY) {
atomic_inc(&ep->counters[ACM_CNTR_NODATA]);
return acmp_resolve_response(id, msg, NULL, ACM_STATUS_ENODATA);
}
atomic_inc(&ep->counters[ACM_CNTR_RESOLVE]);
if (msg->resolve_data[0].type == ACM_EP_INFO_PATH)
return acmp_resolve_path(ep, msg, id);
else
return acmp_resolve_dest(ep, msg, id);
}
static void acmp_query_perf(void *ep_context, uint64_t *values, uint8_t *cnt)
{
struct acmp_ep *ep = ep_context;
int i;
for (i = 0; i < ACM_MAX_COUNTER; i++)
values[i] = htobe64((uint64_t) atomic_get(&ep->counters[i]));
*cnt = ACM_MAX_COUNTER;
}
static enum acmp_addr_prot acmp_convert_addr_prot(char *param)
{
if (!strcasecmp("acm", param))
return ACMP_ADDR_PROT_ACM;
return addr_prot;
}
static enum acmp_route_prot acmp_convert_route_prot(char *param)
{
if (!strcasecmp("acm", param))
return ACMP_ROUTE_PROT_ACM;
else if (!strcasecmp("sa", param))
return ACMP_ROUTE_PROT_SA;
return route_prot;
}
static enum acmp_loopback_prot acmp_convert_loopback_prot(char *param)
{
if (!strcasecmp("none", param))
return ACMP_LOOPBACK_PROT_NONE;
else if (!strcasecmp("local", param))
return ACMP_LOOPBACK_PROT_LOCAL;
return loopback_prot;
}
static enum acmp_route_preload acmp_convert_route_preload(char *param)
{
if (!strcasecmp("none", param) || !strcasecmp("no", param))
return ACMP_ROUTE_PRELOAD_NONE;
else if (!strcasecmp("opensm_full_v1", param))
return ACMP_ROUTE_PRELOAD_OSM_FULL_V1;
return route_preload;
}
static enum acmp_addr_preload acmp_convert_addr_preload(char *param)
{
if (!strcasecmp("none", param) || !strcasecmp("no", param))
return ACMP_ADDR_PRELOAD_NONE;
else if (!strcasecmp("acm_hosts", param))
return ACMP_ADDR_PRELOAD_HOSTS;
return addr_preload;
}
static int acmp_post_recvs(struct acmp_ep *ep)
{
int i, size;
size = recv_depth * ACM_RECV_SIZE;
ep->recv_bufs = malloc(size);
if (!ep->recv_bufs) {
acm_log(0, "ERROR - unable to allocate receive buffer\n");
return ACM_STATUS_ENOMEM;
}
ep->mr = ibv_reg_mr(ep->port->dev->pd, ep->recv_bufs, size,
IBV_ACCESS_LOCAL_WRITE);
if (!ep->mr) {
acm_log(0, "ERROR - unable to register receive buffer\n");
goto err;
}
for (i = 0; i < recv_depth; i++) {
acmp_post_recv(ep, (uintptr_t) (ep->recv_bufs + ACM_RECV_SIZE * i));
}
return 0;
err:
free(ep->recv_bufs);
return -1;
}
/* Parse "opensm full v1" file to build LID to GUID table */
static void acmp_parse_osm_fullv1_lid2guid(FILE *f, __be64 *lid2guid)
{
char s[128];
char *p, *ptr, *p_guid, *p_lid;
uint64_t guid;
uint16_t lid;
while (fgets(s, sizeof s, f)) {
if (s[0] == '#')
continue;
if (!(p = strtok_r(s, " \n", &ptr)))
continue; /* ignore blank lines */
if (strncmp(p, "Switch", sizeof("Switch") - 1) &&
strncmp(p, "Channel", sizeof("Channel") - 1) &&
strncmp(p, "Router", sizeof("Router") - 1))
continue;
if (!strncmp(p, "Channel", sizeof("Channel") - 1)) {
p = strtok_r(NULL, " ", &ptr); /* skip 'Adapter' */
if (!p)
continue;
}
p_guid = strtok_r(NULL, ",", &ptr);
if (!p_guid)
continue;
guid = (uint64_t) strtoull(p_guid, NULL, 16);
ptr = strstr(ptr, "base LID");
if (!ptr)
continue;
ptr += sizeof("base LID");
p_lid = strtok_r(NULL, ",", &ptr);
if (!p_lid)
continue;
lid = (uint16_t) strtoul(p_lid, NULL, 0);
if (lid >= IB_LID_MCAST_START)
continue;
if (lid2guid[lid])
acm_log(0, "ERROR - duplicate lid %u\n", lid);
else
lid2guid[lid] = htobe64(guid);
}
}
/* Parse 'opensm full v1' file to populate PR cache */
static int acmp_parse_osm_fullv1_paths(FILE *f, __be64 *lid2guid, struct acmp_ep *ep)
{
union ibv_gid sgid, dgid;
struct ibv_port_attr attr = {};
struct acmp_dest *dest;
char s[128];
char *p, *ptr, *p_guid, *p_lid;
uint64_t guid;
uint16_t lid, dlid;
__be16 net_dlid;
int sl, mtu, rate;
int ret = 1, i;
uint8_t addr[ACM_MAX_ADDRESS];
uint8_t addr_type;
acm_get_gid((struct acm_port *)ep->port->port, 0, &sgid);
/* Search for endpoint's SLID */
while (fgets(s, sizeof s, f)) {
if (s[0] == '#')
continue;
if (!(p = strtok_r(s, " \n", &ptr)))
continue; /* ignore blank lines */
if (strncmp(p, "Switch", sizeof("Switch") - 1) &&
strncmp(p, "Channel", sizeof("Channel") - 1) &&
strncmp(p, "Router", sizeof("Router") - 1))
continue;
if (!strncmp(p, "Channel", sizeof("Channel") - 1)) {
p = strtok_r(NULL, " ", &ptr); /* skip 'Adapter' */
if (!p)
continue;
}
p_guid = strtok_r(NULL, ",", &ptr);
if (!p_guid)
continue;
guid = (uint64_t) strtoull(p_guid, NULL, 16);
if (guid != be64toh(sgid.global.interface_id))
continue;
ptr = strstr(ptr, "base LID");
if (!ptr)
continue;
ptr += sizeof("base LID");
p_lid = strtok_r(NULL, ",", &ptr);
if (!p_lid)
continue;
lid = (uint16_t) strtoul(p_lid, NULL, 0);
if (lid != ep->port->lid)
continue;
ibv_query_port(ep->port->dev->verbs, ep->port->port_num, &attr);
ret = 0;
break;
}
while (fgets(s, sizeof s, f)) {
if (s[0] == '#')
continue;
if (!(p = strtok_r(s, " \n", &ptr)))
continue; /* ignore blank lines */
if (!strncmp(p, "Switch", sizeof("Switch") - 1) ||
!strncmp(p, "Channel", sizeof("Channel") - 1) ||
!strncmp(p, "Router", sizeof("Router") - 1))
break;
dlid = strtoul(p, NULL, 0);
net_dlid = htobe16(dlid);
p = strtok_r(NULL, ":", &ptr);
if (!p)
continue;
if (strcmp(p, "UNREACHABLE") == 0)
continue;
sl = atoi(p);
p = strtok_r(NULL, ":", &ptr);
if (!p)
continue;
mtu = atoi(p);
p = strtok_r(NULL, ":", &ptr);
if (!p)
continue;
rate = atoi(p);
if (!lid2guid[dlid]) {
acm_log(0, "ERROR - dlid %u not found in lid2guid table\n", dlid);
continue;
}
dgid.global.subnet_prefix = sgid.global.subnet_prefix;
dgid.global.interface_id = lid2guid[dlid];
for (i = 0; i < 2; i++) {
memset(addr, 0, ACM_MAX_ADDRESS);
if (i == 0) {
addr_type = ACM_ADDRESS_LID;
memcpy(addr, &net_dlid, sizeof net_dlid);
} else {
addr_type = ACM_ADDRESS_GID;
memcpy(addr, &dgid, sizeof(dgid));
}
dest = acmp_acquire_dest(ep, addr_type, addr);
if (!dest) {
acm_log(0, "ERROR - unable to create dest\n");
break;
}
dest->path.sgid = sgid;
dest->path.slid = htobe16(ep->port->lid);
dest->path.dgid = dgid;
dest->path.dlid = net_dlid;
dest->path.reversible_numpath = IBV_PATH_RECORD_REVERSIBLE;
dest->path.pkey = htobe16(ep->pkey);
dest->path.mtu = (uint8_t) mtu;
dest->path.rate = (uint8_t) rate;
dest->path.qosclass_sl = htobe16((uint16_t) sl & 0xF);
if (dlid == ep->port->lid) {
dest->path.packetlifetime = 0;
dest->addr_timeout = (uint64_t)~0ULL;
dest->route_timeout = (uint64_t)~0ULL;
} else {
dest->path.packetlifetime = attr.subnet_timeout;
dest->addr_timeout = time_stamp_min() + (unsigned) addr_timeout;
dest->route_timeout = time_stamp_min() + (unsigned) route_timeout;
}
dest->remote_qpn = 1;
dest->state = ACMP_READY;
acmp_put_dest(dest);
acm_log(1, "added cached dest %s\n", dest->name);
}
}
return ret;
}
static int acmp_parse_osm_fullv1(struct acmp_ep *ep)
{
FILE *f;
__be64 *lid2guid;
int ret = 1;
if (!(f = fopen(route_data_file, "r"))) {
acm_log(0, "ERROR - couldn't open %s\n", route_data_file);
return ret;
}
lid2guid = calloc(IB_LID_MCAST_START, sizeof(*lid2guid));
if (!lid2guid) {
acm_log(0, "ERROR - no memory for path record parsing\n");
goto err;
}
acmp_parse_osm_fullv1_lid2guid(f, lid2guid);
rewind(f);
ret = acmp_parse_osm_fullv1_paths(f, lid2guid, ep);
free(lid2guid);
err:
fclose(f);
return ret;
}
static void acmp_parse_hosts_file(struct acmp_ep *ep)
{
FILE *f;
char s[120];
char addr[INET6_ADDRSTRLEN], gid[INET6_ADDRSTRLEN];
uint8_t name[ACM_MAX_ADDRESS];
struct in6_addr ip_addr, ib_addr;
struct acmp_dest *dest, *gid_dest;
uint8_t addr_type;
if (!(f = fopen(addr_data_file, "r"))) {
acm_log(0, "ERROR - couldn't open %s\n", addr_data_file);
return;
}
while (fgets(s, sizeof s, f)) {
if (s[0] == '#')
continue;
if (sscanf(s, "%46s%46s", addr, gid) != 2)
continue;
acm_log(2, "%s", s);
if (inet_pton(AF_INET6, gid, &ib_addr) <= 0) {
acm_log(0, "ERROR - %s is not IB GID\n", gid);
continue;
}
memset(name, 0, ACM_MAX_ADDRESS);
if (inet_pton(AF_INET, addr, &ip_addr) > 0) {
addr_type = ACM_ADDRESS_IP;
memcpy(name, &ip_addr, 4);
} else if (inet_pton(AF_INET6, addr, &ip_addr) > 0) {
addr_type = ACM_ADDRESS_IP6;
memcpy(name, &ip_addr, sizeof(ip_addr));
} else {
addr_type = ACM_ADDRESS_NAME;
strncpy((char *)name, addr, ACM_MAX_ADDRESS);
}
dest = acmp_acquire_dest(ep, addr_type, name);
if (!dest) {
acm_log(0, "ERROR - unable to create dest %s\n", addr);
continue;
}
memset(name, 0, ACM_MAX_ADDRESS);
memcpy(name, &ib_addr, sizeof(ib_addr));
gid_dest = acmp_get_dest(ep, ACM_ADDRESS_GID, name);
if (gid_dest) {
dest->path = gid_dest->path;
dest->state = ACMP_READY;
acmp_put_dest(gid_dest);
} else {
memcpy(&dest->path.dgid, &ib_addr, 16);
//ibv_query_gid(ep->port->dev->verbs, ep->port->port_num,
// 0, &dest->path.sgid);
dest->path.slid = htobe16(ep->port->lid);
dest->path.reversible_numpath = IBV_PATH_RECORD_REVERSIBLE;
dest->path.pkey = htobe16(ep->pkey);
dest->state = ACMP_ADDR_RESOLVED;
}
dest->remote_qpn = 1;
dest->addr_timeout = time_stamp_min() + (unsigned) addr_timeout;
dest->route_timeout = time_stamp_min() + (unsigned) route_timeout;
acmp_put_dest(dest);
acm_log(1, "added host %s address type %d IB GID %s\n",
addr, addr_type, gid);
}
fclose(f);
}
/*
* We currently require that the routing data be preloaded in order to
* load the address data. This is backwards from normal operation, which
* usually resolves the address before the route.
*/
static void acmp_ep_preload(struct acmp_ep *ep)
{
switch (route_preload) {
case ACMP_ROUTE_PRELOAD_OSM_FULL_V1:
if (acmp_parse_osm_fullv1(ep))
acm_log(0, "ERROR - failed to preload EP\n");
break;
default:
break;
}
switch (addr_preload) {
case ACMP_ADDR_PRELOAD_HOSTS:
acmp_parse_hosts_file(ep);
break;
default:
break;
}
}
/* rwlock must be held write-locked */
static int __acmp_add_addr(const struct acm_address *addr, struct acmp_ep *ep,
void **addr_context)
{
struct acmp_dest *dest;
struct acmp_addr_ctx *addr_ctx;
int i;
for (i = 0; (i < ep->nmbr_ep_addrs) &&
(ep->addr_info[i].type != ACM_ADDRESS_INVALID); i++)
;
if (i == ep->nmbr_ep_addrs) {
struct acmp_addr *new_info;
new_info = realloc(ep->addr_info, (i + 1) * sizeof(*ep->addr_info));
if (!new_info) {
acm_log(0, "ERROR - no more space for local address\n");
return -1;
}
ep->addr_info = new_info;
/* Added memory is not initialized */
memset(ep->addr_info + i, 0, sizeof(*ep->addr_info));
++ep->nmbr_ep_addrs;
}
ep->addr_info[i].type = addr->type;
memcpy(&ep->addr_info[i].info, &addr->info, sizeof(addr->info));
memcpy(&ep->addr_info[i].addr, addr, sizeof(*addr));
ep->addr_info[i].ep = ep;
addr_ctx = malloc(sizeof(*addr_ctx));
if (!addr_ctx) {
acm_log(0, "ERROR - unable to alloc address context struct\n");
return -1;
}
addr_ctx->ep = ep;
addr_ctx->addr_inx = i;
if (loopback_prot != ACMP_LOOPBACK_PROT_LOCAL) {
*addr_context = addr_ctx;
return 0;
}
dest = acmp_acquire_dest(ep, addr->type, (uint8_t *)addr->info.addr);
if (!dest) {
acm_log(0, "ERROR - unable to create loopback dest %s\n",
addr->id_string);
memset(&ep->addr_info[i], 0, sizeof(ep->addr_info[i]));
free(addr_ctx);
return -1;
}
acm_get_gid((struct acm_port *) ep->port->port, 0, &dest->path.sgid);
dest->path.dgid = dest->path.sgid;
dest->path.dlid = dest->path.slid = htobe16(ep->port->lid);
dest->path.reversible_numpath = IBV_PATH_RECORD_REVERSIBLE;
dest->path.pkey = htobe16(ep->pkey);
dest->path.mtu = (uint8_t) ep->port->mtu;
dest->path.rate = (uint8_t) ep->port->rate;
dest->remote_qpn = ep->qp->qp_num;
dest->addr_timeout = (uint64_t) ~0ULL;
dest->route_timeout = (uint64_t) ~0ULL;
dest->state = ACMP_READY;
acmp_put_dest(dest);
*addr_context = addr_ctx;
acm_log(1, "added loopback dest %s\n", dest->name);
return 0;
}
static int acmp_add_addr(const struct acm_address *addr, void *ep_context,
void **addr_context)
{
struct acmp_ep *ep = ep_context;
int ret;
acm_log(2, "\n");
pthread_rwlock_wrlock(&ep->rwlock);
ret = __acmp_add_addr(addr, ep, addr_context);
pthread_rwlock_unlock(&ep->rwlock);
return ret;
}
static void acmp_remove_addr(void *addr_context)
{
struct acmp_addr_ctx *addr_ctx = addr_context;
struct acmp_addr *address = addr_ctx->ep->addr_info + addr_ctx->addr_inx;
struct acmp_device *dev;
struct acmp_dest *dest;
struct acmp_ep *ep;
int i;
acm_log(2, "\n");
/*
* The address may be a local destination address. If so,
* delete it from the cache.
*/
pthread_mutex_lock(&acmp_dev_lock);
list_for_each(&acmp_dev_list, dev, entry) {
pthread_mutex_unlock(&acmp_dev_lock);
for (i = 0; i < dev->port_cnt; i++) {
struct acmp_port *port = &dev->port[i];
pthread_mutex_lock(&port->lock);
list_for_each(&port->ep_list, ep, entry) {
pthread_mutex_unlock(&port->lock);
dest = acmp_get_dest(ep, address->type, address->addr.info.addr);
if (dest) {
acm_log(2, "Found a dest addr, deleting it\n");
pthread_mutex_lock(&ep->lock);
acmp_remove_dest(ep, dest);
pthread_mutex_unlock(&ep->lock);
}
pthread_mutex_lock(&port->lock);
}
pthread_mutex_unlock(&port->lock);
}
pthread_mutex_lock(&acmp_dev_lock);
}
pthread_mutex_unlock(&acmp_dev_lock);
memset(address, 0, sizeof(*address));
free(addr_ctx);
}
static struct acmp_port *acmp_get_port(struct acm_endpoint *endpoint)
{
struct acmp_device *dev;
acm_log(1, "dev 0x%" PRIx64 " port %d pkey 0x%x\n",
be64toh(endpoint->port->dev->dev_guid),
endpoint->port->port_num, endpoint->pkey);
list_for_each(&acmp_dev_list, dev, entry) {
if (dev->guid == endpoint->port->dev->dev_guid)
return &dev->port[endpoint->port->port_num - 1];
}
return NULL;
}
static struct acmp_ep *
acmp_get_ep(struct acmp_port *port, struct acm_endpoint *endpoint)
{
struct acmp_ep *ep;
acm_log(1, "dev 0x%" PRIx64 " port %d pkey 0x%x\n",
be64toh(endpoint->port->dev->dev_guid),
endpoint->port->port_num, endpoint->pkey);
list_for_each(&port->ep_list, ep, entry) {
if (ep->pkey == endpoint->pkey)
return ep;
}
return NULL;
}
static uint16_t acmp_get_pkey_index(struct acm_endpoint *endpoint)
{
struct acmp_port *port;
int i;
port = acmp_get_port(endpoint);
if (!port)
return 0;
i = ibv_get_pkey_index(port->dev->verbs, port->port_num,
htobe16(endpoint->pkey));
if (i < 0)
return 0;
return i;
}
static void acmp_close_endpoint(void *ep_context)
{
struct acmp_ep *ep = ep_context;
acm_log(1, "%s %d pkey 0x%04x\n",
ep->port->dev->verbs->device->name,
ep->port->port_num, ep->pkey);
ep->endpoint = NULL;
}
static struct acmp_ep *
acmp_alloc_ep(struct acmp_port *port, struct acm_endpoint *endpoint)
{
struct acmp_ep *ep;
int i;
acm_log(1, "\n");
ep = calloc(1, sizeof *ep);
if (!ep)
return NULL;
ep->port = port;
ep->endpoint = endpoint;
ep->pkey = endpoint->pkey;
ep->resolve_queue.credits = resolve_depth;
ep->resp_queue.credits = send_depth;
list_head_init(&ep->resolve_queue.pending);
list_head_init(&ep->resp_queue.pending);
list_head_init(&ep->active_queue);
list_head_init(&ep->wait_queue);
pthread_mutex_init(&ep->lock, NULL);
sprintf(ep->id_string, "%s-%d-0x%x", port->dev->verbs->device->name,
port->port_num, endpoint->pkey);
if (pthread_rwlock_init(&ep->rwlock, NULL)) {
free(ep);
return NULL;
}
ep->addr_info = NULL;
ep->nmbr_ep_addrs = 0;
for (i = 0; i < ACM_MAX_COUNTER; i++)
atomic_init(&ep->counters[i]);
return ep;
}
static int acmp_open_endpoint(const struct acm_endpoint *endpoint,
void *port_context, void **ep_context)
{
struct acmp_port *port = port_context;
struct acmp_ep *ep;
struct ibv_qp_init_attr init_attr;
struct ibv_qp_attr attr;
int ret, sq_size;
ep = acmp_get_ep(port, (struct acm_endpoint *) endpoint);
if (ep) {
acm_log(2, "endpoint for pkey 0x%x already exists\n", endpoint->pkey);
pthread_mutex_lock(&ep->lock);
ep->endpoint = (struct acm_endpoint *) endpoint;
pthread_mutex_unlock(&ep->lock);
*ep_context = (void *) ep;
return 0;
}
acm_log(2, "creating endpoint for pkey 0x%x\n", endpoint->pkey);
ep = acmp_alloc_ep(port, (struct acm_endpoint *) endpoint);
if (!ep)
return -1;
sprintf(ep->id_string, "%s-%d-0x%x",
port->dev->verbs->device->name,
port->port_num, endpoint->pkey);
sq_size = resolve_depth + send_depth;
ep->cq = ibv_create_cq(port->dev->verbs, sq_size + recv_depth,
ep, port->dev->channel, 0);
if (!ep->cq) {
acm_log(0, "ERROR - failed to create CQ\n");
goto err0;
}
ret = ibv_req_notify_cq(ep->cq, 0);
if (ret) {
acm_log(0, "ERROR - failed to arm CQ\n");
goto err1;
}
memset(&init_attr, 0, sizeof init_attr);
init_attr.cap.max_send_wr = sq_size;
init_attr.cap.max_recv_wr = recv_depth;
init_attr.cap.max_send_sge = 1;
init_attr.cap.max_recv_sge = 1;
init_attr.qp_context = ep;
init_attr.sq_sig_all = 1;
init_attr.qp_type = IBV_QPT_UD;
init_attr.send_cq = ep->cq;
init_attr.recv_cq = ep->cq;
ep->qp = ibv_create_qp(ep->port->dev->pd, &init_attr);
if (!ep->qp) {
acm_log(0, "ERROR - failed to create QP\n");
goto err1;
}
attr.qp_state = IBV_QPS_INIT;
attr.port_num = port->port_num;
attr.pkey_index = acmp_get_pkey_index((struct acm_endpoint *) endpoint);
attr.qkey = ACM_QKEY;
ret = ibv_modify_qp(ep->qp, &attr, IBV_QP_STATE | IBV_QP_PKEY_INDEX |
IBV_QP_PORT | IBV_QP_QKEY);
if (ret) {
acm_log(0, "ERROR - failed to modify QP to init\n");
goto err2;
}
attr.qp_state = IBV_QPS_RTR;
ret = ibv_modify_qp(ep->qp, &attr, IBV_QP_STATE);
if (ret) {
acm_log(0, "ERROR - failed to modify QP to rtr\n");
goto err2;
}
attr.qp_state = IBV_QPS_RTS;
attr.sq_psn = 0;
ret = ibv_modify_qp(ep->qp, &attr, IBV_QP_STATE | IBV_QP_SQ_PSN);
if (ret) {
acm_log(0, "ERROR - failed to modify QP to rts\n");
goto err2;
}
ret = acmp_post_recvs(ep);
if (ret)
goto err2;
pthread_mutex_lock(&port->lock);
list_add(&port->ep_list, &ep->entry);
pthread_mutex_unlock(&port->lock);
acmp_ep_preload(ep);
acmp_ep_join(ep);
*ep_context = (void *) ep;
return 0;
err2:
ibv_destroy_qp(ep->qp);
err1:
ibv_destroy_cq(ep->cq);
err0:
free(ep);
return -1;
}
static void acmp_port_up(struct acmp_port *port)
{
struct ibv_port_attr attr;
uint16_t pkey;
__be16 pkey_be;
__be16 sm_lid;
int i, ret;
int instance;
acm_log(1, "%s %d\n", port->dev->verbs->device->name, port->port_num);
ret = ibv_query_port(port->dev->verbs, port->port_num, &attr);
if (ret) {
acm_log(0, "ERROR - unable to get port attribute\n");
return;
}
port->mtu = attr.active_mtu;
port->rate = acm_get_rate(attr.active_width, attr.active_speed);
if (attr.subnet_timeout >= 8)
port->subnet_timeout = 1 << (attr.subnet_timeout - 8);
port->lid = attr.lid;
port->lid_mask = 0xffff - ((1 << attr.lmc) - 1);
port->sa_dest.av.src_path_bits = 0;
port->sa_dest.av.dlid = attr.sm_lid;
port->sa_dest.av.sl = attr.sm_sl;
port->sa_dest.av.port_num = port->port_num;
port->sa_dest.remote_qpn = 1;
sm_lid = htobe16(attr.sm_lid);
acmp_set_dest_addr(&port->sa_dest, ACM_ADDRESS_LID,
(uint8_t *) &sm_lid, sizeof(sm_lid));
instance = atomic_inc(&port->sa_dest.refcnt) - 1;
port->sa_dest.state = ACMP_READY;
for (i = 0; i < attr.pkey_tbl_len; i++) {
ret = ibv_query_pkey(port->dev->verbs, port->port_num, i, &pkey_be);
if (ret)
continue;
pkey = be16toh(pkey_be);
if (!(pkey & 0x7fff))
continue;
/* Determine pkey index for default partition with preference
* for full membership
*/
if ((pkey & 0x7fff) == 0x7fff) {
port->default_pkey_ix = i;
break;
}
}
port->state = IBV_PORT_ACTIVE;
acm_log(1, "%s %d %d is up\n", port->dev->verbs->device->name, port->port_num, instance);
}
static void acmp_port_down(struct acmp_port *port)
{
int instance;
acm_log(1, "%s %d\n", port->dev->verbs->device->name, port->port_num);
pthread_mutex_lock(&port->lock);
port->state = IBV_PORT_DOWN;
pthread_mutex_unlock(&port->lock);
/*
* We wait for the SA destination to be released. We could use an
* event instead of a sleep loop, but it's not worth it given how
* infrequently we should be processing a port down event in practice.
*/
instance = atomic_dec(&port->sa_dest.refcnt);
if (instance == 1) {
pthread_mutex_lock(&port->sa_dest.lock);
port->sa_dest.state = ACMP_INIT;
pthread_mutex_unlock(&port->sa_dest.lock);
}
acm_log(1, "%s %d %d is down\n", port->dev->verbs->device->name, port->port_num, instance);
}
static int acmp_open_port(const struct acm_port *cport, void *dev_context,
void **port_context)
{
struct acmp_device *dev = dev_context;
struct acmp_port *port;
if (cport->port_num < 1 || cport->port_num > dev->port_cnt) {
acm_log(0, "Error: port_num %d is out of range (max %d)\n",
cport->port_num, dev->port_cnt);
return -1;
}
port = &dev->port[cport->port_num - 1];
pthread_mutex_lock(&port->lock);
port->port = cport;
port->state = IBV_PORT_DOWN;
pthread_mutex_unlock(&port->lock);
acmp_port_up(port);
*port_context = port;
return 0;
}
static void acmp_close_port(void *port_context)
{
struct acmp_port *port = port_context;
acmp_port_down(port);
pthread_mutex_lock(&port->lock);
port->port = NULL;
pthread_mutex_unlock(&port->lock);
}
static void acmp_init_port(struct acmp_port *port, struct acmp_device *dev,
uint8_t port_num)
{
acm_log(1, "%s %d\n", dev->verbs->device->name, port_num);
port->dev = dev;
port->port_num = port_num;
pthread_mutex_init(&port->lock, NULL);
list_head_init(&port->ep_list);
acmp_init_dest(&port->sa_dest, ACM_ADDRESS_LID, NULL, 0);
port->state = IBV_PORT_DOWN;
}
static int acmp_open_dev(const struct acm_device *device, void **dev_context)
{
struct acmp_device *dev;
size_t size;
struct ibv_device_attr attr;
int i, ret;
struct ibv_context *verbs;
acm_log(1, "dev_guid 0x%" PRIx64 " %s\n", be64toh(device->dev_guid),
device->verbs->device->name);
list_for_each(&acmp_dev_list, dev, entry) {
if (dev->guid == device->dev_guid) {
acm_log(2, "dev_guid 0x%" PRIx64 " already exits\n",
be64toh(device->dev_guid));
*dev_context = dev;
dev->device = device;
return 0;
}
}
/* We need to release the core device structure when device close is
* called. But this provider does not support dynamic add/removal of
* devices/ports/endpoints. To avoid use-after-free issues, we open
* our own verbs context, rather than using the one in the core
* device structure.
*/
verbs = ibv_open_device(device->verbs->device);
if (!verbs) {
acm_log(0, "ERROR - opening device %s\n",
device->verbs->device->name);
goto err;
}
ret = ibv_query_device(verbs, &attr);
if (ret) {
acm_log(0, "ERROR - ibv_query_device (%s) %d\n",
verbs->device->name, ret);
goto err;
}
size = sizeof(*dev) + sizeof(struct acmp_port) * attr.phys_port_cnt;
dev = (struct acmp_device *) calloc(1, size);
if (!dev)
goto err;
dev->verbs = verbs;
dev->device = device;
dev->port_cnt = attr.phys_port_cnt;
dev->pd = ibv_alloc_pd(dev->verbs);
if (!dev->pd) {
acm_log(0, "ERROR - unable to allocate PD\n");
goto err1;
}
dev->channel = ibv_create_comp_channel(dev->verbs);
if (!dev->channel) {
acm_log(0, "ERROR - unable to create comp channel\n");
goto err2;
}
for (i = 0; i < dev->port_cnt; i++) {
acmp_init_port(&dev->port[i], dev, i + 1);
}
if (pthread_create(&dev->comp_thread_id, NULL, acmp_comp_handler, dev)) {
acm_log(0, "Error -- failed to create the comp thread for dev %s",
dev->verbs->device->name);
goto err3;
}
pthread_mutex_lock(&acmp_dev_lock);
list_add(&acmp_dev_list, &dev->entry);
pthread_mutex_unlock(&acmp_dev_lock);
dev->guid = device->dev_guid;
*dev_context = dev;
acm_log(1, "%s opened\n", dev->verbs->device->name);
return 0;
err3:
ibv_destroy_comp_channel(dev->channel);
err2:
ibv_dealloc_pd(dev->pd);
err1:
free(dev);
err:
return -1;
}
static void acmp_close_dev(void *dev_context)
{
struct acmp_device *dev = dev_context;
acm_log(1, "dev_guid 0x%" PRIx64 "\n", be64toh(dev->device->dev_guid));
dev->device = NULL;
}
static void acmp_set_options(void)
{
FILE *f;
char s[120];
char opt[32], value[256];
const char *opts_file = acm_get_opts_file();
if (!(f = fopen(opts_file, "r")))
return;
while (fgets(s, sizeof s, f)) {
if (s[0] == '#')
continue;
if (sscanf(s, "%31s%255s", opt, value) != 2)
continue;
if (!strcasecmp("addr_prot", opt))
addr_prot = acmp_convert_addr_prot(value);
else if (!strcasecmp("addr_timeout", opt))
addr_timeout = atoi(value);
else if (!strcasecmp("route_prot", opt))
route_prot = acmp_convert_route_prot(value);
else if (!strcmp("route_timeout", opt))
route_timeout = atoi(value);
else if (!strcasecmp("loopback_prot", opt))
loopback_prot = acmp_convert_loopback_prot(value);
else if (!strcasecmp("timeout", opt))
timeout = atoi(value);
else if (!strcasecmp("retries", opt))
retries = atoi(value);
else if (!strcasecmp("resolve_depth", opt))
resolve_depth = atoi(value);
else if (!strcasecmp("send_depth", opt))
send_depth = atoi(value);
else if (!strcasecmp("recv_depth", opt))
recv_depth = atoi(value);
else if (!strcasecmp("min_mtu", opt))
min_mtu = acm_convert_mtu(atoi(value));
else if (!strcasecmp("min_rate", opt))
min_rate = acm_convert_rate(atoi(value));
else if (!strcasecmp("route_preload", opt))
route_preload = acmp_convert_route_preload(value);
else if (!strcasecmp("route_data_file", opt))
strcpy(route_data_file, value);
else if (!strcasecmp("addr_preload", opt))
addr_preload = acmp_convert_addr_preload(value);
else if (!strcasecmp("addr_data_file", opt))
strcpy(addr_data_file, value);
}
fclose(f);
}
static void acmp_log_options(void)
{
acm_log(0, "address resolution %d\n", addr_prot);
acm_log(0, "address timeout %d\n", addr_timeout);
acm_log(0, "route resolution %d\n", route_prot);
acm_log(0, "route timeout %d\n", route_timeout);
acm_log(0, "loopback resolution %d\n", loopback_prot);
acm_log(0, "timeout %d ms\n", timeout);
acm_log(0, "retries %d\n", retries);
acm_log(0, "resolve depth %d\n", resolve_depth);
acm_log(0, "send depth %d\n", send_depth);
acm_log(0, "receive depth %d\n", recv_depth);
acm_log(0, "minimum mtu %d\n", min_mtu);
acm_log(0, "minimum rate %d\n", min_rate);
acm_log(0, "route preload %d\n", route_preload);
acm_log(0, "route data file %s\n", route_data_file);
acm_log(0, "address preload %d\n", addr_preload);
acm_log(0, "address data file %s\n", addr_data_file);
}
static void __attribute__((constructor)) acmp_init(void)
{
acmp_set_options();
acmp_log_options();
atomic_init(&g_tid);
atomic_init(&wait_cnt);
pthread_mutex_init(&acmp_dev_lock, NULL);
event_init(&timeout_event);
umad_init();
acm_log(1, "starting timeout/retry thread\n");
if (pthread_create(&retry_thread_id, NULL, acmp_retry_handler, NULL)) {
acm_log(0, "Error: failed to create the retry thread");
retry_thread_started = 0;
return;
}
acmp_initialized = 1;
}
int provider_query(struct acm_provider **provider, uint32_t *version)
{
acm_log(1, "\n");
if (!acmp_initialized)
return -1;
if (provider)
*provider = &def_prov;
if (version)
*version = ACM_PROV_VERSION;
return 0;
}