SeqAn3 3.1.0
The Modern C++ library for sequence analysis.
algorithm_executor_blocking.hpp
Go to the documentation of this file.
1// -----------------------------------------------------------------------------------------------------
2// Copyright (c) 2006-2021, Knut Reinert & Freie Universität Berlin
3// Copyright (c) 2016-2021, Knut Reinert & MPI für molekulare Genetik
4// This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License
5// shipped with this file and also available at: https://github.com/seqan/seqan3/blob/master/LICENSE.md
6// -----------------------------------------------------------------------------------------------------
7
13#pragma once
14
15#include <functional>
16#include <optional>
17#include <seqan3/std/ranges>
18#include <type_traits>
19
22
23namespace seqan3::detail
24{
25
54template <std::ranges::viewable_range resource_t,
55 std::semiregular algorithm_t,
56 std::semiregular algorithm_result_t,
57 typename execution_handler_t = execution_handler_sequential>
59 requires std::ranges::forward_range<resource_t> &&
60 std::invocable<algorithm_t, std::ranges::range_reference_t<resource_t>,
61 std::function<void(algorithm_result_t)>>
63class algorithm_executor_blocking
64{
65private:
70 using resource_type = std::views::all_t<resource_t>;
72 using resource_iterator_type = std::ranges::iterator_t<resource_type>;
74
79 using bucket_type = std::vector<algorithm_result_t>;
81 using bucket_iterator_type = std::ranges::iterator_t<bucket_type>;
83 using buffer_type = std::vector<bucket_type>;
85 using buffer_iterator_type = std::ranges::iterator_t<buffer_type>;
87
89 enum fill_status
90 {
91 non_empty_buffer,
92 empty_buffer,
93 end_of_resource
94 };
95
96public:
102 algorithm_executor_blocking() = delete;
104 algorithm_executor_blocking(algorithm_executor_blocking const &) = delete;
105
123 algorithm_executor_blocking(algorithm_executor_blocking && other) noexcept
124 {
125 move_initialise(std::move(other));
126 }
127
129 algorithm_executor_blocking & operator=(algorithm_executor_blocking const &) = delete;
130
133 algorithm_executor_blocking & operator=(algorithm_executor_blocking && other)
134 {
135 move_initialise(std::move(other));
136 return *this;
137 }
138
140 ~algorithm_executor_blocking() = default;
141
156 algorithm_executor_blocking(resource_t resource,
157 algorithm_t algorithm,
158 algorithm_result_t const SEQAN3_DOXYGEN_ONLY(result) = algorithm_result_t{},
159 execution_handler_t && exec_handler = execution_handler_t{}) :
160 exec_handler{std::move(exec_handler)},
161 resource{std::views::all(resource)},
162 resource_it{std::ranges::begin(this->resource)},
163 algorithm{std::move(algorithm)}
164 {
165 if constexpr (std::same_as<execution_handler_t, execution_handler_parallel>)
166 buffer_size = static_cast<size_t>(std::ranges::distance(resource));
167
168 buffer.resize(buffer_size);
169 buffer_it = buffer.end();
170 buffer_end_it = buffer_it;
171 }
173
190 {
191 fill_status status;
192 // Each invocation of the algorithm might produce zero results (e.g. a search might not find a query)
193 // this repeats the algorithm until it produces the first result or the input resource was consumed.
194 do { status = fill_buffer(); } while (status == fill_status::empty_buffer);
195
196 if (status == fill_status::end_of_resource)
197 return {std::nullopt};
198
199 assert(status == fill_status::non_empty_buffer);
200 assert(bucket_it != buffer_it->end());
201
202 std::optional<algorithm_result_t> result = std::ranges::iter_move(bucket_it);
203 go_to_next_result(); // Go to next buffered result.
204 return result;
205 }
206
208 bool is_eof() noexcept
209 {
210 return resource_it == std::ranges::end(resource);
211 }
212
213private:
215 fill_status fill_buffer()
216 {
217 if (!is_buffer_empty()) // Not everything consumed yet.
218 return fill_status::non_empty_buffer;
219
220 if (is_eof()) // Case: reached end of resource.
221 return fill_status::end_of_resource;
222
223 // Reset the buckets and the buffer iterator.
224 reset_buffer();
225
226 // Execute the algorithm (possibly asynchronous) and fill the buckets in this pre-assigned order.
227 for (buffer_end_it = buffer_it; buffer_end_it != buffer.end() && !is_eof(); ++buffer_end_it, ++resource_it)
228 {
229 exec_handler.execute(algorithm, *resource_it, [target_buffer_it = buffer_end_it] (auto && algorithm_result)
230 {
231 target_buffer_it->push_back(std::move(algorithm_result));
232 });
233 }
234
235 exec_handler.wait();
236
237 // Move the results iterator to the next available result. (This skips empty results of the algorithm)
238 find_next_non_empty_bucket();
239
240 if (is_buffer_empty())
241 return fill_status::empty_buffer;
242
243 return fill_status::non_empty_buffer;
244 }
245
249 bool is_buffer_empty() const
250 {
251 return buffer_it == buffer_end_it;
252 }
253
262 void reset_buffer()
263 {
264 // Clear all buckets
265 for (auto & bucket : buffer)
266 bucket.clear();
267
268 // Reset the iterator over the buckets.
269 buffer_it = buffer.begin();
270 }
271
280 void find_next_non_empty_bucket()
281 {
282 assert(buffer_it <= buffer_end_it);
283 // find first buffered bucket that contains at least one element
284 buffer_it = std::find_if(buffer_it, buffer_end_it, [] (auto const & buffer)
285 {
286 return !buffer.empty();
287 });
288
289 if (buffer_it != buffer_end_it)
290 bucket_it = buffer_it->begin();
291 }
292
300 void go_to_next_result()
301 {
302 if (++bucket_it == buffer_it->end())
303 {
304 ++buffer_it;
305 find_next_non_empty_bucket();
306 }
307 }
308
311 void move_initialise(algorithm_executor_blocking && other) noexcept
312 {
313 algorithm = std::move(other.algorithm);
314 buffer_size = std::move(other.buffer_size);
315 exec_handler = std::move(other.exec_handler);
316 // Get the old resource position.
317 auto old_resource_position = std::ranges::distance(std::ranges::begin(other.resource),
318 other.resource_it);
319 // Move the resource and set the iterator state accordingly.
320 resource = std::move(other.resource);
321 resource_it = std::ranges::next(std::ranges::begin(resource), old_resource_position);
322
323 // Get the old buffer and bucket iterator positions.
324 auto buffer_it_position = other.buffer_it - other.buffer.begin();
325 auto buffer_end_it_position = other.buffer_end_it - other.buffer.begin();
326
327 std::ptrdiff_t bucket_it_position = 0;
328 if (buffer_it_position != buffer_end_it_position)
329 bucket_it_position = other.bucket_it - other.buffer_it->begin();
330
331 // Move the buffer and set the buffer and bucket iterator accordingly.
332 buffer = std::move(other.buffer);
333 buffer_it = buffer.begin() + buffer_it_position;
334 buffer_end_it = buffer.begin() + buffer_end_it_position;
335
336 if (buffer_it_position != buffer_end_it_position)
337 bucket_it = buffer_it->begin() + bucket_it_position;
338 }
339
341 execution_handler_t exec_handler{};
342
344 resource_type resource{};
346 resource_iterator_type resource_it{};
348 algorithm_t algorithm{};
349
351 buffer_type buffer{};
353 buffer_iterator_type buffer_it{};
355 buffer_iterator_type buffer_end_it{};
357 bucket_iterator_type bucket_it{};
359 size_t buffer_size{1};
360};
361
368template <typename resource_rng_t, std::semiregular algorithm_t, std::semiregular algorithm_result_t>
369algorithm_executor_blocking(resource_rng_t &&, algorithm_t, algorithm_result_t const &) ->
370 algorithm_executor_blocking<resource_rng_t, algorithm_t, algorithm_result_t, execution_handler_sequential>;
372} // namespace seqan3::detail
T begin(T... args)
Provides seqan3::detail::execution_handler_parallel.
Provides seqan3::detail::execution_handler_sequential.
T find_if(T... args)
constexpr auto is_eof
Checks whether a given letter is equal to the EOF constant defined in <cstdio>.
Definition: predicate.hpp:77
The <ranges> header from C++20's standard library.