SeqAn3 3.1.0
The Modern C++ library for sequence analysis.
buffer_queue.hpp
Go to the documentation of this file.
1// -----------------------------------------------------------------------------------------------------
2// Copyright (c) 2006-2021, Knut Reinert & Freie Universität Berlin
3// Copyright (c) 2016-2021, Knut Reinert & MPI für molekulare Genetik
4// This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License
5// shipped with this file and also available at: https://github.com/seqan/seqan3/blob/master/LICENSE.md
6// -----------------------------------------------------------------------------------------------------
7
13#pragma once
14
#include <seqan3/std/algorithm>
#include <atomic>
#include <seqan3/std/bit>
#include <cmath>
#include <seqan3/std/concepts>
#include <exception>
#include <mutex>
#include <seqan3/std/new>
#include <seqan3/std/ranges>
#include <shared_mutex>
#include <seqan3/std/span>
#include <type_traits>
#include <vector>
27
31
32namespace seqan3::contrib
33{
34
// Outcome of a queue operation (try_push/try_pop and their blocking wrappers).
enum class queue_op_status : uint8_t
{
    success = 0, // the element was enqueued/dequeued
    empty = 1,   // nothing to pop: the queue currently holds no elements
    full = 2,    // nothing could be pushed: the queue is at capacity
    closed = 3   // the queue was closed; push()/value_pop() throw this value
};
43
// Growth policy of a buffer_queue.
enum class buffer_queue_policy : uint8_t
{
    fixed = 0,  // capacity is fixed at construction; pushing into a full queue reports full
    dynamic = 1 // a full queue grows by one slot on push (see buffer_queue::overflow)
};
49
50// Ringbuffer implementation:
51// The underlying buffer has size (number of actual elements + 1). This is a trick to easily check if the queue is empty
52// or full. Furthermore, the ring buffer uses 4 pointers. The actual push_back and pop_front position as well as
// the pending push_back and pop_front positions. The latter indicate the positions that have been advanced concurrently
54// by multiple threads from either end.
55//
// head: position to read/extract from the queue (first inserted element) => pop_front_position
57// tail: position where to write to/push new elements to the queue => push_back_position
58// head_read: The actual position after x threads concurrently popped from the queue. => pending_pop_front_position
59// tail_write: The actual position after x threads concurrently pushed to the queue. => pending_push_back_position
60// [ ? ] [ 4 ] [ 3 ] [ 8 ] [ 0 ] [ x ] [ ? ]
61// | ^
62// v |
63// head headRead tail tailWrite
64//
65// valid buffer between [headRead, tail)
66// currently filled [tail, tailWrite)
67// currently removed [head, headRead)
68//
69// State: empty = (head == tail)
70// [ ? ] [ ? ] [ ? ] [ ? ] [ ? ] [ ? ] [ ? ]
71// tail
72// head
73// The head is on the same position as tail.
74// This means that currently no element is in the buffer.
75
76// State: full = (tail + 1 == head)
77// [ 2 ] [ 4 ] [ 3 ] [ ? ] [ 8 ] [ 0 ] [ 7 ]
78// tail
79// head
80// The tail is one position before the head.
81// This means that currently no element can be added to the buffer since it is full.
82// Strategies are to either wait until some elements have been popped or to expand the capacity of the
83// queue by one, inserting the element at the current tail position and moving all elements starting from head one
84// position to the right.
85
86template <std::semiregular value_t,
88 buffer_queue_policy buffer_policy = buffer_queue_policy::dynamic>
89class buffer_queue
90{
91public:
92
93 using buffer_type = buffer_t;
94 using value_type = typename buffer_type::value_type;
95 using size_type = typename buffer_type::size_type;
96 using reference = void;
97 using const_reference = void;
98
99 // Default constructor sets capacity to 1 (still empty)
100 buffer_queue() : buffer_queue{0u}
101 {}
102 buffer_queue(buffer_queue const &) = delete;
103 buffer_queue(buffer_queue &&) = delete;
104 buffer_queue & operator=(buffer_queue const &) = delete;
105 buffer_queue & operator=(buffer_queue &&) = delete;
106 ~buffer_queue() = default;
107
108 // you can set the initial capacity here
109 explicit buffer_queue(size_type const init_capacity)
110 {
111 buffer.resize(init_capacity + 1);
112 ring_buffer_capacity = std::bit_ceil(buffer.size());
113 }
114
115 template <std::ranges::input_range range_type>
116 requires std::convertible_to<std::ranges::range_value_t<range_type>, value_type>
117 buffer_queue(size_type const init_capacity, range_type && r) : buffer_queue{init_capacity}
118 {
119 std::ranges::copy(r, std::ranges::begin(buffer));
120 }
121
125 template <typename value2_t>
126 requires std::convertible_to<value2_t, value_t>
127 void push(value2_t && value)
128 {
129 detail::spin_delay delay{};
130
131 for (;;)
132 {
133 auto status = try_push(std::forward<value2_t>(value));
134 if (status == queue_op_status::closed)
135 throw queue_op_status::closed;
136 else if (status == queue_op_status::success)
137 return;
138
139 assert(status != queue_op_status::empty);
140 assert(status == queue_op_status::full);
141 delay.wait(); // pause and then try again.
142 }
143 } // throws if closed
144
145 template <typename value2_t>
146 requires std::convertible_to<value2_t, value_t>
147 queue_op_status wait_push(value2_t && value)
148 {
149 detail::spin_delay delay{};
150
151 for (;;)
152 {
153 auto status = try_push(std::forward<value2_t>(value));
154 // wait until queue is not full anymore..
155 if (status != queue_op_status::full)
156 return status;
157
158 assert(status != queue_op_status::empty);
159 assert(status == queue_op_status::full);
160 delay.wait(); // pause and then try again.
161 }
162 }
163
164 value_type value_pop() // throws if closed
165 {
166 detail::spin_delay delay{};
167
168 value_type value{};
169 for (;;)
170 {
171 if (!writer_waiting.load())
172 {
173 auto status = try_pop(value);
174
175 if (status == queue_op_status::closed)
176 throw queue_op_status::closed;
177 else if (status == queue_op_status::success)
178 return value;
179
180 assert(status != queue_op_status::full);
181 assert(status == queue_op_status::empty);
182 }
183 delay.wait(); // pause and then try again.
184 }
185 }
186
187 queue_op_status wait_pop(value_type & value)
188 {
189 detail::spin_delay delay{};
190
191 queue_op_status status;
192 for (;;)
193 {
194 if (!writer_waiting.load())
195 {
196 status = try_pop(value);
197
198 if (status == queue_op_status::closed || status == queue_op_status::success)
199 break;
200
201 assert(status != queue_op_status::full);
202 assert(status == queue_op_status::empty);
203 }
204 delay.wait(); // pause and then try again.
205 }
206 return status;
207 }
209
213 template <typename value2_t>
214 requires std::convertible_to<value2_t, value_t>
215 queue_op_status try_push(value2_t &&);
216
217 queue_op_status try_pop(value_t &);
219
223 void close()
224 {
225 if (writer_waiting.exchange(true)) // First writer that closes the queue will continue, the rest returns.
226 return;
227
228 try
229 {
230 std::unique_lock write_lock{mutex};
231 closed_flag = true;
232 writer_waiting.store(false); // reset the lock.
233 }
234 catch (...)
235 {
236 writer_waiting.store(false); // reset the lock.
238 }
239 }
240
241 bool is_closed() const noexcept
242 {
243 return closed_flag;
244 }
245
246 bool is_empty() const noexcept
247 {
248 std::unique_lock write_lock(mutex);
249 return pop_front_position == push_back_position;
250 }
251
252 bool is_full() const noexcept
253 {
254 std::unique_lock write_lock(mutex);
255 return is_ring_buffer_exhausted(pop_front_position, push_back_position);
256 }
257
258 size_type size() const noexcept
259 {
260 std::unique_lock write_lock(mutex);
261 if (to_buffer_position(pop_front_position) <= to_buffer_position(push_back_position))
262 {
263 return to_buffer_position(push_back_position) - to_buffer_position(pop_front_position);
264 }
265 else
266 {
267 assert(buffer.size() > (to_buffer_position(pop_front_position) - to_buffer_position(push_back_position)));
268 return buffer.size() - (to_buffer_position(pop_front_position) - to_buffer_position(push_back_position));
269 }
270 }
272private:
273
283 constexpr bool is_ring_buffer_exhausted(size_type const from, size_type const to) const
284 {
285 assert(to <= (from + ring_buffer_capacity + 1)); // The tail cannot overwrite the head.
286
287 return to >= from + ring_buffer_capacity;
288 }
289
303 constexpr size_type to_buffer_position(size_type const position) const
304 {
305 return position & (ring_buffer_capacity - 1);
306 }
307
327 size_type cyclic_increment(size_type position)
328 {
329 // invariants:
330 // - ring_buffer_capacity is a power of 2
331 // - (position % ring_buffer_capacity) is in [0, buffer.size())
332 //
333 // return the next greater position that fulfils the invariants
334 if (to_buffer_position(++position) >= buffer.size())
335 position += ring_buffer_capacity - buffer.size(); // If the position reached
336 return position;
337 }
338
339 template <typename value2_t>
340 requires (std::convertible_to<value2_t, value_t>) &&
341 (buffer_policy == buffer_queue_policy::fixed)
342 bool overflow(value2_t &&)
343 {
344 return false;
345 }
346
347 template <typename value2_t>
348 requires (std::convertible_to<value2_t, value_t>) &&
349 (buffer_policy == buffer_queue_policy::dynamic)
350 bool overflow(value2_t && value);
351
353 buffer_t buffer;
356 alignas(std::hardware_destructive_interference_size) std::atomic<size_type> pending_pop_front_position{0};
358 alignas(std::hardware_destructive_interference_size) std::atomic<size_type> pending_push_back_position{0};
361 alignas(std::hardware_destructive_interference_size) bool closed_flag{false};
362};
363
// Specifies a fixed size buffer queue.
// Pushing into a full fixed_buffer_queue reports queue_op_status::full instead of growing.
template <std::semiregular value_t, sequence_container buffer_t = std::vector<value_t>>
using fixed_buffer_queue = buffer_queue<value_t, buffer_t, buffer_queue_policy::fixed>;

// Specifies a dynamic size buffer queue (growable).
// A full dynamic_buffer_queue grows by one slot on push (see buffer_queue::overflow).
template <std::semiregular value_t, sequence_container buffer_t = std::vector<value_t>>
using dynamic_buffer_queue = buffer_queue<value_t, buffer_t, buffer_queue_policy::dynamic>;
371
372// ============================================================================
373// Metafunctions
374// ============================================================================
375
376// ============================================================================
377// Functions
378// ============================================================================
379
// Grows a dynamic buffer_queue by one slot when it is full (called by try_push after the
// lock-free fast path found the ring exhausted). Returns true iff `value` was written
// into the queue. Takes the unique (writer) lock, so no push/pop is in flight while the
// buffer is resized and the position counters are rewritten.
template <typename value_t, typename buffer_t, buffer_queue_policy buffer_policy>
template <typename value2_t>
    requires (std::convertible_to<value2_t, value_t>) &&
             (buffer_policy == buffer_queue_policy::dynamic)
inline bool buffer_queue<value_t, buffer_t, buffer_policy>::overflow(value2_t && value)
{
    // try to extend capacity
    std::unique_lock write_lock{mutex};

    size_type old_size = buffer.size();
    // Deliberate local shadow of the member: updated to the new capacity further below,
    // and only published back to the member at the end.
    size_type ring_buffer_capacity = this->ring_buffer_capacity;
    size_type local_front = this->pop_front_position;
    size_type local_back = this->push_back_position;

    // Expects no pending pushes or pops in unique lock.
    assert(local_back == this->pending_push_back_position);
    assert(local_front == this->pending_pop_front_position);

    bool valueWasAppended = false;

    // did we reach the capacity limit (another thread could have done the upgrade already)?
    // buffer is full if tail_pos + 1 == head_pos
    if (is_ring_buffer_exhausted(local_front, cyclic_increment(local_back)))
    {
        // In case of a full queue write the value into the additional slot.
        // Note, that the ring-buffer implementation uses one additional field which is not used except
        // when overflow happens. This invariant is used, to simply check for the full/empty state of the queue.
        if (old_size != 0)
        {
            auto it = std::ranges::begin(buffer) + to_buffer_position(local_back);
            *it = std::forward<value2_t>(value);
            // Mark the ring as fully exhausted with respect to the old capacity.
            local_back = local_front + ring_buffer_capacity;
            valueWasAppended = true;
        }

        assert(is_ring_buffer_exhausted(local_front, local_back));

        // get positions of head/tail in current buffer sequence
        size_type front_buffer_position = to_buffer_position(local_front);
        size_type back_buffer_position = to_buffer_position(local_back);

        // increase capacity by one and move all elements from current pop_front_position one to the right.
        buffer.resize(old_size + 1);
        ring_buffer_capacity = std::bit_ceil(buffer.size()); // new (possibly doubled) power-of-two window
        // Move the elements in [front_buffer_position, old_size) one slot to the right,
        // i.e. to the new end of the enlarged buffer.
        std::ranges::move_backward(std::span{buffer.data() + front_buffer_position, buffer.data() + old_size},
                                   buffer.data() + buffer.size());

        // Update the pop_front and push_back positions.
        if (old_size != 0)
        {
            this->pending_pop_front_position = this->pop_front_position = front_buffer_position + 1;
            this->pending_push_back_position = this->push_back_position = back_buffer_position + ring_buffer_capacity;
        }
        this->ring_buffer_capacity = ring_buffer_capacity; // publish the new capacity
    }
    return valueWasAppended;
}
437
438// ----------------------------------------------------------------------------
439// Function try_pop()
440// ----------------------------------------------------------------------------
441
442/*
443 * @fn ConcurrentQueue#tryPopFront
444 * @headerfile <seqan/parallel.h>
445 * @brief Try to dequeue a value from a queue.
446 *
447 * @signature bool tryPopFront(result, queue[, parallelTag]);
448 *
449 *
450 * @param[in,out] queue A queue.
451 * @param[out] result The dequeued value (if available).
452 * @param[in] parallelTag The concurrency scheme. If multiple threads dequeue values concurrently this tag must be
453 * @link ParallelismTags#Parallel @endlink. The more efficient @link ParallelismTags#Serial
454 * @endlink tag can only be used if one thread calls <tt>popFront</tt> at a time.
455 * Default is @link ParallelismTags#Parallel @endlink.
456 * @return bool Returns <tt>true</tt> if a value could be dequeued and <tt>false</tt> otherwise.
457 */
// Tries to dequeue the front element into `result` without blocking.
// Returns: success - an element was moved into `result`;
//          empty   - the queue currently holds no committed elements;
//          closed  - the queue is empty AND closed (no more elements will arrive).
// Never returns `full`. Safe for multiple concurrent consumers: a slot is claimed via
// CAS on pending_pop_front_position, then pop_front_position is advanced in FIFO order
// once all earlier pops have committed.
template <typename value_t, typename buffer_t, buffer_queue_policy buffer_policy>
inline queue_op_status buffer_queue<value_t, buffer_t, buffer_policy>::try_pop(value_t & result)
{
    // try to extract a value
    std::shared_lock read_lock{mutex}; // shared: excludes only resize/close, not other pops/pushes

    size_type local_pending_pop_front_position{};
    size_type next_local_pop_front_position{};
    detail::spin_delay spinDelay{};

    local_pending_pop_front_position = this->pending_pop_front_position;
    // wait for queue to become filled
    while (true)
    {
        size_type local_push_back_position = this->push_back_position;

        assert(local_pending_pop_front_position <= local_push_back_position);

        // Check if queue is empty
        if (local_pending_pop_front_position == local_push_back_position)
        {
            return is_closed() ? queue_op_status::closed : queue_op_status::empty;
        }

        // Get the next ring-buffer position to read from.
        next_local_pop_front_position = cyclic_increment(local_pending_pop_front_position);
        // Did another/other thread(s) already acquire this slot?
        // If yes, try with next position (a failed CAS reloads
        // local_pending_pop_front_position). If not, break and read from the acquired position.
        if (this->pending_pop_front_position.compare_exchange_weak(local_pending_pop_front_position,
                                                                   next_local_pop_front_position))
            break;

        spinDelay.wait();
    }

    // Store the value from the acquired read position.
    result = std::ranges::iter_move(buffer.begin() + to_buffer_position(local_pending_pop_front_position));

    // wait for pending previous reads and synchronize pop_front_position to local_pending_pop_front_position
    {
        detail::spin_delay delay{};
        size_type acquired_slot = local_pending_pop_front_position;
        while (!this->pop_front_position.compare_exchange_weak(acquired_slot, next_local_pop_front_position))
        {
            acquired_slot = local_pending_pop_front_position; // failed CAS clobbered it; restore expected value
            delay.wait(); // add adapting delay in case of high contention.
        }
    }

    return queue_op_status::success;
}
509
510// ----------------------------------------------------------------------------
511// Function try_push()
512// ----------------------------------------------------------------------------
513
514/*
515 * @fn ConcurrentQueue#appendValue
516 * @headerfile <seqan/parallel.h>
517 * @brief Enqueue a value to a queue.
518 *
519 * @signature void appendValue(queue, val[, expandTag[, parallelTag]);
520 *
521 *
522 * @param[in,out] queue A queue.
523 * @param[in] val The value to enqueue.
524 * @param[in] expandTag The overflow strategy. If @link OverflowStrategyTags#Generous @endlink the queue will be
525 * automatically resized if the capacity is exceeded, otherwise the thread spinlocks until
526 * the element can be enqueued.
527 * Default is the @link DefaultOverflowImplicit @endlink result for the <tt>queue</tt> type.
528 * @param[in] parallelTag The concurrency scheme. If multiple threads enqueue values concurrently this tag must be
529 * @link ParallelismTags#Parallel @endlink. The more efficient @link ParallelismTags#Serial
530 * @endlink tag can only be used if one thread calls <tt>appendValue</tt> at a time.
531 * Default is @link ParallelismTags#Parallel @endlink.
532 */
// Tries to enqueue `value` without blocking.
// Returns: success - the value was stored (possibly after growing a dynamic queue);
//          closed  - the queue was closed, value not stored;
//          full    - no free slot and the queue could not grow (always so for fixed policy).
// Safe for multiple concurrent producers: a slot is claimed via CAS on
// pending_push_back_position, then push_back_position is advanced in FIFO order once all
// earlier pushes have committed.
template <typename value_t, typename buffer_t, buffer_queue_policy buffer_policy>
template <typename value2_t>
    requires std::convertible_to<value2_t, value_t>
inline queue_op_status buffer_queue<value_t, buffer_t, buffer_policy>::try_push(value2_t && value)
{
    // try to push the value
    {
        detail::spin_delay delay{};

        std::shared_lock read_lock(mutex); // shared: excludes only resize/close, not other pushes/pops

        if (is_closed())
            return queue_op_status::closed;

        // Current up to date position to push an element to
        size_type local_pending_push_back_position = this->pending_push_back_position;

        while (true)
        {
            // Get the next potential position to write the value to.
            size_type next_local_push_back_position = cyclic_increment(local_pending_push_back_position);
            size_type local_pop_front_position = this->pop_front_position;

            // Check if there are enough slots to write to.
            // If not either wait or try to overflow if it is a dynamic queue.
            if (is_ring_buffer_exhausted(local_pop_front_position, next_local_push_back_position))
                break; // full: release the shared lock and attempt overflow() below

            // Did another/other thread(s) acquire the current pending position before this thread?
            // If yes, try again (a failed CAS reloads local_pending_push_back_position);
            // if not, write into the acquired slot.
            if (this->pending_push_back_position.compare_exchange_weak(local_pending_push_back_position,
                                                                       next_local_push_back_position))
            {
                // Current thread acquired the local_pending_push_back_position and can now write the value into the
                // proper slot of the ring buffer.
                auto it = std::ranges::begin(buffer) + to_buffer_position(local_pending_push_back_position);
                *it = std::forward<value2_t>(value);

                // wait for pending previous writes and synchronise push_back_position to
                // local_pending_push_back_position
                {
                    detail::spin_delay delay{};
                    // the slot this thread acquired to write to
                    size_type acquired_slot = local_pending_push_back_position;
                    while (!this->push_back_position.compare_exchange_weak(acquired_slot,
                                                                           next_local_push_back_position))
                    {
                        acquired_slot = local_pending_push_back_position; // failed CAS clobbered it; restore
                        delay.wait();
                    }
                }
                return queue_op_status::success;
            }

            delay.wait();
        }
    }

    // if possible extend capacity and return.
    if (overflow(std::forward<value2_t>(value)))
    {
        return queue_op_status::success; // always return success, since the queue resizes and cannot be full.
    }

    // We could not extend the queue so it must be full.
    return queue_op_status::full;
}
601} // namespace seqan3::contrib
The <algorithm> header from C++20's standard library.
T begin(T... args)
The <bit> header from C++20's standard library.
The <concepts> header from C++20's standard library.
Provides various transformation traits used by the range module.
T current_exception(T... args)
T data(T... args)
T empty(T... args)
T fixed(T... args)
constexpr T bit_ceil(T x) noexcept
Calculates the smallest integral power of two that is not smaller than x.
Definition: bit:162
constexpr std::size_t hardware_destructive_interference_size
Minimum offset between two objects to avoid false sharing.
Definition: new:32
constexpr size_t size
The size of a type pack.
Definition: traits.hpp:151
constexpr auto to
A to view.
Definition: to.hpp:30
A more refined container concept than seqan3::container.
The <new> header from C++17's standard library.
The <ranges> header from C++20's standard library.
T rethrow_exception(T... args)
Provides seqan3::detail::spin_delay.
Provides std::span from the C++20 standard library.
Adaptations of concepts from the standard library.