[ofa-general] RE: [ofw] [PATCH 5/5] [DAPL] dapl/ibal-scm: update ibal-scm provider

Davis, Arlin R arlin.r.davis at intel.com
Fri Jan 30 14:34:38 PST 2009


Applied all 5 patches. Thanks.

-arlin

>-----Original Message-----
>From: ofw-bounces at lists.openfabrics.org
>[mailto:ofw-bounces at lists.openfabrics.org] On Behalf Of Sean Hefty
>Sent: Friday, January 30, 2009 10:59 AM
>To: Hefty, Sean; OpenIB; ofw at lists.openfabrics.org
>Subject: [ofw] [PATCH 5/5] [DAPL] dapl/ibal-scm: update
>ibal-scm provider
>
>From: Stan Smith <stan.smith at intel.com>
>
>Update the dapl.git tree with the latest SVN version of the
>ibal-scm provider.
>
>Signed-off-by: Sean Hefty <sean.hefty at intel.com>
>---
>Actual codng changes were made by Stan.  I'm just submitting the patch
>to update the DAPL git repository.
>
> dapl/ibal-scm/dapl_ibal-scm_cm.c   |  775
>+++++++++++++++++++++++++-----------
> dapl/ibal-scm/dapl_ibal-scm_util.c |   44 ++
> 2 files changed, 584 insertions(+), 235 deletions(-)
>
>diff --git a/dapl/ibal-scm/dapl_ibal-scm_cm.c
>b/dapl/ibal-scm/dapl_ibal-scm_cm.c
>index df83008..6a050b8 100644
>--- a/dapl/ibal-scm/dapl_ibal-scm_cm.c
>+++ b/dapl/ibal-scm/dapl_ibal-scm_cm.c
>@@ -63,6 +63,88 @@
> #include <ws2tcpip.h>
> #include <io.h>
>
>+extern int g_scm_pipe[2];
>+
>+extern DAT_RETURN
>+dapls_ib_query_gid( IN  DAPL_HCA       *hca_ptr,
>+                   IN  GID             *gid );
>+
>+
>+static struct ib_cm_handle * dapli_cm_create(void)
>+{
>+       struct ib_cm_handle *cm_ptr;
>+
>+       /* Allocate CM, init lock, and initialize */
>+       if ((cm_ptr = dapl_os_alloc(sizeof(*cm_ptr))) == NULL)
>+               return NULL;
>+
>+        if (dapl_os_lock_init(&cm_ptr->lock))
>+               goto bail;
>+
>+       (void)dapl_os_memzero(cm_ptr, sizeof(*cm_ptr));
>+       cm_ptr->dst.ver = htons(DSCM_VER);
>+       cm_ptr->socket = -1;
>+       cm_ptr->l_socket = -1;
>+       return cm_ptr;
>+bail:
>+       dapl_os_free(cm_ptr, sizeof(*cm_ptr));
>+       return NULL;
>+}
>+
>+
>+/* mark for destroy, remove all references, schedule cleanup */
>+
>+static void dapli_cm_destroy(struct ib_cm_handle *cm_ptr)
>+{
>+       dapl_dbg_log(DAPL_DBG_TYPE_CM,
>+                    " cm_destroy: cm %p ep %p\n", cm_ptr,cm_ptr->ep);
>+
>+       /* cleanup, never made it to work queue */
>+       if (cm_ptr->state == SCM_INIT) {
>+               if (cm_ptr->socket >= 0)
>+                       closesocket(cm_ptr->socket);
>+               if (cm_ptr->l_socket >= 0)
>+                       closesocket(cm_ptr->l_socket);
>+               dapl_os_free(cm_ptr, sizeof(*cm_ptr));
>+               return;
>+       }
>+
>+       dapl_os_lock(&cm_ptr->lock);
>+       cm_ptr->state = SCM_DESTROY;
>+       if (cm_ptr->ep)
>+               cm_ptr->ep->cm_handle = IB_INVALID_HANDLE;
>+
>+       /* close socket if still active */
>+       if (cm_ptr->socket >= 0) {
>+               closesocket(cm_ptr->socket);
>+               cm_ptr->socket = -1;
>+       }
>+       if (cm_ptr->l_socket >= 0) {
>+               closesocket(cm_ptr->l_socket);
>+               cm_ptr->l_socket = -1;
>+       }
>+       dapl_os_unlock(&cm_ptr->lock);
>+
>+       /* wakeup work thread */
>+       _write(g_scm_pipe[1], "w", sizeof "w");
>+}
>+
>+
>+/* queue socket for processing CM work */
>+static void dapli_cm_queue(struct ib_cm_handle *cm_ptr)
>+{
>+       /* add to work queue for cr thread processing */
>+       dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&cm_ptr->entry);
>+       dapl_os_lock( &cm_ptr->hca->ib_trans.lock );
>+
>dapl_llist_add_tail((DAPL_LLIST_HEAD*)&cm_ptr->hca->ib_trans.list,
>+                           (DAPL_LLIST_ENTRY*)&cm_ptr->entry,
>(void*)cm_ptr);
>+       dapl_os_unlock(&cm_ptr->hca->ib_trans.lock);
>+
>+        /* wakeup CM work thread */
>+        _write(g_scm_pipe[1], "w", sizeof "w");
>+}
>+
>+
>
> static uint16_t
> dapli_get_lid(IN DAPL_HCA *hca, IN int port)
>@@ -123,6 +205,263 @@ dapli_get_lid(IN DAPL_HCA *hca, IN int port)
>
>
> /*
>+ * ACTIVE/PASSIVE: called from CR thread or consumer via ep_disconnect
>+ */
>+static DAT_RETURN
>+dapli_socket_disconnect(dp_ib_cm_handle_t cm_ptr)
>+{
>+       DAPL_EP *ep_ptr = cm_ptr->ep;
>+       DAT_UINT32 disc_data = htonl(0xdead);
>+
>+       if (ep_ptr == NULL)
>+               return DAT_SUCCESS;
>+
>+       dapl_os_lock(&cm_ptr->lock);
>+       if ((cm_ptr->state == SCM_INIT) ||
>+           (cm_ptr->state == SCM_DISCONNECTED)) {
>+               dapl_os_unlock(&cm_ptr->lock);
>+               return DAT_SUCCESS;
>+       } else {
>+               /* send disc date, close socket, schedule destroy */
>+               if (cm_ptr->socket >= 0) {
>+                       send(cm_ptr->socket, (const char *)&disc_data,
>+                               sizeof(disc_data), 0);
>+                       closesocket(cm_ptr->socket);
>+                       cm_ptr->socket = -1;
>+               }
>+               cm_ptr->state = SCM_DISCONNECTED;
>+               _write(g_scm_pipe[1], "w", sizeof "w");
>+       }
>+       dapl_os_unlock(&cm_ptr->lock);
>+
>+
>+       if (ep_ptr->cr_ptr) {
>+               dapls_cr_callback(cm_ptr,
>+                                 IB_CME_DISCONNECTED,
>+                                 NULL,
>+                                 ((DAPL_CR *)ep_ptr->cr_ptr)->sp_ptr);
>+       } else {
>+               dapl_evd_connection_callback(ep_ptr->cm_handle,
>+                                            IB_CME_DISCONNECTED,
>+                                            NULL,
>+                                            ep_ptr);
>+       }
>+
>+       /* remove reference from endpoint */
>+       ep_ptr->cm_handle = NULL;
>+
>+       /* schedule destroy */
>+
>+
>+       return DAT_SUCCESS;
>+}
>+
>+
>+
>+/*
>+ * PASSIVE: consumer accept, send local QP information, private data,
>+ * queue on work thread to receive RTU information to avoid blocking
>+ * user thread.
>+ */
>+static DAT_RETURN
>+dapli_socket_accept_usr( DAPL_EP       *ep_ptr,
>+                        DAPL_CR        *cr_ptr,
>+                        DAT_COUNT      p_size,
>+                        DAT_PVOID      p_data )
>+{
>+       DAPL_IA         *ia_ptr = ep_ptr->header.owner_ia;
>+       dp_ib_cm_handle_t cm_ptr = cr_ptr->ib_cm_handle;
>+       WSABUF          iovec[2];
>+       int             len, rc;
>+       short           rtu_data = 0;
>+       ib_api_status_t ibs;
>+       ib_qp_attr_t    qpa;
>+       dapl_ibal_port_t *p_port;
>+       dapl_ibal_ca_t  *p_ca;
>+
>+       dapl_dbg_log (DAPL_DBG_TYPE_EP, "%s() p_sz %d sock %d
>port 0x%x\n",
>+                       __FUNCTION__,p_size,cm_ptr->socket,
>+                       ia_ptr->hca_ptr->port_num);
>+
>+       if (p_size >  IB_MAX_REP_PDATA_SIZE)
>+               return DAT_LENGTH_ERROR;
>+
>+       /* must have a accepted socket */
>+       if ( cm_ptr->socket < 0 ) {
>+               dapl_dbg_log(DAPL_DBG_TYPE_EP,
>+                    "%s() Not accepted socket? remote
>port=0x%x lid=0x%x"
>+                    " qpn=0x%x psize=%d\n",
>+                    cm_ptr->dst.port, cm_ptr->dst.lid,
>+                    ntohs(cm_ptr->dst.qpn), cm_ptr->dst.p_size);
>+               return DAT_INTERNAL_ERROR;
>+       }
>+
>+       dapl_dbg_log(DAPL_DBG_TYPE_EP,
>+                    " accept_usr: remote port=0x%x lid=0x%x"
>+                    " qpn=0x%x psize=%d\n",
>+                    cm_ptr->dst.port, cm_ptr->dst.lid,
>+                    ntohs(cm_ptr->dst.qpn), cm_ptr->dst.p_size);
>+
>+       /* modify QP to RTR and then to RTS with remote info
>already read */
>+
>+       p_ca = (dapl_ibal_ca_t *) ia_ptr->hca_ptr->ib_hca_handle;
>+       p_port = dapli_ibal_get_port (p_ca,
>(uint8_t)ia_ptr->hca_ptr->port_num);
>+       if (!p_port)
>+       {
>+               dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>+                       "%s() dapli_ibal_get_port() failed @
>line #%d\n",
>+                       __FUNCTION__,__LINE__);
>+               goto bail;
>+       }
>+
>+       dapl_dbg_log(DAPL_DBG_TYPE_EP,
>+                       "%s() DST: qpn 0x%x port 0x%x lid %x
>psize %d\n",
>+                       __FUNCTION__,
>+                       cl_ntoh32(cm_ptr->dst.qpn),
>+                       cm_ptr->dst.port,
>+                       cl_ntoh16(cm_ptr->dst.lid),
>cm_ptr->dst.p_size);
>+
>+       /* modify QP to RTR and then to RTS with remote info */
>+
>+       ibs = dapls_modify_qp_state_to_rtr( ep_ptr->qp_handle,
>+                                           cm_ptr->dst.qpn,
>+                                           cm_ptr->dst.lid,
>+                                           p_port );
>+       if (ibs != IB_SUCCESS)
>+       {
>+               dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>+                               "%s() QP --> RTR failed @ line #%d\n",
>+                               __FUNCTION__,__LINE__);
>+               goto bail;
>+       }
>+
>+       if ( dapls_modify_qp_state_to_rts( ep_ptr->qp_handle ) )
>+       {
>+               dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>+                               "%s() QP --> RTS failed @ line #%d\n",
>+                               __FUNCTION__,__LINE__);
>+               goto bail;
>+       }
>+
>+       ep_ptr->qp_state = IB_QP_STATE_RTS;
>+
>+       /* save remote address information */
>+       dapl_os_memcpy( &ep_ptr->remote_ia_address,
>+                       &cm_ptr->dst.ia_address,
>+                       sizeof(ep_ptr->remote_ia_address));
>+
>+       /* determine QP & port numbers */
>+       ibs = ib_query_qp(ep_ptr->qp_handle, &qpa);
>+       if (ibs != IB_SUCCESS)
>+       {
>+               dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>+                            " ib_query_qp() ERR %s\n",
>ib_get_err_str(ibs));
>+               goto bail;
>+       }
>+
>+       /* Send our QP info, IA address, and private data */
>+       cm_ptr->dst.qpn = qpa.num; /* ib_net32_t */
>+       cm_ptr->dst.port = ia_ptr->hca_ptr->port_num;
>+       cm_ptr->dst.lid = dapli_get_lid(ia_ptr->hca_ptr,
>ia_ptr->hca_ptr->port_num);
>+       /* set gid in network order */
>+       ibs = dapls_ib_query_gid( ia_ptr->hca_ptr, &cm_ptr->dst.gid );
>+       if ( ibs != IB_SUCCESS )
>+       {
>+               dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>+                       "%s() dapls_ib_query_gid() returns '%s'\n",
>+                       __FUNCTION__,ib_get_err_str(ibs));
>+               goto bail;
>+       }
>+
>+       cm_ptr->dst.ia_address = ia_ptr->hca_ptr->hca_address;
>+       cm_ptr->dst.p_size = p_size;
>+
>+       dapl_dbg_log(DAPL_DBG_TYPE_CM,
>+               "%s()\n  Tx QP info: qpn %x port 0x%x lid 0x%x
>p_sz %d IP %s\n",
>+               __FUNCTION__, cl_ntoh32(cm_ptr->dst.qpn),
>cm_ptr->dst.port,
>+               cl_ntoh16(cm_ptr->dst.lid), cm_ptr->dst.p_size,
>+               dapli_get_ip_addr_str(&cm_ptr->dst.ia_address,NULL));
>+
>+       /* network byte-ordering - QPN & LID already are */
>+       cm_ptr->dst.p_size = cl_hton32(cm_ptr->dst.p_size);
>+       cm_ptr->dst.port = cl_hton16(cm_ptr->dst.port);
>+
>+       iovec[0].buf = (char*)&cm_ptr->dst;
>+       iovec[0].len  = sizeof(ib_qp_cm_t);
>+       if (p_size) {
>+               iovec[1].buf = p_data;
>+               iovec[1].len  = p_size;
>+       }
>+       rc = WSASend( cm_ptr->socket, iovec, (p_size ? 2:1),
>&len, 0, 0, 0 );
>+       if (rc || len != (p_size + sizeof(ib_qp_cm_t))) {
>+               dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>+                            " accept_usr: ERR %d, wcnt=%d\n",
>+                            WSAGetLastError(), len);
>+               goto bail;
>+       }
>+       dapl_dbg_log(DAPL_DBG_TYPE_CM,
>+                    " accept_usr: local port=0x%x lid=0x%x"
>+                    " qpn=0x%x psize=%d\n",
>+                    ntohs(cm_ptr->dst.port), ntohs(cm_ptr->dst.lid),
>+                    ntohl(cm_ptr->dst.qpn),
>ntohl(cm_ptr->dst.p_size));
>+
>+       /* save state and reference to EP, queue for RTU data */
>+       cm_ptr->ep = ep_ptr;
>+       cm_ptr->hca = ia_ptr->hca_ptr;
>+       cm_ptr->state = SCM_ACCEPTED;
>+
>+       /* restore remote address information for query */
>+       dapl_os_memcpy( &cm_ptr->dst.ia_address,
>+                       &ep_ptr->remote_ia_address,
>+                       sizeof(cm_ptr->dst.ia_address));
>+
>+       dapl_dbg_log( DAPL_DBG_TYPE_EP," PASSIVE: accepted!\n" );
>+       dapli_cm_queue(cm_ptr);
>+
>+       return DAT_SUCCESS;
>+
>+bail:
>+       dapl_dbg_log( DAPL_DBG_TYPE_ERR," accept_usr: ERR
>!QP_RTR_RTS \n");
>+       dapli_cm_destroy(cm_ptr);
>+       dapls_ib_reinit_ep( ep_ptr ); /* reset QP state */
>+
>+       return DAT_INTERNAL_ERROR;
>+}
>+
>+
>+/*
>+ * PASSIVE: read RTU from active peer, post CONN event
>+ */
>+void
>+dapli_socket_accept_rtu(dp_ib_cm_handle_t cm_ptr)
>+{
>+       int             len;
>+       short           rtu_data = 0;
>+
>+       /* complete handshake after final QP state change */
>+       len = recv(cm_ptr->socket, (char*)&rtu_data,
>sizeof(rtu_data), 0);
>+       if ( len != sizeof(rtu_data) || ntohs(rtu_data) != 0x0e0f ) {
>+               dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>+                            " accept_rtu: ERR %d, rcnt=%d rdata=%x\n",
>+                            WSAGetLastError(), len, ntohs(rtu_data) );
>+               goto bail;
>+       }
>+
>+       /* save state and reference to EP, queue for disc event */
>+       cm_ptr->state = SCM_CONNECTED;
>+
>+       /* final data exchange if remote QP state is good to go */
>+       dapl_dbg_log( DAPL_DBG_TYPE_EP," PASSIVE: connected!\n" );
>+       dapls_cr_callback(cm_ptr, IB_CME_CONNECTED, NULL, cm_ptr->sp);
>+       return;
>+bail:
>+       dapls_ib_reinit_ep(cm_ptr->ep); /* reset QP state */
>+       dapli_cm_destroy(cm_ptr);
>+       dapls_cr_callback(cm_ptr, IB_CME_DESTINATION_REJECT,
>NULL, cm_ptr->sp);
>+}
>+
>+
>+/*
>  * ACTIVE: Create socket, connect, and exchange QP information
>  */
> static DAT_RETURN
>@@ -143,21 +482,16 @@ dapli_socket_connect (    DAPL_EP
>         *ep_ptr,
>        dapl_ibal_port_t *p_port;
>        dapl_ibal_ca_t  *p_ca;
>
>-       dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: r_qual
>%d\n", r_qual);
>+       dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: r_qual %d
>psize %d\n",
>+                    r_qual, p_size);
>
>-       /*
>-        *  Allocate CM and initialize
>-        */
>-       if ((cm_ptr = dapl_os_alloc(sizeof(*cm_ptr))) == NULL ) {
>+       cm_ptr = dapli_cm_create();
>+       if (cm_ptr == NULL)
>                return DAT_INSUFFICIENT_RESOURCES;
>-       }
>-
>-       (void) dapl_os_memzero( cm_ptr, sizeof(*cm_ptr) );
>-       cm_ptr->socket = -1;
>
>        /* create, connect, sockopt, and exchange QP information */
>        if ((cm_ptr->socket = socket(AF_INET,SOCK_STREAM,0)) < 0 ) {
>-               dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
>+               dapli_cm_destroy(cm_ptr);
>                return DAT_INSUFFICIENT_RESOURCES;
>        }
>
>@@ -166,7 +500,7 @@ dapli_socket_connect (      DAPL_EP
>         *ep_ptr,
>        if (connect(cm_ptr->socket, r_addr, sizeof(*r_addr))
>== SOCKET_ERROR) {
>                dapl_dbg_log(DAPL_DBG_TYPE_ERR, " connect: %d
>on r_qual %d\n",
>                             WSAGetLastError(), (unsigned int)r_qual);
>-               dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
>+               dapli_cm_destroy(cm_ptr);
>                return DAT_INVALID_ADDRESS;
>        }
>
>@@ -175,6 +509,8 @@ dapli_socket_connect (      DAPL_EP
>         *ep_ptr,
>                          (const char*)&opt,
>                          sizeof(opt) );
>
>+       dapl_dbg_log(DAPL_DBG_TYPE_EP, " socket connected!\n");
>+
>        /* determine QP & port numbers */
>        ibs = ib_query_qp(ep_ptr->qp_handle, &qpa);
>        if (ibs != IB_SUCCESS)
>@@ -187,7 +523,6 @@ dapli_socket_connect (      DAPL_EP
>         *ep_ptr,
>        /* Send QP info, IA address and private data */
>        cm_ptr->dst.qpn = qpa.num; /* ib_net32_t */
>        cm_ptr->dst.port = cl_hton16(ia_ptr->hca_ptr->port_num);
>-
>        cm_ptr->dst.lid = dapli_get_lid( ia_ptr->hca_ptr,
>                                         ia_ptr->hca_ptr->port_num );
>        if (cm_ptr->dst.lid == 0)
>@@ -197,6 +532,17 @@ dapli_socket_connect (     DAPL_EP
>         *ep_ptr,
>                                __FUNCTION__, __LINE__);
>                goto bail;
>        }
>+
>+       /* set gid in network order */
>+       ibs = dapls_ib_query_gid( ia_ptr->hca_ptr, &cm_ptr->dst.gid );
>+       if ( ibs != IB_SUCCESS )
>+       {
>+               dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>+                       "%s() dapls_ib_query_gid() returns '%s'\n",
>+                       __FUNCTION__,ib_get_err_str(ibs));
>+               goto bail;
>+       }
>+
>        cm_ptr->dst.ia_address = ia_ptr->hca_ptr->hca_address;
>        cm_ptr->dst.p_size = cl_hton32(p_size);
>
>@@ -213,6 +559,8 @@ dapli_socket_connect (      DAPL_EP
>         *ep_ptr,
>                iovec[1].buf = p_data;
>                iovec[1].len  = p_size;
>        }
>+
>+       dapl_dbg_log(DAPL_DBG_TYPE_EP," socket connected,
>write QP and private data\n");
>        rc = WSASend (cm_ptr->socket, iovec, (p_size ? 2:1),
>&len, 0, 0, NULL);
>        if ( rc || len != (p_size + sizeof(ib_qp_cm_t)) ) {
>                dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>@@ -225,17 +573,65 @@ dapli_socket_connect (    DAPL_EP
>         *ep_ptr,
>                     cm_ptr->dst.port, cm_ptr->dst.lid,
>                     cm_ptr->dst.qpn, cm_ptr->dst.p_size );
>
>+       /* queue up to work thread to avoid blocking consumer */
>+       cm_ptr->state = SCM_CONN_PENDING;
>+       cm_ptr->hca = ia_ptr->hca_ptr;
>+       cm_ptr->ep = ep_ptr;
>+       dapli_cm_queue(cm_ptr);
>+       return DAT_SUCCESS;
>+bail:
>+       /* close socket, free cm structure */
>+       dapli_cm_destroy(cm_ptr);
>+       return DAT_INTERNAL_ERROR;
>+}
>+
>+
>+/*
>+ * ACTIVE: exchange QP information, called from CR thread
>+ */
>+void
>+dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)
>+{
>+       DAPL_EP         *ep_ptr = cm_ptr->ep;
>+       DAPL_IA         *ia_ptr = cm_ptr->ep->header.owner_ia;
>+       int             len, rc;
>+       DWORD           ioflags;
>+       WSABUF          iovec[1];
>+       short           rtu_data = htons(0x0E0F);
>+       ib_cm_events_t  event = IB_CME_DESTINATION_REJECT;
>+       ib_api_status_t ibs;
>+       dapl_ibal_port_t *p_port;
>+       dapl_ibal_ca_t  *p_ca;
>+
>        /* read DST information into cm_ptr, overwrite SRC info */
>+       dapl_dbg_log(DAPL_DBG_TYPE_EP," connect_rtu: recv peer
>QP data\n");
>+
>+       iovec[0].buf = (char*)&cm_ptr->dst;
>+       iovec[0].len  = sizeof(ib_qp_cm_t);
>        ioflags = len = 0;
>        rc = WSARecv (cm_ptr->socket, iovec, 1, &len, &ioflags, 0, 0);
>-       if ( rc == SOCKET_ERROR || len != sizeof(ib_qp_cm_t) ) {
>-               dapl_dbg_log(DAPL_DBG_TYPE_ERR,"connect read:
>ERR %d rcnt=%d\n",
>-                            WSAGetLastError(), len);
>+       if ( rc == SOCKET_ERROR || len != sizeof(ib_qp_cm_t) ||
>+                                       ntohs(cm_ptr->dst.ver)
>!= DSCM_VER )
>+       {
>+               dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>+                            "connect_rtu read: ERR %d rcnt=%d
>ver=%d\n",
>+                            WSAGetLastError(), len, cm_ptr->dst.ver);
>+               goto bail;
>+       }
>+
>+       /* check for consumer reject */
>+       if (cm_ptr->dst.rej) {
>+               dapl_dbg_log(DAPL_DBG_TYPE_CM,
>+                            " connect_rtu read: PEER REJ
>reason=0x%x\n",
>+                            ntohs(cm_ptr->dst.rej));
>+               event = IB_CME_DESTINATION_REJECT_PRIVATE_DATA;
>                goto bail;
>        }
>
>-       /* revert back to host byte ordering */
>+       /* convert peer response values to host order */
>        cm_ptr->dst.port = cl_ntoh16(cm_ptr->dst.port);
>+       cm_ptr->dst.lid = ntohs(cm_ptr->dst.lid);
>+       cm_ptr->dst.qpn = cm_ptr->dst.qpn;
>        cm_ptr->dst.p_size = cl_ntoh32(cm_ptr->dst.p_size);
>
>        dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: Rx DST: qpn
>%x port %d "
>@@ -245,15 +641,27 @@ dapli_socket_connect (    DAPL_EP
>         *ep_ptr,
>                        cm_ptr->dst.p_size,
>
>dapli_get_ip_addr_str(&cm_ptr->dst.ia_address,NULL));
>
>+       /* save remote address information */
>+       dapl_os_memcpy( &ep_ptr->remote_ia_address,
>+                       &cm_ptr->dst.ia_address,
>+                       sizeof(ep_ptr->remote_ia_address));
>+
>+       dapl_dbg_log(DAPL_DBG_TYPE_EP,
>+                    " connect_rtu: DST %s port=0x%x lid=0x%x,
>qpn=0x%x, psize=%d\n",
>+                    inet_ntoa(((struct sockaddr_in
>*)&cm_ptr->dst.ia_address)->sin_addr),
>+                    cm_ptr->dst.port, cm_ptr->dst.lid,
>+                    cm_ptr->dst.qpn, cm_ptr->dst.p_size);
>+
>        /* validate private data size before reading */
>-       if ( cm_ptr->dst.p_size > IB_MAX_REP_PDATA_SIZE ) {
>+       if (cm_ptr->dst.p_size > IB_MAX_REP_PDATA_SIZE) {
>                dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>-                            " connect read: psize (%d) wrong\n",
>+                            " connect_rtu read: psize (%d) wrong\n",
>                             cm_ptr->dst.p_size );
>                goto bail;
>        }
>
>        /* read private data into cm_handle if any present */
>+       dapl_dbg_log(DAPL_DBG_TYPE_EP," socket connected, read
>private data\n");
>        if ( cm_ptr->dst.p_size ) {
>                iovec[0].buf = cm_ptr->p_data;
>                iovec[0].len  = cm_ptr->dst.p_size;
>@@ -300,32 +708,29 @@ dapli_socket_connect (    DAPL_EP
>         *ep_ptr,
>
>        ep_ptr->qp_state = IB_QP_STATE_RTS;
>
>+       dapl_dbg_log(DAPL_DBG_TYPE_EP," connect_rtu: send RTU\n");
>+
>        /* complete handshake after final QP state change */
>        send(cm_ptr->socket, (const char *)&rtu_data,
>sizeof(rtu_data), 0);
>
>        /* init cm_handle and post the event with private data */
>        ep_ptr->cm_handle = cm_ptr;
>+       cm_ptr->state = SCM_CONNECTED;
>        dapl_dbg_log( DAPL_DBG_TYPE_EP," ACTIVE: connected!\n" );
>
>        dapl_evd_connection_callback(   ep_ptr->cm_handle,
>                                        IB_CME_CONNECTED,
>                                        cm_ptr->p_data,
>                                        ep_ptr );
>-       return DAT_SUCCESS;
>-
>+       return;
> bail:
>        /* close socket, free cm structure and post error event */
>-       if ( cm_ptr->socket >= 0 )
>-               closesocket(cm_ptr->socket);
>-
>-       dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
>-       dapls_ib_reinit_ep( ep_ptr ); /* reset QP state */
>-
>-       dapl_evd_connection_callback(   ep_ptr->cm_handle,
>-                                       IB_CME_LOCAL_FAILURE,
>+       dapli_cm_destroy(cm_ptr);
>+       dapls_ib_reinit_ep(ep_ptr); /* reset QP state */
>+       dapl_evd_connection_callback(   NULL /*ep_ptr->cm_handle*/,
>+                                       event,
>                                        NULL,
>                                        ep_ptr );
>-       return DAT_INTERNAL_ERROR;
> }
>
>
>@@ -347,14 +752,12 @@ dapli_socket_listen (     DAPL_IA
> *ia_ptr,
>                        ia_ptr, serviceID, sp_ptr);
>
>        /* Allocate CM and initialize */
>-       if ((cm_ptr = dapl_os_alloc(sizeof(*cm_ptr))) == NULL)
>+       cm_ptr = dapli_cm_create();
>+       if (cm_ptr == NULL)
>                return DAT_INSUFFICIENT_RESOURCES;
>
>-       (void) dapl_os_memzero( cm_ptr, sizeof( *cm_ptr ) );
>-
>-       cm_ptr->socket = cm_ptr->l_socket = -1;
>        cm_ptr->sp = sp_ptr;
>-       cm_ptr->hca_ptr = ia_ptr->hca_ptr;
>+       cm_ptr->hca = ia_ptr->hca_ptr;
>
>        /* bind, listen, set sockopt, accept, exchange data */
>        if ((cm_ptr->l_socket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
>@@ -406,12 +809,9 @@ dapli_socket_listen (      DAPL_IA
> *ia_ptr,
>        /* set cm_handle for this service point, save listen socket */
>        sp_ptr->cm_srvc_handle = cm_ptr;
>
>-       /* add to SP->CR thread list */
>-       dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&cm_ptr->entry);
>-       dapl_os_lock( &cm_ptr->hca_ptr->ib_trans.lock );
>-
>dapl_llist_add_tail((DAPL_LLIST_HEAD*)&cm_ptr->hca_ptr->ib_trans.list,
>-                           (DAPL_LLIST_ENTRY*)&cm_ptr->entry,
>(void*)cm_ptr);
>-       dapl_os_unlock(&cm_ptr->hca_ptr->ib_trans.lock);
>+       /* queue up listen socket to process inbound CR's */
>+       cm_ptr->state = SCM_LISTEN;
>+       dapli_cm_queue(cm_ptr);
>
>        dapl_dbg_log( DAPL_DBG_TYPE_CM,
>                        " listen: qual 0x%x cr %p s_fd %d\n",
>@@ -421,10 +821,7 @@ dapli_socket_listen (      DAPL_IA
> *ia_ptr,
> bail:
>        dapl_dbg_log( DAPL_DBG_TYPE_CM,
>                        " listen: ERROR on conn_qual
>0x%x\n",serviceID);
>-       if ( cm_ptr->l_socket >= 0 )
>-               closesocket( cm_ptr->l_socket );
>-
>-       dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
>+       dapli_cm_destroy(cm_ptr);
>        return dat_status;
> }
>
>@@ -441,6 +838,8 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
>        int             len;
>        DAT_RETURN      dat_status = DAT_SUCCESS;
>
>+       dapl_dbg_log(DAPL_DBG_TYPE_EP," socket_accept\n");
>+
>        /* Allocate accept CM and initialize */
>        if ((acm_ptr = dapl_os_alloc(sizeof(*acm_ptr))) == NULL)
>                return DAT_INSUFFICIENT_RESOURCES;
>@@ -448,8 +847,9 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
>        (void) dapl_os_memzero( acm_ptr, sizeof( *acm_ptr ) );
>
>        acm_ptr->socket = -1;
>+       acm_ptr->l_socket = -1;
>        acm_ptr->sp = cm_ptr->sp;
>-       acm_ptr->hca_ptr = cm_ptr->hca_ptr;
>+       acm_ptr->hca = cm_ptr->hca;
>
>        len = sizeof(acm_ptr->dst.ia_address);
>        acm_ptr->socket = accept(cm_ptr->l_socket,
>@@ -464,27 +864,32 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
>                goto bail;
>        }
>
>+       dapl_dbg_log(DAPL_DBG_TYPE_EP," socket accepted, read
>QP data\n");
>+
>        /* read in DST QP info, IA address. check for private data */
>        len =
>recv(acm_ptr->socket,(char*)&acm_ptr->dst,sizeof(ib_qp_cm_t),0);
>-       if ( len != sizeof(ib_qp_cm_t) ) {
>+       if ( len != sizeof(ib_qp_cm_t) ||
>ntohs(acm_ptr->dst.ver) != DSCM_VER )
>+       {
>                dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>-                       " accept read: ERR %d, rcnt=%d\n",
>-                       WSAGetLastError(), len);
>+                       " accept read: ERR %d, rcnt=%d ver=%d\n",
>+                       WSAGetLastError(), len, acm_ptr->dst.ver);
>                dat_status = DAT_INTERNAL_ERROR;
>                goto bail;
>
>        }
>-       /* revert back to host byte ordering */
>+       /* convert accepted values to host byte ordering */
>        acm_ptr->dst.port = cl_ntoh16(acm_ptr->dst.port);
>+       acm_ptr->dst.lid = ntohs(acm_ptr->dst.lid);
>+       acm_ptr->dst.qpn = acm_ptr->dst.qpn;
>        acm_ptr->dst.p_size = cl_ntoh32(acm_ptr->dst.p_size);
>
>-       dapl_dbg_log(DAPL_DBG_TYPE_EP, " accept: DST
>sizeof(ib_cm_t) %d qpn %x "
>-               "port %d lid 0x%x psize %d IP %s\n",
>-               sizeof(ib_qp_cm_t),
>-               cl_ntoh32(acm_ptr->dst.qpn), acm_ptr->dst.port,
>+       dapl_dbg_log(DAPL_DBG_TYPE_EP, " accept: DST %s port 0x%x "
>+               "lid 0x%x qpn 0x%x psize %d\n",
>+               dapli_get_ip_addr_str(&acm_ptr->dst.ia_address,NULL),
>+               acm_ptr->dst.port,
>                cl_ntoh16(acm_ptr->dst.lid),
>-               acm_ptr->dst.p_size,
>-               dapli_get_ip_addr_str(&acm_ptr->dst.ia_address,NULL));
>+               cl_ntoh32(acm_ptr->dst.qpn),
>+               acm_ptr->dst.p_size);
>
>        /* validate private data size before reading */
>        if ( acm_ptr->dst.p_size > IB_MAX_REQ_PDATA_SIZE ) {
>@@ -495,6 +900,8 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
>                goto bail;
>        }
>
>+       dapl_dbg_log(DAPL_DBG_TYPE_EP," socket accepted, read
>private data\n");
>+
>        /* read private data into cm_handle if any present */
>        if ( acm_ptr->dst.p_size ) {
>                len = recv( acm_ptr->socket,
>@@ -514,6 +921,8 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
>                p_data = acm_ptr->p_data;
>        }
>
>+       acm_ptr->state = SCM_ACCEPTING;
>+
>        /* trigger CR event and return SUCCESS */
>        dapls_cr_callback(  acm_ptr,
>                            IB_CME_CONNECTION_REQUEST_PENDING,
>@@ -521,153 +930,8 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
>                            acm_ptr->sp );
>
>        return DAT_SUCCESS;
>-
>-bail:
>-       if ( acm_ptr->socket >= 0 )
>-               closesocket( acm_ptr->socket );
>-
>-       dapl_os_free( acm_ptr, sizeof( *acm_ptr ) );
>-       return DAT_INTERNAL_ERROR;
>-}
>-
>-
>-static DAT_RETURN
>-dapli_socket_accept_final( DAPL_EP             *ep_ptr,
>-                          DAPL_CR              *cr_ptr,
>-                          DAT_COUNT            p_size,
>-                          DAT_PVOID            p_data )
>-{
>-       DAPL_IA         *ia_ptr = ep_ptr->header.owner_ia;
>-       dp_ib_cm_handle_t cm_ptr = cr_ptr->ib_cm_handle;
>-       ib_qp_cm_t      qp_cm;
>-       WSABUF          iovec[2];
>-       int             len, rc;
>-       short           rtu_data = 0;
>-       ib_api_status_t ibs;
>-       ib_qp_attr_t    qpa;
>-       dapl_ibal_port_t *p_port;
>-       dapl_ibal_ca_t  *p_ca;
>-
>-       dapl_dbg_log (DAPL_DBG_TYPE_EP, "%s() p_sz %d sock %d
>port %d\n",
>-                       __FUNCTION__,p_size,cm_ptr->socket,
>-                       ia_ptr->hca_ptr->port_num);
>-
>-       if (p_size >  IB_MAX_REP_PDATA_SIZE)
>-               return DAT_LENGTH_ERROR;
>-
>-       /* must have a accepted socket */
>-       if ( cm_ptr->socket < 0 )
>-               return DAT_INTERNAL_ERROR;
>-
>-       /* modify QP to RTR and then to RTS with remote info
>already read */
>-
>-       p_ca = (dapl_ibal_ca_t *) ia_ptr->hca_ptr->ib_hca_handle;
>-       p_port = dapli_ibal_get_port (p_ca,
>(uint8_t)ia_ptr->hca_ptr->port_num);
>-       if (!p_port)
>-       {
>-               dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>-                       "%s() dapli_ibal_get_port() failed @
>line #%d\n",
>-                       __FUNCTION__,__LINE__);
>-               goto bail;
>-       }
>-
>-       dapl_dbg_log(DAPL_DBG_TYPE_EP, "%s() DST: qpn %x port
>%d lid %x\n",
>-                       __FUNCTION__,
>-                       cl_ntoh32(cm_ptr->dst.qpn),
>-                       cm_ptr->dst.port,
>-                       cl_ntoh16(cm_ptr->dst.lid));
>-
>-       /* modify QP to RTR and then to RTS with remote info */
>-
>-       ibs = dapls_modify_qp_state_to_rtr( ep_ptr->qp_handle,
>-                                           cm_ptr->dst.qpn,
>-                                           cm_ptr->dst.lid,
>-                                           p_port );
>-       if (ibs != IB_SUCCESS)
>-       {
>-               dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>-                               "%s() QP --> RTR failed @ line #%d\n",
>-                               __FUNCTION__,__LINE__);
>-               goto bail;
>-       }
>-
>-       if ( dapls_modify_qp_state_to_rts( ep_ptr->qp_handle ) )
>-       {
>-               dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>-                               "%s() QP --> RTS failed @ line #%d\n",
>-                               __FUNCTION__,__LINE__);
>-               goto bail;
>-       }
>-
>-       ep_ptr->qp_state = IB_QP_STATE_RTS;
>-
>-       /* determine QP & port numbers */
>-       ibs = ib_query_qp(ep_ptr->qp_handle, &qpa);
>-       if (ibs != IB_SUCCESS)
>-       {
>-               dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>-                            " ib_query_qp() ERR %s\n",
>ib_get_err_str(ibs));
>-               goto bail;
>-       }
>-
>-       /* Send QP info, IA address, and private data */
>-       qp_cm.qpn = qpa.num; /* ib_net32_t */
>-       qp_cm.port = ia_ptr->hca_ptr->port_num;
>-       qp_cm.lid = dapli_get_lid( ia_ptr->hca_ptr,
>ia_ptr->hca_ptr->port_num );
>-       qp_cm.ia_address = ia_ptr->hca_ptr->hca_address;
>-       qp_cm.p_size = p_size;
>-
>-       dapl_dbg_log(DAPL_DBG_TYPE_CM,
>-               "%s()\n  Tx QP info: qpn %x port %d lid 0x%x
>p_sz %d IP %s\n",
>-               __FUNCTION__, cl_ntoh32(qp_cm.qpn), qp_cm.port,
>-               cl_ntoh16(qp_cm.lid), qp_cm.p_size,
>-               dapli_get_ip_addr_str(&qp_cm.ia_address,NULL));
>-
>-       /* network byte-ordering - QPN & LID already are */
>-       qp_cm.p_size = cl_hton32(qp_cm.p_size);
>-       qp_cm.port = cl_hton16(qp_cm.port);
>-
>-       iovec[0].buf = (char*)&qp_cm;
>-       iovec[0].len  = sizeof(ib_qp_cm_t);
>-       if (p_size) {
>-               iovec[1].buf = p_data;
>-               iovec[1].len  = p_size;
>-       }
>-       rc = WSASend( cm_ptr->socket, iovec, (p_size ? 2:1),
>&len, 0, 0, 0 );
>-       if (rc || len != (p_size + sizeof(ib_qp_cm_t))) {
>-               dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>-                            " accept_final: ERR %d, wcnt=%d\n",
>-                            WSAGetLastError(), len);
>-               goto bail;
>-       }
>-       dapl_dbg_log(DAPL_DBG_TYPE_EP,
>-                    " accept_final: SRC qpn %x port %d lid
>0x%x psize %d\n",
>-                    qp_cm.qpn, qp_cm.port, qp_cm.lid, qp_cm.p_size );
>-
>-       /* complete handshake after final QP state change */
>-       len = recv(cm_ptr->socket, (char*)&rtu_data,
>sizeof(rtu_data), 0);
>-       if ( len != sizeof(rtu_data) || ntohs(rtu_data) != 0x0e0f ) {
>-               dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>-                            " accept_final: ERR %d, rcnt=%d
>rdata=%x\n",
>-                            WSAGetLastError(), len, ntohs(rtu_data) );
>-               goto bail;
>-       }
>-
>-       /* final data exchange if remote QP state is good to go */
>-       dapl_dbg_log( DAPL_DBG_TYPE_EP," PASSIVE: connected!\n" );
>-
>-       dapls_cr_callback ( cm_ptr, IB_CME_CONNECTED, NULL,
>cm_ptr->sp );
>-
>-       return DAT_SUCCESS;
>-
> bail:
>-       dapl_dbg_log( DAPL_DBG_TYPE_ERR," accept_final: ERR
>!QP_RTR_RTS \n");
>-       if ( cm_ptr->socket >= 0 )
>-               closesocket( cm_ptr->socket );
>-
>-       dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
>-       dapls_ib_reinit_ep( ep_ptr ); /* reset QP state */
>-
>+       dapli_cm_destroy(acm_ptr);
>        return DAT_INTERNAL_ERROR;
> }
>
>@@ -747,11 +1011,7 @@ dapls_ib_disconnect (
>        dapl_dbg_log (DAPL_DBG_TYPE_EP,
>                        "dapls_ib_disconnect(ep_handle %p
>....)\n", ep_ptr);
>
>-       if ( cm_ptr->socket >= 0 ) {
>-               closesocket( cm_ptr->socket );
>-               cm_ptr->socket = -1;
>-       }
>-
>+#if 0 // XXX
>        /* disconnect QP ala transition to RESET state */
>        ib_status = dapls_modify_qp_state_to_reset (ep_ptr->qp_handle);
>
>@@ -776,15 +1036,18 @@ dapls_ib_disconnect (
>                                                NULL,
>                                                ep_ptr );
>                ep_ptr->cm_handle = NULL;
>-               dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
>        }
>-
>+#endif
>        /* modify QP state --> INIT */
>        dapls_ib_reinit_ep(ep_ptr);
>
>+       if (cm_ptr == NULL)
>        return DAT_SUCCESS;
>+       else
>+               return dapli_socket_disconnect(cm_ptr);
> }
>
>+
> /*
>  * dapls_ib_disconnect_clean
>  *
>@@ -874,14 +1137,20 @@ dapls_ib_remove_conn_listener (
>        if ( cm_ptr != NULL ) {
>                if ( cm_ptr->l_socket >= 0 ) {
>                        closesocket( cm_ptr->l_socket );
>+                       cm_ptr->l_socket = -1;
>+               }
>+               if ( cm_ptr->socket >= 0 ) {
>+                       closesocket( cm_ptr->socket );
>                        cm_ptr->socket = -1;
>                }
>                /* cr_thread will free */
>                sp_ptr->cm_srvc_handle = NULL;
>+               _write(g_scm_pipe[1], "w", sizeof "w");
>        }
>        return DAT_SUCCESS;
> }
>
>+
> /*
>  * dapls_ib_accept_connection
>  *
>@@ -928,7 +1197,7 @@ dapls_ib_accept_connection (
>                        return status;
>        }
>
>-       return ( dapli_socket_accept_final(ep_ptr, cr_ptr,
>p_size, p_data) );
>+       return ( dapli_socket_accept_usr(ep_ptr, cr_ptr,
>p_size, p_data) );
> }
>
>
>@@ -948,27 +1217,39 @@ dapls_ib_accept_connection (
>  *     DAT_INTERNAL_ERROR
>  *
>  */
>+
> DAT_RETURN
> dapls_ib_reject_connection (
>-       IN  dp_ib_cm_handle_t   ib_cm_handle,
>+       IN  dp_ib_cm_handle_t   cm_ptr,
>        IN  int                 reject_reason,
>-       IN  DAT_COUNT           private_data_size,
>-       IN  const DAT_PVOID     private_data)
>+       IN  DAT_COUNT           psize,
>+       IN  const DAT_PVOID     pdata)
> {
>-       ib_cm_srvc_handle_t     cm_ptr = ib_cm_handle;
>+       WSABUF  iovec[1];
>+       int     len;
>
>        dapl_dbg_log (DAPL_DBG_TYPE_EP,
>-                     "dapls_ib_reject_connection(cm_handle %p
>reason %x)\n",
>-                     ib_cm_handle, reject_reason );
>-
>-       /* just close the socket and return */
>-       if ( cm_ptr->socket > 0 ) {
>-               closesocket( cm_ptr->socket );
>+                     " reject(cm %p reason %x pdata %p psize %d)\n",
>+                     cm_ptr, reject_reason, pdata, psize );
>+
>+       /* write reject data to indicate reject */
>+       if (cm_ptr->socket >= 0) {
>+               cm_ptr->dst.rej = (uint16_t)reject_reason;
>+               cm_ptr->dst.rej = cl_hton16(cm_ptr->dst.rej);
>+               iovec[0].buf = (char*)&cm_ptr->dst;
>+               iovec[0].len  = sizeof(ib_qp_cm_t);
>+               (void) WSASend (cm_ptr->socket, iovec, 1,
>&len, 0, 0, NULL);
>+               closesocket(cm_ptr->socket);
>                cm_ptr->socket = -1;
>        }
>+
>+       /* cr_thread will destroy CR */
>+       cm_ptr->state = SCM_REJECTED;
>+        _write(g_scm_pipe[1], "w", sizeof "w");
>        return DAT_SUCCESS;
> }
>
>+
> /*
>  * dapls_ib_cm_remote_addr
>  *
>@@ -1157,7 +1438,7 @@ dapls_ib_get_dat_event (
>
>
> /*
>- * dapls_ib_get_dat_event
>+ * dapls_ib_get_cm_event
>  *
>  * Return a DAT connection event given a provider CM event.
>  *
>@@ -1189,12 +1470,16 @@ dapls_ib_get_cm_event (
> }
> #endif /* NOT_USED */
>
>-/* async CR processing thread to avoid blocking applications */
>+/* outbound/inbound CR processing thread to avoid blocking
>applications */
>+
>+#define SCM_MAX_CONN (8 * sizeof(fd_set))
>+
> void cr_thread(void *arg)
> {
>     struct dapl_hca    *hca_ptr = arg;
>     ib_cm_srvc_handle_t        cr, next_cr;
>     int                        max_fd, rc;
>+    char               rbuf[2];
>     fd_set             rfd, rfds;
>     struct timeval     to;
>
>@@ -1202,10 +1487,12 @@ void cr_thread(void *arg)
>
>     dapl_os_lock( &hca_ptr->ib_trans.lock );
>     hca_ptr->ib_trans.cr_state = IB_THREAD_RUN;
>+
>     while (hca_ptr->ib_trans.cr_state == IB_THREAD_RUN) {
>
>        FD_ZERO( &rfds );
>-       max_fd = -1;
>+       FD_SET(g_scm_pipe[0], &rfds);
>+       max_fd = g_scm_pipe[0];
>
>        if
>(!dapl_llist_is_empty((DAPL_LLIST_HEAD*)&hca_ptr->ib_trans.list))
>             next_cr = dapl_llist_peek_head((DAPL_LLIST_HEAD*)
>@@ -1230,32 +1517,46 @@ void cr_thread(void *arg)
>                continue;
>            }
>
>-           FD_SET( cr->l_socket, &rfds ); /* add to select set */
>-           if ( cr->l_socket > max_fd )
>+           if (cr->socket > SCM_MAX_CONN-1) {
>+               dapl_dbg_log(DAPL_DBG_TYPE_ERR,
>+                            "SCM ERR: cr->socket(%d) exceeded
>FD_SETSIZE %d\n",
>+                               cr->socket,SCM_MAX_CONN-1);
>+               continue;
>+           }
>+           FD_SET( cr->socket, &rfds ); /* add to select SET */
>+           if ( cr->socket > max_fd )
>                max_fd = cr->l_socket;
>
>            /* individual select poll to check for work */
>            FD_ZERO(&rfd);
>-           FD_SET(cr->l_socket, &rfd);
>+           FD_SET(cr->socket, &rfd);
>            dapl_os_unlock(&hca_ptr->ib_trans.lock);
>
>            to.tv_sec  = 0;
>            to.tv_usec = 0; /* wakeup and check destroy */
>
>            /* block waiting for Rx data */
>-           if (select(cr->l_socket+1,&rfd,NULL,NULL,&to) ==
>SOCKET_ERROR) {
>+           if (select(cr->socket+1,&rfd,NULL,NULL,&to) ==
>SOCKET_ERROR) {
>                rc = WSAGetLastError();
>                if ( rc != SOCKET_ERROR /*WSAENOTSOCK*/ )
>                {
>                    dapl_dbg_log (DAPL_DBG_TYPE_ERR/*CM*/,
>                                " thread: select(sock %d) ERR
>%d on cr %p\n",
>-                               cr->l_socket, rc, cr);
>+                               cr->socket, rc, cr);
>+               }
>+               closesocket(cr->socket);
>+               cr->socket = -1;
>+           } else if (FD_ISSET(cr->socket,&rfd)) {
>+               if (cr->socket > 0) {
>+                       if (cr->state == SCM_LISTEN)
>+                               dapli_socket_accept(cr);
>+                       else if (cr->state == SCM_ACCEPTED)
>+                               dapli_socket_accept_rtu(cr);
>+                       else if (cr->state == SCM_CONN_PENDING)
>+                               dapli_socket_connect_rtu(cr);
>+                       else if (cr->state == SCM_CONNECTED)
>+                               dapli_socket_disconnect(cr);
>                }
>-               closesocket(cr->l_socket);
>-               cr->l_socket = -1;
>-           } else if (FD_ISSET(cr->l_socket,&rfd) &&
>dapli_socket_accept(cr)) {
>-               closesocket(cr->l_socket);
>-               cr->l_socket = -1;
>            }
>            dapl_os_lock( &hca_ptr->ib_trans.lock );
>            next_cr =  dapl_llist_next_entry((DAPL_LLIST_HEAD*)
>@@ -1263,9 +1564,19 @@ void cr_thread(void *arg)
>
>(DAPL_LLIST_ENTRY*)&cr->entry );
>        }
>        dapl_os_unlock( &hca_ptr->ib_trans.lock );
>+
>        to.tv_sec  = 0;
>        to.tv_usec = 100000; /* wakeup and check destroy */
>+
>        (void) select(max_fd+1, &rfds, NULL, NULL, &to);
>+
>+       /* if pipe data consume - used to wake this thread up */
>+       if (FD_ISSET(g_scm_pipe[0],&rfds)) {
>+               dapl_dbg_log(DAPL_DBG_TYPE_CM," cr_thread()
>read pipe data\n");
>+printf(" cr_thread() read pipe data\n");
>+               _read(g_scm_pipe[0], rbuf, 2);
>+printf(" cr_thread() Finished read pipe data\n");
>+       }
>        dapl_os_lock( &hca_ptr->ib_trans.lock );
>     }
>     dapl_os_unlock( &hca_ptr->ib_trans.lock );
>diff --git a/dapl/ibal-scm/dapl_ibal-scm_util.c
>b/dapl/ibal-scm/dapl_ibal-scm_util.c
>index 8e5f8ac..06bc704 100644
>--- a/dapl/ibal-scm/dapl_ibal-scm_util.c
>+++ b/dapl/ibal-scm/dapl_ibal-scm_util.c
>@@ -52,6 +52,7 @@ static const char rcsid[] = "$Id:  $";
> #include "dapl.h"
> #include "dapl_adapter_util.h"
> #include "dapl_ibal_util.h"
>+#include "dapl_ibal_name_service.h"
>
> #include <stdio.h>
> #include <stdlib.h>
>@@ -61,9 +62,12 @@ static const char rcsid[] = "$Id:  $";
> #include <winsock2.h>
> #include <ws2tcpip.h>
> #include <io.h>
>+#include <fcntl.h>
>
>+extern void cr_thread(void *arg);
>
> int g_dapl_loopback_connection = 0;
>+int g_scm_pipe[2];
>
> #ifdef NOT_USED
>
>@@ -132,22 +136,55 @@ DAT_RETURN dapli_init_sock_cm ( IN
>DAPL_HCA  *hca_ptr )
>
>        dapl_dbg_log (DAPL_DBG_TYPE_UTIL, " %s():
>%p\n",__FUNCTION__,hca_ptr );
>
>-       /* set inline max with enviroment or default */
>+       /* set RC tunables via enviroment or default */
>        hca_ptr->ib_trans.max_inline_send =
>-               dapl_os_get_env_val ( "DAPL_MAX_INLINE",
>INLINE_SEND_DEFAULT );
>+               dapl_os_get_env_val("DAPL_MAX_INLINE",
>INLINE_SEND_DEFAULT);
>+#if 0
>+       hca_ptr->ib_trans.ack_retry =
>+               dapl_os_get_env_val("DAPL_ACK_RETRY", SCM_ACK_RETRY);
>+       hca_ptr->ib_trans.ack_timer =
>+               dapl_os_get_env_val("DAPL_ACK_TIMER", SCM_ACK_TIMER);
>+       hca_ptr->ib_trans.rnr_retry =
>+               dapl_os_get_env_val("DAPL_RNR_RETRY", SCM_RNR_RETRY);
>+       hca_ptr->ib_trans.rnr_timer =
>+               dapl_os_get_env_val("DAPL_RNR_TIMER", SCM_RNR_TIMER);
>+       hca_ptr->ib_trans.global =
>+               dapl_os_get_env_val("DAPL_GLOBAL_ROUTING", SCM_GLOBAL);
>+       hca_ptr->ib_trans.hop_limit =
>+               dapl_os_get_env_val("DAPL_HOP_LIMIT", SCM_HOP_LIMIT);
>+       hca_ptr->ib_trans.tclass =
>+               dapl_os_get_env_val("DAPL_TCLASS", SCM_TCLASS);
>+#endif
>
>        /* initialize cr_list lock */
>        dat_status = dapl_os_lock_init(&hca_ptr->ib_trans.lock);
>        if (dat_status != DAT_SUCCESS)
>        {
>                dapl_dbg_log (DAPL_DBG_TYPE_ERR,
>-                               " open_hca: failed to init lock\n");
>+                       "%s() failed to init cr_list lock\n",
>__FUNCTION__);
>                return DAT_INTERNAL_ERROR;
>        }
>
>+#if 0
>+       /* initialize cq_lock */
>+       dat_status = dapl_os_lock_init(&hca_ptr->ib_trans.cq_lock);
>+       if (dat_status != DAT_SUCCESS) {
>+               dapl_log(DAPL_DBG_TYPE_ERR,
>+                        "%s() failed to init cq_lock\n",
>__FUNCTION__);
>+               return DAT_INTERNAL_ERROR;
>+       }
>+#endif
>+
>        /* initialize CM list for listens on this HCA */
>
>dapl_llist_init_head((DAPL_LLIST_HEAD*)&hca_ptr->ib_trans.list);
>
>+       /* create pipe communication endpoints */
>+       if (_pipe(g_scm_pipe, 256, O_TEXT)) {
>+               dapl_dbg_log (DAPL_DBG_TYPE_ERR,
>+                       "%s() failed to create thread\n",
>__FUNCTION__);
>+               return DAT_INTERNAL_ERROR;
>+       }
>+
>        /* create thread to process inbound connect request */
>        hca_ptr->ib_trans.cr_state = IB_THREAD_INIT;
>        dat_status = dapl_os_thread_create(cr_thread,
>@@ -199,6 +236,7 @@ DAT_RETURN dapli_close_sock_cm ( IN
>DAPL_HCA  *hca_ptr )
>
>        /* destroy cr_thread and lock */
>        hca_ptr->ib_trans.cr_state = IB_THREAD_CANCEL;
>+
>        while (hca_ptr->ib_trans.cr_state != IB_THREAD_EXIT) {
>                dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
>                             " close_hca: waiting for cr_thread\n");
>
>
>
>_______________________________________________
>ofw mailing list
>ofw at lists.openfabrics.org
>http://lists.openfabrics.org/cgi-bin/mailman/listinfo/ofw
>



More information about the general mailing list