Line data Source code
1 : #include "Module/Stateful/Adaptor/Adaptor_m_to_n.hpp"
2 : #include "Tools/Math/utils.h"
3 :
4 : using namespace spu;
5 : using namespace spu::module;
6 :
7 19359 : Adaptor_m_to_n::~Adaptor_m_to_n()
8 : {
9 1565393 : for (auto b : this->buffer_to_free)
10 1558940 : delete[] b;
11 6453 : if (this->cloned) (*this->n_clones)--;
12 12906 : }
13 :
14 : Adaptor_m_to_n*
15 6184 : Adaptor_m_to_n::clone() const
16 : {
17 6184 : auto m = new Adaptor_m_to_n(*this);
18 6184 : m->deep_copy(*this);
19 6184 : return m;
20 : }
21 :
22 : void
23 6184 : Adaptor_m_to_n::deep_copy(const Adaptor_m_to_n& m)
24 : {
25 6184 : Stateful::deep_copy(m);
26 :
27 6184 : if (*this->buffers_allocated)
28 : {
29 0 : std::stringstream message;
30 0 : message << "Shared buffers have already been allocated, cloning is no more possible." << std::endl;
31 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
32 0 : }
33 :
34 6184 : this->cloned = true;
35 6184 : (*this->n_clones)++;
36 6184 : this->tid_push = -1;
37 6184 : this->tid_pull = -1;
38 6184 : this->cur_push_id = -1;
39 6184 : this->cur_pull_id = -1;
40 :
41 6184 : this->waiting_canceled.reset(new std::atomic<bool>(m.waiting_canceled->load()));
42 6184 : }
43 :
44 : void
45 269 : Adaptor_m_to_n::alloc_buffers()
46 : {
47 269 : if (*this->buffers_allocated)
48 : {
49 0 : std::stringstream message;
50 0 : message << "Synchronization buffers have already been allocated.";
51 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
52 0 : }
53 :
54 269 : if (this->cloned)
55 : {
56 0 : std::stringstream message;
57 0 : message << "'alloc_buffers()' cannot be called on a cloned module ('tid_push' = " << this->tid_push
58 0 : << ", 'tid_pull' = " << this->tid_pull << ").";
59 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
60 0 : }
61 :
62 269 : size_t ppcm = tools::find_smallest_common_multiple(*this->n_pushers, *this->n_pullers);
63 :
64 269 : if (ppcm > 1000)
65 : {
66 0 : std::stringstream message;
67 0 : message << "'ppcm' cannot exceed 1000 ('ppcm' = " << ppcm << ").";
68 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
69 0 : }
70 :
71 538 : this->buffer->resize(ppcm,
72 538 : std::vector<std::vector<int8_t*>>(this->n_sockets, std::vector<int8_t*>(this->buffer_size)));
73 :
74 269 : if (this->buffer_to_free.size())
75 : {
76 0 : std::stringstream message;
77 0 : message << "This should never happen.";
78 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
79 0 : }
80 :
81 : // this->buffer_to_free.clear();
82 6436 : for (size_t d = 0; d < ppcm; d++)
83 : {
84 18495 : for (size_t s = 0; s < this->n_sockets; s++)
85 1571268 : for (size_t b = 0; b < this->buffer_size; b++)
86 : {
87 1558940 : (*this->buffer)[d][s][b] = new int8_t[this->n_frames * this->n_bytes[s]];
88 1558940 : this->buffer_to_free.push_back((*this->buffer)[d][s][b]);
89 : }
90 6167 : (*this->first)[d] = 0;
91 6167 : (*this->last)[d] = 0;
92 6167 : (*this->counter)[d] = this->buffer_size;
93 : }
94 :
95 269 : *this->buffers_allocated = true;
96 269 : }
97 :
98 : void
99 2823 : Adaptor_m_to_n::add_pusher()
100 : {
101 2823 : if (*this->n_pushers - 1 + *this->n_pullers - 1 >= *this->n_clones)
102 : {
103 0 : std::stringstream message;
104 0 : message << "Pusher cannot be added because a clone is missing ('n_pushers' = " << *this->n_pushers
105 0 : << ", 'n_pullers' = " << *this->n_pullers << ", 'n_clones' = " << *this->n_clones << ").";
106 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
107 0 : }
108 :
109 2823 : this->tid_push = (*this->n_pushers);
110 2823 : (*this->n_pushers)++;
111 2823 : this->cur_push_id = this->tid_push;
112 2823 : }
113 :
114 : void
115 2823 : Adaptor_m_to_n::add_puller()
116 : {
117 2823 : if (*this->n_pushers - 1 + *this->n_pullers - 1 >= *this->n_clones)
118 : {
119 0 : std::stringstream message;
120 0 : message << "Puller cannot be added because a clone is missing ('n_pushers' = " << *this->n_pushers
121 0 : << ", 'n_pullers' = " << *this->n_pullers << ", 'n_clones' = " << *this->n_clones << ").";
122 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
123 0 : }
124 :
125 2823 : this->tid_pull = (*this->n_pullers);
126 2823 : (*this->n_pullers)++;
127 2823 : this->cur_pull_id = this->tid_pull;
128 2823 : }
129 :
130 : void
131 11824 : Adaptor_m_to_n::send_cancel_signal()
132 : {
133 11824 : *this->waiting_canceled = true;
134 11825 : }
135 :
136 : void
137 5915 : Adaptor_m_to_n::reset()
138 : {
139 5915 : if (!this->cloned)
140 : {
141 269 : *this->waiting_canceled = false;
142 6436 : for (size_t d = 0; d < this->buffer->size(); d++)
143 : {
144 6167 : (*this->first)[d] = 0;
145 6167 : (*this->last)[d] = 0;
146 6167 : (*this->counter)[d] = this->buffer_size;
147 : }
148 : }
149 5915 : this->cur_push_id = (size_t)this->tid_push;
150 5915 : this->cur_pull_id = (size_t)this->tid_pull;
151 5915 : this->reset_buffer();
152 5915 : }
153 :
154 : void
155 6182 : Adaptor_m_to_n::set_no_copy_pull(const bool no_copy_pull)
156 : {
157 6182 : this->no_copy_pull = no_copy_pull;
158 6182 : }
159 :
160 : void
161 6183 : Adaptor_m_to_n::set_no_copy_push(const bool no_copy_push)
162 : {
163 6183 : this->no_copy_push = no_copy_push;
164 6183 : }
165 :
166 : bool
167 540553 : Adaptor_m_to_n::is_no_copy_pull()
168 : {
169 540553 : return this->no_copy_pull;
170 : }
171 :
172 : bool
173 590313 : Adaptor_m_to_n::is_no_copy_push()
174 : {
175 590313 : return this->no_copy_push;
176 : }
177 :
178 : void
179 12099 : Adaptor_m_to_n::reset_buffer()
180 : {
181 12099 : if (!this->cloned && *this->buffers_allocated)
182 : {
183 807 : size_t id_buff = 0;
184 19308 : for (size_t d = 0; d < (*this->buffer).size(); d++)
185 55485 : for (size_t s = 0; s < this->n_sockets; s++)
186 4713804 : for (size_t b = 0; b < this->buffer_size; b++)
187 4676820 : (*this->buffer)[d][s][b] = this->buffer_to_free[id_buff++];
188 : }
189 12099 : }
190 :
191 : void
192 14364 : Adaptor_m_to_n::set_n_frames(const size_t n_frames)
193 : {
194 14364 : const auto old_n_frames = this->get_n_frames();
195 14364 : if (old_n_frames != n_frames)
196 : {
197 4102 : Module::set_n_frames(n_frames);
198 4102 : if (!this->cloned)
199 : {
200 98 : if (*this->buffers_allocated)
201 : {
202 4212 : for (size_t d = 0; d < (*this->buffer).size(); d++)
203 : {
204 12074 : for (size_t s = 0; s < (*this->buffer)[d].size(); s++)
205 : {
206 202048 : for (size_t b = 0; b < (*this->buffer)[d][s].size(); b++)
207 : {
208 194088 : auto old_ptr = (*this->buffer)[d][s][b];
209 194088 : (*this->buffer)[d][s][b] = new int8_t[this->n_bytes[s] * n_frames];
210 :
211 194088 : bool found = false;
212 374933220 : for (size_t bf = 0; bf < this->buffer_to_free.size(); bf++)
213 374933220 : if (this->buffer_to_free[bf] == old_ptr)
214 : {
215 194088 : delete[] this->buffer_to_free[bf];
216 194088 : this->buffer_to_free[bf] = (*this->buffer)[d][s][b];
217 194088 : found = true;
218 194088 : break;
219 : }
220 :
221 194088 : if (found == false)
222 : {
223 0 : std::stringstream message;
224 0 : message << "This should never happen.";
225 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
226 0 : }
227 : }
228 : }
229 : }
230 : }
231 : }
232 : }
233 14364 : }
234 :
235 : // --------------------------------------------------------------------------------------------------------------------
236 :
237 : void
238 0 : Adaptor_m_to_n::push(const std::vector<const int8_t*>& in, const size_t /*frame_id*/)
239 : {
240 0 : this->wait_push();
241 :
242 0 : for (size_t s = 0; s < this->n_sockets; s++)
243 : {
244 0 : int8_t* out = (int8_t*)this->get_empty_buffer(s);
245 :
246 0 : std::copy(
247 0 : in[s] + 0 * this->n_bytes[s], in[s] + this->get_n_frames() * this->n_bytes[s], out + 0 * this->n_bytes[s]);
248 : }
249 :
250 0 : this->wake_up_puller();
251 0 : }
252 :
253 : void
254 0 : Adaptor_m_to_n::pull(const std::vector<int8_t*>& out, const size_t /*frame_id*/)
255 : {
256 0 : this->wait_pull();
257 :
258 0 : for (size_t s = 0; s < this->n_sockets; s++)
259 : {
260 0 : const int8_t* in = (const int8_t*)this->get_filled_buffer(s);
261 :
262 0 : std::copy(
263 0 : in + 0 * this->n_bytes[s], in + this->get_n_frames() * this->n_bytes[s], out[s] + 0 * this->n_bytes[s]);
264 : }
265 :
266 0 : this->wake_up_pusher();
267 0 : }
268 :
269 : void
270 591915 : Adaptor_m_to_n::wait_push()
271 : {
272 591915 : if (this->tid_push < 0)
273 : {
274 0 : std::stringstream message;
275 0 : message << "This instance is not a pusher.";
276 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
277 0 : }
278 :
279 591915 : if (this->active_waiting)
280 : {
281 40531 : while (this->is_full(this->cur_push_id) && !*this->waiting_canceled)
282 : ;
283 : }
284 : else // passive waiting
285 : {
286 555759 : if (this->is_full(this->cur_push_id) && !*this->waiting_canceled)
287 : {
288 225214 : std::unique_lock<std::mutex> lock((*this->mtx_push.get())[this->cur_push_id]);
289 235636 : ((*this->cnd_push.get())[this->cur_push_id])
290 702854 : .wait(lock, [this]() { return !(this->is_full(this->cur_push_id) && !*this->waiting_canceled); });
291 235225 : }
292 : }
293 :
294 615907 : if (*this->waiting_canceled) throw tools::waiting_canceled(__FILE__, __LINE__, __func__);
295 607987 : }
296 :
297 : void
298 542255 : Adaptor_m_to_n::wait_pull()
299 : {
300 542255 : if (this->tid_pull < 0)
301 : {
302 0 : std::stringstream message;
303 0 : message << "This instance is not a puller.";
304 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
305 0 : }
306 :
307 542255 : if (this->active_waiting)
308 : {
309 524050 : while (this->is_empty(this->cur_pull_id) && !*this->waiting_canceled)
310 : ;
311 : }
312 : else // passive waiting
313 : {
314 519222 : if (this->is_empty(this->cur_pull_id) && !*this->waiting_canceled)
315 : {
316 200178 : std::unique_lock<std::mutex> lock((*this->mtx_pull.get())[this->cur_pull_id]);
317 203869 : ((*this->cnd_pull.get())[this->cur_pull_id])
318 628234 : .wait(lock, [this]() { return !(this->is_empty(this->cur_pull_id) && !*this->waiting_canceled); });
319 202835 : }
320 : }
321 :
322 546635 : if (this->is_empty(this->cur_pull_id) && *this->waiting_canceled)
323 1769 : throw tools::waiting_canceled(__FILE__, __LINE__, __func__);
324 539509 : }
325 :
326 : void*
327 1051051 : Adaptor_m_to_n::get_empty_buffer(const size_t sid)
328 : {
329 : #ifndef SPU_FAST
330 1051051 : if (!*this->buffers_allocated)
331 : {
332 0 : std::stringstream message;
333 0 : message << "You need to call 'alloc_buffers()' before to change 'get_empty_buffer()'.";
334 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
335 0 : }
336 : #endif
337 1050753 : return (void*)(*this->buffer)[this->cur_push_id][sid][(*this->last)[this->cur_push_id]];
338 : }
339 :
340 : void*
341 940144 : Adaptor_m_to_n::get_filled_buffer(const size_t sid)
342 : {
343 : #ifndef SPU_FAST
344 940144 : if (!*this->buffers_allocated)
345 : {
346 0 : std::stringstream message;
347 0 : message << "You need to call 'alloc_buffers()' before to change 'get_filled_buffer()'.";
348 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
349 0 : }
350 : #endif
351 931452 : return (void*)(*this->buffer)[this->cur_pull_id][sid][(*this->first)[this->cur_pull_id]];
352 : }
353 :
354 : void*
355 1053614 : Adaptor_m_to_n::get_empty_buffer(const size_t sid, void* swap_buffer)
356 : {
357 : #ifndef SPU_FAST
358 1053614 : if (!*this->buffers_allocated)
359 : {
360 0 : std::stringstream message;
361 0 : message << "You need to call 'alloc_buffers()' before to change 'get_empty_buffer()'.";
362 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
363 0 : }
364 : #endif
365 1051885 : void* empty_buffer = (void*)(*this->buffer)[this->cur_push_id][sid][(*this->last)[this->cur_push_id]];
366 1028026 : (*this->buffer)[this->cur_push_id][sid][(*this->last)[this->cur_push_id]] = (int8_t*)swap_buffer;
367 1015987 : return empty_buffer;
368 : }
369 :
370 : void*
371 955951 : Adaptor_m_to_n::get_filled_buffer(const size_t sid, void* swap_buffer)
372 : {
373 : #ifndef SPU_FAST
374 955951 : if (!*this->buffers_allocated)
375 : {
376 0 : std::stringstream message;
377 0 : message << "You need to call 'alloc_buffers()' before to change 'get_filled_buffer()'.";
378 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
379 0 : }
380 : #endif
381 949948 : void* filled_buffer = (void*)(*this->buffer)[this->cur_pull_id][sid][(*this->first)[this->cur_pull_id]];
382 924122 : (*this->buffer)[this->cur_pull_id][sid][(*this->first)[this->cur_pull_id]] = (int8_t*)swap_buffer;
383 909752 : return filled_buffer;
384 : }
385 :
386 : void
387 618336 : Adaptor_m_to_n::wake_up_puller()
388 : {
389 618336 : (*this->last)[this->cur_push_id] = ((*this->last)[this->cur_push_id] + 1) % this->buffer_size;
390 614755 : (*this->counter)[this->cur_push_id]--; // atomic fetch sub
391 :
392 640105 : if (!this->active_waiting) // passive waiting
393 : {
394 602293 : if (!this->is_empty(this->cur_push_id))
395 : {
396 591359 : std::lock_guard<std::mutex> lock((*this->mtx_pull.get())[this->cur_push_id]);
397 599100 : (*this->cnd_pull.get())[this->cur_push_id].notify_one();
398 601388 : }
399 : }
400 :
401 639956 : this->cur_push_id = (this->cur_push_id + *this->n_pushers) % this->buffer->size();
402 627812 : }
403 :
404 : void
405 562708 : Adaptor_m_to_n::wake_up_pusher()
406 : {
407 562708 : (*this->first)[this->cur_pull_id] = ((*this->first)[this->cur_pull_id] + 1) % this->buffer_size;
408 555046 : (*this->counter)[this->cur_pull_id]++; // atomic fetch add
409 :
410 591068 : if (!this->active_waiting) // passive waiting
411 : {
412 566397 : if (!this->is_full(this->cur_pull_id))
413 : {
414 550342 : std::lock_guard<std::mutex> lock((*this->mtx_push.get())[this->cur_pull_id]);
415 563443 : (*this->cnd_push.get())[this->cur_pull_id].notify_one();
416 564374 : }
417 : }
418 :
419 591785 : this->cur_pull_id = (this->cur_pull_id + *this->n_pullers) % this->buffer->size();
420 576860 : }
421 :
422 : void
423 11821 : Adaptor_m_to_n::wake_up()
424 : {
425 11821 : if (!this->active_waiting) // passive waiting
426 : {
427 487427 : for (size_t i = 0; i < this->buffer->size(); i++)
428 : {
429 475296 : std::unique_lock<std::mutex> lock((*this->mtx_push.get())[i]);
430 477485 : (*this->cnd_push.get())[i].notify_all();
431 476175 : }
432 487671 : for (size_t i = 0; i < this->buffer->size(); i++)
433 : {
434 475174 : std::unique_lock<std::mutex> lock((*this->mtx_pull.get())[i]);
435 477099 : (*this->cnd_pull.get())[i].notify_all();
436 476271 : }
437 : }
438 11572 : }
439 :
440 : void
441 11825 : Adaptor_m_to_n::cancel_waiting()
442 : {
443 11825 : this->send_cancel_signal();
444 11824 : this->wake_up();
445 11818 : }
|