Line data Source code
1 : #include "Module/Stateful/Adaptor/Adaptor_m_to_n.hpp"
2 : #include "Tools/Math/utils.h"
3 :
4 : using namespace spu;
5 : using namespace spu::module;
6 :
7 16230 : Adaptor_m_to_n::~Adaptor_m_to_n()
8 : {
9 1518149 : for (auto b : this->buffer_to_free)
10 1512739 : delete[] b;
11 5410 : if (this->cloned) (*this->n_clones)--;
12 10820 : }
13 :
14 : Adaptor_m_to_n*
15 5168 : Adaptor_m_to_n::clone() const
16 : {
17 5168 : auto m = new Adaptor_m_to_n(*this);
18 5168 : m->deep_copy(*this);
19 5168 : return m;
20 : }
21 :
22 : void
23 5168 : Adaptor_m_to_n::deep_copy(const Adaptor_m_to_n& m)
24 : {
25 5168 : Stateful::deep_copy(m);
26 :
27 5168 : if (*this->buffers_allocated)
28 : {
29 0 : std::stringstream message;
30 0 : message << "Shared buffers have already been allocated, cloning is no more possible." << std::endl;
31 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
32 0 : }
33 :
34 5168 : this->cloned = true;
35 5168 : (*this->n_clones)++;
36 5168 : this->tid_push = -1;
37 5168 : this->tid_pull = -1;
38 5168 : this->cur_push_id = -1;
39 5168 : this->cur_pull_id = -1;
40 :
41 5168 : this->waiting_canceled.reset(new std::atomic<bool>(m.waiting_canceled->load()));
42 5168 : }
43 :
44 : void
45 242 : Adaptor_m_to_n::alloc_buffers()
46 : {
47 242 : if (*this->buffers_allocated)
48 : {
49 0 : std::stringstream message;
50 0 : message << "Synchronization buffers have already been allocated.";
51 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
52 0 : }
53 :
54 242 : if (this->cloned)
55 : {
56 0 : std::stringstream message;
57 0 : message << "'alloc_buffers()' cannot be called on a cloned module ('tid_push' = " << this->tid_push
58 0 : << ", 'tid_pull' = " << this->tid_pull << ").";
59 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
60 0 : }
61 :
62 242 : size_t ppcm = tools::find_smallest_common_multiple(*this->n_pushers, *this->n_pullers);
63 :
64 242 : if (ppcm > 1000)
65 : {
66 0 : std::stringstream message;
67 0 : message << "'ppcm' cannot exceed 1000 ('ppcm' = " << ppcm << ").";
68 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
69 0 : }
70 :
71 484 : this->buffer->resize(ppcm,
72 484 : std::vector<std::vector<int8_t*>>(this->n_sockets, std::vector<int8_t*>(this->buffer_size)));
73 :
74 242 : if (this->buffer_to_free.size())
75 : {
76 0 : std::stringstream message;
77 0 : message << "This should never happen.";
78 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
79 0 : }
80 :
81 : // this->buffer_to_free.clear();
82 5420 : for (size_t d = 0; d < ppcm; d++)
83 : {
84 15725 : for (size_t s = 0; s < this->n_sockets; s++)
85 1523286 : for (size_t b = 0; b < this->buffer_size; b++)
86 : {
87 1512739 : (*this->buffer)[d][s][b] = new int8_t[this->n_frames * this->n_bytes[s]];
88 1512739 : this->buffer_to_free.push_back((*this->buffer)[d][s][b]);
89 : }
90 5178 : (*this->first)[d] = 0;
91 5178 : (*this->last)[d] = 0;
92 5178 : (*this->counter)[d] = this->buffer_size;
93 : }
94 :
95 242 : *this->buffers_allocated = true;
96 242 : }
97 :
98 : void
99 2342 : Adaptor_m_to_n::add_pusher()
100 : {
101 2342 : if (*this->n_pushers - 1 + *this->n_pullers - 1 >= *this->n_clones)
102 : {
103 0 : std::stringstream message;
104 0 : message << "Pusher cannot be added because a clone is missing ('n_pushers' = " << *this->n_pushers
105 0 : << ", 'n_pullers' = " << *this->n_pullers << ", 'n_clones' = " << *this->n_clones << ").";
106 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
107 0 : }
108 :
109 2342 : this->tid_push = (*this->n_pushers);
110 2342 : (*this->n_pushers)++;
111 2342 : this->cur_push_id = this->tid_push;
112 2342 : }
113 :
114 : void
115 2342 : Adaptor_m_to_n::add_puller()
116 : {
117 2342 : if (*this->n_pushers - 1 + *this->n_pullers - 1 >= *this->n_clones)
118 : {
119 0 : std::stringstream message;
120 0 : message << "Puller cannot be added because a clone is missing ('n_pushers' = " << *this->n_pushers
121 0 : << ", 'n_pullers' = " << *this->n_pullers << ", 'n_clones' = " << *this->n_clones << ").";
122 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
123 0 : }
124 :
125 2342 : this->tid_pull = (*this->n_pullers);
126 2342 : (*this->n_pullers)++;
127 2342 : this->cur_pull_id = this->tid_pull;
128 2342 : }
129 :
130 : void
131 9845 : Adaptor_m_to_n::send_cancel_signal()
132 : {
133 9845 : *this->waiting_canceled = true;
134 9838 : }
135 :
136 : void
137 4926 : Adaptor_m_to_n::reset()
138 : {
139 4926 : if (!this->cloned)
140 : {
141 242 : *this->waiting_canceled = false;
142 5420 : for (size_t d = 0; d < this->buffer->size(); d++)
143 : {
144 5178 : (*this->first)[d] = 0;
145 5178 : (*this->last)[d] = 0;
146 5178 : (*this->counter)[d] = this->buffer_size;
147 : }
148 : }
149 4926 : this->cur_push_id = (size_t)this->tid_push;
150 4926 : this->cur_pull_id = (size_t)this->tid_pull;
151 4926 : this->reset_buffer();
152 4926 : }
153 :
154 : void
155 5162 : Adaptor_m_to_n::set_no_copy_pull(const bool no_copy_pull)
156 : {
157 5162 : this->no_copy_pull = no_copy_pull;
158 5162 : }
159 :
160 : void
161 5168 : Adaptor_m_to_n::set_no_copy_push(const bool no_copy_push)
162 : {
163 5168 : this->no_copy_push = no_copy_push;
164 5168 : }
165 :
166 : bool
167 545477 : Adaptor_m_to_n::is_no_copy_pull()
168 : {
169 545477 : return this->no_copy_pull;
170 : }
171 :
172 : bool
173 583401 : Adaptor_m_to_n::is_no_copy_push()
174 : {
175 583401 : return this->no_copy_push;
176 : }
177 :
178 : void
179 10094 : Adaptor_m_to_n::reset_buffer()
180 : {
181 10094 : if (!this->cloned && *this->buffers_allocated)
182 : {
183 726 : size_t id_buff = 0;
184 16260 : for (size_t d = 0; d < (*this->buffer).size(); d++)
185 47175 : for (size_t s = 0; s < this->n_sockets; s++)
186 4569858 : for (size_t b = 0; b < this->buffer_size; b++)
187 4538217 : (*this->buffer)[d][s][b] = this->buffer_to_free[id_buff++];
188 : }
189 10094 : }
190 :
191 : void
192 11486 : Adaptor_m_to_n::set_n_frames(const size_t n_frames)
193 : {
194 11486 : const auto old_n_frames = this->get_n_frames();
195 11486 : if (old_n_frames != n_frames)
196 : {
197 2374 : Module::set_n_frames(n_frames);
198 2374 : if (!this->cloned)
199 : {
200 62 : if (*this->buffers_allocated)
201 : {
202 2448 : for (size_t d = 0; d < (*this->buffer).size(); d++)
203 : {
204 7178 : for (size_t s = 0; s < (*this->buffer)[d].size(); s++)
205 : {
206 108352 : for (size_t b = 0; b < (*this->buffer)[d][s].size(); b++)
207 : {
208 103560 : auto old_ptr = (*this->buffer)[d][s][b];
209 103560 : (*this->buffer)[d][s][b] = new int8_t[this->n_bytes[s] * n_frames];
210 :
211 103560 : bool found = false;
212 191178516 : for (size_t bf = 0; bf < this->buffer_to_free.size(); bf++)
213 191178516 : if (this->buffer_to_free[bf] == old_ptr)
214 : {
215 103560 : delete[] this->buffer_to_free[bf];
216 103560 : this->buffer_to_free[bf] = (*this->buffer)[d][s][b];
217 103560 : found = true;
218 103560 : break;
219 : }
220 :
221 103560 : if (found == false)
222 : {
223 0 : std::stringstream message;
224 0 : message << "This should never happen.";
225 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
226 0 : }
227 : }
228 : }
229 : }
230 : }
231 : }
232 : }
233 11486 : }
234 :
235 : // --------------------------------------------------------------------------------------------------------------------
236 :
237 : void
238 0 : Adaptor_m_to_n::push(const std::vector<const int8_t*>& in, const size_t /*frame_id*/)
239 : {
240 0 : this->wait_push();
241 :
242 0 : for (size_t s = 0; s < this->n_sockets; s++)
243 : {
244 0 : int8_t* out = (int8_t*)this->get_empty_buffer(s);
245 :
246 0 : std::copy(
247 0 : in[s] + 0 * this->n_bytes[s], in[s] + this->get_n_frames() * this->n_bytes[s], out + 0 * this->n_bytes[s]);
248 : }
249 :
250 0 : this->wake_up_puller();
251 0 : }
252 :
253 : void
254 0 : Adaptor_m_to_n::pull(const std::vector<int8_t*>& out, const size_t /*frame_id*/)
255 : {
256 0 : this->wait_pull();
257 :
258 0 : for (size_t s = 0; s < this->n_sockets; s++)
259 : {
260 0 : const int8_t* in = (const int8_t*)this->get_filled_buffer(s);
261 :
262 0 : std::copy(
263 0 : in + 0 * this->n_bytes[s], in + this->get_n_frames() * this->n_bytes[s], out[s] + 0 * this->n_bytes[s]);
264 : }
265 :
266 0 : this->wake_up_pusher();
267 0 : }
268 :
269 : void
270 585263 : Adaptor_m_to_n::wait_push()
271 : {
272 585263 : if (this->tid_push < 0)
273 : {
274 0 : std::stringstream message;
275 0 : message << "This instance is not a pusher.";
276 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
277 0 : }
278 :
279 585263 : if (this->active_waiting)
280 : {
281 66037 : while (this->is_full(this->cur_push_id) && !*this->waiting_canceled)
282 : ;
283 : }
284 : else // passive waiting
285 : {
286 522799 : if (this->is_full(this->cur_push_id) && !*this->waiting_canceled)
287 : {
288 203902 : std::unique_lock<std::mutex> lock((*this->mtx_push.get())[this->cur_push_id]);
289 214230 : ((*this->cnd_push.get())[this->cur_push_id])
290 639780 : .wait(lock, [this]() { return !(this->is_full(this->cur_push_id) && !*this->waiting_canceled); });
291 213980 : }
292 : }
293 :
294 597740 : if (*this->waiting_canceled) throw tools::waiting_canceled(__FILE__, __LINE__, __func__);
295 587060 : }
296 :
297 : void
298 548064 : Adaptor_m_to_n::wait_pull()
299 : {
300 548064 : if (this->tid_pull < 0)
301 : {
302 0 : std::stringstream message;
303 0 : message << "This instance is not a puller.";
304 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
305 0 : }
306 :
307 548064 : if (this->active_waiting)
308 : {
309 581241 : while (this->is_empty(this->cur_pull_id) && !*this->waiting_canceled)
310 : ;
311 : }
312 : else // passive waiting
313 : {
314 505888 : if (this->is_empty(this->cur_pull_id) && !*this->waiting_canceled)
315 : {
316 227272 : std::unique_lock<std::mutex> lock((*this->mtx_pull.get())[this->cur_pull_id]);
317 231226 : ((*this->cnd_pull.get())[this->cur_pull_id])
318 710084 : .wait(lock, [this]() { return !(this->is_empty(this->cur_pull_id) && !*this->waiting_canceled); });
319 230079 : }
320 : }
321 :
322 524156 : if (this->is_empty(this->cur_pull_id) && *this->waiting_canceled)
323 1680 : throw tools::waiting_canceled(__FILE__, __LINE__, __func__);
324 538400 : }
325 :
326 : void*
327 1031437 : Adaptor_m_to_n::get_empty_buffer(const size_t sid)
328 : {
329 : #ifndef SPU_FAST
330 1031437 : if (!*this->buffers_allocated)
331 : {
332 0 : std::stringstream message;
333 0 : message << "You need to call 'alloc_buffers()' before to change 'get_empty_buffer()'.";
334 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
335 0 : }
336 : #endif
337 1034216 : return (void*)(*this->buffer)[this->cur_push_id][sid][(*this->last)[this->cur_push_id]];
338 : }
339 :
340 : void*
341 944085 : Adaptor_m_to_n::get_filled_buffer(const size_t sid)
342 : {
343 : #ifndef SPU_FAST
344 944085 : if (!*this->buffers_allocated)
345 : {
346 0 : std::stringstream message;
347 0 : message << "You need to call 'alloc_buffers()' before to change 'get_filled_buffer()'.";
348 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
349 0 : }
350 : #endif
351 940555 : return (void*)(*this->buffer)[this->cur_pull_id][sid][(*this->first)[this->cur_pull_id]];
352 : }
353 :
354 : void*
355 1052205 : Adaptor_m_to_n::get_empty_buffer(const size_t sid, void* swap_buffer)
356 : {
357 : #ifndef SPU_FAST
358 1052205 : if (!*this->buffers_allocated)
359 : {
360 0 : std::stringstream message;
361 0 : message << "You need to call 'alloc_buffers()' before to change 'get_empty_buffer()'.";
362 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
363 0 : }
364 : #endif
365 1049342 : void* empty_buffer = (void*)(*this->buffer)[this->cur_push_id][sid][(*this->last)[this->cur_push_id]];
366 1021216 : (*this->buffer)[this->cur_push_id][sid][(*this->last)[this->cur_push_id]] = (int8_t*)swap_buffer;
367 1007081 : return empty_buffer;
368 : }
369 :
370 : void*
371 957834 : Adaptor_m_to_n::get_filled_buffer(const size_t sid, void* swap_buffer)
372 : {
373 : #ifndef SPU_FAST
374 957834 : if (!*this->buffers_allocated)
375 : {
376 0 : std::stringstream message;
377 0 : message << "You need to call 'alloc_buffers()' before to change 'get_filled_buffer()'.";
378 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
379 0 : }
380 : #endif
381 952220 : void* filled_buffer = (void*)(*this->buffer)[this->cur_pull_id][sid][(*this->first)[this->cur_pull_id]];
382 922279 : (*this->buffer)[this->cur_pull_id][sid][(*this->first)[this->cur_pull_id]] = (int8_t*)swap_buffer;
383 910548 : return filled_buffer;
384 : }
385 :
386 : void
387 606575 : Adaptor_m_to_n::wake_up_puller()
388 : {
389 606575 : (*this->last)[this->cur_push_id] = ((*this->last)[this->cur_push_id] + 1) % this->buffer_size;
390 602234 : (*this->counter)[this->cur_push_id]--; // atomic fetch sub
391 :
392 627753 : if (!this->active_waiting) // passive waiting
393 : {
394 560850 : if (!this->is_empty(this->cur_push_id))
395 : {
396 549093 : std::lock_guard<std::mutex> lock((*this->mtx_pull.get())[this->cur_push_id]);
397 558659 : (*this->cnd_pull.get())[this->cur_push_id].notify_one();
398 561305 : }
399 : }
400 :
401 628747 : this->cur_push_id = (this->cur_push_id + *this->n_pushers) % this->buffer->size();
402 614578 : }
403 :
404 : void
405 564364 : Adaptor_m_to_n::wake_up_pusher()
406 : {
407 564364 : (*this->first)[this->cur_pull_id] = ((*this->first)[this->cur_pull_id] + 1) % this->buffer_size;
408 555165 : (*this->counter)[this->cur_pull_id]++; // atomic fetch add
409 :
410 589956 : if (!this->active_waiting) // passive waiting
411 : {
412 543986 : if (!this->is_full(this->cur_pull_id))
413 : {
414 527478 : std::lock_guard<std::mutex> lock((*this->mtx_push.get())[this->cur_pull_id]);
415 542338 : (*this->cnd_push.get())[this->cur_pull_id].notify_one();
416 543269 : }
417 : }
418 :
419 592130 : this->cur_pull_id = (this->cur_pull_id + *this->n_pullers) % this->buffer->size();
420 573787 : }
421 :
422 : void
423 9837 : Adaptor_m_to_n::wake_up()
424 : {
425 9837 : if (!this->active_waiting) // passive waiting
426 : {
427 394209 : for (size_t i = 0; i < this->buffer->size(); i++)
428 : {
429 384798 : std::unique_lock<std::mutex> lock((*this->mtx_push.get())[i]);
430 385682 : (*this->cnd_push.get())[i].notify_all();
431 385069 : }
432 393869 : for (size_t i = 0; i < this->buffer->size(); i++)
433 : {
434 383923 : std::unique_lock<std::mutex> lock((*this->mtx_pull.get())[i]);
435 385183 : (*this->cnd_pull.get())[i].notify_all();
436 385337 : }
437 : }
438 9671 : }
439 :
440 : void
441 9846 : Adaptor_m_to_n::cancel_waiting()
442 : {
443 9846 : this->send_cancel_signal();
444 9839 : this->wake_up();
445 9837 : }
|