[ofa-general] [PATCH] [DAPL] scm: add support for WinOF
Sean Hefty
sean.hefty at intel.com
Fri Feb 13 14:55:17 PST 2009
Modify the openib_scm provider to support both OFED and WinOF releases.
This takes advantage of having a libibverbs compatibility library.*
Signed-off-by: Sean Hefty <sean.hefty at intel.com>
---
* If only there were a sockets compatibility layer... gurgle
This is only build-tested on Windows, but does run on Linux.
diff --git a/Makefile.am b/Makefile.am
index bfc93f7..5044e36 100755
--- a/Makefile.am
+++ b/Makefile.am
@@ -49,7 +49,8 @@ dapl_udapl_libdaploscm_la_CFLAGS = $(AM_CFLAGS) -D_GNU_SOURCE $(OSFLAGS) $(XFLAG
-DOPENIB -DCQ_WAIT_OBJECT \
-I$(srcdir)/dat/include/ -I$(srcdir)/dapl/include/ \
-I$(srcdir)/dapl/common -I$(srcdir)/dapl/udapl/linux \
- -I$(srcdir)/dapl/openib_scm
+ -I$(srcdir)/dapl/openib_scm \
+ -I$(srcdir)/dapl/openib_scm/linux
if HAVE_LD_VERSION_SCRIPT
dat_version_script = -Wl,--version-script=$(srcdir)/dat/udat/libdat2.map
diff --git a/dapl/openib_scm/README b/dapl/openib_scm/README
deleted file mode 100644
index 239dfe6..0000000
--- a/dapl/openib_scm/README
+++ /dev/null
@@ -1,40 +0,0 @@
-
-OpenIB uDAPL provider using socket-based CM, in leiu of uCM/uAT, to setup QP/channels.
-
-to build:
-
-cd dapl/udapl
-make VERBS=openib_scm clean
-make VERBS=openib_scm
-
-
-Modifications to common code:
-
-- added dapl/openib_scm directory
-
- dapl/udapl/Makefile
-
-New files for openib_scm provider
-
- dapl/openib/dapl_ib_cq.c
- dapl/openib/dapl_ib_dto.h
- dapl/openib/dapl_ib_mem.c
- dapl/openib/dapl_ib_qp.c
- dapl/openib/dapl_ib_util.c
- dapl/openib/dapl_ib_util.h
- dapl/openib/dapl_ib_cm.c
-
-A simple dapl test just for openib_scm testing...
-
- test/dtest/dtest.c
- test/dtest/makefile
-
- server: dtest -s
- client: dtest -h hostname
-
-known issues:
-
- no memory windows support in ibverbs, dat_create_rmr fails.
-
-
-
diff --git a/dapl/openib_scm/dapl_ib_cm.c b/dapl/openib_scm/dapl_ib_cm.c
index 80a7d5e..9a15e42 100644
--- a/dapl/openib_scm/dapl_ib_cm.c
+++ b/dapl/openib_scm/dapl_ib_cm.c
@@ -52,26 +52,169 @@
#include "dapl_cr_util.h"
#include "dapl_name_service.h"
#include "dapl_ib_util.h"
-
-#include <stdio.h>
-#include <unistd.h>
-#include <fcntl.h>
-#include <netinet/tcp.h>
-#include <byteswap.h>
-#include <poll.h>
-
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
-
-#if __BYTE_ORDER == __LITTLE_ENDIAN
-static inline uint64_t cpu_to_be64(uint64_t x) {return bswap_64(x);}
-#elif __BYTE_ORDER == __BIG_ENDIAN
-static inline uint64_t cpu_to_be64(uint64_t x) {return x;}
-#endif
+#include "dapl_osd.h"
extern int g_scm_pipe[2];
+#if defined(_WIN32) || defined(_WIN64)
+enum DAPL_FD_EVENTS {
+ DAPL_FD_READ = 0x1,
+ DAPL_FD_WRITE = 0x2,
+ DAPL_FD_ERROR = 0x4
+};
+/* Put a Winsock socket into non-blocking mode; returns 0 on success. */
+static int dapl_config_socket(DAPL_SOCKET s)
+{
+ unsigned long nonblocking = 1;
+ return ioctlsocket(s, FIONBIO, &nonblocking);
+}
+/* Start a connect: 0 if connected, EAGAIN if in progress, else a WSA error. */
+static int dapl_connect_socket(DAPL_SOCKET s, struct sockaddr *addr,
+ int addrlen)
+{
+ int err;
+ if (connect(s, addr, addrlen) == 0) /* error code only valid on failure */
+ return 0;
+ err = WSAGetLastError();
+ return (err == WSAEWOULDBLOCK) ? EAGAIN : err;
+}
+/* select() sets: [0] read, [1] write, [2] exception. */
+struct dapl_fd_set {
+ struct fd_set set[3];
+};
+
+static struct dapl_fd_set *dapl_alloc_fd_set(void)
+{
+ return dapl_os_alloc(sizeof(struct dapl_fd_set));
+}
+
+static void dapl_fd_zero(struct dapl_fd_set *set)
+{
+ FD_ZERO(&set->set[0]);
+ FD_ZERO(&set->set[1]);
+ FD_ZERO(&set->set[2]);
+}
+/* Watch s for READ (any other event: WRITE) plus exceptions; always returns 0. */
+static int dapl_fd_set(DAPL_SOCKET s, struct dapl_fd_set *set,
+ enum DAPL_FD_EVENTS event)
+{
+ FD_SET(s, &set->set[(event == DAPL_FD_READ) ? 0 : 1]);
+ FD_SET(s, &set->set[2]);
+ return 0;
+}
+/* Zero-timeout check: 0 idle, event if ready, DAPL_FD_ERROR, or a WSA error code. */
+static enum DAPL_FD_EVENTS dapl_poll(DAPL_SOCKET s, enum DAPL_FD_EVENTS event)
+{
+ struct fd_set rw_fds;
+ struct fd_set err_fds;
+ struct timeval tv;
+ int ret;
+
+ FD_ZERO(&rw_fds);
+ FD_ZERO(&err_fds);
+ FD_SET(s, &rw_fds);
+ FD_SET(s, &err_fds);
+
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+ /* Winsock ignores the nfds argument of select() */
+ if (event == DAPL_FD_READ)
+ ret = select(1, &rw_fds, NULL, &err_fds, &tv);
+ else
+ ret = select(1, NULL, &rw_fds, &err_fds, &tv);
+
+ if (ret == 0)
+ return 0;
+ else if (ret == SOCKET_ERROR) /* fd sets are not meaningful after a failure */
+ return WSAGetLastError();
+ else if (FD_ISSET(s, &rw_fds))
+ return event;
+ else
+ return DAPL_FD_ERROR;
+}
+/* Block (no timeout) until a socket in the set is ready. */
+static int dapl_select(struct dapl_fd_set *set)
+{
+ return select(0, &set->set[0], &set->set[1], &set->set[2], NULL);
+}
+#else // _WIN32 || _WIN64
+enum DAPL_FD_EVENTS {
+ DAPL_FD_READ = POLLIN,
+ DAPL_FD_WRITE = POLLOUT,
+ DAPL_FD_ERROR = POLLERR
+};
+/* Put the socket into O_NONBLOCK mode; returns a negative value on failure. */
+static int dapl_config_socket(DAPL_SOCKET s)
+{
+ int ret;
+
+ ret = fcntl(s, F_GETFL);
+ if (ret >= 0)
+ ret = fcntl(s, F_SETFL, ret | O_NONBLOCK);
+ return ret;
+}
+/* Start a connect: 0 if connected, EAGAIN if in progress, else -1 (errno set). */
+static int dapl_connect_socket(DAPL_SOCKET s, struct sockaddr *addr, int addrlen)
+{
+ int ret;
+
+ ret = connect(s, addr, addrlen);
+ /* errno is only meaningful when connect() failed */
+ return (ret && errno == EINPROGRESS) ? EAGAIN : ret;
+}
+/* poll() set for cr_thread; index is the number of entries in use. */
+struct dapl_fd_set {
+ int index;
+ struct pollfd set[DAPL_FD_SETSIZE];
+};
+
+static struct dapl_fd_set *dapl_alloc_fd_set(void)
+{
+ return dapl_os_alloc(sizeof(struct dapl_fd_set));
+}
+
+static void dapl_fd_zero(struct dapl_fd_set *set)
+{
+ set->index = 0;
+}
+/* Append s to the set; returns -1 (after logging) when the set is full. */
+static int dapl_fd_set(DAPL_SOCKET s, struct dapl_fd_set *set,
+ enum DAPL_FD_EVENTS event)
+{
+ if (set->index >= DAPL_FD_SETSIZE) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ "SCM ERR: cm_thread exceeded FD_SETSIZE %d\n",
+ set->index);
+ return -1;
+ }
+
+ set->set[set->index].fd = s;
+ set->set[set->index].revents = 0;
+ set->set[set->index++].events = event;
+ return 0;
+}
+/* Zero-timeout poll of one fd: its revents, 0 if idle, or -1 if poll() failed. */
+static enum DAPL_FD_EVENTS dapl_poll(DAPL_SOCKET s, enum DAPL_FD_EVENTS event)
+{
+ struct pollfd fds;
+ int ret;
+
+ fds.fd = s;
+ fds.events = event;
+ fds.revents = 0;
+ ret = poll(&fds, 1, 0);
+ if (ret <= 0)
+ return ret;
+
+ return fds.revents;
+}
+/* Block (no timeout) until any fd in the set has an event. */
+static int dapl_select(struct dapl_fd_set *set)
+{
+ return poll(set->set, set->index, -1);
+}
+#endif
+
static struct ib_cm_handle *dapli_cm_create(void)
{
struct ib_cm_handle *cm_ptr;
@@ -85,7 +228,7 @@ static struct ib_cm_handle *dapli_cm_create(void)
(void)dapl_os_memzero(cm_ptr, sizeof(*cm_ptr));
cm_ptr->dst.ver = htons(DSCM_VER);
- cm_ptr->socket = -1;
+ cm_ptr->socket = DAPL_INVALID_SOCKET;
return cm_ptr;
bail:
dapl_os_free(cm_ptr, sizeof(*cm_ptr));
@@ -100,8 +243,8 @@ static void dapli_cm_destroy(struct ib_cm_handle *cm_ptr)
/* cleanup, never made it to work queue */
if (cm_ptr->state == SCM_INIT) {
- if (cm_ptr->socket >= 0)
- close(cm_ptr->socket);
+ if (cm_ptr->socket != DAPL_INVALID_SOCKET)
+ closesocket(cm_ptr->socket);
dapl_os_free(cm_ptr, sizeof(*cm_ptr));
return;
}
@@ -112,9 +255,9 @@ static void dapli_cm_destroy(struct ib_cm_handle *cm_ptr)
cm_ptr->ep->cm_handle = IB_INVALID_HANDLE;
/* close socket if still active */
- if (cm_ptr->socket >= 0) {
- close(cm_ptr->socket);
- cm_ptr->socket = -1;
+ if (cm_ptr->socket != DAPL_INVALID_SOCKET) {
+ closesocket(cm_ptr->socket);
+ cm_ptr->socket = DAPL_INVALID_SOCKET;
}
dapl_os_unlock(&cm_ptr->lock);
@@ -172,14 +315,14 @@ dapli_socket_disconnect(dp_ib_cm_handle_t cm_ptr)
return DAT_SUCCESS;
} else {
/* send disc date, close socket, schedule destroy */
- if (cm_ptr->socket >= 0) {
- if (write(cm_ptr->socket,
- &disc_data, sizeof(disc_data)) == -1)
+ if (cm_ptr->socket != DAPL_INVALID_SOCKET) {
+ if (send(cm_ptr->socket, (char *) &disc_data,
+ sizeof(disc_data), 0) == -1)
dapl_log(DAPL_DBG_TYPE_WARN,
" cm_disc: write error = %s\n",
strerror(errno));
- close(cm_ptr->socket);
- cm_ptr->socket = -1;
+ closesocket(cm_ptr->socket);
+ cm_ptr->socket = DAPL_INVALID_SOCKET;
}
cm_ptr->state = SCM_DISCONNECTED;
}
@@ -211,7 +354,7 @@ void
dapli_socket_connected(dp_ib_cm_handle_t cm_ptr, int err)
{
int len, opt = 1;
- struct iovec iovec[2];
+ struct iovec iov[2];
struct dapl_ep *ep_ptr = cm_ptr->ep;
if (err) {
@@ -226,18 +369,21 @@ dapli_socket_connected(dp_ib_cm_handle_t cm_ptr, int err)
" socket connected, write QP and private data\n");
/* no delay for small packets */
- setsockopt(cm_ptr->socket,IPPROTO_TCP,TCP_NODELAY,&opt,sizeof(opt));
+ setsockopt(cm_ptr->socket, IPPROTO_TCP, TCP_NODELAY,
+ (char *) &opt, sizeof(opt));
/* send qp info and pdata to remote peer */
- iovec[0].iov_base = &cm_ptr->dst;
- iovec[0].iov_len = sizeof(ib_qp_cm_t);
+ iov[0].iov_base = (void *) &cm_ptr->dst;
+ iov[0].iov_len = sizeof(ib_qp_cm_t);
if (cm_ptr->dst.p_size) {
- iovec[1].iov_base = cm_ptr->p_data;
- iovec[1].iov_len = ntohl(cm_ptr->dst.p_size);
+ iov[1].iov_base = cm_ptr->p_data;
+ iov[1].iov_len = ntohl(cm_ptr->dst.p_size);
+ len = writev(cm_ptr->socket, iov, 2);
+ } else {
+ len = writev(cm_ptr->socket, iov, 1);
}
- len = writev(cm_ptr->socket, iovec, (cm_ptr->dst.p_size ? 2:1));
- if (len != (ntohl(cm_ptr->dst.p_size) + sizeof(ib_qp_cm_t))) {
+ if (len != (ntohl(cm_ptr->dst.p_size) + sizeof(ib_qp_cm_t))) {
dapl_log(DAPL_DBG_TYPE_ERR,
" CONN_PENDING write: ERR %s, wcnt=%d -> %s\n",
strerror(errno), len,
@@ -253,9 +399,9 @@ dapli_socket_connected(dp_ib_cm_handle_t cm_ptr, int err)
dapl_dbg_log(DAPL_DBG_TYPE_CM,
" connected: sending SRC GID subnet %016llx id %016llx\n",
(unsigned long long)
- cpu_to_be64(cm_ptr->dst.gid.global.subnet_prefix),
+ htonll(cm_ptr->dst.gid.global.subnet_prefix),
(unsigned long long)
- cpu_to_be64(cm_ptr->dst.gid.global.interface_id));
+ htonll(cm_ptr->dst.gid.global.interface_id));
/* queue up to work thread to avoid blocking consumer */
cm_ptr->state = SCM_RTU_PENDING;
@@ -290,25 +436,23 @@ dapli_socket_connect(DAPL_EP *ep_ptr,
return DAT_INSUFFICIENT_RESOURCES;
/* create, connect, sockopt, and exchange QP information */
- if ((cm_ptr->socket = socket(AF_INET,SOCK_STREAM,0)) < 0 ) {
+ if ((cm_ptr->socket = socket(AF_INET,SOCK_STREAM,0)) == DAPL_INVALID_SOCKET) {
dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
return DAT_INSUFFICIENT_RESOURCES;
}
- /* non-blocking */
- ret = fcntl(cm_ptr->socket, F_GETFL);
- if (ret < 0 || fcntl(cm_ptr->socket,
- F_SETFL, ret | O_NONBLOCK) < 0) {
- dapl_log(DAPL_DBG_TYPE_ERR,
- " socket connect: fcntl on socket %d ERR %d %s\n",
- cm_ptr->socket, ret,
- strerror(errno));
- goto bail;
- }
+ ret = dapl_config_socket(cm_ptr->socket);
+ if (ret < 0) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " socket connect: config socket %d ERR %d %s\n",
+ cm_ptr->socket, ret, strerror(errno));
+ goto bail;
+ }
((struct sockaddr_in*)r_addr)->sin_port = htons(r_qual);
- ret = connect(cm_ptr->socket, r_addr, sizeof(*r_addr));
- if (ret && errno != EINPROGRESS) {
+ ret = dapl_connect_socket(cm_ptr->socket, (struct sockaddr *) r_addr,
+ sizeof(*r_addr));
+ if (ret && ret != EAGAIN) {
dapl_log(DAPL_DBG_TYPE_ERR,
" socket connect ERROR: %s -> %s r_qual %d\n",
strerror(errno),
@@ -391,16 +535,13 @@ dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)
{
DAPL_EP *ep_ptr = cm_ptr->ep;
int len;
- struct iovec iovec[2];
short rtu_data = htons(0x0E0F);
ib_cm_events_t event = IB_CME_DESTINATION_REJECT;
/* read DST information into cm_ptr, overwrite SRC info */
dapl_dbg_log(DAPL_DBG_TYPE_EP," connect_rtu: recv peer QP data\n");
- iovec[0].iov_base = &cm_ptr->dst;
- iovec[0].iov_len = sizeof(ib_qp_cm_t);
- len = readv(cm_ptr->socket, iovec, 1);
+ len = recv(cm_ptr->socket, (char *) &cm_ptr->dst, sizeof(ib_qp_cm_t), 0);
if (len != sizeof(ib_qp_cm_t) || ntohs(cm_ptr->dst.ver) != DSCM_VER) {
dapl_log(DAPL_DBG_TYPE_ERR,
" CONN_RTU read: ERR %s, rcnt=%d, ver=%d -> %s\n",
@@ -456,9 +597,7 @@ dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)
/* read private data into cm_handle if any present */
dapl_dbg_log(DAPL_DBG_TYPE_EP," socket connected, read private data\n");
if (cm_ptr->dst.p_size) {
- iovec[0].iov_base = cm_ptr->p_data;
- iovec[0].iov_len = cm_ptr->dst.p_size;
- len = readv(cm_ptr->socket, iovec, 1);
+ len = recv(cm_ptr->socket, cm_ptr->p_data, cm_ptr->dst.p_size, 0);
if (len != cm_ptr->dst.p_size) {
dapl_log(DAPL_DBG_TYPE_ERR,
" CONN_RTU read pdata: ERR %s, rcnt=%d -> %s\n",
@@ -495,7 +634,7 @@ dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)
dapl_dbg_log(DAPL_DBG_TYPE_EP," connect_rtu: send RTU\n");
/* complete handshake after final QP state change */
- if (write(cm_ptr->socket, &rtu_data, sizeof(rtu_data)) == -1) {
+ if (send(cm_ptr->socket, (char *) &rtu_data, sizeof(rtu_data), 0) == -1) {
dapl_log(DAPL_DBG_TYPE_ERR,
" CONN_RTU: write error = %s\n", strerror(errno));
goto bail;
@@ -564,7 +703,7 @@ dapli_socket_listen(DAPL_IA *ia_ptr,
cm_ptr->hca = ia_ptr->hca_ptr;
/* bind, listen, set sockopt, accept, exchange data */
- if ((cm_ptr->socket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+ if ((cm_ptr->socket = socket(AF_INET, SOCK_STREAM, 0)) == DAPL_INVALID_SOCKET) {
dapl_log(DAPL_DBG_TYPE_ERR,
" ERR: listen socket create: %s\n",
strerror(errno));
@@ -572,7 +711,8 @@ dapli_socket_listen(DAPL_IA *ia_ptr,
goto bail;
}
- setsockopt(cm_ptr->socket,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
+ setsockopt(cm_ptr->socket, SOL_SOCKET, SO_REUSEADDR,
+ (char *) &opt, sizeof(opt));
addr.sin_port = htons(serviceID);
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = INADDR_ANY;
@@ -625,7 +765,7 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
(void) dapl_os_memzero(acm_ptr, sizeof(*acm_ptr));
- acm_ptr->socket = -1;
+ acm_ptr->socket = DAPL_INVALID_SOCKET;
acm_ptr->sp = cm_ptr->sp;
acm_ptr->hca = cm_ptr->hca;
@@ -633,7 +773,7 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
acm_ptr->socket = accept(cm_ptr->socket,
(struct sockaddr*)&acm_ptr->dst.ia_address,
(socklen_t*)&len);
- if (acm_ptr->socket < 0) {
+ if (acm_ptr->socket == DAPL_INVALID_SOCKET) {
dapl_log(DAPL_DBG_TYPE_ERR,
" accept: ERR %s on FD %d l_cr %p\n",
strerror(errno),cm_ptr->socket,cm_ptr);
@@ -664,7 +804,7 @@ dapli_socket_accept_data(ib_cm_srvc_handle_t acm_ptr)
dapl_dbg_log(DAPL_DBG_TYPE_EP," socket accepted, read QP data\n");
/* read in DST QP info, IA address. check for private data */
- len = read(acm_ptr->socket, &acm_ptr->dst, sizeof(ib_qp_cm_t));
+ len = recv(acm_ptr->socket, (char *) &acm_ptr->dst, sizeof(ib_qp_cm_t), 0);
if (len != sizeof(ib_qp_cm_t) ||
ntohs(acm_ptr->dst.ver) != DSCM_VER) {
dapl_log(DAPL_DBG_TYPE_ERR,
@@ -700,8 +840,7 @@ dapli_socket_accept_data(ib_cm_srvc_handle_t acm_ptr)
/* read private data into cm_handle if any present */
if (acm_ptr->dst.p_size) {
- len = read( acm_ptr->socket,
- acm_ptr->p_data, acm_ptr->dst.p_size);
+ len = recv(acm_ptr->socket, acm_ptr->p_data, acm_ptr->dst.p_size, 0);
if (len != acm_ptr->dst.p_size) {
dapl_log(DAPL_DBG_TYPE_ERR,
" accept read pdata: ERR %s, rcnt=%d\n",
@@ -757,14 +896,14 @@ dapli_socket_accept_usr(DAPL_EP *ep_ptr,
DAPL_IA *ia_ptr = ep_ptr->header.owner_ia;
dp_ib_cm_handle_t cm_ptr = cr_ptr->ib_cm_handle;
ib_qp_cm_t local;
- struct iovec iovec[2];
+ struct iovec iov[2];
int len;
if (p_size > IB_MAX_REP_PDATA_SIZE)
return DAT_LENGTH_ERROR;
/* must have a accepted socket */
- if (cm_ptr->socket < 0)
+ if (cm_ptr->socket == DAPL_INVALID_SOCKET)
return DAT_INTERNAL_ERROR;
dapl_dbg_log(DAPL_DBG_TYPE_EP,
@@ -844,14 +983,17 @@ dapli_socket_accept_usr(DAPL_EP *ep_ptr,
local.ia_address = ia_ptr->hca_ptr->hca_address;
local.p_size = htonl(p_size);
- iovec[0].iov_base = &local;
- iovec[0].iov_len = sizeof(ib_qp_cm_t);
+ iov[0].iov_base = (void *) &local;
+ iov[0].iov_len = sizeof(ib_qp_cm_t);
if (p_size) {
- iovec[1].iov_base = p_data;
- iovec[1].iov_len = p_size;
+ iov[1].iov_base = p_data;
+ iov[1].iov_len = p_size;
+ len = writev(cm_ptr->socket, iov, 2);
+ } else {
+ len = writev(cm_ptr->socket, iov, 1);
}
- len = writev(cm_ptr->socket, iovec, (p_size ? 2:1));
- if (len != (p_size + sizeof(ib_qp_cm_t))) {
+
+ if (len != (p_size + sizeof(ib_qp_cm_t))) {
dapl_log(DAPL_DBG_TYPE_ERR,
" ACCEPT_USR: ERR %s, wcnt=%d -> %s\n",
strerror(errno), len,
@@ -859,6 +1001,7 @@ dapli_socket_accept_usr(DAPL_EP *ep_ptr,
&cm_ptr->dst.ia_address)->sin_addr));
goto bail;
}
+
dapl_dbg_log(DAPL_DBG_TYPE_CM,
" ACCEPT_USR: local port=0x%x lid=0x%x"
" qpn=0x%x psize=%d\n",
@@ -867,9 +1010,9 @@ dapli_socket_accept_usr(DAPL_EP *ep_ptr,
dapl_dbg_log(DAPL_DBG_TYPE_CM,
" ACCEPT_USR SRC GID subnet %016llx id %016llx\n",
(unsigned long long)
- cpu_to_be64(local.gid.global.subnet_prefix),
+ htonll(local.gid.global.subnet_prefix),
(unsigned long long)
- cpu_to_be64(local.gid.global.interface_id));
+ htonll(local.gid.global.interface_id));
/* save state and reference to EP, queue for RTU data */
cm_ptr->ep = ep_ptr;
@@ -894,7 +1037,7 @@ dapli_socket_accept_rtu(dp_ib_cm_handle_t cm_ptr)
short rtu_data = 0;
/* complete handshake after final QP state change */
- len = read(cm_ptr->socket, &rtu_data, sizeof(rtu_data));
+ len = recv(cm_ptr->socket, (char *) &rtu_data, sizeof(rtu_data), 0);
if (len != sizeof(rtu_data) || ntohs(rtu_data) != 0x0e0f) {
dapl_log(DAPL_DBG_TYPE_ERR,
" ACCEPT_RTU: ERR %s, rcnt=%d rdata=%x\n",
@@ -1108,9 +1251,9 @@ dapls_ib_remove_conn_listener (
/* close accepted socket, free cm_srvc_handle and return */
if (cm_ptr != NULL) {
- if (cm_ptr->socket >= 0) {
- close(cm_ptr->socket );
- cm_ptr->socket = -1;
+ if (cm_ptr->socket != DAPL_INVALID_SOCKET) {
+ closesocket(cm_ptr->socket);
+ cm_ptr->socket = DAPL_INVALID_SOCKET;
}
/* cr_thread will free */
cm_ptr->state = SCM_DESTROY;
@@ -1195,27 +1338,29 @@ dapls_ib_reject_connection(
IN DAT_COUNT psize,
IN const DAT_PVOID pdata)
{
- struct iovec iovec[2];
+ struct iovec iov[2];
dapl_dbg_log (DAPL_DBG_TYPE_EP,
" reject(cm %p reason %x, pdata %p, psize %d)\n",
cm_ptr, reason, pdata, psize);
/* write reject data to indicate reject */
- if (cm_ptr->socket >= 0) {
+ if (cm_ptr->socket != DAPL_INVALID_SOCKET) {
cm_ptr->dst.rej = (uint16_t)reason;
cm_ptr->dst.rej = htons(cm_ptr->dst.rej);
- iovec[0].iov_base = &cm_ptr->dst;
- iovec[0].iov_len = sizeof(ib_qp_cm_t);
+
+ iov[0].iov_base = (void *) &cm_ptr->dst;
+ iov[0].iov_len = sizeof(ib_qp_cm_t);
if (psize) {
- iovec[1].iov_base = pdata;
- iovec[2].iov_len = psize;
- writev(cm_ptr->socket, &iovec[0], 2);
- } else
- writev(cm_ptr->socket, &iovec[0], 1);
-
- close(cm_ptr->socket);
- cm_ptr->socket = -1;
+ iov[1].iov_base = pdata;
+ iov[1].iov_len = psize;
+ writev(cm_ptr->socket, iov, 2);
+ } else {
+ writev(cm_ptr->socket, iov, 1);
+ }
+
+ closesocket(cm_ptr->socket);
+ cm_ptr->socket = DAPL_INVALID_SOCKET;
}
/* cr_thread will destroy CR */
@@ -1444,138 +1589,141 @@ dapls_ib_get_cm_event (
}
/* outbound/inbound CR processing thread to avoid blocking applications */
-#define SCM_MAX_CONN 8192
void cr_thread(void *arg)
{
- struct dapl_hca *hca_ptr = arg;
- dp_ib_cm_handle_t cr, next_cr;
- int opt,ret,idx;
- socklen_t opt_len;
- char rbuf[2];
- struct pollfd ufds[SCM_MAX_CONN];
-
- dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cr_thread: ENTER hca %p\n",hca_ptr);
-
- dapl_os_lock( &hca_ptr->ib_trans.lock );
- hca_ptr->ib_trans.cr_state = IB_THREAD_RUN;
- while (hca_ptr->ib_trans.cr_state == IB_THREAD_RUN) {
- idx=0;
- ufds[idx].fd = g_scm_pipe[0]; /* wakeup and process work */
- ufds[idx].events = POLLIN;
- ufds[idx].revents = 0;
-
- if (!dapl_llist_is_empty(&hca_ptr->ib_trans.list))
- next_cr = dapl_llist_peek_head (&hca_ptr->ib_trans.list);
- else
- next_cr = NULL;
-
- while (next_cr) {
- cr = next_cr;
- if ((cr->socket == -1 && cr->state == SCM_DESTROY) ||
- hca_ptr->ib_trans.cr_state != IB_THREAD_RUN) {
-
- dapl_dbg_log(DAPL_DBG_TYPE_CM," cr_thread: Free %p\n", cr);
- next_cr = dapl_llist_next_entry(&hca_ptr->ib_trans.list,
- (DAPL_LLIST_ENTRY*)&cr->entry );
- dapl_llist_remove_entry(&hca_ptr->ib_trans.list,
- (DAPL_LLIST_ENTRY*)&cr->entry);
- dapl_os_free(cr, sizeof(*cr));
- continue;
- }
-
- if (idx==SCM_MAX_CONN-1) {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- "SCM ERR: cm_thread exceeded FD_SETSIZE %d\n",idx+1);
- continue;
- }
-
- /* Add to ufds for poll, check for immediate work */
- ufds[++idx].fd = cr->socket; /* add listen or cr */
- ufds[idx].revents = 0;
- if (cr->state == SCM_CONN_PENDING)
- ufds[idx].events = POLLOUT;
- else
- ufds[idx].events = POLLIN;
-
- /* check socket for event, accept in or connect out */
- dapl_dbg_log(DAPL_DBG_TYPE_CM," poll cr=%p, fd=%d,%d\n",
- cr, cr->socket, ufds[idx].fd);
- dapl_os_unlock(&hca_ptr->ib_trans.lock);
- ret = poll(&ufds[idx],1,0);
- dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " poll wakeup ret=%d cr->st=%d"
- " ev=0x%x fd=%d\n",
- ret,cr->state,ufds[idx].revents,ufds[idx].fd);
-
- /* data on listen, qp exchange, and on disconnect request */
- if ((ret == 1) && ufds[idx].revents == POLLIN) {
- if (cr->socket > 0) {
- if (cr->state == SCM_LISTEN)
- dapli_socket_accept(cr);
- else if (cr->state == SCM_ACCEPTING)
- dapli_socket_accept_data(cr);
- else if (cr->state == SCM_ACCEPTED)
- dapli_socket_accept_rtu(cr);
- else if (cr->state == SCM_RTU_PENDING)
- dapli_socket_connect_rtu(cr);
- else if (cr->state == SCM_CONNECTED)
- dapli_socket_disconnect(cr);
+ struct dapl_hca *hca_ptr = arg;
+ dp_ib_cm_handle_t cr, next_cr;
+ int opt, ret;
+ socklen_t opt_len = sizeof(opt); /* getsockopt() needs the buffer size */
+ char rbuf[2];
+ struct dapl_fd_set *set;
+ enum DAPL_FD_EVENTS event;
+ /* fd set is rebuilt each pass: wakeup pipe first, then one entry per CR */
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cr_thread: ENTER hca %p\n", hca_ptr);
+ set = dapl_alloc_fd_set();
+ if (!set)
+ goto out;
+
+ dapl_os_lock(&hca_ptr->ib_trans.lock);
+ hca_ptr->ib_trans.cr_state = IB_THREAD_RUN;
+
+ while (hca_ptr->ib_trans.cr_state == IB_THREAD_RUN) {
+ dapl_fd_zero(set);
+ dapl_fd_set(g_scm_pipe[0], set, DAPL_FD_READ);
+
+ if (!dapl_llist_is_empty(&hca_ptr->ib_trans.list))
+ next_cr = dapl_llist_peek_head(&hca_ptr->ib_trans.list);
+ else
+ next_cr = NULL;
+
+ while (next_cr) {
+ cr = next_cr;
+ if ((cr->socket == DAPL_INVALID_SOCKET && cr->state == SCM_DESTROY) ||
+ hca_ptr->ib_trans.cr_state != IB_THREAD_RUN) {
+ next_cr = dapl_llist_next_entry(&hca_ptr->ib_trans.list,
+ (DAPL_LLIST_ENTRY*)&cr->entry);
+ dapl_llist_remove_entry(&hca_ptr->ib_trans.list,
+ (DAPL_LLIST_ENTRY*)&cr->entry);
+ dapl_os_free(cr, sizeof(*cr));
+ continue;
+ }
+
+ event = (cr->state == SCM_CONN_PENDING) ?
+ DAPL_FD_WRITE : DAPL_FD_READ;
+ if (dapl_fd_set(cr->socket, set, event)) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " cr_thread: DESTROY CR st=%d fd %d"
+ " -> %s\n", cr->state, cr->socket,
+ inet_ntoa(((struct sockaddr_in*)
+ &cr->dst.ia_address)->sin_addr));
+ dapli_cm_destroy(cr);
+ continue;
+ }
+
+ dapl_dbg_log(DAPL_DBG_TYPE_CM, " poll cr=%p, fd=%d\n",
+ cr, cr->socket);
+ dapl_os_unlock(&hca_ptr->ib_trans.lock);
+
+ ret = dapl_poll(cr->socket, event);
+
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " poll wakeup ret=%d cr->st=%d fd=%d\n",
+ ret, cr->state, cr->socket);
+
+ /* data on listen, qp exchange, and on disconnect request */
+ if (ret == DAPL_FD_READ) {
+ if (cr->socket != DAPL_INVALID_SOCKET) {
+ switch (cr->state) {
+ case SCM_LISTEN:
+ dapli_socket_accept(cr);
+ break;
+ case SCM_ACCEPTING:
+ dapli_socket_accept_data(cr);
+ break;
+ case SCM_ACCEPTED:
+ dapli_socket_accept_rtu(cr);
+ break;
+ case SCM_RTU_PENDING:
+ dapli_socket_connect_rtu(cr);
+ break;
+ case SCM_CONNECTED:
+ dapli_socket_disconnect(cr);
+ break;
+ default:
+ break;
+ }
+ }
+ /* connect socket is writable, check status */
+ } else if (ret == DAPL_FD_WRITE || ret == DAPL_FD_ERROR) {
+ if (cr->state == SCM_CONN_PENDING) {
+ opt = 0;
+ ret = getsockopt(cr->socket, SOL_SOCKET,
+ SO_ERROR, (char *) &opt, &opt_len);
+ if (!ret)
+ dapli_socket_connected(cr, opt);
+ else
+ dapli_socket_connected(cr, errno);
+ } else {
+ dapl_log(DAPL_DBG_TYPE_CM,
+ " CM poll ERR, wrong state(%d) -> %s SKIP\n", cr->state,
+ inet_ntoa(((struct sockaddr_in*)&cr->dst.ia_address)->sin_addr));
+ }
+ } else if (ret != 0) {
+ dapl_log(DAPL_DBG_TYPE_CM,
+ " CM poll warning %s, ret=%d st=%d -> %s\n",
+ strerror(errno), ret, cr->state,
+ inet_ntoa(((struct sockaddr_in*)
+ &cr->dst.ia_address)->sin_addr));
+
+ /* POLLUP, NVAL, or poll error, issue event if connected */
+ if (cr->state == SCM_CONNECTED)
+ dapli_socket_disconnect(cr);
+ }
+
+ dapl_os_lock(&hca_ptr->ib_trans.lock);
+ next_cr = dapl_llist_next_entry(&hca_ptr->ib_trans.list,
+ (DAPL_LLIST_ENTRY*)&cr->entry);
}
- /* connect socket is writable, check status */
- } else if ((ret == 1) &&
- (ufds[idx].revents & POLLOUT ||
- ufds[idx].revents & POLLERR)) {
- if (cr->state == SCM_CONN_PENDING) {
- opt = 0;
- ret = getsockopt(cr->socket, SOL_SOCKET,
- SO_ERROR, &opt, &opt_len);
- if (!ret)
- dapli_socket_connected(cr,opt);
- else
- dapli_socket_connected(cr,errno);
- } else {
- dapl_log(DAPL_DBG_TYPE_CM,
- " CM poll ERR, wrong state(%d) -> %s SKIP\n",
- cr->state,
- inet_ntoa(((struct sockaddr_in*)
- &cr->dst.ia_address)->sin_addr));
+
+ dapl_os_unlock(&hca_ptr->ib_trans.lock);
+ /* no fd count here: the Windows struct dapl_fd_set has no index */
+ dapl_dbg_log(DAPL_DBG_TYPE_CM," cr_thread: sleep\n");
+ dapl_select(set);
+ dapl_dbg_log(DAPL_DBG_TYPE_CM," cr_thread: wakeup\n");
+
+ /* if pipe used to wakeup, consume */
+ if (dapl_poll(g_scm_pipe[0], DAPL_FD_READ) == DAPL_FD_READ) {
+ if (read(g_scm_pipe[0], rbuf, 2) == -1)
+ dapl_log(DAPL_DBG_TYPE_CM,
+ " cr_thread: read pipe error = %s\n",
+ strerror(errno));
}
- } else if (ret != 0) {
- dapl_log(DAPL_DBG_TYPE_CM,
- " CM poll warning %s, ret=%d revnt=%x st=%d -> %s\n",
- strerror(errno), ret, ufds[idx].revents, cr->state,
- inet_ntoa(((struct sockaddr_in*)
- &cr->dst.ia_address)->sin_addr));
-
- /* POLLUP, NVAL, or poll error, issue event if connected */
- if (cr->state == SCM_CONNECTED)
- dapli_socket_disconnect(cr);
- }
- dapl_os_lock(&hca_ptr->ib_trans.lock);
- next_cr = dapl_llist_next_entry(&hca_ptr->ib_trans.list,
- (DAPL_LLIST_ENTRY*)&cr->entry);
+ dapl_os_lock(&hca_ptr->ib_trans.lock);
}
+
dapl_os_unlock(&hca_ptr->ib_trans.lock);
- dapl_dbg_log(DAPL_DBG_TYPE_CM," cr_thread: sleep, %d\n", idx+1);
- poll(ufds,idx+1,-1); /* infinite, all sockets and pipe */
- /* if pipe used to wakeup, consume */
- if (ufds[0].revents == POLLIN)
- if (read(g_scm_pipe[0], rbuf, 2) == -1)
- dapl_log(DAPL_DBG_TYPE_CM,
- " cr_thread: read pipe error = %s\n",
- strerror(errno));
- dapl_dbg_log(DAPL_DBG_TYPE_CM," cr_thread: wakeup\n");
- dapl_os_lock(&hca_ptr->ib_trans.lock);
- }
- dapl_os_unlock(&hca_ptr->ib_trans.lock);
- hca_ptr->ib_trans.cr_state = IB_THREAD_EXIT;
- dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cr_thread(hca %p) exit\n",hca_ptr);
+ dapl_os_free(set, sizeof(*set)); /* allocated by dapl_alloc_fd_set() */
+out:
+ hca_ptr->ib_trans.cr_state = IB_THREAD_EXIT;
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cr_thread(hca %p) exit\n",hca_ptr);
}
-
-/*
- * Local variables:
- * c-indent-level: 4
- * c-basic-offset: 4
- * tab-width: 8
- * End:
- */
diff --git a/dapl/openib_scm/dapl_ib_cq.c b/dapl/openib_scm/dapl_ib_cq.c
index 7d6bd4f..59fff11 100644
--- a/dapl/openib_scm/dapl_ib_cq.c
+++ b/dapl/openib_scm/dapl_ib_cq.c
@@ -46,97 +46,111 @@
*
**************************************************************************/
+#include "openib_osd.h"
#include "dapl.h"
#include "dapl_adapter_util.h"
#include "dapl_lmr_util.h"
#include "dapl_evd_util.h"
#include "dapl_ring_buffer_util.h"
-#include <sys/poll.h>
-#include <signal.h>
-int dapli_cq_thread_init(struct dapl_hca *hca_ptr)
+#if defined(_WIN64) || defined(_WIN32)
+void dapli_cq_thread_destroy(struct dapl_hca *hca_ptr)
{
- DAT_RETURN dat_status;
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_destroy(%p)\n", hca_ptr);
- dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_init(%p)\n", hca_ptr);
+ if (hca_ptr->ib_trans.cq_state != IB_THREAD_RUN) /* thread not running */
+ return;
- /* create thread to process inbound connect request */
- hca_ptr->ib_trans.cq_state = IB_THREAD_INIT;
- dat_status = dapl_os_thread_create(cq_thread, (void*)hca_ptr, &hca_ptr->ib_trans.cq_thread);
- if (dat_status != DAT_SUCCESS)
- {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " cq_thread_init: failed to create thread\n");
- return 1;
- }
+ /* ask cq_thread to exit; SetEvent on the completion channel is meant to wake it */
+ hca_ptr->ib_trans.cq_state = IB_THREAD_CANCEL;
+ SetEvent(hca_ptr->ib_trans.ib_cq->event);
+ dapl_dbg_log(DAPL_DBG_TYPE_CM," cq_thread_destroy(%p) cancel\n",hca_ptr);
+ while (hca_ptr->ib_trans.cq_state != IB_THREAD_EXIT) { /* cq_thread sets EXIT on its way out */
+ dapl_os_sleep_usec(20000);
+ }
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_destroy(%d) exit\n",dapl_os_getpid());
+}
+
+static void cq_thread(void *arg)
+{
+ struct dapl_hca *hca_ptr = arg;
+ struct dapl_evd *evd_ptr;
+ struct ibv_cq *ibv_cq = NULL;
+ /* dapli_cq_thread_init() waits for this RUN state */
+ hca_ptr->ib_trans.cq_state = IB_THREAD_RUN;
+
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread: ENTER hca %p\n",hca_ptr);
- /* wait for thread to start */
- while (hca_ptr->ib_trans.cq_state != IB_THREAD_RUN) {
- struct timespec sleep, remain;
- sleep.tv_sec = 0;
- sleep.tv_nsec = 20000000; /* 20 ms */
- dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
- " cq_thread_init: waiting for cq_thread\n");
- nanosleep (&sleep, &remain);
- }
- dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_init(%d) exit\n",getpid());
- return 0;
+ /* wait on DTO events until cq_state leaves IB_THREAD_RUN */
+ while (hca_ptr->ib_trans.cq_state == IB_THREAD_RUN) {
+ if (!ibv_get_cq_event(hca_ptr->ib_trans.ib_cq, &ibv_cq, (void*)&evd_ptr)) {
+
+ if (DAPL_BAD_HANDLE(evd_ptr, DAPL_MAGIC_EVD)) {
+ ibv_ack_cq_events(ibv_cq, 1);
+ break; /* not return: IB_THREAD_EXIT must still be published */
+ }
+
+ /* process DTO event via callback */
+ dapl_evd_dto_callback(hca_ptr->ib_hca_handle, evd_ptr->ib_cq_handle,
+ (void*)evd_ptr );
+
+ ibv_ack_cq_events(ibv_cq, 1);
+ }
+ }
+ hca_ptr->ib_trans.cq_state = IB_THREAD_EXIT;
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread: EXIT: hca %p \n", hca_ptr);
}
+#else // _WIN32 || _WIN64
+
void dapli_cq_thread_destroy(struct dapl_hca *hca_ptr)
{
- dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_destroy(%p)\n", hca_ptr);
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_destroy(%p)\n", hca_ptr);
if (hca_ptr->ib_trans.cq_state != IB_THREAD_RUN)
return;
- /* destroy cr_thread and lock */
- hca_ptr->ib_trans.cq_state = IB_THREAD_CANCEL;
- pthread_kill(hca_ptr->ib_trans.cq_thread, SIGUSR1);
- dapl_dbg_log(DAPL_DBG_TYPE_CM," cq_thread_destroy(%p) cancel\n",hca_ptr);
- while (hca_ptr->ib_trans.cq_state != IB_THREAD_EXIT) {
- struct timespec sleep, remain;
- sleep.tv_sec = 0;
- sleep.tv_nsec = 2000000; /* 2 ms */
- dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
- " cq_thread_destroy: waiting for cq_thread\n");
- nanosleep (&sleep, &remain);
- }
- dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_destroy(%d) exit\n",getpid());
+ /* stop cq_thread: set CANCEL, then SIGUSR1 to interrupt its blocking poll() */
+ hca_ptr->ib_trans.cq_state = IB_THREAD_CANCEL;
+ pthread_kill(hca_ptr->ib_trans.cq_thread, SIGUSR1);
+ dapl_dbg_log(DAPL_DBG_TYPE_CM," cq_thread_destroy(%p) cancel\n",hca_ptr);
+ while (hca_ptr->ib_trans.cq_state != IB_THREAD_EXIT) { /* cq_thread sets EXIT on its way out */
+ dapl_os_sleep_usec(20000);
+ }
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_destroy(%d) exit\n",dapl_os_getpid());
}
/* catch the signal */
static void ib_cq_handler(int signum)
{
- return;
+ return; /* no-op: installed for SIGUSR1 so the signal can interrupt poll() in cq_thread */
}
-void cq_thread( void *arg )
+static void cq_thread(void *arg)
{
- struct dapl_hca *hca_ptr = arg;
- struct dapl_evd *evd_ptr;
- struct ibv_cq *ibv_cq = NULL;
+ struct dapl_hca *hca_ptr = arg;
+ struct dapl_evd *evd_ptr;
+ struct ibv_cq *ibv_cq = NULL;
sigset_t sigset;
sigemptyset(&sigset);
- sigaddset(&sigset,SIGUSR1);
- pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
- signal(SIGUSR1, ib_cq_handler);
+ sigaddset(&sigset,SIGUSR1);
+ pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
+ signal(SIGUSR1, ib_cq_handler);
hca_ptr->ib_trans.cq_state = IB_THREAD_RUN;
-
+
dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread: ENTER hca %p\n",hca_ptr);
- /* wait on DTO event, or signal to abort */
- while (hca_ptr->ib_trans.cq_state == IB_THREAD_RUN) {
- struct pollfd cq_fd = {
- .fd = hca_ptr->ib_trans.ib_cq->fd,
- .events = POLLIN,
- .revents = 0
- };
+ /* wait on DTO event, or signal to abort */
+ while (hca_ptr->ib_trans.cq_state == IB_THREAD_RUN) {
+ struct pollfd cq_fd = {
+ .fd = hca_ptr->ib_trans.ib_cq->fd,
+ .events = POLLIN,
+ .revents = 0
+ };
if ((poll(&cq_fd, 1, -1) == 1) &&
- (!ibv_get_cq_event(hca_ptr->ib_trans.ib_cq,
- &ibv_cq, (void*)&evd_ptr))) {
+ (!ibv_get_cq_event(hca_ptr->ib_trans.ib_cq, &ibv_cq, (void*)&evd_ptr))) {
if (DAPL_BAD_HANDLE(evd_ptr, DAPL_MAGIC_EVD)) {
ibv_ack_cq_events(ibv_cq, 1);
@@ -144,15 +158,40 @@ void cq_thread( void *arg )
}
/* process DTO event via callback */
- dapl_evd_dto_callback ( hca_ptr->ib_hca_handle,
- evd_ptr->ib_cq_handle,
- (void*)evd_ptr );
+ dapl_evd_dto_callback(hca_ptr->ib_hca_handle,
+ evd_ptr->ib_cq_handle, (void*)evd_ptr );
ibv_ack_cq_events(ibv_cq, 1);
}
- }
- hca_ptr->ib_trans.cq_state = IB_THREAD_EXIT;
- dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread: EXIT: hca %p \n", hca_ptr);
+ }
+ hca_ptr->ib_trans.cq_state = IB_THREAD_EXIT;
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread: EXIT: hca %p \n", hca_ptr);
+}
+
+#endif // _WIN32 || _WIN64
+
+/* Start cq_thread and wait until it sets IB_THREAD_RUN; returns 0 on success, 1 if the thread cannot be created. */
+int dapli_cq_thread_init(struct dapl_hca *hca_ptr)
+{
+ DAT_RETURN dat_status;
+
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_init(%p)\n", hca_ptr);
+
+ /* create thread to process CQ completion (DTO) events */
+ hca_ptr->ib_trans.cq_state = IB_THREAD_INIT;
+ dat_status = dapl_os_thread_create(cq_thread, (void*)hca_ptr, &hca_ptr->ib_trans.cq_thread);
+ if (dat_status != DAT_SUCCESS) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " cq_thread_init: failed to create thread\n");
+ return 1;
+ }
+
+ /* wait for thread to start: it sets IB_THREAD_RUN once running */
+ while (hca_ptr->ib_trans.cq_state != IB_THREAD_RUN) {
+ dapl_os_sleep_usec(20000);
+ }
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_init(%d) exit\n",dapl_os_getpid());
+ return 0;
}
@@ -308,11 +347,11 @@ dapls_ib_cq_alloc (
IN DAPL_EVD *evd_ptr,
IN DAT_COUNT *cqlen )
{
+ struct ibv_comp_channel *channel = ia_ptr->hca_ptr->ib_trans.ib_cq;
+
dapl_dbg_log ( DAPL_DBG_TYPE_UTIL,
"dapls_ib_cq_alloc: evd %p cqlen=%d \n", evd_ptr, *cqlen );
- struct ibv_comp_channel *channel = ia_ptr->hca_ptr->ib_trans.ib_cq;
-
#ifdef CQ_WAIT_OBJECT
if (evd_ptr->cq_wait_obj_handle)
channel = evd_ptr->cq_wait_obj_handle;
diff --git a/dapl/openib_scm/dapl_ib_dto.h b/dapl/openib_scm/dapl_ib_dto.h
index 45000b9..fa19d01 100644
--- a/dapl/openib_scm/dapl_ib_dto.h
+++ b/dapl/openib_scm/dapl_ib_dto.h
@@ -147,12 +147,6 @@ dapls_ib_post_send (
IN const DAT_RMR_TRIPLET *remote_iov,
IN DAT_COMPLETION_FLAGS completion_flags)
{
- dapl_dbg_log(DAPL_DBG_TYPE_EP,
- " post_snd: ep %p op %d ck %p sgs",
- "%d l_iov %p r_iov %p f %d\n",
- ep_ptr, op_type, cookie, segments, local_iov,
- remote_iov, completion_flags);
-
ib_data_segment_t ds_array[DEFAULT_DS_ENTRIES];
ib_data_segment_t *ds_array_p, *ds_array_start_p = NULL;
struct ibv_send_wr wr;
@@ -163,6 +157,12 @@ dapls_ib_post_send (
int ret;
dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ " post_snd: ep %p op %d ck %p sgs" /* no comma: one concatenated format string */
+ " %d l_iov %p r_iov %p f %d\n",
+ ep_ptr, op_type, cookie, segments, local_iov,
+ remote_iov, completion_flags);
+
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
" post_snd: ep %p cookie %p segs %d l_iov %p\n",
ep_ptr, cookie, segments, local_iov);
@@ -317,12 +317,6 @@ dapls_ib_post_ext_send (
IN DAT_COMPLETION_FLAGS completion_flags,
IN DAT_IB_ADDR_HANDLE *remote_ah)
{
- dapl_dbg_log(DAPL_DBG_TYPE_EP,
- " post_ext_snd: ep %p op %d ck %p sgs",
- "%d l_iov %p r_iov %p f %d\n",
- ep_ptr, op_type, cookie, segments, local_iov,
- remote_iov, completion_flags, remote_ah);
-
ib_data_segment_t ds_array[DEFAULT_DS_ENTRIES];
ib_data_segment_t *ds_array_p, *ds_array_start_p = NULL;
struct ibv_send_wr wr;
@@ -331,6 +325,12 @@ dapls_ib_post_ext_send (
int ret;
dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ " post_ext_snd: ep %p op %d ck %p sgs" /* no comma: one concatenated format string */
+ " %d l_iov %p r_iov %p f %d ah %p\n",
+ ep_ptr, op_type, cookie, segments, local_iov,
+ remote_iov, completion_flags, remote_ah);
+
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
" post_snd: ep %p cookie %p segs %d l_iov %p\n",
ep_ptr, cookie, segments, local_iov);
diff --git a/dapl/openib_scm/dapl_ib_mem.c b/dapl/openib_scm/dapl_ib_mem.c
index 54340ed..9a97e5e 100644
--- a/dapl/openib_scm/dapl_ib_mem.c
+++ b/dapl/openib_scm/dapl_ib_mem.c
@@ -1,4 +1,4 @@
-/*
+/*
* Copyright (c) 2005-2007 Intel Corporation. All rights reserved.
*
* This Software is licensed under one of the following licenses:
@@ -35,13 +35,6 @@
*
**********************************************************************/
-#include <sys/ioctl.h> /* for IOCTL's */
-#include <sys/types.h> /* for socket(2) and related bits and pieces */
-#include <sys/socket.h> /* for socket(2) */
-#include <net/if.h> /* for struct ifreq */
-#include <net/if_arp.h> /* for ARPHRD_ETHER */
-#include <unistd.h> /* for _SC_CLK_TCK */
-
#include "dapl.h"
#include "dapl_adapter_util.h"
#include "dapl_lmr_util.h"
@@ -215,10 +208,9 @@ dapls_ib_mr_register(IN DAPL_IA *ia_ptr,
lmr->param.registered_address = (DAT_VADDR)(uintptr_t)virt_addr;
dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
- " mr_register: mr=%p addr=%p h %x pd %p ctx %p "
- "lkey=0x%x rkey=0x%x priv=%x\n",
+ " mr_register: mr=%p addr=%p pd %p ctx %p "
+ "lkey=0x%x rkey=0x%x len=0x%llx priv=%x\n",
lmr->mr_handle, lmr->mr_handle->addr,
- lmr->mr_handle->handle,
lmr->mr_handle->pd, lmr->mr_handle->context,
lmr->mr_handle->lkey, lmr->mr_handle->rkey,
- length, dapls_convert_privileges(privileges));
+ (unsigned long long)length, dapls_convert_privileges(privileges)); /* one specifier per argument; cast matches %llx */
diff --git a/dapl/openib_scm/dapl_ib_util.c b/dapl/openib_scm/dapl_ib_util.c
index 92b45d5..d82d3f5 100644
--- a/dapl/openib_scm/dapl_ib_util.c
+++ b/dapl/openib_scm/dapl_ib_util.c
@@ -49,17 +49,13 @@
static const char rcsid[] = "$Id: $";
#endif
+#include "openib_osd.h"
#include "dapl.h"
#include "dapl_adapter_util.h"
#include "dapl_ib_util.h"
+#include "dapl_osd.h"
#include <stdlib.h>
-#include <netinet/tcp.h>
-#include <sys/utsname.h>
-#include <sys/socket.h>
-#include <arpa/inet.h>
-#include <unistd.h>
-#include <fcntl.h>
int g_dapl_loopback_connection = 0;
int g_scm_pipe[2];
@@ -88,52 +84,59 @@ char *dapl_ib_mtu_str(enum ibv_mtu mtu)
}
}
-/* just get IP address for hostname */
-DAT_RETURN getipaddr( char *addr, int addr_len)
+/*
+ * getlocalipaddr - store the first non-loopback IPv4 address that the
+ * local host name resolves to in *addr.
+ * Returns DAT_SUCCESS, DAT_INVALID_ADDRESS if only loopback addresses
+ * resolve, or DAT_INTERNAL_ERROR for a short buffer or a failed lookup.
+ */
+static DAT_RETURN getlocalipaddr(DAT_SOCK_ADDR *addr, int addr_len)
{
- struct sockaddr_in *ipv4_addr = (struct sockaddr_in*)addr;
- struct hostent *h_ptr;
- struct utsname ourname;
+ struct sockaddr_in *sin;
+ struct addrinfo *res, hint, *ai;
+ DAT_RETURN dat_status;
+ int ret;
+ char hostname[256];
- if (uname(&ourname) < 0) {
- dapl_log(DAPL_DBG_TYPE_ERR,
- " open_hca: uname err=%s\n", strerror(errno));
+ /* compare as int so a negative addr_len is not promoted to a huge size_t */
+ if (addr_len < (int) sizeof(*sin)) {
return DAT_INTERNAL_ERROR;
}
- h_ptr = gethostbyname(ourname.nodename);
- if (h_ptr == NULL) {
- dapl_log(DAPL_DBG_TYPE_ERR,
- " open_hca: gethostbyname err=%s\n",
- strerror(errno));
- return DAT_INTERNAL_ERROR;
+ if (gethostname(hostname, sizeof hostname)) {
+ dapl_log(DAPL_DBG_TYPE_ERR, " getlocalipaddr: gethostname failed\n");
+ return DAT_INTERNAL_ERROR;
+ }
+ /* POSIX: a truncated host name may be left unterminated */
+ hostname[sizeof hostname - 1] = '\0';
+
+ memset(&hint, 0, sizeof hint);
+ hint.ai_flags = AI_PASSIVE;
+ hint.ai_family = AF_INET;
+ hint.ai_socktype = SOCK_STREAM;
+ hint.ai_protocol = IPPROTO_TCP;
+
+ ret = getaddrinfo(hostname, NULL, &hint, &res);
+ if (ret) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " getlocalipaddr: getaddrinfo(%s) err=%s\n",
+ hostname, gai_strerror(ret));
+ return DAT_INTERNAL_ERROR;
+ }
+
+ dat_status = DAT_INVALID_ADDRESS;
+ for (ai = res; ai; ai = ai->ai_next) {
+ sin = (struct sockaddr_in *) ai->ai_addr;
+ /* skip all of 127.0.0.0/8; some distros map the hostname to 127.0.1.1 */
+ if ((ntohl(sin->sin_addr.s_addr) >> 24) != 127) {
+ *((struct sockaddr_in *) addr) = *sin;
+ dat_status = DAT_SUCCESS;
+ break;
+ }
}
- if (h_ptr->h_addrtype == AF_INET) {
- int i;
- struct in_addr **alist =
- (struct in_addr **)h_ptr->h_addr_list;
-
- *(uint32_t*)&ipv4_addr->sin_addr = 0;
- ipv4_addr->sin_family = AF_INET;
-
- /* Walk the list of addresses for host */
- for (i=0; alist[i] != NULL; i++) {
- /* first non-loopback address */
- if (*(uint32_t*)alist[i] != htonl(0x7f000001)) {
- dapl_os_memcpy(&ipv4_addr->sin_addr,
- h_ptr->h_addr_list[i],
- 4);
- break;
- }
- }
- /* if no acceptable address found */
- if (*(uint32_t*)&ipv4_addr->sin_addr == 0)
- return DAT_INVALID_ADDRESS;
- } else
- return DAT_INVALID_ADDRESS;
-
- return DAT_SUCCESS;
+ freeaddrinfo(res);
+ return dat_status;
}
/*
@@ -165,6 +152,35 @@ int32_t dapls_ib_release (void)
return 0;
}
+/*
+ * dapls_config_comp_channel - prepare a completion channel for the provider.
+ * On Linux the channel fd is switched to O_NONBLOCK; Windows needs no setup.
+ * Returns 0 on success, otherwise the errno value from the failed fcntl().
+ */
+#if defined(_WIN64) || defined(_WIN32)
+int dapls_config_comp_channel(struct ibv_comp_channel *channel)
+{
+ return 0;
+}
+#else // !(_WIN64 || _WIN32)
+int dapls_config_comp_channel(struct ibv_comp_channel *channel)
+{
+ int opts;
+ int err;
+
+ opts = fcntl(channel->fd, F_GETFL); /* uCQ */
+ if (opts < 0 || fcntl(channel->fd, F_SETFL, opts | O_NONBLOCK) < 0) {
+ err = errno; /* save it before dapl_log()/strerror() can overwrite it */
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " dapls_config_comp_channel: fcntl on ib_cq->fd %d ERR %d %s\n",
+ channel->fd, opts, strerror(err));
+ return err;
+ }
+
+ return 0;
+}
+#endif
+
/*
* dapls_ib_open_hca
*
@@ -187,7 +196,6 @@ DAT_RETURN dapls_ib_open_hca (
IN DAPL_HCA *hca_ptr)
{
struct ibv_device **dev_list;
- int opts;
int i;
DAT_RETURN dat_status = DAT_SUCCESS;
@@ -219,7 +227,7 @@ found:
dapl_dbg_log(DAPL_DBG_TYPE_UTIL," open_hca: Found dev %s %016llx\n",
ibv_get_device_name(hca_ptr->ib_trans.ib_dev),
(unsigned long long)
- bswap_64(ibv_get_device_guid(hca_ptr->ib_trans.ib_dev)));
+ ntohll(ibv_get_device_guid(hca_ptr->ib_trans.ib_dev)));
hca_ptr->ib_hca_handle = ibv_open_device(hca_ptr->ib_trans.ib_dev);
if (!hca_ptr->ib_hca_handle) {
@@ -268,13 +276,7 @@ found:
goto bail;
}
- opts = fcntl(hca_ptr->ib_trans.ib_cq->fd, F_GETFL); /* uCQ */
- if (opts < 0 || fcntl(hca_ptr->ib_trans.ib_cq->fd,
- F_SETFL, opts | O_NONBLOCK) < 0) {
- dapl_log(DAPL_DBG_TYPE_ERR,
- " open_hca: fcntl on ib_cq->fd %d ERR %d %s\n",
- hca_ptr->ib_trans.ib_cq->fd, opts,
- strerror(errno));
+ if (dapls_config_comp_channel(hca_ptr->ib_trans.ib_cq)) {
goto bail;
}
@@ -309,16 +311,11 @@ found:
/* wait for thread */
while (hca_ptr->ib_trans.cr_state != IB_THREAD_RUN) {
- struct timespec sleep, remain;
- sleep.tv_sec = 0;
- sleep.tv_nsec = 2000000; /* 2 ms */
- dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
- " open_hca: waiting for cr_thread\n");
- nanosleep (&sleep, &remain);
+ dapl_os_sleep_usec(20000);
}
/* get the IP address of the device */
- dat_status = getipaddr((char*)&hca_ptr->hca_address,
+ dat_status = getlocalipaddr((DAT_SOCK_ADDR*) &hca_ptr->hca_address,
sizeof(DAT_SOCK_ADDR6));
dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
@@ -376,16 +373,13 @@ DAT_RETURN dapls_ib_close_hca ( IN DAPL_HCA *hca_ptr )
" thread_destroy: thread wakeup err = %s\n",
strerror(errno));
while (hca_ptr->ib_trans.cr_state != IB_THREAD_EXIT) {
- struct timespec sleep, remain;
- sleep.tv_sec = 0;
- sleep.tv_nsec = 2000000; /* 2 ms */
dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
" close_hca: waiting for cr_thread\n");
if (write(g_scm_pipe[1], "w", sizeof "w") == -1)
dapl_log(DAPL_DBG_TYPE_UTIL,
" thread_destroy: thread wakeup err = %s\n",
strerror(errno));
- nanosleep (&sleep, &remain);
+ dapl_os_sleep_usec(20000);
}
dapl_os_lock_destroy(&hca_ptr->ib_trans.lock);
diff --git a/dapl/openib_scm/dapl_ib_util.h b/dapl/openib_scm/dapl_ib_util.h
index 863da2b..fd1c24e 100644
--- a/dapl/openib_scm/dapl_ib_util.h
+++ b/dapl/openib_scm/dapl_ib_util.h
@@ -49,8 +49,8 @@
#ifndef _DAPL_IB_UTIL_H_
#define _DAPL_IB_UTIL_H_
+#include "openib_osd.h"
#include <infiniband/verbs.h>
-#include <byteswap.h>
#ifdef DAT_EXTENSIONS
#include <dat2/dat_ib_extensions.h>
@@ -73,8 +73,6 @@ typedef struct ibv_wc ib_work_completion_t;
typedef struct ibv_context *ib_hca_handle_t;
typedef ib_hca_handle_t dapl_ibal_ca_t;
-/* CM mappings, user CM not complete use SOCKETS */
-
/* destination info to exchange, define wire protocol version */
#define DSCM_VER 3
typedef struct _ib_qp_cm
@@ -86,7 +84,7 @@ typedef struct _ib_qp_cm
uint32_t qpn;
uint32_t p_size;
DAT_SOCK_ADDR6 ia_address;
- union ibv_gid gid;
+ union ibv_gid gid;
uint16_t qp_type;
} ib_qp_cm_t;
@@ -110,20 +108,18 @@ struct ib_cm_handle
struct dapl_llist_entry entry;
DAPL_OS_LOCK lock;
SCM_STATE state;
- int socket;
+ DAPL_SOCKET socket;
struct dapl_hca *hca;
struct dapl_sp *sp;
- struct dapl_ep *ep;
+ struct dapl_ep *ep;
ib_qp_cm_t dst;
- unsigned char p_data[256];
+ unsigned char p_data[256]; /* must follow ib_qp_cm_t */
struct ibv_ah *ah;
};
typedef struct ib_cm_handle *dp_ib_cm_handle_t;
typedef dp_ib_cm_handle_t ib_cm_srvc_handle_t;
-DAT_RETURN getipaddr(char *addr, int addr_len);
-
/* CM events */
typedef enum
{
@@ -141,9 +137,6 @@ typedef enum
} ib_cm_events_t;
-/* prototype for cm thread */
-void cr_thread (void *arg);
-
/* Operation and state mappings */
typedef enum ibv_send_flags ib_send_op_type_t;
typedef struct ibv_sge ib_data_segment_t;
@@ -289,7 +282,7 @@ typedef struct _ib_hca_transport
DAPL_OS_LOCK cq_lock;
int max_inline_send;
ib_thread_state_t cq_state;
- DAPL_OS_THREAD cq_thread;
+ DAPL_OS_THREAD cq_thread;
struct ibv_comp_channel *ib_cq;
int cr_state;
DAPL_OS_THREAD thread;
@@ -317,7 +310,6 @@ typedef uint32_t ib_shm_transport_t;
/* prototypes */
int32_t dapls_ib_init (void);
int32_t dapls_ib_release (void);
-void cq_thread (void *arg);
void cr_thread(void *arg);
int dapli_cq_thread_init(struct dapl_hca *hca_ptr);
void dapli_cq_thread_destroy(struct dapl_hca *hca_ptr);
@@ -349,7 +341,7 @@ dapl_convert_errno( IN int err, IN const char *str )
if (!err) return DAT_SUCCESS;
#if DAPL_DBG
- if ((err != EAGAIN) && (err != ETIME) && (err != ETIMEDOUT))
+ if ((err != EAGAIN) && (err != ETIMEDOUT))
dapl_dbg_log (DAPL_DBG_TYPE_ERR," %s %s\n", str, strerror(err));
#endif
@@ -357,24 +349,15 @@ dapl_convert_errno( IN int err, IN const char *str )
{
case EOVERFLOW : return DAT_LENGTH_ERROR;
case EACCES : return DAT_PRIVILEGES_VIOLATION;
- case ENXIO :
- case ERANGE :
case EPERM : return DAT_PROTECTION_VIOLATION;
- case EINVAL :
- case EBADF :
- case ENOENT :
- case ENOTSOCK : return DAT_INVALID_HANDLE;
+ case EINVAL : return DAT_INVALID_HANDLE;
case EISCONN : return DAT_INVALID_STATE | DAT_INVALID_STATE_EP_CONNECTED;
case ECONNREFUSED : return DAT_INVALID_STATE | DAT_INVALID_STATE_EP_NOTREADY;
- case ETIME :
case ETIMEDOUT : return DAT_TIMEOUT_EXPIRED;
case ENETUNREACH: return DAT_INVALID_ADDRESS | DAT_INVALID_ADDRESS_UNREACHABLE;
case EADDRINUSE : return DAT_CONN_QUAL_IN_USE;
case EALREADY : return DAT_INVALID_STATE | DAT_INVALID_STATE_EP_ACTCONNPENDING;
- case ENOSPC :
- case ENOMEM :
- case E2BIG :
- case EDQUOT : return DAT_INSUFFICIENT_RESOURCES;
+ case ENOMEM : return DAT_INSUFFICIENT_RESOURCES;
case EAGAIN : return DAT_QUEUE_EMPTY;
case EINTR : return DAT_INTERRUPTED_CALL;
case EAFNOSUPPORT : return DAT_INVALID_ADDRESS | DAT_INVALID_ADDRESS_MALFORMED;
diff --git a/dapl/openib_scm/linux/openib_osd.h b/dapl/openib_scm/linux/openib_osd.h
new file mode 100644
index 0000000..235a82e
--- /dev/null
+++ b/dapl/openib_scm/linux/openib_osd.h
@@ -0,0 +1,26 @@
+#ifndef OPENIB_OSD_H
+#define OPENIB_OSD_H
+
+/* Linux OS-dependent definitions for the openib_scm provider. */
+
+#include <byteswap.h> /* bswap_64() used by htonll/ntohll below */
+#include <endian.h>
+#include <netinet/in.h>
+
+#if __BYTE_ORDER == __BIG_ENDIAN
+#define htonll(x) (x)
+#define ntohll(x) (x)
+#elif __BYTE_ORDER == __LITTLE_ENDIAN
+#define htonll(x) bswap_64(x)
+#define ntohll(x) bswap_64(x)
+#else
+#error "openib_osd.h: unknown __BYTE_ORDER"
+#endif
+
+#define DAPL_SOCKET int
+#define DAPL_INVALID_SOCKET (-1)
+#define DAPL_FD_SETSIZE 8192
+
+#define closesocket close /* map the Winsock name onto close(2) */
+
+#endif // OPENIB_OSD_H
diff --git a/dapl/openib_scm/windows/openib_osd.h b/dapl/openib_scm/windows/openib_osd.h
new file mode 100644
index 0000000..67c70ec
--- /dev/null
+++ b/dapl/openib_scm/windows/openib_osd.h
@@ -0,0 +1,49 @@
+#ifndef OPENIB_OSD_H
+#define OPENIB_OSD_H
+
+/* Windows (WinOF) OS-dependent definitions for the openib_scm provider. */
+
+#ifndef FD_SETSIZE
+#define FD_SETSIZE 1024 /* Set before including winsock2 - see select help */
+#endif
+/* defined unconditionally so it exists even when FD_SETSIZE was preset */
+#define DAPL_FD_SETSIZE FD_SETSIZE
+
+#include <winsock2.h>
+#include <ws2tcpip.h>
+#include <io.h>
+#include <fcntl.h>
+#include <stdlib.h> /* _byteswap_uint64() */
+
+/* Windows runs little-endian, so host <-> network order is always a swap */
+#define ntohll _byteswap_uint64
+#define htonll _byteswap_uint64
+
+/* binary mode: text mode translates CR/LF and treats Ctrl-Z as EOF.
+ * NOTE(review): Winsock select() accepts only sockets; confirm this pipe
+ * is never waited on with select(). */
+#define pipe(x) _pipe(x, 4096, _O_BINARY)
+#define read _read
+#define write _write
+#define DAPL_SOCKET SOCKET
+#define DAPL_INVALID_SOCKET INVALID_SOCKET
+
+/* allow casting to WSABUF: same member order and types (ULONG len, CHAR *buf) */
+struct iovec
+{
+ u_long iov_len;
+ char FAR* iov_base;
+};
+
+/* writev(2) on top of WSASend(): returns bytes sent, or SOCKET_ERROR (-1) */
+static __inline int writev(DAPL_SOCKET s, struct iovec *vector, int count)
+{
+ DWORD len;
+ int ret;
+
+ ret = WSASend(s, (WSABUF *) vector, (DWORD) count, &len, 0, NULL, NULL);
+ return ret ? ret : (int) len;
+}
+
+#endif // OPENIB_OSD_H
+
diff --git a/dapl/udapl/linux/dapl_osd.h b/dapl/udapl/linux/dapl_osd.h
index 6fef9af..ae02944 100644
--- a/dapl/udapl/linux/dapl_osd.h
+++ b/dapl/udapl/linux/dapl_osd.h
@@ -302,6 +302,17 @@ dapl_os_thread_create (
IN void *data,
OUT DAPL_OS_THREAD *thread_id );
+/* Sleep for usec microseconds (delays of one second or more are supported). */
+STATIC _INLINE_ void
+dapl_os_sleep_usec(int usec)
+{
+ struct timespec sleep, remain;
+
+ /* nanosleep() rejects tv_nsec >= 1000000000 with EINVAL, so split the delay */
+ sleep.tv_sec = usec / 1000000;
+ sleep.tv_nsec = (usec % 1000000) * 1000;
+ nanosleep(&sleep, &remain);
+}
/*
* Lock Functions
More information about the general
mailing list