[ofw] [PATCH] uDAPL v2 ucm: add new provider using a DAPL based IB-UD cm mechanism for MPI implementations.
Arlin Davis
arlin.r.davis at intel.com
Tue Aug 18 12:33:59 PDT 2009
New provider uses it's own CM protocol on top of IB-UD queue pairs.
During device open, this provider creates a UD queue pair and
returns local address information via dat_ia_query. This 24 byte
opaque address must be exchange out-of-band before connecting to a
server via dat_ep_connect. This provider is targeted for MPI
implementations that already exchange address information
during boot/init phase.
dtest, dtestx, and dtestcm was modified to report the lid and qpn
information on the server side so you can provide appropriate
destination address information for the client test suite.
Dapltest will not work with this provider.
Signed-off-by: Arlin Davis <arlin.r.davis at intel.com>
---
Makefile.am | 148 +++-
dapl/openib_cma/dapl_ib_util.h | 7 +-
dapl/openib_common/dapl_ib_common.h | 136 ++-
dapl/openib_common/qp.c | 336 ++++---
dapl/openib_scm/cm.c | 553 +++++------
dapl/openib_scm/dapl_ib_util.h | 17 +-
dapl/openib_ucm/README | 40 +
dapl/openib_ucm/SOURCES | 53 +
dapl/openib_ucm/cm.c | 1837 ++++++++++++++++++++++++++++++++++
dapl/openib_ucm/dapl_ib_util.h | 119 +++
dapl/openib_ucm/device.c | 603 +++++++++++
dapl/openib_ucm/linux/openib_osd.h | 21 +
dapl/openib_ucm/udapl.rc | 48 +
dapl/openib_ucm/windows/openib_osd.h | 35 +
test/dtest/dtest.c | 177 +++-
test/dtest/dtestcm.c | 146 ++-
test/dtest/dtestx.c | 88 ++-
17 files changed, 3782 insertions(+), 582 deletions(-)
create mode 100644 dapl/openib_ucm/README
create mode 100644 dapl/openib_ucm/SOURCES
create mode 100644 dapl/openib_ucm/cm.c
create mode 100644 dapl/openib_ucm/dapl_ib_util.h
create mode 100644 dapl/openib_ucm/device.c
create mode 100644 dapl/openib_ucm/linux/openib_osd.h
create mode 100644 dapl/openib_ucm/udapl.rc
create mode 100644 dapl/openib_ucm/windows/openib_osd.h
diff --git a/Makefile.am b/Makefile.am
index 6842c05..1fe71d9 100755
--- a/Makefile.am
+++ b/Makefile.am
@@ -17,12 +17,10 @@ endif
if EXT_TYPE_IB
XFLAGS = -DDAT_EXTENSIONS
-XPROGRAMS_CMA = dapl/openib_common/ib_extensions.c
-XPROGRAMS_SCM = dapl/openib_common/ib_extensions.c
+XPROGRAMS = dapl/openib_common/ib_extensions.c
else
XFLAGS =
-XPROGRAMS_CMA =
-XPROGRAMS_SCM =
+XPROGRAMS =
endif
if DEBUG
@@ -34,10 +32,12 @@ endif
datlibdir = $(libdir)
dapllibofadir = $(libdir)
daplliboscmdir = $(libdir)
+daplliboucmdir = $(libdir)
datlib_LTLIBRARIES = dat/udat/libdat2.la
dapllibofa_LTLIBRARIES = dapl/udapl/libdaplofa.la
daplliboscm_LTLIBRARIES = dapl/udapl/libdaploscm.la
+daplliboucm_LTLIBRARIES = dapl/udapl/libdaploucm.la
dat_udat_libdat2_la_CFLAGS = $(AM_CFLAGS) -D_GNU_SOURCE $(OSFLAGS) $(XFLAGS) \
-I$(srcdir)/dat/include/ -I$(srcdir)/dat/udat/ \
@@ -59,14 +59,24 @@ dapl_udapl_libdaploscm_la_CFLAGS = $(AM_CFLAGS) -D_GNU_SOURCE $(OSFLAGS) $(XFLAG
-I$(srcdir)/dapl/openib_scm \
-I$(srcdir)/dapl/openib_scm/linux
+dapl_udapl_libdaploucm_la_CFLAGS = $(AM_CFLAGS) -D_GNU_SOURCE $(OSFLAGS) $(XFLAGS) \
+ -DOPENIB -DCQ_WAIT_OBJECT \
+ -I$(srcdir)/dat/include/ -I$(srcdir)/dapl/include/ \
+ -I$(srcdir)/dapl/common -I$(srcdir)/dapl/udapl/linux \
+ -I$(srcdir)/dapl/openib_common \
+ -I$(srcdir)/dapl/openib_ucm \
+ -I$(srcdir)/dapl/openib_ucm/linux
+
if HAVE_LD_VERSION_SCRIPT
dat_version_script = -Wl,--version-script=$(srcdir)/dat/udat/libdat2.map
daplofa_version_script = -Wl,--version-script=$(srcdir)/dapl/udapl/libdaplofa.map
daploscm_version_script = -Wl,--version-script=$(srcdir)/dapl/udapl/libdaploscm.map
+ daploucm_version_script = -Wl,--version-script=$(srcdir)/dapl/udapl/libdaploucm.map
else
dat_version_script =
daplofa_version_script =
daploscm_version_script =
+ daploucm_version_script =
endif
#
@@ -192,14 +202,14 @@ dapl_udapl_libdaplofa_la_SOURCES = dapl/udapl/dapl_init.c \
dapl/openib_common/qp.c \
dapl/openib_common/util.c \
dapl/openib_cma/cm.c \
- dapl/openib_cma/device.c $(XPROGRAMS_CMA)
+ dapl/openib_cma/device.c $(XPROGRAMS)
dapl_udapl_libdaplofa_la_LDFLAGS = -version-info 2:0:0 $(daplofa_version_script) \
-Wl,-init,dapl_init -Wl,-fini,dapl_fini \
-lpthread -libverbs -lrdmacm
#
-# uDAPL OpenFabrics Socket CM version: libdaplscm.so
+# uDAPL OpenFabrics Socket CM version for IB: libdaplscm.so
#
dapl_udapl_libdaploscm_la_SOURCES = dapl/udapl/dapl_init.c \
dapl/udapl/dapl_evd_create.c \
@@ -306,11 +316,125 @@ dapl_udapl_libdaploscm_la_SOURCES = dapl/udapl/dapl_init.c \
dapl/openib_common/qp.c \
dapl/openib_common/util.c \
dapl/openib_scm/cm.c \
- dapl/openib_scm/device.c $(XPROGRAMS_SCM)
+ dapl/openib_scm/device.c $(XPROGRAMS)
dapl_udapl_libdaploscm_la_LDFLAGS = -version-info 2:0:0 $(daploscm_version_script) \
-Wl,-init,dapl_init -Wl,-fini,dapl_fini \
-lpthread -libverbs
+
+#
+# uDAPL OpenFabrics UD CM version for IB: libdaplucm.so
+#
+dapl_udapl_libdaploucm_la_SOURCES = dapl/udapl/dapl_init.c \
+ dapl/udapl/dapl_evd_create.c \
+ dapl/udapl/dapl_evd_query.c \
+ dapl/udapl/dapl_cno_create.c \
+ dapl/udapl/dapl_cno_modify_agent.c \
+ dapl/udapl/dapl_cno_free.c \
+ dapl/udapl/dapl_cno_wait.c \
+ dapl/udapl/dapl_cno_query.c \
+ dapl/udapl/dapl_lmr_create.c \
+ dapl/udapl/dapl_evd_wait.c \
+ dapl/udapl/dapl_evd_disable.c \
+ dapl/udapl/dapl_evd_enable.c \
+ dapl/udapl/dapl_evd_modify_cno.c \
+ dapl/udapl/dapl_evd_set_unwaitable.c \
+ dapl/udapl/dapl_evd_clear_unwaitable.c \
+ dapl/udapl/linux/dapl_osd.c \
+ dapl/common/dapl_cookie.c \
+ dapl/common/dapl_cr_accept.c \
+ dapl/common/dapl_cr_query.c \
+ dapl/common/dapl_cr_reject.c \
+ dapl/common/dapl_cr_util.c \
+ dapl/common/dapl_cr_callback.c \
+ dapl/common/dapl_cr_handoff.c \
+ dapl/common/dapl_ep_connect.c \
+ dapl/common/dapl_ep_create.c \
+ dapl/common/dapl_ep_disconnect.c \
+ dapl/common/dapl_ep_dup_connect.c \
+ dapl/common/dapl_ep_free.c \
+ dapl/common/dapl_ep_reset.c \
+ dapl/common/dapl_ep_get_status.c \
+ dapl/common/dapl_ep_modify.c \
+ dapl/common/dapl_ep_post_rdma_read.c \
+ dapl/common/dapl_ep_post_rdma_write.c \
+ dapl/common/dapl_ep_post_recv.c \
+ dapl/common/dapl_ep_post_send.c \
+ dapl/common/dapl_ep_query.c \
+ dapl/common/dapl_ep_util.c \
+ dapl/common/dapl_evd_dequeue.c \
+ dapl/common/dapl_evd_free.c \
+ dapl/common/dapl_evd_post_se.c \
+ dapl/common/dapl_evd_resize.c \
+ dapl/common/dapl_evd_util.c \
+ dapl/common/dapl_evd_cq_async_error_callb.c \
+ dapl/common/dapl_evd_qp_async_error_callb.c \
+ dapl/common/dapl_evd_un_async_error_callb.c \
+ dapl/common/dapl_evd_connection_callb.c \
+ dapl/common/dapl_evd_dto_callb.c \
+ dapl/common/dapl_get_consumer_context.c \
+ dapl/common/dapl_get_handle_type.c \
+ dapl/common/dapl_hash.c \
+ dapl/common/dapl_hca_util.c \
+ dapl/common/dapl_ia_close.c \
+ dapl/common/dapl_ia_open.c \
+ dapl/common/dapl_ia_query.c \
+ dapl/common/dapl_ia_util.c \
+ dapl/common/dapl_llist.c \
+ dapl/common/dapl_lmr_free.c \
+ dapl/common/dapl_lmr_query.c \
+ dapl/common/dapl_lmr_util.c \
+ dapl/common/dapl_lmr_sync_rdma_read.c \
+ dapl/common/dapl_lmr_sync_rdma_write.c \
+ dapl/common/dapl_mr_util.c \
+ dapl/common/dapl_provider.c \
+ dapl/common/dapl_sp_util.c \
+ dapl/common/dapl_psp_create.c \
+ dapl/common/dapl_psp_create_any.c \
+ dapl/common/dapl_psp_free.c \
+ dapl/common/dapl_psp_query.c \
+ dapl/common/dapl_pz_create.c \
+ dapl/common/dapl_pz_free.c \
+ dapl/common/dapl_pz_query.c \
+ dapl/common/dapl_pz_util.c \
+ dapl/common/dapl_rmr_create.c \
+ dapl/common/dapl_rmr_free.c \
+ dapl/common/dapl_rmr_bind.c \
+ dapl/common/dapl_rmr_query.c \
+ dapl/common/dapl_rmr_util.c \
+ dapl/common/dapl_rsp_create.c \
+ dapl/common/dapl_rsp_free.c \
+ dapl/common/dapl_rsp_query.c \
+ dapl/common/dapl_cno_util.c \
+ dapl/common/dapl_set_consumer_context.c \
+ dapl/common/dapl_ring_buffer_util.c \
+ dapl/common/dapl_name_service.c \
+ dapl/common/dapl_timer_util.c \
+ dapl/common/dapl_ep_create_with_srq.c \
+ dapl/common/dapl_ep_recv_query.c \
+ dapl/common/dapl_ep_set_watermark.c \
+ dapl/common/dapl_srq_create.c \
+ dapl/common/dapl_srq_free.c \
+ dapl/common/dapl_srq_query.c \
+ dapl/common/dapl_srq_resize.c \
+ dapl/common/dapl_srq_post_recv.c \
+ dapl/common/dapl_srq_set_lw.c \
+ dapl/common/dapl_srq_util.c \
+ dapl/common/dapl_debug.c \
+ dapl/common/dapl_ia_ha.c \
+ dapl/common/dapl_csp.c \
+ dapl/common/dapl_ep_post_send_invalidate.c \
+ dapl/common/dapl_ep_post_rdma_read_to_rmr.c \
+ dapl/openib_common/mem.c \
+ dapl/openib_common/cq.c \
+ dapl/openib_common/qp.c \
+ dapl/openib_common/util.c \
+ dapl/openib_ucm/cm.c \
+ dapl/openib_ucm/device.c $(XPROGRAMS)
+
+dapl_udapl_libdaploucm_la_LDFLAGS = -version-info 2:0:0 $(daploscm_version_script) \
+ -Wl,-init,dapl_init -Wl,-fini,dapl_fini \
+ -lpthread -libverbs
libdatincludedir = $(includedir)/dat2
@@ -375,9 +499,12 @@ EXTRA_DIST = dat/common/dat_dictionary.h \
dapl/openib_cma/linux/openib_osd.h \
dapl/openib_scm/dapl_ib_util.h \
dapl/openib_scm/linux/openib_osd.h \
+ dapl/openib_ucm/dapl_ib_util.h \
+ dapl/openib_ucm/linux/openib_osd.h \
dat/udat/libdat2.map \
dapl/udapl/libdaplofa.map \
dapl/udapl/libdaploscm.map \
+ dapl/udapl/libdaploucm.map \
dapl.spec.in \
$(man_MANS) \
test/dapltest/include/dapl_bpool.h \
@@ -419,12 +546,14 @@ install-exec-hook:
sed -e '/ofa-v2-.* u2/d' < $(DESTDIR)$(sysconfdir)/dat.conf > /tmp/$$$$ofadapl; \
cp /tmp/$$$$ofadapl $(DESTDIR)$(sysconfdir)/dat.conf; \
fi; \
- echo ofa-v2-mlx4_0-1 u2.0 nonthreadsafe default libdaploscm.so.2 dapl.2.0 '"mlx4_0 1" ""' >>
$(DESTDIR)$(sysconfdir)/dat.conf; \
- echo ofa-v2-mlx4_0-2 u2.0 nonthreadsafe default libdaploscm.so.2 dapl.2.0 '"mlx4_0 2" ""' >>
$(DESTDIR)$(sysconfdir)/dat.conf; \
echo ofa-v2-ib0 u2.0 nonthreadsafe default libdaplofa.so.2 dapl.2.0 '"ib0 0" ""' >> $(DESTDIR)$(sysconfdir)/dat.conf; \
echo ofa-v2-ib1 u2.0 nonthreadsafe default libdaplofa.so.2 dapl.2.0 '"ib1 0" ""' >> $(DESTDIR)$(sysconfdir)/dat.conf; \
echo ofa-v2-mthca0-1 u2.0 nonthreadsafe default libdaploscm.so.2 dapl.2.0 '"mthca0 1" ""' >>
$(DESTDIR)$(sysconfdir)/dat.conf; \
echo ofa-v2-mthca0-2 u2.0 nonthreadsafe default libdaploscm.so.2 dapl.2.0 '"mthca0 2" ""' >>
$(DESTDIR)$(sysconfdir)/dat.conf; \
+ echo ofa-v2-mlx4_0-1 u2.0 nonthreadsafe default libdaploscm.so.2 dapl.2.0 '"mlx4_0 1" ""' >>
$(DESTDIR)$(sysconfdir)/dat.conf; \
+ echo ofa-v2-mlx4_0-2 u2.0 nonthreadsafe default libdaploscm.so.2 dapl.2.0 '"mlx4_0 2" ""' >>
$(DESTDIR)$(sysconfdir)/dat.conf; \
+ echo ucm-mlx4-1 u2.0 nonthreadsafe default libdaploucm.so.2 dapl.2.0 '"mlx4_0 1" ""' >> $(DESTDIR)$(sysconfdir)/dat.conf; \
+ echo ucm-mlx4-2 u2.0 nonthreadsafe default libdaploucm.so.2 dapl.2.0 '"mlx4_0 2" ""' >> $(DESTDIR)$(sysconfdir)/dat.conf; \
echo ofa-v2-ipath0-1 u2.0 nonthreadsafe default libdaploscm.so.2 dapl.2.0 '"ipath0 1" ""' >>
$(DESTDIR)$(sysconfdir)/dat.conf; \
echo ofa-v2-ipath0-2 u2.0 nonthreadsafe default libdaploscm.so.2 dapl.2.0 '"ipath0 2" ""' >>
$(DESTDIR)$(sysconfdir)/dat.conf; \
echo ofa-v2-ehca0-2 u2.0 nonthreadsafe default libdaploscm.so.2 dapl.2.0 '"ehca0 1" ""' >> $(DESTDIR)$(sysconfdir)/dat.conf;
\
@@ -433,6 +562,7 @@ install-exec-hook:
uninstall-hook:
if test -e $(DESTDIR)$(sysconfdir)/dat.conf; then \
sed -e '/ofa-v2-.* u2/d' < $(DESTDIR)$(sysconfdir)/dat.conf > /tmp/$$$$ofadapl; \
+ sed -e '/ucm-.* u2/d' < $(DESTDIR)$(sysconfdir)/dat.conf > /tmp/$$$$ofadapl; \
cp /tmp/$$$$ofadapl $(DESTDIR)$(sysconfdir)/dat.conf; \
fi;
diff --git a/dapl/openib_cma/dapl_ib_util.h b/dapl/openib_cma/dapl_ib_util.h
index c9ab4d6..35900e7 100755
--- a/dapl/openib_cma/dapl_ib_util.h
+++ b/dapl/openib_cma/dapl_ib_util.h
@@ -72,8 +72,8 @@ struct dapl_cm_id {
DAT_SOCK_ADDR6 r_addr;
int p_len;
unsigned char p_data[256]; /* dapl max private data size */
- ib_qp_cm_t dst; /* dapls_modify_qp_state */
- struct ibv_ah *ah; /* dapls_modify_qp_state */
+ ib_cm_msg_t dst;
+ struct ibv_ah *ah;
};
typedef struct dapl_cm_id *dp_ib_cm_handle_t;
@@ -123,9 +123,6 @@ void dapli_async_event_cb(struct _ib_hca_transport *tp);
void dapli_cq_event_cb(struct _ib_hca_transport *tp);
dp_ib_cm_handle_t dapls_ib_cm_create(DAPL_EP *ep);
void dapls_ib_cm_free(dp_ib_cm_handle_t cm, DAPL_EP *ep);
-DAT_RETURN dapls_modify_qp_state(IN ib_qp_handle_t qp_handle,
- IN ib_qp_state_t qp_state,
- IN dp_ib_cm_handle_t cm);
STATIC _INLINE_ void dapls_print_cm_list(IN DAPL_IA * ia_ptr)
{
diff --git a/dapl/openib_common/dapl_ib_common.h b/dapl/openib_common/dapl_ib_common.h
index 2195767..3cd8885 100644
--- a/dapl/openib_common/dapl_ib_common.h
+++ b/dapl/openib_common/dapl_ib_common.h
@@ -50,25 +50,56 @@ typedef struct ibv_pd *ib_pd_handle_t;
typedef struct ibv_mr *ib_mr_handle_t;
typedef struct ibv_mw *ib_mw_handle_t;
typedef struct ibv_wc ib_work_completion_t;
+typedef struct ibv_ah *ib_ah_handle_t;
+typedef union ibv_gid *ib_gid_handle_t;
/* HCA context type maps to IB verbs */
typedef struct ibv_context *ib_hca_handle_t;
typedef ib_hca_handle_t dapl_ibal_ca_t;
/* QP info to exchange, wire protocol version for these CM's */
-#define DCM_VER 4
-typedef struct _ib_qp_cm
-{
+#define DCM_VER 5
+
+/* CM private data areas, same for all operations */
+#define DCM_MAX_PDATA_SIZE 128
+
+/*
+ * DAPL IB/QP address (type, port, lid, qp_num, gid) mapping to
+ * DAT_IA_ADDRESS_PTR, DAT_SOCK_ADDR2 (24 bytes)
+ * For applications, like MPI, that exchange IA_ADDRESS
+ * across the fabric before connecting, it eliminates the
+ * overhead of name and address resolution to the destination's
+ * CM services. UCM provider uses this for DAT_IA_ADDRESS.
+ */
+union dcm_addr {
+ DAT_SOCK_ADDR6 so;
+ struct {
+ uint8_t qp_type;
+ uint8_t port_num;
+ uint16_t lid;
+ uint32_t qpn;
+ union ibv_gid gid;
+ } ib;
+};
+
+/* 256 bytes total; default max_inline_send, min IB MTU size */
+typedef struct _ib_cm_msg
+{
uint16_t ver;
- uint16_t rej;
- uint16_t lid;
- uint16_t port;
- uint32_t qpn;
- uint32_t p_size;
- union ibv_gid gid;
- DAT_SOCK_ADDR6 ia_address;
- uint16_t qp_type;
-} ib_qp_cm_t;
+ uint16_t op;
+ uint16_t sport; /* src cm port */
+ uint16_t dport; /* dst cm port */
+ uint32_t sqpn; /* src cm qpn */
+ uint32_t dqpn; /* dst cm qpn */
+ uint16_t p_size;
+ uint8_t resv[14];
+ union dcm_addr saddr;
+ union dcm_addr daddr;
+ union dcm_addr saddr_alt;
+ union dcm_addr daddr_alt;
+ uint8_t p_data[DCM_MAX_PDATA_SIZE];
+
+} ib_cm_msg_t;
/* CM events */
typedef enum {
@@ -113,11 +144,27 @@ typedef uint16_t ib_hca_port_t;
/* inline send rdma threshold */
#define INLINE_SEND_IWARP_DEFAULT 64
-#define INLINE_SEND_IB_DEFAULT 200
+#define INLINE_SEND_IB_DEFAULT 256
/* qkey for UD QP's */
#define DAT_UD_QKEY 0x78654321
+/* RC timer - retry count defaults */
+#define DCM_ACK_TIMER 16 /* 5 bits, 4.096us*2^ack_timer. 16== 268ms */
+#define DCM_ACK_RETRY 7 /* 3 bits, 7 * 268ms = 1.8 seconds */
+#define DCM_RNR_TIMER 12 /* 5 bits, 12 =.64ms, 28 =163ms, 31 =491ms */
+#define DCM_RNR_RETRY 7 /* 3 bits, 7 == infinite */
+#define DCM_IB_MTU 2048
+
+/* Global routing defaults */
+#define DCM_GLOBAL 0 /* global routing is disabled */
+#define DCM_HOP_LIMIT 0xff
+#define DCM_TCLASS 0
+
+/* DAPL uCM timers */
+#define DCM_RETRY_CNT 7
+#define DCM_RETRY_TIME_MS 1000
+
/* DTO OPs, ordered for DAPL ENUM definitions */
#define OP_RDMA_WRITE IBV_WR_RDMA_WRITE
#define OP_RDMA_WRITE_IMM IBV_WR_RDMA_WRITE_WITH_IMM
@@ -201,6 +248,36 @@ typedef enum
} ib_thread_state_t;
+typedef enum dapl_cm_op
+{
+ DCM_REQ,
+ DCM_REP,
+ DCM_REJ_USER, /* user reject */
+ DCM_REJ_CM, /* cm reject, no SID */
+ DCM_RTU,
+ DCM_DREQ,
+ DCM_DREP
+
+} DAPL_CM_OP;
+
+typedef enum dapl_cm_state
+{
+ DCM_INIT,
+ DCM_LISTEN,
+ DCM_CONN_PENDING,
+ DCM_RTU_PENDING,
+ DCM_ACCEPTING,
+ DCM_ACCEPTING_DATA,
+ DCM_ACCEPTED,
+ DCM_REJECTING,
+ DCM_REJECTED,
+ DCM_CONNECTED,
+ DCM_RELEASED,
+ DCM_DISC_PENDING,
+ DCM_DISCONNECTED,
+ DCM_DESTROY
+
+} DAPL_CM_STATE;
/* provider specfic fields for shared memory support */
typedef uint32_t ib_shm_transport_t;
@@ -214,6 +291,19 @@ enum ibv_mtu dapl_ib_mtu(int mtu);
char *dapl_ib_mtu_str(enum ibv_mtu mtu);
DAT_RETURN getlocalipaddr(DAT_SOCK_ADDR *addr, int addr_len);
+/* qp.c */
+DAT_RETURN dapls_modify_qp_ud(IN DAPL_HCA *hca, IN ib_qp_handle_t qp);
+DAT_RETURN dapls_modify_qp_state(IN ib_qp_handle_t qp_handle,
+ IN ib_qp_state_t qp_state,
+ IN uint32_t qpn,
+ IN uint16_t lid,
+ IN ib_gid_handle_t gid);
+ib_ah_handle_t dapls_create_ah( IN DAPL_HCA *hca,
+ IN ib_pd_handle_t pd,
+ IN ib_qp_handle_t qp,
+ IN uint16_t lid,
+ IN ib_gid_handle_t gid);
+
/* inline functions */
STATIC _INLINE_ IB_HCA_NAME dapl_ib_convert_name (IN char *name)
{
@@ -260,22 +350,6 @@ dapl_convert_errno( IN int err, IN const char *str )
}
}
-typedef enum dapl_cm_state
-{
- DCM_INIT,
- DCM_LISTEN,
- DCM_CONN_PENDING,
- DCM_RTU_PENDING,
- DCM_ACCEPTING,
- DCM_ACCEPTING_DATA,
- DCM_ACCEPTED,
- DCM_REJECTED,
- DCM_CONNECTED,
- DCM_RELEASED,
- DCM_DISCONNECTED,
- DCM_DESTROY
-} DAPL_CM_STATE;
-
STATIC _INLINE_ char * dapl_cm_state_str(IN int st)
{
static char *state[] = {
@@ -286,13 +360,15 @@ STATIC _INLINE_ char * dapl_cm_state_str(IN int st)
"CM_ACCEPTING",
"CM_ACCEPTING_DATA",
"CM_ACCEPTED",
+ "CM_REJECTING",
"CM_REJECTED",
"CM_CONNECTED",
"CM_RELEASED",
+ "CM_DISC_PENDING",
"CM_DISCONNECTED",
"CM_DESTROY"
};
- return ((st < 0 || st > 11) ? "Invalid CM state?" : state[st]);
+ return ((st < 0 || st > 13) ? "Invalid CM state?" : state[st]);
}
#endif /* _DAPL_IB_COMMON_H_ */
diff --git a/dapl/openib_common/qp.c b/dapl/openib_common/qp.c
index 9aa0594..73d2c3f 100644
--- a/dapl/openib_common/qp.c
+++ b/dapl/openib_common/qp.c
@@ -176,7 +176,7 @@ dapls_ib_qp_alloc(IN DAPL_IA * ia_ptr,
/* Setup QP attributes for INIT state on the way out */
if (dapls_modify_qp_state(ep_ptr->qp_handle,
- IBV_QPS_INIT, NULL) != DAT_SUCCESS) {
+ IBV_QPS_INIT, 0, 0, 0) != DAT_SUCCESS) {
ibv_destroy_qp(ep_ptr->qp_handle);
ep_ptr->qp_handle = IB_INVALID_HANDLE;
return DAT_INTERNAL_ERROR;
@@ -219,7 +219,7 @@ DAT_RETURN dapls_ib_qp_free(IN DAPL_IA * ia_ptr, IN DAPL_EP * ep_ptr)
if (ep_ptr->qp_handle != NULL) {
/* force error state to flush queue, then destroy */
- dapls_modify_qp_state(ep_ptr->qp_handle, IBV_QPS_ERR, NULL);
+ dapls_modify_qp_state(ep_ptr->qp_handle, IBV_QPS_ERR, 0,0,0);
if (ibv_destroy_qp(ep_ptr->qp_handle))
return (dapl_convert_errno(errno, "destroy_qp"));
@@ -280,8 +280,8 @@ dapls_ib_qp_modify(IN DAPL_IA * ia_ptr,
/* move to error state if necessary */
if ((ep_ptr->qp_state == IB_QP_STATE_ERROR) &&
(ep_ptr->qp_handle->state != IBV_QPS_ERR)) {
- return (dapls_modify_qp_state(ep_ptr->qp_handle,
- IBV_QPS_ERR, NULL));
+ return (dapls_modify_qp_state(ep_ptr->qp_handle,
+ IBV_QPS_ERR, 0, 0, 0));
}
/*
@@ -345,8 +345,8 @@ void dapls_ib_reinit_ep(IN DAPL_EP * ep_ptr)
if (ep_ptr->qp_handle != IB_INVALID_HANDLE &&
ep_ptr->qp_handle->qp_type != IBV_QPT_UD) {
/* move to RESET state and then to INIT */
- dapls_modify_qp_state(ep_ptr->qp_handle, IBV_QPS_RESET, 0);
- dapls_modify_qp_state(ep_ptr->qp_handle, IBV_QPS_INIT, 0);
+ dapls_modify_qp_state(ep_ptr->qp_handle, IBV_QPS_RESET,0,0,0);
+ dapls_modify_qp_state(ep_ptr->qp_handle, IBV_QPS_INIT,0,0,0);
}
}
#endif // _WIN32 || _WIN64
@@ -354,152 +354,137 @@ void dapls_ib_reinit_ep(IN DAPL_EP * ep_ptr)
/*
* Generic QP modify for init, reset, error, RTS, RTR
* For UD, create_ah on RTR, qkey on INIT
+ * CM msg provides QP attributes, info in network order
*/
DAT_RETURN
-dapls_modify_qp_state(IN ib_qp_handle_t qp_handle,
- IN ib_qp_state_t qp_state,
- IN dp_ib_cm_handle_t cm_ptr)
+dapls_modify_qp_state(IN ib_qp_handle_t qp_handle,
+ IN ib_qp_state_t qp_state,
+ IN uint32_t qpn,
+ IN uint16_t lid,
+ IN ib_gid_handle_t gid)
{
struct ibv_qp_attr qp_attr;
enum ibv_qp_attr_mask mask = IBV_QP_STATE;
DAPL_EP *ep_ptr = (DAPL_EP *) qp_handle->qp_context;
DAPL_IA *ia_ptr = ep_ptr->header.owner_ia;
- ib_qp_cm_t *qp_cm = &cm_ptr->dst;
int ret;
dapl_os_memzero((void *)&qp_attr, sizeof(qp_attr));
qp_attr.qp_state = qp_state;
+
switch (qp_state) {
- /* additional attributes with RTR and RTS */
case IBV_QPS_RTR:
- {
- dapl_dbg_log(DAPL_DBG_TYPE_EP,
- " QPS_RTR: type %d state %d qpn %x lid %x"
- " port %x ep %p qp_state %d\n",
- qp_handle->qp_type, qp_handle->qp_type,
- qp_cm->qpn, qp_cm->lid, qp_cm->port,
- ep_ptr, ep_ptr->qp_state);
-
- mask |= IBV_QP_AV |
- IBV_QP_PATH_MTU |
- IBV_QP_DEST_QPN |
- IBV_QP_RQ_PSN |
- IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER;
-
- qp_attr.dest_qp_num = qp_cm->qpn;
- qp_attr.rq_psn = 1;
- qp_attr.path_mtu = ia_ptr->hca_ptr->ib_trans.mtu;
- qp_attr.max_dest_rd_atomic =
- ep_ptr->param.ep_attr.max_rdma_read_out;
- qp_attr.min_rnr_timer =
- ia_ptr->hca_ptr->ib_trans.rnr_timer;
-
- /* address handle. RC and UD */
- qp_attr.ah_attr.dlid = qp_cm->lid;
- if (ia_ptr->hca_ptr->ib_trans.global) {
- qp_attr.ah_attr.is_global = 1;
- qp_attr.ah_attr.grh.dgid = qp_cm->gid;
- qp_attr.ah_attr.grh.hop_limit =
- ia_ptr->hca_ptr->ib_trans.hop_limit;
- qp_attr.ah_attr.grh.traffic_class =
- ia_ptr->hca_ptr->ib_trans.tclass;
- }
- qp_attr.ah_attr.sl = 0;
- qp_attr.ah_attr.src_path_bits = 0;
- qp_attr.ah_attr.port_num = ia_ptr->hca_ptr->port_num;
-#ifdef DAT_EXTENSIONS
- /* UD: create AH for remote side */
- if (qp_handle->qp_type == IBV_QPT_UD) {
- ib_pd_handle_t pz;
- pz = ((DAPL_PZ *)
- ep_ptr->param.pz_handle)->pd_handle;
- mask = IBV_QP_STATE;
- cm_ptr->ah = ibv_create_ah(pz,
- &qp_attr.ah_attr);
- if (!cm_ptr->ah)
- return (dapl_convert_errno(errno,
- "ibv_ah"));
-
- /* already RTR, multi remote AH's on QP */
- if (ep_ptr->qp_state == IBV_QPS_RTR ||
- ep_ptr->qp_state == IBV_QPS_RTS)
- return DAT_SUCCESS;
- }
-#endif
- break;
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ " QPS_RTR: type %d qpn 0x%x lid 0x%x"
+ " port %d ep %p qp_state %d \n",
+ qp_handle->qp_type,
+ ntohl(qpn), ntohs(lid),
+ ia_ptr->hca_ptr->port_num,
+ ep_ptr, ep_ptr->qp_state);
+
+ mask |= IBV_QP_AV |
+ IBV_QP_PATH_MTU |
+ IBV_QP_DEST_QPN |
+ IBV_QP_RQ_PSN |
+ IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER;
+
+ qp_attr.dest_qp_num = ntohl(qpn);
+ qp_attr.rq_psn = 1;
+ qp_attr.path_mtu = ia_ptr->hca_ptr->ib_trans.mtu;
+ qp_attr.max_dest_rd_atomic =
+ ep_ptr->param.ep_attr.max_rdma_read_out;
+ qp_attr.min_rnr_timer =
+ ia_ptr->hca_ptr->ib_trans.rnr_timer;
+
+ /* address handle. RC and UD */
+ qp_attr.ah_attr.dlid = ntohs(lid);
+ if (ia_ptr->hca_ptr->ib_trans.global) {
+ qp_attr.ah_attr.is_global = 1;
+ qp_attr.ah_attr.grh.dgid.global.subnet_prefix =
+ ntohll(gid->global.subnet_prefix);
+ qp_attr.ah_attr.grh.dgid.global.interface_id =
+ ntohll(gid->global.interface_id);
+ qp_attr.ah_attr.grh.hop_limit =
+ ia_ptr->hca_ptr->ib_trans.hop_limit;
+ qp_attr.ah_attr.grh.traffic_class =
+ ia_ptr->hca_ptr->ib_trans.tclass;
}
+ qp_attr.ah_attr.sl = 0;
+ qp_attr.ah_attr.src_path_bits = 0;
+ qp_attr.ah_attr.port_num = ia_ptr->hca_ptr->port_num;
+
+ /* UD: already in RTR, RTS state */
+ if (qp_handle->qp_type == IBV_QPT_UD) {
+ if (ep_ptr->qp_state == IBV_QPS_RTR ||
+ ep_ptr->qp_state == IBV_QPS_RTS)
+ return DAT_SUCCESS;
+ }
+ break;
case IBV_QPS_RTS:
- {
- /* RC only */
- if (qp_handle->qp_type == IBV_QPT_RC) {
- mask |= IBV_QP_SQ_PSN |
- IBV_QP_TIMEOUT |
- IBV_QP_RETRY_CNT |
- IBV_QP_RNR_RETRY | IBV_QP_MAX_QP_RD_ATOMIC;
- qp_attr.timeout =
- ia_ptr->hca_ptr->ib_trans.ack_timer;
- qp_attr.retry_cnt =
- ia_ptr->hca_ptr->ib_trans.ack_retry;
- qp_attr.rnr_retry =
- ia_ptr->hca_ptr->ib_trans.rnr_retry;
- qp_attr.max_rd_atomic =
- ep_ptr->param.ep_attr.max_rdma_read_out;
- }
- /* RC and UD */
- qp_attr.qp_state = IBV_QPS_RTS;
- qp_attr.sq_psn = 1;
-
- dapl_dbg_log(DAPL_DBG_TYPE_EP,
- " QPS_RTS: psn %x rd_atomic %d ack %d "
- " retry %d rnr_retry %d ep %p qp_state %d\n",
- qp_attr.sq_psn, qp_attr.max_rd_atomic,
- qp_attr.timeout, qp_attr.retry_cnt,
- qp_attr.rnr_retry, ep_ptr,
- ep_ptr->qp_state);
-#ifdef DAT_EXTENSIONS
- if (qp_handle->qp_type == IBV_QPT_UD) {
- /* already RTS, multi remote AH's on QP */
- if (ep_ptr->qp_state == IBV_QPS_RTS)
- return DAT_SUCCESS;
- else
- mask = IBV_QP_STATE | IBV_QP_SQ_PSN;
- }
-#endif
- break;
+ if (qp_handle->qp_type == IBV_QPT_RC) {
+ mask |= IBV_QP_SQ_PSN |
+ IBV_QP_TIMEOUT |
+ IBV_QP_RETRY_CNT |
+ IBV_QP_RNR_RETRY | IBV_QP_MAX_QP_RD_ATOMIC;
+ qp_attr.timeout =
+ ia_ptr->hca_ptr->ib_trans.ack_timer;
+ qp_attr.retry_cnt =
+ ia_ptr->hca_ptr->ib_trans.ack_retry;
+ qp_attr.rnr_retry =
+ ia_ptr->hca_ptr->ib_trans.rnr_retry;
+ qp_attr.max_rd_atomic =
+ ep_ptr->param.ep_attr.max_rdma_read_out;
+ }
+ /* RC and UD */
+ qp_attr.qp_state = IBV_QPS_RTS;
+ qp_attr.sq_psn = 1;
+
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ " QPS_RTS: psn %x rd_atomic %d ack %d "
+ " retry %d rnr_retry %d ep %p qp_state %d\n",
+ qp_attr.sq_psn, qp_attr.max_rd_atomic,
+ qp_attr.timeout, qp_attr.retry_cnt,
+ qp_attr.rnr_retry, ep_ptr,
+ ep_ptr->qp_state);
+
+ if (qp_handle->qp_type == IBV_QPT_UD) {
+ /* already RTS, multi remote AH's on QP */
+ if (ep_ptr->qp_state == IBV_QPS_RTS)
+ return DAT_SUCCESS;
+ else
+ mask = IBV_QP_STATE | IBV_QP_SQ_PSN;
}
+ break;
case IBV_QPS_INIT:
- {
- mask |= IBV_QP_PKEY_INDEX | IBV_QP_PORT;
- if (qp_handle->qp_type == IBV_QPT_RC) {
- mask |= IBV_QP_ACCESS_FLAGS;
- qp_attr.qp_access_flags =
- IBV_ACCESS_LOCAL_WRITE |
- IBV_ACCESS_REMOTE_WRITE |
- IBV_ACCESS_REMOTE_READ |
- IBV_ACCESS_REMOTE_ATOMIC |
- IBV_ACCESS_MW_BIND;
- }
-#ifdef DAT_EXTENSIONS
- if (qp_handle->qp_type == IBV_QPT_UD) {
- /* already INIT, multi remote AH's on QP */
- if (ep_ptr->qp_state == IBV_QPS_INIT)
- return DAT_SUCCESS;
- mask |= IBV_QP_QKEY;
- qp_attr.qkey = DAT_UD_QKEY;
- }
-#endif
- qp_attr.pkey_index = 0;
- qp_attr.port_num = ia_ptr->hca_ptr->port_num;
-
- dapl_dbg_log(DAPL_DBG_TYPE_EP,
- " QPS_INIT: pi %x port %x acc %x qkey 0x%x\n",
- qp_attr.pkey_index, qp_attr.port_num,
- qp_attr.qp_access_flags, qp_attr.qkey);
- break;
+ mask |= IBV_QP_PKEY_INDEX | IBV_QP_PORT;
+ if (qp_handle->qp_type == IBV_QPT_RC) {
+ mask |= IBV_QP_ACCESS_FLAGS;
+ qp_attr.qp_access_flags =
+ IBV_ACCESS_LOCAL_WRITE |
+ IBV_ACCESS_REMOTE_WRITE |
+ IBV_ACCESS_REMOTE_READ |
+ IBV_ACCESS_REMOTE_ATOMIC |
+ IBV_ACCESS_MW_BIND;
+ }
+
+ if (qp_handle->qp_type == IBV_QPT_UD) {
+ /* already INIT, multi remote AH's on QP */
+ if (ep_ptr->qp_state == IBV_QPS_INIT)
+ return DAT_SUCCESS;
+ mask |= IBV_QP_QKEY;
+ qp_attr.qkey = DAT_UD_QKEY;
}
+
+ qp_attr.pkey_index = 0;
+ qp_attr.port_num = ia_ptr->hca_ptr->port_num;
+
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ " QPS_INIT: pi %x port %x acc %x qkey 0x%x\n",
+ qp_attr.pkey_index, qp_attr.port_num,
+ qp_attr.qp_access_flags, qp_attr.qkey);
+ break;
default:
break;
-
}
ret = ibv_modify_qp(qp_handle, &qp_attr, mask);
@@ -511,6 +496,93 @@ dapls_modify_qp_state(IN ib_qp_handle_t qp_handle,
}
}
+/* Modify UD type QP from init, rtr, rts, info network order */
+DAT_RETURN
+dapls_modify_qp_ud(IN DAPL_HCA *hca, IN ib_qp_handle_t qp)
+{
+ struct ibv_qp_attr qp_attr;
+
+ /* modify QP, setup and prepost buffers */
+ dapl_os_memzero((void *)&qp_attr, sizeof(qp_attr));
+ qp_attr.qp_state = IBV_QPS_INIT;
+ qp_attr.pkey_index = 0;
+ qp_attr.port_num = hca->port_num;
+ qp_attr.qkey = DAT_UD_QKEY;
+ if (ibv_modify_qp(qp, &qp_attr,
+ IBV_QP_STATE |
+ IBV_QP_PKEY_INDEX |
+ IBV_QP_PORT |
+ IBV_QP_QKEY)) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " modify_ud_qp INIT: ERR %s\n", strerror(errno));
+ return (dapl_convert_errno(errno, "modify_qp"));
+ }
+ dapl_os_memzero((void *)&qp_attr, sizeof(qp_attr));
+ qp_attr.qp_state = IBV_QPS_RTR;
+ if (ibv_modify_qp(qp, &qp_attr,IBV_QP_STATE)) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " modify_ud_qp RTR: ERR %s\n", strerror(errno));
+ return (dapl_convert_errno(errno, "modify_qp"));
+ }
+ dapl_os_memzero((void *)&qp_attr, sizeof(qp_attr));
+ qp_attr.qp_state = IBV_QPS_RTS;
+ qp_attr.sq_psn = 1;
+ if (ibv_modify_qp(qp, &qp_attr,
+ IBV_QP_STATE | IBV_QP_SQ_PSN)) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " modify_ud_qp RTS: ERR %s\n", strerror(errno));
+ return (dapl_convert_errno(errno, "modify_qp"));
+ }
+ return DAT_SUCCESS;
+}
+
+/* Create address handle for remote QP, info in network order */
+ib_ah_handle_t
+dapls_create_ah(IN DAPL_HCA *hca,
+ IN ib_pd_handle_t pd,
+ IN ib_qp_handle_t qp,
+ IN uint16_t lid,
+ IN ib_gid_handle_t gid)
+{
+ struct ibv_qp_attr qp_attr;
+ ib_ah_handle_t ah;
+
+ if (qp->qp_type != IBV_QPT_UD)
+ return NULL;
+
+ dapl_os_memzero((void *)&qp_attr, sizeof(qp_attr));
+ qp_attr.qp_state = IBV_QP_STATE;
+
+ /* address handle. RC and UD */
+ qp_attr.ah_attr.dlid = ntohs(lid);
+ if (gid != NULL) {
+ qp_attr.ah_attr.is_global = 1;
+ qp_attr.ah_attr.grh.dgid.global.subnet_prefix =
+ ntohll(gid->global.subnet_prefix);
+ qp_attr.ah_attr.grh.dgid.global.interface_id =
+ ntohll(gid->global.interface_id);
+ qp_attr.ah_attr.grh.hop_limit = hca->ib_trans.hop_limit;
+ qp_attr.ah_attr.grh.traffic_class = hca->ib_trans.tclass;
+ }
+ qp_attr.ah_attr.sl = 0;
+ qp_attr.ah_attr.src_path_bits = 0;
+ qp_attr.ah_attr.port_num = hca->port_num;
+
+ /* UD: create AH for remote side */
+ ah = ibv_create_ah(pd, &qp_attr.ah_attr);
+ if (!ah) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " create_ah: ERR %s\n", strerror(errno));
+ return NULL;
+ }
+
+ dapl_log(DAPL_DBG_TYPE_CM,
+ " dapls_create_ah: AH %p for lid %x\n",
+ ah, qp_attr.ah_attr.dlid);
+
+ return ah;
+}
+
/*
* Local variables:
* c-indent-level: 4
diff --git a/dapl/openib_scm/cm.c b/dapl/openib_scm/cm.c
index 416ee71..e779d41 100644
--- a/dapl/openib_scm/cm.c
+++ b/dapl/openib_scm/cm.c
@@ -46,11 +46,6 @@
*
**************************************************************************/
-#if defined(_WIN32)
-#define FD_SETSIZE 1024
-#define DAPL_FD_SETSIZE FD_SETSIZE
-#endif
-
#include "dapl.h"
#include "dapl_adapter_util.h"
#include "dapl_evd_util.h"
@@ -252,7 +247,7 @@ dp_ib_cm_handle_t dapls_ib_cm_create(DAPL_EP *ep)
if (dapl_os_lock_init(&cm_ptr->lock))
goto bail;
- cm_ptr->dst.ver = htons(DCM_VER);
+ cm_ptr->msg.ver = htons(DCM_VER);
cm_ptr->socket = DAPL_INVALID_SOCKET;
cm_ptr->ep = ep;
return cm_ptr;
@@ -437,7 +432,7 @@ DAT_RETURN dapli_socket_disconnect(dp_ib_cm_handle_t cm_ptr)
*/
static void dapli_socket_connected(dp_ib_cm_handle_t cm_ptr, int err)
{
- int ret, len, opt = 1;
+ int ret, len, exp, opt = 1;
struct iovec iov[2];
struct dapl_ep *ep_ptr = cm_ptr->ep;
@@ -450,56 +445,60 @@ static void dapli_socket_connected(dp_ib_cm_handle_t cm_ptr, int err)
ep_ptr->param.
remote_ia_address_ptr)->sin_addr),
ntohs(((struct sockaddr_in *)
- &cm_ptr->dst.ia_address)->sin_port));
+ &cm_ptr->msg.daddr.so)->sin_port));
goto bail;
}
- dapl_dbg_log(DAPL_DBG_TYPE_EP,
- " socket connected, write QP and private data\n");
/* no delay for small packets */
ret = setsockopt(cm_ptr->socket, IPPROTO_TCP, TCP_NODELAY,
(char *)&opt, sizeof(opt));
if (ret)
dapl_log(DAPL_DBG_TYPE_WARN,
- " connected: NODELAY setsockopt: %s\n",
+ " CONN_PENDING: NODELAY setsockopt: %s\n",
strerror(errno));
/* send qp info and pdata to remote peer */
- iov[0].iov_base = (void *)&cm_ptr->dst;
- iov[0].iov_len = sizeof(ib_qp_cm_t);
- if (cm_ptr->dst.p_size) {
- iov[1].iov_base = cm_ptr->p_data;
- iov[1].iov_len = ntohl(cm_ptr->dst.p_size);
+ exp = sizeof(ib_cm_msg_t) - DCM_MAX_PDATA_SIZE;
+ iov[0].iov_base = (void *)&cm_ptr->msg;
+ iov[0].iov_len = exp;
+ if (cm_ptr->msg.p_size) {
+ iov[1].iov_base = cm_ptr->msg.p_data;
+ iov[1].iov_len = ntohs(cm_ptr->msg.p_size);
len = writev(cm_ptr->socket, iov, 2);
} else {
len = writev(cm_ptr->socket, iov, 1);
}
- if (len != (ntohl(cm_ptr->dst.p_size) + sizeof(ib_qp_cm_t))) {
+ if (len != (exp + ntohs(cm_ptr->msg.p_size))) {
dapl_log(DAPL_DBG_TYPE_ERR,
- " CONN_PENDING write: ERR %s, wcnt=%d -> %s\n",
- strerror(errno), len, inet_ntoa(((struct sockaddr_in *)
- ep_ptr->param.
- remote_ia_address_ptr)->
- sin_addr));
+ " CONN_PENDING len ERR %s, wcnt=%d(%d) -> %s\n",
+ strerror(errno), len,
+ exp + ntohs(cm_ptr->msg.p_size),
+ inet_ntoa(((struct sockaddr_in *)
+ ep_ptr->param.
+ remote_ia_address_ptr)->sin_addr));
goto bail;
}
- dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " connected: sending SRC port=0x%x lid=0x%x,"
+
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " CONN_PENDING: sending SRC port=%d lid=0x%x,"
" qpn=0x%x, psize=%d\n",
- ntohs(cm_ptr->dst.port), ntohs(cm_ptr->dst.lid),
- ntohl(cm_ptr->dst.qpn), ntohl(cm_ptr->dst.p_size));
+ cm_ptr->msg.saddr.ib.port_num,
+ ntohs(cm_ptr->msg.saddr.ib.lid),
+ ntohl(cm_ptr->msg.saddr.ib.qpn),
+ ntohs(cm_ptr->msg.p_size));
dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " connected: sending SRC GID subnet %016llx id %016llx\n",
+ " CONN_PENDING: SRC GID subnet %016llx id %016llx\n",
(unsigned long long)
- htonll(cm_ptr->dst.gid.global.subnet_prefix),
+ htonll(cm_ptr->msg.saddr.ib.gid.global.subnet_prefix),
(unsigned long long)
- htonll(cm_ptr->dst.gid.global.interface_id));
+ htonll(cm_ptr->msg.saddr.ib.gid.global.interface_id));
/* queue up to work thread to avoid blocking consumer */
cm_ptr->state = DCM_RTU_PENDING;
return;
- bail:
+
+bail:
/* close socket, free cm structure and post error event */
dapls_ib_cm_free(cm_ptr, cm_ptr->ep);
dapl_evd_connection_callback(NULL, IB_CME_LOCAL_FAILURE, NULL, ep_ptr);
@@ -554,25 +553,24 @@ dapli_socket_connect(DAPL_EP * ep_ptr,
return DAT_INVALID_ADDRESS;
}
- /* Send QP info, IA address, and private data */
- cm_ptr->dst.qpn = htonl(ep_ptr->qp_handle->qp_num);
-#ifdef DAT_EXTENSIONS
- cm_ptr->dst.qp_type = htons(ep_ptr->qp_handle->qp_type);
-#endif
- cm_ptr->dst.port = htons(ia_ptr->hca_ptr->port_num);
- cm_ptr->dst.lid = ia_ptr->hca_ptr->ib_trans.lid;
- cm_ptr->dst.gid = ia_ptr->hca_ptr->ib_trans.gid;
+ /* REQ: QP info in msg.saddr, IA address in msg.daddr, and pdata */
+ cm_ptr->msg.op = ntohs(DCM_REQ);
+ cm_ptr->msg.saddr.ib.qpn = htonl(ep_ptr->qp_handle->qp_num);
+ cm_ptr->msg.saddr.ib.qp_type = ep_ptr->qp_handle->qp_type;
+ cm_ptr->msg.saddr.ib.port_num = ia_ptr->hca_ptr->port_num;
+ cm_ptr->msg.saddr.ib.lid = ia_ptr->hca_ptr->ib_trans.lid;
+ cm_ptr->msg.saddr.ib.gid = ia_ptr->hca_ptr->ib_trans.gid;
/* save references */
cm_ptr->hca = ia_ptr->hca_ptr;
cm_ptr->ep = ep_ptr;
- cm_ptr->dst.ia_address = ia_ptr->hca_ptr->hca_address;
+ cm_ptr->msg.daddr.so = ia_ptr->hca_ptr->hca_address;
((struct sockaddr_in *)
- &cm_ptr->dst.ia_address)->sin_port = ntohs(r_qual);
+ &cm_ptr->msg.daddr.so)->sin_port = ntohs((uint16_t)r_qual);
if (p_size) {
- cm_ptr->dst.p_size = htonl(p_size);
- dapl_os_memcpy(cm_ptr->p_data, p_data, p_size);
+ cm_ptr->msg.p_size = htons(p_size);
+ dapl_os_memcpy(cm_ptr->msg.p_data, p_data, p_size);
}
/* connected or pending, either way results via async event */
@@ -581,18 +579,22 @@ dapli_socket_connect(DAPL_EP * ep_ptr,
else
cm_ptr->state = DCM_CONN_PENDING;
+ dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: p_data=%p %p\n",
+ cm_ptr->msg.p_data, cm_ptr->msg.p_data);
+
dapl_dbg_log(DAPL_DBG_TYPE_EP,
- " connect: socket %d to %s r_qual %d pending\n",
- cm_ptr->socket,
- inet_ntoa(addr.sin_addr), (unsigned int)r_qual);
+ " connect: %s r_qual %d pending, p_sz=%d, %d %d ...\n",
+ inet_ntoa(addr.sin_addr), (unsigned int)r_qual,
+ ntohs(cm_ptr->msg.p_size),
+ cm_ptr->msg.p_data[0], cm_ptr->msg.p_data[1]);
dapli_cm_queue(cm_ptr);
return DAT_SUCCESS;
- bail:
+
+bail:
dapl_log(DAPL_DBG_TYPE_ERR,
- " socket connect ERROR: %s query lid(0x%x)/gid"
- " -> %s r_qual %d\n",
- strerror(errno), ntohs(cm_ptr->dst.lid),
+ " connect ERROR: %s -> %s r_qual %d\n",
+ strerror(errno),
inet_ntoa(((struct sockaddr_in *)r_addr)->sin_addr),
(unsigned int)r_qual);
@@ -607,64 +609,60 @@ dapli_socket_connect(DAPL_EP * ep_ptr,
static void dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)
{
DAPL_EP *ep_ptr = cm_ptr->ep;
- int len;
- short rtu_data = htons(0x0E0F);
- ib_cm_events_t event = IB_CME_DESTINATION_REJECT;
+ int len, exp = sizeof(ib_cm_msg_t) - DCM_MAX_PDATA_SIZE;
+ ib_cm_events_t event = IB_CME_LOCAL_FAILURE;
/* read DST information into cm_ptr, overwrite SRC info */
dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect_rtu: recv peer QP data\n");
- len = recv(cm_ptr->socket, (char *)&cm_ptr->dst, sizeof(ib_qp_cm_t), 0);
- if (len != sizeof(ib_qp_cm_t) || ntohs(cm_ptr->dst.ver) != DCM_VER) {
+ len = recv(cm_ptr->socket, (char *)&cm_ptr->msg, exp, 0);
+ if (len != exp || ntohs(cm_ptr->msg.ver) != DCM_VER) {
dapl_log(DAPL_DBG_TYPE_ERR,
" CONN_RTU read: ERR %s, rcnt=%d, ver=%d -> %s\n",
- strerror(errno), len, cm_ptr->dst.ver,
+ strerror(errno), len, cm_ptr->msg.ver,
inet_ntoa(((struct sockaddr_in *)
ep_ptr->param.remote_ia_address_ptr)->
sin_addr));
goto bail;
}
- /* convert peer response values to host order */
- cm_ptr->dst.port = ntohs(cm_ptr->dst.port);
- cm_ptr->dst.lid = ntohs(cm_ptr->dst.lid);
- cm_ptr->dst.qpn = ntohl(cm_ptr->dst.qpn);
-#ifdef DAT_EXTENSIONS
- cm_ptr->dst.qp_type = ntohs(cm_ptr->dst.qp_type);
-#endif
- cm_ptr->dst.p_size = ntohl(cm_ptr->dst.p_size);
-
- /* save remote address information */
+ /* keep the QP, address info in network order */
+
+ /* save remote address information, in msg.daddr */
dapl_os_memcpy(&ep_ptr->remote_ia_address,
- &cm_ptr->dst.ia_address,
- sizeof(ep_ptr->remote_ia_address));
+ &cm_ptr->msg.daddr.so,
+ sizeof(union dcm_addr));
dapl_dbg_log(DAPL_DBG_TYPE_EP,
- " CONN_RTU: DST %s port=0x%x lid=0x%x,"
+ " CONN_RTU: DST %s %d port=0x%x lid=0x%x,"
" qpn=0x%x, qp_type=%d, psize=%d\n",
inet_ntoa(((struct sockaddr_in *)
- &cm_ptr->dst.ia_address)->sin_addr),
- cm_ptr->dst.port, cm_ptr->dst.lid,
- cm_ptr->dst.qpn, cm_ptr->dst.qp_type, cm_ptr->dst.p_size);
+ &cm_ptr->msg.daddr.so)->sin_addr),
+ ntohs(((struct sockaddr_in *)
+ &cm_ptr->msg.daddr.so)->sin_port),
+ cm_ptr->msg.saddr.ib.port_num,
+ ntohs(cm_ptr->msg.saddr.ib.lid),
+ ntohl(cm_ptr->msg.saddr.ib.qpn),
+ cm_ptr->msg.saddr.ib.qp_type,
+ ntohs(cm_ptr->msg.p_size));
/* validate private data size before reading */
- if (cm_ptr->dst.p_size > IB_MAX_REP_PDATA_SIZE) {
+ if (ntohs(cm_ptr->msg.p_size) > DCM_MAX_PDATA_SIZE) {
dapl_log(DAPL_DBG_TYPE_ERR,
" CONN_RTU read: psize (%d) wrong -> %s\n",
- cm_ptr->dst.p_size, inet_ntoa(((struct sockaddr_in *)
- ep_ptr->param.
- remote_ia_address_ptr)->
- sin_addr));
+ ntohs(cm_ptr->msg.p_size),
+ inet_ntoa(((struct sockaddr_in *)
+ ep_ptr->param.
+ remote_ia_address_ptr)->sin_addr));
goto bail;
}
/* read private data into cm_handle if any present */
- dapl_dbg_log(DAPL_DBG_TYPE_EP,
- " socket connected, read private data\n");
- if (cm_ptr->dst.p_size) {
- len =
- recv(cm_ptr->socket, cm_ptr->p_data, cm_ptr->dst.p_size, 0);
- if (len != cm_ptr->dst.p_size) {
+ dapl_dbg_log(DAPL_DBG_TYPE_EP," CONN_RTU: read private data\n");
+ exp = ntohs(cm_ptr->msg.p_size);
+ if (exp) {
+ len = recv(cm_ptr->socket, cm_ptr->msg.p_data, exp, 0);
+ if (len != exp) {
dapl_log(DAPL_DBG_TYPE_ERR,
" CONN_RTU read pdata: ERR %s, rcnt=%d -> %s\n",
strerror(errno), len,
@@ -675,17 +673,22 @@ static void dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)
}
}
- /* check for consumer reject */
- if (cm_ptr->dst.rej) {
+ /* check for consumer or protocol stack reject */
+ if (ntohs(cm_ptr->msg.op) == DCM_REP)
+ event = IB_CME_CONNECTED;
+ else if (ntohs(cm_ptr->msg.op) == DCM_REJ_USER)
+ event = IB_CME_DESTINATION_REJECT_PRIVATE_DATA;
+ else
+ event = IB_CME_DESTINATION_REJECT;
+
+ if (event != IB_CME_CONNECTED) {
dapl_log(DAPL_DBG_TYPE_CM,
- " CONN_RTU read: PEER REJ reason=0x%x -> %s\n",
- ntohs(cm_ptr->dst.rej),
+ " CONN_RTU: reject from %s\n",
inet_ntoa(((struct sockaddr_in *)
- ep_ptr->param.remote_ia_address_ptr)->
- sin_addr));
- event = IB_CME_DESTINATION_REJECT_PRIVATE_DATA;
+ ep_ptr->param.
+ remote_ia_address_ptr)->sin_addr));
#ifdef DAT_EXTENSIONS
- if (cm_ptr->dst.qp_type == IBV_QPT_UD)
+ if (cm_ptr->msg.saddr.ib.qp_type == IBV_QPT_UD)
goto ud_bail;
else
#endif
@@ -695,32 +698,39 @@ static void dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)
/* modify QP to RTR and then to RTS with remote info */
dapl_os_lock(&ep_ptr->header.lock);
if (dapls_modify_qp_state(ep_ptr->qp_handle,
- IBV_QPS_RTR, cm_ptr) != DAT_SUCCESS) {
+ IBV_QPS_RTR,
+ cm_ptr->msg.saddr.ib.qpn,
+ cm_ptr->msg.saddr.ib.lid,
+ NULL) != DAT_SUCCESS) {
dapl_log(DAPL_DBG_TYPE_ERR,
" CONN_RTU: QPS_RTR ERR %s -> %s\n",
- strerror(errno), inet_ntoa(((struct sockaddr_in *)
- ep_ptr->param.
- remote_ia_address_ptr)->
- sin_addr));
+ strerror(errno),
+ inet_ntoa(((struct sockaddr_in *)
+ ep_ptr->param.
+ remote_ia_address_ptr)->sin_addr));
dapl_os_unlock(&ep_ptr->header.lock);
goto bail;
}
if (dapls_modify_qp_state(ep_ptr->qp_handle,
- IBV_QPS_RTS, cm_ptr) != DAT_SUCCESS) {
+ IBV_QPS_RTS,
+ cm_ptr->msg.saddr.ib.qpn,
+ cm_ptr->msg.saddr.ib.lid,
+ NULL) != DAT_SUCCESS) {
dapl_log(DAPL_DBG_TYPE_ERR,
" CONN_RTU: QPS_RTS ERR %s -> %s\n",
- strerror(errno), inet_ntoa(((struct sockaddr_in *)
- ep_ptr->param.
- remote_ia_address_ptr)->
- sin_addr));
+ strerror(errno),
+ inet_ntoa(((struct sockaddr_in *)
+ ep_ptr->param.
+ remote_ia_address_ptr)->sin_addr));
dapl_os_unlock(&ep_ptr->header.lock);
goto bail;
}
dapl_os_unlock(&ep_ptr->header.lock);
dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect_rtu: send RTU\n");
- /* complete handshake after final QP state change */
- if (send(cm_ptr->socket, (char *)&rtu_data, sizeof(rtu_data), 0) == -1) {
+ /* complete handshake after final QP state change, Just ver+op */
+ cm_ptr->msg.op = ntohs(DCM_RTU);
+ if (send(cm_ptr->socket, (char *)&cm_ptr->msg, 4, 0) == -1) {
dapl_log(DAPL_DBG_TYPE_ERR,
" CONN_RTU: write error = %s\n", strerror(errno));
goto bail;
@@ -732,30 +742,41 @@ static void dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)
#ifdef DAT_EXTENSIONS
ud_bail:
- if (cm_ptr->dst.qp_type == IBV_QPT_UD) {
+ if (cm_ptr->msg.saddr.ib.qp_type == IBV_QPT_UD) {
DAT_IB_EXTENSION_EVENT_DATA xevent;
+ ib_pd_handle_t pd_handle =
+ ((DAPL_PZ *)ep_ptr->param.pz_handle)->pd_handle;
+
+ cm_ptr->ah = dapls_create_ah(cm_ptr->hca, pd_handle,
+ ep_ptr->qp_handle,
+ cm_ptr->msg.saddr.ib.lid,
+ NULL);
+ if (!cm_ptr->ah) {
+ event = IB_CME_LOCAL_FAILURE;
+ goto bail;
+ }
/* post EVENT, modify_qp created ah */
xevent.status = 0;
xevent.type = DAT_IB_UD_REMOTE_AH;
xevent.remote_ah.ah = cm_ptr->ah;
- xevent.remote_ah.qpn = cm_ptr->dst.qpn;
+ xevent.remote_ah.qpn = cm_ptr->msg.saddr.ib.qpn;
dapl_os_memcpy(&xevent.remote_ah.ia_addr,
- &cm_ptr->dst.ia_address,
- sizeof(cm_ptr->dst.ia_address));
+ &ep_ptr->remote_ia_address,
+ sizeof(union dcm_addr));
if (event == IB_CME_CONNECTED)
event = DAT_IB_UD_CONNECTION_EVENT_ESTABLISHED;
else
event = DAT_IB_UD_CONNECTION_REJECT_EVENT;
- dapls_evd_post_connection_event_ext((DAPL_EVD *) ep_ptr->param.
- connect_evd_handle,
- event,
- (DAT_EP_HANDLE) ep_ptr,
- (DAT_COUNT) cm_ptr->dst.p_size,
- (DAT_PVOID *) cm_ptr->p_data,
- (DAT_PVOID *) &xevent);
+ dapls_evd_post_connection_event_ext(
+ (DAPL_EVD *) ep_ptr->param.connect_evd_handle,
+ event,
+ (DAT_EP_HANDLE) ep_ptr,
+ (DAT_COUNT) cm_ptr->msg.p_size,
+ (DAT_PVOID *) cm_ptr->msg.p_data,
+ (DAT_PVOID *) &xevent);
/* done with socket, don't destroy cm_ptr, need pdata */
closesocket(cm_ptr->socket);
@@ -766,17 +787,17 @@ ud_bail:
{
ep_ptr->cm_handle = cm_ptr; /* only RC, multi CR's on UD */
dapl_evd_connection_callback(cm_ptr,
- IB_CME_CONNECTED,
- cm_ptr->p_data, ep_ptr);
+ event,
+ cm_ptr->msg.p_data, ep_ptr);
}
return;
bail:
/* close socket, and post error event */
- dapls_modify_qp_state(ep_ptr->qp_handle, IBV_QPS_ERR, 0);
+ dapls_modify_qp_state(ep_ptr->qp_handle, IBV_QPS_ERR, 0, 0, 0);
closesocket(cm_ptr->socket);
cm_ptr->socket = DAPL_INVALID_SOCKET;
- dapl_evd_connection_callback(NULL, event, cm_ptr->p_data, ep_ptr);
+ dapl_evd_connection_callback(NULL, event, cm_ptr->msg.p_data, ep_ptr);
}
/*
@@ -856,8 +877,6 @@ static void dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
dp_ib_cm_handle_t acm_ptr;
int ret, len, opt = 1;
- dapl_dbg_log(DAPL_DBG_TYPE_EP, " socket_accept\n");
-
/*
* Accept all CR's on this port to avoid half-connection (SYN_RCV)
* stalls with many to one connection storms
@@ -870,25 +889,28 @@ static void dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
acm_ptr->sp = cm_ptr->sp;
acm_ptr->hca = cm_ptr->hca;
- len = sizeof(acm_ptr->dst.ia_address);
+ len = sizeof(union dcm_addr);
acm_ptr->socket = accept(cm_ptr->socket,
(struct sockaddr *)
- &acm_ptr->dst.ia_address,
- (socklen_t *) & len);
+ &acm_ptr->msg.daddr.so,
+ (socklen_t *) &len);
if (acm_ptr->socket == DAPL_INVALID_SOCKET) {
dapl_log(DAPL_DBG_TYPE_ERR,
- " accept: ERR %s on FD %d l_cr %p\n",
+ " ACCEPT: ERR %s on FD %d l_cr %p\n",
strerror(errno), cm_ptr->socket, cm_ptr);
dapls_ib_cm_free(acm_ptr, acm_ptr->ep);
return;
}
+ dapl_dbg_log(DAPL_DBG_TYPE_CM, " accepting from %s\n",
+ inet_ntoa(((struct sockaddr_in *)
+ &acm_ptr->msg.daddr.so)->sin_addr));
/* no delay for small packets */
ret = setsockopt(acm_ptr->socket, IPPROTO_TCP, TCP_NODELAY,
(char *)&opt, sizeof(opt));
if (ret)
dapl_log(DAPL_DBG_TYPE_WARN,
- " accept: NODELAY setsockopt: %s\n",
+ " ACCEPT: NODELAY setsockopt: %s\n",
strerror(errno));
acm_ptr->state = DCM_ACCEPTING;
@@ -902,65 +924,57 @@ static void dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
*/
static void dapli_socket_accept_data(ib_cm_srvc_handle_t acm_ptr)
{
- int len;
+ int len, exp = sizeof(ib_cm_msg_t) - DCM_MAX_PDATA_SIZE;
void *p_data = NULL;
dapl_dbg_log(DAPL_DBG_TYPE_EP, " socket accepted, read QP data\n");
/* read in DST QP info, IA address. check for private data */
- len =
- recv(acm_ptr->socket, (char *)&acm_ptr->dst, sizeof(ib_qp_cm_t), 0);
- if (len != sizeof(ib_qp_cm_t) || ntohs(acm_ptr->dst.ver) != DCM_VER) {
+ len = recv(acm_ptr->socket, (char *)&acm_ptr->msg, exp, 0);
+ if (len != exp || ntohs(acm_ptr->msg.ver) != DCM_VER) {
dapl_log(DAPL_DBG_TYPE_ERR,
- " accept read: ERR %s, rcnt=%d, ver=%d\n",
- strerror(errno), len, ntohs(acm_ptr->dst.ver));
+ " ACCEPT read: ERR %s, rcnt=%d, ver=%d\n",
+ strerror(errno), len, ntohs(acm_ptr->msg.ver));
goto bail;
}
- /* convert accepted values to host order */
- acm_ptr->dst.port = ntohs(acm_ptr->dst.port);
- acm_ptr->dst.lid = ntohs(acm_ptr->dst.lid);
- acm_ptr->dst.qpn = ntohl(acm_ptr->dst.qpn);
-#ifdef DAT_EXTENSIONS
- acm_ptr->dst.qp_type = ntohs(acm_ptr->dst.qp_type);
-#endif
- acm_ptr->dst.p_size = ntohl(acm_ptr->dst.p_size);
-
- dapl_dbg_log(DAPL_DBG_TYPE_EP,
- " accept: DST %s port=0x%x lid=0x%x, qpn=0x%x, psize=%d\n",
- inet_ntoa(((struct sockaddr_in *)&acm_ptr->dst.
- ia_address)->sin_addr), acm_ptr->dst.port,
- acm_ptr->dst.lid, acm_ptr->dst.qpn, acm_ptr->dst.p_size);
+ /* keep the QP, address info in network order */
/* validate private data size before reading */
- if (acm_ptr->dst.p_size > IB_MAX_REQ_PDATA_SIZE) {
+ exp = ntohs(acm_ptr->msg.p_size);
+ if (exp > DCM_MAX_PDATA_SIZE) {
dapl_dbg_log(DAPL_DBG_TYPE_ERR,
" accept read: psize (%d) wrong\n",
- acm_ptr->dst.p_size);
+ acm_ptr->msg.p_size);
goto bail;
}
- dapl_dbg_log(DAPL_DBG_TYPE_EP, " socket accepted, read private data\n");
-
/* read private data into cm_handle if any present */
- if (acm_ptr->dst.p_size) {
- len =
- recv(acm_ptr->socket, acm_ptr->p_data, acm_ptr->dst.p_size,
- 0);
- if (len != acm_ptr->dst.p_size) {
+ if (exp) {
+ len = recv(acm_ptr->socket, acm_ptr->msg.p_data, exp, 0);
+ if (len != exp) {
dapl_log(DAPL_DBG_TYPE_ERR,
" accept read pdata: ERR %s, rcnt=%d\n",
strerror(errno), len);
goto bail;
}
- dapl_dbg_log(DAPL_DBG_TYPE_EP, " accept: psize=%d read\n", len);
- p_data = acm_ptr->p_data;
+ p_data = acm_ptr->msg.p_data;
}
acm_ptr->state = DCM_ACCEPTING_DATA;
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ " ACCEPT: DST %s %d port=%d lid=0x%x, qpn=0x%x, psz=%d\n",
+ inet_ntoa(((struct sockaddr_in *)
+ &acm_ptr->msg.daddr.so)->sin_addr),
+ ntohs(((struct sockaddr_in *)
+ &acm_ptr->msg.daddr.so)->sin_port),
+ acm_ptr->msg.saddr.ib.port_num,
+ ntohs(acm_ptr->msg.saddr.ib.lid),
+ ntohl(acm_ptr->msg.saddr.ib.qpn), exp);
+
#ifdef DAT_EXTENSIONS
- if (acm_ptr->dst.qp_type == IBV_QPT_UD) {
+ if (acm_ptr->msg.saddr.ib.qp_type == IBV_QPT_UD) {
DAT_IB_EXTENSION_EVENT_DATA xevent;
/* post EVENT, modify_qp created ah */
@@ -970,9 +984,9 @@ static void dapli_socket_accept_data(ib_cm_srvc_handle_t acm_ptr)
dapls_evd_post_cr_event_ext(acm_ptr->sp,
DAT_IB_UD_CONNECTION_REQUEST_EVENT,
acm_ptr,
- (DAT_COUNT) acm_ptr->dst.p_size,
- (DAT_PVOID *) acm_ptr->p_data,
- (DAT_PVOID *) & xevent);
+ (DAT_COUNT) exp,
+ (DAT_PVOID *) acm_ptr->msg.p_data,
+ (DAT_PVOID *) &xevent);
} else
#endif
/* trigger CR event and return SUCCESS */
@@ -980,8 +994,8 @@ static void dapli_socket_accept_data(ib_cm_srvc_handle_t acm_ptr)
IB_CME_CONNECTION_REQUEST_PENDING,
p_data, acm_ptr->sp);
return;
- bail:
- /* close socket, free cm structure, active will see socket close as reject */
+bail:
+ /* close socket, free cm structure, active will see close as rej */
dapls_ib_cm_free(acm_ptr, acm_ptr->ep);
return;
}
@@ -997,11 +1011,11 @@ dapli_socket_accept_usr(DAPL_EP * ep_ptr,
{
DAPL_IA *ia_ptr = ep_ptr->header.owner_ia;
dp_ib_cm_handle_t cm_ptr = cr_ptr->ib_cm_handle;
- ib_qp_cm_t local;
+ ib_cm_msg_t local;
struct iovec iov[2];
- int len;
+ int len, exp = sizeof(ib_cm_msg_t) - DCM_MAX_PDATA_SIZE;
- if (p_size > IB_MAX_REP_PDATA_SIZE)
+ if (p_size > DCM_MAX_PDATA_SIZE)
return DAT_LENGTH_ERROR;
/* must have a accepted socket */
@@ -1009,13 +1023,16 @@ dapli_socket_accept_usr(DAPL_EP * ep_ptr,
return DAT_INTERNAL_ERROR;
dapl_dbg_log(DAPL_DBG_TYPE_EP,
- " ACCEPT_USR: remote port=0x%x lid=0x%x"
+ " ACCEPT_USR: remote port=%d lid=0x%x"
" qpn=0x%x qp_type %d, psize=%d\n",
- cm_ptr->dst.port, cm_ptr->dst.lid,
- cm_ptr->dst.qpn, cm_ptr->dst.qp_type, cm_ptr->dst.p_size);
+ cm_ptr->msg.saddr.ib.port_num,
+ ntohs(cm_ptr->msg.saddr.ib.lid),
+ ntohl(cm_ptr->msg.saddr.ib.qpn),
+ cm_ptr->msg.saddr.ib.qp_type,
+ ntohs(cm_ptr->msg.p_size));
#ifdef DAT_EXTENSIONS
- if (cm_ptr->dst.qp_type == IBV_QPT_UD &&
+ if (cm_ptr->msg.saddr.ib.qp_type == IBV_QPT_UD &&
ep_ptr->qp_handle->qp_type != IBV_QPT_UD) {
dapl_dbg_log(DAPL_DBG_TYPE_ERR,
" ACCEPT_USR: ERR remote QP is UD,"
@@ -1027,22 +1044,28 @@ dapli_socket_accept_usr(DAPL_EP * ep_ptr,
/* modify QP to RTR and then to RTS with remote info already read */
dapl_os_lock(&ep_ptr->header.lock);
if (dapls_modify_qp_state(ep_ptr->qp_handle,
- IBV_QPS_RTR, cm_ptr) != DAT_SUCCESS) {
+ IBV_QPS_RTR,
+ cm_ptr->msg.saddr.ib.qpn,
+ cm_ptr->msg.saddr.ib.lid,
+ NULL) != DAT_SUCCESS) {
dapl_log(DAPL_DBG_TYPE_ERR,
" ACCEPT_USR: QPS_RTR ERR %s -> %s\n",
- strerror(errno), inet_ntoa(((struct sockaddr_in *)
- &cm_ptr->dst.ia_address)->
- sin_addr));
+ strerror(errno),
+ inet_ntoa(((struct sockaddr_in *)
+ &cm_ptr->msg.daddr.so)->sin_addr));
dapl_os_unlock(&ep_ptr->header.lock);
goto bail;
}
if (dapls_modify_qp_state(ep_ptr->qp_handle,
- IBV_QPS_RTS, cm_ptr) != DAT_SUCCESS) {
+ IBV_QPS_RTS,
+ cm_ptr->msg.saddr.ib.qpn,
+ cm_ptr->msg.saddr.ib.lid,
+ NULL) != DAT_SUCCESS) {
dapl_log(DAPL_DBG_TYPE_ERR,
" ACCEPT_USR: QPS_RTS ERR %s -> %s\n",
- strerror(errno), inet_ntoa(((struct sockaddr_in *)
- &cm_ptr->dst.ia_address)->
- sin_addr));
+ strerror(errno),
+ inet_ntoa(((struct sockaddr_in *)
+ &cm_ptr->msg.daddr.so)->sin_addr));
dapl_os_unlock(&ep_ptr->header.lock);
goto bail;
}
@@ -1050,53 +1073,50 @@ dapli_socket_accept_usr(DAPL_EP * ep_ptr,
/* save remote address information */
dapl_os_memcpy(&ep_ptr->remote_ia_address,
- &cm_ptr->dst.ia_address,
- sizeof(ep_ptr->remote_ia_address));
+ &cm_ptr->msg.daddr.so,
+ sizeof(union dcm_addr));
/* send our QP info, IA address, pdata. Don't overwrite dst data */
local.ver = htons(DCM_VER);
- local.rej = 0;
- local.qpn = htonl(ep_ptr->qp_handle->qp_num);
- local.qp_type = htons(ep_ptr->qp_handle->qp_type);
- local.port = htons(ia_ptr->hca_ptr->port_num);
- local.lid = ia_ptr->hca_ptr->ib_trans.lid;
- local.gid = ia_ptr->hca_ptr->ib_trans.gid;
- local.ia_address = ia_ptr->hca_ptr->hca_address;
- ((struct sockaddr_in *)&local.ia_address)->sin_port =
- ntohs(cm_ptr->sp->conn_qual);
-
- local.p_size = htonl(p_size);
+ local.op = htons(DCM_REP);
+ local.saddr.ib.qpn = htonl(ep_ptr->qp_handle->qp_num);
+ local.saddr.ib.qp_type = ep_ptr->qp_handle->qp_type;
+ local.saddr.ib.port_num = ia_ptr->hca_ptr->port_num;
+ local.saddr.ib.lid = ia_ptr->hca_ptr->ib_trans.lid;
+ local.saddr.ib.gid = ia_ptr->hca_ptr->ib_trans.gid;
+ local.daddr.so = ia_ptr->hca_ptr->hca_address;
+ ((struct sockaddr_in *)&local.daddr.so)->sin_port =
+ htons((uint16_t)cm_ptr->sp->conn_qual);
+
+ local.p_size = htons(p_size);
iov[0].iov_base = (void *)&local;
- iov[0].iov_len = sizeof(ib_qp_cm_t);
+ iov[0].iov_len = exp;
if (p_size) {
iov[1].iov_base = p_data;
iov[1].iov_len = p_size;
len = writev(cm_ptr->socket, iov, 2);
- } else {
+ } else
len = writev(cm_ptr->socket, iov, 1);
- }
-
- if (len != (p_size + sizeof(ib_qp_cm_t))) {
+
+ if (len != (p_size + exp)) {
dapl_log(DAPL_DBG_TYPE_ERR,
" ACCEPT_USR: ERR %s, wcnt=%d -> %s\n",
- strerror(errno), len, inet_ntoa(((struct sockaddr_in *)
- &cm_ptr->dst.
- ia_address)->
- sin_addr));
+ strerror(errno), len,
+ inet_ntoa(((struct sockaddr_in *)
+ &cm_ptr->msg.daddr.so)->sin_addr));
goto bail;
}
dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " ACCEPT_USR: local port=0x%x lid=0x%x"
- " qpn=0x%x psize=%d\n",
- ntohs(local.port), ntohs(local.lid),
- ntohl(local.qpn), ntohl(local.p_size));
+ " ACCEPT_USR: local port=%d lid=0x%x qpn=0x%x psz=%d\n",
+ local.saddr.ib.port_num, ntohs(local.saddr.ib.lid),
+ ntohl(local.saddr.ib.qpn), ntohs(local.p_size));
dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " ACCEPT_USR SRC GID subnet %016llx id %016llx\n",
+ " ACCEPT_USR: SRC GID subnet %016llx id %016llx\n",
(unsigned long long)
- htonll(local.gid.global.subnet_prefix),
+ htonll(local.saddr.ib.gid.global.subnet_prefix),
(unsigned long long)
- htonll(local.gid.global.interface_id));
+ htonll(local.saddr.ib.gid.global.interface_id));
/* save state and reference to EP, queue for RTU data */
cm_ptr->ep = ep_ptr;
@@ -1107,7 +1127,7 @@ dapli_socket_accept_usr(DAPL_EP * ep_ptr,
return DAT_SUCCESS;
bail:
dapls_ib_cm_free(cm_ptr, cm_ptr->ep);
- dapls_modify_qp_state(ep_ptr->qp_handle, IBV_QPS_ERR, 0);
+ dapls_modify_qp_state(ep_ptr->qp_handle, IBV_QPS_ERR, 0, 0, 0);
return DAT_INTERNAL_ERROR;
}
@@ -1117,16 +1137,15 @@ dapli_socket_accept_usr(DAPL_EP * ep_ptr,
void dapli_socket_accept_rtu(dp_ib_cm_handle_t cm_ptr)
{
int len;
- short rtu_data = 0;
- /* complete handshake after final QP state change */
- len = recv(cm_ptr->socket, (char *)&rtu_data, sizeof(rtu_data), 0);
- if (len != sizeof(rtu_data) || ntohs(rtu_data) != 0x0e0f) {
+ /* complete handshake after final QP state change, VER and OP */
+ len = recv(cm_ptr->socket, (char *)&cm_ptr->msg, 4, 0);
+ if (len != 4 || ntohs(cm_ptr->msg.op) != DCM_RTU) {
dapl_log(DAPL_DBG_TYPE_ERR,
- " ACCEPT_RTU: ERR %s, rcnt=%d rdata=%x\n",
- strerror(errno), len, ntohs(rtu_data),
+ " ACCEPT_RTU: rcv ERR, rcnt=%d op=%x\n",
+ len, ntohs(cm_ptr->msg.op),
inet_ntoa(((struct sockaddr_in *)
- &cm_ptr->dst.ia_address)->sin_addr));
+ &cm_ptr->msg.daddr.so)->sin_addr));
goto bail;
}
@@ -1137,25 +1156,26 @@ void dapli_socket_accept_rtu(dp_ib_cm_handle_t cm_ptr)
dapl_dbg_log(DAPL_DBG_TYPE_EP, " PASSIVE: connected!\n");
#ifdef DAT_EXTENSIONS
- if (cm_ptr->dst.qp_type == IBV_QPT_UD) {
+ if (cm_ptr->msg.saddr.ib.qp_type == IBV_QPT_UD) {
DAT_IB_EXTENSION_EVENT_DATA xevent;
/* post EVENT, modify_qp created ah */
xevent.status = 0;
xevent.type = DAT_IB_UD_PASSIVE_REMOTE_AH;
xevent.remote_ah.ah = cm_ptr->ah;
- xevent.remote_ah.qpn = cm_ptr->dst.qpn;
+ xevent.remote_ah.qpn = cm_ptr->msg.saddr.ib.qpn;
dapl_os_memcpy(&xevent.remote_ah.ia_addr,
- &cm_ptr->dst.ia_address,
- sizeof(cm_ptr->dst.ia_address));
-
- dapls_evd_post_connection_event_ext((DAPL_EVD *) cm_ptr->ep->
- param.connect_evd_handle,
- DAT_IB_UD_CONNECTION_EVENT_ESTABLISHED,
- (DAT_EP_HANDLE) cm_ptr->ep,
- (DAT_COUNT) cm_ptr->dst.p_size,
- (DAT_PVOID *) cm_ptr->p_data,
- (DAT_PVOID *) &xevent);
+ &cm_ptr->msg.daddr.so,
+ sizeof(union dcm_addr));
+
+ dapls_evd_post_connection_event_ext(
+ (DAPL_EVD *)
+ cm_ptr->ep->param.connect_evd_handle,
+ DAT_IB_UD_CONNECTION_EVENT_ESTABLISHED,
+ (DAT_EP_HANDLE) cm_ptr->ep,
+ (DAT_COUNT) cm_ptr->msg.p_size,
+ (DAT_PVOID *) cm_ptr->msg.p_data,
+ (DAT_PVOID *) &xevent);
/* done with socket, don't destroy cm_ptr, need pdata */
closesocket(cm_ptr->socket);
@@ -1169,7 +1189,7 @@ void dapli_socket_accept_rtu(dp_ib_cm_handle_t cm_ptr)
return;
bail:
- dapls_modify_qp_state(cm_ptr->ep->qp_handle, IBV_QPS_ERR, 0);
+ dapls_modify_qp_state(cm_ptr->ep->qp_handle, IBV_QPS_ERR, 0, 0, 0);
dapls_ib_cm_free(cm_ptr, cm_ptr->ep);
dapls_cr_callback(cm_ptr, IB_CME_DESTINATION_REJECT, NULL, cm_ptr->sp);
}
@@ -1237,7 +1257,7 @@ dapls_ib_disconnect(IN DAPL_EP * ep_ptr, IN DAT_CLOSE_FLAGS close_flags)
"dapls_ib_disconnect(ep_handle %p ....)\n", ep_ptr);
/* Transition to error state to flush queue */
- dapls_modify_qp_state(ep_ptr->qp_handle, IBV_QPS_ERR, 0);
+ dapls_modify_qp_state(ep_ptr->qp_handle, IBV_QPS_ERR, 0, 0, 0);
if (ep_ptr->cm_handle == NULL ||
ep_ptr->param.ep_state == DAT_EP_STATE_DISCONNECTED)
@@ -1429,19 +1449,16 @@ dapls_ib_reject_connection(IN dp_ib_cm_handle_t cm_ptr,
" reject(cm %p reason %x, pdata %p, psize %d)\n",
cm_ptr, reason, pdata, psize);
- if (psize > IB_MAX_REJ_PDATA_SIZE)
+ if (psize > DCM_MAX_PDATA_SIZE)
return DAT_LENGTH_ERROR;
/* write reject data to indicate reject */
if (cm_ptr->socket != DAPL_INVALID_SOCKET) {
- cm_ptr->dst.rej = (uint16_t) reason;
- cm_ptr->dst.rej = htons(cm_ptr->dst.rej);
- cm_ptr->dst.p_size = htonl(psize);
- /* get qp_type from request */
- cm_ptr->dst.qp_type = ntohs(cm_ptr->dst.qp_type);
-
- iov[0].iov_base = (void *)&cm_ptr->dst;
- iov[0].iov_len = sizeof(ib_qp_cm_t);
+ cm_ptr->msg.op = htons(DCM_REJ_USER);
+ cm_ptr->msg.p_size = htons(psize);
+
+ iov[0].iov_base = (void *)&cm_ptr->msg;
+ iov[0].iov_len = sizeof(ib_cm_msg_t) - DCM_MAX_PDATA_SIZE;
if (psize) {
iov[1].iov_base = pdata;
iov[1].iov_len = psize;
@@ -1457,10 +1474,7 @@ dapls_ib_reject_connection(IN dp_ib_cm_handle_t cm_ptr,
/* cr_thread will destroy CR */
cm_ptr->state = DCM_DESTROY;
- if (send(cm_ptr->hca->ib_trans.scm[1], "w", sizeof "w", 0) == -1)
- dapl_log(DAPL_DBG_TYPE_CM,
- " cm_destroy: thread wakeup error = %s\n",
- strerror(errno));
+ send(cm_ptr->hca->ib_trans.scm[1], "w", sizeof "w", 0);
return DAT_SUCCESS;
}
@@ -1501,7 +1515,7 @@ dapls_ib_cm_remote_addr(IN DAT_HANDLE dat_handle,
return DAT_INVALID_HANDLE;
dapl_os_memcpy(remote_ia_address,
- &ib_cm_handle->dst.ia_address, sizeof(DAT_SOCK_ADDR6));
+ &ib_cm_handle->msg.daddr.so, sizeof(DAT_SOCK_ADDR6));
return DAT_SUCCESS;
}
@@ -1533,38 +1547,16 @@ int dapls_ib_private_data_size(IN DAPL_PRIVATE * prd_ptr,
int size;
switch (conn_op) {
- case DAPL_PDATA_CONN_REQ:
- {
- size = IB_MAX_REQ_PDATA_SIZE;
- break;
- }
- case DAPL_PDATA_CONN_REP:
- {
- size = IB_MAX_REP_PDATA_SIZE;
- break;
- }
- case DAPL_PDATA_CONN_REJ:
- {
- size = IB_MAX_REJ_PDATA_SIZE;
+ case DAPL_PDATA_CONN_REQ:
+ case DAPL_PDATA_CONN_REP:
+ case DAPL_PDATA_CONN_REJ:
+ case DAPL_PDATA_CONN_DREQ:
+ case DAPL_PDATA_CONN_DREP:
+ size = DCM_MAX_PDATA_SIZE;
break;
- }
- case DAPL_PDATA_CONN_DREQ:
- {
- size = IB_MAX_DREQ_PDATA_SIZE;
- break;
- }
- case DAPL_PDATA_CONN_DREP:
- {
- size = IB_MAX_DREP_PDATA_SIZE;
- break;
- }
- default:
- {
+ default:
size = 0;
- }
-
- } /* end case */
-
+ }
return size;
}
@@ -1717,27 +1709,26 @@ void cr_thread(void *arg)
continue;
event = (cr->state == DCM_CONN_PENDING) ?
- DAPL_FD_WRITE : DAPL_FD_READ;
+ DAPL_FD_WRITE : DAPL_FD_READ;
+
if (dapl_fd_set(cr->socket, set, event)) {
dapl_log(DAPL_DBG_TYPE_ERR,
" cr_thread: DESTROY CR st=%d fd %d"
" -> %s\n", cr->state, cr->socket,
inet_ntoa(((struct sockaddr_in *)
- &cr->dst.ia_address)->
- sin_addr));
+ &cr->msg.daddr.so)->sin_addr));
dapls_ib_cm_free(cr, cr->ep);
continue;
}
dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " poll cr=%p, socket=%d\n", cr,
- cr->socket);
+ " poll cr=%p, sck=%d\n", cr, cr->socket);
dapl_os_unlock(&hca_ptr->ib_trans.lock);
ret = dapl_poll(cr->socket, event);
dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " poll ret=0x%x cr->state=%d socket=%d\n",
+ " poll ret=0x%x cr->state=%d sck=%d\n",
ret, cr->state, cr->socket);
/* data on listen, qp exchange, and on disc req */
@@ -1783,7 +1774,7 @@ void cr_thread(void *arg)
" poll=%d cr->st=%s sk=%d ep %p, %d\n",
ret, dapl_cm_state_str(cr->state),
cr->socket, cr->ep,
- cr->ep ? cr->ep->param.ep_state:0);
+ cr->ep ? cr->ep->param.ep_state : 0);
dapli_socket_disconnect(cr);
}
dapl_os_lock(&hca_ptr->ib_trans.lock);
@@ -1846,17 +1837,17 @@ void dapls_print_cm_list(IN DAPL_IA *ia_ptr)
printf( " CONN[%d]: sp %p ep %p sock %d %s %s %s %s %d\n",
i, cr->sp, cr->ep, cr->socket,
- cr->dst.qp_type == IBV_QPT_RC ? "RC" : "UD",
+ cr->msg.saddr.ib.qp_type == IBV_QPT_RC ? "RC" : "UD",
dapl_cm_state_str(cr->state),
cr->sp ? "<-" : "->",
cr->state == DCM_LISTEN ?
inet_ntoa(((struct sockaddr_in *)
&ia_ptr->hca_ptr->hca_address)->sin_addr) :
inet_ntoa(((struct sockaddr_in *)
- &cr->dst.ia_address)->sin_addr),
+ &cr->msg.daddr.so)->sin_addr),
cr->sp ? (int)cr->sp->conn_qual :
ntohs(((struct sockaddr_in *)
- &cr->dst.ia_address)->sin_port));
+ &cr->msg.daddr.so)->sin_port));
i++;
}
printf("\n");
diff --git a/dapl/openib_scm/dapl_ib_util.h b/dapl/openib_scm/dapl_ib_util.h
index 933364c..d6950fa 100644
--- a/dapl/openib_scm/dapl_ib_util.h
+++ b/dapl/openib_scm/dapl_ib_util.h
@@ -40,8 +40,7 @@ struct ib_cm_handle
struct dapl_hca *hca;
struct dapl_sp *sp;
struct dapl_ep *ep;
- ib_qp_cm_t dst;
- unsigned char p_data[256]; /* must follow ib_qp_cm_t */
+ ib_cm_msg_t msg;
struct ibv_ah *ah;
};
@@ -66,15 +65,6 @@ typedef dp_ib_cm_handle_t ib_cm_srvc_handle_t;
#define SCM_HOP_LIMIT 0xff
#define SCM_TCLASS 0
-/* CM private data areas */
-#define IB_MAX_REQ_PDATA_SIZE 92
-#define IB_MAX_REP_PDATA_SIZE 196
-#define IB_MAX_REJ_PDATA_SIZE 148
-#define IB_MAX_DREQ_PDATA_SIZE 220
-#define IB_MAX_DREP_PDATA_SIZE 224
-#define IB_MAX_RTU_PDATA_SIZE 224
-
-
/* ib_hca_transport_t, specific to this implementation */
typedef struct _ib_hca_transport
{
@@ -120,11 +110,8 @@ void dapli_cq_thread_destroy(struct dapl_hca *hca_ptr);
void dapli_async_event_cb(struct _ib_hca_transport *tp);
void dapli_cq_event_cb(struct _ib_hca_transport *tp);
DAT_RETURN dapli_socket_disconnect(dp_ib_cm_handle_t cm_ptr);
-void dapls_print_cm_list(IN DAPL_IA *ia_ptr);
dp_ib_cm_handle_t dapls_ib_cm_create(DAPL_EP *ep);
void dapls_ib_cm_free(dp_ib_cm_handle_t cm, DAPL_EP *ep);
-DAT_RETURN dapls_modify_qp_state(IN ib_qp_handle_t qp_handle,
- IN ib_qp_state_t qp_state,
- IN dp_ib_cm_handle_t cm);
+void dapls_print_cm_list(IN DAPL_IA *ia_ptr);
#endif /* _DAPL_IB_UTIL_H_ */
diff --git a/dapl/openib_ucm/README b/dapl/openib_ucm/README
new file mode 100644
index 0000000..239dfe6
--- /dev/null
+++ b/dapl/openib_ucm/README
@@ -0,0 +1,40 @@
+
+OpenIB uDAPL provider using socket-based CM, in leiu of uCM/uAT, to setup QP/channels.
+
+to build:
+
+cd dapl/udapl
+make VERBS=openib_scm clean
+make VERBS=openib_scm
+
+
+Modifications to common code:
+
+- added dapl/openib_scm directory
+
+ dapl/udapl/Makefile
+
+New files for openib_scm provider
+
+ dapl/openib/dapl_ib_cq.c
+ dapl/openib/dapl_ib_dto.h
+ dapl/openib/dapl_ib_mem.c
+ dapl/openib/dapl_ib_qp.c
+ dapl/openib/dapl_ib_util.c
+ dapl/openib/dapl_ib_util.h
+ dapl/openib/dapl_ib_cm.c
+
+A simple dapl test just for openib_scm testing...
+
+ test/dtest/dtest.c
+ test/dtest/makefile
+
+ server: dtest -s
+ client: dtest -h hostname
+
+known issues:
+
+ no memory windows support in ibverbs, dat_create_rmr fails.
+
+
+
diff --git a/dapl/openib_ucm/SOURCES b/dapl/openib_ucm/SOURCES
new file mode 100644
index 0000000..dfe956f
--- /dev/null
+++ b/dapl/openib_ucm/SOURCES
@@ -0,0 +1,53 @@
+!if $(FREEBUILD)
+TARGETNAME=dapl2-ofa-ucm
+!else
+TARGETNAME=dapl2-ofa-ucmd
+!endif
+
+TARGETPATH = ..\..\..\..\bin\user\obj$(BUILD_ALT_DIR)
+TARGETTYPE = DYNLINK
+DLLENTRY = _DllMainCRTStartup
+
+!if $(_NT_TOOLS_VERSION) == 0x700
+DLLDEF=$O\udapl_ofa_ucm_exports.def
+!else
+DLLDEF=$(OBJ_PATH)\$O\udapl_ofa_ucm_exports.def
+!endif
+
+USE_MSVCRT = 1
+
+SOURCES = \
+ udapl.rc \
+ ..\dapl_common_src.c \
+ ..\dapl_udapl_src.c \
+ dapl_ib_cq.c \
+ dapl_ib_extensions.c \
+ dapl_ib_mem.c \
+ dapl_ib_qp.c \
+ dapl_ib_util.c \
+ dapl_ib_cm.c
+
+INCLUDES = ..\include;..\common;windows;..\..\dat\include;\
+ ..\..\dat\udat\windows;..\udapl\windows;\
+ ..\..\..\..\inc;..\..\..\..\inc\user;..\..\..\libibverbs\include
+
+DAPL_OPTS = -DEXPORT_DAPL_SYMBOLS -DDAT_EXTENSIONS -DSOCK_CM -DOPENIB -DCQ_WAIT_OBJECT
+
+USER_C_FLAGS = $(USER_C_FLAGS) $(DAPL_OPTS)
+
+!if !$(FREEBUILD)
+USER_C_FLAGS = $(USER_C_FLAGS) -DDAPL_DBG
+!endif
+
+TARGETLIBS= \
+ $(SDK_LIB_PATH)\kernel32.lib \
+ $(SDK_LIB_PATH)\ws2_32.lib \
+!if $(FREEBUILD)
+ $(TARGETPATH)\*\dat2.lib \
+ $(TARGETPATH)\*\libibverbs.lib
+!else
+ $(TARGETPATH)\*\dat2d.lib \
+ $(TARGETPATH)\*\libibverbsd.lib
+!endif
+
+MSC_WARNING_LEVEL = /W1 /wd4113
diff --git a/dapl/openib_ucm/cm.c b/dapl/openib_ucm/cm.c
new file mode 100644
index 0000000..ab3823e
--- /dev/null
+++ b/dapl/openib_ucm/cm.c
@@ -0,0 +1,1837 @@
+/*
+ * Copyright (c) 2009 Intel Corporation. All rights reserved.
+ *
+ * This Software is licensed under one of the following licenses:
+ *
+ * 1) under the terms of the "Common Public License 1.0" a copy of which is
+ * available from the Open Source Initiative, see
+ * http://www.opensource.org/licenses/cpl.php.
+ *
+ * 2) under the terms of the "The BSD License" a copy of which is
+ * available from the Open Source Initiative, see
+ * http://www.opensource.org/licenses/bsd-license.php.
+ *
+ * 3) under the terms of the "GNU General Public License (GPL) Version 2" a
+ * copy of which is available from the Open Source Initiative, see
+ * http://www.opensource.org/licenses/gpl-license.php.
+ *
+ * Licensee has the right to choose one of the above licenses.
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice and one of the license notices.
+ *
+ * Redistributions in binary form must reproduce both the above copyright
+ * notice, one of the license notices in the documentation
+ * and/or other materials provided with the distribution.
+ */
+
+#include "dapl.h"
+#include "dapl_adapter_util.h"
+#include "dapl_evd_util.h"
+#include "dapl_cr_util.h"
+#include "dapl_name_service.h"
+#include "dapl_ib_util.h"
+#include "dapl_osd.h"
+
+
+#if defined(_WIN32) || defined(_WIN64)
+enum DAPL_FD_EVENTS {
+ DAPL_FD_READ = 0x1,
+ DAPL_FD_WRITE = 0x2,
+ DAPL_FD_ERROR = 0x4
+};
+
+struct dapl_fd_set {
+ struct fd_set set[3];
+};
+
+static struct dapl_fd_set *dapl_alloc_fd_set(void)
+{
+ return dapl_os_alloc(sizeof(struct dapl_fd_set));
+}
+
+static void dapl_fd_zero(struct dapl_fd_set *set)
+{
+ FD_ZERO(&set->set[0]);
+ FD_ZERO(&set->set[1]);
+ FD_ZERO(&set->set[2]);
+}
+
+static int dapl_fd_set(DAPL_SOCKET s, struct dapl_fd_set *set,
+ enum DAPL_FD_EVENTS event)
+{
+ FD_SET(s, &set->set[(event == DAPL_FD_READ) ? 0 : 1]);
+ FD_SET(s, &set->set[2]);
+ return 0;
+}
+
+static enum DAPL_FD_EVENTS dapl_poll(DAPL_SOCKET s, enum DAPL_FD_EVENTS event)
+{
+ struct fd_set rw_fds;
+ struct fd_set err_fds;
+ struct timeval tv;
+ int ret;
+
+ FD_ZERO(&rw_fds);
+ FD_ZERO(&err_fds);
+ FD_SET(s, &rw_fds);
+ FD_SET(s, &err_fds);
+
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+
+ if (event == DAPL_FD_READ)
+ ret = select(1, &rw_fds, NULL, &err_fds, &tv);
+ else
+ ret = select(1, NULL, &rw_fds, &err_fds, &tv);
+
+ if (ret == 0)
+ return 0;
+ else if (ret == SOCKET_ERROR)
+ return WSAGetLastError();
+ else if (FD_ISSET(s, &rw_fds))
+ return event;
+ else
+ return DAPL_FD_ERROR;
+}
+
+static int dapl_select(struct dapl_fd_set *set)
+{
+ int ret;
+
+ dapl_dbg_log(DAPL_DBG_TYPE_CM, " dapl_select: sleep\n");
+ ret = select(0, &set->set[0], &set->set[1], &set->set[2], NULL);
+ dapl_dbg_log(DAPL_DBG_TYPE_CM, " dapl_select: wakeup\n");
+
+ if (ret == SOCKET_ERROR)
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " dapl_select: error 0x%x\n", WSAGetLastError());
+
+ return ret;
+}
+#else // _WIN32 || _WIN64
+enum DAPL_FD_EVENTS {
+ DAPL_FD_READ = POLLIN,
+ DAPL_FD_WRITE = POLLOUT,
+ DAPL_FD_ERROR = POLLERR
+};
+
+struct dapl_fd_set {
+ int index;
+ struct pollfd set[DAPL_FD_SETSIZE];
+};
+
+static struct dapl_fd_set *dapl_alloc_fd_set(void)
+{
+ return dapl_os_alloc(sizeof(struct dapl_fd_set));
+}
+
+static void dapl_fd_zero(struct dapl_fd_set *set)
+{
+ set->index = 0;
+}
+
+static int dapl_fd_set(DAPL_SOCKET s, struct dapl_fd_set *set,
+ enum DAPL_FD_EVENTS event)
+{
+ if (set->index == DAPL_FD_SETSIZE - 1) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ "SCM ERR: cm_thread exceeded FD_SETSIZE %d\n",
+ set->index + 1);
+ return -1;
+ }
+
+ set->set[set->index].fd = s;
+ set->set[set->index].revents = 0;
+ set->set[set->index++].events = event;
+ return 0;
+}
+
+static enum DAPL_FD_EVENTS dapl_poll(DAPL_SOCKET s, enum DAPL_FD_EVENTS event)
+{
+ struct pollfd fds;
+ int ret;
+
+ fds.fd = s;
+ fds.events = event;
+ fds.revents = 0;
+ ret = poll(&fds, 1, 0);
+ dapl_log(DAPL_DBG_TYPE_CM, " dapl_poll: fd=%d ret=%d, evnts=0x%x\n",
+ s, ret, fds.revents);
+ if (ret == 0)
+ return 0;
+ else if (fds.revents & (POLLERR | POLLHUP | POLLNVAL))
+ return DAPL_FD_ERROR;
+ else
+ return fds.revents;
+}
+
+static int dapl_select(struct dapl_fd_set *set)
+{
+ int ret;
+
+ dapl_dbg_log(DAPL_DBG_TYPE_CM, " dapl_select: sleep, fds=%d\n",
+ set->index);
+ ret = poll(set->set, set->index, -1);
+ dapl_dbg_log(DAPL_DBG_TYPE_CM, " dapl_select: wakeup, ret=0x%x\n", ret);
+ return ret;
+}
+#endif
+
+/* forward declarations */
+static void ucm_accept(ib_cm_srvc_handle_t cm, ib_cm_msg_t *msg);
+static void ucm_connect_rtu(dp_ib_cm_handle_t cm, ib_cm_msg_t *msg);
+static void ucm_accept_rtu(dp_ib_cm_handle_t cm, ib_cm_msg_t *msg);
+static int ucm_send(ib_hca_transport_t *tp, ib_cm_msg_t *msg);
+DAT_RETURN dapli_cm_disconnect(dp_ib_cm_handle_t cm);
+
+#define UCM_SND_BURST 100
+
+/* Service ids - port space */
+static uint16_t ucm_get_port(ib_hca_transport_t *tp, uint16_t port)
+{
+ int i = 0;
+
+ dapl_os_lock(&tp->plock);
+ /* get specific ID */
+ if (port) {
+ if (tp->sid[port] == 0) {
+ tp->sid[port] = 1;
+ i = port;
+ }
+ goto done;
+ }
+
+ /* get any free ID */
+ for (i = 0xffff; i > 0; i--) {
+ if (tp->sid[i] == 0) {
+ tp->sid[i] = 1;
+ break;
+ }
+ }
+done:
+ dapl_os_unlock(&tp->plock);
+ return i;
+}
+
+static void ucm_free_port(ib_hca_transport_t *tp, uint16_t port)
+{
+ dapl_os_lock(&tp->plock);
+ tp->sid[port] = 0;
+ dapl_os_unlock(&tp->plock);
+}
+
+/* SEND CM MESSAGE PROCESSING */
+
+/* Get CM UD message from send queue, called with s_lock held */
+static ib_cm_msg_t *ucm_get_smsg(ib_hca_transport_t *tp)
+{
+ ib_cm_msg_t *msg = NULL;
+ int ret, polled = 0, hd = tp->s_hd;
+
+ hd++;
+retry:
+ if (hd == tp->qpe)
+ hd = 0;
+
+ if (hd == tp->s_tl)
+ msg = NULL;
+ else {
+ msg = &tp->sbuf[hd];
+ tp->s_hd = hd; /* new hd */
+ }
+
+ /* if empty, process some completions */
+ if ((msg == NULL) && (!polled)) {
+ struct ibv_wc wc;
+
+ /* process completions, based on UCM_SND_BURST */
+ ret = ibv_poll_cq(tp->scq, 1, &wc);
+ if (ret < 0) {
+ dapl_log(DAPL_DBG_TYPE_WARN,
+ " get_smsg: cq %p %s\n",
+ tp->scq, strerror(errno));
+ }
+ /* free up completed sends, update tail */
+ if (ret > 0) {
+ tp->s_tl = (int)wc.wr_id;
+ dapl_log(DAPL_DBG_TYPE_CM,
+ " get_smsg: wr_cmp (%d) s_tl=%d\n",
+ wc.status, tp->s_tl);
+ }
+ polled++;
+ goto retry;
+ }
+ return msg;
+}
+
+/* RECEIVE CM MESSAGE PROCESSING */
+
+static int ucm_post_rmsg(ib_hca_transport_t *tp, ib_cm_msg_t *msg)
+{
+ struct ibv_recv_wr recv_wr, *recv_err;
+ struct ibv_sge sge;
+
+ recv_wr.next = NULL;
+ recv_wr.sg_list = &sge;
+ recv_wr.num_sge = 1;
+ recv_wr.wr_id = (uint64_t)(uintptr_t) msg;
+ sge.length = sizeof(ib_cm_msg_t) + sizeof(struct ibv_grh);
+ sge.lkey = tp->mr_rbuf->lkey;
+ sge.addr = (uintptr_t)((char *)msg - sizeof(struct ibv_grh));
+
+ return (ibv_post_recv(tp->qp, &recv_wr, &recv_err));
+}
+
+static int ucm_reject(ib_hca_transport_t *tp, ib_cm_msg_t *msg)
+{
+ ib_cm_msg_t smsg;
+
+ /* setup op, rearrange the src, dst cm and addr info */
+ (void)dapl_os_memzero(&smsg, sizeof(smsg));
+ smsg.ver = htons(DCM_VER);
+ smsg.op = htons(DCM_REJ_CM);
+ smsg.dport = msg->sport;
+ smsg.dqpn = msg->sqpn;
+ smsg.sport = msg->dport;
+ smsg.sqpn = msg->dqpn;
+
+ dapl_os_memcpy(&smsg.daddr, &msg->saddr, sizeof(union dcm_addr));
+ dapl_os_memcpy(&smsg.saddr, &msg->daddr, sizeof(union dcm_addr));
+
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " CM reject -> LID %x, QPN %x PORT %d\n",
+ ntohs(smsg.daddr.ib.lid),
+ ntohl(smsg.dqpn), ntohs(smsg.dport));
+
+ return (ucm_send(tp, &smsg));
+}
+
+static void ucm_process_recv(ib_hca_transport_t *tp,
+ ib_cm_msg_t *msg,
+ dp_ib_cm_handle_t cm)
+{
+ dapl_os_lock(&cm->lock);
+ switch (cm->state) {
+ case DCM_LISTEN:
+ dapl_dbg_log(DAPL_DBG_TYPE_CM, " ucm_recv: LISTEN\n");
+ dapl_os_unlock(&cm->lock);
+ ucm_accept(cm, msg);
+ break;
+ case DCM_ACCEPTED:
+ dapl_dbg_log(DAPL_DBG_TYPE_CM, " ucm_recv: ACCEPT_RTU\n");
+ dapl_os_unlock(&cm->lock);
+ ucm_accept_rtu(cm, msg);
+ break;
+ case DCM_CONN_PENDING:
+ dapl_dbg_log(DAPL_DBG_TYPE_CM, " ucm_recv: CONN_RTU\n");
+ dapl_os_unlock(&cm->lock);
+ ucm_connect_rtu(cm, msg);
+ break;
+ case DCM_CONNECTED:
+ dapl_dbg_log(DAPL_DBG_TYPE_CM, " ucm_recv: DREQ connect\n");
+ dapl_os_unlock(&cm->lock);
+ if (ntohs(msg->op) == DCM_DREQ)
+ dapli_cm_disconnect(cm);
+ break;
+ case DCM_DISC_PENDING:
+ case DCM_DESTROY:
+ dapl_dbg_log(DAPL_DBG_TYPE_CM, " ucm_recv: DREQ toss\n");
+ break;
+ default:
+ dapl_log(DAPL_DBG_TYPE_WARN,
+ " process_recv: UNKNOWN state"
+ " <- op %d, st %d spsp %d sqpn %d\n",
+ ntohs(msg->op), cm->state,
+ ntohs(msg->sport), ntohl(msg->sqpn));
+ dapl_os_unlock(&cm->lock);
+ break;
+ }
+}
+
+/* Find matching CM object for this receive message, return CM reference */
+dp_ib_cm_handle_t ucm_cm_find(ib_hca_transport_t *tp, ib_cm_msg_t *msg)
+{
+ dp_ib_cm_handle_t cm, next, found = NULL;
+ struct dapl_llist_entry *list;
+ DAPL_OS_LOCK lock;
+
+ /* connect request - listen list, otherwise conn list */
+ if (ntohs(msg->op) == DCM_REQ) {
+ dapl_dbg_log(DAPL_DBG_TYPE_CM," search - listenQ\n");
+ list = tp->llist;
+ lock = tp->llock;
+ } else {
+ dapl_dbg_log(DAPL_DBG_TYPE_CM," search - connectQ\n");
+ list = tp->list;
+ lock = tp->lock;
+ }
+
+ dapl_os_lock(&lock);
+ if (!dapl_llist_is_empty(&list))
+ next = dapl_llist_peek_head(&list);
+ else
+ next = NULL;
+
+ while (next) {
+ cm = next;
+ next = dapl_llist_next_entry(&list,
+ (DAPL_LLIST_ENTRY *)&cm->entry);
+ if (cm->state == DCM_DESTROY)
+ continue;
+
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " MATCH? cm %p st %s sport %x sqpn %x lid %x\n",
+ cm, dapl_cm_state_str(cm->state),
+ ntohs(cm->msg.sport), ntohl(cm->msg.sqpn),
+ ntohs(cm->msg.saddr.ib.lid));
+
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " src port %d=%d, sqp %x=%x slid %x=%x, iqp %x=%x\n",
+ ntohs(cm->msg.sport), ntohs(msg->dport),
+ ntohl(cm->msg.sqpn), ntohl(msg->dqpn),
+ ntohs(cm->msg.saddr.ib.lid),
+ ntohs(msg->daddr.ib.lid),
+ ntohl(cm->msg.saddr.ib.qpn),
+ ntohl(msg->daddr.ib.qpn));
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " dst port %d=%d, sqp %x=%x slid %x=%x, iqp %x=%x\n",
+ ntohs(cm->msg.dport), ntohs(msg->sport),
+ ntohl(cm->msg.dqpn), ntohl(msg->sqpn),
+ ntohs(cm->msg.daddr.ib.lid),
+ ntohs(msg->saddr.ib.lid),
+ ntohl(cm->msg.daddr.ib.qpn),
+ ntohl(msg->saddr.ib.qpn));
+
+ /* REQ: CM sPORT + QPN, match is good enough */
+ if ((cm->msg.sport == msg->dport) &&
+ (cm->msg.sqpn == msg->dqpn)) {
+ if (ntohs(msg->op) == DCM_REQ) {
+ found = cm;
+ break;
+ /* NOT REQ: add remote CM sPORT, QPN, LID match */
+ } else if ((cm->msg.dport == msg->sport) &&
+ (cm->msg.dqpn == msg->sqpn) &&
+ (cm->msg.daddr.ib.lid ==
+ msg->saddr.ib.lid)) {
+ found = cm;
+ break;
+ }
+ }
+ }
+ dapl_os_unlock(&lock);
+ return found;
+}
+
+/* Get rmsgs from CM completion queue, 10 at a time */
+static void ucm_recv(ib_hca_transport_t *tp)
+{
+ struct ibv_wc wc[10];
+ ib_cm_msg_t *msg;
+ dp_ib_cm_handle_t cm;
+ int i, ret, notify = 0;
+ struct ibv_cq *ibv_cq = NULL;
+ DAPL_HCA *hca;
+
+ /* POLLIN on channel FD */
+ ret = ibv_get_cq_event(tp->rch, &ibv_cq, (void *)&hca);
+ if (ret == 0) {
+ ibv_ack_cq_events(ibv_cq, 1);
+ }
+retry:
+ ret = ibv_poll_cq(tp->rcq, 10, wc);
+ if (ret <= 0) {
+ if (!ret && !notify) {
+ ibv_req_notify_cq(tp->rcq, 0);
+ notify = 1;
+ goto retry;
+ }
+ return;
+ } else
+ notify = 0;
+
+ for (i = 0; i < ret; i++) {
+ msg = (ib_cm_msg_t*)wc[i].wr_id;
+
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " ucm_recv: wc status=%d, ln=%d id=%p sqp=%x\n",
+ wc[i].status, wc[i].byte_len,
+ (void*)wc[i].wr_id, wc[i].src_qp);
+
+ /* validate CM message, version */
+ if (ntohs(msg->ver) != DCM_VER) {
+ dapl_log(DAPL_DBG_TYPE_WARN,
+ " ucm_recv: UNKNOWN msg %p, ver %d\n",
+ msg, msg->ver);
+ ucm_post_rmsg(tp, msg);
+ continue;
+ }
+ if (!(cm = ucm_cm_find(tp, msg))) {
+ dapl_log(DAPL_DBG_TYPE_CM,
+ " ucm_recv: NO MATCH op %d port %d cqp %x\n",
+ ntohs(msg->op), ntohs(msg->dport),
+ ntohl(msg->dqpn));
+ if (ntohs(msg->op) == DCM_REQ)
+ ucm_reject(tp, msg);
+ ucm_post_rmsg(tp, msg);
+ continue;
+ }
+ dapl_dbg_log(DAPL_DBG_TYPE_CM, " ucm_recv: match %p\n",cm);
+
+ /* match, process it */
+ ucm_process_recv(tp, msg, cm);
+ ucm_post_rmsg(tp, msg);
+ }
+
+ /* finished this batch of WC's, poll and rearm */
+ goto retry;
+
+}
+
+/* ACTIVE/PASSIVE: build and send CM message out of CM object */
+static int ucm_send(ib_hca_transport_t *tp, ib_cm_msg_t *msg)
+{
+ ib_cm_msg_t *smsg = NULL;
+ struct ibv_send_wr wr, *bad_wr;
+ struct ibv_sge sge;
+ int len, ret = -1;
+ uint16_t dlid = ntohs(msg->daddr.ib.lid);
+
+ /* Get message from send queue, copy data, and send */
+ dapl_os_lock(&tp->slock);
+ if ((smsg = ucm_get_smsg(tp)) == NULL)
+ goto bail;
+
+ len = ((sizeof(*msg) - DCM_MAX_PDATA_SIZE) + ntohs(msg->p_size));
+ dapl_os_memcpy(smsg, msg, len);
+
+ wr.next = NULL;
+ wr.sg_list = &sge;
+ wr.num_sge = 1;
+ wr.opcode = IBV_WR_SEND;
+ wr.wr_id = (unsigned long)tp->s_hd;
+ wr.send_flags = (wr.wr_id % UCM_SND_BURST) ? 0 : IBV_SEND_SIGNALED;
+ if (len <= tp->max_inline_send)
+ wr.send_flags |= IBV_SEND_INLINE;
+
+ sge.length = len;
+ sge.lkey = tp->mr_sbuf->lkey;
+ sge.addr = (uintptr_t)smsg;
+
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " ucm_send: op %d ln %d lid %x c_qpn %x rport %d\n",
+ ntohs(smsg->op), len, htons(smsg->daddr.ib.lid),
+ htonl(smsg->dqpn), htons(smsg->dport));
+
+ /* empty slot, then create AH */
+ if (!tp->ah[dlid]) {
+ tp->ah[dlid] =
+ dapls_create_ah(tp->hca, tp->pd, tp->qp,
+ htons(dlid), NULL);
+ if (!tp->ah[dlid])
+ goto bail;
+ }
+
+ wr.wr.ud.ah = tp->ah[dlid];
+ wr.wr.ud.remote_qpn = ntohl(smsg->dqpn);
+ wr.wr.ud.remote_qkey = DAT_UD_QKEY;
+
+ ret = ibv_post_send(tp->qp, &wr, &bad_wr);
+bail:
+ dapl_os_unlock(&tp->slock);
+ return ret;
+}
+
+/* ACTIVE/PASSIVE: CM objects */
+dp_ib_cm_handle_t dapls_ib_cm_create(DAPL_EP *ep)
+{
+ dp_ib_cm_handle_t cm;
+
+ /* Allocate CM, init lock, and initialize */
+ if ((cm = dapl_os_alloc(sizeof(*cm))) == NULL)
+ return NULL;
+
+ (void)dapl_os_memzero(cm, sizeof(*cm));
+ if (dapl_os_lock_init(&cm->lock))
+ goto bail;
+
+ cm->msg.ver = htons(DCM_VER);
+
+ /* ACTIVE: init source address QP info from local EP */
+ if (ep) {
+ DAPL_HCA *hca = ep->header.owner_ia->hca_ptr;
+
+ cm->msg.sport = htons(ucm_get_port(&hca->ib_trans, 0));
+ if (!cm->msg.sport)
+ goto bail;
+
+ /* IB info in network order */
+ cm->ep = ep;
+ cm->hca = hca;
+ cm->msg.sqpn = htonl(hca->ib_trans.qp->qp_num); /* ucm */
+ cm->msg.saddr.ib.qpn = htonl(ep->qp_handle->qp_num); /* ep */
+ cm->msg.saddr.ib.qp_type = ep->qp_handle->qp_type;
+ cm->msg.saddr.ib.port_num = hca->port_num;
+ cm->msg.saddr.ib.lid = hca->ib_trans.addr.ib.lid;
+ cm->msg.saddr.ib.gid = hca->ib_trans.addr.ib.gid;
+ }
+ return cm;
+bail:
+ dapl_os_free(cm, sizeof(*cm));
+ return NULL;
+}
+
+/*
+ * UD CR objects are kept active because of direct private data references
+ * from CONN events. The cr->socket is closed and marked inactive but the
+ * object remains allocated and queued on the CR resource list. There can
+ * be multiple CR's associated with a given EP. There is no way to determine
+ * when consumer is finished with event until the dat_ep_free.
+ *
+ * Schedule destruction for all CR's associated with this EP, cr_thread will
+ * complete the cleanup with state == DCM_DESTROY.
+ */
+static void ucm_ud_free(DAPL_EP *ep)
+{
+ DAPL_IA *ia = ep->header.owner_ia;
+ DAPL_HCA *hca = NULL;
+ ib_hca_transport_t *tp = &ia->hca_ptr->ib_trans;
+ dp_ib_cm_handle_t cr, next;
+
+ dapl_os_lock(&tp->lock);
+ if (!dapl_llist_is_empty((DAPL_LLIST_HEAD*)&tp->list))
+ next = dapl_llist_peek_head((DAPL_LLIST_HEAD*)&tp->list);
+ else
+ next = NULL;
+
+ while (next) {
+ cr = next;
+ next = dapl_llist_next_entry((DAPL_LLIST_HEAD*)&tp->list,
+ (DAPL_LLIST_ENTRY*)&cr->entry);
+ if (cr->ep == ep) {
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ " qp_free CR: ep %p cr %p\n", ep, cr);
+ dapl_os_lock(&cr->lock);
+ hca = cr->hca;
+ cr->ep = NULL;
+ cr->state = DCM_DESTROY;
+ dapl_os_unlock(&cr->lock);
+ }
+ }
+ dapl_os_unlock(&tp->lock);
+
+ /* wakeup work thread if necessary */
+ if (hca)
+ send(tp->scm[1], "w", sizeof "w", 0);
+}
+
+/* mark for destroy, remove all references, schedule cleanup */
+/* cm_ptr == NULL (UD), then multi CR's, kill all associated with EP */
+void dapls_ib_cm_free(dp_ib_cm_handle_t cm, DAPL_EP *ep)
+{
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " cm_destroy: cm %p ep %p\n", cm, ep);
+
+ if (!cm && ep)
+ return (ucm_ud_free(ep));
+
+ dapl_os_lock(&cm->lock);
+
+ /* client, release local conn id port */
+ if (!cm->sp && cm->msg.sport)
+ ucm_free_port(&cm->hca->ib_trans, cm->msg.sport);
+
+ /* cleanup, never made it to work queue */
+ if (cm->state == DCM_INIT) {
+ dapl_os_unlock(&cm->lock);
+ dapl_os_free(cm, sizeof(*cm));
+ return;
+ }
+
+ /* free could be called before disconnect, disc_clean will destroy */
+ if (cm->state == DCM_CONNECTED) {
+ dapl_os_unlock(&cm->lock);
+ dapli_cm_disconnect(cm);
+ return;
+ }
+
+ cm->state = DCM_DESTROY;
+ if ((cm->ep) && (cm->ep->cm_handle == cm)) {
+ cm->ep->cm_handle = IB_INVALID_HANDLE;
+ cm->ep = NULL;
+ }
+
+ dapl_os_unlock(&cm->lock);
+
+ /* wakeup work thread */
+ send(cm->hca->ib_trans.scm[1], "w", sizeof "w", 0);
+}
+
+/* ACTIVE/PASSIVE: queue up connection object on CM list */
+static void ucm_queue_conn(dp_ib_cm_handle_t cm)
+{
+ /* add to work queue, list, for cm thread processing */
+ dapl_llist_init_entry((DAPL_LLIST_ENTRY *)&cm->entry);
+ dapl_os_lock(&cm->hca->ib_trans.lock);
+ dapl_llist_add_tail(&cm->hca->ib_trans.list,
+ (DAPL_LLIST_ENTRY *)&cm->entry, cm);
+ dapl_os_unlock(&cm->hca->ib_trans.lock);
+}
+
+/* PASSIVE: queue up listen object on listen list */
+static void ucm_queue_listen(dp_ib_cm_handle_t cm)
+{
+ /* add to work queue, llist, for cm thread processing */
+ dapl_llist_init_entry((DAPL_LLIST_ENTRY *)&cm->entry);
+ dapl_os_lock(&cm->hca->ib_trans.llock);
+ dapl_llist_add_tail(&cm->hca->ib_trans.llist,
+ (DAPL_LLIST_ENTRY *)&cm->entry, cm);
+ dapl_os_unlock(&cm->hca->ib_trans.llock);
+}
+
+static void ucm_dequeue_listen(dp_ib_cm_handle_t cm) {
+ dapl_os_lock(&cm->hca->ib_trans.llock);
+ dapl_llist_remove_entry(&cm->hca->ib_trans.llist,
+ (DAPL_LLIST_ENTRY *)&cm->entry);
+ dapl_os_unlock(&cm->hca->ib_trans.llock);
+}
+
+/*
+ * ACTIVE/PASSIVE: called from CR thread or consumer via ep_disconnect
+ * or from ep_free
+ */
+DAT_RETURN dapli_cm_disconnect(dp_ib_cm_handle_t cm)
+{
+ DAPL_EP *ep = cm->ep;
+
+ if (ep == NULL)
+ return DAT_SUCCESS;
+
+ dapl_os_lock(&cm->lock);
+ if ((cm->state == DCM_INIT) ||
+ (cm->state == DCM_DISC_PENDING) ||
+ (cm->state == DCM_DISCONNECTED) ||
+ (cm->state == DCM_DESTROY)) {
+ dapl_os_unlock(&cm->lock);
+ return DAT_SUCCESS;
+ } else {
+ /* send disc, schedule destroy */
+ cm->msg.op = htons(DCM_DREQ);
+ if (ucm_send(&cm->hca->ib_trans, &cm->msg)) {
+ dapl_log(DAPL_DBG_TYPE_WARN,
+ " disc_req: ERR-> %s lid %d qpn %d"
+ " r_psp %d \n", strerror(errno),
+ htons(cm->msg.saddr.ib.lid),
+ htonl(cm->msg.saddr.ib.qpn),
+ htons(cm->msg.sport));
+ }
+ cm->state = DCM_DISC_PENDING;
+ }
+ dapl_os_unlock(&cm->lock);
+
+ /* disconnect events for RC's only */
+ if (ep->param.ep_attr.service_type == DAT_SERVICE_TYPE_RC) {
+ if (ep->cr_ptr) {
+ dapls_cr_callback(cm,
+ IB_CME_DISCONNECTED,
+ NULL,
+ ((DAPL_CR *)ep->cr_ptr)->sp_ptr);
+ } else {
+ dapl_evd_connection_callback(ep->cm_handle,
+ IB_CME_DISCONNECTED,
+ NULL, ep);
+ }
+ }
+
+ /* scheduled destroy via disconnect clean in callback */
+ return DAT_SUCCESS;
+}
+
+/*
+ * ACTIVE: get remote CM SID server info from r_addr.
+ * send, or resend CM msg via UD CM QP
+ */
+DAT_RETURN
+dapli_cm_connect(DAPL_EP *ep, dp_ib_cm_handle_t cm)
+{
+ dapl_log(DAPL_DBG_TYPE_EP,
+ " connect: lid %x qpn %x lport %d p_sz=%d -> "
+ " lid %x c_qpn %x rport %d\n",
+ htons(cm->msg.saddr.ib.lid), htonl(cm->msg.saddr.ib.qpn),
+ htons(cm->msg.sport), htons(cm->msg.p_size),
+ htons(cm->msg.daddr.ib.lid), htonl(cm->msg.dqpn),
+ htons(cm->msg.dport));
+
+ dapl_os_lock(&cm->lock);
+ if (cm->state == DCM_INIT)
+ cm->state = DCM_CONN_PENDING;
+ else if (++cm->retries == DCM_RETRY_CNT) {
+ dapl_log(DAPL_DBG_TYPE_WARN,
+ " connect: RETRIES EXHAUSTED -> lid %d qpn %d r_psp"
+ " %d p_sz=%d\n",
+ strerror(errno), htons(cm->msg.daddr.ib.lid),
+ htonl(cm->msg.dqpn), htons(cm->msg.dport),
+ htons(cm->msg.p_size));
+
+ /* update ep->cm reference so we get cleaned up on callback */
+ if (cm->msg.saddr.ib.qp_type == IBV_QPT_RC);
+ ep->cm_handle = cm;
+
+ dapl_os_unlock(&cm->lock);
+ dapl_evd_connection_callback(cm,
+ IB_CME_DESTINATION_UNREACHABLE,
+ NULL, ep);
+
+ return DAT_ERROR(DAT_INVALID_ADDRESS,
+ DAT_INVALID_ADDRESS_UNREACHABLE);
+ }
+ dapl_os_unlock(&cm->lock);
+
+ cm->msg.op = htons(DCM_REQ);
+ if (ucm_send(&cm->hca->ib_trans, &cm->msg))
+ goto bail;
+
+ /* first time through, put on work queue */
+ if (!cm->retries)
+ ucm_queue_conn(cm);
+
+ return DAT_SUCCESS;
+
+bail:
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " connect: ERR %s -> cm_lid %d cm_qpn %d r_psp %d p_sz=%d\n",
+ strerror(errno), htons(cm->msg.daddr.ib.lid),
+ htonl(cm->msg.dqpn), htons(cm->msg.dport),
+ htonl(cm->msg.p_size));
+
+ /* close socket, free cm structure */
+ dapls_ib_cm_free(cm, cm->ep);
+ return DAT_INSUFFICIENT_RESOURCES;
+}
+
+/*
+ * ACTIVE: exchange QP information, called from CR thread
+ */
+static void ucm_connect_rtu(dp_ib_cm_handle_t cm, ib_cm_msg_t *msg)
+{
+ DAPL_EP *ep = cm->ep;
+ ib_cm_events_t event = IB_CME_CONNECTED;
+
+ dapl_os_lock(&cm->lock);
+ if (cm->state != DCM_CONN_PENDING) {
+ dapl_log(DAPL_DBG_TYPE_WARN,
+ " CONN_RTU: UNEXPECTED state:"
+ " op %d, st %s <- lid %d sqpn %d sport %d\n",
+ ntohs(msg->op), dapl_cm_state_str(cm->state),
+ ntohs(msg->saddr.ib.lid), ntohl(msg->saddr.ib.qpn),
+ ntohs(msg->sport));
+ dapl_os_unlock(&cm->lock);
+ return;
+ }
+ dapl_os_unlock(&cm->lock);
+
+ /* save remote address information to EP and CM */
+ dapl_os_memcpy(&ep->remote_ia_address,
+ &msg->saddr, sizeof(union dcm_addr));
+ dapl_os_memcpy(&cm->msg.daddr,
+ &msg->saddr, sizeof(union dcm_addr));
+
+ /* validate private data size, and copy if necessary */
+ if (msg->p_size) {
+ if (ntohs(msg->p_size) > DCM_MAX_PDATA_SIZE) {
+ dapl_log(DAPL_DBG_TYPE_WARN,
+ " CONN_RTU: invalid p_size %d:"
+ " st %s <- lid %d sqpn %d spsp %d\n",
+ ntohs(msg->p_size),
+ dapl_cm_state_str(cm->state),
+ ntohs(msg->saddr.ib.lid),
+ ntohl(msg->saddr.ib.qpn),
+ ntohs(msg->sport));
+ goto bail;
+ }
+ dapl_os_memcpy(cm->msg.p_data,
+ msg->p_data, ntohs(msg->p_size));
+ }
+
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " CONN_RTU: DST port=%d lid=%x,"
+ " iqp=%x, qp_type=%d, port=%d psize=%d\n",
+ cm->msg.daddr.ib.port_num, ntohs(cm->msg.daddr.ib.lid),
+ ntohl(cm->msg.daddr.ib.qpn), cm->msg.daddr.ib.qp_type,
+ ntohs(msg->sport), ntohs(msg->p_size));
+
+ if (ntohs(msg->op) == DCM_REP)
+ event = IB_CME_CONNECTED;
+ else if (ntohs(msg->op) == DCM_REJ_USER)
+ event = IB_CME_DESTINATION_REJECT_PRIVATE_DATA;
+ else
+ event = IB_CME_DESTINATION_REJECT;
+
+ if (event != IB_CME_CONNECTED) {
+ dapl_log(DAPL_DBG_TYPE_CM,
+ " CONN_RTU: REJ op=%d <- lid %x, iqp %x, psp %d\n",
+ ntohs(msg->op), ntohs(msg->saddr.ib.lid),
+ ntohl(msg->saddr.ib.qpn), ntohs(msg->sport));
+#ifdef DAT_EXTENSIONS
+ if (cm->msg.daddr.ib.qp_type == IBV_QPT_UD)
+ goto ud_bail;
+ else
+#endif
+ goto bail;
+ }
+
+ /* modify QP to RTR and then to RTS with remote info */
+ dapl_os_lock(&cm->ep->header.lock);
+ if (dapls_modify_qp_state(cm->ep->qp_handle,
+ IBV_QPS_RTR,
+ cm->msg.daddr.ib.qpn,
+ cm->msg.daddr.ib.lid,
+ NULL) != DAT_SUCCESS) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " CONN_RTU: QPS_RTR ERR %s <- lid %x iqp %x\n",
+ strerror(errno), ntohs(cm->msg.daddr.ib.lid),
+ ntohl(cm->msg.daddr.ib.qpn));
+ dapl_os_unlock(&cm->ep->header.lock);
+ event = IB_CME_LOCAL_FAILURE;
+ goto bail;
+ }
+ if (dapls_modify_qp_state(cm->ep->qp_handle,
+ IBV_QPS_RTS,
+ cm->msg.daddr.ib.qpn,
+ cm->msg.daddr.ib.lid,
+ NULL) != DAT_SUCCESS) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " CONN_RTU: QPS_RTS ERR %s <- lid %x iqp %x\n",
+ strerror(errno), ntohs(cm->msg.daddr.ib.lid),
+ ntohl(cm->msg.daddr.ib.qpn));
+ dapl_os_unlock(&cm->ep->header.lock);
+ event = IB_CME_LOCAL_FAILURE;
+ goto bail;
+ }
+ dapl_os_unlock(&cm->ep->header.lock);
+
+ /* Send RTU */
+ cm->msg.op = htons(DCM_RTU);
+
+ if (ucm_send(&cm->hca->ib_trans, &cm->msg))
+ goto bail;
+
+ /* init cm_handle and post the event with private data */
+ cm->state = DCM_CONNECTED;
+ dapl_dbg_log(DAPL_DBG_TYPE_EP, " ACTIVE: connected!\n");
+
+#ifdef DAT_EXTENSIONS
+ud_bail:
+ if (cm->msg.daddr.ib.qp_type == IBV_QPT_UD) {
+ DAT_IB_EXTENSION_EVENT_DATA xevent;
+ uint16_t lid = ntohs(cm->msg.daddr.ib.lid);
+
+ /* post EVENT, modify_qp, AH already created, ucm msg */
+ xevent.status = 0;
+ xevent.type = DAT_IB_UD_REMOTE_AH;
+ xevent.remote_ah.ah = cm->hca->ib_trans.ah[lid];
+ xevent.remote_ah.qpn = cm->msg.daddr.ib.qpn;
+ dapl_os_memcpy(&xevent.remote_ah.ia_addr,
+ &cm->msg.daddr,
+ sizeof(union dcm_addr));
+
+ if (event == IB_CME_CONNECTED)
+ event = DAT_IB_UD_CONNECTION_EVENT_ESTABLISHED;
+ else
+ event = DAT_IB_UD_CONNECTION_REJECT_EVENT;
+
+ dapls_evd_post_connection_event_ext(
+ (DAPL_EVD *)cm->ep->param.connect_evd_handle,
+ event,
+ (DAT_EP_HANDLE)ep,
+ (DAT_COUNT)cm->msg.p_size,
+ (DAT_PVOID *)cm->msg.p_data,
+ (DAT_PVOID *)&xevent);
+
+ /* we are done, don't destroy cm_ptr, need pdata */
+ cm->state = DCM_RELEASED;
+ } else
+#endif
+ {
+ cm->ep->cm_handle = cm; /* only RC, multi CR's on UD */
+ dapl_evd_connection_callback(cm,
+ IB_CME_CONNECTED,
+ cm->msg.p_data, cm->ep);
+ }
+ return;
+
+bail:
+ if (cm->msg.saddr.ib.qp_type != IBV_QPT_UD)
+ dapls_ib_reinit_ep(cm->ep); /* reset QP state */
+ dapl_evd_connection_callback(NULL, event, cm->msg.p_data, cm->ep);
+}
+
+/*
+ * PASSIVE: Accept on listen CM PSP.
+ * create new CM object for this CR,
+ * receive peer QP information, private data,
+ * and post cr_event
+ */
+static void ucm_accept(ib_cm_srvc_handle_t cm, ib_cm_msg_t *msg)
+{
+ dp_ib_cm_handle_t acm;
+
+ /* Allocate accept CM and setup passive references */
+ if ((acm = dapls_ib_cm_create(NULL)) == NULL) {
+ dapl_log(DAPL_DBG_TYPE_WARN, " accept: ERR cm_create\n");
+ return;
+ }
+
+ /* dest CM info from CR msg, source CM info from listen */
+ acm->sp = cm->sp;
+ acm->hca = cm->hca;
+ acm->state = DCM_ACCEPTING;
+ acm->msg.dport = msg->sport;
+ acm->msg.dqpn = msg->sqpn;
+ acm->msg.sport = cm->msg.sport;
+ acm->msg.sqpn = cm->msg.sqpn;
+ acm->msg.p_size = msg->p_size;
+
+ /* CR saddr is CM daddr info, need EP for local saddr */
+ dapl_os_memcpy(&acm->msg.daddr, &msg->saddr, sizeof(union dcm_addr));
+
+ dapl_log(DAPL_DBG_TYPE_CM,
+ " accept: DST port=%d lid=%x, iqp=%x, psize=%d\n",
+ ntohs(acm->msg.dport), ntohs(acm->msg.daddr.ib.lid),
+ htonl(acm->msg.daddr.ib.qpn), htons(acm->msg.p_size));
+
+ /* validate private data size before reading */
+ if (ntohs(msg->p_size) > DCM_MAX_PDATA_SIZE) {
+ dapl_log(DAPL_DBG_TYPE_WARN, " accept: psize (%d) wrong\n",
+ ntohs(msg->p_size));
+ goto bail;
+ }
+
+ /* read private data into cm_handle if any present */
+ if (msg->p_size)
+ dapl_os_memcpy(acm->msg.p_data,
+ msg->p_data, ntohs(msg->p_size));
+
+ acm->state = DCM_ACCEPTING_DATA;
+ ucm_queue_conn(acm);
+
+#ifdef DAT_EXTENSIONS
+ if (acm->msg.daddr.ib.qp_type == IBV_QPT_UD) {
+ DAT_IB_EXTENSION_EVENT_DATA xevent;
+
+ /* post EVENT, modify_qp created ah */
+ xevent.status = 0;
+ xevent.type = DAT_IB_UD_CONNECT_REQUEST;
+
+ dapls_evd_post_cr_event_ext(acm->sp,
+ DAT_IB_UD_CONNECTION_REQUEST_EVENT,
+ acm,
+ (DAT_COUNT)acm->msg.p_size,
+ (DAT_PVOID *)acm->msg.p_data,
+ (DAT_PVOID *)&xevent);
+ } else
+#endif
+ /* trigger CR event and return SUCCESS */
+ dapls_cr_callback(acm,
+ IB_CME_CONNECTION_REQUEST_PENDING,
+ acm->msg.p_data, acm->sp);
+ return;
+
+bail:
+ /* free cm object */
+ dapls_ib_cm_free(acm, NULL);
+ return;
+}
+
+/*
+ * PASSIVE: read RTU from active peer, post CONN event
+ */
+static void ucm_accept_rtu(dp_ib_cm_handle_t cm, ib_cm_msg_t *msg)
+{
+ dapl_os_lock(&cm->lock);
+ if ((ntohs(msg->op) != DCM_RTU) || (cm->state != DCM_ACCEPTED)) {
+ dapl_log(DAPL_DBG_TYPE_WARN,
+ " accept_rtu: UNEXPECTED op, state:"
+ " op %d, st %s <- lid %x iqp %x sport %d\n",
+ ntohs(msg->op), dapl_cm_state_str(cm->state),
+ ntohs(msg->saddr.ib.lid), ntohl(msg->saddr.ib.qpn),
+ ntohs(msg->sport));
+ dapl_os_unlock(&cm->lock);
+ goto bail;
+ }
+ cm->state = DCM_CONNECTED;
+ dapl_os_unlock(&cm->lock);
+
+ if (msg->p_size)
+ dapl_os_memcpy(cm->msg.p_data,
+ msg->p_data, ntohs(msg->p_size));
+
+ /* final data exchange if remote QP state is good to go */
+ dapl_dbg_log(DAPL_DBG_TYPE_CM, " PASSIVE: connected!\n");
+
+#ifdef DAT_EXTENSIONS
+ if (cm->msg.saddr.ib.qp_type == IBV_QPT_UD) {
+ DAT_IB_EXTENSION_EVENT_DATA xevent;
+ uint16_t lid = ntohs(cm->msg.daddr.ib.lid);
+
+ /* post EVENT, modify_qp, AH already created, ucm msg */
+ xevent.status = 0;
+ xevent.type = DAT_IB_UD_PASSIVE_REMOTE_AH;
+ xevent.remote_ah.ah = cm->hca->ib_trans.ah[lid];
+ xevent.remote_ah.qpn = cm->msg.daddr.ib.qpn;
+ dapl_os_memcpy(&xevent.remote_ah.ia_addr,
+ &cm->msg.daddr,
+ sizeof(cm->msg.daddr));
+
+ dapls_evd_post_connection_event_ext(
+ (DAPL_EVD *)cm->ep->param.connect_evd_handle,
+ DAT_IB_UD_CONNECTION_EVENT_ESTABLISHED,
+ (DAT_EP_HANDLE)cm->ep,
+ (DAT_COUNT)cm->msg.p_size,
+ (DAT_PVOID *)cm->msg.p_data,
+ (DAT_PVOID *)&xevent);
+
+ /* done with CM object, don't destroy cm, need pdata */
+ cm->state = DCM_RELEASED;
+ } else {
+#endif
+ cm->ep->cm_handle = cm; /* only RC, multi CR's on UD */
+ dapls_cr_callback(cm, IB_CME_CONNECTED, NULL, cm->sp);
+ }
+ return;
+bail:
+ if (cm->msg.saddr.ib.qp_type != IBV_QPT_UD)
+ dapls_ib_reinit_ep(cm->ep); /* reset QP state */
+ dapls_ib_cm_free(cm, cm->ep);
+ dapls_cr_callback(cm, IB_CME_LOCAL_FAILURE, NULL, cm->sp);
+}
+
+/*
+ * PASSIVE: consumer accept, send local QP information, private data,
+ * queue on work thread to receive RTU information to avoid blocking
+ * user thread.
+ */
+DAT_RETURN
+dapli_accept_usr(DAPL_EP *ep, DAPL_CR *cr, DAT_COUNT p_size, DAT_PVOID p_data)
+{
+ DAPL_IA *ia = ep->header.owner_ia;
+ dp_ib_cm_handle_t cm = cr->ib_cm_handle;
+
+ if (p_size > DCM_MAX_PDATA_SIZE)
+ return DAT_LENGTH_ERROR;
+
+ dapl_os_lock(&cm->lock);
+ if (cm->state != DCM_ACCEPTING_DATA) {
+ dapl_os_unlock(&cm->lock);
+ return DAT_INVALID_STATE;
+ }
+ dapl_os_unlock(&cm->lock);
+
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " ACCEPT_USR: remote port_num=%d lid=%x"
+ " iqp=%x qp_type %d, psize=%d\n",
+ cm->msg.daddr.ib.port_num, cm->msg.daddr.ib.lid,
+ cm->msg.daddr.ib.qpn, cm->msg.daddr.ib.qp_type,
+ cm->msg.p_size);
+
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " ACCEPT_USR: remote GID subnet %016llx id %016llx\n",
+ (unsigned long long)
+ htonll(cm->msg.daddr.ib.gid.global.subnet_prefix),
+ (unsigned long long)
+ htonll(cm->msg.daddr.ib.gid.global.interface_id));
+
+#ifdef DAT_EXTENSIONS
+ if (cm->msg.daddr.ib.qp_type == IBV_QPT_UD &&
+ ep->qp_handle->qp_type != IBV_QPT_UD) {
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ " ACCEPT_USR: ERR remote QP is UD,"
+ ", but local QP is not\n");
+ return (DAT_INVALID_HANDLE | DAT_INVALID_HANDLE_EP);
+ }
+#endif
+
+ /* modify QP to RTR and then to RTS with remote info already read */
+ dapl_os_lock(&ep->header.lock);
+ if (dapls_modify_qp_state(ep->qp_handle,
+ IBV_QPS_RTR,
+ cm->msg.daddr.ib.qpn,
+ cm->msg.daddr.ib.lid,
+ NULL) != DAT_SUCCESS) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " ACCEPT_USR: QPS_RTR ERR %s -> lid %x qpn %x\n",
+ strerror(errno), ntohs(cm->msg.daddr.ib.lid),
+ ntohl(cm->msg.daddr.ib.qpn));
+ dapl_os_unlock(&ep->header.lock);
+ goto bail;
+ }
+ if (dapls_modify_qp_state(ep->qp_handle,
+ IBV_QPS_RTS,
+ cm->msg.daddr.ib.qpn,
+ cm->msg.daddr.ib.lid,
+ NULL) != DAT_SUCCESS) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " ACCEPT_USR: QPS_RTS ERR %s -> lid %x qpn %x\n",
+ strerror(errno), ntohs(cm->msg.daddr.ib.lid),
+ ntohl(cm->msg.daddr.ib.qpn));
+ dapl_os_unlock(&ep->header.lock);
+ goto bail;
+ }
+ dapl_os_unlock(&ep->header.lock);
+
+ /* save remote address information */
+ dapl_os_memcpy(&ep->remote_ia_address,
+ &cm->msg.saddr, sizeof(union dcm_addr));
+
+ /* setup local QP info and type from EP, copy pdata, for reply */
+ cm->msg.op = htons(DCM_REP);
+ cm->msg.saddr.ib.qpn = htonl(ep->qp_handle->qp_num);
+ cm->msg.saddr.ib.qp_type = htons(ep->qp_handle->qp_type);
+ cm->msg.saddr.ib.port_num = cm->hca->port_num;
+ cm->msg.saddr.ib.lid = cm->hca->ib_trans.addr.ib.lid;
+ cm->msg.saddr.ib.gid = cm->hca->ib_trans.addr.ib.gid;
+ dapl_os_memcpy(&cm->msg.p_data, p_data, p_size);
+
+ if (ucm_send(&cm->hca->ib_trans, &cm->msg))
+ goto bail;
+
+ /* save state and setup valid reference to EP, HCA */
+ dapl_os_lock(&cm->lock);
+ cm->ep = ep;
+ cm->hca = ia->hca_ptr;
+ cm->state = DCM_ACCEPTED;
+ dapl_os_unlock(&cm->lock);
+
+ dapl_dbg_log(DAPL_DBG_TYPE_CM, " PASSIVE: accepted!\n");
+ return DAT_SUCCESS;
+
+bail:
+ if (cm->msg.saddr.ib.qp_type != IBV_QPT_UD)
+ dapls_ib_reinit_ep(ep);
+ dapls_ib_cm_free(cm, ep);
+ return DAT_INTERNAL_ERROR;
+}
+
+
+/*
+ * dapls_ib_connect
+ *
+ * Initiate a connection with the passive listener on another node
+ *
+ * Input:
+ * ep_handle,
+ * remote_ia_address,
+ * remote_conn_qual,
+ * prd_size size of private data and structure
+ * prd_prt pointer to private data structure
+ *
+ * Output:
+ * none
+ *
+ * Returns:
+ * DAT_SUCCESS
+ * DAT_INSUFFICIENT_RESOURCES
+ * DAT_INVALID_PARAMETER
+ *
+ */
+DAT_RETURN
+dapls_ib_connect(IN DAT_EP_HANDLE ep_handle,
+ IN DAT_IA_ADDRESS_PTR r_addr,
+ IN DAT_CONN_QUAL r_psp,
+ IN DAT_COUNT p_size, IN void *p_data)
+{
+ DAPL_EP *ep = (DAPL_EP *)ep_handle;
+ dp_ib_cm_handle_t cm;
+
+ /* create CM object, initialize SRC info from EP */
+ cm = dapls_ib_cm_create(ep);
+ if (cm == NULL)
+ return DAT_INSUFFICIENT_RESOURCES;
+
+ /* remote hca and port: lid, gid, port_num, network order */
+ dapl_os_memcpy(&cm->msg.daddr, r_addr, sizeof(union dcm_addr));
+
+ /* remote uCM information, comes from consumer provider r_addr */
+ cm->msg.dport = htons((uint16_t)r_psp);
+ cm->msg.dqpn = cm->msg.daddr.ib.qpn;
+
+ if (p_size) {
+ cm->msg.p_size = htons(p_size);
+ dapl_os_memcpy(&cm->msg.p_data, p_data, p_size);
+ }
+
+ /* build connect request, send to remote CM based on r_addr info */
+ return(dapli_cm_connect(ep, cm));
+}
+
+/*
+ * dapls_ib_disconnect
+ *
+ * Disconnect an EP
+ *
+ * Input:
+ * ep_handle,
+ * disconnect_flags
+ *
+ * Output:
+ * none
+ *
+ * Returns:
+ * DAT_SUCCESS
+ */
+DAT_RETURN
+dapls_ib_disconnect(IN DAPL_EP *ep, IN DAT_CLOSE_FLAGS close_flags)
+{
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ "dapls_ib_disconnect(ep_handle %p ....)\n", ep);
+
+ /* reinit to modify QP state, if not UD */
+ if (ep->qp_handle->qp_type != IBV_QPT_UD)
+ dapls_ib_reinit_ep(ep);
+
+ if (ep->cm_handle == NULL ||
+ ep->param.ep_state == DAT_EP_STATE_DISCONNECTED)
+ return DAT_SUCCESS;
+ else
+ return (dapli_cm_disconnect(ep->cm_handle));
+}
+
+/*
+ * dapls_ib_disconnect_clean
+ *
+ * Clean up outstanding connection data. This routine is invoked
+ * after the final disconnect callback has occurred. Only on the
+ * ACTIVE side of a connection. It is also called if dat_ep_connect
+ * times out using the consumer supplied timeout value.
+ *
+ * Input:
+ * ep_ptr DAPL_EP
+ * active Indicates active side of connection
+ *
+ * Output:
+ * none
+ *
+ * Returns:
+ * void
+ *
+ */
+void
+dapls_ib_disconnect_clean(IN DAPL_EP *ep,
+ IN DAT_BOOLEAN active,
+ IN const ib_cm_events_t ib_cm_event)
+{
+ /* NOTE: SCM will only initialize cm_handle with RC type
+ *
+ * For UD there can many in-flight CR's so you
+ * cannot cleanup timed out CR's with EP reference
+ * alone since they share the same EP. The common
+ * code that handles connection timeout logic needs
+ * updated for UD support.
+ */
+ if (ep->cm_handle)
+ dapls_ib_cm_free(ep->cm_handle, ep);
+
+ return;
+}
+
+/*
+ * dapl_ib_setup_conn_listener
+ *
+ * Have the CM set up a connection listener.
+ *
+ * Input:
+ * ibm_hca_handle HCA handle
+ * qp_handle QP handle
+ *
+ * Output:
+ * none
+ *
+ * Returns:
+ * DAT_SUCCESS
+ * DAT_INSUFFICIENT_RESOURCES
+ * DAT_INTERNAL_ERROR
+ * DAT_CONN_QUAL_UNAVAILBLE
+ * DAT_CONN_QUAL_IN_USE
+ *
+ */
+DAT_RETURN
+dapls_ib_setup_conn_listener(IN DAPL_IA *ia,
+ IN DAT_UINT64 sid,
+ IN DAPL_SP *sp)
+{
+ ib_cm_srvc_handle_t cm = NULL;
+
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ " listen(ia %p ServiceID %d sp %p)\n",
+ ia, sid, sp);
+
+ /* reserve local port, then allocate CM object */
+ if (!ucm_get_port(&ia->hca_ptr->ib_trans, (uint16_t)sid)) {
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " listen: ERROR %s on conn_qual 0x%x\n",
+ strerror(errno), sid);
+ return DAT_CONN_QUAL_IN_USE;
+ }
+
+ /* cm_create will setup saddr for listen server */
+ if ((cm = dapls_ib_cm_create(NULL)) == NULL)
+ return DAT_INSUFFICIENT_RESOURCES;
+
+ /* LISTEN: init DST address and QP info to local CM server info */
+ cm->sp = sp;
+ cm->hca = ia->hca_ptr;
+ cm->msg.sport = htons((uint16_t)sid);
+ cm->msg.sqpn = htonl(ia->hca_ptr->ib_trans.qp->qp_num);
+ cm->msg.saddr.ib.qp_type = IBV_QPT_UD;
+ cm->msg.saddr.ib.port_num = ia->hca_ptr->port_num;
+ cm->msg.saddr.ib.lid = ia->hca_ptr->ib_trans.addr.ib.lid;
+ cm->msg.saddr.ib.gid = ia->hca_ptr->ib_trans.addr.ib.gid;
+
+ /* save cm_handle reference in service point */
+ sp->cm_srvc_handle = cm;
+
+ /* queue up listen socket to process inbound CR's */
+ cm->state = DCM_LISTEN;
+ ucm_queue_listen(cm);
+
+ return DAT_SUCCESS;
+}
+
+
+/*
+ * dapl_ib_remove_conn_listener
+ *
+ * Have the CM remove a connection listener.
+ *
+ * Input:
+ * ia_handle IA handle
+ * ServiceID IB Channel Service ID
+ *
+ * Output:
+ * none
+ *
+ * Returns:
+ * DAT_SUCCESS
+ * DAT_INVALID_STATE
+ *
+ */
+DAT_RETURN
+dapls_ib_remove_conn_listener(IN DAPL_IA *ia, IN DAPL_SP *sp)
+{
+ ib_cm_srvc_handle_t cm = sp->cm_srvc_handle;
+ ib_hca_transport_t *tp = &ia->hca_ptr->ib_trans;
+
+ /* free cm_srvc_handle and port, and mark CM for cleanup */
+ if (cm) {
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ " remove_listener(ia %p sp %p cm %p psp=%d)\n",
+ ia, sp, cm, ntohs(cm->msg.dport));
+
+ sp->cm_srvc_handle = NULL;
+ dapl_os_lock(&cm->lock);
+ ucm_free_port(tp, ntohs(cm->msg.dport));
+ cm->msg.dport = 0;
+ cm->state = DCM_DESTROY;
+ dapl_os_unlock(&cm->lock);
+ ucm_dequeue_listen(cm);
+ dapl_os_free(cm, sizeof(*cm));
+ }
+ return DAT_SUCCESS;
+}
+
+/*
+ * dapls_ib_accept_connection
+ *
+ * Perform necessary steps to accept a connection
+ *
+ * Input:
+ * cr_handle
+ * ep_handle
+ * private_data_size
+ * private_data
+ *
+ * Output:
+ * none
+ *
+ * Returns:
+ * DAT_SUCCESS
+ * DAT_INSUFFICIENT_RESOURCES
+ * DAT_INTERNAL_ERROR
+ *
+ */
+DAT_RETURN
+dapls_ib_accept_connection(IN DAT_CR_HANDLE cr_handle,
+ IN DAT_EP_HANDLE ep_handle,
+ IN DAT_COUNT p_size,
+ IN const DAT_PVOID p_data)
+{
+ DAPL_CR *cr = (DAPL_CR *)cr_handle;
+ DAPL_EP *ep = (DAPL_EP *)ep_handle;
+
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ " accept_connection(cr %p ep %p prd %p,%d)\n",
+ cr, ep, p_data, p_size);
+
+ /* allocate and attach a QP if necessary */
+ if (ep->qp_state == DAPL_QP_STATE_UNATTACHED) {
+ DAT_RETURN status;
+ status = dapls_ib_qp_alloc(ep->header.owner_ia,
+ ep, ep);
+ if (status != DAT_SUCCESS)
+ return status;
+ }
+ return (dapli_accept_usr(ep, cr, p_size, p_data));
+}
+
+/*
+ * dapls_ib_reject_connection
+ *
+ * Reject a connection
+ *
+ * Input:
+ * cr_handle
+ *
+ * Output:
+ * none
+ *
+ * Returns:
+ * DAT_SUCCESS
+ * DAT_INTERNAL_ERROR
+ *
+ */
+DAT_RETURN
+dapls_ib_reject_connection(IN dp_ib_cm_handle_t cm,
+ IN int reason,
+ IN DAT_COUNT psize, IN const DAT_PVOID pdata)
+{
+
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ " reject(cm %p reason %x, pdata %p, psize %d)\n",
+ cm, reason, pdata, psize);
+
+ if (psize > DCM_MAX_PDATA_SIZE)
+ return DAT_LENGTH_ERROR;
+
+ cm->msg.op = htons(DCM_REJ_USER);
+ if (psize)
+ dapl_os_memcpy(&cm->msg.p_data, pdata, psize);
+
+ if (ucm_send(&cm->hca->ib_trans, &cm->msg)) {
+ dapl_log(DAPL_DBG_TYPE_WARN,
+ " cm_reject: ERR: %s\n", strerror(errno));
+ return DAT_INTERNAL_ERROR;
+ }
+
+ /* cr_thread will destroy CR */
+ cm->state = DCM_REJECTING;
+ send(cm->hca->ib_trans.scm[1], "w", sizeof "w", 0);
+ return DAT_SUCCESS;
+}
+
+/*
+ * dapls_ib_cm_remote_addr
+ *
+ * Obtain the remote IP address given a connection
+ *
+ * Input:
+ * cr_handle
+ *
+ * Output:
+ * remote_ia_address: where to place the remote address
+ *
+ * Returns:
+ * DAT_SUCCESS
+ * DAT_INVALID_HANDLE
+ *
+ */
+DAT_RETURN
+dapls_ib_cm_remote_addr(IN DAT_HANDLE dat_handle,
+ OUT DAT_SOCK_ADDR6 * remote_ia_address)
+{
+ DAPL_HEADER *header;
+ dp_ib_cm_handle_t ib_cm_handle;
+
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ "dapls_ib_cm_remote_addr(dat_handle %p, ....)\n",
+ dat_handle);
+
+ header = (DAPL_HEADER *) dat_handle;
+
+ if (header->magic == DAPL_MAGIC_EP)
+ ib_cm_handle = ((DAPL_EP *) dat_handle)->cm_handle;
+ else if (header->magic == DAPL_MAGIC_CR)
+ ib_cm_handle = ((DAPL_CR *) dat_handle)->ib_cm_handle;
+ else
+ return DAT_INVALID_HANDLE;
+
+ dapl_os_memcpy(remote_ia_address,
+ &ib_cm_handle->msg.daddr, sizeof(DAT_SOCK_ADDR6));
+
+ return DAT_SUCCESS;
+}
+
+/*
+ * dapls_ib_private_data_size
+ *
+ * Return the size of private data given a connection op type
+ *
+ * Input:
+ * prd_ptr private data pointer
+ * conn_op connection operation type
+ *
+ * If prd_ptr is NULL, this is a query for the max size supported by
+ * the provider, otherwise it is the actual size of the private data
+ * contained in prd_ptr.
+ *
+ *
+ * Output:
+ * None
+ *
+ * Returns:
+ * length of private data
+ *
+ */
+int dapls_ib_private_data_size(IN DAPL_PRIVATE * prd_ptr,
+ IN DAPL_PDATA_OP conn_op, IN DAPL_HCA * hca_ptr)
+{
+ int size;
+
+ switch (conn_op) {
+ case DAPL_PDATA_CONN_REQ:
+ case DAPL_PDATA_CONN_REP:
+ case DAPL_PDATA_CONN_REJ:
+ case DAPL_PDATA_CONN_DREQ:
+ case DAPL_PDATA_CONN_DREP:
+ size = DCM_MAX_PDATA_SIZE;
+ break;
+ default:
+ size = 0;
+ }
+ return size;
+}
+
+/*
+ * Map all socket CM event codes to the DAT equivelent.
+ */
+#define DAPL_IB_EVENT_CNT 10
+
+static struct ib_cm_event_map {
+ const ib_cm_events_t ib_cm_event;
+ DAT_EVENT_NUMBER dat_event_num;
+} ib_cm_event_map[DAPL_IB_EVENT_CNT] = {
+/* 00 */ {IB_CME_CONNECTED,
+ DAT_CONNECTION_EVENT_ESTABLISHED},
+/* 01 */ {IB_CME_DISCONNECTED,
+ DAT_CONNECTION_EVENT_DISCONNECTED},
+/* 02 */ {IB_CME_DISCONNECTED_ON_LINK_DOWN,
+ DAT_CONNECTION_EVENT_DISCONNECTED},
+/* 03 */ {IB_CME_CONNECTION_REQUEST_PENDING,
+ DAT_CONNECTION_REQUEST_EVENT},
+/* 04 */ {IB_CME_CONNECTION_REQUEST_PENDING_PRIVATE_DATA,
+ DAT_CONNECTION_REQUEST_EVENT},
+/* 05 */ {IB_CME_DESTINATION_REJECT,
+ DAT_CONNECTION_EVENT_NON_PEER_REJECTED},
+/* 06 */ {IB_CME_DESTINATION_REJECT_PRIVATE_DATA,
+ DAT_CONNECTION_EVENT_PEER_REJECTED},
+/* 07 */ {IB_CME_DESTINATION_UNREACHABLE,
+ DAT_CONNECTION_EVENT_UNREACHABLE},
+/* 08 */ {IB_CME_TOO_MANY_CONNECTION_REQUESTS,
+ DAT_CONNECTION_EVENT_NON_PEER_REJECTED},
+/* 09 */ {IB_CME_LOCAL_FAILURE,
+ DAT_CONNECTION_EVENT_BROKEN}
+};
+
+/*
+ * dapls_ib_get_cm_event
+ *
+ * Return a DAT connection event given a provider CM event.
+ *
+ * Input:
+ * dat_event_num DAT event we need an equivelent CM event for
+ *
+ * Output:
+ * none
+ *
+ * Returns:
+ * ib_cm_event of translated DAPL value
+ */
+DAT_EVENT_NUMBER
+dapls_ib_get_dat_event(IN const ib_cm_events_t ib_cm_event,
+ IN DAT_BOOLEAN active)
+{
+ DAT_EVENT_NUMBER dat_event_num;
+ int i;
+
+ if (ib_cm_event > IB_CME_LOCAL_FAILURE)
+ return (DAT_EVENT_NUMBER) 0;
+
+ dat_event_num = 0;
+ for (i = 0; i < DAPL_IB_EVENT_CNT; i++) {
+ if (ib_cm_event == ib_cm_event_map[i].ib_cm_event) {
+ dat_event_num = ib_cm_event_map[i].dat_event_num;
+ break;
+ }
+ }
+ dapl_dbg_log(DAPL_DBG_TYPE_CALLBACK,
+ "dapls_ib_get_dat_event: event translate(%s) ib=0x%x dat=0x%x\n",
+ active ? "active" : "passive", ib_cm_event, dat_event_num);
+
+ return dat_event_num;
+}
+
+/*
+ * dapls_ib_get_dat_event
+ *
+ * Return a DAT connection event given a provider CM event.
+ *
+ * Input:
+ * ib_cm_event event provided to the dapl callback routine
+ * active switch indicating active or passive connection
+ *
+ * Output:
+ * none
+ *
+ * Returns:
+ * DAT_EVENT_NUMBER of translated provider value
+ */
+ib_cm_events_t dapls_ib_get_cm_event(IN DAT_EVENT_NUMBER dat_event_num)
+{
+ ib_cm_events_t ib_cm_event;
+ int i;
+
+ ib_cm_event = 0;
+ for (i = 0; i < DAPL_IB_EVENT_CNT; i++) {
+ if (dat_event_num == ib_cm_event_map[i].dat_event_num) {
+ ib_cm_event = ib_cm_event_map[i].ib_cm_event;
+ break;
+ }
+ }
+ return ib_cm_event;
+}
+
+/* work thread for uAT, uCM, CQ, and async events */
+void cm_thread(void *arg)
+{
+ struct dapl_hca *hca = arg;
+ dp_ib_cm_handle_t cm, next;
+ struct dapl_fd_set *set;
+ char rbuf[2];
+
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " cm_thread: ENTER hca %p\n", hca);
+ set = dapl_alloc_fd_set();
+ if (!set)
+ goto out;
+
+ dapl_os_lock(&hca->ib_trans.lock);
+ hca->ib_trans.cm_state = IB_THREAD_RUN;
+
+ while (1) {
+ dapl_fd_zero(set);
+ dapl_fd_set(hca->ib_trans.scm[0], set, DAPL_FD_READ);
+ dapl_fd_set(hca->ib_hca_handle->async_fd, set, DAPL_FD_READ);
+ dapl_fd_set(hca->ib_trans.rch->fd, set, DAPL_FD_READ);
+
+ if (!dapl_llist_is_empty(&hca->ib_trans.list))
+ next = dapl_llist_peek_head(&hca->ib_trans.list);
+ else
+ next = NULL;
+
+ while (next) {
+ cm = next;
+ next = dapl_llist_next_entry(
+ &hca->ib_trans.list,
+ (DAPL_LLIST_ENTRY *)&cm->entry);
+
+ if (cm->state == DCM_DESTROY ||
+ hca->ib_trans.cm_state != IB_THREAD_RUN) {
+ dapl_llist_remove_entry(
+ &hca->ib_trans.list,
+ (DAPL_LLIST_ENTRY *)&cm->entry);
+ dapl_os_free(cm, sizeof(*cm));
+ continue;
+ }
+
+ /* TODO: Check and process retries here */
+
+ continue;
+ }
+
+ /* set to exit and all resources destroyed */
+ if ((hca->ib_trans.cm_state != IB_THREAD_RUN) &&
+ (dapl_llist_is_empty(&hca->ib_trans.list)))
+ break;
+
+ dapl_os_unlock(&hca->ib_trans.lock);
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " cm_thread: select sleep\n");
+ dapl_select(set);
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " cm_thread: select wake\n");
+
+ /* Process events: CM, ASYNC, NOTIFY THREAD */
+ if (dapl_poll(hca->ib_trans.rch->fd,
+ DAPL_FD_READ) == DAPL_FD_READ) {
+ ucm_recv(&hca->ib_trans);
+ }
+ if (dapl_poll(hca->ib_hca_handle->async_fd,
+ DAPL_FD_READ) == DAPL_FD_READ) {
+ ucm_async_event(hca);
+ }
+ while (dapl_poll(hca->ib_trans.scm[0],
+ DAPL_FD_READ) == DAPL_FD_READ) {
+ recv(hca->ib_trans.scm[0], rbuf, 2, 0);
+ }
+
+ dapl_os_lock(&hca->ib_trans.lock);
+
+ /* set to exit and all resources destroyed */
+ if ((hca->ib_trans.cm_state != IB_THREAD_RUN) &&
+ (dapl_llist_is_empty(&hca->ib_trans.list)))
+ break;
+ }
+
+ dapl_os_unlock(&hca->ib_trans.lock);
+ free(set);
+out:
+ hca->ib_trans.cm_state = IB_THREAD_EXIT;
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " cm_thread(hca %p) exit\n", hca);
+}
+
+
+#ifdef DAPL_COUNTERS
+/* Debug aid: List all Connections in process and state */
+void dapls_print_cm_list(IN DAPL_IA *ia_ptr)
+{
+ /* Print in process CR's for this IA, if debug type set */
+ int i = 0;
+ dp_ib_cm_handle_t cr, next_cr;
+
+ dapl_os_lock(&ia_ptr->hca_ptr->ib_trans.lock);
+ if (!dapl_llist_is_empty((DAPL_LLIST_HEAD*)
+ &ia_ptr->hca_ptr->ib_trans.list))
+ next_cr = dapl_llist_peek_head((DAPL_LLIST_HEAD*)
+ &ia_ptr->hca_ptr->ib_trans.list);
+ else
+ next_cr = NULL;
+
+ printf("\n DAPL IA CONNECTIONS IN PROCESS:\n");
+ while (next_cr) {
+ cr = next_cr;
+ next_cr = dapl_llist_next_entry((DAPL_LLIST_HEAD*)
+ &ia_ptr->hca_ptr->ib_trans.list,
+ (DAPL_LLIST_ENTRY*)&cr->entry);
+
+ printf( " CONN[%d]: sp %p ep %p %s %s %s"
+ " dst lid %x iqp %x port %d\n",
+ i, cr->sp, cr->ep,
+ cr->msg.saddr.ib.qp_type == IBV_QPT_RC ? "RC" : "UD",
+ dapl_cm_state_str(cr->state),
+ cr->sp ? "<-" : "->",
+ ntohs(cr->msg.daddr.ib.lid),
+ ntohl(cr->msg.daddr.ib.qpn),
+ cr->sp ?
+ (int)cr->sp->conn_qual : ntohs(cr->msg.dport) );
+ i++;
+ }
+ printf("\n");
+ dapl_os_unlock(&ia_ptr->hca_ptr->ib_trans.lock);
+}
+#endif
diff --git a/dapl/openib_ucm/dapl_ib_util.h b/dapl/openib_ucm/dapl_ib_util.h
new file mode 100644
index 0000000..dfee2b9
--- /dev/null
+++ b/dapl/openib_ucm/dapl_ib_util.h
@@ -0,0 +1,119 @@
+/*
+ * Copyright (c) 2009 Intel Corporation. All rights reserved.
+ *
+ * This Software is licensed under one of the following licenses:
+ *
+ * 1) under the terms of the "Common Public License 1.0" a copy of which is
+ * available from the Open Source Initiative, see
+ * http://www.opensource.org/licenses/cpl.php.
+ *
+ * 2) under the terms of the "The BSD License" a copy of which is
+ * available from the Open Source Initiative, see
+ * http://www.opensource.org/licenses/bsd-license.php.
+ *
+ * 3) under the terms of the "GNU General Public License (GPL) Version 2" a
+ * copy of which is available from the Open Source Initiative, see
+ * http://www.opensource.org/licenses/gpl-license.php.
+ *
+ * Licensee has the right to choose one of the above licenses.
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice and one of the license notices.
+ *
+ * Redistributions in binary form must reproduce both the above copyright
+ * notice, one of the license notices in the documentation
+ * and/or other materials provided with the distribution.
+ */
+
+#ifndef _DAPL_IB_UTIL_H_
+#define _DAPL_IB_UTIL_H_
+#define _OPENIB_SCM_
+
+#include <infiniband/verbs.h>
+#include "openib_osd.h"
+#include "dapl_ib_common.h"
+
+#define UCM_DEFAULT_CQE 500
+#define UCM_DEFAULT_QPE 500
+
+struct ib_cm_handle
+{
+ struct dapl_llist_entry entry;
+ DAPL_OS_LOCK lock;
+ int state;
+ int retries;
+ struct dapl_hca *hca;
+ struct dapl_sp *sp;
+ struct dapl_ep *ep;
+ ib_cm_msg_t msg;
+};
+
+typedef struct ib_cm_handle *dp_ib_cm_handle_t;
+typedef dp_ib_cm_handle_t ib_cm_srvc_handle_t;
+
+/* Definitions */
+#define IB_INVALID_HANDLE NULL
+
+/* ib_hca_transport_t, specific to this implementation */
+typedef struct _ib_hca_transport
+{
+ struct ibv_device *ib_dev;
+ struct dapl_hca *hca;
+ struct ibv_context *ib_ctx;
+ struct ibv_comp_channel *ib_cq;
+ ib_cq_handle_t ib_cq_empty;
+ int destroy;
+ int cm_state;
+ DAPL_OS_THREAD thread;
+ DAPL_OS_LOCK lock; /* connect list */
+ struct dapl_llist_entry *list;
+ DAPL_OS_LOCK llock; /* listen list */
+ struct dapl_llist_entry *llist;
+ ib_async_handler_t async_unafiliated;
+ void *async_un_ctx;
+ ib_async_cq_handler_t async_cq_error;
+ ib_async_dto_handler_t async_cq;
+ ib_async_qp_handler_t async_qp_error;
+ union dcm_addr addr; /* lid, port, qp_num, gid */
+ int max_inline_send;
+ int rd_atom_in;
+ int rd_atom_out;
+ uint8_t ack_timer;
+ uint8_t ack_retry;
+ uint8_t rnr_timer;
+ uint8_t rnr_retry;
+ uint8_t global;
+ uint8_t hop_limit;
+ uint8_t tclass;
+ uint8_t mtu;
+ DAT_NAMED_ATTR named_attr;
+ DAPL_SOCKET scm[2];
+ int cqe;
+ int qpe;
+ DAPL_OS_LOCK slock;
+ int s_hd;
+ int s_tl;
+ struct ibv_pd *pd;
+ struct ibv_cq *scq;
+ struct ibv_cq *rcq;
+ struct ibv_qp *qp;
+ struct ibv_mr *mr_rbuf;
+ struct ibv_mr *mr_sbuf;
+ ib_cm_msg_t *sbuf;
+ ib_cm_msg_t *rbuf;
+ struct ibv_comp_channel *rch;
+ struct ibv_ah **ah;
+ DAPL_OS_LOCK plock;
+ uint8_t *sid; /* Sevice IDs, port space, bitarray? */
+
+} ib_hca_transport_t;
+
+/* prototypes */
+void cm_thread(void *arg);
+void ucm_async_event(struct dapl_hca *hca);
+dp_ib_cm_handle_t dapls_ib_cm_create(DAPL_EP *ep);
+void dapls_ib_cm_free(dp_ib_cm_handle_t cm, DAPL_EP *ep);
+void dapls_print_cm_list(IN DAPL_IA *ia_ptr);
+
+#endif /* _DAPL_IB_UTIL_H_ */
+
diff --git a/dapl/openib_ucm/device.c b/dapl/openib_ucm/device.c
new file mode 100644
index 0000000..329b050
--- /dev/null
+++ b/dapl/openib_ucm/device.c
@@ -0,0 +1,603 @@
+/*
+ * Copyright (c) 2009 Intel Corporation. All rights reserved.
+ *
+ * This Software is licensed under one of the following licenses:
+ *
+ * 1) under the terms of the "Common Public License 1.0" a copy of which is
+ * available from the Open Source Initiative, see
+ * http://www.opensource.org/licenses/cpl.php.
+ *
+ * 2) under the terms of the "The BSD License" a copy of which is
+ * available from the Open Source Initiative, see
+ * http://www.opensource.org/licenses/bsd-license.php.
+ *
+ * 3) under the terms of the "GNU General Public License (GPL) Version 2" a
+ * copy of which is available from the Open Source Initiative, see
+ * http://www.opensource.org/licenses/gpl-license.php.
+ *
+ * Licensee has the right to choose one of the above licenses.
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice and one of the license notices.
+ *
+ * Redistributions in binary form must reproduce both the above copyright
+ * notice, one of the license notices in the documentation
+ * and/or other materials provided with the distribution.
+ */
+
+#include "openib_osd.h"
+#include "dapl.h"
+#include "dapl_adapter_util.h"
+#include "dapl_ib_util.h"
+#include "dapl_osd.h"
+
+#include <stdlib.h>
+
+static void ucm_service_destroy(IN DAPL_HCA *hca);
+static int ucm_service_create(IN DAPL_HCA *hca);
+
+static int32_t create_cr_pipe(IN DAPL_HCA * hca_ptr)
+{
+ DAPL_SOCKET listen_socket;
+ struct sockaddr_in addr;
+ socklen_t addrlen = sizeof(addr);
+ int ret;
+
+ listen_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ if (listen_socket == DAPL_INVALID_SOCKET)
+ return 1;
+
+ memset(&addr, 0, sizeof addr);
+ addr.sin_family = AF_INET;
+ addr.sin_addr.s_addr = htonl(0x7f000001);
+ ret = bind(listen_socket, (struct sockaddr *)&addr, sizeof addr);
+ if (ret)
+ goto err1;
+
+ ret = getsockname(listen_socket, (struct sockaddr *)&addr, &addrlen);
+ if (ret)
+ goto err1;
+
+ ret = listen(listen_socket, 0);
+ if (ret)
+ goto err1;
+
+ hca_ptr->ib_trans.scm[1] = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ if (hca_ptr->ib_trans.scm[1] == DAPL_INVALID_SOCKET)
+ goto err1;
+
+ ret = connect(hca_ptr->ib_trans.scm[1],
+ (struct sockaddr *)&addr, sizeof(addr));
+ if (ret)
+ goto err2;
+
+ hca_ptr->ib_trans.scm[0] = accept(listen_socket, NULL, NULL);
+ if (hca_ptr->ib_trans.scm[0] == DAPL_INVALID_SOCKET)
+ goto err2;
+
+ closesocket(listen_socket);
+ return 0;
+
+ err2:
+ closesocket(hca_ptr->ib_trans.scm[1]);
+ err1:
+ closesocket(listen_socket);
+ return 1;
+}
+
+static void destroy_cr_pipe(IN DAPL_HCA * hca_ptr)
+{
+ closesocket(hca_ptr->ib_trans.scm[0]);
+ closesocket(hca_ptr->ib_trans.scm[1]);
+}
+
+
+/*
+ * dapls_ib_init, dapls_ib_release
+ *
+ * Initialize Verb related items for device open
+ *
+ * Input:
+ * none
+ *
+ * Output:
+ * none
+ *
+ * Returns:
+ * 0 success, -1 error
+ *
+ */
+int32_t dapls_ib_init(void)
+{
+ return 0;
+}
+
+int32_t dapls_ib_release(void)
+{
+ return 0;
+}
+
+#if defined(_WIN64) || defined(_WIN32)
+int dapls_config_comp_channel(struct ibv_comp_channel *channel)
+{
+ return 0;
+}
+#else // _WIN64 || WIN32
+int dapls_config_comp_channel(struct ibv_comp_channel *channel)
+{
+ int opts;
+
+ opts = fcntl(channel->fd, F_GETFL); /* uCQ */
+ if (opts < 0 || fcntl(channel->fd, F_SETFL, opts | O_NONBLOCK) < 0) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " dapls_create_comp_channel: fcntl on ib_cq->fd %d ERR %d %s\n",
+ channel->fd, opts, strerror(errno));
+ return errno;
+ }
+
+ return 0;
+}
+#endif
+
+/*
+ * dapls_ib_open_hca
+ *
+ * Open HCA
+ *
+ * Input:
+ * *hca_name pointer to provider device name
+ * *ib_hca_handle_p pointer to provide HCA handle
+ *
+ * Output:
+ * none
+ *
+ * Return:
+ * DAT_SUCCESS
+ * dapl_convert_errno
+ *
+ */
+DAT_RETURN dapls_ib_open_hca(IN IB_HCA_NAME hca_name, IN DAPL_HCA * hca_ptr)
+{
+ struct ibv_device **dev_list;
+ struct ibv_port_attr port_attr;
+ int i;
+ DAT_RETURN dat_status;
+
+ /* Get list of all IB devices, find match, open */
+ dev_list = ibv_get_device_list(NULL);
+ if (!dev_list) {
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ " open_hca: ibv_get_device_list() failed\n",
+ hca_name);
+ return DAT_INTERNAL_ERROR;
+ }
+
+ for (i = 0; dev_list[i]; ++i) {
+ hca_ptr->ib_trans.ib_dev = dev_list[i];
+ if (!strcmp(ibv_get_device_name(hca_ptr->ib_trans.ib_dev),
+ hca_name))
+ goto found;
+ }
+
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " open_hca: device %s not found\n", hca_name);
+ goto err;
+
+found:
+
+ hca_ptr->ib_hca_handle = ibv_open_device(hca_ptr->ib_trans.ib_dev);
+ if (!hca_ptr->ib_hca_handle) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " open_hca: dev open failed for %s, err=%s\n",
+ ibv_get_device_name(hca_ptr->ib_trans.ib_dev),
+ strerror(errno));
+ goto err;
+ }
+ hca_ptr->ib_trans.ib_ctx = hca_ptr->ib_hca_handle;
+
+ /* get lid for this hca-port, network order */
+ if (ibv_query_port(hca_ptr->ib_hca_handle,
+ (uint8_t)hca_ptr->port_num, &port_attr)) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " open_hca: get lid ERR for %s, err=%s\n",
+ ibv_get_device_name(hca_ptr->ib_trans.ib_dev),
+ strerror(errno));
+ goto err;
+ } else {
+ hca_ptr->ib_trans.addr.ib.lid = htons(port_attr.lid);
+ hca_ptr->ib_trans.addr.ib.port_num = hca_ptr->port_num;
+ }
+
+ /* get gid for this hca-port, network order */
+ if (ibv_query_gid(hca_ptr->ib_hca_handle,
+ (uint8_t) hca_ptr->port_num,
+ 0, &hca_ptr->ib_trans.addr.ib.gid)) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " open_hca: query GID ERR for %s, err=%s\n",
+ ibv_get_device_name(hca_ptr->ib_trans.ib_dev),
+ strerror(errno));
+ goto err;
+ }
+
+ /* set RC tunables via enviroment or default */
+ hca_ptr->ib_trans.max_inline_send =
+ dapl_os_get_env_val("DAPL_MAX_INLINE", INLINE_SEND_IB_DEFAULT);
+ hca_ptr->ib_trans.ack_retry =
+ dapl_os_get_env_val("DAPL_ACK_RETRY", DCM_ACK_RETRY);
+ hca_ptr->ib_trans.ack_timer =
+ dapl_os_get_env_val("DAPL_ACK_TIMER", DCM_ACK_TIMER);
+ hca_ptr->ib_trans.rnr_retry =
+ dapl_os_get_env_val("DAPL_RNR_RETRY", DCM_RNR_RETRY);
+ hca_ptr->ib_trans.rnr_timer =
+ dapl_os_get_env_val("DAPL_RNR_TIMER", DCM_RNR_TIMER);
+ hca_ptr->ib_trans.global =
+ dapl_os_get_env_val("DAPL_GLOBAL_ROUTING", DCM_GLOBAL);
+ hca_ptr->ib_trans.hop_limit =
+ dapl_os_get_env_val("DAPL_HOP_LIMIT", DCM_HOP_LIMIT);
+ hca_ptr->ib_trans.tclass =
+ dapl_os_get_env_val("DAPL_TCLASS", DCM_TCLASS);
+ hca_ptr->ib_trans.mtu =
+ dapl_ib_mtu(dapl_os_get_env_val("DAPL_IB_MTU", DCM_IB_MTU));
+
+ /* initialize CM list, LISTEN, SND queue, PSP array, locks */
+ if ((dapl_os_lock_init(&hca_ptr->ib_trans.lock)) != DAT_SUCCESS)
+ goto err;
+
+ if ((dapl_os_lock_init(&hca_ptr->ib_trans.llock)) != DAT_SUCCESS)
+ goto err;
+
+ if ((dapl_os_lock_init(&hca_ptr->ib_trans.slock)) != DAT_SUCCESS)
+ goto err;
+
+ if ((dapl_os_lock_init(&hca_ptr->ib_trans.plock)) != DAT_SUCCESS)
+ goto err;
+
+
+ /* initialize CM and listen lists on this HCA uCM QP */
+ dapl_llist_init_head(&hca_ptr->ib_trans.list);
+ dapl_llist_init_head(&hca_ptr->ib_trans.llist);
+
+ /* create uCM qp services */
+ if (ucm_service_create(hca_ptr))
+ goto bail;
+
+ /* initialize pipe, user level wakeup on select */
+ if (create_cr_pipe(hca_ptr)) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " open_hca: failed to init cr pipe - %s\n",
+ strerror(errno));
+ goto bail;
+ }
+
+ /* create thread to process inbound connect request */
+ hca_ptr->ib_trans.cm_state = IB_THREAD_INIT;
+ dat_status = dapl_os_thread_create(cm_thread,
+ (void *)hca_ptr,
+ &hca_ptr->ib_trans.thread);
+ if (dat_status != DAT_SUCCESS) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " open_hca: failed to create thread\n");
+ goto bail;
+ }
+
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+ " open_hca: devname %s, port %d, hostname_IP %s\n",
+ ibv_get_device_name(hca_ptr->ib_trans.ib_dev),
+ hca_ptr->ib_trans.addr.ib.port_num,
+ inet_ntoa(((struct sockaddr_in *)
+ &hca_ptr->hca_address)->sin_addr));
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+ " open_hca: QPN 0x%x LID 0x%x GID Subnet 0x" F64x ""
+ "ID 0x" F64x "\n",
+ ntohl(hca_ptr->ib_trans.addr.ib.qpn),
+ ntohs(hca_ptr->ib_trans.addr.ib.lid),
+ (unsigned long long)
+ htonll(hca_ptr->ib_trans.addr.ib.gid.global.subnet_prefix),
+ (unsigned long long)
+ htonll(hca_ptr->ib_trans.addr.ib.gid.global.interface_id));
+
+ /* save LID, GID, QPN, PORT address information, for ia_queries */
+ hca_ptr->ib_trans.hca = hca_ptr;
+ hca_ptr->ib_trans.addr.ib.qp_type = IBV_QPT_UD;
+ memcpy(&hca_ptr->hca_address,
+ &hca_ptr->ib_trans.addr,
+ sizeof(union dcm_addr));
+
+ ibv_free_device_list(dev_list);
+
+ /* wait for cm_thread */
+ while (hca_ptr->ib_trans.cm_state != IB_THREAD_RUN)
+ dapl_os_sleep_usec(1000);
+
+ return dat_status;
+
+bail:
+ ucm_service_destroy(hca_ptr);
+ ibv_close_device(hca_ptr->ib_hca_handle);
+ hca_ptr->ib_hca_handle = IB_INVALID_HANDLE;
+
+err:
+ ibv_free_device_list(dev_list);
+ return DAT_INTERNAL_ERROR;
+}
+
+/*
+ * dapls_ib_close_hca
+ *
+ * Open HCA
+ *
+ * Input:
+ * DAPL_HCA provide CA handle
+ *
+ * Output:
+ * none
+ *
+ * Return:
+ * DAT_SUCCESS
+ * dapl_convert_errno
+ *
+ */
+DAT_RETURN dapls_ib_close_hca(IN DAPL_HCA * hca_ptr)
+{
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " close_hca: %p\n", hca_ptr);
+
+ if (hca_ptr->ib_trans.cm_state == IB_THREAD_RUN) {
+ hca_ptr->ib_trans.cm_state = IB_THREAD_CANCEL;
+ send(hca_ptr->ib_trans.scm[1], "w", sizeof "w", 0);
+ while (hca_ptr->ib_trans.cm_state != IB_THREAD_EXIT) {
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+ " close_hca: waiting for cr_thread\n");
+ send(hca_ptr->ib_trans.scm[1], "w", sizeof "w", 0);
+ dapl_os_sleep_usec(1000);
+ }
+ }
+
+ if (hca_ptr->ib_hca_handle != IB_INVALID_HANDLE) {
+ if (ibv_close_device(hca_ptr->ib_hca_handle))
+ return (dapl_convert_errno(errno, "ib_close_device"));
+ hca_ptr->ib_hca_handle = IB_INVALID_HANDLE;
+ }
+
+ dapl_os_lock_destroy(&hca_ptr->ib_trans.lock);
+ dapl_os_lock_destroy(&hca_ptr->ib_trans.llock);
+ destroy_cr_pipe(hca_ptr); /* no longer need pipe */
+ ucm_service_destroy(hca_ptr);
+ return (DAT_SUCCESS);
+}
+
+/* Create uCM endpoint services, allocate remote_ah's array */
+static void ucm_service_destroy(IN DAPL_HCA *hca)
+{
+ ib_hca_transport_t *tp = &hca->ib_trans;
+ int msg_size = sizeof(ib_cm_msg_t);
+
+ if (tp->pd)
+ ibv_dealloc_pd(tp->pd);
+
+ if (tp->rch)
+ ibv_destroy_comp_channel(tp->rch);
+
+ if (tp->scq)
+ ibv_destroy_cq(tp->scq);
+
+ if (tp->rcq)
+ ibv_destroy_cq(tp->rcq);
+
+ if (tp->qp)
+ ibv_destroy_qp(tp->qp);
+
+ if (tp->mr_sbuf)
+ ibv_dereg_mr(tp->mr_sbuf);
+
+ if (tp->mr_sbuf)
+ ibv_dereg_mr(tp->mr_sbuf);
+
+ if (tp->ah)
+ dapl_os_free(tp->ah, (sizeof(*tp->ah) * 0xffff));
+
+ if (tp->sid)
+ dapl_os_free(tp->sid, (sizeof(*tp->sid) * 0xffff));
+
+ if (tp->rbuf)
+ dapl_os_free(tp->rbuf, (msg_size * tp->qpe));
+
+ if (tp->sbuf)
+ dapl_os_free(tp->sbuf, (msg_size * tp->qpe));
+}
+
+static int ucm_service_create(IN DAPL_HCA *hca)
+{
+ struct ibv_qp_init_attr qp_create;
+ ib_hca_transport_t *tp = &hca->ib_trans;
+ struct ibv_recv_wr recv_wr, *recv_err;
+ struct ibv_sge sge;
+ int i, mlen = sizeof(ib_cm_msg_t);
+ int hlen = sizeof(struct ibv_grh); /* hdr included with UD recv */
+
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " ucm_create: \n");
+
+ /* get queue sizes */
+ tp->qpe = dapl_os_get_env_val("DAPL_UCM_QPE", UCM_DEFAULT_QPE);
+ tp->cqe = dapl_os_get_env_val("DAPL_UCM_CQE", UCM_DEFAULT_CQE);
+ tp->pd = ibv_alloc_pd(hca->ib_hca_handle);
+ if (!tp->pd)
+ goto bail;
+
+ tp->rch = ibv_create_comp_channel(hca->ib_hca_handle);
+ if (!tp->rch)
+ goto bail;
+
+ tp->scq = ibv_create_cq(hca->ib_hca_handle, tp->cqe, hca, NULL, 0);
+ if (!tp->scq)
+ goto bail;
+
+ tp->rcq = ibv_create_cq(hca->ib_hca_handle, tp->cqe, hca, tp->rch, 0);
+ if (!tp->rcq)
+ goto bail;
+
+ if(ibv_req_notify_cq(tp->rcq, 0))
+ goto bail;
+
+ dapl_os_memzero((void *)&qp_create, sizeof(qp_create));
+ qp_create.qp_type = IBV_QPT_UD;
+ qp_create.send_cq = tp->scq;
+ qp_create.recv_cq = tp->rcq;
+ qp_create.cap.max_send_wr = qp_create.cap.max_recv_wr = tp->qpe;
+ qp_create.cap.max_send_sge = qp_create.cap.max_recv_sge = 1;
+ qp_create.cap.max_inline_data = tp->max_inline_send;
+ qp_create.qp_context = (void *)hca;
+
+ tp->qp = ibv_create_qp(tp->pd, &qp_create);
+ if (!tp->qp)
+ goto bail;
+
+ tp->ah = (ib_ah_handle_t*) dapl_os_alloc(sizeof(ib_ah_handle_t) * 0xffff);
+ tp->sid = (uint8_t*) dapl_os_alloc(sizeof(uint8_t) * 0xffff);
+ tp->rbuf = (void*) dapl_os_alloc((mlen + hlen) * tp->qpe);
+ tp->sbuf = (void*) dapl_os_alloc(mlen * tp->qpe);
+
+ if (!tp->ah || !tp->rbuf || !tp->sbuf || !tp->sid)
+ goto bail;
+
+ (void)dapl_os_memzero(tp->ah, (sizeof(ib_ah_handle_t) * 0xffff));
+ (void)dapl_os_memzero(tp->sid, (sizeof(uint8_t) * 0xffff));
+ tp->sid[0] = 1; /* resv slot 0, 0 == no ports available */
+ (void)dapl_os_memzero(tp->rbuf, ((mlen + hlen) * tp->qpe));
+ (void)dapl_os_memzero(tp->sbuf, (mlen * tp->qpe));
+
+ tp->mr_sbuf = ibv_reg_mr(tp->pd, tp->sbuf,
+ (mlen * tp->qpe),
+ IBV_ACCESS_LOCAL_WRITE);
+ if (!tp->mr_sbuf)
+ goto bail;
+
+ tp->mr_rbuf = ibv_reg_mr(tp->pd, tp->rbuf,
+ ((mlen + hlen) * tp->qpe),
+ IBV_ACCESS_LOCAL_WRITE);
+ if (!tp->mr_rbuf)
+ goto bail;
+
+ /* modify UD QP: init, rtr, rts */
+ if ((dapls_modify_qp_ud(hca, tp->qp)) != DAT_SUCCESS)
+ goto bail;
+
+ /* post receive buffers, setup head, tail pointers */
+ recv_wr.next = NULL;
+ recv_wr.sg_list = &sge;
+ recv_wr.num_sge = 1;
+ sge.length = mlen + hlen;
+ sge.lkey = tp->mr_rbuf->lkey;
+
+ for (i = 0; i < tp->qpe; i++) {
+ recv_wr.wr_id =
+ (uintptr_t)((char *)&tp->rbuf[i] +
+ sizeof(struct ibv_grh));
+ sge.addr = (uintptr_t) &tp->rbuf[i];
+ if (ibv_post_recv(tp->qp, &recv_wr, &recv_err))
+ goto bail;
+ }
+
+ /* save qp_num as part of ia_address, network order */
+ tp->addr.ib.qpn = htonl(tp->qp->qp_num);
+ return 0;
+bail:
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " ucm_create_services: ERR %s\n", strerror(errno));
+ ucm_service_destroy(hca);
+ return -1;
+}
+
+void ucm_async_event(struct dapl_hca *hca)
+{
+ struct ibv_async_event event;
+ struct _ib_hca_transport *tp = &hca->ib_trans;
+
+ dapl_log(DAPL_DBG_TYPE_WARN, " async_event(%p)\n", hca);
+
+ if (!ibv_get_async_event(hca->ib_hca_handle, &event)) {
+
+ switch (event.event_type) {
+ case IBV_EVENT_CQ_ERR:
+ {
+ struct dapl_ep *evd_ptr =
+ event.element.cq->cq_context;
+
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ "dapl async_event CQ (%p) ERR %d\n",
+ evd_ptr, event.event_type);
+
+ /* report up if async callback still setup */
+ if (tp->async_cq_error)
+ tp->async_cq_error(hca->ib_hca_handle,
+ event.element.cq,
+ &event, (void *)evd_ptr);
+ break;
+ }
+ case IBV_EVENT_COMM_EST:
+ {
+ /* Received msgs on connected QP before RTU */
+ dapl_log(DAPL_DBG_TYPE_UTIL,
+ " async_event COMM_EST(%p) rdata beat RTU\n",
+ event.element.qp);
+
+ break;
+ }
+ case IBV_EVENT_QP_FATAL:
+ case IBV_EVENT_QP_REQ_ERR:
+ case IBV_EVENT_QP_ACCESS_ERR:
+ case IBV_EVENT_QP_LAST_WQE_REACHED:
+ case IBV_EVENT_SRQ_ERR:
+ case IBV_EVENT_SRQ_LIMIT_REACHED:
+ case IBV_EVENT_SQ_DRAINED:
+ {
+ struct dapl_ep *ep_ptr =
+ event.element.qp->qp_context;
+
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ "dapl async_event QP (%p) ERR %d\n",
+ ep_ptr, event.event_type);
+
+ /* report up if async callback still setup */
+ if (tp->async_qp_error)
+ tp->async_qp_error(hca->ib_hca_handle,
+ ep_ptr->qp_handle,
+ &event, (void *)ep_ptr);
+ break;
+ }
+ case IBV_EVENT_PATH_MIG:
+ case IBV_EVENT_PATH_MIG_ERR:
+ case IBV_EVENT_DEVICE_FATAL:
+ case IBV_EVENT_PORT_ACTIVE:
+ case IBV_EVENT_PORT_ERR:
+ case IBV_EVENT_LID_CHANGE:
+ case IBV_EVENT_PKEY_CHANGE:
+ case IBV_EVENT_SM_CHANGE:
+ {
+ dapl_log(DAPL_DBG_TYPE_WARN,
+ "dapl async_event: DEV ERR %d\n",
+ event.event_type);
+
+ /* report up if async callback still setup */
+ if (tp->async_unafiliated)
+ tp->async_unafiliated(hca->ib_hca_handle,
+ &event,
+ tp->async_un_ctx);
+ break;
+ }
+ case IBV_EVENT_CLIENT_REREGISTER:
+ /* no need to report this event this time */
+ dapl_log(DAPL_DBG_TYPE_UTIL,
+ " async_event: IBV_CLIENT_REREGISTER\n");
+ break;
+
+ default:
+ dapl_log(DAPL_DBG_TYPE_WARN,
+ "dapl async_event: %d UNKNOWN\n",
+ event.event_type);
+ break;
+
+ }
+ ibv_ack_async_event(&event);
+ }
+}
+
diff --git a/dapl/openib_ucm/linux/openib_osd.h b/dapl/openib_ucm/linux/openib_osd.h
new file mode 100644
index 0000000..191a55b
--- /dev/null
+++ b/dapl/openib_ucm/linux/openib_osd.h
@@ -0,0 +1,21 @@
+#ifndef OPENIB_OSD_H
+#define OPENIB_OSD_H
+
+#include <endian.h>
+#include <netinet/in.h>
+
+#if __BYTE_ORDER == __BIG_ENDIAN
+#define htonll(x) (x)
+#define ntohll(x) (x)
+#elif __BYTE_ORDER == __LITTLE_ENDIAN
+#define htonll(x) bswap_64(x)
+#define ntohll(x) bswap_64(x)
+#endif
+
+#define DAPL_SOCKET int
+#define DAPL_INVALID_SOCKET -1
+#define DAPL_FD_SETSIZE 16
+
+#define closesocket close
+
+#endif // OPENIB_OSD_H
diff --git a/dapl/openib_ucm/udapl.rc b/dapl/openib_ucm/udapl.rc
new file mode 100644
index 0000000..8550256
--- /dev/null
+++ b/dapl/openib_ucm/udapl.rc
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2007, 2009 Intel Corporation. All rights reserved.
+ *
+ * This software is available to you under the OpenIB.org BSD license
+ * below:
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ * $Id$
+ */
+
+
+#include <oib_ver.h>
+
+#define VER_FILETYPE VFT_DLL
+#define VER_FILESUBTYPE VFT2_UNKNOWN
+
+#if DBG
+#define VER_FILEDESCRIPTION_STR "Direct Access Provider Library v2.0 (OFA socket-cm) (Debug)"
+#define VER_INTERNALNAME_STR "dapl2-ofa-scmd.dll"
+#define VER_ORIGINALFILENAME_STR "dapl2-ofa-scmd.dll"
+#else
+#define VER_FILEDESCRIPTION_STR "Direct Access Provider Library v2.0 (OFA socket-cm)"
+#define VER_INTERNALNAME_STR "dapl2-ofa-scm.dll"
+#define VER_ORIGINALFILENAME_STR "dapl2-ofa-scm.dll"
+#endif
+
+#include <common.ver>
diff --git a/dapl/openib_ucm/windows/openib_osd.h b/dapl/openib_ucm/windows/openib_osd.h
new file mode 100644
index 0000000..7eb3df3
--- /dev/null
+++ b/dapl/openib_ucm/windows/openib_osd.h
@@ -0,0 +1,35 @@
+#ifndef OPENIB_OSD_H
+#define OPENIB_OSD_H
+
+#ifndef FD_SETSIZE
+#define FD_SETSIZE 1024 /* Set before including winsock2 - see select help */
+#define DAPL_FD_SETSIZE FD_SETSIZE
+#endif
+
+#include <winsock2.h>
+#include <ws2tcpip.h>
+#include <io.h>
+#include <fcntl.h>
+
+#define ntohll _byteswap_uint64
+#define htonll _byteswap_uint64
+
+#define DAPL_SOCKET SOCKET
+#define DAPL_INVALID_SOCKET INVALID_SOCKET
+
+/* allow casting to WSABUF */
+struct iovec
+{
+ u_long iov_len;
+ char FAR* iov_base;
+};
+
+static int writev(DAPL_SOCKET s, struct iovec *vector, int count)
+{
+ int len, ret;
+
+ ret = WSASend(s, (WSABUF *) vector, count, &len, 0, NULL, NULL);
+ return ret ? ret : len;
+}
+
+#endif // OPENIB_OSD_H
diff --git a/test/dtest/dtest.c b/test/dtest/dtest.c
index d868490..2f418fe 100755
--- a/test/dtest/dtest.c
+++ b/test/dtest/dtest.c
@@ -76,6 +76,7 @@
#include <getopt.h>
#include <inttypes.h>
#include <unistd.h>
+#include <stdlib.h>
#define DAPL_PROVIDER "ofa-v2-ib0"
@@ -99,14 +100,15 @@
#define MAX_PROCS 1000
/* Header files needed for DAT/uDAPL */
-#include "dat2/udat.h"
+#include "infiniband/verbs.h"
+#include "dat2/udat.h"
/* definitions */
#define SERVER_CONN_QUAL 45248
#define DTO_TIMEOUT (1000*1000*5)
#define CNO_TIMEOUT (1000*1000*1)
#define DTO_FLUSH_TIMEOUT (1000*1000*2)
-#define CONN_TIMEOUT (1000*1000*10)
+#define CONN_TIMEOUT (1000*1000*100)
#define SERVER_TIMEOUT DAT_TIMEOUT_INFINITE
#define RDMA_BUFFER_SIZE (64)
@@ -187,7 +189,7 @@ struct dt_time {
double conn;
};
-struct dt_time time;
+struct dt_time ts;
/* defaults */
static int failed = 0;
@@ -207,6 +209,22 @@ static int use_cno = 0;
static int recv_msg_index = 0;
static int burst_msg_posted = 0;
static int burst_msg_index = 0;
+static int ucm = 0;
+
+/* IB address structure used by DAPL uCM provider */
+union dcm_addr {
+ DAT_SOCK_ADDR6 so;
+ struct {
+ uint8_t qp_type;
+ uint8_t port_num;
+ uint16_t lid;
+ uint32_t qpn;
+ union ibv_gid gid;
+ } ib;
+};
+
+static union dcm_addr remote;
+static union dcm_addr local;
/* forward prototypes */
const char *DT_RetToStr(DAT_RETURN ret_value);
@@ -313,9 +331,10 @@ int main(int argc, char **argv)
int i, c;
DAT_RETURN ret;
DAT_EP_PARAM ep_param;
+ DAT_IA_ATTR ia_attr;
/* parse arguments */
- while ((c = getopt(argc, argv, "tscvpb:d:B:h:P:")) != -1) {
+ while ((c = getopt(argc, argv, "tscvpq:l:b:d:B:h:P:")) != -1) {
switch (c) {
case 't':
performance_times = 1;
@@ -340,6 +359,16 @@ int main(int argc, char **argv)
printf("%d Polling\n", getpid());
fflush(stdout);
break;
+ case 'q':
+ remote.ib.qpn = htonl(strtol(optarg,NULL,0));
+ ucm = 1;
+ server = 0;
+ break;
+ case 'l':
+ remote.ib.lid = htons(strtol(optarg,NULL,0));
+ ucm = 1;
+ server = 0;
+ break;
case 'B':
burst = atoi(optarg);
break;
@@ -389,7 +418,7 @@ int main(int argc, char **argv)
perror("malloc");
exit(1);
}
- memset(&time, 0, sizeof(struct dt_time));
+ memset(&ts, 0, sizeof(struct dt_time));
LOGPRINTF("%d Allocated RDMA buffers (r:%p,s:%p) len %d \n",
getpid(), rbuf, sbuf, buf_len);
@@ -398,7 +427,7 @@ int main(int argc, char **argv)
start = get_time();
ret = dat_ia_open(provider, 8, &h_async_evd, &h_ia);
stop = get_time();
- time.open += ((stop - start) * 1.0e6);
+ ts.open += ((stop - start) * 1.0e6);
if (ret != DAT_SUCCESS) {
fprintf(stderr, "%d: Error Adaptor open: %s\n",
getpid(), DT_RetToStr(ret));
@@ -406,12 +435,34 @@ int main(int argc, char **argv)
} else
LOGPRINTF("%d Opened Interface Adaptor\n", getpid());
+ printf("%d query \n", getpid());
+
+ ret = dat_ia_query(h_ia, 0, DAT_IA_FIELD_ALL, &ia_attr, 0, 0);
+ if (ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d: Error Adaptor query: %s\n",
+ getpid(), DT_RetToStr(ret));
+ exit(1);
+ }
+ memcpy((void*)&local,
+ (void*)ia_attr.ia_address_ptr,
+ sizeof(DAT_SOCK_ADDR6));
+
+ if (local.ib.qp_type == IBV_QPT_UD) {
+ ucm = 1;
+ printf("%d Local uCM Address = QPN=0x%x, LID=0x%x\n",
+ getpid(), ntohl(local.ib.qpn),
+ ntohs(local.ib.lid));
+ printf("%d Remote uCM Address = QPN=0x%x, LID=0x%x\n",
+ getpid(), ntohl(remote.ib.qpn),
+ ntohs(remote.ib.lid));
+ }
+
/* Create Protection Zone */
start = get_time();
LOGPRINTF("%d Create Protection Zone\n", getpid());
ret = dat_pz_create(h_ia, &h_pz);
stop = get_time();
- time.pzc += ((stop - start) * 1.0e6);
+ ts.pzc += ((stop - start) * 1.0e6);
if (ret != DAT_SUCCESS) {
fprintf(stderr, "%d Error creating Protection Zone: %s\n",
getpid(), DT_RetToStr(ret));
@@ -461,8 +512,8 @@ int main(int argc, char **argv)
ret = dat_ep_create(h_ia, h_pz, h_dto_rcv_evd,
h_dto_req_evd, h_conn_evd, &ep_attr, &h_ep);
stop = get_time();
- time.epc += ((stop - start) * 1.0e6);
- time.total += time.epc;
+ ts.epc += ((stop - start) * 1.0e6);
+ ts.total += ts.epc;
if (ret != DAT_SUCCESS) {
fprintf(stderr, "%d Error dat_ep_create: %s\n",
getpid(), DT_RetToStr(ret));
@@ -570,8 +621,8 @@ complete:
start = get_time();
ret = dat_ep_free(h_ep);
stop = get_time();
- time.epf += ((stop - start) * 1.0e6);
- time.total += time.epf;
+ ts.epf += ((stop - start) * 1.0e6);
+ ts.total += ts.epf;
if (ret != DAT_SUCCESS) {
fprintf(stderr, "%d Error freeing EP: %s\n",
getpid(), DT_RetToStr(ret));
@@ -603,7 +654,7 @@ complete:
start = get_time();
ret = dat_pz_free(h_pz);
stop = get_time();
- time.pzf += ((stop - start) * 1.0e6);
+ ts.pzf += ((stop - start) * 1.0e6);
if (ret != DAT_SUCCESS) {
fprintf(stderr, "%d Error freeing PZ: %s\n",
getpid(), DT_RetToStr(ret));
@@ -617,7 +668,7 @@ complete:
start = get_time();
ret = dat_ia_close(h_ia, DAT_CLOSE_ABRUPT_FLAG);
stop = get_time();
- time.close += ((stop - start) * 1.0e6);
+ ts.close += ((stop - start) * 1.0e6);
if (ret != DAT_SUCCESS) {
fprintf(stderr, "%d: Error Adaptor close: %s\n",
getpid(), DT_RetToStr(ret));
@@ -640,35 +691,35 @@ complete:
printf("\n%d: DAPL Test Complete.\n\n", getpid());
printf("%d: Message RTT: Total=%10.2lf usec, %d bursts, itime=%10.2lf"
" usec, pc=%d\n",
- getpid(), time.rtt, burst, time.rtt / burst, poll_count);
+ getpid(), ts.rtt, burst, ts.rtt / burst, poll_count);
printf("%d: RDMA write: Total=%10.2lf usec, %d bursts, itime=%10.2lf"
" usec, pc=%d\n",
- getpid(), time.rdma_wr, burst,
- time.rdma_wr / burst, rdma_wr_poll_count);
+ getpid(), ts.rdma_wr, burst,
+ ts.rdma_wr / burst, rdma_wr_poll_count);
for (i = 0; i < MAX_RDMA_RD; i++) {
printf("%d: RDMA read: Total=%10.2lf usec, %d bursts, "
"itime=%10.2lf usec, pc=%d\n",
- getpid(), time.rdma_rd_total, MAX_RDMA_RD,
- time.rdma_rd[i], rdma_rd_poll_count[i]);
+ getpid(), ts.rdma_rd_total, MAX_RDMA_RD,
+ ts.rdma_rd[i], rdma_rd_poll_count[i]);
}
- printf("%d: open: %10.2lf usec\n", getpid(), time.open);
- printf("%d: close: %10.2lf usec\n", getpid(), time.close);
- printf("%d: PZ create: %10.2lf usec\n", getpid(), time.pzc);
- printf("%d: PZ free: %10.2lf usec\n", getpid(), time.pzf);
- printf("%d: LMR create:%10.2lf usec\n", getpid(), time.reg);
- printf("%d: LMR free: %10.2lf usec\n", getpid(), time.unreg);
- printf("%d: EVD create:%10.2lf usec\n", getpid(), time.evdc);
- printf("%d: EVD free: %10.2lf usec\n", getpid(), time.evdf);
+ printf("%d: open: %10.2lf usec\n", getpid(), ts.open);
+ printf("%d: close: %10.2lf usec\n", getpid(), ts.close);
+ printf("%d: PZ create: %10.2lf usec\n", getpid(), ts.pzc);
+ printf("%d: PZ free: %10.2lf usec\n", getpid(), ts.pzf);
+ printf("%d: LMR create:%10.2lf usec\n", getpid(), ts.reg);
+ printf("%d: LMR free: %10.2lf usec\n", getpid(), ts.unreg);
+ printf("%d: EVD create:%10.2lf usec\n", getpid(), ts.evdc);
+ printf("%d: EVD free: %10.2lf usec\n", getpid(), ts.evdf);
if (use_cno) {
- printf("%d: CNO create: %10.2lf usec\n", getpid(), time.cnoc);
- printf("%d: CNO free: %10.2lf usec\n", getpid(), time.cnof);
+ printf("%d: CNO create: %10.2lf usec\n", getpid(), ts.cnoc);
+ printf("%d: CNO free: %10.2lf usec\n", getpid(), ts.cnof);
}
- printf("%d: EP create: %10.2lf usec\n", getpid(), time.epc);
- printf("%d: EP free: %10.2lf usec\n", getpid(), time.epf);
+ printf("%d: EP create: %10.2lf usec\n", getpid(), ts.epc);
+ printf("%d: EP free: %10.2lf usec\n", getpid(), ts.epf);
if (!server)
printf("%d: connect: %10.2lf usec, poll_cnt=%d\n",
- getpid(), time.conn, conn_poll_count);
- printf("%d: TOTAL: %10.2lf usec\n", getpid(), time.total);
+ getpid(), ts.conn, conn_poll_count);
+ printf("%d: TOTAL: %10.2lf usec\n", getpid(), ts.total);
#if defined(_WIN32) || defined(_WIN64)
WSACleanup();
@@ -676,6 +727,17 @@ complete:
return (0);
}
+#if defined(_WIN32) || defined(_WIN64)
+void gettimeofday(struct timeval *t, char *jnk)
+{
+ SYSTEMTIME now;
+ GetLocalTime(&now);
+ t->tv_sec = now.wMinute * 60;
+ t->tv_sec += now.wSecond;
+ t->tv_usec = now.wMilliseconds;
+}
+#endif
+
double get_time(void)
{
struct timeval tp;
@@ -761,7 +823,7 @@ send_msg(void *data,
DAT_RETURN connect_ep(char *hostname, DAT_CONN_QUAL conn_id)
{
- DAT_SOCK_ADDR remote_addr;
+ DAT_IA_ADDRESS_PTR remote_addr = (DAT_IA_ADDRESS_PTR)&remote;
DAT_RETURN ret;
DAT_REGION_DESCRIPTION region;
DAT_EVENT event;
@@ -953,6 +1015,9 @@ DAT_RETURN connect_ep(char *hostname, DAT_CONN_QUAL conn_id)
struct addrinfo *target;
int rval;
+ if (ucm)
+ goto no_resolution;
+
#if defined(_WIN32) || defined(_WIN64)
if ((rval = getaddrinfo(hostname, "ftp", NULL, &target)) != 0) {
printf("\n remote name resolution failed! %s\n",
@@ -972,16 +1037,15 @@ DAT_RETURN connect_ep(char *hostname, DAT_CONN_QUAL conn_id)
getpid(), (rval >> 0) & 0xff, (rval >> 8) & 0xff,
(rval >> 16) & 0xff, (rval >> 24) & 0xff, conn_id);
- remote_addr = *((DAT_IA_ADDRESS_PTR) target->ai_addr);
- freeaddrinfo(target);
-
+ remote_addr = (DAT_IA_ADDRESS_PTR)&target->ai_addr; /* IP */
+no_resolution:
for (i = 0; i < 48; i++) /* simple pattern in private data */
pdata[i] = i + 1;
LOGPRINTF("%d Connecting to server\n", getpid());
start = get_time();
ret = dat_ep_connect(h_ep,
- &remote_addr,
+ remote_addr,
conn_id,
CONN_TIMEOUT,
48,
@@ -993,6 +1057,9 @@ DAT_RETURN connect_ep(char *hostname, DAT_CONN_QUAL conn_id)
return (ret);
} else
LOGPRINTF("%d dat_ep_connect completed\n", getpid());
+
+ if (!ucm)
+ freeaddrinfo(target);
}
printf("%d Waiting for connect response\n", getpid());
@@ -1007,7 +1074,7 @@ DAT_RETURN connect_ep(char *hostname, DAT_CONN_QUAL conn_id)
if (!server) {
stop = get_time();
- time.conn += ((stop - start) * 1.0e6);
+ ts.conn += ((stop - start) * 1.0e6);
}
#ifdef TEST_REJECT_WITH_PRIVATE_DATA
@@ -1307,7 +1374,7 @@ DAT_RETURN do_rdma_write_with_msg(void)
return (DAT_ABORT);
stop = get_time();
- time.rdma_wr = ((stop - start) * 1.0e6);
+ ts.rdma_wr = ((stop - start) * 1.0e6);
/* validate event number and status */
printf("%d inbound rdma_write; send message arrived!\n", getpid());
@@ -1436,8 +1503,8 @@ DAT_RETURN do_rdma_read_with_msg(void)
return (DAT_ABORT);
}
stop = get_time();
- time.rdma_rd[i] = ((stop - start) * 1.0e6);
- time.rdma_rd_total += time.rdma_rd[i];
+ ts.rdma_rd[i] = ((stop - start) * 1.0e6);
+ ts.rdma_rd_total += ts.rdma_rd[i];
LOGPRINTF("%d rdma_read # %d completed\n", getpid(), i + 1);
}
@@ -1675,7 +1742,7 @@ DAT_RETURN do_ping_pong_msg()
snd_buf += buf_len;
}
stop = get_time();
- time.rtt = ((stop - start) * 1.0e6);
+ ts.rtt = ((stop - start) * 1.0e6);
return (DAT_SUCCESS);
}
@@ -1700,8 +1767,8 @@ DAT_RETURN register_rdma_memory(void)
&rmr_context_recv,
®istered_size_recv, ®istered_addr_recv);
stop = get_time();
- time.reg += ((stop - start) * 1.0e6);
- time.total += time.reg;
+ ts.reg += ((stop - start) * 1.0e6);
+ ts.total += ts.reg;
if (ret != DAT_SUCCESS) {
fprintf(stderr,
@@ -1751,8 +1818,8 @@ DAT_RETURN unregister_rdma_memory(void)
start = get_time();
ret = dat_lmr_free(h_lmr_recv);
stop = get_time();
- time.unreg += ((stop - start) * 1.0e6);
- time.total += time.unreg;
+ ts.unreg += ((stop - start) * 1.0e6);
+ ts.total += ts.unreg;
if (ret != DAT_SUCCESS) {
fprintf(stderr, "%d Error deregistering recv mr: %s\n",
getpid(), DT_RetToStr(ret));
@@ -1801,8 +1868,8 @@ DAT_RETURN create_events(void)
&h_dto_cno);
#endif
stop = get_time();
- time.cnoc += ((stop - start) * 1.0e6);
- time.total += time.cnoc;
+ ts.cnoc += ((stop - start) * 1.0e6);
+ ts.total += ts.cnoc;
if (ret != DAT_SUCCESS) {
fprintf(stderr, "%d Error dat_cno_create: %s\n",
getpid(), DT_RetToStr(ret));
@@ -1819,8 +1886,8 @@ DAT_RETURN create_events(void)
dat_evd_create(h_ia, 10, DAT_HANDLE_NULL, DAT_EVD_CR_FLAG,
&h_cr_evd);
stop = get_time();
- time.evdc += ((stop - start) * 1.0e6);
- time.total += time.evdc;
+ ts.evdc += ((stop - start) * 1.0e6);
+ ts.total += ts.evdc;
if (ret != DAT_SUCCESS) {
fprintf(stderr, "%d Error dat_evd_create: %s\n",
getpid(), DT_RetToStr(ret));
@@ -1930,8 +1997,8 @@ DAT_RETURN destroy_events(void)
start = get_time();
ret = dat_evd_free(h_dto_rcv_evd);
stop = get_time();
- time.evdf += ((stop - start) * 1.0e6);
- time.total += time.evdf;
+ ts.evdf += ((stop - start) * 1.0e6);
+ ts.total += ts.evdf;
if (ret != DAT_SUCCESS) {
fprintf(stderr, "%d Error freeing dto EVD: %s\n",
getpid(), DT_RetToStr(ret));
@@ -1962,8 +2029,8 @@ DAT_RETURN destroy_events(void)
start = get_time();
ret = dat_cno_free(h_dto_cno);
stop = get_time();
- time.cnof += ((stop - start) * 1.0e6);
- time.total += time.cnof;
+ ts.cnof += ((stop - start) * 1.0e6);
+ ts.total += ts.cnof;
if (ret != DAT_SUCCESS) {
fprintf(stderr, "%d Error freeing dto CNO: %s\n",
getpid(), DT_RetToStr(ret));
@@ -2048,6 +2115,8 @@ void print_usage(void)
printf("B: burst count, rdma and msgs \n");
printf("h: hostname/address of server, specified on client\n");
printf("P: provider name (default = OpenIB-cma)\n");
+ printf("l: server lid (required ucm provider)\n");
+ printf("q: server qpn (required ucm provider)\n");
printf("\n");
}
diff --git a/test/dtest/dtestcm.c b/test/dtest/dtestcm.c
index 71d9350..5b0272a 100644
--- a/test/dtest/dtestcm.c
+++ b/test/dtest/dtestcm.c
@@ -76,6 +76,7 @@
#include <getopt.h>
#include <inttypes.h>
#include <unistd.h>
+#include <stdlib.h>
#define DAPL_PROVIDER "ofa-v2-mlx4_0-1"
@@ -96,8 +97,24 @@
#define MAX_POLLING_CNT 50000
/* Header files needed for DAT/uDAPL */
-#include "dat2/udat.h"
-#include "dat2/dat_ib_extensions.h"
+#include "infiniband/verbs.h"
+#include "dat2/udat.h"
+#include "dat2/dat_ib_extensions.h"
+
+/* IB address structure used by DAPL uCM provider */
+union dcm_addr {
+ DAT_SOCK_ADDR6 so;
+ struct {
+ uint8_t qp_type;
+ uint8_t port_num;
+ uint16_t lid;
+ uint32_t qpn;
+ union ibv_gid gid;
+ } ib;
+};
+
+static union dcm_addr remote;
+static union dcm_addr local;
/* definitions */
#define SERVER_CONN_QUAL 45248
@@ -145,7 +162,7 @@ struct dt_time {
double conn;
};
-struct dt_time time;
+struct dt_time ts;
/* defaults */
static int connected = 0;
@@ -160,6 +177,7 @@ static int delay = 0;
static int connections = 1000;
static int burst = 100;
static int port_id = SERVER_CONN_QUAL;
+static int ucm = 0;
/* forward prototypes */
const char *DT_RetToString(DAT_RETURN ret_value);
@@ -191,9 +209,10 @@ int main(int argc, char **argv)
{
int i, c, len;
DAT_RETURN ret;
+ DAT_IA_ATTR ia_attr;
/* parse arguments */
- while ((c = getopt(argc, argv, "smwvub:c:d:h:P:p:")) != -1) {
+ while ((c = getopt(argc, argv, "smwvub:c:d:h:P:p:q:l:")) != -1) {
switch (c) {
case 's':
server = 1;
@@ -230,6 +249,16 @@ int main(int argc, char **argv)
case 'P':
strcpy(provider, optarg);
break;
+ case 'q':
+ remote.ib.qpn = htonl(strtol(optarg,NULL,0));
+ ucm = 1;
+ server = 0;
+ break;
+ case 'l':
+ remote.ib.lid = htons(strtol(optarg,NULL,0));
+ ucm = 1;
+ server = 0;
+ break;
default:
print_usage();
exit(-12);
@@ -283,14 +312,14 @@ int main(int argc, char **argv)
exit(1);
}
memset(h_psp, 0, len);
- memset(&time, 0, sizeof(struct dt_time));
+ memset(&ts, 0, sizeof(struct dt_time));
/* dat_ia_open, dat_pz_create */
h_async_evd = DAT_HANDLE_NULL;
start = get_time();
ret = dat_ia_open(provider, 8, &h_async_evd, &h_ia);
stop = get_time();
- time.open += ((stop - start) * 1.0e6);
+ ts.open += ((stop - start) * 1.0e6);
if (ret != DAT_SUCCESS) {
fprintf(stderr, " Error Adaptor open: %s\n",
DT_RetToString(ret));
@@ -298,12 +327,33 @@ int main(int argc, char **argv)
} else
LOGPRINTF(" Opened Interface Adaptor\n");
+ /* query for UCM addressing */
+ ret = dat_ia_query(h_ia, 0, DAT_IA_FIELD_ALL, &ia_attr, 0, 0);
+ if (ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d: Error Adaptor query: %s\n",
+ getpid(), DT_RetToString(ret));
+ exit(1);
+ }
+ memcpy((void*)&local,
+ (void*)ia_attr.ia_address_ptr,
+ sizeof(DAT_SOCK_ADDR6));
+
+ if (local.ib.qp_type == IBV_QPT_UD) {
+ ucm = 1;
+ printf("%d Local uCM Address = QPN=0x%x, LID=0x%x\n",
+ getpid(), ntohl(local.ib.qpn),
+ ntohs(local.ib.lid));
+ printf("%d Remote uCM Address = QPN=0x%x, LID=0x%x\n",
+ getpid(), ntohl(remote.ib.qpn),
+ ntohs(remote.ib.lid));
+ }
+
/* Create Protection Zone */
start = get_time();
LOGPRINTF(" Create Protection Zone\n");
ret = dat_pz_create(h_ia, &h_pz);
stop = get_time();
- time.pzc += ((stop - start) * 1.0e6);
+ ts.pzc += ((stop - start) * 1.0e6);
if (ret != DAT_SUCCESS) {
fprintf(stderr, " Error creating Protection Zone: %s\n",
DT_RetToString(ret));
@@ -345,8 +395,8 @@ int main(int argc, char **argv)
&ep_attr, &h_ep[i]);
}
stop = get_time();
- time.epc += ((stop - start) * 1.0e6);
- time.total += time.epc;
+ ts.epc += ((stop - start) * 1.0e6);
+ ts.total += ts.epc;
if (ret != DAT_SUCCESS) {
fprintf(stderr, " Error dat_ep_create: %s\n",
DT_RetToString(ret));
@@ -447,7 +497,7 @@ complete:
start = get_time();
ret = dat_pz_free(h_pz);
stop = get_time();
- time.pzf += ((stop - start) * 1.0e6);
+ ts.pzf += ((stop - start) * 1.0e6);
if (ret != DAT_SUCCESS) {
fprintf(stderr, " Error freeing PZ: %s\n",
DT_RetToString(ret));
@@ -462,7 +512,7 @@ complete:
start = get_time();
ret = dat_ia_close(h_ia, DAT_CLOSE_ABRUPT_FLAG);
stop = get_time();
- time.close += ((stop - start) * 1.0e6);
+ ts.close += ((stop - start) * 1.0e6);
if (ret != DAT_SUCCESS) {
fprintf(stderr, " Error Adaptor close: %s\n",
DT_RetToString(ret));
@@ -471,25 +521,25 @@ complete:
LOGPRINTF(" Closed Interface Adaptor\n");
printf(" DAPL Connection Test Complete.\n");
- printf(" open: %10.2lf usec\n", time.open);
- printf(" close: %10.2lf usec\n", time.close);
- printf(" PZ create: %10.2lf usec\n", time.pzc);
- printf(" PZ free: %10.2lf usec\n", time.pzf);
- printf(" LMR create:%10.2lf usec\n", time.reg);
- printf(" LMR free: %10.2lf usec\n", time.unreg);
- printf(" EVD create:%10.2lf usec\n", time.evdc);
- printf(" EVD free: %10.2lf usec\n", time.evdf);
- printf(" EP create: %10.2lf usec avg\n", time.epc/connections);
- printf(" EP free: %10.2lf usec avg\n", time.epf/connections);
+ printf(" open: %10.2lf usec\n", ts.open);
+ printf(" close: %10.2lf usec\n", ts.close);
+ printf(" PZ create: %10.2lf usec\n", ts.pzc);
+ printf(" PZ free: %10.2lf usec\n", ts.pzf);
+ printf(" LMR create:%10.2lf usec\n", ts.reg);
+ printf(" LMR free: %10.2lf usec\n", ts.unreg);
+ printf(" EVD create:%10.2lf usec\n", ts.evdc);
+ printf(" EVD free: %10.2lf usec\n", ts.evdf);
+ printf(" EP create: %10.2lf usec avg\n", ts.epc/connections);
+ printf(" EP free: %10.2lf usec avg\n", ts.epf/connections);
if (!server) {
printf(" Connections: %8.2lf usec, CPS %7.2lf "
"Total %4.2lf secs, poll_cnt=%u, Num=%d\n",
- (double)(time.conn/connections),
- (double)(1/(time.conn/1000000/connections)),
- (double)(time.conn/1000000),
+ (double)(ts.conn/connections),
+ (double)(1/(ts.conn/1000000/connections)),
+ (double)(ts.conn/1000000),
conn_poll_count, connections);
}
- printf(" TOTAL: %4.2lf sec\n", time.total/1000000);
+ printf(" TOTAL: %4.2lf sec\n", ts.total/1000000);
fflush(stderr); fflush(stdout);
bail:
free(h_ep);
@@ -501,6 +551,19 @@ bail:
return (0);
}
+#if defined(_WIN32) || defined(_WIN64)
+
+void gettimeofday(struct timeval *t, char *jnk)
+{
+ SYSTEMTIME now;
+ GetLocalTime(&now);
+ t->tv_sec = now.wMinute * 60;
+ t->tv_sec += now.wSecond;
+ t->tv_usec = now.wMilliseconds;
+}
+
+#endif
+
double get_time(void)
{
struct timeval tp;
@@ -644,7 +707,7 @@ DAT_RETURN conn_server()
DAT_RETURN conn_client()
{
- DAT_SOCK_ADDR raddr;
+ DAT_IA_ADDRESS_PTR raddr = (DAT_IA_ADDRESS_PTR)&remote;
DAT_RETURN ret;
DAT_EVENT event;
DAT_COUNT nmore;
@@ -657,6 +720,9 @@ DAT_RETURN conn_client()
struct addrinfo *target;
int rval;
+ if (ucm)
+ goto no_resolution;
+
#if defined(_WIN32) || defined(_WIN64)
if ((rval = getaddrinfo(hostname, "ftp", NULL, &target)) != 0) {
printf("\n remote name resolution failed! %s\n",
@@ -677,8 +743,9 @@ DAT_RETURN conn_client()
(rval >> 16) & 0xff, (rval >> 24) & 0xff,
port_id);
- raddr = *((DAT_IA_ADDRESS_PTR)target->ai_addr);
- freeaddrinfo(target);
+ raddr = (DAT_IA_ADDRESS_PTR)target->ai_addr;
+
+no_resolution:
for (i = 0; i < 48; i++) /* simple pattern in private data */
pdata[i] = i + 1;
@@ -692,7 +759,7 @@ DAT_RETURN conn_client()
else
conn_id = port_id;
- ret = dat_ep_connect(h_ep[i+ii], &raddr,
+ ret = dat_ep_connect(h_ep[i+ii], raddr,
conn_id, CONN_TIMEOUT,
48, (DAT_PVOID) pdata, 0,
DAT_CONNECT_DEFAULT_FLAG);
@@ -790,7 +857,10 @@ DAT_RETURN conn_client()
}
stop = get_time();
- time.conn += ((stop - start) * 1.0e6);
+ ts.conn += ((stop - start) * 1.0e6);
+
+ if (!ucm)
+ freeaddrinfo(target);
printf("\n ALL %d CONNECTED on Client!\n\n", connections);
@@ -825,8 +895,8 @@ DAT_RETURN disconnect_eps(void)
}
}
stop = get_time();
- time.epf += ((stop - start) * 1.0e6);
- time.total += time.epf;
+ ts.epf += ((stop - start) * 1.0e6);
+ ts.total += ts.epf;
return DAT_SUCCESS;
}
@@ -900,8 +970,8 @@ DAT_RETURN disconnect_eps(void)
}
/* free EPs */
stop = get_time();
- time.epf += ((stop - start) * 1.0e6);
- time.total += time.epf;
+ ts.epf += ((stop - start) * 1.0e6);
+ ts.total += ts.epf;
return DAT_SUCCESS;
}
@@ -918,8 +988,8 @@ DAT_RETURN create_events(void)
ret = dat_evd_create(h_ia, connections, DAT_HANDLE_NULL,
DAT_EVD_CR_FLAG, &h_cr_evd);
stop = get_time();
- time.evdc += ((stop - start) * 1.0e6);
- time.total += time.evdc;
+ ts.evdc += ((stop - start) * 1.0e6);
+ ts.total += ts.evdc;
if (ret != DAT_SUCCESS) {
fprintf(stderr, " Error dat_evd_create: %s\n",
DT_RetToString(ret));
@@ -1009,8 +1079,8 @@ DAT_RETURN destroy_events(void)
start = get_time();
ret = dat_evd_free(h_dto_rcv_evd);
stop = get_time();
- time.evdf += ((stop - start) * 1.0e6);
- time.total += time.evdf;
+ ts.evdf += ((stop - start) * 1.0e6);
+ ts.total += ts.evdf;
if (ret != DAT_SUCCESS) {
fprintf(stderr, " Error freeing dto EVD: %s\n",
DT_RetToString(ret));
diff --git a/test/dtest/dtestx.c b/test/dtest/dtestx.c
index a14785b..af87af0 100755
--- a/test/dtest/dtestx.c
+++ b/test/dtest/dtestx.c
@@ -65,6 +65,7 @@
#endif
+#include "infiniband/verbs.h"
#include "dat2/udat.h"
#include "dat2/dat_ib_extensions.h"
@@ -178,6 +179,22 @@ int eps = 1;
int verbose = 0;
int counters = 0;
int counters_ok = 0;
+static int ucm = 0;
+
+/* IB address structure used by DAPL uCM provider */
+union dcm_addr {
+ DAT_SOCK_ADDR6 so;
+ struct {
+ uint8_t qp_type;
+ uint8_t port_num;
+ uint16_t lid;
+ uint32_t qpn;
+ union ibv_gid gid;
+ } ib;
+};
+
+static union dcm_addr remote;
+static union dcm_addr local;
#define LOGPRINTF if (verbose) printf
@@ -392,8 +409,9 @@ void process_conn(int idx)
int connect_ep(char *hostname)
{
- DAT_SOCK_ADDR remote_addr;
+ DAT_IA_ADDRESS_PTR remote_addr = (DAT_IA_ADDRESS_PTR)&remote;
DAT_EP_ATTR ep_attr;
+ DAT_IA_ATTR ia_attr;
DAT_RETURN status;
DAT_REGION_DESCRIPTION region;
DAT_EVENT event;
@@ -412,10 +430,26 @@ int connect_ep(char *hostname)
_OK(status, "dat_ia_open");
memset(&prov_attrs, 0, sizeof(prov_attrs));
- status = dat_ia_query(ia, NULL, 0, NULL,
+ status = dat_ia_query(ia, NULL,
+ DAT_IA_FIELD_ALL, &ia_attr,
DAT_PROVIDER_FIELD_ALL, &prov_attrs);
_OK(status, "dat_ia_query");
+ memcpy((void*)&local,
+ (void*)ia_attr.ia_address_ptr,
+ sizeof(DAT_SOCK_ADDR6));
+
+ if (local.ib.qp_type == IBV_QPT_UD) {
+ ucm = 1;
+ printf("%d Local uCM Address = QPN=0x%x, LID=0x%x\n",
+ getpid(), ntohl(local.ib.qpn),
+ ntohs(local.ib.lid));
+ printf("%d Remote uCM Address = QPN=0x%x, LID=0x%x\n",
+ getpid(), ntohl(remote.ib.qpn),
+ ntohs(remote.ib.lid));
+ }
+
+
/* Print provider specific attributes */
for (i = 0; i < prov_attrs.num_provider_specific_attr; i++) {
LOGPRINTF(" Provider Specific Attribute[%d] %s=%s\n",
@@ -567,6 +601,9 @@ int connect_ep(char *hostname)
if (!server || (server && ud_test)) {
struct addrinfo *target;
+ if (ucm)
+ goto no_resolution;
+
if (getaddrinfo(hostname, NULL, NULL, &target) != 0) {
printf("Error getting remote address.\n");
exit(1);
@@ -579,10 +616,11 @@ int connect_ep(char *hostname)
inet_ntoa(((struct sockaddr_in *)
target->ai_addr)->sin_addr));
- remote_addr = *((DAT_IA_ADDRESS_PTR) target->ai_addr);
- freeaddrinfo(target);
strcpy((char *)buf[SND_RDMA_BUF_INDEX], "Client written data");
-
+
+ remote_addr = (DAT_IA_ADDRESS_PTR)&target->ai_addr; /* IP */
+no_resolution:
+
/* one Client EP, multiple Server EPs, same conn_qual
* use private data to select EP on Server
*/
@@ -596,13 +634,16 @@ int connect_ep(char *hostname)
pdata = 0; /* just use first EP */
status = dat_ep_connect(ep[0],
- &remote_addr,
+ remote_addr,
(server ? CLIENT_ID :
SERVER_ID), CONN_TIMEOUT, 4,
(DAT_PVOID) & pdata, 0,
DAT_CONNECT_DEFAULT_FLAG);
_OK(status, "dat_ep_connect");
}
+
+ if (!ucm)
+ freeaddrinfo(target);
}
/* UD: process CR's starting with 2nd on server, 1st for client */
@@ -721,7 +762,19 @@ int disconnect_ep(void)
DAT_EVENT event;
DAT_COUNT nmore;
int i;
+
+ if (counters) { /* examples of query and print */
+ int ii;
+ DAT_UINT64 ia_cntrs[DCNT_IA_ALL_COUNTERS];
+ dat_query_counters(ia, DCNT_IA_ALL_COUNTERS, ia_cntrs, 0);
+ printf(" IA Cntrs:");
+ for (ii = 0; ii < DCNT_IA_ALL_COUNTERS; ii++)
+ printf(" " F64u "", ia_cntrs[ii]);
+ printf("\n");
+ dat_print_counters(ia, DCNT_IA_ALL_COUNTERS, 0);
+ }
+
if (!ud_test) {
status = dat_ep_disconnect(ep[0], DAT_CLOSE_DEFAULT);
_OK2(status, "dat_ep_disconnect");
@@ -797,17 +850,6 @@ int disconnect_ep(void)
status = dat_pz_free(pz);
_OK2(status, "dat_pz_free");
- if (counters) { /* examples of query and print */
- int ii;
- DAT_UINT64 ia_cntrs[DCNT_IA_ALL_COUNTERS];
-
- dat_query_counters(ia, DCNT_IA_ALL_COUNTERS, ia_cntrs, 0);
- printf(" IA Cntrs:");
- for (ii = 0; ii < DCNT_IA_ALL_COUNTERS; ii++)
- printf(" " F64u "", ia_cntrs[ii]);
- printf("\n");
- dat_print_counters(ia, DCNT_IA_ALL_COUNTERS, 0);
- }
status = dat_ia_close(ia, DAT_CLOSE_DEFAULT);
_OK2(status, "dat_ia_close");
@@ -1200,7 +1242,7 @@ int main(int argc, char **argv)
int rc;
/* parse arguments */
- while ((rc = getopt(argc, argv, "csvumpU:h:b:P:")) != -1) {
+ while ((rc = getopt(argc, argv, "csvumpU:h:b:P:q:l:")) != -1) {
switch (rc) {
case 'u':
ud_test = 1;
@@ -1235,6 +1277,16 @@ int main(int argc, char **argv)
case 'v':
verbose = 1;
break;
+ case 'q':
+ remote.ib.qpn = htonl(strtol(optarg,NULL,0));
+ ucm = 1;
+ server = 0;
+ break;
+ case 'l':
+ remote.ib.lid = htons(strtol(optarg,NULL,0));
+ ucm = 1;
+ server = 0;
+ break;
default:
print_usage();
exit(-12);
--
1.5.2.5
More information about the ofw
mailing list