SeqAn3  3.0.3
The Modern C++ library for sequence analysis.
suspendable_queue.hpp
Go to the documentation of this file.
1 // -----------------------------------------------------------------------------------------------------
2 // Copyright (c) 2006-2021, Knut Reinert & Freie Universität Berlin
3 // Copyright (c) 2016-2021, Knut Reinert & MPI für molekulare Genetik
4 // This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License
5 // shipped with this file and also available at: https://github.com/seqan/seqan3/blob/master/LICENSE.md
6 // -----------------------------------------------------------------------------------------------------
7 
14 #pragma once
15 
16 #include <condition_variable>
17 #include <mutex>
18 #include <thread>
19 #include <vector>
20 
21 #include <seqan3/core/platform.hpp>
22 #include <seqan3/std/algorithm>
23 #include <seqan3/std/iterator>
24 #include <seqan3/std/ranges>
25 #include <seqan3/std/span>
26 
27 namespace seqan3::contrib
28 {
29 
30 // ============================================================================
31 // Forwards
32 // ============================================================================
33 
34 // ============================================================================
35 // Classes
36 // ============================================================================
37 
38 // ----------------------------------------------------------------------------
39 // Class ConcurrentQueue
40 // ----------------------------------------------------------------------------
41 
42 template <typename TSpec = void>
43 struct Tag{};
44 
45 template <typename TSpec = Tag<void>>
46 struct Suspendable;
47 
48 template <typename TValue, typename TSpec = Suspendable<>>
49 class ConcurrentQueue;
50 
51 struct Limit_;
52 using Limit = Tag<Limit_>;
53 
54 template <typename TValue, typename TSpec>
55 class ConcurrentQueue<TValue, Suspendable<TSpec> >
56 {
57 public:
58  typedef std::vector<TValue> TString;
59  typedef typename TString::size_type TSize;
60 
61  size_t readerCount;
62  size_t writerCount;
63 
64  TString data;
65  TSize occupied;
66  TSize back;
67  TSize front;
68 
69  std::mutex cs;
71 
72  bool virgin;
73 
74  ConcurrentQueue():
75  readerCount(0),
76  writerCount(0),
77  occupied(0),
78  back(0),
79  front(0),
80  virgin(true)
81  {}
82 
83  ~ConcurrentQueue()
84  {
85  assert(writerCount == 0u);
86 
87  // wait for all pending readers to finish
88  while (readerCount != 0u)
89  {}
90  }
91 };
92 
93 template <typename TValue>
94 class ConcurrentQueue<TValue, Suspendable<Limit> >:
95  public ConcurrentQueue<TValue, Suspendable<> >
96 {
97 public:
98  typedef ConcurrentQueue<TValue, Suspendable<> > TBase;
99  typedef typename TBase::TString TString;
100  typedef typename TBase::TSize TSize;
101 
103 
104  ConcurrentQueue(TSize maxSize):
105  TBase()
106  {
107  this->data.resize(maxSize);
108  // reserve(this->data, maxSize, Exact());
109  // _setLength(this->data, maxSize);
110  }
111 
112  ConcurrentQueue(ConcurrentQueue const & other):
113  TBase((TBase const &)other)
114  {}
115 };
116 
117 template <typename TValue, typename TSpec>
118 inline void
119 lockReading(ConcurrentQueue<TValue, Suspendable<TSpec> > &)
120 {}
121 
122 template <typename TValue, typename TSpec>
123 inline void
124 unlockReading(ConcurrentQueue<TValue, Suspendable<TSpec> > & me)
125 {
126  {
128  if (--me.readerCount != 0u)
129  return;
130  }
131  me.less.notify_all(); // publish the condition that reader count is 0.
132 }
133 
134 template <typename TValue, typename TSpec>
135 inline void
136 lockWriting(ConcurrentQueue<TValue, Suspendable<TSpec> > &)
137 {}
138 
139 template <typename TValue, typename TSpec>
140 inline void
141 unlockWriting(ConcurrentQueue<TValue, Suspendable<TSpec> > & me)
142 {
143  {
144  std::lock_guard<std::mutex> lk(me.cs);
145  if (--me.writerCount != 0u)
146  return;
147  }
148  me.more.notify_all(); // publish the condition, that writer count is 0.
149 }
150 
151 template <typename TValue, typename TSize, typename TSpec>
152 inline void
153 setReaderCount(ConcurrentQueue<TValue, Suspendable<TSpec> > & me, TSize readerCount)
154 {
156  me.readerCount = readerCount;
157 }
158 
159 template <typename TValue, typename TSize, typename TSpec>
160 inline void
161 setWriterCount(ConcurrentQueue<TValue, Suspendable<TSpec> > & me, TSize writerCount)
162 {
164  me.writerCount = writerCount;
165 }
166 
167 template <typename TValue, typename TSize1, typename TSize2, typename TSpec>
168 inline void
169 setReaderWriterCount(ConcurrentQueue<TValue, Suspendable<TSpec> > & me, TSize1 readerCount, TSize2 writerCount)
170 {
172  me.readerCount = readerCount;
173  me.writerCount = writerCount;
174 }
175 
176 template <typename TValue, typename TSize, typename TSpec>
177 inline bool
178 waitForMinSize(ConcurrentQueue<TValue, Suspendable<TSpec> > & me,
179  TSize minSize)
180 {
182  while (me.occupied < minSize && me.writerCount > 0u)
183  me.more.wait(lock);
184  return me.occupied >= minSize;
185 }
186 
187 template <typename TValue, typename TSpec>
188 inline bool
189 empty(ConcurrentQueue<TValue, Suspendable<TSpec> > const & me)
190 {
191  return me.occupied == 0;
192 }
193 
194 template <typename TValue, typename TSpec>
195 inline typename ConcurrentQueue<TValue, Suspendable<TSpec> >::SizeType
196 length(ConcurrentQueue<TValue, Suspendable<TSpec> > const & me)
197 {
198  return me.occupied;
199 }
200 
201 template <typename TValue, typename TSpec>
202 inline bool
203 _popFront(TValue & result, ConcurrentQueue<TValue, Suspendable<TSpec> > & me,
205 {
206  typedef ConcurrentQueue<TValue, Suspendable<TSpec> > TQueue;
207  typedef typename TQueue::TString TString;
208  typedef typename TString::size_type TSize;
209 
210  TSize cap = me.data.size();
211 
212  while (me.occupied == 0u && me.writerCount > 0u)
213  me.more.wait(lk);
214 
215  if (me.occupied == 0u)
216  return false;
217 
218  assert(me.occupied > 0u);
219 
220  // extract value and destruct it in the data string
221  // TIter it = me.data.begin() + me.front;
222  result = std::ranges::iter_move(std::ranges::next(me.data.begin(), me.front));
223  // std::swap(result, *it);
224  // valueDestruct(it);
225 
226  me.front = (me.front + 1) % cap;
227  me.occupied--;
228 
229  /* now: either me.occupied > 0 and me.nextout is the index
230  of the next occupied slot in the buffer, or
231  me.occupied == 0 and me.nextout is the index of the next
232  (empty) slot that will be filled by a producer (such as
233  me.nextout == me.nextin) */
234 
235  return true;
236 }
237 
238 template <typename TValue, typename TSpec>
239 inline bool
240 _popBack(TValue & result,
241  ConcurrentQueue<TValue, Suspendable<TSpec> > & me,
243 {
244  typedef ConcurrentQueue<TValue, Suspendable<TSpec> > TQueue;
245  typedef typename TQueue::TString TString;
246  typedef typename TString::size_type TSize;
247 
248  TSize cap = me.data.size();
249 
250  while (me.occupied == 0u && me.writerCount > 0u)
251  me.more.wait(lk);
252 
253  if (me.occupied == 0u)
254  return false;
255 
256  assert(me.occupied > 0u);
257 
258  me.back = (me.back + cap - 1) % cap;
259 
260  // extract value and destruct it in the data string
261  // TIter it = me.data.begin() + me.back;
262  result = std::ranges::iter_move(std::ranges::next(me.data.begin(), me.back));
263  // std::swap(result, *it);
264  // valueDestruct(it);
265 
266  me.occupied--;
267 
268  /* now: either me.occupied > 0 and me.nextout is the index
269  of the next occupied slot in the buffer, or
270  me.occupied == 0 and me.nextout is the index of the next
271  (empty) slot that will be filled by a producer (such as
272  me.nextout == me.nextin) */
273 
274  return true;
275 }
276 
277 template <typename TValue, typename TSpec>
278 inline bool
279 popFront(TValue & result, ConcurrentQueue<TValue, Suspendable<TSpec> > & me)
280 {
282  return _popFront(result, me, lock);
283 }
284 
285 template <typename TValue>
286 inline bool
287 popFront(TValue & result, ConcurrentQueue<TValue, Suspendable<Limit> > & me)
288 {
289  {
291  if (!_popFront(result, me, lk))
292  return false;
293  }
294  me.less.notify_all();
295  return true;
296 }
297 
298 template <typename TValue, typename TSpec>
299 inline bool
300 popBack(TValue & result, ConcurrentQueue<TValue, Suspendable<TSpec> > & me)
301 {
303  return _popBack(result, me, lk);
304 }
305 
306 template <typename TValue>
307 inline bool
308 popBack(TValue & result, ConcurrentQueue<TValue, Suspendable<Limit> > & me)
309 {
310  {
312  if (!_popBack(result, me, lk))
313  return false;
314  }
315  me.less.notify_all();
316  return true;
317 }
318 
319 
320 template <typename TValue, typename TValue2, typename TSpec, typename TExpand>
321 inline bool
322 appendValue(ConcurrentQueue<TValue, Suspendable<TSpec> > & me,
323  TValue2 && val,
324  [[maybe_unused]] Tag<TExpand> expandTag)
325 {
326  typedef ConcurrentQueue<TValue, Suspendable<TSpec> > TQueue;
327  typedef typename TQueue::TString TString;
328  typedef typename TString::size_type TSize;
329 
330  {
332  TSize cap = me.data.size();
333 
334  if (me.occupied >= cap)
335  {
336  // increase capacity
337  // _setLength(me.data, cap);
338  // reserve(me.data, cap + 1, expandTag);
339  me.data.resize(cap + 1);
340  TSize delta = me.data.size() - cap;
341  assert(delta == 1);
342 
343  // create a gap of delta many values between tail and head
344  // Why?
345  // _clearSpace(me.data, delta, me.back, me.back, expandTag);
346  std::ranges::move_backward(std::span{me.data.data() + me.front, me.data.data() + cap},
347  me.data.data() + me.data.size());
348  if (me.occupied != 0 && me.back <= me.front)
349  me.front += delta;
350 
351  cap += delta;
352  }
353 
354  // valueConstruct(begin(me.data, Standard()) + me.back, val);
355  *std::ranges::next(me.data.begin(), me.back) = std::forward<TValue2>(val);
356  me.back = (me.back + 1) % cap;
357 
358  ++me.occupied;
359  }
360 
361  /* now: either me.occupied < BSIZE and me.nextin is the index
362  of the next empty slot in the buffer, or
363  me.occupied == BSIZE and me.nextin is the index of the
364  next (occupied) slot that will be emptied by a consumer
365  (such as me.nextin == me.nextout) */
366  me.more.notify_all();
367  return true;
368 }
369 
370 template <typename TValue, typename TValue2, typename TSpec, typename TExpand>
371 inline bool
372 appendValue(ConcurrentQueue<TValue, Suspendable<Limit> > & me,
373  TValue2 && val,
374  Tag<TExpand> expandTag);
375 
376 template <typename TValue, typename TValue2>
377 inline bool
378 appendValue(ConcurrentQueue<TValue, Suspendable<Limit> > & me,
379  TValue2 && val,
380  Limit)
381 {
382  typedef ConcurrentQueue<TValue, Suspendable<Limit> > TQueue;
383  typedef typename TQueue::TString TString;
384  typedef typename TString::size_type TSize;
385 
386  {
388  TSize cap = me.data.size();
389 
390  while (me.occupied >= cap && me.readerCount > 0u)
391  me.less.wait(lock);
392 
393  if (me.occupied >= cap)
394  return false;
395 
396  assert(me.occupied < cap);
397 
398  // valueConstruct(begin(me.data, Standard()) + me.back, val);
399  *std::ranges::next(me.data.begin(), me.back) = std::forward<TValue2>(val);
400  me.back = (me.back + 1) % cap;
401  me.occupied++;
402  }
403 
404  /* now: either me.occupied < BSIZE and me.nextin is the index
405  of the next empty slot in the buffer, or
406  me.occupied == BSIZE and me.nextin is the index of the
407  next (occupied) slot that will be emptied by a consumer
408  (such as me.nextin == me.nextout) */
409  me.more.notify_all();
410  return true;
411 }
412 
413 template <typename TValue, typename TValue2, typename TSpec>
414 inline bool
415 appendValue(ConcurrentQueue<TValue, Suspendable<TSpec> > & me,
416  TValue2 && val)
417 {
418  return appendValue(me, std::forward<TValue2>(val), TSpec{});
419 }
420 
421 } // namespace seqan3::contrib
Adaptations of algorithms from the Ranges TS.
T data(T... args)
T empty(T... args)
typename decltype(detail::front(list_t{}))::type front
Return the first type from the type list.
Definition: traits.hpp:279
typename decltype(detail::back(list_t{}))::type back
Return the last type from the type list.
Definition: traits.hpp:301
Provides C++20 additions to the <iterator> header.
T lock(T... args)
Provides platform and dependency checks.
Adaptations of concepts from the Ranges TS.
Provides std::span from the C++20 standard library.