25 #include <type_traits>
32 namespace seqan3::contrib
36 enum class queue_op_status : uint8_t
44 enum struct buffer_queue_policy : uint8_t
86 template <std::semiregular value_t,
88 buffer_queue_policy buffer_policy = buffer_queue_policy::dynamic>
93 using buffer_type = buffer_t;
94 using value_type =
typename buffer_type::value_type;
95 using size_type =
typename buffer_type::size_type;
96 using reference = void;
97 using const_reference = void;
100 buffer_queue() : buffer_queue{0u}
102 buffer_queue(buffer_queue
const &) =
delete;
103 buffer_queue(buffer_queue &&) =
delete;
104 buffer_queue & operator=(buffer_queue
const &) =
delete;
105 buffer_queue & operator=(buffer_queue &&) =
delete;
106 ~buffer_queue() =
default;
109 explicit buffer_queue(size_type
const init_capacity)
111 buffer.resize(init_capacity + 1);
115 template <std::ranges::input_range range_type>
116 requires std::convertible_to<std::ranges::range_value_t<range_type>, value_type>
117 buffer_queue(size_type
const init_capacity, range_type && r) : buffer_queue{init_capacity}
119 std::ranges::copy(r, std::ranges::begin(buffer));
125 template <
typename value2_t>
126 requires std::convertible_to<value2_t, value_t>
127 void push(value2_t && value)
129 detail::spin_delay delay{};
133 auto status = try_push(std::forward<value2_t>(value));
134 if (status == queue_op_status::closed)
135 throw queue_op_status::closed;
136 else if (status == queue_op_status::success)
139 assert(status != queue_op_status::empty);
140 assert(status == queue_op_status::full);
145 template <
typename value2_t>
146 requires std::convertible_to<value2_t, value_t>
147 queue_op_status wait_push(value2_t && value)
149 detail::spin_delay delay{};
153 auto status = try_push(std::forward<value2_t>(value));
155 if (status != queue_op_status::full)
158 assert(status != queue_op_status::empty);
159 assert(status == queue_op_status::full);
164 value_type value_pop()
166 detail::spin_delay delay{};
171 if (!writer_waiting.load())
173 auto status = try_pop(value);
175 if (status == queue_op_status::closed)
176 throw queue_op_status::closed;
177 else if (status == queue_op_status::success)
180 assert(status != queue_op_status::full);
181 assert(status == queue_op_status::empty);
187 queue_op_status wait_pop(value_type & value)
189 detail::spin_delay delay{};
194 if (!writer_waiting.load())
198 if (status == queue_op_status::closed || status == queue_op_status::success)
201 assert(status != queue_op_status::full);
202 assert(status == queue_op_status::empty);
213 template <
typename value2_t>
214 requires std::convertible_to<value2_t, value_t>
215 queue_op_status try_push(value2_t &&);
217 queue_op_status try_pop(value_t &);
225 if (writer_waiting.exchange(
true))
232 writer_waiting.store(
false);
236 writer_waiting.store(
false);
241 bool is_closed() const noexcept
249 return pop_front_position == push_back_position;
252 bool is_full() const noexcept
255 return is_ring_buffer_exhausted(pop_front_position, push_back_position);
258 size_type
size() const noexcept
261 if (to_buffer_position(pop_front_position) <= to_buffer_position(push_back_position))
263 return to_buffer_position(push_back_position) - to_buffer_position(pop_front_position);
267 assert(buffer.size() > (to_buffer_position(pop_front_position) - to_buffer_position(push_back_position)));
268 return buffer.size() - (to_buffer_position(pop_front_position) - to_buffer_position(push_back_position));
283 constexpr
bool is_ring_buffer_exhausted(size_type
const from, size_type
const to)
const
285 assert(
to <= (from + ring_buffer_capacity + 1));
287 return to >= from + ring_buffer_capacity;
303 constexpr size_type to_buffer_position(size_type
const position)
const
305 return position & (ring_buffer_capacity - 1);
327 size_type cyclic_increment(size_type position)
334 if (to_buffer_position(++position) >= buffer.size())
335 position += ring_buffer_capacity - buffer.size();
339 template <
typename value2_t>
340 requires (std::convertible_to<value2_t, value_t>) &&
341 (buffer_policy == buffer_queue_policy::fixed)
342 bool overflow(value2_t &&)
347 template <
typename value2_t>
348 requires (std::convertible_to<value2_t, value_t>) &&
349 (buffer_policy == buffer_queue_policy::dynamic)
350 bool overflow(value2_t && value);
365 template <std::semiregular value_t, sequence_container buffer_t = std::vector<value_t>>
366 using fixed_buffer_queue = buffer_queue<value_t, buffer_t, buffer_queue_policy::fixed>;
369 template <std::semiregular value_t, sequence_container buffer_t = std::vector<value_t>>
370 using dynamic_buffer_queue = buffer_queue<value_t, buffer_t, buffer_queue_policy::dynamic>;
380 template <
typename value_t,
typename buffer_t, buffer_queue_policy buffer_policy>
381 template <
typename value2_t>
382 requires (std::convertible_to<value2_t, value_t>) &&
383 (buffer_policy == buffer_queue_policy::dynamic)
384 inline bool buffer_queue<value_t, buffer_t, buffer_policy>::overflow(value2_t && value)
389 size_type old_size = buffer.size();
390 size_type ring_buffer_capacity = this->ring_buffer_capacity;
391 size_type local_front = this->pop_front_position;
392 size_type local_back = this->push_back_position;
395 assert(local_back == this->pending_push_back_position);
396 assert(local_front == this->pending_pop_front_position);
398 bool valueWasAppended =
false;
402 if (is_ring_buffer_exhausted(local_front, cyclic_increment(local_back)))
410 *it = std::forward<value2_t>(value);
411 local_back = local_front + ring_buffer_capacity;
412 valueWasAppended =
true;
415 assert(is_ring_buffer_exhausted(local_front, local_back));
418 size_type front_buffer_position = to_buffer_position(local_front);
419 size_type back_buffer_position = to_buffer_position(local_back);
422 buffer.resize(old_size + 1);
424 std::ranges::move_backward(
std::span{buffer.
data() + front_buffer_position, buffer.data() + old_size},
425 buffer.data() + buffer.size());
430 this->pending_pop_front_position = this->pop_front_position = front_buffer_position + 1;
431 this->pending_push_back_position = this->push_back_position = back_buffer_position + ring_buffer_capacity;
433 this->ring_buffer_capacity = ring_buffer_capacity;
435 return valueWasAppended;
458 template <
typename value_t,
typename buffer_t, buffer_queue_policy buffer_policy>
459 inline queue_op_status buffer_queue<value_t, buffer_t, buffer_policy>::try_pop(value_t & result)
464 size_type local_pending_pop_front_position{};
465 size_type next_local_pop_front_position{};
466 detail::spin_delay spinDelay{};
468 local_pending_pop_front_position = this->pending_pop_front_position;
472 size_type local_push_back_position = this->push_back_position;
474 assert(local_pending_pop_front_position <= local_push_back_position);
477 if (local_pending_pop_front_position == local_push_back_position)
479 return is_closed() ? queue_op_status::closed : queue_op_status::empty;
483 next_local_pop_front_position = cyclic_increment(local_pending_pop_front_position);
486 if (this->pending_pop_front_position.compare_exchange_weak(local_pending_pop_front_position,
487 next_local_pop_front_position))
494 result = std::ranges::iter_move(buffer.begin() + to_buffer_position(local_pending_pop_front_position));
498 detail::spin_delay delay{};
499 size_type acquired_slot = local_pending_pop_front_position;
500 while (!this->pop_front_position.compare_exchange_weak(acquired_slot, next_local_pop_front_position))
502 acquired_slot = local_pending_pop_front_position;
507 return queue_op_status::success;
533 template <
typename value_t,
typename buffer_t, buffer_queue_policy buffer_policy>
534 template <
typename value2_t>
535 requires std::convertible_to<value2_t, value_t>
536 inline queue_op_status buffer_queue<value_t, buffer_t, buffer_policy>::try_push(value2_t && value)
540 detail::spin_delay delay{};
545 return queue_op_status::closed;
548 size_type local_pending_push_back_position = this->pending_push_back_position;
553 size_type next_local_push_back_position = cyclic_increment(local_pending_push_back_position);
554 size_type local_pop_front_position = this->pop_front_position;
558 if (is_ring_buffer_exhausted(local_pop_front_position, next_local_push_back_position))
563 if (this->pending_push_back_position.compare_exchange_weak(local_pending_push_back_position,
564 next_local_push_back_position))
568 auto it =
std::ranges::begin(buffer) + to_buffer_position(local_pending_push_back_position);
569 *it = std::forward<value2_t>(value);
574 detail::spin_delay delay{};
576 size_type acquired_slot = local_pending_push_back_position;
577 while (!this->push_back_position.compare_exchange_weak(acquired_slot,
578 next_local_push_back_position))
580 acquired_slot = local_pending_push_back_position;
584 return queue_op_status::success;
592 if (overflow(std::forward<value2_t>(value)))
594 return queue_op_status::success;
598 return queue_op_status::full;
Adaptations of algorithms from the Ranges TS.
Provides the C++20 <bit> header if it is not already available.
Provides various transformation traits used by the range module.
T current_exception(T... args)
constexpr T bit_ceil(T x) noexcept
Calculates the smallest integral power of two that is not smaller than x.
Definition: bit:133
constexpr std::size_t hardware_destructive_interference_size
Minimum offset between two objects to avoid false sharing.
Definition: new:34
constexpr size_t size
The size of a type pack.
Definition: traits.hpp:151
constexpr auto to
A to view.
Definition: to.hpp:30
A more refined container concept than seqan3::container.
Provides C++17/20 additions to the <new> header, if they are not already available.
Adaptations of concepts from the Ranges TS.
T rethrow_exception(T... args)
Provides std::span from the C++20 standard library.
Adaptations of concepts from the standard library.
Provides seqan3::detail::spin_delay.