[ofa-general] [PATCH 5/5] [DAPL] dapl/ibal-scm: update ibal-scm provider

Sean Hefty sean.hefty at intel.com
Fri Jan 30 10:59:17 PST 2009


From: Stan Smith <stan.smith at intel.com>

Update the dapl.git tree with the latest SVN version of the
ibal-scm provider.

Signed-off-by: Sean Hefty <sean.hefty at intel.com>
---
Actual coding changes were made by Stan.  I'm just submitting the patch
to update the DAPL git repository.

 dapl/ibal-scm/dapl_ibal-scm_cm.c   |  775 +++++++++++++++++++++++++-----------
 dapl/ibal-scm/dapl_ibal-scm_util.c |   44 ++
 2 files changed, 584 insertions(+), 235 deletions(-)

diff --git a/dapl/ibal-scm/dapl_ibal-scm_cm.c b/dapl/ibal-scm/dapl_ibal-scm_cm.c
index df83008..6a050b8 100644
--- a/dapl/ibal-scm/dapl_ibal-scm_cm.c
+++ b/dapl/ibal-scm/dapl_ibal-scm_cm.c
@@ -63,6 +63,88 @@
 #include <ws2tcpip.h>
 #include <io.h>
 
+extern int g_scm_pipe[2];
+
+extern DAT_RETURN
+dapls_ib_query_gid( IN  DAPL_HCA	*hca_ptr,
+		    IN  GID		*gid );
+
+
+/* Allocate a zeroed CM handle: lock initialized, dst.ver = htons(DSCM_VER),
+ * both sockets set to -1. Returns NULL if allocation or lock init fails. */
+static struct ib_cm_handle * dapli_cm_create(void)
+{
+	struct ib_cm_handle *cm_ptr;
+
+	if ((cm_ptr = dapl_os_alloc(sizeof(*cm_ptr))) == NULL)
+		return NULL;
+	/* zero first: zeroing after lock init would wipe the initialized lock */
+	(void)dapl_os_memzero(cm_ptr, sizeof(*cm_ptr));
+	if (dapl_os_lock_init(&cm_ptr->lock))
+		goto bail;
+
+	cm_ptr->dst.ver = htons(DSCM_VER);
+	cm_ptr->socket = -1;
+	cm_ptr->l_socket = -1;
+	return cm_ptr;
+bail:
+	dapl_os_free(cm_ptr, sizeof(*cm_ptr));
+	return NULL;
+}
+
+
+/* Tear down a CM handle: in SCM_INIT state close its sockets and free it now;
+ * otherwise mark it SCM_DESTROY, clear the EP link, close sockets, wake thread. */
+static void dapli_cm_destroy(struct ib_cm_handle *cm_ptr)
+{
+	dapl_dbg_log(DAPL_DBG_TYPE_CM, 
+		     " cm_destroy: cm %p ep %p\n", cm_ptr,cm_ptr->ep);
+	
+	/* cleanup, never made it to work queue */
+	if (cm_ptr->state == SCM_INIT) {
+		if (cm_ptr->socket >= 0)  
+			closesocket(cm_ptr->socket);
+		if (cm_ptr->l_socket >= 0)  
+			closesocket(cm_ptr->l_socket);
+		dapl_os_free(cm_ptr, sizeof(*cm_ptr));
+		return;
+	}
+	/* cm_ptr is NOT freed below; presumably the work thread frees it - confirm */
+	dapl_os_lock(&cm_ptr->lock);
+	cm_ptr->state = SCM_DESTROY;
+	if (cm_ptr->ep) 
+		cm_ptr->ep->cm_handle = IB_INVALID_HANDLE; /* clear the EP's cm_handle */
+
+	/* close socket if still active */
+	if (cm_ptr->socket >= 0) {
+		closesocket(cm_ptr->socket);
+		cm_ptr->socket = -1;
+	}
+	if (cm_ptr->l_socket >= 0) {
+		closesocket(cm_ptr->l_socket);
+		cm_ptr->l_socket = -1;
+	}
+	dapl_os_unlock(&cm_ptr->lock);
+
+	/* wakeup work thread; sizeof of the literal is 2, so w plus its NUL are written */
+	_write(g_scm_pipe[1], "w", sizeof "w");
+}
+
+
+/* Queue a CM handle on its HCA's ib_trans.list and wake the CM work thread. */
+static void dapli_cm_queue(struct ib_cm_handle *cm_ptr)
+{
+	/* add to work queue for cr thread processing; cm_ptr->hca must already be set */
+	dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&cm_ptr->entry);
+	dapl_os_lock( &cm_ptr->hca->ib_trans.lock );
+	dapl_llist_add_tail((DAPL_LLIST_HEAD*)&cm_ptr->hca->ib_trans.list, 
+			    (DAPL_LLIST_ENTRY*)&cm_ptr->entry, (void*)cm_ptr);
+	dapl_os_unlock(&cm_ptr->hca->ib_trans.lock);
+
+        /* wakeup CM work thread (2-byte write: the w character plus its NUL) */
+        _write(g_scm_pipe[1], "w", sizeof "w");
+}
+
+
 
 static uint16_t
 dapli_get_lid(IN DAPL_HCA *hca, IN int port)
@@ -123,6 +205,263 @@ dapli_get_lid(IN DAPL_HCA *hca, IN int port)
 
 
 /*
+ * ACTIVE/PASSIVE: called from CR thread or consumer via ep_disconnect.
+ * Sends disconnect data, closes the socket, posts IB_CME_DISCONNECTED. */
+static DAT_RETURN 
+dapli_socket_disconnect(dp_ib_cm_handle_t cm_ptr)
+{
+	DAPL_EP	*ep_ptr = cm_ptr->ep;
+	DAT_UINT32 disc_data = htonl(0xdead); /* 4-byte value sent to the peer */
+
+	if (ep_ptr == NULL) /* no EP attached: nothing to do */
+		return DAT_SUCCESS;
+	
+	dapl_os_lock(&cm_ptr->lock);
+	if ((cm_ptr->state == SCM_INIT) ||
+	    (cm_ptr->state == SCM_DISCONNECTED)) {
+		dapl_os_unlock(&cm_ptr->lock);
+		return DAT_SUCCESS;
+	} else {
+		/* send disc data, close socket, wake the work thread */
+		if (cm_ptr->socket >= 0) { 
+			send(cm_ptr->socket, (const char *)&disc_data,
+				sizeof(disc_data), 0); /* result is not checked */
+			closesocket(cm_ptr->socket);
+			cm_ptr->socket = -1;
+		}
+		cm_ptr->state = SCM_DISCONNECTED;
+		_write(g_scm_pipe[1], "w", sizeof "w");
+	}
+	dapl_os_unlock(&cm_ptr->lock);
+
+	/* lock released: an EP with a CR reports via the CR's SP, else via the EVD callback */
+	if (ep_ptr->cr_ptr) {
+		dapls_cr_callback(cm_ptr,
+				  IB_CME_DISCONNECTED,
+				  NULL,
+				  ((DAPL_CR *)ep_ptr->cr_ptr)->sp_ptr);
+	} else {
+		dapl_evd_connection_callback(ep_ptr->cm_handle,
+					     IB_CME_DISCONNECTED,
+					     NULL,
+					     ep_ptr);
+	}	
+
+	/* remove reference from endpoint */
+	ep_ptr->cm_handle = NULL;
+	
+	/* NOTE(review): the comment here used to say schedule destroy, but no */
+	/* destroy call follows; confirm where this cm handle gets destroyed */
+
+	return DAT_SUCCESS;
+}
+
+
+
+/*
+ * PASSIVE: consumer accept, send local QP information, private data, 
+ * queue on work thread to receive RTU information to avoid blocking
+ * user thread. 
+ *
+ * Returns DAT_SUCCESS once queued, DAT_LENGTH_ERROR if p_size exceeds
+ * IB_MAX_REP_PDATA_SIZE, else DAT_INTERNAL_ERROR (bail: cm destroyed, EP reinit).
+ */
+static DAT_RETURN 
+dapli_socket_accept_usr( DAPL_EP	*ep_ptr,
+			 DAPL_CR	*cr_ptr,
+			 DAT_COUNT	p_size,
+		         DAT_PVOID	p_data )
+{
+	DAPL_IA		*ia_ptr = ep_ptr->header.owner_ia;
+	dp_ib_cm_handle_t cm_ptr = cr_ptr->ib_cm_handle;
+	WSABUF		iovec[2];
+	int		len, rc;
+	ib_api_status_t	ibs;
+	ib_qp_attr_t	qpa;
+	dapl_ibal_port_t *p_port;
+	dapl_ibal_ca_t  *p_ca;
+
+	dapl_dbg_log (DAPL_DBG_TYPE_EP, "%s() p_sz %d sock %d port 0x%x\n",
+			__FUNCTION__,p_size,cm_ptr->socket,
+			ia_ptr->hca_ptr->port_num);
+
+	if (p_size >  IB_MAX_REP_PDATA_SIZE) 
+		return DAT_LENGTH_ERROR;
+
+	/* must have an accepted socket; the leading %s needs __FUNCTION__ */
+	if ( cm_ptr->socket < 0 ) {
+		dapl_dbg_log(DAPL_DBG_TYPE_EP, 
+		     "%s() Not accepted socket? remote port=0x%x lid=0x%x"
+		     " qpn=0x%x psize=%d\n",
+		     __FUNCTION__, cm_ptr->dst.port, cm_ptr->dst.lid,
+		     ntohl(cm_ptr->dst.qpn), cm_ptr->dst.p_size); 
+		return DAT_INTERNAL_ERROR;
+	}
+	
+	dapl_dbg_log(DAPL_DBG_TYPE_EP, 
+		     " accept_usr: remote port=0x%x lid=0x%x"
+		     " qpn=0x%x psize=%d\n",
+		     cm_ptr->dst.port, cm_ptr->dst.lid,
+		     ntohl(cm_ptr->dst.qpn), cm_ptr->dst.p_size); 
+
+	/* look up the local port needed for the QP state transitions */
+	p_ca = (dapl_ibal_ca_t *) ia_ptr->hca_ptr->ib_hca_handle;
+	p_port = dapli_ibal_get_port (p_ca, (uint8_t)ia_ptr->hca_ptr->port_num);
+	if (!p_port)
+	{
+		dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+			"%s() dapli_ibal_get_port() failed @ line #%d\n",
+			__FUNCTION__,__LINE__);
+		goto bail;
+	}
+
+	dapl_dbg_log(DAPL_DBG_TYPE_EP,
+			"%s() DST: qpn 0x%x port 0x%x lid %x psize %d\n",
+			__FUNCTION__,
+			cl_ntoh32(cm_ptr->dst.qpn),
+			cm_ptr->dst.port,
+			cl_ntoh16(cm_ptr->dst.lid), cm_ptr->dst.p_size);
+
+	/* modify QP to RTR and then to RTS with remote info already read */
+	ibs = dapls_modify_qp_state_to_rtr( ep_ptr->qp_handle, 
+					    cm_ptr->dst.qpn,
+					    cm_ptr->dst.lid,
+					    p_port );
+	if (ibs != IB_SUCCESS)
+	{
+		dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+				"%s() QP --> RTR failed @ line #%d\n",
+				__FUNCTION__,__LINE__);
+		goto bail;
+	}
+
+	if ( dapls_modify_qp_state_to_rts( ep_ptr->qp_handle ) )
+	{
+		dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+				"%s() QP --> RTS failed @ line #%d\n",
+				__FUNCTION__,__LINE__);
+		goto bail;
+	}
+
+	ep_ptr->qp_state = IB_QP_STATE_RTS;
+	
+	/* save remote address: cm_ptr->dst is about to be reused for local info */
+	dapl_os_memcpy( &ep_ptr->remote_ia_address, 
+			&cm_ptr->dst.ia_address, 
+			sizeof(ep_ptr->remote_ia_address));
+
+	/* determine QP & port numbers */
+	ibs = ib_query_qp(ep_ptr->qp_handle, &qpa);
+	if (ibs != IB_SUCCESS)
+	{
+		dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
+			     " ib_query_qp() ERR %s\n", ib_get_err_str(ibs)); 
+		goto bail;
+	}
+
+	/* Send our QP info, IA address, and private data */
+	cm_ptr->dst.qpn = qpa.num; /* ib_net32_t */
+	cm_ptr->dst.port = ia_ptr->hca_ptr->port_num;
+	cm_ptr->dst.lid = dapli_get_lid(ia_ptr->hca_ptr, ia_ptr->hca_ptr->port_num);
+	/* set gid in network order */
+	ibs = dapls_ib_query_gid( ia_ptr->hca_ptr, &cm_ptr->dst.gid );
+	if ( ibs != IB_SUCCESS )
+	{
+		dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
+			"%s() dapls_ib_query_gid() returns '%s'\n",
+			__FUNCTION__,ib_get_err_str(ibs)); 
+		goto bail;
+	}
+
+	cm_ptr->dst.ia_address = ia_ptr->hca_ptr->hca_address;
+	cm_ptr->dst.p_size = p_size;
+
+	dapl_dbg_log(DAPL_DBG_TYPE_CM,
+		"%s()\n  Tx QP info: qpn %x port 0x%x lid 0x%x p_sz %d IP %s\n",
+		__FUNCTION__, cl_ntoh32(cm_ptr->dst.qpn), cm_ptr->dst.port,
+		cl_ntoh16(cm_ptr->dst.lid), cm_ptr->dst.p_size,
+		dapli_get_ip_addr_str(&cm_ptr->dst.ia_address,NULL));
+
+	/* network byte-ordering - QPN & LID already are */
+	cm_ptr->dst.p_size = cl_hton32(cm_ptr->dst.p_size);
+	cm_ptr->dst.port = cl_hton16(cm_ptr->dst.port);
+
+	iovec[0].buf = (char*)&cm_ptr->dst;
+	iovec[0].len  = sizeof(ib_qp_cm_t);
+	if (p_size) {
+		iovec[1].buf = p_data;
+		iovec[1].len  = p_size;
+	}
+	rc = WSASend( cm_ptr->socket, iovec, (p_size ? 2:1), &len, 0, 0, 0 );
+	if (rc || len != (p_size + sizeof(ib_qp_cm_t))) {
+		dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
+			     " accept_usr: ERR %d, wcnt=%d\n",
+			     WSAGetLastError(), len); 
+		goto bail;
+	}
+	dapl_dbg_log(DAPL_DBG_TYPE_CM, 
+		     " accept_usr: local port=0x%x lid=0x%x"
+		     " qpn=0x%x psize=%d\n",
+		     ntohs(cm_ptr->dst.port), ntohs(cm_ptr->dst.lid), 
+		     ntohl(cm_ptr->dst.qpn), ntohl(cm_ptr->dst.p_size)); 
+	
+	/* save state and reference to EP, queue for RTU data */
+	cm_ptr->ep = ep_ptr;
+	cm_ptr->hca = ia_ptr->hca_ptr;
+	cm_ptr->state = SCM_ACCEPTED;
+
+	/* restore remote address information for query */
+	dapl_os_memcpy( &cm_ptr->dst.ia_address, 
+			&ep_ptr->remote_ia_address,
+			sizeof(cm_ptr->dst.ia_address));
+
+	dapl_dbg_log( DAPL_DBG_TYPE_EP," PASSIVE: accepted!\n" ); 
+	dapli_cm_queue(cm_ptr);
+
+	return DAT_SUCCESS;
+
+bail:
+	dapl_dbg_log( DAPL_DBG_TYPE_ERR," accept_usr: ERR !QP_RTR_RTS \n"); 
+	dapli_cm_destroy(cm_ptr);
+	dapls_ib_reinit_ep( ep_ptr ); /* reset QP state */
+
+	return DAT_INTERNAL_ERROR;
+}
+
+
+/*
+ * PASSIVE: read RTU from active peer, post CONN event. A short read or a
+ * wrong RTU value resets the EP and reports IB_CME_DESTINATION_REJECT. */
+void 
+dapli_socket_accept_rtu(dp_ib_cm_handle_t cm_ptr)
+{
+	int		len;
+	short		rtu_data = 0;
+
+	/* complete handshake after final QP state change; 0x0e0f is the expected RTU value */
+	len = recv(cm_ptr->socket, (char*)&rtu_data, sizeof(rtu_data), 0);
+	if ( len != sizeof(rtu_data) || ntohs(rtu_data) != 0x0e0f ) {
+		dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
+			     " accept_rtu: ERR %d, rcnt=%d rdata=%x\n",
+			     WSAGetLastError(), len, ntohs(rtu_data) ); 
+		goto bail;
+	}
+
+	/* RTU received: mark the CM connected */
+	cm_ptr->state = SCM_CONNECTED;
+
+	/* report the established connection through dapls_cr_callback */
+	dapl_dbg_log( DAPL_DBG_TYPE_EP," PASSIVE: connected!\n" ); 
+	dapls_cr_callback(cm_ptr, IB_CME_CONNECTED, NULL, cm_ptr->sp);
+	return;
+bail:
+	dapls_ib_reinit_ep(cm_ptr->ep); /* reset QP state */
+	dapli_cm_destroy(cm_ptr); /* NOTE(review): cm_ptr is still used on the next line - confirm destroy cannot have freed it */
+	dapls_cr_callback(cm_ptr, IB_CME_DESTINATION_REJECT, NULL, cm_ptr->sp);
+}
+
+
+/*
  * ACTIVE: Create socket, connect, and exchange QP information 
  */
 static DAT_RETURN 
@@ -143,21 +482,16 @@ dapli_socket_connect (	DAPL_EP			*ep_ptr,
 	dapl_ibal_port_t *p_port;
 	dapl_ibal_ca_t  *p_ca;
 	
-	dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: r_qual %d\n", r_qual);
+	dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: r_qual %d psize %d\n",
+		     r_qual, p_size);
 			
-	/*
-	 *  Allocate CM and initialize
-	 */
-	if ((cm_ptr = dapl_os_alloc(sizeof(*cm_ptr))) == NULL ) {
+	cm_ptr = dapli_cm_create();
+	if (cm_ptr == NULL)
 		return DAT_INSUFFICIENT_RESOURCES;
-	}
-
-	(void) dapl_os_memzero( cm_ptr, sizeof(*cm_ptr) );
-	cm_ptr->socket = -1;
 
 	/* create, connect, sockopt, and exchange QP information */
 	if ((cm_ptr->socket = socket(AF_INET,SOCK_STREAM,0)) < 0 ) {
-		dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
+		dapli_cm_destroy(cm_ptr);
 		return DAT_INSUFFICIENT_RESOURCES;
 	}
 
@@ -166,7 +500,7 @@ dapli_socket_connect (	DAPL_EP			*ep_ptr,
 	if (connect(cm_ptr->socket, r_addr, sizeof(*r_addr)) == SOCKET_ERROR) {
 		dapl_dbg_log(DAPL_DBG_TYPE_ERR, " connect: %d on r_qual %d\n",
 			     WSAGetLastError(), (unsigned int)r_qual);
-		dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
+		dapli_cm_destroy(cm_ptr);
 		return DAT_INVALID_ADDRESS;
 	}
 
@@ -175,6 +509,8 @@ dapli_socket_connect (	DAPL_EP			*ep_ptr,
 			  (const char*)&opt,
 			  sizeof(opt) );
 	
+	dapl_dbg_log(DAPL_DBG_TYPE_EP, " socket connected!\n");
+
 	/* determine QP & port numbers */
 	ibs = ib_query_qp(ep_ptr->qp_handle, &qpa);
 	if (ibs != IB_SUCCESS)
@@ -187,7 +523,6 @@ dapli_socket_connect (	DAPL_EP			*ep_ptr,
 	/* Send QP info, IA address and private data */
 	cm_ptr->dst.qpn = qpa.num; /* ib_net32_t */
 	cm_ptr->dst.port = cl_hton16(ia_ptr->hca_ptr->port_num);
-
 	cm_ptr->dst.lid = dapli_get_lid( ia_ptr->hca_ptr, 
 					 ia_ptr->hca_ptr->port_num );
 	if (cm_ptr->dst.lid == 0)
@@ -197,6 +532,17 @@ dapli_socket_connect (	DAPL_EP			*ep_ptr,
 				__FUNCTION__, __LINE__); 
 		goto bail;
 	}
+
+	/* set gid in network order */
+	ibs = dapls_ib_query_gid( ia_ptr->hca_ptr, &cm_ptr->dst.gid );
+	if ( ibs != IB_SUCCESS )
+	{
+		dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
+			"%s() dapls_ib_query_gid() returns '%s'\n",
+			__FUNCTION__,ib_get_err_str(ibs)); 
+		goto bail;
+	}
+
 	cm_ptr->dst.ia_address = ia_ptr->hca_ptr->hca_address;
 	cm_ptr->dst.p_size = cl_hton32(p_size);
 
@@ -213,6 +559,8 @@ dapli_socket_connect (	DAPL_EP			*ep_ptr,
 		iovec[1].buf = p_data;
 		iovec[1].len  = p_size;
 	}
+
+	dapl_dbg_log(DAPL_DBG_TYPE_EP," socket connected, write QP and private data\n"); 
 	rc = WSASend (cm_ptr->socket, iovec, (p_size ? 2:1), &len, 0, 0, NULL);
     	if ( rc || len != (p_size + sizeof(ib_qp_cm_t)) ) {
 		dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
@@ -225,17 +573,65 @@ dapli_socket_connect (	DAPL_EP			*ep_ptr,
 		     cm_ptr->dst.port, cm_ptr->dst.lid, 
 		     cm_ptr->dst.qpn, cm_ptr->dst.p_size ); 
 
+	/* queue up to work thread to avoid blocking consumer */
+	cm_ptr->state = SCM_CONN_PENDING;
+	cm_ptr->hca = ia_ptr->hca_ptr;
+	cm_ptr->ep = ep_ptr;
+	dapli_cm_queue(cm_ptr);
+	return DAT_SUCCESS;
+bail:
+	/* close socket, free cm structure */
+	dapli_cm_destroy(cm_ptr);
+	return DAT_INTERNAL_ERROR;
+}
+
+
+/*
+ * ACTIVE: exchange QP information, called from CR thread
+ */
+void 
+dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)
+{
+	DAPL_EP		*ep_ptr = cm_ptr->ep;
+	DAPL_IA		*ia_ptr = cm_ptr->ep->header.owner_ia;
+	int		len, rc;
+	DWORD		ioflags;
+	WSABUF		iovec[1];
+	short		rtu_data = htons(0x0E0F);
+	ib_cm_events_t	event = IB_CME_DESTINATION_REJECT;
+	ib_api_status_t	ibs;
+	dapl_ibal_port_t *p_port;
+	dapl_ibal_ca_t  *p_ca;
+
 	/* read DST information into cm_ptr, overwrite SRC info */
+	dapl_dbg_log(DAPL_DBG_TYPE_EP," connect_rtu: recv peer QP data\n"); 
+
+	iovec[0].buf = (char*)&cm_ptr->dst;
+	iovec[0].len  = sizeof(ib_qp_cm_t);
 	ioflags = len = 0;
 	rc = WSARecv (cm_ptr->socket, iovec, 1, &len, &ioflags, 0, 0);
-	if ( rc == SOCKET_ERROR || len != sizeof(ib_qp_cm_t) ) {
-		dapl_dbg_log(DAPL_DBG_TYPE_ERR,"connect read: ERR %d rcnt=%d\n",
-			     WSAGetLastError(), len); 
+	if ( rc == SOCKET_ERROR || len != sizeof(ib_qp_cm_t) ||
+					ntohs(cm_ptr->dst.ver) != DSCM_VER )
+	{
+		dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+			     "connect_rtu read: ERR %d rcnt=%d ver=%d\n",
+			     WSAGetLastError(), len, cm_ptr->dst.ver); 
+		goto bail;
+	}
+
+	/* check for consumer reject */
+	if (cm_ptr->dst.rej) {
+		dapl_dbg_log(DAPL_DBG_TYPE_CM, 
+			     " connect_rtu read: PEER REJ reason=0x%x\n",
+			     ntohs(cm_ptr->dst.rej)); 
+		event = IB_CME_DESTINATION_REJECT_PRIVATE_DATA;
 		goto bail;
 	}
 
-	/* revert back to host byte ordering */
+	/* convert peer response values to host order */
 	cm_ptr->dst.port = cl_ntoh16(cm_ptr->dst.port);
+	cm_ptr->dst.lid = ntohs(cm_ptr->dst.lid);
+	cm_ptr->dst.qpn = cm_ptr->dst.qpn;
 	cm_ptr->dst.p_size = cl_ntoh32(cm_ptr->dst.p_size);
 
 	dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: Rx DST: qpn %x port %d "
@@ -245,15 +641,27 @@ dapli_socket_connect (	DAPL_EP			*ep_ptr,
 			cm_ptr->dst.p_size,
 			dapli_get_ip_addr_str(&cm_ptr->dst.ia_address,NULL));
 
+	/* save remote address information */
+	dapl_os_memcpy( &ep_ptr->remote_ia_address, 
+			&cm_ptr->dst.ia_address, 
+			sizeof(ep_ptr->remote_ia_address));
+
+	dapl_dbg_log(DAPL_DBG_TYPE_EP, 
+		     " connect_rtu: DST %s port=0x%x lid=0x%x, qpn=0x%x, psize=%d\n",
+		     inet_ntoa(((struct sockaddr_in *)&cm_ptr->dst.ia_address)->sin_addr),
+		     cm_ptr->dst.port, cm_ptr->dst.lid, 
+		     cm_ptr->dst.qpn, cm_ptr->dst.p_size); 
+
 	/* validate private data size before reading */
-	if ( cm_ptr->dst.p_size > IB_MAX_REP_PDATA_SIZE ) {
+	if (cm_ptr->dst.p_size > IB_MAX_REP_PDATA_SIZE) {
 		dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-			     " connect read: psize (%d) wrong\n",
+			     " connect_rtu read: psize (%d) wrong\n",
 			     cm_ptr->dst.p_size ); 
 		goto bail;
 	}
 
 	/* read private data into cm_handle if any present */
+	dapl_dbg_log(DAPL_DBG_TYPE_EP," socket connected, read private data\n"); 
 	if ( cm_ptr->dst.p_size ) {
 		iovec[0].buf = cm_ptr->p_data;
 		iovec[0].len  = cm_ptr->dst.p_size;
@@ -300,32 +708,29 @@ dapli_socket_connect (	DAPL_EP			*ep_ptr,
 		 
 	ep_ptr->qp_state = IB_QP_STATE_RTS;
 
+	dapl_dbg_log(DAPL_DBG_TYPE_EP," connect_rtu: send RTU\n"); 
+
 	/* complete handshake after final QP state change */
 	send(cm_ptr->socket, (const char *)&rtu_data, sizeof(rtu_data), 0);
 
 	/* init cm_handle and post the event with private data */
 	ep_ptr->cm_handle = cm_ptr;
+	cm_ptr->state = SCM_CONNECTED;
 	dapl_dbg_log( DAPL_DBG_TYPE_EP," ACTIVE: connected!\n" ); 
 
 	dapl_evd_connection_callback(   ep_ptr->cm_handle, 
 					IB_CME_CONNECTED, 
 					cm_ptr->p_data, 
 					ep_ptr );	
-	return DAT_SUCCESS;
-
+	return;
 bail:
 	/* close socket, free cm structure and post error event */
-	if ( cm_ptr->socket >= 0 ) 
-		closesocket(cm_ptr->socket);
-
-	dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
-	dapls_ib_reinit_ep( ep_ptr ); /* reset QP state */
-
-	dapl_evd_connection_callback(	ep_ptr->cm_handle, 
-					IB_CME_LOCAL_FAILURE, 
+	dapli_cm_destroy(cm_ptr);
+	dapls_ib_reinit_ep(ep_ptr); /* reset QP state */
+	dapl_evd_connection_callback(	NULL /*ep_ptr->cm_handle*/, 
+					event, 
 					NULL, 
 					ep_ptr );
-	return DAT_INTERNAL_ERROR;
 }
 
 
@@ -347,14 +752,12 @@ dapli_socket_listen (	DAPL_IA		*ia_ptr,
 			ia_ptr, serviceID, sp_ptr);
 
 	/* Allocate CM and initialize */
-	if ((cm_ptr = dapl_os_alloc(sizeof(*cm_ptr))) == NULL) 
+	cm_ptr = dapli_cm_create();
+	if (cm_ptr == NULL)
 		return DAT_INSUFFICIENT_RESOURCES;
 
-	(void) dapl_os_memzero( cm_ptr, sizeof( *cm_ptr ) );
-	
-	cm_ptr->socket = cm_ptr->l_socket = -1;
 	cm_ptr->sp = sp_ptr;
-	cm_ptr->hca_ptr = ia_ptr->hca_ptr;
+	cm_ptr->hca = ia_ptr->hca_ptr;
 	
 	/* bind, listen, set sockopt, accept, exchange data */
 	if ((cm_ptr->l_socket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
@@ -406,12 +809,9 @@ dapli_socket_listen (	DAPL_IA		*ia_ptr,
 	/* set cm_handle for this service point, save listen socket */
 	sp_ptr->cm_srvc_handle = cm_ptr;
 
-	/* add to SP->CR thread list */
-	dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&cm_ptr->entry);
-	dapl_os_lock( &cm_ptr->hca_ptr->ib_trans.lock );
-	dapl_llist_add_tail((DAPL_LLIST_HEAD*)&cm_ptr->hca_ptr->ib_trans.list, 
-			    (DAPL_LLIST_ENTRY*)&cm_ptr->entry, (void*)cm_ptr);
-	dapl_os_unlock(&cm_ptr->hca_ptr->ib_trans.lock);
+	/* queue up listen socket to process inbound CR's */
+	cm_ptr->state = SCM_LISTEN;
+	dapli_cm_queue(cm_ptr);
 
 	dapl_dbg_log( DAPL_DBG_TYPE_CM,
 			" listen: qual 0x%x cr %p s_fd %d\n",
@@ -421,10 +821,7 @@ dapli_socket_listen (	DAPL_IA		*ia_ptr,
 bail:
 	dapl_dbg_log( DAPL_DBG_TYPE_CM,
 			" listen: ERROR on conn_qual 0x%x\n",serviceID); 
-	if ( cm_ptr->l_socket >= 0 )
-		closesocket( cm_ptr->l_socket );
-
-	dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
+	dapli_cm_destroy(cm_ptr);
 	return dat_status;
 }
 
@@ -441,6 +838,8 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
 	int		len;
 	DAT_RETURN	dat_status = DAT_SUCCESS;
 		
+	dapl_dbg_log(DAPL_DBG_TYPE_EP," socket_accept\n");
+
 	/* Allocate accept CM and initialize */
 	if ((acm_ptr = dapl_os_alloc(sizeof(*acm_ptr))) == NULL) 
 		return DAT_INSUFFICIENT_RESOURCES;
@@ -448,8 +847,9 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
 	(void) dapl_os_memzero( acm_ptr, sizeof( *acm_ptr ) );
 	
 	acm_ptr->socket = -1;
+	acm_ptr->l_socket = -1;
 	acm_ptr->sp = cm_ptr->sp;
-	acm_ptr->hca_ptr = cm_ptr->hca_ptr;
+	acm_ptr->hca = cm_ptr->hca;
 
 	len = sizeof(acm_ptr->dst.ia_address);
 	acm_ptr->socket = accept(cm_ptr->l_socket, 
@@ -464,27 +864,32 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
 		goto bail;
    	}
 
+	dapl_dbg_log(DAPL_DBG_TYPE_EP," socket accepted, read QP data\n"); 
+
 	/* read in DST QP info, IA address. check for private data */
 	len = recv(acm_ptr->socket,(char*)&acm_ptr->dst,sizeof(ib_qp_cm_t),0);
-	if ( len != sizeof(ib_qp_cm_t) ) {
+	if ( len != sizeof(ib_qp_cm_t) || ntohs(acm_ptr->dst.ver) != DSCM_VER )
+	{
 		dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-			" accept read: ERR %d, rcnt=%d\n",
-			WSAGetLastError(), len); 
+			" accept read: ERR %d, rcnt=%d ver=%d\n",
+			WSAGetLastError(), len, acm_ptr->dst.ver); 
 		dat_status = DAT_INTERNAL_ERROR;
 		goto bail;
 
 	}
-	/* revert back to host byte ordering */
+	/* convert accepted values to host byte ordering */
 	acm_ptr->dst.port = cl_ntoh16(acm_ptr->dst.port);
+	acm_ptr->dst.lid = ntohs(acm_ptr->dst.lid);
+	acm_ptr->dst.qpn = acm_ptr->dst.qpn;
 	acm_ptr->dst.p_size = cl_ntoh32(acm_ptr->dst.p_size);
 
-	dapl_dbg_log(DAPL_DBG_TYPE_EP, " accept: DST sizeof(ib_cm_t) %d qpn %x "
-		"port %d lid 0x%x psize %d IP %s\n",
-		sizeof(ib_qp_cm_t),
-		cl_ntoh32(acm_ptr->dst.qpn), acm_ptr->dst.port,
+	dapl_dbg_log(DAPL_DBG_TYPE_EP, " accept: DST %s port 0x%x "
+		"lid 0x%x qpn 0x%x psize %d\n",
+		dapli_get_ip_addr_str(&acm_ptr->dst.ia_address,NULL),
+		acm_ptr->dst.port,
 		cl_ntoh16(acm_ptr->dst.lid),
-		acm_ptr->dst.p_size,
-		dapli_get_ip_addr_str(&acm_ptr->dst.ia_address,NULL));
+		cl_ntoh32(acm_ptr->dst.qpn),
+		acm_ptr->dst.p_size);
 
 	/* validate private data size before reading */
 	if ( acm_ptr->dst.p_size > IB_MAX_REQ_PDATA_SIZE ) {
@@ -495,6 +900,8 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
 		goto bail;
 	}
 
+	dapl_dbg_log(DAPL_DBG_TYPE_EP," socket accepted, read private data\n");
+
 	/* read private data into cm_handle if any present */
 	if ( acm_ptr->dst.p_size ) {
 		len = recv( acm_ptr->socket, 
@@ -514,6 +921,8 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
 		p_data = acm_ptr->p_data;
 	}
 	
+	acm_ptr->state = SCM_ACCEPTING;
+
 	/* trigger CR event and return SUCCESS */
 	dapls_cr_callback(  acm_ptr,
 			    IB_CME_CONNECTION_REQUEST_PENDING,
@@ -521,153 +930,8 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
 			    acm_ptr->sp );
 
 	return DAT_SUCCESS;
-
-bail:
-	if ( acm_ptr->socket >= 0 )
-		closesocket( acm_ptr->socket );
-
-	dapl_os_free( acm_ptr, sizeof( *acm_ptr ) );
-	return DAT_INTERNAL_ERROR;
-}
-
-
-static DAT_RETURN 
-dapli_socket_accept_final( DAPL_EP		*ep_ptr,
-			   DAPL_CR		*cr_ptr,
-			   DAT_COUNT		p_size,
-		           DAT_PVOID		p_data )
-{
-	DAPL_IA		*ia_ptr = ep_ptr->header.owner_ia;
-	dp_ib_cm_handle_t cm_ptr = cr_ptr->ib_cm_handle;
-	ib_qp_cm_t	qp_cm;
-	WSABUF		iovec[2];
-	int		len, rc;
-	short		rtu_data = 0;
-	ib_api_status_t	ibs;
-	ib_qp_attr_t	qpa;
-	dapl_ibal_port_t *p_port;
-	dapl_ibal_ca_t  *p_ca;
-
-	dapl_dbg_log (DAPL_DBG_TYPE_EP, "%s() p_sz %d sock %d port %d\n",
-			__FUNCTION__,p_size,cm_ptr->socket,
-			ia_ptr->hca_ptr->port_num);
-
-	if (p_size >  IB_MAX_REP_PDATA_SIZE) 
-		return DAT_LENGTH_ERROR;
-
-	/* must have a accepted socket */
-	if ( cm_ptr->socket < 0 )
-		return DAT_INTERNAL_ERROR;
-	
-	/* modify QP to RTR and then to RTS with remote info already read */
-
-	p_ca = (dapl_ibal_ca_t *) ia_ptr->hca_ptr->ib_hca_handle;
-	p_port = dapli_ibal_get_port (p_ca, (uint8_t)ia_ptr->hca_ptr->port_num);
-	if (!p_port)
-	{
-		dapl_dbg_log(DAPL_DBG_TYPE_ERR,
-			"%s() dapli_ibal_get_port() failed @ line #%d\n",
-			__FUNCTION__,__LINE__);
-		goto bail;
-	}
-
-	dapl_dbg_log(DAPL_DBG_TYPE_EP, "%s() DST: qpn %x port %d lid %x\n",
-			__FUNCTION__,
-			cl_ntoh32(cm_ptr->dst.qpn),
-			cm_ptr->dst.port,
-			cl_ntoh16(cm_ptr->dst.lid));
-
-	/* modify QP to RTR and then to RTS with remote info */
-
-	ibs = dapls_modify_qp_state_to_rtr( ep_ptr->qp_handle, 
-					    cm_ptr->dst.qpn,
-					    cm_ptr->dst.lid,
-					    p_port );
-	if (ibs != IB_SUCCESS)
-	{
-		dapl_dbg_log(DAPL_DBG_TYPE_ERR,
-				"%s() QP --> RTR failed @ line #%d\n",
-				__FUNCTION__,__LINE__);
-		goto bail;
-	}
-
-	if ( dapls_modify_qp_state_to_rts( ep_ptr->qp_handle ) )
-	{
-		dapl_dbg_log(DAPL_DBG_TYPE_ERR,
-				"%s() QP --> RTS failed @ line #%d\n",
-				__FUNCTION__,__LINE__);
-		goto bail;
-	}
-
-	ep_ptr->qp_state = IB_QP_STATE_RTS;
-	
-	/* determine QP & port numbers */
-	ibs = ib_query_qp(ep_ptr->qp_handle, &qpa);
-	if (ibs != IB_SUCCESS)
-	{
-		dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-			     " ib_query_qp() ERR %s\n", ib_get_err_str(ibs)); 
-		goto bail;
-	}
-
-	/* Send QP info, IA address, and private data */
-	qp_cm.qpn = qpa.num; /* ib_net32_t */
-	qp_cm.port = ia_ptr->hca_ptr->port_num;
-	qp_cm.lid = dapli_get_lid( ia_ptr->hca_ptr, ia_ptr->hca_ptr->port_num );
-	qp_cm.ia_address = ia_ptr->hca_ptr->hca_address;
-	qp_cm.p_size = p_size;
-
-	dapl_dbg_log(DAPL_DBG_TYPE_CM,
-		"%s()\n  Tx QP info: qpn %x port %d lid 0x%x p_sz %d IP %s\n",
-		__FUNCTION__, cl_ntoh32(qp_cm.qpn), qp_cm.port,
-		cl_ntoh16(qp_cm.lid), qp_cm.p_size,
-		dapli_get_ip_addr_str(&qp_cm.ia_address,NULL));
-
-	/* network byte-ordering - QPN & LID already are */
-	qp_cm.p_size = cl_hton32(qp_cm.p_size);
-	qp_cm.port = cl_hton16(qp_cm.port);
-
-	iovec[0].buf = (char*)&qp_cm;
-	iovec[0].len  = sizeof(ib_qp_cm_t);
-	if (p_size) {
-		iovec[1].buf = p_data;
-		iovec[1].len  = p_size;
-	}
-	rc = WSASend( cm_ptr->socket, iovec, (p_size ? 2:1), &len, 0, 0, 0 );
-    	if (rc || len != (p_size + sizeof(ib_qp_cm_t))) {
-		dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-			     " accept_final: ERR %d, wcnt=%d\n",
-			     WSAGetLastError(), len); 
-		goto bail;
-	}
-	dapl_dbg_log(DAPL_DBG_TYPE_EP, 
-		     " accept_final: SRC qpn %x port %d lid 0x%x psize %d\n",
-		     qp_cm.qpn, qp_cm.port, qp_cm.lid, qp_cm.p_size ); 
-	
-	/* complete handshake after final QP state change */
-	len = recv(cm_ptr->socket, (char*)&rtu_data, sizeof(rtu_data), 0);
-	if ( len != sizeof(rtu_data) || ntohs(rtu_data) != 0x0e0f ) {
-		dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-			     " accept_final: ERR %d, rcnt=%d rdata=%x\n",
-			     WSAGetLastError(), len, ntohs(rtu_data) ); 
-		goto bail;
-	}
-
-	/* final data exchange if remote QP state is good to go */
-	dapl_dbg_log( DAPL_DBG_TYPE_EP," PASSIVE: connected!\n" ); 
-
-	dapls_cr_callback ( cm_ptr, IB_CME_CONNECTED, NULL, cm_ptr->sp );
-
-	return DAT_SUCCESS;
-
 bail:
-	dapl_dbg_log( DAPL_DBG_TYPE_ERR," accept_final: ERR !QP_RTR_RTS \n"); 
-	if ( cm_ptr->socket >= 0 )
-		closesocket( cm_ptr->socket );
-
-	dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
-	dapls_ib_reinit_ep( ep_ptr ); /* reset QP state */
-
+	dapli_cm_destroy(acm_ptr);
 	return DAT_INTERNAL_ERROR;
 }
 
@@ -747,11 +1011,7 @@ dapls_ib_disconnect (
 	dapl_dbg_log (DAPL_DBG_TYPE_EP,
 			"dapls_ib_disconnect(ep_handle %p ....)\n", ep_ptr);
 
-	if ( cm_ptr->socket >= 0 ) {
-		closesocket( cm_ptr->socket );
-		cm_ptr->socket = -1;
-	}
-	
+#if 0 // XXX
 	/* disconnect QP ala transition to RESET state */
 	ib_status = dapls_modify_qp_state_to_reset (ep_ptr->qp_handle);
 
@@ -776,15 +1036,18 @@ dapls_ib_disconnect (
 						NULL,
 						ep_ptr );
 		ep_ptr->cm_handle = NULL;
-		dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
 	}	
-
+#endif
 	/* modify QP state --> INIT */
 	dapls_ib_reinit_ep(ep_ptr);
 
+	if (cm_ptr == NULL)
 	return DAT_SUCCESS;
+	else
+		return dapli_socket_disconnect(cm_ptr);
 }
 
+
 /*
  * dapls_ib_disconnect_clean
  *
@@ -874,14 +1137,20 @@ dapls_ib_remove_conn_listener (
 	if ( cm_ptr != NULL ) {
 		if ( cm_ptr->l_socket >= 0 ) {
 			closesocket( cm_ptr->l_socket );
+			cm_ptr->l_socket = -1;
+		}
+		if ( cm_ptr->socket >= 0 ) {
+			closesocket( cm_ptr->socket );
 			cm_ptr->socket = -1;
 		}
 	    	/* cr_thread will free */
 		sp_ptr->cm_srvc_handle = NULL;
+		_write(g_scm_pipe[1], "w", sizeof "w");
 	}
 	return DAT_SUCCESS;
 }
 
+
 /*
  * dapls_ib_accept_connection
  *
@@ -928,7 +1197,7 @@ dapls_ib_accept_connection (
     			return status;
 	}
     
-	return ( dapli_socket_accept_final(ep_ptr, cr_ptr, p_size, p_data) );
+	return ( dapli_socket_accept_usr(ep_ptr, cr_ptr, p_size, p_data) );
 }
 
 
@@ -948,27 +1217,39 @@ dapls_ib_accept_connection (
  *	DAT_INTERNAL_ERROR
  *
  */
+
 DAT_RETURN
 dapls_ib_reject_connection (
-	IN  dp_ib_cm_handle_t	ib_cm_handle,
+	IN  dp_ib_cm_handle_t	cm_ptr,
 	IN  int			reject_reason,
-	IN  DAT_COUNT 		private_data_size,
-	IN  const DAT_PVOID 	private_data)
+	IN  DAT_COUNT 		psize,
+	IN  const DAT_PVOID 	pdata)
 {
-    	ib_cm_srvc_handle_t	cm_ptr = ib_cm_handle;
+	WSABUF	iovec[1];
+	int	len;
 
 	dapl_dbg_log (DAPL_DBG_TYPE_EP,
-		      "dapls_ib_reject_connection(cm_handle %p reason %x)\n",
-		      ib_cm_handle, reject_reason );
-
-	/* just close the socket and return */
-	if ( cm_ptr->socket > 0 ) {
-		closesocket( cm_ptr->socket );
+		      " reject(cm %p reason %x pdata %p psize %d)\n",
+		      cm_ptr, reject_reason, pdata, psize );
+
+	/* write reject data to indicate reject */
+	if (cm_ptr->socket >= 0) {
+		cm_ptr->dst.rej = (uint16_t)reject_reason;
+		cm_ptr->dst.rej = cl_hton16(cm_ptr->dst.rej);
+		iovec[0].buf = (char*)&cm_ptr->dst;
+		iovec[0].len  = sizeof(ib_qp_cm_t);
+		(void) WSASend (cm_ptr->socket, iovec, 1, &len, 0, 0, NULL);
+		closesocket(cm_ptr->socket);
 		cm_ptr->socket = -1;
 	}
+
+	/* cr_thread will destroy CR */
+	cm_ptr->state = SCM_REJECTED;
+        _write(g_scm_pipe[1], "w", sizeof "w");
 	return DAT_SUCCESS;
 }
 
+
 /*
  * dapls_ib_cm_remote_addr
  *
@@ -1157,7 +1438,7 @@ dapls_ib_get_dat_event (
 
 
 /*
- * dapls_ib_get_dat_event
+ * dapls_ib_get_cm_event
  *
  * Return a DAT connection event given a provider CM event.
  * 
@@ -1189,12 +1470,16 @@ dapls_ib_get_cm_event (
 }
 #endif /* NOT_USED */
 
-/* async CR processing thread to avoid blocking applications */
+/* outbound/inbound CR processing thread to avoid blocking applications */
+
+#define SCM_MAX_CONN (8 * sizeof(fd_set))
+
 void cr_thread(void *arg) 
 {
     struct dapl_hca	*hca_ptr = arg;
     ib_cm_srvc_handle_t	cr, next_cr;
     int			max_fd, rc;
+    char		rbuf[2];
     fd_set		rfd, rfds;
     struct timeval	to;
      
@@ -1202,10 +1487,12 @@ void cr_thread(void *arg)
 
     dapl_os_lock( &hca_ptr->ib_trans.lock );
     hca_ptr->ib_trans.cr_state = IB_THREAD_RUN;
+
     while (hca_ptr->ib_trans.cr_state == IB_THREAD_RUN) {
 	
 	FD_ZERO( &rfds ); 
-	max_fd = -1;
+	FD_SET(g_scm_pipe[0], &rfds);
+	max_fd = g_scm_pipe[0];
 	
 	if (!dapl_llist_is_empty((DAPL_LLIST_HEAD*)&hca_ptr->ib_trans.list))
             next_cr = dapl_llist_peek_head((DAPL_LLIST_HEAD*)
@@ -1230,32 +1517,46 @@ void cr_thread(void *arg)
 		continue;
 	    }
 	          
-	    FD_SET( cr->l_socket, &rfds ); /* add to select set */
-	    if ( cr->l_socket > max_fd )
+	    if (cr->socket > SCM_MAX_CONN-1) {
+		dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
+			     "SCM ERR: cr->socket(%d) exceeded FD_SETSIZE %d\n",
+				cr->socket,SCM_MAX_CONN-1);
+		continue;
+	    }
+	    FD_SET( cr->socket, &rfds ); /* add to select SET */
+	    if ( cr->socket > max_fd )
 		max_fd = cr->l_socket;
 
 	    /* individual select poll to check for work */
 	    FD_ZERO(&rfd);
-	    FD_SET(cr->l_socket, &rfd);
+	    FD_SET(cr->socket, &rfd);
 	    dapl_os_unlock(&hca_ptr->ib_trans.lock);	
 
 	    to.tv_sec  = 0;
 	    to.tv_usec = 0; /* wakeup and check destroy */
 
 	    /* block waiting for Rx data */
-	    if (select(cr->l_socket+1,&rfd,NULL,NULL,&to) == SOCKET_ERROR) {
+	    if (select(cr->socket+1,&rfd,NULL,NULL,&to) == SOCKET_ERROR) {
 		rc = WSAGetLastError();
 		if ( rc != SOCKET_ERROR /*WSAENOTSOCK*/ )
 		{
 		    dapl_dbg_log (DAPL_DBG_TYPE_ERR/*CM*/,
 				" thread: select(sock %d) ERR %d on cr %p\n",
-				cr->l_socket, rc, cr);
+				cr->socket, rc, cr);
+		}
+		closesocket(cr->socket);
+		cr->socket = -1;
+	    } else if (FD_ISSET(cr->socket,&rfd)) {
+		if (cr->socket > 0) {
+			if (cr->state == SCM_LISTEN) 
+				dapli_socket_accept(cr);
+			else if (cr->state == SCM_ACCEPTED)
+				dapli_socket_accept_rtu(cr);
+			else if (cr->state == SCM_CONN_PENDING)
+				dapli_socket_connect_rtu(cr);
+			else if (cr->state == SCM_CONNECTED)
+				dapli_socket_disconnect(cr);
 		}
-		closesocket(cr->l_socket);
-		cr->l_socket = -1;
-	    } else if (FD_ISSET(cr->l_socket,&rfd) && dapli_socket_accept(cr)) {
-		closesocket(cr->l_socket);
-		cr->l_socket = -1;
 	    }
 	    dapl_os_lock( &hca_ptr->ib_trans.lock );
 	    next_cr =  dapl_llist_next_entry((DAPL_LLIST_HEAD*)
@@ -1263,9 +1564,19 @@ void cr_thread(void *arg)
 					     (DAPL_LLIST_ENTRY*)&cr->entry );
 	}
 	dapl_os_unlock( &hca_ptr->ib_trans.lock );
+
 	to.tv_sec  = 0;
 	to.tv_usec = 100000; /* wakeup and check destroy */
+
 	(void) select(max_fd+1, &rfds, NULL, NULL, &to);
+
+	/* consume pipe data, if any - the pipe is used to wake this thread up */
+	if (FD_ISSET(g_scm_pipe[0],&rfds)) {
+    		dapl_dbg_log(DAPL_DBG_TYPE_CM," cr_thread() read pipe data\n");
+printf(" cr_thread() read pipe data\n");
+		_read(g_scm_pipe[0], rbuf, 2);
+printf(" cr_thread() Finished read pipe data\n");
+	}
 	dapl_os_lock( &hca_ptr->ib_trans.lock );
     }
     dapl_os_unlock( &hca_ptr->ib_trans.lock );	
diff --git a/dapl/ibal-scm/dapl_ibal-scm_util.c b/dapl/ibal-scm/dapl_ibal-scm_util.c
index 8e5f8ac..06bc704 100644
--- a/dapl/ibal-scm/dapl_ibal-scm_util.c
+++ b/dapl/ibal-scm/dapl_ibal-scm_util.c
@@ -52,6 +52,7 @@ static const char rcsid[] = "$Id:  $";
 #include "dapl.h"
 #include "dapl_adapter_util.h"
 #include "dapl_ibal_util.h"
+#include "dapl_ibal_name_service.h"
 
 #include <stdio.h>
 #include <stdlib.h>
@@ -61,9 +62,12 @@ static const char rcsid[] = "$Id:  $";
 #include <winsock2.h>
 #include <ws2tcpip.h>
 #include <io.h>
+#include <fcntl.h>
 
+extern void cr_thread(void *arg);
 
 int g_dapl_loopback_connection = 0;
+int g_scm_pipe[2];
 
 #ifdef NOT_USED
 
@@ -132,22 +136,55 @@ DAT_RETURN dapli_init_sock_cm ( IN DAPL_HCA  *hca_ptr )
 
 	dapl_dbg_log (DAPL_DBG_TYPE_UTIL, " %s(): %p\n",__FUNCTION__,hca_ptr );
 
-	/* set inline max with enviroment or default */
+	/* set RC tunables via environment or default */
 	hca_ptr->ib_trans.max_inline_send = 
-		dapl_os_get_env_val ( "DAPL_MAX_INLINE", INLINE_SEND_DEFAULT );
+		dapl_os_get_env_val("DAPL_MAX_INLINE", INLINE_SEND_DEFAULT);
+#if 0
+	hca_ptr->ib_trans.ack_retry = 
+		dapl_os_get_env_val("DAPL_ACK_RETRY", SCM_ACK_RETRY);
+	hca_ptr->ib_trans.ack_timer =
+		dapl_os_get_env_val("DAPL_ACK_TIMER", SCM_ACK_TIMER);
+	hca_ptr->ib_trans.rnr_retry = 
+		dapl_os_get_env_val("DAPL_RNR_RETRY", SCM_RNR_RETRY);
+	hca_ptr->ib_trans.rnr_timer = 
+		dapl_os_get_env_val("DAPL_RNR_TIMER", SCM_RNR_TIMER);
+	hca_ptr->ib_trans.global =
+		dapl_os_get_env_val("DAPL_GLOBAL_ROUTING", SCM_GLOBAL);
+	hca_ptr->ib_trans.hop_limit =
+		dapl_os_get_env_val("DAPL_HOP_LIMIT", SCM_HOP_LIMIT);
+	hca_ptr->ib_trans.tclass =
+		dapl_os_get_env_val("DAPL_TCLASS", SCM_TCLASS);
+#endif
 
 	/* initialize cr_list lock */
 	dat_status = dapl_os_lock_init(&hca_ptr->ib_trans.lock);
 	if (dat_status != DAT_SUCCESS)
 	{
 		dapl_dbg_log (DAPL_DBG_TYPE_ERR, 
-				" open_hca: failed to init lock\n");
+			"%s() failed to init cr_list lock\n", __FUNCTION__);
 		return DAT_INTERNAL_ERROR;
 	}
 
+#if 0
+	/* initialize cq_lock */
+	dat_status = dapl_os_lock_init(&hca_ptr->ib_trans.cq_lock);
+	if (dat_status != DAT_SUCCESS) {
+		dapl_log(DAPL_DBG_TYPE_ERR, 
+			 "%s() failed to init cq_lock\n", __FUNCTION__);
+		return DAT_INTERNAL_ERROR;
+	}
+#endif
+
 	/* initialize CM list for listens on this HCA */
 	dapl_llist_init_head((DAPL_LLIST_HEAD*)&hca_ptr->ib_trans.list);
 
+	/* create pipe communication endpoints */
+	if (_pipe(g_scm_pipe, 256, O_TEXT)) {
+		dapl_dbg_log (DAPL_DBG_TYPE_ERR, 
+			"%s() failed to create thread\n", __FUNCTION__);
+		return DAT_INTERNAL_ERROR;
+	}
+
 	/* create thread to process inbound connect request */
 	hca_ptr->ib_trans.cr_state = IB_THREAD_INIT;
 	dat_status = dapl_os_thread_create(cr_thread, 
@@ -199,6 +236,7 @@ DAT_RETURN dapli_close_sock_cm ( IN DAPL_HCA  *hca_ptr )
 
 	/* destroy cr_thread and lock */
 	hca_ptr->ib_trans.cr_state = IB_THREAD_CANCEL;
+
 	while (hca_ptr->ib_trans.cr_state != IB_THREAD_EXIT) {
 		dapl_dbg_log(DAPL_DBG_TYPE_UTIL, 
 			     " close_hca: waiting for cr_thread\n");






More information about the general mailing list