[openib-general] [PATCH] uDAPL cq channel support, sync with latest verbs

Arlin Davis arlin.r.davis at intel.com
Fri Sep 30 17:24:49 PDT 2005


James,

Here is a patch that adds CQ_WAIT_OBJECT support using completion channels and syncs uDAPL with the latest verbs API.
Tested with dapltest, dtest, netpipe, and Intel-MPI. 

-arlin

Signed-off-by: Arlin Davis <ardavis at ichips.intel.com>

Index: dapl/udapl/Makefile
===================================================================
--- dapl/udapl/Makefile	(revision 3629)
+++ dapl/udapl/Makefile	(working copy)
@@ -134,7 +134,7 @@
 ifeq ($(VERBS),openib)
 PROVIDER = $(TOPDIR)/../openib
 CFLAGS   += -DOPENIB
-#CFLAGS   += -DCQ_WAIT_OBJECT uncomment when fixed
+CFLAGS   += -DCQ_WAIT_OBJECT 
 CFLAGS   += -I/usr/local/include/infiniband
 endif
 
Index: dapl/openib/dapl_ib_util.c
===================================================================
--- dapl/openib/dapl_ib_util.c	(revision 3629)
+++ dapl/openib/dapl_ib_util.c	(working copy)
@@ -208,8 +208,6 @@
 {
 	struct dlist	*dev_list;
 	long		opts;
-	int		i;
-
 	dapl_dbg_log (DAPL_DBG_TYPE_UTIL, 
 		      " open_hca: %s - %p\n", hca_name, hca_ptr );
 
@@ -278,16 +276,18 @@
 			      " open_hca: ERR with async FD\n" );
 		goto bail;
 	}
-	for (i=0;i<hca_ptr->ib_hca_handle->num_comp;i++) { /* uCQ */
-		opts = fcntl(hca_ptr->ib_hca_handle->cq_fd[i], F_GETFL);
-		if (opts < 0 || fcntl(hca_ptr->ib_hca_handle->async_fd, 
-				      F_SETFL, opts | O_NONBLOCK) < 0) {
-			dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-				     " open_hca: ERR with CQ FD\n");
-			goto bail;
-		}
-	}	
-	
+
+	/* EVD events without direct CQ channels, non-blocking */
+	hca_ptr->ib_trans.ib_cq = 
+		ibv_create_comp_channel(hca_ptr->ib_hca_handle);
+	opts = fcntl(hca_ptr->ib_trans.ib_cq->fd, F_GETFL); /* uCQ */
+	if (opts < 0 || fcntl(hca_ptr->ib_trans.ib_cq->fd, 
+			      F_SETFL, opts | O_NONBLOCK) < 0) {
+		dapl_dbg_log (DAPL_DBG_TYPE_ERR, 
+			      " open_hca: ERR with CQ FD\n" );
+		goto bail;
+	}
+
 	/* Get CM device handle for events, and set to non-blocking */	
 	hca_ptr->ib_trans.ib_cm = ib_cm_get_device(hca_ptr->ib_hca_handle);
 	opts = fcntl(hca_ptr->ib_trans.ib_cm->fd, F_GETFL); /* uCM */
@@ -320,6 +320,7 @@
 		      ((struct sockaddr_in *)&hca_ptr->hca_address)->sin_addr.s_addr >> 24 & 0xff,
 		      hca_ptr->ib_trans.max_inline_send );
 
+	hca_ptr->ib_trans.d_hca = hca_ptr;
 	return DAT_SUCCESS;
 
 bail:
@@ -704,7 +705,6 @@
 	}
 }
 
-
 /* work thread for uAT, uCM, CQ, and async events */
 void dapli_thread(void *arg) 
 {
@@ -741,7 +741,6 @@
 			hca = NULL;
 		
 		while(hca) {
-			int i;
 			ufds[++idx].fd = hca->ib_cm->fd; /* uCM */
 			ufds[idx].events = POLLIN;
 			ufds[idx].revents = 0;
@@ -750,15 +749,17 @@
 			ufds[idx].events = POLLIN;
 			ufds[idx].revents = 0;
 			uhca[idx] = hca;
-			for (i=0;i<hca->ib_ctx->num_comp;i++) {	 /* uCQ */
-				ufds[++idx].fd = hca->ib_ctx->cq_fd[i];	
+			
+			if (hca->ib_cq != NULL) {
+				ufds[++idx].fd = hca->ib_cq->fd; /* uCQ */
 				ufds[idx].events = POLLIN;
 				ufds[idx].revents = 0;
 				uhca[idx] = hca;
 			}
+			
 			hca = dapl_llist_next_entry(
-					&g_hca_list,
-					(DAPL_LLIST_ENTRY*)&hca->entry);
+				&g_hca_list,
+				(DAPL_LLIST_ENTRY*)&hca->entry);
 		}
 		
 		/* unlock, and setup poll */
@@ -810,6 +811,5 @@
 	dapl_dbg_log(DAPL_DBG_TYPE_UTIL," ib_thread(%d) EXIT\n",getpid());
 	g_ib_destroy = 2;
 	dapl_os_unlock(&g_hca_lock);	
-	pthread_exit(NULL);
 }
 
Index: dapl/openib/dapl_ib_cm.c
===================================================================
--- dapl/openib/dapl_ib_cm.c	(revision 3629)
+++ dapl/openib/dapl_ib_cm.c	(working copy)
@@ -291,7 +291,6 @@
 	}
 
 	/* move QP state to RTR and RTS */
-	/* TODO: could use a ib_cm_init_qp_attr() call here */
 	dapl_dbg_log(DAPL_DBG_TYPE_CM, 
 		    " rep_recv: RTR_RTS: id %d rqp %x rlid %x rSID %d\n",
 		     conn->cm_id,event->param.rep_rcvd.remote_qpn,
@@ -621,8 +620,8 @@
 	dapl_dbg_log(DAPL_DBG_TYPE_CM, 
 		" connect: at_route requested(ret=%d,id=%d): SRC %x DST %x\n", 
 	     status, conn->dapl_comp.req_id,
-	     ((struct sockaddr_in *)&conn->hca->hca_address)->sin_addr.s_addr,
-	     ((struct sockaddr_in *)&conn->r_addr)->sin_addr.s_addr);
+	     ntohl(((struct sockaddr_in *)&conn->hca->hca_address)->sin_addr.s_addr),
+	     ntohl(((struct sockaddr_in *)&conn->r_addr)->sin_addr.s_addr));
 
 	if (status < 0) {
 		dat_status = dapl_convert_errno(errno,"ib_at_route_by_ip");
Index: dapl/openib/dapl_ib_qp.c
===================================================================
--- dapl/openib/dapl_ib_qp.c	(revision 3629)
+++ dapl/openib/dapl_ib_qp.c	(working copy)
@@ -82,13 +82,21 @@
 	 * Create a CQ with zero entries under the covers to support and 
 	 * catch any invalid posting. 
 	 */
-	if ( rcv_evd != DAT_HANDLE_NULL ) 
+	if (rcv_evd != DAT_HANDLE_NULL) 
 		rcv_cq = rcv_evd->ib_cq_handle;
 	else if (!ia_ptr->hca_ptr->ib_trans.ib_cq_empty) 
 		rcv_cq = ia_ptr->hca_ptr->ib_trans.ib_cq_empty;
 	else {
-		rcv_cq = ibv_create_cq(ia_ptr->hca_ptr->ib_hca_handle,	
-				       0, NULL);
+		struct ibv_comp_channel *channel = 
+					ia_ptr->hca_ptr->ib_trans.ib_cq;
+#ifdef CQ_WAIT_OBJECT
+		if (rcv_evd->cq_wait_obj_handle)
+			channel = rcv_evd->cq_wait_obj_handle;
+#endif
+		/* Call IB verbs to create CQ */
+		rcv_cq = ibv_create_cq(ia_ptr->hca_ptr->ib_hca_handle,
+				       0, NULL, channel, 0);
+
 		if (rcv_cq == IB_INVALID_HANDLE) 
 			return(dapl_convert_errno(ENOMEM, "create_cq"));
 
Index: dapl/openib/dapl_ib_util.h
===================================================================
--- dapl/openib/dapl_ib_util.h	(revision 3629)
+++ dapl/openib/dapl_ib_util.h	(working copy)
@@ -159,7 +159,7 @@
 typedef uint32_t		ib_comp_handle_t;
 
 #ifdef CQ_WAIT_OBJECT
-typedef struct dapl_evd		*ib_wait_obj_handle_t;
+typedef struct ibv_comp_channel *ib_wait_obj_handle_t;
 #endif
 
 /* Definitions */
@@ -233,9 +233,11 @@
 { 
 	struct ib_llist_entry	entry;
 	int			destroy;
+	struct dapl_hca		*d_hca;
 	struct ibv_device	*ib_dev;
 	struct ibv_context	*ib_ctx;
 	struct ib_cm_device	*ib_cm;
+	struct ibv_comp_channel *ib_cq;
 	ib_cq_handle_t		ib_cq_empty;
 	DAPL_OS_WAIT_OBJECT     wait_object;
 	int			max_inline_send;
Index: dapl/openib/dapl_ib_cq.c
===================================================================
--- dapl/openib/dapl_ib_cq.c	(revision 3629)
+++ dapl/openib/dapl_ib_cq.c	(working copy)
@@ -53,37 +53,36 @@
 #include "dapl_ring_buffer_util.h"
 #include <sys/poll.h>
 
+/* One CQ event channel per HCA */
 void dapli_cq_event_cb(struct _ib_hca_transport *hca)
 {
-	int i;
-	dapl_dbg_log(DAPL_DBG_TYPE_UTIL," dapli_cq_event_cb(%p)\n", hca);
-
 	/* check all comp events on this device */
-	for(i=0;i<hca->ib_ctx->num_comp;i++) {
-		struct dapl_evd	*evd_ptr = NULL;
-		struct ibv_cq	*ibv_cq = NULL;
-		struct pollfd	cq_fd = {
-			.fd      = hca->ib_ctx->cq_fd[i],
-			.events  = POLLIN,
-			.revents = 0
-		};
-		if ((poll(&cq_fd, 1, 0) == 1) &&
-			(!ibv_get_cq_event(hca->ib_ctx, i, 
-					   &ibv_cq, (void*)&evd_ptr))) {
-
-			if (DAPL_BAD_HANDLE(evd_ptr, DAPL_MAGIC_EVD)) {
-				ibv_ack_cq_events(ibv_cq, 1);
-				continue;
-			}
+	struct dapl_evd	*evd_ptr = NULL;
+	struct ibv_cq	*ibv_cq = NULL;
+	struct pollfd	cq_fd = {
+		.fd      = hca->ib_cq->fd,
+		.events  = POLLIN,
+		.revents = 0 
+	};
+	
+	dapl_dbg_log(DAPL_DBG_TYPE_UTIL," dapli_cq_event_cb(%p)\n", hca);
 
-			/* process DTO event via callback */
-			dapl_evd_dto_callback ( hca->ib_ctx,
-						evd_ptr->ib_cq_handle,
-						(void*)evd_ptr );
+	if ((poll(&cq_fd, 1, 0) == 1) &&
+		(!ibv_get_cq_event(hca->ib_cq,  
+				   &ibv_cq, (void*)&evd_ptr))) {
 
+		if (DAPL_BAD_HANDLE(evd_ptr, DAPL_MAGIC_EVD)) {
 			ibv_ack_cq_events(ibv_cq, 1);
-		} 
-	}
+			return;
+		}
+
+		/* process DTO event via callback */
+		dapl_evd_dto_callback ( hca->ib_ctx,
+					evd_ptr->ib_cq_handle,
+					(void*)evd_ptr );
+
+		ibv_ack_cq_events(ibv_cq, 1);
+	} 
 }
 
 /*
@@ -241,16 +240,24 @@
 	dapl_dbg_log ( DAPL_DBG_TYPE_UTIL, 
 		"dapls_ib_cq_alloc: evd %p cqlen=%d \n", evd_ptr, *cqlen );
 
+	struct ibv_comp_channel *channel = ia_ptr->hca_ptr->ib_trans.ib_cq;
+
+#ifdef CQ_WAIT_OBJECT
+	if (evd_ptr->cq_wait_obj_handle)
+		channel = evd_ptr->cq_wait_obj_handle;
+#endif
+
 	/* Call IB verbs to create CQ */
 	evd_ptr->ib_cq_handle = ibv_create_cq(ia_ptr->hca_ptr->ib_hca_handle,
 					      *cqlen,
-					      evd_ptr);
+					      evd_ptr,
+					      channel, 0);
 	
 	if (evd_ptr->ib_cq_handle == IB_INVALID_HANDLE) 
 		return	DAT_INSUFFICIENT_RESOURCES;
 
 	/* arm cq for events */
-	dapls_set_cq_notify (ia_ptr, evd_ptr);
+	dapls_set_cq_notify(ia_ptr, evd_ptr);
 	
         /* update with returned cq entry size */
 	*cqlen = evd_ptr->ib_cq_handle->cqe;
@@ -288,16 +295,21 @@
 	IN  DAT_COUNT	*cqlen )
 {
 	ib_cq_handle_t	new_cq;
+	struct ibv_comp_channel *channel = ia_ptr->hca_ptr->ib_trans.ib_cq;
 
 	/* IB verbs doe not support resize. Try to re-create CQ
 	 * with new size. Can only be done if QP is not attached. 
 	 * destroy EBUSY == QP still attached.
 	 */
 
-	/* create a new size before destroying original */
-	new_cq = ibv_create_cq( ia_ptr->hca_ptr->ib_hca_handle,
-				*cqlen,
-				evd_ptr);
+#ifdef CQ_WAIT_OBJECT
+	if (evd_ptr->cq_wait_obj_handle)
+		channel = evd_ptr->cq_wait_obj_handle;
+#endif
+
+	/* Call IB verbs to create CQ */
+	new_cq = ibv_create_cq(ia_ptr->hca_ptr->ib_hca_handle, *cqlen,
+			       evd_ptr, channel, 0);
 
 	if (new_cq == IB_INVALID_HANDLE) 
 		return	DAT_INSUFFICIENT_RESOURCES;
@@ -444,12 +456,13 @@
 		IN ib_wait_obj_handle_t	*p_cq_wait_obj_handle )
 {
 	dapl_dbg_log (	DAPL_DBG_TYPE_CM, 
-			" cq_object_create: (%p)=%p\n", 
-			p_cq_wait_obj_handle, evd_ptr );
+			" cq_object_create: (%p,%p)\n", 
+			evd_ptr, p_cq_wait_obj_handle );
 
 	/* set cq_wait object to evd_ptr */
-	*p_cq_wait_obj_handle = evd_ptr;
-	
+	*p_cq_wait_obj_handle = 
+		ibv_create_comp_channel(evd_ptr->header.owner_ia->hca_ptr->ib_hca_handle);	
+		
 	return DAT_SUCCESS;
 }
 
@@ -460,6 +473,9 @@
 	dapl_dbg_log (  DAPL_DBG_TYPE_UTIL, 
 			" cq_object_destroy: wait_obj=%p\n", 
 			p_cq_wait_obj_handle );
+	
+	ibv_destroy_comp_channel(p_cq_wait_obj_handle);
+	
 	return DAT_SUCCESS;
 }
 
@@ -470,6 +486,8 @@
 	dapl_dbg_log (  DAPL_DBG_TYPE_UTIL, 
 			" cq_object_wakeup: wait_obj=%p\n", 
 			p_cq_wait_obj_handle );
+
+        /* no wake up mechanism */
 	return DAT_SUCCESS;
 }
 
@@ -478,88 +496,42 @@
 	IN ib_wait_obj_handle_t	    p_cq_wait_obj_handle,
 	IN u_int32_t 		    timeout)
 {
-	DAPL_EVD		*evd_ptr = p_cq_wait_obj_handle;
-	ib_cq_handle_t		cq = evd_ptr->ib_cq_handle;
-	struct ibv_cq		*ibv_cq = NULL;
-	void			*ibv_ctx = NULL;
-	int			status = 0; 
-
-	dapl_dbg_log ( DAPL_DBG_TYPE_CM, 
-			" cq_object_wait: dev %p evd %p cq %p, time %d\n", 
-			cq->context, evd_ptr, cq, timeout );
-
-	/* Multiple EVD's sharing one event handle for now until uverbs supports more */
-
-	/*
-	 *  This makes it very inefficient and tricky to manage multiple CQ per device open
-	 *  For example: 4 threads waiting on separate CQ events will all be woke when
-	 *  a CQ event fires. So the poll wakes up and the first thread to get to the
-	 *  the get_cq_event wins and the other 3 will block. The dapl_evd_wait code
-	 *  above will immediately do a poll_cq after returning from CQ wait and if
-	 *  nothing on the queue will call this wait again and go back to sleep. So
-	 *  as long as they all wake up, a mutex is held around the get_cq_event
-	 *  so no blocking occurs and they all return then everything should work.
-	 *  Of course, the timeout needs adjusted on the threads that go back to sleep.
-	 */
-	while (cq) {
-		struct pollfd cq_poll = {
-			.fd      = cq->context->cq_fd[0],
+	struct dapl_evd	*evd_ptr;
+	struct ibv_cq	*ibv_cq = NULL;
+	void		*ibv_ctx = NULL;
+	int		status = 0; 
+	int		timeout_ms = -1;
+	struct pollfd cq_fd = {
+			.fd      = p_cq_wait_obj_handle->fd,
 			.events  = POLLIN,
 			.revents = 0
 		};
-		int     timeout_ms = -1;
 
-		if (timeout != DAT_TIMEOUT_INFINITE)
-			timeout_ms = timeout/1000;
-
-		/* check if another thread processed the event already, pending queue > 0 */
-		dapl_os_lock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
-		if (dapls_rbuf_count(&evd_ptr->pending_event_queue)) {
-			dapl_os_unlock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
-			break;	
+	dapl_dbg_log ( DAPL_DBG_TYPE_CM, 
+			" cq_object_wait: CQ channel %p time %d\n", 
+			p_cq_wait_obj_handle, timeout );
+	
+	/* uDAPL timeout values in usecs */
+	if (timeout != DAT_TIMEOUT_INFINITE)
+		timeout_ms = timeout/1000;
+
+	status = poll(&cq_fd, 1, timeout_ms);
+
+	/* returned event */
+	if (status > 0) {
+		if (!ibv_get_cq_event(p_cq_wait_obj_handle, 
+				      &ibv_cq, (void*)&evd_ptr)) {
+			ibv_ack_cq_events(ibv_cq, 1);
 		}
-		dapl_os_unlock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
+		status = 0;
 
-		dapl_dbg_log ( DAPL_DBG_TYPE_CM," cq_object_wait: polling\n");
-		status = poll(&cq_poll, 1, timeout_ms);
-		dapl_dbg_log ( DAPL_DBG_TYPE_CM," cq_object_wait: poll returned status=%d\n",status);
-
-		/*
-		 * If poll with timeout wakes then hold mutex around a poll with no timeout
-		 * so subsequent get_cq_events will be guaranteed not to block
-		 * If the event does not belong to this EVD then put it on proper EVD pending 
-		 * queue under the mutex.
-		 */
-		if (status == 1) {
-			dapl_os_lock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
-			status = poll(&cq_poll, 1, 0);
-			if (status == 1) {
-				status = ibv_get_cq_event(cq->context,
-							  0, &ibv_cq, &ibv_ctx);
-
-				/* if event is not ours, put on proper evd pending queue */
-				/* force another wakeup */
-				if ((ibv_ctx != evd_ptr ) && 
-				    (!DAPL_BAD_HANDLE(ibv_ctx, DAPL_MAGIC_EVD))) {
-					dapl_dbg_log (DAPL_DBG_TYPE_CM,
-						      " cq_object_wait: ibv_ctx %p != evd %p\n",
-						      ibv_ctx, evd_ptr);
-					dapls_evd_copy_cq((struct evd_ptr*)ibv_ctx); 
-					dapl_os_unlock(&evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
-					continue;
-				}	
-			}	
-			dapl_os_unlock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
-			break;
-
-		} else if (status == 0) {
-			status = ETIMEDOUT;  
-			break;
-		}
-	}	
+	/* timeout */
+	} else if (status == 0) 
+		status = ETIMEDOUT;
+	
 	dapl_dbg_log (DAPL_DBG_TYPE_CM, 
-		      " cq_object_wait: RET evd %p cq %p ibv_cq %p ibv_ctx %p %s\n",
-		      evd_ptr, cq,ibv_cq,ibv_ctx,strerror(errno));
+		      " cq_object_wait: RET evd %p ibv_cq %p ibv_ctx %p %s\n",
+		      evd_ptr, ibv_cq,ibv_ctx,strerror(errno));
 	
 	return(dapl_convert_errno(status,"cq_wait_object_wait"));
 	


-------------- next part --------------
A non-text attachment was scrubbed...
Name: cq_channel.patch
Type: application/octet-stream
Size: 14204 bytes
Desc: not available
URL: <http://lists.openfabrics.org/pipermail/general/attachments/20050930/ee9fd54e/attachment.obj>


More information about the general mailing list