SeqAn3 3.1.0
The Modern C++ library for sequence analysis.
bgzf_istream.hpp
1// zipstream Library License:
2// --------------------------
3//
4// The zlib/libpng License Copyright (c) 2003 Jonathan de Halleux.
5//
6// This software is provided 'as-is', without any express or implied warranty. In no event will the authors be held liable for any damages arising from the use of this software.
7//
8// Permission is granted to anyone to use this software for any purpose, including commercial applications, and to alter it and redistribute it freely, subject to the following restrictions:
9//
10// 1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required.
11//
12// 2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software.
13//
14// 3. This notice may not be removed or altered from any source distribution
15//
16//
17// Author: Jonathan de Halleux, dehalleux@pelikhan.com, 2003 (original zlib stream)
18// Author: David Weese, dave.weese@gmail.com, 2014 (extension to parallel block-wise compression in bgzf format)
19// Author: Rene Rahn, rene.rahn AT fu-berlin.de, 2019 (adaption to SeqAn library version 3)
20
21#pragma once
22
23#include <cstring>
24#include <condition_variable>
25#include <mutex>
26
30
31#if !defined(SEQAN3_HAS_ZLIB) && !defined(SEQAN3_HEADER_TEST)
32# error "This file cannot be used when building without GZip-support."
33#endif // !defined(SEQAN3_HAS_ZLIB) && !defined(SEQAN3_HEADER_TEST)
34
35#if defined(SEQAN3_HAS_ZLIB)
36namespace seqan3::contrib
37{
38
39// ===========================================================================
40// Classes
41// ===========================================================================
42
43// --------------------------------------------------------------------------
44// Class basic_bgzf_istreambuf
45// --------------------------------------------------------------------------
46
47template<
48 typename Elem,
49 typename Tr = std::char_traits<Elem>,
50 typename ElemA = std::allocator<Elem>,
51 typename ByteT = char,
52 typename ByteAT = std::allocator<ByteT>
53>
54class basic_bgzf_istreambuf : public std::basic_streambuf<Elem, Tr>
55{
56public:
57
58 typedef Tr traits_type;
59 typedef typename traits_type::char_type char_type;
60 typedef typename traits_type::int_type int_type;
61 typedef typename traits_type::off_type off_type;
62 typedef typename traits_type::pos_type pos_type;
63
64private:
65
66 typedef std::basic_istream<Elem, Tr>& istream_reference;
67
68 typedef ElemA char_allocator_type;
69 typedef ByteT byte_type;
70 typedef ByteAT byte_allocator_type;
71 typedef byte_type* byte_buffer_type;
72
74 typedef fixed_buffer_queue<int32_t> TJobQueue;
75
76 static const size_t MAX_PUTBACK = 4;
77
78 // Allows serialized access to the underlying buffer.
79 struct Serializer
80 {
81 istream_reference istream;
83 io_error *error;
84 off_type fileOfs;
85
86 Serializer(istream_reference istream) :
87 istream(istream),
88 error(NULL),
89 fileOfs(0u)
90 {}
91
92 ~Serializer()
93 {
94 delete error;
95 }
96 };
97
98 Serializer serializer;
99
100 struct DecompressionJob
101 {
103
104 TInputBuffer inputBuffer;
105 TBuffer buffer;
106 off_type fileOfs;
107 int32_t size;
108 uint32_t compressedSize;
109
110 std::mutex cs;
111 std::condition_variable readyEvent;
112 bool ready;
113 bool bgzfEofMarker;
114
115 DecompressionJob() :
116 inputBuffer(DefaultPageSize<detail::bgzf_compression>::MAX_BLOCK_SIZE, 0),
117 buffer(MAX_PUTBACK + DefaultPageSize<detail::bgzf_compression>::MAX_BLOCK_SIZE / sizeof(char_type), 0),
118 fileOfs(),
119 size(0),
120 cs(),
121 readyEvent(),
122 ready(true),
123 bgzfEofMarker(false)
124 {}
125
126 DecompressionJob(DecompressionJob const &other) :
127 inputBuffer(other.inputBuffer),
128 buffer(other.buffer),
129 fileOfs(other.fileOfs),
130 size(other.size),
131 cs(),
132 readyEvent(),
133 ready(other.ready),
134 bgzfEofMarker(other.bgzfEofMarker)
135 {}
136 };
137
138 // string of recyclable jobs
139 size_t numThreads;
140 size_t numJobs;
142 TJobQueue runningQueue;
143 TJobQueue todoQueue;
144 detail::reader_writer_manager runningQueueManager; // synchronises reader, writer with running queue.
145 detail::reader_writer_manager todoQueueManager; // synchronises reader, writer with todo queue.
146 int currentJobId;
147
148 struct DecompressionThread
149 {
150 basic_bgzf_istreambuf *streamBuf;
151 CompressionContext<detail::bgzf_compression> compressionCtx;
152
153 void operator()()
154 {
155 // Active reader to consume from todo queue.
156 auto reader_raii = streamBuf->todoQueueManager.register_reader();
157 // Active writer to produce work for the decompression queue.
158 auto writer_raii = streamBuf->runningQueueManager.register_writer();
159
160 // wait for a new job to become available
161 while (true)
162 {
163
164 int jobId = -1;
165 if (streamBuf->todoQueue.wait_pop(jobId) == queue_op_status::closed)
166 return;
167
168 DecompressionJob &job = streamBuf->jobs[jobId];
169 size_t tailLen = 0;
170
171 // typically the idle queue contains only ready jobs
172 // however, if seek() fast forwards running jobs into the todoQueue
173 // the caller defers the task of waiting to the decompression threads
174 if (!job.ready)
175 {
177 job.readyEvent.wait(lock, [&job]{return job.ready;});
178 assert(job.ready == true);
179 }
180
181 {
182 std::lock_guard<std::mutex> scopedLock(streamBuf->serializer.lock);
183
184 job.bgzfEofMarker = false;
185 if (streamBuf->serializer.error != NULL)
186 return;
187
188 // remember start offset (for tellg later)
189 job.fileOfs = streamBuf->serializer.fileOfs;
190 job.size = -1;
191 job.compressedSize = 0;
192
193 // only load if not at EOF
194 if (job.fileOfs != -1)
195 {
196 // read header
197 streamBuf->serializer.istream.read(
198 (char_type*)&job.inputBuffer[0],
199 DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH);
200
201 if (!streamBuf->serializer.istream.good())
202 {
203 streamBuf->serializer.fileOfs = -1;
204 if (streamBuf->serializer.istream.eof())
205 goto eofSkip;
206 streamBuf->serializer.error = new io_error("Stream read error.");
207 return;
208 }
209
210 // check header
211 if (!detail::bgzf_compression::validate_header(std::span{job.inputBuffer}))
212 {
213 streamBuf->serializer.fileOfs = -1;
214 streamBuf->serializer.error = new io_error("Invalid BGZF block header.");
215 return;
216 }
217
218 // extract length of compressed data
219 tailLen = _bgzfUnpack16(&job.inputBuffer[0] + 16) +
220 1u - DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH;
221
222 // read compressed data and tail
223 streamBuf->serializer.istream.read(
224 (char_type*)&job.inputBuffer[0] + DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH,
225 tailLen);
226
227 // Check if end-of-file marker is set
228 if (memcmp(reinterpret_cast<uint8_t const *>(&job.inputBuffer[0]),
229 reinterpret_cast<uint8_t const *>(&BGZF_END_OF_FILE_MARKER[0]),
230 28) == 0)
231 {
232 job.bgzfEofMarker = true;
233 }
234
235 if (!streamBuf->serializer.istream.good())
236 {
237 streamBuf->serializer.fileOfs = -1;
238 if (streamBuf->serializer.istream.eof())
239 goto eofSkip;
240 streamBuf->serializer.error = new io_error("Stream read error.");
241 return;
242 }
243
244 job.compressedSize = DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH + tailLen;
245 streamBuf->serializer.fileOfs += job.compressedSize;
246 job.ready = false;
247
248 eofSkip:
249 streamBuf->serializer.istream.clear(
250 streamBuf->serializer.istream.rdstate() & ~std::ios_base::failbit);
251 }
252
253 if (streamBuf->runningQueue.try_push(jobId) != queue_op_status::success)
254 {
255 // signal that job is ready
256 {
258 job.ready = true;
259 }
260 job.readyEvent.notify_all();
261 return; // Terminate this thread.
262 }
263 }
264
265 if (!job.ready)
266 {
267 // decompress block
268 job.size = _decompressBlock(
269 &job.buffer[0] + MAX_PUTBACK, job.buffer.capacity(),
270 &job.inputBuffer[0], job.compressedSize, compressionCtx);
271
272 // signal that job is ready
273 {
275 job.ready = true;
276 }
277 job.readyEvent.notify_all();
278 }
279 }
280 }
281 };
282
283 std::vector<std::thread> pool; // pool of worker threads
284 TBuffer putbackBuffer;
285
286public:
287
288 basic_bgzf_istreambuf(istream_reference istream_,
289 size_t numThreads = bgzf_thread_count,
290 size_t jobsPerThread = 8) :
291 serializer(istream_),
292 numThreads(numThreads),
293 numJobs(numThreads * jobsPerThread),
294 runningQueue(numJobs),
295 todoQueue(numJobs),
296 runningQueueManager(detail::reader_count{1}, detail::writer_count{numThreads}, runningQueue),
297 todoQueueManager(detail::reader_count{numThreads}, detail::writer_count{1}, todoQueue),
298 putbackBuffer(MAX_PUTBACK)
299 {
300 jobs.resize(numJobs);
301 currentJobId = -1;
302
303 // Prepare todo queue.
304 for (size_t i = 0; i < numJobs; ++i)
305 {
306 [[maybe_unused]] queue_op_status status = todoQueue.try_push(i);
307 assert(status == queue_op_status::success);
308 }
309
310 // Start off the threads.
311 for (size_t i = 0; i < numThreads; ++i)
312 pool.emplace_back(DecompressionThread{this, CompressionContext<detail::bgzf_compression>{}});
313 }
314
315 ~basic_bgzf_istreambuf()
316 {
317 // Signal todoQueue that no more work is coming and close todo queue.
318 todoQueueManager.writer_arrive();
319
320 // Wait for threads to finish there active work.
321 for (auto & t : pool)
322 {
323 if (t.joinable())
324 t.join();
325 }
326
327 // Signal running queue that reader is done.
328 runningQueueManager.reader_arrive();
329 }
330
331 int_type underflow()
332 {
333 // no need to use the next buffer?
334 if (this->gptr() && this->gptr() < this->egptr())
335 return traits_type::to_int_type(*this->gptr());
336
337 size_t putback = this->gptr() - this->eback();
338 if (putback > MAX_PUTBACK)
339 putback = MAX_PUTBACK;
340
341 // save at most MAX_PUTBACK characters from previous page to putback buffer
342 if (putback != 0)
343 std::copy(
344 this->gptr() - putback,
345 this->gptr(),
346 &putbackBuffer[0]);
347
348 if (currentJobId >= 0)
349 todoQueue.wait_push(currentJobId);
350 // appendValue(todoQueue, currentJobId);
351
352 while (true)
353 {
354 if (runningQueue.wait_pop(currentJobId) == queue_op_status::closed)
355 {
356 currentJobId = -1;
357 assert(serializer.error != NULL);
358 if (serializer.error != NULL)
359 throw *serializer.error;
360 return EOF;
361 }
362
363 DecompressionJob &job = jobs[currentJobId];
364
365 // restore putback buffer
366 this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
367 if (putback != 0)
368 std::copy(
369 &putbackBuffer[0],
370 &putbackBuffer[0] + putback,
371 &job.buffer[0] + (MAX_PUTBACK - putback));
372
373 // wait for the end of decompression
374 {
376 job.readyEvent.wait(lock, [&job]{return job.ready;});
377 }
378
379 size_t size = (job.size != -1)? job.size : 0;
380
381 // reset buffer pointers
382 this->setg(
383 &job.buffer[0] + (MAX_PUTBACK - putback), // beginning of putback area
384 &job.buffer[0] + MAX_PUTBACK, // read position
385 &job.buffer[0] + (MAX_PUTBACK + size)); // end of buffer
386
387 // The end of the bgzf file is reached, either if there was an error, or if the
388 // end-of-file marker was reached, while the uncompressed block had zero size.
389 if (job.size == -1 || (job.size == 0 && job.bgzfEofMarker))
390 return EOF;
391 else if (job.size > 0)
392 return traits_type::to_int_type(*this->gptr()); // return next character
393
394 throw io_error("BGZF: Invalid end condition in decompression. "
395 "Most likely due to an empty bgzf block without end-of-file marker.");
396 }
397 }
398
399 pos_type seekoff(off_type ofs, std::ios_base::seekdir dir, std::ios_base::openmode openMode)
400 {
401 if ((openMode & (std::ios_base::in | std::ios_base::out)) == std::ios_base::in)
402 {
403 if (dir == std::ios_base::cur && ofs >= 0)
404 {
405 // forward delta seek
406 while (currentJobId < 0 || this->egptr() - this->gptr() < ofs)
407 {
408 ofs -= this->egptr() - this->gptr();
409 if (this->underflow() == static_cast<int_type>(EOF))
410 break;
411 }
412
413 if (currentJobId >= 0 && ofs <= this->egptr() - this->gptr())
414 {
415 DecompressionJob &job = jobs[currentJobId];
416
417 // reset buffer pointers
418 this->setg(
419 this->eback(), // beginning of putback area
420 this->gptr() + ofs, // read position
421 this->egptr()); // end of buffer
422
423 if (this->gptr() != this->egptr())
424 return pos_type((job.fileOfs << 16) + ((this->gptr() - &job.buffer[MAX_PUTBACK])));
425 else
426 return pos_type((job.fileOfs + job.compressedSize) << 16);
427 }
428
429 }
430 else if (dir == std::ios_base::beg)
431 {
432 // random seek
433 std::streampos destFileOfs = ofs >> 16;
434
435 // are we in the same block?
436 if (currentJobId >= 0 && jobs[currentJobId].fileOfs == (off_type)destFileOfs)
437 {
438 DecompressionJob &job = jobs[currentJobId];
439
440 // reset buffer pointers
441 this->setg(
442 this->eback(), // beginning of putback area
443 &job.buffer[0] + (MAX_PUTBACK + (ofs & 0xffff)), // read position
444 this->egptr()); // end of buffer
445 return ofs;
446 }
447
448 // ok, different block
449 {
450 std::lock_guard<std::mutex> scopedLock(serializer.lock);
451
452 // remove all running jobs and put them in the idle queue unless we
453 // find our seek target
454
455 if (currentJobId >= 0)
456 todoQueue.wait_push(currentJobId);
457
458 // Note that if we are here the current job does not represent the sought block.
459 // Hence if the running queue is empty we need to explicitly unset the jobId,
460 // otherwise we would not update the serializers istream pointer to the correct position.
461 if (runningQueue.is_empty())
462 currentJobId = -1;
463
464 // empty is thread-safe in serializer.lock
465 while (!runningQueue.is_empty())
466 {
467 runningQueue.wait_pop(currentJobId);
468
469 if (jobs[currentJobId].fileOfs == (off_type)destFileOfs)
470 break;
471
472 // push back useless job
473 todoQueue.wait_push(currentJobId);
474 currentJobId = -1;
475 }
476
477 if (currentJobId == -1)
478 {
479 assert(runningQueue.is_empty());
480 serializer.istream.clear(serializer.istream.rdstate() & ~std::ios_base::eofbit);
481 if (serializer.istream.rdbuf()->pubseekpos(destFileOfs, std::ios_base::in) == destFileOfs)
482 serializer.fileOfs = destFileOfs;
483 else
484 currentJobId = -2; // temporarily signals a seek error
485 }
486 }
487
488 // if our block wasn't in the running queue yet, it should now
489 // be the first that falls out after modifying serializer.fileOfs
490 if (currentJobId == -1)
491 runningQueue.wait_pop(currentJobId);
492 else if (currentJobId == -2)
493 currentJobId = -1;
494
495 if (currentJobId >= 0)
496 {
497 // wait for the end of decompression
498 DecompressionJob &job = jobs[currentJobId];
499
500 {
502 job.readyEvent.wait(lock, [&job]{return job.ready;});
503 }
504
505 assert(job.fileOfs == (off_type)destFileOfs);
506
507 // reset buffer pointers
508 this->setg(
509 &job.buffer[0] + MAX_PUTBACK, // no putback area
510 &job.buffer[0] + (MAX_PUTBACK + (ofs & 0xffff)), // read position
511 &job.buffer[0] + (MAX_PUTBACK + job.size)); // end of buffer
512 return ofs;
513 }
514 }
515 }
516 return pos_type(off_type(-1));
517 }
518
519 pos_type seekpos(pos_type pos, std::ios_base::openmode openMode)
520 {
521 return seekoff(off_type(pos), std::ios_base::beg, openMode);
522 }
523
524 // returns the compressed input istream
525 istream_reference get_istream() { return serializer.istream; };
526};
527
528// --------------------------------------------------------------------------
529// Class basic_bgzf_istreambase
530// --------------------------------------------------------------------------
531
532template<
533 typename Elem,
534 typename Tr = std::char_traits<Elem>,
535 typename ElemA = std::allocator<Elem>,
536 typename ByteT = char,
537 typename ByteAT = std::allocator<ByteT>
538>
539class basic_bgzf_istreambase : virtual public std::basic_ios<Elem,Tr>
540{
541public:
542 typedef std::basic_istream<Elem, Tr>& istream_reference;
543 typedef basic_bgzf_istreambuf<Elem, Tr, ElemA, ByteT, ByteAT> decompression_bgzf_streambuf_type;
544
545 basic_bgzf_istreambase(istream_reference istream_)
546 : m_buf(istream_)
547 {
548 this->init(&m_buf);
549 };
550
551 // returns the underlying decompression bgzf istream object
552 decompression_bgzf_streambuf_type* rdbuf() { return &m_buf; };
553
554 // returns the bgzf error state
555 int get_zerr() const { return m_buf.get_zerr(); };
556 // returns the uncompressed data crc
557 long get_crc() const { return m_buf.get_crc(); };
558 // returns the uncompressed data size
559 long get_out_size() const { return m_buf.get_out_size(); };
560 // returns the compressed data size
561 long get_in_size() const { return m_buf.get_in_size(); };
562
563private:
564 decompression_bgzf_streambuf_type m_buf;
565};
566
567// --------------------------------------------------------------------------
568// Class basic_bgzf_istream
569// --------------------------------------------------------------------------
570
571template<
572 typename Elem,
573 typename Tr = std::char_traits<Elem>,
574 typename ElemA = std::allocator<Elem>,
575 typename ByteT = char,
576 typename ByteAT = std::allocator<ByteT>
577>
578class basic_bgzf_istream :
579 public basic_bgzf_istreambase<Elem,Tr,ElemA,ByteT,ByteAT>,
580 public std::basic_istream<Elem,Tr>
581{
582public:
583 typedef basic_bgzf_istreambase<Elem,Tr,ElemA,ByteT,ByteAT> bgzf_istreambase_type;
584 typedef std::basic_istream<Elem,Tr> istream_type;
585 typedef istream_type & istream_reference;
586 typedef char byte_type;
587
588 basic_bgzf_istream(istream_reference istream_) :
589 bgzf_istreambase_type(istream_),
590 istream_type(bgzf_istreambase_type::rdbuf()),
591 m_is_gzip(false),
592 m_gbgzf_data_size(0)
593 {};
594
595 // returns true if it is a gzip file
596 bool is_gzip() const { return m_is_gzip; };
597 // return data size check
598 bool check_data_size() const { return this->get_out_size() == m_gbgzf_data_size; };
599
600 // return the data size in the file
601 long get_gbgzf_data_size() const { return m_gbgzf_data_size; };
602
603protected:
604 static void read_long(istream_reference in_, unsigned long& x_);
605
606 int check_header();
607 bool m_is_gzip;
608 unsigned long m_gbgzf_data_size;
609
610#ifdef _WIN32
611private:
612 void _Add_vtordisp1() { } // Required to avoid VC++ warning C4250
613 void _Add_vtordisp2() { } // Required to avoid VC++ warning C4250
614#endif
615};
616
617// ===========================================================================
618// Typedefs
619// ===========================================================================
620
621// A typedef for basic_bgzf_istream<char>
622typedef basic_bgzf_istream<char> bgzf_istream;
623// A typedef for basic_bgzf_istream<wchart>
624typedef basic_bgzf_istream<wchar_t> bgzf_wistream;
625
626} // namespace seqan3::conrib
627
628#endif // defined(SEQAN3_HAS_ZLIB)
Provides stream compression utilities.
Provides seqan3::buffer_queue.
T copy(T... args)
T emplace_back(T... args)
constexpr size_t size
The size of a type pack.
Definition: traits.hpp:151
T init(T... args)
T lock(T... args)
T memcmp(T... args)
T rdbuf(T... args)
Provides seqan3::detail::reader_writer_manager.
T resize(T... args)