SeqAn3 3.2.0
The Modern C++ library for sequence analysis.
algorithm_executor_blocking.hpp
Go to the documentation of this file.
1// -----------------------------------------------------------------------------------------------------
2// Copyright (c) 2006-2022, Knut Reinert & Freie Universität Berlin
3// Copyright (c) 2016-2022, Knut Reinert & MPI für molekulare Genetik
4// This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License
5// shipped with this file and also available at: https://github.com/seqan/seqan3/blob/master/LICENSE.md
6// -----------------------------------------------------------------------------------------------------
7
13#pragma once
14
15#include <functional>
16#include <optional>
17#include <ranges>
18#include <type_traits>
19
22
23namespace seqan3::detail
24{
25
54template <std::ranges::viewable_range resource_t,
55 std::semiregular algorithm_t,
56 std::semiregular algorithm_result_t,
57 typename execution_handler_t = execution_handler_sequential>
58 requires std::ranges::forward_range<resource_t>
59 && std::invocable<algorithm_t,
60 std::ranges::range_reference_t<resource_t>,
61 std::function<void(algorithm_result_t)>>
62class algorithm_executor_blocking
63{
64private:
69 using resource_type = std::views::all_t<resource_t>;
71 using resource_iterator_type = std::ranges::iterator_t<resource_type>;
73 using resource_difference_type = std::iter_difference_t<resource_iterator_type>;
75
80 using bucket_type = std::vector<algorithm_result_t>;
82 using bucket_iterator_type = std::ranges::iterator_t<bucket_type>;
84 using buffer_type = std::vector<bucket_type>;
86 using buffer_iterator_type = std::ranges::iterator_t<buffer_type>;
88
90 enum fill_status
91 {
92 non_empty_buffer,
93 empty_buffer,
94 end_of_resource
95 };
96
97public:
103 algorithm_executor_blocking() = delete;
105 algorithm_executor_blocking(algorithm_executor_blocking const &) = delete;
106
124 algorithm_executor_blocking(algorithm_executor_blocking && other) noexcept :
125 algorithm_executor_blocking{std::move(other), other.resource_position()}
126 {}
127
129 algorithm_executor_blocking & operator=(algorithm_executor_blocking const &) = delete;
130
133 algorithm_executor_blocking & operator=(algorithm_executor_blocking && other)
134 {
135 auto old_resource_position = other.resource_position();
136
137 resource = std::move(other.resource);
138 move_initialise(std::move(other), old_resource_position);
139 return *this;
140 }
141
143 ~algorithm_executor_blocking() = default;
144
159 algorithm_executor_blocking(resource_t resource,
160 algorithm_t algorithm,
161 algorithm_result_t const SEQAN3_DOXYGEN_ONLY(result) = algorithm_result_t{},
162 execution_handler_t && exec_handler = execution_handler_t{}) :
163 exec_handler{std::move(exec_handler)},
164 resource{std::forward<resource_t>(resource)},
165 resource_it{std::ranges::begin(this->resource)},
166 algorithm{std::move(algorithm)}
167 {
168 if constexpr (std::same_as<execution_handler_t, execution_handler_parallel>)
169 buffer_size = static_cast<size_t>(std::ranges::distance(this->resource));
170
171 buffer.resize(buffer_size);
172 buffer_it = buffer.end();
173 buffer_end_it = buffer_it;
174 }
176
193 {
194 fill_status status;
195 // Each invocation of the algorithm might produce zero results (e.g. a search might not find a query)
196 // this repeats the algorithm until it produces the first result or the input resource was consumed.
197 do
198 {
199 status = fill_buffer();
200 }
201 while (status == fill_status::empty_buffer);
202
203 if (status == fill_status::end_of_resource)
204 return {std::nullopt};
205
206 assert(status == fill_status::non_empty_buffer);
207 assert(bucket_it != buffer_it->end());
208
209 std::optional<algorithm_result_t> result = std::ranges::iter_move(bucket_it);
210 go_to_next_result(); // Go to next buffered result.
211 return result;
212 }
213
215 bool is_eof() noexcept
216 {
217 return resource_it == std::ranges::end(resource);
218 }
219
220private:
226 algorithm_executor_blocking(algorithm_executor_blocking && other,
227 resource_difference_type old_resource_position) noexcept :
228 resource{std::move(other.resource)}
229 {
230 move_initialise(std::move(other), old_resource_position);
231 }
232
239 resource_difference_type resource_position()
240 {
241 // Get the old resource position.
242 auto position = std::ranges::distance(std::ranges::begin(resource), resource_it);
243 assert(position >= 0);
244 return position;
245 }
246
248 fill_status fill_buffer()
249 {
250 if (!is_buffer_empty()) // Not everything consumed yet.
251 return fill_status::non_empty_buffer;
252
253 if (is_eof()) // Case: reached end of resource.
254 return fill_status::end_of_resource;
255
256 // Reset the buckets and the buffer iterator.
257 reset_buffer();
258
259 // Execute the algorithm (possibly asynchronous) and fill the buckets in this pre-assigned order.
260 for (buffer_end_it = buffer_it; buffer_end_it != buffer.end() && !is_eof(); ++buffer_end_it, ++resource_it)
261 {
262 exec_handler.execute(algorithm,
263 *resource_it,
264 [target_buffer_it = buffer_end_it](auto && algorithm_result)
265 {
266 target_buffer_it->push_back(std::move(algorithm_result));
267 });
268 }
269
270 exec_handler.wait();
271
272 // Move the results iterator to the next available result. (This skips empty results of the algorithm)
273 find_next_non_empty_bucket();
274
275 if (is_buffer_empty())
276 return fill_status::empty_buffer;
277
278 return fill_status::non_empty_buffer;
279 }
280
284 bool is_buffer_empty() const
285 {
286 return buffer_it == buffer_end_it;
287 }
288
297 void reset_buffer()
298 {
299 // Clear all buckets
300 for (auto & bucket : buffer)
301 bucket.clear();
302
303 // Reset the iterator over the buckets.
304 buffer_it = buffer.begin();
305 }
306
315 void find_next_non_empty_bucket()
316 {
317 assert(buffer_it <= buffer_end_it);
318 // find first buffered bucket that contains at least one element
319 buffer_it = std::find_if(buffer_it,
320 buffer_end_it,
321 [](auto const & buffer)
322 {
323 return !buffer.empty();
324 });
325
326 if (buffer_it != buffer_end_it)
327 bucket_it = buffer_it->begin();
328 }
329
337 void go_to_next_result()
338 {
339 if (++bucket_it == buffer_it->end())
340 {
341 ++buffer_it;
342 find_next_non_empty_bucket();
343 }
344 }
345
347 void move_initialise(algorithm_executor_blocking && other, resource_difference_type old_resource_position) noexcept
348 {
349 algorithm = std::move(other.algorithm);
350 buffer_size = std::move(other.buffer_size);
351 exec_handler = std::move(other.exec_handler);
352 // Move the resource and set the iterator state accordingly.
353 resource_it = std::ranges::next(std::ranges::begin(resource), old_resource_position);
354
355 // Get the old buffer and bucket iterator positions.
356 auto buffer_it_position = other.buffer_it - other.buffer.begin();
357 auto buffer_end_it_position = other.buffer_end_it - other.buffer.begin();
358
359 std::ptrdiff_t bucket_it_position = 0;
360 if (buffer_it_position != buffer_end_it_position)
361 bucket_it_position = other.bucket_it - other.buffer_it->begin();
362
363 // Move the buffer and set the buffer and bucket iterator accordingly.
364 buffer = std::move(other.buffer);
365 buffer_it = buffer.begin() + buffer_it_position;
366 buffer_end_it = buffer.begin() + buffer_end_it_position;
367
368 if (buffer_it_position != buffer_end_it_position)
369 bucket_it = buffer_it->begin() + bucket_it_position;
370 }
371
373 execution_handler_t exec_handler{};
374
376 resource_type resource; // a std::ranges::view
378 resource_iterator_type resource_it{};
380 algorithm_t algorithm{};
381
383 buffer_type buffer{};
385 buffer_iterator_type buffer_it{};
387 buffer_iterator_type buffer_end_it{};
389 bucket_iterator_type bucket_it{};
391 size_t buffer_size{1};
392};
393
400template <typename resource_rng_t, std::semiregular algorithm_t, std::semiregular algorithm_result_t>
401algorithm_executor_blocking(resource_rng_t &&, algorithm_t, algorithm_result_t const &)
402 -> algorithm_executor_blocking<resource_rng_t, algorithm_t, algorithm_result_t, execution_handler_sequential>;
404} // namespace seqan3::detail
T begin(T... args)
Provides seqan3::detail::execution_handler_parallel.
Provides seqan3::detail::execution_handler_sequential.
T find_if(T... args)
constexpr auto is_eof
Checks whether a given letter is equal to the EOF constant defined in <cstdio>.
Definition: predicate.hpp:75