SeqAn3  3.0.1
The Modern C++ library for sequence analysis.
execution_handler_parallel.hpp
Go to the documentation of this file.
1 // -----------------------------------------------------------------------------------------------------
2 // Copyright (c) 2006-2020, Knut Reinert & Freie Universität Berlin
3 // Copyright (c) 2016-2020, Knut Reinert & MPI für molekulare Genetik
4 // This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License
5 // shipped with this file and also available at: https://github.com/seqan/seqan3/blob/master/LICENSE.md
6 // -----------------------------------------------------------------------------------------------------
7 
13 #pragma once
14 
15 #include <functional>
16 #include <thread>
17 #include <type_traits>
18 #include <vector>
19 
23 #include <seqan3/core/platform.hpp>
24 #include <seqan3/std/concepts>
25 #include <seqan3/std/ranges>
26 
27 namespace seqan3::detail
28 {
29 
45 class execution_handler_parallel
46 {
47 private:
49  using task_type = std::function<void()>;
50 
51 public:
64  execution_handler_parallel(size_t const thread_count) : state{std::make_unique<internal_state>()}
65  {
66  auto * q = &(state->queue);
67  for (size_t i = 0; i < thread_count; ++i)
68  {
69  state->thread_pool.emplace_back([q] ()
70  {
71  for (;;)
72  {
73  task_type task;
74  if (q->wait_pop(task) == contrib::queue_op_status::closed)
75  return;
76 
77  task();
78  }
79  });
80  }
81  }
82 
84  execution_handler_parallel() : execution_handler_parallel{std::thread::hardware_concurrency()}
85  {}
86 
87  execution_handler_parallel(execution_handler_parallel const &) = delete;
88  execution_handler_parallel(execution_handler_parallel &&) = default;
89  execution_handler_parallel & operator=(execution_handler_parallel const &) = delete;
90  execution_handler_parallel & operator=(execution_handler_parallel &&) = default;
91 
93  ~execution_handler_parallel()
94  {
95  if (state != nullptr)
96  wait();
97  }
99 
110  template <typename algorithm_t, indexed_sequence_pair_range indexed_sequence_pairs_t, typename delegate_type>
111  void execute(algorithm_t && algorithm,
112  indexed_sequence_pairs_t indexed_sequence_pairs,
113  delegate_type && delegate)
114  {
115  assert(state != nullptr);
116 
117  // Asynchronously pushes the alignment job as a task to the queue.
118  task_type task = [=, indexed_sequence_pairs = std::move(indexed_sequence_pairs)] ()
119  {
120  delegate(algorithm(std::move(indexed_sequence_pairs)));
121  };
122 
123  [[maybe_unused]] contrib::queue_op_status status = state->queue.wait_push(std::move(task));
124  assert(status == contrib::queue_op_status::success);
125  }
126 
128  void wait()
129  {
130  assert(state != nullptr);
131 
132  if (!state->is_waiting)
133  {
134  state->is_waiting = true;
135  state->queue.close();
136 
137  for (auto & t : state->thread_pool)
138  {
139  if (t.joinable())
140  t.join();
141  }
142  }
143  }
144 
145 private:
147  struct internal_state
148  {
150  std::vector<std::thread> thread_pool{};
152  contrib::fixed_buffer_queue<task_type> queue{10000};
154  bool is_waiting{false};
155  };
156 
158  std::unique_ptr<internal_state> state{nullptr};
159 };
160 
161 } // namespace seqan3
reader_writer_manager.hpp
Provides seqan3::detail::reader_writer_manager.
functional
buffer_queue.hpp
Provides seqan3::buffer_queue.
vector
seqan3::views::move
const auto move
A view that turns lvalue-references into rvalue-references.
Definition: move.hpp:68
std::function
concepts
The Concepts library.
thread
std::thread::hardware_concurrency
T hardware_concurrency(T... args)
concept.hpp
Provides concepts needed internally for the alignment algorithms.
std::experimental::filesystem::status
T status(T... args)
ranges
Adaptations of concepts from the Ranges TS.
platform.hpp
Provides platform and dependency checks.
std::unique_ptr