LibreOffice Module comphelper (master)  1
threadpool.cxx
Go to the documentation of this file.
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 /*
3  * This file is part of the LibreOffice project.
4  *
5  * This Source Code Form is subject to the terms of the Mozilla Public
6  * License, v. 2.0. If a copy of the MPL was not distributed with this
7  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8  */
9 
11 
12 #include <com/sun/star/uno/Exception.hpp>
13 #include <config_options.h>
14 #include <sal/config.h>
15 #include <sal/log.hxx>
16 #include <rtl/instance.hxx>
17 #include <salhelper/thread.hxx>
18 #include <algorithm>
19 #include <memory>
20 #include <thread>
21 #include <chrono>
23 
24 #if defined HAVE_VALGRIND_HEADERS
25 #include <valgrind/memcheck.h>
26 #endif
27 
28 #if defined(_WIN32)
29 #define WIN32_LEAN_AND_MEAN
30 #include <windows.h>
31 #endif
32 
33 namespace comphelper {
34 
// Per-thread debug flag: set to true at the start of ThreadWorker::execute()
// and asserted to be false in ThreadPool::waitUntilDone(), so that waiting for
// tasks from inside a task is caught. Only compiled for DBG_UTIL builds on
// Linux and Windows.
36 #if defined DBG_UTIL && (defined LINUX || defined _WIN32)
37 static thread_local bool gbIsWorkerThread;
38 #endif
39 
40 // used to group thread-tasks for waiting in waitUntilDone()
// NOTE(review): the class header line and at least one member declaration are
// not visible in this listing (the methods below lock a `maMutex`) -- presumably
// this is the body of class ThreadTaskTag; confirm against the real file.
42 {
// Number of tasks carrying this tag that were pushed but have not finished:
// incremented in onTaskPushed(), decremented in onTaskWorkerDone().
44  sal_Int32 mnTasksWorking;
// Notified (notify_all) when mnTasksWorking drops to zero.
45  std::condition_variable maTasksComplete;
46 
47 public:
48  ThreadTaskTag();
// true when no task with this tag is outstanding
49  bool isDone();
// block until no task with this tag is outstanding
50  void waitUntilDone();
// bookkeeping hook called after a task with this tag has been executed
51  void onTaskWorkerDone();
// bookkeeping hook called when a task with this tag is queued
52  void onTaskPushed();
53 };
54 
55 
// Worker thread of a ThreadPool: repeatedly pops tasks from the owning pool
// and executes them until the pool's mbTerminate flag is set.
// NOTE(review): the class header line and the declaration of mpPool are not
// visible in this listing; the base class is salhelper::Thread judging by the
// constructor's initializer list.
57 {
59 public:
60 
// Names the thread "thread-pool" and remembers the owning pool.
61  explicit ThreadWorker( ThreadPool *pPool ) :
62  salhelper::Thread("thread-pool"),
63  mpPool( pPool )
64  {
65  }
66 
// Thread body.
67  virtual void execute() override
68  {
69 #if defined DBG_UTIL && (defined LINUX || defined _WIN32)
// mark this thread so ThreadPool::waitUntilDone() can assert it is not called from a task
70  gbIsWorkerThread = true;
71 #endif
// the pool mutex is held at all times except while a task is running
72  std::unique_lock< std::mutex > aGuard( mpPool->maMutex );
73 
74  while( !mpPool->mbTerminate )
75  {
// blocks (bWait == true) until a task is available or the pool terminates
76  std::unique_ptr<ThreadTask> pTask = mpPool->popWorkLocked( aGuard, true );
77  if( pTask )
78  {
// keep the tag alive beyond the task, which is destroyed before the tag is notified
79  std::shared_ptr<ThreadTaskTag> pTag(pTask->mpTag);
80  mpPool->incBusyWorker();
81  aGuard.unlock();
82 
// run and destroy the task without holding the pool mutex
83  pTask->exec();
84  pTask.reset();
85 
86  aGuard.lock();
87  mpPool->decBusyWorker();
88  pTag->onTaskWorkerDone();
89  }
90  }
91  }
92 };
93 
94 ThreadPool::ThreadPool(sal_Int32 nWorkers)
95  : mbTerminate(true)
96  , mnMaxWorkers(nWorkers)
97  , mnBusyWorkers(0)
98 {
99 }
100 
// NOTE(review): the signature line (and one statement after the notes below)
// is not visible in this listing -- presumably this is ThreadPool::~ThreadPool();
// confirm against the real file.
// The visible part only checks invariants: by now the task queue must be empty
// and no worker may be busy. It does not shut the pool down itself.
102 {
103  // note: calling shutdown from global variable dtor blocks forever on Win7
104  // note2: there isn't enough MSVCRT left on exit to call assert() properly
105  // so these asserts just print something to stderr but exit status is
106  // still 0, but hopefully they will be more helpful on non-WNT platforms
108  assert(maTasks.empty());
109  assert(mnBusyWorkers == 0);
110 }
111 
112 namespace {
113 
114 struct ThreadPoolStatic : public rtl::StaticWithInit< std::shared_ptr< ThreadPool >,
115  ThreadPoolStatic >
116 {
117  std::shared_ptr< ThreadPool > operator () () {
118  const sal_Int32 nThreads = ThreadPool::getPreferredConcurrency();
119  return std::make_shared< ThreadPool >( nThreads );
120  };
121 };
122 
123 }
124 
// NOTE(review): signature line not visible in this listing -- presumably
// ThreadPool& ThreadPool::getSharedOptimalPool().
// Returns a reference to the pool held by ThreadPoolStatic, which is sized by
// getPreferredConcurrency().
126 {
127  return *ThreadPoolStatic::get();
128 }
129 
// NOTE(review): signature line not visible in this listing -- presumably
// sal_Int32 ThreadPool::getPreferredConcurrency().
// Returns the preferred number of worker threads. The value is computed once,
// on first call, by the lambda initialising the function-local static.
131 {
132  static sal_Int32 ThreadCount = [&]()
133  {
// hardware_concurrency() may return 0 when unknown; treat that as one thread
134  const sal_Int32 nHardThreads = std::max(std::thread::hardware_concurrency(), 1U);
135  sal_Int32 nThreads = nHardThreads;
136  const char *pEnv = getenv("MAX_CONCURRENCY");
137  if (pEnv != nullptr)
138  {
139  // Override with user/admin preference.
140  nThreads = rtl_str_toInt32(pEnv, 10);
141  }
142 
// the environment override can only lower the count, never raise it above the hardware value
143  nThreads = std::min(nHardThreads, nThreads);
// never report fewer than one thread, whatever the override parsed to
144  return std::max<sal_Int32>(nThreads, 1);
145  }();
146 
147  return ThreadCount;
148 }
149 
150 // Used to order shutdown, and to ensure there are no lingering
151 // threads after LibreOfficeKit pre-init.
// NOTE(review): signature line not visible in this listing -- presumably
// void ThreadPool::shutdown().
// Takes the pool mutex and delegates to shutdownLocked(). The early return on
// mbTerminate below is deliberately commented out in the original.
153 {
154 // if (mbTerminate)
155 // return;
156 
157  std::unique_lock< std::mutex > aGuard( maMutex );
158  shutdownLocked(aGuard);
159 }
160 
// Drain the task queue, flag termination and join all worker threads.
//
// aGuard is passed to maTasksChanged.wait() and unlocked below, so the caller
// must hand it in locked (both visible callers lock maMutex first). On return
// aGuard is unlocked.
161 void ThreadPool::shutdownLocked(std::unique_lock<std::mutex>& aGuard)
162 {
163  if( maWorkers.empty() )
164  { // no threads at all -> execute the work in-line
165  std::unique_ptr<ThreadTask> pTask;
166  while ( ( pTask = popWorkLocked(aGuard, false) ) )
167  {
// keep the tag alive so it can be notified after the task has run
168  std::shared_ptr<ThreadTaskTag> pTag(pTask->mpTag);
169  pTask->exec();
170  pTag->onTaskWorkerDone();
171  }
172  }
173  else
174  {
// workers exist: let them empty the queue, sleeping on the condition variable meanwhile
175  while( !maTasks.empty() )
176  {
177  maTasksChanged.wait( aGuard );
178  // In the (unlikely but possible?) case pushTask() gets called meanwhile,
179  // its notify_one() call is meant to wake up a thread and process the task.
180  // But if this code gets woken up instead, it could lead to a deadlock.
181  // Pass on the notification.
182  maTasksChanged.notify_one();
183  }
184  }
185  assert( maTasks.empty() );
186 
187  // coverity[missing_lock] - on purpose
188  mbTerminate = true;
189 
// wake every worker blocked in popWorkLocked() so it can observe mbTerminate
190  maTasksChanged.notify_all();
191 
// take the worker list out of the pool while still locked, then release the
// mutex so the workers can re-acquire it and leave their loop before being joined
192  decltype(maWorkers) aWorkers;
193  std::swap(maWorkers, aWorkers);
194  aGuard.unlock();
195 
196  while (!aWorkers.empty())
197  {
198  rtl::Reference<ThreadWorker> xWorker = aWorkers.back();
199  aWorkers.pop_back();
// each worker must appear in the list only once
200  assert(std::find(aWorkers.begin(), aWorkers.end(), xWorker)
201  == aWorkers.end());
202  {
203  xWorker->join();
204  xWorker.clear();
205  }
206  }
207 }
208 
209 void ThreadPool::pushTask( std::unique_ptr<ThreadTask> pTask )
210 {
211  std::scoped_lock< std::mutex > aGuard( maMutex );
212 
213  mbTerminate = false;
214 
215  // Worked on tasks are already removed from maTasks, so include the count of busy workers.
216  if (maWorkers.size() < mnMaxWorkers && maWorkers.size() <= maTasks.size() + mnBusyWorkers)
217  {
218  maWorkers.push_back( new ThreadWorker( this ) );
219  maWorkers.back()->launch();
220  }
221 
222  pTask->mpTag->onTaskPushed();
223  maTasks.insert( maTasks.begin(), std::move(pTask) );
224 
225  maTasksChanged.notify_one();
226 }
227 
228 std::unique_ptr<ThreadTask> ThreadPool::popWorkLocked( std::unique_lock< std::mutex > & rGuard, bool bWait )
229 {
230  do
231  {
232  if( !maTasks.empty() )
233  {
234  std::unique_ptr<ThreadTask> pTask = std::move(maTasks.back());
235  maTasks.pop_back();
236  return pTask;
237  }
238  else if (!bWait || mbTerminate)
239  return nullptr;
240 
241  maTasksChanged.wait( rGuard );
242 
243  } while (!mbTerminate);
244 
245  return nullptr;
246 }
247 
// NOTE(review): signature line not visible in this listing -- presumably
// void ThreadPool::incBusyWorker(), which ThreadWorker::execute() calls
// before running a task.
// Counts one more worker as busy. No locking is done here; the visible call
// site holds the pool mutex.
249 {
250  ++mnBusyWorkers;
251 }
252 
// NOTE(review): signature line not visible in this listing -- presumably
// void ThreadPool::decBusyWorker(), which ThreadWorker::execute() calls
// after a task has finished.
// Counts one worker less as busy; asserts the counter does not underflow.
254 {
255  assert(mnBusyWorkers >= 1);
256  --mnBusyWorkers;
257 }
258 
259 void ThreadPool::waitUntilDone(const std::shared_ptr<ThreadTaskTag>& rTag, bool bJoin)
260 {
261 #if defined DBG_UTIL && (defined LINUX || defined _WIN32)
262  assert(!gbIsWorkerThread && "cannot wait for tasks from inside a task");
263 #endif
264  {
265  std::unique_lock< std::mutex > aGuard( maMutex );
266 
267  if( maWorkers.empty() )
268  { // no threads at all -> execute the work in-line
269  while (!rTag->isDone())
270  {
271  std::unique_ptr<ThreadTask> pTask = popWorkLocked(aGuard, false);
272  if (!pTask)
273  break;
274  std::shared_ptr<ThreadTaskTag> pTag(pTask->mpTag);
275  pTask->exec();
276  pTag->onTaskWorkerDone();
277  }
278  }
279  }
280 
281  rTag->waitUntilDone();
282 
283  if (bJoin)
285 }
286 
// NOTE(review): signature line not visible in this listing -- presumably
// void ThreadPool::joinThreadsIfIdle().
// Takes the pool mutex and, only if isIdle() reports true, runs
// shutdownLocked() to join the worker threads.
288 {
289  std::unique_lock< std::mutex > aGuard( maMutex );
290  if (isIdle()) // check if there are still tasks from another tag
291  {
292  shutdownLocked(aGuard);
293  }
294 }
295 
296 std::shared_ptr<ThreadTaskTag> ThreadPool::createThreadTaskTag()
297 {
298  return std::make_shared<ThreadTaskTag>();
299 }
300 
301 bool ThreadPool::isTaskTagDone(const std::shared_ptr<ThreadTaskTag>& pTag)
302 {
303  return pTag->isDone();
304 }
305 
306 ThreadTask::ThreadTask(const std::shared_ptr<ThreadTaskTag>& pTag)
307  : mpTag(pTag)
308 {
309 }
310 
// NOTE(review): signature line not visible in this listing -- presumably
// void ThreadTask::exec().
// Runs doWork() and contains every exception it throws: std::exception,
// css::uno::Exception and anything else are logged with SAL_WARN and then
// dropped, so nothing propagates to the caller.
312 {
313  try {
314  doWork();
315  }
316  catch (const std::exception &e)
317  {
318  SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e.what());
319  }
320  catch (const css::uno::Exception &e)
321  {
322  SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e);
323  }
324  catch (...)
325  {
326  SAL_WARN("comphelper", "unknown exception in thread worker while calling doWork()");
327  }
328 }
329 
330 ThreadTaskTag::ThreadTaskTag() : mnTasksWorking(0)
331 {
332 }
333 
// NOTE(review): signature line not visible in this listing -- presumably
// void ThreadTaskTag::onTaskPushed().
// Counts one more outstanding task on this tag, under the tag's mutex.
335 {
336  std::scoped_lock< std::mutex > aGuard( maMutex );
337  mnTasksWorking++;
338  assert( mnTasksWorking < 65536 ); // sanity checking
339 }
340 
// NOTE(review): signature line not visible in this listing -- presumably
// void ThreadTaskTag::onTaskWorkerDone().
// Counts one outstanding task less, under the tag's mutex, and wakes all
// waiters once the count reaches zero.
342 {
343  std::scoped_lock< std::mutex > aGuard( maMutex );
344  mnTasksWorking--;
345  assert(mnTasksWorking >= 0);
346  if (mnTasksWorking == 0)
347  maTasksComplete.notify_all();
348 }
349 
// NOTE(review): signature line not visible in this listing -- presumably
// bool ThreadTaskTag::isDone().
// Returns true when no task is outstanding on this tag; reads the counter
// under the tag's mutex.
351 {
352  std::scoped_lock< std::mutex > aGuard( maMutex );
353  return mnTasksWorking == 0;
354 }
355 
// NOTE(review): signature line not visible in this listing -- presumably
// void ThreadTaskTag::waitUntilDone().
// Blocks until the outstanding-task counter reaches zero. Every wait is
// bounded by a timeout: debug builds assert on timeout, other builds throw
// std::runtime_error. The timeout applies per wait_for() call, so it starts
// afresh each time the loop goes round.
357 {
358  std::unique_lock< std::mutex > aGuard( maMutex );
359  while( mnTasksWorking > 0 )
360  {
361 #if defined DBG_UTIL && !defined NDEBUG
362  // 10 minute timeout in debug mode, unless the code is built with
363  // sanitizers or debugged in valgrind or gdb, in which case the threads
364  // should not time out in the middle of a debugging session
365  int maxTimeout = 10 * 60;
// builds without runtime optimizations get 30 minutes
366 #if !ENABLE_RUNTIME_OPTIMIZATIONS
367  maxTimeout = 30 * 60;
368 #endif
369 #if defined HAVE_VALGRIND_HEADERS
370  if( RUNNING_ON_VALGRIND )
371  maxTimeout = 30 * 60;
372 #endif
// with a debugger attached, allow 300 minutes
373  if( isDebuggerAttached())
374  maxTimeout = 300 * 60;
375  std::cv_status result = maTasksComplete.wait_for(
376  aGuard, std::chrono::seconds( maxTimeout ));
377  assert(result != std::cv_status::timeout);
378 #else
379  // 10 minute timeout in production so the app eventually throws some kind of error
380  if (maTasksComplete.wait_for(
381  aGuard, std::chrono::seconds( 10 * 60 )) == std::cv_status::timeout)
382  throw std::runtime_error("timeout waiting for threadpool tasks");
383 #endif
384  }
385 }
386 
387 } // namespace comphelper
388 
389 /* vim:set shiftwidth=4 softtabstop=4 expandtab: */
std::vector< std::unique_ptr< ThreadTask > > maTasks
Definition: threadpool.hxx:105
A very basic thread-safe thread pool implementation.
Definition: threadpool.hxx:43
static sal_Int32 getPreferredConcurrency()
returns a configurable max-concurrency limit to avoid spawning an unnecessarily large number of threads
Definition: threadpool.cxx:130
virtual void doWork()=0
override to get your task performed by the pool
static thread_local bool gbIsWorkerThread
prevent waiting for a task from inside a task
Definition: threadpool.cxx:37
void waitUntilDone(const std::shared_ptr< ThreadTaskTag > &, bool bJoin=true)
Wait until all queued tasks associated with the tag are completed.
Definition: threadpool.cxx:259
std::condition_variable maTasksComplete
Definition: threadpool.cxx:45
Thread(char const *name)
const BorderLinePrimitive2D *pCandidateB assert(pCandidateA)
std::mutex maMutex
signalled when all in-progress tasks are complete
Definition: threadpool.hxx:100
ThreadPool(sal_Int32 nWorkers)
Definition: threadpool.cxx:94
static ThreadPool & getSharedOptimalPool()
returns a reference to a shared pool with optimal thread count for the CPU
Definition: threadpool.cxx:125
ThreadTask(const std::shared_ptr< ThreadTaskTag > &pTag)
Definition: threadpool.cxx:306
bool isIdle() const
return true if there are no queued or worked-on tasks
Definition: threadpool.hxx:75
std::condition_variable maTasksChanged
Definition: threadpool.hxx:101
void pushTask(std::unique_ptr< ThreadTask > pTask)
push a new task onto the work queue
Definition: threadpool.cxx:209
static bool isTaskTagDone(const std::shared_ptr< ThreadTaskTag > &)
Definition: threadpool.cxx:301
friend class ThreadWorker
Definition: threadpool.hxx:87
virtual void execute() override
Definition: threadpool.cxx:67
std::vector< rtl::Reference< ThreadWorker > > maWorkers
Definition: threadpool.hxx:106
static PropertyMapEntry const * find(rtl::Reference< PropertySetInfo > &mxInfo, const OUString &aName)
void exec()
execute this task
Definition: threadpool.cxx:311
std::mutex mutex
Definition: random.cxx:42
std::unique_ptr< ThreadTask > popWorkLocked(std::unique_lock< std::mutex > &rGuard, bool bWait)
Pop a work task.
Definition: threadpool.cxx:228
void shutdown()
wait until all work is completed, then join all threads
Definition: threadpool.cxx:152
static std::shared_ptr< ThreadTaskTag > createThreadTaskTag()
Definition: threadpool.cxx:296
std::size_t mnBusyWorkers
Definition: threadpool.hxx:104
Any result
#define SAL_WARN(area, stream)
void shutdownLocked(std::unique_lock< std::mutex > &)
Definition: threadpool.cxx:161
std::size_t const mnMaxWorkers
Definition: threadpool.hxx:103
bool isDebuggerAttached()
Returns true if the process is running with a debugger attached.
void joinThreadsIfIdle()
join all threads if there are no tasks presently.
Definition: threadpool.cxx:287