32#if !defined(SEQAN3_HAS_ZLIB) && !defined(SEQAN3_HEADER_TEST)
33# error "This file cannot be used when building without GZip-support."
36#if defined(SEQAN3_HAS_ZLIB)
38namespace seqan3::contrib
49 typename ByteT = char,
57 typedef ElemA char_allocator_type;
58 typedef ByteT byte_type;
59 typedef ByteAT byte_allocator_type;
60 typedef byte_type* byte_buffer_type;
61 typedef ConcurrentQueue<size_t, Suspendable<Limit> > job_queue_type;
65 typedef Tr traits_type;
66 typedef typename traits_type::char_type char_type;
67 typedef typename traits_type::int_type int_type;
68 typedef typename traits_type::pos_type pos_type;
69 typedef typename traits_type::off_type off_type;
87 char buffer[DefaultPageSize<detail::bgzf_compression>::MAX_BLOCK_SIZE];
94 ostream_reference ostream;
96 BufferWriter(ostream_reference ostream) :
100 bool operator() (OutputBuffer
const & outputBuffer)
102 ostream.write(outputBuffer.buffer, outputBuffer.size);
103 return ostream.good();
107 struct CompressionJob
113 OutputBuffer *outputBuffer;
116 buffer(DefaultPageSize<detail::bgzf_compression>::VALUE / sizeof(char_type), 0),
126 job_queue_type jobQueue;
127 job_queue_type idleQueue;
128 Serializer<OutputBuffer, BufferWriter> serializer;
130 bool currentJobAvail;
132 struct CompressionThread
134 basic_bgzf_ostreambuf *streamBuf;
135 CompressionContext<detail::bgzf_compression> compressionCtx;
139 ScopedLock readLock{[
this] ()
mutable { unlockReading(this->streamBuf->jobQueue); }};
141 ScopedLock writeLock{[
this] ()
mutable { unlockWriting(this->streamBuf->idleQueue); }};
149 if (!popFront(jobId, streamBuf->jobQueue))
152 CompressionJob &job = streamBuf->jobs[jobId];
155 job.outputBuffer->size = _compressBlock(
156 job.outputBuffer->buffer,
sizeof(job.outputBuffer->buffer),
157 &job.buffer[0], job.size, compressionCtx);
159 success = releaseValue(streamBuf->serializer, job.outputBuffer);
160 appendValue(streamBuf->idleQueue, jobId);
169 basic_bgzf_ostreambuf(ostream_reference ostream_,
170 size_t numThreads = bgzf_thread_count,
171 size_t jobsPerThread = 8) :
172 numThreads(numThreads),
173 numJobs(numThreads * jobsPerThread),
176 serializer(ostream_, numThreads * jobsPerThread)
181 lockWriting(jobQueue);
182 lockReading(idleQueue);
183 setReaderWriterCount(jobQueue, numThreads, 1);
184 setReaderWriterCount(idleQueue, 1, numThreads);
187 for (
size_t i = 0; i < numJobs; ++i)
189 [[maybe_unused]]
bool success = appendValue(idleQueue, i);
194 for (
size_t i = 0; i < numThreads; ++i)
195 pool.
emplace_back(CompressionThread{this, CompressionContext<detail::bgzf_compression>{}});
197 currentJobAvail = popFront(currentJobId, idleQueue);
198 assert(currentJobAvail);
200 CompressionJob &job = jobs[currentJobId];
201 job.outputBuffer = aquireValue(serializer);
202 this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
205 ~basic_bgzf_ostreambuf()
210 unlockWriting(jobQueue);
213 for (
auto & t : pool)
219 unlockReading(idleQueue);
222 bool compressBuffer(
size_t size)
228 appendValue(jobQueue, currentJobId);
232 if (!(currentJobAvail = popFront(currentJobId, idleQueue)))
235 jobs[currentJobId].outputBuffer = aquireValue(serializer);
240 int_type overflow(int_type c)
242 int w =
static_cast<int>(this->pptr() - this->pbase());
243 if (c !=
static_cast<int_type
>(EOF))
248 if (compressBuffer(w))
250 CompressionJob &job = jobs[currentJobId];
251 this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
262 int w =
static_cast<int>(this->pptr() - this->pbase());
263 if ((w != 0 || flushEmptyBuffer) && compressBuffer(w))
265 CompressionJob &job = jobs[currentJobId];
266 this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
274 waitForMinSize(idleQueue, numJobs - 1);
276 serializer.worker.ostream.flush();
282 if (this->pptr() != this->pbase())
284 int_type c = overflow(EOF);
285 if (c ==
static_cast<int_type
>(EOF))
294 if (this->pptr() != this->pbase())
299 ostream_reference get_ostream()
const {
return serializer.worker.ostream; };
310 typename ByteT = char,
313class basic_bgzf_ostreambase :
virtual public std::basic_ios<Elem,Tr>
317 typedef basic_bgzf_ostreambuf<Elem, Tr, ElemA, ByteT, ByteAT> bgzf_streambuf_type;
319 basic_bgzf_ostreambase(ostream_reference ostream_)
326 bgzf_streambuf_type*
rdbuf() {
return &m_buf; };
328 int get_zerr()
const {
return m_buf.get_err(); };
330 long get_crc()
const {
return m_buf.get_crc(); };
332 long get_out_size()
const {
return m_buf.get_out_size(); };
334 long get_in_size()
const {
return m_buf.get_in_size(); };
337 bgzf_streambuf_type m_buf;
348 typename ByteT = char,
351class basic_bgzf_ostream :
352 public basic_bgzf_ostreambase<Elem,Tr,ElemA,ByteT,ByteAT>,
356 typedef basic_bgzf_ostreambase<Elem,Tr,ElemA,ByteT,ByteAT> bgzf_ostreambase_type;
358 typedef ostream_type& ostream_reference;
360 basic_bgzf_ostream(ostream_reference ostream_) :
361 bgzf_ostreambase_type(ostream_),
362 ostream_type(bgzf_ostreambase_type::rdbuf())
366 basic_bgzf_ostream<Elem,Tr>&
flush()
368 ostream_type::flush(); this->
rdbuf()->flush();
return *
this;
371 ~basic_bgzf_ostream()
373 this->
rdbuf()->addFooter();
377 static void put_long(ostream_reference out_,
unsigned long x_);
379 void _Add_vtordisp1() { }
380 void _Add_vtordisp2() { }
389typedef basic_bgzf_ostream<char> bgzf_ostream;
391typedef basic_bgzf_ostream<wchar_t> bgzf_wostream;
Provides stream compression utilities.
T emplace_back(T... args)
constexpr size_t size
The size of a type pack.
Definition type_pack/traits.hpp:143
SeqAn specific customisations in the standard namespace.
Provides helper structs from SeqAn2 for the bgzf_ostream.
Provides seqan suspendable queue.