[ofa-general] [PATCH 4/7][v1.2] dapl scm: Adding socket cm provider for better scalability on large homogeneous clusters.
Davis, Arlin R
arlin.r.davis at intel.com
Fri Jun 20 11:50:11 PDT 2008
Bring the socket CM (scm) provider back to life with the following changes:
- Better threading support for exchanging QP information.
- Avoid blocking during connect, to support the dynamic connection
  model used by MPI implementations.
- Consumer control of ACK timeout/retries.
- Disconnect/reject capabilities via the socket exchange.
- Wire-protocol versioning to ensure compatibility with the peer
  scm provider.
- Add GIDs to the QP information exchange.

Validated with Intel MPI on a 14,000+ core fabric using IB DDR.
---
Makefile.am | 134 ++++++-
dapl.spec.in | 10 +-
dapl/openib_scm/dapl_ib_cm.c | 833 +++++++++++++++++++++++++---------------
dapl/openib_scm/dapl_ib_cq.c | 19 +-
dapl/openib_scm/dapl_ib_dto.h | 212 ++++++-----
dapl/openib_scm/dapl_ib_mem.c | 137 ++++----
dapl/openib_scm/dapl_ib_qp.c | 53 ++--
dapl/openib_scm/dapl_ib_util.c | 152 ++++----
dapl/openib_scm/dapl_ib_util.h | 44 ++-
9 files changed, 1016 insertions(+), 578 deletions(-)
diff --git a/Makefile.am b/Makefile.am
index 079ad7f..bc7926d 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -19,9 +19,11 @@ endif
datlibdir = $(libdir)
dapllibcmadir = $(libdir)
+dapllibscmdir = $(libdir)
datlib_LTLIBRARIES = dat/udat/libdat.la
dapllibcma_LTLIBRARIES = dapl/udapl/libdaplcma.la
+dapllibscm_LTLIBRARIES = dapl/udapl/libdaplscm.la
dat_udat_libdat_la_CFLAGS = -Wall $(DBGFLAGS) -D_GNU_SOURCE $(OSFLAGS)
\
-I$(srcdir)/dat/include/
-I$(srcdir)/dat/udat/ \
@@ -32,15 +34,22 @@ dapl_udapl_libdaplcma_la_CFLAGS = -Wall $(DBGFLAGS)
-D_GNU_SOURCE $(OSFLAGS) \
-I$(srcdir)/dat/include/
-I$(srcdir)/dapl/include/ \
-I$(srcdir)/dapl/common
-I$(srcdir)/dapl/udapl/linux \
-I$(srcdir)/dapl/openib_cma
+
+dapl_udapl_libdaplscm_la_CFLAGS = -Wall $(DBGFLAGS) -D_GNU_SOURCE
$(OSFLAGS) $(XFLAGS) \
+ -DOPENIB -DCQ_WAIT_OBJECT \
+ -I$(srcdir)/dat/include/
-I$(srcdir)/dapl/include/ \
+ -I$(srcdir)/dapl/common
-I$(srcdir)/dapl/udapl/linux \
+ -I$(srcdir)/dapl/openib_scm
+
if HAVE_LD_VERSION_SCRIPT
dat_version_script =
-Wl,--version-script=$(srcdir)/dat/udat/libdat.map
daplcma_version_script =
-Wl,--version-script=$(srcdir)/dapl/udapl/libdaplcma.map
-
+ daplscm_version_script =
-Wl,--version-script=$(srcdir)/dapl/udapl/libdaplscm.map
else
- dat_version_script =
- daplcma_version_script =
-
+ dat_version_script =
+ daplcma_version_script =
+ daplscm_version_script =
endif
#
@@ -168,6 +177,115 @@ dapl_udapl_libdaplcma_la_LDFLAGS = -version-info
1:2:0 $(daplcma_version_script)
-Wl,-init,dapl_init
-Wl,-fini,dapl_fini \
-lpthread -libverbs -lrdmacm
+#
+# uDAPL OpenFabrics Socket CM version: libdaplscm.so
+#
+dapl_udapl_libdaplscm_la_SOURCES = dapl/udapl/dapl_init.c \
+ dapl/udapl/dapl_evd_create.c \
+ dapl/udapl/dapl_evd_query.c \
+ dapl/udapl/dapl_cno_create.c \
+ dapl/udapl/dapl_cno_modify_agent.c \
+ dapl/udapl/dapl_cno_free.c \
+ dapl/udapl/dapl_cno_wait.c \
+ dapl/udapl/dapl_cno_query.c \
+ dapl/udapl/dapl_lmr_create.c \
+ dapl/udapl/dapl_evd_wait.c \
+ dapl/udapl/dapl_evd_disable.c \
+ dapl/udapl/dapl_evd_enable.c \
+ dapl/udapl/dapl_evd_modify_cno.c \
+ dapl/udapl/dapl_evd_set_unwaitable.c \
+ dapl/udapl/dapl_evd_clear_unwaitable.c \
+ dapl/udapl/linux/dapl_osd.c \
+ dapl/common/dapl_cookie.c \
+ dapl/common/dapl_cr_accept.c \
+ dapl/common/dapl_cr_query.c \
+ dapl/common/dapl_cr_reject.c \
+ dapl/common/dapl_cr_util.c \
+ dapl/common/dapl_cr_callback.c \
+ dapl/common/dapl_cr_handoff.c \
+ dapl/common/dapl_ep_connect.c \
+ dapl/common/dapl_ep_create.c \
+ dapl/common/dapl_ep_disconnect.c \
+ dapl/common/dapl_ep_dup_connect.c \
+ dapl/common/dapl_ep_free.c \
+ dapl/common/dapl_ep_reset.c \
+ dapl/common/dapl_ep_get_status.c \
+ dapl/common/dapl_ep_modify.c \
+ dapl/common/dapl_ep_post_rdma_read.c \
+ dapl/common/dapl_ep_post_rdma_write.c \
+ dapl/common/dapl_ep_post_recv.c \
+ dapl/common/dapl_ep_post_send.c \
+ dapl/common/dapl_ep_query.c \
+ dapl/common/dapl_ep_util.c \
+ dapl/common/dapl_evd_dequeue.c \
+ dapl/common/dapl_evd_free.c \
+ dapl/common/dapl_evd_post_se.c \
+ dapl/common/dapl_evd_resize.c \
+ dapl/common/dapl_evd_util.c \
+ dapl/common/dapl_evd_cq_async_error_callb.c \
+ dapl/common/dapl_evd_qp_async_error_callb.c \
+ dapl/common/dapl_evd_un_async_error_callb.c \
+ dapl/common/dapl_evd_connection_callb.c \
+ dapl/common/dapl_evd_dto_callb.c \
+ dapl/common/dapl_get_consumer_context.c \
+ dapl/common/dapl_get_handle_type.c \
+ dapl/common/dapl_hash.c \
+ dapl/common/dapl_hca_util.c \
+ dapl/common/dapl_ia_close.c \
+ dapl/common/dapl_ia_open.c \
+ dapl/common/dapl_ia_query.c \
+ dapl/common/dapl_ia_util.c \
+ dapl/common/dapl_llist.c \
+ dapl/common/dapl_lmr_free.c \
+ dapl/common/dapl_lmr_query.c \
+ dapl/common/dapl_lmr_util.c \
+ dapl/common/dapl_lmr_sync_rdma_read.c \
+ dapl/common/dapl_lmr_sync_rdma_write.c \
+ dapl/common/dapl_mr_util.c \
+ dapl/common/dapl_provider.c \
+ dapl/common/dapl_sp_util.c \
+ dapl/common/dapl_psp_create.c \
+ dapl/common/dapl_psp_create_any.c \
+ dapl/common/dapl_psp_free.c \
+ dapl/common/dapl_psp_query.c \
+ dapl/common/dapl_pz_create.c \
+ dapl/common/dapl_pz_free.c \
+ dapl/common/dapl_pz_query.c \
+ dapl/common/dapl_pz_util.c \
+ dapl/common/dapl_rmr_create.c \
+ dapl/common/dapl_rmr_free.c \
+ dapl/common/dapl_rmr_bind.c \
+ dapl/common/dapl_rmr_query.c \
+ dapl/common/dapl_rmr_util.c \
+ dapl/common/dapl_rsp_create.c \
+ dapl/common/dapl_rsp_free.c \
+ dapl/common/dapl_rsp_query.c \
+ dapl/common/dapl_cno_util.c \
+ dapl/common/dapl_set_consumer_context.c \
+ dapl/common/dapl_ring_buffer_util.c \
+ dapl/common/dapl_name_service.c \
+ dapl/common/dapl_timer_util.c \
+ dapl/common/dapl_ep_create_with_srq.c \
+ dapl/common/dapl_ep_recv_query.c \
+ dapl/common/dapl_ep_set_watermark.c \
+ dapl/common/dapl_srq_create.c \
+ dapl/common/dapl_srq_free.c \
+ dapl/common/dapl_srq_query.c \
+ dapl/common/dapl_srq_resize.c \
+ dapl/common/dapl_srq_post_recv.c \
+ dapl/common/dapl_srq_set_lw.c \
+ dapl/common/dapl_srq_util.c \
+ dapl/common/dapl_debug.c \
+ dapl/openib_scm/dapl_ib_util.c \
+ dapl/openib_scm/dapl_ib_cq.c \
+ dapl/openib_scm/dapl_ib_qp.c \
+ dapl/openib_scm/dapl_ib_cm.c \
+ dapl/openib_scm/dapl_ib_mem.c
+
+dapl_udapl_libdaplscm_la_LDFLAGS = -version-info 1:2:0
$(daplscm_version_script) \
+ -Wl,-init,dapl_init
-Wl,-fini,dapl_fini \
+ -lpthread -libverbs
+
libdatincludedir = $(includedir)/dat
libdatinclude_HEADERS = dat/include/dat/dat.h \
@@ -230,6 +348,7 @@ EXTRA_DIST = dat/common/dat_dictionary.h \
dapl/openib_scm/dapl_ib_util.h \
dat/udat/libdat.map \
dapl/udapl/libdaplcma.map \
+ dapl/udapl/libdaplscm.map \
dapl.spec.in \
$(man_MANS) \
test/dapltest/include/dapl_bpool.h \
@@ -271,9 +390,10 @@ install-exec-hook:
fi; \
echo OpenIB-cma u1.2 nonthreadsafe default libdaplcma.so.1
dapl.1.2 '"ib0 0" ""' >> $(sysconfdir)/dat.conf; \
echo OpenIB-cma-1 u1.2 nonthreadsafe default libdaplcma.so.1
dapl.1.2 '"ib1 0" ""' >> $(sysconfdir)/dat.conf; \
- echo OpenIB-cma-2 u1.2 nonthreadsafe default libdaplcma.so.1
dapl.1.2 '"ib2 0" ""' >> $(sysconfdir)/dat.conf; \
- echo OpenIB-cma-3 u1.2 nonthreadsafe default libdaplcma.so.1
dapl.1.2 '"ib3 0" ""' >> $(sysconfdir)/dat.conf; \
- echo OpenIB-bond u1.2 nonthreadsafe default libdaplcma.so.1
dapl.1.2 '"bond0 0" ""' >> $(sysconfdir)/dat.conf;
+ echo OpenIB-mthca0-1 u1.2 nonthreadsafe default libdaplscm.so.1
dapl.1.2 '"mthca0 1" ""' >> $(sysconfdir)/dat.conf; \
+ echo OpenIB-mthca0-2 u1.2 nonthreadsafe default libdaplscm.so.1
dapl.1.2 '"mthca0 2" ""' >> $(sysconfdir)/dat.conf; \
+ echo OpenIB-mlx4_0-1 u1.2 nonthreadsafe default libdaplscm.so.1
dapl.1.2 '"mlx4_0 1" ""' >> $(sysconfdir)/dat.conf; \
+ echo OpenIB-mlx4_0-2 u1.2 nonthreadsafe default libdaplscm.so.1
dapl.1.2 '"mlx4_0 2" ""' >> $(sysconfdir)/dat.conf;
uninstall-hook:
if test -e $(sysconfdir)/dat.conf; then \
diff --git a/dapl.spec.in b/dapl.spec.in
index 2153e27..3e64bdc 100644
--- a/dapl.spec.in
+++ b/dapl.spec.in
@@ -95,10 +95,10 @@ if [ -e %{_sysconfdir}/dat.conf ]; then
fi
echo OpenIB-cma u1.2 nonthreadsafe default libdaplcma.so.1 dapl.1.2
'"ib0 0" ""' >> %{_sysconfdir}/dat.conf
echo OpenIB-cma-1 u1.2 nonthreadsafe default libdaplcma.so.1 dapl.1.2
'"ib1 0" ""' >> %{_sysconfdir}/dat.conf
-echo OpenIB-cma-2 u1.2 nonthreadsafe default libdaplcma.so.1 dapl.1.2
'"ib2 0" ""' >> %{_sysconfdir}/dat.conf
-echo OpenIB-cma-3 u1.2 nonthreadsafe default libdaplcma.so.1 dapl.1.2
'"ib3 0" ""' >> %{_sysconfdir}/dat.conf
-echo OpenIB-bond u1.2 nonthreadsafe default libdaplcma.so.1 dapl.1.2
'"bond0 0" ""' >> %{_sysconfdir}/dat.conf
-
+echo OpenIB-mthca0-1 u1.2 nonthreadsafe default libdaplscm.so.1
dapl.1.2 '"mthca0 1" ""' >> %{_sysconfdir}/dat.conf
+echo OpenIB-mthca0-2 u1.2 nonthreadsafe default libdaplscm.so.1
dapl.1.2 '"mthca0 2" ""' >> %{_sysconfdir}/dat.conf
+echo OpenIB-mlx4_0-1 u1.2 nonthreadsafe default libdaplscm.so.1
dapl.1.2 '"mlx4_0 1" ""' >> %{_sysconfdir}/dat.conf
+echo OpenIB-mlx4_0-2 u1.2 nonthreadsafe default libdaplscm.so.1
dapl.1.2 '"mlx4_0 2" ""' >> %{_sysconfdir}/dat.conf
%postun
/sbin/ldconfig
@@ -130,7 +130,7 @@ fi
%changelog
* Tue May 20 2008 Arlin Davis <ardavis at ichips.intel.com> - 1.2.7
-- DAT/DAPL Version 1.2.7 Release 1, OFED 1.3.1 GA
+- DAT/DAPL Version 1.2.7 Release 1, OFED 1.3.1 GA
* Thu May 1 2008 Arlin Davis <ardavis at ichips.intel.com> - 1.2.6
- DAT/DAPL Version 1.2.6 Release 1, OFED 1.3.1
diff --git a/dapl/openib_scm/dapl_ib_cm.c b/dapl/openib_scm/dapl_ib_cm.c
index f534e8d..9e686d6 100644
--- a/dapl/openib_scm/dapl_ib_cm.c
+++ b/dapl/openib_scm/dapl_ib_cm.c
@@ -57,76 +57,171 @@
#include <unistd.h>
#include <fcntl.h>
#include <netinet/tcp.h>
-#include <sysfs/libsysfs.h>
+#include <byteswap.h>
+#include <poll.h>
-/* prototypes */
-static uint16_t dapli_get_lid( struct ibv_device *dev, int port );
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
-static DAT_RETURN dapli_socket_connect ( DAPL_EP *ep_ptr,
- DAT_IA_ADDRESS_PTR r_addr,
- DAT_CONN_QUAL r_qual,
- DAT_COUNT p_size,
- DAT_PVOID p_data
);
+#if __BYTE_ORDER == __LITTLE_ENDIAN
+static inline uint64_t cpu_to_be64(uint64_t x) {return bswap_64(x);}
+#elif __BYTE_ORDER == __BIG_ENDIAN
+static inline uint64_t cpu_to_be64(uint64_t x) {return x;}
+#endif
-static DAT_RETURN dapli_socket_listen ( DAPL_IA
*ia_ptr,
- DAT_CONN_QUAL
serviceID,
- DAPL_SP *sp_ptr
);
+extern int g_scm_pipe[2];
-static DAT_RETURN dapli_socket_accept( ib_cm_srvc_handle_t cm_ptr );
+static struct ib_cm_handle *dapli_cm_create(void)
+{
+ struct ib_cm_handle *cm_ptr;
-static DAT_RETURN dapli_socket_accept_final( DAPL_EP *ep_ptr,
- DAPL_CR *cr_ptr,
- DAT_COUNT p_size,
- DAT_PVOID p_data
);
+ /* Allocate and zero CM first; zeroing after lock init would wipe the lock */
+ if ((cm_ptr = dapl_os_alloc(sizeof(*cm_ptr))) == NULL)
+ return NULL;
+
+ (void)dapl_os_memzero(cm_ptr, sizeof(*cm_ptr));
+ if (dapl_os_lock_init(&cm_ptr->lock))
+ goto bail;
-/* XXX temporary hack to get lid */
-static uint16_t dapli_get_lid(IN struct ibv_device *dev, IN int port)
+ cm_ptr->dst.ver = htons(DSCM_VER);
+ cm_ptr->socket = -1;
+ return cm_ptr;
+bail:
+ dapl_os_free(cm_ptr, sizeof(*cm_ptr));
+ return NULL;
+}
+
+/* mark for destroy, remove all references, schedule cleanup */
+static void dapli_cm_destroy(struct ib_cm_handle *cm_ptr)
{
- char path[128];
- char val[16];
- char name[256];
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " cm_destroy: cm %p ep %p\n", cm_ptr,cm_ptr->ep);
+
+ /* cleanup, never made it to work queue */
+ if (cm_ptr->state == SCM_INIT) {
+ if (cm_ptr->socket >= 0)
+ close(cm_ptr->socket);
+ dapl_os_free(cm_ptr, sizeof(*cm_ptr));
+ return;
+ }
- if (sysfs_get_mnt_path(path, sizeof path)) {
- fprintf(stderr, "Couldn't find sysfs mount.\n");
- return 0;
+ dapl_os_lock(&cm_ptr->lock);
+ cm_ptr->state = SCM_DESTROY;
+ if (cm_ptr->ep) {
+ cm_ptr->ep->cm_handle = IB_INVALID_HANDLE;
+ cm_ptr->ep->qp_handle = IB_INVALID_HANDLE;
}
- sprintf(name, "%s/class/infiniband/%s/ports/%d/lid", path,
- ibv_get_device_name(dev), port);
- if (sysfs_read_attribute_value(name, val, sizeof val)) {
- fprintf(stderr, "Couldn't read LID at %s\n", name);
- return 0;
+ /* close socket if still active */
+ if (cm_ptr->socket >= 0) {
+ close(cm_ptr->socket);
+ cm_ptr->socket = -1;
}
- return strtol(val, NULL, 0);
+ dapl_os_unlock(&cm_ptr->lock);
+
+ /* wakeup work thread */
+ write(g_scm_pipe[1], "w", sizeof "w");
+}
+
+/* queue socket for processing CM work */
+static void dapli_cm_queue(struct ib_cm_handle *cm_ptr)
+{
+ /* add to work queue for cr thread processing */
+ dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&cm_ptr->entry);
+ dapl_os_lock(&cm_ptr->hca->ib_trans.lock);
+ dapl_llist_add_tail(&cm_ptr->hca->ib_trans.list,
+ (DAPL_LLIST_ENTRY*)&cm_ptr->entry, cm_ptr);
+ dapl_os_unlock(&cm_ptr->hca->ib_trans.lock);
+
+ /* wakeup CM work thread */
+ write(g_scm_pipe[1], "w", sizeof "w");
+}
+
+static uint16_t dapli_get_lid(IN struct ibv_context *ctx, IN uint8_t
port)
+{
+ struct ibv_port_attr port_attr;
+
+ if(ibv_query_port(ctx, port,&port_attr))
+ return(0xffff);
+ else
+ return(port_attr.lid);
}
/*
- * ACTIVE: Create socket, connect, and exchange QP information
+ * ACTIVE/PASSIVE: send disconnect marker to peer, close socket and post a DISCONNECTED event (no-op if never connected or already disconnected); called from CR thread or consumer via ep_disconnect
*/
static DAT_RETURN
-dapli_socket_connect ( DAPL_EP *ep_ptr,
- DAT_IA_ADDRESS_PTR r_addr,
- DAT_CONN_QUAL r_qual,
- DAT_COUNT p_size,
- DAT_PVOID p_data )
+dapli_socket_disconnect(ib_cm_handle_t cm_ptr)
{
- ib_cm_handle_t cm_ptr;
- DAPL_IA *ia_ptr = ep_ptr->header.owner_ia;
+ DAPL_EP *ep_ptr = cm_ptr->ep;
+ DAT_UINT32 disc_data = htonl(0xdead); /* marker written to the peer socket on disconnect */
+
+ if (ep_ptr == NULL)
+ return DAT_SUCCESS;
+
+ dapl_os_lock(&cm_ptr->lock);
+ if ((cm_ptr->state == SCM_INIT) ||
+ (cm_ptr->state == SCM_DISCONNECTED)) {
+ dapl_os_unlock(&cm_ptr->lock);
+ return DAT_SUCCESS;
+ } else {
+ /* send disc data to peer, close socket, wake CM work thread via pipe */
+ if (cm_ptr->socket >= 0) {
+ write(cm_ptr->socket, &disc_data,
sizeof(disc_data));
+ close(cm_ptr->socket);
+ cm_ptr->socket = -1;
+ }
+ cm_ptr->state = SCM_DISCONNECTED;
+ write(g_scm_pipe[1], "w", sizeof "w");
+ }
+ dapl_os_unlock(&cm_ptr->lock);
+
+
+ if (ep_ptr->cr_ptr) {
+ dapls_cr_callback(cm_ptr,
+ IB_CME_DISCONNECTED,
+ NULL,
+ ((DAPL_CR *)ep_ptr->cr_ptr)->sp_ptr);
+ } else {
+ dapl_evd_connection_callback(ep_ptr->cm_handle,
+ IB_CME_DISCONNECTED,
+ NULL,
+ ep_ptr);
+ }
+
+ /* remove reference from endpoint */
+ ep_ptr->cm_handle = NULL;
+
+ /* NOTE(review): nothing is scheduled here; presumably the CM work thread reaps SCM_DISCONNECTED handles after the pipe wakeup above - confirm */
+
+
+ return DAT_SUCCESS;
+}
+
+
+/*
+ * ACTIVE: Create socket, connect, defer exchange QP information to CR
thread
+ * to avoid blocking.
+ */
+DAT_RETURN
+dapli_socket_connect(DAPL_EP *ep_ptr,
+ DAT_IA_ADDRESS_PTR r_addr,
+ DAT_CONN_QUAL r_qual,
+ DAT_COUNT p_size,
+ DAT_PVOID p_data)
+{
+ ib_cm_handle_t cm_ptr;
int len, opt = 1;
struct iovec iovec[2];
- short rtu_data = htons(0x0E0F);
-
- dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: r_qual %d\n", r_qual);
+ DAPL_IA *ia_ptr = ep_ptr->header.owner_ia;
+
+ dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: r_qual %d
p_size=%d\n",
+ r_qual,p_size);
- /*
- * Allocate CM and initialize
- */
- if ((cm_ptr = dapl_os_alloc(sizeof(*cm_ptr))) == NULL ) {
+ cm_ptr = dapli_cm_create();
+ if (cm_ptr == NULL)
return DAT_INSUFFICIENT_RESOURCES;
- }
-
- (void) dapl_os_memzero( cm_ptr, sizeof( *cm_ptr ) );
- cm_ptr->socket = -1;
/* create, connect, sockopt, and exchange QP information */
if ((cm_ptr->socket = socket(AF_INET,SOCK_STREAM,0)) < 0 ) {
@@ -136,197 +231,252 @@ dapli_socket_connect ( DAPL_EP
*ep_ptr,
((struct sockaddr_in*)r_addr)->sin_port = htons(r_qual);
- if ( connect(cm_ptr->socket, r_addr, sizeof(*r_addr)) < 0 ) {
+ if (connect(cm_ptr->socket, r_addr, sizeof(*r_addr)) < 0) {
dapl_dbg_log(DAPL_DBG_TYPE_ERR,
" connect: %s on r_qual %d\n",
strerror(errno), (unsigned int)r_qual);
- dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
+ dapli_cm_destroy(cm_ptr);
return DAT_INVALID_ADDRESS;
}
setsockopt(cm_ptr->socket,IPPROTO_TCP,TCP_NODELAY,&opt,sizeof(opt));
-
+
+ dapl_dbg_log(DAPL_DBG_TYPE_EP, " socket connected!\n");
+
+
/* Send QP info, IA address, and private data */
- cm_ptr->dst.qpn = ep_ptr->qp_handle->qp_num;
- cm_ptr->dst.port = ia_ptr->hca_ptr->port_num;
- cm_ptr->dst.lid = dapli_get_lid(
ia_ptr->hca_ptr->ib_trans.ib_dev,
- ia_ptr->hca_ptr->port_num );
+ cm_ptr->dst.qpn = htonl(ep_ptr->qp_handle->qp_num);
+ cm_ptr->dst.port = htons(ia_ptr->hca_ptr->port_num);
+ cm_ptr->dst.lid =
+ htons(dapli_get_lid(ia_ptr->hca_ptr->ib_hca_handle,
+
(uint8_t)ia_ptr->hca_ptr->port_num));
+ if (cm_ptr->dst.lid == 0xffff)
+ goto bail;
+
+ /* in network order */
+ if (ibv_query_gid(ia_ptr->hca_ptr->ib_hca_handle,
+ (uint8_t)ia_ptr->hca_ptr->port_num,
+ 0,
+ &cm_ptr->dst.gid))
+ goto bail;
+
cm_ptr->dst.ia_address = ia_ptr->hca_ptr->hca_address;
- cm_ptr->dst.p_size = p_size;
+ cm_ptr->dst.p_size = htonl(p_size);
iovec[0].iov_base = &cm_ptr->dst;
iovec[0].iov_len = sizeof(ib_qp_cm_t);
- if ( p_size ) {
+ if (p_size) {
iovec[1].iov_base = p_data;
iovec[1].iov_len = p_size;
}
- len = writev( cm_ptr->socket, iovec, (p_size ? 2:1) );
- if ( len != (p_size + sizeof(ib_qp_cm_t)) ) {
+
+ dapl_dbg_log(DAPL_DBG_TYPE_EP," socket connected, write QP and
private data\n");
+ len = writev(cm_ptr->socket, iovec, (p_size ? 2:1));
+ if (len != (p_size + sizeof(ib_qp_cm_t))) {
dapl_dbg_log(DAPL_DBG_TYPE_ERR,
" connect write: ERR %s, wcnt=%d\n",
strerror(errno), len);
goto bail;
}
- dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
" connect: SRC port=0x%x lid=0x%x, qpn=0x%x,
psize=%d\n",
- cm_ptr->dst.port, cm_ptr->dst.lid,
- cm_ptr->dst.qpn, cm_ptr->dst.p_size );
+ ntohs(cm_ptr->dst.port), ntohs(cm_ptr->dst.lid),
+ ntohl(cm_ptr->dst.qpn), ntohl(cm_ptr->dst.p_size));
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " connect SRC GID subnet %016llx id %016llx\n",
+ (unsigned long long)
+
cpu_to_be64(cm_ptr->dst.gid.global.subnet_prefix),
+ (unsigned long long)
+
cpu_to_be64(cm_ptr->dst.gid.global.interface_id));
+
+ /* queue up to work thread to avoid blocking consumer */
+ cm_ptr->state = SCM_CONN_PENDING;
+ cm_ptr->hca = ia_ptr->hca_ptr;
+ cm_ptr->ep = ep_ptr;
+ dapli_cm_queue(cm_ptr);
+ return DAT_SUCCESS;
+bail:
+ /* close socket, free cm structure */
+ dapli_cm_destroy(cm_ptr);
+ return DAT_INTERNAL_ERROR;
+}
+
+
+/*
+ * ACTIVE: exchange QP information, called from CR thread
+ */
+void
+dapli_socket_connect_rtu(ib_cm_handle_t cm_ptr)
+{
+ DAPL_EP *ep_ptr = cm_ptr->ep;
+ int len;
+ struct iovec iovec[2];
+ short rtu_data = htons(0x0E0F);
+ ib_cm_events_t event = IB_CME_DESTINATION_REJECT;
/* read DST information into cm_ptr, overwrite SRC info */
- len = readv( cm_ptr->socket, iovec, 1 );
- if ( len != sizeof(ib_qp_cm_t) ) {
+ dapl_dbg_log(DAPL_DBG_TYPE_EP," connect_rtu: recv peer QP
data\n");
+
+ iovec[0].iov_base = &cm_ptr->dst;
+ iovec[0].iov_len = sizeof(ib_qp_cm_t);
+ len = readv(cm_ptr->socket, iovec, 1);
+ if (len != sizeof(ib_qp_cm_t) || ntohs(cm_ptr->dst.ver) !=
DSCM_VER) {
dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " connect read: ERR %s, rcnt=%d\n",
- strerror(errno), len);
+ " connect_rtu read: ERR %s, rcnt=%d,
ver=%d\n",
+ strerror(errno), len, cm_ptr->dst.ver);
+ goto bail;
+ }
+ /* check for consumer reject */
+ if (cm_ptr->dst.rej) {
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " connect_rtu read: PEER REJ
reason=0x%x\n",
+ ntohs(cm_ptr->dst.rej));
+ event = IB_CME_DESTINATION_REJECT_PRIVATE_DATA;
goto bail;
}
+
+ /* convert peer response values to host order */
+ cm_ptr->dst.port = ntohs(cm_ptr->dst.port);
+ cm_ptr->dst.lid = ntohs(cm_ptr->dst.lid);
+ cm_ptr->dst.qpn = ntohl(cm_ptr->dst.qpn);
+ cm_ptr->dst.p_size = ntohl(cm_ptr->dst.p_size);
+
+ /* save remote address information */
+ dapl_os_memcpy( &ep_ptr->remote_ia_address,
+ &cm_ptr->dst.ia_address,
+ sizeof(ep_ptr->remote_ia_address));
+
dapl_dbg_log(DAPL_DBG_TYPE_EP,
- " connect: DST port=0x%x lid=0x%x, qpn=0x%x,
psize=%d\n",
+ " connect_rtu: DST %s port=0x%x lid=0x%x, qpn=0x%x,
psize=%d\n",
+ inet_ntoa(((struct sockaddr_in
*)&cm_ptr->dst.ia_address)->sin_addr),
cm_ptr->dst.port, cm_ptr->dst.lid,
- cm_ptr->dst.qpn, cm_ptr->dst.p_size );
+ cm_ptr->dst.qpn, cm_ptr->dst.p_size);
/* validate private data size before reading */
- if ( cm_ptr->dst.p_size > IB_MAX_REP_PDATA_SIZE ) {
+ if (cm_ptr->dst.p_size > IB_MAX_REP_PDATA_SIZE) {
dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " connect read: psize (%d) wrong\n",
+ " connect_rtu read: psize (%d) wrong\n",
cm_ptr->dst.p_size );
goto bail;
}
/* read private data into cm_handle if any present */
- if ( cm_ptr->dst.p_size ) {
+ dapl_dbg_log(DAPL_DBG_TYPE_EP," socket connected, read private
data\n");
+ if (cm_ptr->dst.p_size) {
iovec[0].iov_base = cm_ptr->p_data;
iovec[0].iov_len = cm_ptr->dst.p_size;
- len = readv( cm_ptr->socket, iovec, 1 );
- if ( len != cm_ptr->dst.p_size ) {
+ len = readv(cm_ptr->socket, iovec, 1);
+ if (len != cm_ptr->dst.p_size) {
dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " connect read pdata: ERR %s,
rcnt=%d\n",
+ " connect_rtu read pdata: ERR %s,
rcnt=%d\n",
strerror(errno), len);
goto bail;
}
}
/* modify QP to RTR and then to RTS with remote info */
- if ( dapls_modify_qp_state( ep_ptr->qp_handle,
- IBV_QPS_RTR, &cm_ptr->dst ) !=
DAT_SUCCESS )
+ if (dapls_modify_qp_state(ep_ptr->qp_handle,
+ IBV_QPS_RTR, &cm_ptr->dst) !=
DAT_SUCCESS)
goto bail;
- if ( dapls_modify_qp_state( ep_ptr->qp_handle,
- IBV_QPS_RTS, &cm_ptr->dst ) !=
DAT_SUCCESS )
+ if (dapls_modify_qp_state(ep_ptr->qp_handle,
+ IBV_QPS_RTS, &cm_ptr->dst) !=
DAT_SUCCESS)
goto bail;
ep_ptr->qp_state = IB_QP_STATE_RTS;
+ dapl_dbg_log(DAPL_DBG_TYPE_EP," connect_rtu: send RTU\n");
+
/* complete handshake after final QP state change */
- write(cm_ptr->socket, &rtu_data, sizeof(rtu_data) );
+ write(cm_ptr->socket, &rtu_data, sizeof(rtu_data));
/* init cm_handle and post the event with private data */
ep_ptr->cm_handle = cm_ptr;
- dapl_dbg_log( DAPL_DBG_TYPE_EP," ACTIVE: connected!\n" );
- dapl_evd_connection_callback( ep_ptr->cm_handle,
- IB_CME_CONNECTED,
- cm_ptr->p_data,
- ep_ptr );
- return DAT_SUCCESS;
-
+ cm_ptr->state = SCM_CONNECTED;
+ dapl_dbg_log(DAPL_DBG_TYPE_EP," ACTIVE: connected!\n");
+ dapl_evd_connection_callback(cm_ptr,
+ IB_CME_CONNECTED,
+ cm_ptr->p_data,
+ ep_ptr);
+ return;
bail:
/* close socket, free cm structure and post error event */
- if ( cm_ptr->socket >= 0 )
- close(cm_ptr->socket);
- dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
- dapls_ib_reinit_ep( ep_ptr ); /* reset QP state */
-
- dapl_evd_connection_callback( ep_ptr->cm_handle,
- IB_CME_LOCAL_FAILURE,
- NULL,
- ep_ptr );
- return DAT_INTERNAL_ERROR;
+ dapli_cm_destroy(cm_ptr);
+ dapls_ib_reinit_ep(ep_ptr); /* reset QP state */
+ dapl_evd_connection_callback(NULL, event, NULL, ep_ptr);
}
-
/*
* PASSIVE: Create socket, listen, accept, exchange QP information
*/
-static DAT_RETURN
-dapli_socket_listen ( DAPL_IA *ia_ptr,
- DAT_CONN_QUAL serviceID,
- DAPL_SP *sp_ptr )
+DAT_RETURN
+dapli_socket_listen(DAPL_IA *ia_ptr,
+ DAT_CONN_QUAL serviceID,
+ DAPL_SP *sp_ptr )
{
struct sockaddr_in addr;
ib_cm_srvc_handle_t cm_ptr = NULL;
int opt = 1;
DAT_RETURN dat_status = DAT_SUCCESS;
- dapl_dbg_log ( DAPL_DBG_TYPE_EP,
- " listen(ia_ptr %p ServiceID %d sp_ptr %p)\n",
- ia_ptr, serviceID, sp_ptr);
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ " listen(ia_ptr %p ServiceID %d sp_ptr %p)\n",
+ ia_ptr, serviceID, sp_ptr);
- /* Allocate CM and initialize */
- if ((cm_ptr = dapl_os_alloc(sizeof(*cm_ptr))) == NULL)
+ cm_ptr = dapli_cm_create();
+ if (cm_ptr == NULL)
return DAT_INSUFFICIENT_RESOURCES;
- (void) dapl_os_memzero( cm_ptr, sizeof( *cm_ptr ) );
-
- cm_ptr->socket = cm_ptr->l_socket = -1;
cm_ptr->sp = sp_ptr;
- cm_ptr->hca_ptr = ia_ptr->hca_ptr;
+ cm_ptr->hca = ia_ptr->hca_ptr;
/* bind, listen, set sockopt, accept, exchange data */
- if ((cm_ptr->l_socket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+ if ((cm_ptr->socket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
dapl_dbg_log (DAPL_DBG_TYPE_ERR,
"socket for listen returned %d\n",
errno);
dat_status = DAT_INSUFFICIENT_RESOURCES;
goto bail;
}
-
setsockopt(cm_ptr->l_socket,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
+
setsockopt(cm_ptr->socket,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
addr.sin_port = htons(serviceID);
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = INADDR_ANY;
- if (( bind( cm_ptr->l_socket,(struct sockaddr*)&addr,
sizeof(addr) ) < 0) ||
- (listen( cm_ptr->l_socket, 128 ) < 0) ) {
-
+ if ((bind(cm_ptr->socket,(struct sockaddr*)&addr, sizeof(addr))
< 0) ||
+ (listen(cm_ptr->socket, 128) < 0)) {
dapl_dbg_log( DAPL_DBG_TYPE_CM,
" listen: ERROR %s on conn_qual 0x%x\n",
strerror(errno),serviceID);
-
- if ( errno == EADDRINUSE )
+ if (errno == EADDRINUSE)
dat_status = DAT_CONN_QUAL_IN_USE;
else
dat_status = DAT_CONN_QUAL_UNAVAILABLE;
-
goto bail;
}
/* set cm_handle for this service point, save listen socket */
sp_ptr->cm_srvc_handle = cm_ptr;
- /* add to SP->CR thread list */
- dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&cm_ptr->entry);
- dapl_os_lock( &cm_ptr->hca_ptr->ib_trans.lock );
- dapl_llist_add_tail(&cm_ptr->hca_ptr->ib_trans.list,
- (DAPL_LLIST_ENTRY*)&cm_ptr->entry, cm_ptr);
- dapl_os_unlock(&cm_ptr->hca_ptr->ib_trans.lock);
+ /* queue up listen socket to process inbound CR's */
+ cm_ptr->state = SCM_LISTEN;
+ dapli_cm_queue(cm_ptr);
+
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " listen: qual 0x%x cr %p s_fd %d\n",
+ ntohs(serviceID), cm_ptr, cm_ptr->socket );
- dapl_dbg_log( DAPL_DBG_TYPE_CM,
- " listen: qual 0x%x cr %p s_fd %d\n",
- ntohs(serviceID), cm_ptr, cm_ptr->l_socket );
-
return dat_status;
bail:
dapl_dbg_log( DAPL_DBG_TYPE_CM,
" listen: ERROR on conn_qual 0x%x\n",serviceID);
- if ( cm_ptr->l_socket >= 0 )
- close( cm_ptr->l_socket );
- dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
+ dapli_cm_destroy(cm_ptr);
return dat_status;
}
/*
- * PASSIVE: send local QP information, private data, and wait for
- * active side to respond with QP RTS/RTR status
+ * PASSIVE: accept socket, receive peer QP information, private data,
post cr_event
*/
-static DAT_RETURN
+DAT_RETURN
dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
{
ib_cm_handle_t acm_ptr;
@@ -334,6 +484,8 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
int len;
DAT_RETURN dat_status = DAT_SUCCESS;
+ dapl_dbg_log(DAPL_DBG_TYPE_EP," socket_accept\n");
+
/* Allocate accept CM and initialize */
if ((acm_ptr = dapl_os_alloc(sizeof(*acm_ptr))) == NULL)
return DAT_INSUFFICIENT_RESOURCES;
@@ -342,155 +494,221 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
acm_ptr->socket = -1;
acm_ptr->sp = cm_ptr->sp;
- acm_ptr->hca_ptr = cm_ptr->hca_ptr;
+ acm_ptr->hca = cm_ptr->hca;
len = sizeof(acm_ptr->dst.ia_address);
- acm_ptr->socket = accept(cm_ptr->l_socket,
+ acm_ptr->socket = accept(cm_ptr->socket,
(struct
sockaddr*)&acm_ptr->dst.ia_address,
- (socklen_t*)&len );
+ (socklen_t*)&len);
- if ( acm_ptr->socket < 0 ) {
+ if (acm_ptr->socket < 0) {
dapl_dbg_log(DAPL_DBG_TYPE_ERR,
" accept: ERR %s on FD %d l_cr %p\n",
- strerror(errno),cm_ptr->l_socket,cm_ptr);
+ strerror(errno),cm_ptr->socket,cm_ptr);
dat_status = DAT_INTERNAL_ERROR;
goto bail;
}
+ dapl_dbg_log(DAPL_DBG_TYPE_EP," socket accepted, read QP
data\n");
+
/* read in DST QP info, IA address. check for private data */
- len = read( acm_ptr->socket, &acm_ptr->dst, sizeof(ib_qp_cm_t)
);
- if ( len != sizeof(ib_qp_cm_t) ) {
+ len = read(acm_ptr->socket, &acm_ptr->dst, sizeof(ib_qp_cm_t));
+ if (len != sizeof(ib_qp_cm_t) ||
+ ntohs(acm_ptr->dst.ver) != DSCM_VER) {
dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " accept read: ERR %s, rcnt=%d\n",
- strerror(errno), len);
+ " accept read: ERR %s, rcnt=%d, ver=%d\n",
+ strerror(errno), len, acm_ptr->dst.ver);
dat_status = DAT_INTERNAL_ERROR;
goto bail;
-
}
+
+ /* convert accepted values to host order */
+ acm_ptr->dst.port = ntohs(acm_ptr->dst.port);
+ acm_ptr->dst.lid = ntohs(acm_ptr->dst.lid);
+ acm_ptr->dst.qpn = ntohl(acm_ptr->dst.qpn);
+ acm_ptr->dst.p_size = ntohl(acm_ptr->dst.p_size);
+
dapl_dbg_log(DAPL_DBG_TYPE_EP,
- " accept: DST port=0x%x lid=0x%x, qpn=0x%x, psize=%d\n",
- acm_ptr->dst.port, acm_ptr->dst.lid,
- acm_ptr->dst.qpn, acm_ptr->dst.p_size );
+ " accept: DST %s port=0x%x lid=0x%x, qpn=0x%x,
psize=%d\n",
+ inet_ntoa(((struct sockaddr_in
*)&acm_ptr->dst.ia_address)->sin_addr),
+ acm_ptr->dst.port, acm_ptr->dst.lid,
+ acm_ptr->dst.qpn, acm_ptr->dst.p_size);
/* validate private data size before reading */
- if ( acm_ptr->dst.p_size > IB_MAX_REQ_PDATA_SIZE ) {
+ if (acm_ptr->dst.p_size > IB_MAX_REQ_PDATA_SIZE) {
dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " accept read: psize (%d) wrong\n",
- acm_ptr->dst.p_size );
+ " accept read: psize (%d) wrong\n",
+ acm_ptr->dst.p_size);
dat_status = DAT_INTERNAL_ERROR;
goto bail;
}
+ dapl_dbg_log(DAPL_DBG_TYPE_EP," socket accepted, read private
data\n");
+
/* read private data into cm_handle if any present */
- if ( acm_ptr->dst.p_size ) {
+ if (acm_ptr->dst.p_size) {
len = read( acm_ptr->socket,
- acm_ptr->p_data, acm_ptr->dst.p_size );
- if ( len != acm_ptr->dst.p_size ) {
+ acm_ptr->p_data, acm_ptr->dst.p_size);
+ if (len != acm_ptr->dst.p_size) {
dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " accept read pdata: ERR %s, rcnt=%d\n",
- strerror(errno), len );
+ " accept read pdata: ERR %s,
rcnt=%d\n",
+ strerror(errno), len);
dat_status = DAT_INTERNAL_ERROR;
goto bail;
}
- dapl_dbg_log(DAPL_DBG_TYPE_EP,
- " accept: psize=%d read\n",
- acm_ptr->dst.p_size);
+ dapl_dbg_log(DAPL_DBG_TYPE_EP," accept: psize=%d
read\n",len);
p_data = acm_ptr->p_data;
}
- /* trigger CR event and return SUCCESS */
- dapls_cr_callback( acm_ptr,
- IB_CME_CONNECTION_REQUEST_PENDING,
- p_data,
- acm_ptr->sp );
+ acm_ptr->state = SCM_ACCEPTING;
+ /* trigger CR event and return SUCCESS */
+ dapls_cr_callback(acm_ptr,
+ IB_CME_CONNECTION_REQUEST_PENDING,
+ p_data,
+ acm_ptr->sp );
return DAT_SUCCESS;
-
bail:
- if ( acm_ptr->socket >=0 )
- close( acm_ptr->socket );
- dapl_os_free( acm_ptr, sizeof( *acm_ptr ) );
+ dapli_cm_destroy(acm_ptr);
return DAT_INTERNAL_ERROR;
}
-
-static DAT_RETURN
-dapli_socket_accept_final( DAPL_EP *ep_ptr,
- DAPL_CR *cr_ptr,
- DAT_COUNT p_size,
- DAT_PVOID p_data )
+/*
+ * PASSIVE: consumer accept, send local QP information, private data,
+ * queue on work thread to receive RTU information to avoid blocking
+ * user thread.
+ */
+DAT_RETURN
+dapli_socket_accept_usr(DAPL_EP *ep_ptr,
+ DAPL_CR *cr_ptr,
+ DAT_COUNT p_size,
+ DAT_PVOID p_data)
{
DAPL_IA *ia_ptr = ep_ptr->header.owner_ia;
- ib_cm_handle_t cm_ptr = cr_ptr->ib_cm_handle;
- ib_qp_cm_t qp_cm;
+ ib_cm_handle_t cm_ptr = cr_ptr->ib_cm_handle;
struct iovec iovec[2];
int len;
- short rtu_data = 0;
- if (p_size > IB_MAX_REP_PDATA_SIZE)
+ if (p_size > IB_MAX_REP_PDATA_SIZE)
return DAT_LENGTH_ERROR;
/* must have a accepted socket */
- if ( cm_ptr->socket < 0 )
+ if (cm_ptr->socket < 0)
return DAT_INTERNAL_ERROR;
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ " accept_usr: remote port=0x%x lid=0x%x"
+ " qpn=0x%x psize=%d\n",
+ cm_ptr->dst.port, cm_ptr->dst.lid,
+ cm_ptr->dst.qpn, cm_ptr->dst.p_size);
+
/* modify QP to RTR and then to RTS with remote info already read */
- if ( dapls_modify_qp_state( ep_ptr->qp_handle,
- IBV_QPS_RTR, &cm_ptr->dst ) != DAT_SUCCESS )
+ if (dapls_modify_qp_state(ep_ptr->qp_handle,
+ IBV_QPS_RTR, &cm_ptr->dst) != DAT_SUCCESS)
goto bail;
- if ( dapls_modify_qp_state( ep_ptr->qp_handle,
- IBV_QPS_RTS, &cm_ptr->dst ) != DAT_SUCCESS )
+ if (dapls_modify_qp_state(ep_ptr->qp_handle,
+ IBV_QPS_RTS, &cm_ptr->dst) != DAT_SUCCESS)
goto bail;
ep_ptr->qp_state = IB_QP_STATE_RTS;
- /* Send QP info, IA address, and private data */
- qp_cm.qpn = ep_ptr->qp_handle->qp_num;
- qp_cm.port = ia_ptr->hca_ptr->port_num;
- qp_cm.lid = dapli_get_lid( ia_ptr->hca_ptr->ib_trans.ib_dev,
- ia_ptr->hca_ptr->port_num );
- qp_cm.ia_address = ia_ptr->hca_ptr->hca_address;
- qp_cm.p_size = p_size;
- iovec[0].iov_base = &qp_cm;
+ /* save remote address information */
+ dapl_os_memcpy( &ep_ptr->remote_ia_address,
+ &cm_ptr->dst.ia_address,
+ sizeof(ep_ptr->remote_ia_address));
+
+ /* send our QP info, IA address, and private data; cm_ptr->dst is
+ * reused as the outbound message buffer, fields in network order */
+ cm_ptr->dst.qpn = htonl(ep_ptr->qp_handle->qp_num);
+ cm_ptr->dst.port = htons(ia_ptr->hca_ptr->port_num);
+ cm_ptr->dst.lid = htons(dapli_get_lid(ia_ptr->hca_ptr->ib_hca_handle,
+ (uint8_t)ia_ptr->hca_ptr->port_num));
+ if (cm_ptr->dst.lid == 0xffff)
+ goto bail;
+
+ /* in network order */
+ if (ibv_query_gid(ia_ptr->hca_ptr->ib_hca_handle,
+ (uint8_t)ia_ptr->hca_ptr->port_num,
+ 0,
+ &cm_ptr->dst.gid))
+ goto bail;
+
+ cm_ptr->dst.ia_address = ia_ptr->hca_ptr->hca_address;
+ cm_ptr->dst.p_size = htonl(p_size);
+ iovec[0].iov_base = &cm_ptr->dst;
iovec[0].iov_len = sizeof(ib_qp_cm_t);
if (p_size) {
iovec[1].iov_base = p_data;
iovec[1].iov_len = p_size;
}
- len = writev( cm_ptr->socket, iovec, (p_size ? 2:1) );
+ len = writev(cm_ptr->socket, iovec, (p_size ? 2:1));
if (len != (p_size + sizeof(ib_qp_cm_t))) {
dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " accept_final: ERR %s, wcnt=%d\n",
+ " accept_usr: ERR %s, wcnt=%d\n",
strerror(errno), len);
goto bail;
}
- dapl_dbg_log(DAPL_DBG_TYPE_EP,
- " accept_final: SRC port=0x%x lid=0x%x, qpn=0x%x, psize=%d\n",
- qp_cm.port, qp_cm.lid, qp_cm.qpn, qp_cm.p_size );
-
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " accept_usr: local port=0x%x lid=0x%x"
+ " qpn=0x%x psize=%d\n",
+ ntohs(cm_ptr->dst.port), ntohs(cm_ptr->dst.lid),
+ ntohl(cm_ptr->dst.qpn), ntohl(cm_ptr->dst.p_size));
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " accept_usr SRC GID subnet %016llx id %016llx\n",
+ (unsigned long long)
+ cpu_to_be64(cm_ptr->dst.gid.global.subnet_prefix),
+ (unsigned long long)
+ cpu_to_be64(cm_ptr->dst.gid.global.interface_id));
+
+ /* save state and reference to EP, queue for RTU data */
+ cm_ptr->ep = ep_ptr;
+ cm_ptr->hca = ia_ptr->hca_ptr;
+ cm_ptr->state = SCM_ACCEPTED;
+
+ /* restore remote address information for query */
+ dapl_os_memcpy( &cm_ptr->dst.ia_address,
+ &ep_ptr->remote_ia_address,
+ sizeof(cm_ptr->dst.ia_address));
+
+ dapl_dbg_log( DAPL_DBG_TYPE_EP," PASSIVE: accepted!\n" );
+ dapli_cm_queue(cm_ptr);
+ return DAT_SUCCESS;
+bail:
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR," accept_usr: ERR !QP_RTR_RTS \n");
+ dapli_cm_destroy(cm_ptr);
+ dapls_ib_reinit_ep(ep_ptr); /* reset QP state */
+ return DAT_INTERNAL_ERROR;
+}
+
+/*
+ * PASSIVE: read RTU from active peer, post CONN event
+ */
+void
+dapli_socket_accept_rtu(ib_cm_handle_t cm_ptr)
+{
+ int len;
+ short rtu_data = 0;
+
/* complete handshake after final QP state change */
- len = read(cm_ptr->socket, &rtu_data, sizeof(rtu_data) );
- if ( len != sizeof(rtu_data) || ntohs(rtu_data) != 0x0e0f ) {
+ len = read(cm_ptr->socket, &rtu_data, sizeof(rtu_data));
+ if (len != sizeof(rtu_data) || ntohs(rtu_data) != 0x0e0f) {
dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " accept_final: ERR %s, rcnt=%d rdata=%x\n",
- strerror(errno), len, ntohs(rtu_data) );
+ " accept_rtu: ERR %s, rcnt=%d rdata=%x\n",
+ strerror(errno), len, ntohs(rtu_data));
goto bail;
}
+ /* save state and reference to EP, queue for disc event */
+ cm_ptr->state = SCM_CONNECTED;
+
/* final data exchange if remote QP state is good to go */
dapl_dbg_log( DAPL_DBG_TYPE_EP," PASSIVE: connected!\n" );
- dapls_cr_callback ( cm_ptr, IB_CME_CONNECTED, NULL, cm_ptr->sp );
- return DAT_SUCCESS;
-
+ dapls_cr_callback(cm_ptr, IB_CME_CONNECTED, NULL, cm_ptr->sp);
+ return;
bail:
- dapl_dbg_log( DAPL_DBG_TYPE_ERR," accept_final: ERR !QP_RTR_RTS \n");
- if ( cm_ptr >= 0 )
- close( cm_ptr->socket );
- dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
- dapls_ib_reinit_ep( ep_ptr ); /* reset QP state */
-
- return DAT_INTERNAL_ERROR;
+ dapls_ib_reinit_ep(cm_ptr->ep); /* reset QP state */
+ /* report the reject while cm_ptr (and cm_ptr->sp) is still valid;
+ * dapli_cm_destroy() took over the old close()+free() role, so it
+ * must be the last use of cm_ptr */
+ dapls_cr_callback(cm_ptr, IB_CME_DESTINATION_REJECT, NULL, cm_ptr->sp);
+ dapli_cm_destroy(cm_ptr);
}
@@ -528,18 +746,13 @@ dapls_ib_connect (
dapl_dbg_log ( DAPL_DBG_TYPE_EP,
" connect(ep_handle %p ....)\n", ep_handle);
- /*
- * Sanity check
- */
- if ( NULL == ep_handle )
- return DAT_SUCCESS;
ep_ptr = (DAPL_EP*)ep_handle;
qp_ptr = ep_ptr->qp_handle;
- return (dapli_socket_connect( ep_ptr, remote_ia_address,
- remote_conn_qual,
- private_data_size, private_data
));
+ return (dapli_socket_connect(ep_ptr, remote_ia_address,
+ remote_conn_qual,
+ private_data_size, private_data));
}
/*
@@ -556,12 +769,11 @@ dapls_ib_connect (
*
* Returns:
* DAT_SUCCESS
- *
*/
DAT_RETURN
-dapls_ib_disconnect (
+dapls_ib_disconnect(
IN DAPL_EP *ep_ptr,
- IN DAT_CLOSE_FLAGS close_flags )
+ IN DAT_CLOSE_FLAGS close_flags)
{
ib_cm_handle_t cm_ptr = ep_ptr->cm_handle;
@@ -569,28 +781,13 @@ dapls_ib_disconnect (
"dapls_ib_disconnect(ep_handle %p ....)\n",
ep_ptr);
- if ( cm_ptr->socket >= 0 ) {
- close( cm_ptr->socket );
- cm_ptr->socket = -1;
- }
-
/* reinit to modify QP state */
dapls_ib_reinit_ep(ep_ptr);
- if ( ep_ptr->cr_ptr ) {
- dapls_cr_callback ( ep_ptr->cm_handle,
- IB_CME_DISCONNECTED,
- NULL,
- ((DAPL_CR *)ep_ptr->cr_ptr)->sp_ptr
);
- } else {
- dapl_evd_connection_callback ( ep_ptr->cm_handle,
- IB_CME_DISCONNECTED,
- NULL,
- ep_ptr );
- ep_ptr->cm_handle = NULL;
- dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
- }
- return DAT_SUCCESS;
+ /* no CM handle on this EP: the QP was reset above, nothing else to
+ * tear down; otherwise delegate socket teardown and the disconnect
+ * event to dapli_socket_disconnect() */
+ if (cm_ptr == NULL)
+ return DAT_SUCCESS;
+ else
+ return(dapli_socket_disconnect(cm_ptr));
}
/*
@@ -679,13 +876,14 @@ dapls_ib_remove_conn_listener (
ia_ptr, sp_ptr, cm_ptr );
/* close accepted socket, free cm_srvc_handle and return */
- if ( cm_ptr != NULL ) {
- if ( cm_ptr->l_socket >= 0 ) {
- close( cm_ptr->l_socket );
+ if (cm_ptr != NULL) {
+ if (cm_ptr->socket >= 0) {
+ close(cm_ptr->socket );
cm_ptr->socket = -1;
}
/* cr_thread will free */
sp_ptr->cm_srvc_handle = NULL;
+ write(g_scm_pipe[1], "w", sizeof "w");
}
return DAT_SUCCESS;
}
@@ -720,23 +918,22 @@ dapls_ib_accept_connection (
DAPL_CR *cr_ptr;
DAPL_EP *ep_ptr;
- dapl_dbg_log (DAPL_DBG_TYPE_EP,
- "dapls_ib_accept_connection(cr %p ep %p prd
%p,%d)\n",
- cr_handle, ep_handle, p_data, p_size );
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ "dapls_ib_accept_connection(cr %p ep %p prd
%p,%d)\n",
+ cr_handle, ep_handle, p_data, p_size );
- cr_ptr = (DAPL_CR *) cr_handle;
- ep_ptr = (DAPL_EP *) ep_handle;
+ cr_ptr = (DAPL_CR *)cr_handle;
+ ep_ptr = (DAPL_EP *)ep_handle;
/* allocate and attach a QP if necessary */
- if ( ep_ptr->qp_state == DAPL_QP_STATE_UNATTACHED ) {
+ if (ep_ptr->qp_state == DAPL_QP_STATE_UNATTACHED) {
DAT_RETURN status;
- status = dapls_ib_qp_alloc( ep_ptr->header.owner_ia,
- ep_ptr, ep_ptr );
- if ( status != DAT_SUCCESS )
+ status = dapls_ib_qp_alloc(ep_ptr->header.owner_ia,
+ ep_ptr, ep_ptr);
+ if (status != DAT_SUCCESS)
return status;
}
-
- return ( dapli_socket_accept_final(ep_ptr, cr_ptr, p_size,
p_data) );
+ return(dapli_socket_accept_usr(ep_ptr, cr_ptr, p_size, p_data));
}
@@ -759,19 +956,29 @@ dapls_ib_accept_connection (
DAT_RETURN
dapls_ib_reject_connection (
IN ib_cm_handle_t ib_cm_handle,
- IN int reject_reason )
+ IN int reject_reason)
{
ib_cm_srvc_handle_t cm_ptr = ib_cm_handle;
+ struct iovec iovec;
dapl_dbg_log (DAPL_DBG_TYPE_EP,
"dapls_ib_reject_connection(cm_handle %p reason %x)\n",
- ib_cm_handle, reject_reason );
-
- /* just close the socket and return */
- if ( cm_ptr->socket > 0 ) {
- close( cm_ptr->socket );
+ ib_cm_handle, reject_reason);
+
+ /* cr_thread will destroy CR. Mark it rejected BEFORE the socket is
+ * invalidated: cr_thread frees any list entry whose socket is -1,
+ * so cm_ptr must not be written after socket = -1 below */
+ cm_ptr->state = SCM_REJECTED;
+
+ /* write reject data to indicate reject (best effort, peer may be gone) */
+ if (cm_ptr->socket >= 0) {
+ cm_ptr->dst.rej = (uint16_t)reject_reason;
+ cm_ptr->dst.rej = htons(cm_ptr->dst.rej);
+ iovec.iov_base = &cm_ptr->dst;
+ iovec.iov_len = sizeof(ib_qp_cm_t);
+ writev(cm_ptr->socket, &iovec, 1);
+ close(cm_ptr->socket);
cm_ptr->socket = -1;
}
+
+ /* wake cr_thread so it reaps the rejected CR */
+ write(g_scm_pipe[1], "w", sizeof "w");
return DAT_SUCCESS;
}
@@ -991,24 +1198,25 @@ dapls_ib_get_cm_event (
return ib_cm_event;
}
-/* async CR processing thread to avoid blocking applications */
+/* outbound/inbound CR processing thread to avoid blocking applications */
+#define SCM_MAX_CONN 8192
void cr_thread(void *arg)
{
struct dapl_hca *hca_ptr = arg;
- ib_cm_srvc_handle_t cr, next_cr;
- int max_fd;
- fd_set rfd,rfds;
- struct timeval to;
+ ib_cm_handle_t cr, next_cr;
+ int ret,idx;
+ char rbuf[2];
+ struct pollfd ufds[SCM_MAX_CONN];
dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cr_thread: ENTER hca %p\n",hca_ptr);
dapl_os_lock( &hca_ptr->ib_trans.lock );
hca_ptr->ib_trans.cr_state = IB_THREAD_RUN;
while (hca_ptr->ib_trans.cr_state == IB_THREAD_RUN) {
-
- FD_ZERO( &rfds );
- max_fd = -1;
-
+ idx=0;
+ ufds[idx].fd = g_scm_pipe[0]; /* wakeup and process work */
+ ufds[idx].events = POLLIN;
+
if (!dapl_llist_is_empty(&hca_ptr->ib_trans.list))
next_cr = dapl_llist_peek_head (&hca_ptr->ib_trans.list);
else
@@ -1016,51 +1224,70 @@ void cr_thread(void *arg)
while (next_cr) {
cr = next_cr;
- dapl_dbg_log (DAPL_DBG_TYPE_CM," thread: cm_ptr %p\n", cr );
- if (cr->l_socket == -1 ||
+ if ((cr->socket == -1) ||
hca_ptr->ib_trans.cr_state != IB_THREAD_RUN) {
- dapl_dbg_log(DAPL_DBG_TYPE_CM," thread: Freeing %p\n", cr);
+ dapl_dbg_log(DAPL_DBG_TYPE_CM," cr_thread: Free %p\n", cr);
next_cr = dapl_llist_next_entry(&hca_ptr->ib_trans.list,
(DAPL_LLIST_ENTRY*)&cr->entry );
dapl_llist_remove_entry(&hca_ptr->ib_trans.list,
(DAPL_LLIST_ENTRY*)&cr->entry);
- dapl_os_free( cr, sizeof(*cr) );
+ dapl_os_free(cr, sizeof(*cr));
continue;
}
-
- FD_SET( cr->l_socket, &rfds ); /* add to select set */
- if ( cr->l_socket > max_fd )
- max_fd = cr->l_socket;
-
- /* individual select poll to check for work */
- FD_ZERO(&rfd);
- FD_SET(cr->l_socket, &rfd);
- dapl_os_unlock(&hca_ptr->ib_trans.lock);
- to.tv_sec = 0;
- to.tv_usec = 0;
- if ( select(cr->l_socket + 1,&rfd, NULL, NULL, &to) < 0) {
- dapl_dbg_log (DAPL_DBG_TYPE_CM,
- " thread: ERR %s on cr %p sk %d\n",
- strerror(errno), cr, cr->l_socket);
- close(cr->l_socket);
- cr->l_socket = -1;
- } else if ( FD_ISSET(cr->l_socket, &rfd) &&
- dapli_socket_accept(cr)) {
- close(cr->l_socket);
- cr->l_socket = -1;
+
+ /* ufds[] is full: skip this cr for this pass. next_cr must be
+ * advanced first, otherwise 'continue' re-examines the same cr
+ * forever while still holding ib_trans.lock */
+ if (idx==SCM_MAX_CONN-1) {
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ "SCM ERR: cm_thread exceeded SCM_MAX_CONN %d\n",idx+1);
+ next_cr = dapl_llist_next_entry(&hca_ptr->ib_trans.list,
+ (DAPL_LLIST_ENTRY*)&cr->entry);
+ continue;
}
- dapl_os_lock( &hca_ptr->ib_trans.lock );
+
+ /* Add to ufds for poll, check for immediate work */
+ ufds[++idx].fd = cr->socket; /* add listen or cr */
+ ufds[idx].events = POLLIN;
+
+ /* check socket for event, accept in or connect out */
+ dapl_dbg_log(DAPL_DBG_TYPE_CM," poll cr=%p, fd=%d,%d\n",
+ cr, cr->socket, ufds[idx].fd);
+ dapl_os_unlock(&hca_ptr->ib_trans.lock);
+ ret = poll(&ufds[idx],1,1);
+ dapl_dbg_log(DAPL_DBG_TYPE_CM," poll wakeup ret=%d cr->st=%d ev=%d fd=%d\n",
+ ret,cr->state,ufds[idx].revents,ufds[idx].fd);
+
+ /* data on listen, qp exchange, and on disconnect request */
+ if ((ret == 1) && ufds[idx].revents == POLLIN) {
+ if (cr->socket > 0) {
+ if (cr->state == SCM_LISTEN)
+ dapli_socket_accept(cr);
+ else if (cr->state == SCM_ACCEPTED)
+ dapli_socket_accept_rtu(cr);
+ else if (cr->state == SCM_CONN_PENDING)
+ dapli_socket_connect_rtu(cr);
+ else if (cr->state == SCM_CONNECTED)
+ dapli_socket_disconnect(cr);
+ }
+ } else if (ret != 0) {
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " cr_thread(cr=%p) st=%d poll ERR= %s\n",
+ cr,cr->state,strerror(errno));
+ /* POLLUP or poll error case, issue event if connected */
+ if (cr->state == SCM_CONNECTED)
+ dapli_socket_disconnect(cr);
+ }
+ dapl_os_lock(&hca_ptr->ib_trans.lock);
next_cr = dapl_llist_next_entry(&hca_ptr->ib_trans.list,
- (DAPL_LLIST_ENTRY*)&cr->entry );
+ (DAPL_LLIST_ENTRY*)&cr->entry);
}
- dapl_os_unlock( &hca_ptr->ib_trans.lock );
- to.tv_sec = 0;
- to.tv_usec = 100000; /* wakeup and check destroy */
- select(max_fd + 1, &rfds, NULL, NULL, &to);
- dapl_os_lock( &hca_ptr->ib_trans.lock );
+ dapl_os_unlock(&hca_ptr->ib_trans.lock);
+ dapl_dbg_log(DAPL_DBG_TYPE_CM," cr_thread: sleep, %d\n", idx+1);
+ poll(ufds,idx+1,-1); /* infinite, all sockets and pipe */
+ /* if pipe used to wakeup, consume */
+ if (ufds[0].revents == POLLIN)
+ read(g_scm_pipe[0], rbuf, 2);
+ dapl_dbg_log(DAPL_DBG_TYPE_CM," cr_thread: wakeup\n");
+ dapl_os_lock(&hca_ptr->ib_trans.lock);
}
- dapl_os_unlock( &hca_ptr->ib_trans.lock );
+ dapl_os_unlock(&hca_ptr->ib_trans.lock);
hca_ptr->ib_trans.cr_state = IB_THREAD_EXIT;
dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cr_thread(hca %p) exit\n",hca_ptr);
}
diff --git a/dapl/openib_scm/dapl_ib_cq.c b/dapl/openib_scm/dapl_ib_cq.c
index 7ac7037..56b729e 100644
--- a/dapl/openib_scm/dapl_ib_cq.c
+++ b/dapl/openib_scm/dapl_ib_cq.c
@@ -97,7 +97,7 @@ void dapli_cq_thread_destroy(struct dapl_hca *hca_ptr)
while (hca_ptr->ib_trans.cq_state != IB_THREAD_EXIT) {
struct timespec sleep, remain;
sleep.tv_sec = 0;
- sleep.tv_nsec = 200000000; /* 200 ms */
+ sleep.tv_nsec = 2000000; /* 2 ms */
dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
" cq_thread_destroy: waiting for
cq_thread\n");
nanosleep (&sleep, &remain);
@@ -422,12 +422,21 @@ DAT_RETURN dapls_ib_cq_free (
IN DAPL_IA *ia_ptr,
IN DAPL_EVD *evd_ptr)
{
- if ( evd_ptr->ib_cq_handle != IB_INVALID_HANDLE ) {
- /* copy all entries on CQ to EVD before destroying */
- dapls_evd_copy_cq(evd_ptr);
+ DAT_EVENT event;
+ struct ibv_wc wc;
+
+ if (evd_ptr->ib_cq_handle != IB_INVALID_HANDLE) {
+ /* pull off CQ and EVD entries and toss: ibv_poll_cq() returns the
+ * number of completions reaped (0 when empty, <0 on error); the EVD
+ * drain stops on any non-success status so an error return cannot
+ * spin forever */
+ while (ibv_poll_cq(evd_ptr->ib_cq_handle, 1, &wc) == 1);
+ while (dapl_evd_dequeue(evd_ptr, &event) == DAT_SUCCESS);
+#if 1
+ ibv_destroy_cq(evd_ptr->ib_cq_handle);
+ evd_ptr->ib_cq_handle = IB_INVALID_HANDLE;
+ return DAT_SUCCESS;
+#else
if (ibv_destroy_cq(evd_ptr->ib_cq_handle))
return(dapl_convert_errno(errno,"destroy_cq"));
evd_ptr->ib_cq_handle = IB_INVALID_HANDLE;
+#endif
}
return DAT_SUCCESS;
}
@@ -600,7 +609,7 @@ dapls_ib_wait_object_wait (
status = ETIMEDOUT;
dapl_dbg_log (DAPL_DBG_TYPE_CM,
- " cq_object_wait: RET evd %p ibv_cq %p ibv_ctx %p
%s\n",
+ " cq_object_wait: RET evd %p ibv_cq %p %s\n",
evd_ptr, ibv_cq,strerror(errno));
return(dapl_convert_errno(status,"cq_wait_object_wait"));
diff --git a/dapl/openib_scm/dapl_ib_dto.h
b/dapl/openib_scm/dapl_ib_dto.h
index bea3e4d..b15f347 100644
--- a/dapl/openib_scm/dapl_ib_dto.h
+++ b/dapl/openib_scm/dapl_ib_dto.h
@@ -35,7 +35,7 @@
*
* Description:
*
- * The uDAPL openib provider - DTO operations and CQE macros
+ * The OpenIB SCM (socket CM) provider - DTO operations and CQE
macros
*
************************************************************************
****
* Source Control System Information
@@ -61,26 +61,25 @@ STATIC _INLINE_ int
dapls_cqe_opcode(ib_work_completion_t *cqe_p);
*/
STATIC _INLINE_ DAT_RETURN
dapls_ib_post_recv (
- IN DAPL_EP *ep_ptr,
+ IN DAPL_EP *ep_ptr,
IN DAPL_COOKIE *cookie,
- IN DAT_COUNT segments,
+ IN DAT_COUNT segments,
IN DAT_LMR_TRIPLET *local_iov )
{
- ib_data_segment_t ds_array[DEFAULT_DS_ENTRIES];
- ib_data_segment_t *ds_array_p, *ds_array_start_p = NULL;
- struct ibv_recv_wr wr;
- struct ibv_recv_wr *bad_wr;
- DAT_COUNT i, total_len;
- int ret;
+ ib_data_segment_t ds_array[DEFAULT_DS_ENTRIES];
+ /* ds_array_start_p remembers a heap SGE array: ds_array_p is
+ * advanced while the SGEs are filled in */
+ ib_data_segment_t *ds_array_p, *ds_array_start_p = NULL;
+ struct ibv_recv_wr wr;
+ struct ibv_recv_wr *bad_wr;
+ DAT_COUNT i, total_len;
+ int ret;
- dapl_dbg_log (DAPL_DBG_TYPE_EP,
- " post_rcv: ep %p cookie %p segs %d l_iov %p\n",
- ep_ptr, cookie, segments, local_iov);
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ " post_rcv: ep %p cookie %p segs %d l_iov %p\n",
+ ep_ptr, cookie, segments, local_iov);
- if ( segments <= DEFAULT_DS_ENTRIES )
+ if (segments <= DEFAULT_DS_ENTRIES)
ds_array_p = ds_array;
else
- ds_array_start_p = ds_array_p =
+ ds_array_start_p = ds_array_p =
dapl_os_alloc(segments * sizeof(ib_data_segment_t));
if (NULL == ds_array_p)
@@ -93,18 +92,18 @@ dapls_ib_post_recv (
wr.wr_id = (uint64_t)(uintptr_t)cookie;
wr.sg_list = ds_array_p;
- for (i = 0; i < segments; i++ ) {
- if ( !local_iov[i].segment_length )
+ for (i = 0; i < segments; i++) {
+ if (!local_iov[i].segment_length)
continue;
- ds_array_p->addr = (uint64_t) local_iov[i].virtual_address;
+ ds_array_p->addr = (uint64_t) local_iov[i].virtual_address;
ds_array_p->length = local_iov[i].segment_length;
- ds_array_p->lkey = local_iov[i].lmr_context;
+ ds_array_p->lkey = local_iov[i].lmr_context;
- dapl_dbg_log ( DAPL_DBG_TYPE_EP,
- " post_rcv: l_key 0x%x va %p len %d\n",
- ds_array_p->lkey, ds_array_p->addr,
- ds_array_p->length );
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ " post_rcv: l_key 0x%x va %p len %d\n",
+ ds_array_p->lkey, ds_array_p->addr,
+ ds_array_p->length );
total_len += ds_array_p->length;
wr.num_sge++;
@@ -114,18 +113,12 @@ dapls_ib_post_recv (
if (cookie != NULL)
cookie->val.dto.size = total_len;
- ret = ibv_post_recv(ep_ptr->qp_handle, &wr, &bad_wr);
+ ret = ibv_post_recv(ep_ptr->qp_handle, &wr, &bad_wr);
- if (ds_array_start_p != NULL)
- dapl_os_free(ds_array_start_p, segments * sizeof(ib_data_segment_t));
-
- if (ret)
- return( dapl_convert_errno(EFAULT,"ibv_recv") );
-
+ /* release a heap SGE array on both the success and failure paths */
+ if (ds_array_start_p != NULL)
+ dapl_os_free(ds_array_start_p, segments * sizeof(ib_data_segment_t));
+
+ /* ibv_post_recv() returns the errno value itself on failure */
+ if (ret)
+ return( dapl_convert_errno(ret,"ibv_recv") );
+
return DAT_SUCCESS;
}
-
/*
* dapls_ib_post_send
*
@@ -133,35 +126,36 @@ dapls_ib_post_recv (
*/
STATIC _INLINE_ DAT_RETURN
dapls_ib_post_send (
- IN DAPL_EP *ep_ptr,
- IN ib_send_op_type_t op_type,
- IN DAPL_COOKIE *cookie,
- IN DAT_COUNT segments,
- IN DAT_LMR_TRIPLET *local_iov,
- IN const DAT_RMR_TRIPLET *remote_iov,
- IN DAT_COMPLETION_FLAGS completion_flags)
+ IN DAPL_EP *ep_ptr,
+ IN ib_send_op_type_t op_type,
+ IN DAPL_COOKIE *cookie,
+ IN DAT_COUNT segments,
+ IN DAT_LMR_TRIPLET *local_iov,
+ IN const DAT_RMR_TRIPLET *remote_iov,
+ IN DAT_COMPLETION_FLAGS completion_flags)
{
- dapl_dbg_log (DAPL_DBG_TYPE_EP,
- " post_snd: ep %p op %d ck %p sgs %d l_iov %p r_iov %p f %d\n",
- ep_ptr, op_type, cookie, segments, local_iov,
- remote_iov, completion_flags);
-
- ib_data_segment_t ds_array[DEFAULT_DS_ENTRIES];
- ib_data_segment_t *ds_array_p, *ds_array_start_p = NULL;
- struct ibv_send_wr wr;
- struct ibv_send_wr *bad_wr;
- ib_hca_transport_t *ibt_ptr = &ep_ptr->header.owner_ia->hca_ptr->ib_trans;
- DAT_COUNT i, total_len;
- int ret;
+ ib_data_segment_t ds_array[DEFAULT_DS_ENTRIES];
+ /* ds_array_start_p remembers a heap SGE array: ds_array_p is
+ * advanced while the SGEs are filled in */
+ ib_data_segment_t *ds_array_p, *ds_array_start_p = NULL;
+ struct ibv_send_wr wr;
+ struct ibv_send_wr *bad_wr;
+ ib_hca_transport_t *ibt_ptr =
+ &ep_ptr->header.owner_ia->hca_ptr->ib_trans;
+ DAT_COUNT i, total_len;
+ int ret;
+
+ /* adjacent literals, not comma-separated: one format string */
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ " post_snd: ep %p op %d ck %p sgs "
+ "%d l_iov %p r_iov %p f %d\n",
+ ep_ptr, op_type, cookie, segments, local_iov,
+ remote_iov, completion_flags);
- dapl_dbg_log (DAPL_DBG_TYPE_EP,
- " post_snd: ep %p cookie %p segs %d l_iov %p\n",
- ep_ptr, cookie, segments, local_iov);
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ " post_snd: ep %p cookie %p segs %d l_iov %p\n",
+ ep_ptr, cookie, segments, local_iov);
- if( segments <= DEFAULT_DS_ENTRIES )
+ if(segments <= DEFAULT_DS_ENTRIES)
ds_array_p = ds_array;
else
- ds_array_start_p = ds_array_p =
+ ds_array_start_p = ds_array_p =
dapl_os_alloc(segments * sizeof(ib_data_segment_t));
if (NULL == ds_array_p)
@@ -180,14 +174,14 @@ dapls_ib_post_send (
if ( !local_iov[i].segment_length )
continue;
- ds_array_p->addr = (uint64_t) local_iov[i].virtual_address;
+ ds_array_p->addr = (uint64_t) local_iov[i].virtual_address;
ds_array_p->length = local_iov[i].segment_length;
- ds_array_p->lkey = local_iov[i].lmr_context;
+ ds_array_p->lkey = local_iov[i].lmr_context;
- dapl_dbg_log ( DAPL_DBG_TYPE_EP,
- " post_snd: lkey 0x%x va %p len %d \n",
- ds_array_p->lkey, ds_array_p->addr,
- ds_array_p->length );
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ " post_snd: lkey 0x%x va %p len %d\n",
+ ds_array_p->lkey, ds_array_p->addr,
+ ds_array_p->length );
total_len += ds_array_p->length;
wr.num_sge++;
@@ -196,20 +190,21 @@ dapls_ib_post_send (
if (cookie != NULL)
cookie->val.dto.size = total_len;
-
+
if ((op_type == OP_RDMA_WRITE) || (op_type == OP_RDMA_READ)) {
wr.wr.rdma.remote_addr = remote_iov->target_address;
wr.wr.rdma.rkey = remote_iov->rmr_context;
- dapl_dbg_log ( DAPL_DBG_TYPE_EP,
- " post_snd_rdma: rkey 0x%x va %#016Lx\n",
- wr.wr.rdma.rkey, wr.wr.rdma.remote_addr );
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ " post_snd_rdma: rkey 0x%x va %#016Lx\n",
+ wr.wr.rdma.rkey, wr.wr.rdma.remote_addr);
}
+
/* inline data for send or write ops */
- if ((total_len <= ibt_ptr->max_inline_send ) &&
+ if ((total_len <= ibt_ptr->max_inline_send) &&
((op_type == OP_SEND) || (op_type == OP_RDMA_WRITE)))
wr.send_flags |= IBV_SEND_INLINE;
-
+
/* set completion flags in work request */
wr.send_flags |= (DAT_COMPLETION_SUPPRESS_FLAG &
completion_flags) ? 0 :
IBV_SEND_SIGNALED;
@@ -218,24 +213,19 @@ dapls_ib_post_send (
wr.send_flags |= (DAT_COMPLETION_SOLICITED_WAIT_FLAG &
completion_flags) ? IBV_SEND_SOLICITED :
0;
- dapl_dbg_log (DAPL_DBG_TYPE_EP,
- " post_snd: op 0x%x flags 0x%x sglist %p, %d\n",
- wr.opcode, wr.send_flags, wr.sg_list, wr.num_sge);
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ " post_snd: op 0x%x flags 0x%x sglist %p, %d\n",
+ wr.opcode, wr.send_flags, wr.sg_list, wr.num_sge);
- ret = ibv_post_send(ep_ptr->qp_handle, &wr, &bad_wr);
+ ret = ibv_post_send(ep_ptr->qp_handle, &wr, &bad_wr);
- if (ds_array_start_p != NULL)
- dapl_os_free(ds_array_start_p, segments * sizeof(ib_data_segment_t));
-
- if (ret)
- return( dapl_convert_errno(EFAULT,"ibv_send") );
-
- dapl_dbg_log (DAPL_DBG_TYPE_EP," post_snd: returned\n");
+ /* release a heap SGE array on both the success and failure paths */
+ if (ds_array_start_p != NULL)
+ dapl_os_free(ds_array_start_p, segments * sizeof(ib_data_segment_t));
+
+ /* ibv_post_send() returns the errno value itself on failure */
+ if (ret)
+ return( dapl_convert_errno(ret,"ibv_send") );
+
+ dapl_dbg_log(DAPL_DBG_TYPE_EP," post_snd: returned\n");
return DAT_SUCCESS;
}
STATIC _INLINE_ DAT_RETURN
-dapls_ib_optional_prv_dat (
+dapls_ib_optional_prv_dat(
IN DAPL_CR *cr_ptr,
IN const void *event_data,
OUT DAPL_CR **cr_pp)
@@ -243,34 +233,68 @@ dapls_ib_optional_prv_dat (
return DAT_SUCCESS;
}
+/* map Work Completions to DAPL WR operations */
STATIC _INLINE_ int dapls_cqe_opcode(ib_work_completion_t *cqe_p)
{
- switch (cqe_p->opcode) {
+ switch (cqe_p->opcode) {
case IBV_WC_SEND:
- return (OP_SEND);
+ return (OP_SEND);
case IBV_WC_RDMA_WRITE:
- return (OP_RDMA_WRITE);
+ if (cqe_p->wc_flags & IBV_WC_WITH_IMM)
+ return (OP_RDMA_WRITE_IMM);
+ else
+ return (OP_RDMA_WRITE);
case IBV_WC_RDMA_READ:
- return (OP_RDMA_READ);
+ return (OP_RDMA_READ);
case IBV_WC_COMP_SWAP:
- return (OP_COMP_AND_SWAP);
+ return (OP_COMP_AND_SWAP);
case IBV_WC_FETCH_ADD:
- return (OP_FETCH_AND_ADD);
+ return (OP_FETCH_AND_ADD);
case IBV_WC_BIND_MW:
- return (OP_BIND_MW);
+ return (OP_BIND_MW);
case IBV_WC_RECV:
- return (OP_RECEIVE);
+ if (cqe_p->wc_flags & IBV_WC_WITH_IMM)
+ return (OP_RECEIVE_IMM);
+ else
+ return (OP_RECEIVE);
case IBV_WC_RECV_RDMA_WITH_IMM:
- return (OP_RECEIVE_IMM);
+ return (OP_RECEIVE_IMM);
default:
- return (OP_INVALID);
- }
+ return (OP_INVALID);
+ }
+}
+
+/* CQE field accessors; argument and expansion fully parenthesized so
+ * they are safe with arbitrary pointer expressions */
+#define DAPL_GET_CQE_OPTYPE(cqe_p) dapls_cqe_opcode(cqe_p)
+#define DAPL_GET_CQE_WRID(cqe_p) (((ib_work_completion_t *)(cqe_p))->wr_id)
+#define DAPL_GET_CQE_STATUS(cqe_p) (((ib_work_completion_t *)(cqe_p))->status)
+#define DAPL_GET_CQE_BYTESNUM(cqe_p) (((ib_work_completion_t *)(cqe_p))->byte_len)
+#define DAPL_GET_CQE_IMMED_DATA(cqe_p) (((ib_work_completion_t *)(cqe_p))->imm_data)
+#define DAPL_GET_CQE_VENDOR_ERR(cqe_p) (((ib_work_completion_t *)(cqe_p))->vendor_err)
+
+/*
+ * Printable name of a DAPL DTO op code. Table order is assumed to
+ * follow the OP_* numbering (OP_RDMA_WRITE == 0 ...) — TODO confirm
+ * against dapl_ib_util.h whenever OP_* values change.
+ */
+STATIC _INLINE_ char * dapls_dto_op_str(int op)
+{
+ static char *optable[] =
+ {
+ "OP_RDMA_WRITE",
+ "OP_RDMA_WRITE_IMM",
+ "OP_SEND",
+ "OP_SEND_IMM",
+ "OP_RDMA_READ",
+ "OP_COMP_AND_SWAP",
+ "OP_FETCH_AND_ADD",
+ "OP_RECEIVE",
+ "OP_RECEIVE_IMM",
+ "OP_BIND_MW",
+ 0
+ };
+ /* highest valid index: the final slot is the NULL terminator */
+ int max_op = (int)(sizeof(optable) / sizeof(optable[0])) - 2;
+
+ return ((op < 0 || op > max_op) ? "Invalid CQE OP?" : optable[op]);
}
-#define DAPL_GET_CQE_OPTYPE(cqe_p) dapls_cqe_opcode(cqe_p)
-#define DAPL_GET_CQE_WRID(cqe_p) ((ib_work_completion_t*)cqe_p)->wr_id
-#define DAPL_GET_CQE_STATUS(cqe_p) ((ib_work_completion_t*)cqe_p)->status
-#define DAPL_GET_CQE_BYTESNUM(cqe_p) ((ib_work_completion_t*)cqe_p)->byte_len
-#define DAPL_GET_CQE_IMMED_DATA(cqe_p) ((ib_work_completion_t*)cqe_p)->imm_data
+/* printable op name for a work completion */
+static _INLINE_ char *
+dapls_cqe_op_str(IN ib_work_completion_t *cqe_ptr)
+{
+ return dapls_dto_op_str(DAPL_GET_CQE_OPTYPE(cqe_ptr));
+}
+#define DAPL_GET_CQE_OP_STR(cqe) dapls_cqe_op_str(cqe)
#endif /* _DAPL_IB_DTO_H_ */
diff --git a/dapl/openib_scm/dapl_ib_mem.c
b/dapl/openib_scm/dapl_ib_mem.c
index de36c0f..6a5e4a2 100644
--- a/dapl/openib_scm/dapl_ib_mem.c
+++ b/dapl/openib_scm/dapl_ib_mem.c
@@ -1,4 +1,6 @@
/*
+ * Copyright (c) 2005-2007 Intel Corporation. All rights reserved.
+ *
* This Software is licensed under one of the following licenses:
*
* 1) under the terms of the "Common Public License 1.0" a copy of
which is
@@ -25,12 +27,11 @@
/**********************************************************************
*
- * MODULE: dapl_det_mem.c
+ * MODULE: dapl_ib_mem.c
*
- * PURPOSE: Intel DET APIs: Memory windows, registration,
- * and protection domain
+ * PURPOSE: Memory windows, registration, and protection domain
*
- * $Id: $
+ * $Id:$
*
**********************************************************************/
@@ -61,8 +62,7 @@
*
*/
STATIC _INLINE_ int
-dapls_convert_privileges (
- IN DAT_MEM_PRIV_FLAGS privileges)
+dapls_convert_privileges(IN DAT_MEM_PRIV_FLAGS privileges)
{
int access = 0;
@@ -101,16 +101,15 @@ dapls_convert_privileges (
*
*/
DAT_RETURN
-dapls_ib_pd_alloc (
- IN DAPL_IA *ia_ptr,
- IN DAPL_PZ *pz )
+dapls_ib_pd_alloc(IN DAPL_IA *ia_ptr, IN DAPL_PZ *pz)
{
/* get a protection domain */
pz->pd_handle = ibv_alloc_pd(ia_ptr->hca_ptr->ib_hca_handle);
if (!pz->pd_handle)
return(dapl_convert_errno(ENOMEM,"alloc_pd"));
- dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " pd_alloc: pd_handle=%p\n",
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+ " pd_alloc: pd_handle=%p\n",
pz->pd_handle );
return DAT_SUCCESS;
@@ -134,13 +133,18 @@ dapls_ib_pd_alloc (
*
*/
DAT_RETURN
-dapls_ib_pd_free (
- IN DAPL_PZ *pz )
+dapls_ib_pd_free(IN DAPL_PZ *pz)
{
if (pz->pd_handle != IB_INVALID_HANDLE) {
+#if 1
+ /* ibv_dealloc_pd() status deliberately ignored: always report success */
+ ibv_dealloc_pd(pz->pd_handle);
+ pz->pd_handle = IB_INVALID_HANDLE;
+ return DAT_SUCCESS;
+#else
if (ibv_dealloc_pd(pz->pd_handle))
return(dapl_convert_errno(errno,"dealloc_pd"));
pz->pd_handle = IB_INVALID_HANDLE;
+#endif
}
return DAT_SUCCESS;
}
@@ -165,25 +169,24 @@ dapls_ib_pd_free (
*
*/
DAT_RETURN
-dapls_ib_mr_register (
- IN DAPL_IA *ia_ptr,
- IN DAPL_LMR *lmr,
- IN DAT_PVOID virt_addr,
- IN DAT_VLEN length,
- IN DAT_MEM_PRIV_FLAGS privileges)
+dapls_ib_mr_register(IN DAPL_IA *ia_ptr,
+ IN DAPL_LMR *lmr,
+ IN DAT_PVOID virt_addr,
+ IN DAT_VLEN length,
+ IN DAT_MEM_PRIV_FLAGS privileges)
{
ib_pd_handle_t ib_pd_handle;
ib_pd_handle = ((DAPL_PZ *)lmr->param.pz_handle)->pd_handle;
- dapl_dbg_log ( DAPL_DBG_TYPE_UTIL,
- " mr_register: ia=%p, lmr=%p va=%p ln=%d pv=0x%x\n",
- ia_ptr, lmr, virt_addr, length, privileges );
+ /* DAT_VLEN may be wider than int: print it through unsigned long long */
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+ " mr_register: ia=%p, lmr=%p va=%p ln=%llu pv=0x%x\n",
+ ia_ptr, lmr, virt_addr, (unsigned long long)length,
+ privileges );
/* TODO: shared memory */
if (lmr->param.mem_type == DAT_MEM_TYPE_SHARED_VIRTUAL) {
- dapl_dbg_log( DAPL_DBG_TYPE_ERR,
- " mr_register_shared: NOT IMPLEMENTED\n");
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ " mr_register_shared: NOT IMPLEMENTED\n");
return DAT_ERROR (DAT_NOT_IMPLEMENTED, DAT_NO_SUBTYPE);
}
@@ -200,16 +203,16 @@ dapls_ib_mr_register (
lmr->param.lmr_context = lmr->mr_handle->lkey;
lmr->param.rmr_context = lmr->mr_handle->rkey;
lmr->param.registered_size = length;
- lmr->param.registered_address = (DAT_VADDR)(uintptr_t) virt_addr;
+ lmr->param.registered_address = (DAT_VADDR)(uintptr_t)virt_addr;
- dapl_dbg_log ( DAPL_DBG_TYPE_UTIL,
- " mr_register: mr=%p h %x pd %p ctx %p ,lkey=0x%x, rkey=0x%x priv=%x\n",
- lmr->mr_handle, lmr->mr_handle->handle,
- lmr->mr_handle->pd,
- lmr->mr_handle->context,
- lmr->mr_handle->lkey,
- lmr->mr_handle->rkey,
- length, dapls_convert_privileges(privileges) );
+ /* one specifier per argument: length gets its own len= field so
+ * priv= really prints the converted access flags */
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+ " mr_register: mr=%p addr=%p h %x pd %p ctx %p "
+ "lkey=0x%x rkey=0x%x len=%llu priv=%x\n",
+ lmr->mr_handle, lmr->mr_handle->addr,
+ lmr->mr_handle->handle,
+ lmr->mr_handle->pd, lmr->mr_handle->context,
+ lmr->mr_handle->lkey, lmr->mr_handle->rkey,
+ (unsigned long long)length,
+ dapls_convert_privileges(privileges));
return DAT_SUCCESS;
}
@@ -231,8 +234,7 @@ dapls_ib_mr_register (
*
*/
DAT_RETURN
-dapls_ib_mr_deregister (
- IN DAPL_LMR *lmr )
+dapls_ib_mr_deregister(IN DAPL_LMR *lmr)
{
if (lmr->mr_handle != IB_INVALID_HANDLE) {
if (ibv_dereg_mr(lmr->mr_handle))
@@ -263,13 +265,14 @@ dapls_ib_mr_deregister (
*
*/
DAT_RETURN
-dapls_ib_mr_register_shared (
- IN DAPL_IA *ia_ptr,
- IN DAPL_LMR *lmr,
- IN DAT_MEM_PRIV_FLAGS privileges )
+dapls_ib_mr_register_shared(IN DAPL_IA *ia_ptr,
+ IN DAPL_LMR *lmr,
+ IN DAT_MEM_PRIV_FLAGS privileges)
{
- dapl_dbg_log(DAPL_DBG_TYPE_ERR," mr_register_shared: NOT IMPLEMENTED\n");
- return DAT_ERROR (DAT_NOT_IMPLEMENTED, DAT_NO_SUBTYPE);
+ /* shared memory registration is not supported by this provider */
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ " mr_register_shared: NOT IMPLEMENTED\n");
+
+ return DAT_ERROR(DAT_NOT_IMPLEMENTED, DAT_NO_SUBTYPE);
}
/*
@@ -289,12 +292,13 @@ dapls_ib_mr_register_shared (
*
*/
DAT_RETURN
-dapls_ib_mw_alloc (
- IN DAPL_RMR *rmr )
+dapls_ib_mw_alloc(IN DAPL_RMR *rmr)
{
- dapl_dbg_log(DAPL_DBG_TYPE_ERR," mw_alloc: NOT IMPLEMENTED\n");
- return DAT_ERROR (DAT_NOT_IMPLEMENTED, DAT_NO_SUBTYPE);
+ /* memory windows are not supported by this provider */
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ " mw_alloc: NOT IMPLEMENTED\n");
+
+ return DAT_ERROR(DAT_NOT_IMPLEMENTED, DAT_NO_SUBTYPE);
}
/*
@@ -314,11 +318,12 @@ dapls_ib_mw_alloc (
*
*/
DAT_RETURN
-dapls_ib_mw_free (
- IN DAPL_RMR *rmr )
+dapls_ib_mw_free(IN DAPL_RMR *rmr)
{
- dapl_dbg_log(DAPL_DBG_TYPE_ERR," mw_free: NOT IMPLEMENTED\n");
- return DAT_ERROR (DAT_NOT_IMPLEMENTED, DAT_NO_SUBTYPE);
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ " mw_free: NOT IMPLEMENTED\n");
+
+ return DAT_ERROR(DAT_NOT_IMPLEMENTED, DAT_NO_SUBTYPE);
}
/*
@@ -339,17 +344,18 @@ dapls_ib_mw_free (
*
*/
DAT_RETURN
-dapls_ib_mw_bind (
- IN DAPL_RMR *rmr,
- IN DAPL_LMR *lmr,
- IN DAPL_EP *ep,
- IN DAPL_COOKIE *cookie,
- IN DAT_VADDR virtual_address,
- IN DAT_VLEN length,
- IN DAT_MEM_PRIV_FLAGS mem_priv,
- IN DAT_BOOLEAN is_signaled)
+dapls_ib_mw_bind(IN DAPL_RMR *rmr,
+ IN DAPL_LMR *lmr,
+ IN DAPL_EP *ep,
+ IN DAPL_COOKIE *cookie,
+ IN DAT_VADDR virtual_address,
+ IN DAT_VLEN length,
+ IN DAT_MEM_PRIV_FLAGS mem_priv,
+ IN DAT_BOOLEAN is_signaled)
{
- dapl_dbg_log(DAPL_DBG_TYPE_ERR," mw_bind: NOT IMPLEMENTED\n");
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ " mw_bind: NOT IMPLEMENTED\n");
+
- return DAT_ERROR (DAT_NOT_IMPLEMENTED, DAT_NO_SUBTYPE);
+ return DAT_ERROR(DAT_NOT_IMPLEMENTED, DAT_NO_SUBTYPE);
}
@@ -372,14 +378,15 @@ dapls_ib_mw_bind (
*
*/
DAT_RETURN
-dapls_ib_mw_unbind (
- IN DAPL_RMR *rmr,
- IN DAPL_EP *ep,
- IN DAPL_COOKIE *cookie,
- IN DAT_BOOLEAN is_signaled )
+dapls_ib_mw_unbind(IN DAPL_RMR *rmr,
+ IN DAPL_EP *ep,
+ IN DAPL_COOKIE *cookie,
+ IN DAT_BOOLEAN is_signaled)
{
- dapl_dbg_log(DAPL_DBG_TYPE_ERR," mw_unbind: NOT IMPLEMENTED\n");
- return DAT_ERROR (DAT_NOT_IMPLEMENTED, DAT_NO_SUBTYPE);
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ " mw_unbind: NOT IMPLEMENTED\n");
+
+ return DAT_ERROR(DAT_NOT_IMPLEMENTED, DAT_NO_SUBTYPE);
}
/*
diff --git a/dapl/openib_scm/dapl_ib_qp.c b/dapl/openib_scm/dapl_ib_qp.c
index 3a1e3c8..1eba2bd 100644
--- a/dapl/openib_scm/dapl_ib_qp.c
+++ b/dapl/openib_scm/dapl_ib_qp.c
@@ -110,15 +110,23 @@ dapls_ib_qp_alloc (
/* Setup attributes and create qp */
dapl_os_memzero((void*)&qp_create, sizeof(qp_create));
qp_create.send_cq = req_cq;
- qp_create.recv_cq = rcv_cq;
qp_create.cap.max_send_wr = attr->max_request_dtos;
- qp_create.cap.max_recv_wr = attr->max_recv_dtos;
qp_create.cap.max_send_sge = attr->max_request_iov;
- qp_create.cap.max_recv_sge = attr->max_recv_iov;
qp_create.cap.max_inline_data =
ia_ptr->hca_ptr->ib_trans.max_inline_send;
qp_create.qp_type = IBV_QPT_RC;
qp_create.qp_context = (void*)ep_ptr;
+ /* ibv assumes rcv_cq is never NULL, set to req_cq */
+ if (rcv_cq == NULL) {
+ qp_create.recv_cq = req_cq;
+ qp_create.cap.max_recv_wr = 0;
+ qp_create.cap.max_recv_sge = 0;
+ } else {
+ qp_create.recv_cq = rcv_cq;
+ qp_create.cap.max_recv_wr = attr->max_recv_dtos;
+ qp_create.cap.max_recv_sge = attr->max_recv_iov;
+ }
+
ep_ptr->qp_handle = ibv_create_qp( ib_pd_handle, &qp_create);
if (!ep_ptr->qp_handle)
return(dapl_convert_errno(ENOMEM, "create_qp"));
@@ -298,9 +306,10 @@ dapls_modify_qp_state ( IN ib_qp_handle_t
qp_handle,
IN ib_qp_state_t qp_state,
IN ib_qp_cm_t *qp_cm )
{
- struct ibv_qp_attr qp_attr;
+ struct ibv_qp_attr qp_attr;
enum ibv_qp_attr_mask mask = IBV_QP_STATE;
- DAPL_EP *ep_ptr = (DAPL_EP*)qp_handle->qp_context;
+ DAPL_EP *ep_ptr =
(DAPL_EP*)qp_handle->qp_context;
+ DAPL_IA *ia_ptr = ep_ptr->header.owner_ia;
dapl_os_memzero((void*)&qp_attr, sizeof(qp_attr));
qp_attr.qp_state = qp_state;
@@ -315,14 +324,16 @@ dapls_modify_qp_state ( IN ib_qp_handle_t
qp_handle,
IBV_QP_RQ_PSN |
IBV_QP_MAX_DEST_RD_ATOMIC |
IBV_QP_MIN_RNR_TIMER;
+
qp_attr.qp_state = IBV_QPS_RTR;
- qp_attr.path_mtu = IBV_MTU_1024;
+ qp_attr.path_mtu = IBV_MTU_2048;
qp_attr.dest_qp_num = qp_cm->qpn;
qp_attr.rq_psn = 1;
qp_attr.max_dest_rd_atomic =
ep_ptr->param.ep_attr.max_rdma_read_out;
- qp_attr.min_rnr_timer = 12;
+ qp_attr.min_rnr_timer =
ia_ptr->hca_ptr->ib_trans.rnr_timer;
qp_attr.ah_attr.is_global = 0;
+ qp_attr.ah_attr.grh.dgid = qp_cm->gid;
qp_attr.ah_attr.dlid = qp_cm->lid;
qp_attr.ah_attr.sl = 0;
qp_attr.ah_attr.src_path_bits = 0;
@@ -343,30 +354,25 @@ dapls_modify_qp_state ( IN ib_qp_handle_t
qp_handle,
IBV_QP_RNR_RETRY |
IBV_QP_SQ_PSN |
IBV_QP_MAX_QP_RD_ATOMIC;
+
qp_attr.qp_state = IBV_QPS_RTS;
- qp_attr.timeout = 14;
- qp_attr.retry_cnt = 7;
- qp_attr.rnr_retry = 7;
+ qp_attr.timeout =
ia_ptr->hca_ptr->ib_trans.ack_timer;
+ qp_attr.retry_cnt =
ia_ptr->hca_ptr->ib_trans.ack_retry;
+ qp_attr.rnr_retry =
ia_ptr->hca_ptr->ib_trans.rnr_retry;
qp_attr.sq_psn = 1;
qp_attr.max_rd_atomic =
ep_ptr->param.ep_attr.max_rdma_read_out;
- dapl_dbg_log (DAPL_DBG_TYPE_EP,
- " modify_qp_rts: psn %x rd_atomic %d\n",
- qp_attr.sq_psn, qp_attr.max_rd_atomic );
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ " modify_qp_rts: psn %x rd_atomic %d ack
%d "
+ " retry %d rnr_retry %d\n",
+ qp_attr.sq_psn, qp_attr.max_rd_atomic,
+ qp_attr.timeout, qp_attr.retry_cnt,
+ qp_attr.rnr_retry );
break;
}
case IBV_QPS_INIT:
{
- DAPL_IA *ia_ptr;
- DAPL_EP *ep_ptr;
- /* need to find way back to port num */
- ep_ptr = (DAPL_EP*)qp_handle->qp_context;
- if (ep_ptr)
- ia_ptr = ep_ptr->header.owner_ia;
- else
- break;
-
mask |= IBV_QP_PKEY_INDEX |
IBV_QP_PORT |
IBV_QP_ACCESS_FLAGS;
@@ -377,7 +383,8 @@ dapls_modify_qp_state ( IN ib_qp_handle_t
qp_handle,
IBV_ACCESS_LOCAL_WRITE |
IBV_ACCESS_REMOTE_WRITE |
IBV_ACCESS_REMOTE_READ |
- IBV_ACCESS_REMOTE_ATOMIC;
+ IBV_ACCESS_REMOTE_ATOMIC |
+ IBV_ACCESS_MW_BIND;
dapl_dbg_log (DAPL_DBG_TYPE_EP,
" modify_qp_init: pi %x port %x acc
%x\n",
diff --git a/dapl/openib_scm/dapl_ib_util.c
b/dapl/openib_scm/dapl_ib_util.c
index 5e34b47..a9941f5 100644
--- a/dapl/openib_scm/dapl_ib_util.c
+++ b/dapl/openib_scm/dapl_ib_util.c
@@ -62,6 +62,7 @@ static const char rcsid[] = "$Id: $";
#include <fcntl.h>
int g_dapl_loopback_connection = 0;
+int g_scm_pipe[2];
/* just get IP address for hostname */
DAT_RETURN getipaddr( char *addr, int addr_len)
@@ -70,14 +71,14 @@ DAT_RETURN getipaddr( char *addr, int addr_len)
struct hostent *h_ptr;
struct utsname ourname;
- if ( uname( &ourname ) < 0 )
+ if (uname( &ourname ) < 0)
return DAT_INTERNAL_ERROR;
- h_ptr = gethostbyname( ourname.nodename );
- if ( h_ptr == NULL )
+ h_ptr = gethostbyname(ourname.nodename);
+ if (h_ptr == NULL)
return DAT_INTERNAL_ERROR;
- if ( h_ptr->h_addrtype == AF_INET ) {
+ if (h_ptr->h_addrtype == AF_INET) {
int i;
struct in_addr **alist =
(struct in_addr **)h_ptr->h_addr_list;
@@ -87,18 +88,17 @@ DAT_RETURN getipaddr( char *addr, int addr_len)
/* Walk the list of addresses for host */
for (i=0; alist[i] != NULL; i++) {
-
- /* first non-loopback address */
- if ( *(uint32_t*)alist[i] != htonl(0x7f000001) )
{
- dapl_os_memcpy( &ipv4_addr->sin_addr,
- h_ptr->h_addr_list[i],
- 4 );
+ /* first non-loopback address */
+ if (*(uint32_t*)alist[i] != htonl(0x7f000001)) {
+ dapl_os_memcpy(&ipv4_addr->sin_addr,
+ h_ptr->h_addr_list[i],
+ 4);
break;
}
}
/* if no acceptable address found */
if (*(uint32_t*)&ipv4_addr->sin_addr == 0)
- return DAT_INVALID_ADDRESS;
+ return DAT_INVALID_ADDRESS;
} else
return DAT_INVALID_ADDRESS;
@@ -122,6 +122,10 @@ DAT_RETURN getipaddr( char *addr, int addr_len)
*/
int32_t dapls_ib_init (void)
{
+ /* create pipe for waking up thread */
+ if (pipe(g_scm_pipe))
+ return 1;
+
return 0;
}
@@ -156,7 +160,7 @@ DAT_RETURN dapls_ib_open_hca (
int i;
DAT_RETURN dat_status = DAT_SUCCESS;
- dapl_dbg_log (DAPL_DBG_TYPE_UTIL,
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
" open_hca: %s - %p\n", hca_name, hca_ptr );
/* Get list of all IB devices, find match, open */
@@ -170,65 +174,83 @@ DAT_RETURN dapls_ib_open_hca (
for (i = 0; dev_list[i]; ++i) {
hca_ptr->ib_trans.ib_dev = dev_list[i];
- if
(!strcmp(ibv_get_device_name(hca_ptr->ib_trans.ib_dev),hca_name))
+ if
(!strcmp(ibv_get_device_name(hca_ptr->ib_trans.ib_dev),
+ hca_name))
goto found;
}
- dapl_dbg_log (DAPL_DBG_TYPE_ERR,
- " open_hca: IB device %s not found\n",
- hca_name);
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " open_hca: device %s not found\n",
+ hca_name);
goto err;
found:
- dapl_dbg_log (DAPL_DBG_TYPE_UTIL," open_hca: Found dev %s
%016llx\n",
- ibv_get_device_name(hca_ptr->ib_trans.ib_dev),
- (unsigned long
long)bswap_64(ibv_get_device_guid(hca_ptr->ib_trans.ib_dev)));
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL," open_hca: Found dev %s
%016llx\n",
+ ibv_get_device_name(hca_ptr->ib_trans.ib_dev),
+ (unsigned long long)
+
bswap_64(ibv_get_device_guid(hca_ptr->ib_trans.ib_dev)));
hca_ptr->ib_hca_handle =
ibv_open_device(hca_ptr->ib_trans.ib_dev);
if (!hca_ptr->ib_hca_handle) {
- dapl_dbg_log (DAPL_DBG_TYPE_ERR,
- " open_hca: IB dev open failed for
%s\n",
-
ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " open_hca: dev open failed for %s, err=%s\n",
+ ibv_get_device_name(hca_ptr->ib_trans.ib_dev),
+ strerror(errno));
goto err;
}
- /* set inline max with enviroment or default */
+ /* set RC tunables via enviroment or default */
hca_ptr->ib_trans.max_inline_send =
- dapl_os_get_env_val ( "DAPL_MAX_INLINE",
INLINE_SEND_DEFAULT );
+ dapl_os_get_env_val("DAPL_MAX_INLINE",
INLINE_SEND_DEFAULT);
+ hca_ptr->ib_trans.ack_retry =
+ dapl_os_get_env_val("DAPL_ACK_RETRY", SCM_ACK_RETRY);
+ hca_ptr->ib_trans.ack_timer =
+ dapl_os_get_env_val("DAPL_ACK_TIMER", SCM_ACK_TIMER);
+ hca_ptr->ib_trans.rnr_retry =
+ dapl_os_get_env_val("DAPL_RNR_RETRY", SCM_RNR_RETRY);
+ hca_ptr->ib_trans.rnr_timer =
+ dapl_os_get_env_val("DAPL_RNR_TIMER", SCM_RNR_TIMER);
/* initialize cq_lock */
dat_status = dapl_os_lock_init(&hca_ptr->ib_trans.cq_lock);
- if (dat_status != DAT_SUCCESS)
- {
- dapl_dbg_log (DAPL_DBG_TYPE_ERR,
- " open_hca: failed to init cq_lock\n");
+ if (dat_status != DAT_SUCCESS) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " open_hca: failed to init cq_lock\n");
goto bail;
}
/* EVD events without direct CQ channels, non-blocking */
hca_ptr->ib_trans.ib_cq =
ibv_create_comp_channel(hca_ptr->ib_hca_handle);
+ if (hca_ptr->ib_trans.ib_cq == NULL) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " open_hca: ibv_create_comp_channel ERR %s\n",
+ strerror(errno));
+ goto bail;
+ }
+
opts = fcntl(hca_ptr->ib_trans.ib_cq->fd, F_GETFL); /* uCQ */
if (opts < 0 || fcntl(hca_ptr->ib_trans.ib_cq->fd,
F_SETFL, opts | O_NONBLOCK) < 0) {
- dapl_dbg_log (DAPL_DBG_TYPE_ERR,
- " open_hca: ERR with CQ FD\n" );
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " open_hca: fcntl on ib_cq->fd %d ERR %d %s\n",
+ hca_ptr->ib_trans.ib_cq->fd, opts,
+ strerror(errno));
goto bail;
}
if (dapli_cq_thread_init(hca_ptr)) {
- dapl_dbg_log (DAPL_DBG_TYPE_ERR,
- " open_hca: cq_thread_init failed for
%s\n",
-
ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " open_hca: cq_thread_init failed for %s\n",
+
ibv_get_device_name(hca_ptr->ib_trans.ib_dev));
goto bail;
}
/* initialize cr_list lock */
dat_status = dapl_os_lock_init(&hca_ptr->ib_trans.lock);
- if (dat_status != DAT_SUCCESS)
- {
- dapl_dbg_log (DAPL_DBG_TYPE_ERR,
- " open_hca: failed to init lock\n");
+ if (dat_status != DAT_SUCCESS) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " open_hca: failed to init cr_list lock\n");
goto bail;
}
@@ -240,10 +262,9 @@ found:
dat_status = dapl_os_thread_create(cr_thread,
(void*)hca_ptr,
&hca_ptr->ib_trans.thread );
- if (dat_status != DAT_SUCCESS)
- {
- dapl_dbg_log (DAPL_DBG_TYPE_ERR,
- " open_hca: failed to create thread\n");
+ if (dat_status != DAT_SUCCESS) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " open_hca: failed to create thread\n");
goto bail;
}
@@ -251,7 +272,7 @@ found:
while (hca_ptr->ib_trans.cr_state != IB_THREAD_RUN) {
struct timespec sleep, remain;
sleep.tv_sec = 0;
- sleep.tv_nsec = 20000000; /* 20 ms */
+ sleep.tv_nsec = 2000000; /* 2 ms */
dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
" open_hca: waiting for cr_thread\n");
nanosleep (&sleep, &remain);
@@ -259,16 +280,15 @@ found:
/* get the IP address of the device */
dat_status = getipaddr((char*)&hca_ptr->hca_address,
- sizeof(DAT_SOCK_ADDR6) );
- dapl_dbg_log (DAPL_DBG_TYPE_UTIL,
- " open_hca: %s, port %d, %s %d.%d.%d.%d\n",
- ibv_get_device_name(hca_ptr->ib_trans.ib_dev),
hca_ptr->port_num,
- ((struct sockaddr_in
*)&hca_ptr->hca_address)->sin_family == AF_INET ? "AF_INET":"AF_INET6",
- ((struct sockaddr_in
*)&hca_ptr->hca_address)->sin_addr.s_addr >> 0 & 0xff,
- ((struct sockaddr_in
*)&hca_ptr->hca_address)->sin_addr.s_addr >> 8 & 0xff,
- ((struct sockaddr_in
*)&hca_ptr->hca_address)->sin_addr.s_addr >> 16 & 0xff,
- ((struct sockaddr_in
*)&hca_ptr->hca_address)->sin_addr.s_addr >> 24 & 0xff );
-
+ sizeof(DAT_SOCK_ADDR6));
+
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+ " open_hca: devname %s, port %d, hostname_IP %s\n",
+ ibv_get_device_name(hca_ptr->ib_trans.ib_dev),
+ hca_ptr->port_num,
+ inet_ntoa(((struct sockaddr_in *)
+ &hca_ptr->hca_address)->sin_addr));
+
ibv_free_device_list(dev_list);
return dat_status;
@@ -308,15 +328,15 @@ DAT_RETURN dapls_ib_close_hca ( IN DAPL_HCA
*hca_ptr )
return(dapl_convert_errno(errno,"ib_close_device"));
hca_ptr->ib_hca_handle = IB_INVALID_HANDLE;
}
-
dapl_os_lock_destroy(&hca_ptr->ib_trans.cq_lock);
/* destroy cr_thread and lock */
hca_ptr->ib_trans.cr_state = IB_THREAD_CANCEL;
+ write(g_scm_pipe[1], "w", sizeof "w");
while (hca_ptr->ib_trans.cr_state != IB_THREAD_EXIT) {
struct timespec sleep, remain;
sleep.tv_sec = 0;
- sleep.tv_nsec = 20000000; /* 20 ms */
+ sleep.tv_nsec = 2000000; /* 2 ms */
dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
" close_hca: waiting for cr_thread\n");
nanosleep (&sleep, &remain);
@@ -378,14 +398,11 @@ DAT_RETURN dapls_ib_query_hca (
ia_attr->vendor_name[DAT_NAME_MAX_LENGTH - 1] = '\0';
ia_attr->ia_address_ptr =
(DAT_IA_ADDRESS_PTR)&hca_ptr->hca_address;
- dapl_dbg_log (DAPL_DBG_TYPE_UTIL,
- " query_hca: %s %s %d.%d.%d.%d\n",
- ibv_get_device_name(hca_ptr->ib_trans.ib_dev),
- ((struct sockaddr_in
*)ia_attr->ia_address_ptr)->sin_family == AF_INET ?
"AF_INET":"AF_INET6",
- ((struct sockaddr_in
*)ia_attr->ia_address_ptr)->sin_addr.s_addr >> 0 & 0xff,
- ((struct sockaddr_in
*)ia_attr->ia_address_ptr)->sin_addr.s_addr >> 8 & 0xff,
- ((struct sockaddr_in
*)ia_attr->ia_address_ptr)->sin_addr.s_addr >> 16 & 0xff,
- ((struct sockaddr_in
*)ia_attr->ia_address_ptr)->sin_addr.s_addr >> 24 & 0xff );
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+ " query_hca: %s %s \n",
+
ibv_get_device_name(hca_ptr->ib_trans.ib_dev),
+ inet_ntoa(((struct sockaddr_in *)
+
&hca_ptr->hca_address)->sin_addr));
ia_attr->hardware_version_major = dev_attr.hw_ver;
/* ia_attr->hardware_version_minor = dev_attr.fw_ver;
*/
@@ -408,11 +425,14 @@ DAT_RETURN dapls_ib_query_hca (
ia_attr->max_pzs = dev_attr.max_pd;
ia_attr->max_mtu_size =
port_attr.max_msg_sz;
ia_attr->max_rdma_size =
port_attr.max_msg_sz;
+ ia_attr->max_iov_segments_per_rdma_read =
dev_attr.max_sge;
+ ia_attr->max_iov_segments_per_rdma_write =
dev_attr.max_sge;
ia_attr->num_transport_attr = 0;
ia_attr->transport_attr = NULL;
ia_attr->num_vendor_attr = 0;
ia_attr->vendor_attr = NULL;
-
+ hca_ptr->ib_trans.ack_timer =
DAPL_MAX(dev_attr.local_ca_ack_delay,
+
hca_ptr->ib_trans.ack_timer);
dapl_dbg_log (DAPL_DBG_TYPE_UTIL,
" query_hca: (%x.%x) ep %d ep_q %d evd %d evd_q
%d\n",
ia_attr->hardware_version_major,
@@ -420,11 +440,10 @@ DAT_RETURN dapls_ib_query_hca (
ia_attr->max_eps, ia_attr->max_dto_per_ep,
ia_attr->max_evds, ia_attr->max_evd_qlen );
dapl_dbg_log (DAPL_DBG_TYPE_UTIL,
- " query_hca: msg %llu rdma %llu iov %d lmr %d
rmr %d\n",
+ " query_hca: msg %llu rdma %llu iov %d lmr %d
rmr %d ack_time %d\n",
ia_attr->max_mtu_size, ia_attr->max_rdma_size,
ia_attr->max_iov_segments_per_dto,
ia_attr->max_lmrs,
- ia_attr->max_rmrs );
-
+ ia_attr->max_rmrs,hca_ptr->ib_trans.ack_timer );
}
if (ep_attr != NULL) {
@@ -443,7 +462,6 @@ DAT_RETURN dapls_ib_query_hca (
ep_attr->max_recv_dtos, ep_attr->max_recv_iov,
ep_attr->max_rdma_read_in,
ep_attr->max_rdma_read_out);
}
-
return DAT_SUCCESS;
}
diff --git a/dapl/openib_scm/dapl_ib_util.h
b/dapl/openib_scm/dapl_ib_util.h
index 0d928df..81a1752 100644
--- a/dapl/openib_scm/dapl_ib_util.h
+++ b/dapl/openib_scm/dapl_ib_util.h
@@ -71,15 +71,18 @@ typedef ib_hca_handle_t dapl_ibal_ca_t;
/* CM mappings, user CM not complete use SOCKETS */
-/* destination info to exchange until real IB CM shows up */
+/* destination info to exchange, define wire protocol version */
+#define DSCM_VER 2
typedef struct _ib_qp_cm
{
- uint32_t qpn;
+ uint16_t ver;
+ uint16_t rej;
uint16_t lid;
uint16_t port;
- int p_size;
+ uint32_t qpn;
+ uint32_t p_size;
DAT_SOCK_ADDR6 ia_address;
-
+ union ibv_gid gid;
} ib_qp_cm_t;
/*
@@ -94,20 +97,34 @@ struct ib_llist_entry
struct dapl_llist_entry *list_head;
};
+typedef enum scm_state
+{
+ SCM_INIT,
+ SCM_LISTEN,
+ SCM_CONN_PENDING,
+ SCM_ACCEPTING,
+ SCM_ACCEPTED,
+ SCM_REJECTED,
+ SCM_CONNECTED,
+ SCM_DISCONNECTED,
+ SCM_DESTROY
+} SCM_STATE;
+
struct ib_cm_handle
{
struct ib_llist_entry entry;
+ DAPL_OS_LOCK lock;
+ SCM_STATE state;
int socket;
- int l_socket;
- struct dapl_hca *hca_ptr;
- DAT_HANDLE cr;
+ struct dapl_hca *hca;
DAT_HANDLE sp;
+ struct dapl_ep *ep;
ib_qp_cm_t dst;
unsigned char p_data[256];
};
typedef struct ib_cm_handle *ib_cm_handle_t;
-typedef ib_cm_handle_t ib_cm_srvc_handle_t;
+typedef ib_cm_handle_t ib_cm_srvc_handle_t;
DAT_RETURN getipaddr(char *addr, int addr_len);
@@ -163,6 +180,12 @@ typedef struct ibv_comp_channel
*ib_wait_obj_handle_t;
/* inline send rdma threshold */
#define INLINE_SEND_DEFAULT 128
+/* RC timer - retry count defaults */
+#define SCM_ACK_TIMER 15 /* 5 bits, 4.096us*2^ack_timer. 15 ==
134ms */
+#define SCM_ACK_RETRY 7 /* 3 bits, 7 * 134ms = 940ms */
+#define SCM_RNR_TIMER 28 /* 5 bits, 28 == 163ms, 31 == 491ms */
+#define SCM_RNR_RETRY 7 /* 3 bits, 7 == infinite */
+
/* CM private data areas */
#define IB_MAX_REQ_PDATA_SIZE 92
#define IB_MAX_REP_PDATA_SIZE 196
@@ -268,7 +291,10 @@ typedef struct _ib_hca_transport
ib_async_cq_handler_t async_cq_error;
ib_async_dto_handler_t async_cq;
ib_async_qp_handler_t async_qp_error;
-
+ uint8_t ack_timer;
+ uint8_t ack_retry;
+ uint8_t rnr_timer;
+ uint8_t rnr_retry;
} ib_hca_transport_t;
/* provider specfic fields for shared memory support */
--
1.5.2.5
More information about the general
mailing list