SeqAn3 3.4.0-rc.1
The Modern C++ library for sequence analysis.
Loading...
Searching...
No Matches
buffer_queue.hpp
Go to the documentation of this file.
1// SPDX-FileCopyrightText: 2006-2024 Knut Reinert & Freie Universität Berlin
2// SPDX-FileCopyrightText: 2016-2024 Knut Reinert & MPI für molekulare Genetik
3// SPDX-License-Identifier: BSD-3-Clause
4
10#pragma once
11
#include <algorithm>
#include <atomic>
#include <bit>
#include <cassert>
#include <cmath>
#include <concepts>
#include <exception>
#include <mutex>
#include <ranges>
#include <seqan3/std/new>
#include <shared_mutex>
#include <span>
#include <type_traits>
#include <vector>
25
29
30namespace seqan3::contrib
31{
32
// Outcome of a (try_/wait_) push or pop operation on a buffer_queue.
enum class queue_op_status : uint8_t
{
    success = 0, // the element was transferred
    empty = 1,   // nothing to pop: the queue currently holds no elements
    full = 2,    // nothing pushed: the queue has no free slot
    closed = 3   // the queue was closed; no further operations succeed
};
41
// Growth policy of a buffer_queue.
// `enum class` (not `enum struct`) for consistency with queue_op_status above;
// the two spellings are semantically identical.
enum class buffer_queue_policy : uint8_t
{
    fixed,  // the capacity is fixed; pushing into a full queue reports queue_op_status::full
    dynamic // the queue grows on overflow; push into a full queue resizes the buffer
};
47
48// Ringbuffer implementation:
49// The underlying buffer has size (number of actual elements + 1). This is a trick to easily check if the queue is empty
50// or full. Furthermore, the ring buffer uses 4 pointers. The actual push_back and pop_front position as well as
// the pending push_back and pop_front position. The latter indicate the positions that have been advanced concurrently
52// by multiple threads from either end.
53//
// head: position to read/extract from the queue (first inserted element) => pop_front_position
55// tail: position where to write to/push new elements to the queue => push_back_position
56// head_read: The actual position after x threads concurrently popped from the queue. => pending_pop_front_position
57// tail_write: The actual position after x threads concurrently pushed to the queue. => pending_push_back_position
58// [ ? ] [ 4 ] [ 3 ] [ 8 ] [ 0 ] [ x ] [ ? ]
59// | ^
60// v |
61// head headRead tail tailWrite
62//
63// valid buffer between [headRead, tail)
64// currently filled [tail, tailWrite)
65// currently removed [head, headRead)
66//
67// State: empty = (head == tail)
68// [ ? ] [ ? ] [ ? ] [ ? ] [ ? ] [ ? ] [ ? ]
69// tail
70// head
71// The head is on the same position as tail.
72// This means that currently no element is in the buffer.
73
74// State: full = (tail + 1 == head)
75// [ 2 ] [ 4 ] [ 3 ] [ ? ] [ 8 ] [ 0 ] [ 7 ]
76// tail
77// head
78// The tail is one position before the head.
79// This means that currently no element can be added to the buffer since it is full.
80// Strategies are to either wait until some elements have been popped or to expand the capacity of the
81// queue by one, inserting the element at the current tail position and moving all elements starting from head one
82// position to the right.
83
84template <std::semiregular value_t,
86 buffer_queue_policy buffer_policy = buffer_queue_policy::dynamic>
87class buffer_queue
88{
89public:
90 using buffer_type = buffer_t;
91 using value_type = typename buffer_type::value_type;
92 using size_type = typename buffer_type::size_type;
93 using reference = void;
94 using const_reference = void;
95
96 // Default constructor sets capacity to 1 (still empty)
97 buffer_queue() : buffer_queue{0u}
98 {}
99 buffer_queue(buffer_queue const &) = delete;
100 buffer_queue(buffer_queue &&) = delete;
101 buffer_queue & operator=(buffer_queue const &) = delete;
102 buffer_queue & operator=(buffer_queue &&) = delete;
103 ~buffer_queue() = default;
104
105 // you can set the initial capacity here
106 explicit buffer_queue(size_type const init_capacity)
107 {
108 buffer.resize(init_capacity + 1);
109 ring_buffer_capacity = std::bit_ceil(buffer.size());
110 }
111
112 template <std::ranges::input_range range_type>
113 requires std::convertible_to<std::ranges::range_value_t<range_type>, value_type>
114 buffer_queue(size_type const init_capacity, range_type && r) : buffer_queue{init_capacity}
115 {
117 }
118
122 template <typename value2_t>
123 requires std::convertible_to<value2_t, value_t>
124 void push(value2_t && value)
125 {
126 detail::spin_delay delay{};
127
128 for (;;)
129 {
130 auto status = try_push(std::forward<value2_t>(value));
131 if (status == queue_op_status::closed)
132 throw queue_op_status::closed;
133 else if (status == queue_op_status::success)
134 return;
135
136 assert(status != queue_op_status::empty);
137 assert(status == queue_op_status::full);
138 delay.wait(); // pause and then try again.
139 }
140 } // throws if closed
141
142 template <typename value2_t>
143 requires std::convertible_to<value2_t, value_t>
144 queue_op_status wait_push(value2_t && value)
145 {
146 detail::spin_delay delay{};
147
148 for (;;)
149 {
150 auto status = try_push(std::forward<value2_t>(value));
151 // wait until queue is not full anymore..
152 if (status != queue_op_status::full)
153 return status;
154
155 assert(status != queue_op_status::empty);
156 assert(status == queue_op_status::full);
157 delay.wait(); // pause and then try again.
158 }
159 }
160
161 value_type value_pop() // throws if closed
162 {
163 detail::spin_delay delay{};
164
165 value_type value{};
166 for (;;)
167 {
168 if (!writer_waiting.load())
169 {
170 auto status = try_pop(value);
171
172 if (status == queue_op_status::closed)
173 throw queue_op_status::closed;
174 else if (status == queue_op_status::success)
175 return value;
176
177 assert(status != queue_op_status::full);
178 assert(status == queue_op_status::empty);
179 }
180 delay.wait(); // pause and then try again.
181 }
182 }
183
184 queue_op_status wait_pop(value_type & value)
185 {
186 detail::spin_delay delay{};
187
188 queue_op_status status;
189 for (;;)
190 {
191 if (!writer_waiting.load())
192 {
193 status = try_pop(value);
194
195 if (status == queue_op_status::closed || status == queue_op_status::success)
196 break;
197
198 assert(status != queue_op_status::full);
199 assert(status == queue_op_status::empty);
200 }
201 delay.wait(); // pause and then try again.
202 }
203 return status;
204 }
206
210 template <typename value2_t>
211 requires std::convertible_to<value2_t, value_t>
212 queue_op_status try_push(value2_t &&);
213
214 queue_op_status try_pop(value_t &);
216
220 void close()
221 {
222 if (writer_waiting.exchange(true)) // First writer that closes the queue will continue, the rest returns.
223 return;
224
225 try
226 {
227 std::unique_lock write_lock{mutex};
228 closed_flag = true;
229 writer_waiting.store(false); // reset the lock.
230 }
231 catch (...)
232 {
233 writer_waiting.store(false); // reset the lock.
235 }
236 }
237
238 bool is_closed() const noexcept
239 {
240 return closed_flag;
241 }
242
243 bool is_empty() const noexcept
244 {
245 std::unique_lock write_lock(mutex);
246 return pop_front_position == push_back_position;
247 }
248
249 bool is_full() const noexcept
250 {
251 std::unique_lock write_lock(mutex);
252 return is_ring_buffer_exhausted(pop_front_position, push_back_position);
253 }
254
255 size_type size() const noexcept
256 {
257 std::unique_lock write_lock(mutex);
258 if (to_buffer_position(pop_front_position) <= to_buffer_position(push_back_position))
259 {
260 return to_buffer_position(push_back_position) - to_buffer_position(pop_front_position);
261 }
262 else
263 {
264 assert(buffer.size() > (to_buffer_position(pop_front_position) - to_buffer_position(push_back_position)));
265 return buffer.size() - (to_buffer_position(pop_front_position) - to_buffer_position(push_back_position));
266 }
267 }
269
270private:
280 constexpr bool is_ring_buffer_exhausted(size_type const from, size_type const to) const
281 {
282 assert(to <= (from + ring_buffer_capacity + 1)); // The tail cannot overwrite the head.
283
284 return to >= from + ring_buffer_capacity;
285 }
286
300 constexpr size_type to_buffer_position(size_type const position) const
301 {
302 return position & (ring_buffer_capacity - 1);
303 }
304
324 size_type cyclic_increment(size_type position)
325 {
326 // invariants:
327 // - ring_buffer_capacity is a power of 2
328 // - (position % ring_buffer_capacity) is in [0, buffer.size())
329 //
330 // return the next greater position that fulfils the invariants
331 if (to_buffer_position(++position) >= buffer.size())
332 position += ring_buffer_capacity - buffer.size(); // If the position reached
333 return position;
334 }
335
336 template <typename value2_t>
337 requires (std::convertible_to<value2_t, value_t>) && (buffer_policy == buffer_queue_policy::fixed) bool
338 overflow(value2_t &&)
339 {
340 return false;
341 }
342
343 template <typename value2_t>
344 requires (std::convertible_to<value2_t, value_t>) && (buffer_policy == buffer_queue_policy::dynamic) bool
345 overflow(value2_t && value);
346
348 buffer_t buffer;
351 alignas(std::hardware_destructive_interference_size) std::atomic<size_type> pending_pop_front_position{0};
353 alignas(std::hardware_destructive_interference_size) std::atomic<size_type> pending_push_back_position{0};
356 alignas(std::hardware_destructive_interference_size) bool closed_flag{false};
357};
358
// Specifies a fixed size buffer queue.
// A full queue rejects pushes with queue_op_status::full instead of growing.
template <std::semiregular value_t, sequence_container buffer_t = std::vector<value_t>>
using fixed_buffer_queue = buffer_queue<value_t, buffer_t, buffer_queue_policy::fixed>;

// Specifies a dynamic size buffer queue (growable).
// A full queue grows its underlying buffer instead of reporting queue_op_status::full.
template <std::semiregular value_t, sequence_container buffer_t = std::vector<value_t>>
using dynamic_buffer_queue = buffer_queue<value_t, buffer_t, buffer_queue_policy::dynamic>;
366
367// ============================================================================
368// Metafunctions
369// ============================================================================
370
371// ============================================================================
372// Functions
373// ============================================================================
374
// ----------------------------------------------------------------------------
// Function overflow() [dynamic policy]
// ----------------------------------------------------------------------------

// Called by try_push when the ring buffer is full (dynamic policy only).
// Takes the unique lock (so no push/pop can run concurrently), stores `value`
// into the otherwise-unused padding slot, grows the buffer by one and shifts
// the wrapped-around tail region to the new end.
// Returns true iff `value` was appended (false if another thread resized first).
template <std::semiregular value_t, sequence_container buffer_t, buffer_queue_policy buffer_policy>
template <typename value2_t>
    requires (std::convertible_to<value2_t, value_t>) && (buffer_policy == buffer_queue_policy::dynamic)
inline bool buffer_queue<value_t, buffer_t, buffer_policy>::overflow(value2_t && value)
{
    // try to extend capacity
    std::unique_lock write_lock{mutex};

    size_type old_size = buffer.size();
    size_type ring_buffer_capacity = this->ring_buffer_capacity; // local copy; written back at the end
    size_type local_front = this->pop_front_position;
    size_type local_back = this->push_back_position;

    // Expects no pending pushes or pops in unique lock.
    assert(local_back == this->pending_push_back_position);
    assert(local_front == this->pending_pop_front_position);

    bool valueWasAppended = false;

    // did we reach the capacity limit (another thread could have done the upgrade already)?
    // buffer is full if tail_pos + 1 == head_pos
    if (is_ring_buffer_exhausted(local_front, cyclic_increment(local_back)))
    {
        // In case of a full queue write the value into the additional slot.
        // Note, that the ring-buffer implementation uses one additional field which is not used except
        // when overflow happens. This invariant is used, to simply check for the full/empty state of the queue.
        if (old_size != 0)
        {
            auto it = std::ranges::begin(buffer) + to_buffer_position(local_back);
            *it = std::forward<value2_t>(value);
            local_back = local_front + ring_buffer_capacity;
            valueWasAppended = true;
        }

        assert(is_ring_buffer_exhausted(local_front, local_back));

        // get positions of head/tail in current buffer sequence
        size_type front_buffer_position = to_buffer_position(local_front);
        size_type back_buffer_position = to_buffer_position(local_back);

        // increase capacity by one and move all elements from current pop_front_position one to the right.
        buffer.resize(old_size + 1);
        ring_buffer_capacity = std::bit_ceil(buffer.size());
        // Shift the wrapped-around region [front, old end) to the new end of the buffer.
        std::ranges::move_backward(std::span{buffer.data() + front_buffer_position, buffer.data() + old_size},
                                   buffer.data() + buffer.size());

        // Update the pop_front and push_back positions.
        if (old_size != 0)
        {
            this->pending_pop_front_position = this->pop_front_position = front_buffer_position + 1;
            this->pending_push_back_position = this->push_back_position = back_buffer_position + ring_buffer_capacity;
        }
        this->ring_buffer_capacity = ring_buffer_capacity;
    }
    return valueWasAppended;
}
431
// ----------------------------------------------------------------------------
// Function try_pop()
// ----------------------------------------------------------------------------

/*
 * @brief Try to dequeue a value from the queue without blocking.
 *
 * @param[out] result The dequeued value (set only on success).
 * @return queue_op_status::success if a value was dequeued,
 *         queue_op_status::empty if the queue holds no elements,
 *         queue_op_status::closed if the queue is empty and was closed.
 *
 * Safe to call concurrently from multiple consumer threads: a slot is claimed
 * with a CAS on pending_pop_front_position, read, and then published by
 * advancing pop_front_position in claim order.
 */
template <std::semiregular value_t, sequence_container buffer_t, buffer_queue_policy buffer_policy>
inline queue_op_status buffer_queue<value_t, buffer_t, buffer_policy>::try_pop(value_t & result)
{
    // try to extract a value
    std::shared_lock read_lock{mutex};

    size_type local_pending_pop_front_position{};
    size_type next_local_pop_front_position{};
    detail::spin_delay spinDelay{};

    local_pending_pop_front_position = this->pending_pop_front_position;
    // wait for queue to become filled
    while (true)
    {
        size_type local_push_back_position = this->push_back_position;

        assert(local_pending_pop_front_position <= local_push_back_position);

        // Check if queue is empty
        if (local_pending_pop_front_position == local_push_back_position)
        {
            return is_closed() ? queue_op_status::closed : queue_op_status::empty;
        }

        // Get the next ring-buffer position to read from.
        next_local_pop_front_position = cyclic_increment(local_pending_pop_front_position);
        // Did another/other thread(s) already acquire this slot?
        // If yes, try with next position. If not, break and read from the acquired position.
        // (On CAS failure local_pending_pop_front_position is reloaded with the current value.)
        if (this->pending_pop_front_position.compare_exchange_weak(local_pending_pop_front_position,
                                                                   next_local_pop_front_position))
            break;

        spinDelay.wait();
    }

    // Move the value out of the acquired read position.
    result = std::ranges::iter_move(buffer.begin() + to_buffer_position(local_pending_pop_front_position));

    // wait for pending previous reads and synchronize pop_front_position to local_pending_pop_front_position
    {
        detail::spin_delay delay{};
        size_type acquired_slot = local_pending_pop_front_position;
        while (!this->pop_front_position.compare_exchange_weak(acquired_slot, next_local_pop_front_position))
        {
            acquired_slot = local_pending_pop_front_position; // CAS overwrote it; restore before retrying
            delay.wait(); // add adapting delay in case of high contention.
        }
    }

    return queue_op_status::success;
}
503
// ----------------------------------------------------------------------------
// Function try_push()
// ----------------------------------------------------------------------------

/*
 * @brief Try to enqueue a value without blocking.
 *
 * @param[in] value The value to enqueue (forwarded; consumed only on success).
 * @return queue_op_status::success if the value was enqueued,
 *         queue_op_status::closed if the queue was closed,
 *         queue_op_status::full if the queue is full and could not grow
 *         (fixed policy, or dynamic policy when another thread resized first).
 *
 * Safe to call concurrently from multiple producer threads: a slot is claimed
 * with a CAS on pending_push_back_position, written, and then published by
 * advancing push_back_position in claim order. For the dynamic policy a full
 * queue falls through to overflow(), which resizes under the unique lock.
 */
template <std::semiregular value_t, sequence_container buffer_t, buffer_queue_policy buffer_policy>
template <typename value2_t>
    requires std::convertible_to<value2_t, value_t>
inline queue_op_status buffer_queue<value_t, buffer_t, buffer_policy>::try_push(value2_t && value)
{
    // try to push the value
    {
        detail::spin_delay delay{};

        std::shared_lock read_lock(mutex);

        if (is_closed())
            return queue_op_status::closed;

        // Current up to date position to push an element to
        size_type local_pending_push_back_position = this->pending_push_back_position;

        while (true)
        {
            // Get the next potential position to write the value to.
            size_type next_local_push_back_position = cyclic_increment(local_pending_push_back_position);
            size_type local_pop_front_position = this->pop_front_position;

            // Check if there are enough slots to write to.
            // If not either wait or try to overflow if it is a dynamic queue.
            if (is_ring_buffer_exhausted(local_pop_front_position, next_local_push_back_position))
                break;

            // Did another/other thread(s) acquire the current pending position before this thread?
            // If yes, try again; if not, write into the acquired slot.
            // (On CAS failure local_pending_push_back_position is reloaded with the current value.)
            if (this->pending_push_back_position.compare_exchange_weak(local_pending_push_back_position,
                                                                       next_local_push_back_position))
            {
                // Current thread acquired the local_pending_push_back_position and can now write the value into the
                // proper slot of the ring buffer.
                auto it = std::ranges::begin(buffer) + to_buffer_position(local_pending_push_back_position);
                *it = std::forward<value2_t>(value);

                // wait for pending previous writes and synchronise push_back_position to
                // local_pending_push_back_position
                {
                    detail::spin_delay delay{};
                    // the slot this thread acquired to write to
                    size_type acquired_slot = local_pending_push_back_position;
                    while (
                        !this->push_back_position.compare_exchange_weak(acquired_slot, next_local_push_back_position))
                    {
                        acquired_slot = local_pending_push_back_position; // CAS overwrote it; restore before retrying
                        delay.wait();
                    }
                }
                return queue_op_status::success;
            }

            delay.wait();
        }
    }

    // if possible extend capacity and return.
    if (overflow(std::forward<value2_t>(value)))
    {
        return queue_op_status::success; // always return success, since the queue resizes and cannot be full.
    }

    // We could not extend the queue so it must be full.
    return queue_op_status::full;
}
595} // namespace seqan3::contrib
T begin(T... args)
T bit_ceil(T... args)
T copy(T... args)
Provides various transformation traits used by the range module.
T current_exception(T... args)
T data(T... args)
T empty(T... args)
T fixed(T... args)
constexpr std::size_t hardware_destructive_interference_size
Minimum offset between two objects to avoid false sharing.
Definition new:54
seqan::stl::ranges::to to
Converts a range to a container. <dl class="no-api">This entity is not part of the SeqAn API....
Definition to.hpp:23
constexpr size_t size
The size of a type pack.
Definition type_pack/traits.hpp:143
A more refined container concept than seqan3::container.
T move_backward(T... args)
The <new> header from C++17's standard library.
T rethrow_exception(T... args)
Provides seqan3::detail::spin_delay.
Adaptations of concepts from the standard library.
Hide me