[ofw] [PATCH] etc/comp_channel: fix lost event problem

Sean Hefty sean.hefty at intel.com
Fri Apr 17 11:23:08 PDT 2009


The previous version of the completion channel was racy and would
occasionally lose events, leaving users blocked indefinitely if no
new events occurred.  The most reliable fix is to add a thread to the
completion manager that reaps events from an I/O completion port and
dispatches them to the correct completion channel.  This costs 1-2%
in libibverbs bandwidth tests that wait on a CQ, but, unlike the
previous version, it does not lose events.

Signed-off-by: Sean Hefty <sean.hefty at intel.com>
---
As a reminder, the completion manager/channel abstraction is used to
simulate select/poll functionality across multiple FDs on Linux.

diff -up -r -X \mshefty\scm\winof\trunk\docs\dontdiff.txt -I '\$Id:' trunk\etc/user/comp_channel.cpp branches\winverbs\etc/user/comp_channel.cpp
--- trunk\etc/user/comp_channel.cpp	2009-03-10 02:11:36.546875000 -0700
+++ branches\winverbs\etc/user/comp_channel.cpp	2009-04-10 11:57:38.534233100 -0700
@@ -28,29 +28,88 @@
  */
 
 #include <comp_channel.h>
+#include <process.h>
 
+static void CompManagerQueue(COMP_MANAGER *pMgr, COMP_ENTRY *pEntry);
 static void CompChannelQueue(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry);
 
+
+/*
+ * Completion manager
+ */
+
+/* Dispatch thread: routes each dequeued COMP_ENTRY to its channel or to the manager. */
+static unsigned __stdcall CompThreadPoll(void *Context)
+{
+	COMP_MANAGER *mgr = (COMP_MANAGER *) Context;
+	COMP_ENTRY *entry;
+	OVERLAPPED *overlap;
+	DWORD bytes;
+	ULONG_PTR key;
+
+	while (mgr->Run) {
+		if (!GetQueuedCompletionStatus(mgr->CompQueue, &bytes, &key,
+									   &overlap, INFINITE) && overlap == NULL) {
+			break;	/* nothing was dequeued: the port itself failed */
+		} else if (overlap == NULL) {
+			continue;	/* entry-less wake-up posted by CompManagerClose */
+		}
+		entry = CONTAINING_RECORD(overlap, COMP_ENTRY, Overlap);
+		if (entry->Channel) {
+			CompChannelQueue(entry->Channel, entry);
+		} else {
+			CompManagerQueue(mgr, entry);
+		}
+	}
+	return 0;	/* returning ends the thread; _endthreadex is called implicitly */
+}
+
 DWORD CompManagerOpen(COMP_MANAGER *pMgr)
 {
+	DWORD ret;	/* 0 on success, else a Win32 error; failure paths release what was acquired */
+
+	InitializeCriticalSection(&pMgr->Lock);
+	pMgr->Busy = 0;
+	DListInit(&pMgr->DoneList);
+	CompEntryInit(NULL, &pMgr->Entry);	/* channel-less entry posted by CompManagerCancel */
 	pMgr->CompQueue = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, -1);
 	if (pMgr->CompQueue == NULL) {
-		return GetLastError();
+		ret = GetLastError();
+		goto err1;
 	}
 
 	pMgr->Event = CreateEvent(NULL, TRUE, TRUE, NULL);
 	if (pMgr->Event == NULL) {
-		return GetLastError();
+		ret = GetLastError();
+		goto err2;
 	}
 
-	pMgr->Lock = 0;
+	pMgr->Run = TRUE;
+	pMgr->Thread = (HANDLE) _beginthreadex(NULL, 0, CompThreadPoll, pMgr, 0, NULL);
+	if (pMgr->Thread == NULL) {
+		ret = GetLastError() ? GetLastError() : ERROR_NOT_ENOUGH_MEMORY;	/* CRT reports via errno; last error may be 0 */
+		goto err3;
+	}
 	return 0;
+err3:
+	CloseHandle(pMgr->Event);
+err2:
+	CloseHandle(pMgr->CompQueue);
+err1:
+	DeleteCriticalSection(&pMgr->Lock);
+	return ret;
 }
 
 void CompManagerClose(COMP_MANAGER *pMgr)
 {
+	pMgr->Run = FALSE;	/* the dispatch thread exits after its next wake-up */
+	/* Post unconditionally: CompManagerCancel is a no-op while an earlier cancel is pending. */
+	PostQueuedCompletionStatus(pMgr->CompQueue, 0, 0, NULL);
+	WaitForSingleObject(pMgr->Thread, INFINITE);
+	CloseHandle(pMgr->Thread);
 	CloseHandle(pMgr->CompQueue);
 	CloseHandle(pMgr->Event);
+	DeleteCriticalSection(&pMgr->Lock);
 }
 
 DWORD CompManagerMonitor(COMP_MANAGER *pMgr, HANDLE hFile, ULONG_PTR Key)
@@ -61,38 +120,85 @@ DWORD CompManagerMonitor(COMP_MANAGER *p
 	return (cq == NULL) ? GetLastError() : 0;
 }
 
+static void CompManagerQueue(COMP_MANAGER *pMgr, COMP_ENTRY *pEntry)
+{
+	EnterCriticalSection(&pMgr->Lock);
+	DListInsertTail(&pEntry->MgrEntry, &pMgr->DoneList);	/* now visible to CompManagerPoll */
+	SetEvent(pMgr->Event);	/* under Lock, so CompManagerPoll's empty-check + ResetEvent cannot miss it */
+	LeaveCriticalSection(&pMgr->Lock);
+}
+
+static void CompManagerRemoveEntry(COMP_MANAGER *pMgr, COMP_ENTRY *pEntry)
+{
+	EnterCriticalSection(&pMgr->Lock);
+	DListRemove(&pEntry->MgrEntry);	/* NOTE(review): assumes pEntry is currently linked on DoneList */
+	LeaveCriticalSection(&pMgr->Lock);
+}
+
+/* Wait up to Milliseconds for a ready entry.  Returns 0 and sets *ppChannel to the channel
+ * owning the entry at the head of DoneList (it stays queued until that channel is polled),
+ * ERROR_CANCELLED for a manager cancel, or WAIT_TIMEOUT / a Win32 error code. */
 DWORD CompManagerPoll(COMP_MANAGER *pMgr, DWORD Milliseconds,
 					  COMP_CHANNEL **ppChannel)
 {
 	COMP_ENTRY *entry;
-	OVERLAPPED *overlap;
-	DWORD bytes, ret;
-	ULONG_PTR key;
+	DWORD ret = 0;
 
-	if (GetQueuedCompletionStatus(pMgr->CompQueue, &bytes, &key, &overlap,
-								  Milliseconds)) {
-		entry = CONTAINING_RECORD(overlap, COMP_ENTRY, Overlap);
-		*ppChannel = entry->Channel;
-		CompChannelQueue(entry->Channel, entry);
-		ret = 0;
-	} else {
-		ret = GetLastError();
+	EnterCriticalSection(&pMgr->Lock);
+	while (DListEmpty(&pMgr->DoneList)) {
+		ResetEvent(pMgr->Event);	/* under Lock: CompManagerQueue sets it again under Lock */
+		LeaveCriticalSection(&pMgr->Lock);
+		ret = WaitForSingleObject(pMgr->Event, Milliseconds);
+		if (ret) {
+			return (ret == WAIT_FAILED) ? GetLastError() : ret;
+		}
+		EnterCriticalSection(&pMgr->Lock);
 	}
+	entry = CONTAINING_RECORD(pMgr->DoneList.Next, COMP_ENTRY, MgrEntry);
+	*ppChannel = entry->Channel;
+	if (entry->Channel == NULL) {
+		/* manager cancel entry: consume it so CompManagerCancel can post again */
+		DListRemove(&entry->MgrEntry);
+		InterlockedExchange(&entry->Busy, 0);
+		ret = ERROR_CANCELLED;
+	}
+	LeaveCriticalSection(&pMgr->Lock);
 	return ret;
 }
 
+/* Post the manager's cancel entry (at most one outstanding) so a CompManagerPoll returns ERROR_CANCELLED. */
+void CompManagerCancel(COMP_MANAGER *pMgr)
+{
+	if (InterlockedCompareExchange(&pMgr->Entry.Busy, 1, 0) == 0 &&
+		!PostQueuedCompletionStatus(pMgr->CompQueue, 0, (ULONG_PTR) pMgr, &pMgr->Entry.Overlap)) {
+		InterlockedExchange(&pMgr->Entry.Busy, 0);	/* post failed: allow a retry, as CompEntryPost does */
+	}
+}
 
-void CompChannelInit(COMP_MANAGER *pMgr, COMP_CHANNEL *pChannel, DWORD Milliseconds)
+/*
+ * Completion channel
+ */
+
+DWORD CompChannelInit(COMP_MANAGER *pMgr, COMP_CHANNEL *pChannel, DWORD Milliseconds)
 {
 	pChannel->Manager = pMgr;
 	pChannel->Head = NULL;
 	pChannel->TailPtr = &pChannel->Head;
-	InitializeCriticalSection(&pChannel->Lock);
 	pChannel->Milliseconds = Milliseconds;
+	/* Manual-reset, initially signalled; CompChannelPoll resets it under Lock before waiting. */
+	pChannel->Event = CreateEvent(NULL, TRUE, TRUE, NULL);
+	if (pChannel->Event == NULL) {
+		return GetLastError();	/* Lock is not initialized on this path */
+	}
+	/* NOTE(review): only a successful init may be paired with CompChannelCleanup. */
+	InitializeCriticalSection(&pChannel->Lock);
+	CompEntryInit(pChannel, &pChannel->Entry);	/* this channel's cancel entry (CompChannelCancel) */
+	return 0;
 }
 
 void CompChannelCleanup(COMP_CHANNEL *pChannel)
 {
+	CloseHandle(pChannel->Event);	/* created by CompChannelInit */
 	DeleteCriticalSection(&pChannel->Lock);	
 }
 
@@ -114,84 +220,101 @@ static COMP_ENTRY *CompChannelRemoveHead
 	return entry;
 }
 
+/* Unlink pEntry from pChannel's ready list (and the manager's DoneList) if queued there. */
+static COMP_ENTRY *CompChannelFindRemove(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry)
+{
+	COMP_ENTRY **entry_ptr, *entry;
+	EnterCriticalSection(&pChannel->Lock);
+	entry_ptr = &pChannel->Head;
+	while (*entry_ptr && *entry_ptr != pEntry) {
+		entry_ptr = &(*entry_ptr)->Next;
+	}
+	entry = *entry_ptr;	/* read under Lock, before unlinking overwrites *entry_ptr */
+	if (entry != NULL) {
+		*entry_ptr = entry->Next;
+		if (pChannel->TailPtr == &entry->Next) {
+			pChannel->TailPtr = entry_ptr;
+		}
+		CompManagerRemoveEntry(pChannel->Manager, entry);
+		InterlockedExchange(&entry->Busy, 0);
+	}
+	LeaveCriticalSection(&pChannel->Lock);
+	return entry;	/* pEntry if it was removed, otherwise NULL */
+}
+
 static void CompChannelQueue(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry)
 {
 	pEntry->Next = NULL;
 	EnterCriticalSection(&pChannel->Lock);
+	CompManagerQueue(pChannel->Manager, pEntry);	/* lock order: channel Lock, then manager Lock */
 	CompChannelInsertTail(pChannel, pEntry);
+	SetEvent(pChannel->Event);	/* wakes CompChannelPoll; set under Lock like its ResetEvent */
 	LeaveCriticalSection(&pChannel->Lock);
 }
 
 DWORD CompChannelPoll(COMP_CHANNEL *pChannel, COMP_ENTRY **ppEntry)
 {
-	COMP_MANAGER *mgr = pChannel->Manager;
-	COMP_CHANNEL *chan;
-	DWORD ret = 0;
-	ULONG locked;
+	/* 0: *ppEntry is ready; ERROR_CANCELLED: this channel's cancel entry; else WAIT_TIMEOUT / Win32 error. */
+	COMP_ENTRY *entry;
+	DWORD ret;
 
 	EnterCriticalSection(&pChannel->Lock);
 	while (pChannel->Head == NULL) {
+		ResetEvent(pChannel->Event);	/* under Lock: CompChannelQueue sets it again under Lock */
 		LeaveCriticalSection(&pChannel->Lock);
 
-		locked = InterlockedCompareExchange(&mgr->Lock, 1, 0);
-		if (locked == 0) {
-			ResetEvent(mgr->Event);
-			ret = CompManagerPoll(mgr, pChannel->Milliseconds, &chan);
-			InterlockedExchange(&mgr->Lock, 0);
-			SetEvent(mgr->Event);
-		} else {
-			ret = WaitForSingleObject(mgr->Event, pChannel->Milliseconds);
-		}
+		ret = WaitForSingleObject(pChannel->Event, pChannel->Milliseconds);
 		if (ret) {
-			goto out;
+			return (ret == WAIT_FAILED) ? GetLastError() : ret;
 		}
 
 		EnterCriticalSection(&pChannel->Lock);
 	}
-	*ppEntry = CompChannelRemoveHead(pChannel);
+	entry = CompChannelRemoveHead(pChannel);
+	CompManagerRemoveEntry(pChannel->Manager, entry);	/* keep DoneList in step with the channel list */
 	LeaveCriticalSection(&pChannel->Lock);
 
-out:
+	InterlockedExchange(&entry->Busy, 0);	/* the entry may now be posted again */
+	*ppEntry = entry;
+	ret = (entry == &pChannel->Entry) ? ERROR_CANCELLED : 0;
 	return ret;
 }
 
-void CompChannelRemoveEntry(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry)
+void CompChannelCancel(COMP_CHANNEL *pChannel)	/* makes a CompChannelPoll return ERROR_CANCELLED */
 {
-	COMP_CHANNEL *chan;
-	COMP_ENTRY **entry_ptr;
-	DWORD ret;
-
-	do {
-		ret = CompManagerPoll(pChannel->Manager, 0, &chan);
-	} while (!ret);
-	SetEvent(pChannel->Manager->Event);
-
-	EnterCriticalSection(&pChannel->Lock);
-	entry_ptr = &pChannel->Head;
-	while (*entry_ptr && *entry_ptr != pEntry) {
-		entry_ptr = &(*entry_ptr)->Next;
+	if (InterlockedCompareExchange(&pChannel->Entry.Busy, 1, 0) == 0 &&
+		!PostQueuedCompletionStatus(pChannel->Manager->CompQueue, 0, (ULONG_PTR) pChannel, &pChannel->Entry.Overlap)) {
+		InterlockedExchange(&pChannel->Entry.Busy, 0);	/* post failed: allow a retry, as CompEntryPost does */
 	}
-
-	if (*entry_ptr != NULL) {
-		*entry_ptr = pEntry->Next;
-		if (pChannel->TailPtr == &pEntry->Next) {
-			pChannel->TailPtr = entry_ptr;
-		}
-	}
-	LeaveCriticalSection(&pChannel->Lock);
 }
 
+
+/*
+ * Completion entry
+ */
+
 void CompEntryInit(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry)
 {
+	RtlZeroMemory(pEntry, sizeof *pEntry);	/* clears Next, MgrEntry, Overlap and Busy */
 	pEntry->Channel = pChannel;
 }
 
 DWORD CompEntryPost(COMP_ENTRY *pEntry)
 {
-	if (PostQueuedCompletionStatus(pEntry->Channel->Manager->CompQueue, 0, 0,
-								   &pEntry->Overlap)) {
-		return 0;
-	} else {
-		return GetLastError();
+	if (InterlockedCompareExchange(&pEntry->Busy, 1, 0) == 0) {	/* an entry already posted/queued is not posted again */
+		if (!PostQueuedCompletionStatus(pEntry->Channel->Manager->CompQueue,
+										0, 0, &pEntry->Overlap)) {
+			InterlockedExchange(&pEntry->Busy, 0);	/* undo the claim so a later post can retry */
+			return GetLastError();
+		}
+	}
+	return 0;
+}
+
+void CompEntryCancel(COMP_ENTRY *pEntry)
+{
+	while (pEntry->Busy) {	/* Busy clears when the entry is polled or unlinked */
+		Sleep(0);	/* an in-flight entry must first be dispatched onto its channel */
+		CompChannelFindRemove(pEntry->Channel, pEntry);	/* NOTE(review): spins forever if the dispatch thread has exited */
 	}
 }
--- trunk\inc\user\comp_channel.h	2009-03-10 02:09:09.765625000 -0700
+++ branches\winverbs\inc\user\comp_channel.h	2009-04-09 17:43:01.515373300 -0700
@@ -33,6 +33,7 @@
 #define COMP_CHANNEL_H
 
 #include <windows.h>
+#include <dlist.h>
 
 #ifdef __cplusplus
 extern "C" {
@@ -41,8 +42,10 @@ extern "C" {
 typedef struct _COMP_ENTRY
 {
 	struct _COMP_ENTRY		*Next;
+	DLIST_ENTRY				MgrEntry;	/* link on COMP_MANAGER.DoneList while ready */
 	OVERLAPPED				Overlap;
 	struct _COMP_CHANNEL	*Channel;
+	LONG volatile			Busy;	/* 1 from post until the entry is polled or removed */
 
 }	COMP_ENTRY;
 
@@ -51,6 +54,8 @@ typedef struct _COMP_CHANNEL
 	struct _COMP_MANAGER	*Manager;
 	COMP_ENTRY				*Head;
 	COMP_ENTRY				**TailPtr;
+	COMP_ENTRY				Entry;
+	HANDLE					Event;
 	CRITICAL_SECTION		Lock;
 	DWORD					Milliseconds;
 
@@ -59,8 +64,13 @@ typedef struct _COMP_CHANNEL
 typedef struct _COMP_MANAGER
 {
 	HANDLE					CompQueue;
+	DLIST_ENTRY				DoneList;	/* ready entries, appended by the dispatch thread */
+	COMP_ENTRY				Entry;	/* channel-less cancel entry (CompManagerCancel) */
+	HANDLE					Thread;	/* dispatch thread started by CompManagerOpen */
+	BOOL					Run;	/* cleared by CompManagerClose to stop the thread */
 	HANDLE					Event;
-	LONG volatile			Lock;
+	LONG volatile			Busy;	/* NOTE(review): only zeroed in CompManagerOpen; appears unused */
+	CRITICAL_SECTION		Lock;	/* protects DoneList */
 
 }	COMP_MANAGER;
 
@@ -69,15 +79,17 @@ void		CompManagerClose(COMP_MANAGER *pMg
 DWORD		CompManagerMonitor(COMP_MANAGER *pMgr, HANDLE hFile, ULONG_PTR Key);
 DWORD		CompManagerPoll(COMP_MANAGER *pMgr, DWORD Milliseconds,
 							COMP_CHANNEL **ppChannel);
+void		CompManagerCancel(COMP_MANAGER *pMgr);	/* a CompManagerPoll returns ERROR_CANCELLED */
 
-void		CompChannelInit(COMP_MANAGER *pMgr, COMP_CHANNEL *pChannel,
+DWORD		CompChannelInit(COMP_MANAGER *pMgr, COMP_CHANNEL *pChannel,	/* 0 or a Win32 error */
 							DWORD Milliseconds);
 void		CompChannelCleanup(COMP_CHANNEL *pChannel);
 DWORD		CompChannelPoll(COMP_CHANNEL *pChannel, COMP_ENTRY **ppEntry);
-void		CompChannelRemoveEntry(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry);
+void		CompChannelCancel(COMP_CHANNEL *pChannel);	/* a CompChannelPoll returns ERROR_CANCELLED */
 
 void		CompEntryInit(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry);
 DWORD		CompEntryPost(COMP_ENTRY *pEntry);
+void		CompEntryCancel(COMP_ENTRY *pEntry);	/* returns once pEntry is neither posted nor queued */
 
 #ifdef __cplusplus
 }




More information about the ofw mailing list