LibreOffice Module comphelper (master)  1
threadpool.cxx
Go to the documentation of this file.
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 /*
3  * This file is part of the LibreOffice project.
4  *
5  * This Source Code Form is subject to the terms of the Mozilla Public
6  * License, v. 2.0. If a copy of the MPL was not distributed with this
7  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8  */
9 
11 
12 #include <com/sun/star/uno/Exception.hpp>
13 #include <config_options.h>
14 #include <sal/config.h>
15 #include <sal/log.hxx>
16 #include <rtl/instance.hxx>
17 #include <salhelper/thread.hxx>
18 #include <algorithm>
19 #include <memory>
20 #include <thread>
21 #include <chrono>
22 
23 #if defined HAVE_VALGRIND_HEADERS
24 #include <valgrind/memcheck.h>
25 #endif
26 
27 #if defined(_WIN32)
28 #define WIN32_LEAN_AND_MEAN
29 #include <windows.h>
30 #elif defined UNX
31 #include <unistd.h>
32 #include <fcntl.h>
33 #endif
34 
35 namespace comphelper {
36 
38 #if defined DBG_UTIL && (defined LINUX || defined _WIN32)
39 static thread_local bool gbIsWorkerThread;
40 #endif
41 
42 // used to group thread-tasks for waiting in waitUntilDone()
44 {
45  std::mutex maMutex;
46  sal_Int32 mnTasksWorking;
47  std::condition_variable maTasksComplete;
48 
49 public:
50  ThreadTaskTag();
51  bool isDone();
52  void waitUntilDone();
53  void onTaskWorkerDone();
54  void onTaskPushed();
55 };
56 
57 
59 {
61 public:
62 
63  explicit ThreadWorker( ThreadPool *pPool ) :
64  salhelper::Thread("thread-pool"),
65  mpPool( pPool )
66  {
67  }
68 
69  virtual void execute() override
70  {
71 #if defined DBG_UTIL && (defined LINUX || defined _WIN32)
72  gbIsWorkerThread = true;
73 #endif
74  std::unique_lock< std::mutex > aGuard( mpPool->maMutex );
75 
76  while( !mpPool->mbTerminate )
77  {
78  std::unique_ptr<ThreadTask> pTask = mpPool->popWorkLocked( aGuard, true );
79  if( pTask )
80  {
81  aGuard.unlock();
82 
83  pTask->exec();
84  pTask.reset();
85 
86  aGuard.lock();
87  }
88  }
89  }
90 };
91 
92 ThreadPool::ThreadPool(sal_Int32 nWorkers)
93  : mbTerminate(true)
94  , mnWorkers(nWorkers)
95 {
96 }
97 
{
    // Destruction only checks state; it does not shut the pool down itself.
    // The pool is expected to be terminated with an empty task queue by now.
    // note: calling shutdown from global variable dtor blocks forever on Win7
    // note2: there isn't enough MSVCRT left on exit to call assert() properly
    // so these asserts just print something to stderr but exit status is
    // still 0, but hopefully they will be more helpful on non-WNT platforms
    assert(mbTerminate);
    assert(maTasks.empty());
}
107 
108 struct ThreadPoolStatic : public rtl::StaticWithInit< std::shared_ptr< ThreadPool >,
109  ThreadPoolStatic >
110 {
111  std::shared_ptr< ThreadPool > operator () () {
112  const sal_Int32 nThreads = ThreadPool::getPreferredConcurrency();
113  return std::make_shared< ThreadPool >( nThreads );
114  };
115 };
116 
118 {
119  return *ThreadPoolStatic::get();
120 }
121 
123 {
124  static sal_Int32 ThreadCount = [&]()
125  {
126  const sal_Int32 nHardThreads = std::max(std::thread::hardware_concurrency(), 1U);
127  sal_Int32 nThreads = nHardThreads;
128  const char *pEnv = getenv("MAX_CONCURRENCY");
129  if (pEnv != nullptr)
130  {
131  // Override with user/admin preference.
132  nThreads = rtl_str_toInt32(pEnv, 10);
133  }
134 
135  nThreads = std::min(nHardThreads, nThreads);
136  return std::max<sal_Int32>(nThreads, 1);
137  }();
138 
139  return ThreadCount;
140 }
141 
142 // Used to order shutdown, and to ensure there are no lingering
143 // threads after LibreOfficeKit pre-init.
145 {
146 // if (mbTerminate)
147 // return;
148 
149  std::unique_lock< std::mutex > aGuard( maMutex );
150  shutdownLocked(aGuard);
151 }
152 
153 void ThreadPool::shutdownLocked(std::unique_lock<std::mutex>& aGuard)
154 {
155  if( maWorkers.empty() )
156  { // no threads at all -> execute the work in-line
157  std::unique_ptr<ThreadTask> pTask;
158  while ( ( pTask = popWorkLocked(aGuard, false) ) )
159  pTask->exec();
160  }
161  else
162  {
163  while( !maTasks.empty() )
164  maTasksChanged.wait( aGuard );
165  }
166  assert( maTasks.empty() );
167 
168  // coverity[missing_lock] - on purpose
169  mbTerminate = true;
170 
171  maTasksChanged.notify_all();
172 
173  decltype(maWorkers) aWorkers;
174  std::swap(maWorkers, aWorkers);
175  aGuard.unlock();
176 
177  while (!aWorkers.empty())
178  {
179  rtl::Reference<ThreadWorker> xWorker = aWorkers.back();
180  aWorkers.pop_back();
181  assert(std::find(aWorkers.begin(), aWorkers.end(), xWorker)
182  == aWorkers.end());
183  {
184  xWorker->join();
185  xWorker.clear();
186  }
187  }
188 }
189 
190 void ThreadPool::pushTask( std::unique_ptr<ThreadTask> pTask )
191 {
192  std::scoped_lock< std::mutex > aGuard( maMutex );
193 
194  mbTerminate = false;
195 
196  if (maWorkers.size() < mnWorkers && maWorkers.size() <= maTasks.size())
197  {
198  maWorkers.push_back( new ThreadWorker( this ) );
199  maWorkers.back()->launch();
200  }
201 
202  pTask->mpTag->onTaskPushed();
203  maTasks.insert( maTasks.begin(), std::move(pTask) );
204 
205  maTasksChanged.notify_one();
206 }
207 
208 std::unique_ptr<ThreadTask> ThreadPool::popWorkLocked( std::unique_lock< std::mutex > & rGuard, bool bWait )
209 {
210  do
211  {
212  if( !maTasks.empty() )
213  {
214  std::unique_ptr<ThreadTask> pTask = std::move(maTasks.back());
215  maTasks.pop_back();
216  return pTask;
217  }
218  else if (!bWait || mbTerminate)
219  return nullptr;
220 
221  maTasksChanged.wait( rGuard );
222 
223  } while (!mbTerminate);
224 
225  return nullptr;
226 }
227 
228 void ThreadPool::waitUntilDone(const std::shared_ptr<ThreadTaskTag>& rTag, bool bJoinAll)
229 {
230 #if defined DBG_UTIL && (defined LINUX || defined _WIN32)
231  assert(!gbIsWorkerThread && "cannot wait for tasks from inside a task");
232 #endif
233  {
234  std::unique_lock< std::mutex > aGuard( maMutex );
235 
236  if( maWorkers.empty() )
237  { // no threads at all -> execute the work in-line
238  std::unique_ptr<ThreadTask> pTask;
239  while (!rTag->isDone() &&
240  ( pTask = popWorkLocked(aGuard, false) ) )
241  pTask->exec();
242  }
243  }
244 
245  rTag->waitUntilDone();
246 
247  if (bJoinAll)
248  joinAll();
249 }
250 
252 {
253  std::unique_lock< std::mutex > aGuard( maMutex );
254  if (maTasks.empty()) // check if there are still tasks from another tag
255  {
256  shutdownLocked(aGuard);
257  }
258 }
259 
260 std::shared_ptr<ThreadTaskTag> ThreadPool::createThreadTaskTag()
261 {
262  return std::make_shared<ThreadTaskTag>();
263 }
264 
265 bool ThreadPool::isTaskTagDone(const std::shared_ptr<ThreadTaskTag>& pTag)
266 {
267  return pTag->isDone();
268 }
269 
270 ThreadTask::ThreadTask(const std::shared_ptr<ThreadTaskTag>& pTag)
271  : mpTag(pTag)
272 {
273 }
274 
276 {
277  std::shared_ptr<ThreadTaskTag> pTag(mpTag);
278  try {
279  doWork();
280  }
281  catch (const std::exception &e)
282  {
283  SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e.what());
284  }
285  catch (const css::uno::Exception &e)
286  {
287  SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e);
288  }
289 
290  pTag->onTaskWorkerDone();
291 }
292 
293 ThreadTaskTag::ThreadTaskTag() : mnTasksWorking(0)
294 {
295 }
296 
298 {
299  std::scoped_lock< std::mutex > aGuard( maMutex );
300  mnTasksWorking++;
301  assert( mnTasksWorking < 65536 ); // sanity checking
302 }
303 
305 {
306  std::scoped_lock< std::mutex > aGuard( maMutex );
307  mnTasksWorking--;
308  assert(mnTasksWorking >= 0);
309  if (mnTasksWorking == 0)
310  maTasksComplete.notify_all();
311 }
312 
314 {
315  std::scoped_lock< std::mutex > aGuard( maMutex );
316  return mnTasksWorking == 0;
317 }
318 
319 #if defined DBG_UTIL && !defined NDEBUG
/// Best-effort check whether a debugger/tracer is attached to this process.
///
/// - Windows: result of IsDebuggerPresent().
/// - Linux: true when /proc/self/status has a "TracerPid:" entry whose value
///   does not start with '0'.
/// - elsewhere, or if the status file cannot be read: false.
static bool isDebuggerAttached()
{
#if defined(_WIN32)
    return IsDebuggerPresent();
#elif defined LINUX
    char buf[ 4096 ];
    int fd = open( "/proc/self/status", O_RDONLY );
    if( fd < 0 )
        return false;
    int size = read( fd, buf, sizeof( buf ) - 1 );
    close( fd );
    if( size < 0 )
        return false;
    assert( size < int( sizeof( buf )) - 1 );
    // Terminate directly after the bytes actually read. Terminating only the
    // last array element would leave uninitialized bytes between the data and
    // the terminator for strstr() to scan.
    buf[ size ] = '\0';
    // "TracerPid: <pid>" for pid != 0 means something is attached
    const char* pos = strstr( buf, "TracerPid:" );
    if( pos == nullptr )
        return false;
    pos += strlen( "TracerPid:" );
    // isspace() requires a value representable as unsigned char
    while( *pos != '\n' && isspace( static_cast<unsigned char>( *pos )))
        ++pos;
    // an entry cut off before its value ('\0') does not count as attached
    return *pos != '\0' && *pos != '\n' && *pos != '0';
#else
    return false; // feel free to add your platform
#endif
}
347 #endif
348 
350 {
351  std::unique_lock< std::mutex > aGuard( maMutex );
352  while( mnTasksWorking > 0 )
353  {
354 #if defined DBG_UTIL && !defined NDEBUG
355  // 10 minute timeout in debug mode, unless the code is built with
356  // sanitizers or debugged in valgrind or gdb, in which case the threads
357  // should not time out in the middle of a debugging session
358  int maxTimeout = 10 * 60;
359 #if !ENABLE_RUNTIME_OPTIMIZATIONS
360  maxTimeout = 30 * 60;
361 #endif
362 #if defined HAVE_VALGRIND_HEADERS
363  if( RUNNING_ON_VALGRIND )
364  maxTimeout = 30 * 60;
365 #endif
366  if( isDebuggerAttached())
367  maxTimeout = 300 * 60;
368  std::cv_status result = maTasksComplete.wait_for(
369  aGuard, std::chrono::seconds( maxTimeout ));
370  assert(result != std::cv_status::timeout);
371 #else
372  // 10 minute timeout in production so the app eventually throws some kind of error
373  if (maTasksComplete.wait_for(
374  aGuard, std::chrono::seconds( 10 * 60 )) == std::cv_status::timeout)
375  throw std::runtime_error("timeout waiting for threadpool tasks");
376 #endif
377  }
378 }
379 
380 } // namespace comphelper
381 
382 /* vim:set shiftwidth=4 softtabstop=4 expandtab: */
std::vector< std::unique_ptr< ThreadTask > > maTasks
Definition: threadpool.hxx:99
A very basic thread-safe thread pool implementation.
Definition: threadpool.hxx:43
static sal_Int32 getPreferredConcurrency()
returns a configurable max-concurrency limit to avoid spawning an unnecessarily large number of threads
Definition: threadpool.cxx:122
void waitUntilDone(const std::shared_ptr< ThreadTaskTag > &, bool bJoinAll=true)
Wait until all queued tasks associated with the tag are completed.
Definition: threadpool.cxx:228
virtual void doWork()=0
override to get your task performed by the pool
static thread_local bool gbIsWorkerThread
prevent waiting for a task from inside a task
Definition: threadpool.cxx:39
std::condition_variable maTasksComplete
Definition: threadpool.cxx:47
Thread(char const *name)
std::mutex maMutex
signalled when all in-progress tasks are complete
Definition: threadpool.hxx:95
ThreadPool(sal_Int32 nWorkers)
Definition: threadpool.cxx:92
static ThreadPool & getSharedOptimalPool()
returns a pointer to a shared pool with optimal thread count for the CPU
Definition: threadpool.cxx:117
ThreadTask(const std::shared_ptr< ThreadTaskTag > &pTag)
Definition: threadpool.cxx:270
std::condition_variable maTasksChanged
Definition: threadpool.hxx:96
void pushTask(std::unique_ptr< ThreadTask > pTask)
push a new task onto the work queue
Definition: threadpool.cxx:190
void joinAll()
join all threads if there are no tasks presently.
Definition: threadpool.cxx:251
MetadataImporterPluginType * result
std::size_t const mnWorkers
Definition: threadpool.hxx:98
static bool isTaskTagDone(const std::shared_ptr< ThreadTaskTag > &)
Definition: threadpool.cxx:265
friend class ThreadWorker
Definition: threadpool.hxx:84
size
virtual void execute() override
Definition: threadpool.cxx:69
static bool isDebuggerAttached()
Definition: threadpool.cxx:320
std::vector< rtl::Reference< ThreadWorker > > maWorkers
Definition: threadpool.hxx:100
std::shared_ptr< ThreadTaskTag > mpTag
Definition: threadpool.hxx:29
void exec()
execute this task
Definition: threadpool.cxx:275
std::unique_ptr< ThreadTask > popWorkLocked(std::unique_lock< std::mutex > &rGuard, bool bWait)
Pop a work task.
Definition: threadpool.cxx:208
void shutdown()
wait until all work is completed, then join all threads
Definition: threadpool.cxx:144
static std::shared_ptr< ThreadTaskTag > createThreadTaskTag()
Definition: threadpool.cxx:260
std::shared_ptr< ThreadPool > operator()()
Definition: threadpool.cxx:111
#define SAL_WARN(area, stream)
void shutdownLocked(std::unique_lock< std::mutex > &)
Definition: threadpool.cxx:153