[openib-general] [PATCH] complib: thread_pool rework

Sasha Khapyorsky sashak at voltaire.com
Mon Feb 19 15:01:39 PST 2007


This reworks complib's thread_pool implementation (used by opensm
dispatcher). Prevents events signaling merges, termination races,
eliminates using of broken cl_atomic stuff, reduces memory allocations
and code complexity.

Signed-off-by: Sasha Khapyorsky <sashak at voltaire.com>
---
 osm/complib/cl_async_proc.c         |    1 -
 osm/complib/cl_dispatcher.c         |    2 +-
 osm/complib/cl_thread.c             |   13 --
 osm/complib/cl_threadpool.c         |  208 +++++++++++-----------------------
 osm/complib/libosmcomp.map          |    1 -
 osm/include/complib/cl_thread.h     |   16 ---
 osm/include/complib/cl_threadpool.h |   84 ++++----------
 osm/osmtest/osmt_multicast.c        |    1 +
 8 files changed, 92 insertions(+), 234 deletions(-)

diff --git a/osm/complib/cl_async_proc.c b/osm/complib/cl_async_proc.c
index 51561af..7ac96bb 100644
--- a/osm/complib/cl_async_proc.c
+++ b/osm/complib/cl_async_proc.c
@@ -55,7 +55,6 @@ cl_async_proc_construct(
 
 	cl_qlist_init( &p_async_proc->item_queue );
 	cl_spinlock_construct( &p_async_proc->lock );
-	cl_thread_pool_construct( &p_async_proc->thread_pool );
 }
 
 cl_status_t
diff --git a/osm/complib/cl_dispatcher.c b/osm/complib/cl_dispatcher.c
index a7c0ac7..4a1960c 100644
--- a/osm/complib/cl_dispatcher.c
+++ b/osm/complib/cl_dispatcher.c
@@ -49,6 +49,7 @@
 
 #include <stdlib.h>
 #include <complib/cl_dispatcher.h>
+#include <complib/cl_thread.h>
 #include <complib/cl_timer.h>
 
 /* give some guidance when we build our cl_pool of messages */
@@ -132,7 +133,6 @@ cl_disp_construct(
 
   cl_qlist_init( &p_disp->reg_list );
   cl_ptr_vector_construct( &p_disp->reg_vec );
-  cl_thread_pool_construct( &p_disp->worker_threads );
   cl_qlist_init( &p_disp->msg_fifo );
   cl_spinlock_construct( &p_disp->lock );
   cl_qpool_construct( &p_disp->msg_pool );
diff --git a/osm/complib/cl_thread.c b/osm/complib/cl_thread.c
index f131480..eecc7d6 100644
--- a/osm/complib/cl_thread.c
+++ b/osm/complib/cl_thread.c
@@ -39,7 +39,6 @@
 
 #include <stdio.h>
 #include <unistd.h>
-#include <sys/sysinfo.h>
 #include <complib/cl_thread.h>
 
 /*
@@ -129,18 +128,6 @@ cl_thread_stall(
 	usleep( pause_us );
 }
 
-uint32_t
-cl_proc_count( void )
-{
-	uint32_t ret;
-
-	ret = get_nprocs();
-	if( !ret)
-		return 1;/* Workaround for PPC where get_nprocs() returns 0 */
-
-	return ret;
-}
-
 boolean_t
 cl_is_current_thread(
 	IN	const cl_thread_t* const	p_thread )
diff --git a/osm/complib/cl_threadpool.c b/osm/complib/cl_threadpool.c
index ff8bf90..ca4e261 100644
--- a/osm/complib/cl_threadpool.c
+++ b/osm/complib/cl_threadpool.c
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2004-2006 Voltaire, Inc. All rights reserved.
+ * Copyright (c) 2004-2007 Voltaire, Inc. All rights reserved.
  * Copyright (c) 2002-2005 Mellanox Technologies LTD. All rights reserved.
  * Copyright (c) 1996-2003 Intel Corporation. All rights reserved.
  *
@@ -49,134 +49,85 @@
 
 #include <stdlib.h>
 #include <string.h>
+#include <pthread.h>
+#include <sys/sysinfo.h>
 #include <complib/cl_threadpool.h>
-#include <complib/cl_atomic.h>
 
-void
-__cl_thread_pool_routine(
-	IN	void* const	context )
+static int proc_count( void )
 {
-	cl_status_t			status = CL_SUCCESS;
-	cl_thread_pool_t	*p_thread_pool = (cl_thread_pool_t*)context;
-
-	/* Continue looping until signalled to end. */
-	while( !p_thread_pool->exit )
-	{
-		/* Wait for the specified event to occur. */
-		status = cl_event_wait_on( &p_thread_pool->wakeup_event, 
-							EVENT_NO_TIMEOUT, TRUE );
-
-		/* See if we've been signalled to end execution. */
-		if( (p_thread_pool->exit) || (status == CL_NOT_DONE) )
-			break;
-
-		/* The event has been signalled.  Invoke the callback. */
-		(*p_thread_pool->pfn_callback)( (void*)p_thread_pool->context );
-	}
+	int ret = get_nprocs();
+	if (!ret)
+		return 1;/* Workaround for PPC where get_nprocs() returns 0 */
+	return ret;
+}
 
-	/*
-	 * Decrement the running count to notify the destroying thread
-	 * that the event was received and processed.
-	 */
-	cl_atomic_dec( &p_thread_pool->running_count );
-	cl_event_signal( &p_thread_pool->destroy_event );
+static void cleanup_mutex(void *arg)
+{
+	pthread_mutex_unlock(&((cl_thread_pool_t *)arg)->mutex);
 }
 
-void
-cl_thread_pool_construct(
-	IN	cl_thread_pool_t* const	p_thread_pool )
+static void *thread_pool_routine(void* context)
 {
-	CL_ASSERT( p_thread_pool);
+	cl_thread_pool_t *p_thread_pool = (cl_thread_pool_t*)context;
+
+	do {
+		pthread_mutex_lock(&p_thread_pool->mutex);
+		pthread_cleanup_push(cleanup_mutex, p_thread_pool);
+		while(!p_thread_pool->events)
+			pthread_cond_wait(&p_thread_pool->cond,
+					  &p_thread_pool->mutex);
+		p_thread_pool->events--;
+		pthread_cleanup_pop(1);
+		/* The event has been signalled.  Invoke the callback. */
+		(*p_thread_pool->pfn_callback)(p_thread_pool->context);
+	} while (1);
 
-	memset( p_thread_pool, 0, sizeof(cl_thread_pool_t) );
-	cl_event_construct( &p_thread_pool->wakeup_event );
-	cl_event_construct( &p_thread_pool->destroy_event );
-	cl_list_construct( &p_thread_pool->thread_list );
-	p_thread_pool->state = CL_UNINITIALIZED;
+	return NULL;
 }
 
 cl_status_t
 cl_thread_pool_init(
-	IN	cl_thread_pool_t* const		p_thread_pool,
-	IN	uint32_t					count,
-	IN	cl_pfn_thread_callback_t	pfn_callback,
-	IN	const void* const			context,
-	IN	const char* const			name )
+	IN cl_thread_pool_t* const p_thread_pool,
+	IN unsigned count,
+	IN void	(*pfn_callback)(void*),
+	IN void *context,
+	IN const char* const name )
 {
-	cl_status_t	status;
-	cl_thread_t	*p_thread;
-	uint32_t	i;
+	int i;
 
 	CL_ASSERT( p_thread_pool );
 	CL_ASSERT( pfn_callback );
 
-	cl_thread_pool_construct( p_thread_pool );
+	memset(p_thread_pool, 0, sizeof(*p_thread_pool));
 
-	if( !count )
-		count = cl_proc_count();
+	if(!count)
+		count = proc_count();
 
-	status = cl_list_init( &p_thread_pool->thread_list, count );
-	if( status != CL_SUCCESS )
-	{
-		cl_thread_pool_destroy( p_thread_pool );
-		return( status );
-	}
+	pthread_mutex_init(&p_thread_pool->mutex, NULL);
+	pthread_cond_init(&p_thread_pool->cond, NULL);
 
-	/* Initialize the event that the threads wait on. */
-	status = cl_event_init( &p_thread_pool->wakeup_event, FALSE );
-	if( status != CL_SUCCESS )
-	{
-		cl_thread_pool_destroy( p_thread_pool );
-		return( status );
-	}
+	p_thread_pool->events = 0;
 
-	/* Initialize the event used to destroy the threadpool. */
-	status = cl_event_init( &p_thread_pool->destroy_event, FALSE );
-	if( status != CL_SUCCESS )
-	{
+	p_thread_pool->pfn_callback = pfn_callback;
+	p_thread_pool->context = context;
+
+	p_thread_pool->tid = calloc(count, sizeof(*p_thread_pool->tid));
+	if (!p_thread_pool->tid) {
 		cl_thread_pool_destroy( p_thread_pool );
-		return( status );
+		return CL_INSUFFICIENT_MEMORY;
 	}
 
-	p_thread_pool->pfn_callback = pfn_callback;
-	p_thread_pool->context = context;
+	p_thread_pool->running_count = count;
 
 	for( i = 0; i < count; i++ )
 	{
-		/* Create a new thread. */
-		p_thread = (cl_thread_t*)malloc( sizeof(cl_thread_t) );
-		if( !p_thread )
-		{
+		if (pthread_create(&p_thread_pool->tid[i], NULL,
+				   thread_pool_routine, p_thread_pool) < 0) {
 			cl_thread_pool_destroy( p_thread_pool );
-			return( CL_INSUFFICIENT_MEMORY );
+			return CL_INSUFFICIENT_RESOURCES;
 		}
-
-		cl_thread_construct( p_thread );
-
-		/*
-		 * Add it to the list.  This is guaranteed to work since we
-		 * initialized the list to hold at least the number of threads we want
-		 * to store there.
-		 */
-		status = cl_list_insert_head( &p_thread_pool->thread_list, p_thread );
-		CL_ASSERT( status == CL_SUCCESS );
-
-		/* Start the thread. */
-		status = cl_thread_init( p_thread, __cl_thread_pool_routine,
-			p_thread_pool, name );
-		if( status != CL_SUCCESS )
-		{
-			cl_thread_pool_destroy( p_thread_pool );
-			return( status );
-		}
-
-		/*
-		 * Increment the running count to insure that a destroying thread
-		 * will signal all the threads.
-		 */
-		cl_atomic_inc( &p_thread_pool->running_count );
 	}
-	p_thread_pool->state = CL_INITIALIZED;
+
 	return( CL_SUCCESS );
 }
 
@@ -184,59 +135,34 @@ void
 cl_thread_pool_destroy(
 	IN	cl_thread_pool_t* const	p_thread_pool )
 {
-	cl_thread_t		*p_thread;
+	int i;
 
 	CL_ASSERT( p_thread_pool );
-	CL_ASSERT( cl_is_state_valid( p_thread_pool->state ) );
 
-	/* Indicate to all threads that they need to exit. */
-	p_thread_pool->exit = TRUE;
+	for (i = 0 ; i < p_thread_pool->running_count; i++)
+		if (p_thread_pool->tid[i])
+			pthread_cancel(p_thread_pool->tid[i]);
 
-	/*
-	 * Signal the threads until they have all exited.  Signalling
-	 * once for each thread is not guaranteed to work since two events
-	 * could release only a single thread, depending on the rate at which
-	 * the events are set and how the thread scheduler processes notifications.
-	 */
+	for (i = 0 ; i < p_thread_pool->running_count; i++)
+		if (p_thread_pool->tid[i])
+			pthread_join(p_thread_pool->tid[i], NULL);
 
-	while( p_thread_pool->running_count )
-	{
-     cl_event_signal( &p_thread_pool->wakeup_event );
-     /*
-      * Wait for the destroy event to occur, indicating that the thread
-      * has exited.
-      */
-     cl_event_wait_on( &p_thread_pool->destroy_event,
-                       EVENT_NO_TIMEOUT, TRUE );
-   }
-
-	/*
-	 * Stop each thread one at a time.  Note that this cannot be done in the
-	 * above for loop because signal will wake up an unknown thread.
-	 */
-	if( cl_is_list_inited( &p_thread_pool->thread_list ) )
-	{
-		while( !cl_is_list_empty( &p_thread_pool->thread_list ) )
-		{
-			p_thread =
-				(cl_thread_t*)cl_list_remove_head( &p_thread_pool->thread_list );
-			cl_thread_destroy( p_thread );
-			free( p_thread );
-		}
-	}
+	p_thread_pool->running_count = 0;
+	pthread_cond_destroy(&p_thread_pool->cond);
+	pthread_mutex_destroy(&p_thread_pool->mutex);
 
-	cl_event_destroy( &p_thread_pool->destroy_event );
-	cl_event_destroy( &p_thread_pool->wakeup_event );
-	cl_list_destroy( &p_thread_pool->thread_list );
-	p_thread_pool->state = CL_UNINITIALIZED;
+	p_thread_pool->events = 0;
 }
 
 cl_status_t
 cl_thread_pool_signal(
 	IN	cl_thread_pool_t* const	p_thread_pool )
 {
+	int ret;
 	CL_ASSERT( p_thread_pool );
-	CL_ASSERT( p_thread_pool->state == CL_INITIALIZED );
-
-	return( cl_event_signal( &p_thread_pool->wakeup_event ) );
+	pthread_mutex_lock(&p_thread_pool->mutex);
+	p_thread_pool->events++;
+	ret = pthread_cond_signal(&p_thread_pool->cond);
+	pthread_mutex_unlock(&p_thread_pool->mutex);
+	return ret;
 }
diff --git a/osm/complib/libosmcomp.map b/osm/complib/libosmcomp.map
index e2e58b1..3b8c040 100644
--- a/osm/complib/libosmcomp.map
+++ b/osm/complib/libosmcomp.map
@@ -138,7 +138,6 @@ OSMCOMP_1.1 {
 		cl_thread_destroy;
 		cl_thread_suspend;
 		cl_thread_stall;
-		cl_proc_count;
 		cl_is_current_thread;
 		__cl_thread_pool_routine;
 		cl_thread_pool_construct;
diff --git a/osm/include/complib/cl_thread.h b/osm/include/complib/cl_thread.h
index 4752278..9635e22 100644
--- a/osm/include/complib/cl_thread.h
+++ b/osm/include/complib/cl_thread.h
@@ -312,22 +312,6 @@ cl_thread_stall(
 *	Thread, cl_thread_suspend
 *********/
 
-/****f* Component Library: Thread/cl_proc_count
-* NAME
-*	cl_proc_count
-*
-* DESCRIPTION
-*	The cl_proc_count function returns the number of processors in the system.
-*
-* SYNOPSIS
-*/
-uint32_t
-cl_proc_count( void );
-/*
-* RETURN VALUE
-*	Returns the number of processors in the system.
-*********/
-
 /****i* Component Library: Thread/cl_is_current_thread
 * NAME
 *	cl_is_current_thread
diff --git a/osm/include/complib/cl_threadpool.h b/osm/include/complib/cl_threadpool.h
index aa1e066..30b5f86 100644
--- a/osm/include/complib/cl_threadpool.h
+++ b/osm/include/complib/cl_threadpool.h
@@ -46,9 +46,8 @@
 #ifndef _CL_THREAD_POOL_H_
 #define _CL_THREAD_POOL_H_
 
-#include <complib/cl_list.h>
-#include <complib/cl_thread.h>
-#include <complib/cl_event.h>
+#include <pthread.h>
+#include <complib/cl_types.h>
 
 #ifdef __cplusplus
 #  define BEGIN_C_DECLS extern "C" {
@@ -100,15 +99,13 @@ BEGIN_C_DECLS
 */
 typedef struct _cl_thread_pool
 {
-	cl_pfn_thread_callback_t	pfn_callback;
-	const void					*context;
-	cl_list_t					thread_list;
-	cl_event_t					wakeup_event;
-	cl_event_t					destroy_event;
-	boolean_t					exit;
-	cl_state_t					state;
-	atomic32_t					running_count;
-
+	void (*pfn_callback)(void*);
+	void *context;
+	unsigned running_count;
+	unsigned events;
+	pthread_cond_t cond;
+	pthread_mutex_t mutex;
+	pthread_t *tid;
 } cl_thread_pool_t;
 /*
 * FIELDS
@@ -118,58 +115,23 @@ typedef struct _cl_thread_pool
 *	context
 *		Context to pass to the thread callback function.
 *
-*	thread_list
-*		List of threads managed by the thread pool.
-*
-*	event
-*		Event used to signal threads to wake up and do work.
-*
-*	destroy_event
-*		Event used to signal threads to exit.
-*
-*	exit
-*		Flag used to indicates threads to exit.
-*
-*	state
-*		State of the thread pool.
-*
 *	running_count
 *		Number of threads running.
 *
-* SEE ALSO
-*	Thread Pool
-*********/
-
-/****f* Component Library: Thread Pool/cl_thread_pool_construct
-* NAME
-*	cl_thread_pool_construct
+*	events
+*		events counter
 *
-* DESCRIPTION
-*	The cl_thread_pool_construct function initializes the state of a
-*	thread pool.
+*	mutex
+*		mutex for cond variable protection
 *
-* SYNOPSIS
-*/
-void
-cl_thread_pool_construct(
-	IN	cl_thread_pool_t* const	p_thread_pool );
-/*
-* PARAMETERS
-*	p_thread_pool
-*		[in] Pointer to a thread pool structure.
+*	cond
+*		conditional variable to signal an event to thread
 *
-* RETURN VALUE
-*	This function does not return a value.
-*
-* NOTES
-*	Allows calling cl_thread_pool_destroy without first calling
-*	cl_thread_pool_init.
-*
-*	Calling cl_thread_pool_construct is a prerequisite to calling any other
-*	thread pool function except cl_thread_pool_init.
+*	tid
+*		array of allocated thread ids.
 *
 * SEE ALSO
-*	Thread Pool, cl_thread_pool_init, cl_thread_pool_destroy
+*	Thread Pool
 *********/
 
 /****f* Component Library: Thread Pool/cl_thread_pool_init
@@ -184,11 +146,11 @@ cl_thread_pool_construct(
 */
 cl_status_t
 cl_thread_pool_init(
-	IN	cl_thread_pool_t* const		p_thread_pool,
-	IN	uint32_t					thread_count,
-	IN	cl_pfn_thread_callback_t	pfn_callback,
-	IN	const void* const			context,
-	IN	const char* const			name );
+	IN cl_thread_pool_t* const p_thread_pool,
+	IN unsigned count,
+	IN void	(*pfn_callback)(void*),
+	IN void *context,
+	IN const char* const name );
 /*
 * PARAMETERS
 *	p_thread_pool
diff --git a/osm/osmtest/osmt_multicast.c b/osm/osmtest/osmt_multicast.c
index d5519eb..724a0bb 100644
--- a/osm/osmtest/osmt_multicast.c
+++ b/osm/osmtest/osmt_multicast.c
@@ -51,6 +51,7 @@
 #include <string.h>
 #include <complib/cl_debug.h>
 #include <complib/cl_map.h>
+#include <complib/cl_list.h>
 #include "osmtest.h"
 
 /**********************************************************************
-- 
1.5.0.1.40.gb40d





More information about the general mailing list