SeqAn3 3.4.0-rc.1
The Modern C++ library for sequence analysis.
Loading...
Searching...
No Matches
suspendable_queue.hpp
Go to the documentation of this file.
1// SPDX-FileCopyrightText: 2006-2024 Knut Reinert & Freie Universität Berlin
2// SPDX-FileCopyrightText: 2016-2024 Knut Reinert & MPI für molekulare Genetik
3// SPDX-License-Identifier: BSD-3-Clause
4
11#pragma once
12
13#include <algorithm>
14#include <cassert>
15#include <condition_variable>
16#include <iterator>
17#include <mutex>
18#include <ranges>
19#include <span>
20#include <thread>
21#include <vector>
22
24
25namespace seqan3::contrib
26{
27
28// ============================================================================
29// Forwards
30// ============================================================================
31
32// ============================================================================
33// Classes
34// ============================================================================
35
36// ----------------------------------------------------------------------------
37// Class ConcurrentQueue
38// ----------------------------------------------------------------------------
39
// Empty tag type. TSpec carries no data; it only makes one tag type distinct
// from another so that overloads and specialisations can be selected by type.
template <class TSpec = void>
struct Tag
{};

// Specialisation selector for ConcurrentQueue. Declared without a definition:
// it is only ever used as a type-level marker.
template <class TSpec = Tag<void>>
struct Suspendable;

// Primary queue template, declared without a definition at this point.
template <class TValue, class TSpec = Suspendable<>>
class ConcurrentQueue;

// Tag used to select the "Limit" flavour of the queue. Limit_ is an incomplete
// type that exists solely to give the tag a unique identity.
struct Limit_;
using Limit = Tag<Limit_>;
52
53template <typename TValue, typename TSpec>
54class ConcurrentQueue<TValue, Suspendable<TSpec>>
55{
56public:
57 typedef std::vector<TValue> TString;
58 typedef typename TString::size_type TSize;
59
60 size_t readerCount;
61 size_t writerCount;
62
63 TString data;
64 TSize occupied;
65 TSize back;
66 TSize front;
67
68 std::mutex cs;
70
71 bool virgin;
72
73 ConcurrentQueue() : readerCount(0), writerCount(0), occupied(0), back(0), front(0), virgin(true)
74 {}
75
76 ~ConcurrentQueue()
77 {
78 assert(writerCount == 0u);
79
80 // wait for all pending readers to finish
81 while (readerCount != 0u)
82 {}
83 }
84};
85
86template <typename TValue>
87class ConcurrentQueue<TValue, Suspendable<Limit>> : public ConcurrentQueue<TValue, Suspendable<>>
88{
89public:
90 typedef ConcurrentQueue<TValue, Suspendable<>> TBase;
91 typedef typename TBase::TString TString;
92 typedef typename TBase::TSize TSize;
93
95
96 ConcurrentQueue(TSize maxSize) : TBase()
97 {
98 this->data.resize(maxSize);
99 // reserve(this->data, maxSize, Exact());
100 // _setLength(this->data, maxSize);
101 }
102
103 ConcurrentQueue(ConcurrentQueue const & other) : TBase((TBase const &)other)
104 {}
105};
106
107template <typename TValue, typename TSpec>
108inline void lockReading(ConcurrentQueue<TValue, Suspendable<TSpec>> &)
109{}
110
111template <typename TValue, typename TSpec>
112inline void unlockReading(ConcurrentQueue<TValue, Suspendable<TSpec>> & me)
113{
114 {
116 if (--me.readerCount != 0u)
117 return;
118 }
119 me.less.notify_all(); // publish the condition that reader count is 0.
120}
121
122template <typename TValue, typename TSpec>
123inline void lockWriting(ConcurrentQueue<TValue, Suspendable<TSpec>> &)
124{}
125
126template <typename TValue, typename TSpec>
127inline void unlockWriting(ConcurrentQueue<TValue, Suspendable<TSpec>> & me)
128{
129 {
131 if (--me.writerCount != 0u)
132 return;
133 }
134 me.more.notify_all(); // publish the condition, that writer count is 0.
135}
136
137template <typename TValue, typename TSize, typename TSpec>
138inline void setReaderCount(ConcurrentQueue<TValue, Suspendable<TSpec>> & me, TSize readerCount)
139{
141 me.readerCount = readerCount;
142}
143
144template <typename TValue, typename TSize, typename TSpec>
145inline void setWriterCount(ConcurrentQueue<TValue, Suspendable<TSpec>> & me, TSize writerCount)
146{
148 me.writerCount = writerCount;
149}
150
151template <typename TValue, typename TSize1, typename TSize2, typename TSpec>
152inline void
153setReaderWriterCount(ConcurrentQueue<TValue, Suspendable<TSpec>> & me, TSize1 readerCount, TSize2 writerCount)
154{
156 me.readerCount = readerCount;
157 me.writerCount = writerCount;
158}
159
160template <typename TValue, typename TSize, typename TSpec>
161inline bool waitForMinSize(ConcurrentQueue<TValue, Suspendable<TSpec>> & me, TSize minSize)
162{
164 while (me.occupied < minSize && me.writerCount > 0u)
165 me.more.wait(lock);
166 return me.occupied >= minSize;
167}
168
169template <typename TValue, typename TSpec>
170inline bool empty(ConcurrentQueue<TValue, Suspendable<TSpec>> const & me)
171{
172 return me.occupied == 0;
173}
174
175template <typename TValue, typename TSpec>
176inline typename ConcurrentQueue<TValue, Suspendable<TSpec>>::SizeType
177length(ConcurrentQueue<TValue, Suspendable<TSpec>> const & me)
178{
179 return me.occupied;
180}
181
182template <typename TValue, typename TSpec>
183inline bool
184_popFront(TValue & result, ConcurrentQueue<TValue, Suspendable<TSpec>> & me, std::unique_lock<std::mutex> & lk)
185{
186 typedef ConcurrentQueue<TValue, Suspendable<TSpec>> TQueue;
187 typedef typename TQueue::TString TString;
188 typedef typename TString::size_type TSize;
189
190 TSize cap = me.data.size();
191
192 while (me.occupied == 0u && me.writerCount > 0u)
193 me.more.wait(lk);
194
195 if (me.occupied == 0u)
196 return false;
197
198 assert(me.occupied > 0u);
199
200 // extract value and destruct it in the data string
201 // TIter it = me.data.begin() + me.front;
202 result = std::ranges::iter_move(std::ranges::next(me.data.begin(), me.front));
203 // std::swap(result, *it);
204 // valueDestruct(it);
205
206 me.front = (me.front + 1) % cap;
207 me.occupied--;
208
209 /* now: either me.occupied > 0 and me.nextout is the index
210 of the next occupied slot in the buffer, or
211 me.occupied == 0 and me.nextout is the index of the next
212 (empty) slot that will be filled by a producer (such as
213 me.nextout == me.nextin) */
214
215 return true;
216}
217
218template <typename TValue, typename TSpec>
219inline bool
220_popBack(TValue & result, ConcurrentQueue<TValue, Suspendable<TSpec>> & me, std::unique_lock<std::mutex> & lk)
221{
222 typedef ConcurrentQueue<TValue, Suspendable<TSpec>> TQueue;
223 typedef typename TQueue::TString TString;
224 typedef typename TString::size_type TSize;
225
226 TSize cap = me.data.size();
227
228 while (me.occupied == 0u && me.writerCount > 0u)
229 me.more.wait(lk);
230
231 if (me.occupied == 0u)
232 return false;
233
234 assert(me.occupied > 0u);
235
236 me.back = (me.back + cap - 1) % cap;
237
238 // extract value and destruct it in the data string
239 // TIter it = me.data.begin() + me.back;
240 result = std::ranges::iter_move(std::ranges::next(me.data.begin(), me.back));
241 // std::swap(result, *it);
242 // valueDestruct(it);
243
244 me.occupied--;
245
246 /* now: either me.occupied > 0 and me.nextout is the index
247 of the next occupied slot in the buffer, or
248 me.occupied == 0 and me.nextout is the index of the next
249 (empty) slot that will be filled by a producer (such as
250 me.nextout == me.nextin) */
251
252 return true;
253}
254
255template <typename TValue, typename TSpec>
256inline bool popFront(TValue & result, ConcurrentQueue<TValue, Suspendable<TSpec>> & me)
257{
259 return _popFront(result, me, lock);
260}
261
262template <typename TValue>
263inline bool popFront(TValue & result, ConcurrentQueue<TValue, Suspendable<Limit>> & me)
264{
265 {
267 if (!_popFront(result, me, lk))
268 return false;
269 }
270 me.less.notify_all();
271 return true;
272}
273
274template <typename TValue, typename TSpec>
275inline bool popBack(TValue & result, ConcurrentQueue<TValue, Suspendable<TSpec>> & me)
276{
278 return _popBack(result, me, lk);
279}
280
281template <typename TValue>
282inline bool popBack(TValue & result, ConcurrentQueue<TValue, Suspendable<Limit>> & me)
283{
284 {
286 if (!_popBack(result, me, lk))
287 return false;
288 }
289 me.less.notify_all();
290 return true;
291}
292
293template <typename TValue, typename TValue2, typename TSpec, typename TExpand>
294inline bool
295appendValue(ConcurrentQueue<TValue, Suspendable<TSpec>> & me, TValue2 && val, [[maybe_unused]] Tag<TExpand> expandTag)
296{
297 typedef ConcurrentQueue<TValue, Suspendable<TSpec>> TQueue;
298 typedef typename TQueue::TString TString;
299 typedef typename TString::size_type TSize;
300
301 {
303 TSize cap = me.data.size();
304
305 if (me.occupied >= cap)
306 {
307 // increase capacity
308 // _setLength(me.data, cap);
309 // reserve(me.data, cap + 1, expandTag);
310 me.data.resize(cap + 1);
311 TSize delta = me.data.size() - cap;
312 assert(delta == 1);
313
314 // create a gap of delta many values between tail and head
315 // Why?
316 // _clearSpace(me.data, delta, me.back, me.back, expandTag);
317 std::ranges::move_backward(std::span{me.data.data() + me.front, me.data.data() + cap},
318 me.data.data() + me.data.size());
319 if (me.occupied != 0 && me.back <= me.front)
320 me.front += delta;
321
322 cap += delta;
323 }
324
325 // valueConstruct(begin(me.data, Standard()) + me.back, val);
326 *std::ranges::next(me.data.begin(), me.back) = std::forward<TValue2>(val);
327 me.back = (me.back + 1) % cap;
328
329 ++me.occupied;
330 }
331
332 /* now: either me.occupied < BSIZE and me.nextin is the index
333 of the next empty slot in the buffer, or
334 me.occupied == BSIZE and me.nextin is the index of the
335 next (occupied) slot that will be emptied by a consumer
336 (such as me.nextin == me.nextout) */
337 me.more.notify_all();
338 return true;
339}
340
341template <typename TValue, typename TValue2, typename TSpec, typename TExpand>
342inline bool appendValue(ConcurrentQueue<TValue, Suspendable<Limit>> & me, TValue2 && val, Tag<TExpand> expandTag);
343
344template <typename TValue, typename TValue2>
345inline bool appendValue(ConcurrentQueue<TValue, Suspendable<Limit>> & me, TValue2 && val, Limit)
346{
347 typedef ConcurrentQueue<TValue, Suspendable<Limit>> TQueue;
348 typedef typename TQueue::TString TString;
349 typedef typename TString::size_type TSize;
350
351 {
353 TSize cap = me.data.size();
354
355 while (me.occupied >= cap && me.readerCount > 0u)
356 me.less.wait(lock);
357
358 if (me.occupied >= cap)
359 return false;
360
361 assert(me.occupied < cap);
362
363 // valueConstruct(begin(me.data, Standard()) + me.back, val);
364 *std::ranges::next(me.data.begin(), me.back) = std::forward<TValue2>(val);
365 me.back = (me.back + 1) % cap;
366 me.occupied++;
367 }
368
369 /* now: either me.occupied < BSIZE and me.nextin is the index
370 of the next empty slot in the buffer, or
371 me.occupied == BSIZE and me.nextin is the index of the
372 next (occupied) slot that will be emptied by a consumer
373 (such as me.nextin == me.nextout) */
374 me.more.notify_all();
375 return true;
376}
377
378template <typename TValue, typename TValue2, typename TSpec>
379inline bool appendValue(ConcurrentQueue<TValue, Suspendable<TSpec>> & me, TValue2 && val)
380{
381 return appendValue(me, std::forward<TValue2>(val), TSpec{});
382}
383
384} // namespace seqan3::contrib
T data(T... args)
T empty(T... args)
typename decltype(detail::front(list_t{}))::type front
Return the first type from the type list.
Definition type_list/traits.hpp:293
typename decltype(detail::back(list_t{}))::type back
Return the last type from the type list.
Definition type_list/traits.hpp:313
T lock(T... args)
T move_backward(T... args)
Provides platform and dependency checks.
Hide me