[ofa-general] [PATCH 1/6][uDAPL v1] dapl scm: change connect and accept to non-blocking to avoid blocking user thread.

Arlin Davis arlin.r.davis at intel.com
Thu Aug 14 16:19:16 PDT 2008


Patch set for uDAPL v1 that includes socket cm provider improvements. 
Similar patch set coming for uDAPL v2.

The connect socket that is used to exchange QP information is now non-blocking,
and the data exchange is done via the cr thread. A new state, SCM_RTU_PENDING,
is added. On the passive side there is a new state, SCM_ACCEPTING_DATA, used to
avoid blocking on a read during the user accept call.

Signed-off-by: Arlin Davis ardavis at ichips.intel.com
---
 dapl/openib_scm/dapl_ib_cm.c   |  214 +++++++++++++++++++++++++++++-----------
 dapl/openib_scm/dapl_ib_util.h |    2 +
 2 files changed, 156 insertions(+), 60 deletions(-)

diff --git a/dapl/openib_scm/dapl_ib_cm.c b/dapl/openib_scm/dapl_ib_cm.c
index f78ebe6..03b0f12 100644
--- a/dapl/openib_scm/dapl_ib_cm.c
+++ b/dapl/openib_scm/dapl_ib_cm.c
@@ -197,6 +197,63 @@ dapli_socket_disconnect(ib_cm_handle_t	cm_ptr)
 	return DAT_SUCCESS;
 }
 
+/* ACTIVE: connect finished; err is 0 or an errno value. On success write the
+ * QP info and any private data to the peer and set state SCM_RTU_PENDING; on
+ * err or a short write, destroy the cm and post IB_CME_LOCAL_FAILURE. */
+void
+dapli_socket_connected(ib_cm_handle_t cm_ptr, int err)
+{
+	int		len, opt = 1;
+	struct iovec    iovec[2];
+	struct dapl_ep	*ep_ptr = cm_ptr->ep; /* read now; cm_ptr is destroyed at bail */
+
+	if (err) {
+		dapl_log(DAPL_DBG_TYPE_ERR, " connect: socket ERR %s\n", 
+			 strerror(err)); 
+		goto bail;
+	}
+	dapl_dbg_log(DAPL_DBG_TYPE_EP,
+		     " socket connected, write QP and private data\n"); 
+
+	/* TCP_NODELAY: send the small QP-info message without Nagle delay */
+	setsockopt(cm_ptr->socket,IPPROTO_TCP,TCP_NODELAY,&opt,sizeof(opt));
+
+	/* one gathered write: QP info, plus private data when p_size is non-zero */
+	iovec[0].iov_base = &cm_ptr->dst;
+	iovec[0].iov_len  = sizeof(ib_qp_cm_t);
+	if (cm_ptr->dst.p_size) {
+		iovec[1].iov_base = cm_ptr->p_data;
+		iovec[1].iov_len  = ntohl(cm_ptr->dst.p_size);
+	}
+
+	len = writev(cm_ptr->socket, iovec, (cm_ptr->dst.p_size ? 2:1));
+    	if (len != (ntohl(cm_ptr->dst.p_size) + sizeof(ib_qp_cm_t))) {
+		dapl_log(DAPL_DBG_TYPE_ERR, 
+			 " connect write: ERR %s, wcnt=%d\n",
+			 strerror(errno), len); 
+		goto bail; /* NOTE(review): errno may be stale after a short write */
+	}
+	dapl_dbg_log(DAPL_DBG_TYPE_CM, 
+		     " connected: sending SRC port=0x%x lid=0x%x,"
+		     " qpn=0x%x, psize=%d\n",
+		     ntohs(cm_ptr->dst.port), ntohs(cm_ptr->dst.lid), 
+		     ntohl(cm_ptr->dst.qpn), ntohl(cm_ptr->dst.p_size)); 
+        dapl_dbg_log(DAPL_DBG_TYPE_CM,
+                     " connected: sending SRC GID subnet %016llx id %016llx\n",
+                     (unsigned long long) 
+			cpu_to_be64(cm_ptr->dst.gid.global.subnet_prefix),
+                     (unsigned long long) 
+			cpu_to_be64(cm_ptr->dst.gid.global.interface_id));
+
+	/* cr thread calls dapli_socket_connect_rtu() for this state; no wait here */
+	cm_ptr->state = SCM_RTU_PENDING;
+	return;
+bail:
+	/* destroy the cm first, then post the failure event on the saved ep_ptr */
+	dapli_cm_destroy(cm_ptr);
+	dapl_evd_connection_callback(NULL, IB_CME_LOCAL_FAILURE, NULL, ep_ptr);
+}
+
 
 /*
  * ACTIVE: Create socket, connect, defer exchange QP information to CR thread
@@ -210,8 +267,7 @@ dapli_socket_connect(DAPL_EP		*ep_ptr,
 		     DAT_PVOID		p_data)
 {
 	ib_cm_handle_t cm_ptr;
-	int		len, opt = 1;
-	struct iovec    iovec[2];
+	int		ret;
 	DAPL_IA		*ia_ptr = ep_ptr->header.owner_ia;
 
 	dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: r_qual %d p_size=%d\n", 
@@ -227,19 +283,28 @@ dapli_socket_connect(DAPL_EP		*ep_ptr,
 		return DAT_INSUFFICIENT_RESOURCES;
 	}
 
-	((struct sockaddr_in*)r_addr)->sin_port = htons(r_qual);
+	/* non-blocking */
+	ret = fcntl(cm_ptr->socket, F_GETFL); 
+        if (ret < 0 || fcntl(cm_ptr->socket,
+                              F_SETFL, ret | O_NONBLOCK) < 0) {
+                dapl_log(DAPL_DBG_TYPE_ERR,
+                         " connect: fcntl on socket %d ERR %d %s\n",
+                         cm_ptr->socket, ret,
+                         strerror(errno));
+                goto bail;
+        }
 
-	if (connect(cm_ptr->socket, r_addr, sizeof(*r_addr)) < 0) {
-		dapl_dbg_log(DAPL_DBG_TYPE_ERR,
-			     " connect: %s on r_qual %d\n",
-			     strerror(errno), (unsigned int)r_qual);
+	((struct sockaddr_in*)r_addr)->sin_port = htons(r_qual);
+	ret = connect(cm_ptr->socket, r_addr, sizeof(*r_addr));
+	if (ret && errno != EINPROGRESS) {
+		dapl_log(DAPL_DBG_TYPE_ERR,
+			     " connect ERROR: %s on %s r_qual %d\n",
+			     strerror(errno), 
+		     	     inet_ntoa(((struct sockaddr_in *)r_addr)->sin_addr),
+			     (unsigned int)r_qual);
 		dapli_cm_destroy(cm_ptr);
 		return DAT_INVALID_ADDRESS;
-	}
-	setsockopt(cm_ptr->socket,IPPROTO_TCP,TCP_NODELAY,&opt,sizeof(opt));
-
-	dapl_dbg_log(DAPL_DBG_TYPE_EP, " socket connected!\n");
-
+	} 
 
 	/* Send QP info, IA address, and private data */
 	cm_ptr->dst.qpn = htonl(ep_ptr->qp_handle->qp_num);
@@ -257,41 +322,36 @@ dapli_socket_connect(DAPL_EP		*ep_ptr,
                                     &cm_ptr->dst.gid))
 		goto bail;
 
+	/* save references */
+	cm_ptr->hca = ia_ptr->hca_ptr;
+	cm_ptr->ep = ep_ptr;
 	cm_ptr->dst.ia_address = ia_ptr->hca_ptr->hca_address;
-	cm_ptr->dst.p_size = htonl(p_size);
-	iovec[0].iov_base = &cm_ptr->dst;
-	iovec[0].iov_len  = sizeof(ib_qp_cm_t);
 	if (p_size) {
-		iovec[1].iov_base = p_data;
-		iovec[1].iov_len  = p_size;
+		cm_ptr->dst.p_size = htonl(p_size);
+		dapl_os_memcpy(cm_ptr->p_data, p_data, p_size);
 	}
 
-	dapl_dbg_log(DAPL_DBG_TYPE_EP," socket connected, write QP and private data\n"); 
-	len = writev(cm_ptr->socket, iovec, (p_size ? 2:1));
-    	if (len != (p_size + sizeof(ib_qp_cm_t))) {
-		dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-			     " connect write: ERR %s, wcnt=%d\n",
-			     strerror(errno), len); 
-		goto bail;
-	}
-	dapl_dbg_log(DAPL_DBG_TYPE_CM, 
-		     " connect: SRC port=0x%x lid=0x%x, qpn=0x%x, psize=%d\n",
-		     ntohs(cm_ptr->dst.port), ntohs(cm_ptr->dst.lid), 
-		     ntohl(cm_ptr->dst.qpn), ntohl(cm_ptr->dst.p_size)); 
-        dapl_dbg_log(DAPL_DBG_TYPE_CM,
-                     " connect SRC GID subnet %016llx id %016llx\n",
-                     (unsigned long long) 
-			cpu_to_be64(cm_ptr->dst.gid.global.subnet_prefix),
-                     (unsigned long long) 
-			cpu_to_be64(cm_ptr->dst.gid.global.interface_id));
-
-	/* queue up to work thread to avoid blocking consumer */
-	cm_ptr->state = SCM_CONN_PENDING;
-	cm_ptr->hca = ia_ptr->hca_ptr;
-	cm_ptr->ep = ep_ptr;
+	/* connected or pending, either way results via async event */
+	if (ret == 0) 
+		dapli_socket_connected(cm_ptr,0);
+	else 
+		cm_ptr->state = SCM_CONN_PENDING;
+	
+	dapl_dbg_log(DAPL_DBG_TYPE_EP,
+	             " connect: socket %d to %s r_qual %d pending\n",
+		     cm_ptr->socket,
+		     inet_ntoa(((struct sockaddr_in *)r_addr)->sin_addr),
+		     (unsigned int)r_qual);
+			
 	dapli_cm_queue(cm_ptr);
 	return DAT_SUCCESS;
 bail:
+	dapl_log(DAPL_DBG_TYPE_ERR,
+		 " connect ERROR: %s query lid(0x%x)/gid on %s r_qual %d\n",
+		 strerror(errno),ntohs(cm_ptr->dst.lid), 
+		 inet_ntoa(((struct sockaddr_in *)r_addr)->sin_addr),
+		 (unsigned int)r_qual);
+
 	/* close socket, free cm structure */
 	dapli_cm_destroy(cm_ptr);
 	return DAT_INTERNAL_ERROR;
@@ -470,25 +530,22 @@ bail:
 	return dat_status;
 }
 
-
 /*
- * PASSIVE: accept socket, receive peer QP information, private data, post cr_event 
+ * PASSIVE: accept socket 
  */
-DAT_RETURN 
+void 
 dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
 {
 	ib_cm_handle_t	acm_ptr;
-	void		*p_data = NULL;
 	int		len;
-	DAT_RETURN	dat_status = DAT_SUCCESS;
 		
 	dapl_dbg_log(DAPL_DBG_TYPE_EP," socket_accept\n"); 
 
 	/* Allocate accept CM and initialize */
 	if ((acm_ptr = dapl_os_alloc(sizeof(*acm_ptr))) == NULL) 
-		return DAT_INSUFFICIENT_RESOURCES;
+		goto bail;
 
-	(void) dapl_os_memzero( acm_ptr, sizeof( *acm_ptr ) );
+	(void) dapl_os_memzero(acm_ptr, sizeof(*acm_ptr));
 	
 	acm_ptr->socket = -1;
 	acm_ptr->sp = cm_ptr->sp;
@@ -498,15 +555,34 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
 	acm_ptr->socket = accept(cm_ptr->socket, 
 				(struct sockaddr*)&acm_ptr->dst.ia_address, 
 				(socklen_t*)&len);
-
 	if (acm_ptr->socket < 0) {
 		dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
 			" accept: ERR %s on FD %d l_cr %p\n",
 			strerror(errno),cm_ptr->socket,cm_ptr); 
-		dat_status = DAT_INTERNAL_ERROR;
 		goto bail;
    	}
 
+	dapl_dbg_log(DAPL_DBG_TYPE_EP,
+		     " socket accepted, queue new cm %p\n",acm_ptr); 
+
+	acm_ptr->state = SCM_ACCEPTING;
+	dapli_cm_queue(acm_ptr);
+	return;
+bail:
+	/* close socket, free cm structure, active will see socket close as reject */
+	if (acm_ptr)
+		dapli_cm_destroy(acm_ptr);
+}
+
+/*
+ * PASSIVE: receive peer QP information, private data, post cr_event 
+ */
+void 
+dapli_socket_accept_data(ib_cm_srvc_handle_t acm_ptr)
+{
+	int len;
+	void *p_data = NULL;
+
 	dapl_dbg_log(DAPL_DBG_TYPE_EP," socket accepted, read QP data\n"); 
 
 	/* read in DST QP info, IA address. check for private data */
@@ -516,7 +592,6 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
 		dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
 			     " accept read: ERR %s, rcnt=%d, ver=%d\n",
 			     strerror(errno), len, acm_ptr->dst.ver); 
-		dat_status = DAT_INTERNAL_ERROR;
 		goto bail;
 	}
 
@@ -537,7 +612,6 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
 		dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
 			     " accept read: psize (%d) wrong\n",
 			     acm_ptr->dst.p_size); 
-		dat_status = DAT_INTERNAL_ERROR;
 		goto bail;
 	}
 
@@ -551,24 +625,24 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
 			dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
 				     " accept read pdata: ERR %s, rcnt=%d\n",
 				     strerror(errno), len); 
-			dat_status = DAT_INTERNAL_ERROR;
 			goto bail;
 		}
 		dapl_dbg_log(DAPL_DBG_TYPE_EP," accept: psize=%d read\n",len);
 		p_data = acm_ptr->p_data;
 	}
 	
-	acm_ptr->state = SCM_ACCEPTING;
+	acm_ptr->state = SCM_ACCEPTING_DATA;
 
 	/* trigger CR event and return SUCCESS */
 	dapls_cr_callback(acm_ptr,
 			  IB_CME_CONNECTION_REQUEST_PENDING,
 		          p_data,
 			  acm_ptr->sp );
-	return DAT_SUCCESS;
+	return;
 bail:
+	/* close socket, free cm structure, active will see socket close as reject */
 	dapli_cm_destroy(acm_ptr);
-	return DAT_INTERNAL_ERROR;
+	return;
 }
 
 /*
@@ -669,7 +743,6 @@ dapli_socket_accept_usr(DAPL_EP		*ep_ptr,
 			sizeof(cm_ptr->dst.ia_address));
 
 	dapl_dbg_log( DAPL_DBG_TYPE_EP," PASSIVE: accepted!\n" ); 
-	dapli_cm_queue(cm_ptr);
 	return DAT_SUCCESS;
 bail:
 	dapl_dbg_log(DAPL_DBG_TYPE_ERR," accept_rtu: ERR !QP_RTR_RTS \n"); 
@@ -1202,7 +1275,8 @@ void cr_thread(void *arg)
 {
     struct dapl_hca	*hca_ptr = arg;
     ib_cm_handle_t	cr, next_cr;
-    int 		ret,idx;
+    int 		opt,ret,idx;
+    socklen_t		opt_len;
     char		rbuf[2];
     struct pollfd	ufds[SCM_MAX_CONN];
      
@@ -1242,14 +1316,19 @@ void cr_thread(void *arg)
 		
 	    /* Add to ufds for poll, check for immediate work */
 	    ufds[++idx].fd = cr->socket; /* add listen or cr */
-	    ufds[idx].events = POLLIN;
+	    if (cr->state == SCM_CONN_PENDING)
+	    	ufds[idx].events = POLLOUT;
+	    else
+		ufds[idx].events = POLLIN;
 
 	    /* check socket for event, accept in or connect out */
 	    dapl_dbg_log(DAPL_DBG_TYPE_CM," poll cr=%p, fd=%d,%d\n", 
 				cr, cr->socket, ufds[idx].fd);
 	    dapl_os_unlock(&hca_ptr->ib_trans.lock);
 	    ret = poll(&ufds[idx],1,1);
-	    dapl_dbg_log(DAPL_DBG_TYPE_CM," poll wakeup ret=%d cr->st=%d ev=%d fd=%d\n",
+	    dapl_dbg_log(DAPL_DBG_TYPE_CM,
+			 " poll wakeup ret=%d cr->st=%d"
+			 " ev=0x%x fd=%d\n",
 			 ret,cr->state,ufds[idx].revents,ufds[idx].fd);
 
 	    /* data on listen, qp exchange, and on disconnect request */
@@ -1257,13 +1336,28 @@ void cr_thread(void *arg)
 		if (cr->socket > 0) {
 			if (cr->state == SCM_LISTEN)
 				dapli_socket_accept(cr);
+			else if (cr->state == SCM_ACCEPTING)
+				dapli_socket_accept_data(cr);
 			else if (cr->state == SCM_ACCEPTED)
 				dapli_socket_accept_rtu(cr);
-			else if (cr->state == SCM_CONN_PENDING)
+			else if (cr->state == SCM_RTU_PENDING)
 				dapli_socket_connect_rtu(cr);
 			else if (cr->state == SCM_CONNECTED)
 				dapli_socket_disconnect(cr);
 		}
+	    /* connect socket is writable, check status */
+	    } else if ((ret == 1) && 
+			(ufds[idx].revents & POLLOUT ||
+			 ufds[idx].revents & POLLERR)) {
+		if (cr->state == SCM_CONN_PENDING) {
+			opt = 0;
+			ret = getsockopt(cr->socket, SOL_SOCKET, 
+					 SO_ERROR, &opt, &opt_len);
+			if (!ret)
+				dapli_socket_connected(cr,opt);
+			else
+				dapli_socket_connected(cr,EFAULT);
+		}
 	    } else if (ret != 0) {
     		dapl_dbg_log(DAPL_DBG_TYPE_CM,
 			     " cr_thread(cr=%p) st=%d poll ERR= %s\n",
diff --git a/dapl/openib_scm/dapl_ib_util.h b/dapl/openib_scm/dapl_ib_util.h
index 37c5dbb..6d7568c 100644
--- a/dapl/openib_scm/dapl_ib_util.h
+++ b/dapl/openib_scm/dapl_ib_util.h
@@ -102,7 +102,9 @@ typedef enum scm_state
 	SCM_INIT,
 	SCM_LISTEN,
 	SCM_CONN_PENDING,
+	SCM_RTU_PENDING,
 	SCM_ACCEPTING,
+	SCM_ACCEPTING_DATA,
 	SCM_ACCEPTED,
 	SCM_REJECTED,
 	SCM_CONNECTED,
-- 
1.5.2.5





More information about the general mailing list