[ofw] [PATCH 01/11] uDAPL v2 - ucm: latest patch set for new ucm provider, complete timer/retry logic and windows abstractions
Arlin Davis
arlin.r.davis at intel.com
Fri Oct 16 16:48:41 PDT 2009
add timer/retry CM logic to the ucm provider
add reply, rtu and retry count options via
environment variables. Times in msecs.
DAPL_UCM_RETRY 10
DAPL_UCM_REP_TIME 400
DAPL_UCM_RTU_TIME 200
Add RTU_PENDING and DISC_RECV states
Add check timer code to the cm_thread
and the option to the select abstraction
to take timeout values in msecs.
DREQ, REQ, and REPLY will all be timed
and retried.
Split out reply code and disconnect_final
code to better facilitate retry timers.
Add checking for duplicate messages.
Add new UD extension events for errors:
DAT_IB_UD_CONNECTION_REJECT_EVENT
DAT_IB_UD_CONNECTION_ERROR_EVENT
Signed-off-by: Arlin Davis <arlin.r.davis at intel.com>
---
dapl/common/dapl_debug.c | 2 +-
dapl/openib_common/dapl_ib_common.h | 36 ++-
dapl/openib_ucm/cm.c | 622 ++++++++++++++++++++++++----------
dapl/openib_ucm/dapl_ib_util.h | 8 +-
dapl/openib_ucm/device.c | 10 +-
dat/include/dat2/dat_ib_extensions.h | 10 +-
6 files changed, 485 insertions(+), 203 deletions(-)
diff --git a/dapl/common/dapl_debug.c b/dapl/common/dapl_debug.c
index 960bc00..904d075 100644
--- a/dapl/common/dapl_debug.c
+++ b/dapl/common/dapl_debug.c
@@ -50,7 +50,7 @@ void dapl_internal_dbg_log(DAPL_DBG_TYPE type, const char *fmt, ...)
if (DAPL_DBG_DEST_STDOUT & g_dapl_dbg_dest) {
va_start(args, fmt);
fprintf(stdout, "%s:%x: ", _ptr_host_,
- dapl_os_gettid());
+ dapl_os_getpid());
dapl_os_vprintf(fmt, args);
va_end(args);
}
diff --git a/dapl/openib_common/dapl_ib_common.h b/dapl/openib_common/dapl_ib_common.h
index 065cfca..982621c 100644
--- a/dapl/openib_common/dapl_ib_common.h
+++ b/dapl/openib_common/dapl_ib_common.h
@@ -165,9 +165,12 @@ typedef uint16_t ib_hca_port_t;
#define DCM_HOP_LIMIT 0xff
#define DCM_TCLASS 0
-/* DAPL uCM timers */
-#define DCM_RETRY_CNT 7
-#define DCM_RETRY_TIME_MS 1000
+/* DAPL uCM timer defaults (msecs) and default queue sizes */
+#define DCM_RETRY_CNT 10 /* retry limit for REQ, REP and DREQ sends */
+#define DCM_REP_TIME 400 /* reply timeout in msecs */
+#define DCM_RTU_TIME 200 /* rtu timeout in msecs */
+#define DCM_QP_SIZE 500 /* uCM tx, rx qp size */
+#define DCM_CQ_SIZE 500 /* uCM cq size */
/* DTO OPs, ordered for DAPL ENUM definitions */
#define OP_RDMA_WRITE IBV_WR_RDMA_WRITE
@@ -254,7 +257,7 @@ typedef enum
typedef enum dapl_cm_op
{
- DCM_REQ,
+ DCM_REQ = 1,
DCM_REP,
DCM_REJ_USER, /* user reject */
DCM_REJ_CM, /* cm reject, no SID */
@@ -279,7 +282,9 @@ typedef enum dapl_cm_state
DCM_RELEASED,
DCM_DISC_PENDING,
DCM_DISCONNECTED,
- DCM_DESTROY
+ DCM_DESTROY,
+ DCM_RTU_PENDING,
+ DCM_DISC_RECV
} DAPL_CM_STATE;
@@ -370,9 +375,26 @@ STATIC _INLINE_ char * dapl_cm_state_str(IN int st)
"CM_RELEASED",
"CM_DISC_PENDING",
"CM_DISCONNECTED",
- "CM_DESTROY"
+ "CM_DESTROY",
+ "CM_RTU_PENDING",
+ "CM_DISC_RECV"
};
- return ((st < 0 || st > 13) ? "Invalid CM state?" : state[st]);
+ return ((st < 0 || st > 15) ? "Invalid CM state?" : state[st]);
+}
+
+STATIC _INLINE_ char * dapl_cm_op_str(IN int op)
+{
+ static char *ops[] = { /* indexed by enum dapl_cm_op value */
+ "INVALID", /* slot 0 unused: DCM_REQ = 1 */
+ "REQ",
+ "REP",
+ "REJ_USER",
+ "REJ_CM",
+ "RTU",
+ "DREQ",
+ "DREP",
+ };
+ return ((op < 1 || op >= (int)(sizeof(ops) / sizeof(ops[0]))) ? "Invalid OP?" : ops[op]);
}
#endif /* _DAPL_IB_COMMON_H_ */
diff --git a/dapl/openib_ucm/cm.c b/dapl/openib_ucm/cm.c
index 4dc67c9..099cadf 100644
--- a/dapl/openib_ucm/cm.c
+++ b/dapl/openib_ucm/cm.c
@@ -95,12 +95,19 @@ static enum DAPL_FD_EVENTS dapl_poll(DAPL_SOCKET s, enum DAPL_FD_EVENTS event)
return DAPL_FD_ERROR;
}
-static int dapl_select(struct dapl_fd_set *set)
+static int dapl_select(struct dapl_fd_set *set, int time_ms)
{
int ret;
+ struct timeval tv, *p_tv = NULL;
+
+ if (time_ms != -1) {
+ p_tv = &tv;
+ tv.tv_sec = time_ms/1000;
+ tv.tv_usec = (time_ms%1000)*1000;
+ }
dapl_dbg_log(DAPL_DBG_TYPE_CM, " dapl_select: sleep\n");
- ret = select(0, &set->set[0], &set->set[1], &set->set[2], NULL);
+ ret = select(0, &set->set[0], &set->set[1], &set->set[2], p_tv);
dapl_dbg_log(DAPL_DBG_TYPE_CM, " dapl_select: wakeup\n");
if (ret == SOCKET_ERROR)
@@ -166,24 +173,27 @@ static enum DAPL_FD_EVENTS dapl_poll(DAPL_SOCKET s, enum DAPL_FD_EVENTS event)
return fds.revents;
}
-static int dapl_select(struct dapl_fd_set *set)
+static int dapl_select(struct dapl_fd_set *set, int time_ms)
{
int ret;
dapl_dbg_log(DAPL_DBG_TYPE_CM, " dapl_select: sleep, fds=%d\n",
set->index);
- ret = poll(set->set, set->index, -1);
+ ret = poll(set->set, set->index, time_ms);
dapl_dbg_log(DAPL_DBG_TYPE_CM, " dapl_select: wakeup, ret=0x%x\n", ret);
return ret;
}
#endif
/* forward declarations */
+static int ucm_reply(dp_ib_cm_handle_t cm);
static void ucm_accept(ib_cm_srvc_handle_t cm, ib_cm_msg_t *msg);
static void ucm_connect_rtu(dp_ib_cm_handle_t cm, ib_cm_msg_t *msg);
static void ucm_accept_rtu(dp_ib_cm_handle_t cm, ib_cm_msg_t *msg);
static int ucm_send(ib_hca_transport_t *tp, ib_cm_msg_t *msg, DAT_PVOID p_data, DAT_COUNT p_size);
+static void ucm_disconnect_final(dp_ib_cm_handle_t cm);
DAT_RETURN dapli_cm_disconnect(dp_ib_cm_handle_t cm);
+DAT_RETURN dapli_cm_connect(DAPL_EP *ep, dp_ib_cm_handle_t cm);
#define UCM_SND_BURST 50
@@ -221,6 +231,77 @@ static void ucm_free_port(ib_hca_transport_t *tp, uint16_t port)
dapl_os_unlock(&tp->plock);
}
+static void ucm_check_timers(dp_ib_cm_handle_t cm, int *timer)
+{
+ DAPL_OS_TIMEVAL time; /* now, compared against cm->timer (time of last send) */
+ /* resend the pending CM msg if its reply timed out; *timer = cm_timer (msecs) while waiting */
+ dapl_os_lock(&cm->lock);
+ dapl_os_get_time(&time);
+ switch (cm->state) {
+ case DCM_REP_PENDING: /* active: REQ sent, waiting for REP */
+ *timer = cm->hca->ib_trans.cm_timer;
+ /* REQ timeout grows with each retry: rep_time * retries */
+ if ((time - cm->timer)/1000 > /* /1000: presumably usecs -> msecs, confirm */
+ (cm->hca->ib_trans.rep_time * cm->retries)) {
+ dapl_log(DAPL_DBG_TYPE_WARN,
+ " CM_REQ retry %d [lid, port, qpn]:"
+ " 0x%x %d 0x%x -> 0x%x %d 0x%x\n",
+ cm->retries,
+ ntohs(cm->msg.saddr.ib.lid),
+ ntohs(cm->msg.sport),
+ ntohl(cm->msg.saddr.ib.qpn),
+ ntohs(cm->msg.daddr.ib.lid),
+ ntohs(cm->msg.dport),
+ ntohl(cm->msg.dqpn));
+ dapl_os_unlock(&cm->lock);
+ dapli_cm_connect(cm->ep, cm);
+ return;
+ }
+ break;
+ case DCM_RTU_PENDING: /* passive: REP sent, waiting for RTU */
+ *timer = cm->hca->ib_trans.cm_timer;
+ if ((time - cm->timer)/1000 > cm->hca->ib_trans.rtu_time) {
+ dapl_log(DAPL_DBG_TYPE_WARN,
+ " CM_REPLY retry %d [lid, port, qpn]:"
+ " 0x%x %d 0x%x -> 0x%x %d 0x%x\n",
+ cm->retries,
+ ntohs(cm->msg.saddr.ib.lid),
+ ntohs(cm->msg.sport),
+ ntohl(cm->msg.saddr.ib.qpn),
+ ntohs(cm->msg.daddr.ib.lid),
+ ntohs(cm->msg.dport),
+ ntohl(cm->msg.daddr.ib.qpn));
+ dapl_os_unlock(&cm->lock);
+ ucm_reply(cm);
+ return;
+ }
+ break;
+ case DCM_DISC_PENDING: /* DREQ sent, waiting for DREP */
+ *timer = cm->hca->ib_trans.cm_timer;
+ /* fixed DREQ timeout of rep_time; unlike REQ it does not grow per retry */
+ if ((time - cm->timer)/1000 >
+ (cm->hca->ib_trans.rep_time)) {
+ dapl_log(DAPL_DBG_TYPE_WARN,
+ " CM_DREQ retry %d [lid, port, qpn]:"
+ " 0x%x %d 0x%x -> 0x%x %d 0x%x\n",
+ cm->retries,
+ ntohs(cm->msg.saddr.ib.lid),
+ ntohs(cm->msg.sport),
+ ntohl(cm->msg.saddr.ib.qpn),
+ ntohs(cm->msg.daddr.ib.lid),
+ ntohs(cm->msg.dport),
+ ntohl(cm->msg.dqpn));
+ dapl_os_unlock(&cm->lock);
+ dapli_cm_disconnect(cm);
+ return;
+ }
+ break;
+ default:
+ break;
+ }
+ dapl_os_unlock(&cm->lock);
+}
+
/* SEND CM MESSAGE PROCESSING */
/* Get CM UD message from send queue, called with s_lock held */
@@ -313,34 +394,75 @@ static void ucm_process_recv(ib_hca_transport_t *tp,
{
dapl_os_lock(&cm->lock);
switch (cm->state) {
- case DCM_LISTEN:
- dapl_dbg_log(DAPL_DBG_TYPE_CM, " ucm_recv: LISTEN\n");
+ case DCM_LISTEN: /* passive */
dapl_os_unlock(&cm->lock);
ucm_accept(cm, msg);
break;
- case DCM_ACCEPTED:
- dapl_dbg_log(DAPL_DBG_TYPE_CM, " ucm_recv: ACCEPT_RTU\n");
+ case DCM_RTU_PENDING: /* passive */
dapl_os_unlock(&cm->lock);
ucm_accept_rtu(cm, msg);
break;
- case DCM_CONN_PENDING:
- dapl_dbg_log(DAPL_DBG_TYPE_CM, " ucm_recv: CONN_RTU\n");
+ case DCM_REP_PENDING: /* active */
dapl_os_unlock(&cm->lock);
ucm_connect_rtu(cm, msg);
break;
- case DCM_CONNECTED:
- dapl_dbg_log(DAPL_DBG_TYPE_CM, " ucm_recv: DREQ connect\n");
- dapl_os_unlock(&cm->lock);
- if (ntohs(msg->op) == DCM_DREQ)
+ case DCM_CONNECTED: /* active and passive */
+ /* DREQ, change state and process */
+ if (ntohs(msg->op) == DCM_DREQ) {
+ cm->state = DCM_DISC_RECV;
+ dapl_os_unlock(&cm->lock);
dapli_cm_disconnect(cm);
+ break;
+ }
+ /* active: RTU was dropped, resend */
+ if (ntohs(msg->op) == DCM_REP) {
+ dapl_log(DAPL_DBG_TYPE_WARN,
+ " RESEND RTU: op %s st %s [lid, port, qpn]:"
+ " 0x%x %d 0x%x -> 0x%x %d 0x%x\n",
+ dapl_cm_op_str(ntohs(msg->op)),
+ dapl_cm_state_str(cm->state),
+ ntohs(msg->saddr.ib.lid),
+ ntohs(msg->sport),
+ ntohl(msg->saddr.ib.qpn),
+ ntohs(msg->daddr.ib.lid),
+ ntohs(msg->dport),
+ ntohl(msg->daddr.ib.qpn));
+
+ cm->msg.op = htons(DCM_RTU);
+ ucm_send(&cm->hca->ib_trans, &cm->msg, NULL, 0);
+ }
+ dapl_os_unlock(&cm->lock);
break;
- case DCM_DISC_PENDING:
+ case DCM_DISC_PENDING: /* active and passive */
+ /* DREQ or DREP, finalize */
+ dapl_os_unlock(&cm->lock);
+ ucm_disconnect_final(cm);
+ break;
+ case DCM_DISCONNECTED:
case DCM_DESTROY:
- dapl_dbg_log(DAPL_DBG_TYPE_CM, " ucm_recv: DREQ toss\n");
+ /* DREQ dropped, resend */
+ if (ntohs(msg->op) == DCM_DREQ) {
+ dapl_log(DAPL_DBG_TYPE_WARN,
+ " RESEND DREP: op %s st %s [lid, port, qpn]:"
+ " 0x%x %d 0x%x -> 0x%x %d 0x%x\n",
+ dapl_cm_op_str(ntohs(msg->op)),
+ dapl_cm_state_str(cm->state),
+ ntohs(msg->saddr.ib.lid),
+ ntohs(msg->sport),
+ ntohl(msg->saddr.ib.qpn),
+ ntohs(msg->daddr.ib.lid),
+ ntohs(msg->dport),
+ ntohl(msg->daddr.ib.qpn));
+ cm->msg.op = htons(DCM_DREP);
+ ucm_send(&cm->hca->ib_trans, &cm->msg, NULL, 0);
+
+ }
+ dapl_os_unlock(&cm->lock);
break;
+
default:
dapl_log(DAPL_DBG_TYPE_WARN,
- " process_recv: UNKNOWN state"
+ " ucm_recv: UNKNOWN state"
" <- op %d, st %d spsp %d sqpn %d\n",
ntohs(msg->op), cm->state,
ntohs(msg->sport), ntohl(msg->sqpn));
@@ -349,24 +471,20 @@ static void ucm_process_recv(ib_hca_transport_t *tp,
}
}
-/* Find matching CM object for this receive message, return CM reference */
+/* Find matching CM object for this receive message, return CM reference; drops duplicate REQs, rejects unmatched REQs */
dp_ib_cm_handle_t ucm_cm_find(ib_hca_transport_t *tp, ib_cm_msg_t *msg)
{
dp_ib_cm_handle_t cm, next, found = NULL;
struct dapl_llist_entry *list;
- DAPL_OS_LOCK lock;
+ DAPL_OS_LOCK *lock; /* the transport's lock itself, never a copy */
+ int listenq = 0;
- /* connect request - listen list, otherwise conn list */
- if (ntohs(msg->op) == DCM_REQ) {
- dapl_dbg_log(DAPL_DBG_TYPE_CM," search - listenQ\n");
- list = tp->llist;
- lock = tp->llock;
- } else {
- dapl_dbg_log(DAPL_DBG_TYPE_CM," search - connectQ\n");
- list = tp->list;
- lock = tp->lock;
- }
+ /* conn list first, duplicate requests for DCM_REQ */
+ lock = &tp->lock;
+retry_listenq:
- dapl_os_lock(&lock);
+ dapl_os_lock(lock);
+ /* read the list head only while holding its lock */
+ list = listenq ? tp->llist : tp->list;
if (!dapl_llist_is_empty(&list))
next = dapl_llist_peek_head(&list);
@@ -380,46 +498,52 @@ dp_ib_cm_handle_t ucm_cm_find(ib_hca_transport_t *tp, ib_cm_msg_t *msg)
if (cm->state == DCM_DESTROY)
continue;
- dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " MATCH? cm %p st %s sport %d sqpn %x lid %x\n",
- cm, dapl_cm_state_str(cm->state),
- ntohs(cm->msg.sport), ntohl(cm->msg.sqpn),
- ntohs(cm->msg.saddr.ib.lid));
-
- dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " src port %d=%d, sqp %x=%x slid %x=%x, iqp %x=%x\n",
- ntohs(cm->msg.sport), ntohs(msg->dport),
- ntohl(cm->msg.sqpn), ntohl(msg->dqpn),
- ntohs(cm->msg.saddr.ib.lid),
- ntohs(msg->daddr.ib.lid),
- ntohl(cm->msg.saddr.ib.qpn),
- ntohl(msg->daddr.ib.qpn));
- dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " dst port %d=%d, sqp %x=%x slid %x=%x, iqp %x=%x\n",
- ntohs(cm->msg.dport), ntohs(msg->sport),
- ntohl(cm->msg.dqpn), ntohl(msg->sqpn),
- ntohs(cm->msg.daddr.ib.lid),
- ntohs(msg->saddr.ib.lid),
- ntohl(cm->msg.daddr.ib.qpn),
- ntohl(msg->saddr.ib.qpn));
-
- /* REQ: CM sPORT + QPN, match is good enough */
- if ((cm->msg.sport == msg->dport) &&
- (cm->msg.sqpn == msg->dqpn)) {
- if (ntohs(msg->op) == DCM_REQ) {
- found = cm;
- break;
- /* NOT REQ: add remote CM sPORT, QPN, LID match */
- } else if ((cm->msg.dport == msg->sport) &&
- (cm->msg.dqpn == msg->sqpn) &&
- (cm->msg.daddr.ib.lid ==
- msg->saddr.ib.lid)) {
+ /* CM sPORT + QPN, match is good enough for listenq */
+ if (listenq &&
+ cm->msg.sport == msg->dport &&
+ cm->msg.sqpn == msg->dqpn) {
+ found = cm;
+ break;
+ }
+ /* connectq, check src and dst, check duplicate conn_reqs */
+ if (!listenq &&
+ cm->msg.sport == msg->dport && cm->msg.sqpn == msg->dqpn &&
+ cm->msg.dport == msg->sport && cm->msg.dqpn == msg->sqpn &&
+ cm->msg.daddr.ib.lid == msg->saddr.ib.lid) {
+ if (ntohs(msg->op) != DCM_REQ) {
found = cm;
- break;
+ break;
+ } else {
+ /* duplicate; log while cm is still pinned by the list lock, then throw away */
+ dapl_log(DAPL_DBG_TYPE_CM,
+ " duplicate: op %s st %s [lid, port, qpn]:"
+ " 0x%x %d 0x%x <- 0x%x %d 0x%x\n",
+ dapl_cm_op_str(ntohs(msg->op)),
+ dapl_cm_state_str(cm->state),
+ ntohs(msg->daddr.ib.lid),
+ ntohs(msg->dport),
+ ntohl(msg->daddr.ib.qpn),
+ ntohs(msg->saddr.ib.lid),
+ ntohs(msg->sport),
+ ntohl(msg->saddr.ib.qpn));
+ dapl_os_unlock(lock);
+ return NULL;
}
}
}
- dapl_os_unlock(&lock);
+ dapl_os_unlock(lock);
+
+ /* no duplicate request on connq, check listenq for new request */
+ if (ntohs(msg->op) == DCM_REQ && !listenq && !found) {
+ listenq = 1;
+ lock = &tp->llock;
+ goto retry_listenq;
+ }
+
+ /* not match on listenq for valid request, send reject */
+ if (ntohs(msg->op) == DCM_REQ && !found)
+ ucm_reject(tp, msg);
+
return found;
}
@@ -467,12 +591,16 @@ retry:
continue;
}
if (!(cm = ucm_cm_find(tp, msg))) {
- dapl_log(DAPL_DBG_TYPE_CM,
- " ucm_recv: NO MATCH op %d port %d cqp %x\n",
- ntohs(msg->op), ntohs(msg->dport),
- ntohl(msg->dqpn));
- if (ntohs(msg->op) == DCM_REQ)
- ucm_reject(tp, msg);
+ dapl_log(DAPL_DBG_TYPE_WARN,
+ " ucm_recv: NO MATCH op %s 0x%x %d i0x%x c0x%x"
+ " < 0x%x %d 0x%x\n",
+ dapl_cm_op_str(ntohs(msg->op)),
+ ntohs(msg->daddr.ib.lid), ntohs(msg->dport),
+ ntohl(msg->daddr.ib.qpn),
+ ntohl(msg->sqpn),
+ ntohs(msg->saddr.ib.lid), ntohs(msg->sport),
+ ntohl(msg->saddr.ib.qpn));
+
ucm_post_rmsg(tp, msg);
continue;
}
@@ -485,7 +613,6 @@ retry:
/* finished this batch of WC's, poll and rearm */
goto retry;
-
}
/* ACTIVE/PASSIVE: build and send CM message out of CM object */
@@ -504,8 +631,10 @@ static int ucm_send(ib_hca_transport_t *tp, ib_cm_msg_t *msg, DAT_PVOID p_data,
len = (sizeof(*msg) - DCM_MAX_PDATA_SIZE);
dapl_os_memcpy(smsg, msg, len);
- if (p_size)
+ if (p_size) {
+ smsg->p_size = htons(p_size); /* host to network (wire) order */
dapl_os_memcpy(&smsg->p_data, p_data, p_size);
+ }
wr.next = NULL;
wr.sg_list = &sge;
@@ -634,8 +763,10 @@ void dapls_ib_cm_free(dp_ib_cm_handle_t cm, DAPL_EP *ep)
dapl_dbg_log(DAPL_DBG_TYPE_CM,
" cm_destroy: cm %p ep %p\n", cm, ep);
- if (!cm && ep)
- return (ucm_ud_free(ep));
+ if (!cm && ep) {
+ ucm_ud_free(ep);
+ return;
+ }
dapl_os_lock(&cm->lock);
@@ -669,7 +800,7 @@ void dapls_ib_cm_free(dp_ib_cm_handle_t cm, DAPL_EP *ep)
send(cm->hca->ib_trans.scm[1], "w", sizeof "w", 0);
}
-/* ACTIVE/PASSIVE: queue up connection object on CM list */
+/* ACTIVE/PASSIVE: queue up connection object on CM list, wakeup thread */
static void ucm_queue_conn(dp_ib_cm_handle_t cm)
{
/* add to work queue, list, for cm thread processing */
@@ -678,6 +809,7 @@ static void ucm_queue_conn(dp_ib_cm_handle_t cm)
dapl_llist_add_tail(&cm->hca->ib_trans.list,
(DAPL_LLIST_ENTRY *)&cm->entry, cm);
dapl_os_unlock(&cm->hca->ib_trans.lock);
+ send(cm->hca->ib_trans.scm[1], "w", sizeof "w", 0);
}
/* PASSIVE: queue up listen object on listen list */
@@ -698,54 +830,68 @@ static void ucm_dequeue_listen(dp_ib_cm_handle_t cm) {
dapl_os_unlock(&cm->hca->ib_trans.llock);
}
+static void ucm_disconnect_final(dp_ib_cm_handle_t cm)
+{
+ /* transition to DISCONNECTED exactly once, never resurrect a DESTROY'd CM */
+ dapl_os_lock(&cm->lock);
+ if (cm->state == DCM_DISCONNECTED || cm->state == DCM_DESTROY) {
+ dapl_os_unlock(&cm->lock);
+ return;
+ }
+ cm->state = DCM_DISCONNECTED; /* UD too: stops the cm_thread DREQ retry timer */
+ dapl_os_unlock(&cm->lock);
+
+ /* disconnect events for RC's only; no EP attachment, nothing to report */
+ if (cm->ep == NULL ||
+ cm->ep->param.ep_attr.service_type != DAT_SERVICE_TYPE_RC)
+ return;
+
+ if (cm->sp)
+ dapls_cr_callback(cm, IB_CME_DISCONNECTED, NULL, cm->sp);
+ else
+ dapl_evd_connection_callback(cm, IB_CME_DISCONNECTED, NULL, cm->ep);
+}
+
/*
- * ACTIVE/PASSIVE: called from CR thread or consumer via ep_disconnect
- * or from ep_free
+ * called from consumer thread via ep_disconnect/ep_free or
+ * from cm_thread when receiving DREQ or on DREQ timeout
*/
DAT_RETURN dapli_cm_disconnect(dp_ib_cm_handle_t cm)
{
- DAPL_EP *ep = cm->ep;
-
- if (ep == NULL)
- return DAT_SUCCESS;
+ int finalize = 1;
dapl_os_lock(&cm->lock);
- if ((cm->state == DCM_INIT) ||
- (cm->state == DCM_DISC_PENDING) ||
- (cm->state == DCM_DISCONNECTED) ||
- (cm->state == DCM_DESTROY)) {
+ switch (cm->state) {
+ case DCM_CONNECTED:
+ /* send DREQ, event after DREP or DREQ timeout */
+ cm->state = DCM_DISC_PENDING;
+ cm->msg.op = htons(DCM_DREQ);
+ cm->retries = 1;
+ finalize = 0; /* wait for DREP, wakeup timer thread */
+ send(cm->hca->ib_trans.scm[1], "w", sizeof "w", 0);
+ break;
+ case DCM_DISC_PENDING:
+ /* DREQ timeout: resend, finalize only once retries are exhausted */
+ cm->msg.op = htons(DCM_DREQ);
+ if (cm->retries++ < cm->hca->ib_trans.retries)
+ finalize = 0; /* keep waiting for DREP */
+ break;
+ case DCM_DISC_RECV:
+ /* DREQ received, send DREP and schedule event */
+ cm->msg.op = htons(DCM_DREP);
+ break;
+ default:
dapl_os_unlock(&cm->lock);
return DAT_SUCCESS;
- } else {
- /* send disc, schedule destroy */
- cm->msg.op = htons(DCM_DREQ);
- if (ucm_send(&cm->hca->ib_trans, &cm->msg, NULL, 0)) {
- dapl_log(DAPL_DBG_TYPE_WARN,
- " disc_req: ERR-> %s lid %d qpn %d"
- " r_psp %d \n", strerror(errno),
- htons(cm->msg.saddr.ib.lid),
- htonl(cm->msg.saddr.ib.qpn),
- htons(cm->msg.sport));
- }
- cm->state = DCM_DISC_PENDING;
}
+
+ dapl_os_get_time(&cm->timer); /* reply expected */
+ ucm_send(&cm->hca->ib_trans, &cm->msg, NULL, 0);
dapl_os_unlock(&cm->lock);
- /* disconnect events for RC's only */
- if (ep->param.ep_attr.service_type == DAT_SERVICE_TYPE_RC) {
- if (ep->cr_ptr) {
- dapls_cr_callback(cm,
- IB_CME_DISCONNECTED,
- NULL,
- ((DAPL_CR *)ep->cr_ptr)->sp_ptr);
- } else {
- dapl_evd_connection_callback(ep->cm_handle,
- IB_CME_DISCONNECTED,
- NULL, ep);
- }
- }
-
- /* scheduled destroy via disconnect clean in callback */
+ if (finalize)
+ ucm_disconnect_final(cm);
+
return DAT_SUCCESS;
}
@@ -765,43 +911,55 @@ dapli_cm_connect(DAPL_EP *ep, dp_ib_cm_handle_t cm)
htons(cm->msg.dport));
dapl_os_lock(&cm->lock);
- if (cm->state == DCM_INIT)
- cm->state = DCM_CONN_PENDING;
- else if (++cm->retries == DCM_RETRY_CNT) {
+ if (cm->state != DCM_REP_PENDING) {
+ dapl_os_unlock(&cm->lock);
+ return DAT_INVALID_STATE;
+ }
+
+ if (cm->retries++ == cm->hca->ib_trans.retries) {
dapl_log(DAPL_DBG_TYPE_WARN,
- " connect: RETRIES EXHAUSTED -> lid %d qpn %d r_psp"
- " %d p_sz=%d\n",
- strerror(errno), htons(cm->msg.daddr.ib.lid),
- htonl(cm->msg.dqpn), htons(cm->msg.dport),
- htons(cm->msg.p_size));
+ " CM_REQ: RETRIES EXHAUSTED:"
+ " 0x%x %d 0x%x -> 0x%x %d 0x%x\n",
+ ntohs(cm->msg.saddr.ib.lid),
+ ntohs(cm->msg.sport),
+ ntohl(cm->msg.saddr.ib.qpn),
+ ntohs(cm->msg.daddr.ib.lid),
+ ntohs(cm->msg.dport),
+ ntohl(cm->msg.dqpn));
- /* update ep->cm reference so we get cleaned up on callback */
- if (cm->msg.saddr.ib.qp_type == IBV_QPT_RC);
- ep->cm_handle = cm;
+ /* update ep->cm reference so we get cleaned up on callback;
+ * applies to both RC and UD endpoints */
+ ep->cm_handle = cm;
dapl_os_unlock(&cm->lock);
+
+#ifdef DAPL_COUNTERS
+ if (g_dapl_dbg_type & DAPL_DBG_TYPE_CM_LIST)
+ dapls_print_cm_list(ep->header.owner_ia);
+#endif
dapl_evd_connection_callback(cm,
IB_CME_DESTINATION_UNREACHABLE,
NULL, ep);
-
+
return DAT_ERROR(DAT_INVALID_ADDRESS,
DAT_INVALID_ADDRESS_UNREACHABLE);
}
+ dapl_os_get_time(&cm->timer); /* reply expected; set under cm->lock, cm_thread reads it */
dapl_os_unlock(&cm->lock);
cm->msg.op = htons(DCM_REQ);
if (ucm_send(&cm->hca->ib_trans, &cm->msg,
&cm->msg.p_data, ntohs(cm->msg.p_size)))
goto bail;
/* first time through, put on work queue */
- if (!cm->retries)
+ if (cm->retries == 1)
ucm_queue_conn(cm);
-
+
return DAT_SUCCESS;
bail:
- dapl_log(DAPL_DBG_TYPE_ERR,
+ dapl_log(DAPL_DBG_TYPE_WARN,
" connect: ERR %s -> cm_lid %d cm_qpn %d r_psp %d p_sz=%d\n",
strerror(errno), htons(cm->msg.daddr.ib.lid),
htonl(cm->msg.dqpn), htons(cm->msg.dport),
@@ -821,7 +979,7 @@ static void ucm_connect_rtu(dp_ib_cm_handle_t cm, ib_cm_msg_t *msg)
ib_cm_events_t event = IB_CME_CONNECTED;
dapl_os_lock(&cm->lock);
- if (cm->state != DCM_CONN_PENDING) {
+ if (cm->state != DCM_REP_PENDING) {
dapl_log(DAPL_DBG_TYPE_WARN,
" CONN_RTU: UNEXPECTED state:"
" op %d, st %s <- lid %d sqpn %d sport %d\n",
@@ -831,7 +989,6 @@ static void ucm_connect_rtu(dp_ib_cm_handle_t cm, ib_cm_msg_t *msg)
dapl_os_unlock(&cm->lock);
return;
}
- dapl_os_unlock(&cm->lock);
/* save remote address information to EP and CM */
dapl_os_memcpy(&ep->remote_ia_address,
@@ -871,10 +1028,9 @@ static void ucm_connect_rtu(dp_ib_cm_handle_t cm, ib_cm_msg_t *msg)
event = IB_CME_DESTINATION_REJECT;
if (event != IB_CME_CONNECTED) {
- dapl_log(DAPL_DBG_TYPE_CM,
- " CONN_RTU: REJ op=%d <- lid %x, iqp %x, psp %d\n",
- ntohs(msg->op), ntohs(msg->saddr.ib.lid),
- ntohl(msg->saddr.ib.qpn), ntohs(msg->sport));
+ cm->state = DCM_REJECTED;
+ dapl_os_unlock(&cm->lock);
+
#ifdef DAT_EXTENSIONS
if (cm->msg.daddr.ib.qp_type == IBV_QPT_UD)
goto ud_bail;
@@ -882,6 +1038,7 @@ static void ucm_connect_rtu(dp_ib_cm_handle_t cm, ib_cm_msg_t *msg)
#endif
goto bail;
}
+ dapl_os_unlock(&cm->lock);
/* modify QP to RTR and then to RTS with remote info */
dapl_os_lock(&cm->ep->header.lock);
@@ -971,8 +1128,10 @@ ud_bail:
if (event == IB_CME_CONNECTED)
event = DAT_IB_UD_CONNECTION_EVENT_ESTABLISHED;
- else
+ else {
+ xevent.type = DAT_IB_UD_CONNECT_REJECT;
event = DAT_IB_UD_CONNECTION_REJECT_EVENT;
+ }
dapls_evd_post_connection_event_ext(
(DAPL_EVD *)cm->ep->param.connect_evd_handle,
@@ -996,11 +1155,12 @@ ud_bail:
cm->msg.p_data, cm->ep);
}
return;
-
bail:
if (cm->msg.saddr.ib.qp_type != IBV_QPT_UD)
dapls_ib_reinit_ep(cm->ep); /* reset QP state */
+
dapl_evd_connection_callback(NULL, event, cm->msg.p_data, cm->ep);
+ dapls_ib_cm_free(cm, NULL);
}
/*
@@ -1022,7 +1182,7 @@ static void ucm_accept(ib_cm_srvc_handle_t cm, ib_cm_msg_t *msg)
/* dest CM info from CR msg, source CM info from listen */
acm->sp = cm->sp;
acm->hca = cm->hca;
- acm->state = DCM_ACCEPTING;
+ acm->retries = 1;
acm->msg.dport = msg->sport;
acm->msg.dqpn = msg->sqpn;
acm->msg.sport = cm->msg.sport;
@@ -1049,7 +1209,7 @@ static void ucm_accept(ib_cm_srvc_handle_t cm, ib_cm_msg_t *msg)
dapl_os_memcpy(acm->msg.p_data,
msg->p_data, ntohs(msg->p_size));
- acm->state = DCM_ACCEPTING_DATA;
+ acm->state = DCM_ACCEPTING;
ucm_queue_conn(acm);
#ifdef DAT_EXTENSIONS
@@ -1086,7 +1246,7 @@ bail:
static void ucm_accept_rtu(dp_ib_cm_handle_t cm, ib_cm_msg_t *msg)
{
dapl_os_lock(&cm->lock);
- if ((ntohs(msg->op) != DCM_RTU) || (cm->state != DCM_ACCEPTED)) {
+ if ((ntohs(msg->op) != DCM_RTU) || (cm->state != DCM_RTU_PENDING)) {
dapl_log(DAPL_DBG_TYPE_WARN,
" accept_rtu: UNEXPECTED op, state:"
" op %d, st %s <- lid %x iqp %x sport %d\n",
@@ -1168,6 +1328,59 @@ bail:
}
/*
+ * PASSIVE: send REP with pdata (also resent by cm_thread on RTU timeout); returns 0 if sent, -1 on bad state, retries exhausted or send error
+ */
+static int ucm_reply(dp_ib_cm_handle_t cm)
+{
+ dapl_os_lock(&cm->lock);
+ if (cm->state != DCM_RTU_PENDING) {
+ dapl_os_unlock(&cm->lock);
+ return -1;
+ }
+ /* every call (first send or resend) consumes one retry; ucm_accept presets retries = 1 */
+ if (++cm->retries == cm->hca->ib_trans.retries) {
+ dapl_log(DAPL_DBG_TYPE_WARN,
+ " CM_REP: RETRIES EXHAUSTED"
+ " 0x%x %d 0x%x -> 0x%x %d 0x%x\n",
+ htons(cm->msg.saddr.ib.lid),
+ htons(cm->msg.sport),
+ htonl(cm->msg.saddr.ib.qpn),
+ htons(cm->msg.daddr.ib.lid),
+ htons(cm->msg.dport),
+ htonl(cm->msg.daddr.ib.qpn));
+
+ dapl_os_unlock(&cm->lock);
+#ifdef DAT_EXTENSIONS
+ if (cm->msg.saddr.ib.qp_type == IBV_QPT_UD) {
+ DAT_IB_EXTENSION_EVENT_DATA xevent;
+
+ /* post UD connection ERROR event with CONN_REQ p_data */
+ xevent.status = 0;
+ xevent.type = DAT_IB_UD_CONNECT_ERROR;
+
+ dapls_evd_post_connection_event_ext(
+ (DAPL_EVD *)cm->ep->param.connect_evd_handle,
+ DAT_IB_UD_CONNECTION_ERROR_EVENT,
+ (DAT_EP_HANDLE)cm->ep,
+ (DAT_COUNT)ntohs(cm->msg.p_size),
+ (DAT_PVOID *)cm->msg.p_data,
+ (DAT_PVOID *)&xevent);
+ } else
+#endif
+ dapls_cr_callback(cm, IB_CME_LOCAL_FAILURE,
+ NULL, cm->sp);
+ return -1; /* NOTE(review): state stays RTU_PENDING; unless the callback tears cm down, the cm_thread timer keeps resending REP - confirm */
+ }
+ dapl_os_get_time(&cm->timer); /* RTU expected */
+ dapl_os_unlock(&cm->lock);
+ if (ucm_send(&cm->hca->ib_trans, &cm->msg, cm->p_data, cm->p_size)) /* REP pdata comes from cm->p_data/p_size */
+ return -1;
+
+ return 0;
+}
+
+
+/*
* PASSIVE: consumer accept, send local QP information, private data,
* queue on work thread to receive RTU information to avoid blocking
* user thread.
@@ -1182,7 +1395,7 @@ dapli_accept_usr(DAPL_EP *ep, DAPL_CR *cr, DAT_COUNT p_size, DAT_PVOID p_data)
return DAT_LENGTH_ERROR;
dapl_os_lock(&cm->lock);
- if (cm->state != DCM_ACCEPTING_DATA) {
+ if (cm->state != DCM_ACCEPTING) {
dapl_os_unlock(&cm->lock);
return DAT_INVALID_STATE;
}
@@ -1193,7 +1406,8 @@ dapli_accept_usr(DAPL_EP *ep, DAPL_CR *cr, DAT_COUNT p_size, DAT_PVOID p_data)
" iqp=%x qp_type %d, psize=%d\n",
ntohs(cm->msg.daddr.ib.lid),
ntohl(cm->msg.daddr.ib.qpn), cm->msg.daddr.ib.qp_type,
- ntohs(cm->msg.p_size));
+ p_size);
+
dapl_dbg_log(DAPL_DBG_TYPE_CM,
" ACCEPT_USR: remote GID subnet %016llx id %016llx\n",
(unsigned long long)
@@ -1204,7 +1418,7 @@ dapli_accept_usr(DAPL_EP *ep, DAPL_CR *cr, DAT_COUNT p_size, DAT_PVOID p_data)
#ifdef DAT_EXTENSIONS
if (cm->msg.daddr.ib.qp_type == IBV_QPT_UD &&
ep->qp_handle->qp_type != IBV_QPT_UD) {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ dapl_log(DAPL_DBG_TYPE_ERR,
" ACCEPT_USR: ERR remote QP is UD,"
", but local QP is not\n");
return (DAT_INVALID_HANDLE | DAT_INVALID_HANDLE_EP);
@@ -1262,18 +1476,23 @@ dapli_accept_usr(DAPL_EP *ep, DAPL_CR *cr, DAT_COUNT p_size, DAT_PVOID p_data)
dapl_os_lock(&cm->lock);
cm->ep = ep;
cm->hca = ia->hca_ptr;
- cm->state = DCM_ACCEPTED;
+ cm->state = DCM_RTU_PENDING;
+ dapl_os_get_time(&cm->timer); /* RTU expected */
dapl_os_unlock(&cm->lock);
- if (ucm_send(&cm->hca->ib_trans, &cm->msg, p_data, p_size))
+ if (ucm_reply(cm))
goto bail;
dapl_dbg_log(DAPL_DBG_TYPE_CM, " PASSIVE: accepted!\n");
- return DAT_SUCCESS;
+
+ /* Timed RTU, wakeup thread */
+ send(cm->hca->ib_trans.scm[1], "w", sizeof "w", 0);
+ return DAT_SUCCESS;
bail:
if (cm->msg.saddr.ib.qp_type != IBV_QPT_UD)
dapls_ib_reinit_ep(ep);
+
dapls_ib_cm_free(cm, ep);
return DAT_INTERNAL_ERROR;
}
@@ -1326,6 +1545,8 @@ dapls_ib_connect(IN DAT_EP_HANDLE ep_handle,
cm->msg.p_size = htons(p_size);
dapl_os_memcpy(&cm->msg.p_data, p_data, p_size);
}
+
+ cm->state = DCM_REP_PENDING;
/* build connect request, send to remote CM based on r_addr info */
return(dapli_cm_connect(ep, cm));
@@ -1571,7 +1792,6 @@ dapls_ib_reject_connection(IN dp_ib_cm_handle_t cm,
IN int reason,
IN DAT_COUNT psize, IN const DAT_PVOID pdata)
{
-
dapl_dbg_log(DAPL_DBG_TYPE_EP,
" reject(cm %p reason %x, pdata %p, psize %d)\n",
cm, reason, pdata, psize);
@@ -1579,22 +1799,25 @@ dapls_ib_reject_connection(IN dp_ib_cm_handle_t cm,
if (psize > DCM_MAX_PDATA_SIZE)
return DAT_LENGTH_ERROR;
- cm->msg.op = htons(DCM_REJ_USER);
- if (psize)
- dapl_os_memcpy(&cm->msg.p_data, pdata, psize);
-
- /* cr_thread will destroy CR */
+ /* update saddr lid, gid info for the reject message; cm->lock held across send */
dapl_os_lock(&cm->lock);
- cm->state = DCM_REJECTING;
- dapl_os_unlock(&cm->lock);
-
- if (ucm_send(&cm->hca->ib_trans, &cm->msg, NULL, 0)) {
+ cm->state = DCM_REJECTED;
+ cm->msg.saddr.ib.lid = cm->hca->ib_trans.addr.ib.lid;
+ dapl_os_memcpy(&cm->msg.saddr.ib.gid[0],
+ &cm->hca->ib_trans.addr.ib.gid, 16);
+ cm->msg.op = htons(DCM_REJ_USER);
+
+ if (ucm_send(&cm->hca->ib_trans, &cm->msg, pdata, psize)) {
dapl_log(DAPL_DBG_TYPE_WARN,
" cm_reject: ERR: %s\n", strerror(errno));
+ dapl_os_unlock(&cm->lock); /* don't leak the lock on the error path */
return DAT_INTERNAL_ERROR;
}
+ dapl_os_unlock(&cm->lock);
- send(cm->hca->ib_trans.scm[1], "w", sizeof "w", 0);
+ /* cleanup and destroy CM resources */
+ dapls_ib_cm_free(cm, NULL);
+
return DAT_SUCCESS;
}
@@ -1786,6 +2008,7 @@ void cm_thread(void *arg)
dp_ib_cm_handle_t cm, next;
struct dapl_fd_set *set;
char rbuf[2];
+ int time_ms;
dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " cm_thread: ENTER hca %p\n", hca);
set = dapl_alloc_fd_set();
@@ -1796,11 +2019,12 @@ void cm_thread(void *arg)
hca->ib_trans.cm_state = IB_THREAD_RUN;
while (1) {
+ time_ms = -1; /* reset to blocking */
dapl_fd_zero(set);
dapl_fd_set(hca->ib_trans.scm[0], set, DAPL_FD_READ);
dapl_fd_set(hca->ib_hca_handle->async_fd, set, DAPL_FD_READ);
dapl_fd_set(hca->ib_trans.rch->fd, set, DAPL_FD_READ);
-
+
if (!dapl_llist_is_empty(&hca->ib_trans.list))
next = dapl_llist_peek_head(&hca->ib_trans.list);
else
@@ -1811,18 +2035,18 @@ void cm_thread(void *arg)
next = dapl_llist_next_entry(
&hca->ib_trans.list,
(DAPL_LLIST_ENTRY *)&cm->entry);
-
+ dapl_os_lock(&cm->lock);
if (cm->state == DCM_DESTROY ||
hca->ib_trans.cm_state != IB_THREAD_RUN) {
dapl_llist_remove_entry(
&hca->ib_trans.list,
(DAPL_LLIST_ENTRY *)&cm->entry);
+ dapl_os_unlock(&cm->lock);
dapl_os_free(cm, sizeof(*cm));
continue;
}
-
- /* TODO: Check and process retries here */
-
+ dapl_os_unlock(&cm->lock);
+ ucm_check_timers(cm, &time_ms);
continue;
}
@@ -1832,9 +2056,7 @@ void cm_thread(void *arg)
break;
dapl_os_unlock(&hca->ib_trans.lock);
- dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " cm_thread: select sleep\n");
- dapl_select(set);
- dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " cm_thread: select wake\n");
+ dapl_select(set, time_ms);
/* Process events: CM, ASYNC, NOTIFY THREAD */
if (dapl_poll(hca->ib_trans.rch->fd,
@@ -1870,38 +2092,66 @@ out:
/* Debug aid: List all Connections in process and state */
void dapls_print_cm_list(IN DAPL_IA *ia_ptr)
{
- /* Print in process CR's for this IA, if debug type set */
+ /* Print in process CM's for this IA, if debug type set */
int i = 0;
- dp_ib_cm_handle_t cr, next_cr;
+ dp_ib_cm_handle_t cm, next_cm;
+ struct dapl_llist_entry **list;
+ DAPL_OS_LOCK *lock;
+ /* use the transport's list heads and locks by address: locking a copy excludes no one */
+ /* LISTEN LIST */
+ list = &ia_ptr->hca_ptr->ib_trans.llist;
+ lock = &ia_ptr->hca_ptr->ib_trans.llock;
- dapl_os_lock(&ia_ptr->hca_ptr->ib_trans.lock);
- if (!dapl_llist_is_empty((DAPL_LLIST_HEAD*)
- &ia_ptr->hca_ptr->ib_trans.list))
- next_cr = dapl_llist_peek_head((DAPL_LLIST_HEAD*)
- &ia_ptr->hca_ptr->ib_trans.list);
+ dapl_os_lock(lock);
+ if (!dapl_llist_is_empty((DAPL_LLIST_HEAD*)list))
+ next_cm = dapl_llist_peek_head((DAPL_LLIST_HEAD*)list);
else
- next_cr = NULL;
-
- printf("\n DAPL IA CONNECTIONS IN PROCESS:\n");
- while (next_cr) {
- cr = next_cr;
- next_cr = dapl_llist_next_entry((DAPL_LLIST_HEAD*)
- &ia_ptr->hca_ptr->ib_trans.list,
- (DAPL_LLIST_ENTRY*)&cr->entry);
-
- printf( " CONN[%d]: sp %p ep %p %s %s %s"
- " dst lid %x iqp %x port %d\n",
- i, cr->sp, cr->ep,
- cr->msg.saddr.ib.qp_type == IBV_QPT_RC ? "RC" : "UD",
- dapl_cm_state_str(cr->state),
- cr->sp ? "<-" : "->",
- ntohs(cr->msg.daddr.ib.lid),
- ntohl(cr->msg.daddr.ib.qpn),
- cr->sp ?
- (int)cr->sp->conn_qual : ntohs(cr->msg.dport) );
+ next_cm = NULL;
+
+ printf("\n DAPL IA LISTEN/CONNECTIONS IN PROCESS:\n");
+ while (next_cm) {
+ cm = next_cm;
+ next_cm = dapl_llist_next_entry((DAPL_LLIST_HEAD*)list,
+ (DAPL_LLIST_ENTRY*)&cm->entry);
+
+ printf( " LISTEN[%d]: sp %p %s uCM_QP: 0x%x %d 0x%x\n",
+ i, cm->sp, dapl_cm_state_str(cm->state),
+ ntohs(cm->msg.saddr.ib.lid), ntohs(cm->msg.sport),
+ ntohl(cm->msg.sqpn));
+ i++;
+ }
+ dapl_os_unlock(lock);
+
+ /* CONNECTION LIST */
+ list = &ia_ptr->hca_ptr->ib_trans.list;
+ lock = &ia_ptr->hca_ptr->ib_trans.lock;
+
+ dapl_os_lock(lock);
+ if (!dapl_llist_is_empty((DAPL_LLIST_HEAD*)list))
+ next_cm = dapl_llist_peek_head((DAPL_LLIST_HEAD*)list);
+ else
+ next_cm = NULL;
+
+ while (next_cm) {
+ cm = next_cm;
+ next_cm = dapl_llist_next_entry((DAPL_LLIST_HEAD*)list,
+ (DAPL_LLIST_ENTRY*)&cm->entry);
+
+ printf( " CONN[%d]: ep %p cm %p %s %s"
+ " 0x%x %d 0x%x %s 0x%x %d 0x%x\n",
+ i, cm->ep, cm,
+ cm->msg.saddr.ib.qp_type == IBV_QPT_RC ? "RC" : "UD",
+ dapl_cm_state_str(cm->state),
+ ntohs(cm->msg.saddr.ib.lid),
+ ntohs(cm->msg.sport),
+ ntohl(cm->msg.saddr.ib.qpn),
+ cm->sp ? "<-" : "->",
+ ntohs(cm->msg.daddr.ib.lid),
+ ntohs(cm->msg.dport),
+ ntohl(cm->msg.daddr.ib.qpn));
i++;
}
printf("\n");
- dapl_os_unlock(&ia_ptr->hca_ptr->ib_trans.lock);
+ dapl_os_unlock(lock);
}
#endif
diff --git a/dapl/openib_ucm/dapl_ib_util.h b/dapl/openib_ucm/dapl_ib_util.h
index ef5358a..53f00bb 100644
--- a/dapl/openib_ucm/dapl_ib_util.h
+++ b/dapl/openib_ucm/dapl_ib_util.h
@@ -33,13 +33,11 @@
#include "openib_osd.h"
#include "dapl_ib_common.h"
-#define UCM_DEFAULT_CQE 500
-#define UCM_DEFAULT_QPE 500
-
struct ib_cm_handle
{
struct dapl_llist_entry entry;
DAPL_OS_LOCK lock;
+ DAPL_OS_TIMEVAL timer;
int state;
int retries;
struct dapl_hca *hca;
@@ -92,6 +90,10 @@ typedef struct _ib_hca_transport
DAPL_SOCKET scm[2];
int cqe;
int qpe;
+ int retries;
+ int cm_timer;
+ int rep_time;
+ int rtu_time;
DAPL_OS_LOCK slock;
int s_hd;
int s_tl;
diff --git a/dapl/openib_ucm/device.c b/dapl/openib_ucm/device.c
index b887186..bd31cea 100644
--- a/dapl/openib_ucm/device.c
+++ b/dapl/openib_ucm/device.c
@@ -421,9 +421,13 @@ static int ucm_service_create(IN DAPL_HCA *hca)
dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " ucm_create: \n");
- /* get queue sizes */
- tp->qpe = dapl_os_get_env_val("DAPL_UCM_QPE", UCM_DEFAULT_QPE);
- tp->cqe = dapl_os_get_env_val("DAPL_UCM_CQE", UCM_DEFAULT_CQE);
+ /* setup CM timers and queue sizes */
+ tp->retries = dapl_os_get_env_val("DAPL_UCM_RETRY", DCM_RETRY_CNT);
+ tp->rep_time = dapl_os_get_env_val("DAPL_UCM_REP_TIME", DCM_REP_TIME);
+ tp->rtu_time = dapl_os_get_env_val("DAPL_UCM_RTU_TIME", DCM_RTU_TIME);
+ tp->cm_timer = DAPL_MIN(tp->rep_time,tp->rtu_time);
+ tp->qpe = dapl_os_get_env_val("DAPL_UCM_QP_SIZE", DCM_QP_SIZE);
+ tp->cqe = dapl_os_get_env_val("DAPL_UCM_CQ_SIZE", DCM_CQ_SIZE);
tp->pd = ibv_alloc_pd(hca->ib_hca_handle);
if (!tp->pd)
goto bail;
diff --git a/dat/include/dat2/dat_ib_extensions.h b/dat/include/dat2/dat_ib_extensions.h
index 59df1de..a32a4ed 100755
--- a/dat/include/dat2/dat_ib_extensions.h
+++ b/dat/include/dat2/dat_ib_extensions.h
@@ -71,9 +71,10 @@
* dat_query_counters(), dat_print_counters()
*
* 2.0.4 - Add DAT_IB_UD_CONNECTION_REJECT_EVENT extended UD event
+ * 2.0.5 - Add DAT_IB_UD_CONNECTION_ERROR_EVENT extended UD event
*
*/
-#define DAT_IB_EXTENSION_VERSION 204 /* 2.0.4 */
+#define DAT_IB_EXTENSION_VERSION 205 /* 2.0.5 */
#define DAT_ATTR_COUNTERS "DAT_COUNTERS"
#define DAT_IB_ATTR_FETCH_AND_ADD "DAT_IB_FETCH_AND_ADD"
#define DAT_IB_ATTR_CMP_AND_SWAP "DAT_IB_CMP_AND_SWAP"
@@ -92,7 +93,8 @@ typedef enum dat_ib_event_number
DAT_IB_DTO_EVENT = DAT_IB_EXTENSION_RANGE_BASE,
DAT_IB_UD_CONNECTION_REQUEST_EVENT,
DAT_IB_UD_CONNECTION_EVENT_ESTABLISHED,
- DAT_IB_UD_CONNECTION_REJECT_EVENT
+ DAT_IB_UD_CONNECTION_REJECT_EVENT,
+ DAT_IB_UD_CONNECTION_ERROR_EVENT
} DAT_IB_EVENT_NUMBER;
@@ -129,7 +131,9 @@ typedef enum dat_ib_ext_type
DAT_IB_UD_REMOTE_AH, // 6
DAT_IB_UD_PASSIVE_REMOTE_AH, // 7
DAT_IB_UD_SEND, // 8
- DAT_IB_UD_RECV // 9
+ DAT_IB_UD_RECV, // 9
+ DAT_IB_UD_CONNECT_REJECT, // 10
+ DAT_IB_UD_CONNECT_ERROR, // 11
} DAT_IB_EXT_TYPE;
--
1.5.2.5
More information about the ofw
mailing list