[ofw] [PATCH 1/3] uDAPL v2: scm: improve serialization of destroy and state changes

Arlin Davis arlin.r.davis at intel.com
Mon Sep 28 15:08:10 PDT 2009


WinOF testing with a slightly different scheduler and verbs
showed some issues with cleanup. Add better protection around
destroy, and move state changes before the socket send to ensure
the correct state in a multi-threaded environment targeting the
same device on send and recv.

Change DCM_RTU_PENDING to DCM_REP_PENDING and
add static qualifiers to local routines for better
readability.

Signed-off-by: Arlin Davis <arlin.r.davis at intel.com>
---
 dapl/openib_common/dapl_ib_common.h |    4 +-
 dapl/openib_scm/cm.c                |  125 +++++++++++++++--------------------
 2 files changed, 56 insertions(+), 73 deletions(-)

diff --git a/dapl/openib_common/dapl_ib_common.h b/dapl/openib_common/dapl_ib_common.h
index 3cd8885..671073b 100644
--- a/dapl/openib_common/dapl_ib_common.h
+++ b/dapl/openib_common/dapl_ib_common.h
@@ -265,7 +265,7 @@ typedef enum dapl_cm_state
 	DCM_INIT,
 	DCM_LISTEN,
 	DCM_CONN_PENDING,
-	DCM_RTU_PENDING,
+	DCM_REP_PENDING,
 	DCM_ACCEPTING,
 	DCM_ACCEPTING_DATA,
 	DCM_ACCEPTED,
@@ -356,7 +356,7 @@ STATIC _INLINE_ char * dapl_cm_state_str(IN int st)
 		"CM_INIT",
 		"CM_LISTEN",
 		"CM_CONN_PENDING",
-		"CM_RTU_PENDING",
+		"CM_REP_PENDING",
 		"CM_ACCEPTING",
 		"CM_ACCEPTING_DATA",
 		"CM_ACCEPTED",
diff --git a/dapl/openib_scm/cm.c b/dapl/openib_scm/cm.c
index 2403918..87f5446 100644
--- a/dapl/openib_scm/cm.c
+++ b/dapl/openib_scm/cm.c
@@ -46,6 +46,11 @@
  *
  **************************************************************************/
 
+#if defined(_WIN32)
+#define FD_SETSIZE 1024
+#define DAPL_FD_SETSIZE FD_SETSIZE
+#endif
+
 #include "dapl.h"
 #include "dapl_adapter_util.h"
 #include "dapl_evd_util.h"
@@ -314,12 +319,6 @@ void dapls_ib_cm_free(dp_ib_cm_handle_t cm_ptr, DAPL_EP *ep)
 		cm_ptr->ep = NULL;
 	}
 
-	/* close socket if still active */
-	if (cm_ptr->socket != DAPL_INVALID_SOCKET) {
-		shutdown(cm_ptr->socket, SHUT_RDWR);
-		closesocket(cm_ptr->socket);
-		cm_ptr->socket = DAPL_INVALID_SOCKET;
-	}
 	dapl_os_unlock(&cm_ptr->lock);
 	goto notify_thread;
 
@@ -404,25 +403,15 @@ DAT_RETURN dapli_socket_disconnect(dp_ib_cm_handle_t cm_ptr)
 		return DAT_SUCCESS;
 
 	dapl_os_lock(&cm_ptr->lock);
-	if ((cm_ptr->state == DCM_INIT) ||
-	    (cm_ptr->state == DCM_DISCONNECTED) ||
-	    (cm_ptr->state == DCM_DESTROY)) {
+	if (cm_ptr->state != DCM_CONNECTED) {
 		dapl_os_unlock(&cm_ptr->lock);
 		return DAT_SUCCESS;
-	} else {
-		/* send disc date, close socket, schedule destroy */
-		if (cm_ptr->socket != DAPL_INVALID_SOCKET) {
-			if (send(cm_ptr->socket, (char *)&disc_data,
-				 sizeof(disc_data), 0) == -1)
-				dapl_log(DAPL_DBG_TYPE_WARN,
-					 " cm_disc: write error = %s\n",
-					 strerror(errno));
-			shutdown(cm_ptr->socket, SHUT_RDWR);
-			closesocket(cm_ptr->socket);
-			cm_ptr->socket = DAPL_INVALID_SOCKET;
-		}
-		cm_ptr->state = DCM_DISCONNECTED;
 	}
+
+	/* send disc date, close socket, schedule destroy */
+	dapls_modify_qp_state(ep_ptr->qp_handle, IBV_QPS_ERR, 0,0,0);
+	cm_ptr->state = DCM_DISCONNECTED;
+	send(cm_ptr->socket, (char *)&disc_data, sizeof(disc_data), 0);
 	dapl_os_unlock(&cm_ptr->lock);
 
 	/* disconnect events for RC's only */
@@ -472,6 +461,8 @@ static void dapli_socket_connected(dp_ib_cm_handle_t cm_ptr, int err)
 		dapl_log(DAPL_DBG_TYPE_WARN,
 			 " CONN_PENDING: NODELAY setsockopt: %s\n",
 			 strerror(errno));
+		
+	cm_ptr->state = DCM_REP_PENDING;
 
 	/* send qp info and pdata to remote peer */
 	exp = sizeof(ib_cm_msg_t) - DCM_MAX_PDATA_SIZE;
@@ -509,9 +500,6 @@ static void dapli_socket_connected(dp_ib_cm_handle_t cm_ptr, int err)
 		     htonll(cm_ptr->msg.saddr.ib.gid.global.subnet_prefix),
 		     (unsigned long long)
 		     htonll(cm_ptr->msg.saddr.ib.gid.global.interface_id));
-
-	/* queue up to work thread to avoid blocking consumer */
-	cm_ptr->state = DCM_RTU_PENDING;
 	return;
 
 bail:
@@ -745,14 +733,14 @@ static void dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)
 	dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect_rtu: send RTU\n");
 
 	/* complete handshake after final QP state change, Just ver+op */
+	cm_ptr->state = DCM_CONNECTED;
 	cm_ptr->msg.op = ntohs(DCM_RTU);
 	if (send(cm_ptr->socket, (char *)&cm_ptr->msg, 4, 0) == -1) {
 		dapl_log(DAPL_DBG_TYPE_ERR,
 			 " CONN_RTU: write error = %s\n", strerror(errno));
 		goto bail;
 	}
-	/* init cm_handle and post the event with private data */
-	cm_ptr->state = DCM_CONNECTED;
+	/* post the event with private data */
 	event = IB_CME_CONNECTED;
 	dapl_dbg_log(DAPL_DBG_TYPE_EP, " ACTIVE: connected!\n");
 
@@ -807,9 +795,7 @@ ud_bail:
 #endif
 	{
 		ep_ptr->cm_handle = cm_ptr; /* only RC, multi CR's on UD */
-		dapl_evd_connection_callback(cm_ptr,
-					     event,
-					     cm_ptr->msg.p_data, ep_ptr);
+		dapl_evd_connection_callback(cm_ptr, event, cm_ptr->msg.p_data, ep_ptr);
 	}
 	return;
 
@@ -883,7 +869,7 @@ dapli_socket_listen(DAPL_IA * ia_ptr, DAT_CONN_QUAL serviceID, DAPL_SP * sp_ptr)
 		     ntohs(serviceID), cm_ptr, cm_ptr->socket);
 
 	return dat_status;
-      bail:
+bail:
 	dapl_dbg_log(DAPL_DBG_TYPE_CM,
 		     " listen: ERROR on conn_qual 0x%x\n", serviceID);
 	dapls_ib_cm_free(cm_ptr, cm_ptr->ep);
@@ -1026,7 +1012,7 @@ bail:
  * queue on work thread to receive RTU information to avoid blocking
  * user thread. 
  */
-DAT_RETURN
+static DAT_RETURN
 dapli_socket_accept_usr(DAPL_EP * ep_ptr,
 			DAPL_CR * cr_ptr, DAT_COUNT p_size, DAT_PVOID p_data)
 {
@@ -1108,10 +1094,14 @@ dapli_socket_accept_usr(DAPL_EP * ep_ptr,
 	local.daddr.so = ia_ptr->hca_ptr->hca_address;
 	((struct sockaddr_in *)&local.daddr.so)->sin_port = 
 				htons((uint16_t)cm_ptr->sp->conn_qual);
+	cm_ptr->ep = ep_ptr;
+	cm_ptr->hca = ia_ptr->hca_ptr;
+	cm_ptr->state = DCM_ACCEPTED;
 
 	local.p_size = htons(p_size);
 	iov[0].iov_base = (void *)&local;
 	iov[0].iov_len = exp;
+	
 	if (p_size) {
 		iov[1].iov_base = p_data;
 		iov[1].iov_len = p_size;
@@ -1139,14 +1129,9 @@ dapli_socket_accept_usr(DAPL_EP * ep_ptr,
 		     (unsigned long long)
 		     htonll(local.saddr.ib.gid.global.interface_id));
 
-	/* save state and reference to EP, queue for RTU data */
-	cm_ptr->ep = ep_ptr;
-	cm_ptr->hca = ia_ptr->hca_ptr;
-	cm_ptr->state = DCM_ACCEPTED;
-
 	dapl_dbg_log(DAPL_DBG_TYPE_EP, " PASSIVE: accepted!\n");
 	return DAT_SUCCESS;
-      bail:
+bail:
 	dapls_ib_cm_free(cm_ptr, cm_ptr->ep);
 	dapls_modify_qp_state(ep_ptr->qp_handle, IBV_QPS_ERR, 0, 0, 0);
 	return DAT_INTERNAL_ERROR;
@@ -1155,7 +1140,7 @@ dapli_socket_accept_usr(DAPL_EP * ep_ptr,
 /*
  * PASSIVE: read RTU from active peer, post CONN event
  */
-void dapli_socket_accept_rtu(dp_ib_cm_handle_t cm_ptr)
+static void dapli_socket_accept_rtu(dp_ib_cm_handle_t cm_ptr)
 {
 	int len;
 	ib_cm_events_t event = IB_CME_CONNECTED;
@@ -1221,8 +1206,9 @@ void dapli_socket_accept_rtu(dp_ib_cm_handle_t cm_ptr)
                 closesocket(cm_ptr->socket);
                 cm_ptr->socket = DAPL_INVALID_SOCKET;
 		cm_ptr->state = DCM_RELEASED;
-	} else {
+	} else 
 #endif
+	{
 		cm_ptr->ep->cm_handle = cm_ptr; /* only RC, multi CR's on UD */
 		dapls_cr_callback(cm_ptr, event, NULL, cm_ptr->sp);
 	}
@@ -1399,19 +1385,12 @@ dapls_ib_remove_conn_listener(IN DAPL_IA * ia_ptr, IN DAPL_SP * sp_ptr)
 
 	/* close accepted socket, free cm_srvc_handle and return */
 	if (cm_ptr != NULL) {
-		if (cm_ptr->socket != DAPL_INVALID_SOCKET) {
-			shutdown(cm_ptr->socket, SHUT_RDWR);
-			closesocket(cm_ptr->socket);
-			cm_ptr->socket = DAPL_INVALID_SOCKET;
-		}
 		/* cr_thread will free */
+		dapl_os_lock(&cm_ptr->lock);
 		cm_ptr->state = DCM_DESTROY;
 		sp_ptr->cm_srvc_handle = NULL;
-		if (send(cm_ptr->hca->ib_trans.scm[1], 
-			 "w", sizeof "w", 0) == -1)
-			dapl_log(DAPL_DBG_TYPE_CM,
-				 " cm_destroy: thread wakeup error = %s\n",
-				 strerror(errno));
+		send(cm_ptr->hca->ib_trans.scm[1], "w", sizeof "w", 0);
+		dapl_os_unlock(&cm_ptr->lock);
 	}
 	return DAT_SUCCESS;
 }
@@ -1492,29 +1471,26 @@ dapls_ib_reject_connection(IN dp_ib_cm_handle_t cm_ptr,
         if (psize > DCM_MAX_PDATA_SIZE)
                 return DAT_LENGTH_ERROR;
 
-	/* write reject data to indicate reject */
-	if (cm_ptr->socket != DAPL_INVALID_SOCKET) {
-		cm_ptr->msg.op = htons(DCM_REJ_USER);
-		cm_ptr->msg.p_size = htons(psize);
-		
-		iov[0].iov_base = (void *)&cm_ptr->msg;
-		iov[0].iov_len = sizeof(ib_cm_msg_t) - DCM_MAX_PDATA_SIZE;
-		if (psize) {
-			iov[1].iov_base = pdata;
-			iov[1].iov_len = psize;
-			writev(cm_ptr->socket, iov, 2);
-		} else {
-			writev(cm_ptr->socket, iov, 1);
-		}
+	dapl_os_lock(&cm_ptr->lock);
 
-		shutdown(cm_ptr->socket, SHUT_RDWR);
-		closesocket(cm_ptr->socket);
-		cm_ptr->socket = DAPL_INVALID_SOCKET;
+	/* write reject data to indicate reject */
+	cm_ptr->msg.op = htons(DCM_REJ_USER);
+	cm_ptr->msg.p_size = htons(psize);
+	
+	iov[0].iov_base = (void *)&cm_ptr->msg;
+	iov[0].iov_len = sizeof(ib_cm_msg_t) - DCM_MAX_PDATA_SIZE;
+	if (psize) {
+		iov[1].iov_base = pdata;
+		iov[1].iov_len = psize;
+		writev(cm_ptr->socket, iov, 2);
+	} else {
+		writev(cm_ptr->socket, iov, 1);
 	}
 
 	/* cr_thread will destroy CR */
 	cm_ptr->state = DCM_DESTROY;
 	send(cm_ptr->hca->ib_trans.scm[1], "w", sizeof "w", 0);
+	dapl_os_unlock(&cm_ptr->lock);
 	return DAT_SUCCESS;
 }
 
@@ -1734,19 +1710,25 @@ void cr_thread(void *arg)
 			next_cr = dapl_llist_next_entry(&hca_ptr->ib_trans.list,
 							(DAPL_LLIST_ENTRY *) &
 							cr->entry);
+			dapl_os_lock(&cr->lock);
 			if (cr->state == DCM_DESTROY
 			    || hca_ptr->ib_trans.cr_state != IB_THREAD_RUN) {
+				dapl_os_unlock(&cr->lock);
 				dapl_llist_remove_entry(&hca_ptr->ib_trans.list,
 							(DAPL_LLIST_ENTRY *) &
 							cr->entry);
 				dapl_dbg_log(DAPL_DBG_TYPE_CM, 
 					     " CR FREE: %p ep=%p st=%d sock=%d\n", 
 					     cr, cr->ep, cr->state, cr->socket);
+				shutdown(cr->socket, SHUT_RDWR);
+				closesocket(cr->socket);
 				dapl_os_free(cr, sizeof(*cr));
 				continue;
 			}
-			if (cr->socket == DAPL_INVALID_SOCKET) 
+			if (cr->socket == DAPL_INVALID_SOCKET) {
+				dapl_os_unlock(&cr->lock);
 				continue;
+			}
 
 			event = (cr->state == DCM_CONN_PENDING) ?
 						DAPL_FD_WRITE : DAPL_FD_READ;
@@ -1757,10 +1739,11 @@ void cr_thread(void *arg)
 					 " -> %s\n", cr->state, cr->socket,
 					 inet_ntoa(((struct sockaddr_in *)
 						&cr->msg.daddr.so)->sin_addr));
+				dapl_os_unlock(&cr->lock);
 				dapls_ib_cm_free(cr, cr->ep);
 				continue;
 			}
-
+			dapl_os_unlock(&cr->lock);
 			dapl_dbg_log(DAPL_DBG_TYPE_CM,
 				     " poll cr=%p, sck=%d\n", cr, cr->socket);
 			dapl_os_unlock(&hca_ptr->ib_trans.lock);
@@ -1784,7 +1767,7 @@ void cr_thread(void *arg)
 					case DCM_ACCEPTED:
 						dapli_socket_accept_rtu(cr);
 						break;
-					case DCM_RTU_PENDING:
+					case DCM_REP_PENDING:
 						dapli_socket_connect_rtu(cr);
 						break;
 					case DCM_CONNECTED:
@@ -1846,7 +1829,7 @@ void cr_thread(void *arg)
 
 	dapl_os_unlock(&hca_ptr->ib_trans.lock);
 	dapl_os_free(set, sizeof(struct dapl_fd_set));
-      out:
+out:
 	hca_ptr->ib_trans.cr_state = IB_THREAD_EXIT;
 	dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " cr_thread(hca %p) exit\n", hca_ptr);
 }
-- 
1.5.2.5





More information about the ofw mailing list