Line data Source code
1 : #include "Module/Stateful/Adaptor/Adaptor_m_to_n.hpp"
2 : #include "Tools/Math/utils.h"
3 :
4 : using namespace spu;
5 : using namespace spu::module;
6 :
7 19359 : Adaptor_m_to_n::~Adaptor_m_to_n()
8 : {
9 1565393 : for (auto b : this->buffer_to_free)
10 1558940 : delete[] b;
11 6453 : if (this->cloned) (*this->n_clones)--;
12 12906 : }
13 :
14 : Adaptor_m_to_n*
15 6184 : Adaptor_m_to_n::clone() const
16 : {
17 6184 : auto m = new Adaptor_m_to_n(*this);
18 6184 : m->deep_copy(*this);
19 6184 : return m;
20 : }
21 :
22 : void
23 6184 : Adaptor_m_to_n::deep_copy(const Adaptor_m_to_n& m)
24 : {
25 6184 : Stateful::deep_copy(m);
26 :
27 6184 : if (*this->buffers_allocated)
28 : {
29 0 : std::stringstream message;
30 0 : message << "Shared buffers have already been allocated, cloning is no more possible." << std::endl;
31 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
32 0 : }
33 :
34 6184 : this->cloned = true;
35 6184 : (*this->n_clones)++;
36 6184 : this->tid_push = -1;
37 6184 : this->tid_pull = -1;
38 6184 : this->cur_push_id = -1;
39 6184 : this->cur_pull_id = -1;
40 :
41 6184 : this->waiting_canceled.reset(new std::atomic<bool>(m.waiting_canceled->load()));
42 6184 : }
43 :
44 : void
45 269 : Adaptor_m_to_n::alloc_buffers()
46 : {
47 269 : if (*this->buffers_allocated)
48 : {
49 0 : std::stringstream message;
50 0 : message << "Synchronization buffers have already been allocated.";
51 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
52 0 : }
53 :
54 269 : if (this->cloned)
55 : {
56 0 : std::stringstream message;
57 0 : message << "'alloc_buffers()' cannot be called on a cloned module ('tid_push' = " << this->tid_push
58 0 : << ", 'tid_pull' = " << this->tid_pull << ").";
59 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
60 0 : }
61 :
62 269 : size_t ppcm = tools::find_smallest_common_multiple(*this->n_pushers, *this->n_pullers);
63 :
64 269 : if (ppcm > 1000)
65 : {
66 0 : std::stringstream message;
67 0 : message << "'ppcm' cannot exceed 1000 ('ppcm' = " << ppcm << ").";
68 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
69 0 : }
70 :
71 538 : this->buffer->resize(ppcm,
72 538 : std::vector<std::vector<int8_t*>>(this->n_sockets, std::vector<int8_t*>(this->buffer_size)));
73 :
74 269 : if (this->buffer_to_free.size())
75 : {
76 0 : std::stringstream message;
77 0 : message << "This should never happen.";
78 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
79 0 : }
80 :
81 : // this->buffer_to_free.clear();
82 6436 : for (size_t d = 0; d < ppcm; d++)
83 : {
84 18495 : for (size_t s = 0; s < this->n_sockets; s++)
85 1571268 : for (size_t b = 0; b < this->buffer_size; b++)
86 : {
87 1558940 : (*this->buffer)[d][s][b] = new int8_t[this->n_frames * this->n_bytes[s]];
88 1558940 : this->buffer_to_free.push_back((*this->buffer)[d][s][b]);
89 : }
90 6167 : (*this->first)[d] = 0;
91 6167 : (*this->last)[d] = 0;
92 6167 : (*this->counter)[d] = this->buffer_size;
93 : }
94 :
95 269 : *this->buffers_allocated = true;
96 269 : }
97 :
98 : void
99 2823 : Adaptor_m_to_n::add_pusher()
100 : {
101 2823 : if (*this->n_pushers - 1 + *this->n_pullers - 1 >= *this->n_clones)
102 : {
103 0 : std::stringstream message;
104 0 : message << "Pusher cannot be added because a clone is missing ('n_pushers' = " << *this->n_pushers
105 0 : << ", 'n_pullers' = " << *this->n_pullers << ", 'n_clones' = " << *this->n_clones << ").";
106 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
107 0 : }
108 :
109 2823 : this->tid_push = (*this->n_pushers);
110 2823 : (*this->n_pushers)++;
111 2823 : this->cur_push_id = this->tid_push;
112 2823 : }
113 :
114 : void
115 2823 : Adaptor_m_to_n::add_puller()
116 : {
117 2823 : if (*this->n_pushers - 1 + *this->n_pullers - 1 >= *this->n_clones)
118 : {
119 0 : std::stringstream message;
120 0 : message << "Puller cannot be added because a clone is missing ('n_pushers' = " << *this->n_pushers
121 0 : << ", 'n_pullers' = " << *this->n_pullers << ", 'n_clones' = " << *this->n_clones << ").";
122 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
123 0 : }
124 :
125 2823 : this->tid_pull = (*this->n_pullers);
126 2823 : (*this->n_pullers)++;
127 2823 : this->cur_pull_id = this->tid_pull;
128 2823 : }
129 :
130 : void
131 11827 : Adaptor_m_to_n::send_cancel_signal()
132 : {
133 11827 : *this->waiting_canceled = true;
134 11826 : }
135 :
136 : void
137 5915 : Adaptor_m_to_n::reset()
138 : {
139 5915 : if (!this->cloned)
140 : {
141 269 : *this->waiting_canceled = false;
142 6436 : for (size_t d = 0; d < this->buffer->size(); d++)
143 : {
144 6167 : (*this->first)[d] = 0;
145 6167 : (*this->last)[d] = 0;
146 6167 : (*this->counter)[d] = this->buffer_size;
147 : }
148 : }
149 5915 : this->cur_push_id = (size_t)this->tid_push;
150 5915 : this->cur_pull_id = (size_t)this->tid_pull;
151 5915 : this->reset_buffer();
152 5915 : }
153 :
154 : void
155 6180 : Adaptor_m_to_n::set_no_copy_pull(const bool no_copy_pull)
156 : {
157 6180 : this->no_copy_pull = no_copy_pull;
158 6180 : }
159 :
160 : void
161 6182 : Adaptor_m_to_n::set_no_copy_push(const bool no_copy_push)
162 : {
163 6182 : this->no_copy_push = no_copy_push;
164 6182 : }
165 :
166 : bool
167 540685 : Adaptor_m_to_n::is_no_copy_pull()
168 : {
169 540685 : return this->no_copy_pull;
170 : }
171 :
172 : bool
173 566956 : Adaptor_m_to_n::is_no_copy_push()
174 : {
175 566956 : return this->no_copy_push;
176 : }
177 :
178 : void
179 12099 : Adaptor_m_to_n::reset_buffer()
180 : {
181 12099 : if (!this->cloned && *this->buffers_allocated)
182 : {
183 807 : size_t id_buff = 0;
184 19308 : for (size_t d = 0; d < (*this->buffer).size(); d++)
185 55485 : for (size_t s = 0; s < this->n_sockets; s++)
186 4713804 : for (size_t b = 0; b < this->buffer_size; b++)
187 4676820 : (*this->buffer)[d][s][b] = this->buffer_to_free[id_buff++];
188 : }
189 12099 : }
190 :
191 : void
192 14364 : Adaptor_m_to_n::set_n_frames(const size_t n_frames)
193 : {
194 14364 : const auto old_n_frames = this->get_n_frames();
195 14364 : if (old_n_frames != n_frames)
196 : {
197 4102 : Module::set_n_frames(n_frames);
198 4102 : if (!this->cloned)
199 : {
200 98 : if (*this->buffers_allocated)
201 : {
202 4212 : for (size_t d = 0; d < (*this->buffer).size(); d++)
203 : {
204 12074 : for (size_t s = 0; s < (*this->buffer)[d].size(); s++)
205 : {
206 202048 : for (size_t b = 0; b < (*this->buffer)[d][s].size(); b++)
207 : {
208 194088 : auto old_ptr = (*this->buffer)[d][s][b];
209 194088 : (*this->buffer)[d][s][b] = new int8_t[this->n_bytes[s] * n_frames];
210 :
211 194088 : bool found = false;
212 374933220 : for (size_t bf = 0; bf < this->buffer_to_free.size(); bf++)
213 374933220 : if (this->buffer_to_free[bf] == old_ptr)
214 : {
215 194088 : delete[] this->buffer_to_free[bf];
216 194088 : this->buffer_to_free[bf] = (*this->buffer)[d][s][b];
217 194088 : found = true;
218 194088 : break;
219 : }
220 :
221 194088 : if (found == false)
222 : {
223 0 : std::stringstream message;
224 0 : message << "This should never happen.";
225 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
226 0 : }
227 : }
228 : }
229 : }
230 : }
231 : }
232 : }
233 14364 : }
234 :
235 : // --------------------------------------------------------------------------------------------------------------------
236 :
237 : void
238 0 : Adaptor_m_to_n::push(const std::vector<const int8_t*>& in, const size_t /*frame_id*/)
239 : {
240 0 : this->wait_push();
241 :
242 0 : for (size_t s = 0; s < this->n_sockets; s++)
243 : {
244 0 : int8_t* out = (int8_t*)this->get_empty_buffer(s);
245 :
246 0 : std::copy(
247 0 : in[s] + 0 * this->n_bytes[s], in[s] + this->get_n_frames() * this->n_bytes[s], out + 0 * this->n_bytes[s]);
248 : }
249 :
250 0 : this->wake_up_puller();
251 0 : }
252 :
253 : void
254 0 : Adaptor_m_to_n::pull(const std::vector<int8_t*>& out, const size_t /*frame_id*/)
255 : {
256 0 : this->wait_pull();
257 :
258 0 : for (size_t s = 0; s < this->n_sockets; s++)
259 : {
260 0 : const int8_t* in = (const int8_t*)this->get_filled_buffer(s);
261 :
262 0 : std::copy(
263 0 : in + 0 * this->n_bytes[s], in + this->get_n_frames() * this->n_bytes[s], out[s] + 0 * this->n_bytes[s]);
264 : }
265 :
266 0 : this->wake_up_pusher();
267 0 : }
268 :
269 : void
270 568441 : Adaptor_m_to_n::wait_push()
271 : {
272 568441 : if (this->tid_push < 0)
273 : {
274 0 : std::stringstream message;
275 0 : message << "This instance is not a pusher.";
276 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
277 0 : }
278 :
279 568441 : if (this->active_waiting)
280 : {
281 45121 : while (this->is_full(this->cur_push_id) && !*this->waiting_canceled)
282 : ;
283 : }
284 : else // passive waiting
285 : {
286 524931 : if (this->is_full(this->cur_push_id) && !*this->waiting_canceled)
287 : {
288 236112 : std::unique_lock<std::mutex> lock((*this->mtx_push.get())[this->cur_push_id]);
289 250896 : ((*this->cnd_push.get())[this->cur_push_id])
290 746922 : .wait(lock, [this]() { return !(this->is_full(this->cur_push_id) && !*this->waiting_canceled); });
291 250632 : }
292 : }
293 :
294 594115 : if (*this->waiting_canceled) throw tools::waiting_canceled(__FILE__, __LINE__, __func__);
295 585309 : }
296 :
297 : void
298 543387 : Adaptor_m_to_n::wait_pull()
299 : {
300 543387 : if (this->tid_pull < 0)
301 : {
302 0 : std::stringstream message;
303 0 : message << "This instance is not a puller.";
304 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
305 0 : }
306 :
307 543387 : if (this->active_waiting)
308 : {
309 641732 : while (this->is_empty(this->cur_pull_id) && !*this->waiting_canceled)
310 : ;
311 : }
312 : else // passive waiting
313 : {
314 513732 : if (this->is_empty(this->cur_pull_id) && !*this->waiting_canceled)
315 : {
316 191325 : std::unique_lock<std::mutex> lock((*this->mtx_pull.get())[this->cur_pull_id]);
317 194874 : ((*this->cnd_pull.get())[this->cur_pull_id])
318 609729 : .wait(lock, [this]() { return !(this->is_empty(this->cur_pull_id) && !*this->waiting_canceled); });
319 194407 : }
320 : }
321 :
322 534080 : if (this->is_empty(this->cur_pull_id) && *this->waiting_canceled)
323 1957 : throw tools::waiting_canceled(__FILE__, __LINE__, __func__);
324 535202 : }
325 :
326 : void*
327 1035146 : Adaptor_m_to_n::get_empty_buffer(const size_t sid)
328 : {
329 : #ifndef SPU_FAST
330 1035146 : if (!*this->buffers_allocated)
331 : {
332 0 : std::stringstream message;
333 0 : message << "You need to call 'alloc_buffers()' before to change 'get_empty_buffer()'.";
334 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
335 0 : }
336 : #endif
337 1037056 : return (void*)(*this->buffer)[this->cur_push_id][sid][(*this->last)[this->cur_push_id]];
338 : }
339 :
340 : void*
341 949487 : Adaptor_m_to_n::get_filled_buffer(const size_t sid)
342 : {
343 : #ifndef SPU_FAST
344 949487 : if (!*this->buffers_allocated)
345 : {
346 0 : std::stringstream message;
347 0 : message << "You need to call 'alloc_buffers()' before to change 'get_filled_buffer()'.";
348 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
349 0 : }
350 : #endif
351 944214 : return (void*)(*this->buffer)[this->cur_pull_id][sid][(*this->first)[this->cur_pull_id]];
352 : }
353 :
354 : void*
355 1041965 : Adaptor_m_to_n::get_empty_buffer(const size_t sid, void* swap_buffer)
356 : {
357 : #ifndef SPU_FAST
358 1041965 : if (!*this->buffers_allocated)
359 : {
360 0 : std::stringstream message;
361 0 : message << "You need to call 'alloc_buffers()' before to change 'get_empty_buffer()'.";
362 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
363 0 : }
364 : #endif
365 1038143 : void* empty_buffer = (void*)(*this->buffer)[this->cur_push_id][sid][(*this->last)[this->cur_push_id]];
366 1018264 : (*this->buffer)[this->cur_push_id][sid][(*this->last)[this->cur_push_id]] = (int8_t*)swap_buffer;
367 1006157 : return empty_buffer;
368 : }
369 :
370 : void*
371 961882 : Adaptor_m_to_n::get_filled_buffer(const size_t sid, void* swap_buffer)
372 : {
373 : #ifndef SPU_FAST
374 961882 : if (!*this->buffers_allocated)
375 : {
376 0 : std::stringstream message;
377 0 : message << "You need to call 'alloc_buffers()' before to change 'get_filled_buffer()'.";
378 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
379 0 : }
380 : #endif
381 956283 : void* filled_buffer = (void*)(*this->buffer)[this->cur_pull_id][sid][(*this->first)[this->cur_pull_id]];
382 926461 : (*this->buffer)[this->cur_pull_id][sid][(*this->first)[this->cur_pull_id]] = (int8_t*)swap_buffer;
383 916257 : return filled_buffer;
384 : }
385 :
386 : void
387 595349 : Adaptor_m_to_n::wake_up_puller()
388 : {
389 595349 : (*this->last)[this->cur_push_id] = ((*this->last)[this->cur_push_id] + 1) % this->buffer_size;
390 593159 : (*this->counter)[this->cur_push_id]--; // atomic fetch sub
391 :
392 616131 : if (!this->active_waiting) // passive waiting
393 : {
394 569962 : if (!this->is_empty(this->cur_push_id))
395 : {
396 557642 : std::lock_guard<std::mutex> lock((*this->mtx_pull.get())[this->cur_push_id]);
397 566712 : (*this->cnd_pull.get())[this->cur_push_id].notify_one();
398 568884 : }
399 : }
400 :
401 616715 : this->cur_push_id = (this->cur_push_id + *this->n_pushers) % this->buffer->size();
402 603029 : }
403 :
404 : void
405 557628 : Adaptor_m_to_n::wake_up_pusher()
406 : {
407 557628 : (*this->first)[this->cur_pull_id] = ((*this->first)[this->cur_pull_id] + 1) % this->buffer_size;
408 551832 : (*this->counter)[this->cur_pull_id]++; // atomic fetch add
409 :
410 586651 : if (!this->active_waiting) // passive waiting
411 : {
412 553911 : if (!this->is_full(this->cur_pull_id))
413 : {
414 533375 : std::lock_guard<std::mutex> lock((*this->mtx_push.get())[this->cur_pull_id]);
415 552700 : (*this->cnd_push.get())[this->cur_pull_id].notify_one();
416 557518 : }
417 : }
418 :
419 591211 : this->cur_pull_id = (this->cur_pull_id + *this->n_pullers) % this->buffer->size();
420 574028 : }
421 :
422 : void
423 11825 : Adaptor_m_to_n::wake_up()
424 : {
425 11825 : if (!this->active_waiting) // passive waiting
426 : {
427 488305 : for (size_t i = 0; i < this->buffer->size(); i++)
428 : {
429 476540 : std::unique_lock<std::mutex> lock((*this->mtx_push.get())[i]);
430 477816 : (*this->cnd_push.get())[i].notify_all();
431 476975 : }
432 487965 : for (size_t i = 0; i < this->buffer->size(); i++)
433 : {
434 475325 : std::unique_lock<std::mutex> lock((*this->mtx_pull.get())[i]);
435 477462 : (*this->cnd_pull.get())[i].notify_all();
436 477503 : }
437 : }
438 11573 : }
439 :
440 : void
441 11829 : Adaptor_m_to_n::cancel_waiting()
442 : {
443 11829 : this->send_cancel_signal();
444 11826 : this->wake_up();
445 11830 : }
|