[ofa-general] RE: [ofw] [PATCH 5/5] [DAPL] dapl/ibal-scm: update ibal-scm provider
Davis, Arlin R
arlin.r.davis at intel.com
Fri Jan 30 14:34:38 PST 2009
Applied all 5 patches. Thanks.
-arlin
>-----Original Message-----
>From: ofw-bounces at lists.openfabrics.org
>[mailto:ofw-bounces at lists.openfabrics.org] On Behalf Of Sean Hefty
>Sent: Friday, January 30, 2009 10:59 AM
>To: Hefty, Sean; OpenIB; ofw at lists.openfabrics.org
>Subject: [ofw] [PATCH 5/5] [DAPL] dapl/ibal-scm: update
>ibal-scm provider
>
>From: Stan Smith <stan.smith at intel.com>
>
>Update the dapl.git tree with the latest SVN version of the
>ibal-scm provider.
>
>Signed-off-by: Sean Hefty <sean.hefty at intel.com>
>---
>Actual codng changes were made by Stan. I'm just submitting the patch
>to update the DAPL git repository.
>
> dapl/ibal-scm/dapl_ibal-scm_cm.c | 775
>+++++++++++++++++++++++++-----------
> dapl/ibal-scm/dapl_ibal-scm_util.c | 44 ++
> 2 files changed, 584 insertions(+), 235 deletions(-)
>
>diff --git a/dapl/ibal-scm/dapl_ibal-scm_cm.c
>b/dapl/ibal-scm/dapl_ibal-scm_cm.c
>index df83008..6a050b8 100644
>--- a/dapl/ibal-scm/dapl_ibal-scm_cm.c
>+++ b/dapl/ibal-scm/dapl_ibal-scm_cm.c
>@@ -63,6 +63,88 @@
> #include <ws2tcpip.h>
> #include <io.h>
>
>+extern int g_scm_pipe[2];
>+
>+extern DAT_RETURN
>+dapls_ib_query_gid( IN DAPL_HCA *hca_ptr,
>+ IN GID *gid );
>+
>+
>+static struct ib_cm_handle * dapli_cm_create(void)
>+{
>+ struct ib_cm_handle *cm_ptr;
>+
>+ /* Allocate CM, init lock, and initialize */
>+ if ((cm_ptr = dapl_os_alloc(sizeof(*cm_ptr))) == NULL)
>+ return NULL;
>+
>+ if (dapl_os_lock_init(&cm_ptr->lock))
>+ goto bail;
>+
>+ (void)dapl_os_memzero(cm_ptr, sizeof(*cm_ptr));
>+ cm_ptr->dst.ver = htons(DSCM_VER);
>+ cm_ptr->socket = -1;
>+ cm_ptr->l_socket = -1;
>+ return cm_ptr;
>+bail:
>+ dapl_os_free(cm_ptr, sizeof(*cm_ptr));
>+ return NULL;
>+}
>+
>+
>+/* mark for destroy, remove all references, schedule cleanup */
>+
>+static void dapli_cm_destroy(struct ib_cm_handle *cm_ptr)
>+{
>+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
>+ " cm_destroy: cm %p ep %p\n", cm_ptr,cm_ptr->ep);
>+
>+ /* cleanup, never made it to work queue */
>+ if (cm_ptr->state == SCM_INIT) {
>+ if (cm_ptr->socket >= 0)
>+ closesocket(cm_ptr->socket);
>+ if (cm_ptr->l_socket >= 0)
>+ closesocket(cm_ptr->l_socket);
>+ dapl_os_free(cm_ptr, sizeof(*cm_ptr));
>+ return;
>+ }
>+
>+ dapl_os_lock(&cm_ptr->lock);
>+ cm_ptr->state = SCM_DESTROY;
>+ if (cm_ptr->ep)
>+ cm_ptr->ep->cm_handle = IB_INVALID_HANDLE;
>+
>+ /* close socket if still active */
>+ if (cm_ptr->socket >= 0) {
>+ closesocket(cm_ptr->socket);
>+ cm_ptr->socket = -1;
>+ }
>+ if (cm_ptr->l_socket >= 0) {
>+ closesocket(cm_ptr->l_socket);
>+ cm_ptr->l_socket = -1;
>+ }
>+ dapl_os_unlock(&cm_ptr->lock);
>+
>+ /* wakeup work thread */
>+ _write(g_scm_pipe[1], "w", sizeof "w");
>+}
>+
>+
>+/* queue socket for processing CM work */
>+static void dapli_cm_queue(struct ib_cm_handle *cm_ptr)
>+{
>+ /* add to work queue for cr thread processing */
>+ dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&cm_ptr->entry);
>+ dapl_os_lock( &cm_ptr->hca->ib_trans.lock );
>+
>dapl_llist_add_tail((DAPL_LLIST_HEAD*)&cm_ptr->hca->ib_trans.list,
>+ (DAPL_LLIST_ENTRY*)&cm_ptr->entry,
>(void*)cm_ptr);
>+ dapl_os_unlock(&cm_ptr->hca->ib_trans.lock);
>+
>+ /* wakeup CM work thread */
>+ _write(g_scm_pipe[1], "w", sizeof "w");
>+}
>+
>+
>
> static uint16_t
> dapli_get_lid(IN DAPL_HCA *hca, IN int port)
>@@ -123,6 +205,263 @@ dapli_get_lid(IN DAPL_HCA *hca, IN int port)
>
>
> /*
>+ * ACTIVE/PASSIVE: called from CR thread or consumer via ep_disconnect
>+ */
>+static DAT_RETURN
>+dapli_socket_disconnect(dp_ib_cm_handle_t cm_ptr)
>+{
>+ DAPL_EP *ep_ptr = cm_ptr->ep;
>+ DAT_UINT32 disc_data = htonl(0xdead);
>+
>+ if (ep_ptr == NULL)
>+ return DAT_SUCCESS;
>+
>+ dapl_os_lock(&cm_ptr->lock);
>+ if ((cm_ptr->state == SCM_INIT) ||
>+ (cm_ptr->state == SCM_DISCONNECTED)) {
>+ dapl_os_unlock(&cm_ptr->lock);
>+ return DAT_SUCCESS;
>+ } else {
>+ /* send disc date, close socket, schedule destroy */
>+ if (cm_ptr->socket >= 0) {
>+ send(cm_ptr->socket, (const char *)&disc_data,
>+ sizeof(disc_data), 0);
>+ closesocket(cm_ptr->socket);
>+ cm_ptr->socket = -1;
>+ }
>+ cm_ptr->state = SCM_DISCONNECTED;
>+ _write(g_scm_pipe[1], "w", sizeof "w");
>+ }
>+ dapl_os_unlock(&cm_ptr->lock);
>+
>+
>+ if (ep_ptr->cr_ptr) {
>+ dapls_cr_callback(cm_ptr,
>+ IB_CME_DISCONNECTED,
>+ NULL,
>+ ((DAPL_CR *)ep_ptr->cr_ptr)->sp_ptr);
>+ } else {
>+ dapl_evd_connection_callback(ep_ptr->cm_handle,
>+ IB_CME_DISCONNECTED,
>+ NULL,
>+ ep_ptr);
>+ }
>+
>+ /* remove reference from endpoint */
>+ ep_ptr->cm_handle = NULL;
>+
>+ /* schedule destroy */
>+
>+
>+ return DAT_SUCCESS;
>+}
>+
>+
>+
>+/*
>+ * PASSIVE: consumer accept, send local QP information, private data,
>+ * queue on work thread to receive RTU information to avoid blocking
>+ * user thread.
>+ */
>+static DAT_RETURN
>+dapli_socket_accept_usr( DAPL_EP *ep_ptr,
>+ DAPL_CR *cr_ptr,
>+ DAT_COUNT p_size,
>+ DAT_PVOID p_data )
>+{
>+ DAPL_IA *ia_ptr = ep_ptr->header.owner_ia;
>+ dp_ib_cm_handle_t cm_ptr = cr_ptr->ib_cm_handle;
>+ WSABUF iovec[2];
>+ int len, rc;
>+ short rtu_data = 0;
>+ ib_api_status_t ibs;
>+ ib_qp_attr_t qpa;
>+ dapl_ibal_port_t *p_port;
>+ dapl_ibal_ca_t *p_ca;
>+
>+ dapl_dbg_log (DAPL_DBG_TYPE_EP, "%s() p_sz %d sock %d
>port 0x%x\n",
>+ __FUNCTION__,p_size,cm_ptr->socket,
>+ ia_ptr->hca_ptr->port_num);
>+
>+ if (p_size > IB_MAX_REP_PDATA_SIZE)
>+ return DAT_LENGTH_ERROR;
>+
>+ /* must have a accepted socket */
>+ if ( cm_ptr->socket < 0 ) {
>+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
>+ "%s() Not accepted socket? remote
>port=0x%x lid=0x%x"
>+ " qpn=0x%x psize=%d\n",
>+ cm_ptr->dst.port, cm_ptr->dst.lid,
>+ ntohs(cm_ptr->dst.qpn), cm_ptr->dst.p_size);
>+ return DAT_INTERNAL_ERROR;
>+ }
>+
>+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
>+ " accept_usr: remote port=0x%x lid=0x%x"
>+ " qpn=0x%x psize=%d\n",
>+ cm_ptr->dst.port, cm_ptr->dst.lid,
>+ ntohs(cm_ptr->dst.qpn), cm_ptr->dst.p_size);
>+
>+ /* modify QP to RTR and then to RTS with remote info
>already read */
>+
>+ p_ca = (dapl_ibal_ca_t *) ia_ptr->hca_ptr->ib_hca_handle;
>+ p_port = dapli_ibal_get_port (p_ca,
>(uint8_t)ia_ptr->hca_ptr->port_num);
>+ if (!p_port)
>+ {
>+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>+ "%s() dapli_ibal_get_port() failed @
>line #%d\n",
>+ __FUNCTION__,__LINE__);
>+ goto bail;
>+ }
>+
>+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
>+ "%s() DST: qpn 0x%x port 0x%x lid %x
>psize %d\n",
>+ __FUNCTION__,
>+ cl_ntoh32(cm_ptr->dst.qpn),
>+ cm_ptr->dst.port,
>+ cl_ntoh16(cm_ptr->dst.lid),
>cm_ptr->dst.p_size);
>+
>+ /* modify QP to RTR and then to RTS with remote info */
>+
>+ ibs = dapls_modify_qp_state_to_rtr( ep_ptr->qp_handle,
>+ cm_ptr->dst.qpn,
>+ cm_ptr->dst.lid,
>+ p_port );
>+ if (ibs != IB_SUCCESS)
>+ {
>+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>+ "%s() QP --> RTR failed @ line #%d\n",
>+ __FUNCTION__,__LINE__);
>+ goto bail;
>+ }
>+
>+ if ( dapls_modify_qp_state_to_rts( ep_ptr->qp_handle ) )
>+ {
>+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>+ "%s() QP --> RTS failed @ line #%d\n",
>+ __FUNCTION__,__LINE__);
>+ goto bail;
>+ }
>+
>+ ep_ptr->qp_state = IB_QP_STATE_RTS;
>+
>+ /* save remote address information */
>+ dapl_os_memcpy( &ep_ptr->remote_ia_address,
>+ &cm_ptr->dst.ia_address,
>+ sizeof(ep_ptr->remote_ia_address));
>+
>+ /* determine QP & port numbers */
>+ ibs = ib_query_qp(ep_ptr->qp_handle, &qpa);
>+ if (ibs != IB_SUCCESS)
>+ {
>+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>+ " ib_query_qp() ERR %s\n",
>ib_get_err_str(ibs));
>+ goto bail;
>+ }
>+
>+ /* Send our QP info, IA address, and private data */
>+ cm_ptr->dst.qpn = qpa.num; /* ib_net32_t */
>+ cm_ptr->dst.port = ia_ptr->hca_ptr->port_num;
>+ cm_ptr->dst.lid = dapli_get_lid(ia_ptr->hca_ptr,
>ia_ptr->hca_ptr->port_num);
>+ /* set gid in network order */
>+ ibs = dapls_ib_query_gid( ia_ptr->hca_ptr, &cm_ptr->dst.gid );
>+ if ( ibs != IB_SUCCESS )
>+ {
>+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>+ "%s() dapls_ib_query_gid() returns '%s'\n",
>+ __FUNCTION__,ib_get_err_str(ibs));
>+ goto bail;
>+ }
>+
>+ cm_ptr->dst.ia_address = ia_ptr->hca_ptr->hca_address;
>+ cm_ptr->dst.p_size = p_size;
>+
>+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
>+ "%s()\n Tx QP info: qpn %x port 0x%x lid 0x%x
>p_sz %d IP %s\n",
>+ __FUNCTION__, cl_ntoh32(cm_ptr->dst.qpn),
>cm_ptr->dst.port,
>+ cl_ntoh16(cm_ptr->dst.lid), cm_ptr->dst.p_size,
>+ dapli_get_ip_addr_str(&cm_ptr->dst.ia_address,NULL));
>+
>+ /* network byte-ordering - QPN & LID already are */
>+ cm_ptr->dst.p_size = cl_hton32(cm_ptr->dst.p_size);
>+ cm_ptr->dst.port = cl_hton16(cm_ptr->dst.port);
>+
>+ iovec[0].buf = (char*)&cm_ptr->dst;
>+ iovec[0].len = sizeof(ib_qp_cm_t);
>+ if (p_size) {
>+ iovec[1].buf = p_data;
>+ iovec[1].len = p_size;
>+ }
>+ rc = WSASend( cm_ptr->socket, iovec, (p_size ? 2:1),
>&len, 0, 0, 0 );
>+ if (rc || len != (p_size + sizeof(ib_qp_cm_t))) {
>+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>+ " accept_usr: ERR %d, wcnt=%d\n",
>+ WSAGetLastError(), len);
>+ goto bail;
>+ }
>+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
>+ " accept_usr: local port=0x%x lid=0x%x"
>+ " qpn=0x%x psize=%d\n",
>+ ntohs(cm_ptr->dst.port), ntohs(cm_ptr->dst.lid),
>+ ntohl(cm_ptr->dst.qpn),
>ntohl(cm_ptr->dst.p_size));
>+
>+ /* save state and reference to EP, queue for RTU data */
>+ cm_ptr->ep = ep_ptr;
>+ cm_ptr->hca = ia_ptr->hca_ptr;
>+ cm_ptr->state = SCM_ACCEPTED;
>+
>+ /* restore remote address information for query */
>+ dapl_os_memcpy( &cm_ptr->dst.ia_address,
>+ &ep_ptr->remote_ia_address,
>+ sizeof(cm_ptr->dst.ia_address));
>+
>+ dapl_dbg_log( DAPL_DBG_TYPE_EP," PASSIVE: accepted!\n" );
>+ dapli_cm_queue(cm_ptr);
>+
>+ return DAT_SUCCESS;
>+
>+bail:
>+ dapl_dbg_log( DAPL_DBG_TYPE_ERR," accept_usr: ERR
>!QP_RTR_RTS \n");
>+ dapli_cm_destroy(cm_ptr);
>+ dapls_ib_reinit_ep( ep_ptr ); /* reset QP state */
>+
>+ return DAT_INTERNAL_ERROR;
>+}
>+
>+
>+/*
>+ * PASSIVE: read RTU from active peer, post CONN event
>+ */
>+void
>+dapli_socket_accept_rtu(dp_ib_cm_handle_t cm_ptr)
>+{
>+ int len;
>+ short rtu_data = 0;
>+
>+ /* complete handshake after final QP state change */
>+ len = recv(cm_ptr->socket, (char*)&rtu_data,
>sizeof(rtu_data), 0);
>+ if ( len != sizeof(rtu_data) || ntohs(rtu_data) != 0x0e0f ) {
>+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>+ " accept_rtu: ERR %d, rcnt=%d rdata=%x\n",
>+ WSAGetLastError(), len, ntohs(rtu_data) );
>+ goto bail;
>+ }
>+
>+ /* save state and reference to EP, queue for disc event */
>+ cm_ptr->state = SCM_CONNECTED;
>+
>+ /* final data exchange if remote QP state is good to go */
>+ dapl_dbg_log( DAPL_DBG_TYPE_EP," PASSIVE: connected!\n" );
>+ dapls_cr_callback(cm_ptr, IB_CME_CONNECTED, NULL, cm_ptr->sp);
>+ return;
>+bail:
>+ dapls_ib_reinit_ep(cm_ptr->ep); /* reset QP state */
>+ dapli_cm_destroy(cm_ptr);
>+ dapls_cr_callback(cm_ptr, IB_CME_DESTINATION_REJECT,
>NULL, cm_ptr->sp);
>+}
>+
>+
>+/*
> * ACTIVE: Create socket, connect, and exchange QP information
> */
> static DAT_RETURN
>@@ -143,21 +482,16 @@ dapli_socket_connect ( DAPL_EP
> *ep_ptr,
> dapl_ibal_port_t *p_port;
> dapl_ibal_ca_t *p_ca;
>
>- dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: r_qual
>%d\n", r_qual);
>+ dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: r_qual %d
>psize %d\n",
>+ r_qual, p_size);
>
>- /*
>- * Allocate CM and initialize
>- */
>- if ((cm_ptr = dapl_os_alloc(sizeof(*cm_ptr))) == NULL ) {
>+ cm_ptr = dapli_cm_create();
>+ if (cm_ptr == NULL)
> return DAT_INSUFFICIENT_RESOURCES;
>- }
>-
>- (void) dapl_os_memzero( cm_ptr, sizeof(*cm_ptr) );
>- cm_ptr->socket = -1;
>
> /* create, connect, sockopt, and exchange QP information */
> if ((cm_ptr->socket = socket(AF_INET,SOCK_STREAM,0)) < 0 ) {
>- dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
>+ dapli_cm_destroy(cm_ptr);
> return DAT_INSUFFICIENT_RESOURCES;
> }
>
>@@ -166,7 +500,7 @@ dapli_socket_connect ( DAPL_EP
> *ep_ptr,
> if (connect(cm_ptr->socket, r_addr, sizeof(*r_addr))
>== SOCKET_ERROR) {
> dapl_dbg_log(DAPL_DBG_TYPE_ERR, " connect: %d
>on r_qual %d\n",
> WSAGetLastError(), (unsigned int)r_qual);
>- dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
>+ dapli_cm_destroy(cm_ptr);
> return DAT_INVALID_ADDRESS;
> }
>
>@@ -175,6 +509,8 @@ dapli_socket_connect ( DAPL_EP
> *ep_ptr,
> (const char*)&opt,
> sizeof(opt) );
>
>+ dapl_dbg_log(DAPL_DBG_TYPE_EP, " socket connected!\n");
>+
> /* determine QP & port numbers */
> ibs = ib_query_qp(ep_ptr->qp_handle, &qpa);
> if (ibs != IB_SUCCESS)
>@@ -187,7 +523,6 @@ dapli_socket_connect ( DAPL_EP
> *ep_ptr,
> /* Send QP info, IA address and private data */
> cm_ptr->dst.qpn = qpa.num; /* ib_net32_t */
> cm_ptr->dst.port = cl_hton16(ia_ptr->hca_ptr->port_num);
>-
> cm_ptr->dst.lid = dapli_get_lid( ia_ptr->hca_ptr,
> ia_ptr->hca_ptr->port_num );
> if (cm_ptr->dst.lid == 0)
>@@ -197,6 +532,17 @@ dapli_socket_connect ( DAPL_EP
> *ep_ptr,
> __FUNCTION__, __LINE__);
> goto bail;
> }
>+
>+ /* set gid in network order */
>+ ibs = dapls_ib_query_gid( ia_ptr->hca_ptr, &cm_ptr->dst.gid );
>+ if ( ibs != IB_SUCCESS )
>+ {
>+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>+ "%s() dapls_ib_query_gid() returns '%s'\n",
>+ __FUNCTION__,ib_get_err_str(ibs));
>+ goto bail;
>+ }
>+
> cm_ptr->dst.ia_address = ia_ptr->hca_ptr->hca_address;
> cm_ptr->dst.p_size = cl_hton32(p_size);
>
>@@ -213,6 +559,8 @@ dapli_socket_connect ( DAPL_EP
> *ep_ptr,
> iovec[1].buf = p_data;
> iovec[1].len = p_size;
> }
>+
>+ dapl_dbg_log(DAPL_DBG_TYPE_EP," socket connected,
>write QP and private data\n");
> rc = WSASend (cm_ptr->socket, iovec, (p_size ? 2:1),
>&len, 0, 0, NULL);
> if ( rc || len != (p_size + sizeof(ib_qp_cm_t)) ) {
> dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>@@ -225,17 +573,65 @@ dapli_socket_connect ( DAPL_EP
> *ep_ptr,
> cm_ptr->dst.port, cm_ptr->dst.lid,
> cm_ptr->dst.qpn, cm_ptr->dst.p_size );
>
>+ /* queue up to work thread to avoid blocking consumer */
>+ cm_ptr->state = SCM_CONN_PENDING;
>+ cm_ptr->hca = ia_ptr->hca_ptr;
>+ cm_ptr->ep = ep_ptr;
>+ dapli_cm_queue(cm_ptr);
>+ return DAT_SUCCESS;
>+bail:
>+ /* close socket, free cm structure */
>+ dapli_cm_destroy(cm_ptr);
>+ return DAT_INTERNAL_ERROR;
>+}
>+
>+
>+/*
>+ * ACTIVE: exchange QP information, called from CR thread
>+ */
>+void
>+dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)
>+{
>+ DAPL_EP *ep_ptr = cm_ptr->ep;
>+ DAPL_IA *ia_ptr = cm_ptr->ep->header.owner_ia;
>+ int len, rc;
>+ DWORD ioflags;
>+ WSABUF iovec[1];
>+ short rtu_data = htons(0x0E0F);
>+ ib_cm_events_t event = IB_CME_DESTINATION_REJECT;
>+ ib_api_status_t ibs;
>+ dapl_ibal_port_t *p_port;
>+ dapl_ibal_ca_t *p_ca;
>+
> /* read DST information into cm_ptr, overwrite SRC info */
>+ dapl_dbg_log(DAPL_DBG_TYPE_EP," connect_rtu: recv peer
>QP data\n");
>+
>+ iovec[0].buf = (char*)&cm_ptr->dst;
>+ iovec[0].len = sizeof(ib_qp_cm_t);
> ioflags = len = 0;
> rc = WSARecv (cm_ptr->socket, iovec, 1, &len, &ioflags, 0, 0);
>- if ( rc == SOCKET_ERROR || len != sizeof(ib_qp_cm_t) ) {
>- dapl_dbg_log(DAPL_DBG_TYPE_ERR,"connect read:
>ERR %d rcnt=%d\n",
>- WSAGetLastError(), len);
>+ if ( rc == SOCKET_ERROR || len != sizeof(ib_qp_cm_t) ||
>+ ntohs(cm_ptr->dst.ver)
>!= DSCM_VER )
>+ {
>+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>+ "connect_rtu read: ERR %d rcnt=%d
>ver=%d\n",
>+ WSAGetLastError(), len, cm_ptr->dst.ver);
>+ goto bail;
>+ }
>+
>+ /* check for consumer reject */
>+ if (cm_ptr->dst.rej) {
>+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
>+ " connect_rtu read: PEER REJ
>reason=0x%x\n",
>+ ntohs(cm_ptr->dst.rej));
>+ event = IB_CME_DESTINATION_REJECT_PRIVATE_DATA;
> goto bail;
> }
>
>- /* revert back to host byte ordering */
>+ /* convert peer response values to host order */
> cm_ptr->dst.port = cl_ntoh16(cm_ptr->dst.port);
>+ cm_ptr->dst.lid = ntohs(cm_ptr->dst.lid);
>+ cm_ptr->dst.qpn = cm_ptr->dst.qpn;
> cm_ptr->dst.p_size = cl_ntoh32(cm_ptr->dst.p_size);
>
> dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: Rx DST: qpn
>%x port %d "
>@@ -245,15 +641,27 @@ dapli_socket_connect ( DAPL_EP
> *ep_ptr,
> cm_ptr->dst.p_size,
>
>dapli_get_ip_addr_str(&cm_ptr->dst.ia_address,NULL));
>
>+ /* save remote address information */
>+ dapl_os_memcpy( &ep_ptr->remote_ia_address,
>+ &cm_ptr->dst.ia_address,
>+ sizeof(ep_ptr->remote_ia_address));
>+
>+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
>+ " connect_rtu: DST %s port=0x%x lid=0x%x,
>qpn=0x%x, psize=%d\n",
>+ inet_ntoa(((struct sockaddr_in
>*)&cm_ptr->dst.ia_address)->sin_addr),
>+ cm_ptr->dst.port, cm_ptr->dst.lid,
>+ cm_ptr->dst.qpn, cm_ptr->dst.p_size);
>+
> /* validate private data size before reading */
>- if ( cm_ptr->dst.p_size > IB_MAX_REP_PDATA_SIZE ) {
>+ if (cm_ptr->dst.p_size > IB_MAX_REP_PDATA_SIZE) {
> dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>- " connect read: psize (%d) wrong\n",
>+ " connect_rtu read: psize (%d) wrong\n",
> cm_ptr->dst.p_size );
> goto bail;
> }
>
> /* read private data into cm_handle if any present */
>+ dapl_dbg_log(DAPL_DBG_TYPE_EP," socket connected, read
>private data\n");
> if ( cm_ptr->dst.p_size ) {
> iovec[0].buf = cm_ptr->p_data;
> iovec[0].len = cm_ptr->dst.p_size;
>@@ -300,32 +708,29 @@ dapli_socket_connect ( DAPL_EP
> *ep_ptr,
>
> ep_ptr->qp_state = IB_QP_STATE_RTS;
>
>+ dapl_dbg_log(DAPL_DBG_TYPE_EP," connect_rtu: send RTU\n");
>+
> /* complete handshake after final QP state change */
> send(cm_ptr->socket, (const char *)&rtu_data,
>sizeof(rtu_data), 0);
>
> /* init cm_handle and post the event with private data */
> ep_ptr->cm_handle = cm_ptr;
>+ cm_ptr->state = SCM_CONNECTED;
> dapl_dbg_log( DAPL_DBG_TYPE_EP," ACTIVE: connected!\n" );
>
> dapl_evd_connection_callback( ep_ptr->cm_handle,
> IB_CME_CONNECTED,
> cm_ptr->p_data,
> ep_ptr );
>- return DAT_SUCCESS;
>-
>+ return;
> bail:
> /* close socket, free cm structure and post error event */
>- if ( cm_ptr->socket >= 0 )
>- closesocket(cm_ptr->socket);
>-
>- dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
>- dapls_ib_reinit_ep( ep_ptr ); /* reset QP state */
>-
>- dapl_evd_connection_callback( ep_ptr->cm_handle,
>- IB_CME_LOCAL_FAILURE,
>+ dapli_cm_destroy(cm_ptr);
>+ dapls_ib_reinit_ep(ep_ptr); /* reset QP state */
>+ dapl_evd_connection_callback( NULL /*ep_ptr->cm_handle*/,
>+ event,
> NULL,
> ep_ptr );
>- return DAT_INTERNAL_ERROR;
> }
>
>
>@@ -347,14 +752,12 @@ dapli_socket_listen ( DAPL_IA
> *ia_ptr,
> ia_ptr, serviceID, sp_ptr);
>
> /* Allocate CM and initialize */
>- if ((cm_ptr = dapl_os_alloc(sizeof(*cm_ptr))) == NULL)
>+ cm_ptr = dapli_cm_create();
>+ if (cm_ptr == NULL)
> return DAT_INSUFFICIENT_RESOURCES;
>
>- (void) dapl_os_memzero( cm_ptr, sizeof( *cm_ptr ) );
>-
>- cm_ptr->socket = cm_ptr->l_socket = -1;
> cm_ptr->sp = sp_ptr;
>- cm_ptr->hca_ptr = ia_ptr->hca_ptr;
>+ cm_ptr->hca = ia_ptr->hca_ptr;
>
> /* bind, listen, set sockopt, accept, exchange data */
> if ((cm_ptr->l_socket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
>@@ -406,12 +809,9 @@ dapli_socket_listen ( DAPL_IA
> *ia_ptr,
> /* set cm_handle for this service point, save listen socket */
> sp_ptr->cm_srvc_handle = cm_ptr;
>
>- /* add to SP->CR thread list */
>- dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&cm_ptr->entry);
>- dapl_os_lock( &cm_ptr->hca_ptr->ib_trans.lock );
>-
>dapl_llist_add_tail((DAPL_LLIST_HEAD*)&cm_ptr->hca_ptr->ib_trans.list,
>- (DAPL_LLIST_ENTRY*)&cm_ptr->entry,
>(void*)cm_ptr);
>- dapl_os_unlock(&cm_ptr->hca_ptr->ib_trans.lock);
>+ /* queue up listen socket to process inbound CR's */
>+ cm_ptr->state = SCM_LISTEN;
>+ dapli_cm_queue(cm_ptr);
>
> dapl_dbg_log( DAPL_DBG_TYPE_CM,
> " listen: qual 0x%x cr %p s_fd %d\n",
>@@ -421,10 +821,7 @@ dapli_socket_listen ( DAPL_IA
> *ia_ptr,
> bail:
> dapl_dbg_log( DAPL_DBG_TYPE_CM,
> " listen: ERROR on conn_qual
>0x%x\n",serviceID);
>- if ( cm_ptr->l_socket >= 0 )
>- closesocket( cm_ptr->l_socket );
>-
>- dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
>+ dapli_cm_destroy(cm_ptr);
> return dat_status;
> }
>
>@@ -441,6 +838,8 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
> int len;
> DAT_RETURN dat_status = DAT_SUCCESS;
>
>+ dapl_dbg_log(DAPL_DBG_TYPE_EP," socket_accept\n");
>+
> /* Allocate accept CM and initialize */
> if ((acm_ptr = dapl_os_alloc(sizeof(*acm_ptr))) == NULL)
> return DAT_INSUFFICIENT_RESOURCES;
>@@ -448,8 +847,9 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
> (void) dapl_os_memzero( acm_ptr, sizeof( *acm_ptr ) );
>
> acm_ptr->socket = -1;
>+ acm_ptr->l_socket = -1;
> acm_ptr->sp = cm_ptr->sp;
>- acm_ptr->hca_ptr = cm_ptr->hca_ptr;
>+ acm_ptr->hca = cm_ptr->hca;
>
> len = sizeof(acm_ptr->dst.ia_address);
> acm_ptr->socket = accept(cm_ptr->l_socket,
>@@ -464,27 +864,32 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
> goto bail;
> }
>
>+ dapl_dbg_log(DAPL_DBG_TYPE_EP," socket accepted, read
>QP data\n");
>+
> /* read in DST QP info, IA address. check for private data */
> len =
>recv(acm_ptr->socket,(char*)&acm_ptr->dst,sizeof(ib_qp_cm_t),0);
>- if ( len != sizeof(ib_qp_cm_t) ) {
>+ if ( len != sizeof(ib_qp_cm_t) ||
>ntohs(acm_ptr->dst.ver) != DSCM_VER )
>+ {
> dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>- " accept read: ERR %d, rcnt=%d\n",
>- WSAGetLastError(), len);
>+ " accept read: ERR %d, rcnt=%d ver=%d\n",
>+ WSAGetLastError(), len, acm_ptr->dst.ver);
> dat_status = DAT_INTERNAL_ERROR;
> goto bail;
>
> }
>- /* revert back to host byte ordering */
>+ /* convert accepted values to host byte ordering */
> acm_ptr->dst.port = cl_ntoh16(acm_ptr->dst.port);
>+ acm_ptr->dst.lid = ntohs(acm_ptr->dst.lid);
>+ acm_ptr->dst.qpn = acm_ptr->dst.qpn;
> acm_ptr->dst.p_size = cl_ntoh32(acm_ptr->dst.p_size);
>
>- dapl_dbg_log(DAPL_DBG_TYPE_EP, " accept: DST
>sizeof(ib_cm_t) %d qpn %x "
>- "port %d lid 0x%x psize %d IP %s\n",
>- sizeof(ib_qp_cm_t),
>- cl_ntoh32(acm_ptr->dst.qpn), acm_ptr->dst.port,
>+ dapl_dbg_log(DAPL_DBG_TYPE_EP, " accept: DST %s port 0x%x "
>+ "lid 0x%x qpn 0x%x psize %d\n",
>+ dapli_get_ip_addr_str(&acm_ptr->dst.ia_address,NULL),
>+ acm_ptr->dst.port,
> cl_ntoh16(acm_ptr->dst.lid),
>- acm_ptr->dst.p_size,
>- dapli_get_ip_addr_str(&acm_ptr->dst.ia_address,NULL));
>+ cl_ntoh32(acm_ptr->dst.qpn),
>+ acm_ptr->dst.p_size);
>
> /* validate private data size before reading */
> if ( acm_ptr->dst.p_size > IB_MAX_REQ_PDATA_SIZE ) {
>@@ -495,6 +900,8 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
> goto bail;
> }
>
>+ dapl_dbg_log(DAPL_DBG_TYPE_EP," socket accepted, read
>private data\n");
>+
> /* read private data into cm_handle if any present */
> if ( acm_ptr->dst.p_size ) {
> len = recv( acm_ptr->socket,
>@@ -514,6 +921,8 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
> p_data = acm_ptr->p_data;
> }
>
>+ acm_ptr->state = SCM_ACCEPTING;
>+
> /* trigger CR event and return SUCCESS */
> dapls_cr_callback( acm_ptr,
> IB_CME_CONNECTION_REQUEST_PENDING,
>@@ -521,153 +930,8 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
> acm_ptr->sp );
>
> return DAT_SUCCESS;
>-
>-bail:
>- if ( acm_ptr->socket >= 0 )
>- closesocket( acm_ptr->socket );
>-
>- dapl_os_free( acm_ptr, sizeof( *acm_ptr ) );
>- return DAT_INTERNAL_ERROR;
>-}
>-
>-
>-static DAT_RETURN
>-dapli_socket_accept_final( DAPL_EP *ep_ptr,
>- DAPL_CR *cr_ptr,
>- DAT_COUNT p_size,
>- DAT_PVOID p_data )
>-{
>- DAPL_IA *ia_ptr = ep_ptr->header.owner_ia;
>- dp_ib_cm_handle_t cm_ptr = cr_ptr->ib_cm_handle;
>- ib_qp_cm_t qp_cm;
>- WSABUF iovec[2];
>- int len, rc;
>- short rtu_data = 0;
>- ib_api_status_t ibs;
>- ib_qp_attr_t qpa;
>- dapl_ibal_port_t *p_port;
>- dapl_ibal_ca_t *p_ca;
>-
>- dapl_dbg_log (DAPL_DBG_TYPE_EP, "%s() p_sz %d sock %d
>port %d\n",
>- __FUNCTION__,p_size,cm_ptr->socket,
>- ia_ptr->hca_ptr->port_num);
>-
>- if (p_size > IB_MAX_REP_PDATA_SIZE)
>- return DAT_LENGTH_ERROR;
>-
>- /* must have a accepted socket */
>- if ( cm_ptr->socket < 0 )
>- return DAT_INTERNAL_ERROR;
>-
>- /* modify QP to RTR and then to RTS with remote info
>already read */
>-
>- p_ca = (dapl_ibal_ca_t *) ia_ptr->hca_ptr->ib_hca_handle;
>- p_port = dapli_ibal_get_port (p_ca,
>(uint8_t)ia_ptr->hca_ptr->port_num);
>- if (!p_port)
>- {
>- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>- "%s() dapli_ibal_get_port() failed @
>line #%d\n",
>- __FUNCTION__,__LINE__);
>- goto bail;
>- }
>-
>- dapl_dbg_log(DAPL_DBG_TYPE_EP, "%s() DST: qpn %x port
>%d lid %x\n",
>- __FUNCTION__,
>- cl_ntoh32(cm_ptr->dst.qpn),
>- cm_ptr->dst.port,
>- cl_ntoh16(cm_ptr->dst.lid));
>-
>- /* modify QP to RTR and then to RTS with remote info */
>-
>- ibs = dapls_modify_qp_state_to_rtr( ep_ptr->qp_handle,
>- cm_ptr->dst.qpn,
>- cm_ptr->dst.lid,
>- p_port );
>- if (ibs != IB_SUCCESS)
>- {
>- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>- "%s() QP --> RTR failed @ line #%d\n",
>- __FUNCTION__,__LINE__);
>- goto bail;
>- }
>-
>- if ( dapls_modify_qp_state_to_rts( ep_ptr->qp_handle ) )
>- {
>- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>- "%s() QP --> RTS failed @ line #%d\n",
>- __FUNCTION__,__LINE__);
>- goto bail;
>- }
>-
>- ep_ptr->qp_state = IB_QP_STATE_RTS;
>-
>- /* determine QP & port numbers */
>- ibs = ib_query_qp(ep_ptr->qp_handle, &qpa);
>- if (ibs != IB_SUCCESS)
>- {
>- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>- " ib_query_qp() ERR %s\n",
>ib_get_err_str(ibs));
>- goto bail;
>- }
>-
>- /* Send QP info, IA address, and private data */
>- qp_cm.qpn = qpa.num; /* ib_net32_t */
>- qp_cm.port = ia_ptr->hca_ptr->port_num;
>- qp_cm.lid = dapli_get_lid( ia_ptr->hca_ptr,
>ia_ptr->hca_ptr->port_num );
>- qp_cm.ia_address = ia_ptr->hca_ptr->hca_address;
>- qp_cm.p_size = p_size;
>-
>- dapl_dbg_log(DAPL_DBG_TYPE_CM,
>- "%s()\n Tx QP info: qpn %x port %d lid 0x%x
>p_sz %d IP %s\n",
>- __FUNCTION__, cl_ntoh32(qp_cm.qpn), qp_cm.port,
>- cl_ntoh16(qp_cm.lid), qp_cm.p_size,
>- dapli_get_ip_addr_str(&qp_cm.ia_address,NULL));
>-
>- /* network byte-ordering - QPN & LID already are */
>- qp_cm.p_size = cl_hton32(qp_cm.p_size);
>- qp_cm.port = cl_hton16(qp_cm.port);
>-
>- iovec[0].buf = (char*)&qp_cm;
>- iovec[0].len = sizeof(ib_qp_cm_t);
>- if (p_size) {
>- iovec[1].buf = p_data;
>- iovec[1].len = p_size;
>- }
>- rc = WSASend( cm_ptr->socket, iovec, (p_size ? 2:1),
>&len, 0, 0, 0 );
>- if (rc || len != (p_size + sizeof(ib_qp_cm_t))) {
>- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>- " accept_final: ERR %d, wcnt=%d\n",
>- WSAGetLastError(), len);
>- goto bail;
>- }
>- dapl_dbg_log(DAPL_DBG_TYPE_EP,
>- " accept_final: SRC qpn %x port %d lid
>0x%x psize %d\n",
>- qp_cm.qpn, qp_cm.port, qp_cm.lid, qp_cm.p_size );
>-
>- /* complete handshake after final QP state change */
>- len = recv(cm_ptr->socket, (char*)&rtu_data,
>sizeof(rtu_data), 0);
>- if ( len != sizeof(rtu_data) || ntohs(rtu_data) != 0x0e0f ) {
>- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>- " accept_final: ERR %d, rcnt=%d
>rdata=%x\n",
>- WSAGetLastError(), len, ntohs(rtu_data) );
>- goto bail;
>- }
>-
>- /* final data exchange if remote QP state is good to go */
>- dapl_dbg_log( DAPL_DBG_TYPE_EP," PASSIVE: connected!\n" );
>-
>- dapls_cr_callback ( cm_ptr, IB_CME_CONNECTED, NULL,
>cm_ptr->sp );
>-
>- return DAT_SUCCESS;
>-
> bail:
>- dapl_dbg_log( DAPL_DBG_TYPE_ERR," accept_final: ERR
>!QP_RTR_RTS \n");
>- if ( cm_ptr->socket >= 0 )
>- closesocket( cm_ptr->socket );
>-
>- dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
>- dapls_ib_reinit_ep( ep_ptr ); /* reset QP state */
>-
>+ dapli_cm_destroy(acm_ptr);
> return DAT_INTERNAL_ERROR;
> }
>
>@@ -747,11 +1011,7 @@ dapls_ib_disconnect (
> dapl_dbg_log (DAPL_DBG_TYPE_EP,
> "dapls_ib_disconnect(ep_handle %p
>....)\n", ep_ptr);
>
>- if ( cm_ptr->socket >= 0 ) {
>- closesocket( cm_ptr->socket );
>- cm_ptr->socket = -1;
>- }
>-
>+#if 0 // XXX
> /* disconnect QP ala transition to RESET state */
> ib_status = dapls_modify_qp_state_to_reset (ep_ptr->qp_handle);
>
>@@ -776,15 +1036,18 @@ dapls_ib_disconnect (
> NULL,
> ep_ptr );
> ep_ptr->cm_handle = NULL;
>- dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
> }
>-
>+#endif
> /* modify QP state --> INIT */
> dapls_ib_reinit_ep(ep_ptr);
>
>+ if (cm_ptr == NULL)
> return DAT_SUCCESS;
>+ else
>+ return dapli_socket_disconnect(cm_ptr);
> }
>
>+
> /*
> * dapls_ib_disconnect_clean
> *
>@@ -874,14 +1137,20 @@ dapls_ib_remove_conn_listener (
> if ( cm_ptr != NULL ) {
> if ( cm_ptr->l_socket >= 0 ) {
> closesocket( cm_ptr->l_socket );
>+ cm_ptr->l_socket = -1;
>+ }
>+ if ( cm_ptr->socket >= 0 ) {
>+ closesocket( cm_ptr->socket );
> cm_ptr->socket = -1;
> }
> /* cr_thread will free */
> sp_ptr->cm_srvc_handle = NULL;
>+ _write(g_scm_pipe[1], "w", sizeof "w");
> }
> return DAT_SUCCESS;
> }
>
>+
> /*
> * dapls_ib_accept_connection
> *
>@@ -928,7 +1197,7 @@ dapls_ib_accept_connection (
> return status;
> }
>
>- return ( dapli_socket_accept_final(ep_ptr, cr_ptr,
>p_size, p_data) );
>+ return ( dapli_socket_accept_usr(ep_ptr, cr_ptr,
>p_size, p_data) );
> }
>
>
>@@ -948,27 +1217,39 @@ dapls_ib_accept_connection (
> * DAT_INTERNAL_ERROR
> *
> */
>+
> DAT_RETURN
> dapls_ib_reject_connection (
>- IN dp_ib_cm_handle_t ib_cm_handle,
>+ IN dp_ib_cm_handle_t cm_ptr,
> IN int reject_reason,
>- IN DAT_COUNT private_data_size,
>- IN const DAT_PVOID private_data)
>+ IN DAT_COUNT psize,
>+ IN const DAT_PVOID pdata)
> {
>- ib_cm_srvc_handle_t cm_ptr = ib_cm_handle;
>+ WSABUF iovec[1];
>+ int len;
>
> dapl_dbg_log (DAPL_DBG_TYPE_EP,
>- "dapls_ib_reject_connection(cm_handle %p
>reason %x)\n",
>- ib_cm_handle, reject_reason );
>-
>- /* just close the socket and return */
>- if ( cm_ptr->socket > 0 ) {
>- closesocket( cm_ptr->socket );
>+ " reject(cm %p reason %x pdata %p psize %d)\n",
>+ cm_ptr, reject_reason, pdata, psize );
>+
>+ /* write reject data to indicate reject */
>+ if (cm_ptr->socket >= 0) {
>+ cm_ptr->dst.rej = (uint16_t)reject_reason;
>+ cm_ptr->dst.rej = cl_hton16(cm_ptr->dst.rej);
>+ iovec[0].buf = (char*)&cm_ptr->dst;
>+ iovec[0].len = sizeof(ib_qp_cm_t);
>+ (void) WSASend (cm_ptr->socket, iovec, 1,
>&len, 0, 0, NULL);
>+ closesocket(cm_ptr->socket);
> cm_ptr->socket = -1;
> }
>+
>+ /* cr_thread will destroy CR */
>+ cm_ptr->state = SCM_REJECTED;
>+ _write(g_scm_pipe[1], "w", sizeof "w");
> return DAT_SUCCESS;
> }
>
>+
> /*
> * dapls_ib_cm_remote_addr
> *
>@@ -1157,7 +1438,7 @@ dapls_ib_get_dat_event (
>
>
> /*
>- * dapls_ib_get_dat_event
>+ * dapls_ib_get_cm_event
> *
> * Return a DAT connection event given a provider CM event.
> *
>@@ -1189,12 +1470,16 @@ dapls_ib_get_cm_event (
> }
> #endif /* NOT_USED */
>
>-/* async CR processing thread to avoid blocking applications */
>+/* outbound/inbound CR processing thread to avoid blocking
>applications */
>+
>+#define SCM_MAX_CONN (8 * sizeof(fd_set))
>+
> void cr_thread(void *arg)
> {
> struct dapl_hca *hca_ptr = arg;
> ib_cm_srvc_handle_t cr, next_cr;
> int max_fd, rc;
>+ char rbuf[2];
> fd_set rfd, rfds;
> struct timeval to;
>
>@@ -1202,10 +1487,12 @@ void cr_thread(void *arg)
>
> dapl_os_lock( &hca_ptr->ib_trans.lock );
> hca_ptr->ib_trans.cr_state = IB_THREAD_RUN;
>+
> while (hca_ptr->ib_trans.cr_state == IB_THREAD_RUN) {
>
> FD_ZERO( &rfds );
>- max_fd = -1;
>+ FD_SET(g_scm_pipe[0], &rfds);
>+ max_fd = g_scm_pipe[0];
>
> if
>(!dapl_llist_is_empty((DAPL_LLIST_HEAD*)&hca_ptr->ib_trans.list))
> next_cr = dapl_llist_peek_head((DAPL_LLIST_HEAD*)
>@@ -1230,32 +1517,46 @@ void cr_thread(void *arg)
> continue;
> }
>
>- FD_SET( cr->l_socket, &rfds ); /* add to select set */
>- if ( cr->l_socket > max_fd )
>+ if (cr->socket > SCM_MAX_CONN-1) {
>+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>+ "SCM ERR: cr->socket(%d) exceeded
>FD_SETSIZE %d\n",
>+ cr->socket,SCM_MAX_CONN-1);
>+ continue;
>+ }
>+ FD_SET( cr->socket, &rfds ); /* add to select SET */
>+ if ( cr->socket > max_fd )
> max_fd = cr->l_socket;
>
> /* individual select poll to check for work */
> FD_ZERO(&rfd);
>- FD_SET(cr->l_socket, &rfd);
>+ FD_SET(cr->socket, &rfd);
> dapl_os_unlock(&hca_ptr->ib_trans.lock);
>
> to.tv_sec = 0;
> to.tv_usec = 0; /* wakeup and check destroy */
>
> /* block waiting for Rx data */
>- if (select(cr->l_socket+1,&rfd,NULL,NULL,&to) ==
>SOCKET_ERROR) {
>+ if (select(cr->socket+1,&rfd,NULL,NULL,&to) ==
>SOCKET_ERROR) {
> rc = WSAGetLastError();
> if ( rc != SOCKET_ERROR /*WSAENOTSOCK*/ )
> {
> dapl_dbg_log (DAPL_DBG_TYPE_ERR/*CM*/,
> " thread: select(sock %d) ERR
>%d on cr %p\n",
>- cr->l_socket, rc, cr);
>+ cr->socket, rc, cr);
>+ }
>+ closesocket(cr->socket);
>+ cr->socket = -1;
>+ } else if (FD_ISSET(cr->socket,&rfd)) {
>+ if (cr->socket > 0) {
>+ if (cr->state == SCM_LISTEN)
>+ dapli_socket_accept(cr);
>+ else if (cr->state == SCM_ACCEPTED)
>+ dapli_socket_accept_rtu(cr);
>+ else if (cr->state == SCM_CONN_PENDING)
>+ dapli_socket_connect_rtu(cr);
>+ else if (cr->state == SCM_CONNECTED)
>+ dapli_socket_disconnect(cr);
> }
>- closesocket(cr->l_socket);
>- cr->l_socket = -1;
>- } else if (FD_ISSET(cr->l_socket,&rfd) &&
>dapli_socket_accept(cr)) {
>- closesocket(cr->l_socket);
>- cr->l_socket = -1;
> }
> dapl_os_lock( &hca_ptr->ib_trans.lock );
> next_cr = dapl_llist_next_entry((DAPL_LLIST_HEAD*)
>@@ -1263,9 +1564,19 @@ void cr_thread(void *arg)
>
>(DAPL_LLIST_ENTRY*)&cr->entry );
> }
> dapl_os_unlock( &hca_ptr->ib_trans.lock );
>+
> to.tv_sec = 0;
> to.tv_usec = 100000; /* wakeup and check destroy */
>+
> (void) select(max_fd+1, &rfds, NULL, NULL, &to);
>+
>+ /* if pipe data consume - used to wake this thread up */
>+ if (FD_ISSET(g_scm_pipe[0],&rfds)) {
>+ dapl_dbg_log(DAPL_DBG_TYPE_CM," cr_thread()
>read pipe data\n");
>+printf(" cr_thread() read pipe data\n");
>+ _read(g_scm_pipe[0], rbuf, 2);
>+printf(" cr_thread() Finished read pipe data\n");
>+ }
> dapl_os_lock( &hca_ptr->ib_trans.lock );
> }
> dapl_os_unlock( &hca_ptr->ib_trans.lock );
>diff --git a/dapl/ibal-scm/dapl_ibal-scm_util.c
>b/dapl/ibal-scm/dapl_ibal-scm_util.c
>index 8e5f8ac..06bc704 100644
>--- a/dapl/ibal-scm/dapl_ibal-scm_util.c
>+++ b/dapl/ibal-scm/dapl_ibal-scm_util.c
>@@ -52,6 +52,7 @@ static const char rcsid[] = "$Id: $";
> #include "dapl.h"
> #include "dapl_adapter_util.h"
> #include "dapl_ibal_util.h"
>+#include "dapl_ibal_name_service.h"
>
> #include <stdio.h>
> #include <stdlib.h>
>@@ -61,9 +62,12 @@ static const char rcsid[] = "$Id: $";
> #include <winsock2.h>
> #include <ws2tcpip.h>
> #include <io.h>
>+#include <fcntl.h>
>
>+extern void cr_thread(void *arg);
>
> int g_dapl_loopback_connection = 0;
>+int g_scm_pipe[2];
>
> #ifdef NOT_USED
>
>@@ -132,22 +136,55 @@ DAT_RETURN dapli_init_sock_cm ( IN
>DAPL_HCA *hca_ptr )
>
> dapl_dbg_log (DAPL_DBG_TYPE_UTIL, " %s():
>%p\n",__FUNCTION__,hca_ptr );
>
>- /* set inline max with enviroment or default */
>+ /* set RC tunables via enviroment or default */
> hca_ptr->ib_trans.max_inline_send =
>- dapl_os_get_env_val ( "DAPL_MAX_INLINE",
>INLINE_SEND_DEFAULT );
>+ dapl_os_get_env_val("DAPL_MAX_INLINE",
>INLINE_SEND_DEFAULT);
>+#if 0
>+ hca_ptr->ib_trans.ack_retry =
>+ dapl_os_get_env_val("DAPL_ACK_RETRY", SCM_ACK_RETRY);
>+ hca_ptr->ib_trans.ack_timer =
>+ dapl_os_get_env_val("DAPL_ACK_TIMER", SCM_ACK_TIMER);
>+ hca_ptr->ib_trans.rnr_retry =
>+ dapl_os_get_env_val("DAPL_RNR_RETRY", SCM_RNR_RETRY);
>+ hca_ptr->ib_trans.rnr_timer =
>+ dapl_os_get_env_val("DAPL_RNR_TIMER", SCM_RNR_TIMER);
>+ hca_ptr->ib_trans.global =
>+ dapl_os_get_env_val("DAPL_GLOBAL_ROUTING", SCM_GLOBAL);
>+ hca_ptr->ib_trans.hop_limit =
>+ dapl_os_get_env_val("DAPL_HOP_LIMIT", SCM_HOP_LIMIT);
>+ hca_ptr->ib_trans.tclass =
>+ dapl_os_get_env_val("DAPL_TCLASS", SCM_TCLASS);
>+#endif
>
> /* initialize cr_list lock */
> dat_status = dapl_os_lock_init(&hca_ptr->ib_trans.lock);
> if (dat_status != DAT_SUCCESS)
> {
> dapl_dbg_log (DAPL_DBG_TYPE_ERR,
>- " open_hca: failed to init lock\n");
>+ "%s() failed to init cr_list lock\n",
>__FUNCTION__);
> return DAT_INTERNAL_ERROR;
> }
>
>+#if 0
>+ /* initialize cq_lock */
>+ dat_status = dapl_os_lock_init(&hca_ptr->ib_trans.cq_lock);
>+ if (dat_status != DAT_SUCCESS) {
>+ dapl_log(DAPL_DBG_TYPE_ERR,
>+ "%s() failed to init cq_lock\n",
>__FUNCTION__);
>+ return DAT_INTERNAL_ERROR;
>+ }
>+#endif
>+
> /* initialize CM list for listens on this HCA */
>
>dapl_llist_init_head((DAPL_LLIST_HEAD*)&hca_ptr->ib_trans.list);
>
>+ /* create pipe communication endpoints */
>+ if (_pipe(g_scm_pipe, 256, O_TEXT)) {
>+ dapl_dbg_log (DAPL_DBG_TYPE_ERR,
>+ "%s() failed to create thread\n",
>__FUNCTION__);
>+ return DAT_INTERNAL_ERROR;
>+ }
>+
> /* create thread to process inbound connect request */
> hca_ptr->ib_trans.cr_state = IB_THREAD_INIT;
> dat_status = dapl_os_thread_create(cr_thread,
>@@ -199,6 +236,7 @@ DAT_RETURN dapli_close_sock_cm ( IN
>DAPL_HCA *hca_ptr )
>
> /* destroy cr_thread and lock */
> hca_ptr->ib_trans.cr_state = IB_THREAD_CANCEL;
>+
> while (hca_ptr->ib_trans.cr_state != IB_THREAD_EXIT) {
> dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
> " close_hca: waiting for cr_thread\n");
>
>
>
>_______________________________________________
>ofw mailing list
>ofw at lists.openfabrics.org
>http://lists.openfabrics.org/cgi-bin/mailman/listinfo/ofw
>
More information about the general
mailing list