30#if !defined(SEQAN3_HAS_ZLIB) && !defined(SEQAN3_HEADER_TEST)
31# error "This file cannot be used when building without GZip-support."
34#if defined(SEQAN3_HAS_ZLIB)
36namespace seqan3::contrib
47 typename ByteT = char,
55 typedef ElemA char_allocator_type;
56 typedef ByteT byte_type;
57 typedef ByteAT byte_allocator_type;
58 typedef byte_type* byte_buffer_type;
59 typedef ConcurrentQueue<size_t, Suspendable<Limit> > job_queue_type;
63 typedef Tr traits_type;
64 typedef typename traits_type::char_type char_type;
65 typedef typename traits_type::int_type int_type;
66 typedef typename traits_type::pos_type pos_type;
67 typedef typename traits_type::off_type off_type;
85 char buffer[DefaultPageSize<detail::bgzf_compression>::MAX_BLOCK_SIZE];
92 ostream_reference ostream;
94 BufferWriter(ostream_reference ostream) :
98 bool operator() (OutputBuffer
const & outputBuffer)
100 ostream.write(outputBuffer.buffer, outputBuffer.size);
101 return ostream.good();
105 struct CompressionJob
111 OutputBuffer *outputBuffer;
114 buffer(DefaultPageSize<detail::bgzf_compression>::VALUE / sizeof(char_type), 0),
124 job_queue_type jobQueue;
125 job_queue_type idleQueue;
126 Serializer<OutputBuffer, BufferWriter> serializer;
128 bool currentJobAvail;
130 struct CompressionThread
132 basic_bgzf_ostreambuf *streamBuf;
133 CompressionContext<detail::bgzf_compression> compressionCtx;
137 ScopedLock readLock{[
this] ()
mutable { unlockReading(this->streamBuf->jobQueue); }};
139 ScopedLock writeLock{[
this] ()
mutable { unlockWriting(this->streamBuf->idleQueue); }};
147 if (!popFront(jobId, streamBuf->jobQueue))
150 CompressionJob &job = streamBuf->jobs[jobId];
153 job.outputBuffer->size = _compressBlock(
154 job.outputBuffer->buffer,
sizeof(job.outputBuffer->buffer),
155 &job.buffer[0], job.size, compressionCtx);
157 success = releaseValue(streamBuf->serializer, job.outputBuffer);
158 appendValue(streamBuf->idleQueue, jobId);
167 basic_bgzf_ostreambuf(ostream_reference ostream_,
168 size_t numThreads = bgzf_thread_count,
169 size_t jobsPerThread = 8) :
170 numThreads(numThreads),
171 numJobs(numThreads * jobsPerThread),
174 serializer(ostream_, numThreads * jobsPerThread)
179 lockWriting(jobQueue);
180 lockReading(idleQueue);
181 setReaderWriterCount(jobQueue, numThreads, 1);
182 setReaderWriterCount(idleQueue, 1, numThreads);
185 for (
size_t i = 0; i < numJobs; ++i)
187 [[maybe_unused]]
bool success = appendValue(idleQueue, i);
192 for (
size_t i = 0; i < numThreads; ++i)
193 pool.
emplace_back(CompressionThread{this, CompressionContext<detail::bgzf_compression>{}});
195 currentJobAvail = popFront(currentJobId, idleQueue);
196 assert(currentJobAvail);
198 CompressionJob &job = jobs[currentJobId];
199 job.outputBuffer = aquireValue(serializer);
200 this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
203 ~basic_bgzf_ostreambuf()
208 unlockWriting(jobQueue);
211 for (
auto & t : pool)
217 unlockReading(idleQueue);
220 bool compressBuffer(
size_t size)
226 appendValue(jobQueue, currentJobId);
230 if (!(currentJobAvail = popFront(currentJobId, idleQueue)))
233 jobs[currentJobId].outputBuffer = aquireValue(serializer);
238 int_type overflow(int_type c)
240 int w =
static_cast<int>(this->pptr() - this->pbase());
241 if (c !=
static_cast<int_type
>(EOF))
246 if (compressBuffer(w))
248 CompressionJob &job = jobs[currentJobId];
249 this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
260 int w =
static_cast<int>(this->pptr() - this->pbase());
261 if ((w != 0 || flushEmptyBuffer) && compressBuffer(w))
263 CompressionJob &job = jobs[currentJobId];
264 this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
272 waitForMinSize(idleQueue, numJobs - 1);
274 serializer.worker.ostream.flush();
280 if (this->pptr() != this->pbase())
282 int_type c = overflow(EOF);
283 if (c ==
static_cast<int_type
>(EOF))
292 if (this->pptr() != this->pbase())
297 ostream_reference get_ostream()
const {
return serializer.worker.ostream; };
308 typename ByteT = char,
311class basic_bgzf_ostreambase :
virtual public std::basic_ios<Elem,Tr>
315 typedef basic_bgzf_ostreambuf<Elem, Tr, ElemA, ByteT, ByteAT> bgzf_streambuf_type;
317 basic_bgzf_ostreambase(ostream_reference ostream_)
324 bgzf_streambuf_type*
rdbuf() {
return &m_buf; };
326 int get_zerr()
const {
return m_buf.get_err(); };
328 long get_crc()
const {
return m_buf.get_crc(); };
330 long get_out_size()
const {
return m_buf.get_out_size(); };
332 long get_in_size()
const {
return m_buf.get_in_size(); };
335 bgzf_streambuf_type m_buf;
346 typename ByteT = char,
349class basic_bgzf_ostream :
350 public basic_bgzf_ostreambase<Elem,Tr,ElemA,ByteT,ByteAT>,
354 typedef basic_bgzf_ostreambase<Elem,Tr,ElemA,ByteT,ByteAT> bgzf_ostreambase_type;
356 typedef ostream_type& ostream_reference;
358 basic_bgzf_ostream(ostream_reference ostream_) :
359 bgzf_ostreambase_type(ostream_),
360 ostream_type(bgzf_ostreambase_type::rdbuf())
364 basic_bgzf_ostream<Elem,Tr>&
flush()
366 ostream_type::flush(); this->
rdbuf()->flush();
return *
this;
369 ~basic_bgzf_ostream()
371 this->
rdbuf()->addFooter();
375 static void put_long(ostream_reference out_,
unsigned long x_);
377 void _Add_vtordisp1() { }
378 void _Add_vtordisp2() { }
387typedef basic_bgzf_ostream<char> bgzf_ostream;
389typedef basic_bgzf_ostream<wchar_t> bgzf_wostream;
Provides stream compression utilities.
T emplace_back(T... args)
constexpr size_t size
The size of a type pack.
Definition: traits.hpp:146
SeqAn specific customisations in the standard namespace.
Provides helper structs from SeqAn2 for the bgzf_ostream.
Provides seqan suspendable queue.