SeqAn3 3.1.0
The Modern C++ library for sequence analysis.
async_input_buffer.hpp
Go to the documentation of this file.
1// -----------------------------------------------------------------------------------------------------
2// Copyright (c) 2006-2021, Knut Reinert & Freie Universität Berlin
3// Copyright (c) 2016-2021, Knut Reinert & MPI für molekulare Genetik
4// This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License
5// shipped with this file and also available at: https://github.com/seqan/seqan3/blob/master/LICENSE.md
6// -----------------------------------------------------------------------------------------------------
7
13#pragma once
14
15#include <seqan3/std/concepts>
16#include <seqan3/std/iterator>
17#include <seqan3/std/ranges>
18#include <thread>
19
22
23//-----------------------------------------------------------------------------
24// This is the path a value takes when using this views::
25// urange
26// → async_input_buffer_view.buffer [size n]
27// → iterator.cached_value [size 1]
28// → user
29//-----------------------------------------------------------------------------
30
31namespace seqan3::detail
32{
33
39template <std::ranges::range urng_t>
40class async_input_buffer_view : public std::ranges::view_interface<async_input_buffer_view<urng_t>>
41{
42private:
43 static_assert(std::ranges::input_range<urng_t>,
44 "The range parameter to async_input_buffer_view must be at least a std::ranges::input_range.");
45 static_assert(std::ranges::view<urng_t>,
46 "The range parameter to async_input_buffer_view must model std::ranges::view.");
47 static_assert(std::movable<std::ranges::range_value_t<urng_t>>,
48 "The range parameter to async_input_buffer_view must have a value_type that is std::movable.");
49 static_assert(std::constructible_from<std::ranges::range_value_t<urng_t>,
51 "The range parameter to async_input_buffer_view must have a value_type that is constructible by a moved "
52 "value of its reference type.");
53
55 using urng_iterator_type = std::ranges::iterator_t<urng_t>;
56
58 struct state
59 {
61 urng_t urange;
62
64 contrib::fixed_buffer_queue<std::ranges::range_value_t<urng_t>> buffer;
65
67 std::thread producer;
68 };
69
71 std::shared_ptr<state> state_ptr = nullptr;
72
74 class iterator;
75
76public:
80 async_input_buffer_view() = default;
81 async_input_buffer_view(async_input_buffer_view const &) = default;
82 async_input_buffer_view(async_input_buffer_view &&) = default;
83 async_input_buffer_view & operator=(async_input_buffer_view const &) = default;
84 async_input_buffer_view & operator=(async_input_buffer_view &&) = default;
85 ~async_input_buffer_view() = default;
86
88 async_input_buffer_view(urng_t _urng, size_t const buffer_size)
89 {
90 auto deleter = [] (state * p)
91 {
92 if (p != nullptr)
93 {
94 p->buffer.close();
95 p->producer.join();
96 delete p;
97 }
98 };
99
100 state_ptr = std::shared_ptr<state>(new state{std::move(_urng),
101 contrib::fixed_buffer_queue<std::ranges::range_value_t<urng_t>>{buffer_size},
102 std::thread{}}, // thread is set/started below, needs rest of state
103 deleter);
104
105 auto runner = [&state = *state_ptr] ()
106 {
107 for (auto && val : state.urange)
108 if (state.buffer.wait_push(std::move(val)) == contrib::queue_op_status::closed)
109 break;
110
111 state.buffer.close();
112 };
113
114 state_ptr->producer = std::thread{runner};
115 }
116
118 template <typename other_urng_t>
120 requires (!std::same_as<std::remove_cvref_t<other_urng_t>, async_input_buffer_view>) && // prevent recursive instantiation
121 std::ranges::viewable_range<other_urng_t> &&
122 std::constructible_from<urng_t, std::ranges::ref_view<std::remove_reference_t<other_urng_t>>>
124 async_input_buffer_view(other_urng_t && _urng, size_t const buffer_size) :
125 async_input_buffer_view{std::views::all(_urng), buffer_size}
126 {}
128
143 iterator begin()
144 {
145 assert(state_ptr != nullptr);
146 return {state_ptr->buffer};
147 }
148
150 iterator begin() const = delete;
151
153 std::default_sentinel_t end()
154 {
155 return std::default_sentinel;
156 }
157
159 std::default_sentinel_t end() const = delete;
161};
162
164template <typename urng_t>
165class async_input_buffer_view<urng_t>::iterator
166{
168 using sentinel_type = std::default_sentinel_t;
169
171 contrib::fixed_buffer_queue<std::ranges::range_value_t<urng_t>> * buffer_ptr = nullptr;
172
174 mutable std::ranges::range_value_t<urng_t> cached_value;
175
177 bool at_end = false;
178
179public:
180
185 using difference_type = std::iter_difference_t<urng_iterator_type>;
187 using value_type = std::iter_value_t<urng_iterator_type>;
189 using pointer = detail::iter_pointer_t<urng_iterator_type>;
193 using iterator_category = std::input_iterator_tag;
195 using iterator_concept = iterator_category;
197
202 iterator() = default;
203 //TODO: delete:
204 iterator(iterator const & rhs) = default;
205 iterator(iterator && rhs) = default;
206 //TODO: delete:
207 iterator & operator=(iterator const & rhs) = default;
208 iterator & operator=(iterator && rhs) = default;
209 ~iterator() noexcept = default;
210
212 iterator(contrib::fixed_buffer_queue<std::ranges::range_value_t<urng_t>> & buffer) noexcept : buffer_ptr{&buffer}
213 {
214 ++(*this); // cache first value
215 }
217
222 reference operator*() const noexcept
223 {
224 return cached_value;
225 }
226
228 pointer operator->() const noexcept
229 {
230 return std::addressof(cached_value);
231 }
233
238 iterator & operator++() noexcept
239 {
240 if (at_end) // TODO unlikely
241 return *this;
242
243 assert(buffer_ptr != nullptr);
244
245 if (buffer_ptr->wait_pop(cached_value) == contrib::queue_op_status::closed)
246 at_end = true;
247
248 return *this;
249 }
250
252 void operator++(int) noexcept
253 {
254 ++(*this);
255 }
257
262 friend constexpr bool operator==(iterator const & lhs, std::default_sentinel_t const &) noexcept
263 {
264 return lhs.at_end;
265 }
266
268 friend constexpr bool operator==(std::default_sentinel_t const &, iterator const & rhs) noexcept
269 {
270 return rhs == std::default_sentinel_t{};
271 }
272
274 friend constexpr bool operator!=(iterator const & lhs, std::default_sentinel_t const &) noexcept
275 {
276 return !(lhs == std::default_sentinel_t{});
277 }
278
280 friend constexpr bool operator!=(std::default_sentinel_t const &, iterator const & rhs) noexcept
281 {
282 return rhs != std::default_sentinel_t{};
283 }
285};
286
293template <std::ranges::viewable_range urng_t>
294async_input_buffer_view(urng_t &&, size_t const buffer_size) -> async_input_buffer_view<std::views::all_t<urng_t>>;
296
297// ============================================================================
298// async_input_buffer_fn (adaptor definition
299// ============================================================================
300
302struct async_input_buffer_fn
303{
305 constexpr auto operator()(size_t const buffer_size) const
306 {
307 return detail::adaptor_from_functor{*this, buffer_size};
308 }
309
315 template <std::ranges::range urng_t>
316 constexpr auto operator()(urng_t && urange, size_t const buffer_size) const
317 {
318 static_assert(std::ranges::input_range<urng_t>,
319 "The range parameter to views::async_input_buffer must be at least a std::ranges::input_range.");
320 static_assert(std::ranges::viewable_range<urng_t>,
321 "The range parameter to views::async_input_buffer cannot be a temporary of a non-view range.");
322 static_assert(std::movable<std::ranges::range_value_t<urng_t>>,
323 "The range parameter to views::async_input_buffer must have a value_type that is std::movable.");
324 static_assert(std::constructible_from<std::ranges::range_value_t<urng_t>,
326 "The range parameter to views::async_input_buffer must have a value_type that is constructible by a moved "
327 "value of its reference type.");
328
329 if (buffer_size == 0)
330 throw std::invalid_argument{"The buffer_size parameter to views::async_input_buffer must be > 0."};
331
332 return detail::async_input_buffer_view{std::forward<urng_t>(urange), buffer_size};
333 }
334};
335
336} // seqan3::detail
337
338//-----------------------------------------------------------------------------
339// View shortcut for functor.
340//-----------------------------------------------------------------------------
341
342namespace seqan3::views
343{
479inline constexpr auto async_input_buffer = detail::async_input_buffer_fn{};
480} // namespace seqan3::views
Provides seqan3::detail::adaptor_from_functor.
T addressof(T... args)
T begin(T... args)
Provides seqan3::buffer_queue.
The <concepts> header from C++20's standard library.
T end(T... args)
constexpr auto async_input_buffer
A view adapter that returns a concurrent-queue-like view over the underlying range.
Definition: async_input_buffer.hpp:479
The <iterator> header from C++20's standard library.
The SeqAn namespace for views.
Definition: char_to.hpp:22
SeqAn specific customisations in the standard namespace.
T operator!=(T... args)
The <ranges> header from C++20's standard library.