LibreOffice Module comphelper (master)  1
threadpool.cxx
Go to the documentation of this file.
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 /*
3  * This file is part of the LibreOffice project.
4  *
5  * This Source Code Form is subject to the terms of the Mozilla Public
6  * License, v. 2.0. If a copy of the MPL was not distributed with this
7  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8  */
9 
11 
12 #include <com/sun/star/uno/Exception.hpp>
13 #include <config_options.h>
14 #include <sal/config.h>
15 #include <sal/log.hxx>
16 #include <rtl/instance.hxx>
17 #include <salhelper/thread.hxx>
18 #include <algorithm>
19 #include <memory>
20 #include <thread>
21 #include <chrono>
23 
24 #if defined HAVE_VALGRIND_HEADERS
25 #include <valgrind/memcheck.h>
26 #endif
27 
28 #if defined(_WIN32)
29 #define WIN32_LEAN_AND_MEAN
30 #include <windows.h>
31 #elif defined UNX
32 #include <unistd.h>
33 #include <fcntl.h>
34 #endif
35 
36 namespace comphelper {
37 
39 #if defined DBG_UTIL && (defined LINUX || defined _WIN32)
40 static thread_local bool gbIsWorkerThread;
41 #endif
42 
43 // used to group thread-tasks for waiting in waitTillDone()
45 {
47  sal_Int32 mnTasksWorking;
48  std::condition_variable maTasksComplete;
49 
50 public:
51  ThreadTaskTag();
52  bool isDone();
53  void waitUntilDone();
54  void onTaskWorkerDone();
55  void onTaskPushed();
56 };
57 
58 
60 {
62 public:
63 
64  explicit ThreadWorker( ThreadPool *pPool ) :
65  salhelper::Thread("thread-pool"),
66  mpPool( pPool )
67  {
68  }
69 
70  virtual void execute() override
71  {
72 #if defined DBG_UTIL && (defined LINUX || defined _WIN32)
73  gbIsWorkerThread = true;
74 #endif
75  std::unique_lock< std::mutex > aGuard( mpPool->maMutex );
76 
77  while( !mpPool->mbTerminate )
78  {
79  std::unique_ptr<ThreadTask> pTask = mpPool->popWorkLocked( aGuard, true );
80  if( pTask )
81  {
82  aGuard.unlock();
83 
84  pTask->exec();
85  pTask.reset();
86 
87  aGuard.lock();
88  }
89  }
90  }
91 };
92 
93 ThreadPool::ThreadPool(sal_Int32 nWorkers)
94  : mbTerminate(true)
95  , mnWorkers(nWorkers)
96 {
97 }
98 
100 {
101  // note: calling shutdown from global variable dtor blocks forever on Win7
102  // note2: there isn't enough MSVCRT left on exit to call assert() properly
103  // so these asserts just print something to stderr but exit status is
104  // still 0, but hopefully they will be more helpful on non-WNT platforms
106  assert(maTasks.empty());
107 }
108 
109 namespace {
110 
111 struct ThreadPoolStatic : public rtl::StaticWithInit< std::shared_ptr< ThreadPool >,
112  ThreadPoolStatic >
113 {
114  std::shared_ptr< ThreadPool > operator () () {
115  const sal_Int32 nThreads = ThreadPool::getPreferredConcurrency();
116  return std::make_shared< ThreadPool >( nThreads );
117  };
118 };
119 
120 }
121 
123 {
124  return *ThreadPoolStatic::get();
125 }
126 
128 {
129  static sal_Int32 ThreadCount = [&]()
130  {
131  const sal_Int32 nHardThreads = std::max(std::thread::hardware_concurrency(), 1U);
132  sal_Int32 nThreads = nHardThreads;
133  const char *pEnv = getenv("MAX_CONCURRENCY");
134  if (pEnv != nullptr)
135  {
136  // Override with user/admin preference.
137  nThreads = rtl_str_toInt32(pEnv, 10);
138  }
139 
140  nThreads = std::min(nHardThreads, nThreads);
141  return std::max<sal_Int32>(nThreads, 1);
142  }();
143 
144  return ThreadCount;
145 }
146 
147 // Used to order shutdown, and to ensure there are no lingering
148 // threads after LibreOfficeKit pre-init.
150 {
151 // if (mbTerminate)
152 // return;
153 
154  std::unique_lock< std::mutex > aGuard( maMutex );
155  shutdownLocked(aGuard);
156 }
157 
158 void ThreadPool::shutdownLocked(std::unique_lock<std::mutex>& aGuard)
159 {
160  if( maWorkers.empty() )
161  { // no threads at all -> execute the work in-line
162  std::unique_ptr<ThreadTask> pTask;
163  while ( ( pTask = popWorkLocked(aGuard, false) ) )
164  pTask->exec();
165  }
166  else
167  {
168  while( !maTasks.empty() )
169  maTasksChanged.wait( aGuard );
170  }
171  assert( maTasks.empty() );
172 
173  // coverity[missing_lock] - on purpose
174  mbTerminate = true;
175 
176  maTasksChanged.notify_all();
177 
178  decltype(maWorkers) aWorkers;
179  std::swap(maWorkers, aWorkers);
180  aGuard.unlock();
181 
182  while (!aWorkers.empty())
183  {
184  rtl::Reference<ThreadWorker> xWorker = aWorkers.back();
185  aWorkers.pop_back();
186  assert(std::find(aWorkers.begin(), aWorkers.end(), xWorker)
187  == aWorkers.end());
188  {
189  xWorker->join();
190  xWorker.clear();
191  }
192  }
193 }
194 
195 void ThreadPool::pushTask( std::unique_ptr<ThreadTask> pTask )
196 {
197  std::scoped_lock< std::mutex > aGuard( maMutex );
198 
199  mbTerminate = false;
200 
201  if (maWorkers.size() < mnWorkers && maWorkers.size() <= maTasks.size())
202  {
203  maWorkers.push_back( new ThreadWorker( this ) );
204  maWorkers.back()->launch();
205  }
206 
207  pTask->mpTag->onTaskPushed();
208  maTasks.insert( maTasks.begin(), std::move(pTask) );
209 
210  maTasksChanged.notify_one();
211 }
212 
213 std::unique_ptr<ThreadTask> ThreadPool::popWorkLocked( std::unique_lock< std::mutex > & rGuard, bool bWait )
214 {
215  do
216  {
217  if( !maTasks.empty() )
218  {
219  std::unique_ptr<ThreadTask> pTask = std::move(maTasks.back());
220  maTasks.pop_back();
221  return pTask;
222  }
223  else if (!bWait || mbTerminate)
224  return nullptr;
225 
226  maTasksChanged.wait( rGuard );
227 
228  } while (!mbTerminate);
229 
230  return nullptr;
231 }
232 
233 void ThreadPool::waitUntilDone(const std::shared_ptr<ThreadTaskTag>& rTag, bool bJoinAll)
234 {
235 #if defined DBG_UTIL && (defined LINUX || defined _WIN32)
236  assert(!gbIsWorkerThread && "cannot wait for tasks from inside a task");
237 #endif
238  {
239  std::unique_lock< std::mutex > aGuard( maMutex );
240 
241  if( maWorkers.empty() )
242  { // no threads at all -> execute the work in-line
243  while (!rTag->isDone())
244  {
245  std::unique_ptr<ThreadTask> pTask = popWorkLocked(aGuard, false);
246  if (!pTask)
247  break;
248  pTask->exec();
249  }
250  }
251  }
252 
253  rTag->waitUntilDone();
254 
255  if (bJoinAll)
256  joinAll();
257 }
258 
260 {
261  std::unique_lock< std::mutex > aGuard( maMutex );
262  if (maTasks.empty()) // check if there are still tasks from another tag
263  {
264  shutdownLocked(aGuard);
265  }
266 }
267 
268 std::shared_ptr<ThreadTaskTag> ThreadPool::createThreadTaskTag()
269 {
270  return std::make_shared<ThreadTaskTag>();
271 }
272 
273 bool ThreadPool::isTaskTagDone(const std::shared_ptr<ThreadTaskTag>& pTag)
274 {
275  return pTag->isDone();
276 }
277 
278 ThreadTask::ThreadTask(const std::shared_ptr<ThreadTaskTag>& pTag)
279  : mpTag(pTag)
280 {
281 }
282 
284 {
285  std::shared_ptr<ThreadTaskTag> pTag(mpTag);
286  try {
287  doWork();
288  }
289  catch (const std::exception &e)
290  {
291  SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e.what());
292  }
293  catch (const css::uno::Exception &e)
294  {
295  SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e);
296  }
297 
298  pTag->onTaskWorkerDone();
299 }
300 
301 ThreadTaskTag::ThreadTaskTag() : mnTasksWorking(0)
302 {
303 }
304 
306 {
307  std::scoped_lock< std::mutex > aGuard( maMutex );
308  mnTasksWorking++;
309  assert( mnTasksWorking < 65536 ); // sanity checking
310 }
311 
313 {
314  std::scoped_lock< std::mutex > aGuard( maMutex );
315  mnTasksWorking--;
316  assert(mnTasksWorking >= 0);
317  if (mnTasksWorking == 0)
318  maTasksComplete.notify_all();
319 }
320 
322 {
323  std::scoped_lock< std::mutex > aGuard( maMutex );
324  return mnTasksWorking == 0;
325 }
326 
328 {
329  std::unique_lock< std::mutex > aGuard( maMutex );
330  while( mnTasksWorking > 0 )
331  {
332 #if defined DBG_UTIL && !defined NDEBUG
333  // 10 minute timeout in debug mode, unless the code is built with
334  // sanitizers or debugged in valgrind or gdb, in which case the threads
335  // should not time out in the middle of a debugging session
336  int maxTimeout = 10 * 60;
337 #if !ENABLE_RUNTIME_OPTIMIZATIONS
338  maxTimeout = 30 * 60;
339 #endif
340 #if defined HAVE_VALGRIND_HEADERS
341  if( RUNNING_ON_VALGRIND )
342  maxTimeout = 30 * 60;
343 #endif
344  if( isDebuggerAttached())
345  maxTimeout = 300 * 60;
346  std::cv_status result = maTasksComplete.wait_for(
347  aGuard, std::chrono::seconds( maxTimeout ));
348  assert(result != std::cv_status::timeout);
349 #else
350  // 10 minute timeout in production so the app eventually throws some kind of error
351  if (maTasksComplete.wait_for(
352  aGuard, std::chrono::seconds( 10 * 60 )) == std::cv_status::timeout)
353  throw std::runtime_error("timeout waiting for threadpool tasks");
354 #endif
355  }
356 }
357 
358 } // namespace comphelper
359 
360 /* vim:set shiftwidth=4 softtabstop=4 expandtab: */
std::vector< std::unique_ptr< ThreadTask > > maTasks
Definition: threadpool.hxx:99
A very basic thread-safe thread pool implementation.
Definition: threadpool.hxx:43
static sal_Int32 getPreferredConcurrency()
returns a configurable max-concurrency limit to avoid spawning an unnecessarily large number of threa...
Definition: threadpool.cxx:127
void waitUntilDone(const std::shared_ptr< ThreadTaskTag > &, bool bJoinAll=true)
Wait until all queued tasks associated with the tag are completed.
Definition: threadpool.cxx:233
virtual void doWork()=0
override to get your task performed by the pool
static thread_local bool gbIsWorkerThread
prevent waiting for a task from inside a task
Definition: threadpool.cxx:40
std::condition_variable maTasksComplete
Definition: threadpool.cxx:48
Thread(char const *name)
const BorderLinePrimitive2D *pCandidateB assert(pCandidateA)
std::mutex maMutex
signalled when all in-progress tasks are complete
Definition: threadpool.hxx:95
ThreadPool(sal_Int32 nWorkers)
Definition: threadpool.cxx:93
static ThreadPool & getSharedOptimalPool()
returns a pointer to a shared pool with optimal thread count for the CPU
Definition: threadpool.cxx:122
ThreadTask(const std::shared_ptr< ThreadTaskTag > &pTag)
Definition: threadpool.cxx:278
std::condition_variable maTasksChanged
Definition: threadpool.hxx:96
void pushTask(std::unique_ptr< ThreadTask > pTask)
push a new task onto the work queue
Definition: threadpool.cxx:195
void joinAll()
join all threads if there are no tasks presently.
Definition: threadpool.cxx:259
std::size_t const mnWorkers
Definition: threadpool.hxx:98
static bool isTaskTagDone(const std::shared_ptr< ThreadTaskTag > &)
Definition: threadpool.cxx:273
friend class ThreadWorker
Definition: threadpool.hxx:84
virtual void execute() override
Definition: threadpool.cxx:70
std::vector< rtl::Reference< ThreadWorker > > maWorkers
Definition: threadpool.hxx:100
std::shared_ptr< ThreadTaskTag > mpTag
Definition: threadpool.hxx:29
void exec()
execute this task
Definition: threadpool.cxx:283
std::mutex mutex
Definition: random.cxx:43
std::unique_ptr< ThreadTask > popWorkLocked(std::unique_lock< std::mutex > &rGuard, bool bWait)
Pop a work task.
Definition: threadpool.cxx:213
void shutdown()
wait until all work is completed, then join all threads
Definition: threadpool.cxx:149
static std::shared_ptr< ThreadTaskTag > createThreadTaskTag()
Definition: threadpool.cxx:268
Any result
#define SAL_WARN(area, stream)
void shutdownLocked(std::unique_lock< std::mutex > &)
Definition: threadpool.cxx:158
bool isDebuggerAttached()
Returns true if the process is running with a debugger attached.