SeqAn3  3.0.3
The Modern C++ library for sequence analysis.
bgzf_istream.hpp
1 // zipstream Library License:
2 // --------------------------
3 //
4 // The zlib/libpng License Copyright (c) 2003 Jonathan de Halleux.
5 //
6 // This software is provided 'as-is', without any express or implied warranty. In no event will the authors be held liable for any damages arising from the use of this software.
7 //
8 // Permission is granted to anyone to use this software for any purpose, including commercial applications, and to alter it and redistribute it freely, subject to the following restrictions:
9 //
10 // 1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required.
11 //
12 // 2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software.
13 //
14 // 3. This notice may not be removed or altered from any source distribution
15 //
16 //
17 // Author: Jonathan de Halleux, dehalleux@pelikhan.com, 2003 (original zlib stream)
18 // Author: David Weese, dave.weese@gmail.com, 2014 (extension to parallel block-wise compression in bgzf format)
19 // Author: Rene Rahn, rene.rahn AT fu-berlin.de, 2019 (adaption to SeqAn library version 3)
20 
21 #pragma once
22 
23 #include <cstring>
24 #include <condition_variable>
25 #include <mutex>
26 
30 
31 namespace seqan3::contrib
32 {
33 
34 // ===========================================================================
35 // Classes
36 // ===========================================================================
37 
38 // --------------------------------------------------------------------------
39 // Class basic_bgzf_istreambuf
40 // --------------------------------------------------------------------------
41 
42 template<
43  typename Elem,
44  typename Tr = std::char_traits<Elem>,
45  typename ElemA = std::allocator<Elem>,
46  typename ByteT = char,
47  typename ByteAT = std::allocator<ByteT>
48 >
49 class basic_bgzf_istreambuf : public std::basic_streambuf<Elem, Tr>
50 {
51 public:
52 
53  typedef Tr traits_type;
54  typedef typename traits_type::char_type char_type;
55  typedef typename traits_type::int_type int_type;
56  typedef typename traits_type::off_type off_type;
57  typedef typename traits_type::pos_type pos_type;
58 
59 private:
60 
61  typedef std::basic_istream<Elem, Tr>& istream_reference;
62 
63  typedef ElemA char_allocator_type;
64  typedef ByteT byte_type;
65  typedef ByteAT byte_allocator_type;
66  typedef byte_type* byte_buffer_type;
67 
69  typedef fixed_buffer_queue<int32_t> TJobQueue;
70 
71  static const size_t MAX_PUTBACK = 4;
72 
73  // Allows serialized access to the underlying buffer.
74  struct Serializer
75  {
76  istream_reference istream;
78  io_error *error;
79  off_type fileOfs;
80 
81  Serializer(istream_reference istream) :
82  istream(istream),
83  error(NULL),
84  fileOfs(0u)
85  {}
86 
87  ~Serializer()
88  {
89  delete error;
90  }
91  };
92 
93  Serializer serializer;
94 
95  struct DecompressionJob
96  {
98 
99  TInputBuffer inputBuffer;
100  TBuffer buffer;
101  off_type fileOfs;
102  int32_t size;
103  uint32_t compressedSize;
104 
105  std::mutex cs;
106  std::condition_variable readyEvent;
107  bool ready;
108  bool bgzfEofMarker;
109 
110  DecompressionJob() :
111  inputBuffer(DefaultPageSize<detail::bgzf_compression>::MAX_BLOCK_SIZE, 0),
112  buffer(MAX_PUTBACK + DefaultPageSize<detail::bgzf_compression>::MAX_BLOCK_SIZE / sizeof(char_type), 0),
113  fileOfs(),
114  size(0),
115  cs(),
116  readyEvent(),
117  ready(true),
118  bgzfEofMarker(false)
119  {}
120 
121  DecompressionJob(DecompressionJob const &other) :
122  inputBuffer(other.inputBuffer),
123  buffer(other.buffer),
124  fileOfs(other.fileOfs),
125  size(other.size),
126  cs(),
127  readyEvent(),
128  ready(other.ready),
129  bgzfEofMarker(other.bgzfEofMarker)
130  {}
131  };
132 
133  // string of recyclable jobs
134  size_t numThreads;
135  size_t numJobs;
137  TJobQueue runningQueue;
138  TJobQueue todoQueue;
139  detail::reader_writer_manager runningQueueManager; // synchronises reader, writer with running queue.
140  detail::reader_writer_manager todoQueueManager; // synchronises reader, writer with todo queue.
141  int currentJobId;
142 
143  struct DecompressionThread
144  {
145  basic_bgzf_istreambuf *streamBuf;
146  CompressionContext<detail::bgzf_compression> compressionCtx;
147 
148  void operator()()
149  {
150  // Active reader to consume from todo queue.
151  auto reader_raii = streamBuf->todoQueueManager.register_reader();
152  // Active writer to produce work for the decompression queue.
153  auto writer_raii = streamBuf->runningQueueManager.register_writer();
154 
155  // wait for a new job to become available
156  while (true)
157  {
158 
159  int jobId = -1;
160  if (streamBuf->todoQueue.wait_pop(jobId) == queue_op_status::closed)
161  return;
162 
163  DecompressionJob &job = streamBuf->jobs[jobId];
164  size_t tailLen = 0;
165 
166  // typically the idle queue contains only ready jobs
167  // however, if seek() fast forwards running jobs into the todoQueue
168  // the caller defers the task of waiting to the decompression threads
169  if (!job.ready)
170  {
172  job.readyEvent.wait(lock, [&job]{return job.ready;});
173  assert(job.ready == true);
174  }
175 
176  {
177  std::lock_guard<std::mutex> scopedLock(streamBuf->serializer.lock);
178 
179  job.bgzfEofMarker = false;
180  if (streamBuf->serializer.error != NULL)
181  return;
182 
183  // remember start offset (for tellg later)
184  job.fileOfs = streamBuf->serializer.fileOfs;
185  job.size = -1;
186  job.compressedSize = 0;
187 
188  // only load if not at EOF
189  if (job.fileOfs != -1)
190  {
191  // read header
192  streamBuf->serializer.istream.read(
193  (char_type*)&job.inputBuffer[0],
194  DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH);
195 
196  if (!streamBuf->serializer.istream.good())
197  {
198  streamBuf->serializer.fileOfs = -1;
199  if (streamBuf->serializer.istream.eof())
200  goto eofSkip;
201  streamBuf->serializer.error = new io_error("Stream read error.");
202  return;
203  }
204 
205  // check header
206  if (!detail::bgzf_compression::validate_header(std::span{job.inputBuffer}))
207  {
208  streamBuf->serializer.fileOfs = -1;
209  streamBuf->serializer.error = new io_error("Invalid BGZF block header.");
210  return;
211  }
212 
213  // extract length of compressed data
214  tailLen = _bgzfUnpack16(&job.inputBuffer[0] + 16) +
215  1u - DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH;
216 
217  // read compressed data and tail
218  streamBuf->serializer.istream.read(
219  (char_type*)&job.inputBuffer[0] + DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH,
220  tailLen);
221 
222  // Check if end-of-file marker is set
223  if (memcmp(reinterpret_cast<uint8_t const *>(&job.inputBuffer[0]),
224  reinterpret_cast<uint8_t const *>(&BGZF_END_OF_FILE_MARKER[0]),
225  28) == 0)
226  {
227  job.bgzfEofMarker = true;
228  }
229 
230  if (!streamBuf->serializer.istream.good())
231  {
232  streamBuf->serializer.fileOfs = -1;
233  if (streamBuf->serializer.istream.eof())
234  goto eofSkip;
235  streamBuf->serializer.error = new io_error("Stream read error.");
236  return;
237  }
238 
239  job.compressedSize = DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH + tailLen;
240  streamBuf->serializer.fileOfs += job.compressedSize;
241  job.ready = false;
242 
243  eofSkip:
244  streamBuf->serializer.istream.clear(
245  streamBuf->serializer.istream.rdstate() & ~std::ios_base::failbit);
246  }
247 
248  if (streamBuf->runningQueue.try_push(jobId) != queue_op_status::success)
249  {
250  // signal that job is ready
251  {
253  job.ready = true;
254  }
255  job.readyEvent.notify_all();
256  return; // Terminate this thread.
257  }
258  }
259 
260  if (!job.ready)
261  {
262  // decompress block
263  job.size = _decompressBlock(
264  &job.buffer[0] + MAX_PUTBACK, job.buffer.capacity(),
265  &job.inputBuffer[0], job.compressedSize, compressionCtx);
266 
267  // signal that job is ready
268  {
270  job.ready = true;
271  }
272  job.readyEvent.notify_all();
273  }
274  }
275  }
276  };
277 
278  std::vector<std::thread> pool; // pool of worker threads
279  TBuffer putbackBuffer;
280 
281 public:
282 
283  basic_bgzf_istreambuf(istream_reference istream_,
284  size_t numThreads = bgzf_thread_count,
285  size_t jobsPerThread = 8) :
286  serializer(istream_),
287  numThreads(numThreads),
288  numJobs(numThreads * jobsPerThread),
289  runningQueue(numJobs),
290  todoQueue(numJobs),
291  runningQueueManager(detail::reader_count{1}, detail::writer_count{numThreads}, runningQueue),
292  todoQueueManager(detail::reader_count{numThreads}, detail::writer_count{1}, todoQueue),
293  putbackBuffer(MAX_PUTBACK)
294  {
295  jobs.resize(numJobs);
296  currentJobId = -1;
297 
298  // Prepare todo queue.
299  for (size_t i = 0; i < numJobs; ++i)
300  {
301  [[maybe_unused]] queue_op_status status = todoQueue.try_push(i);
302  assert(status == queue_op_status::success);
303  }
304 
305  // Start off the threads.
306  for (size_t i = 0; i < numThreads; ++i)
307  pool.emplace_back(DecompressionThread{this, CompressionContext<detail::bgzf_compression>{}});
308  }
309 
310  ~basic_bgzf_istreambuf()
311  {
312  // Signal todoQueue that no more work is coming and close todo queue.
313  todoQueueManager.writer_arrive();
314 
315  // Wait for threads to finish there active work.
316  for (auto & t : pool)
317  {
318  if (t.joinable())
319  t.join();
320  }
321 
322  // Signal running queue that reader is done.
323  runningQueueManager.reader_arrive();
324  }
325 
326  int_type underflow()
327  {
328  // no need to use the next buffer?
329  if (this->gptr() && this->gptr() < this->egptr())
330  return traits_type::to_int_type(*this->gptr());
331 
332  size_t putback = this->gptr() - this->eback();
333  if (putback > MAX_PUTBACK)
334  putback = MAX_PUTBACK;
335 
336  // save at most MAX_PUTBACK characters from previous page to putback buffer
337  if (putback != 0)
338  std::copy(
339  this->gptr() - putback,
340  this->gptr(),
341  &putbackBuffer[0]);
342 
343  if (currentJobId >= 0)
344  todoQueue.wait_push(currentJobId);
345  // appendValue(todoQueue, currentJobId);
346 
347  while (true)
348  {
349  if (runningQueue.wait_pop(currentJobId) == queue_op_status::closed)
350  {
351  currentJobId = -1;
352  assert(serializer.error != NULL);
353  if (serializer.error != NULL)
354  throw *serializer.error;
355  return EOF;
356  }
357 
358  DecompressionJob &job = jobs[currentJobId];
359 
360  // restore putback buffer
361  this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
362  if (putback != 0)
363  std::copy(
364  &putbackBuffer[0],
365  &putbackBuffer[0] + putback,
366  &job.buffer[0] + (MAX_PUTBACK - putback));
367 
368  // wait for the end of decompression
369  {
371  job.readyEvent.wait(lock, [&job]{return job.ready;});
372  }
373 
374  size_t size = (job.size != -1)? job.size : 0;
375 
376  // reset buffer pointers
377  this->setg(
378  &job.buffer[0] + (MAX_PUTBACK - putback), // beginning of putback area
379  &job.buffer[0] + MAX_PUTBACK, // read position
380  &job.buffer[0] + (MAX_PUTBACK + size)); // end of buffer
381 
382  // The end of the bgzf file is reached, either if there was an error, or if the
383  // end-of-file marker was reached, while the uncompressed block had zero size.
384  if (job.size == -1 || (job.size == 0 && job.bgzfEofMarker))
385  return EOF;
386  else if (job.size > 0)
387  return traits_type::to_int_type(*this->gptr()); // return next character
388 
389  throw io_error("BGZF: Invalid end condition in decompression. "
390  "Most likely due to an empty bgzf block without end-of-file marker.");
391  }
392  }
393 
394  pos_type seekoff(off_type ofs, std::ios_base::seekdir dir, std::ios_base::openmode openMode)
395  {
396  if ((openMode & (std::ios_base::in | std::ios_base::out)) == std::ios_base::in)
397  {
398  if (dir == std::ios_base::cur && ofs >= 0)
399  {
400  // forward delta seek
401  while (currentJobId < 0 || this->egptr() - this->gptr() < ofs)
402  {
403  ofs -= this->egptr() - this->gptr();
404  if (this->underflow() == static_cast<int_type>(EOF))
405  break;
406  }
407 
408  if (currentJobId >= 0 && ofs <= this->egptr() - this->gptr())
409  {
410  DecompressionJob &job = jobs[currentJobId];
411 
412  // reset buffer pointers
413  this->setg(
414  this->eback(), // beginning of putback area
415  this->gptr() + ofs, // read position
416  this->egptr()); // end of buffer
417 
418  if (this->gptr() != this->egptr())
419  return pos_type((job.fileOfs << 16) + ((this->gptr() - &job.buffer[MAX_PUTBACK])));
420  else
421  return pos_type((job.fileOfs + job.compressedSize) << 16);
422  }
423 
424  }
425  else if (dir == std::ios_base::beg)
426  {
427  // random seek
428  std::streampos destFileOfs = ofs >> 16;
429 
430  // are we in the same block?
431  if (currentJobId >= 0 && jobs[currentJobId].fileOfs == (off_type)destFileOfs)
432  {
433  DecompressionJob &job = jobs[currentJobId];
434 
435  // reset buffer pointers
436  this->setg(
437  this->eback(), // beginning of putback area
438  &job.buffer[0] + (MAX_PUTBACK + (ofs & 0xffff)), // read position
439  this->egptr()); // end of buffer
440  return ofs;
441  }
442 
443  // ok, different block
444  {
445  std::lock_guard<std::mutex> scopedLock(serializer.lock);
446 
447  // remove all running jobs and put them in the idle queue unless we
448  // find our seek target
449 
450  if (currentJobId >= 0)
451  todoQueue.wait_push(currentJobId);
452 
453  // Note that if we are here the current job does not represent the sought block.
454  // Hence if the running queue is empty we need to explicitly unset the jobId,
455  // otherwise we would not update the serializers istream pointer to the correct position.
456  if (runningQueue.is_empty())
457  currentJobId = -1;
458 
459  // empty is thread-safe in serializer.lock
460  while (!runningQueue.is_empty())
461  {
462  runningQueue.wait_pop(currentJobId);
463 
464  if (jobs[currentJobId].fileOfs == (off_type)destFileOfs)
465  break;
466 
467  // push back useless job
468  todoQueue.wait_push(currentJobId);
469  currentJobId = -1;
470  }
471 
472  if (currentJobId == -1)
473  {
474  assert(runningQueue.is_empty());
475  serializer.istream.clear(serializer.istream.rdstate() & ~std::ios_base::eofbit);
476  if (serializer.istream.rdbuf()->pubseekpos(destFileOfs, std::ios_base::in) == destFileOfs)
477  serializer.fileOfs = destFileOfs;
478  else
479  currentJobId = -2; // temporarily signals a seek error
480  }
481  }
482 
483  // if our block wasn't in the running queue yet, it should now
484  // be the first that falls out after modifying serializer.fileOfs
485  if (currentJobId == -1)
486  runningQueue.wait_pop(currentJobId);
487  else if (currentJobId == -2)
488  currentJobId = -1;
489 
490  if (currentJobId >= 0)
491  {
492  // wait for the end of decompression
493  DecompressionJob &job = jobs[currentJobId];
494 
495  {
497  job.readyEvent.wait(lock, [&job]{return job.ready;});
498  }
499 
500  assert(job.fileOfs == (off_type)destFileOfs);
501 
502  // reset buffer pointers
503  this->setg(
504  &job.buffer[0] + MAX_PUTBACK, // no putback area
505  &job.buffer[0] + (MAX_PUTBACK + (ofs & 0xffff)), // read position
506  &job.buffer[0] + (MAX_PUTBACK + job.size)); // end of buffer
507  return ofs;
508  }
509  }
510  }
511  return pos_type(off_type(-1));
512  }
513 
514  pos_type seekpos(pos_type pos, std::ios_base::openmode openMode)
515  {
516  return seekoff(off_type(pos), std::ios_base::beg, openMode);
517  }
518 
519  // returns the compressed input istream
520  istream_reference get_istream() { return serializer.istream; };
521 };
522 
523 // --------------------------------------------------------------------------
524 // Class basic_bgzf_istreambase
525 // --------------------------------------------------------------------------
526 
527 template<
528  typename Elem,
529  typename Tr = std::char_traits<Elem>,
530  typename ElemA = std::allocator<Elem>,
531  typename ByteT = char,
532  typename ByteAT = std::allocator<ByteT>
533 >
534 class basic_bgzf_istreambase : virtual public std::basic_ios<Elem,Tr>
535 {
536 public:
537  typedef std::basic_istream<Elem, Tr>& istream_reference;
538  typedef basic_bgzf_istreambuf<Elem, Tr, ElemA, ByteT, ByteAT> decompression_bgzf_streambuf_type;
539 
540  basic_bgzf_istreambase(istream_reference istream_)
541  : m_buf(istream_)
542  {
543  this->init(&m_buf);
544  };
545 
546  // returns the underlying decompression bgzf istream object
547  decompression_bgzf_streambuf_type* rdbuf() { return &m_buf; };
548 
549  // returns the bgzf error state
550  int get_zerr() const { return m_buf.get_zerr(); };
551  // returns the uncompressed data crc
552  long get_crc() const { return m_buf.get_crc(); };
553  // returns the uncompressed data size
554  long get_out_size() const { return m_buf.get_out_size(); };
555  // returns the compressed data size
556  long get_in_size() const { return m_buf.get_in_size(); };
557 
558 private:
559  decompression_bgzf_streambuf_type m_buf;
560 };
561 
562 // --------------------------------------------------------------------------
563 // Class basic_bgzf_istream
564 // --------------------------------------------------------------------------
565 
566 template<
567  typename Elem,
568  typename Tr = std::char_traits<Elem>,
569  typename ElemA = std::allocator<Elem>,
570  typename ByteT = char,
571  typename ByteAT = std::allocator<ByteT>
572 >
573 class basic_bgzf_istream :
574  public basic_bgzf_istreambase<Elem,Tr,ElemA,ByteT,ByteAT>,
575  public std::basic_istream<Elem,Tr>
576 {
577 public:
578  typedef basic_bgzf_istreambase<Elem,Tr,ElemA,ByteT,ByteAT> bgzf_istreambase_type;
579  typedef std::basic_istream<Elem,Tr> istream_type;
580  typedef istream_type & istream_reference;
581  typedef char byte_type;
582 
583  basic_bgzf_istream(istream_reference istream_) :
584  bgzf_istreambase_type(istream_),
585  istream_type(bgzf_istreambase_type::rdbuf()),
586  m_is_gzip(false),
587  m_gbgzf_data_size(0)
588  {};
589 
590  // returns true if it is a gzip file
591  bool is_gzip() const { return m_is_gzip; };
592  // return data size check
593  bool check_data_size() const { return this->get_out_size() == m_gbgzf_data_size; };
594 
595  // return the data size in the file
596  long get_gbgzf_data_size() const { return m_gbgzf_data_size; };
597 
598 protected:
599  static void read_long(istream_reference in_, unsigned long& x_);
600 
601  int check_header();
602  bool m_is_gzip;
603  unsigned long m_gbgzf_data_size;
604 
605 #ifdef _WIN32
606 private:
607  void _Add_vtordisp1() { } // Required to avoid VC++ warning C4250
608  void _Add_vtordisp2() { } // Required to avoid VC++ warning C4250
609 #endif
610 };
611 
612 // ===========================================================================
613 // Typedefs
614 // ===========================================================================
615 
616 // A typedef for basic_bgzf_istream<char>
617 typedef basic_bgzf_istream<char> bgzf_istream;
618 // A typedef for basic_bgzf_istream<wchart>
619 typedef basic_bgzf_istream<wchar_t> bgzf_wistream;
620 
621 } // namespace seqan3::conrib
Provides stream compression utilities.
Provides seqan3::buffer_queue.
T copy(T... args)
T emplace_back(T... args)
constexpr size_t size
The size of a type pack.
Definition: traits.hpp:151
T init(T... args)
T lock(T... args)
T memcmp(T... args)
T rdbuf(T... args)
T resize(T... args)
Provides seqan3::detail::reader_writer_manager.