///////////////////////////////////////////////////////////////////////////
//
// Copyright (c) 2005-2012, Industrial Light & Magic, a division of Lucas
// Digital Ltd. LLC
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
// *       Redistributions of source code must retain the above copyright
//         notice, this list of conditions and the following disclaimer.
// *       Redistributions in binary form must reproduce the above
//         copyright notice, this list of conditions and the following disclaimer
//         in the documentation and/or other materials provided with the
//         distribution.
// *       Neither the name of Industrial Light & Magic nor the names of
//         its contributors may be used to endorse or promote products derived
//         from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
///////////////////////////////////////////////////////////////////////////

//-----------------------------------------------------------------------------
//
//      class Task, class ThreadPool, class TaskGroup
//
//-----------------------------------------------------------------------------

#include "IlmThread.h"
#include "IlmThreadMutex.h"
#include "IlmThreadSemaphore.h"
#include "IlmThreadPool.h"
#include "Iex.h"
#include <vector>

#ifndef ILMBASE_FORCE_CXX03
# include <memory>
# include <atomic>
# include <thread>
#endif

using namespace std;

ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_ENTER

#if defined(__GNU_LIBRARY__) && ( __GLIBC__ < 2 || ( __GLIBC__ == 2 && __GLIBC_MINOR__ < 21 ) )
# define ENABLE_SEM_DTOR_WORKAROUND
#endif

struct TaskGroup::Data
{
     Data ();
    ~Data ();

    void    addTask ();
    void    removeTask ();

#ifndef ILMBASE_FORCE_CXX03
    std::atomic<int> numPending;
#else
    int              numPending;    // number of pending tasks to still execute
#endif
    Semaphore        isEmpty;       // used to signal that the taskgroup is empty
#if defined(ENABLE_SEM_DTOR_WORKAROUND) || defined(ILMBASE_FORCE_CXX03)
    // this mutex is also used to lock numPending in the legacy c++ mode...
    Mutex            dtorMutex;     // used to work around the glibc bug:
                                    // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674
#endif
};
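//
// Illustrative sketch (hypothetical usage, not part of this library's
// code): the pending-task bookkeeping above behaves like an inverted
// semaphore. The transitions are:
//
//     TaskGroup::Data d;   // numPending == 0, isEmpty count == 1
//     d.addTask ();        // numPending 0 -> 1, isEmpty.wait() takes it to 0
//     d.addTask ();        // numPending 1 -> 2, isEmpty untouched
//     d.removeTask ();     // numPending 2 -> 1, isEmpty untouched
//     d.removeTask ();     // numPending 1 -> 0, isEmpty.post() back to 1
//
// ~Data can therefore block on isEmpty.wait() until the last task is done.
//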
struct ThreadPool::Data
{
    typedef ThreadPoolProvider *TPPointer;

     Data ();
    ~Data();

    struct SafeProvider
    {
        SafeProvider (Data *d, ThreadPoolProvider *p) : _data( d ), _ptr( p )
        {
        }

        ~SafeProvider()
        {
            if ( _data )
                _data->coalesceProviderUse();
        }
        SafeProvider (const SafeProvider &o)
            : _data( o._data ), _ptr( o._ptr )
        {
            if ( _data )
                _data->bumpProviderUse();
        }
        SafeProvider &operator= (const SafeProvider &o)
        {
            if ( this != &o )
            {
                if ( o._data )
                    o._data->bumpProviderUse();
                if ( _data )
                    _data->coalesceProviderUse();
                _data = o._data;
                _ptr = o._ptr;
            }
            return *this;
        }
#ifndef ILMBASE_FORCE_CXX03
        SafeProvider( SafeProvider &&o )
            : _data( o._data ), _ptr( o._ptr )
        {
            o._data = nullptr;
        }
        SafeProvider &operator=( SafeProvider &&o )
        {
            std::swap( _data, o._data );
            std::swap( _ptr, o._ptr );
            return *this;
        }
#endif

        inline ThreadPoolProvider *get () const
        {
            return _ptr;
        }
        ThreadPoolProvider *operator-> () const
        {
            return get();
        }

        Data *_data;
        ThreadPoolProvider *_ptr;
    };

    // NB: In C++20, there is full support for atomic shared_ptr, but that
    // is not yet in use or finalized. Once stabilized, add appropriate
    // usage here
    inline SafeProvider getProvider ();
    inline void coalesceProviderUse ();
    inline void bumpProviderUse ();
    inline void setProvider (ThreadPoolProvider *p);

#ifdef ILMBASE_FORCE_CXX03
    Semaphore provSem;
    Mutex provMutex;
    int provUsers;
    ThreadPoolProvider *provider;
    ThreadPoolProvider *oldprovider;
#else
    std::atomic<ThreadPoolProvider *> provider;
    std::atomic<int> provUsers;
#endif
};
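//
// Illustrative sketch (hypothetical usage, not part of this library's
// code): SafeProvider is a small RAII guard around the provider pointer.
// getProvider() increments provUsers before handing out the pointer, and
// the guard's destructor decrements it again, so setProvider() can tell
// when no caller still holds the old provider:
//
//     {
//         Data::SafeProvider sp = data.getProvider (); // provUsers += 1
//         sp->addTask (t);                             // pointer use is safe here
//     }                                                // provUsers -= 1
//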
namespace {

class DefaultWorkerThread;

struct DefaultWorkData
{
    Semaphore taskSemaphore;        // threads wait on this for ready tasks
    mutable Mutex taskMutex;        // mutual exclusion for the tasks list
    vector<Task*> tasks;            // the list of tasks to execute

    Semaphore threadSemaphore;      // signaled when a thread starts executing
    mutable Mutex threadMutex;      // mutual exclusion for threads list
    vector<DefaultWorkerThread*> threads;  // the list of all threads

#ifdef ILMBASE_FORCE_CXX03
    bool stopping;                  // flag indicating whether to stop threads
    mutable Mutex stopMutex;        // mutual exclusion for stopping flag
#else
    std::atomic<bool> hasThreads;
    std::atomic<bool> stopping;
#endif

    inline bool stopped () const
    {
#ifdef ILMBASE_FORCE_CXX03
        Lock lock (stopMutex);
        return stopping;
#else
        return stopping.load( std::memory_order_relaxed );
#endif
    }

    inline void stop ()
    {
#ifdef ILMBASE_FORCE_CXX03
        Lock lock (stopMutex);
#endif
        stopping = true;
    }
};
//
// class WorkerThread
//
class DefaultWorkerThread: public Thread
{
  public:

    DefaultWorkerThread (DefaultWorkData* data);

    virtual void    run ();

  private:

    DefaultWorkData *  _data;
};


DefaultWorkerThread::DefaultWorkerThread (DefaultWorkData* data):
    _data (data)
{
    start();
}


void
DefaultWorkerThread::run ()
{
    //
    // Signal that the thread has started executing
    //

    _data->threadSemaphore.post();

    while (true)
    {
        //
        // Wait for a task to become available
        //

        _data->taskSemaphore.wait();

        {
            Lock taskLock (_data->taskMutex);

            //
            // If there is a task pending, pop the most recently added
            // task off the back of the list (the list is used LIFO)
            //

            if (!_data->tasks.empty())
            {
                Task* task = _data->tasks.back();
                _data->tasks.pop_back();
                taskLock.release();

                TaskGroup* taskGroup = task->group();
                task->execute();

                delete task;

                taskGroup->_data->removeTask ();
            }
            else if (_data->stopped())
            {
                break;
            }
        }
    }
}
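//
// Note on the loop above: every taskSemaphore.post() corresponds either
// to a task pushed by addTask() or to one of the wake-ups issued by
// finish() during shutdown. A woken worker that finds the task list
// empty therefore checks stopped() to decide whether to exit its loop.
//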
//
// class DefaultThreadPoolProvider
//
class DefaultThreadPoolProvider : public ThreadPoolProvider
{
  public:
    DefaultThreadPoolProvider(int count);
    virtual ~DefaultThreadPoolProvider();

    virtual int numThreads() const;
    virtual void setNumThreads(int count);
    virtual void addTask(Task *task);

    virtual void finish();

  private:
    DefaultWorkData _data;
};

DefaultThreadPoolProvider::DefaultThreadPoolProvider (int count)
{
    setNumThreads(count);
}

DefaultThreadPoolProvider::~DefaultThreadPoolProvider ()
{
    finish();
}

int
DefaultThreadPoolProvider::numThreads () const
{
    Lock lock (_data.threadMutex);
    return static_cast<int> (_data.threads.size());
}

void
DefaultThreadPoolProvider::setNumThreads (int count)
{
    //
    // Lock access to thread list and size
    //

    Lock lock (_data.threadMutex);

    size_t desired = static_cast<size_t>(count);
    if (desired > _data.threads.size())
    {
        //
        // Add more threads
        //

        while (_data.threads.size() < desired)
            _data.threads.push_back (new DefaultWorkerThread (&_data));
    }
    else if (desired < _data.threads.size())
    {
        //
        // Wait until all existing threads are finished processing,
        // then delete all threads.
        //

        finish ();

        //
        // Add in new threads
        //

        while (_data.threads.size() < desired)
            _data.threads.push_back (new DefaultWorkerThread (&_data));
    }

#ifndef ILMBASE_FORCE_CXX03
    _data.hasThreads = !(_data.threads.empty());
#endif
}
void
DefaultThreadPoolProvider::addTask (Task *task)
{
    //
    // Check whether the pool has any worker threads; if it has none,
    // the task is executed directly on the calling thread
    //
#ifdef ILMBASE_FORCE_CXX03
    bool doPush;
    {
        Lock lock (_data.threadMutex);
        doPush = !_data.threads.empty();
    }
#else
    bool doPush = _data.hasThreads.load( std::memory_order_relaxed );
#endif

    if ( doPush )
    {
        //
        // Get exclusive access to the tasks queue
        //

        {
            Lock taskLock (_data.taskMutex);

            //
            // Push the new task onto the back of the task list
            //

            _data.tasks.push_back (task);
        }

        //
        // Signal that we have a new task to process
        //

        _data.taskSemaphore.post ();
    }
    else
    {
        // this path shouldn't normally happen since we have the
        // NullThreadPoolProvider, but just in case...
        task->execute ();
        task->group()->_data->removeTask ();
        delete task;
    }
}
void
DefaultThreadPoolProvider::finish ()
{
    _data.stop();

    //
    // Signal enough times to allow all threads to stop.
    //
    // Wait until all threads have started their run functions.
    // If we do not wait before we destroy the threads then it's
    // possible that the threads have not yet called their run
    // functions.
    // If this happens then the run function will be called off
    // of an invalid object and we will crash, most likely with
    // an error like: "pure virtual method called"
    //

    size_t curT = _data.threads.size();
    for (size_t i = 0; i != curT; ++i)
    {
        _data.taskSemaphore.post();
        _data.threadSemaphore.wait();
    }

    //
    // Join all the threads
    //
    for (size_t i = 0; i != curT; ++i)
        delete _data.threads[i];

    Lock lock1 (_data.taskMutex);
#ifdef ILMBASE_FORCE_CXX03
    Lock lock2 (_data.stopMutex);
#endif
    _data.threads.clear();
    _data.tasks.clear();

    _data.stopping = false;
}
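//
// Shutdown handshake, spelled out: for N worker threads, finish() above
// posts taskSemaphore N times, and each worker wakes, sees an empty task
// list with stopping set, and leaves its loop. The matching N calls to
// threadSemaphore.wait() consume the posts each worker made on entry to
// run(), which guarantees that every thread has started before its
// Thread object is deleted (and thereby joined) in the second loop.
//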
class NullThreadPoolProvider : public ThreadPoolProvider
{
    virtual ~NullThreadPoolProvider() {}
    virtual int numThreads () const { return 0; }
    virtual void setNumThreads (int count)
    {
    }
    virtual void addTask (Task *t)
    {
        t->execute ();
        t->group()->_data->removeTask ();
        delete t;
    }
    virtual void finish () {}
};

} //namespace
//
// struct TaskGroup::Data
//

TaskGroup::Data::Data (): numPending (0), isEmpty (1)
{
    // empty
}


TaskGroup::Data::~Data ()
{
    //
    // A TaskGroup acts like an "inverted" semaphore: if the count
    // is above 0 then waiting on the taskgroup will block.  This
    // destructor waits until the taskgroup is empty before returning.
    //

    isEmpty.wait ();

#ifdef ENABLE_SEM_DTOR_WORKAROUND
    // Update: the bug was fixed in glibc 2.21, so the ifdef at the top
    // of this file restricts the workaround to older versions.
    //
    // Alas, given the current bug in glibc we need a secondary
    // synchronisation primitive here to account for the fact that
    // destructing the isEmpty Semaphore in this thread can cause
    // an error for a separate thread that is issuing the post() call.
    // We are entitled to destruct the semaphore at this point, however,
    // that post() call attempts to access data out of the associated
    // memory *after* it has woken the waiting threads, including this one,
    // potentially leading to invalid memory reads.
    // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674

    Lock lock (dtorMutex);
#endif
}
void
TaskGroup::Data::addTask ()
{
    //
    // In c++11, we use an atomic to protect numPending and avoid the
    // extra lock; for c++98, to support the custom thread pool
    // provider API, we take the lock here
    //
#ifdef ILMBASE_FORCE_CXX03
    Lock lock (dtorMutex);
#endif
    if (numPending++ == 0)
        isEmpty.wait ();
}


void
TaskGroup::Data::removeTask ()
{
    // Alas, given the current bug in glibc we need a secondary
    // synchronisation primitive here to account for the fact that
    // destructing the isEmpty Semaphore in a separate thread can
    // cause an error: when post() is issued here, the current glibc
    // implementation attempts to access memory *after* it has woken
    // waiting threads.
    // Since other threads are entitled to delete the semaphore, the
    // access to that memory location can be invalid.
    // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674
    //
    // Update: the bug has been fixed in glibc 2.21, and the ifdef at
    // the top of this file disables the workaround for newer versions.
    // However, we've also changed the API to enable a custom override
    // of the thread pool, and to provide safe access to numPending we
    // need the lock anyway in the c++98 code path.

#ifdef ILMBASE_FORCE_CXX03
    Lock lock (dtorMutex);
    if (--numPending == 0)
        isEmpty.post ();
#else
    if (--numPending == 0)
    {
#ifdef ENABLE_SEM_DTOR_WORKAROUND
        Lock lock (dtorMutex);
#endif
        isEmpty.post ();
    }
#endif
}
//
// struct ThreadPool::Data
//

ThreadPool::Data::Data ():
    provUsers (0), provider (NULL)
#ifdef ILMBASE_FORCE_CXX03
    , oldprovider (NULL)
#endif
{
    // empty
}


ThreadPool::Data::~Data()
{
#ifdef ILMBASE_FORCE_CXX03
    provider->finish();
    delete provider;
#else
    ThreadPoolProvider *p = provider.load( std::memory_order_relaxed );
    p->finish();
    delete p;
#endif
}


inline ThreadPool::Data::SafeProvider
ThreadPool::Data::getProvider ()
{
#ifdef ILMBASE_FORCE_CXX03
    Lock provLock( provMutex );
    ++provUsers;
    return SafeProvider( this, provider );
#else
    // bump the usage count before fetching the pointer, so setProvider
    // can tell whether anyone might still be using the old provider
    provUsers.fetch_add( 1, std::memory_order_relaxed );
    return SafeProvider( this, provider.load( std::memory_order_relaxed ) );
#endif
}


inline void
ThreadPool::Data::coalesceProviderUse ()
{
#ifdef ILMBASE_FORCE_CXX03
    Lock provLock( provMutex );
    --provUsers;
    if ( provUsers == 0 )
    {
        if ( oldprovider )
            provSem.post();
    }
#else
    // fetch_sub returns the previous value, so a result of 1 means the
    // count may now be 0. Nothing further is needed in the c++11 path:
    // setProvider spins on provUsers rather than waiting on a semaphore.
    provUsers.fetch_sub( 1, std::memory_order_relaxed );
#endif
}


inline void
ThreadPool::Data::bumpProviderUse ()
{
#ifdef ILMBASE_FORCE_CXX03
    Lock lock (provMutex);
    ++provUsers;
#else
    provUsers.fetch_add( 1, std::memory_order_relaxed );
#endif
}
inline void
ThreadPool::Data::setProvider (ThreadPoolProvider *p)
{
#ifdef ILMBASE_FORCE_CXX03
    Lock provLock( provMutex );

    if ( oldprovider )
        throw IEX_INTERNAL_NAMESPACE::ArgExc ("Attempt to set the thread pool provider while"
                                              " another thread is currently setting the provider.");

    oldprovider = provider;
    provider = p;

    while ( provUsers > 0 )
    {
        provLock.release();
        provSem.wait();
        provLock.acquire();
    }
    if ( oldprovider )
    {
        oldprovider->finish();
        delete oldprovider;
        oldprovider = NULL;
    }
#else
    ThreadPoolProvider *old = provider.load( std::memory_order_relaxed );

    // Swap in the new provider. compare_exchange_weak may fail
    // spuriously; on failure it reloads the current value into old,
    // so retry until the exchange succeeds.
    while ( ! provider.compare_exchange_weak( old, p, std::memory_order_release, std::memory_order_relaxed ) )
        ;

    // wait for any other users to finish prior to deleting. given
    // that uses are mostly to query the thread count or push a
    // task to the queue (so fast), just spin...
    //
    // (well, and normally, people don't replace the provider mid-stream
    // anyway, so this will be 0 99.999% of the time, but just to be safe)
    //
    while ( provUsers.load( std::memory_order_relaxed ) > 0 )
        std::this_thread::yield();

    if ( old )
    {
        old->finish();
        delete old;
    }

    // NB: an atomic shared_ptr would be safer and would make the
    // provUsers counter unnecessary, since the shared_ptr keeps the
    // reference count for us. However, gcc 4.8/4.9, which many people
    // are still using, lack the atomic shared_ptr functions... once
    // those compilers are fully deprecated, switch to the code below,
    // change provider to a std::shared_ptr and remove provUsers:
    //
    // std::shared_ptr<ThreadPoolProvider> newp( p );
    // std::shared_ptr<ThreadPoolProvider> curp = std::atomic_load_explicit( &provider, std::memory_order_relaxed );
    // while ( ! std::atomic_compare_exchange_weak_explicit( &provider, &curp, newp, std::memory_order_release, std::memory_order_relaxed ) )
    //     ;
    // if ( curp )
    //     curp->finish();
#endif
}
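//
// Provider swap protocol, summarized (an editorial sketch of the c++11
// path above, not additional behavior): a caller replacing the provider
// goes through three phases:
//
//     1) atomically exchange the provider pointer, so any subsequent
//        getProvider() call immediately sees the replacement;
//     2) spin until provUsers drains to 0, i.e. until every SafeProvider
//        handed out before the exchange has been destroyed;
//     3) finish() and delete the old provider, which no thread can
//        still reach.
//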
//
// class Task
//

Task::Task (TaskGroup* g): _group(g)
{
    if ( g )
        g->_data->addTask ();
}


Task::~Task()
{
    // empty
}


TaskGroup*
Task::group ()
{
    return _group;
}


TaskGroup::TaskGroup ():
    _data (new Data())
{
    // empty
}


TaskGroup::~TaskGroup ()
{
    delete _data;
}


void
TaskGroup::finishOneTask ()
{
    _data->removeTask ();
}


//
// class ThreadPoolProvider
//

ThreadPoolProvider::ThreadPoolProvider()
{
}


ThreadPoolProvider::~ThreadPoolProvider()
{
}
//
// class ThreadPool
//

ThreadPool::ThreadPool (unsigned nthreads):
    _data (new Data)
{
    if ( nthreads == 0 )
        _data->setProvider( new NullThreadPoolProvider );
    else
        _data->setProvider( new DefaultThreadPoolProvider( int(nthreads) ) );
}


ThreadPool::~ThreadPool ()
{
    delete _data;
}


int
ThreadPool::numThreads () const
{
    return _data->getProvider ()->numThreads ();
}


void
ThreadPool::setNumThreads (int count)
{
    if (count < 0)
        throw IEX_INTERNAL_NAMESPACE::ArgExc ("Attempt to set the number of threads "
                                              "in a thread pool to a negative value.");

    bool doReset = false;
    {
        Data::SafeProvider sp = _data->getProvider ();
        int curT = sp->numThreads ();
        if ( curT == count )
            return;

        if ( curT == 0 )
        {
            NullThreadPoolProvider *npp = dynamic_cast<NullThreadPoolProvider *>( sp.get() );
            if ( npp )
                doReset = true;
        }
        else if ( count == 0 )
        {
            DefaultThreadPoolProvider *dpp = dynamic_cast<DefaultThreadPoolProvider *>( sp.get() );
            if ( dpp )
                doReset = true;
        }
        if ( ! doReset )
            sp->setNumThreads( count );
    }

    if ( doReset )
    {
        if ( count == 0 )
            _data->setProvider( new NullThreadPoolProvider );
        else
            _data->setProvider( new DefaultThreadPoolProvider( count ) );
    }
}


void
ThreadPool::setThreadProvider (ThreadPoolProvider *provider)
{
    _data->setProvider (provider);
}


void
ThreadPool::addTask (Task* task)
{
    _data->getProvider ()->addTask (task);
}


ThreadPool&
ThreadPool::globalThreadPool ()
{
    //
    // The global thread pool
    //

    static ThreadPool gThreadPool (0);

    return gThreadPool;
}


void
ThreadPool::addGlobalTask (Task* task)
{
    globalThreadPool().addTask (task);
}
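//
// Usage sketch (illustrative only; MyTask and squares are hypothetical
// names, not part of this library). A client derives from Task,
// implements execute(), hands ownership of each heap-allocated task to
// the pool, and lets the TaskGroup destructor wait for completion:
//
//     class MyTask : public Task
//     {
//       public:
//         MyTask (TaskGroup *g, int i, std::vector<int> &out):
//             Task (g), _i (i), _out (out) {}
//         virtual void execute () { _out[_i] = _i * _i; }
//       private:
//         int               _i;
//         std::vector<int> &_out;
//     };
//
//     void
//     squares (std::vector<int> &out)
//     {
//         TaskGroup group;
//         for (size_t i = 0; i < out.size (); ++i)
//             ThreadPool::addGlobalTask (new MyTask (&group, int (i), out));
//
//         // leaving this scope blocks in ~TaskGroup until all tasks
//         // have executed; the pool deletes each task after running it
//     }
//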
ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_EXIT