31#include <seqan3/utility/parallel/detail/reader_writer_manager.hpp>
33#if !defined(SEQAN3_HAS_ZLIB) && !defined(SEQAN3_HEADER_TEST)
34# error "This file cannot be used when building without GZip-support."
37#if defined(SEQAN3_HAS_ZLIB)
38namespace seqan3::contrib
53 typename ByteT = char,
60 typedef Tr traits_type;
61 typedef typename traits_type::char_type char_type;
62 typedef typename traits_type::int_type int_type;
63 typedef typename traits_type::off_type off_type;
64 typedef typename traits_type::pos_type pos_type;
70 typedef ElemA char_allocator_type;
71 typedef ByteT byte_type;
72 typedef ByteAT byte_allocator_type;
73 typedef byte_type* byte_buffer_type;
76 typedef fixed_buffer_queue<int32_t> TJobQueue;
78 static const size_t MAX_PUTBACK = 4;
83 istream_reference istream;
88 Serializer(istream_reference istream) :
100 Serializer serializer;
102 struct DecompressionJob
106 TInputBuffer inputBuffer;
110 uint32_t compressedSize;
118 inputBuffer(DefaultPageSize<detail::bgzf_compression>::MAX_BLOCK_SIZE, 0),
119 buffer(MAX_PUTBACK + DefaultPageSize<detail::bgzf_compression>::MAX_BLOCK_SIZE / sizeof(char_type), 0),
128 DecompressionJob(DecompressionJob
const &other) :
129 inputBuffer(other.inputBuffer),
130 buffer(other.buffer),
131 fileOfs(other.fileOfs),
136 bgzfEofMarker(other.bgzfEofMarker)
144 TJobQueue runningQueue;
146 detail::reader_writer_manager runningQueueManager;
147 detail::reader_writer_manager todoQueueManager;
150 struct DecompressionThread
152 basic_bgzf_istreambuf *streamBuf;
153 CompressionContext<detail::bgzf_compression> compressionCtx;
158 auto reader_raii = streamBuf->todoQueueManager.register_reader();
160 auto writer_raii = streamBuf->runningQueueManager.register_writer();
167 if (streamBuf->todoQueue.wait_pop(jobId) == queue_op_status::closed)
170 DecompressionJob &job = streamBuf->jobs[jobId];
179 job.readyEvent.wait(lock, [&job]{
return job.ready;});
180 assert(job.ready ==
true);
186 job.bgzfEofMarker =
false;
187 if (streamBuf->serializer.error != NULL)
191 job.fileOfs = streamBuf->serializer.fileOfs;
193 job.compressedSize = 0;
196 if (job.fileOfs != -1)
199 streamBuf->serializer.istream.read(
200 (char_type*)&job.inputBuffer[0],
201 DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH);
203 if (!streamBuf->serializer.istream.good())
205 streamBuf->serializer.fileOfs = -1;
206 if (streamBuf->serializer.istream.eof())
208 streamBuf->serializer.error =
new io_error(
"Stream read error.");
213 if (!detail::bgzf_compression::validate_header(
std::span{job.inputBuffer}))
215 streamBuf->serializer.fileOfs = -1;
216 streamBuf->serializer.error =
new io_error(
"Invalid BGZF block header.");
221 tailLen = _bgzfUnpack16(&job.inputBuffer[0] + 16) +
222 1u - DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH;
225 streamBuf->serializer.istream.read(
226 (char_type*)&job.inputBuffer[0] + DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH,
230 if (
memcmp(
reinterpret_cast<uint8_t
const *
>(&job.inputBuffer[0]),
231 reinterpret_cast<uint8_t
const *
>(&BGZF_END_OF_FILE_MARKER[0]),
234 job.bgzfEofMarker =
true;
237 if (!streamBuf->serializer.istream.good())
239 streamBuf->serializer.fileOfs = -1;
240 if (streamBuf->serializer.istream.eof())
242 streamBuf->serializer.error =
new io_error(
"Stream read error.");
246 job.compressedSize = DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH + tailLen;
247 streamBuf->serializer.fileOfs += job.compressedSize;
251 streamBuf->serializer.istream.clear(
252 streamBuf->serializer.istream.rdstate() & ~std::ios_base::failbit);
255 if (streamBuf->runningQueue.try_push(jobId) != queue_op_status::success)
262 job.readyEvent.notify_all();
270 job.size = _decompressBlock(
271 &job.buffer[0] + MAX_PUTBACK, job.buffer.capacity(),
272 &job.inputBuffer[0], job.compressedSize, compressionCtx);
279 job.readyEvent.notify_all();
286 TBuffer putbackBuffer;
290 basic_bgzf_istreambuf(istream_reference istream_,
291 size_t numThreads = bgzf_thread_count,
292 size_t jobsPerThread = 8) :
293 serializer(istream_),
294 numThreads(numThreads),
295 numJobs(numThreads * jobsPerThread),
296 runningQueue(numJobs),
298 runningQueueManager(detail::reader_count{1}, detail::writer_count{numThreads}, runningQueue),
299 todoQueueManager(detail::reader_count{numThreads}, detail::writer_count{1}, todoQueue),
300 putbackBuffer(MAX_PUTBACK)
306 for (
size_t i = 0; i < numJobs; ++i)
308 [[maybe_unused]] queue_op_status
status = todoQueue.try_push(i);
309 assert(status == queue_op_status::success);
313 for (
size_t i = 0; i < numThreads; ++i)
314 pool.
emplace_back(DecompressionThread{this, CompressionContext<detail::bgzf_compression>{}});
317 ~basic_bgzf_istreambuf()
320 todoQueueManager.writer_arrive();
323 for (
auto & t : pool)
330 runningQueueManager.reader_arrive();
336 if (this->gptr() && this->gptr() < this->egptr())
337 return traits_type::to_int_type(*this->gptr());
339 size_t putback = this->gptr() - this->eback();
340 if (putback > MAX_PUTBACK)
341 putback = MAX_PUTBACK;
346 this->gptr() - putback,
350 if (currentJobId >= 0)
351 todoQueue.wait_push(currentJobId);
356 if (runningQueue.wait_pop(currentJobId) == queue_op_status::closed)
359 assert(serializer.error != NULL);
360 if (serializer.error != NULL)
361 throw *serializer.error;
365 DecompressionJob &job = jobs[currentJobId];
368 this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
372 &putbackBuffer[0] + putback,
373 &job.buffer[0] + (MAX_PUTBACK - putback));
378 job.readyEvent.wait(lock, [&job]{
return job.ready;});
381 size_t size = (job.size != -1)? job.size : 0;
385 &job.buffer[0] + (MAX_PUTBACK - putback),
386 &job.buffer[0] + MAX_PUTBACK,
387 &job.buffer[0] + (MAX_PUTBACK + size));
391 if (job.size == -1 || (job.size == 0 && job.bgzfEofMarker))
393 else if (job.size > 0)
394 return traits_type::to_int_type(*this->gptr());
396 throw io_error(
"BGZF: Invalid end condition in decompression. "
397 "Most likely due to an empty bgzf block without end-of-file marker.");
401 pos_type seekoff(off_type ofs, std::ios_base::seekdir dir, std::ios_base::openmode openMode)
403 if ((openMode & (std::ios_base::in | std::ios_base::out)) == std::ios_base::in)
405 if (dir == std::ios_base::cur && ofs >= 0)
408 while (currentJobId < 0 || this->egptr() - this->gptr() < ofs)
410 ofs -= this->egptr() - this->gptr();
411 if (this->underflow() ==
static_cast<int_type
>(EOF))
415 if (currentJobId >= 0 && ofs <= this->egptr() - this->gptr())
417 DecompressionJob &job = jobs[currentJobId];
425 if (this->gptr() != this->egptr())
426 return pos_type((job.fileOfs << 16) + ((this->gptr() - &job.buffer[MAX_PUTBACK])));
428 return pos_type((job.fileOfs + job.compressedSize) << 16);
432 else if (dir == std::ios_base::beg)
438 if (currentJobId >= 0 && jobs[currentJobId].fileOfs == (off_type)destFileOfs)
440 DecompressionJob &job = jobs[currentJobId];
445 &job.buffer[0] + (MAX_PUTBACK + (ofs & 0xffff)),
457 if (currentJobId >= 0)
458 todoQueue.wait_push(currentJobId);
463 if (runningQueue.is_empty())
467 while (!runningQueue.is_empty())
469 runningQueue.wait_pop(currentJobId);
471 if (jobs[currentJobId].fileOfs == (off_type)destFileOfs)
475 todoQueue.wait_push(currentJobId);
479 if (currentJobId == -1)
481 assert(runningQueue.is_empty());
482 serializer.istream.clear(serializer.istream.rdstate() & ~std::ios_base::eofbit);
483 if (serializer.istream.rdbuf()->pubseekpos(destFileOfs, std::ios_base::in) == destFileOfs)
484 serializer.fileOfs = destFileOfs;
492 if (currentJobId == -1)
493 runningQueue.wait_pop(currentJobId);
494 else if (currentJobId == -2)
497 if (currentJobId >= 0)
500 DecompressionJob &job = jobs[currentJobId];
504 job.readyEvent.wait(lock, [&job]{
return job.ready;});
507 assert(job.fileOfs == (off_type)destFileOfs);
511 &job.buffer[0] + MAX_PUTBACK,
512 &job.buffer[0] + (MAX_PUTBACK + (ofs & 0xffff)),
513 &job.buffer[0] + (MAX_PUTBACK + job.size));
518 return pos_type(off_type(-1));
521 pos_type seekpos(pos_type pos, std::ios_base::openmode openMode)
523 return seekoff(off_type(pos), std::ios_base::beg, openMode);
// Accessor for the stream reference stored in `serializer` (its `istream` member).
527 istream_reference get_istream() {
return serializer.istream; };
538 typename ByteT = char,
541class basic_bgzf_istreambase :
virtual public std::basic_ios<Elem,Tr>
545 typedef basic_bgzf_istreambuf<Elem, Tr, ElemA, ByteT, ByteAT> decompression_bgzf_streambuf_type;
547 basic_bgzf_istreambase(istream_reference istream_)
// Returns the address of the embedded stream buffer member `m_buf`.
554 decompression_bgzf_streambuf_type*
rdbuf() {
return &m_buf; };
// Forwards to `m_buf.get_zerr()` and returns its result as `int`.
// NOTE(review): no `get_zerr()` member of the stream buffer is visible in this excerpt — confirm it exists.
557 int get_zerr()
const {
return m_buf.get_zerr(); };
// Forwards to `m_buf.get_crc()` and returns its result as `long`.
// NOTE(review): no `get_crc()` member of the stream buffer is visible in this excerpt — confirm it exists.
559 long get_crc()
const {
return m_buf.get_crc(); };
// Forwards to `m_buf.get_out_size()` and returns its result as `long`.
// NOTE(review): no `get_out_size()` member of the stream buffer is visible in this excerpt — confirm it exists.
561 long get_out_size()
const {
return m_buf.get_out_size(); };
// Forwards to `m_buf.get_in_size()` and returns its result as `long`.
// NOTE(review): no `get_in_size()` member of the stream buffer is visible in this excerpt — confirm it exists.
563 long get_in_size()
const {
return m_buf.get_in_size(); };
566 decompression_bgzf_streambuf_type m_buf;
577 typename ByteT = char,
580class basic_bgzf_istream :
581 public basic_bgzf_istreambase<Elem,Tr,ElemA,ByteT,ByteAT>,
585 typedef basic_bgzf_istreambase<Elem,Tr,ElemA,ByteT,ByteAT> bgzf_istreambase_type;
587 typedef istream_type & istream_reference;
588 typedef char byte_type;
590 basic_bgzf_istream(istream_reference istream_) :
591 bgzf_istreambase_type(istream_),
592 istream_type(bgzf_istreambase_type::rdbuf()),
// Returns the current value of the `m_is_gzip` member.
598 bool is_gzip()
const {
return m_is_gzip; };
// Returns true when `get_out_size()` equals `m_gbgzf_data_size`.
// `get_out_size()` returns `long` while `m_gbgzf_data_size` is `unsigned long`,
// so the comparison converts the signed operand to unsigned.
600 bool check_data_size()
const {
return this->get_out_size() == m_gbgzf_data_size; };
// Returns `m_gbgzf_data_size`. The member is `unsigned long`, so the value is
// implicitly converted to the signed `long` return type.
603 long get_gbgzf_data_size()
const {
return m_gbgzf_data_size; };
// Static helper, declaration only; the definition is not part of this excerpt.
// NOTE(review): presumably reads a value from `in_` into `x_` — confirm against the definition.
606 static void read_long(istream_reference in_,
unsigned long& x_);
610 unsigned long m_gbgzf_data_size;
// Two intentionally empty member functions.
// NOTE(review): the names suggest a compiler-specific vtordisp workaround for the
// virtual base class — confirm before removing.
614 void _Add_vtordisp1() { }
615 void _Add_vtordisp2() { }
// Alias for basic_bgzf_istream instantiated with `char` elements.
624typedef basic_bgzf_istream<char> bgzf_istream;
// Alias for basic_bgzf_istream instantiated with `wchar_t` elements.
626typedef basic_bgzf_istream<wchar_t> bgzf_wistream;
Provides stream compression utilities.
Provides seqan3::buffer_queue.
T emplace_back(T... args)
constexpr size_t size
The size of a type pack.
Definition type_pack/traits.hpp:143