19 #include <type_traits>
26 namespace seqan3::detail
51 class execution_handler_parallel
// Constructor taking the number of worker entries to add to the pool.
// The member initializer allocates a fresh `internal_state` with std::make_unique and stores it in `state`.
70 execution_handler_parallel(
size_t const thread_count) : state{std::make_unique<internal_state>()}
// Raw pointer to the queue that lives inside `*state`. The worker lambda below captures this pointer
// by value instead of capturing `this`.
// NOTE(review): presumably so the workers stay valid when the handler object itself is moved — confirm.
72 auto * q = &(state->queue);
// One iteration per requested worker.
73 for (
size_t i = 0; i < thread_count; ++i)
// Appends one pool element, constructed from the worker lambda.
75 state->thread_pool.emplace_back([q] ()
// Pops from the queue into `task` via wait_pop and tests whether the queue reported `closed`.
// NOTE(review): the declaration of `task`, the statement guarded by this check and the rest of the
// lambda body are not visible in this excerpt.
80 if (q->wait_pop(task) == contrib::queue_op_status::closed)
// Copy construction is disabled.
93 execution_handler_parallel(execution_handler_parallel
const &) =
delete;
// Move construction is compiler-generated (member-wise move).
94 execution_handler_parallel(execution_handler_parallel &&) =
default;
// Copy assignment is disabled.
95 execution_handler_parallel & operator=(execution_handler_parallel
const &) =
delete;
// Move assignment is compiler-generated (member-wise move).
// NOTE(review): a defaulted move assignment replaces the target's `state` without first closing the
// queue or visiting `thread_pool` of the state being replaced — confirm that assigning onto a handler
// that still has active workers is safe or cannot happen.
96 execution_handler_parallel & operator=(execution_handler_parallel &&) =
default;
// Destructor. It only acts when `state` is non-null.
// NOTE(review): `state` can presumably be null after the object was moved from (the move operations
// are defaulted) — confirm. The statement guarded by this check is not visible in this excerpt.
99 ~execution_handler_parallel()
101 if (state !=
nullptr)
// Template parameters: a copy-constructible algorithm, an arbitrary input type and a copy-constructible
// callback.
124 template <std::copy_constructible algorithm_t,
125 typename algorithm_input_t,
126 std::copy_constructible callback_t>
// Constraints: the algorithm must be invocable with (input, callback), and the input must either be an
// lvalue reference or be move-constructible.
128 requires std::invocable<algorithm_t, algorithm_input_t, callback_t> &&
129 (std::is_lvalue_reference_v<algorithm_input_t> || std::move_constructible<algorithm_input_t>)
// Pushes a `task` onto the queue held in `state`.
// NOTE(review): presumably the task wraps the algorithm call shown below — the lines that define
// `input_tpl` and `task` are not visible in this excerpt.
131 void execute(algorithm_t && algorithm, algorithm_input_t && input, callback_t && callback)
// Precondition, checked only when assertions are enabled: the handler still owns its state.
133 assert(state !=
nullptr);
// Recover the exact type of the tuple's first element so that the stored input is forwarded to the
// algorithm with that type (reference or value); the callback is passed on as an rvalue.
149 using forward_input_t = std::tuple_element_t<0, decltype(input_tpl)>;
150 algorithm(std::forward<forward_input_t>(std::get<0>(input_tpl)),
std::move(callback));
// Hand the task over to the queue. `status` is only read by the assert below, hence [[maybe_unused]]
// to avoid an unused-variable warning when assertions are compiled out.
153 [[maybe_unused]] contrib::queue_op_status
status = state->queue.wait_push(
std::move(task));
// The push is expected to succeed; any other status is treated as a programming error.
154 assert(status == contrib::queue_op_status::success);
// Template parameters: a copy-constructible algorithm, an input range and a copy-constructible callback.
173 template <std::copy_constructible algorithm_t,
174 std::ranges::input_range algorithm_input_range_t,
175 std::copy_constructible callback_t>
// Constraint: the algorithm must be invocable with the range's reference type and the callback.
177 requires std::invocable<algorithm_t, std::ranges::range_reference_t<algorithm_input_range_t>, callback_t>
// Calls execute() once for every element of `input_range`.
// NOTE(review): anything that follows the loop is not visible in this excerpt.
179 void bulk_execute(algorithm_t && algorithm, algorithm_input_range_t && input_range, callback_t && callback)
// `algorithm` and `callback` are passed as lvalues (not forwarded) so they remain usable for the next
// element; only the current element is forwarded with its own value category.
181 for (
auto && input : input_range)
182 execute(algorithm,
std::forward<decltype(input)>(input), callback);
// NOTE(review): the signature of the function these statements belong to is not visible in this excerpt.
// Precondition, checked only when assertions are enabled: `state` is non-null.
190 assert(state !=
nullptr);
// Check of the `is_waiting` flag.
// NOTE(review): presumably the statements below run only when the flag is still false, but the braces
// that delimit this branch are not visible here — confirm.
192 if (!state->is_waiting)
// Record that shutdown has been started.
194 state->is_waiting =
true;
// Close the queue; the worker lambda created in the constructor tests wait_pop for the `closed` status.
195 state->queue.close();
// Visit every element of the pool by reference.
// NOTE(review): the loop body is not visible in this excerpt.
197 for (
auto & t : state->thread_pool)
// State of the handler that is allocated with std::make_unique in the constructor and accessed through
// the `state` pointer.
// NOTE(review): the declaration of the `thread_pool` member used elsewhere is not visible in this excerpt.
207 struct internal_state
// Task queue, constructed with the argument 10000.
// NOTE(review): presumably the fixed buffer capacity — confirm against contrib::fixed_buffer_queue.
212 contrib::fixed_buffer_queue<task_type> queue{10000};
// Starts out false; set to true right before the queue is closed.
// NOTE(review): plain bool without visible synchronization — confirm it is only touched from one thread.
214 bool is_waiting{
false};