SeqAn3 3.4.0-rc.1
The Modern C++ library for sequence analysis.
Loading...
Searching...
No Matches
async_input_buffer.hpp
Go to the documentation of this file.
1// SPDX-FileCopyrightText: 2006-2024 Knut Reinert & Freie Universität Berlin
2// SPDX-FileCopyrightText: 2016-2024 Knut Reinert & MPI für molekulare Genetik
3// SPDX-License-Identifier: BSD-3-Clause
4
10#pragma once
11
12#include <concepts>
13#include <iterator>
14#include <memory>
15#include <ranges>
16#include <thread>
17
20
21//-----------------------------------------------------------------------------
22// This is the path a value takes when using this view:
23// urange
24// → async_input_buffer_view.buffer [size n]
25// → iterator.cached_value [size 1]
26// → user
27//-----------------------------------------------------------------------------
28
29namespace seqan3::detail
30{
31
37template <std::ranges::range urng_t>
38class async_input_buffer_view : public std::ranges::view_interface<async_input_buffer_view<urng_t>>
39{
40private:
41 static_assert(std::ranges::input_range<urng_t>,
42 "The range parameter to async_input_buffer_view must be at least a std::ranges::input_range.");
43 static_assert(std::ranges::view<urng_t>,
44 "The range parameter to async_input_buffer_view must model std::ranges::view.");
45 static_assert(std::movable<std::ranges::range_value_t<urng_t>>,
46 "The range parameter to async_input_buffer_view must have a value_type that is std::movable.");
47 static_assert(
48 std::constructible_from<std::ranges::range_value_t<urng_t>,
50 "The range parameter to async_input_buffer_view must have a value_type that is constructible by a moved "
51 "value of its reference type.");
52
54 using urng_iterator_type = std::ranges::iterator_t<urng_t>;
55
57 struct state
58 {
60 urng_t urange;
61
63 contrib::fixed_buffer_queue<std::ranges::range_value_t<urng_t>> buffer;
64
66 std::thread producer;
67 };
68
70 std::shared_ptr<state> state_ptr = nullptr;
71
73 class iterator;
74
75public:
79 async_input_buffer_view() = default;
80 async_input_buffer_view(async_input_buffer_view const &) = default;
81 async_input_buffer_view(async_input_buffer_view &&) = default;
82 async_input_buffer_view & operator=(async_input_buffer_view const &) = default;
83 async_input_buffer_view & operator=(async_input_buffer_view &&) = default;
84 ~async_input_buffer_view() = default;
85
87 async_input_buffer_view(urng_t _urng, size_t const buffer_size)
88 {
89 auto deleter = [](state * p)
90 {
91 if (p != nullptr)
92 {
93 p->buffer.close();
94 p->producer.join();
95 delete p;
96 }
97 };
98
99 state_ptr = std::shared_ptr<state>(
100 new state{std::move(_urng),
101 contrib::fixed_buffer_queue<std::ranges::range_value_t<urng_t>>{buffer_size},
102 std::thread{}}, // thread is set/started below, needs rest of state
103 deleter);
104
105 auto runner = [&state = *state_ptr]()
106 {
107 for (auto && val : state.urange)
108 if (state.buffer.wait_push(std::move(val)) == contrib::queue_op_status::closed)
109 break;
110
111 state.buffer.close();
112 };
113
114 state_ptr->producer = std::thread{runner};
115 }
116
118 template <typename other_urng_t>
119 requires (!std::same_as<std::remove_cvref_t<other_urng_t>, async_input_buffer_view>)
120 && // prevent recursive instantiation
121 std::ranges::viewable_range<other_urng_t>
122 && std::constructible_from<urng_t, std::ranges::ref_view<std::remove_reference_t<other_urng_t>>>
123 async_input_buffer_view(other_urng_t && _urng, size_t const buffer_size) :
124 async_input_buffer_view{std::views::all(_urng), buffer_size}
125 {}
127
142 iterator begin()
143 {
144 assert(state_ptr != nullptr);
145 return {state_ptr->buffer};
146 }
147
149 iterator begin() const = delete;
150
152 std::default_sentinel_t end()
153 {
154 return std::default_sentinel;
155 }
156
158 std::default_sentinel_t end() const = delete;
160};
161
163template <std::ranges::range urng_t>
164class async_input_buffer_view<urng_t>::iterator
165{
167 using sentinel_type = std::default_sentinel_t;
168
170 contrib::fixed_buffer_queue<std::ranges::range_value_t<urng_t>> * buffer_ptr = nullptr;
171
173 mutable std::ranges::range_value_t<urng_t> cached_value;
174
176 bool at_end = false;
177
178public:
183 using difference_type = std::iter_difference_t<urng_iterator_type>;
185 using value_type = std::iter_value_t<urng_iterator_type>;
187 using pointer = detail::iter_pointer_t<urng_iterator_type>;
191 using iterator_category = std::input_iterator_tag;
193 using iterator_concept = iterator_category;
195
200 iterator() = default;
201 //TODO: delete:
202 iterator(iterator const & rhs) = default;
203 iterator(iterator && rhs) = default;
204 //TODO: delete:
205 iterator & operator=(iterator const & rhs) = default;
206 iterator & operator=(iterator && rhs) = default;
207 ~iterator() noexcept = default;
208
210 iterator(contrib::fixed_buffer_queue<std::ranges::range_value_t<urng_t>> & buffer) noexcept : buffer_ptr{&buffer}
211 {
212 ++(*this); // cache first value
213 }
215
220 reference operator*() const noexcept
221 {
222 return cached_value;
223 }
224
226 pointer operator->() const noexcept
227 {
228 return std::addressof(cached_value);
229 }
231
236 iterator & operator++() noexcept
237 {
238 if (at_end) // TODO unlikely
239 return *this;
240
241 assert(buffer_ptr != nullptr);
242
243 if (buffer_ptr->wait_pop(cached_value) == contrib::queue_op_status::closed)
244 at_end = true;
245
246 return *this;
247 }
248
250 void operator++(int) noexcept
251 {
252 ++(*this);
253 }
255
260 friend constexpr bool operator==(iterator const & lhs, std::default_sentinel_t const &) noexcept
261 {
262 return lhs.at_end;
263 }
264
266 friend constexpr bool operator==(std::default_sentinel_t const &, iterator const & rhs) noexcept
267 {
268 return rhs == std::default_sentinel_t{};
269 }
270
272 friend constexpr bool operator!=(iterator const & lhs, std::default_sentinel_t const &) noexcept
273 {
274 return !(lhs == std::default_sentinel_t{});
275 }
276
278 friend constexpr bool operator!=(std::default_sentinel_t const &, iterator const & rhs) noexcept
279 {
280 return rhs != std::default_sentinel_t{};
281 }
283};
284
291template <std::ranges::viewable_range urng_t>
292async_input_buffer_view(urng_t &&, size_t const buffer_size) -> async_input_buffer_view<std::views::all_t<urng_t>>;
294
295// ============================================================================
296// async_input_buffer_fn (adaptor definition)
297// ============================================================================
298
300struct async_input_buffer_fn
301{
303 constexpr auto operator()(size_t const buffer_size) const
304 {
305 return detail::adaptor_from_functor{*this, buffer_size};
306 }
307
313 template <std::ranges::range urng_t>
314 constexpr auto operator()(urng_t && urange, size_t const buffer_size) const
315 {
316 static_assert(std::ranges::input_range<urng_t>,
317 "The range parameter to views::async_input_buffer must be at least a std::ranges::input_range.");
318 static_assert(std::ranges::viewable_range<urng_t>,
319 "The range parameter to views::async_input_buffer cannot be a temporary of a non-view range.");
320 static_assert(std::movable<std::ranges::range_value_t<urng_t>>,
321 "The range parameter to views::async_input_buffer must have a value_type that is std::movable.");
322 static_assert(
323 std::constructible_from<std::ranges::range_value_t<urng_t>,
325 "The range parameter to views::async_input_buffer must have a value_type that is constructible by a moved "
326 "value of its reference type.");
327
328 if (buffer_size == 0)
329 throw std::invalid_argument{"The buffer_size parameter to views::async_input_buffer must be > 0."};
330
331 return detail::async_input_buffer_view{std::forward<urng_t>(urange), buffer_size};
332 }
333};
334
335} // namespace seqan3::detail
336
337//-----------------------------------------------------------------------------
338// View shortcut for functor.
339//-----------------------------------------------------------------------------
340
341namespace seqan3::views
342{
478inline constexpr auto async_input_buffer = detail::async_input_buffer_fn{};
479} // namespace seqan3::views
Provides seqan3::detail::adaptor_from_functor.
T addressof(T... args)
T begin(T... args)
Provides seqan3::buffer_queue.
T end(T... args)
constexpr auto async_input_buffer
A view adapter that returns a concurrent-queue-like view over the underlying range.
Definition async_input_buffer.hpp:478
T move(T... args)
The SeqAn namespace for views.
Definition char_strictly_to.hpp:19
SeqAn specific customisations in the standard namespace.
T operator!=(T... args)
Hide me