27 namespace seqan3::contrib
38 typename ByteT = char,
46 typedef ElemA char_allocator_type;
47 typedef ByteT byte_type;
48 typedef ByteAT byte_allocator_type;
49 typedef byte_type* byte_buffer_type;
50 typedef ConcurrentQueue<size_t, Suspendable<Limit> > job_queue_type;
54 typedef Tr traits_type;
55 typedef typename traits_type::char_type char_type;
56 typedef typename traits_type::int_type int_type;
57 typedef typename traits_type::pos_type pos_type;
58 typedef typename traits_type::off_type off_type;
76 char buffer[DefaultPageSize<detail::bgzf_compression>::MAX_BLOCK_SIZE];
83 ostream_reference ostream;
85 BufferWriter(ostream_reference ostream) :
89 bool operator() (OutputBuffer
const & outputBuffer)
91 ostream.write(outputBuffer.buffer, outputBuffer.size);
92 return ostream.good();
102 OutputBuffer *outputBuffer;
105 buffer(DefaultPageSize<detail::bgzf_compression>::VALUE / sizeof(char_type), 0),
115 job_queue_type jobQueue;
116 job_queue_type idleQueue;
117 Serializer<OutputBuffer, BufferWriter> serializer;
119 bool currentJobAvail;
121 struct CompressionThread
123 basic_bgzf_ostreambuf *streamBuf;
124 CompressionContext<detail::bgzf_compression> compressionCtx;
128 ScopedLock readLock{[
this] ()
mutable { unlockReading(this->streamBuf->jobQueue); }};
130 ScopedLock writeLock{[
this] ()
mutable { unlockWriting(this->streamBuf->idleQueue); }};
138 if (!popFront(jobId, streamBuf->jobQueue))
141 CompressionJob &job = streamBuf->jobs[jobId];
144 job.outputBuffer->size = _compressBlock(
145 job.outputBuffer->buffer,
sizeof(job.outputBuffer->buffer),
146 &job.buffer[0], job.size, compressionCtx);
148 success = releaseValue(streamBuf->serializer, job.outputBuffer);
149 appendValue(streamBuf->idleQueue, jobId);
158 basic_bgzf_ostreambuf(ostream_reference ostream_,
159 size_t numThreads = bgzf_thread_count,
160 size_t jobsPerThread = 8) :
161 numThreads(numThreads),
162 numJobs(numThreads * jobsPerThread),
165 serializer(ostream_, numThreads * jobsPerThread)
170 lockWriting(jobQueue);
171 lockReading(idleQueue);
172 setReaderWriterCount(jobQueue, numThreads, 1);
173 setReaderWriterCount(idleQueue, 1, numThreads);
176 for (
size_t i = 0; i < numJobs; ++i)
178 [[maybe_unused]]
bool success = appendValue(idleQueue, i);
183 for (
size_t i = 0; i < numThreads; ++i)
184 pool.
emplace_back(CompressionThread{this, CompressionContext<detail::bgzf_compression>{}});
186 currentJobAvail = popFront(currentJobId, idleQueue);
187 assert(currentJobAvail);
189 CompressionJob &job = jobs[currentJobId];
190 job.outputBuffer = aquireValue(serializer);
191 this->
setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
194 ~basic_bgzf_ostreambuf()
199 unlockWriting(jobQueue);
202 for (
auto & t : pool)
208 unlockReading(idleQueue);
211 bool compressBuffer(
size_t size)
217 appendValue(jobQueue, currentJobId);
221 if (!(currentJobAvail = popFront(currentJobId, idleQueue)))
224 jobs[currentJobId].outputBuffer = aquireValue(serializer);
229 int_type overflow(int_type c)
231 int w =
static_cast<int>(this->pptr() - this->pbase());
232 if (c !=
static_cast<int_type
>(EOF))
237 if (compressBuffer(w))
239 CompressionJob &job = jobs[currentJobId];
240 this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
251 int w =
static_cast<int>(this->pptr() - this->pbase());
252 if ((w != 0 || flushEmptyBuffer) && compressBuffer(w))
254 CompressionJob &job = jobs[currentJobId];
255 this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
263 waitForMinSize(idleQueue, numJobs - 1);
265 serializer.worker.ostream.flush();
271 if (this->pptr() != this->pbase())
273 int_type c = overflow(EOF);
274 if (c ==
static_cast<int_type
>(EOF))
283 if (this->pptr() != this->pbase())
288 ostream_reference get_ostream()
const {
return serializer.worker.ostream; };
299 typename ByteT = char,
302 class basic_bgzf_ostreambase :
virtual public std::basic_ios<Elem,Tr>
306 typedef basic_bgzf_ostreambuf<Elem, Tr, ElemA, ByteT, ByteAT> bgzf_streambuf_type;
308 basic_bgzf_ostreambase(ostream_reference ostream_)
315 bgzf_streambuf_type*
rdbuf() {
return &m_buf; };
317 int get_zerr()
const {
return m_buf.get_err(); };
319 long get_crc()
const {
return m_buf.get_crc(); };
321 long get_out_size()
const {
return m_buf.get_out_size(); };
323 long get_in_size()
const {
return m_buf.get_in_size(); };
326 bgzf_streambuf_type m_buf;
337 typename ByteT = char,
340 class basic_bgzf_ostream :
341 public basic_bgzf_ostreambase<Elem,Tr,ElemA,ByteT,ByteAT>,
345 typedef basic_bgzf_ostreambase<Elem,Tr,ElemA,ByteT,ByteAT> bgzf_ostreambase_type;
347 typedef ostream_type& ostream_reference;
349 basic_bgzf_ostream(ostream_reference ostream_) :
350 bgzf_ostreambase_type(ostream_),
351 ostream_type(bgzf_ostreambase_type::rdbuf())
355 basic_bgzf_ostream<Elem,Tr>&
flush()
357 ostream_type::flush(); this->
rdbuf()->flush();
return *
this;
360 ~basic_bgzf_ostream()
362 this->
rdbuf()->addFooter();
366 static void put_long(ostream_reference out_,
unsigned long x_);
368 void _Add_vtordisp1() { }
369 void _Add_vtordisp2() { }
378 typedef basic_bgzf_ostream<char> bgzf_ostream;
380 typedef basic_bgzf_ostream<wchar_t> bgzf_wostream;
Provides stream compression utilities.
T emplace_back(T... args)
constexpr size_t size
The size of a type pack.
Definition: traits.hpp:151
auto const move
A view that turns lvalue-references into rvalue-references.
Definition: move.hpp:74
SeqAn specific customisations in the standard namespace.
Provides helper structs from SeqAn2 for the bgzf_ostream.
Provides seqan suspendable queue.