[ofa-general] [PATCH 1/6][uDAPL v1] dapl scm: change connect and accept to non-blocking to avoid blocking user thread.
Arlin Davis
arlin.r.davis at intel.com
Thu Aug 14 16:19:16 PDT 2008
This patch set for uDAPL v1 contains socket CM (scm) provider improvements.
A similar patch set for uDAPL v2 will follow.
The connect socket used to exchange QP information is now non-blocking,
and the QP data exchange is done by the cr thread. A new active-side state,
SCM_RTU_PENDING, is added. On the passive side, a new state,
SCM_ACCEPTING_DATA, is used so that reading the peer's QP data does not
block the user's accept call.
Signed-off-by: Arlin Davis ardavis at ichips.intel.com
---
dapl/openib_scm/dapl_ib_cm.c | 214 +++++++++++++++++++++++++++++-----------
dapl/openib_scm/dapl_ib_util.h | 2 +
2 files changed, 156 insertions(+), 60 deletions(-)
diff --git a/dapl/openib_scm/dapl_ib_cm.c b/dapl/openib_scm/dapl_ib_cm.c
index f78ebe6..03b0f12 100644
--- a/dapl/openib_scm/dapl_ib_cm.c
+++ b/dapl/openib_scm/dapl_ib_cm.c
@@ -197,6 +197,63 @@ dapli_socket_disconnect(ib_cm_handle_t cm_ptr)
return DAT_SUCCESS;
}
+/*
+ * ACTIVE: connect done (err: 0 on success, else socket error); send QP info + pdata to peer
+ */
+void
+dapli_socket_connected(ib_cm_handle_t cm_ptr, int err)
+{
+ int len, opt = 1;
+ struct iovec iovec[2];
+ struct dapl_ep *ep_ptr = cm_ptr->ep; /* saved: cm_ptr is destroyed on bail */
+
+ if (err) {
+ dapl_log(DAPL_DBG_TYPE_ERR, " connect: socket ERR %s\n",
+ strerror(err));
+ goto bail;
+ }
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ " socket connected, write QP and private data\n");
+
+ /* no delay for small packets (TCP_NODELAY; setsockopt result is not checked) */
+ setsockopt(cm_ptr->socket,IPPROTO_TCP,TCP_NODELAY,&opt,sizeof(opt));
+
+ /* send qp info and pdata to remote peer */
+ iovec[0].iov_base = &cm_ptr->dst;
+ iovec[0].iov_len = sizeof(ib_qp_cm_t);
+ if (cm_ptr->dst.p_size) {
+ iovec[1].iov_base = cm_ptr->p_data;
+ iovec[1].iov_len = ntohl(cm_ptr->dst.p_size);
+ }
+
+ len = writev(cm_ptr->socket, iovec, (cm_ptr->dst.p_size ? 2:1)); /* NOTE(review): socket is O_NONBLOCK, so a short write or EAGAIN also lands in bail */
+ if (len != (ntohl(cm_ptr->dst.p_size) + sizeof(ib_qp_cm_t))) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " connect write: ERR %s, wcnt=%d\n",
+ strerror(errno), len);
+ goto bail; /* NOTE(review): errno is only meaningful here when len < 0 */
+ }
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " connected: sending SRC port=0x%x lid=0x%x,"
+ " qpn=0x%x, psize=%d\n",
+ ntohs(cm_ptr->dst.port), ntohs(cm_ptr->dst.lid),
+ ntohl(cm_ptr->dst.qpn), ntohl(cm_ptr->dst.p_size));
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " connected: sending SRC GID subnet %016llx id %016llx\n",
+ (unsigned long long)
+ cpu_to_be64(cm_ptr->dst.gid.global.subnet_prefix),
+ (unsigned long long)
+ cpu_to_be64(cm_ptr->dst.gid.global.interface_id));
+
+ /* cr thread reads the peer's reply via dapli_socket_connect_rtu() in this state */
+ cm_ptr->state = SCM_RTU_PENDING;
+ return;
+bail:
+ /* close socket, free cm, post error. NOTE(review): dapli_socket_connect() still queues cm_ptr after a synchronous (ret == 0) call fails here - possible use-after-free, confirm */
+ dapli_cm_destroy(cm_ptr);
+ dapl_evd_connection_callback(NULL, IB_CME_LOCAL_FAILURE, NULL, ep_ptr);
+}
+
/*
* ACTIVE: Create socket, connect, defer exchange QP information to CR thread
@@ -210,8 +267,7 @@ dapli_socket_connect(DAPL_EP *ep_ptr,
DAT_PVOID p_data)
{
ib_cm_handle_t cm_ptr;
- int len, opt = 1;
- struct iovec iovec[2];
+ int ret;
DAPL_IA *ia_ptr = ep_ptr->header.owner_ia;
dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: r_qual %d p_size=%d\n",
@@ -227,19 +283,28 @@ dapli_socket_connect(DAPL_EP *ep_ptr,
return DAT_INSUFFICIENT_RESOURCES;
}
- ((struct sockaddr_in*)r_addr)->sin_port = htons(r_qual);
+ /* non-blocking */
+ ret = fcntl(cm_ptr->socket, F_GETFL);
+ if (ret < 0 || fcntl(cm_ptr->socket,
+ F_SETFL, ret | O_NONBLOCK) < 0) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " connect: fcntl on socket %d ERR %d %s\n",
+ cm_ptr->socket, ret,
+ strerror(errno));
+ goto bail;
+ }
- if (connect(cm_ptr->socket, r_addr, sizeof(*r_addr)) < 0) {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " connect: %s on r_qual %d\n",
- strerror(errno), (unsigned int)r_qual);
+ ((struct sockaddr_in*)r_addr)->sin_port = htons(r_qual);
+ ret = connect(cm_ptr->socket, r_addr, sizeof(*r_addr));
+ if (ret && errno != EINPROGRESS) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " connect ERROR: %s on %s r_qual %d\n",
+ strerror(errno),
+ inet_ntoa(((struct sockaddr_in *)r_addr)->sin_addr),
+ (unsigned int)r_qual);
dapli_cm_destroy(cm_ptr);
return DAT_INVALID_ADDRESS;
- }
- setsockopt(cm_ptr->socket,IPPROTO_TCP,TCP_NODELAY,&opt,sizeof(opt));
-
- dapl_dbg_log(DAPL_DBG_TYPE_EP, " socket connected!\n");
-
+ }
/* Send QP info, IA address, and private data */
cm_ptr->dst.qpn = htonl(ep_ptr->qp_handle->qp_num);
@@ -257,41 +322,36 @@ dapli_socket_connect(DAPL_EP *ep_ptr,
&cm_ptr->dst.gid))
goto bail;
+ /* save references */
+ cm_ptr->hca = ia_ptr->hca_ptr;
+ cm_ptr->ep = ep_ptr;
cm_ptr->dst.ia_address = ia_ptr->hca_ptr->hca_address;
- cm_ptr->dst.p_size = htonl(p_size);
- iovec[0].iov_base = &cm_ptr->dst;
- iovec[0].iov_len = sizeof(ib_qp_cm_t);
if (p_size) {
- iovec[1].iov_base = p_data;
- iovec[1].iov_len = p_size;
+ cm_ptr->dst.p_size = htonl(p_size);
+ dapl_os_memcpy(cm_ptr->p_data, p_data, p_size);
}
- dapl_dbg_log(DAPL_DBG_TYPE_EP," socket connected, write QP and private data\n");
- len = writev(cm_ptr->socket, iovec, (p_size ? 2:1));
- if (len != (p_size + sizeof(ib_qp_cm_t))) {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " connect write: ERR %s, wcnt=%d\n",
- strerror(errno), len);
- goto bail;
- }
- dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " connect: SRC port=0x%x lid=0x%x, qpn=0x%x, psize=%d\n",
- ntohs(cm_ptr->dst.port), ntohs(cm_ptr->dst.lid),
- ntohl(cm_ptr->dst.qpn), ntohl(cm_ptr->dst.p_size));
- dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " connect SRC GID subnet %016llx id %016llx\n",
- (unsigned long long)
- cpu_to_be64(cm_ptr->dst.gid.global.subnet_prefix),
- (unsigned long long)
- cpu_to_be64(cm_ptr->dst.gid.global.interface_id));
-
- /* queue up to work thread to avoid blocking consumer */
- cm_ptr->state = SCM_CONN_PENDING;
- cm_ptr->hca = ia_ptr->hca_ptr;
- cm_ptr->ep = ep_ptr;
+ /* connected or pending, either way results via async event */
+ if (ret == 0)
+ dapli_socket_connected(cm_ptr,0);
+ else
+ cm_ptr->state = SCM_CONN_PENDING;
+
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ " connect: socket %d to %s r_qual %d pending\n",
+ cm_ptr->socket,
+ inet_ntoa(((struct sockaddr_in *)r_addr)->sin_addr),
+ (unsigned int)r_qual);
+
dapli_cm_queue(cm_ptr);
return DAT_SUCCESS;
bail:
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " connect ERROR: %s query lid(0x%x)/gid on %s r_qual %d\n",
+ strerror(errno),ntohs(cm_ptr->dst.lid),
+ inet_ntoa(((struct sockaddr_in *)r_addr)->sin_addr),
+ (unsigned int)r_qual);
+
/* close socket, free cm structure */
dapli_cm_destroy(cm_ptr);
return DAT_INTERNAL_ERROR;
@@ -470,25 +530,22 @@ bail:
return dat_status;
}
-
/*
- * PASSIVE: accept socket, receive peer QP information, private data, post cr_event
+ * PASSIVE: accept socket
*/
-DAT_RETURN
+void
dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
{
ib_cm_handle_t acm_ptr;
- void *p_data = NULL;
int len;
- DAT_RETURN dat_status = DAT_SUCCESS;
dapl_dbg_log(DAPL_DBG_TYPE_EP," socket_accept\n");
/* Allocate accept CM and initialize */
if ((acm_ptr = dapl_os_alloc(sizeof(*acm_ptr))) == NULL)
- return DAT_INSUFFICIENT_RESOURCES;
+ goto bail;
- (void) dapl_os_memzero( acm_ptr, sizeof( *acm_ptr ) );
+ (void) dapl_os_memzero(acm_ptr, sizeof(*acm_ptr));
acm_ptr->socket = -1;
acm_ptr->sp = cm_ptr->sp;
@@ -498,15 +555,34 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
acm_ptr->socket = accept(cm_ptr->socket,
(struct sockaddr*)&acm_ptr->dst.ia_address,
(socklen_t*)&len);
-
if (acm_ptr->socket < 0) {
dapl_dbg_log(DAPL_DBG_TYPE_ERR,
" accept: ERR %s on FD %d l_cr %p\n",
strerror(errno),cm_ptr->socket,cm_ptr);
- dat_status = DAT_INTERNAL_ERROR;
goto bail;
}
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ " socket accepted, queue new cm %p\n",acm_ptr);
+
+ acm_ptr->state = SCM_ACCEPTING;
+ dapli_cm_queue(acm_ptr);
+ return;
+bail:
+ /* close socket, free cm structure, active will see socket close as reject */
+ if (acm_ptr)
+ dapli_cm_destroy(acm_ptr);
+}
+
+/*
+ * PASSIVE: receive peer QP information, private data, post cr_event
+ */
+void
+dapli_socket_accept_data(ib_cm_srvc_handle_t acm_ptr)
+{
+ int len;
+ void *p_data = NULL;
+
dapl_dbg_log(DAPL_DBG_TYPE_EP," socket accepted, read QP data\n");
/* read in DST QP info, IA address. check for private data */
@@ -516,7 +592,6 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
dapl_dbg_log(DAPL_DBG_TYPE_ERR,
" accept read: ERR %s, rcnt=%d, ver=%d\n",
strerror(errno), len, acm_ptr->dst.ver);
- dat_status = DAT_INTERNAL_ERROR;
goto bail;
}
@@ -537,7 +612,6 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
dapl_dbg_log(DAPL_DBG_TYPE_ERR,
" accept read: psize (%d) wrong\n",
acm_ptr->dst.p_size);
- dat_status = DAT_INTERNAL_ERROR;
goto bail;
}
@@ -551,24 +625,24 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
dapl_dbg_log(DAPL_DBG_TYPE_ERR,
" accept read pdata: ERR %s, rcnt=%d\n",
strerror(errno), len);
- dat_status = DAT_INTERNAL_ERROR;
goto bail;
}
dapl_dbg_log(DAPL_DBG_TYPE_EP," accept: psize=%d read\n",len);
p_data = acm_ptr->p_data;
}
- acm_ptr->state = SCM_ACCEPTING;
+ acm_ptr->state = SCM_ACCEPTING_DATA;
/* trigger CR event and return SUCCESS */
dapls_cr_callback(acm_ptr,
IB_CME_CONNECTION_REQUEST_PENDING,
p_data,
acm_ptr->sp );
- return DAT_SUCCESS;
+ return;
bail:
+ /* close socket, free cm structure, active will see socket close as reject */
dapli_cm_destroy(acm_ptr);
- return DAT_INTERNAL_ERROR;
+ return;
}
/*
@@ -669,7 +743,6 @@ dapli_socket_accept_usr(DAPL_EP *ep_ptr,
sizeof(cm_ptr->dst.ia_address));
dapl_dbg_log( DAPL_DBG_TYPE_EP," PASSIVE: accepted!\n" );
- dapli_cm_queue(cm_ptr);
return DAT_SUCCESS;
bail:
dapl_dbg_log(DAPL_DBG_TYPE_ERR," accept_rtu: ERR !QP_RTR_RTS \n");
@@ -1202,7 +1275,8 @@ void cr_thread(void *arg)
{
struct dapl_hca *hca_ptr = arg;
ib_cm_handle_t cr, next_cr;
- int ret,idx;
+ int opt,ret,idx;
+ socklen_t opt_len;
char rbuf[2];
struct pollfd ufds[SCM_MAX_CONN];
@@ -1242,14 +1316,19 @@ void cr_thread(void *arg)
/* Add to ufds for poll, check for immediate work */
ufds[++idx].fd = cr->socket; /* add listen or cr */
- ufds[idx].events = POLLIN;
+ if (cr->state == SCM_CONN_PENDING)
+ ufds[idx].events = POLLOUT;
+ else
+ ufds[idx].events = POLLIN;
/* check socket for event, accept in or connect out */
dapl_dbg_log(DAPL_DBG_TYPE_CM," poll cr=%p, fd=%d,%d\n",
cr, cr->socket, ufds[idx].fd);
dapl_os_unlock(&hca_ptr->ib_trans.lock);
ret = poll(&ufds[idx],1,1);
- dapl_dbg_log(DAPL_DBG_TYPE_CM," poll wakeup ret=%d cr->st=%d ev=%d fd=%d\n",
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " poll wakeup ret=%d cr->st=%d"
+ " ev=0x%x fd=%d\n",
ret,cr->state,ufds[idx].revents,ufds[idx].fd);
/* data on listen, qp exchange, and on disconnect request */
@@ -1257,13 +1336,28 @@ void cr_thread(void *arg)
if (cr->socket > 0) {
if (cr->state == SCM_LISTEN)
dapli_socket_accept(cr);
+ else if (cr->state == SCM_ACCEPTING)
+ dapli_socket_accept_data(cr);
else if (cr->state == SCM_ACCEPTED)
dapli_socket_accept_rtu(cr);
- else if (cr->state == SCM_CONN_PENDING)
+ else if (cr->state == SCM_RTU_PENDING)
dapli_socket_connect_rtu(cr);
else if (cr->state == SCM_CONNECTED)
dapli_socket_disconnect(cr);
}
+ /* connect socket is writable, check status */
+ } else if ((ret == 1) &&
+ (ufds[idx].revents & POLLOUT ||
+ ufds[idx].revents & POLLERR)) {
+ if (cr->state == SCM_CONN_PENDING) {
+ opt = 0;
+ ret = getsockopt(cr->socket, SOL_SOCKET,
+ SO_ERROR, &opt, &opt_len);
+ if (!ret)
+ dapli_socket_connected(cr,opt);
+ else
+ dapli_socket_connected(cr,EFAULT);
+ }
} else if (ret != 0) {
dapl_dbg_log(DAPL_DBG_TYPE_CM,
" cr_thread(cr=%p) st=%d poll ERR= %s\n",
diff --git a/dapl/openib_scm/dapl_ib_util.h b/dapl/openib_scm/dapl_ib_util.h
index 37c5dbb..6d7568c 100644
--- a/dapl/openib_scm/dapl_ib_util.h
+++ b/dapl/openib_scm/dapl_ib_util.h
@@ -102,7 +102,9 @@ typedef enum scm_state
SCM_INIT,
SCM_LISTEN,
SCM_CONN_PENDING,
+ SCM_RTU_PENDING,
SCM_ACCEPTING,
+ SCM_ACCEPTING_DATA,
SCM_ACCEPTED,
SCM_REJECTED,
SCM_CONNECTED,
--
1.5.2.5
More information about the general
mailing list