[ofw] [PATCH] librdmacm: add multi-thread support
Sean Hefty
sean.hefty at intel.com
Fri May 15 15:54:16 PDT 2009
Finally get around to adding support for processing rdma_cm events
in a separate thread from those using other interface calls, but
mainly rdma_destroy_id. This is needed to support DAPL, which
does event processing in a dedicated thread.
The only real challenge is handling EP:GetRequest() for reporting
new connect requests. When a listen is destroyed, all outstanding
GetRequests associated with the listen must be canceled. This
generates events that the library must intercept to prevent them
from being reported to the user.
Since a canceled GetRequest event can be picked up by either a
thread calling rdma_destroy_id or rdma_get_cm_event, we modify
CompEntryCancel to return whether the thread that canceled
the request now owns the completed object, or whether another thread
picked it up first.
To ensure that a thread calling rdma_get_cm_event handles canceled
events correctly, we add a new destroying state and reference
counting to rdma_cm_ids. Synchronization is provided by using
either interlocked calls (only where needed) or a single critical
section lock. I'm not expecting that the overhead of having
per-rdma_cm_id events and locks would be worthwhile.
Signed-off-by: Sean Hefty <sean.hefty at intel.com>
---
Index: etc/user/comp_channel.cpp
===================================================================
--- etc/user/comp_channel.cpp (revision 2100)
+++ etc/user/comp_channel.cpp (working copy)
@@ -230,7 +230,8 @@
entry_ptr = &(*entry_ptr)->Next;
}
- if (*entry_ptr != NULL) {
+ entry = *entry_ptr;
+ if (entry != NULL) {
*entry_ptr = pEntry->Next;
if (pChannel->TailPtr == &pEntry->Next) {
pChannel->TailPtr = entry_ptr;
@@ -239,7 +240,7 @@
InterlockedExchange(&pEntry->Busy, 0);
}
LeaveCriticalSection(&pChannel->Lock);
- return *entry_ptr;
+ return entry;
}
static void CompChannelQueue(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry)
@@ -311,10 +312,13 @@
return 0;
}
-void CompEntryCancel(COMP_ENTRY *pEntry)
+COMP_ENTRY *CompEntryCancel(COMP_ENTRY *pEntry)
{
+ COMP_ENTRY *entry = NULL;
+
while (pEntry->Busy) {
Sleep(0);
- CompChannelFindRemove(pEntry->Channel, pEntry);
+ entry = CompChannelFindRemove(pEntry->Channel, pEntry);
}
+ return entry;
}
Index: inc/user/comp_channel.h
===================================================================
--- inc/user/comp_channel.h (revision 2100)
+++ inc/user/comp_channel.h (working copy)
@@ -89,7 +89,7 @@
void CompEntryInit(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry);
DWORD CompEntryPost(COMP_ENTRY *pEntry);
-void CompEntryCancel(COMP_ENTRY *pEntry);
+COMP_ENTRY *CompEntryCancel(COMP_ENTRY *pEntry);
#ifdef __cplusplus
}
Index: ulp/librdmacm/src/cma.cpp
===================================================================
--- ulp/librdmacm/src/cma.cpp (revision 2100)
+++ ulp/librdmacm/src/cma.cpp (working copy)
@@ -57,7 +57,8 @@
cma_connected,
cma_active_disconnect,
cma_passive_disconnect,
- cma_disconnected
+ cma_disconnected,
+ cma_destroying
};
#define CMA_DEFAULT_BACKLOG 16
@@ -69,6 +70,7 @@
struct cma_device *cma_dev;
int backlog;
int index;
+ volatile LONG refcnt;
struct rdma_cm_id **req_list;
};
@@ -243,6 +245,7 @@
}
RtlZeroMemory(id_priv, sizeof(struct cma_id_private));
+ id_priv->refcnt = 1;
id_priv->id.context = context;
id_priv->id.channel = channel;
id_priv->id.ps = ps;
@@ -269,6 +272,7 @@
{
while (--id_priv->backlog >= 0) {
if (id_priv->req_list[id_priv->backlog] != NULL) {
+ InterlockedDecrement(&id_priv->refcnt);
rdma_destroy_id(id_priv->req_list[id_priv->backlog]);
}
}
@@ -282,13 +286,20 @@
struct cma_id_private *id_priv;
id_priv = CONTAINING_RECORD(id, struct cma_id_private, id);
+
+ EnterCriticalSection(&lock);
+ id_priv->state = cma_destroying;
+ LeaveCriticalSection(&lock);
+
if (id->ps == RDMA_PS_TCP) {
id->ep.connect->CancelOverlappedRequests();
} else {
id->ep.datagram->CancelOverlappedRequests();
}
- CompEntryCancel(&id->comp_entry);
+ if (CompEntryCancel(&id->comp_entry) != NULL) {
+ InterlockedDecrement(&id_priv->refcnt);
+ }
if (id_priv->backlog > 0) {
ucma_destroy_listen(id_priv);
@@ -300,6 +311,10 @@
id_priv->id.ep.datagram->Release();
}
+ InterlockedDecrement(&id_priv->refcnt);
+ while (id_priv->refcnt) {
+ Sleep(0);
+ }
delete id_priv;
return 0;
}
@@ -471,6 +486,7 @@
RtlCopyMemory(&id->route.addr.dst_addr, dst_addr, ucma_addrlen(dst_addr));
id_priv->state = cma_addr_resolve;
+ id_priv->refcnt++;
CompEntryPost(&id->comp_entry);
return 0;
}
@@ -497,6 +513,7 @@
id_priv = CONTAINING_RECORD(id, struct cma_id_private, id);
id_priv->state = cma_route_resolve;
+ id_priv->refcnt++;
CompEntryPost(&id->comp_entry);
return 0;
}
@@ -622,10 +639,12 @@
}
id_priv->state = cma_active_connect;
+ id_priv->refcnt++;
id->comp_entry.Busy = 1;
hr = id->ep.connect->Connect(id->qp->conn_handle, &id->route.addr.dst_addr,
&attr, &id->comp_entry.Overlap);
if (FAILED(hr) && hr != WV_IO_PENDING) {
+ id_priv->refcnt--;
id->comp_entry.Busy = 0;
id_priv->state = cma_route_resolve;
return hr;
@@ -636,19 +655,27 @@
static int ucma_get_request(struct cma_id_private *listen, int index)
{
- struct cma_id_private *id_priv;
+ struct cma_id_private *id_priv = NULL;
HRESULT hr;
+ EnterCriticalSection(&lock);
+ if (listen->state != cma_listening) {
+ hr = WV_INVALID_PARAMETER;
+ goto err1;
+ }
+
+ InterlockedIncrement(&listen->refcnt);
hr = rdma_create_id(listen->id.channel, &listen->req_list[index],
listen, listen->id.ps);
if (FAILED(hr)) {
- return hr;
+ goto err2;
}
id_priv = CONTAINING_RECORD(listen->req_list[index], struct cma_id_private, id);
id_priv->index = index;
id_priv->state = cma_get_request;
+ id_priv->refcnt++;
id_priv->id.comp_entry.Busy = 1;
if (listen->id.ps == RDMA_PS_TCP) {
hr = listen->id.ep.connect->GetRequest(id_priv->id.ep.connect,
@@ -659,10 +686,21 @@
}
if (FAILED(hr) && hr != WV_IO_PENDING) {
id_priv->id.comp_entry.Busy = 0;
- return hr;
+ id_priv->refcnt--;
+ goto err2;
}
+ LeaveCriticalSection(&lock);
return 0;
+
+err2:
+ InterlockedDecrement(&listen->refcnt);
+err1:
+ LeaveCriticalSection(&lock);
+ if (id_priv != NULL) {
+ rdma_destroy_id(&id_priv->id);
+ }
+ return hr;
}
__declspec(dllexport)
@@ -725,10 +763,12 @@
}
id_priv->state = cma_accepting;
+ id_priv->refcnt++;
id->comp_entry.Busy = 1;
hr = id->ep.connect->Accept(id->qp->conn_handle, &attr,
&id->comp_entry.Overlap);
if (FAILED(hr) && hr != WV_IO_PENDING) {
+ id_priv->refcnt--;
id->comp_entry.Busy = 0;
id_priv->state = cma_disconnected;
return hr;
@@ -783,31 +823,41 @@
int rdma_ack_cm_event(struct rdma_cm_event *event)
{
struct cma_event *evt;
+ struct cma_id_private *listen;
evt = CONTAINING_RECORD(event, struct cma_event, event);
+ InterlockedDecrement(&evt->id_priv->refcnt);
+ if (evt->event.listen_id) {
+ listen = CONTAINING_RECORD(evt->event.listen_id, struct cma_id_private, id);
+ InterlockedDecrement(&listen->refcnt);
+ }
delete evt;
return 0;
}
static int ucma_process_conn_req(struct cma_event *event)
{
- struct cma_id_private *listen;
+ struct cma_id_private *listen, *id_priv;
struct cma_event_channel *chan;
listen = (struct cma_id_private *) event->id_priv->id.context;
- ucma_get_request(listen, event->id_priv->index);
+ id_priv = event->id_priv;
+ ucma_get_request(listen, id_priv->index);
+
if (SUCCEEDED(event->event.status)) {
- event->event.status = ucma_query_connect(&event->id_priv->id,
+ event->event.status = ucma_query_connect(&id_priv->id,
&event->event.param.conn);
}
if (SUCCEEDED(event->event.status)) {
event->event.event = RDMA_CM_EVENT_CONNECT_REQUEST;
- event->id_priv->state = cma_passive_connect;
+ id_priv->state = cma_passive_connect;
event->event.listen_id = &listen->id;
} else {
- rdma_destroy_id(&event->id_priv->id);
+ InterlockedDecrement(&listen->refcnt);
+ InterlockedDecrement(&id_priv->refcnt);
+ rdma_destroy_id(&id_priv->id);
}
return event->event.status;
@@ -859,6 +909,7 @@
event->event.event = RDMA_CM_EVENT_ESTABLISHED;
id_priv->state = cma_connected;
+ InterlockedIncrement(&id_priv->refcnt);
id_priv->id.comp_entry.Busy = 1;
id_priv->id.ep.connect->NotifyDisconnect(&id_priv->id.comp_entry.Overlap);
} else {
@@ -869,13 +920,25 @@
static int ucma_process_event(struct cma_event *event)
{
+ struct cma_id_private *listen, *id_priv;
WV_CONNECT_ATTRIBUTES attr;
HRESULT hr = 0;
- switch (event->id_priv->state) {
+ id_priv = event->id_priv;
+
+ EnterCriticalSection(&lock);
+ switch (id_priv->state) {
case cma_get_request:
- hr = ucma_process_conn_req(event);
- break;
+ listen = (struct cma_id_private *) id_priv->id.context;
+ if (listen->state != cma_listening) {
+ InterlockedDecrement(&id_priv->refcnt);
+ hr = WV_CANCELLED;
+ break;
+ }
+
+ listen->req_list[id_priv->index] = NULL;
+ LeaveCriticalSection(&lock);
+ return ucma_process_conn_req(event);
case cma_addr_resolve:
event->event.event = RDMA_CM_EVENT_ADDR_RESOLVED;
break;
@@ -890,15 +953,17 @@
break;
case cma_connected:
event->event.event = RDMA_CM_EVENT_DISCONNECTED;
- event->id_priv->state = cma_passive_disconnect;
+ id_priv->state = cma_passive_disconnect;
break;
case cma_active_disconnect:
event->event.event = RDMA_CM_EVENT_DISCONNECTED;
- event->id_priv->state = cma_disconnected;
+ id_priv->state = cma_disconnected;
break;
default:
- return -1;
+ InterlockedDecrement(&id_priv->refcnt);
+ hr = WV_CANCELLED;
}
+ LeaveCriticalSection(&lock);
return hr;
}
-------------- next part --------------
A non-text attachment was scrubbed...
Name: cma-mt.patch
Type: application/octet-stream
Size: 8464 bytes
Desc: not available
URL: <http://lists.openfabrics.org/pipermail/ofw/attachments/20090515/96cd9cd6/attachment.obj>
More information about the ofw
mailing list