[ofw] [PATCH] scm: fixes and optimizations for connection scaling

Davis, Arlin R arlin.r.davis at intel.com
Fri Jun 26 16:50:00 PDT 2009


Prioritize accepts on listen ports via FD_READ
process the accepts ahead of other work to avoid
socket half_connection (SYN_RECV) stalls.

Fix dapl_poll to return DAPL_FD_ERROR on
all event error types.

Add new state for socket released, but CR
not yet destroyed. This enables scm to release
the socket resources immediately after exchanging
all QP information. Also, add state to str call.

Only add the CR reference to the EP if it is
RC type. UD has multiple CR's per EP so when
a UD EP disconnect_clean was called, from a
timeout, it destroyed the wrong CR.

Signed-off-by: Arlin Davis <arlin.r.davis at intel.com>
---
 dapl/openib_scm/dapl_ib_cm.c   |  176 +++++++++++++++++++++++-----------------
 dapl/openib_scm/dapl_ib_util.h |    4 +-
 2 files changed, 103 insertions(+), 77 deletions(-)

diff --git a/dapl/openib_scm/dapl_ib_cm.c b/dapl/openib_scm/dapl_ib_cm.c
index 27defb6..90d6d27 100644
--- a/dapl/openib_scm/dapl_ib_cm.c
+++ b/dapl/openib_scm/dapl_ib_cm.c
@@ -213,12 +213,14 @@ static enum DAPL_FD_EVENTS dapl_poll(DAPL_SOCKET s, enum DAPL_FD_EVENTS event)
 	fds.events = event;
 	fds.revents = 0;
 	ret = poll(&fds, 1, 0);
-	dapl_dbg_log(DAPL_DBG_TYPE_CM, " dapl_poll: ret=%d, events=0x%x\n",
-		     ret, fds.revents);
-	if (ret <= 0)
-		return ret;
-
-	return fds.revents;
+	dapl_log(DAPL_DBG_TYPE_CM, " dapl_poll: fd=%d ret=%d, evnts=0x%x\n",
+		 s, ret, fds.revents);
+	if (ret == 0)
+		return 0;
+	else if (fds.revents & (POLLERR | POLLHUP | POLLNVAL)) 
+		return DAPL_FD_ERROR;
+	else 
+		return fds.revents;
 }
 
 static int dapl_select(struct dapl_fd_set *set)
@@ -271,8 +273,10 @@ static void dapli_cm_destroy(struct ib_cm_handle *cm_ptr)
 
 	dapl_os_lock(&cm_ptr->lock);
 	cm_ptr->state = SCM_DESTROY;
-	if ((cm_ptr->ep) && (cm_ptr->ep->cm_handle == cm_ptr))
+	if ((cm_ptr->ep) && (cm_ptr->ep->cm_handle == cm_ptr)) {
 		cm_ptr->ep->cm_handle = IB_INVALID_HANDLE;
+		cm_ptr->ep = NULL;
+	}
 
 	/* close socket if still active */
 	if (cm_ptr->socket != DAPL_INVALID_SOCKET) {
@@ -369,11 +373,14 @@ static void dapli_socket_connected(dp_ib_cm_handle_t cm_ptr, int err)
 
 	if (err) {
 		dapl_log(DAPL_DBG_TYPE_ERR,
-			 " CONN_PENDING: socket ERR %s -> %s\n",
-			 strerror(err), inet_ntoa(((struct sockaddr_in *)
-						   ep_ptr->param.
-						   remote_ia_address_ptr)->
-						  sin_addr));
+			 " CONN_PENDING: %s ERR %s -> %s %d\n",
+			 err == -1 ? "POLL" : "SOCKOPT",
+			 err == -1 ? strerror(errno) : strerror(err), 
+			 inet_ntoa(((struct sockaddr_in *)
+				   ep_ptr->param.
+				   remote_ia_address_ptr)->sin_addr), 
+			 ntohs(((struct sockaddr_in *)
+				&cm_ptr->dst.ia_address)->sin_port));
 		goto bail;
 	}
 	dapl_dbg_log(DAPL_DBG_TYPE_EP,
@@ -486,6 +493,9 @@ dapli_socket_connect(DAPL_EP * ep_ptr,
 	cm_ptr->hca = ia_ptr->hca_ptr;
 	cm_ptr->ep = ep_ptr;
 	cm_ptr->dst.ia_address = ia_ptr->hca_ptr->hca_address;
+	((struct sockaddr_in *)
+		&cm_ptr->dst.ia_address)->sin_port = ntohs(r_qual);
+
 	if (p_size) {
 		cm_ptr->dst.p_size = htonl(p_size);
 		dapl_os_memcpy(cm_ptr->p_data, p_data, p_size);
@@ -642,7 +652,6 @@ static void dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)
 		goto bail;
 	}
 	/* init cm_handle and post the event with private data */
-	ep_ptr->cm_handle = cm_ptr;
 	cm_ptr->state = SCM_CONNECTED;
 	event = IB_CME_CONNECTED;
 	dapl_dbg_log(DAPL_DBG_TYPE_EP, " ACTIVE: connected!\n");
@@ -677,11 +686,15 @@ ud_bail:
 		/* done with socket, don't destroy cm_ptr, need pdata */
 		closesocket(cm_ptr->socket);
 		cm_ptr->socket = DAPL_INVALID_SOCKET;
+		cm_ptr->state = SCM_RELEASED;
 	} else
 #endif
+	{
+		ep_ptr->cm_handle = cm_ptr; /* only RC, multi CR's on UD */
 		dapl_evd_connection_callback(cm_ptr,
 					     IB_CME_CONNECTED,
 					     cm_ptr->p_data, ep_ptr);
+	}
 	return;
 
 bail:
@@ -769,34 +782,36 @@ static void dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
 	int len;
 
 	dapl_dbg_log(DAPL_DBG_TYPE_EP, " socket_accept\n");
+	
+	/* 
+	 * Accept all CR's on this port to avoid half-connection (SYN_RCV)
+	 * stalls with many to one connection storms
+	 */
+	do {
+		/* Allocate accept CM and initialize */
+		if ((acm_ptr = dapli_cm_create()) == NULL)
+			return;
+
+		acm_ptr->sp = cm_ptr->sp;
+		acm_ptr->hca = cm_ptr->hca;
+
+		len = sizeof(acm_ptr->dst.ia_address);
+		acm_ptr->socket = accept(cm_ptr->socket,
+					(struct sockaddr *)
+					&acm_ptr->dst.ia_address,
+					(socklen_t *) & len);
+		if (acm_ptr->socket == DAPL_INVALID_SOCKET) {
+			dapl_log(DAPL_DBG_TYPE_ERR,
+				" accept: ERR %s on FD %d l_cr %p\n",
+				strerror(errno), cm_ptr->socket, cm_ptr);
+			dapli_cm_destroy(acm_ptr);
+			return;
+		}
 
-	/* Allocate accept CM and initialize */
-	if ((acm_ptr = dapli_cm_create()) == NULL)
-		return;
-
-	acm_ptr->sp = cm_ptr->sp;
-	acm_ptr->hca = cm_ptr->hca;
-
-	len = sizeof(acm_ptr->dst.ia_address);
-	acm_ptr->socket = accept(cm_ptr->socket,
-				 (struct sockaddr *)&acm_ptr->dst.ia_address,
-				 (socklen_t *) & len);
-	if (acm_ptr->socket == DAPL_INVALID_SOCKET) {
-		dapl_log(DAPL_DBG_TYPE_ERR,
-			 " accept: ERR %s on FD %d l_cr %p\n",
-			 strerror(errno), cm_ptr->socket, cm_ptr);
-		goto bail;
-	}
-
-	dapl_dbg_log(DAPL_DBG_TYPE_EP,
-		     " socket accepted, queue new cm %p\n", acm_ptr);
-
-	acm_ptr->state = SCM_ACCEPTING;
-	dapli_cm_queue(acm_ptr);
-	return;
-      bail:
-	/* close socket, free cm structure, active will see socket close as reject */
-	dapli_cm_destroy(acm_ptr);
+		acm_ptr->state = SCM_ACCEPTING;
+		dapli_cm_queue(acm_ptr);
+	
+	} while (dapl_poll(cm_ptr->socket, DAPL_FD_READ) == DAPL_FD_READ);
 }
 
 /*
@@ -964,6 +979,9 @@ dapli_socket_accept_usr(DAPL_EP * ep_ptr,
 	local.lid = ia_ptr->hca_ptr->ib_trans.lid;
 	local.gid = ia_ptr->hca_ptr->ib_trans.gid;
 	local.ia_address = ia_ptr->hca_ptr->hca_address;
+	((struct sockaddr_in *)&local.ia_address)->sin_port = 
+		ntohs(cm_ptr->sp->conn_qual);
+
 	local.p_size = htonl(p_size);
 	iov[0].iov_base = (void *)&local;
 	iov[0].iov_len = sizeof(ib_qp_cm_t);
@@ -1059,6 +1077,7 @@ void dapli_socket_accept_rtu(dp_ib_cm_handle_t cm_ptr)
                 /* done with socket, don't destroy cm_ptr, need pdata */
                 closesocket(cm_ptr->socket);
                 cm_ptr->socket = DAPL_INVALID_SOCKET;
+		cm_ptr->state = SCM_RELEASED;
 	} else
 #endif
 		dapls_cr_callback(cm_ptr, IB_CME_CONNECTED, NULL, cm_ptr->sp);
@@ -1146,7 +1165,8 @@ dapls_ib_disconnect(IN DAPL_EP * ep_ptr, IN DAT_CLOSE_FLAGS close_flags)
  *
  * Clean up outstanding connection data. This routine is invoked
  * after the final disconnect callback has occurred. Only on the
- * ACTIVE side of a connection.
+ * ACTIVE side of a connection. It is also called if dat_ep_connect
+ * times out using the consumer supplied timeout value.
  *
  * Input:
  *	ep_ptr		DAPL_EP
@@ -1164,6 +1184,14 @@ dapls_ib_disconnect_clean(IN DAPL_EP * ep_ptr,
 			  IN DAT_BOOLEAN active,
 			  IN const ib_cm_events_t ib_cm_event)
 {
+	/* NOTE: SCM will only initialize cm_handle with RC type
+	 * 
+	 * For UD there can many in-flight CR's so you 
+	 * cannot cleanup timed out CR's with EP reference 
+	 * alone since they share the same EP. The common
+	 * code that handles connection timeout logic needs 
+	 * updated for UD support.
+	 */
 	if (ep_ptr->cm_handle)
 		dapli_cm_destroy(ep_ptr->cm_handle);
 
@@ -1633,7 +1661,7 @@ void cr_thread(void *arg)
 				     " poll ret=0x%x cr->state=%d socket=%d\n",
 				     ret, cr->state, cr->socket);
 
-			/* data on listen, qp exchange, and on disconnect request */
+			/* data on listen, qp exchange, and on disc req */
 			if (ret == DAPL_FD_READ) {
 				if (cr->socket != DAPL_INVALID_SOCKET) {
 					switch (cr->state) {
@@ -1656,40 +1684,29 @@ void cr_thread(void *arg)
 						break;
 					}
 				}
-				/* connect socket is writable, check status */
-			} else if (ret == DAPL_FD_WRITE || ret == DAPL_FD_ERROR) {
-				if (cr->state == SCM_CONN_PENDING) {
-					opt = 0;
-					opt_len = sizeof(opt);
-					ret = getsockopt(cr->socket, SOL_SOCKET,
-							 SO_ERROR, (char *)&opt,
-							 &opt_len);
-					if (!ret)
-						dapli_socket_connected(cr, opt);
-					else
-						dapli_socket_connected(cr,
-								       errno);
-				} else {
-					dapl_log(DAPL_DBG_TYPE_CM,
-						 " CM poll ERR, wrong state(%d) -> %s SKIP\n",
-						 cr->state,
-						 inet_ntoa(((struct sockaddr_in
-							     *)&cr->dst.
-							    ia_address)->
-							   sin_addr));
-				}
-			} else if (ret != 0) {
-				dapl_log(DAPL_DBG_TYPE_CM,
-					 " CM poll warning %s, ret=%d st=%d -> %s\n",
-					 strerror(errno), ret, cr->state,
-					 inet_ntoa(((struct sockaddr_in *)
-						    &cr->dst.ia_address)->
-						   sin_addr));
-
-				/* POLLUP, NVAL, or poll error. - DISC */
+			/* connect socket is writable, check status */
+			} else if (ret == DAPL_FD_WRITE ||
+				   (cr->state == SCM_CONN_PENDING && 
+				    ret == DAPL_FD_ERROR)) {
+				opt = 0;
+				opt_len = sizeof(opt);
+				ret = getsockopt(cr->socket, SOL_SOCKET,
+						 SO_ERROR, (char *)&opt,
+						 &opt_len);
+				if (!ret)
+					dapli_socket_connected(cr, opt);
+				else
+					dapli_socket_connected(cr, errno);
+			 
+			/* POLLUP, ERR, NVAL, or poll error - DISC */
+			} else if (ret < 0 || ret == DAPL_FD_ERROR) {
+				dapl_log(DAPL_DBG_TYPE_WARN,
+				     " poll=%d cr->st=%s sk=%d ep %p, %d\n",
+				     ret, dapl_cm_state_str(cr->state), 
+				     cr->socket, cr->ep,
+				     cr->ep ? cr->ep->param.ep_state:0);
 				dapli_socket_disconnect(cr);
 			}
-
 			dapl_os_lock(&hca_ptr->ib_trans.lock);
 		}
 
@@ -1748,12 +1765,19 @@ void dapls_print_cm_list(IN DAPL_IA *ia_ptr)
 				 &ia_ptr->hca_ptr->ib_trans.list,
 				(DAPL_LLIST_ENTRY*)&cr->entry);
 
-		printf( "  CONN[%d]: sp %p ep %p sock %d %s %s -> %s\n",
+		printf( "  CONN[%d]: sp %p ep %p sock %d %s %s %s %s %d\n",
 			i, cr->sp, cr->ep, cr->socket,
 			cr->dst.qp_type == IBV_QPT_RC ? "RC" : "UD",
 			dapl_cm_state_str(cr->state),
+			cr->sp ? "<-" : "->",
+			cr->state == SCM_LISTEN ? 
+			inet_ntoa(((struct sockaddr_in *)
+				&ia_ptr->hca_ptr->hca_address)->sin_addr) :
 			inet_ntoa(((struct sockaddr_in *)
-				  &cr->dst.ia_address)->sin_addr));
+				&cr->dst.ia_address)->sin_addr),
+			cr->sp ? (int)cr->sp->conn_qual : 
+			ntohs(((struct sockaddr_in *)
+				&cr->dst.ia_address)->sin_port));
 		i++;
 	}
 	printf("\n");
diff --git a/dapl/openib_scm/dapl_ib_util.h b/dapl/openib_scm/dapl_ib_util.h
index 294ef3d..a668af7 100644
--- a/dapl/openib_scm/dapl_ib_util.h
+++ b/dapl/openib_scm/dapl_ib_util.h
@@ -99,6 +99,7 @@ typedef enum scm_state
 	SCM_ACCEPTED,
 	SCM_REJECTED,
 	SCM_CONNECTED,
+	SCM_RELEASED,
 	SCM_DISCONNECTED,
 	SCM_DESTROY
 } SCM_STATE;
@@ -382,10 +383,11 @@ STATIC _INLINE_ char * dapl_cm_state_str(IN int st)
 		"SCM_ACCEPTED",
 		"SCM_REJECTED",
 		"SCM_CONNECTED",
+		"SCM_RELEASED",
 		"SCM_DISCONNECTED",
 		"SCM_DESTROY"
         };
-        return ((st < 0 || st > 10) ? "Invalid CM state?" : cm_state[st]);
+        return ((st < 0 || st > 11) ? "Invalid CM state?" : cm_state[st]);
 }
 
 /*
-- 
1.5.2.5




More information about the ofw mailing list