Blame IlmThread/IlmThreadPool.cpp

Packit 8dc392
///////////////////////////////////////////////////////////////////////////
Packit 8dc392
//
Packit 8dc392
// Copyright (c) 2005-2012, Industrial Light & Magic, a division of Lucas
Packit 8dc392
// Digital Ltd. LLC
Packit 8dc392
// 
Packit 8dc392
// All rights reserved.
Packit 8dc392
// 
Packit 8dc392
// Redistribution and use in source and binary forms, with or without
Packit 8dc392
// modification, are permitted provided that the following conditions are
Packit 8dc392
// met:
Packit 8dc392
// *       Redistributions of source code must retain the above copyright
Packit 8dc392
// notice, this list of conditions and the following disclaimer.
Packit 8dc392
// *       Redistributions in binary form must reproduce the above
Packit 8dc392
// copyright notice, this list of conditions and the following disclaimer
Packit 8dc392
// in the documentation and/or other materials provided with the
Packit 8dc392
// distribution.
Packit 8dc392
// *       Neither the name of Industrial Light & Magic nor the names of
Packit 8dc392
// its contributors may be used to endorse or promote products derived
Packit 8dc392
// from this software without specific prior written permission. 
Packit 8dc392
// 
Packit 8dc392
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
Packit 8dc392
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
Packit 8dc392
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
Packit 8dc392
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
Packit 8dc392
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
Packit 8dc392
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
Packit 8dc392
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
Packit 8dc392
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
Packit 8dc392
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
Packit 8dc392
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
Packit 8dc392
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Packit 8dc392
//
Packit 8dc392
///////////////////////////////////////////////////////////////////////////
Packit 8dc392
Packit 8dc392
//-----------------------------------------------------------------------------
Packit 8dc392
//
Packit 8dc392
//	class Task, class ThreadPool, class TaskGroup
Packit 8dc392
//
Packit 8dc392
//-----------------------------------------------------------------------------
Packit 8dc392
Packit 8dc392
#include "IlmThread.h"
Packit 8dc392
#include "IlmThreadMutex.h"
Packit 8dc392
#include "IlmThreadSemaphore.h"
Packit 8dc392
#include "IlmThreadPool.h"
Packit 8dc392
#include "Iex.h"
Packit 8dc392
#include <list>
Packit 8dc392
Packit 8dc392
using namespace std;
Packit 8dc392
Packit 8dc392
ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_ENTER
Packit 8dc392
namespace {
Packit 8dc392
Packit 8dc392
class WorkerThread: public Thread
Packit 8dc392
{
Packit 8dc392
  public:
Packit 8dc392
Packit 8dc392
    WorkerThread (ThreadPool::Data* data);
Packit 8dc392
Packit 8dc392
    virtual void	run ();
Packit 8dc392
    
Packit 8dc392
  private:
Packit 8dc392
Packit 8dc392
    ThreadPool::Data *	_data;
Packit 8dc392
};
Packit 8dc392
Packit 8dc392
} //namespace
Packit 8dc392
Packit 8dc392
Packit 8dc392
struct TaskGroup::Data
Packit 8dc392
{
Packit 8dc392
     Data ();
Packit 8dc392
    ~Data ();
Packit 8dc392
    
Packit 8dc392
    void	addTask () ;
Packit 8dc392
    void	removeTask ();
Packit 8dc392
    
Packit 8dc392
    Semaphore	isEmpty;        // used to signal that the taskgroup is empty
Packit 8dc392
    int         numPending;     // number of pending tasks to still execute
Packit 8dc392
    Mutex       dtorMutex;      // used to work around the glibc bug:
Packit 8dc392
                                // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674
Packit 8dc392
};
Packit 8dc392
Packit 8dc392
Packit 8dc392
struct ThreadPool::Data
Packit 8dc392
{
Packit 8dc392
     Data ();
Packit 8dc392
    ~Data();
Packit 8dc392
    
Packit 8dc392
    void	finish ();
Packit 8dc392
    bool	stopped () const;
Packit 8dc392
    void	stop ();
Packit 8dc392
Packit 8dc392
    Semaphore taskSemaphore;        // threads wait on this for ready tasks
Packit 8dc392
    Mutex taskMutex;                // mutual exclusion for the tasks list
Packit 8dc392
    list<Task*> tasks;              // the list of tasks to execute
Packit 8dc392
    size_t numTasks;                // fast access to list size
Packit 8dc392
                                    //   (list::size() can be O(n))
Packit 8dc392
Packit 8dc392
    Semaphore threadSemaphore;      // signaled when a thread starts executing
Packit 8dc392
    Mutex threadMutex;              // mutual exclusion for threads list
Packit 8dc392
    list<WorkerThread*> threads;    // the list of all threads
Packit 8dc392
    size_t numThreads;              // fast access to list size
Packit 8dc392
    
Packit 8dc392
    bool stopping;                  // flag indicating whether to stop threads
Packit 8dc392
    Mutex stopMutex;                // mutual exclusion for stopping flag
Packit 8dc392
};
Packit 8dc392
Packit 8dc392
Packit 8dc392
Packit 8dc392
//
Packit 8dc392
// class WorkerThread
Packit 8dc392
//
Packit 8dc392
Packit 8dc392
WorkerThread::WorkerThread (ThreadPool::Data* data):
Packit 8dc392
    _data (data)
Packit 8dc392
{
Packit 8dc392
    start();
Packit 8dc392
}
Packit 8dc392
Packit 8dc392
Packit 8dc392
void
Packit 8dc392
WorkerThread::run ()
Packit 8dc392
{
Packit 8dc392
    //
Packit 8dc392
    // Signal that the thread has started executing
Packit 8dc392
    //
Packit 8dc392
Packit 8dc392
    _data->threadSemaphore.post();
Packit 8dc392
Packit 8dc392
    while (true)
Packit 8dc392
    {
Packit 8dc392
	//
Packit 8dc392
        // Wait for a task to become available
Packit 8dc392
	//
Packit 8dc392
Packit 8dc392
        _data->taskSemaphore.wait();
Packit 8dc392
Packit 8dc392
        {
Packit 8dc392
            Lock taskLock (_data->taskMutex);
Packit 8dc392
    
Packit 8dc392
	    //
Packit 8dc392
            // If there is a task pending, pop off the next task in the FIFO
Packit 8dc392
	    //
Packit 8dc392
Packit 8dc392
            if (_data->numTasks > 0)
Packit 8dc392
            {
Packit 8dc392
                Task* task = _data->tasks.front();
Packit 8dc392
		TaskGroup* taskGroup = task->group();
Packit 8dc392
                _data->tasks.pop_front();
Packit 8dc392
                _data->numTasks--;
Packit 8dc392
Packit 8dc392
                taskLock.release();
Packit 8dc392
                task->execute();
Packit 8dc392
                taskLock.acquire();
Packit 8dc392
Packit 8dc392
                delete task;
Packit 8dc392
                taskGroup->_data->removeTask();
Packit 8dc392
            }
Packit 8dc392
            else if (_data->stopped())
Packit 8dc392
	    {
Packit 8dc392
                break;
Packit 8dc392
	    }
Packit 8dc392
        }
Packit 8dc392
    }
Packit 8dc392
}
Packit 8dc392
Packit 8dc392
Packit 8dc392
//
Packit 8dc392
// struct TaskGroup::Data
Packit 8dc392
//
Packit 8dc392
Packit 8dc392
TaskGroup::Data::Data (): isEmpty (1), numPending (0)
Packit 8dc392
{
Packit 8dc392
    // empty
Packit 8dc392
}
Packit 8dc392
Packit 8dc392
Packit 8dc392
TaskGroup::Data::~Data ()
Packit 8dc392
{
Packit 8dc392
    //
Packit 8dc392
    // A TaskGroup acts like an "inverted" semaphore: if the count
Packit 8dc392
    // is above 0 then waiting on the taskgroup will block.  This
Packit 8dc392
    // destructor waits until the taskgroup is empty before returning.
Packit 8dc392
    //
Packit 8dc392
Packit 8dc392
    isEmpty.wait ();
Packit 8dc392
Packit 8dc392
    // Alas, given the current bug in glibc we need a secondary
Packit 8dc392
    // syncronisation primitive here to account for the fact that
Packit 8dc392
    // destructing the isEmpty Semaphore in this thread can cause
Packit 8dc392
    // an error for a separate thread that is issuing the post() call.
Packit 8dc392
    // We are entitled to destruct the semaphore at this point, however,
Packit 8dc392
    // that post() call attempts to access data out of the associated
Packit 8dc392
    // memory *after* it has woken the waiting threads, including this one,
Packit 8dc392
    // potentially leading to invalid memory reads.
Packit 8dc392
    // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674
Packit 8dc392
Packit 8dc392
    Lock lock (dtorMutex);
Packit 8dc392
}
Packit 8dc392
Packit 8dc392
Packit 8dc392
void
Packit 8dc392
TaskGroup::Data::addTask () 
Packit 8dc392
{
Packit 8dc392
    //
Packit 8dc392
    // Any access to the taskgroup is protected by a mutex that is
Packit 8dc392
    // held by the threadpool.  Therefore it is safe to access
Packit 8dc392
    // numPending before we wait on the semaphore.
Packit 8dc392
    //
Packit 8dc392
Packit 8dc392
    if (numPending++ == 0)
Packit 8dc392
	isEmpty.wait ();
Packit 8dc392
}
Packit 8dc392
Packit 8dc392
Packit 8dc392
void
Packit 8dc392
TaskGroup::Data::removeTask ()
Packit 8dc392
{
Packit 8dc392
    // Alas, given the current bug in glibc we need a secondary
Packit 8dc392
    // syncronisation primitive here to account for the fact that
Packit 8dc392
    // destructing the isEmpty Semaphore in a separate thread can
Packit 8dc392
    // cause an error. Issuing the post call here the current libc
Packit 8dc392
    // implementation attempts to access memory *after* it has woken
Packit 8dc392
    // waiting threads.
Packit 8dc392
    // Since other threads are entitled to delete the semaphore the
Packit 8dc392
    // access to the memory location can be invalid.
Packit 8dc392
    // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674
Packit 8dc392
Packit 8dc392
    if (--numPending == 0)
Packit 8dc392
    {
Packit 8dc392
        Lock lock (dtorMutex);
Packit 8dc392
        isEmpty.post ();
Packit 8dc392
    }
Packit 8dc392
}
Packit 8dc392
    
Packit 8dc392
Packit 8dc392
//
Packit 8dc392
// struct ThreadPool::Data
Packit 8dc392
//
Packit 8dc392
Packit 8dc392
ThreadPool::Data::Data (): numTasks (0), numThreads (0), stopping (false)
Packit 8dc392
{
Packit 8dc392
    // empty
Packit 8dc392
}
Packit 8dc392
Packit 8dc392
Packit 8dc392
ThreadPool::Data::~Data()
Packit 8dc392
{
Packit 8dc392
    Lock lock (threadMutex);
Packit 8dc392
    finish ();
Packit 8dc392
}
Packit 8dc392
Packit 8dc392
Packit 8dc392
void
Packit 8dc392
ThreadPool::Data::finish ()
Packit 8dc392
{
Packit 8dc392
    stop();
Packit 8dc392
Packit 8dc392
    //
Packit 8dc392
    // Signal enough times to allow all threads to stop.
Packit 8dc392
    //
Packit 8dc392
    // Wait until all threads have started their run functions.
Packit 8dc392
    // If we do not wait before we destroy the threads then it's
Packit 8dc392
    // possible that the threads have not yet called their run
Packit 8dc392
    // functions.
Packit 8dc392
    // If this happens then the run function will be called off
Packit 8dc392
    // of an invalid object and we will crash, most likely with
Packit 8dc392
    // an error like: "pure virtual method called"
Packit 8dc392
    //
Packit 8dc392
Packit 8dc392
    for (size_t i = 0; i < numThreads; i++)
Packit 8dc392
    {
Packit 8dc392
	taskSemaphore.post();
Packit 8dc392
	threadSemaphore.wait();
Packit 8dc392
    }
Packit 8dc392
Packit 8dc392
    //
Packit 8dc392
    // Join all the threads
Packit 8dc392
    //
Packit 8dc392
Packit 8dc392
    for (list<WorkerThread*>::iterator i = threads.begin();
Packit 8dc392
	 i != threads.end();
Packit 8dc392
	 ++i)
Packit 8dc392
    {
Packit 8dc392
	delete (*i);
Packit 8dc392
    }
Packit 8dc392
Packit 8dc392
    Lock lock1 (taskMutex);
Packit 8dc392
    Lock lock2 (stopMutex);
Packit 8dc392
    threads.clear();
Packit 8dc392
    tasks.clear();
Packit 8dc392
    numThreads = 0;
Packit 8dc392
    numTasks = 0;
Packit 8dc392
    stopping = false;
Packit 8dc392
}
Packit 8dc392
Packit 8dc392
Packit 8dc392
bool
Packit 8dc392
ThreadPool::Data::stopped () const
Packit 8dc392
{
Packit 8dc392
    Lock lock (stopMutex);
Packit 8dc392
    return stopping;
Packit 8dc392
}
Packit 8dc392
Packit 8dc392
Packit 8dc392
void
Packit 8dc392
ThreadPool::Data::stop ()
Packit 8dc392
{
Packit 8dc392
    Lock lock (stopMutex);
Packit 8dc392
    stopping = true;
Packit 8dc392
}
Packit 8dc392
Packit 8dc392
Packit 8dc392
//
Packit 8dc392
// class Task
Packit 8dc392
//
Packit 8dc392
Packit 8dc392
Task::Task (TaskGroup* g): _group(g)
Packit 8dc392
{
Packit 8dc392
    // empty
Packit 8dc392
}
Packit 8dc392
Packit 8dc392
Packit 8dc392
Task::~Task()
Packit 8dc392
{
Packit 8dc392
    // empty
Packit 8dc392
}
Packit 8dc392
Packit 8dc392
Packit 8dc392
TaskGroup*
Packit 8dc392
Task::group ()
Packit 8dc392
{
Packit 8dc392
    return _group;
Packit 8dc392
}
Packit 8dc392
Packit 8dc392
Packit 8dc392
TaskGroup::TaskGroup ():
Packit 8dc392
    _data (new Data())
Packit 8dc392
{
Packit 8dc392
    // empty
Packit 8dc392
}
Packit 8dc392
Packit 8dc392
Packit 8dc392
TaskGroup::~TaskGroup ()
Packit 8dc392
{
Packit 8dc392
    delete _data;
Packit 8dc392
}
Packit 8dc392
Packit 8dc392
Packit 8dc392
//
Packit 8dc392
// class ThreadPool
Packit 8dc392
//
Packit 8dc392
Packit 8dc392
ThreadPool::ThreadPool (unsigned nthreads):
Packit 8dc392
    _data (new Data())
Packit 8dc392
{
Packit 8dc392
    setNumThreads (nthreads);
Packit 8dc392
}
Packit 8dc392
Packit 8dc392
Packit 8dc392
ThreadPool::~ThreadPool ()
Packit 8dc392
{
Packit 8dc392
    delete _data;
Packit 8dc392
}
Packit 8dc392
Packit 8dc392
Packit 8dc392
int
Packit 8dc392
ThreadPool::numThreads () const
Packit 8dc392
{
Packit 8dc392
    Lock lock (_data->threadMutex);
Packit 8dc392
    return _data->numThreads;
Packit 8dc392
}
Packit 8dc392
Packit 8dc392
Packit 8dc392
void
Packit 8dc392
ThreadPool::setNumThreads (int count)
Packit 8dc392
{
Packit 8dc392
    if (count < 0)
Packit 8dc392
        throw IEX_INTERNAL_NAMESPACE::ArgExc ("Attempt to set the number of threads "
Packit 8dc392
			   "in a thread pool to a negative value.");
Packit 8dc392
Packit 8dc392
    //
Packit 8dc392
    // Lock access to thread list and size
Packit 8dc392
    //
Packit 8dc392
Packit 8dc392
    Lock lock (_data->threadMutex);
Packit 8dc392
Packit 8dc392
    if ((size_t)count > _data->numThreads)
Packit 8dc392
    {
Packit 8dc392
	//
Packit 8dc392
        // Add more threads
Packit 8dc392
	//
Packit 8dc392
Packit 8dc392
        while (_data->numThreads < (size_t)count)
Packit 8dc392
        {
Packit 8dc392
            _data->threads.push_back (new WorkerThread (_data));
Packit 8dc392
            _data->numThreads++;
Packit 8dc392
        }
Packit 8dc392
    }
Packit 8dc392
    else if ((size_t)count < _data->numThreads)
Packit 8dc392
    {
Packit 8dc392
	//
Packit 8dc392
	// Wait until all existing threads are finished processing,
Packit 8dc392
	// then delete all threads.
Packit 8dc392
	//
Packit 8dc392
Packit 8dc392
        _data->finish ();
Packit 8dc392
Packit 8dc392
	//
Packit 8dc392
        // Add in new threads
Packit 8dc392
	//
Packit 8dc392
Packit 8dc392
        while (_data->numThreads < (size_t)count)
Packit 8dc392
        {
Packit 8dc392
            _data->threads.push_back (new WorkerThread (_data));
Packit 8dc392
            _data->numThreads++;
Packit 8dc392
        }
Packit 8dc392
    }
Packit 8dc392
}
Packit 8dc392
Packit 8dc392
Packit 8dc392
void
Packit 8dc392
ThreadPool::addTask (Task* task) 
Packit 8dc392
{
Packit 8dc392
    //
Packit 8dc392
    // Lock the threads, needed to access numThreads
Packit 8dc392
    //
Packit 8dc392
Packit 8dc392
    Lock lock (_data->threadMutex);
Packit 8dc392
Packit 8dc392
    if (_data->numThreads == 0)
Packit 8dc392
    {
Packit 8dc392
        task->execute ();
Packit 8dc392
        delete task;
Packit 8dc392
    }
Packit 8dc392
    else
Packit 8dc392
    {
Packit 8dc392
	//
Packit 8dc392
        // Get exclusive access to the tasks queue
Packit 8dc392
	//
Packit 8dc392
Packit 8dc392
        {
Packit 8dc392
            Lock taskLock (_data->taskMutex);
Packit 8dc392
Packit 8dc392
	    //
Packit 8dc392
            // Push the new task into the FIFO
Packit 8dc392
	    //
Packit 8dc392
Packit 8dc392
            _data->tasks.push_back (task);
Packit 8dc392
            _data->numTasks++;
Packit 8dc392
            task->group()->_data->addTask();
Packit 8dc392
        }
Packit 8dc392
        
Packit 8dc392
	//
Packit 8dc392
        // Signal that we have a new task to process
Packit 8dc392
	//
Packit 8dc392
Packit 8dc392
        _data->taskSemaphore.post ();
Packit 8dc392
    }
Packit 8dc392
}
Packit 8dc392
Packit 8dc392
Packit 8dc392
ThreadPool&
Packit 8dc392
ThreadPool::globalThreadPool ()
Packit 8dc392
{
Packit 8dc392
    //
Packit 8dc392
    // The global thread pool
Packit 8dc392
    //
Packit 8dc392
    
Packit 8dc392
    static ThreadPool gThreadPool (0);
Packit 8dc392
Packit 8dc392
    return gThreadPool;
Packit 8dc392
}
Packit 8dc392
Packit 8dc392
Packit 8dc392
void
Packit 8dc392
ThreadPool::addGlobalTask (Task* task)
Packit 8dc392
{
Packit 8dc392
    globalThreadPool().addTask (task);
Packit 8dc392
}
Packit 8dc392
Packit 8dc392
Packit 8dc392
ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_EXIT