[openib-general] [PATCH] udapl provider
Arlin Davis
ardavis at ichips.intel.com
Tue Apr 19 14:49:56 PDT 2005
Fixes to the socket-based CM so that listen no longer blocks the caller (inbound connection requests are now accepted by a per-HCA thread), allowing more uDAPL applications to run successfully.
Signed-off-by: Arlin Davis <ardavis at ichips.intel.com>
Index: udapl/Makefile
===================================================================
--- udapl/Makefile (revision 2190)
+++ udapl/Makefile (working copy)
@@ -122,7 +122,6 @@ endif
#
ifeq ($(VERBS),openib)
PROVIDER = $(TOPDIR)/../openib
-DAPL_IBLIB_DIR = /usr/local/lib
CFLAGS += -DSOCKET_CM -DOPENIB -DCQ_WAIT_OBJECT
CFLAGS += -I/usr/local/include/infiniband
endif
@@ -139,7 +138,7 @@ endif
CFLAGS += -I.
CFLAGS += -I..
-CFLAGS += -I../../dat/include
+CFLAGS += -I../dat/include
CFLAGS += -I../include
CFLAGS += -I$(PROVIDER)
@@ -234,8 +233,9 @@ PROVIDER_SRCS += dapl_openib_util.c dapl
endif
ifeq ($(VERBS),openib)
-LDFLAGS += -libverbs
-LDFLAGS += -L /usr/local/lib/
+LDFLAGS += -libverbs /usr/local/lib/infiniband/mthca.so
+LDFLAGS += -rpath /usr/local/lib -L /usr/local/lib
+LDFLAGS += -rpath /usr/local/lib/infiniband
PROVIDER_SRCS = dapl_ib_util.c dapl_ib_cq.c dapl_ib_qp.c
PROVIDER_SRCS += dapl_ib_cm.c dapl_ib_mem.c
endif
Index: openib/TODO
===================================================================
--- openib/TODO (revision 2190)
+++ openib/TODO (working copy)
@@ -2,16 +2,17 @@
IB Verbs:
- CQ resize?
- query call to get current qp state
-- ibv_get_cq_event() blocks until event arrives. need timed event and wakeup
+- ibv_get_cq_event() needs timed event call and wakeup
- query call to get device attributes
-- poll_cq return codes not exported
+- current implementation only supports one event per device
+- memory window support
DAPL:
- Build udapl issues with mthca having reverse dependencies to ibverbs
-- When CM arrives: change modify_qp_state RTS RTR calls
+- When real CM arrives: change modify_qp_state RTS RTR calls
- reinit EP needs a QP timewait completion notification
-- disconnect clean
-- add cq_object wakeup, time based cq_object wait
+- code disconnect clean
+- add cq_object wakeup, time based cq_object wait when verbs support arrives
- update uDAPL code with real CM and ATS support
- etc, etc.
Index: openib/dapl_ib_util.c
===================================================================
--- openib/dapl_ib_util.c (revision 2190)
+++ openib/dapl_ib_util.c (working copy)
@@ -53,18 +53,12 @@ static const char rcsid[] = "$Id: $";
#include "dapl_adapter_util.h"
#include "dapl_ib_util.h"
-#include <dlfcn.h>
#include <stdlib.h>
#include <netinet/tcp.h>
#include <sys/utsname.h>
#include <unistd.h>
#include <fcntl.h>
-/* set default path */
-#define OPENIB_VERBS_PATH_DEFAULT "/usr/local/lib/libibverbs.so"
-static char * ibv_path;
-static void * ibv_handle = NULL;
-
int g_dapl_loopback_connection = 0;
#ifdef SOCKET_CM
@@ -110,32 +104,11 @@ DAT_RETURN getipaddr( char *addr, int ad
*/
int32_t dapls_ib_init (void)
{
- dapl_dbg_log(DAPL_DBG_TYPE_UTIL, "dapls_ib_init() called\n");
-
- ibv_path = getenv("OPENIB_VERBS_PATH");
-
- if (ibv_path == NULL)
- ibv_path = OPENIB_VERBS_PATH_DEFAULT;
-
- dapl_dbg_log(DAPL_DBG_TYPE_UTIL," loading verbs library %s\n",ibv_path);
-
- ibv_handle = dlopen(ibv_path, RTLD_NOW | RTLD_GLOBAL);
- if (ibv_handle == NULL ) {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " library load failure %s\n", dlerror());
- return -1;
- }
-
return 0;
}
int32_t dapls_ib_release (void)
{
- dapl_dbg_log (DAPL_DBG_TYPE_UTIL, "dapls_ib_release() called\n");
-
- if (ibv_handle)
- dlclose(ibv_handle);
-
return 0;
}
@@ -166,13 +139,6 @@ DAT_RETURN dapls_ib_open_hca (
dapl_dbg_log (DAPL_DBG_TYPE_UTIL,
" open_hca: %s - %p\n", hca_name, hca_ptr );
- if (ibv_handle == NULL) {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " Failure loading IB verbs library %s \n",
- ibv_path);
- return DAT_PROVIDER_NOT_FOUND;
- }
-
/* Get list of all IB devices, find match, open */
dev_list = ibv_get_devices();
dlist_start(dev_list);
@@ -201,6 +167,29 @@ DAT_RETURN dapls_ib_open_hca (
}
#ifdef SOCKET_CM
+ /* initialize cr_list lock */
+ dat_status = dapl_os_lock_init(&hca_ptr->ib_trans.lock);
+ if (dat_status != DAT_SUCCESS)
+ {
+ dapl_dbg_log (DAPL_DBG_TYPE_ERR,
+ " open_hca: failed to init lock\n");
+ return dat_status;
+ }
+
+ /* initialize CM list for listens on this HCA */
+ dapl_llist_init_head(&hca_ptr->ib_trans.list);
+
+ /* create thread to process inbound connect request */
+ dat_status = dapl_os_thread_create(cr_thread,
+ (void*)hca_ptr,
+ &hca_ptr->ib_trans.thread );
+ if (dat_status != DAT_SUCCESS)
+ {
+ dapl_dbg_log (DAPL_DBG_TYPE_ERR,
+ " open_hca: failed to create thread\n");
+ return dat_status;
+ }
+
/* get the IP address of the device */
dat_status = getipaddr((char*)&hca_ptr->hca_address,
sizeof(DAT_SOCK_ADDR6) );
@@ -243,6 +232,20 @@ DAT_RETURN dapls_ib_close_hca ( IN DAP
return(dapl_convert_errno(errno,"ib_close_device"));
hca_ptr->ib_hca_handle = IB_INVALID_HANDLE;
}
+
+#ifdef SOCKET_CM
+ /* ask cr_thread to exit; it clears destroy when it is done (10 ms poll) */
+ hca_ptr->ib_trans.destroy = 1;
+ while (hca_ptr->ib_trans.destroy) {
+ struct timespec sleep, remain;
+ sleep.tv_sec = 0;
+ sleep.tv_nsec = 10000000; /* 10 ms */
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+ " close_hca: waiting for cr_thread\n");
+ nanosleep (&sleep, &remain);
+ }
+ dapl_os_lock_destroy(&hca_ptr->ib_trans.lock);
+#endif
return (DAT_SUCCESS);
}
@@ -297,18 +300,18 @@ DAT_RETURN dapls_ib_query_hca (
((struct sockaddr_in *)ia_attr->ia_address_ptr)->sin_addr.s_addr >> 24 & 0xff );
/* TODO: need verbs query call */
- ia_attr->max_eps = 1000;
- ia_attr->max_dto_per_ep = 1000;
- ia_attr->max_rdma_read_per_ep = 4;
- ia_attr->max_evds = 1000;
- ia_attr->max_evd_qlen = 1000;
- ia_attr->max_iov_segments_per_dto = 10;
- ia_attr->max_lmrs = 1000;
+ ia_attr->max_eps = 64000;
+ ia_attr->max_dto_per_ep = 64000;
+ ia_attr->max_rdma_read_per_ep = 8;
+ ia_attr->max_evds = 64000;
+ ia_attr->max_evd_qlen = 64000;
+ ia_attr->max_iov_segments_per_dto = 32;
+ ia_attr->max_lmrs = 64000;
ia_attr->max_lmr_block_size = 0x80000000;
- ia_attr->max_rmrs = 1000;
+ ia_attr->max_rmrs = 64000;
ia_attr->max_lmr_virtual_address = 0x80000000;
ia_attr->max_rmr_target_address = 0x80000000;
- ia_attr->max_pzs = 1000;
+ ia_attr->max_pzs = 64000;
ia_attr->max_mtu_size = 0x80000000;
ia_attr->max_rdma_size = 0x80000000;
ia_attr->num_transport_attr = 0;
@@ -333,12 +336,12 @@ DAT_RETURN dapls_ib_query_hca (
if (ep_attr != NULL) {
ep_attr->max_mtu_size = 0x80000000;
ep_attr->max_rdma_size = 0x80000000;
- ep_attr->max_recv_dtos = 1000;
- ep_attr->max_request_dtos = 1000;
- ep_attr->max_recv_iov = 10;
- ep_attr->max_request_iov = 10;
- ep_attr->max_rdma_read_in = 4;
- ep_attr->max_rdma_read_out= 4;
+ ep_attr->max_recv_dtos = 64000;
+ ep_attr->max_request_dtos = 64000;
+ ep_attr->max_recv_iov = 32;
+ ep_attr->max_request_iov = 32;
+ ep_attr->max_rdma_read_in = 8;
+ ep_attr->max_rdma_read_out= 8;
dapl_dbg_log (DAPL_DBG_TYPE_UTIL,
" query_hca: MAX msg %llu dto %d iov %d rdma i%d,o%d\n",
ep_attr->max_mtu_size,
@@ -394,7 +397,7 @@ DAT_RETURN dapls_ib_setup_async_callback
hca_ptr->async_cq_error = callback;
break;
case DAPL_ASYNC_CQ_COMPLETION:
- hca_ptr->async_cq_completion = callback;
+ hca_ptr->async_cq = callback;
break;
case DAPL_ASYNC_QP_ERROR:
hca_ptr->async_qp_error = callback;
Index: openib/dapl_ib_mem.c
===================================================================
--- openib/dapl_ib_mem.c (revision 2190)
+++ openib/dapl_ib_mem.c (working copy)
@@ -1,26 +1,25 @@
/*
- * Copyright (c) 2002-2003, Network Appliance, Inc. All rights reserved.
- *
- * This Software is licensed under either one of the following two licenses:
+ * This Software is licensed under one of the following licenses:
*
* 1) under the terms of the "Common Public License 1.0" a copy of which is
- * in the file LICENSE.txt in the root directory. The license is also
* available from the Open Source Initiative, see
* http://www.opensource.org/licenses/cpl.php.
- * OR
*
- * 2) under the terms of the "The BSD License" a copy of which is in the file
- * LICENSE2.txt in the root directory. The license is also available from
- * the Open Source Initiative, see
+ * 2) under the terms of the "The BSD License" a copy of which is
+ * available from the Open Source Initiative, see
* http://www.opensource.org/licenses/bsd-license.php.
*
- * Licensee has the right to choose either one of the above two licenses.
+ * 3) under the terms of the "GNU General Public License (GPL) Version 2" a
+ * copy of which is available from the Open Source Initiative, see
+ * http://www.opensource.org/licenses/gpl-license.php.
+ *
+ * Licensee has the right to choose one of the above licenses.
*
- * Redistributions of source code must retain both the above copyright
- * notice and either one of the license notices.
+ * Redistributions of source code must retain the above copyright
+ * notice and one of the license notices.
*
* Redistributions in binary form must reproduce both the above copyright
- * notice, either one of the license notices in the documentation
+ * notice, one of the license notices in the documentation
* and/or other materials provided with the distribution.
*/
@@ -31,7 +30,7 @@
* PURPOSE: Intel DET APIs: Memory windows, registration,
* and protection domain
*
- * $Id:$
+ * $Id: $
*
**********************************************************************/
@@ -182,8 +181,11 @@ dapls_ib_mr_register (
ia_ptr, lmr, virt_addr, length, privileges );
/* TODO: shared memory */
- if (lmr->param.mem_type == DAT_MEM_TYPE_SHARED_VIRTUAL)
+ if (lmr->param.mem_type == DAT_MEM_TYPE_SHARED_VIRTUAL) {
+ dapl_dbg_log( DAPL_DBG_TYPE_ERR,
+ " mr_register_shared: NOT IMPLEMENTED\n");
return DAT_ERROR (DAT_NOT_IMPLEMENTED, DAT_NO_SUBTYPE);
+ }
/* local read is default on IB */
lmr->mr_handle =
@@ -266,6 +268,7 @@ dapls_ib_mr_register_shared (
IN DAPL_LMR *lmr,
IN DAT_MEM_PRIV_FLAGS privileges )
{
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR," mr_register_shared: NOT IMPLEMENTED\n");
return DAT_ERROR (DAT_NOT_IMPLEMENTED, DAT_NO_SUBTYPE);
}
@@ -289,6 +292,8 @@ DAT_RETURN
dapls_ib_mw_alloc (
IN DAPL_RMR *rmr )
{
+
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR," mw_alloc: NOT IMPLEMENTED\n");
return DAT_ERROR (DAT_NOT_IMPLEMENTED, DAT_NO_SUBTYPE);
}
@@ -312,6 +317,7 @@ DAT_RETURN
dapls_ib_mw_free (
IN DAPL_RMR *rmr )
{
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR," mw_free: NOT IMPLEMENTED\n");
return DAT_ERROR (DAT_NOT_IMPLEMENTED, DAT_NO_SUBTYPE);
}
@@ -343,6 +349,7 @@ dapls_ib_mw_bind (
IN DAT_MEM_PRIV_FLAGS mem_priv,
IN DAT_BOOLEAN is_signaled)
{
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR," mw_bind: NOT IMPLEMENTED\n");
return DAT_ERROR (DAT_NOT_IMPLEMENTED, DAT_NO_SUBTYPE);
}
@@ -371,6 +378,7 @@ dapls_ib_mw_unbind (
IN DAPL_COOKIE *cookie,
IN DAT_BOOLEAN is_signaled )
{
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR," mw_unbind: NOT IMPLEMENTED\n");
return DAT_ERROR (DAT_NOT_IMPLEMENTED, DAT_NO_SUBTYPE);
}
Index: openib/dapl_ib_cm.c
===================================================================
--- openib/dapl_ib_cm.c (revision 2190)
+++ openib/dapl_ib_cm.c (working copy)
@@ -74,10 +74,12 @@ static DAT_RETURN dapli_socket_listen (
DAT_CONN_QUAL serviceID,
DAPL_SP *sp_ptr );
-static DAT_RETURN dapli_socket_accept( DAPL_EP *ep_ptr,
- DAPL_CR *cr_ptr,
- DAT_COUNT p_size,
- DAT_PVOID p_data );
+static DAT_RETURN dapli_socket_accept( ib_cm_srvc_handle_t cm_ptr );
+
+static DAT_RETURN dapli_socket_accept_final( DAPL_EP *ep_ptr,
+ DAPL_CR *cr_ptr,
+ DAT_COUNT p_size,
+ DAT_PVOID p_data );
/* XXX temporary hack to get lid */
static uint16_t dapli_get_lid(IN struct ibv_device *dev, IN int port)
@@ -114,6 +116,7 @@ dapli_socket_connect ( DAPL_EP *ep_ptr
DAPL_IA *ia_ptr = ep_ptr->header.owner_ia;
int len, opt = 1;
struct iovec iovec[2];
+ short rtu_data = htons(0x0E0F);
dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: r_qual %d\n", r_qual);
@@ -133,7 +136,7 @@ dapli_socket_connect ( DAPL_EP *ep_ptr
return DAT_INSUFFICIENT_RESOURCES;
}
- ((struct sockaddr_in*)r_addr)->sin_port = r_qual;
+ ((struct sockaddr_in*)r_addr)->sin_port = htons(r_qual);
if ( connect(cm_ptr->socket, r_addr, sizeof(*r_addr)) < 0 ) {
dapl_dbg_log(DAPL_DBG_TYPE_ERR,
@@ -153,9 +156,11 @@ dapli_socket_connect ( DAPL_EP *ep_ptr
cm_ptr->dst.p_size = p_size;
iovec[0].iov_base = &cm_ptr->dst;
iovec[0].iov_len = sizeof(ib_qp_cm_t);
- iovec[1].iov_base = p_data;
- iovec[1].iov_len = p_size;
- len = writev( cm_ptr->socket, iovec, 2 );
+ if ( p_size ) {
+ iovec[1].iov_base = p_data;
+ iovec[1].iov_len = p_size;
+ }
+ len = writev( cm_ptr->socket, iovec, (p_size ? 2:1) );
if ( len != (p_size + sizeof(ib_qp_cm_t)) ) {
dapl_dbg_log(DAPL_DBG_TYPE_ERR,
" connect write: ERR %s, wcnt=%d\n",
@@ -190,8 +195,8 @@ dapli_socket_connect ( DAPL_EP *ep_ptr
/* read private data into cm_handle if any present */
if ( cm_ptr->dst.p_size ) {
- iovec[1].iov_base = cm_ptr->p_data;
- iovec[1].iov_len = cm_ptr->dst.p_size;
+ iovec[0].iov_base = cm_ptr->p_data;
+ iovec[0].iov_len = cm_ptr->dst.p_size;
len = readv( cm_ptr->socket, iovec, 1 );
if ( len != cm_ptr->dst.p_size ) {
dapl_dbg_log(DAPL_DBG_TYPE_ERR,
@@ -213,10 +218,11 @@ dapli_socket_connect ( DAPL_EP *ep_ptr
ep_ptr->qp_state = IB_QP_STATE_RTS;
/* complete handshake after final QP state change */
- write(cm_ptr->socket, "QP_RTR_RTS", sizeof "QP_RTR_RTS");
+ write(cm_ptr->socket, &rtu_data, sizeof(rtu_data) );
/* init cm_handle and post the event with private data */
ep_ptr->cm_handle = cm_ptr;
+ dapl_dbg_log( DAPL_DBG_TYPE_EP," ACTIVE: connected!\n" );
dapl_evd_connection_callback( ep_ptr->cm_handle,
IB_CME_CONNECTED,
cm_ptr->p_data,
@@ -248,9 +254,7 @@ dapli_socket_listen ( DAPL_IA *ia_ptr,
{
struct sockaddr_in addr;
ib_cm_srvc_handle_t cm_ptr = NULL;
- void *p_data = NULL;
- int l_sock = -1;
- int len, opt = 1;
+ int opt = 1;
DAT_RETURN dat_status = DAT_SUCCESS;
dapl_dbg_log ( DAPL_DBG_TYPE_EP,
@@ -263,26 +267,30 @@ dapli_socket_listen ( DAPL_IA *ia_ptr,
(void) dapl_os_memzero( cm_ptr, sizeof( *cm_ptr ) );
- cm_ptr->socket = -1;
+ cm_ptr->socket = cm_ptr->l_socket = -1;
cm_ptr->sp = sp_ptr;
cm_ptr->hca_ptr = ia_ptr->hca_ptr;
/* bind, listen, set sockopt, accept, exchange data */
- if ((l_sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+ if ((cm_ptr->l_socket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
dapl_dbg_log (DAPL_DBG_TYPE_ERR,
"socket for listen returned %d\n", errno);
dat_status = DAT_INSUFFICIENT_RESOURCES;
goto bail;
}
- setsockopt(l_sock,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
- addr.sin_port = serviceID;
+ setsockopt(cm_ptr->l_socket,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
+ addr.sin_port = htons(serviceID);
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = INADDR_ANY;
- if (( bind( l_sock,(struct sockaddr*)&addr, sizeof(addr) ) < 0) ||
- (listen( l_sock, 1 ) < 0) ) {
+ if (( bind( cm_ptr->l_socket,(struct sockaddr*)&addr, sizeof(addr) ) < 0) ||
+ (listen( cm_ptr->l_socket, 128 ) < 0) ) {
+ dapl_dbg_log( DAPL_DBG_TYPE_ERR,
+ " listen: ERROR %s on conn_qual 0x%x\n",
+ strerror(errno),serviceID);
+
if ( errno == EADDRINUSE )
dat_status = DAT_CONN_QUAL_IN_USE;
else
@@ -290,109 +298,144 @@ dapli_socket_listen ( DAPL_IA *ia_ptr,
goto bail;
}
+
+ /* set cm_handle for this service point, save listen socket */
+ sp_ptr->cm_srvc_handle = cm_ptr;
- /* block on the accept */
- len = sizeof(cm_ptr->dst.ia_address);
- cm_ptr->socket = accept(l_sock,
- (struct sockaddr*)&cm_ptr->dst.ia_address,
+ /* add to SP->CR thread list */
+ dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&cm_ptr->entry);
+ dapl_os_lock( &cm_ptr->hca_ptr->ib_trans.lock );
+ dapl_llist_add_tail(&cm_ptr->hca_ptr->ib_trans.list,
+ (DAPL_LLIST_ENTRY*)&cm_ptr->entry, cm_ptr);
+ dapl_os_unlock(&cm_ptr->hca_ptr->ib_trans.lock);
+
+ dapl_dbg_log( DAPL_DBG_TYPE_CM,
+ " listen: qual 0x%x cr %p s_fd %d\n",
+ ntohs(serviceID), cm_ptr, cm_ptr->l_socket );
+
+ return dat_status;
+bail:
+ dapl_dbg_log( DAPL_DBG_TYPE_ERR,
+ " listen: ERROR on conn_qual 0x%x\n",serviceID);
+ if ( cm_ptr->l_socket >= 0 )
+ close( cm_ptr->l_socket );
+ dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
+ return dat_status;
+}
+
+
+/*
+ * PASSIVE: send local QP information, private data, and wait for
+ * active side to respond with QP RTS/RTR status
+ */
+static DAT_RETURN
+dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
+{
+ ib_cm_handle_t acm_ptr;
+ void *p_data = NULL;
+ int len;
+ DAT_RETURN dat_status = DAT_SUCCESS;
+
+ /* Allocate accept CM and initialize */
+ if ((acm_ptr = dapl_os_alloc(sizeof(*acm_ptr))) == NULL)
+ return DAT_INSUFFICIENT_RESOURCES;
+
+ (void) dapl_os_memzero( acm_ptr, sizeof( *acm_ptr ) );
+
+ acm_ptr->socket = -1;
+ acm_ptr->sp = cm_ptr->sp;
+ acm_ptr->hca_ptr = cm_ptr->hca_ptr;
+
+ len = sizeof(acm_ptr->dst.ia_address);
+ acm_ptr->socket = accept(cm_ptr->l_socket,
+ (struct sockaddr*)&acm_ptr->dst.ia_address,
&len );
- if ( cm_ptr->socket < 0 ) {
+ if ( acm_ptr->socket < 0 ) {
dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " listen accept: ERR %s\n",strerror(errno));
+ " accept: ERR %s on FD %d l_cr %p\n",
+ strerror(errno),cm_ptr->l_socket,cm_ptr);
dat_status = DAT_INTERNAL_ERROR;
goto bail;
}
/* read in DST QP info, IA address. check for private data */
- len = read( cm_ptr->socket, &cm_ptr->dst, sizeof(ib_qp_cm_t) );
+ len = read( acm_ptr->socket, &acm_ptr->dst, sizeof(ib_qp_cm_t) );
if ( len != sizeof(ib_qp_cm_t) ) {
dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " listen read: ERR %s, rcnt=%d\n",
- strerror(errno), len);
+ " accept read: ERR %s, rcnt=%d\n",
+ strerror(errno), len);
dat_status = DAT_INTERNAL_ERROR;
goto bail;
}
-
dapl_dbg_log(DAPL_DBG_TYPE_EP,
- " listen: DST port=0x%x lid=0x%x, qpn=0x%x, psize=%d\n",
- cm_ptr->dst.port, cm_ptr->dst.lid,
- cm_ptr->dst.qpn, cm_ptr->dst.p_size );
+ " accept: DST port=0x%x lid=0x%x, qpn=0x%x, psize=%d\n",
+ acm_ptr->dst.port, acm_ptr->dst.lid,
+ acm_ptr->dst.qpn, acm_ptr->dst.p_size );
/* validate private data size before reading */
- if ( cm_ptr->dst.p_size > IB_MAX_REQ_PDATA_SIZE ) {
+ if ( acm_ptr->dst.p_size > IB_MAX_REQ_PDATA_SIZE ) {
dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " listen read: psize (%d) wrong\n",
- cm_ptr->dst.p_size );
+ " accept read: psize (%d) wrong\n",
+ acm_ptr->dst.p_size );
dat_status = DAT_INTERNAL_ERROR;
goto bail;
}
/* read private data into cm_handle if any present */
- if ( cm_ptr->dst.p_size ) {
- len = read( cm_ptr->socket,
- cm_ptr->p_data,
- cm_ptr->dst.p_size );
- if ( len != cm_ptr->dst.p_size ) {
+ if ( acm_ptr->dst.p_size ) {
+ len = read( acm_ptr->socket,
+ acm_ptr->p_data, acm_ptr->dst.p_size );
+ if ( len != acm_ptr->dst.p_size ) {
dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " listen read pdata: ERR %s, rcnt=%d\n",
+ " accept read pdata: ERR %s, rcnt=%d\n",
strerror(errno), len );
dat_status = DAT_INTERNAL_ERROR;
goto bail;
}
- p_data = cm_ptr->p_data;
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ " accept: psize=%d read\n",
+ acm_ptr->dst.p_size);
+ p_data = acm_ptr->p_data;
}
-
- /* set cm_handle for this service point */
- sp_ptr->cm_srvc_handle = cm_ptr;
- /*
- * dapls_ib_accept_connection send QP information
- * and complete CM handshake
- */
-
/* trigger CR event and return SUCCESS */
- dapls_cr_callback( cm_ptr,
+ dapls_cr_callback( acm_ptr,
IB_CME_CONNECTION_REQUEST_PENDING,
p_data,
- sp_ptr );
+ acm_ptr->sp );
- return dat_status;
+ return DAT_SUCCESS;
bail:
- if ( l_sock >= 0 )
- close( l_sock );
- if ( cm_ptr->socket >= 0 )
- close( cm_ptr->socket );
- if ( cm_ptr )
- dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
-
- return dat_status;
+ if ( acm_ptr->socket >=0 )
+ close( acm_ptr->socket );
+ dapl_os_free( acm_ptr, sizeof( *acm_ptr ) );
+ return DAT_INTERNAL_ERROR;
}
-
-/*
- * PASSIVE: send local QP information, private data, and wait for
- * active side to respond with QP RTS/RTR status
- */
static DAT_RETURN
-dapli_socket_accept( DAPL_EP *ep_ptr,
- DAPL_CR *cr_ptr,
- DAT_COUNT p_size,
- DAT_PVOID p_data )
+dapli_socket_accept_final( DAPL_EP *ep_ptr,
+ DAPL_CR *cr_ptr,
+ DAT_COUNT p_size,
+ DAT_PVOID p_data )
{
- ib_cm_handle_t cm_ptr = cr_ptr->ib_cm_handle;
DAPL_IA *ia_ptr = ep_ptr->header.owner_ia;
+ ib_cm_handle_t cm_ptr = cr_ptr->ib_cm_handle;
ib_qp_cm_t qp_cm;
struct iovec iovec[2];
int len;
- char r_buf[10] = "XX_XXX_XXX";
+ short rtu_data = 0;
if (p_size > IB_MAX_REP_PDATA_SIZE)
- return (DAT_LENGTH_ERROR);
+ return DAT_LENGTH_ERROR;
+ /* must have a accepted socket */
+ if ( cm_ptr->socket < 0 )
+ return DAT_INTERNAL_ERROR;
+
/* modify QP to RTR and then to RTS with remote info already read */
if ( dapls_modify_qp_state( ep_ptr->qp_handle,
IBV_QPS_RTR, &cm_ptr->dst ) != DAT_SUCCESS )
@@ -413,42 +456,42 @@ dapli_socket_accept( DAPL_EP *ep_ptr,
qp_cm.p_size = p_size;
iovec[0].iov_base = &qp_cm;
iovec[0].iov_len = sizeof(ib_qp_cm_t);
- iovec[1].iov_base = p_data;
- iovec[1].iov_len = p_size;
- len = writev( cm_ptr->socket, iovec, 2 );
+ if (p_size) {
+ iovec[1].iov_base = p_data;
+ iovec[1].iov_len = p_size;
+ }
+ len = writev( cm_ptr->socket, iovec, (p_size ? 2:1) );
if (len != (p_size + sizeof(ib_qp_cm_t))) {
dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " connect write: ERR %s, wcnt=%d\n",
+ " accept_final: ERR %s, wcnt=%d\n",
strerror(errno), len);
goto bail;
}
dapl_dbg_log(DAPL_DBG_TYPE_EP,
- " accept: SRC port=0x%x lid=0x%x, qpn=0x%x, psize=%d\n",
+ " accept_final: SRC port=0x%x lid=0x%x, qpn=0x%x, psize=%d\n",
qp_cm.port, qp_cm.lid, qp_cm.qpn, qp_cm.p_size );
-
+
/* complete handshake after final QP state change */
- len = read(cm_ptr->socket, r_buf, sizeof(r_buf) );
- if ( len != sizeof(r_buf) ) {
+ len = read(cm_ptr->socket, &rtu_data, sizeof(rtu_data) );
+ if ( len != sizeof(rtu_data) || ntohs(rtu_data) != 0x0e0f ) {
dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " accept: ERR %s, rcnt=%d\n",
- strerror(errno), len);
+ " accept_final: ERR %s, rcnt=%d rdata=%x\n",
+ strerror(errno), len, ntohs(rtu_data) );
goto bail;
}
/* final data exchange if remote QP state is good to go */
- dapl_dbg_log( DAPL_DBG_TYPE_EP," accept: %s \n", r_buf);
-
- dapls_cr_callback ( cm_ptr,
- IB_CME_CONNECTED,
- NULL,
- cm_ptr->sp );
-
+ dapl_dbg_log( DAPL_DBG_TYPE_EP," PASSIVE: connected!\n" );
+ dapls_cr_callback ( cm_ptr, IB_CME_CONNECTED, NULL, cm_ptr->sp );
return DAT_SUCCESS;
bail:
- dapl_dbg_log( DAPL_DBG_TYPE_ERR," accept: ERR !QP_RTR_RTS \n");
- close( cm_ptr->socket );
+ dapl_dbg_log( DAPL_DBG_TYPE_ERR," accept_final: ERR !QP_RTR_RTS \n");
+ if ( cm_ptr->socket >= 0 )
+ close( cm_ptr->socket );
+ dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
dapls_ib_reinit_ep( ep_ptr ); /* reset QP state */
+
return DAT_INTERNAL_ERROR;
}
@@ -482,7 +525,7 @@ dapls_ib_connect (
IN DAT_IA_ADDRESS_PTR remote_ia_address,
IN DAT_CONN_QUAL remote_conn_qual,
IN DAT_COUNT private_data_size,
- IN DAT_PVOID private_data )
+ IN void *private_data )
{
DAPL_EP *ep_ptr;
ib_qp_handle_t qp_ptr;
@@ -545,18 +588,19 @@ dapls_ib_disconnect (
dapls_ib_reinit_ep(ep_ptr);
#endif
-
- if ( ep_ptr->cr_ptr )
+ if ( ep_ptr->cr_ptr ) {
dapls_cr_callback ( ep_ptr->cm_handle,
IB_CME_DISCONNECTED,
NULL,
((DAPL_CR *)ep_ptr->cr_ptr)->sp_ptr );
- else
+ } else {
dapl_evd_connection_callback ( ep_ptr->cm_handle,
IB_CME_DISCONNECTED,
NULL,
ep_ptr );
-
+ ep_ptr->cm_handle = NULL;
+ dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
+ }
return DAT_SUCCESS;
}
@@ -584,7 +628,6 @@ dapls_ib_disconnect_clean (
IN DAT_BOOLEAN active,
IN const ib_cm_events_t ib_cm_event )
{
-
return;
}
@@ -644,25 +687,22 @@ dapls_ib_remove_conn_listener (
IN DAPL_IA *ia_ptr,
IN DAPL_SP *sp_ptr )
{
-
ib_cm_srvc_handle_t cm_ptr = sp_ptr->cm_srvc_handle;
dapl_dbg_log (DAPL_DBG_TYPE_EP,
"dapls_ib_remove_conn_listener(ia_ptr %p sp_ptr %p cm_ptr %p)\n",
ia_ptr, sp_ptr, cm_ptr );
#ifdef SOCKET_CM
-
- /* close accepted socket, free cm_srvc_handle and return */
+ /* close listen socket; cr_thread frees cm_ptr once l_socket is -1 */
if ( cm_ptr != NULL ) {
- if ( cm_ptr->socket > 0 ) {
- close( cm_ptr->socket );
- cm_ptr->socket = 0;
+ if ( cm_ptr->l_socket >= 0 ) {
+ close( cm_ptr->l_socket );
+ cm_ptr->l_socket = -1;
}
- dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
+ /* cr_thread will free */
sp_ptr->cm_srvc_handle = NULL;
}
return DAT_SUCCESS;
-
#else
return DAT_NOT_IMPLEMENTED;
@@ -717,7 +757,7 @@ dapls_ib_accept_connection (
}
#ifdef SOCKET_CM
- return ( dapli_socket_accept(ep_ptr, cr_ptr, p_size, p_data) );
+ return ( dapli_socket_accept_final(ep_ptr, cr_ptr, p_size, p_data) );
#else
return DAT_NOT_IMPLEMENTED;
#endif
@@ -756,13 +796,13 @@ dapls_ib_reject_connection (
/* just close the socket and return */
- if ( cm_ptr->socket > 0 ) {
+ if ( cm_ptr->socket >= 0 ) {
close( cm_ptr->socket );
- cm_ptr->socket = 0;
+ cm_ptr->socket = -1;
}
-
return DAT_SUCCESS;
-
+#else
+ return DAT_NOT_IMPLEMENTED;
#endif
- return DAT_SUCCESS;
+
}
@@ -984,6 +1024,102 @@ dapls_ib_get_cm_event (
return ib_cm_event;
}
+/*
+ * Async CR processing thread to avoid blocking applications.
+ * Polls each listen socket queued on hca_ptr->ib_trans.list and calls
+ * dapli_socket_accept() when one is readable. Entries whose l_socket
+ * is -1 (closed by dapls_ib_remove_conn_listener or on error) are freed.
+ * Runs until ib_trans.destroy is set, releases any remaining entries,
+ * then clears destroy to tell dapls_ib_close_hca it has finished.
+ */
+void cr_thread(void *arg)
+{
+	struct dapl_hca *hca_ptr = arg;
+	ib_cm_srvc_handle_t cr, next_cr;
+	int max_fd, fd, drop;
+	fd_set rfd,rfds;
+	struct timeval to;
+
+	dapl_os_lock( &hca_ptr->ib_trans.lock );
+	while ( !hca_ptr->ib_trans.destroy ) {
+
+		FD_ZERO( &rfds );
+		max_fd = -1;
+
+		if (!dapl_llist_is_empty(&hca_ptr->ib_trans.list))
+			next_cr = dapl_llist_peek_head (&hca_ptr->ib_trans.list);
+		else
+			next_cr = NULL;
+
+		while (next_cr) {
+			cr = next_cr;
+			dapl_dbg_log (DAPL_DBG_TYPE_CM," thread: cm_ptr %p\n", cr );
+			if (cr->l_socket == -1 || hca_ptr->ib_trans.destroy) {
+
+				dapl_dbg_log(DAPL_DBG_TYPE_CM," thread: Freeing %p\n", cr);
+				next_cr = dapl_llist_next_entry(&hca_ptr->ib_trans.list,
+						(DAPL_LLIST_ENTRY*)&cr->entry );
+				dapl_llist_remove_entry(&hca_ptr->ib_trans.list,
+						(DAPL_LLIST_ENTRY*)&cr->entry);
+				/* destroy case: listen socket may still be open */
+				if (cr->l_socket >= 0)
+					close(cr->l_socket);
+				dapl_os_free( cr, sizeof(*cr) );
+				continue;
+			}
+
+			/* snapshot the fd while locked; l_socket can change after unlock */
+			fd = cr->l_socket;
+			FD_SET( fd, &rfds ); /* add to select set */
+			if ( fd > max_fd )
+				max_fd = fd;
+
+			/* individual select poll to check for work */
+			FD_ZERO(&rfd);
+			FD_SET(fd, &rfd);
+			dapl_os_unlock(&hca_ptr->ib_trans.lock);
+			to.tv_sec = 0;
+			to.tv_usec = 0;
+			drop = 0;
+			if ( select(fd + 1,&rfd, NULL, NULL, &to) < 0) {
+				dapl_dbg_log (DAPL_DBG_TYPE_CM,
+					" thread: ERR %s on cr %p sk %d\n",
+					strerror(errno), cr, fd);
+				drop = 1;
+			} else if ( FD_ISSET(fd, &rfd) &&
+				    dapli_socket_accept(cr)) {
+				drop = 1;
+			}
+			dapl_os_lock( &hca_ptr->ib_trans.lock );
+			/* close only if no one else closed/replaced it meanwhile */
+			if ( drop && cr->l_socket == fd ) {
+				close(fd);
+				cr->l_socket = -1;
+			}
+			next_cr = dapl_llist_next_entry(&hca_ptr->ib_trans.list,
+					(DAPL_LLIST_ENTRY*)&cr->entry );
+		}
+		dapl_os_unlock( &hca_ptr->ib_trans.lock );
+		to.tv_sec = 0;
+		to.tv_usec = 500000; /* wakeup and check destroy */
+		select(max_fd + 1, &rfds, NULL, NULL, &to);
+		dapl_os_lock( &hca_ptr->ib_trans.lock );
+	}
+
+	/* destroy requested: release listen entries still on the list */
+	while (!dapl_llist_is_empty(&hca_ptr->ib_trans.list)) {
+		cr = dapl_llist_peek_head(&hca_ptr->ib_trans.list);
+		dapl_llist_remove_entry(&hca_ptr->ib_trans.list,
+				(DAPL_LLIST_ENTRY*)&cr->entry);
+		if (cr->l_socket >= 0)
+			close(cr->l_socket);
+		dapl_os_free( cr, sizeof(*cr) );
+	}
+	dapl_os_unlock( &hca_ptr->ib_trans.lock );
+	hca_ptr->ib_trans.destroy = 0;
+	dapl_dbg_log(DAPL_DBG_TYPE_CM," thread(hca %p) exit\n",hca_ptr);
+}
+
/* Real IBv CM */
#else
Index: openib/dapl_ib_qp.c
===================================================================
--- openib/dapl_ib_qp.c (revision 2190)
+++ openib/dapl_ib_qp.c (working copy)
@@ -1,26 +1,25 @@
/*
- * Copyright (c) 2002-2003, Network Appliance, Inc. All rights reserved.
- *
- * This Software is licensed under either one of the following two licenses:
+ * This Software is licensed under one of the following licenses:
*
* 1) under the terms of the "Common Public License 1.0" a copy of which is
- * in the file LICENSE.txt in the root directory. The license is also
* available from the Open Source Initiative, see
* http://www.opensource.org/licenses/cpl.php.
- * OR
*
- * 2) under the terms of the "The BSD License" a copy of which is in the file
- * LICENSE2.txt in the root directory. The license is also available from
- * the Open Source Initiative, see
+ * 2) under the terms of the "The BSD License" a copy of which is
+ * available from the Open Source Initiative, see
* http://www.opensource.org/licenses/bsd-license.php.
*
- * Licensee has the right to choose either one of the above two licenses.
+ * 3) under the terms of the "GNU General Public License (GPL) Version 2" a
+ * copy of which is available from the Open Source Initiative, see
+ * http://www.opensource.org/licenses/gpl-license.php.
+ *
+ * Licensee has the right to choose one of the above licenses.
*
- * Redistributions of source code must retain both the above copyright
- * notice and either one of the license notices.
+ * Redistributions of source code must retain the above copyright
+ * notice and one of the license notices.
*
* Redistributions in binary form must reproduce both the above copyright
- * notice, either one of the license notices in the documentation
+ * notice, one of the license notices in the documentation
* and/or other materials provided with the distribution.
*/
@@ -30,7 +29,7 @@
*
* PURPOSE: QP routines for access to DET Verbs
*
- * $Id:$
+ * $Id: $
**********************************************************************/
#include "dapl.h"
@@ -311,7 +310,7 @@ dapls_modify_qp_state ( IN ib_qp_handle_
qp_attr.path_mtu = IBV_MTU_1024;
qp_attr.dest_qp_num = qp_cm->qpn;
qp_attr.rq_psn = 1;
- qp_attr.max_dest_rd_atomic = 1;
+ qp_attr.max_dest_rd_atomic = 8;
qp_attr.min_rnr_timer = 12;
qp_attr.ah_attr.is_global = 0;
qp_attr.ah_attr.dlid = qp_cm->lid;
@@ -338,7 +337,7 @@ dapls_modify_qp_state ( IN ib_qp_handle_
qp_attr.retry_cnt = 7;
qp_attr.rnr_retry = 7;
qp_attr.sq_psn = 1;
- qp_attr.max_rd_atomic = 1;
+ qp_attr.max_rd_atomic = 8;
dapl_dbg_log (DAPL_DBG_TYPE_EP,
" modify_qp_rts: psn %x or %x\n",
qp_attr.sq_psn, qp_attr.max_rd_atomic );
Index: openib/README
===================================================================
--- openib/README (revision 2190)
+++ openib/README (working copy)
@@ -41,8 +41,8 @@ A simple dapl test just for initial open
known issues:
- early drop, good luck! Only tested with a simple dtest.
- see TODO for more details
- events not working??
+ early drop, only tested with simple dtest and dapltest SR.
+ no memory windows support in ibverbs, dat_create_rmr fails.
+
Index: openib/dapl_ib_util.h
===================================================================
--- openib/dapl_ib_util.h (revision 2190)
+++ openib/dapl_ib_util.h (working copy)
@@ -79,10 +79,23 @@ typedef struct _ib_qp_cm
} ib_qp_cm_t;
-/* EP->cm_handle for connect, SP->cm_srvc_handle for listen */
+/*
+ * dapl_llist_entry in dapl.h but dapl.h depends on provider
+ * typedef's in this file first. move dapl_llist_entry out of dapl.h
+ */
+struct ib_llist_entry
+{
+ struct dapl_llist_entry *flink;
+ struct dapl_llist_entry *blink;
+ void *data;
+ struct dapl_llist_entry *list_head;
+};
+
struct ib_cm_handle
{
- int socket;
+ struct ib_llist_entry entry;
+ int socket;
+ int l_socket;
struct dapl_hca *hca_ptr;
DAT_HANDLE cr;
DAT_HANDLE sp;
@@ -112,6 +125,9 @@ typedef enum
} ib_cm_events_t;
+/* prototype for cm thread */
+void cr_thread (void *arg);
+
#else
/* TODO: Waiting for IB CM to define */
@@ -205,11 +221,6 @@ typedef struct dapl_evd *ib_wait_obj_ha
* ibv_post_recv - Return 0, -1 & bad_wr
*/
-/* definitions from libmthca/src/cq.c, should be in verbs.h */
-#define IB_CQ_OK 0
-#define IB_CQ_EMPTY -1
-#define IB_POLL_ERR -2
-
/* async handler for CQ, QP, and unafiliated */
typedef void (*ib_async_handler_t)(
IN ib_hca_handle_t ib_hca_handle,
@@ -221,11 +232,18 @@ typedef struct _ib_hca_transport
{
struct ibv_device *ib_dev;
ib_cq_handle_t ib_cq_empty;
+
+#ifdef SOCKET_CM
+ int destroy; /* set by close_hca; cr_thread clears it on exit */
+ DAPL_OS_THREAD thread; /* cr_thread handle */
+ DAPL_OS_LOCK lock; /* protects list */
+ struct dapl_llist_entry *list; /* listen cm handles polled by cr_thread */
+#endif
ib_async_handler_t async_unafiliated;
ib_async_handler_t async_cq_error;
- ib_async_handler_t async_cq_completion;
+ ib_async_handler_t async_cq;
ib_async_handler_t async_qp_error;
-
+
} ib_hca_tranport_t;
/* provider specfic fields for shared memory support */
Index: openib/dapl_ib_cq.c
===================================================================
--- openib/dapl_ib_cq.c (revision 2190)
+++ openib/dapl_ib_cq.c (working copy)
@@ -382,10 +382,10 @@ DAT_RETURN dapls_ib_completion_notify (
* Output:
* none
*
- * Returns:
+ * Returns:
* DAT_SUCCESS
* DAT_QUEUE_EMPTY
* dapl_convert_errno
*/
DAT_RETURN dapls_ib_completion_poll (
IN DAPL_HCA *hca_ptr,
@@ -393,15 +393,15 @@ DAT_RETURN dapls_ib_completion_poll (
IN ib_work_completion_t *wc_ptr)
{
int ret;
-
- ret = ibv_poll_cq(evd_ptr->ib_cq_handle, 1, wc_ptr);
+
+ ret = ibv_poll_cq(evd_ptr->ib_cq_handle, 1, wc_ptr);
if (ret == 1)
return DAT_SUCCESS;
- else if ((ret == IB_CQ_OK) || (ret == IB_CQ_EMPTY))
- return DAT_QUEUE_EMPTY;
- else
- return(dapl_convert_errno(EFAULT,"poll_cq"));;
-
+
+ /* ibv_poll_cq returns 0 when the CQ is empty and < 0 on error */
+ if (ret < 0)
+ return(dapl_convert_errno(EFAULT,"poll_cq"));
+ return DAT_QUEUE_EMPTY;
}
#ifdef CQ_WAIT_OBJECT
@@ -447,24 +444,45 @@ dapls_ib_wait_object_wait (
IN ib_wait_obj_handle_t p_cq_wait_obj_handle,
IN u_int32_t timeout)
{
- int status;
- ib_cq_handle_t cq = p_cq_wait_obj_handle->ib_cq_handle;
- struct ibv_cq *ibv_cq;
- void *ibv_ctx;
+ DAPL_EVD *evd_ptr = p_cq_wait_obj_handle;
+ ib_cq_handle_t cq = evd_ptr->ib_cq_handle;
+ struct ibv_cq *ibv_cq = NULL; /* stays NULL if no event is read */
+ void *ibv_ctx = NULL;
+ int status = EINVAL; /* invalid handle */
dapl_dbg_log ( DAPL_DBG_TYPE_UTIL,
" cq_object_wait: dev %p evd %p cq %p, time %d\n",
- cq->context, p_cq_wait_obj_handle, cq, timeout );
+ cq ? cq->context : NULL, evd_ptr, cq, timeout );
- /* will block forever, only 1 per device for now?? */
/* TODO: add timeout, map each CQ created?? */
- if (cq) {
- status = ibv_get_cq_event(cq->context, 0, &ibv_cq, &ibv_ctx);
- if (!status && (ibv_cq == cq))
- return DAT_SUCCESS;
+ /* Multiple EVD's sharing one event handle for now */
+ while (evd_ptr->ib_cq_handle) {
+
+ status = ibv_get_cq_event(cq->context,
+ 0, &ibv_cq, &ibv_ctx);
+ if (status)
+ break;
+
+ /* EVD mismatch, process DTO callback for this EVD */
+ if (ibv_cq != cq) {
+ ib_hca_tranport_t *hca_ptr =
+ &evd_ptr->header.owner_ia->hca_ptr->ib_trans;
+
+ if ( hca_ptr->async_cq )
+ hca_ptr->async_cq(cq->context,
+ (ib_error_record_t*)ibv_cq,
+ ibv_ctx);
+
+ continue;
+ }
+ break;
}
- return(dapl_convert_errno(EFAULT,"cq_wait_object_wait"));
+ dapl_dbg_log (DAPL_DBG_TYPE_UTIL,
+ " cq_object_wait: RET cq %p ibv_cq %p ibv_ctx %p %x\n",
+ cq,ibv_cq,ibv_ctx,status);
+
+ return(dapl_convert_errno(status,"cq_wait_object_wait"));
}
#endif
More information about the general mailing list