SeqAn3 3.1.0
The Modern C++ library for sequence analysis.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
suspendable_queue.hpp
Go to the documentation of this file.
1// -----------------------------------------------------------------------------------------------------
2// Copyright (c) 2006-2021, Knut Reinert & Freie Universität Berlin
3// Copyright (c) 2016-2021, Knut Reinert & MPI für molekulare Genetik
4// This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License
5// shipped with this file and also available at: https://github.com/seqan/seqan3/blob/master/LICENSE.md
6// -----------------------------------------------------------------------------------------------------
7
14#pragma once
15
16#include <condition_variable>
17#include <mutex>
18#include <thread>
19#include <vector>
20
22#include <seqan3/std/algorithm>
23#include <seqan3/std/iterator>
24#include <seqan3/std/ranges>
25#include <seqan3/std/span>
26
27namespace seqan3::contrib
28{
29
30// ============================================================================
31// Forwards
32// ============================================================================
33
34// ============================================================================
35// Classes
36// ============================================================================
37
38// ----------------------------------------------------------------------------
39// Class ConcurrentQueue
40// ----------------------------------------------------------------------------
41
42template <typename TSpec = void>
43struct Tag{};
44
45template <typename TSpec = Tag<void>>
46struct Suspendable;
47
48template <typename TValue, typename TSpec = Suspendable<>>
49class ConcurrentQueue;
50
51struct Limit_;
52using Limit = Tag<Limit_>;
53
54template <typename TValue, typename TSpec>
55class ConcurrentQueue<TValue, Suspendable<TSpec> >
56{
57public:
58 typedef std::vector<TValue> TString;
59 typedef typename TString::size_type TSize;
60
61 size_t readerCount;
62 size_t writerCount;
63
64 TString data;
65 TSize occupied;
66 TSize back;
67 TSize front;
68
69 std::mutex cs;
71
72 bool virgin;
73
74 ConcurrentQueue():
75 readerCount(0),
76 writerCount(0),
77 occupied(0),
78 back(0),
79 front(0),
80 virgin(true)
81 {}
82
83 ~ConcurrentQueue()
84 {
85 assert(writerCount == 0u);
86
87 // wait for all pending readers to finish
88 while (readerCount != 0u)
89 {}
90 }
91};
92
93template <typename TValue>
94class ConcurrentQueue<TValue, Suspendable<Limit> >:
95 public ConcurrentQueue<TValue, Suspendable<> >
96{
97public:
98 typedef ConcurrentQueue<TValue, Suspendable<> > TBase;
99 typedef typename TBase::TString TString;
100 typedef typename TBase::TSize TSize;
101
103
104 ConcurrentQueue(TSize maxSize):
105 TBase()
106 {
107 this->data.resize(maxSize);
108 // reserve(this->data, maxSize, Exact());
109 // _setLength(this->data, maxSize);
110 }
111
112 ConcurrentQueue(ConcurrentQueue const & other):
113 TBase((TBase const &)other)
114 {}
115};
116
117template <typename TValue, typename TSpec>
118inline void
119lockReading(ConcurrentQueue<TValue, Suspendable<TSpec> > &)
120{}
121
122template <typename TValue, typename TSpec>
123inline void
124unlockReading(ConcurrentQueue<TValue, Suspendable<TSpec> > & me)
125{
126 {
128 if (--me.readerCount != 0u)
129 return;
130 }
131 me.less.notify_all(); // publish the condition that reader count is 0.
132}
133
134template <typename TValue, typename TSpec>
135inline void
136lockWriting(ConcurrentQueue<TValue, Suspendable<TSpec> > &)
137{}
138
139template <typename TValue, typename TSpec>
140inline void
141unlockWriting(ConcurrentQueue<TValue, Suspendable<TSpec> > & me)
142{
143 {
145 if (--me.writerCount != 0u)
146 return;
147 }
148 me.more.notify_all(); // publish the condition, that writer count is 0.
149}
150
151template <typename TValue, typename TSize, typename TSpec>
152inline void
153setReaderCount(ConcurrentQueue<TValue, Suspendable<TSpec> > & me, TSize readerCount)
154{
156 me.readerCount = readerCount;
157}
158
159template <typename TValue, typename TSize, typename TSpec>
160inline void
161setWriterCount(ConcurrentQueue<TValue, Suspendable<TSpec> > & me, TSize writerCount)
162{
164 me.writerCount = writerCount;
165}
166
167template <typename TValue, typename TSize1, typename TSize2, typename TSpec>
168inline void
169setReaderWriterCount(ConcurrentQueue<TValue, Suspendable<TSpec> > & me, TSize1 readerCount, TSize2 writerCount)
170{
172 me.readerCount = readerCount;
173 me.writerCount = writerCount;
174}
175
176template <typename TValue, typename TSize, typename TSpec>
177inline bool
178waitForMinSize(ConcurrentQueue<TValue, Suspendable<TSpec> > & me,
179 TSize minSize)
180{
182 while (me.occupied < minSize && me.writerCount > 0u)
183 me.more.wait(lock);
184 return me.occupied >= minSize;
185}
186
187template <typename TValue, typename TSpec>
188inline bool
189empty(ConcurrentQueue<TValue, Suspendable<TSpec> > const & me)
190{
191 return me.occupied == 0;
192}
193
194template <typename TValue, typename TSpec>
195inline typename ConcurrentQueue<TValue, Suspendable<TSpec> >::SizeType
196length(ConcurrentQueue<TValue, Suspendable<TSpec> > const & me)
197{
198 return me.occupied;
199}
200
201template <typename TValue, typename TSpec>
202inline bool
203_popFront(TValue & result, ConcurrentQueue<TValue, Suspendable<TSpec> > & me,
205{
206 typedef ConcurrentQueue<TValue, Suspendable<TSpec> > TQueue;
207 typedef typename TQueue::TString TString;
208 typedef typename TString::size_type TSize;
209
210 TSize cap = me.data.size();
211
212 while (me.occupied == 0u && me.writerCount > 0u)
213 me.more.wait(lk);
214
215 if (me.occupied == 0u)
216 return false;
217
218 assert(me.occupied > 0u);
219
220 // extract value and destruct it in the data string
221 // TIter it = me.data.begin() + me.front;
222 result = std::ranges::iter_move(std::ranges::next(me.data.begin(), me.front));
223 // std::swap(result, *it);
224 // valueDestruct(it);
225
226 me.front = (me.front + 1) % cap;
227 me.occupied--;
228
229 /* now: either me.occupied > 0 and me.nextout is the index
230 of the next occupied slot in the buffer, or
231 me.occupied == 0 and me.nextout is the index of the next
232 (empty) slot that will be filled by a producer (such as
233 me.nextout == me.nextin) */
234
235 return true;
236}
237
238template <typename TValue, typename TSpec>
239inline bool
240_popBack(TValue & result,
241 ConcurrentQueue<TValue, Suspendable<TSpec> > & me,
243{
244 typedef ConcurrentQueue<TValue, Suspendable<TSpec> > TQueue;
245 typedef typename TQueue::TString TString;
246 typedef typename TString::size_type TSize;
247
248 TSize cap = me.data.size();
249
250 while (me.occupied == 0u && me.writerCount > 0u)
251 me.more.wait(lk);
252
253 if (me.occupied == 0u)
254 return false;
255
256 assert(me.occupied > 0u);
257
258 me.back = (me.back + cap - 1) % cap;
259
260 // extract value and destruct it in the data string
261 // TIter it = me.data.begin() + me.back;
262 result = std::ranges::iter_move(std::ranges::next(me.data.begin(), me.back));
263 // std::swap(result, *it);
264 // valueDestruct(it);
265
266 me.occupied--;
267
268 /* now: either me.occupied > 0 and me.nextout is the index
269 of the next occupied slot in the buffer, or
270 me.occupied == 0 and me.nextout is the index of the next
271 (empty) slot that will be filled by a producer (such as
272 me.nextout == me.nextin) */
273
274 return true;
275}
276
277template <typename TValue, typename TSpec>
278inline bool
279popFront(TValue & result, ConcurrentQueue<TValue, Suspendable<TSpec> > & me)
280{
282 return _popFront(result, me, lock);
283}
284
285template <typename TValue>
286inline bool
287popFront(TValue & result, ConcurrentQueue<TValue, Suspendable<Limit> > & me)
288{
289 {
291 if (!_popFront(result, me, lk))
292 return false;
293 }
294 me.less.notify_all();
295 return true;
296}
297
298template <typename TValue, typename TSpec>
299inline bool
300popBack(TValue & result, ConcurrentQueue<TValue, Suspendable<TSpec> > & me)
301{
303 return _popBack(result, me, lk);
304}
305
306template <typename TValue>
307inline bool
308popBack(TValue & result, ConcurrentQueue<TValue, Suspendable<Limit> > & me)
309{
310 {
312 if (!_popBack(result, me, lk))
313 return false;
314 }
315 me.less.notify_all();
316 return true;
317}
318
319
320template <typename TValue, typename TValue2, typename TSpec, typename TExpand>
321inline bool
322appendValue(ConcurrentQueue<TValue, Suspendable<TSpec> > & me,
323 TValue2 && val,
324 [[maybe_unused]] Tag<TExpand> expandTag)
325{
326 typedef ConcurrentQueue<TValue, Suspendable<TSpec> > TQueue;
327 typedef typename TQueue::TString TString;
328 typedef typename TString::size_type TSize;
329
330 {
332 TSize cap = me.data.size();
333
334 if (me.occupied >= cap)
335 {
336 // increase capacity
337 // _setLength(me.data, cap);
338 // reserve(me.data, cap + 1, expandTag);
339 me.data.resize(cap + 1);
340 TSize delta = me.data.size() - cap;
341 assert(delta == 1);
342
343 // create a gap of delta many values between tail and head
344 // Why?
345 // _clearSpace(me.data, delta, me.back, me.back, expandTag);
346 std::ranges::move_backward(std::span{me.data.data() + me.front, me.data.data() + cap},
347 me.data.data() + me.data.size());
348 if (me.occupied != 0 && me.back <= me.front)
349 me.front += delta;
350
351 cap += delta;
352 }
353
354 // valueConstruct(begin(me.data, Standard()) + me.back, val);
355 *std::ranges::next(me.data.begin(), me.back) = std::forward<TValue2>(val);
356 me.back = (me.back + 1) % cap;
357
358 ++me.occupied;
359 }
360
361 /* now: either me.occupied < BSIZE and me.nextin is the index
362 of the next empty slot in the buffer, or
363 me.occupied == BSIZE and me.nextin is the index of the
364 next (occupied) slot that will be emptied by a consumer
365 (such as me.nextin == me.nextout) */
366 me.more.notify_all();
367 return true;
368}
369
370template <typename TValue, typename TValue2, typename TSpec, typename TExpand>
371inline bool
372appendValue(ConcurrentQueue<TValue, Suspendable<Limit> > & me,
373 TValue2 && val,
374 Tag<TExpand> expandTag);
375
376template <typename TValue, typename TValue2>
377inline bool
378appendValue(ConcurrentQueue<TValue, Suspendable<Limit> > & me,
379 TValue2 && val,
380 Limit)
381{
382 typedef ConcurrentQueue<TValue, Suspendable<Limit> > TQueue;
383 typedef typename TQueue::TString TString;
384 typedef typename TString::size_type TSize;
385
386 {
388 TSize cap = me.data.size();
389
390 while (me.occupied >= cap && me.readerCount > 0u)
391 me.less.wait(lock);
392
393 if (me.occupied >= cap)
394 return false;
395
396 assert(me.occupied < cap);
397
398 // valueConstruct(begin(me.data, Standard()) + me.back, val);
399 *std::ranges::next(me.data.begin(), me.back) = std::forward<TValue2>(val);
400 me.back = (me.back + 1) % cap;
401 me.occupied++;
402 }
403
404 /* now: either me.occupied < BSIZE and me.nextin is the index
405 of the next empty slot in the buffer, or
406 me.occupied == BSIZE and me.nextin is the index of the
407 next (occupied) slot that will be emptied by a consumer
408 (such as me.nextin == me.nextout) */
409 me.more.notify_all();
410 return true;
411}
412
413template <typename TValue, typename TValue2, typename TSpec>
414inline bool
415appendValue(ConcurrentQueue<TValue, Suspendable<TSpec> > & me,
416 TValue2 && val)
417{
418 return appendValue(me, std::forward<TValue2>(val), TSpec{});
419}
420
421} // namespace seqan3::contrib
The <algorithm> header from C++20's standard library.
T data(T... args)
T empty(T... args)
typename decltype(detail::front(list_t{}))::type front
Return the first type from the type list.
Definition: traits.hpp:279
typename decltype(detail::back(list_t{}))::type back
Return the last type from the type list.
Definition: traits.hpp:301
The <iterator> header from C++20's standard library.
T lock(T... args)
Provides platform and dependency checks.
The <ranges> header from C++20's standard library.
Provides std::span from the C++20 standard library.