SeqAn3  3.0.1
The Modern C++ library for sequence analysis.
buffer_queue.hpp
Go to the documentation of this file.
1 // -----------------------------------------------------------------------------------------------------
2 // Copyright (c) 2006-2020, Knut Reinert & Freie Universität Berlin
3 // Copyright (c) 2016-2020, Knut Reinert & MPI für molekulare Genetik
4 // This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License
5 // shipped with this file and also available at: https://github.com/seqan/seqan3/blob/master/LICENSE.md
6 // -----------------------------------------------------------------------------------------------------
7 
13 #pragma once
14 
15 #include <atomic>
16 #include <cmath>
17 #include <mutex>
18 #include <new>
19 #include <shared_mutex>
20 #include <type_traits>
21 #include <vector>
22 
27 #include <seqan3/std/algorithm>
28 #include <seqan3/std/concepts>
29 #include <seqan3/std/new>
30 #include <seqan3/std/ranges>
31 #include <seqan3/std/span>
32 
33 namespace seqan3::contrib
34 {
35 
// Status of a queue operation; returned by try_push/try_pop/wait_push/wait_pop
// and thrown (by value) from push/value_pop when the queue has been closed.
enum class queue_op_status : uint8_t
{
    success = 0, // The operation completed.
    empty,       // The queue was empty, nothing could be popped.
    full,        // The queue was full, nothing could be pushed.
    closed       // The queue was closed; no further pushes are accepted.
};
44 
// Policy selecting whether a buffer_queue may grow when it is full.
enum struct buffer_queue_policy : uint8_t
{
    fixed,  // Capacity is fixed at construction; push on a full queue fails/waits.
    dynamic // Queue grows by one slot on overflow (see buffer_queue::overflow).
};
50 
51 // Ringbuffer implementation:
52 // The underlying buffer has size (number of actual elements + 1). This is a trick to easily check if the queue is empty
53 // or full. Furthermore, the ring buffer uses 4 pointers. The actual push_back and pop_front position as well as
54 // the pending push_back and pop_front position. The latter indicates the position that have been advanced concurrently
55 // by multiple threads from either end.
56 //
57 // head: position to read/extract from the queue (first inserted element) => pop_front_position
58 // tail: position where to write to/push new elements to the queue => push_back_position
59 // head_read: The actual position after x threads concurrently popped from the queue. => pending_pop_front_position
60 // tail_write: The actual position after x threads concurrently pushed to the queue. => pending_push_back_position
61 // [ ? ] [ 4 ] [ 3 ] [ 8 ] [ 0 ] [ x ] [ ? ]
62 // | ^
63 // v |
64 // head headRead tail tailWrite
65 //
66 // valid buffer between [headRead, tail)
67 // currently filled [tail, tailWrite)
68 // currently removed [head, headRead)
69 //
70 // State: empty = (head == tail)
71 // [ ? ] [ ? ] [ ? ] [ ? ] [ ? ] [ ? ] [ ? ]
72 // tail
73 // head
74 // The head is on the same position as tail.
75 // This means that currently no element is in the buffer.
76 
77 // State: full = (tail + 1 == head)
78 // [ 2 ] [ 4 ] [ 3 ] [ ? ] [ 8 ] [ 0 ] [ 7 ]
79 // tail
80 // head
81 // The tail is one position before the head.
82 // This means that currently no element can be added to the buffer since it is full.
83 // Strategies are to either wait until some elements have been popped or to expand the capacity of the
84 // queue by one, inserting the element at the current tail position and moving all elements starting from head one
85 // position to the right.
86 
87 template <std::semiregular value_t,
89  buffer_queue_policy buffer_policy = buffer_queue_policy::dynamic>
90 class buffer_queue
91 {
92 public:
93 
94  using buffer_type = buffer_t;
95  using value_type = typename buffer_type::value_type;
96  using size_type = typename buffer_type::size_type;
97  using reference = void;
98  using const_reference = void;
99 
100  // Default constructor sets capacity to 1 (still empty)
101  buffer_queue() : buffer_queue{0u}
102  {}
103  buffer_queue(buffer_queue const &) = delete;
104  buffer_queue(buffer_queue &&) = delete;
105  buffer_queue & operator=(buffer_queue const &) = delete;
106  buffer_queue & operator=(buffer_queue &&) = delete;
107  ~buffer_queue() = default;
108 
109  // you can set the initial capacity here
110  explicit buffer_queue(size_type const init_capacity)
111  {
112  buffer.resize(init_capacity + 1);
113  ring_buffer_capacity = seqan3::detail::next_power_of_two(buffer.size());
114  }
115 
116  template <std::ranges::input_range range_type>
118  buffer_queue(size_type const init_capacity, range_type && r) : buffer_queue{init_capacity}
119  {
120  std::ranges::copy(r, std::ranges::begin(buffer));
121  }
122 
126  template <typename value2_t>
128  void push(value2_t && value)
129  {
130  detail::spin_delay delay{};
131 
132  for (;;)
133  {
134  auto status = try_push(std::forward<value2_t>(value));
135  if (status == queue_op_status::closed)
136  throw queue_op_status::closed;
137  else if (status == queue_op_status::success)
138  return;
139 
140  assert(status != queue_op_status::empty);
141  assert(status == queue_op_status::full);
142  delay.wait(); // pause and then try again.
143  }
144  } // throws if closed
145 
146  template <typename value2_t>
148  queue_op_status wait_push(value2_t && value)
149  {
150  detail::spin_delay delay{};
151 
152  for (;;)
153  {
154  auto status = try_push(std::forward<value2_t>(value));
155  // wait until queue is not full anymore..
156  if (status != queue_op_status::full)
157  return status;
158 
159  assert(status != queue_op_status::empty);
160  assert(status == queue_op_status::full);
161  delay.wait(); // pause and then try again.
162  }
163  }
164 
165  value_type value_pop() // throws if closed
166  {
167  detail::spin_delay delay{};
168 
169  value_type value{};
170  for (;;)
171  {
172  auto status = try_pop(value);
173 
174  if (status == queue_op_status::closed)
175  throw queue_op_status::closed;
176  else if (status == queue_op_status::success)
177  return value;
178 
179  assert(status != queue_op_status::full);
180  assert(status == queue_op_status::empty);
181  delay.wait(); // pause and then try again.
182  }
183  }
184 
185  queue_op_status wait_pop(value_type & value)
186  {
187  detail::spin_delay delay{};
188 
189  queue_op_status status;
190  for (;;)
191  {
192  status = try_pop(value);
193 
194  if (status == queue_op_status::closed || status == queue_op_status::success)
195  break;
196 
197  assert(status != queue_op_status::full);
198  assert(status == queue_op_status::empty);
199  delay.wait(); // pause and then try again.
200  }
201  return status;
202  }
204 
208  template <typename value2_t>
210  queue_op_status try_push(value2_t &&);
211 
212  queue_op_status try_pop(value_t &);
214 
218  void close() noexcept
219  {
220  closed_flag.store(true, std::memory_order_release);
221  }
222 
223  bool is_closed() const noexcept
224  {
225  return closed_flag.load(std::memory_order_acquire);
226  }
227 
228  bool is_empty() const noexcept
229  {
230  std::unique_lock write_lock(mutex);
231  return pop_front_position == push_back_position;
232  }
233 
234  bool is_full() const noexcept
235  {
236  std::unique_lock write_lock(mutex);
237  return is_ring_buffer_exhausted(pop_front_position, push_back_position);
238  }
239 
240  size_type size() const noexcept
241  {
242  std::unique_lock write_lock(mutex);
243  if (to_buffer_position(pop_front_position) <= to_buffer_position(push_back_position))
244  {
245  return to_buffer_position(push_back_position) - to_buffer_position(pop_front_position);
246  }
247  else
248  {
249  assert(buffer.size() > (to_buffer_position(pop_front_position) - to_buffer_position(push_back_position)));
250  return buffer.size() - (to_buffer_position(pop_front_position) - to_buffer_position(push_back_position));
251  }
252  }
254 private:
255 
265  constexpr bool is_ring_buffer_exhausted(size_type const from, size_type const to) const
266  {
267  assert(to <= (from + ring_buffer_capacity + 1)); // The tail cannot overwrite the head.
268 
269  return to >= from + ring_buffer_capacity;
270  }
271 
285  constexpr size_type to_buffer_position(size_type const position) const
286  {
287  return position & (ring_buffer_capacity - 1);
288  }
289 
309  size_type cyclic_increment(size_type position)
310  {
311  // invariants:
312  // - ring_buffer_capacity is a power of 2
313  // - (position % ring_buffer_capacity) is in [0, buffer.size())
314  //
315  // return the next greater position that fulfils the invariants
316  if (to_buffer_position(++position) >= buffer.size())
317  position += ring_buffer_capacity - buffer.size(); // If the position reached
318  return position;
319  }
320 
321  template <typename value2_t>
323  (buffer_policy == buffer_queue_policy::fixed)
324  bool overflow(value2_t &&)
325  {
326  return false;
327  }
328 
329  template <typename value2_t>
331  (buffer_policy == buffer_queue_policy::dynamic)
332  bool overflow(value2_t && value);
333 
335  buffer_t buffer;
338  alignas(std::hardware_destructive_interference_size) std::atomic<size_type> pending_pop_front_position{0};
340  alignas(std::hardware_destructive_interference_size) std::atomic<size_type> pending_push_back_position{0};
343 };
344 
345 // Specifies a fixed size buffer queue.
346 template <std::semiregular value_t, sequence_container buffer_t = std::vector<value_t>>
347 using fixed_buffer_queue = buffer_queue<value_t, buffer_t, buffer_queue_policy::fixed>;
348 
349 // Specifies a dynamic size buffer queue (growable).
350 template <std::semiregular value_t, sequence_container buffer_t = std::vector<value_t>>
351 using dynamic_buffer_queue = buffer_queue<value_t, buffer_t, buffer_queue_policy::dynamic>;
352 
353 // ============================================================================
354 // Metafunctions
355 // ============================================================================
356 
357 // ============================================================================
358 // Functions
359 // ============================================================================
360 
361 template <typename value_t, typename buffer_t, buffer_queue_policy buffer_policy>
362 template <typename value2_t>
364  (buffer_policy == buffer_queue_policy::dynamic)
365 inline bool buffer_queue<value_t, buffer_t, buffer_policy>::overflow(value2_t && value)
366 {
367  // try to extend capacity
368  std::unique_lock write_lock{mutex};
369 
370  size_type old_size = buffer.size();
371  size_type ring_buffer_capacity = this->ring_buffer_capacity;
372  size_type local_front = this->pop_front_position;
373  size_type local_back = this->push_back_position;
374 
375  // Expects no pending pushes or pops in unique lock.
376  assert(local_back == this->pending_push_back_position);
377  assert(local_front == this->pending_pop_front_position);
378 
379  bool valueWasAppended = false;
380 
381  // did we reach the capacity limit (another thread could have done the upgrade already)?
382  // buffer is full if tail_pos + 1 == head_pos
383  if (is_ring_buffer_exhausted(local_front, cyclic_increment(local_back)))
384  {
385  // In case of a full queue write the value into the additional slot.
386  // Note, that the ring-buffer implementation uses one additional field which is not used except
387  // when overflow happens. This invariant is used, to simply check for the full/empty state of the queue.
388  if (old_size != 0)
389  {
390  auto it = std::ranges::begin(buffer) + to_buffer_position(local_back);
391  *it = std::forward<value2_t>(value);
392  local_back = local_front + ring_buffer_capacity;
393  valueWasAppended = true;
394  }
395 
396  assert(is_ring_buffer_exhausted(local_front, local_back));
397 
398  // get positions of head/tail in current buffer sequence
399  size_type front_buffer_position = to_buffer_position(local_front);
400  size_type back_buffer_position = to_buffer_position(local_back);
401 
402  // increase capacity by one and move all elements from current pop_front_position one to the right.
403  buffer.resize(old_size + 1);
404  ring_buffer_capacity = seqan3::detail::next_power_of_two(buffer.size());
405  std::ranges::move_backward(std::span{buffer.data() + front_buffer_position, buffer.data() + old_size},
406  buffer.data() + buffer.size());
407 
408  // Update the pop_front and push_back positions.
409  if (old_size != 0)
410  {
411  this->pending_pop_front_position = this->pop_front_position = front_buffer_position + 1;
412  this->pending_push_back_position = this->push_back_position = back_buffer_position + ring_buffer_capacity;
413  }
414  this->ring_buffer_capacity = ring_buffer_capacity;
415  }
416  return valueWasAppended;
417 }
418 
419 // ----------------------------------------------------------------------------
420 // Function try_pop()
421 // ----------------------------------------------------------------------------
422 
423 /*
424  * @fn ConcurrentQueue#tryPopFront
425  * @headerfile <seqan/parallel.h>
426  * @brief Try to dequeue a value from a queue.
427  *
428  * @signature bool tryPopFront(result, queue[, parallelTag]);
429  *
430  *
431  * @param[in,out] queue A queue.
432  * @param[out] result The dequeued value (if available).
433  * @param[in] parallelTag The concurrency scheme. If multiple threads dequeue values concurrently this tag must be
434  * @link ParallelismTags#Parallel @endlink. The more efficient @link ParallelismTags#Serial
435  * @endlink tag can only be used if one thread calls <tt>popFront</tt> at a time.
436  * Default is @link ParallelismTags#Parallel @endlink.
437  * @return bool Returns <tt>true</tt> if a value could be dequeued and <tt>false</tt> otherwise.
438  */
439 template <typename value_t, typename buffer_t, buffer_queue_policy buffer_policy>
440 inline queue_op_status buffer_queue<value_t, buffer_t, buffer_policy>::try_pop(value_t & result)
441 {
442  // try to extract a value
443  std::shared_lock read_lock{mutex};
444 
445  size_type local_pending_pop_front_position{};
446  size_type next_local_pop_front_position{};
447  detail::spin_delay spinDelay{};
448 
449  local_pending_pop_front_position = this->pending_pop_front_position;
450  // wait for queue to become filled
451  while (true)
452  {
453  size_type local_push_back_position = this->push_back_position;
454 
455  assert(local_pending_pop_front_position <= local_push_back_position);
456 
457  // Check if queue is empty
458  if (local_pending_pop_front_position == local_push_back_position)
459  {
460  return is_closed() ? queue_op_status::closed : queue_op_status::empty;
461  }
462 
463  // Get the next ring-buffer position to read from.
464  next_local_pop_front_position = cyclic_increment(local_pending_pop_front_position);
465  // Did another/other thread(s) already acquired this slot?
466  // If yes, try with next position. If not, break and read from aquired position.
467  if (this->pending_pop_front_position.compare_exchange_weak(local_pending_pop_front_position,
468  next_local_pop_front_position))
469  break;
470 
471  spinDelay.wait();
472  }
473 
474  // Store the value from the aquired read position.
475  result = std::ranges::iter_move(buffer.begin() + to_buffer_position(local_pending_pop_front_position));
476 
477  // wait for pending previous reads and synchronize pop_front_position to local_pending_pop_front_position
478  {
479  detail::spin_delay delay{};
480  size_type acquired_slot = local_pending_pop_front_position;
481  while (!this->pop_front_position.compare_exchange_weak(acquired_slot, next_local_pop_front_position))
482  {
483  acquired_slot = local_pending_pop_front_position;
484  delay.wait(); // add adapting delay in case of high contention.
485  }
486  }
487 
488  return queue_op_status::success;
489 }
490 
491 // ----------------------------------------------------------------------------
492 // Function try_push()
493 // ----------------------------------------------------------------------------
494 
495 /*
496  * @fn ConcurrentQueue#appendValue
497  * @headerfile <seqan/parallel.h>
498  * @brief Enqueue a value to a queue.
499  *
500  * @signature void appendValue(queue, val[, expandTag[, parallelTag]);
501  *
502  *
503  * @param[in,out] queue A queue.
504  * @param[in] val The value to enqueue.
505  * @param[in] expandTag The overflow strategy. If @link OverflowStrategyTags#Generous @endlink the queue will be
506  * automatically resized if the capacity is exceeded, otherwise the thread spinlocks until
507  * the element can be enqueued.
508  * Default is the @link DefaultOverflowImplicit @endlink result for the <tt>queue</tt> type.
509  * @param[in] parallelTag The concurrency scheme. If multiple threads enqueue values concurrently this tag must be
510  * @link ParallelismTags#Parallel @endlink. The more efficient @link ParallelismTags#Serial
511  * @endlink tag can only be used if one thread calls <tt>appendValue</tt> at a time.
512  * Default is @link ParallelismTags#Parallel @endlink.
513  */
514 template <typename value_t, typename buffer_t, buffer_queue_policy buffer_policy>
515 template <typename value2_t>
517 inline queue_op_status buffer_queue<value_t, buffer_t, buffer_policy>::try_push(value2_t && value)
518 {
519  // try to push the value
520  {
521  detail::spin_delay delay{};
522 
523  std::shared_lock read_lock(mutex);
524 
525  if (is_closed())
526  return queue_op_status::closed;
527 
528  // Current up to date position to push an element to
529  size_type local_pending_push_back_position = this->pending_push_back_position;
530 
531  while (true)
532  {
533  // Get the next potential position to write the value too.
534  size_type next_local_push_back_position = cyclic_increment(local_pending_push_back_position);
535  size_type local_pop_front_position = this->pop_front_position;
536 
537  // Check if there are enough slots to write to.
538  // If not either wait or try to overflow if it is a dynamic queue.
539  if (is_ring_buffer_exhausted(local_pop_front_position, next_local_push_back_position))
540  break;
541 
542  // Did another/other thread(s) acquired the current pending position before this thread
543  // If yes, try again if not, write into acquired slot.
544  if (this->pending_push_back_position.compare_exchange_weak(local_pending_push_back_position,
545  next_local_push_back_position))
546  {
547  // Current thread acquired the local_pending_push_back_position and can now write the value into the
548  // proper slot of the ring buffer.
549  auto it = std::ranges::begin(buffer) + to_buffer_position(local_pending_push_back_position);
550  *it = std::forward<value2_t>(value);
551 
552  // wait for pending previous writes and synchronise push_back_position to
553  // local_pending_push_back_position
554  {
555  detail::spin_delay delay{};
556  // the slot this thread acquired to write to
557  size_type acquired_slot = local_pending_push_back_position;
558  while (!this->push_back_position.compare_exchange_weak(acquired_slot,
559  next_local_push_back_position))
560  {
561  acquired_slot = local_pending_push_back_position;
562  delay.wait();
563  }
564  }
565  return queue_op_status::success;
566  }
567 
568  delay.wait();
569  }
570  }
571 
572  // if possible extend capacity and return.
573  if (overflow(std::forward<value2_t>(value)))
574  {
575  return queue_op_status::success; // always return success, since the queue resizes and cannot be full.
576  }
577 
578  // We could not extend the queue so it must be full.
579  return queue_op_status::full;
580 }
582 } // namespace seqan3::contrib
bit_manipulation.hpp
Provides utility functions for bit twiddling.
span
Provides std::span from the C++20 standard library.
sequence_container
A more refined container concept than seqan3::container.
semiregular
Subsumes std::copyable and std::default_constructible.
vector
std::hardware_destructive_interference_size
constexpr std::size_t hardware_destructive_interference_size
Minimum offset between two objects to avoid false sharing.
Definition: new:34
concept.hpp
Adaptations of concepts from the standard library.
new
Provides C++17/20 additions to the <new> header, if they are not already available.
cmath
algorithm
Adaptations of algorithms from the Ranges TS.
concepts
The Concepts library.
range.hpp
Provides various transformation traits used by the range module.
std::unique_lock
convertible_to
The concept std::convertible_to<From, To> specifies that an expression of the type and value category...
atomic
std::experimental::filesystem::status
T status(T... args)
seqan3::pack_traits::size
constexpr size_t size
The size of a type pack.
Definition: traits.hpp:116
ranges
Adaptations of concepts from the Ranges TS.
std::ranges::begin
T begin(T... args)
spin_delay.hpp
Provides seqan3::detail::spin_delay.
std::experimental::filesystem::is_empty
T is_empty(T... args)
std::fixed
T fixed(T... args)
std::empty
T empty(T... args)
mutex
shared_mutex
std::span::data
T data(T... args)
std::shared_lock