[ofw] [PATCH] etc/comp_channel: fix lost event problem
Sean Hefty
sean.hefty at intel.com
Fri Apr 17 11:23:08 PDT 2009
The previous version of the completion channel was racy and would
occasionally lose events, leaving users blocked indefinitely if no
new events arrived. The most reliable fix is to add a thread to the
completion manager that reaps events from an I/O completion port and
dispatches them to the correct completion channel. This costs about
1-2% in libibverbs bandwidth tests that wait on a CQ, but it works
correctly.
Signed-off-by: Sean Hefty <sean.hefty at intel.com>
---
As a reminder, the completion manager/channel abstraction is used to
emulate Linux select/poll functionality across multiple file descriptors.
diff -up -r -X \mshefty\scm\winof\trunk\docs\dontdiff.txt -I '\$Id:' trunk\etc/user/comp_channel.cpp
branches\winverbs\etc/user/comp_channel.cpp
--- trunk\etc/user/comp_channel.cpp 2009-03-10 02:11:36.546875000 -0700
+++ branches\winverbs\etc/user/comp_channel.cpp 2009-04-10 11:57:38.534233100 -0700
@@ -28,29 +28,88 @@
*/
#include <comp_channel.h>
+#include <process.h>
+static void CompManagerQueue(COMP_MANAGER *pMgr, COMP_ENTRY *pEntry);
static void CompChannelQueue(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry);
+
+/*
+ * Completion manager
+ */
+
+/*
+ * Worker thread started by CompManagerOpen.  It dequeues packets from the
+ * manager's IO completion port and dispatches the COMP_ENTRY that owns each
+ * OVERLAPPED: entries bound to a channel are handed to CompChannelQueue
+ * (which links them on the channel queue and the manager's done list);
+ * entries with no channel (the manager's cancel entry) go only to the done
+ * list via CompManagerQueue.  mgr->Run is re-checked after every packet, so
+ * the loop ends once Run is cleared and one more packet is delivered.
+ */
+static unsigned __stdcall CompThreadPoll(void *Context)
+{
+ COMP_MANAGER *mgr = (COMP_MANAGER *) Context;
+ COMP_ENTRY *entry;
+ OVERLAPPED *overlap;
+ DWORD bytes;
+ ULONG_PTR key;
+ BOOL ok;
+
+ while (mgr->Run) {
+ ok = GetQueuedCompletionStatus(mgr->CompQueue, &bytes, &key,
+ &overlap, INFINITE);
+ if (overlap == NULL) {
+ /*
+ * No OVERLAPPED means there is no COMP_ENTRY to dispatch:
+ * either a bare wake-up packet was posted (call succeeded)
+ * or nothing was dequeued because the port itself failed
+ * (e.g. invalid or closed handle).  CONTAINING_RECORD on
+ * NULL would yield a wild pointer, so never dispatch it;
+ * on failure stop, since the port cannot deliver more.
+ */
+ if (!ok) {
+ break;
+ }
+ continue;
+ }
+
+ /*
+ * A FALSE return with a non-NULL OVERLAPPED is a completed but
+ * failed I/O (presumably on a handle attached through
+ * CompManagerMonitor); it is still dispatched so its owner
+ * observes the completion.
+ */
+ entry = CONTAINING_RECORD(overlap, COMP_ENTRY, Overlap);
+
+ if (entry->Channel) {
+ CompChannelQueue(entry->Channel, entry);
+ } else {
+ CompManagerQueue(mgr, entry);
+ }
+ }
+
+ /* Returning from a _beginthreadex routine calls _endthreadex itself. */
+ return 0;
+}
+
+/*
+ * Initialize pMgr: its lock, done list and cancel entry (Channel == NULL),
+ * an IO completion port, a manual-reset event, and the CompThreadPoll
+ * worker thread.  Returns 0 on success or a non-zero Win32 error code; on
+ * failure everything acquired so far is released in reverse order.
+ */
DWORD CompManagerOpen(COMP_MANAGER *pMgr)
{
+ DWORD ret;
+
+ InitializeCriticalSection(&pMgr->Lock);
+ pMgr->Busy = 0;
+ DListInit(&pMgr->DoneList);
+ CompEntryInit(NULL, &pMgr->Entry);
+
pMgr->CompQueue = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, -1);
if (pMgr->CompQueue == NULL) {
- return GetLastError();
+ ret = GetLastError();
+ goto err1;
}
pMgr->Event = CreateEvent(NULL, TRUE, TRUE, NULL);
if (pMgr->Event == NULL) {
- return GetLastError();
+ ret = GetLastError();
+ goto err2;
}
- pMgr->Lock = 0;
+ /* Run must be set before the worker starts reading it. */
+ pMgr->Run = TRUE;
+ pMgr->Thread = (HANDLE) _beginthreadex(NULL, 0, CompThreadPoll, pMgr, 0, NULL);
+ if (pMgr->Thread == NULL) {
+ ret = GetLastError();
+ /*
+ * _beginthreadex is documented to report failure through
+ * errno/_doserrno, not the Win32 last error, so GetLastError()
+ * may be 0 here.  Never let a failed open report success.
+ */
+ if (ret == 0) {
+ ret = ERROR_NOT_ENOUGH_MEMORY;
+ }
+ goto err3;
+ }
return 0;
+
+err3:
+ CloseHandle(pMgr->Event);
+err2:
+ CloseHandle(pMgr->CompQueue);
+err1:
+ DeleteCriticalSection(&pMgr->Lock);
+ return ret;
}
+/*
+ * Stop the CompThreadPoll worker and release everything CompManagerOpen
+ * created.  NOTE(review): callers presumably must ensure no other thread is
+ * still polling pMgr or its channels — confirm against callers.
+ */
void CompManagerClose(COMP_MANAGER *pMgr)
{
+ DLIST_ENTRY *item;
+
+ pMgr->Run = FALSE;
+
+ /*
+ * CompManagerCancel only posts when the manager entry is idle.  If an
+ * earlier cancel was already moved to DoneList by the worker but never
+ * consumed by CompManagerPoll, Entry.Busy is still 1: no packet would
+ * be posted, the worker would stay blocked in GetQueuedCompletionStatus
+ * and the INFINITE wait below would hang.  Reclaim such an entry first.
+ * The worker inserts into DoneList under pMgr->Lock, so this scan cannot
+ * race with that insertion; if the entry is still inside the port, the
+ * worker will wake on it anyway.
+ * NOTE(review): assumes dlist.h's circular list with DoneList as the
+ * sentinel, as DListEmpty/DoneList.Next use in CompManagerPoll suggests.
+ */
+ EnterCriticalSection(&pMgr->Lock);
+ for (item = pMgr->DoneList.Next; item != &pMgr->DoneList; item = item->Next) {
+ if (item == &pMgr->Entry.MgrEntry) {
+ DListRemove(item);
+ InterlockedExchange(&pMgr->Entry.Busy, 0);
+ break;
+ }
+ }
+ LeaveCriticalSection(&pMgr->Lock);
+
+ /* Wake the worker so it observes Run == FALSE and exits. */
+ CompManagerCancel(pMgr);
+ WaitForSingleObject(pMgr->Thread, INFINITE);
+ CloseHandle(pMgr->Thread);
+
CloseHandle(pMgr->CompQueue);
CloseHandle(pMgr->Event);
+ DeleteCriticalSection(&pMgr->Lock);
}
DWORD CompManagerMonitor(COMP_MANAGER *pMgr, HANDLE hFile, ULONG_PTR Key)
@@ -61,38 +120,85 @@ DWORD CompManagerMonitor(COMP_MANAGER *p
return (cq == NULL) ? GetLastError() : 0;
}
+/*
+ * Append pEntry to the manager's done list and signal pMgr->Event so a
+ * thread waiting in CompManagerPoll wakes up.  The event is set while the
+ * lock is held; CompManagerPoll resets it under the same lock, so a set
+ * cannot be lost between a poller's emptiness check and its reset.
+ */
+static void CompManagerQueue(COMP_MANAGER *pMgr, COMP_ENTRY *pEntry)
+{
+ EnterCriticalSection(&pMgr->Lock);
+ DListInsertTail(&pEntry->MgrEntry, &pMgr->DoneList);
+ SetEvent(pMgr->Event);
+ LeaveCriticalSection(&pMgr->Lock);
+}
+
+/*
+ * Unlink pEntry from the manager's done list under the manager lock.
+ * Only called for entries taken off a channel queue, which CompChannelQueue
+ * also links onto the done list, so the entry is expected to be linked.
+ */
+static void CompManagerRemoveEntry(COMP_MANAGER *pMgr, COMP_ENTRY *pEntry)
+{
+ EnterCriticalSection(&pMgr->Lock);
+ DListRemove(&pEntry->MgrEntry);
+ LeaveCriticalSection(&pMgr->Lock);
+}
+
+/*
+ * Wait up to Milliseconds (per wait) for an entry on the manager's done list.
+ * On success *ppChannel is set to the channel owning the first entry and 0 is
+ * returned; that entry stays on the list (CompChannelPoll removes it), so the
+ * same channel keeps being reported until it is drained.  If the first entry
+ * has no channel (the manager's cancel entry), it is unlinked, its Busy flag
+ * cleared so CompManagerCancel can post it again, *ppChannel is set to NULL
+ * and ERROR_CANCELLED is returned.  If a wait does not succeed, the non-zero
+ * WaitForSingleObject result (e.g. WAIT_TIMEOUT) is returned and *ppChannel
+ * is not written.
+ */
DWORD CompManagerPoll(COMP_MANAGER *pMgr, DWORD Milliseconds,
COMP_CHANNEL **ppChannel)
{
COMP_ENTRY *entry;
- OVERLAPPED *overlap;
- DWORD bytes, ret;
- ULONG_PTR key;
+ DWORD ret = 0;
- if (GetQueuedCompletionStatus(pMgr->CompQueue, &bytes, &key, &overlap,
- Milliseconds)) {
- entry = CONTAINING_RECORD(overlap, COMP_ENTRY, Overlap);
- *ppChannel = entry->Channel;
- CompChannelQueue(entry->Channel, entry);
- ret = 0;
- } else {
- ret = GetLastError();
+ EnterCriticalSection(&pMgr->Lock);
+ while (DListEmpty(&pMgr->DoneList)) {
+ /* Reset under the lock; CompManagerQueue sets it under the same lock. */
+ ResetEvent(pMgr->Event);
+ LeaveCriticalSection(&pMgr->Lock);
+
+ ret = WaitForSingleObject(pMgr->Event, Milliseconds);
+ if (ret) {
+ return ret;
+ }
+
+ /* Another poller may have drained the list; re-check under the lock. */
+ EnterCriticalSection(&pMgr->Lock);
}
+
+ entry = CONTAINING_RECORD(pMgr->DoneList.Next, COMP_ENTRY, MgrEntry);
+ *ppChannel = entry->Channel;
+ if (entry->Channel == NULL) {
+ DListRemove(&entry->MgrEntry);
+ InterlockedExchange(&entry->Busy, 0);
+ ret = ERROR_CANCELLED;
+ }
+ LeaveCriticalSection(&pMgr->Lock);
+
return ret;
}
+/*
+ * Wake a thread blocked in CompManagerPoll by posting the manager's own
+ * entry (Channel == NULL) to the completion port; that poller then returns
+ * ERROR_CANCELLED.  While a cancel is still outstanding (Busy set), further
+ * calls do nothing.
+ */
+void CompManagerCancel(COMP_MANAGER *pMgr)
+{
+ COMP_ENTRY *cancel = &pMgr->Entry;
+
+ if (InterlockedCompareExchange(&cancel->Busy, 1, 0) != 0) {
+ return;
+ }
+ PostQueuedCompletionStatus(pMgr->CompQueue, 0, (ULONG_PTR) pMgr,
+ &cancel->Overlap);
+}
+
-void CompChannelInit(COMP_MANAGER *pMgr, COMP_CHANNEL *pChannel, DWORD Milliseconds)
+/*
+ * Completion channel
+ */
+
+/*
+ * Attach pChannel to pMgr with an empty queue.  Milliseconds is the timeout
+ * used by each wait in CompChannelPoll.  Creates the channel's manual-reset
+ * event (initially signaled) and initializes its lock and cancel entry.
+ * Returns 0 or the CreateEvent error.
+ * NOTE(review): on failure Lock is not initialized and Event is NULL, so
+ * callers must not call CompChannelCleanup on this channel.
+ */
+DWORD CompChannelInit(COMP_MANAGER *pMgr, COMP_CHANNEL *pChannel, DWORD Milliseconds)
{
pChannel->Manager = pMgr;
pChannel->Head = NULL;
pChannel->TailPtr = &pChannel->Head;
- InitializeCriticalSection(&pChannel->Lock);
pChannel->Milliseconds = Milliseconds;
+
+ pChannel->Event = CreateEvent(NULL, TRUE, TRUE, NULL);
+ if (pChannel->Event == NULL) {
+ return GetLastError();
+ }
+
+ InitializeCriticalSection(&pChannel->Lock);
+ /* The channel's own entry, posted by CompChannelCancel. */
+ CompEntryInit(pChannel, &pChannel->Entry);
+ return 0;
}
+/* Release the event and lock created by a successful CompChannelInit. */
void CompChannelCleanup(COMP_CHANNEL *pChannel)
{
+ CloseHandle(pChannel->Event);
DeleteCriticalSection(&pChannel->Lock);
}
@@ -114,84 +220,101 @@ static COMP_ENTRY *CompChannelRemoveHead
return entry;
}
+/*
+ * If pEntry is still on pChannel's queue, unlink it from that queue and from
+ * the manager's done list, and clear its Busy flag so it may be posted again.
+ * Returns pEntry if it was found and removed, NULL otherwise.  The result is
+ * captured while the channel lock is held.
+ */
+static COMP_ENTRY *CompChannelFindRemove(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry)
+{
+ COMP_ENTRY **entry_ptr, *entry;
+
+ EnterCriticalSection(&pChannel->Lock);
+ entry_ptr = &pChannel->Head;
+ while (*entry_ptr && *entry_ptr != pEntry) {
+ entry_ptr = &(*entry_ptr)->Next;
+ }
+
+ /* Remember the match before unlinking overwrites *entry_ptr. */
+ entry = *entry_ptr;
+ if (entry != NULL) {
+ *entry_ptr = entry->Next;
+ if (pChannel->TailPtr == &entry->Next) {
+ pChannel->TailPtr = entry_ptr;
+ }
+ /* Channel lock before manager lock, as in CompChannelQueue. */
+ CompManagerRemoveEntry(pChannel->Manager, entry);
+ InterlockedExchange(&entry->Busy, 0);
+ }
+ LeaveCriticalSection(&pChannel->Lock);
+ return entry;
+}
+
+/*
+ * Called by the manager thread for an entry bound to pChannel: link it onto
+ * the manager's done list and the tail of the channel queue, then signal the
+ * channel event.  All of this happens under the channel lock (the manager
+ * lock is taken inside it, the same order CompChannelPoll uses), so a
+ * poller's ResetEvent under this lock cannot swallow the signal.
+ */
static void CompChannelQueue(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry)
{
pEntry->Next = NULL;
EnterCriticalSection(&pChannel->Lock);
+ CompManagerQueue(pChannel->Manager, pEntry);
CompChannelInsertTail(pChannel, pEntry);
+ SetEvent(pChannel->Event);
LeaveCriticalSection(&pChannel->Lock);
}
+/*
+ * Wait (up to pChannel->Milliseconds per wait) until the channel queue is
+ * non-empty, then dequeue the head entry, unlink it from the manager's done
+ * list and clear its Busy flag.  Returns 0 with *ppEntry set; ERROR_CANCELLED
+ * with *ppEntry set to the channel's own entry after CompChannelCancel; or
+ * the non-zero WaitForSingleObject result (e.g. WAIT_TIMEOUT), in which case
+ * *ppEntry is not written.
+ */
DWORD CompChannelPoll(COMP_CHANNEL *pChannel, COMP_ENTRY **ppEntry)
{
- COMP_MANAGER *mgr = pChannel->Manager;
- COMP_CHANNEL *chan;
- DWORD ret = 0;
- ULONG locked;
+ COMP_ENTRY *entry;
+ DWORD ret;
EnterCriticalSection(&pChannel->Lock);
while (pChannel->Head == NULL) {
+ /* Reset under the lock; CompChannelQueue sets it under the same lock. */
+ ResetEvent(pChannel->Event);
LeaveCriticalSection(&pChannel->Lock);
- locked = InterlockedCompareExchange(&mgr->Lock, 1, 0);
- if (locked == 0) {
- ResetEvent(mgr->Event);
- ret = CompManagerPoll(mgr, pChannel->Milliseconds, &chan);
- InterlockedExchange(&mgr->Lock, 0);
- SetEvent(mgr->Event);
- } else {
- ret = WaitForSingleObject(mgr->Event, pChannel->Milliseconds);
- }
+ ret = WaitForSingleObject(pChannel->Event, pChannel->Milliseconds);
if (ret) {
- goto out;
+ return ret;
}
EnterCriticalSection(&pChannel->Lock);
}
- *ppEntry = CompChannelRemoveHead(pChannel);
+ entry = CompChannelRemoveHead(pChannel);
+ CompManagerRemoveEntry(pChannel->Manager, entry);
LeaveCriticalSection(&pChannel->Lock);
-out:
+ /* The entry is now off both lists; allow it to be posted again. */
+ InterlockedExchange(&entry->Busy, 0);
+ *ppEntry = entry;
+ ret = (entry == &pChannel->Entry) ? ERROR_CANCELLED : 0;
+
return ret;
}
-void CompChannelRemoveEntry(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry)
+/*
+ * Wake a thread blocked in CompChannelPoll on this channel by posting the
+ * channel's own entry through the manager's completion port; that poller
+ * then returns ERROR_CANCELLED.  While the cancel entry is outstanding
+ * (Busy set), further calls do nothing.
+ */
+void CompChannelCancel(COMP_CHANNEL *pChannel)
{
- COMP_CHANNEL *chan;
- COMP_ENTRY **entry_ptr;
- DWORD ret;
-
- do {
- ret = CompManagerPoll(pChannel->Manager, 0, &chan);
- } while (!ret);
- SetEvent(pChannel->Manager->Event);
-
- EnterCriticalSection(&pChannel->Lock);
- entry_ptr = &pChannel->Head;
- while (*entry_ptr && *entry_ptr != pEntry) {
- entry_ptr = &(*entry_ptr)->Next;
+ if (InterlockedCompareExchange(&pChannel->Entry.Busy, 1, 0) == 0) {
+ PostQueuedCompletionStatus(pChannel->Manager->CompQueue, 0,
+ (ULONG_PTR) pChannel, &pChannel->Entry.Overlap);
}
-
- if (*entry_ptr != NULL) {
- *entry_ptr = pEntry->Next;
- if (pChannel->TailPtr == &pEntry->Next) {
- pChannel->TailPtr = entry_ptr;
- }
- }
- LeaveCriticalSection(&pChannel->Lock);
}
+
+/*
+ * Completion entry
+ */
+
+/*
+ * Zero pEntry (queue links, OVERLAPPED, Busy) and bind it to pChannel.
+ * pChannel is NULL only for the manager's own cancel entry.
+ */
void CompEntryInit(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry)
{
+ RtlZeroMemory(pEntry, sizeof *pEntry);
pEntry->Channel = pChannel;
}
+/*
+ * Queue pEntry for delivery to its channel through the manager's completion
+ * port.  If the entry is already outstanding (Busy set) this is a no-op that
+ * returns 0, so several posts before a poll coalesce into one delivery.
+ * Returns the PostQueuedCompletionStatus error if posting fails, after
+ * clearing Busy so the entry can be posted again.
+ * NOTE(review): pEntry->Channel must be non-NULL; it is dereferenced here.
+ */
DWORD CompEntryPost(COMP_ENTRY *pEntry)
{
+ DWORD ret;
+
- if (PostQueuedCompletionStatus(pEntry->Channel->Manager->CompQueue, 0, 0,
- &pEntry->Overlap)) {
- return 0;
- } else {
- return GetLastError();
+ if (InterlockedCompareExchange(&pEntry->Busy, 1, 0) == 0) {
+ if (!PostQueuedCompletionStatus(pEntry->Channel->Manager->CompQueue,
+ 0, 0, &pEntry->Overlap)) {
+ /* Capture the error before making any other call. */
+ ret = GetLastError();
+ InterlockedExchange(&pEntry->Busy, 0);
+ return ret;
+ }
+ }
+ return 0;
+}
+
+/*
+ * Withdraw a posted entry and return once it is no longer outstanding.
+ * Spins, yielding with Sleep(0), until Busy is clear.  Each pass tries to
+ * pull the entry off its channel queue (which clears Busy).  Otherwise it
+ * waits for the manager thread to deliver the entry there, or for
+ * CompChannelPoll to consume it.
+ * NOTE(review): pEntry->Channel must be non-NULL; the manager's own cancel
+ * entry is not handled here.
+ */
+void CompEntryCancel(COMP_ENTRY *pEntry)
+{
+ while (pEntry->Busy) {
+ Sleep(0);
+ CompChannelFindRemove(pEntry->Channel, pEntry);
}
}
--- trunk\inc\user\comp_channel.h 2009-03-10 02:09:09.765625000 -0700
+++ branches\winverbs\inc\user\comp_channel.h 2009-04-09 17:43:01.515373300 -0700
@@ -33,6 +33,7 @@
#define COMP_CHANNEL_H
#include <windows.h>
+#include <dlist.h>
#ifdef __cplusplus
extern "C" {
@@ -41,8 +42,10 @@ extern "C" {
typedef struct _COMP_ENTRY
{
+ /* Link in the owning channel's queue (Head/TailPtr). */
struct _COMP_ENTRY *Next;
+ /* Link in the manager's DoneList while awaiting a poll. */
+ DLIST_ENTRY MgrEntry;
+ /* Posted to and dequeued from the manager's IO completion port. */
OVERLAPPED Overlap;
+ /* Owning channel; NULL for the manager's own cancel entry. */
struct _COMP_CHANNEL *Channel;
+ /* Non-zero while posted or queued and not yet consumed or withdrawn. */
+ LONG volatile Busy;
} COMP_ENTRY;
@@ -51,6 +54,8 @@ typedef struct _COMP_CHANNEL
struct _COMP_MANAGER *Manager;
COMP_ENTRY *Head;
COMP_ENTRY **TailPtr;
+ COMP_ENTRY Entry;
+ HANDLE Event;
CRITICAL_SECTION Lock;
DWORD Milliseconds;
@@ -59,8 +64,13 @@ typedef struct _COMP_CHANNEL
typedef struct _COMP_MANAGER
{
+ /* IO completion port drained by the CompThreadPoll worker thread. */
HANDLE CompQueue;
+ /* Entries dispatched by the worker and not yet consumed by a poll. */
+ DLIST_ENTRY DoneList;
+ /* Manager's own entry (Channel == NULL), posted by CompManagerCancel. */
+ COMP_ENTRY Entry;
+ /* Worker thread handle; the worker loops while Run is TRUE. */
+ HANDLE Thread;
+ BOOL Run;
+ /* Manual-reset event set whenever an entry is added to DoneList. */
HANDLE Event;
- LONG volatile Lock;
+ /* NOTE(review): zeroed by CompManagerOpen but not otherwise read in the
+ * comp_channel.cpp hunks shown — confirm whether it is still needed. */
+ LONG volatile Busy;
+ /* Protects DoneList and orders Event set/reset. */
+ CRITICAL_SECTION Lock;
} COMP_MANAGER;
@@ -69,15 +79,17 @@ void CompManagerClose(COMP_MANAGER *pMg
DWORD CompManagerMonitor(COMP_MANAGER *pMgr, HANDLE hFile, ULONG_PTR Key);
DWORD CompManagerPoll(COMP_MANAGER *pMgr, DWORD Milliseconds,
COMP_CHANNEL **ppChannel);
+/* Wake a CompManagerPoll caller; it returns ERROR_CANCELLED. */
+void CompManagerCancel(COMP_MANAGER *pMgr);
+/* Returns 0 or a Win32 error; after an error do not call CompChannelCleanup. */
-void CompChannelInit(COMP_MANAGER *pMgr, COMP_CHANNEL *pChannel,
+DWORD CompChannelInit(COMP_MANAGER *pMgr, COMP_CHANNEL *pChannel,
DWORD Milliseconds);
void CompChannelCleanup(COMP_CHANNEL *pChannel);
DWORD CompChannelPoll(COMP_CHANNEL *pChannel, COMP_ENTRY **ppEntry);
-void CompChannelRemoveEntry(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry);
+/* Wake a CompChannelPoll caller on this channel; it returns ERROR_CANCELLED. */
+void CompChannelCancel(COMP_CHANNEL *pChannel);
void CompEntryInit(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry);
DWORD CompEntryPost(COMP_ENTRY *pEntry);
+/* Withdraw a posted entry; returns once it is no longer outstanding. */
+void CompEntryCancel(COMP_ENTRY *pEntry);
#ifdef __cplusplus
}
More information about the ofw
mailing list