SeqAn3 3.2.0
The Modern C++ library for sequence analysis.
buffer_queue.hpp
Go to the documentation of this file.
1// -----------------------------------------------------------------------------------------------------
2// Copyright (c) 2006-2022, Knut Reinert & Freie Universität Berlin
3// Copyright (c) 2016-2022, Knut Reinert & MPI für molekulare Genetik
4// This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License
5// shipped with this file and also available at: https://github.com/seqan/seqan3/blob/master/LICENSE.md
6// -----------------------------------------------------------------------------------------------------
7
13#pragma once
14
#include <algorithm>
#include <atomic>
#include <bit>
#include <cassert>
#include <cmath>
#include <concepts>
#include <cstdint> // uint8_t (used by the enum declarations below)
#include <mutex>
#include <seqan3/std/new>
#include <ranges>
#include <shared_mutex>
#include <span>
#include <type_traits>
#include <vector>
28
32
33namespace seqan3::contrib
34{
35
//!\brief Status codes returned by the (blocking and non-blocking) queue operations.
enum class queue_op_status : uint8_t
{
    success = 0, //!< The operation completed successfully.
    empty = 1,   //!< The queue was empty; nothing could be popped.
    full = 2,    //!< The queue was full; nothing could be pushed.
    closed = 3   //!< The queue has been closed.
};
44
//!\brief Policy selecting whether a buffer_queue may grow when it is full.
// Uses `enum class` (not `enum struct`) for consistency with queue_op_status above.
enum class buffer_queue_policy : uint8_t
{
    fixed,  //!< Capacity is fixed at construction; pushing to a full queue fails (overflow() returns false).
    dynamic //!< The queue grows by one slot on overflow() when it is full.
};
50
51// Ringbuffer implementation:
52// The underlying buffer has size (number of actual elements + 1). This is a trick to easily check if the queue is empty
53// or full. Furthermore, the ring buffer uses 4 pointers. The actual push_back and pop_front position as well as
// the pending push_back and pop_front position. The latter indicates the positions that have been advanced concurrently
55// by multiple threads from either end.
56//
// head: position to read/extract from the queue (first inserted element) => pop_front_position
58// tail: position where to write to/push new elements to the queue => push_back_position
59// head_read: The actual position after x threads concurrently popped from the queue. => pending_pop_front_position
60// tail_write: The actual position after x threads concurrently pushed to the queue. => pending_push_back_position
61// [ ? ] [ 4 ] [ 3 ] [ 8 ] [ 0 ] [ x ] [ ? ]
62// | ^
63// v |
64// head headRead tail tailWrite
65//
66// valid buffer between [headRead, tail)
67// currently filled [tail, tailWrite)
68// currently removed [head, headRead)
69//
70// State: empty = (head == tail)
71// [ ? ] [ ? ] [ ? ] [ ? ] [ ? ] [ ? ] [ ? ]
72// tail
73// head
74// The head is on the same position as tail.
75// This means that currently no element is in the buffer.
76
77// State: full = (tail + 1 == head)
78// [ 2 ] [ 4 ] [ 3 ] [ ? ] [ 8 ] [ 0 ] [ 7 ]
79// tail
80// head
81// The tail is one position before the head.
82// This means that currently no element can be added to the buffer since it is full.
83// Strategies are to either wait until some elements have been popped or to expand the capacity of the
84// queue by one, inserting the element at the current tail position and moving all elements starting from head one
85// position to the right.
86
// A concurrent ring-buffer queue supporting multiple producers and consumers.
// Elements are pushed at the tail and popped from the head; depending on
// `buffer_policy` the buffer either stays fixed in size or grows on overflow.
// NOTE(review): this scraped view is missing the middle template parameter;
// upstream declares `sequence_container buffer_t = std::vector<value_t>`
// between the two visible parameters — confirm against the SeqAn3 sources.
template <std::semiregular value_t,
          buffer_queue_policy buffer_policy = buffer_queue_policy::dynamic>
class buffer_queue
{
public:

    using buffer_type = buffer_t;
    using value_type = typename buffer_type::value_type;
    using size_type = typename buffer_type::size_type;
    using reference = void;       // not exposed: slots may be moved-from concurrently
    using const_reference = void; // not exposed: slots may be moved-from concurrently

    // Default constructor sets capacity to 1 (still empty)
    buffer_queue() : buffer_queue{0u}
    {}
    // Not copyable or movable: atomics and the mutex pin the object in place.
    buffer_queue(buffer_queue const &) = delete;
    buffer_queue(buffer_queue &&) = delete;
    buffer_queue & operator=(buffer_queue const &) = delete;
    buffer_queue & operator=(buffer_queue &&) = delete;
    ~buffer_queue() = default;

    // you can set the initial capacity here
    explicit buffer_queue(size_type const init_capacity)
    {
        // +1: one slot is deliberately kept unused so that the full state
        // (tail + 1 == head) and the empty state (head == tail) are distinguishable
        // (see the ring-buffer description at the top of this file).
        buffer.resize(init_capacity + 1);
        // Round up to a power of two so to_buffer_position() can mask instead of modulo.
        ring_buffer_capacity = std::bit_ceil(buffer.size());
    }

    // Construct with the given capacity and fill the queue from the range `r`.
    template <std::ranges::input_range range_type>
        requires std::convertible_to<std::ranges::range_value_t<range_type>, value_type>
    buffer_queue(size_type const init_capacity, range_type && r) : buffer_queue{init_capacity}
    {
        // NOTE(review): the body is missing from this scraped view; upstream
        // copies `r` into `buffer` here — confirm against the SeqAn3 sources.
    }

    // Blocking push: spins until the value was enqueued.
    // Throws queue_op_status::closed if the queue has been closed.
    template <typename value2_t>
        requires std::convertible_to<value2_t, value_t>
    void push(value2_t && value)
    {
        detail::spin_delay delay{};

        for (;;)
        {
            // try_push only moves from `value` on success, so forwarding in a loop is safe.
            auto status = try_push(std::forward<value2_t>(value));
            if (status == queue_op_status::closed)
                throw queue_op_status::closed;
            else if (status == queue_op_status::success)
                return;

            assert(status != queue_op_status::empty);
            assert(status == queue_op_status::full);
            delay.wait(); // pause and then try again.
        }
    } // throws if closed

    // Blocking push that reports the outcome instead of throwing.
    // Returns success, or closed if the queue was closed while waiting.
    template <typename value2_t>
        requires std::convertible_to<value2_t, value_t>
    queue_op_status wait_push(value2_t && value)
    {
        detail::spin_delay delay{};

        for (;;)
        {
            auto status = try_push(std::forward<value2_t>(value));
            // wait until queue is not full anymore..
            if (status != queue_op_status::full)
                return status;

            assert(status != queue_op_status::empty);
            assert(status == queue_op_status::full);
            delay.wait(); // pause and then try again.
        }
    }

    // Blocking pop: spins until a value could be dequeued and returns it.
    // Throws queue_op_status::closed if the queue is closed and drained.
    value_type value_pop() // throws if closed
    {
        detail::spin_delay delay{};

        value_type value{};
        for (;;)
        {
            // Back off while a writer is in the middle of closing the queue
            // (close() raises writer_waiting while it takes the write lock).
            if (!writer_waiting.load())
            {
                auto status = try_pop(value);

                if (status == queue_op_status::closed)
                    throw queue_op_status::closed;
                else if (status == queue_op_status::success)
                    return value;

                assert(status != queue_op_status::full);
                assert(status == queue_op_status::empty);
            }
            delay.wait(); // pause and then try again.
        }
    }

    // Blocking pop that reports the outcome instead of throwing.
    // Returns success (value written) or closed (queue closed and drained).
    queue_op_status wait_pop(value_type & value)
    {
        detail::spin_delay delay{};

        queue_op_status status;
        for (;;)
        {
            // Back off while a writer is closing the queue (see value_pop()).
            if (!writer_waiting.load())
            {
                status = try_pop(value);

                if (status == queue_op_status::closed || status == queue_op_status::success)
                    break;

                assert(status != queue_op_status::full);
                assert(status == queue_op_status::empty);
            }
            delay.wait(); // pause and then try again.
        }
        return status;
    }

    // Non-blocking push; returns success, full or closed. Defined below.
    template <typename value2_t>
        requires std::convertible_to<value2_t, value_t>
    queue_op_status try_push(value2_t &&);

    // Non-blocking pop; returns success, empty or closed. Defined below.
    queue_op_status try_pop(value_t &);

    // Close the queue: pending and future pushes fail with `closed`;
    // pops keep succeeding until the queue is drained.
    void close()
    {
        if (writer_waiting.exchange(true)) // First writer that closes the queue will continue, the rest returns.
            return;

        try
        {
            std::unique_lock write_lock{mutex};
            closed_flag = true;
            writer_waiting.store(false); // reset the lock.
        }
        catch (...)
        {
            writer_waiting.store(false); // reset the lock.
            // NOTE(review): upstream rethrows the caught exception here (line
            // missing from this scraped view) — confirm against the SeqAn3 sources.
        }
    }

    bool is_closed() const noexcept
    {
        // NOTE(review): closed_flag is read here without holding the mutex; writes
        // happen under the write lock in close() — confirm the intended memory
        // ordering against the upstream declaration of closed_flag.
        return closed_flag;
    }

    bool is_empty() const noexcept
    {
        // Exclusive lock: guarantees no pushes/pops are in flight while comparing.
        std::unique_lock write_lock(mutex);
        return pop_front_position == push_back_position;
    }

    bool is_full() const noexcept
    {
        std::unique_lock write_lock(mutex);
        return is_ring_buffer_exhausted(pop_front_position, push_back_position);
    }

    // Number of elements currently stored (snapshot under the write lock).
    size_type size() const noexcept
    {
        std::unique_lock write_lock(mutex);
        if (to_buffer_position(pop_front_position) <= to_buffer_position(push_back_position))
        {
            return to_buffer_position(push_back_position) - to_buffer_position(pop_front_position);
        }
        else
        {
            // The tail wrapped around: count the two contiguous segments.
            assert(buffer.size() > (to_buffer_position(pop_front_position) - to_buffer_position(push_back_position)));
            return buffer.size() - (to_buffer_position(pop_front_position) - to_buffer_position(push_back_position));
        }
    }

private:

    // True if advancing `from` (head) to `to` (tail) would make the ring buffer full,
    // i.e. the monotonically increasing positions are one full capacity apart.
    constexpr bool is_ring_buffer_exhausted(size_type const from, size_type const to) const
    {
        assert(to <= (from + ring_buffer_capacity + 1)); // The tail cannot overwrite the head.

        return to >= from + ring_buffer_capacity;
    }

    // Map a monotonically increasing position onto a buffer index.
    // ring_buffer_capacity is a power of two, so masking replaces modulo.
    constexpr size_type to_buffer_position(size_type const position) const
    {
        return position & (ring_buffer_capacity - 1);
    }

    // Advance `position` by one, skipping the padding range [buffer.size(),
    // ring_buffer_capacity) that exists because the capacity was rounded up
    // to a power of two.
    size_type cyclic_increment(size_type position)
    {
        // invariants:
        // - ring_buffer_capacity is a power of 2
        // - (position % ring_buffer_capacity) is in [0, buffer.size())
        //
        // return the next greater position that fulfils the invariants
        if (to_buffer_position(++position) >= buffer.size())
            position += ring_buffer_capacity - buffer.size(); // If the position reached
        return position;
    }

    // Fixed policy: a full queue can never grow, so overflow always fails.
    template <typename value2_t>
        requires (std::convertible_to<value2_t, value_t>) &&
                 (buffer_policy == buffer_queue_policy::fixed)
    bool overflow(value2_t &&)
    {
        return false;
    }

    // Dynamic policy: grow the buffer by one slot and store `value`; defined below.
    template <typename value2_t>
        requires (std::convertible_to<value2_t, value_t>) &&
                 (buffer_policy == buffer_queue_policy::dynamic)
    bool overflow(value2_t && value);

    // The underlying storage (one extra slot beyond the requested capacity).
    buffer_t buffer;
    // NOTE(review): several member declarations referenced above
    // (ring_buffer_capacity, pop_front_position, push_back_position,
    // writer_waiting, mutex) are missing from this scraped view — confirm
    // against the SeqAn3 sources. The alignas padding below avoids false
    // sharing between the producer- and consumer-side counters.
    alignas(std::hardware_destructive_interference_size) std::atomic<size_type> pending_pop_front_position{0};
    alignas(std::hardware_destructive_interference_size) std::atomic<size_type> pending_push_back_position{0};
    alignas(std::hardware_destructive_interference_size) bool closed_flag{false};
};
364
// Specifies a fixed size buffer queue.
// Pushing to a full queue never grows the buffer: try_push() returns
// queue_op_status::full and the blocking push() spins until a slot frees up.
template <std::semiregular value_t, sequence_container buffer_t = std::vector<value_t>>
using fixed_buffer_queue = buffer_queue<value_t, buffer_t, buffer_queue_policy::fixed>;
368
// Specifies a dynamic size buffer queue (growable).
// When the queue is full, overflow() extends the capacity by one slot, so
// try_push() on a dynamic queue never returns queue_op_status::full.
template <std::semiregular value_t, sequence_container buffer_t = std::vector<value_t>>
using dynamic_buffer_queue = buffer_queue<value_t, buffer_t, buffer_queue_policy::dynamic>;
372
373// ============================================================================
374// Metafunctions
375// ============================================================================
376
377// ============================================================================
378// Functions
379// ============================================================================
380
// Grow the ring buffer by one slot because the queue is full (dynamic policy).
// Returns true iff `value` was written into the newly freed slot.
// Runs entirely under the exclusive write lock, so no concurrent pushes or
// pops are in flight while the buffer is resized and elements are relocated.
template <std::semiregular value_t, sequence_container buffer_t, buffer_queue_policy buffer_policy>
template <typename value2_t>
    requires (std::convertible_to<value2_t, value_t>) &&
             (buffer_policy == buffer_queue_policy::dynamic)
inline bool buffer_queue<value_t, buffer_t, buffer_policy>::overflow(value2_t && value)
{
    // try to extend capacity
    std::unique_lock write_lock{mutex};

    size_type old_size = buffer.size();
    // Local copy that shadows the member; the member is updated only at the end.
    size_type ring_buffer_capacity = this->ring_buffer_capacity;
    size_type local_front = this->pop_front_position;
    size_type local_back = this->push_back_position;

    // Expects no pending pushes or pops in unique lock.
    assert(local_back == this->pending_push_back_position);
    assert(local_front == this->pending_pop_front_position);

    bool valueWasAppended = false;

    // did we reach the capacity limit (another thread could have done the upgrade already)?
    // buffer is full if tail_pos + 1 == head_pos
    if (is_ring_buffer_exhausted(local_front, cyclic_increment(local_back)))
    {
        // In case of a full queue write the value into the additional slot.
        // Note, that the ring-buffer implementation uses one additional field which is not used except
        // when overflow happens. This invariant is used, to simply check for the full/empty state of the queue.
        if (old_size != 0)
        {
            auto it = std::ranges::begin(buffer) + to_buffer_position(local_back);
            *it = std::forward<value2_t>(value);
            local_back = local_front + ring_buffer_capacity;
            valueWasAppended = true;
        }

        assert(is_ring_buffer_exhausted(local_front, local_back));

        // get positions of head/tail in current buffer sequence
        size_type front_buffer_position = to_buffer_position(local_front);
        size_type back_buffer_position = to_buffer_position(local_back);

        // increase capacity by one and move all elements from current pop_front_position one to the right.
        buffer.resize(old_size + 1);
        ring_buffer_capacity = std::bit_ceil(buffer.size()); // recompute for the grown buffer
        // Shift the tail segment [front_buffer_position, old_size) one slot to
        // the right so the new unused slot ends up just before the head.
        std::ranges::move_backward(std::span{buffer.data() + front_buffer_position, buffer.data() + old_size},
                                   buffer.data() + buffer.size());

        // Update the pop_front and push_back positions.
        if (old_size != 0)
        {
            // Pending positions are synchronised too (no concurrent ops under the lock).
            this->pending_pop_front_position = this->pop_front_position = front_buffer_position + 1;
            this->pending_push_back_position = this->push_back_position = back_buffer_position + ring_buffer_capacity;
        }
        this->ring_buffer_capacity = ring_buffer_capacity;
    }
    return valueWasAppended;
}
438
439// ----------------------------------------------------------------------------
440// Function try_pop()
441// ----------------------------------------------------------------------------
442
443/*
444 * @fn ConcurrentQueue#tryPopFront
445 * @headerfile <seqan/parallel.h>
446 * @brief Try to dequeue a value from a queue.
447 *
448 * @signature bool tryPopFront(result, queue[, parallelTag]);
449 *
450 *
451 * @param[in,out] queue A queue.
452 * @param[out] result The dequeued value (if available).
453 * @param[in] parallelTag The concurrency scheme. If multiple threads dequeue values concurrently this tag must be
454 * @link ParallelismTags#Parallel @endlink. The more efficient @link ParallelismTags#Serial
455 * @endlink tag can only be used if one thread calls <tt>popFront</tt> at a time.
456 * Default is @link ParallelismTags#Parallel @endlink.
457 * @return bool Returns <tt>true</tt> if a value could be dequeued and <tt>false</tt> otherwise.
458 */
// Non-blocking pop: tries to dequeue one element into `result`.
// Returns success, empty (nothing to pop) or closed (queue closed and drained).
// Multiple consumers synchronise lock-free via a CAS loop on
// pending_pop_front_position; the shared lock only excludes resizing writers.
template <std::semiregular value_t, sequence_container buffer_t, buffer_queue_policy buffer_policy>
inline queue_op_status buffer_queue<value_t, buffer_t, buffer_policy>::try_pop(value_t & result)
{
    // try to extract a value
    std::shared_lock read_lock{mutex};

    size_type local_pending_pop_front_position{};
    size_type next_local_pop_front_position{};
    detail::spin_delay spinDelay{};

    local_pending_pop_front_position = this->pending_pop_front_position;
    // wait for queue to become filled
    while (true)
    {
        size_type local_push_back_position = this->push_back_position;

        assert(local_pending_pop_front_position <= local_push_back_position);

        // Check if queue is empty
        if (local_pending_pop_front_position == local_push_back_position)
        {
            return is_closed() ? queue_op_status::closed : queue_op_status::empty;
        }

        // Get the next ring-buffer position to read from.
        next_local_pop_front_position = cyclic_increment(local_pending_pop_front_position);
        // Did another/other thread(s) already acquire this slot?
        // If yes, try with the next position. If not, break and read from the acquired position.
        // (On CAS failure local_pending_pop_front_position is reloaded with the current value.)
        if (this->pending_pop_front_position.compare_exchange_weak(local_pending_pop_front_position,
                                                                   next_local_pop_front_position))
            break;

        spinDelay.wait();
    }

    // Store the value from the acquired read position (move it out of the slot).
    result = std::ranges::iter_move(buffer.begin() + to_buffer_position(local_pending_pop_front_position));

    // wait for pending previous reads and synchronize pop_front_position to local_pending_pop_front_position
    {
        detail::spin_delay delay{};
        size_type acquired_slot = local_pending_pop_front_position;
        // pop_front_position is published strictly in slot order: spin until every
        // earlier consumer has advanced it to our acquired slot, then advance it.
        while (!this->pop_front_position.compare_exchange_weak(acquired_slot, next_local_pop_front_position))
        {
            acquired_slot = local_pending_pop_front_position; // CAS clobbered it; restore expected value
            delay.wait(); // add adapting delay in case of high contention.
        }
    }

    return queue_op_status::success;
}
510
511// ----------------------------------------------------------------------------
512// Function try_push()
513// ----------------------------------------------------------------------------
514
515/*
516 * @fn ConcurrentQueue#appendValue
517 * @headerfile <seqan/parallel.h>
518 * @brief Enqueue a value to a queue.
519 *
 * @signature void appendValue(queue, val[, expandTag[, parallelTag]]);
521 *
522 *
523 * @param[in,out] queue A queue.
524 * @param[in] val The value to enqueue.
525 * @param[in] expandTag The overflow strategy. If @link OverflowStrategyTags#Generous @endlink the queue will be
526 * automatically resized if the capacity is exceeded, otherwise the thread spinlocks until
527 * the element can be enqueued.
528 * Default is the @link DefaultOverflowImplicit @endlink result for the <tt>queue</tt> type.
529 * @param[in] parallelTag The concurrency scheme. If multiple threads enqueue values concurrently this tag must be
530 * @link ParallelismTags#Parallel @endlink. The more efficient @link ParallelismTags#Serial
531 * @endlink tag can only be used if one thread calls <tt>appendValue</tt> at a time.
532 * Default is @link ParallelismTags#Parallel @endlink.
533 */
// Non-blocking push: tries to enqueue `value`.
// Returns success, closed, or full (fixed policy only — a dynamic queue grows
// via overflow() instead). Multiple producers synchronise lock-free via a CAS
// loop on pending_push_back_position; the shared lock only excludes resizing.
template <std::semiregular value_t, sequence_container buffer_t, buffer_queue_policy buffer_policy>
template <typename value2_t>
    requires std::convertible_to<value2_t, value_t>
inline queue_op_status buffer_queue<value_t, buffer_t, buffer_policy>::try_push(value2_t && value)
{
    // try to push the value
    {
        detail::spin_delay delay{};

        std::shared_lock read_lock(mutex);

        if (is_closed())
            return queue_op_status::closed;

        // Current up to date position to push an element to
        size_type local_pending_push_back_position = this->pending_push_back_position;

        while (true)
        {
            // Get the next potential position to write the value to.
            size_type next_local_push_back_position = cyclic_increment(local_pending_push_back_position);
            size_type local_pop_front_position = this->pop_front_position;

            // Check if there are enough slots to write to.
            // If not either wait or try to overflow if it is a dynamic queue.
            if (is_ring_buffer_exhausted(local_pop_front_position, next_local_push_back_position))
                break;

            // Did another/other thread(s) acquire the current pending position before this thread?
            // If yes, try again; if not, write into the acquired slot.
            // (On CAS failure local_pending_push_back_position is reloaded with the current value.)
            if (this->pending_push_back_position.compare_exchange_weak(local_pending_push_back_position,
                                                                       next_local_push_back_position))
            {
                // Current thread acquired the local_pending_push_back_position and can now write the value into the
                // proper slot of the ring buffer.
                auto it = std::ranges::begin(buffer) + to_buffer_position(local_pending_push_back_position);
                *it = std::forward<value2_t>(value);

                // wait for pending previous writes and synchronise push_back_position to
                // local_pending_push_back_position
                {
                    detail::spin_delay delay{};
                    // the slot this thread acquired to write to
                    size_type acquired_slot = local_pending_push_back_position;
                    // push_back_position is published strictly in slot order: spin until
                    // every earlier producer has advanced it to our slot, then advance it.
                    while (!this->push_back_position.compare_exchange_weak(acquired_slot,
                                                                           next_local_push_back_position))
                    {
                        acquired_slot = local_pending_push_back_position; // restore expected value after failed CAS
                        delay.wait();
                    }
                }
                return queue_op_status::success;
            }

            delay.wait();
        }
    }

    // if possible extend capacity and return.
    // (The shared lock was released above; overflow() takes the exclusive lock.)
    if (overflow(std::forward<value2_t>(value)))
    {
        return queue_op_status::success; // always return success, since the queue resizes and cannot be full.
    }

    // We could not extend the queue so it must be full.
    return queue_op_status::full;
}
602} // namespace seqan3::contrib
T begin(T... args)
T bit_ceil(T... args)
T copy(T... args)
Provides various transformation traits used by the range module.
T current_exception(T... args)
T data(T... args)
T empty(T... args)
T fixed(T... args)
constexpr std::size_t hardware_destructive_interference_size
Minimum offset between two objects to avoid false sharing.
Definition: new:34
constexpr auto to(args_t &&... args)
Converts a range to a container.
Definition: to.hpp:114
constexpr size_t size
The size of a type pack.
Definition: traits.hpp:146
A more refined container concept than seqan3::container.
T move_backward(T... args)
The <new> header from C++17's standard library.
T rethrow_exception(T... args)
Provides seqan3::detail::spin_delay.
Adaptations of concepts from the standard library.