[ofa-general] [PATCH 5/5] [DAPL] dapl/ibal-scm: update ibal-scm provider
Sean Hefty
sean.hefty at intel.com
Fri Jan 30 10:59:17 PST 2009
From: Stan Smith <stan.smith at intel.com>
Update the dapl.git tree with the latest SVN version of the
ibal-scm provider.
Signed-off-by: Sean Hefty <sean.hefty at intel.com>
---
Actual coding changes were made by Stan. I'm just submitting the patch
to update the DAPL git repository.
dapl/ibal-scm/dapl_ibal-scm_cm.c | 775 +++++++++++++++++++++++++-----------
dapl/ibal-scm/dapl_ibal-scm_util.c | 44 ++
2 files changed, 584 insertions(+), 235 deletions(-)
diff --git a/dapl/ibal-scm/dapl_ibal-scm_cm.c b/dapl/ibal-scm/dapl_ibal-scm_cm.c
index df83008..6a050b8 100644
--- a/dapl/ibal-scm/dapl_ibal-scm_cm.c
+++ b/dapl/ibal-scm/dapl_ibal-scm_cm.c
@@ -63,6 +63,88 @@
#include <ws2tcpip.h>
#include <io.h>
+extern int g_scm_pipe[2];
+
+extern DAT_RETURN
+dapls_ib_query_gid( IN DAPL_HCA *hca_ptr,
+ IN GID *gid );
+
+
+static struct ib_cm_handle * dapli_cm_create(void)
+{
+ struct ib_cm_handle *cm_ptr;
+
+ /* Allocate CM, init lock, and initialize */
+ if ((cm_ptr = dapl_os_alloc(sizeof(*cm_ptr))) == NULL)
+ return NULL;
+
+ if (dapl_os_lock_init(&cm_ptr->lock))
+ goto bail;
+
+ (void)dapl_os_memzero(cm_ptr, sizeof(*cm_ptr));
+ cm_ptr->dst.ver = htons(DSCM_VER);
+ cm_ptr->socket = -1;
+ cm_ptr->l_socket = -1;
+ return cm_ptr;
+bail:
+ dapl_os_free(cm_ptr, sizeof(*cm_ptr));
+ return NULL;
+}
+
+
+/* mark for destroy, remove all references, schedule cleanup */
+
+static void dapli_cm_destroy(struct ib_cm_handle *cm_ptr)
+{
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " cm_destroy: cm %p ep %p\n", cm_ptr,cm_ptr->ep);
+
+ /* cleanup, never made it to work queue */
+ if (cm_ptr->state == SCM_INIT) {
+ if (cm_ptr->socket >= 0)
+ closesocket(cm_ptr->socket);
+ if (cm_ptr->l_socket >= 0)
+ closesocket(cm_ptr->l_socket);
+ dapl_os_free(cm_ptr, sizeof(*cm_ptr));
+ return;
+ }
+
+ dapl_os_lock(&cm_ptr->lock);
+ cm_ptr->state = SCM_DESTROY;
+ if (cm_ptr->ep)
+ cm_ptr->ep->cm_handle = IB_INVALID_HANDLE;
+
+ /* close socket if still active */
+ if (cm_ptr->socket >= 0) {
+ closesocket(cm_ptr->socket);
+ cm_ptr->socket = -1;
+ }
+ if (cm_ptr->l_socket >= 0) {
+ closesocket(cm_ptr->l_socket);
+ cm_ptr->l_socket = -1;
+ }
+ dapl_os_unlock(&cm_ptr->lock);
+
+ /* wakeup work thread */
+ _write(g_scm_pipe[1], "w", sizeof "w");
+}
+
+
+/* queue socket for processing CM work */
+static void dapli_cm_queue(struct ib_cm_handle *cm_ptr)
+{
+ /* add to work queue for cr thread processing */
+ dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&cm_ptr->entry);
+ dapl_os_lock( &cm_ptr->hca->ib_trans.lock );
+ dapl_llist_add_tail((DAPL_LLIST_HEAD*)&cm_ptr->hca->ib_trans.list,
+ (DAPL_LLIST_ENTRY*)&cm_ptr->entry, (void*)cm_ptr);
+ dapl_os_unlock(&cm_ptr->hca->ib_trans.lock);
+
+ /* wakeup CM work thread */
+ _write(g_scm_pipe[1], "w", sizeof "w");
+}
+
+
static uint16_t
dapli_get_lid(IN DAPL_HCA *hca, IN int port)
@@ -123,6 +205,263 @@ dapli_get_lid(IN DAPL_HCA *hca, IN int port)
/*
+ * ACTIVE/PASSIVE: called from CR thread or consumer via ep_disconnect
+ */
+static DAT_RETURN
+dapli_socket_disconnect(dp_ib_cm_handle_t cm_ptr)
+{
+ DAPL_EP *ep_ptr = cm_ptr->ep;
+ DAT_UINT32 disc_data = htonl(0xdead);
+
+ if (ep_ptr == NULL)
+ return DAT_SUCCESS;
+
+ dapl_os_lock(&cm_ptr->lock);
+ if ((cm_ptr->state == SCM_INIT) ||
+ (cm_ptr->state == SCM_DISCONNECTED)) {
+ dapl_os_unlock(&cm_ptr->lock);
+ return DAT_SUCCESS;
+ } else {
+ /* send disc date, close socket, schedule destroy */
+ if (cm_ptr->socket >= 0) {
+ send(cm_ptr->socket, (const char *)&disc_data,
+ sizeof(disc_data), 0);
+ closesocket(cm_ptr->socket);
+ cm_ptr->socket = -1;
+ }
+ cm_ptr->state = SCM_DISCONNECTED;
+ _write(g_scm_pipe[1], "w", sizeof "w");
+ }
+ dapl_os_unlock(&cm_ptr->lock);
+
+
+ if (ep_ptr->cr_ptr) {
+ dapls_cr_callback(cm_ptr,
+ IB_CME_DISCONNECTED,
+ NULL,
+ ((DAPL_CR *)ep_ptr->cr_ptr)->sp_ptr);
+ } else {
+ dapl_evd_connection_callback(ep_ptr->cm_handle,
+ IB_CME_DISCONNECTED,
+ NULL,
+ ep_ptr);
+ }
+
+ /* remove reference from endpoint */
+ ep_ptr->cm_handle = NULL;
+
+ /* schedule destroy */
+
+
+ return DAT_SUCCESS;
+}
+
+
+
+/*
+ * PASSIVE: consumer accept, send local QP information, private data,
+ * queue on work thread to receive RTU information to avoid blocking
+ * user thread.
+ */
+static DAT_RETURN
+dapli_socket_accept_usr( DAPL_EP *ep_ptr,
+ DAPL_CR *cr_ptr,
+ DAT_COUNT p_size,
+ DAT_PVOID p_data )
+{
+ DAPL_IA *ia_ptr = ep_ptr->header.owner_ia;
+ dp_ib_cm_handle_t cm_ptr = cr_ptr->ib_cm_handle;
+ WSABUF iovec[2];
+ int len, rc;
+ short rtu_data = 0;
+ ib_api_status_t ibs;
+ ib_qp_attr_t qpa;
+ dapl_ibal_port_t *p_port;
+ dapl_ibal_ca_t *p_ca;
+
+ dapl_dbg_log (DAPL_DBG_TYPE_EP, "%s() p_sz %d sock %d port 0x%x\n",
+ __FUNCTION__,p_size,cm_ptr->socket,
+ ia_ptr->hca_ptr->port_num);
+
+ if (p_size > IB_MAX_REP_PDATA_SIZE)
+ return DAT_LENGTH_ERROR;
+
+ /* must have a accepted socket */
+ if ( cm_ptr->socket < 0 ) {
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ "%s() Not accepted socket? remote port=0x%x lid=0x%x"
+ " qpn=0x%x psize=%d\n",
+ cm_ptr->dst.port, cm_ptr->dst.lid,
+ ntohs(cm_ptr->dst.qpn), cm_ptr->dst.p_size);
+ return DAT_INTERNAL_ERROR;
+ }
+
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ " accept_usr: remote port=0x%x lid=0x%x"
+ " qpn=0x%x psize=%d\n",
+ cm_ptr->dst.port, cm_ptr->dst.lid,
+ ntohs(cm_ptr->dst.qpn), cm_ptr->dst.p_size);
+
+ /* modify QP to RTR and then to RTS with remote info already read */
+
+ p_ca = (dapl_ibal_ca_t *) ia_ptr->hca_ptr->ib_hca_handle;
+ p_port = dapli_ibal_get_port (p_ca, (uint8_t)ia_ptr->hca_ptr->port_num);
+ if (!p_port)
+ {
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ "%s() dapli_ibal_get_port() failed @ line #%d\n",
+ __FUNCTION__,__LINE__);
+ goto bail;
+ }
+
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ "%s() DST: qpn 0x%x port 0x%x lid %x psize %d\n",
+ __FUNCTION__,
+ cl_ntoh32(cm_ptr->dst.qpn),
+ cm_ptr->dst.port,
+ cl_ntoh16(cm_ptr->dst.lid), cm_ptr->dst.p_size);
+
+ /* modify QP to RTR and then to RTS with remote info */
+
+ ibs = dapls_modify_qp_state_to_rtr( ep_ptr->qp_handle,
+ cm_ptr->dst.qpn,
+ cm_ptr->dst.lid,
+ p_port );
+ if (ibs != IB_SUCCESS)
+ {
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ "%s() QP --> RTR failed @ line #%d\n",
+ __FUNCTION__,__LINE__);
+ goto bail;
+ }
+
+ if ( dapls_modify_qp_state_to_rts( ep_ptr->qp_handle ) )
+ {
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ "%s() QP --> RTS failed @ line #%d\n",
+ __FUNCTION__,__LINE__);
+ goto bail;
+ }
+
+ ep_ptr->qp_state = IB_QP_STATE_RTS;
+
+ /* save remote address information */
+ dapl_os_memcpy( &ep_ptr->remote_ia_address,
+ &cm_ptr->dst.ia_address,
+ sizeof(ep_ptr->remote_ia_address));
+
+ /* determine QP & port numbers */
+ ibs = ib_query_qp(ep_ptr->qp_handle, &qpa);
+ if (ibs != IB_SUCCESS)
+ {
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ " ib_query_qp() ERR %s\n", ib_get_err_str(ibs));
+ goto bail;
+ }
+
+ /* Send our QP info, IA address, and private data */
+ cm_ptr->dst.qpn = qpa.num; /* ib_net32_t */
+ cm_ptr->dst.port = ia_ptr->hca_ptr->port_num;
+ cm_ptr->dst.lid = dapli_get_lid(ia_ptr->hca_ptr, ia_ptr->hca_ptr->port_num);
+ /* set gid in network order */
+ ibs = dapls_ib_query_gid( ia_ptr->hca_ptr, &cm_ptr->dst.gid );
+ if ( ibs != IB_SUCCESS )
+ {
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ "%s() dapls_ib_query_gid() returns '%s'\n",
+ __FUNCTION__,ib_get_err_str(ibs));
+ goto bail;
+ }
+
+ cm_ptr->dst.ia_address = ia_ptr->hca_ptr->hca_address;
+ cm_ptr->dst.p_size = p_size;
+
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ "%s()\n Tx QP info: qpn %x port 0x%x lid 0x%x p_sz %d IP %s\n",
+ __FUNCTION__, cl_ntoh32(cm_ptr->dst.qpn), cm_ptr->dst.port,
+ cl_ntoh16(cm_ptr->dst.lid), cm_ptr->dst.p_size,
+ dapli_get_ip_addr_str(&cm_ptr->dst.ia_address,NULL));
+
+ /* network byte-ordering - QPN & LID already are */
+ cm_ptr->dst.p_size = cl_hton32(cm_ptr->dst.p_size);
+ cm_ptr->dst.port = cl_hton16(cm_ptr->dst.port);
+
+ iovec[0].buf = (char*)&cm_ptr->dst;
+ iovec[0].len = sizeof(ib_qp_cm_t);
+ if (p_size) {
+ iovec[1].buf = p_data;
+ iovec[1].len = p_size;
+ }
+ rc = WSASend( cm_ptr->socket, iovec, (p_size ? 2:1), &len, 0, 0, 0 );
+ if (rc || len != (p_size + sizeof(ib_qp_cm_t))) {
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ " accept_usr: ERR %d, wcnt=%d\n",
+ WSAGetLastError(), len);
+ goto bail;
+ }
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " accept_usr: local port=0x%x lid=0x%x"
+ " qpn=0x%x psize=%d\n",
+ ntohs(cm_ptr->dst.port), ntohs(cm_ptr->dst.lid),
+ ntohl(cm_ptr->dst.qpn), ntohl(cm_ptr->dst.p_size));
+
+ /* save state and reference to EP, queue for RTU data */
+ cm_ptr->ep = ep_ptr;
+ cm_ptr->hca = ia_ptr->hca_ptr;
+ cm_ptr->state = SCM_ACCEPTED;
+
+ /* restore remote address information for query */
+ dapl_os_memcpy( &cm_ptr->dst.ia_address,
+ &ep_ptr->remote_ia_address,
+ sizeof(cm_ptr->dst.ia_address));
+
+ dapl_dbg_log( DAPL_DBG_TYPE_EP," PASSIVE: accepted!\n" );
+ dapli_cm_queue(cm_ptr);
+
+ return DAT_SUCCESS;
+
+bail:
+ dapl_dbg_log( DAPL_DBG_TYPE_ERR," accept_usr: ERR !QP_RTR_RTS \n");
+ dapli_cm_destroy(cm_ptr);
+ dapls_ib_reinit_ep( ep_ptr ); /* reset QP state */
+
+ return DAT_INTERNAL_ERROR;
+}
+
+
+/*
+ * PASSIVE: read RTU from active peer, post CONN event
+ */
+void
+dapli_socket_accept_rtu(dp_ib_cm_handle_t cm_ptr)
+{
+ int len;
+ short rtu_data = 0;
+
+ /* complete handshake after final QP state change */
+ len = recv(cm_ptr->socket, (char*)&rtu_data, sizeof(rtu_data), 0);
+ if ( len != sizeof(rtu_data) || ntohs(rtu_data) != 0x0e0f ) {
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ " accept_rtu: ERR %d, rcnt=%d rdata=%x\n",
+ WSAGetLastError(), len, ntohs(rtu_data) );
+ goto bail;
+ }
+
+ /* save state and reference to EP, queue for disc event */
+ cm_ptr->state = SCM_CONNECTED;
+
+ /* final data exchange if remote QP state is good to go */
+ dapl_dbg_log( DAPL_DBG_TYPE_EP," PASSIVE: connected!\n" );
+ dapls_cr_callback(cm_ptr, IB_CME_CONNECTED, NULL, cm_ptr->sp);
+ return;
+bail:
+ dapls_ib_reinit_ep(cm_ptr->ep); /* reset QP state */
+ dapli_cm_destroy(cm_ptr);
+ dapls_cr_callback(cm_ptr, IB_CME_DESTINATION_REJECT, NULL, cm_ptr->sp);
+}
+
+
+/*
* ACTIVE: Create socket, connect, and exchange QP information
*/
static DAT_RETURN
@@ -143,21 +482,16 @@ dapli_socket_connect ( DAPL_EP *ep_ptr,
dapl_ibal_port_t *p_port;
dapl_ibal_ca_t *p_ca;
- dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: r_qual %d\n", r_qual);
+ dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: r_qual %d psize %d\n",
+ r_qual, p_size);
- /*
- * Allocate CM and initialize
- */
- if ((cm_ptr = dapl_os_alloc(sizeof(*cm_ptr))) == NULL ) {
+ cm_ptr = dapli_cm_create();
+ if (cm_ptr == NULL)
return DAT_INSUFFICIENT_RESOURCES;
- }
-
- (void) dapl_os_memzero( cm_ptr, sizeof(*cm_ptr) );
- cm_ptr->socket = -1;
/* create, connect, sockopt, and exchange QP information */
if ((cm_ptr->socket = socket(AF_INET,SOCK_STREAM,0)) < 0 ) {
- dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
+ dapli_cm_destroy(cm_ptr);
return DAT_INSUFFICIENT_RESOURCES;
}
@@ -166,7 +500,7 @@ dapli_socket_connect ( DAPL_EP *ep_ptr,
if (connect(cm_ptr->socket, r_addr, sizeof(*r_addr)) == SOCKET_ERROR) {
dapl_dbg_log(DAPL_DBG_TYPE_ERR, " connect: %d on r_qual %d\n",
WSAGetLastError(), (unsigned int)r_qual);
- dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
+ dapli_cm_destroy(cm_ptr);
return DAT_INVALID_ADDRESS;
}
@@ -175,6 +509,8 @@ dapli_socket_connect ( DAPL_EP *ep_ptr,
(const char*)&opt,
sizeof(opt) );
+ dapl_dbg_log(DAPL_DBG_TYPE_EP, " socket connected!\n");
+
/* determine QP & port numbers */
ibs = ib_query_qp(ep_ptr->qp_handle, &qpa);
if (ibs != IB_SUCCESS)
@@ -187,7 +523,6 @@ dapli_socket_connect ( DAPL_EP *ep_ptr,
/* Send QP info, IA address and private data */
cm_ptr->dst.qpn = qpa.num; /* ib_net32_t */
cm_ptr->dst.port = cl_hton16(ia_ptr->hca_ptr->port_num);
-
cm_ptr->dst.lid = dapli_get_lid( ia_ptr->hca_ptr,
ia_ptr->hca_ptr->port_num );
if (cm_ptr->dst.lid == 0)
@@ -197,6 +532,17 @@ dapli_socket_connect ( DAPL_EP *ep_ptr,
__FUNCTION__, __LINE__);
goto bail;
}
+
+ /* set gid in network order */
+ ibs = dapls_ib_query_gid( ia_ptr->hca_ptr, &cm_ptr->dst.gid );
+ if ( ibs != IB_SUCCESS )
+ {
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ "%s() dapls_ib_query_gid() returns '%s'\n",
+ __FUNCTION__,ib_get_err_str(ibs));
+ goto bail;
+ }
+
cm_ptr->dst.ia_address = ia_ptr->hca_ptr->hca_address;
cm_ptr->dst.p_size = cl_hton32(p_size);
@@ -213,6 +559,8 @@ dapli_socket_connect ( DAPL_EP *ep_ptr,
iovec[1].buf = p_data;
iovec[1].len = p_size;
}
+
+ dapl_dbg_log(DAPL_DBG_TYPE_EP," socket connected, write QP and private data\n");
rc = WSASend (cm_ptr->socket, iovec, (p_size ? 2:1), &len, 0, 0, NULL);
if ( rc || len != (p_size + sizeof(ib_qp_cm_t)) ) {
dapl_dbg_log(DAPL_DBG_TYPE_ERR,
@@ -225,17 +573,65 @@ dapli_socket_connect ( DAPL_EP *ep_ptr,
cm_ptr->dst.port, cm_ptr->dst.lid,
cm_ptr->dst.qpn, cm_ptr->dst.p_size );
+ /* queue up to work thread to avoid blocking consumer */
+ cm_ptr->state = SCM_CONN_PENDING;
+ cm_ptr->hca = ia_ptr->hca_ptr;
+ cm_ptr->ep = ep_ptr;
+ dapli_cm_queue(cm_ptr);
+ return DAT_SUCCESS;
+bail:
+ /* close socket, free cm structure */
+ dapli_cm_destroy(cm_ptr);
+ return DAT_INTERNAL_ERROR;
+}
+
+
+/*
+ * ACTIVE: exchange QP information, called from CR thread
+ */
+void
+dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)
+{
+ DAPL_EP *ep_ptr = cm_ptr->ep;
+ DAPL_IA *ia_ptr = cm_ptr->ep->header.owner_ia;
+ int len, rc;
+ DWORD ioflags;
+ WSABUF iovec[1];
+ short rtu_data = htons(0x0E0F);
+ ib_cm_events_t event = IB_CME_DESTINATION_REJECT;
+ ib_api_status_t ibs;
+ dapl_ibal_port_t *p_port;
+ dapl_ibal_ca_t *p_ca;
+
/* read DST information into cm_ptr, overwrite SRC info */
+ dapl_dbg_log(DAPL_DBG_TYPE_EP," connect_rtu: recv peer QP data\n");
+
+ iovec[0].buf = (char*)&cm_ptr->dst;
+ iovec[0].len = sizeof(ib_qp_cm_t);
ioflags = len = 0;
rc = WSARecv (cm_ptr->socket, iovec, 1, &len, &ioflags, 0, 0);
- if ( rc == SOCKET_ERROR || len != sizeof(ib_qp_cm_t) ) {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,"connect read: ERR %d rcnt=%d\n",
- WSAGetLastError(), len);
+ if ( rc == SOCKET_ERROR || len != sizeof(ib_qp_cm_t) ||
+ ntohs(cm_ptr->dst.ver) != DSCM_VER )
+ {
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ "connect_rtu read: ERR %d rcnt=%d ver=%d\n",
+ WSAGetLastError(), len, cm_ptr->dst.ver);
+ goto bail;
+ }
+
+ /* check for consumer reject */
+ if (cm_ptr->dst.rej) {
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " connect_rtu read: PEER REJ reason=0x%x\n",
+ ntohs(cm_ptr->dst.rej));
+ event = IB_CME_DESTINATION_REJECT_PRIVATE_DATA;
goto bail;
}
- /* revert back to host byte ordering */
+ /* convert peer response values to host order */
cm_ptr->dst.port = cl_ntoh16(cm_ptr->dst.port);
+ cm_ptr->dst.lid = ntohs(cm_ptr->dst.lid);
+ cm_ptr->dst.qpn = cm_ptr->dst.qpn;
cm_ptr->dst.p_size = cl_ntoh32(cm_ptr->dst.p_size);
dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: Rx DST: qpn %x port %d "
@@ -245,15 +641,27 @@ dapli_socket_connect ( DAPL_EP *ep_ptr,
cm_ptr->dst.p_size,
dapli_get_ip_addr_str(&cm_ptr->dst.ia_address,NULL));
+ /* save remote address information */
+ dapl_os_memcpy( &ep_ptr->remote_ia_address,
+ &cm_ptr->dst.ia_address,
+ sizeof(ep_ptr->remote_ia_address));
+
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ " connect_rtu: DST %s port=0x%x lid=0x%x, qpn=0x%x, psize=%d\n",
+ inet_ntoa(((struct sockaddr_in *)&cm_ptr->dst.ia_address)->sin_addr),
+ cm_ptr->dst.port, cm_ptr->dst.lid,
+ cm_ptr->dst.qpn, cm_ptr->dst.p_size);
+
/* validate private data size before reading */
- if ( cm_ptr->dst.p_size > IB_MAX_REP_PDATA_SIZE ) {
+ if (cm_ptr->dst.p_size > IB_MAX_REP_PDATA_SIZE) {
dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " connect read: psize (%d) wrong\n",
+ " connect_rtu read: psize (%d) wrong\n",
cm_ptr->dst.p_size );
goto bail;
}
/* read private data into cm_handle if any present */
+ dapl_dbg_log(DAPL_DBG_TYPE_EP," socket connected, read private data\n");
if ( cm_ptr->dst.p_size ) {
iovec[0].buf = cm_ptr->p_data;
iovec[0].len = cm_ptr->dst.p_size;
@@ -300,32 +708,29 @@ dapli_socket_connect ( DAPL_EP *ep_ptr,
ep_ptr->qp_state = IB_QP_STATE_RTS;
+ dapl_dbg_log(DAPL_DBG_TYPE_EP," connect_rtu: send RTU\n");
+
/* complete handshake after final QP state change */
send(cm_ptr->socket, (const char *)&rtu_data, sizeof(rtu_data), 0);
/* init cm_handle and post the event with private data */
ep_ptr->cm_handle = cm_ptr;
+ cm_ptr->state = SCM_CONNECTED;
dapl_dbg_log( DAPL_DBG_TYPE_EP," ACTIVE: connected!\n" );
dapl_evd_connection_callback( ep_ptr->cm_handle,
IB_CME_CONNECTED,
cm_ptr->p_data,
ep_ptr );
- return DAT_SUCCESS;
-
+ return;
bail:
/* close socket, free cm structure and post error event */
- if ( cm_ptr->socket >= 0 )
- closesocket(cm_ptr->socket);
-
- dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
- dapls_ib_reinit_ep( ep_ptr ); /* reset QP state */
-
- dapl_evd_connection_callback( ep_ptr->cm_handle,
- IB_CME_LOCAL_FAILURE,
+ dapli_cm_destroy(cm_ptr);
+ dapls_ib_reinit_ep(ep_ptr); /* reset QP state */
+ dapl_evd_connection_callback( NULL /*ep_ptr->cm_handle*/,
+ event,
NULL,
ep_ptr );
- return DAT_INTERNAL_ERROR;
}
@@ -347,14 +752,12 @@ dapli_socket_listen ( DAPL_IA *ia_ptr,
ia_ptr, serviceID, sp_ptr);
/* Allocate CM and initialize */
- if ((cm_ptr = dapl_os_alloc(sizeof(*cm_ptr))) == NULL)
+ cm_ptr = dapli_cm_create();
+ if (cm_ptr == NULL)
return DAT_INSUFFICIENT_RESOURCES;
- (void) dapl_os_memzero( cm_ptr, sizeof( *cm_ptr ) );
-
- cm_ptr->socket = cm_ptr->l_socket = -1;
cm_ptr->sp = sp_ptr;
- cm_ptr->hca_ptr = ia_ptr->hca_ptr;
+ cm_ptr->hca = ia_ptr->hca_ptr;
/* bind, listen, set sockopt, accept, exchange data */
if ((cm_ptr->l_socket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
@@ -406,12 +809,9 @@ dapli_socket_listen ( DAPL_IA *ia_ptr,
/* set cm_handle for this service point, save listen socket */
sp_ptr->cm_srvc_handle = cm_ptr;
- /* add to SP->CR thread list */
- dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&cm_ptr->entry);
- dapl_os_lock( &cm_ptr->hca_ptr->ib_trans.lock );
- dapl_llist_add_tail((DAPL_LLIST_HEAD*)&cm_ptr->hca_ptr->ib_trans.list,
- (DAPL_LLIST_ENTRY*)&cm_ptr->entry, (void*)cm_ptr);
- dapl_os_unlock(&cm_ptr->hca_ptr->ib_trans.lock);
+ /* queue up listen socket to process inbound CR's */
+ cm_ptr->state = SCM_LISTEN;
+ dapli_cm_queue(cm_ptr);
dapl_dbg_log( DAPL_DBG_TYPE_CM,
" listen: qual 0x%x cr %p s_fd %d\n",
@@ -421,10 +821,7 @@ dapli_socket_listen ( DAPL_IA *ia_ptr,
bail:
dapl_dbg_log( DAPL_DBG_TYPE_CM,
" listen: ERROR on conn_qual 0x%x\n",serviceID);
- if ( cm_ptr->l_socket >= 0 )
- closesocket( cm_ptr->l_socket );
-
- dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
+ dapli_cm_destroy(cm_ptr);
return dat_status;
}
@@ -441,6 +838,8 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
int len;
DAT_RETURN dat_status = DAT_SUCCESS;
+ dapl_dbg_log(DAPL_DBG_TYPE_EP," socket_accept\n");
+
/* Allocate accept CM and initialize */
if ((acm_ptr = dapl_os_alloc(sizeof(*acm_ptr))) == NULL)
return DAT_INSUFFICIENT_RESOURCES;
@@ -448,8 +847,9 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
(void) dapl_os_memzero( acm_ptr, sizeof( *acm_ptr ) );
acm_ptr->socket = -1;
+ acm_ptr->l_socket = -1;
acm_ptr->sp = cm_ptr->sp;
- acm_ptr->hca_ptr = cm_ptr->hca_ptr;
+ acm_ptr->hca = cm_ptr->hca;
len = sizeof(acm_ptr->dst.ia_address);
acm_ptr->socket = accept(cm_ptr->l_socket,
@@ -464,27 +864,32 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
goto bail;
}
+ dapl_dbg_log(DAPL_DBG_TYPE_EP," socket accepted, read QP data\n");
+
/* read in DST QP info, IA address. check for private data */
len = recv(acm_ptr->socket,(char*)&acm_ptr->dst,sizeof(ib_qp_cm_t),0);
- if ( len != sizeof(ib_qp_cm_t) ) {
+ if ( len != sizeof(ib_qp_cm_t) || ntohs(acm_ptr->dst.ver) != DSCM_VER )
+ {
dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " accept read: ERR %d, rcnt=%d\n",
- WSAGetLastError(), len);
+ " accept read: ERR %d, rcnt=%d ver=%d\n",
+ WSAGetLastError(), len, acm_ptr->dst.ver);
dat_status = DAT_INTERNAL_ERROR;
goto bail;
}
- /* revert back to host byte ordering */
+ /* convert accepted values to host byte ordering */
acm_ptr->dst.port = cl_ntoh16(acm_ptr->dst.port);
+ acm_ptr->dst.lid = ntohs(acm_ptr->dst.lid);
+ acm_ptr->dst.qpn = acm_ptr->dst.qpn;
acm_ptr->dst.p_size = cl_ntoh32(acm_ptr->dst.p_size);
- dapl_dbg_log(DAPL_DBG_TYPE_EP, " accept: DST sizeof(ib_cm_t) %d qpn %x "
- "port %d lid 0x%x psize %d IP %s\n",
- sizeof(ib_qp_cm_t),
- cl_ntoh32(acm_ptr->dst.qpn), acm_ptr->dst.port,
+ dapl_dbg_log(DAPL_DBG_TYPE_EP, " accept: DST %s port 0x%x "
+ "lid 0x%x qpn 0x%x psize %d\n",
+ dapli_get_ip_addr_str(&acm_ptr->dst.ia_address,NULL),
+ acm_ptr->dst.port,
cl_ntoh16(acm_ptr->dst.lid),
- acm_ptr->dst.p_size,
- dapli_get_ip_addr_str(&acm_ptr->dst.ia_address,NULL));
+ cl_ntoh32(acm_ptr->dst.qpn),
+ acm_ptr->dst.p_size);
/* validate private data size before reading */
if ( acm_ptr->dst.p_size > IB_MAX_REQ_PDATA_SIZE ) {
@@ -495,6 +900,8 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
goto bail;
}
+ dapl_dbg_log(DAPL_DBG_TYPE_EP," socket accepted, read private data\n");
+
/* read private data into cm_handle if any present */
if ( acm_ptr->dst.p_size ) {
len = recv( acm_ptr->socket,
@@ -514,6 +921,8 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
p_data = acm_ptr->p_data;
}
+ acm_ptr->state = SCM_ACCEPTING;
+
/* trigger CR event and return SUCCESS */
dapls_cr_callback( acm_ptr,
IB_CME_CONNECTION_REQUEST_PENDING,
@@ -521,153 +930,8 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
acm_ptr->sp );
return DAT_SUCCESS;
-
-bail:
- if ( acm_ptr->socket >= 0 )
- closesocket( acm_ptr->socket );
-
- dapl_os_free( acm_ptr, sizeof( *acm_ptr ) );
- return DAT_INTERNAL_ERROR;
-}
-
-
-static DAT_RETURN
-dapli_socket_accept_final( DAPL_EP *ep_ptr,
- DAPL_CR *cr_ptr,
- DAT_COUNT p_size,
- DAT_PVOID p_data )
-{
- DAPL_IA *ia_ptr = ep_ptr->header.owner_ia;
- dp_ib_cm_handle_t cm_ptr = cr_ptr->ib_cm_handle;
- ib_qp_cm_t qp_cm;
- WSABUF iovec[2];
- int len, rc;
- short rtu_data = 0;
- ib_api_status_t ibs;
- ib_qp_attr_t qpa;
- dapl_ibal_port_t *p_port;
- dapl_ibal_ca_t *p_ca;
-
- dapl_dbg_log (DAPL_DBG_TYPE_EP, "%s() p_sz %d sock %d port %d\n",
- __FUNCTION__,p_size,cm_ptr->socket,
- ia_ptr->hca_ptr->port_num);
-
- if (p_size > IB_MAX_REP_PDATA_SIZE)
- return DAT_LENGTH_ERROR;
-
- /* must have a accepted socket */
- if ( cm_ptr->socket < 0 )
- return DAT_INTERNAL_ERROR;
-
- /* modify QP to RTR and then to RTS with remote info already read */
-
- p_ca = (dapl_ibal_ca_t *) ia_ptr->hca_ptr->ib_hca_handle;
- p_port = dapli_ibal_get_port (p_ca, (uint8_t)ia_ptr->hca_ptr->port_num);
- if (!p_port)
- {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- "%s() dapli_ibal_get_port() failed @ line #%d\n",
- __FUNCTION__,__LINE__);
- goto bail;
- }
-
- dapl_dbg_log(DAPL_DBG_TYPE_EP, "%s() DST: qpn %x port %d lid %x\n",
- __FUNCTION__,
- cl_ntoh32(cm_ptr->dst.qpn),
- cm_ptr->dst.port,
- cl_ntoh16(cm_ptr->dst.lid));
-
- /* modify QP to RTR and then to RTS with remote info */
-
- ibs = dapls_modify_qp_state_to_rtr( ep_ptr->qp_handle,
- cm_ptr->dst.qpn,
- cm_ptr->dst.lid,
- p_port );
- if (ibs != IB_SUCCESS)
- {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- "%s() QP --> RTR failed @ line #%d\n",
- __FUNCTION__,__LINE__);
- goto bail;
- }
-
- if ( dapls_modify_qp_state_to_rts( ep_ptr->qp_handle ) )
- {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- "%s() QP --> RTS failed @ line #%d\n",
- __FUNCTION__,__LINE__);
- goto bail;
- }
-
- ep_ptr->qp_state = IB_QP_STATE_RTS;
-
- /* determine QP & port numbers */
- ibs = ib_query_qp(ep_ptr->qp_handle, &qpa);
- if (ibs != IB_SUCCESS)
- {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " ib_query_qp() ERR %s\n", ib_get_err_str(ibs));
- goto bail;
- }
-
- /* Send QP info, IA address, and private data */
- qp_cm.qpn = qpa.num; /* ib_net32_t */
- qp_cm.port = ia_ptr->hca_ptr->port_num;
- qp_cm.lid = dapli_get_lid( ia_ptr->hca_ptr, ia_ptr->hca_ptr->port_num );
- qp_cm.ia_address = ia_ptr->hca_ptr->hca_address;
- qp_cm.p_size = p_size;
-
- dapl_dbg_log(DAPL_DBG_TYPE_CM,
- "%s()\n Tx QP info: qpn %x port %d lid 0x%x p_sz %d IP %s\n",
- __FUNCTION__, cl_ntoh32(qp_cm.qpn), qp_cm.port,
- cl_ntoh16(qp_cm.lid), qp_cm.p_size,
- dapli_get_ip_addr_str(&qp_cm.ia_address,NULL));
-
- /* network byte-ordering - QPN & LID already are */
- qp_cm.p_size = cl_hton32(qp_cm.p_size);
- qp_cm.port = cl_hton16(qp_cm.port);
-
- iovec[0].buf = (char*)&qp_cm;
- iovec[0].len = sizeof(ib_qp_cm_t);
- if (p_size) {
- iovec[1].buf = p_data;
- iovec[1].len = p_size;
- }
- rc = WSASend( cm_ptr->socket, iovec, (p_size ? 2:1), &len, 0, 0, 0 );
- if (rc || len != (p_size + sizeof(ib_qp_cm_t))) {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " accept_final: ERR %d, wcnt=%d\n",
- WSAGetLastError(), len);
- goto bail;
- }
- dapl_dbg_log(DAPL_DBG_TYPE_EP,
- " accept_final: SRC qpn %x port %d lid 0x%x psize %d\n",
- qp_cm.qpn, qp_cm.port, qp_cm.lid, qp_cm.p_size );
-
- /* complete handshake after final QP state change */
- len = recv(cm_ptr->socket, (char*)&rtu_data, sizeof(rtu_data), 0);
- if ( len != sizeof(rtu_data) || ntohs(rtu_data) != 0x0e0f ) {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " accept_final: ERR %d, rcnt=%d rdata=%x\n",
- WSAGetLastError(), len, ntohs(rtu_data) );
- goto bail;
- }
-
- /* final data exchange if remote QP state is good to go */
- dapl_dbg_log( DAPL_DBG_TYPE_EP," PASSIVE: connected!\n" );
-
- dapls_cr_callback ( cm_ptr, IB_CME_CONNECTED, NULL, cm_ptr->sp );
-
- return DAT_SUCCESS;
-
bail:
- dapl_dbg_log( DAPL_DBG_TYPE_ERR," accept_final: ERR !QP_RTR_RTS \n");
- if ( cm_ptr->socket >= 0 )
- closesocket( cm_ptr->socket );
-
- dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
- dapls_ib_reinit_ep( ep_ptr ); /* reset QP state */
-
+ dapli_cm_destroy(acm_ptr);
return DAT_INTERNAL_ERROR;
}
@@ -747,11 +1011,7 @@ dapls_ib_disconnect (
dapl_dbg_log (DAPL_DBG_TYPE_EP,
"dapls_ib_disconnect(ep_handle %p ....)\n", ep_ptr);
- if ( cm_ptr->socket >= 0 ) {
- closesocket( cm_ptr->socket );
- cm_ptr->socket = -1;
- }
-
+#if 0 // XXX
/* disconnect QP ala transition to RESET state */
ib_status = dapls_modify_qp_state_to_reset (ep_ptr->qp_handle);
@@ -776,15 +1036,18 @@ dapls_ib_disconnect (
NULL,
ep_ptr );
ep_ptr->cm_handle = NULL;
- dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
}
-
+#endif
/* modify QP state --> INIT */
dapls_ib_reinit_ep(ep_ptr);
+ if (cm_ptr == NULL)
return DAT_SUCCESS;
+ else
+ return dapli_socket_disconnect(cm_ptr);
}
+
/*
* dapls_ib_disconnect_clean
*
@@ -874,14 +1137,20 @@ dapls_ib_remove_conn_listener (
if ( cm_ptr != NULL ) {
if ( cm_ptr->l_socket >= 0 ) {
closesocket( cm_ptr->l_socket );
+ cm_ptr->l_socket = -1;
+ }
+ if ( cm_ptr->socket >= 0 ) {
+ closesocket( cm_ptr->socket );
cm_ptr->socket = -1;
}
/* cr_thread will free */
sp_ptr->cm_srvc_handle = NULL;
+ _write(g_scm_pipe[1], "w", sizeof "w");
}
return DAT_SUCCESS;
}
+
/*
* dapls_ib_accept_connection
*
@@ -928,7 +1197,7 @@ dapls_ib_accept_connection (
return status;
}
- return ( dapli_socket_accept_final(ep_ptr, cr_ptr, p_size, p_data) );
+ return ( dapli_socket_accept_usr(ep_ptr, cr_ptr, p_size, p_data) );
}
@@ -948,27 +1217,39 @@ dapls_ib_accept_connection (
* DAT_INTERNAL_ERROR
*
*/
+
DAT_RETURN
dapls_ib_reject_connection (
- IN dp_ib_cm_handle_t ib_cm_handle,
+ IN dp_ib_cm_handle_t cm_ptr,
IN int reject_reason,
- IN DAT_COUNT private_data_size,
- IN const DAT_PVOID private_data)
+ IN DAT_COUNT psize,
+ IN const DAT_PVOID pdata)
{
- ib_cm_srvc_handle_t cm_ptr = ib_cm_handle;
+ WSABUF iovec[1];
+ int len;
dapl_dbg_log (DAPL_DBG_TYPE_EP,
- "dapls_ib_reject_connection(cm_handle %p reason %x)\n",
- ib_cm_handle, reject_reason );
-
- /* just close the socket and return */
- if ( cm_ptr->socket > 0 ) {
- closesocket( cm_ptr->socket );
+ " reject(cm %p reason %x pdata %p psize %d)\n",
+ cm_ptr, reject_reason, pdata, psize );
+
+ /* write reject data to indicate reject */
+ if (cm_ptr->socket >= 0) {
+ cm_ptr->dst.rej = (uint16_t)reject_reason;
+ cm_ptr->dst.rej = cl_hton16(cm_ptr->dst.rej);
+ iovec[0].buf = (char*)&cm_ptr->dst;
+ iovec[0].len = sizeof(ib_qp_cm_t);
+ (void) WSASend (cm_ptr->socket, iovec, 1, &len, 0, 0, NULL);
+ closesocket(cm_ptr->socket);
cm_ptr->socket = -1;
}
+
+ /* cr_thread will destroy CR */
+ cm_ptr->state = SCM_REJECTED;
+ _write(g_scm_pipe[1], "w", sizeof "w");
return DAT_SUCCESS;
}
+
/*
* dapls_ib_cm_remote_addr
*
@@ -1157,7 +1438,7 @@ dapls_ib_get_dat_event (
/*
- * dapls_ib_get_dat_event
+ * dapls_ib_get_cm_event
*
* Return a DAT connection event given a provider CM event.
*
@@ -1189,12 +1470,16 @@ dapls_ib_get_cm_event (
}
#endif /* NOT_USED */
-/* async CR processing thread to avoid blocking applications */
+/* outbound/inbound CR processing thread to avoid blocking applications */
+
+#define SCM_MAX_CONN (8 * sizeof(fd_set))
+
void cr_thread(void *arg)
{
struct dapl_hca *hca_ptr = arg;
ib_cm_srvc_handle_t cr, next_cr;
int max_fd, rc;
+ char rbuf[2];
fd_set rfd, rfds;
struct timeval to;
@@ -1202,10 +1487,12 @@ void cr_thread(void *arg)
dapl_os_lock( &hca_ptr->ib_trans.lock );
hca_ptr->ib_trans.cr_state = IB_THREAD_RUN;
+
while (hca_ptr->ib_trans.cr_state == IB_THREAD_RUN) {
FD_ZERO( &rfds );
- max_fd = -1;
+ FD_SET(g_scm_pipe[0], &rfds);
+ max_fd = g_scm_pipe[0];
if (!dapl_llist_is_empty((DAPL_LLIST_HEAD*)&hca_ptr->ib_trans.list))
next_cr = dapl_llist_peek_head((DAPL_LLIST_HEAD*)
@@ -1230,32 +1517,46 @@ void cr_thread(void *arg)
continue;
}
- FD_SET( cr->l_socket, &rfds ); /* add to select set */
- if ( cr->l_socket > max_fd )
+ if (cr->socket > SCM_MAX_CONN-1) {
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ "SCM ERR: cr->socket(%d) exceeded FD_SETSIZE %d\n",
+ cr->socket,SCM_MAX_CONN-1);
+ continue;
+ }
+ FD_SET( cr->socket, &rfds ); /* add to select SET */
+ if ( cr->socket > max_fd )
max_fd = cr->l_socket;
/* individual select poll to check for work */
FD_ZERO(&rfd);
- FD_SET(cr->l_socket, &rfd);
+ FD_SET(cr->socket, &rfd);
dapl_os_unlock(&hca_ptr->ib_trans.lock);
to.tv_sec = 0;
to.tv_usec = 0; /* wakeup and check destroy */
/* block waiting for Rx data */
- if (select(cr->l_socket+1,&rfd,NULL,NULL,&to) == SOCKET_ERROR) {
+ if (select(cr->socket+1,&rfd,NULL,NULL,&to) == SOCKET_ERROR) {
rc = WSAGetLastError();
if ( rc != SOCKET_ERROR /*WSAENOTSOCK*/ )
{
dapl_dbg_log (DAPL_DBG_TYPE_ERR/*CM*/,
" thread: select(sock %d) ERR %d on cr %p\n",
- cr->l_socket, rc, cr);
+ cr->socket, rc, cr);
+ }
+ closesocket(cr->socket);
+ cr->socket = -1;
+ } else if (FD_ISSET(cr->socket,&rfd)) {
+ if (cr->socket > 0) {
+ if (cr->state == SCM_LISTEN)
+ dapli_socket_accept(cr);
+ else if (cr->state == SCM_ACCEPTED)
+ dapli_socket_accept_rtu(cr);
+ else if (cr->state == SCM_CONN_PENDING)
+ dapli_socket_connect_rtu(cr);
+ else if (cr->state == SCM_CONNECTED)
+ dapli_socket_disconnect(cr);
}
- closesocket(cr->l_socket);
- cr->l_socket = -1;
- } else if (FD_ISSET(cr->l_socket,&rfd) && dapli_socket_accept(cr)) {
- closesocket(cr->l_socket);
- cr->l_socket = -1;
}
dapl_os_lock( &hca_ptr->ib_trans.lock );
next_cr = dapl_llist_next_entry((DAPL_LLIST_HEAD*)
@@ -1263,9 +1564,19 @@ void cr_thread(void *arg)
(DAPL_LLIST_ENTRY*)&cr->entry );
}
dapl_os_unlock( &hca_ptr->ib_trans.lock );
+
to.tv_sec = 0;
to.tv_usec = 100000; /* wakeup and check destroy */
+
(void) select(max_fd+1, &rfds, NULL, NULL, &to);
+
+ /* if pipe data consume - used to wake this thread up */
+ if (FD_ISSET(g_scm_pipe[0],&rfds)) {
+ dapl_dbg_log(DAPL_DBG_TYPE_CM," cr_thread() read pipe data\n");
+printf(" cr_thread() read pipe data\n");
+ _read(g_scm_pipe[0], rbuf, 2);
+printf(" cr_thread() Finished read pipe data\n");
+ }
dapl_os_lock( &hca_ptr->ib_trans.lock );
}
dapl_os_unlock( &hca_ptr->ib_trans.lock );
diff --git a/dapl/ibal-scm/dapl_ibal-scm_util.c b/dapl/ibal-scm/dapl_ibal-scm_util.c
index 8e5f8ac..06bc704 100644
--- a/dapl/ibal-scm/dapl_ibal-scm_util.c
+++ b/dapl/ibal-scm/dapl_ibal-scm_util.c
@@ -52,6 +52,7 @@ static const char rcsid[] = "$Id: $";
#include "dapl.h"
#include "dapl_adapter_util.h"
#include "dapl_ibal_util.h"
+#include "dapl_ibal_name_service.h"
#include <stdio.h>
#include <stdlib.h>
@@ -61,9 +62,12 @@ static const char rcsid[] = "$Id: $";
#include <winsock2.h>
#include <ws2tcpip.h>
#include <io.h>
+#include <fcntl.h>
+extern void cr_thread(void *arg);
int g_dapl_loopback_connection = 0;
+int g_scm_pipe[2];
#ifdef NOT_USED
@@ -132,22 +136,55 @@ DAT_RETURN dapli_init_sock_cm ( IN DAPL_HCA *hca_ptr )
dapl_dbg_log (DAPL_DBG_TYPE_UTIL, " %s(): %p\n",__FUNCTION__,hca_ptr );
- /* set inline max with enviroment or default */
+ /* set RC tunables via enviroment or default */
hca_ptr->ib_trans.max_inline_send =
- dapl_os_get_env_val ( "DAPL_MAX_INLINE", INLINE_SEND_DEFAULT );
+ dapl_os_get_env_val("DAPL_MAX_INLINE", INLINE_SEND_DEFAULT);
+#if 0
+ hca_ptr->ib_trans.ack_retry =
+ dapl_os_get_env_val("DAPL_ACK_RETRY", SCM_ACK_RETRY);
+ hca_ptr->ib_trans.ack_timer =
+ dapl_os_get_env_val("DAPL_ACK_TIMER", SCM_ACK_TIMER);
+ hca_ptr->ib_trans.rnr_retry =
+ dapl_os_get_env_val("DAPL_RNR_RETRY", SCM_RNR_RETRY);
+ hca_ptr->ib_trans.rnr_timer =
+ dapl_os_get_env_val("DAPL_RNR_TIMER", SCM_RNR_TIMER);
+ hca_ptr->ib_trans.global =
+ dapl_os_get_env_val("DAPL_GLOBAL_ROUTING", SCM_GLOBAL);
+ hca_ptr->ib_trans.hop_limit =
+ dapl_os_get_env_val("DAPL_HOP_LIMIT", SCM_HOP_LIMIT);
+ hca_ptr->ib_trans.tclass =
+ dapl_os_get_env_val("DAPL_TCLASS", SCM_TCLASS);
+#endif
/* initialize cr_list lock */
dat_status = dapl_os_lock_init(&hca_ptr->ib_trans.lock);
if (dat_status != DAT_SUCCESS)
{
dapl_dbg_log (DAPL_DBG_TYPE_ERR,
- " open_hca: failed to init lock\n");
+ "%s() failed to init cr_list lock\n", __FUNCTION__);
return DAT_INTERNAL_ERROR;
}
+#if 0
+ /* initialize cq_lock */
+ dat_status = dapl_os_lock_init(&hca_ptr->ib_trans.cq_lock);
+ if (dat_status != DAT_SUCCESS) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ "%s() failed to init cq_lock\n", __FUNCTION__);
+ return DAT_INTERNAL_ERROR;
+ }
+#endif
+
/* initialize CM list for listens on this HCA */
dapl_llist_init_head((DAPL_LLIST_HEAD*)&hca_ptr->ib_trans.list);
+ /* create pipe communication endpoints */
+ if (_pipe(g_scm_pipe, 256, O_TEXT)) {
+ dapl_dbg_log (DAPL_DBG_TYPE_ERR,
+ "%s() failed to create thread\n", __FUNCTION__);
+ return DAT_INTERNAL_ERROR;
+ }
+
/* create thread to process inbound connect request */
hca_ptr->ib_trans.cr_state = IB_THREAD_INIT;
dat_status = dapl_os_thread_create(cr_thread,
@@ -199,6 +236,7 @@ DAT_RETURN dapli_close_sock_cm ( IN DAPL_HCA *hca_ptr )
/* destroy cr_thread and lock */
hca_ptr->ib_trans.cr_state = IB_THREAD_CANCEL;
+
while (hca_ptr->ib_trans.cr_state != IB_THREAD_EXIT) {
dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
" close_hca: waiting for cr_thread\n");
More information about the general
mailing list