[openib-general] [PATCH] uDAPL changes to support async events
Arlin Davis
arlin.r.davis at intel.com
Thu Sep 1 11:08:15 PDT 2005
James,
Here are the changes to support async events. This patch also consolidates the uAT, uCM, and uCQ threads into a single
processing thread.
Thanks,
-arlin
Signed-off-by: Arlin Davis ardavis at ichips.intel.com
Index: dapl/openib/dapl_ib_util.c
===================================================================
--- dapl/openib/dapl_ib_util.c (revision 3293)
+++ dapl/openib/dapl_ib_util.c (working copy)
@@ -55,13 +55,14 @@
#include <stdlib.h>
#include <netinet/tcp.h>
-#include <sys/utsname.h>
-#include <unistd.h>
-#include <fcntl.h>
-#include <strings.h>
-
-int g_dapl_loopback_connection = 0;
+#include <sys/poll.h>
+int g_dapl_loopback_connection = 0;
+int g_ib_destroy = 0;
+int g_ib_pipe[2];
+DAPL_OS_THREAD g_ib_thread;
+DAPL_OS_LOCK g_hca_lock;
+struct dapl_llist_entry *g_hca_list;
/* just get IP address, IPv4 only for now */
int dapli_get_hca_addr( struct dapl_hca *hca_ptr )
@@ -130,7 +131,18 @@
int32_t dapls_ib_init (void)
{
dapl_dbg_log (DAPL_DBG_TYPE_UTIL, " dapl_ib_init: \n" );
- if (dapli_cm_thread_init() || dapli_at_thread_init())
+
+ /* initialize hca_list lock */
+ dapl_os_lock_init(&g_hca_lock);
+
+ /* initialize hca list for CQ events */
+ dapl_llist_init_head(&g_hca_list);
+
+ /* create pipe for waking up work thread */
+ if (pipe(g_ib_pipe))
+ return 1;
+
+ if (dapli_ib_thread_init())
return 1;
return 0;
@@ -139,8 +151,7 @@
int32_t dapls_ib_release (void)
{
dapl_dbg_log (DAPL_DBG_TYPE_UTIL, " dapl_ib_release: \n" );
- dapli_at_thread_destroy();
- dapli_cm_thread_destroy();
+ dapli_ib_thread_destroy();
return 0;
}
@@ -196,6 +207,7 @@
ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
return DAT_INTERNAL_ERROR;
}
+ hca_ptr->ib_trans.ib_ctx = hca_ptr->ib_hca_handle;
/* set inline max with enviromment or default, get local lid and gid 0 */
hca_ptr->ib_trans.max_inline_send =
@@ -223,19 +235,22 @@
goto bail;
}
- /* one thread for each device open */
- if (dapli_cq_thread_init(hca_ptr)) {
- dapl_dbg_log (DAPL_DBG_TYPE_ERR,
- " open_hca: cq_thread_init failed for %s\n",
- ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
- goto bail;
- }
+ /* initialize hca wait object for uAT event */
+ dapl_os_wait_object_init(&hca_ptr->ib_trans.wait_object);
- /* initialize cq_lock and wait object */
- dapl_os_lock_init(&hca_ptr->ib_trans.cq_lock);
- dapl_os_wait_object_init (&hca_ptr->ib_trans.wait_object);
-
- dapl_dbg_log (DAPL_DBG_TYPE_UTIL,
+ /*
+ * Put new hca_transport on list for async and CQ event processing
+ * Wakeup work thread to add to polling list
+ */
+ dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&hca_ptr->ib_trans.entry);
+ dapl_os_lock( &g_hca_lock );
+ dapl_llist_add_tail(&g_hca_list,
+ (DAPL_LLIST_ENTRY*)&hca_ptr->ib_trans.entry,
+ &hca_ptr->ib_trans.entry);
+ write(g_ib_pipe[1], "w", sizeof "w");
+ dapl_os_unlock(&g_hca_lock);
+
+ dapl_dbg_log (DAPL_DBG_TYPE_UTIL,
" open_hca: %s, port %d, %s %d.%d.%d.%d INLINE_MAX=%d\n",
ibv_get_device_name(hca_ptr->ib_trans.ib_dev), hca_ptr->port_num,
((struct sockaddr_in *)&hca_ptr->hca_address)->sin_family == AF_INET ?
"AF_INET":"AF_INET6",
@@ -245,7 +260,6 @@
((struct sockaddr_in *)&hca_ptr->hca_address)->sin_addr.s_addr >> 24 & 0xff,
hca_ptr->ib_trans.max_inline_send );
-
return DAT_SUCCESS;
bail:
@@ -276,16 +290,28 @@
dapl_dbg_log (DAPL_DBG_TYPE_UTIL," close_hca: %p->%p\n",
hca_ptr,hca_ptr->ib_hca_handle);
- dapli_cq_thread_destroy(hca_ptr);
-
if (hca_ptr->ib_hca_handle != IB_INVALID_HANDLE) {
if (ibv_close_device(hca_ptr->ib_hca_handle))
return(dapl_convert_errno(errno,"ib_close_device"));
hca_ptr->ib_hca_handle = IB_INVALID_HANDLE;
}
-
- dapl_os_lock_destroy(&hca_ptr->ib_trans.cq_lock);
+ /*
+ * Remove hca from async and CQ event processing list
+ * Wakeup work thread to remove from polling list
+ */
+ hca_ptr->ib_trans.destroy = 1;
+ write(g_ib_pipe[1], "w", sizeof "w");
+
+ /* wait for thread to remove HCA references */
+ while (hca_ptr->ib_trans.destroy != 2) {
+ struct timespec sleep, remain;
+ sleep.tv_sec = 0;
+ sleep.tv_nsec = 10000000; /* 10 ms */
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+ " ib_thread_destroy: waiting on hca %p destroy\n");
+ nanosleep (&sleep, &remain);
+ }
return (DAT_SUCCESS);
}
@@ -432,31 +458,285 @@
IN void *context )
{
- ib_hca_transport_t *hca_ptr;
+ ib_hca_transport_t *hca_ptr;
- dapl_dbg_log (DAPL_DBG_TYPE_UTIL,
- " setup_async_cb: ia %p type %d handle %p cb %p ctx %p\n",
- ia_ptr, handler_type, evd_ptr, callback, context);
-
- hca_ptr = &ia_ptr->hca_ptr->ib_trans;
- switch(handler_type)
- {
- case DAPL_ASYNC_UNAFILIATED:
- hca_ptr->async_unafiliated = callback;
- break;
- case DAPL_ASYNC_CQ_ERROR:
- hca_ptr->async_cq_error = callback;
- break;
- case DAPL_ASYNC_CQ_COMPLETION:
- hca_ptr->async_cq = callback;
- break;
- case DAPL_ASYNC_QP_ERROR:
- hca_ptr->async_qp_error = callback;
- break;
- default:
- break;
- }
- return DAT_SUCCESS;
+ dapl_dbg_log (DAPL_DBG_TYPE_UTIL,
+ " setup_async_cb: ia %p type %d handle %p cb %p ctx %p\n",
+ ia_ptr, handler_type, evd_ptr, callback, context);
+
+ hca_ptr = &ia_ptr->hca_ptr->ib_trans;
+ switch(handler_type)
+ {
+ case DAPL_ASYNC_UNAFILIATED:
+ hca_ptr->async_unafiliated = callback;
+ hca_ptr->async_un_ctx = context;
+ break;
+ case DAPL_ASYNC_CQ_ERROR:
+ hca_ptr->async_cq_error = callback;
+ hca_ptr->async_cq_ctx = context;
+ break;
+ case DAPL_ASYNC_CQ_COMPLETION:
+ hca_ptr->async_cq = callback;
+ hca_ptr->async_ctx = context;
+ break;
+ case DAPL_ASYNC_QP_ERROR:
+ hca_ptr->async_qp_error = callback;
+ hca_ptr->async_qp_ctx = context;
+ break;
+ default:
+ break;
+ }
+ return DAT_SUCCESS;
}
+int dapli_ib_thread_init(void)
+{
+ DAT_RETURN dat_status;
+
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+ " ib_thread_init(%d)\n", getpid());
+
+ /* create thread to process inbound connect request */
+ dat_status = dapl_os_thread_create(dapli_thread, NULL, &g_ib_thread);
+ if (dat_status != DAT_SUCCESS)
+ {
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ " ib_thread_init: failed to create thread\n");
+ return 1;
+ }
+ return 0;
+}
+
+void dapli_ib_thread_destroy(void)
+{
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+ " ib_thread_destroy(%d)\n", getpid());
+
+ /* destroy ib_thread, wait for termination */
+ g_ib_destroy = 1;
+ write(g_ib_pipe[1], "w", sizeof "w");
+ while (g_ib_destroy != 2) {
+ struct timespec sleep, remain;
+ sleep.tv_sec = 0;
+ sleep.tv_nsec = 10000000; /* 10 ms */
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+ " ib_thread_destroy: waiting for ib_thread\n");
+ nanosleep(&sleep, &remain);
+ }
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+ " ib_thread_destroy(%d) exit\n",getpid());
+}
+
+void dapli_async_event_cb(struct _ib_hca_transport *hca)
+{
+ struct ibv_async_event event;
+ struct pollfd async_fd = {
+ .fd = hca->ib_ctx->async_fd,
+ .events = POLLIN,
+ .revents = 0
+ };
+
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " dapli_async_event_cb(%p)\n",hca);
+
+ if (hca->destroy)
+ return;
+
+ if ((poll(&async_fd, 1, 0)==1) &&
+ (!ibv_get_async_event(hca->ib_ctx, &event))) {
+
+ switch (event.event_type) {
+
+ case IBV_EVENT_CQ_ERR:
+ {
+ dapl_dbg_log(DAPL_DBG_TYPE_WARN,
+ " dapli_async_event CQ ERR %d\n",
+ event.event_type);
+
+ /* report up if async callback still setup */
+ if (hca->async_cq_error)
+ hca->async_cq_error(hca->ib_ctx,
+ &event,
+ hca->async_cq_ctx);
+ break;
+ }
+ case IBV_EVENT_COMM_EST:
+ {
+ /* Received messages on connected QP before RTU */
+ struct dapl_ep *ep_ptr = event.element.qp->qp_context;
+
+ /* TODO: cannot process COMM_EST until ibv
+ * guarantees valid QP context for events.
+ * Race conditions exist with QP destroy call.
+ * For now, assume the RTU will arrive.
+ */
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+ " dapli_async_event COMM_EST (qp=%p)\n",
+ event.element.qp);
+
+ if (!DAPL_BAD_HANDLE(ep_ptr, DAPL_MAGIC_EP) &&
+ ep_ptr->cm_handle != IB_INVALID_HANDLE)
+ ib_cm_establish(ep_ptr->cm_handle->cm_id);
+
+ break;
+ }
+ case IBV_EVENT_QP_FATAL:
+ case IBV_EVENT_QP_REQ_ERR:
+ case IBV_EVENT_QP_ACCESS_ERR:
+ case IBV_EVENT_QP_LAST_WQE_REACHED:
+ case IBV_EVENT_SRQ_ERR:
+ case IBV_EVENT_SRQ_LIMIT_REACHED:
+ case IBV_EVENT_SQ_DRAINED:
+ {
+ dapl_dbg_log(DAPL_DBG_TYPE_WARN,
+ " dapli_async_event QP ERR %d\n",
+ event.event_type);
+
+ /* report up if async callback still setup */
+ if (hca->async_qp_error)
+ hca->async_qp_error(hca->ib_ctx,
+ &event,
+ hca->async_qp_ctx);
+ break;
+ }
+ case IBV_EVENT_PATH_MIG:
+ case IBV_EVENT_PATH_MIG_ERR:
+ case IBV_EVENT_DEVICE_FATAL:
+ case IBV_EVENT_PORT_ACTIVE:
+ case IBV_EVENT_PORT_ERR:
+ case IBV_EVENT_LID_CHANGE:
+ case IBV_EVENT_PKEY_CHANGE:
+ case IBV_EVENT_SM_CHANGE:
+ {
+ dapl_dbg_log(DAPL_DBG_TYPE_WARN,
+ " dapli_async_event DEV ERR %d\n",
+ event.event_type);
+
+ /* report up if async callback still setup */
+ if (hca->async_unafiliated)
+ hca->async_unafiliated(
+ hca->ib_ctx,
+ &event,
+ hca->async_un_ctx);
+ break;
+ }
+ default:
+ {
+ dapl_dbg_log (DAPL_DBG_TYPE_WARN,
+ "--> DsEventCb: UNKNOWN\n");
+ break;
+ }
+ }
+ ibv_put_async_event(&event);
+ }
+}
+
+
+/* work thread for uAT, uCM, CQ, and async events */
+void dapli_thread(void *arg)
+{
+ struct pollfd ufds[__FD_SETSIZE];
+ struct _ib_hca_transport *uhca[__FD_SETSIZE]={NULL};
+ struct _ib_hca_transport *hca;
+ int ret,idx,fds;
+ char rbuf[2];
+
+ dapl_dbg_log (DAPL_DBG_TYPE_UTIL,
+ " ib_thread(%d,0x%x): ENTER: pipe %d cm %d at %d\n",
+ getpid(), g_ib_thread,
+ g_ib_pipe[0], ib_cm_get_fd(),
+ ib_at_get_fd());
+
+ /* Poll across pipe, CM, AT never changes */
+ dapl_os_lock( &g_hca_lock );
+
+ ufds[0].fd = g_ib_pipe[0]; /* pipe */
+ ufds[0].events = POLLIN;
+ ufds[1].fd = ib_cm_get_fd(); /* uCM */
+ ufds[1].events = POLLIN;
+ ufds[2].fd = ib_at_get_fd(); /* uAT */
+ ufds[2].events = POLLIN;
+
+ while (!g_ib_destroy) {
+
+ /* build ufds after pipe, cm, at events */
+ ufds[0].revents = 0;
+ ufds[1].revents = 0;
+ ufds[2].revents = 0;
+ idx=2;
+
+ /* Walk HCA list and setup async and CQ events */
+ if (!dapl_llist_is_empty(&g_hca_list))
+ hca = dapl_llist_peek_head(&g_hca_list);
+ else
+ hca = NULL;
+
+ while(hca) {
+ int i;
+ ufds[++idx].fd = hca->ib_ctx->async_fd; /* uASYNC */
+ ufds[idx].events = POLLIN;
+ ufds[idx].revents = 0;
+ uhca[idx] = hca;
+ for (i=0;i<hca->ib_ctx->num_comp;i++) { /* uCQ */
+ ufds[++idx].fd = hca->ib_ctx->cq_fd[i];
+ ufds[idx].events = POLLIN;
+ ufds[idx].revents = 0;
+ uhca[idx] = hca;
+ }
+ hca = dapl_llist_next_entry(
+ &g_hca_list,
+ (DAPL_LLIST_ENTRY*)&hca->entry);
+ }
+
+ /* unlock, and setup poll */
+ fds = idx+1;
+ dapl_os_unlock(&g_hca_lock);
+ ret = poll(ufds, fds, -1);
+ if (ret <= 0) {
+ dapl_dbg_log(DAPL_DBG_TYPE_WARN,
+ " ib_thread(%d): ERR %s poll\n",
+ getpid(),strerror(errno));
+ dapl_os_lock(&g_hca_lock);
+ continue;
+ }
+
+ /* check and process CQ and ASYNC events, each open device */
+ for(idx=3;idx<fds;idx++) {
+ if (ufds[idx].revents == POLLIN) {
+ dapli_cq_event_cb(uhca[idx]);
+ dapli_async_event_cb(uhca[idx]);
+ }
+ }
+
+ /* check and process user events */
+ if (ufds[0].revents == POLLIN) {
+
+ read(g_ib_pipe[0], rbuf, 2);
+
+ /* cleanup any device on list marked for destroy */
+ for(idx=3;idx<fds;idx++) {
+ if(uhca[idx] && uhca[idx]->destroy == 1) {
+ dapl_os_lock(&g_hca_lock);
+ dapl_llist_remove_entry(
+ &g_hca_list,
+ (DAPL_LLIST_ENTRY*)
+ &uhca[idx]->entry);
+ dapl_os_unlock(&g_hca_lock);
+ uhca[idx]->destroy = 2;
+ }
+ }
+ }
+
+ /* CM and AT events */
+ if (ufds[1].revents == POLLIN)
+ dapli_cm_event_cb();
+
+ if (ufds[2].revents == POLLIN)
+ dapli_at_event_cb();
+
+ dapl_os_lock(&g_hca_lock);
+ }
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL," ib_thread(%d) EXIT\n",getpid());
+ g_ib_destroy = 2;
+ dapl_os_unlock(&g_hca_lock);
+}
Index: dapl/openib/dapl_ib_cm.c
===================================================================
--- dapl/openib/dapl_ib_cm.c (revision 3293)
+++ dapl/openib/dapl_ib_cm.c (working copy)
@@ -70,90 +70,6 @@
static inline uint64_t cpu_to_be64(uint64_t x) { return x; }
#endif
-static int g_at_destroy;
-static DAPL_OS_THREAD g_at_thread;
-static int g_cm_destroy;
-static DAPL_OS_THREAD g_cm_thread;
-static DAPL_OS_LOCK g_cm_lock;
-static struct dapl_llist_entry *g_cm_list;
-
-int dapli_cm_thread_init(void)
-{
- DAT_RETURN dat_status;
-
- dapl_dbg_log(DAPL_DBG_TYPE_CM," cm_thread_init(%d)\n", getpid());
-
- /* initialize cr_list lock */
- dapl_os_lock_init(&g_cm_lock);
-
- /* initialize CM list for listens on this HCA */
- dapl_llist_init_head(&g_cm_list);
-
- /* create thread to process inbound connect request */
- dat_status = dapl_os_thread_create(cm_thread, NULL, &g_cm_thread);
- if (dat_status != DAT_SUCCESS)
- {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " cm_thread_init: failed to create thread\n");
- return 1;
- }
- return 0;
-}
-
-void dapli_cm_thread_destroy(void)
-{
- dapl_dbg_log(DAPL_DBG_TYPE_CM," cm_thread_destroy(%d)\n", getpid());
-
- /* destroy cr_thread and lock */
- g_cm_destroy = 1;
- pthread_kill( g_cm_thread, SIGUSR1 );
- dapl_dbg_log(DAPL_DBG_TYPE_CM," cm_thread_destroy(%d) SIGUSR1 sent\n",getpid());
- while (g_cm_destroy) {
- struct timespec sleep, remain;
- sleep.tv_sec = 0;
- sleep.tv_nsec = 10000000; /* 10 ms */
- dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " cm_thread_destroy: waiting for cm_thread\n");
- nanosleep (&sleep, &remain);
- }
- dapl_dbg_log(DAPL_DBG_TYPE_CM," cm_thread_destroy(%d) exit\n",getpid());
-}
-
-int dapli_at_thread_init(void)
-{
- DAT_RETURN dat_status;
-
- dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread_init(%d)\n", getpid());
-
- /* create thread to process AT async requests */
- dat_status = dapl_os_thread_create(at_thread, NULL, &g_at_thread);
- if (dat_status != DAT_SUCCESS)
- {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " at_thread_init: failed to create thread\n");
- return 1;
- }
- return 0;
-}
-
-void dapli_at_thread_destroy(void)
-{
- dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread_destroy(%d)\n", getpid());
-
- /* destroy cr_thread and lock */
- g_at_destroy = 1;
- pthread_kill( g_at_thread, SIGUSR1 );
- dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread_destroy(%d) SIGUSR1 sent\n",getpid());
- while (g_at_destroy) {
- struct timespec sleep, remain;
- sleep.tv_sec = 0;
- sleep.tv_nsec = 10000000; /* 10 ms */
- dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " at_thread_destroy: waiting for at_thread\n");
- nanosleep (&sleep, &remain);
- }
- dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread_destroy(%d) exit\n",getpid());
-}
void dapli_ip_comp_handler(uint64_t req_id, void *context, int rec_num)
{
@@ -348,12 +264,6 @@
if (conn->ep)
conn->ep->cm_handle = IB_INVALID_HANDLE;
- /* take off the CM thread work queue and free */
- dapl_os_lock( &g_cm_lock );
- dapl_llist_remove_entry(&g_cm_list,
- (DAPL_LLIST_ENTRY*)&conn->entry);
- dapl_os_unlock(&g_cm_lock);
-
dapl_os_free(conn, sizeof(*conn));
}
}
@@ -426,8 +336,8 @@
if (new_conn) {
(void)dapl_os_memzero(new_conn, sizeof(*new_conn));
- dapl_os_lock_init(&new_conn->lock);
new_conn->cm_id = event->cm_id; /* provided by uCM */
+ event->cm_id->context = new_conn; /* update CM_ID context */
new_conn->sp = conn->sp;
new_conn->hca = conn->hca;
new_conn->service_id = conn->service_id;
@@ -444,13 +354,6 @@
event->param.req_rcvd.primary_path,
sizeof(struct ib_sa_path_rec));
- /* put new CR on CM thread event work queue */
- dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&new_conn->entry);
- dapl_os_lock( &g_cm_lock );
- dapl_llist_add_tail(&g_cm_list,
- (DAPL_LLIST_ENTRY*)&new_conn->entry, new_conn);
- dapl_os_unlock(&g_cm_lock);
-
dapl_dbg_log(DAPL_DBG_TYPE_CM, " passive_cb: "
"REQ on HCA %p SP %p SID %d L_ID %d new_id %d p_data %p\n",
new_conn->hca, new_conn->sp,
@@ -521,18 +424,13 @@
if (conn->ep)
conn->ep->cm_handle = IB_INVALID_HANDLE;
- /* take off the CM thread work queue and free */
- dapl_os_lock( &g_cm_lock );
- dapl_llist_remove_entry(&g_cm_list,
- (DAPL_LLIST_ENTRY*)&conn->entry);
- dapl_os_unlock(&g_cm_lock);
dapl_os_free(conn, sizeof(*conn));
}
return(destroy);
}
static int dapli_cm_passive_cb(struct dapl_cm_id *conn,
- struct ib_cm_event *event)
+ struct ib_cm_event *event)
{
int destroy;
struct dapl_cm_id *new_conn;
@@ -541,9 +439,6 @@
" passive_cb: conn %p id %d event %d\n",
conn, conn->cm_id, event->event );
- if (conn->cm_id == 0)
- return 0;
-
dapl_os_lock(&conn->lock);
if (conn->destroy) {
dapl_os_unlock(&conn->lock);
@@ -608,155 +503,11 @@
if (conn->ep)
conn->ep->cm_handle = IB_INVALID_HANDLE;
- /* take off the CM thread work queue and free */
- dapl_os_lock( &g_cm_lock );
- dapl_llist_remove_entry(&g_cm_list,
- (DAPL_LLIST_ENTRY*)&conn->entry);
- dapl_os_unlock(&g_cm_lock);
-
dapl_os_free(conn, sizeof(*conn));
}
return(destroy);
}
-/* something to catch the signal */
-static void ib_sig_handler(int signum)
-{
- return;
-}
-
-/* async CM processing thread */
-void cm_thread(void *arg)
-{
- struct dapl_cm_id *conn, *next_conn;
- struct ib_cm_event *event;
- struct pollfd ufds;
- sigset_t sigset;
-
- dapl_dbg_log (DAPL_DBG_TYPE_CM,
- " cm_thread(%d,0x%x): ENTER: cm_fd %d\n",
- getpid(), g_cm_thread, ib_cm_get_fd());
-
- sigemptyset(&sigset);
- sigaddset(&sigset, SIGUSR1);
- pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
- signal( SIGUSR1, ib_sig_handler);
-
- dapl_os_lock( &g_cm_lock );
- while (!g_cm_destroy) {
- struct ib_cm_id *cm_id;
- int ret;
-
- /* select for CM event, all events process via cm_fd */
- ufds.fd = ib_cm_get_fd();
- ufds.events = POLLIN;
- ufds.revents = 0;
-
- dapl_os_unlock(&g_cm_lock);
- ret = poll(&ufds, 1, -1);
- if (ret <= 0) {
- dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " cm_thread(%d): ERR %s poll\n",
- getpid(),strerror(errno));
- dapl_os_lock(&g_cm_lock);
- continue;
- }
-
- dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " cm_thread: GET EVENT fd=%d n=%d\n",
- ib_cm_get_fd(),ret);
-
- if (ib_cm_event_get_timed(0,&event)) {
- dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " cm_thread: ERR %s event_get on %d\n",
- strerror(errno), ib_cm_get_fd() );
- dapl_os_lock(&g_cm_lock);
- continue;
- }
- dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " cm_thread: GET EVENT fd=%d woke\n",ib_cm_get_fd());
- dapl_os_lock(&g_cm_lock);
-
- /* set proper cm_id */
- if (event->event == IB_CM_REQ_RECEIVED ||
- event->event == IB_CM_SIDR_REQ_RECEIVED)
- cm_id = event->param.req_rcvd.listen_id;
- else
- cm_id = event->cm_id;
-
- dapl_dbg_log (DAPL_DBG_TYPE_CM,
- " cm_thread: EVENT event(%d) cm_id=%d (%d)\n",
- event->event, event->cm_id, cm_id );
-
- /*
- * Walk cm_list looking for connection id in event
- * no need to walk if uCM would provide context with event
- */
- if (!dapl_llist_is_empty(&g_cm_list))
- next_conn = dapl_llist_peek_head(&g_cm_list);
- else
- next_conn = NULL;
-
- ret = 0;
- while (next_conn) {
- conn = next_conn;
- dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " cm_thread: LIST cm %p c_id %d e_id %d)\n",
- conn, conn->cm_id, cm_id );
-
- next_conn = dapl_llist_next_entry(
- &g_cm_list,
- (DAPL_LLIST_ENTRY*)&conn->entry );
-
- if (cm_id == conn->cm_id) {
- dapl_os_unlock(&g_cm_lock);
- if (conn->sp)
- ret = dapli_cm_passive_cb(conn,event);
- else
- ret = dapli_cm_active_cb(conn,event);
- dapl_os_lock(&g_cm_lock);
- break;
- }
- }
- ib_cm_event_put(event);
- if (ret) {
- dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " cm_thread: destroy cm_id %d\n",cm_id);
- ib_cm_destroy_id(cm_id);
- }
- }
- dapl_os_unlock(&g_cm_lock);
- dapl_dbg_log(DAPL_DBG_TYPE_CM," cm_thread(%d) EXIT, cm_list=%s\n",
- getpid(),dapl_llist_is_empty(&g_cm_list) ? "EMPTY":"NOT EMPTY");
- g_cm_destroy = 0;
-}
-
-/* async AT processing thread */
-void at_thread(void *arg)
-{
- sigset_t sigset;
-
- dapl_dbg_log (DAPL_DBG_TYPE_CM,
- " at_thread(%d,0x%x): ENTER: at_fd %d\n",
- getpid(), g_at_thread, ib_at_get_fd());
-
- sigemptyset(&sigset);
- sigaddset(&sigset, SIGUSR1);
- pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
- signal(SIGUSR1, ib_sig_handler);
-
- while (!g_at_destroy) {
- /* poll forever until callback or signal */
- if (ib_at_callback_get_timed(-1) < 0) {
- dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " at_thread: SIG? ret=%s, destroy=%d\n",
- strerror(errno), g_at_destroy );
- }
- dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread: callback woke\n");
- }
- dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread(%d) EXIT \n", getpid());
- g_at_destroy = 0;
-}
/************************ DAPL provider entry points **********************/
@@ -853,13 +604,6 @@
conn->retries = 0;
dapl_os_memcpy(&conn->r_addr, r_addr, sizeof(DAT_SOCK_ADDR6));
- /* put on CM thread work queue */
- dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&conn->entry);
- dapl_os_lock( &g_cm_lock );
- dapl_llist_add_tail(&g_cm_list,
- (DAPL_LLIST_ENTRY*)&conn->entry, conn);
- dapl_os_unlock(&g_cm_lock);
-
status = ib_at_route_by_ip(
((struct sockaddr_in *)&conn->r_addr)->sin_addr.s_addr,
((struct sockaddr_in *)&conn->hca->hca_address)->sin_addr.s_addr,
@@ -1019,13 +763,6 @@
conn->hca = ia_ptr->hca_ptr;
conn->service_id = ServiceID;
- /* put on CM thread work queue */
- dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&conn->entry);
- dapl_os_lock( &g_cm_lock );
- dapl_llist_add_tail(&g_cm_list,
- (DAPL_LLIST_ENTRY*)&conn->entry, conn);
- dapl_os_unlock(&g_cm_lock);
-
dapl_dbg_log(DAPL_DBG_TYPE_EP,
" setup_listener(conn=%p cm_id=%d)\n",
sp_ptr->cm_srvc_handle,conn->cm_id);
@@ -1345,8 +1082,6 @@
return size;
}
-#ifndef SOCKET_CM
-
/*
* Map all socket CM event codes to the DAT equivelent.
*/
@@ -1457,7 +1192,44 @@
return ib_cm_event;
}
-#endif
+void dapli_cm_event_cb()
+{
+ struct ib_cm_event *event;
+
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " dapli_cm_event()\n");
+
+ /* process one CM event, fairness */
+ if(!ib_cm_event_get_timed(0,&event)) {
+ struct dapl_cm_id *conn;
+ int ret;
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " dapli_cm_event: EVENT=%p ID=%p CTX=%p\n",
+ event->event, event->cm_id,
+ event->cm_id->context);
+
+ /* set proper conn from cm_id context*/
+ conn = (struct dapl_cm_id*)event->cm_id->context;
+
+ if (conn->sp)
+ ret = dapli_cm_passive_cb(conn,event);
+ else
+ ret = dapli_cm_active_cb(conn,event);
+
+ ib_cm_event_put(event);
+
+ if (ret)
+ ib_cm_destroy_id(conn->cm_id);
+ }
+}
+
+void dapli_at_event_cb()
+{
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " dapli_at_event_cb()\n");
+
+ /* process one AT event, fairness */
+ ib_at_callback_get_timed(0);
+}
+
/*
* Local variables:
Index: dapl/openib/dapl_ib_util.h
===================================================================
--- dapl/openib/dapl_ib_util.h (revision 3293)
+++ dapl/openib/dapl_ib_util.h (working copy)
@@ -231,18 +231,22 @@
/* ib_hca_transport_t, specific to this implementation */
typedef struct _ib_hca_transport
{
- struct ibv_device *ib_dev;
+ struct ib_llist_entry entry;
+ int destroy;
+ struct ibv_device *ib_dev;
+ struct ibv_context *ib_ctx;
ib_cq_handle_t ib_cq_empty;
- DAPL_OS_LOCK cq_lock;
DAPL_OS_WAIT_OBJECT wait_object;
- int cq_destroy;
- DAPL_OS_THREAD cq_thread;
int max_inline_send;
union ibv_gid gid;
ib_async_handler_t async_unafiliated;
+ void *async_un_ctx;
ib_async_handler_t async_cq_error;
+ void *async_ctx;
ib_async_handler_t async_cq;
+ void *async_cq_ctx;
ib_async_handler_t async_qp_error;
+ void *async_qp_ctx;
} ib_hca_transport_t;
@@ -252,21 +256,15 @@
/* prototypes */
int32_t dapls_ib_init (void);
int32_t dapls_ib_release (void);
-void cm_thread (void *arg);
-int dapli_cm_thread_init(void);
-void dapli_cm_thread_destroy(void);
-void at_thread (void *arg);
-int dapli_at_thread_init(void);
-void dapli_at_thread_destroy(void);
-void cq_thread (void *arg);
-int dapli_cq_thread_init(struct dapl_hca *hca_ptr);
-void dapli_cq_thread_destroy(struct dapl_hca *hca_ptr);
-
-int dapli_get_lid(struct dapl_hca *hca_ptr, int port, uint16_t *lid);
-int dapli_get_gid(struct dapl_hca *hca_ptr, int port, int index,
- union ibv_gid *gid);
-int dapli_get_hca_addr(struct dapl_hca *hca_ptr);
+void dapli_thread(void *arg);
+int dapli_ib_thread_init(void);
+void dapli_ib_thread_destroy(void);
+int dapli_get_hca_addr(struct dapl_hca *hca_ptr);
void dapli_ip_comp_handler(uint64_t req_id, void *context, int rec_num);
+void dapli_cm_event_cb(void);
+void dapli_at_event_cb(void);
+void dapli_cq_event_cb(struct _ib_hca_transport *hca);
+void dapli_async_event_cb(struct _ib_hca_transport *hca);
DAT_RETURN
dapls_modify_qp_state ( IN ib_qp_handle_t qp_handle,
Index: dapl/openib/dapl_ib_cq.c
===================================================================
--- dapl/openib/dapl_ib_cq.c (revision 3293)
+++ dapl/openib/dapl_ib_cq.c (working copy)
@@ -52,94 +52,40 @@
#include "dapl_evd_util.h"
#include "dapl_ring_buffer_util.h"
#include <sys/poll.h>
-#include <signal.h>
-int dapli_cq_thread_init(struct dapl_hca *hca_ptr)
+void dapli_cq_event_cb(struct _ib_hca_transport *hca)
{
- DAT_RETURN dat_status;
+ int i;
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL," dapli_cq_event_cb(%p)\n", hca);
- dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_init(%p)\n", hca_ptr);
-
- /* create thread to process inbound connect request */
- dat_status = dapl_os_thread_create( cq_thread,
(void*)hca_ptr,&hca_ptr->ib_trans.cq_thread);
- if (dat_status != DAT_SUCCESS)
- {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " cq_thread_init: failed to create thread\n");
- return 1;
- }
- return 0;
-}
-
-void dapli_cq_thread_destroy(struct dapl_hca *hca_ptr)
-{
- dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_destroy(%p)\n", hca_ptr);
-
- /* destroy cr_thread and lock */
- hca_ptr->ib_trans.cq_destroy = 1;
- pthread_kill(hca_ptr->ib_trans.cq_thread, SIGUSR1);
- dapl_dbg_log(DAPL_DBG_TYPE_CM," cq_thread_destroy(%p) SIGUSR1 sent\n",hca_ptr);
- while (hca_ptr->ib_trans.cq_destroy != 2) {
- struct timespec sleep, remain;
- sleep.tv_sec = 0;
- sleep.tv_nsec = 10000000; /* 10 ms */
- dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
- " cq_thread_destroy: waiting for cq_thread\n");
- nanosleep (&sleep, &remain);
- }
- dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_destroy(%d) exit\n",getpid());
- return;
-}
-
-/* something to catch the signal */
-static void ib_cq_handler(int signum)
-{
- return;
-}
-
-void cq_thread( void *arg )
-{
- struct dapl_hca *hca_ptr = arg;
- struct dapl_evd *evd_ptr;
- struct ibv_cq *ibv_cq = NULL;
- sigset_t sigset;
- int status = 0;
-
- dapl_dbg_log ( DAPL_DBG_TYPE_UTIL," cq_thread: ENTER hca %p\n",hca_ptr);
-
- sigemptyset(&sigset);
- sigaddset(&sigset,SIGUSR1);
- pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
- signal(SIGUSR1, ib_cq_handler);
-
- /* wait on DTO event, or signal to abort */
- while (!hca_ptr->ib_trans.cq_destroy) {
-
- struct pollfd cq_poll = {
- .fd = hca_ptr->ib_hca_handle->cq_fd[0],
+ /* check all comp events on this device */
+ for(i=0;i<hca->ib_ctx->num_comp;i++) {
+ struct dapl_evd *evd_ptr = NULL;
+ struct ibv_cq *ibv_cq = NULL;
+ struct pollfd cq_fd = {
+ .fd = hca->ib_ctx->cq_fd[i],
.events = POLLIN,
.revents = 0
};
-
- status = poll(&cq_poll, 1, -1);
- if ((status == 1) &&
- (!ibv_get_cq_event(hca_ptr->ib_hca_handle, 0, &ibv_cq, (void*)&evd_ptr))) {
-
+ if ((poll(&cq_fd, 1, 0) == 1) &&
+ (!ibv_get_cq_event(hca->ib_ctx, i,
+ &ibv_cq, (void*)&evd_ptr))) {
+
+ /*
+ * TODO: ibv put event to protect against
+ * destroy CQ race conditions?
+ */
if (DAPL_BAD_HANDLE(evd_ptr, DAPL_MAGIC_EVD))
continue;
/* process DTO event via callback */
- dapl_evd_dto_callback ( evd_ptr->header.owner_ia->hca_ptr->ib_hca_handle,
+ dapl_evd_dto_callback ( hca->ib_ctx,
evd_ptr->ib_cq_handle,
(void*)evd_ptr );
- } else {
-
- }
- }
- hca_ptr->ib_trans.cq_destroy = 2;
- dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread: EXIT: hca %p \n", hca_ptr);
- return;
+ }
+ }
}
+
/*
* Map all verbs DTO completion codes to the DAT equivelent.
*
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.openfabrics.org/pipermail/general/attachments/20050901/86818698/attachment.html>
More information about the general
mailing list