SeqAn3 3.2.0
The Modern C++ library for sequence analysis.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
suspendable_queue.hpp
Go to the documentation of this file.
1// -----------------------------------------------------------------------------------------------------
2// Copyright (c) 2006-2022, Knut Reinert & Freie Universität Berlin
3// Copyright (c) 2016-2022, Knut Reinert & MPI für molekulare Genetik
4// This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License
5// shipped with this file and also available at: https://github.com/seqan/seqan3/blob/master/LICENSE.md
6// -----------------------------------------------------------------------------------------------------
7
14#pragma once
15
16#include <algorithm>
17#include <cassert>
18#include <condition_variable>
19#include <iterator>
20#include <mutex>
21#include <ranges>
22#include <span>
23#include <thread>
24#include <vector>
25
27
28namespace seqan3::contrib
29{
30
31// ============================================================================
32// Forwards
33// ============================================================================
34
35// ============================================================================
36// Classes
37// ============================================================================
38
39// ----------------------------------------------------------------------------
40// Class ConcurrentQueue
41// ----------------------------------------------------------------------------
42
// Generic tag type used for compile-time dispatch; it carries no state.
template <typename TSpec = void>
struct Tag
{};

// Selects the suspendable queue implementation. Only declared, never defined here.
template <typename TSpec = Tag<void>>
struct Suspendable;

// Primary template; only the Suspendable<...> specialisations are defined.
template <typename TValue, typename TSpec = Suspendable<>>
class ConcurrentQueue;

// Tag selecting the capacity-limited queue variant.
struct Limit_;
using Limit = Tag<Limit_>;

// Ring-buffer queue shared between reader and writer threads.
//
// All members are public because the free functions operating on the queue
// access them directly. Every access to the counters and indices is expected
// to happen while holding `cs`.
template <typename TValue, typename TSpec>
class ConcurrentQueue<TValue, Suspendable<TSpec> >
{
public:
    typedef std::vector<TValue> TString;
    typedef typename TString::size_type TSize;

    size_t readerCount; // number of registered readers
    size_t writerCount; // number of registered writers

    TString data;       // ring-buffer storage; data.size() is the capacity
    TSize occupied;     // number of slots currently holding a value
    TSize back;         // index of the next slot to be written
    TSize front;        // index of the next slot to be read

    std::mutex cs;                // guards all members above
    std::condition_variable more; // signalled when a value was added or the last writer left

    bool virgin;        // set to true on construction; not modified in this class

    ConcurrentQueue():
        readerCount(0),
        writerCount(0),
        occupied(0),
        back(0),
        front(0),
        virgin(true)
    {}

    // Blocks until every reader has signed off.
    //
    // The counter is polled under `cs` rather than read bare: an unsynchronised
    // spin on a plain size_t is a data race and may be compiled into an
    // endless loop.
    ~ConcurrentQueue()
    {
        std::unique_lock<std::mutex> guard(cs);
        assert(writerCount == 0u);

        // wait for all pending readers to finish
        while (readerCount != 0u)
        {
            guard.unlock();
            std::this_thread::yield(); // let the remaining readers make progress
            guard.lock();
        }
    }
};
93
94template <typename TValue>
95class ConcurrentQueue<TValue, Suspendable<Limit> >:
96 public ConcurrentQueue<TValue, Suspendable<> >
97{
98public:
99 typedef ConcurrentQueue<TValue, Suspendable<> > TBase;
100 typedef typename TBase::TString TString;
101 typedef typename TBase::TSize TSize;
102
104
105 ConcurrentQueue(TSize maxSize):
106 TBase()
107 {
108 this->data.resize(maxSize);
109 // reserve(this->data, maxSize, Exact());
110 // _setLength(this->data, maxSize);
111 }
112
113 ConcurrentQueue(ConcurrentQueue const & other):
114 TBase((TBase const &)other)
115 {}
116};
117
118template <typename TValue, typename TSpec>
119inline void
120lockReading(ConcurrentQueue<TValue, Suspendable<TSpec> > &)
121{}
122
123template <typename TValue, typename TSpec>
124inline void
125unlockReading(ConcurrentQueue<TValue, Suspendable<TSpec> > & me)
126{
127 {
129 if (--me.readerCount != 0u)
130 return;
131 }
132 me.less.notify_all(); // publish the condition that reader count is 0.
133}
134
135template <typename TValue, typename TSpec>
136inline void
137lockWriting(ConcurrentQueue<TValue, Suspendable<TSpec> > &)
138{}
139
140template <typename TValue, typename TSpec>
141inline void
142unlockWriting(ConcurrentQueue<TValue, Suspendable<TSpec> > & me)
143{
144 {
146 if (--me.writerCount != 0u)
147 return;
148 }
149 me.more.notify_all(); // publish the condition, that writer count is 0.
150}
151
152template <typename TValue, typename TSize, typename TSpec>
153inline void
154setReaderCount(ConcurrentQueue<TValue, Suspendable<TSpec> > & me, TSize readerCount)
155{
157 me.readerCount = readerCount;
158}
159
160template <typename TValue, typename TSize, typename TSpec>
161inline void
162setWriterCount(ConcurrentQueue<TValue, Suspendable<TSpec> > & me, TSize writerCount)
163{
165 me.writerCount = writerCount;
166}
167
168template <typename TValue, typename TSize1, typename TSize2, typename TSpec>
169inline void
170setReaderWriterCount(ConcurrentQueue<TValue, Suspendable<TSpec> > & me, TSize1 readerCount, TSize2 writerCount)
171{
173 me.readerCount = readerCount;
174 me.writerCount = writerCount;
175}
176
177template <typename TValue, typename TSize, typename TSpec>
178inline bool
179waitForMinSize(ConcurrentQueue<TValue, Suspendable<TSpec> > & me,
180 TSize minSize)
181{
183 while (me.occupied < minSize && me.writerCount > 0u)
184 me.more.wait(lock);
185 return me.occupied >= minSize;
186}
187
188template <typename TValue, typename TSpec>
189inline bool
190empty(ConcurrentQueue<TValue, Suspendable<TSpec> > const & me)
191{
192 return me.occupied == 0;
193}
194
195template <typename TValue, typename TSpec>
196inline typename ConcurrentQueue<TValue, Suspendable<TSpec> >::SizeType
197length(ConcurrentQueue<TValue, Suspendable<TSpec> > const & me)
198{
199 return me.occupied;
200}
201
202template <typename TValue, typename TSpec>
203inline bool
204_popFront(TValue & result, ConcurrentQueue<TValue, Suspendable<TSpec> > & me,
206{
207 typedef ConcurrentQueue<TValue, Suspendable<TSpec> > TQueue;
208 typedef typename TQueue::TString TString;
209 typedef typename TString::size_type TSize;
210
211 TSize cap = me.data.size();
212
213 while (me.occupied == 0u && me.writerCount > 0u)
214 me.more.wait(lk);
215
216 if (me.occupied == 0u)
217 return false;
218
219 assert(me.occupied > 0u);
220
221 // extract value and destruct it in the data string
222 // TIter it = me.data.begin() + me.front;
223 result = std::ranges::iter_move(std::ranges::next(me.data.begin(), me.front));
224 // std::swap(result, *it);
225 // valueDestruct(it);
226
227 me.front = (me.front + 1) % cap;
228 me.occupied--;
229
230 /* now: either me.occupied > 0 and me.nextout is the index
231 of the next occupied slot in the buffer, or
232 me.occupied == 0 and me.nextout is the index of the next
233 (empty) slot that will be filled by a producer (such as
234 me.nextout == me.nextin) */
235
236 return true;
237}
238
239template <typename TValue, typename TSpec>
240inline bool
241_popBack(TValue & result,
242 ConcurrentQueue<TValue, Suspendable<TSpec> > & me,
244{
245 typedef ConcurrentQueue<TValue, Suspendable<TSpec> > TQueue;
246 typedef typename TQueue::TString TString;
247 typedef typename TString::size_type TSize;
248
249 TSize cap = me.data.size();
250
251 while (me.occupied == 0u && me.writerCount > 0u)
252 me.more.wait(lk);
253
254 if (me.occupied == 0u)
255 return false;
256
257 assert(me.occupied > 0u);
258
259 me.back = (me.back + cap - 1) % cap;
260
261 // extract value and destruct it in the data string
262 // TIter it = me.data.begin() + me.back;
263 result = std::ranges::iter_move(std::ranges::next(me.data.begin(), me.back));
264 // std::swap(result, *it);
265 // valueDestruct(it);
266
267 me.occupied--;
268
269 /* now: either me.occupied > 0 and me.nextout is the index
270 of the next occupied slot in the buffer, or
271 me.occupied == 0 and me.nextout is the index of the next
272 (empty) slot that will be filled by a producer (such as
273 me.nextout == me.nextin) */
274
275 return true;
276}
277
278template <typename TValue, typename TSpec>
279inline bool
280popFront(TValue & result, ConcurrentQueue<TValue, Suspendable<TSpec> > & me)
281{
283 return _popFront(result, me, lock);
284}
285
286template <typename TValue>
287inline bool
288popFront(TValue & result, ConcurrentQueue<TValue, Suspendable<Limit> > & me)
289{
290 {
292 if (!_popFront(result, me, lk))
293 return false;
294 }
295 me.less.notify_all();
296 return true;
297}
298
299template <typename TValue, typename TSpec>
300inline bool
301popBack(TValue & result, ConcurrentQueue<TValue, Suspendable<TSpec> > & me)
302{
304 return _popBack(result, me, lk);
305}
306
307template <typename TValue>
308inline bool
309popBack(TValue & result, ConcurrentQueue<TValue, Suspendable<Limit> > & me)
310{
311 {
313 if (!_popBack(result, me, lk))
314 return false;
315 }
316 me.less.notify_all();
317 return true;
318}
319
320
321template <typename TValue, typename TValue2, typename TSpec, typename TExpand>
322inline bool
323appendValue(ConcurrentQueue<TValue, Suspendable<TSpec> > & me,
324 TValue2 && val,
325 [[maybe_unused]] Tag<TExpand> expandTag)
326{
327 typedef ConcurrentQueue<TValue, Suspendable<TSpec> > TQueue;
328 typedef typename TQueue::TString TString;
329 typedef typename TString::size_type TSize;
330
331 {
333 TSize cap = me.data.size();
334
335 if (me.occupied >= cap)
336 {
337 // increase capacity
338 // _setLength(me.data, cap);
339 // reserve(me.data, cap + 1, expandTag);
340 me.data.resize(cap + 1);
341 TSize delta = me.data.size() - cap;
342 assert(delta == 1);
343
344 // create a gap of delta many values between tail and head
345 // Why?
346 // _clearSpace(me.data, delta, me.back, me.back, expandTag);
347 std::ranges::move_backward(std::span{me.data.data() + me.front, me.data.data() + cap},
348 me.data.data() + me.data.size());
349 if (me.occupied != 0 && me.back <= me.front)
350 me.front += delta;
351
352 cap += delta;
353 }
354
355 // valueConstruct(begin(me.data, Standard()) + me.back, val);
356 *std::ranges::next(me.data.begin(), me.back) = std::forward<TValue2>(val);
357 me.back = (me.back + 1) % cap;
358
359 ++me.occupied;
360 }
361
362 /* now: either me.occupied < BSIZE and me.nextin is the index
363 of the next empty slot in the buffer, or
364 me.occupied == BSIZE and me.nextin is the index of the
365 next (occupied) slot that will be emptied by a consumer
366 (such as me.nextin == me.nextout) */
367 me.more.notify_all();
368 return true;
369}
370
371template <typename TValue, typename TValue2, typename TSpec, typename TExpand>
372inline bool
373appendValue(ConcurrentQueue<TValue, Suspendable<Limit> > & me,
374 TValue2 && val,
375 Tag<TExpand> expandTag);
376
377template <typename TValue, typename TValue2>
378inline bool
379appendValue(ConcurrentQueue<TValue, Suspendable<Limit> > & me,
380 TValue2 && val,
381 Limit)
382{
383 typedef ConcurrentQueue<TValue, Suspendable<Limit> > TQueue;
384 typedef typename TQueue::TString TString;
385 typedef typename TString::size_type TSize;
386
387 {
389 TSize cap = me.data.size();
390
391 while (me.occupied >= cap && me.readerCount > 0u)
392 me.less.wait(lock);
393
394 if (me.occupied >= cap)
395 return false;
396
397 assert(me.occupied < cap);
398
399 // valueConstruct(begin(me.data, Standard()) + me.back, val);
400 *std::ranges::next(me.data.begin(), me.back) = std::forward<TValue2>(val);
401 me.back = (me.back + 1) % cap;
402 me.occupied++;
403 }
404
405 /* now: either me.occupied < BSIZE and me.nextin is the index
406 of the next empty slot in the buffer, or
407 me.occupied == BSIZE and me.nextin is the index of the
408 next (occupied) slot that will be emptied by a consumer
409 (such as me.nextin == me.nextout) */
410 me.more.notify_all();
411 return true;
412}
413
414template <typename TValue, typename TValue2, typename TSpec>
415inline bool
416appendValue(ConcurrentQueue<TValue, Suspendable<TSpec> > & me,
417 TValue2 && val)
418{
419 return appendValue(me, std::forward<TValue2>(val), TSpec{});
420}
421
422} // namespace seqan3::contrib
T data(T... args)
T empty(T... args)
typename decltype(detail::front(list_t{}))::type front
Return the first type from the type list.
Definition: traits.hpp:296
typename decltype(detail::back(list_t{}))::type back
Return the last type from the type list.
Definition: traits.hpp:316
T lock(T... args)
T move_backward(T... args)
Provides platform and dependency checks.