123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851 |
- ///////////////////////////////////////////////////////////////////////////
- //
- // Copyright (c) 2005-2012, Industrial Light & Magic, a division of Lucas
- // Digital Ltd. LLC
- //
- // All rights reserved.
- //
- // Redistribution and use in source and binary forms, with or without
- // modification, are permitted provided that the following conditions are
- // met:
- // * Redistributions of source code must retain the above copyright
- // notice, this list of conditions and the following disclaimer.
- // * Redistributions in binary form must reproduce the above
- // copyright notice, this list of conditions and the following disclaimer
- // in the documentation and/or other materials provided with the
- // distribution.
- // * Neither the name of Industrial Light & Magic nor the names of
- // its contributors may be used to endorse or promote products derived
- // from this software without specific prior written permission.
- //
- // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- //
- ///////////////////////////////////////////////////////////////////////////
- //-----------------------------------------------------------------------------
- //
- // class Task, class ThreadPool, class TaskGroup
- //
- //-----------------------------------------------------------------------------
- #include "IlmThread.h"
- #include "IlmThreadMutex.h"
- #include "IlmThreadSemaphore.h"
- #include "IlmThreadPool.h"
- #include "Iex.h"
- #include <vector>
- #ifndef ILMBASE_FORCE_CXX03
- # include <memory>
- # include <atomic>
- # include <thread>
- #endif
- using namespace std;
- ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_ENTER
- #if defined(__GNU_LIBRARY__) && ( __GLIBC__ < 2 || ( __GLIBC__ == 2 && __GLIBC_MINOR__ < 21 ) )
- # define ENABLE_SEM_DTOR_WORKAROUND
- #endif
- struct TaskGroup::Data
- {
- Data ();
- ~Data ();
-
- void addTask () ;
- void removeTask ();
- #ifndef ILMBASE_FORCE_CXX03
- std::atomic<int> numPending;
- #else
- int numPending; // number of pending tasks to still execute
- #endif
- Semaphore isEmpty; // used to signal that the taskgroup is empty
- #if defined(ENABLE_SEM_DTOR_WORKAROUND) || defined(ILMBASE_FORCE_CXX03)
- // this mutex is also used to lock numPending in the legacy c++ mode...
- Mutex dtorMutex; // used to work around the glibc bug:
- // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674
- #endif
- };
- struct ThreadPool::Data
- {
- typedef ThreadPoolProvider *TPPointer;
- Data ();
- ~Data();
- struct SafeProvider
- {
- SafeProvider (Data *d, ThreadPoolProvider *p) : _data( d ), _ptr( p )
- {
- }
- ~SafeProvider()
- {
- if ( _data )
- _data->coalesceProviderUse();
- }
- SafeProvider (const SafeProvider &o)
- : _data( o._data ), _ptr( o._ptr )
- {
- if ( _data )
- _data->bumpProviderUse();
- }
- SafeProvider &operator= (const SafeProvider &o)
- {
- if ( this != &o )
- {
- if ( o._data )
- o._data->bumpProviderUse();
- if ( _data )
- _data->coalesceProviderUse();
- _data = o._data;
- _ptr = o._ptr;
- }
- return *this;
- }
- #ifndef ILMBASE_FORCE_CXX03
- SafeProvider( SafeProvider &&o )
- : _data( o._data ), _ptr( o._ptr )
- {
- o._data = nullptr;
- }
- SafeProvider &operator=( SafeProvider &&o )
- {
- std::swap( _data, o._data );
- std::swap( _ptr, o._ptr );
- return *this;
- }
- #endif
- inline ThreadPoolProvider *get () const
- {
- return _ptr;
- }
- ThreadPoolProvider *operator-> () const
- {
- return get();
- }
- Data *_data;
- ThreadPoolProvider *_ptr;
- };
- // NB: In C++20, there is full support for atomic shared_ptr, but that is not
- // yet in use or finalized. Once stabilized, add appropriate usage here
- inline SafeProvider getProvider ();
- inline void coalesceProviderUse ();
- inline void bumpProviderUse ();
- inline void setProvider (ThreadPoolProvider *p);
- #ifdef ILMBASE_FORCE_CXX03
- Semaphore provSem;
- Mutex provMutex;
- int provUsers;
- ThreadPoolProvider *provider;
- ThreadPoolProvider *oldprovider;
- #else
- std::atomic<ThreadPoolProvider *> provider;
- std::atomic<int> provUsers;
- #endif
- };
- namespace {
- class DefaultWorkerThread;
- struct DefaultWorkData
- {
- Semaphore taskSemaphore; // threads wait on this for ready tasks
- mutable Mutex taskMutex; // mutual exclusion for the tasks list
- vector<Task*> tasks; // the list of tasks to execute
- Semaphore threadSemaphore; // signaled when a thread starts executing
- mutable Mutex threadMutex; // mutual exclusion for threads list
- vector<DefaultWorkerThread*> threads; // the list of all threads
-
- #ifdef ILMBASE_FORCE_CXX03
- bool stopping; // flag indicating whether to stop threads
- mutable Mutex stopMutex; // mutual exclusion for stopping flag
- #else
- std::atomic<bool> hasThreads;
- std::atomic<bool> stopping;
- #endif
- inline bool stopped () const
- {
- #ifdef ILMBASE_FORCE_CXX03
- Lock lock (stopMutex);
- return stopping;
- #else
- return stopping.load( std::memory_order_relaxed );
- #endif
- }
- inline void stop ()
- {
- #ifdef ILMBASE_FORCE_CXX03
- Lock lock (stopMutex);
- #endif
- stopping = true;
- }
- };
- //
- // class WorkerThread
- //
- class DefaultWorkerThread: public Thread
- {
- public:
- DefaultWorkerThread (DefaultWorkData* data);
- virtual void run ();
-
- private:
- DefaultWorkData * _data;
- };
- DefaultWorkerThread::DefaultWorkerThread (DefaultWorkData* data):
- _data (data)
- {
- start();
- }
- void
- DefaultWorkerThread::run ()
- {
- //
- // Signal that the thread has started executing
- //
- _data->threadSemaphore.post();
- while (true)
- {
- //
- // Wait for a task to become available
- //
- _data->taskSemaphore.wait();
- {
- Lock taskLock (_data->taskMutex);
-
- //
- // If there is a task pending, pop off the next task in the FIFO
- //
- if (!_data->tasks.empty())
- {
- Task* task = _data->tasks.back();
- _data->tasks.pop_back();
- taskLock.release();
- TaskGroup* taskGroup = task->group();
- task->execute();
- delete task;
- taskGroup->_data->removeTask ();
- }
- else if (_data->stopped())
- {
- break;
- }
- }
- }
- }
- //
- // class DefaultThreadPoolProvider
- //
- class DefaultThreadPoolProvider : public ThreadPoolProvider
- {
- public:
- DefaultThreadPoolProvider(int count);
- virtual ~DefaultThreadPoolProvider();
- virtual int numThreads() const;
- virtual void setNumThreads(int count);
- virtual void addTask(Task *task);
- virtual void finish();
- private:
- DefaultWorkData _data;
- };
- DefaultThreadPoolProvider::DefaultThreadPoolProvider (int count)
- {
- setNumThreads(count);
- }
- DefaultThreadPoolProvider::~DefaultThreadPoolProvider ()
- {
- finish();
- }
- int
- DefaultThreadPoolProvider::numThreads () const
- {
- Lock lock (_data.threadMutex);
- return static_cast<int> (_data.threads.size());
- }
- void
- DefaultThreadPoolProvider::setNumThreads (int count)
- {
- //
- // Lock access to thread list and size
- //
- Lock lock (_data.threadMutex);
- size_t desired = static_cast<size_t>(count);
- if (desired > _data.threads.size())
- {
- //
- // Add more threads
- //
- while (_data.threads.size() < desired)
- _data.threads.push_back (new DefaultWorkerThread (&_data));
- }
- else if ((size_t)count < _data.threads.size())
- {
- //
- // Wait until all existing threads are finished processing,
- // then delete all threads.
- //
- finish ();
- //
- // Add in new threads
- //
- while (_data.threads.size() < desired)
- _data.threads.push_back (new DefaultWorkerThread (&_data));
- }
- #ifndef ILMBASE_FORCE_CXX03
- _data.hasThreads = !(_data.threads.empty());
- #endif
- }
- void
- DefaultThreadPoolProvider::addTask (Task *task)
- {
- //
- // Lock the threads, needed to access numThreads
- //
- #ifdef ILMBASE_FORCE_CXX03
- bool doPush;
- {
- Lock lock (_data.threadMutex);
- doPush = !_data.threads.empty();
- }
- #else
- bool doPush = _data.hasThreads.load( std::memory_order_relaxed );
- #endif
- if ( doPush )
- {
- //
- // Get exclusive access to the tasks queue
- //
- {
- Lock taskLock (_data.taskMutex);
- //
- // Push the new task into the FIFO
- //
- _data.tasks.push_back (task);
- }
-
- //
- // Signal that we have a new task to process
- //
- _data.taskSemaphore.post ();
- }
- else
- {
- // this path shouldn't normally happen since we have the
- // NullThreadPoolProvider, but just in case...
- task->execute ();
- task->group()->_data->removeTask ();
- delete task;
- }
- }
- void
- DefaultThreadPoolProvider::finish ()
- {
- _data.stop();
- //
- // Signal enough times to allow all threads to stop.
- //
- // Wait until all threads have started their run functions.
- // If we do not wait before we destroy the threads then it's
- // possible that the threads have not yet called their run
- // functions.
- // If this happens then the run function will be called off
- // of an invalid object and we will crash, most likely with
- // an error like: "pure virtual method called"
- //
- size_t curT = _data.threads.size();
- for (size_t i = 0; i != curT; ++i)
- {
- _data.taskSemaphore.post();
- _data.threadSemaphore.wait();
- }
- //
- // Join all the threads
- //
- for (size_t i = 0; i != curT; ++i)
- delete _data.threads[i];
- Lock lock1 (_data.taskMutex);
- #ifdef ILMBASE_FORCE_CXX03
- Lock lock2 (_data.stopMutex);
- #endif
- _data.threads.clear();
- _data.tasks.clear();
- _data.stopping = false;
- }
- class NullThreadPoolProvider : public ThreadPoolProvider
- {
- virtual ~NullThreadPoolProvider() {}
- virtual int numThreads () const { return 0; }
- virtual void setNumThreads (int count)
- {
- }
- virtual void addTask (Task *t)
- {
- t->execute ();
- t->group()->_data->removeTask ();
- delete t;
- }
- virtual void finish () {}
- };
- } //namespace
- //
- // struct TaskGroup::Data
- //
- TaskGroup::Data::Data (): isEmpty (1), numPending (0)
- {
- // empty
- }
- TaskGroup::Data::~Data ()
- {
- //
- // A TaskGroup acts like an "inverted" semaphore: if the count
- // is above 0 then waiting on the taskgroup will block. This
- // destructor waits until the taskgroup is empty before returning.
- //
- isEmpty.wait ();
- #ifdef ENABLE_SEM_DTOR_WORKAROUND
- // Update: this was fixed in v. 2.2.21, so this ifdef checks for that
- //
- // Alas, given the current bug in glibc we need a secondary
- // syncronisation primitive here to account for the fact that
- // destructing the isEmpty Semaphore in this thread can cause
- // an error for a separate thread that is issuing the post() call.
- // We are entitled to destruct the semaphore at this point, however,
- // that post() call attempts to access data out of the associated
- // memory *after* it has woken the waiting threads, including this one,
- // potentially leading to invalid memory reads.
- // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674
- Lock lock (dtorMutex);
- #endif
- }
- void
- TaskGroup::Data::addTask ()
- {
- //
- // in c++11, we use an atomic to protect numPending to avoid the
- // extra lock but for c++98, to add the ability for custom thread
- // pool we add the lock here
- //
- #if ILMBASE_FORCE_CXX03
- Lock lock (dtorMutex);
- #endif
- if (numPending++ == 0)
- isEmpty.wait ();
- }
- void
- TaskGroup::Data::removeTask ()
- {
- // Alas, given the current bug in glibc we need a secondary
- // syncronisation primitive here to account for the fact that
- // destructing the isEmpty Semaphore in a separate thread can
- // cause an error. Issuing the post call here the current libc
- // implementation attempts to access memory *after* it has woken
- // waiting threads.
- // Since other threads are entitled to delete the semaphore the
- // access to the memory location can be invalid.
- // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674
- // Update: this bug has been fixed, but how do we know which
- // glibc version we're in?
- // Further update:
- //
- // we could remove this if it is a new enough glibc, however
- // we've changed the API to enable a custom override of a
- // thread pool. In order to provide safe access to the numPending,
- // we need the lock anyway, except for c++11 or newer
- #ifdef ILMBASE_FORCE_CXX03
- Lock lock (dtorMutex);
- if (--numPending == 0)
- isEmpty.post ();
- #else
- if (--numPending == 0)
- {
- #ifdef ENABLE_SEM_DTOR_WORKAROUND
- Lock lock (dtorMutex);
- #endif
- isEmpty.post ();
- }
- #endif
- }
-
- //
- // struct ThreadPool::Data
- //
- ThreadPool::Data::Data ():
- provUsers (0), provider (NULL)
- #ifdef ILMBASE_FORCE_CXX03
- , oldprovider (NULL)
- #else
- #endif
- {
- // empty
- }
- ThreadPool::Data::~Data()
- {
- #ifdef ILMBASE_FORCE_CXX03
- provider->finish();
- #else
- ThreadPoolProvider *p = provider.load( std::memory_order_relaxed );
- p->finish();
- #endif
- }
- inline ThreadPool::Data::SafeProvider
- ThreadPool::Data::getProvider ()
- {
- #ifdef ILMBASE_FORCE_CXX03
- Lock provLock( provMutex );
- ++provUsers;
- return SafeProvider( this, provider );
- #else
- provUsers.fetch_add( 1, std::memory_order_relaxed );
- return SafeProvider( this, provider.load( std::memory_order_relaxed ) );
- #endif
- }
- inline void
- ThreadPool::Data::coalesceProviderUse ()
- {
- #ifdef ILMBASE_FORCE_CXX03
- Lock provLock( provMutex );
- --provUsers;
- if ( provUsers == 0 )
- {
- if ( oldprovider )
- provSem.post();
- }
- #else
- int ov = provUsers.fetch_sub( 1, std::memory_order_relaxed );
- // ov is the previous value, so one means that now it might be 0
- if ( ov == 1 )
- {
-
- }
- #endif
- }
- inline void
- ThreadPool::Data::bumpProviderUse ()
- {
- #ifdef ILMBASE_FORCE_CXX03
- Lock lock (provMutex);
- ++provUsers;
- #else
- provUsers.fetch_add( 1, std::memory_order_relaxed );
- #endif
- }
- inline void
- ThreadPool::Data::setProvider (ThreadPoolProvider *p)
- {
- #ifdef ILMBASE_FORCE_CXX03
- Lock provLock( provMutex );
- if ( oldprovider )
- throw IEX_INTERNAL_NAMESPACE::ArgExc ("Attempt to set the thread pool provider while"
- " another thread is currently setting the provider.");
- oldprovider = provider;
- provider = p;
- while ( provUsers > 0 )
- {
- provLock.release();
- provSem.wait();
- provLock.acquire();
- }
- if ( oldprovider )
- {
- oldprovider->finish();
- delete oldprovider;
- oldprovider = NULL;
- }
- #else
- ThreadPoolProvider *old = provider.load( std::memory_order_relaxed );
- do
- {
- if ( ! provider.compare_exchange_weak( old, p, std::memory_order_release, std::memory_order_relaxed ) )
- continue;
- } while ( false );
- // wait for any other users to finish prior to deleting, given
- // that these are just mostly to query the thread count or push a
- // task to the queue (so fast), just spin...
- //
- // (well, and normally, people don't do this mid stream anyway, so
- // this will be 0 99.999% of the time, but just to be safe)
- //
- while ( provUsers.load( std::memory_order_relaxed ) > 0 )
- std::this_thread::yield();
- if ( old )
- {
- old->finish();
- delete old;
- }
- // NB: the shared_ptr mechanism is safer and means we don't have
- // to have the provUsers counter since the shared_ptr keeps that
- // for us. However, gcc 4.8/9 compilers which many people are
- // still using even though it is 2018 forgot to add the shared_ptr
- // functions... once that compiler is fully deprecated, switch to
- // using the below, change provider to a std::shared_ptr and remove
- // provUsers...
- //
- // std::shared_ptr<ThreadPoolProvider> newp( p );
- // std::shared_ptr<ThreadPoolProvider> curp = std::atomic_load_explicit( &provider, std::memory_order_relaxed );
- // do
- // {
- // if ( ! std::atomic_compare_exchange_weak_explicit( &provider, &curp, newp, std::memory_order_release, std::memory_order_relaxed ) )
- // continue;
- // } while ( false );
- // if ( curp )
- // curp->finish();
- #endif
- }
- //
- // class Task
- //
- Task::Task (TaskGroup* g): _group(g)
- {
- if ( g )
- g->_data->addTask ();
- }
- Task::~Task()
- {
- // empty
- }
- TaskGroup*
- Task::group ()
- {
- return _group;
- }
- TaskGroup::TaskGroup ():
- _data (new Data())
- {
- // empty
- }
- TaskGroup::~TaskGroup ()
- {
- delete _data;
- }
- void
- TaskGroup::finishOneTask ()
- {
- _data->removeTask ();
- }
- //
- // class ThreadPoolProvider
- //
- ThreadPoolProvider::ThreadPoolProvider()
- {
- }
- ThreadPoolProvider::~ThreadPoolProvider()
- {
- }
- //
- // class ThreadPool
- //
- ThreadPool::ThreadPool (unsigned nthreads):
- _data (new Data)
- {
- if ( nthreads == 0 )
- _data->setProvider( new NullThreadPoolProvider );
- else
- _data->setProvider( new DefaultThreadPoolProvider( int(nthreads) ) );
- }
- ThreadPool::~ThreadPool ()
- {
- delete _data;
- }
- int
- ThreadPool::numThreads () const
- {
- return _data->getProvider ()->numThreads ();
- }
- void
- ThreadPool::setNumThreads (int count)
- {
- if (count < 0)
- throw IEX_INTERNAL_NAMESPACE::ArgExc ("Attempt to set the number of threads "
- "in a thread pool to a negative value.");
- bool doReset = false;
- {
- Data::SafeProvider sp = _data->getProvider ();
- int curT = sp->numThreads ();
- if ( curT == count )
- return;
- if ( curT == 0 )
- {
- NullThreadPoolProvider *npp = dynamic_cast<NullThreadPoolProvider *>( sp.get() );
- if ( npp )
- doReset = true;
- }
- else if ( count == 0 )
- {
- DefaultThreadPoolProvider *dpp = dynamic_cast<DefaultThreadPoolProvider *>( sp.get() );
- if ( dpp )
- doReset = true;
- }
- if ( ! doReset )
- sp->setNumThreads( count );
- }
- if ( doReset )
- {
- if ( count == 0 )
- _data->setProvider( new NullThreadPoolProvider );
- else
- _data->setProvider( new DefaultThreadPoolProvider( count ) );
- }
- }
- void
- ThreadPool::setThreadProvider (ThreadPoolProvider *provider)
- {
- _data->setProvider (provider);
- }
- void
- ThreadPool::addTask (Task* task)
- {
- _data->getProvider ()->addTask (task);
- }
- ThreadPool&
- ThreadPool::globalThreadPool ()
- {
- //
- // The global thread pool
- //
-
- static ThreadPool gThreadPool (0);
- return gThreadPool;
- }
- void
- ThreadPool::addGlobalTask (Task* task)
- {
- globalThreadPool().addTask (task);
- }
- ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_EXIT
|