[ofw] [PATCH 2/4] etc/comp_channel: add completion channel abstraction

Sean Hefty sean.hefty at intel.com
Tue Mar 10 16:49:14 PDT 2009


Add a new abstraction, the completion channel, capable of de-multiplexing
overlapped completion events among multiple queues.

The completion abstraction consists of 3 main components:

COMP_MANAGER
Maps to an IOCP.  The completion manager tracks all completions on any
of its associated channels.  This allows a user to 'poll' the completion
manager to receive notification when a completion event occurs on any
of the channels, similar to polling a set of fd's.

COMP_CHANNEL
Maps to a queue of completed requests.  A user can 'poll' a single channel
for completions if they are only interest in processing events on that
channel.  Internally, polling on a channel will poll the manager for
completions, but only process those associated with the specified channel.

COMP_EVENT
Maps to an overlapped structure.  Operations needing a completion event
reference this structure.  When the event occurs, it is queued to the
correct channel.

The implementation assumes that only one or a very small number of threads
will ever be trying to process events at any one time.  (Based on existing
applications, this is true.)  The abstraction itself is threadless. 

Signed-off-by: Sean Hefty <sean.hefty at intel.com>
---
Code inserted directly, rather than as a patch, since it's all new.

/*
 * Copyright (c) 2009 Intel Corp., Inc.  All rights reserved.
 *
 * This software is available to you under the OpenIB.org BSD license
 * below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AWV
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#pragma once

#ifndef COMP_CHANNEL_H
#define COMP_CHANNEL_H

#include <windows.h>

#ifdef __cplusplus
extern "C" {
#endif

typedef struct _COMP_ENTRY
{
	struct _COMP_ENTRY		*Next;
	OVERLAPPED				Overlap;
	struct _COMP_CHANNEL	*Channel;

}	COMP_ENTRY;

typedef struct _COMP_CHANNEL
{
	struct _COMP_MANAGER	*Manager;
	COMP_ENTRY				*Head;
	COMP_ENTRY				**TailPtr;
	CRITICAL_SECTION		Lock;
	DWORD					Milliseconds;

}	COMP_CHANNEL;

typedef struct _COMP_MANAGER
{
	HANDLE					CompQueue;
	HANDLE					Event;
	LONG volatile			Lock;

}	COMP_MANAGER;

DWORD		CompManagerOpen(COMP_MANAGER *pMgr);
void		CompManagerClose(COMP_MANAGER *pMgr);
DWORD		CompManagerMonitor(COMP_MANAGER *pMgr, HANDLE hFile, ULONG_PTR Key);
DWORD		CompManagerPoll(COMP_MANAGER *pMgr, DWORD Milliseconds,
							COMP_CHANNEL **ppChannel);

void		CompChannelInit(COMP_MANAGER *pMgr, COMP_CHANNEL *pChannel,
							DWORD Milliseconds);
void		CompChannelCleanup(COMP_CHANNEL *pChannel);
DWORD		CompChannelPoll(COMP_CHANNEL *pChannel, COMP_ENTRY **ppEntry);
void		CompChannelRemoveEntry(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry);

void		CompEntryInit(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry);
DWORD		CompEntryPost(COMP_ENTRY *pEntry);

#ifdef __cplusplus
}
#endif

#endif /* COMP_CHANNEL_H */

/*
 * Copyright (c) 2008, 2009 Intel Corporation.  All rights reserved.
 *
 * This software is available to you under the OpenIB.org BSD license
 * below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AWV
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#include <comp_channel.h>

static void CompChannelQueue(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry);

DWORD CompManagerOpen(COMP_MANAGER *pMgr)
{
	pMgr->CompQueue = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, -1);
	if (pMgr->CompQueue == NULL) {
		return GetLastError();
	}

	pMgr->Event = CreateEvent(NULL, TRUE, TRUE, NULL);
	if (pMgr->Event == NULL) {
		return GetLastError();
	}

	pMgr->Lock = 0;
	return 0;
}

void CompManagerClose(COMP_MANAGER *pMgr)
{
	CloseHandle(pMgr->CompQueue);
	CloseHandle(pMgr->Event);
}

DWORD CompManagerMonitor(COMP_MANAGER *pMgr, HANDLE hFile, ULONG_PTR Key)
{
	HANDLE cq;

	cq = CreateIoCompletionPort(hFile, pMgr->CompQueue, Key, 0);
	return (cq == NULL) ? GetLastError() : 0;
}

DWORD CompManagerPoll(COMP_MANAGER *pMgr, DWORD Milliseconds,
					  COMP_CHANNEL **ppChannel)
{
	COMP_ENTRY *entry;
	OVERLAPPED *overlap;
	DWORD bytes, ret;
	ULONG_PTR key;

	if (GetQueuedCompletionStatus(pMgr->CompQueue, &bytes, &key, &overlap,
								  Milliseconds)) {
		entry = CONTAINING_RECORD(overlap, COMP_ENTRY, Overlap);
		*ppChannel = entry->Channel;
		CompChannelQueue(entry->Channel, entry);
		ret = 0;
	} else {
		ret = GetLastError();
	}
	return ret;
}


void CompChannelInit(COMP_MANAGER *pMgr, COMP_CHANNEL *pChannel, DWORD Milliseconds)
{
	pChannel->Manager = pMgr;
	pChannel->Head = NULL;
	pChannel->TailPtr = &pChannel->Head;
	InitializeCriticalSection(&pChannel->Lock);
	pChannel->Milliseconds = Milliseconds;
}

void CompChannelCleanup(COMP_CHANNEL *pChannel)
{
	DeleteCriticalSection(&pChannel->Lock);	
}

static void CompChannelInsertTail(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry)
{
	*pChannel->TailPtr = pEntry;
	pChannel->TailPtr = &pEntry->Next;
}

static COMP_ENTRY *CompChannelRemoveHead(COMP_CHANNEL *pChannel)
{
	COMP_ENTRY *entry;

	entry = pChannel->Head;
	pChannel->Head = entry->Next;
	if (pChannel->Head == NULL) {
		pChannel->TailPtr = &pChannel->Head;
	}
	return entry;
}

static void CompChannelQueue(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry)
{
	pEntry->Next = NULL;
	EnterCriticalSection(&pChannel->Lock);
	CompChannelInsertTail(pChannel, pEntry);
	LeaveCriticalSection(&pChannel->Lock);
}

DWORD CompChannelPoll(COMP_CHANNEL *pChannel, COMP_ENTRY **ppEntry)
{
	COMP_MANAGER *mgr = pChannel->Manager;
	COMP_CHANNEL *chan;
	DWORD ret = 0;
	ULONG locked;

	EnterCriticalSection(&pChannel->Lock);
	while (pChannel->Head == NULL) {
		LeaveCriticalSection(&pChannel->Lock);

		locked = InterlockedCompareExchange(&mgr->Lock, 1, 0);
		if (locked == 0) {
			ResetEvent(mgr->Event);
			ret = CompManagerPoll(mgr, pChannel->Milliseconds, &chan);
			InterlockedExchange(&mgr->Lock, 0);
			SetEvent(mgr->Event);
		} else {
			ret = WaitForSingleObject(mgr->Event, pChannel->Milliseconds);
		}
		if (ret) {
			goto out;
		}

		EnterCriticalSection(&pChannel->Lock);
	}
	*ppEntry = CompChannelRemoveHead(pChannel);
	LeaveCriticalSection(&pChannel->Lock);

out:
	return ret;
}

void CompChannelRemoveEntry(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry)
{
	COMP_CHANNEL *chan;
	COMP_ENTRY **entry_ptr;
	DWORD ret;

	do {
		ret = CompManagerPoll(pChannel->Manager, 0, &chan);
	} while (!ret);
	SetEvent(pChannel->Manager->Event);

	EnterCriticalSection(&pChannel->Lock);
	entry_ptr = &pChannel->Head;
	while (*entry_ptr && *entry_ptr != pEntry) {
		entry_ptr = &(*entry_ptr)->Next;
	}

	if (*entry_ptr != NULL) {
		*entry_ptr = pEntry->Next;
		if (pChannel->TailPtr == &pEntry->Next) {
			pChannel->TailPtr = entry_ptr;
		}
	}
	LeaveCriticalSection(&pChannel->Lock);
}

void CompEntryInit(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry)
{
	pEntry->Channel = pChannel;
}

DWORD CompEntryPost(COMP_ENTRY *pEntry)
{
	if (PostQueuedCompletionStatus(pEntry->Channel->Manager->CompQueue, 0, 0,
								   &pEntry->Overlap)) {
		return 0;
	} else {
		return GetLastError();
	}
}





More information about the ofw mailing list