[openib-general] [PATCH] uDAPL with uCM and uAT support - version 2
Arlin Davis
arlin.r.davis at intel.com
Mon Aug 1 14:59:01 PDT 2005
James,
Here is version 2 with the changes you requested. Also, README updated per Hal's comments.
dapl/udapl/dapl_evd_wait.c
dapl/udapl/Makefile
dapl/common/dapl_evd_resize.c
dapl/openib/TODO
dapl/openib/dapl_ib_util.c
dapl/openib/dapl_ib_cm.c
dapl/openib/dapl_ib_util.h
dapl/openib/README
dapl/openib/dapl_ib_cq.c
Signed-off-by: Arlin Davis <ardavis at ichips.intel.com>
Index: dapl/udapl/dapl_evd_wait.c
===================================================================
--- dapl/udapl/dapl_evd_wait.c (revision 2919)
+++ dapl/udapl/dapl_evd_wait.c (working copy)
@@ -74,9 +74,10 @@
DAPL_EVD *evd_ptr;
DAT_RETURN dat_status;
DAT_EVENT *local_event;
- DAT_BOOLEAN notify_requested = DAT_FALSE;
+ DAT_BOOLEAN notify_needed = DAT_FALSE;
DAT_BOOLEAN waitable;
DAPL_EVD_STATE evd_state;
+ DAT_COUNT total_events,new_events;
dapl_dbg_log (DAPL_DBG_TYPE_API,
"dapl_evd_wait (%p, %d, %d, %p, %p)\n",
@@ -124,9 +125,9 @@
}
dapl_dbg_log (DAPL_DBG_TYPE_EVD,
- "dapl_evd_wait: EVD %p, CQ %p\n",
- evd_ptr,
- (void *)evd_ptr->ib_cq_handle);
+ "dapl_evd_wait: EVD %p, CQ %p, Timeout %d, Threshold %d\n",
+ evd_ptr,(void *)evd_ptr->ib_cq_handle, time_out, threshold);
+
/*
* Make sure there are no other waiters and the evd is active.
@@ -144,11 +145,10 @@
evd_state = dapl_os_atomic_assign ( (DAPL_ATOMIC *)&evd_ptr->evd_state,
(DAT_COUNT) DAPL_EVD_STATE_OPEN,
(DAT_COUNT) DAPL_EVD_STATE_WAITED );
- dapl_os_unlock ( &evd_ptr->header.lock );
- if ( evd_state != DAPL_EVD_STATE_OPEN )
+ dapl_os_unlock ( &evd_ptr->header.lock );
+ if ( evd_state != DAPL_EVD_STATE_OPEN || !waitable)
{
- /* Bogus state, bail out */
dat_status = DAT_ERROR (DAT_INVALID_STATE,0);
goto bail;
}
@@ -182,37 +182,54 @@
* return right away if the ib_cq_handle associate with these evd
* equal to IB_INVALID_HANDLE
*/
- dapls_evd_copy_cq(evd_ptr);
-
- if (dapls_rbuf_count(&evd_ptr->pending_event_queue) >= threshold)
- {
- break;
- }
-
- /*
- * Do not enable the completion notification if this evd is not
- * a DTO_EVD or RMR_BIND_EVD
+ /* Logic to prevent missing completion between copy_cq (poll)
+ * and completion_notify (re-arm)
*/
- if ( (!notify_requested) &&
- ((evd_ptr->evd_flags & DAT_EVD_DTO_FLAG) ||
- (evd_ptr->evd_flags & DAT_EVD_RMR_BIND_FLAG)) )
+ notify_needed = DAT_TRUE;
+ new_events = 0;
+ while (DAT_TRUE)
{
- dat_status = dapls_ib_completion_notify (
- evd_ptr->header.owner_ia->hca_ptr->ib_hca_handle,
- evd_ptr,
- (evd_ptr->completion_type == DAPL_EVD_STATE_SOLICITED_WAIT) ?
- IB_NOTIFY_ON_SOLIC_COMP : IB_NOTIFY_ON_NEXT_COMP );
-
- DAPL_CNTR(DCNT_EVD_WAIT_CMP_NTFY);
- /* FIXME report error */
- dapl_os_assert(dat_status == DAT_SUCCESS);
+ dapls_evd_copy_cq(evd_ptr); /* poll for new completions */
+ total_events = dapls_rbuf_count (&evd_ptr->pending_event_queue);
+ new_events = total_events - new_events;
+ if (total_events >= threshold ||
+ (!new_events && notify_needed == DAT_FALSE))
+ {
+ break;
+ }
+
+ /*
+ * Do not enable the completion notification if this evd is not
+ * a DTO_EVD or RMR_BIND_EVD
+ */
+ if ( (evd_ptr->evd_flags & DAT_EVD_DTO_FLAG) ||
+ (evd_ptr->evd_flags & DAT_EVD_RMR_BIND_FLAG) )
+ {
+ dat_status = dapls_ib_completion_notify (
+ evd_ptr->header.owner_ia->hca_ptr->ib_hca_handle,
+ evd_ptr,
+ (evd_ptr->completion_type == DAPL_EVD_STATE_SOLICITED_WAIT) ?
+ IB_NOTIFY_ON_SOLIC_COMP : IB_NOTIFY_ON_NEXT_COMP );
+
+ DAPL_CNTR(DCNT_EVD_WAIT_CMP_NTFY);
+ notify_needed = DAT_FALSE;
+ new_events = total_events;
+
+ /* FIXME report error */
+ dapl_os_assert(dat_status == DAT_SUCCESS);
+ }
+ else
+ {
+ break;
+ }
- notify_requested = DAT_TRUE;
+ } /* while completions < threshold, and rearm needed */
- /* Try again. */
- continue;
+ if (total_events >= threshold)
+ {
+ break;
}
-
+
/*
* Unused by poster; it has no way to tell how many
@@ -232,8 +249,6 @@
#endif
dat_status = dapl_os_wait_object_wait (
&evd_ptr->wait_object, time_out );
-
- notify_requested = DAT_FALSE; /* We've used it up. */
/* See if we were awakened by evd_set_unwaitable */
if ( !evd_ptr->evd_waitable )
@@ -243,13 +258,22 @@
if (dat_status != DAT_SUCCESS)
{
- /*
- * If the status is DAT_TIMEOUT, we'll break out of the
- * loop, *not* dequeue an event (because dat_status
- * != DAT_SUCCESS), set *nmore (as we should for timeout)
- * and return DAT_TIMEOUT.
- */
- break;
+ /*
+ * If the status is DAT_TIMEOUT, we'll break out of the
+ * loop, *not* dequeue an event (because dat_status
+ * != DAT_SUCCESS), set *nmore (as we should for timeout)
+ * and return DAT_TIMEOUT.
+ */
+
+#if defined(DAPL_DBG)
+ dapls_evd_copy_cq(evd_ptr); /* poll */
+ dapl_dbg_log (DAPL_DBG_TYPE_EVD,
+ "dapl_evd_wait: WAKEUP ERROR (0x%x): EVD %p, CQ %p, events? %d\n",
+ dat_status,evd_ptr,(void *)evd_ptr->ib_cq_handle,
+ dapls_rbuf_count(&evd_ptr->pending_event_queue) );
+#endif /* DAPL_DBG */
+
+ break;
}
}
Index: dapl/udapl/Makefile
===================================================================
--- dapl/udapl/Makefile (revision 2941)
+++ dapl/udapl/Makefile (working copy)
@@ -122,7 +122,8 @@
#
ifeq ($(VERBS),openib)
PROVIDER = $(TOPDIR)/../openib
-CFLAGS += -DOPENIB -DCQ_WAIT_OBJECT
+CFLAGS += -DOPENIB
+#CFLAGS += -DCQ_WAIT_OBJECT uncomment when fixed
CFLAGS += -I/usr/local/include/infiniband
endif
Index: dapl/common/dapl_evd_resize.c
===================================================================
--- dapl/common/dapl_evd_resize.c (revision 2919)
+++ dapl/common/dapl_evd_resize.c (working copy)
@@ -67,71 +67,139 @@
IN DAT_EVD_HANDLE evd_handle,
IN DAT_COUNT evd_qlen )
{
- DAPL_IA *ia_ptr;
- DAPL_EVD *evd_ptr;
- DAT_COUNT pend_cnt;
- DAT_RETURN dat_status;
+ DAPL_IA *ia_ptr;
+ DAPL_EVD *evd_ptr;
+ DAT_EVENT *event_ptr;
+ DAT_EVENT *events;
+ DAT_EVENT *orig_event;
+ DAPL_RING_BUFFER free_event_queue;
+ DAPL_RING_BUFFER pending_event_queue;
+ DAT_COUNT pend_cnt;
+ DAT_COUNT i;
+ DAT_RETURN dat_status;
dapl_dbg_log (DAPL_DBG_TYPE_API, "dapl_evd_resize (%p, %d)\n",
evd_handle, evd_qlen);
if (DAPL_BAD_HANDLE (evd_handle, DAPL_MAGIC_EVD))
{
- dat_status = DAT_ERROR (DAT_INVALID_HANDLE,0);
- goto bail;
+ return DAT_ERROR (DAT_INVALID_PARAMETER,DAT_INVALID_ARG1);
}
evd_ptr = (DAPL_EVD *)evd_handle;
ia_ptr = evd_ptr->header.owner_ia;
- if ( evd_qlen == evd_ptr->qlen )
+ if ((evd_qlen <= 0) || (evd_ptr->qlen > evd_qlen))
{
- dat_status = DAT_SUCCESS;
- goto bail;
+ dat_status = DAT_ERROR(DAT_INVALID_PARAMETER,DAT_INVALID_ARG2);
+ goto bail;
}
if ( evd_qlen > ia_ptr->hca_ptr->ia_attr.max_evd_qlen )
{
- dat_status = DAT_ERROR (DAT_INVALID_PARAMETER,DAT_INVALID_ARG2);
+ dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_TEVD);
goto bail;
}
dapl_os_lock(&evd_ptr->header.lock);
- /* Don't try to resize if we are actively waiting */
if (evd_ptr->evd_state == DAPL_EVD_STATE_WAITED)
{
- dapl_os_unlock(&evd_ptr->header.lock);
- dat_status = DAT_ERROR (DAT_INVALID_STATE,0);
- goto bail;
+ dat_status = DAT_ERROR(DAT_INVALID_STATE,0);
+ goto bail_unlock;
}
pend_cnt = dapls_rbuf_count(&evd_ptr->pending_event_queue);
if (pend_cnt > evd_qlen) {
- dapl_os_unlock(&evd_ptr->header.lock);
- dat_status = DAT_ERROR (DAT_INVALID_STATE,0);
- goto bail;
+ dat_status = DAT_ERROR(DAT_INVALID_STATE,0);
+ goto bail_unlock;
}
dat_status = dapls_ib_cq_resize(evd_ptr->header.owner_ia,
- evd_ptr,
- &evd_qlen);
- if (dat_status != DAT_SUCCESS)
+ evd_ptr,
+ &evd_qlen);
+ if (DAT_SUCCESS != dat_status) {
+ dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_MEMORY);
+ goto bail_unlock;
+ }
+
+ /* Allocate EVENTs */
+ events = (DAT_EVENT *) dapl_os_alloc (evd_qlen * sizeof (DAT_EVENT));
+ if (!events)
{
- dapl_os_unlock(&evd_ptr->header.lock);
- goto bail;
+ dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_MEMORY);
+ goto bail_unlock;
}
+ event_ptr = events;
- dat_status = dapls_evd_event_realloc (evd_ptr, evd_qlen);
- if (dat_status != DAT_SUCCESS)
+ /* allocate free event queue */
+ dat_status = dapls_rbuf_alloc (&free_event_queue, evd_qlen);
+ if (DAT_SUCCESS != dat_status)
{
- dapl_os_unlock(&evd_ptr->header.lock);
- goto bail;
+ dapl_os_free(event_ptr, evd_qlen * sizeof (DAT_EVENT));
+ dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_MEMORY);
+ goto bail_unlock;
+ }
+
+ /* allocate pending event queue */
+ dat_status = dapls_rbuf_alloc (&pending_event_queue, evd_qlen);
+ if (DAT_SUCCESS != dat_status)
+ {
+ dapl_os_free(event_ptr, evd_qlen * sizeof (DAT_EVENT));
+ dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_MEMORY);
+ goto bail_unlock;
}
+ for (i = 0; i < pend_cnt; i++)
+ {
+ orig_event = dapls_rbuf_remove(&evd_ptr->pending_event_queue);
+ if (orig_event == NULL) {
+ dapl_dbg_log (DAPL_DBG_TYPE_ERR, " Inconsistent event queue\n");
+ dapl_os_free(event_ptr, evd_qlen * sizeof (DAT_EVENT));
+ dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_MEMORY);
+ goto bail_unlock;
+ }
+ memcpy(event_ptr, orig_event, sizeof(DAT_EVENT));
+ dat_status = dapls_rbuf_add(&pending_event_queue, event_ptr);
+ if (DAT_SUCCESS != dat_status) {
+ dapl_os_free(event_ptr, evd_qlen * sizeof (DAT_EVENT));
+ dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_MEMORY);
+ goto bail_unlock;
+ }
+ event_ptr++;
+ }
+
+ for (i = pend_cnt; i < evd_qlen; i++)
+ {
+ dat_status = dapls_rbuf_add(&free_event_queue,(void *) event_ptr);
+ if (DAT_SUCCESS != dat_status) {
+ dapl_os_free(event_ptr, evd_qlen * sizeof (DAT_EVENT));
+ dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_MEMORY);
+ goto bail_unlock;
+ }
+ event_ptr++;
+ }
+
+ dapls_rbuf_destroy (&evd_ptr->free_event_queue);
+ dapls_rbuf_destroy (&evd_ptr->pending_event_queue);
+ if (evd_ptr->events)
+ {
+ dapl_os_free (evd_ptr->events, evd_ptr->qlen * sizeof (DAT_EVENT));
+ }
+ evd_ptr->free_event_queue = free_event_queue;
+ evd_ptr->pending_event_queue = pending_event_queue;
+ evd_ptr->events = events;
+ evd_ptr->qlen = evd_qlen;
+
+bail_unlock:
+
dapl_os_unlock(&evd_ptr->header.lock);
- bail:
+ dapl_dbg_log (DAPL_DBG_TYPE_RTN,
+ "dapl_evd_resize returns 0x%x\n",dat_status);
+
+bail:
+
return dat_status;
}
Index: dapl/openib/TODO
===================================================================
--- dapl/openib/TODO (revision 2919)
+++ dapl/openib/TODO (working copy)
@@ -1,7 +1,7 @@
IB Verbs:
- CQ resize?
-- query call to get current qp state
+- query call to get current qp state, remote port number
- ibv_get_cq_event() needs timed event call and wakeup
- query call to get device attributes
- memory window support
@@ -9,8 +9,6 @@
DAPL:
- reinit EP needs a QP timewait completion notification
- add cq_object wakeup, time based cq_object wait when verbs support arrives
-- update uDAPL code with real ATS support
-- etc, etc.
Other:
- Shared memory in udapl and kernel module to support?
Index: dapl/openib/dapl_ib_util.c
===================================================================
--- dapl/openib/dapl_ib_util.c (revision 2919)
+++ dapl/openib/dapl_ib_util.c (working copy)
@@ -111,27 +111,40 @@
}
-/* just get IP address for hostname */
-int dapli_get_addr( char *addr, int addr_len)
+/* just get IP address, IPv4 only for now */
+int dapli_get_hca_addr( struct dapl_hca *hca_ptr )
{
- struct sockaddr_in *ipv4_addr = (struct sockaddr_in*)addr;
- struct hostent *h_ptr;
- struct utsname ourname;
-
- if ( uname( &ourname ) < 0 )
- return 1;
-
- h_ptr = gethostbyname( ourname.nodename );
- if ( h_ptr == NULL )
+ struct sockaddr_in *ipv4_addr;
+ struct ib_at_completion at_comp;
+ struct dapl_at_record at_rec;
+ int status;
+ DAT_RETURN dat_status;
+
+ ipv4_addr = (struct sockaddr_in*)&hca_ptr->hca_address;
+ ipv4_addr->sin_family = AF_INET;
+ ipv4_addr->sin_addr.s_addr = 0;
+
+ at_comp.fn = dapli_ip_comp_handler;
+ at_comp.context = &at_rec;
+ at_rec.addr = &hca_ptr->hca_address;
+ at_rec.wait_object = &hca_ptr->ib_trans.wait_object;
+
+ /* call with async_comp until the sync version works */
+ status = ib_at_ips_by_gid(&hca_ptr->ib_trans.gid, &ipv4_addr->sin_addr.s_addr, 1,
+ &at_comp, &at_rec.req_id);
+
+ if (status < 0)
return 1;
-
- if ( h_ptr->h_addrtype == AF_INET ) {
- ipv4_addr = (struct sockaddr_in*) addr;
- ipv4_addr->sin_family = AF_INET;
- dapl_os_memcpy( &ipv4_addr->sin_addr, h_ptr->h_addr_list[0], 4 );
- } else
+
+ if (status > 0)
+ dapli_ip_comp_handler(at_rec.req_id, (void*)ipv4_addr, status);
+
+ /* wait for answer, 5 seconds max */
+ dat_status = dapl_os_wait_object_wait (&hca_ptr->ib_trans.wait_object,5000000);
+
+ if ((dat_status != DAT_SUCCESS ) || (!ipv4_addr->sin_addr.s_addr))
return 1;
-
+
return 0;
}
@@ -152,14 +165,17 @@
*/
int32_t dapls_ib_init (void)
{
- if (dapli_cm_thread_init())
- return -1;
- else
- return 0;
+ dapl_dbg_log (DAPL_DBG_TYPE_UTIL, " dapl_ib_init: \n" );
+ if (dapli_cm_thread_init() || dapli_at_thread_init())
+ return 1;
+
+ return 0;
}
int32_t dapls_ib_release (void)
{
+ dapl_dbg_log (DAPL_DBG_TYPE_UTIL, " dapl_ib_release: \n" );
+ dapli_at_thread_destroy();
dapli_cm_thread_destroy();
return 0;
}
@@ -186,7 +202,6 @@
IN DAPL_HCA *hca_ptr)
{
struct dlist *dev_list;
- DAT_RETURN dat_status = DAT_SUCCESS;
dapl_dbg_log (DAPL_DBG_TYPE_UTIL,
" open_hca: %s - %p\n", hca_name, hca_ptr );
@@ -217,36 +232,46 @@
ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
return DAT_INTERNAL_ERROR;
}
-
+
/* set inline max with enviromment or default, get local lid and gid 0 */
hca_ptr->ib_trans.max_inline_send =
dapl_os_get_env_val ( "DAPL_MAX_INLINE", INLINE_SEND_DEFAULT );
- if ( dapli_get_lid(hca_ptr, hca_ptr->port_num,
- &hca_ptr->ib_trans.lid )) {
+ if (dapli_get_lid(hca_ptr, hca_ptr->port_num,
+ &hca_ptr->ib_trans.lid)) {
dapl_dbg_log (DAPL_DBG_TYPE_ERR,
" open_hca: IB get LID failed for %s\n",
ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
- return DAT_INTERNAL_ERROR;
+ goto bail;
}
- if ( dapli_get_gid(hca_ptr, hca_ptr->port_num, 0,
- &hca_ptr->ib_trans.gid )) {
+ if (dapli_get_gid(hca_ptr, hca_ptr->port_num, 0,
+ &hca_ptr->ib_trans.gid)) {
dapl_dbg_log (DAPL_DBG_TYPE_ERR,
" open_hca: IB get GID failed for %s\n",
ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
- return DAT_INTERNAL_ERROR;
+ goto bail;
}
-
/* get the IP address of the device */
- if ( dapli_get_addr((char*)&hca_ptr->hca_address,
- sizeof(DAT_SOCK_ADDR6) )) {
+ if (dapli_get_hca_addr(hca_ptr)) {
dapl_dbg_log (DAPL_DBG_TYPE_ERR,
" open_hca: IB get ADDR failed for %s\n",
ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
- return DAT_INTERNAL_ERROR;
+ goto bail;
+ }
+
+ /* one thread for each device open */
+ if (dapli_cq_thread_init(hca_ptr)) {
+ dapl_dbg_log (DAPL_DBG_TYPE_ERR,
+ " open_hca: cq_thread_init failed for %s\n",
+ ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
+ goto bail;
}
+ /* initialize cq_lock and wait object */
+ dapl_os_lock_init(&hca_ptr->ib_trans.cq_lock);
+ dapl_os_wait_object_init (&hca_ptr->ib_trans.wait_object);
+
dapl_dbg_log (DAPL_DBG_TYPE_UTIL,
" open_hca: %s, port %d, %s %d.%d.%d.%d INLINE_MAX=%d\n",
ibv_get_device_name(hca_ptr->ib_trans.ib_dev), hca_ptr->port_num,
@@ -257,7 +282,19 @@
((struct sockaddr_in *)&hca_ptr->hca_address)->sin_addr.s_addr >> 24 & 0xff,
hca_ptr->ib_trans.max_inline_send );
- return dat_status;
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " open_hca: LID 0x%x GID subnet %016llx id %016llx\n",
+ hca_ptr->ib_trans.lid,
+ (unsigned long long)bswap_64(hca_ptr->ib_trans.gid.global.subnet_prefix),
+ (unsigned long long)bswap_64(hca_ptr->ib_trans.gid.global.interface_id) );
+
+ return DAT_SUCCESS;
+
+bail:
+ ibv_close_device(hca_ptr->ib_hca_handle);
+ hca_ptr->ib_hca_handle = IB_INVALID_HANDLE;
+ return DAT_INTERNAL_ERROR;
+
}
@@ -282,10 +319,14 @@
dapl_dbg_log (DAPL_DBG_TYPE_UTIL," close_hca: %p\n",hca_ptr);
if (hca_ptr->ib_hca_handle != IB_INVALID_HANDLE) {
+ dapli_cq_thread_destroy(hca_ptr);
if (ibv_close_device(hca_ptr->ib_hca_handle))
return(dapl_convert_errno(errno,"ib_close_device"));
hca_ptr->ib_hca_handle = IB_INVALID_HANDLE;
}
+
+ dapl_os_lock_destroy(&hca_ptr->ib_trans.cq_lock);
+
return (DAT_SUCCESS);
}
@@ -448,35 +489,4 @@
return DAT_SUCCESS;
}
-#ifdef PROVIDER_SPECIFIC_ATTR
-
-/*
- * dapls_set_provider_specific_attr
- *
- * Input:
- * attr_ptr Pointer provider attributes
- *
- * Output:
- * none
- *
- * Returns:
- * void
- */
-DAT_NAMED_ATTR ib_attrs[] = {
- {
- "I_DAT_SEND_INLINE_THRESHOLD",
- "128"
- },
-};
-
-#define SPEC_ATTR_SIZE( x ) (sizeof( x ) / sizeof( DAT_NAMED_ATTR))
-
-void dapls_set_provider_specific_attr(
- IN DAT_PROVIDER_ATTR *attr_ptr )
-{
- attr_ptr->num_provider_specific_attr = SPEC_ATTR_SIZE( ib_attrs );
- attr_ptr->provider_specific_attr = ib_attrs;
-}
-
-#endif
Index: dapl/openib/dapl_ib_cm.c
===================================================================
--- dapl/openib/dapl_ib_cm.c (revision 2919)
+++ dapl/openib/dapl_ib_cm.c (working copy)
@@ -70,19 +70,8 @@
static inline uint64_t cpu_to_be64(uint64_t x) { return x; }
#endif
-#ifndef IB_AT
-
-#include <stdio.h>
-#include <unistd.h>
-#include <fcntl.h>
-#include <netinet/tcp.h>
-#include <sysfs/libsysfs.h>
-#include <signal.h>
-
-/* iclust-20 hard coded values, network order */
-#define REMOTE_GID "fe80:0000:0000:0000:0002:c902:0000:4071"
-#define REMOTE_LID "0002"
-
+static int g_at_destroy;
+static DAPL_OS_THREAD g_at_thread;
static int g_cm_destroy;
static DAPL_OS_THREAD g_cm_thread;
static DAPL_OS_LOCK g_cm_lock;
@@ -122,7 +111,7 @@
while (g_cm_destroy) {
struct timespec sleep, remain;
sleep.tv_sec = 0;
- sleep.tv_nsec = 200000000; /* 200 ms */
+ sleep.tv_nsec = 10000000; /* 10 ms */
dapl_dbg_log(DAPL_DBG_TYPE_CM,
" cm_thread_destroy: waiting for cm_thread\n");
nanosleep (&sleep, &remain);
@@ -130,112 +119,70 @@
dapl_dbg_log(DAPL_DBG_TYPE_CM," cm_thread_destroy(%d) exit\n",getpid());
}
-static int ib_at_route_by_ip(uint32_t dst_ip, uint32_t src_ip, int tos, uint16_t flags,
- struct ib_at_ib_route *ib_route,
- struct ib_at_completion *async_comp)
-{
- struct dapl_cm_id *conn = (struct dapl_cm_id *)async_comp->context;
-
- dapl_dbg_log (
- DAPL_DBG_TYPE_CM,
- " CM at_route_by_ip: conn %p cm_id %d src %d.%d.%d.%d -> dst %d.%d.%d.%d (%d)\n",
- conn,conn->cm_id,
- src_ip >> 0 & 0xff, src_ip >> 8 & 0xff,
- src_ip >> 16 & 0xff,src_ip >> 24 & 0xff,
- dst_ip >> 0 & 0xff, dst_ip >> 8 & 0xff,
- dst_ip >> 16 & 0xff,dst_ip >> 24 & 0xff, conn->service_id);
-
- /* use req_id for loopback indication */
- if (( src_ip == dst_ip ) || ( dst_ip == 0x0100007f ))
- async_comp->req_id = 1;
- else
- async_comp->req_id = 0;
-
- return 1;
-}
-
-static int ib_at_paths_by_route(struct ib_at_ib_route *ib_route, uint32_t mpath_type,
- struct ib_sa_path_rec *pr, int npath,
- struct ib_at_completion *async_comp)
+int dapli_at_thread_init(void)
{
- struct dapl_cm_id *conn = (struct dapl_cm_id *)async_comp->context;
- char *env, *token;
- char dgid[40];
- uint16_t *p_gid = (uint16_t*)&ib_route->gid;
+ DAT_RETURN dat_status;
- /* set local path record values and send to remote */
- (void)dapl_os_memzero(pr, sizeof(*pr));
+ dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread_init(%d)\n", getpid());
- pr->slid = htons(conn->hca->ib_trans.lid);
- pr->sgid.global.subnet_prefix = conn->hca->ib_trans.gid.global.subnet_prefix;
- pr->sgid.global.interface_id = conn->hca->ib_trans.gid.global.interface_id;
+ /* create thread to process AT async requests */
+ dat_status = dapl_os_thread_create(at_thread, NULL, &g_at_thread);
+ if (dat_status != DAT_SUCCESS)
+ {
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ " at_thread_init: failed to create thread\n");
+ return 1;
+ }
+ return 0;
+}
- env = getenv("DAPL_REMOTE_LID");
- if ( env == NULL )
- env = REMOTE_LID;
- ib_route->lid = strtol(env,NULL,0);
+void dapli_at_thread_destroy(void)
+{
+ dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread_destroy(%d)\n", getpid());
- env = getenv("DAPL_REMOTE_GID");
- if ( env == NULL )
- env = REMOTE_GID;
+ /* destroy cr_thread and lock */
+ g_at_destroy = 1;
+ pthread_kill( g_at_thread, SIGUSR1 );
+ dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread_destroy(%d) SIGUSR1 sent\n",getpid());
+ while (g_at_destroy) {
+ struct timespec sleep, remain;
+ sleep.tv_sec = 0;
+ sleep.tv_nsec = 10000000; /* 10 ms */
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " at_thread_destroy: waiting for at_thread\n");
+ nanosleep (&sleep, &remain);
+ }
+ dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread_destroy(%d) exit\n",getpid());
+}
- dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " ib_at_paths_by_route: remote LID %x GID %s\n",
- ib_route->lid,env);
+void dapli_ip_comp_handler(uint64_t req_id, void *context, int rec_num)
+{
+ struct dapl_at_record *at_rec = context;
- dapl_os_memcpy( dgid, env, 40 );
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " ip_comp_handler: ctxt %p, req_id %lld rec_num %d\n",
+ context, req_id, rec_num);
- /* get GID with token strings and delimiter */
- token = strtok(dgid,":");
- while (token) {
- *p_gid = strtoul(token,NULL,16);
- *p_gid = htons(*p_gid); /* convert each token to network order */
- token = strtok(NULL,":");
- p_gid++;
- }
-
- /* set remote lid and gid, req_id is indication of loopback */
- if ( !async_comp->req_id ) {
- pr->dlid = htons(ib_route->lid);
- pr->dgid.global.subnet_prefix = ib_route->gid.global.subnet_prefix;
- pr->dgid.global.interface_id = ib_route->gid.global.interface_id;
- } else {
- pr->dlid = pr->slid;
- pr->dgid.global.subnet_prefix = pr->sgid.global.subnet_prefix;
- pr->dgid.global.interface_id = pr->sgid.global.interface_id;
- }
-
- pr->reversible = 0x1000000;
- pr->pkey = 0xffff;
- pr->mtu = IBV_MTU_1024;
- pr->mtu_selector = 2;
- pr->rate_selector = 2;
- pr->rate = 3;
- pr->packet_life_time_selector = 2;
- pr->packet_life_time = 2;
+ if ((at_rec) && ( at_rec->req_id == req_id)) {
+ dapl_os_wait_object_wakeup(at_rec->wait_object);
+ return;
+ }
- dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " ib_at_paths_by_route: SRC LID 0x%x GID subnet %016llx id %016llx\n",
- pr->slid,(unsigned long long)(pr->sgid.global.subnet_prefix),
- (unsigned long long)(pr->sgid.global.interface_id) );
- dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " ib_at_paths_by_route: DST LID 0x%x GID subnet %016llx id %016llx\n",
- pr->dlid,(unsigned long long)(pr->dgid.global.subnet_prefix),
- (unsigned long long)(pr->dgid.global.interface_id) );
-
- dapli_path_comp_handler( async_comp->req_id, (void*)conn, 1);
-
- return 0;
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ " ip_comp_handler: at_rec->req_id %lld != req_id %lld\n",
+ at_rec->req_id, req_id );
}
-#endif /* ifndef IB_AT */
-
static void dapli_path_comp_handler(uint64_t req_id, void *context, int rec_num)
{
struct dapl_cm_id *conn = context;
int status;
ib_cm_events_t event;
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " path_comp_handler: ctxt %p, req_id %lld rec_num %d\n",
+ context, req_id, rec_num);
+
if (rec_num <= 0) {
dapl_dbg_log(DAPL_DBG_TYPE_CM,
" path_comp_handler: resolution err %d retry %d\n",
@@ -249,7 +196,7 @@
status = ib_at_paths_by_route(&conn->dapl_rt, 0,
&conn->dapl_path, 1,
- &conn->dapl_comp);
+ &conn->dapl_comp, &conn->dapl_comp.req_id);
if (status) {
dapl_dbg_log(DAPL_DBG_TYPE_CM,
" path_by_route: err %d id %lld\n",
@@ -287,6 +234,21 @@
int status;
ib_cm_events_t event;
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " rt_comp_handler: conn %p, req_id %lld rec_num %d\n",
+ conn, req_id, rec_num);
+
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " rt_comp_handler: SRC GID subnet %016llx id %016llx\n",
+ (unsigned long long)cpu_to_be64(conn->dapl_rt.sgid.global.subnet_prefix),
+ (unsigned long long)cpu_to_be64(conn->dapl_rt.sgid.global.interface_id) );
+
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " rt_comp_handler: DST GID subnet %016llx id %016llx\n",
+ (unsigned long long)cpu_to_be64(conn->dapl_rt.dgid.global.subnet_prefix),
+ (unsigned long long)cpu_to_be64(conn->dapl_rt.dgid.global.interface_id) );
+
+
if (rec_num <= 0) {
dapl_dbg_log(DAPL_DBG_TYPE_ERR,
" dapl_rt_comp_handler: rec %d retry %d\n",
@@ -298,7 +260,8 @@
}
status = ib_at_route_by_ip(((struct sockaddr_in *)&conn->r_addr)->sin_addr.s_addr,
- 0, 0, 0, &conn->dapl_rt, &conn->dapl_comp);
+ 0, 0, 0, &conn->dapl_rt,
+ &conn->dapl_comp,&conn->dapl_comp.req_id);
if (status < 0) {
dapl_dbg_log(DAPL_DBG_TYPE_ERR, "dapl_rt_comp_handler: "
"ib_at_route_by_ip failed with status %d\n",
@@ -306,9 +269,16 @@
event = IB_CME_DESTINATION_UNREACHABLE;
goto bail;
}
-
if (status == 1)
dapli_rt_comp_handler(conn->dapl_comp.req_id, conn, 1);
+
+ return;
+ }
+
+ if (!conn->dapl_rt.dgid.global.subnet_prefix || req_id != conn->dapl_comp.req_id) {
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ " dapl_rt_comp_handler: ERROR: unexpected callback req_id=%d(%d)\n",
+ req_id, conn->dapl_comp.req_id );
return;
}
@@ -316,7 +286,7 @@
conn->dapl_comp.context = conn;
conn->retries = 0;
status = ib_at_paths_by_route(&conn->dapl_rt, 0, &conn->dapl_path, 1,
- &conn->dapl_comp);
+ &conn->dapl_comp, &conn->dapl_comp.req_id);
if (status) {
dapl_dbg_log(DAPL_DBG_TYPE_ERR,
"dapl_rt_comp_handler: ib_at_paths_by_route "
@@ -346,8 +316,6 @@
ib_cm_destroy_id(conn->cm_id);
if (conn->ep)
conn->ep->cm_handle = IB_INVALID_HANDLE;
- if (conn->sp)
- conn->sp->cm_srvc_handle = IB_INVALID_HANDLE;
/* take off the CM thread work queue and free */
dapl_os_lock( &g_cm_lock );
@@ -621,10 +589,8 @@
}
/* something to catch the signal */
-static void cm_handler(int signum)
+static void ib_sig_handler(int signum)
{
- dapl_dbg_log (DAPL_DBG_TYPE_CM," cm_thread(%d,0x%x): ENTER cm_handler %d\n",
- getpid(),g_cm_thread,signum);
return;
}
@@ -643,7 +609,7 @@
sigemptyset(&sigset);
sigaddset(&sigset, SIGUSR1);
pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
- signal( SIGUSR1, cm_handler);
+ signal( SIGUSR1, ib_sig_handler);
dapl_os_lock( &g_cm_lock );
while (!g_cm_destroy) {
@@ -667,7 +633,7 @@
dapl_dbg_log(DAPL_DBG_TYPE_CM,
" cm_thread: GET EVENT fd=%d n=%d\n",
ib_cm_get_fd(),ret);
- if (ib_cm_event_get(&event)) {
+ if (ib_cm_event_get_timed(0,&event)) {
dapl_dbg_log(DAPL_DBG_TYPE_CM,
" cm_thread: ERR %s eventi_get on %d\n",
strerror(errno), ib_cm_get_fd() );
@@ -732,6 +698,33 @@
g_cm_destroy = 0;
}
+/* async AT processing thread */
+void at_thread(void *arg)
+{
+ sigset_t sigset;
+
+ dapl_dbg_log (DAPL_DBG_TYPE_CM,
+ " at_thread(%d,0x%x): ENTER: at_fd %d\n",
+ getpid(), g_at_thread, ib_at_get_fd());
+
+ sigemptyset(&sigset);
+ sigaddset(&sigset, SIGUSR1);
+ pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
+ signal(SIGUSR1, ib_sig_handler);
+
+ while (!g_at_destroy) {
+ /* poll forever until callback or signal */
+ if (ib_at_callback_get_timed(-1) < 0) {
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " at_thread: SIG? ret=%s, destroy=%d\n",
+ strerror(errno), g_at_destroy );
+ }
+ dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread: callback woke\n");
+ }
+ dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread(%d) EXIT \n", getpid());
+ g_at_destroy = 0;
+}
+
/************************ DAPL provider entry points **********************/
/*
@@ -826,33 +819,34 @@
conn->dapl_comp.context = conn;
conn->retries = 0;
dapl_os_memcpy(&conn->r_addr, r_addr, sizeof(DAT_SOCK_ADDR6));
+
+ /* put on CM thread work queue */
+ dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&conn->entry);
+ dapl_os_lock( &g_cm_lock );
+ dapl_llist_add_tail(&g_cm_list,
+ (DAPL_LLIST_ENTRY*)&conn->entry, conn);
+ dapl_os_unlock(&g_cm_lock);
status = ib_at_route_by_ip(
((struct sockaddr_in *)&conn->r_addr)->sin_addr.s_addr,
((struct sockaddr_in *)&conn->hca->hca_address)->sin_addr.s_addr,
- 0, 0, &conn->dapl_rt, &conn->dapl_comp);
+ 0, 0, &conn->dapl_rt, &conn->dapl_comp, &conn->dapl_comp.req_id);
+
+ dapl_dbg_log(DAPL_DBG_TYPE_CM, " connect: at_route ret=%d,%s req_id %d GID %016llx %016llx\n",
+ status, strerror(errno), conn->dapl_comp.req_id,
+ (unsigned long long)cpu_to_be64(conn->dapl_rt.dgid.global.subnet_prefix),
+ (unsigned long long)cpu_to_be64(conn->dapl_rt.dgid.global.interface_id) );
if (status < 0) {
dat_status = dapl_convert_errno(errno,"ib_at_route_by_ip");
- goto destroy;
+ dapli_destroy_cm_id(conn);
+ return dat_status;
}
- if (status == 1)
- dapli_rt_comp_handler(conn->dapl_comp.req_id, conn, 1);
-
- /* put on CM thread work queue */
- dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&conn->entry);
- dapl_os_lock( &g_cm_lock );
- dapl_llist_add_tail(&g_cm_list,
- (DAPL_LLIST_ENTRY*)&conn->entry, conn);
- dapl_os_unlock(&g_cm_lock);
+ if (status > 0)
+ dapli_rt_comp_handler(conn->dapl_comp.req_id, conn, status);
return DAT_SUCCESS;
-
-destroy:
- dapli_destroy_cm_id(conn);
- return dat_status;
-
}
/*
@@ -992,6 +986,13 @@
conn->hca = ia_ptr->hca_ptr;
conn->service_id = ServiceID;
+ /* put on CM thread work queue */
+ dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&conn->entry);
+ dapl_os_lock( &g_cm_lock );
+ dapl_llist_add_tail(&g_cm_list,
+ (DAPL_LLIST_ENTRY*)&conn->entry, conn);
+ dapl_os_unlock(&g_cm_lock);
+
dapl_dbg_log(DAPL_DBG_TYPE_EP,
" setup_listener(conn=%p cm_id=%d)\n",
sp_ptr->cm_srvc_handle,conn->cm_id);
@@ -1003,19 +1004,13 @@
dat_status = DAT_CONN_QUAL_IN_USE;
else
dat_status = DAT_INSUFFICIENT_RESOURCES;
- /* success */
- } else {
- /* put on CM thread work queue */
- dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&conn->entry);
- dapl_os_lock( &g_cm_lock );
- dapl_llist_add_tail(&g_cm_list,
- (DAPL_LLIST_ENTRY*)&conn->entry, conn);
- dapl_os_unlock(&g_cm_lock);
+
+ dapli_destroy_cm_id(conn);
return dat_status;
}
- dapli_destroy_cm_id(conn);
- return dat_status;
+ /* success */
+ return DAT_SUCCESS;
}
@@ -1047,9 +1042,11 @@
" remove_listener(ia_ptr %p sp_ptr %p cm_ptr %p)\n",
ia_ptr, sp_ptr, conn );
- if (sp_ptr->cm_srvc_handle != IB_INVALID_HANDLE)
+ if (conn != IB_INVALID_HANDLE) {
+ sp_ptr->cm_srvc_handle = NULL;
dapli_destroy_cm_id(conn);
-
+ }
+
return DAT_SUCCESS;
}
Index: dapl/openib/dapl_ib_util.h
===================================================================
--- dapl/openib/dapl_ib_util.h (revision 2919)
+++ dapl/openib/dapl_ib_util.h (working copy)
@@ -53,6 +53,7 @@
#include <byteswap.h>
#include <infiniband/sa.h>
#include <infiniband/cm.h>
+#include <infiniband/at.h>
/* Typedefs to map common DAPL provider types to IB verbs */
typedef struct ibv_qp *ib_qp_handle_t;
@@ -68,8 +69,8 @@
#define IB_RC_RETRY_COUNT 7
#define IB_RNR_RETRY_COUNT 7
-#define IB_CM_RESPONSE_TIMEOUT 20 /* 4 sec */
-#define IB_MAX_CM_RETRIES 4
+#define IB_CM_RESPONSE_TIMEOUT 18 /* 1 sec */
+#define IB_MAX_CM_RETRIES 7
#define IB_REQ_MRA_TIMEOUT 27 /* a little over 9 minutes */
#define IB_MAX_AT_RETRY 3
@@ -92,21 +93,12 @@
IB_CME_BROKEN
} ib_cm_events_t;
-#ifndef IB_AT
-/* implement a quick hack to exchange GID/LID's until user IB_AT arrives */
-struct ib_at_ib_route {
- union ibv_gid gid;
- uint16_t lid;
+struct dapl_at_record {
+ uint64_t req_id;
+ DAT_SOCK_ADDR6 *addr;
+ DAPL_OS_WAIT_OBJECT *wait_object;
};
-struct ib_at_completion {
- void (*fn)(uint64_t req_id, void *context, int rec_num);
- void *context;
- uint64_t req_id;
-};
-
-#endif
-
/*
* dapl_llist_entry in dapl.h but dapl.h depends on provider
* typedef's in this file first. move dapl_llist_entry out of dapl.h
@@ -122,6 +114,7 @@
struct dapl_cm_id {
struct ib_llist_entry entry;
DAPL_OS_LOCK lock;
+ DAPL_OS_WAIT_OBJECT wait_object;
int retries;
int destroy;
int in_callback;
@@ -238,6 +231,10 @@
{
struct ibv_device *ib_dev;
ib_cq_handle_t ib_cq_empty;
+ DAPL_OS_LOCK cq_lock;
+ DAPL_OS_WAIT_OBJECT wait_object;
+ int cq_destroy;
+ DAPL_OS_THREAD cq_thread;
int max_inline_send;
uint16_t lid;
union ibv_gid gid;
@@ -257,11 +254,18 @@
void cm_thread (void *arg);
int dapli_cm_thread_init(void);
void dapli_cm_thread_destroy(void);
+void at_thread (void *arg);
+int dapli_at_thread_init(void);
+void dapli_at_thread_destroy(void);
+void cq_thread (void *arg);
+int dapli_cq_thread_init(struct dapl_hca *hca_ptr);
+void dapli_cq_thread_destroy(struct dapl_hca *hca_ptr);
-int dapli_get_lid(struct dapl_hca *hca_ptr, int port, uint16_t *lid );
+int dapli_get_lid(struct dapl_hca *hca_ptr, int port, uint16_t *lid);
int dapli_get_gid(struct dapl_hca *hca_ptr, int port, int index,
- union ibv_gid *gid );
-int dapli_get_addr(char *addr, int addr_len);
+ union ibv_gid *gid);
+int dapli_get_hca_addr(struct dapl_hca *hca_ptr);
+void dapli_ip_comp_handler(uint64_t req_id, void *context, int rec_num);
DAT_RETURN
dapls_modify_qp_state ( IN ib_qp_handle_t qp_handle,
Index: dapl/openib/README
===================================================================
--- dapl/openib/README (revision 2919)
+++ dapl/openib/README (working copy)
@@ -39,18 +39,16 @@
server: dtest -s
client: dtest -h hostname
+
+Testing: dtest, dapltest - cl.sh regress.sh
-setup/known issues:
-
- First drop with uCM (without IBAT), tested with simple dtest across 2 nodes.
- hand rolled path records require remote LID and GID set via enviroment:
+Setup:
- export DAPL_REMOTE_GID "fe80:0000:0000:0000:0002:c902:0000:4071"
- export DAPL_REMOTE_LID "0002"
+ Third drop of code, includes uCM and uAT support.
+ NOTE: requires both uCM and uAT libraries and device modules from trunk.
- Also, hard coded (RTR) for use with port 1 only.
-
+Known issues:
no memory windows support in ibverbs, dat_create_rmr fails.
+ some uCM scale up issues with an 8 thread dapltest in regress.sh
+ hard coded modify QP RTR to port 1, waiting for ib_cm_init_qp_attr call.
-
-
Index: dapl/openib/dapl_ib_cq.c
===================================================================
--- dapl/openib/dapl_ib_cq.c (revision 2919)
+++ dapl/openib/dapl_ib_cq.c (working copy)
@@ -50,9 +50,96 @@
#include "dapl_adapter_util.h"
#include "dapl_lmr_util.h"
#include "dapl_evd_util.h"
+#include "dapl_ring_buffer_util.h"
#include <sys/poll.h>
+#include <signal.h>
+int dapli_cq_thread_init(struct dapl_hca *hca_ptr)
+{
+ DAT_RETURN dat_status;
+
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_init(%p)\n", hca_ptr);
+
+ /* create thread to process CQ (DTO completion) events for this HCA */
+ dat_status = dapl_os_thread_create( cq_thread, (void*)hca_ptr, &hca_ptr->ib_trans.cq_thread);
+ if (dat_status != DAT_SUCCESS)
+ {
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ " cq_thread_init: failed to create thread\n");
+ return 1;
+ }
+ return 0;
+}
+
+void dapli_cq_thread_destroy(struct dapl_hca *hca_ptr)
+{
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_destroy(%p)\n", hca_ptr);
+
+ /* signal cq_thread to exit and wait until it acknowledges */
+ hca_ptr->ib_trans.cq_destroy = 1;
+ pthread_kill(hca_ptr->ib_trans.cq_thread, SIGUSR1);
+ dapl_dbg_log(DAPL_DBG_TYPE_CM," cq_thread_destroy(%p) SIGUSR1 sent\n",hca_ptr);
+ while (hca_ptr->ib_trans.cq_destroy != 2) {
+ struct timespec sleep, remain;
+ sleep.tv_sec = 0;
+ sleep.tv_nsec = 10000000; /* 10 ms */
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+ " cq_thread_destroy: waiting for cq_thread\n");
+ nanosleep (&sleep, &remain);
+ }
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_destroy(%d) exit\n",getpid());
+ return;
+}
+
+/* something to catch the signal */
+static void ib_cq_handler(int signum)
+{
+ return;
+}
+
+void cq_thread( void *arg )
+{
+ struct dapl_hca *hca_ptr = arg;
+ struct dapl_evd *evd_ptr;
+ struct ibv_cq *ibv_cq = NULL;
+ sigset_t sigset;
+ int status = 0;
+
+ dapl_dbg_log ( DAPL_DBG_TYPE_UTIL," cq_thread: ENTER hca %p\n",hca_ptr);
+
+ sigemptyset(&sigset);
+ sigaddset(&sigset,SIGUSR1);
+ pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
+ signal(SIGUSR1, ib_cq_handler);
+
+ /* wait on DTO event, or signal to abort */
+ while (!hca_ptr->ib_trans.cq_destroy) {
+
+ struct pollfd cq_poll = {
+ .fd = hca_ptr->ib_hca_handle->cq_fd[0],
+ .events = POLLIN,
+ .revents = 0
+ };
+ status = poll(&cq_poll, 1, -1);
+ if ((status == 1) &&
+ (!ibv_get_cq_event(hca_ptr->ib_hca_handle, 0, &ibv_cq, (void*)&evd_ptr))) {
+
+ if (DAPL_BAD_HANDLE(evd_ptr, DAPL_MAGIC_EVD))
+ continue;
+
+ /* process DTO event via callback */
+ dapl_evd_dto_callback ( evd_ptr->header.owner_ia->hca_ptr->ib_hca_handle,
+ evd_ptr->ib_cq_handle,
+ (void*)evd_ptr );
+ } else {
+
+ }
+ }
+ hca_ptr->ib_trans.cq_destroy = 2;
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread: EXIT: hca %p \n", hca_ptr);
+ return;
+}
/*
* Map all verbs DTO completion codes to the DAT equivelent.
*
@@ -410,9 +497,9 @@
IN DAPL_EVD *evd_ptr,
IN ib_wait_obj_handle_t *p_cq_wait_obj_handle )
{
- dapl_dbg_log ( DAPL_DBG_TYPE_UTIL,
+ dapl_dbg_log ( DAPL_DBG_TYPE_CM,
" cq_object_create: (%p)=%p\n",
- p_cq_wait_obj_handle, *p_cq_wait_obj_handle);
+ p_cq_wait_obj_handle, evd_ptr );
/* set cq_wait object to evd_ptr */
*p_cq_wait_obj_handle = evd_ptr;
@@ -447,33 +534,86 @@
{
DAPL_EVD *evd_ptr = p_cq_wait_obj_handle;
ib_cq_handle_t cq = evd_ptr->ib_cq_handle;
- struct ibv_cq *ibv_cq;
- void *ibv_ctx;
- int status = -ETIMEDOUT;
+ struct ibv_cq *ibv_cq = NULL;
+ void *ibv_ctx = NULL;
+ int status = 0;
- dapl_dbg_log ( DAPL_DBG_TYPE_UTIL,
+ dapl_dbg_log ( DAPL_DBG_TYPE_CM,
" cq_object_wait: dev %p evd %p cq %p, time %d\n",
cq->context, evd_ptr, cq, timeout );
- /* Multiple EVD's sharing one event handle for now */
- if (cq) {
- struct pollfd cq_poll = {
- .fd = cq->context->cq_fd[0],
- .events = POLLIN
+ /* Multiple EVD's sharing one event handle for now until uverbs supports more */
+
+ /*
+ * This makes it very inefficient and tricky to manage multiple CQs per device open.
+ * For example: 4 threads waiting on separate CQ events will all be woken when
+ * a CQ event fires. So the poll wakes up and the first thread to get to
+ * the get_cq_event wins and the other 3 will block. The dapl_evd_wait code
+ * above will immediately do a poll_cq after returning from CQ wait and, if
+ * nothing is on the queue, will call this wait again and go back to sleep. So
+ * as long as they all wake up, a mutex is held around the get_cq_event
+ * so no blocking occurs, and they all return, then everything should work.
+ * Of course, the timeout needs to be adjusted on the threads that go back to sleep.
+ */
+ while (cq) {
+ struct pollfd cq_poll = {
+ .fd = cq->context->cq_fd[0],
+ .events = POLLIN,
+ .revents = 0
};
- int timeout_ms = -1;
+ int timeout_ms = -1;
if (timeout != DAT_TIMEOUT_INFINITE)
timeout_ms = timeout/1000;
-
+
+ /* check if another thread processed the event already, pending queue > 0 */
+ dapl_os_lock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
+ if (dapls_rbuf_count(&evd_ptr->pending_event_queue)) {
+ dapl_os_unlock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
+ break;
+ }
+ dapl_os_unlock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
+
+ dapl_dbg_log ( DAPL_DBG_TYPE_CM," cq_object_wait: polling\n");
status = poll(&cq_poll, 1, timeout_ms);
- if (status == 1)
- status = ibv_get_cq_event(cq->context,
- 0, &ibv_cq, &ibv_ctx);
- }
- dapl_dbg_log (DAPL_DBG_TYPE_UTIL,
- " cq_object_wait: RET cq %p ibv_cq %p ibv_ctx %p %x\n",
- cq,ibv_cq,ibv_ctx,status);
+ dapl_dbg_log ( DAPL_DBG_TYPE_CM," cq_object_wait: poll returned status=%d\n",status);
+
+ /*
+ * If poll with timeout wakes then hold mutex around a poll with no timeout
+ * so subsequent get_cq_events will be guaranteed not to block
+ * If the event does not belong to this EVD then put it on proper EVD pending
+ * queue under the mutex.
+ */
+ if (status == 1) {
+ dapl_os_lock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
+ status = poll(&cq_poll, 1, 0);
+ if (status == 1) {
+ status = ibv_get_cq_event(cq->context,
+ 0, &ibv_cq, &ibv_ctx);
+
+ /* if event is not ours, put on proper evd pending queue */
+ /* force another wakeup */
+ if ((ibv_ctx != evd_ptr ) &&
+ (!DAPL_BAD_HANDLE(ibv_ctx, DAPL_MAGIC_EVD))) {
+ dapl_dbg_log (DAPL_DBG_TYPE_CM,
+ " cq_object_wait: ibv_ctx %p != evd %p\n",
+ ibv_ctx, evd_ptr);
+ dapls_evd_copy_cq((struct evd_ptr*)ibv_ctx);
+ dapl_os_unlock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
+ continue;
+ }
+ }
+ dapl_os_unlock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
+ break;
+
+ } else if (status == 0) {
+ status = ETIMEDOUT;
+ break;
+ }
+ }
+ dapl_dbg_log (DAPL_DBG_TYPE_CM,
+ " cq_object_wait: RET evd %p cq %p ibv_cq %p ibv_ctx %p %s\n",
+ evd_ptr, cq,ibv_cq,ibv_ctx,strerror(errno));
return(dapl_convert_errno(status,"cq_wait_object_wait"));
More information about the general
mailing list