[ofa-general] [RFC] 5/5: IB ACM: ib_acm service

Sean Hefty sean.hefty at intel.com
Thu Sep 17 00:03:15 PDT 2009


Name and address resolution service for InfiniBand.
Defines and implements the ib_acm service to ib_acm service protocol.

Signed-off-by: Sean Hefty <sean.hefty at intel.com>
---
Note: Some of this was implemented before the IB ACM package added a dependency
on libibumad.  I have not gone back through to see if all of the definitions are
necessary given that dependency.

/*
 * Copyright (c) 2009 Intel Corporation.  All rights reserved.
 *
 * This software is available to you under the OpenFabrics.org BSD license
 * below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AWV
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#if !defined(ACM_MAD_H)
#define ACM_MAD_H

#include <infiniband/verbs.h>
#include <infiniband/acm.h>
#include <netinet/in.h>

#define ACM_SEND_SIZE 256
#define ACM_RECV_SIZE (ACM_SEND_SIZE + sizeof(struct ibv_grh))

#define IB_METHOD_GET       0x01
#define IB_METHOD_SET       0x02
#define IB_METHOD_SEND      0x03
#define IB_METHOD_GET_TABLE 0x12
#define IB_METHOD_DELETE    0x15
#define IB_METHOD_RESP      0x80

#define ACM_MGMT_CLASS   0x2C

#define ACM_CTRL_ACK     htons(0x8000)
#define ACM_CTRL_RESOLVE htons(0x0001)
#define ACM_CTRL_CM      htons(0x0002)

struct acm_mad
{
	uint8_t  base_version;
	uint8_t  mgmt_class;
	uint8_t  class_version;
	uint8_t  method;
	uint16_t status;
	uint16_t control;
	uint64_t tid;

	uint8_t  data[240];
};

#define acm_class_status(status) ((uint8_t) (ntohs(status) >> 8))

#define ACM_QKEY 0x80010000

#define ACM_ADDRESS_INVALID    0x00
#define ACM_ADDRESS_NAME       0x01
#define ACM_ADDRESS_IP         0x02
#define ACM_ADDRESS_IP6        0x03
#define ACM_ADDRESS_RESERVED   0x04  /* start of reserved range */

#define ACM_MAX_GID_COUNT        10

struct acm_resolve_rec
{
	uint8_t       dest_type;
	uint8_t       dest_length;
	uint8_t       src_type;
	uint8_t       src_length;
	uint8_t       gid_cnt;
	uint8_t       resp_resources;
	uint8_t       init_depth;
	uint8_t       reserved;
	uint8_t       dest[ACM_MAX_ADDRESS];
	uint8_t       src[ACM_MAX_ADDRESS];
	union ibv_gid gid[ACM_MAX_GID_COUNT];
};

#define IB_MGMT_CLASS_SA 0x03

struct ib_sa_mad
{
	uint8_t  base_version;
	uint8_t  mgmt_class;
	uint8_t  class_version;
	uint8_t  method;
	uint16_t status;
	uint16_t reserved1;
	uint64_t tid;
	uint16_t attr_id;
	uint16_t reserved2;
	uint32_t attr_mod;

	uint8_t  rmpp_version;
	uint8_t  rmpp_type;
	uint8_t  rmpp_flags;
	uint8_t  rmpp_status;
	uint32_t seg_num;
	uint32_t paylen_newwin;

	uint32_t sm_key[2];
	uint16_t attr_offset;
	uint16_t reserved3;
	uint64_t comp_mask;

	uint8_t  data[200];
};

#define IB_SA_ATTR_PATH_REC htons(0x0035)

#define IB_COMP_MASK_PR_SERVICE_ID         (htonll(1 << 0) | \
                                            htonll(1 << 1))
#define IB_COMP_MASK_PR_DGID                htonll(1 << 2)
#define IB_COMP_MASK_PR_SGID                htonll(1 << 3)
#define IB_COMP_MASK_PR_DLID                htonll(1 << 4)
#define IB_COMP_MASK_PR_SLID                htonll(1 << 5)
#define IB_COMP_MASK_PR_RAW_TRAFFIC         htonll(1 << 6)
/* RESERVED                                 htonll(1 << 7) */
#define IB_COMP_MASK_PR_FLOW_LABEL          htonll(1 << 8)
#define IB_COMP_MASK_PR_HOP_LIMIT           htonll(1 << 9)
#define IB_COMP_MASK_PR_TCLASS              htonll(1 << 10)
#define IB_COMP_MASK_PR_REVERSIBLE          htonll(1 << 11)
#define IB_COMP_MASK_PR_NUM_PATH            htonll(1 << 12)
#define IB_COMP_MASK_PR_PKEY                htonll(1 << 13)
#define IB_COMP_MASK_PR_QOS_CLASS           htonll(1 << 14)
#define IB_COMP_MASK_PR_SL                  htonll(1 << 15)
#define IB_COMP_MASK_PR_MTU_SELECTOR        htonll(1 << 16)
#define IB_COMP_MASK_PR_MTU                 htonll(1 << 17)
#define IB_COMP_MASK_PR_RATE_SELECTOR       htonll(1 << 18)
#define IB_COMP_MASK_PR_RATE                htonll(1 << 19)
#define IB_COMP_MASK_PR_PACKET_LIFETIME_SELECTOR htonll(1 << 20)
#define IB_COMP_MASK_PR_PACKET_LIFETIME     htonll(1 << 21)
#define IB_COMP_MASK_PR_PREFERENCE          htonll(1 << 22)
/* RESERVED                                 htonll(1 << 23) */

#define IB_MC_QPN 0xffffff
#define IB_SA_ATTR_MC_MEMBER_REC htons(0x0038)

#define IB_COMP_MASK_MC_MGID                htonll(1 << 0)
#define IB_COMP_MASK_MC_PORT_GID            htonll(1 << 1)
#define IB_COMP_MASK_MC_QKEY                htonll(1 << 2)
#define IB_COMP_MASK_MC_MLID                htonll(1 << 3)
#define IB_COMP_MASK_MC_MTU_SEL             htonll(1 << 4)
#define IB_COMP_MASK_MC_MTU                 htonll(1 << 5)
#define IB_COMP_MASK_MC_TCLASS              htonll(1 << 6)
#define IB_COMP_MASK_MC_PKEY                htonll(1 << 7)
#define IB_COMP_MASK_MC_RATE_SEL            htonll(1 << 8)
#define IB_COMP_MASK_MC_RATE                htonll(1 << 9)
#define IB_COMP_MASK_MC_PACKET_LIFETIME_SEL htonll(1 << 10)
#define IB_COMP_MASK_MC_PACKET_LIFETIME     htonll(1 << 11)
#define IB_COMP_MASK_MC_SL                  htonll(1 << 12)
#define IB_COMP_MASK_MC_FLOW                htonll(1 << 13)
#define IB_COMP_MASK_MC_HOP                 htonll(1 << 14)
#define IB_COMP_MASK_MC_SCOPE               htonll(1 << 15)
#define IB_COMP_MASK_MC_JOIN_STATE          htonll(1 << 16)
#define IB_COMP_MASK_MC_PROXY_JOIN          htonll(1 << 17)

struct ib_mc_member_rec
{
	union ibv_gid mgid;
	union ibv_gid port_gid;
	uint32_t      qkey;
	uint16_t      mlid;
	uint8_t       mtu;
	uint8_t       tclass;
	uint16_t      pkey;
	uint8_t       rate;
	uint8_t       packet_lifetime;
	uint32_t      sl_flow_hop;
	uint8_t       scope_state;
	uint8_t       proxy_join;
	uint8_t       reserved[2];
	uint8_t       pad[4];
};

#endif /* ACM_MAD_H */


/*
 * Copyright (c) 2009 Intel Corporation. All rights reserved.
 *
 * This software is available to you under the OpenIB.org BSD license
 * below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AWV
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#include <stdio.h>
#include <stdarg.h>
#include <string.h>
#include <osd.h>
#include <arpa/inet.h>
#include <infiniband/acm.h>
#include <infiniband/umad.h>
#include <dlist.h>
#include <search.h>
#include "acm_mad.h"

#define MAX_EP_ADDR 4
#define MAX_EP_MC   2

struct acm_dest
{
	uint8_t            address[ACM_MAX_ADDRESS]; /* keep first */
	struct ibv_ah      *ah;
	struct ibv_ah_attr av;
	union ibv_gid      mgid;
	DLIST_ENTRY        req_queue;
	uint32_t           remote_qpn;
	uint8_t            init_depth;
	uint8_t            resp_resources;
	uint8_t            mtu;
	uint8_t            packet_lifetime;
};

struct acm_port
{
	struct acm_device   *dev;
	DLIST_ENTRY         ep_list;
	int                 mad_portid;
	int                 mad_agentid;
	struct acm_dest     sa_dest;
	enum ibv_port_state state;
	enum ibv_mtu        mtu;
	enum ibv_rate       rate;
	int                 subnet_timeout;
	int                 gid_cnt;
	uint16_t            pkey_cnt;
	uint16_t            lid;
	uint8_t             lmc;
	uint8_t             port_num;
};

struct acm_device
{
	struct ibv_context      *verbs;
	struct ibv_comp_channel *channel;
	struct ibv_pd           *pd;
	uint64_t                guid;
	DLIST_ENTRY             entry;
	uint8_t                 active;
	uint8_t                 init_depth;
	uint8_t                 resp_resources;
	int                     port_cnt;
	struct acm_port         port[0];
};

struct acm_ep
{
	struct acm_port    *port;
	struct ibv_cq      *cq;
	struct ibv_qp      *qp;
	struct ibv_mr      *mr;
	uint8_t            *recv_bufs;
	DLIST_ENTRY        entry;
	union acm_ep_addr  addr[MAX_EP_ADDR];
	uint8_t            addr_type[MAX_EP_ADDR];
	void               *dest_map[ACM_ADDRESS_RESERVED - 1];
	struct acm_dest    mc_dest[MAX_EP_MC];
	int                mc_cnt;
	uint16_t           pkey_index;
	uint16_t           pkey;
	lock_t             lock;
	int                available_sends;
	DLIST_ENTRY        pending_queue;
	DLIST_ENTRY        active_queue;
	DLIST_ENTRY        wait_queue;
};

struct acm_send_msg
{
	DLIST_ENTRY        entry;
	struct acm_ep      *ep;
	struct ibv_mr      *mr;
	struct ibv_send_wr wr;
	struct ibv_sge     sge;
	uint64_t           expires;
	int                tries;
	uint8_t            data[ACM_SEND_SIZE];
};

struct acm_client
{
	lock_t   lock;   /* acquire ep lock first */
	SOCKET   sock;
	int      index;
	atomic_t refcnt;
};

struct acm_request
{
	struct acm_client *client;
	DLIST_ENTRY       entry;
	struct acm_msg    msg;
};

static DLIST_ENTRY dev_list;

static atomic_t tid;
static DLIST_ENTRY timeout_list;
static event_t timeout_event;
static atomic_t wait_cnt;

static SOCKET listen_socket;
static struct acm_client client[FD_SETSIZE - 1];

static FILE *flog;
static lock_t log_lock;

static char log_file[128] = "stdout";
static int log_level = 0;
static short server_port = 6125;
static int timeout = 2000;
static int retries = 15;
static int send_depth = 64;
static int recv_depth = 1024;
static uint8_t min_mtu = IBV_MTU_2048;
static uint8_t min_rate = IBV_RATE_10_GBPS;

#define acm_log(level, format, ...) \
	acm_write(level, "%s: "format, __func__, ## __VA_ARGS__)

static void acm_write(int level, const char *format, ...)
{
	va_list args;

	if (level > log_level)
		return;

	va_start(args, format);
	lock_acquire(&log_lock);
	vfprintf(flog, format, args);
	lock_release(&log_lock);
	va_end(args);
}

static void acm_log_ep_addr(int level, const char *msg,
	union acm_ep_addr *addr, uint8_t ep_type)
{
	char ip_addr[ACM_MAX_ADDRESS];

	if (level > log_level)
		return;

	lock_acquire(&log_lock);
	fprintf(flog, msg);
	switch (ep_type) {
	case ACM_EP_TYPE_NAME:
		fprintf(flog, "%s\n", addr->name);
		break;
	case ACM_EP_TYPE_ADDRESS_IP:
		inet_ntop(AF_INET, addr->addr, ip_addr, ACM_MAX_ADDRESS);
		fprintf(flog, "%s\n", ip_addr);
		break;
	case ACM_EP_TYPE_ADDRESS_IP6:
		inet_ntop(AF_INET6, addr->addr, ip_addr, ACM_MAX_ADDRESS);
		fprintf(flog, "%s\n", ip_addr);
		break;
	case ACM_EP_TYPE_DEVICE:
		fprintf(flog, "device guid 0x%llx, pkey index %d, port %d\n",
			addr->dev.guid, addr->dev.pkey_index, addr->dev.port_num);
		break;
	case ACM_EP_TYPE_AV:
		fprintf(flog, "endpoint specified using address vector\n");
		break;
	default:
		fprintf(flog, "unknown endpoint address 0x%x\n", ep_type);
	}
	lock_release(&log_lock);
}

static void *zalloc(size_t size)
{
	void *buf;

	buf = malloc(size);
	if (buf)
		memset(buf, 0, size);
	return buf;
}

static struct acm_send_msg *
acm_alloc_send(struct acm_ep *ep, struct acm_dest *dest, size_t size)
{
	struct acm_send_msg *msg;

	msg = (struct acm_send_msg *) zalloc(sizeof *msg);
	if (!msg) {
		acm_log(0, "ERROR - unable to allocate send buffer\n");
		return NULL;
	}

	msg->ep = ep;
	msg->mr = ibv_reg_mr(ep->port->dev->pd, msg->data, size, 0);
	if (!msg->mr) {
		acm_log(0, "ERROR - failed to register send buffer\n");
		goto err;
	}

	msg->wr.next = NULL;
	msg->wr.sg_list = &msg->sge;
	msg->wr.num_sge = 1;
	msg->wr.opcode = IBV_WR_SEND;
	msg->wr.send_flags = IBV_SEND_SIGNALED;
	msg->wr.wr_id = (uintptr_t) msg;

	msg->wr.wr.ud.ah = dest->ah;
	msg->wr.wr.ud.remote_qpn = dest->remote_qpn;
	msg->wr.wr.ud.remote_qkey = ACM_QKEY;

	msg->sge.length = size;
	msg->sge.lkey = msg->mr->lkey;
	msg->sge.addr = (uintptr_t) msg->data;
	return msg;
err:
	free(msg);
	return NULL;
}

static void acm_free_send(struct acm_send_msg *msg)
{
	ibv_dereg_mr(msg->mr);
	free(msg);
}

static void acm_post_send(struct acm_send_msg *msg)
{
	struct acm_ep *ep = msg->ep;
	struct ibv_send_wr *bad_wr;

	if (ep->available_sends) {
		acm_log(2, "posting send to QP\n");
		ep->available_sends--;
		DListInsertTail(&msg->entry, &ep->active_queue);
		ibv_post_send(ep->qp, &msg->wr, &bad_wr);
	} else {
		acm_log(2, "no sends available, queuing message\n");
		DListInsertTail(&msg->entry, &ep->pending_queue);
	}
}

static void acm_post_recv(struct acm_ep *ep, uint64_t address)
{
	struct ibv_recv_wr wr, *bad_wr;
	struct ibv_sge sge;

	wr.next = NULL;
	wr.sg_list = &sge;
	wr.num_sge = 1;
	wr.wr_id = address;

	sge.length = ACM_RECV_SIZE;
	sge.lkey = ep->mr->lkey;
	sge.addr = address;

	ibv_post_recv(ep->qp, &wr, &bad_wr);
}

static void acm_send_available(struct acm_ep *ep)
{
	struct acm_send_msg *msg;
	struct ibv_send_wr *bad_wr;
	DLIST_ENTRY *entry;

	if (DListEmpty(&ep->pending_queue)) {
		ep->available_sends++;
	} else {
		acm_log(2, "posting queued send message\n");
		entry = ep->pending_queue.Next;
		DListRemove(entry);
		msg = container_of(entry, struct acm_send_msg, entry);
		DListInsertTail(&msg->entry, &ep->active_queue);
		ibv_post_send(ep->qp, &msg->wr, &bad_wr);
	}
}

static void acm_complete_send(struct acm_send_msg *msg)
{
	struct acm_ep *ep = msg->ep;

	lock_acquire(&ep->lock);
	DListRemove(&msg->entry);
	if (msg->tries) {
		acm_log(2, "waiting for response\n");
		msg->expires = time_stamp_ms() + ep->port->subnet_timeout + timeout;
		DListInsertTail(&msg->entry, &ep->wait_queue);
		if (atomic_inc(&wait_cnt) == 1)
			event_signal(&timeout_event);
	} else {
		acm_log(2, "freeing\n");
		acm_send_available(ep);
		acm_free_send(msg);
	}
	lock_release(&ep->lock);
}

static struct acm_send_msg *acm_get_request(struct acm_ep *ep, uint64_t tid, int *free)
{
	struct acm_send_msg *msg, *req = NULL;
	struct acm_mad *mad;
	DLIST_ENTRY *entry, *next;

	acm_log(2, "\n");
	lock_acquire(&ep->lock);
	for (entry = ep->wait_queue.Next; entry != &ep->wait_queue; entry = next) {
		next = entry->Next;
		msg = container_of(entry, struct acm_send_msg, entry);
		mad = (struct acm_mad *) msg->data;
		if (mad->tid == tid) {
			acm_log(2, "match found in wait queue\n");
			req = msg;
			DListRemove(entry);
			(void) atomic_dec(&wait_cnt);
			acm_send_available(ep);
			*free = 1;
			goto unlock;
		}
	}

	for (entry = ep->active_queue.Next; entry != &ep->active_queue; entry = entry->Next) {
		msg = container_of(entry, struct acm_send_msg, entry);
		mad = (struct acm_mad *) msg->data;
		if (mad->tid == tid && msg->tries) {
			acm_log(2, "match found in active queue\n");
			req = msg;
			req->tries = 0;
			*free = 0;
			break;
		}
	}
unlock:
	lock_release(&ep->lock);
	return req;
}

static uint8_t acm_gid_index(struct acm_port *port, union ibv_gid *gid)
{
	union ibv_gid cmp_gid;
	uint8_t i;

	for (i = 0; i < port->gid_cnt; i++) {
		ibv_query_gid(port->dev->verbs, port->port_num, i, &cmp_gid);
		if (!memcmp(&cmp_gid, gid, sizeof cmp_gid))
			break;
	}
	return i;
}

static int acm_mc_index(struct acm_ep *ep, union ibv_gid *gid)
{
	int i;

	for (i = 0; i < ep->mc_cnt; i++) {
		if (!memcmp(&ep->mc_dest[i].address, gid, sizeof(*gid)))
			return i;
	}
	return -1;
}

static void
acm_init_mc_av(struct acm_port *port, struct ib_mc_member_rec *mc_rec,
	struct ibv_ah_attr *av)
{
	uint32_t sl_flow_hop;

	sl_flow_hop = ntohl(mc_rec->sl_flow_hop);

	av->dlid = ntohs(mc_rec->mlid);
	av->sl = (uint8_t) (sl_flow_hop >> 28);
	av->src_path_bits = port->sa_dest.av.src_path_bits;
	av->static_rate = mc_rec->rate & 0x3F;
	av->port_num = port->port_num;

	av->is_global = 1;
	av->grh.dgid = mc_rec->mgid;
	av->grh.flow_label = (sl_flow_hop >> 8) & 0xFFFFF;
	av->grh.sgid_index = acm_gid_index(port, &mc_rec->port_gid);
	av->grh.hop_limit = (uint8_t) sl_flow_hop;
	av->grh.traffic_class = mc_rec->tclass;
}

static void acm_process_join_resp(struct acm_ep *ep, struct ib_user_mad *umad)
{
	struct acm_dest *dest;
	struct ib_mc_member_rec *mc_rec;
	struct ib_sa_mad *mad;
	int index, ret;

	mad = (struct ib_sa_mad *) umad->data;
	acm_log(1, "response status: 0x%x, mad status: 0x%x\n",
		umad->status, mad->status);
	if (umad->status) {
		acm_log(0, "ERROR - send join failed 0x%x\n", umad->status);
		return;
	}
	if (mad->status) {
		acm_log(0, "ERROR - join response status 0x%x\n", mad->status);
		return;
	}

	mc_rec = (struct ib_mc_member_rec *) mad->data;
	lock_acquire(&ep->lock);
	index = acm_mc_index(ep, &mc_rec->mgid);
	if (index >= 0) {
		dest = &ep->mc_dest[index];
		dest->remote_qpn = IB_MC_QPN;
		dest->mgid = mc_rec->mgid;
		acm_init_mc_av(ep->port, mc_rec, &dest->av);
		dest->mtu = mc_rec->mtu & 0x3F;
		dest->packet_lifetime = mc_rec->packet_lifetime & 0x3F;
		dest->ah = ibv_create_ah(ep->port->dev->pd, &dest->av);
		ret = ibv_attach_mcast(ep->qp, &mc_rec->mgid, mc_rec->mlid);
		if (ret) {
			acm_log(0, "ERROR - unable to attach QP to multicast group\n");
		}
		acm_log(1, "join successful\n");
	} else {
		acm_log(0, "ERROR - MGID in join response not found\n");
	}
	lock_release(&ep->lock);
}

static int acm_compare_dest(const void *dest1, const void *dest2)
{
	return memcmp(dest1, dest2, ACM_MAX_ADDRESS);
}

static int acm_addr_index(struct acm_ep *ep, uint8_t *addr, uint8_t addr_type)
{
	int i;

	for (i = 0; i < MAX_EP_ADDR; i++) {
		if (ep->addr_type[i] != addr_type)
			continue;

		if ((addr_type == ACM_ADDRESS_NAME &&
			!strnicmp((char *) ep->addr[i].name, (char *) addr, ACM_MAX_ADDRESS)) ||
			!memcmp(ep->addr[i].addr, addr, ACM_MAX_ADDRESS))
			return i;
	}
	return -1;
}

/*
 * Multicast groups are ordered lowest to highest preference.
 */
static int
acm_record_av(struct acm_dest *dest, struct acm_ep *ep,
	struct ibv_wc *wc, struct acm_resolve_rec *rec)
{
	int i, index;

	acm_log(2, "\n");
	for (i = min(rec->gid_cnt, ACM_MAX_GID_COUNT) - 1; i >= 0; i--) {
		index = acm_mc_index(ep, &rec->gid[i]);
		if (index >= 0) {
			acm_log(2, "selecting MC group at index %d\n", index);
			dest->av = ep->mc_dest[index].av;
			dest->av.dlid = wc->slid;
			dest->av.src_path_bits = wc->dlid_path_bits;
			dest->av.grh.dgid = ((struct ibv_grh *) (uintptr_t) wc->wr_id)->sgid;
			
			dest->mgid = ep->mc_dest[index].mgid;
			dest->mtu = ep->mc_dest[index].mtu;
			dest->packet_lifetime = ep->mc_dest[index].packet_lifetime;
			return ACM_STATUS_SUCCESS;
		}
	}

	return ACM_STATUS_ENODATA;
}

/* 
 * Record the source of a resolve request.  Use the source QPN to see if
 * the remote service has relocated and we need to update our cache.
 */
static struct acm_dest *
acm_record_src(struct acm_ep *ep, struct ibv_wc *wc, struct acm_resolve_rec *rec)
{
	struct acm_dest *dest, **tdest;
	int ret;

	acm_log(2, "\n");
	lock_acquire(&ep->lock);
	tdest = tfind(rec->src, &ep->dest_map[rec->src_type - 1], acm_compare_dest);
	if (!tdest) {
		acm_log(2, "creating new dest\n");
		dest = zalloc(sizeof *dest);
		if (!dest) {
			acm_log(0, "ERROR - unable to allocate dest\n");
			goto unlock;
		}

		memcpy(dest->address, rec->src, ACM_MAX_ADDRESS);
		DListInit(&dest->req_queue);
		tsearch(dest, &ep->dest_map[rec->src_type - 1], acm_compare_dest);
	} else {
		dest = *tdest;
	}

	if (dest->ah) {
		if (dest->remote_qpn == wc->src_qp)
			goto unlock;

		ibv_destroy_ah(dest->ah); // TODO: ah could be in use
		dest->ah = NULL;
	}

	acm_log(2, "creating address handle\n");
	ret = acm_record_av(dest, ep, wc, rec);
	if (ret) {
		acm_log(0, "ERROR - failed to record av\n");
		goto err;
	}

	dest->ah = ibv_create_ah(ep->port->dev->pd, &dest->av);
	if (!dest->ah) {
		acm_log(0, "ERROR - failed to create ah\n");
		goto err;
	}

	dest->remote_qpn = wc->src_qp;
	dest->init_depth = rec->init_depth;
	dest->resp_resources = rec->resp_resources;

unlock:
	lock_release(&ep->lock);
	return dest;

err:
	if (!tdest) {
		tdelete(dest->address, &ep->dest_map[rec->src_type - 1], acm_compare_dest);
		free(dest);
	}
	lock_release(&ep->lock);
	return NULL;
}

static void acm_init_resp_mad(struct acm_mad *resp, struct acm_mad *req)
{
	resp->base_version = req->base_version;
	resp->mgmt_class = req->mgmt_class;
	resp->class_version = req->class_version;
	resp->method = req->method | IB_METHOD_RESP;
	resp->status = ACM_STATUS_SUCCESS;
	resp->control = req->control;
	resp->tid = req->tid;
}

static int acm_validate_resolve_req(struct acm_mad *mad)
{
	struct acm_resolve_rec *rec;

	if (mad->method != IB_METHOD_GET) {
		acm_log(0, "ERROR - invalid method 0x%x\n", mad->method);
		return ACM_STATUS_EINVAL;
	}

	rec = (struct acm_resolve_rec *) mad->data;
	if (!rec->src_type || rec->src_type >= ACM_ADDRESS_RESERVED) {
		acm_log(0, "ERROR - unknown src type 0x%x\n", rec->src_type);
		return ACM_STATUS_EINVAL;
	}

	return 0;
}

static void
acm_process_resolve_req(struct acm_ep *ep, struct ibv_wc *wc, struct acm_mad *mad)
{
	struct acm_resolve_rec *rec, *resp_rec;
	struct acm_dest *dest;
	struct acm_send_msg *msg;
	struct acm_mad *resp_mad;

	acm_log(2, "\n");
	if (acm_validate_resolve_req(mad)) {
		acm_log(0, "ERROR - invalid request\n");
		return;
	}

	rec = (struct acm_resolve_rec *) mad->data;
	dest = acm_record_src(ep, wc, rec);
	if (!dest) {
		acm_log(0, "ERROR - failed to record source\n");
		return;
	}

	if (acm_addr_index(ep, rec->dest, rec->dest_type) < 0) {
		acm_log(2, "no matching address - discarding\n");
		return;
	}

	msg = acm_alloc_send(ep, dest, sizeof (*resp_mad));
	if (!msg) {
		acm_log(0, "ERROR - failed to allocate message\n");
		return;
	}

	resp_mad = (struct acm_mad *) msg->data;
	resp_rec = (struct acm_resolve_rec *) resp_mad->data;
	acm_init_resp_mad(resp_mad, mad);
	resp_rec->dest_type = rec->src_type;
	resp_rec->dest_length = rec->src_length;
	resp_rec->src_type = rec->dest_type;
	resp_rec->src_length = rec->dest_length;
	resp_rec->gid_cnt = 1;
	resp_rec->resp_resources = ep->port->dev->resp_resources;
	resp_rec->init_depth = ep->port->dev->init_depth;
	memcpy(resp_rec->dest, rec->src, ACM_MAX_ADDRESS);
	memcpy(resp_rec->src, rec->dest, ACM_MAX_ADDRESS);
	memcpy(resp_rec->gid, dest->mgid.raw, sizeof(union ibv_gid));

	acm_log(2, "sending resolve response\n");
	lock_acquire(&ep->lock);
	acm_post_send(msg);
	lock_release(&ep->lock);
}

static int
acm_client_resolve_resp(struct acm_ep *ep, struct acm_client *client,
	struct acm_resolve_msg *msg, struct acm_dest *dest, uint8_t status)
{
	int ret;

	acm_log(1, "status 0x%x\n", status);
	lock_acquire(&client->lock);
	if (client->sock == INVALID_SOCKET) {
		acm_log(0, "ERROR - connection lost\n");
		ret = ACM_STATUS_ENOTCONN;
		goto release;
	}

	msg->hdr.opcode |= ACM_OP_ACK;
	msg->hdr.status = status;
	msg->hdr.param = 0;

	if (!status) {
		msg->hdr.src_type = ACM_EP_TYPE_DEVICE;
		msg->src.dev.guid = ep->port->dev->guid;
		msg->src.dev.pkey_index = ep->pkey_index;
		msg->src.dev.port_num = ep->port->port_num;

		if (dest) {
			acm_log(2, "destination found\n");
			msg->hdr.dest_type = ACM_EP_TYPE_AV;
			msg->dest.av = dest->av;
			msg->data.init_depth = min(ep->port->dev->init_depth, dest->resp_resources);
			msg->data.resp_resources = min(ep->port->dev->resp_resources, dest->init_depth);
			msg->data.packet_lifetime = dest->packet_lifetime;
			msg->data.mtu = dest->mtu;
		}
	}

	ret = send(client->sock, (char *) msg, sizeof *msg, 0);
	if (ret != sizeof(*msg))
		acm_log(0, "failed to send response\n");
	else
		ret = 0;

release:
	lock_release(&client->lock);
	(void) atomic_dec(&client->refcnt);
	return ret;
}

static struct acm_dest *
acm_record_dest(struct acm_ep *ep, struct ibv_wc *wc,
	struct acm_resolve_rec *req_rec, struct acm_resolve_rec *resp_rec)
{
	struct acm_dest *dest, **tdest;
	int ret;

	acm_log(2, "\n");
	lock_acquire(&ep->lock);
	tdest = tfind(req_rec->dest, &ep->dest_map[req_rec->dest_type - 1], acm_compare_dest);
	if (!tdest) {
		dest = NULL;
		goto unlock;
	}

	dest = *tdest;
	if (dest->ah)
		goto unlock;

	acm_log(2, "creating address handle\n");
	ret = acm_record_av(dest, ep, wc, resp_rec);
	if (ret) {
		acm_log(0, "ERROR - failed to record av\n");
		goto unlock;
	}

	dest->ah = ibv_create_ah(ep->port->dev->pd, &dest->av);
	if (!dest->ah) {
		acm_log(0, "ERROR - failed to create ah\n");
		goto unlock;
	}

	dest->remote_qpn = wc->src_qp;
	dest->init_depth = resp_rec->init_depth;
	dest->resp_resources = resp_rec->resp_resources;

unlock:
	lock_release(&ep->lock);
	return dest;
}

static void
acm_process_resolve_resp(struct acm_ep *ep, struct ibv_wc *wc,
	struct acm_send_msg *msg, struct acm_mad *mad)
{
	struct acm_resolve_rec *req_rec, *resp_rec;
	struct acm_dest *dest;
	struct acm_request *client_req;
	DLIST_ENTRY *entry;
	uint8_t status;

	status = acm_class_status(mad->status);
	acm_log(2, "resp status 0x%x\n", status);
	req_rec = (struct acm_resolve_rec *) ((struct acm_mad *) msg->data)->data;
	resp_rec = (struct acm_resolve_rec *) mad->data;

	dest = acm_record_dest(ep, wc, req_rec, resp_rec);
	if (!dest) {
		acm_log(0, "ERROR - cannot record dest\n");
		return;
	}

	if (!status && !dest->ah)
		status = ACM_STATUS_EINVAL;

	lock_acquire(&ep->lock);
	while (!DListEmpty(&dest->req_queue)) {
		entry = dest->req_queue.Next;
		DListRemove(entry);
		client_req = container_of(entry, struct acm_request, entry);
		lock_release(&ep->lock);
		acm_log(2, "completing queued client request\n");
		acm_client_resolve_resp(ep, client_req->client,
			(struct acm_resolve_msg *) &client_req->msg, dest, status);
		lock_acquire(&ep->lock);
	}
	if (status) {
		acm_log(0, "resp failed 0x%x\n", status);
		tdelete(dest->address, &ep->dest_map[req_rec->dest_type - 1], acm_compare_dest);
	}
	lock_release(&ep->lock);
}

static int acm_validate_recv(struct acm_mad *mad)
{
	if (mad->base_version != 1 || mad->class_version != 1) {
		acm_log(0, "ERROR - invalid version %d %d\n",
			mad->base_version, mad->class_version);
		return ACM_STATUS_EINVAL;
	}
	
	if (mad->mgmt_class != ACM_MGMT_CLASS) {
		acm_log(0, "ERROR - invalid mgmt class 0x%x\n", mad->mgmt_class);
		return ACM_STATUS_EINVAL;
	}

	if (mad->control != ACM_CTRL_RESOLVE) {
		acm_log(0, "ERROR - invalid control 0x%x\n", mad->control);
		return ACM_STATUS_EINVAL;
	}

	return 0;
}

static void acm_process_recv(struct acm_ep *ep, struct ibv_wc *wc)
{
	struct acm_mad *mad;
	struct acm_send_msg *req;
	int free;

	acm_log(2, "\n");
	mad = (struct acm_mad *) (uintptr_t) (wc->wr_id + sizeof(struct ibv_grh));
	if (acm_validate_recv(mad)) {
		acm_log(0, "ERROR - discarding message\n");
		goto out;
	}

	if (mad->method & IB_METHOD_RESP) {
		acm_log(2, "received response\n");
		req = acm_get_request(ep, mad->tid, &free);
		if (!req) {
			acm_log(0, "response did not match active request\n");
			goto out;
		}
		acm_log(2, "found matching request\n");
		acm_process_resolve_resp(ep, wc, req, mad);
		if (free)
			acm_free_send(req);
	} else {
		acm_log(2, "unsolicited request\n");
		acm_process_resolve_req(ep, wc, mad);
		free = 0;
	}

out:
	acm_post_recv(ep, wc->wr_id);
}

static void acm_process_comp(struct acm_ep *ep, struct ibv_wc *wc)
{
	if (wc->status) {
		acm_log(0, "ERROR - work completion error\n"
			"\topcode %d, completion status %d\n",
			wc->opcode, wc->status);
		return;
	}

	if (wc->opcode & IBV_WC_RECV)
		acm_process_recv(ep, wc);
	else
		acm_complete_send((struct acm_send_msg *) (uintptr_t) wc->wr_id);
}

static void CDECL_FUNC acm_comp_handler(void *context)
{
	struct acm_device *dev = (struct acm_device *) context;
	struct acm_ep *ep;
	struct ibv_cq *cq;
	struct ibv_wc wc;
	int cnt;

	acm_log(1, "started\n");
	while (1) {
		ibv_get_cq_event(dev->channel, &cq, (void *) &ep);

		cnt = 0;
		while (ibv_poll_cq(cq, 1, &wc) > 0) {
			cnt++;
			acm_process_comp(ep, &wc);
		}

		ibv_req_notify_cq(cq, 0);
		while (ibv_poll_cq(cq, 1, &wc) > 0) {
			cnt++;
			acm_process_comp(ep, &wc);
		}

		ibv_ack_cq_events(cq, cnt);
	}
}

static void acm_format_mgid(union ibv_gid *mgid, uint16_t pkey, uint8_t tos,
	uint8_t rate, uint8_t mtu)
{
	mgid->raw[0] = 0xFF;
	mgid->raw[1] = 0x10 | 0x05;
	mgid->raw[2] = 0x40;
	mgid->raw[3] = 0x01;
	mgid->raw[4] = (uint8_t) (pkey >> 8);
	mgid->raw[5] = (uint8_t) pkey;
	mgid->raw[6] = tos;
	mgid->raw[7] = rate;
	mgid->raw[8] = mtu;
	mgid->raw[9] = 0;
	mgid->raw[10] = 0;
	mgid->raw[11] = 0;
	mgid->raw[12] = 0;
	mgid->raw[13] = 0;
	mgid->raw[14] = 0;
	mgid->raw[15] = 0;
}

static void acm_init_path_query(struct ib_sa_mad *mad, struct ib_path_record *path)
{
	uint32_t fl_hop;
	uint16_t qos_sl;

	acm_log(2, "\n");
	mad->base_version = 1;
	mad->mgmt_class = IB_MGMT_CLASS_SA;
	mad->class_version = 2;
	mad->method = IB_METHOD_GET;
	mad->tid = (uint64_t) atomic_inc(&tid);
	mad->attr_id = IB_SA_ATTR_PATH_REC;

	memcpy(mad->data, path, sizeof(*path));
	if (path->service_id)
		mad->comp_mask |= IB_COMP_MASK_PR_SERVICE_ID;
	if (path->dgid.global.interface_id || path->dgid.global.subnet_prefix)
		mad->comp_mask |= IB_COMP_MASK_PR_DGID;
	if (path->sgid.global.interface_id || path->sgid.global.subnet_prefix)
		mad->comp_mask |= IB_COMP_MASK_PR_SGID;
	if (path->dlid)
		mad->comp_mask |= IB_COMP_MASK_PR_DLID;
	if (path->slid)
		mad->comp_mask |= IB_COMP_MASK_PR_SLID;

	fl_hop = ntohl(path->flowlabel_hoplimit);
	if (fl_hop >> 8)
		mad->comp_mask |= IB_COMP_MASK_PR_FLOW_LABEL;
	if (fl_hop & 0xFF)
		mad->comp_mask |= IB_COMP_MASK_PR_HOP_LIMIT;

	if (path->tclass)
		mad->comp_mask |= IB_COMP_MASK_PR_TCLASS;
	if (path->reversible_numpath & 0x80)
		mad->comp_mask |= IB_COMP_MASK_PR_REVERSIBLE;
	if (path->pkey)
		mad->comp_mask |= IB_COMP_MASK_PR_PKEY;

	qos_sl = ntohs(path->qosclass_sl);
	if (qos_sl >> 4)
		mad->comp_mask |= IB_COMP_MASK_PR_QOS_CLASS;
	if (qos_sl & 0xF)
		mad->comp_mask |= IB_COMP_MASK_PR_SL;

	if (path->mtu & 0xC0)
		mad->comp_mask |= IB_COMP_MASK_PR_MTU_SELECTOR;
	if (path->mtu & 0x3F)
		mad->comp_mask |= IB_COMP_MASK_PR_MTU;
	if (path->rate & 0xC0)
		mad->comp_mask |= IB_COMP_MASK_PR_RATE_SELECTOR;
	if (path->rate & 0x3F)
		mad->comp_mask |= IB_COMP_MASK_PR_RATE;
	if (path->packetlifetime & 0xC0)
		mad->comp_mask |= IB_COMP_MASK_PR_PACKET_LIFETIME_SELECTOR;
	if (path->packetlifetime & 0x3F)
		mad->comp_mask |= IB_COMP_MASK_PR_PACKET_LIFETIME;
}

static void acm_init_join(struct ib_sa_mad *mad, union ibv_gid *port_gid,
	uint16_t pkey, uint8_t tos, uint8_t tclass, uint8_t sl, uint8_t rate, uint8_t mtu)
{
	struct ib_mc_member_rec *mc_rec;

	acm_log(2, "\n");
	mad->base_version = 1;
	mad->mgmt_class = IB_MGMT_CLASS_SA;
	mad->class_version = 2;
	mad->method = IB_METHOD_SET;
	mad->tid = (uint64_t) atomic_inc(&tid);
	mad->attr_id = IB_SA_ATTR_MC_MEMBER_REC;
	mad->comp_mask =
		IB_COMP_MASK_MC_MGID | IB_COMP_MASK_MC_PORT_GID |
		IB_COMP_MASK_MC_QKEY | IB_COMP_MASK_MC_MTU_SEL| IB_COMP_MASK_MC_MTU |
		IB_COMP_MASK_MC_TCLASS | IB_COMP_MASK_MC_PKEY | IB_COMP_MASK_MC_RATE_SEL |
		IB_COMP_MASK_MC_RATE | IB_COMP_MASK_MC_SL | IB_COMP_MASK_MC_FLOW |
		IB_COMP_MASK_MC_SCOPE | IB_COMP_MASK_MC_JOIN_STATE;

	mc_rec = (struct ib_mc_member_rec *) mad->data;
	acm_format_mgid(&mc_rec->mgid, pkey, tos, rate, mtu);
	mc_rec->port_gid = *port_gid;
	mc_rec->qkey = ACM_QKEY;
	mc_rec->mtu = 0x80 | mtu;
	mc_rec->tclass = tclass;
	mc_rec->pkey = htons(pkey);
	mc_rec->rate = 0x80 | rate;
	mc_rec->sl_flow_hop = htonl(((uint32_t) sl) << 28);
	mc_rec->scope_state = 0x51;
}

static void acm_join_group(struct acm_ep *ep, union ibv_gid *port_gid,
	uint8_t tos, uint8_t tclass, uint8_t sl, uint8_t rate, uint8_t mtu)
{
	struct acm_port *port;
	struct ib_sa_mad *mad;
	struct ib_user_mad *umad;
	struct ib_mc_member_rec *mc_rec;
	int ret, len;

	acm_log(2, "\n");
	len = sizeof(*umad) + sizeof(*mad);
	umad = (struct ib_user_mad *) zalloc(len);
	if (!umad) {
		acm_log(0, "ERROR - unable to allocate MAD for join\n");
		return;
	}

	port = ep->port;
	umad->addr.qpn = htonl(port->sa_dest.remote_qpn);
	umad->addr.qkey = htonl(ACM_QKEY);
	umad->addr.pkey_index = ep->pkey_index;
	umad->addr.lid = htons(port->sa_dest.av.dlid);
	umad->addr.sl = port->sa_dest.av.sl;
	umad->addr.path_bits = port->sa_dest.av.src_path_bits;

	acm_log(0, "%s %d pkey 0x%x, sl 0x%x, rate 0x%x, mtu 0x%x\n",
		ep->port->dev->verbs->device->name, ep->port->port_num,
		ep->pkey, sl, rate, mtu);
	mad = (struct ib_sa_mad *) umad->data;
	acm_init_join(mad, port_gid, ep->pkey, tos, tclass, sl, rate, mtu);
	mc_rec = (struct ib_mc_member_rec *) mad->data;
	memcpy(&ep->mc_dest[ep->mc_cnt++], &mc_rec->mgid, sizeof(mc_rec->mgid));

	ret = umad_send(port->mad_portid, port->mad_agentid, (void *) umad,
		sizeof(*mad), timeout, retries);
	if (ret) {
		acm_log(0, "ERROR - failed to send multicast join request %d\n", ret);
		goto out;
	}

	acm_log(1, "waiting for response from SA to join request\n");
	ret = umad_recv(port->mad_portid, (void *) umad, &len, -1);
	if (ret < 0) {
		acm_log(0, "ERROR - recv error for multicast join response %d\n", ret);
		goto out;
	}

	acm_process_join_resp(ep, umad);
out:
	free(umad);
}

static void acm_port_join(void *context)
{
	struct acm_device *dev;
	struct acm_port *port = (struct acm_port *) context;
	struct acm_ep *ep;
	union ibv_gid port_gid;
	DLIST_ENTRY *ep_entry;
	int ret;

	dev = port->dev;
	acm_log(1, "device %s port %d\n", dev->verbs->device->name,
		port->port_num);

	ret = ibv_query_gid(dev->verbs, port->port_num, 0, &port_gid);
	if (ret) {
		acm_log(0, "ERROR - ibv_query_gid %d device %s port %d\n",
			ret, dev->verbs->device->name, port->port_num);
		return;
	}

	for (ep_entry = port->ep_list.Next; ep_entry != &port->ep_list;
		 ep_entry = ep_entry->Next) {

		ep = container_of(ep_entry, struct acm_ep, entry);
		acm_join_group(ep, &port_gid, 0, 0, 0, min_rate, min_mtu);
		if (port->rate != min_rate || port->mtu != min_mtu)
			acm_join_group(ep, &port_gid, 0, 0, 0, port->rate, port->mtu);
	}
	acm_log(1, "joins for device %s port %d complete\n", dev->verbs->device->name,
		port->port_num);
}

static void acm_join_groups(void)
{
	struct acm_device *dev;
	struct acm_port *port;
	DLIST_ENTRY *dev_entry;
	int i;

	acm_log(1, "initiating multicast joins for all ports\n");
	for (dev_entry = dev_list.Next; dev_entry != &dev_list;
		 dev_entry = dev_entry->Next) {

		dev = container_of(dev_entry, struct acm_device, entry);

		for (i = 0; i < dev->port_cnt; i++) {
			port = &dev->port[i];
			if (port->state != IBV_PORT_ACTIVE)
				continue;

			acm_log(1, "starting join for device %s, port %d\n",
				dev->verbs->device->name, port->port_num);
			// TODO: handle dynamic changes
			//beginthread(acm_port_join, port);
			acm_port_join(port);
		}
	}
}

static void acm_process_timeouts(void)
{
	DLIST_ENTRY *entry;
	struct acm_send_msg *msg;
	struct acm_mad *mad;
	struct acm_resolve_rec *rec;
	struct acm_dest *dest, **tdest;
	struct acm_request *req;
	struct acm_ep *ep;
	
	while (!DListEmpty(&timeout_list)) {
		entry = timeout_list.Next;
		DListRemove(entry);

		msg = container_of(entry, struct acm_send_msg, entry);
		mad = (struct acm_mad *) msg->data;

		rec = (struct acm_resolve_rec *) mad->data;
		ep = msg->ep;

		acm_log_ep_addr(0, "acm_process_timeouts: dest ",
			(union acm_ep_addr *) &rec->dest, rec->dest_type);
		lock_acquire(&ep->lock);
		tdest = tfind(rec->dest, &ep->dest_map[rec->dest_type - 1], acm_compare_dest);
		if (!tdest) {
			acm_log(0, "destination already removed\n");
			lock_release(&ep->lock);
			continue;
		} else {
			dest = *tdest;
		}

		acm_log(2, "failing pending client requests\n");
		while (!DListEmpty(&dest->req_queue)) {
			entry = dest->req_queue.Next;
			DListRemove(entry);
		
			req = container_of(entry, struct acm_request, entry);
			lock_release(&ep->lock);
			acm_client_resolve_resp(ep, req->client,
				(struct acm_resolve_msg *) &req->msg, dest,
				ACM_STATUS_ETIMEDOUT);
			lock_acquire(&ep->lock);
		}

		acm_log(2, "resolve timed out, releasing destination\n");
		tdelete(dest->address, &ep->dest_map[rec->dest_type - 1], acm_compare_dest);
		lock_release(&ep->lock);
	}
}

static void acm_process_wait_queue(struct acm_ep *ep, uint64_t *next_expire)
{
	struct acm_send_msg *msg;
	DLIST_ENTRY *entry, *next;
	struct ibv_send_wr *bad_wr;

	for (entry = ep->wait_queue.Next; entry != &ep->wait_queue; entry = next) {
		next = entry->Next;
		msg = container_of(entry, struct acm_send_msg, entry);
		if (msg->expires < time_stamp_ms()) {
			DListRemove(entry);
			(void) atomic_dec(&wait_cnt);
			if (--msg->tries) {
				acm_log(2, "retrying request\n");
				DListInsertTail(&msg->entry, &ep->active_queue);
				ibv_post_send(ep->qp, &msg->wr, &bad_wr);
			} else {
				acm_log(0, "failing request\n");
				acm_send_available(ep);
				DListInsertTail(&msg->entry, &timeout_list);
			}
		} else {
			*next_expire = min(*next_expire, msg->expires);
			break;
		}
	}
}

static void CDECL_FUNC acm_retry_handler(void *context)
{
	struct acm_device *dev;
	struct acm_port *port;
	struct acm_ep *ep;
	DLIST_ENTRY *dev_entry, *ep_entry;
	uint64_t next_expire;
	int i, wait;

	acm_log(0, "started\n");
	while (1) {
		while (!atomic_get(&wait_cnt))
			event_wait(&timeout_event, -1);

		next_expire = -1;
		for (dev_entry = dev_list.Next; dev_entry != &dev_list;
			 dev_entry = dev_entry->Next) {

			dev = container_of(dev_entry, struct acm_device, entry);

			for (i = 0; i < dev->port_cnt; i++) {
				port = &dev->port[i];

				for (ep_entry = port->ep_list.Next;
					 ep_entry != &port->ep_list;
					 ep_entry = ep_entry->Next) {

					ep = container_of(ep_entry, struct acm_ep, entry);
					lock_acquire(&ep->lock);
					if (!DListEmpty(&ep->wait_queue))
						acm_process_wait_queue(ep, &next_expire);
					lock_release(&ep->lock);
				}
			}
		}

		acm_process_timeouts();
		wait = (int) (next_expire - time_stamp_ms());
		if (wait > 0 && atomic_get(&wait_cnt))
			event_wait(&timeout_event, wait);
	}
}

static void acm_init_server(void)
{
	int i;

	for (i = 0; i < FD_SETSIZE - 1; i++) {
		lock_init(&client[i].lock);
		client[i].index = i;
		client[i].sock = INVALID_SOCKET;
	}
}

static int acm_listen(void)
{
	struct sockaddr_in addr;
	int ret;

	acm_log(2, "\n");
	listen_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
	if (listen_socket == INVALID_SOCKET) {
		acm_log(0, "ERROR - unable to allocate listen socket\n");
		return socket_errno();
	}

	memset(&addr, 0, sizeof addr);
	addr.sin_family = AF_INET;
	addr.sin_port = htons(server_port);
	ret = bind(listen_socket, (struct sockaddr *) &addr, sizeof addr);
	if (ret == SOCKET_ERROR) {
		acm_log(0, "ERROR - unable to bind listen socket\n");
		return socket_errno();
	}
	
	ret = listen(listen_socket, 0);
	if (ret == SOCKET_ERROR) {
		acm_log(0, "ERROR - unable to start listen\n");
		return socket_errno();
	}

	acm_log(2, "listen active\n");
	return 0;
}

static void acm_release_client(struct acm_client *client)
{
	lock_acquire(&client->lock);
	shutdown(client->sock, SHUT_RDWR);
	closesocket(client->sock);
	client->sock = INVALID_SOCKET;
	lock_release(&client->lock);
	(void) atomic_dec(&client->refcnt);
}

static void acm_svr_accept(void)
{
	SOCKET s;
	int i;

	acm_log(2, "\n");
	s = accept(listen_socket, NULL, NULL);
	if (s == INVALID_SOCKET) {
		acm_log(0, "ERROR - failed to accept connection\n");
		return;
	}

	for (i = 0; i < FD_SETSIZE - 1; i++) {
		if (!atomic_get(&client[i].refcnt))
			break;
	}

	if (i == FD_SETSIZE - 1) {
		acm_log(0, "all connections busy - rejecting\n");
		closesocket(s);
		return;
	}

	client[i].sock = s;
	atomic_set(&client[i].refcnt, 1);
	acm_log(2, "assigned client id %d\n", i);
}

static uint8_t acm_get_addr_type(uint8_t ep_type)
{
	if (ep_type >= ACM_ADDRESS_RESERVED) {
		acm_log(0, "ERROR - invalid ep type %d\n", ep_type);
		return ACM_ADDRESS_INVALID;
	}
	return ep_type;
}

static int
acm_client_query_resp(struct acm_ep *ep, struct acm_client *client,
	struct acm_query_msg *msg, uint8_t status)
{
	int ret;

	acm_log(1, "status 0x%x\n", status);
	lock_acquire(&client->lock);
	if (client->sock == INVALID_SOCKET) {
		acm_log(0, "ERROR - connection lost\n");
		ret = ACM_STATUS_ENOTCONN;
		goto release;
	}

	msg->hdr.opcode |= ACM_OP_ACK;
	msg->hdr.status = status;

	ret = send(client->sock, (char *) msg, sizeof *msg, 0);
	if (ret != sizeof(*msg))
		acm_log(0, "failed to send response\n");
	else
		ret = 0;

release:
	lock_release(&client->lock);
	(void) atomic_dec(&client->refcnt);
	return ret;
}

static struct acm_ep *
acm_get_ep_by_path(struct ib_path_record *path)
{
	struct acm_device *dev;
	struct acm_port *port;
	struct acm_ep *ep;
	DLIST_ENTRY *dev_entry, *ep_entry;
	int i;

	for (dev_entry = dev_list.Next; dev_entry != &dev_list;
		 dev_entry = dev_entry->Next) {

		dev = container_of(dev_entry, struct acm_device, entry);
		for (i = 0; i < dev->port_cnt; i++) {
			port = &dev->port[i];

			// requires slid
			if (port->lid != ntohs(path->slid))
				continue;

			for (ep_entry = port->ep_list.Next; ep_entry != &port->ep_list;
				 ep_entry = ep_entry->Next) {

				// ignores pkey
				ep = container_of(ep_entry, struct acm_ep, entry);
				return ep;
			}
		}
	}

	acm_log(0, "could not find endpoint\n");
	return NULL;
}

// TODO: process send/recv asynchronously
static uint8_t acm_query_sa(struct acm_ep *ep, uint8_t query, union acm_query_data *data)
{
	struct acm_port *port;
	struct ib_sa_mad *mad;
	struct ib_user_mad *umad;
	int ret, len;
	size_t size;

	acm_log(2, "\n");
	len = sizeof(*umad) + sizeof(*mad);
	umad = (struct ib_user_mad *) zalloc(len);
	if (!umad) {
		acm_log(0, "ERROR - unable to allocate MAD\n");
		return ACM_STATUS_ENOMEM;
	}

	port = ep->port;
	umad->addr.qpn = htonl(port->sa_dest.remote_qpn);
	umad->addr.qkey = htonl(ACM_QKEY);
	umad->addr.pkey_index = ep->pkey_index;
	umad->addr.lid = htons(port->sa_dest.av.dlid);
	umad->addr.sl = port->sa_dest.av.sl;
	umad->addr.path_bits = port->sa_dest.av.src_path_bits;

	mad = (struct ib_sa_mad *) umad->data;
	switch (query) {
	case ACM_QUERY_PATH_RECORD:
		acm_init_path_query(mad, &data->path);
		size = sizeof(data->path);
		break;
	default:
		acm_log(0, "ERROR - unknown attribute id\n");
		ret = ACM_STATUS_EINVAL;
		goto out;
	}

	ret = umad_send(port->mad_portid, port->mad_agentid, (void *) umad,
		sizeof(*mad), timeout, retries);
	if (ret) {
		acm_log(0, "ERROR - umad_send %d\n", ret);
		goto out;
	}

	acm_log(2, "waiting to receive SA response\n");
	ret = umad_recv(port->mad_portid, (void *) umad, &len, -1);
	if (ret < 0) {
		acm_log(0, "ERROR - umad_recv %d\n", ret);
		goto out;
	}

	memcpy(data, mad->data, size);
	ret = umad->status ? umad->status : mad->status;
	if (ret) {
		acm_log(0, "SA query response error: 0x%x\n", ret);
		ret = ((uint8_t) ret) ? ret : -1;
	}
out:
	free(umad);
	return (uint8_t) ret;
}

static int
acm_svr_query(struct acm_client *client, struct acm_query_msg *msg)
{
	struct acm_ep *ep;
	uint8_t status;

	acm_log(2, "processing client query\n");
	ep = acm_get_ep_by_path(&msg->data.path);
	if (!ep) {
		acm_log(0, "could not find local end point\n");
		status = ACM_STATUS_ESRCADDR;
		goto resp;
	}

	(void) atomic_inc(&client->refcnt);
	lock_acquire(&ep->lock);
	status = acm_query_sa(ep, msg->hdr.param & ~ACM_QUERY_SA, &msg->data);
	lock_release(&ep->lock);

resp:
	return acm_client_query_resp(ep, client, msg, status);
}

static uint8_t
acm_send_resolve(struct acm_ep *ep, union acm_ep_addr *src, uint8_t src_type,
	struct acm_dest *dest, uint8_t dest_type)
{
	struct acm_send_msg *msg;
	struct acm_mad *mad;
	struct acm_resolve_rec *rec;
	int i;

	acm_log(2, "\n");
	if (!ep->mc_dest[0].ah) {
		acm_log(0, "ERROR - multicast group not ready\n");
		return ACM_STATUS_ENOTCONN;
	}

	msg = acm_alloc_send(ep, &ep->mc_dest[0], sizeof(struct acm_mad));
	if (!msg) {
		acm_log(0, "ERROR - cannot allocate send msg\n");
		return ACM_STATUS_ENOMEM;
	}

	msg->tries = retries + 1;
	mad = (struct acm_mad *) msg->data;
	mad->base_version = 1;
	mad->mgmt_class = ACM_MGMT_CLASS;
	mad->class_version = 1;
	mad->method = IB_METHOD_GET;
	mad->control = ACM_CTRL_RESOLVE;
	mad->tid = (uint64_t) atomic_inc(&tid);

	rec = (struct acm_resolve_rec *) mad->data;
	rec->src_type = src_type;
	rec->src_length = ACM_MAX_ADDRESS;
	memcpy(rec->src, src->addr, ACM_MAX_ADDRESS);
	rec->dest_type = dest_type;
	rec->dest_length = ACM_MAX_ADDRESS;
	memcpy(rec->dest, dest->address, ACM_MAX_ADDRESS);
	rec->resp_resources = ep->port->dev->resp_resources;
	rec->init_depth = ep->port->dev->init_depth;

	rec->gid_cnt = (uint8_t) ep->mc_cnt;
	for (i = 0; i < ep->mc_cnt; i++)
		memcpy(&rec->gid[i], ep->mc_dest[i].address, 16);
	
	acm_post_send(msg);
	return 0;
}

static struct acm_ep *
acm_get_ep_by_addr(union acm_ep_addr *addr, uint8_t src_type)
{
	struct acm_device *dev;
	struct acm_port *port;
	struct acm_ep *ep;
	DLIST_ENTRY *dev_entry, *ep_entry;
	int i;

	acm_log_ep_addr(2, "acm_get_ep_by_addr: ", addr, src_type);
	for (dev_entry = dev_list.Next; dev_entry != &dev_list;
		 dev_entry = dev_entry->Next) {

		dev = container_of(dev_entry, struct acm_device, entry);
		for (i = 0; i < dev->port_cnt; i++) {
			port = &dev->port[i];

			for (ep_entry = port->ep_list.Next; ep_entry != &port->ep_list;
				 ep_entry = ep_entry->Next) {

				ep = container_of(ep_entry, struct acm_ep, entry);
				if (acm_addr_index(ep, addr->addr, src_type) >= 0)
					return ep;
			}
		}
	}

	acm_log_ep_addr(0, "acm_get_ep_by_addr: could not find ", addr, src_type);
	return NULL;
}

static int
acm_svr_resolve(struct acm_client *client, struct acm_resolve_msg *msg)
{
	struct acm_ep *ep;
	struct acm_dest *dest, **tdest;
	struct acm_request *req;
	uint8_t dest_type, src_type;
	uint8_t status;

	acm_log_ep_addr(2, "acm_svr_resolve: source ", &msg->src, msg->hdr.src_type);
	ep = acm_get_ep_by_addr(&msg->src, msg->hdr.src_type);
	if (!ep) {
		acm_log(0, "unknown local end point\n");
		status = ACM_STATUS_ESRCADDR;
		goto resp;
	}

	dest_type = acm_get_addr_type(msg->hdr.dest_type);
	if (dest_type == ACM_ADDRESS_INVALID) {
		acm_log(0, "ERROR - unknown destination type\n");
		status = ACM_STATUS_EDESTTYPE;
		goto resp;
	}

	acm_log_ep_addr(2, "acm_svr_resolve: dest ", &msg->dest, msg->hdr.dest_type);
	(void) atomic_inc(&client->refcnt);
	lock_acquire(&ep->lock);
	tdest = tfind(msg->dest.addr, &ep->dest_map[dest_type - 1], acm_compare_dest);
	dest = tdest ? *tdest : NULL;
	if (dest && dest->ah) {
		acm_log(2, "request satisfied from local cache\n");
		status = ACM_STATUS_SUCCESS;
		goto release;
	}

	req = zalloc(sizeof *req);
	if (!req) {
		acm_log(0, "ERROR - unable to allocate memory to queue client request\n");
		status = ACM_STATUS_ENOMEM;
		goto release;
	}

	if (!dest) {
		acm_log(2, "adding new destination\n");
		dest = zalloc(sizeof *dest);
		if (!dest) {
			acm_log(0, "ERROR - unable to allocate destination in client request\n");
			status = ACM_STATUS_ENOMEM;
			goto free_req;
		}

		memcpy(dest->address, msg->dest.addr, ACM_MAX_ADDRESS);
		src_type = acm_get_addr_type(msg->hdr.src_type);
		acm_log(2, "sending resolve msg to dest\n");
		status = acm_send_resolve(ep, &msg->src, src_type, dest, dest_type);
		if (status) {
			acm_log(0, "ERROR - failure sending resolve request 0x%x\n", status);
			goto free_dest;
		}

		DListInit(&dest->req_queue);
		tsearch(dest, &ep->dest_map[dest_type - 1], acm_compare_dest);
	}

	acm_log(2, "queuing client request\n");
	req->client = client;
	memcpy(&req->msg, msg, sizeof(req->msg));
	DListInsertTail(&req->entry, &dest->req_queue);
	lock_release(&ep->lock);
	return 0;

free_dest:
	free(dest);
	dest = NULL;
free_req:
	free(req);
release:
	lock_release(&ep->lock);
resp:
	return acm_client_resolve_resp(ep, client, msg, dest, status);
}

static void acm_svr_receive(struct acm_client *client)
{
	struct acm_msg msg;
	int ret;

	acm_log(2, "\n");
	ret = recv(client->sock, (char *) &msg, sizeof msg, 0);
	if (ret != sizeof msg) {
		acm_log(2, "client disconnected\n");
		ret = ACM_STATUS_ENOTCONN;
		goto out;
	}

	if (msg.hdr.version != ACM_VERSION) {
		acm_log(0, "ERROR - unsupported version %d\n", msg.hdr.version);
		goto out;
	}

	switch (msg.hdr.opcode & ACM_OP_MASK) {
	case ACM_OP_RESOLVE:
		ret = acm_svr_resolve(client, (struct acm_resolve_msg *) &msg);
		break;
	case ACM_OP_QUERY:
		ret = acm_svr_query(client, (struct acm_query_msg *) &msg);
		break;
	default:
		acm_log(0, "ERROR - unknown opcode 0x%x\n", msg.hdr.opcode);
		ret = -1;
		break;
	}

out:
	if (ret)
		acm_release_client(client);
}

static void acm_server(void)
{
	fd_set readfds;
	int i, n, ret;

	acm_log(0, "started\n");
	acm_init_server();
	ret = acm_listen();
	if (ret) {
		acm_log(0, "ERROR - server listen failed\n");
		return;
	}

	while (1) {
		n = (int) listen_socket;
		FD_ZERO(&readfds);
		FD_SET(listen_socket, &readfds);

		for (i = 0; i < FD_SETSIZE - 1; i++) {
			if (client[i].sock != INVALID_SOCKET) {
				FD_SET(client[i].sock, &readfds);
				n = max(n, (int) client[i].sock);
			}
		}

		ret = select(n + 1, &readfds, NULL, NULL, NULL);
		if (ret == SOCKET_ERROR) {
			acm_log(0, "ERROR - server select error\n");
			continue;
		}

		if (FD_ISSET(listen_socket, &readfds))
			acm_svr_accept();

		for (i = 0; i < FD_SETSIZE - 1; i++) {
			if (client[i].sock != INVALID_SOCKET &&
				FD_ISSET(client[i].sock, &readfds)) {
				acm_log(2, "receiving from client %d\n", i);
				acm_svr_receive(&client[i]);
			}
		}
	}
}

static enum ibv_rate acm_get_rate(uint8_t width, uint8_t speed)
{
	switch (width) {
	case 1:
		switch (speed) {
		case 1: return IBV_RATE_2_5_GBPS;
		case 2: return IBV_RATE_5_GBPS;
		case 4: return IBV_RATE_10_GBPS;
		default: return IBV_RATE_MAX;
		}
	case 2:
		switch (speed) {
		case 1: return IBV_RATE_10_GBPS;
		case 2: return IBV_RATE_20_GBPS;
		case 4: return IBV_RATE_40_GBPS;
		default: return IBV_RATE_MAX;
		}
	case 4:
		switch (speed) {
		case 1: return IBV_RATE_20_GBPS;
		case 2: return IBV_RATE_40_GBPS;
		case 4: return IBV_RATE_80_GBPS;
		default: return IBV_RATE_MAX;
		}
	case 8:
		switch (speed) {
		case 1: return IBV_RATE_30_GBPS;
		case 2: return IBV_RATE_60_GBPS;
		case 4: return IBV_RATE_120_GBPS;
		default: return IBV_RATE_MAX;
		}
	default:
		acm_log(0, "ERROR - unknown link width 0x%x\n", width);
		return IBV_RATE_MAX;
	}
}

static enum ibv_mtu acm_convert_mtu(int mtu)
{
	switch (mtu) {
	case 256:  return IBV_MTU_256;
	case 512:  return IBV_MTU_512;
	case 1024: return IBV_MTU_1024;
	case 2048: return IBV_MTU_2048;
	case 4096: return IBV_MTU_4096;
	default:   return IBV_MTU_2048;
	}
}

static enum ibv_rate acm_convert_rate(int rate)
{
	switch (rate) {
	case 2:   return IBV_RATE_2_5_GBPS;
	case 5:   return IBV_RATE_5_GBPS;
	case 10:  return IBV_RATE_10_GBPS;
	case 20:  return IBV_RATE_20_GBPS;
	case 30:  return IBV_RATE_30_GBPS;
	case 40:  return IBV_RATE_40_GBPS;
	case 60:  return IBV_RATE_60_GBPS;
	case 80:  return IBV_RATE_80_GBPS;
	case 120: return IBV_RATE_120_GBPS;
	default:  return IBV_RATE_10_GBPS;
	}
}

static int acm_post_recvs(struct acm_ep *ep)
{
	int i, size;

	size = recv_depth * ACM_RECV_SIZE;
	ep->recv_bufs = malloc(size);
	if (!ep->recv_bufs) {
		acm_log(0, "ERROR - unable to allocate receive buffer\n");
		return ACM_STATUS_ENOMEM;
	}

	ep->mr = ibv_reg_mr(ep->port->dev->pd, ep->recv_bufs, size,
		IBV_ACCESS_LOCAL_WRITE);
	if (!ep->mr) {
		acm_log(0, "ERROR - unable to register receive buffer\n");
		goto err;
	}

	for (i = 0; i < recv_depth; i++) {
		acm_post_recv(ep, (uintptr_t) (ep->recv_bufs + ACM_RECV_SIZE * i));
	}
	return 0;

err:
	free(ep->recv_bufs);
	return -1;
}

static int acm_assign_ep_names(struct acm_ep *ep)
{
	char *dev_name;
	FILE *f;
	char s[120];
	char dev[32], addr[32], pkey_str[8];
	uint16_t pkey;
	uint8_t type;
	int port, index = 0;
	struct in6_addr ip_addr;

	dev_name = ep->port->dev->verbs->device->name;
	acm_log(1, "device %s, port %d, pkey 0x%x\n",
		dev_name, ep->port->port_num, ep->pkey);

	if (!(f = fopen("acm_addr.cfg", "r"))) {
		acm_log(0, "ERROR - unable to open acm_addr.cfg file\n");
		return ACM_STATUS_ENODATA;
	}

	while (fgets(s, sizeof s, f)) {
		if (s[0] == '#')
			continue;

		if (sscanf(s, "%32s%32s%d%8s", addr, dev, &port, pkey_str) != 4)
			continue;

		acm_log(2, "%s", s);
		if (inet_pton(AF_INET, addr, &ip_addr) > 0)
			type = ACM_ADDRESS_IP;
		else if (inet_pton(AF_INET6, addr, &ip_addr) > 0)
			type = ACM_ADDRESS_IP6;
		else
			type = ACM_ADDRESS_NAME;

		if (stricmp(pkey_str, "default")) {
			if (sscanf(pkey_str, "%hx", &pkey) != 1) {
				acm_log(0, "ERROR - bad pkey format %s\n", pkey_str);
				continue;
			}
		} else {
			pkey = 0xFFFF;
		}

		if (!stricmp(dev_name, dev) && (ep->port->port_num == (uint8_t) port) &&
			(ep->pkey == pkey)) {

			ep->addr_type[index] = type;
			acm_log(1, "assigning %s\n", addr);
			if (type == ACM_ADDRESS_IP)
				memcpy(ep->addr[index].addr, &ip_addr, 4);
			else if (type == ACM_ADDRESS_IP6)
				memcpy(ep->addr[index].addr, &ip_addr, sizeof ip_addr);
			else
				strncpy((char *) ep->addr[index].addr, addr, ACM_MAX_ADDRESS);

			if (++index == MAX_EP_ADDR) {
				acm_log(1, "maximum number of names assigned to EP\n");
				break;
			}
		}
	}

	fclose(f);
	return !index;
}

static int acm_activate_ep(struct acm_port *port, struct acm_ep *ep, uint16_t pkey_index)
{
	struct ibv_qp_init_attr init_attr;
	struct ibv_qp_attr attr;
	int ret;

	acm_log(1, "\n");
	ep->port = port;
	ep->pkey_index = pkey_index;
	ep->available_sends = send_depth;
	DListInit(&ep->pending_queue);
	DListInit(&ep->active_queue);
	DListInit(&ep->wait_queue);
	lock_init(&ep->lock);

	ret = ibv_query_pkey(port->dev->verbs, port->port_num, pkey_index, &ep->pkey);
	if (ret)
		return ACM_STATUS_EINVAL;

	ret = acm_assign_ep_names(ep);
	if (ret) {
		acm_log(0, "ERROR - unable to assign EP name\n");
		return ret;
	}

	ep->cq = ibv_create_cq(port->dev->verbs, send_depth + recv_depth, ep,
		port->dev->channel, 0);
	if (!ep->cq) {
		acm_log(0, "ERROR - failed to create CQ\n");
		return -1;
	}

	ret = ibv_req_notify_cq(ep->cq, 0);
	if (ret) {
		acm_log(0, "ERROR - failed to arm CQ\n");
		goto err1;
	}

	memset(&init_attr, 0, sizeof init_attr);
	init_attr.cap.max_send_wr = send_depth;
	init_attr.cap.max_recv_wr = recv_depth;
	init_attr.cap.max_send_sge = 1;
	init_attr.cap.max_recv_sge = 1;
	init_attr.qp_context = ep;
	init_attr.sq_sig_all = 1;
	init_attr.qp_type = IBV_QPT_UD;
	init_attr.send_cq = ep->cq;
	init_attr.recv_cq = ep->cq;
	ep->qp = ibv_create_qp(ep->port->dev->pd, &init_attr);
	if (!ep->qp) {
		acm_log(0, "ERROR - failed to create QP\n");
		goto err1;
	}

	attr.qp_state = IBV_QPS_INIT;
	attr.port_num = port->port_num;
	attr.pkey_index = pkey_index;
	attr.qkey = ACM_QKEY;
	ret = ibv_modify_qp(ep->qp, &attr, IBV_QP_STATE | IBV_QP_PKEY_INDEX |
		IBV_QP_PORT | IBV_QP_QKEY);
	if (ret) {
		acm_log(0, "ERROR - failed to modify QP to init\n");
		goto err2;
	}

	attr.qp_state = IBV_QPS_RTR;
	ret = ibv_modify_qp(ep->qp, &attr, IBV_QP_STATE);
	if (ret) {
		acm_log(0, "ERROR - failed to modify QP to rtr\n");
		goto err2;
	}

	attr.qp_state = IBV_QPS_RTS;
	attr.sq_psn = 0;
	ret = ibv_modify_qp(ep->qp, &attr, IBV_QP_STATE | IBV_QP_SQ_PSN);
	if (ret) {
		acm_log(0, "ERROR - failed to modify QP to rts\n");
		goto err2;
	}

	ret = acm_post_recvs(ep);
	if (ret)
		goto err2;

	return 0;

err2:
	ibv_destroy_qp(ep->qp);
err1:
	ibv_destroy_cq(ep->cq);
	return -1;
}

static void acm_activate_port(struct acm_port *port)
{
	struct acm_ep *ep;
	int i, ret;

	acm_log(1, "%s %d\n", port->dev->verbs->device->name,
		port->port_num);

	for (i = 0; i < port->pkey_cnt; i++) {
		ep = zalloc(sizeof *ep);
		if (!ep)
			break;

		ret = acm_activate_ep(port, ep, (uint16_t) i);
		if (!ret) {
			DListInsertHead(&ep->entry, &port->ep_list);
		} else {
			acm_log(0, "ERROR - failed to activate EP\n");
			free(ep);
		}
	}

	if (DListEmpty(&port->ep_list))
		goto err1;

	port->mad_portid = umad_open_port(port->dev->verbs->device->name, port->port_num);
	if (port->mad_portid < 0) {
		acm_log(0, "ERROR - unable to open MAD port\n");
		goto err2;
	}

	port->mad_agentid = umad_register(port->mad_portid,
		IB_MGMT_CLASS_SA, 1, 1, NULL);
	if (port->mad_agentid < 0) {
		acm_log(0, "ERROR - unable to register MAD client\n");
		goto err3;
	}

	return;

err3:
	umad_close_port(port->mad_portid);
err2:
	/* TODO: cleanup ep list */
err1:
	port->state = IBV_PORT_NOP;
	port->dev->active--;
}

static int acm_activate_dev(struct acm_device *dev)
{
	int i;

	acm_log(1, "%s\n", dev->verbs->device->name);
	dev->pd = ibv_alloc_pd(dev->verbs);
	if (!dev->pd)
		return ACM_STATUS_ENOMEM;

	dev->channel = ibv_create_comp_channel(dev->verbs);
	if (!dev->channel) {
		acm_log(0, "ERROR - unable to create comp channel\n");
		goto err1;
	}

	for (i = 0; i < dev->port_cnt; i++) {
		acm_log(2, "checking port %d\n", dev->port[i].port_num);
		if (dev->port[i].state == IBV_PORT_ACTIVE)
			acm_activate_port(&dev->port[i]);
	}

	if (!dev->active)
		goto err2;

	acm_log(1, "starting completion thread\n");
	beginthread(acm_comp_handler, dev);
	return 0;

err2:
	ibv_destroy_comp_channel(dev->channel);
err1:
	ibv_dealloc_pd(dev->pd);
	return -1;
}

static void acm_init_port(struct acm_port *port)
{
	struct ibv_port_attr attr;
	union ibv_gid gid;
	uint16_t pkey;
	int ret;

	acm_log(1, "%s %d\n", port->dev->verbs->device->name, port->port_num);
	DListInit(&port->ep_list);
	ret = ibv_query_port(port->dev->verbs, port->port_num, &attr);
	if (ret)
		return;

	port->state = attr.state;
	port->mtu = attr.active_mtu;
	port->rate = acm_get_rate(attr.active_width, attr.active_speed);
	port->subnet_timeout = 1 << (attr.subnet_timeout - 8);
	for (;; port->gid_cnt++) {
		ret = ibv_query_gid(port->dev->verbs, port->port_num, port->gid_cnt, &gid);
		if (ret || !gid.global.interface_id)
			break;
	}

	for (;; port->pkey_cnt++) {
		ret = ibv_query_pkey(port->dev->verbs, port->port_num, port->pkey_cnt, &pkey);
		if (ret || !pkey)
			break;
	}
	port->lid = attr.lid;
	port->lmc = attr.lmc;

	port->sa_dest.av.dlid = attr.sm_lid;
	port->sa_dest.av.sl = attr.sm_sl;
	port->sa_dest.av.port_num = port->port_num;
	port->sa_dest.remote_qpn = 1;

	if (port->state == IBV_PORT_ACTIVE)
		port->dev->active++;
}

static void acm_open_dev(struct ibv_device *ibdev)
{
	struct acm_device *dev;
	struct ibv_device_attr attr;
	struct ibv_context *verbs;
	size_t size;
	int i, ret;

	acm_log(1, "%s\n", ibdev->name);
	verbs = ibv_open_device(ibdev);
	if (verbs == NULL) {
		acm_log(0, "ERROR - opening device %s\n", ibdev->name);
		return;
	}

	ret = ibv_query_device(verbs, &attr);
	if (ret) {
		acm_log(0, "ERROR - ibv_query_device (%s) %d\n", ret, ibdev->name);
		goto err1;
	}

	size = sizeof(*dev) + sizeof(struct acm_port) * attr.phys_port_cnt;
	dev = (struct acm_device *) zalloc(size);
	if (!dev)
		goto err1;

	dev->verbs = verbs;
	dev->guid = ibv_get_device_guid(ibdev);
	dev->port_cnt = attr.phys_port_cnt;
	dev->init_depth = (uint8_t) attr.max_qp_init_rd_atom;
	dev->resp_resources = (uint8_t) attr.max_qp_rd_atom;

	for (i = 0; i < dev->port_cnt; i++) {
		dev->port[i].dev = dev;
		dev->port[i].port_num = i + 1;
		acm_init_port(&dev->port[i]);
	}

	if (!dev->active || acm_activate_dev(dev))
		goto err2;

	acm_log(1, "%s now active\n", ibdev->name);
	DListInsertHead(&dev->entry, &dev_list);
	return;

err2:
	free(dev);
err1:
	ibv_close_device(verbs);
}

static void acm_set_options(void)
{
	FILE *f;
	char s[120];
	char opt[32], value[32];

	if (!(f = fopen("acm_opts.cfg", "r")))
		return;

	while (fgets(s, sizeof s, f)) {
		if (s[0] == '#')
			continue;

		if (sscanf(s, "%32s%32s", opt, value) != 2)
			continue;

		if (!stricmp("log_file", opt))
			strcpy(log_file, value);
		else if (!stricmp("log_level", opt))
			log_level = atoi(value);
		else if (!stricmp("server_port", opt))
			server_port = (short) atoi(value);
		else if (!stricmp("timeout", opt))
			timeout = atoi(value);
		else if (!stricmp("retries", opt))
			retries = atoi(value);
		else if (!stricmp("send_depth", opt))
			send_depth = atoi(value);
		else if (!stricmp("recv_depth", opt))
			recv_depth = atoi(value);
		else if (!stricmp("min_mtu", opt))
			min_mtu = acm_convert_mtu(atoi(value));
		else if (!stricmp("min_rate", opt))
			min_rate = acm_convert_rate(atoi(value));
	}

	fclose(f);
}

static void acm_log_options(void)
{
	acm_log(0, "log level %d\n", log_level);
	acm_log(0, "server_port %d\n", server_port);
	acm_log(0, "timeout %d ms\n", timeout);
	acm_log(0, "retries %d\n", retries);
	acm_log(0, "send depth %d\n", send_depth);
	acm_log(0, "receive depth %d\n", recv_depth);
	acm_log(0, "minimum mtu %d\n", min_mtu);
	acm_log(0, "minimum rate %d\n", min_rate);
}

static FILE *acm_open_log(void)
{
	FILE *f;
	int n;

	if (!stricmp(log_file, "stdout"))
		return stdout;

	if (!stricmp(log_file, "stderr"))
		return stderr;

	n = strlen(log_file);
	sprintf(&log_file[n], "%5u.log", getpid());
	if (!(f = fopen(log_file, "w")))
		f = stdout;

	return f;
}

int CDECL_FUNC main(int argc, char **argv)
{
	struct ibv_device **ibdev;
	int dev_cnt;
	int i;

	if (osd_init())
		return -1;

	acm_set_options();

	lock_init(&log_lock);
	flog = acm_open_log();

	acm_log(0, "Assistant to the InfiniBand Communication Manager\n");
	acm_log_options();

	DListInit(&dev_list);
	DListInit(&timeout_list);
	event_init(&timeout_event);

	umad_init();
	ibdev = ibv_get_device_list(&dev_cnt);
	if (!ibdev) {
		acm_log(0, "ERROR - unable to get device list\n");
		return -1;
	}

	acm_log(1, "opening devices\n");
	for (i = 0; i < dev_cnt; i++)
		acm_open_dev(ibdev[i]);

	ibv_free_device_list(ibdev);

	acm_log(1, "initiating multicast joins\n");
	acm_join_groups();
	acm_log(1, "multicast joins done\n");
	acm_log(1, "starting timeout/retry thread\n");
	beginthread(acm_retry_handler, NULL);
	acm_log(1, "starting server\n");
	acm_server();

	acm_log(0, "shutting down\n");
	fclose(flog);
	return 0;
}





More information about the general mailing list