[ofa-general] [PATCH 2/5] [uDAPL v2] dapl scm: change connect and accept to non-blocking to avoid blocking user thread.
Arlin Davis
arlin.r.davis at intel.com
Thu Aug 14 17:06:43 PDT 2008
The connect socket that is used to exchange QP information is now non-blocking,
and the data exchange is done via the CR thread. A new state, SCM_RTU_PENDING, is added.
On the passive side there is a new state, SCM_ACCEPTING_DATA, used to avoid a blocking read
on the user accept call.
Signed-off-by: Arlin Davis ardavis at ichips.intel.com
---
dapl/openib_scm/dapl_ib_cm.c | 415 +++++++++++++++++++++++-----------
dapl/openib_scm/dapl_ib_extensions.c | 2 +-
dapl/openib_scm/dapl_ib_util.c | 2 -
dapl/openib_scm/dapl_ib_util.h | 2 +
4 files changed, 288 insertions(+), 133 deletions(-)
diff --git a/dapl/openib_scm/dapl_ib_cm.c b/dapl/openib_scm/dapl_ib_cm.c
index e712f9d..5ba5ddc 100644
--- a/dapl/openib_scm/dapl_ib_cm.c
+++ b/dapl/openib_scm/dapl_ib_cm.c
@@ -150,7 +150,7 @@ static uint16_t dapli_get_lid(IN struct ibv_context *ctx, IN uint8_t port)
* ACTIVE/PASSIVE: called from CR thread or consumer via ep_disconnect
*/
static DAT_RETURN
-dapli_socket_disconnect(dp_ib_cm_handle_t cm_ptr)
+dapli_socket_disconnect(dp_ib_cm_handle_t cm_ptr)
{
DAPL_EP *ep_ptr = cm_ptr->ep;
DAT_UINT32 disc_data = htonl(0xdead);
@@ -197,6 +197,65 @@ dapli_socket_disconnect(dp_ib_cm_handle_t cm_ptr)
return DAT_SUCCESS;
}
+/*
+ * ACTIVE: socket connected, send QP information to peer
+ */
+void
+dapli_socket_connected(dp_ib_cm_handle_t cm_ptr, int err)
+{
+ int len, opt = 1;
+ struct iovec iovec[2];
+ struct dapl_ep *ep_ptr = cm_ptr->ep;
+
+ if (err) {
+ dapl_log(DAPL_DBG_TYPE_ERR, " connect: socket ERR %s\n",
+ strerror(err));
+ goto bail;
+ }
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ " socket connected, write QP and private data\n");
+
+ /* no delay for small packets */
+ setsockopt(cm_ptr->socket,IPPROTO_TCP,TCP_NODELAY,&opt,sizeof(opt));
+
+ /* send qp info and pdata to remote peer */
+ iovec[0].iov_base = &cm_ptr->dst;
+ iovec[0].iov_len = sizeof(ib_qp_cm_t);
+ if (cm_ptr->dst.p_size) {
+ iovec[1].iov_base = cm_ptr->p_data;
+ iovec[1].iov_len = ntohl(cm_ptr->dst.p_size);
+ }
+
+ len = writev(cm_ptr->socket, iovec, (cm_ptr->dst.p_size ? 2:1));
+ if (len != (ntohl(cm_ptr->dst.p_size) + sizeof(ib_qp_cm_t))) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " CONN_PENDING write: ERR %s, wcnt=%d -> %s\n",
+ strerror(errno), len,
+ inet_ntoa(((struct sockaddr_in *)
+ ep_ptr->param.remote_ia_address_ptr)->sin_addr));
+ goto bail;
+ }
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " connected: sending SRC port=0x%x lid=0x%x,"
+ " qpn=0x%x, psize=%d\n",
+ ntohs(cm_ptr->dst.port), ntohs(cm_ptr->dst.lid),
+ ntohl(cm_ptr->dst.qpn), ntohl(cm_ptr->dst.p_size));
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " connected: sending SRC GID subnet %016llx id %016llx\n",
+ (unsigned long long)
+ cpu_to_be64(cm_ptr->dst.gid.global.subnet_prefix),
+ (unsigned long long)
+ cpu_to_be64(cm_ptr->dst.gid.global.interface_id));
+
+ /* queue up to work thread to avoid blocking consumer */
+ cm_ptr->state = SCM_RTU_PENDING;
+ return;
+bail:
+ /* close socket, free cm structure and post error event */
+ dapli_cm_destroy(cm_ptr);
+ dapl_evd_connection_callback(NULL, IB_CME_LOCAL_FAILURE, NULL, ep_ptr);
+}
+
/*
* ACTIVE: Create socket, connect, defer exchange QP information to CR thread
@@ -210,8 +269,7 @@ dapli_socket_connect(DAPL_EP *ep_ptr,
DAT_PVOID p_data)
{
dp_ib_cm_handle_t cm_ptr;
- int len, opt = 1;
- struct iovec iovec[2];
+ int ret;
DAPL_IA *ia_ptr = ep_ptr->header.owner_ia;
dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: r_qual %d p_size=%d\n",
@@ -227,75 +285,88 @@ dapli_socket_connect(DAPL_EP *ep_ptr,
return DAT_INSUFFICIENT_RESOURCES;
}
- ((struct sockaddr_in*)r_addr)->sin_port = htons(r_qual);
+ /* non-blocking */
+ ret = fcntl(cm_ptr->socket, F_GETFL);
+ if (ret < 0 || fcntl(cm_ptr->socket,
+ F_SETFL, ret | O_NONBLOCK) < 0) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " socket connect: fcntl on socket %d ERR %d %s\n",
+ cm_ptr->socket, ret,
+ strerror(errno));
+ goto bail;
+ }
- if (connect(cm_ptr->socket, r_addr, sizeof(*r_addr)) < 0) {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " connect: %s on r_qual %d\n",
- strerror(errno), (unsigned int)r_qual);
+ ((struct sockaddr_in*)r_addr)->sin_port = htons(r_qual);
+ ret = connect(cm_ptr->socket, r_addr, sizeof(*r_addr));
+ if (ret && errno != EINPROGRESS) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " socket connect ERROR: %s -> %s r_qual %d\n",
+ strerror(errno),
+ inet_ntoa(((struct sockaddr_in *)r_addr)->sin_addr),
+ (unsigned int)r_qual);
dapli_cm_destroy(cm_ptr);
return DAT_INVALID_ADDRESS;
- }
- setsockopt(cm_ptr->socket,IPPROTO_TCP,TCP_NODELAY,&opt,sizeof(opt));
-
- dapl_dbg_log(DAPL_DBG_TYPE_EP, " socket connected!\n");
-
+ }
/* Send QP info, IA address, and private data */
cm_ptr->dst.qpn = htonl(ep_ptr->qp_handle->qp_num);
+#ifdef DAT_EXTENSIONS
cm_ptr->dst.qp_type = htons(ep_ptr->qp_handle->qp_type);
+#endif
cm_ptr->dst.port = htons(ia_ptr->hca_ptr->port_num);
cm_ptr->dst.lid =
htons(dapli_get_lid(ia_ptr->hca_ptr->ib_hca_handle,
(uint8_t)ia_ptr->hca_ptr->port_num));
- if (cm_ptr->dst.lid == 0xffff)
+ if (cm_ptr->dst.lid == 0xffff) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " CONNECT: query LID ERR %s -> %s\n",
+ strerror(errno),
+ inet_ntoa(((struct sockaddr_in *)r_addr)->sin_addr));
goto bail;
+ }
/* in network order */
if (ibv_query_gid(ia_ptr->hca_ptr->ib_hca_handle,
(uint8_t)ia_ptr->hca_ptr->port_num,
- 0,
- &cm_ptr->dst.gid))
+ 0, &cm_ptr->dst.gid)) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " CONNECT: query GID ERR %s -> %s\n",
+ strerror(errno),
+ inet_ntoa(((struct sockaddr_in *)r_addr)->sin_addr));
goto bail;
+ }
+ /* save references */
+ cm_ptr->hca = ia_ptr->hca_ptr;
+ cm_ptr->ep = ep_ptr;
cm_ptr->dst.ia_address = ia_ptr->hca_ptr->hca_address;
- cm_ptr->dst.p_size = htonl(p_size);
- iovec[0].iov_base = &cm_ptr->dst;
- iovec[0].iov_len = sizeof(ib_qp_cm_t);
if (p_size) {
- iovec[1].iov_base = p_data;
- iovec[1].iov_len = p_size;
+ cm_ptr->dst.p_size = htonl(p_size);
+ dapl_os_memcpy(cm_ptr->p_data, p_data, p_size);
}
- dapl_dbg_log(DAPL_DBG_TYPE_EP,
- " socket connected, write QP (%d), private data (%d)\n",
- sizeof(ib_qp_cm_t),p_size);
+ /* connected or pending, either way results via async event */
+ if (ret == 0)
+ dapli_socket_connected(cm_ptr,0);
+ else
+ cm_ptr->state = SCM_CONN_PENDING;
- len = writev(cm_ptr->socket, iovec, (p_size ? 2:1));
- if (len != (p_size + sizeof(ib_qp_cm_t))) {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " connect write: ERR %s, wcnt=%d\n",
- strerror(errno), len);
- goto bail;
- }
- dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " connect: SRC port=0x%x lid=0x%x, qpn=0x%x, psize=%d\n",
- ntohs(cm_ptr->dst.port), ntohs(cm_ptr->dst.lid),
- ntohl(cm_ptr->dst.qpn), ntohl(cm_ptr->dst.p_size));
- dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " connect SRC GID subnet %016llx id %016llx\n",
- (unsigned long long)
- cpu_to_be64(cm_ptr->dst.gid.global.subnet_prefix),
- (unsigned long long)
- cpu_to_be64(cm_ptr->dst.gid.global.interface_id));
-
- /* queue up to work thread to avoid blocking consumer */
- cm_ptr->state = SCM_CONN_PENDING;
- cm_ptr->hca = ia_ptr->hca_ptr;
- cm_ptr->ep = ep_ptr;
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ " connect: socket %d to %s r_qual %d pending\n",
+ cm_ptr->socket,
+ inet_ntoa(((struct sockaddr_in *)r_addr)->sin_addr),
+ (unsigned int)r_qual);
+
dapli_cm_queue(cm_ptr);
return DAT_SUCCESS;
bail:
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " socket connect ERROR: %s query lid(0x%x)/gid"
+ " -> %s r_qual %d\n",
+ strerror(errno), ntohs(cm_ptr->dst.lid),
+ inet_ntoa(((struct sockaddr_in *)r_addr)->sin_addr),
+ (unsigned int)r_qual);
+
/* close socket, free cm structure */
dapli_cm_destroy(cm_ptr);
return DAT_INTERNAL_ERROR;
@@ -306,7 +377,7 @@ bail:
* ACTIVE: exchange QP information, called from CR thread
*/
void
-dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)
+dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)
{
DAPL_EP *ep_ptr = cm_ptr->ep;
int len;
@@ -321,16 +392,20 @@ dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)
iovec[0].iov_len = sizeof(ib_qp_cm_t);
len = readv(cm_ptr->socket, iovec, 1);
if (len != sizeof(ib_qp_cm_t) || ntohs(cm_ptr->dst.ver) != DSCM_VER) {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " connect_rtu read: ERR %s, rcnt=%d, ver=%d\n",
- strerror(errno), len, ntohs(cm_ptr->dst.ver));
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " CONN_RTU read: ERR %s, rcnt=%d, ver=%d -> %s\n",
+ strerror(errno), len, cm_ptr->dst.ver,
+ inet_ntoa(((struct sockaddr_in *)
+ ep_ptr->param.remote_ia_address_ptr)->sin_addr));
goto bail;
}
/* check for consumer reject */
if (cm_ptr->dst.rej) {
- dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " connect_rtu read: PEER REJ reason=0x%x\n",
- ntohs(cm_ptr->dst.rej));
+ dapl_log(DAPL_DBG_TYPE_CM,
+ " CONN_RTU read: PEER REJ reason=0x%x -> %s\n",
+ ntohs(cm_ptr->dst.rej),
+ inet_ntoa(((struct sockaddr_in *)
+ ep_ptr->param.remote_ia_address_ptr)->sin_addr));
event = IB_CME_DESTINATION_REJECT_PRIVATE_DATA;
goto bail;
}
@@ -339,7 +414,9 @@ dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)
cm_ptr->dst.port = ntohs(cm_ptr->dst.port);
cm_ptr->dst.lid = ntohs(cm_ptr->dst.lid);
cm_ptr->dst.qpn = ntohl(cm_ptr->dst.qpn);
+#ifdef DAT_EXTENSIONS
cm_ptr->dst.qp_type = ntohs(cm_ptr->dst.qp_type);
+#endif
cm_ptr->dst.p_size = ntohl(cm_ptr->dst.p_size);
/* save remote address information */
@@ -348,7 +425,7 @@ dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)
sizeof(ep_ptr->remote_ia_address));
dapl_dbg_log(DAPL_DBG_TYPE_EP,
- " connect_rtu: DST %s port=0x%x lid=0x%x,"
+ " CONN_RTU: DST %s port=0x%x lid=0x%x,"
" qpn=0x%x, qp_type=%d, psize=%d\n",
inet_ntoa(((struct sockaddr_in *)
&cm_ptr->dst.ia_address)->sin_addr),
@@ -358,35 +435,50 @@ dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)
/* validate private data size before reading */
if (cm_ptr->dst.p_size > IB_MAX_REP_PDATA_SIZE) {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " connect_rtu read: psize (%d) wrong\n",
- cm_ptr->dst.p_size );
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " CONN_RTU read: psize (%d) wrong -> %s\n",
+ cm_ptr->dst.p_size,
+ inet_ntoa(((struct sockaddr_in *)
+ ep_ptr->param.remote_ia_address_ptr)->sin_addr));
goto bail;
}
/* read private data into cm_handle if any present */
- dapl_dbg_log(DAPL_DBG_TYPE_EP," socket connected, read pdata\n");
-
+ dapl_dbg_log(DAPL_DBG_TYPE_EP," socket connected, read private data\n");
if (cm_ptr->dst.p_size) {
iovec[0].iov_base = cm_ptr->p_data;
iovec[0].iov_len = cm_ptr->dst.p_size;
len = readv(cm_ptr->socket, iovec, 1);
if (len != cm_ptr->dst.p_size) {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " connect_rtu read pdata: ERR %s, rcnt=%d\n",
- strerror(errno), len);
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " CONN_RTU read pdata: ERR %s, rcnt=%d -> %s\n",
+ strerror(errno), len,
+ inet_ntoa(((struct sockaddr_in *)
+ ep_ptr->param.remote_ia_address_ptr)->sin_addr));
goto bail;
}
}
/* modify QP to RTR and then to RTS with remote info */
if (dapls_modify_qp_state(ep_ptr->qp_handle,
- IBV_QPS_RTR, cm_ptr) != DAT_SUCCESS)
+ IBV_QPS_RTR, cm_ptr) != DAT_SUCCESS) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " CONN_RTU: QPS_RTR ERR %s -> %s\n",
+ strerror(errno),
+ inet_ntoa(((struct sockaddr_in *)
+ ep_ptr->param.remote_ia_address_ptr)->sin_addr));
goto bail;
+ }
if (dapls_modify_qp_state(ep_ptr->qp_handle,
- IBV_QPS_RTS, cm_ptr) != DAT_SUCCESS)
+ IBV_QPS_RTS, cm_ptr) != DAT_SUCCESS) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " CONN_RTU: QPS_RTS ERR %s -> %s\n",
+ strerror(errno),
+ inet_ntoa(((struct sockaddr_in *)
+ ep_ptr->param.remote_ia_address_ptr)->sin_addr));
goto bail;
+ }
ep_ptr->qp_state = IB_QP_STATE_RTS;
@@ -426,7 +518,6 @@ dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)
IB_CME_CONNECTED,
cm_ptr->p_data,
ep_ptr);
-
return;
bail:
/* close socket, free cm structure and post error event */
@@ -461,8 +552,9 @@ dapli_socket_listen(DAPL_IA *ia_ptr,
/* bind, listen, set sockopt, accept, exchange data */
if ((cm_ptr->socket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
- dapl_dbg_log (DAPL_DBG_TYPE_ERR,
- "socket for listen returned %d\n", errno);
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " ERR: listen socket create: %s\n",
+ strerror(errno));
dat_status = DAT_INSUFFICIENT_RESOURCES;
goto bail;
}
@@ -474,9 +566,9 @@ dapli_socket_listen(DAPL_IA *ia_ptr,
if ((bind(cm_ptr->socket,(struct sockaddr*)&addr, sizeof(addr)) < 0) ||
(listen(cm_ptr->socket, 128) < 0)) {
- dapl_dbg_log( DAPL_DBG_TYPE_CM,
- " listen: ERROR %s on conn_qual 0x%x\n",
- strerror(errno),serviceID);
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " listen: ERROR %s on conn_qual 0x%x\n",
+ strerror(errno),serviceID);
if (errno == EADDRINUSE)
dat_status = DAT_CONN_QUAL_IN_USE;
else
@@ -503,25 +595,22 @@ bail:
return dat_status;
}
-
/*
- * PASSIVE: accept socket, receive peer QP information, private data, post cr_event
+ * PASSIVE: accept socket
*/
-DAT_RETURN
+void
dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
{
- dp_ib_cm_handle_t acm_ptr;
- void *p_data = NULL;
+ dp_ib_cm_handle_t acm_ptr;
int len;
- DAT_RETURN dat_status = DAT_SUCCESS;
dapl_dbg_log(DAPL_DBG_TYPE_EP," socket_accept\n");
/* Allocate accept CM and initialize */
if ((acm_ptr = dapl_os_alloc(sizeof(*acm_ptr))) == NULL)
- return DAT_INSUFFICIENT_RESOURCES;
+ goto bail;
- (void) dapl_os_memzero( acm_ptr, sizeof( *acm_ptr ) );
+ (void) dapl_os_memzero(acm_ptr, sizeof(*acm_ptr));
acm_ptr->socket = -1;
acm_ptr->sp = cm_ptr->sp;
@@ -531,25 +620,43 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
acm_ptr->socket = accept(cm_ptr->socket,
(struct sockaddr*)&acm_ptr->dst.ia_address,
(socklen_t*)&len);
-
if (acm_ptr->socket < 0) {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ dapl_log(DAPL_DBG_TYPE_ERR,
" accept: ERR %s on FD %d l_cr %p\n",
strerror(errno),cm_ptr->socket,cm_ptr);
- dat_status = DAT_INTERNAL_ERROR;
goto bail;
}
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ " socket accepted, queue new cm %p\n",acm_ptr);
+
+ acm_ptr->state = SCM_ACCEPTING;
+ dapli_cm_queue(acm_ptr);
+ return;
+bail:
+ /* close socket, free cm structure, active will see socket close as reject */
+ if (acm_ptr)
+ dapli_cm_destroy(acm_ptr);
+}
+
+/*
+ * PASSIVE: receive peer QP information, private data, post cr_event
+ */
+void
+dapli_socket_accept_data(ib_cm_srvc_handle_t acm_ptr)
+{
+ int len;
+ void *p_data = NULL;
+
dapl_dbg_log(DAPL_DBG_TYPE_EP," socket accepted, read QP data\n");
/* read in DST QP info, IA address. check for private data */
len = read(acm_ptr->socket, &acm_ptr->dst, sizeof(ib_qp_cm_t));
if (len != sizeof(ib_qp_cm_t) ||
ntohs(acm_ptr->dst.ver) != DSCM_VER) {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ dapl_log(DAPL_DBG_TYPE_ERR,
" accept read: ERR %s, rcnt=%d, ver=%d\n",
- strerror(errno), len, ntohs(acm_ptr->dst.ver));
- dat_status = DAT_INTERNAL_ERROR;
+ strerror(errno), len, acm_ptr->dst.ver);
goto bail;
}
@@ -557,13 +664,14 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
acm_ptr->dst.port = ntohs(acm_ptr->dst.port);
acm_ptr->dst.lid = ntohs(acm_ptr->dst.lid);
acm_ptr->dst.qpn = ntohl(acm_ptr->dst.qpn);
+#ifdef DAT_EXTENSIONS
acm_ptr->dst.qp_type = ntohs(acm_ptr->dst.qp_type);
+#endif
acm_ptr->dst.p_size = ntohl(acm_ptr->dst.p_size);
dapl_dbg_log(DAPL_DBG_TYPE_EP,
- " accept: DST %s port=0x%x lid=0x%x, qpn=0x%x, psz=%d\n",
- inet_ntoa(((struct sockaddr_in *)
- &acm_ptr->dst.ia_address)->sin_addr),
+ " accept: DST %s port=0x%x lid=0x%x, qpn=0x%x, psize=%d\n",
+ inet_ntoa(((struct sockaddr_in *)&acm_ptr->dst.ia_address)->sin_addr),
acm_ptr->dst.port, acm_ptr->dst.lid,
acm_ptr->dst.qpn, acm_ptr->dst.p_size);
@@ -572,7 +680,6 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
dapl_dbg_log(DAPL_DBG_TYPE_ERR,
" accept read: psize (%d) wrong\n",
acm_ptr->dst.p_size);
- dat_status = DAT_INTERNAL_ERROR;
goto bail;
}
@@ -583,18 +690,17 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
len = read( acm_ptr->socket,
acm_ptr->p_data, acm_ptr->dst.p_size);
if (len != acm_ptr->dst.p_size) {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ dapl_log(DAPL_DBG_TYPE_ERR,
" accept read pdata: ERR %s, rcnt=%d\n",
strerror(errno), len);
- dat_status = DAT_INTERNAL_ERROR;
goto bail;
}
dapl_dbg_log(DAPL_DBG_TYPE_EP," accept: psize=%d read\n",len);
p_data = acm_ptr->p_data;
}
- acm_ptr->state = SCM_ACCEPTING;
-
+ acm_ptr->state = SCM_ACCEPTING_DATA;
+
#ifdef DAT_EXTENSIONS
if (acm_ptr->dst.qp_type == IBV_QPT_UD) {
DAT_IB_EXTENSION_EVENT_DATA xevent;
@@ -617,10 +723,11 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
IB_CME_CONNECTION_REQUEST_PENDING,
p_data,
acm_ptr->sp );
- return DAT_SUCCESS;
+ return;
bail:
+ /* close socket, free cm structure, active will see socket close as reject */
dapli_cm_destroy(acm_ptr);
- return DAT_INTERNAL_ERROR;
+ return;
}
/*
@@ -635,7 +742,7 @@ dapli_socket_accept_usr(DAPL_EP *ep_ptr,
DAT_PVOID p_data)
{
DAPL_IA *ia_ptr = ep_ptr->header.owner_ia;
- dp_ib_cm_handle_t cm_ptr = cr_ptr->ib_cm_handle;
+ dp_ib_cm_handle_t cm_ptr = cr_ptr->ib_cm_handle;
ib_qp_cm_t local;
struct iovec iovec[2];
int len;
@@ -648,7 +755,7 @@ dapli_socket_accept_usr(DAPL_EP *ep_ptr,
return DAT_INTERNAL_ERROR;
dapl_dbg_log(DAPL_DBG_TYPE_EP,
- " accept_usr: remote port=0x%x lid=0x%x"
+ " ACCEPT_USR: remote port=0x%x lid=0x%x"
" qpn=0x%x qp_type %d, psize=%d\n",
cm_ptr->dst.port, cm_ptr->dst.lid,
cm_ptr->dst.qpn, cm_ptr->dst.qp_type,
@@ -658,25 +765,34 @@ dapli_socket_accept_usr(DAPL_EP *ep_ptr,
if (cm_ptr->dst.qp_type == IBV_QPT_UD &&
ep_ptr->qp_handle->qp_type != IBV_QPT_UD) {
dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " accept_rtu: ERR remote QP is UD,"
+ " ACCEPT_USR: ERR remote QP is UD,"
", but local QP is not\n");
return (DAT_INVALID_HANDLE | DAT_INVALID_HANDLE_EP);
-
}
#endif
/* modify QP to RTR and then to RTS with remote info already read */
if (dapls_modify_qp_state(ep_ptr->qp_handle,
- IBV_QPS_RTR, cm_ptr) != DAT_SUCCESS)
+ IBV_QPS_RTR, cm_ptr) != DAT_SUCCESS) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " ACCEPT_USR: QPS_RTR ERR %s -> %s\n",
+ strerror(errno),
+ inet_ntoa(((struct sockaddr_in *)
+ &cm_ptr->dst.ia_address)->sin_addr));
goto bail;
-
+ }
if (dapls_modify_qp_state(ep_ptr->qp_handle,
- IBV_QPS_RTS, cm_ptr) != DAT_SUCCESS)
+ IBV_QPS_RTS, cm_ptr) != DAT_SUCCESS) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " ACCEPT_USR: QPS_RTS ERR %s -> %s\n",
+ strerror(errno),
+ inet_ntoa(((struct sockaddr_in *)
+ &cm_ptr->dst.ia_address)->sin_addr));
goto bail;
-
+ }
ep_ptr->qp_state = IB_QP_STATE_RTS;
- /* save remote address information, for qp_query */
+ /* save remote address information */
dapl_os_memcpy( &ep_ptr->remote_ia_address,
&cm_ptr->dst.ia_address,
sizeof(ep_ptr->remote_ia_address));
@@ -689,15 +805,26 @@ dapli_socket_accept_usr(DAPL_EP *ep_ptr,
local.port = htons(ia_ptr->hca_ptr->port_num);
local.lid = htons(dapli_get_lid(ia_ptr->hca_ptr->ib_hca_handle,
(uint8_t)ia_ptr->hca_ptr->port_num));
- if (local.lid == 0xffff)
+ if (local.lid == 0xffff) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " ACCEPT_USR: query LID ERR %s -> %s\n",
+ strerror(errno),
+ inet_ntoa(((struct sockaddr_in *)
+ &cm_ptr->dst.ia_address)->sin_addr));
goto bail;
+ }
/* in network order */
if (ibv_query_gid(ia_ptr->hca_ptr->ib_hca_handle,
(uint8_t)ia_ptr->hca_ptr->port_num,
- 0,
- &local.gid))
+ 0, &local.gid)) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " ACCEPT_USR: query GID ERR %s -> %s\n",
+ strerror(errno),
+ inet_ntoa(((struct sockaddr_in *)
+ &cm_ptr->dst.ia_address)->sin_addr));
goto bail;
+ }
local.ia_address = ia_ptr->hca_ptr->hca_address;
local.p_size = htonl(p_size);
@@ -709,19 +836,20 @@ dapli_socket_accept_usr(DAPL_EP *ep_ptr,
}
len = writev(cm_ptr->socket, iovec, (p_size ? 2:1));
if (len != (p_size + sizeof(ib_qp_cm_t))) {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " accept_rtu: ERR %s, wcnt=%d\n",
- strerror(errno), len);
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " ACCEPT_USR: ERR %s, wcnt=%d -> %s\n",
+ strerror(errno), len,
+ inet_ntoa(((struct sockaddr_in *)
+ &cm_ptr->dst.ia_address)->sin_addr));
goto bail;
}
dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " accept_usr: local port=0x%x lid=0x%x"
- " qpn=0x%x qp_type=%d psize=%d\n",
+ " ACCEPT_USR: local port=0x%x lid=0x%x"
+ " qpn=0x%x psize=%d\n",
ntohs(local.port), ntohs(local.lid),
- ntohl(local.qpn), ntohs(local.qp_type),
- ntohl(local.p_size));
+ ntohl(local.qpn), ntohl(local.p_size));
dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " accept_usr SRC GID subnet %016llx id %016llx\n",
+ " ACCEPT_USR SRC GID subnet %016llx id %016llx\n",
(unsigned long long)
cpu_to_be64(local.gid.global.subnet_prefix),
(unsigned long long)
@@ -733,10 +861,8 @@ dapli_socket_accept_usr(DAPL_EP *ep_ptr,
cm_ptr->state = SCM_ACCEPTED;
dapl_dbg_log( DAPL_DBG_TYPE_EP," PASSIVE: accepted!\n" );
- dapli_cm_queue(cm_ptr);
return DAT_SUCCESS;
bail:
- dapl_dbg_log(DAPL_DBG_TYPE_ERR," accept_rtu: ERR !QP_RTR_RTS \n");
dapli_cm_destroy(cm_ptr);
dapls_ib_reinit_ep(ep_ptr); /* reset QP state */
return DAT_INTERNAL_ERROR;
@@ -746,7 +872,7 @@ bail:
* PASSIVE: read RTU from active peer, post CONN event
*/
void
-dapli_socket_accept_rtu(dp_ib_cm_handle_t cm_ptr)
+dapli_socket_accept_rtu(dp_ib_cm_handle_t cm_ptr)
{
int len;
short rtu_data = 0;
@@ -754,9 +880,11 @@ dapli_socket_accept_rtu(dp_ib_cm_handle_t cm_ptr)
/* complete handshake after final QP state change */
len = read(cm_ptr->socket, &rtu_data, sizeof(rtu_data));
if (len != sizeof(rtu_data) || ntohs(rtu_data) != 0x0e0f) {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " accept_rtu: ERR %s, rcnt=%d rdata=%x\n",
- strerror(errno), len, ntohs(rtu_data));
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " ACCEPT_RTU: ERR %s, rcnt=%d rdata=%x\n",
+ strerror(errno), len, ntohs(rtu_data),
+ inet_ntoa(((struct sockaddr_in *)
+ &cm_ptr->dst.ia_address)->sin_addr));
goto bail;
}
@@ -860,7 +988,7 @@ dapls_ib_disconnect(
IN DAPL_EP *ep_ptr,
IN DAT_CLOSE_FLAGS close_flags)
{
- dp_ib_cm_handle_t cm_ptr = ep_ptr->cm_handle;
+ dp_ib_cm_handle_t cm_ptr = ep_ptr->cm_handle;
dapl_dbg_log (DAPL_DBG_TYPE_EP,
"dapls_ib_disconnect(ep_handle %p ....)\n",
@@ -1045,7 +1173,7 @@ dapls_ib_reject_connection(
IN DAT_COUNT psize,
IN const DAT_PVOID pdata)
{
- struct iovec iovec;
+ struct iovec iovec[2];
dapl_dbg_log (DAPL_DBG_TYPE_EP,
" reject(cm %p reason %x, pdata %p, psize %d)\n",
@@ -1055,9 +1183,15 @@ dapls_ib_reject_connection(
if (cm_ptr->socket >= 0) {
cm_ptr->dst.rej = (uint16_t)reason;
cm_ptr->dst.rej = htons(cm_ptr->dst.rej);
- iovec.iov_base = &cm_ptr->dst;
- iovec.iov_len = sizeof(ib_qp_cm_t);
- writev(cm_ptr->socket, &iovec, 1);
+ iovec[0].iov_base = &cm_ptr->dst;
+ iovec[0].iov_len = sizeof(ib_qp_cm_t);
+ if (psize) {
+ iovec[1].iov_base = pdata;
+ iovec[2].iov_len = psize;
+ writev(cm_ptr->socket, &iovec[0], 2);
+ } else
+ writev(cm_ptr->socket, &iovec[0], 1);
+
close(cm_ptr->socket);
cm_ptr->socket = -1;
}
@@ -1090,7 +1224,7 @@ dapls_ib_cm_remote_addr (
OUT DAT_SOCK_ADDR6 *remote_ia_address )
{
DAPL_HEADER *header;
- dp_ib_cm_handle_t ib_cm_handle;
+ dp_ib_cm_handle_t ib_cm_handle;
dapl_dbg_log (DAPL_DBG_TYPE_EP,
"dapls_ib_cm_remote_addr(dat_handle %p, ....)\n",
@@ -1290,7 +1424,8 @@ void cr_thread(void *arg)
{
struct dapl_hca *hca_ptr = arg;
dp_ib_cm_handle_t cr, next_cr;
- int ret,idx;
+ int opt,ret,idx;
+ socklen_t opt_len;
char rbuf[2];
struct pollfd ufds[SCM_MAX_CONN];
@@ -1330,14 +1465,19 @@ void cr_thread(void *arg)
/* Add to ufds for poll, check for immediate work */
ufds[++idx].fd = cr->socket; /* add listen or cr */
- ufds[idx].events = POLLIN;
+ if (cr->state == SCM_CONN_PENDING)
+ ufds[idx].events = POLLOUT;
+ else
+ ufds[idx].events = POLLIN;
/* check socket for event, accept in or connect out */
dapl_dbg_log(DAPL_DBG_TYPE_CM," poll cr=%p, fd=%d,%d\n",
cr, cr->socket, ufds[idx].fd);
dapl_os_unlock(&hca_ptr->ib_trans.lock);
ret = poll(&ufds[idx],1,1);
- dapl_dbg_log(DAPL_DBG_TYPE_CM," poll wakeup ret=%d cr->st=%d ev=%d fd=%d\n",
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " poll wakeup ret=%d cr->st=%d"
+ " ev=0x%x fd=%d\n",
ret,cr->state,ufds[idx].revents,ufds[idx].fd);
/* data on listen, qp exchange, and on disconnect request */
@@ -1345,13 +1485,28 @@ void cr_thread(void *arg)
if (cr->socket > 0) {
if (cr->state == SCM_LISTEN)
dapli_socket_accept(cr);
+ else if (cr->state == SCM_ACCEPTING)
+ dapli_socket_accept_data(cr);
else if (cr->state == SCM_ACCEPTED)
dapli_socket_accept_rtu(cr);
- else if (cr->state == SCM_CONN_PENDING)
+ else if (cr->state == SCM_RTU_PENDING)
dapli_socket_connect_rtu(cr);
else if (cr->state == SCM_CONNECTED)
dapli_socket_disconnect(cr);
}
+ /* connect socket is writable, check status */
+ } else if ((ret == 1) &&
+ (ufds[idx].revents & POLLOUT ||
+ ufds[idx].revents & POLLERR)) {
+ if (cr->state == SCM_CONN_PENDING) {
+ opt = 0;
+ ret = getsockopt(cr->socket, SOL_SOCKET,
+ SO_ERROR, &opt, &opt_len);
+ if (!ret)
+ dapli_socket_connected(cr,opt);
+ else
+ dapli_socket_connected(cr,EFAULT);
+ }
} else if (ret != 0) {
dapl_dbg_log(DAPL_DBG_TYPE_CM,
" cr_thread(cr=%p) st=%d poll ERR= %s\n",
diff --git a/dapl/openib_scm/dapl_ib_extensions.c b/dapl/openib_scm/dapl_ib_extensions.c
index b88e853..692c8a8 100755
--- a/dapl/openib_scm/dapl_ib_extensions.c
+++ b/dapl/openib_scm/dapl_ib_extensions.c
@@ -82,7 +82,7 @@ dapl_extensions(IN DAT_HANDLE dat_handle,
IN va_list args)
{
DAT_EP_HANDLE ep;
- DAT_IB_ADDR_HANDLE *ah;
+ DAT_IB_ADDR_HANDLE *ah = NULL;
DAT_LMR_TRIPLET *lmr_p;
DAT_DTO_COOKIE cookie;
const DAT_RMR_TRIPLET *rmr_p;
diff --git a/dapl/openib_scm/dapl_ib_util.c b/dapl/openib_scm/dapl_ib_util.c
index f3874ca..0f24737 100644
--- a/dapl/openib_scm/dapl_ib_util.c
+++ b/dapl/openib_scm/dapl_ib_util.c
@@ -444,8 +444,6 @@ DAT_RETURN dapls_ib_query_hca (
ia_attr->transport_attr = NULL;
ia_attr->num_vendor_attr = 0;
ia_attr->vendor_attr = NULL;
- ia_attr->max_iov_segments_per_rdma_read = dev_attr.max_sge;
- ia_attr->max_iov_segments_per_rdma_write = dev_attr.max_sge;
#ifdef DAT_EXTENSIONS
ia_attr->extension_supported = DAT_EXTENSION_IB;
ia_attr->extension_version = DAT_IB_EXTENSION_VERSION;
diff --git a/dapl/openib_scm/dapl_ib_util.h b/dapl/openib_scm/dapl_ib_util.h
index bd3ea83..deb6be3 100644
--- a/dapl/openib_scm/dapl_ib_util.h
+++ b/dapl/openib_scm/dapl_ib_util.h
@@ -107,7 +107,9 @@ typedef enum scm_state
SCM_INIT,
SCM_LISTEN,
SCM_CONN_PENDING,
+ SCM_RTU_PENDING,
SCM_ACCEPTING,
+ SCM_ACCEPTING_DATA,
SCM_ACCEPTED,
SCM_REJECTED,
SCM_CONNECTED,
--
1.5.2.5
More information about the general mailing list