[ofa-general] [PATCH 2/5] [uDAPL v2] dapl scm: change connect and accept to non-blocking to avoid blocking user thread.

Arlin Davis arlin.r.davis at intel.com
Thu Aug 14 17:06:43 PDT 2008


The connect socket used to exchange QP information is now non-blocking, and
the data exchange is done via the cr thread. A new state, SCM_RTU_PENDING, is
added on the active side. On the passive side a new state, SCM_ACCEPTING_DATA,
is used to avoid a blocking read on the user accept call.

Signed-off-by: Arlin Davis ardavis at ichips.intel.com
---
 dapl/openib_scm/dapl_ib_cm.c         |  415 +++++++++++++++++++++++-----------
 dapl/openib_scm/dapl_ib_extensions.c |    2 +-
 dapl/openib_scm/dapl_ib_util.c       |    2 -
 dapl/openib_scm/dapl_ib_util.h       |    2 +
 4 files changed, 288 insertions(+), 133 deletions(-)

diff --git a/dapl/openib_scm/dapl_ib_cm.c b/dapl/openib_scm/dapl_ib_cm.c
index e712f9d..5ba5ddc 100644
--- a/dapl/openib_scm/dapl_ib_cm.c
+++ b/dapl/openib_scm/dapl_ib_cm.c
@@ -150,7 +150,7 @@ static uint16_t dapli_get_lid(IN struct ibv_context *ctx, IN uint8_t port)
  * ACTIVE/PASSIVE: called from CR thread or consumer via ep_disconnect
  */
 static DAT_RETURN 
-dapli_socket_disconnect(dp_ib_cm_handle_t cm_ptr)
+dapli_socket_disconnect(dp_ib_cm_handle_t cm_ptr)
 {
 	DAPL_EP	*ep_ptr = cm_ptr->ep;
 	DAT_UINT32 disc_data = htonl(0xdead);
@@ -197,6 +197,65 @@ dapli_socket_disconnect(dp_ib_cm_handle_t cm_ptr)
 	return DAT_SUCCESS;
 }
 
+/*
+ * ACTIVE: socket connect completed (err: 0 or errno); send QP info + pdata to peer
+ */
+void
+dapli_socket_connected(dp_ib_cm_handle_t cm_ptr, int err)
+{
+	int		len, opt = 1;
+	struct iovec    iovec[2];
+	struct dapl_ep	*ep_ptr = cm_ptr->ep;
+
+	if (err) {
+		dapl_log(DAPL_DBG_TYPE_ERR, " connect: socket ERR %s\n", 
+			 strerror(err)); 
+		goto bail;
+	}
+	dapl_dbg_log(DAPL_DBG_TYPE_EP,
+		     " socket connected, write QP and private data\n"); 
+
+	/* TCP_NODELAY so the small QP-info message is not delayed; result not checked */
+	setsockopt(cm_ptr->socket,IPPROTO_TCP,TCP_NODELAY,&opt,sizeof(opt));
+
+	/* gather write: iovec[0] = QP info (cm_ptr->dst), iovec[1] = pdata only if p_size != 0 */
+	iovec[0].iov_base = &cm_ptr->dst;
+	iovec[0].iov_len  = sizeof(ib_qp_cm_t);
+	if (cm_ptr->dst.p_size) {
+		iovec[1].iov_base = cm_ptr->p_data;
+		iovec[1].iov_len  = ntohl(cm_ptr->dst.p_size);
+	}
+
+	len = writev(cm_ptr->socket, iovec, (cm_ptr->dst.p_size ? 2:1));
+    	if (len != (ntohl(cm_ptr->dst.p_size) + sizeof(ib_qp_cm_t))) {
+		dapl_log(DAPL_DBG_TYPE_ERR, 
+			 " CONN_PENDING write: ERR %s, wcnt=%d -> %s\n",
+			 strerror(errno), len,
+			 inet_ntoa(((struct sockaddr_in *)
+		           ep_ptr->param.remote_ia_address_ptr)->sin_addr)); 
+		goto bail;
+	}
+	dapl_dbg_log(DAPL_DBG_TYPE_CM, 
+		     " connected: sending SRC port=0x%x lid=0x%x,"
+		     " qpn=0x%x, psize=%d\n",
+		     ntohs(cm_ptr->dst.port), ntohs(cm_ptr->dst.lid), 
+		     ntohl(cm_ptr->dst.qpn), ntohl(cm_ptr->dst.p_size)); 
+        dapl_dbg_log(DAPL_DBG_TYPE_CM,
+                     " connected: sending SRC GID subnet %016llx id %016llx\n",
+                     (unsigned long long) 
+			cpu_to_be64(cm_ptr->dst.gid.global.subnet_prefix),
+                     (unsigned long long) 
+			cpu_to_be64(cm_ptr->dst.gid.global.interface_id));
+
+	/* sent; cr_thread handles the peer's reply (dapli_socket_connect_rtu) in this state */
+	cm_ptr->state = SCM_RTU_PENDING;
+	return;
+bail:
+	/* close socket, free cm structure (ep_ptr saved above), post LOCAL_FAILURE event */
+	dapli_cm_destroy(cm_ptr);
+	dapl_evd_connection_callback(NULL, IB_CME_LOCAL_FAILURE, NULL, ep_ptr);
+}
+
 
 /*
  * ACTIVE: Create socket, connect, defer exchange QP information to CR thread
@@ -210,8 +269,7 @@ dapli_socket_connect(DAPL_EP		*ep_ptr,
 		     DAT_PVOID		p_data)
 {
 	dp_ib_cm_handle_t cm_ptr;
-	int		len, opt = 1;
-	struct iovec    iovec[2];
+	int		ret;
 	DAPL_IA		*ia_ptr = ep_ptr->header.owner_ia;
 
 	dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: r_qual %d p_size=%d\n", 
@@ -227,75 +285,88 @@ dapli_socket_connect(DAPL_EP		*ep_ptr,
 		return DAT_INSUFFICIENT_RESOURCES;
 	}
 
-	((struct sockaddr_in*)r_addr)->sin_port = htons(r_qual);
+	/* non-blocking */
+	ret = fcntl(cm_ptr->socket, F_GETFL); 
+        if (ret < 0 || fcntl(cm_ptr->socket,
+                              F_SETFL, ret | O_NONBLOCK) < 0) {
+                dapl_log(DAPL_DBG_TYPE_ERR,
+                         " socket connect: fcntl on socket %d ERR %d %s\n",
+                         cm_ptr->socket, ret,
+                         strerror(errno));
+                goto bail;
+        }
 
-	if (connect(cm_ptr->socket, r_addr, sizeof(*r_addr)) < 0) {
-		dapl_dbg_log(DAPL_DBG_TYPE_ERR,
-			     " connect: %s on r_qual %d\n",
-			     strerror(errno), (unsigned int)r_qual);
+	((struct sockaddr_in*)r_addr)->sin_port = htons(r_qual);
+	ret = connect(cm_ptr->socket, r_addr, sizeof(*r_addr));
+	if (ret && errno != EINPROGRESS) {
+		dapl_log(DAPL_DBG_TYPE_ERR,
+			 " socket connect ERROR: %s -> %s r_qual %d\n",
+			 strerror(errno), 
+		     	 inet_ntoa(((struct sockaddr_in *)r_addr)->sin_addr),
+			 (unsigned int)r_qual);
 		dapli_cm_destroy(cm_ptr);
 		return DAT_INVALID_ADDRESS;
-	}
-	setsockopt(cm_ptr->socket,IPPROTO_TCP,TCP_NODELAY,&opt,sizeof(opt));
-
-	dapl_dbg_log(DAPL_DBG_TYPE_EP, " socket connected!\n");
-
+	} 
 
 	/* Send QP info, IA address, and private data */
 	cm_ptr->dst.qpn = htonl(ep_ptr->qp_handle->qp_num);
+#ifdef DAT_EXTENSIONS
 	cm_ptr->dst.qp_type = htons(ep_ptr->qp_handle->qp_type);
+#endif
 	cm_ptr->dst.port = htons(ia_ptr->hca_ptr->port_num);
 	cm_ptr->dst.lid = 
 		htons(dapli_get_lid(ia_ptr->hca_ptr->ib_hca_handle, 
 				    (uint8_t)ia_ptr->hca_ptr->port_num));
-	if (cm_ptr->dst.lid == 0xffff)
+	if (cm_ptr->dst.lid == 0xffff) {
+		dapl_log(DAPL_DBG_TYPE_ERR, 
+			 " CONNECT: query LID ERR %s -> %s\n",
+			 strerror(errno), 
+			 inet_ntoa(((struct sockaddr_in *)r_addr)->sin_addr));
 		goto bail;
+	}
 
         /* in network order */
         if (ibv_query_gid(ia_ptr->hca_ptr->ib_hca_handle,
 				    (uint8_t)ia_ptr->hca_ptr->port_num,
-				    0,
-                                    &cm_ptr->dst.gid))
+				    0, &cm_ptr->dst.gid)) {
+		dapl_log(DAPL_DBG_TYPE_ERR, 
+			 " CONNECT: query GID ERR %s -> %s\n",
+			 strerror(errno), 
+			 inet_ntoa(((struct sockaddr_in *)r_addr)->sin_addr));
 		goto bail;
+	}
 
+	/* save references */
+	cm_ptr->hca = ia_ptr->hca_ptr;
+	cm_ptr->ep = ep_ptr;
 	cm_ptr->dst.ia_address = ia_ptr->hca_ptr->hca_address;
-	cm_ptr->dst.p_size = htonl(p_size);
-	iovec[0].iov_base = &cm_ptr->dst;
-	iovec[0].iov_len  = sizeof(ib_qp_cm_t);
 	if (p_size) {
-		iovec[1].iov_base = p_data;
-		iovec[1].iov_len  = p_size;
+		cm_ptr->dst.p_size = htonl(p_size);
+		dapl_os_memcpy(cm_ptr->p_data, p_data, p_size);
 	}
 
-	dapl_dbg_log(DAPL_DBG_TYPE_EP,
-		     " socket connected, write QP (%d), private data (%d)\n",
-		     sizeof(ib_qp_cm_t),p_size); 
+	/* connected or pending, either way results via async event */
+	if (ret == 0) 
+		dapli_socket_connected(cm_ptr,0);
+	else 
+		cm_ptr->state = SCM_CONN_PENDING;
 	
-	len = writev(cm_ptr->socket, iovec, (p_size ? 2:1));
-    	if (len != (p_size + sizeof(ib_qp_cm_t))) {
-		dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-			     " connect write: ERR %s, wcnt=%d\n",
-			     strerror(errno), len); 
-		goto bail;
-	}
-	dapl_dbg_log(DAPL_DBG_TYPE_CM, 
-		     " connect: SRC port=0x%x lid=0x%x, qpn=0x%x, psize=%d\n",
-		     ntohs(cm_ptr->dst.port), ntohs(cm_ptr->dst.lid), 
-		     ntohl(cm_ptr->dst.qpn), ntohl(cm_ptr->dst.p_size)); 
-        dapl_dbg_log(DAPL_DBG_TYPE_CM,
-                     " connect SRC GID subnet %016llx id %016llx\n",
-                     (unsigned long long) 
-			cpu_to_be64(cm_ptr->dst.gid.global.subnet_prefix),
-                     (unsigned long long) 
-			cpu_to_be64(cm_ptr->dst.gid.global.interface_id));
-
-	/* queue up to work thread to avoid blocking consumer */
-	cm_ptr->state = SCM_CONN_PENDING;
-	cm_ptr->hca = ia_ptr->hca_ptr;
-	cm_ptr->ep = ep_ptr;
+	dapl_dbg_log(DAPL_DBG_TYPE_EP,
+	             " connect: socket %d to %s r_qual %d pending\n",
+		     cm_ptr->socket,
+		     inet_ntoa(((struct sockaddr_in *)r_addr)->sin_addr),
+		     (unsigned int)r_qual);
+			
 	dapli_cm_queue(cm_ptr);
 	return DAT_SUCCESS;
 bail:
+	dapl_log(DAPL_DBG_TYPE_ERR,
+		 " socket connect ERROR: %s query lid(0x%x)/gid"
+		 " -> %s r_qual %d\n",
+		 strerror(errno), ntohs(cm_ptr->dst.lid), 
+		 inet_ntoa(((struct sockaddr_in *)r_addr)->sin_addr),
+		 (unsigned int)r_qual);
+
 	/* close socket, free cm structure */
 	dapli_cm_destroy(cm_ptr);
 	return DAT_INTERNAL_ERROR;
@@ -306,7 +377,7 @@ bail:
  * ACTIVE: exchange QP information, called from CR thread
  */
 void 
-dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)
+dapli_socket_connect_rtu(dp_ib_cm_handle_t	cm_ptr)
 {
 	DAPL_EP		*ep_ptr = cm_ptr->ep;
 	int		len;
@@ -321,16 +392,20 @@ dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)
 	iovec[0].iov_len  = sizeof(ib_qp_cm_t);
 	len = readv(cm_ptr->socket, iovec, 1);
 	if (len != sizeof(ib_qp_cm_t) || ntohs(cm_ptr->dst.ver) != DSCM_VER) {
-		dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-			     " connect_rtu read: ERR %s, rcnt=%d, ver=%d\n",
-			     strerror(errno), len, ntohs(cm_ptr->dst.ver)); 
+		dapl_log(DAPL_DBG_TYPE_ERR, 
+		     " CONN_RTU read: ERR %s, rcnt=%d, ver=%d -> %s\n",
+		     strerror(errno), len, cm_ptr->dst.ver,
+		     inet_ntoa(((struct sockaddr_in *)
+		         ep_ptr->param.remote_ia_address_ptr)->sin_addr)); 
 		goto bail;
 	}
 	/* check for consumer reject */
 	if (cm_ptr->dst.rej) {
-		dapl_dbg_log(DAPL_DBG_TYPE_CM, 
-			     " connect_rtu read: PEER REJ reason=0x%x\n",
-			     ntohs(cm_ptr->dst.rej)); 
+		dapl_log(DAPL_DBG_TYPE_CM, 
+			 " CONN_RTU read: PEER REJ reason=0x%x -> %s\n",
+			 ntohs(cm_ptr->dst.rej),
+			 inet_ntoa(((struct sockaddr_in *)
+			   ep_ptr->param.remote_ia_address_ptr)->sin_addr));
 		event = IB_CME_DESTINATION_REJECT_PRIVATE_DATA;
 		goto bail;
 	}
@@ -339,7 +414,9 @@ dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)
 	cm_ptr->dst.port = ntohs(cm_ptr->dst.port);
 	cm_ptr->dst.lid = ntohs(cm_ptr->dst.lid);
 	cm_ptr->dst.qpn = ntohl(cm_ptr->dst.qpn);
+#ifdef DAT_EXTENSIONS
 	cm_ptr->dst.qp_type = ntohs(cm_ptr->dst.qp_type);
+#endif
 	cm_ptr->dst.p_size = ntohl(cm_ptr->dst.p_size);
 
 	/* save remote address information */
@@ -348,7 +425,7 @@ dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)
 			sizeof(ep_ptr->remote_ia_address));
 
 	dapl_dbg_log(DAPL_DBG_TYPE_EP, 
-		     " connect_rtu: DST %s port=0x%x lid=0x%x,"
+		     " CONN_RTU: DST %s port=0x%x lid=0x%x,"
 		     " qpn=0x%x, qp_type=%d, psize=%d\n",
 		     inet_ntoa(((struct sockaddr_in *)
 				&cm_ptr->dst.ia_address)->sin_addr),
@@ -358,35 +435,50 @@ dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)
 
 	/* validate private data size before reading */
 	if (cm_ptr->dst.p_size > IB_MAX_REP_PDATA_SIZE) {
-		dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-			     " connect_rtu read: psize (%d) wrong\n",
-			     cm_ptr->dst.p_size ); 
+		dapl_log(DAPL_DBG_TYPE_ERR, 
+			 " CONN_RTU read: psize (%d) wrong -> %s\n",
+			 cm_ptr->dst.p_size,
+			 inet_ntoa(((struct sockaddr_in *)
+			   ep_ptr->param.remote_ia_address_ptr)->sin_addr)); 
 		goto bail;
 	}
 
 	/* read private data into cm_handle if any present */
-	dapl_dbg_log(DAPL_DBG_TYPE_EP," socket connected, read pdata\n"); 
-
+	dapl_dbg_log(DAPL_DBG_TYPE_EP," socket connected, read private data\n"); 
 	if (cm_ptr->dst.p_size) {
 		iovec[0].iov_base = cm_ptr->p_data;
 		iovec[0].iov_len  = cm_ptr->dst.p_size;
 		len = readv(cm_ptr->socket, iovec, 1);
 		if (len != cm_ptr->dst.p_size) {
-			dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-				" connect_rtu read pdata: ERR %s, rcnt=%d\n",
-				strerror(errno), len); 
+			dapl_log(DAPL_DBG_TYPE_ERR, 
+			    " CONN_RTU read pdata: ERR %s, rcnt=%d -> %s\n",
+			    strerror(errno), len,
+			    inet_ntoa(((struct sockaddr_in *)
+			      ep_ptr->param.remote_ia_address_ptr)->sin_addr)); 
 			goto bail;
 		}
 	}
 
 	/* modify QP to RTR and then to RTS with remote info */
 	if (dapls_modify_qp_state(ep_ptr->qp_handle, 
-				  IBV_QPS_RTR, cm_ptr) != DAT_SUCCESS)
+				  IBV_QPS_RTR, cm_ptr) != DAT_SUCCESS) {
+		dapl_log(DAPL_DBG_TYPE_ERR, 
+			 " CONN_RTU: QPS_RTR ERR %s -> %s\n",
+			 strerror(errno), 
+			 inet_ntoa(((struct sockaddr_in *)
+			   ep_ptr->param.remote_ia_address_ptr)->sin_addr)); 
 		goto bail;
+	}
 
 	if (dapls_modify_qp_state(ep_ptr->qp_handle, 
-				   IBV_QPS_RTS, cm_ptr) != DAT_SUCCESS)
+				  IBV_QPS_RTS, cm_ptr) != DAT_SUCCESS) {
+		dapl_log(DAPL_DBG_TYPE_ERR, 
+			 " CONN_RTU: QPS_RTS ERR %s -> %s\n",
+			 strerror(errno), 
+			 inet_ntoa(((struct sockaddr_in *)
+			   ep_ptr->param.remote_ia_address_ptr)->sin_addr)); 
 		goto bail;
+	}
 		 
 	ep_ptr->qp_state = IB_QP_STATE_RTS;
 
@@ -426,7 +518,6 @@ dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)
 				     IB_CME_CONNECTED, 
 				     cm_ptr->p_data, 
 				     ep_ptr);	
-
 	return;
 bail:
 	/* close socket, free cm structure and post error event */
@@ -461,8 +552,9 @@ dapli_socket_listen(DAPL_IA		*ia_ptr,
 	
 	/* bind, listen, set sockopt, accept, exchange data */
 	if ((cm_ptr->socket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
-		dapl_dbg_log (DAPL_DBG_TYPE_ERR, 
-				"socket for listen returned %d\n", errno);
+		dapl_log(DAPL_DBG_TYPE_ERR, 
+			 " ERR: listen socket create: %s\n", 
+			 strerror(errno));
 		dat_status = DAT_INSUFFICIENT_RESOURCES;
 		goto bail;
 	}
@@ -474,9 +566,9 @@ dapli_socket_listen(DAPL_IA		*ia_ptr,
 
 	if ((bind(cm_ptr->socket,(struct sockaddr*)&addr, sizeof(addr)) < 0) ||
 	    (listen(cm_ptr->socket, 128) < 0)) {
-		dapl_dbg_log( DAPL_DBG_TYPE_CM,
-				" listen: ERROR %s on conn_qual 0x%x\n",
-				strerror(errno),serviceID); 
+		dapl_dbg_log(DAPL_DBG_TYPE_CM,
+			     " listen: ERROR %s on conn_qual 0x%x\n",
+			     strerror(errno),serviceID); 
 		if (errno == EADDRINUSE)
 			dat_status = DAT_CONN_QUAL_IN_USE;
 		else
@@ -503,25 +595,22 @@ bail:
 	return dat_status;
 }
 
-
 /*
- * PASSIVE: accept socket, receive peer QP information, private data, post cr_event 
+ * PASSIVE: accept socket 
  */
-DAT_RETURN 
+void 
 dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
 {
-	dp_ib_cm_handle_t acm_ptr;
-	void		*p_data = NULL;
+	dp_ib_cm_handle_t	acm_ptr;
 	int		len;
-	DAT_RETURN	dat_status = DAT_SUCCESS;
 		
 	dapl_dbg_log(DAPL_DBG_TYPE_EP," socket_accept\n"); 
 
 	/* Allocate accept CM and initialize */
 	if ((acm_ptr = dapl_os_alloc(sizeof(*acm_ptr))) == NULL) 
-		return DAT_INSUFFICIENT_RESOURCES;
+		goto bail;
 
-	(void) dapl_os_memzero( acm_ptr, sizeof( *acm_ptr ) );
+	(void) dapl_os_memzero(acm_ptr, sizeof(*acm_ptr));
 	
 	acm_ptr->socket = -1;
 	acm_ptr->sp = cm_ptr->sp;
@@ -531,25 +620,43 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
 	acm_ptr->socket = accept(cm_ptr->socket, 
 				(struct sockaddr*)&acm_ptr->dst.ia_address, 
 				(socklen_t*)&len);
-
 	if (acm_ptr->socket < 0) {
-		dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
+		dapl_log(DAPL_DBG_TYPE_ERR, 
 			" accept: ERR %s on FD %d l_cr %p\n",
 			strerror(errno),cm_ptr->socket,cm_ptr); 
-		dat_status = DAT_INTERNAL_ERROR;
 		goto bail;
    	}
 
+	dapl_dbg_log(DAPL_DBG_TYPE_EP,
+		     " socket accepted, queue new cm %p\n",acm_ptr); 
+
+	acm_ptr->state = SCM_ACCEPTING;
+	dapli_cm_queue(acm_ptr);
+	return;
+bail:
+	/* close socket, free cm structure, active will see socket close as reject */
+	if (acm_ptr)
+		dapli_cm_destroy(acm_ptr);
+}
+
+/*
+ * PASSIVE: receive peer QP information, private data, post cr_event 
+ */
+void 
+dapli_socket_accept_data(ib_cm_srvc_handle_t acm_ptr)
+{
+	int len;
+	void *p_data = NULL;
+
 	dapl_dbg_log(DAPL_DBG_TYPE_EP," socket accepted, read QP data\n"); 
 
 	/* read in DST QP info, IA address. check for private data */
 	len = read(acm_ptr->socket, &acm_ptr->dst, sizeof(ib_qp_cm_t));
 	if (len != sizeof(ib_qp_cm_t) || 
 	    ntohs(acm_ptr->dst.ver) != DSCM_VER) {
-		dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
+		dapl_log(DAPL_DBG_TYPE_ERR, 
 			     " accept read: ERR %s, rcnt=%d, ver=%d\n",
-			     strerror(errno), len, ntohs(acm_ptr->dst.ver)); 
-		dat_status = DAT_INTERNAL_ERROR;
+			     strerror(errno), len, acm_ptr->dst.ver); 
 		goto bail;
 	}
 
@@ -557,13 +664,14 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
 	acm_ptr->dst.port = ntohs(acm_ptr->dst.port);
 	acm_ptr->dst.lid = ntohs(acm_ptr->dst.lid);
 	acm_ptr->dst.qpn = ntohl(acm_ptr->dst.qpn);
+#ifdef DAT_EXTENSIONS
 	acm_ptr->dst.qp_type = ntohs(acm_ptr->dst.qp_type);
+#endif
 	acm_ptr->dst.p_size = ntohl(acm_ptr->dst.p_size);
 
 	dapl_dbg_log(DAPL_DBG_TYPE_EP, 
-		     " accept: DST %s port=0x%x lid=0x%x, qpn=0x%x, psz=%d\n",
-		     inet_ntoa(((struct sockaddr_in *)
-			&acm_ptr->dst.ia_address)->sin_addr),
+		     " accept: DST %s port=0x%x lid=0x%x, qpn=0x%x, psize=%d\n",
+		     inet_ntoa(((struct sockaddr_in *)&acm_ptr->dst.ia_address)->sin_addr),
 		     acm_ptr->dst.port, acm_ptr->dst.lid, 
 		     acm_ptr->dst.qpn, acm_ptr->dst.p_size); 
 
@@ -572,7 +680,6 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
 		dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
 			     " accept read: psize (%d) wrong\n",
 			     acm_ptr->dst.p_size); 
-		dat_status = DAT_INTERNAL_ERROR;
 		goto bail;
 	}
 
@@ -583,18 +690,17 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
 		len = read( acm_ptr->socket, 
 			    acm_ptr->p_data, acm_ptr->dst.p_size);
 		if (len != acm_ptr->dst.p_size) {
-			dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
+			dapl_log(DAPL_DBG_TYPE_ERR, 
 				     " accept read pdata: ERR %s, rcnt=%d\n",
 				     strerror(errno), len); 
-			dat_status = DAT_INTERNAL_ERROR;
 			goto bail;
 		}
 		dapl_dbg_log(DAPL_DBG_TYPE_EP," accept: psize=%d read\n",len);
 		p_data = acm_ptr->p_data;
 	}
 	
-	acm_ptr->state = SCM_ACCEPTING;
-	
+	acm_ptr->state = SCM_ACCEPTING_DATA;
+
 #ifdef DAT_EXTENSIONS
 	if (acm_ptr->dst.qp_type == IBV_QPT_UD) {
 		DAT_IB_EXTENSION_EVENT_DATA xevent;
@@ -617,10 +723,11 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
 			  IB_CME_CONNECTION_REQUEST_PENDING,
 		          p_data,
 			  acm_ptr->sp );
-	return DAT_SUCCESS;
+	return;
 bail:
+	/* close socket, free cm structure, active will see socket close as reject */
 	dapli_cm_destroy(acm_ptr);
-	return DAT_INTERNAL_ERROR;
+	return;
 }
 
 /*
@@ -635,7 +742,7 @@ dapli_socket_accept_usr(DAPL_EP		*ep_ptr,
 			DAT_PVOID	p_data)
 {
 	DAPL_IA		*ia_ptr = ep_ptr->header.owner_ia;
-	dp_ib_cm_handle_t cm_ptr = cr_ptr->ib_cm_handle;
+	dp_ib_cm_handle_t  cm_ptr = cr_ptr->ib_cm_handle;
 	ib_qp_cm_t	local;
 	struct iovec    iovec[2];
 	int		len;
@@ -648,7 +755,7 @@ dapli_socket_accept_usr(DAPL_EP		*ep_ptr,
 		return DAT_INTERNAL_ERROR;
 	
 	dapl_dbg_log(DAPL_DBG_TYPE_EP, 
-		     " accept_usr: remote port=0x%x lid=0x%x"
+		     " ACCEPT_USR: remote port=0x%x lid=0x%x"
 		     " qpn=0x%x qp_type %d, psize=%d\n",
 		     cm_ptr->dst.port, cm_ptr->dst.lid,
 		     cm_ptr->dst.qpn, cm_ptr->dst.qp_type,
@@ -658,25 +765,34 @@ dapli_socket_accept_usr(DAPL_EP		*ep_ptr,
 	if (cm_ptr->dst.qp_type == IBV_QPT_UD && 
 	    ep_ptr->qp_handle->qp_type != IBV_QPT_UD) {
 		    dapl_dbg_log(DAPL_DBG_TYPE_ERR,
-				 " accept_rtu: ERR remote QP is UD,"
+				 " ACCEPT_USR: ERR remote QP is UD,"
 				 ", but local QP is not\n"); 
 		    return (DAT_INVALID_HANDLE | DAT_INVALID_HANDLE_EP);
-
 	}
 #endif
 
 	/* modify QP to RTR and then to RTS with remote info already read */
 	if (dapls_modify_qp_state(ep_ptr->qp_handle, 
-				  IBV_QPS_RTR, cm_ptr) != DAT_SUCCESS)
+				  IBV_QPS_RTR, cm_ptr) != DAT_SUCCESS) {
+		dapl_log(DAPL_DBG_TYPE_ERR, 
+			 " ACCEPT_USR: QPS_RTR ERR %s -> %s\n",
+			 strerror(errno), 
+			 inet_ntoa(((struct sockaddr_in *)
+				&cm_ptr->dst.ia_address)->sin_addr)); 
 		goto bail;
-
+	}
 	if (dapls_modify_qp_state(ep_ptr->qp_handle, 
-				  IBV_QPS_RTS, cm_ptr) != DAT_SUCCESS)
+				  IBV_QPS_RTS, cm_ptr) != DAT_SUCCESS) {
+		dapl_log(DAPL_DBG_TYPE_ERR, 
+			 " ACCEPT_USR: QPS_RTS ERR %s -> %s\n",
+			 strerror(errno), 
+			 inet_ntoa(((struct sockaddr_in *)
+				&cm_ptr->dst.ia_address)->sin_addr)); 
 		goto bail;
-
+	}
 	ep_ptr->qp_state = IB_QP_STATE_RTS;
 	
-	/* save remote address information, for qp_query */
+	/* save remote address information */
 	dapl_os_memcpy( &ep_ptr->remote_ia_address, 
 			&cm_ptr->dst.ia_address, 
 			sizeof(ep_ptr->remote_ia_address));
@@ -689,15 +805,26 @@ dapli_socket_accept_usr(DAPL_EP		*ep_ptr,
 	local.port = htons(ia_ptr->hca_ptr->port_num);
 	local.lid = htons(dapli_get_lid(ia_ptr->hca_ptr->ib_hca_handle, 
 				        (uint8_t)ia_ptr->hca_ptr->port_num));
-	if (local.lid == 0xffff)
+	if (local.lid == 0xffff) {
+		dapl_log(DAPL_DBG_TYPE_ERR, 
+			 " ACCEPT_USR: query LID ERR %s -> %s\n",
+			 strerror(errno), 
+			 inet_ntoa(((struct sockaddr_in *)
+				&cm_ptr->dst.ia_address)->sin_addr)); 
 		goto bail;
+	}
 
         /* in network order */
 	if (ibv_query_gid(ia_ptr->hca_ptr->ib_hca_handle,
 			  (uint8_t)ia_ptr->hca_ptr->port_num,
-			  0,
-			  &local.gid))
+			  0, &local.gid)) {
+		dapl_log(DAPL_DBG_TYPE_ERR, 
+			 " ACCEPT_USR: query GID ERR %s -> %s\n",
+			 strerror(errno), 
+			 inet_ntoa(((struct sockaddr_in *)
+				&cm_ptr->dst.ia_address)->sin_addr)); 
 		goto bail;
+	}
 
 	local.ia_address = ia_ptr->hca_ptr->hca_address;
 	local.p_size = htonl(p_size);
@@ -709,19 +836,20 @@ dapli_socket_accept_usr(DAPL_EP		*ep_ptr,
 	}
 	len = writev(cm_ptr->socket, iovec, (p_size ? 2:1));
     	if (len != (p_size + sizeof(ib_qp_cm_t))) {
-		dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-			     " accept_rtu: ERR %s, wcnt=%d\n",
-			     strerror(errno), len); 
+		dapl_log(DAPL_DBG_TYPE_ERR, 
+			 " ACCEPT_USR: ERR %s, wcnt=%d -> %s\n",
+			 strerror(errno), len,
+			 inet_ntoa(((struct sockaddr_in *)
+			     &cm_ptr->dst.ia_address)->sin_addr)); 
 		goto bail;
 	}
 	dapl_dbg_log(DAPL_DBG_TYPE_CM, 
-		     " accept_usr: local port=0x%x lid=0x%x"
-		     " qpn=0x%x qp_type=%d psize=%d\n",
+		     " ACCEPT_USR: local port=0x%x lid=0x%x"
+		     " qpn=0x%x psize=%d\n",
 		     ntohs(local.port), ntohs(local.lid), 
-		     ntohl(local.qpn), ntohs(local.qp_type),
-		     ntohl(local.p_size)); 
+		     ntohl(local.qpn), ntohl(local.p_size)); 
         dapl_dbg_log(DAPL_DBG_TYPE_CM,
-                     " accept_usr SRC GID subnet %016llx id %016llx\n",
+                     " ACCEPT_USR SRC GID subnet %016llx id %016llx\n",
                      (unsigned long long) 
 			cpu_to_be64(local.gid.global.subnet_prefix),
                      (unsigned long long) 
@@ -733,10 +861,8 @@ dapli_socket_accept_usr(DAPL_EP		*ep_ptr,
 	cm_ptr->state = SCM_ACCEPTED;
 
 	dapl_dbg_log( DAPL_DBG_TYPE_EP," PASSIVE: accepted!\n" ); 
-	dapli_cm_queue(cm_ptr);
 	return DAT_SUCCESS;
 bail:
-	dapl_dbg_log(DAPL_DBG_TYPE_ERR," accept_rtu: ERR !QP_RTR_RTS \n"); 
 	dapli_cm_destroy(cm_ptr);
 	dapls_ib_reinit_ep(ep_ptr); /* reset QP state */
 	return DAT_INTERNAL_ERROR;
@@ -746,7 +872,7 @@ bail:
  * PASSIVE: read RTU from active peer, post CONN event
  */
 void 
-dapli_socket_accept_rtu(dp_ib_cm_handle_t cm_ptr)
+dapli_socket_accept_rtu(dp_ib_cm_handle_t	cm_ptr)
 {
 	int		len;
 	short		rtu_data = 0;
@@ -754,9 +880,11 @@ dapli_socket_accept_rtu(dp_ib_cm_handle_t cm_ptr)
 	/* complete handshake after final QP state change */
 	len = read(cm_ptr->socket, &rtu_data, sizeof(rtu_data));
 	if (len != sizeof(rtu_data) || ntohs(rtu_data) != 0x0e0f) {
-		dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-			     " accept_rtu: ERR %s, rcnt=%d rdata=%x\n",
-			     strerror(errno), len, ntohs(rtu_data)); 
+		dapl_log(DAPL_DBG_TYPE_ERR, 
+			 " ACCEPT_RTU: ERR %s, rcnt=%d rdata=%x\n",
+			 strerror(errno), len, ntohs(rtu_data),
+			 inet_ntoa(((struct sockaddr_in *)
+				&cm_ptr->dst.ia_address)->sin_addr)); 
 		goto bail;
 	}
 
@@ -860,7 +988,7 @@ dapls_ib_disconnect(
 	IN	DAPL_EP			*ep_ptr,
 	IN	DAT_CLOSE_FLAGS		close_flags)
 {
-	dp_ib_cm_handle_t cm_ptr = ep_ptr->cm_handle;
+	dp_ib_cm_handle_t	cm_ptr = ep_ptr->cm_handle;
 
 	dapl_dbg_log (DAPL_DBG_TYPE_EP,
 			"dapls_ib_disconnect(ep_handle %p ....)\n",
@@ -1045,7 +1173,7 @@ dapls_ib_reject_connection(
 	IN DAT_COUNT psize,
 	IN const DAT_PVOID pdata)
 {
-	struct iovec    	iovec;
+	struct iovec iovec[2];
 
 	dapl_dbg_log (DAPL_DBG_TYPE_EP,
 		      " reject(cm %p reason %x, pdata %p, psize %d)\n",
@@ -1055,9 +1183,15 @@ dapls_ib_reject_connection(
 	if (cm_ptr->socket >= 0) {
 		cm_ptr->dst.rej = (uint16_t)reason;
 		cm_ptr->dst.rej = htons(cm_ptr->dst.rej);
-		iovec.iov_base = &cm_ptr->dst;
-		iovec.iov_len  = sizeof(ib_qp_cm_t);
-		writev(cm_ptr->socket, &iovec, 1);
+		iovec[0].iov_base = &cm_ptr->dst;
+		iovec[0].iov_len  = sizeof(ib_qp_cm_t);
+		if (psize) {
+			iovec[1].iov_base = pdata;
+			iovec[2].iov_len = psize;
+			writev(cm_ptr->socket, &iovec[0], 2);
+		} else
+			writev(cm_ptr->socket, &iovec[0], 1);
+
 		close(cm_ptr->socket);
 		cm_ptr->socket = -1;
 	}
@@ -1090,7 +1224,7 @@ dapls_ib_cm_remote_addr (
 	OUT	DAT_SOCK_ADDR6	*remote_ia_address )
 {
 	DAPL_HEADER	*header;
-	dp_ib_cm_handle_t ib_cm_handle;
+	dp_ib_cm_handle_t	ib_cm_handle;
 
 	dapl_dbg_log (DAPL_DBG_TYPE_EP,
 		      "dapls_ib_cm_remote_addr(dat_handle %p, ....)\n",
@@ -1290,7 +1424,8 @@ void cr_thread(void *arg)
 {
     struct dapl_hca	*hca_ptr = arg;
     dp_ib_cm_handle_t	cr, next_cr;
-    int 		ret,idx;
+    int 		opt,ret,idx;
+    socklen_t		opt_len;
     char		rbuf[2];
     struct pollfd	ufds[SCM_MAX_CONN];
      
@@ -1330,14 +1465,19 @@ void cr_thread(void *arg)
 		
 	    /* Add to ufds for poll, check for immediate work */
 	    ufds[++idx].fd = cr->socket; /* add listen or cr */
-	    ufds[idx].events = POLLIN;
+	    if (cr->state == SCM_CONN_PENDING)
+	    	ufds[idx].events = POLLOUT;
+	    else
+		ufds[idx].events = POLLIN;
 
 	    /* check socket for event, accept in or connect out */
 	    dapl_dbg_log(DAPL_DBG_TYPE_CM," poll cr=%p, fd=%d,%d\n", 
 				cr, cr->socket, ufds[idx].fd);
 	    dapl_os_unlock(&hca_ptr->ib_trans.lock);
 	    ret = poll(&ufds[idx],1,1);
-	    dapl_dbg_log(DAPL_DBG_TYPE_CM," poll wakeup ret=%d cr->st=%d ev=%d fd=%d\n",
+	    dapl_dbg_log(DAPL_DBG_TYPE_CM,
+			 " poll wakeup ret=%d cr->st=%d"
+			 " ev=0x%x fd=%d\n",
 			 ret,cr->state,ufds[idx].revents,ufds[idx].fd);
 
 	    /* data on listen, qp exchange, and on disconnect request */
@@ -1345,13 +1485,28 @@ void cr_thread(void *arg)
 		if (cr->socket > 0) {
 			if (cr->state == SCM_LISTEN)
 				dapli_socket_accept(cr);
+			else if (cr->state == SCM_ACCEPTING)
+				dapli_socket_accept_data(cr);
 			else if (cr->state == SCM_ACCEPTED)
 				dapli_socket_accept_rtu(cr);
-			else if (cr->state == SCM_CONN_PENDING)
+			else if (cr->state == SCM_RTU_PENDING)
 				dapli_socket_connect_rtu(cr);
 			else if (cr->state == SCM_CONNECTED)
 				dapli_socket_disconnect(cr);
 		}
+	    /* connect socket is writable, check status */
+	    } else if ((ret == 1) && 
+			(ufds[idx].revents & POLLOUT ||
+			 ufds[idx].revents & POLLERR)) {
+		if (cr->state == SCM_CONN_PENDING) {
+			opt = 0;
+			ret = getsockopt(cr->socket, SOL_SOCKET, 
+					 SO_ERROR, &opt, &opt_len);
+			if (!ret)
+				dapli_socket_connected(cr,opt);
+			else
+				dapli_socket_connected(cr,EFAULT);
+		}
 	    } else if (ret != 0) {
     		dapl_dbg_log(DAPL_DBG_TYPE_CM,
 			     " cr_thread(cr=%p) st=%d poll ERR= %s\n",
diff --git a/dapl/openib_scm/dapl_ib_extensions.c b/dapl/openib_scm/dapl_ib_extensions.c
index b88e853..692c8a8 100755
--- a/dapl/openib_scm/dapl_ib_extensions.c
+++ b/dapl/openib_scm/dapl_ib_extensions.c
@@ -82,7 +82,7 @@ dapl_extensions(IN DAT_HANDLE		dat_handle,
 		IN va_list		args)
 {
 	DAT_EP_HANDLE		ep;
-	DAT_IB_ADDR_HANDLE	*ah;
+	DAT_IB_ADDR_HANDLE	*ah = NULL;
 	DAT_LMR_TRIPLET		*lmr_p;
 	DAT_DTO_COOKIE		cookie;
 	const DAT_RMR_TRIPLET	*rmr_p;
diff --git a/dapl/openib_scm/dapl_ib_util.c b/dapl/openib_scm/dapl_ib_util.c
index f3874ca..0f24737 100644
--- a/dapl/openib_scm/dapl_ib_util.c
+++ b/dapl/openib_scm/dapl_ib_util.c
@@ -444,8 +444,6 @@ DAT_RETURN dapls_ib_query_hca (
 		ia_attr->transport_attr           = NULL;
 		ia_attr->num_vendor_attr          = 0;
 		ia_attr->vendor_attr              = NULL;
-                ia_attr->max_iov_segments_per_rdma_read = dev_attr.max_sge;
-                ia_attr->max_iov_segments_per_rdma_write = dev_attr.max_sge;
 #ifdef DAT_EXTENSIONS
                 ia_attr->extension_supported = DAT_EXTENSION_IB;
                 ia_attr->extension_version = DAT_IB_EXTENSION_VERSION;
diff --git a/dapl/openib_scm/dapl_ib_util.h b/dapl/openib_scm/dapl_ib_util.h
index bd3ea83..deb6be3 100644
--- a/dapl/openib_scm/dapl_ib_util.h
+++ b/dapl/openib_scm/dapl_ib_util.h
@@ -107,7 +107,9 @@ typedef enum scm_state
 	SCM_INIT,
 	SCM_LISTEN,
 	SCM_CONN_PENDING,
+	SCM_RTU_PENDING,
 	SCM_ACCEPTING,
+	SCM_ACCEPTING_DATA,
 	SCM_ACCEPTED,
 	SCM_REJECTED,
 	SCM_CONNECTED,
-- 
1.5.2.5





More information about the general mailing list