SeqAn3 3.4.0-rc.1
The Modern C++ library for sequence analysis.
Loading...
Searching...
No Matches
async_input_buffer.hpp
Go to the documentation of this file.
1// SPDX-FileCopyrightText: 2006-2024 Knut Reinert & Freie Universität Berlin
2// SPDX-FileCopyrightText: 2016-2024 Knut Reinert & MPI für molekulare Genetik
3// SPDX-License-Identifier: BSD-3-Clause
4
10#pragma once
11
12#include <concepts>
13#include <iterator>
14#include <memory>
15#include <ranges>
16#include <thread>
17
20
21//-----------------------------------------------------------------------------
22// This is the path a value takes when using this views::
23// urange
24// → async_input_buffer_view.buffer [size n]
25// → iterator.cached_value [size 1]
26// → user
27//-----------------------------------------------------------------------------
28
29namespace seqan3::detail
30{
31
37template <std::ranges::range urng_t>
38class async_input_buffer_view : public std::ranges::view_interface<async_input_buffer_view<urng_t>>
39{
40private:
41 static_assert(std::ranges::input_range<urng_t>,
42 "The range parameter to async_input_buffer_view must be at least a std::ranges::input_range.");
43 static_assert(std::ranges::view<urng_t>,
44 "The range parameter to async_input_buffer_view must model std::ranges::view.");
45 static_assert(std::movable<std::ranges::range_value_t<urng_t>>,
46 "The range parameter to async_input_buffer_view must have a value_type that is std::movable.");
47 static_assert(
48 std::constructible_from<std::ranges::range_value_t<urng_t>,
50 "The range parameter to async_input_buffer_view must have a value_type that is constructible by a moved "
51 "value of its reference type.");
52
54 using urng_iterator_type = std::ranges::iterator_t<urng_t>;
55
57 struct state
58 {
60 urng_t urange;
61
63 contrib::fixed_buffer_queue<std::ranges::range_value_t<urng_t>> buffer;
64
67 };
68
71
73 class iterator;
74
75public:
85
87 async_input_buffer_view(urng_t _urng, size_t const buffer_size)
88 {
89 auto deleter = [](state * p)
90 {
91 if (p != nullptr)
92 {
93 p->buffer.close();
94 p->producer.join();
95 delete p;
96 }
97 };
98
100 new state{std::move(_urng),
101 contrib::fixed_buffer_queue<std::ranges::range_value_t<urng_t>>{buffer_size},
102 std::thread{}}, // thread is set/started below, needs rest of state
103 deleter);
104
105 auto runner = [&state = *state_ptr]()
106 {
107 for (auto && val : state.urange)
108 if (state.buffer.wait_push(std::move(val)) == contrib::queue_op_status::closed)
109 break;
110
111 state.buffer.close();
112 };
113
114 state_ptr->producer = std::thread{runner};
115 }
116
118 template <typename other_urng_t>
119 requires (!std::same_as<std::remove_cvref_t<other_urng_t>, async_input_buffer_view>)
120 && // prevent recursive instantiation
121 std::ranges::viewable_range<other_urng_t>
122 && std::constructible_from<urng_t, std::ranges::ref_view<std::remove_reference_t<other_urng_t>>>
123 async_input_buffer_view(other_urng_t && _urng, size_t const buffer_size) :
124 async_input_buffer_view{std::views::all(_urng), buffer_size}
125 {}
127
143 {
144 assert(state_ptr != nullptr);
145 return {state_ptr->buffer};
146 }
147
149 iterator begin() const = delete;
150
152 std::default_sentinel_t end()
153 {
154 return std::default_sentinel;
155 }
156
158 std::default_sentinel_t end() const = delete;
160};
161
163template <std::ranges::range urng_t>
165{
167 using sentinel_type = std::default_sentinel_t;
168
170 contrib::fixed_buffer_queue<std::ranges::range_value_t<urng_t>> * buffer_ptr = nullptr;
171
173 mutable std::ranges::range_value_t<urng_t> cached_value;
174
176 bool at_end = false;
177
178public:
195
200 iterator() = default;
201 //TODO: delete:
202 iterator(iterator const & rhs) = default;
203 iterator(iterator && rhs) = default;
204 //TODO: delete:
205 iterator & operator=(iterator const & rhs) = default;
206 iterator & operator=(iterator && rhs) = default;
207 ~iterator() noexcept = default;
208
210 iterator(contrib::fixed_buffer_queue<std::ranges::range_value_t<urng_t>> & buffer) noexcept : buffer_ptr{&buffer}
211 {
212 ++(*this); // cache first value
213 }
215
220 reference operator*() const noexcept
221 {
222 return cached_value;
223 }
224
226 pointer operator->() const noexcept
227 {
228 return std::addressof(cached_value);
229 }
231
236 iterator & operator++() noexcept
237 {
238 if (at_end) // TODO unlikely
239 return *this;
240
241 assert(buffer_ptr != nullptr);
242
243 if (buffer_ptr->wait_pop(cached_value) == contrib::queue_op_status::closed)
244 at_end = true;
245
246 return *this;
247 }
248
250 void operator++(int) noexcept
251 {
252 ++(*this);
253 }
255
260 friend constexpr bool operator==(iterator const & lhs, std::default_sentinel_t const &) noexcept
261 {
262 return lhs.at_end;
263 }
264
266 friend constexpr bool operator==(std::default_sentinel_t const &, iterator const & rhs) noexcept
267 {
268 return rhs == std::default_sentinel_t{};
269 }
270
272 friend constexpr bool operator!=(iterator const & lhs, std::default_sentinel_t const &) noexcept
273 {
274 return !(lhs == std::default_sentinel_t{});
275 }
276
278 friend constexpr bool operator!=(std::default_sentinel_t const &, iterator const & rhs) noexcept
279 {
280 return rhs != std::default_sentinel_t{};
281 }
283};
284
291template <std::ranges::viewable_range urng_t>
294
295// ============================================================================
296// async_input_buffer_fn (adaptor definition
297// ============================================================================
298
301{
303 constexpr auto operator()(size_t const buffer_size) const
304 {
305 return detail::adaptor_from_functor{*this, buffer_size};
306 }
307
313 template <std::ranges::range urng_t>
314 constexpr auto operator()(urng_t && urange, size_t const buffer_size) const
315 {
316 static_assert(std::ranges::input_range<urng_t>,
317 "The range parameter to views::async_input_buffer must be at least a std::ranges::input_range.");
318 static_assert(std::ranges::viewable_range<urng_t>,
319 "The range parameter to views::async_input_buffer cannot be a temporary of a non-view range.");
320 static_assert(std::movable<std::ranges::range_value_t<urng_t>>,
321 "The range parameter to views::async_input_buffer must have a value_type that is std::movable.");
322 static_assert(
323 std::constructible_from<std::ranges::range_value_t<urng_t>,
325 "The range parameter to views::async_input_buffer must have a value_type that is constructible by a moved "
326 "value of its reference type.");
327
328 if (buffer_size == 0)
329 throw std::invalid_argument{"The buffer_size parameter to views::async_input_buffer must be > 0."};
330
331 return detail::async_input_buffer_view{std::forward<urng_t>(urange), buffer_size};
332 }
333};
334
335} // namespace seqan3::detail
336
337//-----------------------------------------------------------------------------
338// View shortcut for functor.
339//-----------------------------------------------------------------------------
340
341namespace seqan3::views
342{
479} // namespace seqan3::views
Provides seqan3::detail::adaptor_from_functor.
T addressof(T... args)
Provides seqan3::buffer_queue.
Template for range adaptor closure objects that store arguments and wrap a proto-adaptor.
Definition adaptor_from_functor.hpp:54
The iterator of the seqan3::detail::async_input_buffer_view.
Definition async_input_buffer.hpp:165
iterator(iterator &&rhs)=default
Defaulted.
void operator++(int) noexcept
Post-increment.
Definition async_input_buffer.hpp:250
pointer operator->() const noexcept
Returns pointer to the pointed-to object.
Definition async_input_buffer.hpp:226
iterator & operator=(iterator &&rhs)=default
Defaulted.
friend constexpr bool operator!=(std::default_sentinel_t const &, iterator const &rhs) noexcept
Compares for inequality with sentinel.
Definition async_input_buffer.hpp:278
iterator & operator++() noexcept
Pre-increment.
Definition async_input_buffer.hpp:236
std::default_sentinel_t sentinel_type
The sentinel type to compare to.
Definition async_input_buffer.hpp:167
reference operator*() const noexcept
Return the cached value.
Definition async_input_buffer.hpp:220
std::ranges::range_value_t< urng_t > cached_value
The cached value this iterator holds.
Definition async_input_buffer.hpp:173
iterator & operator=(iterator const &rhs)=default
Defaulted.
detail::iter_pointer_t< urng_iterator_type > pointer
Pointer type.
Definition async_input_buffer.hpp:187
friend constexpr bool operator==(iterator const &lhs, std::default_sentinel_t const &) noexcept
Compares for equality with sentinel.
Definition async_input_buffer.hpp:260
friend constexpr bool operator!=(iterator const &lhs, std::default_sentinel_t const &) noexcept
Compares for inequality with sentinel.
Definition async_input_buffer.hpp:272
iterator(iterator const &rhs)=default
Defaulted.
friend constexpr bool operator==(std::default_sentinel_t const &, iterator const &rhs) noexcept
Compares for equality with sentinel.
Definition async_input_buffer.hpp:266
The type returned by seqan3::views::async_input_buffer.
Definition async_input_buffer.hpp:39
async_input_buffer_view & operator=(async_input_buffer_view &&)=default
Defaulted.
std::ranges::iterator_t< urng_t > urng_iterator_type
The iterator type for the underlying range.
Definition async_input_buffer.hpp:54
std::shared_ptr< state > state_ptr
Shared holder of the state.
Definition async_input_buffer.hpp:70
async_input_buffer_view(urng_t _urng, size_t const buffer_size)
Construction from the underlying view.
Definition async_input_buffer.hpp:87
std::default_sentinel_t end()
Returns a sentinel.
Definition async_input_buffer.hpp:152
async_input_buffer_view & operator=(async_input_buffer_view const &)=default
Defaulted.
async_input_buffer_view(other_urng_t &&_urng, size_t const buffer_size)
Construction from std::ranges::viewable_range.
Definition async_input_buffer.hpp:123
std::default_sentinel_t end() const =delete
Const-qualified async_input_buffer_view::end() is deleted, because iterating changes the view.
iterator begin()
Returns an iterator to the current begin of the underlying range.
Definition async_input_buffer.hpp:142
async_input_buffer_view(urng_t &&, size_t const buffer_size) -> async_input_buffer_view< std::views::all_t< urng_t > >
Deduces the async_input_buffer_view from the underlying range if it is a std::ranges::viewable_range.
~async_input_buffer_view()=default
Defaulted.
iterator begin() const =delete
Const-qualified async_input_buffer_view::begin() is deleted, because iterating changes the view.
async_input_buffer_view()=default
Defaulted.
async_input_buffer_view(async_input_buffer_view &&)=default
Defaulted.
async_input_buffer_view(async_input_buffer_view const &)=default
Defaulted.
typename iter_pointer< it_t >::type iter_pointer_t
Return the pointer type of the input type (transformation_trait shortcut).
Definition iterator_traits.hpp:151
constexpr auto async_input_buffer
A view adapter that returns a concurrent-queue-like view over the underlying range.
Definition async_input_buffer.hpp:478
The internal SeqAn3 namespace.
Definition aligned_sequence_concept.hpp:26
The SeqAn namespace for views.
Definition char_strictly_to.hpp:19
SeqAn specific customisations in the standard namespace.
Definition of the range adaptor object type for seqan3::views::async_input_buffer.
Definition async_input_buffer.hpp:301
constexpr auto operator()(urng_t &&urange, size_t const buffer_size) const
Directly return an instance of the view, initialised with the given parameters.
Definition async_input_buffer.hpp:314
constexpr auto operator()(size_t const buffer_size) const
Store the argument and return a range adaptor closure object.
Definition async_input_buffer.hpp:303
Buffer and thread and shared between copies of this type.
Definition async_input_buffer.hpp:58
contrib::fixed_buffer_queue< std::ranges::range_value_t< urng_t > > buffer
The buffer queue.
Definition async_input_buffer.hpp:63
std::thread producer
Thread that rebuffers in the background.
Definition async_input_buffer.hpp:66
urng_t urange
The underlying range.
Definition async_input_buffer.hpp:60
Hide me