[openib-general] [PATCH] complib: thread_pool rework
Sasha Khapyorsky
sashak at voltaire.com
Mon Feb 19 15:01:39 PST 2007
This reworks complib's thread_pool implementation (used by the OpenSM
dispatcher). It prevents signaled events from being merged, fixes
termination races, eliminates the use of the broken cl_atomic code, and
reduces memory allocations and code complexity.
Signed-off-by: Sasha Khapyorsky <sashak at voltaire.com>
---
osm/complib/cl_async_proc.c | 1 -
osm/complib/cl_dispatcher.c | 2 +-
osm/complib/cl_thread.c | 13 --
osm/complib/cl_threadpool.c | 208 +++++++++++-----------------------
osm/complib/libosmcomp.map | 1 -
osm/include/complib/cl_thread.h | 16 ---
osm/include/complib/cl_threadpool.h | 84 ++++----------
osm/osmtest/osmt_multicast.c | 1 +
8 files changed, 92 insertions(+), 234 deletions(-)
diff --git a/osm/complib/cl_async_proc.c b/osm/complib/cl_async_proc.c
index 51561af..7ac96bb 100644
--- a/osm/complib/cl_async_proc.c
+++ b/osm/complib/cl_async_proc.c
@@ -55,7 +55,6 @@ cl_async_proc_construct(
cl_qlist_init( &p_async_proc->item_queue );
cl_spinlock_construct( &p_async_proc->lock );
- cl_thread_pool_construct( &p_async_proc->thread_pool );
}
cl_status_t
diff --git a/osm/complib/cl_dispatcher.c b/osm/complib/cl_dispatcher.c
index a7c0ac7..4a1960c 100644
--- a/osm/complib/cl_dispatcher.c
+++ b/osm/complib/cl_dispatcher.c
@@ -49,6 +49,7 @@
#include <stdlib.h>
#include <complib/cl_dispatcher.h>
+#include <complib/cl_thread.h>
#include <complib/cl_timer.h>
/* give some guidance when we build our cl_pool of messages */
@@ -132,7 +133,6 @@ cl_disp_construct(
cl_qlist_init( &p_disp->reg_list );
cl_ptr_vector_construct( &p_disp->reg_vec );
- cl_thread_pool_construct( &p_disp->worker_threads );
cl_qlist_init( &p_disp->msg_fifo );
cl_spinlock_construct( &p_disp->lock );
cl_qpool_construct( &p_disp->msg_pool );
diff --git a/osm/complib/cl_thread.c b/osm/complib/cl_thread.c
index f131480..eecc7d6 100644
--- a/osm/complib/cl_thread.c
+++ b/osm/complib/cl_thread.c
@@ -39,7 +39,6 @@
#include <stdio.h>
#include <unistd.h>
-#include <sys/sysinfo.h>
#include <complib/cl_thread.h>
/*
@@ -129,18 +128,6 @@ cl_thread_stall(
usleep( pause_us );
}
-uint32_t
-cl_proc_count( void )
-{
- uint32_t ret;
-
- ret = get_nprocs();
- if( !ret)
- return 1;/* Workaround for PPC where get_nprocs() returns 0 */
-
- return ret;
-}
-
boolean_t
cl_is_current_thread(
IN const cl_thread_t* const p_thread )
diff --git a/osm/complib/cl_threadpool.c b/osm/complib/cl_threadpool.c
index ff8bf90..ca4e261 100644
--- a/osm/complib/cl_threadpool.c
+++ b/osm/complib/cl_threadpool.c
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2004-2006 Voltaire, Inc. All rights reserved.
+ * Copyright (c) 2004-2007 Voltaire, Inc. All rights reserved.
* Copyright (c) 2002-2005 Mellanox Technologies LTD. All rights reserved.
* Copyright (c) 1996-2003 Intel Corporation. All rights reserved.
*
@@ -49,134 +49,85 @@
#include <stdlib.h>
#include <string.h>
+#include <pthread.h>
+#include <sys/sysinfo.h>
#include <complib/cl_threadpool.h>
-#include <complib/cl_atomic.h>
-void
-__cl_thread_pool_routine(
- IN void* const context )
+static int proc_count( void )
{
- cl_status_t status = CL_SUCCESS;
- cl_thread_pool_t *p_thread_pool = (cl_thread_pool_t*)context;
-
- /* Continue looping until signalled to end. */
- while( !p_thread_pool->exit )
- {
- /* Wait for the specified event to occur. */
- status = cl_event_wait_on( &p_thread_pool->wakeup_event,
- EVENT_NO_TIMEOUT, TRUE );
-
- /* See if we've been signalled to end execution. */
- if( (p_thread_pool->exit) || (status == CL_NOT_DONE) )
- break;
-
- /* The event has been signalled. Invoke the callback. */
- (*p_thread_pool->pfn_callback)( (void*)p_thread_pool->context );
- }
+ int ret = get_nprocs();
+ if (!ret)
+ return 1;/* Workaround for PPC where get_nprocs() returns 0 */
+ return ret;
+}
- /*
- * Decrement the running count to notify the destroying thread
- * that the event was received and processed.
- */
- cl_atomic_dec( &p_thread_pool->running_count );
- cl_event_signal( &p_thread_pool->destroy_event );
+static void cleanup_mutex(void *arg)
+{
+ pthread_mutex_unlock(&((cl_thread_pool_t *)arg)->mutex);
}
-void
-cl_thread_pool_construct(
- IN cl_thread_pool_t* const p_thread_pool )
+static void *thread_pool_routine(void* context)
{
- CL_ASSERT( p_thread_pool);
+ cl_thread_pool_t *p_thread_pool = (cl_thread_pool_t*)context;
+
+ do {
+ pthread_mutex_lock(&p_thread_pool->mutex);
+ pthread_cleanup_push(cleanup_mutex, p_thread_pool);
+ while(!p_thread_pool->events)
+ pthread_cond_wait(&p_thread_pool->cond,
+ &p_thread_pool->mutex);
+ p_thread_pool->events--;
+ pthread_cleanup_pop(1);
+ /* The event has been signalled. Invoke the callback. */
+ (*p_thread_pool->pfn_callback)(p_thread_pool->context);
+ } while (1);
- memset( p_thread_pool, 0, sizeof(cl_thread_pool_t) );
- cl_event_construct( &p_thread_pool->wakeup_event );
- cl_event_construct( &p_thread_pool->destroy_event );
- cl_list_construct( &p_thread_pool->thread_list );
- p_thread_pool->state = CL_UNINITIALIZED;
+ return NULL;
}
cl_status_t
cl_thread_pool_init(
- IN cl_thread_pool_t* const p_thread_pool,
- IN uint32_t count,
- IN cl_pfn_thread_callback_t pfn_callback,
- IN const void* const context,
- IN const char* const name )
+ IN cl_thread_pool_t* const p_thread_pool,
+ IN unsigned count,
+ IN void (*pfn_callback)(void*),
+ IN void *context,
+ IN const char* const name )
{
- cl_status_t status;
- cl_thread_t *p_thread;
- uint32_t i;
+ int i;
CL_ASSERT( p_thread_pool );
CL_ASSERT( pfn_callback );
- cl_thread_pool_construct( p_thread_pool );
+ memset(p_thread_pool, 0, sizeof(*p_thread_pool));
- if( !count )
- count = cl_proc_count();
+ if(!count)
+ count = proc_count();
- status = cl_list_init( &p_thread_pool->thread_list, count );
- if( status != CL_SUCCESS )
- {
- cl_thread_pool_destroy( p_thread_pool );
- return( status );
- }
+ pthread_mutex_init(&p_thread_pool->mutex, NULL);
+ pthread_cond_init(&p_thread_pool->cond, NULL);
- /* Initialize the event that the threads wait on. */
- status = cl_event_init( &p_thread_pool->wakeup_event, FALSE );
- if( status != CL_SUCCESS )
- {
- cl_thread_pool_destroy( p_thread_pool );
- return( status );
- }
+ p_thread_pool->events = 0;
- /* Initialize the event used to destroy the threadpool. */
- status = cl_event_init( &p_thread_pool->destroy_event, FALSE );
- if( status != CL_SUCCESS )
- {
+ p_thread_pool->pfn_callback = pfn_callback;
+ p_thread_pool->context = context;
+
+ p_thread_pool->tid = calloc(count, sizeof(*p_thread_pool->tid));
+ if (!p_thread_pool->tid) {
cl_thread_pool_destroy( p_thread_pool );
- return( status );
+ return CL_INSUFFICIENT_MEMORY;
}
- p_thread_pool->pfn_callback = pfn_callback;
- p_thread_pool->context = context;
+ p_thread_pool->running_count = count;
for( i = 0; i < count; i++ )
{
- /* Create a new thread. */
- p_thread = (cl_thread_t*)malloc( sizeof(cl_thread_t) );
- if( !p_thread )
- {
+ if (pthread_create(&p_thread_pool->tid[i], NULL,
+ thread_pool_routine, p_thread_pool) < 0) {
cl_thread_pool_destroy( p_thread_pool );
- return( CL_INSUFFICIENT_MEMORY );
+ return CL_INSUFFICIENT_RESOURCES;
}
-
- cl_thread_construct( p_thread );
-
- /*
- * Add it to the list. This is guaranteed to work since we
- * initialized the list to hold at least the number of threads we want
- * to store there.
- */
- status = cl_list_insert_head( &p_thread_pool->thread_list, p_thread );
- CL_ASSERT( status == CL_SUCCESS );
-
- /* Start the thread. */
- status = cl_thread_init( p_thread, __cl_thread_pool_routine,
- p_thread_pool, name );
- if( status != CL_SUCCESS )
- {
- cl_thread_pool_destroy( p_thread_pool );
- return( status );
- }
-
- /*
- * Increment the running count to insure that a destroying thread
- * will signal all the threads.
- */
- cl_atomic_inc( &p_thread_pool->running_count );
}
- p_thread_pool->state = CL_INITIALIZED;
+
return( CL_SUCCESS );
}
@@ -184,59 +135,34 @@ void
cl_thread_pool_destroy(
IN cl_thread_pool_t* const p_thread_pool )
{
- cl_thread_t *p_thread;
+ int i;
CL_ASSERT( p_thread_pool );
- CL_ASSERT( cl_is_state_valid( p_thread_pool->state ) );
- /* Indicate to all threads that they need to exit. */
- p_thread_pool->exit = TRUE;
+ for (i = 0 ; i < p_thread_pool->running_count; i++)
+ if (p_thread_pool->tid[i])
+ pthread_cancel(p_thread_pool->tid[i]);
- /*
- * Signal the threads until they have all exited. Signalling
- * once for each thread is not guaranteed to work since two events
- * could release only a single thread, depending on the rate at which
- * the events are set and how the thread scheduler processes notifications.
- */
+ for (i = 0 ; i < p_thread_pool->running_count; i++)
+ if (p_thread_pool->tid[i])
+ pthread_join(p_thread_pool->tid[i], NULL);
- while( p_thread_pool->running_count )
- {
- cl_event_signal( &p_thread_pool->wakeup_event );
- /*
- * Wait for the destroy event to occur, indicating that the thread
- * has exited.
- */
- cl_event_wait_on( &p_thread_pool->destroy_event,
- EVENT_NO_TIMEOUT, TRUE );
- }
-
- /*
- * Stop each thread one at a time. Note that this cannot be done in the
- * above for loop because signal will wake up an unknown thread.
- */
- if( cl_is_list_inited( &p_thread_pool->thread_list ) )
- {
- while( !cl_is_list_empty( &p_thread_pool->thread_list ) )
- {
- p_thread =
- (cl_thread_t*)cl_list_remove_head( &p_thread_pool->thread_list );
- cl_thread_destroy( p_thread );
- free( p_thread );
- }
- }
+ p_thread_pool->running_count = 0;
+ pthread_cond_destroy(&p_thread_pool->cond);
+ pthread_mutex_destroy(&p_thread_pool->mutex);
- cl_event_destroy( &p_thread_pool->destroy_event );
- cl_event_destroy( &p_thread_pool->wakeup_event );
- cl_list_destroy( &p_thread_pool->thread_list );
- p_thread_pool->state = CL_UNINITIALIZED;
+ p_thread_pool->events = 0;
}
cl_status_t
cl_thread_pool_signal(
IN cl_thread_pool_t* const p_thread_pool )
{
+ int ret;
CL_ASSERT( p_thread_pool );
- CL_ASSERT( p_thread_pool->state == CL_INITIALIZED );
-
- return( cl_event_signal( &p_thread_pool->wakeup_event ) );
+ pthread_mutex_lock(&p_thread_pool->mutex);
+ p_thread_pool->events++;
+ ret = pthread_cond_signal(&p_thread_pool->cond);
+ pthread_mutex_unlock(&p_thread_pool->mutex);
+ return ret;
}
diff --git a/osm/complib/libosmcomp.map b/osm/complib/libosmcomp.map
index e2e58b1..3b8c040 100644
--- a/osm/complib/libosmcomp.map
+++ b/osm/complib/libosmcomp.map
@@ -138,7 +138,6 @@ OSMCOMP_1.1 {
cl_thread_destroy;
cl_thread_suspend;
cl_thread_stall;
- cl_proc_count;
cl_is_current_thread;
__cl_thread_pool_routine;
cl_thread_pool_construct;
diff --git a/osm/include/complib/cl_thread.h b/osm/include/complib/cl_thread.h
index 4752278..9635e22 100644
--- a/osm/include/complib/cl_thread.h
+++ b/osm/include/complib/cl_thread.h
@@ -312,22 +312,6 @@ cl_thread_stall(
* Thread, cl_thread_suspend
*********/
-/****f* Component Library: Thread/cl_proc_count
-* NAME
-* cl_proc_count
-*
-* DESCRIPTION
-* The cl_proc_count function returns the number of processors in the system.
-*
-* SYNOPSIS
-*/
-uint32_t
-cl_proc_count( void );
-/*
-* RETURN VALUE
-* Returns the number of processors in the system.
-*********/
-
/****i* Component Library: Thread/cl_is_current_thread
* NAME
* cl_is_current_thread
diff --git a/osm/include/complib/cl_threadpool.h b/osm/include/complib/cl_threadpool.h
index aa1e066..30b5f86 100644
--- a/osm/include/complib/cl_threadpool.h
+++ b/osm/include/complib/cl_threadpool.h
@@ -46,9 +46,8 @@
#ifndef _CL_THREAD_POOL_H_
#define _CL_THREAD_POOL_H_
-#include <complib/cl_list.h>
-#include <complib/cl_thread.h>
-#include <complib/cl_event.h>
+#include <pthread.h>
+#include <complib/cl_types.h>
#ifdef __cplusplus
# define BEGIN_C_DECLS extern "C" {
@@ -100,15 +99,13 @@ BEGIN_C_DECLS
*/
typedef struct _cl_thread_pool
{
- cl_pfn_thread_callback_t pfn_callback;
- const void *context;
- cl_list_t thread_list;
- cl_event_t wakeup_event;
- cl_event_t destroy_event;
- boolean_t exit;
- cl_state_t state;
- atomic32_t running_count;
-
+ void (*pfn_callback)(void*);
+ void *context;
+ unsigned running_count;
+ unsigned events;
+ pthread_cond_t cond;
+ pthread_mutex_t mutex;
+ pthread_t *tid;
} cl_thread_pool_t;
/*
* FIELDS
@@ -118,58 +115,23 @@ typedef struct _cl_thread_pool
* context
* Context to pass to the thread callback function.
*
-* thread_list
-* List of threads managed by the thread pool.
-*
-* event
-* Event used to signal threads to wake up and do work.
-*
-* destroy_event
-* Event used to signal threads to exit.
-*
-* exit
-* Flag used to indicates threads to exit.
-*
-* state
-* State of the thread pool.
-*
* running_count
* Number of threads running.
*
-* SEE ALSO
-* Thread Pool
-*********/
-
-/****f* Component Library: Thread Pool/cl_thread_pool_construct
-* NAME
-* cl_thread_pool_construct
+* events
+* events counter
*
-* DESCRIPTION
-* The cl_thread_pool_construct function initializes the state of a
-* thread pool.
+* mutex
+* mutex for cond variable protection
*
-* SYNOPSIS
-*/
-void
-cl_thread_pool_construct(
- IN cl_thread_pool_t* const p_thread_pool );
-/*
-* PARAMETERS
-* p_thread_pool
-* [in] Pointer to a thread pool structure.
+* cond
+* conditional variable to signal an event to thread
*
-* RETURN VALUE
-* This function does not return a value.
-*
-* NOTES
-* Allows calling cl_thread_pool_destroy without first calling
-* cl_thread_pool_init.
-*
-* Calling cl_thread_pool_construct is a prerequisite to calling any other
-* thread pool function except cl_thread_pool_init.
+* tid
+* array of allocated thread ids.
*
* SEE ALSO
-* Thread Pool, cl_thread_pool_init, cl_thread_pool_destroy
+* Thread Pool
*********/
/****f* Component Library: Thread Pool/cl_thread_pool_init
@@ -184,11 +146,11 @@ cl_thread_pool_construct(
*/
cl_status_t
cl_thread_pool_init(
- IN cl_thread_pool_t* const p_thread_pool,
- IN uint32_t thread_count,
- IN cl_pfn_thread_callback_t pfn_callback,
- IN const void* const context,
- IN const char* const name );
+ IN cl_thread_pool_t* const p_thread_pool,
+ IN unsigned count,
+ IN void (*pfn_callback)(void*),
+ IN void *context,
+ IN const char* const name );
/*
* PARAMETERS
* p_thread_pool
diff --git a/osm/osmtest/osmt_multicast.c b/osm/osmtest/osmt_multicast.c
index d5519eb..724a0bb 100644
--- a/osm/osmtest/osmt_multicast.c
+++ b/osm/osmtest/osmt_multicast.c
@@ -51,6 +51,7 @@
#include <string.h>
#include <complib/cl_debug.h>
#include <complib/cl_map.h>
+#include <complib/cl_list.h>
#include "osmtest.h"
/**********************************************************************
--
1.5.0.1.40.gb40d
More information about the general
mailing list