LibreOffice Module io (master) 1
opipe.cxx
Go to the documentation of this file.
1/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2/*
3 * This file is part of the LibreOffice project.
4 *
5 * This Source Code Form is subject to the terms of the Mozilla Public
6 * License, v. 2.0. If a copy of the MPL was not distributed with this
7 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8 *
9 * This file incorporates work covered by the following license notice:
10 *
11 * Licensed to the Apache Software Foundation (ASF) under one or more
12 * contributor license agreements. See the NOTICE file distributed
13 * with this work for additional information regarding copyright
14 * ownership. The ASF licenses this file to you under the Apache
15 * License, Version 2.0 (the "License"); you may not use this file
16 * except in compliance with the License. You may obtain a copy of
17 * the License at http://www.apache.org/licenses/LICENSE-2.0 .
18 */
19
20#include <sal/config.h>
21
22#include <com/sun/star/io/BufferSizeExceededException.hpp>
23#include <com/sun/star/io/NotConnectedException.hpp>
24#include <com/sun/star/io/XPipe.hpp>
25#include <com/sun/star/io/XConnectable.hpp>
26
27#include <com/sun/star/lang/XServiceInfo.hpp>
28
31
32#include <osl/conditn.hxx>
33#include <osl/mutex.hxx>
34
35#include <limits>
36#include <memory>
37#include <string.h>
38
39using namespace ::osl;
40using namespace ::cppu;
41using namespace ::com::sun::star::uno;
42using namespace ::com::sun::star::io;
43using namespace ::com::sun::star::lang;
44
45#include "streamhelper.hxx"
46
47namespace com::sun::star::uno { class XComponentContext; }
48
49namespace io_stm{
50
51namespace {
52
53class OPipeImpl :
54 public WeakImplHelper< XPipe , XConnectable , XServiceInfo >
55{
56public:
57 OPipeImpl( );
58
59public: // XInputStream
60 virtual sal_Int32 SAL_CALL readBytes(Sequence< sal_Int8 >& aData, sal_Int32 nBytesToRead) override;
61 virtual sal_Int32 SAL_CALL readSomeBytes(Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead) override;
62 virtual void SAL_CALL skipBytes(sal_Int32 nBytesToSkip) override;
63 virtual sal_Int32 SAL_CALL available() override;
64 virtual void SAL_CALL closeInput() override;
65
66public: // XOutputStream
67
68 virtual void SAL_CALL writeBytes(const Sequence< sal_Int8 >& aData) override;
69 virtual void SAL_CALL flush() override;
70 virtual void SAL_CALL closeOutput() override;
71
72public: // XConnectable
73 virtual void SAL_CALL setPredecessor(const Reference< XConnectable >& aPredecessor) override;
74 virtual Reference< XConnectable > SAL_CALL getPredecessor() override;
75 virtual void SAL_CALL setSuccessor(const Reference < XConnectable > & aSuccessor) override;
76 virtual Reference < XConnectable > SAL_CALL getSuccessor() override ;
77
78
79public: // XServiceInfo
80 OUString SAL_CALL getImplementationName() override;
82 sal_Bool SAL_CALL supportsService(const OUString& ServiceName) override;
83
84private:
85
88
89 sal_Int32 m_nBytesToSkip;
90
93
94 osl::Condition m_conditionBytesAvail;
96 std::unique_ptr<MemFIFO> m_pFIFO;
97};
98
99}
100
101OPipeImpl::OPipeImpl()
102 : m_nBytesToSkip(0 )
103 , m_bOutputStreamClosed(false )
104 , m_bInputStreamClosed( false )
105 , m_pFIFO( new MemFIFO )
106{
107}
108
109
110
111sal_Int32 OPipeImpl::readBytes(Sequence< sal_Int8 >& aData, sal_Int32 nBytesToRead)
112{
113 while( true )
114 {
115 { // start guarded section
116 MutexGuard guard( m_mutexAccess );
118 {
119 throw NotConnectedException(
120 "Pipe::readBytes NotConnectedException",
121 *this );
122 }
123 sal_Int32 nOccupiedBufferLen = m_pFIFO->getSize();
124
125 if( m_bOutputStreamClosed && nBytesToRead > nOccupiedBufferLen )
126 {
127 nBytesToRead = nOccupiedBufferLen;
128 }
129
130 if( nOccupiedBufferLen < nBytesToRead )
131 {
132 // wait outside guarded section
133 m_conditionBytesAvail.reset();
134 }
135 else {
136 // necessary bytes are available
137 m_pFIFO->read( aData , nBytesToRead );
138 return nBytesToRead;
139 }
140 } // end guarded section
141
142 // wait for new data outside guarded section!
144 }
145}
146
147
148sal_Int32 OPipeImpl::readSomeBytes(Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead)
149{
150 while( true ) {
151 {
152 MutexGuard guard( m_mutexAccess );
154 {
155 throw NotConnectedException(
156 "Pipe::readSomeBytes NotConnectedException",
157 *this );
158 }
159 if( m_pFIFO->getSize() )
160 {
161 sal_Int32 nSize = std::min( nMaxBytesToRead , m_pFIFO->getSize() );
162 aData.realloc( nSize );
163 m_pFIFO->read( aData , nSize );
164 return nSize;
165 }
166
168 {
169 // no bytes in buffer anymore
170 return 0;
171 }
172 }
173
175 }
176}
177
178
179void OPipeImpl::skipBytes(sal_Int32 nBytesToSkip)
180{
181 MutexGuard guard( m_mutexAccess );
183 {
184 throw NotConnectedException(
185 "Pipe::skipBytes NotConnectedException",
186 *this );
187 }
188
189 if( nBytesToSkip < 0
190 || (nBytesToSkip
191 > std::numeric_limits< sal_Int32 >::max() - m_nBytesToSkip) )
192 {
193 throw BufferSizeExceededException(
194 "Pipe::skipBytes BufferSizeExceededException",
195 *this );
196 }
197 m_nBytesToSkip += nBytesToSkip;
198
199 nBytesToSkip = std::min( m_pFIFO->getSize() , m_nBytesToSkip );
200 m_pFIFO->skip( nBytesToSkip );
201 m_nBytesToSkip -= nBytesToSkip;
202}
203
204
205sal_Int32 OPipeImpl::available()
206 {
207 MutexGuard guard( m_mutexAccess );
209 {
210 throw NotConnectedException(
211 "Pipe::available NotConnectedException",
212 *this );
213 }
214 return m_pFIFO->getSize();
215}
216
217void OPipeImpl::closeInput()
218{
219 MutexGuard guard( m_mutexAccess );
220
222
223 m_pFIFO.reset();
224
225 // readBytes may throw an exception
227
228 setSuccessor( Reference< XConnectable > () );
229}
230
231
232void OPipeImpl::writeBytes(const Sequence< sal_Int8 >& aData)
233{
234 MutexGuard guard( m_mutexAccess );
235
237 {
238 throw NotConnectedException(
239 "Pipe::writeBytes NotConnectedException (outputstream)",
240 *this );
241 }
242
244 {
245 throw NotConnectedException(
246 "Pipe::writeBytes NotConnectedException (inputstream)",
247 *this );
248 }
249
250 // check skipping
251 sal_Int32 nLen = aData.getLength();
252 if( m_nBytesToSkip && m_nBytesToSkip >= nLen ) {
253 // all must be skipped - forget whole call
254 m_nBytesToSkip -= nLen;
255 return;
256 }
257
258 // adjust buffersize if necessary
259 if( m_nBytesToSkip )
260 {
261 Sequence< sal_Int8 > seqCopy( nLen - m_nBytesToSkip );
262 memcpy( seqCopy.getArray() , &( aData.getConstArray()[m_nBytesToSkip] ) , nLen-m_nBytesToSkip );
263 m_pFIFO->write( seqCopy );
264 }
265 else
266 {
267 m_pFIFO->write( aData );
268 }
269 m_nBytesToSkip = 0;
270
271 // readBytes may check again if enough bytes are available
273}
274
275
276void OPipeImpl::flush()
277{
278 // nothing to do for a pipe
279}
280
281void OPipeImpl::closeOutput()
282{
283 MutexGuard guard( m_mutexAccess );
284
287 setPredecessor( Reference < XConnectable > () );
288}
289
290
291void OPipeImpl::setSuccessor( const Reference < XConnectable > &r )
292{
294 if( m_succ != r ) {
296 m_succ = r;
297
298 if( m_succ.is() )
299 {
300 m_succ->setPredecessor(
301 Reference< XConnectable > ( static_cast< XConnectable * >(this) ) );
302 }
303 }
304}
305
306Reference < XConnectable > OPipeImpl::getSuccessor()
307{
308 return m_succ;
309}
310
311
// XConnectable
313void OPipeImpl::setPredecessor( const Reference < XConnectable > &r )
314{
315 if( r != m_pred ) {
316 m_pred = r;
317 if( m_pred.is() ) {
318 m_pred->setSuccessor(
319 Reference < XConnectable > ( static_cast< XConnectable * >(this) ) );
320 }
321 }
322}
323
324Reference < XConnectable > OPipeImpl::getPredecessor()
325{
326 return m_pred;
327}
328
329
330// XServiceInfo
331OUString OPipeImpl::getImplementationName()
332{
333 return "com.sun.star.comp.io.stm.Pipe";
334}
335
336// XServiceInfo
337sal_Bool OPipeImpl::supportsService(const OUString& ServiceName)
338{
339 return cppu::supportsService(this, ServiceName);
340}
341
342// XServiceInfo
343Sequence< OUString > OPipeImpl::getSupportedServiceNames()
344{
345 return { "com.sun.star.io.Pipe" };
346}
347
348}
349
350extern "C" SAL_DLLPUBLIC_EXPORT css::uno::XInterface*
352 css::uno::XComponentContext* , css::uno::Sequence<css::uno::Any> const&)
353{
354 return cppu::acquire(new io_stm::OPipeImpl());
355}
356
357/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
constexpr OUStringLiteral aData
css::uno::Sequence< OUString > getSupportedServiceNames()
OUString getImplementationName()
bool CPPUHELPER_DLLPUBLIC supportsService(css::lang::XServiceInfo *implementation, rtl::OUString const &name)
Definition: odata.cxx:47
Reference< XConnectable > m_succ
Definition: opipe.cxx:86
bool m_bInputStreamClosed
Definition: opipe.cxx:92
SAL_DLLPUBLIC_EXPORT css::uno::XInterface * io_OPipeImpl_get_implementation(css::uno::XComponentContext *, css::uno::Sequence< css::uno::Any > const &)
Definition: opipe.cxx:351
osl::Condition m_conditionBytesAvail
Definition: opipe.cxx:94
bool m_bOutputStreamClosed
Definition: opipe.cxx:91
sal_Int32 m_nBytesToSkip
Definition: opipe.cxx:89
Mutex m_mutexAccess
Definition: opipe.cxx:95
Reference< XConnectable > m_pred
Definition: opipe.cxx:87
std::unique_ptr< MemFIFO > m_pFIFO
Definition: opipe.cxx:96
unsigned char sal_Bool