[ofa-general] [PATCH] uDAPL (v2.0) scm: multi-hca CM processing broken. Need cr thread wakeup mechanism per HCA.

Davis, Arlin R arlin.r.davis at intel.com
Mon May 18 12:08:24 PDT 2009


Currently there is only one pipe shared across all
device opens. This results in some posted CR work
being delayed or not processed at all. Provide a
pipe for each device open, and create and manage
the cr thread on a per-device level.

Signed-off-by: Arlin Davis <arlin.r.davis at intel.com>
---
 dapl/openib_scm/dapl_ib_cm.c   |   23 ++++++------
 dapl/openib_scm/dapl_ib_util.c |   74 +++++++++++++++++++++++++---------------
 dapl/openib_scm/dapl_ib_util.h |    1 +
 3 files changed, 58 insertions(+), 40 deletions(-)

diff --git a/dapl/openib_scm/dapl_ib_cm.c b/dapl/openib_scm/dapl_ib_cm.c
index a2b02eb..9cad5be 100644
--- a/dapl/openib_scm/dapl_ib_cm.c
+++ b/dapl/openib_scm/dapl_ib_cm.c
@@ -54,8 +54,6 @@
 #include "dapl_ib_util.h"
 #include "dapl_osd.h"
 
-extern DAPL_SOCKET g_scm[2];
-
 #if defined(_WIN32) || defined(_WIN64)
 enum DAPL_FD_EVENTS {
 	DAPL_FD_READ = 0x1,
@@ -282,7 +280,7 @@ static void dapli_cm_destroy(struct ib_cm_handle *cm_ptr)
 	dapl_os_unlock(&cm_ptr->lock);
 
 	/* wakeup work thread */
-	if (send(g_scm[1], "w", sizeof "w", 0) == -1)
+	if (send(cm_ptr->hca->ib_trans.scm[1], "w", sizeof "w", 0) == -1)
 		dapl_log(DAPL_DBG_TYPE_CM,
 			 " cm_destroy: thread wakeup error = %s\n",
 			 strerror(errno));
@@ -299,7 +297,7 @@ static void dapli_cm_queue(struct ib_cm_handle *cm_ptr)
 	dapl_os_unlock(&cm_ptr->hca->ib_trans.lock);
 
 	/* wakeup CM work thread */
-	if (send(g_scm[1], "w", sizeof "w", 0) == -1)
+	if (send(cm_ptr->hca->ib_trans.scm[1], "w", sizeof "w", 0) == -1)
 		dapl_log(DAPL_DBG_TYPE_CM,
 			 " cm_queue: thread wakeup error = %s\n",
 			 strerror(errno));
@@ -1210,7 +1208,8 @@ dapls_ib_remove_conn_listener(IN DAPL_IA * ia_ptr, IN DAPL_SP * sp_ptr)
 		/* cr_thread will free */
 		cm_ptr->state = SCM_DESTROY;
 		sp_ptr->cm_srvc_handle = NULL;
-		if (send(g_scm[1], "w", sizeof "w", 0) == -1)
+		if (send(cm_ptr->hca->ib_trans.scm[1], 
+			 "w", sizeof "w", 0) == -1)
 			dapl_log(DAPL_DBG_TYPE_CM,
 				 " cm_destroy: thread wakeup error = %s\n",
 				 strerror(errno));
@@ -1312,7 +1311,7 @@ dapls_ib_reject_connection(IN dp_ib_cm_handle_t cm_ptr,
 
 	/* cr_thread will destroy CR */
 	cm_ptr->state = SCM_REJECTED;
-	if (send(g_scm[1], "w", sizeof "w", 0) == -1)
+	if (send(cm_ptr->hca->ib_trans.scm[1], "w", sizeof "w", 0) == -1)
 		dapl_log(DAPL_DBG_TYPE_CM,
 			 " cm_destroy: thread wakeup error = %s\n",
 			 strerror(errno));
@@ -1552,7 +1551,7 @@ void cr_thread(void *arg)
 
 	while (hca_ptr->ib_trans.cr_state == IB_THREAD_RUN) {
 		dapl_fd_zero(set);
-		dapl_fd_set(g_scm[0], set, DAPL_FD_READ);
+		dapl_fd_set(hca_ptr->ib_trans.scm[0], set, DAPL_FD_READ);
 
 		if (!dapl_llist_is_empty(&hca_ptr->ib_trans.list))
 			next_cr = dapl_llist_peek_head(&hca_ptr->ib_trans.list);
@@ -1652,9 +1651,8 @@ void cr_thread(void *arg)
 						    &cr->dst.ia_address)->
 						   sin_addr));
 
-				/* POLLUP, NVAL, or poll error, issue event if connected */
-				if (cr->state == SCM_CONNECTED)
-					dapli_socket_disconnect(cr);
+				/* POLLHUP, NVAL, or poll error. - DISC */
+				dapli_socket_disconnect(cr);
 			}
 
 			dapl_os_lock(&hca_ptr->ib_trans.lock);
@@ -1664,8 +1662,9 @@ void cr_thread(void *arg)
 		dapl_select(set);
 
 		/* if pipe used to wakeup, consume */
-		while (dapl_poll(g_scm[0], DAPL_FD_READ) == DAPL_FD_READ) {
-			if (recv(g_scm[0], rbuf, 2, 0) == -1)
+		while (dapl_poll(hca_ptr->ib_trans.scm[0], 
+				 DAPL_FD_READ) == DAPL_FD_READ) {
+			if (recv(hca_ptr->ib_trans.scm[0], rbuf, 2, 0) == -1)
 				dapl_log(DAPL_DBG_TYPE_CM,
 					 " cr_thread: read pipe error = %s\n",
 					 strerror(errno));
diff --git a/dapl/openib_scm/dapl_ib_util.c b/dapl/openib_scm/dapl_ib_util.c
index c95b0c2..30c71fa 100644
--- a/dapl/openib_scm/dapl_ib_util.c
+++ b/dapl/openib_scm/dapl_ib_util.c
@@ -58,7 +58,6 @@ static const char rcsid[] = "$Id:  $";
 #include <stdlib.h>
 
 int g_dapl_loopback_connection = 0;
-DAPL_SOCKET g_scm[2];
 
 enum ibv_mtu dapl_ib_mtu(int mtu)
 {
@@ -138,22 +137,7 @@ static DAT_RETURN getlocalipaddr(DAT_SOCK_ADDR * addr, int addr_len)
 	return ret;
 }
 
-/*
- * dapls_ib_init, dapls_ib_release
- *
- * Initialize Verb related items for device open
- *
- * Input:
- * 	none
- *
- * Output:
- *	none
- *
- * Returns:
- * 	0 success, -1 error
- *
- */
-int32_t dapls_ib_init(void)
+static int32_t create_cr_pipe(IN DAPL_HCA * hca_ptr)
 {
 	DAPL_SOCKET listen_socket;
 	struct sockaddr_in addr;
@@ -179,32 +163,58 @@ int32_t dapls_ib_init(void)
 	if (ret)
 		goto err1;
 
-	g_scm[1] = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
-	if (g_scm[1] == DAPL_INVALID_SOCKET)
+	hca_ptr->ib_trans.scm[1] = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+	if (hca_ptr->ib_trans.scm[1] == DAPL_INVALID_SOCKET)
 		goto err1;
 
-	ret = connect(g_scm[1], (struct sockaddr *)&addr, sizeof(addr));
+	ret = connect(hca_ptr->ib_trans.scm[1], 
+		      (struct sockaddr *)&addr, sizeof(addr));
 	if (ret)
 		goto err2;
 
-	g_scm[0] = accept(listen_socket, NULL, NULL);
-	if (g_scm[0] == DAPL_INVALID_SOCKET)
+	hca_ptr->ib_trans.scm[0] = accept(listen_socket, NULL, NULL);
+	if (hca_ptr->ib_trans.scm[0] == DAPL_INVALID_SOCKET)
 		goto err2;
 
 	closesocket(listen_socket);
 	return 0;
 
       err2:
-	closesocket(g_scm[1]);
+	closesocket(hca_ptr->ib_trans.scm[1]);
       err1:
 	closesocket(listen_socket);
 	return 1;
 }
 
+static void destroy_cr_pipe(IN DAPL_HCA * hca_ptr)
+{
+	closesocket(hca_ptr->ib_trans.scm[0]);
+	closesocket(hca_ptr->ib_trans.scm[1]);
+}
+
+
+/*
+ * dapls_ib_init, dapls_ib_release
+ *
+ * Initialize Verb related items for device open
+ *
+ * Input:
+ * 	none
+ *
+ * Output:
+ *	none
+ *
+ * Returns:
+ * 	0 success, -1 error
+ *
+ */
+int32_t dapls_ib_init(void)
+{
+	return 0;
+}
+
 int32_t dapls_ib_release(void)
 {
-	closesocket(g_scm[0]);
-	closesocket(g_scm[1]);
 	return 0;
 }
 
@@ -382,6 +392,14 @@ DAT_RETURN dapls_ib_open_hca(IN IB_HCA_NAME hca_name, IN DAPL_HCA * hca_ptr)
 	/* initialize CM list for listens on this HCA */
 	dapl_llist_init_head(&hca_ptr->ib_trans.list);
 
+	/* initialize pipe, user level wakeup on select */
+	if (create_cr_pipe(hca_ptr)) {
+		dapl_log(DAPL_DBG_TYPE_ERR,
+			 " open_hca: failed to init cr pipe - %s\n",
+			 strerror(errno));
+		goto bail;
+	}
+
 	/* create thread to process inbound connect request */
 	hca_ptr->ib_trans.cr_state = IB_THREAD_INIT;
 	dat_status = dapl_os_thread_create(cr_thread,
@@ -455,21 +473,21 @@ DAT_RETURN dapls_ib_close_hca(IN DAPL_HCA * hca_ptr)
 
 	/* destroy cr_thread and lock */
 	hca_ptr->ib_trans.cr_state = IB_THREAD_CANCEL;
-	if (send(g_scm[1], "w", sizeof "w", 0) == -1)
+	if (send(hca_ptr->ib_trans.scm[1], "w", sizeof "w", 0) == -1)
 		dapl_log(DAPL_DBG_TYPE_UTIL,
 			 " thread_destroy: thread wakeup err = %s\n",
 			 strerror(errno));
 	while (hca_ptr->ib_trans.cr_state != IB_THREAD_EXIT) {
 		dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
 			     " close_hca: waiting for cr_thread\n");
-		if (send(g_scm[1], "w", sizeof "w", 0) == -1)
+		if (send(hca_ptr->ib_trans.scm[1], "w", sizeof "w", 0) == -1)
 			dapl_log(DAPL_DBG_TYPE_UTIL,
 				 " thread_destroy: thread wakeup err = %s\n",
 				 strerror(errno));
 		dapl_os_sleep_usec(2000);
 	}
 	dapl_os_lock_destroy(&hca_ptr->ib_trans.lock);
-
+	destroy_cr_pipe(hca_ptr); /* no longer need pipe */
 	return (DAT_SUCCESS);
 }
 
diff --git a/dapl/openib_scm/dapl_ib_util.h b/dapl/openib_scm/dapl_ib_util.h
index 5493312..e924572 100644
--- a/dapl/openib_scm/dapl_ib_util.h
+++ b/dapl/openib_scm/dapl_ib_util.h
@@ -304,6 +304,7 @@ typedef struct _ib_hca_transport
 	uint8_t			tclass;
 	uint8_t			mtu;
 	DAT_NAMED_ATTR		named_attr;
+	DAPL_SOCKET		scm[2];
 } ib_hca_transport_t;
 
 /* provider specfic fields for shared memory support */
-- 
1.5.2.5




More information about the general mailing list