LibreOffice Module sc (master)  1
datastream.cxx
Go to the documentation of this file.
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 /*
3  * This file is part of the LibreOffice project.
4  *
5  * This Source Code Form is subject to the terms of the Mozilla Public
6  * License, v. 2.0. If a copy of the MPL was not distributed with this
7  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8  */
9 
10 #include <datastream.hxx>
11 #include <datastreamgettime.hxx>
12 
13 #include <com/sun/star/frame/XLayoutManager.hpp>
14 #include <osl/conditn.hxx>
15 #include <osl/time.h>
16 #include <salhelper/thread.hxx>
17 #include <sfx2/viewfrm.hxx>
18 #include <tools/stream.hxx>
19 #include <vcl/svapp.hxx>
20 #include <docsh.hxx>
21 #include <tabvwsh.hxx>
22 #include <viewdata.hxx>
23 #include <stringutil.hxx>
24 #include <documentlinkmgr.hxx>
25 #include <o3tl/enumarray.hxx>
26 
27 #include <officecfg/Office/Calc.hxx>
28 
29 #include <orcus/csv_parser.hpp>
30 
31 #include <queue>
32 
33 namespace com::sun::star::ui { class XUIElement; }
34 
35 namespace sc {
36 
// Debug timing slots indexed by DebugTime. Within this file the Recalc and
// Import slots are overwritten with elapsed getNow() differences (seconds).
37 static o3tl::enumarray<DebugTime, double> fTimes { 0.0, 0.0, 0.0 };
38 
40 {
41  return fTimes[ nIdx ];
42 }
43 
44 namespace {
45 
46 double getNow()
47 {
48  TimeValue now;
49  osl_getSystemTime(&now);
50  return static_cast<double>(now.Seconds) + static_cast<double>(now.Nanosec) / 1000000000.0;
51 }
52 
53 class CSVHandler
54 {
55  DataStream::Line& mrLine;
56  size_t mnColCount;
57  size_t mnCols;
58  const char* mpLineHead;
59 
60 public:
61  CSVHandler( DataStream::Line& rLine, size_t nColCount ) :
62  mrLine(rLine), mnColCount(nColCount), mnCols(0), mpLineHead(rLine.maLine.getStr()) {}
63 
64  static void begin_parse() {}
65  static void end_parse() {}
66  static void begin_row() {}
67  static void end_row() {}
68 
69  void cell(const char* p, size_t n, bool /*transient*/)
70  {
71  if (mnCols >= mnColCount)
72  return;
73 
74  DataStream::Cell aCell;
75  if (ScStringUtil::parseSimpleNumber(p, n, '.', ',', aCell.mfValue))
76  {
77  aCell.mbValue = true;
78  }
79  else
80  {
81  aCell.mbValue = false;
82  aCell.maStr.Pos = std::distance(mpLineHead, p);
83  aCell.maStr.Size = n;
84  }
85  mrLine.maCells.push_back(aCell);
86 
87  ++mnCols;
88  }
89 };
90 
91 }
92 
93 namespace datastreams {
94 
// NOTE(review): the class header (doxygen line 95) is missing from this
// listing. The constructor below initializes a Thread base with
// "ReaderThread", so this is presumably the ReaderThread class body.
96 {
// Stream that execute() reads from, one text line at a time.
97  std::unique_ptr<SvStream> mpStream;
// Passed to CSVHandler as the maximum number of cells kept per line.
98  size_t mnColCount;
// NOTE(review): doxygen line 99 is missing here. The constructor initializes
// a member named mbTerminate whose declaration is not visible - presumably
// it was declared on that line.
// Held while mbTerminate is read or written.
100  osl::Mutex maMtxTerminate;
101 
// Parsed batches waiting for the consumer (pushed by execute(), removed by popNewLines()).
102  std::queue<std::unique_ptr<DataStream::LinesType>> maPendingLines;
// Batches handed back through pushUsedLines(); execute() re-uses them.
103  std::queue<std::unique_ptr<DataStream::LinesType>> maUsedLines;
// Returned by getLinesMutex(); execute() holds it while touching the queues.
104  osl::Mutex maMtxLines;
105 
// Set to wake execute() when it has paused because enough batches are pending.
106  osl::Condition maCondReadStream;
// Set by execute() after it pushes a batch onto maPendingLines.
107  osl::Condition maCondConsume;
108 
// CSV parser configuration; the constructor sets ',' as delimiter and '"' as text qualifier.
109  orcus::csv::parser_config maConfig;
110 
111 public:
112 
113  ReaderThread(std::unique_ptr<SvStream> pData, size_t nColCount):
114  Thread("ReaderThread"),
115  mpStream(std::move(pData)),
116  mnColCount(nColCount),
117  mbTerminate(false)
118  {
119  maConfig.delimiters.push_back(',');
120  maConfig.text_qualifier = '"';
121  }
122 
// Returns the current value of mbTerminate, read while holding maMtxTerminate.
// NOTE(review): the signature (doxygen line 123) is missing from this listing.
// Other code in this file calls a bool query named isTerminateRequested();
// presumably this is its body - confirm against the real file.
124  {
125  osl::MutexGuard aGuard(maMtxTerminate);
126  return mbTerminate;
127  }
128 
// Sets mbTerminate to true while holding maMtxTerminate.
// NOTE(review): the signature (doxygen line 129) is missing from this listing
// and the function's name is not visible anywhere in it - confirm against the
// real file.
130  {
131  osl::MutexGuard aGuard(maMtxTerminate);
132  mbTerminate = true;
133  }
134 
// Sets maCondReadStream, which releases execute() if it is waiting on that
// condition in its pause loop.
// NOTE(review): doxygen line 137 (the first statement of the body) is missing
// from this listing; presumably it requests termination before the wake-up -
// confirm against the real file.
135  void endThread()
136  {
138  maCondReadStream.set();
139  }
140 
// Blocks until maCondConsume is set, then resets it so the next call waits again.
// NOTE(review): the signature (doxygen line 141) is missing from this listing.
// DataStream::ConsumeLine() calls waitForNewLines() while no batch is pending;
// presumably this is its body - confirm against the real file.
142  {
143  maCondConsume.wait();
144  maCondConsume.reset();
145  }
146 
147  std::unique_ptr<DataStream::LinesType> popNewLines()
148  {
149  auto pLines = std::move(maPendingLines.front());
150  maPendingLines.pop();
151  return pLines;
152  }
153 
// Sets maCondReadStream once four or fewer batches remain pending, releasing
// execute() from its pause loop (which waits while eight or more are pending).
// NOTE(review): the signature (doxygen line 154) is missing from this listing.
// DataStream::ConsumeLine() calls resumeReadStream() right after popping a
// batch; presumably this is its body - confirm against the real file.
155  {
156  if (maPendingLines.size() <= 4)
157  maCondReadStream.set(); // start producer again
158  }
159 
160  bool hasNewLines() const
161  {
162  return !maPendingLines.empty();
163  }
164 
165  void pushUsedLines( std::unique_ptr<DataStream::LinesType> pLines )
166  {
167  maUsedLines.push(std::move(pLines));
168  }
169 
170  osl::Mutex& getLinesMutex()
171  {
172  return maMtxLines;
173  }
174 
175 private:
// Thread body: until termination is requested, fill one batch of lines from
// mpStream, parse each line as CSV, and queue the batch on maPendingLines.
176  virtual void execute() override
177  {
178  while (!isTerminateRequested())
179  {
180  std::unique_ptr<DataStream::LinesType> pLines;
181  osl::ResettableMutexGuard aGuard(maMtxLines);
182 
183  if (!maUsedLines.empty())
184  {
185  // Re-use lines from previous runs.
186  pLines = std::move(maUsedLines.front());
187  maUsedLines.pop();
188  aGuard.clear(); // unlock
189  }
190  else
191  {
192  aGuard.clear(); // unlock
// Nothing to recycle: allocate a fresh batch of 10 lines.
193  pLines.reset(new DataStream::LinesType(10));
194  }
195 
196  // Read & store new lines from stream.
// The mutex is not held here; reading and parsing only touch this batch.
197  for (DataStream::Line & rLine : *pLines)
198  {
199  rLine.maCells.clear();
200  mpStream->ReadLine(rLine.maLine);
201  CSVHandler aHdl(rLine, mnColCount);
202  orcus::csv_parser<CSVHandler> parser(rLine.maLine.getStr(), rLine.maLine.getLength(), aHdl, maConfig);
203  parser.parse();
204  }
205 
206  aGuard.reset(); // lock
// Back-pressure: while 8 or more batches are pending, wait on
// maCondReadStream (with the mutex released) before queuing another.
207  while (!isTerminateRequested() && maPendingLines.size() >= 8)
208  {
209  // pause reading for a bit
210  aGuard.clear(); // unlock
211  maCondReadStream.wait();
212  maCondReadStream.reset();
213  aGuard.reset(); // lock
214  }
215  maPendingLines.push(std::move(pLines));
// Signal that a new batch is available.
216  maCondConsume.set();
// NOTE(review): doxygen line 218, the statement controlled by the `if` below,
// is missing from this listing. Presumably it requests termination once the
// stream is no longer good - confirm against the real file.
217  if (!mpStream->good())
219  }
220  }
221 };
222 
223 }
224 
225 DataStream::Cell::Cell() : mfValue(0.0), mbValue(true) {}
226 
227 DataStream::Cell::Cell( const Cell& r ) : mbValue(r.mbValue)
228 {
229  if (r.mbValue)
230  mfValue = r.mfValue;
231  else
232  {
233  maStr.Pos = r.maStr.Pos;
234  maStr.Size = r.maStr.Size;
235  }
236 }
237 
// Creates and shows the "private:resource/toolbar/datastreams" toolbar through
// the frame's layout manager, unless the layout manager already has that
// element. Returns silently when the frame, its property set, or the layout
// manager is unavailable.
// NOTE(review): the signature (doxygen line 238) is missing from this listing;
// the cross-reference index gives it as `static void MakeToolbarVisible()`.
239 {
// NOTE(review): doxygen line 241, the initializer expression for xFrame, is
// missing from this listing, so the declaration below is incomplete.
240  css::uno::Reference< css::frame::XFrame > xFrame =
242  if (!xFrame.is())
243  return;
244 
245  css::uno::Reference< css::beans::XPropertySet > xPropSet(xFrame, css::uno::UNO_QUERY);
246  if (!xPropSet.is())
247  return;
248 
249  css::uno::Reference< css::frame::XLayoutManager > xLayoutManager;
250  xPropSet->getPropertyValue("LayoutManager") >>= xLayoutManager;
251  if (!xLayoutManager.is())
252  return;
253 
254  const OUString sResourceURL( "private:resource/toolbar/datastreams" );
255  css::uno::Reference< css::ui::XUIElement > xUIElement = xLayoutManager->getElement(sResourceURL);
// Only create and show the toolbar when no such element exists yet.
256  if (!xUIElement.is())
257  {
258  xLayoutManager->createElement( sResourceURL );
259  xLayoutManager->showElement( sResourceURL );
260  }
261 }
262 
// Allocates a new DataStream with the given arguments, passes it to
// rMgr.setDataStream(), and returns the raw pointer.
// NOTE(review): doxygen line 263 (start of the signature) is missing; the
// cross-reference index gives it as `static DataStream * Set(...)`.
// NOTE(review): presumably the link manager takes ownership of the object -
// confirm against setDataStream().
264  ScDocShell *pShell, const OUString& rURL, const ScRange& rRange,
265  sal_Int32 nLimit, MoveType eMove, sal_uInt32 nSettings)
266 {
267  DataStream* pLink = new DataStream(pShell, rURL, rRange, nLimit, eMove, nSettings);
// NOTE(review): doxygen line 268, which must declare rMgr, is missing from
// this listing.
269  rMgr.setDataStream(pLink);
270  return pLink;
271 }
272 
// Initializes the members to idle defaults, installs ImportTimerHdl as the
// import timer's handler, and then applies the caller's settings via Decode().
// NOTE(review): doxygen lines 277, 283 and 289 are missing from this listing,
// so some member initializers and one statement of the body are not shown.
273 DataStream::DataStream(ScDocShell *pShell, const OUString& rURL, const ScRange& rRange,
274  sal_Int32 nLimit, MoveType eMove, sal_uInt32 nSettings) :
275  mpDocShell(pShell),
276  maDocAccess(mpDocShell->GetDocument()),
278  meMove(NO_MOVE),
279  mbRunning(false),
280  mbValuesInLine(false),
281  mbRefreshOnEmptyLine(false),
282  mnLinesCount(0),
284  mfLastRefreshTime(0.0),
285  mnCurRow(0),
286  mbIsFirst(true),
287  mbIsUpdate(false)
288 {
290  maImportTimer.SetInvokeHandler( LINK(this, DataStream, ImportTimerHdl) );
291 
// Decode() overwrites msURL, meMove, meOrigMove, mnSettings, mbValuesInLine,
// mnCurRow and the start/end ranges from the arguments.
292  Decode(rURL, rRange, nLimit, eMove, nSettings);
293 }
294 
// Stops a running import, ends the reader thread and waits for it to finish,
// then releases the current batch of lines.
// NOTE(review): the signature (doxygen line 295) is missing from this listing;
// given the teardown it performs, presumably this is DataStream's destructor -
// confirm against the real file.
296 {
297  if (mbRunning)
298  StopImport();
299 
300  if (mxReaderThread.is())
301  {
// endThread() wakes the reader if it is paused; join() then waits for it.
302  mxReaderThread->endThread();
303  mxReaderThread->join();
304  }
305  mpLines.reset();
306 }
307 
309 {
310  if (!mpLines || mnLinesCount >= mpLines->size())
311  {
312  mnLinesCount = 0;
313  if (mxReaderThread->isTerminateRequested())
314  return Line();
315 
316  osl::ResettableMutexGuard aGuard(mxReaderThread->getLinesMutex());
317  if (mpLines)
318  mxReaderThread->pushUsedLines(std::move(mpLines));
319 
320  while (!mxReaderThread->hasNewLines())
321  {
322  aGuard.clear(); // unlock
323  mxReaderThread->waitForNewLines();
324  aGuard.reset(); // lock
325  }
326 
327  mpLines = mxReaderThread->popNewLines();
328  mxReaderThread->resumeReadStream();
329  }
330  return mpLines->at(mnLinesCount++);
331 }
332 
334 {
335  ScRange aRange = maStartRange;
336  aRange.aEnd = maEndRange.aEnd;
337  return aRange;
338 }
339 
// Stores the stream settings and derives the start/end ranges from rRange
// and nLimit.
//   rURL      - stored in msURL.
//   rRange    - target range; clamped to its first row.
//   nLimit    - 0 extends the end range's start row to the document's last
//               row; a positive value moves it down by nLimit-1 rows (capped
//               at the last row); a negative value leaves it unchanged.
//   eMove     - stored in both meMove and meOrigMove.
//   nSettings - stored in mnSettings.
340 void DataStream::Decode(const OUString& rURL, const ScRange& rRange,
341  sal_Int32 nLimit, MoveType eMove, const sal_uInt32 nSettings)
342 {
343  msURL = rURL;
344  meMove = eMove;
345  meOrigMove = eMove;
346  mnSettings = nSettings;
347 
348  mbValuesInLine = true; // always true.
349 
350  mnCurRow = rRange.aStart.Row();
351 
352  ScRange aRange = rRange;
353  if (aRange.aStart.Row() != aRange.aEnd.Row())
354  // We only allow this range to be one row tall.
355  aRange.aEnd.SetRow(aRange.aStart.Row());
356 
357  maStartRange = aRange;
358  maEndRange = aRange;
359  const auto & rDoc = mpDocShell->GetDocument();
360  if (nLimit == 0)
361  {
362  // Unlimited
363  maEndRange.aStart.SetRow(rDoc.MaxRow());
364  }
365  else if (nLimit > 0)
366  {
367  // Limited.
368  maEndRange.aStart.IncRow(nLimit-1);
369  if (maEndRange.aStart.Row() > rDoc.MaxRow())
370  maEndRange.aStart.SetRow(rDoc.MaxRow());
371  }
372 
// NOTE(review): doxygen line 373, the last statement of this function, is
// missing from this listing. Presumably it adjusts maEndRange.aEnd to match
// the start row set above - confirm against the real file.
374 }
375 
// Starts importing if not already running: creates and launches the reader
// thread on first use, marks the stream as running and resets maDocAccess.
// NOTE(review): the signature (doxygen line 376) is missing from this listing;
// the cross-reference index gives it as `void StartImport()`.
377 {
378  if (mbRunning)
379  return;
380 
381  if (!mxReaderThread.is())
382  {
// Open the source at msURL for reading.
383  std::unique_ptr<SvStream> pStream(new SvFileStream(msURL, StreamMode::READ));
// NOTE(review): doxygen line 384 is missing from this listing; it must assign
// mxReaderThread (presumably a new ReaderThread owning pStream) before the
// launch() call below.
385  mxReaderThread->launch();
386  }
387  mbRunning = true;
388  maDocAccess.reset();
389 
// NOTE(review): doxygen line 390 is missing from this listing; presumably it
// starts the import timer - confirm against the real file.
391 }
392 
// Stops a running import: clears mbRunning and performs a final Refresh().
// Does nothing when no import is running.
// NOTE(review): the signature (doxygen line 393) is missing from this listing;
// the destructor calls StopImport() when mbRunning is set, so presumably this
// is its body - confirm against the real file.
394 {
395  if (!mbRunning)
396  return;
397 
398  mbRunning = false;
399  Refresh();
// NOTE(review): doxygen line 400 is missing from this listing; presumably it
// stops the import timer - confirm against the real file.
401 }
402 
404 {
405  mbRefreshOnEmptyLine = bVal;
406 }
407 
// Records how long the refresh work took in the DebugTime::Recalc slot and
// stamps mfLastRefreshTime with the current time.
// NOTE(review): the signature (doxygen line 408) and body lines 410, 415, 416
// and 421 are missing from this listing, so the actual refresh calls are not
// shown. Other code in this file calls Refresh(); presumably this is its body.
409 {
411 
412  double fStart = getNow();
413 
414  // Hard recalc will repaint the grid area.
417 
// Elapsed seconds since fStart.
418  fTimes[ DebugTime::Recalc ] = getNow() - fStart;
419 
420  mfLastRefreshTime = getNow();
422 }
423 
// Makes room for the next line according to meMove, and on the first call in
// which data has been shifted applies the configured update timeout to the
// import timer.
// NOTE(review): the signature (doxygen line 424) is missing from this listing.
// Text2Doc() calls MoveData() before writing a line; presumably this is its
// body - confirm against the real file.
425 {
426  switch (meMove)
427  {
428  case RANGE_DOWN:
429  {
// Once the current row reaches the end range's start row, switch to
// MOVE_UP for subsequent calls.
430  if (mnCurRow == maEndRange.aStart.Row())
431  meMove = MOVE_UP;
432  }
433  break;
434  case MOVE_UP:
435  {
436  mbIsUpdate = true;
437  // Remove the top row and shift the remaining rows upward. Then
438  // insert a new row at the end row position.
439  ScRange aRange = maStartRange;
440  aRange.aEnd = maEndRange.aEnd;
441  maDocAccess.shiftRangeUp(aRange);
442  }
443  break;
444  case MOVE_DOWN:
445  {
446  mbIsUpdate = true;
447  // Remove the end row and shift the remaining rows downward by
448  // inserting a new row at the top row.
449  ScRange aRange = maStartRange;
450  aRange.aEnd = maEndRange.aEnd;
451  maDocAccess.shiftRangeDown(aRange);
452  }
453  break;
454  case NO_MOVE:
455  default:
456  ;
457  }
// Done only once (guarded by mbIsFirst): read the UpdateTimeout setting and
// apply it to the import timer.
458  if(mbIsFirst && mbIsUpdate)
459  {
460  sal_Int32 nStreamTimeout = officecfg::Office::Calc::DataStream::UpdateTimeout::get();
461  maImportTimer.SetTimeout(nStreamTimeout);
462  mbIsFirst = false;
463  }
464 }
465 
// Consumes one parsed line and writes its cells into row mnCurRow, one column
// per cell starting at the start range's first column. The time spent is
// recorded in the DebugTime::Import slot.
// NOTE(review): the signature (doxygen line 466) and body lines 486, 491, 492
// and 516 are missing from this listing. ImportData() calls Text2Doc();
// presumably this is its body - confirm against the real file.
467 {
468  Line aLine = ConsumeLine();
469  if (aLine.maCells.empty() && mbRefreshOnEmptyLine)
470  {
471  // Empty line detected. Trigger refresh and discard it.
472  Refresh();
473  return;
474  }
475 
476  double fStart = getNow();
477 
478  MoveData();
479  {
480  SCCOL nCol = maStartRange.aStart.Col();
481  const char* pLineHead = aLine.maLine.getStr();
482  for (const Cell& rCell : aLine.maCells)
483  {
484  if (rCell.mbValue)
485  {
// NOTE(review): doxygen line 486 (the call these arguments belong to) is
// missing; presumably a numeric-cell setter on maDocAccess.
487  ScAddress(nCol, mnCurRow, maStartRange.aStart.Tab()), rCell.mfValue);
488  }
489  else
490  {
// NOTE(review): doxygen lines 491-492 (the call and its first argument) are
// missing; presumably a string-cell setter on maDocAccess. The visible
// argument builds the text from the line buffer as UTF-8 using the cell's
// stored position and size.
493  OUString(pLineHead+rCell.maStr.Pos, rCell.maStr.Size, RTL_TEXTENCODING_UTF8));
494  }
495  ++nCol;
496  }
497  }
498 
499  fTimes[ DebugTime::Import ] = getNow() - fStart;
500 
501  if (meMove == NO_MOVE)
502  return;
503 
504  if (meMove == RANGE_DOWN)
505  {
506  ++mnCurRow;
507 // mpDocShell->GetViewData().GetView()->AlignToCursor(
508 // maStartRange.aStart.Col(), mnCurRow, SC_FOLLOW_JUMP);
509  }
510 
511  if (getNow() - mfLastRefreshTime > 0.1 && mnLinesSinceRefresh > 200)
512  // Refresh no more frequently than every 0.1 second, and wait until at
513  // least we have processed 200 lines.
514  Refresh();
515 
// NOTE(review): doxygen line 516 is missing from this listing; presumably it
// increments mnLinesSinceRefresh - confirm against the real file.
517 }
518 
// Imports one line unless the view still needs repainting, and reports
// whether the stream is still running (mbRunning). Returns false when
// mbValuesInLine is not set.
// NOTE(review): the signature (doxygen line 519) is missing from this listing.
// ImportTimerHdl restarts the timer while ImportData() returns true;
// presumably this is its body - confirm against the real file.
520 {
521  if (!mbValuesInLine)
522  // We no longer support this mode. To be deleted later.
523  return false;
524 
// Skip this tick while a repaint is pending; the return value is unchanged.
525  if (ScDocShell::GetViewData()->GetViewShell()->NeedsRepaint())
526  return mbRunning;
527 
528  Text2Doc();
529  return mbRunning;
530 }
531 
532 IMPL_LINK_NOARG(DataStream, ImportTimerHdl, Timer *, void)
533 {
534  if (ImportData())
535  maImportTimer.Start();
536 }
537 
538 } // namespace sc
539 
540 /* vim:set shiftwidth=4 softtabstop=4 expandtab: */
static bool parseSimpleNumber(const OUString &rStr, sal_Unicode dsep, sal_Unicode gsep, sal_Unicode dsepa, double &rVal)
Check if a given string is a simple decimal number (e.g.
Definition: stringutil.cxx:52
SfxViewFrame * GetViewFrame() const
std::vector< Line > LinesType
Definition: datastream.hxx:63
ScAddress aStart
Definition: address.hxx:500
size_t mnLinesCount
Definition: datastream.hxx:112
MoveType meMove
Definition: datastream.hxx:107
void reset()
Clear its internal state, and more importantly all the block position hints currently held...
DataStream(const DataStream &)=delete
tuple parser
static o3tl::enumarray< DebugTime, double > fTimes
Definition: datastream.cxx:37
SCROW Row() const
Definition: address.hxx:262
sal_uInt32 mnSettings
Definition: datastream.hxx:105
ScDocShell * mpDocShell
Definition: datastream.hxx:102
Reference< XFrame > xFrame
void StartImport()
Definition: datastream.cxx:376
void SetDocumentModified()
Definition: docsh.cxx:2819
DocumentStreamAccess maDocAccess
Definition: datastream.hxx:103
ScAddress aEnd
Definition: address.hxx:501
sc::DocumentLinkManager & GetDocLinkManager()
Definition: documen2.cxx:225
void SetRefreshOnEmptyLine(bool bVal)
Definition: datastream.cxx:403
MoveType meOrigMove
Definition: datastream.hxx:106
std::unique_ptr< DataStream::LinesType > popNewLines()
Definition: datastream.cxx:147
Thread(char const *name)
static void Yield()
SfxFrame & GetFrame() const
SCTAB Tab() const
Definition: address.hxx:271
void SetRow(SCROW nRowP)
Definition: address.hxx:275
double mfLastRefreshTime
Definition: datastream.hxx:114
rtl::Reference< datastreams::ReaderThread > mxReaderThread
Definition: datastream.hxx:121
bool mbRefreshOnEmptyLine
Definition: datastream.hxx:110
size_t mnLinesSinceRefresh
Definition: datastream.hxx:113
void DoHardRecalc()
Definition: docsh4.cxx:1466
ScRange maEndRange
Definition: datastream.hxx:117
ScTabViewShell * GetViewShell() const
Definition: viewdata.hxx:357
std::unique_ptr< SvStream > mpStream
Definition: datastream.cxx:97
IMPL_LINK_NOARG(SearchResultsDlg, ListSelectHdl, weld::TreeView &, void)
Line ConsumeLine()
Definition: datastream.cxx:308
void Decode(const OUString &rURL, const ScRange &rRange, sal_Int32 nLimit, MoveType eMove, const sal_uInt32 nSettings)
Definition: datastream.cxx:340
void shiftRangeUp(const ScRange &rRange)
Pop the top row inside specified range, shift all the other rows up by one, then set the bottom row e...
double datastream_get_time(DebugTime nIdx)
Definition: datastream.cxx:39
sal_Int16 SCCOL
Definition: types.hxx:22
void IncRow(SCROW nDelta=1)
Definition: address.hxx:300
virtual void Start() override
std::queue< std::unique_ptr< DataStream::LinesType > > maPendingLines
Definition: datastream.cxx:102
static ScViewData * GetViewData()
Definition: docsh4.cxx:2542
void SetTimeout(sal_uInt64 nTimeoutMs)
const css::uno::Reference< css::frame::XFrame > & GetFrameInterface() const
SCCOL Col() const
Definition: address.hxx:267
void Stop()
SvBaseLink * pLink
static DataStream * Set(ScDocShell *pShell, const OUString &rURL, const ScRange &rRange, sal_Int32 nLimit, MoveType eMove, sal_uInt32 nSettings)
Definition: datastream.cxx:263
static void MakeToolbarVisible()
Definition: datastream.cxx:238
void shiftRangeDown(const ScRange &rRange)
Pop the bottom row inside specified range, shift all the other rows above downward by one by insertin...
void setStringCell(const ScAddress &rPos, const OUString &rStr)
ScRange GetRange() const
Definition: datastream.cxx:333
void SetInvokeHandler(const Link< Timer *, void > &rLink)
void pushUsedLines(std::unique_ptr< DataStream::LinesType > pLines)
Definition: datastream.cxx:165
const ScDocument & GetDocument() const
Definition: docsh.hxx:216
std::vector< Cell > maCells
Definition: datastream.hxx:61
orcus::csv::parser_config maConfig
Definition: datastream.cxx:109
std::unique_ptr< LinesType > mpLines
Definition: datastream.hxx:111
std::queue< std::unique_ptr< DataStream::LinesType > > maUsedLines
Definition: datastream.cxx:103
basegfx::B2DPolygon maLine
ReaderThread(std::unique_ptr< SvStream > pData, size_t nColCount)
Definition: datastream.cxx:113
virtual void execute() override
Definition: datastream.cxx:176
void setDataStream(DataStream *p)
void setNumericCell(const ScAddress &rPos, double fVal)
ScRange maStartRange
Definition: datastream.hxx:116
OUString msURL
Definition: datastream.hxx:104