SeqAn3 3.1.0
The Modern C++ library for sequence analysis.
bgzf_ostream.hpp
1// zipstream Library License:
2// --------------------------
3//
4// The zlib/libpng License Copyright (c) 2003 Jonathan de Halleux.
5//
6// This software is provided 'as-is', without any express or implied warranty. In no event will the authors be held liable for any damages arising from the use of this software.
7//
8// Permission is granted to anyone to use this software for any purpose, including commercial applications, and to alter it and redistribute it freely, subject to the following restrictions:
9//
10// 1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required.
11//
12// 2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software.
13//
14// 3. This notice may not be removed or altered from any source distribution
15//
16//
17// Author: Jonathan de Halleux, dehalleux@pelikhan.com, 2003 (original zlib stream)
18// Author: David Weese, dave.weese@gmail.com, 2014 (extension to parallel block-wise compression in bgzf format)
19// Author: René Rahn, rene.rahn [at] fu-berlin.de, 2019 (adaptions to SeqAn library version 3)
20
21#pragma once
22
26
27#if !defined(SEQAN3_HAS_ZLIB) && !defined(SEQAN3_HEADER_TEST)
28# error "This file cannot be used when building without GZip-support."
29#endif // !defined(SEQAN3_HAS_ZLIB) && !defined(SEQAN3_HEADER_TEST)
30
31#if defined(SEQAN3_HAS_ZLIB)
32
33namespace seqan3::contrib
34{
35
36// --------------------------------------------------------------------------
37// Class basic_bgzf_ostreambuf
38// --------------------------------------------------------------------------
39
40template<
41 typename Elem,
42 typename Tr = std::char_traits<Elem>,
43 typename ElemA = std::allocator<Elem>,
44 typename ByteT = char,
45 typename ByteAT = std::allocator<ByteT>
46>
47class basic_bgzf_ostreambuf : public std::basic_streambuf<Elem, Tr>
48{
49private:
50
51 typedef std::basic_ostream<Elem, Tr>& ostream_reference;
52 typedef ElemA char_allocator_type;
53 typedef ByteT byte_type;
54 typedef ByteAT byte_allocator_type;
55 typedef byte_type* byte_buffer_type;
56 typedef ConcurrentQueue<size_t, Suspendable<Limit> > job_queue_type;
57
58public:
59
60 typedef Tr traits_type;
61 typedef typename traits_type::char_type char_type;
62 typedef typename traits_type::int_type int_type;
63 typedef typename traits_type::pos_type pos_type;
64 typedef typename traits_type::off_type off_type;
65
// Scope guard: invokes the stored completion callback once, when the guard goes out of scope.
// CompressionThread uses it to release its queue reader/writer registrations on every exit path.
struct ScopedLock
{
    // complete_fn: callback run by the destructor; an empty function is allowed and simply does nothing.
    explicit ScopedLock(std::function<void()> complete_fn) : completion(std::move(complete_fn))
    {}

    // A copy would run the callback once per copy (e.g. unlocking a queue twice), so copying is forbidden.
    ScopedLock(ScopedLock const &) = delete;
    ScopedLock & operator=(ScopedLock const &) = delete;

    ~ScopedLock()
    {
        // Calling an empty std::function throws std::bad_function_call, which would escape this
        // destructor and terminate the program; treat an empty callback as a no-op instead.
        if (completion)
            completion();
    }

    std::function<void()> completion;
};
78
79 // One compressed block.
80 struct OutputBuffer
81 {
82 char buffer[DefaultPageSize<detail::bgzf_compression>::MAX_BLOCK_SIZE];
83 size_t size;
84 };
85
86 // Writes the output to the underlying stream when invoked.
87 struct BufferWriter
88 {
89 ostream_reference ostream;
90
91 BufferWriter(ostream_reference ostream) :
92 ostream(ostream)
93 {}
94
95 bool operator() (OutputBuffer const & outputBuffer)
96 {
97 ostream.write(outputBuffer.buffer, outputBuffer.size);
98 return ostream.good();
99 }
100 };
101
102 struct CompressionJob
103 {
105
106 TBuffer buffer;
107 size_t size;
108 OutputBuffer *outputBuffer;
109
110 CompressionJob() :
111 buffer(DefaultPageSize<detail::bgzf_compression>::VALUE / sizeof(char_type), 0),
112 size(0),
113 outputBuffer(NULL)
114 {}
115 };
116
117 // string of recyclable jobs
118 size_t numThreads;
119 size_t numJobs;
121 job_queue_type jobQueue;
122 job_queue_type idleQueue;
123 Serializer<OutputBuffer, BufferWriter> serializer;
124 size_t currentJobId;
125 bool currentJobAvail;
126
127 struct CompressionThread
128 {
129 basic_bgzf_ostreambuf *streamBuf;
130 CompressionContext<detail::bgzf_compression> compressionCtx;
131
132 void operator()()
133 {
134 ScopedLock readLock{[this] () mutable { unlockReading(this->streamBuf->jobQueue); }};
135 // ScopedReadLock<TJobQueue> readLock(streamBuf->jobQueue);
136 ScopedLock writeLock{[this] () mutable { unlockWriting(this->streamBuf->idleQueue); }};
137 // ScopedWriteLock{obQueue> writeLock{str}amBuf->idleQueue);
138
139 // wait for a new job to become available
140 bool success = true;
141 while (success)
142 {
143 size_t jobId = -1;
144 if (!popFront(jobId, streamBuf->jobQueue))
145 return;
146
147 CompressionJob &job = streamBuf->jobs[jobId];
148
149 // compress block with zlib
150 job.outputBuffer->size = _compressBlock(
151 job.outputBuffer->buffer, sizeof(job.outputBuffer->buffer),
152 &job.buffer[0], job.size, compressionCtx);
153
154 success = releaseValue(streamBuf->serializer, job.outputBuffer);
155 appendValue(streamBuf->idleQueue, jobId);
156 }
157 }
158 };
159
160 // array of worker threads
161 // using TFuture = decltype(std::async(CompressionThread{nullptr, CompressionContext<BgzfFile>{}, static_cast<size_t>(0)}));
163
164 basic_bgzf_ostreambuf(ostream_reference ostream_,
165 size_t numThreads = bgzf_thread_count,
166 size_t jobsPerThread = 8) :
167 numThreads(numThreads),
168 numJobs(numThreads * jobsPerThread),
169 jobQueue(numJobs),
170 idleQueue(numJobs),
171 serializer(ostream_, numThreads * jobsPerThread)
172 {
173 jobs.resize(numJobs);
174 currentJobId = 0;
175
176 lockWriting(jobQueue);
177 lockReading(idleQueue);
178 setReaderWriterCount(jobQueue, numThreads, 1);
179 setReaderWriterCount(idleQueue, 1, numThreads);
180
181 // Prepare idle queue.
182 for (size_t i = 0; i < numJobs; ++i)
183 {
184 [[maybe_unused]] bool success = appendValue(idleQueue, i);
185 assert(success);
186 }
187
188 // Start off threads.
189 for (size_t i = 0; i < numThreads; ++i)
190 pool.emplace_back(CompressionThread{this, CompressionContext<detail::bgzf_compression>{}});
191
192 currentJobAvail = popFront(currentJobId, idleQueue);
193 assert(currentJobAvail);
194
195 CompressionJob &job = jobs[currentJobId];
196 job.outputBuffer = aquireValue(serializer);
197 this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
198 }
199
200 ~basic_bgzf_ostreambuf()
201 {
202 // the buffer is now (after addFooter()) and flush will append the empty EOF marker
203 flush(true);
204
205 unlockWriting(jobQueue);
206
207 // Wait for threads to finish there active work.
208 for (auto & t : pool)
209 {
210 if (t.joinable())
211 t.join();
212 }
213
214 unlockReading(idleQueue);
215 }
216
217 bool compressBuffer(size_t size)
218 {
219 // submit current job
220 if (currentJobAvail)
221 {
222 jobs[currentJobId].size = size;
223 appendValue(jobQueue, currentJobId);
224 }
225
226 // recycle existing idle job
227 if (!(currentJobAvail = popFront(currentJobId, idleQueue)))
228 return false;
229
230 jobs[currentJobId].outputBuffer = aquireValue(serializer);
231
232 return serializer;
233 }
234
235 int_type overflow(int_type c)
236 {
237 int w = static_cast<int>(this->pptr() - this->pbase());
238 if (c != static_cast<int_type>(EOF))
239 {
240 *this->pptr() = c;
241 ++w;
242 }
243 if (compressBuffer(w))
244 {
245 CompressionJob &job = jobs[currentJobId];
246 this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
247 return c;
248 }
249 else
250 {
251 return EOF;
252 }
253 }
254
255 std::streamsize flush(bool flushEmptyBuffer = false)
256 {
257 int w = static_cast<int>(this->pptr() - this->pbase());
258 if ((w != 0 || flushEmptyBuffer) && compressBuffer(w))
259 {
260 CompressionJob &job = jobs[currentJobId];
261 this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
262 }
263 else
264 {
265 w = 0;
266 }
267
268 // wait for running compressor threads
269 waitForMinSize(idleQueue, numJobs - 1);
270
271 serializer.worker.ostream.flush();
272 return w;
273 }
274
275 int sync()
276 {
277 if (this->pptr() != this->pbase())
278 {
279 int_type c = overflow(EOF);
280 if (c == static_cast<int_type>(EOF))
281 return -1;
282 }
283 return 0;
284 }
285
286 void addFooter()
287 {
288 // we flush the filled buffer here, so that an empty (EOF) buffer is flushed in the d'tor
289 if (this->pptr() != this->pbase())
290 overflow(EOF);
291 }
292
293 // returns a reference to the output stream
294 ostream_reference get_ostream() const { return serializer.worker.ostream; };
295};
296
297// --------------------------------------------------------------------------
298// Class basic_bgzf_ostreambase
299// --------------------------------------------------------------------------
300
301template<
302 typename Elem,
303 typename Tr = std::char_traits<Elem>,
304 typename ElemA = std::allocator<Elem>,
305 typename ByteT = char,
306 typename ByteAT = std::allocator<ByteT>
307>
308class basic_bgzf_ostreambase : virtual public std::basic_ios<Elem,Tr>
309{
310public:
311 typedef std::basic_ostream<Elem, Tr>& ostream_reference;
312 typedef basic_bgzf_ostreambuf<Elem, Tr, ElemA, ByteT, ByteAT> bgzf_streambuf_type;
313
314 basic_bgzf_ostreambase(ostream_reference ostream_)
315 : m_buf(ostream_)
316 {
317 this->init(&m_buf );
318 };
319
320 // returns the underlying zip ostream object
321 bgzf_streambuf_type* rdbuf() { return &m_buf; };
322 // returns the bgzf error state
323 int get_zerr() const { return m_buf.get_err(); };
324 // returns the uncompressed data crc
325 long get_crc() const { return m_buf.get_crc(); };
326 // returns the compressed data size
327 long get_out_size() const { return m_buf.get_out_size(); };
328 // returns the uncompressed data size
329 long get_in_size() const { return m_buf.get_in_size(); };
330
331private:
332 bgzf_streambuf_type m_buf;
333};
334
335// --------------------------------------------------------------------------
336// Class basic_bgzf_ostream
337// --------------------------------------------------------------------------
338
339template<
340 typename Elem,
341 typename Tr = std::char_traits<Elem>,
342 typename ElemA = std::allocator<Elem>,
343 typename ByteT = char,
344 typename ByteAT = std::allocator<ByteT>
345>
346class basic_bgzf_ostream :
347 public basic_bgzf_ostreambase<Elem,Tr,ElemA,ByteT,ByteAT>,
348 public std::basic_ostream<Elem,Tr>
349{
350public:
351 typedef basic_bgzf_ostreambase<Elem,Tr,ElemA,ByteT,ByteAT> bgzf_ostreambase_type;
352 typedef std::basic_ostream<Elem,Tr> ostream_type;
353 typedef ostream_type& ostream_reference;
354
355 basic_bgzf_ostream(ostream_reference ostream_) :
356 bgzf_ostreambase_type(ostream_),
357 ostream_type(bgzf_ostreambase_type::rdbuf())
358 {}
359
360 // flush inner buffer and zipper buffer
361 basic_bgzf_ostream<Elem,Tr>& flush()
362 {
363 ostream_type::flush(); this->rdbuf()->flush(); return *this;
364 };
365
366 ~basic_bgzf_ostream()
367 {
368 this->rdbuf()->addFooter();
369 }
370
371private:
372 static void put_long(ostream_reference out_, unsigned long x_);
373#ifdef _WIN32
374 void _Add_vtordisp1() { } // Required to avoid VC++ warning C4250
375 void _Add_vtordisp2() { } // Required to avoid VC++ warning C4250
376#endif
377};
378
379// ===========================================================================
380// Typedefs
381// ===========================================================================
382
383// A typedef for basic_bgzf_ostream<char>
384typedef basic_bgzf_ostream<char> bgzf_ostream;
385// A typedef for basic_bgzf_ostream<wchar_t>
386typedef basic_bgzf_ostream<wchar_t> bgzf_wostream;
387
388} // namespace seqan3::contrib
389
390#endif // defined(SEQAN3_HAS_ZLIB)
Provides stream compression utilities.
T emplace_back(T... args)
T flush(T... args)
constexpr size_t size
The size of a type pack.
Definition: traits.hpp:151
T init(T... args)
SeqAn specific customisations in the standard namespace.
T rdbuf(T... args)
T resize(T... args)
Provides helper structs from SeqAn2 for the bgzf_ostream.
T size(T... args)
Provides seqan suspendable queue.