SeqAn3 3.2.0
The Modern C++ library for sequence analysis.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
execution_handler_parallel.hpp
Go to the documentation of this file.
1// -----------------------------------------------------------------------------------------------------
2// Copyright (c) 2006-2022, Knut Reinert & Freie Universität Berlin
3// Copyright (c) 2016-2022, Knut Reinert & MPI für molekulare Genetik
4// This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License
5// shipped with this file and also available at: https://github.com/seqan/seqan3/blob/master/LICENSE.md
6// -----------------------------------------------------------------------------------------------------
7
13#pragma once
14
15#include <concepts>
16#include <functional>
17#include <ranges>
18#include <thread>
19#include <type_traits>
20#include <vector>
21
25
26namespace seqan3::detail
27{
28
54class execution_handler_parallel
55{
56private:
58 using task_type = std::function<void()>;
59
60public:
73 execution_handler_parallel(size_t const thread_count) : state{std::make_unique<internal_state>()}
74 {
75 auto * q = &(state->queue);
76 for (size_t i = 0; i < thread_count; ++i)
77 {
78 state->thread_pool.emplace_back(
79 [q]()
80 {
81 for (;;)
82 {
83 task_type task;
84 if (q->wait_pop(task) == contrib::queue_op_status::closed)
85 return;
86
87 task();
88 }
89 });
90 }
91 }
92
107 execution_handler_parallel() : execution_handler_parallel{1u}
108 {}
109
110 execution_handler_parallel(execution_handler_parallel const &) = delete;
111 execution_handler_parallel(execution_handler_parallel &&) = default;
112 execution_handler_parallel & operator=(execution_handler_parallel const &) = delete;
113 execution_handler_parallel & operator=(execution_handler_parallel &&) = default;
114 ~execution_handler_parallel() = default;
115
117
136 template <std::copy_constructible algorithm_t, typename algorithm_input_t, std::copy_constructible callback_t>
137 requires std::invocable<algorithm_t, algorithm_input_t, callback_t>
138 && (std::is_lvalue_reference_v<algorithm_input_t> || std::move_constructible<algorithm_input_t>)
139 void execute(algorithm_t && algorithm, algorithm_input_t && input, callback_t && callback)
140 {
141 assert(state != nullptr);
142
143 // Note: Unfortunately, we can't use std::forward_as_tuple here because a std::function object (`task_type`)
144 // cannot be constructed if the tuple element type is a rvalue-reference.
145 // So we capture the input as a `tuple<algorithm_input_t>` which either is a lvalue reference or has no
146 // reference type according to the reference collapsing rules of forwarding references.
147 // Then we forward the input into the tuple which either just stores the reference or the input is moved into
148 // the tuple. When the task is executed by some thread the stored input will either be forwarded as a
149 // lvalue-reference to the algorithm or the input is moved into the algorithm from the tuple. This is valid
150 // since the task is executed only once by the parallel execution handler.
151 // Here is a discussion about the problem on stackoverflow:
152 // https://stackoverflow.com/questions/26831382/capturing-perfectly-forwarded-variable-in-lambda/
153
154 // Asynchronously pushes the algorithm job as a task to the queue.
155 // Note: that lambda is mutable, s.t. we can move out the content of input_tpl
156 task_type task =
157 [=, input_tpl = std::tuple<algorithm_input_t>{std::forward<algorithm_input_t>(input)}]() mutable
158 {
159 using forward_input_t = std::tuple_element_t<0, decltype(input_tpl)>;
160 algorithm(std::forward<forward_input_t>(std::get<0>(input_tpl)), std::move(callback));
161 };
162
163 [[maybe_unused]] contrib::queue_op_status status = state->queue.wait_push(std::move(task));
164 assert(status == contrib::queue_op_status::success);
165 }
166
183 template <std::copy_constructible algorithm_t,
184 std::ranges::input_range algorithm_input_range_t,
185 std::copy_constructible callback_t>
186 requires std::invocable<algorithm_t, std::ranges::range_reference_t<algorithm_input_range_t>, callback_t>
187 void bulk_execute(algorithm_t && algorithm, algorithm_input_range_t && input_range, callback_t && callback)
188 {
189 for (auto && input : input_range)
190 execute(algorithm, std::forward<decltype(input)>(input), callback);
191
192 wait();
193 }
194
196 void wait()
197 {
198 assert(state != nullptr);
199
200 state->stop_and_wait();
201 }
202
203private:
212 class internal_state
213 {
214 public:
219 internal_state() = default;
220 internal_state(internal_state const &) = delete;
221 internal_state(internal_state &&) = default;
222 internal_state & operator=(internal_state const &) = delete;
223 internal_state & operator=(internal_state &&) = default;
224
226 ~internal_state()
227 {
228 stop_and_wait();
229 }
231
240 void stop_and_wait()
241 {
242 queue.close();
243
244 for (auto & t : thread_pool)
245 {
246 if (t.joinable())
247 t.join();
248 }
249 }
250
252 std::vector<std::thread> thread_pool{};
254 contrib::fixed_buffer_queue<task_type> queue{10000};
255 };
256
258 std::unique_ptr<internal_state> state{nullptr};
259};
260
261} // namespace seqan3::detail
Provides various type traits on generic types.
Provides seqan3::buffer_queue.
T forward(T... args)
T make_unique(T... args)
SeqAn specific customisations in the standard namespace.
Provides seqan3::detail::reader_writer_manager.