LibreOffice Module comphelper (master)  1
threadpool.cxx
Go to the documentation of this file.
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 /*
3  * This file is part of the LibreOffice project.
4  *
5  * This Source Code Form is subject to the terms of the Mozilla Public
6  * License, v. 2.0. If a copy of the MPL was not distributed with this
7  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8  */
9 
11 
12 #include <com/sun/star/uno/Exception.hpp>
13 #include <config_options.h>
14 #include <sal/config.h>
15 #include <sal/log.hxx>
16 #include <salhelper/thread.hxx>
17 #include <algorithm>
18 #include <memory>
19 #include <thread>
20 #include <chrono>
22 
23 #if defined HAVE_VALGRIND_HEADERS
24 #include <valgrind/memcheck.h>
25 #endif
26 
27 #if defined(_WIN32)
28 #define WIN32_LEAN_AND_MEAN
29 #include <windows.h>
30 #endif
31 
32 namespace comphelper {
33 
35 #if defined DBG_UTIL && (defined LINUX || defined _WIN32)
36 static thread_local bool gbIsWorkerThread;
37 #endif
38 
39 // used to group thread-tasks for waiting in waitTillDone()
41 {
43  sal_Int32 mnTasksWorking;
44  std::condition_variable maTasksComplete;
45 
46 public:
47  ThreadTaskTag();
48  bool isDone();
49  void waitUntilDone();
50  void onTaskWorkerDone();
51  void onTaskPushed();
52 };
53 
54 
56 {
58 public:
59 
60  explicit ThreadWorker( ThreadPool *pPool ) :
61  salhelper::Thread("thread-pool"),
62  mpPool( pPool )
63  {
64  }
65 
66  virtual void execute() override
67  {
68 #if defined DBG_UTIL && (defined LINUX || defined _WIN32)
69  gbIsWorkerThread = true;
70 #endif
71  std::unique_lock< std::mutex > aGuard( mpPool->maMutex );
72 
73  while( !mpPool->mbTerminate )
74  {
75  std::unique_ptr<ThreadTask> pTask = mpPool->popWorkLocked( aGuard, true );
76  if( pTask )
77  {
78  std::shared_ptr<ThreadTaskTag> pTag(pTask->mpTag);
79  mpPool->incBusyWorker();
80  aGuard.unlock();
81 
82  pTask->exec();
83  pTask.reset();
84 
85  aGuard.lock();
86  mpPool->decBusyWorker();
87  pTag->onTaskWorkerDone();
88  }
89  }
90  }
91 };
92 
93 ThreadPool::ThreadPool(sal_Int32 nWorkers)
94  : mbTerminate(true)
95  , mnMaxWorkers(nWorkers)
96  , mnBusyWorkers(0)
97 {
98 }
99 
101 {
102  // note: calling shutdown from global variable dtor blocks forever on Win7
103  // note2: there isn't enough MSVCRT left on exit to call assert() properly
104  // so these asserts just print something to stderr but exit status is
105  // still 0, but hopefully they will be more helpful on non-WNT platforms
106  assert(mbTerminate);
107  assert(maTasks.empty());
108  assert(mnBusyWorkers == 0);
109 }
110 
111 namespace {
112 
113 std::shared_ptr< ThreadPool >& GetStaticThreadPool()
114 {
115  static std::shared_ptr< ThreadPool > POOL =
116  []()
117  {
118  const sal_Int32 nThreads = ThreadPool::getPreferredConcurrency();
119  return std::make_shared< ThreadPool >( nThreads );
120  }();
121  return POOL;
122 }
123 
124 }
125 
127 {
128  return *GetStaticThreadPool();
129 }
130 
132 {
133  static sal_Int32 ThreadCount = []()
134  {
135  const sal_Int32 nHardThreads = std::max(std::thread::hardware_concurrency(), 1U);
136  sal_Int32 nThreads = nHardThreads;
137  const char *pEnv = getenv("MAX_CONCURRENCY");
138  if (pEnv != nullptr)
139  {
140  // Override with user/admin preference.
141  nThreads = rtl_str_toInt32(pEnv, 10);
142  }
143 
144  nThreads = std::min(nHardThreads, nThreads);
145  return std::max<sal_Int32>(nThreads, 1);
146  }();
147 
148  return ThreadCount;
149 }
150 
151 // Used to order shutdown, and to ensure there are no lingering
152 // threads after LibreOfficeKit pre-init.
154 {
155 // if (mbTerminate)
156 // return;
157 
158  std::unique_lock< std::mutex > aGuard( maMutex );
159  shutdownLocked(aGuard);
160 }
161 
162 void ThreadPool::shutdownLocked(std::unique_lock<std::mutex>& aGuard)
163 {
164  if( maWorkers.empty() )
165  { // no threads at all -> execute the work in-line
166  std::unique_ptr<ThreadTask> pTask;
167  while ( ( pTask = popWorkLocked(aGuard, false) ) )
168  {
169  std::shared_ptr<ThreadTaskTag> pTag(pTask->mpTag);
170  pTask->exec();
171  pTag->onTaskWorkerDone();
172  }
173  }
174  else
175  {
176  while( !maTasks.empty() )
177  {
178  maTasksChanged.wait( aGuard );
179  // In the (unlikely but possible?) case pushTask() gets called meanwhile,
180  // its notify_one() call is meant to wake a up a thread and process the task.
181  // But if this code gets woken up instead, it could lead to a deadlock.
182  // Pass on the notification.
183  maTasksChanged.notify_one();
184  }
185  }
186  assert( maTasks.empty() );
187 
188  // coverity[missing_lock] - on purpose
189  mbTerminate = true;
190 
191  maTasksChanged.notify_all();
192 
193  decltype(maWorkers) aWorkers;
194  std::swap(maWorkers, aWorkers);
195  aGuard.unlock();
196 
197  while (!aWorkers.empty())
198  {
199  rtl::Reference<ThreadWorker> xWorker = aWorkers.back();
200  aWorkers.pop_back();
201  assert(std::find(aWorkers.begin(), aWorkers.end(), xWorker)
202  == aWorkers.end());
203  {
204  xWorker->join();
205  xWorker.clear();
206  }
207  }
208 }
209 
210 void ThreadPool::pushTask( std::unique_ptr<ThreadTask> pTask )
211 {
212  std::scoped_lock< std::mutex > aGuard( maMutex );
213 
214  mbTerminate = false;
215 
216  // Worked on tasks are already removed from maTasks, so include the count of busy workers.
217  if (maWorkers.size() < mnMaxWorkers && maWorkers.size() <= maTasks.size() + mnBusyWorkers)
218  {
219  maWorkers.push_back( new ThreadWorker( this ) );
220  maWorkers.back()->launch();
221  }
222 
223  pTask->mpTag->onTaskPushed();
224  maTasks.insert( maTasks.begin(), std::move(pTask) );
225 
226  maTasksChanged.notify_one();
227 }
228 
229 std::unique_ptr<ThreadTask> ThreadPool::popWorkLocked( std::unique_lock< std::mutex > & rGuard, bool bWait )
230 {
231  do
232  {
233  if( !maTasks.empty() )
234  {
235  std::unique_ptr<ThreadTask> pTask = std::move(maTasks.back());
236  maTasks.pop_back();
237  return pTask;
238  }
239  else if (!bWait || mbTerminate)
240  return nullptr;
241 
242  maTasksChanged.wait( rGuard );
243 
244  } while (!mbTerminate);
245 
246  return nullptr;
247 }
248 
250 {
251  ++mnBusyWorkers;
252 }
253 
255 {
256  assert(mnBusyWorkers >= 1);
257  --mnBusyWorkers;
258 }
259 
260 void ThreadPool::waitUntilDone(const std::shared_ptr<ThreadTaskTag>& rTag, bool bJoin)
261 {
262 #if defined DBG_UTIL && (defined LINUX || defined _WIN32)
263  assert(!gbIsWorkerThread && "cannot wait for tasks from inside a task");
264 #endif
265  {
266  std::unique_lock< std::mutex > aGuard( maMutex );
267 
268  if( maWorkers.empty() )
269  { // no threads at all -> execute the work in-line
270  while (!rTag->isDone())
271  {
272  std::unique_ptr<ThreadTask> pTask = popWorkLocked(aGuard, false);
273  if (!pTask)
274  break;
275  std::shared_ptr<ThreadTaskTag> pTag(pTask->mpTag);
276  pTask->exec();
277  pTag->onTaskWorkerDone();
278  }
279  }
280  }
281 
282  rTag->waitUntilDone();
283 
284  if (bJoin)
286 }
287 
289 {
290  std::unique_lock< std::mutex > aGuard( maMutex );
291  if (isIdle()) // check if there are still tasks from another tag
292  {
293  shutdownLocked(aGuard);
294  }
295 }
296 
297 std::shared_ptr<ThreadTaskTag> ThreadPool::createThreadTaskTag()
298 {
299  return std::make_shared<ThreadTaskTag>();
300 }
301 
302 bool ThreadPool::isTaskTagDone(const std::shared_ptr<ThreadTaskTag>& pTag)
303 {
304  return pTag->isDone();
305 }
306 
307 ThreadTask::ThreadTask(const std::shared_ptr<ThreadTaskTag>& pTag)
308  : mpTag(pTag)
309 {
310 }
311 
313 {
314  try {
315  doWork();
316  }
317  catch (const std::exception &e)
318  {
319  SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e.what());
320  }
321  catch (const css::uno::Exception &e)
322  {
323  SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e);
324  }
325  catch (...)
326  {
327  SAL_WARN("comphelper", "unknown exception in thread worker while calling doWork()");
328  }
329 }
330 
331 ThreadTaskTag::ThreadTaskTag() : mnTasksWorking(0)
332 {
333 }
334 
336 {
337  std::scoped_lock< std::mutex > aGuard( maMutex );
338  mnTasksWorking++;
339  assert( mnTasksWorking < 65536 ); // sanity checking
340 }
341 
343 {
344  std::scoped_lock< std::mutex > aGuard( maMutex );
345  mnTasksWorking--;
346  assert(mnTasksWorking >= 0);
347  if (mnTasksWorking == 0)
348  maTasksComplete.notify_all();
349 }
350 
352 {
353  std::scoped_lock< std::mutex > aGuard( maMutex );
354  return mnTasksWorking == 0;
355 }
356 
358 {
359  std::unique_lock< std::mutex > aGuard( maMutex );
360  while( mnTasksWorking > 0 )
361  {
362 #if defined DBG_UTIL && !defined NDEBUG
363  // 10 minute timeout in debug mode, unless the code is built with
364  // sanitizers or debugged in valgrind or gdb, in which case the threads
365  // should not time out in the middle of a debugging session
366  int maxTimeout = 10 * 60;
367 #if !ENABLE_RUNTIME_OPTIMIZATIONS
368  maxTimeout = 30 * 60;
369 #endif
370 #if defined HAVE_VALGRIND_HEADERS
371  if( RUNNING_ON_VALGRIND )
372  maxTimeout = 30 * 60;
373 #endif
374  if( isDebuggerAttached())
375  maxTimeout = 300 * 60;
376  std::cv_status result = maTasksComplete.wait_for(
377  aGuard, std::chrono::seconds( maxTimeout ));
378  assert(result != std::cv_status::timeout);
379 #else
380  // 10 minute timeout in production so the app eventually throws some kind of error
381  if (maTasksComplete.wait_for(
382  aGuard, std::chrono::seconds( 10 * 60 )) == std::cv_status::timeout)
383  throw std::runtime_error("timeout waiting for threadpool tasks");
384 #endif
385  }
386 }
387 
388 } // namespace comphelper
389 
390 /* vim:set shiftwidth=4 softtabstop=4 expandtab: */
std::vector< std::unique_ptr< ThreadTask > > maTasks
Definition: threadpool.hxx:105
A very basic thread-safe thread pool implementation.
Definition: threadpool.hxx:43
static sal_Int32 getPreferredConcurrency()
returns a configurable max-concurrency limit to avoid spawning an unnecessarily large number of threa...
Definition: threadpool.cxx:131
virtual void doWork()=0
override to get your task performed by the pool
static thread_local bool gbIsWorkerThread
prevent waiting for a task from inside a task
Definition: threadpool.cxx:36
void waitUntilDone(const std::shared_ptr< ThreadTaskTag > &, bool bJoin=true)
Wait until all queued tasks associated with the tag are completed.
Definition: threadpool.cxx:260
std::condition_variable maTasksComplete
Definition: threadpool.cxx:44
Thread(char const *name)
std::mutex maMutex
signalled when all in-progress tasks are complete
Definition: threadpool.hxx:100
ThreadPool(sal_Int32 nWorkers)
Definition: threadpool.cxx:93
static ThreadPool & getSharedOptimalPool()
returns a pointer to a shared pool with optimal thread count for the CPU
Definition: threadpool.cxx:126
ThreadTask(const std::shared_ptr< ThreadTaskTag > &pTag)
Definition: threadpool.cxx:307
bool isIdle() const
return true if there are no queued or worked-on tasks
Definition: threadpool.hxx:75
std::condition_variable maTasksChanged
Definition: threadpool.hxx:101
void pushTask(std::unique_ptr< ThreadTask > pTask)
push a new task onto the work queue
Definition: threadpool.cxx:210
static bool isTaskTagDone(const std::shared_ptr< ThreadTaskTag > &)
Definition: threadpool.cxx:302
friend class ThreadWorker
Definition: threadpool.hxx:87
virtual void execute() override
Definition: threadpool.cxx:66
std::vector< rtl::Reference< ThreadWorker > > maWorkers
Definition: threadpool.hxx:106
static PropertyMapEntry const * find(const rtl::Reference< PropertySetInfo > &mxInfo, const OUString &aName) noexcept
void exec()
execute this task
Definition: threadpool.cxx:312
std::mutex mutex
Definition: random.cxx:41
std::unique_ptr< ThreadTask > popWorkLocked(std::unique_lock< std::mutex > &rGuard, bool bWait)
Pop a work task.
Definition: threadpool.cxx:229
void shutdown()
wait until all work is completed, then join all threads
Definition: threadpool.cxx:153
static std::shared_ptr< ThreadTaskTag > createThreadTaskTag()
Definition: threadpool.cxx:297
std::size_t mnBusyWorkers
Definition: threadpool.hxx:104
Any result
#define SAL_WARN(area, stream)
void shutdownLocked(std::unique_lock< std::mutex > &)
Definition: threadpool.cxx:162
std::size_t const mnMaxWorkers
Definition: threadpool.hxx:103
bool isDebuggerAttached()
Returns true if the process is running with a debugger attached.
void joinThreadsIfIdle()
join all threads if there are no tasks presently.
Definition: threadpool.cxx:288