/////////////////////////////////////////////////////////////////////////// // // Copyright (c) 2005-2012, Industrial Light & Magic, a division of Lucas // Digital Ltd. LLC // // All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are // met: // * Redistributions of source code must retain the above copyright // notice, this list of conditions and the following disclaimer. // * Redistributions in binary form must reproduce the above // copyright notice, this list of conditions and the following disclaimer // in the documentation and/or other materials provided with the // distribution. // * Neither the name of Industrial Light & Magic nor the names of // its contributors may be used to endorse or promote products derived // from this software without specific prior written permission. // // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // /////////////////////////////////////////////////////////////////////////// //----------------------------------------------------------------------------- // // class Task, class ThreadPool, class TaskGroup // //----------------------------------------------------------------------------- #include "IlmThread.h" #include "IlmThreadMutex.h" #include "IlmThreadSemaphore.h" #include "IlmThreadPool.h" #include "Iex.h" #include using namespace std; ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_ENTER namespace { class WorkerThread: public Thread { public: WorkerThread (ThreadPool::Data* data); virtual void run (); private: ThreadPool::Data * _data; }; } //namespace struct TaskGroup::Data { Data (); ~Data (); void addTask () ; void removeTask (); Semaphore isEmpty; // used to signal that the taskgroup is empty int numPending; // number of pending tasks to still execute Mutex dtorMutex; // used to work around the glibc bug: // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674 }; struct ThreadPool::Data { Data (); ~Data(); void finish (); bool stopped () const; void stop (); Semaphore taskSemaphore; // threads wait on this for ready tasks Mutex taskMutex; // mutual exclusion for the tasks list list tasks; // the list of tasks to execute size_t numTasks; // fast access to list size // (list::size() can be O(n)) Semaphore threadSemaphore; // signaled when a thread starts executing Mutex threadMutex; // mutual exclusion for threads list list threads; // the list of all threads size_t numThreads; // fast access to list size bool stopping; // flag indicating whether to stop threads Mutex stopMutex; // mutual exclusion for stopping flag }; // // class WorkerThread // WorkerThread::WorkerThread (ThreadPool::Data* data): _data (data) { start(); } void WorkerThread::run () { // // Signal that the thread has started executing // _data->threadSemaphore.post(); while (true) { // // Wait for a task to become available // _data->taskSemaphore.wait(); { Lock taskLock (_data->taskMutex); // // If there is a task pending, pop off the next task in the FIFO // if (_data->numTasks > 0) { Task* task = _data->tasks.front(); TaskGroup* taskGroup = task->group(); _data->tasks.pop_front(); _data->numTasks--; taskLock.release(); task->execute(); taskLock.acquire(); delete task; taskGroup->_data->removeTask(); } else if (_data->stopped()) { break; } } } } // // struct TaskGroup::Data // TaskGroup::Data::Data (): isEmpty (1), numPending (0) { // empty } TaskGroup::Data::~Data () { // // A TaskGroup acts like an "inverted" semaphore: if the count // is above 0 then waiting on the taskgroup will block. This // destructor waits until the taskgroup is empty before returning. // isEmpty.wait (); // Alas, given the current bug in glibc we need a secondary // syncronisation primitive here to account for the fact that // destructing the isEmpty Semaphore in this thread can cause // an error for a separate thread that is issuing the post() call. // We are entitled to destruct the semaphore at this point, however, // that post() call attempts to access data out of the associated // memory *after* it has woken the waiting threads, including this one, // potentially leading to invalid memory reads. // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674 Lock lock (dtorMutex); } void TaskGroup::Data::addTask () { // // Any access to the taskgroup is protected by a mutex that is // held by the threadpool. Therefore it is safe to access // numPending before we wait on the semaphore. // if (numPending++ == 0) isEmpty.wait (); } void TaskGroup::Data::removeTask () { // Alas, given the current bug in glibc we need a secondary // syncronisation primitive here to account for the fact that // destructing the isEmpty Semaphore in a separate thread can // cause an error. Issuing the post call here the current libc // implementation attempts to access memory *after* it has woken // waiting threads. // Since other threads are entitled to delete the semaphore the // access to the memory location can be invalid. // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674 if (--numPending == 0) { Lock lock (dtorMutex); isEmpty.post (); } } // // struct ThreadPool::Data // ThreadPool::Data::Data (): numTasks (0), numThreads (0), stopping (false) { // empty } ThreadPool::Data::~Data() { Lock lock (threadMutex); finish (); } void ThreadPool::Data::finish () { stop(); // // Signal enough times to allow all threads to stop. // // Wait until all threads have started their run functions. // If we do not wait before we destroy the threads then it's // possible that the threads have not yet called their run // functions. // If this happens then the run function will be called off // of an invalid object and we will crash, most likely with // an error like: "pure virtual method called" // for (size_t i = 0; i < numThreads; i++) { taskSemaphore.post(); threadSemaphore.wait(); } // // Join all the threads // for (list::iterator i = threads.begin(); i != threads.end(); ++i) { delete (*i); } Lock lock1 (taskMutex); Lock lock2 (stopMutex); threads.clear(); tasks.clear(); numThreads = 0; numTasks = 0; stopping = false; } bool ThreadPool::Data::stopped () const { Lock lock (stopMutex); return stopping; } void ThreadPool::Data::stop () { Lock lock (stopMutex); stopping = true; } // // class Task // Task::Task (TaskGroup* g): _group(g) { // empty } Task::~Task() { // empty } TaskGroup* Task::group () { return _group; } TaskGroup::TaskGroup (): _data (new Data()) { // empty } TaskGroup::~TaskGroup () { delete _data; } // // class ThreadPool // ThreadPool::ThreadPool (unsigned nthreads): _data (new Data()) { setNumThreads (nthreads); } ThreadPool::~ThreadPool () { delete _data; } int ThreadPool::numThreads () const { Lock lock (_data->threadMutex); return _data->numThreads; } void ThreadPool::setNumThreads (int count) { if (count < 0) throw IEX_INTERNAL_NAMESPACE::ArgExc ("Attempt to set the number of threads " "in a thread pool to a negative value."); // // Lock access to thread list and size // Lock lock (_data->threadMutex); if ((size_t)count > _data->numThreads) { // // Add more threads // while (_data->numThreads < (size_t)count) { _data->threads.push_back (new WorkerThread (_data)); _data->numThreads++; } } else if ((size_t)count < _data->numThreads) { // // Wait until all existing threads are finished processing, // then delete all threads. // _data->finish (); // // Add in new threads // while (_data->numThreads < (size_t)count) { _data->threads.push_back (new WorkerThread (_data)); _data->numThreads++; } } } void ThreadPool::addTask (Task* task) { // // Lock the threads, needed to access numThreads // Lock lock (_data->threadMutex); if (_data->numThreads == 0) { task->execute (); delete task; } else { // // Get exclusive access to the tasks queue // { Lock taskLock (_data->taskMutex); // // Push the new task into the FIFO // _data->tasks.push_back (task); _data->numTasks++; task->group()->_data->addTask(); } // // Signal that we have a new task to process // _data->taskSemaphore.post (); } } ThreadPool& ThreadPool::globalThreadPool () { // // The global thread pool // static ThreadPool gThreadPool (0); return gThreadPool; } void ThreadPool::addGlobalTask (Task* task) { globalThreadPool().addTask (task); } ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_EXIT