[openib-general] [PATCH] uDAPL with uCM and uAT support - version 2

Arlin Davis arlin.r.davis at intel.com
Mon Aug 1 14:59:01 PDT 2005


James,

Here is version 2 with the changes you requested. The README has also been updated per Hal's comments.

 dapl/udapl/dapl_evd_wait.c
 dapl/udapl/Makefile
 dapl/common/dapl_evd_resize.c
 dapl/openib/TODO
 dapl/openib/dapl_ib_util.c
 dapl/openib/dapl_ib_cm.c
 dapl/openib/dapl_ib_util.h
 dapl/openib/README
 dapl/openib/dapl_ib_cq.c

Signed-off-by: Arlin Davis <ardavis at ichips.intel.com>

Index: dapl/udapl/dapl_evd_wait.c
===================================================================
--- dapl/udapl/dapl_evd_wait.c	(revision 2919)
+++ dapl/udapl/dapl_evd_wait.c	(working copy)
@@ -74,9 +74,10 @@
     DAPL_EVD		*evd_ptr;
     DAT_RETURN		dat_status;
     DAT_EVENT		*local_event;
-    DAT_BOOLEAN		notify_requested = DAT_FALSE;
+    DAT_BOOLEAN		notify_needed = DAT_FALSE;
     DAT_BOOLEAN		waitable;
     DAPL_EVD_STATE	evd_state;
+    DAT_COUNT		total_events,new_events;
 
     dapl_dbg_log (DAPL_DBG_TYPE_API,
 		  "dapl_evd_wait (%p, %d, %d, %p, %p)\n", 
@@ -124,9 +125,9 @@
     }
 
     dapl_dbg_log (DAPL_DBG_TYPE_EVD, 
-	          "dapl_evd_wait: EVD %p, CQ %p\n", 
-                  evd_ptr,
-		  (void *)evd_ptr->ib_cq_handle);
+	          "dapl_evd_wait: EVD %p, CQ %p, Timeout %d, Threshold %d\n", 
+              evd_ptr,(void *)evd_ptr->ib_cq_handle, time_out, threshold);
+  
 
     /*
      * Make sure there are no other waiters and the evd is active.
@@ -144,11 +145,10 @@
     evd_state = dapl_os_atomic_assign ( (DAPL_ATOMIC *)&evd_ptr->evd_state,
 					(DAT_COUNT) DAPL_EVD_STATE_OPEN,
 					(DAT_COUNT) DAPL_EVD_STATE_WAITED );
-    dapl_os_unlock ( &evd_ptr->header.lock );
 
-    if ( evd_state != DAPL_EVD_STATE_OPEN )
+    dapl_os_unlock ( &evd_ptr->header.lock );
+    if ( evd_state != DAPL_EVD_STATE_OPEN || !waitable)
     {
-	/* Bogus state, bail out */
 	dat_status = DAT_ERROR (DAT_INVALID_STATE,0);
 	goto bail;
     }
@@ -182,37 +182,54 @@
 	 * return right away if the ib_cq_handle associate with these evd
 	 * equal to IB_INVALID_HANDLE
 	 */
-	dapls_evd_copy_cq(evd_ptr);
-
-	if (dapls_rbuf_count(&evd_ptr->pending_event_queue) >= threshold)
-	{
-	    break;
-	}
-
-	/*
-	 * Do not enable the completion notification if this evd is not 
-	 * a DTO_EVD or RMR_BIND_EVD
+	/* Logic to prevent missing completion between copy_cq (poll)
+	 * and completion_notify (re-arm)  
 	 */
-	if ( (!notify_requested) &&
-             ((evd_ptr->evd_flags & DAT_EVD_DTO_FLAG) ||
-              (evd_ptr->evd_flags & DAT_EVD_RMR_BIND_FLAG)) )
+	notify_needed = DAT_TRUE;
+	new_events = 0;
+	while (DAT_TRUE)
 	{
-	    dat_status = dapls_ib_completion_notify (
-		evd_ptr->header.owner_ia->hca_ptr->ib_hca_handle,
-		evd_ptr,
-		(evd_ptr->completion_type == DAPL_EVD_STATE_SOLICITED_WAIT) ?
-		     IB_NOTIFY_ON_SOLIC_COMP : IB_NOTIFY_ON_NEXT_COMP );  
-
-	    DAPL_CNTR(DCNT_EVD_WAIT_CMP_NTFY);
-	    /* FIXME report error */
-	    dapl_os_assert(dat_status == DAT_SUCCESS);
+		dapls_evd_copy_cq(evd_ptr); /* poll for new completions */
+		total_events = dapls_rbuf_count (&evd_ptr->pending_event_queue); 
+		new_events = total_events - new_events;
+		if (total_events >= threshold ||
+			(!new_events && notify_needed == DAT_FALSE))
+		{
+			break;
+		} 
+											
+		/*
+		 * Do not enable the completion notification if this evd is not 
+		 * a DTO_EVD or RMR_BIND_EVD
+		 */
+		if ( (evd_ptr->evd_flags & DAT_EVD_DTO_FLAG) ||
+			(evd_ptr->evd_flags & DAT_EVD_RMR_BIND_FLAG) )
+		{
+			dat_status = dapls_ib_completion_notify (
+					evd_ptr->header.owner_ia->hca_ptr->ib_hca_handle,
+					evd_ptr,
+					(evd_ptr->completion_type == DAPL_EVD_STATE_SOLICITED_WAIT) ?
+		     			IB_NOTIFY_ON_SOLIC_COMP : IB_NOTIFY_ON_NEXT_COMP );  
+
+			DAPL_CNTR(DCNT_EVD_WAIT_CMP_NTFY);
+			notify_needed = DAT_FALSE;
+			new_events = total_events;
+			
+			/* FIXME report error */
+			dapl_os_assert(dat_status == DAT_SUCCESS);
+		} 
+		else 
+		{
+			break;
+		}
 
-	    notify_requested = DAT_TRUE;
+	} /* while completions < threshold, and rearm needed */
 
-	    /* Try again.  */
-	    continue;
+	if (total_events >= threshold)
+	{
+		break;
 	}
-
+	
 
 	/*
 	 * Unused by poster; it has no way to tell how many
@@ -232,8 +249,6 @@
 #endif
 		dat_status = dapl_os_wait_object_wait (
 				&evd_ptr->wait_object, time_out );
-	
-	notify_requested = DAT_FALSE; /* We've used it up.  */
 
 	/* See if we were awakened by evd_set_unwaitable */
 	if ( !evd_ptr->evd_waitable )
@@ -243,13 +258,22 @@
 
 	if (dat_status != DAT_SUCCESS)
 	{
-	    /*
-	     * If the status is DAT_TIMEOUT, we'll break out of the
-	     * loop, *not* dequeue an event (because dat_status
-	     * != DAT_SUCCESS), set *nmore (as we should for timeout)
-	     * and return DAT_TIMEOUT.
-	     */
-	    break;
+		/*
+		 * If the status is DAT_TIMEOUT, we'll break out of the
+		 * loop, *not* dequeue an event (because dat_status
+		 * != DAT_SUCCESS), set *nmore (as we should for timeout)
+		 * and return DAT_TIMEOUT.
+		 */
+
+#if defined(DAPL_DBG)
+		dapls_evd_copy_cq(evd_ptr); /* poll */
+		dapl_dbg_log (DAPL_DBG_TYPE_EVD, 
+			"dapl_evd_wait: WAKEUP ERROR (0x%x): EVD %p, CQ %p, events? %d\n", 
+			dat_status,evd_ptr,(void *)evd_ptr->ib_cq_handle, 
+			dapls_rbuf_count(&evd_ptr->pending_event_queue) );
+#endif /* DAPL_DBG */
+
+		break;
 	}
     }
 	    
Index: dapl/udapl/Makefile
===================================================================
--- dapl/udapl/Makefile	(revision 2941)
+++ dapl/udapl/Makefile	(working copy)
@@ -122,7 +122,8 @@
 #
 ifeq ($(VERBS),openib)
 PROVIDER = $(TOPDIR)/../openib
-CFLAGS   += -DOPENIB -DCQ_WAIT_OBJECT
+CFLAGS   += -DOPENIB 
+#CFLAGS   += -DCQ_WAIT_OBJECT uncomment when fixed
 CFLAGS   += -I/usr/local/include/infiniband
 endif
 
Index: dapl/common/dapl_evd_resize.c
===================================================================
--- dapl/common/dapl_evd_resize.c	(revision 2919)
+++ dapl/common/dapl_evd_resize.c	(working copy)
@@ -67,71 +67,139 @@
 	IN	DAT_EVD_HANDLE	   evd_handle,
 	IN	DAT_COUNT	   evd_qlen )
 {
-    DAPL_IA		*ia_ptr;
-    DAPL_EVD		*evd_ptr;
-    DAT_COUNT   	pend_cnt;
-    DAT_RETURN		dat_status;
+    DAPL_IA          *ia_ptr;
+    DAPL_EVD         *evd_ptr;
+    DAT_EVENT        *event_ptr;
+    DAT_EVENT        *events;
+    DAT_EVENT        *orig_event;
+    DAPL_RING_BUFFER free_event_queue;
+    DAPL_RING_BUFFER pending_event_queue;
+    DAT_COUNT        pend_cnt;
+    DAT_COUNT        i;
+    DAT_RETURN       dat_status;
 
     dapl_dbg_log (DAPL_DBG_TYPE_API, "dapl_evd_resize (%p, %d)\n",
 		  evd_handle, evd_qlen);
 
     if (DAPL_BAD_HANDLE (evd_handle, DAPL_MAGIC_EVD))
     {
-	dat_status = DAT_ERROR (DAT_INVALID_HANDLE,0);
-	goto bail;
+        return DAT_ERROR (DAT_INVALID_PARAMETER,DAT_INVALID_ARG1);
     }
 
     evd_ptr     = (DAPL_EVD *)evd_handle;
     ia_ptr      = evd_ptr->header.owner_ia;
 
-    if ( evd_qlen == evd_ptr->qlen )
+    if ((evd_qlen <= 0) || (evd_ptr->qlen > evd_qlen))
     {
-	 dat_status = DAT_SUCCESS;
-	 goto bail;
+        dat_status = DAT_ERROR(DAT_INVALID_PARAMETER,DAT_INVALID_ARG2);
+	goto bail;
     }
 
     if ( evd_qlen > ia_ptr->hca_ptr->ia_attr.max_evd_qlen )
     {
-	dat_status = DAT_ERROR (DAT_INVALID_PARAMETER,DAT_INVALID_ARG2);
+	dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_TEVD);
 	goto bail;
     }
 
     dapl_os_lock(&evd_ptr->header.lock);
 
-    /* Don't try to resize if we are actively waiting */
     if (evd_ptr->evd_state == DAPL_EVD_STATE_WAITED)
     {
-        dapl_os_unlock(&evd_ptr->header.lock);
-	dat_status = DAT_ERROR (DAT_INVALID_STATE,0);
-	goto bail;
+        dat_status = DAT_ERROR(DAT_INVALID_STATE,0);
+        goto bail_unlock;
     }
 
     pend_cnt = dapls_rbuf_count(&evd_ptr->pending_event_queue);
     if (pend_cnt > evd_qlen) {
-	dapl_os_unlock(&evd_ptr->header.lock);
-	dat_status = DAT_ERROR (DAT_INVALID_STATE,0);
-	goto bail;
+        dat_status = DAT_ERROR(DAT_INVALID_STATE,0);
+        goto bail_unlock;
     }
 
     dat_status = dapls_ib_cq_resize(evd_ptr->header.owner_ia,
-				    evd_ptr,
-				    &evd_qlen);
-    if (dat_status != DAT_SUCCESS)
+                                    evd_ptr,
+                                    &evd_qlen); 
+    if (DAT_SUCCESS != dat_status) {
+        dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_MEMORY);
+        goto bail_unlock;
+    }
+
+    /* Allocate EVENTs */
+    events = (DAT_EVENT *) dapl_os_alloc (evd_qlen * sizeof (DAT_EVENT));
+    if (!events)
     {
-        dapl_os_unlock(&evd_ptr->header.lock);
-	goto bail;
+        dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_MEMORY);
+        goto bail_unlock;
     }
+    event_ptr = events;
 
-    dat_status = dapls_evd_event_realloc (evd_ptr, evd_qlen);
-    if (dat_status != DAT_SUCCESS)
+    /* allocate free event queue */
+    dat_status = dapls_rbuf_alloc (&free_event_queue, evd_qlen);
+    if (DAT_SUCCESS != dat_status)
     {
-        dapl_os_unlock(&evd_ptr->header.lock);
-	goto bail;
+        dapl_os_free(event_ptr, evd_qlen * sizeof (DAT_EVENT));
+        dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_MEMORY);
+        goto bail_unlock;
+    }
+
+    /* allocate pending event queue */
+    dat_status = dapls_rbuf_alloc (&pending_event_queue, evd_qlen);
+    if (DAT_SUCCESS != dat_status)
+    {
+        dapl_os_free(event_ptr, evd_qlen * sizeof (DAT_EVENT));
+        dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_MEMORY);
+        goto bail_unlock;
     }
 
+    for (i = 0; i < pend_cnt; i++) 
+    {
+        orig_event = dapls_rbuf_remove(&evd_ptr->pending_event_queue);
+        if (orig_event == NULL) {
+            dapl_dbg_log (DAPL_DBG_TYPE_ERR, " Inconsistent event queue\n");
+            dapl_os_free(event_ptr, evd_qlen * sizeof (DAT_EVENT));
+	    dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_MEMORY);
+	    goto bail_unlock;
+        }
+        memcpy(event_ptr, orig_event, sizeof(DAT_EVENT));
+        dat_status = dapls_rbuf_add(&pending_event_queue, event_ptr);
+        if (DAT_SUCCESS != dat_status) {
+            dapl_os_free(event_ptr, evd_qlen * sizeof (DAT_EVENT));
+	    dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_MEMORY);
+	    goto bail_unlock;
+        }
+        event_ptr++;
+    }
+
+    for (i = pend_cnt; i < evd_qlen; i++)
+    {
+        dat_status = dapls_rbuf_add(&free_event_queue,(void *) event_ptr);
+        if (DAT_SUCCESS != dat_status) {
+            dapl_os_free(event_ptr, evd_qlen * sizeof (DAT_EVENT));
+	    dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_MEMORY);
+	    goto bail_unlock;
+        }
+        event_ptr++;
+    }
+
+    dapls_rbuf_destroy (&evd_ptr->free_event_queue);
+    dapls_rbuf_destroy (&evd_ptr->pending_event_queue);
+    if (evd_ptr->events)
+    {
+        dapl_os_free (evd_ptr->events, evd_ptr->qlen * sizeof (DAT_EVENT));
+    }
+    evd_ptr->free_event_queue    = free_event_queue;
+    evd_ptr->pending_event_queue = pending_event_queue;
+    evd_ptr->events              = events;
+    evd_ptr->qlen                = evd_qlen;
+
+bail_unlock:
+
     dapl_os_unlock(&evd_ptr->header.lock);
 
- bail:
+    dapl_dbg_log (DAPL_DBG_TYPE_RTN,
+		  "dapl_evd_resize returns 0x%x\n",dat_status);
+
+bail:
+
     return dat_status;
 }
 
Index: dapl/openib/TODO
===================================================================
--- dapl/openib/TODO	(revision 2919)
+++ dapl/openib/TODO	(working copy)
@@ -1,7 +1,7 @@
 
 IB Verbs:
 - CQ resize?
-- query call to get current qp state 
+- query call to get current qp state, remote port number 
 - ibv_get_cq_event() needs timed event call and wakeup
 - query call to get device attributes
 - memory window support
@@ -9,8 +9,6 @@
 DAPL:
 - reinit EP needs a QP timewait completion notification
 - add cq_object wakeup, time based cq_object wait when verbs support arrives
-- update uDAPL code with real ATS support
-- etc, etc.
 
 Other:
 - Shared memory in udapl and kernel module to support?
Index: dapl/openib/dapl_ib_util.c
===================================================================
--- dapl/openib/dapl_ib_util.c	(revision 2919)
+++ dapl/openib/dapl_ib_util.c	(working copy)
@@ -111,27 +111,40 @@
 }
 
 
-/* just get IP address for hostname */
-int dapli_get_addr( char *addr, int addr_len)
+/* just get IP address, IPv4 only for now  */
+int dapli_get_hca_addr( struct dapl_hca *hca_ptr )
 {
-	struct sockaddr_in	*ipv4_addr = (struct sockaddr_in*)addr;
-	struct hostent		*h_ptr;
-	struct utsname		ourname;
-
-	if ( uname( &ourname ) < 0 ) 
-		return 1;
-
-	h_ptr = gethostbyname( ourname.nodename );
-	if ( h_ptr == NULL ) 
+	struct sockaddr_in	*ipv4_addr;
+	struct ib_at_completion	at_comp;	
+	struct dapl_at_record	at_rec;
+	int			status;
+	DAT_RETURN		dat_status;
+	
+	ipv4_addr = (struct sockaddr_in*)&hca_ptr->hca_address;
+	ipv4_addr->sin_family = AF_INET;
+	ipv4_addr->sin_addr.s_addr = 0;
+	
+	at_comp.fn = dapli_ip_comp_handler;
+	at_comp.context = &at_rec; 
+	at_rec.addr = &hca_ptr->hca_address;
+	at_rec.wait_object = &hca_ptr->ib_trans.wait_object;
+
+	/*  call with async_comp until the sync version works */
+	status = ib_at_ips_by_gid(&hca_ptr->ib_trans.gid, &ipv4_addr->sin_addr.s_addr, 1, 
+				  &at_comp, &at_rec.req_id);
+	
+	if (status < 0) 
 		return 1;
-
-	if ( h_ptr->h_addrtype == AF_INET ) {
-		ipv4_addr = (struct sockaddr_in*) addr;
-		ipv4_addr->sin_family = AF_INET;
-		dapl_os_memcpy( &ipv4_addr->sin_addr, h_ptr->h_addr_list[0], 4 );
-	} else 
+ 
+        if (status > 0)
+                dapli_ip_comp_handler(at_rec.req_id, (void*)ipv4_addr, status);
+	
+	/* wait for answer, 5 seconds max */
+	dat_status = dapl_os_wait_object_wait (&hca_ptr->ib_trans.wait_object,5000000);
+	 
+	if ((dat_status != DAT_SUCCESS ) || (!ipv4_addr->sin_addr.s_addr)) 
 		return 1;
-
+		
 	return 0;
 }
 
@@ -152,14 +165,17 @@
  */
 int32_t dapls_ib_init (void)
 {	
-	if (dapli_cm_thread_init())
-		return -1;
-	else
-		return 0;
+	dapl_dbg_log (DAPL_DBG_TYPE_UTIL, " dapl_ib_init: \n" );
+	if (dapli_cm_thread_init() || dapli_at_thread_init()) 
+		return 1;
+
+	return 0;
 }
 
 int32_t dapls_ib_release (void)
 {
+	dapl_dbg_log (DAPL_DBG_TYPE_UTIL, " dapl_ib_release: \n" );
+	dapli_at_thread_destroy();
 	dapli_cm_thread_destroy();
 	return 0;
 }
@@ -186,7 +202,6 @@
         IN   DAPL_HCA		*hca_ptr)
 {
 	struct dlist	*dev_list;
-	DAT_RETURN	dat_status = DAT_SUCCESS;
 
 	dapl_dbg_log (DAPL_DBG_TYPE_UTIL, 
 		      " open_hca: %s - %p\n", hca_name, hca_ptr );
@@ -217,36 +232,46 @@
 			      ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
 		return DAT_INTERNAL_ERROR;
 	}
-  
+
 	/* set inline max with enviromment or default, get local lid and gid 0 */
 	hca_ptr->ib_trans.max_inline_send = 
 		dapl_os_get_env_val ( "DAPL_MAX_INLINE", INLINE_SEND_DEFAULT );
 
-	if ( dapli_get_lid(hca_ptr, hca_ptr->port_num,
-			   &hca_ptr->ib_trans.lid )) {
+	if (dapli_get_lid(hca_ptr, hca_ptr->port_num,
+			   &hca_ptr->ib_trans.lid)) {
 		dapl_dbg_log (DAPL_DBG_TYPE_ERR, 
 			      " open_hca: IB get LID failed for %s\n", 
 			      ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
-		return DAT_INTERNAL_ERROR;
+		goto bail;
 	}
 
-	if ( dapli_get_gid(hca_ptr, hca_ptr->port_num, 0,  
-			   &hca_ptr->ib_trans.gid )) {
+	if (dapli_get_gid(hca_ptr, hca_ptr->port_num, 0,  
+			   &hca_ptr->ib_trans.gid)) {
 		dapl_dbg_log (DAPL_DBG_TYPE_ERR, 
 			      " open_hca: IB get GID failed for %s\n", 
 			      ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
-		return DAT_INTERNAL_ERROR;
+		goto bail;
 	}
-
 	/* get the IP address of the device */
-	if ( dapli_get_addr((char*)&hca_ptr->hca_address, 
-			    sizeof(DAT_SOCK_ADDR6) )) {
+	if (dapli_get_hca_addr(hca_ptr)) {
 		dapl_dbg_log (DAPL_DBG_TYPE_ERR, 
 			      " open_hca: IB get ADDR failed for %s\n", 
 			      ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
-		return DAT_INTERNAL_ERROR;
+		goto bail;
+	}
+
+	/* one thread for each device open */
+	if (dapli_cq_thread_init(hca_ptr)) {
+		dapl_dbg_log (DAPL_DBG_TYPE_ERR, 
+			      " open_hca: cq_thread_init failed for %s\n", 
+			      ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
+		goto bail;
 	}
 
+	/* initialize cq_lock and wait object */
+	dapl_os_lock_init(&hca_ptr->ib_trans.cq_lock);
+	dapl_os_wait_object_init (&hca_ptr->ib_trans.wait_object);
+  
 	dapl_dbg_log (DAPL_DBG_TYPE_UTIL, 
 		      " open_hca: %s, port %d, %s  %d.%d.%d.%d INLINE_MAX=%d\n", 
 		      ibv_get_device_name(hca_ptr->ib_trans.ib_dev), hca_ptr->port_num,
@@ -257,7 +282,19 @@
 		      ((struct sockaddr_in *)&hca_ptr->hca_address)->sin_addr.s_addr >> 24 & 0xff,
 		      hca_ptr->ib_trans.max_inline_send );
 
-	return dat_status;
+	dapl_dbg_log(DAPL_DBG_TYPE_CM,
+		     " open_hca: LID 0x%x GID subnet %016llx id %016llx\n",
+		     hca_ptr->ib_trans.lid,
+		     (unsigned long long)bswap_64(hca_ptr->ib_trans.gid.global.subnet_prefix),
+		     (unsigned long long)bswap_64(hca_ptr->ib_trans.gid.global.interface_id) );
+
+	return DAT_SUCCESS;
+
+bail:
+	ibv_close_device(hca_ptr->ib_hca_handle); 
+	hca_ptr->ib_hca_handle = IB_INVALID_HANDLE;
+	return DAT_INTERNAL_ERROR;
+
 }
 
 
@@ -282,10 +319,14 @@
 	dapl_dbg_log (DAPL_DBG_TYPE_UTIL," close_hca: %p\n",hca_ptr);
 
 	if (hca_ptr->ib_hca_handle != IB_INVALID_HANDLE) {
+		dapli_cq_thread_destroy(hca_ptr);
 		if (ibv_close_device(hca_ptr->ib_hca_handle)) 
 			return(dapl_convert_errno(errno,"ib_close_device"));
 		hca_ptr->ib_hca_handle = IB_INVALID_HANDLE;
 	}
+	
+	dapl_os_lock_destroy(&hca_ptr->ib_trans.cq_lock);
+
 	return (DAT_SUCCESS);
 }
   
@@ -448,35 +489,4 @@
     return DAT_SUCCESS;
 }
 
-#ifdef PROVIDER_SPECIFIC_ATTR
-
-/*
- * dapls_set_provider_specific_attr
- *
- * Input:
- *	attr_ptr		Pointer provider attributes
- *
- * Output:
- * 	none
- *
- * Returns:
- * 	void
- */
-DAT_NAMED_ATTR	ib_attrs[] = {
-    {
-    	"I_DAT_SEND_INLINE_THRESHOLD",
-    	"128"
-    },
-};
-
-#define SPEC_ATTR_SIZE( x )	(sizeof( x ) / sizeof( DAT_NAMED_ATTR))
-
-void dapls_set_provider_specific_attr(
-	IN DAT_PROVIDER_ATTR	*attr_ptr )
-{
-	attr_ptr->num_provider_specific_attr = SPEC_ATTR_SIZE( ib_attrs );
-	attr_ptr->provider_specific_attr     = ib_attrs;
-}
-
-#endif
 
Index: dapl/openib/dapl_ib_cm.c
===================================================================
--- dapl/openib/dapl_ib_cm.c	(revision 2919)
+++ dapl/openib/dapl_ib_cm.c	(working copy)
@@ -70,19 +70,8 @@
 static inline uint64_t cpu_to_be64(uint64_t x) { return x; }
 #endif
 
-#ifndef IB_AT
-
-#include <stdio.h>
-#include <unistd.h>
-#include <fcntl.h>
-#include <netinet/tcp.h>
-#include <sysfs/libsysfs.h>
-#include <signal.h>
-
-/* iclust-20 hard coded values, network order */
-#define REMOTE_GID	"fe80:0000:0000:0000:0002:c902:0000:4071"
-#define REMOTE_LID	"0002"
-
+static int			g_at_destroy;
+static DAPL_OS_THREAD		g_at_thread;
 static int			g_cm_destroy;
 static DAPL_OS_THREAD		g_cm_thread;
 static DAPL_OS_LOCK		g_cm_lock;
@@ -122,7 +111,7 @@
 	while (g_cm_destroy) {
 		struct timespec	sleep, remain;
 		sleep.tv_sec = 0;
-		sleep.tv_nsec = 200000000; /* 200 ms */
+		sleep.tv_nsec = 10000000; /* 10 ms */
 		dapl_dbg_log(DAPL_DBG_TYPE_CM, 
 			     " cm_thread_destroy: waiting for cm_thread\n");
 		nanosleep (&sleep, &remain);
@@ -130,112 +119,70 @@
 	dapl_dbg_log(DAPL_DBG_TYPE_CM," cm_thread_destroy(%d) exit\n",getpid());
 }
 
-static int ib_at_route_by_ip(uint32_t dst_ip, uint32_t src_ip, int tos, uint16_t flags,
-			     struct ib_at_ib_route *ib_route,
-			     struct ib_at_completion *async_comp)
-{
-	struct dapl_cm_id *conn = (struct dapl_cm_id *)async_comp->context;
-	
-	dapl_dbg_log (
-		DAPL_DBG_TYPE_CM, 
-		" CM at_route_by_ip: conn %p cm_id %d src %d.%d.%d.%d -> dst %d.%d.%d.%d (%d)\n", 
-		conn,conn->cm_id,
-		src_ip >> 0 & 0xff, src_ip >> 8 & 0xff,
-		src_ip >> 16 & 0xff,src_ip >> 24 & 0xff,
-		dst_ip >> 0 & 0xff, dst_ip >> 8 & 0xff,
-		dst_ip >> 16 & 0xff,dst_ip >> 24 & 0xff, conn->service_id);
-
-	/* use req_id for loopback indication */
-	if (( src_ip == dst_ip ) || ( dst_ip == 0x0100007f ))
-		async_comp->req_id = 1;
-	else
-		async_comp->req_id = 0;
-		
-	return 1;			     
-}
-
-static int ib_at_paths_by_route(struct ib_at_ib_route *ib_route, uint32_t mpath_type,
-				struct ib_sa_path_rec *pr, int npath,
-				struct ib_at_completion *async_comp)
+int dapli_at_thread_init(void)
 {
-	struct dapl_cm_id *conn = (struct dapl_cm_id *)async_comp->context;
-	char *env, *token;
-	char  dgid[40];
-	uint16_t *p_gid = (uint16_t*)&ib_route->gid;
+	DAT_RETURN dat_status;
 
-	/* set local path record values and send to remote */
-	(void)dapl_os_memzero(pr, sizeof(*pr));
+	dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread_init(%d)\n", getpid());
 
-	pr->slid = htons(conn->hca->ib_trans.lid);
-	pr->sgid.global.subnet_prefix = conn->hca->ib_trans.gid.global.subnet_prefix;
-	pr->sgid.global.interface_id  = conn->hca->ib_trans.gid.global.interface_id;
+	/* create thread to process AT async requests */
+	dat_status = dapl_os_thread_create(at_thread, NULL, &g_at_thread);
+	if (dat_status != DAT_SUCCESS)
+	{
+		dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
+			     " at_thread_init: failed to create thread\n");
+		return 1;
+	}
+	return 0;
+}
 
-	env = getenv("DAPL_REMOTE_LID");
-	if ( env == NULL )
-		env = REMOTE_LID;
-	ib_route->lid = strtol(env,NULL,0);
+void dapli_at_thread_destroy(void)
+{
+	dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread_destroy(%d)\n", getpid());
 
-	env = getenv("DAPL_REMOTE_GID");
-	if ( env == NULL )
-		env = REMOTE_GID;
+	/* destroy cr_thread and lock */
+	g_at_destroy = 1;
+	pthread_kill( g_at_thread, SIGUSR1 );
+	dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread_destroy(%d) SIGUSR1 sent\n",getpid());
+	while (g_at_destroy) {
+		struct timespec	sleep, remain;
+		sleep.tv_sec = 0;
+		sleep.tv_nsec = 10000000; /* 10 ms */
+		dapl_dbg_log(DAPL_DBG_TYPE_CM, 
+			     " at_thread_destroy: waiting for at_thread\n");
+		nanosleep (&sleep, &remain);
+	}
+	dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread_destroy(%d) exit\n",getpid());
+}
 
-	dapl_dbg_log(DAPL_DBG_TYPE_CM, 
-		     " ib_at_paths_by_route: remote LID %x GID %s\n",
-		     ib_route->lid,env);
+void dapli_ip_comp_handler(uint64_t req_id, void *context, int rec_num)
+{
+	struct dapl_at_record	*at_rec = context;
 
-	dapl_os_memcpy( dgid, env, 40 ); 
+	dapl_dbg_log(DAPL_DBG_TYPE_CM,
+		     " ip_comp_handler: ctxt %p, req_id %lld rec_num %d\n",
+		     context, req_id, rec_num);
 
-	/* get GID with token strings and delimiter */
-	token = strtok(dgid,":");
-	while (token) {
-		*p_gid = strtoul(token,NULL,16);
-		*p_gid = htons(*p_gid); /* convert each token to network order */
-		token = strtok(NULL,":");
-		p_gid++;
-	}
-       
-	/* set remote lid and gid, req_id is indication of loopback */
-	if ( !async_comp->req_id ) {
-		pr->dlid = htons(ib_route->lid);
-		pr->dgid.global.subnet_prefix = ib_route->gid.global.subnet_prefix;
-		pr->dgid.global.interface_id  = ib_route->gid.global.interface_id;
-	} else {
-		pr->dlid = pr->slid;
-		pr->dgid.global.subnet_prefix = pr->sgid.global.subnet_prefix;
-		pr->dgid.global.interface_id  = pr->sgid.global.interface_id;
-	}
-
-	pr->reversible = 0x1000000;
-	pr->pkey = 0xffff;
-	pr->mtu  = IBV_MTU_1024;
-	pr->mtu_selector  = 2;
-	pr->rate_selector = 2;
-	pr->rate          = 3;
-	pr->packet_life_time_selector = 2;
-	pr->packet_life_time          = 2;
+	if ((at_rec) && ( at_rec->req_id == req_id)) {
+		dapl_os_wait_object_wakeup(at_rec->wait_object);
+		return;
+	}
 	
-	dapl_dbg_log(DAPL_DBG_TYPE_CM, 
-		     " ib_at_paths_by_route: SRC LID 0x%x GID subnet %016llx id %016llx\n",
-		     pr->slid,(unsigned long long)(pr->sgid.global.subnet_prefix),
-		     (unsigned long long)(pr->sgid.global.interface_id) );
-	dapl_dbg_log(DAPL_DBG_TYPE_CM, 
-		     " ib_at_paths_by_route: DST LID 0x%x GID subnet %016llx id %016llx\n",
-		     pr->dlid,(unsigned long long)(pr->dgid.global.subnet_prefix),
-		     (unsigned long long)(pr->dgid.global.interface_id) );
-
-	dapli_path_comp_handler( async_comp->req_id, (void*)conn, 1);
-
-	return 0;
+	dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+		     " ip_comp_handler: at_rec->req_id %lld != req_id %lld\n",
+		     at_rec->req_id, req_id );
 }
 
-#endif /* ifndef IB_AT */
-
 static void dapli_path_comp_handler(uint64_t req_id, void *context, int rec_num)
 {
 	struct dapl_cm_id *conn = context;
 	int status;
 	ib_cm_events_t event;
 
+	dapl_dbg_log(DAPL_DBG_TYPE_CM,
+		     " path_comp_handler: ctxt %p, req_id %lld rec_num %d\n",
+		     context, req_id, rec_num);
+
 	if (rec_num <= 0) {
 		dapl_dbg_log(DAPL_DBG_TYPE_CM, 
 			     " path_comp_handler: resolution err %d retry %d\n",
@@ -249,7 +196,7 @@
 
 		status = ib_at_paths_by_route(&conn->dapl_rt, 0,
 					      &conn->dapl_path, 1,
-					      &conn->dapl_comp);
+					      &conn->dapl_comp, &conn->dapl_comp.req_id);
 		if (status) {
 			dapl_dbg_log(DAPL_DBG_TYPE_CM,
 				     " path_by_route: err %d id %lld\n",
@@ -287,6 +234,21 @@
 	int status;
 	ib_cm_events_t event;
 
+	dapl_dbg_log(DAPL_DBG_TYPE_CM,
+		     " rt_comp_handler: conn %p, req_id %lld rec_num %d\n",
+		     conn, req_id, rec_num);
+
+	dapl_dbg_log(DAPL_DBG_TYPE_CM, 
+		     " rt_comp_handler: SRC GID subnet %016llx id %016llx\n",
+		     (unsigned long long)cpu_to_be64(conn->dapl_rt.sgid.global.subnet_prefix),
+		     (unsigned long long)cpu_to_be64(conn->dapl_rt.sgid.global.interface_id) );
+
+	dapl_dbg_log(DAPL_DBG_TYPE_CM, 
+		     " rt_comp_handler: DST GID subnet %016llx id %016llx\n",
+		     (unsigned long long)cpu_to_be64(conn->dapl_rt.dgid.global.subnet_prefix),
+		     (unsigned long long)cpu_to_be64(conn->dapl_rt.dgid.global.interface_id) );
+
+
 	if (rec_num <= 0) {
 		dapl_dbg_log(DAPL_DBG_TYPE_ERR,
 			     " dapl_rt_comp_handler: rec %d retry %d\n",
@@ -298,7 +260,8 @@
 		}
 
 		status = ib_at_route_by_ip(((struct sockaddr_in *)&conn->r_addr)->sin_addr.s_addr,
-					   0, 0, 0, &conn->dapl_rt, &conn->dapl_comp);
+					   0, 0, 0, &conn->dapl_rt, 
+					   &conn->dapl_comp,&conn->dapl_comp.req_id);
 		if (status < 0) {
 			dapl_dbg_log(DAPL_DBG_TYPE_ERR, "dapl_rt_comp_handler: "
 				    "ib_at_route_by_ip failed with status %d\n",
@@ -306,9 +269,16 @@
 			event = IB_CME_DESTINATION_UNREACHABLE;
 			goto bail;
 		}
-
 		if (status == 1)
 			dapli_rt_comp_handler(conn->dapl_comp.req_id, conn, 1);
+
+		return;
+	}
+
+	if (!conn->dapl_rt.dgid.global.subnet_prefix || req_id != conn->dapl_comp.req_id) {
+		dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+			     " dapl_rt_comp_handler: ERROR: unexpected callback req_id=%d(%d)\n",
+			     req_id, conn->dapl_comp.req_id ); 
 		return;
 	}
 
@@ -316,7 +286,7 @@
 	conn->dapl_comp.context = conn;
 	conn->retries = 0;
 	status = ib_at_paths_by_route(&conn->dapl_rt, 0, &conn->dapl_path, 1,
-				      &conn->dapl_comp);
+				      &conn->dapl_comp, &conn->dapl_comp.req_id);
 	if (status) {
 		dapl_dbg_log(DAPL_DBG_TYPE_ERR,
 			     "dapl_rt_comp_handler: ib_at_paths_by_route "
@@ -346,8 +316,6 @@
 		ib_cm_destroy_id(conn->cm_id);
 		if (conn->ep)
 			conn->ep->cm_handle = IB_INVALID_HANDLE;
-		if (conn->sp)
-			conn->sp->cm_srvc_handle = IB_INVALID_HANDLE;
 
 		/* take off the CM thread work queue and free */
 		dapl_os_lock( &g_cm_lock );
@@ -621,10 +589,8 @@
 }
 
 /* something to catch the signal */
-static void cm_handler(int signum)
+static void ib_sig_handler(int signum)
 {
-	dapl_dbg_log (DAPL_DBG_TYPE_CM," cm_thread(%d,0x%x): ENTER cm_handler %d\n",
-			getpid(),g_cm_thread,signum);
 	return;
 }
 
@@ -643,7 +609,7 @@
     	sigemptyset(&sigset);
 	sigaddset(&sigset, SIGUSR1);
     	pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
-	signal( SIGUSR1, cm_handler); 
+	signal( SIGUSR1, ib_sig_handler); 
 	
 	dapl_os_lock( &g_cm_lock );
 	while (!g_cm_destroy) {
@@ -667,7 +633,7 @@
 		dapl_dbg_log(DAPL_DBG_TYPE_CM,
 			" cm_thread: GET EVENT fd=%d n=%d\n",
 			ib_cm_get_fd(),ret);
-		if (ib_cm_event_get(&event)) { 
+		if (ib_cm_event_get_timed(0,&event)) { 
 			dapl_dbg_log(DAPL_DBG_TYPE_CM,
 				" cm_thread: ERR %s eventi_get on %d\n", 
 				strerror(errno), ib_cm_get_fd() );
@@ -732,6 +698,33 @@
 	g_cm_destroy = 0;
 }
 
+/* async AT processing thread */
+void at_thread(void *arg) 
+{
+	sigset_t sigset;
+
+	dapl_dbg_log (DAPL_DBG_TYPE_CM,
+		      " at_thread(%d,0x%x): ENTER: at_fd %d\n",
+		      getpid(), g_at_thread, ib_at_get_fd());
+
+    	sigemptyset(&sigset);
+	sigaddset(&sigset, SIGUSR1);
+    	pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
+	signal(SIGUSR1, ib_sig_handler); 
+	
+	while (!g_at_destroy) {
+		/* poll forever until callback or signal */
+		if (ib_at_callback_get_timed(-1) < 0) { 
+			dapl_dbg_log(DAPL_DBG_TYPE_CM,
+				" at_thread: SIG? ret=%s, destroy=%d\n", 
+				strerror(errno), g_at_destroy );
+		}
+		dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread: callback woke\n");
+	}
+	dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread(%d) EXIT \n", getpid());
+	g_at_destroy = 0;
+}
+
 /************************ DAPL provider entry points **********************/
 
 /*
@@ -826,33 +819,34 @@
 	conn->dapl_comp.context = conn;
 	conn->retries = 0;
 	dapl_os_memcpy(&conn->r_addr, r_addr, sizeof(DAT_SOCK_ADDR6));
+	
+	/* put on CM thread work queue */
+	dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&conn->entry);
+	dapl_os_lock( &g_cm_lock );
+	dapl_llist_add_tail(&g_cm_list, 
+			    (DAPL_LLIST_ENTRY*)&conn->entry, conn);
+	dapl_os_unlock(&g_cm_lock);
 
 	status = ib_at_route_by_ip(
 		((struct sockaddr_in *)&conn->r_addr)->sin_addr.s_addr, 
 		((struct sockaddr_in *)&conn->hca->hca_address)->sin_addr.s_addr, 
-		0, 0, &conn->dapl_rt, &conn->dapl_comp);
+		0, 0, &conn->dapl_rt, &conn->dapl_comp, &conn->dapl_comp.req_id);
+
+	dapl_dbg_log(DAPL_DBG_TYPE_CM, " connect: at_route ret=%d,%s req_id %d GID %016llx %016llx\n", 
+		     status, strerror(errno), conn->dapl_comp.req_id,
+		     (unsigned long long)cpu_to_be64(conn->dapl_rt.dgid.global.subnet_prefix),
+		     (unsigned long long)cpu_to_be64(conn->dapl_rt.dgid.global.interface_id) );
 
 	if (status < 0) {
 		dat_status = dapl_convert_errno(errno,"ib_at_route_by_ip");
-		goto destroy;
+		dapli_destroy_cm_id(conn); 
+		return dat_status;
 	}
-	if (status == 1)
-		dapli_rt_comp_handler(conn->dapl_comp.req_id, conn, 1);
 
-	
-	/* put on CM thread work queue */
-	dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&conn->entry);
-	dapl_os_lock( &g_cm_lock );
-	dapl_llist_add_tail(&g_cm_list, 
-			    (DAPL_LLIST_ENTRY*)&conn->entry, conn);
-	dapl_os_unlock(&g_cm_lock);
+	if (status > 0) 
+		dapli_rt_comp_handler(conn->dapl_comp.req_id, conn, status);
 
 	return DAT_SUCCESS;
-
-destroy:
-	dapli_destroy_cm_id(conn); 
-	return dat_status;
-
 }
 
 /*
@@ -992,6 +986,13 @@
 	conn->hca = ia_ptr->hca_ptr;
 	conn->service_id = ServiceID;
 
+	/* put on CM thread work queue */
+	dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&conn->entry);
+	dapl_os_lock( &g_cm_lock );
+	dapl_llist_add_tail(&g_cm_list, 
+			(DAPL_LLIST_ENTRY*)&conn->entry, conn);
+	dapl_os_unlock(&g_cm_lock);
+
 	dapl_dbg_log(DAPL_DBG_TYPE_EP,
 		     " setup_listener(conn=%p cm_id=%d)\n",
 		     sp_ptr->cm_srvc_handle,conn->cm_id);
@@ -1003,19 +1004,13 @@
 			dat_status = DAT_CONN_QUAL_IN_USE;
 		else
 			dat_status = DAT_INSUFFICIENT_RESOURCES;
-	/* success */ 
-	} else  { 
-		/* put on CM thread work queue */
-		dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&conn->entry);
-		dapl_os_lock( &g_cm_lock );
-		dapl_llist_add_tail(&g_cm_list, 
-				(DAPL_LLIST_ENTRY*)&conn->entry, conn);
-		dapl_os_unlock(&g_cm_lock);
+
+        	dapli_destroy_cm_id(conn);
 		return dat_status;
 	}
 
-        dapli_destroy_cm_id(conn);
-	return dat_status;
+	/* success */ 
+	return DAT_SUCCESS;
 }
 
 
@@ -1047,9 +1042,11 @@
 			" remove_listener(ia_ptr %p sp_ptr %p cm_ptr %p)\n",
 			ia_ptr, sp_ptr, conn );
 	
-	if (sp_ptr->cm_srvc_handle != IB_INVALID_HANDLE) 
+	if (conn != IB_INVALID_HANDLE) { 
+		sp_ptr->cm_srvc_handle = NULL;
         	dapli_destroy_cm_id(conn);
-	
+	}	
+
 	return DAT_SUCCESS;
 }
 
Index: dapl/openib/dapl_ib_util.h
===================================================================
--- dapl/openib/dapl_ib_util.h	(revision 2919)
+++ dapl/openib/dapl_ib_util.h	(working copy)
@@ -53,6 +53,7 @@
 #include <byteswap.h>
 #include <infiniband/sa.h>
 #include <infiniband/cm.h>
+#include <infiniband/at.h>
 
 /* Typedefs to map common DAPL provider types to IB verbs */
 typedef	struct ibv_qp		*ib_qp_handle_t;
@@ -68,8 +69,8 @@
 
 #define IB_RC_RETRY_COUNT      7
 #define IB_RNR_RETRY_COUNT     7
-#define IB_CM_RESPONSE_TIMEOUT 20	/* 4 sec */
-#define IB_MAX_CM_RETRIES      4
+#define IB_CM_RESPONSE_TIMEOUT 18	/* 1 sec */
+#define IB_MAX_CM_RETRIES      7
 
 #define IB_REQ_MRA_TIMEOUT	27	/* a little over 9 minutes */
 #define IB_MAX_AT_RETRY		3
@@ -92,21 +93,12 @@
 	IB_CME_BROKEN
 } ib_cm_events_t;
 
-#ifndef IB_AT
-/* implement a quick hack to exchange GID/LID's until user IB_AT arrives */
-struct ib_at_ib_route {
-        union ibv_gid   gid;
-        uint16_t        lid;
+struct dapl_at_record {
+	uint64_t                req_id;
+	DAT_SOCK_ADDR6		*addr;
+	DAPL_OS_WAIT_OBJECT	*wait_object;
 };
 
-struct ib_at_completion {
-        void (*fn)(uint64_t req_id, void *context, int rec_num);
-        void *context;
-        uint64_t req_id;
-};
-
-#endif
-
 /* 
  * dapl_llist_entry in dapl.h but dapl.h depends on provider 
  * typedef's in this file first. move dapl_llist_entry out of dapl.h
@@ -122,6 +114,7 @@
 struct dapl_cm_id {
 	struct ib_llist_entry		entry;
 	DAPL_OS_LOCK			lock;
+	DAPL_OS_WAIT_OBJECT		wait_object;
 	int				retries;
 	int				destroy;
 	int				in_callback;
@@ -238,6 +231,10 @@
 { 
 	struct	ibv_device	*ib_dev;
 	ib_cq_handle_t		ib_cq_empty;
+	DAPL_OS_LOCK            cq_lock;
+	DAPL_OS_WAIT_OBJECT     wait_object;
+	int			cq_destroy;
+	DAPL_OS_THREAD		cq_thread;
 	int			max_inline_send;
 	uint16_t		lid;
 	union ibv_gid		gid;
@@ -257,11 +254,18 @@
 void cm_thread (void *arg);
 int dapli_cm_thread_init(void);
 void dapli_cm_thread_destroy(void);
+void at_thread (void *arg);
+int dapli_at_thread_init(void);
+void dapli_at_thread_destroy(void);
+void cq_thread (void *arg);
+int dapli_cq_thread_init(struct dapl_hca *hca_ptr);
+void dapli_cq_thread_destroy(struct dapl_hca *hca_ptr);
 
-int dapli_get_lid(struct dapl_hca *hca_ptr, int port, uint16_t *lid );
+int dapli_get_lid(struct dapl_hca *hca_ptr, int port, uint16_t *lid);
 int dapli_get_gid(struct dapl_hca *hca_ptr, int port, int index, 
-		  union ibv_gid *gid );
-int dapli_get_addr(char *addr, int addr_len);
+		  union ibv_gid *gid);
+int dapli_get_hca_addr(struct dapl_hca *hca_ptr);
+void dapli_ip_comp_handler(uint64_t req_id, void *context, int rec_num);
 
 DAT_RETURN
 dapls_modify_qp_state ( IN ib_qp_handle_t	qp_handle,
Index: dapl/openib/README
===================================================================
--- dapl/openib/README	(revision 2919)
+++ dapl/openib/README	(working copy)
@@ -39,18 +39,16 @@
 
 	server:	dtest -s 
 	client:	dtest -h hostname
+
+Testing: dtest, dapltest - cl.sh regress.sh
 	
-setup/known issues:
-	
-	First drop with uCM (without IBAT), tested with simple dtest across 2 nodes. 
-	hand rolled path records require remote LID and GID set via enviroment:
+Setup:
 	
-	export DAPL_REMOTE_GID	"fe80:0000:0000:0000:0002:c902:0000:4071"
-	export DAPL_REMOTE_LID	"0002"
+	Third drop of code, includes uCM and uAT support.
+	NOTE: requires both uCM and uAT libraries and device modules from trunk.
 
-    Also, hard coded (RTR) for use with port 1 only.
-	   
+Known issues:
 	no memory windows support in ibverbs, dat_create_rmr fails.
+	some uCM scale up issues with an 8 thread dapltest in regress.sh
+	hard coded modify QP RTR to port 1, waiting for ib_cm_init_qp_attr call.
 	
-
-
Index: dapl/openib/dapl_ib_cq.c
===================================================================
--- dapl/openib/dapl_ib_cq.c	(revision 2919)
+++ dapl/openib/dapl_ib_cq.c	(working copy)
@@ -50,9 +50,96 @@
 #include "dapl_adapter_util.h"
 #include "dapl_lmr_util.h"
 #include "dapl_evd_util.h"
+#include "dapl_ring_buffer_util.h"
 #include <sys/poll.h>
+#include <signal.h>
 
+int dapli_cq_thread_init(struct dapl_hca *hca_ptr)
+{
+        DAT_RETURN dat_status;
+
+        dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_init(%p)\n", hca_ptr);
+
+        /* create thread to process CQ (DTO completion) events */
+        dat_status = dapl_os_thread_create( cq_thread, (void*)hca_ptr, &hca_ptr->ib_trans.cq_thread);
+        if (dat_status != DAT_SUCCESS)
+        {
+                dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+                             " cq_thread_init: failed to create thread\n");
+                return 1;
+        }
+        return 0;
+}
+
+void dapli_cq_thread_destroy(struct dapl_hca *hca_ptr)
+{
+        dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_destroy(%p)\n", hca_ptr);
+
+        /* signal cq_thread to exit and wait for it to finish */
+        hca_ptr->ib_trans.cq_destroy = 1;
+        pthread_kill(hca_ptr->ib_trans.cq_thread, SIGUSR1);
+        dapl_dbg_log(DAPL_DBG_TYPE_CM," cq_thread_destroy(%p) SIGUSR1 sent\n",hca_ptr);
+        while (hca_ptr->ib_trans.cq_destroy != 2) {
+                struct timespec sleep, remain;
+                sleep.tv_sec = 0;
+                sleep.tv_nsec = 10000000; /* 10 ms */
+                dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+                             " cq_thread_destroy: waiting for cq_thread\n");
+                nanosleep (&sleep, &remain);
+        }
+        dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_destroy(%d) exit\n",getpid());
+	return;
+}
+
+/* something to catch the signal */
+static void ib_cq_handler(int signum)
+{
+        return;
+}
+
+void cq_thread( void *arg )
+{
+	struct dapl_hca	*hca_ptr = arg;
+	struct dapl_evd	*evd_ptr;
+	struct ibv_cq	*ibv_cq = NULL;
+	sigset_t	sigset;
+	int		status = 0; 
+
+	dapl_dbg_log ( DAPL_DBG_TYPE_UTIL," cq_thread: ENTER hca %p\n",hca_ptr);
+  
+        sigemptyset(&sigset);
+        sigaddset(&sigset,SIGUSR1);
+        pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
+        signal(SIGUSR1, ib_cq_handler);
+
+	/* wait on DTO event, or signal to abort */
+	while (!hca_ptr->ib_trans.cq_destroy) {
+
+		struct pollfd cq_poll = {
+			.fd      = hca_ptr->ib_hca_handle->cq_fd[0],
+			.events  = POLLIN,
+			.revents = 0
+		};
 
+		status = poll(&cq_poll, 1, -1);
+		if ((status == 1) &&
+			(!ibv_get_cq_event(hca_ptr->ib_hca_handle, 0, &ibv_cq, (void*)&evd_ptr))) {
+	
+			if (DAPL_BAD_HANDLE(evd_ptr, DAPL_MAGIC_EVD))
+				continue;
+
+			/* process DTO event via callback */
+			dapl_evd_dto_callback ( evd_ptr->header.owner_ia->hca_ptr->ib_hca_handle,
+						evd_ptr->ib_cq_handle,
+						(void*)evd_ptr );
+		} else {
+
+		}
+	} 
+	hca_ptr->ib_trans.cq_destroy = 2;
+	dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread: EXIT: hca %p \n", hca_ptr);
+	return;
+}
 /*
  * Map all verbs DTO completion codes to the DAT equivelent.
  *
@@ -410,9 +497,9 @@
 		IN DAPL_EVD		*evd_ptr,
 		IN ib_wait_obj_handle_t	*p_cq_wait_obj_handle )
 {
-	dapl_dbg_log (	DAPL_DBG_TYPE_UTIL, 
+	dapl_dbg_log (	DAPL_DBG_TYPE_CM, 
 			" cq_object_create: (%p)=%p\n", 
-			p_cq_wait_obj_handle, *p_cq_wait_obj_handle);
+			p_cq_wait_obj_handle, evd_ptr );
 
 	/* set cq_wait object to evd_ptr */
 	*p_cq_wait_obj_handle = evd_ptr;
@@ -447,33 +534,86 @@
 {
 	DAPL_EVD		*evd_ptr = p_cq_wait_obj_handle;
 	ib_cq_handle_t		cq = evd_ptr->ib_cq_handle;
-	struct ibv_cq		*ibv_cq;
-	void			*ibv_ctx;
-	int			status = -ETIMEDOUT; 
+	struct ibv_cq		*ibv_cq = NULL;
+	void			*ibv_ctx = NULL;
+	int			status = 0; 
 
-	dapl_dbg_log ( DAPL_DBG_TYPE_UTIL, 
+	dapl_dbg_log ( DAPL_DBG_TYPE_CM, 
 			" cq_object_wait: dev %p evd %p cq %p, time %d\n", 
 			cq->context, evd_ptr, cq, timeout );
 
-	/* Multiple EVD's sharing one event handle for now */
-	if (cq) {
-		struct pollfd cq_poll = { 
-        		.fd      = cq->context->cq_fd[0],
-			.events  = POLLIN
+	/* Multiple EVD's sharing one event handle for now until uverbs supports more */
+
+	/*
+	 *  This makes it very inefficient and tricky to manage multiple CQs per device open.
+	 *  For example: 4 threads waiting on separate CQ events will all be woken when
+	 *  a CQ event fires. So the poll wakes up and the first thread to get to
+	 *  the get_cq_event wins and the other 3 will block. The dapl_evd_wait code
+	 *  above will immediately do a poll_cq after returning from CQ wait and, if
+	 *  nothing is on the queue, will call this wait again and go back to sleep. So
+	 *  as long as they all wake up, a mutex is held around the get_cq_event
+	 *  so no blocking occurs, and they all return, then everything should work.
+	 *  Of course, the timeout needs to be adjusted on the threads that go back to sleep.
+	 */
+	while (cq) {
+		struct pollfd cq_poll = {
+			.fd      = cq->context->cq_fd[0],
+			.events  = POLLIN,
+			.revents = 0
 		};
-		int	timeout_ms = -1;
+		int     timeout_ms = -1;
 
 		if (timeout != DAT_TIMEOUT_INFINITE)
 			timeout_ms = timeout/1000;
-		
+
+		/* check if another thread processed the event already, pending queue > 0 */
+		dapl_os_lock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
+		if (dapls_rbuf_count(&evd_ptr->pending_event_queue)) {
+			dapl_os_unlock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
+			break;	
+		}
+		dapl_os_unlock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
+
+		dapl_dbg_log ( DAPL_DBG_TYPE_CM," cq_object_wait: polling\n");
 		status = poll(&cq_poll, 1, timeout_ms);
-		if (status == 1)
-			status = ibv_get_cq_event(cq->context, 
-						  0, &ibv_cq, &ibv_ctx);
-	}
-	dapl_dbg_log (DAPL_DBG_TYPE_UTIL, 
-		      " cq_object_wait: RET cq %p ibv_cq %p ibv_ctx %p %x\n",
-		      cq,ibv_cq,ibv_ctx,status);
+		dapl_dbg_log ( DAPL_DBG_TYPE_CM," cq_object_wait: poll returned status=%d\n",status);
+
+		/*
+		 * If poll with timeout wakes then hold mutex around a poll with no timeout
+		 * so subsequent get_cq_events will be guaranteed not to block
+		 * If the event does not belong to this EVD then put it on proper EVD pending 
+		 * queue under the mutex.
+		 */
+		if (status == 1) {
+			dapl_os_lock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
+			status = poll(&cq_poll, 1, 0);
+			if (status == 1) {
+				status = ibv_get_cq_event(cq->context,
+							  0, &ibv_cq, &ibv_ctx);
+
+				/* if event is not ours, put on proper evd pending queue */
+				/* force another wakeup */
+				if ((ibv_ctx != evd_ptr ) && 
+				    (!DAPL_BAD_HANDLE(ibv_ctx, DAPL_MAGIC_EVD))) {
+					dapl_dbg_log (DAPL_DBG_TYPE_CM,
+						      " cq_object_wait: ibv_ctx %p != evd %p\n",
+						      ibv_ctx, evd_ptr);
+					dapls_evd_copy_cq((struct evd_ptr*)ibv_ctx); 
+					dapl_os_unlock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
+					continue;
+				}	
+			}	
+			dapl_os_unlock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
+			break;
+
+		} else if (status == 0) {
+			status = ETIMEDOUT;  
+			break;
+		}
+	}	
+	dapl_dbg_log (DAPL_DBG_TYPE_CM, 
+		      " cq_object_wait: RET evd %p cq %p ibv_cq %p ibv_ctx %p %s\n",
+		      evd_ptr, cq,ibv_cq,ibv_ctx,strerror(errno));
 	
 	return(dapl_convert_errno(status,"cq_wait_object_wait"));
 	








More information about the general mailing list