SeqAn3 3.3.0
The Modern C++ library for sequence analysis.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
buffer_queue.hpp
Go to the documentation of this file.
1// -----------------------------------------------------------------------------------------------------
2// Copyright (c) 2006-2023, Knut Reinert & Freie Universität Berlin
3// Copyright (c) 2016-2023, Knut Reinert & MPI für molekulare Genetik
4// This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License
5// shipped with this file and also available at: https://github.com/seqan/seqan3/blob/master/LICENSE.md
6// -----------------------------------------------------------------------------------------------------
7
13#pragma once
14
15#include <algorithm>
16#include <atomic>
17#include <bit>
18#include <cassert>
19#include <cmath>
20#include <concepts>
21#include <mutex>
22#include <ranges>
23#include <seqan3/std/new>
24#include <shared_mutex>
25#include <span>
26#include <type_traits>
27#include <vector>
28
32
33namespace seqan3::contrib
34{
35
// Status codes returned by the queue operations (try_push/try_pop and the waiting variants).
enum class queue_op_status : uint8_t
{
    success = 0, // The operation completed.
    empty,       // A pop found the queue empty.
    full,        // A push found the queue full.
    closed       // The queue has been closed; no further pushes are accepted.
};
44
// Overflow policy of the queue's underlying buffer.
enum struct buffer_queue_policy : uint8_t
{
    fixed,  // Capacity is fixed at construction; pushes into a full queue fail/spin.
    dynamic // Capacity grows by one (plus power-of-two rounding) when the queue overflows.
};
50
51// Ringbuffer implementation:
52// The underlying buffer has size (number of actual elements + 1). This is a trick to easily check if the queue is empty
53// or full. Furthermore, the ring buffer uses 4 pointers. The actual push_back and pop_front position as well as
54// the pending push_back and pop_front position. The latter indicates the position that have been advanced concurrently
55// by multiple threads from either end.
56//
57// head: position to read/extract from the queue (first inserted element) => pop_front_position
58// tail: position where to write to/push new elements to the queue => push_back_position
59// head_read: The actual position after x threads concurrently popped from the queue. => pending_pop_front_position
60// tail_write: The actual position after x threads concurrently pushed to the queue. => pending_push_back_position
61// [ ? ] [ 4 ] [ 3 ] [ 8 ] [ 0 ] [ x ] [ ? ]
62// | ^
63// v |
64// head headRead tail tailWrite
65//
66// valid buffer between [headRead, tail)
67// currently filled [tail, tailWrite)
68// currently removed [head, headRead)
69//
70// State: empty = (head == tail)
71// [ ? ] [ ? ] [ ? ] [ ? ] [ ? ] [ ? ] [ ? ]
72// tail
73// head
74// The head is on the same position as tail.
75// This means that currently no element is in the buffer.
76
77// State: full = (tail + 1 == head)
78// [ 2 ] [ 4 ] [ 3 ] [ ? ] [ 8 ] [ 0 ] [ 7 ]
79// tail
80// head
81// The tail is one position before the head.
82// This means that currently no element can be added to the buffer since it is full.
83// Strategies are to either wait until some elements have been popped or to expand the capacity of the
84// queue by one, inserting the element at the current tail position and moving all elements starting from head one
85// position to the right.
86
87template <std::semiregular value_t,
89 buffer_queue_policy buffer_policy = buffer_queue_policy::dynamic>
90class buffer_queue
91{
92public:
93 using buffer_type = buffer_t;
94 using value_type = typename buffer_type::value_type;
95 using size_type = typename buffer_type::size_type;
96 using reference = void;
97 using const_reference = void;
98
99 // Default constructor sets capacity to 1 (still empty)
100 buffer_queue() : buffer_queue{0u}
101 {}
102 buffer_queue(buffer_queue const &) = delete;
103 buffer_queue(buffer_queue &&) = delete;
104 buffer_queue & operator=(buffer_queue const &) = delete;
105 buffer_queue & operator=(buffer_queue &&) = delete;
106 ~buffer_queue() = default;
107
108 // you can set the initial capacity here
109 explicit buffer_queue(size_type const init_capacity)
110 {
111 buffer.resize(init_capacity + 1);
112 ring_buffer_capacity = std::bit_ceil(buffer.size());
113 }
114
115 template <std::ranges::input_range range_type>
116 requires std::convertible_to<std::ranges::range_value_t<range_type>, value_type>
117 buffer_queue(size_type const init_capacity, range_type && r) : buffer_queue{init_capacity}
118 {
120 }
121
125 template <typename value2_t>
126 requires std::convertible_to<value2_t, value_t>
127 void push(value2_t && value)
128 {
129 detail::spin_delay delay{};
130
131 for (;;)
132 {
133 auto status = try_push(std::forward<value2_t>(value));
134 if (status == queue_op_status::closed)
135 throw queue_op_status::closed;
136 else if (status == queue_op_status::success)
137 return;
138
139 assert(status != queue_op_status::empty);
140 assert(status == queue_op_status::full);
141 delay.wait(); // pause and then try again.
142 }
143 } // throws if closed
144
145 template <typename value2_t>
146 requires std::convertible_to<value2_t, value_t>
147 queue_op_status wait_push(value2_t && value)
148 {
149 detail::spin_delay delay{};
150
151 for (;;)
152 {
153 auto status = try_push(std::forward<value2_t>(value));
154 // wait until queue is not full anymore..
155 if (status != queue_op_status::full)
156 return status;
157
158 assert(status != queue_op_status::empty);
159 assert(status == queue_op_status::full);
160 delay.wait(); // pause and then try again.
161 }
162 }
163
164 value_type value_pop() // throws if closed
165 {
166 detail::spin_delay delay{};
167
168 value_type value{};
169 for (;;)
170 {
171 if (!writer_waiting.load())
172 {
173 auto status = try_pop(value);
174
175 if (status == queue_op_status::closed)
176 throw queue_op_status::closed;
177 else if (status == queue_op_status::success)
178 return value;
179
180 assert(status != queue_op_status::full);
181 assert(status == queue_op_status::empty);
182 }
183 delay.wait(); // pause and then try again.
184 }
185 }
186
187 queue_op_status wait_pop(value_type & value)
188 {
189 detail::spin_delay delay{};
190
191 queue_op_status status;
192 for (;;)
193 {
194 if (!writer_waiting.load())
195 {
196 status = try_pop(value);
197
198 if (status == queue_op_status::closed || status == queue_op_status::success)
199 break;
200
201 assert(status != queue_op_status::full);
202 assert(status == queue_op_status::empty);
203 }
204 delay.wait(); // pause and then try again.
205 }
206 return status;
207 }
209
213 template <typename value2_t>
214 requires std::convertible_to<value2_t, value_t>
215 queue_op_status try_push(value2_t &&);
216
217 queue_op_status try_pop(value_t &);
219
223 void close()
224 {
225 if (writer_waiting.exchange(true)) // First writer that closes the queue will continue, the rest returns.
226 return;
227
228 try
229 {
230 std::unique_lock write_lock{mutex};
231 closed_flag = true;
232 writer_waiting.store(false); // reset the lock.
233 }
234 catch (...)
235 {
236 writer_waiting.store(false); // reset the lock.
238 }
239 }
240
241 bool is_closed() const noexcept
242 {
243 return closed_flag;
244 }
245
246 bool is_empty() const noexcept
247 {
248 std::unique_lock write_lock(mutex);
249 return pop_front_position == push_back_position;
250 }
251
252 bool is_full() const noexcept
253 {
254 std::unique_lock write_lock(mutex);
255 return is_ring_buffer_exhausted(pop_front_position, push_back_position);
256 }
257
258 size_type size() const noexcept
259 {
260 std::unique_lock write_lock(mutex);
261 if (to_buffer_position(pop_front_position) <= to_buffer_position(push_back_position))
262 {
263 return to_buffer_position(push_back_position) - to_buffer_position(pop_front_position);
264 }
265 else
266 {
267 assert(buffer.size() > (to_buffer_position(pop_front_position) - to_buffer_position(push_back_position)));
268 return buffer.size() - (to_buffer_position(pop_front_position) - to_buffer_position(push_back_position));
269 }
270 }
272
273private:
283 constexpr bool is_ring_buffer_exhausted(size_type const from, size_type const to) const
284 {
285 assert(to <= (from + ring_buffer_capacity + 1)); // The tail cannot overwrite the head.
286
287 return to >= from + ring_buffer_capacity;
288 }
289
303 constexpr size_type to_buffer_position(size_type const position) const
304 {
305 return position & (ring_buffer_capacity - 1);
306 }
307
327 size_type cyclic_increment(size_type position)
328 {
329 // invariants:
330 // - ring_buffer_capacity is a power of 2
331 // - (position % ring_buffer_capacity) is in [0, buffer.size())
332 //
333 // return the next greater position that fulfils the invariants
334 if (to_buffer_position(++position) >= buffer.size())
335 position += ring_buffer_capacity - buffer.size(); // If the position reached
336 return position;
337 }
338
339 template <typename value2_t>
340 requires (std::convertible_to<value2_t, value_t>) && (buffer_policy == buffer_queue_policy::fixed) bool
341 overflow(value2_t &&)
342 {
343 return false;
344 }
345
346 template <typename value2_t>
347 requires (std::convertible_to<value2_t, value_t>) && (buffer_policy == buffer_queue_policy::dynamic) bool
348 overflow(value2_t && value);
349
351 buffer_t buffer;
354 alignas(std::hardware_destructive_interference_size) std::atomic<size_type> pending_pop_front_position{0};
356 alignas(std::hardware_destructive_interference_size) std::atomic<size_type> pending_push_back_position{0};
359 alignas(std::hardware_destructive_interference_size) bool closed_flag{false};
360};
361
// Specifies a fixed size buffer queue: pushes into a full queue fail (try_push)
// or spin until space is available (push/wait_push); the capacity never grows.
template <std::semiregular value_t, sequence_container buffer_t = std::vector<value_t>>
using fixed_buffer_queue = buffer_queue<value_t, buffer_t, buffer_queue_policy::fixed>;
365
// Specifies a dynamic size buffer queue (growable): a push into a full queue
// extends the capacity via overflow() instead of failing.
template <std::semiregular value_t, sequence_container buffer_t = std::vector<value_t>>
using dynamic_buffer_queue = buffer_queue<value_t, buffer_t, buffer_queue_policy::dynamic>;
369
370// ============================================================================
371// Metafunctions
372// ============================================================================
373
374// ============================================================================
375// Functions
376// ============================================================================
377
// Grow the queue by one slot (dynamic policy only) while holding the unique lock,
// which excludes all concurrent pushers/poppers (they hold the shared lock).
// If the queue is still full under the lock, the value is written into the ring
// buffer's spare slot, the buffer is extended by one, and the elements from the
// head onwards are shifted right so the ring invariants hold again.
// Returns true iff the value was appended here (false: retry the normal push path).
template <std::semiregular value_t, sequence_container buffer_t, buffer_queue_policy buffer_policy>
template <typename value2_t>
    requires (std::convertible_to<value2_t, value_t>) && (buffer_policy == buffer_queue_policy::dynamic)
inline bool buffer_queue<value_t, buffer_t, buffer_policy>::overflow(value2_t && value)
{
    // try to extend capacity
    std::unique_lock write_lock{mutex};

    size_type old_size = buffer.size();
    // Local copy deliberately shadows the member; the member is updated at the end.
    size_type ring_buffer_capacity = this->ring_buffer_capacity;
    size_type local_front = this->pop_front_position;
    size_type local_back = this->push_back_position;

    // Expects no pending pushes or pops in unique lock.
    assert(local_back == this->pending_push_back_position);
    assert(local_front == this->pending_pop_front_position);

    bool valueWasAppended = false;

    // did we reach the capacity limit (another thread could have done the upgrade already)?
    // buffer is full if tail_pos + 1 == head_pos
    if (is_ring_buffer_exhausted(local_front, cyclic_increment(local_back)))
    {
        // In case of a full queue write the value into the additional slot.
        // Note, that the ring-buffer implementation uses one additional field which is not used except
        // when overflow happens. This invariant is used, to simply check for the full/empty state of the queue.
        if (old_size != 0)
        {
            auto it = std::ranges::begin(buffer) + to_buffer_position(local_back);
            *it = std::forward<value2_t>(value);
            local_back = local_front + ring_buffer_capacity;
            valueWasAppended = true;
        }

        assert(is_ring_buffer_exhausted(local_front, local_back));

        // get positions of head/tail in current buffer sequence
        size_type front_buffer_position = to_buffer_position(local_front);
        size_type back_buffer_position = to_buffer_position(local_back);

        // increase capacity by one and move all elements from current pop_front_position one to the right.
        buffer.resize(old_size + 1);
        ring_buffer_capacity = std::bit_ceil(buffer.size());
        // Shift [front, old end) one slot right so the head keeps a contiguous run.
        std::ranges::move_backward(std::span{buffer.data() + front_buffer_position, buffer.data() + old_size},
                                   buffer.data() + buffer.size());

        // Update the pop_front and push_back positions.
        if (old_size != 0)
        {
            this->pending_pop_front_position = this->pop_front_position = front_buffer_position + 1;
            this->pending_push_back_position = this->push_back_position = back_buffer_position + ring_buffer_capacity;
        }
        this->ring_buffer_capacity = ring_buffer_capacity;
    }
    return valueWasAppended;
}
434
435// ----------------------------------------------------------------------------
436// Function try_pop()
437// ----------------------------------------------------------------------------
438
439/*
440 * @fn ConcurrentQueue#tryPopFront
441 * @headerfile <seqan/parallel.h>
442 * @brief Try to dequeue a value from a queue.
443 *
444 * @signature bool tryPopFront(result, queue[, parallelTag]);
445 *
446 *
447 * @param[in,out] queue A queue.
448 * @param[out] result The dequeued value (if available).
449 * @param[in] parallelTag The concurrency scheme. If multiple threads dequeue values concurrently this tag must be
450 * @link ParallelismTags#Parallel @endlink. The more efficient @link ParallelismTags#Serial
451 * @endlink tag can only be used if one thread calls <tt>popFront</tt> at a time.
452 * Default is @link ParallelismTags#Parallel @endlink.
453 * @return bool Returns <tt>true</tt> if a value could be dequeued and <tt>false</tt> otherwise.
454 */
// Non-blocking pop under the shared lock.
// 1) CAS-claim a read slot by advancing pending_pop_front_position.
// 2) Move the element out of the claimed slot.
// 3) Publish by advancing pop_front_position, waiting for earlier readers to
//    publish first (their CAS must complete before ours can).
// Returns success, empty, or closed (closed only once the queue is drained).
template <std::semiregular value_t, sequence_container buffer_t, buffer_queue_policy buffer_policy>
inline queue_op_status buffer_queue<value_t, buffer_t, buffer_policy>::try_pop(value_t & result)
{
    // try to extract a value
    std::shared_lock read_lock{mutex};

    size_type local_pending_pop_front_position{};
    size_type next_local_pop_front_position{};
    detail::spin_delay spinDelay{};

    local_pending_pop_front_position = this->pending_pop_front_position;
    // wait for queue to become filled
    while (true)
    {
        size_type local_push_back_position = this->push_back_position;

        assert(local_pending_pop_front_position <= local_push_back_position);

        // Check if queue is empty
        if (local_pending_pop_front_position == local_push_back_position)
        {
            return is_closed() ? queue_op_status::closed : queue_op_status::empty;
        }

        // Get the next ring-buffer position to read from.
        next_local_pop_front_position = cyclic_increment(local_pending_pop_front_position);
        // Did another/other thread(s) already acquired this slot?
        // If yes, try with next position. If not, break and read from aquired position.
        // (On failure the CAS reloads local_pending_pop_front_position for the retry.)
        if (this->pending_pop_front_position.compare_exchange_weak(local_pending_pop_front_position,
                                                                   next_local_pop_front_position))
            break;

        spinDelay.wait();
    }

    // Store the value from the aquired read position.
    result = std::ranges::iter_move(buffer.begin() + to_buffer_position(local_pending_pop_front_position));

    // wait for pending previous reads and synchronize pop_front_position to local_pending_pop_front_position
    {
        detail::spin_delay delay{};
        size_type acquired_slot = local_pending_pop_front_position;
        while (!this->pop_front_position.compare_exchange_weak(acquired_slot, next_local_pop_front_position))
        {
            // CAS overwrote acquired_slot with the current value; restore our slot and retry
            // until all earlier readers have published their slots.
            acquired_slot = local_pending_pop_front_position;
            delay.wait(); // add adapting delay in case of high contention.
        }
    }

    return queue_op_status::success;
}
506
507// ----------------------------------------------------------------------------
508// Function try_push()
509// ----------------------------------------------------------------------------
510
511/*
512 * @fn ConcurrentQueue#appendValue
513 * @headerfile <seqan/parallel.h>
514 * @brief Enqueue a value to a queue.
515 *
516 * @signature void appendValue(queue, val[, expandTag[, parallelTag]);
517 *
518 *
519 * @param[in,out] queue A queue.
520 * @param[in] val The value to enqueue.
521 * @param[in] expandTag The overflow strategy. If @link OverflowStrategyTags#Generous @endlink the queue will be
522 * automatically resized if the capacity is exceeded, otherwise the thread spinlocks until
523 * the element can be enqueued.
524 * Default is the @link DefaultOverflowImplicit @endlink result for the <tt>queue</tt> type.
525 * @param[in] parallelTag The concurrency scheme. If multiple threads enqueue values concurrently this tag must be
526 * @link ParallelismTags#Parallel @endlink. The more efficient @link ParallelismTags#Serial
527 * @endlink tag can only be used if one thread calls <tt>appendValue</tt> at a time.
528 * Default is @link ParallelismTags#Parallel @endlink.
529 */
// Non-blocking push under the shared lock.
// 1) CAS-claim a write slot by advancing pending_push_back_position.
// 2) Write the value into the claimed slot.
// 3) Publish by advancing push_back_position, waiting for earlier writers to
//    publish first.
// If the ring is exhausted, fall back to overflow(): for the dynamic policy this
// grows the queue (always succeeds), for the fixed policy it returns full.
template <std::semiregular value_t, sequence_container buffer_t, buffer_queue_policy buffer_policy>
template <typename value2_t>
    requires std::convertible_to<value2_t, value_t>
inline queue_op_status buffer_queue<value_t, buffer_t, buffer_policy>::try_push(value2_t && value)
{
    // try to push the value
    {
        detail::spin_delay delay{};

        std::shared_lock read_lock(mutex);

        if (is_closed())
            return queue_op_status::closed;

        // Current up to date position to push an element to
        size_type local_pending_push_back_position = this->pending_push_back_position;

        while (true)
        {
            // Get the next potential position to write the value too.
            size_type next_local_push_back_position = cyclic_increment(local_pending_push_back_position);
            size_type local_pop_front_position = this->pop_front_position;

            // Check if there are enough slots to write to.
            // If not either wait or try to overflow if it is a dynamic queue.
            if (is_ring_buffer_exhausted(local_pop_front_position, next_local_push_back_position))
                break;

            // Did another/other thread(s) acquired the current pending position before this thread
            // If yes, try again if not, write into acquired slot.
            // (On failure the CAS reloads local_pending_push_back_position for the retry.)
            if (this->pending_push_back_position.compare_exchange_weak(local_pending_push_back_position,
                                                                       next_local_push_back_position))
            {
                // Current thread acquired the local_pending_push_back_position and can now write the value into the
                // proper slot of the ring buffer.
                auto it = std::ranges::begin(buffer) + to_buffer_position(local_pending_push_back_position);
                *it = std::forward<value2_t>(value);

                // wait for pending previous writes and synchronise push_back_position to
                // local_pending_push_back_position
                {
                    detail::spin_delay delay{};
                    // the slot this thread acquired to write to
                    size_type acquired_slot = local_pending_push_back_position;
                    while (
                        !this->push_back_position.compare_exchange_weak(acquired_slot, next_local_push_back_position))
                    {
                        // Restore our slot (CAS overwrote it) and retry until all earlier
                        // writers have published their slots.
                        acquired_slot = local_pending_push_back_position;
                        delay.wait();
                    }
                }
                return queue_op_status::success;
            }

            delay.wait();
        }
    } // Shared lock released here so overflow() can take the unique lock.

    // if possible extend capacity and return.
    if (overflow(std::forward<value2_t>(value)))
    {
        return queue_op_status::success; // always return success, since the queue resizes and cannot be full.
    }

    // We could not extend the queue so it must be full.
    return queue_op_status::full;
}
598} // namespace seqan3::contrib
T begin(T... args)
T bit_ceil(T... args)
T copy(T... args)
Provides various transformation traits used by the range module.
T current_exception(T... args)
T data(T... args)
T empty(T... args)
T fixed(T... args)
constexpr std::size_t hardware_destructive_interference_size
Minimum offset between two objects to avoid false sharing.
Definition: new:34
seqan::std::ranges::to to
Converts a range to a container. <dl class="no-api">This entity is not part of the SeqAn API....
Definition: to.hpp:26
constexpr size_t size
The size of a type pack.
Definition: type_pack/traits.hpp:146
A more refined container concept than seqan3::container.
T move_backward(T... args)
The <new> header from C++17's standard library.
T rethrow_exception(T... args)
Provides seqan3::detail::spin_delay.
Adaptations of concepts from the standard library.