LibreOffice Module io (master) 1
opump.cxx
Go to the documentation of this file.
1/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2/*
3 * This file is part of the LibreOffice project.
4 *
5 * This Source Code Form is subject to the terms of the Mozilla Public
6 * License, v. 2.0. If a copy of the MPL was not distributed with this
7 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8 *
9 * This file incorporates work covered by the following license notice:
10 *
11 * Licensed to the Apache Software Foundation (ASF) under one or more
12 * contributor license agreements. See the NOTICE file distributed
13 * with this work for additional information regarding copyright
14 * ownership. The ASF licenses this file to you under the Apache
15 * License, Version 2.0 (the "License"); you may not use this file
16 * except in compliance with the License. You may obtain a copy of
17 * the License at http://www.apache.org/licenses/LICENSE-2.0 .
18 */
19
20
21#include <sal/log.hxx>
22
23#include <com/sun/star/io/IOException.hpp>
24#include <com/sun/star/io/NotConnectedException.hpp>
25#include <com/sun/star/io/XActiveDataSource.hpp>
26#include <com/sun/star/io/XActiveDataSink.hpp>
27#include <com/sun/star/io/XActiveDataControl.hpp>
28#include <com/sun/star/io/XConnectable.hpp>
29#include <com/sun/star/lang/XServiceInfo.hpp>
30#include <com/sun/star/uno/XComponentContext.hpp>
31
35#include <osl/thread.h>
36#include <mutex>
37
38using namespace osl;
39using namespace cppu;
40using namespace com::sun::star::uno;
41using namespace com::sun::star::lang;
42using namespace com::sun::star::io;
43
44namespace io_stm {
45
46 namespace {
47
48 class Pump : public WeakImplHelper<
49 XActiveDataSource, XActiveDataSink, XActiveDataControl, XConnectable, XServiceInfo >
50 {
51 std::mutex m_aMutex;
52 oslThread m_aThread;
53
60
61 void run();
62 static void static_run( void* pObject );
63
64 void close();
65 void fireClose();
66 void fireStarted();
67 void fireTerminated();
68 void fireError( const Any &a );
69
70 public:
71 Pump();
72 virtual ~Pump() override;
73
74 // XActiveDataSource
75 virtual void SAL_CALL setOutputStream( const Reference< css::io::XOutputStream >& xOutput ) override;
76 virtual Reference< css::io::XOutputStream > SAL_CALL getOutputStream() override;
77
78 // XActiveDataSink
79 virtual void SAL_CALL setInputStream( const Reference< css::io::XInputStream >& xStream ) override;
80 virtual Reference< css::io::XInputStream > SAL_CALL getInputStream() override;
81
82 // XActiveDataControl
83 virtual void SAL_CALL addListener( const Reference< css::io::XStreamListener >& xListener ) override;
84 virtual void SAL_CALL removeListener( const Reference< css::io::XStreamListener >& xListener ) override;
85 virtual void SAL_CALL start() override;
86 virtual void SAL_CALL terminate() override;
87
88 // XConnectable
89 virtual void SAL_CALL setPredecessor( const Reference< css::io::XConnectable >& xPred ) override;
90 virtual Reference< css::io::XConnectable > SAL_CALL getPredecessor() override;
91 virtual void SAL_CALL setSuccessor( const Reference< css::io::XConnectable >& xSucc ) override;
92 virtual Reference< css::io::XConnectable > SAL_CALL getSuccessor() override;
93
94 public: // XServiceInfo
95 virtual OUString SAL_CALL getImplementationName() override;
96 virtual Sequence< OUString > SAL_CALL getSupportedServiceNames() override;
97 virtual sal_Bool SAL_CALL supportsService(const OUString& ServiceName) override;
98 };
99
100 }
101
102Pump::Pump() : m_aThread( nullptr ),
103 m_closeFired( false )
104{
105}
106
107Pump::~Pump()
108{
109 // exit gracefully
110 if( m_aThread )
111 {
112 osl_joinWithThread( m_aThread );
113 osl_destroyThread( m_aThread );
114 }
115}
116
117void Pump::fireError( const Any & exception )
118{
119 std::unique_lock guard( m_aMutex );
121 guard.unlock();
122 while( iter.hasMoreElements() )
123 {
124 try
125 {
126 iter.next()->error( exception );
127 }
128 catch ( const RuntimeException &e )
129 {
130 SAL_WARN("io.streams","com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners" << e);
131 }
132 }
133}
134
135void Pump::fireClose()
136{
137 bool bFire = false;
138 {
139 std::unique_lock guard( m_aMutex );
140 if( ! m_closeFired )
141 {
142 m_closeFired = true;
143 bFire = true;
144 }
145 }
146
147 if( !bFire )
148 return;
149
150 std::unique_lock guard( m_aMutex );
152 guard.unlock();
153 while( iter.hasMoreElements() )
154 {
155 try
156 {
157 iter.next()->closed( );
158 }
159 catch ( const RuntimeException &e )
160 {
161 SAL_WARN("io.streams","com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners" << e);
162 }
163 }
164}
165
166void Pump::fireStarted()
167{
168 std::unique_lock guard( m_aMutex );
170 guard.unlock();
171 while( iter.hasMoreElements() )
172 {
173 try
174 {
175 iter.next()->started( );
176 }
177 catch ( const RuntimeException &e )
178 {
179 SAL_WARN("io.streams","com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners" << e);
180 }
181 }
182}
183
184void Pump::fireTerminated()
185{
186 std::unique_lock guard( m_aMutex );
188 guard.unlock();
189 while( iter.hasMoreElements() )
190 {
191 try
192 {
193 iter.next()->terminated();
194 }
195 catch ( const RuntimeException &e )
196 {
197 SAL_WARN("io.streams","com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners" << e);
198 }
199 }
200}
201
202
203void Pump::close()
204{
205 // close streams and release references
208 {
209 std::unique_lock guard( m_aMutex );
210 rInput = m_xInput;
211 m_xInput.clear();
212
213 rOutput = m_xOutput;
214 m_xOutput.clear();
215 m_xSucc.clear();
216 m_xPred.clear();
217 }
218 if( rInput.is() )
219 {
220 try
221 {
222 rInput->closeInput();
223 }
224 catch( Exception & )
225 {
226 // go down calm
227 }
228 }
229 if( rOutput.is() )
230 {
231 try
232 {
233 rOutput->closeOutput();
234 }
235 catch( Exception & )
236 {
237 // go down calm
238 }
239 }
240}
241
242void Pump::static_run( void* pObject )
243{
244 osl_setThreadName("io_stm::Pump::run()");
245 static_cast<Pump*>(pObject)->run();
246 static_cast<Pump*>(pObject)->release();
247}
248
249void Pump::run()
250{
251 try
252 {
253 fireStarted();
254 try
255 {
258 {
259 std::unique_lock aGuard( m_aMutex );
260 rInput = m_xInput;
261 rOutput = m_xOutput;
262 }
263
264 if( ! rInput.is() )
265 {
266 throw NotConnectedException( "no input stream set", getXWeak() );
267 }
269 while( rInput->readSomeBytes( aData, 65536 ) )
270 {
271 if( ! rOutput.is() )
272 {
273 throw NotConnectedException( "no output stream set", getXWeak() );
274 }
275 rOutput->writeBytes( aData );
276 osl_yieldThread();
277 }
278 }
279 catch ( const IOException & e )
280 {
281 fireError( Any( e ) );
282 }
283 catch ( const RuntimeException & e )
284 {
285 fireError( Any( e ) );
286 }
287 catch ( const Exception & e )
288 {
289 fireError( Any( e ) );
290 }
291
292 close();
293 fireClose();
294 }
295 catch ( const css::uno::Exception &e )
296 {
297 // we are the last on the stack.
298 // this is to avoid crashing the program, when e.g. a bridge crashes
299 SAL_WARN("io.streams","com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners" << e);
300 }
301}
302
303
304/*
305 * XConnectable
306 */
307
308void Pump::setPredecessor( const Reference< XConnectable >& xPred )
309{
310 std::unique_lock aGuard( m_aMutex );
311 m_xPred = xPred;
312}
313
314
315Reference< XConnectable > Pump::getPredecessor()
316{
317 std::unique_lock aGuard( m_aMutex );
318 return m_xPred;
319}
320
321
322void Pump::setSuccessor( const Reference< XConnectable >& xSucc )
323{
324 std::unique_lock aGuard( m_aMutex );
325 m_xSucc = xSucc;
326}
327
328
329Reference< XConnectable > Pump::getSuccessor()
330{
331 std::unique_lock aGuard( m_aMutex );
332 return m_xSucc;
333}
334
335
336/*
337 * XActiveDataControl
338 */
339
340void Pump::addListener( const Reference< XStreamListener >& xListener )
341{
342 std::unique_lock aGuard( m_aMutex );
343 m_cnt.addInterface( aGuard, xListener );
344}
345
346
347void Pump::removeListener( const Reference< XStreamListener >& xListener )
348{
349 std::unique_lock aGuard( m_aMutex );
350 m_cnt.removeInterface( aGuard, xListener );
351}
352
353
354void Pump::start()
355{
356 std::unique_lock aGuard( m_aMutex );
357 m_aThread = osl_createSuspendedThread(Pump::static_run,this);
358 if( !m_aThread )
359 {
360 throw RuntimeException(
361 "Pump::start Couldn't create worker thread",
362 *this);
363 }
364
365 // will be released by OPump::static_run
366 acquire();
367 osl_resumeThread( m_aThread );
368
369}
370
371
372void Pump::terminate()
373{
374 close();
375
376 // wait for the worker to die
377 if( m_aThread )
378 osl_joinWithThread( m_aThread );
379
380 fireTerminated();
381 fireClose();
382}
383
384
385/*
386 * XActiveDataSink
387 */
388
389void Pump::setInputStream( const Reference< XInputStream >& xStream )
390{
391 std::unique_lock aGuard( m_aMutex );
393 Reference< XConnectable > xConnect( xStream, UNO_QUERY );
394 if( xConnect.is() )
395 xConnect->setSuccessor( this );
396 // data transfer starts in XActiveDataControl::start
397}
398
399
400Reference< XInputStream > Pump::getInputStream()
401{
402 std::unique_lock aGuard( m_aMutex );
403 return m_xInput;
404}
405
406
407/*
408 * XActiveDataSource
409 */
410
411void Pump::setOutputStream( const Reference< XOutputStream >& xOut )
412{
413 std::unique_lock aGuard( m_aMutex );
414 m_xOutput = xOut;
415 Reference< XConnectable > xConnect( xOut, UNO_QUERY );
416 if( xConnect.is() )
417 xConnect->setPredecessor( this );
418 // data transfer starts in XActiveDataControl::start
419}
420
421Reference< XOutputStream > Pump::getOutputStream()
422{
423 std::unique_lock aGuard( m_aMutex );
424 return m_xOutput;
425}
426
427// XServiceInfo
428OUString Pump::getImplementationName()
429{
430 return "com.sun.star.comp.io.Pump";
431}
432
433// XServiceInfo
434sal_Bool Pump::supportsService(const OUString& ServiceName)
435{
436 return cppu::supportsService(this, ServiceName);
437}
438
439// XServiceInfo
440Sequence< OUString > Pump::getSupportedServiceNames()
441{
442 return { "com.sun.star.io.Pump" };
443}
444
445}
446
447extern "C" SAL_DLLPUBLIC_EXPORT css::uno::XInterface*
449 css::uno::XComponentContext* , css::uno::Sequence<css::uno::Any> const&)
450{
451 return cppu::acquire(new io_stm::Pump());
452}
453
454
455/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
Reference< XInputStream > xStream
bool close
EmbeddedObjectRef * pObject
#define SAL_WARN(area, stream)
def run(arg=None, arg2=-1)
constexpr OUStringLiteral aData
void addListener(const InterfaceRef &xObject, const css::uno::Reference< css::lang::XEventListener > &xListener)
void removeListener(const InterfaceRef &xObject, const css::uno::Reference< css::lang::XEventListener > &xListener)
css::uno::Sequence< OUString > getSupportedServiceNames()
OUString getImplementationName()
bool CPPUHELPER_DLLPUBLIC supportsService(css::lang::XServiceInfo *implementation, rtl::OUString const &name)
Definition: odata.cxx:47
bool getOutputStream(ProgramOptions const &options, OString const &extension, std::ostream **ppOutputStream, OString &targetSourceFileName, OString &tmpSourceFileName)
comphelper::OInterfaceContainerHelper4< XStreamListener > m_cnt
Definition: opump.cxx:58
std::mutex m_aMutex
Definition: opump.cxx:51
Reference< XConnectable > m_xPred
Definition: opump.cxx:54
SAL_DLLPUBLIC_EXPORT css::uno::XInterface * io_Pump_get_implementation(css::uno::XComponentContext *, css::uno::Sequence< css::uno::Any > const &)
Definition: opump.cxx:448
oslThread m_aThread
Definition: opump.cxx:52
Reference< XInputStream > m_xInput
Definition: opump.cxx:56
Reference< XOutputStream > m_xOutput
Definition: opump.cxx:57
bool m_closeFired
Definition: opump.cxx:59
Reference< XConnectable > m_xSucc
Definition: opump.cxx:55
unsigned char sal_Bool