SeqAn3 3.4.0-rc.1
The Modern C++ library for sequence analysis.
Loading...
Searching...
No Matches
serialised_resource_pool.hpp
Go to the documentation of this file.
1// SPDX-FileCopyrightText: 2006-2024 Knut Reinert & Freie Universität Berlin
2// SPDX-FileCopyrightText: 2016-2024 Knut Reinert & MPI für molekulare Genetik
3// SPDX-License-Identifier: BSD-3-Clause
4
11#pragma once
12
13#include <mutex>
14
16
17namespace seqan3::contrib
18{
19
20// ============================================================================
21// Classes
22// ============================================================================
23
24template <typename TValue>
25struct ResourcePool
26{
27 typedef ConcurrentQueue<TValue *, Suspendable<>> TStack;
28 typedef typename TStack::TSize TSize;
29
30 TStack recycled;
31
32 ResourcePool(TSize maxSize)
33 {
34 setWriterCount(recycled, 1);
35 for (; maxSize != 0; --maxSize)
36 appendValue(recycled, (TValue *)NULL);
37 }
38
39 ~ResourcePool()
40 {
41 unlockWriting(recycled);
42 TValue * ptr = NULL;
43 while (popBack(ptr, recycled))
44 {
45 delete ptr;
46 }
47 }
48};
49
50// ----------------------------------------------------------------------------
51// Struct SerializerItem
52// ----------------------------------------------------------------------------
53
// Node of the singly linked list maintained by Serializer.
// `val` must stay the first member: releaseValue(Serializer &, TValue *)
// converts a pointer to `val` back into a pointer to the enclosing item.
template <typename TValue>
struct SerializerItem
{
    TValue val;                      // payload handed out to the user
    SerializerItem * next = nullptr; // successor in the list, null for the tail
    bool ready = false;              // true once the payload has been released
};
61
62// ----------------------------------------------------------------------------
63// Class Serializer
64// ----------------------------------------------------------------------------
65
66template <typename TValue, typename TWorker>
67class Serializer
68{
69public:
70 typedef SerializerItem<TValue> TItem;
71 typedef TItem * TItemPtr;
72 typedef ResourcePool<TItem> TPool;
73 typedef size_t TSize;
74
75 std::mutex cs;
76 TWorker worker;
77 TItemPtr first;
78 TItemPtr last;
79 TPool pool;
80 bool stop;
81
82 Serializer() : first(NULL), last(NULL), stop(false)
83 {}
84
85 template <typename TArg>
86 explicit Serializer(TArg & arg, TSize maxItems = 1024) :
87 worker(arg),
88 first(NULL),
89 last(NULL),
90 pool(maxItems),
91 stop(false)
92 {}
93
94 ~Serializer()
95 {
96 while (first != NULL)
97 {
98 TItemPtr item = first;
99 first = first->next;
100 delete item;
101 }
102 }
103
104 operator bool()
105 {
106 return !stop;
107 }
108};
109
110// ============================================================================
111// Functions
112// ============================================================================
113
114// ----------------------------------------------------------------------------
115// Function aquireValue()
116// ----------------------------------------------------------------------------
117
118template <typename TValue>
119inline TValue * aquireValue(ResourcePool<TValue> & me)
120{
121 TValue * ptr = NULL;
122 if (!popBack(ptr, me.recycled))
123 return NULL;
124
125 if (ptr == NULL)
126 ptr = new TValue;
127
128 return ptr;
129}
130
131// ----------------------------------------------------------------------------
132// Function releaseValue()
133// ----------------------------------------------------------------------------
134
135template <typename TValue>
136inline void releaseValue(ResourcePool<TValue> & me, TValue * ptr)
137{
138 appendValue(me.recycled, ptr);
139}
140
141// ----------------------------------------------------------------------------
142// Function clear()
143// ----------------------------------------------------------------------------
144
145template <typename TValue, typename TWorker>
146inline void clear(Serializer<TValue, TWorker> & me)
147{
148 me.stop = false;
149 while (me.first != NULL)
150 {
151 TValue * item = me.first;
152 me.first = me.first->next;
153 releaseValue(me.recycled, item);
154 }
155 me.last = NULL;
156}
157
158// ----------------------------------------------------------------------------
159// Function aquireValue()
160// ----------------------------------------------------------------------------
161
162// this function is not thread-safe as it would make
163// not much sense to order a stream by the random
164// order of executition behind a mutex
165template <typename TValue, typename TWorker>
166inline TValue * aquireValue(Serializer<TValue, TWorker> & me)
167{
168 typedef SerializerItem<TValue> TItem;
169
170 TItem * item = aquireValue(me.pool);
171 item->next = NULL;
172 item->ready = false;
173
174 // add item to the end of our linked list
175 {
177 if (me.first == NULL)
178 me.first = item;
179 else
180 me.last->next = item;
181 me.last = item;
182 }
183 return &item->val;
184}
185
186// ----------------------------------------------------------------------------
187// Function releaseValue()
188// ----------------------------------------------------------------------------
189
190template <typename TValue, typename TWorker>
191inline bool releaseValue(Serializer<TValue, TWorker> & me, TValue * ptr)
192{
193 typedef SerializerItem<TValue> TItem;
194
195 TItem * item = reinterpret_cast<TItem *>(ptr);
196 assert(!item->ready);
197
198 // changing me.first or the ready flag must be done synchronized (me.mutex)
199 // the thread who changed me.first->ready to be true has to write it.
200
201 // change our ready flag and test if me.first->ready became true
202 {
204 item->ready = true;
205 if (item != me.first)
206 return true;
207 }
208
209 // ok, if we are here it seems that we are responsible for writing the buffer
210
211 assert(me.first != NULL);
212
213 bool success;
214 do
215 {
216 // process item
217 success = me.worker(item->val);
218
219 // remove item from linked list
220 {
222 me.first = item->next;
223
224 // recycle released items
225 releaseValue(me.pool, item);
226
227 // can we leave?
228 item = me.first;
229 if (item == NULL || !item->ready)
230 return success;
231 }
232
233 // we continue to process the next buffer
234 }
235 while (success);
236
237 return false;
238}
239
240} // namespace seqan3::contrib
T lock(T... args)
T next(T... args)
Provides seqan suspendable queue.
Hide me