33namespace seqan3::contrib
37enum class queue_op_status : uint8_t
45enum struct buffer_queue_policy : uint8_t
87template <std::semiregular value_t,
89 buffer_queue_policy buffer_policy = buffer_queue_policy::dynamic>
93 using buffer_type = buffer_t;
94 using value_type =
typename buffer_type::value_type;
95 using size_type =
typename buffer_type::size_type;
96 using reference = void;
97 using const_reference = void;
100 buffer_queue() : buffer_queue{0u}
102 buffer_queue(buffer_queue
const &) =
delete;
103 buffer_queue(buffer_queue &&) =
delete;
104 buffer_queue & operator=(buffer_queue
const &) =
delete;
105 buffer_queue & operator=(buffer_queue &&) =
delete;
106 ~buffer_queue() =
default;
109 explicit buffer_queue(size_type
const init_capacity)
111 buffer.resize(init_capacity + 1);
115 template <std::ranges::input_range range_type>
116 requires std::convertible_to<std::ranges::range_value_t<range_type>, value_type>
117 buffer_queue(size_type
const init_capacity, range_type && r) : buffer_queue{init_capacity}
125 template <
typename value2_t>
126 requires std::convertible_to<value2_t, value_t>
127 void push(value2_t && value)
129 detail::spin_delay delay{};
133 auto status = try_push(std::forward<value2_t>(value));
134 if (status == queue_op_status::closed)
135 throw queue_op_status::closed;
136 else if (status == queue_op_status::success)
139 assert(status != queue_op_status::empty);
140 assert(status == queue_op_status::full);
145 template <
typename value2_t>
146 requires std::convertible_to<value2_t, value_t>
147 queue_op_status wait_push(value2_t && value)
149 detail::spin_delay delay{};
153 auto status = try_push(std::forward<value2_t>(value));
155 if (status != queue_op_status::full)
158 assert(status != queue_op_status::empty);
159 assert(status == queue_op_status::full);
164 value_type value_pop()
166 detail::spin_delay delay{};
171 if (!writer_waiting.load())
173 auto status = try_pop(value);
175 if (status == queue_op_status::closed)
176 throw queue_op_status::closed;
177 else if (status == queue_op_status::success)
180 assert(status != queue_op_status::full);
181 assert(status == queue_op_status::empty);
187 queue_op_status wait_pop(value_type & value)
189 detail::spin_delay delay{};
194 if (!writer_waiting.load())
198 if (status == queue_op_status::closed || status == queue_op_status::success)
201 assert(status != queue_op_status::full);
202 assert(status == queue_op_status::empty);
213 template <
typename value2_t>
214 requires std::convertible_to<value2_t, value_t>
215 queue_op_status try_push(value2_t &&);
217 queue_op_status try_pop(value_t &);
225 if (writer_waiting.exchange(
true))
232 writer_waiting.store(
false);
236 writer_waiting.store(
false);
241 bool is_closed() const noexcept
249 return pop_front_position == push_back_position;
252 bool is_full() const noexcept
255 return is_ring_buffer_exhausted(pop_front_position, push_back_position);
258 size_type
size() const noexcept
261 if (to_buffer_position(pop_front_position) <= to_buffer_position(push_back_position))
263 return to_buffer_position(push_back_position) - to_buffer_position(pop_front_position);
267 assert(buffer.size() > (to_buffer_position(pop_front_position) - to_buffer_position(push_back_position)));
268 return buffer.size() - (to_buffer_position(pop_front_position) - to_buffer_position(push_back_position));
283 constexpr bool is_ring_buffer_exhausted(size_type
const from, size_type
const to)
const
285 assert(to <= (from + ring_buffer_capacity + 1));
287 return to >= from + ring_buffer_capacity;
303 constexpr size_type to_buffer_position(size_type
const position)
const
305 return position & (ring_buffer_capacity - 1);
327 size_type cyclic_increment(size_type position)
334 if (to_buffer_position(++position) >= buffer.size())
335 position += ring_buffer_capacity - buffer.size();
339 template <
typename value2_t>
340 requires (std::convertible_to<value2_t, value_t>) && (buffer_policy == buffer_queue_policy::fixed)
bool
341 overflow(value2_t &&)
346 template <
typename value2_t>
347 requires (std::convertible_to<value2_t, value_t>) && (buffer_policy == buffer_queue_policy::dynamic)
bool
348 overflow(value2_t && value);
363template <std::semiregular value_t, sequence_container buffer_t = std::vector<value_t>>
364using fixed_buffer_queue = buffer_queue<value_t, buffer_t, buffer_queue_policy::fixed>;
367template <std::semiregular value_t, sequence_container buffer_t = std::vector<value_t>>
368using dynamic_buffer_queue = buffer_queue<value_t, buffer_t, buffer_queue_policy::dynamic>;
378template <std::semiregular value_t, sequence_container buffer_t, buffer_queue_policy buffer_policy>
379template <
typename value2_t>
380 requires (std::convertible_to<value2_t, value_t>) && (buffer_policy == buffer_queue_policy::dynamic)
381inline bool buffer_queue<value_t, buffer_t, buffer_policy>::overflow(value2_t && value)
386 size_type old_size = buffer.size();
387 size_type ring_buffer_capacity = this->ring_buffer_capacity;
388 size_type local_front = this->pop_front_position;
389 size_type local_back = this->push_back_position;
392 assert(local_back == this->pending_push_back_position);
393 assert(local_front == this->pending_pop_front_position);
395 bool valueWasAppended =
false;
399 if (is_ring_buffer_exhausted(local_front, cyclic_increment(local_back)))
407 *it = std::forward<value2_t>(value);
408 local_back = local_front + ring_buffer_capacity;
409 valueWasAppended =
true;
412 assert(is_ring_buffer_exhausted(local_front, local_back));
415 size_type front_buffer_position = to_buffer_position(local_front);
416 size_type back_buffer_position = to_buffer_position(local_back);
419 buffer.resize(old_size + 1);
422 buffer.data() + buffer.size());
427 this->pending_pop_front_position = this->pop_front_position = front_buffer_position + 1;
428 this->pending_push_back_position = this->push_back_position = back_buffer_position + ring_buffer_capacity;
430 this->ring_buffer_capacity = ring_buffer_capacity;
432 return valueWasAppended;
455template <std::semiregular value_t, sequence_container buffer_t, buffer_queue_policy buffer_policy>
456inline queue_op_status buffer_queue<value_t, buffer_t, buffer_policy>::try_pop(value_t & result)
461 size_type local_pending_pop_front_position{};
462 size_type next_local_pop_front_position{};
463 detail::spin_delay spinDelay{};
465 local_pending_pop_front_position = this->pending_pop_front_position;
469 size_type local_push_back_position = this->push_back_position;
471 assert(local_pending_pop_front_position <= local_push_back_position);
474 if (local_pending_pop_front_position == local_push_back_position)
476 return is_closed() ? queue_op_status::closed : queue_op_status::empty;
480 next_local_pop_front_position = cyclic_increment(local_pending_pop_front_position);
483 if (this->pending_pop_front_position.compare_exchange_weak(local_pending_pop_front_position,
484 next_local_pop_front_position))
491 result = std::ranges::iter_move(buffer.begin() + to_buffer_position(local_pending_pop_front_position));
495 detail::spin_delay delay{};
496 size_type acquired_slot = local_pending_pop_front_position;
497 while (!this->pop_front_position.compare_exchange_weak(acquired_slot, next_local_pop_front_position))
499 acquired_slot = local_pending_pop_front_position;
504 return queue_op_status::success;
530template <std::semiregular value_t, sequence_container buffer_t, buffer_queue_policy buffer_policy>
531template <
typename value2_t>
532 requires std::convertible_to<value2_t, value_t>
533inline queue_op_status buffer_queue<value_t, buffer_t, buffer_policy>::try_push(value2_t && value)
537 detail::spin_delay delay{};
542 return queue_op_status::closed;
545 size_type local_pending_push_back_position = this->pending_push_back_position;
550 size_type next_local_push_back_position = cyclic_increment(local_pending_push_back_position);
551 size_type local_pop_front_position = this->pop_front_position;
555 if (is_ring_buffer_exhausted(local_pop_front_position, next_local_push_back_position))
560 if (this->pending_push_back_position.compare_exchange_weak(local_pending_push_back_position,
561 next_local_push_back_position))
565 auto it =
std::ranges::begin(buffer) + to_buffer_position(local_pending_push_back_position);
566 *it = std::forward<value2_t>(value);
571 detail::spin_delay delay{};
573 size_type acquired_slot = local_pending_push_back_position;
575 !this->push_back_position.compare_exchange_weak(acquired_slot, next_local_push_back_position))
577 acquired_slot = local_pending_push_back_position;
581 return queue_op_status::success;
589 if (overflow(std::forward<value2_t>(value)))
591 return queue_op_status::success;
595 return queue_op_status::full;
Provides various transformation traits used by the range module.
T current_exception(T... args)
constexpr std::size_t hardware_destructive_interference_size
Minimum offset between two objects to avoid false sharing.
Definition: new:34
seqan::std::ranges::to to
Converts a range to a container. <dl class="no-api">This entity is not part of the SeqAn API....
Definition: to.hpp:26
constexpr size_t size
The size of a type pack.
Definition: type_pack/traits.hpp:146
A more refined container concept than seqan3::container.
T move_backward(T... args)
The <new> header from C++17's standard library.
T rethrow_exception(T... args)
Provides seqan3::detail::spin_delay.
Adaptations of concepts from the standard library.