SeqAn3  3.0.3
The Modern C++ library for sequence analysis.
serialised_resource_pool.hpp
Go to the documentation of this file.
1 // -----------------------------------------------------------------------------------------------------
2 // Copyright (c) 2006-2021, Knut Reinert & Freie Universität Berlin
3 // Copyright (c) 2016-2021, Knut Reinert & MPI für molekulare Genetik
4 // This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License
5 // shipped with this file and also available at: https://github.com/seqan/seqan3/blob/master/LICENSE.md
6 // -----------------------------------------------------------------------------------------------------
7 
14 #pragma once
15 
16 #include <mutex>
17 
19 
20 namespace seqan3::contrib
21 {
22 
23 // ============================================================================
24 // Classes
25 // ============================================================================
26 
27 template <typename TValue>
28 struct ResourcePool
29 {
30  typedef ConcurrentQueue<TValue *, Suspendable<> > TStack;
31  typedef typename TStack::TSize TSize;
32 
33  TStack recycled;
34 
35  ResourcePool(TSize maxSize)
36  {
37  setWriterCount(recycled, 1);
38  for (; maxSize != 0; --maxSize)
39  appendValue(recycled, (TValue *)NULL);
40  }
41 
42  ~ResourcePool()
43  {
44  unlockWriting(recycled);
45  TValue *ptr = NULL;
46  unsigned count = 0;
47  while (popBack(ptr, recycled))
48  {
49  if (ptr != NULL)
50  count++;
51  delete ptr;
52  }
53  }
54 };
55 
56 // ----------------------------------------------------------------------------
57 // Struct SerializerItem
58 // ----------------------------------------------------------------------------
59 
// One node of the Serializer's singly linked list.
// `val` must stay the first member: releaseValue(Serializer &, TValue *)
// converts a pointer to `val` back into a pointer to the enclosing item.
template <typename TValue>
struct SerializerItem
{
    TValue                   val;    // payload handed out to the caller
    SerializerItem<TValue> * next;   // successor in acquisition order, or null
    bool                     ready;  // set once the payload has been released
};
67 
68 // ----------------------------------------------------------------------------
69 // Class Serializer
70 // ----------------------------------------------------------------------------
71 
72 template <typename TValue, typename TWorker>
73 class Serializer
74 {
75 public:
76  typedef SerializerItem<TValue> TItem;
77  typedef TItem * TItemPtr;
78  typedef ResourcePool<TItem> TPool;
79  typedef size_t TSize;
80 
81  std::mutex cs;
82  TWorker worker;
83  TItemPtr first;
84  TItemPtr last;
85  TPool pool;
86  bool stop;
87 
88  Serializer() :
89  first(NULL),
90  last(NULL),
91  stop(false)
92  {}
93 
94  template <typename TArg>
95  explicit
96  Serializer(TArg &arg, TSize maxItems = 1024) :
97  worker(arg),
98  first(NULL),
99  last(NULL),
100  pool(maxItems),
101  stop(false)
102  {}
103 
104  ~Serializer()
105  {
106  while (first != NULL)
107  {
108  TItemPtr item = first;
109  first = first->next;
110  delete item;
111  }
112  }
113 
114  operator bool()
115  {
116  return !stop;
117  }
118 };
119 
120 // ============================================================================
121 // Functions
122 // ============================================================================
123 
124 // ----------------------------------------------------------------------------
125 // Function aquireValue()
126 // ----------------------------------------------------------------------------
127 
128 template <typename TValue>
129 inline TValue *
130 aquireValue(ResourcePool<TValue> & me)
131 {
132  TValue *ptr = NULL;
133  if (!popBack(ptr, me.recycled))
134  return NULL;
135 
136  if (ptr == NULL)
137  ptr = new TValue;
138 
139  return ptr;
140 }
141 
142 // ----------------------------------------------------------------------------
143 // Function releaseValue()
144 // ----------------------------------------------------------------------------
145 
146 template <typename TValue>
147 inline void
148 releaseValue(ResourcePool<TValue> & me, TValue *ptr)
149 {
150  appendValue(me.recycled, ptr);
151 }
152 
153 
154 // ----------------------------------------------------------------------------
155 // Function clear()
156 // ----------------------------------------------------------------------------
157 
158 template <typename TValue, typename TWorker>
159 inline void
160 clear(Serializer<TValue, TWorker> & me)
161 {
162  me.stop = false;
163  while (me.first != NULL)
164  {
165  TValue *item = me.first;
166  me.first = me.first->next;
167  releaseValue(me.recycled, item);
168  }
169  me.last = NULL;
170 }
171 
172 // ----------------------------------------------------------------------------
173 // Function aquireValue()
174 // ----------------------------------------------------------------------------
175 
176 // this function is not thread-safe as it would make
177 // not much sense to order a stream by the random
178 // order of executition behind a mutex
179 template <typename TValue, typename TWorker>
180 inline TValue *
181 aquireValue(Serializer<TValue, TWorker> & me)
182 {
183  typedef SerializerItem<TValue> TItem;
184 
185  TItem *item = aquireValue(me.pool);
186  item->next = NULL;
187  item->ready = false;
188 
189  // add item to the end of our linked list
190  {
192  if (me.first == NULL)
193  me.first = item;
194  else
195  me.last->next = item;
196  me.last = item;
197  }
198  return &item->val;
199 }
200 
201 // ----------------------------------------------------------------------------
202 // Function releaseValue()
203 // ----------------------------------------------------------------------------
204 
205 template <typename TValue, typename TWorker>
206 inline bool
207 releaseValue(Serializer<TValue, TWorker> & me, TValue *ptr)
208 {
209  typedef SerializerItem<TValue> TItem;
210 
211  TItem *item = reinterpret_cast<TItem *>(ptr);
212  assert(!item->ready);
213 
214  // changing me.first or the ready flag must be done synchronized (me.mutex)
215  // the thread who changed me.first->ready to be true has to write it.
216 
217  // change our ready flag and test if me.first->ready became true
218  {
220  item->ready = true;
221  if (item != me.first)
222  return true;
223  }
224 
225  // ok, if we are here it seems that we are responsible for writing the buffer
226 
227  assert(me.first != NULL);
228 
229  bool success;
230  do
231  {
232  // process item
233  success = me.worker(item->val);
234 
235  // remove item from linked list
236  {
238  me.first = item->next;
239 
240  // recycle released items
241  releaseValue(me.pool, item);
242 
243  // can we leave?
244  item = me.first;
245  if (item == NULL || !item->ready)
246  return success;
247  }
248 
249  // we continue to process the next buffer
250  }
251  while (success);
252 
253  return false;
254 }
255 
256 } // namespace seqan3::contrib
constexpr ptrdiff_t count
Count the occurrences of a type in a pack.
Definition: traits.hpp:169
T lock(T... args)
T next(T... args)
Provides seqan suspendable queue.