27#if !defined(SEQAN3_HAS_ZLIB) && !defined(SEQAN3_HEADER_TEST)
28# error "This file cannot be used when building without GZip-support."
31#if defined(SEQAN3_HAS_ZLIB)
33namespace seqan3::contrib
44 typename ByteT = char,
// Allocator, byte and raw-buffer aliases built from the template arguments.
52 typedef ElemA char_allocator_type;
53 typedef ByteT byte_type;
54 typedef ByteAT byte_allocator_type;
55 typedef byte_type* byte_buffer_type;
// Queue of size_t job indices; this is the type of both jobQueue and idleQueue.
56 typedef ConcurrentQueue<size_t, Suspendable<Limit> > job_queue_type;
// Character-traits aliases forwarded from Tr.
60 typedef Tr traits_type;
61 typedef typename traits_type::char_type char_type;
62 typedef typename traits_type::int_type int_type;
63 typedef typename traits_type::pos_type pos_type;
64 typedef typename traits_type::off_type off_type;
// Constructor: takes a callable by value and moves it into the `completion`
// member.
// NOTE(review): where `completion` gets invoked is not visible in this
// excerpt - presumably in the destructor; confirm.
68 ScopedLock(
std::function<
void()> complete_fn) : completion(
std::move(complete_fn))
// Fixed-size char array of MAX_BLOCK_SIZE elements. The compression thread
// passes it (together with its sizeof) to _compressBlock as the destination.
82 char buffer[DefaultPageSize<detail::bgzf_compression>::MAX_BLOCK_SIZE];
// Stream that finished output buffers are written to.
89 ostream_reference ostream;
// Constructor taking the target stream (its initializer list is not visible
// in this excerpt).
91 BufferWriter(ostream_reference ostream) :
// Writes outputBuffer.size characters from outputBuffer.buffer to the stream
// and returns the stream's good() state afterwards.
95 bool operator() (OutputBuffer
const & outputBuffer)
97 ostream.write(outputBuffer.buffer, outputBuffer.size);
98 return ostream.good();
// One unit of compression work: an input buffer plus a pointer to the output
// buffer that receives the result of _compressBlock.
102 struct CompressionJob
// Destination for the compressed data; assigned from aquireValue(serializer).
108 OutputBuffer *outputBuffer;
// Constructor initializer: `buffer` is constructed with
// (DefaultPageSize<...>::VALUE / sizeof(char_type), 0).
// NOTE(review): presumably a count/fill-value construction - the type of
// `buffer` is not visible here; confirm.
111 buffer(DefaultPageSize<detail::bgzf_compression>::VALUE / sizeof(char_type), 0),
// Indices of jobs waiting to be compressed: appended by compressBuffer(),
// popped by CompressionThread.
121 job_queue_type jobQueue;
// Indices of jobs available for reuse: filled by the constructor, refilled by
// CompressionThread after each job, popped when a new current job is needed.
122 job_queue_type idleQueue;
// Hands out OutputBuffer objects (aquireValue) and takes them back
// (releaseValue); constructed from the output stream.
123 Serializer<OutputBuffer, BufferWriter> serializer;
// Result of the most recent popFront(currentJobId, idleQueue).
125 bool currentJobAvail;
// Worker that takes job indices from the owning stream buffer's jobQueue,
// compresses the corresponding job and returns the index to idleQueue.
127 struct CompressionThread
// Owning stream buffer; gives access to jobQueue, idleQueue, jobs and serializer.
129 basic_bgzf_ostreambuf *streamBuf;
// Compression context handed to every _compressBlock call of this worker.
130 CompressionContext<detail::bgzf_compression> compressionCtx;
// ScopedLock holding a callable that calls unlockReading on the job queue.
// NOTE(review): the point at which ScopedLock runs the callable is not
// visible in this excerpt - presumably on scope exit; confirm.
134 ScopedLock readLock{[
this] ()
mutable { unlockReading(this->streamBuf->jobQueue); }};
// Same pattern for the idle queue, calling unlockWriting.
136 ScopedLock writeLock{[
this] ()
mutable { unlockWriting(this->streamBuf->idleQueue); }};
// Fetch the next job index; the statement executed when popFront fails is
// not visible in this excerpt.
144 if (!popFront(jobId, streamBuf->jobQueue))
147 CompressionJob &job = streamBuf->jobs[jobId];
// Compress the job's input (job.buffer, job.size) into the output buffer and
// store the value returned by _compressBlock as the output size.
150 job.outputBuffer->size = _compressBlock(
151 job.outputBuffer->buffer,
sizeof(job.outputBuffer->buffer),
152 &job.buffer[0], job.size, compressionCtx);
// Give the output buffer back to the serializer, then mark the job as idle.
154 success = releaseValue(streamBuf->serializer, job.outputBuffer);
155 appendValue(streamBuf->idleQueue, jobId);
// Constructor.
//   ostream_      - stream the serializer is constructed with.
//   numThreads    - number of CompressionThread workers (default: bgzf_thread_count).
//   jobsPerThread - jobs allocated per worker (default: 8); numJobs and the
//                   serializer's second argument are numThreads * jobsPerThread.
// Some initializers and the braces are not visible in this excerpt.
164 basic_bgzf_ostreambuf(ostream_reference ostream_,
165 size_t numThreads = bgzf_thread_count,
166 size_t jobsPerThread = 8) :
167 numThreads(numThreads),
168 numJobs(numThreads * jobsPerThread),
171 serializer(ostream_, numThreads * jobsPerThread)
// These two calls are paired with unlockWriting(jobQueue) and
// unlockReading(idleQueue) in the destructor.
176 lockWriting(jobQueue);
177 lockReading(idleQueue);
// NOTE(review): argument order presumably (queue, readers, writers), i.e.
// numThreads readers / 1 writer for jobQueue and the reverse for idleQueue;
// confirm against setReaderWriterCount.
178 setReaderWriterCount(jobQueue, numThreads, 1);
179 setReaderWriterCount(idleQueue, 1, numThreads);
// Every job index starts out on the idle queue.
182 for (
size_t i = 0; i < numJobs; ++i)
184 [[maybe_unused]]
bool success = appendValue(idleQueue, i);
// Add one CompressionThread per requested thread to `pool`, each pointing
// back at this object and given a default-constructed compression context.
189 for (
size_t i = 0; i < numThreads; ++i)
190 pool.
emplace_back(CompressionThread{this, CompressionContext<detail::bgzf_compression>{}});
// Take the first idle job as the current job; asserted to succeed.
192 currentJobAvail = popFront(currentJobId, idleQueue);
193 assert(currentJobAvail);
// Attach an output buffer to the current job and point the put area at the
// job's input buffer. The put area ends one element before the buffer's end.
// NOTE(review): presumably the spare element is for the character passed to
// overflow(); that code is not visible here - confirm.
195 CompressionJob &job = jobs[currentJobId];
196 job.outputBuffer = aquireValue(serializer);
197 this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
// Destructor: undoes the constructor's lockWriting(jobQueue), visits every
// element of `pool` (loop body not visible in this excerpt), then undoes
// lockReading(idleQueue).
200 ~basic_bgzf_ostreambuf()
205 unlockWriting(jobQueue);
208 for (
auto & t : pool)
214 unlockReading(idleQueue);
// Hands the current job to the workers and switches to a fresh one.
// The statements that use `size` and the return statements are not visible
// in this excerpt.
217 bool compressBuffer(
size_t size)
// Queue the current job for compression.
223 appendValue(jobQueue, currentJobId);
// Pick the next idle job; currentJobAvail records whether one was obtained.
227 if (!(currentJobAvail = popFront(currentJobId, idleQueue)))
// Attach an output buffer to the new current job.
230 jobs[currentJobId].outputBuffer = aquireValue(serializer);
// Called with the character `c` that did not fit into the put area.
// NOTE(review): uses pptr()/pbase()/setp(), so this is presumably the
// std::basic_streambuf::overflow override - the base class is not visible
// in this excerpt; confirm.
235 int_type overflow(int_type c)
// Number of characters currently stored in the put area.
237 int w =
static_cast<int>(this->pptr() - this->pbase());
// NOTE(review): end-of-file is spelled as EOF cast to int_type here;
// traits_type::eof() is the traits-generic equivalent.
238 if (c !=
static_cast<int_type
>(EOF))
// On success, point the put area at the new current job's buffer (again
// leaving the last buffer element outside the put area).
243 if (compressBuffer(w))
245 CompressionJob &job = jobs[currentJobId];
246 this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
// NOTE(review): the header of the enclosing function is not visible in this
// excerpt; `flushEmptyBuffer` is presumably one of its parameters.
// Number of characters currently stored in the put area.
257 int w =
static_cast<int>(this->pptr() - this->pbase());
// Submit the pending characters (or an empty buffer when flushEmptyBuffer is
// set) and, on success, point the put area at the new current job's buffer.
258 if ((w != 0 || flushEmptyBuffer) && compressBuffer(w))
260 CompressionJob &job = jobs[currentJobId];
261 this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
// NOTE(review): presumably blocks until all jobs except the current one are
// back on the idle queue - confirm against waitForMinSize.
269 waitForMinSize(idleQueue, numJobs - 1);
// Flush the underlying output stream held by the serializer's writer.
271 serializer.worker.ostream.flush();
// NOTE(review): the headers of the enclosing functions are not visible in
// this excerpt.
// If the put area holds pending characters, push them out via overflow(EOF)
// and test the result against EOF; the statement taken on EOF is not visible.
277 if (this->pptr() != this->pbase())
279 int_type c = overflow(EOF);
280 if (c ==
static_cast<int_type
>(EOF))
// Same pending-characters check from a later function; the guarded statement
// is not visible in this excerpt.
289 if (this->pptr() != this->pbase())
// Returns the output stream held by the serializer's writer.
294 ostream_reference get_ostream()
const {
return serializer.worker.ostream; };
305 typename ByteT = char,
// Base class that owns the bgzf stream buffer (m_buf) and virtually inherits
// std::basic_ios.
308class basic_bgzf_ostreambase :
virtual public std::basic_ios<Elem,Tr>
312 typedef basic_bgzf_ostreambuf<Elem, Tr, ElemA, ByteT, ByteAT> bgzf_streambuf_type;
// Constructor taking the stream to write to (body not visible in this excerpt).
314 basic_bgzf_ostreambase(ostream_reference ostream_)
// Returns a pointer to the owned stream buffer.
321 bgzf_streambuf_type*
rdbuf() {
return &m_buf; };
// The four accessors below forward to m_buf.get_err(), get_crc(),
// get_out_size() and get_in_size().
// NOTE(review): none of these members of the stream buffer are visible in
// this excerpt - confirm they exist before relying on these accessors.
323 int get_zerr()
const {
return m_buf.get_err(); };
325 long get_crc()
const {
return m_buf.get_crc(); };
327 long get_out_size()
const {
return m_buf.get_out_size(); };
329 long get_in_size()
const {
return m_buf.get_in_size(); };
// The owned stream buffer.
332 bgzf_streambuf_type m_buf;
343 typename ByteT = char,
// Output stream built on basic_bgzf_ostreambase; further base classes are
// not visible in this excerpt.
346class basic_bgzf_ostream :
347 public basic_bgzf_ostreambase<Elem,Tr,ElemA,ByteT,ByteAT>,
351 typedef basic_bgzf_ostreambase<Elem,Tr,ElemA,ByteT,ByteAT> bgzf_ostreambase_type;
353 typedef ostream_type& ostream_reference;
// Constructor: initializes the bgzf base with the target stream, then
// ostream_type with the base's stream buffer.
355 basic_bgzf_ostream(ostream_reference ostream_) :
356 bgzf_ostreambase_type(ostream_),
357 ostream_type(bgzf_ostreambase_type::rdbuf())
// Flushes ostream_type, then the bgzf stream buffer, and returns *this.
// NOTE(review): the return type names only <Elem,Tr>; it matches this class
// only when the remaining template arguments are the defaults - confirm.
361 basic_bgzf_ostream<Elem,Tr>&
flush()
363 ostream_type::flush(); this->
rdbuf()->flush();
return *
this;
// Destructor: asks the stream buffer to add the footer.
366 ~basic_bgzf_ostream()
368 this->
rdbuf()->addFooter();
// Declaration only; the definition is not visible in this excerpt.
372 static void put_long(ostream_reference out_,
unsigned long x_);
// Empty member functions.
// NOTE(review): presumably a compiler-specific vtordisp workaround - confirm.
374 void _Add_vtordisp1() { }
375 void _Add_vtordisp2() { }
// Convenience aliases for the char and wchar_t instantiations.
384typedef basic_bgzf_ostream<char> bgzf_ostream;
386typedef basic_bgzf_ostream<wchar_t> bgzf_wostream;
Provides stream compression utilities.
T emplace_back(T... args)
constexpr size_t size
The size of a type pack.
Definition: traits.hpp:151
SeqAn specific customisations in the standard namespace.
Provides helper structs from SeqAn2 for the bgzf_ostream.
Provides the SeqAn suspendable queue.