// NOTE(review): this listing is a partial extract. The number leading each line is the line number
// in the original file, and the lines in between (class header, braces, ...) are not shown here.
31 namespace seqan3::contrib
// Fragment of a template parameter list: the byte type defaults to char.
46 typename ByteT = char,
// Character-related aliases, all taken from the traits class Tr.
53 typedef Tr traits_type;
54 typedef typename traits_type::char_type char_type;
55 typedef typename traits_type::int_type int_type;
56 typedef typename traits_type::off_type off_type;
57 typedef typename traits_type::pos_type pos_type;
// Allocator and byte-level aliases derived from the template parameters.
63 typedef ElemA char_allocator_type;
64 typedef ByteT byte_type;
65 typedef ByteAT byte_allocator_type;
// Raw pointer to byte_type.
66 typedef byte_type* byte_buffer_type;
// Queue of 32-bit integers; the code below pushes and pops job ids through it.
69 typedef fixed_buffer_queue<int32_t> TJobQueue;
// Number of characters kept in front of the decoded data of every job buffer so that already
// consumed characters can be preserved (see the buffer sizing and the MAX_PUTBACK offsets below).
71 static const size_t MAX_PUTBACK = 4;
// Serializer fragment: holds the reference to the underlying (compressed) input stream.
76 istream_reference istream;
// Constructor fragment taking that stream reference (the initializer list is not visible here).
81 Serializer(istream_reference istream) :
// The serializer instance of the stream buffer. The worker code below accesses its istream,
// fileOfs and error members through streamBuf->serializer.
93 Serializer serializer;
// One unit of work: a compressed block read from the stream plus the buffer it is decompressed into.
// NOTE(review): several members and constructor lines are missing from this extract.
95 struct DecompressionJob
// Receives the raw block bytes read from the input stream.
99 TInputBuffer inputBuffer;
// Size of the compressed block; the worker sets it to header length + tail length.
103 uint32_t compressedSize;
// Constructor initializer fragment: the input buffer is created with MAX_BLOCK_SIZE zero elements,
// the output buffer with room for MAX_PUTBACK characters plus one block worth of char_type elements.
111 inputBuffer(DefaultPageSize<detail::bgzf_compression>::MAX_BLOCK_SIZE, 0),
112 buffer(MAX_PUTBACK + DefaultPageSize<detail::bgzf_compression>::MAX_BLOCK_SIZE / sizeof(char_type), 0),
// Copy constructor fragment: copies the members of `other` one by one
// (only some of the initializers are visible here).
121 DecompressionJob(DecompressionJob
const &other) :
122 inputBuffer(other.inputBuffer),
123 buffer(other.buffer),
124 fileOfs(other.fileOfs),
129 bgzfEofMarker(other.bgzfEofMarker)
// Queue of job ids that worker threads push after reading a block and that the stream buffer pops
// when it needs the next block.
137 TJobQueue runningQueue;
// One reader/writer manager per queue; the constructor below sets them up with the reader and
// writer counts of the running queue and the todo queue respectively.
139 detail::reader_writer_manager runningQueueManager;
140 detail::reader_writer_manager todoQueueManager;
// Worker functor fragment: takes job ids from the owning stream buffer's todo queue, reads one
// block per job from the shared input stream and decompresses it into the job's buffer.
// NOTE(review): the call operator header, braces and several statements are missing from this extract.
143 struct DecompressionThread
// Back-pointer to the stream buffer that owns the queues, the jobs and the serializer used below.
145 basic_bgzf_istreambuf *streamBuf;
// Per-thread context that is handed to _decompressBlock.
146 CompressionContext<detail::bgzf_compression> compressionCtx;
// Register this worker as a reader of the todo queue and as a writer of the running queue.
// presumably the returned objects deregister when they go out of scope (names say RAII) — TODO confirm
151 auto reader_raii = streamBuf->todoQueueManager.register_reader();
153 auto writer_raii = streamBuf->runningQueueManager.register_writer();
// Fetch the next job id; a closed todo queue is detected here (the branch body is not visible).
160 if (streamBuf->todoQueue.wait_pop(jobId) == queue_op_status::closed)
163 DecompressionJob &job = streamBuf->jobs[jobId];
// Block until the job is flagged ready before it is reused.
172 job.readyEvent.wait(lock, [&job]{
return job.ready;});
173 assert(job.ready ==
true);
// Reset the end-of-file marker flag for the new block.
179 job.bgzfEofMarker =
false;
// An error recorded earlier in the serializer is checked before reading (handling not visible).
180 if (streamBuf->serializer.error != NULL)
// Remember at which compressed-file offset this block starts.
184 job.fileOfs = streamBuf->serializer.fileOfs;
186 job.compressedSize = 0;
// A file offset of -1 is the sentinel written below once the stream can no longer be read.
189 if (job.fileOfs != -1)
// Read the fixed-size block header into the start of the input buffer.
192 streamBuf->serializer.istream.read(
193 (char_type*)&job.inputBuffer[0],
194 DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH);
// Header read failed: stop further reads. End-of-file is tested separately from other failures
// (the body of the eof branch is not visible).
196 if (!streamBuf->serializer.istream.good())
198 streamBuf->serializer.fileOfs = -1;
199 if (streamBuf->serializer.istream.eof())
// NOTE(review): the error object is allocated with a raw `new`; where it is released is not
// visible in this extract.
201 streamBuf->serializer.error =
new io_error(
"Stream read error.");
// Reject blocks whose header does not validate and record the error.
206 if (!detail::bgzf_compression::validate_header(
std::span{job.inputBuffer}))
208 streamBuf->serializer.fileOfs = -1;
209 streamBuf->serializer.error =
new io_error(
"Invalid BGZF block header.");
// Remaining bytes of the block: the 16-bit value unpacked at byte offset 16 of the header, plus
// one, minus the header length already read.
// presumably that field is the BGZF "total block size minus 1" — TODO confirm against the format spec
214 tailLen = _bgzfUnpack16(&job.inputBuffer[0] + 16) +
215 1u - DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH;
// Read the rest of the block directly behind the header (the length argument is not visible here).
218 streamBuf->serializer.istream.read(
219 (char_type*)&job.inputBuffer[0] + DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH,
// Compare the block bytes with the BGZF end-of-file marker (the length and the comparison result
// test are not visible; presumably the flag is set on equality).
223 if (
memcmp(
reinterpret_cast<uint8_t
const *
>(&job.inputBuffer[0]),
224 reinterpret_cast<uint8_t
const *
>(&BGZF_END_OF_FILE_MARKER[0]),
227 job.bgzfEofMarker =
true;
// Same failure handling as for the header read.
230 if (!streamBuf->serializer.istream.good())
232 streamBuf->serializer.fileOfs = -1;
233 if (streamBuf->serializer.istream.eof())
235 streamBuf->serializer.error =
new io_error(
"Stream read error.");
// Record the full compressed size and advance the shared file offset to the next block.
239 job.compressedSize = DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH + tailLen;
240 streamBuf->serializer.fileOfs += job.compressedSize;
// Clear only the failbit of the stream, keeping all other state bits as they are.
244 streamBuf->serializer.istream.clear(
245 streamBuf->serializer.istream.rdstate() & ~std::ios_base::failbit);
// Hand the job id over to the running queue; a failed push is handled here (body not visible).
248 if (streamBuf->runningQueue.try_push(jobId) != queue_op_status::success)
// Wake up everyone waiting on this job's ready event.
255 job.readyEvent.notify_all();
// Decompress the block into the job buffer, leaving the first MAX_PUTBACK elements free.
// NOTE(review): the destination starts MAX_PUTBACK elements into the buffer, yet the full
// buffer.capacity() is passed as the available size — confirm how _decompressBlock bounds its output.
263 job.size = _decompressBlock(
264 &job.buffer[0] + MAX_PUTBACK, job.buffer.capacity(),
265 &job.inputBuffer[0], job.compressedSize, compressionCtx);
// Notify waiters again once the decompressed size has been stored.
272 job.readyEvent.notify_all();
// Scratch buffer for preserved characters; the constructor sizes it to MAX_PUTBACK.
279 TBuffer putbackBuffer;
// Constructor fragment.
// istream_     : the compressed input stream, forwarded to the serializer.
// numThreads   : number of decompression workers (defaults to bgzf_thread_count).
// jobsPerThread: number of job slots created per worker (defaults to 8).
283 basic_bgzf_istreambuf(istream_reference istream_,
284 size_t numThreads = bgzf_thread_count,
285 size_t jobsPerThread = 8) :
286 serializer(istream_),
287 numThreads(numThreads),
// Total number of jobs; also passed to the running queue's constructor.
288 numJobs(numThreads * jobsPerThread),
289 runningQueue(numJobs),
// Running queue: one reader (this stream buffer) and numThreads writers (the workers).
291 runningQueueManager(detail::reader_count{1}, detail::writer_count{numThreads}, runningQueue),
// Todo queue: numThreads readers (the workers) and one writer (this stream buffer).
292 todoQueueManager(detail::reader_count{numThreads}, detail::writer_count{1}, todoQueue),
293 putbackBuffer(MAX_PUTBACK)
// Put every job id into the todo queue.
299 for (
size_t i = 0; i < numJobs; ++i)
// [[maybe_unused]] because the status is only inspected by the assert.
301 [[maybe_unused]] queue_op_status
status = todoQueue.try_push(i);
302 assert(status == queue_op_status::success);
// Create one DecompressionThread per worker, each with its own compression context.
306 for (
size_t i = 0; i < numThreads; ++i)
307 pool.
emplace_back(DecompressionThread{this, CompressionContext<detail::bgzf_compression>{}});
// Destructor fragment: shuts the queues down around the worker pool.
310 ~basic_bgzf_istreambuf()
// Signal that the single writer of the todo queue (this object) is done.
313 todoQueueManager.writer_arrive();
// Visit every pool entry (loop body not visible; presumably the workers are joined here — TODO confirm).
316 for (
auto & t : pool)
// Signal that the single reader of the running queue (this object) is done.
323 runningQueueManager.reader_arrive();
// NOTE(review): the function header is not visible in this extract; presumably this is the body of
// underflow(), which seekoff below calls — TODO confirm.
// Fast path: the get area still holds unread characters, return the current one.
329 if (this->gptr() && this->gptr() < this->egptr())
330 return traits_type::to_int_type(*this->gptr());
// Number of already consumed characters to preserve, capped at MAX_PUTBACK.
332 size_t putback = this->gptr() - this->eback();
333 if (putback > MAX_PUTBACK)
334 putback = MAX_PUTBACK;
// Argument of a call whose head is not visible: the range starts `putback` characters before gptr().
339 this->gptr() - putback,
// Give the job that was just consumed back to the workers.
343 if (currentJobId >= 0)
344 todoQueue.wait_push(currentJobId);
// Wait for the next job; a closed running queue is expected to come with a recorded error,
// which is then thrown (a copy of the pointed-to object is thrown).
349 if (runningQueue.wait_pop(currentJobId) == queue_op_status::closed)
352 assert(serializer.error != NULL);
353 if (serializer.error != NULL)
354 throw *serializer.error;
358 DecompressionJob &job = jobs[currentJobId];
// Set the put area pointers to span the job buffer except for its last element.
361 this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
// Arguments of a call whose head is not visible: the preserved characters end at
// putbackBuffer + putback and are placed so that they end right before offset MAX_PUTBACK
// of the job buffer (presumably a copy — TODO confirm).
365 &putbackBuffer[0] + putback,
366 &job.buffer[0] + (MAX_PUTBACK - putback));
// Block until the worker has flagged the job as ready.
371 job.readyEvent.wait(lock, [&job]{
return job.ready;});
// A job size of -1 is treated as "no data".
374 size_t size = (job.size != -1)? job.size : 0;
// Three pointers into the job buffer: start of the preserved characters, start of the decoded
// data, end of the decoded data (presumably the arguments of setg — the call head is not visible).
378 &job.buffer[0] + (MAX_PUTBACK - putback),
379 &job.buffer[0] + MAX_PUTBACK,
380 &job.buffer[0] + (MAX_PUTBACK +
size));
// No data: either the size is -1 or the block is empty and was the end-of-file marker
// (branch body not visible; presumably EOF is returned).
384 if (job.size == -1 || (job.size == 0 && job.bgzfEofMarker))
// Data available: return the first character of the new get area.
386 else if (job.size > 0)
387 return traits_type::to_int_type(*this->gptr());
// Remaining case: an empty block that is not the end-of-file marker.
389 throw io_error(
"BGZF: Invalid end condition in decompression. "
390 "Most likely due to an empty bgzf block without end-of-file marker.");
// Repositions the get pointer. Returns the new position, or pos_type(off_type(-1)) at the end of
// the function when no branch produced a position.
// ofs      : relative distance for dir == cur; for dir == beg a combined offset whose low 16 bits
//            are used as the position inside the decompressed block.
// dir      : only cur (with ofs >= 0) and beg are handled in the visible code.
// openMode : only the pure input mode is handled.
// NOTE(review): braces and several statements are missing from this extract.
394 pos_type seekoff(off_type ofs, std::ios_base::seekdir dir, std::ios_base::openmode openMode)
396 if ((openMode & (std::ios_base::in | std::ios_base::out)) == std::ios_base::in)
// Relative forward seek.
398 if (dir == std::ios_base::cur && ofs >= 0)
// While the target lies beyond the current get area (or no job is loaded yet): subtract what is
// left of the current block and load the next one; EOF from underflow() is tested here.
401 while (currentJobId < 0 || this->egptr() - this->gptr() < ofs)
403 ofs -= this->egptr() - this->gptr();
404 if (this->underflow() ==
static_cast<int_type
>(EOF))
// Target lies inside the current block.
408 if (currentJobId >= 0 && ofs <= this->egptr() - this->gptr())
410 DecompressionJob &job = jobs[currentJobId];
// Resulting position: compressed file offset of the block shifted left by 16 bits, plus the
// offset inside the decompressed block ...
418 if (this->gptr() != this->egptr())
419 return pos_type((job.fileOfs << 16) + ((this->gptr() - &job.buffer[MAX_PUTBACK])));
// ... or, when the block is exhausted, the start of the following block.
421 return pos_type((job.fileOfs + job.compressedSize) << 16);
// Absolute seek (destFileOfs is computed on a line not visible here; presumably the upper part
// of ofs — TODO confirm).
425 else if (dir == std::ios_base::beg)
// Target block is the one currently loaded: only the get pointer has to move.
431 if (currentJobId >= 0 && jobs[currentJobId].fileOfs == (off_type)destFileOfs)
433 DecompressionJob &job = jobs[currentJobId];
438 &job.buffer[0] + (MAX_PUTBACK + (ofs & 0xffff)),
// Otherwise give the current job back to the workers.
450 if (currentJobId >= 0)
451 todoQueue.wait_push(currentJobId);
456 if (runningQueue.is_empty())
// Go through the jobs already in the running queue looking for the target block; the
// wait_push below returns a popped job to the todo queue.
460 while (!runningQueue.is_empty())
462 runningQueue.wait_pop(currentJobId);
464 if (jobs[currentJobId].fileOfs == (off_type)destFileOfs)
468 todoQueue.wait_push(currentJobId);
// currentJobId == -1 here: clear a pending eofbit, seek the underlying stream buffer and, on
// success, let the serializer continue from the new offset.
472 if (currentJobId == -1)
474 assert(runningQueue.is_empty());
475 serializer.istream.clear(serializer.istream.rdstate() & ~std::ios_base::eofbit);
476 if (serializer.istream.rdbuf()->pubseekpos(destFileOfs, std::ios_base::in) == destFileOfs)
477 serializer.fileOfs = destFileOfs;
// currentJobId == -1: wait for the next job to be read; -2 is a separate state whose handling
// is not visible here.
485 if (currentJobId == -1)
486 runningQueue.wait_pop(currentJobId);
487 else if (currentJobId == -2)
490 if (currentJobId >= 0)
493 DecompressionJob &job = jobs[currentJobId];
// Wait until the job is flagged ready.
497 job.readyEvent.wait(lock, [&job]{
return job.ready;});
500 assert(job.fileOfs == (off_type)destFileOfs);
// Three pointers into the job buffer: start of the decoded data, requested in-block position,
// end of the decoded data (presumably the arguments of setg — the call head is not visible).
504 &job.buffer[0] + MAX_PUTBACK,
505 &job.buffer[0] + (MAX_PUTBACK + (ofs & 0xffff)),
506 &job.buffer[0] + (MAX_PUTBACK + job.size));
// Fallback result signalling failure.
511 return pos_type(off_type(-1));
// Absolute seek: forwards to seekoff() with the position converted to off_type and
// std::ios_base::beg as direction; returns whatever seekoff() returns.
514 pos_type seekpos(pos_type pos, std::ios_base::openmode openMode)
516 return seekoff(off_type(pos), std::ios_base::beg, openMode);
// Returns the reference to the underlying input stream held by the serializer.
520 istream_reference get_istream() {
return serializer.istream; };
// Fragment of a template parameter list: the byte type defaults to char.
531 typename ByteT = char,
// Base class that owns the BGZF stream buffer (m_buf); derives virtually from std::basic_ios.
// NOTE(review): braces, access specifiers and the constructor body are missing from this extract.
534 class basic_bgzf_istreambase :
virtual public std::basic_ios<Elem,Tr>
// The concrete stream buffer type used by this base.
538 typedef basic_bgzf_istreambuf<Elem, Tr, ElemA, ByteT, ByteAT> decompression_bgzf_streambuf_type;
// Constructor fragment taking the compressed input stream (initializers/body not visible).
540 basic_bgzf_istreambase(istream_reference istream_)
// Returns a pointer to the owned stream buffer.
547 decompression_bgzf_streambuf_type*
rdbuf() {
return &m_buf; };
// The four getters below forward to member functions of the same name on m_buf.
// NOTE(review): no get_zerr/get_crc/get_out_size/get_in_size is visible on the stream buffer in
// this extract — confirm they exist before relying on these getters.
550 int get_zerr()
const {
return m_buf.get_zerr(); };
552 long get_crc()
const {
return m_buf.get_crc(); };
554 long get_out_size()
const {
return m_buf.get_out_size(); };
556 long get_in_size()
const {
return m_buf.get_in_size(); };
// The owned BGZF stream buffer.
559 decompression_bgzf_streambuf_type m_buf;
// Fragment of a template parameter list: the byte type defaults to char.
570 typename ByteT = char,
// Input stream class built on basic_bgzf_istreambase; the constructor also initializes an
// istream_type base with the stream buffer of the first base.
// NOTE(review): the second base class line, braces and some members are missing from this extract.
573 class basic_bgzf_istream :
574 public basic_bgzf_istreambase<Elem,Tr,ElemA,ByteT,ByteAT>,
578 typedef basic_bgzf_istreambase<Elem,Tr,ElemA,ByteT,ByteAT> bgzf_istreambase_type;
580 typedef istream_type & istream_reference;
581 typedef char byte_type;
// Constructor fragment: builds the BGZF base from the compressed stream first, then passes its
// stream buffer to istream_type.
583 basic_bgzf_istream(istream_reference istream_) :
584 bgzf_istreambase_type(istream_),
585 istream_type(bgzf_istreambase_type::rdbuf()),
// Returns the stored m_is_gzip flag.
591 bool is_gzip()
const {
return m_is_gzip; };
// True when the base's get_out_size() equals the stored data size.
// NOTE(review): compares a long with an unsigned long.
593 bool check_data_size()
const {
return this->get_out_size() == m_gbgzf_data_size; };
// Returns the stored data size (unsigned long member returned as long).
596 long get_gbgzf_data_size()
const {
return m_gbgzf_data_size; };
// Declaration only; the definition is not visible here. Takes a stream and an output reference.
599 static void read_long(istream_reference in_,
unsigned long& x_);
603 unsigned long m_gbgzf_data_size;
// Empty member functions; their purpose is not visible here
// (presumably a compiler-specific workaround — TODO confirm).
607 void _Add_vtordisp1() { }
608 void _Add_vtordisp2() { }
// Convenience aliases for char and wchar_t streams.
617 typedef basic_bgzf_istream<char> bgzf_istream;
619 typedef basic_bgzf_istream<wchar_t> bgzf_wistream;
Provides stream compression utilities.
Provides seqan3::buffer_queue.
T emplace_back(T... args)
constexpr size_t size
The size of a type pack.
Definition: traits.hpp:151
Provides seqan3::detail::reader_writer_manager.