LibreOffice Module comphelper (master)  1
threadpool.cxx
Go to the documentation of this file.
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 /*
3  * This file is part of the LibreOffice project.
4  *
5  * This Source Code Form is subject to the terms of the Mozilla Public
6  * License, v. 2.0. If a copy of the MPL was not distributed with this
7  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8  */
9 
11 
12 #include <com/sun/star/uno/Exception.hpp>
13 #include <config_options.h>
14 #include <sal/config.h>
15 #include <sal/log.hxx>
16 #include <rtl/instance.hxx>
17 #include <salhelper/thread.hxx>
18 #include <algorithm>
19 #include <memory>
20 #include <thread>
21 #include <chrono>
23 
24 #if defined HAVE_VALGRIND_HEADERS
25 #include <valgrind/memcheck.h>
26 #endif
27 
28 #if defined(_WIN32)
29 #define WIN32_LEAN_AND_MEAN
30 #include <windows.h>
31 #elif defined UNX
32 #include <unistd.h>
33 #include <fcntl.h>
34 #endif
35 
36 namespace comphelper {
37 
39 #if defined DBG_UTIL && (defined LINUX || defined _WIN32)
40 static thread_local bool gbIsWorkerThread;
41 #endif
42 
43 // used to group thread-tasks for waiting in waitUntilDone()
45 {
47  sal_Int32 mnTasksWorking;
48  std::condition_variable maTasksComplete;
49 
50 public:
51  ThreadTaskTag();
52  bool isDone();
53  void waitUntilDone();
54  void onTaskWorkerDone();
55  void onTaskPushed();
56 };
57 
58 
60 {
62 public:
63 
64  explicit ThreadWorker( ThreadPool *pPool ) :
65  salhelper::Thread("thread-pool"),
66  mpPool( pPool )
67  {
68  }
69 
70  virtual void execute() override
71  {
72 #if defined DBG_UTIL && (defined LINUX || defined _WIN32)
73  gbIsWorkerThread = true;
74 #endif
75  std::unique_lock< std::mutex > aGuard( mpPool->maMutex );
76 
77  while( !mpPool->mbTerminate )
78  {
79  std::unique_ptr<ThreadTask> pTask = mpPool->popWorkLocked( aGuard, true );
80  if( pTask )
81  {
82  aGuard.unlock();
83 
84  pTask->exec();
85  pTask.reset();
86 
87  aGuard.lock();
88  }
89  }
90  }
91 };
92 
93 ThreadPool::ThreadPool(sal_Int32 nWorkers)
94  : mbTerminate(true)
95  , mnWorkers(nWorkers)
96 {
97 }
98 
100 {
101  // note: calling shutdown from global variable dtor blocks forever on Win7
102  // note2: there isn't enough MSVCRT left on exit to call assert() properly
103  // so these asserts just print something to stderr but exit status is
104  // still 0, but hopefully they will be more helpful on non-WNT platforms
105  assert(mbTerminate);
106  assert(maTasks.empty());
107 }
108 
109 namespace {
110 
111 struct ThreadPoolStatic : public rtl::StaticWithInit< std::shared_ptr< ThreadPool >,
112  ThreadPoolStatic >
113 {
114  std::shared_ptr< ThreadPool > operator () () {
115  const sal_Int32 nThreads = ThreadPool::getPreferredConcurrency();
116  return std::make_shared< ThreadPool >( nThreads );
117  };
118 };
119 
120 }
121 
123 {
124  return *ThreadPoolStatic::get();
125 }
126 
128 {
129  static sal_Int32 ThreadCount = [&]()
130  {
131  const sal_Int32 nHardThreads = std::max(std::thread::hardware_concurrency(), 1U);
132  sal_Int32 nThreads = nHardThreads;
133  const char *pEnv = getenv("MAX_CONCURRENCY");
134  if (pEnv != nullptr)
135  {
136  // Override with user/admin preference.
137  nThreads = rtl_str_toInt32(pEnv, 10);
138  }
139 
140  nThreads = std::min(nHardThreads, nThreads);
141  return std::max<sal_Int32>(nThreads, 1);
142  }();
143 
144  return ThreadCount;
145 }
146 
147 // Used to order shutdown, and to ensure there are no lingering
148 // threads after LibreOfficeKit pre-init.
150 {
151 // if (mbTerminate)
152 // return;
153 
154  std::unique_lock< std::mutex > aGuard( maMutex );
155  shutdownLocked(aGuard);
156 }
157 
158 void ThreadPool::shutdownLocked(std::unique_lock<std::mutex>& aGuard)
159 {
160  if( maWorkers.empty() )
161  { // no threads at all -> execute the work in-line
162  std::unique_ptr<ThreadTask> pTask;
163  while ( ( pTask = popWorkLocked(aGuard, false) ) )
164  pTask->exec();
165  }
166  else
167  {
168  while( !maTasks.empty() )
169  maTasksChanged.wait( aGuard );
170  }
171  assert( maTasks.empty() );
172 
173  // coverity[missing_lock] - on purpose
174  mbTerminate = true;
175 
176  maTasksChanged.notify_all();
177 
178  decltype(maWorkers) aWorkers;
179  std::swap(maWorkers, aWorkers);
180  aGuard.unlock();
181 
182  while (!aWorkers.empty())
183  {
184  rtl::Reference<ThreadWorker> xWorker = aWorkers.back();
185  aWorkers.pop_back();
186  assert(std::find(aWorkers.begin(), aWorkers.end(), xWorker)
187  == aWorkers.end());
188  {
189  xWorker->join();
190  xWorker.clear();
191  }
192  }
193 }
194 
195 void ThreadPool::pushTask( std::unique_ptr<ThreadTask> pTask )
196 {
197  std::scoped_lock< std::mutex > aGuard( maMutex );
198 
199  mbTerminate = false;
200 
201  if (maWorkers.size() < mnWorkers && maWorkers.size() <= maTasks.size())
202  {
203  maWorkers.push_back( new ThreadWorker( this ) );
204  maWorkers.back()->launch();
205  }
206 
207  pTask->mpTag->onTaskPushed();
208  maTasks.insert( maTasks.begin(), std::move(pTask) );
209 
210  maTasksChanged.notify_one();
211 }
212 
213 std::unique_ptr<ThreadTask> ThreadPool::popWorkLocked( std::unique_lock< std::mutex > & rGuard, bool bWait )
214 {
215  do
216  {
217  if( !maTasks.empty() )
218  {
219  std::unique_ptr<ThreadTask> pTask = std::move(maTasks.back());
220  maTasks.pop_back();
221  return pTask;
222  }
223  else if (!bWait || mbTerminate)
224  return nullptr;
225 
226  maTasksChanged.wait( rGuard );
227 
228  } while (!mbTerminate);
229 
230  return nullptr;
231 }
232 
233 void ThreadPool::waitUntilDone(const std::shared_ptr<ThreadTaskTag>& rTag, bool bJoinAll)
234 {
235 #if defined DBG_UTIL && (defined LINUX || defined _WIN32)
236  assert(!gbIsWorkerThread && "cannot wait for tasks from inside a task");
237 #endif
238  {
239  std::unique_lock< std::mutex > aGuard( maMutex );
240 
241  if( maWorkers.empty() )
242  { // no threads at all -> execute the work in-line
243  std::unique_ptr<ThreadTask> pTask;
244  while (!rTag->isDone() &&
245  ( pTask = popWorkLocked(aGuard, false) ) )
246  pTask->exec();
247  }
248  }
249 
250  rTag->waitUntilDone();
251 
252  if (bJoinAll)
253  joinAll();
254 }
255 
257 {
258  std::unique_lock< std::mutex > aGuard( maMutex );
259  if (maTasks.empty()) // check if there are still tasks from another tag
260  {
261  shutdownLocked(aGuard);
262  }
263 }
264 
265 std::shared_ptr<ThreadTaskTag> ThreadPool::createThreadTaskTag()
266 {
267  return std::make_shared<ThreadTaskTag>();
268 }
269 
270 bool ThreadPool::isTaskTagDone(const std::shared_ptr<ThreadTaskTag>& pTag)
271 {
272  return pTag->isDone();
273 }
274 
275 ThreadTask::ThreadTask(const std::shared_ptr<ThreadTaskTag>& pTag)
276  : mpTag(pTag)
277 {
278 }
279 
281 {
282  std::shared_ptr<ThreadTaskTag> pTag(mpTag);
283  try {
284  doWork();
285  }
286  catch (const std::exception &e)
287  {
288  SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e.what());
289  }
290  catch (const css::uno::Exception &e)
291  {
292  SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e);
293  }
294 
295  pTag->onTaskWorkerDone();
296 }
297 
298 ThreadTaskTag::ThreadTaskTag() : mnTasksWorking(0)
299 {
300 }
301 
303 {
304  std::scoped_lock< std::mutex > aGuard( maMutex );
305  mnTasksWorking++;
306  assert( mnTasksWorking < 65536 ); // sanity checking
307 }
308 
310 {
311  std::scoped_lock< std::mutex > aGuard( maMutex );
312  mnTasksWorking--;
313  assert(mnTasksWorking >= 0);
314  if (mnTasksWorking == 0)
315  maTasksComplete.notify_all();
316 }
317 
319 {
320  std::scoped_lock< std::mutex > aGuard( maMutex );
321  return mnTasksWorking == 0;
322 }
323 
325 {
326  std::unique_lock< std::mutex > aGuard( maMutex );
327  while( mnTasksWorking > 0 )
328  {
329 #if defined DBG_UTIL && !defined NDEBUG
330  // 10 minute timeout in debug mode, unless the code is built with
331  // sanitizers or debugged in valgrind or gdb, in which case the threads
332  // should not time out in the middle of a debugging session
333  int maxTimeout = 10 * 60;
334 #if !ENABLE_RUNTIME_OPTIMIZATIONS
335  maxTimeout = 30 * 60;
336 #endif
337 #if defined HAVE_VALGRIND_HEADERS
338  if( RUNNING_ON_VALGRIND )
339  maxTimeout = 30 * 60;
340 #endif
341  if( isDebuggerAttached())
342  maxTimeout = 300 * 60;
343  std::cv_status result = maTasksComplete.wait_for(
344  aGuard, std::chrono::seconds( maxTimeout ));
345  assert(result != std::cv_status::timeout);
346 #else
347  // 10 minute timeout in production so the app eventually throws some kind of error
348  if (maTasksComplete.wait_for(
349  aGuard, std::chrono::seconds( 10 * 60 )) == std::cv_status::timeout)
350  throw std::runtime_error("timeout waiting for threadpool tasks");
351 #endif
352  }
353 }
354 
355 } // namespace comphelper
356 
357 /* vim:set shiftwidth=4 softtabstop=4 expandtab: */
std::vector< std::unique_ptr< ThreadTask > > maTasks
Definition: threadpool.hxx:99
A very basic thread-safe thread pool implementation.
Definition: threadpool.hxx:43
static sal_Int32 getPreferredConcurrency()
returns a configurable max-concurrency limit to avoid spawning an unnecessarily large number of threads.
Definition: threadpool.cxx:127
void waitUntilDone(const std::shared_ptr< ThreadTaskTag > &, bool bJoinAll=true)
Wait until all queued tasks associated with the tag are completed.
Definition: threadpool.cxx:233
virtual void doWork()=0
override to get your task performed by the pool
static thread_local bool gbIsWorkerThread
prevent waiting for a task from inside a task
Definition: threadpool.cxx:40
std::condition_variable maTasksComplete
Definition: threadpool.cxx:48
Thread(char const *name)
std::mutex maMutex
signalled when all in-progress tasks are complete
Definition: threadpool.hxx:95
ThreadPool(sal_Int32 nWorkers)
Definition: threadpool.cxx:93
static ThreadPool & getSharedOptimalPool()
returns a pointer to a shared pool with optimal thread count for the CPU
Definition: threadpool.cxx:122
ThreadTask(const std::shared_ptr< ThreadTaskTag > &pTag)
Definition: threadpool.cxx:275
std::condition_variable maTasksChanged
Definition: threadpool.hxx:96
void pushTask(std::unique_ptr< ThreadTask > pTask)
push a new task onto the work queue
Definition: threadpool.cxx:195
void joinAll()
join all threads if there are no tasks presently.
Definition: threadpool.cxx:256
std::size_t const mnWorkers
Definition: threadpool.hxx:98
static bool isTaskTagDone(const std::shared_ptr< ThreadTaskTag > &)
Definition: threadpool.cxx:270
friend class ThreadWorker
Definition: threadpool.hxx:84
virtual void execute() override
Definition: threadpool.cxx:70
std::vector< rtl::Reference< ThreadWorker > > maWorkers
Definition: threadpool.hxx:100
std::shared_ptr< ThreadTaskTag > mpTag
Definition: threadpool.hxx:29
void exec()
execute this task
Definition: threadpool.cxx:280
std::mutex mutex
Definition: random.cxx:43
std::unique_ptr< ThreadTask > popWorkLocked(std::unique_lock< std::mutex > &rGuard, bool bWait)
Pop a work task.
Definition: threadpool.cxx:213
void shutdown()
wait until all work is completed, then join all threads
Definition: threadpool.cxx:149
static std::shared_ptr< ThreadTaskTag > createThreadTaskTag()
Definition: threadpool.cxx:265
Any result
#define SAL_WARN(area, stream)
void shutdownLocked(std::unique_lock< std::mutex > &)
Definition: threadpool.cxx:158
bool isDebuggerAttached()
Returns true if the process is running with a debugger attached.