SeqAn3  3.0.2
The Modern C++ library for sequence analysis.
buffer_queue.hpp
Go to the documentation of this file.
1 // -----------------------------------------------------------------------------------------------------
2 // Copyright (c) 2006-2020, Knut Reinert & Freie Universität Berlin
3 // Copyright (c) 2016-2020, Knut Reinert & MPI für molekulare Genetik
4 // This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License
5 // shipped with this file and also available at: https://github.com/seqan/seqan3/blob/master/LICENSE.md
6 // -----------------------------------------------------------------------------------------------------
7 
13 #pragma once
14 
15 #include <atomic>
16 #include <cmath>
17 #include <mutex>
18 #include <new>
19 #include <shared_mutex>
20 #include <type_traits>
21 #include <vector>
22 
27 #include <seqan3/std/algorithm>
28 #include <seqan3/std/concepts>
29 #include <seqan3/std/new>
30 #include <seqan3/std/ranges>
31 #include <seqan3/std/span>
32 
33 namespace seqan3::contrib
34 {
35 
// Status code returned (or thrown) by the push/pop operations of buffer_queue.
enum class queue_op_status : uint8_t
{
    success = 0, // the operation completed; a value was stored/extracted
    empty,       // pop failed: the queue currently holds no element
    full,        // push failed: the ring buffer is exhausted (fixed queue, or overflow failed)
    closed       // push: queue was closed; pop: queue was closed and fully drained
};
44 
// Policy deciding how a buffer_queue reacts when its ring buffer is exhausted:
// a fixed queue reports queue_op_status::full, a dynamic queue grows (see overflow()).
enum struct buffer_queue_policy : uint8_t
{
    fixed,  // capacity never changes; overflow() is a no-op returning false
    dynamic // capacity is extended on demand while holding the queue mutex exclusively
};
50 
51 // Ringbuffer implementation:
52 // The underlying buffer has size (number of actual elements + 1). This is a trick to easily check if the queue is empty
53 // or full. Furthermore, the ring buffer uses 4 pointers. The actual push_back and pop_front position as well as
54 // the pending push_back and pop_front position. The latter indicates the position that have been advanced concurrently
55 // by multiple threads from either end.
56 //
// head: position to read/extract from the queue (first inserted element) => pop_front_position
58 // tail: position where to write to/push new elements to the queue => push_back_position
59 // head_read: The actual position after x threads concurrently popped from the queue. => pending_pop_front_position
60 // tail_write: The actual position after x threads concurrently pushed to the queue. => pending_push_back_position
// [ ? ] [ 4 ] [ 3 ] [ 8 ] [ 0 ] [ x ] [ ? ]
//   ^     ^                 ^           ^
//  head  headRead          tail       tailWrite
// (diagram alignment reconstructed: removed = [head, headRead), valid = [headRead, tail), filling = [tail, tailWrite))
65 //
66 // valid buffer between [headRead, tail)
67 // currently filled [tail, tailWrite)
68 // currently removed [head, headRead)
69 //
70 // State: empty = (head == tail)
// [ ? ] [ ? ] [ ? ] [ ? ] [ ? ] [ ? ] [ ? ]
//   ^
//  head == tail
74 // The head is on the same position as tail.
75 // This means that currently no element is in the buffer.
76 
77 // State: full = (tail + 1 == head)
// [ 2 ] [ 4 ] [ 3 ] [ ? ] [ 8 ] [ 0 ] [ 7 ]
//                     ^     ^
//                    tail  head
81 // The tail is one position before the head.
82 // This means that currently no element can be added to the buffer since it is full.
83 // Strategies are to either wait until some elements have been popped or to expand the capacity of the
84 // queue by one, inserting the element at the current tail position and moving all elements starting from head one
85 // position to the right.
86 
87 template <std::semiregular value_t,
89  buffer_queue_policy buffer_policy = buffer_queue_policy::dynamic>
90 class buffer_queue
91 {
92 public:
93 
94  using buffer_type = buffer_t;
95  using value_type = typename buffer_type::value_type;
96  using size_type = typename buffer_type::size_type;
97  using reference = void;
98  using const_reference = void;
99 
100  // Default constructor sets capacity to 1 (still empty)
101  buffer_queue() : buffer_queue{0u}
102  {}
103  buffer_queue(buffer_queue const &) = delete;
104  buffer_queue(buffer_queue &&) = delete;
105  buffer_queue & operator=(buffer_queue const &) = delete;
106  buffer_queue & operator=(buffer_queue &&) = delete;
107  ~buffer_queue() = default;
108 
109  // you can set the initial capacity here
110  explicit buffer_queue(size_type const init_capacity)
111  {
112  buffer.resize(init_capacity + 1);
113  ring_buffer_capacity = seqan3::detail::next_power_of_two(buffer.size());
114  }
115 
116  template <std::ranges::input_range range_type>
117  requires std::convertible_to<std::ranges::range_value_t<range_type>, value_type>
118  buffer_queue(size_type const init_capacity, range_type && r) : buffer_queue{init_capacity}
119  {
120  std::ranges::copy(r, std::ranges::begin(buffer));
121  }
122 
126  template <typename value2_t>
127  requires std::convertible_to<value2_t, value_t>
128  void push(value2_t && value)
129  {
130  detail::spin_delay delay{};
131 
132  for (;;)
133  {
134  auto status = try_push(std::forward<value2_t>(value));
135  if (status == queue_op_status::closed)
136  throw queue_op_status::closed;
137  else if (status == queue_op_status::success)
138  return;
139 
140  assert(status != queue_op_status::empty);
141  assert(status == queue_op_status::full);
142  delay.wait(); // pause and then try again.
143  }
144  } // throws if closed
145 
146  template <typename value2_t>
147  requires std::convertible_to<value2_t, value_t>
148  queue_op_status wait_push(value2_t && value)
149  {
150  detail::spin_delay delay{};
151 
152  for (;;)
153  {
154  auto status = try_push(std::forward<value2_t>(value));
155  // wait until queue is not full anymore..
156  if (status != queue_op_status::full)
157  return status;
158 
159  assert(status != queue_op_status::empty);
160  assert(status == queue_op_status::full);
161  delay.wait(); // pause and then try again.
162  }
163  }
164 
165  value_type value_pop() // throws if closed
166  {
167  detail::spin_delay delay{};
168 
169  value_type value{};
170  for (;;)
171  {
172  auto status = try_pop(value);
173 
174  if (status == queue_op_status::closed)
175  throw queue_op_status::closed;
176  else if (status == queue_op_status::success)
177  return value;
178 
179  assert(status != queue_op_status::full);
180  assert(status == queue_op_status::empty);
181  delay.wait(); // pause and then try again.
182  }
183  }
184 
185  queue_op_status wait_pop(value_type & value)
186  {
187  detail::spin_delay delay{};
188 
189  queue_op_status status;
190  for (;;)
191  {
192  status = try_pop(value);
193 
194  if (status == queue_op_status::closed || status == queue_op_status::success)
195  break;
196 
197  assert(status != queue_op_status::full);
198  assert(status == queue_op_status::empty);
199  delay.wait(); // pause and then try again.
200  }
201  return status;
202  }
204 
208  template <typename value2_t>
209  requires std::convertible_to<value2_t, value_t>
210  queue_op_status try_push(value2_t &&);
211 
212  queue_op_status try_pop(value_t &);
214 
218  void close() noexcept
219  {
220  closed_flag.store(true, std::memory_order_release);
221  }
222 
223  bool is_closed() const noexcept
224  {
225  return closed_flag.load(std::memory_order_acquire);
226  }
227 
228  bool is_empty() const noexcept
229  {
230  std::unique_lock write_lock(mutex);
231  return pop_front_position == push_back_position;
232  }
233 
234  bool is_full() const noexcept
235  {
236  std::unique_lock write_lock(mutex);
237  return is_ring_buffer_exhausted(pop_front_position, push_back_position);
238  }
239 
240  size_type size() const noexcept
241  {
242  std::unique_lock write_lock(mutex);
243  if (to_buffer_position(pop_front_position) <= to_buffer_position(push_back_position))
244  {
245  return to_buffer_position(push_back_position) - to_buffer_position(pop_front_position);
246  }
247  else
248  {
249  assert(buffer.size() > (to_buffer_position(pop_front_position) - to_buffer_position(push_back_position)));
250  return buffer.size() - (to_buffer_position(pop_front_position) - to_buffer_position(push_back_position));
251  }
252  }
254 private:
255 
265  constexpr bool is_ring_buffer_exhausted(size_type const from, size_type const to) const
266  {
267  assert(to <= (from + ring_buffer_capacity + 1)); // The tail cannot overwrite the head.
268 
269  return to >= from + ring_buffer_capacity;
270  }
271 
285  constexpr size_type to_buffer_position(size_type const position) const
286  {
287  return position & (ring_buffer_capacity - 1);
288  }
289 
309  size_type cyclic_increment(size_type position)
310  {
311  // invariants:
312  // - ring_buffer_capacity is a power of 2
313  // - (position % ring_buffer_capacity) is in [0, buffer.size())
314  //
315  // return the next greater position that fulfils the invariants
316  if (to_buffer_position(++position) >= buffer.size())
317  position += ring_buffer_capacity - buffer.size(); // If the position reached
318  return position;
319  }
320 
321  template <typename value2_t>
322  requires (std::convertible_to<value2_t, value_t>) &&
323  (buffer_policy == buffer_queue_policy::fixed)
324  bool overflow(value2_t &&)
325  {
326  return false;
327  }
328 
329  template <typename value2_t>
330  requires (std::convertible_to<value2_t, value_t>) &&
331  (buffer_policy == buffer_queue_policy::dynamic)
332  bool overflow(value2_t && value);
333 
335  buffer_t buffer;
338  alignas(std::hardware_destructive_interference_size) std::atomic<size_type> pending_pop_front_position{0};
340  alignas(std::hardware_destructive_interference_size) std::atomic<size_type> pending_push_back_position{0};
343 };
344 
// Specifies a fixed size buffer queue.
// When the ring buffer is exhausted, try_push reports queue_op_status::full
// instead of reallocating.
template <std::semiregular value_t, sequence_container buffer_t = std::vector<value_t>>
using fixed_buffer_queue = buffer_queue<value_t, buffer_t, buffer_queue_policy::fixed>;
348 
// Specifies a dynamic size buffer queue (growable).
// When the ring buffer is exhausted, try_push grows the underlying buffer via
// overflow() and then succeeds.
template <std::semiregular value_t, sequence_container buffer_t = std::vector<value_t>>
using dynamic_buffer_queue = buffer_queue<value_t, buffer_t, buffer_queue_policy::dynamic>;
352 
353 // ============================================================================
354 // Metafunctions
355 // ============================================================================
356 
357 // ============================================================================
358 // Functions
359 // ============================================================================
360 
// Grow the ring buffer (dynamic policy only) after try_push found it exhausted.
// Holds the mutex exclusively, so no push/pop runs concurrently while resizing.
// Returns true iff `value` was written into the buffer as part of the resize;
// false means the caller must retry (another thread already grew the buffer).
template <typename value_t, typename buffer_t, buffer_queue_policy buffer_policy>
template <typename value2_t>
    requires (std::convertible_to<value2_t, value_t>) &&
             (buffer_policy == buffer_queue_policy::dynamic)
inline bool buffer_queue<value_t, buffer_t, buffer_policy>::overflow(value2_t && value)
{
    // try to extend capacity
    std::unique_lock write_lock{mutex};

    size_type old_size = buffer.size();
    size_type ring_buffer_capacity = this->ring_buffer_capacity; // local working copy (intentionally shadows the member)
    size_type local_front = this->pop_front_position;
    size_type local_back = this->push_back_position;

    // Expects no pending pushes or pops in unique lock.
    assert(local_back == this->pending_push_back_position);
    assert(local_front == this->pending_pop_front_position);

    bool valueWasAppended = false;

    // did we reach the capacity limit (another thread could have done the upgrade already)?
    // buffer is full if tail_pos + 1 == head_pos
    if (is_ring_buffer_exhausted(local_front, cyclic_increment(local_back)))
    {
        // In case of a full queue write the value into the additional slot.
        // Note, that the ring-buffer implementation uses one additional field which is not used except
        // when overflow happens. This invariant is used, to simply check for the full/empty state of the queue.
        if (old_size != 0)
        {
            auto it = std::ranges::begin(buffer) + to_buffer_position(local_back);
            *it = std::forward<value2_t>(value);
            local_back = local_front + ring_buffer_capacity; // mark the ring as completely exhausted
            valueWasAppended = true;
        }

        assert(is_ring_buffer_exhausted(local_front, local_back));

        // get positions of head/tail in current buffer sequence
        size_type front_buffer_position = to_buffer_position(local_front);
        size_type back_buffer_position = to_buffer_position(local_back);

        // increase capacity by one and move all elements from current pop_front_position one to the right.
        buffer.resize(old_size + 1);
        ring_buffer_capacity = seqan3::detail::next_power_of_two(buffer.size());
        // Shift the tail segment [front, old_size) one slot right so the ring stays
        // contiguous after the newly inserted slot.
        std::ranges::move_backward(std::span{buffer.data() + front_buffer_position, buffer.data() + old_size},
                                   buffer.data() + buffer.size());

        // Update the pop_front and push_back positions.
        if (old_size != 0)
        {
            this->pending_pop_front_position = this->pop_front_position = front_buffer_position + 1;
            this->pending_push_back_position = this->push_back_position = back_buffer_position + ring_buffer_capacity;
        }
        this->ring_buffer_capacity = ring_buffer_capacity; // publish the new capacity last
    }
    return valueWasAppended;
}
418 
419 // ----------------------------------------------------------------------------
420 // Function try_pop()
421 // ----------------------------------------------------------------------------
422 
423 /*
424  * @fn ConcurrentQueue#tryPopFront
425  * @headerfile <seqan/parallel.h>
426  * @brief Try to dequeue a value from a queue.
427  *
428  * @signature bool tryPopFront(result, queue[, parallelTag]);
429  *
430  *
431  * @param[in,out] queue A queue.
432  * @param[out] result The dequeued value (if available).
433  * @param[in] parallelTag The concurrency scheme. If multiple threads dequeue values concurrently this tag must be
434  * @link ParallelismTags#Parallel @endlink. The more efficient @link ParallelismTags#Serial
435  * @endlink tag can only be used if one thread calls <tt>popFront</tt> at a time.
436  * Default is @link ParallelismTags#Parallel @endlink.
437  * @return bool Returns <tt>true</tt> if a value could be dequeued and <tt>false</tt> otherwise.
438  */
// Try to dequeue one value into `result` without blocking on an empty queue.
// Returns: success — a value was moved into `result`;
//          empty   — no element was available;
//          closed  — the queue is empty AND was closed (fully drained).
// Takes the mutex shared, so it runs concurrently with other push/pop calls but
// is mutually exclusive with overflow()'s reallocation.
template <typename value_t, typename buffer_t, buffer_queue_policy buffer_policy>
inline queue_op_status buffer_queue<value_t, buffer_t, buffer_policy>::try_pop(value_t & result)
{
    // try to extract a value
    std::shared_lock read_lock{mutex};

    size_type local_pending_pop_front_position{};
    size_type next_local_pop_front_position{};
    detail::spin_delay spinDelay{};

    local_pending_pop_front_position = this->pending_pop_front_position;
    // wait for queue to become filled
    while (true)
    {
        size_type local_push_back_position = this->push_back_position;

        assert(local_pending_pop_front_position <= local_push_back_position);

        // Check if queue is empty
        if (local_pending_pop_front_position == local_push_back_position)
        {
            return is_closed() ? queue_op_status::closed : queue_op_status::empty;
        }

        // Get the next ring-buffer position to read from.
        next_local_pop_front_position = cyclic_increment(local_pending_pop_front_position);
        // Did another/other thread(s) already acquire this slot?
        // If yes, try with next position. If not, break and read from acquired position.
        // (On CAS failure local_pending_pop_front_position is reloaded with the current value.)
        if (this->pending_pop_front_position.compare_exchange_weak(local_pending_pop_front_position,
                                                                   next_local_pop_front_position))
            break;

        spinDelay.wait();
    }

    // Store the value from the acquired read position.
    result = std::ranges::iter_move(buffer.begin() + to_buffer_position(local_pending_pop_front_position));

    // wait for pending previous reads and synchronize pop_front_position to local_pending_pop_front_position
    {
        detail::spin_delay delay{};
        size_type acquired_slot = local_pending_pop_front_position;
        while (!this->pop_front_position.compare_exchange_weak(acquired_slot, next_local_pop_front_position))
        {
            acquired_slot = local_pending_pop_front_position; // CAS overwrote it; reset to our slot
            delay.wait(); // add adapting delay in case of high contention.
        }
    }

    return queue_op_status::success;
}
490 
491 // ----------------------------------------------------------------------------
492 // Function try_push()
493 // ----------------------------------------------------------------------------
494 
495 /*
496  * @fn ConcurrentQueue#appendValue
497  * @headerfile <seqan/parallel.h>
498  * @brief Enqueue a value to a queue.
499  *
500  * @signature void appendValue(queue, val[, expandTag[, parallelTag]);
501  *
502  *
503  * @param[in,out] queue A queue.
504  * @param[in] val The value to enqueue.
505  * @param[in] expandTag The overflow strategy. If @link OverflowStrategyTags#Generous @endlink the queue will be
506  * automatically resized if the capacity is exceeded, otherwise the thread spinlocks until
507  * the element can be enqueued.
508  * Default is the @link DefaultOverflowImplicit @endlink result for the <tt>queue</tt> type.
509  * @param[in] parallelTag The concurrency scheme. If multiple threads enqueue values concurrently this tag must be
510  * @link ParallelismTags#Parallel @endlink. The more efficient @link ParallelismTags#Serial
511  * @endlink tag can only be used if one thread calls <tt>appendValue</tt> at a time.
512  * Default is @link ParallelismTags#Parallel @endlink.
513  */
// Try to enqueue `value` without blocking on a full queue.
// Returns: success — the value was stored (possibly after growing a dynamic queue);
//          closed  — the queue was closed, the value was rejected;
//          full    — no free slot and overflow() could not (or may not) grow.
// Takes the mutex shared, so it runs concurrently with other push/pop calls but
// is mutually exclusive with overflow()'s reallocation.
template <typename value_t, typename buffer_t, buffer_queue_policy buffer_policy>
template <typename value2_t>
    requires std::convertible_to<value2_t, value_t>
inline queue_op_status buffer_queue<value_t, buffer_t, buffer_policy>::try_push(value2_t && value)
{
    // try to push the value
    {
        detail::spin_delay delay{};

        std::shared_lock read_lock(mutex);

        if (is_closed())
            return queue_op_status::closed;

        // Current up to date position to push an element to
        size_type local_pending_push_back_position = this->pending_push_back_position;

        while (true)
        {
            // Get the next potential position to write the value to.
            size_type next_local_push_back_position = cyclic_increment(local_pending_push_back_position);
            size_type local_pop_front_position = this->pop_front_position;

            // Check if there are enough slots to write to.
            // If not either wait or try to overflow if it is a dynamic queue.
            if (is_ring_buffer_exhausted(local_pop_front_position, next_local_push_back_position))
                break; // `value` was not consumed yet; it is forwarded to overflow() below

            // Did another/other thread(s) acquire the current pending position before this thread?
            // If yes, try again; if not, write into the acquired slot.
            if (this->pending_push_back_position.compare_exchange_weak(local_pending_push_back_position,
                                                                       next_local_push_back_position))
            {
                // Current thread acquired the local_pending_push_back_position and can now write the value into the
                // proper slot of the ring buffer.
                auto it = std::ranges::begin(buffer) + to_buffer_position(local_pending_push_back_position);
                *it = std::forward<value2_t>(value);

                // wait for pending previous writes and synchronise push_back_position to
                // local_pending_push_back_position
                {
                    detail::spin_delay delay{};
                    // the slot this thread acquired to write to
                    size_type acquired_slot = local_pending_push_back_position;
                    while (!this->push_back_position.compare_exchange_weak(acquired_slot,
                                                                           next_local_push_back_position))
                    {
                        acquired_slot = local_pending_push_back_position; // CAS overwrote it; reset to our slot
                        delay.wait();
                    }
                }
                return queue_op_status::success;
            }

            delay.wait();
        }
    } // shared lock released before overflow() takes the unique lock

    // if possible extend capacity and return.
    if (overflow(std::forward<value2_t>(value)))
    {
        return queue_op_status::success; // always return success, since the queue resizes and cannot be full.
    }

    // We could not extend the queue so it must be full.
    return queue_op_status::full;
}
582 } // namespace seqan3::contrib
bit_manipulation.hpp
Provides utility functions for bit twiddling.
span
Provides std::span from the C++20 standard library.
sequence_container
A more refined container concept than seqan3::container.
vector
std::hardware_destructive_interference_size
constexpr std::size_t hardware_destructive_interference_size
Minimum offset between two objects to avoid false sharing.
Definition: new:34
concept.hpp
Adaptations of concepts from the standard library.
new
cmath
algorithm
Adaptations of algorithms from the Ranges TS.
concepts
The Concepts library.
range.hpp
Provides various transformation traits used by the range module.
std::unique_lock
atomic
std::experimental::filesystem::status
T status(T... args)
seqan3::pack_traits::size
constexpr size_t size
The size of a type pack.
Definition: traits.hpp:116
ranges
Adaptations of concepts from the Ranges TS.
std::ranges::begin
T begin(T... args)
spin_delay.hpp
Provides seqan3::detail::spin_delay.
std::experimental::filesystem::is_empty
T is_empty(T... args)
std::fixed
T fixed(T... args)
std::empty
T empty(T... args)
mutex
shared_mutex
std::span::data
T data(T... args)
std::shared_lock