SeqAn3  3.0.1
The Modern C++ library for sequence analysis.
bgzf_istream.hpp
1 // zipstream Library License:
2 // --------------------------
3 //
4 // The zlib/libpng License Copyright (c) 2003 Jonathan de Halleux.
5 //
6 // This software is provided 'as-is', without any express or implied warranty. In no event will the authors be held liable for any damages arising from the use of this software.
7 //
8 // Permission is granted to anyone to use this software for any purpose, including commercial applications, and to alter it and redistribute it freely, subject to the following restrictions:
9 //
10 // 1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required.
11 //
12 // 2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software.
13 //
14 // 3. This notice may not be removed or altered from any source distribution
15 //
16 //
17 // Author: Jonathan de Halleux, dehalleux@pelikhan.com, 2003 (original zlib stream)
18 // Author: David Weese, dave.weese@gmail.com, 2014 (extension to parallel block-wise compression in bgzf format)
19 // Author: Rene Rahn, rene.rahn AT fu-berlin.de, 2019 (adaption to SeqAn library version 3)
20 
21 #pragma once
22 
23 #include <cstring>
24 #include <condition_variable>
25 #include <mutex>
26 
30 #include <seqan3/io/exception.hpp>
31 
32 namespace seqan3::contrib
33 {
34 
35 // ===========================================================================
36 // Classes
37 // ===========================================================================
38 
39 // --------------------------------------------------------------------------
40 // Class basic_bgzf_istreambuf
41 // --------------------------------------------------------------------------
42 
43 template<
44  typename Elem,
45  typename Tr = std::char_traits<Elem>,
46  typename ElemA = std::allocator<Elem>,
47  typename ByteT = char,
48  typename ByteAT = std::allocator<ByteT>
49 >
50 class basic_bgzf_istreambuf : public std::basic_streambuf<Elem, Tr>
51 {
52 public:
53 
54  typedef Tr traits_type;
55  typedef typename traits_type::char_type char_type;
56  typedef typename traits_type::int_type int_type;
57  typedef typename traits_type::off_type off_type;
58  typedef typename traits_type::pos_type pos_type;
59 
60 private:
61 
62  typedef std::basic_istream<Elem, Tr>& istream_reference;
63 
64  typedef ElemA char_allocator_type;
65  typedef ByteT byte_type;
66  typedef ByteAT byte_allocator_type;
67  typedef byte_type* byte_buffer_type;
68 
70  typedef fixed_buffer_queue<int32_t> TJobQueue;
71 
72  static const size_t MAX_PUTBACK = 4;
73 
74  // Allows serialized access to the underlying buffer.
75  struct Serializer
76  {
77  istream_reference istream;
79  io_error *error;
80  off_type fileOfs;
81 
82  Serializer(istream_reference istream) :
83  istream(istream),
84  error(NULL),
85  fileOfs(0u)
86  {}
87 
88  ~Serializer()
89  {
90  delete error;
91  }
92  };
93 
94  Serializer serializer;
95 
96  struct DecompressionJob
97  {
99 
100  TInputBuffer inputBuffer;
101  TBuffer buffer;
102  off_type fileOfs;
103  int32_t size;
104  uint32_t compressedSize;
105 
106  std::mutex cs;
107  std::condition_variable readyEvent;
108  bool ready;
109  bool bgzfEofMarker;
110 
111  DecompressionJob() :
112  inputBuffer(DefaultPageSize<detail::bgzf_compression>::MAX_BLOCK_SIZE, 0),
113  buffer(MAX_PUTBACK + DefaultPageSize<detail::bgzf_compression>::MAX_BLOCK_SIZE / sizeof(char_type), 0),
114  fileOfs(),
115  size(0),
116  cs(),
117  readyEvent(),
118  ready(true),
119  bgzfEofMarker(false)
120  {}
121 
122  DecompressionJob(DecompressionJob const &other) :
123  inputBuffer(other.inputBuffer),
124  buffer(other.buffer),
125  fileOfs(other.fileOfs),
126  size(other.size),
127  cs(),
128  readyEvent(),
129  ready(other.ready),
130  bgzfEofMarker(other.bgzfEofMarker)
131  {}
132  };
133 
134  // string of recyclable jobs
135  size_t numThreads;
136  size_t numJobs;
138  TJobQueue runningQueue;
139  TJobQueue todoQueue;
140  detail::reader_writer_manager runningQueueManager; // synchronises reader, writer with running queue.
141  detail::reader_writer_manager todoQueueManager; // synchronises reader, writer with todo queue.
142  int currentJobId;
143 
144  struct DecompressionThread
145  {
146  basic_bgzf_istreambuf *streamBuf;
147  CompressionContext<detail::bgzf_compression> compressionCtx;
148 
149  void operator()()
150  {
151  // Active reader to consume from todo queue.
152  auto reader_raii = streamBuf->todoQueueManager.register_reader();
153  // Active writer to produce work for the decompression queue.
154  auto writer_raii = streamBuf->runningQueueManager.register_writer();
155 
156  // wait for a new job to become available
157  while (true)
158  {
159 
160  int jobId = -1;
161  if (streamBuf->todoQueue.wait_pop(jobId) == queue_op_status::closed)
162  return;
163 
164  DecompressionJob &job = streamBuf->jobs[jobId];
165  size_t tailLen = 0;
166 
167  // typically the idle queue contains only ready jobs
168  // however, if seek() fast forwards running jobs into the todoQueue
169  // the caller defers the task of waiting to the decompression threads
170  if (!job.ready)
171  {
173  job.readyEvent.wait(lock, [&job]{return job.ready;});
174  assert(job.ready == true);
175  }
176 
177  {
178  std::lock_guard<std::mutex> scopedLock(streamBuf->serializer.lock);
179 
180  job.bgzfEofMarker = false;
181  if (streamBuf->serializer.error != NULL)
182  return;
183 
184  // remember start offset (for tellg later)
185  job.fileOfs = streamBuf->serializer.fileOfs;
186  job.size = -1;
187  job.compressedSize = 0;
188 
189  // only load if not at EOF
190  if (job.fileOfs != -1)
191  {
192  // read header
193  streamBuf->serializer.istream.read(
194  (char_type*)&job.inputBuffer[0],
195  DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH);
196 
197  if (!streamBuf->serializer.istream.good())
198  {
199  streamBuf->serializer.fileOfs = -1;
200  if (streamBuf->serializer.istream.eof())
201  goto eofSkip;
202  streamBuf->serializer.error = new io_error("Stream read error.");
203  return;
204  }
205 
206  // check header
207  if (!detail::bgzf_compression::validate_header(std::span{job.inputBuffer}))
208  {
209  streamBuf->serializer.fileOfs = -1;
210  streamBuf->serializer.error = new io_error("Invalid BGZF block header.");
211  return;
212  }
213 
214  // extract length of compressed data
215  tailLen = _bgzfUnpack16(&job.inputBuffer[0] + 16) +
216  1u - DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH;
217 
218  // read compressed data and tail
219  streamBuf->serializer.istream.read(
220  (char_type*)&job.inputBuffer[0] + DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH,
221  tailLen);
222 
223  // Check if end-of-file marker is set
224  if (memcmp(reinterpret_cast<uint8_t const *>(&job.inputBuffer[0]),
225  reinterpret_cast<uint8_t const *>(&BGZF_END_OF_FILE_MARKER[0]),
226  28) == 0)
227  {
228  job.bgzfEofMarker = true;
229  }
230 
231  if (!streamBuf->serializer.istream.good())
232  {
233  streamBuf->serializer.fileOfs = -1;
234  if (streamBuf->serializer.istream.eof())
235  goto eofSkip;
236  streamBuf->serializer.error = new io_error("Stream read error.");
237  return;
238  }
239 
240  job.compressedSize = DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH + tailLen;
241  streamBuf->serializer.fileOfs += job.compressedSize;
242  job.ready = false;
243 
244  eofSkip:
245  streamBuf->serializer.istream.clear(
246  streamBuf->serializer.istream.rdstate() & ~std::ios_base::failbit);
247  }
248 
249  if (streamBuf->runningQueue.try_push(jobId) != queue_op_status::success)
250  {
251  // signal that job is ready
252  {
254  job.ready = true;
255  }
256  job.readyEvent.notify_all();
257  return; // Terminate this thread.
258  }
259  }
260 
261  if (!job.ready)
262  {
263  // decompress block
264  job.size = _decompressBlock(
265  &job.buffer[0] + MAX_PUTBACK, job.buffer.capacity(),
266  &job.inputBuffer[0], job.compressedSize, compressionCtx);
267 
268  // signal that job is ready
269  {
271  job.ready = true;
272  }
273  job.readyEvent.notify_all();
274  }
275  }
276  }
277  };
278 
279  std::vector<std::thread> pool; // pool of worker threads
280  TBuffer putbackBuffer;
281 
282 public:
283 
284  basic_bgzf_istreambuf(istream_reference istream_,
285  size_t numThreads = bgzf_thread_count,
286  size_t jobsPerThread = 8) :
287  serializer(istream_),
288  numThreads(numThreads),
289  numJobs(numThreads * jobsPerThread),
290  runningQueue(numJobs),
291  todoQueue(numJobs),
292  runningQueueManager(detail::reader_count{1}, detail::writer_count{numThreads}, runningQueue),
293  todoQueueManager(detail::reader_count{numThreads}, detail::writer_count{1}, todoQueue),
294  putbackBuffer(MAX_PUTBACK)
295  {
296  jobs.resize(numJobs);
297  currentJobId = -1;
298 
299  // Prepare todo queue.
300  for (size_t i = 0; i < numJobs; ++i)
301  {
302  [[maybe_unused]] queue_op_status status = todoQueue.try_push(i);
303  assert(status == queue_op_status::success);
304  }
305 
306  // Start off the threads.
307  for (size_t i = 0; i < numThreads; ++i)
308  pool.emplace_back(DecompressionThread{this, CompressionContext<detail::bgzf_compression>{}});
309  }
310 
311  ~basic_bgzf_istreambuf()
312  {
313  // Signal todoQueue that no more work is coming and close todo queue.
314  todoQueueManager.writer_arrive();
315 
316  // Wait for threads to finish there active work.
317  for (auto & t : pool)
318  {
319  if (t.joinable())
320  t.join();
321  }
322 
323  // Signal running queue that reader is done.
324  runningQueueManager.reader_arrive();
325  }
326 
327  int_type underflow()
328  {
329  // no need to use the next buffer?
330  if (this->gptr() && this->gptr() < this->egptr())
331  return traits_type::to_int_type(*this->gptr());
332 
333  size_t putback = this->gptr() - this->eback();
334  if (putback > MAX_PUTBACK)
335  putback = MAX_PUTBACK;
336 
337  // save at most MAX_PUTBACK characters from previous page to putback buffer
338  if (putback != 0)
339  std::copy(
340  this->gptr() - putback,
341  this->gptr(),
342  &putbackBuffer[0]);
343 
344  if (currentJobId >= 0)
345  todoQueue.wait_push(currentJobId);
346  // appendValue(todoQueue, currentJobId);
347 
348  while (true)
349  {
350  if (runningQueue.wait_pop(currentJobId) == queue_op_status::closed)
351  {
352  currentJobId = -1;
353  assert(serializer.error != NULL);
354  if (serializer.error != NULL)
355  throw *serializer.error;
356  return EOF;
357  }
358 
359  DecompressionJob &job = jobs[currentJobId];
360 
361  // restore putback buffer
362  this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
363  if (putback != 0)
364  std::copy(
365  &putbackBuffer[0],
366  &putbackBuffer[0] + putback,
367  &job.buffer[0] + (MAX_PUTBACK - putback));
368 
369  // wait for the end of decompression
370  {
372  job.readyEvent.wait(lock, [&job]{return job.ready;});
373  }
374 
375  size_t size = (job.size != -1)? job.size : 0;
376 
377  // reset buffer pointers
378  this->setg(
379  &job.buffer[0] + (MAX_PUTBACK - putback), // beginning of putback area
380  &job.buffer[0] + MAX_PUTBACK, // read position
381  &job.buffer[0] + (MAX_PUTBACK + size)); // end of buffer
382 
383  // The end of the bgzf file is reached, either if there was an error, or if the
384  // end-of-file marker was reached, while the uncompressed block had zero size.
385  if (job.size == -1 || (job.size == 0 && job.bgzfEofMarker))
386  return EOF;
387  else if (job.size > 0)
388  return traits_type::to_int_type(*this->gptr()); // return next character
389 
390  throw io_error("BGZF: Invalid end condition in decompression. "
391  "Most likely due to an empty bgzf block without end-of-file marker.");
392  }
393  }
394 
395  pos_type seekoff(off_type ofs, std::ios_base::seekdir dir, std::ios_base::openmode openMode)
396  {
397  if ((openMode & (std::ios_base::in | std::ios_base::out)) == std::ios_base::in)
398  {
399  if (dir == std::ios_base::cur && ofs >= 0)
400  {
401  // forward delta seek
402  while (currentJobId < 0 || this->egptr() - this->gptr() < ofs)
403  {
404  ofs -= this->egptr() - this->gptr();
405  if (this->underflow() == static_cast<int_type>(EOF))
406  break;
407  }
408 
409  if (currentJobId >= 0 && ofs <= this->egptr() - this->gptr())
410  {
411  DecompressionJob &job = jobs[currentJobId];
412 
413  // reset buffer pointers
414  this->setg(
415  this->eback(), // beginning of putback area
416  this->gptr() + ofs, // read position
417  this->egptr()); // end of buffer
418 
419  if (this->gptr() != this->egptr())
420  return pos_type((job.fileOfs << 16) + ((this->gptr() - &job.buffer[MAX_PUTBACK])));
421  else
422  return pos_type((job.fileOfs + job.compressedSize) << 16);
423  }
424 
425  }
426  else if (dir == std::ios_base::beg)
427  {
428  // random seek
429  std::streampos destFileOfs = ofs >> 16;
430 
431  // are we in the same block?
432  if (currentJobId >= 0 && jobs[currentJobId].fileOfs == (off_type)destFileOfs)
433  {
434  DecompressionJob &job = jobs[currentJobId];
435 
436  // reset buffer pointers
437  this->setg(
438  this->eback(), // beginning of putback area
439  &job.buffer[0] + (MAX_PUTBACK + (ofs & 0xffff)), // read position
440  this->egptr()); // end of buffer
441  return ofs;
442  }
443 
444  // ok, different block
445  {
446  std::lock_guard<std::mutex> scopedLock(serializer.lock);
447 
448  // remove all running jobs and put them in the idle queue unless we
449  // find our seek target
450 
451  if (currentJobId >= 0)
452  todoQueue.wait_push(currentJobId);
453 
454  // Note that if we are here the current job does not represent the sought block.
455  // Hence if the running queue is empty we need to explicitly unset the jobId,
456  // otherwise we would not update the serializers istream pointer to the correct position.
457  if (runningQueue.is_empty())
458  currentJobId = -1;
459 
460  // empty is thread-safe in serializer.lock
461  while (!runningQueue.is_empty())
462  {
463  runningQueue.wait_pop(currentJobId);
464 
465  if (jobs[currentJobId].fileOfs == (off_type)destFileOfs)
466  break;
467 
468  // push back useless job
469  todoQueue.wait_push(currentJobId);
470  currentJobId = -1;
471  }
472 
473  if (currentJobId == -1)
474  {
475  assert(runningQueue.is_empty());
476  serializer.istream.clear(serializer.istream.rdstate() & ~std::ios_base::eofbit);
477  if (serializer.istream.rdbuf()->pubseekpos(destFileOfs, std::ios_base::in) == destFileOfs)
478  serializer.fileOfs = destFileOfs;
479  else
480  currentJobId = -2; // temporarily signals a seek error
481  }
482  }
483 
484  // if our block wasn't in the running queue yet, it should now
485  // be the first that falls out after modifying serializer.fileOfs
486  if (currentJobId == -1)
487  runningQueue.wait_pop(currentJobId);
488  else if (currentJobId == -2)
489  currentJobId = -1;
490 
491  if (currentJobId >= 0)
492  {
493  // wait for the end of decompression
494  DecompressionJob &job = jobs[currentJobId];
495 
496  {
498  job.readyEvent.wait(lock, [&job]{return job.ready;});
499  }
500 
501  assert(job.fileOfs == (off_type)destFileOfs);
502 
503  // reset buffer pointers
504  this->setg(
505  &job.buffer[0] + MAX_PUTBACK, // no putback area
506  &job.buffer[0] + (MAX_PUTBACK + (ofs & 0xffff)), // read position
507  &job.buffer[0] + (MAX_PUTBACK + job.size)); // end of buffer
508  return ofs;
509  }
510  }
511  }
512  return pos_type(off_type(-1));
513  }
514 
515  pos_type seekpos(pos_type pos, std::ios_base::openmode openMode)
516  {
517  return seekoff(off_type(pos), std::ios_base::beg, openMode);
518  }
519 
520  // returns the compressed input istream
521  istream_reference get_istream() { return serializer.istream; };
522 };
523 
524 // --------------------------------------------------------------------------
525 // Class basic_bgzf_istreambase
526 // --------------------------------------------------------------------------
527 
528 template<
529  typename Elem,
530  typename Tr = std::char_traits<Elem>,
531  typename ElemA = std::allocator<Elem>,
532  typename ByteT = char,
533  typename ByteAT = std::allocator<ByteT>
534 >
535 class basic_bgzf_istreambase : virtual public std::basic_ios<Elem,Tr>
536 {
537 public:
538  typedef std::basic_istream<Elem, Tr>& istream_reference;
539  typedef basic_bgzf_istreambuf<Elem, Tr, ElemA, ByteT, ByteAT> decompression_bgzf_streambuf_type;
540 
541  basic_bgzf_istreambase(istream_reference istream_)
542  : m_buf(istream_)
543  {
544  this->init(&m_buf);
545  };
546 
547  // returns the underlying decompression bgzf istream object
548  decompression_bgzf_streambuf_type* rdbuf() { return &m_buf; };
549 
550  // returns the bgzf error state
551  int get_zerr() const { return m_buf.get_zerr(); };
552  // returns the uncompressed data crc
553  long get_crc() const { return m_buf.get_crc(); };
554  // returns the uncompressed data size
555  long get_out_size() const { return m_buf.get_out_size(); };
556  // returns the compressed data size
557  long get_in_size() const { return m_buf.get_in_size(); };
558 
559 private:
560  decompression_bgzf_streambuf_type m_buf;
561 };
562 
563 // --------------------------------------------------------------------------
564 // Class basic_bgzf_istream
565 // --------------------------------------------------------------------------
566 
567 template<
568  typename Elem,
569  typename Tr = std::char_traits<Elem>,
570  typename ElemA = std::allocator<Elem>,
571  typename ByteT = char,
572  typename ByteAT = std::allocator<ByteT>
573 >
574 class basic_bgzf_istream :
575  public basic_bgzf_istreambase<Elem,Tr,ElemA,ByteT,ByteAT>,
576  public std::basic_istream<Elem,Tr>
577 {
578 public:
579  typedef basic_bgzf_istreambase<Elem,Tr,ElemA,ByteT,ByteAT> bgzf_istreambase_type;
580  typedef std::basic_istream<Elem,Tr> istream_type;
581  typedef istream_type & istream_reference;
582  typedef char byte_type;
583 
584  basic_bgzf_istream(istream_reference istream_) :
585  bgzf_istreambase_type(istream_),
586  istream_type(bgzf_istreambase_type::rdbuf()),
587  m_is_gzip(false),
588  m_gbgzf_data_size(0)
589  {};
590 
591  // returns true if it is a gzip file
592  bool is_gzip() const { return m_is_gzip; };
593  // return data size check
594  bool check_data_size() const { return this->get_out_size() == m_gbgzf_data_size; };
595 
596  // return the data size in the file
597  long get_gbgzf_data_size() const { return m_gbgzf_data_size; };
598 
599 protected:
600  static void read_long(istream_reference in_, unsigned long& x_);
601 
602  int check_header();
603  bool m_is_gzip;
604  unsigned long m_gbgzf_data_size;
605 
606 #ifdef _WIN32
607 private:
608  void _Add_vtordisp1() { } // Required to avoid VC++ warning C4250
609  void _Add_vtordisp2() { } // Required to avoid VC++ warning C4250
610 #endif
611 };
612 
613 // ===========================================================================
614 // Typedefs
615 // ===========================================================================
616 
617 // A typedef for basic_bgzf_istream<char>
618 typedef basic_bgzf_istream<char> bgzf_istream;
619 // A typedef for basic_bgzf_istream<wchart>
620 typedef basic_bgzf_istream<wchar_t> bgzf_wistream;
621 
622 } // namespace seqan3::conrib
std::vector::resize
T resize(T... args)
std::lock
T lock(T... args)
std::basic_ios::rdbuf
T rdbuf(T... args)
reader_writer_manager.hpp
Provides seqan3::detail::reader_writer_manager.
std::span
cstring
buffer_queue.hpp
Provides seqan3::buffer_queue.
std::vector
bgzf_stream_util.hpp
Provides stream compression utilities.
std::lock_guard
std::basic_streambuf
std::char_traits
std::streampos
std::unique_lock
exception.hpp
Provides exceptions used in the I/O module.
std::copy
T copy(T... args)
std::experimental::filesystem::status
T status(T... args)
seqan3::pack_traits::size
constexpr size_t size
The size of a type pack.
Definition: traits.hpp:116
std::basic_ios::init
T init(T... args)
std::vector::emplace_back
T emplace_back(T... args)
std
SeqAn specific customisations in the standard namespace.
condition_variable
std::allocator
mutex
std::memcmp
T memcmp(T... args)
std::basic_istream< Elem, Tr >
std::basic_ios