SeqAn3  3.0.2
The Modern C++ library for sequence analysis.
execution_handler_parallel.hpp
Go to the documentation of this file.
1 // -----------------------------------------------------------------------------------------------------
2 // Copyright (c) 2006-2020, Knut Reinert & Freie Universität Berlin
3 // Copyright (c) 2016-2020, Knut Reinert & MPI für molekulare Genetik
4 // This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License
5 // shipped with this file and also available at: https://github.com/seqan/seqan3/blob/master/LICENSE.md
6 // -----------------------------------------------------------------------------------------------------
7 
13 #pragma once
14 
15 #include <seqan3/std/concepts>
16 #include <functional>
17 #include <seqan3/std/ranges>
18 #include <thread>
19 #include <type_traits>
20 #include <vector>
21 
25 
26 namespace seqan3::detail
27 {
28 
51 class execution_handler_parallel
52 {
53 private:
55  using task_type = std::function<void()>;
56 
57 public:
70  execution_handler_parallel(size_t const thread_count) : state{std::make_unique<internal_state>()}
71  {
72  auto * q = &(state->queue);
73  for (size_t i = 0; i < thread_count; ++i)
74  {
75  state->thread_pool.emplace_back([q] ()
76  {
77  for (;;)
78  {
79  task_type task;
80  if (q->wait_pop(task) == contrib::queue_op_status::closed)
81  return;
82 
83  task();
84  }
85  });
86  }
87  }
88 
90  execution_handler_parallel() : execution_handler_parallel{std::thread::hardware_concurrency()}
91  {}
92 
93  execution_handler_parallel(execution_handler_parallel const &) = delete;
94  execution_handler_parallel(execution_handler_parallel &&) = default;
95  execution_handler_parallel & operator=(execution_handler_parallel const &) = delete;
96  execution_handler_parallel & operator=(execution_handler_parallel &&) = default;
97 
99  ~execution_handler_parallel()
100  {
101  if (state != nullptr)
102  wait();
103  }
105 
124  template <std::copy_constructible algorithm_t,
125  typename algorithm_input_t,
126  std::copy_constructible callback_t>
128  requires std::invocable<algorithm_t, algorithm_input_t, callback_t> &&
129  (std::is_lvalue_reference_v<algorithm_input_t> || std::move_constructible<algorithm_input_t>)
131  void execute(algorithm_t && algorithm, algorithm_input_t && input, callback_t && callback)
132  {
133  assert(state != nullptr);
134 
135  // Note: Unfortunately, we can't use std::forward_as_tuple here because a std::function object (`task_type`)
136  // cannot be constructed if the tuple element type is a rvalue-reference.
137  // So we capture the input as a `tuple<algorithm_input_t>` which either is a lvalue reference or has no
138  // reference type according to the reference collapsing rules of forwarding references.
139  // Then we forward the input into the tuple which either just stores the reference or the input is moved into
140  // the tuple. When the task is executed by some thread the stored input will either be forwarded as a
141  // lvalue-reference to the algorithm or the input is moved into the algorithm from the tuple. This is valid
142  // since the task is executed only once by the parallel execution handler.
143  // Here is a discussion about the problem on stackoverflow:
144  // https://stackoverflow.com/questions/26831382/capturing-perfectly-forwarded-variable-in-lambda/
145 
146  // Asynchronously pushes the algorithm job as a task to the queue.
147  task_type task = [=, input_tpl = std::tuple<algorithm_input_t>{std::forward<algorithm_input_t>(input)}] ()
148  {
149  using forward_input_t = std::tuple_element_t<0, decltype(input_tpl)>;
150  algorithm(std::forward<forward_input_t>(std::get<0>(input_tpl)), std::move(callback));
151  };
152 
153  [[maybe_unused]] contrib::queue_op_status status = state->queue.wait_push(std::move(task));
154  assert(status == contrib::queue_op_status::success);
155  }
156 
173  template <std::copy_constructible algorithm_t,
174  std::ranges::input_range algorithm_input_range_t,
175  std::copy_constructible callback_t>
177  requires std::invocable<algorithm_t, std::ranges::range_reference_t<algorithm_input_range_t>, callback_t>
179  void bulk_execute(algorithm_t && algorithm, algorithm_input_range_t && input_range, callback_t && callback)
180  {
181  for (auto && input : input_range)
182  execute(algorithm, std::forward<decltype(input)>(input), callback);
183 
184  wait();
185  }
186 
188  void wait()
189  {
190  assert(state != nullptr);
191 
192  if (!state->is_waiting)
193  {
194  state->is_waiting = true;
195  state->queue.close();
196 
197  for (auto & t : state->thread_pool)
198  {
199  if (t.joinable())
200  t.join();
201  }
202  }
203  }
204 
205 private:
207  struct internal_state
208  {
210  std::vector<std::thread> thread_pool{};
212  contrib::fixed_buffer_queue<task_type> queue{10000};
214  bool is_waiting{false};
215  };
216 
218  std::unique_ptr<internal_state> state{nullptr};
219 };
220 
221 } // namespace seqan3::detail
reader_writer_manager.hpp
Provides seqan3::detail::reader_writer_manager.
functional
buffer_queue.hpp
Provides seqan3::buffer_queue.
vector
basic.hpp
Provides various type traits on generic types.
std::tuple
std::function
concepts
The Concepts library.
thread
seqan3::views::move
auto const move
A view that turns lvalue-references into rvalue-references.
Definition: move.hpp:68
std::forward
T forward(T... args)
std::thread::hardware_concurrency
T hardware_concurrency(T... args)
std::experimental::filesystem::status
T status(T... args)
ranges
Adaptations of concepts from the Ranges TS.
std::unique_ptr