20 #include <type_traits>
33 namespace seqan3::contrib
37 enum class queue_op_status : uint8_t
45 enum struct buffer_queue_policy : uint8_t
87 template <std::semiregular value_t,
89 buffer_queue_policy buffer_policy = buffer_queue_policy::dynamic>
94 using buffer_type = buffer_t;
95 using value_type =
typename buffer_type::value_type;
96 using size_type =
typename buffer_type::size_type;
97 using reference = void;
98 using const_reference = void;
101 buffer_queue() : buffer_queue{0u}
103 buffer_queue(buffer_queue
const &) =
delete;
104 buffer_queue(buffer_queue &&) =
delete;
105 buffer_queue & operator=(buffer_queue
const &) =
delete;
106 buffer_queue & operator=(buffer_queue &&) =
delete;
107 ~buffer_queue() =
default;
110 explicit buffer_queue(size_type
const init_capacity)
112 buffer.resize(init_capacity + 1);
113 ring_buffer_capacity = seqan3::detail::next_power_of_two(buffer.size());
116 template <std::ranges::input_range range_type>
117 requires std::convertible_to<std::ranges::range_value_t<range_type>, value_type>
118 buffer_queue(size_type
const init_capacity, range_type && r) : buffer_queue{init_capacity}
120 std::ranges::copy(r, std::ranges::begin(buffer));
126 template <
typename value2_t>
127 requires std::convertible_to<value2_t, value_t>
128 void push(value2_t && value)
130 detail::spin_delay delay{};
134 auto status = try_push(std::forward<value2_t>(value));
135 if (status == queue_op_status::closed)
136 throw queue_op_status::closed;
137 else if (status == queue_op_status::success)
140 assert(status != queue_op_status::empty);
141 assert(status == queue_op_status::full);
146 template <
typename value2_t>
147 requires std::convertible_to<value2_t, value_t>
148 queue_op_status wait_push(value2_t && value)
150 detail::spin_delay delay{};
154 auto status = try_push(std::forward<value2_t>(value));
156 if (status != queue_op_status::full)
159 assert(status != queue_op_status::empty);
160 assert(status == queue_op_status::full);
165 value_type value_pop()
167 detail::spin_delay delay{};
172 auto status = try_pop(value);
174 if (status == queue_op_status::closed)
175 throw queue_op_status::closed;
176 else if (status == queue_op_status::success)
179 assert(status != queue_op_status::full);
180 assert(status == queue_op_status::empty);
185 queue_op_status wait_pop(value_type & value)
187 detail::spin_delay delay{};
194 if (status == queue_op_status::closed || status == queue_op_status::success)
197 assert(status != queue_op_status::full);
198 assert(status == queue_op_status::empty);
208 template <
typename value2_t>
209 requires std::convertible_to<value2_t, value_t>
210 queue_op_status try_push(value2_t &&);
212 queue_op_status try_pop(value_t &);
218 void close() noexcept
220 closed_flag.store(
true, std::memory_order_release);
223 bool is_closed() const noexcept
225 return closed_flag.load(std::memory_order_acquire);
231 return pop_front_position == push_back_position;
234 bool is_full() const noexcept
237 return is_ring_buffer_exhausted(pop_front_position, push_back_position);
240 size_type
size() const noexcept
243 if (to_buffer_position(pop_front_position) <= to_buffer_position(push_back_position))
245 return to_buffer_position(push_back_position) - to_buffer_position(pop_front_position);
249 assert(buffer.size() > (to_buffer_position(pop_front_position) - to_buffer_position(push_back_position)));
250 return buffer.size() - (to_buffer_position(pop_front_position) - to_buffer_position(push_back_position));
265 constexpr
bool is_ring_buffer_exhausted(size_type
const from, size_type
const to)
const
267 assert(to <= (from + ring_buffer_capacity + 1));
269 return to >= from + ring_buffer_capacity;
285 constexpr size_type to_buffer_position(size_type
const position)
const
287 return position & (ring_buffer_capacity - 1);
309 size_type cyclic_increment(size_type position)
316 if (to_buffer_position(++position) >= buffer.size())
317 position += ring_buffer_capacity - buffer.size();
321 template <
typename value2_t>
322 requires (std::convertible_to<value2_t, value_t>) &&
323 (buffer_policy == buffer_queue_policy::fixed)
324 bool overflow(value2_t &&)
329 template <
typename value2_t>
330 requires (std::convertible_to<value2_t, value_t>) &&
331 (buffer_policy == buffer_queue_policy::dynamic)
332 bool overflow(value2_t && value);
346 template <std::semiregular value_t, sequence_container buffer_t = std::vector<value_t>>
347 using fixed_buffer_queue = buffer_queue<value_t, buffer_t, buffer_queue_policy::fixed>;
350 template <std::semiregular value_t, sequence_container buffer_t = std::vector<value_t>>
351 using dynamic_buffer_queue = buffer_queue<value_t, buffer_t, buffer_queue_policy::dynamic>;
361 template <
typename value_t,
typename buffer_t, buffer_queue_policy buffer_policy>
362 template <
typename value2_t>
363 requires (std::convertible_to<value2_t, value_t>) &&
364 (buffer_policy == buffer_queue_policy::dynamic)
365 inline bool buffer_queue<value_t, buffer_t, buffer_policy>::overflow(value2_t && value)
370 size_type old_size = buffer.size();
371 size_type ring_buffer_capacity = this->ring_buffer_capacity;
372 size_type local_front = this->pop_front_position;
373 size_type local_back = this->push_back_position;
376 assert(local_back == this->pending_push_back_position);
377 assert(local_front == this->pending_pop_front_position);
379 bool valueWasAppended =
false;
383 if (is_ring_buffer_exhausted(local_front, cyclic_increment(local_back)))
391 *it = std::forward<value2_t>(value);
392 local_back = local_front + ring_buffer_capacity;
393 valueWasAppended =
true;
396 assert(is_ring_buffer_exhausted(local_front, local_back));
399 size_type front_buffer_position = to_buffer_position(local_front);
400 size_type back_buffer_position = to_buffer_position(local_back);
403 buffer.resize(old_size + 1);
404 ring_buffer_capacity = seqan3::detail::next_power_of_two(buffer.size());
405 std::ranges::move_backward(
std::span{buffer.
data() + front_buffer_position, buffer.data() + old_size},
406 buffer.data() + buffer.size());
411 this->pending_pop_front_position = this->pop_front_position = front_buffer_position + 1;
412 this->pending_push_back_position = this->push_back_position = back_buffer_position + ring_buffer_capacity;
414 this->ring_buffer_capacity = ring_buffer_capacity;
416 return valueWasAppended;
439 template <
typename value_t,
typename buffer_t, buffer_queue_policy buffer_policy>
440 inline queue_op_status buffer_queue<value_t, buffer_t, buffer_policy>::try_pop(value_t & result)
445 size_type local_pending_pop_front_position{};
446 size_type next_local_pop_front_position{};
447 detail::spin_delay spinDelay{};
449 local_pending_pop_front_position = this->pending_pop_front_position;
453 size_type local_push_back_position = this->push_back_position;
455 assert(local_pending_pop_front_position <= local_push_back_position);
458 if (local_pending_pop_front_position == local_push_back_position)
460 return is_closed() ? queue_op_status::closed : queue_op_status::empty;
464 next_local_pop_front_position = cyclic_increment(local_pending_pop_front_position);
467 if (this->pending_pop_front_position.compare_exchange_weak(local_pending_pop_front_position,
468 next_local_pop_front_position))
475 result = std::ranges::iter_move(buffer.begin() + to_buffer_position(local_pending_pop_front_position));
479 detail::spin_delay delay{};
480 size_type acquired_slot = local_pending_pop_front_position;
481 while (!this->pop_front_position.compare_exchange_weak(acquired_slot, next_local_pop_front_position))
483 acquired_slot = local_pending_pop_front_position;
488 return queue_op_status::success;
514 template <
typename value_t,
typename buffer_t, buffer_queue_policy buffer_policy>
515 template <
typename value2_t>
516 requires std::convertible_to<value2_t, value_t>
517 inline queue_op_status buffer_queue<value_t, buffer_t, buffer_policy>::try_push(value2_t && value)
521 detail::spin_delay delay{};
526 return queue_op_status::closed;
529 size_type local_pending_push_back_position = this->pending_push_back_position;
534 size_type next_local_push_back_position = cyclic_increment(local_pending_push_back_position);
535 size_type local_pop_front_position = this->pop_front_position;
539 if (is_ring_buffer_exhausted(local_pop_front_position, next_local_push_back_position))
544 if (this->pending_push_back_position.compare_exchange_weak(local_pending_push_back_position,
545 next_local_push_back_position))
549 auto it =
std::ranges::begin(buffer) + to_buffer_position(local_pending_push_back_position);
550 *it = std::forward<value2_t>(value);
555 detail::spin_delay delay{};
557 size_type acquired_slot = local_pending_push_back_position;
558 while (!this->push_back_position.compare_exchange_weak(acquired_slot,
559 next_local_push_back_position))
561 acquired_slot = local_pending_push_back_position;
565 return queue_op_status::success;
573 if (overflow(std::forward<value2_t>(value)))
575 return queue_op_status::success;
579 return queue_op_status::full;