Line data Source code
#include <algorithm>
#include <cstring>
#include <exception>
#include <fstream>
#include <mutex>
#include <numeric>
#include <set>
#include <sstream>
#include <thread>
#include <utility>
10 :
11 : #include "Module/Module.hpp"
12 : #include "Module/Stateful/Adaptor/Adaptor_m_to_n.hpp"
13 : #include "Module/Stateful/Probe/Probe.hpp"
14 : #include "Module/Stateful/Switcher/Switcher.hpp"
15 : #include "Runtime/Sequence/Sequence.hpp"
16 : #include "Runtime/Socket/Socket.hpp"
17 : #include "Runtime/Task/Task.hpp"
18 : #include "Tools/Display/rang_format/rang_format.h"
19 : #include "Tools/Exception/exception.hpp"
20 : #include "Tools/Signal_handler/Signal_handler.hpp"
21 : #include "Tools/Thread/Thread_pinning/Thread_pinning.hpp"
22 : #include "Tools/Thread/Thread_pinning/Thread_pinning_utils.hpp"
23 : #include "Tools/Thread/Thread_pool/Standard/Thread_pool_standard.hpp"
24 :
25 : using namespace spu;
26 : using namespace spu::runtime;
27 :
28 0 : Sequence::Sequence(const std::vector<const runtime::Task*>& firsts,
29 : const std::vector<const runtime::Task*>& lasts,
30 : const std::vector<const runtime::Task*>& exclusions,
31 : const size_t n_threads,
32 : const bool thread_pinning,
33 0 : const std::vector<size_t>& puids)
34 0 : : n_threads(n_threads)
35 0 : , sequences(n_threads, nullptr)
36 0 : , modules(n_threads)
37 0 : , all_modules(n_threads)
38 0 : , mtx_exception(new std::mutex())
39 0 : , force_exit_loop(new std::atomic<bool>(false))
40 0 : , tasks_inplace(false)
41 0 : , thread_pinning(thread_pinning)
42 0 : , puids(puids)
43 0 : , no_copy_mode(true)
44 0 : , saved_exclusions(exclusions)
45 0 : , switchers_reset(n_threads)
46 0 : , auto_stop(true)
47 0 : , is_part_of_pipeline(false)
48 0 : , next_round_is_over(n_threads, false)
49 0 : , cur_task_id(n_threads, 0)
50 0 : , cur_ss(n_threads, nullptr)
51 : {
52 : #ifndef SPU_HWLOC
53 : if (thread_pinning)
54 : std::clog << rang::tag::warning
55 : << "StreamPU has not been linked with the 'hwloc' library, the 'thread_pinning' "
56 : "option of the 'runtime::Sequence' will have no effect."
57 : << std::endl;
58 : #endif
59 :
60 0 : if (thread_pinning && puids.size() < n_threads)
61 : {
62 0 : std::stringstream message;
63 0 : message << "'puids.size()' has to be greater or equal to 'n_threads' ('puids.size()' = " << puids.size()
64 0 : << " , 'n_threads' = " << n_threads << ").";
65 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
66 0 : }
67 :
68 0 : this->init<runtime::Sub_sequence_const, const runtime::Task>(firsts, lasts, exclusions);
69 0 : }
70 :
71 0 : Sequence::Sequence(const std::vector<const runtime::Task*>& firsts,
72 : const std::vector<const runtime::Task*>& lasts,
73 : const size_t n_threads,
74 : const bool thread_pinning,
75 0 : const std::vector<size_t>& puids)
76 0 : : Sequence(firsts, lasts, {}, n_threads, thread_pinning, puids)
77 : {
78 0 : }
79 :
80 0 : Sequence::Sequence(const std::vector<const runtime::Task*>& firsts,
81 : const size_t n_threads,
82 : const bool thread_pinning,
83 0 : const std::vector<size_t>& puids)
84 0 : : Sequence(firsts, {}, {}, n_threads, thread_pinning, puids)
85 : {
86 0 : }
87 :
88 0 : Sequence::Sequence(const runtime::Task& first,
89 : const runtime::Task& last,
90 : const size_t n_threads,
91 : const bool thread_pinning,
92 0 : const std::vector<size_t>& puids)
93 0 : : Sequence({ &first }, { &last }, n_threads, thread_pinning, puids)
94 : {
95 0 : }
96 :
97 0 : Sequence::Sequence(const runtime::Task& first,
98 : const size_t n_threads,
99 : const bool thread_pinning,
100 0 : const std::vector<size_t>& puids)
101 0 : : Sequence({ &first }, n_threads, thread_pinning, puids)
102 : {
103 0 : }
104 :
105 : std::vector<const runtime::Task*>
106 532 : exclusions_convert_to_const(const std::vector<runtime::Task*>& exclusions)
107 : {
108 532 : std::vector<const runtime::Task*> exclusions_const;
109 572 : for (auto exception : exclusions)
110 40 : exclusions_const.push_back(exception);
111 532 : return exclusions_const;
112 0 : }
113 :
114 470 : Sequence::Sequence(const std::vector<runtime::Task*>& firsts,
115 : const std::vector<runtime::Task*>& lasts,
116 : const std::vector<runtime::Task*>& exclusions,
117 : const size_t n_threads,
118 : const bool thread_pinning,
119 : const std::vector<size_t>& puids,
120 470 : const bool tasks_inplace)
121 470 : : n_threads(n_threads)
122 470 : , sequences(n_threads, nullptr)
123 470 : , modules(tasks_inplace ? n_threads - 1 : n_threads)
124 470 : , all_modules(n_threads)
125 470 : , mtx_exception(new std::mutex())
126 470 : , force_exit_loop(new std::atomic<bool>(false))
127 470 : , tasks_inplace(tasks_inplace)
128 470 : , thread_pinning(thread_pinning)
129 470 : , puids(puids)
130 470 : , no_copy_mode(true)
131 470 : , saved_exclusions(exclusions_convert_to_const(exclusions))
132 470 : , switchers_reset(n_threads)
133 470 : , auto_stop(true)
134 470 : , is_part_of_pipeline(false)
135 470 : , next_round_is_over(n_threads, false)
136 470 : , cur_task_id(n_threads, 0)
137 2350 : , cur_ss(n_threads, nullptr)
138 : {
139 470 : if (thread_pinning && puids.size() < n_threads)
140 : {
141 0 : std::stringstream message;
142 0 : message << "'puids.size()' has greater or equal to 'n_threads' ('puids.size()' = " << puids.size()
143 0 : << " , 'n_threads' = " << n_threads << ").";
144 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
145 0 : }
146 :
147 470 : if (tasks_inplace)
148 470 : this->init<runtime::Sub_sequence, runtime::Task>(firsts, lasts, exclusions);
149 : else
150 : {
151 0 : std::vector<const runtime::Task*> firsts_bis;
152 0 : for (auto first : firsts)
153 0 : firsts_bis.push_back(first);
154 0 : std::vector<const runtime::Task*> lasts_bis;
155 0 : for (auto last : lasts)
156 0 : lasts_bis.push_back(last);
157 0 : std::vector<const runtime::Task*> exclusions_bis;
158 0 : for (auto exception : exclusions)
159 0 : exclusions_bis.push_back(exception);
160 0 : this->init<runtime::Sub_sequence_const, const runtime::Task>(firsts_bis, lasts_bis, exclusions_bis);
161 0 : }
162 470 : }
163 :
164 95 : Sequence::Sequence(const std::vector<runtime::Task*>& firsts,
165 : const std::vector<runtime::Task*>& lasts,
166 : const size_t n_threads,
167 : const bool thread_pinning,
168 : const std::vector<size_t>& puids,
169 95 : const bool tasks_inplace)
170 95 : : Sequence(firsts, lasts, {}, n_threads, thread_pinning, puids, tasks_inplace)
171 : {
172 95 : }
173 :
174 101 : Sequence::Sequence(const std::vector<runtime::Task*>& firsts,
175 : const size_t n_threads,
176 : const bool thread_pinning,
177 : const std::vector<size_t>& puids,
178 101 : const bool tasks_inplace)
179 101 : : Sequence(firsts, {}, {}, n_threads, thread_pinning, puids, tasks_inplace)
180 : {
181 101 : }
182 :
183 1 : Sequence::Sequence(runtime::Task& first,
184 : runtime::Task& last,
185 : const size_t n_threads,
186 : const bool thread_pinning,
187 : const std::vector<size_t>& puids,
188 1 : const bool tasks_inplace)
189 1 : : Sequence({ &first }, { &last }, n_threads, thread_pinning, puids, tasks_inplace)
190 : {
191 1 : }
192 :
193 101 : Sequence::Sequence(runtime::Task& first,
194 : const size_t n_threads,
195 : const bool thread_pinning,
196 : const std::vector<size_t>& puids,
197 101 : const bool tasks_inplace)
198 101 : : Sequence({ &first }, n_threads, thread_pinning, puids, tasks_inplace)
199 : {
200 101 : }
201 : //=======================================New pinning version constructors===============================================
202 0 : Sequence::Sequence(const std::vector<const runtime::Task*>& firsts,
203 : const std::vector<const runtime::Task*>& lasts,
204 : const std::vector<const runtime::Task*>& exclusions,
205 : const size_t n_threads,
206 : const bool thread_pinning,
207 0 : const std::string& sequence_pinning_policy)
208 0 : : n_threads(n_threads)
209 0 : , sequences(n_threads, nullptr)
210 0 : , modules(n_threads)
211 0 : , all_modules(n_threads)
212 0 : , mtx_exception(new std::mutex())
213 0 : , force_exit_loop(new std::atomic<bool>(false))
214 0 : , tasks_inplace(false)
215 0 : , thread_pinning(thread_pinning)
216 0 : , puids({})
217 0 : , no_copy_mode(true)
218 0 : , saved_exclusions(exclusions)
219 0 : , switchers_reset(n_threads)
220 0 : , auto_stop(true)
221 0 : , is_part_of_pipeline(false)
222 0 : , next_round_is_over(n_threads, false)
223 0 : , cur_task_id(n_threads, 0)
224 0 : , cur_ss(n_threads, nullptr)
225 : {
226 : #ifndef SPU_HWLOC
227 : if (thread_pinning)
228 : std::clog << rang::tag::warning
229 : << "StreamPU has not been linked with the 'hwloc' library, the 'thread_pinning' "
230 : "option of the 'runtime::Sequence' will have no effect."
231 : << std::endl;
232 : #endif
233 :
234 0 : if (thread_pinning && !sequence_pinning_policy.empty())
235 : {
236 0 : pin_objects_per_thread = tools::Thread_pinning_utils::stage_parser_unpacker(sequence_pinning_policy, n_threads);
237 : }
238 0 : else if (thread_pinning && sequence_pinning_policy.empty())
239 : {
240 0 : std::stringstream message;
241 0 : message << "Pinning is activated but there is no specified policy." << std::endl;
242 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
243 0 : }
244 :
245 0 : this->init<runtime::Sub_sequence_const, const runtime::Task>(firsts, lasts, exclusions);
246 0 : }
247 :
248 0 : Sequence::Sequence(const std::vector<const runtime::Task*>& firsts,
249 : const std::vector<const runtime::Task*>& lasts,
250 : const size_t n_threads,
251 : const bool thread_pinning,
252 0 : const std::string& sequence_pinning_policy)
253 0 : : Sequence(firsts, lasts, {}, n_threads, thread_pinning, sequence_pinning_policy)
254 : {
255 0 : }
256 :
257 0 : Sequence::Sequence(const std::vector<const runtime::Task*>& firsts,
258 : const size_t n_threads,
259 : const bool thread_pinning,
260 0 : const std::string& sequence_pinning_policy)
261 0 : : Sequence(firsts, {}, {}, n_threads, thread_pinning, sequence_pinning_policy)
262 : {
263 0 : }
264 :
265 0 : Sequence::Sequence(const runtime::Task& first,
266 : const runtime::Task& last,
267 : const size_t n_threads,
268 : const bool thread_pinning,
269 0 : const std::string& sequence_pinning_policy)
270 0 : : Sequence({ &first }, { &last }, n_threads, thread_pinning, sequence_pinning_policy)
271 : {
272 0 : }
273 :
274 0 : Sequence::Sequence(const runtime::Task& first,
275 : const size_t n_threads,
276 : const bool thread_pinning,
277 0 : const std::string& sequence_pinning_policy)
278 0 : : Sequence({ &first }, n_threads, thread_pinning, sequence_pinning_policy)
279 : {
280 0 : }
281 :
282 62 : Sequence::Sequence(const std::vector<runtime::Task*>& firsts,
283 : const std::vector<runtime::Task*>& lasts,
284 : const std::vector<runtime::Task*>& exclusions,
285 : const size_t n_threads,
286 : const bool thread_pinning,
287 : const std::string& sequence_pinning_policy,
288 62 : const bool tasks_inplace)
289 62 : : n_threads(n_threads)
290 62 : , sequences(n_threads, nullptr)
291 62 : , modules(tasks_inplace ? n_threads - 1 : n_threads)
292 62 : , all_modules(n_threads)
293 62 : , mtx_exception(new std::mutex())
294 62 : , force_exit_loop(new std::atomic<bool>(false))
295 62 : , tasks_inplace(tasks_inplace)
296 62 : , thread_pinning(thread_pinning)
297 62 : , puids({})
298 62 : , no_copy_mode(true)
299 62 : , saved_exclusions(exclusions_convert_to_const(exclusions))
300 62 : , switchers_reset(n_threads)
301 62 : , auto_stop(true)
302 62 : , is_part_of_pipeline(false)
303 62 : , next_round_is_over(n_threads, false)
304 62 : , cur_task_id(n_threads, 0)
305 310 : , cur_ss(n_threads, nullptr)
306 : {
307 62 : if (thread_pinning && !sequence_pinning_policy.empty())
308 : {
309 29 : pin_objects_per_thread = tools::Thread_pinning_utils::stage_parser_unpacker(sequence_pinning_policy, n_threads);
310 : }
311 33 : else if (thread_pinning && sequence_pinning_policy.empty())
312 : {
313 0 : std::stringstream message;
314 0 : message << "Pinning is activated but there is no specified policy." << std::endl;
315 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
316 0 : }
317 :
318 62 : if (tasks_inplace)
319 62 : this->init<runtime::Sub_sequence, runtime::Task>(firsts, lasts, exclusions);
320 : else
321 : {
322 0 : std::vector<const runtime::Task*> firsts_bis;
323 0 : for (auto first : firsts)
324 0 : firsts_bis.push_back(first);
325 0 : std::vector<const runtime::Task*> lasts_bis;
326 :
327 0 : for (auto last : lasts)
328 0 : lasts_bis.push_back(last);
329 0 : std::vector<const runtime::Task*> exclusions_bis;
330 :
331 0 : for (auto exception : exclusions)
332 0 : exclusions_bis.push_back(exception);
333 0 : this->init<runtime::Sub_sequence_const, const runtime::Task>(firsts_bis, lasts_bis, exclusions_bis);
334 0 : }
335 62 : }
336 :
337 0 : Sequence::Sequence(const std::vector<runtime::Task*>& firsts,
338 : const std::vector<runtime::Task*>& lasts,
339 : const size_t n_threads,
340 : const bool thread_pinning,
341 : const std::string& sequence_pinning_policy,
342 0 : const bool tasks_inplace)
343 0 : : Sequence(firsts, lasts, {}, n_threads, thread_pinning, sequence_pinning_policy, tasks_inplace)
344 : {
345 0 : }
346 :
347 0 : Sequence::Sequence(const std::vector<runtime::Task*>& firsts,
348 : const size_t n_threads,
349 : const bool thread_pinning,
350 : const std::string& sequence_pinning_policy,
351 0 : const bool tasks_inplace)
352 0 : : Sequence(firsts, {}, {}, n_threads, thread_pinning, sequence_pinning_policy, tasks_inplace)
353 : {
354 0 : }
355 :
356 0 : Sequence::Sequence(runtime::Task& first,
357 : runtime::Task& last,
358 : const size_t n_threads,
359 : const bool thread_pinning,
360 : const std::string& sequence_pinning_policy,
361 0 : const bool tasks_inplace)
362 0 : : Sequence({ &first }, { &last }, n_threads, thread_pinning, sequence_pinning_policy, tasks_inplace)
363 : {
364 0 : }
365 :
366 0 : Sequence::Sequence(runtime::Task& first,
367 : const size_t n_threads,
368 : const bool thread_pinning,
369 : const std::string& sequence_pinning_policy,
370 0 : const bool tasks_inplace)
371 0 : : Sequence({ &first }, n_threads, thread_pinning, sequence_pinning_policy, tasks_inplace)
372 : {
373 0 : }
374 :
375 : // ====================================================================================================================
376 :
377 1592 : Sequence::~Sequence()
378 : {
379 580 : std::vector<tools::Digraph_node<Sub_sequence>*> already_deleted_nodes;
380 5929 : for (auto s : this->sequences)
381 5349 : this->delete_tree(s, already_deleted_nodes);
382 1012 : }
383 :
384 : template<class SS, class TA>
385 : void
386 580 : Sequence::init(const std::vector<TA*>& firsts, const std::vector<TA*>& lasts, const std::vector<TA*>& exclusions)
387 : {
388 580 : if (this->is_thread_pinning())
389 : {
390 29 : if (!this->puids.empty())
391 0 : tools::Thread_pinning::pin(this->puids[0]);
392 : else
393 29 : tools::Thread_pinning::pin(this->pin_objects_per_thread[0]);
394 : }
395 :
396 580 : if (firsts.size() == 0)
397 : {
398 0 : std::stringstream message;
399 0 : message << "'firsts.size()' has to be strictly greater than 0.";
400 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
401 0 : }
402 :
403 580 : if (this->n_threads == 0)
404 : {
405 0 : std::stringstream message;
406 0 : message << "'n_threads' has to be strictly greater than 0.";
407 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
408 0 : }
409 :
410 620 : for (auto exclusion : exclusions)
411 : {
412 40 : if (std::find(firsts.begin(), firsts.end(), exclusion) != firsts.end())
413 : {
414 0 : std::stringstream message;
415 : message << "'exclusion' can't be contained in the 'firsts' vector ("
416 : << "'exclusion'"
417 0 : << " = " << +exclusion << ", "
418 : << "'exclusion->get_name()'"
419 0 : << " = " << exclusion->get_name() << ").";
420 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
421 0 : }
422 :
423 40 : if (std::find(lasts.begin(), lasts.end(), exclusion) != lasts.end())
424 : {
425 0 : std::stringstream message;
426 : message << "'exclusion' can't be contained in the 'lasts' vector ("
427 : << "'exclusion'"
428 0 : << " = " << +exclusion << ", "
429 : << "'exclusion->get_name()'"
430 0 : << " = " << exclusion->get_name() << ").";
431 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
432 0 : }
433 : }
434 1019 : for (auto t : lasts)
435 : {
436 439 : if (dynamic_cast<const module::Switcher*>(&t->get_module()) && t->get_name() == "commute")
437 : {
438 0 : std::stringstream message;
439 0 : message << "A sequence cannot end with a 'commute' task.";
440 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
441 0 : }
442 : }
443 :
444 580 : auto root = new tools::Digraph_node<SS>({}, {}, nullptr, 0);
445 580 : root->set_contents(nullptr);
446 580 : size_t ssid = 0, taid = 0;
447 580 : std::vector<TA*> switchers;
448 580 : std::vector<std::pair<TA*, tools::Digraph_node<SS>*>> selectors;
449 580 : std::vector<TA*> real_lasts;
450 :
451 580 : this->lasts_tasks_id.clear();
452 580 : this->firsts_tasks_id.clear();
453 580 : auto last_subseq = root;
454 580 : std::map<TA*, unsigned> in_sockets_feed;
455 1315 : for (auto first : firsts)
456 : {
457 735 : std::map<TA*, std::pair<tools::Digraph_node<SS>*, size_t>> task_subseq;
458 735 : auto contents = last_subseq->get_contents();
459 735 : this->firsts_tasks_id.push_back(contents ? contents->tasks_id[contents->tasks_id.size() - 1] : 0);
460 735 : last_subseq = this->init_recursive<SS, TA>(last_subseq,
461 : ssid,
462 : taid,
463 : selectors,
464 : switchers,
465 : *first,
466 : *first,
467 : lasts,
468 : exclusions,
469 735 : this->lasts_tasks_id,
470 : real_lasts,
471 : in_sockets_feed,
472 : task_subseq);
473 : }
474 :
475 580 : std::stringstream real_lasts_ss;
476 1515 : for (size_t rl = 0; rl < real_lasts.size(); rl++)
477 : real_lasts_ss << "'real_lasts"
478 935 : << "[" << rl << "]'"
479 935 : << " = " << +real_lasts[rl] << ", "
480 : << "'real_lasts"
481 935 : << "[" << rl << "]->get_name()'"
482 935 : << " = " << real_lasts[rl]->get_name() << ((rl < real_lasts.size() - 1) ? ", " : "");
483 :
484 1019 : for (auto last : lasts)
485 : {
486 439 : if (std::find(real_lasts.begin(), real_lasts.end(), last) == real_lasts.end())
487 : {
488 0 : std::stringstream message;
489 0 : message << "'last' is not contained in the 'real_lasts[" << real_lasts.size() << "]' vector ("
490 : << "'last'"
491 0 : << " = " << +last << ", "
492 : << "'last->get_name()'"
493 0 : << " = " << last->get_name() << ", " << real_lasts_ss.str() << ").";
494 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
495 0 : }
496 : }
497 :
498 580 : this->n_tasks = taid;
499 : // this->check_ctrl_flow(root); // /!\ this check has been commented because it is known to do not work in the
500 : // general case
501 580 : this->_init<SS>(root);
502 580 : this->update_firsts_and_lasts_tasks();
503 580 : this->gen_processes();
504 580 : this->donners = get_modules<tools::Interface_is_done>(true);
505 :
506 5929 : for (size_t tid = 0; tid < this->n_threads; tid++)
507 42511 : for (auto& mdl : this->all_modules[tid])
508 37162 : if (auto swi = dynamic_cast<module::Switcher*>(mdl))
509 2143 : this->switchers_reset[tid].push_back(dynamic_cast<tools::Interface_reset*>(swi));
510 :
511 5929 : for (size_t tid = 0; tid < this->sequences.size(); tid++)
512 5349 : this->cur_ss[tid] = this->sequences[tid];
513 :
514 580 : this->thread_pool.reset(new tools::Thread_pool_standard(this->n_threads - 1));
515 580 : this->thread_pool->init(); // threads are spawned here
516 580 : }
517 :
518 : Sequence*
519 48 : Sequence::clone() const
520 : {
521 48 : auto c = new Sequence(*this);
522 :
523 48 : c->tasks_inplace = false;
524 48 : c->modules.resize(c->get_n_threads());
525 :
526 48 : std::vector<const runtime::Task*> firsts_tasks;
527 96 : for (auto ta : this->get_firsts_tasks()[0])
528 48 : firsts_tasks.push_back(ta);
529 :
530 48 : std::vector<const runtime::Task*> lasts_tasks;
531 96 : for (auto ta : this->get_lasts_tasks()[0])
532 48 : lasts_tasks.push_back(ta);
533 :
534 48 : c->init<runtime::Sub_sequence_const, const runtime::Task>(firsts_tasks, lasts_tasks, this->saved_exclusions);
535 48 : c->mtx_exception.reset(new std::mutex());
536 48 : c->force_exit_loop.reset(new std::atomic<bool>(false));
537 48 : return c;
538 48 : }
539 :
540 : void
541 0 : Sequence::set_thread_pinning(const bool thread_pinning, const std::vector<size_t>& puids)
542 : {
543 0 : if (thread_pinning && puids.size() < n_threads)
544 : {
545 0 : std::stringstream message;
546 0 : message << "'puids.size()' has greater or equal to 'n_threads' ('puids.size()' = " << puids.size()
547 0 : << " , 'n_threads' = " << n_threads << ").";
548 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
549 0 : }
550 :
551 0 : this->thread_pinning = thread_pinning;
552 0 : this->puids = puids;
553 0 : this->pin_objects_per_thread = {};
554 0 : }
555 :
556 : void
557 0 : Sequence::set_thread_pinning(const bool thread_pinning, const std::string& sequence_pinning_policy)
558 : {
559 0 : this->thread_pinning = thread_pinning;
560 0 : this->puids = {};
561 : this->pin_objects_per_thread =
562 0 : tools::Thread_pinning_utils::stage_parser_unpacker(sequence_pinning_policy, n_threads);
563 0 : }
564 :
565 : bool
566 56524 : Sequence::is_thread_pinning()
567 : {
568 56524 : return this->thread_pinning;
569 : }
570 :
571 : std::vector<std::vector<module::Module*>>
572 0 : Sequence::get_modules_per_threads() const
573 : {
574 0 : std::vector<std::vector<module::Module*>> modules_per_threads(this->all_modules.size());
575 0 : size_t tid = 0;
576 0 : for (auto& e : this->all_modules)
577 : {
578 0 : for (auto& ee : e)
579 0 : modules_per_threads[tid].push_back(ee);
580 0 : tid++;
581 : }
582 0 : return modules_per_threads;
583 0 : }
584 :
585 : std::vector<std::vector<module::Module*>>
586 2 : Sequence::get_modules_per_types() const
587 : {
588 2 : std::vector<std::vector<module::Module*>> modules_per_types(this->all_modules[0].size());
589 4 : for (auto& e : this->all_modules)
590 : {
591 2 : size_t mid = 0;
592 33 : for (auto& ee : e)
593 31 : modules_per_types[mid++].push_back(ee);
594 : }
595 2 : return modules_per_types;
596 0 : }
597 :
598 : std::vector<std::vector<runtime::Task*>>
599 4428 : Sequence::get_tasks_per_threads() const
600 : {
601 4428 : std::vector<std::vector<runtime::Task*>> tasks_per_threads(this->n_threads);
602 :
603 : std::function<void(
604 : tools::Digraph_node<Sub_sequence>*, const size_t, std::vector<tools::Digraph_node<Sub_sequence>*>&)>
605 168599 : get_tasks_recursive = [&](tools::Digraph_node<Sub_sequence>* cur_ss,
606 : const size_t tid,
607 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
608 : {
609 168599 : if (std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_ss) == already_parsed_nodes.end())
610 : {
611 163627 : already_parsed_nodes.push_back(cur_ss);
612 327264 : tasks_per_threads[tid].insert(
613 163633 : tasks_per_threads[tid].end(), cur_ss->get_c()->tasks.begin(), cur_ss->get_c()->tasks.end());
614 :
615 187017 : for (auto c : cur_ss->get_children())
616 23382 : get_tasks_recursive(c, tid, already_parsed_nodes);
617 : }
618 173032 : };
619 :
620 149660 : for (size_t tid = 0; tid < this->n_threads; tid++)
621 : {
622 145225 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
623 145224 : get_tasks_recursive(this->sequences[tid], tid, already_parsed_nodes);
624 145222 : }
625 :
626 8869 : return tasks_per_threads;
627 4435 : }
628 :
629 : std::vector<std::vector<runtime::Task*>>
630 29 : Sequence::get_tasks_per_types() const
631 : {
632 29 : std::vector<std::vector<runtime::Task*>> tasks_per_types(this->n_tasks);
633 :
634 : std::function<void(tools::Digraph_node<Sub_sequence>*, size_t&, std::vector<tools::Digraph_node<Sub_sequence>*>&)>
635 187 : get_tasks_recursive = [&](tools::Digraph_node<Sub_sequence>* cur_ss,
636 : size_t& mid,
637 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
638 : {
639 187 : if (std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_ss) == already_parsed_nodes.end())
640 : {
641 187 : already_parsed_nodes.push_back(cur_ss);
642 1615 : for (auto& t : cur_ss->get_c()->tasks)
643 1428 : tasks_per_types[mid++].push_back(t);
644 :
645 187 : for (auto c : cur_ss->get_children())
646 0 : get_tasks_recursive(c, mid, already_parsed_nodes);
647 : }
648 216 : };
649 :
650 216 : for (size_t tid = 0; tid < this->n_threads; tid++)
651 : {
652 187 : size_t mid = 0;
653 187 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
654 187 : get_tasks_recursive(this->sequences[tid], mid, already_parsed_nodes);
655 187 : }
656 :
657 58 : return tasks_per_types;
658 29 : }
659 :
660 : bool
661 777962 : Sequence::is_done() const
662 : {
663 787381 : for (auto donner : this->donners)
664 9474 : if (donner->is_done()) return true;
665 778628 : return false;
666 : }
667 :
668 : void
669 0 : Sequence::_exec(const size_t tid,
670 : std::function<bool(const std::vector<const int*>&)>& stop_condition,
671 : tools::Digraph_node<Sub_sequence>* sequence)
672 : {
673 0 : tools::Signal_handler::reset_sigint();
674 :
675 0 : if (this->is_thread_pinning())
676 : {
677 0 : if (!puids.empty())
678 0 : tools::Thread_pinning::pin(this->puids[tid]);
679 : else
680 0 : tools::Thread_pinning::pin(this->pin_objects_per_thread[tid]);
681 : }
682 :
683 : std::function<void(tools::Digraph_node<Sub_sequence>*, std::vector<const int*>&)> exec_sequence =
684 0 : [&exec_sequence](tools::Digraph_node<Sub_sequence>* cur_ss, std::vector<const int*>& statuses)
685 : {
686 0 : auto type = cur_ss->get_c()->type;
687 0 : auto& tasks_id = cur_ss->get_c()->tasks_id;
688 0 : auto& processes = cur_ss->get_c()->processes;
689 :
690 0 : if (type == subseq_t::COMMUTE)
691 : {
692 0 : statuses[tasks_id[0]] = processes[0]();
693 0 : const int path = statuses[tasks_id[0]][0];
694 0 : if (cur_ss->get_children().size() > (size_t)path) exec_sequence(cur_ss->get_children()[path], statuses);
695 : }
696 : else
697 : {
698 0 : for (size_t p = 0; p < processes.size(); p++)
699 0 : statuses[tasks_id[p]] = processes[p]();
700 0 : for (auto c : cur_ss->get_children())
701 0 : exec_sequence(c, statuses);
702 : }
703 0 : };
704 :
705 0 : std::vector<const int*> statuses(this->n_tasks, nullptr);
706 : try
707 : {
708 : do
709 : {
710 : // force switchers reset to reinitialize the path to the last input socket
711 0 : for (size_t s = 0; s < this->switchers_reset[tid].size(); s++)
712 0 : this->switchers_reset[tid][s]->reset();
713 :
714 0 : std::fill(statuses.begin(), statuses.end(), nullptr);
715 : try
716 : {
717 0 : exec_sequence(sequence, statuses);
718 : }
719 0 : catch (tools::processing_aborted const&)
720 : {
721 : // do nothing, this is normal
722 0 : }
723 0 : } while (!*force_exit_loop && !stop_condition(statuses) && !tools::Signal_handler::is_sigint());
724 : }
725 0 : catch (tools::waiting_canceled const&)
726 : {
727 : // do nothing, this is normal
728 0 : }
729 0 : catch (std::exception const& e)
730 : {
731 0 : *force_exit_loop = true;
732 :
733 0 : this->mtx_exception->lock();
734 :
735 0 : auto save = tools::exception::no_stacktrace;
736 0 : tools::exception::no_stacktrace = true;
737 0 : std::string msg = e.what(); // get only the function signature
738 0 : tools::exception::no_stacktrace = save;
739 :
740 0 : if (std::find(this->prev_exception_messages.begin(), this->prev_exception_messages.end(), msg) ==
741 0 : this->prev_exception_messages.end())
742 : {
743 0 : this->prev_exception_messages.push_back(msg); // save only the function signature
744 0 : this->prev_exception_messages_to_display.push_back(e.what()); // with stacktrace if debug mode
745 : }
746 0 : else if (std::strlen(e.what()) > this->prev_exception_messages_to_display.back().size())
747 0 : this->prev_exception_messages_to_display[prev_exception_messages_to_display.size() - 1] = e.what();
748 :
749 0 : this->mtx_exception->unlock();
750 0 : }
751 :
752 0 : if (this->is_thread_pinning()) tools::Thread_pinning::unpin();
753 0 : }
754 :
755 : void
756 4172 : Sequence::_exec_without_statuses(const size_t tid,
757 : std::function<bool()>& stop_condition,
758 : tools::Digraph_node<Sub_sequence>* sequence)
759 : {
760 4172 : tools::Signal_handler::reset_sigint();
761 :
762 4165 : if (this->is_thread_pinning())
763 : {
764 42 : if (!puids.empty())
765 0 : tools::Thread_pinning::pin(this->puids[tid]);
766 : else
767 41 : tools::Thread_pinning::pin(this->pin_objects_per_thread[tid]);
768 : }
769 :
770 : std::function<void(tools::Digraph_node<Sub_sequence>*)> exec_sequence =
771 1157201 : [&exec_sequence](tools::Digraph_node<Sub_sequence>* cur_ss)
772 : {
773 966933 : auto type = cur_ss->get_c()->type;
774 967210 : auto& processes = cur_ss->get_c()->processes;
775 :
776 967915 : if (type == subseq_t::COMMUTE)
777 : {
778 53337 : const int path = processes[0]()[0];
779 53400 : if (cur_ss->get_children().size() > (size_t)path) exec_sequence(cur_ss->get_children()[path]);
780 : }
781 : else
782 : {
783 3970447 : for (auto& process : processes)
784 3007561 : process();
785 917970 : for (auto c : cur_ss->get_children())
786 136850 : exec_sequence(c);
787 : }
788 964359 : };
789 :
790 : try
791 : {
792 : do
793 : {
794 : // force switchers reset to reinitialize the path to the last input socket
795 796342 : for (size_t s = 0; s < this->switchers_reset[tid].size(); s++)
796 12920 : this->switchers_reset[tid][s]->reset();
797 :
798 : try
799 : {
800 783188 : exec_sequence(sequence);
801 : }
802 2566 : catch (tools::processing_aborted const&)
803 : {
804 : // do nothing, this is normal
805 4 : }
806 779939 : } while (!*force_exit_loop && !stop_condition() && !tools::Signal_handler::is_sigint());
807 : }
808 2561 : catch (tools::waiting_canceled const&)
809 : {
810 : // do nothing, this is normal
811 2552 : }
812 0 : catch (std::exception const& e)
813 : {
814 0 : *force_exit_loop = true;
815 :
816 0 : this->mtx_exception->lock();
817 :
818 0 : auto save = tools::exception::no_stacktrace;
819 0 : tools::exception::no_stacktrace = true;
820 0 : std::string msg = e.what(); // get only the function signature
821 0 : tools::exception::no_stacktrace = save;
822 :
823 0 : if (std::find(this->prev_exception_messages.begin(), this->prev_exception_messages.end(), msg) ==
824 0 : this->prev_exception_messages.end())
825 : {
826 0 : this->prev_exception_messages.push_back(msg); // save only the function signature
827 0 : this->prev_exception_messages_to_display.push_back(e.what()); // with stacktrace if debug mode
828 : }
829 0 : else if (std::strlen(e.what()) > this->prev_exception_messages_to_display.back().size())
830 0 : this->prev_exception_messages_to_display[prev_exception_messages_to_display.size() - 1] = e.what();
831 :
832 0 : this->mtx_exception->unlock();
833 0 : }
834 :
835 4750 : if (this->is_thread_pinning()) tools::Thread_pinning::unpin();
836 4575 : }
837 :
838 : void
839 0 : Sequence::exec(std::function<bool(const std::vector<const int*>&)> stop_condition)
840 : {
841 0 : if (this->is_no_copy_mode()) this->gen_processes(true);
842 :
843 0 : std::function<bool(const std::vector<const int*>&)> real_stop_condition;
844 0 : if (this->auto_stop)
845 0 : real_stop_condition = [this, stop_condition](const std::vector<const int*>& statuses)
846 : {
847 0 : bool res = stop_condition(statuses);
848 0 : return res || this->is_done();
849 0 : };
850 : else
851 0 : real_stop_condition = stop_condition;
852 :
853 0 : std::function<void(const size_t)> func_exec = [this, &real_stop_condition](const size_t tid)
854 0 : { this->Sequence::_exec(tid + 1, real_stop_condition, this->sequences[tid + 1]); };
855 :
856 0 : this->thread_pool->run(func_exec, true);
857 0 : this->_exec(0, real_stop_condition, this->sequences[0]);
858 0 : this->thread_pool->wait();
859 :
860 0 : this->thread_pool->unset_func_exec();
861 :
862 0 : if (this->is_no_copy_mode() && !this->is_part_of_pipeline)
863 : {
864 0 : this->reset_no_copy_mode();
865 0 : this->gen_processes(false);
866 : }
867 :
868 0 : if (!this->prev_exception_messages_to_display.empty())
869 : {
870 0 : *force_exit_loop = false;
871 0 : throw std::runtime_error(this->prev_exception_messages_to_display.back());
872 : }
873 0 : }
874 :
875 : void
876 400 : Sequence::exec(std::function<bool()> stop_condition)
877 : {
878 400 : if (this->is_no_copy_mode()) this->gen_processes(true);
879 :
880 417 : std::function<bool()> real_stop_condition;
881 417 : if (this->auto_stop)
882 1552351 : real_stop_condition = [this, stop_condition]()
883 : {
884 774516 : bool res = stop_condition();
885 779749 : return res || this->is_done();
886 406 : };
887 : else
888 11 : real_stop_condition = stop_condition;
889 :
890 7435 : std::function<void(const size_t)> func_exec = [this, &real_stop_condition](const size_t tid)
891 4112 : { this->Sequence::_exec_without_statuses(tid + 1, real_stop_condition, this->sequences[tid + 1]); };
892 :
893 417 : this->thread_pool->run(func_exec, true);
894 417 : this->_exec_without_statuses(0, real_stop_condition, this->sequences[0]);
895 401 : this->thread_pool->wait();
896 :
897 412 : this->thread_pool->unset_func_exec();
898 :
899 406 : if (this->is_no_copy_mode() && !this->is_part_of_pipeline)
900 : {
901 73 : this->reset_no_copy_mode();
902 73 : this->gen_processes(false);
903 : }
904 :
905 406 : if (!this->prev_exception_messages_to_display.empty())
906 : {
907 0 : *force_exit_loop = false;
908 0 : throw std::runtime_error(this->prev_exception_messages_to_display.back());
909 : }
910 408 : }
911 :
912 : void
913 221 : Sequence::exec()
914 : {
915 562982 : this->exec([]() { return false; });
916 237 : }
917 :
918 : void
919 56 : Sequence::exec_seq(const size_t tid, const int frame_id)
920 : {
921 56 : if (tid >= this->sequences.size())
922 : {
923 0 : std::stringstream message;
924 0 : message << "'tid' has to be smaller than 'sequences.size()' ('tid' = " << tid
925 0 : << ", 'sequences.size()' = " << this->sequences.size() << ").";
926 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
927 0 : }
928 :
929 : std::function<void(tools::Digraph_node<Sub_sequence>*)> exec_sequence =
930 422 : [&exec_sequence, frame_id](tools::Digraph_node<Sub_sequence>* cur_ss)
931 : {
932 56 : auto type = cur_ss->get_c()->type;
933 56 : auto& tasks = cur_ss->get_c()->tasks;
934 56 : if (type == subseq_t::COMMUTE)
935 : {
936 0 : const int path = tasks[0]->exec(frame_id)[0];
937 0 : if (cur_ss->get_children().size() > (size_t)path) exec_sequence(cur_ss->get_children()[path]);
938 : }
939 : else
940 : {
941 427 : for (size_t ta = 0; ta < tasks.size(); ta++)
942 366 : tasks[ta]->exec(frame_id);
943 61 : for (auto c : cur_ss->get_children())
944 0 : exec_sequence(c);
945 : }
946 118 : };
947 :
948 56 : exec_sequence(this->sequences[tid]);
949 62 : }
950 :
951 : runtime::Task*
952 75559 : Sequence::exec_step(const size_t tid, const int frame_id)
953 : {
954 75559 : if (tid >= this->sequences.size())
955 : {
956 0 : std::stringstream message;
957 0 : message << "'tid' has to be smaller than 'sequences.size()' ('tid' = " << tid
958 0 : << ", 'sequences.size()' = " << this->sequences.size() << ").";
959 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
960 0 : }
961 :
962 75559 : runtime::Task* executed_task = nullptr;
963 75559 : if (this->next_round_is_over[tid])
964 : {
965 3410 : this->next_round_is_over[tid] = false;
966 3410 : this->cur_ss[tid] = this->sequences[tid];
967 3410 : this->cur_task_id[tid] = 0;
968 : }
969 : else
970 : {
971 72149 : executed_task = this->cur_ss[tid]->get_c()->tasks[cur_task_id[tid]];
972 72149 : const std::vector<int>& ret = executed_task->exec(frame_id);
973 :
974 72148 : auto type = this->cur_ss[tid]->get_c()->type;
975 72148 : if (type == subseq_t::COMMUTE)
976 : {
977 9515 : const size_t path = (size_t)ret[0];
978 9515 : if (this->cur_ss[tid]->get_children().size() > path)
979 : {
980 9515 : this->cur_ss[tid] = this->cur_ss[tid]->get_children()[path];
981 9515 : this->cur_task_id[tid] = 0;
982 :
983 9515 : if (this->cur_ss[tid]->get_c()->tasks.size() == 0)
984 : {
985 : // skip nodes without tasks if any
986 0 : while (this->cur_ss[tid]->get_children().size() > 0)
987 : {
988 0 : this->cur_ss[tid] = this->cur_ss[tid]->get_children()[0];
989 0 : this->cur_task_id[tid] = 0;
990 0 : if (this->cur_ss[tid]->get_c() && this->cur_ss[tid]->get_c()->tasks.size() > 0) break;
991 : }
992 0 : if (this->cur_task_id[tid] >= this->cur_ss[tid]->get_c()->tasks.size())
993 0 : this->next_round_is_over[tid] = true;
994 : }
995 : }
996 : else
997 : {
998 0 : std::stringstream message;
999 0 : message << "This should never happen ('path' = " << path
1000 0 : << ", 'cur_ss[tid]->get_children().size()' = " << this->cur_ss[tid]->get_children().size()
1001 0 : << ", 'tid' = " << tid << ").";
1002 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
1003 0 : }
1004 : }
1005 : else
1006 : {
1007 62633 : this->cur_task_id[tid]++;
1008 62633 : if (this->cur_task_id[tid] == (this->cur_ss[tid]->get_c()->tasks.size()))
1009 : {
1010 : // skip nodes without tasks if any
1011 28019 : while (this->cur_ss[tid]->get_children().size() > 0)
1012 : {
1013 24609 : this->cur_ss[tid] = this->cur_ss[tid]->get_children()[0];
1014 24609 : this->cur_task_id[tid] = 0;
1015 24609 : if (this->cur_ss[tid]->get_c() && this->cur_ss[tid]->get_c()->tasks.size() > 0) break;
1016 : }
1017 28019 : if (this->cur_task_id[tid] >= this->cur_ss[tid]->get_c()->tasks.size())
1018 3410 : this->next_round_is_over[tid] = true;
1019 : }
1020 : }
1021 : }
1022 :
1023 75558 : return executed_task;
1024 : }
1025 :
1026 : template<class SS, class TA>
1027 : tools::Digraph_node<SS>*
1028 3038 : Sequence::init_recursive(tools::Digraph_node<SS>* cur_subseq,
1029 : size_t& ssid,
1030 : size_t& taid,
1031 : std::vector<std::pair<TA*, tools::Digraph_node<SS>*>>& selectors,
1032 : std::vector<TA*>& switchers,
1033 : TA& first,
1034 : TA& current_task,
1035 : const std::vector<TA*>& lasts,
1036 : const std::vector<TA*>& exclusions,
1037 : std::vector<size_t>& real_lasts_id,
1038 : std::vector<TA*>& real_lasts,
1039 : std::map<TA*, unsigned>& in_sockets_feed,
1040 : std::map<TA*, std::pair<tools::Digraph_node<SS>*, size_t>>& task_subseq)
1041 : {
1042 3038 : if (this->tasks_inplace && !current_task.is_autoalloc())
1043 : {
1044 0 : std::stringstream message;
1045 : message << "When 'tasks_inplace' is set to true 'current_task' should be in autoalloc mode ("
1046 : << "'current_task.get_name()'"
1047 : << " = " << current_task.get_name() << ", "
1048 : << "'current_task.get_module().get_name()'"
1049 0 : << " = " << current_task.get_module().get_name() << ").";
1050 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
1051 0 : }
1052 :
1053 3038 : if (dynamic_cast<const module::Adaptor_m_to_n*>(¤t_task.get_module()) && !this->tasks_inplace)
1054 : {
1055 0 : std::stringstream message;
1056 0 : message << "'module::Adaptor_m_to_n' objects are not supported if 'tasks_inplace' is set to false.";
1057 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
1058 0 : }
1059 :
1060 3038 : auto it = std::find(real_lasts.begin(), real_lasts.end(), ¤t_task);
1061 3038 : if (it != real_lasts.end())
1062 : {
1063 0 : real_lasts.erase(it);
1064 0 : auto dist = std::distance(real_lasts.begin(), it);
1065 0 : real_lasts_id.erase(real_lasts_id.begin() + dist);
1066 : }
1067 :
1068 3038 : if (cur_subseq->get_contents() == nullptr)
1069 : {
1070 580 : cur_subseq->set_contents(new SS());
1071 580 : ssid++;
1072 : }
1073 :
1074 3038 : bool is_last = true;
1075 3038 : tools::Digraph_node<SS>* last_subseq = nullptr;
1076 :
1077 3038 : if (auto switcher = dynamic_cast<const module::Switcher*>(¤t_task.get_module()))
1078 : {
1079 148 : const auto current_task_name = current_task.get_name();
1080 222 : if (current_task_name == "commute" && // -------------------------------------------------------------- COMMUTE
1081 222 : std::find(switchers.begin(), switchers.end(), ¤t_task) == switchers.end())
1082 : {
1083 74 : switchers.push_back(¤t_task);
1084 74 : auto node_commute = new tools::Digraph_node<SS>({ cur_subseq }, {}, nullptr, cur_subseq->get_depth() + 1);
1085 :
1086 74 : node_commute->set_contents(new SS());
1087 74 : node_commute->get_c()->tasks.push_back(¤t_task);
1088 74 : node_commute->get_c()->tasks_id.push_back(taid++);
1089 74 : node_commute->get_c()->type = subseq_t::COMMUTE;
1090 74 : ssid++;
1091 :
1092 74 : cur_subseq->add_child(node_commute);
1093 :
1094 232 : for (size_t sdo = 0; sdo < switcher->get_n_data_sockets(); sdo++)
1095 : {
1096 316 : auto node_commute_son =
1097 158 : new tools::Digraph_node<SS>({ node_commute }, {}, nullptr, node_commute->get_depth() + 1);
1098 :
1099 158 : node_commute_son->set_contents(new SS());
1100 158 : ssid++;
1101 :
1102 158 : node_commute->add_child(node_commute_son);
1103 :
1104 158 : auto& bss = (*switcher)[module::swi::tsk::commute].sockets[sdo + 2]->get_bound_sockets();
1105 322 : for (auto bs : bss)
1106 : {
1107 164 : if (bs == nullptr) continue;
1108 164 : auto& t = bs->get_task();
1109 164 : if (std::find(exclusions.begin(), exclusions.end(), &t) == exclusions.end())
1110 : {
1111 164 : if (task_subseq.find(&t) == task_subseq.end() || task_subseq[&t].second < ssid)
1112 158 : task_subseq[&t] = { node_commute_son, ssid };
1113 :
1114 164 : in_sockets_feed.find(&t) != in_sockets_feed.end() ? in_sockets_feed[&t]++
1115 124 : : in_sockets_feed[&t] = 1;
1116 164 : bool t_is_select =
1117 164 : dynamic_cast<const module::Switcher*>(&(t.get_module())) && t.get_name() == "select";
1118 114 : if ((!t_is_select && in_sockets_feed[&t] >= (t.get_n_input_sockets() + t.get_n_fwd_sockets()) -
1119 328 : t.get_n_static_input_sockets()) ||
1120 50 : (t_is_select && t.is_last_input_socket(*bs)))
1121 : {
1122 130 : is_last = false;
1123 130 : last_subseq = Sequence::init_recursive<SS, TA>(task_subseq[&t].first,
1124 260 : task_subseq[&t].second,
1125 : taid,
1126 : selectors,
1127 : switchers,
1128 : first,
1129 : t,
1130 : lasts,
1131 : exclusions,
1132 : real_lasts_id,
1133 : real_lasts,
1134 : in_sockets_feed,
1135 : task_subseq);
1136 : }
1137 34 : else if (t_is_select)
1138 : {
1139 34 : tools::Digraph_node<SS>* node_selector = nullptr;
1140 40 : for (auto& sel : selectors)
1141 40 : if (sel.first == &t)
1142 : {
1143 34 : node_selector = sel.second;
1144 34 : break;
1145 : }
1146 :
1147 34 : if (!node_selector)
1148 : {
1149 0 : node_selector = new tools::Digraph_node<SS>(
1150 0 : { node_commute_son }, {}, nullptr, node_commute_son->get_depth() + 1);
1151 0 : selectors.push_back({ &t, node_selector });
1152 : }
1153 : else
1154 34 : node_selector->add_parent(node_commute_son);
1155 34 : node_commute_son->add_child(node_selector);
1156 : }
1157 : }
1158 : }
1159 : }
1160 : // exception for the status socket
1161 : auto& bss =
1162 74 : (*switcher)[module::swi::tsk::commute].sockets[switcher->get_n_data_sockets() + 2]->get_bound_sockets();
1163 74 : for (auto bs : bss)
1164 : {
1165 0 : if (bs == nullptr) continue;
1166 0 : auto& t = bs->get_task();
1167 0 : if (std::find(exclusions.begin(), exclusions.end(), &t) == exclusions.end())
1168 : {
1169 0 : if (task_subseq.find(&t) == task_subseq.end() || task_subseq[&t].second < ssid)
1170 0 : task_subseq[&t] = { node_commute, ssid };
1171 :
1172 0 : in_sockets_feed.find(&t) != in_sockets_feed.end() ? in_sockets_feed[&t]++ : in_sockets_feed[&t] = 1;
1173 0 : bool t_is_select =
1174 0 : dynamic_cast<const module::Switcher*>(&(t.get_module())) && t.get_name() == "select";
1175 0 : if ((!t_is_select && in_sockets_feed[&t] >= (t.get_n_input_sockets() + t.get_n_fwd_sockets()) -
1176 0 : t.get_n_static_input_sockets()) ||
1177 0 : (t_is_select && t.is_last_input_socket(*bs)))
1178 : {
1179 0 : is_last = false;
1180 0 : last_subseq = Sequence::init_recursive<SS, TA>(task_subseq[&t].first,
1181 0 : task_subseq[&t].second,
1182 : taid,
1183 : selectors,
1184 : switchers,
1185 : first,
1186 : t,
1187 : lasts,
1188 : exclusions,
1189 : real_lasts_id,
1190 : real_lasts,
1191 : in_sockets_feed,
1192 : task_subseq);
1193 : }
1194 : }
1195 : }
1196 : }
1197 74 : else if (current_task_name == "select") // ------------------------------------------------------------- SELECT
1198 : {
1199 74 : tools::Digraph_node<SS>* node_selector = nullptr;
1200 :
1201 96 : for (auto& sel : selectors)
1202 40 : if (sel.first == ¤t_task)
1203 : {
1204 18 : node_selector = sel.second;
1205 18 : break;
1206 : }
1207 :
1208 74 : if (!node_selector)
1209 : {
1210 56 : node_selector = new tools::Digraph_node<SS>({ cur_subseq }, {}, nullptr, cur_subseq->get_depth() + 1);
1211 56 : selectors.push_back({ ¤t_task, node_selector });
1212 : }
1213 :
1214 74 : node_selector->set_contents(new SS());
1215 74 : node_selector->get_c()->tasks.push_back(¤t_task);
1216 74 : node_selector->get_c()->tasks_id.push_back(taid++);
1217 74 : node_selector->get_c()->type = subseq_t::SELECT;
1218 74 : ssid++;
1219 :
1220 74 : cur_subseq->add_child(node_selector);
1221 :
1222 148 : auto node_selector_son =
1223 74 : new tools::Digraph_node<SS>({ node_selector }, {}, nullptr, node_selector->get_depth() + 1);
1224 :
1225 74 : node_selector_son->set_contents(new SS());
1226 74 : ssid++;
1227 :
1228 74 : node_selector->add_child(node_selector_son);
1229 :
1230 528 : for (auto& s : current_task.sockets)
1231 : {
1232 306 : if (!(s->get_type() == socket_t::SOUT)) continue;
1233 148 : auto bss = s->get_bound_sockets();
1234 272 : for (auto bs : bss)
1235 : {
1236 124 : if (bs == nullptr) continue;
1237 124 : auto& t = bs->get_task();
1238 124 : if (std::find(exclusions.begin(), exclusions.end(), &t) == exclusions.end())
1239 : {
1240 124 : if (task_subseq.find(&t) == task_subseq.end() || task_subseq[&t].second < ssid)
1241 124 : task_subseq[&t] = { node_selector_son, ssid };
1242 :
1243 124 : in_sockets_feed.find(&t) != in_sockets_feed.end() ? in_sockets_feed[&t]++
1244 124 : : in_sockets_feed[&t] = 1;
1245 124 : bool t_is_select =
1246 124 : dynamic_cast<const module::Switcher*>(&(t.get_module())) && t.get_name() == "select";
1247 :
1248 118 : if ((!t_is_select && in_sockets_feed[&t] >= (t.get_n_input_sockets() + t.get_n_fwd_sockets()) -
1249 248 : t.get_n_static_input_sockets()) ||
1250 6 : (t_is_select && t.is_last_input_socket(*bs)))
1251 : {
1252 86 : is_last = false;
1253 86 : last_subseq = Sequence::init_recursive<SS, TA>(task_subseq[&t].first,
1254 172 : task_subseq[&t].second,
1255 : taid,
1256 : selectors,
1257 : switchers,
1258 : first,
1259 : t,
1260 : lasts,
1261 : exclusions,
1262 : real_lasts_id,
1263 : real_lasts,
1264 : in_sockets_feed,
1265 : task_subseq);
1266 : }
1267 38 : else if (t_is_select)
1268 : {
1269 0 : tools::Digraph_node<SS>* node_selector = nullptr;
1270 0 : for (auto& sel : selectors)
1271 0 : if (sel.first == &t)
1272 : {
1273 0 : node_selector = sel.second;
1274 0 : break;
1275 : }
1276 :
1277 0 : if (!node_selector)
1278 : {
1279 0 : node_selector = new tools::Digraph_node<SS>(
1280 0 : { node_selector_son }, {}, nullptr, node_selector_son->get_depth() + 1);
1281 0 : selectors.push_back({ &t, node_selector });
1282 : }
1283 0 : node_selector->add_parent(node_selector_son);
1284 0 : node_selector_son->add_child(node_selector);
1285 : }
1286 : }
1287 : }
1288 : }
1289 : }
1290 148 : }
1291 : else // --------------------------------------------------------------------------------------------- STANDARD CASE
1292 : {
1293 2890 : cur_subseq->get_c()->tasks.push_back(¤t_task);
1294 2890 : cur_subseq->get_c()->tasks_id.push_back(taid++);
1295 :
1296 2890 : if (std::find(lasts.begin(), lasts.end(), ¤t_task) == lasts.end())
1297 : {
1298 8542 : for (auto& s : current_task.sockets)
1299 : {
1300 6091 : if (s->get_type() == socket_t::SOUT || s->get_type() == socket_t::SFWD)
1301 : {
1302 4483 : auto bss = s->get_bound_sockets();
1303 6955 : for (auto& bs : bss)
1304 : {
1305 2472 : if (bs == nullptr) continue;
1306 2472 : auto& t = bs->get_task();
1307 2472 : if (std::find(exclusions.begin(), exclusions.end(), &t) == exclusions.end())
1308 : {
1309 2448 : if (task_subseq.find(&t) == task_subseq.end() || task_subseq[&t].second < ssid)
1310 2187 : task_subseq[&t] = { cur_subseq, ssid };
1311 :
1312 2448 : in_sockets_feed.find(&t) != in_sockets_feed.end() ? in_sockets_feed[&t]++
1313 2137 : : in_sockets_feed[&t] = 1;
1314 2448 : bool t_is_select =
1315 2448 : dynamic_cast<const module::Switcher*>(&(t.get_module())) && t.get_name() == "select";
1316 7242 : if ((!t_is_select &&
1317 2346 : in_sockets_feed[&t] >= (t.get_n_input_sockets() + t.get_n_fwd_sockets()) -
1318 4896 : t.get_n_static_input_sockets()) ||
1319 102 : (t_is_select && t.is_last_input_socket(*bs)))
1320 : {
1321 2087 : is_last = false;
1322 2087 : last_subseq = Sequence::init_recursive<SS, TA>(task_subseq[&t].first,
1323 4174 : task_subseq[&t].second,
1324 : taid,
1325 : selectors,
1326 : switchers,
1327 : first,
1328 : t,
1329 : lasts,
1330 : exclusions,
1331 : real_lasts_id,
1332 : real_lasts,
1333 : in_sockets_feed,
1334 : task_subseq);
1335 : }
1336 361 : else if (t_is_select)
1337 : {
1338 50 : tools::Digraph_node<SS>* node_selector = nullptr;
1339 66 : for (auto& sel : selectors)
1340 48 : if (sel.first == &t)
1341 : {
1342 32 : node_selector = sel.second;
1343 32 : break;
1344 : }
1345 :
1346 50 : if (!node_selector)
1347 : {
1348 36 : node_selector = new tools::Digraph_node<SS>(
1349 18 : { cur_subseq }, {}, nullptr, cur_subseq->get_depth() + 1);
1350 18 : selectors.push_back({ &t, node_selector });
1351 : }
1352 50 : node_selector->add_parent(cur_subseq);
1353 50 : cur_subseq->add_child(node_selector);
1354 : }
1355 : }
1356 : }
1357 4483 : }
1358 1608 : else if (s->get_type() == socket_t::SIN && s->get_bound_sockets().size() > 1)
1359 : {
1360 0 : std::stringstream message;
1361 : message << "'s->get_bound_sockets().size()' has to be smaller or equal to 1 ("
1362 : << "'s->get_bound_sockets().size()'"
1363 0 : << " = " << s->get_bound_sockets().size() << ", "
1364 : << "'s->get_type()'"
1365 : << " = "
1366 : << "socket_t::SIN"
1367 : << ", "
1368 : << "'s->get_name()'"
1369 0 : << " = " << s->get_name() << ", "
1370 : << "'s->get_task().get_name()'"
1371 0 : << " = " << s->get_task().get_name() << ", "
1372 : << "'s->get_task().get_module().get_name()'"
1373 0 : << " = " << s->get_task().get_module().get_name() << ").";
1374 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
1375 0 : }
1376 : }
1377 : }
1378 : }
1379 :
1380 3038 : if (is_last && std::find(real_lasts.begin(), real_lasts.end(), ¤t_task) == real_lasts.end())
1381 : {
1382 935 : real_lasts.push_back(¤t_task);
1383 935 : real_lasts_id.push_back(cur_subseq->get_contents()->tasks_id.back());
1384 : }
1385 :
1386 3038 : if (last_subseq)
1387 2103 : return last_subseq;
1388 : else
1389 935 : return cur_subseq;
1390 : }
1391 :
1392 : template<class SS, class MO>
1393 : void
1394 580 : Sequence::replicate(const tools::Digraph_node<SS>* sequence)
1395 : {
1396 580 : std::set<MO*> modules_set;
1397 580 : std::vector<const runtime::Task*> tsks_vec; // get a vector of tasks included in the tasks graph
1398 : std::function<void(const tools::Digraph_node<SS>*, std::vector<const tools::Digraph_node<SS>*>&)>
1399 580 : collect_modules_list;
1400 580 : collect_modules_list =
1401 988 : [&](const tools::Digraph_node<SS>* node, std::vector<const tools::Digraph_node<SS>*>& already_parsed_nodes)
1402 : {
1403 1976 : if (node != nullptr &&
1404 1976 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), node) == already_parsed_nodes.end())
1405 : {
1406 904 : already_parsed_nodes.push_back(node);
1407 904 : tsks_vec.insert(tsks_vec.end(), node->get_c()->tasks.begin(), node->get_c()->tasks.end());
1408 904 : if (node->get_c())
1409 3942 : for (auto ta : node->get_c()->tasks)
1410 3038 : modules_set.insert(&ta->get_module());
1411 1312 : for (auto c : node->get_children())
1412 408 : collect_modules_list(c, already_parsed_nodes);
1413 : }
1414 : };
1415 580 : std::vector<const tools::Digraph_node<SS>*> already_parsed_nodes;
1416 580 : collect_modules_list(sequence, already_parsed_nodes);
1417 :
1418 : // check if all the tasks of the sequence are replicable before to perform the modules clones
1419 580 : if (this->n_threads - (this->tasks_inplace ? 1 : 0))
1420 1549 : for (auto& t : tsks_vec)
1421 1330 : if (!t->is_replicable())
1422 : {
1423 0 : std::stringstream message;
1424 : message << "It is not possible to replicate this sequence because at least one of its tasks is not "
1425 : << "replicable (t->get_name() = '" << t->get_name() << "', t->get_module().get_name() = '"
1426 0 : << t->get_module().get_name() << "').";
1427 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
1428 0 : }
1429 :
1430 580 : std::vector<MO*> modules_vec;
1431 3533 : for (auto m : modules_set)
1432 2953 : modules_vec.push_back(m);
1433 :
1434 : // clone the modules
1435 5397 : for (size_t tid = 0; tid < this->n_threads - (this->tasks_inplace ? 1 : 0); tid++)
1436 : {
1437 4817 : if (this->is_thread_pinning())
1438 : {
1439 13 : const auto real_tid = tid + (this->tasks_inplace ? 1 : 0);
1440 13 : if (!this->puids.empty())
1441 0 : tools::Thread_pinning::pin(this->puids[real_tid]);
1442 : else
1443 13 : tools::Thread_pinning::pin(this->pin_objects_per_thread[real_tid]);
1444 : }
1445 :
1446 4817 : this->modules[tid].resize(modules_vec.size());
1447 4817 : this->all_modules[tid + (this->tasks_inplace ? 1 : 0)].resize(modules_vec.size());
1448 39314 : for (size_t m = 0; m < modules_vec.size(); m++)
1449 : {
1450 : try
1451 : {
1452 34497 : this->modules[tid][m].reset(modules_vec[m]->clone());
1453 : }
1454 0 : catch (std::exception& e)
1455 : {
1456 0 : std::stringstream message;
1457 : message << "Module clone failed when trying to replicate the sequence (module name is '"
1458 0 : << modules_vec[m]->get_name() << "').";
1459 :
1460 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
1461 0 : }
1462 34497 : this->all_modules[tid + (this->tasks_inplace ? 1 : 0)][m] = this->modules[tid][m].get();
1463 : }
1464 :
1465 4817 : if (this->is_thread_pinning()) tools::Thread_pinning::unpin();
1466 : }
1467 :
1468 76455 : auto get_module_id = [](const std::vector<MO*>& modules, const module::Module& module)
1469 : {
1470 : int m_id;
1471 420265 : for (m_id = 0; m_id < (int)modules.size(); m_id++)
1472 417208 : if (modules[m_id] == &module) return m_id;
1473 3057 : return -1;
1474 : };
1475 :
1476 73209 : auto get_task_id = [](const std::vector<std::shared_ptr<runtime::Task>>& tasks, const runtime::Task& task)
1477 : {
1478 : int t_id;
1479 88042 : for (t_id = 0; t_id < (int)tasks.size(); t_id++)
1480 88042 : if (tasks[t_id].get() == &task) return t_id;
1481 0 : return -1;
1482 : };
1483 :
1484 36547 : auto get_socket_id = [](const std::vector<std::shared_ptr<runtime::Socket>>& sockets, const runtime::Socket& socket)
1485 : {
1486 : int s_id;
1487 77573 : for (s_id = 0; s_id < (int)sockets.size(); s_id++)
1488 77573 : if (sockets[s_id].get() == &socket) return s_id;
1489 0 : return -1;
1490 : };
1491 :
1492 : std::function<void(const tools::Digraph_node<SS>*,
1493 : tools::Digraph_node<Sub_sequence>*,
1494 : const size_t,
1495 : std::vector<const tools::Digraph_node<SS>*>&,
1496 : std::map<size_t, tools::Digraph_node<Sub_sequence>*>&)>
1497 580 : replicate_sequence;
1498 :
1499 17021 : replicate_sequence = [&](const tools::Digraph_node<SS>* sequence_ref,
1500 : tools::Digraph_node<Sub_sequence>* sequence_cpy,
1501 : const size_t thread_id,
1502 : std::vector<const tools::Digraph_node<SS>*>& already_parsed_nodes,
1503 : std::map<size_t, tools::Digraph_node<Sub_sequence>*>& allocated_nodes)
1504 : {
1505 32882 : if (sequence_ref != nullptr && sequence_ref->get_c() &&
1506 16441 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), sequence_ref) ==
1507 32882 : already_parsed_nodes.end())
1508 : {
1509 13949 : already_parsed_nodes.push_back(sequence_ref);
1510 :
1511 13949 : auto ss_ref = sequence_ref->get_c();
1512 13949 : auto ss_cpy = new Sub_sequence();
1513 :
1514 13949 : ss_cpy->type = ss_ref->type;
1515 13949 : ss_cpy->id = ss_ref->id;
1516 13949 : ss_cpy->tasks_id = ss_ref->tasks_id;
1517 50611 : for (auto t_ref : ss_ref->tasks)
1518 : {
1519 36662 : auto& m_ref = t_ref->get_module();
1520 :
1521 36662 : auto m_id = get_module_id(modules_vec, m_ref);
1522 36662 : auto t_id = get_task_id(m_ref.tasks, *t_ref);
1523 :
1524 36662 : assert(m_id != -1);
1525 36662 : assert(t_id != -1);
1526 :
1527 : // add the task to the sub-sequence
1528 36662 : ss_cpy->tasks.push_back(this->all_modules[thread_id][m_id]->tasks[t_id].get());
1529 :
1530 : // replicate the sockets binding
1531 140961 : for (size_t s_id = 0; s_id < t_ref->sockets.size(); s_id++)
1532 : {
1533 177880 : if (t_ref->sockets[s_id]->get_type() == socket_t::SIN ||
1534 73581 : t_ref->sockets[s_id]->get_type() == socket_t::SFWD)
1535 : {
1536 36269 : const runtime::Socket* s_ref_out = nullptr;
1537 : try
1538 : {
1539 36269 : s_ref_out = &t_ref->sockets[s_id]->get_bound_socket();
1540 : }
1541 48 : catch (...)
1542 : {
1543 : }
1544 36269 : if (s_ref_out)
1545 : {
1546 36221 : auto& t_ref_out = s_ref_out->get_task();
1547 36221 : auto& m_ref_out = t_ref_out.get_module();
1548 :
1549 : // check if `t_ref_out` is included in the tasks graph
1550 36221 : auto t_in_seq = std::find(tsks_vec.begin(), tsks_vec.end(), &t_ref_out) != tsks_vec.end();
1551 36221 : auto m_id_out = get_module_id(modules_vec, m_ref_out);
1552 :
1553 36221 : if (t_in_seq && m_id_out != -1)
1554 : {
1555 33543 : auto t_id_out = get_task_id(m_ref_out.tasks, t_ref_out);
1556 33543 : auto s_id_out = get_socket_id(t_ref_out.sockets, *s_ref_out);
1557 :
1558 33543 : assert(t_id_out != -1);
1559 33543 : assert(s_id_out != -1);
1560 :
1561 33543 : (*this->all_modules[thread_id][m_id_out]).tasks[t_id_out]->set_autoalloc(true);
1562 :
1563 33543 : auto& s_in = *this->all_modules[thread_id][m_id]->tasks[t_id]->sockets[s_id];
1564 : auto& s_out =
1565 33543 : *this->all_modules[thread_id][m_id_out]->tasks[t_id_out]->sockets[s_id_out];
1566 33543 : s_in = s_out;
1567 : }
1568 : }
1569 : }
1570 : }
1571 :
1572 : // replicate the tasks binding
1573 36662 : if (t_ref->fake_input_sockets.size())
1574 : {
1575 7144 : for (auto& fsi : t_ref->fake_input_sockets)
1576 : {
1577 3572 : const runtime::Socket* s_ref_out = nullptr;
1578 : try
1579 : {
1580 3572 : s_ref_out = &fsi->get_bound_socket();
1581 : }
1582 0 : catch (...)
1583 : {
1584 : }
1585 3572 : if (s_ref_out)
1586 : {
1587 3572 : auto& t_ref_out = s_ref_out->get_task();
1588 3572 : auto& m_ref_out = t_ref_out.get_module();
1589 :
1590 : // check if `t_ref_out` is included in the tasks graph
1591 3572 : auto t_in_seq = std::find(tsks_vec.begin(), tsks_vec.end(), &t_ref_out) != tsks_vec.end();
1592 3572 : auto m_id_out = get_module_id(modules_vec, m_ref_out);
1593 :
1594 3572 : if (t_in_seq && m_id_out != -1)
1595 : {
1596 3004 : auto t_id_out = get_task_id(m_ref_out.tasks, t_ref_out);
1597 3004 : auto s_id_out = get_socket_id(t_ref_out.sockets, *s_ref_out);
1598 :
1599 3004 : assert(t_id_out != -1);
1600 3004 : assert(s_id_out != -1);
1601 :
1602 3004 : (*this->all_modules[thread_id][m_id_out]).tasks[t_id_out]->set_autoalloc(true);
1603 :
1604 3004 : auto& t_in = *this->all_modules[thread_id][m_id]->tasks[t_id];
1605 : auto& s_out =
1606 3004 : *this->all_modules[thread_id][m_id_out]->tasks[t_id_out]->sockets[s_id_out];
1607 3004 : t_in = s_out;
1608 : }
1609 : }
1610 : }
1611 : }
1612 : }
1613 :
1614 : // add the sub-sequence to the current tree node
1615 13949 : sequence_cpy->set_contents(ss_cpy);
1616 13949 : allocated_nodes[sequence_cpy->get_c()->id] = sequence_cpy;
1617 :
1618 25573 : for (size_t c = 0; c < sequence_ref->get_children().size(); c++)
1619 : {
1620 11624 : if (std::find(already_parsed_nodes.begin(),
1621 : already_parsed_nodes.end(),
1622 23248 : sequence_ref->get_children()[c]) != already_parsed_nodes.end())
1623 2492 : sequence_cpy->add_child(allocated_nodes[sequence_ref->get_children()[c]->get_c()->id]);
1624 : else
1625 18264 : sequence_cpy->add_child(new tools::Digraph_node<Sub_sequence>(
1626 9132 : { sequence_cpy }, {}, nullptr, sequence_cpy->get_depth() + 1));
1627 : }
1628 :
1629 25573 : for (size_t c = 0; c < sequence_ref->get_children().size(); c++)
1630 11624 : replicate_sequence(sequence_ref->get_children()[c],
1631 11624 : sequence_cpy->get_children()[c],
1632 : thread_id,
1633 : already_parsed_nodes,
1634 : allocated_nodes);
1635 : }
1636 : };
1637 :
1638 : std::function<void(tools::Digraph_node<Sub_sequence>*, std::vector<tools::Digraph_node<Sub_sequence>*>&)>
1639 17021 : set_autoalloc_true = [&set_autoalloc_true](tools::Digraph_node<Sub_sequence>* node,
1640 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
1641 : {
1642 16441 : if (std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), node) == already_parsed_nodes.end())
1643 : {
1644 13949 : already_parsed_nodes.push_back(node);
1645 :
1646 50611 : for (auto t : node->get_c()->tasks)
1647 36662 : t->set_autoalloc(true);
1648 25573 : for (auto c : node->get_children())
1649 11624 : set_autoalloc_true(c, already_parsed_nodes);
1650 : }
1651 : };
1652 :
1653 5397 : for (size_t thread_id = (this->tasks_inplace ? 1 : 0); thread_id < this->sequences.size(); thread_id++)
1654 : {
1655 4817 : if (this->is_thread_pinning())
1656 : {
1657 13 : if (!this->puids.empty())
1658 0 : tools::Thread_pinning::pin(this->puids[thread_id]);
1659 : else
1660 13 : tools::Thread_pinning::pin(this->pin_objects_per_thread[thread_id]);
1661 : }
1662 :
1663 4817 : this->sequences[thread_id] = new tools::Digraph_node<Sub_sequence>({}, {}, nullptr, 0);
1664 4817 : already_parsed_nodes.clear();
1665 4817 : std::map<size_t, tools::Digraph_node<Sub_sequence>*> allocated_nodes;
1666 4817 : replicate_sequence(sequence, this->sequences[thread_id], thread_id, already_parsed_nodes, allocated_nodes);
1667 4817 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes_bis;
1668 4817 : set_autoalloc_true(this->sequences[thread_id], already_parsed_nodes_bis);
1669 :
1670 4817 : if (this->is_thread_pinning()) tools::Thread_pinning::unpin();
1671 : }
1672 580 : }
1673 :
1674 : template void
1675 : runtime::Sequence::replicate<runtime::Sub_sequence_const, const module::Module>(
1676 : const tools::Digraph_node<runtime::Sub_sequence_const>*);
1677 : template void
1678 : runtime::Sequence::replicate<runtime::Sub_sequence, module::Module>(const tools::Digraph_node<runtime::Sub_sequence>*);
1679 :
1680 : template<class SS>
1681 : void
1682 17429 : Sequence::delete_tree(tools::Digraph_node<SS>* node, std::vector<tools::Digraph_node<SS>*>& already_deleted_nodes)
1683 : {
1684 34858 : if (node != nullptr &&
1685 34858 : std::find(already_deleted_nodes.begin(), already_deleted_nodes.end(), node) == already_deleted_nodes.end())
1686 : {
1687 14853 : already_deleted_nodes.push_back(node);
1688 26885 : for (auto c : node->get_children())
1689 12032 : this->delete_tree(c, already_deleted_nodes);
1690 14853 : auto c = node->get_c();
1691 14853 : if (c != nullptr) delete c;
1692 14853 : delete node;
1693 : }
1694 17429 : }
1695 :
1696 : template void
1697 : runtime::Sequence::delete_tree<runtime::Sub_sequence_const>(
1698 : tools::Digraph_node<runtime::Sub_sequence_const>*,
1699 : std::vector<tools::Digraph_node<Sub_sequence_const>*>& already_deleted_nodes);
1700 : template void
1701 : runtime::Sequence::delete_tree<runtime::Sub_sequence>(
1702 : tools::Digraph_node<runtime::Sub_sequence>*,
1703 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_deleted_nodes);
1704 :
1705 : template<class VTA>
1706 : void
1707 67 : Sequence::export_dot_subsequence(const VTA& subseq,
1708 : const std::vector<size_t>& tasks_id,
1709 : const subseq_t& subseq_type,
1710 : const std::string& subseq_name,
1711 : const std::string& tab,
1712 : std::ostream& stream) const
1713 : {
1714 67 : if (!subseq_name.empty())
1715 : {
1716 67 : stream << tab << "subgraph \"cluster_" << subseq_name << "_" << +this << "\" {" << std::endl;
1717 67 : stream << tab << tab << "node [style=filled];" << std::endl;
1718 : }
1719 67 : size_t exec_order = 0;
1720 301 : for (auto& t : subseq)
1721 : {
1722 234 : std::string color = dynamic_cast<module::Adaptor_m_to_n*>(&t->get_module()) ? "green" : "blue";
1723 234 : color = dynamic_cast<module::AProbe*>(&t->get_module()) ? "pink" : color;
1724 234 : stream << tab << tab << "subgraph \"cluster_" << +&t->get_module() << "_" << +t << "\" {" << std::endl;
1725 234 : stream << tab << tab << tab << "node [style=filled];" << std::endl;
1726 234 : stream << tab << tab << tab << "subgraph \"cluster_" << +&t << "\" {" << std::endl;
1727 234 : stream << tab << tab << tab << tab << "node [style=filled];" << std::endl;
1728 :
1729 234 : if (t->fake_input_sockets.size())
1730 : {
1731 47 : std::string stype = "in";
1732 94 : for (auto& fsi : t->fake_input_sockets)
1733 47 : stream << tab << tab << tab << tab << "\"" << +fsi.get() << "\""
1734 47 : << "[label=\"" << stype << ":" << fsi->get_name() << "\", style=filled, "
1735 94 : << "fillcolor=red, penwidth=\"2.0\"];" << std::endl;
1736 47 : }
1737 :
1738 234 : size_t sid = 0;
1739 844 : for (auto& s : t->sockets)
1740 : {
1741 610 : std::string stype;
1742 610 : bool static_input = false;
1743 610 : switch (s->get_type())
1744 : {
1745 170 : case socket_t::SIN:
1746 170 : stype = "in[" + std::to_string(sid) + "]";
1747 170 : static_input = s->_get_dataptr() != nullptr && s->bound_socket == nullptr;
1748 170 : break;
1749 397 : case socket_t::SOUT:
1750 397 : stype = "out[" + std::to_string(sid) + "]";
1751 397 : break;
1752 43 : case socket_t::SFWD:
1753 43 : stype = "fwd[" + std::to_string(sid) + "]";
1754 43 : break;
1755 0 : default:
1756 0 : stype = "unkn";
1757 0 : break;
1758 : }
1759 :
1760 610 : std::string bold_or_not;
1761 610 : if (t->is_last_input_socket(*s)) bold_or_not = ", penwidth=\"2.0\"";
1762 :
1763 610 : stream << tab << tab << tab << tab << "\"" << +s.get() << "\""
1764 610 : << "[label=\"" << stype << ":" << s->get_name() << "\"" << bold_or_not
1765 1220 : << (static_input ? ", style=filled, fillcolor=green" : "") << "];" << std::endl;
1766 610 : sid++;
1767 : }
1768 :
1769 468 : stream << tab << tab << tab << tab << "label=\"" << t->get_name() << " (id = " << tasks_id[exec_order] << ")"
1770 234 : << "\";" << std::endl;
1771 234 : stream << tab << tab << tab << tab << "color=" << (t->is_replicable() ? color : "red") << ";" << std::endl;
1772 234 : stream << tab << tab << tab << "}" << std::endl;
1773 234 : stream << tab << tab << tab << "label=\"" << t->get_module().get_name() << "\n"
1774 468 : << (t->get_module().get_custom_name().empty() ? "" : t->get_module().get_custom_name() + "\n")
1775 702 : << "exec order: [" << exec_order++ << "]\n"
1776 234 : << "addr: " << +&t->get_module() << "\";" << std::endl;
1777 234 : stream << tab << tab << tab << "color=" << color << ";" << std::endl;
1778 234 : stream << tab << tab << "}" << std::endl;
1779 : }
1780 67 : if (!subseq_name.empty())
1781 : {
1782 67 : stream << tab << tab << "label=\"" << subseq_name << "\";" << std::endl;
1783 67 : std::string color = subseq_type == subseq_t::COMMUTE || subseq_type == subseq_t::SELECT ? "red" : "blue";
1784 67 : stream << tab << tab << "color=" << color << ";" << std::endl;
1785 67 : stream << tab << "}" << std::endl;
1786 67 : }
1787 67 : }
1788 :
1789 : template void
1790 : runtime::Sequence::export_dot_subsequence<std::vector<runtime::Task*>>(const std::vector<runtime::Task*>&,
1791 : const std::vector<size_t>&,
1792 : const subseq_t&,
1793 : const std::string&,
1794 : const std::string&,
1795 : std::ostream&) const;
1796 : template void
1797 : runtime::Sequence::export_dot_subsequence<std::vector<const runtime::Task*>>(const std::vector<const runtime::Task*>&,
1798 : const std::vector<size_t>&,
1799 : const subseq_t&,
1800 : const std::string&,
1801 : const std::string&,
1802 : std::ostream&) const;
1803 :
1804 : template<class VTA>
1805 : void
1806 67 : Sequence::export_dot_connections(const VTA& subseq, const std::string& tab, std::ostream& stream) const
1807 : {
1808 301 : for (auto& t : subseq)
1809 : {
1810 844 : for (auto& s : t->sockets)
1811 : {
1812 610 : if (s->get_type() == socket_t::SOUT || s->get_type() == socket_t::SFWD)
1813 : {
1814 440 : auto& bss = s->get_bound_sockets();
1815 440 : size_t id = 0;
1816 700 : for (auto& bs : bss)
1817 : {
1818 260 : stream << tab << "\"" << +s.get() << "\" -> \"" << +bs << "\""
1819 260 : << (bss.size() > 1 ? "[label=\"" + std::to_string(id++) + "\"]" : "") << std::endl;
1820 : }
1821 : }
1822 : }
1823 : }
1824 67 : }
1825 :
1826 : template void
1827 : runtime::Sequence::export_dot_connections<std::vector<runtime::Task*>>(const std::vector<runtime::Task*>&,
1828 : const std::string&,
1829 : std::ostream&) const;
1830 : template void
1831 : runtime::Sequence::export_dot_connections<std::vector<const runtime::Task*>>(const std::vector<const runtime::Task*>&,
1832 : const std::string&,
1833 : std::ostream&) const;
1834 :
1835 : void
1836 12 : Sequence::export_dot(std::ostream& stream) const
1837 : {
1838 12 : auto root = this->sequences[0];
1839 12 : this->export_dot(root, stream);
1840 12 : }
1841 :
1842 : template<class SS>
1843 : void
1844 12 : Sequence::export_dot(tools::Digraph_node<SS>* root, std::ostream& stream) const
1845 : {
1846 : std::function<void(
1847 : tools::Digraph_node<SS>*, const std::string&, std::ostream&, std::vector<tools::Digraph_node<SS>*>&)>
1848 12 : export_dot_subsequences_recursive =
1849 50 : [&export_dot_subsequences_recursive, this](tools::Digraph_node<SS>* cur_node,
1850 : const std::string& tab,
1851 : std::ostream& stream,
1852 : std::vector<tools::Digraph_node<SS>*>& already_parsed_nodes)
1853 : {
1854 100 : if (cur_node != nullptr &&
1855 100 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
1856 : {
1857 42 : already_parsed_nodes.push_back(cur_node);
1858 126 : this->export_dot_subsequence(cur_node->get_c()->tasks,
1859 42 : cur_node->get_c()->tasks_id,
1860 42 : cur_node->get_c()->type,
1861 42 : "Sub-sequence " + std::to_string(cur_node->get_c()->id) +
1862 : " (depth = " + std::to_string(cur_node->get_depth()) + ")",
1863 : tab,
1864 : stream);
1865 :
1866 80 : for (auto c : cur_node->get_children())
1867 38 : export_dot_subsequences_recursive(c, tab, stream, already_parsed_nodes);
1868 : }
1869 : };
1870 :
1871 : std::function<void(
1872 : tools::Digraph_node<SS>*, const std::string&, std::ostream&, std::vector<tools::Digraph_node<SS>*>&)>
1873 12 : export_dot_connections_recursive =
1874 50 : [&export_dot_connections_recursive, this](tools::Digraph_node<SS>* cur_node,
1875 : const std::string& tab,
1876 : std::ostream& stream,
1877 : std::vector<tools::Digraph_node<SS>*>& already_parsed_nodes)
1878 : {
1879 100 : if (cur_node != nullptr &&
1880 100 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
1881 : {
1882 42 : already_parsed_nodes.push_back(cur_node);
1883 42 : this->export_dot_connections(cur_node->get_c()->tasks, tab, stream);
1884 :
1885 80 : for (auto c : cur_node->get_children())
1886 38 : export_dot_connections_recursive(c, tab, stream, already_parsed_nodes);
1887 : }
1888 : };
1889 :
1890 12 : std::string tab = "\t";
1891 12 : stream << "digraph Sequence {" << std::endl;
1892 12 : std::vector<tools::Digraph_node<SS>*> already_parsed_nodes;
1893 12 : export_dot_subsequences_recursive(root, tab, stream, already_parsed_nodes);
1894 12 : already_parsed_nodes.clear();
1895 12 : export_dot_connections_recursive(root, tab, stream, already_parsed_nodes);
1896 12 : stream << "}" << std::endl;
1897 12 : }
1898 :
/* Builds the 'processes' list of every sub-sequence node, for each thread's replica of the sequence.
 *
 * Each process is one callable per task, in the order of 'contents->tasks'. It executes the task and returns the
 * data pointer of the task's last socket, read as 'const int*' and named 'status' below.
 *
 * When 'no_copy_mode' is true, four kinds of task are handled specially:
 *   - the "select" and "commute" tasks of a module::Switcher;
 *   - the "pull" and "push" tasks of a module::Adaptor_m_to_n.
 * Each such task is switched to its no-copy variant (set_no_copy_*(true)). It is then wrapped so that, right after
 * exec(), the 'dataptr' of the sockets recorded in 'contents->rebind_sockets[rebind_id]' is rebound on the fly.
 * The data pointers those sockets had when this function ran are saved in the parallel 'contents->rebind_dataptrs'
 * structure; reset_no_copy_mode() writes them back.
 *
 * If thread pinning is enabled, the calling thread is pinned before the replica of each thread is processed and
 * unpinned afterwards. It is pinned to 'puids[i]' when 'puids' is non-empty, otherwise to
 * 'pin_objects_per_thread[i]'. */
void
Sequence::gen_processes(const bool no_copy_mode)
{
    // Appends to 'list_fwd' every socket bound to 'socket' that is not an SOUT and not already in the list.
    // It then recurses through each bound SFWD socket, so a chain of forward sockets is followed to its consumers.
    std::function<void(Socket * socket, std::vector<runtime::Socket*> & list_fwd)> explore_thread_rec =
      [&explore_thread_rec](Socket* socket, std::vector<runtime::Socket*>& list_fwd)
    {
        auto bound = socket->get_bound_sockets();
        for (auto explore_bound : bound)
        {
            if (find(list_fwd.begin(), list_fwd.end(), explore_bound) == list_fwd.end() &&
                explore_bound->get_type() != socket_t::SOUT)
            {
                list_fwd.push_back(explore_bound);
            }
            if (explore_bound->get_type() == socket_t::SFWD) explore_thread_rec(explore_bound, list_fwd);
        }
    };

    // Walks upstream: appends the socket 'socket' is bound to (if not already listed). When that socket is itself
    // an SFWD, both its consumers (forward exploration) and its own upstream binding (recursion) are explored.
    std::function<void(Socket * socket, std::vector<runtime::Socket*> & list_fwd)> explore_thread_rec_reverse =
      [&explore_thread_rec, &explore_thread_rec_reverse](Socket* socket, std::vector<runtime::Socket*>& list_fwd)
    {
        auto bound = &socket->get_bound_socket();
        if (find(list_fwd.begin(), list_fwd.end(), bound) == list_fwd.end())
        {
            list_fwd.push_back(bound);
        }
        if (bound->get_type() == socket_t::SFWD)
        {
            explore_thread_rec(bound, list_fwd);
            explore_thread_rec_reverse(bound, list_fwd);
        }
    };

    // Depth-first walk: each node not yet in 'already_parsed_nodes' gets its processes/rebind tables rebuilt.
    std::function<void(tools::Digraph_node<Sub_sequence>*, std::vector<tools::Digraph_node<Sub_sequence>*>&)>
      gen_processes_recursive =
        [&gen_processes_recursive, no_copy_mode, &explore_thread_rec, &explore_thread_rec_reverse](
          tools::Digraph_node<Sub_sequence>* cur_node,
          std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
    {
        if (cur_node != nullptr &&
            std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
        {
            already_parsed_nodes.push_back(cur_node);

            // task -> replacement callable, for the tasks that need on-the-fly rebinding
            std::map<runtime::Task*, std::function<const int*()>> modified_tasks;
            auto contents = cur_node->get_c();
            contents->processes.clear();
            contents->rebind_sockets.clear();
            contents->rebind_dataptrs.clear();
            for (auto task : contents->tasks)
            {
                // Switcher "select": after exec(), every socket fed by the select output (followed through SFWD
                // sockets) is pointed at the data of the selected input 'sockets[path]'.
                if (dynamic_cast<module::Switcher*>(&task->get_module()) &&
                    task->get_name().find("select") != std::string::npos && no_copy_mode)
                {
                    auto select_task = task;
                    auto switcher = dynamic_cast<module::Switcher*>(&select_task->get_module());
                    switcher->set_no_copy_select(true);

                    // one new rebind slot for this task; 'rebind_id' is captured by the callable below
                    const auto rebind_id = contents->rebind_sockets.size();
                    contents->rebind_sockets.resize(rebind_id + 1);
                    contents->rebind_dataptrs.resize(rebind_id + 1);

                    // the last socket is skipped (it is the one read back as 'status')
                    for (size_t s = 0; s < select_task->sockets.size() - 1; s++)
                    {
                        // there should be only one output socket at this time
                        if (select_task->sockets[s]->get_type() == socket_t::SOUT)
                        {
                            std::vector<runtime::Socket*> bound_sockets;
                            std::vector<void*> dataptrs;

                            for (auto socket : select_task->sockets[s]->get_bound_sockets())
                            {
                                bound_sockets.push_back(socket);
                                if (socket->get_type() == socket_t::SFWD) explore_thread_rec(socket, bound_sockets);
                            }
                            // save the current data pointers so reset_no_copy_mode() can restore them
                            for (auto sck : bound_sockets)
                                dataptrs.push_back(sck->_get_dataptr());

                            contents->rebind_sockets[rebind_id].push_back(bound_sockets);
                            contents->rebind_dataptrs[rebind_id].push_back(dataptrs);
                        }
                    }

                    modified_tasks[select_task] = [contents, select_task, switcher, rebind_id]() -> const int*
                    {
                        select_task->exec();
                        const int* status = select_task->sockets.back()->get_dataptr<int>();

                        const auto path = switcher->get_path();
                        const auto in_dataptr = select_task->sockets[path]->_get_dataptr();

                        // rebind input sockets on the fly
                        // there should be only one output socket at this time (sout_id == 0)
                        for (size_t sout_id = 0; sout_id < contents->rebind_sockets[rebind_id].size(); sout_id++)
                            for (size_t sin_id = 0; sin_id < contents->rebind_sockets[rebind_id][sout_id].size();
                                 sin_id++)
                                contents->rebind_sockets[rebind_id][sout_id][sin_id]->dataptr = in_dataptr;

                        return status;
                    };
                }

                // Switcher "commute": one rebind entry per output socket. After exec(), only the sockets fed by the
                // output selected by 'path' are pointed at the data of input 'sockets[0]'.
                if (dynamic_cast<module::Switcher*>(&task->get_module()) &&
                    task->get_name().find("commute") != std::string::npos && no_copy_mode)
                {
                    auto commute_task = task;
                    auto switcher = dynamic_cast<module::Switcher*>(&commute_task->get_module());
                    switcher->set_no_copy_commute(true);

                    const auto rebind_id = contents->rebind_sockets.size();
                    contents->rebind_sockets.resize(rebind_id + 1);
                    contents->rebind_dataptrs.resize(rebind_id + 1);

                    for (size_t s = 0; s < commute_task->sockets.size() - 1; s++)
                    {
                        if (commute_task->sockets[s]->get_type() == socket_t::SOUT)
                        {
                            std::vector<runtime::Socket*> bound_sockets;
                            std::vector<void*> dataptrs;

                            for (auto socket : commute_task->sockets[s]->get_bound_sockets())
                            {
                                bound_sockets.push_back(socket);
                                if (socket->get_type() == socket_t::SFWD) explore_thread_rec(socket, bound_sockets);
                            }
                            for (auto sck : bound_sockets)
                                dataptrs.push_back(sck->_get_dataptr());

                            contents->rebind_sockets[rebind_id].push_back(bound_sockets);
                            contents->rebind_dataptrs[rebind_id].push_back(dataptrs);
                        }
                    }

                    modified_tasks[commute_task] = [contents, commute_task, switcher, rebind_id]() -> const int*
                    {
                        commute_task->exec();
                        const int* status = commute_task->sockets.back()->get_dataptr<int>();
                        const auto in_dataptr = commute_task->sockets[0]->_get_dataptr();
                        const auto path = switcher->get_path();

                        // rebind input sockets on the fly
                        for (size_t sin_id = 0; sin_id < contents->rebind_sockets[rebind_id][path].size(); sin_id++)
                            contents->rebind_sockets[rebind_id][path][sin_id]->dataptr = in_dataptr;

                        return status;
                    };
                }

                // Adaptor "pull": one rebind entry per output socket, holding
                //   [0]     the output socket itself,
                //   [1..]   the sockets it feeds.
                // After exec(), entry [1] gets the buffer from get_filled_buffer() and entries [2..] share that buffer.
                if (dynamic_cast<module::Adaptor_m_to_n*>(&task->get_module()) &&
                    task->get_name().find("pull") != std::string::npos && no_copy_mode)
                {
                    auto pull_task = task;
                    auto adp_pull = dynamic_cast<module::Adaptor_m_to_n*>(&pull_task->get_module());
                    adp_pull->set_no_copy_pull(true);
                    const auto rebind_id = contents->rebind_sockets.size();
                    contents->rebind_sockets.resize(rebind_id + 1);
                    contents->rebind_dataptrs.resize(rebind_id + 1);

                    for (size_t s = 0; s < pull_task->sockets.size() - 1; s++)
                    {
                        if (pull_task->sockets[s]->get_type() == socket_t::SOUT)
                        {
                            std::vector<runtime::Socket*> bound_sockets;
                            std::vector<void*> dataptrs;

                            bound_sockets.push_back(pull_task->sockets[s].get());
                            for (auto socket : pull_task->sockets[s]->get_bound_sockets())
                            {
                                bound_sockets.push_back(socket);
                                if (socket->get_type() == socket_t::SFWD) explore_thread_rec(socket, bound_sockets);
                            }
                            for (auto sck : bound_sockets)
                                dataptrs.push_back(sck->_get_dataptr());

                            contents->rebind_sockets[rebind_id].push_back(bound_sockets);
                            contents->rebind_dataptrs[rebind_id].push_back(dataptrs);
                        }
                    }

                    modified_tasks[pull_task] = [contents, pull_task, adp_pull, rebind_id]() -> const int*
                    {
                        // active or passive waiting here
                        pull_task->exec();
                        const int* status = pull_task->sockets.back()->get_dataptr<int>();

                        // rebind input sockets on the fly
                        // ('sin_id' indexes the rebind entries, i.e. the output sockets of the pull task)
                        for (size_t sin_id = 0; sin_id < contents->rebind_sockets[rebind_id].size(); sin_id++)
                        {
                            // skip output sockets that are not bound to anything
                            if (contents->rebind_sockets[rebind_id][sin_id].size() > 1)
                            {
                                // we start at 1 because the rebinding of the 'pull_task' is made in the
                                // 'pull_task->exec()' call (this way the debug mode is still working)
                                auto swap_buff = contents->rebind_sockets[rebind_id][sin_id][1]->_get_dataptr();
                                auto buff = adp_pull->get_filled_buffer(sin_id, swap_buff);
                                contents->rebind_sockets[rebind_id][sin_id][1]->dataptr = buff;
                                // for the next tasks the same buffer 'buff' is required, an easy mistake is to re-swap
                                // and the data will be false, this is why we just bind 'buff'
                                for (size_t ta = 2; ta < contents->rebind_sockets[rebind_id][sin_id].size(); ta++)
                                    contents->rebind_sockets[rebind_id][sin_id][ta]->dataptr = buff;
                            }
                        }
                        adp_pull->wake_up_pusher();
                        return status;
                    };
                }

                // Adaptor "push": one rebind entry per input socket, holding
                //   [0]     the input socket itself,
                //   [1]     the socket it is bound to,
                //   [2..]   the other sockets sharing that data (forward, then backward through SFWD sockets).
                // After exec(), entry [1] gets the buffer from get_empty_buffer() and entries [2..] share that buffer.
                if (dynamic_cast<module::Adaptor_m_to_n*>(&task->get_module()) &&
                    task->get_name().find("push") != std::string::npos && no_copy_mode)
                {
                    auto push_task = task;
                    auto adp_push = dynamic_cast<module::Adaptor_m_to_n*>(&push_task->get_module());
                    adp_push->set_no_copy_push(true);
                    const auto rebind_id = contents->rebind_sockets.size();
                    contents->rebind_sockets.resize(rebind_id + 1);
                    contents->rebind_dataptrs.resize(rebind_id + 1);

                    for (size_t s = 0; s < push_task->sockets.size() - 1; s++)
                        if (push_task->sockets[s]->get_type() == socket_t::SIN)
                        {
                            std::vector<runtime::Socket*> bound_sockets;
                            std::vector<void*> dataptrs;

                            bound_sockets.push_back(push_task->sockets[s].get());

                            auto bound_socket = &push_task->sockets[s]->get_bound_socket();
                            bound_sockets.push_back(bound_socket);
                            explore_thread_rec(bound_socket, bound_sockets);

                            // If the socket is FWD, we have to update all the other sockets with a backward
                            // exploration
                            if (bound_socket->get_type() == socket_t::SFWD)
                                explore_thread_rec_reverse(bound_socket, bound_sockets);

                            for (auto sck : bound_sockets)
                                dataptrs.push_back(sck->_get_dataptr());

                            contents->rebind_sockets[rebind_id].push_back(bound_sockets);
                            contents->rebind_dataptrs[rebind_id].push_back(dataptrs);
                        }

                    modified_tasks[push_task] = [contents, push_task, adp_push, rebind_id]() -> const int*
                    {
                        // active or passive waiting here
                        push_task->exec();
                        const int* status = push_task->sockets.back()->get_dataptr<int>();
                        // rebind output sockets on the fly
                        for (size_t sout_id = 0; sout_id < contents->rebind_sockets[rebind_id].size(); sout_id++)
                        {
                            // we start at 1 because the rebinding of the 'push_task' is made in the
                            // 'push_task->exec()' call (this way the debug mode is still working)
                            auto swap_buff = contents->rebind_sockets[rebind_id][sout_id][1]->_get_dataptr();
                            auto buff = adp_push->get_empty_buffer(sout_id, swap_buff);
                            contents->rebind_sockets[rebind_id][sout_id][1]->dataptr = buff;
                            // the output socket linked to the push adp can have more than one socket bound and so
                            // we have to rebind all the input sockets bound to the current output socket
                            for (size_t ta = 2; ta < contents->rebind_sockets[rebind_id][sout_id].size(); ta++)
                                contents->rebind_sockets[rebind_id][sout_id][ta]->dataptr = buff;
                        }
                        adp_push->wake_up_puller();
                        return status;
                    };
                }
            }

            // one process per task, in task order: the wrapped callable when one was built, else a plain exec()
            for (auto task : contents->tasks)
                if (modified_tasks.count(task))
                    contents->processes.push_back(modified_tasks[task]);
                else
                    contents->processes.push_back(
                      [task]() -> const int*
                      {
                          task->exec();
                          const int* status = task->sockets.back()->get_dataptr<int>();
                          return status;
                      });

            for (auto c : cur_node->get_children())
                gen_processes_recursive(c, already_parsed_nodes);
        }
    };

    // 'thread_id' only advances when thread pinning is enabled (it is only used to select the pinning target)
    size_t thread_id = 0;
    for (auto& sequence : this->sequences)
    {
        if (this->is_thread_pinning())
        {
            if (!this->puids.empty())
                tools::Thread_pinning::pin(this->puids[thread_id++]);
            else
                tools::Thread_pinning::pin(this->pin_objects_per_thread[thread_id++]);
        }
        std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
        gen_processes_recursive(sequence, already_parsed_nodes);

        if (this->is_thread_pinning()) tools::Thread_pinning::unpin();
    }
}
2196 :
2197 : void
2198 409 : Sequence::reset_no_copy_mode()
2199 : {
2200 : std::function<void(tools::Digraph_node<Sub_sequence>*, std::vector<tools::Digraph_node<Sub_sequence>*>&)>
2201 : reset_no_copy_mode_recursive =
2202 12515 : [&reset_no_copy_mode_recursive](tools::Digraph_node<Sub_sequence>* cur_node,
2203 8200 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
2204 : {
2205 25030 : if (cur_node != nullptr &&
2206 25030 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
2207 : {
2208 10743 : already_parsed_nodes.push_back(cur_node);
2209 10743 : auto contents = cur_node->get_c();
2210 44917 : for (auto task : contents->tasks)
2211 : {
2212 37044 : if (dynamic_cast<module::Switcher*>(&task->get_module()) &&
2213 37044 : task->get_name().find("select") != std::string::npos)
2214 : {
2215 1435 : auto select_task = task;
2216 1435 : auto switcher = dynamic_cast<module::Switcher*>(&select_task->get_module());
2217 1435 : switcher->set_no_copy_select(false);
2218 : }
2219 :
2220 37044 : if (dynamic_cast<module::Switcher*>(&task->get_module()) &&
2221 37044 : task->get_name().find("commute") != std::string::npos)
2222 : {
2223 1435 : auto commute_task = task;
2224 1435 : auto switcher = dynamic_cast<module::Switcher*>(&commute_task->get_module());
2225 1435 : switcher->set_no_copy_commute(false);
2226 : }
2227 :
2228 39342 : if (dynamic_cast<module::Adaptor_m_to_n*>(&task->get_module()) &&
2229 39342 : task->get_name().find("pull") != std::string::npos)
2230 : {
2231 2584 : auto pull_task = task;
2232 2584 : auto adp_pull = dynamic_cast<module::Adaptor_m_to_n*>(&pull_task->get_module());
2233 2584 : adp_pull->set_no_copy_pull(false);
2234 2584 : adp_pull->reset_buffer();
2235 : }
2236 :
2237 39342 : if (dynamic_cast<module::Adaptor_m_to_n*>(&task->get_module()) &&
2238 39342 : task->get_name().find("push") != std::string::npos)
2239 : {
2240 2584 : auto push_task = task;
2241 2584 : auto adp_push = dynamic_cast<module::Adaptor_m_to_n*>(&push_task->get_module());
2242 2584 : adp_push->set_no_copy_push(false);
2243 2584 : adp_push->reset_buffer();
2244 : }
2245 : }
2246 :
2247 18781 : for (size_t rebind_id = 0; rebind_id < contents->rebind_sockets.size(); rebind_id++)
2248 23177 : for (size_t s = 0; s < contents->rebind_sockets[rebind_id].size(); s++)
2249 44406 : for (size_t ta = 0; ta < contents->rebind_sockets[rebind_id][s].size(); ta++)
2250 29267 : contents->rebind_sockets[rebind_id][s][ta]->dataptr =
2251 29267 : contents->rebind_dataptrs[rebind_id][s][ta];
2252 :
2253 18943 : for (auto c : cur_node->get_children())
2254 8200 : reset_no_copy_mode_recursive(c, already_parsed_nodes);
2255 : }
2256 12924 : };
2257 :
2258 4724 : for (auto& sequence : this->sequences)
2259 : {
2260 4315 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
2261 4315 : reset_no_copy_mode_recursive(sequence, already_parsed_nodes);
2262 4315 : }
2263 409 : }
2264 :
2265 : void
2266 78 : Sequence::set_no_copy_mode(const bool no_copy_mode)
2267 : {
2268 78 : this->no_copy_mode = no_copy_mode;
2269 78 : }
2270 :
2271 : bool
2272 1145 : Sequence::is_no_copy_mode() const
2273 : {
2274 1145 : return this->no_copy_mode;
2275 : }
2276 :
2277 : void
2278 22 : Sequence::set_auto_stop(const bool auto_stop)
2279 : {
2280 22 : this->auto_stop = auto_stop;
2281 22 : }
2282 :
2283 : bool
2284 0 : Sequence::is_auto_stop() const
2285 : {
2286 0 : return this->auto_stop;
2287 : }
2288 :
2289 : Sub_sequence*
2290 15492 : Sequence::get_last_subsequence(const size_t tid)
2291 : {
2292 : std::function<Sub_sequence*(tools::Digraph_node<Sub_sequence>*, std::vector<tools::Digraph_node<Sub_sequence>*>&)>
2293 : get_last_subsequence_recursive =
2294 19500 : [&get_last_subsequence_recursive,
2295 : &tid](tools::Digraph_node<Sub_sequence>* cur_node,
2296 4008 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes) -> Sub_sequence*
2297 : {
2298 39000 : if (cur_node != nullptr &&
2299 39000 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
2300 : {
2301 18714 : already_parsed_nodes.push_back(cur_node);
2302 18714 : if (!cur_node->get_children().size()) return cur_node->get_contents();
2303 3222 : Sub_sequence* last_ss = nullptr;
2304 7230 : for (auto c : cur_node->get_children())
2305 : {
2306 4008 : Sub_sequence* last_branch_ss = nullptr;
2307 4008 : last_branch_ss = get_last_subsequence_recursive(c, already_parsed_nodes);
2308 4008 : if (last_ss && last_branch_ss && last_ss != last_branch_ss)
2309 : {
2310 0 : std::stringstream message;
2311 : message << "Multiple candidates have been found for the last subsequence, this shouldn't be "
2312 0 : << "possible. (tid = " << tid << ", "
2313 0 : << "last_ss.id = " << last_ss->id << ", "
2314 0 : << "last_branch_ss.id = " << last_branch_ss->id << ")";
2315 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
2316 0 : }
2317 4008 : last_ss = last_branch_ss ? last_branch_ss : last_ss;
2318 : }
2319 3222 : return last_ss;
2320 : }
2321 786 : return nullptr;
2322 15492 : };
2323 :
2324 15492 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
2325 30984 : return get_last_subsequence_recursive(this->sequences[tid], already_parsed_nodes);
2326 15492 : }
2327 :
2328 : void
2329 15492 : Sequence::update_tasks_id(const size_t tid)
2330 : {
2331 : std::function<void(tools::Digraph_node<Sub_sequence>*, std::vector<tools::Digraph_node<Sub_sequence>*>&)>
2332 : update_tasks_id_recursive =
2333 19500 : [&update_tasks_id_recursive](tools::Digraph_node<Sub_sequence>* cur_node,
2334 4008 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
2335 : {
2336 39000 : if (cur_node != nullptr &&
2337 39000 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
2338 : {
2339 18714 : already_parsed_nodes.push_back(cur_node);
2340 18714 : Sub_sequence* ss = cur_node->get_contents();
2341 18714 : ss->tasks_id.resize(ss->tasks.size());
2342 18714 : std::iota(ss->tasks_id.begin(), ss->tasks_id.end(), ss->tasks_id.front());
2343 :
2344 22722 : for (auto c : cur_node->get_children())
2345 4008 : update_tasks_id_recursive(c, already_parsed_nodes);
2346 : }
2347 34992 : };
2348 :
2349 15492 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
2350 30984 : return update_tasks_id_recursive(this->sequences[tid], already_parsed_nodes);
2351 15492 : }
2352 :
2353 : std::vector<runtime::Task*>
2354 6642 : Sequence::get_tasks_from_id(const size_t taid)
2355 : {
2356 : std::function<void(tools::Digraph_node<Sub_sequence>*,
2357 : const size_t,
2358 : std::vector<runtime::Task*>&,
2359 : std::vector<tools::Digraph_node<Sub_sequence>*>&)>
2360 : get_tasks_from_id_recursive =
2361 84414 : [&get_tasks_from_id_recursive](tools::Digraph_node<Sub_sequence>* cur_node,
2362 : const size_t taid,
2363 : std::vector<runtime::Task*>& tasks,
2364 31142 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
2365 : {
2366 168828 : if (cur_node != nullptr &&
2367 168828 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
2368 : {
2369 79399 : already_parsed_nodes.push_back(cur_node);
2370 79399 : Sub_sequence* ss = cur_node->get_contents();
2371 79399 : bool found = false;
2372 217981 : for (size_t t = 0; t < ss->tasks_id.size(); t++)
2373 191854 : if (ss->tasks_id[t] == taid)
2374 : {
2375 53272 : tasks.push_back(ss->tasks[t]);
2376 53272 : found = true;
2377 53272 : break;
2378 : }
2379 :
2380 79399 : if (!found)
2381 57269 : for (auto c : cur_node->get_children())
2382 31142 : get_tasks_from_id_recursive(c, taid, tasks, already_parsed_nodes);
2383 : }
2384 91056 : };
2385 :
2386 6642 : std::vector<runtime::Task*> tasks;
2387 59914 : for (auto& s : this->sequences)
2388 : {
2389 53272 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
2390 53272 : get_tasks_from_id_recursive(s, taid, tasks, already_parsed_nodes);
2391 53272 : }
2392 13284 : return tasks;
2393 6642 : }
2394 :
2395 : void
2396 2580 : Sequence::update_firsts_and_lasts_tasks()
2397 : {
2398 2580 : this->firsts_tasks.clear();
2399 2580 : this->firsts_tasks.resize(this->n_threads);
2400 5729 : for (auto taid : firsts_tasks_id)
2401 : {
2402 3149 : auto tasks = this->get_tasks_from_id(taid);
2403 28085 : for (size_t tid = 0; tid < tasks.size(); tid++)
2404 24936 : firsts_tasks[tid].push_back(tasks[tid]);
2405 3149 : }
2406 :
2407 2580 : this->lasts_tasks.clear();
2408 2580 : this->lasts_tasks.resize(this->n_threads);
2409 6073 : for (auto taid : lasts_tasks_id)
2410 : {
2411 3493 : auto tasks = this->get_tasks_from_id(taid);
2412 31829 : for (size_t tid = 0; tid < tasks.size(); tid++)
2413 28336 : lasts_tasks[tid].push_back(tasks[tid]);
2414 3493 : }
2415 2580 : }
2416 :
2417 : void
2418 278 : Sequence::_set_n_frames_unbind(std::vector<std::pair<runtime::Socket*, runtime::Socket*>>& unbind_sockets,
2419 : std::vector<std::pair<runtime::Task*, runtime::Socket*>>& unbind_tasks)
2420 : {
2421 : std::function<void(const std::vector<runtime::Task*>&,
2422 : tools::Digraph_node<Sub_sequence>*,
2423 : std::vector<tools::Digraph_node<Sub_sequence>*>&)>
2424 25028 : graph_traversal = [&graph_traversal, &unbind_sockets, &unbind_tasks](
2425 : const std::vector<runtime::Task*>& possessed_tsks,
2426 : tools::Digraph_node<Sub_sequence>* cur_node,
2427 69254 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
2428 : {
2429 50056 : if (cur_node != nullptr &&
2430 50056 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
2431 : {
2432 20724 : already_parsed_nodes.push_back(cur_node);
2433 20724 : Sub_sequence* ss = cur_node->get_contents();
2434 67332 : for (auto tsk_out : ss->tasks)
2435 : {
2436 182436 : for (auto sck_out : tsk_out->sockets)
2437 : {
2438 135828 : if (sck_out->get_type() == socket_t::SOUT || sck_out->get_type() == socket_t::SFWD)
2439 : {
2440 93718 : auto sck_out_bound_sockets_cpy = sck_out->get_bound_sockets();
2441 142988 : for (auto sck_in : sck_out_bound_sockets_cpy)
2442 : {
2443 49270 : auto tsk_in = &sck_in->get_task();
2444 : // if the task of the current input socket is in the tasks of the sequence
2445 49270 : if (std::find(possessed_tsks.begin(), possessed_tsks.end(), tsk_in) != possessed_tsks.end())
2446 : {
2447 : try
2448 : {
2449 49270 : tsk_in->unbind(*sck_out);
2450 4054 : unbind_tasks.push_back(std::make_pair(tsk_in, sck_out.get()));
2451 : }
2452 45216 : catch (...)
2453 : {
2454 45216 : sck_in->unbind(*sck_out);
2455 : // memorize the unbinds to rebind after!
2456 45216 : unbind_sockets.push_back(std::make_pair(sck_in, sck_out.get()));
2457 45216 : }
2458 : }
2459 : }
2460 93718 : }
2461 135828 : }
2462 : }
2463 40708 : for (auto c : cur_node->get_children())
2464 19984 : graph_traversal(possessed_tsks, c, already_parsed_nodes);
2465 : }
2466 25306 : };
2467 :
2468 278 : auto tsks_per_threads = this->get_tasks_per_threads();
2469 278 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
2470 5322 : for (size_t t = 0; t < this->get_n_threads(); t++)
2471 : {
2472 5044 : already_parsed_nodes.clear();
2473 5044 : graph_traversal(tsks_per_threads[t], this->sequences[t], already_parsed_nodes);
2474 : }
2475 278 : }
2476 :
2477 : void
2478 278 : Sequence::_set_n_frames(const size_t n_frames)
2479 : {
2480 278 : if (n_frames <= 0)
2481 : {
2482 0 : std::stringstream message;
2483 0 : message << "'n_frames' has to be greater than 0 ('n_frames' = " << n_frames << ").";
2484 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
2485 0 : }
2486 :
2487 : // set_n_frames for all the modules (be aware that this operation can fail)
2488 5322 : for (auto& mm : this->all_modules)
2489 49266 : for (auto& m : mm)
2490 44222 : m->set_n_frames(n_frames);
2491 278 : }
2492 :
2493 : void
2494 278 : Sequence::_set_n_frames_rebind(const std::vector<std::pair<runtime::Socket*, runtime::Socket*>>& unbind_sockets,
2495 : const std::vector<std::pair<runtime::Task*, runtime::Socket*>>& unbind_tasks)
2496 : {
2497 : // rebind the sockets
2498 45494 : for (auto& u : unbind_sockets)
2499 45216 : (*u.first) = *u.second;
2500 :
2501 : // rebind the tasks
2502 4332 : for (auto& u : unbind_tasks)
2503 4054 : (*u.first) = *u.second;
2504 278 : }
2505 :
2506 : void
2507 276 : Sequence::set_n_frames(const size_t n_frames)
2508 : {
2509 276 : const auto old_n_frames = this->get_n_frames();
2510 276 : if (old_n_frames != n_frames)
2511 : {
2512 190 : std::vector<std::pair<runtime::Socket*, runtime::Socket*>> unbind_sockets;
2513 190 : std::vector<std::pair<runtime::Task*, runtime::Socket*>> unbind_tasks;
2514 190 : this->_set_n_frames_unbind(unbind_sockets, unbind_tasks);
2515 190 : this->_set_n_frames(n_frames);
2516 190 : this->_set_n_frames_rebind(unbind_sockets, unbind_tasks);
2517 190 : }
2518 276 : }
2519 :
2520 : bool
2521 11 : Sequence::is_control_flow() const
2522 : {
2523 11 : return this->sequences[0]->get_children().size();
2524 : }
2525 :
2526 : // /!\ this check has been commented out because it is known not to work in the general case
2527 : /*
2528 : template<class SS>
2529 : void Sequence
2530 : ::check_ctrl_flow(tools::Digraph_node<SS>* root)
2531 : {
2532 : std::function<void(tools::Digraph_node<SS>*,
2533 : std::vector<tools::Digraph_node<SS>*>&)> check_control_flow_parity =
2534 : [&check_control_flow_parity](tools::Digraph_node<SS>* cur_node,
2535 : std::vector<tools::Digraph_node<SS>*> already_parsed_nodes) -> void
2536 : {
2537 : if (cur_node != nullptr &&
2538 : std::find(already_parsed_nodes.begin(),
2539 : already_parsed_nodes.end(),
2540 : cur_node) == already_parsed_nodes.end() &&
2541 : cur_node->get_children().size())
2542 : {
2543 : already_parsed_nodes.push_back(cur_node);
2544 : for (auto c : cur_node->get_children())
2545 : check_control_flow_parity(c, already_parsed_nodes);
2546 : }
2547 : else
2548 : {
2549 : already_parsed_nodes.push_back(cur_node);
2550 : std::vector<module::Module*> parsed_switchers;
2551 : for (size_t i = 0; i < already_parsed_nodes.size(); i++)
2552 : {
2553 : // This check occurs before dud-nodes are removed by _init, so some nodes have no
2554 : // contents and must be
2555 : // accounted for
2556 : if (already_parsed_nodes[i]->get_c() == nullptr ||
2557 : !(already_parsed_nodes[i]->get_c()->type == subseq_t::COMMUTE ||
2558 : already_parsed_nodes[i]->get_c()->type == subseq_t::SELECT))
2559 : continue;
2560 :
2561 : // We search for the first switcher task in the path taken: already_parsed_nodes
2562 : const runtime::Task *ctrl_task_first = nullptr;
2563 : const runtime::Task *ctrl_task_second = nullptr;
2564 : for (auto t : already_parsed_nodes[i]->get_c()->tasks)
2565 : {
2566 : if (dynamic_cast<const module::Switcher*>(&t->get_module()) &&
2567 : (t->get_name() == "select" || t->get_name() == "commute"))
2568 : {
2569 : ctrl_task_first = t;
2570 : break;
2571 : }
2572 : }
2573 :
2574 : if (std::find(parsed_switchers.begin(), parsed_switchers.end(),
2575 : &(ctrl_task_first->get_module())) != parsed_switchers.end()) continue;
2576 :
2577 : // We now search for the second switcher task in the path taken
2578 : auto expected_type = ctrl_task_first->get_name() == "select" ? subseq_t::COMMUTE
2579 : : subseq_t::SELECT; for (size_t j = i; j < already_parsed_nodes.size() && ctrl_task_second == nullptr; j++)
2580 : {
2581 : if (already_parsed_nodes[j]->get_c() == nullptr ||
2582 : already_parsed_nodes[j]->get_c()->type != expected_type)
2583 : continue;
2584 : for (auto t : already_parsed_nodes[j]->get_c()->tasks)
2585 : {
2586 : if ((t->get_name() == "select" || t->get_name() == "commute") &&
2587 : &(ctrl_task_first->get_module()) == &(t->get_module()))
2588 : {
2589 : parsed_switchers.push_back(&(t->get_module()));
2590 : ctrl_task_second = t;
2591 : break;
2592 : }
2593 : }
2594 : }
2595 :
2596 : if (ctrl_task_second == nullptr)
2597 : {
2598 : for (auto t : ctrl_task_first->get_module().tasks)
2599 : {
2600 : if ((ctrl_task_first->get_name() == "select" && t->get_name() ==
2601 : "commute") || (ctrl_task_first->get_name() == "commute" && t->get_name() == "select"))
2602 : {
2603 : ctrl_task_second = t.get();
2604 : break;
2605 : }
2606 : }
2607 : std::stringstream message;
2608 : message << ctrl_task_first->get_name() << " is missing a path to "
2609 : << ctrl_task_second->get_name() << ".";
2610 : throw tools::control_flow_error(__FILE__, __LINE__, __func__,
2611 : message.str());
2612 : }
2613 : }
2614 : }
2615 : };
2616 :
2617 : std::vector<tools::Digraph_node<SS>*> already_parsed_nodes;
2618 : return check_control_flow_parity(root, already_parsed_nodes);
2619 : }
2620 : */
|