12 #include <com/sun/star/uno/Exception.hpp>
13 #include <config_options.h>
16 #include <rtl/instance.hxx>
24 #if defined HAVE_VALGRIND_HEADERS
25 #include <valgrind/memcheck.h>
29 #define WIN32_LEAN_AND_MEAN
36 #if defined DBG_UTIL && (defined LINUX || defined _WIN32)
69 #if defined DBG_UTIL && (defined LINUX || defined _WIN32)
70 gbIsWorkerThread =
true;
72 std::unique_lock< std::mutex > aGuard( mpPool->
maMutex );
76 std::unique_ptr<ThreadTask> pTask = mpPool->
popWorkLocked( aGuard,
true );
79 std::shared_ptr<ThreadTaskTag> pTag(pTask->mpTag);
88 pTag->onTaskWorkerDone();
96 , mnMaxWorkers(nWorkers)
114 struct ThreadPoolStatic :
public rtl::StaticWithInit< std::shared_ptr< ThreadPool >,
117 std::shared_ptr< ThreadPool > operator () () {
119 return std::make_shared< ThreadPool >( nThreads );
127 return *ThreadPoolStatic::get();
132 static sal_Int32 ThreadCount = [&]()
134 const sal_Int32 nHardThreads = std::max(std::thread::hardware_concurrency(), 1U);
135 sal_Int32 nThreads = nHardThreads;
136 const char *pEnv = getenv(
"MAX_CONCURRENCY");
140 nThreads = rtl_str_toInt32(pEnv, 10);
143 nThreads = std::min(nHardThreads, nThreads);
144 return std::max<sal_Int32>(nThreads, 1);
157 std::unique_lock< std::mutex > aGuard(
maMutex );
165 std::unique_ptr<ThreadTask> pTask;
168 std::shared_ptr<ThreadTaskTag> pTag(pTask->mpTag);
170 pTag->onTaskWorkerDone();
196 while (!aWorkers.empty())
211 std::scoped_lock< std::mutex > aGuard(
maMutex );
222 pTask->mpTag->onTaskPushed();
234 std::unique_ptr<ThreadTask> pTask = std::move(
maTasks.back());
261 #if defined DBG_UTIL && (defined LINUX || defined _WIN32)
262 assert(!gbIsWorkerThread &&
"cannot wait for tasks from inside a task");
265 std::unique_lock< std::mutex > aGuard(
maMutex );
269 while (!rTag->isDone())
271 std::unique_ptr<ThreadTask> pTask =
popWorkLocked(aGuard,
false);
274 std::shared_ptr<ThreadTaskTag> pTag(pTask->mpTag);
276 pTag->onTaskWorkerDone();
281 rTag->waitUntilDone();
289 std::unique_lock< std::mutex > aGuard(
maMutex );
298 return std::make_shared<ThreadTaskTag>();
303 return pTag->isDone();
316 catch (
const std::exception &e)
318 SAL_WARN(
"comphelper",
"exception in thread worker while calling doWork(): " << e.what());
320 catch (
const css::uno::Exception &e)
322 SAL_WARN(
"comphelper",
"exception in thread worker while calling doWork(): " << e);
326 SAL_WARN(
"comphelper",
"unknown exception in thread worker while calling doWork()");
336 std::scoped_lock< std::mutex > aGuard(
maMutex );
343 std::scoped_lock< std::mutex > aGuard(
maMutex );
352 std::scoped_lock< std::mutex > aGuard(
maMutex );
358 std::unique_lock< std::mutex > aGuard(
maMutex );
361 #if defined DBG_UTIL && !defined NDEBUG
365 int maxTimeout = 10 * 60;
366 #if !ENABLE_RUNTIME_OPTIMIZATIONS
367 maxTimeout = 30 * 60;
369 #if defined HAVE_VALGRIND_HEADERS
370 if( RUNNING_ON_VALGRIND )
371 maxTimeout = 30 * 60;
374 maxTimeout = 300 * 60;
376 aGuard, std::chrono::seconds( maxTimeout ));
377 assert(result != std::cv_status::timeout);
381 aGuard, std::chrono::seconds( 10 * 60 )) == std::cv_status::timeout)
382 throw std::runtime_error(
"timeout waiting for threadpool tasks");
std::vector< std::unique_ptr< ThreadTask > > maTasks
A very basic thread-safe thread pool implementation.
static sal_Int32 getPreferredConcurrency()
returns a configurable max-concurrency limit to avoid spawning an unnecessarily large number of threa...
virtual void doWork()=0
override to get your task performed by the pool
static thread_local bool gbIsWorkerThread
prevent waiting for a task from inside a task
void waitUntilDone(const std::shared_ptr< ThreadTaskTag > &, bool bJoin=true)
Wait until all queued tasks associated with the tag are completed.
ThreadWorker(ThreadPool *pPool)
std::condition_variable maTasksComplete
const BorderLinePrimitive2D *pCandidateB assert(pCandidateA)
std::mutex maMutex
signalled when all in-progress tasks are complete
ThreadPool(sal_Int32 nWorkers)
static ThreadPool & getSharedOptimalPool()
returns a pointer to a shared pool with optimal thread count for the CPU
ThreadTask(const std::shared_ptr< ThreadTaskTag > &pTag)
bool isIdle() const
return true if there are no queued or worked-on tasks
std::condition_variable maTasksChanged
void pushTask(std::unique_ptr< ThreadTask > pTask)
push a new task onto the work queue
static bool isTaskTagDone(const std::shared_ptr< ThreadTaskTag > &)
friend class ThreadWorker
virtual void execute() override
std::vector< rtl::Reference< ThreadWorker > > maWorkers
static PropertyMapEntry const * find(rtl::Reference< PropertySetInfo > &mxInfo, const OUString &aName)
void exec()
execute this task
std::unique_ptr< ThreadTask > popWorkLocked(std::unique_lock< std::mutex > &rGuard, bool bWait)
Pop a work task.
void shutdown()
wait until all work is completed, then join all threads
static std::shared_ptr< ThreadTaskTag > createThreadTaskTag()
std::size_t mnBusyWorkers
#define SAL_WARN(area, stream)
void shutdownLocked(std::unique_lock< std::mutex > &)
std::size_t const mnMaxWorkers
bool isDebuggerAttached()
Returns true if the process is running with a debugger attached.
void joinThreadsIfIdle()
join all threads if there are no tasks presently.