[openib-general] [PATCH] uDAPL changes to support async events

Arlin Davis arlin.r.davis at intel.com
Thu Sep 1 11:08:15 PDT 2005


James,

 

Here are the changes to support async events. I also consolidated the uAT, uCM, and uCQ threads into a single
processing thread.

 

Thanks,

 

-arlin

 

 

Signed-off-by: Arlin Davis ardavis at ichips.intel.com

 

Index: dapl/openib/dapl_ib_util.c
===================================================================
--- dapl/openib/dapl_ib_util.c      (revision 3293)
+++ dapl/openib/dapl_ib_util.c      (working copy)
@@ -55,13 +55,14 @@
 
 #include <stdlib.h>
 #include <netinet/tcp.h>
-#include <sys/utsname.h>
-#include <unistd.h>    
-#include <fcntl.h>
-#include <strings.h>
-
-int g_dapl_loopback_connection = 0;
+#include <sys/poll.h>
 
+int              g_dapl_loopback_connection = 0;
+int              g_ib_destroy = 0;     /* 0 = running, 1 = exit requested, 2 = work thread has exited */
+int              g_ib_pipe[2];         /* pipe used to wake the work thread out of poll() */
+DAPL_OS_THREAD         g_ib_thread;    /* the single work thread (dapli_thread) */
+DAPL_OS_LOCK           g_hca_lock;     /* protects g_hca_list */
+struct dapl_llist_entry      *g_hca_list;      /* open HCAs polled for async and CQ events */
 
 /* just get IP address, IPv4 only for now  */
 int dapli_get_hca_addr( struct dapl_hca *hca_ptr )
@@ -130,7 +131,18 @@
 int32_t dapls_ib_init (void)
 {    
      dapl_dbg_log (DAPL_DBG_TYPE_UTIL, " dapl_ib_init: \n" );
-     if (dapli_cm_thread_init() || dapli_at_thread_init()) 
+
+     /* initialize hca_list lock */
+     dapl_os_lock_init(&g_hca_lock);
+     
+     /* initialize hca list for CQ events */
+     dapl_llist_init_head(&g_hca_list);
+
+     /* create pipe for waking up work thread */
+     if (pipe(g_ib_pipe))
+           return 1;
+
+     if (dapli_ib_thread_init()) 
            return 1;
 
      return 0;
@@ -139,8 +151,7 @@
 int32_t dapls_ib_release (void)
 {
      dapl_dbg_log (DAPL_DBG_TYPE_UTIL, " dapl_ib_release: \n" );
-     dapli_at_thread_destroy();
-     dapli_cm_thread_destroy();
+     dapli_ib_thread_destroy(); /* stops the work thread; NOTE(review): g_ib_pipe and g_hca_lock set up in dapls_ib_init are not released here */
      return 0;
 }
 
@@ -196,6 +207,7 @@
                        ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
            return DAT_INTERNAL_ERROR;
      }
+     hca_ptr->ib_trans.ib_ctx = hca_ptr->ib_hca_handle;
 
      /* set inline max with enviromment or default, get local lid and gid 0 */
      hca_ptr->ib_trans.max_inline_send = 
@@ -223,19 +235,22 @@
            goto bail;
      }
 
-     /* one thread for each device open */
-     if (dapli_cq_thread_init(hca_ptr)) {
-           dapl_dbg_log (DAPL_DBG_TYPE_ERR, 
-                       " open_hca: cq_thread_init failed for %s\n", 
-                       ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
-           goto bail;
-     }
+     /* initialize hca wait object for uAT event */
+     dapl_os_wait_object_init(&hca_ptr->ib_trans.wait_object);
 
-     /* initialize cq_lock and wait object */
-     dapl_os_lock_init(&hca_ptr->ib_trans.cq_lock);
-     dapl_os_wait_object_init (&hca_ptr->ib_trans.wait_object);
-  
-     dapl_dbg_log (DAPL_DBG_TYPE_UTIL, 
+     /* 
+     * Put new hca_transport on list for async and CQ event processing 
+     * Wakeup work thread to add to polling list
+     */
+     dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&hca_ptr->ib_trans.entry);
+     dapl_os_lock( &g_hca_lock );
+     dapl_llist_add_tail(&g_hca_list, 
+                     (DAPL_LLIST_ENTRY*)&hca_ptr->ib_trans.entry, 
+                     &hca_ptr->ib_trans.entry);
+     write(g_ib_pipe[1], "w", sizeof "w");
+     dapl_os_unlock(&g_hca_lock);
+     
+     dapl_dbg_log (DAPL_DBG_TYPE_UTIL, 
                  " open_hca: %s, port %d, %s  %d.%d.%d.%d INLINE_MAX=%d\n", 
                  ibv_get_device_name(hca_ptr->ib_trans.ib_dev), hca_ptr->port_num,
                  ((struct sockaddr_in *)&hca_ptr->hca_address)->sin_family == AF_INET ?
"AF_INET":"AF_INET6",
@@ -245,7 +260,6 @@
                  ((struct sockaddr_in *)&hca_ptr->hca_address)->sin_addr.s_addr >> 24 & 0xff,
                  hca_ptr->ib_trans.max_inline_send );
 
-
      return DAT_SUCCESS;
 
 bail:
@@ -276,16 +290,28 @@
      dapl_dbg_log (DAPL_DBG_TYPE_UTIL," close_hca: %p->%p\n",
                  hca_ptr,hca_ptr->ib_hca_handle);
 
-     dapli_cq_thread_destroy(hca_ptr);
-
      if (hca_ptr->ib_hca_handle != IB_INVALID_HANDLE) {
            if (ibv_close_device(hca_ptr->ib_hca_handle)) 
                  return(dapl_convert_errno(errno,"ib_close_device"));
            hca_ptr->ib_hca_handle = IB_INVALID_HANDLE;
      }
-     
-     dapl_os_lock_destroy(&hca_ptr->ib_trans.cq_lock);
 
+     /* 
+     * Remove hca from async and CQ event processing list
+     * Wakeup work thread to remove from polling list
+     */
+     hca_ptr->ib_trans.destroy = 1;
+     write(g_ib_pipe[1], "w", sizeof "w");
+
+     /* wait for thread to remove HCA references */
+     while (hca_ptr->ib_trans.destroy != 2) {
+           struct timespec   sleep, remain;
+           sleep.tv_sec = 0;
+           sleep.tv_nsec = 10000000; /* 10 ms */
+           dapl_dbg_log(DAPL_DBG_TYPE_UTIL, 
+                      " ib_thread_destroy: waiting on hca %p destroy\n");
+           nanosleep (&sleep, &remain);
+     }
      return (DAT_SUCCESS);
 }
   
@@ -432,31 +458,285 @@
      IN  void                *context )
 
 {
-    ib_hca_transport_t *hca_ptr;
+     ib_hca_transport_t      *hca_ptr;
 
-    dapl_dbg_log (DAPL_DBG_TYPE_UTIL,
-             " setup_async_cb: ia %p type %d handle %p cb %p ctx %p\n",
-             ia_ptr, handler_type, evd_ptr, callback, context);
-
-    hca_ptr = &ia_ptr->hca_ptr->ib_trans;
-    switch(handler_type)
-    {
-     case DAPL_ASYNC_UNAFILIATED:
-         hca_ptr->async_unafiliated = callback;
-         break;
-     case DAPL_ASYNC_CQ_ERROR:
-         hca_ptr->async_cq_error = callback;
-         break;
-     case DAPL_ASYNC_CQ_COMPLETION:
-        hca_ptr->async_cq = callback;
-        break;
-     case DAPL_ASYNC_QP_ERROR:
-         hca_ptr->async_qp_error = callback;
-         break;
-     default:
-         break;
-    }
-    return DAT_SUCCESS;
+     dapl_dbg_log (DAPL_DBG_TYPE_UTIL,
+                 " setup_async_cb: ia %p type %d handle %p cb %p ctx %p\n",
+                 ia_ptr, handler_type, evd_ptr, callback, context);
+
+     hca_ptr = &ia_ptr->hca_ptr->ib_trans;
+     switch(handler_type)
+     {
+           case DAPL_ASYNC_UNAFILIATED:
+                 hca_ptr->async_unafiliated = callback;
+                 hca_ptr->async_un_ctx = context;
+                 break;
+           case DAPL_ASYNC_CQ_ERROR:
+                 hca_ptr->async_cq_error = callback;
+                 hca_ptr->async_cq_ctx = context;
+                 break;
+           case DAPL_ASYNC_CQ_COMPLETION:
+                 hca_ptr->async_cq = callback;
+                 hca_ptr->async_ctx = context;
+                 break;
+           case DAPL_ASYNC_QP_ERROR:
+                 hca_ptr->async_qp_error = callback;
+                 hca_ptr->async_qp_ctx = context;
+                 break;
+           default:
+                 break;
+     }
+     return DAT_SUCCESS;
 }
 
+int dapli_ib_thread_init(void)    /* returns 0 on success, 1 if the work thread could not be created */
+{
+     DAT_RETURN dat_status;
+
+     dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+                " ib_thread_init(%d)\n", getpid());
+
+     /* create the single work thread (dapli_thread) that services the pipe, uCM, uAT, CQ and async events */
+     dat_status = dapl_os_thread_create(dapli_thread, NULL, &g_ib_thread);
+     if (dat_status != DAT_SUCCESS)
+     {
+           dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
+                      " ib_thread_init: failed to create thread\n");
+           return 1;
+     }
+     return 0;
+}
+
+void dapli_ib_thread_destroy(void)    /* ask the work thread to exit and wait until it has */
+{
+     dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+                " ib_thread_destroy(%d)\n", getpid());
+
+     /* destroy ib_thread, wait for termination */
+     g_ib_destroy = 1;     /* exit request; dapli_thread sets this to 2 after leaving its loop */
+     write(g_ib_pipe[1], "w", sizeof "w");     /* wake the thread out of poll(); result is not checked */
+     while (g_ib_destroy != 2) {     /* NOTE(review): plain int polled across threads, loop has no timeout */
+           struct timespec   sleep, remain;
+           sleep.tv_sec = 0;
+           sleep.tv_nsec = 10000000; /* 10 ms */
+           dapl_dbg_log(DAPL_DBG_TYPE_UTIL, 
+                      " ib_thread_destroy: waiting for ib_thread\n");
+           nanosleep(&sleep, &remain);
+     }
+     dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+                " ib_thread_destroy(%d) exit\n",getpid());
+}
+
+void dapli_async_event_cb(struct _ib_hca_transport *hca) /* fetch at most one verbs async event for this HCA and dispatch it */
+{
+     struct ibv_async_event  event;
+     struct pollfd     async_fd = {
+           .fd      = hca->ib_ctx->async_fd,
+           .events  = POLLIN,
+           .revents = 0
+     };
+     
+     dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " dapli_async_event_cb(%p)\n",hca);
+
+     if (hca->destroy) /* close has been requested for this HCA; do not fetch events */
+           return;
+
+     if ((poll(&async_fd, 1, 0)==1) && /* zero timeout: only fetch when the async fd is already readable */
+           (!ibv_get_async_event(hca->ib_ctx, &event))) {
+
+           switch (event.event_type) {
+
+                 case  IBV_EVENT_CQ_ERR:
+                 {
+                       dapl_dbg_log(DAPL_DBG_TYPE_WARN,
+                                  " dapli_async_event CQ ERR %d\n",
+                                  event.event_type);                   
+                       
+                       /* report up if async callback still setup */
+                       if (hca->async_cq_error)
+                             hca->async_cq_error(hca->ib_ctx,
+                                             &event,
+                                             hca->async_cq_ctx);
+                       break;
+                 }
+                 case  IBV_EVENT_COMM_EST:
+                 {
+                       /* Received messages on connected QP before RTU */
+                       struct dapl_ep *ep_ptr = event.element.qp->qp_context;
+                       
+                       /* TODO: cannot process COMM_EST until ibv  
+                       * guarantees valid QP context for events. 
+                       * Race conditions exist with QP destroy call. 
+                       * For now, assume the RTU will arrive.
+                       */
+                       dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+                                  " dapli_async_event COMM_EST (qp=%p)\n",
+                                  event.element.qp); 
+
+                       if (!DAPL_BAD_HANDLE(ep_ptr, DAPL_MAGIC_EP) && /* only act when the QP context still looks like a live EP */
+                           ep_ptr->cm_handle != IB_INVALID_HANDLE)
+                             ib_cm_establish(ep_ptr->cm_handle->cm_id);
+                 
+                       break;
+                 }
+                 case  IBV_EVENT_QP_FATAL:
+                 case  IBV_EVENT_QP_REQ_ERR:
+                 case  IBV_EVENT_QP_ACCESS_ERR:
+                 case  IBV_EVENT_QP_LAST_WQE_REACHED:
+                 case  IBV_EVENT_SRQ_ERR:
+                 case  IBV_EVENT_SRQ_LIMIT_REACHED:
+                 case  IBV_EVENT_SQ_DRAINED:
+                 {
+                       dapl_dbg_log(DAPL_DBG_TYPE_WARN,
+                                  " dapli_async_event QP ERR %d\n",
+                                  event.event_type); 
+                       
+                       /* report up if async callback still setup */
+                       if (hca->async_qp_error)
+                             hca->async_qp_error(hca->ib_ctx,
+                                             &event,
+                                             hca->async_qp_ctx);
+                       break;
+                 }
+                 case  IBV_EVENT_PATH_MIG:
+                 case  IBV_EVENT_PATH_MIG_ERR:
+                 case  IBV_EVENT_DEVICE_FATAL:
+                 case  IBV_EVENT_PORT_ACTIVE:
+                 case  IBV_EVENT_PORT_ERR:
+                 case  IBV_EVENT_LID_CHANGE:
+                 case  IBV_EVENT_PKEY_CHANGE:
+                 case  IBV_EVENT_SM_CHANGE:
+                 {
+                       dapl_dbg_log(DAPL_DBG_TYPE_WARN,
+                                  " dapli_async_event DEV ERR %d\n",
+                                  event.event_type); 
+
+                       /* report up if async callback still setup */
+                       if (hca->async_unafiliated)
+                             hca->async_unafiliated( 
+                                         hca->ib_ctx,
+                                         &event,
+                                         hca->async_un_ctx);
+                       break;
+                 }
+                 default:
+                 {
+                       dapl_dbg_log (DAPL_DBG_TYPE_WARN,
+                                   "--> DsEventCb: UNKNOWN\n");
+                       break;
+                 }
+           }
+           ibv_put_async_event(&event); /* hand the event back to verbs once it has been dispatched */
+     }
+}
+
+
+/* work thread for uAT, uCM, CQ, and async events: one poll() loop over the wakeup pipe, the CM and AT fds, and every listed HCA's async/CQ fds */
+void dapli_thread(void *arg) 
+{
+     struct pollfd           ufds[__FD_SETSIZE];
+     struct _ib_hca_transport *uhca[__FD_SETSIZE]={NULL}; /* uhca[i] is the HCA that owns ufds[i]; slots 0-2 stay NULL */
+     struct _ib_hca_transport *hca;
+     int               ret,idx,fds;
+     char              rbuf[2];
+
+     dapl_dbg_log (DAPL_DBG_TYPE_UTIL,
+                 " ib_thread(%d,0x%x): ENTER: pipe %d cm %d at %d\n",
+                 getpid(), g_ib_thread, 
+                 g_ib_pipe[0], ib_cm_get_fd(), 
+                 ib_at_get_fd());
+
+     /* Poll across pipe, CM, AT never changes */
+     dapl_os_lock( &g_hca_lock );
+     
+     ufds[0].fd = g_ib_pipe[0];    /* pipe */
+     ufds[0].events = POLLIN;
+     ufds[1].fd = ib_cm_get_fd();  /* uCM */
+     ufds[1].events = POLLIN;
+     ufds[2].fd = ib_at_get_fd();  /* uAT */
+     ufds[2].events = POLLIN;
+           
+     while (!g_ib_destroy) {       /* g_hca_lock is held at the top of every iteration */
+           
+           /* build ufds after pipe, cm, at events */
+           ufds[0].revents = 0;
+           ufds[1].revents = 0;
+           ufds[2].revents = 0;
+           idx=2;
+
+           /*  Walk HCA list and setup async and CQ events */
+           if (!dapl_llist_is_empty(&g_hca_list))
+                 hca = dapl_llist_peek_head(&g_hca_list);
+           else
+                 hca = NULL;
+           
+           while(hca) {      /* NOTE(review): idx is not bounded against __FD_SETSIZE */
+                 int i;
+                 ufds[++idx].fd = hca->ib_ctx->async_fd;   /* uASYNC */
+                 ufds[idx].events = POLLIN;
+                 ufds[idx].revents = 0;
+                 uhca[idx] = hca;
+                 for (i=0;i<hca->ib_ctx->num_comp;i++) {   /* uCQ */
+                       ufds[++idx].fd = hca->ib_ctx->cq_fd[i];   
+                       ufds[idx].events = POLLIN;
+                       ufds[idx].revents = 0;
+                       uhca[idx] = hca;
+                 }
+                 hca = dapl_llist_next_entry(
+                             &g_hca_list,
+                             (DAPL_LLIST_ENTRY*)&hca->entry);
+           }
+           
+           /* unlock, and setup poll */
+           fds = idx+1;
+           dapl_os_unlock(&g_hca_lock);
+                ret = poll(ufds, fds, -1); 
+           if (ret <= 0) {
+                 dapl_dbg_log(DAPL_DBG_TYPE_WARN,
+                            " ib_thread(%d): ERR %s poll\n",
+                            getpid(),strerror(errno));
+                dapl_os_lock(&g_hca_lock);
+                 continue;
+           }
+
+           /* check and process CQ and ASYNC events, each open device */
+           for(idx=3;idx<fds;idx++) {
+                 if (ufds[idx].revents & POLLIN) { /* bit test: poll() may set POLLERR/POLLHUP alongside POLLIN */
+                       dapli_cq_event_cb(uhca[idx]);
+                       dapli_async_event_cb(uhca[idx]);
+                 }
+           }
+           
+           /* check and process user events */
+           if (ufds[0].revents & POLLIN) {
+
+                 read(g_ib_pipe[0], rbuf, 2);      /* consume one 2-byte wakeup token ("w" plus NUL) */
+                 
+                 /* cleanup any device on list marked for destroy */
+                 for(idx=3;idx<fds;idx++) {
+                       if(uhca[idx] && uhca[idx] != hca && uhca[idx]->destroy == 1) {
+                             dapl_os_lock(&g_hca_lock);
+                             dapl_llist_remove_entry(&g_hca_list, 
+                                   (DAPL_LLIST_ENTRY*)&uhca[idx]->entry);
+                             dapl_os_unlock(&g_hca_lock);
+                             /* closer stops waiting once destroy is 2: remember this hca (NULL after the list walk) so its other slots are never dereferenced */
+                             hca = uhca[idx];
+                             hca->destroy = 2;
+                       }
+                 }
+           }
+
+           /* CM and AT events */
+           if (ufds[1].revents & POLLIN)
+                 dapli_cm_event_cb();
+
+           if (ufds[2].revents & POLLIN)
+                 dapli_at_event_cb();
+
+           dapl_os_lock(&g_hca_lock);
+     }
+     dapl_dbg_log(DAPL_DBG_TYPE_UTIL," ib_thread(%d) EXIT\n",getpid());
+     g_ib_destroy = 2;       /* tells dapli_ib_thread_destroy that the thread is done */
+     dapl_os_unlock(&g_hca_lock);  
+}
 
Index: dapl/openib/dapl_ib_cm.c
===================================================================
--- dapl/openib/dapl_ib_cm.c  (revision 3293)
+++ dapl/openib/dapl_ib_cm.c  (working copy)
@@ -70,90 +70,6 @@
 static inline uint64_t cpu_to_be64(uint64_t x) { return x; }
 #endif
 
-static int             g_at_destroy;
-static DAPL_OS_THREAD        g_at_thread;
-static int             g_cm_destroy;
-static DAPL_OS_THREAD        g_cm_thread;
-static DAPL_OS_LOCK          g_cm_lock;
-static struct dapl_llist_entry     *g_cm_list; 
-
-int dapli_cm_thread_init(void)
-{
-     DAT_RETURN dat_status;
-
-     dapl_dbg_log(DAPL_DBG_TYPE_CM," cm_thread_init(%d)\n", getpid());
-
-        /* initialize cr_list lock */
-     dapl_os_lock_init(&g_cm_lock);
-     
-     /* initialize CM list for listens on this HCA */
-     dapl_llist_init_head(&g_cm_list);
-
-     /* create thread to process inbound connect request */
-     dat_status = dapl_os_thread_create(cm_thread, NULL, &g_cm_thread);
-     if (dat_status != DAT_SUCCESS)
-     {
-           dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-                      " cm_thread_init: failed to create thread\n");
-           return 1;
-     }
-     return 0;
-}
-
-void dapli_cm_thread_destroy(void)
-{
-     dapl_dbg_log(DAPL_DBG_TYPE_CM," cm_thread_destroy(%d)\n", getpid());
-
-     /* destroy cr_thread and lock */
-     g_cm_destroy = 1;
-     pthread_kill( g_cm_thread, SIGUSR1 );
-     dapl_dbg_log(DAPL_DBG_TYPE_CM," cm_thread_destroy(%d) SIGUSR1 sent\n",getpid());
-     while (g_cm_destroy) {
-           struct timespec   sleep, remain;
-           sleep.tv_sec = 0;
-           sleep.tv_nsec = 10000000; /* 10 ms */
-           dapl_dbg_log(DAPL_DBG_TYPE_CM, 
-                      " cm_thread_destroy: waiting for cm_thread\n");
-           nanosleep (&sleep, &remain);
-     }
-     dapl_dbg_log(DAPL_DBG_TYPE_CM," cm_thread_destroy(%d) exit\n",getpid());
-}
-
-int dapli_at_thread_init(void)
-{
-     DAT_RETURN dat_status;
-
-     dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread_init(%d)\n", getpid());
-
-     /* create thread to process AT async requests */
-     dat_status = dapl_os_thread_create(at_thread, NULL, &g_at_thread);
-     if (dat_status != DAT_SUCCESS)
-     {
-           dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-                      " at_thread_init: failed to create thread\n");
-           return 1;
-     }
-     return 0;
-}
-
-void dapli_at_thread_destroy(void)
-{
-     dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread_destroy(%d)\n", getpid());
-
-     /* destroy cr_thread and lock */
-     g_at_destroy = 1;
-     pthread_kill( g_at_thread, SIGUSR1 );
-     dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread_destroy(%d) SIGUSR1 sent\n",getpid());
-     while (g_at_destroy) {
-           struct timespec   sleep, remain;
-           sleep.tv_sec = 0;
-           sleep.tv_nsec = 10000000; /* 10 ms */
-           dapl_dbg_log(DAPL_DBG_TYPE_CM, 
-                      " at_thread_destroy: waiting for at_thread\n");
-           nanosleep (&sleep, &remain);
-     }
-     dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread_destroy(%d) exit\n",getpid());
-}
 
 void dapli_ip_comp_handler(uint64_t req_id, void *context, int rec_num)
 {
@@ -348,12 +264,6 @@
            if (conn->ep)
                  conn->ep->cm_handle = IB_INVALID_HANDLE;
 
-           /* take off the CM thread work queue and free */
-           dapl_os_lock( &g_cm_lock );
-           dapl_llist_remove_entry(&g_cm_list, 
-                             (DAPL_LLIST_ENTRY*)&conn->entry);
-           dapl_os_unlock(&g_cm_lock);
-
            dapl_os_free(conn, sizeof(*conn));
      }
 }
@@ -426,8 +336,8 @@
      if (new_conn) {
 
            (void)dapl_os_memzero(new_conn, sizeof(*new_conn));
-           dapl_os_lock_init(&new_conn->lock);
            new_conn->cm_id = event->cm_id; /* provided by uCM */
+           event->cm_id->context = new_conn; /* update CM_ID context */
            new_conn->sp = conn->sp;
            new_conn->hca = conn->hca;
            new_conn->service_id = conn->service_id;
@@ -444,13 +354,6 @@
                        event->param.req_rcvd.primary_path,
                        sizeof(struct ib_sa_path_rec));
                        
-           /* put new CR on CM thread event work queue */
-           dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&new_conn->entry);
-           dapl_os_lock( &g_cm_lock );
-           dapl_llist_add_tail(&g_cm_list, 
-                       (DAPL_LLIST_ENTRY*)&new_conn->entry, new_conn);
-           dapl_os_unlock(&g_cm_lock);
-
            dapl_dbg_log(DAPL_DBG_TYPE_CM, " passive_cb: "
                        "REQ on HCA %p SP %p SID %d L_ID %d new_id %d p_data %p\n",
                        new_conn->hca, new_conn->sp, 
@@ -521,18 +424,13 @@
            if (conn->ep)
                  conn->ep->cm_handle = IB_INVALID_HANDLE;
            
-           /* take off the CM thread work queue and free */
-           dapl_os_lock( &g_cm_lock );
-           dapl_llist_remove_entry(&g_cm_list, 
-                             (DAPL_LLIST_ENTRY*)&conn->entry);
-           dapl_os_unlock(&g_cm_lock);
            dapl_os_free(conn, sizeof(*conn));
      }
      return(destroy);
 }
 
 static int dapli_cm_passive_cb(struct dapl_cm_id *conn,
-                             struct ib_cm_event *event)
+                        struct ib_cm_event *event)
 {
      int destroy;
      struct dapl_cm_id *new_conn;
@@ -541,9 +439,6 @@
                 " passive_cb: conn %p id %d event %d\n",
                 conn, conn->cm_id, event->event );
 
-     if (conn->cm_id == 0)
-           return 0;
-
      dapl_os_lock(&conn->lock);
      if (conn->destroy) {
            dapl_os_unlock(&conn->lock);
@@ -608,155 +503,11 @@
            if (conn->ep)
                  conn->ep->cm_handle = IB_INVALID_HANDLE;
 
-           /* take off the CM thread work queue and free */
-           dapl_os_lock( &g_cm_lock );
-           dapl_llist_remove_entry(&g_cm_list, 
-                             (DAPL_LLIST_ENTRY*)&conn->entry);
-           dapl_os_unlock(&g_cm_lock);
-
            dapl_os_free(conn, sizeof(*conn));
      }
      return(destroy);
 }
 
-/* something to catch the signal */
-static void ib_sig_handler(int signum)
-{
-     return;
-}
-
-/* async CM processing thread */
-void cm_thread(void *arg) 
-{
-     struct dapl_cm_id *conn, *next_conn;
-     struct ib_cm_event      *event;
-     struct pollfd           ufds;
-     sigset_t          sigset;
-
-     dapl_dbg_log (DAPL_DBG_TYPE_CM,
-                 " cm_thread(%d,0x%x): ENTER: cm_fd %d\n",
-                 getpid(), g_cm_thread, ib_cm_get_fd());
-
-    sigemptyset(&sigset);
-     sigaddset(&sigset, SIGUSR1);
-    pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
-     signal( SIGUSR1, ib_sig_handler); 
-     
-     dapl_os_lock( &g_cm_lock );
-     while (!g_cm_destroy) {
-           struct ib_cm_id *cm_id;
-           int ret;
-
-           /* select for CM event, all events process via cm_fd */
-                ufds.fd      = ib_cm_get_fd();
-                ufds.events  = POLLIN;
-                ufds.revents = 0;
-
-           dapl_os_unlock(&g_cm_lock);
-                ret = poll(&ufds, 1, -1); 
-           if (ret <= 0) {
-                 dapl_dbg_log(DAPL_DBG_TYPE_CM,
-                            " cm_thread(%d): ERR %s poll\n",
-                            getpid(),strerror(errno));
-                dapl_os_lock(&g_cm_lock);
-                 continue;
-           }
-
-           dapl_dbg_log(DAPL_DBG_TYPE_CM,
-                 " cm_thread: GET EVENT fd=%d n=%d\n",
-                 ib_cm_get_fd(),ret);
-
-           if (ib_cm_event_get_timed(0,&event)) { 
-                 dapl_dbg_log(DAPL_DBG_TYPE_CM,
-                       " cm_thread: ERR %s event_get on %d\n", 
-                       strerror(errno), ib_cm_get_fd() );
-                 dapl_os_lock(&g_cm_lock);
-                 continue;
-           }
-           dapl_dbg_log(DAPL_DBG_TYPE_CM,
-                 " cm_thread: GET EVENT fd=%d woke\n",ib_cm_get_fd());
-           dapl_os_lock(&g_cm_lock);
-
-           /* set proper cm_id */
-           if (event->event == IB_CM_REQ_RECEIVED ||
-                 event->event == IB_CM_SIDR_REQ_RECEIVED)
-                 cm_id = event->param.req_rcvd.listen_id;
-           else
-                 cm_id = event->cm_id;
-
-           dapl_dbg_log (DAPL_DBG_TYPE_CM,
-                 " cm_thread: EVENT event(%d) cm_id=%d (%d)\n",
-                 event->event, event->cm_id, cm_id );
-
-           /* 
-           * Walk cm_list looking for connection id in event
-           * no need to walk if uCM would provide context with event
-           */
-           if (!dapl_llist_is_empty(&g_cm_list))
-                 next_conn = dapl_llist_peek_head(&g_cm_list);
-           else
-                 next_conn = NULL;
-
-           ret = 0;
-           while (next_conn) {
-                 conn = next_conn;
-                 dapl_dbg_log(DAPL_DBG_TYPE_CM,
-                      " cm_thread: LIST cm %p c_id %d e_id %d)\n", 
-                      conn, conn->cm_id, cm_id );
-                       
-                 next_conn = dapl_llist_next_entry(
-                             &g_cm_list,
-                             (DAPL_LLIST_ENTRY*)&conn->entry );
-
-                 if (cm_id == conn->cm_id) {
-                       dapl_os_unlock(&g_cm_lock);
-                       if (conn->sp)
-                             ret = dapli_cm_passive_cb(conn,event);
-                       else
-                             ret = dapli_cm_active_cb(conn,event);
-                       dapl_os_lock(&g_cm_lock);
-                       break;
-                 }
-           } 
-           ib_cm_event_put(event);
-           if (ret) {
-                 dapl_dbg_log(DAPL_DBG_TYPE_CM,
-                      " cm_thread: destroy cm_id %d\n",cm_id);
-                 ib_cm_destroy_id(cm_id);
-           }
-     }
-     dapl_os_unlock(&g_cm_lock);   
-     dapl_dbg_log(DAPL_DBG_TYPE_CM," cm_thread(%d) EXIT, cm_list=%s\n",
-                getpid(),dapl_llist_is_empty(&g_cm_list) ? "EMPTY":"NOT EMPTY");
-     g_cm_destroy = 0;
-}
-
-/* async AT processing thread */
-void at_thread(void *arg) 
-{
-     sigset_t sigset;
-
-     dapl_dbg_log (DAPL_DBG_TYPE_CM,
-                 " at_thread(%d,0x%x): ENTER: at_fd %d\n",
-                 getpid(), g_at_thread, ib_at_get_fd());
-
-    sigemptyset(&sigset);
-     sigaddset(&sigset, SIGUSR1);
-    pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
-     signal(SIGUSR1, ib_sig_handler); 
-     
-     while (!g_at_destroy) {
-           /* poll forever until callback or signal */
-           if (ib_at_callback_get_timed(-1) < 0) { 
-                 dapl_dbg_log(DAPL_DBG_TYPE_CM,
-                       " at_thread: SIG? ret=%s, destroy=%d\n", 
-                       strerror(errno), g_at_destroy );
-           }
-           dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread: callback woke\n");
-     }
-     dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread(%d) EXIT \n", getpid());
-     g_at_destroy = 0;
-}
 
 /************************ DAPL provider entry points **********************/
 
@@ -853,13 +604,6 @@
      conn->retries = 0;
      dapl_os_memcpy(&conn->r_addr, r_addr, sizeof(DAT_SOCK_ADDR6));
      
-     /* put on CM thread work queue */
-     dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&conn->entry);
-     dapl_os_lock( &g_cm_lock );
-     dapl_llist_add_tail(&g_cm_list, 
-                     (DAPL_LLIST_ENTRY*)&conn->entry, conn);
-     dapl_os_unlock(&g_cm_lock);
-
      status = ib_at_route_by_ip(
            ((struct sockaddr_in *)&conn->r_addr)->sin_addr.s_addr, 
            ((struct sockaddr_in *)&conn->hca->hca_address)->sin_addr.s_addr, 
@@ -1019,13 +763,6 @@
      conn->hca = ia_ptr->hca_ptr;
      conn->service_id = ServiceID;
 
-     /* put on CM thread work queue */
-     dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&conn->entry);
-     dapl_os_lock( &g_cm_lock );
-     dapl_llist_add_tail(&g_cm_list, 
-                 (DAPL_LLIST_ENTRY*)&conn->entry, conn);
-     dapl_os_unlock(&g_cm_lock);
-
      dapl_dbg_log(DAPL_DBG_TYPE_EP,
                 " setup_listener(conn=%p cm_id=%d)\n",
                 sp_ptr->cm_srvc_handle,conn->cm_id);
@@ -1345,8 +1082,6 @@
      return size;
 }
 
-#ifndef SOCKET_CM
-
 /*
  * Map all socket CM event codes to the DAT equivelent.
  */
@@ -1457,7 +1192,44 @@
     return ib_cm_event;
 }
 
-#endif
+void dapli_cm_event_cb(void)  /* called by the work thread when the uCM fd is readable */
+{
+     struct ib_cm_event *event;
+     struct ib_cm_id    *cm_id;    /* id saved before dispatch; conn may be freed by the callback */
+     dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " dapli_cm_event()\n");
+
+     /* process at most one CM event per call (get with timeout 0), fairness */
+     if(!ib_cm_event_get_timed(0,&event)) {
+           struct dapl_cm_id *conn;
+           int               ret;
+           dapl_dbg_log(DAPL_DBG_TYPE_CM,
+                      " dapli_cm_event: EVENT=%d ID=%p CTX=%p\n",
+                      event->event, event->cm_id, 
+                      event->cm_id->context);
+           
+           /* set proper conn from cm_id context*/
+           conn = (struct dapl_cm_id*)event->cm_id->context;
+           cm_id = conn->cm_id;
+           if (conn->sp)     /* conn with a service point is a passive (listen) side connection */
+                 ret = dapli_cm_passive_cb(conn,event);
+           else 
+                 ret = dapli_cm_active_cb(conn,event);
+           
+           ib_cm_event_put(event);
+           /* non-zero ret asks for the id to be destroyed; the callbacks free conn on that path, so use the saved id */
+           if (ret) 
+                 ib_cm_destroy_id(cm_id);
+     }
+}
+
+void dapli_at_event_cb() /* called by the work thread when the uAT fd is readable */
+{
+     dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " dapli_at_event_cb()\n");
+
+     /* process one AT event, fairness; timeout argument 0, return value is not checked */
+     ib_at_callback_get_timed(0);
+}
+
 
 /*
  * Local variables:
Index: dapl/openib/dapl_ib_util.h
===================================================================
--- dapl/openib/dapl_ib_util.h      (revision 3293)
+++ dapl/openib/dapl_ib_util.h      (working copy)
@@ -231,18 +231,22 @@
 /* ib_hca_transport_t, specific to this implementation */
 typedef struct _ib_hca_transport
 { 
-     struct      ibv_device  *ib_dev;
+     struct ib_llist_entry   entry;    /* links this HCA into the work thread's g_hca_list */
+     int               destroy;        /* 0 = open, 1 = close requested, 2 = released by the work thread */
+     struct ibv_device *ib_dev;
+     struct ibv_context      *ib_ctx;  /* copy of ib_hca_handle, set in open_hca */
      ib_cq_handle_t          ib_cq_empty;
-     DAPL_OS_LOCK            cq_lock;
      DAPL_OS_WAIT_OBJECT     wait_object;
-     int               cq_destroy;
-     DAPL_OS_THREAD          cq_thread;
      int               max_inline_send;
      union ibv_gid           gid;
      ib_async_handler_t      async_unafiliated;
+     void              *async_un_ctx;  /* context passed to async_unafiliated */
      ib_async_handler_t      async_cq_error;
+     void              *async_ctx;     /* context registered with async_cq (completion), not async_cq_error */
      ib_async_handler_t      async_cq;
+     void              *async_cq_ctx;  /* context passed to async_cq_error */
      ib_async_handler_t      async_qp_error;
+     void              *async_qp_ctx;  /* context passed to async_qp_error */
 
 } ib_hca_transport_t;
 
@@ -252,21 +256,15 @@
 /* prototypes */
 int32_t    dapls_ib_init (void);
 int32_t    dapls_ib_release (void);
-void cm_thread (void *arg);
-int dapli_cm_thread_init(void);
-void dapli_cm_thread_destroy(void);
-void at_thread (void *arg);
-int dapli_at_thread_init(void);
-void dapli_at_thread_destroy(void);
-void cq_thread (void *arg);
-int dapli_cq_thread_init(struct dapl_hca *hca_ptr);
-void dapli_cq_thread_destroy(struct dapl_hca *hca_ptr);
-
-int dapli_get_lid(struct dapl_hca *hca_ptr, int port, uint16_t *lid);
-int dapli_get_gid(struct dapl_hca *hca_ptr, int port, int index, 
-             union ibv_gid *gid);
-int dapli_get_hca_addr(struct dapl_hca *hca_ptr);
+void dapli_thread(void *arg);
+int  dapli_ib_thread_init(void);
+void dapli_ib_thread_destroy(void);
+int  dapli_get_hca_addr(struct dapl_hca *hca_ptr);
 void dapli_ip_comp_handler(uint64_t req_id, void *context, int rec_num);
+void dapli_cm_event_cb(void);
+void dapli_at_event_cb(void);
+void dapli_cq_event_cb(struct _ib_hca_transport *hca);
+void dapli_async_event_cb(struct _ib_hca_transport *hca);
 
 DAT_RETURN
 dapls_modify_qp_state ( IN ib_qp_handle_t      qp_handle,
Index: dapl/openib/dapl_ib_cq.c
===================================================================
--- dapl/openib/dapl_ib_cq.c  (revision 3293)
+++ dapl/openib/dapl_ib_cq.c  (working copy)
@@ -52,94 +52,40 @@
 #include "dapl_evd_util.h"
 #include "dapl_ring_buffer_util.h"
 #include <sys/poll.h>
-#include <signal.h>
 
-int dapli_cq_thread_init(struct dapl_hca *hca_ptr)
+void dapli_cq_event_cb(struct _ib_hca_transport *hca)
 {
-        DAT_RETURN dat_status;
+     int i;
+     dapl_dbg_log(DAPL_DBG_TYPE_UTIL," dapli_cq_event_cb(%p)\n", hca);
 
-        dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_init(%p)\n", hca_ptr);
-
-        /* create thread to process inbound connect request */
-        dat_status = dapl_os_thread_create( cq_thread, (void*)hca_ptr,&hca_ptr->ib_trans.cq_thread);
-        if (dat_status != DAT_SUCCESS)
-        {
-                dapl_dbg_log(DAPL_DBG_TYPE_ERR,
-                             " cq_thread_init: failed to create thread\n");
-                return 1;
-        }
-        return 0;
-}
-
-void dapli_cq_thread_destroy(struct dapl_hca *hca_ptr)
-{
-        dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_destroy(%p)\n", hca_ptr);
-
-        /* destroy cr_thread and lock */
-        hca_ptr->ib_trans.cq_destroy = 1;
-        pthread_kill(hca_ptr->ib_trans.cq_thread, SIGUSR1);
-        dapl_dbg_log(DAPL_DBG_TYPE_CM," cq_thread_destroy(%p) SIGUSR1 sent\n",hca_ptr);
-        while (hca_ptr->ib_trans.cq_destroy != 2) {
-                struct timespec sleep, remain;
-                sleep.tv_sec = 0;
-                sleep.tv_nsec = 10000000; /* 10 ms */
-                dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
-                             " cq_thread_destroy: waiting for cq_thread\n");
-                nanosleep (&sleep, &remain);
-        }
-        dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_destroy(%d) exit\n",getpid());
-     return;
-}
-
-/* something to catch the signal */
-static void ib_cq_handler(int signum)
-{
-        return;
-}
-
-void cq_thread( void *arg )
-{
-     struct dapl_hca   *hca_ptr = arg;
-     struct dapl_evd   *evd_ptr;
-     struct ibv_cq     *ibv_cq = NULL;
-     sigset_t    sigset;
-     int         status = 0; 
-
-     dapl_dbg_log ( DAPL_DBG_TYPE_UTIL," cq_thread: ENTER hca %p\n",hca_ptr);
-  
-        sigemptyset(&sigset);
-        sigaddset(&sigset,SIGUSR1);
-        pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
-        signal(SIGUSR1, ib_cq_handler);
-
-     /* wait on DTO event, or signal to abort */
-     while (!hca_ptr->ib_trans.cq_destroy) {
-
-           struct pollfd cq_poll = {
-                 .fd      = hca_ptr->ib_hca_handle->cq_fd[0],
+     /* check all comp events on this device */
+     for(i=0;i<hca->ib_ctx->num_comp;i++) {
+           struct dapl_evd   *evd_ptr = NULL;
+           struct ibv_cq     *ibv_cq = NULL;
+           struct pollfd     cq_fd = {
+                 .fd      = hca->ib_ctx->cq_fd[i],
                  .events  = POLLIN,
                  .revents = 0
            };
-
-           status = poll(&cq_poll, 1, -1);
-           if ((status == 1) &&
-                 (!ibv_get_cq_event(hca_ptr->ib_hca_handle, 0, &ibv_cq, (void*)&evd_ptr))) {
-     
+           if ((poll(&cq_fd, 1, 0) == 1) &&
+                 (!ibv_get_cq_event(hca->ib_ctx, i, 
+                                &ibv_cq, (void*)&evd_ptr))) {
+
+                 /* 
+                 * TODO: ibv put event to protect against
+                 * destroy CQ race conditions?
+                 */
                  if (DAPL_BAD_HANDLE(evd_ptr, DAPL_MAGIC_EVD))
                        continue;
 
                  /* process DTO event via callback */
-                 dapl_evd_dto_callback ( evd_ptr->header.owner_ia->hca_ptr->ib_hca_handle,
+                 dapl_evd_dto_callback ( hca->ib_ctx,
                                    evd_ptr->ib_cq_handle,
                                    (void*)evd_ptr );
-           } else {
-
-           }
-     } 
-     hca_ptr->ib_trans.cq_destroy = 2;
-     dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread: EXIT: hca %p \n", hca_ptr);
-     return;
+           } 
+     }
 }
+
 /*
  * Map all verbs DTO completion codes to the DAT equivelent.
  *
 

 

 

-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.openfabrics.org/pipermail/general/attachments/20050901/86818698/attachment.html>


More information about the general mailing list