[ofw] [PATCH] uDAPL v2: CNO support broken in both CMA and SCM providers.

Arlin Davis arlin.r.davis at intel.com
Sun Aug 2 16:26:52 PDT 2009


The CQ thread/callback mechanism was removed by mistake. Indirect
DTO callbacks are still needed when a CNO is attached to an EVD.
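
For reference, the indirect path has roughly this shape (a simplified
sketch, not part of the patch; the real dispatch is dapli_cq_event_cb()
in the diff below, and shared_channel/ib_ctx here stand in for the new
hca_transport fields):

	struct ibv_cq *cq;
	struct dapl_evd *evd;

	/* drain the shared comp channel; the EVD callback queues the
	 * DTO event and, if a CNO is attached, triggers it */
	while (!ibv_get_cq_event(shared_channel, &cq, (void **)&evd)) {
		dapl_evd_dto_callback(ib_ctx, evd->ib_cq_handle, (void *)evd);
		ibv_ack_cq_events(cq, 1);
	}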

Add a CQ event channel to the cma provider's work thread and include
it in the select along with the rdma_cm and async channels.
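
The resulting poll set has this rough shape (illustrative only; the
fd names come from the diff below, see dapli_thread() in
dapl/openib_cma/device.c):

	struct pollfd ufds[4];
	int idx;

	ufds[0].fd = g_ib_pipe[0];           /* thread wakeup pipe */
	ufds[1].fd = g_cm_events->fd;        /* rdma_cm event channel */
	ufds[2].fd = hca->ib_ctx->async_fd;  /* verbs async events */
	ufds[3].fd = hca->ib_cq->fd;         /* new: shared CQ channel */
	for (idx = 0; idx < 4; idx++)
		ufds[idx].events = POLLIN;

	if (poll(ufds, 4, -1) > 0 && (ufds[3].revents & POLLIN))
		dapli_cq_event_cb(hca);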

For the scm provider there is no easy way to add this channel to the
select across sockets on Windows, so for portability a 2nd thread is
started to process the ASYNC and CQ channels for events.
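
On Linux that 2nd thread reduces to something like the sketch below
(cq_async_worker is a hypothetical name; the actual loop is the new
dapli_thread() in dapl/openib_scm/device.c, and the Windows build
waits in CompManagerPoll() instead of poll()):

	static void *cq_async_worker(void *arg)
	{
		struct _ib_hca_transport *tp = arg;
		struct pollfd fds[2] = {
			{ .fd = tp->ib_ctx->async_fd, .events = POLLIN },
			{ .fd = tp->ib_cq->fd,        .events = POLLIN },
		};

		while (!tp->destroy) {
			if (poll(fds, 2, -1) <= 0)
				continue;	/* signal or error, re-check */
			dapli_cq_event_cb(tp);
			dapli_async_event_cb(tp);
		}
		return NULL;
	}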

The EVD must also be disabled (evd_enabled=FALSE) during destroy to
prevent EVD events from firing for CNOs and re-arming the CQ while
the CQ is being destroyed.
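
Conceptually the guard looks like this (simplified; the flag is
cleared in dapls_evd_dealloc() in the first hunk, and the check
belongs in the common EVD DTO callback path; dto_callback below is
a stand-in, not the real function name):

	static void dto_callback(struct dapl_evd *evd_ptr)
	{
		if (!evd_ptr->evd_enabled)
			return;		/* EVD is being destroyed */

		/* ... queue DTO event, trigger attached CNO ... */

		ibv_req_notify_cq(evd_ptr->ib_cq_handle, 0);	/* re-arm */
	}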

Slight modification to dtest to check the EVD after a CNO timeout.
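
The dtest change amounts to the following (sketch; see the dtest
hunks at the end of the patch):

	DAT_EVD_HANDLE evd = DAT_HANDLE_NULL;

	ret = dat_cno_wait(h_dto_cno, DTO_TIMEOUT, &evd);
	if (evd != h_dto_rcv_evd && evd != NULL)
		return ret;	/* a different EVD fired: real error */

	/* evd == NULL means the CNO timed out but the completion may
	 * already be queued on the EVD; fall through to dat_evd_wait() */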

Signed-off-by: Arlin Davis <arlin.r.davis at intel.com>
---
 dapl/common/dapl_evd_util.c         |    1 +
 dapl/openib_cma/dapl_ib_util.h      |    5 +-
 dapl/openib_cma/device.c            |  154 ++++---------
 dapl/openib_common/cq.c             |  192 +++++----------
 dapl/openib_common/dapl_ib_common.h |    2 +
 dapl/openib_common/util.c           |   98 ++++++++
 dapl/openib_scm/dapl_ib_util.h      |    5 +
 dapl/openib_scm/device.c            |  458 ++++++++++++++++++++++++++++++-----
 test/dtest/dtest.c                  |   54 +++--
 9 files changed, 649 insertions(+), 320 deletions(-)

diff --git a/dapl/common/dapl_evd_util.c b/dapl/common/dapl_evd_util.c
index 88c3f8f..02909e9 100644
--- a/dapl/common/dapl_evd_util.c
+++ b/dapl/common/dapl_evd_util.c
@@ -469,6 +469,7 @@ DAT_RETURN dapls_evd_dealloc(IN DAPL_EVD * evd_ptr)
 	 * Destroy the CQ first, to keep any more callbacks from coming
 	 * up from it.
 	 */
+	evd_ptr->evd_enabled = DAT_FALSE;
 	if (evd_ptr->ib_cq_handle != IB_INVALID_HANDLE) {
 		ia_ptr = evd_ptr->header.owner_ia;
 
diff --git a/dapl/openib_cma/dapl_ib_util.h b/dapl/openib_cma/dapl_ib_util.h
index f466c06..c9ab4d6 100755
--- a/dapl/openib_cma/dapl_ib_util.h
+++ b/dapl/openib_cma/dapl_ib_util.h
@@ -84,7 +84,6 @@ typedef struct _ib_hca_transport
 { 
 	struct dapl_llist_entry	entry;
 	int			destroy;
-	struct dapl_hca		*d_hca;
 	struct rdma_cm_id 	*cm_id;
 	struct ibv_comp_channel *ib_cq;
 	ib_cq_handle_t		ib_cq_empty;
@@ -99,6 +98,7 @@ typedef struct _ib_hca_transport
 	/* device attributes */
 	int			rd_atom_in;
 	int			rd_atom_out;
+	struct	ibv_context	*ib_ctx;
 	struct	ibv_device	*ib_dev;
 	/* dapls_modify_qp_state */
 	uint16_t		lid;
@@ -119,7 +119,8 @@ void dapli_thread(void *arg);
 DAT_RETURN  dapli_ib_thread_init(void);
 void dapli_ib_thread_destroy(void);
 void dapli_cma_event_cb(void);
-void dapli_async_event_cb(struct _ib_hca_transport *hca);
+void dapli_async_event_cb(struct _ib_hca_transport *tp);
+void dapli_cq_event_cb(struct _ib_hca_transport *tp);
 dp_ib_cm_handle_t dapls_ib_cm_create(DAPL_EP *ep);
 void dapls_ib_cm_free(dp_ib_cm_handle_t cm, DAPL_EP *ep);
 DAT_RETURN dapls_modify_qp_state(IN ib_qp_handle_t qp_handle,
diff --git a/dapl/openib_cma/device.c b/dapl/openib_cma/device.c
index 81203bf..743e8fa 100644
--- a/dapl/openib_cma/device.c
+++ b/dapl/openib_cma/device.c
@@ -123,6 +123,12 @@ static int dapls_config_verbs(struct ibv_context *verbs)
 	return 0;
 }
 
+static int dapls_config_comp_channel(struct ibv_comp_channel *channel)
+{
+	channel->comp_channel.Milliseconds = 0;
+	return 0;
+}
+
 static int dapls_thread_signal(void)
 {
 	CompManagerCancel(windata.comp_mgr);
@@ -205,6 +211,11 @@ static int dapls_config_verbs(struct ibv_context *verbs)
 	return dapls_config_fd(verbs->async_fd);
 }
 
+static int dapls_config_comp_channel(struct ibv_comp_channel *channel)
+{
+	return dapls_config_fd(channel->fd);
+}
+
 static int dapls_thread_signal(void)
 {
 	return write(g_ib_pipe[1], "w", sizeof "w");
@@ -334,10 +345,6 @@ DAT_RETURN dapls_ib_open_hca(IN IB_HCA_NAME hca_name, IN DAPL_HCA * hca_ptr)
 	dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
 		     " open_hca: RDMA channel created (%p)\n", g_cm_events);
 
-	dat_status = dapli_ib_thread_init();
-	if (dat_status != DAT_SUCCESS)
-		return dat_status;
-
 	/* HCA name will be hostname or IP address */
 	if (getipaddr((char *)hca_name,
 		      (char *)&hca_ptr->hca_address, 
@@ -357,6 +364,7 @@ DAT_RETURN dapls_ib_open_hca(IN IB_HCA_NAME hca_name, IN DAPL_HCA * hca_ptr)
 		dapl_log(DAPL_DBG_TYPE_ERR,
 			 " open_hca: rdma_bind ERR %s."
 			 " Is %s configured?\n", strerror(errno), hca_name);
+		rdma_destroy_id(cm_id);
 		return DAT_INVALID_ADDRESS;
 	}
 
@@ -366,6 +374,7 @@ DAT_RETURN dapls_ib_open_hca(IN IB_HCA_NAME hca_name, IN DAPL_HCA * hca_ptr)
 	dapls_config_verbs(cm_id->verbs);
 	hca_ptr->port_num = cm_id->port_num;
 	hca_ptr->ib_trans.ib_dev = cm_id->verbs->device;
+	hca_ptr->ib_trans.ib_ctx = cm_id->verbs;
 	gid = &cm_id->route.addr.addr.ibaddr.sgid;
 
 	dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
@@ -374,6 +383,21 @@ DAT_RETURN dapls_ib_open_hca(IN IB_HCA_NAME hca_name, IN DAPL_HCA * hca_ptr)
 		     (unsigned long long)ntohll(gid->global.subnet_prefix),
 		     (unsigned long long)ntohll(gid->global.interface_id));
 
+	/* support for EVD's with CNO's: one channel via thread */
+	hca_ptr->ib_trans.ib_cq =
+	    ibv_create_comp_channel(hca_ptr->ib_hca_handle);
+	if (hca_ptr->ib_trans.ib_cq == NULL) {
+		dapl_log(DAPL_DBG_TYPE_ERR,
+			 " open_hca: ibv_create_comp_channel ERR %s\n",
+			 strerror(errno));
+		rdma_destroy_id(cm_id);
+		return DAT_INTERNAL_ERROR;
+	}
+	if (dapls_config_comp_channel(hca_ptr->ib_trans.ib_cq)) {
+		rdma_destroy_id(cm_id);
+		return DAT_INTERNAL_ERROR;
+	}
+
 	/* set inline max with env or default, get local lid and gid 0 */
 	if (hca_ptr->ib_hca_handle->device->transport_type
 	    == IBV_TRANSPORT_IWARP)
@@ -395,14 +419,17 @@ DAT_RETURN dapls_ib_open_hca(IN IB_HCA_NAME hca_name, IN DAPL_HCA * hca_ptr)
 	/* set default IB MTU */
 	hca_ptr->ib_trans.mtu = dapl_ib_mtu(2048);
 
+	dat_status = dapli_ib_thread_init();
+	if (dat_status != DAT_SUCCESS)
+		return dat_status;
 	/* 
 	 * Put new hca_transport on list for async and CQ event processing 
 	 * Wakeup work thread to add to polling list
 	 */
-	dapl_llist_init_entry((DAPL_LLIST_ENTRY *) & hca_ptr->ib_trans.entry);
+	dapl_llist_init_entry((DAPL_LLIST_ENTRY *) &hca_ptr->ib_trans.entry);
 	dapl_os_lock(&g_hca_lock);
 	dapl_llist_add_tail(&g_hca_list,
-			    (DAPL_LLIST_ENTRY *) & hca_ptr->ib_trans.entry,
+			    (DAPL_LLIST_ENTRY *) &hca_ptr->ib_trans.entry,
 			    &hca_ptr->ib_trans.entry);
 	if (dapls_thread_signal() == -1)
 		dapl_log(DAPL_DBG_TYPE_UTIL,
@@ -425,7 +452,6 @@ DAT_RETURN dapls_ib_open_hca(IN IB_HCA_NAME hca_name, IN DAPL_HCA * hca_ptr)
 		     &hca_ptr->hca_address)->sin_addr.s_addr >> 24 & 0xff, 
 		     hca_ptr->ib_trans.max_inline_send);
 
-	hca_ptr->ib_trans.d_hca = hca_ptr;
 	return DAT_SUCCESS;
 }
 
@@ -574,105 +600,6 @@ bail:
 		     " ib_thread_destroy(%d) exit\n", dapl_os_getpid());
 }
 
-void dapli_async_event_cb(struct _ib_hca_transport *hca)
-{
-	struct ibv_async_event event;
-
-	dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " async_event(%p)\n", hca);
-
-	if (hca->destroy)
-		return;
-
-	if (!ibv_get_async_event(hca->cm_id->verbs, &event)) {
-
-		switch (event.event_type) {
-		case IBV_EVENT_CQ_ERR:
-		{
-			struct dapl_ep *evd_ptr =
-				event.element.cq->cq_context;
-
-			dapl_log(DAPL_DBG_TYPE_ERR,
-				 "dapl async_event CQ (%p) ERR %d\n",
-				 evd_ptr, event.event_type);
-
-			/* report up if async callback still setup */
-			if (hca->async_cq_error)
-				hca->async_cq_error(hca->cm_id->verbs,
-							event.element.cq,
-							&event,
-							(void *)evd_ptr);
-			break;
-		}
-		case IBV_EVENT_COMM_EST:
-		{
-			/* Received msgs on connected QP before RTU */
-			dapl_log(DAPL_DBG_TYPE_UTIL,
-				 " async_event COMM_EST(%p) rdata beat RTU\n",
-				 event.element.qp);
-
-			break;
-		}
-		case IBV_EVENT_QP_FATAL:
-		case IBV_EVENT_QP_REQ_ERR:
-		case IBV_EVENT_QP_ACCESS_ERR:
-		case IBV_EVENT_QP_LAST_WQE_REACHED:
-		case IBV_EVENT_SRQ_ERR:
-		case IBV_EVENT_SRQ_LIMIT_REACHED:
-		case IBV_EVENT_SQ_DRAINED:
-		{
-			struct dapl_ep *ep_ptr =
-				event.element.qp->qp_context;
-
-			dapl_log(DAPL_DBG_TYPE_ERR,
-				 "dapl async_event QP (%p) ERR %d\n",
-				 ep_ptr, event.event_type);
-
-			/* report up if async callback still setup */
-			if (hca->async_qp_error)
-				hca->async_qp_error(hca->cm_id->verbs,
-						    ep_ptr->qp_handle,
-						    &event,
-						    (void *)ep_ptr);
-			break;
-		}
-		case IBV_EVENT_PATH_MIG:
-		case IBV_EVENT_PATH_MIG_ERR:
-		case IBV_EVENT_DEVICE_FATAL:
-		case IBV_EVENT_PORT_ACTIVE:
-		case IBV_EVENT_PORT_ERR:
-		case IBV_EVENT_LID_CHANGE:
-		case IBV_EVENT_PKEY_CHANGE:
-		case IBV_EVENT_SM_CHANGE:
-		{
-			dapl_log(DAPL_DBG_TYPE_WARN,
-				 "dapl async_event: DEV ERR %d\n",
-				 event.event_type);
-
-			/* report up if async callback still setup */
-			if (hca->async_unafiliated)
-				hca->async_unafiliated(hca->cm_id->
-							verbs, &event,
-							hca->
-							async_un_ctx);
-			break;
-		}
-		case IBV_EVENT_CLIENT_REREGISTER:
-			/* no need to report this event this time */
-			dapl_log(DAPL_DBG_TYPE_UTIL,
-				 " async_event: IBV_CLIENT_REREGISTER\n");
-			break;
-
-		default:
-			dapl_log(DAPL_DBG_TYPE_WARN,
-				 "dapl async_event: %d UNKNOWN\n",
-				 event.event_type);
-			break;
-
-		}
-		ibv_ack_async_event(&event);
-	}
-}
-
 #if defined(_WIN64) || defined(_WIN32)
 /* work thread for uAT, uCM, CQ, and async events */
 void dapli_thread(void *arg)
@@ -721,6 +648,7 @@ void dapli_thread(void *arg)
 				dapl_os_unlock(&g_hca_lock);
 				uhca[idx]->destroy = 2;
 			} else {
+				dapli_cq_event_cb(uhca[idx]);
 				dapli_async_event_cb(uhca[idx]);
 			}
 		}
@@ -732,6 +660,7 @@ void dapli_thread(void *arg)
 	dapl_os_unlock(&g_hca_lock);
 }
 #else				// _WIN64 || WIN32
+
 /* work thread for uAT, uCM, CQ, and async events */
 void dapli_thread(void *arg)
 {
@@ -771,7 +700,13 @@ void dapli_thread(void *arg)
 		while (hca) {
 
 			/* uASYNC events */
-			ufds[++idx].fd = hca->cm_id->verbs->async_fd;
+			ufds[++idx].fd = hca->ib_ctx->async_fd;
+			ufds[idx].events = POLLIN;
+			ufds[idx].revents = 0;
+			uhca[idx] = hca;
+
+			/* CQ events are non-direct with CNO's */
+			ufds[++idx].fd = hca->ib_cq->fd;
 			ufds[idx].events = POLLIN;
 			ufds[idx].revents = 0;
 			uhca[idx] = hca;
@@ -809,9 +744,10 @@ void dapli_thread(void *arg)
 		if (ufds[1].revents == POLLIN)
 			dapli_cma_event_cb();
 
-		/* check and process ASYNC events, per device */
+		/* check and process CQ and ASYNC events, per device */
 		for (idx = 2; idx < fds; idx++) {
 			if (ufds[idx].revents == POLLIN) {
+				dapli_cq_event_cb(uhca[idx]);
 				dapli_async_event_cb(uhca[idx]);
 			}
 		}
@@ -824,7 +760,7 @@ void dapli_thread(void *arg)
 					 strerror(errno));
 
 			/* cleanup any device on list marked for destroy */
-			for (idx = 2; idx < fds; idx++) {
+			for (idx = 3; idx < fds; idx++) {
 				if (uhca[idx] && uhca[idx]->destroy == 1) {
 					dapl_os_lock(&g_hca_lock);
 					dapl_llist_remove_entry(
diff --git a/dapl/openib_common/cq.c b/dapl/openib_common/cq.c
index 096167c..16d4f18 100644
--- a/dapl/openib_common/cq.c
+++ b/dapl/openib_common/cq.c
@@ -171,36 +171,32 @@ DAT_RETURN dapls_ib_get_async_event(IN ib_error_record_t * err_record,
  *	DAT_INSUFFICIENT_RESOURCES
  *
  */
-#if defined(_WIN32)
-
 DAT_RETURN
 dapls_ib_cq_alloc(IN DAPL_IA * ia_ptr,
 		  IN DAPL_EVD * evd_ptr, IN DAT_COUNT * cqlen)
 {
-	OVERLAPPED *overlap;
+	struct ibv_comp_channel *channel;
 	DAT_RETURN ret;
 
 	dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
 		     "dapls_ib_cq_alloc: evd %p cqlen=%d \n", evd_ptr, *cqlen);
 
-	evd_ptr->ib_cq_handle = ibv_create_cq(ia_ptr->hca_ptr->ib_hca_handle,
-					      *cqlen, evd_ptr, NULL, 0);
+	if (!evd_ptr->cno_ptr)
+		channel = ibv_create_comp_channel(ia_ptr->hca_ptr->ib_hca_handle);
+	else
+		channel = ia_ptr->hca_ptr->ib_trans.ib_cq;
 
-	if (evd_ptr->ib_cq_handle == IB_INVALID_HANDLE)
+	if (!channel)
 		return DAT_INSUFFICIENT_RESOURCES;
 
-	dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
-		     " cq_object_create: (%p)\n", evd_ptr);
+	evd_ptr->ib_cq_handle = ibv_create_cq(ia_ptr->hca_ptr->ib_hca_handle,
+					      *cqlen, evd_ptr, channel, 0);
 
-	overlap = &evd_ptr->ib_cq_handle->comp_entry.Overlap;
-	overlap->hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
-	if (!overlap->hEvent) {
+	if (evd_ptr->ib_cq_handle == IB_INVALID_HANDLE) {
 		ret = DAT_INSUFFICIENT_RESOURCES;
 		goto err;
 	}
 
-	overlap->hEvent = (HANDLE) ((ULONG_PTR) overlap->hEvent | 1);
-
 	/* arm cq for events */
 	dapls_set_cq_notify(ia_ptr, evd_ptr);
 
@@ -214,7 +210,8 @@ dapls_ib_cq_alloc(IN DAPL_IA * ia_ptr,
 	return DAT_SUCCESS;
 
 err:
-	ibv_destroy_cq(evd_ptr->ib_cq_handle);
+	if (!evd_ptr->cno_ptr)
+		ibv_destroy_comp_channel(channel);
 	return ret;
 }
 
@@ -239,18 +236,18 @@ DAT_RETURN dapls_ib_cq_free(IN DAPL_IA * ia_ptr, IN DAPL_EVD * evd_ptr)
 {
 	DAT_EVENT event;
 	ib_work_completion_t wc;
-	HANDLE hevent;
+	struct ibv_comp_channel *channel;
 
 	if (evd_ptr->ib_cq_handle != IB_INVALID_HANDLE) {
 		/* pull off CQ and EVD entries and toss */
 		while (ibv_poll_cq(evd_ptr->ib_cq_handle, 1, &wc) == 1) ;
 		while (dapl_evd_dequeue(evd_ptr, &event) == DAT_SUCCESS) ;
 
-		hevent = evd_ptr->ib_cq_handle->comp_entry.Overlap.hEvent;
+		channel = evd_ptr->ib_cq_handle->channel;
 		if (ibv_destroy_cq(evd_ptr->ib_cq_handle))
 			return (dapl_convert_errno(errno, "ibv_destroy_cq"));
-
-		CloseHandle(hevent);
+		if (!evd_ptr->cno_ptr)
+			ibv_destroy_comp_channel(channel);
 		evd_ptr->ib_cq_handle = IB_INVALID_HANDLE;
 	}
 	return DAT_SUCCESS;
@@ -262,105 +259,42 @@ dapls_evd_dto_wakeup(IN DAPL_EVD * evd_ptr)
 	dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
 		     " cq_object_wakeup: evd=%p\n", evd_ptr);
 
-	if (!SetEvent(evd_ptr->ib_cq_handle->comp_entry.Overlap.hEvent))
-		return DAT_INTERNAL_ERROR;
-
+	/* no wake up mechanism */
 	return DAT_SUCCESS;
 }
 
-DAT_RETURN
-dapls_evd_dto_wait(IN DAPL_EVD * evd_ptr, IN uint32_t timeout)
+#if defined(_WIN32)
+static int
+dapls_wait_comp_channel(IN struct ibv_comp_channel *channel, IN uint32_t timeout)
 {
-	int status;
-
-	dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
-		     " cq_object_wait: EVD %p time %d\n",
-		     evd_ptr, timeout);
-
-	status = WaitForSingleObject(evd_ptr->ib_cq_handle->
-				     comp_entry.Overlap.hEvent,
-				     timeout / 1000);
-	dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
-		     " cq_object_wait: EVD %p status 0x%x\n",
-		     evd_ptr, status);
-	if (status)
-		return DAT_TIMEOUT_EXPIRED;
-
-	InterlockedExchange(&evd_ptr->ib_cq_handle->comp_entry.Busy, 0);
-	return DAT_SUCCESS;
+	channel->comp_channel.Milliseconds =
+		(timeout == DAT_TIMEOUT_INFINITE) ? INFINITE : timeout / 1000;
+	return 0;
 }
 
 #else // WIN32
 
-DAT_RETURN
-dapls_ib_cq_alloc(IN DAPL_IA * ia_ptr,
-		  IN DAPL_EVD * evd_ptr, IN DAT_COUNT * cqlen)
-{
-	struct ibv_comp_channel *channel;
-	DAT_RETURN ret;
-
-	dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
-		     "dapls_ib_cq_alloc: evd %p cqlen=%d \n", evd_ptr, *cqlen);
-
-	channel = ibv_create_comp_channel(ia_ptr->hca_ptr->ib_hca_handle);
-	if (!channel)
-		return DAT_INSUFFICIENT_RESOURCES;
-
-	evd_ptr->ib_cq_handle = ibv_create_cq(ia_ptr->hca_ptr->ib_hca_handle,
-					      *cqlen, evd_ptr, channel, 0);
-
-	if (evd_ptr->ib_cq_handle == IB_INVALID_HANDLE) {
-		ret = DAT_INSUFFICIENT_RESOURCES;
-		goto err;
-	}
-
-	/* arm cq for events */
-	dapls_set_cq_notify(ia_ptr, evd_ptr);
-
-	/* update with returned cq entry size */
-	*cqlen = evd_ptr->ib_cq_handle->cqe;
-
-	dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
-		     "dapls_ib_cq_alloc: new_cq %p cqlen=%d \n",
-		     evd_ptr->ib_cq_handle, *cqlen);
-
-	return DAT_SUCCESS;
-
-err:
-	ibv_destroy_comp_channel(channel);
-	return ret;
-}
-
-DAT_RETURN dapls_ib_cq_free(IN DAPL_IA * ia_ptr, IN DAPL_EVD * evd_ptr)
+static int
+dapls_wait_comp_channel(IN struct ibv_comp_channel *channel, IN uint32_t timeout)
 {
-	DAT_EVENT event;
-	ib_work_completion_t wc;
-	struct ibv_comp_channel *channel;
-
-	if (evd_ptr->ib_cq_handle != IB_INVALID_HANDLE) {
-		/* pull off CQ and EVD entries and toss */
-		while (ibv_poll_cq(evd_ptr->ib_cq_handle, 1, &wc) == 1) ;
-		while (dapl_evd_dequeue(evd_ptr, &event) == DAT_SUCCESS) ;
-
-		channel = evd_ptr->ib_cq_handle->channel;
-		if (ibv_destroy_cq(evd_ptr->ib_cq_handle))
-			return (dapl_convert_errno(errno, "ibv_destroy_cq"));
-
-		ibv_destroy_comp_channel(channel);
-		evd_ptr->ib_cq_handle = IB_INVALID_HANDLE;
-	}
-	return DAT_SUCCESS;
-}
-
-DAT_RETURN
-dapls_evd_dto_wakeup(IN DAPL_EVD * evd_ptr)
-{
-	dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
-		     " cq_object_wakeup: evd=%p\n", evd_ptr);
+	int status, timeout_ms;
+	struct pollfd cq_fd = {
+		.fd = channel->fd,
+		.events = POLLIN,
+		.revents = 0
+	};
 
-	/* no wake up mechanism */
-	return DAT_SUCCESS;
+	/* uDAPL timeout values in usecs */
+	timeout_ms = (timeout == DAT_TIMEOUT_INFINITE) ? -1 : timeout / 1000;
+	status = poll(&cq_fd, 1, timeout_ms);
+	if (status > 0)
+		return 0;
+	else if (status == 0)
+		return ETIMEDOUT;
+	else
+		return status;
 }
+#endif
 
 DAT_RETURN
 dapls_evd_dto_wait(IN DAPL_EVD * evd_ptr, IN uint32_t timeout)
@@ -368,43 +302,45 @@ dapls_evd_dto_wait(IN DAPL_EVD * evd_ptr, IN uint32_t timeout)
 	struct ibv_comp_channel *channel = evd_ptr->ib_cq_handle->channel;
 	struct ibv_cq *ibv_cq = NULL;
 	void *context;
-	int status = 0;
-	int timeout_ms = -1;
-	struct pollfd cq_fd = {
-		.fd = channel->fd,
-		.events = POLLIN,
-		.revents = 0
-	};
+	int status;
 
 	dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
 		     " cq_object_wait: EVD %p time %d\n",
 		     evd_ptr, timeout);
 
-	/* uDAPL timeout values in usecs */
-	if (timeout != DAT_TIMEOUT_INFINITE)
-		timeout_ms = timeout / 1000;
-
-	status = poll(&cq_fd, 1, timeout_ms);
-
-	/* returned event */
-	if (status > 0) {
+	status = dapls_wait_comp_channel(channel, timeout);
+	if (!status) {
 		if (!ibv_get_cq_event(channel, &ibv_cq, &context)) {
 			ibv_ack_cq_events(ibv_cq, 1);
 		}
-		status = 0;
-
-		/* timeout */
-	} else if (status == 0)
-		status = ETIMEDOUT;
+	}
 
 	dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
 		     " cq_object_wait: RET evd %p ibv_cq %p %s\n",
 		     evd_ptr, ibv_cq, strerror(errno));
 
-	return (dapl_convert_errno(status, "cq_wait_object_wait"));
+	return dapl_convert_errno(status, "cq_wait_object_wait");
+}
 
+void dapli_cq_event_cb(struct _ib_hca_transport *tp)
+{
+	/* check all comp events on this device */
+	struct dapl_evd *evd = NULL;
+	struct ibv_cq   *ibv_cq = NULL;
+
+	dapl_dbg_log(DAPL_DBG_TYPE_UTIL," dapli_cq_event_cb(%p)\n", tp);
+
+	while (!ibv_get_cq_event(tp->ib_cq, &ibv_cq, (void*)&evd)) {
+
+		if (!DAPL_BAD_HANDLE(evd, DAPL_MAGIC_EVD)) {
+			/* Both EVD or EVD->CNO event via callback */
+			dapl_evd_dto_callback(tp->ib_ctx, 
+					      evd->ib_cq_handle, (void*)evd);
+		}
+
+		ibv_ack_cq_events(ibv_cq, 1);
+	} 
 }
-#endif
 
 /*
  * dapl_ib_cq_resize
diff --git a/dapl/openib_common/dapl_ib_common.h b/dapl/openib_common/dapl_ib_common.h
index 0b417b8..2195767 100644
--- a/dapl/openib_common/dapl_ib_common.h
+++ b/dapl/openib_common/dapl_ib_common.h
@@ -208,6 +208,8 @@ typedef uint32_t ib_shm_transport_t;
 /* prototypes */
 int32_t	dapls_ib_init(void);
 int32_t	dapls_ib_release(void);
+
+/* util.c */
 enum ibv_mtu dapl_ib_mtu(int mtu);
 char *dapl_ib_mtu_str(enum ibv_mtu mtu);
 DAT_RETURN getlocalipaddr(DAT_SOCK_ADDR *addr, int addr_len);
diff --git a/dapl/openib_common/util.c b/dapl/openib_common/util.c
index da913c5..3963e1f 100644
--- a/dapl/openib_common/util.c
+++ b/dapl/openib_common/util.c
@@ -320,6 +320,104 @@ DAT_RETURN dapls_ib_setup_async_callback(IN DAPL_IA * ia_ptr,
 	return DAT_SUCCESS;
 }
 
+void dapli_async_event_cb(struct _ib_hca_transport *hca)
+{
+	struct ibv_async_event event;
+
+	dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " async_event(%p)\n", hca);
+
+	if (hca->destroy)
+		return;
+
+	if (!ibv_get_async_event(hca->ib_ctx, &event)) {
+
+		switch (event.event_type) {
+		case IBV_EVENT_CQ_ERR:
+		{
+			struct dapl_ep *evd_ptr =
+				event.element.cq->cq_context;
+
+			dapl_log(DAPL_DBG_TYPE_ERR,
+				 "dapl async_event CQ (%p) ERR %d\n",
+				 evd_ptr, event.event_type);
+
+			/* report up if async callback still setup */
+			if (hca->async_cq_error)
+				hca->async_cq_error(hca->ib_ctx,
+						    event.element.cq,
+						    &event,
+						    (void *)evd_ptr);
+			break;
+		}
+		case IBV_EVENT_COMM_EST:
+		{
+			/* Received msgs on connected QP before RTU */
+			dapl_log(DAPL_DBG_TYPE_UTIL,
+				 " async_event COMM_EST(%p) rdata beat RTU\n",
+				 event.element.qp);
+
+			break;
+		}
+		case IBV_EVENT_QP_FATAL:
+		case IBV_EVENT_QP_REQ_ERR:
+		case IBV_EVENT_QP_ACCESS_ERR:
+		case IBV_EVENT_QP_LAST_WQE_REACHED:
+		case IBV_EVENT_SRQ_ERR:
+		case IBV_EVENT_SRQ_LIMIT_REACHED:
+		case IBV_EVENT_SQ_DRAINED:
+		{
+			struct dapl_ep *ep_ptr =
+				event.element.qp->qp_context;
+
+			dapl_log(DAPL_DBG_TYPE_ERR,
+				 "dapl async_event QP (%p) ERR %d\n",
+				 ep_ptr, event.event_type);
+
+			/* report up if async callback still setup */
+			if (hca->async_qp_error)
+				hca->async_qp_error(hca->ib_ctx,
+						    ep_ptr->qp_handle,
+						    &event,
+						    (void *)ep_ptr);
+			break;
+		}
+		case IBV_EVENT_PATH_MIG:
+		case IBV_EVENT_PATH_MIG_ERR:
+		case IBV_EVENT_DEVICE_FATAL:
+		case IBV_EVENT_PORT_ACTIVE:
+		case IBV_EVENT_PORT_ERR:
+		case IBV_EVENT_LID_CHANGE:
+		case IBV_EVENT_PKEY_CHANGE:
+		case IBV_EVENT_SM_CHANGE:
+		{
+			dapl_log(DAPL_DBG_TYPE_WARN,
+				 "dapl async_event: DEV ERR %d\n",
+				 event.event_type);
+
+			/* report up if async callback still setup */
+			if (hca->async_unafiliated)
+				hca->async_unafiliated(hca->ib_ctx, 
+						       &event,	
+						       hca->async_un_ctx);
+			break;
+		}
+		case IBV_EVENT_CLIENT_REREGISTER:
+			/* no need to report this event this time */
+			dapl_log(DAPL_DBG_TYPE_UTIL,
+				 " async_event: IBV_CLIENT_REREGISTER\n");
+			break;
+
+		default:
+			dapl_log(DAPL_DBG_TYPE_WARN,
+				 "dapl async_event: %d UNKNOWN\n",
+				 event.event_type);
+			break;
+
+		}
+		ibv_ack_async_event(&event);
+	}
+}
+
 /*
  * dapls_set_provider_specific_attr
  *
diff --git a/dapl/openib_scm/dapl_ib_util.h b/dapl/openib_scm/dapl_ib_util.h
index a5e734e..933364c 100644
--- a/dapl/openib_scm/dapl_ib_util.h
+++ b/dapl/openib_scm/dapl_ib_util.h
@@ -78,8 +78,11 @@ typedef dp_ib_cm_handle_t	ib_cm_srvc_handle_t;
 /* ib_hca_transport_t, specific to this implementation */
 typedef struct _ib_hca_transport
 { 
+	struct dapl_llist_entry	entry;
+	int			destroy;
 	union ibv_gid		gid;
 	struct	ibv_device	*ib_dev;
+	struct	ibv_context	*ib_ctx;
 	ib_cq_handle_t		ib_cq_empty;
 	DAPL_OS_LOCK		cq_lock;	
 	int			max_inline_send;
@@ -114,6 +117,8 @@ typedef struct _ib_hca_transport
 void cr_thread(void *arg);
 int dapli_cq_thread_init(struct dapl_hca *hca_ptr);
 void dapli_cq_thread_destroy(struct dapl_hca *hca_ptr);
+void dapli_async_event_cb(struct _ib_hca_transport *tp);
+void dapli_cq_event_cb(struct _ib_hca_transport *tp);
 DAT_RETURN dapli_socket_disconnect(dp_ib_cm_handle_t cm_ptr);
 void dapls_print_cm_list(IN DAPL_IA *ia_ptr);
 dp_ib_cm_handle_t dapls_ib_cm_create(DAPL_EP *ep);
diff --git a/dapl/openib_scm/device.c b/dapl/openib_scm/device.c
index d5089aa..9c91b78 100644
--- a/dapl/openib_scm/device.c
+++ b/dapl/openib_scm/device.c
@@ -57,6 +57,96 @@ static const char rcsid[] = "$Id:  $";
 
 #include <stdlib.h>
 
+ib_thread_state_t g_ib_thread_state = 0;
+DAPL_OS_THREAD g_ib_thread;
+DAPL_OS_LOCK g_hca_lock;
+struct dapl_llist_entry *g_hca_list;
+
+void dapli_thread(void *arg);
+DAT_RETURN  dapli_ib_thread_init(void);
+void dapli_ib_thread_destroy(void);
+
+#if defined(_WIN64) || defined(_WIN32)
+#include "..\..\..\..\..\etc\user\comp_channel.cpp"
+#include <rdma\winverbs.h>
+
+struct ibvw_windata windata;
+
+static int dapls_os_init(void)
+{
+	return ibvw_get_windata(&windata, IBVW_WINDATA_VERSION);
+}
+
+static void dapls_os_release(void)
+{
+	if (windata.comp_mgr)
+		ibvw_release_windata(&windata, IBVW_WINDATA_VERSION);
+	windata.comp_mgr = NULL;
+}
+
+static int dapls_config_verbs(struct ibv_context *verbs)
+{
+	verbs->channel.Milliseconds = 0;
+	return 0;
+}
+
+static int dapls_config_comp_channel(struct ibv_comp_channel *channel)
+{
+	channel->comp_channel.Milliseconds = 0;
+	return 0;
+}
+
+static int dapls_thread_signal(void)
+{
+	CompManagerCancel(windata.comp_mgr);
+	return 0;
+}
+#else				// _WIN64 || WIN32
+int g_ib_pipe[2];
+
+static int dapls_os_init(void)
+{
+	/* create pipe for waking up work thread */
+	return pipe(g_ib_pipe);
+}
+
+static void dapls_os_release(void)
+{
+	/* close pipe? */
+}
+
+static int dapls_config_fd(int fd)
+{
+	int opts;
+
+	opts = fcntl(fd, F_GETFL);
+	if (opts < 0 || fcntl(fd, F_SETFL, opts | O_NONBLOCK) < 0) {
+		dapl_log(DAPL_DBG_TYPE_ERR,
+			 " dapls_config_fd: fcntl on fd %d ERR %d %s\n",
+			 fd, opts, strerror(errno));
+		return errno;
+	}
+
+	return 0;
+}
+
+static int dapls_config_verbs(struct ibv_context *verbs)
+{
+	return dapls_config_fd(verbs->async_fd);
+}
+
+static int dapls_config_comp_channel(struct ibv_comp_channel *channel)
+{
+	return dapls_config_fd(channel->fd);
+}
+
+static int dapls_thread_signal(void)
+{
+	return write(g_ib_pipe[1], "w", sizeof "w");
+}
+#endif
+
+
 static int32_t create_cr_pipe(IN DAPL_HCA * hca_ptr)
 {
 	DAPL_SOCKET listen_socket;
@@ -130,35 +220,22 @@ static void destroy_cr_pipe(IN DAPL_HCA * hca_ptr)
  */
 int32_t dapls_ib_init(void)
 {
-	return 0;
-}
+	/* initialize hca_list */
+	dapl_os_lock_init(&g_hca_lock);
+	dapl_llist_init_head(&g_hca_list);
 
-int32_t dapls_ib_release(void)
-{
-	return 0;
-}
+	if (dapls_os_init())
+		return 1;
 
-#if defined(_WIN64) || defined(_WIN32)
-int dapls_config_comp_channel(struct ibv_comp_channel *channel)
-{
 	return 0;
 }
-#else				// _WIN64 || WIN32
-int dapls_config_comp_channel(struct ibv_comp_channel *channel)
-{
-	int opts;
-
-	opts = fcntl(channel->fd, F_GETFL);	/* uCQ */
-	if (opts < 0 || fcntl(channel->fd, F_SETFL, opts | O_NONBLOCK) < 0) {
-		dapl_log(DAPL_DBG_TYPE_ERR,
-			 " dapls_create_comp_channel: fcntl on ib_cq->fd %d ERR %d %s\n",
-			 channel->fd, opts, strerror(errno));
-		return errno;
-	}
 
+int32_t dapls_ib_release(void)
+{
+	dapli_ib_thread_destroy();
+	dapls_os_release();
 	return 0;
 }
-#endif
 
 /*
  * dapls_ib_open_hca
@@ -213,7 +290,7 @@ DAT_RETURN dapls_ib_open_hca(IN IB_HCA_NAME hca_name, IN DAPL_HCA * hca_ptr)
 		 " open_hca: device %s not found\n", hca_name);
 	goto err;
 
-      found:
+found:
 	dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " open_hca: Found dev %s %016llx\n",
 		     ibv_get_device_name(hca_ptr->ib_trans.ib_dev),
 		     (unsigned long long)
@@ -227,6 +304,8 @@ DAT_RETURN dapls_ib_open_hca(IN IB_HCA_NAME hca_name, IN DAPL_HCA * hca_ptr)
 			 strerror(errno));
 		goto err;
 	}
+	hca_ptr->ib_trans.ib_ctx = hca_ptr->ib_hca_handle;
+	dapls_config_verbs(hca_ptr->ib_hca_handle);
 
 	/* get lid for this hca-port, network order */
 	if (ibv_query_port(hca_ptr->ib_hca_handle,
@@ -271,15 +350,8 @@ DAT_RETURN dapls_ib_open_hca(IN IB_HCA_NAME hca_name, IN DAPL_HCA * hca_ptr)
 	hca_ptr->ib_trans.mtu =
 	    dapl_ib_mtu(dapl_os_get_env_val("DAPL_IB_MTU", SCM_IB_MTU));
 
-#ifndef CQ_WAIT_OBJECT
-	/* initialize cq_lock */
-	dat_status = dapl_os_lock_init(&hca_ptr->ib_trans.cq_lock);
-	if (dat_status != DAT_SUCCESS) {
-		dapl_log(DAPL_DBG_TYPE_ERR,
-			 " open_hca: failed to init cq_lock\n");
-		goto bail;
-	}
-	/* EVD events without direct CQ channels, non-blocking */
+
+	/* EVD events without direct CQ channels, CNO support */
 	hca_ptr->ib_trans.ib_cq =
 	    ibv_create_comp_channel(hca_ptr->ib_hca_handle);
 	if (hca_ptr->ib_trans.ib_cq == NULL) {
@@ -288,18 +360,28 @@ DAT_RETURN dapls_ib_open_hca(IN IB_HCA_NAME hca_name, IN DAPL_HCA * hca_ptr)
 			 strerror(errno));
 		goto bail;
 	}
-
-	if (dapls_config_comp_channel(hca_ptr->ib_trans.ib_cq)) {
-		goto bail;
-	}
-
-	if (dapli_cq_thread_init(hca_ptr)) {
+	dapls_config_comp_channel(hca_ptr->ib_trans.ib_cq);
+	
+	dat_status = dapli_ib_thread_init();
+	if (dat_status != DAT_SUCCESS) {
 		dapl_log(DAPL_DBG_TYPE_ERR,
-			 " open_hca: cq_thread_init failed for %s\n",
-			 ibv_get_device_name(hca_ptr->ib_trans.ib_dev));
+			 " open_hca: failed to init cq thread lock\n");
 		goto bail;
 	}
-#endif				/* CQ_WAIT_OBJECT */
+	/* 
+	 * Put new hca_transport on list for async and CQ event processing 
+	 * Wakeup work thread to add to polling list
+	 */
+	dapl_llist_init_entry((DAPL_LLIST_ENTRY *)&hca_ptr->ib_trans.entry);
+	dapl_os_lock(&g_hca_lock);
+	dapl_llist_add_tail(&g_hca_list,
+			    (DAPL_LLIST_ENTRY *) &hca_ptr->ib_trans.entry,
+			    &hca_ptr->ib_trans.entry);
+	if (dapls_thread_signal() == -1)
+		dapl_log(DAPL_DBG_TYPE_UTIL,
+			 " open_hca: thread wakeup error = %s\n",
+			 strerror(errno));
+	dapl_os_unlock(&g_hca_lock);
 
 	/* initialize cr_list lock */
 	dat_status = dapl_os_lock_init(&hca_ptr->ib_trans.lock);
@@ -333,7 +415,7 @@ DAT_RETURN dapls_ib_open_hca(IN IB_HCA_NAME hca_name, IN DAPL_HCA * hca_ptr)
 
 	/* wait for thread */
 	while (hca_ptr->ib_trans.cr_state != IB_THREAD_RUN) {
-		dapl_os_sleep_usec(2000);
+		dapl_os_sleep_usec(1000);
 	}
 
 	dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
@@ -380,33 +462,297 @@ DAT_RETURN dapls_ib_close_hca(IN DAPL_HCA * hca_ptr)
 {
 	dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " close_hca: %p\n", hca_ptr);
 
-#ifndef CQ_WAIT_OBJECT
-	dapli_cq_thread_destroy(hca_ptr);
-	dapl_os_lock_destroy(&hca_ptr->ib_trans.cq_lock);
-#endif				/* CQ_WAIT_OBJECT */
-
 	if (hca_ptr->ib_hca_handle != IB_INVALID_HANDLE) {
 		if (ibv_close_device(hca_ptr->ib_hca_handle))
 			return (dapl_convert_errno(errno, "ib_close_device"));
 		hca_ptr->ib_hca_handle = IB_INVALID_HANDLE;
 	}
 
+	dapl_os_lock(&g_hca_lock);
+	if (g_ib_thread_state != IB_THREAD_RUN) {
+		dapl_os_unlock(&g_hca_lock);
+		return (DAT_SUCCESS);
+	}
+	dapl_os_unlock(&g_hca_lock);
+
 	/* destroy cr_thread and lock */
 	hca_ptr->ib_trans.cr_state = IB_THREAD_CANCEL;
-	if (send(hca_ptr->ib_trans.scm[1], "w", sizeof "w", 0) == -1)
-		dapl_log(DAPL_DBG_TYPE_UTIL,
-			 " thread_destroy: thread wakeup err = %s\n",
-			 strerror(errno));
+	send(hca_ptr->ib_trans.scm[1], "w", sizeof "w", 0);
 	while (hca_ptr->ib_trans.cr_state != IB_THREAD_EXIT) {
 		dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
 			     " close_hca: waiting for cr_thread\n");
-		if (send(hca_ptr->ib_trans.scm[1], "w", sizeof "w", 0) == -1)
-			dapl_log(DAPL_DBG_TYPE_UTIL,
-				 " thread_destroy: thread wakeup err = %s\n",
-				 strerror(errno));
-		dapl_os_sleep_usec(2000);
+		send(hca_ptr->ib_trans.scm[1], "w", sizeof "w", 0);
+		dapl_os_sleep_usec(1000);
 	}
 	dapl_os_lock_destroy(&hca_ptr->ib_trans.lock);
 	destroy_cr_pipe(hca_ptr); /* no longer need pipe */
+	
+	/* 
+	 * Remove hca from async event processing list
+	 * Wakeup work thread to remove from polling list
+	 */
+	hca_ptr->ib_trans.destroy = 1;
+	if (dapls_thread_signal() == -1)
+		dapl_log(DAPL_DBG_TYPE_UTIL,
+			 " destroy: thread wakeup error = %s\n",
+			 strerror(errno));
+
+	/* wait for thread to remove HCA references */
+	while (hca_ptr->ib_trans.destroy != 2) {
+		if (dapls_thread_signal() == -1)
+			dapl_log(DAPL_DBG_TYPE_UTIL,
+				 " destroy: thread wakeup error = %s\n",
+				 strerror(errno));
+		dapl_os_sleep_usec(1000);
+	}
+
 	return (DAT_SUCCESS);
 }
+
+DAT_RETURN dapli_ib_thread_init(void)
+{
+	DAT_RETURN dat_status;
+
+	dapl_os_lock(&g_hca_lock);
+	if (g_ib_thread_state != IB_THREAD_INIT) {
+		dapl_os_unlock(&g_hca_lock);
+		return DAT_SUCCESS;
+	}
+
+	g_ib_thread_state = IB_THREAD_CREATE;
+	dapl_os_unlock(&g_hca_lock);
+
+	/* create thread to process inbound connect request */
+	dat_status = dapl_os_thread_create(dapli_thread, NULL, &g_ib_thread);
+	if (dat_status != DAT_SUCCESS)
+		return (dapl_convert_errno(errno,
+					   "create_thread ERR:"
+					   " check resource limits"));
+
+	/* wait for thread to start */
+	dapl_os_lock(&g_hca_lock);
+	while (g_ib_thread_state != IB_THREAD_RUN) {
+		dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+			     " ib_thread_init: waiting for ib_thread\n");
+		dapl_os_unlock(&g_hca_lock);
+		dapl_os_sleep_usec(1000);
+		dapl_os_lock(&g_hca_lock);
+	}
+	dapl_os_unlock(&g_hca_lock);
+
+	return DAT_SUCCESS;
+}
+
+void dapli_ib_thread_destroy(void)
+{
+	int retries = 10;
+
+	dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+		     " ib_thread_destroy(%d)\n", dapl_os_getpid());
+	/* 
+	 * wait for async thread to terminate. 
+	 * pthread_join would be the correct method
+	 * but some applications have some issues
+	 */
+
+	/* destroy ib_thread, wait for termination, if not already */
+	dapl_os_lock(&g_hca_lock);
+	if (g_ib_thread_state != IB_THREAD_RUN)
+		goto bail;
+
+	g_ib_thread_state = IB_THREAD_CANCEL;
+	if (dapls_thread_signal() == -1)
+		dapl_log(DAPL_DBG_TYPE_UTIL,
+			 " destroy: thread wakeup error = %s\n",
+			 strerror(errno));
+	while ((g_ib_thread_state != IB_THREAD_EXIT) && (retries--)) {
+		dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+			     " ib_thread_destroy: waiting for ib_thread\n");
+		if (dapls_thread_signal() == -1)
+			dapl_log(DAPL_DBG_TYPE_UTIL,
+				 " destroy: thread wakeup error = %s\n",
+				 strerror(errno));
+		dapl_os_unlock(&g_hca_lock);
+		dapl_os_sleep_usec(2000);
+		dapl_os_lock(&g_hca_lock);
+	}
+bail:
+	dapl_os_unlock(&g_hca_lock);
+
+	dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+		     " ib_thread_destroy(%d) exit\n", dapl_os_getpid());
+}
+
+
+#if defined(_WIN64) || defined(_WIN32)
+/* work thread for uAT, uCM, CQ, and async events */
+void dapli_thread(void *arg)
+{
+	struct _ib_hca_transport *hca;
+	struct _ib_hca_transport *uhca[8];
+	COMP_CHANNEL *channel;
+	int ret, idx, cnt;
+
+	dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " ib_thread(%d,0x%x): ENTER: \n",
+		     dapl_os_getpid(), g_ib_thread);
+
+	dapl_os_lock(&g_hca_lock);
+	for (g_ib_thread_state = IB_THREAD_RUN;
+	     g_ib_thread_state == IB_THREAD_RUN; 
+	     dapl_os_lock(&g_hca_lock)) {
+
+		idx = 0;
+		hca = dapl_llist_is_empty(&g_hca_list) ? NULL :
+		      dapl_llist_peek_head(&g_hca_list);
+
+		while (hca) {
+			uhca[idx++] = hca;
+			hca = dapl_llist_next_entry(&g_hca_list,
+						    (DAPL_LLIST_ENTRY *)
+						    &hca->entry);
+		}
+		cnt = idx;
+
+		dapl_os_unlock(&g_hca_lock);
+		ret = CompManagerPoll(windata.comp_mgr, INFINITE, &channel);
+
+		dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+			     " ib_thread(%d) poll_event 0x%x\n",
+			     dapl_os_getpid(), ret);
+
+
+		/* check and process ASYNC events, per device */
+		for (idx = 0; idx < cnt; idx++) {
+			if (uhca[idx]->destroy == 1) {
+				dapl_os_lock(&g_hca_lock);
+				dapl_llist_remove_entry(&g_hca_list,
+							(DAPL_LLIST_ENTRY *)
+							&uhca[idx]->entry);
+				dapl_os_unlock(&g_hca_lock);
+				uhca[idx]->destroy = 2;
+			} else {
+				dapli_cq_event_cb(uhca[idx]);
+				dapli_async_event_cb(uhca[idx]);
+			}
+		}
+	}
+
+	dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " ib_thread(%d) EXIT\n",
+		     dapl_os_getpid());
+	g_ib_thread_state = IB_THREAD_EXIT;
+	dapl_os_unlock(&g_hca_lock);
+}
+#else				// _WIN64 || WIN32
+
+/* work thread for uAT, uCM, CQ, and async events */
+void dapli_thread(void *arg)
+{
+	struct pollfd ufds[__FD_SETSIZE];
+	struct _ib_hca_transport *uhca[__FD_SETSIZE] = { NULL };
+	struct _ib_hca_transport *hca;
+	int ret, idx, fds;
+	char rbuf[2];
+
+	dapl_dbg_log(DAPL_DBG_TYPE_THREAD,
+		     " ib_thread(%d,0x%x): ENTER: pipe %d \n",
+		     dapl_os_getpid(), g_ib_thread, g_ib_pipe[0]);
+
+	/* Poll across pipe, CM, AT never changes */
+	dapl_os_lock(&g_hca_lock);
+	g_ib_thread_state = IB_THREAD_RUN;
+
+	ufds[0].fd = g_ib_pipe[0];	/* pipe */
+	ufds[0].events = POLLIN;
+
+	while (g_ib_thread_state == IB_THREAD_RUN) {
+
+		/* build ufds after pipe and uCMA events */
+		ufds[0].revents = 0;
+		idx = 0;
+
+		/*  Walk HCA list and setup async and CQ events */
+		if (!dapl_llist_is_empty(&g_hca_list))
+			hca = dapl_llist_peek_head(&g_hca_list);
+		else
+			hca = NULL;
+
+		while (hca) {
+
+			/* uASYNC events */
+			ufds[++idx].fd = hca->ib_ctx->async_fd;
+			ufds[idx].events = POLLIN;
+			ufds[idx].revents = 0;
+			uhca[idx] = hca;
+
+			/* CQ events are non-direct with CNO's */
+			ufds[++idx].fd = hca->ib_cq->fd;
+			ufds[idx].events = POLLIN;
+			ufds[idx].revents = 0;
+			uhca[idx] = hca;
+
+			dapl_dbg_log(DAPL_DBG_TYPE_THREAD,
+				     " ib_thread(%d) poll_fd: hca[%d]=%p,"
+				     " async=%d pipe=%d \n",
+				     dapl_os_getpid(), hca, ufds[idx - 1].fd,
+				     ufds[0].fd);
+
+			hca = dapl_llist_next_entry(&g_hca_list,
+						    (DAPL_LLIST_ENTRY *)
+						    &hca->entry);
+		}
+
+		/* unlock, and setup poll */
+		fds = idx + 1;
+		dapl_os_unlock(&g_hca_lock);
+		ret = poll(ufds, fds, -1);
+		if (ret <= 0) {
+			dapl_dbg_log(DAPL_DBG_TYPE_THREAD,
+				     " ib_thread(%d): ERR %s poll\n",
+				     dapl_os_getpid(), strerror(errno));
+			dapl_os_lock(&g_hca_lock);
+			continue;
+		}
+
+		dapl_dbg_log(DAPL_DBG_TYPE_THREAD,
+			     " ib_thread(%d) poll_event: "
+			     " async=0x%x pipe=0x%x \n",
+			     dapl_os_getpid(), ufds[idx].revents,
+			     ufds[0].revents);
+
+		/* check and process CQ and ASYNC events, per device */
+		for (idx = 1; idx < fds; idx++) {
+			if (ufds[idx].revents == POLLIN) {
+				dapli_cq_event_cb(uhca[idx]);
+				dapli_async_event_cb(uhca[idx]);
+			}
+		}
+
+		/* check and process user events, PIPE */
+		if (ufds[0].revents == POLLIN) {
+			if (read(g_ib_pipe[0], rbuf, 2) == -1)
+				dapl_log(DAPL_DBG_TYPE_THREAD,
+					 " cr_thread: pipe rd err= %s\n",
+					 strerror(errno));
+
+			/* cleanup any device on list marked for destroy */
+			for (idx = 1; idx < fds; idx++) {
+				if (uhca[idx] && uhca[idx]->destroy == 1) {
+					dapl_os_lock(&g_hca_lock);
+					dapl_llist_remove_entry(
+						&g_hca_list,
+						(DAPL_LLIST_ENTRY*)
+						&uhca[idx]->entry);
+					dapl_os_unlock(&g_hca_lock);
+					uhca[idx]->destroy = 2;
+				}
+			}
+		}
+		dapl_os_lock(&g_hca_lock);
+	}
+
+	dapl_dbg_log(DAPL_DBG_TYPE_THREAD, " ib_thread(%d) EXIT\n",
+		     dapl_os_getpid());
+	g_ib_thread_state = IB_THREAD_EXIT;
+	dapl_os_unlock(&g_hca_lock);
+}
+#endif
diff --git a/test/dtest/dtest.c b/test/dtest/dtest.c
index 77d78b2..739ccca 100755
--- a/test/dtest/dtest.c
+++ b/test/dtest/dtest.c
@@ -689,10 +689,9 @@ send_msg(void *data,
 				LOGPRINTF("%d cno wait return evd_handle=%p\n",
 					  getpid(), evd);
 				if (evd != h_dto_req_evd) {
-					fprintf(stderr,
-						"%d Error waiting on h_dto_cno: evd != h_dto_req_evd\n",
-						getpid());
-					return (DAT_ABORT);
+					/* CNO timeout, already on EVD */
+					if (evd != NULL)
+						return (ret);
 				}
 			}
 			/* use wait to dequeue */
@@ -1085,10 +1084,9 @@ DAT_RETURN connect_ep(char *hostname, DAT_CONN_QUAL conn_id)
 			LOGPRINTF("%d cno wait return evd_handle=%p\n",
 				  getpid(), evd);
 			if (evd != h_dto_rcv_evd) {
-				fprintf(stderr,
-					"%d Error waiting on h_dto_cno: evd != h_dto_rcv_evd\n",
-					getpid());
-				return (DAT_ABORT);
+				/* CNO timeout, already on EVD */
+				if (evd != NULL)
+					return (ret);
 			}
 		}
 		/* use wait to dequeue */
@@ -1319,10 +1317,9 @@ DAT_RETURN do_rdma_write_with_msg(void)
 			LOGPRINTF("%d cno wait return evd_handle=%p\n",
 				  getpid(), evd);
 			if (evd != h_dto_rcv_evd) {
-				fprintf(stderr,
-					"%d Error waiting on h_dto_cno: "
-					"evd != h_dto_rcv_evd\n", getpid());
-				return (ret);
+				/* CNO timeout, already on EVD */
+				if (evd != NULL)
+					return (ret);
 			}
 		}
 		/* use wait to dequeue */
@@ -1446,10 +1443,9 @@ DAT_RETURN do_rdma_read_with_msg(void)
 				LOGPRINTF("%d cno wait return evd_handle=%p\n",
 					  getpid(), evd);
 				if (evd != h_dto_req_evd) {
-					fprintf(stderr,
-						"%d Error waiting on h_dto_cno: evd != h_dto_req_evd\n",
-						getpid());
-					return (DAT_ABORT);
+					/* CNO timeout, already on EVD */
+					if (evd != NULL)
+						return (ret);
 				}
 			}
 			/* use wait to dequeue */
@@ -1501,6 +1497,15 @@ DAT_RETURN do_rdma_read_with_msg(void)
 	 */
 	printf("%d Sending RDMA read completion message\n", getpid());
 
+	/* give remote chance to process read completes */
+	if (use_cno) {
+#if defined(_WIN32) || defined(_WIN64)
+		Sleep(1000);
+#else
+		sleep(1);
+#endif
+	}
+
 	ret = send_msg(&rmr_send_msg,
 		       sizeof(DAT_RMR_TRIPLET),
 		       lmr_context_send_msg,
@@ -1525,14 +1530,14 @@ DAT_RETURN do_rdma_read_with_msg(void)
 		LOGPRINTF("%d waiting for message receive event\n", getpid());
 		if (use_cno) {
 			DAT_EVD_HANDLE evd = DAT_HANDLE_NULL;
-			ret = dat_cno_wait(h_dto_cno, DTO_TIMEOUT, &evd);
+			
+		        ret = dat_cno_wait(h_dto_cno, DTO_TIMEOUT, &evd);
 			LOGPRINTF("%d cno wait return evd_handle=%p\n",
 				  getpid(), evd);
 			if (evd != h_dto_rcv_evd) {
-				fprintf(stderr,
-					"%d Error waiting on h_dto_cno: evd != h_dto_rcv_evd\n",
-					getpid());
-				return (ret);
+				/* CNO timeout, already on EVD */
+				if (evd != NULL)
+					return (ret);
 			}
 		}
 		/* use wait to dequeue */
@@ -1693,10 +1698,9 @@ DAT_RETURN do_ping_pong_msg()
 				LOGPRINTF("%d cno wait return evd_handle=%p\n",
 					  getpid(), evd);
 				if (evd != h_dto_rcv_evd) {
-					fprintf(stderr,
-						"%d Error waiting on h_dto_cno: evd != h_dto_rcv_evd\n",
-						getpid());
-					return (ret);
+					/* CNO timeout, already on EVD */
+					if (evd != NULL)
+						return (ret);
 				}
 			}
 			/* use wait to dequeue */
-- 
1.5.2.5
