SeqAn3 3.4.0-rc.1
The Modern C++ library for sequence analysis.
Loading...
Searching...
No Matches
bgzf_istream.hpp
1// SPDX-FileCopyrightText: 2003 Jonathan de Halleux
2// SPDX-License-Identifier: Zlib
3// zipstream Library License:
4// --------------------------
5//
6// The zlib/libpng License Copyright (c) 2003 Jonathan de Halleux.
7//
8// This software is provided 'as-is', without any express or implied warranty. In no event will the authors be held liable for any damages arising from the use of this software.
9//
10// Permission is granted to anyone to use this software for any purpose, including commercial applications, and to alter it and redistribute it freely, subject to the following restrictions:
11//
12// 1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required.
13//
14// 2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software.
15//
16// 3. This notice may not be removed or altered from any source distribution
17//
18//
19// Author: Jonathan de Halleux, dehalleux@pelikhan.com, 2003 (original zlib stream)
20// Author: David Weese, dave.weese@gmail.com, 2014 (extension to parallel block-wise compression in bgzf format)
21// Author: Rene Rahn, rene.rahn AT fu-berlin.de, 2019 (adaption to SeqAn library version 3)
22
23#pragma once
24
25#include <cstring>
26#include <condition_variable>
27#include <mutex>
28
31#include <seqan3/utility/parallel/detail/reader_writer_manager.hpp>
32
33#if !defined(SEQAN3_HAS_ZLIB) && !defined(SEQAN3_HEADER_TEST)
34# error "This file cannot be used when building without GZip-support."
35#endif // !defined(SEQAN3_HAS_ZLIB) && !defined(SEQAN3_HEADER_TEST)
36
37#if defined(SEQAN3_HAS_ZLIB)
38namespace seqan3::contrib
39{
40
41// ===========================================================================
42// Classes
43// ===========================================================================
44
45// --------------------------------------------------------------------------
46// Class basic_bgzf_istreambuf
47// --------------------------------------------------------------------------
48
49template<
50 typename Elem,
51 typename Tr = std::char_traits<Elem>,
52 typename ElemA = std::allocator<Elem>,
53 typename ByteT = char,
54 typename ByteAT = std::allocator<ByteT>
55>
56class basic_bgzf_istreambuf : public std::basic_streambuf<Elem, Tr>
57{
58public:
59
60 typedef Tr traits_type;
61 typedef typename traits_type::char_type char_type;
62 typedef typename traits_type::int_type int_type;
63 typedef typename traits_type::off_type off_type;
64 typedef typename traits_type::pos_type pos_type;
65
66private:
67
68 typedef std::basic_istream<Elem, Tr>& istream_reference;
69
70 typedef ElemA char_allocator_type;
71 typedef ByteT byte_type;
72 typedef ByteAT byte_allocator_type;
73 typedef byte_type* byte_buffer_type;
74
76 typedef fixed_buffer_queue<int32_t> TJobQueue;
77
78 static const size_t MAX_PUTBACK = 4;
79
80 // Allows serialized access to the underlying buffer.
81 struct Serializer
82 {
83 istream_reference istream;
85 io_error *error;
86 off_type fileOfs;
87
88 Serializer(istream_reference istream) :
89 istream(istream),
90 error(NULL),
91 fileOfs(0u)
92 {}
93
94 ~Serializer()
95 {
96 delete error;
97 }
98 };
99
100 Serializer serializer;
101
102 struct DecompressionJob
103 {
105
106 TInputBuffer inputBuffer;
107 TBuffer buffer;
108 off_type fileOfs;
109 int32_t size;
110 uint32_t compressedSize;
111
112 std::mutex cs;
113 std::condition_variable readyEvent;
114 bool ready;
115 bool bgzfEofMarker;
116
117 DecompressionJob() :
118 inputBuffer(DefaultPageSize<detail::bgzf_compression>::MAX_BLOCK_SIZE, 0),
119 buffer(MAX_PUTBACK + DefaultPageSize<detail::bgzf_compression>::MAX_BLOCK_SIZE / sizeof(char_type), 0),
120 fileOfs(),
121 size(0),
122 cs(),
123 readyEvent(),
124 ready(true),
125 bgzfEofMarker(false)
126 {}
127
128 DecompressionJob(DecompressionJob const &other) :
129 inputBuffer(other.inputBuffer),
130 buffer(other.buffer),
131 fileOfs(other.fileOfs),
132 size(other.size),
133 cs(),
134 readyEvent(),
135 ready(other.ready),
136 bgzfEofMarker(other.bgzfEofMarker)
137 {}
138 };
139
140 // string of recyclable jobs
141 size_t numThreads;
142 size_t numJobs;
144 TJobQueue runningQueue;
145 TJobQueue todoQueue;
146 detail::reader_writer_manager runningQueueManager; // synchronises reader, writer with running queue.
147 detail::reader_writer_manager todoQueueManager; // synchronises reader, writer with todo queue.
148 int currentJobId;
149
150 struct DecompressionThread
151 {
152 basic_bgzf_istreambuf *streamBuf;
153 CompressionContext<detail::bgzf_compression> compressionCtx;
154
155 void operator()()
156 {
157 // Active reader to consume from todo queue.
158 auto reader_raii = streamBuf->todoQueueManager.register_reader();
159 // Active writer to produce work for the decompression queue.
160 auto writer_raii = streamBuf->runningQueueManager.register_writer();
161
162 // wait for a new job to become available
163 while (true)
164 {
165
166 int jobId = -1;
167 if (streamBuf->todoQueue.wait_pop(jobId) == queue_op_status::closed)
168 return;
169
170 DecompressionJob &job = streamBuf->jobs[jobId];
171 size_t tailLen = 0;
172
173 // typically the idle queue contains only ready jobs
174 // however, if seek() fast forwards running jobs into the todoQueue
175 // the caller defers the task of waiting to the decompression threads
176 if (!job.ready)
177 {
179 job.readyEvent.wait(lock, [&job]{return job.ready;});
180 assert(job.ready == true);
181 }
182
183 {
184 std::lock_guard<std::mutex> scopedLock(streamBuf->serializer.lock);
185
186 job.bgzfEofMarker = false;
187 if (streamBuf->serializer.error != NULL)
188 return;
189
190 // remember start offset (for tellg later)
191 job.fileOfs = streamBuf->serializer.fileOfs;
192 job.size = -1;
193 job.compressedSize = 0;
194
195 // only load if not at EOF
196 if (job.fileOfs != -1)
197 {
198 // read header
199 streamBuf->serializer.istream.read(
200 (char_type*)&job.inputBuffer[0],
201 DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH);
202
203 if (!streamBuf->serializer.istream.good())
204 {
205 streamBuf->serializer.fileOfs = -1;
206 if (streamBuf->serializer.istream.eof())
207 goto eofSkip;
208 streamBuf->serializer.error = new io_error("Stream read error.");
209 return;
210 }
211
212 // check header
213 if (!detail::bgzf_compression::validate_header(std::span{job.inputBuffer}))
214 {
215 streamBuf->serializer.fileOfs = -1;
216 streamBuf->serializer.error = new io_error("Invalid BGZF block header.");
217 return;
218 }
219
220 // extract length of compressed data
221 tailLen = _bgzfUnpack16(&job.inputBuffer[0] + 16) +
222 1u - DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH;
223
224 // read compressed data and tail
225 streamBuf->serializer.istream.read(
226 (char_type*)&job.inputBuffer[0] + DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH,
227 tailLen);
228
229 // Check if end-of-file marker is set
230 if (memcmp(reinterpret_cast<uint8_t const *>(&job.inputBuffer[0]),
231 reinterpret_cast<uint8_t const *>(&BGZF_END_OF_FILE_MARKER[0]),
232 28) == 0)
233 {
234 job.bgzfEofMarker = true;
235 }
236
237 if (!streamBuf->serializer.istream.good())
238 {
239 streamBuf->serializer.fileOfs = -1;
240 if (streamBuf->serializer.istream.eof())
241 goto eofSkip;
242 streamBuf->serializer.error = new io_error("Stream read error.");
243 return;
244 }
245
246 job.compressedSize = DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH + tailLen;
247 streamBuf->serializer.fileOfs += job.compressedSize;
248 job.ready = false;
249
250 eofSkip:
251 streamBuf->serializer.istream.clear(
252 streamBuf->serializer.istream.rdstate() & ~std::ios_base::failbit);
253 }
254
255 if (streamBuf->runningQueue.try_push(jobId) != queue_op_status::success)
256 {
257 // signal that job is ready
258 {
260 job.ready = true;
261 }
262 job.readyEvent.notify_all();
263 return; // Terminate this thread.
264 }
265 }
266
267 if (!job.ready)
268 {
269 // decompress block
270 job.size = _decompressBlock(
271 &job.buffer[0] + MAX_PUTBACK, job.buffer.capacity(),
272 &job.inputBuffer[0], job.compressedSize, compressionCtx);
273
274 // signal that job is ready
275 {
277 job.ready = true;
278 }
279 job.readyEvent.notify_all();
280 }
281 }
282 }
283 };
284
285 std::vector<std::thread> pool; // pool of worker threads
286 TBuffer putbackBuffer;
287
288public:
289
290 basic_bgzf_istreambuf(istream_reference istream_,
291 size_t numThreads = bgzf_thread_count,
292 size_t jobsPerThread = 8) :
293 serializer(istream_),
294 numThreads(numThreads),
295 numJobs(numThreads * jobsPerThread),
296 runningQueue(numJobs),
297 todoQueue(numJobs),
298 runningQueueManager(detail::reader_count{1}, detail::writer_count{numThreads}, runningQueue),
299 todoQueueManager(detail::reader_count{numThreads}, detail::writer_count{1}, todoQueue),
300 putbackBuffer(MAX_PUTBACK)
301 {
302 jobs.resize(numJobs);
303 currentJobId = -1;
304
305 // Prepare todo queue.
306 for (size_t i = 0; i < numJobs; ++i)
307 {
308 [[maybe_unused]] queue_op_status status = todoQueue.try_push(i);
309 assert(status == queue_op_status::success);
310 }
311
312 // Start off the threads.
313 for (size_t i = 0; i < numThreads; ++i)
314 pool.emplace_back(DecompressionThread{this, CompressionContext<detail::bgzf_compression>{}});
315 }
316
317 ~basic_bgzf_istreambuf()
318 {
319 // Signal todoQueue that no more work is coming and close todo queue.
320 todoQueueManager.writer_arrive();
321
322 // Wait for threads to finish there active work.
323 for (auto & t : pool)
324 {
325 if (t.joinable())
326 t.join();
327 }
328
329 // Signal running queue that reader is done.
330 runningQueueManager.reader_arrive();
331 }
332
333 int_type underflow()
334 {
335 // no need to use the next buffer?
336 if (this->gptr() && this->gptr() < this->egptr())
337 return traits_type::to_int_type(*this->gptr());
338
339 size_t putback = this->gptr() - this->eback();
340 if (putback > MAX_PUTBACK)
341 putback = MAX_PUTBACK;
342
343 // save at most MAX_PUTBACK characters from previous page to putback buffer
344 if (putback != 0)
345 std::copy(
346 this->gptr() - putback,
347 this->gptr(),
348 &putbackBuffer[0]);
349
350 if (currentJobId >= 0)
351 todoQueue.wait_push(currentJobId);
352 // appendValue(todoQueue, currentJobId);
353
354 while (true)
355 {
356 if (runningQueue.wait_pop(currentJobId) == queue_op_status::closed)
357 {
358 currentJobId = -1;
359 assert(serializer.error != NULL);
360 if (serializer.error != NULL)
361 throw *serializer.error;
362 return EOF;
363 }
364
365 DecompressionJob &job = jobs[currentJobId];
366
367 // restore putback buffer
368 this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
369 if (putback != 0)
370 std::copy(
371 &putbackBuffer[0],
372 &putbackBuffer[0] + putback,
373 &job.buffer[0] + (MAX_PUTBACK - putback));
374
375 // wait for the end of decompression
376 {
378 job.readyEvent.wait(lock, [&job]{return job.ready;});
379 }
380
381 size_t size = (job.size != -1)? job.size : 0;
382
383 // reset buffer pointers
384 this->setg(
385 &job.buffer[0] + (MAX_PUTBACK - putback), // beginning of putback area
386 &job.buffer[0] + MAX_PUTBACK, // read position
387 &job.buffer[0] + (MAX_PUTBACK + size)); // end of buffer
388
389 // The end of the bgzf file is reached, either if there was an error, or if the
390 // end-of-file marker was reached, while the uncompressed block had zero size.
391 if (job.size == -1 || (job.size == 0 && job.bgzfEofMarker))
392 return EOF;
393 else if (job.size > 0)
394 return traits_type::to_int_type(*this->gptr()); // return next character
395
396 throw io_error("BGZF: Invalid end condition in decompression. "
397 "Most likely due to an empty bgzf block without end-of-file marker.");
398 }
399 }
400
401 pos_type seekoff(off_type ofs, std::ios_base::seekdir dir, std::ios_base::openmode openMode)
402 {
403 if ((openMode & (std::ios_base::in | std::ios_base::out)) == std::ios_base::in)
404 {
405 if (dir == std::ios_base::cur && ofs >= 0)
406 {
407 // forward delta seek
408 while (currentJobId < 0 || this->egptr() - this->gptr() < ofs)
409 {
410 ofs -= this->egptr() - this->gptr();
411 if (this->underflow() == static_cast<int_type>(EOF))
412 break;
413 }
414
415 if (currentJobId >= 0 && ofs <= this->egptr() - this->gptr())
416 {
417 DecompressionJob &job = jobs[currentJobId];
418
419 // reset buffer pointers
420 this->setg(
421 this->eback(), // beginning of putback area
422 this->gptr() + ofs, // read position
423 this->egptr()); // end of buffer
424
425 if (this->gptr() != this->egptr())
426 return pos_type((job.fileOfs << 16) + ((this->gptr() - &job.buffer[MAX_PUTBACK])));
427 else
428 return pos_type((job.fileOfs + job.compressedSize) << 16);
429 }
430
431 }
432 else if (dir == std::ios_base::beg)
433 {
434 // random seek
435 std::streampos destFileOfs = ofs >> 16;
436
437 // are we in the same block?
438 if (currentJobId >= 0 && jobs[currentJobId].fileOfs == (off_type)destFileOfs)
439 {
440 DecompressionJob &job = jobs[currentJobId];
441
442 // reset buffer pointers
443 this->setg(
444 this->eback(), // beginning of putback area
445 &job.buffer[0] + (MAX_PUTBACK + (ofs & 0xffff)), // read position
446 this->egptr()); // end of buffer
447 return ofs;
448 }
449
450 // ok, different block
451 {
452 std::lock_guard<std::mutex> scopedLock(serializer.lock);
453
454 // remove all running jobs and put them in the idle queue unless we
455 // find our seek target
456
457 if (currentJobId >= 0)
458 todoQueue.wait_push(currentJobId);
459
460 // Note that if we are here the current job does not represent the sought block.
461 // Hence if the running queue is empty we need to explicitly unset the jobId,
462 // otherwise we would not update the serializers istream pointer to the correct position.
463 if (runningQueue.is_empty())
464 currentJobId = -1;
465
466 // empty is thread-safe in serializer.lock
467 while (!runningQueue.is_empty())
468 {
469 runningQueue.wait_pop(currentJobId);
470
471 if (jobs[currentJobId].fileOfs == (off_type)destFileOfs)
472 break;
473
474 // push back useless job
475 todoQueue.wait_push(currentJobId);
476 currentJobId = -1;
477 }
478
479 if (currentJobId == -1)
480 {
481 assert(runningQueue.is_empty());
482 serializer.istream.clear(serializer.istream.rdstate() & ~std::ios_base::eofbit);
483 if (serializer.istream.rdbuf()->pubseekpos(destFileOfs, std::ios_base::in) == destFileOfs)
484 serializer.fileOfs = destFileOfs;
485 else
486 currentJobId = -2; // temporarily signals a seek error
487 }
488 }
489
490 // if our block wasn't in the running queue yet, it should now
491 // be the first that falls out after modifying serializer.fileOfs
492 if (currentJobId == -1)
493 runningQueue.wait_pop(currentJobId);
494 else if (currentJobId == -2)
495 currentJobId = -1;
496
497 if (currentJobId >= 0)
498 {
499 // wait for the end of decompression
500 DecompressionJob &job = jobs[currentJobId];
501
502 {
504 job.readyEvent.wait(lock, [&job]{return job.ready;});
505 }
506
507 assert(job.fileOfs == (off_type)destFileOfs);
508
509 // reset buffer pointers
510 this->setg(
511 &job.buffer[0] + MAX_PUTBACK, // no putback area
512 &job.buffer[0] + (MAX_PUTBACK + (ofs & 0xffff)), // read position
513 &job.buffer[0] + (MAX_PUTBACK + job.size)); // end of buffer
514 return ofs;
515 }
516 }
517 }
518 return pos_type(off_type(-1));
519 }
520
521 pos_type seekpos(pos_type pos, std::ios_base::openmode openMode)
522 {
523 return seekoff(off_type(pos), std::ios_base::beg, openMode);
524 }
525
526 // returns the compressed input istream
527 istream_reference get_istream() { return serializer.istream; };
528};
529
530// --------------------------------------------------------------------------
531// Class basic_bgzf_istreambase
532// --------------------------------------------------------------------------
533
534template<
535 typename Elem,
536 typename Tr = std::char_traits<Elem>,
537 typename ElemA = std::allocator<Elem>,
538 typename ByteT = char,
539 typename ByteAT = std::allocator<ByteT>
540>
541class basic_bgzf_istreambase : virtual public std::basic_ios<Elem,Tr>
542{
543public:
544 typedef std::basic_istream<Elem, Tr>& istream_reference;
545 typedef basic_bgzf_istreambuf<Elem, Tr, ElemA, ByteT, ByteAT> decompression_bgzf_streambuf_type;
546
547 basic_bgzf_istreambase(istream_reference istream_)
548 : m_buf(istream_)
549 {
550 this->init(&m_buf);
551 };
552
553 // returns the underlying decompression bgzf istream object
554 decompression_bgzf_streambuf_type* rdbuf() { return &m_buf; };
555
556 // returns the bgzf error state
557 int get_zerr() const { return m_buf.get_zerr(); };
558 // returns the uncompressed data crc
559 long get_crc() const { return m_buf.get_crc(); };
560 // returns the uncompressed data size
561 long get_out_size() const { return m_buf.get_out_size(); };
562 // returns the compressed data size
563 long get_in_size() const { return m_buf.get_in_size(); };
564
565private:
566 decompression_bgzf_streambuf_type m_buf;
567};
568
569// --------------------------------------------------------------------------
570// Class basic_bgzf_istream
571// --------------------------------------------------------------------------
572
573template<
574 typename Elem,
575 typename Tr = std::char_traits<Elem>,
576 typename ElemA = std::allocator<Elem>,
577 typename ByteT = char,
578 typename ByteAT = std::allocator<ByteT>
579>
580class basic_bgzf_istream :
581 public basic_bgzf_istreambase<Elem,Tr,ElemA,ByteT,ByteAT>,
582 public std::basic_istream<Elem,Tr>
583{
584public:
585 typedef basic_bgzf_istreambase<Elem,Tr,ElemA,ByteT,ByteAT> bgzf_istreambase_type;
586 typedef std::basic_istream<Elem,Tr> istream_type;
587 typedef istream_type & istream_reference;
588 typedef char byte_type;
589
590 basic_bgzf_istream(istream_reference istream_) :
591 bgzf_istreambase_type(istream_),
592 istream_type(bgzf_istreambase_type::rdbuf()),
593 m_is_gzip(false),
594 m_gbgzf_data_size(0)
595 {};
596
597 // returns true if it is a gzip file
598 bool is_gzip() const { return m_is_gzip; };
599 // return data size check
600 bool check_data_size() const { return this->get_out_size() == m_gbgzf_data_size; };
601
602 // return the data size in the file
603 long get_gbgzf_data_size() const { return m_gbgzf_data_size; };
604
605protected:
606 static void read_long(istream_reference in_, unsigned long& x_);
607
608 int check_header();
609 bool m_is_gzip;
610 unsigned long m_gbgzf_data_size;
611
612#ifdef _WIN32
613private:
614 void _Add_vtordisp1() { } // Required to avoid VC++ warning C4250
615 void _Add_vtordisp2() { } // Required to avoid VC++ warning C4250
616#endif
617};
618
619// ===========================================================================
620// Typedefs
621// ===========================================================================
622
623// A typedef for basic_bgzf_istream<char>
624typedef basic_bgzf_istream<char> bgzf_istream;
625// A typedef for basic_bgzf_istream<wchart>
626typedef basic_bgzf_istream<wchar_t> bgzf_wistream;
627
} // namespace seqan3::contrib
629
630#endif // defined(SEQAN3_HAS_ZLIB)
Provides stream compression utilities.
Provides seqan3::buffer_queue.
T copy(T... args)
T emplace_back(T... args)
constexpr size_t size
The size of a type pack.
Definition type_pack/traits.hpp:143
T init(T... args)
T lock(T... args)
T memcmp(T... args)
T rdbuf(T... args)
T resize(T... args)
Hide me