SeqAn3  3.0.1
The Modern C++ library for sequence analysis.
async_input_buffer.hpp
Go to the documentation of this file.
1 // -----------------------------------------------------------------------------------------------------
2 // Copyright (c) 2006-2020, Knut Reinert & Freie Universität Berlin
3 // Copyright (c) 2016-2020, Knut Reinert & MPI für molekulare Genetik
4 // This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License
5 // shipped with this file and also available at: https://github.com/seqan/seqan3/blob/master/LICENSE.md
6 // -----------------------------------------------------------------------------------------------------
7 
13 #pragma once
14 
15 #include <thread>
16 
21 #include <seqan3/std/concepts>
22 #include <seqan3/std/iterator>
23 #include <seqan3/std/ranges>
24 
25 //-----------------------------------------------------------------------------
26 // This is the path a value takes when using this view:
27 // urange
28 // → async_input_buffer_view.buffer [size n]
29 // → async_input_buffer_iterator.cached_value [size 1]
30 // → user
31 //-----------------------------------------------------------------------------
32 
33 namespace seqan3::detail
34 {
35 
41 template <std::ranges::range urng_t>
42 class async_input_buffer_view : public std::ranges::view_interface<async_input_buffer_view<urng_t>>
43 {
44 private:
45  static_assert(std::ranges::input_range<urng_t>,
46  "The range parameter to async_input_buffer_view must be at least an std::ranges::InputRange.");
47  static_assert(std::ranges::view<urng_t>,
48  "The range parameter to async_input_buffer_view must model std::ranges::View.");
49  static_assert(std::movable<value_type_t<urng_t>>,
50  "The range parameter to async_input_buffer_view must have a value_type that is std::Movable.");
51  static_assert(std::constructible_from<value_type_t<urng_t>, std::remove_reference_t<reference_t<urng_t>> &&>,
52  "The range parameter to async_input_buffer_view must have a value_type that is constructible by a moved "
53  "value of its reference type.");
54 
56  using urng_iterator_type = std::ranges::iterator_t<urng_t>;
57 
59  struct state
60  {
62  urng_t urange;
63 
65  contrib::fixed_buffer_queue<value_type_t<urng_t>> buffer;
66 
68  std::thread producer;
69  };
70 
72  std::shared_ptr<state> state_ptr = nullptr;
73 
75  class async_input_buffer_iterator;
76 
77 public:
81  async_input_buffer_view() = default;
82  async_input_buffer_view(async_input_buffer_view const &) = default;
83  async_input_buffer_view(async_input_buffer_view &&) = default;
84  async_input_buffer_view & operator=(async_input_buffer_view const &) = default;
85  async_input_buffer_view & operator=(async_input_buffer_view &&) = default;
86  ~async_input_buffer_view() = default;
87 
89  async_input_buffer_view(urng_t _urng, size_t const buffer_size)
90  {
91  auto deleter = [] (state * p)
92  {
93  if (p != nullptr)
94  {
95  p->buffer.close();
96  p->producer.join();
97  delete p;
98  }
99  };
100 
101  state_ptr = std::shared_ptr<state>(new state{std::move(_urng),
102  contrib::fixed_buffer_queue<value_type_t<urng_t>>{buffer_size},
103  std::thread{}}, // thread is set/started below, needs rest of state
104  deleter);
105 
106  auto runner = [&state = *state_ptr] ()
107  {
108  for (auto && val : state.urange)
109  if (state.buffer.wait_push(std::move(val)) == contrib::queue_op_status::closed)
110  break;
111 
112  state.buffer.close();
113  };
114 
115  state_ptr->producer = std::thread{runner};
116  }
117 
119  template <typename other_urng_t>
121  requires !std::same_as<remove_cvref_t<other_urng_t>, async_input_buffer_view> && // prevent recursive instantiation
122  std::ranges::viewable_range<other_urng_t> &&
125  async_input_buffer_view(other_urng_t && _urng, size_t const buffer_size) :
126  async_input_buffer_view{std::views::all(_urng), buffer_size}
127  {}
129 
144  async_input_buffer_iterator begin()
145  {
146  assert(state_ptr != nullptr);
147  return {state_ptr->buffer};
148  }
149 
151  async_input_buffer_iterator begin() const = delete;
152 
154  async_input_buffer_iterator cbegin() const = delete;
155 
157  std::ranges::default_sentinel_t end()
158  {
159  return std::ranges::default_sentinel;
160  }
161 
163  std::ranges::default_sentinel_t end() const = delete;
164 
166  std::ranges::default_sentinel_t cend() const = delete;
168 };
169 
171 template <typename urng_t>
172 class async_input_buffer_view<urng_t>::async_input_buffer_iterator
173 {
175  using sentinel_type = std::ranges::default_sentinel_t;
176 
178  contrib::fixed_buffer_queue<value_type_t<urng_t>> * buffer_ptr = nullptr;
179 
181  mutable value_type_t<urng_t> cached_value;
182 
184  bool at_end = false;
185 
186 public:
187 
191  using difference_type = difference_type_t<urng_iterator_type>;
194  using value_type = value_type_t<urng_iterator_type>;
196  using pointer = value_type *;
198  using reference = value_type &;
200  using iterator_category = void;
202  using iterator_concept = std::input_iterator_tag;
204 
209  async_input_buffer_iterator() = default;
210  //TODO: delete:
211  async_input_buffer_iterator(async_input_buffer_iterator const & rhs) = default;
212  async_input_buffer_iterator(async_input_buffer_iterator && rhs) = default;
213  //TODO: delete:
214  async_input_buffer_iterator & operator=(async_input_buffer_iterator const & rhs) = default;
215  async_input_buffer_iterator & operator=(async_input_buffer_iterator && rhs) = default;
216  ~async_input_buffer_iterator() noexcept = default;
217 
219  async_input_buffer_iterator(contrib::fixed_buffer_queue<value_type_t<urng_t>> & buffer) noexcept :
220  buffer_ptr{&buffer}
221  {
222  ++(*this); // cache first value
223  }
225 
229  reference operator*() const noexcept
231  {
232  return cached_value;
233  }
234 
236  pointer operator->() const noexcept
237  {
238  return std::addressof(cached_value);
239  }
241 
245  async_input_buffer_iterator & operator++() noexcept
247  {
248  if (at_end) // TODO unlikely
249  return *this;
250 
251  assert(buffer_ptr != nullptr);
252 
253  if (buffer_ptr->wait_pop(cached_value) == contrib::queue_op_status::closed)
254  at_end = true;
255 
256  return *this;
257  }
258 
260  void operator++(int) noexcept
261  {
262  ++(*this);
263  }
265 
269  friend constexpr bool operator==(async_input_buffer_iterator const & lhs,
271  std::ranges::default_sentinel_t const &) noexcept
272  {
273  return lhs.at_end;
274  }
275 
277  friend constexpr bool operator==(std::ranges::default_sentinel_t const &,
278  async_input_buffer_iterator const & rhs) noexcept
279  {
280  return rhs == std::ranges::default_sentinel_t{};
281  }
282 
284  friend constexpr bool operator!=(async_input_buffer_iterator const & lhs,
285  std::ranges::default_sentinel_t const &) noexcept
286  {
287  return !(lhs == std::ranges::default_sentinel_t{});
288  }
289 
291  friend constexpr bool operator!=(std::ranges::default_sentinel_t const &,
292  async_input_buffer_iterator const & rhs) noexcept
293  {
294  return rhs != std::ranges::default_sentinel_t{};
295  }
297 };
298 
304 template <std::ranges::viewable_range urng_t>
306 async_input_buffer_view(urng_t &&, size_t const buffer_size) -> async_input_buffer_view<std::ranges::all_view<urng_t>>;
308 
309 // ============================================================================
310 // async_input_buffer_fn (adaptor definition)
311 // ============================================================================
312 
314 struct async_input_buffer_fn
315 {
317  constexpr auto operator()(size_t const buffer_size) const
318  {
319  return detail::adaptor_from_functor{*this, buffer_size};
320  }
321 
327  template <std::ranges::range urng_t>
328  constexpr auto operator()(urng_t && urange, size_t const buffer_size) const
329  {
330  static_assert(std::ranges::input_range<urng_t>,
331  "The range parameter to views::async_input_buffer must be at least an std::ranges::InputRange.");
332  static_assert(std::ranges::viewable_range<urng_t>,
333  "The range parameter to views::async_input_buffer cannot be a temporary of a non-view range.");
334  static_assert(std::movable<value_type_t<urng_t>>,
335  "The range parameter to views::async_input_buffer must have a value_type that is std::Movable.");
336  static_assert(std::constructible_from<value_type_t<urng_t>, std::remove_reference_t<reference_t<urng_t>> &&>,
337  "The range parameter to views::async_input_buffer must have a value_type that is constructible by a moved "
338  "value of its reference type.");
339 
340  if (buffer_size == 0)
341  throw std::invalid_argument{"The buffer_size parameter to views::async_input_buffer must be > 0."};
342 
343  return detail::async_input_buffer_view{std::forward<urng_t>(urange), buffer_size};
344  }
345 };
346 
347 } // seqan3::detail
348 
349 //-----------------------------------------------------------------------------
350 // View shortcut for functor.
351 //-----------------------------------------------------------------------------
352 
353 namespace seqan3::views
354 {
495 inline constexpr auto async_input_buffer = detail::async_input_buffer_fn{};
496 
498 } // namespace seqan3::views
shortcuts.hpp
Provides various shortcuts for common std::ranges functions.
seqan3::views
The SeqAn namespace for views.
Definition: view_to_simd.hpp:672
std::shared_ptr
constructible_from
The std::constructible_from concept specifies that a variable of type T can be initialized with the g...
all.hpp
Provides various type traits.
buffer_queue.hpp
Provides seqan3::buffer_queue.
std::rel_ops::operator!=
T operator!=(T... args)
movable
Subsumes std::Object, std::move_constructible, std::swappable bool and requires that the type be std:...
iterator
Provides C++20 additions to the <iterator> header.
std::input_iterator_tag
seqan3::views::move
const auto move
A view that turns lvalue-references into rvalue-references.
Definition: move.hpp:68
concepts
The Concepts library.
same_as
The concept std::same_as<T, U> is satisfied if and only if T and U denote the same type.
std::addressof
T addressof(T... args)
thread
seqan3::value_type_t
typename value_type< t >::type value_type_t
Shortcut for seqan3::value_type (transformation_trait shortcut).
Definition: pre.hpp:48
seqan3::views::async_input_buffer
constexpr auto async_input_buffer
A view adapter that returns a concurrent-queue-like view over the underlying range.
Definition: async_input_buffer.hpp:495
std::invalid_argument
seqan3::search_cfg::all
constexpr detail::search_mode_all all
Configuration element to receive all hits within the error bounds.
Definition: mode.hpp:43
ranges
Adaptations of concepts from the Ranges TS.
std::remove_reference_t
std::begin
T begin(T... args)
std::end
T end(T... args)
detail.hpp
Auxiliary header for the views submodule.