SeqAn3  3.0.2
The Modern C++ library for sequence analysis.
algorithm_executor_blocking.hpp
Go to the documentation of this file.
1 // -----------------------------------------------------------------------------------------------------
2 // Copyright (c) 2006-2020, Knut Reinert & Freie Universität Berlin
3 // Copyright (c) 2016-2020, Knut Reinert & MPI für molekulare Genetik
4 // This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License
5 // shipped with this file and also available at: https://github.com/seqan/seqan3/blob/master/LICENSE.md
6 // -----------------------------------------------------------------------------------------------------
7 
13 #pragma once
14 
15 #include <functional>
16 #include <optional>
17 #include <seqan3/std/ranges>
18 #include <type_traits>
19 
23 
24 namespace seqan3::detail
25 {
26 
55 template <std::ranges::viewable_range resource_t,
56  std::semiregular algorithm_t,
57  std::semiregular algorithm_result_t,
58  typename execution_handler_t = execution_handler_sequential>
60  requires std::ranges::forward_range<resource_t> &&
61  std::invocable<algorithm_t, std::ranges::range_reference_t<resource_t>,
62  std::function<void(algorithm_result_t)>>
64 class algorithm_executor_blocking
65 {
66 private:
//!\brief The view type obtained by applying std::views::all to the resource type.
70  using resource_type = std::views::all_t<resource_t>;
//!\brief The iterator type over the wrapped resource.
73  using resource_iterator_type = std::ranges::iterator_t<resource_type>;
75 
//!\brief A bucket stores the results that a single algorithm invocation hands to its callback.
79  using bucket_type = std::vector<algorithm_result_t>;
//!\brief The iterator type over the results inside one bucket.
82  using bucket_iterator_type = std::ranges::iterator_t<bucket_type>;
//!\brief The buffer is the list of buckets; one bucket is assigned per processed resource element.
84  using buffer_type = std::vector<bucket_type>;
//!\brief The iterator type over the buckets of the buffer.
86  using buffer_iterator_type = std::ranges::iterator_t<buffer_type>;
88 
/*!\brief The state of the result buffer as reported by fill_buffer().
 *
 * \details
 * Declared as a scoped enumeration so that the enumerators neither leak into the class scope nor convert
 * implicitly to integers; all uses inside the class are already written as `fill_status::...`.
 */
enum class fill_status
{
    non_empty_buffer, //!< At least one buffered result is available.
    empty_buffer,     //!< The algorithm was executed but no result was produced for the processed elements.
    end_of_resource   //!< Nothing is buffered and the resource has been consumed completely.
};
96 
97 public:
102  algorithm_executor_blocking() = delete;
105  algorithm_executor_blocking(algorithm_executor_blocking const &) = delete;
106 
124  algorithm_executor_blocking(algorithm_executor_blocking && other) noexcept
125  {
126  move_initialise(std::move(other));
127  }
128 
130  algorithm_executor_blocking & operator=(algorithm_executor_blocking const &) = delete;
131 
134  algorithm_executor_blocking & operator=(algorithm_executor_blocking && other)
135  {
136  move_initialise(std::move(other));
137  return *this;
138  }
139 
141  ~algorithm_executor_blocking() = default;
142 
159  algorithm_executor_blocking(resource_t resource,
160  algorithm_t algorithm,
161  algorithm_result_t const SEQAN3_DOXYGEN_ONLY(result) = algorithm_result_t{},
162  execution_handler_t && exec_handler = execution_handler_t{}) :
163  exec_handler{std::move(exec_handler)},
164  resource{std::views::all(resource)},
165  resource_it{std::ranges::begin(this->resource)},
166  algorithm{std::move(algorithm)}
167  {
168  if constexpr (std::same_as<execution_handler_t, execution_handler_parallel>)
169  buffer_size = static_cast<size_t>(std::ranges::distance(resource));
170 
171  buffer.resize(buffer_size);
172  buffer_it = buffer.end();
173  buffer_end_it = buffer_it;
174  }
176 
// NOTE(review): the declarator of this member function is not part of this listing. Judging from the return
// statements below, the body yields a std::optional<algorithm_result_t> - confirm against the original header.
//
// Summary of the body: the buffer is refilled until a result is available or the resource is exhausted. In the
// latter case an empty optional is returned; otherwise the current result is moved out of its bucket, the
// executor advances to the next buffered result and the moved-out value is returned.
193  {
194  fill_status status;
195  // Each invocation of the algorithm might produce zero results (e.g. a search might not find a query)
196  // this repeats the algorithm until it produces the first result or the input resource was consumed.
197  do { status = fill_buffer(); } while (status == fill_status::empty_buffer);
198 
// Nothing buffered and nothing left to process: signal the end with an empty optional.
199  if (status == fill_status::end_of_resource)
200  return {std::nullopt};
201 
// From here on bucket_it must refer to a valid result inside the current bucket.
202  assert(status == fill_status::non_empty_buffer);
203  assert(bucket_it != buffer_it->end());
204 
// Move the result out of the bucket before the iterators are advanced.
205  std::optional<algorithm_result_t> result = std::ranges::iter_move(bucket_it);
206  go_to_next_result(); // Go to next buffered result.
207  return result;
208  }
209 
211  bool is_eof() noexcept
212  {
213  return resource_it == std::ranges::end(resource);
214  }
215 
216 private:
218  fill_status fill_buffer()
219  {
220  if (!is_buffer_empty()) // Not everything consumed yet.
221  return fill_status::non_empty_buffer;
222 
223  if (is_eof()) // Case: reached end of resource.
224  return fill_status::end_of_resource;
225 
226  // Reset the buckets and the buffer iterator.
227  reset_buffer();
228 
229  // Execute the algorithm (possibly asynchronous) and fill the buckets in this pre-assigned order.
230  for (buffer_end_it = buffer_it; buffer_end_it != buffer.end() && !is_eof(); ++buffer_end_it, ++resource_it)
231  {
232  exec_handler.execute(algorithm, *resource_it, [target_buffer_it = buffer_end_it] (auto && algorithm_result)
233  {
234  target_buffer_it->push_back(std::move(algorithm_result));
235  });
236  }
237 
238  exec_handler.wait();
239 
240  // Move the results iterator to the next available result. (This skips empty results of the algorithm)
241  find_next_non_empty_bucket();
242 
243  if (is_buffer_empty())
244  return fill_status::empty_buffer;
245 
246  return fill_status::non_empty_buffer;
247  }
248 
252  bool is_buffer_empty() const
253  {
254  return buffer_it == buffer_end_it;
255  }
256 
265  void reset_buffer()
266  {
267  // Clear all buckets
268  for (auto & bucket : buffer)
269  bucket.clear();
270 
271  // Reset the iterator over the buckets.
272  buffer_it = buffer.begin();
273  }
274 
283  void find_next_non_empty_bucket()
284  {
285  assert(buffer_it <= buffer_end_it);
286  // find first buffered bucket that contains at least one element
287  buffer_it = std::find_if(buffer_it, buffer_end_it, [] (auto const & buffer)
288  {
289  return !buffer.empty();
290  });
291 
292  if (buffer_it != buffer_end_it)
293  bucket_it = buffer_it->begin();
294  }
295 
303  void go_to_next_result()
304  {
305  if (++bucket_it == buffer_it->end())
306  {
307  ++buffer_it;
308  find_next_non_empty_bucket();
309  }
310  }
311 
314  void move_initialise(algorithm_executor_blocking && other) noexcept
315  {
316  algorithm = std::move(other.algorithm);
317  buffer_size = std::move(other.buffer_size);
318  // Get the old resource position.
319  auto old_resource_position = std::ranges::distance(std::ranges::begin(other.resource),
320  other.resource_it);
321  // Move the resource and set the iterator state accordingly.
322  resource = std::move(other.resource);
323  resource_it = std::ranges::next(std::ranges::begin(resource), old_resource_position);
324 
325  // Get the old buffer and bucket iterator positions.
326  auto buffer_it_position = other.buffer_it - other.buffer.begin();
327  auto buffer_end_it_position = other.buffer_end_it - other.buffer.begin();
328 
329  std::ptrdiff_t bucket_it_position = 0;
330  if (buffer_it_position != buffer_end_it_position)
331  bucket_it_position = other.bucket_it - other.buffer_it->begin();
332 
333  // Move the buffer and set the buffer and bucket iterator accordingly.
334  buffer = std::move(other.buffer);
335  buffer_it = buffer.begin() + buffer_it_position;
336  buffer_end_it = buffer.begin() + buffer_end_it_position;
337 
338  if (buffer_it_position != buffer_end_it_position)
339  bucket_it = buffer_it->begin() + bucket_it_position;
340  }
341 
//!\brief The handler through which the algorithm is executed on the resource elements.
343  execution_handler_t exec_handler{};
344 
//!\brief The view over the underlying resource.
346  resource_type resource{};
//!\brief The iterator to the next resource element that is passed to the algorithm.
348  resource_iterator_type resource_it{};
//!\brief The algorithm that is invoked on the resource elements.
350  algorithm_t algorithm{};
351 
//!\brief The buffer of buckets that collect the results of the algorithm invocations.
353  buffer_type buffer{};
//!\brief The iterator to the bucket that is currently consumed.
355  buffer_iterator_type buffer_it{};
//!\brief The iterator behind the last bucket that was assigned during the most recent fill.
357  buffer_iterator_type buffer_end_it{};
//!\brief The iterator to the next result inside the current bucket; only set while the buffer is not empty.
359  bucket_iterator_type bucket_it{};
//!\brief The number of buckets: 1 by default, the number of resource elements for the parallel handler.
361  size_t buffer_size{1};
362 };
363 
//!\brief Deduces the resource, algorithm and result types from the first three constructor arguments and
//!       selects seqan3::detail::execution_handler_sequential as the execution handler.
369 template <typename resource_rng_t, std::semiregular algorithm_t, std::semiregular algorithm_result_t>
371 algorithm_executor_blocking(resource_rng_t &&, algorithm_t, algorithm_result_t const &) ->
372  algorithm_executor_blocking<resource_rng_t, algorithm_t, algorithm_result_t, execution_handler_sequential>;
374 } // namespace seqan3::detail
functional
std::vector
std::find_if
T find_if(T... args)
execution_handler_parallel.hpp
Provides seqan3::detail::execution_handler_parallel.
std::function
execution.hpp
Provides execution policies.
seqan3::views::move
auto const move
A view that turns lvalue-references into rvalue-references.
Definition: move.hpp:68
std::experimental::filesystem::status
T status(T... args)
ranges
Adaptations of concepts from the Ranges TS.
std::ranges::begin
T begin(T... args)
seqan3::is_eof
constexpr auto is_eof
Checks whether a given letter is equal to the EOF constant defined in <cstdio>.
Definition: predicate.hpp:95
std::ptrdiff_t
optional
execution_handler_sequential.hpp
Provides seqan3::detail::execution_handler_sequential.