LibreOffice Module sc (master)  1
datastream.cxx
Go to the documentation of this file.
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 /*
3  * This file is part of the LibreOffice project.
4  *
5  * This Source Code Form is subject to the terms of the Mozilla Public
6  * License, v. 2.0. If a copy of the MPL was not distributed with this
7  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8  */
9 
10 #include <datastream.hxx>
11 #include <datastreamgettime.hxx>
12 
13 #include <com/sun/star/frame/XLayoutManager.hpp>
14 #include <osl/conditn.hxx>
15 #include <osl/time.h>
16 #include <salhelper/thread.hxx>
17 #include <sfx2/viewfrm.hxx>
18 #include <tools/stream.hxx>
19 #include <vcl/svapp.hxx>
20 #include <docsh.hxx>
21 #include <tabvwsh.hxx>
22 #include <viewdata.hxx>
23 #include <stringutil.hxx>
24 #include <documentlinkmgr.hxx>
25 #include <o3tl/enumarray.hxx>
26 
27 #include <officecfg/Office/Calc.hxx>
28 
29 #include <orcus/csv_parser.hpp>
30 
31 #include <queue>
32 
33 namespace com::sun::star::ui { class XUIElement; }
34 
35 namespace sc {
36 
37 static o3tl::enumarray<DebugTime, double> fTimes { 0.0, 0.0, 0.0 };
38 
40 {
41  return fTimes[ nIdx ];
42 }
43 
44 namespace {
45 
46 double getNow()
47 {
48  TimeValue now;
49  osl_getSystemTime(&now);
50  return static_cast<double>(now.Seconds) + static_cast<double>(now.Nanosec) / 1000000000.0;
51 }
52 
53 class CSVHandler
54 {
55  DataStream::Line& mrLine;
56  size_t mnColCount;
57  size_t mnCols;
58  const char* mpLineHead;
59 
60 public:
61  CSVHandler( DataStream::Line& rLine, size_t nColCount ) :
62  mrLine(rLine), mnColCount(nColCount), mnCols(0), mpLineHead(rLine.maLine.getStr()) {}
63 
64  static void begin_parse() {}
65  static void end_parse() {}
66  static void begin_row() {}
67  static void end_row() {}
68 
69  void cell(const char* p, size_t n, bool /*transient*/)
70  {
71  if (mnCols >= mnColCount)
72  return;
73 
74  DataStream::Cell aCell;
75  if (ScStringUtil::parseSimpleNumber(p, n, '.', ',', aCell.mfValue))
76  {
77  aCell.mbValue = true;
78  }
79  else
80  {
81  aCell.mbValue = false;
82  aCell.maStr.Pos = std::distance(mpLineHead, p);
83  aCell.maStr.Size = n;
84  }
85  mrLine.maCells.push_back(aCell);
86 
87  ++mnCols;
88  }
89 };
90 
91 }
92 
93 namespace datastreams {
94 
96 {
// Stream the thread reads lines from in execute().
97  std::unique_ptr<SvStream> mpStream;
// Column limit handed to every CSVHandler created in execute().
98  size_t mnColCount;
// NOTE(review): embedded source line 99 is missing from this extract; the
// constructor initialises an mbTerminate member that is not declared in the
// visible lines.
// Held while mbTerminate is read or written.
100  osl::Mutex maMtxTerminate;
101 
// Batches parsed by execute() and waiting to be taken via popNewLines().
102  std::queue<std::unique_ptr<DataStream::LinesType>> maPendingLines;
// Batches handed back through pushUsedLines(); execute() recycles them.
103  std::queue<std::unique_ptr<DataStream::LinesType>> maUsedLines;
// Locked by execute() and, through getLinesMutex(), by the consumer while
// the two queues above are accessed.
104  osl::Mutex maMtxLines;
105 
// Set to let execute() continue after it paused because 8 or more batches
// were pending.
106  osl::Condition maCondReadStream;
// Set by execute() each time a batch has been queued.
107  osl::Condition maCondConsume;
108 
// CSV parser settings; filled in by the constructor.
109  orcus::csv::parser_config maConfig;
110 
111 public:
112 
113  ReaderThread(std::unique_ptr<SvStream> pData, size_t nColCount):
114  Thread("ReaderThread"),
115  mpStream(std::move(pData)),
116  mnColCount(nColCount),
117  mbTerminate(false)
118  {
119  maConfig.delimiters.push_back(',');
120  maConfig.text_qualifier = '"';
121  }
122 
// NOTE(review): the signature line (embedded line 123) is missing from this
// extract. Judging by the isTerminateRequested() call sites in this file this
// is presumably that method - confirm against the real source.
// Returns mbTerminate, read while holding maMtxTerminate.
124  {
125  osl::MutexGuard aGuard(maMtxTerminate);
126  return mbTerminate;
127  }
128 
// NOTE(review): the signature line (embedded line 129) is missing from this
// extract, so the method name cannot be confirmed from the visible lines.
// Sets mbTerminate to true while holding maMtxTerminate.
130  {
131  osl::MutexGuard aGuard(maMtxTerminate);
132  mbTerminate = true;
133  }
134 
// Sets maCondReadStream, which releases execute() if it is waiting on that
// condition.
// NOTE(review): embedded source line 137 is missing from this extract, so a
// statement ahead of the set() call is not visible here.
135  void endThread()
136  {
138  maCondReadStream.set();
139  }
140 
// NOTE(review): the signature line (embedded line 141) is missing from this
// extract. Presumably this is waitForNewLines(), which ConsumeLine calls -
// confirm against the real source.
// Blocks until maCondConsume is set, then resets it for the next wait.
142  {
143  maCondConsume.wait();
144  maCondConsume.reset();
145  }
146 
147  std::unique_ptr<DataStream::LinesType> popNewLines()
148  {
149  auto pLines = std::move(maPendingLines.front());
150  maPendingLines.pop();
151  return pLines;
152  }
153 
// NOTE(review): the signature line (embedded line 154) is missing from this
// extract. Presumably this is resumeReadStream(), which ConsumeLine calls -
// confirm against the real source.
// Sets maCondReadStream once no more than 4 batches are pending.
155  {
156  if (maPendingLines.size() <= 4)
157  maCondReadStream.set(); // start producer again
158  }
159 
160  bool hasNewLines() const
161  {
162  return !maPendingLines.empty();
163  }
164 
165  void pushUsedLines( std::unique_ptr<DataStream::LinesType> pLines )
166  {
167  maUsedLines.push(std::move(pLines));
168  }
169 
170  osl::Mutex& getLinesMutex()
171  {
172  return maMtxLines;
173  }
174 
175 private:
// Thread body: repeatedly fills a batch of lines from the stream, parses each
// line as CSV and queues the batch for the consumer, until termination has
// been requested.
176  virtual void execute() override
177  {
178  while (!isTerminateRequested())
179  {
180  std::unique_ptr<DataStream::LinesType> pLines;
// The queues are only touched while maMtxLines is held; the guard is
// released for the reading and parsing below.
181  osl::ResettableMutexGuard aGuard(maMtxLines);
182 
183  if (!maUsedLines.empty())
184  {
185  // Re-use lines from previous runs.
186  pLines = std::move(maUsedLines.front());
187  maUsedLines.pop();
188  aGuard.clear(); // unlock
189  }
190  else
191  {
192  aGuard.clear(); // unlock
// No recycled batch available: allocate a new one holding 10 lines.
193  pLines.reset(new DataStream::LinesType(10));
194  }
195 
196  // Read & store new lines from stream.
197  for (DataStream::Line & rLine : *pLines)
198  {
// Cells left over from a recycled batch are dropped before parsing.
199  rLine.maCells.clear();
200  mpStream->ReadLine(rLine.maLine);
201  CSVHandler aHdl(rLine, mnColCount);
202  orcus::csv_parser<CSVHandler> parser(rLine.maLine.getStr(), rLine.maLine.getLength(), aHdl, maConfig);
203  parser.parse();
204  }
205 
206  aGuard.reset(); // lock
// Back-pressure: with 8 or more batches pending, wait for maCondReadStream
// to be set before queuing another one.
207  while (!isTerminateRequested() && maPendingLines.size() >= 8)
208  {
209  // pause reading for a bit
210  aGuard.clear(); // unlock
211  maCondReadStream.wait();
212  maCondReadStream.reset();
213  aGuard.reset(); // lock
214  }
215  maPendingLines.push(std::move(pLines));
// Signal that a batch is available.
216  maCondConsume.set();
// NOTE(review): embedded source line 218 - the statement governed by the
// following if - is missing from this extract.
217  if (!mpStream->good())
219  }
220  }
221 };
222 
223 }
224 
225 DataStream::Cell::Cell() : mfValue(0.0), mbValue(true) {}
226 
227 DataStream::Cell::Cell( const Cell& r ) : mbValue(r.mbValue)
228 {
229  if (r.mbValue)
230  mfValue = r.mfValue;
231  else
232  {
233  maStr.Pos = r.maStr.Pos;
234  maStr.Size = r.maStr.Size;
235  }
236 }
237 
// NOTE(review): the signature line (embedded line 238) and the initialiser of
// xFrame (embedded line 241) are missing from this extract.
// Looks up the frame's layout manager and, if the "datastreams" toolbar
// element does not exist yet, creates and shows it. Returns early whenever an
// intermediate reference is empty.
239 {
240  css::uno::Reference< css::frame::XFrame > xFrame =
242  if (!xFrame.is())
243  return;
244 
245  css::uno::Reference< css::beans::XPropertySet > xPropSet(xFrame, css::uno::UNO_QUERY);
246  if (!xPropSet.is())
247  return;
248 
// The layout manager is obtained from the frame's "LayoutManager" property.
249  css::uno::Reference< css::frame::XLayoutManager > xLayoutManager;
250  xPropSet->getPropertyValue("LayoutManager") >>= xLayoutManager;
251  if (!xLayoutManager.is())
252  return;
253 
254  static const OUStringLiteral sResourceURL( u"private:resource/toolbar/datastreams" );
255  css::uno::Reference< css::ui::XUIElement > xUIElement = xLayoutManager->getElement(sResourceURL);
// Nothing is done when the element already exists.
256  if (!xUIElement.is())
257  {
258  xLayoutManager->createElement( sResourceURL );
259  xLayoutManager->showElement( sResourceURL );
260  }
261 }
262 
// NOTE(review): the first signature line (embedded line 263) and the line
// that defines rMgr (embedded line 268) are missing from this extract.
// Creates a DataStream with the given parameters, hands it to rMgr via
// setDataStream() and returns the same pointer.
264  ScDocShell *pShell, const OUString& rURL, const ScRange& rRange,
265  sal_Int32 nLimit, MoveType eMove, sal_uInt32 nSettings)
266 {
267  DataStream* pLink = new DataStream(pShell, rURL, rRange, nLimit, eMove, nSettings);
269  rMgr.setDataStream(pLink);
270  return pLink;
271 }
272 
// Puts every member into an idle default state, hooks ImportTimerHdl up as the
// import timer's handler and then applies the caller's settings via Decode().
// NOTE(review): embedded source lines 277, 283 and 290 are missing from this
// extract (two member initialisers and one statement in the body).
273 DataStream::DataStream(ScDocShell *pShell, const OUString& rURL, const ScRange& rRange,
274  sal_Int32 nLimit, MoveType eMove, sal_uInt32 nSettings) :
275  mpDocShell(pShell),
276  maDocAccess(mpDocShell->GetDocument()),
278  meMove(NO_MOVE),
279  mbRunning(false),
280  mbValuesInLine(false),
281  mbRefreshOnEmptyLine(false),
282  mnLinesCount(0),
284  mfLastRefreshTime(0.0),
285  mnCurRow(0),
286  maImportTimer("sc DataStream maImportTimer"),
287  mbIsFirst(true),
288  mbIsUpdate(false)
289 {
291  maImportTimer.SetInvokeHandler( LINK(this, DataStream, ImportTimerHdl) );
292 
// Decode() overwrites meMove and mbValuesInLine, among others.
293  Decode(rURL, rRange, nLimit, eMove, nSettings);
294 }
295 
// NOTE(review): the signature line (embedded line 296) is missing from this
// extract; the teardown below suggests this is the DataStream destructor -
// confirm against the real source.
// Stops a running import, then ends and joins the reader thread if there is
// one, and finally releases the current batch of lines.
297 {
298  if (mbRunning)
299  StopImport();
300 
301  if (mxReaderThread.is())
302  {
303  mxReaderThread->endThread();
304  mxReaderThread->join();
305  }
306  mpLines.reset();
307 }
308 
310 {
311  if (!mpLines || mnLinesCount >= mpLines->size())
312  {
313  mnLinesCount = 0;
314  if (mxReaderThread->isTerminateRequested())
315  return Line();
316 
317  osl::ResettableMutexGuard aGuard(mxReaderThread->getLinesMutex());
318  if (mpLines)
319  mxReaderThread->pushUsedLines(std::move(mpLines));
320 
321  while (!mxReaderThread->hasNewLines())
322  {
323  aGuard.clear(); // unlock
324  mxReaderThread->waitForNewLines();
325  aGuard.reset(); // lock
326  }
327 
328  mpLines = mxReaderThread->popNewLines();
329  mxReaderThread->resumeReadStream();
330  }
331  return mpLines->at(mnLinesCount++);
332 }
333 
335 {
336  ScRange aRange = maStartRange;
337  aRange.aEnd = maEndRange.aEnd;
338  return aRange;
339 }
340 
// Stores the stream settings and derives the start and end ranges from rRange
// and nLimit. rRange is reduced to its first row. nLimit == 0 means unlimited,
// nLimit > 0 limits the number of rows; for a negative nLimit the visible
// lines leave maEndRange equal to the start range.
// NOTE(review): embedded source line 374, just before the closing brace, is
// missing from this extract.
341 void DataStream::Decode(const OUString& rURL, const ScRange& rRange,
342  sal_Int32 nLimit, MoveType eMove, const sal_uInt32 nSettings)
343 {
344  msURL = rURL;
345  meMove = eMove;
// meMove can change while importing (see the RANGE_DOWN case elsewhere in
// this file); the requested mode is kept separately.
346  meOrigMove = eMove;
347  mnSettings = nSettings;
348 
349  mbValuesInLine = true; // always true.
350 
351  mnCurRow = rRange.aStart.Row();
352 
353  ScRange aRange = rRange;
354  if (aRange.aStart.Row() != aRange.aEnd.Row())
355  // We only allow this range to be one row tall.
356  aRange.aEnd.SetRow(aRange.aStart.Row());
357 
358  maStartRange = aRange;
359  maEndRange = aRange;
360  const auto & rDoc = mpDocShell->GetDocument();
361  if (nLimit == 0)
362  {
363  // Unlimited
364  maEndRange.aStart.SetRow(rDoc.MaxRow());
365  }
366  else if (nLimit > 0)
367  {
368  // Limited.
// The end row is nLimit-1 rows below the start row, clamped to the
// document's last row.
369  maEndRange.aStart.IncRow(nLimit-1);
370  if (maEndRange.aStart.Row() > rDoc.MaxRow())
371  maEndRange.aStart.SetRow(rDoc.MaxRow());
372  }
373 
375 }
376 
// NOTE(review): the signature line (embedded line 377) and embedded lines 385
// and 391 are missing from this extract. The index at the end of this listing
// gives "void StartImport()" for line 377.
// Does nothing when already running. Otherwise opens msURL for reading if no
// reader thread exists yet, launches the thread, marks the stream as running
// and resets maDocAccess.
378 {
379  if (mbRunning)
380  return;
381 
382  if (!mxReaderThread.is())
383  {
384  std::unique_ptr<SvStream> pStream(new SvFileStream(msURL, StreamMode::READ));
386  mxReaderThread->launch();
387  }
388  mbRunning = true;
389  maDocAccess.reset();
390 
392 }
393 
// NOTE(review): the signature line (embedded line 394) and embedded line 401
// are missing from this extract. Presumably this is StopImport(), which the
// teardown code in this file calls while mbRunning is set - confirm.
// Does nothing unless running; otherwise clears mbRunning and calls Refresh().
395 {
396  if (!mbRunning)
397  return;
398 
399  mbRunning = false;
400  Refresh();
402 }
403 
405 {
406  mbRefreshOnEmptyLine = bVal;
407 }
408 
// NOTE(review): the signature line (embedded line 409) and embedded lines
// 411, 416, 417 and 422 are missing from this extract, so the statements that
// are being timed are not visible here. Presumably this is Refresh() - confirm.
// Records the elapsed time in the DebugTime::Recalc slot of fTimes and
// remembers the time of this refresh in mfLastRefreshTime.
410 {
412 
413  double fStart = getNow();
414 
415  // Hard recalc will repaint the grid area.
418 
419  fTimes[ DebugTime::Recalc ] = getNow() - fStart;
420 
421  mfLastRefreshTime = getNow();
423 }
424 
// NOTE(review): the signature line (embedded line 425) is missing from this
// extract. Presumably this is MoveData(), which the per-line import code in
// this file calls before writing cells - confirm.
// Prepares the target range for the next line according to meMove.
426 {
427  switch (meMove)
428  {
429  case RANGE_DOWN:
430  {
// Once the current row has reached the end row, switch to shifting the
// range up for every further line.
431  if (mnCurRow == maEndRange.aStart.Row())
432  meMove = MOVE_UP;
433  }
434  break;
435  case MOVE_UP:
436  {
437  mbIsUpdate = true;
438  // Remove the top row and shift the remaining rows upward. Then
439  // insert a new row at the end row position.
440  ScRange aRange = maStartRange;
441  aRange.aEnd = maEndRange.aEnd;
442  maDocAccess.shiftRangeUp(aRange);
443  }
444  break;
445  case MOVE_DOWN:
446  {
447  mbIsUpdate = true;
448  // Remove the end row and shift the remaining rows downward by
449  // inserting a new row at the top row.
450  ScRange aRange = maStartRange;
451  aRange.aEnd = maEndRange.aEnd;
452  maDocAccess.shiftRangeDown(aRange);
453  }
454  break;
455  case NO_MOVE:
456  default:
457  ;
458  }
// The first time a shifting mode has been active, the import timer's timeout
// is set from the configured DataStream UpdateTimeout value.
459  if(mbIsFirst && mbIsUpdate)
460  {
461  sal_Int32 nStreamTimeout = officecfg::Office::Calc::DataStream::UpdateTimeout::get();
462  maImportTimer.SetTimeout(nStreamTimeout);
463  mbIsFirst = false;
464  }
465 }
466 
// NOTE(review): the signature line (embedded line 467) and embedded lines
// 487, 492, 493 and 517 are missing from this extract. Presumably this is
// Text2Doc(), which the import code in this file calls - confirm.
// Takes one line from ConsumeLine() and writes its cells into row mnCurRow,
// starting at the first column of maStartRange.
468 {
469  Line aLine = ConsumeLine();
470  if (aLine.maCells.empty() && mbRefreshOnEmptyLine)
471  {
472  // Empty line detected. Trigger refresh and discard it.
473  Refresh();
474  return;
475  }
476 
477  double fStart = getNow();
478 
479  MoveData();
480  {
481  SCCOL nCol = maStartRange.aStart.Col();
// String cells are stored as offset/length into this line buffer.
482  const char* pLineHead = aLine.maLine.getStr();
483  for (const Cell& rCell : aLine.maCells)
484  {
485  if (rCell.mbValue)
486  {
// NOTE(review): the head of this call (embedded line 487) is missing; only
// its arguments - target address and numeric value - are visible.
488  ScAddress(nCol, mnCurRow, maStartRange.aStart.Tab()), rCell.mfValue);
489  }
490  else
491  {
// NOTE(review): the head of this call (embedded lines 492-493) is missing;
// the visible argument builds the cell text from the line buffer as UTF-8.
494  OUString(pLineHead+rCell.maStr.Pos, rCell.maStr.Size, RTL_TEXTENCODING_UTF8));
495  }
496  ++nCol;
497  }
498  }
499 
// Time spent writing this line goes into the DebugTime::Import slot.
500  fTimes[ DebugTime::Import ] = getNow() - fStart;
501 
502  if (meMove == NO_MOVE)
503  return;
504 
505  if (meMove == RANGE_DOWN)
506  {
507  ++mnCurRow;
508 // mpDocShell->GetViewData().GetView()->AlignToCursor(
509 // maStartRange.aStart.Col(), mnCurRow, SC_FOLLOW_JUMP);
510  }
511 
512  if (getNow() - mfLastRefreshTime > 0.1 && mnLinesSinceRefresh > 200)
513  // Refresh no more frequently than every 0.1 second, and wait until at
514  // least we have processed 200 lines.
515  Refresh();
516 
518 }
519 
// NOTE(review): the signature line (embedded line 520) is missing from this
// extract. Presumably this is ImportData(), whose result the timer handler
// in this file uses to decide whether to restart the timer - confirm.
// Imports one line via Text2Doc() unless the view shell reports that it needs
// a repaint, in which case nothing is imported. Returns mbRunning in both
// cases, or false in the unsupported mode.
521 {
522  if (!mbValuesInLine)
523  // We no longer support this mode. To be deleted later.
524  return false;
525 
526  if (ScDocShell::GetViewData()->GetViewShell()->NeedsRepaint())
527  return mbRunning;
528 
529  Text2Doc();
530  return mbRunning;
531 }
532 
533 IMPL_LINK_NOARG(DataStream, ImportTimerHdl, Timer *, void)
534 {
535  if (ImportData())
536  maImportTimer.Start();
537 }
538 
539 } // namespace sc
540 
541 /* vim:set shiftwidth=4 softtabstop=4 expandtab: */
static bool parseSimpleNumber(const OUString &rStr, sal_Unicode dsep, sal_Unicode gsep, sal_Unicode dsepa, double &rVal)
Check if a given string is a simple decimal number (e.g.
Definition: stringutil.cxx:52
SfxViewFrame * GetViewFrame() const
std::vector< Line > LinesType
Definition: datastream.hxx:62
ScAddress aStart
Definition: address.hxx:499
size_t mnLinesCount
Definition: datastream.hxx:111
MoveType meMove
Definition: datastream.hxx:106
void reset()
Clear its internal state, and more importantly all the block position hints currently held...
DataStream(const DataStream &)=delete
tuple parser
static o3tl::enumarray< DebugTime, double > fTimes
Definition: datastream.cxx:37
SCROW Row() const
Definition: address.hxx:261
sal_uInt32 mnSettings
Definition: datastream.hxx:104
ScDocShell * mpDocShell
Definition: datastream.hxx:101
Reference< XFrame > xFrame
void StartImport()
Definition: datastream.cxx:377
void SetDocumentModified()
Definition: docsh.cxx:2941
DocumentStreamAccess maDocAccess
Definition: datastream.hxx:102
ScAddress aEnd
Definition: address.hxx:500
sc::DocumentLinkManager & GetDocLinkManager()
Definition: documen2.cxx:228
void SetRefreshOnEmptyLine(bool bVal)
Definition: datastream.cxx:404
MoveType meOrigMove
Definition: datastream.hxx:105
std::unique_ptr< DataStream::LinesType > popNewLines()
Definition: datastream.cxx:147
Thread(char const *name)
static void Yield()
SfxFrame & GetFrame() const
SCTAB Tab() const
Definition: address.hxx:270
void SetRow(SCROW nRowP)
Definition: address.hxx:274
double mfLastRefreshTime
Definition: datastream.hxx:113
rtl::Reference< datastreams::ReaderThread > mxReaderThread
Definition: datastream.hxx:120
bool mbRefreshOnEmptyLine
Definition: datastream.hxx:109
size_t mnLinesSinceRefresh
Definition: datastream.hxx:112
void DoHardRecalc()
Definition: docsh4.cxx:1497
ScRange maEndRange
Definition: datastream.hxx:116
ScTabViewShell * GetViewShell() const
Definition: viewdata.hxx:356
std::unique_ptr< SvStream > mpStream
Definition: datastream.cxx:97
IMPL_LINK_NOARG(SearchResultsDlg, ListSelectHdl, weld::TreeView &, void)
Line ConsumeLine()
Definition: datastream.cxx:309
void Decode(const OUString &rURL, const ScRange &rRange, sal_Int32 nLimit, MoveType eMove, const sal_uInt32 nSettings)
Definition: datastream.cxx:341
void shiftRangeUp(const ScRange &rRange)
Pop the top row inside specified range, shift all the other rows up by one, then set the bottom row e...
double datastream_get_time(DebugTime nIdx)
Definition: datastream.cxx:39
sal_Int16 SCCOL
Definition: types.hxx:21
virtual void Start(bool bStartTimer=true) override
void IncRow(SCROW nDelta=1)
Definition: address.hxx:299
float u
std::queue< std::unique_ptr< DataStream::LinesType > > maPendingLines
Definition: datastream.cxx:102
static ScViewData * GetViewData()
Definition: docsh4.cxx:2578
void SetTimeout(sal_uInt64 nTimeoutMs)
const css::uno::Reference< css::frame::XFrame > & GetFrameInterface() const
SCCOL Col() const
Definition: address.hxx:266
void Stop()
SvBaseLink * pLink
static DataStream * Set(ScDocShell *pShell, const OUString &rURL, const ScRange &rRange, sal_Int32 nLimit, MoveType eMove, sal_uInt32 nSettings)
Definition: datastream.cxx:263
static void MakeToolbarVisible()
Definition: datastream.cxx:238
void shiftRangeDown(const ScRange &rRange)
Pop the bottom row inside specified range, shift all the other rows above downward by one by insertin...
void setStringCell(const ScAddress &rPos, const OUString &rStr)
ScRange GetRange() const
Definition: datastream.cxx:334
void SetInvokeHandler(const Link< Timer *, void > &rLink)
void pushUsedLines(std::unique_ptr< DataStream::LinesType > pLines)
Definition: datastream.cxx:165
const ScDocument & GetDocument() const
Definition: docsh.hxx:220
std::vector< Cell > maCells
Definition: datastream.hxx:60
orcus::csv::parser_config maConfig
Definition: datastream.cxx:109
std::unique_ptr< LinesType > mpLines
Definition: datastream.hxx:110
std::queue< std::unique_ptr< DataStream::LinesType > > maUsedLines
Definition: datastream.cxx:103
basegfx::B2DPolygon maLine
ReaderThread(std::unique_ptr< SvStream > pData, size_t nColCount)
Definition: datastream.cxx:113
virtual void execute() override
Definition: datastream.cxx:176
void setDataStream(DataStream *p)
void setNumericCell(const ScAddress &rPos, double fVal)
ScRange maStartRange
Definition: datastream.hxx:115
OUString msURL
Definition: datastream.hxx:103