LibreOffice Module package (master) 1
XBufferedThreadedStream.cxx
Go to the documentation of this file.
1/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2/*
3 * This file is part of the LibreOffice project.
4 *
5 * This Source Code Form is subject to the terms of the Mozilla Public
6 * License, v. 2.0. If a copy of the MPL was not distributed with this
7 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8 */
9
11
12using namespace css::uno;
13
14namespace {
15
16class UnzippingThread: public salhelper::Thread
17{
19public:
20 explicit UnzippingThread(XBufferedThreadedStream &xStream): Thread("Unzipping"), mxStream(xStream) {}
21private:
22 virtual void execute() override
23 {
24 try
25 {
26 mxStream.produce();
27 }
28 catch (...)
29 {
30 mxStream.saveException(std::current_exception());
31 }
32
33 mxStream.setTerminateThread();
34 }
35};
36
37}
38
40 const Reference<XInputStream>& xSrcStream,
41 sal_Int64 nStreamSize)
42: mxSrcStream( xSrcStream )
43, mnPos(0)
44, mnStreamSize( nStreamSize )
45, mnOffset( 0 )
46, mxUnzippingThread( new UnzippingThread(*this) )
47, mbTerminateThread( false )
48{
49 mxUnzippingThread->launch();
50}
51
53{
55 mxUnzippingThread->join();
56}
57
63{
64 Buffer pProducedBuffer;
65 sal_Int64 nTotalBytesRead(0);
66 std::unique_lock<std::mutex> aGuard( maBufferProtector );
67 do
68 {
69 if( !maUsedBuffers.empty() )
70 {
71 pProducedBuffer = maUsedBuffers.front();
72 maUsedBuffers.pop();
73 }
74
75 aGuard.unlock();
76 nTotalBytesRead += mxSrcStream->readBytes( pProducedBuffer, nBufferSize );
77
78 aGuard.lock();
79 maPendingBuffers.push( pProducedBuffer );
80 maBufferConsumeResume.notify_one();
81
83 maBufferProduceResume.wait( aGuard, [&]{return canProduce(); } );
84
85 } while( !mbTerminateThread && nTotalBytesRead < mnStreamSize );
86}
87
92{
93 std::unique_lock<std::mutex> aGuard( maBufferProtector );
94 const sal_Int32 nBufSize = maInUseBuffer.getLength();
95 if( nBufSize <= 0 || mnOffset >= nBufSize )
96 {
97 if( mnOffset >= nBufSize )
99
100 maBufferConsumeResume.wait( aGuard, [&]{return canConsume(); } );
101
102 if( maPendingBuffers.empty() )
103 {
106 std::rethrow_exception(maSavedException);
107 }
108 else
109 {
111 maPendingBuffers.pop();
112 mnOffset = 0;
113
114 if( maPendingBuffers.size() <= nBufferLowWater )
115 maBufferProduceResume.notify_one();
116 }
117 }
118
119 return maInUseBuffer;
120}
121
123{
124 std::scoped_lock<std::mutex> aGuard( maBufferProtector );
125 mbTerminateThread = true;
126 maBufferProduceResume.notify_one();
127 maBufferConsumeResume.notify_one();
128}
129
130sal_Int32 SAL_CALL XBufferedThreadedStream::readBytes( Sequence< sal_Int8 >& rData, sal_Int32 nBytesToRead )
131{
132 if( !hasBytes() )
133 return 0;
134
135 const sal_Int32 nAvailableSize = static_cast< sal_Int32 > ( std::min< sal_Int64 >( nBytesToRead, remainingSize() ) );
136 rData.realloc( nAvailableSize );
137 auto pData = rData.getArray();
138 sal_Int32 i = 0, nPendingBytes = nAvailableSize;
139
140 while( nPendingBytes )
141 {
142 const Buffer &pBuffer = getNextBlock();
143 if( !pBuffer.hasElements() )
144 {
145 rData.realloc( nAvailableSize - nPendingBytes );
146 return nAvailableSize - nPendingBytes;
147 }
148 const sal_Int32 limit = std::min<sal_Int32>( nPendingBytes, pBuffer.getLength() - mnOffset );
149
150 memcpy( &pData[i], &pBuffer[mnOffset], limit );
151
152 nPendingBytes -= limit;
153 mnOffset += limit;
154 mnPos += limit;
155 i += limit;
156 }
157
158 return nAvailableSize;
159}
160
161sal_Int32 SAL_CALL XBufferedThreadedStream::readSomeBytes( Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead )
162{
163 return readBytes( aData, nMaxBytesToRead );
164}
165void SAL_CALL XBufferedThreadedStream::skipBytes( sal_Int32 nBytesToSkip )
166{
167 if( nBytesToSkip )
168 {
169 Sequence < sal_Int8 > aSequence( nBytesToSkip );
170 readBytes( aSequence, nBytesToSkip );
171 }
172}
173
175{
176 if( !hasBytes() )
177 return 0;
178
179 return static_cast< sal_Int32 > ( std::min< sal_Int64 >( SAL_MAX_INT32, remainingSize() ) );
180}
181
183{
185 mxUnzippingThread->join();
186 mxSrcStream->closeInput();
187}
188
189/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
Reference< XInputStream > xStream
css::uno::Sequence< sal_Int8 > Buffer
void saveException(const std::exception_ptr &exception)
virtual void SAL_CALL skipBytes(sal_Int32 nBytesToSkip) override
std::condition_variable maBufferConsumeResume
mutex protecting Buffer queues.
Buffer maInUseBuffer
available size of stream
virtual sal_Int32 SAL_CALL available() override
const Buffer & getNextBlock()
Fetches next available block from maPendingBuffers for use in Reading thread.
XBufferedThreadedStream(const css::uno::Reference< XInputStream > &xSrcStream, sal_Int64 nStreamSize)
sal_Int64 mnStreamSize
position in stream
virtual sal_Int32 SAL_CALL readBytes(css::uno::Sequence< sal_Int8 > &aData, sal_Int32 nBytesToRead) override
std::queue< Buffer > maUsedBuffers
Buffers that are available for use.
virtual void SAL_CALL closeInput() override
int mnOffset
Buffer block in use.
std::condition_variable maBufferProduceResume
rtl::Reference< salhelper::Thread > mxUnzippingThread
virtual ~XBufferedThreadedStream() override
virtual sal_Int32 SAL_CALL readSomeBytes(css::uno::Sequence< sal_Int8 > &aData, sal_Int32 nMaxBytesToRead) override
void produce()
Reads from UnbufferedStream in a separate thread and stores the buffer blocks in maPendingBuffers que...
const css::uno::Reference< XInputStream > mxSrcStream
std::exception_ptr maSavedException
indicates the failure of one of the threads
static const size_t nBufferLowWater
exception caught during unzipping is saved to be thrown during reading
std::queue< Buffer > maPendingBuffers
position in maInUseBuffer
virtual void execute()=0
std::unique_ptr< sal_Int32[]> pData
constexpr OUStringLiteral aData
int i
#define SAL_MAX_INT32