SeqAn3 3.4.0-rc.1
The Modern C++ library for sequence analysis.
Loading...
Searching...
No Matches
execution_handler_parallel.hpp
Go to the documentation of this file.
1// SPDX-FileCopyrightText: 2006-2024 Knut Reinert & Freie Universität Berlin
2// SPDX-FileCopyrightText: 2016-2024 Knut Reinert & MPI für molekulare Genetik
3// SPDX-License-Identifier: BSD-3-Clause
4
10#pragma once
11
12#include <concepts>
13#include <functional>
14#include <ranges>
15#include <thread>
16#include <type_traits>
17#include <vector>
18
20#include <seqan3/utility/parallel/detail/reader_writer_manager.hpp>
22
23namespace seqan3::detail
24{
25
51class execution_handler_parallel
52{
53private:
55 using task_type = std::function<void()>;
56
57public:
70 execution_handler_parallel(size_t const thread_count) : state{std::make_unique<internal_state>()}
71 {
72 auto * q = &(state->queue);
73 for (size_t i = 0; i < thread_count; ++i)
74 {
75 state->thread_pool.emplace_back(
76 [q]()
77 {
78 for (;;)
79 {
80 task_type task;
81 if (q->wait_pop(task) == contrib::queue_op_status::closed)
82 return;
83
84 task();
85 }
86 });
87 }
88 }
89
104 execution_handler_parallel() : execution_handler_parallel{1u}
105 {}
106
107 execution_handler_parallel(execution_handler_parallel const &) = delete;
108 execution_handler_parallel(execution_handler_parallel &&) = default;
109 execution_handler_parallel & operator=(execution_handler_parallel const &) = delete;
110 execution_handler_parallel & operator=(execution_handler_parallel &&) = default;
111 ~execution_handler_parallel() = default;
112
114
133 template <std::copy_constructible algorithm_t, typename algorithm_input_t, std::copy_constructible callback_t>
134 requires std::invocable<algorithm_t, algorithm_input_t, callback_t>
135 && (std::is_lvalue_reference_v<algorithm_input_t> || std::move_constructible<algorithm_input_t>)
136 void execute(algorithm_t && algorithm, algorithm_input_t && input, callback_t && callback)
137 {
138 assert(state != nullptr);
139
140 // Note: Unfortunately, we can't use std::forward_as_tuple here because a std::function object (`task_type`)
141 // cannot be constructed if the tuple element type is a rvalue-reference.
142 // So we capture the input as a `tuple<algorithm_input_t>` which either is a lvalue reference or has no
143 // reference type according to the reference collapsing rules of forwarding references.
144 // Then we forward the input into the tuple which either just stores the reference or the input is moved into
145 // the tuple. When the task is executed by some thread the stored input will either be forwarded as a
146 // lvalue-reference to the algorithm or the input is moved into the algorithm from the tuple. This is valid
147 // since the task is executed only once by the parallel execution handler.
148 // Here is a discussion about the problem on stackoverflow:
149 // https://stackoverflow.com/questions/26831382/capturing-perfectly-forwarded-variable-in-lambda/
150
151 // Asynchronously pushes the algorithm job as a task to the queue.
152 // Note: that lambda is mutable, s.t. we can move out the content of input_tpl
153 task_type task =
154 [=, input_tpl = std::tuple<algorithm_input_t>{std::forward<algorithm_input_t>(input)}]() mutable
155 {
156 using forward_input_t = std::tuple_element_t<0, decltype(input_tpl)>;
157 algorithm(std::forward<forward_input_t>(std::get<0>(input_tpl)), std::move(callback));
158 };
159
160 [[maybe_unused]] contrib::queue_op_status status = state->queue.wait_push(std::move(task));
161 assert(status == contrib::queue_op_status::success);
162 }
163
180 template <std::copy_constructible algorithm_t,
181 std::ranges::input_range algorithm_input_range_t,
182 std::copy_constructible callback_t>
183 requires std::invocable<algorithm_t, std::ranges::range_reference_t<algorithm_input_range_t>, callback_t>
184 void bulk_execute(algorithm_t && algorithm, algorithm_input_range_t && input_range, callback_t && callback)
185 {
186 for (auto && input : input_range)
187 execute(algorithm, std::forward<decltype(input)>(input), callback);
188
189 wait();
190 }
191
193 void wait()
194 {
195 assert(state != nullptr);
196
197 state->stop_and_wait();
198 }
199
200private:
209 class internal_state
210 {
211 public:
216 internal_state() = default;
217 internal_state(internal_state const &) = delete;
218 internal_state(internal_state &&) = delete;
219 internal_state & operator=(internal_state const &) = delete;
220 internal_state & operator=(internal_state &&) = delete;
221
223 ~internal_state()
224 {
225 stop_and_wait();
226 }
228
237 void stop_and_wait()
238 {
239 queue.close();
240
241 for (auto & t : thread_pool)
242 {
243 if (t.joinable())
244 t.join();
245 }
246 }
247
249 std::vector<std::thread> thread_pool{};
251 contrib::fixed_buffer_queue<task_type> queue{10000};
252 };
253
255 std::unique_ptr<internal_state> state{nullptr};
256};
257
258} // namespace seqan3::detail
Provides various type traits on generic types.
Provides seqan3::buffer_queue.
T forward(T... args)
T make_unique(T... args)
SeqAn specific customisations in the standard namespace.
Hide me