[openib-general] [PATCH] uDAPL with IB_CM support
Arlin Davis
arlin.r.davis at intel.com
Thu Jul 7 15:53:11 PDT 2005
James,
Patch for OpenIB uDAPL provider with IB_CM support instead of SOCKET_CM. It was a little too messy
to make conditional and keep SOCKET_CM around, so this version only supports IB_CM. Path records are
hand-rolled until IBAT support is added in the next 1-2 weeks. See README for details on setup and
limitations.
Signed-off-by: Arlin Davis <ardavis at ichips.intel.com>
Index: dapl/udapl/Makefile
===================================================================
--- dapl/udapl/Makefile (revision 2811)
+++ dapl/udapl/Makefile (working copy)
@@ -122,7 +122,7 @@
#
ifeq ($(VERBS),openib)
PROVIDER = $(TOPDIR)/../openib
-CFLAGS += -DSOCKET_CM -DOPENIB -DCQ_WAIT_OBJECT
+CFLAGS += -DOPENIB -DCQ_WAIT_OBJECT
CFLAGS += -I/usr/local/include/infiniband
endif
Index: dapl/openib/TODO
===================================================================
--- dapl/openib/TODO (revision 2818)
+++ dapl/openib/TODO (working copy)
@@ -4,16 +4,12 @@
- query call to get current qp state
- ibv_get_cq_event() needs timed event call and wakeup
- query call to get device attributes
-- current implementation only supports one event per device
- memory window support
DAPL:
-- Build udapl issues with mthca having reverse dependencies to ibverbs
-- When real CM arrives: change modify_qp_state RTS RTR calls
- reinit EP needs a QP timewait completion notification
-- code disconnect clean
- add cq_object wakeup, time based cq_object wait when verbs support arrives
-- update uDAPL code with real CM and ATS support
+- update uDAPL code with real ATS support
- etc, etc.
Other:
Index: dapl/openib/dapl_ib_dto.h
===================================================================
--- dapl/openib/dapl_ib_dto.h (revision 2818)
+++ dapl/openib/dapl_ib_dto.h (working copy)
@@ -143,6 +143,7 @@
ib_data_segment_t *ds_array_p;
struct ibv_send_wr wr;
struct ibv_send_wr *bad_wr;
+ ib_hca_tranport_t *ibt_ptr = &ep_ptr->header.owner_ia->hca_ptr->ib_trans;
DAT_COUNT i, total_len;
dapl_dbg_log (DAPL_DBG_TYPE_EP,
@@ -195,6 +196,11 @@
wr.wr.rdma.rkey, wr.wr.rdma.remote_addr );
}
+ /* inline data for send or write ops */
+ if ((total_len <= ibt_ptr->max_inline_send ) &&
+ ((op_type == OP_SEND) || (op_type == OP_RDMA_WRITE)))
+ wr.send_flags |= IBV_SEND_INLINE;
+
/* set completion flags in work request */
wr.send_flags |= (DAT_COMPLETION_SUPPRESS_FLAG &
completion_flags) ? 0 : IBV_SEND_SIGNALED;
Index: dapl/openib/dapl_ib_util.c
===================================================================
--- dapl/openib/dapl_ib_util.c (revision 2818)
+++ dapl/openib/dapl_ib_util.c (working copy)
@@ -58,34 +58,82 @@
#include <sys/utsname.h>
#include <unistd.h>
#include <fcntl.h>
+#include <strings.h>
int g_dapl_loopback_connection = 0;
-#ifdef SOCKET_CM
+/* get lid */
+int dapli_get_lid(struct dapl_hca *hca_ptr, int port, uint16_t *lid )
+{
+ struct ibv_port_attr attr;
+
+ if (ibv_query_port(hca_ptr->ib_hca_handle, port, &attr))
+ return 1;
+
+ *lid = attr.lid;
+
+ return 0;
+}
+
+/* get gid */
+int dapli_get_gid(struct dapl_hca *hca_ptr, int port,
+ int index, union ibv_gid *gid )
+{
+ /* ibv_query_gid() coming soon, until then HACK */
+ char path[128];
+ char val[40];
+ char name[256];
+ char *token;
+ uint16_t *p_gid;
+
+ if (sysfs_get_mnt_path(path, sizeof path)) {
+ fprintf(stderr, "Couldn't find sysfs mount.\n");
+ return 1;
+ }
+ sprintf(name, "%s/class/infiniband/%s/ports/%d/gids/%d", path,
+ ibv_get_device_name(hca_ptr->ib_trans.ib_dev), port, index);
+
+ if (sysfs_read_attribute_value(name, val, sizeof val)) {
+ fprintf(stderr, "Couldn't read GID at %s\n", name);
+ return 1;
+ }
+
+ /* get token strings with delimiter */
+ token = strtok(val,":");
+ p_gid = (uint16_t*)gid->raw;
+ while (token) {
+ *p_gid = strtoul(token,NULL,16);
+ *p_gid = htons(*p_gid); /* convert each token to network order */
+ token = strtok(NULL,":");
+ p_gid++;
+ }
+ return 0;
+}
+
+
/* just get IP address for hostname */
-DAT_RETURN getipaddr( char *addr, int addr_len)
+int dapli_get_addr( char *addr, int addr_len)
{
struct sockaddr_in *ipv4_addr = (struct sockaddr_in*)addr;
struct hostent *h_ptr;
struct utsname ourname;
if ( uname( &ourname ) < 0 )
- return DAT_INTERNAL_ERROR;
+ return 1;
h_ptr = gethostbyname( ourname.nodename );
if ( h_ptr == NULL )
- return DAT_INTERNAL_ERROR;
+ return 1;
if ( h_ptr->h_addrtype == AF_INET ) {
ipv4_addr = (struct sockaddr_in*) addr;
ipv4_addr->sin_family = AF_INET;
dapl_os_memcpy( &ipv4_addr->sin_addr, h_ptr->h_addr_list[0], 4 );
} else
- return DAT_INVALID_ADDRESS;
+ return 1;
- return DAT_SUCCESS;
+ return 0;
}
-#endif
/*
* dapls_ib_init, dapls_ib_release
@@ -104,11 +152,15 @@
*/
int32_t dapls_ib_init (void)
{
- return 0;
+ if (dapli_cm_thread_init())
+ return -1;
+ else
+ return 0;
}
int32_t dapls_ib_release (void)
{
+ dapli_cm_thread_destroy();
return 0;
}
@@ -160,48 +212,50 @@
hca_ptr->ib_hca_handle = ibv_open_device(hca_ptr->ib_trans.ib_dev);
if (!hca_ptr->ib_hca_handle) {
- dapl_dbg_log (DAPL_DBG_TYPE_ERR,
+ dapl_dbg_log (DAPL_DBG_TYPE_ERR,
" open_hca: IB dev open failed for %s\n",
ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
return DAT_INTERNAL_ERROR;
}
-#ifdef SOCKET_CM
- /* initialize cr_list lock */
- dat_status = dapl_os_lock_init(&hca_ptr->ib_trans.lock);
- if (dat_status != DAT_SUCCESS)
- {
+ /* set inline max with environment or default, get local lid and gid 0 */
+ hca_ptr->ib_trans.max_inline_send =
+ dapl_os_get_env_val ( "DAPL_MAX_INLINE", INLINE_SEND_DEFAULT );
+
+ if ( dapli_get_lid(hca_ptr, hca_ptr->port_num,
+ &hca_ptr->ib_trans.lid )) {
dapl_dbg_log (DAPL_DBG_TYPE_ERR,
- " open_hca: failed to init lock\n");
- return dat_status;
+ " open_hca: IB get LID failed for %s\n",
+ ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
+ return DAT_INTERNAL_ERROR;
}
- /* initialize CM list for listens on this HCA */
- dapl_llist_init_head(&hca_ptr->ib_trans.list);
-
- /* create thread to process inbound connect request */
- dat_status = dapl_os_thread_create(cr_thread,
- (void*)hca_ptr,
- &hca_ptr->ib_trans.thread );
- if (dat_status != DAT_SUCCESS)
- {
+ if ( dapli_get_gid(hca_ptr, hca_ptr->port_num, 0,
+ &hca_ptr->ib_trans.gid )) {
dapl_dbg_log (DAPL_DBG_TYPE_ERR,
- " open_hca: failed to create thread\n");
- return dat_status;
+ " open_hca: IB get GID failed for %s\n",
+ ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
+ return DAT_INTERNAL_ERROR;
}
-
+
/* get the IP address of the device */
- dat_status = getipaddr((char*)&hca_ptr->hca_address,
- sizeof(DAT_SOCK_ADDR6) );
+ if ( dapli_get_addr((char*)&hca_ptr->hca_address,
+ sizeof(DAT_SOCK_ADDR6) )) {
+ dapl_dbg_log (DAPL_DBG_TYPE_ERR,
+ " open_hca: IB get ADDR failed for %s\n",
+ ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
+ return DAT_INTERNAL_ERROR;
+ }
+
dapl_dbg_log (DAPL_DBG_TYPE_UTIL,
- " open_hca: %s, port %d, %s %d.%d.%d.%d\n",
+ " open_hca: %s, port %d, %s %d.%d.%d.%d INLINE_MAX=%d\n",
ibv_get_device_name(hca_ptr->ib_trans.ib_dev), hca_ptr->port_num,
((struct sockaddr_in *)&hca_ptr->hca_address)->sin_family == AF_INET ?
"AF_INET":"AF_INET6",
((struct sockaddr_in *)&hca_ptr->hca_address)->sin_addr.s_addr >> 0 & 0xff,
((struct sockaddr_in *)&hca_ptr->hca_address)->sin_addr.s_addr >> 8 & 0xff,
((struct sockaddr_in *)&hca_ptr->hca_address)->sin_addr.s_addr >> 16 & 0xff,
- ((struct sockaddr_in *)&hca_ptr->hca_address)->sin_addr.s_addr >> 24 & 0xff );
-#endif
+ ((struct sockaddr_in *)&hca_ptr->hca_address)->sin_addr.s_addr >> 24 & 0xff,
+ hca_ptr->ib_trans.max_inline_send );
return dat_status;
}
@@ -232,20 +286,6 @@
return(dapl_convert_errno(errno,"ib_close_device"));
hca_ptr->ib_hca_handle = IB_INVALID_HANDLE;
}
-
-#if SOCKET_CM
- /* destroy cr_thread and lock */
- hca_ptr->ib_trans.destroy = 1;
- while (hca_ptr->ib_trans.destroy) {
- struct timespec sleep, remain;
- sleep.tv_sec = 0;
- sleep.tv_nsec = 10000000; /* 10 ms */
- dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
- " close_hca: waiting for cr_thread\n");
- nanosleep (&sleep, &remain);
- }
- dapl_os_lock_destroy(&hca_ptr->ib_trans.lock);
-#endif
return (DAT_SUCCESS);
}
@@ -407,3 +447,36 @@
}
return DAT_SUCCESS;
}
+
+#ifdef PROVIDER_SPECIFIC_ATTR
+
+/*
+ * dapls_set_provider_specific_attr
+ *
+ * Input:
+ * attr_ptr Pointer provider attributes
+ *
+ * Output:
+ * none
+ *
+ * Returns:
+ * void
+ */
+DAT_NAMED_ATTR ib_attrs[] = {
+ {
+ "I_DAT_SEND_INLINE_THRESHOLD",
+ "128"
+ },
+};
+
+#define SPEC_ATTR_SIZE( x ) (sizeof( x ) / sizeof( DAT_NAMED_ATTR))
+
+void dapls_set_provider_specific_attr(
+ IN DAT_PROVIDER_ATTR *attr_ptr )
+{
+ attr_ptr->num_provider_specific_attr = SPEC_ATTR_SIZE( ib_attrs );
+ attr_ptr->provider_specific_attr = ib_attrs;
+}
+
+#endif
+
Index: dapl/openib/dapl_ib_cm.c
===================================================================
--- dapl/openib/dapl_ib_cm.c (revision 2818)
+++ dapl/openib/dapl_ib_cm.c (working copy)
@@ -52,451 +52,687 @@
#include "dapl_cr_util.h"
#include "dapl_name_service.h"
#include "dapl_ib_util.h"
+#include <sys/poll.h>
+#include <signal.h>
-#ifdef SOCKET_CM
+/* prototypes */
+static void dapli_path_comp_handler(uint64_t req_id, void *context, int rec_num);
+static void dapli_rt_comp_handler(uint64_t req_id, void *context, int rec_num);
+static void dapli_rep_recv(struct dapl_cm_id *conn, struct ib_cm_event *rep_recv_param);
+static struct dapl_cm_id * dapli_req_recv(struct dapl_cm_id *conn, struct ib_cm_event *event);
+static int dapli_cm_active_cb(struct dapl_cm_id *conn, struct ib_cm_event *event);
+static int dapli_cm_passive_cb(struct dapl_cm_id *conn, struct ib_cm_event *event);
+
+/* byte-order conversion helpers */
+#if __BYTE_ORDER == __LITTLE_ENDIAN
+static inline uint64_t cpu_to_be64(uint64_t x) { return bswap_64(x); }
+#elif __BYTE_ORDER == __BIG_ENDIAN
+static inline uint64_t cpu_to_be64(uint64_t x) { return x; }
+#endif
+
+#ifndef IB_AT
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <netinet/tcp.h>
#include <sysfs/libsysfs.h>
+#include <signal.h>
-/* prototypes */
-static uint16_t dapli_get_lid( struct ibv_device *dev, int port );
+/* iclust-20 hard-coded values, network order */
+#define REMOTE_GID "fe80:0000:0000:0000:0002:c902:0000:4071"
+#define REMOTE_LID "0002"
+
+static int g_cm_destroy;
+static DAPL_OS_THREAD g_cm_thread;
+static DAPL_OS_LOCK g_cm_lock;
+static struct dapl_llist_entry *g_cm_list;
+
+int dapli_cm_thread_init(void)
+{
+ DAT_RETURN dat_status;
-static DAT_RETURN dapli_socket_connect ( DAPL_EP *ep_ptr,
- DAT_IA_ADDRESS_PTR r_addr,
- DAT_CONN_QUAL r_qual,
- DAT_COUNT p_size,
- DAT_PVOID p_data );
-
-static DAT_RETURN dapli_socket_listen ( DAPL_IA *ia_ptr,
- DAT_CONN_QUAL serviceID,
- DAPL_SP *sp_ptr );
-
-static DAT_RETURN dapli_socket_accept( ib_cm_srvc_handle_t cm_ptr );
-
-static DAT_RETURN dapli_socket_accept_final( DAPL_EP *ep_ptr,
- DAPL_CR *cr_ptr,
- DAT_COUNT p_size,
- DAT_PVOID p_data );
-
-/* XXX temporary hack to get lid */
-static uint16_t dapli_get_lid(IN struct ibv_device *dev, IN int port)
-{
- char path[128];
- char val[16];
- char name[256];
+ dapl_dbg_log(DAPL_DBG_TYPE_CM," cm_thread_init(%d)\n", getpid());
- if (sysfs_get_mnt_path(path, sizeof path)) {
- fprintf(stderr, "Couldn't find sysfs mount.\n");
- return 0;
+ /* initialize cr_list lock */
+ dapl_os_lock_init(&g_cm_lock);
+
+ /* initialize CM list for listens on this HCA */
+ dapl_llist_init_head(&g_cm_list);
+
+ /* create thread to process inbound connect request */
+ dat_status = dapl_os_thread_create(cm_thread, NULL, &g_cm_thread);
+ if (dat_status != DAT_SUCCESS)
+ {
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ " cm_thread_init: failed to create thread\n");
+ return 1;
}
- sprintf(name, "%s/class/infiniband/%s/ports/%d/lid", path,
- ibv_get_device_name(dev), port);
+ return 0;
+}
- if (sysfs_read_attribute_value(name, val, sizeof val)) {
- fprintf(stderr, "Couldn't read LID at %s\n", name);
- return 0;
+void dapli_cm_thread_destroy(void)
+{
+ dapl_dbg_log(DAPL_DBG_TYPE_CM," cm_thread_destroy(%d)\n", getpid());
+
+ /* destroy cr_thread and lock */
+ g_cm_destroy = 1;
+ pthread_kill( g_cm_thread, SIGUSR1 );
+ dapl_dbg_log(DAPL_DBG_TYPE_CM," cm_thread_destroy(%d) SIGUSR1 sent\n",getpid());
+ while (g_cm_destroy) {
+ struct timespec sleep, remain;
+ sleep.tv_sec = 0;
+ sleep.tv_nsec = 200000000; /* 200 ms */
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " cm_thread_destroy: waiting for cm_thread\n");
+ nanosleep (&sleep, &remain);
}
- return strtol(val, NULL, 0);
+ dapl_dbg_log(DAPL_DBG_TYPE_CM," cm_thread_destroy(%d) exit\n",getpid());
}
-/*
- * ACTIVE: Create socket, connect, and exchange QP information
- */
-static DAT_RETURN
-dapli_socket_connect ( DAPL_EP *ep_ptr,
- DAT_IA_ADDRESS_PTR r_addr,
- DAT_CONN_QUAL r_qual,
- DAT_COUNT p_size,
- DAT_PVOID p_data )
-{
- ib_cm_handle_t cm_ptr;
- DAPL_IA *ia_ptr = ep_ptr->header.owner_ia;
- int len, opt = 1;
- struct iovec iovec[2];
- short rtu_data = htons(0x0E0F);
+static int ib_at_route_by_ip(uint32_t dst_ip, uint32_t src_ip, int tos, uint16_t flags,
+ struct ib_at_ib_route *ib_route,
+ struct ib_at_completion *async_comp)
+{
+ struct dapl_cm_id *conn = (struct dapl_cm_id *)async_comp->context;
- dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: r_qual %d\n", r_qual);
-
- /*
- * Allocate CM and initialize
- */
- if ((cm_ptr = dapl_os_alloc(sizeof(*cm_ptr))) == NULL ) {
- return DAT_INSUFFICIENT_RESOURCES;
+ dapl_dbg_log (
+ DAPL_DBG_TYPE_CM,
+ " CM at_route_by_ip: conn %p cm_id %d src %d.%d.%d.%d -> dst %d.%d.%d.%d (%d)\n",
+ conn,conn->cm_id,
+ src_ip >> 0 & 0xff, src_ip >> 8 & 0xff,
+ src_ip >> 16 & 0xff,src_ip >> 24 & 0xff,
+ dst_ip >> 0 & 0xff, dst_ip >> 8 & 0xff,
+ dst_ip >> 16 & 0xff,dst_ip >> 24 & 0xff, conn->service_id);
+
+ /* use req_id for loopback indication */
+ if (( src_ip == dst_ip ) || ( dst_ip == 0x0100007f ))
+ async_comp->req_id = 1;
+ else
+ async_comp->req_id = 0;
+
+ return 1;
+}
+
+static int ib_at_paths_by_route(struct ib_at_ib_route *ib_route, uint32_t mpath_type,
+ struct ib_sa_path_rec *pr, int npath,
+ struct ib_at_completion *async_comp)
+{
+ struct dapl_cm_id *conn = (struct dapl_cm_id *)async_comp->context;
+ char *env, *token;
+ char dgid[40];
+ uint16_t *p_gid = (uint16_t*)&ib_route->gid;
+
+ /* set local path record values and send to remote */
+ (void)dapl_os_memzero(pr, sizeof(*pr));
+
+ pr->slid = htons(conn->hca->ib_trans.lid);
+ pr->sgid.global.subnet_prefix = conn->hca->ib_trans.gid.global.subnet_prefix;
+ pr->sgid.global.interface_id = conn->hca->ib_trans.gid.global.interface_id;
+
+ env = getenv("DAPL_REMOTE_LID");
+ if ( env == NULL )
+ env = REMOTE_LID;
+ ib_route->lid = strtol(env,NULL,0);
+
+ env = getenv("DAPL_REMOTE_GID");
+ if ( env == NULL )
+ env = REMOTE_GID;
+
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " ib_at_paths_by_route: remote LID %x GID %s\n",
+ ib_route->lid,env);
+
+ dapl_os_memcpy( dgid, env, 40 );
+
+ /* get GID with token strings and delimiter */
+ token = strtok(dgid,":");
+ while (token) {
+ *p_gid = strtoul(token,NULL,16);
+ *p_gid = htons(*p_gid); /* convert each token to network order */
+ token = strtok(NULL,":");
+ p_gid++;
+ }
+
+ /* set remote lid and gid, req_id is indication of loopback */
+ if ( !async_comp->req_id ) {
+ pr->dlid = htons(ib_route->lid);
+ pr->dgid.global.subnet_prefix = ib_route->gid.global.subnet_prefix;
+ pr->dgid.global.interface_id = ib_route->gid.global.interface_id;
+ } else {
+ pr->dlid = pr->slid;
+ pr->dgid.global.subnet_prefix = pr->sgid.global.subnet_prefix;
+ pr->dgid.global.interface_id = pr->sgid.global.interface_id;
}
- (void) dapl_os_memzero( cm_ptr, sizeof( *cm_ptr ) );
- cm_ptr->socket = -1;
+ pr->reversible = 0x1000000;
+ pr->pkey = 0xffff;
+ pr->mtu = IBV_MTU_1024;
+ pr->mtu_selector = 2;
+ pr->rate_selector = 2;
+ pr->rate = 3;
+ pr->packet_life_time_selector = 2;
+ pr->packet_life_time = 2;
+
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " ib_at_paths_by_route: SRC LID 0x%x GID subnet %016llx id %016llx\n",
+ pr->slid,(unsigned long long)(pr->sgid.global.subnet_prefix),
+ (unsigned long long)(pr->sgid.global.interface_id) );
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " ib_at_paths_by_route: DST LID 0x%x GID subnet %016llx id %016llx\n",
+ pr->dlid,(unsigned long long)(pr->dgid.global.subnet_prefix),
+ (unsigned long long)(pr->dgid.global.interface_id) );
- /* create, connect, sockopt, and exchange QP information */
- if ((cm_ptr->socket = socket(AF_INET,SOCK_STREAM,0)) < 0 ) {
- dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
- return DAT_INSUFFICIENT_RESOURCES;
- }
+ dapli_path_comp_handler( async_comp->req_id, (void*)conn, 1);
- ((struct sockaddr_in*)r_addr)->sin_port = htons(r_qual);
+ return 0;
+}
- if ( connect(cm_ptr->socket, r_addr, sizeof(*r_addr)) < 0 ) {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " connect: %s on r_qual %d\n",
- strerror(errno), (unsigned int)r_qual);
- dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
- return DAT_INVALID_ADDRESS;
- }
- setsockopt(cm_ptr->socket,IPPROTO_TCP,TCP_NODELAY,&opt,sizeof(opt));
-
- /* Send QP info, IA address, and private data */
- cm_ptr->dst.qpn = ep_ptr->qp_handle->qp_num;
- cm_ptr->dst.port = ia_ptr->hca_ptr->port_num;
- cm_ptr->dst.lid = dapli_get_lid( ia_ptr->hca_ptr->ib_trans.ib_dev,
- ia_ptr->hca_ptr->port_num );
- cm_ptr->dst.ia_address = ia_ptr->hca_ptr->hca_address;
- cm_ptr->dst.p_size = p_size;
- iovec[0].iov_base = &cm_ptr->dst;
- iovec[0].iov_len = sizeof(ib_qp_cm_t);
- if ( p_size ) {
- iovec[1].iov_base = p_data;
- iovec[1].iov_len = p_size;
- }
- len = writev( cm_ptr->socket, iovec, (p_size ? 2:1) );
- if ( len != (p_size + sizeof(ib_qp_cm_t)) ) {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " connect write: ERR %s, wcnt=%d\n",
- strerror(errno), len);
- goto bail;
- }
- dapl_dbg_log(DAPL_DBG_TYPE_EP,
- " connect: SRC port=0x%x lid=0x%x, qpn=0x%x, psize=%d\n",
- cm_ptr->dst.port, cm_ptr->dst.lid,
- cm_ptr->dst.qpn, cm_ptr->dst.p_size );
-
- /* read DST information into cm_ptr, overwrite SRC info */
- len = readv( cm_ptr->socket, iovec, 1 );
- if ( len != sizeof(ib_qp_cm_t) ) {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " connect read: ERR %s, rcnt=%d\n",
- strerror(errno), len);
- goto bail;
- }
- dapl_dbg_log(DAPL_DBG_TYPE_EP,
- " connect: DST port=0x%x lid=0x%x, qpn=0x%x, psize=%d\n",
- cm_ptr->dst.port, cm_ptr->dst.lid,
- cm_ptr->dst.qpn, cm_ptr->dst.p_size );
+#endif /* ifndef IB_AT */
- /* validate private data size before reading */
- if ( cm_ptr->dst.p_size > IB_MAX_REP_PDATA_SIZE ) {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " connect read: psize (%d) wrong\n",
- cm_ptr->dst.p_size );
- goto bail;
- }
+static void dapli_path_comp_handler(uint64_t req_id, void *context, int rec_num)
+{
+ struct dapl_cm_id *conn = context;
+ int status;
+ ib_cm_events_t event;
+
+ if (rec_num <= 0) {
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " path_comp_handler: resolution err %d retry %d\n",
+ rec_num, conn->retries + 1);
+ if (++conn->retries > IB_MAX_AT_RETRY) {
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " path_comp_handler: ep_ptr 0x%p\n",conn->ep);
+ event = IB_CME_DESTINATION_UNREACHABLE;
+ goto bail;
+ }
- /* read private data into cm_handle if any present */
- if ( cm_ptr->dst.p_size ) {
- iovec[0].iov_base = cm_ptr->p_data;
- iovec[0].iov_len = cm_ptr->dst.p_size;
- len = readv( cm_ptr->socket, iovec, 1 );
- if ( len != cm_ptr->dst.p_size ) {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " connect read pdata: ERR %s, rcnt=%d\n",
- strerror(errno), len);
+ status = ib_at_paths_by_route(&conn->dapl_rt, 0,
+ &conn->dapl_path, 1,
+ &conn->dapl_comp);
+ if (status) {
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " path_by_route: err %d id %lld\n",
+ status, conn->dapl_comp.req_id);
+ event = IB_CME_LOCAL_FAILURE;
goto bail;
}
+ return;
}
- /* modify QP to RTR and then to RTS with remote info */
- if ( dapls_modify_qp_state( ep_ptr->qp_handle,
- IBV_QPS_RTR, &cm_ptr->dst ) != DAT_SUCCESS )
- goto bail;
-
- if ( dapls_modify_qp_state( ep_ptr->qp_handle,
- IBV_QPS_RTS, &cm_ptr->dst ) != DAT_SUCCESS )
+ /* Mellanox performance workaround - best performance is MTU of 1024 */
+ if (conn->dapl_path.mtu > IBV_MTU_1024)
+ conn->dapl_path.mtu = IBV_MTU_1024;
+
+ conn->req.service_id = cpu_to_be64(conn->service_id);
+ conn->req.primary_path = &conn->dapl_path;
+ conn->req.alternate_path = NULL;
+
+ status = ib_cm_send_req(conn->cm_id, &conn->req);
+ if (status) {
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR, " ib_send_cm_req failed: %s\n",
+ strerror(errno));
+ event = IB_CME_LOCAL_FAILURE;
goto bail;
-
- ep_ptr->qp_state = IB_QP_STATE_RTS;
-
- /* complete handshake after final QP state change */
- write(cm_ptr->socket, &rtu_data, sizeof(rtu_data) );
-
- /* init cm_handle and post the event with private data */
- ep_ptr->cm_handle = cm_ptr;
- dapl_dbg_log( DAPL_DBG_TYPE_EP," ACTIVE: connected!\n" );
- dapl_evd_connection_callback( ep_ptr->cm_handle,
- IB_CME_CONNECTED,
- cm_ptr->p_data,
- ep_ptr );
- return DAT_SUCCESS;
+ }
+ return;
bail:
- /* close socket, free cm structure and post error event */
- if ( cm_ptr->socket >= 0 )
- close(cm_ptr->socket);
- dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
- dapls_ib_reinit_ep( ep_ptr ); /* reset QP state */
-
- dapl_evd_connection_callback( ep_ptr->cm_handle,
- IB_CME_LOCAL_FAILURE,
- NULL,
- ep_ptr );
- return DAT_INTERNAL_ERROR;
+ dapl_evd_connection_callback(conn, event, NULL, conn->ep);
}
+static void dapli_rt_comp_handler(uint64_t req_id, void *context, int rec_num)
+{
+ struct dapl_cm_id *conn = context;
+ int status;
+ ib_cm_events_t event;
-/*
- * PASSIVE: Create socket, listen, accept, exchange QP information
- */
-static DAT_RETURN
-dapli_socket_listen ( DAPL_IA *ia_ptr,
- DAT_CONN_QUAL serviceID,
- DAPL_SP *sp_ptr )
-{
- struct sockaddr_in addr;
- ib_cm_srvc_handle_t cm_ptr = NULL;
- int opt = 1;
- DAT_RETURN dat_status = DAT_SUCCESS;
+ if (rec_num <= 0) {
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ " dapl_rt_comp_handler: rec %d retry %d\n",
+ rec_num, conn->retries+1 );
- dapl_dbg_log ( DAPL_DBG_TYPE_EP,
- " listen(ia_ptr %p ServiceID %d sp_ptr %p)\n",
- ia_ptr, serviceID, sp_ptr);
+ if (++conn->retries > IB_MAX_AT_RETRY) {
+ event = IB_CME_DESTINATION_UNREACHABLE;
+ goto bail;
+ }
- /* Allocate CM and initialize */
- if ((cm_ptr = dapl_os_alloc(sizeof(*cm_ptr))) == NULL)
- return DAT_INSUFFICIENT_RESOURCES;
+ status = ib_at_route_by_ip(((struct sockaddr_in *)&conn->r_addr)->sin_addr.s_addr,
+ 0, 0, 0, &conn->dapl_rt, &conn->dapl_comp);
+ if (status < 0) {
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR, "dapl_rt_comp_handler: "
+ "ib_at_route_by_ip failed with status %d\n",
+ status);
+ event = IB_CME_DESTINATION_UNREACHABLE;
+ goto bail;
+ }
- (void) dapl_os_memzero( cm_ptr, sizeof( *cm_ptr ) );
-
- cm_ptr->socket = cm_ptr->l_socket = -1;
- cm_ptr->sp = sp_ptr;
- cm_ptr->hca_ptr = ia_ptr->hca_ptr;
-
- /* bind, listen, set sockopt, accept, exchange data */
- if ((cm_ptr->l_socket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
- dapl_dbg_log (DAPL_DBG_TYPE_ERR,
- "socket for listen returned %d\n", errno);
- dat_status = DAT_INSUFFICIENT_RESOURCES;
- goto bail;
+ if (status == 1)
+ dapli_rt_comp_handler(conn->dapl_comp.req_id, conn, 1);
+ return;
}
- setsockopt(cm_ptr->l_socket,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
- addr.sin_port = htons(serviceID);
- addr.sin_family = AF_INET;
- addr.sin_addr.s_addr = INADDR_ANY;
-
- if (( bind( cm_ptr->l_socket,(struct sockaddr*)&addr, sizeof(addr) ) < 0) ||
- (listen( cm_ptr->l_socket, 128 ) < 0) ) {
-
- dapl_dbg_log( DAPL_DBG_TYPE_ERR,
- " listen: ERROR %s on conn_qual 0x%x\n",
- strerror(errno),serviceID);
-
- if ( errno == EADDRINUSE )
- dat_status = DAT_CONN_QUAL_IN_USE;
- else
- dat_status = DAT_CONN_QUAL_UNAVAILABLE;
-
+ conn->dapl_comp.fn = &dapli_path_comp_handler;
+ conn->dapl_comp.context = conn;
+ conn->retries = 0;
+ status = ib_at_paths_by_route(&conn->dapl_rt, 0, &conn->dapl_path, 1,
+ &conn->dapl_comp);
+ if (status) {
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ "dapl_rt_comp_handler: ib_at_paths_by_route "
+ "returned %d id %lld\n", status, conn->dapl_comp.req_id);
+ event = IB_CME_LOCAL_FAILURE;
goto bail;
}
-
- /* set cm_handle for this service point, save listen socket */
- sp_ptr->cm_srvc_handle = cm_ptr;
+ return;
- /* add to SP->CR thread list */
- dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&cm_ptr->entry);
- dapl_os_lock( &cm_ptr->hca_ptr->ib_trans.lock );
- dapl_llist_add_tail(&cm_ptr->hca_ptr->ib_trans.list,
- (DAPL_LLIST_ENTRY*)&cm_ptr->entry, cm_ptr);
- dapl_os_unlock(&cm_ptr->hca_ptr->ib_trans.lock);
-
- dapl_dbg_log( DAPL_DBG_TYPE_CM,
- " listen: qual 0x%x cr %p s_fd %d\n",
- ntohs(serviceID), cm_ptr, cm_ptr->l_socket );
-
- return dat_status;
bail:
- dapl_dbg_log( DAPL_DBG_TYPE_ERR,
- " listen: ERROR on conn_qual 0x%x\n",serviceID);
- if ( cm_ptr->l_socket >= 0 )
- close( cm_ptr->l_socket );
- dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
- return dat_status;
+ dapl_evd_connection_callback(conn, event, NULL, conn->ep);
}
-
-/*
- * PASSIVE: send local QP information, private data, and wait for
- * active side to respond with QP RTS/RTR status
- */
-static DAT_RETURN
-dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
+static void dapli_destroy_cm_id(struct dapl_cm_id *conn)
{
- ib_cm_handle_t acm_ptr;
- void *p_data = NULL;
- int len;
- DAT_RETURN dat_status = DAT_SUCCESS;
-
- /* Allocate accept CM and initialize */
- if ((acm_ptr = dapl_os_alloc(sizeof(*acm_ptr))) == NULL)
- return DAT_INSUFFICIENT_RESOURCES;
+ int in_callback;
- (void) dapl_os_memzero( acm_ptr, sizeof( *acm_ptr ) );
-
- acm_ptr->socket = -1;
- acm_ptr->sp = cm_ptr->sp;
- acm_ptr->hca_ptr = cm_ptr->hca_ptr;
-
- len = sizeof(acm_ptr->dst.ia_address);
- acm_ptr->socket = accept(cm_ptr->l_socket,
- (struct sockaddr*)&acm_ptr->dst.ia_address,
- &len );
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " destroy_cm_id: conn %p id %d\n",conn,conn->cm_id);
+
+ dapl_os_lock(&conn->lock);
+ conn->destroy = 1;
+ in_callback = conn->in_callback;
+ dapl_os_unlock(&conn->lock);
+
+ if (!in_callback) {
+ ib_cm_destroy_id(conn->cm_id);
+ if (conn->ep)
+ conn->ep->cm_handle = IB_INVALID_HANDLE;
+ if (conn->sp)
+ conn->sp->cm_srvc_handle = IB_INVALID_HANDLE;
+
+ /* take off the CM thread work queue and free */
+ dapl_os_lock( &g_cm_lock );
+ dapl_llist_remove_entry(&g_cm_list,
+ (DAPL_LLIST_ENTRY*)&conn->entry);
+ dapl_os_unlock(&g_cm_lock);
+
+ dapl_os_free(conn, sizeof(*conn));
+ }
+}
- if ( acm_ptr->socket < 0 ) {
+static void dapli_rep_recv(struct dapl_cm_id *conn,
+ struct ib_cm_event *event)
+{
+ int status;
+
+ if (conn->ep->qp_handle == IB_INVALID_HANDLE) {
dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " accept: ERR %s on FD %d l_cr %p\n",
- strerror(errno),cm_ptr->l_socket,cm_ptr);
- dat_status = DAT_INTERNAL_ERROR;
- goto bail;
- }
+ " dapli_rep_recv: invalid qp "
+ "handle\n");
+ goto disc;
+ }
+
+ /* move QP state to RTR and RTS */
+ /* TODO: could use an ib_cm_init_qp_attr() call here */
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " rep_recv: RTR_RTS: cm_id %d r_qp 0x%x r_lid 0x%x r_SID %d\n",
+ conn->cm_id,event->param.rep_rcvd.remote_qpn,
+ ntohs(conn->req.primary_path->dlid),conn->service_id );
+
+ if ( dapls_modify_qp_state( conn->ep->qp_handle,
+ IBV_QPS_RTR,
+ event->param.rep_rcvd.remote_qpn,
+ ntohs(conn->req.primary_path->dlid),
+ 1 ) != DAT_SUCCESS )
+ goto disc;
+
+ if ( dapls_modify_qp_state( conn->ep->qp_handle,
+ IBV_QPS_RTS,0,0,0 ) != DAT_SUCCESS)
+ goto disc;
+
- /* read in DST QP info, IA address. check for private data */
- len = read( acm_ptr->socket, &acm_ptr->dst, sizeof(ib_qp_cm_t) );
- if ( len != sizeof(ib_qp_cm_t) ) {
+ status = ib_cm_send_rtu(conn->cm_id, NULL, 0);
+ if (status) {
dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " accept read: ERR %s, rcnt=%d\n",
- strerror(errno), len);
- dat_status = DAT_INTERNAL_ERROR;
- goto bail;
+ " dapli_rep_recv: ib_send_cm_rtu "
+ "failed: %d\n", status);
+ goto disc;
+ }
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " rep_recv: conn %p id %d CONNECTED!\n",
+ conn, conn->cm_id );
+
+ dapl_evd_connection_callback(conn, IB_CME_CONNECTED,
+ event->private_data, conn->ep);
+ return;
+
+disc:
+ dapl_evd_connection_callback(conn, IB_CME_LOCAL_FAILURE, NULL,
+ conn->ep);
+}
- }
- dapl_dbg_log(DAPL_DBG_TYPE_EP,
- " accept: DST port=0x%x lid=0x%x, qpn=0x%x, psize=%d\n",
- acm_ptr->dst.port, acm_ptr->dst.lid,
- acm_ptr->dst.qpn, acm_ptr->dst.p_size );
+static struct dapl_cm_id * dapli_req_recv(struct dapl_cm_id *conn,
+ struct ib_cm_event *event)
+{
+ struct dapl_cm_id *new_conn;
- /* validate private data size before reading */
- if ( acm_ptr->dst.p_size > IB_MAX_REQ_PDATA_SIZE ) {
+ if (conn->sp == NULL) {
dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " accept read: psize (%d) wrong\n",
- acm_ptr->dst.p_size );
- dat_status = DAT_INTERNAL_ERROR;
- goto bail;
+ " dapli_rep_recv: on invalid listen "
+ "handle\n");
+ return NULL;
+ }
+
+ /* allocate new cm_id and merge listen parameters */
+ new_conn = dapl_os_alloc(sizeof(*new_conn));
+ if (new_conn) {
+
+ (void)dapl_os_memzero(new_conn, sizeof(*new_conn));
+ dapl_os_lock_init(&new_conn->lock);
+ new_conn->cm_id = event->cm_id; /* provided by uCM */
+ new_conn->sp = conn->sp;
+ new_conn->hca = conn->hca;
+ new_conn->service_id = conn->service_id;
+
+ /* save request information in new conn */
+ dapl_os_memcpy(&new_conn->req_rcvd,
+ &event->param.req_rcvd,
+ sizeof(struct ib_cm_req_event_param));
+
+ new_conn->req_rcvd.primary_path = &new_conn->dapl_path;
+ new_conn->req_rcvd.alternate_path = NULL;
+
+ dapl_os_memcpy(new_conn->req_rcvd.primary_path,
+ event->param.req_rcvd.primary_path,
+ sizeof(struct ib_sa_path_rec));
+
+ /* put new CR on CM thread event work queue */
+ dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&new_conn->entry);
+ dapl_os_lock( &g_cm_lock );
+ dapl_llist_add_tail(&g_cm_list,
+ (DAPL_LLIST_ENTRY*)&new_conn->entry, new_conn);
+ dapl_os_unlock(&g_cm_lock);
+
+ dapl_dbg_log(DAPL_DBG_TYPE_CM, " passive_cb: "
+ "REQ on HCA %p SP %p SID %d L_ID %d new_id %d p_data %p\n",
+ new_conn->hca, new_conn->sp,
+ conn->service_id, conn->cm_id, new_conn->cm_id,
+ event->private_data );
+
}
+ return new_conn;
+}
- /* read private data into cm_handle if any present */
- if ( acm_ptr->dst.p_size ) {
- len = read( acm_ptr->socket,
- acm_ptr->p_data, acm_ptr->dst.p_size );
- if ( len != acm_ptr->dst.p_size ) {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " accept read pdata: ERR %s, rcnt=%d\n",
- strerror(errno), len );
- dat_status = DAT_INTERNAL_ERROR;
- goto bail;
- }
- dapl_dbg_log(DAPL_DBG_TYPE_EP,
- " accept: psize=%d read\n",
- acm_ptr->dst.p_size);
- p_data = acm_ptr->p_data;
+
+static int dapli_cm_active_cb(struct dapl_cm_id *conn,
+ struct ib_cm_event *event)
+{
+ int destroy;
+
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " active_cb: conn %p id %d event %d\n",
+ conn, conn->cm_id, event->event );
+
+ dapl_os_lock(&conn->lock);
+ if (conn->destroy) {
+ dapl_os_unlock(&conn->lock);
+ return 0;
}
-
- /* trigger CR event and return SUCCESS */
- dapls_cr_callback( acm_ptr,
- IB_CME_CONNECTION_REQUEST_PENDING,
- p_data,
- acm_ptr->sp );
+ conn->in_callback = 1;
+ dapl_os_unlock(&conn->lock);
- return DAT_SUCCESS;
+ switch (event->event) {
+ case IB_CM_REQ_ERROR:
+ dapl_evd_connection_callback(conn,
+ IB_CME_DESTINATION_UNREACHABLE,
+ NULL, conn->ep);
+ break;
+ case IB_CM_REJ_RECEIVED:
+ dapl_evd_connection_callback(conn, IB_CME_DESTINATION_REJECT,
+ NULL, conn->ep);
+ break;
+ case IB_CM_REP_RECEIVED:
+ dapli_rep_recv(conn, event);
+ break;
+ case IB_CM_DREQ_RECEIVED:
+ ib_cm_send_drep(conn->cm_id, NULL, 0);
+ break;
+ case IB_CM_DREQ_ERROR:
+ case IB_CM_DREP_RECEIVED:
+ /* Wait to exit timewait. */
+ break;
+ case IB_CM_TIMEWAIT_EXIT:
+ dapl_evd_connection_callback(conn, IB_CME_DISCONNECTED,
+ NULL, conn->ep);
+ break;
+ default:
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ " dapli_cm_active_cb_handler: Unexpected CM "
+ "event %d on ID 0x%p\n", event->event, conn->cm_id);
+ break;
+ }
-bail:
- if ( acm_ptr->socket >=0 )
- close( acm_ptr->socket );
- dapl_os_free( acm_ptr, sizeof( *acm_ptr ) );
- return DAT_INTERNAL_ERROR;
+ dapl_os_lock(&conn->lock);
+ destroy = conn->destroy;
+ conn->in_callback = conn->destroy;
+ dapl_os_unlock(&conn->lock);
+ if (destroy) {
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " active_cb: DESTROY conn %p id %d \n",
+ conn, conn->cm_id );
+ if (conn->ep)
+ conn->ep->cm_handle = IB_INVALID_HANDLE;
+
+ /* take off the CM thread work queue and free */
+ dapl_os_lock( &g_cm_lock );
+ dapl_llist_remove_entry(&g_cm_list,
+ (DAPL_LLIST_ENTRY*)&conn->entry);
+ dapl_os_unlock(&g_cm_lock);
+ dapl_os_free(conn, sizeof(*conn));
+ }
+ return(destroy);
}
+static int dapli_cm_passive_cb(struct dapl_cm_id *conn,
+ struct ib_cm_event *event)
+{
+ int destroy;
+ struct dapl_cm_id *new_conn;
-static DAT_RETURN
-dapli_socket_accept_final( DAPL_EP *ep_ptr,
- DAPL_CR *cr_ptr,
- DAT_COUNT p_size,
- DAT_PVOID p_data )
-{
- DAPL_IA *ia_ptr = ep_ptr->header.owner_ia;
- ib_cm_handle_t cm_ptr = cr_ptr->ib_cm_handle;
- ib_qp_cm_t qp_cm;
- struct iovec iovec[2];
- int len;
- short rtu_data = 0;
-
- if (p_size > IB_MAX_REP_PDATA_SIZE)
- return DAT_LENGTH_ERROR;
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " passive_cb: conn %p id %d event %d\n",
+ conn, conn->cm_id, event->event );
- /* must have a accepted socket */
- if ( cm_ptr->socket < 0 )
- return DAT_INTERNAL_ERROR;
-
- /* modify QP to RTR and then to RTS with remote info already read */
- if ( dapls_modify_qp_state( ep_ptr->qp_handle,
- IBV_QPS_RTR, &cm_ptr->dst ) != DAT_SUCCESS )
- goto bail;
+ if (conn->cm_id == 0)
+ return 0;
- if ( dapls_modify_qp_state( ep_ptr->qp_handle,
- IBV_QPS_RTS, &cm_ptr->dst ) != DAT_SUCCESS )
- goto bail;
+ dapl_os_lock(&conn->lock);
+ if (conn->destroy) {
+ dapl_os_unlock(&conn->lock);
+ return 0;
+ }
+ conn->in_callback = 1;
+ dapl_os_unlock(&conn->lock);
- ep_ptr->qp_state = IB_QP_STATE_RTS;
-
- /* Send QP info, IA address, and private data */
- qp_cm.qpn = ep_ptr->qp_handle->qp_num;
- qp_cm.port = ia_ptr->hca_ptr->port_num;
- qp_cm.lid = dapli_get_lid( ia_ptr->hca_ptr->ib_trans.ib_dev,
- ia_ptr->hca_ptr->port_num );
- qp_cm.ia_address = ia_ptr->hca_ptr->hca_address;
- qp_cm.p_size = p_size;
- iovec[0].iov_base = &qp_cm;
- iovec[0].iov_len = sizeof(ib_qp_cm_t);
- if (p_size) {
- iovec[1].iov_base = p_data;
- iovec[1].iov_len = p_size;
+ switch (event->event) {
+ case IB_CM_REQ_RECEIVED:
+ /* create new conn object with new conn_id from event */
+ new_conn = dapli_req_recv(conn,event);
+
+ if (new_conn)
+ dapls_cr_callback(new_conn, IB_CME_CONNECTION_REQUEST_PENDING,
+ event->private_data, new_conn->sp);
+ break;
+ case IB_CM_REP_ERROR:
+ dapls_cr_callback(conn, IB_CME_DESTINATION_UNREACHABLE,
+ NULL, conn->sp);
+ break;
+ case IB_CM_REJ_RECEIVED:
+ dapls_cr_callback(conn, IB_CME_DESTINATION_REJECT, NULL,
+ conn->sp);
+ break;
+ case IB_CM_RTU_RECEIVED:
+ /* move QP to RTS state */
+ if ( dapls_modify_qp_state(conn->ep->qp_handle,
+ IBV_QPS_RTS,0,0,0 ) != DAT_SUCCESS) {
+ dapls_cr_callback(conn, IB_CME_LOCAL_FAILURE,
+ NULL, conn->sp);
+ } else {
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " passive_cb: conn %p id %d CONNECTED!\n",
+ conn, conn->cm_id );
+ dapls_cr_callback(conn, IB_CME_CONNECTED,
+ NULL, conn->sp);
+ }
+ break;
+ case IB_CM_DREQ_RECEIVED:
+ ib_cm_send_drep(conn->cm_id, NULL, 0);
+ break;
+ case IB_CM_DREQ_ERROR:
+ case IB_CM_DREP_RECEIVED:
+ /* Wait to exit timewait. */
+ break;
+ case IB_CM_TIMEWAIT_EXIT:
+ dapls_cr_callback(conn, IB_CME_DISCONNECTED, NULL, conn->sp);
+ break;
+ default:
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR, " dapl_cm_passive_cb_handler: "
+ "Unexpected CM event %d on ID 0x%p\n",
+ event->event, conn->cm_id);
+ break;
}
- len = writev( cm_ptr->socket, iovec, (p_size ? 2:1) );
- if (len != (p_size + sizeof(ib_qp_cm_t))) {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " accept_final: ERR %s, wcnt=%d\n",
- strerror(errno), len);
- goto bail;
+
+ dapl_os_lock(&conn->lock);
+ destroy = conn->destroy;
+ conn->in_callback = conn->destroy;
+ dapl_os_unlock(&conn->lock);
+ if (destroy) {
+ if (conn->ep)
+ conn->ep->cm_handle = IB_INVALID_HANDLE;
+
+ /* take off the CM thread work queue and free */
+ dapl_os_lock( &g_cm_lock );
+ dapl_llist_remove_entry(&g_cm_list,
+ (DAPL_LLIST_ENTRY*)&conn->entry);
+ dapl_os_unlock(&g_cm_lock);
+
+ dapl_os_free(conn, sizeof(*conn));
}
- dapl_dbg_log(DAPL_DBG_TYPE_EP,
- " accept_final: SRC port=0x%x lid=0x%x, qpn=0x%x, psize=%d\n",
- qp_cm.port, qp_cm.lid, qp_cm.qpn, qp_cm.p_size );
+ return(destroy);
+}
+
+/* something to catch the signal */
+static void cm_handler(int signum)
+{
+ dapl_dbg_log (DAPL_DBG_TYPE_CM," cm_thread(%d,0x%x): ENTER cm_handler %d\n",
+ getpid(),g_cm_thread,signum);
+ return;
+}
+
+/* async CM processing thread */
+void cm_thread(void *arg)
+{
+ struct dapl_cm_id *conn, *next_conn;
+ struct ib_cm_event *event;
+ struct pollfd ufds;
+ sigset_t sigset;
+
+ dapl_dbg_log (DAPL_DBG_TYPE_CM,
+ " cm_thread(%d,0x%x): ENTER: cm_fd %d\n",
+ getpid(), g_cm_thread, ib_cm_get_fd());
+
+ sigemptyset(&sigset);
+ sigaddset(&sigset, SIGUSR1);
+ pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
+ signal( SIGUSR1, cm_handler);
- /* complete handshake after final QP state change */
- len = read(cm_ptr->socket, &rtu_data, sizeof(rtu_data) );
- if ( len != sizeof(rtu_data) || ntohs(rtu_data) != 0x0e0f ) {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " accept_final: ERR %s, rcnt=%d rdata=%x\n",
- strerror(errno), len, ntohs(rtu_data) );
- goto bail;
- }
+ dapl_os_lock( &g_cm_lock );
+ while (!g_cm_destroy) {
+ int cm_id,ret;
+
+ /* select for CM event, all events process via cm_fd */
+ ufds.fd = ib_cm_get_fd();
+ ufds.events = POLLIN;
+ ufds.revents = 0;
+
+ dapl_os_unlock(&g_cm_lock);
+ ret = poll(&ufds, 1, -1);
+ if ((ret <= 0) || (g_cm_destroy)) {
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " cm_thread(%d): ERR %s poll\n",
+ getpid(),strerror(errno));
+ dapl_os_lock(&g_cm_lock);
+ break;
+ }
- /* final data exchange if remote QP state is good to go */
- dapl_dbg_log( DAPL_DBG_TYPE_EP," PASSIVE: connected!\n" );
- dapls_cr_callback ( cm_ptr, IB_CME_CONNECTED, NULL, cm_ptr->sp );
- return DAT_SUCCESS;
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " cm_thread: GET EVENT fd=%d n=%d\n",
+ ib_cm_get_fd(),ret);
+ if (ib_cm_event_get(&event)) {
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " cm_thread: ERR %s eventi_get on %d\n",
+ strerror(errno), ib_cm_get_fd() );
+ dapl_os_lock(&g_cm_lock);
+ continue;
+ }
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " cm_thread: GET EVENT fd=%d woke\n",ib_cm_get_fd());
+ dapl_os_lock(&g_cm_lock);
+
+ /* set proper cm_id */
+ if (event->event == IB_CM_REQ_RECEIVED ||
+ event->event == IB_CM_SIDR_REQ_RECEIVED)
+ cm_id = event->param.req_rcvd.listen_id;
+ else
+ cm_id = event->cm_id;
-bail:
- dapl_dbg_log( DAPL_DBG_TYPE_ERR," accept_final: ERR !QP_RTR_RTS \n");
- if ( cm_ptr >= 0 )
- close( cm_ptr->socket );
- dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
- dapls_ib_reinit_ep( ep_ptr ); /* reset QP state */
+ dapl_dbg_log (DAPL_DBG_TYPE_CM,
+ " cm_thread: EVENT event(%d) cm_id=%d (%d)\n",
+ event->event, event->cm_id, cm_id );
- return DAT_INTERNAL_ERROR;
-}
+ /*
+ * Walk cm_list looking for connection id in event
+ * no need to walk if uCM would provide context with event
+ */
+ if (!dapl_llist_is_empty(&g_cm_list))
+ next_conn = dapl_llist_peek_head(&g_cm_list);
+ else
+ next_conn = NULL;
-#endif
+ ret = 0;
+ while (next_conn) {
+ conn = next_conn;
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " cm_thread: LIST cm %p c_id %d e_id %d)\n",
+ conn, conn->cm_id, cm_id );
+
+ next_conn = dapl_llist_next_entry(
+ &g_cm_list,
+ (DAPL_LLIST_ENTRY*)&conn->entry );
+
+ if (cm_id == conn->cm_id) {
+ dapl_os_unlock(&g_cm_lock);
+ if (conn->sp)
+ ret = dapli_cm_passive_cb(conn,event);
+ else
+ ret = dapli_cm_active_cb(conn,event);
+ dapl_os_lock(&g_cm_lock);
+ break;
+ }
+ }
+ ib_cm_event_put(event);
+ if (ret) {
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " cm_thread: destroy cm_id %d\n",cm_id);
+ ib_cm_destroy_id(cm_id);
+ }
+ }
+ dapl_os_unlock(&g_cm_lock);
+ dapl_dbg_log(DAPL_DBG_TYPE_CM," cm_thread(%d) EXIT, cm_list=%s\n",
+ getpid(),dapl_llist_is_empty(&g_cm_list) ? "EMPTY":"NOT EMPTY");
+ g_cm_destroy = 0;
+}
+/************************ DAPL provider entry points **********************/
/*
* dapls_ib_connect
@@ -522,16 +758,17 @@
DAT_RETURN
dapls_ib_connect (
IN DAT_EP_HANDLE ep_handle,
- IN DAT_IA_ADDRESS_PTR remote_ia_address,
- IN DAT_CONN_QUAL remote_conn_qual,
- IN DAT_COUNT private_data_size,
- IN void *private_data )
+ IN DAT_IA_ADDRESS_PTR r_addr,
+ IN DAT_CONN_QUAL r_qual,
+ IN DAT_COUNT p_size,
+ IN void *p_data )
{
DAPL_EP *ep_ptr;
ib_qp_handle_t qp_ptr;
+ int status;
+ DAT_RETURN dat_status = DAT_INTERNAL_ERROR;
+ ib_cm_handle_t conn;
- dapl_dbg_log ( DAPL_DBG_TYPE_EP,
- " connect(ep_handle %p ....)\n", ep_handle);
/*
* Sanity check
*/
@@ -541,13 +778,80 @@
ep_ptr = (DAPL_EP*)ep_handle;
qp_ptr = ep_ptr->qp_handle;
-#ifdef SOCKET_CM
- return (dapli_socket_connect( ep_ptr, remote_ia_address,
- remote_conn_qual,
- private_data_size, private_data ));
-#else
- return DAT_NOT_IMPLEMENTED;
-#endif
+ dapl_dbg_log(DAPL_DBG_TYPE_CM, " connect: r_SID %d, pdata %p, plen %d\n",
+ r_qual,p_data,p_size);
+
+ /* Allocate CM and initialize lock */
+ if ((conn = dapl_os_alloc(sizeof(*conn))) == NULL)
+ return DAT_INSUFFICIENT_RESOURCES;
+
+ (void)dapl_os_memzero(conn, sizeof(*conn));
+
+ dat_status = dapl_os_lock_init(&conn->lock);
+ if (dat_status != DAT_SUCCESS) {
+ dapl_os_free(conn, sizeof(*conn));
+ return DAT_INTERNAL_ERROR;
+ }
+
+ conn->ep = ep_ptr;
+ conn->hca = ep_ptr->header.owner_ia->hca_ptr;
+ status = ib_cm_create_id(&conn->cm_id);
+ if (status < 0) {
+ dat_status = dapl_convert_errno(errno,"create_cm_id");
+ dapl_os_free(conn, sizeof(*conn));
+ return dat_status;
+ }
+ conn->ep->cm_handle = conn;
+
+ /* Setup QP/CM parameters */
+ (void)dapl_os_memzero(&conn->req,sizeof(conn->req));
+ conn->service_id = r_qual;
+ conn->req.qp_num = ep_ptr->qp_handle->qp_num;
+ conn->req.qp_type = IBV_QPT_RC;
+ conn->req.starting_psn = 1;
+ conn->req.private_data = p_data;
+ conn->req.private_data_len = p_size;
+ conn->req.peer_to_peer = 0;
+ conn->req.responder_resources = IB_TARGET_MAX;
+ conn->req.initiator_depth = IB_INITIATOR_DEPTH;
+ conn->req.remote_cm_response_timeout = IB_CM_RESPONSE_TIMEOUT;
+ conn->req.flow_control = 1;
+ conn->req.local_cm_response_timeout = IB_CM_RESPONSE_TIMEOUT;
+ conn->req.retry_count = IB_RC_RETRY_COUNT;
+ conn->req.rnr_retry_count = IB_RNR_RETRY_COUNT;
+ conn->req.max_cm_retries = IB_MAX_CM_RETRIES;
+ conn->req.srq = 0;
+
+ conn->dapl_comp.fn = &dapli_rt_comp_handler;
+ conn->dapl_comp.context = conn;
+ conn->retries = 0;
+ dapl_os_memcpy(&conn->r_addr, r_addr, sizeof(DAT_SOCK_ADDR6));
+
+ status = ib_at_route_by_ip(
+ ((struct sockaddr_in *)&conn->r_addr)->sin_addr.s_addr,
+ ((struct sockaddr_in *)&conn->hca->hca_address)->sin_addr.s_addr,
+ 0, 0, &conn->dapl_rt, &conn->dapl_comp);
+
+ if (status < 0) {
+ dat_status = dapl_convert_errno(errno,"ib_at_route_by_ip");
+ goto destroy;
+ }
+ if (status == 1)
+ dapli_rt_comp_handler(conn->dapl_comp.req_id, conn, 1);
+
+
+ /* put on CM thread work queue */
+ dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&conn->entry);
+ dapl_os_lock( &g_cm_lock );
+ dapl_llist_add_tail(&g_cm_list,
+ (DAPL_LLIST_ENTRY*)&conn->entry, conn);
+ dapl_os_unlock(&g_cm_lock);
+
+ return DAT_SUCCESS;
+
+destroy:
+ dapli_destroy_cm_id(conn);
+ return dat_status;
}
@@ -572,35 +876,26 @@
IN DAPL_EP *ep_ptr,
IN DAT_CLOSE_FLAGS close_flags )
{
- ib_cm_handle_t cm_ptr = ep_ptr->cm_handle;
+ ib_cm_handle_t conn = ep_ptr->cm_handle;
+ int status;
- dapl_dbg_log (DAPL_DBG_TYPE_EP,
- "dapls_ib_disconnect(ep_handle %p ....)\n",
- ep_ptr);
-#ifdef SOCKET_CM
-
- if ( cm_ptr->socket >= 0 ) {
- close( cm_ptr->socket );
- cm_ptr->socket = -1;
+ dapl_dbg_log (DAPL_DBG_TYPE_CM,
+ " disconnect(ep_handle %p, conn %p, cm_id %d flags %x)\n",
+ ep_ptr,conn, (conn?conn->cm_id:0),close_flags);
+
+ if (conn == IB_INVALID_HANDLE)
+ return DAT_SUCCESS;
+
+ if (close_flags == DAT_CLOSE_ABRUPT_FLAG)
+ dapli_destroy_cm_id(conn);
+ else {
+ status = ib_cm_send_dreq(conn->cm_id, NULL, 0);
+ if (status)
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ "dapl_ib_disconnect: ID %p status %d\n",
+ ep_ptr->cm_handle, status);
}
-
- /* reinit to modify QP state */
- dapls_ib_reinit_ep(ep_ptr);
-#endif
- if ( ep_ptr->cr_ptr ) {
- dapls_cr_callback ( ep_ptr->cm_handle,
- IB_CME_DISCONNECTED,
- NULL,
- ((DAPL_CR *)ep_ptr->cr_ptr)->sp_ptr );
- } else {
- dapl_evd_connection_callback ( ep_ptr->cm_handle,
- IB_CME_DISCONNECTED,
- NULL,
- ep_ptr );
- ep_ptr->cm_handle = NULL;
- dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
- }
return DAT_SUCCESS;
}
@@ -628,7 +923,14 @@
IN DAT_BOOLEAN active,
IN const ib_cm_events_t ib_cm_event )
{
- return;
+
+ /*
+ * Clean up outstanding connection state
+ */
+ dapls_ib_disconnect(ep_ptr, DAT_CLOSE_ABRUPT_FLAG);
+
+ if (ep_ptr->qp_handle != IB_INVALID_HANDLE)
+ dapls_modify_qp_state(ep_ptr->qp_handle, IBV_QPS_ERR, 0,0,0);
}
/*
@@ -657,11 +959,63 @@
IN DAT_UINT64 ServiceID,
IN DAPL_SP *sp_ptr )
{
-#ifdef SOCKET_CM
- return (dapli_socket_listen( ia_ptr, ServiceID, sp_ptr ));
-#else
- return DAT_NOT_IMPLEMENTED;
-#endif
+ DAT_RETURN dat_status = DAT_SUCCESS;
+ int status;
+ ib_cm_srvc_handle_t conn;
+
+
+ /* Allocate CM and initialize lock */
+ if ((conn = dapl_os_alloc(sizeof(*conn))) == NULL)
+ return DAT_INSUFFICIENT_RESOURCES;
+
+ (void)dapl_os_memzero(conn, sizeof(*conn));
+
+ dat_status = dapl_os_lock_init(&conn->lock);
+ if (dat_status != DAT_SUCCESS) {
+ dapl_os_free(conn, sizeof(*conn));
+ return DAT_INTERNAL_ERROR;
+ }
+
+ status = ib_cm_create_id(&conn->cm_id);
+ if (status < 0) {
+ dat_status = dapl_convert_errno(errno,"create_cm_id");
+ dapl_os_free(conn, sizeof(*conn));
+ return dat_status;
+ }
+
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " setup_listener(ia_ptr %p SID %d sp_ptr %p conn %p cm_id %d)\n",
+ ia_ptr, ServiceID, sp_ptr, conn, conn->cm_id );
+
+ sp_ptr->cm_srvc_handle = conn;
+ conn->sp = sp_ptr;
+ conn->hca = ia_ptr->hca_ptr;
+ conn->service_id = ServiceID;
+
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ " setup_listener(conn=%p cm_id=%d)\n",
+ sp_ptr->cm_srvc_handle,conn->cm_id);
+
+ status = ib_cm_listen(conn->cm_id,
+ cpu_to_be64(ServiceID), 0);
+ if (status) {
+ if (status == -EBUSY)
+ dat_status = DAT_CONN_QUAL_IN_USE;
+ else
+ dat_status = DAT_INSUFFICIENT_RESOURCES;
+ /* success */
+ } else {
+ /* put on CM thread work queue */
+ dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&conn->entry);
+ dapl_os_lock( &g_cm_lock );
+ dapl_llist_add_tail(&g_cm_list,
+ (DAPL_LLIST_ENTRY*)&conn->entry, conn);
+ dapl_os_unlock(&g_cm_lock);
+ return dat_status;
+ }
+
+ dapli_destroy_cm_id(conn);
+ return dat_status;
}
@@ -687,27 +1041,16 @@
IN DAPL_IA *ia_ptr,
IN DAPL_SP *sp_ptr )
{
- ib_cm_srvc_handle_t cm_ptr = sp_ptr->cm_srvc_handle;
+ ib_cm_srvc_handle_t conn = sp_ptr->cm_srvc_handle;
- dapl_dbg_log (DAPL_DBG_TYPE_EP,
- "dapls_ib_remove_conn_listener(ia_ptr %p sp_ptr %p cm_ptr %p)\n",
- ia_ptr, sp_ptr, cm_ptr );
-#ifdef SOCKET_CM
- /* close accepted socket, free cm_srvc_handle and return */
- if ( cm_ptr != NULL ) {
- if ( cm_ptr->l_socket >= 0 ) {
- close( cm_ptr->l_socket );
- cm_ptr->socket = -1;
- }
- /* cr_thread will free */
- sp_ptr->cm_srvc_handle = NULL;
- }
+ dapl_dbg_log (DAPL_DBG_TYPE_CM,
+ " remove_listener(ia_ptr %p sp_ptr %p cm_ptr %p)\n",
+ ia_ptr, sp_ptr, conn );
+
+ if (sp_ptr->cm_srvc_handle != IB_INVALID_HANDLE)
+ dapli_destroy_cm_id(conn);
+
return DAT_SUCCESS;
-#else
- return DAT_NOT_IMPLEMENTED;
-
-#endif
-
}
/*
@@ -737,30 +1080,84 @@
IN DAT_COUNT p_size,
IN const DAT_PVOID p_data )
{
- DAPL_CR *cr_ptr;
- DAPL_EP *ep_ptr;
-
- dapl_dbg_log (DAPL_DBG_TYPE_EP,
- "dapls_ib_accept_connection(cr %p ep %p prd %p,%d)\n",
- cr_handle, ep_handle, p_data, p_size );
+ DAPL_CR *cr_ptr;
+ DAPL_EP *ep_ptr;
+ DAPL_IA *ia_ptr;
+ DAT_RETURN dat_status;
+ int status;
+ struct ib_cm_rep_param passive_params;
+ struct dapl_cm_id *conn;
+
+ cr_ptr = (DAPL_CR *) cr_handle;
+ ep_ptr = (DAPL_EP *) ep_handle;
+ ia_ptr = ep_ptr->header.owner_ia;
+ conn = cr_ptr->ib_cm_handle;
+
+ dapl_dbg_log (DAPL_DBG_TYPE_CM,
+ " accept_connection(cr %p conn %p, cm_id %d, p_data %p, p_sz=%d)\n",
+ cr_ptr, conn, conn->cm_id, p_data, p_size );
+
+ /* Obtain size of private data structure & contents */
+ if (p_size > IB_MAX_REP_PDATA_SIZE) {
+ dat_status = DAT_ERROR(DAT_LENGTH_ERROR, DAT_NO_SUBTYPE);
+ goto reject;
+ }
+
+ if (ep_ptr->qp_state == DAPL_QP_STATE_UNATTACHED) {
+ /*
+ * If we are lazy attaching the QP then we may need to
+ * hook it up here. Typically, we run this code only for
+ * DAT_PSP_PROVIDER_FLAG
+ */
+ dat_status = dapls_ib_qp_alloc(ia_ptr, ep_ptr, NULL);
+ if (dat_status != DAT_SUCCESS) {
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ " accept: ib_qp_alloc failed: %d\n",
+ dat_status);
+ goto reject;
+ }
+ }
- cr_ptr = (DAPL_CR *) cr_handle;
- ep_ptr = (DAPL_EP *) ep_handle;
-
- /* allocate and attach a QP if necessary */
- if ( ep_ptr->qp_state == DAPL_QP_STATE_UNATTACHED ) {
- DAT_RETURN status;
- status = dapls_ib_qp_alloc( ep_ptr->header.owner_ia,
- ep_ptr, ep_ptr );
- if ( status != DAT_SUCCESS )
- return status;
- }
-
-#ifdef SOCKET_CM
- return ( dapli_socket_accept_final(ep_ptr, cr_ptr, p_size, p_data) );
-#else
- return DAT_NOT_IMPLEMENTED;
-#endif
+ /* move QP to RTR state, TODO fix port setting */
+ /* TODO: could use a ib_cm_init_qp_attr() call here */
+ dat_status = dapls_modify_qp_state(ep_ptr->qp_handle,
+ IBV_QPS_RTR,
+ conn->req_rcvd.remote_qpn,
+ ntohs(conn->req_rcvd.primary_path->dlid),
+ 1 );
+ if (dat_status != DAT_SUCCESS ) {
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ " accept: modify_qp_state failed: %d\n",
+ dat_status);
+ goto reject;
+ }
+
+ cr_ptr->param.local_ep_handle = ep_handle;
+ ep_ptr->cm_handle = conn;
+ conn->ep = ep_ptr;
+
+ memset(&passive_params, 0, sizeof(passive_params));
+ passive_params.private_data = p_data;
+ passive_params.private_data_len = p_size;
+ passive_params.qp_num = ep_ptr->qp_handle->qp_num;
+ passive_params.responder_resources = IB_TARGET_MAX;
+ passive_params.initiator_depth = IB_INITIATOR_DEPTH;
+ passive_params.rnr_retry_count = IB_RNR_RETRY_COUNT;
+
+ status = ib_cm_send_rep(conn->cm_id, &passive_params);
+ if (status) {
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR, " accept: "
+ "ib_send_cm_rep failed: %d\n", status);
+ dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES, 0);
+ goto reject;
+ }
+ return DAT_SUCCESS;
+
+reject:
+ ib_cm_send_rej(conn->cm_id, IB_CM_REJ_CONSUMER_DEFINED, NULL, 0,
+ NULL, 0);
+ dapli_destroy_cm_id(conn);
+ return dat_status;
}
@@ -783,27 +1180,31 @@
*/
DAT_RETURN
dapls_ib_reject_connection (
- IN ib_cm_handle_t ib_cm_handle,
+ IN ib_cm_handle_t cm_handle,
IN int reject_reason )
{
- ib_cm_srvc_handle_t cm_ptr = ib_cm_handle;
+ int status;
- dapl_dbg_log (DAPL_DBG_TYPE_EP,
- "dapls_ib_reject_connection(cm_handle %p reason %x)\n",
- ib_cm_handle, reject_reason );
-#ifdef SOCKET_CM
-
- /* just close the socket and return */
- if ( cm_ptr->socket > 0 ) {
- close( cm_ptr->socket );
- cm_ptr->socket = -1;
+ dapl_dbg_log (DAPL_DBG_TYPE_CM,
+ " reject_connection(cm_handle %p reason %x)\n",
+ cm_handle, reject_reason );
+
+ if (cm_handle == IB_INVALID_HANDLE) {
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ " reject_conn: invalid handle: reason %d\n",
+ reject_reason);
+ return DAT_SUCCESS;
}
- return DAT_SUCCESS;
-#else
- return DAT_NOT_IMPLEMENTED;
-#endif
-
+ status = ib_cm_send_rej(cm_handle->cm_id, IB_CM_REJ_CONSUMER_DEFINED,
+ NULL, 0, NULL, 0);
+ if (status) {
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ " reject_conn: cm_rej failed: %d\n", status);
+ return DAT_ERROR(DAT_INSUFFICIENT_RESOURCES, 0);
+ }
+ dapli_destroy_cm_id(cm_handle);
+ return DAT_SUCCESS;
}
/*
@@ -844,7 +1245,7 @@
return DAT_INVALID_HANDLE;
dapl_os_memcpy( remote_ia_address,
- &ib_cm_handle->dst.ia_address,
+ &ib_cm_handle->r_addr,
sizeof(DAT_SOCK_ADDR6) );
return DAT_SUCCESS;
@@ -914,12 +1315,12 @@
return size;
}
-#ifdef SOCKET_CM
+#ifndef SOCKET_CM
/*
* Map all socket CM event codes to the DAT equivelent.
*/
-#define DAPL_IB_EVENT_CNT 11
+#define DAPL_IB_EVENT_CNT 12
static struct ib_cm_event_map
{
@@ -936,17 +1337,19 @@
DAT_CONNECTION_REQUEST_EVENT},
/* 04 */ { IB_CME_CONNECTION_REQUEST_PENDING_PRIVATE_DATA,
DAT_CONNECTION_REQUEST_EVENT},
- /* 05 */ { IB_CME_DESTINATION_REJECT,
+ /* 05 */ { IB_CME_CONNECTION_REQUEST_ACKED,
+ DAT_CONNECTION_REQUEST_EVENT},
+ /* 06 */ { IB_CME_DESTINATION_REJECT,
DAT_CONNECTION_EVENT_NON_PEER_REJECTED},
- /* 06 */ { IB_CME_DESTINATION_REJECT_PRIVATE_DATA,
+ /* 07 */ { IB_CME_DESTINATION_REJECT_PRIVATE_DATA,
DAT_CONNECTION_EVENT_PEER_REJECTED},
- /* 07 */ { IB_CME_DESTINATION_UNREACHABLE,
+ /* 08 */ { IB_CME_DESTINATION_UNREACHABLE,
DAT_CONNECTION_EVENT_UNREACHABLE},
- /* 08 */ { IB_CME_TOO_MANY_CONNECTION_REQUESTS,
+ /* 09 */ { IB_CME_TOO_MANY_CONNECTION_REQUESTS,
DAT_CONNECTION_EVENT_NON_PEER_REJECTED},
- /* 09 */ { IB_CME_LOCAL_FAILURE,
+ /* 10 */ { IB_CME_LOCAL_FAILURE,
DAT_CONNECTION_EVENT_BROKEN},
- /* 10 */ { IB_CM_LOCAL_FAILURE,
+ /* 11 */ { IB_CME_BROKEN,
DAT_CONNECTION_EVENT_BROKEN}
};
@@ -974,7 +1377,7 @@
active = active;
- if (ib_cm_event > IB_CM_LOCAL_FAILURE)
+ if (ib_cm_event > IB_CME_BROKEN)
return (DAT_EVENT_NUMBER) 0;
dat_event_num = 0;
@@ -1024,96 +1427,6 @@
return ib_cm_event;
}
-/* async CR processing thread to avoid blocking applications */
-void cr_thread(void *arg)
-{
- struct dapl_hca *hca_ptr = arg;
- ib_cm_srvc_handle_t cr, next_cr;
- int max_fd;
- fd_set rfd,rfds;
- struct timeval to;
-
- dapl_os_lock( &hca_ptr->ib_trans.lock );
- while ( !hca_ptr->ib_trans.destroy ) {
-
- FD_ZERO( &rfds );
- max_fd = -1;
-
- if (!dapl_llist_is_empty(&hca_ptr->ib_trans.list))
- next_cr = dapl_llist_peek_head (&hca_ptr->ib_trans.list);
- else
- next_cr = NULL;
-
- while (next_cr) {
- cr = next_cr;
- dapl_dbg_log (DAPL_DBG_TYPE_CM," thread: cm_ptr %p\n", cr );
- if (cr->l_socket == -1 || hca_ptr->ib_trans.destroy) {
-
- dapl_dbg_log(DAPL_DBG_TYPE_CM," thread: Freeing %p\n", cr);
- next_cr = dapl_llist_next_entry(&hca_ptr->ib_trans.list,
- (DAPL_LLIST_ENTRY*)&cr->entry );
- dapl_llist_remove_entry(&hca_ptr->ib_trans.list,
- (DAPL_LLIST_ENTRY*)&cr->entry);
- dapl_os_free( cr, sizeof(*cr) );
- continue;
- }
-
- FD_SET( cr->l_socket, &rfds ); /* add to select set */
- if ( cr->l_socket > max_fd )
- max_fd = cr->l_socket;
-
- /* individual select poll to check for work */
- FD_ZERO(&rfd);
- FD_SET(cr->l_socket, &rfd);
- dapl_os_unlock(&hca_ptr->ib_trans.lock);
- to.tv_sec = 0;
- to.tv_usec = 0;
- if ( select(cr->l_socket + 1,&rfd, NULL, NULL, &to) < 0) {
- dapl_dbg_log (DAPL_DBG_TYPE_CM,
- " thread: ERR %s on cr %p sk %d\n",
- strerror(errno), cr, cr->l_socket);
- close(cr->l_socket);
- cr->l_socket = -1;
- } else if ( FD_ISSET(cr->l_socket, &rfd) &&
- dapli_socket_accept(cr)) {
- close(cr->l_socket);
- cr->l_socket = -1;
- }
- dapl_os_lock( &hca_ptr->ib_trans.lock );
- next_cr = dapl_llist_next_entry(&hca_ptr->ib_trans.list,
- (DAPL_LLIST_ENTRY*)&cr->entry );
- }
- dapl_os_unlock( &hca_ptr->ib_trans.lock );
- to.tv_sec = 0;
- to.tv_usec = 500000; /* wakeup and check destroy */
- select(max_fd + 1, &rfds, NULL, NULL, &to);
- dapl_os_lock( &hca_ptr->ib_trans.lock );
- }
- dapl_os_unlock( &hca_ptr->ib_trans.lock );
- hca_ptr->ib_trans.destroy = 0;
- dapl_dbg_log(DAPL_DBG_TYPE_CM," thread(hca %p) exit\n",hca_ptr);
-}
-
-/* Real IBv CM */
-#else
-
-DAT_EVENT_NUMBER
-dapls_ib_get_dat_event (
- IN const ib_cm_events_t ib_cm_event,
- IN DAT_BOOLEAN active)
-{
- return ib_cm_event;
-}
-
-ib_cm_events_t
-dapls_ib_get_cm_event (
- IN DAT_EVENT_NUMBER dat_event_num )
-{
- return dat_event_num;
-}
-
-
-
#endif
/*
Index: dapl/openib/dapl_ib_qp.c
===================================================================
--- dapl/openib/dapl_ib_qp.c (revision 2818)
+++ dapl/openib/dapl_ib_qp.c (working copy)
@@ -107,6 +107,8 @@
qp_create.cap.max_recv_wr = attr->max_recv_dtos;
qp_create.cap.max_send_sge = attr->max_request_iov;
qp_create.cap.max_recv_sge = attr->max_recv_iov;
+ /* TODO: extend ep_attr so user can set */
+ qp_create.cap.max_inline_data = ia_ptr->hca_ptr->ib_trans.max_inline_send;
qp_create.qp_type = IBV_QPT_RC;
qp_create.qp_context = (void*)ep_ptr;
@@ -122,8 +124,7 @@
/* Setup QP attributes for INIT state on the way out */
if (dapls_modify_qp_state(ep_ptr->qp_handle,
- IBV_QPS_INIT,
- NULL ) != DAT_SUCCESS ) {
+ IBV_QPS_INIT,0,0,0 ) != DAT_SUCCESS ) {
ibv_destroy_qp(ep_ptr->qp_handle);
ep_ptr->qp_handle = IB_INVALID_HANDLE;
return DAT_INTERNAL_ERROR;
@@ -160,7 +161,7 @@
if (ep_ptr->qp_handle != IB_INVALID_HANDLE) {
/* force error state to flush queue, then destroy */
- dapls_modify_qp_state(ep_ptr->qp_handle, IBV_QPS_ERR, NULL);
+ dapls_modify_qp_state(ep_ptr->qp_handle, IBV_QPS_ERR, 0,0,0);
if (ibv_destroy_qp(ep_ptr->qp_handle))
return(dapl_convert_errno(errno,"destroy_qp"));
@@ -216,7 +217,7 @@
(ep_ptr->qp_handle->state != IBV_QPS_ERR)) {
ep_ptr->qp_state = IB_QP_STATE_ERROR;
return (dapls_modify_qp_state(ep_ptr->qp_handle,
- IBV_QPS_ERR, NULL));
+ IBV_QPS_ERR,0,0,0));
}
/*
@@ -271,8 +272,8 @@
if ( ep_ptr->qp_handle != IB_INVALID_HANDLE ) {
/* move to RESET state and then to INIT */
- dapls_modify_qp_state(ep_ptr->qp_handle, IBV_QPS_RESET, NULL);
- dapls_modify_qp_state(ep_ptr->qp_handle, IBV_QPS_INIT, NULL );
+ dapls_modify_qp_state(ep_ptr->qp_handle, IBV_QPS_RESET, 0,0,0);
+ dapls_modify_qp_state(ep_ptr->qp_handle, IBV_QPS_INIT, 0,0,0);
ep_ptr->qp_state = IB_QP_STATE_INIT;
}
@@ -287,7 +288,9 @@
DAT_RETURN
dapls_modify_qp_state ( IN ib_qp_handle_t qp_handle,
IN ib_qp_state_t qp_state,
- IN ib_qp_cm_t *qp_cm )
+ IN uint32_t qpn,
+ IN uint16_t lid,
+ IN uint8_t port )
{
struct ibv_qp_attr qp_attr;
enum ibv_qp_attr_mask mask = IBV_QP_STATE;
@@ -299,7 +302,6 @@
/* additional attributes with RTR and RTS */
case IBV_QPS_RTR:
{
-#ifdef SOCKET_CM
mask |= IBV_QP_AV |
IBV_QP_PATH_MTU |
IBV_QP_DEST_QPN |
@@ -308,25 +310,24 @@
IBV_QP_MIN_RNR_TIMER;
qp_attr.qp_state = IBV_QPS_RTR;
qp_attr.path_mtu = IBV_MTU_1024;
- qp_attr.dest_qp_num = qp_cm->qpn;
+ qp_attr.dest_qp_num = qpn;
qp_attr.rq_psn = 1;
- qp_attr.max_dest_rd_atomic = 8;
+ qp_attr.max_dest_rd_atomic = IB_TARGET_MAX;
qp_attr.min_rnr_timer = 12;
qp_attr.ah_attr.is_global = 0;
- qp_attr.ah_attr.dlid = qp_cm->lid;
+ qp_attr.ah_attr.dlid = lid;
qp_attr.ah_attr.sl = 0;
qp_attr.ah_attr.src_path_bits = 0;
- qp_attr.ah_attr.port_num = qp_cm->port;
+ qp_attr.ah_attr.port_num = port;
dapl_dbg_log (DAPL_DBG_TYPE_EP,
- " modify_qp_rtr: qpn %x lid %x port %x\n",
- qp_cm->qpn,qp_cm->lid,qp_cm->port );
+ " modify_qp_RTR: qpn %x lid %x port %x, rq_psn %x\n",
+ qpn,lid,port,ntohl(qp_attr.rq_psn) );
break;
-#endif
+
}
case IBV_QPS_RTS:
{
-#ifdef SOCKET_CM
mask |= IBV_QP_TIMEOUT |
IBV_QP_RETRY_CNT |
IBV_QP_RNR_RETRY |
@@ -337,12 +338,11 @@
qp_attr.retry_cnt = 7;
qp_attr.rnr_retry = 7;
qp_attr.sq_psn = 1;
- qp_attr.max_rd_atomic = 8;
+ qp_attr.max_rd_atomic = IB_TARGET_MAX;
dapl_dbg_log (DAPL_DBG_TYPE_EP,
- " modify_qp_rts: psn %x or %x\n",
- qp_attr.sq_psn, qp_attr.max_rd_atomic );
+ " modify_qp_RTS: psn %x or %x\n",
+ ntohl(qp_attr.sq_psn), qp_attr.max_rd_atomic );
break;
-#endif
}
case IBV_QPS_INIT:
{
@@ -368,7 +368,7 @@
IBV_ACCESS_REMOTE_ATOMIC;
dapl_dbg_log (DAPL_DBG_TYPE_EP,
- " modify_qp_init: pi %x port %x acc %x\n",
+ " modify_qp_INIT: pi %x port %x acc %x\n",
qp_attr.pkey_index, qp_attr.port_num,
qp_attr.qp_access_flags );
break;
Index: dapl/openib/README
===================================================================
--- dapl/openib/README (revision 2818)
+++ dapl/openib/README (working copy)
@@ -6,10 +6,12 @@
- CQ_WAIT_OBJECT support added
- added dapl/openib directory
- modify doc/dat.conf to add a example openib configuration
+- fixes to wait and resize
dapl/common/dapl_adapter_util.h
dapl/common/dapl_evd_dto_callb.c
dapl/common/dapl_evd_util.c
+ dapl/common/dapl_evd_resize.c
dapl/include/dapl.h
dapl/udapl/dapl_evd_set_unwaitable.c
dapl/udapl/dapl_evd_wait.c
@@ -29,8 +31,7 @@
dapl/openib/dapl_ib_qp.c
dapl/openib/dapl_ib_util.c
dapl/openib/dapl_ib_util.h
- dapl/openib/dapl_ib_cm.c
-
+
A simple dapl test just for initial openIB testing...
test/dtest/dtest.c
@@ -38,10 +39,17 @@
server: dtest -s
client: dtest -h hostname
+
+setup/known issues:
+
+ First drop with uCM (without IBAT), tested with simple dtest across 2 nodes.
+ hand-rolled path records require the remote LID and GID to be set via environment variables:
+
+ export DAPL_REMOTE_GID="fe80:0000:0000:0000:0002:c902:0000:4071"
+ export DAPL_REMOTE_LID="0002"
-known issues:
-
- early drop, only tested with simple dtest and dapltest SR.
+ Also, hard-coded (RTR) for use with port 1 only.
+
no memory windows support in ibverbs, dat_create_rmr fails.
Index: dapl/openib/dapl_ib_util.h
===================================================================
--- dapl/openib/dapl_ib_util.h (revision 2818)
+++ dapl/openib/dapl_ib_util.h (working copy)
@@ -51,6 +51,8 @@
#include "verbs.h"
#include <byteswap.h>
+#include <infiniband/sa.h>
+#include <infiniband/cm.h>
/* Typedefs to map common DAPL provider types to IB verbs */
typedef struct ibv_qp *ib_qp_handle_t;
@@ -64,20 +66,46 @@
typedef struct ibv_context *ib_hca_handle_t;
typedef ib_hca_handle_t dapl_ibal_ca_t;
-/* CM mappings, user CM not complete use SOCKETS */
+#define IB_RC_RETRY_COUNT 7
+#define IB_RNR_RETRY_COUNT 7
+#define IB_CM_RESPONSE_TIMEOUT 20 /* 4 sec */
+#define IB_MAX_CM_RETRIES 4
+
+#define IB_REQ_MRA_TIMEOUT 27 /* a little over 9 minutes */
+#define IB_MAX_AT_RETRY 3
+
+#define IB_TARGET_MAX 4 /* max_qp_ous_rd_atom */
+#define IB_INITIATOR_DEPTH 4 /* max_qp_init_rd_atom */
+
+typedef enum {
+ IB_CME_CONNECTED,
+ IB_CME_DISCONNECTED,
+ IB_CME_DISCONNECTED_ON_LINK_DOWN,
+ IB_CME_CONNECTION_REQUEST_PENDING,
+ IB_CME_CONNECTION_REQUEST_PENDING_PRIVATE_DATA,
+ IB_CME_CONNECTION_REQUEST_ACKED,
+ IB_CME_DESTINATION_REJECT,
+ IB_CME_DESTINATION_REJECT_PRIVATE_DATA,
+ IB_CME_DESTINATION_UNREACHABLE,
+ IB_CME_TOO_MANY_CONNECTION_REQUESTS,
+ IB_CME_LOCAL_FAILURE,
+ IB_CME_BROKEN
+} ib_cm_events_t;
-#ifdef SOCKET_CM
+#ifndef IB_AT
+/* implement a quick hack to exchange GID/LID's until user IB_AT arrives */
+struct ib_at_ib_route {
+ union ibv_gid gid;
+ uint16_t lid;
+};
-/* destination info to exchange until real IB CM shows up */
-typedef struct _ib_qp_cm
-{
- uint32_t qpn;
- uint16_t lid;
- uint16_t port;
- int p_size;
- DAT_SOCK_ADDR6 ia_address;
+struct ib_at_completion {
+ void (*fn)(uint64_t req_id, void *context, int rec_num);
+ void *context;
+ uint64_t req_id;
+};
-} ib_qp_cm_t;
+#endif
/*
* dapl_llist_entry in dapl.h but dapl.h depends on provider
@@ -91,52 +119,27 @@
struct dapl_llist_entry *list_head;
};
-struct ib_cm_handle
-{
- struct ib_llist_entry entry;
- int socket;
- int l_socket;
- struct dapl_hca *hca_ptr;
- DAT_HANDLE cr;
- DAT_HANDLE sp;
- ib_qp_cm_t dst;
- unsigned char p_data[256];
+struct dapl_cm_id {
+ struct ib_llist_entry entry;
+ DAPL_OS_LOCK lock;
+ int retries;
+ int destroy;
+ int in_callback;
+ uint32_t cm_id;
+ DAT_SOCK_ADDR6 r_addr;
+ DAT_CONN_QUAL service_id;
+ struct dapl_hca *hca;
+ struct dapl_sp *sp;
+ struct dapl_ep *ep;
+ struct ib_at_ib_route dapl_rt;
+ struct ib_sa_path_rec dapl_path;
+ struct ib_cm_req_param req;
+ struct ib_cm_req_event_param req_rcvd;
+ struct ib_at_completion dapl_comp;
};
-typedef struct ib_cm_handle *ib_cm_handle_t;
-typedef ib_cm_handle_t ib_cm_srvc_handle_t;
-
-DAT_RETURN getipaddr(char *addr, int addr_len);
-
-/* CM events */
-typedef enum
-{
- IB_CME_CONNECTED,
- IB_CME_DISCONNECTED,
- IB_CME_DISCONNECTED_ON_LINK_DOWN,
- IB_CME_CONNECTION_REQUEST_PENDING,
- IB_CME_CONNECTION_REQUEST_PENDING_PRIVATE_DATA,
- IB_CME_DESTINATION_REJECT,
- IB_CME_DESTINATION_REJECT_PRIVATE_DATA,
- IB_CME_DESTINATION_UNREACHABLE,
- IB_CME_TOO_MANY_CONNECTION_REQUESTS,
- IB_CME_LOCAL_FAILURE,
- IB_CM_LOCAL_FAILURE
-
-} ib_cm_events_t;
-
-/* prototype for cm thread */
-void cr_thread (void *arg);
-
-#else
-
-/* TODO: Waiting for IB CM to define */
-typedef uint32_t *ib_cm_handle_t;
-typedef uint32_t *ib_cm_srvc_handle_t;
-typedef uint32_t ib_qp_cm_t;
-typedef uint32_t ib_cm_events_t;
-
-#endif
+typedef struct dapl_cm_id *ib_cm_handle_t;
+typedef struct dapl_cm_id *ib_cm_srvc_handle_t;
/* Operation and state mappings */
typedef enum ibv_send_flags ib_send_op_type_t;
@@ -167,6 +170,9 @@
/* Definitions */
#define IB_INVALID_HANDLE NULL
+/* inline send rdma threshold */
+#define INLINE_SEND_DEFAULT 128
+
/* CM private data areas */
#define IB_MAX_REQ_PDATA_SIZE 92
#define IB_MAX_REP_PDATA_SIZE 196
@@ -174,7 +180,7 @@
#define IB_MAX_DREQ_PDATA_SIZE 220
#define IB_MAX_DREP_PDATA_SIZE 224
-/* DTO OPs, ordered for DAPL ENUM definitions ???*/
+/* DTO OPs, ordered for DAPL ENUM definitions */
#define OP_RDMA_WRITE IBV_WR_RDMA_WRITE
#define OP_RDMA_WRITE_IMM IBV_WR_RDMA_WRITE_WITH_IMM
#define OP_SEND IBV_WR_SEND
@@ -232,13 +238,9 @@
{
struct ibv_device *ib_dev;
ib_cq_handle_t ib_cq_empty;
-
-#if SOCKET_CM
- int destroy;
- DAPL_OS_THREAD thread;
- DAPL_OS_LOCK lock;
- struct dapl_llist_entry *list;
-#endif
+ int max_inline_send;
+ uint16_t lid;
+ union ibv_gid gid;
ib_async_handler_t async_unafiliated;
ib_async_handler_t async_cq_error;
ib_async_handler_t async_cq;
@@ -252,11 +254,21 @@
/* prototypes */
int32_t dapls_ib_init (void);
int32_t dapls_ib_release (void);
+void cm_thread (void *arg);
+int dapli_cm_thread_init(void);
+void dapli_cm_thread_destroy(void);
+
+int dapli_get_lid(struct dapl_hca *hca_ptr, int port, uint16_t *lid );
+int dapli_get_gid(struct dapl_hca *hca_ptr, int port, int index,
+ union ibv_gid *gid );
+int dapli_get_addr(char *addr, int addr_len);
DAT_RETURN
dapls_modify_qp_state ( IN ib_qp_handle_t qp_handle,
IN ib_qp_state_t qp_state,
- IN ib_qp_cm_t *qp_cm );
+ IN uint32_t qpn,
+ IN uint16_t lid,
+ IN uint8_t port );
/* inline functions */
STATIC _INLINE_ IB_HCA_NAME dapl_ib_convert_name (IN char *name)
Index: dapl/openib/dapl_ib_cq.c
===================================================================
--- dapl/openib/dapl_ib_cq.c (revision 2818)
+++ dapl/openib/dapl_ib_cq.c (working copy)
@@ -50,6 +50,7 @@
#include "dapl_adapter_util.h"
#include "dapl_lmr_util.h"
#include "dapl_evd_util.h"
+#include <sys/poll.h>
/*
@@ -448,36 +449,28 @@
ib_cq_handle_t cq = evd_ptr->ib_cq_handle;
struct ibv_cq *ibv_cq;
void *ibv_ctx;
- int status = EINVAL; /* invalid handle */
+ int status = -ETIMEDOUT;
dapl_dbg_log ( DAPL_DBG_TYPE_UTIL,
" cq_object_wait: dev %p evd %p cq %p, time %d\n",
cq->context, evd_ptr, cq, timeout );
- /* TODO: add timeout, map each CQ created?? */
/* Multiple EVD's sharing one event handle for now */
- while (evd_ptr->ib_cq_handle) {
+ if (cq) {
+ struct pollfd cq_poll = {
+ .fd = cq->context->cq_fd[0],
+ .events = POLLIN
+ };
+ int timeout_ms = -1;
+
+ if (timeout != DAT_TIMEOUT_INFINITE)
+ timeout_ms = timeout/1000;
- status = ibv_get_cq_event(cq->context,
- 0, &ibv_cq, &ibv_ctx);
- if (status)
- break;
-
- /* EVD mismatch, process DTO callback for this EVD */
- if (ibv_cq != cq) {
- ib_hca_tranport_t *hca_ptr =
- &evd_ptr->header.owner_ia->hca_ptr->ib_trans;
-
- if ( hca_ptr->async_cq )
- hca_ptr->async_cq(cq->context,
- (ib_error_record_t*)ibv_cq,
- ibv_ctx);
-
- continue;
- }
- break;
+ status = poll(&cq_poll, 1, timeout_ms);
+ if (status == 1)
+ status = ibv_get_cq_event(cq->context,
+ 0, &ibv_cq, &ibv_ctx);
}
-
dapl_dbg_log (DAPL_DBG_TYPE_UTIL,
" cq_object_wait: RET cq %p ibv_cq %p ibv_ctx %p %x\n",
cq,ibv_cq,ibv_ctx,status);
Index: dtest/dtest.c
===================================================================
--- dtest/dtest.c (revision 0)
+++ dtest/dtest.c (revision 0)
@@ -0,0 +1,1700 @@
+/*
+ * Copyright (c) 2005 Intel Corporation. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * OpenIB.org BSD license below:
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ * $Id: $
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <netdb.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <arpa/inet.h>
+#include <sys/mman.h>
+#include <getopt.h>
+
+#ifndef DAPL_PROVIDER
+#define DAPL_PROVIDER "OpenIB1_2"
+#endif
+
+#define MAX_POLLING_CNT 50000
+
+/* Header files needed for DAT/uDAPL */
+#include "dat/udat.h"
+
+/* definitions */
+#define SERVER_CONN_QUAL 71123
+#define DTO_TIMEOUT (1000*1000*5)
+#define DTO_FLUSH_TIMEOUT (1000*1000*2)
+#define CONN_TIMEOUT (1000*1000*10)
+#define SERVER_TIMEOUT (1000*1000*20)
+#define RDMA_BUFFER_SIZE (64)
+
+/* Global DAT vars */
+static DAT_IA_HANDLE h_ia = DAT_HANDLE_NULL;
+static DAT_PZ_HANDLE h_pz = DAT_HANDLE_NULL;
+static DAT_EP_HANDLE h_ep = DAT_HANDLE_NULL;
+static DAT_PSP_HANDLE h_psp = DAT_HANDLE_NULL;
+static DAT_CR_HANDLE h_cr = DAT_HANDLE_NULL;
+
+static DAT_EVD_HANDLE h_async_evd = DAT_HANDLE_NULL;
+static DAT_EVD_HANDLE h_dto_evd = DAT_HANDLE_NULL;
+static DAT_EVD_HANDLE h_cr_evd = DAT_HANDLE_NULL;
+static DAT_EVD_HANDLE h_conn_evd = DAT_HANDLE_NULL;
+static DAT_CNO_HANDLE h_dto_cno = DAT_HANDLE_NULL;
+
+/* RDMA buffers */
+static DAT_LMR_HANDLE h_lmr_send = DAT_HANDLE_NULL;
+static DAT_LMR_HANDLE h_lmr_recv = DAT_HANDLE_NULL;
+static DAT_LMR_CONTEXT lmr_context_send;
+static DAT_LMR_CONTEXT lmr_context_recv;
+static DAT_RMR_CONTEXT rmr_context_send;
+static DAT_RMR_CONTEXT rmr_context_recv;
+static DAT_VLEN registered_size_send;
+static DAT_VLEN registered_size_recv;
+static DAT_VADDR registered_addr_send;
+static DAT_VADDR registered_addr_recv;
+
+/* Initial msg receive buf, RMR exchange, and Rdma-write notification */
+#define MSG_BUF_COUNT 3
+static DAT_RMR_TRIPLET rmr_recv_msg[ MSG_BUF_COUNT ];
+static DAT_LMR_HANDLE h_lmr_recv_msg = DAT_HANDLE_NULL;
+static DAT_LMR_CONTEXT lmr_context_recv_msg;
+static DAT_RMR_CONTEXT rmr_context_recv_msg;
+static DAT_VLEN registered_size_recv_msg;
+static DAT_VADDR registered_addr_recv_msg;
+
+/* message send buffer */
+static DAT_RMR_TRIPLET rmr_send_msg;
+static DAT_LMR_HANDLE h_lmr_send_msg = DAT_HANDLE_NULL;
+static DAT_LMR_CONTEXT lmr_context_send_msg;
+static DAT_RMR_CONTEXT rmr_context_send_msg;
+static DAT_VLEN registered_size_send_msg;
+static DAT_VADDR registered_addr_send_msg;
+static DAT_EP_ATTR ep_attr;
+char hostname[256] = {0};
+
+/* rdma pointers */
+char *rbuf = NULL;
+char *sbuf = NULL;
+int status;
+
+/* timers */
+double start,stop,total_us,total_sec;
+struct {
+ double total;
+ double open;
+ double reg;
+ double unreg;
+ double pzc;
+ double pzf;
+ double evdc;
+ double evdf;
+ double cnoc;
+ double cnof;
+ double epc;
+ double epf;
+ double rdma_wr;
+ double rdma_rd;
+ double rtt;
+ double close;
+} time;
+
+/* defaults */
+static int parent=1;
+static int connected=0;
+static int burst=10;
+static int server=1;
+static int verbose=0;
+static int polling=1;
+static int poll_count=0;
+static int rdma_wr_poll_count=0;
+static int rdma_rd_poll_count=0;
+static int pin_memory=0;
+static int delay=0;
+static int buf_len=RDMA_BUFFER_SIZE;
+static int use_cno=0;
+static int post_recv_count=MSG_BUF_COUNT;
+static int recv_msg_index=0;
+static int burst_msg_posted=0;
+static int burst_msg_index=0;
+
+#define MAX_RDMA_RD 4
+#define MAX_PROCS 1000
+
+static pid_t child[MAX_PROCS+1];
+
+/* forward prototypes */
+const char * DT_RetToString (DAT_RETURN ret_value);
+const char * DT_EventToSTr (DAT_EVENT_NUMBER event_code);
+void print_usage();
+double get_time();
+void init_data();
+
+DAT_RETURN send_msg( void *data,
+ DAT_COUNT size,
+ DAT_LMR_CONTEXT context,
+ DAT_DTO_COOKIE cookie,
+ DAT_COMPLETION_FLAGS flags );
+
+DAT_RETURN connect_ep( char *hostname, int conn_id );
+void disconnect_ep( void );
+DAT_RETURN register_rdma_memory( void );
+DAT_RETURN unregister_rdma_memory( void );
+DAT_RETURN create_events( void );
+DAT_RETURN destroy_events(void);
+DAT_RETURN do_rdma_write_with_msg( void );
+DAT_RETURN do_rdma_read_with_msg( void );
+DAT_RETURN do_ping_pong_msg( void );
+
+#define LOGPRINTF(_format, _aa...) \
+ if (verbose) \
+ printf(_format, ##_aa)
+
+main(int argc, char **argv)
+{
+ int c;
+ DAT_RETURN ret;
+
+ /* parse arguments */
+ while ((c = getopt(argc, argv, "scvpb:d:B:h:")) != -1)
+ {
+ switch(c)
+ {
+ case 's':
+ server = 1;
+ printf("%d Running as server\n",getpid());
+ fflush(stdout);
+ break;
+ case 'c':
+ use_cno = 1;
+ printf("%d Creating CNO for DTO EVD's\n",getpid());
+ fflush(stdout);
+ break;
+ case 'v':
+ verbose = 1;
+ printf("%d Verbose\n",getpid());
+ fflush(stdout);
+ break;
+ case 'p':
+ polling = 1;
+ printf("%d Polling\n",getpid());
+ fflush(stdout);
+ break;
+ case 'B':
+ burst = atoi(optarg);
+ break;
+ case 'd':
+ delay = atoi(optarg);
+ break;
+ case 'b':
+ buf_len = atoi(optarg);
+ break;
+ case 'h':
+ server = 0;
+ strcpy (hostname, optarg);
+ break;
+ default:
+ print_usage();
+ exit(-12);
+ }
+ }
+
+ if (!server) {
+ printf("%d Running as client\n",getpid()); fflush(stdout);
+ } else {
+ printf("%d Running as server\n",getpid()); fflush(stdout);
+ }
+
+ /* allocate send and receive buffers */
+ if (((rbuf = malloc(buf_len*burst)) == NULL) ||
+ ((sbuf = malloc(buf_len*burst)) == NULL)) {
+ perror("malloc");
+ exit(1);
+ }
+ memset( &time, sizeof(time), 0);
+ LOGPRINTF("%d Allocated RDMA buffers (r:%p,s:%p) len %d \n",
+ getpid(), rbuf, sbuf, buf_len);
+
+ /* dat_ia_open, dat_pz_create */
+ h_async_evd = DAT_HANDLE_NULL;
+ start = get_time();
+ ret = dat_ia_open( DAPL_PROVIDER, 8, &h_async_evd, &h_ia );
+ stop = get_time();
+ time.open += ((stop - start)*1.0e6);
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d: Error Adaptor open: %s\n",
+ getpid(),DT_RetToString(ret));
+ exit(1);
+ } else
+ LOGPRINTF("%d Opened Interface Adaptor\n",getpid());
+
+ /* Create Protection Zone */
+ start = get_time();
+ LOGPRINTF("%d Create Protection Zone\n",getpid());
+ ret = dat_pz_create(h_ia, &h_pz);
+ stop = get_time();
+ time.pzc += ((stop - start)*1.0e6);
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr,
+ "%d Error creating Protection Zone: %s\n",
+ getpid(),DT_RetToString(ret));
+ exit(1);
+ } else
+ LOGPRINTF("%d Created Protection Zone\n",getpid());
+
+ /* Register memory */
+ LOGPRINTF("%d Register RDMA memory\n", getpid());
+ ret = register_rdma_memory();
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error creating events: %s\n",
+ getpid(),DT_RetToString(ret));
+ goto cleanup;
+ } else
+ LOGPRINTF("%d Register RDMA memory done\n", getpid());
+
+ LOGPRINTF("%d Create events\n", getpid());
+ ret = create_events();
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error creating events: %s\n",
+ getpid(),DT_RetToString(ret));
+ goto cleanup;
+ } else {
+ LOGPRINTF("%d Create events done\n", getpid());
+ }
+
+ /* create EP */
+ memset( &ep_attr, 0, sizeof(ep_attr) );
+ ep_attr.service_type = DAT_SERVICE_TYPE_RC;
+ ep_attr.max_rdma_size = 0x10000;
+ ep_attr.qos = 0;
+ ep_attr.recv_completion_flags = 0;
+ ep_attr.max_recv_dtos = MSG_BUF_COUNT + (burst*3);
+ ep_attr.max_request_dtos = MSG_BUF_COUNT + (burst*3) + MAX_RDMA_RD;
+ ep_attr.max_recv_iov = 1;
+ ep_attr.max_request_iov = 1;
+ ep_attr.max_rdma_read_in = MAX_RDMA_RD;
+ ep_attr.max_rdma_read_out = MAX_RDMA_RD;
+ ep_attr.request_completion_flags = DAT_COMPLETION_DEFAULT_FLAG;
+ ep_attr.ep_transport_specific_count = 0;
+ ep_attr.ep_transport_specific = NULL;
+ ep_attr.ep_provider_specific_count = 0;
+ ep_attr.ep_provider_specific = NULL;
+
+ start = get_time();
+ ret = dat_ep_create( h_ia, h_pz, h_dto_evd, h_dto_evd, h_conn_evd, &ep_attr, &h_ep );
+ stop = get_time();
+ time.epc += ((stop - start)*1.0e6);
+ time.total += time.epc;
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error dat_ep_create: %s\n",
+ getpid(),DT_RetToString(ret));
+ goto cleanup;
+ } else
+ LOGPRINTF("%d EP created %p \n", getpid(), h_ep);
+
+ /*
+ * register message buffers, establish connection, and
+ * exchange DMA RMR information info via messages
+ */
+ ret = connect_ep( hostname, SERVER_CONN_QUAL );
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error connect_ep: %s\n",
+ getpid(),DT_RetToString(ret));
+ goto cleanup;
+ } else
+ LOGPRINTF("%d connect_ep complete\n", getpid());
+
+ /*********** RDMA write data *************/
+ ret = do_rdma_write_with_msg();
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error do_rdma_write_with_msg: %s\n",
+ getpid(),DT_RetToString(ret));
+ goto cleanup;
+ } else
+ LOGPRINTF("%d do_rdma_write_with_msg complete\n", getpid());
+
+ /*********** RDMA read data *************/
+ ret = do_rdma_read_with_msg();
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error do_rdma_read_with_msg: %s\n",
+ getpid(),DT_RetToString(ret));
+ goto cleanup;
+ } else
+ LOGPRINTF("%d do_rdma_read_with_msg complete\n", getpid());
+
+ /*********** PING PING messages ************/
+ ret = do_ping_pong_msg();
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error do_ping_pong_msg: %s\n",
+ getpid(),DT_RetToString(ret));
+ goto cleanup;
+ } else
+ LOGPRINTF("%d do_ping_pong_msg complete\n", getpid());
+
+cleanup:
+ /* disconnect and free EP resources */
+ if ( h_ep != DAT_HANDLE_NULL ) {
+ /* unregister message buffers and tear down connection */
+ LOGPRINTF("%d Disconnect and Free EP %p \n",getpid(),h_ep);
+ disconnect_ep();
+ }
+
+ /* free EP */
+ LOGPRINTF("%d Free EP %p \n",getpid(),h_ep);
+ start = get_time();
+ ret = dat_ep_free( h_ep );
+ stop = get_time();
+ time.epf += ((stop - start)*1.0e6);
+ time.total += time.epf;
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error freeing EP: %s\n",
+ getpid(), DT_RetToString(ret));
+ } else {
+ LOGPRINTF("%d Freed EP\n",getpid());
+ h_ep = DAT_HANDLE_NULL;
+ }
+
+ /* free EVDs */
+ LOGPRINTF("%d destroy events\n", getpid());
+ ret = destroy_events();
+ if(ret != DAT_SUCCESS)
+ fprintf(stderr, "%d Error destroy_events: %s\n",
+ getpid(),DT_RetToString(ret));
+ else
+ LOGPRINTF("%d destroy events done\n", getpid());
+
+
+ ret = unregister_rdma_memory();
+ LOGPRINTF("%d unregister_rdma_memory \n", getpid());
+ if(ret != DAT_SUCCESS)
+ fprintf(stderr, "%d Error unregister_rdma_memory: %s\n",
+ getpid(),DT_RetToString(ret));
+ else
+ LOGPRINTF("%d unregister_rdma_memory done\n", getpid());
+
+
+ if (delay) sleep(delay);
+
+ /* Free protection domain */
+ LOGPRINTF("%d Freeing pz\n",getpid());
+ start = get_time();
+ ret = dat_pz_free( h_pz );
+ stop = get_time();
+ time.pzf += ((stop - start)*1.0e6);
+ if (ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error freeing PZ: %s\n",
+ getpid(), DT_RetToString(ret));
+ } else {
+ LOGPRINTF("%d Freed pz\n",getpid());
+ h_pz = NULL;
+ }
+
+ /* close the device */
+ LOGPRINTF("%d Closing Interface Adaptor\n",getpid());
+ start = get_time();
+ ret = dat_ia_close( h_ia, DAT_CLOSE_ABRUPT_FLAG );
+ stop = get_time();
+ time.close += ((stop - start)*1.0e6);
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d: Error Adaptor close: %s\n",
+ getpid(),DT_RetToString(ret));
+ exit(1);
+ } else
+ LOGPRINTF("%d Closed Interface Adaptor\n",getpid());
+
+ printf( "\n%d: DAPL Test Complete.\n\n",getpid());
+ printf( "%d: RDMA write: Total=%10.2lf usec, %d bursts, itime=%10.2lf usec, pc=%d\n",
+ getpid(), time.rdma_wr, burst, time.rdma_wr/burst, rdma_wr_poll_count );
+ printf( "%d: RDMA read: Total=%10.2lf usec, %d bursts, itime=%10.2lf usec, pc=%d\n",
+ getpid(), time.rdma_rd, MAX_RDMA_RD, time.rdma_rd/MAX_RDMA_RD, rdma_rd_poll_count );
+ printf( "%d: Message RTT: Total=%10.2lf usec, %d bursts, itime=%10.2lf usec, pc=%d\n\n",
+ getpid(), time.rtt, burst, time.rtt/burst, poll_count );
+
+ printf( "%d: open: %10.2lf usec\n", getpid(), time.open );
+ printf( "%d: close: %10.2lf usec\n", getpid(), time.close );
+ printf( "%d: PZ create: %10.2lf usec\n", getpid(), time.pzc );
+ printf( "%d: PZ free: %10.2lf usec\n", getpid(), time.pzf );
+ printf( "%d: LMR create:%10.2lf usec\n", getpid(), time.reg );
+ printf( "%d: LMR free: %10.2lf usec\n", getpid(), time.unreg );
+ printf( "%d: EVD create:%10.2lf usec\n", getpid(), time.evdc );
+ printf( "%d: EVD free: %10.2lf usec\n", getpid(), time.evdf );
+ if (use_cno) {
+ printf( "%d: CNO create: %10.2lf usec\n", getpid(), time.cnoc );
+ printf( "%d: CNO free: %10.2lf usec\n", getpid(), time.cnof );
+ }
+ printf( "%d: EP create: %10.2lf usec\n",getpid(), time.epc );
+ printf( "%d: EP free: %10.2lf usec\n",getpid(), time.epf );
+ printf( "%d: TOTAL: %10.2lf usec\n",getpid(), time.total );
+
+ /* free rdma buffers */
+ free(rbuf);
+ free(sbuf);
+}
+
+
+double get_time()
+{
+ struct timeval tp;
+
+ gettimeofday(&tp, NULL);
+ return ((double) tp.tv_sec + (double) tp.tv_usec * 1e-6);
+}
+
+void init_data()
+{
+ memset(rbuf, 'a', buf_len);
+ memset(sbuf, 'b', buf_len);
+}
+
+
+DAT_RETURN
+send_msg( void *data,
+ DAT_COUNT size,
+ DAT_LMR_CONTEXT context,
+ DAT_DTO_COOKIE cookie,
+ DAT_COMPLETION_FLAGS flags )
+{
+ DAT_LMR_TRIPLET iov;
+ DAT_EVENT event;
+ DAT_COUNT nmore;
+ DAT_RETURN ret;
+
+ iov.lmr_context = context;
+ iov.pad = 0;
+ iov.virtual_address = (DAT_VADDR)(unsigned long)data;
+ iov.segment_length = size;
+ LOGPRINTF("%d calling post_send\n", getpid());
+ cookie.as_64 = 0xaaaa;
+ ret = dat_ep_post_send( h_ep,
+ 1,
+ &iov,
+ cookie,
+ flags );
+
+ if (ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d: ERROR: dat_ep_post_send() %s\n",
+ getpid(),DT_RetToString(ret));
+ return ret;
+ }
+
+ if (!(flags & DAT_COMPLETION_SUPPRESS_FLAG)) {
+ if ( polling ) {
+ printf("%d Polling post send completion...\n",getpid());
+ while ( dat_evd_dequeue( h_dto_evd, &event ) == DAT_QUEUE_EMPTY );
+ }
+ else {
+ LOGPRINTF("%d waiting for post_send completion event\n", getpid());
+ if (use_cno) {
+ DAT_EVD_HANDLE evd = DAT_HANDLE_NULL;
+ ret = dat_cno_wait( h_dto_cno, DTO_TIMEOUT, &evd );
+ LOGPRINTF("%d cno wait return evd_handle=%p\n", getpid(),evd);
+ if ( evd != h_dto_evd ) {
+ fprintf(stderr,
+ "%d Error waiting on h_dto_cno: evd != h_dto_evd\n",
+ getpid());
+ return( DAT_ABORT );
+ }
+ }
+ /* use wait to dequeue */
+ ret = dat_evd_wait( h_dto_evd, DTO_TIMEOUT, 1, &event, &nmore );
+ if (ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d: ERROR: DTO dat_evd_wait() %s\n",
+ getpid(),DT_RetToString(ret));
+ return ret;
+ }
+ }
+
+ /* validate event number, len, cookie, and status */
+ if ( event.event_number != DAT_DTO_COMPLETION_EVENT ) {
+ fprintf(stderr, "%d: ERROR: DTO event number %s\n",
+ getpid(),DT_EventToSTr(event.event_number));
+ return( DAT_ABORT );
+ }
+
+ if ((event.event_data.dto_completion_event_data.transfered_length != size ) ||
+ (event.event_data.dto_completion_event_data.user_cookie.as_64 != 0xaaaa )) {
+ fprintf(stderr, "%d: ERROR: DTO len %d or cookie %x\n",
+ getpid(),
+ event.event_data.dto_completion_event_data.transfered_length,
+ event.event_data.dto_completion_event_data.user_cookie.as_64 );
+ return( DAT_ABORT );
+
+ }
+ if (event.event_data.dto_completion_event_data.status != DAT_SUCCESS) {
+ fprintf(stderr, "%d: ERROR: DTO event status %s\n",
+ getpid(),DT_RetToString(ret));
+ return( DAT_ABORT );
+ }
+ }
+
+ return DAT_SUCCESS;
+}
+
+
+DAT_RETURN
+connect_ep( char *hostname, int conn_id )
+{
+ DAT_SOCK_ADDR remote_addr;
+ DAT_EP_ATTR ep_attr;
+ DAT_RETURN ret;
+ DAT_REGION_DESCRIPTION region;
+ DAT_EVENT event;
+ DAT_COUNT nmore;
+ DAT_LMR_TRIPLET l_iov;
+ DAT_RMR_TRIPLET r_iov;
+ DAT_DTO_COOKIE cookie;
+ int i;
+
+ /* Register send message buffer */
+ LOGPRINTF("%d Registering send Message Buffer %p, len %d\n",
+ getpid(), &rmr_send_msg, sizeof(DAT_RMR_TRIPLET) );
+ region.for_va = &rmr_send_msg;
+ ret = dat_lmr_create( h_ia,
+ DAT_MEM_TYPE_VIRTUAL,
+ region,
+ sizeof(DAT_RMR_TRIPLET),
+ h_pz,
+ DAT_MEM_PRIV_LOCAL_WRITE_FLAG,
+ &h_lmr_send_msg,
+ &lmr_context_send_msg,
+ &rmr_context_send_msg,
+ ®istered_size_send_msg,
+ ®istered_addr_send_msg );
+
+ if (ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error registering send msg buffer: %s\n",
+ getpid(),DT_RetToString(ret));
+ return(ret);
+ }
+ else
+ LOGPRINTF("%d Registered send Message Buffer %p \n",
+ getpid(),region.for_va );
+
+ /* Register Receive buffers */
+ LOGPRINTF("%d Registering Receive Message Buffer %p\n",
+ getpid(), rmr_recv_msg );
+ region.for_va = rmr_recv_msg;
+ ret = dat_lmr_create( h_ia,
+ DAT_MEM_TYPE_VIRTUAL,
+ region,
+ sizeof(DAT_RMR_TRIPLET)*MSG_BUF_COUNT,
+ h_pz,
+ DAT_MEM_PRIV_LOCAL_WRITE_FLAG,
+ &h_lmr_recv_msg,
+ &lmr_context_recv_msg,
+ &rmr_context_recv_msg,
+ ®istered_size_recv_msg,
+ ®istered_addr_recv_msg );
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error registering recv msg buffer: %s\n",
+ getpid(),DT_RetToString(ret));
+ return(ret);
+ }
+ else
+ LOGPRINTF("%d Registered Receive Message Buffer %p\n",
+ getpid(),region.for_va);
+
+ for ( i = 0; i < MSG_BUF_COUNT; i++ ) {
+ cookie.as_64 = i;
+ l_iov.lmr_context = lmr_context_recv_msg;
+ l_iov.pad = 0;
+ l_iov.virtual_address = (DAT_VADDR)(unsigned long)&rmr_recv_msg[ i ];
+ l_iov.segment_length = sizeof(DAT_RMR_TRIPLET);
+
+ LOGPRINTF("%d Posting Receive Message Buffer %p\n",
+ getpid(), &rmr_recv_msg[ i ]);
+ ret = dat_ep_post_recv( h_ep,
+ 1,
+ &l_iov,
+ cookie,
+ DAT_COMPLETION_DEFAULT_FLAG );
+
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error registering recv msg buffer: %s\n",
+ getpid(),DT_RetToString(ret));
+ return(ret);
+ }
+ else
+ LOGPRINTF("%d Registered Receive Message Buffer %p\n",
+ getpid(),region.for_va);
+
+ }
+
+ /* setup receive rdma buffer to initial string to be overwritten */
+ strcpy( (char*)rbuf, "blah, blah, blah\n" );
+
+ if ( server ) { /* SERVER */
+
+ /* create the service point for server listen */
+ LOGPRINTF("%d Creating service point for listen\n",getpid());
+ ret = dat_psp_create( h_ia,
+ conn_id,
+ h_cr_evd,
+ DAT_PSP_CONSUMER_FLAG,
+ &h_psp );
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error dat_psp_create: %s\n",
+ getpid(),DT_RetToString(ret));
+ return(ret);
+ }
+ else
+ LOGPRINTF("%d dat_psp_created for server listen\n", getpid());
+
+ printf("%d Server waiting for connect request..\n", getpid());
+ ret = dat_evd_wait( h_cr_evd, SERVER_TIMEOUT, 1, &event, &nmore );
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error dat_evd_wait: %s\n",
+ getpid(),DT_RetToString(ret));
+ return(ret);
+ }
+ else
+ LOGPRINTF("%d dat_evd_wait for cr_evd completed\n", getpid());
+
+ if ( event.event_number != DAT_CONNECTION_REQUEST_EVENT ) {
+ fprintf(stderr, "%d Error unexpected cr event : %s\n",
+ getpid(),DT_EventToSTr(event.event_number));
+ return( DAT_ABORT );
+ }
+ if ( (event.event_data.cr_arrival_event_data.conn_qual != SERVER_CONN_QUAL) ||
+ (event.event_data.cr_arrival_event_data.sp_handle.psp_handle != h_psp) ) {
+ fprintf(stderr, "%d Error wrong cr event data : %s\n",
+ getpid(),DT_EventToSTr(event.event_number));
+ return( DAT_ABORT );
+ }
+
+ /* accept connect request from client */
+ h_cr = event.event_data.cr_arrival_event_data.cr_handle;
+ LOGPRINTF("%d Accepting connect request from client\n",getpid());
+ ret = dat_cr_accept( h_cr, h_ep, 0, (DAT_PVOID)0 );
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error dat_cr_accept: %s\n",
+ getpid(),DT_RetToString(ret));
+ return(ret);
+ }
+ else
+ LOGPRINTF("%d dat_cr_accept completed\n", getpid());
+ }
+ else { /* CLIENT */
+ struct addrinfo *target;
+ int rval;
+
+ if (getaddrinfo (hostname, NULL, NULL, &target) != 0) {
+ printf("\n remote name resolution failed!\n");
+ exit ( 1 );
+ }
+
+ rval = ((struct sockaddr_in *)target->ai_addr)->sin_addr.s_addr;
+ printf ("%d Server Name: %s \n", getpid(), hostname);
+ printf ("%d Server Net Address: %d.%d.%d.%d\n", getpid(),
+ (rval >> 0) & 0xff,
+ (rval >> 8) & 0xff,
+ (rval >> 16) & 0xff,
+ (rval >> 24) & 0xff);
+
+ remote_addr = *((DAT_IA_ADDRESS_PTR)target->ai_addr);
+
+ LOGPRINTF("%d Connecting to server\n",getpid());
+ ret = dat_ep_connect( h_ep,
+ &remote_addr,
+ conn_id,
+ CONN_TIMEOUT,
+ 0,
+ (DAT_PVOID)0,
+ 0,
+ DAT_CONNECT_DEFAULT_FLAG );
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error dat_ep_connect: %s\n",
+ getpid(), DT_RetToString(ret));
+ return(ret);
+ }
+ else
+ LOGPRINTF("%d dat_ep_connect completed\n", getpid());
+ }
+
+ printf("%d Waiting for connect response\n",getpid());
+
+ ret = dat_evd_wait( h_conn_evd, DAT_TIMEOUT_INFINITE, 1, &event, &nmore );
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error dat_evd_wait: %s\n",
+ getpid(),DT_RetToString(ret));
+ return(ret);
+ }
+ else
+ LOGPRINTF("%d dat_evd_wait for h_conn_evd completed\n", getpid());
+
+ if ( event.event_number != DAT_CONNECTION_EVENT_ESTABLISHED ) {
+ fprintf(stderr, "%d Error unexpected conn event : %s\n",
+ getpid(),DT_EventToSTr(event.event_number));
+ return( DAT_ABORT );
+ }
+ printf("\n%d CONNECTED!\n\n",getpid());
+ connected = 1;
+
+ /*
+ * Setup our remote memory and tell the other side about it
+ */
+ rmr_send_msg.rmr_context = rmr_context_recv;
+ rmr_send_msg.pad = 0;
+ rmr_send_msg.target_address = (DAT_VADDR)(unsigned long)rbuf;
+ rmr_send_msg.segment_length = RDMA_BUFFER_SIZE;
+
+ printf("%d Send RMR to remote: snd_msg: r_key_ctx=%x,pad=%x,va=%llx,len=0x%x\n",
+ getpid(), rmr_send_msg.rmr_context, rmr_send_msg.pad,
+ rmr_send_msg.target_address, rmr_send_msg.segment_length );
+
+ ret = send_msg( &rmr_send_msg,
+ sizeof( DAT_RMR_TRIPLET ),
+ lmr_context_send_msg,
+ cookie,
+ DAT_COMPLETION_SUPPRESS_FLAG );
+
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error send_msg: %s\n",
+ getpid(),DT_RetToString(ret));
+ return(ret);
+ }
+ else
+ LOGPRINTF("%d send_msg completed\n", getpid());
+
+ /*
+ * Wait for remote RMR information for RDMA
+ */
+ if ( polling ) {
+ printf("%d Polling for remote to send RMR data\n",getpid());
+ while ( dat_evd_dequeue( h_dto_evd, &event ) == DAT_QUEUE_EMPTY );
+ }
+ else {
+ printf("%d Waiting for remote to send RMR data\n",getpid());
+ if (use_cno)
+ {
+ DAT_EVD_HANDLE evd = DAT_HANDLE_NULL;
+ ret = dat_cno_wait( h_dto_cno, DTO_TIMEOUT, &evd );
+ LOGPRINTF("%d cno wait return evd_handle=%p\n", getpid(),evd);
+ if ( evd != h_dto_evd ) {
+ fprintf(stderr,
+ "%d Error waiting on h_dto_cno: evd != h_dto_evd\n",
+ getpid());
+ return( DAT_ABORT );
+ }
+ }
+ /* use wait to dequeue */
+ ret = dat_evd_wait( h_dto_evd, DTO_TIMEOUT, 1, &event, &nmore );
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error waiting on h_dto_evd: %s\n",
+ getpid(),DT_RetToString(ret));
+ return(ret);
+ }
+ else {
+ LOGPRINTF("%d dat_evd_wait h_dto_evd completed\n", getpid());
+ }
+ }
+
+ printf("%d remote RMR data arrived!\n",getpid());
+
+ if ( event.event_number != DAT_DTO_COMPLETION_EVENT ) {
+ fprintf(stderr, "%d Error unexpected DTO event : %s\n",
+ getpid(),DT_EventToSTr(event.event_number));
+ return( DAT_ABORT );
+ }
+ if ((event.event_data.dto_completion_event_data.transfered_length !=
+ sizeof( DAT_RMR_TRIPLET )) ||
+ (event.event_data.dto_completion_event_data.user_cookie.as_64 !=
+ recv_msg_index) ) {
+ fprintf(stderr,"ERR recv event: len=%d cookie=%d expected %d/%d\n",
+ (int)event.event_data.dto_completion_event_data.transfered_length,
+ (int)event.event_data.dto_completion_event_data.user_cookie.as_64,
+ sizeof(DAT_RMR_TRIPLET), recv_msg_index );
+ return( DAT_ABORT );
+ }
+
+ r_iov = rmr_recv_msg[ recv_msg_index ];
+
+ printf("%d Received RMR from remote: r_iov: r_key_ctx=%x,pad=%x,va=%llx,len=0x%x\n",
+ getpid(), r_iov.rmr_context, r_iov.pad,
+ r_iov.target_address, r_iov.segment_length );
+
+ recv_msg_index++;
+
+ return ( DAT_SUCCESS );
+}
+
+
+void
+disconnect_ep()
+{
+ DAT_RETURN ret;
+ DAT_EVENT event;
+ DAT_COUNT nmore;
+ int i,flush_cnt;
+
+ if (connected) {
+
+ LOGPRINTF("%d dat_ep_disconnect\n", getpid());
+ ret = dat_ep_disconnect( h_ep, DAT_CLOSE_DEFAULT );
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error dat_ep_disconnect: %s\n",
+ getpid(),DT_RetToString(ret));
+ }
+ else {
+ LOGPRINTF("%d dat_ep_disconnect completed\n", getpid());
+ }
+ }
+
+ /* destroy service point */
+ if (( server ) && ( h_psp != DAT_HANDLE_NULL )) {
+ ret = dat_psp_free( h_psp );
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error dat_psp_free: %s\n",
+ getpid(),DT_RetToString(ret));
+ }
+ else {
+ LOGPRINTF("%d dat_psp_free completed\n", getpid());
+ }
+ }
+
+ /* Unregister Send message Buffer */
+ if ( h_lmr_send_msg != DAT_HANDLE_NULL ) {
+ LOGPRINTF("%d Unregister send message h_lmr %p \n",getpid(),h_lmr_send_msg);
+ ret = dat_lmr_free(h_lmr_send_msg);
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error deregistering send msg mr: %s\n",
+ getpid(), DT_RetToString(ret));
+ } else {
+ LOGPRINTF("%d Unregistered send message Buffer\n",getpid());
+ h_lmr_send_msg = NULL;
+ }
+ }
+
+ /* Unregister recv message Buffer */
+ if ( h_lmr_recv_msg != DAT_HANDLE_NULL ) {
+ LOGPRINTF("%d Unregister recv message h_lmr %p \n",getpid(),h_lmr_recv_msg);
+ ret = dat_lmr_free(h_lmr_recv_msg);
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error deregistering recv msg mr: %s\n",
+ getpid(), DT_RetToString(ret));
+ } else {
+ LOGPRINTF("%d Unregistered recv message Buffer\n",getpid());
+ h_lmr_recv_msg = NULL;
+ }
+ }
+ return;
+}
+
+
+DAT_RETURN
+do_rdma_write_with_msg( )
+{
+ DAT_REGION_DESCRIPTION region;
+ DAT_EVENT event;
+ DAT_COUNT nmore;
+ DAT_LMR_TRIPLET l_iov;
+ DAT_RMR_TRIPLET r_iov;
+ DAT_DTO_COOKIE cookie;
+ DAT_RMR_CONTEXT their_context;
+ DAT_RETURN ret;
+ int i;
+
+ printf("\n %d RDMA WRITE DATA with SEND MSG\n\n",getpid());
+
+ cookie.as_64 = 0x5555;
+
+ if ( recv_msg_index >= MSG_BUF_COUNT )
+ return( DAT_ABORT );
+
+ /* get RMR information from previously received message */
+ r_iov = rmr_recv_msg[ recv_msg_index-1 ];
+
+ if ( server )
+ strcpy( (char*)sbuf, "server written data..." );
+ else
+ strcpy( (char*)sbuf, "client written data..." );
+
+ l_iov.lmr_context = lmr_context_send;
+ l_iov.pad = 0;
+ l_iov.virtual_address = (DAT_VADDR)(unsigned long)sbuf;
+ l_iov.segment_length = buf_len;
+
+ start = get_time();
+ for (i=0;i<burst;i++) {
+ cookie.as_64 = 0x9999;
+ ret = dat_ep_post_rdma_write( h_ep, // ep_handle
+ 1, // num_segments
+ &l_iov, // LMR
+ cookie, // user_cookie
+ &r_iov, // RMR
+ DAT_COMPLETION_SUPPRESS_FLAG );
+ if (ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d: ERROR: dat_ep_post_rdma_write() %s\n",
+ getpid(),DT_RetToString(ret));
+ return( DAT_ABORT );
+ }
+ LOGPRINTF("%d rdma_write # %d completed\n", getpid(),i+1);
+ }
+
+ /*
+ * Send RMR information a 2nd time to indicate completion
+ */
+ rmr_send_msg.rmr_context = rmr_context_recv;
+ rmr_send_msg.pad = 0;
+ rmr_send_msg.target_address = (DAT_VADDR)(unsigned long)rbuf;
+ rmr_send_msg.segment_length = RDMA_BUFFER_SIZE;
+
+ printf("%d Sending completion message\n",getpid());
+
+ ret = send_msg( &rmr_send_msg,
+ sizeof( DAT_RMR_TRIPLET ),
+ lmr_context_send_msg,
+ cookie,
+ DAT_COMPLETION_SUPPRESS_FLAG );
+
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error send_msg: %s\n",
+ getpid(),DT_RetToString(ret));
+ return(ret);
+ } else {
+ LOGPRINTF("%d send_msg completed\n", getpid());
+ }
+
+ /*
+ * Collect first event, write completion or the inbound recv with immed
+ */
+ if ( polling ) {
+ while ( dat_evd_dequeue( h_dto_evd, &event ) == DAT_QUEUE_EMPTY )
+ rdma_wr_poll_count++;
+ }
+ else {
+ LOGPRINTF("%d waiting for message receive event\n", getpid());
+ if (use_cno) {
+ DAT_EVD_HANDLE evd = DAT_HANDLE_NULL;
+ ret = dat_cno_wait( h_dto_cno, DTO_TIMEOUT, &evd );
+ LOGPRINTF("%d cno wait return evd_handle=%p\n", getpid(),evd);
+ if ( evd != h_dto_evd ) {
+ fprintf(stderr, "%d Error waiting on h_dto_cno: evd != h_dto_evd\n",
+ getpid());
+ return( ret );
+ }
+ }
+ /* use wait to dequeue */
+ ret = dat_evd_wait( h_dto_evd, DTO_TIMEOUT, 1, &event, &nmore );
+ if (ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d: ERROR: DTO dat_evd_wait() %s\n",
+ getpid(),DT_RetToString(ret));
+ return( ret );
+ }
+ }
+ stop = get_time();
+ time.rdma_wr = ((stop - start)*1.0e6);
+
+ /* validate event number and status */
+ printf("%d inbound rdma_write; send message arrived!\n",getpid());
+ if ( event.event_number != DAT_DTO_COMPLETION_EVENT ) {
+ fprintf(stderr, "%d Error unexpected DTO event : %s\n",
+ getpid(),DT_EventToSTr(event.event_number));
+ return( DAT_ABORT );
+ }
+
+ if ( (event.event_data.dto_completion_event_data.transfered_length != sizeof(
DAT_RMR_TRIPLET )) ||
+ (event.event_data.dto_completion_event_data.user_cookie.as_64 != recv_msg_index) ) {
+
+ fprintf(stderr,"unexpected event data for receive: len=%d cookie=%d exp %d/%d\n",
+ (int)event.event_data.dto_completion_event_data.transfered_length,
+ (int)event.event_data.dto_completion_event_data.user_cookie.as_64,
+ sizeof(DAT_RMR_TRIPLET), recv_msg_index );
+
+ return( DAT_ABORT );
+ }
+
+ r_iov = rmr_recv_msg[ recv_msg_index ];
+
+ printf("%d Received RMR from remote: r_iov: ctx=%x,pad=%x,va=%p,len=0x%x\n",
+ getpid(), r_iov.rmr_context,
+ r_iov.pad,
+ (void*)(unsigned long)r_iov.target_address,
+ r_iov.segment_length );
+
+ LOGPRINTF("%d inbound rdma_write; send msg event SUCCESS!!!\n", getpid());
+
+ printf("%d %s RDMA write buffer contains: %s\n",
+ getpid(),
+ server ? "SERVER:" : "CLIENT:",
+ rbuf );
+
+ recv_msg_index++;
+
+ return ( DAT_SUCCESS );
+}
+
+/*
+ * Post MAX_RDMA_RD rdma reads against the peer's buffer (RMR info was
+ * received in an earlier message), then send our RMR info a second time
+ * as a completion message and wait for the peer's matching message.
+ * Elapsed time is recorded in time.rdma_rd.
+ *
+ * Returns DAT_SUCCESS, or DAT_ABORT / the failing DAT status on error.
+ */
+DAT_RETURN
+do_rdma_read_with_msg( )
+{
+ DAT_EVENT event;
+ DAT_COUNT nmore;
+ DAT_LMR_TRIPLET l_iov;
+ DAT_RMR_TRIPLET r_iov;
+ DAT_DTO_COOKIE cookie;
+ DAT_RETURN ret;
+ int i;
+
+ printf("\n %d RDMA READ DATA with SEND MSG\n\n",getpid());
+
+ cookie.as_64 = 0x5555;
+
+ /* need room for the completion message and at least one previously
+ * received RMR message to read against (recv_msg_index-1 below)
+ */
+ if ( recv_msg_index >= MSG_BUF_COUNT || recv_msg_index == 0 )
+ return( DAT_ABORT );
+
+ /* get RMR information from previously received message */
+ r_iov = rmr_recv_msg[ recv_msg_index-1 ];
+
+ /* setup rdma read buffer to initial string to be overwritten */
+ strcpy( (char*)sbuf, "blah, blah, blah\n" );
+
+ if ( server )
+ strcpy( (char*)rbuf, "server read data..." );
+ else
+ strcpy( (char*)rbuf, "client read data..." );
+
+ l_iov.lmr_context = lmr_context_send;
+ l_iov.pad = 0;
+ l_iov.virtual_address = (DAT_VADDR)(unsigned long)sbuf;
+ l_iov.segment_length = buf_len;
+
+ start = get_time();
+ for (i=0;i<MAX_RDMA_RD;i++) {
+ cookie.as_64 = 0x9999;
+ ret = dat_ep_post_rdma_read( h_ep, // ep_handle
+ 1, // num_segments
+ &l_iov, // LMR
+ cookie, // user_cookie
+ &r_iov, // RMR
+ DAT_COMPLETION_SUPPRESS_FLAG );
+ if (ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d: ERROR: dat_ep_post_rdma_read() %s\n",
+ getpid(),DT_RetToString(ret));
+ return( DAT_ABORT );
+ }
+ LOGPRINTF("%d rdma_read # %d completed\n", getpid(),i+1);
+ }
+
+ /*
+ * Send RMR information a 2nd time to indicate completion
+ */
+ rmr_send_msg.rmr_context = rmr_context_recv;
+ rmr_send_msg.pad = 0;
+ rmr_send_msg.target_address = (DAT_VADDR)(unsigned long)rbuf;
+ rmr_send_msg.segment_length = RDMA_BUFFER_SIZE;
+
+ printf("%d Sending completion message\n",getpid());
+
+ ret = send_msg( &rmr_send_msg,
+ sizeof( DAT_RMR_TRIPLET ),
+ lmr_context_send_msg,
+ cookie,
+ DAT_COMPLETION_SUPPRESS_FLAG );
+
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error send_msg: %s\n",
+ getpid(),DT_RetToString(ret));
+ return(ret);
+ } else {
+ LOGPRINTF("%d send_msg completed\n", getpid());
+ }
+
+ /*
+ * Collect the inbound recv: the peer's completion message.
+ * Reads above were posted with completions suppressed.
+ */
+ printf("%d Waiting for inbound message....\n",getpid());
+ if ( polling ) {
+ while ( dat_evd_dequeue( h_dto_evd, &event ) == DAT_QUEUE_EMPTY )
+ rdma_rd_poll_count++;
+ }
+ else {
+ LOGPRINTF("%d waiting for message receive event\n", getpid());
+ if (use_cno) {
+ DAT_EVD_HANDLE evd = DAT_HANDLE_NULL;
+ ret = dat_cno_wait( h_dto_cno, DTO_TIMEOUT, &evd );
+ LOGPRINTF("%d cno wait return evd_handle=%p\n", getpid(),evd);
+ if ( evd != h_dto_evd ) {
+ fprintf(stderr, "%d Error waiting on h_dto_cno: evd != h_dto_evd\n",
+ getpid());
+ return( ret );
+ }
+ }
+ /* use wait to dequeue */
+ ret = dat_evd_wait( h_dto_evd, DTO_TIMEOUT, 1, &event, &nmore );
+ if (ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d: ERROR: DTO dat_evd_wait() %s\n",
+ getpid(),DT_RetToString(ret));
+ return( ret );
+ }
+ }
+ stop = get_time();
+ time.rdma_rd = ((stop - start)*1.0e6);
+
+ /* validate event number and status */
+ printf("%d inbound rdma_read; send message arrived!\n",getpid());
+ if ( event.event_number != DAT_DTO_COMPLETION_EVENT ) {
+ fprintf(stderr, "%d Error unexpected DTO event : %s\n",
+ getpid(),DT_EventToSTr(event.event_number));
+ return( DAT_ABORT );
+ }
+
+ /* (int) casts: transfered_length and sizeof() are not int-sized,
+ * and %d with a size_t argument is undefined behavior on LP64
+ */
+ if ( (event.event_data.dto_completion_event_data.transfered_length !=
+ sizeof( DAT_RMR_TRIPLET )) ||
+ (event.event_data.dto_completion_event_data.user_cookie.as_64 !=
+ recv_msg_index) ) {
+
+ fprintf(stderr,"unexpected event data for receive: len=%d cookie=%d exp %d/%d\n",
+ (int)event.event_data.dto_completion_event_data.transfered_length,
+ (int)event.event_data.dto_completion_event_data.user_cookie.as_64,
+ (int)sizeof(DAT_RMR_TRIPLET), recv_msg_index );
+
+ return( DAT_ABORT );
+ }
+
+ r_iov = rmr_recv_msg[ recv_msg_index ];
+
+ printf("%d Received RMR from remote: r_iov: ctx=%x,pad=%x,va=%p,len=0x%x\n",
+ getpid(), r_iov.rmr_context, r_iov.pad,
+ (void*)(unsigned long)r_iov.target_address, r_iov.segment_length );
+
+ LOGPRINTF("%d inbound rdma_read; send msg event SUCCESS!!!\n", getpid());
+
+ printf("%d %s RCV RDMA read buffer contains: %s\n",
+ getpid(),
+ server ? "SERVER:" : "CLIENT:",
+ sbuf );
+
+ recv_msg_index++;
+
+ return ( DAT_SUCCESS );
+}
+
+
+/*
+ * Ping-pong message latency test: the client sends a buffer whose first
+ * byte is 0x55, the server answers with 0xaa, for 'burst' round trips.
+ * All receive buffers are pre-posted up front; elapsed time for the
+ * exchange is stored in time.rtt.  Uses globals (h_ep, h_dto_evd,
+ * sbuf/rbuf, buf_len, burst, ...) set up earlier in this file.
+ */
+DAT_RETURN
+do_ping_pong_msg( )
+{
+ DAT_EVENT event;
+ DAT_COUNT nmore;
+ DAT_DTO_COOKIE cookie;
+ DAT_LMR_TRIPLET l_iov;
+ DAT_RETURN ret;
+ int i;
+ unsigned char *snd_buf;
+ unsigned char *rcv_buf;
+
+ printf("\n %d PING DATA with SEND MSG\n\n",getpid());
+
+ snd_buf = sbuf;
+ rcv_buf = rbuf;
+
+ /* pre-post all buffers */
+ for ( i=0; i < burst; i++ ) {
+ burst_msg_posted++;
+ cookie.as_64 = i;
+ l_iov.lmr_context = lmr_context_recv;
+ l_iov.pad = 0;
+ l_iov.virtual_address = (DAT_VADDR)(unsigned long)rcv_buf;
+ l_iov.segment_length = buf_len;
+
+ LOGPRINTF("%d Pre-posting Receive Message Buffers %p\n",
+ getpid(), rcv_buf );
+
+ ret = dat_ep_post_recv( h_ep,
+ 1,
+ &l_iov,
+ cookie,
+ DAT_COMPLETION_DEFAULT_FLAG );
+
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error posting recv msg buffer: %s\n",
+ getpid(),DT_RetToString(ret));
+ return(ret);
+ }
+ else {
+ LOGPRINTF("%d Posted Receive Message Buffer %p\n",
+ getpid(),rcv_buf);
+ }
+
+ /* next buffer */
+ rcv_buf += buf_len;
+ }
+ /* NOTE(review): fixed 1s delay assumes the peer has also finished
+ * pre-posting by now -- a real handshake would be more robust
+ */
+ sleep(1);
+
+ /* Initialize recv_buf and index to beginning */
+ rcv_buf = rbuf;
+ burst_msg_index=0;
+
+ /* client ping 0x55, server pong 0xAA in first byte */
+ start = get_time();
+ for ( i=0;i<burst;i++ ) {
+ /* walk the send and recv buffers */
+ if ( !server ) {
+ *snd_buf = 0x55;
+
+ LOGPRINTF("%d %s SND buffer %p contains: 0x%x len=%d\n",
+ getpid(), server ? "SERVER:" : "CLIENT:",
+ snd_buf, *snd_buf, buf_len );
+
+ /* completion suppressed: cookie still holds the last
+ * pre-post value, but no send event is generated to check it
+ */
+ ret = send_msg( snd_buf,
+ buf_len,
+ lmr_context_send,
+ cookie,
+ DAT_COMPLETION_SUPPRESS_FLAG );
+
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error send_msg: %s\n",
+ getpid(),DT_RetToString(ret));
+ return(ret);
+ }
+ else {
+ LOGPRINTF("%d send_msg completed\n", getpid());
+ }
+ }
+
+ /* Wait for recv message */
+ if ( polling ) {
+ poll_count=0;
+ LOGPRINTF("%d Polling for message receive event\n", getpid());
+ while ( dat_evd_dequeue( h_dto_evd, &event ) == DAT_QUEUE_EMPTY )
+ poll_count++;
+ }
+ else {
+ LOGPRINTF("%d waiting for message receive event\n", getpid());
+ if (use_cno) {
+ DAT_EVD_HANDLE evd = DAT_HANDLE_NULL;
+ ret = dat_cno_wait( h_dto_cno, DTO_TIMEOUT, &evd );
+ LOGPRINTF("%d cno wait return evd_handle=%p\n", getpid(),evd);
+ if ( evd != h_dto_evd )
+ {
+ fprintf(stderr, "%d Error waiting on h_dto_cno: evd != h_dto_evd\n",
+ getpid());
+ return( ret );
+ }
+ }
+ /* use wait to dequeue */
+ ret = dat_evd_wait( h_dto_evd, DTO_TIMEOUT, 1, &event, &nmore );
+ if (ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d: ERROR: DTO dat_evd_wait() %s\n",
+ getpid(),DT_RetToString(ret));
+ return( ret );
+ }
+ }
+ /* start timer after first message arrives on server */
+ if ( i == 0) {
+ start = get_time();
+ }
+ /* validate event number and status: receive cookie must match the
+ * pre-post index (cookie.as_64 = i above)
+ */
+ LOGPRINTF("%d inbound message; message arrived!\n",getpid());
+ if ( event.event_number != DAT_DTO_COMPLETION_EVENT ) {
+ fprintf(stderr, "%d Error unexpected DTO event : %s\n",
+ getpid(),DT_EventToSTr(event.event_number));
+ return( DAT_ABORT );
+ }
+ if ((event.event_data.dto_completion_event_data.transfered_length
+ != buf_len) ||
+ (event.event_data.dto_completion_event_data.user_cookie.as_64
+ != burst_msg_index) ) {
+ fprintf(stderr,"ERR: recv event: len=%d cookie=%d exp %d/%d\n",
+ (int)event.event_data.dto_completion_event_data.transfered_length,
+ (int)event.event_data.dto_completion_event_data.user_cookie.as_64,
+ buf_len, burst_msg_index );
+
+ return( DAT_ABORT );
+ }
+
+ LOGPRINTF("%d %s RCV buffer %p contains: 0x%x len=%d\n",
+ getpid(), server ? "SERVER:" : "CLIENT:",
+ rcv_buf, *rcv_buf, buf_len );
+
+ burst_msg_index++;
+
+ /* If server, change data and send it back to client */
+ if ( server ) {
+ *snd_buf = 0xaa;
+
+ LOGPRINTF("%d %s SND buffer %p contains: 0x%x len=%d\n",
+ getpid(), server ? "SERVER:" : "CLIENT:",
+ snd_buf, *snd_buf, buf_len );
+
+ ret = send_msg( snd_buf,
+ buf_len,
+ lmr_context_send,
+ cookie,
+ DAT_COMPLETION_SUPPRESS_FLAG );
+
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error send_msg: %s\n",
+ getpid(),DT_RetToString(ret));
+ return(ret);
+ }
+ else {
+ LOGPRINTF("%d send_msg completed\n", getpid());
+ }
+ }
+
+ /* next buffers */
+ rcv_buf += buf_len;
+ snd_buf += buf_len;
+ }
+ stop = get_time();
+ time.rtt = ((stop - start)*1.0e6);
+
+ return ( DAT_SUCCESS );
+}
+
+/*
+ * Register the RDMA Receive and Send buffers (rbuf/sbuf) with the IA.
+ * The receive-side registration is timed into time.reg; the resulting
+ * lmr/rmr contexts land in the globals used by the transfer tests.
+ *
+ * NOTE: the '&registered_*' argument names were mangled to '(R)istered_*'
+ * by HTML-entity decoding in the mail archive; restored here.
+ */
+DAT_RETURN
+register_rdma_memory(void)
+{
+ DAT_RETURN ret;
+ DAT_REGION_DESCRIPTION region;
+
+ /* Register RDMA Receive buffer */
+ region.for_va = rbuf;
+ start = get_time();
+ ret = dat_lmr_create( h_ia,
+ DAT_MEM_TYPE_VIRTUAL,
+ region,
+ buf_len*burst,
+ h_pz,
+ DAT_MEM_PRIV_ALL_FLAG,
+ &h_lmr_recv,
+ &lmr_context_recv,
+ &rmr_context_recv,
+ &registered_size_recv,
+ &registered_addr_recv );
+ stop = get_time();
+ time.reg += ((stop - start)*1.0e6);
+ time.total += time.reg;
+
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error registering recv buffer: %s\n",
+ getpid(),DT_RetToString(ret));
+ return (ret);
+ } else {
+ LOGPRINTF("%d Registered Receive RDMA Buffer %p\n",
+ getpid(),region.for_va);
+ }
+
+ /* Register RDMA Send buffer (not timed, matching original behavior) */
+ region.for_va = sbuf;
+ ret = dat_lmr_create( h_ia,
+ DAT_MEM_TYPE_VIRTUAL,
+ region,
+ buf_len*burst,
+ h_pz,
+ DAT_MEM_PRIV_ALL_FLAG,
+ &h_lmr_send,
+ &lmr_context_send,
+ &rmr_context_send,
+ &registered_size_send,
+ &registered_addr_send );
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error registering send RDMA buffer: %s\n",
+ getpid(),DT_RetToString(ret));
+ return (ret);
+ } else {
+ LOGPRINTF("%d Registered Send RDMA Buffer %p\n",
+ getpid(),region.for_va);
+ }
+
+ return DAT_SUCCESS;
+}
+
+/*
+ * Unregister RDMA memory
+ *
+ * Frees the recv and send LMRs if they were created; the recv-side
+ * free is timed into time.unreg.  Handles are reset to DAT_HANDLE_NULL
+ * (not bare NULL, for consistency with the guards) once freed.
+ */
+DAT_RETURN
+unregister_rdma_memory(void)
+{
+ DAT_RETURN ret;
+
+ /* Unregister Recv Buffer */
+ if ( h_lmr_recv != DAT_HANDLE_NULL ) {
+ LOGPRINTF("%d Unregister h_lmr %p \n",getpid(),h_lmr_recv);
+ start = get_time();
+ ret = dat_lmr_free(h_lmr_recv);
+ stop = get_time();
+ time.unreg += ((stop - start)*1.0e6);
+ time.total += time.unreg;
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error deregistering recv mr: %s\n",
+ getpid(), DT_RetToString(ret));
+ return (ret);
+ }
+ else {
+ LOGPRINTF("%d Unregistered Recv Buffer\n",getpid());
+ h_lmr_recv = DAT_HANDLE_NULL;
+ }
+ }
+
+ /* Unregister Send Buffer */
+ if ( h_lmr_send != DAT_HANDLE_NULL ) {
+ LOGPRINTF("%d Unregister h_lmr %p \n",getpid(),h_lmr_send);
+ ret = dat_lmr_free(h_lmr_send);
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error deregistering send mr: %s\n",
+ getpid(), DT_RetToString(ret));
+ return (ret);
+ }
+ else {
+ LOGPRINTF("%d Unregistered send Buffer\n",getpid());
+ h_lmr_send = DAT_HANDLE_NULL;
+ }
+ }
+ return DAT_SUCCESS;
+}
+
+ /*
+ * Create CNO, CR, CONN, and DTO events
+ *
+ * Creates (in order): an optional CNO, the connection-request EVD,
+ * the connection EVD, and the DTO EVD.  CNO and cr-EVD creation are
+ * timed into time.cnoc / time.evdc.  Handles go to file globals.
+ */
+DAT_RETURN
+create_events(void)
+{
+ DAT_RETURN ret;
+
+ /* create CNO */
+ if (use_cno) {
+ start = get_time();
+ ret = dat_cno_create( h_ia, DAT_OS_WAIT_PROXY_AGENT_NULL, &h_dto_cno );
+ stop = get_time();
+ time.cnoc += ((stop - start)*1.0e6);
+ time.total += time.cnoc;
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error dat_cno_create: %s\n",
+ getpid(),DT_RetToString(ret));
+ return (ret);
+ }
+ else {
+ /* NOTE(review): message says "cr_evd" but this logs the CNO handle */
+ LOGPRINTF("%d cr_evd created, %p\n", getpid(), h_dto_cno);
+ }
+ }
+
+ /* create cr EVD */
+ start = get_time();
+ ret = dat_evd_create( h_ia, 10, DAT_HANDLE_NULL, DAT_EVD_CR_FLAG, &h_cr_evd );
+ stop = get_time();
+ time.evdc += ((stop - start)*1.0e6);
+ time.total += time.evdc;
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error dat_evd_create: %s\n",
+ getpid(),DT_RetToString(ret));
+ return (ret);
+ }
+ else {
+ LOGPRINTF("%d cr_evd created %p\n", getpid(),h_cr_evd);
+ }
+ /* create conn EVD */
+ ret = dat_evd_create( h_ia, 10, DAT_HANDLE_NULL, DAT_EVD_CONNECTION_FLAG, &h_conn_evd );
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error dat_evd_create: %s\n",
+ getpid(),DT_RetToString(ret));
+ return (ret);
+ }
+ else {
+ LOGPRINTF("%d con_evd created %p\n", getpid(),h_conn_evd);
+ }
+
+ /* create dto EVD, with CNO if use_cno was set */
+ /* NOTE(review): h_dto_cno is passed even when use_cno is 0 --
+ * assumes the global is initialized to DAT_HANDLE_NULL; confirm
+ */
+ ret = dat_evd_create( h_ia,
+ (MSG_BUF_COUNT*2)+burst*2,
+ h_dto_cno,
+ DAT_EVD_DTO_FLAG,
+ &h_dto_evd );
+ if(ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error dat_evd_create: %s\n",
+ getpid(),DT_RetToString(ret));
+ return (ret);
+ }
+ else {
+ LOGPRINTF("%d dto_evd created %p\n", getpid(), h_dto_evd );
+ }
+
+ return DAT_SUCCESS;
+}
+
+/*
+ * Destroy CR, CONN, CNO, and DTO events
+ *
+ * Frees each handle that was created (non-NULL), in the order
+ * cr EVD, conn EVD, dto EVD, CNO.  The dto EVD and CNO teardown are
+ * timed into time.evdf / time.cnof.  Handles are reset to
+ * DAT_HANDLE_NULL once freed.
+ */
+DAT_RETURN
+destroy_events(void)
+{
+ DAT_RETURN status;
+
+ /* free cr EVD */
+ if ( h_cr_evd != DAT_HANDLE_NULL ) {
+ LOGPRINTF("%d Free cr EVD %p \n",getpid(),h_cr_evd);
+ status = dat_evd_free( h_cr_evd );
+ if ( status != DAT_SUCCESS ) {
+ fprintf(stderr, "%d Error freeing cr EVD: %s\n",
+ getpid(), DT_RetToString(status));
+ return (status);
+ }
+ LOGPRINTF("%d Freed cr EVD\n",getpid());
+ h_cr_evd = DAT_HANDLE_NULL;
+ }
+
+ /* free conn EVD */
+ if ( h_conn_evd != DAT_HANDLE_NULL ) {
+ LOGPRINTF("%d Free conn EVD %p \n",getpid(),h_conn_evd);
+ status = dat_evd_free( h_conn_evd );
+ if ( status != DAT_SUCCESS ) {
+ fprintf(stderr, "%d Error freeing conn EVD: %s\n",
+ getpid(), DT_RetToString(status));
+ return (status);
+ }
+ LOGPRINTF("%d Freed conn EVD\n",getpid());
+ h_conn_evd = DAT_HANDLE_NULL;
+ }
+
+ /* free dto EVD, timed */
+ if ( h_dto_evd != DAT_HANDLE_NULL ) {
+ LOGPRINTF("%d Free dto EVD %p \n",getpid(),h_dto_evd);
+ start = get_time();
+ status = dat_evd_free( h_dto_evd );
+ stop = get_time();
+ time.evdf += ((stop - start)*1.0e6);
+ time.total += time.evdf;
+ if ( status != DAT_SUCCESS ) {
+ fprintf(stderr, "%d Error freeing dto EVD: %s\n",
+ getpid(), DT_RetToString(status));
+ return (status);
+ }
+ LOGPRINTF("%d Freed dto EVD\n",getpid());
+ h_dto_evd = DAT_HANDLE_NULL;
+ }
+
+ /* free CNO, timed */
+ if ( h_dto_cno != DAT_HANDLE_NULL ) {
+ LOGPRINTF("%d Free dto CNO %p \n",getpid(),h_dto_cno);
+ start = get_time();
+ status = dat_cno_free( h_dto_cno );
+ stop = get_time();
+ time.cnof += ((stop - start)*1.0e6);
+ time.total += time.cnof;
+ if ( status != DAT_SUCCESS ) {
+ fprintf(stderr, "%d Error freeing dto CNO: %s\n",
+ getpid(), DT_RetToString(status));
+ return (status);
+ }
+ LOGPRINTF("%d Freed dto CNO\n",getpid());
+ h_dto_cno = DAT_HANDLE_NULL;
+ }
+ return DAT_SUCCESS;
+}
+
+/*
+ * Map DAT_RETURN values to readable strings,
+ * but don't assume the values are zero-based or contiguous.
+ *
+ * Returns a pointer to a shared static buffer -- not thread safe.
+ */
+char errmsg[512] = {0};
+const char *
+DT_RetToString (DAT_RETURN ret_value)
+{
+ const char *major_msg, *minor_msg;
+
+ dat_strerror (ret_value, &major_msg, &minor_msg);
+
+ /* bounded format: the old strcpy/strcat pair could overflow errmsg
+ * if major+minor exceeded 512 bytes
+ */
+ snprintf(errmsg, sizeof(errmsg), "%s %s", major_msg, minor_msg);
+
+ return errmsg;
+}
+
+/*
+ * Map DAT_EVENT_CODE values to readable strings.
+ * Linear search of a static name table; returns a fixed fallback
+ * string for unknown codes.
+ */
+const char *
+DT_EventToSTr (DAT_EVENT_NUMBER event_code)
+{
+ unsigned int i;
+ static struct {
+ const char *name;
+ DAT_EVENT_NUMBER value; /* was DAT_RETURN: table holds event numbers */
+ }
+ dat_events[] =
+ {
+ # define DATxx(x) { # x, x }
+ DATxx (DAT_DTO_COMPLETION_EVENT),
+ DATxx (DAT_RMR_BIND_COMPLETION_EVENT),
+ DATxx (DAT_CONNECTION_REQUEST_EVENT),
+ DATxx (DAT_CONNECTION_EVENT_ESTABLISHED),
+ DATxx (DAT_CONNECTION_EVENT_PEER_REJECTED),
+ DATxx (DAT_CONNECTION_EVENT_NON_PEER_REJECTED),
+ DATxx (DAT_CONNECTION_EVENT_ACCEPT_COMPLETION_ERROR),
+ DATxx (DAT_CONNECTION_EVENT_DISCONNECTED),
+ DATxx (DAT_CONNECTION_EVENT_BROKEN),
+ DATxx (DAT_CONNECTION_EVENT_TIMED_OUT),
+ DATxx (DAT_CONNECTION_EVENT_UNREACHABLE),
+ DATxx (DAT_ASYNC_ERROR_EVD_OVERFLOW),
+ DATxx (DAT_ASYNC_ERROR_IA_CATASTROPHIC),
+ DATxx (DAT_ASYNC_ERROR_EP_BROKEN),
+ DATxx (DAT_ASYNC_ERROR_TIMED_OUT),
+ DATxx (DAT_ASYNC_ERROR_PROVIDER_INTERNAL_ERROR),
+ DATxx (DAT_SOFTWARE_EVENT)
+ # undef DATxx
+ };
+ # define NUM_EVENTS (sizeof(dat_events)/sizeof(dat_events[0]))
+
+ for (i = 0; i < NUM_EVENTS; i++) {
+ if (dat_events[i].value == event_code)
+ {
+ return ( dat_events[i].name );
+ }
+ }
+
+ return ( "Invalid_DAT_EVENT_NUMBER" );
+}
+
+
+/* Print the dtest command-line options to stdout. */
+void print_usage()
+{
+ static const char *options[] = {
+ "s: server\n",
+ "c: use cno\n",
+ "v: verbose\n",
+ "p: polling\n",
+ "d: delay before close\n",
+ "b: buf length to allocate\n",
+ "B: burst count, rdma and msgs \n",
+ "h: hostname\n",
+ };
+ unsigned int i;
+
+ printf("\n DAPL USAGE \n\n");
+ for (i = 0; i < sizeof(options)/sizeof(options[0]); i++)
+ printf("%s", options[i]);
+ printf("\n");
+}
+
Index: dtest/dat.conf
===================================================================
--- dtest/dat.conf (revision 0)
+++ dtest/dat.conf (revision 0)
@@ -0,0 +1,11 @@
+#
+# DAT 1.1 and 1.2 configuration file
+#
+# Each entry should have the following fields:
+#
+# <ia_name> <api_version> <threadsafety> <default> <lib_path> \
+# <provider_version> <ia_params> <platform_params>
+#
+# Example for openib using the first Mellanox adapter, port 1 and port 2
+
+IB1 u1.2 nonthreadsafe default /home/ardavis/Proj/openib/gen2/users/jlentini/userspace/dapl/udapl/Target/libdapl.so r.1.2 "mthca0 1" ""
Index: dtest/README
===================================================================
--- dtest/README (revision 0)
+++ dtest/README (revision 0)
@@ -0,0 +1,19 @@
+simple dapl test just for initial openIB uDAPL testing...
+
+ dtest/dtest.c
+ dtest/makefile
+ dtest/dat.conf
+
+to build (default uDAPL name == IB1, ib device == mthca0, port == 1)
+ edit makefile and change path (DAT_LIB) to appropriate libdat.so
+ edit dat.conf and change path to appropriate libdapl.so
+ cp dat.conf to /etc/dat.conf
+
+to run:
+ server: dtest
+ client: dtest -h hostname
+
+for verbose uDAPL and uDAT debug:
+
+ export DAPL_DBG_TYPE=0xffff
+ export DAT_DBG_TYPE=0xffff
Index: dtest/makefile
===================================================================
--- dtest/makefile (revision 0)
+++ dtest/makefile (revision 0)
@@ -0,0 +1,16 @@
+CC = gcc
+CFLAGS = -O2 -g
+
+DAT_INC = ../dat/include
+DAT_LIB = /usr/lib64
+
+all: dtest
+
+clean:
+ rm -f *.o;touch *.c;rm -f dtest
+
+dtest: ./dtest.c
+ $(CC) $(CFLAGS) ./dtest.c -o dtest \
+ -DDAPL_PROVIDER='"IB1"' \
+ -I $(DAT_INC) -L $(DAT_LIB) -ldat
+
More information about the general
mailing list