[ofw] [RFC] [PATCH 1/2 v3] work_queue: abstraction to manage small pool of work items
Sean Hefty
sean.hefty at intel.com
Tue May 26 16:10:32 PDT 2009
Create an abstraction for managing a small pool of IO_WORKITEMs that
can be used to process a queue of work requests at passive level.
To prevent starvation of other work items and ensure fairness of system
threads, only a single work request is processed each time a work
item is queued. If more work remains, the work item is requeued.
Using a pool of work items, rather than a single work item, allows for
some parallelism of tasks.
Signed-off-by: Sean Hefty <sean.hefty at intel.com>
---
Changes from previous versions:
Abstraction was re-added. The WORK_ENTRY was redefined to allow
overlaying it on top of the IRP.Tail.Overlay.DriverContext. This makes
it trivial to queue an IRP for asynchronous processing.
Code is inserted inline, since it is all new.
/*
* Copyright (c) 2009 Intel Corporation. All rights reserved.
*
* This software is available to you under the OpenIB.org BSD license
* below:
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above
* copyright notice, this list of conditions and the following
* disclaimer.
*
* - Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials
* provided with the distribution.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#pragma once
#ifndef _WORK_QUEUE_H_
#define _WORK_QUEUE_H_
#include <ntddk.h>
// Header for one unit of work handed to a WORK_QUEUE.
// The layout is meant to allow overlaying across
// IRP.Tail.Overlay.DriverContext (see WorkEntryFromIrp), so do not add
// fields without confirming the structure still fits in that storage.
typedef struct _WORK_ENTRY
{
// Links the entry into WORK_QUEUE.List while it is pending.
LIST_ENTRY Entry;
// Callback invoked with this entry once it has been dequeued.
void (*WorkHandler)(struct _WORK_ENTRY *Work);
// Caller-defined value stored by WorkEntryInit; the queue code itself
// never reads it (presumably consumed by WorkHandler).
void *Context;
} WORK_ENTRY;
/*
 * WorkEntryInit - fill in the callback and context of a work entry.
 *
 * pWork        Entry to initialise.
 * WorkHandler  Routine to run when the entry is processed.
 * Context      Caller-defined value stored alongside the handler.
 *
 * The Entry list links are deliberately left untouched; they are written
 * when the entry is inserted into a queue.
 *
 * Declared inline because it lives in a header: a plain static function
 * would be emitted (and flagged as unreferenced) in every source file
 * that includes this header without calling it.
 */
static __inline void WorkEntryInit(WORK_ENTRY *pWork,
	void (*WorkHandler)(struct _WORK_ENTRY *Work), void *Context)
{
	pWork->WorkHandler = WorkHandler;
	pWork->Context = Context;
}
// Reinterprets the Tail.Overlay.DriverContext storage of an IRP as a
// WORK_ENTRY so the IRP itself can be queued without a separate allocation.
// NOTE(review): nothing here verifies that WORK_ENTRY fits inside
// DriverContext -- confirm the sizes if WORK_ENTRY ever changes.
#define WorkEntryFromIrp(pIrp) ((WORK_ENTRY *) (pIrp)->Tail.Overlay.DriverContext)
struct _WORK_QUEUE_TASK;
// A list of pending WORK_ENTRYs together with the pool of tasks
// (one IO_WORKITEM each) that process them.
typedef struct _WORK_QUEUE
{
// Pending work entries.
LIST_ENTRY List;
// Protects List and the chain of idle tasks threaded through TaskArray.
KSPIN_LOCK Lock;
// Number of tasks that own a work item (slots 1..TaskCount of TaskArray).
int TaskCount;
// TaskCount + 1 slots; the Next field of slot 0 heads the idle-task chain.
struct _WORK_QUEUE_TASK *TaskArray; // TaskArray[0] is for internal use
} WORK_QUEUE;
// Initialises pWorkQueue and allocates its work items against Device.
// A TaskCount of 0 selects one task per active processor.
// Returns STATUS_SUCCESS on success, otherwise a failure status.
NTSTATUS WorkQueueInit(WORK_QUEUE *pWorkQueue, PDEVICE_OBJECT Device,
int TaskCount);
// Frees every work item and the task array allocated by WorkQueueInit.
// NOTE(review): the implementation does not wait for outstanding work;
// presumably the caller must ensure the queue is idle first.
void WorkQueueDestroy(WORK_QUEUE *pWorkQueue);
// Adds pWork to the queue and, if a task is idle, schedules it to run
// pWork->WorkHandler from a system worker thread.
void WorkQueueInsert(WORK_QUEUE *pWorkQueue, WORK_ENTRY *pWork);
#endif // _WORK_QUEUE_H_
/*
* Copyright (c) 2009 Intel Corporation. All rights reserved.
*
* This software is available to you under the OpenIB.org BSD license
* below:
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above
* copyright notice, this list of conditions and the following
* disclaimer.
*
* - Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials
* provided with the distribution.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#include <ntddk.h>
#include "work_queue.h"
// One slot of WORK_QUEUE.TaskArray. Slot 0 only serves as the head of the
// idle-task chain; slots 1..TaskCount each own a work item.
typedef struct _WORK_QUEUE_TASK
{
// Queue this task belongs to.
WORK_QUEUE *pWorkQueue;
// Work item used to schedule this task; never allocated for slot 0.
PIO_WORKITEM pWorkItem;
// Index of the next idle task in the chain; 0 terminates the chain.
int Next;
// Position of this slot within TaskArray.
int Index;
} WORK_QUEUE_TASK;
// When building for a target older than _WIN32_WINNT_WIN6, substitute the
// KeNumberProcessors variable for the call; the macro argument is discarded.
#if (WINVER < _WIN32_WINNT_WIN6)
#define KeQueryActiveProcessorCount(x) KeNumberProcessors
#endif
/*
 * WorkQueueInit - prepare a work queue and its pool of work items.
 *
 * pWorkQueue  Caller-provided storage for the queue.
 * Device      Device object passed to IoAllocateWorkItem for each task.
 * TaskCount   Number of work items to allocate; 0 selects the value of
 *             KeQueryActiveProcessorCount. Negative values are rejected.
 *
 * Returns STATUS_SUCCESS, STATUS_INVALID_PARAMETER for a negative
 * TaskCount, or STATUS_INSUFFICIENT_RESOURCES when an allocation fails.
 * On any failure nothing remains allocated and the queue is left with
 * TaskCount == 0 and TaskArray == NULL, so no dangling pointer is exposed.
 */
NTSTATUS WorkQueueInit(WORK_QUEUE *pWorkQueue, PDEVICE_OBJECT Device, int TaskCount)
{
	WORK_QUEUE_TASK *tasks;
	WORK_QUEUE_TASK *task;
	KAFFINITY procs;
	int i;

	KeInitializeSpinLock(&pWorkQueue->Lock);
	InitializeListHead(&pWorkQueue->List);

	/* Start from a well-defined empty state; published only on success. */
	pWorkQueue->TaskCount = 0;
	pWorkQueue->TaskArray = NULL;

	if (TaskCount < 0) {
		return STATUS_INVALID_PARAMETER;
	}
	if (TaskCount == 0) {
		TaskCount = KeQueryActiveProcessorCount(&procs);
	}

	/* One extra slot: index 0 is the head of the idle-task chain. */
	tasks = ExAllocatePoolWithTag(NonPagedPool,
				      sizeof(WORK_QUEUE_TASK) * ((unsigned int) TaskCount + 1u),
				      'ktqw');
	if (tasks == NULL) {
		return STATUS_INSUFFICIENT_RESOURCES;
	}

	/* Chain the slots 0 -> 1 -> ... -> TaskCount so every task starts idle. */
	for (i = 0; i <= TaskCount; i++) {
		task = &tasks[i];
		task->pWorkQueue = pWorkQueue;
		task->pWorkItem = NULL;
		task->Index = i;
		task->Next = i + 1;

		/* Slot 0 is bookkeeping only and never owns a work item. */
		if (i > 0) {
			task->pWorkItem = IoAllocateWorkItem(Device);
			if (task->pWorkItem == NULL) {
				goto err;
			}
		}
	}

	/* Index 0 terminates the chain. */
	tasks[TaskCount].Next = 0;

	pWorkQueue->TaskArray = tasks;
	pWorkQueue->TaskCount = TaskCount;
	return STATUS_SUCCESS;

err:
	/* Slot i failed to allocate; release the work items of slots i-1..1. */
	while (--i > 0) {
		IoFreeWorkItem(tasks[i].pWorkItem);
	}
	ExFreePool(tasks);
	return STATUS_INSUFFICIENT_RESOURCES;
}
/*
 * WorkQueueDestroy - release the work items and task array of a queue.
 *
 * pWorkQueue  Queue to tear down.
 *
 * Does nothing when TaskArray is NULL, and clears TaskArray afterwards, so
 * calling it on a queue that holds no task array, or calling it twice, is
 * harmless instead of freeing a stale pointer.
 *
 * NOTE(review): this routine does not wait for queued or running work;
 * presumably the caller must guarantee the queue is idle before calling.
 */
void WorkQueueDestroy(WORK_QUEUE *pWorkQueue)
{
	WORK_QUEUE_TASK *tasks = pWorkQueue->TaskArray;

	if (tasks == NULL) {
		pWorkQueue->TaskCount = 0;
		return;
	}

	/* Slots TaskCount..1 own work items; slot 0 never does. */
	for (; pWorkQueue->TaskCount > 0; pWorkQueue->TaskCount--) {
		IoFreeWorkItem(tasks[pWorkQueue->TaskCount].pWorkItem);
	}

	pWorkQueue->TaskArray = NULL;
	ExFreePool(tasks);
}
/*
 * WorkQueueHandler - work item routine executed for one task.
 *
 * pDevice  Unused.
 * Context  The WORK_QUEUE_TASK that was queued.
 *
 * Processes at most one pending entry per invocation, with the queue lock
 * dropped while the entry's handler runs. If entries remain afterwards the
 * same work item is queued again; otherwise the task is pushed back onto
 * the idle chain headed by TaskArray[0].
 */
static VOID WorkQueueHandler(PDEVICE_OBJECT pDevice, void *Context)
{
	WORK_QUEUE_TASK *self = (WORK_QUEUE_TASK *) Context;
	WORK_QUEUE *queue = self->pWorkQueue;
	KLOCK_QUEUE_HANDLE lock;
	int requeue = 0;

	UNREFERENCED_PARAMETER(pDevice);

	KeAcquireInStackQueuedSpinLock(&queue->Lock, &lock);
	if (!IsListEmpty(&queue->List)) {
		LIST_ENTRY *link = RemoveHeadList(&queue->List);
		WORK_ENTRY *request = CONTAINING_RECORD(link, WORK_ENTRY, Entry);

		/* Run the handler without holding the lock. */
		KeReleaseInStackQueuedSpinLock(&lock);
		request->WorkHandler(request);
		KeAcquireInStackQueuedSpinLock(&queue->Lock, &lock);

		requeue = !IsListEmpty(&queue->List);
	}

	if (!requeue) {
		/* Nothing left to do: return this task to the idle chain. */
		self->Next = queue->TaskArray[0].Next;
		queue->TaskArray[0].Next = self->Index;
	}
	KeReleaseInStackQueuedSpinLock(&lock);

	if (requeue) {
		/* More work is pending: go around again on a fresh dispatch. */
		IoQueueWorkItem(self->pWorkItem, WorkQueueHandler, DelayedWorkQueue, self);
	}
}
/*
 * WorkQueueInsert - queue a work entry and start an idle task, if any.
 *
 * pWorkQueue  Queue to add the entry to.
 * pWork       Entry to queue; its WorkHandler is later called with it.
 *
 * Entries are appended at the tail. WorkQueueHandler removes from the
 * head, so requests are processed in arrival order and an old request
 * cannot be starved by a stream of newer ones.
 */
void WorkQueueInsert(WORK_QUEUE *pWorkQueue, WORK_ENTRY *pWork)
{
	WORK_QUEUE_TASK *task;
	KLOCK_QUEUE_HANDLE lockqueue;

	KeAcquireInStackQueuedSpinLock(&pWorkQueue->Lock, &lockqueue);
	InsertTailList(&pWorkQueue->List, &pWork->Entry);

	/*
	 * Pop the first idle task. When the chain is empty TaskArray[0].Next
	 * is 0, so this selects slot 0 itself and leaves the chain unchanged.
	 */
	task = &pWorkQueue->TaskArray[pWorkQueue->TaskArray[0].Next];
	pWorkQueue->TaskArray[0].Next = task->Next;
	KeReleaseInStackQueuedSpinLock(&lockqueue);

	/* Slot 0 means every task is already busy; one of them picks this up. */
	if (task->Index != 0) {
		IoQueueWorkItem(task->pWorkItem, WorkQueueHandler, DelayedWorkQueue, task);
	}
}
More information about the ofw
mailing list