LibreOffice Module comphelper (master)
parallelsort.hxx
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
/*
 * This file is part of the LibreOffice project.
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 */

#ifndef INCLUDED_COMPHELPER_PARALLELSORT_HXX
#define INCLUDED_COMPHELPER_PARALLELSORT_HXX

#include <comphelper/threadpool.hxx>
#include <tools/cpuid.hxx>

#include <memory>
#include <iterator>
#include <thread>
#include <algorithm>
#include <cmath>
#include <random>
#include <functional>
#include <iostream>
#include <chrono>
#include <cassert> // assert() is used below
#include <cstdint> // uint8_t is used below

namespace comphelper
{
const size_t nThreadCountGlobal = std::thread::hardware_concurrency();
const bool bHyperThreadingActive = cpuid::hasHyperThreading();
static comphelper::ThreadPool& rTPool(comphelper::ThreadPool::getSharedOptimalPool());

static thread_local std::mt19937 aGenerator{ std::random_device{}() };

#define PARALLELSORT_ENABLEPZ 0

namespace
{
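// Tiny scoped timer used for ad-hoc profiling of the sort phases below.
// With PARALLELSORT_ENABLEPZ set to 0 it compiles down to a no-op.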
class ProfileZone
{
public:
#if PARALLELSORT_ENABLEPZ
    ProfileZone(const char* pTag)
        : maTag(pTag)
        , maStart(std::chrono::steady_clock::now())
        , mbFinished(false)
    {
    }

    ~ProfileZone()
    {
        if (!mbFinished)
            showTimeElapsed();
    }

    void stop()
    {
        showTimeElapsed();
        mbFinished = true;
    }
#else
    ProfileZone(const char* /*pTag*/)
        : mbDummy(true)
    {
    }

    void stop()
    {
        // Avoid loplugin:staticmethods, loplugin:staticaccess errors
        (void)mbDummy;
    }
#endif

private:
#if PARALLELSORT_ENABLEPZ

    void showTimeElapsed()
    {
        auto end = std::chrono::steady_clock::now();
        size_t elapsed
            = std::chrono::duration_cast<std::chrono::milliseconds>(end - maStart).count();
        std::cout << maTag << " : " << elapsed << " ms" << std::endl << std::flush;
    }

    std::string maTag;
    std::chrono::steady_clock::time_point maStart;
    bool mbFinished;
#else
    bool mbDummy;

#endif
};

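// Small helper that pushes std::function tasks to the shared comphelper
// thread pool under a common tag, so that wait() can block until all of the
// tasks submitted through this runner have completed.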
class ParallelRunner
{
    class Executor final : public comphelper::ThreadTask
    {
    public:
        Executor(const std::shared_ptr<comphelper::ThreadTaskTag>& rTag,
                 std::function<void()> aFunc)
            : comphelper::ThreadTask(rTag)
            , maFunc(std::move(aFunc))
        {
        }

        virtual void doWork() override { maFunc(); }

    private:
        const std::function<void()> maFunc;
    };

public:
    ParallelRunner() { maTag = comphelper::ThreadPool::createThreadTaskTag(); }

    void enqueue(std::function<void()> aFunc)
    {
        rTPool.pushTask(std::make_unique<Executor>(maTag, aFunc));
    }

    void wait() { rTPool.waitUntilDone(maTag, false); }

private:
    std::shared_ptr<comphelper::ThreadTaskTag> maTag;
};

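// Upper limit on the number of bins (and splitter-tree slots) used below.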
constexpr size_t nMaxTreeArraySize = 64;

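// Returns the largest power of two that is <= nNum, capped at
// nMaxTreeArraySize (for example 6 -> 4, 64 -> 64, 1000 -> 64).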
size_t lcl_round_down_pow2(size_t nNum)
{
    size_t nPow2;
    for (nPow2 = 1; nPow2 <= nNum; nPow2 <<= 1)
        ;
    return std::min((nPow2 >> 1), nMaxTreeArraySize);
}

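// Draws nSamples random elements from [aBegin, aEnd) using a partial
// Fisher-Yates shuffle: each pick is swapped to the shrinking tail of the
// range before being copied to pSamples, so no position is selected twice.
// Note that this reorders elements within the input range.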
template <class RandItr> struct Sampler
{
    using ValueType = typename std::iterator_traits<RandItr>::value_type;

    static void sample(RandItr aBegin, RandItr aEnd, ValueType* pSamples, size_t nSamples,
                       size_t /*nParallelism*/)
    {
        ProfileZone aZone("\tsample()");
        assert(aBegin <= aEnd);
        size_t nLen = static_cast<std::size_t>(aEnd - aBegin);
        assert(std::mt19937::max() >= nLen);

        for (size_t nIdx = 0; nIdx < nSamples; ++nIdx)
        {
            size_t nSel = aGenerator() % nLen--;
            using namespace std;
            swap(*(aBegin + nSel), *(aBegin + nLen));
            pSamples[nIdx] = *(aBegin + nLen);
        }
    }
};

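// Distributes elements into bins delimited by the sorted splitter samples.
// The splitters are kept in an implicit binary search tree (maDividers) so
// that findBin() needs only log2(nBins) comparisons per element, and the
// comparison results are folded into the index arithmetic instead of branches.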
template <class RandItr, class Compare> class Binner
{
    using ValueType = typename std::iterator_traits<RandItr>::value_type;

    const size_t mnTreeArraySize;
    const size_t mnDividers;
    constexpr static size_t mnMaxStaticSize = 1024 * 50;
    uint8_t maLabels[mnMaxStaticSize];
    ValueType maDividers[nMaxTreeArraySize];
    std::unique_ptr<uint8_t[]> pLabels;
    size_t maSepBinEnds[nMaxTreeArraySize * nMaxTreeArraySize];
    bool mbThreaded;

public:
    size_t maBinEnds[nMaxTreeArraySize];

    Binner(const ValueType* pSamples, size_t nSamples, size_t nBins, bool bThreaded)
        : mnTreeArraySize(lcl_round_down_pow2(nBins))
        , mnDividers(mnTreeArraySize - 1)
        , mbThreaded(bThreaded)
    {
        assert((nSamples % mnTreeArraySize) == 0);
        assert(mnTreeArraySize <= nMaxTreeArraySize);
        std::fill(maBinEnds, maBinEnds + mnTreeArraySize, 0);
        std::fill(maSepBinEnds, maSepBinEnds + mnTreeArraySize * mnTreeArraySize, 0);
        fillTreeArray(1, pSamples, pSamples + nSamples);
    }

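    // Recursively stores the median splitter of [pLow, pHigh) at heap position
    // nPos; children of node nPos live at 2*nPos and 2*nPos + 1, matching the
    // descent performed by findBin().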
    void fillTreeArray(size_t nPos, const ValueType* pLow, const ValueType* pHigh)
    {
        assert(pLow <= pHigh);
        const ValueType* pMid = pLow + (pHigh - pLow) / 2;
        maDividers[nPos] = *pMid;

        if (2 * nPos < mnDividers) // So that 2*nPos < mnTreeArraySize
        {
            fillTreeArray(2 * nPos, pLow, pMid);
            fillTreeArray(2 * nPos + 1, pMid + 1, pHigh);
        }
    }

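    // Walks down the implicit splitter tree: at each level the comparison
    // result (0 or 1) picks the left or right child. The final leaf index
    // minus mnTreeArraySize is the bin number in [0, mnTreeArraySize).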
    constexpr inline size_t findBin(const ValueType& rVal, Compare& aComp)
    {
        size_t nIdx = 1;
        while (nIdx <= mnDividers)
            nIdx = ((nIdx << 1) + aComp(maDividers[nIdx], rVal));
        return (nIdx - mnTreeArraySize);
    }

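    // Assigns a bin label to every element. In threaded mode the range is
    // split into mnTreeArraySize strided slices handled by the thread pool;
    // each slice counts its hits per bin in a private row of maSepBinEnds,
    // the rows are summed into maBinEnds, and the counts are finally turned
    // into per-bin start offsets for bin() to use as write cursors.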
    void label(const RandItr aBegin, const RandItr aEnd, Compare& aComp)
    {
        ProfileZone aZoneSetup("\tlabel():setup");
        size_t nLen = static_cast<std::size_t>(aEnd - aBegin);
        if (nLen > mnMaxStaticSize)
            pLabels = std::make_unique<uint8_t[]>(nLen);
        uint8_t* pLabelsRaw = (nLen > mnMaxStaticSize) ? pLabels.get() : maLabels;
        aZoneSetup.stop();
        ProfileZone aZoneFindBins("\tFindBins()");
        if (mbThreaded)
        {
            ParallelRunner aPRunner;
            const size_t nBins = mnTreeArraySize;
            for (size_t nTIdx = 0; nTIdx < nBins; ++nTIdx)
            {
                aPRunner.enqueue([this, nTIdx, nBins, nLen, aBegin, pLabelsRaw, &aComp] {
                    ProfileZone aZoneIn("\t\tFindBinsThreaded()");
                    size_t nBinEndsStartIdx = nTIdx * mnTreeArraySize;
                    size_t* pBinEnds = maSepBinEnds + nBinEndsStartIdx;
                    size_t aBinEndsF[nMaxTreeArraySize] = { 0 };
                    for (size_t nIdx = nTIdx; nIdx < nLen; nIdx += nBins)
                    {
                        size_t nBinIdx = findBin(*(aBegin + nIdx), aComp);
                        pLabelsRaw[nIdx] = static_cast<uint8_t>(nBinIdx);
                        ++aBinEndsF[nBinIdx];
                    }

                    for (size_t nIdx = 0; nIdx < mnTreeArraySize; ++nIdx)
                        pBinEnds[nIdx] = aBinEndsF[nIdx];
                });
            }

            aPRunner.wait();

            // Populate maBinEnds from maSepBinEnds
            for (size_t nTIdx = 0; nTIdx < mnTreeArraySize; ++nTIdx)
            {
                for (size_t nSepIdx = 0; nSepIdx < mnTreeArraySize; ++nSepIdx)
                    maBinEnds[nTIdx] += maSepBinEnds[nSepIdx * mnTreeArraySize + nTIdx];
            }
        }
        else
        {
            uint8_t* pLabel = pLabelsRaw;
            for (RandItr aItr = aBegin; aItr != aEnd; ++aItr)
            {
                size_t nBinIdx = findBin(*aItr, aComp);
                *pLabel++ = nBinIdx;
                ++maBinEnds[nBinIdx];
            }
        }

        aZoneFindBins.stop();

        size_t nSum = 0;
        // Store each bin's starting position in maBinEnds array for now.
        for (size_t nIdx = 0; nIdx < mnTreeArraySize; ++nIdx)
        {
            size_t nSize = maBinEnds[nIdx];
            maBinEnds[nIdx] = nSum;
            nSum += nSize;
        }

        // Once bin() has run, maBinEnds will hold the end position of each bin.
    }

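    // Scatters the labelled elements into pOut. Every write advances the
    // corresponding cursor in maBinEnds, so on return maBinEnds[i] holds the
    // end offset of bin i.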
    void bin(const RandItr aBegin, const RandItr aEnd, ValueType* pOut)
    {
        ProfileZone aZone("\tbin()");
        const size_t nLen = static_cast<std::size_t>(aEnd - aBegin);
        uint8_t* pLabelsRaw = (nLen > mnMaxStaticSize) ? pLabels.get() : maLabels;
        size_t nIdx;
        for (nIdx = 0; nIdx < nLen; ++nIdx)
        {
            pOut[maBinEnds[pLabelsRaw[nIdx]]++] = *(aBegin + nIdx);
        }
    }
};

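// Parallel sample sort over a random-access range, seemingly in the spirit of
// "super scalar sample sort" (hence the name):
//  1. ranges below nBaseCaseSize fall back to std::stable_sort;
//  2. a small random sample is drawn and sorted to obtain bin splitters;
//  3. every element is labelled with its bin and scattered into a temporary
//     buffer (Binner), optionally using the thread pool;
//  4. each bin is sorted independently, again optionally in parallel;
//  5. the concatenated bins are moved back into the input range.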
template <class RandItr, class Compare = std::less<>>
void s3sort(const RandItr aBegin, const RandItr aEnd, Compare aComp = Compare(),
            bool bThreaded = true)
{
    static size_t nThreadCount = nThreadCountGlobal;

    constexpr size_t nBaseCaseSize = 1024;
    const std::size_t nLen = static_cast<std::size_t>(aEnd - aBegin);
    if (nLen < nBaseCaseSize)
    {
        std::stable_sort(aBegin, aEnd, aComp);
        return;
    }

    using ValueType = typename std::iterator_traits<RandItr>::value_type;
    auto pOut = std::make_unique<ValueType[]>(nLen);

    const size_t nBins = lcl_round_down_pow2(nThreadCount);
    const size_t nOverSamplingFactor = std::max(1.0, std::sqrt(static_cast<double>(nLen) / 64));
    const size_t nSamples = nOverSamplingFactor * nBins;
    auto aSamples = std::make_unique<ValueType[]>(nSamples);
    ProfileZone aZoneSampleAnsSort("SampleAndSort");
    // Select samples and sort them
    Sampler<RandItr>::sample(aBegin, aEnd, aSamples.get(), nSamples, nBins);
    std::sort(aSamples.get(), aSamples.get() + nSamples, aComp);
    aZoneSampleAnsSort.stop();

    if (!aComp(aSamples[0], aSamples[nSamples - 1]))
    {
        // All samples are equal, fall back to standard sort.
        std::sort(aBegin, aEnd, aComp);
        return;
    }

    ProfileZone aZoneBinner("Binner");
    // Create and populate the bins in pOut from the input range.
    Binner<RandItr, Compare> aBinner(aSamples.get(), nSamples, nBins, bThreaded);
    aBinner.label(aBegin, aEnd, aComp);
    aBinner.bin(aBegin, aEnd, pOut.get());
    aZoneBinner.stop();

    ProfileZone aZoneSortBins("SortBins");
    ValueType* pOutRaw = pOut.get();
    if (bThreaded)
    {
        ParallelRunner aPRunner;
        // Sort the bins separately.
        for (size_t nBinIdx = 0, nBinStart = 0; nBinIdx < nBins; ++nBinIdx)
        {
            size_t nBinEnd = aBinner.maBinEnds[nBinIdx];
            aPRunner.enqueue([pOutRaw, nBinStart, nBinEnd, &aComp] {
                std::sort(pOutRaw + nBinStart, pOutRaw + nBinEnd, aComp);
            });

            nBinStart = nBinEnd;
        }

        aPRunner.wait();
    }
    else
    {
        for (size_t nBinIdx = 0, nBinStart = 0; nBinIdx < nBins; ++nBinIdx)
        {
            auto nBinEnd = aBinner.maBinEnds[nBinIdx];
            std::sort(pOutRaw + nBinStart, pOutRaw + nBinEnd, aComp);
            nBinStart = nBinEnd;
        }
    }

    aZoneSortBins.stop();

    // Move the sorted result back to the range specified by the input iterators.
    std::move(pOutRaw, pOutRaw + nLen, aBegin);
}

} // anonymous namespace

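/** Sorts the random-access range [aBegin, aEnd) using the parallel sample sort
    implemented above; small or degenerate inputs are sorted single-threaded.

    Illustrative usage (hypothetical caller code, not part of this header):

        std::vector<double> aValues{ 3.5, 1.25, 2.0, 0.5 };
        comphelper::parallelSort(aValues.begin(), aValues.end());
        // Custom comparator, e.g. descending order:
        comphelper::parallelSort(aValues.begin(), aValues.end(), std::greater<>());
*/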
template <class RandItr, class Compare = std::less<>>
void parallelSort(const RandItr aBegin, const RandItr aEnd, Compare aComp = Compare())
{
    assert(aBegin <= aEnd);
    s3sort(aBegin, aEnd, aComp);
}

} // namespace comphelper

#endif // INCLUDED_COMPHELPER_PARALLELSORT_HXX

/* vim:set shiftwidth=4 softtabstop=4 expandtab: */