SeqAn3 3.1.0
The Modern C++ library for sequence analysis.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
serialised_resource_pool.hpp
Go to the documentation of this file.
1// -----------------------------------------------------------------------------------------------------
2// Copyright (c) 2006-2021, Knut Reinert & Freie Universität Berlin
3// Copyright (c) 2016-2021, Knut Reinert & MPI für molekulare Genetik
4// This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License
5// shipped with this file and also available at: https://github.com/seqan/seqan3/blob/master/LICENSE.md
6// -----------------------------------------------------------------------------------------------------
7
14#pragma once
15
16#include <mutex>
17
19
20namespace seqan3::contrib
21{
22
23// ============================================================================
24// Classes
25// ============================================================================
26
27template <typename TValue>
28struct ResourcePool
29{
30 typedef ConcurrentQueue<TValue *, Suspendable<> > TStack;
31 typedef typename TStack::TSize TSize;
32
33 TStack recycled;
34
35 ResourcePool(TSize maxSize)
36 {
37 setWriterCount(recycled, 1);
38 for (; maxSize != 0; --maxSize)
39 appendValue(recycled, (TValue *)NULL);
40 }
41
42 ~ResourcePool()
43 {
44 unlockWriting(recycled);
45 TValue *ptr = NULL;
46 unsigned count = 0;
47 while (popBack(ptr, recycled))
48 {
49 if (ptr != NULL)
50 count++;
51 delete ptr;
52 }
53 }
54};
55
56// ----------------------------------------------------------------------------
57// Struct SerializerItem
58// ----------------------------------------------------------------------------
59
60template <typename TValue>
61struct SerializerItem
62{
63 TValue val;
64 SerializerItem *next;
65 bool ready;
66};
67
68// ----------------------------------------------------------------------------
69// Class Serializer
70// ----------------------------------------------------------------------------
71
72template <typename TValue, typename TWorker>
73class Serializer
74{
75public:
76 typedef SerializerItem<TValue> TItem;
77 typedef TItem * TItemPtr;
78 typedef ResourcePool<TItem> TPool;
79 typedef size_t TSize;
80
81 std::mutex cs;
82 TWorker worker;
83 TItemPtr first;
84 TItemPtr last;
85 TPool pool;
86 bool stop;
87
88 Serializer() :
89 first(NULL),
90 last(NULL),
91 stop(false)
92 {}
93
94 template <typename TArg>
95 explicit
96 Serializer(TArg &arg, TSize maxItems = 1024) :
97 worker(arg),
98 first(NULL),
99 last(NULL),
100 pool(maxItems),
101 stop(false)
102 {}
103
104 ~Serializer()
105 {
106 while (first != NULL)
107 {
108 TItemPtr item = first;
109 first = first->next;
110 delete item;
111 }
112 }
113
114 operator bool()
115 {
116 return !stop;
117 }
118};
119
120// ============================================================================
121// Functions
122// ============================================================================
123
124// ----------------------------------------------------------------------------
125// Function aquireValue()
126// ----------------------------------------------------------------------------
127
128template <typename TValue>
129inline TValue *
130aquireValue(ResourcePool<TValue> & me)
131{
132 TValue *ptr = NULL;
133 if (!popBack(ptr, me.recycled))
134 return NULL;
135
136 if (ptr == NULL)
137 ptr = new TValue;
138
139 return ptr;
140}
141
142// ----------------------------------------------------------------------------
143// Function releaseValue()
144// ----------------------------------------------------------------------------
145
146template <typename TValue>
147inline void
148releaseValue(ResourcePool<TValue> & me, TValue *ptr)
149{
150 appendValue(me.recycled, ptr);
151}
152
153
154// ----------------------------------------------------------------------------
155// Function clear()
156// ----------------------------------------------------------------------------
157
158template <typename TValue, typename TWorker>
159inline void
160clear(Serializer<TValue, TWorker> & me)
161{
162 me.stop = false;
163 while (me.first != NULL)
164 {
165 TValue *item = me.first;
166 me.first = me.first->next;
167 releaseValue(me.recycled, item);
168 }
169 me.last = NULL;
170}
171
172// ----------------------------------------------------------------------------
173// Function aquireValue()
174// ----------------------------------------------------------------------------
175
176// this function is not thread-safe as it would make
177// not much sense to order a stream by the random
178// order of executition behind a mutex
179template <typename TValue, typename TWorker>
180inline TValue *
181aquireValue(Serializer<TValue, TWorker> & me)
182{
183 typedef SerializerItem<TValue> TItem;
184
185 TItem *item = aquireValue(me.pool);
186 item->next = NULL;
187 item->ready = false;
188
189 // add item to the end of our linked list
190 {
192 if (me.first == NULL)
193 me.first = item;
194 else
195 me.last->next = item;
196 me.last = item;
197 }
198 return &item->val;
199}
200
201// ----------------------------------------------------------------------------
202// Function releaseValue()
203// ----------------------------------------------------------------------------
204
205template <typename TValue, typename TWorker>
206inline bool
207releaseValue(Serializer<TValue, TWorker> & me, TValue *ptr)
208{
209 typedef SerializerItem<TValue> TItem;
210
211 TItem *item = reinterpret_cast<TItem *>(ptr);
212 assert(!item->ready);
213
214 // changing me.first or the ready flag must be done synchronized (me.mutex)
215 // the thread who changed me.first->ready to be true has to write it.
216
217 // change our ready flag and test if me.first->ready became true
218 {
220 item->ready = true;
221 if (item != me.first)
222 return true;
223 }
224
225 // ok, if we are here it seems that we are responsible for writing the buffer
226
227 assert(me.first != NULL);
228
229 bool success;
230 do
231 {
232 // process item
233 success = me.worker(item->val);
234
235 // remove item from linked list
236 {
238 me.first = item->next;
239
240 // recycle released items
241 releaseValue(me.pool, item);
242
243 // can we leave?
244 item = me.first;
245 if (item == NULL || !item->ready)
246 return success;
247 }
248
249 // we continue to process the next buffer
250 }
251 while (success);
252
253 return false;
254}
255
256} // namespace seqan3::contrib
constexpr ptrdiff_t count
Count the occurrences of a type in a pack.
Definition: traits.hpp:169
T lock(T... args)
T next(T... args)
Provides seqan suspendable queue.