SeqAn3  3.0.1
The Modern C++ library for sequence analysis.
bgzf_ostream.hpp
1 // zipstream Library License:
2 // --------------------------
3 //
4 // The zlib/libpng License Copyright (c) 2003 Jonathan de Halleux.
5 //
6 // This software is provided 'as-is', without any express or implied warranty. In no event will the authors be held liable for any damages arising from the use of this software.
7 //
8 // Permission is granted to anyone to use this software for any purpose, including commercial applications, and to alter it and redistribute it freely, subject to the following restrictions:
9 //
10 // 1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required.
11 //
12 // 2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software.
13 //
14 // 3. This notice may not be removed or altered from any source distribution
15 //
16 //
17 // Author: Jonathan de Halleux, dehalleux@pelikhan.com, 2003 (original zlib stream)
18 // Author: David Weese, dave.weese@gmail.com, 2014 (extension to parallel block-wise compression in bgzf format)
19 // Author: René Rahn, rene.rahn [at] fu-berlin.de, 2019 (adaptions to SeqAn library version 3)
20 
21 #pragma once
#include <cassert>
#include <functional>
#include <memory>
#include <ostream>
#include <streambuf>
#include <string>
#include <thread>
#include <vector>

#include <seqan3/contrib/parallel/serialised_resource_pool.hpp>
#include <seqan3/contrib/parallel/suspendable_queue.hpp>
#include <seqan3/contrib/stream/bgzf_stream_util.hpp>

27 namespace seqan3::contrib
28 {
29 
30 // --------------------------------------------------------------------------
31 // Class basic_bgzf_ostreambuf
32 // --------------------------------------------------------------------------
33 
34 template<
35  typename Elem,
36  typename Tr = std::char_traits<Elem>,
37  typename ElemA = std::allocator<Elem>,
38  typename ByteT = char,
39  typename ByteAT = std::allocator<ByteT>
40 >
41 class basic_bgzf_ostreambuf : public std::basic_streambuf<Elem, Tr>
42 {
43 private:
44 
45  typedef std::basic_ostream<Elem, Tr>& ostream_reference;
46  typedef ElemA char_allocator_type;
47  typedef ByteT byte_type;
48  typedef ByteAT byte_allocator_type;
49  typedef byte_type* byte_buffer_type;
50  typedef ConcurrentQueue<size_t, Suspendable<Limit> > job_queue_type;
51 
52 public:
53 
54  typedef Tr traits_type;
55  typedef typename traits_type::char_type char_type;
56  typedef typename traits_type::int_type int_type;
57  typedef typename traits_type::pos_type pos_type;
58  typedef typename traits_type::off_type off_type;
59 
60  struct ScopedLock
61  {
62  ScopedLock(std::function<void()> complete_fn) : completion(std::move(complete_fn))
63  {}
64 
65  ~ScopedLock()
66  {
67  completion();
68  }
69 
70  std::function<void()> completion;
71  };
72 
73  // One compressed block.
74  struct OutputBuffer
75  {
76  char buffer[DefaultPageSize<detail::bgzf_compression>::MAX_BLOCK_SIZE];
77  size_t size;
78  };
79 
80  // Writes the output to the underlying stream when invoked.
81  struct BufferWriter
82  {
83  ostream_reference ostream;
84 
85  BufferWriter(ostream_reference ostream) :
86  ostream(ostream)
87  {}
88 
89  bool operator() (OutputBuffer const & outputBuffer)
90  {
91  ostream.write(outputBuffer.buffer, outputBuffer.size);
92  return ostream.good();
93  }
94  };
95 
96  struct CompressionJob
97  {
99 
100  TBuffer buffer;
101  size_t size;
102  OutputBuffer *outputBuffer;
103 
104  CompressionJob() :
105  buffer(DefaultPageSize<detail::bgzf_compression>::VALUE / sizeof(char_type), 0),
106  size(0),
107  outputBuffer(NULL)
108  {}
109  };
110 
111  // string of recycable jobs
112  size_t numThreads;
113  size_t numJobs;
115  job_queue_type jobQueue;
116  job_queue_type idleQueue;
117  Serializer<OutputBuffer, BufferWriter> serializer;
118  size_t currentJobId;
119  bool currentJobAvail;
120 
121  struct CompressionThread
122  {
123  basic_bgzf_ostreambuf *streamBuf;
124  CompressionContext<detail::bgzf_compression> compressionCtx;
125 
126  void operator()()
127  {
128  ScopedLock readLock{[this] () mutable { unlockReading(this->streamBuf->jobQueue); }};
129  // ScopedReadLock<TJobQueue> readLock(streamBuf->jobQueue);
130  ScopedLock writeLock{[this] () mutable { unlockWriting(this->streamBuf->idleQueue); }};
131  // ScopedWriteLock{obQueue> writeLock{str}amBuf->idleQueue);
132 
133  // wait for a new job to become available
134  bool success = true;
135  while (success)
136  {
137  size_t jobId = -1;
138  if (!popFront(jobId, streamBuf->jobQueue))
139  return;
140 
141  CompressionJob &job = streamBuf->jobs[jobId];
142 
143  // compress block with zlib
144  job.outputBuffer->size = _compressBlock(
145  job.outputBuffer->buffer, sizeof(job.outputBuffer->buffer),
146  &job.buffer[0], job.size, compressionCtx);
147 
148  success = releaseValue(streamBuf->serializer, job.outputBuffer);
149  appendValue(streamBuf->idleQueue, jobId);
150  }
151  }
152  };
153 
154  // array of worker threads
155  // using TFuture = decltype(std::async(CompressionThread{nullptr, CompressionContext<BgzfFile>{}, static_cast<size_t>(0)}));
157 
158  basic_bgzf_ostreambuf(ostream_reference ostream_,
159  size_t numThreads = bgzf_thread_count,
160  size_t jobsPerThread = 8) :
161  numThreads(numThreads),
162  numJobs(numThreads * jobsPerThread),
163  jobQueue(numJobs),
164  idleQueue(numJobs),
165  serializer(ostream_, numThreads * jobsPerThread)
166  {
167  jobs.resize(numJobs);
168  currentJobId = 0;
169 
170  lockWriting(jobQueue);
171  lockReading(idleQueue);
172  setReaderWriterCount(jobQueue, numThreads, 1);
173  setReaderWriterCount(idleQueue, 1, numThreads);
174 
175  // Prepare idle queue.
176  for (size_t i = 0; i < numJobs; ++i)
177  {
178  [[maybe_unused]] bool success = appendValue(idleQueue, i);
179  assert(success);
180  }
181 
182  // Start off threads.
183  for (size_t i = 0; i < numThreads; ++i)
184  pool.emplace_back(CompressionThread{this, CompressionContext<detail::bgzf_compression>{}});
185 
186  currentJobAvail = popFront(currentJobId, idleQueue);
187  assert(currentJobAvail);
188 
189  CompressionJob &job = jobs[currentJobId];
190  job.outputBuffer = aquireValue(serializer);
191  this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
192  }
193 
194  ~basic_bgzf_ostreambuf()
195  {
196  // the buffer is now (after addFooter()) and flush will append the empty EOF marker
197  flush(true);
198 
199  unlockWriting(jobQueue);
200 
201  // Wait for threads to finish there active work.
202  for (auto & t : pool)
203  {
204  if (t.joinable())
205  t.join();
206  }
207 
208  unlockReading(idleQueue);
209  }
210 
211  bool compressBuffer(size_t size)
212  {
213  // submit current job
214  if (currentJobAvail)
215  {
216  jobs[currentJobId].size = size;
217  appendValue(jobQueue, currentJobId);
218  }
219 
220  // recycle existing idle job
221  if (!(currentJobAvail = popFront(currentJobId, idleQueue)))
222  return false;
223 
224  jobs[currentJobId].outputBuffer = aquireValue(serializer);
225 
226  return serializer;
227  }
228 
229  int_type overflow(int_type c)
230  {
231  int w = static_cast<int>(this->pptr() - this->pbase());
232  if (c != static_cast<int_type>(EOF))
233  {
234  *this->pptr() = c;
235  ++w;
236  }
237  if (compressBuffer(w))
238  {
239  CompressionJob &job = jobs[currentJobId];
240  this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
241  return c;
242  }
243  else
244  {
245  return EOF;
246  }
247  }
248 
249  std::streamsize flush(bool flushEmptyBuffer = false)
250  {
251  int w = static_cast<int>(this->pptr() - this->pbase());
252  if ((w != 0 || flushEmptyBuffer) && compressBuffer(w))
253  {
254  CompressionJob &job = jobs[currentJobId];
255  this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
256  }
257  else
258  {
259  w = 0;
260  }
261 
262  // wait for running compressor threads
263  waitForMinSize(idleQueue, numJobs - 1);
264 
265  serializer.worker.ostream.flush();
266  return w;
267  }
268 
269  int sync()
270  {
271  if (this->pptr() != this->pbase())
272  {
273  int_type c = overflow(EOF);
274  if (c == static_cast<int_type>(EOF))
275  return -1;
276  }
277  return 0;
278  }
279 
280  void addFooter()
281  {
282  // we flush the filled buffer here, so that an empty (EOF) buffer is flushed in the d'tor
283  if (this->pptr() != this->pbase())
284  overflow(EOF);
285  }
286 
287  // returns a reference to the output stream
288  ostream_reference get_ostream() const { return serializer.worker.ostream; };
289 };
290 
291 // --------------------------------------------------------------------------
292 // Class basic_bgzf_ostreambase
293 // --------------------------------------------------------------------------
294 
295 template<
296  typename Elem,
297  typename Tr = std::char_traits<Elem>,
298  typename ElemA = std::allocator<Elem>,
299  typename ByteT = char,
300  typename ByteAT = std::allocator<ByteT>
301 >
302 class basic_bgzf_ostreambase : virtual public std::basic_ios<Elem,Tr>
303 {
304 public:
305  typedef std::basic_ostream<Elem, Tr>& ostream_reference;
306  typedef basic_bgzf_ostreambuf<Elem, Tr, ElemA, ByteT, ByteAT> bgzf_streambuf_type;
307 
308  basic_bgzf_ostreambase(ostream_reference ostream_)
309  : m_buf(ostream_)
310  {
311  this->init(&m_buf );
312  };
313 
314  // returns the underlying zip ostream object
315  bgzf_streambuf_type* rdbuf() { return &m_buf; };
316  // returns the bgzf error state
317  int get_zerr() const { return m_buf.get_err(); };
318  // returns the uncompressed data crc
319  long get_crc() const { return m_buf.get_crc(); };
320  // returns the compressed data size
321  long get_out_size() const { return m_buf.get_out_size(); };
322  // returns the uncompressed data size
323  long get_in_size() const { return m_buf.get_in_size(); };
324 
325 private:
326  bgzf_streambuf_type m_buf;
327 };
328 
329 // --------------------------------------------------------------------------
330 // Class basic_bgzf_ostream
331 // --------------------------------------------------------------------------
332 
333 template<
334  typename Elem,
335  typename Tr = std::char_traits<Elem>,
336  typename ElemA = std::allocator<Elem>,
337  typename ByteT = char,
338  typename ByteAT = std::allocator<ByteT>
339 >
340 class basic_bgzf_ostream :
341  public basic_bgzf_ostreambase<Elem,Tr,ElemA,ByteT,ByteAT>,
342  public std::basic_ostream<Elem,Tr>
343 {
344 public:
345  typedef basic_bgzf_ostreambase<Elem,Tr,ElemA,ByteT,ByteAT> bgzf_ostreambase_type;
346  typedef std::basic_ostream<Elem,Tr> ostream_type;
347  typedef ostream_type& ostream_reference;
348 
349  basic_bgzf_ostream(ostream_reference ostream_) :
350  bgzf_ostreambase_type(ostream_),
351  ostream_type(bgzf_ostreambase_type::rdbuf())
352  {}
353 
354  // flush inner buffer and zipper buffer
355  basic_bgzf_ostream<Elem,Tr>& flush()
356  {
357  ostream_type::flush(); this->rdbuf()->flush(); return *this;
358  };
359 
360  ~basic_bgzf_ostream()
361  {
362  this->rdbuf()->addFooter();
363  }
364 
365 private:
366  static void put_long(ostream_reference out_, unsigned long x_);
367 #ifdef _WIN32
368  void _Add_vtordisp1() { } // Required to avoid VC++ warning C4250
369  void _Add_vtordisp2() { } // Required to avoid VC++ warning C4250
370 #endif
371 };
372 
373 // ===========================================================================
374 // Typedefs
375 // ===========================================================================
376 
377 // A typedef for basic_bgzf_ostream<char>
378 typedef basic_bgzf_ostream<char> bgzf_ostream;
379 // A typedef for basic_bgzf_ostream<wchar_t>
380 typedef basic_bgzf_ostream<wchar_t> bgzf_wostream;
381 
382 } // namespace seqan3::contrib
std::vector::resize
T resize(T... args)
std::basic_ios::rdbuf
T rdbuf(T... args)
serialised_resource_pool.hpp
Provides helper structs from SeqAn2 for the bgzf_ostream.
std::vector
std::vector::size
T size(T... args)
bgzf_stream_util.hpp
Provides stream compression utilities.
seqan3::views::move
const auto move
A view that turns lvalue-references into rvalue-references.
Definition: move.hpp:68
std::basic_streambuf
std::function
std::basic_streambuf< Elem, Tr >::setp
T setp(T... args)
std::char_traits
std::streamsize
std::basic_ostream< Elem, Tr >
std::flush
T flush(T... args)
seqan3::pack_traits::size
constexpr size_t size
The size of a type pack.
Definition: traits.hpp:116
std::basic_ios::init
T init(T... args)
std::vector::emplace_back
T emplace_back(T... args)
std
SeqAn specific customisations in the standard namespace.
std::allocator
suspendable_queue.hpp
Provides seqan suspendable queue.
std::basic_ios