SeqAn3  3.0.3
The Modern C++ library for sequence analysis.
async_input_buffer.hpp
Go to the documentation of this file.
1 // -----------------------------------------------------------------------------------------------------
2 // Copyright (c) 2006-2021, Knut Reinert & Freie Universität Berlin
3 // Copyright (c) 2016-2021, Knut Reinert & MPI für molekulare Genetik
4 // This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License
5 // shipped with this file and also available at: https://github.com/seqan/seqan3/blob/master/LICENSE.md
6 // -----------------------------------------------------------------------------------------------------
7 
13 #pragma once
14 
15 #include <seqan3/std/concepts>
16 #include <seqan3/std/iterator>
17 #include <seqan3/std/ranges>
18 #include <thread>
19 
22 
23 //-----------------------------------------------------------------------------
24 // This is the path a value takes when using this views::
25 // urange
26 // → async_input_buffer_view.buffer [size n]
27 // → iterator.cached_value [size 1]
28 // → user
29 //-----------------------------------------------------------------------------
30 
31 namespace seqan3::detail
32 {
33 
39 template <std::ranges::range urng_t>
40 class async_input_buffer_view : public std::ranges::view_interface<async_input_buffer_view<urng_t>>
41 {
42 private:
43  static_assert(std::ranges::input_range<urng_t>,
44  "The range parameter to async_input_buffer_view must be at least an std::ranges::input_range.");
45  static_assert(std::ranges::view<urng_t>,
46  "The range parameter to async_input_buffer_view must model std::ranges::view.");
47  static_assert(std::movable<std::ranges::range_value_t<urng_t>>,
48  "The range parameter to async_input_buffer_view must have a value_type that is std::movable.");
49  static_assert(std::constructible_from<std::ranges::range_value_t<urng_t>,
50  std::remove_reference_t<std::ranges::range_reference_t<urng_t>> &&>,
51  "The range parameter to async_input_buffer_view must have a value_type that is constructible by a moved "
52  "value of its reference type.");
53 
55  using urng_iterator_type = std::ranges::iterator_t<urng_t>;
56 
58  struct state
59  {
61  urng_t urange;
62 
64  contrib::fixed_buffer_queue<std::ranges::range_value_t<urng_t>> buffer;
65 
67  std::thread producer;
68  };
69 
71  std::shared_ptr<state> state_ptr = nullptr;
72 
74  class iterator;
75 
76 public:
80  async_input_buffer_view() = default;
81  async_input_buffer_view(async_input_buffer_view const &) = default;
82  async_input_buffer_view(async_input_buffer_view &&) = default;
83  async_input_buffer_view & operator=(async_input_buffer_view const &) = default;
84  async_input_buffer_view & operator=(async_input_buffer_view &&) = default;
85  ~async_input_buffer_view() = default;
86 
88  async_input_buffer_view(urng_t _urng, size_t const buffer_size)
89  {
90  auto deleter = [] (state * p)
91  {
92  if (p != nullptr)
93  {
94  p->buffer.close();
95  p->producer.join();
96  delete p;
97  }
98  };
99 
100  state_ptr = std::shared_ptr<state>(new state{std::move(_urng),
101  contrib::fixed_buffer_queue<std::ranges::range_value_t<urng_t>>{buffer_size},
102  std::thread{}}, // thread is set/started below, needs rest of state
103  deleter);
104 
105  auto runner = [&state = *state_ptr] ()
106  {
107  for (auto && val : state.urange)
108  if (state.buffer.wait_push(std::move(val)) == contrib::queue_op_status::closed)
109  break;
110 
111  state.buffer.close();
112  };
113 
114  state_ptr->producer = std::thread{runner};
115  }
116 
118  template <typename other_urng_t>
120  requires (!std::same_as<std::remove_cvref_t<other_urng_t>, async_input_buffer_view>) && // prevent recursive instantiation
121  std::ranges::viewable_range<other_urng_t> &&
122  std::constructible_from<urng_t, std::ranges::ref_view<std::remove_reference_t<other_urng_t>>>
124  async_input_buffer_view(other_urng_t && _urng, size_t const buffer_size) :
125  async_input_buffer_view{std::views::all(_urng), buffer_size}
126  {}
128 
143  iterator begin()
144  {
145  assert(state_ptr != nullptr);
146  return {state_ptr->buffer};
147  }
148 
150  iterator begin() const = delete;
151 
153  std::default_sentinel_t end()
154  {
155  return std::default_sentinel;
156  }
157 
159  std::default_sentinel_t end() const = delete;
161 };
162 
164 template <typename urng_t>
165 class async_input_buffer_view<urng_t>::iterator
166 {
168  using sentinel_type = std::default_sentinel_t;
169 
171  contrib::fixed_buffer_queue<std::ranges::range_value_t<urng_t>> * buffer_ptr = nullptr;
172 
174  mutable std::ranges::range_value_t<urng_t> cached_value;
175 
177  bool at_end = false;
178 
179 public:
180 
185  using difference_type = std::iter_difference_t<urng_iterator_type>;
187  using value_type = std::iter_value_t<urng_iterator_type>;
189  using pointer = detail::iter_pointer_t<urng_iterator_type>;
193  using iterator_category = std::input_iterator_tag;
195  using iterator_concept = iterator_category;
197 
202  iterator() = default;
203  //TODO: delete:
204  iterator(iterator const & rhs) = default;
205  iterator(iterator && rhs) = default;
206  //TODO: delete:
207  iterator & operator=(iterator const & rhs) = default;
208  iterator & operator=(iterator && rhs) = default;
209  ~iterator() noexcept = default;
210 
212  iterator(contrib::fixed_buffer_queue<std::ranges::range_value_t<urng_t>> & buffer) noexcept : buffer_ptr{&buffer}
213  {
214  ++(*this); // cache first value
215  }
217 
222  reference operator*() const noexcept
223  {
224  return cached_value;
225  }
226 
228  pointer operator->() const noexcept
229  {
230  return std::addressof(cached_value);
231  }
233 
238  iterator & operator++() noexcept
239  {
240  if (at_end) // TODO unlikely
241  return *this;
242 
243  assert(buffer_ptr != nullptr);
244 
245  if (buffer_ptr->wait_pop(cached_value) == contrib::queue_op_status::closed)
246  at_end = true;
247 
248  return *this;
249  }
250 
252  void operator++(int) noexcept
253  {
254  ++(*this);
255  }
257 
262  friend constexpr bool operator==(iterator const & lhs, std::default_sentinel_t const &) noexcept
263  {
264  return lhs.at_end;
265  }
266 
268  friend constexpr bool operator==(std::default_sentinel_t const &, iterator const & rhs) noexcept
269  {
270  return rhs == std::default_sentinel_t{};
271  }
272 
274  friend constexpr bool operator!=(iterator const & lhs, std::default_sentinel_t const &) noexcept
275  {
276  return !(lhs == std::default_sentinel_t{});
277  }
278 
280  friend constexpr bool operator!=(std::default_sentinel_t const &, iterator const & rhs) noexcept
281  {
282  return rhs != std::default_sentinel_t{};
283  }
285 };
286 
293 template <std::ranges::viewable_range urng_t>
294 async_input_buffer_view(urng_t &&, size_t const buffer_size) -> async_input_buffer_view<std::views::all_t<urng_t>>;
296 
297 // ============================================================================
298 // async_input_buffer_fn (adaptor definition
299 // ============================================================================
300 
302 struct async_input_buffer_fn
303 {
305  constexpr auto operator()(size_t const buffer_size) const
306  {
307  return detail::adaptor_from_functor{*this, buffer_size};
308  }
309 
315  template <std::ranges::range urng_t>
316  constexpr auto operator()(urng_t && urange, size_t const buffer_size) const
317  {
318  static_assert(std::ranges::input_range<urng_t>,
319  "The range parameter to views::async_input_buffer must be at least an std::ranges::input_range.");
320  static_assert(std::ranges::viewable_range<urng_t>,
321  "The range parameter to views::async_input_buffer cannot be a temporary of a non-view range.");
322  static_assert(std::movable<std::ranges::range_value_t<urng_t>>,
323  "The range parameter to views::async_input_buffer must have a value_type that is std::movable.");
324  static_assert(std::constructible_from<std::ranges::range_value_t<urng_t>,
325  std::remove_reference_t<std::ranges::range_reference_t<urng_t>> &&>,
326  "The range parameter to views::async_input_buffer must have a value_type that is constructible by a moved "
327  "value of its reference type.");
328 
329  if (buffer_size == 0)
330  throw std::invalid_argument{"The buffer_size parameter to views::async_input_buffer must be > 0."};
331 
332  return detail::async_input_buffer_view{std::forward<urng_t>(urange), buffer_size};
333  }
334 };
335 
336 } // seqan3::detail
337 
338 //-----------------------------------------------------------------------------
339 // View shortcut for functor.
340 //-----------------------------------------------------------------------------
341 
342 namespace seqan3::views
343 {
483 inline constexpr auto async_input_buffer = detail::async_input_buffer_fn{};
484 
486 } // namespace seqan3::views
Provides seqan3::detail::adaptor_from_functor.
T addressof(T... args)
T begin(T... args)
Provides seqan3::buffer_queue.
The Concepts library.
T end(T... args)
constexpr auto async_input_buffer
A view adapter that returns a concurrent-queue-like view over the underlying range.
Definition: async_input_buffer.hpp:483
auto const move
A view that turns lvalue-references into rvalue-references.
Definition: move.hpp:74
Provides C++20 additions to the <iterator> header.
The SeqAn namespace for views.
Definition: char_to.hpp:22
SeqAn specific customisations in the standard namespace.
T operator!=(T... args)
Adaptations of concepts from the Ranges TS.