33namespace seqan3::contrib
// Status code reported by the queue operations in this listing. The values used by the
// code below are `success`, `closed`, `empty` and `full`.
// NOTE(review): the enumerator list is not visible here, so the declaration order and
// any additional enumerators are unconfirmed.
37enum class queue_op_status : uint8_t
// Policy tag that selects which overflow() overload of buffer_queue is enabled.
// `fixed` and `dynamic` are the two values referenced further down in this listing.
// NOTE(review): the enumerator list itself is not visible here.
45enum struct buffer_queue_policy : uint8_t
// Template header and member type aliases of buffer_queue.
//   value_t       - element type; must model std::semiregular.
//   buffer_policy - overflow policy; defaults to buffer_queue_policy::dynamic.
// NOTE(review): the middle template parameter (buffer_t, as used by the out-of-class
// definitions) and the class head are not visible in this listing.
87template <std::semiregular value_t,
89 buffer_queue_policy buffer_policy = buffer_queue_policy::dynamic>
// The container type used as underlying storage.
94 using buffer_type = buffer_t;
// Element and size types are taken from the storage container.
95 using value_type =
typename buffer_type::value_type;
96 using size_type =
typename buffer_type::size_type;
// Both reference aliases are declared as void.
97 using reference = void;
98 using const_reference = void;
// Default construction delegates to the capacity constructor with an initial capacity of 0.
101 buffer_queue() : buffer_queue{0u}
// Copy construction, move construction, copy assignment and move assignment are all
// deleted: a buffer_queue can be neither copied nor moved.
103 buffer_queue(buffer_queue
const &) =
delete;
104 buffer_queue(buffer_queue &&) =
delete;
105 buffer_queue & operator=(buffer_queue
const &) =
delete;
106 buffer_queue & operator=(buffer_queue &&) =
delete;
// The destructor is compiler-generated.
107 ~buffer_queue() =
default;
// Constructs a queue from an initial capacity (explicit: no implicit conversion from size_type).
// The storage is resized to init_capacity + 1 elements.
// NOTE(review): presumably the extra slot distinguishes the full state from the empty
// state of the ring - confirm. Further initialisation may follow on lines not visible here.
110 explicit buffer_queue(size_type
const init_capacity)
112 buffer.resize(init_capacity + 1);
// Constructs a queue with the given initial capacity from an input range `r` whose
// value type is convertible to value_type. Delegates to the capacity constructor first.
// NOTE(review): the constructor body (how `r` is consumed) is not visible in this listing.
116 template <std::ranges::input_range range_type>
117 requires std::convertible_to<std::ranges::range_value_t<range_type>, value_type>
118 buffer_queue(size_type
const init_capacity, range_type && r) : buffer_queue{init_capacity}
// Pushes `value` (any type convertible to value_t) by forwarding it to try_push().
// Throws the enum value queue_op_status::closed if try_push() reports a closed queue.
// NOTE(review): the retry loop, the use of `delay` and the statement executed on
// success are not visible in this listing; presumably the call is repeated while the
// status is `full` - confirm.
126 template <
typename value2_t>
127 requires std::convertible_to<value2_t, value_t>
128 void push(value2_t && value)
130 detail::spin_delay delay{};
134 auto status = try_push(std::forward<value2_t>(value));
135 if (status == queue_op_status::closed)
136 throw queue_op_status::closed;
137 else if (status == queue_op_status::success)
// Once `closed` and `success` are handled, `full` is the only status expected here.
140 assert(status != queue_op_status::empty);
141 assert(status == queue_op_status::full);
// Variant of push() that returns a queue_op_status instead of throwing.
// Forwards `value` to try_push() and inspects the resulting status.
// NOTE(review): the loop and the return statement are not visible; presumably any
// status other than `full` is returned to the caller and `full` triggers a retry - confirm.
146 template <
typename value2_t>
147 requires std::convertible_to<value2_t, value_t>
148 queue_op_status wait_push(value2_t && value)
150 detail::spin_delay delay{};
154 auto status = try_push(std::forward<value2_t>(value));
156 if (status != queue_op_status::full)
// Past the check above the status must be `full`.
159 assert(status != queue_op_status::empty);
160 assert(status == queue_op_status::full);
// Pops one element and returns it by value.
// Throws the enum value queue_op_status::closed if try_pop() reports a closed queue.
// NOTE(review): the declaration of `value`, the retry loop and the return statement
// are not visible in this listing.
165 value_type value_pop()
167 detail::spin_delay delay{};
// The pop attempt appears to be made only while the writer_waiting flag is not set.
172 if (!writer_waiting.load())
174 auto status = try_pop(value);
176 if (status == queue_op_status::closed)
177 throw queue_op_status::closed;
178 else if (status == queue_op_status::success)
// Once `closed` and `success` are handled, `empty` is the only status expected here.
181 assert(status != queue_op_status::full);
182 assert(status == queue_op_status::empty);
// Variant of value_pop() that writes the popped element into `value` and reports the
// outcome as a queue_op_status instead of throwing.
// NOTE(review): the line that produces `status` (presumably a try_pop(value) call),
// the loop and the return statement are not visible in this listing.
188 queue_op_status wait_pop(value_type & value)
190 detail::spin_delay delay{};
// The pop attempt appears to be made only while the writer_waiting flag is not set.
195 if (!writer_waiting.load())
// `closed` and `success` are the two terminal outcomes.
199 if (status == queue_op_status::closed || status == queue_op_status::success)
// Otherwise the queue was empty.
202 assert(status != queue_op_status::full);
203 assert(status == queue_op_status::empty);
// Single push attempt; accepts any type convertible to value_t. Defined out of class
// further down, where it returns `closed`, `success` or `full`.
214 template <
typename value2_t>
215 requires std::convertible_to<value2_t, value_t>
216 queue_op_status try_push(value2_t &&);
// Single pop attempt into the given reference. Defined out of class further down,
// where it returns `closed`, `empty` or `success`.
218 queue_op_status try_pop(value_t &);
// NOTE(review): the signature of this member is not visible in this listing;
// presumably this is the body of the function that closes the queue - confirm.
// Atomically sets writer_waiting to true and branches on the previous value.
226 if (writer_waiting.exchange(
true))
// writer_waiting is reset to false at two separate points.
// NOTE(review): presumably one on the normal path and one on an error path - the
// surrounding control flow is not visible.
233 writer_waiting.store(
false);
237 writer_waiting.store(
false);
// Reports whether the queue has been closed.
// NOTE(review): the body of is_closed() is not visible in this listing.
242 bool is_closed() const noexcept
// NOTE(review): this return statement belongs to a different member whose signature
// is not visible (presumably an emptiness query): it is true when the pop-front and
// push-back positions coincide.
250 return pop_front_position == push_back_position;
// Reports whether the ring is exhausted for the current pop-front / push-back
// positions, as decided by is_ring_buffer_exhausted().
253 bool is_full() const noexcept
256 return is_ring_buffer_exhausted(pop_front_position, push_back_position);
// Returns the distance from the pop-front position to the push-back position,
// measured in buffer positions.
259 size_type
size() const noexcept
// Front is not behind back in the buffer: the distance is a plain difference.
262 if (to_buffer_position(pop_front_position) <= to_buffer_position(push_back_position))
264 return to_buffer_position(push_back_position) - to_buffer_position(pop_front_position);
// Otherwise the range wraps around: subtract the gap between back and front
// from the total buffer size.
268 assert(buffer.size() > (to_buffer_position(pop_front_position) - to_buffer_position(push_back_position)));
269 return buffer.size() - (to_buffer_position(pop_front_position) - to_buffer_position(push_back_position));
// Returns true when position `to` has reached or passed `from + ring_buffer_capacity`.
//   from - start position (callers in this listing pass a pop-front position).
//   to   - end position (callers in this listing pass a push-back position).
284 constexpr bool is_ring_buffer_exhausted(size_type
const from, size_type
const to)
const
// Precondition: `to` is at most one step beyond a completely exhausted ring.
286 assert(
to <= (from + ring_buffer_capacity + 1));
288 return to >= from + ring_buffer_capacity;
// Maps a queue position to an index into `buffer` by masking it with
// ring_buffer_capacity - 1.
// NOTE(review): assumes ring_buffer_capacity is a power of two so the mask acts as a
// modulo - TODO confirm where ring_buffer_capacity is set.
304 constexpr size_type to_buffer_position(size_type
const position)
const
306 return position & (ring_buffer_capacity - 1);
// Advances `position` by one. If the resulting buffer position is not a valid index
// into `buffer`, the position is moved forward by ring_buffer_capacity - buffer.size().
// NOTE(review): the return statement is not visible in this listing; presumably the
// adjusted position is returned.
328 size_type cyclic_increment(size_type position)
335 if (to_buffer_position(++position) >= buffer.size())
336 position += ring_buffer_capacity - buffer.size();
// overflow() overload that participates only when buffer_policy is
// buffer_queue_policy::fixed. The argument is unnamed, i.e. it is not used by name.
// NOTE(review): the body is not visible in this listing.
340 template <
typename value2_t>
341 requires (std::convertible_to<value2_t, value_t>) &&
342 (buffer_policy == buffer_queue_policy::fixed)
343 bool overflow(value2_t &&)
// overflow() overload that participates only when buffer_policy is
// buffer_queue_policy::dynamic. Declared here and defined out of class; the
// definition resizes the buffer and returns whether `value` was stored.
348 template <
typename value2_t>
349 requires (std::convertible_to<value2_t, value_t>) &&
350 (buffer_policy == buffer_queue_policy::dynamic)
351 bool overflow(value2_t && value);
// Alias for a buffer_queue with the `fixed` overflow policy.
// The storage container defaults to std::vector<value_t>.
366template <std::semiregular value_t, sequence_container buffer_t = std::vector<value_t>>
367using fixed_buffer_queue = buffer_queue<value_t, buffer_t, buffer_queue_policy::fixed>;
// Alias for a buffer_queue with the `dynamic` overflow policy.
// The storage container defaults to std::vector<value_t>.
370template <std::semiregular value_t, sequence_container buffer_t = std::vector<value_t>>
371using dynamic_buffer_queue = buffer_queue<value_t, buffer_t, buffer_queue_policy::dynamic>;
// Out-of-class definition of the dynamic-policy overflow(): grows the buffer by one
// element, re-bases the queue positions and returns whether `value` was stored.
// NOTE(review): several lines are not visible in this listing, including braces, the
// declaration of `it` and the beginning of the call that ends on line 426.
381template <std::semiregular value_t, sequence_container buffer_t, buffer_queue_policy buffer_policy>
382template <
typename value2_t>
383 requires (std::convertible_to<value2_t, value_t>) &&
384 (buffer_policy == buffer_queue_policy::dynamic)
385inline bool buffer_queue<value_t, buffer_t, buffer_policy>::overflow(value2_t && value)
// Local snapshots of the buffer size, ring capacity and committed positions.
390 size_type old_size = buffer.size();
391 size_type ring_buffer_capacity = this->ring_buffer_capacity;
392 size_type local_front = this->pop_front_position;
393 size_type local_back = this->push_back_position;
// Committed and pending positions must agree on entry.
396 assert(local_back == this->pending_push_back_position);
397 assert(local_front == this->pending_pop_front_position);
399 bool valueWasAppended =
false;
// If one more increment of the back position would exhaust the ring, store the value
// now and set the back position to exactly one ring capacity past the front.
403 if (is_ring_buffer_exhausted(local_front, cyclic_increment(local_back)))
411 *it = std::forward<value2_t>(value);
412 local_back = local_front + ring_buffer_capacity;
413 valueWasAppended =
true;
416 assert(is_ring_buffer_exhausted(local_front, local_back));
// Translate both positions into buffer indices.
419 size_type front_buffer_position = to_buffer_position(local_front);
420 size_type back_buffer_position = to_buffer_position(local_back);
// Enlarge the storage by exactly one element.
423 buffer.resize(old_size + 1);
// NOTE(review): tail of a call whose beginning is not visible; presumably it shifts
// existing elements towards the new end of the buffer - confirm.
426 buffer.data() + buffer.size());
// Publish the re-based positions; the pending and committed counters are set together.
431 this->pending_pop_front_position = this->pop_front_position = front_buffer_position + 1;
432 this->pending_push_back_position = this->push_back_position = back_buffer_position + ring_buffer_capacity;
// Write the local capacity back to the member.
// NOTE(review): no visible line changes the local; presumably it is recomputed on a line missing here.
434 this->ring_buffer_capacity = ring_buffer_capacity;
436 return valueWasAppended;
// Single pop attempt. On success the front element is moved into `result` and
// queue_op_status::success is returned. When there is nothing to pop, the result is
// `closed` if is_closed() holds and `empty` otherwise.
// NOTE(review): the surrounding loop, braces and any use of `spinDelay` / `delay`
// are not visible in this listing.
459template <std::semiregular value_t, sequence_container buffer_t, buffer_queue_policy buffer_policy>
460inline queue_op_status buffer_queue<value_t, buffer_t, buffer_policy>::try_pop(value_t & result)
465 size_type local_pending_pop_front_position{};
466 size_type next_local_pop_front_position{};
467 detail::spin_delay spinDelay{};
// Snapshot the pending pop position and the committed push position.
469 local_pending_pop_front_position = this->pending_pop_front_position;
473 size_type local_push_back_position = this->push_back_position;
475 assert(local_pending_pop_front_position <= local_push_back_position);
// Equal positions mean there is nothing to pop.
478 if (local_pending_pop_front_position == local_push_back_position)
480 return is_closed() ? queue_op_status::closed : queue_op_status::empty;
484 next_local_pop_front_position = cyclic_increment(local_pending_pop_front_position);
// Claim the slot by advancing the pending pop position from the snapshot to the next
// position; the exchange succeeds only if the pending position still equals the snapshot.
487 if (this->pending_pop_front_position.compare_exchange_weak(local_pending_pop_front_position,
488 next_local_pop_front_position))
// Move the element at the claimed slot out of the buffer.
495 result = std::ranges::iter_move(buffer.begin() + to_buffer_position(local_pending_pop_front_position));
// Commit: advance pop_front_position from the claimed slot to the next position,
// retrying until the exchange succeeds. compare_exchange_weak overwrites its expected
// argument on failure, so acquired_slot is reset to the claimed slot before each retry.
499 detail::spin_delay delay{};
500 size_type acquired_slot = local_pending_pop_front_position;
501 while (!this->pop_front_position.compare_exchange_weak(acquired_slot, next_local_pop_front_position))
503 acquired_slot = local_pending_pop_front_position;
508 return queue_op_status::success;
// Single push attempt for any type convertible to value_t. Returns `closed`, `success`
// or `full`; `full` is returned only after overflow() has declined to store the value.
// NOTE(review): the surrounding loop, braces, the condition guarding the `closed`
// return and any use of `delay` are not visible in this listing.
534template <std::semiregular value_t, sequence_container buffer_t, buffer_queue_policy buffer_policy>
535template <
typename value2_t>
536 requires std::convertible_to<value2_t, value_t>
537inline queue_op_status buffer_queue<value_t, buffer_t, buffer_policy>::try_push(value2_t && value)
541 detail::spin_delay delay{};
// NOTE(review): presumably returned when the queue is already closed - the guarding
// condition is not visible.
546 return queue_op_status::closed;
// Snapshot the pending push position, compute its successor and read the committed
// pop position.
549 size_type local_pending_push_back_position = this->pending_push_back_position;
554 size_type next_local_push_back_position = cyclic_increment(local_pending_push_back_position);
555 size_type local_pop_front_position = this->pop_front_position;
// No free slot for the next position.
// NOTE(review): presumably control leaves the fast path here and continues with the
// overflow() call on line 593 - confirm.
559 if (is_ring_buffer_exhausted(local_pop_front_position, next_local_push_back_position))
// Claim the slot by advancing the pending push position from the snapshot to the next
// position; the exchange succeeds only if the pending position still equals the snapshot.
564 if (this->pending_push_back_position.compare_exchange_weak(local_pending_push_back_position,
565 next_local_push_back_position))
// Store the forwarded value into the claimed slot.
569 auto it =
std::ranges::begin(buffer) + to_buffer_position(local_pending_push_back_position);
570 *it = std::forward<value2_t>(value);
// Commit: advance push_back_position from the claimed slot to the next position,
// retrying until the exchange succeeds. compare_exchange_weak overwrites its expected
// argument on failure, so acquired_slot is reset to the claimed slot before each retry.
575 detail::spin_delay delay{};
577 size_type acquired_slot = local_pending_push_back_position;
578 while (!this->push_back_position.compare_exchange_weak(acquired_slot,
579 next_local_push_back_position))
581 acquired_slot = local_pending_push_back_position;
585 return queue_op_status::success;
// Let the policy-specific overflow() handle the value; it returns true if the value
// was stored.
593 if (overflow(std::forward<value2_t>(value)))
595 return queue_op_status::success;
599 return queue_op_status::full;
Provides various transformation traits used by the range module.
T current_exception(T... args)
constexpr std::size_t hardware_destructive_interference_size
Minimum offset between two objects to avoid false sharing.
Definition: new:34
constexpr auto to(args_t &&... args)
Converts a range to a container.
Definition: to.hpp:114
constexpr size_t size
The size of a type pack.
Definition: traits.hpp:146
A more refined container concept than seqan3::container.
T move_backward(T... args)
The <new> header from C++17's standard library.
T rethrow_exception(T... args)
Provides seqan3::detail::spin_delay.
Adaptations of concepts from the standard library.