SeqAn3 3.4.0-rc.1
The Modern C++ library for sequence analysis.
Loading...
Searching...
No Matches
bgzf_ostream.hpp
1// SPDX-FileCopyrightText: 2003 Jonathan de Halleux
2// SPDX-License-Identifier: Zlib
3// zipstream Library License:
4// --------------------------
5//
6// The zlib/libpng License Copyright (c) 2003 Jonathan de Halleux.
7//
8// This software is provided 'as-is', without any express or implied warranty. In no event will the authors be held liable for any damages arising from the use of this software.
9//
10// Permission is granted to anyone to use this software for any purpose, including commercial applications, and to alter it and redistribute it freely, subject to the following restrictions:
11//
12// 1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required.
13//
14// 2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software.
15//
16// 3. This notice may not be removed or altered from any source distribution
17//
18//
19// Author: Jonathan de Halleux, dehalleux@pelikhan.com, 2003 (original zlib stream)
20// Author: David Weese, dave.weese@gmail.com, 2014 (extension to parallel block-wise compression in bgzf format)
21// Author: René Rahn, rene.rahn [at] fu-berlin.de, 2019 (adaptions to SeqAn library version 3)
22
23#pragma once
24
#include <cassert>
#include <functional>
#include <thread>
#include <vector>
27
31
32#if !defined(SEQAN3_HAS_ZLIB) && !defined(SEQAN3_HEADER_TEST)
33# error "This file cannot be used when building without GZip-support."
34#endif // !defined(SEQAN3_HAS_ZLIB) && !defined(SEQAN3_HEADER_TEST)
35
36#if defined(SEQAN3_HAS_ZLIB)
37
38namespace seqan3::contrib
39{
40
41// --------------------------------------------------------------------------
42// Class basic_bgzf_ostreambuf
43// --------------------------------------------------------------------------
44
45template<
46 typename Elem,
47 typename Tr = std::char_traits<Elem>,
48 typename ElemA = std::allocator<Elem>,
49 typename ByteT = char,
50 typename ByteAT = std::allocator<ByteT>
51>
52class basic_bgzf_ostreambuf : public std::basic_streambuf<Elem, Tr>
53{
54private:
55
56 typedef std::basic_ostream<Elem, Tr>& ostream_reference;
57 typedef ElemA char_allocator_type;
58 typedef ByteT byte_type;
59 typedef ByteAT byte_allocator_type;
60 typedef byte_type* byte_buffer_type;
61 typedef ConcurrentQueue<size_t, Suspendable<Limit> > job_queue_type;
62
63public:
64
65 typedef Tr traits_type;
66 typedef typename traits_type::char_type char_type;
67 typedef typename traits_type::int_type int_type;
68 typedef typename traits_type::pos_type pos_type;
69 typedef typename traits_type::off_type off_type;
70
71 struct ScopedLock
72 {
73 ScopedLock(std::function<void()> complete_fn) : completion(std::move(complete_fn))
74 {}
75
76 ~ScopedLock()
77 {
78 completion();
79 }
80
81 std::function<void()> completion;
82 };
83
84 // One compressed block.
85 struct OutputBuffer
86 {
87 char buffer[DefaultPageSize<detail::bgzf_compression>::MAX_BLOCK_SIZE];
88 size_t size;
89 };
90
91 // Writes the output to the underlying stream when invoked.
92 struct BufferWriter
93 {
94 ostream_reference ostream;
95
96 BufferWriter(ostream_reference ostream) :
97 ostream(ostream)
98 {}
99
100 bool operator() (OutputBuffer const & outputBuffer)
101 {
102 ostream.write(outputBuffer.buffer, outputBuffer.size);
103 return ostream.good();
104 }
105 };
106
107 struct CompressionJob
108 {
110
111 TBuffer buffer;
112 size_t size;
113 OutputBuffer *outputBuffer;
114
115 CompressionJob() :
116 buffer(DefaultPageSize<detail::bgzf_compression>::VALUE / sizeof(char_type), 0),
117 size(0),
118 outputBuffer(NULL)
119 {}
120 };
121
122 // string of recycable jobs
123 size_t numThreads;
124 size_t numJobs;
126 job_queue_type jobQueue;
127 job_queue_type idleQueue;
128 Serializer<OutputBuffer, BufferWriter> serializer;
129 size_t currentJobId;
130 bool currentJobAvail;
131
132 struct CompressionThread
133 {
134 basic_bgzf_ostreambuf *streamBuf;
135 CompressionContext<detail::bgzf_compression> compressionCtx;
136
137 void operator()()
138 {
139 ScopedLock readLock{[this] () mutable { unlockReading(this->streamBuf->jobQueue); }};
140 // ScopedReadLock<TJobQueue> readLock(streamBuf->jobQueue);
141 ScopedLock writeLock{[this] () mutable { unlockWriting(this->streamBuf->idleQueue); }};
142 // ScopedWriteLock{obQueue> writeLock{str}amBuf->idleQueue);
143
144 // wait for a new job to become available
145 bool success = true;
146 while (success)
147 {
148 size_t jobId = -1;
149 if (!popFront(jobId, streamBuf->jobQueue))
150 return;
151
152 CompressionJob &job = streamBuf->jobs[jobId];
153
154 // compress block with zlib
155 job.outputBuffer->size = _compressBlock(
156 job.outputBuffer->buffer, sizeof(job.outputBuffer->buffer),
157 &job.buffer[0], job.size, compressionCtx);
158
159 success = releaseValue(streamBuf->serializer, job.outputBuffer);
160 appendValue(streamBuf->idleQueue, jobId);
161 }
162 }
163 };
164
165 // array of worker threads
166 // using TFuture = decltype(std::async(CompressionThread{nullptr, CompressionContext<BgzfFile>{}, static_cast<size_t>(0)}));
168
169 basic_bgzf_ostreambuf(ostream_reference ostream_,
170 size_t numThreads = bgzf_thread_count,
171 size_t jobsPerThread = 8) :
172 numThreads(numThreads),
173 numJobs(numThreads * jobsPerThread),
174 jobQueue(numJobs),
175 idleQueue(numJobs),
176 serializer(ostream_, numThreads * jobsPerThread)
177 {
178 jobs.resize(numJobs);
179 currentJobId = 0;
180
181 lockWriting(jobQueue);
182 lockReading(idleQueue);
183 setReaderWriterCount(jobQueue, numThreads, 1);
184 setReaderWriterCount(idleQueue, 1, numThreads);
185
186 // Prepare idle queue.
187 for (size_t i = 0; i < numJobs; ++i)
188 {
189 [[maybe_unused]] bool success = appendValue(idleQueue, i);
190 assert(success);
191 }
192
193 // Start off threads.
194 for (size_t i = 0; i < numThreads; ++i)
195 pool.emplace_back(CompressionThread{this, CompressionContext<detail::bgzf_compression>{}});
196
197 currentJobAvail = popFront(currentJobId, idleQueue);
198 assert(currentJobAvail);
199
200 CompressionJob &job = jobs[currentJobId];
201 job.outputBuffer = aquireValue(serializer);
202 this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
203 }
204
205 ~basic_bgzf_ostreambuf()
206 {
207 // the buffer is now (after addFooter()) and flush will append the empty EOF marker
208 flush(true);
209
210 unlockWriting(jobQueue);
211
212 // Wait for threads to finish there active work.
213 for (auto & t : pool)
214 {
215 if (t.joinable())
216 t.join();
217 }
218
219 unlockReading(idleQueue);
220 }
221
222 bool compressBuffer(size_t size)
223 {
224 // submit current job
225 if (currentJobAvail)
226 {
227 jobs[currentJobId].size = size;
228 appendValue(jobQueue, currentJobId);
229 }
230
231 // recycle existing idle job
232 if (!(currentJobAvail = popFront(currentJobId, idleQueue)))
233 return false;
234
235 jobs[currentJobId].outputBuffer = aquireValue(serializer);
236
237 return serializer;
238 }
239
240 int_type overflow(int_type c)
241 {
242 int w = static_cast<int>(this->pptr() - this->pbase());
243 if (c != static_cast<int_type>(EOF))
244 {
245 *this->pptr() = c;
246 ++w;
247 }
248 if (compressBuffer(w))
249 {
250 CompressionJob &job = jobs[currentJobId];
251 this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
252 return c;
253 }
254 else
255 {
256 return EOF;
257 }
258 }
259
260 std::streamsize flush(bool flushEmptyBuffer = false)
261 {
262 int w = static_cast<int>(this->pptr() - this->pbase());
263 if ((w != 0 || flushEmptyBuffer) && compressBuffer(w))
264 {
265 CompressionJob &job = jobs[currentJobId];
266 this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
267 }
268 else
269 {
270 w = 0;
271 }
272
273 // wait for running compressor threads
274 waitForMinSize(idleQueue, numJobs - 1);
275
276 serializer.worker.ostream.flush();
277 return w;
278 }
279
280 int sync()
281 {
282 if (this->pptr() != this->pbase())
283 {
284 int_type c = overflow(EOF);
285 if (c == static_cast<int_type>(EOF))
286 return -1;
287 }
288 return 0;
289 }
290
291 void addFooter()
292 {
293 // we flush the filled buffer here, so that an empty (EOF) buffer is flushed in the d'tor
294 if (this->pptr() != this->pbase())
295 overflow(EOF);
296 }
297
298 // returns a reference to the output stream
299 ostream_reference get_ostream() const { return serializer.worker.ostream; };
300};
301
302// --------------------------------------------------------------------------
303// Class basic_bgzf_ostreambase
304// --------------------------------------------------------------------------
305
306template<
307 typename Elem,
308 typename Tr = std::char_traits<Elem>,
309 typename ElemA = std::allocator<Elem>,
310 typename ByteT = char,
311 typename ByteAT = std::allocator<ByteT>
312>
313class basic_bgzf_ostreambase : virtual public std::basic_ios<Elem,Tr>
314{
315public:
316 typedef std::basic_ostream<Elem, Tr>& ostream_reference;
317 typedef basic_bgzf_ostreambuf<Elem, Tr, ElemA, ByteT, ByteAT> bgzf_streambuf_type;
318
319 basic_bgzf_ostreambase(ostream_reference ostream_)
320 : m_buf(ostream_)
321 {
322 this->init(&m_buf );
323 };
324
325 // returns the underlying zip ostream object
326 bgzf_streambuf_type* rdbuf() { return &m_buf; };
327 // returns the bgzf error state
328 int get_zerr() const { return m_buf.get_err(); };
329 // returns the uncompressed data crc
330 long get_crc() const { return m_buf.get_crc(); };
331 // returns the compressed data size
332 long get_out_size() const { return m_buf.get_out_size(); };
333 // returns the uncompressed data size
334 long get_in_size() const { return m_buf.get_in_size(); };
335
336private:
337 bgzf_streambuf_type m_buf;
338};
339
340// --------------------------------------------------------------------------
341// Class basic_bgzf_ostream
342// --------------------------------------------------------------------------
343
344template<
345 typename Elem,
346 typename Tr = std::char_traits<Elem>,
347 typename ElemA = std::allocator<Elem>,
348 typename ByteT = char,
349 typename ByteAT = std::allocator<ByteT>
350>
351class basic_bgzf_ostream :
352 public basic_bgzf_ostreambase<Elem,Tr,ElemA,ByteT,ByteAT>,
353 public std::basic_ostream<Elem,Tr>
354{
355public:
356 typedef basic_bgzf_ostreambase<Elem,Tr,ElemA,ByteT,ByteAT> bgzf_ostreambase_type;
357 typedef std::basic_ostream<Elem,Tr> ostream_type;
358 typedef ostream_type& ostream_reference;
359
360 basic_bgzf_ostream(ostream_reference ostream_) :
361 bgzf_ostreambase_type(ostream_),
362 ostream_type(bgzf_ostreambase_type::rdbuf())
363 {}
364
365 // flush inner buffer and zipper buffer
366 basic_bgzf_ostream<Elem,Tr>& flush()
367 {
368 ostream_type::flush(); this->rdbuf()->flush(); return *this;
369 };
370
371 ~basic_bgzf_ostream()
372 {
373 this->rdbuf()->addFooter();
374 }
375
376private:
377 static void put_long(ostream_reference out_, unsigned long x_);
378#ifdef _WIN32
379 void _Add_vtordisp1() { } // Required to avoid VC++ warning C4250
380 void _Add_vtordisp2() { } // Required to avoid VC++ warning C4250
381#endif
382};
383
384// ===========================================================================
385// Typedefs
386// ===========================================================================
387
388// A typedef for basic_bgzf_ostream<char>
389typedef basic_bgzf_ostream<char> bgzf_ostream;
390// A typedef for basic_bgzf_ostream<wchar_t>
391typedef basic_bgzf_ostream<wchar_t> bgzf_wostream;
392
393} // namespace seqan3::contrib
394
395#endif // defined(SEQAN3_HAS_ZLIB)
Provides stream compression utilities.
T emplace_back(T... args)
T flush(T... args)
constexpr size_t size
The size of a type pack.
Definition type_pack/traits.hpp:143
T init(T... args)
T move(T... args)
SeqAn specific customisations in the standard namespace.
T rdbuf(T... args)
T resize(T... args)
Provides helper structs from SeqAn2 for the bgzf_ostream.
T size(T... args)
Provides seqan suspendable queue.
Hide me