17 #include <type_traits>
27 namespace seqan3::detail
// Execution handler that keeps its working data behind a `state` pointer: the constructor
// fills `state->thread_pool` with workers that pop tasks from `state->queue`, and `execute`
// pushes new tasks onto that same queue.
45 class execution_handler_parallel
// Constructs the handler and registers `thread_count` workers.
// The internal state is allocated with std::make_unique, then one entry is added to
// `state->thread_pool` per loop iteration.
64 execution_handler_parallel(
size_t const thread_count) : state{std::make_unique<internal_state>()}
// The workers capture a raw pointer to the queue instead of `this`. The queue lives inside the
// separately allocated state object, so its address presumably stays valid when the handler
// itself is moved (the move operations are defaulted) - confirm.
66 auto * q = &(state->queue);
67 for (
size_t i = 0; i < thread_count; ++i)
69 state->thread_pool.emplace_back([q] ()
// Worker side: pop the next task from the queue and compare the returned status to `closed`.
// NOTE(review): the declaration of `task`, the reaction to `closed`, and the invocation of
// the task are not part of this excerpt - presumably the worker stops once the queue is closed.
74 if (q->wait_pop(task) == contrib::queue_op_status::closed)
// Copy construction is disabled.
87 execution_handler_parallel(execution_handler_parallel
const &) =
delete;
// Move construction uses the compiler-generated member-wise move.
88 execution_handler_parallel(execution_handler_parallel &&) =
default;
// Copy assignment is disabled.
89 execution_handler_parallel & operator=(execution_handler_parallel
const &) =
delete;
// Move assignment uses the compiler-generated member-wise move.
90 execution_handler_parallel & operator=(execution_handler_parallel &&) =
default;
// Destructor.
// NOTE(review): the body is not part of this excerpt; since copy/move are explicitly
// declared above, it presumably performs the shutdown of the worker pool - confirm.
93 ~execution_handler_parallel()
// Packages one invocation of `algorithm` on `indexed_sequence_pairs` as a task and pushes it
// onto the internal queue.
//
// algorithm              - callable invoked with the sequence pairs.
// indexed_sequence_pairs - taken by value (type constrained by indexed_sequence_pair_range)
//                          and moved into the task.
// delegate               - callable invoked with the result of `algorithm(...)`.
//
// Requires a non-null `state` (checked by assert only).
110 template <
typename algorithm_t, indexed_sequence_pair_range indexed_sequence_pairs_t,
typename delegate_type>
111 void execute(algorithm_t && algorithm,
112 indexed_sequence_pairs_t indexed_sequence_pairs,
113 delegate_type && delegate)
115 assert(state !=
nullptr);
// `[=]` stores copies of `algorithm` and `delegate` in the task, so the task does not refer
// back to the caller's arguments; the sequence pairs are moved into the closure.
// NOTE(review): no `mutable` is visible on this lambda. If it is indeed absent, the captured
// range is const inside the body and the std::move below would select a copy - confirm.
118 task_type task = [=, indexed_sequence_pairs =
std::move(indexed_sequence_pairs)] ()
120 delegate(algorithm(
std::move(indexed_sequence_pairs)));
// `status` is only read by the assert below, hence [[maybe_unused]] for builds where
// assert is compiled out. A non-success status is therefore not handled in such builds.
123 [[maybe_unused]] contrib::queue_op_status
status = state->queue.wait_push(
std::move(task));
124 assert(status == contrib::queue_op_status::success);
// Shutdown sequence. NOTE(review): the signature line of this routine is not part of this
// excerpt. Requires a non-null `state` (checked by assert only).
130 assert(state !=
nullptr);
// The `is_waiting` flag makes the shutdown run at most once; later calls skip the block.
132 if (!state->is_waiting)
134 state->is_waiting =
true;
// Closing the queue is what lets the workers observe the `closed` status from wait_pop.
135 state->queue.close();
// Visit every entry of the pool.
// NOTE(review): the loop body is not visible here - presumably each worker is joined.
137 for (
auto & t : state->thread_pool)
// Bundles the data the handler accesses through its `state` pointer.
// NOTE(review): `thread_pool` is used elsewhere via `state->thread_pool`, but its declaration
// is not part of this excerpt.
147 struct internal_state
// Queue of pending tasks, constructed with the argument 10000
// (presumably the buffer capacity - confirm against contrib::fixed_buffer_queue).
152 contrib::fixed_buffer_queue<task_type> queue{10000};
// Set to true once the shutdown sequence has started; initially false.
154 bool is_waiting{
false};