// NOTE(review): this listing contains only selected lines of the original header; each line keeps
// its original line number as a prefix, and the lines in between are not visible here.
32 namespace seqan3::contrib
// Default for the byte type template parameter.
47 typename ByteT = char,
// Character-related member types, all taken from the traits class Tr.
54 typedef Tr traits_type;
55 typedef typename traits_type::char_type char_type;
56 typedef typename traits_type::int_type int_type;
57 typedef typename traits_type::off_type off_type;
58 typedef typename traits_type::pos_type pos_type;
// Allocator and byte-level member types; byte_buffer_type is a plain pointer to byte_type.
64 typedef ElemA char_allocator_type;
65 typedef ByteT byte_type;
66 typedef ByteAT byte_allocator_type;
67 typedef byte_type* byte_buffer_type;
// Queue type used to pass job ids (int32_t) around.
70 typedef fixed_buffer_queue<int32_t> TJobQueue;
// Maximum number of already-read characters preserved for putback when the get area is refilled.
72 static const size_t MAX_PUTBACK = 4;
// Serializer: holds the reference to the underlying (compressed) input stream.
// Other code in this file also reads and writes its `fileOfs` and `error` members, whose
// declarations are on lines not shown here.
77 istream_reference istream;
// Constructor taking the underlying stream; the initializer list is not visible in this listing.
82 Serializer(istream_reference istream) :
// The Serializer instance of this stream buffer; decompression threads reach it via streamBuf->serializer.
94 Serializer serializer;
// DecompressionJob: per-block work item. The decompression thread reads one compressed block into
// `inputBuffer` and decompresses it into `buffer`; `fileOfs` and `compressedSize` record where the
// block sits in the compressed stream and how many bytes it occupies.
96 struct DecompressionJob
100 TInputBuffer inputBuffer;
104 uint32_t compressedSize;
// Constructor initializers: the input buffer is sized to the maximum BGZF block size; the output
// buffer additionally reserves MAX_PUTBACK leading elements for putback characters.
112 inputBuffer(DefaultPageSize<detail::bgzf_compression>::MAX_BLOCK_SIZE, 0),
113 buffer(MAX_PUTBACK + DefaultPageSize<detail::bgzf_compression>::MAX_BLOCK_SIZE / sizeof(char_type), 0),
// Copy constructor: copies the members listed below from `other` (further initializers are on
// lines not shown here).
122 DecompressionJob(DecompressionJob
const &other) :
123 inputBuffer(other.inputBuffer),
124 buffer(other.buffer),
125 fileOfs(other.fileOfs),
130 bgzfEofMarker(other.bgzfEofMarker)
// Queue of job ids that decompression threads have pushed after reading a block; the stream
// buffer pops from it when it needs the next block.
138 TJobQueue runningQueue;
// Reader/writer bookkeeping for the two job queues (constructed with explicit reader and writer
// counts in the stream buffer constructor).
140 detail::reader_writer_manager runningQueueManager;
141 detail::reader_writer_manager todoQueueManager;
// DecompressionThread: worker functor. It repeatedly takes a free job id from the todo queue,
// reads one BGZF block from the underlying stream into the job, hands the job id to the running
// queue and decompresses the block into the job's output buffer.
// NOTE(review): loop structure, locking and several branches are on lines not shown here.
144 struct DecompressionThread
// Back-pointer to the owning stream buffer and the per-thread compression context.
146 basic_bgzf_istreambuf *streamBuf;
147 CompressionContext<detail::bgzf_compression> compressionCtx;
// Register this thread as a reader of the todo queue and as a writer of the running queue.
152 auto reader_raii = streamBuf->todoQueueManager.register_reader();
154 auto writer_raii = streamBuf->runningQueueManager.register_writer();
// Fetch the next free job id; a closed todo queue is handled on a line not shown
// (presumably the thread terminates — confirm).
161 if (streamBuf->todoQueue.wait_pop(jobId) == queue_op_status::closed)
164 DecompressionJob &job = streamBuf->jobs[jobId];
// Block until the job is flagged ready before reusing it.
173 job.readyEvent.wait(lock, [&job]{
return job.ready;});
174 assert(job.ready ==
true);
// Reset per-block state before reading.
180 job.bgzfEofMarker =
false;
// Check for an error recorded earlier (the handling branch is not shown).
181 if (streamBuf->serializer.error != NULL)
// Remember at which compressed-stream offset this job's block starts.
185 job.fileOfs = streamBuf->serializer.fileOfs;
187 job.compressedSize = 0;
// A file offset of -1 is used as the "no more input / failed" marker; only read otherwise.
190 if (job.fileOfs != -1)
// Read the fixed-size block header into the start of the input buffer.
193 streamBuf->serializer.istream.read(
194 (char_type*)&job.inputBuffer[0],
195 DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH);
// Header read failed: mark the stream position invalid; the eof case is distinguished from
// other failures, for which an io_error is recorded.
// NOTE(review): the error object is allocated with raw `new`; where it is deleted is not
// visible in this listing — confirm ownership.
197 if (!streamBuf->serializer.istream.good())
199 streamBuf->serializer.fileOfs = -1;
200 if (streamBuf->serializer.istream.eof())
202 streamBuf->serializer.error =
new io_error(
"Stream read error.");
// Reject blocks whose header does not validate as BGZF.
207 if (!detail::bgzf_compression::validate_header(
std::span{job.inputBuffer}))
209 streamBuf->serializer.fileOfs = -1;
210 streamBuf->serializer.error =
new io_error(
"Invalid BGZF block header.");
// Number of bytes following the header: value decoded at header offset 16, plus one, minus the
// header length (presumably the BGZF BSIZE field, i.e. total block size minus one — confirm).
215 tailLen = _bgzfUnpack16(&job.inputBuffer[0] + 16) +
216 1u - DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH;
// Read the remainder of the block directly behind the header bytes.
219 streamBuf->serializer.istream.read(
220 (char_type*)&job.inputBuffer[0] + DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH,
// Compare the block against the BGZF end-of-file marker and flag the job accordingly.
224 if (
memcmp(reinterpret_cast<uint8_t const *>(&job.inputBuffer[0]),
225 reinterpret_cast<uint8_t const *>(&BGZF_END_OF_FILE_MARKER[0]),
228 job.bgzfEofMarker =
true;
// Same failure handling as for the header read.
231 if (!streamBuf->serializer.istream.good())
233 streamBuf->serializer.fileOfs = -1;
234 if (streamBuf->serializer.istream.eof())
236 streamBuf->serializer.error =
new io_error(
"Stream read error.");
// Record the total compressed size of this block and advance the shared stream offset past it.
240 job.compressedSize = DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH + tailLen;
241 streamBuf->serializer.fileOfs += job.compressedSize;
// Clear only the failbit on the underlying stream, keeping all other state bits.
245 streamBuf->serializer.istream.clear(
246 streamBuf->serializer.istream.rdstate() & ~
std::ios_base::failbit);
// Hand the job id to the running queue; the failure branch is not shown.
249 if (streamBuf->runningQueue.try_push(jobId) != queue_op_status::success)
// Wake up anyone waiting on this job's condition.
256 job.readyEvent.notify_all();
// Decompress the block into the output buffer, leaving the first MAX_PUTBACK elements free.
// NOTE(review): the size argument is the buffer's full capacity although the destination starts
// MAX_PUTBACK elements in — confirm _decompressBlock cannot write past the end.
264 job.size = _decompressBlock(
265 &job.buffer[0] + MAX_PUTBACK, job.buffer.capacity(),
266 &job.inputBuffer[0], job.compressedSize, compressionCtx);
// Signal waiters again once decompression has finished.
273 job.readyEvent.notify_all();
// Scratch buffer (constructed with MAX_PUTBACK elements) from which saved putback characters are
// copied into the next job's buffer.
280 TBuffer putbackBuffer;
// Constructs the stream buffer on top of `istream_`.
//   istream_      underlying stream, handed to the serializer.
//   numThreads    number of decompression threads (defaults to bgzf_thread_count).
//   jobsPerThread number of jobs allocated per thread (default 8).
284 basic_bgzf_istreambuf(istream_reference istream_,
285 size_t numThreads = bgzf_thread_count,
286 size_t jobsPerThread = 8) :
287 serializer(istream_),
288 numThreads(numThreads),
// Total job count; the running queue is sized to hold all of them.
289 numJobs(numThreads * jobsPerThread),
290 runningQueue(numJobs),
// Running queue: one reader (this stream buffer) and numThreads writers (the workers).
// Todo queue: numThreads readers (the workers) and one writer (this stream buffer).
292 runningQueueManager(detail::reader_count{1}, detail::writer_count{numThreads}, runningQueue),
293 todoQueueManager(detail::reader_count{numThreads}, detail::writer_count{1}, todoQueue),
294 putbackBuffer(MAX_PUTBACK)
// Initially every job id is free, so push all of them onto the todo queue.
300 for (
size_t i = 0; i < numJobs; ++i)
302 [[maybe_unused]] queue_op_status
status = todoQueue.try_push(i);
303 assert(status == queue_op_status::success);
// Start one DecompressionThread per requested thread, each with its own compression context.
307 for (
size_t i = 0; i < numThreads; ++i)
308 pool.
emplace_back(DecompressionThread{this, CompressionContext<detail::bgzf_compression>{}});
// Destructor: shuts down the worker threads.
311 ~basic_bgzf_istreambuf()
// Announce that the single todo-queue writer (this object) is done
// (presumably this closes the queue so workers see queue_op_status::closed — confirm).
314 todoQueueManager.writer_arrive();
// Iterate over all worker threads; the loop body is not shown (presumably joins them — confirm).
317 for (
auto & t : pool)
// Announce that the single running-queue reader (this object) is done.
324 runningQueueManager.reader_arrive();
// Body of the get-area refill routine (its header is on a line not shown; it is called as
// underflow() elsewhere in this file).
// If unread characters remain in the get area, return the current one without refilling.
330 if (this->gptr() && this->gptr() < this->egptr())
331 return traits_type::to_int_type(*this->gptr());
// Number of already-consumed characters to keep for putback, capped at MAX_PUTBACK.
333 size_t putback = this->gptr() - this->eback();
334 if (putback > MAX_PUTBACK)
335 putback = MAX_PUTBACK;
// Start of the characters to save (argument of a call whose other lines are not shown).
340 this->gptr() - putback,
// Give the finished job back to the workers.
344 if (currentJobId >= 0)
345 todoQueue.wait_push(currentJobId);
// Take the next job in order; a closed running queue is expected to coincide with a recorded
// serializer error, which is then thrown.
// NOTE(review): `throw *serializer.error` throws a copy of the pointed-to object — confirm this
// is intended and that the pointer's static type matches the dynamic type.
350 if (runningQueue.wait_pop(currentJobId) == queue_op_status::closed)
353 assert(serializer.error != NULL);
354 if (serializer.error != NULL)
355 throw *serializer.error;
359 DecompressionJob &job = jobs[currentJobId];
// Point the put area at the job's buffer.
362 this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
// Copy the saved putback characters so that they end right before offset MAX_PUTBACK of the
// job buffer (arguments of a copy call whose first line is not shown).
366 &putbackBuffer[0] + putback,
367 &job.buffer[0] + (MAX_PUTBACK - putback));
// Wait until the worker has flagged this job as ready.
372 job.readyEvent.wait(lock, [&job]{
return job.ready;});
// A job size of -1 is treated as zero decompressed characters.
375 size_t size = (job.size != -1)? job.size : 0;
// New get area (presumably the arguments of setg — the call line is not shown): begin at the
// putback characters, current position at MAX_PUTBACK, end after `size` decompressed characters.
379 &job.buffer[0] + (MAX_PUTBACK - putback),
380 &job.buffer[0] + MAX_PUTBACK,
381 &job.buffer[0] + (MAX_PUTBACK +
size));
// End of input: either no block could be produced (-1) or an empty block carrying the BGZF EOF
// marker (the statement executed here is not shown; presumably returns eof — confirm).
385 if (job.size == -1 || (job.size == 0 && job.bgzfEofMarker))
// Normal case: return the first character of the freshly decompressed block.
387 else if (job.size > 0)
388 return traits_type::to_int_type(*this->gptr());
// Remaining case: an empty block that is not the EOF marker is reported as an error.
390 throw io_error(
"BGZF: Invalid end condition in decompression. "
391 "Most likely due to an empty bgzf block without end-of-file marker.");
// Repositions the get pointer.
//   ofs      offset; for std::ios_base::beg it is interpreted as a combined value whose low
//            16 bits are the position inside the decompressed block (see `ofs & 0xffff` below).
//   dir      only `cur` with a non-negative offset and `beg` are handled in the visible lines.
//   openMode must select input without output.
// Returns the resulting position, or pos_type(off_type(-1)) on the final fall-through.
395 pos_type seekoff(off_type ofs, std::ios_base::seekdir dir, std::ios_base::openmode openMode)
// Only input-only seeks are handled.
397 if ((openMode & (std::ios_base::in | std::ios_base::out)) == std::ios_base::in)
// --- Relative forward seek ---
399 if (dir == std::ios_base::cur && ofs >= 0)
// Consume whole get areas until the remaining offset fits into the current one.
// NOTE(review): the end test compares against EOF cast to int_type rather than
// traits_type::eof(); these may differ for non-char character types — confirm.
402 while (currentJobId < 0 || this->egptr() - this->gptr() < ofs)
404 ofs -= this->egptr() - this->gptr();
405 if (this->underflow() == static_cast<int_type>(EOF))
409 if (currentJobId >= 0 && ofs <= this->egptr() - this->gptr())
411 DecompressionJob &job = jobs[currentJobId];
// Report the position as (compressed block offset << 16) + offset inside the decompressed
// block; if the block is exhausted, report the start of the following block instead.
419 if (this->gptr() != this->egptr())
420 return pos_type((job.fileOfs << 16) + ((this->gptr() - &job.buffer[MAX_PUTBACK])));
422 return pos_type((job.fileOfs + job.compressedSize) << 16);
// --- Absolute seek ---
426 else if (dir == std::ios_base::beg)
// Target block already loaded as the current job: only the get pointer needs to move.
// (destFileOfs is computed on a line not shown.)
432 if (currentJobId >= 0 && jobs[currentJobId].fileOfs == (off_type)destFileOfs)
434 DecompressionJob &job = jobs[currentJobId];
439 &job.buffer[0] + (MAX_PUTBACK + (ofs & 0xffff)),
// Otherwise release the current job back to the workers.
451 if (currentJobId >= 0)
452 todoQueue.wait_push(currentJobId);
457 if (runningQueue.is_empty())
// Drain already-queued jobs, looking for one that holds the target block; jobs that do not match
// are handed back to the todo queue.
461 while (!runningQueue.is_empty())
463 runningQueue.wait_pop(currentJobId);
465 if (jobs[currentJobId].fileOfs == (off_type)destFileOfs)
469 todoQueue.wait_push(currentJobId);
// No queued job matched: clear eofbit on the underlying stream, seek it to the target block and
// update the serializer's offset if the seek landed where requested.
473 if (currentJobId == -1)
475 assert(runningQueue.is_empty());
476 serializer.istream.clear(serializer.istream.rdstate() & ~
std::ios_base::eofbit);
477 if (serializer.istream.rdbuf()->pubseekpos(destFileOfs, std::ios_base::in) == destFileOfs)
478 serializer.fileOfs = destFileOfs;
// Pick up the next job produced after the seek; -2 is a second sentinel value whose assignment
// is not visible in this listing.
486 if (currentJobId == -1)
487 runningQueue.wait_pop(currentJobId);
488 else if (currentJobId == -2)
491 if (currentJobId >= 0)
494 DecompressionJob &job = jobs[currentJobId];
// Wait until the job is flagged ready, then verify it is the requested block.
498 job.readyEvent.wait(lock, [&job]{
return job.ready;});
501 assert(job.fileOfs == (off_type)destFileOfs);
// New get area: begin at MAX_PUTBACK, current position at the in-block offset (low 16 bits of
// ofs), end after job.size characters.
505 &job.buffer[0] + MAX_PUTBACK,
506 &job.buffer[0] + (MAX_PUTBACK + (ofs & 0xffff)),
507 &job.buffer[0] + (MAX_PUTBACK + job.size));
// Fall-through result for everything not handled above.
512 return pos_type(off_type(-1));
// Absolute seek: forwards to seekoff() with the position converted to an offset from the
// beginning, passing openMode through unchanged.
515 pos_type seekpos(pos_type pos, std::ios_base::openmode openMode)
517 return seekoff(off_type(pos), std::ios_base::beg, openMode);
// Returns the reference to the underlying input stream held by the serializer.
521 istream_reference get_istream() {
return serializer.istream; };
// basic_bgzf_istreambase: base class that owns the BGZF stream buffer (m_buf) and virtually
// inherits from std::basic_ios.
532 typename ByteT = char,
535 class basic_bgzf_istreambase :
virtual public std::basic_ios<Elem,Tr>
// Concrete stream buffer type used by this class.
539 typedef basic_bgzf_istreambuf<Elem, Tr, ElemA, ByteT, ByteAT> decompression_bgzf_streambuf_type;
// Constructor taking the underlying stream (initializers and body are not shown).
541 basic_bgzf_istreambase(istream_reference istream_)
// Returns a pointer to the owned stream buffer.
548 decompression_bgzf_streambuf_type*
rdbuf() {
return &m_buf; };
// The following accessors forward to same-named members of m_buf.
// NOTE(review): no get_zerr/get_crc/get_out_size/get_in_size members are visible on the stream
// buffer in this listing — confirm they exist; otherwise these would presumably only fail to
// compile once instantiated.
551 int get_zerr()
const {
return m_buf.get_zerr(); };
553 long get_crc()
const {
return m_buf.get_crc(); };
555 long get_out_size()
const {
return m_buf.get_out_size(); };
557 long get_in_size()
const {
return m_buf.get_in_size(); };
// The owned stream buffer.
560 decompression_bgzf_streambuf_type m_buf;
// basic_bgzf_istream: the user-facing input stream. It derives from basic_bgzf_istreambase (which
// owns the stream buffer) and constructs its istream_type part from that buffer.
571 typename ByteT = char,
574 class basic_bgzf_istream :
575 public basic_bgzf_istreambase<Elem,Tr,ElemA,ByteT,ByteAT>,
579 typedef basic_bgzf_istreambase<Elem,Tr,ElemA,ByteT,ByteAT> bgzf_istreambase_type;
581 typedef istream_type & istream_reference;
// NOTE(review): byte_type is fixed to char here, independent of the ByteT template parameter.
582 typedef char byte_type;
// Constructs the base with the underlying stream first, then passes the base's stream buffer to
// the istream_type constructor.
584 basic_bgzf_istream(istream_reference istream_) :
585 bgzf_istreambase_type(istream_),
586 istream_type(bgzf_istreambase_type::rdbuf()),
// Returns the stored m_is_gzip flag (where it is set is not visible here).
592 bool is_gzip()
const {
return m_is_gzip; };
// True if the buffer's reported output size equals the stored m_gbgzf_data_size.
594 bool check_data_size()
const {
return this->get_out_size() == m_gbgzf_data_size; };
// Returns the stored data size value.
597 long get_gbgzf_data_size()
const {
return m_gbgzf_data_size; };
// Static helper declared here; its definition is not part of this listing.
600 static void read_long(istream_reference in_,
unsigned long& x_);
604 unsigned long m_gbgzf_data_size;
// Empty member functions; their purpose is not visible here (the names suggest a compiler-specific
// vtordisp workaround — confirm).
608 void _Add_vtordisp1() { }
609 void _Add_vtordisp2() { }
// Convenience aliases for the char and wchar_t instantiations of basic_bgzf_istream.
618 typedef basic_bgzf_istream<char> bgzf_istream;
620 typedef basic_bgzf_istream<wchar_t> bgzf_wistream;