///////////////////////////////////////////////////////////////////////////
//
// Copyright (c) 2005-2012, Industrial Light & Magic, a division of Lucas
// Digital Ltd. LLC
// 
// All rights reserved.
// 
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
// *       Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// *       Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// *       Neither the name of Industrial Light & Magic nor the names of
// its contributors may be used to endorse or promote products derived
// from this software without specific prior written permission. 
// 
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
///////////////////////////////////////////////////////////////////////////

//-----------------------------------------------------------------------------
//
//	class Task, class ThreadPool, class TaskGroup
//
//-----------------------------------------------------------------------------

#include "IlmThread.h"
#include "IlmThreadMutex.h"
#include "IlmThreadSemaphore.h"
#include "IlmThreadPool.h"
#include "Iex.h"
#include <list>

using namespace std;

ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_ENTER
namespace {

class WorkerThread: public Thread
{
  public:

    WorkerThread (ThreadPool::Data* data);

    virtual void	run ();
    
  private:

    ThreadPool::Data *	_data;
};

} //namespace


struct TaskGroup::Data
{
     Data ();
    ~Data ();
    
    void	addTask ();
    void	removeTask ();
    
    Semaphore	isEmpty;        // used to signal that the taskgroup is empty
    int         numPending;     // number of pending tasks to still execute
    Mutex       dtorMutex;      // used to work around the glibc bug:
                                // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674
};


struct ThreadPool::Data
{
     Data ();
    ~Data();
    
    void	finish ();
    bool	stopped () const;
    void	stop ();

    Semaphore taskSemaphore;        // threads wait on this for ready tasks
    Mutex taskMutex;                // mutual exclusion for the tasks list
    list<Task*> tasks;              // the list of tasks to execute
    size_t numTasks;                // fast access to list size
                                    //   (list::size() can be O(n))

    Semaphore threadSemaphore;      // signaled when a thread starts executing
    Mutex threadMutex;              // mutual exclusion for threads list
    list<WorkerThread*> threads;    // the list of all threads
    size_t numThreads;              // fast access to list size
    
    bool stopping;                  // flag indicating whether to stop threads
    Mutex stopMutex;                // mutual exclusion for stopping flag
};
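
//
// Informal invariant: taskSemaphore's count equals the number of
// queued tasks plus the number of stop signals posted by
// Data::finish(), so a worker blocked in taskSemaphore.wait() is
// woken exactly once per task or per shutdown request.
//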



//
// class WorkerThread
//

WorkerThread::WorkerThread (ThreadPool::Data* data):
    _data (data)
{
    start();
}


void
WorkerThread::run ()
{
    //
    // Signal that the thread has started executing
    //

    _data->threadSemaphore.post();

    while (true)
    {
        //
        // Wait for a task to become available
        //

        _data->taskSemaphore.wait();

        {
            Lock taskLock (_data->taskMutex);
    
            //
            // If there is a task pending, pop off the next task in the FIFO
            //

            if (_data->numTasks > 0)
            {
                Task* task = _data->tasks.front();
                TaskGroup* taskGroup = task->group();
                _data->tasks.pop_front();
                _data->numTasks--;

                taskLock.release();
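                //
                // Run the task without holding the queue lock so that
                // other workers can dequeue tasks concurrently; the
                // lock is re-acquired before updating the task's
                // group, whose counters are protected by the same
                // mutex (see TaskGroup::Data::addTask).
                //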
                task->execute();
                taskLock.acquire();

                delete task;
                taskGroup->_data->removeTask();
            }
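            //
            // No task was queued: the semaphore must have been posted
            // by Data::finish() to wake this thread for shutdown, so
            // exit the loop if the pool is stopping.
            //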
            else if (_data->stopped())
            {
                break;
            }
        }
    }
}


//
// struct TaskGroup::Data
//

TaskGroup::Data::Data (): isEmpty (1), numPending (0)
{
    // empty
}


TaskGroup::Data::~Data ()
{
    //
    // A TaskGroup acts like an "inverted" semaphore: if the count
    // is above 0 then waiting on the taskgroup will block.  This
    // destructor waits until the taskgroup is empty before returning.
    //

    isEmpty.wait ();

    // Alas, given the current bug in glibc we need a secondary
    // synchronisation primitive here to account for the fact that
    // destroying the isEmpty semaphore in this thread can cause an
    // error in a separate thread that is still inside its post()
    // call.  We are entitled to destroy the semaphore at this point;
    // however, the current implementation of post() accesses the
    // semaphore's memory *after* it has woken the waiting threads,
    // including this one, potentially leading to invalid memory
    // reads.  Acquiring dtorMutex below therefore blocks until
    // removeTask() has released it, that is, until its post() call
    // has completed.
    // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674

    Lock lock (dtorMutex);
}


void
TaskGroup::Data::addTask () 
{
    //
    // Any access to the taskgroup is protected by a mutex that is
    // held by the threadpool.  Therefore it is safe to access
    // numPending before we wait on the semaphore.
    //

    if (numPending++ == 0)
        isEmpty.wait ();
}


void
TaskGroup::Data::removeTask ()
{
    // Alas, given the current bug in glibc we need a secondary
    // synchronisation primitive here to account for the fact that
    // the isEmpty semaphore may be destroyed by a separate thread
    // while this thread is still inside post(): the current glibc
    // implementation accesses the semaphore's memory *after* it has
    // woken the waiting threads.  Since those threads are entitled
    // to delete the semaphore at that point, the access can touch
    // freed memory.  Holding dtorMutex across the post() call below
    // keeps the destructor from returning until post() has completed.
    // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674

    if (--numPending == 0)
    {
        Lock lock (dtorMutex);
        isEmpty.post ();
    }
}
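
// In short, numPending and isEmpty together act as the "inverted"
// semaphore described above:
//
//     addTask ():     the 0 -> 1 transition acquires isEmpty (wait)
//     removeTask ():  the 1 -> 0 transition releases isEmpty (post)
//     ~Data ():       waits on isEmpty, i.e. until numPending == 0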
    

//
// struct ThreadPool::Data
//

ThreadPool::Data::Data (): numTasks (0), numThreads (0), stopping (false)
{
    // empty
}


ThreadPool::Data::~Data()
{
    Lock lock (threadMutex);
    finish ();
}


void
ThreadPool::Data::finish ()
{
    stop();

    //
    // Signal the task semaphore enough times to allow all threads
    // to stop.
    //
    // Also wait until every thread has entered its run() function.
    // If we destroy a thread before it has called run(), the call
    // will be made on an already-destroyed object and we will crash,
    // most likely with an error like "pure virtual method called".
    //

    for (size_t i = 0; i < numThreads; i++)
    {
        taskSemaphore.post();
        threadSemaphore.wait();
    }

    //
    // Join all the threads (deleting a Thread joins it first)
    //

    for (list<WorkerThread*>::iterator i = threads.begin();
         i != threads.end();
         ++i)
    {
        delete (*i);
    }

    Lock lock1 (taskMutex);
    Lock lock2 (stopMutex);
    threads.clear();
    tasks.clear();
    numThreads = 0;
    numTasks = 0;
    stopping = false;
}
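
//
// Note that finish() leaves the pool empty but reusable: stopping is
// reset to false, so setNumThreads() can create new worker threads
// afterwards.  The shrinking branch of ThreadPool::setNumThreads()
// relies on this.
//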


bool
ThreadPool::Data::stopped () const
{
    Lock lock (stopMutex);
    return stopping;
}


void
ThreadPool::Data::stop ()
{
    Lock lock (stopMutex);
    stopping = true;
}


//
// class Task
//

Task::Task (TaskGroup* g): _group(g)
{
    // empty
}


Task::~Task()
{
    // empty
}


TaskGroup*
Task::group ()
{
    return _group;
}


TaskGroup::TaskGroup ():
    _data (new Data())
{
    // empty
}


TaskGroup::~TaskGroup ()
{
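    //
    // Deleting _data blocks until every task in this group has
    // finished (see TaskGroup::Data::~Data above).
    //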
    delete _data;
}


//
// class ThreadPool
//

ThreadPool::ThreadPool (unsigned nthreads):
    _data (new Data())
{
    setNumThreads (nthreads);
}


ThreadPool::~ThreadPool ()
{
    delete _data;
}


int
ThreadPool::numThreads () const
{
    Lock lock (_data->threadMutex);
    return (int) _data->numThreads;
}


void
ThreadPool::setNumThreads (int count)
{
    if (count < 0)
        throw IEX_INTERNAL_NAMESPACE::ArgExc ("Attempt to set the number of threads "
                                              "in a thread pool to a negative value.");

    //
    // Lock access to thread list and size
    //

    Lock lock (_data->threadMutex);

    if ((size_t)count > _data->numThreads)
    {
        //
        // Add more threads
        //

        while (_data->numThreads < (size_t)count)
        {
            _data->threads.push_back (new WorkerThread (_data));
            _data->numThreads++;
        }
    }
    else if ((size_t)count < _data->numThreads)
    {
        //
        // Wait until all existing threads are finished processing,
        // then delete all threads.
        //

        _data->finish ();

        //
        // Add in new threads
        //

        while (_data->numThreads < (size_t)count)
        {
            _data->threads.push_back (new WorkerThread (_data));
            _data->numThreads++;
        }
    }
}


void
ThreadPool::addTask (Task* task) 
{
    //
    // Lock the threads, needed to access numThreads
    //

    Lock lock (_data->threadMutex);

    if (_data->numThreads == 0)
    {
        task->execute ();
        delete task;
    }
    else
    {
        //
        // Get exclusive access to the tasks queue
        //

        {
            Lock taskLock (_data->taskMutex);

            //
            // Push the new task into the FIFO
            //

            _data->tasks.push_back (task);
            _data->numTasks++;
            task->group()->_data->addTask();
        }
        
        //
        // Signal that we have a new task to process
        //

        _data->taskSemaphore.post ();
    }
}


ThreadPool&
ThreadPool::globalThreadPool ()
{
    //
    // The global thread pool
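    // is created with zero threads, so until setNumThreads() is
    // called, tasks added via addGlobalTask() execute immediately
    // in the calling thread (see ThreadPool::addTask above).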
    //
    
    static ThreadPool gThreadPool (0);

    return gThreadPool;
}


void
ThreadPool::addGlobalTask (Task* task)
{
    globalThreadPool().addTask (task);
}
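
//
// Usage example (an illustrative sketch, not part of the library;
// the names SquareTask and squareAll are hypothetical): clients
// subclass Task, override execute(), and hand heap-allocated tasks
// to the pool; the TaskGroup destructor blocks until all of the
// group's tasks have finished.
//
//     class SquareTask: public Task
//     {
//       public:
//
//         SquareTask (TaskGroup* group, int& out, int x):
//             Task (group), _out (out), _x (x) {}
//
//         virtual void	execute () { _out = _x * _x; }
//
//       private:
//
//         int &	_out;
//         int		_x;
//     };
//
//     void
//     squareAll (int data[], int n)
//     {
//         ThreadPool::globalThreadPool().setNumThreads (4);
//
//         TaskGroup group;
//
//         for (int i = 0; i < n; ++i)
//             ThreadPool::addGlobalTask
//                 (new SquareTask (&group, data[i], data[i]));
//
//         // Leaving this scope destroys the TaskGroup, which blocks
//         // until every task in the group has finished; the pool
//         // deletes each task after executing it.
//     }
//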


ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_EXIT