LibreOffice Module comphelper (master) 1
threadpool.cxx
Go to the documentation of this file.
1/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2/*
3 * This file is part of the LibreOffice project.
4 *
5 * This Source Code Form is subject to the terms of the Mozilla Public
6 * License, v. 2.0. If a copy of the MPL was not distributed with this
7 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8 */
9
11
12#include <com/sun/star/uno/Exception.hpp>
13#include <config_options.h>
14#include <o3tl/safeint.hxx>
15#include <sal/config.h>
16#include <sal/log.hxx>
17#include <salhelper/thread.hxx>
18#include <algorithm>
19#include <memory>
20#include <thread>
21#include <chrono>
22#include <cstddef>
24#include <utility>
25
26#if defined HAVE_VALGRIND_HEADERS
27#include <valgrind/memcheck.h>
28#endif
29
30#if defined(_WIN32)
31#define WIN32_LEAN_AND_MEAN
32#include <windows.h>
33#endif
34
35namespace comphelper {
36
38#if defined DBG_UTIL && (defined LINUX || defined _WIN32)
39static thread_local bool gbIsWorkerThread;
40#endif
41
42// Groups thread tasks so callers can wait for the whole group via waitUntilDone().
// NOTE(review): the class-head line, the mutex member and the constructor
// declaration are absent from this listing (its numbering skips 43, 45 and 50).
44{
// Compared against zero by isDone() and waitUntilDone() to decide whether the
// group has finished.
46 sal_Int32 mnTasksWorking;
// Notified when mnTasksWorking is found to be zero; waitUntilDone() waits on it.
47 std::condition_variable maTasksComplete;
48
49public:
51 bool isDone();
52 void waitUntilDone();
53 void onTaskWorkerDone();
54 void onTaskPushed();
55};
56
57
// Worker thread of the pool: execute() repeatedly pops tasks from the owning
// pool and runs them until the pool is flagged as terminating.
// NOTE(review): the class-head line and the mpPool member declaration are
// absent from this listing (its numbering skips 58 and 60).
59{
61public:
62
// Remembers the owning pool; the thread is named "thread-pool".
63 explicit ThreadWorker( ThreadPool *pPool ) :
64 salhelper::Thread("thread-pool"),
65 mpPool( pPool )
66 {
67 }
68
// Thread body.
69 virtual void execute() override
70 {
// Debug-only marker; ThreadPool::waitUntilDone() asserts it is not set.
71#if defined DBG_UTIL && (defined LINUX || defined _WIN32)
72 gbIsWorkerThread = true;
73#endif
// The pool mutex is held for all queue handling and released only while a
// task is actually running.
74 std::unique_lock< std::mutex > aGuard( mpPool->maMutex );
75
76 while( !mpPool->mbTerminate )
77 {
// bWait == true: block until a task is available or the pool terminates.
78 std::unique_ptr<ThreadTask> pTask = mpPool->popWorkLocked( aGuard, true );
79 if( pTask )
80 {
// Keep a reference to the tag: the task is destroyed before the tag is notified.
81 std::shared_ptr<ThreadTaskTag> pTag(pTask->mpTag);
// NOTE(review): listing line 82 is absent here - presumably busy-worker
// bookkeeping; confirm against the full file.
83 aGuard.unlock();
84
85 pTask->exec();
86 pTask.reset();
87
88 aGuard.lock();
// NOTE(review): listing line 89 is absent here - presumably the matching
// busy-worker bookkeeping; confirm against the full file.
90 pTag->onTaskWorkerDone();
91 }
92 }
93 }
94};
95
96ThreadPool::ThreadPool(std::size_t nWorkers)
97 : mbTerminate(true)
98 , mnMaxWorkers(nWorkers)
99 , mnBusyWorkers(0)
100{
101}
102
// NOTE(review): the signature line is absent from this listing; presumably
// this is ThreadPool::~ThreadPool() - confirm against the full file.
// Performs no cleanup itself: it only asserts that the pool is already
// terminated, has no queued tasks and has no busy workers.
104{
105 // note: calling shutdown from global variable dtor blocks forever on Win7
106 // note2: there isn't enough MSVCRT left on exit to call assert() properly
107 // so these asserts just print something to stderr but exit status is
108 // still 0, but hopefully they will be more helpful on non-WNT platforms
109 assert(mbTerminate);
110 assert(maTasks.empty());
111 assert(mnBusyWorkers == 0);
112}
113
114namespace {
115
116std::shared_ptr< ThreadPool >& GetStaticThreadPool()
117{
118 static std::shared_ptr< ThreadPool > POOL =
119 []()
120 {
121 const std::size_t nThreads = ThreadPool::getPreferredConcurrency();
122 return std::make_shared< ThreadPool >( nThreads );
123 }();
124 return POOL;
125}
126
127}
128
// NOTE(review): the signature line is absent from this listing; presumably
// this is ThreadPool::getSharedOptimalPool() - confirm against the full file.
// Returns a reference to the pool owned by GetStaticThreadPool().
130{
131 return *GetStaticThreadPool();
132}
133
// NOTE(review): the signature line is absent from this listing; presumably
// this is ThreadPool::getPreferredConcurrency() - confirm against the full file.
// Computes the preferred worker count once (function-local static initialised
// by a lambda) and returns that cached value on every call.
135{
136 static std::size_t ThreadCount = []()
137 {
// Hardware thread count, with a floor of 1.
138 const std::size_t nHardThreads = o3tl::clamp_to_unsigned<std::size_t>(
139 std::max(std::thread::hardware_concurrency(), 1U));
140 std::size_t nThreads = nHardThreads;
// Optional override from the MAX_CONCURRENCY environment variable, parsed
// as a base-10 integer.
141 const char *pEnv = getenv("MAX_CONCURRENCY");
142 if (pEnv != nullptr)
143 {
144 // Override with user/admin preference.
145 nThreads = o3tl::clamp_to_unsigned<std::size_t>(rtl_str_toInt32(pEnv, 10));
146 }
147
// The override can only lower the count: the result is capped at the
// hardware thread count and never drops below 1.
148 nThreads = std::min(nHardThreads, nThreads);
149 return std::max<std::size_t>(nThreads, 1);
150 }();
151
152 return ThreadCount;
153}
154
155// Used to order shutdown, and to ensure there are no lingering
156// threads after LibreOfficeKit pre-init.
// NOTE(review): the signature line is absent from this listing; presumably
// this is ThreadPool::shutdown() - confirm against the full file.
// Takes the pool mutex and delegates all work to shutdownLocked().
158{
// The early-out below is disabled, so shutdownLocked() always runs.
159// if (mbTerminate)
160// return;
161
162 std::unique_lock< std::mutex > aGuard( maMutex );
163 shutdownLocked(aGuard);
164}
165
166void ThreadPool::shutdownLocked(std::unique_lock<std::mutex>& aGuard)
167{
168 if( maWorkers.empty() )
169 { // no threads at all -> execute the work in-line
170 std::unique_ptr<ThreadTask> pTask;
171 while ( ( pTask = popWorkLocked(aGuard, false) ) )
172 {
173 std::shared_ptr<ThreadTaskTag> pTag(pTask->mpTag);
174 pTask->exec();
175 pTag->onTaskWorkerDone();
176 }
177 }
178 else
179 {
180 while( !maTasks.empty() )
181 {
182 maTasksChanged.wait( aGuard );
183 // In the (unlikely but possible?) case pushTask() gets called meanwhile,
184 // its notify_one() call is meant to wake a up a thread and process the task.
185 // But if this code gets woken up instead, it could lead to a deadlock.
186 // Pass on the notification.
187 maTasksChanged.notify_one();
188 }
189 }
190 assert( maTasks.empty() );
191
192 // coverity[missing_lock] - on purpose
193 mbTerminate = true;
194
195 maTasksChanged.notify_all();
196
197 decltype(maWorkers) aWorkers;
198 std::swap(maWorkers, aWorkers);
199 aGuard.unlock();
200
201 while (!aWorkers.empty())
202 {
203 rtl::Reference<ThreadWorker> xWorker = aWorkers.back();
204 aWorkers.pop_back();
205 assert(std::find(aWorkers.begin(), aWorkers.end(), xWorker)
206 == aWorkers.end());
207 {
208 xWorker->join();
209 xWorker.clear();
210 }
211 }
212}
213
214void ThreadPool::pushTask( std::unique_ptr<ThreadTask> pTask )
215{
216 std::scoped_lock< std::mutex > aGuard( maMutex );
217
218 mbTerminate = false;
219
220 // Worked on tasks are already removed from maTasks, so include the count of busy workers.
221 if (maWorkers.size() < mnMaxWorkers && maWorkers.size() <= maTasks.size() + mnBusyWorkers)
222 {
223 maWorkers.push_back( new ThreadWorker( this ) );
224 maWorkers.back()->launch();
225 }
226
227 pTask->mpTag->onTaskPushed();
228 maTasks.insert( maTasks.begin(), std::move(pTask) );
229
230 maTasksChanged.notify_one();
231}
232
233std::unique_ptr<ThreadTask> ThreadPool::popWorkLocked( std::unique_lock< std::mutex > & rGuard, bool bWait )
234{
235 do
236 {
237 if( !maTasks.empty() )
238 {
239 std::unique_ptr<ThreadTask> pTask = std::move(maTasks.back());
240 maTasks.pop_back();
241 return pTask;
242 }
243 else if (!bWait || mbTerminate)
244 return nullptr;
245
246 maTasksChanged.wait( rGuard );
247
248 } while (!mbTerminate);
249
250 return nullptr;
251}
252
254{
256}
257
259{
260 assert(mnBusyWorkers >= 1);
262}
263
// Blocks until every task associated with rTag has completed.
// rTag:  tag whose tasks are waited for.
// bJoin: guards a final statement that is missing from this listing (see the
//        note at the end of the function).
264void ThreadPool::waitUntilDone(const std::shared_ptr<ThreadTaskTag>& rTag, bool bJoin)
265{
// Debug-only guard: worker threads set gbIsWorkerThread, and waiting from
// inside a task is rejected.
266#if defined DBG_UTIL && (defined LINUX || defined _WIN32)
267 assert(!gbIsWorkerThread && "cannot wait for tasks from inside a task");
268#endif
269 {
270 std::unique_lock< std::mutex > aGuard( maMutex );
271
272 if( maWorkers.empty() )
273 { // no threads at all -> execute the work in-line
// Tasks are popped and run on the calling thread, with aGuard still held,
// until the tag reports done or the queue is empty.
274 while (!rTag->isDone())
275 {
276 std::unique_ptr<ThreadTask> pTask = popWorkLocked(aGuard, false);
277 if (!pTask)
278 break;
// Keep a reference to the popped task's own tag, which is the one notified.
279 std::shared_ptr<ThreadTaskTag> pTag(pTask->mpTag);
280 pTask->exec();
281 pTag->onTaskWorkerDone();
282 }
283 }
284 }
285
// The pool mutex is released at this point; wait on the tag itself.
286 rTag->waitUntilDone();
287
288 if (bJoin)
// NOTE(review): listing line 289, the statement controlled by this `if`, is
// absent - presumably a call to joinThreadsIfIdle(); confirm against the full file.
290}
291
// NOTE(review): the signature line is absent from this listing; presumably
// this is ThreadPool::joinThreadsIfIdle() - confirm against the full file.
// Shuts the pool down, joining its threads, but only when isIdle() reports
// that no work is outstanding.
293{
294 std::unique_lock< std::mutex > aGuard( maMutex );
295 if (isIdle()) // check if there are still tasks from another tag
296 {
297 shutdownLocked(aGuard);
298 }
299}
300
301std::shared_ptr<ThreadTaskTag> ThreadPool::createThreadTaskTag()
302{
303 return std::make_shared<ThreadTaskTag>();
304}
305
306bool ThreadPool::isTaskTagDone(const std::shared_ptr<ThreadTaskTag>& pTag)
307{
308 return pTag->isDone();
309}
310
311ThreadTask::ThreadTask(std::shared_ptr<ThreadTaskTag> xTag)
312 : mpTag(std::move(xTag))
313{
314}
315
// NOTE(review): the signature line is absent from this listing; presumably
// this is ThreadTask::exec() - confirm against the full file.
// Runs doWork() and logs any exception through SAL_WARN instead of letting it
// propagate to the caller.
317{
318 try {
319 doWork();
320 }
// Standard exceptions: log what().
321 catch (const std::exception &e)
322 {
323 SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e.what());
324 }
// UNO exceptions: the exception object itself is streamed into the log.
325 catch (const css::uno::Exception &e)
326 {
327 SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e);
328 }
// Anything else is logged without further detail.
329 catch (...)
330 {
331 SAL_WARN("comphelper", "unknown exception in thread worker while calling doWork()");
332 }
333}
334
335ThreadTaskTag::ThreadTaskTag() : mnTasksWorking(0)
336{
337}
338
// NOTE(review): the signature line and listing line 342 are absent;
// presumably this is ThreadTaskTag::onTaskPushed() and the missing statement
// updates mnTasksWorking - confirm against the full file.
340{
// Counter access is serialised by the tag's own mutex.
341 std::scoped_lock< std::mutex > aGuard( maMutex );
343 assert( mnTasksWorking < 65536 ); // sanity checking
344}
345
// NOTE(review): the signature line and listing line 349 are absent;
// presumably this is ThreadTaskTag::onTaskWorkerDone() and the missing
// statement updates mnTasksWorking - confirm against the full file.
347{
348 std::scoped_lock< std::mutex > aGuard( maMutex );
350 assert(mnTasksWorking >= 0);
// Once the counter is zero, wake everything waiting on maTasksComplete.
351 if (mnTasksWorking == 0)
352 maTasksComplete.notify_all();
353}
354
// NOTE(review): the signature line is absent from this listing; presumably
// this is ThreadTaskTag::isDone() - confirm against the full file.
// Returns true when mnTasksWorking is zero, read under the tag's mutex.
356{
357 std::scoped_lock< std::mutex > aGuard( maMutex );
358 return mnTasksWorking == 0;
359}
360
// NOTE(review): the signature line is absent from this listing; presumably
// this is ThreadTaskTag::waitUntilDone() - confirm against the full file.
// Blocks on maTasksComplete until mnTasksWorking is no longer positive. Each
// wait carries a timeout: expiry trips an assert in DBG_UTIL builds without
// NDEBUG, and throws std::runtime_error otherwise.
362{
363 std::unique_lock< std::mutex > aGuard( maMutex );
// Re-check the counter after every wake-up.
364 while( mnTasksWorking > 0 )
365 {
366#if defined DBG_UTIL && !defined NDEBUG
367 // 10 minute timeout in debug mode, unless the code is built with
368 // sanitizers or debugged in valgrind or gdb, in which case the threads
369 // should not time out in the middle of a debugging session
// maxTimeout is in seconds (it is passed to std::chrono::seconds below).
370 int maxTimeout = 10 * 60;
// 30 minutes when runtime optimizations are disabled.
371#if !ENABLE_RUNTIME_OPTIMIZATIONS
372 maxTimeout = 30 * 60;
373#endif
// 30 minutes when running under valgrind.
374#if defined HAVE_VALGRIND_HEADERS
375 if( RUNNING_ON_VALGRIND )
376 maxTimeout = 30 * 60;
377#endif
// 300 minutes when a debugger is attached; this check comes last and so
// overrides the values above.
378 if( isDebuggerAttached())
379 maxTimeout = 300 * 60;
380 std::cv_status result = maTasksComplete.wait_for(
381 aGuard, std::chrono::seconds( maxTimeout ));
382 assert(result != std::cv_status::timeout);
383#else
384 // 10 minute timeout in production so the app eventually throws some kind of error
385 if (maTasksComplete.wait_for(
386 aGuard, std::chrono::seconds( 10 * 60 )) == std::cv_status::timeout)
387 throw std::runtime_error("timeout waiting for threadpool tasks");
388#endif
389 }
390}
391
392} // namespace comphelper
393
394/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
virtual void execute() override
Definition: threadpool.cxx:69
A very basic thread-safe thread pool implementation.
Definition: threadpool.hxx:45
static ThreadPool & getSharedOptimalPool()
returns a pointer to a shared pool with optimal thread count for the CPU
Definition: threadpool.cxx:129
void waitUntilDone(const std::shared_ptr< ThreadTaskTag > &, bool bJoin=true)
Wait until all queued tasks associated with the tag are completed.
Definition: threadpool.cxx:264
std::size_t const mnMaxWorkers
Definition: threadpool.hxx:104
static std::shared_ptr< ThreadTaskTag > createThreadTaskTag()
Definition: threadpool.cxx:301
void joinThreadsIfIdle()
join all threads if there are no tasks presently.
Definition: threadpool.cxx:292
void pushTask(std::unique_ptr< ThreadTask > pTask)
push a new task onto the work queue
Definition: threadpool.cxx:214
std::condition_variable maTasksChanged
Definition: threadpool.hxx:102
std::size_t mnBusyWorkers
Definition: threadpool.hxx:105
std::vector< rtl::Reference< ThreadWorker > > maWorkers
Definition: threadpool.hxx:107
static std::size_t getPreferredConcurrency()
returns a configurable max-concurrency limit to avoid spawning an unnecessarily large number of threa...
Definition: threadpool.cxx:134
friend class ThreadWorker
Definition: threadpool.hxx:88
std::vector< std::unique_ptr< ThreadTask > > maTasks
Definition: threadpool.hxx:106
static bool isTaskTagDone(const std::shared_ptr< ThreadTaskTag > &)
Definition: threadpool.cxx:306
ThreadPool(std::size_t nWorkers)
Definition: threadpool.cxx:96
void shutdown()
wait until all work is completed, then join all threads
Definition: threadpool.cxx:157
bool isIdle() const
return true if there are no queued or worked-on tasks
Definition: threadpool.hxx:76
void shutdownLocked(std::unique_lock< std::mutex > &)
Definition: threadpool.cxx:166
std::unique_ptr< ThreadTask > popWorkLocked(std::unique_lock< std::mutex > &rGuard, bool bWait)
Pop a work task.
Definition: threadpool.cxx:233
std::mutex maMutex
signalled when all in-progress tasks are complete
Definition: threadpool.hxx:101
std::condition_variable maTasksComplete
Definition: threadpool.cxx:47
void exec()
execute this task
Definition: threadpool.cxx:316
virtual void doWork()=0
override to get your task performed by the pool
ThreadTask(std::shared_ptr< ThreadTaskTag > pTag)
Definition: threadpool.cxx:311
Thread(char const *name)
#define SAL_WARN(area, stream)
bool isDebuggerAttached()
Returns true if the process is running with a debugger attached.
static thread_local bool gbIsWorkerThread
prevent waiting for a task from inside a task
Definition: threadpool.cxx:39
static PropertyMapEntry const * find(const rtl::Reference< PropertySetInfo > &mxInfo, const OUString &aName) noexcept
std::mutex mutex
Definition: random.cxx:41
Any result