[openib-general] [PATCH] uDAPL cq channel support, sync with latest verbs
Arlin Davis
arlin.r.davis at intel.com
Fri Sep 30 17:24:49 PDT 2005
James,
Here is a patch that adds CQ_WAIT_OBJECT support using completion channels and syncs the provider with the latest verbs API.
Tested with dapltest, dtest, netpipe, and Intel-MPI.
-arlin
Signed-off-by: Arlin Davis <ardavis at ichips.intel.com>
Index: dapl/udapl/Makefile
===================================================================
--- dapl/udapl/Makefile (revision 3629)
+++ dapl/udapl/Makefile (working copy)
@@ -134,7 +134,7 @@
ifeq ($(VERBS),openib)
PROVIDER = $(TOPDIR)/../openib
CFLAGS += -DOPENIB
-#CFLAGS += -DCQ_WAIT_OBJECT uncomment when fixed
+CFLAGS += -DCQ_WAIT_OBJECT
CFLAGS += -I/usr/local/include/infiniband
endif
Index: dapl/openib/dapl_ib_util.c
===================================================================
--- dapl/openib/dapl_ib_util.c (revision 3629)
+++ dapl/openib/dapl_ib_util.c (working copy)
@@ -208,8 +208,6 @@
{
struct dlist *dev_list;
long opts;
- int i;
-
dapl_dbg_log (DAPL_DBG_TYPE_UTIL,
" open_hca: %s - %p\n", hca_name, hca_ptr );
@@ -278,16 +276,18 @@
" open_hca: ERR with async FD\n" );
goto bail;
}
- for (i=0;i<hca_ptr->ib_hca_handle->num_comp;i++) { /* uCQ */
- opts = fcntl(hca_ptr->ib_hca_handle->cq_fd[i], F_GETFL);
- if (opts < 0 || fcntl(hca_ptr->ib_hca_handle->async_fd,
- F_SETFL, opts | O_NONBLOCK) < 0) {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " open_hca: ERR with CQ FD\n");
- goto bail;
- }
- }
-
+
+ /* EVD events without direct CQ channels, non-blocking */
+ hca_ptr->ib_trans.ib_cq =
+ ibv_create_comp_channel(hca_ptr->ib_hca_handle);
+ opts = fcntl(hca_ptr->ib_trans.ib_cq->fd, F_GETFL); /* uCQ */
+ if (opts < 0 || fcntl(hca_ptr->ib_trans.ib_cq->fd,
+ F_SETFL, opts | O_NONBLOCK) < 0) {
+ dapl_dbg_log (DAPL_DBG_TYPE_ERR,
+ " open_hca: ERR with CQ FD\n" );
+ goto bail;
+ }
+
/* Get CM device handle for events, and set to non-blocking */
hca_ptr->ib_trans.ib_cm = ib_cm_get_device(hca_ptr->ib_hca_handle);
opts = fcntl(hca_ptr->ib_trans.ib_cm->fd, F_GETFL); /* uCM */
@@ -320,6 +320,7 @@
((struct sockaddr_in *)&hca_ptr->hca_address)->sin_addr.s_addr >> 24 & 0xff,
hca_ptr->ib_trans.max_inline_send );
+ hca_ptr->ib_trans.d_hca = hca_ptr;
return DAT_SUCCESS;
bail:
@@ -704,7 +705,6 @@
}
}
-
/* work thread for uAT, uCM, CQ, and async events */
void dapli_thread(void *arg)
{
@@ -741,7 +741,6 @@
hca = NULL;
while(hca) {
- int i;
ufds[++idx].fd = hca->ib_cm->fd; /* uCM */
ufds[idx].events = POLLIN;
ufds[idx].revents = 0;
@@ -750,15 +749,17 @@
ufds[idx].events = POLLIN;
ufds[idx].revents = 0;
uhca[idx] = hca;
- for (i=0;i<hca->ib_ctx->num_comp;i++) { /* uCQ */
- ufds[++idx].fd = hca->ib_ctx->cq_fd[i];
+
+ if (hca->ib_cq != NULL) {
+ ufds[++idx].fd = hca->ib_cq->fd; /* uCQ */
ufds[idx].events = POLLIN;
ufds[idx].revents = 0;
uhca[idx] = hca;
}
+
hca = dapl_llist_next_entry(
- &g_hca_list,
- (DAPL_LLIST_ENTRY*)&hca->entry);
+ &g_hca_list,
+ (DAPL_LLIST_ENTRY*)&hca->entry);
}
/* unlock, and setup poll */
@@ -810,6 +811,5 @@
dapl_dbg_log(DAPL_DBG_TYPE_UTIL," ib_thread(%d) EXIT\n",getpid());
g_ib_destroy = 2;
dapl_os_unlock(&g_hca_lock);
- pthread_exit(NULL);
}
Index: dapl/openib/dapl_ib_cm.c
===================================================================
--- dapl/openib/dapl_ib_cm.c (revision 3629)
+++ dapl/openib/dapl_ib_cm.c (working copy)
@@ -291,7 +291,6 @@
}
/* move QP state to RTR and RTS */
- /* TODO: could use a ib_cm_init_qp_attr() call here */
dapl_dbg_log(DAPL_DBG_TYPE_CM,
" rep_recv: RTR_RTS: id %d rqp %x rlid %x rSID %d\n",
conn->cm_id,event->param.rep_rcvd.remote_qpn,
@@ -621,8 +620,8 @@
dapl_dbg_log(DAPL_DBG_TYPE_CM,
" connect: at_route requested(ret=%d,id=%d): SRC %x DST %x\n",
status, conn->dapl_comp.req_id,
- ((struct sockaddr_in *)&conn->hca->hca_address)->sin_addr.s_addr,
- ((struct sockaddr_in *)&conn->r_addr)->sin_addr.s_addr);
+ ntohl(((struct sockaddr_in *)&conn->hca->hca_address)->sin_addr.s_addr),
+ ntohl(((struct sockaddr_in *)&conn->r_addr)->sin_addr.s_addr));
if (status < 0) {
dat_status = dapl_convert_errno(errno,"ib_at_route_by_ip");
Index: dapl/openib/dapl_ib_qp.c
===================================================================
--- dapl/openib/dapl_ib_qp.c (revision 3629)
+++ dapl/openib/dapl_ib_qp.c (working copy)
@@ -82,13 +82,21 @@
* Create a CQ with zero entries under the covers to support and
* catch any invalid posting.
*/
- if ( rcv_evd != DAT_HANDLE_NULL )
+ if (rcv_evd != DAT_HANDLE_NULL)
rcv_cq = rcv_evd->ib_cq_handle;
else if (!ia_ptr->hca_ptr->ib_trans.ib_cq_empty)
rcv_cq = ia_ptr->hca_ptr->ib_trans.ib_cq_empty;
else {
- rcv_cq = ibv_create_cq(ia_ptr->hca_ptr->ib_hca_handle,
- 0, NULL);
+ struct ibv_comp_channel *channel =
+ ia_ptr->hca_ptr->ib_trans.ib_cq;
+#ifdef CQ_WAIT_OBJECT
+ if (rcv_evd->cq_wait_obj_handle)
+ channel = rcv_evd->cq_wait_obj_handle;
+#endif
+ /* Call IB verbs to create CQ */
+ rcv_cq = ibv_create_cq(ia_ptr->hca_ptr->ib_hca_handle,
+ 0, NULL, channel, 0);
+
if (rcv_cq == IB_INVALID_HANDLE)
return(dapl_convert_errno(ENOMEM, "create_cq"));
Index: dapl/openib/dapl_ib_util.h
===================================================================
--- dapl/openib/dapl_ib_util.h (revision 3629)
+++ dapl/openib/dapl_ib_util.h (working copy)
@@ -159,7 +159,7 @@
typedef uint32_t ib_comp_handle_t;
#ifdef CQ_WAIT_OBJECT
-typedef struct dapl_evd *ib_wait_obj_handle_t;
+typedef struct ibv_comp_channel *ib_wait_obj_handle_t;
#endif
/* Definitions */
@@ -233,9 +233,11 @@
{
struct ib_llist_entry entry;
int destroy;
+ struct dapl_hca *d_hca;
struct ibv_device *ib_dev;
struct ibv_context *ib_ctx;
struct ib_cm_device *ib_cm;
+ struct ibv_comp_channel *ib_cq;
ib_cq_handle_t ib_cq_empty;
DAPL_OS_WAIT_OBJECT wait_object;
int max_inline_send;
Index: dapl/openib/dapl_ib_cq.c
===================================================================
--- dapl/openib/dapl_ib_cq.c (revision 3629)
+++ dapl/openib/dapl_ib_cq.c (working copy)
@@ -53,37 +53,36 @@
#include "dapl_ring_buffer_util.h"
#include <sys/poll.h>
+/* One CQ event channel per HCA */
void dapli_cq_event_cb(struct _ib_hca_transport *hca)
{
- int i;
- dapl_dbg_log(DAPL_DBG_TYPE_UTIL," dapli_cq_event_cb(%p)\n", hca);
-
/* check all comp events on this device */
- for(i=0;i<hca->ib_ctx->num_comp;i++) {
- struct dapl_evd *evd_ptr = NULL;
- struct ibv_cq *ibv_cq = NULL;
- struct pollfd cq_fd = {
- .fd = hca->ib_ctx->cq_fd[i],
- .events = POLLIN,
- .revents = 0
- };
- if ((poll(&cq_fd, 1, 0) == 1) &&
- (!ibv_get_cq_event(hca->ib_ctx, i,
- &ibv_cq, (void*)&evd_ptr))) {
-
- if (DAPL_BAD_HANDLE(evd_ptr, DAPL_MAGIC_EVD)) {
- ibv_ack_cq_events(ibv_cq, 1);
- continue;
- }
+ struct dapl_evd *evd_ptr = NULL;
+ struct ibv_cq *ibv_cq = NULL;
+ struct pollfd cq_fd = {
+ .fd = hca->ib_cq->fd,
+ .events = POLLIN,
+ .revents = 0
+ };
+
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL," dapli_cq_event_cb(%p)\n", hca);
- /* process DTO event via callback */
- dapl_evd_dto_callback ( hca->ib_ctx,
- evd_ptr->ib_cq_handle,
- (void*)evd_ptr );
+ if ((poll(&cq_fd, 1, 0) == 1) &&
+ (!ibv_get_cq_event(hca->ib_cq,
+ &ibv_cq, (void*)&evd_ptr))) {
+ if (DAPL_BAD_HANDLE(evd_ptr, DAPL_MAGIC_EVD)) {
ibv_ack_cq_events(ibv_cq, 1);
- }
- }
+ return;
+ }
+
+ /* process DTO event via callback */
+ dapl_evd_dto_callback ( hca->ib_ctx,
+ evd_ptr->ib_cq_handle,
+ (void*)evd_ptr );
+
+ ibv_ack_cq_events(ibv_cq, 1);
+ }
}
/*
@@ -241,16 +240,24 @@
dapl_dbg_log ( DAPL_DBG_TYPE_UTIL,
"dapls_ib_cq_alloc: evd %p cqlen=%d \n", evd_ptr, *cqlen );
+ struct ibv_comp_channel *channel = ia_ptr->hca_ptr->ib_trans.ib_cq;
+
+#ifdef CQ_WAIT_OBJECT
+ if (evd_ptr->cq_wait_obj_handle)
+ channel = evd_ptr->cq_wait_obj_handle;
+#endif
+
/* Call IB verbs to create CQ */
evd_ptr->ib_cq_handle = ibv_create_cq(ia_ptr->hca_ptr->ib_hca_handle,
*cqlen,
- evd_ptr);
+ evd_ptr,
+ channel, 0);
if (evd_ptr->ib_cq_handle == IB_INVALID_HANDLE)
return DAT_INSUFFICIENT_RESOURCES;
/* arm cq for events */
- dapls_set_cq_notify (ia_ptr, evd_ptr);
+ dapls_set_cq_notify(ia_ptr, evd_ptr);
/* update with returned cq entry size */
*cqlen = evd_ptr->ib_cq_handle->cqe;
@@ -288,16 +295,21 @@
IN DAT_COUNT *cqlen )
{
ib_cq_handle_t new_cq;
+ struct ibv_comp_channel *channel = ia_ptr->hca_ptr->ib_trans.ib_cq;
/* IB verbs doe not support resize. Try to re-create CQ
* with new size. Can only be done if QP is not attached.
* destroy EBUSY == QP still attached.
*/
- /* create a new size before destroying original */
- new_cq = ibv_create_cq( ia_ptr->hca_ptr->ib_hca_handle,
- *cqlen,
- evd_ptr);
+#ifdef CQ_WAIT_OBJECT
+ if (evd_ptr->cq_wait_obj_handle)
+ channel = evd_ptr->cq_wait_obj_handle;
+#endif
+
+ /* Call IB verbs to create CQ */
+ new_cq = ibv_create_cq(ia_ptr->hca_ptr->ib_hca_handle, *cqlen,
+ evd_ptr, channel, 0);
if (new_cq == IB_INVALID_HANDLE)
return DAT_INSUFFICIENT_RESOURCES;
@@ -444,12 +456,13 @@
IN ib_wait_obj_handle_t *p_cq_wait_obj_handle )
{
dapl_dbg_log ( DAPL_DBG_TYPE_CM,
- " cq_object_create: (%p)=%p\n",
- p_cq_wait_obj_handle, evd_ptr );
+ " cq_object_create: (%p,%p)\n",
+ evd_ptr, p_cq_wait_obj_handle );
/* set cq_wait object to evd_ptr */
- *p_cq_wait_obj_handle = evd_ptr;
-
+ *p_cq_wait_obj_handle =
+ ibv_create_comp_channel(evd_ptr->header.owner_ia->hca_ptr->ib_hca_handle);
+
return DAT_SUCCESS;
}
@@ -460,6 +473,9 @@
dapl_dbg_log ( DAPL_DBG_TYPE_UTIL,
" cq_object_destroy: wait_obj=%p\n",
p_cq_wait_obj_handle );
+
+ ibv_destroy_comp_channel(p_cq_wait_obj_handle);
+
return DAT_SUCCESS;
}
@@ -470,6 +486,8 @@
dapl_dbg_log ( DAPL_DBG_TYPE_UTIL,
" cq_object_wakeup: wait_obj=%p\n",
p_cq_wait_obj_handle );
+
+ /* no wake up mechanism */
return DAT_SUCCESS;
}
@@ -478,88 +496,42 @@
IN ib_wait_obj_handle_t p_cq_wait_obj_handle,
IN u_int32_t timeout)
{
- DAPL_EVD *evd_ptr = p_cq_wait_obj_handle;
- ib_cq_handle_t cq = evd_ptr->ib_cq_handle;
- struct ibv_cq *ibv_cq = NULL;
- void *ibv_ctx = NULL;
- int status = 0;
-
- dapl_dbg_log ( DAPL_DBG_TYPE_CM,
- " cq_object_wait: dev %p evd %p cq %p, time %d\n",
- cq->context, evd_ptr, cq, timeout );
-
- /* Multiple EVD's sharing one event handle for now until uverbs supports more */
-
- /*
- * This makes it very inefficient and tricky to manage multiple CQ per device open
- * For example: 4 threads waiting on separate CQ events will all be woke when
- * a CQ event fires. So the poll wakes up and the first thread to get to the
- * the get_cq_event wins and the other 3 will block. The dapl_evd_wait code
- * above will immediately do a poll_cq after returning from CQ wait and if
- * nothing on the queue will call this wait again and go back to sleep. So
- * as long as they all wake up, a mutex is held around the get_cq_event
- * so no blocking occurs and they all return then everything should work.
- * Of course, the timeout needs adjusted on the threads that go back to sleep.
- */
- while (cq) {
- struct pollfd cq_poll = {
- .fd = cq->context->cq_fd[0],
+ struct dapl_evd *evd_ptr;
+ struct ibv_cq *ibv_cq = NULL;
+ void *ibv_ctx = NULL;
+ int status = 0;
+ int timeout_ms = -1;
+ struct pollfd cq_fd = {
+ .fd = p_cq_wait_obj_handle->fd,
.events = POLLIN,
.revents = 0
};
- int timeout_ms = -1;
- if (timeout != DAT_TIMEOUT_INFINITE)
- timeout_ms = timeout/1000;
-
- /* check if another thread processed the event already, pending queue > 0 */
- dapl_os_lock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
- if (dapls_rbuf_count(&evd_ptr->pending_event_queue)) {
- dapl_os_unlock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
- break;
+ dapl_dbg_log ( DAPL_DBG_TYPE_CM,
+ " cq_object_wait: CQ channel %p time %d\n",
+ p_cq_wait_obj_handle, timeout );
+
+ /* uDAPL timeout values in usecs */
+ if (timeout != DAT_TIMEOUT_INFINITE)
+ timeout_ms = timeout/1000;
+
+ status = poll(&cq_fd, 1, timeout_ms);
+
+ /* returned event */
+ if (status > 0) {
+ if (!ibv_get_cq_event(p_cq_wait_obj_handle,
+ &ibv_cq, (void*)&evd_ptr)) {
+ ibv_ack_cq_events(ibv_cq, 1);
}
- dapl_os_unlock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
+ status = 0;
- dapl_dbg_log ( DAPL_DBG_TYPE_CM," cq_object_wait: polling\n");
- status = poll(&cq_poll, 1, timeout_ms);
- dapl_dbg_log ( DAPL_DBG_TYPE_CM," cq_object_wait: poll returned
status=%d\n",status);
-
- /*
- * If poll with timeout wakes then hold mutex around a poll with no timeout
- * so subsequent get_cq_events will be guaranteed not to block
- * If the event does not belong to this EVD then put it on proper EVD pending
- * queue under the mutex.
- */
- if (status == 1) {
- dapl_os_lock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
- status = poll(&cq_poll, 1, 0);
- if (status == 1) {
- status = ibv_get_cq_event(cq->context,
- 0, &ibv_cq, &ibv_ctx);
-
- /* if event is not ours, put on proper evd pending queue */
- /* force another wakeup */
- if ((ibv_ctx != evd_ptr ) &&
- (!DAPL_BAD_HANDLE(ibv_ctx, DAPL_MAGIC_EVD))) {
- dapl_dbg_log (DAPL_DBG_TYPE_CM,
- " cq_object_wait: ibv_ctx %p != evd %p\n",
- ibv_ctx, evd_ptr);
- dapls_evd_copy_cq((struct evd_ptr*)ibv_ctx);
-
dapl_os_unlock(&evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
- continue;
- }
- }
- dapl_os_unlock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
- break;
-
- } else if (status == 0) {
- status = ETIMEDOUT;
- break;
- }
- }
+ /* timeout */
+ } else if (status == 0)
+ status = ETIMEDOUT;
+
dapl_dbg_log (DAPL_DBG_TYPE_CM,
- " cq_object_wait: RET evd %p cq %p ibv_cq %p ibv_ctx %p %s\n",
- evd_ptr, cq,ibv_cq,ibv_ctx,strerror(errno));
+ " cq_object_wait: RET evd %p ibv_cq %p ibv_ctx %p %s\n",
+ evd_ptr, ibv_cq,ibv_ctx,strerror(errno));
return(dapl_convert_errno(status,"cq_wait_object_wait"));
-------------- next part --------------
A non-text attachment was scrubbed...
Name: cq_channel.patch
Type: application/octet-stream
Size: 14204 bytes
Desc: not available
URL: <http://lists.openfabrics.org/pipermail/general/attachments/20050930/ee9fd54e/attachment.obj>
More information about the general mailing list