SeqAn3 3.2.0
The Modern C++ library for sequence analysis.
async_input_buffer.hpp
Go to the documentation of this file.
1// -----------------------------------------------------------------------------------------------------
2// Copyright (c) 2006-2022, Knut Reinert & Freie Universität Berlin
3// Copyright (c) 2016-2022, Knut Reinert & MPI für molekulare Genetik
4// This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License
5// shipped with this file and also available at: https://github.com/seqan/seqan3/blob/master/LICENSE.md
6// -----------------------------------------------------------------------------------------------------
7
13#pragma once
14
15#include <concepts>
16#include <iterator>
17#include <memory>
18#include <ranges>
19#include <thread>
20
23
24//-----------------------------------------------------------------------------
25// This is the path a value takes when using this views::
26// urange
27// → async_input_buffer_view.buffer [size n]
28// → iterator.cached_value [size 1]
29// → user
30//-----------------------------------------------------------------------------
31
32namespace seqan3::detail
33{
34
40template <std::ranges::range urng_t>
41class async_input_buffer_view : public std::ranges::view_interface<async_input_buffer_view<urng_t>>
42{
43private:
44 static_assert(std::ranges::input_range<urng_t>,
45 "The range parameter to async_input_buffer_view must be at least a std::ranges::input_range.");
46 static_assert(std::ranges::view<urng_t>,
47 "The range parameter to async_input_buffer_view must model std::ranges::view.");
48 static_assert(std::movable<std::ranges::range_value_t<urng_t>>,
49 "The range parameter to async_input_buffer_view must have a value_type that is std::movable.");
50 static_assert(
51 std::constructible_from<std::ranges::range_value_t<urng_t>,
53 "The range parameter to async_input_buffer_view must have a value_type that is constructible by a moved "
54 "value of its reference type.");
55
57 using urng_iterator_type = std::ranges::iterator_t<urng_t>;
58
60 struct state
61 {
63 urng_t urange;
64
66 contrib::fixed_buffer_queue<std::ranges::range_value_t<urng_t>> buffer;
67
69 std::thread producer;
70 };
71
73 std::shared_ptr<state> state_ptr = nullptr;
74
76 class iterator;
77
78public:
82 async_input_buffer_view() = default;
83 async_input_buffer_view(async_input_buffer_view const &) = default;
84 async_input_buffer_view(async_input_buffer_view &&) = default;
85 async_input_buffer_view & operator=(async_input_buffer_view const &) = default;
86 async_input_buffer_view & operator=(async_input_buffer_view &&) = default;
87 ~async_input_buffer_view() = default;
88
90 async_input_buffer_view(urng_t _urng, size_t const buffer_size)
91 {
92 auto deleter = [](state * p)
93 {
94 if (p != nullptr)
95 {
96 p->buffer.close();
97 p->producer.join();
98 delete p;
99 }
100 };
101
102 state_ptr = std::shared_ptr<state>(
103 new state{std::move(_urng),
104 contrib::fixed_buffer_queue<std::ranges::range_value_t<urng_t>>{buffer_size},
105 std::thread{}}, // thread is set/started below, needs rest of state
106 deleter);
107
108 auto runner = [&state = *state_ptr]()
109 {
110 for (auto && val : state.urange)
111 if (state.buffer.wait_push(std::move(val)) == contrib::queue_op_status::closed)
112 break;
113
114 state.buffer.close();
115 };
116
117 state_ptr->producer = std::thread{runner};
118 }
119
121 template <typename other_urng_t>
122 requires (!std::same_as<std::remove_cvref_t<other_urng_t>, async_input_buffer_view>)
123 && // prevent recursive instantiation
124 std::ranges::viewable_range<other_urng_t>
125 && std::constructible_from<urng_t, std::ranges::ref_view<std::remove_reference_t<other_urng_t>>>
126 async_input_buffer_view(other_urng_t && _urng, size_t const buffer_size) :
127 async_input_buffer_view{std::views::all(_urng), buffer_size}
128 {}
130
145 iterator begin()
146 {
147 assert(state_ptr != nullptr);
148 return {state_ptr->buffer};
149 }
150
152 iterator begin() const = delete;
153
155 std::default_sentinel_t end()
156 {
157 return std::default_sentinel;
158 }
159
161 std::default_sentinel_t end() const = delete;
163};
164
166template <std::ranges::range urng_t>
167class async_input_buffer_view<urng_t>::iterator
168{
170 using sentinel_type = std::default_sentinel_t;
171
173 contrib::fixed_buffer_queue<std::ranges::range_value_t<urng_t>> * buffer_ptr = nullptr;
174
176 mutable std::ranges::range_value_t<urng_t> cached_value;
177
179 bool at_end = false;
180
181public:
186 using difference_type = std::iter_difference_t<urng_iterator_type>;
188 using value_type = std::iter_value_t<urng_iterator_type>;
190 using pointer = detail::iter_pointer_t<urng_iterator_type>;
194 using iterator_category = std::input_iterator_tag;
196 using iterator_concept = iterator_category;
198
203 iterator() = default;
204 //TODO: delete:
205 iterator(iterator const & rhs) = default;
206 iterator(iterator && rhs) = default;
207 //TODO: delete:
208 iterator & operator=(iterator const & rhs) = default;
209 iterator & operator=(iterator && rhs) = default;
210 ~iterator() noexcept = default;
211
213 iterator(contrib::fixed_buffer_queue<std::ranges::range_value_t<urng_t>> & buffer) noexcept : buffer_ptr{&buffer}
214 {
215 ++(*this); // cache first value
216 }
218
223 reference operator*() const noexcept
224 {
225 return cached_value;
226 }
227
229 pointer operator->() const noexcept
230 {
231 return std::addressof(cached_value);
232 }
234
239 iterator & operator++() noexcept
240 {
241 if (at_end) // TODO unlikely
242 return *this;
243
244 assert(buffer_ptr != nullptr);
245
246 if (buffer_ptr->wait_pop(cached_value) == contrib::queue_op_status::closed)
247 at_end = true;
248
249 return *this;
250 }
251
253 void operator++(int) noexcept
254 {
255 ++(*this);
256 }
258
263 friend constexpr bool operator==(iterator const & lhs, std::default_sentinel_t const &) noexcept
264 {
265 return lhs.at_end;
266 }
267
269 friend constexpr bool operator==(std::default_sentinel_t const &, iterator const & rhs) noexcept
270 {
271 return rhs == std::default_sentinel_t{};
272 }
273
275 friend constexpr bool operator!=(iterator const & lhs, std::default_sentinel_t const &) noexcept
276 {
277 return !(lhs == std::default_sentinel_t{});
278 }
279
281 friend constexpr bool operator!=(std::default_sentinel_t const &, iterator const & rhs) noexcept
282 {
283 return rhs != std::default_sentinel_t{};
284 }
286};
287
294template <std::ranges::viewable_range urng_t>
295async_input_buffer_view(urng_t &&, size_t const buffer_size) -> async_input_buffer_view<std::views::all_t<urng_t>>;
297
298// ============================================================================
299// async_input_buffer_fn (adaptor definition
300// ============================================================================
301
303struct async_input_buffer_fn
304{
306 constexpr auto operator()(size_t const buffer_size) const
307 {
308 return detail::adaptor_from_functor{*this, buffer_size};
309 }
310
316 template <std::ranges::range urng_t>
317 constexpr auto operator()(urng_t && urange, size_t const buffer_size) const
318 {
319 static_assert(std::ranges::input_range<urng_t>,
320 "The range parameter to views::async_input_buffer must be at least a std::ranges::input_range.");
321 static_assert(std::ranges::viewable_range<urng_t>,
322 "The range parameter to views::async_input_buffer cannot be a temporary of a non-view range.");
323 static_assert(std::movable<std::ranges::range_value_t<urng_t>>,
324 "The range parameter to views::async_input_buffer must have a value_type that is std::movable.");
325 static_assert(
326 std::constructible_from<std::ranges::range_value_t<urng_t>,
328 "The range parameter to views::async_input_buffer must have a value_type that is constructible by a moved "
329 "value of its reference type.");
330
331 if (buffer_size == 0)
332 throw std::invalid_argument{"The buffer_size parameter to views::async_input_buffer must be > 0."};
333
334 return detail::async_input_buffer_view{std::forward<urng_t>(urange), buffer_size};
335 }
336};
337
338} // namespace seqan3::detail
339
340//-----------------------------------------------------------------------------
341// View shortcut for functor.
342//-----------------------------------------------------------------------------
343
344namespace seqan3::views
345{
481inline constexpr auto async_input_buffer = detail::async_input_buffer_fn{};
482} // namespace seqan3::views
Provides seqan3::detail::adaptor_from_functor.
T addressof(T... args)
T begin(T... args)
Provides seqan3::buffer_queue.
T end(T... args)
constexpr auto async_input_buffer
A view adapter that returns a concurrent-queue-like view over the underlying range.
Definition: async_input_buffer.hpp:481
The SeqAn namespace for views.
Definition: char_strictly_to.hpp:22
SeqAn specific customisations in the standard namespace.
T operator!=(T... args)