[openib-general] Re: fixes to the udapl ucm/uat patch you sent

James Lentini jlentini at netapp.com
Tue Aug 2 14:46:01 PDT 2005


Hi Arlin,

Can I break this patch into 3 parts: the changes to dapl_evd_wait, the 
changes to dapl_evd_resize, and the ib changes? I think it will be 
easier to discuss each set of changes separately (with so many 
separate issues, I'm afraid I've missed your reply to some of these 
questions)

dapl_evd_wait:

I looked over the original implementation of dapl_evd_wait() with an 
eye towards the situation you described (the caller polling and 
finding fewer events than requested, the caller going to turn on 
notification, an event occurring, the caller turning on notification, 
the caller blocking unaware of the last event). I don't believe that 
this would happen in the original implementation. Here's why: after 
the caller turns on notification, the code loops, via the continue 
statement on line 213, back to the beginning of the for loop on line 
173 and repolls. Do you agree?

dapl_evd_resize:

I'm still unsure of why you removed the call to 
dapls_evd_event_realloc() and moved the work that was being performed 
in that routine up into dapl_evd_resize(). If we don't call 
dapls_evd_event_realloc() anymore, the code should be removed.

ib changes:

These look ok to me. I've checked them into revision 2955.

On Tue, 2 Aug 2005, Or Gerlitz wrote:

> Arlin,
>
> The patch you sent yesterday had some broken lines (97,949,1174,1332,1355,etc)
> Here it is with the changes that made it patch fine over 2944
>
> Or.
>
> Index: dapl/udapl/dapl_evd_wait.c
> ===================================================================
> --- dapl/udapl/dapl_evd_wait.c	(revision 2919)
> +++ dapl/udapl/dapl_evd_wait.c	(working copy)
> @@ -74,9 +74,10 @@
>     DAPL_EVD		*evd_ptr;
>     DAT_RETURN		dat_status;
>     DAT_EVENT		*local_event;
> -    DAT_BOOLEAN		notify_requested = DAT_FALSE;
> +    DAT_BOOLEAN		notify_needed = DAT_FALSE;
>     DAT_BOOLEAN		waitable;
>     DAPL_EVD_STATE	evd_state;
> +    DAT_COUNT		total_events,new_events;
>
>     dapl_dbg_log (DAPL_DBG_TYPE_API,
> 		  "dapl_evd_wait (%p, %d, %d, %p, %p)\n",
> @@ -124,9 +125,9 @@
>     }
>
>     dapl_dbg_log (DAPL_DBG_TYPE_EVD,
> -	          "dapl_evd_wait: EVD %p, CQ %p\n",
> -                  evd_ptr,
> -		  (void *)evd_ptr->ib_cq_handle);
> +	          "dapl_evd_wait: EVD %p, CQ %p, Timeout %d, Threshold %d\n",
> +              evd_ptr,(void *)evd_ptr->ib_cq_handle, time_out, threshold);
> +
>
>     /*
>      * Make sure there are no other waiters and the evd is active.
> @@ -144,11 +145,10 @@
>     evd_state = dapl_os_atomic_assign ( (DAPL_ATOMIC *)&evd_ptr->evd_state,
> 					(DAT_COUNT) DAPL_EVD_STATE_OPEN,
> 					(DAT_COUNT) DAPL_EVD_STATE_WAITED );
> -    dapl_os_unlock ( &evd_ptr->header.lock );
>
> -    if ( evd_state != DAPL_EVD_STATE_OPEN )
> +    dapl_os_unlock ( &evd_ptr->header.lock );
> +    if ( evd_state != DAPL_EVD_STATE_OPEN || !waitable)
>     {
> -	/* Bogus state, bail out */
> 	dat_status = DAT_ERROR (DAT_INVALID_STATE,0);
> 	goto bail;
>     }
> @@ -182,37 +182,54 @@
> 	 * return right away if the ib_cq_handle associate with these evd
> 	 * equal to IB_INVALID_HANDLE
> 	 */
> -	dapls_evd_copy_cq(evd_ptr);
> -
> -	if (dapls_rbuf_count(&evd_ptr->pending_event_queue) >= threshold)
> -	{
> -	    break;
> -	}
> -
> -	/*
> -	 * Do not enable the completion notification if this evd is not
> -	 * a DTO_EVD or RMR_BIND_EVD
> +	/* Logic to prevent missing completion between copy_cq (poll)
> +	 * and completion_notify (re-arm)
> 	 */
> -	if ( (!notify_requested) &&
> -             ((evd_ptr->evd_flags & DAT_EVD_DTO_FLAG) ||
> -              (evd_ptr->evd_flags & DAT_EVD_RMR_BIND_FLAG)) )
> +	notify_needed = DAT_TRUE;
> +	new_events = 0;
> +	while (DAT_TRUE)
> 	{
> -	    dat_status = dapls_ib_completion_notify (
> -		evd_ptr->header.owner_ia->hca_ptr->ib_hca_handle,
> -		evd_ptr,
> -		(evd_ptr->completion_type == DAPL_EVD_STATE_SOLICITED_WAIT) ?
> -		     IB_NOTIFY_ON_SOLIC_COMP : IB_NOTIFY_ON_NEXT_COMP );
> -
> -	    DAPL_CNTR(DCNT_EVD_WAIT_CMP_NTFY);
> -	    /* FIXME report error */
> -	    dapl_os_assert(dat_status == DAT_SUCCESS);
> +		dapls_evd_copy_cq(evd_ptr); /* poll for new completions */
> +		total_events = dapls_rbuf_count (&evd_ptr->pending_event_queue);
> +		new_events = total_events - new_events;
> +		if (total_events >= threshold ||
> +			(!new_events && notify_needed == DAT_FALSE))
> +		{
> +			break;
> +		}
> +
> +		/*
> +		 * Do not enable the completion notification if this evd is not
> +		 * a DTO_EVD or RMR_BIND_EVD
> +		 */
> +		if ( (evd_ptr->evd_flags & DAT_EVD_DTO_FLAG) ||
> +			(evd_ptr->evd_flags & DAT_EVD_RMR_BIND_FLAG) )
> +		{
> +			dat_status = dapls_ib_completion_notify (
> +					evd_ptr->header.owner_ia->hca_ptr->ib_hca_handle,
> +					evd_ptr,
> +					(evd_ptr->completion_type == DAPL_EVD_STATE_SOLICITED_WAIT)?
> +		     			IB_NOTIFY_ON_SOLIC_COMP : IB_NOTIFY_ON_NEXT_COMP );
> +
> +			DAPL_CNTR(DCNT_EVD_WAIT_CMP_NTFY);
> +			notify_needed = DAT_FALSE;
> +			new_events = total_events;
> +
> +			/* FIXME report error */
> +			dapl_os_assert(dat_status == DAT_SUCCESS);
> +		}
> +		else
> +		{
> +			break;
> +		}
>
> -	    notify_requested = DAT_TRUE;
> +	} /* while completions < threshold, and rearm needed */
>
> -	    /* Try again.  */
> -	    continue;
> +	if (total_events >= threshold)
> +	{
> +		break;
> 	}
> -
> +
>
> 	/*
> 	 * Unused by poster; it has no way to tell how many
> @@ -232,8 +249,6 @@
> #endif
> 		dat_status = dapl_os_wait_object_wait (
> 				&evd_ptr->wait_object, time_out );
> -
> -	notify_requested = DAT_FALSE; /* We've used it up.  */
>
> 	/* See if we were awakened by evd_set_unwaitable */
> 	if ( !evd_ptr->evd_waitable )
> @@ -243,13 +258,22 @@
>
> 	if (dat_status != DAT_SUCCESS)
> 	{
> -	    /*
> -	     * If the status is DAT_TIMEOUT, we'll break out of the
> -	     * loop, *not* dequeue an event (because dat_status
> -	     * != DAT_SUCCESS), set *nmore (as we should for timeout)
> -	     * and return DAT_TIMEOUT.
> -	     */
> -	    break;
> +		/*
> +		 * If the status is DAT_TIMEOUT, we'll break out of the
> +		 * loop, *not* dequeue an event (because dat_status
> +		 * != DAT_SUCCESS), set *nmore (as we should for timeout)
> +		 * and return DAT_TIMEOUT.
> +		 */
> +
> +#if defined(DAPL_DBG)
> +		dapls_evd_copy_cq(evd_ptr); /* poll */
> +		dapl_dbg_log (DAPL_DBG_TYPE_EVD,
> +			"dapl_evd_wait: WAKEUP ERROR (0x%x): EVD %p, CQ %p, events? %d\n",
> +			dat_status,evd_ptr,(void *)evd_ptr->ib_cq_handle,
> +			dapls_rbuf_count(&evd_ptr->pending_event_queue) );
> +#endif /* DAPL_DBG */
> +
> +		break;
> 	}
>     }
>
> Index: dapl/udapl/Makefile
> ===================================================================
> --- dapl/udapl/Makefile	(revision 2941)
> +++ dapl/udapl/Makefile	(working copy)
> @@ -122,7 +122,8 @@
> #
> ifeq ($(VERBS),openib)
> PROVIDER = $(TOPDIR)/../openib
> -CFLAGS   += -DOPENIB -DCQ_WAIT_OBJECT
> +CFLAGS   += -DOPENIB
> +#CFLAGS   += -DCQ_WAIT_OBJECT uncomment when fixed
> CFLAGS   += -I/usr/local/include/infiniband
> endif
>
> Index: dapl/common/dapl_evd_resize.c
> ===================================================================
> --- dapl/common/dapl_evd_resize.c	(revision 2919)
> +++ dapl/common/dapl_evd_resize.c	(working copy)
> @@ -67,71 +67,139 @@
> 	IN	DAT_EVD_HANDLE	   evd_handle,
> 	IN	DAT_COUNT	   evd_qlen )
> {
> -    DAPL_IA		*ia_ptr;
> -    DAPL_EVD		*evd_ptr;
> -    DAT_COUNT   	pend_cnt;
> -    DAT_RETURN		dat_status;
> +    DAPL_IA          *ia_ptr;
> +    DAPL_EVD         *evd_ptr;
> +    DAT_EVENT        *event_ptr;
> +    DAT_EVENT        *events;
> +    DAT_EVENT        *orig_event;
> +    DAPL_RING_BUFFER free_event_queue;
> +    DAPL_RING_BUFFER pending_event_queue;
> +    DAT_COUNT        pend_cnt;
> +    DAT_COUNT        i;
> +    DAT_RETURN       dat_status;
>
>     dapl_dbg_log (DAPL_DBG_TYPE_API, "dapl_evd_resize (%p, %d)\n",
> 		  evd_handle, evd_qlen);
>
>     if (DAPL_BAD_HANDLE (evd_handle, DAPL_MAGIC_EVD))
>     {
> -	dat_status = DAT_ERROR (DAT_INVALID_HANDLE,0);
> -	goto bail;
> +        return DAT_ERROR (DAT_INVALID_PARAMETER,DAT_INVALID_ARG1);
>     }
>
>     evd_ptr     = (DAPL_EVD *)evd_handle;
>     ia_ptr      = evd_ptr->header.owner_ia;
>
> -    if ( evd_qlen == evd_ptr->qlen )
> +    if ((evd_qlen <= 0) || (evd_ptr->qlen > evd_qlen))
>     {
> -	 dat_status = DAT_SUCCESS;
> -	 goto bail;
> +        dat_status = DAT_ERROR(DAT_INVALID_PARAMETER,DAT_INVALID_ARG2);
> +	goto bail;
>     }
>
>     if ( evd_qlen > ia_ptr->hca_ptr->ia_attr.max_evd_qlen )
>     {
> -	dat_status = DAT_ERROR (DAT_INVALID_PARAMETER,DAT_INVALID_ARG2);
> +	dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_TEVD);
> 	goto bail;
>     }
>
>     dapl_os_lock(&evd_ptr->header.lock);
>
> -    /* Don't try to resize if we are actively waiting */
>     if (evd_ptr->evd_state == DAPL_EVD_STATE_WAITED)
>     {
> -        dapl_os_unlock(&evd_ptr->header.lock);
> -	dat_status = DAT_ERROR (DAT_INVALID_STATE,0);
> -	goto bail;
> +        dat_status = DAT_ERROR(DAT_INVALID_STATE,0);
> +        goto bail_unlock;
>     }
>
>     pend_cnt = dapls_rbuf_count(&evd_ptr->pending_event_queue);
>     if (pend_cnt > evd_qlen) {
> -	dapl_os_unlock(&evd_ptr->header.lock);
> -	dat_status = DAT_ERROR (DAT_INVALID_STATE,0);
> -	goto bail;
> +        dat_status = DAT_ERROR(DAT_INVALID_STATE,0);
> +        goto bail_unlock;
>     }
>
>     dat_status = dapls_ib_cq_resize(evd_ptr->header.owner_ia,
> -				    evd_ptr,
> -				    &evd_qlen);
> -    if (dat_status != DAT_SUCCESS)
> +                                    evd_ptr,
> +                                    &evd_qlen);
> +    if (DAT_SUCCESS != dat_status) {
> +        dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_MEMORY);
> +        goto bail_unlock;
> +    }
> +
> +    /* Allocate EVENTs */
> +    events = (DAT_EVENT *) dapl_os_alloc (evd_qlen * sizeof (DAT_EVENT));
> +    if (!events)
>     {
> -        dapl_os_unlock(&evd_ptr->header.lock);
> -	goto bail;
> +        dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_MEMORY);
> +        goto bail_unlock;
>     }
> +    event_ptr = events;
>
> -    dat_status = dapls_evd_event_realloc (evd_ptr, evd_qlen);
> -    if (dat_status != DAT_SUCCESS)
> +    /* allocate free event queue */
> +    dat_status = dapls_rbuf_alloc (&free_event_queue, evd_qlen);
> +    if (DAT_SUCCESS != dat_status)
>     {
> -        dapl_os_unlock(&evd_ptr->header.lock);
> -	goto bail;
> +        dapl_os_free(event_ptr, evd_qlen * sizeof (DAT_EVENT));
> +        dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_MEMORY);
> +        goto bail_unlock;
> +    }
> +
> +    /* allocate pending event queue */
> +    dat_status = dapls_rbuf_alloc (&pending_event_queue, evd_qlen);
> +    if (DAT_SUCCESS != dat_status)
> +    {
> +        dapl_os_free(event_ptr, evd_qlen * sizeof (DAT_EVENT));
> +        dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_MEMORY);
> +        goto bail_unlock;
>     }
>
> +    for (i = 0; i < pend_cnt; i++)
> +    {
> +        orig_event = dapls_rbuf_remove(&evd_ptr->pending_event_queue);
> +        if (orig_event == NULL) {
> +            dapl_dbg_log (DAPL_DBG_TYPE_ERR, " Inconsistent event queue\n");
> +            dapl_os_free(event_ptr, evd_qlen * sizeof (DAT_EVENT));
> +	    dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_MEMORY);
> +	    goto bail_unlock;
> +        }
> +        memcpy(event_ptr, orig_event, sizeof(DAT_EVENT));
> +        dat_status = dapls_rbuf_add(&pending_event_queue, event_ptr);
> +        if (DAT_SUCCESS != dat_status) {
> +            dapl_os_free(event_ptr, evd_qlen * sizeof (DAT_EVENT));
> +	    dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_MEMORY);
> +	    goto bail_unlock;
> +        }
> +        event_ptr++;
> +    }
> +
> +    for (i = pend_cnt; i < evd_qlen; i++)
> +    {
> +        dat_status = dapls_rbuf_add(&free_event_queue,(void *) event_ptr);
> +        if (DAT_SUCCESS != dat_status) {
> +            dapl_os_free(event_ptr, evd_qlen * sizeof (DAT_EVENT));
> +	    dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_MEMORY);
> +	    goto bail_unlock;
> +        }
> +        event_ptr++;
> +    }
> +
> +    dapls_rbuf_destroy (&evd_ptr->free_event_queue);
> +    dapls_rbuf_destroy (&evd_ptr->pending_event_queue);
> +    if (evd_ptr->events)
> +    {
> +        dapl_os_free (evd_ptr->events, evd_ptr->qlen * sizeof (DAT_EVENT));
> +    }
> +    evd_ptr->free_event_queue    = free_event_queue;
> +    evd_ptr->pending_event_queue = pending_event_queue;
> +    evd_ptr->events              = events;
> +    evd_ptr->qlen                = evd_qlen;
> +
> +bail_unlock:
> +
>     dapl_os_unlock(&evd_ptr->header.lock);
>
> - bail:
> +    dapl_dbg_log (DAPL_DBG_TYPE_RTN,
> +		  "dapl_evd_resize returns 0x%x\n",dat_status);
> +
> +bail:
> +
>     return dat_status;
> }
>
> Index: dapl/openib/TODO
> ===================================================================
> --- dapl/openib/TODO	(revision 2919)
> +++ dapl/openib/TODO	(working copy)
> @@ -1,7 +1,7 @@
>
> IB Verbs:
> - CQ resize?
> -- query call to get current qp state
> +- query call to get current qp state, remote port number
> - ibv_get_cq_event() needs timed event call and wakeup
> - query call to get device attributes
> - memory window support
> @@ -9,8 +9,6 @@
> DAPL:
> - reinit EP needs a QP timewait completion notification
> - add cq_object wakeup, time based cq_object wait when verbs support arrives
> -- update uDAPL code with real ATS support
> -- etc, etc.
>
> Other:
> - Shared memory in udapl and kernel module to support?
> Index: dapl/openib/dapl_ib_util.c
> ===================================================================
> --- dapl/openib/dapl_ib_util.c	(revision 2919)
> +++ dapl/openib/dapl_ib_util.c	(working copy)
> @@ -111,27 +111,40 @@
> }
>
>
> -/* just get IP address for hostname */
> -int dapli_get_addr( char *addr, int addr_len)
> +/* just get IP address, IPv4 only for now  */
> +int dapli_get_hca_addr( struct dapl_hca *hca_ptr )
> {
> -	struct sockaddr_in	*ipv4_addr = (struct sockaddr_in*)addr;
> -	struct hostent		*h_ptr;
> -	struct utsname		ourname;
> -
> -	if ( uname( &ourname ) < 0 )
> -		return 1;
> -
> -	h_ptr = gethostbyname( ourname.nodename );
> -	if ( h_ptr == NULL )
> +	struct sockaddr_in	*ipv4_addr;
> +	struct ib_at_completion	at_comp;
> +	struct dapl_at_record	at_rec;
> +	int			status;
> +	DAT_RETURN		dat_status;
> +
> +	ipv4_addr = (struct sockaddr_in*)&hca_ptr->hca_address;
> +	ipv4_addr->sin_family = AF_INET;
> +	ipv4_addr->sin_addr.s_addr = 0;
> +
> +	at_comp.fn = dapli_ip_comp_handler;
> +	at_comp.context = &at_rec;
> +	at_rec.addr = &hca_ptr->hca_address;
> +	at_rec.wait_object = &hca_ptr->ib_trans.wait_object;
> +
> +	/*  call with async_comp until the sync version works */
> +	status = ib_at_ips_by_gid(&hca_ptr->ib_trans.gid, &ipv4_addr->sin_addr.s_addr, 1,
> +				  &at_comp, &at_rec.req_id);
> +
> +	if (status < 0)
> 		return 1;
> -
> -	if ( h_ptr->h_addrtype == AF_INET ) {
> -		ipv4_addr = (struct sockaddr_in*) addr;
> -		ipv4_addr->sin_family = AF_INET;
> -		dapl_os_memcpy( &ipv4_addr->sin_addr, h_ptr->h_addr_list[0], 4 );
> -	} else
> +
> +        if (status > 0)
> +                dapli_ip_comp_handler(at_rec.req_id, (void*)ipv4_addr, status);
> +
> +	/* wait for answer, 5 seconds max */
> +	dat_status = dapl_os_wait_object_wait (&hca_ptr->ib_trans.wait_object,5000000);
> +
> +	if ((dat_status != DAT_SUCCESS ) || (!ipv4_addr->sin_addr.s_addr))
> 		return 1;
> -
> +
> 	return 0;
> }
>
> @@ -152,14 +165,17 @@
>  */
> int32_t dapls_ib_init (void)
> {
> -	if (dapli_cm_thread_init())
> -		return -1;
> -	else
> -		return 0;
> +	dapl_dbg_log (DAPL_DBG_TYPE_UTIL, " dapl_ib_init: \n" );
> +	if (dapli_cm_thread_init() || dapli_at_thread_init())
> +		return 1;
> +
> +	return 0;
> }
>
> int32_t dapls_ib_release (void)
> {
> +	dapl_dbg_log (DAPL_DBG_TYPE_UTIL, " dapl_ib_release: \n" );
> +	dapli_at_thread_destroy();
> 	dapli_cm_thread_destroy();
> 	return 0;
> }
> @@ -186,7 +202,6 @@
>         IN   DAPL_HCA		*hca_ptr)
> {
> 	struct dlist	*dev_list;
> -	DAT_RETURN	dat_status = DAT_SUCCESS;
>
> 	dapl_dbg_log (DAPL_DBG_TYPE_UTIL,
> 		      " open_hca: %s - %p\n", hca_name, hca_ptr );
> @@ -217,36 +232,46 @@
> 			      ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
> 		return DAT_INTERNAL_ERROR;
> 	}
> -
> +
> 	/* set inline max with enviromment or default, get local lid and gid 0 */
> 	hca_ptr->ib_trans.max_inline_send =
> 		dapl_os_get_env_val ( "DAPL_MAX_INLINE", INLINE_SEND_DEFAULT );
>
> -	if ( dapli_get_lid(hca_ptr, hca_ptr->port_num,
> -			   &hca_ptr->ib_trans.lid )) {
> +	if (dapli_get_lid(hca_ptr, hca_ptr->port_num,
> +			   &hca_ptr->ib_trans.lid)) {
> 		dapl_dbg_log (DAPL_DBG_TYPE_ERR,
> 			      " open_hca: IB get LID failed for %s\n",
> 			      ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
> -		return DAT_INTERNAL_ERROR;
> +		goto bail;
> 	}
>
> -	if ( dapli_get_gid(hca_ptr, hca_ptr->port_num, 0,
> -			   &hca_ptr->ib_trans.gid )) {
> +	if (dapli_get_gid(hca_ptr, hca_ptr->port_num, 0,
> +			   &hca_ptr->ib_trans.gid)) {
> 		dapl_dbg_log (DAPL_DBG_TYPE_ERR,
> 			      " open_hca: IB get GID failed for %s\n",
> 			      ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
> -		return DAT_INTERNAL_ERROR;
> +		goto bail;
> 	}
> -
> 	/* get the IP address of the device */
> -	if ( dapli_get_addr((char*)&hca_ptr->hca_address,
> -			    sizeof(DAT_SOCK_ADDR6) )) {
> +	if (dapli_get_hca_addr(hca_ptr)) {
> 		dapl_dbg_log (DAPL_DBG_TYPE_ERR,
> 			      " open_hca: IB get ADDR failed for %s\n",
> 			      ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
> -		return DAT_INTERNAL_ERROR;
> +		goto bail;
> +	}
> +
> +	/* one thread for each device open */
> +	if (dapli_cq_thread_init(hca_ptr)) {
> +		dapl_dbg_log (DAPL_DBG_TYPE_ERR,
> +			      " open_hca: cq_thread_init failed for %s\n",
> +			      ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
> +		goto bail;
> 	}
>
> +	/* initialize cq_lock and wait object */
> +	dapl_os_lock_init(&hca_ptr->ib_trans.cq_lock);
> +	dapl_os_wait_object_init (&hca_ptr->ib_trans.wait_object);
> +
> 	dapl_dbg_log (DAPL_DBG_TYPE_UTIL,
> 		      " open_hca: %s, port %d, %s  %d.%d.%d.%d INLINE_MAX=%d\n",
> 		      ibv_get_device_name(hca_ptr->ib_trans.ib_dev), hca_ptr->port_num,
> @@ -257,7 +282,19 @@
> 		      ((struct sockaddr_in *)&hca_ptr->hca_address)->sin_addr.s_addr >> 24 & 0xff,
> 		      hca_ptr->ib_trans.max_inline_send );
>
> -	return dat_status;
> +	dapl_dbg_log(DAPL_DBG_TYPE_CM,
> +		     " open_hca: LID 0x%x GID subnet %016llx id %016llx\n",
> +		     hca_ptr->ib_trans.lid,
> +		     (unsigned long long)bswap_64(hca_ptr->ib_trans.gid.global.subnet_prefix),
> +		     (unsigned long long)bswap_64(hca_ptr->ib_trans.gid.global.interface_id) );
> +
> +	return DAT_SUCCESS;
> +
> +bail:
> +	ibv_close_device(hca_ptr->ib_hca_handle);
> +	hca_ptr->ib_hca_handle = IB_INVALID_HANDLE;
> +	return DAT_INTERNAL_ERROR;
> +
> }
>
>
> @@ -282,10 +319,14 @@
> 	dapl_dbg_log (DAPL_DBG_TYPE_UTIL," close_hca: %p\n",hca_ptr);
>
> 	if (hca_ptr->ib_hca_handle != IB_INVALID_HANDLE) {
> +		dapli_cq_thread_destroy(hca_ptr);
> 		if (ibv_close_device(hca_ptr->ib_hca_handle))
> 			return(dapl_convert_errno(errno,"ib_close_device"));
> 		hca_ptr->ib_hca_handle = IB_INVALID_HANDLE;
> 	}
> +
> +	dapl_os_lock_destroy(&hca_ptr->ib_trans.cq_lock);
> +
> 	return (DAT_SUCCESS);
> }
>
> @@ -448,35 +489,4 @@
>     return DAT_SUCCESS;
> }
>
> -#ifdef PROVIDER_SPECIFIC_ATTR
> -
> -/*
> - * dapls_set_provider_specific_attr
> - *
> - * Input:
> - *	attr_ptr		Pointer provider attributes
> - *
> - * Output:
> - * 	none
> - *
> - * Returns:
> - * 	void
> - */
> -DAT_NAMED_ATTR	ib_attrs[] = {
> -    {
> -    	"I_DAT_SEND_INLINE_THRESHOLD",
> -    	"128"
> -    },
> -};
> -
> -#define SPEC_ATTR_SIZE( x )	(sizeof( x ) / sizeof( DAT_NAMED_ATTR))
> -
> -void dapls_set_provider_specific_attr(
> -	IN DAT_PROVIDER_ATTR	*attr_ptr )
> -{
> -	attr_ptr->num_provider_specific_attr = SPEC_ATTR_SIZE( ib_attrs );
> -	attr_ptr->provider_specific_attr     = ib_attrs;
> -}
> -
> -#endif
>
> Index: dapl/openib/dapl_ib_cm.c
> ===================================================================
> --- dapl/openib/dapl_ib_cm.c	(revision 2919)
> +++ dapl/openib/dapl_ib_cm.c	(working copy)
> @@ -70,19 +70,8 @@
> static inline uint64_t cpu_to_be64(uint64_t x) { return x; }
> #endif
>
> -#ifndef IB_AT
> -
> -#include <stdio.h>
> -#include <unistd.h>
> -#include <fcntl.h>
> -#include <netinet/tcp.h>
> -#include <sysfs/libsysfs.h>
> -#include <signal.h>
> -
> -/* iclust-20 hard coded values, network order */
> -#define REMOTE_GID	"fe80:0000:0000:0000:0002:c902:0000:4071"
> -#define REMOTE_LID	"0002"
> -
> +static int			g_at_destroy;
> +static DAPL_OS_THREAD		g_at_thread;
> static int			g_cm_destroy;
> static DAPL_OS_THREAD		g_cm_thread;
> static DAPL_OS_LOCK		g_cm_lock;
> @@ -122,7 +111,7 @@
> 	while (g_cm_destroy) {
> 		struct timespec	sleep, remain;
> 		sleep.tv_sec = 0;
> -		sleep.tv_nsec = 200000000; /* 200 ms */
> +		sleep.tv_nsec = 10000000; /* 10 ms */
> 		dapl_dbg_log(DAPL_DBG_TYPE_CM,
> 			     " cm_thread_destroy: waiting for cm_thread\n");
> 		nanosleep (&sleep, &remain);
> @@ -130,112 +119,70 @@
> 	dapl_dbg_log(DAPL_DBG_TYPE_CM," cm_thread_destroy(%d) exit\n",getpid());
> }
>
> -static int ib_at_route_by_ip(uint32_t dst_ip, uint32_t src_ip, int tos, uint16_t flags,
> -			     struct ib_at_ib_route *ib_route,
> -			     struct ib_at_completion *async_comp)
> -{
> -	struct dapl_cm_id *conn = (struct dapl_cm_id *)async_comp->context;
> -
> -	dapl_dbg_log (
> -		DAPL_DBG_TYPE_CM,
> -		" CM at_route_by_ip: conn %p cm_id %d src %d.%d.%d.%d -> dst %d.%d.%d.%d (%d)\n",
> -		conn,conn->cm_id,
> -		src_ip >> 0 & 0xff, src_ip >> 8 & 0xff,
> -		src_ip >> 16 & 0xff,src_ip >> 24 & 0xff,
> -		dst_ip >> 0 & 0xff, dst_ip >> 8 & 0xff,
> -		dst_ip >> 16 & 0xff,dst_ip >> 24 & 0xff, conn->service_id);
> -
> -	/* use req_id for loopback indication */
> -	if (( src_ip == dst_ip ) || ( dst_ip == 0x0100007f ))
> -		async_comp->req_id = 1;
> -	else
> -		async_comp->req_id = 0;
> -
> -	return 1;
> -}
> -
> -static int ib_at_paths_by_route(struct ib_at_ib_route *ib_route, uint32_t mpath_type,
> -				struct ib_sa_path_rec *pr, int npath,
> -				struct ib_at_completion *async_comp)
> +int dapli_at_thread_init(void)
> {
> -	struct dapl_cm_id *conn = (struct dapl_cm_id *)async_comp->context;
> -	char *env, *token;
> -	char  dgid[40];
> -	uint16_t *p_gid = (uint16_t*)&ib_route->gid;
> +	DAT_RETURN dat_status;
>
> -	/* set local path record values and send to remote */
> -	(void)dapl_os_memzero(pr, sizeof(*pr));
> +	dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread_init(%d)\n", getpid());
>
> -	pr->slid = htons(conn->hca->ib_trans.lid);
> -	pr->sgid.global.subnet_prefix = conn->hca->ib_trans.gid.global.subnet_prefix;
> -	pr->sgid.global.interface_id  = conn->hca->ib_trans.gid.global.interface_id;
> +	/* create thread to process AT async requests */
> +	dat_status = dapl_os_thread_create(at_thread, NULL, &g_at_thread);
> +	if (dat_status != DAT_SUCCESS)
> +	{
> +		dapl_dbg_log(DAPL_DBG_TYPE_ERR,
> +			     " at_thread_init: failed to create thread\n");
> +		return 1;
> +	}
> +	return 0;
> +}
>
> -	env = getenv("DAPL_REMOTE_LID");
> -	if ( env == NULL )
> -		env = REMOTE_LID;
> -	ib_route->lid = strtol(env,NULL,0);
> +void dapli_at_thread_destroy(void)
> +{
> +	dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread_destroy(%d)\n", getpid());
>
> -	env = getenv("DAPL_REMOTE_GID");
> -	if ( env == NULL )
> -		env = REMOTE_GID;
> +	/* destroy cr_thread and lock */
> +	g_at_destroy = 1;
> +	pthread_kill( g_at_thread, SIGUSR1 );
> +	dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread_destroy(%d) SIGUSR1 sent\n",getpid());
> +	while (g_at_destroy) {
> +		struct timespec	sleep, remain;
> +		sleep.tv_sec = 0;
> +		sleep.tv_nsec = 10000000; /* 10 ms */
> +		dapl_dbg_log(DAPL_DBG_TYPE_CM,
> +			     " at_thread_destroy: waiting for at_thread\n");
> +		nanosleep (&sleep, &remain);
> +	}
> +	dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread_destroy(%d) exit\n",getpid());
> +}
>
> -	dapl_dbg_log(DAPL_DBG_TYPE_CM,
> -		     " ib_at_paths_by_route: remote LID %x GID %s\n",
> -		     ib_route->lid,env);
> +void dapli_ip_comp_handler(uint64_t req_id, void *context, int rec_num)
> +{
> +	struct dapl_at_record	*at_rec = context;
>
> -	dapl_os_memcpy( dgid, env, 40 );
> +	dapl_dbg_log(DAPL_DBG_TYPE_CM,
> +		     " ip_comp_handler: ctxt %p, req_id %lld rec_num %d\n",
> +		     context, req_id, rec_num);
>
> -	/* get GID with token strings and delimiter */
> -	token = strtok(dgid,":");
> -	while (token) {
> -		*p_gid = strtoul(token,NULL,16);
> -		*p_gid = htons(*p_gid); /* convert each token to network order */
> -		token = strtok(NULL,":");
> -		p_gid++;
> -	}
> -
> -	/* set remote lid and gid, req_id is indication of loopback */
> -	if ( !async_comp->req_id ) {
> -		pr->dlid = htons(ib_route->lid);
> -		pr->dgid.global.subnet_prefix = ib_route->gid.global.subnet_prefix;
> -		pr->dgid.global.interface_id  = ib_route->gid.global.interface_id;
> -	} else {
> -		pr->dlid = pr->slid;
> -		pr->dgid.global.subnet_prefix = pr->sgid.global.subnet_prefix;
> -		pr->dgid.global.interface_id  = pr->sgid.global.interface_id;
> -	}
> -
> -	pr->reversible = 0x1000000;
> -	pr->pkey = 0xffff;
> -	pr->mtu  = IBV_MTU_1024;
> -	pr->mtu_selector  = 2;
> -	pr->rate_selector = 2;
> -	pr->rate          = 3;
> -	pr->packet_life_time_selector = 2;
> -	pr->packet_life_time          = 2;
> +	if ((at_rec) && ( at_rec->req_id == req_id)) {
> +		dapl_os_wait_object_wakeup(at_rec->wait_object);
> +		return;
> +	}
>
> -	dapl_dbg_log(DAPL_DBG_TYPE_CM,
> -		     " ib_at_paths_by_route: SRC LID 0x%x GID subnet %016llx id %016llx\n",
> -		     pr->slid,(unsigned long long)(pr->sgid.global.subnet_prefix),
> -		     (unsigned long long)(pr->sgid.global.interface_id) );
> -	dapl_dbg_log(DAPL_DBG_TYPE_CM,
> -		     " ib_at_paths_by_route: DST LID 0x%x GID subnet %016llx id %016llx\n",
> -		     pr->dlid,(unsigned long long)(pr->dgid.global.subnet_prefix),
> -		     (unsigned long long)(pr->dgid.global.interface_id) );
> -
> -	dapli_path_comp_handler( async_comp->req_id, (void*)conn, 1);
> -
> -	return 0;
> +	dapl_dbg_log(DAPL_DBG_TYPE_ERR,
> +		     " ip_comp_handler: at_rec->req_id %lld != req_id %lld\n",
> +		     at_rec->req_id, req_id );
> }
>
> -#endif /* ifndef IB_AT */
> -
> static void dapli_path_comp_handler(uint64_t req_id, void *context, int rec_num)
> {
> 	struct dapl_cm_id *conn = context;
> 	int status;
> 	ib_cm_events_t event;
>
> +	dapl_dbg_log(DAPL_DBG_TYPE_CM,
> +		     " path_comp_handler: ctxt %p, req_id %lld rec_num %d\n",
> +		     context, req_id, rec_num);
> +
> 	if (rec_num <= 0) {
> 		dapl_dbg_log(DAPL_DBG_TYPE_CM,
> 			     " path_comp_handler: resolution err %d retry %d\n",
> @@ -249,7 +196,7 @@
>
> 		status = ib_at_paths_by_route(&conn->dapl_rt, 0,
> 					      &conn->dapl_path, 1,
> -					      &conn->dapl_comp);
> +					      &conn->dapl_comp, &conn->dapl_comp.req_id);
> 		if (status) {
> 			dapl_dbg_log(DAPL_DBG_TYPE_CM,
> 				     " path_by_route: err %d id %lld\n",
> @@ -287,6 +234,21 @@
> 	int status;
> 	ib_cm_events_t event;
>
> +	dapl_dbg_log(DAPL_DBG_TYPE_CM,
> +		     " rt_comp_handler: conn %p, req_id %lld rec_num %d\n",
> +		     conn, req_id, rec_num);
> +
> +	dapl_dbg_log(DAPL_DBG_TYPE_CM,
> +		     " rt_comp_handler: SRC GID subnet %016llx id %016llx\n",
> +		     (unsigned long long)cpu_to_be64(conn->dapl_rt.sgid.global.subnet_prefix),
> +		     (unsigned long long)cpu_to_be64(conn->dapl_rt.sgid.global.interface_id) );
> +
> +	dapl_dbg_log(DAPL_DBG_TYPE_CM,
> +		     " rt_comp_handler: DST GID subnet %016llx id %016llx\n",
> +		     (unsigned long long)cpu_to_be64(conn->dapl_rt.dgid.global.subnet_prefix),
> +		     (unsigned long long)cpu_to_be64(conn->dapl_rt.dgid.global.interface_id) );
> +
> +
> 	if (rec_num <= 0) {
> 		dapl_dbg_log(DAPL_DBG_TYPE_ERR,
> 			     " dapl_rt_comp_handler: rec %d retry %d\n",
> @@ -298,7 +260,8 @@
> 		}
>
> 		status = ib_at_route_by_ip(((struct sockaddr_in *)&conn->r_addr)->sin_addr.s_addr,
> -					   0, 0, 0, &conn->dapl_rt, &conn->dapl_comp);
> +					   0, 0, 0, &conn->dapl_rt,
> +					   &conn->dapl_comp,&conn->dapl_comp.req_id);
> 		if (status < 0) {
> 			dapl_dbg_log(DAPL_DBG_TYPE_ERR, "dapl_rt_comp_handler: "
> 				    "ib_at_route_by_ip failed with status %d\n",
> @@ -306,9 +269,16 @@
> 			event = IB_CME_DESTINATION_UNREACHABLE;
> 			goto bail;
> 		}
> -
> 		if (status == 1)
> 			dapli_rt_comp_handler(conn->dapl_comp.req_id, conn, 1);
> +
> +		return;
> +	}
> +
> +	if (!conn->dapl_rt.dgid.global.subnet_prefix || req_id != conn->dapl_comp.req_id) {
> +		dapl_dbg_log(DAPL_DBG_TYPE_ERR,
> +			     " dapl_rt_comp_handler: ERROR: unexpected callback req_id=%d(%d)\n",
> +			     req_id, conn->dapl_comp.req_id );
> 		return;
> 	}
>
> @@ -316,7 +286,7 @@
> 	conn->dapl_comp.context = conn;
> 	conn->retries = 0;
> 	status = ib_at_paths_by_route(&conn->dapl_rt, 0, &conn->dapl_path, 1,
> -				      &conn->dapl_comp);
> +				      &conn->dapl_comp, &conn->dapl_comp.req_id);
> 	if (status) {
> 		dapl_dbg_log(DAPL_DBG_TYPE_ERR,
> 			     "dapl_rt_comp_handler: ib_at_paths_by_route "
> @@ -346,8 +316,6 @@
> 		ib_cm_destroy_id(conn->cm_id);
> 		if (conn->ep)
> 			conn->ep->cm_handle = IB_INVALID_HANDLE;
> -		if (conn->sp)
> -			conn->sp->cm_srvc_handle = IB_INVALID_HANDLE;
>
> 		/* take off the CM thread work queue and free */
> 		dapl_os_lock( &g_cm_lock );
> @@ -621,10 +589,8 @@
> }
>
> /* something to catch the signal */
> -static void cm_handler(int signum)
> +static void ib_sig_handler(int signum)
> {
> -	dapl_dbg_log (DAPL_DBG_TYPE_CM," cm_thread(%d,0x%x): ENTER cm_handler %d\n",
> -			getpid(),g_cm_thread,signum);
> 	return;
> }
>
> @@ -643,7 +609,7 @@
>     	sigemptyset(&sigset);
> 	sigaddset(&sigset, SIGUSR1);
>     	pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
> -	signal( SIGUSR1, cm_handler);
> +	signal( SIGUSR1, ib_sig_handler);
>
> 	dapl_os_lock( &g_cm_lock );
> 	while (!g_cm_destroy) {
> @@ -667,7 +633,7 @@
> 		dapl_dbg_log(DAPL_DBG_TYPE_CM,
> 			" cm_thread: GET EVENT fd=%d n=%d\n",
> 			ib_cm_get_fd(),ret);
> -		if (ib_cm_event_get(&event)) {
> +		if (ib_cm_event_get_timed(0,&event)) {
> 			dapl_dbg_log(DAPL_DBG_TYPE_CM,
> 				" cm_thread: ERR %s eventi_get on %d\n",
> 				strerror(errno), ib_cm_get_fd() );
> @@ -732,6 +698,33 @@
> 	g_cm_destroy = 0;
> }
>
> +/* async AT processing thread */
> +void at_thread(void *arg)
> +{
> +	sigset_t sigset;
> +
> +	dapl_dbg_log (DAPL_DBG_TYPE_CM,
> +		      " at_thread(%d,0x%x): ENTER: at_fd %d\n",
> +		      getpid(), g_at_thread, ib_at_get_fd());
> +
> +    	sigemptyset(&sigset);
> +	sigaddset(&sigset, SIGUSR1);
> +    	pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
> +	signal(SIGUSR1, ib_sig_handler);
> +
> +	while (!g_at_destroy) {
> +		/* poll forever until callback or signal */
> +		if (ib_at_callback_get_timed(-1) < 0) {
> +			dapl_dbg_log(DAPL_DBG_TYPE_CM,
> +				" at_thread: SIG? ret=%s, destroy=%d\n",
> +				strerror(errno), g_at_destroy );
> +		}
> +		dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread: callback woke\n");
> +	}
> +	dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread(%d) EXIT \n", getpid());
> +	g_at_destroy = 0;
> +}
> +
> /************************ DAPL provider entry points **********************/
>
> /*
> @@ -826,33 +819,34 @@
> 	conn->dapl_comp.context = conn;
> 	conn->retries = 0;
> 	dapl_os_memcpy(&conn->r_addr, r_addr, sizeof(DAT_SOCK_ADDR6));
> +
> +	/* put on CM thread work queue */
> +	dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&conn->entry);
> +	dapl_os_lock( &g_cm_lock );
> +	dapl_llist_add_tail(&g_cm_list,
> +			    (DAPL_LLIST_ENTRY*)&conn->entry, conn);
> +	dapl_os_unlock(&g_cm_lock);
>
> 	status = ib_at_route_by_ip(
> 		((struct sockaddr_in *)&conn->r_addr)->sin_addr.s_addr,
> 		((struct sockaddr_in *)&conn->hca->hca_address)->sin_addr.s_addr,
> -		0, 0, &conn->dapl_rt, &conn->dapl_comp);
> +		0, 0, &conn->dapl_rt, &conn->dapl_comp, &conn->dapl_comp.req_id);
> +
> +	dapl_dbg_log(DAPL_DBG_TYPE_CM, " connect: at_route ret=%d,%s req_id %d GID %016llx %016llx\n",
> +		     status, strerror(errno), conn->dapl_comp.req_id,
> +		     (unsigned long long)cpu_to_be64(conn->dapl_rt.dgid.global.subnet_prefix),
> +		     (unsigned long long)cpu_to_be64(conn->dapl_rt.dgid.global.interface_id) );
>
> 	if (status < 0) {
> 		dat_status = dapl_convert_errno(errno,"ib_at_route_by_ip");
> -		goto destroy;
> +		dapli_destroy_cm_id(conn);
> +		return dat_status;
> 	}
> -	if (status == 1)
> -		dapli_rt_comp_handler(conn->dapl_comp.req_id, conn, 1);
>
> -
> -	/* put on CM thread work queue */
> -	dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&conn->entry);
> -	dapl_os_lock( &g_cm_lock );
> -	dapl_llist_add_tail(&g_cm_list,
> -			    (DAPL_LLIST_ENTRY*)&conn->entry, conn);
> -	dapl_os_unlock(&g_cm_lock);
> +	if (status > 0)
> +		dapli_rt_comp_handler(conn->dapl_comp.req_id, conn, status);
>
> 	return DAT_SUCCESS;
> -
> -destroy:
> -	dapli_destroy_cm_id(conn);
> -	return dat_status;
> -
> }
>
> /*
> @@ -992,6 +986,13 @@
> 	conn->hca = ia_ptr->hca_ptr;
> 	conn->service_id = ServiceID;
>
> +	/* put on CM thread work queue */
> +	dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&conn->entry);
> +	dapl_os_lock( &g_cm_lock );
> +	dapl_llist_add_tail(&g_cm_list,
> +			(DAPL_LLIST_ENTRY*)&conn->entry, conn);
> +	dapl_os_unlock(&g_cm_lock);
> +
> 	dapl_dbg_log(DAPL_DBG_TYPE_EP,
> 		     " setup_listener(conn=%p cm_id=%d)\n",
> 		     sp_ptr->cm_srvc_handle,conn->cm_id);
> @@ -1003,19 +1004,13 @@
> 			dat_status = DAT_CONN_QUAL_IN_USE;
> 		else
> 			dat_status = DAT_INSUFFICIENT_RESOURCES;
> -	/* success */
> -	} else  {
> -		/* put on CM thread work queue */
> -		dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&conn->entry);
> -		dapl_os_lock( &g_cm_lock );
> -		dapl_llist_add_tail(&g_cm_list,
> -				(DAPL_LLIST_ENTRY*)&conn->entry, conn);
> -		dapl_os_unlock(&g_cm_lock);
> +
> +        	dapli_destroy_cm_id(conn);
> 		return dat_status;
> 	}
>
> -        dapli_destroy_cm_id(conn);
> -	return dat_status;
> +	/* success */
> +	return DAT_SUCCESS;
> }
>
>
> @@ -1047,9 +1042,11 @@
> 			" remove_listener(ia_ptr %p sp_ptr %p cm_ptr %p)\n",
> 			ia_ptr, sp_ptr, conn );
>
> -	if (sp_ptr->cm_srvc_handle != IB_INVALID_HANDLE)
> +	if (conn != IB_INVALID_HANDLE) {
> +		sp_ptr->cm_srvc_handle = NULL;
>         	dapli_destroy_cm_id(conn);
> -
> +	}
> +
> 	return DAT_SUCCESS;
> }
>
> Index: dapl/openib/dapl_ib_util.h
> ===================================================================
> --- dapl/openib/dapl_ib_util.h	(revision 2919)
> +++ dapl/openib/dapl_ib_util.h	(working copy)
> @@ -53,6 +53,7 @@
> #include <byteswap.h>
> #include <infiniband/sa.h>
> #include <infiniband/cm.h>
> +#include <infiniband/at.h>
>
> /* Typedefs to map common DAPL provider types to IB verbs */
> typedef	struct ibv_qp		*ib_qp_handle_t;
> @@ -68,8 +69,8 @@
>
> #define IB_RC_RETRY_COUNT      7
> #define IB_RNR_RETRY_COUNT     7
> -#define IB_CM_RESPONSE_TIMEOUT 20	/* 4 sec */
> -#define IB_MAX_CM_RETRIES      4
> +#define IB_CM_RESPONSE_TIMEOUT 18	/* 1 sec */
> +#define IB_MAX_CM_RETRIES      7
>
> #define IB_REQ_MRA_TIMEOUT	27	/* a little over 9 minutes */
> #define IB_MAX_AT_RETRY		3
> @@ -92,21 +93,12 @@
> 	IB_CME_BROKEN
> } ib_cm_events_t;
>
> -#ifndef IB_AT
> -/* implement a quick hack to exchange GID/LID's until user IB_AT arrives */
> -struct ib_at_ib_route {
> -        union ibv_gid   gid;
> -        uint16_t        lid;
> +struct dapl_at_record {
> +	uint64_t                req_id;
> +	DAT_SOCK_ADDR6		*addr;
> +	DAPL_OS_WAIT_OBJECT	*wait_object;
> };
>
> -struct ib_at_completion {
> -        void (*fn)(uint64_t req_id, void *context, int rec_num);
> -        void *context;
> -        uint64_t req_id;
> -};
> -
> -#endif
> -
> /*
>  * dapl_llist_entry in dapl.h but dapl.h depends on provider
>  * typedef's in this file first. move dapl_llist_entry out of dapl.h
> @@ -122,6 +114,7 @@
> struct dapl_cm_id {
> 	struct ib_llist_entry		entry;
> 	DAPL_OS_LOCK			lock;
> +	DAPL_OS_WAIT_OBJECT		wait_object;
> 	int				retries;
> 	int				destroy;
> 	int				in_callback;
> @@ -238,6 +231,10 @@
> {
> 	struct	ibv_device	*ib_dev;
> 	ib_cq_handle_t		ib_cq_empty;
> +	DAPL_OS_LOCK            cq_lock;
> +	DAPL_OS_WAIT_OBJECT     wait_object;
> +	int			cq_destroy;
> +	DAPL_OS_THREAD		cq_thread;
> 	int			max_inline_send;
> 	uint16_t		lid;
> 	union ibv_gid		gid;
> @@ -257,11 +254,18 @@
> void cm_thread (void *arg);
> int dapli_cm_thread_init(void);
> void dapli_cm_thread_destroy(void);
> +void at_thread (void *arg);
> +int dapli_at_thread_init(void);
> +void dapli_at_thread_destroy(void);
> +void cq_thread (void *arg);
> +int dapli_cq_thread_init(struct dapl_hca *hca_ptr);
> +void dapli_cq_thread_destroy(struct dapl_hca *hca_ptr);
>
> -int dapli_get_lid(struct dapl_hca *hca_ptr, int port, uint16_t *lid );
> +int dapli_get_lid(struct dapl_hca *hca_ptr, int port, uint16_t *lid);
> int dapli_get_gid(struct dapl_hca *hca_ptr, int port, int index,
> -		  union ibv_gid *gid );
> -int dapli_get_addr(char *addr, int addr_len);
> +		  union ibv_gid *gid);
> +int dapli_get_hca_addr(struct dapl_hca *hca_ptr);
> +void dapli_ip_comp_handler(uint64_t req_id, void *context, int rec_num);
>
> DAT_RETURN
> dapls_modify_qp_state ( IN ib_qp_handle_t	qp_handle,
> Index: dapl/openib/README
> ===================================================================
> --- dapl/openib/README	(revision 2919)
> +++ dapl/openib/README	(working copy)
> @@ -39,18 +39,16 @@
>
> 	server:	dtest -s
> 	client:	dtest -h hostname
> +
> +Testing: dtest, dapltest - cl.sh regress.sh
>
> -setup/known issues:
> -
> -	First drop with uCM (without IBAT), tested with simple dtest across 2 nodes.
> -	hand rolled path records require remote LID and GID set via enviroment:
> +Setup:
>
> -	export DAPL_REMOTE_GID	"fe80:0000:0000:0000:0002:c902:0000:4071"
> -	export DAPL_REMOTE_LID	"0002"
> +	Third drop of code, includes uCM and uAT support.
> +	NOTE: requires both uCM and uAT libraries and device modules from trunk.
>
> -    Also, hard coded (RTR) for use with port 1 only.
> -
> +Known issues:
> 	no memory windows support in ibverbs, dat_create_rmr fails.
> +	some uCM scale up issues with an 8 thread dapltest in regress.sh
> +	hard coded modify QP RTR to port 1, waiting for ib_cm_init_qp_attr call.
>
> -
> -
> Index: dapl/openib/dapl_ib_cq.c
> ===================================================================
> --- dapl/openib/dapl_ib_cq.c	(revision 2919)
> +++ dapl/openib/dapl_ib_cq.c	(working copy)
> @@ -50,9 +50,96 @@
> #include "dapl_adapter_util.h"
> #include "dapl_lmr_util.h"
> #include "dapl_evd_util.h"
> +#include "dapl_ring_buffer_util.h"
> #include <sys/poll.h>
> +#include <signal.h>
>
> +int dapli_cq_thread_init(struct dapl_hca *hca_ptr)
> +{
> +        DAT_RETURN dat_status;
> +
> +        dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_init(%p)\n", hca_ptr);
> +
> +        /* create thread to process inbound connect request */
> +        dat_status = dapl_os_thread_create( cq_thread, (void*)hca_ptr,&hca_ptr->ib_trans.cq_thread);
> +        if (dat_status != DAT_SUCCESS)
> +        {
> +                dapl_dbg_log(DAPL_DBG_TYPE_ERR,
> +                             " cq_thread_init: failed to create thread\n");
> +                return 1;
> +        }
> +        return 0;
> +}
> +
> +void dapli_cq_thread_destroy(struct dapl_hca *hca_ptr)
> +{
> +        dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_destroy(%p)\n", hca_ptr);
> +
> +        /* destroy cr_thread and lock */
> +        hca_ptr->ib_trans.cq_destroy = 1;
> +        pthread_kill(hca_ptr->ib_trans.cq_thread, SIGUSR1);
> +        dapl_dbg_log(DAPL_DBG_TYPE_CM," cq_thread_destroy(%p) SIGUSR1 sent\n",hca_ptr);
> +        while (hca_ptr->ib_trans.cq_destroy != 2) {
> +                struct timespec sleep, remain;
> +                sleep.tv_sec = 0;
> +                sleep.tv_nsec = 10000000; /* 10 ms */
> +                dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
> +                             " cq_thread_destroy: waiting for cq_thread\n");
> +                nanosleep (&sleep, &remain);
> +        }
> +        dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_destroy(%d) exit\n",getpid());
> +	return;
> +}
> +
> +/* something to catch the signal */
> +static void ib_cq_handler(int signum)
> +{
> +        return;
> +}
> +
> +void cq_thread( void *arg )
> +{
> +	struct dapl_hca	*hca_ptr = arg;
> +	struct dapl_evd	*evd_ptr;
> +	struct ibv_cq	*ibv_cq = NULL;
> +	sigset_t	sigset;
> +	int		status = 0;
> +
> +	dapl_dbg_log ( DAPL_DBG_TYPE_UTIL," cq_thread: ENTER hca %p\n",hca_ptr);
> +
> +        sigemptyset(&sigset);
> +        sigaddset(&sigset,SIGUSR1);
> +        pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
> +        signal(SIGUSR1, ib_cq_handler);
> +
> +	/* wait on DTO event, or signal to abort */
> +	while (!hca_ptr->ib_trans.cq_destroy) {
> +
> +		struct pollfd cq_poll = {
> +			.fd      = hca_ptr->ib_hca_handle->cq_fd[0],
> +			.events  = POLLIN,
> +			.revents = 0
> +		};
>
> +		status = poll(&cq_poll, 1, -1);
> +		if ((status == 1) &&
> +			(!ibv_get_cq_event(hca_ptr->ib_hca_handle, 0, &ibv_cq, (void*)&evd_ptr))) {
> +
> +			if (DAPL_BAD_HANDLE(evd_ptr, DAPL_MAGIC_EVD))
> +				continue;
> +
> +			/* process DTO event via callback */
> +			dapl_evd_dto_callback ( evd_ptr->header.owner_ia->hca_ptr->ib_hca_handle,
> +						evd_ptr->ib_cq_handle,
> +						(void*)evd_ptr );
> +		} else {
> +
> +		}
> +	}
> +	hca_ptr->ib_trans.cq_destroy = 2;
> +	dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread: EXIT: hca %p \n", hca_ptr);
> +	return;
> +}
> /*
>  * Map all verbs DTO completion codes to the DAT equivelent.
>  *
> @@ -410,9 +497,9 @@
> 		IN DAPL_EVD		*evd_ptr,
> 		IN ib_wait_obj_handle_t	*p_cq_wait_obj_handle )
> {
> -	dapl_dbg_log (	DAPL_DBG_TYPE_UTIL,
> +	dapl_dbg_log (	DAPL_DBG_TYPE_CM,
> 			" cq_object_create: (%p)=%p\n",
> -			p_cq_wait_obj_handle, *p_cq_wait_obj_handle);
> +			p_cq_wait_obj_handle, evd_ptr );
>
> 	/* set cq_wait object to evd_ptr */
> 	*p_cq_wait_obj_handle = evd_ptr;
> @@ -447,33 +534,86 @@
> {
> 	DAPL_EVD		*evd_ptr = p_cq_wait_obj_handle;
> 	ib_cq_handle_t		cq = evd_ptr->ib_cq_handle;
> -	struct ibv_cq		*ibv_cq;
> -	void			*ibv_ctx;
> -	int			status = -ETIMEDOUT;
> +	struct ibv_cq		*ibv_cq = NULL;
> +	void			*ibv_ctx = NULL;
> +	int			status = 0;
>
> -	dapl_dbg_log ( DAPL_DBG_TYPE_UTIL,
> +	dapl_dbg_log ( DAPL_DBG_TYPE_CM,
> 			" cq_object_wait: dev %p evd %p cq %p, time %d\n",
> 			cq->context, evd_ptr, cq, timeout );
>
> -	/* Multiple EVD's sharing one event handle for now */
> -	if (cq) {
> -		struct pollfd cq_poll = {
> -        		.fd      = cq->context->cq_fd[0],
> -			.events  = POLLIN
> +	/* Multiple EVD's sharing one event handle for now until uverbs supports more */
> +
> +	/*
> +	 *  This makes it very inefficient and tricky to manage multiple CQ per device open
> +	 *  For example: 4 threads waiting on separate CQ events will all be woke when
> +	 *  a CQ event fires. So the poll wakes up and the first thread to get to the
> +	 *  the get_cq_event wins and the other 3 will block. The dapl_evd_wait code
> +	 *  above will immediately do a poll_cq after returning from CQ wait and if
> +	 *  nothing on the queue will call this wait again and go back to sleep. So
> +	 *  as long as they all wake up, a mutex is held around the get_cq_event
> +	 *  so no blocking occurs and they all return then everything should work.
> +	 *  Of course, the timeout needs to be adjusted on the threads that go back to sleep.
> +	 */
> +	while (cq) {
> +		struct pollfd cq_poll = {
> +			.fd      = cq->context->cq_fd[0],
> +			.events  = POLLIN,
> +			.revents = 0
> 		};
> -		int	timeout_ms = -1;
> +		int     timeout_ms = -1;
>
> 		if (timeout != DAT_TIMEOUT_INFINITE)
> 			timeout_ms = timeout/1000;
> -
> +
> +		/* check if another thread processed the event already, pending queue > 0 */
> +		dapl_os_lock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
> +		if (dapls_rbuf_count(&evd_ptr->pending_event_queue)) {
> +			dapl_os_unlock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
> +			break;
> +		}
> +		dapl_os_unlock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
> +
> +		dapl_dbg_log ( DAPL_DBG_TYPE_CM," cq_object_wait: polling\n");
> 		status = poll(&cq_poll, 1, timeout_ms);
> -		if (status == 1)
> -			status = ibv_get_cq_event(cq->context,
> -						  0, &ibv_cq, &ibv_ctx);
> -	}
> -	dapl_dbg_log (DAPL_DBG_TYPE_UTIL,
> -		      " cq_object_wait: RET cq %p ibv_cq %p ibv_ctx %p %x\n",
> -		      cq,ibv_cq,ibv_ctx,status);
> +		dapl_dbg_log ( DAPL_DBG_TYPE_CM," cq_object_wait: poll returned status=%d\n",status);
> +
> +		/*
> +		 * If a poll with timeout wakes, then hold the mutex around a poll with no timeout
> +		 * so subsequent get_cq_events are guaranteed not to block.
> +		 * If the event does not belong to this EVD, then put it on the proper EVD's pending
> +		 * queue under the mutex.
> +		 */
> +		if (status == 1) {
> +			dapl_os_lock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
> +			status = poll(&cq_poll, 1, 0);
> +			if (status == 1) {
> +				status = ibv_get_cq_event(cq->context,
> +							  0, &ibv_cq, &ibv_ctx);
> +
> +				/* if event is not ours, put on proper evd pending queue */
> +				/* force another wakeup */
> +				if ((ibv_ctx != evd_ptr ) &&
> +				    (!DAPL_BAD_HANDLE(ibv_ctx, DAPL_MAGIC_EVD))) {
> +					dapl_dbg_log (DAPL_DBG_TYPE_CM,
> +						      " cq_object_wait: ibv_ctx %p != evd %p\n",
> +						      ibv_ctx, evd_ptr);
> +					dapls_evd_copy_cq((struct evd_ptr*)ibv_ctx);
> +					dapl_os_unlock(&evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
> +					continue;
> +				}
> +			}
> +			dapl_os_unlock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
> +			break;
> +
> +		} else if (status == 0) {
> +			status = ETIMEDOUT;
> +			break;
> +		}
> +	}
> +	dapl_dbg_log (DAPL_DBG_TYPE_CM,
> +		      " cq_object_wait: RET evd %p cq %p ibv_cq %p ibv_ctx %p %s\n",
> +		      evd_ptr, cq,ibv_cq,ibv_ctx,strerror(errno));
>
> 	return(dapl_convert_errno(status,"cq_wait_object_wait"));
>



More information about the general mailing list