Line data Source code
1 : #include "Module/Stateful/Adaptor/Adaptor_m_to_n.hpp"
2 : #include "Tools/Math/utils.h"
3 :
4 : using namespace spu;
5 : using namespace spu::module;
6 :
7 16230 : Adaptor_m_to_n::~Adaptor_m_to_n()
8 : {
9 1518145 : for (auto b : this->buffer_to_free)
10 1512735 : delete[] b;
11 5410 : if (this->cloned) (*this->n_clones)--;
12 10820 : }
13 :
14 : Adaptor_m_to_n*
15 5168 : Adaptor_m_to_n::clone() const
16 : {
17 5168 : auto m = new Adaptor_m_to_n(*this);
18 5168 : m->deep_copy(*this);
19 5168 : return m;
20 : }
21 :
22 : void
23 5168 : Adaptor_m_to_n::deep_copy(const Adaptor_m_to_n& m)
24 : {
25 5168 : Stateful::deep_copy(m);
26 :
27 5168 : if (*this->buffers_allocated)
28 : {
29 0 : std::stringstream message;
30 0 : message << "Shared buffers have already been allocated, cloning is no more possible." << std::endl;
31 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
32 0 : }
33 :
34 5168 : this->cloned = true;
35 5168 : (*this->n_clones)++;
36 5168 : this->tid_push = -1;
37 5168 : this->tid_pull = -1;
38 5168 : this->cur_push_id = -1;
39 5168 : this->cur_pull_id = -1;
40 :
41 5168 : this->waiting_canceled.reset(new std::atomic<bool>(m.waiting_canceled->load()));
42 5168 : }
43 :
44 : void
45 242 : Adaptor_m_to_n::alloc_buffers()
46 : {
47 242 : if (*this->buffers_allocated)
48 : {
49 0 : std::stringstream message;
50 0 : message << "Synchronization buffers have already been allocated.";
51 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
52 0 : }
53 :
54 242 : if (this->cloned)
55 : {
56 0 : std::stringstream message;
57 0 : message << "'alloc_buffers()' cannot be called on a cloned module ('tid_push' = " << this->tid_push
58 0 : << ", 'tid_pull' = " << this->tid_pull << ").";
59 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
60 0 : }
61 :
62 242 : size_t ppcm = tools::find_smallest_common_multiple(*this->n_pushers, *this->n_pullers);
63 :
64 242 : if (ppcm > 1000)
65 : {
66 0 : std::stringstream message;
67 0 : message << "'ppcm' cannot exceed 1000 ('ppcm' = " << ppcm << ").";
68 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
69 0 : }
70 :
71 484 : this->buffer->resize(ppcm,
72 484 : std::vector<std::vector<int8_t*>>(this->n_sockets, std::vector<int8_t*>(this->buffer_size)));
73 :
74 242 : if (this->buffer_to_free.size())
75 : {
76 0 : std::stringstream message;
77 0 : message << "This should never happen.";
78 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
79 0 : }
80 :
81 : // this->buffer_to_free.clear();
82 5420 : for (size_t d = 0; d < ppcm; d++)
83 : {
84 15721 : for (size_t s = 0; s < this->n_sockets; s++)
85 1523278 : for (size_t b = 0; b < this->buffer_size; b++)
86 : {
87 1512735 : (*this->buffer)[d][s][b] = new int8_t[this->n_frames * this->n_bytes[s]];
88 1512735 : this->buffer_to_free.push_back((*this->buffer)[d][s][b]);
89 : }
90 5178 : (*this->first)[d] = 0;
91 5178 : (*this->last)[d] = 0;
92 5178 : (*this->counter)[d] = this->buffer_size;
93 : }
94 :
95 242 : *this->buffers_allocated = true;
96 242 : }
97 :
98 : void
99 2342 : Adaptor_m_to_n::add_pusher()
100 : {
101 2342 : if (*this->n_pushers - 1 + *this->n_pullers - 1 >= *this->n_clones)
102 : {
103 0 : std::stringstream message;
104 0 : message << "Pusher cannot be added because a clone is missing ('n_pushers' = " << *this->n_pushers
105 0 : << ", 'n_pullers' = " << *this->n_pullers << ", 'n_clones' = " << *this->n_clones << ").";
106 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
107 0 : }
108 :
109 2342 : this->tid_push = (*this->n_pushers);
110 2342 : (*this->n_pushers)++;
111 2342 : this->cur_push_id = this->tid_push;
112 2342 : }
113 :
114 : void
115 2342 : Adaptor_m_to_n::add_puller()
116 : {
117 2342 : if (*this->n_pushers - 1 + *this->n_pullers - 1 >= *this->n_clones)
118 : {
119 0 : std::stringstream message;
120 0 : message << "Puller cannot be added because a clone is missing ('n_pushers' = " << *this->n_pushers
121 0 : << ", 'n_pullers' = " << *this->n_pullers << ", 'n_clones' = " << *this->n_clones << ").";
122 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
123 0 : }
124 :
125 2342 : this->tid_pull = (*this->n_pullers);
126 2342 : (*this->n_pullers)++;
127 2342 : this->cur_pull_id = this->tid_pull;
128 2342 : }
129 :
130 : void
131 9848 : Adaptor_m_to_n::send_cancel_signal()
132 : {
133 9848 : *this->waiting_canceled = true;
134 9846 : }
135 :
136 : void
137 4926 : Adaptor_m_to_n::reset()
138 : {
139 4926 : if (!this->cloned)
140 : {
141 242 : *this->waiting_canceled = false;
142 5420 : for (size_t d = 0; d < this->buffer->size(); d++)
143 : {
144 5178 : (*this->first)[d] = 0;
145 5178 : (*this->last)[d] = 0;
146 5178 : (*this->counter)[d] = this->buffer_size;
147 : }
148 : }
149 4926 : this->cur_push_id = (size_t)this->tid_push;
150 4926 : this->cur_pull_id = (size_t)this->tid_pull;
151 4926 : this->reset_buffer();
152 4926 : }
153 :
154 : void
155 5167 : Adaptor_m_to_n::set_no_copy_pull(const bool no_copy_pull)
156 : {
157 5167 : this->no_copy_pull = no_copy_pull;
158 5167 : }
159 :
160 : void
161 5168 : Adaptor_m_to_n::set_no_copy_push(const bool no_copy_push)
162 : {
163 5168 : this->no_copy_push = no_copy_push;
164 5168 : }
165 :
166 : bool
167 536035 : Adaptor_m_to_n::is_no_copy_pull()
168 : {
169 536035 : return this->no_copy_pull;
170 : }
171 :
172 : bool
173 544648 : Adaptor_m_to_n::is_no_copy_push()
174 : {
175 544648 : return this->no_copy_push;
176 : }
177 :
178 : void
179 10094 : Adaptor_m_to_n::reset_buffer()
180 : {
181 10094 : if (!this->cloned && *this->buffers_allocated)
182 : {
183 726 : size_t id_buff = 0;
184 16260 : for (size_t d = 0; d < (*this->buffer).size(); d++)
185 47163 : for (size_t s = 0; s < this->n_sockets; s++)
186 4569834 : for (size_t b = 0; b < this->buffer_size; b++)
187 4538205 : (*this->buffer)[d][s][b] = this->buffer_to_free[id_buff++];
188 : }
189 10094 : }
190 :
191 : void
192 11486 : Adaptor_m_to_n::set_n_frames(const size_t n_frames)
193 : {
194 11486 : const auto old_n_frames = this->get_n_frames();
195 11486 : if (old_n_frames != n_frames)
196 : {
197 2374 : Module::set_n_frames(n_frames);
198 2374 : if (!this->cloned)
199 : {
200 62 : if (*this->buffers_allocated)
201 : {
202 2448 : for (size_t d = 0; d < (*this->buffer).size(); d++)
203 : {
204 7178 : for (size_t s = 0; s < (*this->buffer)[d].size(); s++)
205 : {
206 108352 : for (size_t b = 0; b < (*this->buffer)[d][s].size(); b++)
207 : {
208 103560 : auto old_ptr = (*this->buffer)[d][s][b];
209 103560 : (*this->buffer)[d][s][b] = new int8_t[this->n_bytes[s] * n_frames];
210 :
211 103560 : bool found = false;
212 191178516 : for (size_t bf = 0; bf < this->buffer_to_free.size(); bf++)
213 191178516 : if (this->buffer_to_free[bf] == old_ptr)
214 : {
215 103560 : delete[] this->buffer_to_free[bf];
216 103560 : this->buffer_to_free[bf] = (*this->buffer)[d][s][b];
217 103560 : found = true;
218 103560 : break;
219 : }
220 :
221 103560 : if (found == false)
222 : {
223 0 : std::stringstream message;
224 0 : message << "This should never happen.";
225 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
226 0 : }
227 : }
228 : }
229 : }
230 : }
231 : }
232 : }
233 11486 : }
234 :
235 : // --------------------------------------------------------------------------------------------------------------------
236 :
237 : void
238 0 : Adaptor_m_to_n::push(const std::vector<const int8_t*>& in, const size_t frame_id)
239 : {
240 0 : this->wait_push();
241 :
242 0 : for (size_t s = 0; s < this->n_sockets; s++)
243 : {
244 0 : int8_t* out = (int8_t*)this->get_empty_buffer(s);
245 :
246 0 : std::copy(
247 0 : in[s] + 0 * this->n_bytes[s], in[s] + this->get_n_frames() * this->n_bytes[s], out + 0 * this->n_bytes[s]);
248 : }
249 :
250 0 : this->wake_up_puller();
251 0 : }
252 :
253 : void
254 0 : Adaptor_m_to_n::pull(const std::vector<int8_t*>& out, const size_t frame_id)
255 : {
256 0 : this->wait_pull();
257 :
258 0 : for (size_t s = 0; s < this->n_sockets; s++)
259 : {
260 0 : const int8_t* in = (const int8_t*)this->get_filled_buffer(s);
261 :
262 0 : std::copy(
263 0 : in + 0 * this->n_bytes[s], in + this->get_n_frames() * this->n_bytes[s], out[s] + 0 * this->n_bytes[s]);
264 : }
265 :
266 0 : this->wake_up_pusher();
267 0 : }
268 :
269 : void
270 545787 : Adaptor_m_to_n::wait_push()
271 : {
272 545787 : if (this->tid_push < 0)
273 : {
274 0 : std::stringstream message;
275 0 : message << "This instance is not a pusher.";
276 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
277 0 : }
278 :
279 545787 : if (this->active_waiting)
280 : {
281 47934 : while (this->is_full(this->cur_push_id) && !*this->waiting_canceled)
282 : ;
283 : }
284 : else // passive waiting
285 : {
286 527901 : if (this->is_full(this->cur_push_id) && !*this->waiting_canceled)
287 : {
288 299917 : std::unique_lock<std::mutex> lock((*this->mtx_push.get())[this->cur_push_id]);
289 314605 : ((*this->cnd_push.get())[this->cur_push_id])
290 937479 : .wait(lock, [this]() { return !(this->is_full(this->cur_push_id) && !*this->waiting_canceled); });
291 313896 : }
292 : }
293 :
294 570791 : if (*this->waiting_canceled) throw tools::waiting_canceled(__FILE__, __LINE__, __func__);
295 564464 : }
296 :
297 : void
298 537039 : Adaptor_m_to_n::wait_pull()
299 : {
300 537039 : if (this->tid_pull < 0)
301 : {
302 0 : std::stringstream message;
303 0 : message << "This instance is not a puller.";
304 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
305 0 : }
306 :
307 537039 : if (this->active_waiting)
308 : {
309 808320 : while (this->is_empty(this->cur_pull_id) && !*this->waiting_canceled)
310 : ;
311 : }
312 : else // passive waiting
313 : {
314 522766 : if (this->is_empty(this->cur_pull_id) && !*this->waiting_canceled)
315 : {
316 151014 : std::unique_lock<std::mutex> lock((*this->mtx_pull.get())[this->cur_pull_id]);
317 152969 : ((*this->cnd_pull.get())[this->cur_pull_id])
318 485293 : .wait(lock, [this]() { return !(this->is_empty(this->cur_pull_id) && !*this->waiting_canceled); });
319 152447 : }
320 : }
321 :
322 508778 : if (this->is_empty(this->cur_pull_id) && *this->waiting_canceled)
323 2009 : throw tools::waiting_canceled(__FILE__, __LINE__, __func__);
324 530286 : }
325 :
326 : void*
327 797272 : Adaptor_m_to_n::get_empty_buffer(const size_t sid)
328 : {
329 : #ifndef SPU_FAST
330 797272 : if (!*this->buffers_allocated)
331 : {
332 0 : std::stringstream message;
333 0 : message << "You need to call 'alloc_buffers()' before to change 'get_empty_buffer()'.";
334 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
335 0 : }
336 : #endif
337 797133 : return (void*)(*this->buffer)[this->cur_push_id][sid][(*this->last)[this->cur_push_id]];
338 : }
339 :
340 : void*
341 748041 : Adaptor_m_to_n::get_filled_buffer(const size_t sid)
342 : {
343 : #ifndef SPU_FAST
344 748041 : if (!*this->buffers_allocated)
345 : {
346 0 : std::stringstream message;
347 0 : message << "You need to call 'alloc_buffers()' before to change 'get_filled_buffer()'.";
348 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
349 0 : }
350 : #endif
351 745370 : return (void*)(*this->buffer)[this->cur_pull_id][sid][(*this->first)[this->cur_pull_id]];
352 : }
353 :
354 : void*
355 797307 : Adaptor_m_to_n::get_empty_buffer(const size_t sid, void* swap_buffer)
356 : {
357 : #ifndef SPU_FAST
358 797307 : if (!*this->buffers_allocated)
359 : {
360 0 : std::stringstream message;
361 0 : message << "You need to call 'alloc_buffers()' before to change 'get_empty_buffer()'.";
362 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
363 0 : }
364 : #endif
365 796775 : void* empty_buffer = (void*)(*this->buffer)[this->cur_push_id][sid][(*this->last)[this->cur_push_id]];
366 787781 : (*this->buffer)[this->cur_push_id][sid][(*this->last)[this->cur_push_id]] = (int8_t*)swap_buffer;
367 783766 : return empty_buffer;
368 : }
369 :
370 : void*
371 757227 : Adaptor_m_to_n::get_filled_buffer(const size_t sid, void* swap_buffer)
372 : {
373 : #ifndef SPU_FAST
374 757227 : if (!*this->buffers_allocated)
375 : {
376 0 : std::stringstream message;
377 0 : message << "You need to call 'alloc_buffers()' before to change 'get_filled_buffer()'.";
378 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
379 0 : }
380 : #endif
381 754833 : void* filled_buffer = (void*)(*this->buffer)[this->cur_pull_id][sid][(*this->first)[this->cur_pull_id]];
382 736371 : (*this->buffer)[this->cur_pull_id][sid][(*this->first)[this->cur_pull_id]] = (int8_t*)swap_buffer;
383 733858 : return filled_buffer;
384 : }
385 :
386 : void
387 565179 : Adaptor_m_to_n::wake_up_puller()
388 : {
389 565179 : (*this->last)[this->cur_push_id] = ((*this->last)[this->cur_push_id] + 1) % this->buffer_size;
390 564209 : (*this->counter)[this->cur_push_id]--; // atomic fetch sub
391 :
392 578307 : if (!this->active_waiting) // passive waiting
393 : {
394 559270 : if (!this->is_empty(this->cur_push_id))
395 : {
396 552058 : std::lock_guard<std::mutex> lock((*this->mtx_pull.get())[this->cur_push_id]);
397 558938 : (*this->cnd_pull.get())[this->cur_push_id].notify_one();
398 559183 : }
399 : }
400 :
401 579476 : this->cur_push_id = (this->cur_push_id + *this->n_pushers) % this->buffer->size();
402 571213 : }
403 :
404 : void
405 535636 : Adaptor_m_to_n::wake_up_pusher()
406 : {
407 535636 : (*this->first)[this->cur_pull_id] = ((*this->first)[this->cur_pull_id] + 1) % this->buffer_size;
408 532722 : (*this->counter)[this->cur_pull_id]++; // atomic fetch add
409 :
410 555799 : if (!this->active_waiting) // passive waiting
411 : {
412 540274 : if (!this->is_full(this->cur_pull_id))
413 : {
414 526593 : std::lock_guard<std::mutex> lock((*this->mtx_push.get())[this->cur_pull_id]);
415 540391 : (*this->cnd_push.get())[this->cur_pull_id].notify_one();
416 542715 : }
417 : }
418 :
419 559386 : this->cur_pull_id = (this->cur_pull_id + *this->n_pullers) % this->buffer->size();
420 549128 : }
421 :
422 : void
423 9844 : Adaptor_m_to_n::wake_up()
424 : {
425 9844 : if (!this->active_waiting) // passive waiting
426 : {
427 394245 : for (size_t i = 0; i < this->buffer->size(); i++)
428 : {
429 385214 : std::unique_lock<std::mutex> lock((*this->mtx_push.get())[i]);
430 385826 : (*this->cnd_push.get())[i].notify_all();
431 385248 : }
432 394095 : for (size_t i = 0; i < this->buffer->size(); i++)
433 : {
434 384701 : std::unique_lock<std::mutex> lock((*this->mtx_pull.get())[i]);
435 385653 : (*this->cnd_pull.get())[i].notify_all();
436 385369 : }
437 : }
438 9741 : }
439 :
440 : void
441 9848 : Adaptor_m_to_n::cancel_waiting()
442 : {
443 9848 : this->send_cancel_signal();
444 9846 : this->wake_up();
445 9848 : }
|