SeqAn3 3.2.0
The Modern C++ library for sequence analysis.
bgzf_ostream.hpp
1// zipstream Library License:
2// --------------------------
3//
4// The zlib/libpng License Copyright (c) 2003 Jonathan de Halleux.
5//
6// This software is provided 'as-is', without any express or implied warranty. In no event will the authors be held liable for any damages arising from the use of this software.
7//
8// Permission is granted to anyone to use this software for any purpose, including commercial applications, and to alter it and redistribute it freely, subject to the following restrictions:
9//
10// 1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required.
11//
12// 2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software.
13//
14// 3. This notice may not be removed or altered from any source distribution
15//
16//
17// Author: Jonathan de Halleux, dehalleux@pelikhan.com, 2003 (original zlib stream)
18// Author: David Weese, dave.weese@gmail.com, 2014 (extension to parallel block-wise compression in bgzf format)
19// Author: René Rahn, rene.rahn [at] fu-berlin.de, 2019 (adaptions to SeqAn library version 3)
20
21#pragma once
22
#include <cassert>
#include <functional>
#include <ostream>
#include <streambuf>
#include <thread>
#include <vector>
25
29
30#if !defined(SEQAN3_HAS_ZLIB) && !defined(SEQAN3_HEADER_TEST)
31# error "This file cannot be used when building without GZip-support."
32#endif // !defined(SEQAN3_HAS_ZLIB) && !defined(SEQAN3_HEADER_TEST)
33
34#if defined(SEQAN3_HAS_ZLIB)
35
36namespace seqan3::contrib
37{
38
39// --------------------------------------------------------------------------
40// Class basic_bgzf_ostreambuf
41// --------------------------------------------------------------------------
42
43template<
44 typename Elem,
45 typename Tr = std::char_traits<Elem>,
46 typename ElemA = std::allocator<Elem>,
47 typename ByteT = char,
48 typename ByteAT = std::allocator<ByteT>
49>
50class basic_bgzf_ostreambuf : public std::basic_streambuf<Elem, Tr>
51{
52private:
53
54 typedef std::basic_ostream<Elem, Tr>& ostream_reference;
55 typedef ElemA char_allocator_type;
56 typedef ByteT byte_type;
57 typedef ByteAT byte_allocator_type;
58 typedef byte_type* byte_buffer_type;
59 typedef ConcurrentQueue<size_t, Suspendable<Limit> > job_queue_type;
60
61public:
62
63 typedef Tr traits_type;
64 typedef typename traits_type::char_type char_type;
65 typedef typename traits_type::int_type int_type;
66 typedef typename traits_type::pos_type pos_type;
67 typedef typename traits_type::off_type off_type;
68
69 struct ScopedLock
70 {
71 ScopedLock(std::function<void()> complete_fn) : completion(std::move(complete_fn))
72 {}
73
74 ~ScopedLock()
75 {
76 completion();
77 }
78
79 std::function<void()> completion;
80 };
81
82 // One compressed block.
83 struct OutputBuffer
84 {
85 char buffer[DefaultPageSize<detail::bgzf_compression>::MAX_BLOCK_SIZE];
86 size_t size;
87 };
88
89 // Writes the output to the underlying stream when invoked.
90 struct BufferWriter
91 {
92 ostream_reference ostream;
93
94 BufferWriter(ostream_reference ostream) :
95 ostream(ostream)
96 {}
97
98 bool operator() (OutputBuffer const & outputBuffer)
99 {
100 ostream.write(outputBuffer.buffer, outputBuffer.size);
101 return ostream.good();
102 }
103 };
104
105 struct CompressionJob
106 {
108
109 TBuffer buffer;
110 size_t size;
111 OutputBuffer *outputBuffer;
112
113 CompressionJob() :
114 buffer(DefaultPageSize<detail::bgzf_compression>::VALUE / sizeof(char_type), 0),
115 size(0),
116 outputBuffer(NULL)
117 {}
118 };
119
120 // string of recycable jobs
121 size_t numThreads;
122 size_t numJobs;
124 job_queue_type jobQueue;
125 job_queue_type idleQueue;
126 Serializer<OutputBuffer, BufferWriter> serializer;
127 size_t currentJobId;
128 bool currentJobAvail;
129
130 struct CompressionThread
131 {
132 basic_bgzf_ostreambuf *streamBuf;
133 CompressionContext<detail::bgzf_compression> compressionCtx;
134
135 void operator()()
136 {
137 ScopedLock readLock{[this] () mutable { unlockReading(this->streamBuf->jobQueue); }};
138 // ScopedReadLock<TJobQueue> readLock(streamBuf->jobQueue);
139 ScopedLock writeLock{[this] () mutable { unlockWriting(this->streamBuf->idleQueue); }};
140 // ScopedWriteLock{obQueue> writeLock{str}amBuf->idleQueue);
141
142 // wait for a new job to become available
143 bool success = true;
144 while (success)
145 {
146 size_t jobId = -1;
147 if (!popFront(jobId, streamBuf->jobQueue))
148 return;
149
150 CompressionJob &job = streamBuf->jobs[jobId];
151
152 // compress block with zlib
153 job.outputBuffer->size = _compressBlock(
154 job.outputBuffer->buffer, sizeof(job.outputBuffer->buffer),
155 &job.buffer[0], job.size, compressionCtx);
156
157 success = releaseValue(streamBuf->serializer, job.outputBuffer);
158 appendValue(streamBuf->idleQueue, jobId);
159 }
160 }
161 };
162
163 // array of worker threads
164 // using TFuture = decltype(std::async(CompressionThread{nullptr, CompressionContext<BgzfFile>{}, static_cast<size_t>(0)}));
166
167 basic_bgzf_ostreambuf(ostream_reference ostream_,
168 size_t numThreads = bgzf_thread_count,
169 size_t jobsPerThread = 8) :
170 numThreads(numThreads),
171 numJobs(numThreads * jobsPerThread),
172 jobQueue(numJobs),
173 idleQueue(numJobs),
174 serializer(ostream_, numThreads * jobsPerThread)
175 {
176 jobs.resize(numJobs);
177 currentJobId = 0;
178
179 lockWriting(jobQueue);
180 lockReading(idleQueue);
181 setReaderWriterCount(jobQueue, numThreads, 1);
182 setReaderWriterCount(idleQueue, 1, numThreads);
183
184 // Prepare idle queue.
185 for (size_t i = 0; i < numJobs; ++i)
186 {
187 [[maybe_unused]] bool success = appendValue(idleQueue, i);
188 assert(success);
189 }
190
191 // Start off threads.
192 for (size_t i = 0; i < numThreads; ++i)
193 pool.emplace_back(CompressionThread{this, CompressionContext<detail::bgzf_compression>{}});
194
195 currentJobAvail = popFront(currentJobId, idleQueue);
196 assert(currentJobAvail);
197
198 CompressionJob &job = jobs[currentJobId];
199 job.outputBuffer = aquireValue(serializer);
200 this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
201 }
202
203 ~basic_bgzf_ostreambuf()
204 {
205 // the buffer is now (after addFooter()) and flush will append the empty EOF marker
206 flush(true);
207
208 unlockWriting(jobQueue);
209
210 // Wait for threads to finish there active work.
211 for (auto & t : pool)
212 {
213 if (t.joinable())
214 t.join();
215 }
216
217 unlockReading(idleQueue);
218 }
219
220 bool compressBuffer(size_t size)
221 {
222 // submit current job
223 if (currentJobAvail)
224 {
225 jobs[currentJobId].size = size;
226 appendValue(jobQueue, currentJobId);
227 }
228
229 // recycle existing idle job
230 if (!(currentJobAvail = popFront(currentJobId, idleQueue)))
231 return false;
232
233 jobs[currentJobId].outputBuffer = aquireValue(serializer);
234
235 return serializer;
236 }
237
238 int_type overflow(int_type c)
239 {
240 int w = static_cast<int>(this->pptr() - this->pbase());
241 if (c != static_cast<int_type>(EOF))
242 {
243 *this->pptr() = c;
244 ++w;
245 }
246 if (compressBuffer(w))
247 {
248 CompressionJob &job = jobs[currentJobId];
249 this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
250 return c;
251 }
252 else
253 {
254 return EOF;
255 }
256 }
257
258 std::streamsize flush(bool flushEmptyBuffer = false)
259 {
260 int w = static_cast<int>(this->pptr() - this->pbase());
261 if ((w != 0 || flushEmptyBuffer) && compressBuffer(w))
262 {
263 CompressionJob &job = jobs[currentJobId];
264 this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
265 }
266 else
267 {
268 w = 0;
269 }
270
271 // wait for running compressor threads
272 waitForMinSize(idleQueue, numJobs - 1);
273
274 serializer.worker.ostream.flush();
275 return w;
276 }
277
278 int sync()
279 {
280 if (this->pptr() != this->pbase())
281 {
282 int_type c = overflow(EOF);
283 if (c == static_cast<int_type>(EOF))
284 return -1;
285 }
286 return 0;
287 }
288
289 void addFooter()
290 {
291 // we flush the filled buffer here, so that an empty (EOF) buffer is flushed in the d'tor
292 if (this->pptr() != this->pbase())
293 overflow(EOF);
294 }
295
296 // returns a reference to the output stream
297 ostream_reference get_ostream() const { return serializer.worker.ostream; };
298};
299
300// --------------------------------------------------------------------------
301// Class basic_bgzf_ostreambase
302// --------------------------------------------------------------------------
303
304template<
305 typename Elem,
306 typename Tr = std::char_traits<Elem>,
307 typename ElemA = std::allocator<Elem>,
308 typename ByteT = char,
309 typename ByteAT = std::allocator<ByteT>
310>
311class basic_bgzf_ostreambase : virtual public std::basic_ios<Elem,Tr>
312{
313public:
314 typedef std::basic_ostream<Elem, Tr>& ostream_reference;
315 typedef basic_bgzf_ostreambuf<Elem, Tr, ElemA, ByteT, ByteAT> bgzf_streambuf_type;
316
317 basic_bgzf_ostreambase(ostream_reference ostream_)
318 : m_buf(ostream_)
319 {
320 this->init(&m_buf );
321 };
322
323 // returns the underlying zip ostream object
324 bgzf_streambuf_type* rdbuf() { return &m_buf; };
325 // returns the bgzf error state
326 int get_zerr() const { return m_buf.get_err(); };
327 // returns the uncompressed data crc
328 long get_crc() const { return m_buf.get_crc(); };
329 // returns the compressed data size
330 long get_out_size() const { return m_buf.get_out_size(); };
331 // returns the uncompressed data size
332 long get_in_size() const { return m_buf.get_in_size(); };
333
334private:
335 bgzf_streambuf_type m_buf;
336};
337
338// --------------------------------------------------------------------------
339// Class basic_bgzf_ostream
340// --------------------------------------------------------------------------
341
342template<
343 typename Elem,
344 typename Tr = std::char_traits<Elem>,
345 typename ElemA = std::allocator<Elem>,
346 typename ByteT = char,
347 typename ByteAT = std::allocator<ByteT>
348>
349class basic_bgzf_ostream :
350 public basic_bgzf_ostreambase<Elem,Tr,ElemA,ByteT,ByteAT>,
351 public std::basic_ostream<Elem,Tr>
352{
353public:
354 typedef basic_bgzf_ostreambase<Elem,Tr,ElemA,ByteT,ByteAT> bgzf_ostreambase_type;
355 typedef std::basic_ostream<Elem,Tr> ostream_type;
356 typedef ostream_type& ostream_reference;
357
358 basic_bgzf_ostream(ostream_reference ostream_) :
359 bgzf_ostreambase_type(ostream_),
360 ostream_type(bgzf_ostreambase_type::rdbuf())
361 {}
362
363 // flush inner buffer and zipper buffer
364 basic_bgzf_ostream<Elem,Tr>& flush()
365 {
366 ostream_type::flush(); this->rdbuf()->flush(); return *this;
367 };
368
369 ~basic_bgzf_ostream()
370 {
371 this->rdbuf()->addFooter();
372 }
373
374private:
375 static void put_long(ostream_reference out_, unsigned long x_);
376#ifdef _WIN32
377 void _Add_vtordisp1() { } // Required to avoid VC++ warning C4250
378 void _Add_vtordisp2() { } // Required to avoid VC++ warning C4250
379#endif
380};
381
382// ===========================================================================
383// Typedefs
384// ===========================================================================
385
386// A typedef for basic_bgzf_ostream<char>
387typedef basic_bgzf_ostream<char> bgzf_ostream;
388// A typedef for basic_bgzf_ostream<wchar_t>
389typedef basic_bgzf_ostream<wchar_t> bgzf_wostream;
390
391} // namespace seqan3::contrib
392
393#endif // defined(SEQAN3_HAS_ZLIB)
Provides stream compression utilities.
T emplace_back(T... args)
T flush(T... args)
constexpr size_t size
The size of a type pack.
Definition: traits.hpp:146
T init(T... args)
T move(T... args)
SeqAn specific customisations in the standard namespace.
T rdbuf(T... args)
T resize(T... args)
Provides helper structs from SeqAn2 for the bgzf_ostream.
T size(T... args)
Provides seqan suspendable queue.