[openib-general] [PATCH] udapl provider

Arlin Davis ardavis at ichips.intel.com
Tue Apr 19 14:49:56 PDT 2005


Fixes for socket CM to prevent blocking and allow more uDAPL applications to run successfully.

Signed-off-by: Arlin Davis <ardavis at ichips.intel.com>

Index: udapl/Makefile
===================================================================
--- udapl/Makefile	(revision 2190)
+++ udapl/Makefile	(working copy)
@@ -122,7 +122,6 @@ endif
 #
 ifeq ($(VERBS),openib)
 PROVIDER = $(TOPDIR)/../openib
-DAPL_IBLIB_DIR = /usr/local/lib
 CFLAGS   += -DSOCKET_CM -DOPENIB -DCQ_WAIT_OBJECT
 CFLAGS   += -I/usr/local/include/infiniband
 endif
@@ -139,7 +138,7 @@ endif
 
 CFLAGS   += -I. 
 CFLAGS   += -I.. 
-CFLAGS   += -I../../dat/include 
+CFLAGS   += -I../dat/include 
 CFLAGS   += -I../include 
 
 CFLAGS   += -I$(PROVIDER)
@@ -234,8 +233,9 @@ PROVIDER_SRCS += dapl_openib_util.c dapl
 endif
 
 ifeq ($(VERBS),openib)
-LDFLAGS += -libverbs 
-LDFLAGS += -L /usr/local/lib/ 
+LDFLAGS += -libverbs /usr/local/lib/infiniband/mthca.so
+LDFLAGS += -rpath /usr/local/lib -L /usr/local/lib
+LDFLAGS += -rpath /usr/local/lib/infiniband 
 PROVIDER_SRCS  = dapl_ib_util.c dapl_ib_cq.c dapl_ib_qp.c 
 PROVIDER_SRCS += dapl_ib_cm.c dapl_ib_mem.c
 endif
Index: openib/TODO
===================================================================
--- openib/TODO	(revision 2190)
+++ openib/TODO	(working copy)
@@ -2,16 +2,17 @@
 IB Verbs:
 - CQ resize?
 - query call to get current qp state 
-- ibv_get_cq_event() blocks until event arrives. need timed event and wakeup
+- ibv_get_cq_event() needs timed event call and wakeup
 - query call to get device attributes
-- poll_cq return codes not exported
+- current implementation only supports one event per device
+- memory window support
 
 DAPL:
 - Build udapl issues with mthca having reverse dependencies to ibverbs
-- When CM arrives: change modify_qp_state RTS RTR calls
+- When real CM arrives: change modify_qp_state RTS RTR calls
 - reinit EP needs a QP timewait completion notification
-- disconnect clean 
-- add cq_object wakeup, time based cq_object wait
+- code disconnect clean 
+- add cq_object wakeup, time based cq_object wait when verbs support arrives
 - update uDAPL code with real CM and ATS support
 - etc, etc.
 
Index: openib/dapl_ib_util.c
===================================================================
--- openib/dapl_ib_util.c	(revision 2190)
+++ openib/dapl_ib_util.c	(working copy)
@@ -53,18 +53,12 @@ static const char rcsid[] = "$Id:  $";
 #include "dapl_adapter_util.h"
 #include "dapl_ib_util.h"
 
-#include <dlfcn.h>
 #include <stdlib.h>
 #include <netinet/tcp.h>
 #include <sys/utsname.h>
 #include <unistd.h>	
 #include <fcntl.h>
 
-/* set default path */
-#define OPENIB_VERBS_PATH_DEFAULT	"/usr/local/lib/libibverbs.so"
-static char *				ibv_path;
-static void *				ibv_handle = NULL;
-
 int g_dapl_loopback_connection = 0;
 
 #ifdef SOCKET_CM
@@ -110,32 +104,11 @@ DAT_RETURN getipaddr( char *addr, int ad
  */
 int32_t dapls_ib_init (void)
 {	
-	dapl_dbg_log(DAPL_DBG_TYPE_UTIL, "dapls_ib_init() called\n");
-
-	ibv_path = getenv("OPENIB_VERBS_PATH");
-	
-	if (ibv_path == NULL)
-		ibv_path = OPENIB_VERBS_PATH_DEFAULT;
-	
-	dapl_dbg_log(DAPL_DBG_TYPE_UTIL," loading verbs library %s\n",ibv_path);
-
-	ibv_handle = dlopen(ibv_path, RTLD_NOW | RTLD_GLOBAL);
-	if (ibv_handle == NULL ) {
-		dapl_dbg_log(DAPL_DBG_TYPE_ERR,
-			     " library load failure %s\n", dlerror());
-		return -1;
-	}
-
 	return 0;
 }
 
 int32_t dapls_ib_release (void)
 {
-	dapl_dbg_log (DAPL_DBG_TYPE_UTIL, "dapls_ib_release() called\n");
-	
-	if (ibv_handle)
-		dlclose(ibv_handle);
-
 	return 0;
 }
 
@@ -166,13 +139,6 @@ DAT_RETURN dapls_ib_open_hca (
 	dapl_dbg_log (DAPL_DBG_TYPE_UTIL, 
 		      " open_hca: %s - %p\n", hca_name, hca_ptr );
 
-	if (ibv_handle == NULL) {
-		dapl_dbg_log(DAPL_DBG_TYPE_ERR,
-			     " Failure loading IB verbs library %s \n", 
-			     ibv_path);
-		return DAT_PROVIDER_NOT_FOUND;
-	}
-
 	/* Get list of all IB devices, find match, open */
 	dev_list = ibv_get_devices();
 	dlist_start(dev_list);
@@ -201,6 +167,29 @@ DAT_RETURN dapls_ib_open_hca (
 	}
   
 #ifdef SOCKET_CM
+	/* initialize cr_list lock */
+	dat_status = dapl_os_lock_init(&hca_ptr->ib_trans.lock);
+	if (dat_status != DAT_SUCCESS)
+	{
+		dapl_dbg_log (DAPL_DBG_TYPE_ERR, 
+			" open_hca: failed to init lock\n");
+		return dat_status;
+	}
+
+	/* initialize CM list for listens on this HCA */
+	dapl_llist_init_head(&hca_ptr->ib_trans.list);
+
+	/* create thread to process inbound connect request */
+	dat_status = dapl_os_thread_create(cr_thread, 
+					   (void*)hca_ptr, 
+					   &hca_ptr->ib_trans.thread );
+	if (dat_status != DAT_SUCCESS)
+	{
+		dapl_dbg_log (DAPL_DBG_TYPE_ERR, 
+		" open_hca: failed to create thread\n");
+		return dat_status;
+	}
+
 	/* get the IP address of the device */
 	dat_status = getipaddr((char*)&hca_ptr->hca_address, 
 				sizeof(DAT_SOCK_ADDR6) );
@@ -243,6 +232,20 @@ DAT_RETURN dapls_ib_close_hca (	IN   DAP
 			return(dapl_convert_errno(errno,"ib_close_device"));
 		hca_ptr->ib_hca_handle = IB_INVALID_HANDLE;
 	}
+
+#ifdef SOCKET_CM
+	/* ask cr_thread to exit; it clears destroy when it is done */
+	hca_ptr->ib_trans.destroy = 1;
+	while (hca_ptr->ib_trans.destroy) {
+		struct timespec	delay, remain;
+		delay.tv_sec = 0;
+		delay.tv_nsec = 10000000; /* poll every 10 ms */
+		dapl_dbg_log(DAPL_DBG_TYPE_UTIL, 
+			     " close_hca: waiting for cr_thread\n");
+		nanosleep (&delay, &remain);
+	}
+	dapl_os_lock_destroy(&hca_ptr->ib_trans.lock);
+#endif
 	return (DAT_SUCCESS);
 }
   
@@ -297,18 +300,18 @@ DAT_RETURN dapls_ib_query_hca (
 			((struct sockaddr_in *)ia_attr->ia_address_ptr)->sin_addr.s_addr >> 24 & 0xff );
 
 	/* TODO: need verbs query call */
-	ia_attr->max_eps                  = 1000;
-        ia_attr->max_dto_per_ep           = 1000;
-        ia_attr->max_rdma_read_per_ep     = 4;
-        ia_attr->max_evds                 = 1000;
-        ia_attr->max_evd_qlen             = 1000;
-	ia_attr->max_iov_segments_per_dto = 10;
-        ia_attr->max_lmrs                 = 1000;
+	ia_attr->max_eps                  = 64000;
+        ia_attr->max_dto_per_ep           = 64000;
+        ia_attr->max_rdma_read_per_ep     = 8;
+        ia_attr->max_evds                 = 64000;
+        ia_attr->max_evd_qlen             = 64000;
+	ia_attr->max_iov_segments_per_dto = 32;
+        ia_attr->max_lmrs                 = 64000;
         ia_attr->max_lmr_block_size       = 0x80000000;
-        ia_attr->max_rmrs                 = 1000;
+        ia_attr->max_rmrs                 = 64000;
         ia_attr->max_lmr_virtual_address  = 0x80000000;
         ia_attr->max_rmr_target_address   = 0x80000000;
-        ia_attr->max_pzs                  = 1000;
+        ia_attr->max_pzs                  = 64000;
         ia_attr->max_mtu_size             = 0x80000000;
         ia_attr->max_rdma_size            = 0x80000000;
         ia_attr->num_transport_attr       = 0;
@@ -333,12 +336,12 @@ DAT_RETURN dapls_ib_query_hca (
 	if (ep_attr != NULL) {
 		ep_attr->max_mtu_size     = 0x80000000;
 		ep_attr->max_rdma_size    = 0x80000000;
-		ep_attr->max_recv_dtos    = 1000;
-		ep_attr->max_request_dtos = 1000;
-		ep_attr->max_recv_iov     = 10;
-		ep_attr->max_request_iov  = 10;
-		ep_attr->max_rdma_read_in = 4;
-		ep_attr->max_rdma_read_out= 4;
+		ep_attr->max_recv_dtos    = 64000;
+		ep_attr->max_request_dtos = 64000;
+		ep_attr->max_recv_iov     = 32;
+		ep_attr->max_request_iov  = 32;
+		ep_attr->max_rdma_read_in = 8;
+		ep_attr->max_rdma_read_out= 8;
 		dapl_dbg_log (DAPL_DBG_TYPE_UTIL, 
 			" query_hca: MAX msg %llu dto %d iov %d rdma i%d,o%d\n", 
 			ep_attr->max_mtu_size,
@@ -394,7 +397,7 @@ DAT_RETURN dapls_ib_setup_async_callback
 	    hca_ptr->async_cq_error = callback;
 	    break;
 	case DAPL_ASYNC_CQ_COMPLETION:
-	   hca_ptr->async_cq_completion = callback;
+	   hca_ptr->async_cq = callback;
 	   break;
 	case DAPL_ASYNC_QP_ERROR:
 	    hca_ptr->async_qp_error = callback;
Index: openib/dapl_ib_mem.c
===================================================================
--- openib/dapl_ib_mem.c	(revision 2190)
+++ openib/dapl_ib_mem.c	(working copy)
@@ -1,26 +1,25 @@
 /*
- * Copyright (c) 2002-2003, Network Appliance, Inc. All rights reserved.
- *
- * This Software is licensed under either one of the following two licenses:
+ * This Software is licensed under one of the following licenses:
  *
  * 1) under the terms of the "Common Public License 1.0" a copy of which is
- *    in the file LICENSE.txt in the root directory. The license is also
  *    available from the Open Source Initiative, see
  *    http://www.opensource.org/licenses/cpl.php.
- * OR
  *
- * 2) under the terms of the "The BSD License" a copy of which is in the file
- *    LICENSE2.txt in the root directory. The license is also available from
- *    the Open Source Initiative, see
+ * 2) under the terms of the "The BSD License" a copy of which is
+ *    available from the Open Source Initiative, see
  *    http://www.opensource.org/licenses/bsd-license.php.
  *
- * Licensee has the right to choose either one of the above two licenses.
+ * 3) under the terms of the "GNU General Public License (GPL) Version 2" a
+ *    copy of which is available from the Open Source Initiative, see
+ *    http://www.opensource.org/licenses/gpl-license.php.
+ *
+ * Licensee has the right to choose one of the above licenses.
  *
- * Redistributions of source code must retain both the above copyright
- * notice and either one of the license notices.
+ * Redistributions of source code must retain the above copyright
+ * notice and one of the license notices.
  *
  * Redistributions in binary form must reproduce both the above copyright
- * notice, either one of the license notices in the documentation
+ * notice, one of the license notices in the documentation
  * and/or other materials provided with the distribution.
  */
 
@@ -31,7 +30,7 @@
  * PURPOSE: Intel DET APIs: Memory windows, registration,
  *           and protection domain 
  *
- * $Id:$
+ * $Id: $
  *
  **********************************************************************/
 
@@ -182,8 +181,11 @@ dapls_ib_mr_register (
 			ia_ptr, lmr, virt_addr, length, privileges );
 
 	/* TODO: shared memory */
-	if (lmr->param.mem_type == DAT_MEM_TYPE_SHARED_VIRTUAL)        
+	if (lmr->param.mem_type == DAT_MEM_TYPE_SHARED_VIRTUAL) {
+		dapl_dbg_log( DAPL_DBG_TYPE_ERR,
+		     " mr_register_shared: NOT IMPLEMENTED\n");    
 		return DAT_ERROR (DAT_NOT_IMPLEMENTED, DAT_NO_SUBTYPE);  
+	}
 
 	/* local read is default on IB */ 
 	lmr->mr_handle = 
@@ -266,6 +268,7 @@ dapls_ib_mr_register_shared (
 	IN  DAPL_LMR		    *lmr,
 	IN  DAT_MEM_PRIV_FLAGS	privileges )
 {
+    dapl_dbg_log(DAPL_DBG_TYPE_ERR," mr_register_shared: NOT IMPLEMENTED\n");
     return DAT_ERROR (DAT_NOT_IMPLEMENTED, DAT_NO_SUBTYPE);  
 }
 
@@ -289,6 +292,8 @@ DAT_RETURN
 dapls_ib_mw_alloc (
 	IN  DAPL_RMR	*rmr )
 {
+
+	dapl_dbg_log(DAPL_DBG_TYPE_ERR," mw_alloc: NOT IMPLEMENTED\n");
    	return DAT_ERROR (DAT_NOT_IMPLEMENTED, DAT_NO_SUBTYPE);  
 }
 
@@ -312,6 +317,7 @@ DAT_RETURN
 dapls_ib_mw_free (
 	IN  DAPL_RMR 	*rmr )
 {	
+	dapl_dbg_log(DAPL_DBG_TYPE_ERR," mw_free: NOT IMPLEMENTED\n");
 	return DAT_ERROR (DAT_NOT_IMPLEMENTED, DAT_NO_SUBTYPE);  
 }
 
@@ -343,6 +349,7 @@ dapls_ib_mw_bind (
 	IN  DAT_MEM_PRIV_FLAGS		mem_priv,
 	IN  DAT_BOOLEAN			is_signaled)
 {
+	dapl_dbg_log(DAPL_DBG_TYPE_ERR," mw_bind: NOT IMPLEMENTED\n");
 	return DAT_ERROR (DAT_NOT_IMPLEMENTED, DAT_NO_SUBTYPE);  
 }
 
@@ -371,6 +378,7 @@ dapls_ib_mw_unbind (
 	IN  DAPL_COOKIE	*cookie,
 	IN  DAT_BOOLEAN	is_signaled )
 {
+	dapl_dbg_log(DAPL_DBG_TYPE_ERR," mw_unbind: NOT IMPLEMENTED\n");
 	return DAT_ERROR (DAT_NOT_IMPLEMENTED, DAT_NO_SUBTYPE);  
 }
 
Index: openib/dapl_ib_cm.c
===================================================================
--- openib/dapl_ib_cm.c	(revision 2190)
+++ openib/dapl_ib_cm.c	(working copy)
@@ -74,10 +74,12 @@ static DAT_RETURN dapli_socket_listen ( 
 					DAT_CONN_QUAL		serviceID,
 					DAPL_SP			*sp_ptr );
 
-static DAT_RETURN dapli_socket_accept(	DAPL_EP			*ep_ptr,
-					DAPL_CR			*cr_ptr,
-					DAT_COUNT		p_size,
-					DAT_PVOID		p_data );
+static DAT_RETURN dapli_socket_accept(	ib_cm_srvc_handle_t cm_ptr );
+
+static DAT_RETURN dapli_socket_accept_final(	DAPL_EP		*ep_ptr,
+						DAPL_CR		*cr_ptr,
+						DAT_COUNT	p_size,
+						DAT_PVOID	p_data );
 
 /* XXX temporary hack to get lid */
 static uint16_t dapli_get_lid(IN struct ibv_device *dev, IN int port)
@@ -114,6 +116,7 @@ dapli_socket_connect (	DAPL_EP			*ep_ptr
 	DAPL_IA		*ia_ptr = ep_ptr->header.owner_ia;
 	int		len, opt = 1;
 	struct iovec    iovec[2];
+	short		rtu_data = htons(0x0E0F);
 	
 	dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: r_qual %d\n", r_qual);
 			
@@ -133,7 +136,7 @@ dapli_socket_connect (	DAPL_EP			*ep_ptr
 		return DAT_INSUFFICIENT_RESOURCES;
 	}
 
-	((struct sockaddr_in*)r_addr)->sin_port = r_qual;
+	((struct sockaddr_in*)r_addr)->sin_port = htons(r_qual);
 
 	if ( connect(cm_ptr->socket, r_addr, sizeof(*r_addr)) < 0 ) {
 		dapl_dbg_log(DAPL_DBG_TYPE_ERR,
@@ -153,9 +156,11 @@ dapli_socket_connect (	DAPL_EP			*ep_ptr
 	cm_ptr->dst.p_size = p_size;
 	iovec[0].iov_base = &cm_ptr->dst;
 	iovec[0].iov_len  = sizeof(ib_qp_cm_t);
-	iovec[1].iov_base = p_data;
-	iovec[1].iov_len  = p_size;
-	len = writev( cm_ptr->socket, iovec, 2 );
+	if ( p_size ) {
+		iovec[1].iov_base = p_data;
+		iovec[1].iov_len  = p_size;
+	}
+	len = writev( cm_ptr->socket, iovec, (p_size ? 2:1) );
     	if ( len != (p_size + sizeof(ib_qp_cm_t)) ) {
 		dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
 			     " connect write: ERR %s, wcnt=%d\n",
@@ -190,8 +195,8 @@ dapli_socket_connect (	DAPL_EP			*ep_ptr
 
 	/* read private data into cm_handle if any present */
 	if ( cm_ptr->dst.p_size ) {
-		iovec[1].iov_base = cm_ptr->p_data;
-		iovec[1].iov_len  = cm_ptr->dst.p_size;
+		iovec[0].iov_base = cm_ptr->p_data;
+		iovec[0].iov_len  = cm_ptr->dst.p_size;
 		len = readv( cm_ptr->socket, iovec, 1 );
 		if ( len != cm_ptr->dst.p_size ) {
 			dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
@@ -213,10 +218,11 @@ dapli_socket_connect (	DAPL_EP			*ep_ptr
 	ep_ptr->qp_state = IB_QP_STATE_RTS;
 
 	/* complete handshake after final QP state change */
-	write(cm_ptr->socket, "QP_RTR_RTS", sizeof "QP_RTR_RTS");
+	write(cm_ptr->socket, &rtu_data, sizeof(rtu_data) );
 
 	/* init cm_handle and post the event with private data */
 	ep_ptr->cm_handle = cm_ptr;
+	dapl_dbg_log( DAPL_DBG_TYPE_EP," ACTIVE: connected!\n" ); 
 	dapl_evd_connection_callback(   ep_ptr->cm_handle, 
 					IB_CME_CONNECTED, 
 					cm_ptr->p_data, 
@@ -248,9 +254,7 @@ dapli_socket_listen (	DAPL_IA		*ia_ptr,
 {
 	struct sockaddr_in	addr;
 	ib_cm_srvc_handle_t	cm_ptr = NULL;
-	void			*p_data = NULL;
-	int			l_sock = -1;
-	int			len, opt = 1;
+	int			opt = 1;
 	DAT_RETURN		dat_status = DAT_SUCCESS;
 
 	dapl_dbg_log (	DAPL_DBG_TYPE_EP,
@@ -263,26 +267,30 @@ dapli_socket_listen (	DAPL_IA		*ia_ptr,
 
 	(void) dapl_os_memzero( cm_ptr, sizeof( *cm_ptr ) );
 	
-	cm_ptr->socket = -1;
+	cm_ptr->socket = cm_ptr->l_socket = -1;
 	cm_ptr->sp = sp_ptr;
 	cm_ptr->hca_ptr = ia_ptr->hca_ptr;
 	
 	/* bind, listen, set sockopt, accept, exchange data */
-	if ((l_sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+	if ((cm_ptr->l_socket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
 		dapl_dbg_log (DAPL_DBG_TYPE_ERR, 
 				"socket for listen returned %d\n", errno);
 		dat_status = DAT_INSUFFICIENT_RESOURCES;
 		goto bail;
 	}
 
-	setsockopt(l_sock,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
-	addr.sin_port        = serviceID;
+	setsockopt(cm_ptr->l_socket,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
+	addr.sin_port        = htons(serviceID);
 	addr.sin_family      = AF_INET;
 	addr.sin_addr.s_addr = INADDR_ANY;
 
-	if (( bind( l_sock,(struct sockaddr*)&addr, sizeof(addr) ) < 0) ||
-		   (listen( l_sock, 1 ) < 0) ) {
+	if (( bind( cm_ptr->l_socket,(struct sockaddr*)&addr, sizeof(addr) ) < 0) ||
+		   (listen( cm_ptr->l_socket, 128 ) < 0) ) {
 	
+		dapl_dbg_log( DAPL_DBG_TYPE_ERR,
+				" listen: ERROR %s on conn_qual 0x%x\n",
+				strerror(errno),serviceID); 
+
 		if ( errno == EADDRINUSE )
 			dat_status = DAT_CONN_QUAL_IN_USE;
 		else
@@ -290,109 +298,144 @@ dapli_socket_listen (	DAPL_IA		*ia_ptr,
 
 		goto bail;
 	}
+	
+	/* set cm_handle for this service point, save listen socket */
+	sp_ptr->cm_srvc_handle = cm_ptr;
 
-	/* block on the accept */
-        len = sizeof(cm_ptr->dst.ia_address);
-        cm_ptr->socket = accept(l_sock, 
-				(struct sockaddr*)&cm_ptr->dst.ia_address, 
+	/* add to SP->CR thread list */
+	dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&cm_ptr->entry);
+	dapl_os_lock( &cm_ptr->hca_ptr->ib_trans.lock );
+	dapl_llist_add_tail(&cm_ptr->hca_ptr->ib_trans.list, 
+			    (DAPL_LLIST_ENTRY*)&cm_ptr->entry, cm_ptr);
+	dapl_os_unlock(&cm_ptr->hca_ptr->ib_trans.lock);
+
+	dapl_dbg_log( DAPL_DBG_TYPE_CM,
+			" listen: qual 0x%x cr %p s_fd %d\n",
+			ntohs(serviceID), cm_ptr, cm_ptr->l_socket ); 
+	
+	return dat_status;
+bail:
+	dapl_dbg_log( DAPL_DBG_TYPE_ERR,
+			" listen: ERROR on conn_qual 0x%x\n",serviceID); 
+	if ( cm_ptr->l_socket >= 0 )
+		close( cm_ptr->l_socket );
+	dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
+	return dat_status;
+}
+
+
+/*
+ * PASSIVE: send local QP information, private data, and wait for 
+ *	    active side to respond with QP RTS/RTR status 
+ */
+static DAT_RETURN 
+dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
+{
+	ib_cm_handle_t	acm_ptr;
+	void		*p_data = NULL;
+	int		len;
+	DAT_RETURN	dat_status = DAT_SUCCESS;
+		
+	/* Allocate accept CM and initialize */
+	if ((acm_ptr = dapl_os_alloc(sizeof(*acm_ptr))) == NULL) 
+		return DAT_INSUFFICIENT_RESOURCES;
+
+	(void) dapl_os_memzero( acm_ptr, sizeof( *acm_ptr ) );
+	
+	acm_ptr->socket = -1;
+	acm_ptr->sp = cm_ptr->sp;
+	acm_ptr->hca_ptr = cm_ptr->hca_ptr;
+
+	len = sizeof(acm_ptr->dst.ia_address);
+	acm_ptr->socket = accept(cm_ptr->l_socket, 
+				(struct sockaddr*)&acm_ptr->dst.ia_address, 
 				&len );
 
-        if ( cm_ptr->socket < 0 ) {
+	if ( acm_ptr->socket < 0 ) {
 		dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-			     " listen accept: ERR %s\n",strerror(errno)); 
+			" accept: ERR %s on FD %d l_cr %p\n",
+			strerror(errno),cm_ptr->l_socket,cm_ptr); 
 		dat_status = DAT_INTERNAL_ERROR;
 		goto bail;
    	}
 
 	/* read in DST QP info, IA address. check for private data */
-	len = read( cm_ptr->socket, &cm_ptr->dst, sizeof(ib_qp_cm_t) );
+	len = read( acm_ptr->socket, &acm_ptr->dst, sizeof(ib_qp_cm_t) );
 	if ( len != sizeof(ib_qp_cm_t) ) {
 		dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-			     " listen read: ERR %s, rcnt=%d\n",
-			     strerror(errno), len); 
+			" accept read: ERR %s, rcnt=%d\n",
+			strerror(errno), len); 
 		dat_status = DAT_INTERNAL_ERROR;
 		goto bail;
 
 	}
-
 	dapl_dbg_log(DAPL_DBG_TYPE_EP, 
-		     " listen: DST port=0x%x lid=0x%x, qpn=0x%x, psize=%d\n",
-		     cm_ptr->dst.port, cm_ptr->dst.lid, 
-		     cm_ptr->dst.qpn, cm_ptr->dst.p_size ); 
+		" accept: DST port=0x%x lid=0x%x, qpn=0x%x, psize=%d\n",
+		acm_ptr->dst.port, acm_ptr->dst.lid, 
+		acm_ptr->dst.qpn, acm_ptr->dst.p_size ); 
 
 	/* validate private data size before reading */
-	if ( cm_ptr->dst.p_size > IB_MAX_REQ_PDATA_SIZE ) {
+	if ( acm_ptr->dst.p_size > IB_MAX_REQ_PDATA_SIZE ) {
 		dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-			     " listen read: psize (%d) wrong\n",
-			     cm_ptr->dst.p_size ); 
+			" accept read: psize (%d) wrong\n",
+			acm_ptr->dst.p_size ); 
 		dat_status = DAT_INTERNAL_ERROR;
 		goto bail;
 	}
 
 	/* read private data into cm_handle if any present */
-	if ( cm_ptr->dst.p_size ) {
-		len = read( cm_ptr->socket, 
-			    cm_ptr->p_data, 
-			    cm_ptr->dst.p_size );
-		if ( len != cm_ptr->dst.p_size ) {
+	if ( acm_ptr->dst.p_size ) {
+		len = read( acm_ptr->socket, 
+			    acm_ptr->p_data, acm_ptr->dst.p_size );
+		if ( len != acm_ptr->dst.p_size ) {
 			dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-				" listen read pdata: ERR %s, rcnt=%d\n",
+				" accept read pdata: ERR %s, rcnt=%d\n",
 				strerror(errno), len ); 
 			dat_status = DAT_INTERNAL_ERROR;
 			goto bail;
 		}
-		p_data = cm_ptr->p_data;
+		dapl_dbg_log(DAPL_DBG_TYPE_EP, 
+				" accept: psize=%d read\n",
+				acm_ptr->dst.p_size); 
+		p_data = acm_ptr->p_data;
 	}
-		
-	/* set cm_handle for this service point */
-	sp_ptr->cm_srvc_handle = cm_ptr;
 	
-	/* 
-	 * dapls_ib_accept_connection send QP information
-	 * and complete CM handshake
-	 */
-
 	/* trigger CR event and return SUCCESS */
-	dapls_cr_callback(  cm_ptr,
+	dapls_cr_callback(  acm_ptr,
 			    IB_CME_CONNECTION_REQUEST_PENDING,
 		            p_data,
-			    sp_ptr );
+			    acm_ptr->sp );
 
-	return dat_status;
+	return DAT_SUCCESS;
 
 bail:
-	if ( l_sock >= 0 )
-		close( l_sock );
-	if ( cm_ptr->socket >= 0 )
-		close( cm_ptr->socket );
-	if ( cm_ptr )
-    		dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
-
-	return dat_status;
+	if ( acm_ptr->socket >=0 )
+		close( acm_ptr->socket );
+	dapl_os_free( acm_ptr, sizeof( *acm_ptr ) );
+	return DAT_INTERNAL_ERROR;
 }
 
 
-
-/*
- * PASSIVE: send local QP information, private data, and wait for 
- *	    active side to respond with QP RTS/RTR status 
- */
 static DAT_RETURN 
-dapli_socket_accept( DAPL_EP		*ep_ptr,
-		     DAPL_CR		*cr_ptr,
-		     DAT_COUNT		p_size,
-		     DAT_PVOID		p_data )
+dapli_socket_accept_final( DAPL_EP		*ep_ptr,
+			   DAPL_CR		*cr_ptr,
+			   DAT_COUNT		p_size,
+		           DAT_PVOID		p_data )
 {
-	ib_cm_handle_t	cm_ptr = cr_ptr->ib_cm_handle;
 	DAPL_IA		*ia_ptr = ep_ptr->header.owner_ia;
+	ib_cm_handle_t	cm_ptr = cr_ptr->ib_cm_handle;
 	ib_qp_cm_t	qp_cm;
 	struct iovec    iovec[2];
 	int		len;
-	char		r_buf[10] = "XX_XXX_XXX";
+	short		rtu_data = 0;
 
 	if (p_size >  IB_MAX_REP_PDATA_SIZE) 
-		return (DAT_LENGTH_ERROR);
+		return DAT_LENGTH_ERROR;
 
+	/* must have a accepted socket */
+	if ( cm_ptr->socket < 0 )
+		return DAT_INTERNAL_ERROR;
+	
 	/* modify QP to RTR and then to RTS with remote info already read */
 	if ( dapls_modify_qp_state( ep_ptr->qp_handle, 
 				    IBV_QPS_RTR, &cm_ptr->dst ) != DAT_SUCCESS )
@@ -413,42 +456,42 @@ dapli_socket_accept( DAPL_EP		*ep_ptr,
 	qp_cm.p_size = p_size;
 	iovec[0].iov_base = &qp_cm;
 	iovec[0].iov_len  = sizeof(ib_qp_cm_t);
-	iovec[1].iov_base = p_data;
-	iovec[1].iov_len  = p_size;
-	len = writev( cm_ptr->socket, iovec, 2 );
+	if (p_size) {
+		iovec[1].iov_base = p_data;
+		iovec[1].iov_len  = p_size;
+	}
+	len = writev( cm_ptr->socket, iovec, (p_size ? 2:1) );
     	if (len != (p_size + sizeof(ib_qp_cm_t))) {
 		dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-			     " connect write: ERR %s, wcnt=%d\n",
+			     " accept_final: ERR %s, wcnt=%d\n",
 			     strerror(errno), len); 
 		goto bail;
 	}
 	dapl_dbg_log(DAPL_DBG_TYPE_EP, 
-		     " accept: SRC port=0x%x lid=0x%x, qpn=0x%x, psize=%d\n",
+		     " accept_final: SRC port=0x%x lid=0x%x, qpn=0x%x, psize=%d\n",
 		     qp_cm.port, qp_cm.lid, qp_cm.qpn, qp_cm.p_size ); 
-			 
+	
 	/* complete handshake after final QP state change */
-	len = read(cm_ptr->socket, r_buf, sizeof(r_buf) );
-	if ( len != sizeof(r_buf) ) {
+	len = read(cm_ptr->socket, &rtu_data, sizeof(rtu_data) );
+	if ( len != sizeof(rtu_data) || ntohs(rtu_data) != 0x0e0f ) {
 		dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-			     " accept: ERR %s, rcnt=%d\n",
-			     strerror(errno), len); 
+			     " accept_final: ERR %s, rcnt=%d rdata=%x\n",
+			     strerror(errno), len, ntohs(rtu_data) ); 
 		goto bail;
 	}
 
 	/* final data exchange if remote QP state is good to go */
-	dapl_dbg_log( DAPL_DBG_TYPE_EP," accept: %s \n", r_buf); 
-
-	dapls_cr_callback ( cm_ptr,
-    			    IB_CME_CONNECTED,
-    			    NULL,
-			    cm_ptr->sp );
-
+	dapl_dbg_log( DAPL_DBG_TYPE_EP," PASSIVE: connected!\n" ); 
+	dapls_cr_callback ( cm_ptr, IB_CME_CONNECTED, NULL, cm_ptr->sp );
 	return DAT_SUCCESS;
 
 bail:
-	dapl_dbg_log( DAPL_DBG_TYPE_ERR," accept: ERR !QP_RTR_RTS \n"); 
-	close( cm_ptr->socket );
+	dapl_dbg_log( DAPL_DBG_TYPE_ERR," accept_final: ERR !QP_RTR_RTS \n"); 
+	if ( cm_ptr->socket >= 0 )
+		close( cm_ptr->socket );
+	dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
 	dapls_ib_reinit_ep( ep_ptr ); /* reset QP state */
+	/* NOTE(review): cr_ptr->ib_cm_handle still points at the freed cm_ptr */
 	return DAT_INTERNAL_ERROR;
 }
 
@@ -482,7 +525,7 @@ dapls_ib_connect (
 	IN  DAT_IA_ADDRESS_PTR		remote_ia_address,
 	IN  DAT_CONN_QUAL		remote_conn_qual,
 	IN  DAT_COUNT			private_data_size,
-	IN  DAT_PVOID			private_data )
+	IN  void			*private_data )
 {
 	DAPL_EP		*ep_ptr;
 	ib_qp_handle_t	qp_ptr;
@@ -545,18 +588,19 @@ dapls_ib_disconnect (
 	dapls_ib_reinit_ep(ep_ptr);
 
 #endif
-	
-	if ( ep_ptr->cr_ptr )	
+	if ( ep_ptr->cr_ptr ) {
 		dapls_cr_callback ( ep_ptr->cm_handle,
 				    IB_CME_DISCONNECTED,
 				    NULL,
 				    ((DAPL_CR *)ep_ptr->cr_ptr)->sp_ptr );
-	else
+	} else {
 		dapl_evd_connection_callback ( ep_ptr->cm_handle,
 						IB_CME_DISCONNECTED,
 						NULL,
 						ep_ptr );
-
+		ep_ptr->cm_handle = NULL;
+		dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
+	}	
 	return DAT_SUCCESS;
 }
 
@@ -584,7 +628,6 @@ dapls_ib_disconnect_clean (
 	IN  DAT_BOOLEAN			active,
 	IN  const ib_cm_events_t	ib_cm_event )
 {
-    
     return;
 }
 
@@ -644,25 +687,22 @@ dapls_ib_remove_conn_listener (
 	IN  DAPL_IA		*ia_ptr,
 	IN  DAPL_SP		*sp_ptr )
 {
-
 	ib_cm_srvc_handle_t	cm_ptr = sp_ptr->cm_srvc_handle;
 
 	dapl_dbg_log (DAPL_DBG_TYPE_EP,
 			"dapls_ib_remove_conn_listener(ia_ptr %p sp_ptr %p cm_ptr %p)\n",
 			ia_ptr, sp_ptr, cm_ptr );
 #ifdef SOCKET_CM
-
 	/* close accepted socket, free cm_srvc_handle and return */
 	if ( cm_ptr != NULL ) {
-		if ( cm_ptr->socket > 0 ) {
-			close( cm_ptr->socket );
-			cm_ptr->socket = 0;
+		if ( cm_ptr->l_socket >= 0 ) {
+			close( cm_ptr->l_socket );
+			cm_ptr->l_socket = -1;
 		}
-	    	dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
+	    	/* l_socket == -1 tells cr_thread to unlink and free cm_ptr */
 		sp_ptr->cm_srvc_handle = NULL;
 	}
 	return DAT_SUCCESS;
-
 #else
 	return DAT_NOT_IMPLEMENTED;   
 
@@ -717,7 +757,7 @@ dapls_ib_accept_connection (
 	}
     
 #ifdef SOCKET_CM
-	return ( dapli_socket_accept(ep_ptr, cr_ptr, p_size, p_data) );
+	return ( dapli_socket_accept_final(ep_ptr, cr_ptr, p_size, p_data) );
 #else
 	return DAT_NOT_IMPLEMENTED;   
 #endif
@@ -756,13 +796,13 @@ dapls_ib_reject_connection (
 	/* just close the socket and return */
 	if ( cm_ptr->socket > 0 ) {
 		close( cm_ptr->socket );
-		cm_ptr->socket = 0;
+		cm_ptr->socket = -1;
 	}
-	
 	return DAT_SUCCESS;
-
+#else
+	return DAT_NOT_IMPLEMENTED;   
 #endif
-	return DAT_SUCCESS;
+	
 
 }
 
@@ -984,6 +1024,97 @@ dapls_ib_get_cm_event (
     return ib_cm_event;
 }
 
+/*
+ * cr_thread: async CR processing thread to avoid blocking applications.
+ * Polls every listen socket on hca_ptr->ib_trans.list, passes readable
+ * ones to dapli_socket_accept(), and frees entries whose l_socket is -1.
+ * Runs until ib_trans.destroy is set; on exit it closes and frees any
+ * remaining entries, then clears destroy to ack dapls_ib_close_hca().
+ */
+void cr_thread(void *arg) 
+{
+    struct dapl_hca	*hca_ptr = arg;
+    ib_cm_srvc_handle_t	cr, next_cr;
+    int			max_fd;
+    fd_set		rfd,rfds;
+    struct timeval	to;
+     
+    dapl_os_lock( &hca_ptr->ib_trans.lock );
+    while ( !hca_ptr->ib_trans.destroy ) {
+	
+	FD_ZERO( &rfds ); 
+	max_fd = -1;
+	
+	if (!dapl_llist_is_empty(&hca_ptr->ib_trans.list))
+            next_cr = dapl_llist_peek_head (&hca_ptr->ib_trans.list);
+	else
+	    next_cr = NULL;
+
+	while (next_cr) {
+	    cr = next_cr;
+	    dapl_dbg_log (DAPL_DBG_TYPE_CM," thread: cm_ptr %p\n", cr );
+	    if (cr->l_socket == -1 || hca_ptr->ib_trans.destroy) {
+
+		dapl_dbg_log(DAPL_DBG_TYPE_CM," thread: Freeing %p\n", cr);
+		next_cr = dapl_llist_next_entry(&hca_ptr->ib_trans.list,
+						(DAPL_LLIST_ENTRY*)&cr->entry );
+		dapl_llist_remove_entry(&hca_ptr->ib_trans.list, 
+					(DAPL_LLIST_ENTRY*)&cr->entry);
+		if (cr->l_socket >= 0) /* still open when freed for destroy */
+		    close(cr->l_socket);
+		dapl_os_free( cr, sizeof(*cr) );
+		continue;
+	    }
+	          
+	    FD_SET( cr->l_socket, &rfds ); /* add to select set */
+	    if ( cr->l_socket > max_fd )
+		max_fd = cr->l_socket;
+
+	    /* individual select poll to check for work */
+	    FD_ZERO(&rfd);
+	    FD_SET(cr->l_socket, &rfd);
+	    dapl_os_unlock(&hca_ptr->ib_trans.lock);	
+	    to.tv_sec  = 0;
+	    to.tv_usec = 0;
+	    if ( select(cr->l_socket + 1,&rfd, NULL, NULL, &to) < 0) {
+		dapl_dbg_log (DAPL_DBG_TYPE_CM,
+			  " thread: ERR %s on cr %p sk %d\n", 
+			  strerror(errno), cr, cr->l_socket);
+		close(cr->l_socket);
+		cr->l_socket = -1;
+	    } else if ( FD_ISSET(cr->l_socket, &rfd) && 
+			dapli_socket_accept(cr)) {
+		/* NOTE(review): any accept failure, including a bad peer
+		 * handshake, closes this listener for good */
+		close(cr->l_socket);
+		cr->l_socket = -1;
+	    }
+	    dapl_os_lock( &hca_ptr->ib_trans.lock );
+	    next_cr =  dapl_llist_next_entry(&hca_ptr->ib_trans.list,
+					     (DAPL_LLIST_ENTRY*)&cr->entry );
+	} 
+	dapl_os_unlock( &hca_ptr->ib_trans.lock );
+	to.tv_sec  = 0;
+	to.tv_usec = 500000; /* wakeup and check destroy */
+	select(max_fd + 1, &rfds, NULL, NULL, &to);
+	dapl_os_lock( &hca_ptr->ib_trans.lock );
+    } 
+
+    /* the loop above exits without another pass over the list, so
+     * close and free every listener still queued before acking destroy */
+    while (!dapl_llist_is_empty(&hca_ptr->ib_trans.list)) {
+	cr = dapl_llist_peek_head(&hca_ptr->ib_trans.list);
+	dapl_llist_remove_entry(&hca_ptr->ib_trans.list, 
+				(DAPL_LLIST_ENTRY*)&cr->entry);
+	if (cr->l_socket >= 0)
+	    close(cr->l_socket);
+	dapl_os_free( cr, sizeof(*cr) );
+    }
+    dapl_os_unlock( &hca_ptr->ib_trans.lock );	
+    hca_ptr->ib_trans.destroy = 0;
+    dapl_dbg_log(DAPL_DBG_TYPE_CM," thread(hca %p) exit\n",hca_ptr);
+}
+
 /* Real IBv CM */
 #else 
 
Index: openib/dapl_ib_qp.c
===================================================================
--- openib/dapl_ib_qp.c	(revision 2190)
+++ openib/dapl_ib_qp.c	(working copy)
@@ -1,26 +1,25 @@
 /*
- * Copyright (c) 2002-2003, Network Appliance, Inc. All rights reserved.
- *
- * This Software is licensed under either one of the following two licenses:
+ * This Software is licensed under one of the following licenses:
  *
  * 1) under the terms of the "Common Public License 1.0" a copy of which is
- *    in the file LICENSE.txt in the root directory. The license is also
  *    available from the Open Source Initiative, see
  *    http://www.opensource.org/licenses/cpl.php.
- * OR
  *
- * 2) under the terms of the "The BSD License" a copy of which is in the file
- *    LICENSE2.txt in the root directory. The license is also available from
- *    the Open Source Initiative, see
+ * 2) under the terms of the "The BSD License" a copy of which is
+ *    available from the Open Source Initiative, see
  *    http://www.opensource.org/licenses/bsd-license.php.
  *
- * Licensee has the right to choose either one of the above two licenses.
+ * 3) under the terms of the "GNU General Public License (GPL) Version 2" a
+ *    copy of which is available from the Open Source Initiative, see
+ *    http://www.opensource.org/licenses/gpl-license.php.
+ *
+ * Licensee has the right to choose one of the above licenses.
  *
- * Redistributions of source code must retain both the above copyright
- * notice and either one of the license notices.
+ * Redistributions of source code must retain the above copyright
+ * notice and one of the license notices.
  *
  * Redistributions in binary form must reproduce both the above copyright
- * notice, either one of the license notices in the documentation
+ * notice, one of the license notices in the documentation
  * and/or other materials provided with the distribution.
  */
 
@@ -30,7 +29,7 @@
  *
  * PURPOSE: QP routines for access to DET Verbs
  *
- * $Id:$
+ * $Id: $
  **********************************************************************/
 
 #include "dapl.h"
@@ -311,7 +310,7 @@ dapls_modify_qp_state ( IN ib_qp_handle_
 			qp_attr.path_mtu 		= IBV_MTU_1024;
 			qp_attr.dest_qp_num 		= qp_cm->qpn;
 			qp_attr.rq_psn 			= 1;
-			qp_attr.max_dest_rd_atomic	= 1;
+			qp_attr.max_dest_rd_atomic	= 8;
 			qp_attr.min_rnr_timer		= 12;
 			qp_attr.ah_attr.is_global	= 0;
 			qp_attr.ah_attr.dlid		= qp_cm->lid;
@@ -338,7 +337,7 @@ dapls_modify_qp_state ( IN ib_qp_handle_
 			qp_attr.retry_cnt	= 7;
 			qp_attr.rnr_retry	= 7;
 			qp_attr.sq_psn		= 1;
-			qp_attr.max_rd_atomic	= 1;
+			qp_attr.max_rd_atomic	= 8;
 			dapl_dbg_log (DAPL_DBG_TYPE_EP,
 			      " modify_qp_rts: psn %x or %x\n",
 			      qp_attr.sq_psn, qp_attr.max_rd_atomic );
Index: openib/README
===================================================================
--- openib/README	(revision 2190)
+++ openib/README	(working copy)
@@ -41,8 +41,8 @@ A simple dapl test just for initial open
 
 known issues:
 
-	early drop, good luck! Only tested with a simple dtest.
-	see TODO for more details
-	events not working?? 
+	Early drop: only tested with the simple dtest and dapltest SR.
+	ibverbs has no memory window support, so dat_rmr_create fails.
+	
 
 
Index: openib/dapl_ib_util.h
===================================================================
--- openib/dapl_ib_util.h	(revision 2190)
+++ openib/dapl_ib_util.h	(working copy)
@@ -79,10 +79,23 @@ typedef struct _ib_qp_cm
 
 } ib_qp_cm_t;
 
-/* EP->cm_handle for connect, SP->cm_srvc_handle for listen */
+/* 
+ * dapl_llist_entry is defined in dapl.h, but dapl.h needs the provider
+ * typedefs in this file first. TODO: move dapl_llist_entry out of dapl.h.
+ */
+struct ib_llist_entry
+{
+    struct dapl_llist_entry	*flink;
+    struct dapl_llist_entry	*blink;
+    void			*data;
+    struct dapl_llist_entry	*list_head;
+};
+
 struct ib_cm_handle
 { 
-	int			socket; 
+	struct ib_llist_entry	entry;
+	int			socket;
+	int			l_socket; 
 	struct dapl_hca		*hca_ptr;
 	DAT_HANDLE		cr;
 	DAT_HANDLE		sp;	
@@ -112,6 +125,9 @@ typedef enum 
 
 } ib_cm_events_t;
 
+/* prototype for cm thread */
+void cr_thread (void *arg);
+
 #else
 
 /* TODO: Waiting for IB CM to define */
@@ -205,11 +221,6 @@ typedef struct dapl_evd		*ib_wait_obj_ha
  * ibv_post_recv - Return 0, -1 & bad_wr 
  */
 
-/* definitions from libmthca/src/cq.c, should be in verbs.h */
-#define IB_CQ_OK	0
-#define IB_CQ_EMPTY	-1
-#define IB_POLL_ERR	-2
-
 /* async handler for CQ, QP, and unafiliated */
 typedef void (*ib_async_handler_t)(
     IN    ib_hca_handle_t    ib_hca_handle,
@@ -221,11 +232,18 @@ typedef struct _ib_hca_transport
 { 
 	struct	ibv_device	*ib_dev;
 	ib_cq_handle_t		ib_cq_empty;
+
+#if SOCKET_CM
+	int			destroy;
+	DAPL_OS_THREAD		thread;
+	DAPL_OS_LOCK		lock;	
+	struct dapl_llist_entry	*list;	
+#endif
 	ib_async_handler_t	async_unafiliated;
 	ib_async_handler_t	async_cq_error;
-	ib_async_handler_t	async_cq_completion;
+	ib_async_handler_t	async_cq;
 	ib_async_handler_t	async_qp_error;
-	
+
 } ib_hca_tranport_t;
 
 /* provider specfic fields for shared memory support */
Index: openib/dapl_ib_cq.c
===================================================================
--- openib/dapl_ib_cq.c	(revision 2190)
+++ openib/dapl_ib_cq.c	(working copy)
@@ -382,10 +382,10 @@ DAT_RETURN dapls_ib_completion_notify (
  * Output:
  * 	none
  *
- * Returns:
+ * Returns: 
  * 	DAT_SUCCESS
  *	DAT_QUEUE_EMPTY
- *	dapl_convert_errno
+ *	
  */
 DAT_RETURN dapls_ib_completion_poll (
 	IN  DAPL_HCA			*hca_ptr,
@@ -393,15 +393,12 @@ DAT_RETURN dapls_ib_completion_poll (
 	IN  ib_work_completion_t	*wc_ptr)
 {
 	int	ret;
-    	
-	ret = ibv_poll_cq(evd_ptr->ib_cq_handle, 1, wc_ptr);
+
+    	ret = ibv_poll_cq(evd_ptr->ib_cq_handle, 1, wc_ptr);
 	if (ret == 1) 
 		return	DAT_SUCCESS;
-	else if ((ret == IB_CQ_OK) || (ret == IB_CQ_EMPTY)) 
-		return	DAT_QUEUE_EMPTY;
-	else 
-		return(dapl_convert_errno(EFAULT,"poll_cq"));;
-
+	
+	return	DAT_QUEUE_EMPTY;
 }
 
 #ifdef CQ_WAIT_OBJECT
@@ -447,24 +444,45 @@ dapls_ib_wait_object_wait (
 	IN ib_wait_obj_handle_t	    p_cq_wait_obj_handle,
 	IN u_int32_t 		    timeout)
 {
-	int status;
-	ib_cq_handle_t	cq = p_cq_wait_obj_handle->ib_cq_handle;
-	struct ibv_cq	*ibv_cq;
-	void		*ibv_ctx;
+	DAPL_EVD		*evd_ptr = p_cq_wait_obj_handle;
+	ib_cq_handle_t		cq = evd_ptr->ib_cq_handle;
+	struct ibv_cq		*ibv_cq;
+	void			*ibv_ctx;
+	int			status = EINVAL; /* invalid handle */
 
 	dapl_dbg_log ( DAPL_DBG_TYPE_UTIL, 
 			" cq_object_wait: dev %p evd %p cq %p, time %d\n", 
-			cq->context, p_cq_wait_obj_handle, cq, timeout );
+			cq->context, evd_ptr, cq, timeout );
 
-	/* will block forever, only 1 per device for now?? */
 	/* TODO: add timeout, map each CQ created?? */
-	if (cq) {
-		status = ibv_get_cq_event(cq->context, 0, &ibv_cq, &ibv_ctx);
-		if (!status && (ibv_cq == cq)) 
-			return DAT_SUCCESS;
+	/* Multiple EVD's sharing one event handle for now */
+	while (evd_ptr->ib_cq_handle) {
+		
+		status = ibv_get_cq_event(cq->context, 
+					  0, &ibv_cq, &ibv_ctx);
+		if (status) 
+			break;
+
+		/* EVD mismatch, process DTO callback for this EVD */
+		if (ibv_cq != cq) {
+			ib_hca_tranport_t *hca_ptr = 
+				&evd_ptr->header.owner_ia->hca_ptr->ib_trans;
+							
+			if ( hca_ptr->async_cq ) 
+				hca_ptr->async_cq(cq->context,
+						  (ib_error_record_t*)ibv_cq,
+						  ibv_ctx);
+			
+			continue;
+		} 
+		break;
 	}
 	
-	return(dapl_convert_errno(EFAULT,"cq_wait_object_wait"));
+	dapl_dbg_log (DAPL_DBG_TYPE_UTIL, 
+		      " cq_object_wait: RET cq %p ibv_cq %p ibv_ctx %p %x\n",
+		      cq,ibv_cq,ibv_ctx,status);
+	
+	return(dapl_convert_errno(status,"cq_wait_object_wait"));
 	
 }
 #endif





More information about the general mailing list