[ofw] [RFC] [PATCH 1/2] work_queue: new abstraction for queuing work

Sean Hefty sean.hefty at intel.com
Wed May 20 16:17:14 PDT 2009


Create an abstraction for queuing work for asynchronous processing
that must occur at passive level.  The work queue uses the system
threads and does not allocate its own threads.  The parallelism of
the work queue is controlled through an initialization parameter,
TaskCount.  Setting TaskCount = 1 results in a serialized work queue.
Otherwise, the work queue will have up to TaskCount threads processing
the work list, depending on the number of threads available in the
system thread pool.

The work queue maintains a small pool of IO_WORKITEMs that it uses
to process a list of queued work entries.  This allows
objects to be created with embedded work entry objects.  (IO_WORKITEMs
cannot be embedded in structures pre-Vista.)

The work queue also alleviates the queuing of a large number of
associated work requests from consuming all available system threads.
A future enhancement would be to limit how long the work callback handler
can run before the work item is requeued.  Work entries are protected
against being queued multiple times.

Signed-off-by: Sean Hefty <sean.hefty at intel.com>
---
I'm sure that I read in the WDK documentation that it recommended
that drivers use a pool of reusable IO_WORKITEMs to process a list
of pending requests if it created more than one every minute, but
I can't find that reference now.  That's what sent me down this path
to begin with.

I anticipate that this abstraction will be even more useful when
creating a kernel winverbs interface that allows most function calls
at dispatch.

/*
 * Copyright (c) 2009 Intel Corporation. All rights reserved.
 *
 * This software is available to you under the OpenIB.org BSD license
 * below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AWV
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#pragma once

#ifndef _WORK_QUEUE_H_
#define _WORK_QUEUE_H_

#include <ntddk.h>


typedef struct _WORK_ENTRY
{
	LIST_ENTRY			Entry;
	void				(*WorkHandler)(struct _WORK_ENTRY *Work);
	LONG volatile		Queued;

}	WORK_ENTRY;

static void WorkEntryInit(WORK_ENTRY *pWork,
				  void (*WorkHandler)(struct _WORK_ENTRY *Work))
{
	pWork->WorkHandler = WorkHandler;
	pWork->Queued = 0;
}

typedef struct _WORK_CONTEXT_ENTRY
{
	WORK_ENTRY			Work;
	void				*Context;

}	WORK_CONTEXT_ENTRY;

static void WorkContextEntryInit(WORK_CONTEXT_ENTRY *pWorkContext, void *Context,
					 void (*WorkHandler)(struct _WORK_ENTRY *Work))
{
	WorkEntryInit(&pWorkContext->Work, WorkHandler);
	pWorkContext->Context = Context;
}

struct _WORK_QUEUE_TASK;

typedef struct _WORK_QUEUE
{
	LIST_ENTRY				List;
	KSPIN_LOCK				Lock;
	int					TaskCount;
	struct _WORK_QUEUE_TASK		*TaskArray;	// TaskArray[0] is for internal use

}	WORK_QUEUE;

NTSTATUS WorkQueueInit(WORK_QUEUE *pWorkQueue, PDEVICE_OBJECT Device,
			   int TaskCount);
void WorkQueueDestroy(WORK_QUEUE *pWorkQueue);
void WorkQueueInsert(WORK_QUEUE *pWorkQueue, WORK_ENTRY *pWork);

#endif // _WORK_QUEUE_H_


/*
 * Copyright (c) 2009 Intel Corporation. All rights reserved.
 *
 * This software is available to you under the OpenIB.org BSD license
 * below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AWV
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#include <ntddk.h>
#include "work_queue.h"


typedef struct _WORK_QUEUE_TASK
{
	WORK_QUEUE			*pWorkQueue;
	PIO_WORKITEM		pWorkItem;
	int				Next;
	int				Index;

}	WORK_QUEUE_TASK;


#if (WINVER < _WIN32_WINNT_WIN6)
#define KeQueryActiveProcessorCount(x) KeNumberProcessors
#endif


NTSTATUS WorkQueueInit(WORK_QUEUE *pWorkQueue, PDEVICE_OBJECT Device, int TaskCount)
{
	WORK_QUEUE_TASK *task;
	KAFFINITY procs;
	int i;

	if (TaskCount == 0) {
		TaskCount = KeQueryActiveProcessorCount(&procs);
	}

	KeInitializeSpinLock(&pWorkQueue->Lock);
	InitializeListHead(&pWorkQueue->List);
	pWorkQueue->TaskCount = TaskCount;
	pWorkQueue->TaskArray = ExAllocatePoolWithTag(NonPagedPool,
								  sizeof(WORK_QUEUE_TASK) * (TaskCount + 1),
								  'ktqw');
	if (pWorkQueue->TaskArray == NULL) {
		return STATUS_INSUFFICIENT_RESOURCES;
	}

	for (i = 0; i <= TaskCount; i++) {
		task = &pWorkQueue->TaskArray[i];
		task->pWorkQueue = pWorkQueue;
		task->Index = i;
		task->Next = i + 1;
		if (i > 0) {
			task->pWorkItem = IoAllocateWorkItem(Device);
			if (task->pWorkItem == NULL) {
				goto err;
			}
		}
	}
	task->Next = 0;
	return STATUS_SUCCESS;

err:
	while (--i > 0) {
		IoFreeWorkItem(pWorkQueue->TaskArray[i].pWorkItem);
	}
	ExFreePool(pWorkQueue->TaskArray);
	return STATUS_INSUFFICIENT_RESOURCES;
}

void WorkQueueDestroy(WORK_QUEUE *pWorkQueue)
{
	while (pWorkQueue->TaskCount > 0) {
		IoFreeWorkItem(pWorkQueue->TaskArray[pWorkQueue->TaskCount--].pWorkItem);
	}
	ExFreePool(pWorkQueue->TaskArray);
}

static VOID WorkQueueHandler(PDEVICE_OBJECT pDevice, void *Context)
{
	WORK_QUEUE *wq;
	WORK_QUEUE_TASK *task = (WORK_QUEUE_TASK *) Context;
	WORK_ENTRY *work;
	LIST_ENTRY *entry;
	KLOCK_QUEUE_HANDLE lockqueue;
	UNREFERENCED_PARAMETER(pDevice);

	wq = task->pWorkQueue;
	KeAcquireInStackQueuedSpinLock(&wq->Lock, &lockqueue);
	while (!IsListEmpty(&wq->List)) {
		entry = RemoveHeadList(&wq->List);
		KeReleaseInStackQueuedSpinLock(&lockqueue);

		work = CONTAINING_RECORD(entry, WORK_ENTRY, Entry);
		InterlockedExchange(&work->Queued, 0);
		work->WorkHandler(work);
		KeAcquireInStackQueuedSpinLock(&wq->Lock, &lockqueue);
	}

	task->Next = wq->TaskArray[0].Next;
	wq->TaskArray[0].Next = task->Index;
	KeReleaseInStackQueuedSpinLock(&lockqueue);

}

void WorkQueueInsert(WORK_QUEUE *pWorkQueue, WORK_ENTRY *pWork)
{
	WORK_QUEUE_TASK *task;
	KLOCK_QUEUE_HANDLE lockqueue;

	if (InterlockedCompareExchange(&pWork->Queued, 1, 0) != 0) {
		return;
	}

	KeAcquireInStackQueuedSpinLock(&pWorkQueue->Lock, &lockqueue);
	InsertHeadList(&pWorkQueue->List, &pWork->Entry);
	task = &pWorkQueue->TaskArray[pWorkQueue->TaskArray[0].Next];
	pWorkQueue->TaskArray[0].Next = task->Next;
	KeReleaseInStackQueuedSpinLock(&lockqueue);

	if (task->Index != 0) {
		IoQueueWorkItem(task->pWorkItem, WorkQueueHandler, DelayedWorkQueue, task);
	}
}




More information about the ofw mailing list