12#include <com/sun/star/uno/Exception.hpp>
13#include <config_options.h>
26#if defined HAVE_VALGRIND_HEADERS
27#include <valgrind/memcheck.h>
31#define WIN32_LEAN_AND_MEAN
38#if defined DBG_UTIL && (defined LINUX || defined _WIN32)
71#if defined DBG_UTIL && (defined LINUX || defined _WIN32)
81 std::shared_ptr<ThreadTaskTag> pTag(pTask->mpTag);
90 pTag->onTaskWorkerDone();
98 , mnMaxWorkers(nWorkers)
116std::shared_ptr< ThreadPool >& GetStaticThreadPool()
118 static std::shared_ptr< ThreadPool > POOL =
122 return std::make_shared< ThreadPool >( nThreads );
131 return *GetStaticThreadPool();
136 static std::size_t ThreadCount = []()
138 const std::size_t nHardThreads = o3tl::clamp_to_unsigned<std::size_t>(
139 std::max(std::thread::hardware_concurrency(), 1U));
140 std::size_t nThreads = nHardThreads;
141 const char *pEnv = getenv(
"MAX_CONCURRENCY");
145 nThreads = o3tl::clamp_to_unsigned<std::size_t>(rtl_str_toInt32(pEnv, 10));
148 nThreads = std::min(nHardThreads, nThreads);
149 return std::max<std::size_t>(nThreads, 1);
162 std::unique_lock< std::mutex > aGuard(
maMutex );
170 std::unique_ptr<ThreadTask> pTask;
173 std::shared_ptr<ThreadTaskTag> pTag(pTask->mpTag);
175 pTag->onTaskWorkerDone();
201 while (!aWorkers.empty())
205 assert(
std::find(aWorkers.begin(), aWorkers.end(), xWorker)
216 std::scoped_lock< std::mutex > aGuard(
maMutex );
227 pTask->mpTag->onTaskPushed();
239 std::unique_ptr<ThreadTask> pTask = std::move(
maTasks.back());
266#if defined DBG_UTIL && (defined LINUX || defined _WIN32)
270 std::unique_lock< std::mutex > aGuard(
maMutex );
274 while (!rTag->isDone())
276 std::unique_ptr<ThreadTask> pTask =
popWorkLocked(aGuard,
false);
279 std::shared_ptr<ThreadTaskTag> pTag(pTask->mpTag);
281 pTag->onTaskWorkerDone();
286 rTag->waitUntilDone();
294 std::unique_lock< std::mutex > aGuard(
maMutex );
303 return std::make_shared<ThreadTaskTag>();
308 return pTag->isDone();
312 : mpTag(
std::move(xTag))
321 catch (
const std::exception &e)
323 SAL_WARN(
"comphelper",
"exception in thread worker while calling doWork(): " << e.what());
325 catch (
const css::uno::Exception &e)
327 SAL_WARN(
"comphelper",
"exception in thread worker while calling doWork(): " << e);
331 SAL_WARN(
"comphelper",
"unknown exception in thread worker while calling doWork()");
341 std::scoped_lock< std::mutex > aGuard(
maMutex );
348 std::scoped_lock< std::mutex > aGuard(
maMutex );
357 std::scoped_lock< std::mutex > aGuard(
maMutex );
363 std::unique_lock< std::mutex > aGuard(
maMutex );
366#if defined DBG_UTIL && !defined NDEBUG
370 int maxTimeout = 10 * 60;
371#if !ENABLE_RUNTIME_OPTIMIZATIONS
372 maxTimeout = 30 * 60;
374#if defined HAVE_VALGRIND_HEADERS
375 if( RUNNING_ON_VALGRIND )
376 maxTimeout = 30 * 60;
379 maxTimeout = 300 * 60;
381 aGuard, std::chrono::seconds( maxTimeout ));
382 assert(
result != std::cv_status::timeout);
386 aGuard, std::chrono::seconds( 10 * 60 )) == std::cv_status::timeout)
387 throw std::runtime_error(
"timeout waiting for threadpool tasks");
ThreadWorker(ThreadPool *pPool)
virtual void execute() override
A very basic thread-safe thread pool implementation.
static ThreadPool & getSharedOptimalPool()
returns a pointer to a shared pool with optimal thread count for the CPU
void waitUntilDone(const std::shared_ptr< ThreadTaskTag > &, bool bJoin=true)
Wait until all queued tasks associated with the tag are completed.
std::size_t const mnMaxWorkers
static std::shared_ptr< ThreadTaskTag > createThreadTaskTag()
void joinThreadsIfIdle()
join all threads if there are no tasks presently.
void pushTask(std::unique_ptr< ThreadTask > pTask)
push a new task onto the work queue
std::condition_variable maTasksChanged
std::size_t mnBusyWorkers
std::vector< rtl::Reference< ThreadWorker > > maWorkers
static std::size_t getPreferredConcurrency()
returns a configurable max-concurrency limit to avoid spawning an unnecessarily large number of threa...
friend class ThreadWorker
std::vector< std::unique_ptr< ThreadTask > > maTasks
static bool isTaskTagDone(const std::shared_ptr< ThreadTaskTag > &)
ThreadPool(std::size_t nWorkers)
void shutdown()
wait until all work is completed, then join all threads
bool isIdle() const
return true if there are no queued or worked-on tasks
void shutdownLocked(std::unique_lock< std::mutex > &)
std::unique_ptr< ThreadTask > popWorkLocked(std::unique_lock< std::mutex > &rGuard, bool bWait)
Pop a work task.
std::mutex maMutex
signalled when all in-progress tasks are complete
std::condition_variable maTasksComplete
void exec()
execute this task
virtual void doWork()=0
override to get your task performed by the pool
ThreadTask(std::shared_ptr< ThreadTaskTag > pTag)
#define SAL_WARN(area, stream)
bool isDebuggerAttached()
Returns true if the process is running with a debugger attached.
static thread_local bool gbIsWorkerThread
prevent waiting for a task from inside a task
static PropertyMapEntry const * find(const rtl::Reference< PropertySetInfo > &mxInfo, const OUString &aName) noexcept