[openib-general] Re: fixes to the udapl ucm/uat patch you sent
James Lentini
jlentini at netapp.com
Tue Aug 2 14:46:01 PDT 2005
Hi Arlin,
Can I break this patch into 3 parts: the changes to dapl_evd_wait,
the changes to dapl_evd_resize, and the ib changes? I think it will
be easier to discuss each set of changes separately (with so many
separate issues, I'm afraid I've missed your reply to some of these
questions).
dapl_evd_wait:
I looked over the original implementation of dapl_evd_wait() with an
eye towards the situation you described (the caller polls and finds
fewer events than requested, an event arrives after that poll but
before the caller turns on notification, and the caller then blocks
unaware of that last event). I don't believe this can happen in the
original implementation. Here's why: after the caller turns on
notification, the code loops, via the continue statement on line 213,
back to the beginning of the for loop on line 173 and re-polls. Do
you agree?
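
To make the race concrete, here is a rough sketch (plain C, with
made-up helper names, not the actual uDAPL or verbs routines) of the
arm-then-repoll pattern the original loop follows:

    /* Rough sketch of the poll / arm / re-poll pattern.  The cq type
     * and the poll_cq/arm_cq/block_on_wait_object helpers are made-up
     * stand-ins, not the uDAPL or verbs entry points. */
    struct cq;                                /* opaque CQ handle */

    int  poll_cq(struct cq *cq);              /* events currently queued */
    void arm_cq(struct cq *cq);               /* request wakeup on next completion */
    void block_on_wait_object(struct cq *cq); /* sleep until the wakeup fires */

    int wait_for_events(struct cq *cq, int threshold)
    {
        int armed = 0;

        for (;;) {
            if (poll_cq(cq) >= threshold)
                return 0;                     /* threshold reached, done */

            if (!armed) {
                arm_cq(cq);
                armed = 1;
                continue;                     /* re-poll before sleeping: picks up
                                               * a completion that raced with the arm */
            }

            block_on_wait_object(cq);         /* safe to sleep now */
            armed = 0;                        /* the wakeup consumed the notification */
        }
    }

Because the re-poll always follows the arm, a completion that lands
between the first poll and the arm is seen on the next pass rather
than lost.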
dapl_evd_resize:
I'm still unsure why you removed the call to
dapls_evd_event_realloc() and moved the work that routine performed
up into dapl_evd_resize(). If we no longer call
dapls_evd_event_realloc(), that routine should be removed.
ib changes:
These look ok to me. I've checked them into revision 2955.
On Tue, 2 Aug 2005, Or Gerlitz wrote:
> Arlin,
>
> The patch you sent yesterday had some broken lines (97, 949, 1174, 1332, 1355, etc.)
> Here it is with the changes that made it patch fine over 2944
>
> Or.
>
> Index: dapl/udapl/dapl_evd_wait.c
> ===================================================================
> --- dapl/udapl/dapl_evd_wait.c (revision 2919)
> +++ dapl/udapl/dapl_evd_wait.c (working copy)
> @@ -74,9 +74,10 @@
> DAPL_EVD *evd_ptr;
> DAT_RETURN dat_status;
> DAT_EVENT *local_event;
> - DAT_BOOLEAN notify_requested = DAT_FALSE;
> + DAT_BOOLEAN notify_needed = DAT_FALSE;
> DAT_BOOLEAN waitable;
> DAPL_EVD_STATE evd_state;
> + DAT_COUNT total_events,new_events;
>
> dapl_dbg_log (DAPL_DBG_TYPE_API,
> "dapl_evd_wait (%p, %d, %d, %p, %p)\n",
> @@ -124,9 +125,9 @@
> }
>
> dapl_dbg_log (DAPL_DBG_TYPE_EVD,
> - "dapl_evd_wait: EVD %p, CQ %p\n",
> - evd_ptr,
> - (void *)evd_ptr->ib_cq_handle);
> + "dapl_evd_wait: EVD %p, CQ %p, Timeout %d, Threshold %d\n",
> + evd_ptr,(void *)evd_ptr->ib_cq_handle, time_out, threshold);
> +
>
> /*
> * Make sure there are no other waiters and the evd is active.
> @@ -144,11 +145,10 @@
> evd_state = dapl_os_atomic_assign ( (DAPL_ATOMIC *)&evd_ptr->evd_state,
> (DAT_COUNT) DAPL_EVD_STATE_OPEN,
> (DAT_COUNT) DAPL_EVD_STATE_WAITED );
> - dapl_os_unlock ( &evd_ptr->header.lock );
>
> - if ( evd_state != DAPL_EVD_STATE_OPEN )
> + dapl_os_unlock ( &evd_ptr->header.lock );
> + if ( evd_state != DAPL_EVD_STATE_OPEN || !waitable)
> {
> - /* Bogus state, bail out */
> dat_status = DAT_ERROR (DAT_INVALID_STATE,0);
> goto bail;
> }
> @@ -182,37 +182,54 @@
> * return right away if the ib_cq_handle associate with these evd
> * equal to IB_INVALID_HANDLE
> */
> - dapls_evd_copy_cq(evd_ptr);
> -
> - if (dapls_rbuf_count(&evd_ptr->pending_event_queue) >= threshold)
> - {
> - break;
> - }
> -
> - /*
> - * Do not enable the completion notification if this evd is not
> - * a DTO_EVD or RMR_BIND_EVD
> + /* Logic to prevent missing completion between copy_cq (poll)
> + * and completion_notify (re-arm)
> */
> - if ( (!notify_requested) &&
> - ((evd_ptr->evd_flags & DAT_EVD_DTO_FLAG) ||
> - (evd_ptr->evd_flags & DAT_EVD_RMR_BIND_FLAG)) )
> + notify_needed = DAT_TRUE;
> + new_events = 0;
> + while (DAT_TRUE)
> {
> - dat_status = dapls_ib_completion_notify (
> - evd_ptr->header.owner_ia->hca_ptr->ib_hca_handle,
> - evd_ptr,
> - (evd_ptr->completion_type == DAPL_EVD_STATE_SOLICITED_WAIT) ?
> - IB_NOTIFY_ON_SOLIC_COMP : IB_NOTIFY_ON_NEXT_COMP );
> -
> - DAPL_CNTR(DCNT_EVD_WAIT_CMP_NTFY);
> - /* FIXME report error */
> - dapl_os_assert(dat_status == DAT_SUCCESS);
> + dapls_evd_copy_cq(evd_ptr); /* poll for new completions */
> + total_events = dapls_rbuf_count (&evd_ptr->pending_event_queue);
> + new_events = total_events - new_events;
> + if (total_events >= threshold ||
> + (!new_events && notify_needed == DAT_FALSE))
> + {
> + break;
> + }
> +
> + /*
> + * Do not enable the completion notification if this evd is not
> + * a DTO_EVD or RMR_BIND_EVD
> + */
> + if ( (evd_ptr->evd_flags & DAT_EVD_DTO_FLAG) ||
> + (evd_ptr->evd_flags & DAT_EVD_RMR_BIND_FLAG) )
> + {
> + dat_status = dapls_ib_completion_notify (
> + evd_ptr->header.owner_ia->hca_ptr->ib_hca_handle,
> + evd_ptr,
> + (evd_ptr->completion_type == DAPL_EVD_STATE_SOLICITED_WAIT)?
> + IB_NOTIFY_ON_SOLIC_COMP : IB_NOTIFY_ON_NEXT_COMP );
> +
> + DAPL_CNTR(DCNT_EVD_WAIT_CMP_NTFY);
> + notify_needed = DAT_FALSE;
> + new_events = total_events;
> +
> + /* FIXME report error */
> + dapl_os_assert(dat_status == DAT_SUCCESS);
> + }
> + else
> + {
> + break;
> + }
>
> - notify_requested = DAT_TRUE;
> + } /* while completions < threshold, and rearm needed */
>
> - /* Try again. */
> - continue;
> + if (total_events >= threshold)
> + {
> + break;
> }
> -
> +
>
> /*
> * Unused by poster; it has no way to tell how many
> @@ -232,8 +249,6 @@
> #endif
> dat_status = dapl_os_wait_object_wait (
> &evd_ptr->wait_object, time_out );
> -
> - notify_requested = DAT_FALSE; /* We've used it up. */
>
> /* See if we were awakened by evd_set_unwaitable */
> if ( !evd_ptr->evd_waitable )
> @@ -243,13 +258,22 @@
>
> if (dat_status != DAT_SUCCESS)
> {
> - /*
> - * If the status is DAT_TIMEOUT, we'll break out of the
> - * loop, *not* dequeue an event (because dat_status
> - * != DAT_SUCCESS), set *nmore (as we should for timeout)
> - * and return DAT_TIMEOUT.
> - */
> - break;
> + /*
> + * If the status is DAT_TIMEOUT, we'll break out of the
> + * loop, *not* dequeue an event (because dat_status
> + * != DAT_SUCCESS), set *nmore (as we should for timeout)
> + * and return DAT_TIMEOUT.
> + */
> +
> +#if defined(DAPL_DBG)
> + dapls_evd_copy_cq(evd_ptr); /* poll */
> + dapl_dbg_log (DAPL_DBG_TYPE_EVD,
> + "dapl_evd_wait: WAKEUP ERROR (0x%x): EVD %p, CQ %p, events? %d\n",
> + dat_status,evd_ptr,(void *)evd_ptr->ib_cq_handle,
> + dapls_rbuf_count(&evd_ptr->pending_event_queue) );
> +#endif /* DAPL_DBG */
> +
> + break;
> }
> }
>
> Index: dapl/udapl/Makefile
> ===================================================================
> --- dapl/udapl/Makefile (revision 2941)
> +++ dapl/udapl/Makefile (working copy)
> @@ -122,7 +122,8 @@
> #
> ifeq ($(VERBS),openib)
> PROVIDER = $(TOPDIR)/../openib
> -CFLAGS += -DOPENIB -DCQ_WAIT_OBJECT
> +CFLAGS += -DOPENIB
> +#CFLAGS += -DCQ_WAIT_OBJECT uncomment when fixed
> CFLAGS += -I/usr/local/include/infiniband
> endif
>
> Index: dapl/common/dapl_evd_resize.c
> ===================================================================
> --- dapl/common/dapl_evd_resize.c (revision 2919)
> +++ dapl/common/dapl_evd_resize.c (working copy)
> @@ -67,71 +67,139 @@
> IN DAT_EVD_HANDLE evd_handle,
> IN DAT_COUNT evd_qlen )
> {
> - DAPL_IA *ia_ptr;
> - DAPL_EVD *evd_ptr;
> - DAT_COUNT pend_cnt;
> - DAT_RETURN dat_status;
> + DAPL_IA *ia_ptr;
> + DAPL_EVD *evd_ptr;
> + DAT_EVENT *event_ptr;
> + DAT_EVENT *events;
> + DAT_EVENT *orig_event;
> + DAPL_RING_BUFFER free_event_queue;
> + DAPL_RING_BUFFER pending_event_queue;
> + DAT_COUNT pend_cnt;
> + DAT_COUNT i;
> + DAT_RETURN dat_status;
>
> dapl_dbg_log (DAPL_DBG_TYPE_API, "dapl_evd_resize (%p, %d)\n",
> evd_handle, evd_qlen);
>
> if (DAPL_BAD_HANDLE (evd_handle, DAPL_MAGIC_EVD))
> {
> - dat_status = DAT_ERROR (DAT_INVALID_HANDLE,0);
> - goto bail;
> + return DAT_ERROR (DAT_INVALID_PARAMETER,DAT_INVALID_ARG1);
> }
>
> evd_ptr = (DAPL_EVD *)evd_handle;
> ia_ptr = evd_ptr->header.owner_ia;
>
> - if ( evd_qlen == evd_ptr->qlen )
> + if ((evd_qlen <= 0) || (evd_ptr->qlen > evd_qlen))
> {
> - dat_status = DAT_SUCCESS;
> - goto bail;
> + dat_status = DAT_ERROR(DAT_INVALID_PARAMETER,DAT_INVALID_ARG2);
> + goto bail;
> }
>
> if ( evd_qlen > ia_ptr->hca_ptr->ia_attr.max_evd_qlen )
> {
> - dat_status = DAT_ERROR (DAT_INVALID_PARAMETER,DAT_INVALID_ARG2);
> + dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_TEVD);
> goto bail;
> }
>
> dapl_os_lock(&evd_ptr->header.lock);
>
> - /* Don't try to resize if we are actively waiting */
> if (evd_ptr->evd_state == DAPL_EVD_STATE_WAITED)
> {
> - dapl_os_unlock(&evd_ptr->header.lock);
> - dat_status = DAT_ERROR (DAT_INVALID_STATE,0);
> - goto bail;
> + dat_status = DAT_ERROR(DAT_INVALID_STATE,0);
> + goto bail_unlock;
> }
>
> pend_cnt = dapls_rbuf_count(&evd_ptr->pending_event_queue);
> if (pend_cnt > evd_qlen) {
> - dapl_os_unlock(&evd_ptr->header.lock);
> - dat_status = DAT_ERROR (DAT_INVALID_STATE,0);
> - goto bail;
> + dat_status = DAT_ERROR(DAT_INVALID_STATE,0);
> + goto bail_unlock;
> }
>
> dat_status = dapls_ib_cq_resize(evd_ptr->header.owner_ia,
> - evd_ptr,
> - &evd_qlen);
> - if (dat_status != DAT_SUCCESS)
> + evd_ptr,
> + &evd_qlen);
> + if (DAT_SUCCESS != dat_status) {
> + dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_MEMORY);
> + goto bail_unlock;
> + }
> +
> + /* Allocate EVENTs */
> + events = (DAT_EVENT *) dapl_os_alloc (evd_qlen * sizeof (DAT_EVENT));
> + if (!events)
> {
> - dapl_os_unlock(&evd_ptr->header.lock);
> - goto bail;
> + dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_MEMORY);
> + goto bail_unlock;
> }
> + event_ptr = events;
>
> - dat_status = dapls_evd_event_realloc (evd_ptr, evd_qlen);
> - if (dat_status != DAT_SUCCESS)
> + /* allocate free event queue */
> + dat_status = dapls_rbuf_alloc (&free_event_queue, evd_qlen);
> + if (DAT_SUCCESS != dat_status)
> {
> - dapl_os_unlock(&evd_ptr->header.lock);
> - goto bail;
> + dapl_os_free(event_ptr, evd_qlen * sizeof (DAT_EVENT));
> + dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_MEMORY);
> + goto bail_unlock;
> + }
> +
> + /* allocate pending event queue */
> + dat_status = dapls_rbuf_alloc (&pending_event_queue, evd_qlen);
> + if (DAT_SUCCESS != dat_status)
> + {
> + dapl_os_free(event_ptr, evd_qlen * sizeof (DAT_EVENT));
> + dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_MEMORY);
> + goto bail_unlock;
> }
>
> + for (i = 0; i < pend_cnt; i++)
> + {
> + orig_event = dapls_rbuf_remove(&evd_ptr->pending_event_queue);
> + if (orig_event == NULL) {
> + dapl_dbg_log (DAPL_DBG_TYPE_ERR, " Inconsistent event queue\n");
> + dapl_os_free(event_ptr, evd_qlen * sizeof (DAT_EVENT));
> + dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_MEMORY);
> + goto bail_unlock;
> + }
> + memcpy(event_ptr, orig_event, sizeof(DAT_EVENT));
> + dat_status = dapls_rbuf_add(&pending_event_queue, event_ptr);
> + if (DAT_SUCCESS != dat_status) {
> + dapl_os_free(event_ptr, evd_qlen * sizeof (DAT_EVENT));
> + dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_MEMORY);
> + goto bail_unlock;
> + }
> + event_ptr++;
> + }
> +
> + for (i = pend_cnt; i < evd_qlen; i++)
> + {
> + dat_status = dapls_rbuf_add(&free_event_queue,(void *) event_ptr);
> + if (DAT_SUCCESS != dat_status) {
> + dapl_os_free(event_ptr, evd_qlen * sizeof (DAT_EVENT));
> + dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_MEMORY);
> + goto bail_unlock;
> + }
> + event_ptr++;
> + }
> +
> + dapls_rbuf_destroy (&evd_ptr->free_event_queue);
> + dapls_rbuf_destroy (&evd_ptr->pending_event_queue);
> + if (evd_ptr->events)
> + {
> + dapl_os_free (evd_ptr->events, evd_ptr->qlen * sizeof (DAT_EVENT));
> + }
> + evd_ptr->free_event_queue = free_event_queue;
> + evd_ptr->pending_event_queue = pending_event_queue;
> + evd_ptr->events = events;
> + evd_ptr->qlen = evd_qlen;
> +
> +bail_unlock:
> +
> dapl_os_unlock(&evd_ptr->header.lock);
>
> - bail:
> + dapl_dbg_log (DAPL_DBG_TYPE_RTN,
> + "dapl_evd_resize returns 0x%x\n",dat_status);
> +
> +bail:
> +
> return dat_status;
> }
>
> Index: dapl/openib/TODO
> ===================================================================
> --- dapl/openib/TODO (revision 2919)
> +++ dapl/openib/TODO (working copy)
> @@ -1,7 +1,7 @@
>
> IB Verbs:
> - CQ resize?
> -- query call to get current qp state
> +- query call to get current qp state, remote port number
> - ibv_get_cq_event() needs timed event call and wakeup
> - query call to get device attributes
> - memory window support
> @@ -9,8 +9,6 @@
> DAPL:
> - reinit EP needs a QP timewait completion notification
> - add cq_object wakeup, time based cq_object wait when verbs support arrives
> -- update uDAPL code with real ATS support
> -- etc, etc.
>
> Other:
> - Shared memory in udapl and kernel module to support?
> Index: dapl/openib/dapl_ib_util.c
> ===================================================================
> --- dapl/openib/dapl_ib_util.c (revision 2919)
> +++ dapl/openib/dapl_ib_util.c (working copy)
> @@ -111,27 +111,40 @@
> }
>
>
> -/* just get IP address for hostname */
> -int dapli_get_addr( char *addr, int addr_len)
> +/* just get IP address, IPv4 only for now */
> +int dapli_get_hca_addr( struct dapl_hca *hca_ptr )
> {
> - struct sockaddr_in *ipv4_addr = (struct sockaddr_in*)addr;
> - struct hostent *h_ptr;
> - struct utsname ourname;
> -
> - if ( uname( &ourname ) < 0 )
> - return 1;
> -
> - h_ptr = gethostbyname( ourname.nodename );
> - if ( h_ptr == NULL )
> + struct sockaddr_in *ipv4_addr;
> + struct ib_at_completion at_comp;
> + struct dapl_at_record at_rec;
> + int status;
> + DAT_RETURN dat_status;
> +
> + ipv4_addr = (struct sockaddr_in*)&hca_ptr->hca_address;
> + ipv4_addr->sin_family = AF_INET;
> + ipv4_addr->sin_addr.s_addr = 0;
> +
> + at_comp.fn = dapli_ip_comp_handler;
> + at_comp.context = &at_rec;
> + at_rec.addr = &hca_ptr->hca_address;
> + at_rec.wait_object = &hca_ptr->ib_trans.wait_object;
> +
> + /* call with async_comp until the sync version works */
> + status = ib_at_ips_by_gid(&hca_ptr->ib_trans.gid, &ipv4_addr->sin_addr.s_addr, 1,
> + &at_comp, &at_rec.req_id);
> +
> + if (status < 0)
> return 1;
> -
> - if ( h_ptr->h_addrtype == AF_INET ) {
> - ipv4_addr = (struct sockaddr_in*) addr;
> - ipv4_addr->sin_family = AF_INET;
> - dapl_os_memcpy( &ipv4_addr->sin_addr, h_ptr->h_addr_list[0], 4 );
> - } else
> +
> + if (status > 0)
> + dapli_ip_comp_handler(at_rec.req_id, (void*)ipv4_addr, status);
> +
> + /* wait for answer, 5 seconds max */
> + dat_status = dapl_os_wait_object_wait (&hca_ptr->ib_trans.wait_object,5000000);
> +
> + if ((dat_status != DAT_SUCCESS ) || (!ipv4_addr->sin_addr.s_addr))
> return 1;
> -
> +
> return 0;
> }
>
> @@ -152,14 +165,17 @@
> */
> int32_t dapls_ib_init (void)
> {
> - if (dapli_cm_thread_init())
> - return -1;
> - else
> - return 0;
> + dapl_dbg_log (DAPL_DBG_TYPE_UTIL, " dapl_ib_init: \n" );
> + if (dapli_cm_thread_init() || dapli_at_thread_init())
> + return 1;
> +
> + return 0;
> }
>
> int32_t dapls_ib_release (void)
> {
> + dapl_dbg_log (DAPL_DBG_TYPE_UTIL, " dapl_ib_release: \n" );
> + dapli_at_thread_destroy();
> dapli_cm_thread_destroy();
> return 0;
> }
> @@ -186,7 +202,6 @@
> IN DAPL_HCA *hca_ptr)
> {
> struct dlist *dev_list;
> - DAT_RETURN dat_status = DAT_SUCCESS;
>
> dapl_dbg_log (DAPL_DBG_TYPE_UTIL,
> " open_hca: %s - %p\n", hca_name, hca_ptr );
> @@ -217,36 +232,46 @@
> ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
> return DAT_INTERNAL_ERROR;
> }
> -
> +
> /* set inline max with enviromment or default, get local lid and gid 0 */
> hca_ptr->ib_trans.max_inline_send =
> dapl_os_get_env_val ( "DAPL_MAX_INLINE", INLINE_SEND_DEFAULT );
>
> - if ( dapli_get_lid(hca_ptr, hca_ptr->port_num,
> - &hca_ptr->ib_trans.lid )) {
> + if (dapli_get_lid(hca_ptr, hca_ptr->port_num,
> + &hca_ptr->ib_trans.lid)) {
> dapl_dbg_log (DAPL_DBG_TYPE_ERR,
> " open_hca: IB get LID failed for %s\n",
> ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
> - return DAT_INTERNAL_ERROR;
> + goto bail;
> }
>
> - if ( dapli_get_gid(hca_ptr, hca_ptr->port_num, 0,
> - &hca_ptr->ib_trans.gid )) {
> + if (dapli_get_gid(hca_ptr, hca_ptr->port_num, 0,
> + &hca_ptr->ib_trans.gid)) {
> dapl_dbg_log (DAPL_DBG_TYPE_ERR,
> " open_hca: IB get GID failed for %s\n",
> ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
> - return DAT_INTERNAL_ERROR;
> + goto bail;
> }
> -
> /* get the IP address of the device */
> - if ( dapli_get_addr((char*)&hca_ptr->hca_address,
> - sizeof(DAT_SOCK_ADDR6) )) {
> + if (dapli_get_hca_addr(hca_ptr)) {
> dapl_dbg_log (DAPL_DBG_TYPE_ERR,
> " open_hca: IB get ADDR failed for %s\n",
> ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
> - return DAT_INTERNAL_ERROR;
> + goto bail;
> + }
> +
> + /* one thread for each device open */
> + if (dapli_cq_thread_init(hca_ptr)) {
> + dapl_dbg_log (DAPL_DBG_TYPE_ERR,
> + " open_hca: cq_thread_init failed for %s\n",
> + ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
> + goto bail;
> }
>
> + /* initialize cq_lock and wait object */
> + dapl_os_lock_init(&hca_ptr->ib_trans.cq_lock);
> + dapl_os_wait_object_init (&hca_ptr->ib_trans.wait_object);
> +
> dapl_dbg_log (DAPL_DBG_TYPE_UTIL,
> " open_hca: %s, port %d, %s %d.%d.%d.%d INLINE_MAX=%d\n",
> ibv_get_device_name(hca_ptr->ib_trans.ib_dev), hca_ptr->port_num,
> @@ -257,7 +282,19 @@
> ((struct sockaddr_in *)&hca_ptr->hca_address)->sin_addr.s_addr >> 24 & 0xff,
> hca_ptr->ib_trans.max_inline_send );
>
> - return dat_status;
> + dapl_dbg_log(DAPL_DBG_TYPE_CM,
> + " open_hca: LID 0x%x GID subnet %016llx id %016llx\n",
> + hca_ptr->ib_trans.lid,
> + (unsigned long long)bswap_64(hca_ptr->ib_trans.gid.global.subnet_prefix),
> + (unsigned long long)bswap_64(hca_ptr->ib_trans.gid.global.interface_id) );
> +
> + return DAT_SUCCESS;
> +
> +bail:
> + ibv_close_device(hca_ptr->ib_hca_handle);
> + hca_ptr->ib_hca_handle = IB_INVALID_HANDLE;
> + return DAT_INTERNAL_ERROR;
> +
> }
>
>
> @@ -282,10 +319,14 @@
> dapl_dbg_log (DAPL_DBG_TYPE_UTIL," close_hca: %p\n",hca_ptr);
>
> if (hca_ptr->ib_hca_handle != IB_INVALID_HANDLE) {
> + dapli_cq_thread_destroy(hca_ptr);
> if (ibv_close_device(hca_ptr->ib_hca_handle))
> return(dapl_convert_errno(errno,"ib_close_device"));
> hca_ptr->ib_hca_handle = IB_INVALID_HANDLE;
> }
> +
> + dapl_os_lock_destroy(&hca_ptr->ib_trans.cq_lock);
> +
> return (DAT_SUCCESS);
> }
>
> @@ -448,35 +489,4 @@
> return DAT_SUCCESS;
> }
>
> -#ifdef PROVIDER_SPECIFIC_ATTR
> -
> -/*
> - * dapls_set_provider_specific_attr
> - *
> - * Input:
> - * attr_ptr Pointer provider attributes
> - *
> - * Output:
> - * none
> - *
> - * Returns:
> - * void
> - */
> -DAT_NAMED_ATTR ib_attrs[] = {
> - {
> - "I_DAT_SEND_INLINE_THRESHOLD",
> - "128"
> - },
> -};
> -
> -#define SPEC_ATTR_SIZE( x ) (sizeof( x ) / sizeof( DAT_NAMED_ATTR))
> -
> -void dapls_set_provider_specific_attr(
> - IN DAT_PROVIDER_ATTR *attr_ptr )
> -{
> - attr_ptr->num_provider_specific_attr = SPEC_ATTR_SIZE( ib_attrs );
> - attr_ptr->provider_specific_attr = ib_attrs;
> -}
> -
> -#endif
>
> Index: dapl/openib/dapl_ib_cm.c
> ===================================================================
> --- dapl/openib/dapl_ib_cm.c (revision 2919)
> +++ dapl/openib/dapl_ib_cm.c (working copy)
> @@ -70,19 +70,8 @@
> static inline uint64_t cpu_to_be64(uint64_t x) { return x; }
> #endif
>
> -#ifndef IB_AT
> -
> -#include <stdio.h>
> -#include <unistd.h>
> -#include <fcntl.h>
> -#include <netinet/tcp.h>
> -#include <sysfs/libsysfs.h>
> -#include <signal.h>
> -
> -/* iclust-20 hard coded values, network order */
> -#define REMOTE_GID "fe80:0000:0000:0000:0002:c902:0000:4071"
> -#define REMOTE_LID "0002"
> -
> +static int g_at_destroy;
> +static DAPL_OS_THREAD g_at_thread;
> static int g_cm_destroy;
> static DAPL_OS_THREAD g_cm_thread;
> static DAPL_OS_LOCK g_cm_lock;
> @@ -122,7 +111,7 @@
> while (g_cm_destroy) {
> struct timespec sleep, remain;
> sleep.tv_sec = 0;
> - sleep.tv_nsec = 200000000; /* 200 ms */
> + sleep.tv_nsec = 10000000; /* 10 ms */
> dapl_dbg_log(DAPL_DBG_TYPE_CM,
> " cm_thread_destroy: waiting for cm_thread\n");
> nanosleep (&sleep, &remain);
> @@ -130,112 +119,70 @@
> dapl_dbg_log(DAPL_DBG_TYPE_CM," cm_thread_destroy(%d) exit\n",getpid());
> }
>
> -static int ib_at_route_by_ip(uint32_t dst_ip, uint32_t src_ip, int tos, uint16_t flags,
> - struct ib_at_ib_route *ib_route,
> - struct ib_at_completion *async_comp)
> -{
> - struct dapl_cm_id *conn = (struct dapl_cm_id *)async_comp->context;
> -
> - dapl_dbg_log (
> - DAPL_DBG_TYPE_CM,
> - " CM at_route_by_ip: conn %p cm_id %d src %d.%d.%d.%d -> dst %d.%d.%d.%d (%d)\n",
> - conn,conn->cm_id,
> - src_ip >> 0 & 0xff, src_ip >> 8 & 0xff,
> - src_ip >> 16 & 0xff,src_ip >> 24 & 0xff,
> - dst_ip >> 0 & 0xff, dst_ip >> 8 & 0xff,
> - dst_ip >> 16 & 0xff,dst_ip >> 24 & 0xff, conn->service_id);
> -
> - /* use req_id for loopback indication */
> - if (( src_ip == dst_ip ) || ( dst_ip == 0x0100007f ))
> - async_comp->req_id = 1;
> - else
> - async_comp->req_id = 0;
> -
> - return 1;
> -}
> -
> -static int ib_at_paths_by_route(struct ib_at_ib_route *ib_route, uint32_t mpath_type,
> - struct ib_sa_path_rec *pr, int npath,
> - struct ib_at_completion *async_comp)
> +int dapli_at_thread_init(void)
> {
> - struct dapl_cm_id *conn = (struct dapl_cm_id *)async_comp->context;
> - char *env, *token;
> - char dgid[40];
> - uint16_t *p_gid = (uint16_t*)&ib_route->gid;
> + DAT_RETURN dat_status;
>
> - /* set local path record values and send to remote */
> - (void)dapl_os_memzero(pr, sizeof(*pr));
> + dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread_init(%d)\n", getpid());
>
> - pr->slid = htons(conn->hca->ib_trans.lid);
> - pr->sgid.global.subnet_prefix = conn->hca->ib_trans.gid.global.subnet_prefix;
> - pr->sgid.global.interface_id = conn->hca->ib_trans.gid.global.interface_id;
> + /* create thread to process AT async requests */
> + dat_status = dapl_os_thread_create(at_thread, NULL, &g_at_thread);
> + if (dat_status != DAT_SUCCESS)
> + {
> + dapl_dbg_log(DAPL_DBG_TYPE_ERR,
> + " at_thread_init: failed to create thread\n");
> + return 1;
> + }
> + return 0;
> +}
>
> - env = getenv("DAPL_REMOTE_LID");
> - if ( env == NULL )
> - env = REMOTE_LID;
> - ib_route->lid = strtol(env,NULL,0);
> +void dapli_at_thread_destroy(void)
> +{
> + dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread_destroy(%d)\n", getpid());
>
> - env = getenv("DAPL_REMOTE_GID");
> - if ( env == NULL )
> - env = REMOTE_GID;
> + /* destroy cr_thread and lock */
> + g_at_destroy = 1;
> + pthread_kill( g_at_thread, SIGUSR1 );
> + dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread_destroy(%d) SIGUSR1 sent\n",getpid());
> + while (g_at_destroy) {
> + struct timespec sleep, remain;
> + sleep.tv_sec = 0;
> + sleep.tv_nsec = 10000000; /* 10 ms */
> + dapl_dbg_log(DAPL_DBG_TYPE_CM,
> + " at_thread_destroy: waiting for at_thread\n");
> + nanosleep (&sleep, &remain);
> + }
> + dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread_destroy(%d) exit\n",getpid());
> +}
>
> - dapl_dbg_log(DAPL_DBG_TYPE_CM,
> - " ib_at_paths_by_route: remote LID %x GID %s\n",
> - ib_route->lid,env);
> +void dapli_ip_comp_handler(uint64_t req_id, void *context, int rec_num)
> +{
> + struct dapl_at_record *at_rec = context;
>
> - dapl_os_memcpy( dgid, env, 40 );
> + dapl_dbg_log(DAPL_DBG_TYPE_CM,
> + " ip_comp_handler: ctxt %p, req_id %lld rec_num %d\n",
> + context, req_id, rec_num);
>
> - /* get GID with token strings and delimiter */
> - token = strtok(dgid,":");
> - while (token) {
> - *p_gid = strtoul(token,NULL,16);
> - *p_gid = htons(*p_gid); /* convert each token to network order */
> - token = strtok(NULL,":");
> - p_gid++;
> - }
> -
> - /* set remote lid and gid, req_id is indication of loopback */
> - if ( !async_comp->req_id ) {
> - pr->dlid = htons(ib_route->lid);
> - pr->dgid.global.subnet_prefix = ib_route->gid.global.subnet_prefix;
> - pr->dgid.global.interface_id = ib_route->gid.global.interface_id;
> - } else {
> - pr->dlid = pr->slid;
> - pr->dgid.global.subnet_prefix = pr->sgid.global.subnet_prefix;
> - pr->dgid.global.interface_id = pr->sgid.global.interface_id;
> - }
> -
> - pr->reversible = 0x1000000;
> - pr->pkey = 0xffff;
> - pr->mtu = IBV_MTU_1024;
> - pr->mtu_selector = 2;
> - pr->rate_selector = 2;
> - pr->rate = 3;
> - pr->packet_life_time_selector = 2;
> - pr->packet_life_time = 2;
> + if ((at_rec) && ( at_rec->req_id == req_id)) {
> + dapl_os_wait_object_wakeup(at_rec->wait_object);
> + return;
> + }
>
> - dapl_dbg_log(DAPL_DBG_TYPE_CM,
> - " ib_at_paths_by_route: SRC LID 0x%x GID subnet %016llx id %016llx\n",
> - pr->slid,(unsigned long long)(pr->sgid.global.subnet_prefix),
> - (unsigned long long)(pr->sgid.global.interface_id) );
> - dapl_dbg_log(DAPL_DBG_TYPE_CM,
> - " ib_at_paths_by_route: DST LID 0x%x GID subnet %016llx id %016llx\n",
> - pr->dlid,(unsigned long long)(pr->dgid.global.subnet_prefix),
> - (unsigned long long)(pr->dgid.global.interface_id) );
> -
> - dapli_path_comp_handler( async_comp->req_id, (void*)conn, 1);
> -
> - return 0;
> + dapl_dbg_log(DAPL_DBG_TYPE_ERR,
> + " ip_comp_handler: at_rec->req_id %lld != req_id %lld\n",
> + at_rec->req_id, req_id );
> }
>
> -#endif /* ifndef IB_AT */
> -
> static void dapli_path_comp_handler(uint64_t req_id, void *context, int rec_num)
> {
> struct dapl_cm_id *conn = context;
> int status;
> ib_cm_events_t event;
>
> + dapl_dbg_log(DAPL_DBG_TYPE_CM,
> + " path_comp_handler: ctxt %p, req_id %lld rec_num %d\n",
> + context, req_id, rec_num);
> +
> if (rec_num <= 0) {
> dapl_dbg_log(DAPL_DBG_TYPE_CM,
> " path_comp_handler: resolution err %d retry %d\n",
> @@ -249,7 +196,7 @@
>
> status = ib_at_paths_by_route(&conn->dapl_rt, 0,
> &conn->dapl_path, 1,
> - &conn->dapl_comp);
> + &conn->dapl_comp, &conn->dapl_comp.req_id);
> if (status) {
> dapl_dbg_log(DAPL_DBG_TYPE_CM,
> " path_by_route: err %d id %lld\n",
> @@ -287,6 +234,21 @@
> int status;
> ib_cm_events_t event;
>
> + dapl_dbg_log(DAPL_DBG_TYPE_CM,
> + " rt_comp_handler: conn %p, req_id %lld rec_num %d\n",
> + conn, req_id, rec_num);
> +
> + dapl_dbg_log(DAPL_DBG_TYPE_CM,
> + " rt_comp_handler: SRC GID subnet %016llx id %016llx\n",
> + (unsigned long long)cpu_to_be64(conn->dapl_rt.sgid.global.subnet_prefix),
> + (unsigned long long)cpu_to_be64(conn->dapl_rt.sgid.global.interface_id) );
> +
> + dapl_dbg_log(DAPL_DBG_TYPE_CM,
> + " rt_comp_handler: DST GID subnet %016llx id %016llx\n",
> + (unsigned long long)cpu_to_be64(conn->dapl_rt.dgid.global.subnet_prefix),
> + (unsigned long long)cpu_to_be64(conn->dapl_rt.dgid.global.interface_id) );
> +
> +
> if (rec_num <= 0) {
> dapl_dbg_log(DAPL_DBG_TYPE_ERR,
> " dapl_rt_comp_handler: rec %d retry %d\n",
> @@ -298,7 +260,8 @@
> }
>
> status = ib_at_route_by_ip(((struct sockaddr_in *)&conn->r_addr)->sin_addr.s_addr,
> - 0, 0, 0, &conn->dapl_rt, &conn->dapl_comp);
> + 0, 0, 0, &conn->dapl_rt,
> + &conn->dapl_comp,&conn->dapl_comp.req_id);
> if (status < 0) {
> dapl_dbg_log(DAPL_DBG_TYPE_ERR, "dapl_rt_comp_handler: "
> "ib_at_route_by_ip failed with status %d\n",
> @@ -306,9 +269,16 @@
> event = IB_CME_DESTINATION_UNREACHABLE;
> goto bail;
> }
> -
> if (status == 1)
> dapli_rt_comp_handler(conn->dapl_comp.req_id, conn, 1);
> +
> + return;
> + }
> +
> + if (!conn->dapl_rt.dgid.global.subnet_prefix || req_id != conn->dapl_comp.req_id) {
> + dapl_dbg_log(DAPL_DBG_TYPE_ERR,
> + " dapl_rt_comp_handler: ERROR: unexpected callback req_id=%d(%d)\n",
> + req_id, conn->dapl_comp.req_id );
> return;
> }
>
> @@ -316,7 +286,7 @@
> conn->dapl_comp.context = conn;
> conn->retries = 0;
> status = ib_at_paths_by_route(&conn->dapl_rt, 0, &conn->dapl_path, 1,
> - &conn->dapl_comp);
> + &conn->dapl_comp, &conn->dapl_comp.req_id);
> if (status) {
> dapl_dbg_log(DAPL_DBG_TYPE_ERR,
> "dapl_rt_comp_handler: ib_at_paths_by_route "
> @@ -346,8 +316,6 @@
> ib_cm_destroy_id(conn->cm_id);
> if (conn->ep)
> conn->ep->cm_handle = IB_INVALID_HANDLE;
> - if (conn->sp)
> - conn->sp->cm_srvc_handle = IB_INVALID_HANDLE;
>
> /* take off the CM thread work queue and free */
> dapl_os_lock( &g_cm_lock );
> @@ -621,10 +589,8 @@
> }
>
> /* something to catch the signal */
> -static void cm_handler(int signum)
> +static void ib_sig_handler(int signum)
> {
> - dapl_dbg_log (DAPL_DBG_TYPE_CM," cm_thread(%d,0x%x): ENTER cm_handler %d\n",
> - getpid(),g_cm_thread,signum);
> return;
> }
>
> @@ -643,7 +609,7 @@
> sigemptyset(&sigset);
> sigaddset(&sigset, SIGUSR1);
> pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
> - signal( SIGUSR1, cm_handler);
> + signal( SIGUSR1, ib_sig_handler);
>
> dapl_os_lock( &g_cm_lock );
> while (!g_cm_destroy) {
> @@ -667,7 +633,7 @@
> dapl_dbg_log(DAPL_DBG_TYPE_CM,
> " cm_thread: GET EVENT fd=%d n=%d\n",
> ib_cm_get_fd(),ret);
> - if (ib_cm_event_get(&event)) {
> + if (ib_cm_event_get_timed(0,&event)) {
> dapl_dbg_log(DAPL_DBG_TYPE_CM,
> " cm_thread: ERR %s eventi_get on %d\n",
> strerror(errno), ib_cm_get_fd() );
> @@ -732,6 +698,33 @@
> g_cm_destroy = 0;
> }
>
> +/* async AT processing thread */
> +void at_thread(void *arg)
> +{
> + sigset_t sigset;
> +
> + dapl_dbg_log (DAPL_DBG_TYPE_CM,
> + " at_thread(%d,0x%x): ENTER: at_fd %d\n",
> + getpid(), g_at_thread, ib_at_get_fd());
> +
> + sigemptyset(&sigset);
> + sigaddset(&sigset, SIGUSR1);
> + pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
> + signal(SIGUSR1, ib_sig_handler);
> +
> + while (!g_at_destroy) {
> + /* poll forever until callback or signal */
> + if (ib_at_callback_get_timed(-1) < 0) {
> + dapl_dbg_log(DAPL_DBG_TYPE_CM,
> + " at_thread: SIG? ret=%s, destroy=%d\n",
> + strerror(errno), g_at_destroy );
> + }
> + dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread: callback woke\n");
> + }
> + dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread(%d) EXIT \n", getpid());
> + g_at_destroy = 0;
> +}
> +
> /************************ DAPL provider entry points **********************/
>
> /*
> @@ -826,33 +819,34 @@
> conn->dapl_comp.context = conn;
> conn->retries = 0;
> dapl_os_memcpy(&conn->r_addr, r_addr, sizeof(DAT_SOCK_ADDR6));
> +
> + /* put on CM thread work queue */
> + dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&conn->entry);
> + dapl_os_lock( &g_cm_lock );
> + dapl_llist_add_tail(&g_cm_list,
> + (DAPL_LLIST_ENTRY*)&conn->entry, conn);
> + dapl_os_unlock(&g_cm_lock);
>
> status = ib_at_route_by_ip(
> ((struct sockaddr_in *)&conn->r_addr)->sin_addr.s_addr,
> ((struct sockaddr_in *)&conn->hca->hca_address)->sin_addr.s_addr,
> - 0, 0, &conn->dapl_rt, &conn->dapl_comp);
> + 0, 0, &conn->dapl_rt, &conn->dapl_comp, &conn->dapl_comp.req_id);
> +
> + dapl_dbg_log(DAPL_DBG_TYPE_CM, " connect: at_route ret=%d,%s req_id %d GID %016llx %016llx\n",
> + status, strerror(errno), conn->dapl_comp.req_id,
> + (unsigned long long)cpu_to_be64(conn->dapl_rt.dgid.global.subnet_prefix),
> + (unsigned long long)cpu_to_be64(conn->dapl_rt.dgid.global.interface_id) );
>
> if (status < 0) {
> dat_status = dapl_convert_errno(errno,"ib_at_route_by_ip");
> - goto destroy;
> + dapli_destroy_cm_id(conn);
> + return dat_status;
> }
> - if (status == 1)
> - dapli_rt_comp_handler(conn->dapl_comp.req_id, conn, 1);
>
> -
> - /* put on CM thread work queue */
> - dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&conn->entry);
> - dapl_os_lock( &g_cm_lock );
> - dapl_llist_add_tail(&g_cm_list,
> - (DAPL_LLIST_ENTRY*)&conn->entry, conn);
> - dapl_os_unlock(&g_cm_lock);
> + if (status > 0)
> + dapli_rt_comp_handler(conn->dapl_comp.req_id, conn, status);
>
> return DAT_SUCCESS;
> -
> -destroy:
> - dapli_destroy_cm_id(conn);
> - return dat_status;
> -
> }
>
> /*
> @@ -992,6 +986,13 @@
> conn->hca = ia_ptr->hca_ptr;
> conn->service_id = ServiceID;
>
> + /* put on CM thread work queue */
> + dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&conn->entry);
> + dapl_os_lock( &g_cm_lock );
> + dapl_llist_add_tail(&g_cm_list,
> + (DAPL_LLIST_ENTRY*)&conn->entry, conn);
> + dapl_os_unlock(&g_cm_lock);
> +
> dapl_dbg_log(DAPL_DBG_TYPE_EP,
> " setup_listener(conn=%p cm_id=%d)\n",
> sp_ptr->cm_srvc_handle,conn->cm_id);
> @@ -1003,19 +1004,13 @@
> dat_status = DAT_CONN_QUAL_IN_USE;
> else
> dat_status = DAT_INSUFFICIENT_RESOURCES;
> - /* success */
> - } else {
> - /* put on CM thread work queue */
> - dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&conn->entry);
> - dapl_os_lock( &g_cm_lock );
> - dapl_llist_add_tail(&g_cm_list,
> - (DAPL_LLIST_ENTRY*)&conn->entry, conn);
> - dapl_os_unlock(&g_cm_lock);
> +
> + dapli_destroy_cm_id(conn);
> return dat_status;
> }
>
> - dapli_destroy_cm_id(conn);
> - return dat_status;
> + /* success */
> + return DAT_SUCCESS;
> }
>
>
> @@ -1047,9 +1042,11 @@
> " remove_listener(ia_ptr %p sp_ptr %p cm_ptr %p)\n",
> ia_ptr, sp_ptr, conn );
>
> - if (sp_ptr->cm_srvc_handle != IB_INVALID_HANDLE)
> + if (conn != IB_INVALID_HANDLE) {
> + sp_ptr->cm_srvc_handle = NULL;
> dapli_destroy_cm_id(conn);
> -
> + }
> +
> return DAT_SUCCESS;
> }
>
> Index: dapl/openib/dapl_ib_util.h
> ===================================================================
> --- dapl/openib/dapl_ib_util.h (revision 2919)
> +++ dapl/openib/dapl_ib_util.h (working copy)
> @@ -53,6 +53,7 @@
> #include <byteswap.h>
> #include <infiniband/sa.h>
> #include <infiniband/cm.h>
> +#include <infiniband/at.h>
>
> /* Typedefs to map common DAPL provider types to IB verbs */
> typedef struct ibv_qp *ib_qp_handle_t;
> @@ -68,8 +69,8 @@
>
> #define IB_RC_RETRY_COUNT 7
> #define IB_RNR_RETRY_COUNT 7
> -#define IB_CM_RESPONSE_TIMEOUT 20 /* 4 sec */
> -#define IB_MAX_CM_RETRIES 4
> +#define IB_CM_RESPONSE_TIMEOUT 18 /* 1 sec */
> +#define IB_MAX_CM_RETRIES 7
>
> #define IB_REQ_MRA_TIMEOUT 27 /* a little over 9 minutes */
> #define IB_MAX_AT_RETRY 3
> @@ -92,21 +93,12 @@
> IB_CME_BROKEN
> } ib_cm_events_t;
>
> -#ifndef IB_AT
> -/* implement a quick hack to exchange GID/LID's until user IB_AT arrives */
> -struct ib_at_ib_route {
> - union ibv_gid gid;
> - uint16_t lid;
> +struct dapl_at_record {
> + uint64_t req_id;
> + DAT_SOCK_ADDR6 *addr;
> + DAPL_OS_WAIT_OBJECT *wait_object;
> };
>
> -struct ib_at_completion {
> - void (*fn)(uint64_t req_id, void *context, int rec_num);
> - void *context;
> - uint64_t req_id;
> -};
> -
> -#endif
> -
> /*
> * dapl_llist_entry in dapl.h but dapl.h depends on provider
> * typedef's in this file first. move dapl_llist_entry out of dapl.h
> @@ -122,6 +114,7 @@
> struct dapl_cm_id {
> struct ib_llist_entry entry;
> DAPL_OS_LOCK lock;
> + DAPL_OS_WAIT_OBJECT wait_object;
> int retries;
> int destroy;
> int in_callback;
> @@ -238,6 +231,10 @@
> {
> struct ibv_device *ib_dev;
> ib_cq_handle_t ib_cq_empty;
> + DAPL_OS_LOCK cq_lock;
> + DAPL_OS_WAIT_OBJECT wait_object;
> + int cq_destroy;
> + DAPL_OS_THREAD cq_thread;
> int max_inline_send;
> uint16_t lid;
> union ibv_gid gid;
> @@ -257,11 +254,18 @@
> void cm_thread (void *arg);
> int dapli_cm_thread_init(void);
> void dapli_cm_thread_destroy(void);
> +void at_thread (void *arg);
> +int dapli_at_thread_init(void);
> +void dapli_at_thread_destroy(void);
> +void cq_thread (void *arg);
> +int dapli_cq_thread_init(struct dapl_hca *hca_ptr);
> +void dapli_cq_thread_destroy(struct dapl_hca *hca_ptr);
>
> -int dapli_get_lid(struct dapl_hca *hca_ptr, int port, uint16_t *lid );
> +int dapli_get_lid(struct dapl_hca *hca_ptr, int port, uint16_t *lid);
> int dapli_get_gid(struct dapl_hca *hca_ptr, int port, int index,
> - union ibv_gid *gid );
> -int dapli_get_addr(char *addr, int addr_len);
> + union ibv_gid *gid);
> +int dapli_get_hca_addr(struct dapl_hca *hca_ptr);
> +void dapli_ip_comp_handler(uint64_t req_id, void *context, int rec_num);
>
> DAT_RETURN
> dapls_modify_qp_state ( IN ib_qp_handle_t qp_handle,
> Index: dapl/openib/README
> ===================================================================
> --- dapl/openib/README (revision 2919)
> +++ dapl/openib/README (working copy)
> @@ -39,18 +39,16 @@
>
> server: dtest -s
> client: dtest -h hostname
> +
> +Testing: dtest, dapltest - cl.sh regress.sh
>
> -setup/known issues:
> -
> - First drop with uCM (without IBAT), tested with simple dtest across 2 nodes.
> - hand rolled path records require remote LID and GID set via enviroment:
> +Setup:
>
> - export DAPL_REMOTE_GID "fe80:0000:0000:0000:0002:c902:0000:4071"
> - export DAPL_REMOTE_LID "0002"
> + Third drop of code, includes uCM and uAT support.
> + NOTE: requires both uCM and uAT libraries and device modules from trunk.
>
> - Also, hard coded (RTR) for use with port 1 only.
> -
> +Known issues:
> no memory windows support in ibverbs, dat_create_rmr fails.
> + some uCM scale up issues with an 8 thread dapltest in regress.sh
> + hard coded modify QP RTR to port 1, waiting for ib_cm_init_qp_attr call.
>
> -
> -
> Index: dapl/openib/dapl_ib_cq.c
> ===================================================================
> --- dapl/openib/dapl_ib_cq.c (revision 2919)
> +++ dapl/openib/dapl_ib_cq.c (working copy)
> @@ -50,9 +50,96 @@
> #include "dapl_adapter_util.h"
> #include "dapl_lmr_util.h"
> #include "dapl_evd_util.h"
> +#include "dapl_ring_buffer_util.h"
> #include <sys/poll.h>
> +#include <signal.h>
>
> +int dapli_cq_thread_init(struct dapl_hca *hca_ptr)
> +{
> + DAT_RETURN dat_status;
> +
> + dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_init(%p)\n", hca_ptr);
> +
> + /* create thread to process inbound connect request */
> + dat_status = dapl_os_thread_create( cq_thread, (void*)hca_ptr,&hca_ptr->ib_trans.cq_thread);
> + if (dat_status != DAT_SUCCESS)
> + {
> + dapl_dbg_log(DAPL_DBG_TYPE_ERR,
> + " cq_thread_init: failed to create thread\n");
> + return 1;
> + }
> + return 0;
> +}
> +
> +void dapli_cq_thread_destroy(struct dapl_hca *hca_ptr)
> +{
> + dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_destroy(%p)\n", hca_ptr);
> +
> + /* destroy cr_thread and lock */
> + hca_ptr->ib_trans.cq_destroy = 1;
> + pthread_kill(hca_ptr->ib_trans.cq_thread, SIGUSR1);
> + dapl_dbg_log(DAPL_DBG_TYPE_CM," cq_thread_destroy(%p) SIGUSR1 sent\n",hca_ptr);
> + while (hca_ptr->ib_trans.cq_destroy != 2) {
> + struct timespec sleep, remain;
> + sleep.tv_sec = 0;
> + sleep.tv_nsec = 10000000; /* 10 ms */
> + dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
> + " cq_thread_destroy: waiting for cq_thread\n");
> + nanosleep (&sleep, &remain);
> + }
> + dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_destroy(%d) exit\n",getpid());
> + return;
> +}
> +
> +/* something to catch the signal */
> +static void ib_cq_handler(int signum)
> +{
> + return;
> +}
> +
> +void cq_thread( void *arg )
> +{
> + struct dapl_hca *hca_ptr = arg;
> + struct dapl_evd *evd_ptr;
> + struct ibv_cq *ibv_cq = NULL;
> + sigset_t sigset;
> + int status = 0;
> +
> + dapl_dbg_log ( DAPL_DBG_TYPE_UTIL," cq_thread: ENTER hca %p\n",hca_ptr);
> +
> + sigemptyset(&sigset);
> + sigaddset(&sigset,SIGUSR1);
> + pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
> + signal(SIGUSR1, ib_cq_handler);
> +
> + /* wait on DTO event, or signal to abort */
> + while (!hca_ptr->ib_trans.cq_destroy) {
> +
> + struct pollfd cq_poll = {
> + .fd = hca_ptr->ib_hca_handle->cq_fd[0],
> + .events = POLLIN,
> + .revents = 0
> + };
>
> + status = poll(&cq_poll, 1, -1);
> + if ((status == 1) &&
> + (!ibv_get_cq_event(hca_ptr->ib_hca_handle, 0, &ibv_cq, (void*)&evd_ptr))) {
> +
> + if (DAPL_BAD_HANDLE(evd_ptr, DAPL_MAGIC_EVD))
> + continue;
> +
> + /* process DTO event via callback */
> + dapl_evd_dto_callback ( evd_ptr->header.owner_ia->hca_ptr->ib_hca_handle,
> + evd_ptr->ib_cq_handle,
> + (void*)evd_ptr );
> + } else {
> +
> + }
> + }
> + hca_ptr->ib_trans.cq_destroy = 2;
> + dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread: EXIT: hca %p \n", hca_ptr);
> + return;
> +}
> /*
> * Map all verbs DTO completion codes to the DAT equivelent.
> *
> @@ -410,9 +497,9 @@
> IN DAPL_EVD *evd_ptr,
> IN ib_wait_obj_handle_t *p_cq_wait_obj_handle )
> {
> - dapl_dbg_log ( DAPL_DBG_TYPE_UTIL,
> + dapl_dbg_log ( DAPL_DBG_TYPE_CM,
> " cq_object_create: (%p)=%p\n",
> - p_cq_wait_obj_handle, *p_cq_wait_obj_handle);
> + p_cq_wait_obj_handle, evd_ptr );
>
> /* set cq_wait object to evd_ptr */
> *p_cq_wait_obj_handle = evd_ptr;
> @@ -447,33 +534,86 @@
> {
> DAPL_EVD *evd_ptr = p_cq_wait_obj_handle;
> ib_cq_handle_t cq = evd_ptr->ib_cq_handle;
> - struct ibv_cq *ibv_cq;
> - void *ibv_ctx;
> - int status = -ETIMEDOUT;
> + struct ibv_cq *ibv_cq = NULL;
> + void *ibv_ctx = NULL;
> + int status = 0;
>
> - dapl_dbg_log ( DAPL_DBG_TYPE_UTIL,
> + dapl_dbg_log ( DAPL_DBG_TYPE_CM,
> " cq_object_wait: dev %p evd %p cq %p, time %d\n",
> cq->context, evd_ptr, cq, timeout );
>
> - /* Multiple EVD's sharing one event handle for now */
> - if (cq) {
> - struct pollfd cq_poll = {
> - .fd = cq->context->cq_fd[0],
> - .events = POLLIN
> + /* Multiple EVD's sharing one event handle for now until uverbs supports more */
> +
> + /*
> + * This makes it very inefficient and tricky to manage multiple CQ per device open
> + * For example: 4 threads waiting on separate CQ events will all be woke when
> + * a CQ event fires. So the poll wakes up and the first thread to get to the
> + * the get_cq_event wins and the other 3 will block. The dapl_evd_wait code
> + * above will immediately do a poll_cq after returning from CQ wait and if
> + * nothing on the queue will call this wait again and go back to sleep. So
> + * as long as they all wake up, a mutex is held around the get_cq_event
> + * so no blocking occurs and they all return then everything should work.
> + * Of course, the timeout needs adjusted on the threads that go back to sleep.
> + */
> + while (cq) {
> + struct pollfd cq_poll = {
> + .fd = cq->context->cq_fd[0],
> + .events = POLLIN,
> + .revents = 0
> };
> - int timeout_ms = -1;
> + int timeout_ms = -1;
>
> if (timeout != DAT_TIMEOUT_INFINITE)
> timeout_ms = timeout/1000;
> -
> +
> + /* check if another thread processed the event already, pending queue > 0 */
> + dapl_os_lock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
> + if (dapls_rbuf_count(&evd_ptr->pending_event_queue)) {
> + dapl_os_unlock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
> + break;
> + }
> + dapl_os_unlock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
> +
> + dapl_dbg_log ( DAPL_DBG_TYPE_CM," cq_object_wait: polling\n");
> status = poll(&cq_poll, 1, timeout_ms);
> - if (status == 1)
> - status = ibv_get_cq_event(cq->context,
> - 0, &ibv_cq, &ibv_ctx);
> - }
> - dapl_dbg_log (DAPL_DBG_TYPE_UTIL,
> - " cq_object_wait: RET cq %p ibv_cq %p ibv_ctx %p %x\n",
> - cq,ibv_cq,ibv_ctx,status);
> + dapl_dbg_log ( DAPL_DBG_TYPE_CM," cq_object_wait: poll returned status=%d\n",status);
> +
> + /*
> + * If poll with timeout wakes then hold mutex around a poll with no timeout
> + * so subsequent get_cq_events will be guaranteed not to block
> + * If the event does not belong to this EVD then put it on proper EVD pending
> + * queue under the mutex.
> + */
> + if (status == 1) {
> + dapl_os_lock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
> + status = poll(&cq_poll, 1, 0);
> + if (status == 1) {
> + status = ibv_get_cq_event(cq->context,
> + 0, &ibv_cq, &ibv_ctx);
> +
> + /* if event is not ours, put on proper evd pending queue */
> + /* force another wakeup */
> + if ((ibv_ctx != evd_ptr ) &&
> + (!DAPL_BAD_HANDLE(ibv_ctx, DAPL_MAGIC_EVD))) {
> + dapl_dbg_log (DAPL_DBG_TYPE_CM,
> + " cq_object_wait: ibv_ctx %p != evd %p\n",
> + ibv_ctx, evd_ptr);
> + dapls_evd_copy_cq((struct evd_ptr*)ibv_ctx);
> + dapl_os_unlock(&evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
> + continue;
> + }
> + }
> + dapl_os_unlock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
> + break;
> +
> + } else if (status == 0) {
> + status = ETIMEDOUT;
> + break;
> + }
> + }
> + dapl_dbg_log (DAPL_DBG_TYPE_CM,
> + " cq_object_wait: RET evd %p cq %p ibv_cq %p ibv_ctx %p %s\n",
> + evd_ptr, cq,ibv_cq,ibv_ctx,strerror(errno));
>
> return(dapl_convert_errno(status,"cq_wait_object_wait"));
>