LibreOffice Module sc (master) 1
datastream.cxx
Go to the documentation of this file.
1/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2/*
3 * This file is part of the LibreOffice project.
4 *
5 * This Source Code Form is subject to the terms of the Mozilla Public
6 * License, v. 2.0. If a copy of the MPL was not distributed with this
7 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8 */
9
#include <datastream.hxx>
#include <datastreamgettime.hxx>

#include <com/sun/star/frame/XLayoutManager.hpp>
#include <osl/conditn.hxx>
#include <osl/time.h>
#include <salhelper/thread.hxx>
#include <sfx2/viewfrm.hxx>
#include <tools/stream.hxx>
#include <vcl/svapp.hxx>
#include <docsh.hxx>
#include <tabvwsh.hxx>
#include <viewdata.hxx>
#include <stringutil.hxx>
#include <documentlinkmgr.hxx>
#include <o3tl/enumarray.hxx>

#include <officecfg/Office/Calc.hxx>

#include <orcus/csv_parser.hpp>

#include <atomic>
#include <chrono>
#include <queue>
33
34namespace com::sun::star::ui { class XUIElement; }
35
36namespace sc {
37
39
41{
42 return fTimes[ nIdx ];
43}
44
45namespace {
46
/**
 * Return a timestamp in seconds (with sub-second resolution) suitable for
 * measuring elapsed time.
 *
 * The callers visible in this file only ever subtract two readings (debug
 * timings and "time since last refresh"), so the absolute epoch does not
 * matter.  A monotonic clock is used so that a wall-clock adjustment between
 * two readings cannot produce a negative or wildly large interval.
 *
 * @return seconds since the steady clock's (unspecified) epoch.
 */
double getNow()
{
    using FractionalSeconds = std::chrono::duration<double>;
    const auto aSinceEpoch = std::chrono::steady_clock::now().time_since_epoch();
    return std::chrono::duration_cast<FractionalSeconds>(aSinceEpoch).count();
}
53
54class CSVHandler
55{
56 DataStream::Line& mrLine;
57 size_t mnColCount;
58 size_t mnCols;
59 const char* mpLineHead;
60
61public:
62 CSVHandler( DataStream::Line& rLine, size_t nColCount ) :
63 mrLine(rLine), mnColCount(nColCount), mnCols(0), mpLineHead(rLine.maLine.getStr()) {}
64
65 static void begin_parse() {}
66 static void end_parse() {}
67 static void begin_row() {}
68 static void end_row() {}
69
70 void cell(std::string_view s, bool /*transient*/)
71 {
72 if (mnCols >= mnColCount)
73 return;
74
75 DataStream::Cell aCell;
76 if (ScStringUtil::parseSimpleNumber(s.data(), s.size(), '.', ',', aCell.mfValue))
77 {
78 aCell.mbValue = true;
79 }
80 else
81 {
82 aCell.mbValue = false;
83 aCell.maStr.Pos = std::distance(mpLineHead, s.data());
84 aCell.maStr.Size = s.size();
85 }
86 mrLine.maCells.push_back(aCell);
87
88 ++mnCols;
89 }
90};
91
92}
93
94namespace datastreams {
95
97{
98 std::unique_ptr<SvStream> mpStream;
99 size_t mnColCount;
100 std::atomic<bool> mbTerminate;
101
102 std::queue<DataStream::LinesType> maPendingLines;
103 std::queue<DataStream::LinesType> maUsedLines;
104 std::mutex maMtxLines;
105
106 osl::Condition maCondReadStream;
107 osl::Condition maCondConsume;
108
109 orcus::csv::parser_config maConfig;
110
111public:
112
113 ReaderThread(std::unique_ptr<SvStream> pData, size_t nColCount):
114 Thread("ReaderThread"),
115 mpStream(std::move(pData)),
116 mnColCount(nColCount),
117 mbTerminate(false)
118 {
119 maConfig.delimiters.push_back(',');
120 maConfig.text_qualifier = '"';
121 }
122
124 {
125 return mbTerminate.load();
126 }
127
129 {
130 mbTerminate.store(true);
131 }
132
134 {
136 maCondReadStream.set();
137 }
138
140 {
141 maCondConsume.wait();
142 maCondConsume.reset();
143 }
144
146 {
147 auto pLines = std::move(maPendingLines.front());
148 maPendingLines.pop();
149 return pLines;
150 }
151
153 {
154 if (maPendingLines.size() <= 4)
155 maCondReadStream.set(); // start producer again
156 }
157
158 bool hasNewLines() const
159 {
160 return !maPendingLines.empty();
161 }
162
164 {
165 maUsedLines.push(std::move(pLines));
166 }
167
/// Give callers access to maMtxLines, the mutex that execute() locks around
/// its accesses to maUsedLines and maPendingLines.
168 std::mutex& getLinesMutex()
169 {
170 return maMtxLines;
171 }
172
173private:
/// Thread body: repeatedly fill a batch of lines from the stream, parse each
/// line as CSV, and queue the batch in maPendingLines for the consumer.
/// The loop runs until isTerminateRequested() returns true.
174 virtual void execute() override
175 {
176 while (!isTerminateRequested())
177 {
178 std::optional<DataStream::LinesType> oLines;
// maMtxLines protects maUsedLines / maPendingLines; it is released while
// reading and parsing so the consumer is not blocked by stream I/O.
179 std::unique_lock aGuard(maMtxLines);
180
181 if (!maUsedLines.empty())
182 {
183 // Re-use lines from previous runs.
184 oLines = std::move(maUsedLines.front());
185 maUsedLines.pop();
186 aGuard.unlock(); // unlock
187 }
188 else
189 {
190 aGuard.unlock(); // unlock
// No recycled batch available: create a fresh batch of 10 lines.
191 oLines.emplace(10);
192 }
193
194 // Read & store new lines from stream.
195 for (DataStream::Line & rLine : *oLines)
196 {
// Drop cells left over from a previous use of this Line object.
197 rLine.maCells.clear();
// The return value of ReadLine() is not checked here.
198 mpStream->ReadLine(rLine.maLine);
199 CSVHandler aHdl(rLine, mnColCount);
200 orcus::csv_parser<CSVHandler> parser(rLine.maLine, aHdl, maConfig);
201 parser.parse();
202 }
203
204 aGuard.lock(); // lock
// Back-pressure: while 8 or more batches are pending (and no termination
// was requested), wait on maCondReadStream with the mutex released.
205 while (!isTerminateRequested() && maPendingLines.size() >= 8)
206 {
207 // pause reading for a bit
208 aGuard.unlock(); // unlock
209 maCondReadStream.wait();
210 maCondReadStream.reset();
211 aGuard.lock(); // lock
212 }
// Publish the batch and signal maCondConsume.
213 maPendingLines.push(std::move(*oLines));
214 maCondConsume.set();
// NOTE(review): the statement controlled by this 'if' (original line 216) is
// not present in this view of the file.
215 if (!mpStream->good())
217 }
218 }
219};
220
221}
222
223DataStream::Cell::Cell() : mfValue(0.0), mbValue(true) {}
224
225DataStream::Cell::Cell( const Cell& r ) : mbValue(r.mbValue)
226{
227 if (r.mbValue)
228 mfValue = r.mfValue;
229 else
230 {
231 maStr.Pos = r.maStr.Pos;
232 maStr.Size = r.maStr.Size;
233 }
234}
235
237{
238 ScViewData* pViewData = ScDocShell::GetViewData();
239 if (!pViewData)
240 return;
241
242 css::uno::Reference< css::frame::XFrame > xFrame =
244 if (!xFrame.is())
245 return;
246
247 css::uno::Reference< css::beans::XPropertySet > xPropSet(xFrame, css::uno::UNO_QUERY);
248 if (!xPropSet.is())
249 return;
250
251 css::uno::Reference< css::frame::XLayoutManager > xLayoutManager;
252 xPropSet->getPropertyValue("LayoutManager") >>= xLayoutManager;
253 if (!xLayoutManager.is())
254 return;
255
256 static constexpr OUStringLiteral sResourceURL( u"private:resource/toolbar/datastreams" );
257 css::uno::Reference< css::ui::XUIElement > xUIElement = xLayoutManager->getElement(sResourceURL);
258 if (!xUIElement.is())
259 {
260 xLayoutManager->createElement( sResourceURL );
261 xLayoutManager->showElement( sResourceURL );
262 }
263}
264
266 ScDocShell *pShell, const OUString& rURL, const ScRange& rRange,
267 sal_Int32 nLimit, MoveType eMove, sal_uInt32 nSettings)
268{
269 DataStream* pLink = new DataStream(pShell, rURL, rRange, nLimit, eMove, nSettings);
271 rMgr.setDataStream(pLink);
272 return pLink;
273}
274
// Construct a data stream bound to pShell. The object starts out not running
// (mbRunning is false); the import timer is wired to ImportTimerHdl and the
// remaining parameters are applied through Decode().
// pShell is dereferenced in the initializer list without a null check.
// NOTE(review): several initializers (original lines 279-280, 283, 285-286)
// and original line 292 are not present in this view of the file.
275DataStream::DataStream(ScDocShell *pShell, const OUString& rURL, const ScRange& rRange,
276 sal_Int32 nLimit, MoveType eMove, sal_uInt32 nSettings) :
277 mpDocShell(pShell),
278 maDocAccess(mpDocShell->GetDocument()),
281 mbRunning(false),
// Initialized to false here, but Decode() below sets it to true.
282 mbValuesInLine(false),
284 mnLinesCount(0),
287 mnCurRow(0),
288 maImportTimer("sc DataStream maImportTimer"),
289 mbIsFirst(true),
290 mbIsUpdate(false)
291{
// Each timer invocation is routed to ImportTimerHdl.
293 maImportTimer.SetInvokeHandler( LINK(this, DataStream, ImportTimerHdl) );
294
295 Decode(rURL, rRange, nLimit, eMove, nSettings);
296}
297
299{
300 if (mbRunning)
301 StopImport();
302
303 if (mxReaderThread.is())
304 {
305 mxReaderThread->endThread();
306 mxReaderThread->join();
307 }
308 moLines.reset();
309}
310
312{
313 if (!moLines || mnLinesCount >= moLines->size())
314 {
315 mnLinesCount = 0;
316 if (mxReaderThread->isTerminateRequested())
317 return Line();
318
319 std::unique_lock aGuard(mxReaderThread->getLinesMutex());
320 if (moLines)
321 {
322 mxReaderThread->pushUsedLines(std::move(*moLines));
323 moLines.reset();
324 }
325
326 while (!mxReaderThread->hasNewLines())
327 {
328 aGuard.unlock(); // unlock
329 mxReaderThread->waitForNewLines();
330 aGuard.lock(); // lock
331 }
332
333 moLines = mxReaderThread->popNewLines();
334 mxReaderThread->resumeReadStream();
335 }
336 return moLines->at(mnLinesCount++);
337}
338
340{
341 ScRange aRange = maStartRange;
342 aRange.aEnd = maEndRange.aEnd;
343 return aRange;
344}
345
// Store the stream settings and derive the start/end ranges from them.
//   rURL      - stored in msURL.
//   rRange    - target range; only its first row is kept (see below).
//   nLimit    - 0: end row is the document's last row; > 0: end row is
//               start row + nLimit - 1, clamped to the last row; < 0: the
//               start row of maEndRange is left equal to the start range's.
//   eMove     - stored in both meMove and meOrigMove.
//   nSettings - stored in mnSettings.
346void DataStream::Decode(const OUString& rURL, const ScRange& rRange,
347 sal_Int32 nLimit, MoveType eMove, const sal_uInt32 nSettings)
348{
349 msURL = rURL;
350 meMove = eMove;
351 meOrigMove = eMove;
352 mnSettings = nSettings;
353
354 mbValuesInLine = true; // always true.
355
// Importing starts at the first row of the requested range.
356 mnCurRow = rRange.aStart.Row();
357
358 ScRange aRange = rRange;
359 if (aRange.aStart.Row() != aRange.aEnd.Row())
360 // We only allow this range to be one row tall.
361 aRange.aEnd.SetRow(aRange.aStart.Row());
362
// Both ranges start as the one-row range; only maEndRange is moved below.
363 maStartRange = aRange;
364 maEndRange = aRange;
365 const auto & rDoc = mpDocShell->GetDocument();
366 if (nLimit == 0)
367 {
368 // Unlimited
369 maEndRange.aStart.SetRow(rDoc.MaxRow());
370 }
371 else if (nLimit > 0)
372 {
373 // Limited.
374 maEndRange.aStart.IncRow(nLimit-1);
// Clamp to the last row of the document.
375 if (maEndRange.aStart.Row() > rDoc.MaxRow())
376 maEndRange.aStart.SetRow(rDoc.MaxRow());
377 }
378
// NOTE(review): original line 379 is not present in this view of the file;
// whatever statement it holds is not documented here.
380}
381
383{
384 if (mbRunning)
385 return;
386
387 if (!mxReaderThread.is())
388 {
389 std::unique_ptr<SvStream> pStream(new SvFileStream(msURL, StreamMode::READ));
391 mxReaderThread->launch();
392 }
393 mbRunning = true;
395
397}
398
400{
401 if (!mbRunning)
402 return;
403
404 mbRunning = false;
405 Refresh();
407}
408
410{
412}
413
415{
417
418 double fStart = getNow();
419
420 // Hard recalc will repaint the grid area.
423
424 fTimes[ DebugTime::Recalc ] = getNow() - fStart;
425
426 mfLastRefreshTime = getNow();
428}
429
431{
432 switch (meMove)
433 {
434 case RANGE_DOWN:
435 {
436 if (mnCurRow == maEndRange.aStart.Row())
437 meMove = MOVE_UP;
438 }
439 break;
440 case MOVE_UP:
441 {
442 mbIsUpdate = true;
443 // Remove the top row and shift the remaining rows upward. Then
444 // insert a new row at the end row position.
445 ScRange aRange = maStartRange;
446 aRange.aEnd = maEndRange.aEnd;
448 }
449 break;
450 case MOVE_DOWN:
451 {
452 mbIsUpdate = true;
453 // Remove the end row and shift the remaining rows downward by
454 // inserting a new row at the top row.
455 ScRange aRange = maStartRange;
456 aRange.aEnd = maEndRange.aEnd;
458 }
459 break;
460 case NO_MOVE:
461 default:
462 ;
463 }
464 if(mbIsFirst && mbIsUpdate)
465 {
466 sal_Int32 nStreamTimeout = officecfg::Office::Calc::DataStream::UpdateTimeout::get();
467 maImportTimer.SetTimeout(nStreamTimeout);
468 mbIsFirst = false;
469 }
470}
471
473{
474 Line aLine = ConsumeLine();
475 if (aLine.maCells.empty() && mbRefreshOnEmptyLine)
476 {
477 // Empty line detected. Trigger refresh and discard it.
478 Refresh();
479 return;
480 }
481
482 double fStart = getNow();
483
484 MoveData();
485 {
486 SCCOL nCol = maStartRange.aStart.Col();
487 const char* pLineHead = aLine.maLine.getStr();
488 for (const Cell& rCell : aLine.maCells)
489 {
490 if (rCell.mbValue)
491 {
494 }
495 else
496 {
499 OUString(pLineHead+rCell.maStr.Pos, rCell.maStr.Size, RTL_TEXTENCODING_UTF8));
500 }
501 ++nCol;
502 }
503 }
504
505 fTimes[ DebugTime::Import ] = getNow() - fStart;
506
507 if (meMove == NO_MOVE)
508 return;
509
510 if (meMove == RANGE_DOWN)
511 {
512 ++mnCurRow;
513// mpDocShell->GetViewData().GetView()->AlignToCursor(
514// maStartRange.aStart.Col(), mnCurRow, SC_FOLLOW_JUMP);
515 }
516
517 if (getNow() - mfLastRefreshTime > 0.1 && mnLinesSinceRefresh > 200)
518 // Refresh no more frequently than every 0.1 second, and wait until at
519 // least we have processed 200 lines.
520 Refresh();
521
523}
524
526{
527 if (!mbValuesInLine)
528 // We no longer support this mode. To be deleted later.
529 return false;
530
531 ScViewData* pViewData = ScDocShell::GetViewData();
532 if (!pViewData)
533 return false;
534
535 if (pViewData->GetViewShell()->NeedsRepaint())
536 return mbRunning;
537
538 Text2Doc();
539 return mbRunning;
540}
541
542IMPL_LINK_NOARG(DataStream, ImportTimerHdl, Timer *, void)
543{
544 if (ImportData())
545 maImportTimer.Start();
546}
547
548} // namespace sc
549
550/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
basegfx::B2DPolygon maLine
static void Yield()
SCTAB Tab() const
Definition: address.hxx:283
SCROW Row() const
Definition: address.hxx:274
void SetRow(SCROW nRowP)
Definition: address.hxx:287
void IncRow(SCROW nDelta=1)
Definition: address.hxx:312
SCCOL Col() const
Definition: address.hxx:279
void DoHardRecalc()
Definition: docsh4.cxx:1530
void SetDocumentModified()
Definition: docsh.cxx:2982
const ScDocument & GetDocument() const
Definition: docsh.hxx:219
static ScViewData * GetViewData()
Definition: docsh4.cxx:2607
sc::DocumentLinkManager & GetDocLinkManager()
Definition: documen2.cxx:241
ScAddress aEnd
Definition: address.hxx:498
ScAddress aStart
Definition: address.hxx:497
static bool parseSimpleNumber(const OUString &rStr, sal_Unicode dsep, sal_Unicode gsep, sal_Unicode dsepa, double &rVal, bool bDetectScientificNumber=true)
Check if a given string is a simple decimal number (e.g.
Definition: stringutil.cxx:55
bool NeedsRepaint()
Definition: tabview3.cxx:3122
ScTabViewShell * GetViewShell() const
Definition: viewdata.hxx:357
const css::uno::Reference< css::frame::XFrame > & GetFrameInterface() const
SfxFrame & GetFrame() const
SfxViewFrame & GetViewFrame() const
void Stop()
void SetTimeout(sal_uInt64 nTimeoutMs)
void SetInvokeHandler(const Link< Timer *, void > &rLink)
virtual void Start(bool bStartTimer=true) override
Thread(char const *name)
double mfLastRefreshTime
Definition: datastream.hxx:113
size_t mnLinesSinceRefresh
Definition: datastream.hxx:112
void StartImport()
Definition: datastream.cxx:382
std::vector< Line > LinesType
Definition: datastream.hxx:62
DocumentStreamAccess maDocAccess
Definition: datastream.hxx:102
sal_uInt32 mnSettings
Definition: datastream.hxx:104
OUString msURL
Definition: datastream.hxx:103
bool mbRefreshOnEmptyLine
Definition: datastream.hxx:109
static void MakeToolbarVisible()
Definition: datastream.cxx:236
ScRange maStartRange
Definition: datastream.hxx:115
ScRange maEndRange
Definition: datastream.hxx:116
void Decode(const OUString &rURL, const ScRange &rRange, sal_Int32 nLimit, MoveType eMove, const sal_uInt32 nSettings)
Definition: datastream.cxx:346
ScDocShell * mpDocShell
Definition: datastream.hxx:101
MoveType meOrigMove
Definition: datastream.hxx:105
MoveType meMove
Definition: datastream.hxx:106
static DataStream * Set(ScDocShell *pShell, const OUString &rURL, const ScRange &rRange, sal_Int32 nLimit, MoveType eMove, sal_uInt32 nSettings)
Definition: datastream.cxx:265
void SetRefreshOnEmptyLine(bool bVal)
Definition: datastream.cxx:409
rtl::Reference< datastreams::ReaderThread > mxReaderThread
Definition: datastream.hxx:120
DataStream(const DataStream &)=delete
std::optional< LinesType > moLines
Definition: datastream.hxx:110
Line ConsumeLine()
Definition: datastream.cxx:311
size_t mnLinesCount
Definition: datastream.hxx:111
ScRange GetRange() const
Definition: datastream.cxx:339
void setDataStream(DataStream *p)
void reset()
Clear its internal state, and more importantly all the block position hints currently held.
void shiftRangeUp(const ScRange &rRange)
Pop the top row inside specified range, shift all the other rows up by one, then set the bottom row e...
void setStringCell(const ScAddress &rPos, const OUString &rStr)
void setNumericCell(const ScAddress &rPos, double fVal)
void shiftRangeDown(const ScRange &rRange)
Top the bottom row inside specified range, shift all the other rows above downward by one by insertin...
std::atomic< bool > mbTerminate
Definition: datastream.cxx:100
std::queue< DataStream::LinesType > maPendingLines
Definition: datastream.cxx:102
virtual void execute() override
Definition: datastream.cxx:174
std::queue< DataStream::LinesType > maUsedLines
Definition: datastream.cxx:103
ReaderThread(std::unique_ptr< SvStream > pData, size_t nColCount)
Definition: datastream.cxx:113
void pushUsedLines(DataStream::LinesType pLines)
Definition: datastream.cxx:163
orcus::csv::parser_config maConfig
Definition: datastream.cxx:109
DataStream::LinesType popNewLines()
Definition: datastream.cxx:145
std::unique_ptr< SvStream > mpStream
Definition: datastream.cxx:98
float u
SvBaseLink * pLink
DateTime now
std::unique_ptr< sal_Int32[]> pData
parser
CAUTION! The following defines must be in the same namespace as the respective type.
Definition: broadcast.cxx:15
IMPL_LINK_NOARG(SharedStringPoolPurge, timerHandler, Timer *, void)
static o3tl::enumarray< DebugTime, double > fTimes
Definition: datastream.cxx:38
double datastream_get_time(DebugTime nIdx)
Definition: datastream.cxx:40
std::vector< Cell > maCells
Definition: datastream.hxx:60
Reference< XFrame > xFrame
sal_Int16 SCCOL
Definition: types.hxx:21