29#include <seqan3/utility/parallel/detail/reader_writer_manager.hpp>
31#if !defined(SEQAN3_HAS_ZLIB) && !defined(SEQAN3_HEADER_TEST)
32# error "This file cannot be used when building without GZip-support."
35#if defined(SEQAN3_HAS_ZLIB)
36namespace seqan3::contrib
51 typename ByteT = char,
// Character-related member types, each taken from the traits class Tr.
58 typedef Tr traits_type;
59 typedef typename traits_type::char_type char_type;
60 typedef typename traits_type::int_type int_type;
61 typedef typename traits_type::off_type off_type;
62 typedef typename traits_type::pos_type pos_type;
// Allocator and byte types forwarded from ElemA, ByteT and ByteAT
// (presumably all template parameters; only ByteT's declaration is visible in this excerpt).
68 typedef ElemA char_allocator_type;
69 typedef ByteT byte_type;
70 typedef ByteAT byte_allocator_type;
// A byte buffer is referred to through a plain pointer to byte_type.
71 typedef byte_type* byte_buffer_type;
// Queue type carrying 32-bit signed job ids.
74 typedef fixed_buffer_queue<int32_t> TJobQueue;
// Number of putback characters kept in front of the data; the job output buffers
// are sized with this many extra leading elements.
76 static const size_t MAX_PUTBACK = 4;
// Serializer state: reference to the wrapped input stream.
// Other members used elsewhere in this file (serializer.error, serializer.fileOfs)
// are declared in lines not visible in this excerpt.
81 istream_reference istream;
// Constructs the serializer from the stream it wraps (initialiser list not visible here).
86 Serializer(istream_reference istream) :
// The Serializer instance held by the stream buffer; worker threads reach it via streamBuf->serializer.
98 Serializer serializer;
// One unit of work: a compressed input block plus the buffer that receives its decompressed data.
// Several members (buffer, fileOfs, size, ready, readyEvent, bgzfEofMarker, ...) are used by the
// surrounding code but declared in lines not visible in this excerpt.
100 struct DecompressionJob
// Holds the raw (compressed) bytes of one block as read from the stream.
104 TInputBuffer inputBuffer;
// Number of compressed bytes of the block (header plus tail), set by the worker thread.
108 uint32_t compressedSize;
// Default-constructor initialisers (constructor head not visible):
// inputBuffer gets MAX_BLOCK_SIZE zeroed elements ...
116 inputBuffer(DefaultPageSize<detail::bgzf_compression>::MAX_BLOCK_SIZE, 0),
// ... and buffer gets MAX_PUTBACK leading elements plus MAX_BLOCK_SIZE / sizeof(char_type) zeroed elements.
117 buffer(MAX_PUTBACK + DefaultPageSize<detail::bgzf_compression>::MAX_BLOCK_SIZE / sizeof(char_type), 0),
// Copy constructor: copies the members listed below from `other`
// (further initialisers between fileOfs and bgzfEofMarker are not visible here).
126 DecompressionJob(DecompressionJob
const &other) :
127 inputBuffer(other.inputBuffer),
128 buffer(other.buffer),
129 fileOfs(other.fileOfs),
134 bgzfEofMarker(other.bgzfEofMarker)
// Queue of job ids that worker threads push to and the stream buffer pops from.
142 TJobQueue runningQueue;
// Reader/writer bookkeeping for the two job queues; the constructor binds
// runningQueueManager to runningQueue and todoQueueManager to todoQueue.
144 detail::reader_writer_manager runningQueueManager;
145 detail::reader_writer_manager todoQueueManager;
// Worker functor: takes job ids from the todo queue, reads one compressed block from the
// shared input stream into the job, hands the job id to the running queue and decompresses
// the block. The call operator's head, loop structure and several branch bodies are not
// visible in this excerpt, so the comments below describe only the visible statements.
148 struct DecompressionThread
// Back-pointer to the stream buffer whose queues, jobs and serializer this worker uses.
150 basic_bgzf_istreambuf *streamBuf;
// Per-worker compression context passed to _decompressBlock.
151 CompressionContext<detail::bgzf_compression> compressionCtx;
// Register as a reader of the todo queue and as a writer of the running queue;
// the returned objects are kept in locals (named *_raii) for the duration of the scope.
156 auto reader_raii = streamBuf->todoQueueManager.register_reader();
158 auto writer_raii = streamBuf->runningQueueManager.register_writer();
// Fetch the next job id; a closed todo queue is handled separately (branch body not visible).
165 if (streamBuf->todoQueue.wait_pop(jobId) == queue_op_status::closed)
168 DecompressionJob &job = streamBuf->jobs[jobId];
// Block until the job is flagged ready before touching it.
177 job.readyEvent.wait(lock, [&job]{
return job.ready;});
178 assert(job.ready ==
true);
// Reset the end-of-file-marker flag for this round.
184 job.bgzfEofMarker =
false;
// A previously recorded serializer error is checked first (branch body not visible).
185 if (streamBuf->serializer.error != NULL)
// Record at which file offset this job's block starts and clear the compressed size.
189 job.fileOfs = streamBuf->serializer.fileOfs;
191 job.compressedSize = 0;
// A file offset of -1 is the "no more input / error" sentinel (it is assigned below on failures).
194 if (job.fileOfs != -1)
// Read the fixed-length block header into the start of the input buffer.
197 streamBuf->serializer.istream.read(
198 (char_type*)&job.inputBuffer[0],
199 DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH);
// On a failed header read, mark the serializer as finished; eof is distinguished from
// other failures (the eof branch body is not visible here).
201 if (!streamBuf->serializer.istream.good())
203 streamBuf->serializer.fileOfs = -1;
204 if (streamBuf->serializer.istream.eof())
// NOTE(review): the error object is allocated with a raw `new`; where it is freed is not visible here — confirm ownership.
206 streamBuf->serializer.error =
new io_error(
"Stream read error.");
// Reject blocks whose header does not validate.
211 if (!detail::bgzf_compression::validate_header(
std::span{job.inputBuffer}))
213 streamBuf->serializer.fileOfs = -1;
214 streamBuf->serializer.error =
new io_error(
"Invalid BGZF block header.");
// Remaining length of the block: the 16-bit value unpacked at byte offset 16 of the header,
// plus one, minus the header bytes that were already read.
219 tailLen = _bgzfUnpack16(&job.inputBuffer[0] + 16) +
220 1u - DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH;
// Read the rest of the block directly behind the header (length argument not visible; presumably tailLen).
223 streamBuf->serializer.istream.read(
224 (char_type*)&job.inputBuffer[0] + DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH,
// Compare the block read so far against BGZF_END_OF_FILE_MARKER
// (length argument and comparison result check are not visible here).
228 if (
memcmp(
reinterpret_cast<uint8_t
const *
>(&job.inputBuffer[0]),
229 reinterpret_cast<uint8_t
const *
>(&BGZF_END_OF_FILE_MARKER[0]),
232 job.bgzfEofMarker =
true;
// Same failure handling as for the header read.
235 if (!streamBuf->serializer.istream.good())
237 streamBuf->serializer.fileOfs = -1;
238 if (streamBuf->serializer.istream.eof())
240 streamBuf->serializer.error =
new io_error(
"Stream read error.");
// Total compressed size of this block; advance the shared file offset past it.
244 job.compressedSize = DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH + tailLen;
245 streamBuf->serializer.fileOfs += job.compressedSize;
// Clear only the failbit on the stream, keeping the other state bits.
249 streamBuf->serializer.istream.clear(
250 streamBuf->serializer.istream.rdstate() & ~std::ios_base::failbit);
// Publish the job id to the running queue; a failed push is handled separately (branch body not visible).
253 if (streamBuf->runningQueue.try_push(jobId) != queue_op_status::success)
// Wake anyone waiting on this job's ready event.
260 job.readyEvent.notify_all();
// Decompress the block into the output buffer, starting MAX_PUTBACK elements in so that
// the leading slots stay free for putback characters.
// NOTE(review): the destination starts MAX_PUTBACK elements into the buffer but the full
// capacity() is passed as the second argument — confirm _decompressBlock cannot write past the end.
268 job.size = _decompressBlock(
269 &job.buffer[0] + MAX_PUTBACK, job.buffer.capacity(),
270 &job.inputBuffer[0], job.compressedSize, compressionCtx);
// Wake waiters again once decompression has produced job.size.
277 job.readyEvent.notify_all();
// Scratch buffer for putback characters; the constructor sizes it to MAX_PUTBACK elements.
284 TBuffer putbackBuffer;
// Constructs the stream buffer around `istream_`.
//   istream_      - input stream handed to the serializer.
//   numThreads    - number of DecompressionThread workers created (default: bgzf_thread_count).
//   jobsPerThread - jobs allocated per worker (default: 8); numJobs = numThreads * jobsPerThread.
288 basic_bgzf_istreambuf(istream_reference istream_,
289 size_t numThreads = bgzf_thread_count,
290 size_t jobsPerThread = 8) :
291 serializer(istream_),
292 numThreads(numThreads),
293 numJobs(numThreads * jobsPerThread),
// NOTE(review): this reads the member numJobs, so numJobs must be declared before runningQueue;
// the member declaration order is not visible here — confirm.
294 runningQueue(numJobs),
// The running queue has one reader and numThreads writers; the todo queue has
// numThreads readers and one writer.
296 runningQueueManager(detail::reader_count{1}, detail::writer_count{numThreads}, runningQueue),
297 todoQueueManager(detail::reader_count{numThreads}, detail::writer_count{1}, todoQueue),
298 putbackBuffer(MAX_PUTBACK)
// Seed the todo queue with every job id; each push is asserted to succeed.
304 for (
size_t i = 0; i < numJobs; ++i)
306 [[maybe_unused]] queue_op_status
status = todoQueue.try_push(i);
307 assert(status == queue_op_status::success);
// Add one DecompressionThread functor per worker to the pool, each with its own fresh
// compression context (the pool's element type is not visible here; presumably a thread).
311 for (
size_t i = 0; i < numThreads; ++i)
312 pool.
emplace_back(DecompressionThread{this, CompressionContext<detail::bgzf_compression>{}});
// Destructor: reports this object's arrival to both queue managers and visits the worker pool.
315 ~basic_bgzf_istreambuf()
// Report to todoQueueManager that the todo queue's writer (configured as a single writer) has arrived.
318 todoQueueManager.writer_arrive();
// Visit every pool entry (loop body not visible here; presumably joins the thread — confirm).
321 for (
auto & t : pool)
// Report to runningQueueManager that the running queue's reader (configured as a single reader) has arrived.
328 runningQueueManager.reader_arrive();
// Refill routine of the stream buffer (its signature is not visible in this excerpt; seekoff
// calls this->underflow(), so this is presumably underflow()). Returns the current character
// if one is available, otherwise switches to the next decompressed job.
// Fast path: unread characters remain in the get area.
334 if (this->gptr() && this->gptr() < this->egptr())
335 return traits_type::to_int_type(*this->gptr());
// Number of already-consumed characters to keep for putback, capped at MAX_PUTBACK.
337 size_t putback = this->gptr() - this->eback();
338 if (putback > MAX_PUTBACK)
339 putback = MAX_PUTBACK;
// Start of the last `putback` consumed characters (argument of a call whose head is not visible;
// presumably a copy into putbackBuffer).
344 this->gptr() - putback,
// Give the finished job back to the todo queue so a worker can reuse it.
348 if (currentJobId >= 0)
349 todoQueue.wait_push(currentJobId);
// Take the next job id from the running queue; a closed queue is expected to come with a recorded error.
354 if (runningQueue.wait_pop(currentJobId) == queue_op_status::closed)
357 assert(serializer.error != NULL);
// NOTE(review): throws a copy of the pointed-to error object via its static type — confirm this is intended.
358 if (serializer.error != NULL)
359 throw *serializer.error;
363 DecompressionJob &job = jobs[currentJobId];
// Set the put area over the job buffer, excluding its last element.
366 this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
// End of the saved putback characters and their destination directly in front of the payload,
// which starts at index MAX_PUTBACK (arguments of a call whose head is not visible).
370 &putbackBuffer[0] + putback,
371 &job.buffer[0] + (MAX_PUTBACK - putback));
// Block until the job is flagged ready.
376 job.readyEvent.wait(lock, [&job]{
return job.ready;});
// A job size of -1 is treated as zero available characters.
379 size_t size = (job.size != -1)? job.size : 0;
// Three pointers: start of the putback region, start of the payload, end of the payload
// (arguments of a call whose head is not visible; presumably setg).
383 &job.buffer[0] + (MAX_PUTBACK - putback),
384 &job.buffer[0] + MAX_PUTBACK,
385 &job.buffer[0] + (MAX_PUTBACK + size));
// Size -1, or an empty block that carries the end-of-file marker, ends the stream
// (branch body not visible; presumably returns EOF).
389 if (job.size == -1 || (job.size == 0 && job.bgzfEofMarker))
// Non-empty block: return its first character.
391 else if (job.size > 0)
392 return traits_type::to_int_type(*this->gptr());
// Remaining case: an empty block without the end-of-file marker is reported as an error.
394 throw io_error(
"BGZF: Invalid end condition in decompression. "
395 "Most likely due to an empty bgzf block without end-of-file marker.");
// Repositions the read position.
//   ofs      - offset; for seeks from the beginning its low 16 bits are used as the position inside
//              a block (the derivation of destFileOfs is not visible here; presumably the upper bits).
//   dir      - only std::ios_base::cur with ofs >= 0 and std::ios_base::beg are handled.
//   openMode - only pure input mode (in set, out not set) is handled.
// Returns the new position encoded as (block file offset << 16) + offset inside the block on the
// visible success paths, and pos_type(off_type(-1)) at the end of the function.
399 pos_type seekoff(off_type ofs, std::ios_base::seekdir dir, std::ios_base::openmode openMode)
401 if ((openMode & (std::ios_base::in | std::ios_base::out)) == std::ios_base::in)
// Forward seek relative to the current position.
403 if (dir == std::ios_base::cur && ofs >= 0)
// Skip whole blocks until the target lies inside the current get area.
406 while (currentJobId < 0 || this->egptr() - this->gptr() < ofs)
408 ofs -= this->egptr() - this->gptr();
// Load the next block; EOF is handled in the branch body (not visible).
409 if (this->underflow() ==
static_cast<int_type
>(EOF))
413 if (currentJobId >= 0 && ofs <= this->egptr() - this->gptr())
415 DecompressionJob &job = jobs[currentJobId];
// Inside a block: block start offset in the upper bits, in-block offset in the lower 16 bits.
423 if (this->gptr() != this->egptr())
424 return pos_type((job.fileOfs << 16) + ((this->gptr() - &job.buffer[MAX_PUTBACK])));
// At the end of a block: report the start of the following block.
426 return pos_type((job.fileOfs + job.compressedSize) << 16);
// Absolute seek.
430 else if (dir == std::ios_base::beg)
// Target block is the one currently loaded: only the get pointer needs to move.
436 if (currentJobId >= 0 && jobs[currentJobId].fileOfs == (off_type)destFileOfs)
438 DecompressionJob &job = jobs[currentJobId];
// New get pointer: payload start plus the low 16 bits of ofs (argument of a call whose head is not visible).
443 &job.buffer[0] + (MAX_PUTBACK + (ofs & 0xffff)),
// Otherwise release the current job back to the todo queue.
455 if (currentJobId >= 0)
456 todoQueue.wait_push(currentJobId);
461 if (runningQueue.is_empty())
// Go through the jobs already in the running queue, looking for the target block;
// job ids are pushed back to the todo queue (the surrounding control flow is not fully visible).
465 while (!runningQueue.is_empty())
467 runningQueue.wait_pop(currentJobId);
469 if (jobs[currentJobId].fileOfs == (off_type)destFileOfs)
473 todoQueue.wait_push(currentJobId);
// currentJobId == -1: a sentinel assigned in lines not visible here (presumably "target not found").
// In that case the underlying stream itself is repositioned.
477 if (currentJobId == -1)
479 assert(runningQueue.is_empty());
// Clear eofbit so the underlying stream can be read again after the seek.
480 serializer.istream.clear(serializer.istream.rdstate() & ~std::ios_base::eofbit);
// Only adopt the new file offset when the underlying stream buffer reports the requested position.
481 if (serializer.istream.rdbuf()->pubseekpos(destFileOfs, std::ios_base::in) == destFileOfs)
482 serializer.fileOfs = destFileOfs;
// Fetch the first job produced after the reposition.
490 if (currentJobId == -1)
491 runningQueue.wait_pop(currentJobId);
// currentJobId == -2: a second sentinel assigned in lines not visible here (branch body not visible).
492 else if (currentJobId == -2)
495 if (currentJobId >= 0)
498 DecompressionJob &job = jobs[currentJobId];
// Block until the job is flagged ready.
502 job.readyEvent.wait(lock, [&job]{
return job.ready;});
505 assert(job.fileOfs == (off_type)destFileOfs);
// Three pointers: payload start, payload start plus the low 16 bits of ofs, payload end
// (arguments of a call whose head is not visible; presumably setg).
509 &job.buffer[0] + MAX_PUTBACK,
510 &job.buffer[0] + (MAX_PUTBACK + (ofs & 0xffff)),
511 &job.buffer[0] + (MAX_PUTBACK + job.size));
// Fallback result for every path that did not return above.
516 return pos_type(off_type(-1));
// Absolute seek: forwards to seekoff with `pos` converted to off_type and direction std::ios_base::beg.
// Returns whatever seekoff returns.
519 pos_type seekpos(pos_type pos, std::ios_base::openmode openMode)
521 return seekoff(off_type(pos), std::ios_base::beg, openMode);
// Returns the input stream reference stored in the serializer.
525 istream_reference get_istream() {
return serializer.istream; };
// Base class that holds the BGZF stream buffer (m_buf) and exposes it through rdbuf().
// It derives virtually from std::basic_ios<Elem,Tr>. Only part of the template header is
// visible in this excerpt; ByteT defaults to char.
536 typename ByteT = char,
539class basic_bgzf_istreambase :
virtual public std::basic_ios<Elem,Tr>
// The concrete stream buffer type used by this base.
543 typedef basic_bgzf_istreambuf<Elem, Tr, ElemA, ByteT, ByteAT> decompression_bgzf_streambuf_type;
// Constructs the base from the input stream to wrap (initialisers and body not visible here).
545 basic_bgzf_istreambase(istream_reference istream_)
// Returns a pointer to the owned stream buffer.
552 decompression_bgzf_streambuf_type*
rdbuf() {
return &m_buf; };
// The four accessors below forward to same-named members of m_buf.
// NOTE(review): no get_zerr/get_crc/get_out_size/get_in_size member of the buffer type is visible
// in this excerpt — confirm they exist before instantiating these functions.
555 int get_zerr()
const {
return m_buf.get_zerr(); };
557 long get_crc()
const {
return m_buf.get_crc(); };
559 long get_out_size()
const {
return m_buf.get_out_size(); };
561 long get_in_size()
const {
return m_buf.get_in_size(); };
// The owned stream buffer.
564 decompression_bgzf_streambuf_type m_buf;
// Input stream class built on basic_bgzf_istreambase; istream_type is also initialised as a base
// in the constructor (its entry in the base list is not visible in this excerpt).
// Only part of the template header is visible; ByteT defaults to char.
575 typename ByteT = char,
578class basic_bgzf_istream :
579 public basic_bgzf_istreambase<Elem,Tr,ElemA,ByteT,ByteAT>,
583 typedef basic_bgzf_istreambase<Elem,Tr,ElemA,ByteT,ByteAT> bgzf_istreambase_type;
585 typedef istream_type & istream_reference;
586 typedef char byte_type;
// Constructs the stream: first the base that owns the buffer, then istream_type with that
// base's rdbuf() (remaining initialisers and body not visible here).
588 basic_bgzf_istream(istream_reference istream_) :
589 bgzf_istreambase_type(istream_),
590 istream_type(bgzf_istreambase_type::rdbuf()),
// Returns the stored m_is_gzip flag (its declaration is not visible here).
596 bool is_gzip()
const {
return m_is_gzip; };
// True when the base's get_out_size() equals the stored m_gbgzf_data_size.
598 bool check_data_size()
const {
return this->get_out_size() == m_gbgzf_data_size; };
// Returns the stored m_gbgzf_data_size.
601 long get_gbgzf_data_size()
const {
return m_gbgzf_data_size; };
// Declaration only: reads an unsigned long from `in_` into `x_` (definition not visible here).
604 static void read_long(istream_reference in_,
unsigned long& x_);
// Data size value compared against get_out_size() by check_data_size().
608 unsigned long m_gbgzf_data_size;
// Two empty member functions; their purpose is not evident from this excerpt
// (the names suggest a compiler-specific vtordisp workaround — confirm).
612 void _Add_vtordisp1() { }
613 void _Add_vtordisp2() { }
// Convenience alias for the char instantiation of basic_bgzf_istream.
622typedef basic_bgzf_istream<char> bgzf_istream;
// Convenience alias for the wchar_t instantiation of basic_bgzf_istream.
624typedef basic_bgzf_istream<wchar_t> bgzf_wistream;
Provides stream compression utilities.
Provides seqan3::buffer_queue.
T emplace_back(T... args)
constexpr size_t size
The size of a type pack.
Definition type_pack/traits.hpp:146