[ofw] [PATCH 2/3] uDAPL v2: cma: improve serialization of destroy and event processing

Davis, Arlin R arlin.r.davis at intel.com
Mon Sep 28 15:08:16 PDT 2009


WinOF testing with slightly different scheduler and verbs
showed some issues with cleanup. Add better protection around
destroy and event processing thread.

Remove destroy flag and add refs counting to conn objects
to block destroy until all references are cleared. Add
locking aroung ref counting and passive and active
event processing.

Signed-off-by: Arlin Davis <arlin.r.davis at intel.com>
---
 dapl/openib_cma/cm.c           |  264 ++++++++++++++++++---------------------
 dapl/openib_cma/dapl_ib_util.h |    2 +-
 dapl/openib_cma/device.c       |   17 ++-
 3 files changed, 133 insertions(+), 150 deletions(-)

diff --git a/dapl/openib_cma/cm.c b/dapl/openib_cma/cm.c
index 545190d..40634b2 100644
--- a/dapl/openib_cma/cm.c
+++ b/dapl/openib_cma/cm.c
@@ -163,6 +163,7 @@ dp_ib_cm_handle_t dapls_ib_cm_create(DAPL_EP *ep)

        dapl_os_memzero(conn, sizeof(*conn));
        dapl_os_lock_init(&conn->lock);
+       conn->refs++;

        /* create CM_ID, bind to local device, create QP */
        if (rdma_create_id(g_cm_events, &cm_id, (void *)conn, RDMA_PS_TCP)) {
@@ -189,46 +190,37 @@ dp_ib_cm_handle_t dapls_ib_cm_create(DAPL_EP *ep)
 }

 /*
- * Called from consumer thread via dat_ep_free().
- * CANNOT be called from the async event processing thread
- * dapli_cma_event_cb() since a cm_id reference is held and
- * a deadlock will occur.
+ * Only called from consumer thread via dat_ep_free()
+ * accept, reject, or connect.
+ * Cannot be called from callback thread.
+ * rdma_destroy_id will block until rdma_get_cm_event is acked.
  */
-
 void dapls_ib_cm_free(dp_ib_cm_handle_t conn, DAPL_EP *ep)
 {
-       struct rdma_cm_id *cm_id;
-
-       if (conn == NULL)
-               return;
-
        dapl_dbg_log(DAPL_DBG_TYPE_CM,
-                    " destroy_conn: conn %p id %d\n", conn, conn->cm_id);
+                    " destroy_conn: conn %p id %d\n",
+                    conn, conn->cm_id);

        dapl_os_lock(&conn->lock);
-       conn->destroy = 1;
+       conn->refs--;
+       dapl_os_unlock(&conn->lock);

-       if (ep != NULL) {
+       /* block until event thread complete */
+       while (conn->refs)
+               dapl_os_sleep_usec(10000);
+
+       if (ep) {
                ep->cm_handle = NULL;
                ep->qp_handle = NULL;
                ep->qp_state = IB_QP_STATE_ERROR;
        }

-       cm_id = conn->cm_id;
-       conn->cm_id = NULL;
-       dapl_os_unlock(&conn->lock);
-
-       /*
-        * rdma_destroy_id will force synchronization with async CM event
-        * thread since it blocks until the in-process event reference
-        * is cleared during our event processing call exit.
-        */
-       if (cm_id) {
-               if (cm_id->qp)
-                       rdma_destroy_qp(cm_id);
-
-               rdma_destroy_id(cm_id);
+       if (conn->cm_id) {
+               if (conn->cm_id->qp)
+                       rdma_destroy_qp(conn->cm_id);
+               rdma_destroy_id(conn->cm_id);
        }
+
        dapl_os_free(conn, sizeof(*conn));
 }

@@ -255,6 +247,7 @@ static struct dapl_cm_id *dapli_req_recv(struct dapl_cm_id *conn,
                event->id->context = new_conn;  /* update CM_ID context */
                new_conn->sp = conn->sp;
                new_conn->hca = conn->hca;
+               new_conn->refs++;

                /* Get requesters connect data, setup for accept */
                new_conn->params.responder_resources =
@@ -308,17 +301,14 @@ static struct dapl_cm_id *dapli_req_recv(struct dapl_cm_id *conn,
 static void dapli_cm_active_cb(struct dapl_cm_id *conn,
                               struct rdma_cm_event *event)
 {
+       DAPL_OS_LOCK *lock = &conn->lock;
+       ib_cm_events_t ib_cm_event;
+       const void *pdata = NULL;
+
        dapl_dbg_log(DAPL_DBG_TYPE_CM,
                     " active_cb: conn %p id %d event %d\n",
                     conn, conn->cm_id, event->event);

-       dapl_os_lock(&conn->lock);
-       if (conn->destroy) {
-               dapl_os_unlock(&conn->lock);
-               return;
-       }
-       dapl_os_unlock(&conn->lock);
-
        /* There is a chance that we can get events after
         * the consumer calls disconnect in a pending state
         * since the IB CM and uDAPL states are not shared.
@@ -340,64 +330,53 @@ static void dapli_cm_active_cb(struct dapl_cm_id *conn,
                conn->ep->param.ep_state = DAT_EP_STATE_DISCONNECTED;

        dapl_os_unlock(&conn->ep->header.lock);
+       dapl_os_lock(lock);

        switch (event->event) {
        case RDMA_CM_EVENT_UNREACHABLE:
        case RDMA_CM_EVENT_CONNECT_ERROR:
-               {
+               dapl_log(DAPL_DBG_TYPE_WARN,
+                        "dapl_cma_active: CONN_ERR event=0x%x"
+                        " status=%d %s DST %s, %d\n",
+                        event->event, event->status,
+                        (event->status == -ETIMEDOUT) ? "TIMEOUT" : "",
+                        inet_ntoa(((struct sockaddr_in *)
+                                   &conn->cm_id->route.addr.dst_addr)->
+                                  sin_addr),
+                        ntohs(((struct sockaddr_in *)
+                               &conn->cm_id->route.addr.dst_addr)->
+                              sin_port));
+
+               /* per DAT SPEC provider always returns UNREACHABLE */
+               ib_cm_event = IB_CME_DESTINATION_UNREACHABLE;
+               break;
+       case RDMA_CM_EVENT_REJECTED:
+               dapl_dbg_log(DAPL_DBG_TYPE_CM,
+                            " dapli_cm_active_handler: REJECTED reason=%d\n",
+                            event->status);
+
+               /* valid REJ from consumer will always contain private data */
+               if (event->status == 28 &&
+                   event->param.conn.private_data_len) {
+                       ib_cm_event = IB_CME_DESTINATION_REJECT_PRIVATE_DATA;
+                       pdata =
+                           (unsigned char *)event->param.conn.
+                           private_data +
+                           sizeof(struct dapl_pdata_hdr);
+               } else {
+                       ib_cm_event = IB_CME_DESTINATION_REJECT;
                        dapl_log(DAPL_DBG_TYPE_WARN,
-                                "dapl_cma_active: CONN_ERR event=0x%x"
-                                " status=%d %s DST %s, %d\n",
-                                event->event, event->status,
-                                (event->status == -ETIMEDOUT) ? "TIMEOUT" : "",
+                                "dapl_cma_active: non-consumer REJ,"
+                                " reason=%d, DST %s, %d\n",
+                                event->status,
                                 inet_ntoa(((struct sockaddr_in *)
-                                           &conn->cm_id->route.addr.dst_addr)->
-                                          sin_addr),
+                                           &conn->cm_id->route.addr.
+                                           dst_addr)->sin_addr),
                                 ntohs(((struct sockaddr_in *)
-                                       &conn->cm_id->route.addr.dst_addr)->
-                                      sin_port));
-
-                       /* per DAT SPEC provider always returns UNREACHABLE */
-                       dapl_evd_connection_callback(conn,
-                                                    IB_CME_DESTINATION_UNREACHABLE,
-                                                    NULL, conn->ep);
-                       break;
-               }
-       case RDMA_CM_EVENT_REJECTED:
-               {
-                       ib_cm_events_t cm_event;
-                       unsigned char *pdata = NULL;
-
-                       dapl_dbg_log(DAPL_DBG_TYPE_CM,
-                                    " dapli_cm_active_handler: REJECTED reason=%d\n",
-                                    event->status);
-
-                       /* valid REJ from consumer will always contain private data */
-                       if (event->status == 28 &&
-                           event->param.conn.private_data_len) {
-                               cm_event =
-                                   IB_CME_DESTINATION_REJECT_PRIVATE_DATA;
-                               pdata =
-                                   (unsigned char *)event->param.conn.
-                                   private_data +
-                                   sizeof(struct dapl_pdata_hdr);
-                       } else {
-                               cm_event = IB_CME_DESTINATION_REJECT;
-                               dapl_log(DAPL_DBG_TYPE_WARN,
-                                        "dapl_cma_active: non-consumer REJ,"
-                                        " reason=%d, DST %s, %d\n",
-                                        event->status,
-                                        inet_ntoa(((struct sockaddr_in *)
-                                                   &conn->cm_id->route.addr.
-                                                   dst_addr)->sin_addr),
-                                        ntohs(((struct sockaddr_in *)
-                                               &conn->cm_id->route.addr.
-                                               dst_addr)->sin_port));
-                       }
-                       dapl_evd_connection_callback(conn, cm_event, pdata,
-                                                    conn->ep);
-                       break;
+                                       &conn->cm_id->route.addr.
+                                       dst_addr)->sin_port));
                }
+               break;
        case RDMA_CM_EVENT_ESTABLISHED:
                dapl_dbg_log(DAPL_DBG_TYPE_CM,
                             " active_cb: cm_id %d PORT %d CONNECTED to %s!\n",
@@ -414,58 +393,51 @@ static void dapli_cm_active_cb(struct dapl_cm_id *conn,
                conn->ep->param.local_port_qual =
                    PORT_TO_SID(rdma_get_src_port(conn->cm_id));

-               dapl_evd_connection_callback(conn, IB_CME_CONNECTED,
-                                            event->param.conn.private_data,
-                                            conn->ep);
+               ib_cm_event = IB_CME_CONNECTED;
+               pdata = event->param.conn.private_data;
                break;
-
        case RDMA_CM_EVENT_DISCONNECTED:
                dapl_dbg_log(DAPL_DBG_TYPE_CM,
                             " active_cb: DISC EVENT - EP %p\n",conn->ep);
                rdma_disconnect(conn->cm_id);   /* required for DREP */
+               ib_cm_event = IB_CME_DISCONNECTED;
                /* validate EP handle */
-               if (!DAPL_BAD_HANDLE(conn->ep, DAPL_MAGIC_EP))
-                       dapl_evd_connection_callback(conn,
-                                                    IB_CME_DISCONNECTED,
-                                                    NULL, conn->ep);
+               if (DAPL_BAD_HANDLE(conn->ep, DAPL_MAGIC_EP))
+                       conn = NULL;
                break;
        default:
                dapl_dbg_log(DAPL_DBG_TYPE_ERR,
                             " dapli_cm_active_cb_handler: Unexpected CM "
                             "event %d on ID 0x%p\n", event->event,
                             conn->cm_id);
+               conn = NULL;
                break;
        }

-       return;
+       dapl_os_unlock(lock);
+       if (conn)
+               dapl_evd_connection_callback(conn, ib_cm_event, pdata, conn->ep);
 }

 static void dapli_cm_passive_cb(struct dapl_cm_id *conn,
                                struct rdma_cm_event *event)
 {
-       struct dapl_cm_id *new_conn;
-
+       ib_cm_events_t ib_cm_event;
+       struct dapl_cm_id *conn_recv = conn;
+       const void *pdata = NULL;
+
        dapl_dbg_log(DAPL_DBG_TYPE_CM,
                     " passive_cb: conn %p id %d event %d\n",
                     conn, event->id, event->event);

        dapl_os_lock(&conn->lock);
-       if (conn->destroy) {
-               dapl_os_unlock(&conn->lock);
-               return;
-       }
-       dapl_os_unlock(&conn->lock);

        switch (event->event) {
        case RDMA_CM_EVENT_CONNECT_REQUEST:
                /* create new conn object with new conn_id from event */
-               new_conn = dapli_req_recv(conn, event);
-
-               if (new_conn)
-                       dapls_cr_callback(new_conn,
-                                         IB_CME_CONNECTION_REQUEST_PENDING,
-                                         event->param.conn.private_data,
-                                         new_conn->sp);
+               conn_recv = dapli_req_recv(conn, event);
+               ib_cm_event = IB_CME_CONNECTION_REQUEST_PENDING;
+               pdata = event->param.conn.private_data;
                break;
        case RDMA_CM_EVENT_UNREACHABLE:
        case RDMA_CM_EVENT_CONNECT_ERROR:
@@ -479,29 +451,22 @@ static void dapli_cm_passive_cb(struct dapl_cm_id *conn,
                                   sin_addr), ntohs(((struct sockaddr_in *)
                                                     &conn->cm_id->route.addr.
                                                     dst_addr)->sin_port));
-
-               dapls_cr_callback(conn, IB_CME_DESTINATION_UNREACHABLE,
-                                 NULL, conn->sp);
+               ib_cm_event = IB_CME_DESTINATION_UNREACHABLE;
                break;
-
        case RDMA_CM_EVENT_REJECTED:
-               {
-                       /* will alwasys be abnormal NON-consumer from active side */
-                       dapl_log(DAPL_DBG_TYPE_WARN,
-                                "dapl_cm_passive: non-consumer REJ, reason=%d,"
-                                " DST %s, %d\n",
-                                event->status,
-                                inet_ntoa(((struct sockaddr_in *)
-                                           &conn->cm_id->route.addr.dst_addr)->
-                                          sin_addr),
-                                ntohs(((struct sockaddr_in *)
-                                       &conn->cm_id->route.addr.dst_addr)->
-                                      sin_port));
-
-                       dapls_cr_callback(conn, IB_CME_DESTINATION_REJECT,
-                                         NULL, conn->sp);
-                       break;
-               }
+               /* will alwasys be abnormal NON-consumer from active side */
+               dapl_log(DAPL_DBG_TYPE_WARN,
+                        "dapl_cm_passive: non-consumer REJ, reason=%d,"
+                        " DST %s, %d\n",
+                        event->status,
+                        inet_ntoa(((struct sockaddr_in *)
+                                   &conn->cm_id->route.addr.dst_addr)->
+                                  sin_addr),
+                        ntohs(((struct sockaddr_in *)
+                               &conn->cm_id->route.addr.dst_addr)->
+                              sin_port));
+               ib_cm_event = IB_CME_DESTINATION_REJECT;
+               break;
        case RDMA_CM_EVENT_ESTABLISHED:
                dapl_dbg_log(DAPL_DBG_TYPE_CM,
                             " passive_cb: cm_id %p PORT %d CONNECTED from 0x%x!\n",
@@ -511,26 +476,27 @@ static void dapli_cm_passive_cb(struct dapl_cm_id *conn,
                             ntohl(((struct sockaddr_in *)
                                    &conn->cm_id->route.addr.dst_addr)->
                                   sin_addr.s_addr));
-
-               dapls_cr_callback(conn, IB_CME_CONNECTED, NULL, conn->sp);
-
+               ib_cm_event = IB_CME_CONNECTED;
                break;
        case RDMA_CM_EVENT_DISCONNECTED:
                rdma_disconnect(conn->cm_id);   /* required for DREP */
+               ib_cm_event = IB_CME_DISCONNECTED;
                /* validate SP handle context */
-               if (!DAPL_BAD_HANDLE(conn->sp, DAPL_MAGIC_PSP) ||
-                   !DAPL_BAD_HANDLE(conn->sp, DAPL_MAGIC_RSP))
-                       dapls_cr_callback(conn,
-                                         IB_CME_DISCONNECTED, NULL, conn->sp);
+               if (DAPL_BAD_HANDLE(conn->sp, DAPL_MAGIC_PSP) &&
+                   DAPL_BAD_HANDLE(conn->sp, DAPL_MAGIC_RSP))
+                       conn_recv = NULL;
                break;
        default:
                dapl_dbg_log(DAPL_DBG_TYPE_ERR, " passive_cb: "
                             "Unexpected CM event %d on ID 0x%p\n",
                             event->event, conn->cm_id);
+               conn_recv = NULL;
                break;
        }

-       return;
+       dapl_os_unlock(&conn->lock);
+       if (conn_recv)
+               dapls_cr_callback(conn_recv, ib_cm_event, pdata, conn_recv->sp);
 }

 /************************ DAPL provider entry points **********************/
@@ -713,6 +679,7 @@ dapls_ib_setup_conn_listener(IN DAPL_IA * ia_ptr,

        dapl_os_memzero(conn, sizeof(*conn));
        dapl_os_lock_init(&conn->lock);
+       conn->refs++;

        /* create CM_ID, bind to local device, create QP */
        if (rdma_create_id
@@ -1196,10 +1163,8 @@ ib_cm_events_t dapls_ib_get_cm_event(IN DAT_EVENT_NUMBER dat_event_num)
 void dapli_cma_event_cb(void)
 {
        struct rdma_cm_event *event;
-
-       dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " cm_event()\n");
-
-       /* process one CM event, fairness */
+
+       /* process one CM event, fairness, non-blocking */
        if (!rdma_get_cm_event(g_cm_events, &event)) {
                struct dapl_cm_id *conn;

@@ -1212,6 +1177,16 @@ void dapli_cma_event_cb(void)
                dapl_dbg_log(DAPL_DBG_TYPE_CM,
                             " cm_event: EVENT=%d ID=%p LID=%p CTX=%p\n",
                             event->event, event->id, event->listen_id, conn);
+
+               /* cm_free is blocked waiting for ack  */
+               dapl_os_lock(&conn->lock);
+               if (!conn->refs) {
+                       dapl_os_unlock(&conn->lock);
+                       rdma_ack_cm_event(event);
+                       return;
+               }
+               conn->refs++;
+               dapl_os_unlock(&conn->lock);

                switch (event->event) {
                case RDMA_CM_EVENT_ADDR_RESOLVED:
@@ -1317,15 +1292,20 @@ void dapli_cma_event_cb(void)
 #endif
                        break;
                default:
-                       dapl_dbg_log(DAPL_DBG_TYPE_WARN,
+                       dapl_dbg_log(DAPL_DBG_TYPE_CM,
                                     " cm_event: UNEXPECTED EVENT=%p ID=%p CTX=%p\n",
                                     event->event, event->id,
                                     event->id->context);
                        break;
                }
+
                /* ack event, unblocks destroy_cm_id in consumer threads */
                rdma_ack_cm_event(event);
-       }
+
+               dapl_os_lock(&conn->lock);
+                conn->refs--;
+               dapl_os_unlock(&conn->lock);
+       }
 }

 /*
diff --git a/dapl/openib_cma/dapl_ib_util.h b/dapl/openib_cma/dapl_ib_util.h
index 35900e7..309db53 100755
--- a/dapl/openib_cma/dapl_ib_util.h
+++ b/dapl/openib_cma/dapl_ib_util.h
@@ -58,7 +58,7 @@

 struct dapl_cm_id {
        DAPL_OS_LOCK                    lock;
-       int                             destroy;
+       int                             refs;
        int                             arp_retries;
        int                             arp_timeout;
        int                             route_retries;
diff --git a/dapl/openib_cma/device.c b/dapl/openib_cma/device.c
index c1c1ee2..e9ec733 100644
--- a/dapl/openib_cma/device.c
+++ b/dapl/openib_cma/device.c
@@ -57,7 +57,7 @@ struct dapl_llist_entry *g_hca_list;
 #include "..\..\..\..\..\etc\user\comp_channel.cpp"
 #include <rdma\winverbs.h>

-struct ibvw_windata windata;
+static COMP_SET ufds;

 static int getipaddr_netdev(char *name, char *addr, int addr_len)
 {
@@ -101,14 +101,12 @@ release:

 static int dapls_os_init(void)
 {
-       return ibvw_get_windata(&windata, IBVW_WINDATA_VERSION);
+       return CompSetInit(&ufds);
 }

 static void dapls_os_release(void)
 {
-       if (windata.comp_mgr)
-               ibvw_release_windata(&windata, IBVW_WINDATA_VERSION);
-       windata.comp_mgr = NULL;
+       CompSetCleanup(&ufds);
 }

 static int dapls_config_cm_channel(struct rdma_event_channel *channel)
@@ -131,7 +129,7 @@ static int dapls_config_comp_channel(struct ibv_comp_channel *channel)

 static int dapls_thread_signal(void)
 {
-       CompManagerCancel(windata.comp_mgr);
+       CompSetCancel(&ufds);
        return 0;
 }
 #else                          // _WIN64 || WIN32
@@ -611,11 +609,16 @@ void dapli_thread(void *arg)
             g_ib_thread_state == IB_THREAD_RUN;
             dapl_os_lock(&g_hca_lock)) {

+               CompSetZero(&ufds);
+               CompSetAdd(&g_cm_events->channel, &ufds);
+
                idx = 0;
                hca = dapl_llist_is_empty(&g_hca_list) ? NULL :
                      dapl_llist_peek_head(&g_hca_list);

                while (hca) {
+                       CompSetAdd(&hca->ib_ctx->channel, &ufds);
+                       CompSetAdd(&hca->ib_cq->comp_channel, &ufds);
                        uhca[idx++] = hca;
                        hca = dapl_llist_next_entry(&g_hca_list,
                                                    (DAPL_LLIST_ENTRY *)
@@ -624,7 +627,7 @@ void dapli_thread(void *arg)
                cnt = idx;

                dapl_os_unlock(&g_hca_lock);
-               ret = CompManagerPoll(windata.comp_mgr, INFINITE, &channel);
+               ret = CompSetPoll(&ufds, INFINITE);

                dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
                             " ib_thread(%d) poll_event 0x%x\n",
--
1.5.2.5




More information about the ofw mailing list