SeqAn3 3.0.3
The Modern C++ library for sequence analysis.
buffer_queue.hpp
// -----------------------------------------------------------------------------------------------------
// Copyright (c) 2006-2021, Knut Reinert & Freie Universität Berlin
// Copyright (c) 2016-2021, Knut Reinert & MPI für molekulare Genetik
// This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License
// shipped with this file and also available at: https://github.com/seqan/seqan3/blob/master/LICENSE.md
// -----------------------------------------------------------------------------------------------------

#pragma once

#include <seqan3/std/algorithm>
#include <atomic>
#include <seqan3/std/bit>
#include <cmath>
#include <seqan3/std/concepts>
#include <mutex>
#include <seqan3/std/new>
#include <seqan3/std/ranges>
#include <shared_mutex>
#include <seqan3/std/span>
#include <type_traits>
#include <vector>

// Restored includes (assumption): the extracted listing omitted a few lines here. The code below
// needs seqan3::detail::spin_delay and the seqan3::sequence_container concept; the paths follow the
// SeqAn3 3.0.x source layout. <cassert> and <exception> cover the assert()/rethrow_exception() calls
// further down.
#include <cassert>
#include <exception>
#include <seqan3/core/parallel/detail/spin_delay.hpp>
#include <seqan3/range/container/concept.hpp>

namespace seqan3::contrib
{

enum class queue_op_status : uint8_t
{
    success = 0,
    empty,
    full,
    closed
};

enum struct buffer_queue_policy : uint8_t
{
    fixed,
    dynamic
};

49 
50 // Ringbuffer implementation:
51 // The underlying buffer has size (number of actual elements + 1). This is a trick to easily check if the queue is empty
52 // or full. Furthermore, the ring buffer uses 4 pointers. The actual push_back and pop_front position as well as
53 // the pending push_back and pop_front position. The latter indicates the position that have been advanced concurrently
54 // by multiple threads from either end.
55 //
56 // head: position to read/extract from the queue (first inserted elment) => pop_front_position
57 // tail: position where to write to/push new elements to the queue => push_back_position
58 // head_read: The actual position after x threads concurrently popped from the queue. => pending_pop_front_position
59 // tail_write: The actual position after x threads concurrently pushed to the queue. => pending_push_back_position
60 // [ ? ] [ 4 ] [ 3 ] [ 8 ] [ 0 ] [ x ] [ ? ]
61 // | ^
62 // v |
63 // head headRead tail tailWrite
64 //
65 // valid buffer between [headRead, tail)
66 // currently filled [tail, tailWrite)
67 // currently removed [head, headRead)
68 //
69 // State: empty = (head == tail)
70 // [ ? ] [ ? ] [ ? ] [ ? ] [ ? ] [ ? ] [ ? ]
71 // tail
72 // head
73 // The head is on the same position as tail.
74 // This means that currently no element is in the buffer.
75 
76 // State: full = (tail + 1 == head)
77 // [ 2 ] [ 4 ] [ 3 ] [ ? ] [ 8 ] [ 0 ] [ 7 ]
78 // tail
79 // head
80 // The tail is one position before the head.
81 // This means that currently no element can be added to the buffer since it is full.
82 // Strategies are to either wait until some elements have been popped or to expand the capacity of the
83 // queue by one, inserting the element at the current tail position and moving all elements starting from head one
84 // position to the right.
85 
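// A worked example of the "one extra cell" trick above (illustrative addition, not part of the
// original file): with 3 usable slots the buffer allocates 4 cells, so the two boundary states
// stay distinguishable without a separate element counter:
//   empty: head == tail                 e.g. head == tail == 0
//   push a, b, c  => tail == 3          (3 + 1) % 4 == 0 == head  => full
//   pop once      => head == 1          (3 + 1) % 4 != 1          => one slot free again
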
template <std::semiregular value_t,
          sequence_container buffer_t = std::vector<value_t>, // restored parameter: omitted in the extracted listing; its declaration is evident from the aliases at the end of this file
          buffer_queue_policy buffer_policy = buffer_queue_policy::dynamic>
class buffer_queue
{
public:
    using buffer_type = buffer_t;
    using value_type = typename buffer_type::value_type;
    using size_type = typename buffer_type::size_type;
    using reference = void;
    using const_reference = void;

    // Default constructor sets capacity to 1 (still empty).
    buffer_queue() : buffer_queue{0u}
    {}
    buffer_queue(buffer_queue const &) = delete;
    buffer_queue(buffer_queue &&) = delete;
    buffer_queue & operator=(buffer_queue const &) = delete;
    buffer_queue & operator=(buffer_queue &&) = delete;
    ~buffer_queue() = default;

    // You can set the initial capacity here.
    explicit buffer_queue(size_type const init_capacity)
    {
        buffer.resize(init_capacity + 1);
        ring_buffer_capacity = std::bit_ceil(buffer.size());
    }

    template <std::ranges::input_range range_type>
        requires std::convertible_to<std::ranges::range_value_t<range_type>, value_type>
    buffer_queue(size_type const init_capacity, range_type && r) : buffer_queue{init_capacity}
    {
        std::ranges::copy(r, std::ranges::begin(buffer));
    }

    template <typename value2_t>
        requires std::convertible_to<value2_t, value_t>
    void push(value2_t && value)
    {
        detail::spin_delay delay{};

        for (;;)
        {
            auto status = try_push(std::forward<value2_t>(value));
            if (status == queue_op_status::closed)
                throw queue_op_status::closed;
            else if (status == queue_op_status::success)
                return;

            assert(status != queue_op_status::empty);
            assert(status == queue_op_status::full);
            delay.wait(); // Pause and then try again.
        }
    } // throws if closed

    template <typename value2_t>
        requires std::convertible_to<value2_t, value_t>
    queue_op_status wait_push(value2_t && value)
    {
        detail::spin_delay delay{};

        for (;;)
        {
            auto status = try_push(std::forward<value2_t>(value));
            // Wait until the queue is not full anymore.
            if (status != queue_op_status::full)
                return status;

            assert(status != queue_op_status::empty);
            assert(status == queue_op_status::full);
            delay.wait(); // Pause and then try again.
        }
    }

    value_type value_pop() // throws if closed
    {
        detail::spin_delay delay{};

        value_type value{};
        for (;;)
        {
            if (!writer_waiting.load())
            {
                auto status = try_pop(value);

                if (status == queue_op_status::closed)
                    throw queue_op_status::closed;
                else if (status == queue_op_status::success)
                    return value;

                assert(status != queue_op_status::full);
                assert(status == queue_op_status::empty);
            }
            delay.wait(); // Pause and then try again.
        }
    }

    queue_op_status wait_pop(value_type & value)
    {
        detail::spin_delay delay{};

        queue_op_status status;
        for (;;)
        {
            if (!writer_waiting.load())
            {
                status = try_pop(value);

                if (status == queue_op_status::closed || status == queue_op_status::success)
                    break;

                assert(status != queue_op_status::full);
                assert(status == queue_op_status::empty);
            }
            delay.wait(); // Pause and then try again.
        }
        return status;
    }

    template <typename value2_t>
        requires std::convertible_to<value2_t, value_t>
    queue_op_status try_push(value2_t &&);

    queue_op_status try_pop(value_t &);

    void close()
    {
        if (writer_waiting.exchange(true)) // First writer that closes the queue will continue, the rest returns.
            return;

        try
        {
            std::unique_lock write_lock{mutex};
            closed_flag = true;
            writer_waiting.store(false); // Reset the lock.
        }
        catch (...)
        {
            writer_waiting.store(false); // Reset the lock.
            std::rethrow_exception(std::current_exception()); // restored: this line was dropped in the extracted listing
        }
    }

    bool is_closed() const noexcept
    {
        return closed_flag;
    }

    bool is_empty() const noexcept
    {
        std::unique_lock write_lock(mutex);
        return pop_front_position == push_back_position;
    }

    bool is_full() const noexcept
    {
        std::unique_lock write_lock(mutex);
        return is_ring_buffer_exhausted(pop_front_position, push_back_position);
    }

    size_type size() const noexcept
    {
        std::unique_lock write_lock(mutex);
        if (to_buffer_position(pop_front_position) <= to_buffer_position(push_back_position))
        {
            return to_buffer_position(push_back_position) - to_buffer_position(pop_front_position);
        }
        else
        {
            assert(buffer.size() > (to_buffer_position(pop_front_position) - to_buffer_position(push_back_position)));
            return buffer.size() - (to_buffer_position(pop_front_position) - to_buffer_position(push_back_position));
        }
    }
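
    // Worked example for the wrap-around branch above (illustrative): buffer.size() == 7,
    // masked pop_front_position == 5, masked push_back_position == 2. The occupied cells wrap
    // past the end of the buffer (offsets 5, 6, 0, 1), so the second branch returns
    // 7 - (5 - 2) == 4.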

private:
    constexpr bool is_ring_buffer_exhausted(size_type const from, size_type const to) const
    {
        assert(to <= (from + ring_buffer_capacity + 1)); // The tail cannot overwrite the head.

        return to >= from + ring_buffer_capacity;
    }

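    // Worked example (illustrative): with ring_buffer_capacity == 8, from == 2 and to == 10 the
    // ring buffer is exhausted (10 >= 2 + 8): `to` has lapped `from` by a full capacity. The
    // positions grow monotonically and are only mapped into the buffer by to_buffer_position(),
    // which is why this plain comparison suffices.
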
    constexpr size_type to_buffer_position(size_type const position) const
    {
        return position & (ring_buffer_capacity - 1);
    }

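    // Worked example (illustrative): ring_buffer_capacity is always a power of two (see
    // std::bit_ceil in the constructor), so masking replaces the modulo operation:
    //   capacity == 8 (0b1000), mask == 7 (0b0111)
    //   to_buffer_position(9)  ==  9 & 7 == 1  ( ==  9 % 8)
    //   to_buffer_position(16) == 16 & 7 == 0  ( == 16 % 8)
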
    size_type cyclic_increment(size_type position)
    {
        // Invariants:
        //   - ring_buffer_capacity is a power of 2
        //   - (position % ring_buffer_capacity) is in [0, buffer.size())
        //
        // Return the next greater position that fulfils the invariants.
        if (to_buffer_position(++position) >= buffer.size())
            position += ring_buffer_capacity - buffer.size(); // If the position reached the end of the buffer, skip the unused padding up to the next power-of-two boundary.
        return position;
    }

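    // Worked example (illustrative): buffer.size() == 6, ring_buffer_capacity == 8, so the
    // masked offsets 6 and 7 are unused padding. Incrementing position 5 yields 6,
    // to_buffer_position(6) == 6 >= buffer.size(), hence position += 8 - 6 => 8, and
    // to_buffer_position(8) == 0. The counter keeps growing monotonically while its masked
    // value always lands in [0, buffer.size()).
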
    template <typename value2_t>
        requires (std::convertible_to<value2_t, value_t>) &&
                 (buffer_policy == buffer_queue_policy::fixed)
    bool overflow(value2_t &&)
    {
        return false;
    }

    template <typename value2_t>
        requires (std::convertible_to<value2_t, value_t>) &&
                 (buffer_policy == buffer_queue_policy::dynamic)
    bool overflow(value2_t && value);

    buffer_t buffer;
    // The following four members were dropped in the extracted listing and are restored here from
    // their uses in the member functions (declaration order and alignment assumed):
    alignas(std::hardware_destructive_interference_size) mutable std::shared_mutex mutex{};
    alignas(std::hardware_destructive_interference_size) std::atomic<size_type> pop_front_position{0};
    alignas(std::hardware_destructive_interference_size) std::atomic<size_type> pending_pop_front_position{0};
    alignas(std::hardware_destructive_interference_size) std::atomic<size_type> push_back_position{0};
    alignas(std::hardware_destructive_interference_size) std::atomic<size_type> pending_push_back_position{0};
    size_type ring_buffer_capacity{0}; // restored: the capacity rounded up to a power of two
    alignas(std::hardware_destructive_interference_size) std::atomic_bool writer_waiting{false};
    alignas(std::hardware_destructive_interference_size) bool closed_flag{false};
};

// Specifies a fixed size buffer queue.
template <std::semiregular value_t, sequence_container buffer_t = std::vector<value_t>>
using fixed_buffer_queue = buffer_queue<value_t, buffer_t, buffer_queue_policy::fixed>;

// Specifies a dynamic size buffer queue (growable).
template <std::semiregular value_t, sequence_container buffer_t = std::vector<value_t>>
using dynamic_buffer_queue = buffer_queue<value_t, buffer_t, buffer_queue_policy::dynamic>;

// ============================================================================
// Metafunctions
// ============================================================================

// ============================================================================
// Functions
// ============================================================================

template <typename value_t, typename buffer_t, buffer_queue_policy buffer_policy>
template <typename value2_t>
    requires (std::convertible_to<value2_t, value_t>) &&
             (buffer_policy == buffer_queue_policy::dynamic)
inline bool buffer_queue<value_t, buffer_t, buffer_policy>::overflow(value2_t && value)
{
    // Try to extend the capacity.
    std::unique_lock write_lock{mutex};

    size_type old_size = buffer.size();
    size_type ring_buffer_capacity = this->ring_buffer_capacity;
    size_type local_front = this->pop_front_position;
    size_type local_back = this->push_back_position;

    // Expects no pending pushes or pops in unique lock.
    assert(local_back == this->pending_push_back_position);
    assert(local_front == this->pending_pop_front_position);

    bool valueWasAppended = false;

    // Did we reach the capacity limit (another thread could have done the upgrade already)?
    // The buffer is full if tail_pos + 1 == head_pos.
    if (is_ring_buffer_exhausted(local_front, cyclic_increment(local_back)))
    {
        // In case of a full queue write the value into the additional slot.
        // Note that the ring-buffer implementation uses one additional field which is unused except
        // when an overflow happens. This invariant is used to simply check for the full/empty state of the queue.
        if (old_size != 0)
        {
            auto it = std::ranges::begin(buffer) + to_buffer_position(local_back);
            *it = std::forward<value2_t>(value);
            local_back = local_front + ring_buffer_capacity;
            valueWasAppended = true;
        }

        assert(is_ring_buffer_exhausted(local_front, local_back));

        // Get the positions of head/tail in the current buffer sequence.
        size_type front_buffer_position = to_buffer_position(local_front);
        size_type back_buffer_position = to_buffer_position(local_back);

        // Increase the capacity by one and move all elements from the current pop_front_position one to the right.
        buffer.resize(old_size + 1);
        ring_buffer_capacity = std::bit_ceil(buffer.size());
        std::ranges::move_backward(std::span{buffer.data() + front_buffer_position, buffer.data() + old_size},
                                   buffer.data() + buffer.size());

        // Update the pop_front and push_back positions.
        if (old_size != 0)
        {
            this->pending_pop_front_position = this->pop_front_position = front_buffer_position + 1;
            this->pending_push_back_position = this->push_back_position = back_buffer_position + ring_buffer_capacity;
        }
        this->ring_buffer_capacity = ring_buffer_capacity;
    }
    return valueWasAppended;
}
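
// Worked example of the resize above (illustrative): suppose old_size == 4 (3 usable slots + 1
// spare), ring_buffer_capacity == 4, and the queue is full. overflow() first writes the new value
// into the spare cell, resizes the buffer to 5 cells, shifts the elements in
// [front_buffer_position, old_size) one cell to the right via std::ranges::move_backward, and
// rebases the four position counters onto the new capacity std::bit_ceil(5) == 8. The exclusive
// lock guarantees that no concurrent push or pop observes the intermediate state.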

// ----------------------------------------------------------------------------
// Function try_pop()
// ----------------------------------------------------------------------------

/*
 * @fn ConcurrentQueue#tryPopFront
 * @headerfile <seqan/parallel.h>
 * @brief Try to dequeue a value from a queue.
 *
 * @signature bool tryPopFront(result, queue[, parallelTag]);
 *
 * @param[in,out] queue       A queue.
 * @param[out]    result      The dequeued value (if available).
 * @param[in]     parallelTag The concurrency scheme. If multiple threads dequeue values concurrently this tag must be
 *                            @link ParallelismTags#Parallel @endlink. The more efficient @link ParallelismTags#Serial
 *                            @endlink tag can only be used if one thread calls <tt>popFront</tt> at a time.
 *                            Default is @link ParallelismTags#Parallel @endlink.
 * @return bool Returns <tt>true</tt> if a value could be dequeued and <tt>false</tt> otherwise.
 */
template <typename value_t, typename buffer_t, buffer_queue_policy buffer_policy>
inline queue_op_status buffer_queue<value_t, buffer_t, buffer_policy>::try_pop(value_t & result)
{
    // Try to extract a value.
    std::shared_lock read_lock{mutex};

    size_type local_pending_pop_front_position{};
    size_type next_local_pop_front_position{};
    detail::spin_delay spinDelay{};

    local_pending_pop_front_position = this->pending_pop_front_position;
    // Wait for the queue to become filled.
    while (true)
    {
        size_type local_push_back_position = this->push_back_position;

        assert(local_pending_pop_front_position <= local_push_back_position);

        // Check if the queue is empty.
        if (local_pending_pop_front_position == local_push_back_position)
        {
            return is_closed() ? queue_op_status::closed : queue_op_status::empty;
        }

        // Get the next ring-buffer position to read from.
        next_local_pop_front_position = cyclic_increment(local_pending_pop_front_position);
        // Did another thread already acquire this slot?
        // If yes, try with the next position. If not, break and read from the acquired position.
        if (this->pending_pop_front_position.compare_exchange_weak(local_pending_pop_front_position,
                                                                   next_local_pop_front_position))
            break;

        spinDelay.wait();
    }

    // Store the value from the acquired read position.
    result = std::ranges::iter_move(buffer.begin() + to_buffer_position(local_pending_pop_front_position));

    // Wait for pending previous reads and synchronize pop_front_position to local_pending_pop_front_position.
    {
        detail::spin_delay delay{};
        size_type acquired_slot = local_pending_pop_front_position;
        while (!this->pop_front_position.compare_exchange_weak(acquired_slot, next_local_pop_front_position))
        {
            acquired_slot = local_pending_pop_front_position;
            delay.wait(); // Add an adaptive delay in case of high contention.
        }
    }

    return queue_op_status::success;
}
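
// Usage sketch (illustrative addition, not part of the original file): non-blocking consumption
// with explicit status handling.
//
//   int value{};
//   switch (queue.try_pop(value))
//   {
//       case queue_op_status::success: /* use value */                break;
//       case queue_op_status::empty:   /* nothing there yet, retry */ break;
//       case queue_op_status::closed:  /* queue drained and closed */ break;
//       default: break; // try_pop never returns queue_op_status::full
//   }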

// ----------------------------------------------------------------------------
// Function try_push()
// ----------------------------------------------------------------------------

/*
 * @fn ConcurrentQueue#appendValue
 * @headerfile <seqan/parallel.h>
 * @brief Enqueue a value to a queue.
 *
 * @signature void appendValue(queue, val[, expandTag[, parallelTag]]);
 *
 * @param[in,out] queue       A queue.
 * @param[in]     val         The value to enqueue.
 * @param[in]     expandTag   The overflow strategy. If @link OverflowStrategyTags#Generous @endlink the queue will be
 *                            automatically resized if the capacity is exceeded, otherwise the thread spinlocks until
 *                            the element can be enqueued.
 *                            Default is the @link DefaultOverflowImplicit @endlink result for the <tt>queue</tt> type.
 * @param[in]     parallelTag The concurrency scheme. If multiple threads enqueue values concurrently this tag must be
 *                            @link ParallelismTags#Parallel @endlink. The more efficient @link ParallelismTags#Serial
 *                            @endlink tag can only be used if one thread calls <tt>appendValue</tt> at a time.
 *                            Default is @link ParallelismTags#Parallel @endlink.
 */
template <typename value_t, typename buffer_t, buffer_queue_policy buffer_policy>
template <typename value2_t>
    requires std::convertible_to<value2_t, value_t>
inline queue_op_status buffer_queue<value_t, buffer_t, buffer_policy>::try_push(value2_t && value)
{
    // Try to push the value.
    {
        detail::spin_delay delay{};

        std::shared_lock read_lock(mutex);

        if (is_closed())
            return queue_op_status::closed;

        // The current up-to-date position to push an element to.
        size_type local_pending_push_back_position = this->pending_push_back_position;

        while (true)
        {
            // Get the next potential position to write the value to.
            size_type next_local_push_back_position = cyclic_increment(local_pending_push_back_position);
            size_type local_pop_front_position = this->pop_front_position;

            // Check if there are enough slots to write to.
            // If not, either wait or try to overflow if it is a dynamic queue.
            if (is_ring_buffer_exhausted(local_pop_front_position, next_local_push_back_position))
                break;

            // Did another thread acquire the current pending position before this thread?
            // If yes, try again. If not, write into the acquired slot.
            if (this->pending_push_back_position.compare_exchange_weak(local_pending_push_back_position,
                                                                       next_local_push_back_position))
            {
                // The current thread acquired the local_pending_push_back_position and can now write the value
                // into the proper slot of the ring buffer.
                auto it = std::ranges::begin(buffer) + to_buffer_position(local_pending_push_back_position);
                *it = std::forward<value2_t>(value);

                // Wait for pending previous writes and synchronise push_back_position to
                // local_pending_push_back_position.
                {
                    detail::spin_delay delay{};
                    // The slot this thread acquired to write to.
                    size_type acquired_slot = local_pending_push_back_position;
                    while (!this->push_back_position.compare_exchange_weak(acquired_slot,
                                                                           next_local_push_back_position))
                    {
                        acquired_slot = local_pending_push_back_position;
                        delay.wait();
                    }
                }
                return queue_op_status::success;
            }

            delay.wait();
        }
    }

    // If possible extend the capacity and return.
    if (overflow(std::forward<value2_t>(value)))
    {
        return queue_op_status::success; // Always return success, since the queue resizes and cannot be full.
    }

    // We could not extend the queue so it must be full.
    return queue_op_status::full;
}

} // namespace seqan3::contrib
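
#if 0 // Example program (illustrative sketch, excluded from compilation; the queue API used
      // below is the one defined above, everything else is plain standard C++).
#include <iostream>
#include <thread>

int main()
{
    seqan3::contrib::dynamic_buffer_queue<int> queue{}; // grows on demand via overflow()

    std::thread producer{[&] ()
    {
        for (int i = 0; i < 100; ++i)
            queue.push(i); // spins while full; would throw queue_op_status::closed after close()
        queue.close();     // signal that no further elements will arrive
    }};

    // wait_pop() spins while the queue is empty and returns closed once it is drained and closed.
    int value{};
    while (queue.wait_pop(value) != seqan3::contrib::queue_op_status::closed)
        std::cout << value << '\n';

    producer.join();
}
#endif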