Line data Source code
#include <algorithm>
#include <cstring>
#include <exception>
#include <fstream>
#include <mutex>
#include <numeric>
#include <set>
#include <sstream>
#include <thread>
#include <utility>
10 :
11 : #include "Module/Module.hpp"
12 : #include "Module/Stateful/Adaptor/Adaptor.hpp"
13 : #include "Module/Stateful/Probe/Probe.hpp"
14 : #include "Module/Stateful/Switcher/Switcher.hpp"
15 : #include "Runtime/Sequence/Sequence.hpp"
16 : #include "Runtime/Socket/Socket.hpp"
17 : #include "Runtime/Task/Task.hpp"
18 : #include "Tools/Display/rang_format/rang_format.h"
19 : #include "Tools/Exception/exception.hpp"
20 : #include "Tools/Signal_handler/Signal_handler.hpp"
21 : #include "Tools/Thread_pinning/Thread_pinning.hpp"
22 : #include "Tools/Thread_pinning/Thread_pinning_utils.hpp"
23 :
24 : using namespace spu;
25 : using namespace spu::runtime;
26 :
27 0 : Sequence::Sequence(const std::vector<const runtime::Task*>& firsts,
28 : const std::vector<const runtime::Task*>& lasts,
29 : const std::vector<const runtime::Task*>& exclusions,
30 : const size_t n_threads,
31 : const bool thread_pinning,
32 0 : const std::vector<size_t>& puids)
33 0 : : n_threads(n_threads)
34 0 : , sequences(n_threads, nullptr)
35 0 : , modules(n_threads)
36 0 : , all_modules(n_threads)
37 0 : , mtx_exception(new std::mutex())
38 0 : , force_exit_loop(new std::atomic<bool>(false))
39 0 : , tasks_inplace(false)
40 0 : , thread_pinning(thread_pinning)
41 0 : , puids(puids)
42 0 : , no_copy_mode(true)
43 0 : , saved_exclusions(exclusions)
44 0 : , switchers_reset(n_threads)
45 0 : , auto_stop(true)
46 0 : , is_part_of_pipeline(false)
47 0 : , next_round_is_over(n_threads, false)
48 0 : , cur_task_id(n_threads, 0)
49 0 : , cur_ss(n_threads, nullptr)
50 : {
51 : #ifndef SPU_HWLOC
52 : if (thread_pinning)
53 : std::clog << rang::tag::warning
54 : << "StreamPU has not been linked with the 'hwloc' library, the 'thread_pinning' "
55 : "option of the 'runtime::Sequence' will have no effect."
56 : << std::endl;
57 : #endif
58 :
59 0 : if (thread_pinning && puids.size() < n_threads)
60 : {
61 0 : std::stringstream message;
62 0 : message << "'puids.size()' has to be greater or equal to 'n_threads' ('puids.size()' = " << puids.size()
63 0 : << " , 'n_threads' = " << n_threads << ").";
64 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
65 0 : }
66 :
67 0 : this->init<runtime::Sub_sequence_const, const runtime::Task>(firsts, lasts, exclusions);
68 0 : }
69 :
70 0 : Sequence::Sequence(const std::vector<const runtime::Task*>& firsts,
71 : const std::vector<const runtime::Task*>& lasts,
72 : const size_t n_threads,
73 : const bool thread_pinning,
74 0 : const std::vector<size_t>& puids)
75 0 : : Sequence(firsts, lasts, {}, n_threads, thread_pinning, puids)
76 : {
77 0 : }
78 :
79 0 : Sequence::Sequence(const std::vector<const runtime::Task*>& firsts,
80 : const size_t n_threads,
81 : const bool thread_pinning,
82 0 : const std::vector<size_t>& puids)
83 0 : : Sequence(firsts, {}, {}, n_threads, thread_pinning, puids)
84 : {
85 0 : }
86 :
87 0 : Sequence::Sequence(const runtime::Task& first,
88 : const runtime::Task& last,
89 : const size_t n_threads,
90 : const bool thread_pinning,
91 0 : const std::vector<size_t>& puids)
92 0 : : Sequence({ &first }, { &last }, n_threads, thread_pinning, puids)
93 : {
94 0 : }
95 :
96 0 : Sequence::Sequence(const runtime::Task& first,
97 : const size_t n_threads,
98 : const bool thread_pinning,
99 0 : const std::vector<size_t>& puids)
100 0 : : Sequence({ &first }, n_threads, thread_pinning, puids)
101 : {
102 0 : }
103 :
104 : std::vector<const runtime::Task*>
105 464 : exclusions_convert_to_const(const std::vector<runtime::Task*>& exclusions)
106 : {
107 464 : std::vector<const runtime::Task*> exclusions_const;
108 501 : for (auto exception : exclusions)
109 37 : exclusions_const.push_back(exception);
110 464 : return exclusions_const;
111 0 : }
112 :
113 430 : Sequence::Sequence(const std::vector<runtime::Task*>& firsts,
114 : const std::vector<runtime::Task*>& lasts,
115 : const std::vector<runtime::Task*>& exclusions,
116 : const size_t n_threads,
117 : const bool thread_pinning,
118 : const std::vector<size_t>& puids,
119 430 : const bool tasks_inplace)
120 430 : : n_threads(n_threads)
121 430 : , sequences(n_threads, nullptr)
122 430 : , modules(tasks_inplace ? n_threads - 1 : n_threads)
123 430 : , all_modules(n_threads)
124 430 : , mtx_exception(new std::mutex())
125 430 : , force_exit_loop(new std::atomic<bool>(false))
126 430 : , tasks_inplace(tasks_inplace)
127 430 : , thread_pinning(thread_pinning)
128 430 : , puids(puids)
129 430 : , no_copy_mode(true)
130 430 : , saved_exclusions(exclusions_convert_to_const(exclusions))
131 430 : , switchers_reset(n_threads)
132 430 : , auto_stop(true)
133 430 : , is_part_of_pipeline(false)
134 430 : , next_round_is_over(n_threads, false)
135 430 : , cur_task_id(n_threads, 0)
136 2150 : , cur_ss(n_threads, nullptr)
137 : {
138 430 : if (thread_pinning && puids.size() < n_threads)
139 : {
140 0 : std::stringstream message;
141 0 : message << "'puids.size()' has greater or equal to 'n_threads' ('puids.size()' = " << puids.size()
142 0 : << " , 'n_threads' = " << n_threads << ").";
143 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
144 0 : }
145 :
146 430 : if (tasks_inplace)
147 430 : this->init<runtime::Sub_sequence, runtime::Task>(firsts, lasts, exclusions);
148 : else
149 : {
150 0 : std::vector<const runtime::Task*> firsts_bis;
151 0 : for (auto first : firsts)
152 0 : firsts_bis.push_back(first);
153 0 : std::vector<const runtime::Task*> lasts_bis;
154 0 : for (auto last : lasts)
155 0 : lasts_bis.push_back(last);
156 0 : std::vector<const runtime::Task*> exclusions_bis;
157 0 : for (auto exception : exclusions)
158 0 : exclusions_bis.push_back(exception);
159 0 : this->init<runtime::Sub_sequence_const, const runtime::Task>(firsts_bis, lasts_bis, exclusions_bis);
160 0 : }
161 430 : }
162 :
163 81 : Sequence::Sequence(const std::vector<runtime::Task*>& firsts,
164 : const std::vector<runtime::Task*>& lasts,
165 : const size_t n_threads,
166 : const bool thread_pinning,
167 : const std::vector<size_t>& puids,
168 81 : const bool tasks_inplace)
169 81 : : Sequence(firsts, lasts, {}, n_threads, thread_pinning, puids, tasks_inplace)
170 : {
171 81 : }
172 :
173 101 : Sequence::Sequence(const std::vector<runtime::Task*>& firsts,
174 : const size_t n_threads,
175 : const bool thread_pinning,
176 : const std::vector<size_t>& puids,
177 101 : const bool tasks_inplace)
178 101 : : Sequence(firsts, {}, {}, n_threads, thread_pinning, puids, tasks_inplace)
179 : {
180 101 : }
181 :
182 1 : Sequence::Sequence(runtime::Task& first,
183 : runtime::Task& last,
184 : const size_t n_threads,
185 : const bool thread_pinning,
186 : const std::vector<size_t>& puids,
187 1 : const bool tasks_inplace)
188 1 : : Sequence({ &first }, { &last }, n_threads, thread_pinning, puids, tasks_inplace)
189 : {
190 1 : }
191 :
192 101 : Sequence::Sequence(runtime::Task& first,
193 : const size_t n_threads,
194 : const bool thread_pinning,
195 : const std::vector<size_t>& puids,
196 101 : const bool tasks_inplace)
197 101 : : Sequence({ &first }, n_threads, thread_pinning, puids, tasks_inplace)
198 : {
199 101 : }
200 : //=======================================New pinning version constructors===============================================
201 0 : Sequence::Sequence(const std::vector<const runtime::Task*>& firsts,
202 : const std::vector<const runtime::Task*>& lasts,
203 : const std::vector<const runtime::Task*>& exclusions,
204 : const size_t n_threads,
205 : const bool thread_pinning,
206 0 : const std::string& sequence_pinning_policy)
207 0 : : n_threads(n_threads)
208 0 : , sequences(n_threads, nullptr)
209 0 : , modules(n_threads)
210 0 : , all_modules(n_threads)
211 0 : , mtx_exception(new std::mutex())
212 0 : , force_exit_loop(new std::atomic<bool>(false))
213 0 : , tasks_inplace(false)
214 0 : , thread_pinning(thread_pinning)
215 0 : , puids({})
216 0 : , no_copy_mode(true)
217 0 : , saved_exclusions(exclusions)
218 0 : , switchers_reset(n_threads)
219 0 : , auto_stop(true)
220 0 : , is_part_of_pipeline(false)
221 0 : , next_round_is_over(n_threads, false)
222 0 : , cur_task_id(n_threads, 0)
223 0 : , cur_ss(n_threads, nullptr)
224 : {
225 : #ifndef SPU_HWLOC
226 : if (thread_pinning)
227 : std::clog << rang::tag::warning
228 : << "StreamPU has not been linked with the 'hwloc' library, the 'thread_pinning' "
229 : "option of the 'runtime::Sequence' will have no effect."
230 : << std::endl;
231 : #endif
232 :
233 0 : if (thread_pinning && !sequence_pinning_policy.empty())
234 : {
235 0 : pin_objects_per_thread = tools::Thread_pinning_utils::stage_parser_unpacker(sequence_pinning_policy, n_threads);
236 : }
237 0 : else if (thread_pinning && sequence_pinning_policy.empty())
238 : {
239 0 : std::stringstream message;
240 0 : message << "Pinning is activated but there is no specified policy." << std::endl;
241 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
242 0 : }
243 :
244 0 : this->init<runtime::Sub_sequence_const, const runtime::Task>(firsts, lasts, exclusions);
245 0 : }
246 :
247 0 : Sequence::Sequence(const std::vector<const runtime::Task*>& firsts,
248 : const std::vector<const runtime::Task*>& lasts,
249 : const size_t n_threads,
250 : const bool thread_pinning,
251 0 : const std::string& sequence_pinning_policy)
252 0 : : Sequence(firsts, lasts, {}, n_threads, thread_pinning, sequence_pinning_policy)
253 : {
254 0 : }
255 :
256 0 : Sequence::Sequence(const std::vector<const runtime::Task*>& firsts,
257 : const size_t n_threads,
258 : const bool thread_pinning,
259 0 : const std::string& sequence_pinning_policy)
260 0 : : Sequence(firsts, {}, {}, n_threads, thread_pinning, sequence_pinning_policy)
261 : {
262 0 : }
263 :
264 0 : Sequence::Sequence(const runtime::Task& first,
265 : const runtime::Task& last,
266 : const size_t n_threads,
267 : const bool thread_pinning,
268 0 : const std::string& sequence_pinning_policy)
269 0 : : Sequence({ &first }, { &last }, n_threads, thread_pinning, sequence_pinning_policy)
270 : {
271 0 : }
272 :
273 0 : Sequence::Sequence(const runtime::Task& first,
274 : const size_t n_threads,
275 : const bool thread_pinning,
276 0 : const std::string& sequence_pinning_policy)
277 0 : : Sequence({ &first }, n_threads, thread_pinning, sequence_pinning_policy)
278 : {
279 0 : }
280 :
281 34 : Sequence::Sequence(const std::vector<runtime::Task*>& firsts,
282 : const std::vector<runtime::Task*>& lasts,
283 : const std::vector<runtime::Task*>& exclusions,
284 : const size_t n_threads,
285 : const bool thread_pinning,
286 : const std::string& sequence_pinning_policy,
287 34 : const bool tasks_inplace)
288 34 : : n_threads(n_threads)
289 34 : , sequences(n_threads, nullptr)
290 34 : , modules(tasks_inplace ? n_threads - 1 : n_threads)
291 34 : , all_modules(n_threads)
292 34 : , mtx_exception(new std::mutex())
293 34 : , force_exit_loop(new std::atomic<bool>(false))
294 34 : , tasks_inplace(tasks_inplace)
295 34 : , thread_pinning(thread_pinning)
296 34 : , puids({})
297 34 : , no_copy_mode(true)
298 34 : , saved_exclusions(exclusions_convert_to_const(exclusions))
299 34 : , switchers_reset(n_threads)
300 34 : , auto_stop(true)
301 34 : , is_part_of_pipeline(false)
302 34 : , next_round_is_over(n_threads, false)
303 34 : , cur_task_id(n_threads, 0)
304 170 : , cur_ss(n_threads, nullptr)
305 : {
306 34 : if (thread_pinning && !sequence_pinning_policy.empty())
307 : {
308 6 : pin_objects_per_thread = tools::Thread_pinning_utils::stage_parser_unpacker(sequence_pinning_policy, n_threads);
309 : }
310 28 : else if (thread_pinning && sequence_pinning_policy.empty())
311 : {
312 0 : std::stringstream message;
313 0 : message << "Pinning is activated but there is no specified policy." << std::endl;
314 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
315 0 : }
316 :
317 34 : if (tasks_inplace)
318 34 : this->init<runtime::Sub_sequence, runtime::Task>(firsts, lasts, exclusions);
319 : else
320 : {
321 0 : std::vector<const runtime::Task*> firsts_bis;
322 0 : for (auto first : firsts)
323 0 : firsts_bis.push_back(first);
324 0 : std::vector<const runtime::Task*> lasts_bis;
325 :
326 0 : for (auto last : lasts)
327 0 : lasts_bis.push_back(last);
328 0 : std::vector<const runtime::Task*> exclusions_bis;
329 :
330 0 : for (auto exception : exclusions)
331 0 : exclusions_bis.push_back(exception);
332 0 : this->init<runtime::Sub_sequence_const, const runtime::Task>(firsts_bis, lasts_bis, exclusions_bis);
333 0 : }
334 34 : }
335 :
336 0 : Sequence::Sequence(const std::vector<runtime::Task*>& firsts,
337 : const std::vector<runtime::Task*>& lasts,
338 : const size_t n_threads,
339 : const bool thread_pinning,
340 : const std::string& sequence_pinning_policy,
341 0 : const bool tasks_inplace)
342 0 : : Sequence(firsts, lasts, {}, n_threads, thread_pinning, sequence_pinning_policy, tasks_inplace)
343 : {
344 0 : }
345 :
346 0 : Sequence::Sequence(const std::vector<runtime::Task*>& firsts,
347 : const size_t n_threads,
348 : const bool thread_pinning,
349 : const std::string& sequence_pinning_policy,
350 0 : const bool tasks_inplace)
351 0 : : Sequence(firsts, {}, {}, n_threads, thread_pinning, sequence_pinning_policy, tasks_inplace)
352 : {
353 0 : }
354 :
355 0 : Sequence::Sequence(runtime::Task& first,
356 : runtime::Task& last,
357 : const size_t n_threads,
358 : const bool thread_pinning,
359 : const std::string& sequence_pinning_policy,
360 0 : const bool tasks_inplace)
361 0 : : Sequence({ &first }, { &last }, n_threads, thread_pinning, sequence_pinning_policy, tasks_inplace)
362 : {
363 0 : }
364 :
365 0 : Sequence::Sequence(runtime::Task& first,
366 : const size_t n_threads,
367 : const bool thread_pinning,
368 : const std::string& sequence_pinning_policy,
369 0 : const bool tasks_inplace)
370 0 : : Sequence({ &first }, n_threads, thread_pinning, sequence_pinning_policy, tasks_inplace)
371 : {
372 0 : }
373 :
374 : // ====================================================================================================================
375 :
376 1402 : Sequence::~Sequence()
377 : {
378 512 : std::vector<tools::Digraph_node<Sub_sequence>*> already_deleted_nodes;
379 5355 : for (auto s : this->sequences)
380 4843 : this->delete_tree(s, already_deleted_nodes);
381 890 : }
382 :
383 : template<class SS, class TA>
384 : void
385 512 : Sequence::init(const std::vector<TA*>& firsts, const std::vector<TA*>& lasts, const std::vector<TA*>& exclusions)
386 : {
387 512 : if (this->is_thread_pinning())
388 : {
389 6 : if (!this->puids.empty())
390 0 : tools::Thread_pinning::pin(this->puids[0]);
391 : else
392 6 : tools::Thread_pinning::pin(this->pin_objects_per_thread[0]);
393 : }
394 :
395 512 : if (firsts.size() == 0)
396 : {
397 0 : std::stringstream message;
398 0 : message << "'firsts.size()' has to be strictly greater than 0.";
399 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
400 0 : }
401 :
402 512 : if (this->n_threads == 0)
403 : {
404 0 : std::stringstream message;
405 0 : message << "'n_threads' has to be strictly greater than 0.";
406 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
407 0 : }
408 :
409 549 : for (auto exclusion : exclusions)
410 : {
411 37 : if (std::find(firsts.begin(), firsts.end(), exclusion) != firsts.end())
412 : {
413 0 : std::stringstream message;
414 : message << "'exclusion' can't be contained in the 'firsts' vector ("
415 : << "'exclusion'"
416 0 : << " = " << +exclusion << ", "
417 : << "'exclusion->get_name()'"
418 0 : << " = " << exclusion->get_name() << ").";
419 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
420 0 : }
421 :
422 37 : if (std::find(lasts.begin(), lasts.end(), exclusion) != lasts.end())
423 : {
424 0 : std::stringstream message;
425 : message << "'exclusion' can't be contained in the 'lasts' vector ("
426 : << "'exclusion'"
427 0 : << " = " << +exclusion << ", "
428 : << "'exclusion->get_name()'"
429 0 : << " = " << exclusion->get_name() << ").";
430 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
431 0 : }
432 : }
433 908 : for (auto t : lasts)
434 : {
435 396 : if (dynamic_cast<const module::Switcher*>(&t->get_module()) && t->get_name() == "commute")
436 : {
437 0 : std::stringstream message;
438 0 : message << "A sequence cannot end with a 'commute' task.";
439 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
440 0 : }
441 : }
442 :
443 512 : auto root = new tools::Digraph_node<SS>({}, {}, nullptr, 0);
444 512 : root->set_contents(nullptr);
445 512 : size_t ssid = 0, taid = 0;
446 512 : std::vector<TA*> switchers;
447 512 : std::vector<std::pair<TA*, tools::Digraph_node<SS>*>> selectors;
448 512 : std::vector<TA*> real_lasts;
449 :
450 512 : this->lasts_tasks_id.clear();
451 512 : this->firsts_tasks_id.clear();
452 512 : auto last_subseq = root;
453 512 : std::map<TA*, unsigned> in_sockets_feed;
454 1176 : for (auto first : firsts)
455 : {
456 664 : std::map<TA*, std::pair<tools::Digraph_node<SS>*, size_t>> task_subseq;
457 664 : auto contents = last_subseq->get_contents();
458 664 : this->firsts_tasks_id.push_back(contents ? contents->tasks_id[contents->tasks_id.size() - 1] : 0);
459 664 : last_subseq = this->init_recursive<SS, TA>(last_subseq,
460 : ssid,
461 : taid,
462 : selectors,
463 : switchers,
464 : *first,
465 : *first,
466 : lasts,
467 : exclusions,
468 664 : this->lasts_tasks_id,
469 : real_lasts,
470 : in_sockets_feed,
471 : task_subseq);
472 : }
473 :
474 512 : std::stringstream real_lasts_ss;
475 1370 : for (size_t rl = 0; rl < real_lasts.size(); rl++)
476 : real_lasts_ss << "'real_lasts"
477 858 : << "[" << rl << "]'"
478 858 : << " = " << +real_lasts[rl] << ", "
479 : << "'real_lasts"
480 858 : << "[" << rl << "]->get_name()'"
481 858 : << " = " << real_lasts[rl]->get_name() << ((rl < real_lasts.size() - 1) ? ", " : "");
482 :
483 908 : for (auto last : lasts)
484 : {
485 396 : if (std::find(real_lasts.begin(), real_lasts.end(), last) == real_lasts.end())
486 : {
487 0 : std::stringstream message;
488 0 : message << "'last' is not contained in the 'real_lasts[" << real_lasts.size() << "]' vector ("
489 : << "'last'"
490 0 : << " = " << +last << ", "
491 : << "'last->get_name()'"
492 0 : << " = " << last->get_name() << ", " << real_lasts_ss.str() << ").";
493 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
494 0 : }
495 : }
496 :
497 512 : this->n_tasks = taid;
498 : // this->check_ctrl_flow(root); // /!\ this check has been commented because it is known to do not work in the
499 : // general case
500 512 : this->_init<SS>(root);
501 512 : this->update_firsts_and_lasts_tasks();
502 512 : this->gen_processes();
503 512 : this->donners = get_modules<tools::Interface_is_done>(true);
504 :
505 5355 : for (size_t tid = 0; tid < this->n_threads; tid++)
506 39320 : for (auto& mdl : this->all_modules[tid])
507 34477 : if (auto swi = dynamic_cast<module::Switcher*>(mdl))
508 2143 : this->switchers_reset[tid].push_back(dynamic_cast<tools::Interface_reset*>(swi));
509 :
510 5355 : for (size_t tid = 0; tid < this->sequences.size(); tid++)
511 4843 : this->cur_ss[tid] = this->sequences[tid];
512 512 : }
513 :
514 : Sequence*
515 48 : Sequence::clone() const
516 : {
517 48 : auto c = new Sequence(*this);
518 :
519 48 : c->tasks_inplace = false;
520 48 : c->modules.resize(c->get_n_threads());
521 :
522 48 : std::vector<const runtime::Task*> firsts_tasks;
523 96 : for (auto ta : this->get_firsts_tasks()[0])
524 48 : firsts_tasks.push_back(ta);
525 :
526 48 : std::vector<const runtime::Task*> lasts_tasks;
527 96 : for (auto ta : this->get_lasts_tasks()[0])
528 48 : lasts_tasks.push_back(ta);
529 :
530 48 : c->init<runtime::Sub_sequence_const, const runtime::Task>(firsts_tasks, lasts_tasks, this->saved_exclusions);
531 48 : c->mtx_exception.reset(new std::mutex());
532 48 : c->force_exit_loop.reset(new std::atomic<bool>(false));
533 48 : return c;
534 48 : }
535 :
536 : void
537 0 : Sequence::set_thread_pinning(const bool thread_pinning, const std::vector<size_t>& puids)
538 : {
539 0 : if (thread_pinning && puids.size() < n_threads)
540 : {
541 0 : std::stringstream message;
542 0 : message << "'puids.size()' has greater or equal to 'n_threads' ('puids.size()' = " << puids.size()
543 0 : << " , 'n_threads' = " << n_threads << ").";
544 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
545 0 : }
546 :
547 0 : this->thread_pinning = thread_pinning;
548 0 : this->puids = puids;
549 0 : this->pin_objects_per_thread = {};
550 0 : }
551 :
552 : void
553 0 : Sequence::set_thread_pinning(const bool thread_pinning, const std::string& sequence_pinning_policy)
554 : {
555 0 : this->thread_pinning = thread_pinning;
556 0 : this->puids = {};
557 : this->pin_objects_per_thread =
558 0 : tools::Thread_pinning_utils::stage_parser_unpacker(sequence_pinning_policy, n_threads);
559 0 : }
560 :
561 : bool
562 51141 : Sequence::is_thread_pinning()
563 : {
564 51141 : return this->thread_pinning;
565 : }
566 :
567 : std::vector<std::vector<module::Module*>>
568 0 : Sequence::get_modules_per_threads() const
569 : {
570 0 : std::vector<std::vector<module::Module*>> modules_per_threads(this->all_modules.size());
571 0 : size_t tid = 0;
572 0 : for (auto& e : this->all_modules)
573 : {
574 0 : for (auto& ee : e)
575 0 : modules_per_threads[tid].push_back(ee);
576 0 : tid++;
577 : }
578 0 : return modules_per_threads;
579 0 : }
580 :
581 : std::vector<std::vector<module::Module*>>
582 2 : Sequence::get_modules_per_types() const
583 : {
584 2 : std::vector<std::vector<module::Module*>> modules_per_types(this->all_modules[0].size());
585 4 : for (auto& e : this->all_modules)
586 : {
587 2 : size_t mid = 0;
588 33 : for (auto& ee : e)
589 31 : modules_per_types[mid++].push_back(ee);
590 : }
591 2 : return modules_per_types;
592 0 : }
593 :
594 : std::vector<std::vector<runtime::Task*>>
595 4159 : Sequence::get_tasks_per_threads() const
596 : {
597 4159 : std::vector<std::vector<runtime::Task*>> tasks_per_threads(this->n_threads);
598 :
599 : std::function<void(
600 : tools::Digraph_node<Sub_sequence>*, const size_t, std::vector<tools::Digraph_node<Sub_sequence>*>&)>
601 165924 : get_tasks_recursive = [&](tools::Digraph_node<Sub_sequence>* cur_ss,
602 : const size_t tid,
603 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
604 : {
605 165924 : if (std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_ss) == already_parsed_nodes.end())
606 : {
607 160955 : already_parsed_nodes.push_back(cur_ss);
608 321901 : tasks_per_threads[tid].insert(
609 160952 : tasks_per_threads[tid].end(), cur_ss->get_c()->tasks.begin(), cur_ss->get_c()->tasks.end());
610 :
611 184336 : for (auto c : cur_ss->get_children())
612 23382 : get_tasks_recursive(c, tid, already_parsed_nodes);
613 : }
614 170083 : };
615 :
616 146704 : for (size_t tid = 0; tid < this->n_threads; tid++)
617 : {
618 142543 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
619 142545 : get_tasks_recursive(this->sequences[tid], tid, already_parsed_nodes);
620 142542 : }
621 :
622 8323 : return tasks_per_threads;
623 4161 : }
624 :
625 : std::vector<std::vector<runtime::Task*>>
626 22 : Sequence::get_tasks_per_types() const
627 : {
628 22 : std::vector<std::vector<runtime::Task*>> tasks_per_types(this->n_tasks);
629 :
630 : std::function<void(tools::Digraph_node<Sub_sequence>*, size_t&, std::vector<tools::Digraph_node<Sub_sequence>*>&)>
631 128 : get_tasks_recursive = [&](tools::Digraph_node<Sub_sequence>* cur_ss,
632 : size_t& mid,
633 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
634 : {
635 128 : if (std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_ss) == already_parsed_nodes.end())
636 : {
637 128 : already_parsed_nodes.push_back(cur_ss);
638 1125 : for (auto& t : cur_ss->get_c()->tasks)
639 997 : tasks_per_types[mid++].push_back(t);
640 :
641 128 : for (auto c : cur_ss->get_children())
642 0 : get_tasks_recursive(c, mid, already_parsed_nodes);
643 : }
644 150 : };
645 :
646 150 : for (size_t tid = 0; tid < this->n_threads; tid++)
647 : {
648 128 : size_t mid = 0;
649 128 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
650 128 : get_tasks_recursive(this->sequences[tid], mid, already_parsed_nodes);
651 128 : }
652 :
653 44 : return tasks_per_types;
654 22 : }
655 :
656 : bool
657 777794 : Sequence::is_done() const
658 : {
659 787152 : for (auto donner : this->donners)
660 9404 : if (donner->is_done()) return true;
661 778117 : return false;
662 : }
663 :
664 : void
665 0 : Sequence::_exec(const size_t tid,
666 : std::function<bool(const std::vector<const int*>&)>& stop_condition,
667 : tools::Digraph_node<Sub_sequence>* sequence)
668 : {
669 0 : tools::Signal_handler::reset_sigint();
670 :
671 0 : if (this->is_thread_pinning())
672 : {
673 0 : if (!puids.empty())
674 0 : tools::Thread_pinning::pin(this->puids[tid]);
675 : else
676 0 : tools::Thread_pinning::pin(this->pin_objects_per_thread[tid]);
677 : }
678 :
679 : std::function<void(tools::Digraph_node<Sub_sequence>*, std::vector<const int*>&)> exec_sequence =
680 0 : [&exec_sequence](tools::Digraph_node<Sub_sequence>* cur_ss, std::vector<const int*>& statuses)
681 : {
682 0 : auto type = cur_ss->get_c()->type;
683 0 : auto& tasks_id = cur_ss->get_c()->tasks_id;
684 0 : auto& processes = cur_ss->get_c()->processes;
685 :
686 0 : if (type == subseq_t::COMMUTE)
687 : {
688 0 : statuses[tasks_id[0]] = processes[0]();
689 0 : const int path = statuses[tasks_id[0]][0];
690 0 : if (cur_ss->get_children().size() > (size_t)path) exec_sequence(cur_ss->get_children()[path], statuses);
691 : }
692 : else
693 : {
694 0 : for (size_t p = 0; p < processes.size(); p++)
695 0 : statuses[tasks_id[p]] = processes[p]();
696 0 : for (auto c : cur_ss->get_children())
697 0 : exec_sequence(c, statuses);
698 : }
699 0 : };
700 :
701 0 : std::vector<const int*> statuses(this->n_tasks, nullptr);
702 : try
703 : {
704 : do
705 : {
706 : // force switchers reset to reinitialize the path to the last input socket
707 0 : for (size_t s = 0; s < this->switchers_reset[tid].size(); s++)
708 0 : this->switchers_reset[tid][s]->reset();
709 :
710 0 : std::fill(statuses.begin(), statuses.end(), nullptr);
711 : try
712 : {
713 0 : exec_sequence(sequence, statuses);
714 : }
715 0 : catch (tools::processing_aborted const&)
716 : {
717 : // do nothing, this is normal
718 0 : }
719 0 : } while (!*force_exit_loop && !stop_condition(statuses) && !tools::Signal_handler::is_sigint());
720 : }
721 0 : catch (tools::waiting_canceled const&)
722 : {
723 : // do nothing, this is normal
724 0 : }
725 0 : catch (std::exception const& e)
726 : {
727 0 : *force_exit_loop = true;
728 :
729 0 : this->mtx_exception->lock();
730 :
731 0 : auto save = tools::exception::no_stacktrace;
732 0 : tools::exception::no_stacktrace = true;
733 0 : std::string msg = e.what(); // get only the function signature
734 0 : tools::exception::no_stacktrace = save;
735 :
736 0 : if (std::find(this->prev_exception_messages.begin(), this->prev_exception_messages.end(), msg) ==
737 0 : this->prev_exception_messages.end())
738 : {
739 0 : this->prev_exception_messages.push_back(msg); // save only the function signature
740 0 : this->prev_exception_messages_to_display.push_back(e.what()); // with stacktrace if debug mode
741 : }
742 0 : else if (std::strlen(e.what()) > this->prev_exception_messages_to_display.back().size())
743 0 : this->prev_exception_messages_to_display[prev_exception_messages_to_display.size() - 1] = e.what();
744 :
745 0 : this->mtx_exception->unlock();
746 0 : }
747 :
748 0 : if (this->is_thread_pinning()) tools::Thread_pinning::unpin();
749 0 : }
750 :
751 : void
752 4025 : Sequence::_exec_without_statuses(const size_t tid,
753 : std::function<bool()>& stop_condition,
754 : tools::Digraph_node<Sub_sequence>* sequence)
755 : {
756 4025 : tools::Signal_handler::reset_sigint();
757 :
758 4023 : if (this->is_thread_pinning())
759 : {
760 10 : if (!puids.empty())
761 0 : tools::Thread_pinning::pin(this->puids[tid]);
762 : else
763 10 : tools::Thread_pinning::pin(this->pin_objects_per_thread[tid]);
764 : }
765 :
766 : std::function<void(tools::Digraph_node<Sub_sequence>*)> exec_sequence =
767 1148374 : [&exec_sequence](tools::Digraph_node<Sub_sequence>* cur_ss)
768 : {
769 961282 : auto type = cur_ss->get_c()->type;
770 960987 : auto& processes = cur_ss->get_c()->processes;
771 :
772 962793 : if (type == subseq_t::COMMUTE)
773 : {
774 52606 : const int path = processes[0]()[0];
775 52544 : if (cur_ss->get_children().size() > (size_t)path) exec_sequence(cur_ss->get_children()[path]);
776 : }
777 : else
778 : {
779 3790906 : for (auto& process : processes)
780 2857639 : process();
781 891882 : for (auto c : cur_ss->get_children())
782 134530 : exec_sequence(c);
783 : }
784 965586 : };
785 :
786 : try
787 : {
788 : do
789 : {
790 : // force switchers reset to reinitialize the path to the last input socket
791 795592 : for (size_t s = 0; s < this->switchers_reset[tid].size(); s++)
792 13117 : this->switchers_reset[tid][s]->reset();
793 :
794 : try
795 : {
796 782224 : exec_sequence(sequence);
797 : }
798 2086 : catch (tools::processing_aborted const&)
799 : {
800 : // do nothing, this is normal
801 4 : }
802 781733 : } while (!*force_exit_loop && !stop_condition() && !tools::Signal_handler::is_sigint());
803 : }
804 2071 : catch (tools::waiting_canceled const&)
805 : {
806 : // do nothing, this is normal
807 2072 : }
808 0 : catch (std::exception const& e)
809 : {
810 0 : *force_exit_loop = true;
811 :
812 0 : this->mtx_exception->lock();
813 :
814 0 : auto save = tools::exception::no_stacktrace;
815 0 : tools::exception::no_stacktrace = true;
816 0 : std::string msg = e.what(); // get only the function signature
817 0 : tools::exception::no_stacktrace = save;
818 :
819 0 : if (std::find(this->prev_exception_messages.begin(), this->prev_exception_messages.end(), msg) ==
820 0 : this->prev_exception_messages.end())
821 : {
822 0 : this->prev_exception_messages.push_back(msg); // save only the function signature
823 0 : this->prev_exception_messages_to_display.push_back(e.what()); // with stacktrace if debug mode
824 : }
825 0 : else if (std::strlen(e.what()) > this->prev_exception_messages_to_display.back().size())
826 0 : this->prev_exception_messages_to_display[prev_exception_messages_to_display.size() - 1] = e.what();
827 :
828 0 : this->mtx_exception->unlock();
829 0 : }
830 :
831 4360 : if (this->is_thread_pinning()) tools::Thread_pinning::unpin();
832 4130 : }
833 :
834 : void
835 0 : Sequence::exec(std::function<bool(const std::vector<const int*>&)> stop_condition)
836 : {
837 0 : if (this->is_no_copy_mode()) this->gen_processes(true);
838 :
839 0 : std::function<bool(const std::vector<const int*>&)> real_stop_condition;
840 0 : if (this->auto_stop)
841 0 : real_stop_condition = [this, stop_condition](const std::vector<const int*>& statuses)
842 : {
843 0 : bool res = stop_condition(statuses);
844 0 : return res || this->is_done();
845 0 : };
846 : else
847 0 : real_stop_condition = stop_condition;
848 :
849 0 : std::vector<std::thread> threads(n_threads);
850 0 : for (size_t tid = 1; tid < n_threads; tid++)
851 0 : threads[tid] =
852 0 : std::thread(&Sequence::_exec, this, tid, std::ref(real_stop_condition), std::ref(this->sequences[tid]));
853 :
854 0 : this->_exec(0, real_stop_condition, this->sequences[0]);
855 :
856 0 : for (size_t tid = 1; tid < n_threads; tid++)
857 0 : threads[tid].join();
858 :
859 0 : if (this->is_no_copy_mode() && !this->is_part_of_pipeline)
860 : {
861 0 : this->reset_no_copy_mode();
862 0 : this->gen_processes(false);
863 : }
864 :
865 0 : if (!this->prev_exception_messages_to_display.empty())
866 : {
867 0 : *force_exit_loop = false;
868 0 : throw std::runtime_error(this->prev_exception_messages_to_display.back());
869 : }
870 0 : }
871 :
872 : void
873 363 : Sequence::exec(std::function<bool()> stop_condition)
874 : {
875 363 : if (this->is_no_copy_mode()) this->gen_processes(true);
876 :
877 363 : std::function<bool()> real_stop_condition;
878 363 : if (this->auto_stop)
879 1552407 : real_stop_condition = [this, stop_condition]()
880 : {
881 774879 : bool res = stop_condition();
882 779581 : return res || this->is_done();
883 352 : };
884 : else
885 11 : real_stop_condition = stop_condition;
886 :
887 363 : std::vector<std::thread> threads(n_threads);
888 4207 : for (size_t tid = 1; tid < n_threads; tid++)
889 : {
890 3843 : threads[tid] = std::thread(
891 7688 : &Sequence::_exec_without_statuses, this, tid, std::ref(real_stop_condition), std::ref(this->sequences[tid]));
892 : }
893 :
894 363 : this->_exec_without_statuses(0, real_stop_condition, this->sequences[0]);
895 :
896 4190 : for (size_t tid = 1; tid < n_threads; tid++)
897 3841 : threads[tid].join();
898 :
899 349 : if (this->is_no_copy_mode() && !this->is_part_of_pipeline)
900 : {
901 73 : this->reset_no_copy_mode();
902 73 : this->gen_processes(false);
903 : }
904 :
905 359 : if (!this->prev_exception_messages_to_display.empty())
906 : {
907 0 : *force_exit_loop = false;
908 0 : throw std::runtime_error(this->prev_exception_messages_to_display.back());
909 : }
910 360 : }
911 :
912 : void
913 196 : Sequence::exec()
914 : {
915 564209 : this->exec([]() { return false; });
916 201 : }
917 :
918 : void
919 62 : Sequence::exec_seq(const size_t tid, const int frame_id)
920 : {
921 62 : if (tid >= this->sequences.size())
922 : {
923 0 : std::stringstream message;
924 0 : message << "'tid' has to be smaller than 'sequences.size()' ('tid' = " << tid
925 0 : << ", 'sequences.size()' = " << this->sequences.size() << ").";
926 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
927 0 : }
928 :
929 : std::function<void(tools::Digraph_node<Sub_sequence>*)> exec_sequence =
930 422 : [&exec_sequence, frame_id](tools::Digraph_node<Sub_sequence>* cur_ss)
931 : {
932 61 : auto type = cur_ss->get_c()->type;
933 61 : auto& tasks = cur_ss->get_c()->tasks;
934 61 : if (type == subseq_t::COMMUTE)
935 : {
936 0 : const int path = tasks[0]->exec(frame_id)[0];
937 0 : if (cur_ss->get_children().size() > (size_t)path) exec_sequence(cur_ss->get_children()[path]);
938 : }
939 : else
940 : {
941 417 : for (size_t ta = 0; ta < tasks.size(); ta++)
942 359 : tasks[ta]->exec(frame_id);
943 60 : for (auto c : cur_ss->get_children())
944 0 : exec_sequence(c);
945 : }
946 123 : };
947 :
948 61 : exec_sequence(this->sequences[tid]);
949 62 : }
950 :
951 : runtime::Task*
952 75559 : Sequence::exec_step(const size_t tid, const int frame_id)
953 : {
954 75559 : if (tid >= this->sequences.size())
955 : {
956 0 : std::stringstream message;
957 0 : message << "'tid' has to be smaller than 'sequences.size()' ('tid' = " << tid
958 0 : << ", 'sequences.size()' = " << this->sequences.size() << ").";
959 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
960 0 : }
961 :
962 75559 : runtime::Task* executed_task = nullptr;
963 75559 : if (this->next_round_is_over[tid])
964 : {
965 3410 : this->next_round_is_over[tid] = false;
966 3410 : this->cur_ss[tid] = this->sequences[tid];
967 3410 : this->cur_task_id[tid] = 0;
968 : }
969 : else
970 : {
971 72149 : executed_task = this->cur_ss[tid]->get_c()->tasks[cur_task_id[tid]];
972 72149 : const std::vector<int>& ret = executed_task->exec(frame_id);
973 :
974 72148 : auto type = this->cur_ss[tid]->get_c()->type;
975 72148 : if (type == subseq_t::COMMUTE)
976 : {
977 9515 : const size_t path = (size_t)ret[0];
978 9515 : if (this->cur_ss[tid]->get_children().size() > path)
979 : {
980 9515 : this->cur_ss[tid] = this->cur_ss[tid]->get_children()[path];
981 9515 : this->cur_task_id[tid] = 0;
982 :
983 9515 : if (this->cur_ss[tid]->get_c()->tasks.size() == 0)
984 : {
985 : // skip nodes without tasks if any
986 0 : while (this->cur_ss[tid]->get_children().size() > 0)
987 : {
988 0 : this->cur_ss[tid] = this->cur_ss[tid]->get_children()[0];
989 0 : this->cur_task_id[tid] = 0;
990 0 : if (this->cur_ss[tid]->get_c() && this->cur_ss[tid]->get_c()->tasks.size() > 0) break;
991 : }
992 0 : if (this->cur_task_id[tid] >= this->cur_ss[tid]->get_c()->tasks.size())
993 0 : this->next_round_is_over[tid] = true;
994 : }
995 : }
996 : else
997 : {
998 0 : std::stringstream message;
999 0 : message << "This should never happen ('path' = " << path
1000 0 : << ", 'cur_ss[tid]->get_children().size()' = " << this->cur_ss[tid]->get_children().size()
1001 0 : << ", 'tid' = " << tid << ").";
1002 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
1003 0 : }
1004 : }
1005 : else
1006 : {
1007 62633 : this->cur_task_id[tid]++;
1008 62633 : if (this->cur_task_id[tid] == (this->cur_ss[tid]->get_c()->tasks.size()))
1009 : {
1010 : // skip nodes without tasks if any
1011 28019 : while (this->cur_ss[tid]->get_children().size() > 0)
1012 : {
1013 24609 : this->cur_ss[tid] = this->cur_ss[tid]->get_children()[0];
1014 24609 : this->cur_task_id[tid] = 0;
1015 24609 : if (this->cur_ss[tid]->get_c() && this->cur_ss[tid]->get_c()->tasks.size() > 0) break;
1016 : }
1017 28019 : if (this->cur_task_id[tid] >= this->cur_ss[tid]->get_c()->tasks.size())
1018 3410 : this->next_round_is_over[tid] = true;
1019 : }
1020 : }
1021 : }
1022 :
1023 75558 : return executed_task;
1024 : }
1025 :
1026 : template<class SS, class TA>
1027 : tools::Digraph_node<SS>*
1028 2780 : Sequence::init_recursive(tools::Digraph_node<SS>* cur_subseq,
1029 : size_t& ssid,
1030 : size_t& taid,
1031 : std::vector<std::pair<TA*, tools::Digraph_node<SS>*>>& selectors,
1032 : std::vector<TA*>& switchers,
1033 : TA& first,
1034 : TA& current_task,
1035 : const std::vector<TA*>& lasts,
1036 : const std::vector<TA*>& exclusions,
1037 : std::vector<size_t>& real_lasts_id,
1038 : std::vector<TA*>& real_lasts,
1039 : std::map<TA*, unsigned>& in_sockets_feed,
1040 : std::map<TA*, std::pair<tools::Digraph_node<SS>*, size_t>>& task_subseq)
1041 : {
1042 2780 : if (this->tasks_inplace && !current_task.is_autoalloc())
1043 : {
1044 0 : std::stringstream message;
1045 : message << "When 'tasks_inplace' is set to true 'current_task' should be in autoalloc mode ("
1046 : << "'current_task.get_name()'"
1047 : << " = " << current_task.get_name() << ", "
1048 : << "'current_task.get_module().get_name()'"
1049 0 : << " = " << current_task.get_module().get_name() << ").";
1050 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
1051 0 : }
1052 :
1053 2780 : if (dynamic_cast<const module::Adaptor*>(¤t_task.get_module()) && !this->tasks_inplace)
1054 : {
1055 0 : std::stringstream message;
1056 0 : message << "'module::Adaptor' objects are not supported if 'tasks_inplace' is set to false.";
1057 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
1058 0 : }
1059 :
1060 2780 : auto it = std::find(real_lasts.begin(), real_lasts.end(), ¤t_task);
1061 2780 : if (it != real_lasts.end())
1062 : {
1063 0 : real_lasts.erase(it);
1064 0 : auto dist = std::distance(real_lasts.begin(), it);
1065 0 : real_lasts_id.erase(real_lasts_id.begin() + dist);
1066 : }
1067 :
1068 2780 : if (cur_subseq->get_contents() == nullptr)
1069 : {
1070 512 : cur_subseq->set_contents(new SS());
1071 512 : ssid++;
1072 : }
1073 :
1074 2780 : bool is_last = true;
1075 2780 : tools::Digraph_node<SS>* last_subseq = nullptr;
1076 :
1077 2780 : if (auto switcher = dynamic_cast<const module::Switcher*>(¤t_task.get_module()))
1078 : {
1079 148 : const auto current_task_name = current_task.get_name();
1080 222 : if (current_task_name == "commute" && // -------------------------------------------------------------- COMMUTE
1081 222 : std::find(switchers.begin(), switchers.end(), ¤t_task) == switchers.end())
1082 : {
1083 74 : switchers.push_back(¤t_task);
1084 74 : auto node_commute = new tools::Digraph_node<SS>({ cur_subseq }, {}, nullptr, cur_subseq->get_depth() + 1);
1085 :
1086 74 : node_commute->set_contents(new SS());
1087 74 : node_commute->get_c()->tasks.push_back(¤t_task);
1088 74 : node_commute->get_c()->tasks_id.push_back(taid++);
1089 74 : node_commute->get_c()->type = subseq_t::COMMUTE;
1090 74 : ssid++;
1091 :
1092 74 : cur_subseq->add_child(node_commute);
1093 :
1094 232 : for (size_t sdo = 0; sdo < switcher->get_n_data_sockets(); sdo++)
1095 : {
1096 316 : auto node_commute_son =
1097 158 : new tools::Digraph_node<SS>({ node_commute }, {}, nullptr, node_commute->get_depth() + 1);
1098 :
1099 158 : node_commute_son->set_contents(new SS());
1100 158 : ssid++;
1101 :
1102 158 : node_commute->add_child(node_commute_son);
1103 :
1104 158 : auto& bss = (*switcher)[module::swi::tsk::commute].sockets[sdo + 2]->get_bound_sockets();
1105 322 : for (auto bs : bss)
1106 : {
1107 164 : if (bs == nullptr) continue;
1108 164 : auto& t = bs->get_task();
1109 164 : if (std::find(exclusions.begin(), exclusions.end(), &t) == exclusions.end())
1110 : {
1111 164 : if (task_subseq.find(&t) == task_subseq.end() || task_subseq[&t].second < ssid)
1112 158 : task_subseq[&t] = { node_commute_son, ssid };
1113 :
1114 164 : in_sockets_feed.find(&t) != in_sockets_feed.end() ? in_sockets_feed[&t]++
1115 124 : : in_sockets_feed[&t] = 1;
1116 164 : bool t_is_select =
1117 164 : dynamic_cast<const module::Switcher*>(&(t.get_module())) && t.get_name() == "select";
1118 114 : if ((!t_is_select && in_sockets_feed[&t] >= (t.get_n_input_sockets() + t.get_n_fwd_sockets()) -
1119 328 : t.get_n_static_input_sockets()) ||
1120 50 : (t_is_select && t.is_last_input_socket(*bs)))
1121 : {
1122 130 : is_last = false;
1123 130 : last_subseq = Sequence::init_recursive<SS, TA>(task_subseq[&t].first,
1124 260 : task_subseq[&t].second,
1125 : taid,
1126 : selectors,
1127 : switchers,
1128 : first,
1129 : t,
1130 : lasts,
1131 : exclusions,
1132 : real_lasts_id,
1133 : real_lasts,
1134 : in_sockets_feed,
1135 : task_subseq);
1136 : }
1137 34 : else if (t_is_select)
1138 : {
1139 34 : tools::Digraph_node<SS>* node_selector = nullptr;
1140 40 : for (auto& sel : selectors)
1141 40 : if (sel.first == &t)
1142 : {
1143 34 : node_selector = sel.second;
1144 34 : break;
1145 : }
1146 :
1147 34 : if (!node_selector)
1148 : {
1149 0 : node_selector = new tools::Digraph_node<SS>(
1150 0 : { node_commute_son }, {}, nullptr, node_commute_son->get_depth() + 1);
1151 0 : selectors.push_back({ &t, node_selector });
1152 : }
1153 : else
1154 34 : node_selector->add_parent(node_commute_son);
1155 34 : node_commute_son->add_child(node_selector);
1156 : }
1157 : }
1158 : }
1159 : }
1160 : // exception for the status socket
1161 : auto& bss =
1162 74 : (*switcher)[module::swi::tsk::commute].sockets[switcher->get_n_data_sockets() + 2]->get_bound_sockets();
1163 74 : for (auto bs : bss)
1164 : {
1165 0 : if (bs == nullptr) continue;
1166 0 : auto& t = bs->get_task();
1167 0 : if (std::find(exclusions.begin(), exclusions.end(), &t) == exclusions.end())
1168 : {
1169 0 : if (task_subseq.find(&t) == task_subseq.end() || task_subseq[&t].second < ssid)
1170 0 : task_subseq[&t] = { node_commute, ssid };
1171 :
1172 0 : in_sockets_feed.find(&t) != in_sockets_feed.end() ? in_sockets_feed[&t]++ : in_sockets_feed[&t] = 1;
1173 0 : bool t_is_select =
1174 0 : dynamic_cast<const module::Switcher*>(&(t.get_module())) && t.get_name() == "select";
1175 0 : if ((!t_is_select && in_sockets_feed[&t] >= (t.get_n_input_sockets() + t.get_n_fwd_sockets()) -
1176 0 : t.get_n_static_input_sockets()) ||
1177 0 : (t_is_select && t.is_last_input_socket(*bs)))
1178 : {
1179 0 : is_last = false;
1180 0 : last_subseq = Sequence::init_recursive<SS, TA>(task_subseq[&t].first,
1181 0 : task_subseq[&t].second,
1182 : taid,
1183 : selectors,
1184 : switchers,
1185 : first,
1186 : t,
1187 : lasts,
1188 : exclusions,
1189 : real_lasts_id,
1190 : real_lasts,
1191 : in_sockets_feed,
1192 : task_subseq);
1193 : }
1194 : }
1195 : }
1196 : }
1197 74 : else if (current_task_name == "select") // ------------------------------------------------------------- SELECT
1198 : {
1199 74 : tools::Digraph_node<SS>* node_selector = nullptr;
1200 :
1201 96 : for (auto& sel : selectors)
1202 40 : if (sel.first == ¤t_task)
1203 : {
1204 18 : node_selector = sel.second;
1205 18 : break;
1206 : }
1207 :
1208 74 : if (!node_selector)
1209 : {
1210 56 : node_selector = new tools::Digraph_node<SS>({ cur_subseq }, {}, nullptr, cur_subseq->get_depth() + 1);
1211 56 : selectors.push_back({ ¤t_task, node_selector });
1212 : }
1213 :
1214 74 : node_selector->set_contents(new SS());
1215 74 : node_selector->get_c()->tasks.push_back(¤t_task);
1216 74 : node_selector->get_c()->tasks_id.push_back(taid++);
1217 74 : node_selector->get_c()->type = subseq_t::SELECT;
1218 74 : ssid++;
1219 :
1220 74 : cur_subseq->add_child(node_selector);
1221 :
1222 148 : auto node_selector_son =
1223 74 : new tools::Digraph_node<SS>({ node_selector }, {}, nullptr, node_selector->get_depth() + 1);
1224 :
1225 74 : node_selector_son->set_contents(new SS());
1226 74 : ssid++;
1227 :
1228 74 : node_selector->add_child(node_selector_son);
1229 :
1230 528 : for (auto& s : current_task.sockets)
1231 : {
1232 306 : if (!(s->get_type() == socket_t::SOUT)) continue;
1233 148 : auto bss = s->get_bound_sockets();
1234 272 : for (auto bs : bss)
1235 : {
1236 124 : if (bs == nullptr) continue;
1237 124 : auto& t = bs->get_task();
1238 124 : if (std::find(exclusions.begin(), exclusions.end(), &t) == exclusions.end())
1239 : {
1240 124 : if (task_subseq.find(&t) == task_subseq.end() || task_subseq[&t].second < ssid)
1241 124 : task_subseq[&t] = { node_selector_son, ssid };
1242 :
1243 124 : in_sockets_feed.find(&t) != in_sockets_feed.end() ? in_sockets_feed[&t]++
1244 124 : : in_sockets_feed[&t] = 1;
1245 124 : bool t_is_select =
1246 124 : dynamic_cast<const module::Switcher*>(&(t.get_module())) && t.get_name() == "select";
1247 :
1248 118 : if ((!t_is_select && in_sockets_feed[&t] >= (t.get_n_input_sockets() + t.get_n_fwd_sockets()) -
1249 248 : t.get_n_static_input_sockets()) ||
1250 6 : (t_is_select && t.is_last_input_socket(*bs)))
1251 : {
1252 86 : is_last = false;
1253 86 : last_subseq = Sequence::init_recursive<SS, TA>(task_subseq[&t].first,
1254 172 : task_subseq[&t].second,
1255 : taid,
1256 : selectors,
1257 : switchers,
1258 : first,
1259 : t,
1260 : lasts,
1261 : exclusions,
1262 : real_lasts_id,
1263 : real_lasts,
1264 : in_sockets_feed,
1265 : task_subseq);
1266 : }
1267 38 : else if (t_is_select)
1268 : {
1269 0 : tools::Digraph_node<SS>* node_selector = nullptr;
1270 0 : for (auto& sel : selectors)
1271 0 : if (sel.first == &t)
1272 : {
1273 0 : node_selector = sel.second;
1274 0 : break;
1275 : }
1276 :
1277 0 : if (!node_selector)
1278 : {
1279 0 : node_selector = new tools::Digraph_node<SS>(
1280 0 : { node_selector_son }, {}, nullptr, node_selector_son->get_depth() + 1);
1281 0 : selectors.push_back({ &t, node_selector });
1282 : }
1283 0 : node_selector->add_parent(node_selector_son);
1284 0 : node_selector_son->add_child(node_selector);
1285 : }
1286 : }
1287 : }
1288 : }
1289 : }
1290 148 : }
1291 : else // --------------------------------------------------------------------------------------------- STANDARD CASE
1292 : {
1293 2632 : cur_subseq->get_c()->tasks.push_back(¤t_task);
1294 2632 : cur_subseq->get_c()->tasks_id.push_back(taid++);
1295 :
1296 2632 : if (std::find(lasts.begin(), lasts.end(), ¤t_task) == lasts.end())
1297 : {
1298 7724 : for (auto& s : current_task.sockets)
1299 : {
1300 5488 : if (s->get_type() == socket_t::SOUT || s->get_type() == socket_t::SFWD)
1301 : {
1302 4066 : auto bss = s->get_bound_sockets();
1303 6336 : for (auto& bs : bss)
1304 : {
1305 2270 : if (bs == nullptr) continue;
1306 2270 : auto& t = bs->get_task();
1307 2270 : if (std::find(exclusions.begin(), exclusions.end(), &t) == exclusions.end())
1308 : {
1309 2249 : if (task_subseq.find(&t) == task_subseq.end() || task_subseq[&t].second < ssid)
1310 1997 : task_subseq[&t] = { cur_subseq, ssid };
1311 :
1312 2249 : in_sockets_feed.find(&t) != in_sockets_feed.end() ? in_sockets_feed[&t]++
1313 1947 : : in_sockets_feed[&t] = 1;
1314 2249 : bool t_is_select =
1315 2249 : dynamic_cast<const module::Switcher*>(&(t.get_module())) && t.get_name() == "select";
1316 6645 : if ((!t_is_select &&
1317 2147 : in_sockets_feed[&t] >= (t.get_n_input_sockets() + t.get_n_fwd_sockets()) -
1318 4498 : t.get_n_static_input_sockets()) ||
1319 102 : (t_is_select && t.is_last_input_socket(*bs)))
1320 : {
1321 1900 : is_last = false;
1322 1900 : last_subseq = Sequence::init_recursive<SS, TA>(task_subseq[&t].first,
1323 3800 : task_subseq[&t].second,
1324 : taid,
1325 : selectors,
1326 : switchers,
1327 : first,
1328 : t,
1329 : lasts,
1330 : exclusions,
1331 : real_lasts_id,
1332 : real_lasts,
1333 : in_sockets_feed,
1334 : task_subseq);
1335 : }
1336 349 : else if (t_is_select)
1337 : {
1338 50 : tools::Digraph_node<SS>* node_selector = nullptr;
1339 66 : for (auto& sel : selectors)
1340 48 : if (sel.first == &t)
1341 : {
1342 32 : node_selector = sel.second;
1343 32 : break;
1344 : }
1345 :
1346 50 : if (!node_selector)
1347 : {
1348 36 : node_selector = new tools::Digraph_node<SS>(
1349 18 : { cur_subseq }, {}, nullptr, cur_subseq->get_depth() + 1);
1350 18 : selectors.push_back({ &t, node_selector });
1351 : }
1352 50 : node_selector->add_parent(cur_subseq);
1353 50 : cur_subseq->add_child(node_selector);
1354 : }
1355 : }
1356 : }
1357 4066 : }
1358 1422 : else if (s->get_type() == socket_t::SIN && s->get_bound_sockets().size() > 1)
1359 : {
1360 0 : std::stringstream message;
1361 : message << "'s->get_bound_sockets().size()' has to be smaller or equal to 1 ("
1362 : << "'s->get_bound_sockets().size()'"
1363 0 : << " = " << s->get_bound_sockets().size() << ", "
1364 : << "'s->get_type()'"
1365 : << " = "
1366 : << "socket_t::SIN"
1367 : << ", "
1368 : << "'s->get_name()'"
1369 0 : << " = " << s->get_name() << ", "
1370 : << "'s->get_task().get_name()'"
1371 0 : << " = " << s->get_task().get_name() << ", "
1372 : << "'s->get_task().get_module().get_name()'"
1373 0 : << " = " << s->get_task().get_module().get_name() << ").";
1374 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
1375 0 : }
1376 : }
1377 : }
1378 : }
1379 :
1380 2780 : if (is_last && std::find(real_lasts.begin(), real_lasts.end(), ¤t_task) == real_lasts.end())
1381 : {
1382 858 : real_lasts.push_back(¤t_task);
1383 858 : real_lasts_id.push_back(cur_subseq->get_contents()->tasks_id.back());
1384 : }
1385 :
1386 2780 : if (last_subseq)
1387 1922 : return last_subseq;
1388 : else
1389 858 : return cur_subseq;
1390 : }
1391 :
1392 : template<class SS, class MO>
1393 : void
1394 512 : Sequence::replicate(const tools::Digraph_node<SS>* sequence)
1395 : {
1396 512 : std::set<MO*> modules_set;
1397 512 : std::vector<const runtime::Task*> tsks_vec; // get a vector of tasks included in the tasks graph
1398 : std::function<void(const tools::Digraph_node<SS>*, std::vector<const tools::Digraph_node<SS>*>&)>
1399 512 : collect_modules_list;
1400 512 : collect_modules_list =
1401 920 : [&](const tools::Digraph_node<SS>* node, std::vector<const tools::Digraph_node<SS>*>& already_parsed_nodes)
1402 : {
1403 1840 : if (node != nullptr &&
1404 1840 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), node) == already_parsed_nodes.end())
1405 : {
1406 836 : already_parsed_nodes.push_back(node);
1407 836 : tsks_vec.insert(tsks_vec.end(), node->get_c()->tasks.begin(), node->get_c()->tasks.end());
1408 836 : if (node->get_c())
1409 3616 : for (auto ta : node->get_c()->tasks)
1410 2780 : modules_set.insert(&ta->get_module());
1411 1244 : for (auto c : node->get_children())
1412 408 : collect_modules_list(c, already_parsed_nodes);
1413 : }
1414 : };
1415 512 : std::vector<const tools::Digraph_node<SS>*> already_parsed_nodes;
1416 512 : collect_modules_list(sequence, already_parsed_nodes);
1417 :
1418 : // check if all the tasks of the sequence are replicable before to perform the modules clones
1419 512 : if (this->n_threads - (this->tasks_inplace ? 1 : 0))
1420 1442 : for (auto& t : tsks_vec)
1421 1244 : if (!t->is_replicable())
1422 : {
1423 0 : std::stringstream message;
1424 : message << "It is not possible to replicate this sequence because at least one of its tasks is not "
1425 : << "replicable (t->get_name() = '" << t->get_name() << "', t->get_module().get_name() = '"
1426 0 : << t->get_module().get_name() << "').";
1427 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
1428 0 : }
1429 :
1430 512 : std::vector<MO*> modules_vec;
1431 3207 : for (auto m : modules_set)
1432 2695 : modules_vec.push_back(m);
1433 :
1434 : // clone the modules
1435 4891 : for (size_t tid = 0; tid < this->n_threads - (this->tasks_inplace ? 1 : 0); tid++)
1436 : {
1437 4379 : if (this->is_thread_pinning())
1438 : {
1439 4 : const auto real_tid = tid + (this->tasks_inplace ? 1 : 0);
1440 4 : if (!this->puids.empty())
1441 0 : tools::Thread_pinning::pin(this->puids[real_tid]);
1442 : else
1443 4 : tools::Thread_pinning::pin(this->pin_objects_per_thread[real_tid]);
1444 : }
1445 :
1446 4379 : this->modules[tid].resize(modules_vec.size());
1447 4379 : this->all_modules[tid + (this->tasks_inplace ? 1 : 0)].resize(modules_vec.size());
1448 36449 : for (size_t m = 0; m < modules_vec.size(); m++)
1449 : {
1450 : try
1451 : {
1452 32070 : this->modules[tid][m].reset(modules_vec[m]->clone());
1453 : }
1454 0 : catch (std::exception& e)
1455 : {
1456 0 : std::stringstream message;
1457 : message << "Module clone failed when trying to replicate the sequence (module name is '"
1458 0 : << modules_vec[m]->get_name() << "').";
1459 :
1460 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
1461 0 : }
1462 32070 : this->all_modules[tid + (this->tasks_inplace ? 1 : 0)][m] = this->modules[tid][m].get();
1463 : }
1464 :
1465 4379 : if (this->is_thread_pinning()) tools::Thread_pinning::unpin();
1466 : }
1467 :
1468 71593 : auto get_module_id = [](const std::vector<MO*>& modules, const module::Module& module)
1469 : {
1470 : int m_id;
1471 403270 : for (m_id = 0; m_id < (int)modules.size(); m_id++)
1472 400658 : if (modules[m_id] == &module) return m_id;
1473 2612 : return -1;
1474 : };
1475 :
1476 68793 : auto get_task_id = [](const std::vector<std::shared_ptr<runtime::Task>>& tasks, const runtime::Task& task)
1477 : {
1478 : int t_id;
1479 83605 : for (t_id = 0; t_id < (int)tasks.size(); t_id++)
1480 83605 : if (tasks[t_id].get() == &task) return t_id;
1481 0 : return -1;
1482 : };
1483 :
1484 34558 : auto get_socket_id = [](const std::vector<std::shared_ptr<runtime::Socket>>& sockets, const runtime::Socket& socket)
1485 : {
1486 : int s_id;
1487 73591 : for (s_id = 0; s_id < (int)sockets.size(); s_id++)
1488 73591 : if (sockets[s_id].get() == &socket) return s_id;
1489 0 : return -1;
1490 : };
1491 :
1492 : std::function<void(const tools::Digraph_node<SS>*,
1493 : tools::Digraph_node<Sub_sequence>*,
1494 : const size_t,
1495 : std::vector<const tools::Digraph_node<SS>*>&,
1496 : std::map<size_t, tools::Digraph_node<Sub_sequence>*>&)>
1497 512 : replicate_sequence;
1498 :
1499 16515 : replicate_sequence = [&](const tools::Digraph_node<SS>* sequence_ref,
1500 : tools::Digraph_node<Sub_sequence>* sequence_cpy,
1501 : const size_t thread_id,
1502 : std::vector<const tools::Digraph_node<SS>*>& already_parsed_nodes,
1503 : std::map<size_t, tools::Digraph_node<Sub_sequence>*>& allocated_nodes)
1504 : {
1505 32006 : if (sequence_ref != nullptr && sequence_ref->get_c() &&
1506 16003 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), sequence_ref) ==
1507 32006 : already_parsed_nodes.end())
1508 : {
1509 13511 : already_parsed_nodes.push_back(sequence_ref);
1510 :
1511 13511 : auto ss_ref = sequence_ref->get_c();
1512 13511 : auto ss_cpy = new Sub_sequence();
1513 :
1514 13511 : ss_cpy->type = ss_ref->type;
1515 13511 : ss_cpy->id = ss_ref->id;
1516 13511 : ss_cpy->tasks_id = ss_ref->tasks_id;
1517 47746 : for (auto t_ref : ss_ref->tasks)
1518 : {
1519 34235 : auto& m_ref = t_ref->get_module();
1520 :
1521 34235 : auto m_id = get_module_id(modules_vec, m_ref);
1522 34235 : auto t_id = get_task_id(m_ref.tasks, *t_ref);
1523 :
1524 34235 : assert(m_id != -1);
1525 34235 : assert(t_id != -1);
1526 :
1527 : // add the task to the sub-sequence
1528 34235 : ss_cpy->tasks.push_back(this->all_modules[thread_id][m_id]->tasks[t_id].get());
1529 :
1530 : // replicate the sockets binding
1531 131253 : for (size_t s_id = 0; s_id < t_ref->sockets.size(); s_id++)
1532 : {
1533 165737 : if (t_ref->sockets[s_id]->get_type() == socket_t::SIN ||
1534 68719 : t_ref->sockets[s_id]->get_type() == socket_t::SFWD)
1535 : {
1536 33834 : const runtime::Socket* s_ref_out = nullptr;
1537 : try
1538 : {
1539 33834 : s_ref_out = &t_ref->sockets[s_id]->get_bound_socket();
1540 : }
1541 48 : catch (...)
1542 : {
1543 : }
1544 33834 : if (s_ref_out)
1545 : {
1546 33786 : auto& t_ref_out = s_ref_out->get_task();
1547 33786 : auto& m_ref_out = t_ref_out.get_module();
1548 :
1549 : // check if `t_ref_out` is included in the tasks graph
1550 33786 : auto t_in_seq = std::find(tsks_vec.begin(), tsks_vec.end(), &t_ref_out) != tsks_vec.end();
1551 33786 : auto m_id_out = get_module_id(modules_vec, m_ref_out);
1552 :
1553 33786 : if (t_in_seq && m_id_out != -1)
1554 : {
1555 31555 : auto t_id_out = get_task_id(m_ref_out.tasks, t_ref_out);
1556 31555 : auto s_id_out = get_socket_id(t_ref_out.sockets, *s_ref_out);
1557 :
1558 31555 : assert(t_id_out != -1);
1559 31555 : assert(s_id_out != -1);
1560 :
1561 31555 : (*this->all_modules[thread_id][m_id_out]).tasks[t_id_out]->set_autoalloc(true);
1562 :
1563 31555 : auto& s_in = *this->all_modules[thread_id][m_id]->tasks[t_id]->sockets[s_id];
1564 : auto& s_out =
1565 31555 : *this->all_modules[thread_id][m_id_out]->tasks[t_id_out]->sockets[s_id_out];
1566 31555 : s_in = s_out;
1567 : }
1568 : }
1569 : }
1570 : }
1571 :
1572 : // replicate the tasks binding
1573 34235 : if (t_ref->fake_input_sockets.size())
1574 : {
1575 7144 : for (auto& fsi : t_ref->fake_input_sockets)
1576 : {
1577 3572 : const runtime::Socket* s_ref_out = nullptr;
1578 : try
1579 : {
1580 3572 : s_ref_out = &fsi->get_bound_socket();
1581 : }
1582 0 : catch (...)
1583 : {
1584 : }
1585 3572 : if (s_ref_out)
1586 : {
1587 3572 : auto& t_ref_out = s_ref_out->get_task();
1588 3572 : auto& m_ref_out = t_ref_out.get_module();
1589 :
1590 : // check if `t_ref_out` is included in the tasks graph
1591 3572 : auto t_in_seq = std::find(tsks_vec.begin(), tsks_vec.end(), &t_ref_out) != tsks_vec.end();
1592 3572 : auto m_id_out = get_module_id(modules_vec, m_ref_out);
1593 :
1594 3572 : if (t_in_seq && m_id_out != -1)
1595 : {
1596 3003 : auto t_id_out = get_task_id(m_ref_out.tasks, t_ref_out);
1597 3003 : auto s_id_out = get_socket_id(t_ref_out.sockets, *s_ref_out);
1598 :
1599 3003 : assert(t_id_out != -1);
1600 3003 : assert(s_id_out != -1);
1601 :
1602 3003 : (*this->all_modules[thread_id][m_id_out]).tasks[t_id_out]->set_autoalloc(true);
1603 :
1604 3003 : auto& t_in = *this->all_modules[thread_id][m_id]->tasks[t_id];
1605 : auto& s_out =
1606 3003 : *this->all_modules[thread_id][m_id_out]->tasks[t_id_out]->sockets[s_id_out];
1607 3003 : t_in = s_out;
1608 : }
1609 : }
1610 : }
1611 : }
1612 : }
1613 :
1614 : // add the sub-sequence to the current tree node
1615 13511 : sequence_cpy->set_contents(ss_cpy);
1616 13511 : allocated_nodes[sequence_cpy->get_c()->id] = sequence_cpy;
1617 :
1618 25135 : for (size_t c = 0; c < sequence_ref->get_children().size(); c++)
1619 : {
1620 11624 : if (std::find(already_parsed_nodes.begin(),
1621 : already_parsed_nodes.end(),
1622 23248 : sequence_ref->get_children()[c]) != already_parsed_nodes.end())
1623 2492 : sequence_cpy->add_child(allocated_nodes[sequence_ref->get_children()[c]->get_c()->id]);
1624 : else
1625 18264 : sequence_cpy->add_child(new tools::Digraph_node<Sub_sequence>(
1626 9132 : { sequence_cpy }, {}, nullptr, sequence_cpy->get_depth() + 1));
1627 : }
1628 :
1629 25135 : for (size_t c = 0; c < sequence_ref->get_children().size(); c++)
1630 11624 : replicate_sequence(sequence_ref->get_children()[c],
1631 11624 : sequence_cpy->get_children()[c],
1632 : thread_id,
1633 : already_parsed_nodes,
1634 : allocated_nodes);
1635 : }
1636 : };
1637 :
1638 : std::function<void(tools::Digraph_node<Sub_sequence>*, std::vector<tools::Digraph_node<Sub_sequence>*>&)>
1639 16515 : set_autoalloc_true = [&set_autoalloc_true](tools::Digraph_node<Sub_sequence>* node,
1640 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
1641 : {
1642 16003 : if (std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), node) == already_parsed_nodes.end())
1643 : {
1644 13511 : already_parsed_nodes.push_back(node);
1645 :
1646 47746 : for (auto t : node->get_c()->tasks)
1647 34235 : t->set_autoalloc(true);
1648 25135 : for (auto c : node->get_children())
1649 11624 : set_autoalloc_true(c, already_parsed_nodes);
1650 : }
1651 : };
1652 :
1653 4891 : for (size_t thread_id = (this->tasks_inplace ? 1 : 0); thread_id < this->sequences.size(); thread_id++)
1654 : {
1655 4379 : if (this->is_thread_pinning())
1656 : {
1657 4 : if (!this->puids.empty())
1658 0 : tools::Thread_pinning::pin(this->puids[thread_id]);
1659 : else
1660 4 : tools::Thread_pinning::pin(this->pin_objects_per_thread[thread_id]);
1661 : }
1662 :
1663 4379 : this->sequences[thread_id] = new tools::Digraph_node<Sub_sequence>({}, {}, nullptr, 0);
1664 4379 : already_parsed_nodes.clear();
1665 4379 : std::map<size_t, tools::Digraph_node<Sub_sequence>*> allocated_nodes;
1666 4379 : replicate_sequence(sequence, this->sequences[thread_id], thread_id, already_parsed_nodes, allocated_nodes);
1667 4379 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes_bis;
1668 4379 : set_autoalloc_true(this->sequences[thread_id], already_parsed_nodes_bis);
1669 :
1670 4379 : if (this->is_thread_pinning()) tools::Thread_pinning::unpin();
1671 : }
1672 512 : }
1673 :
1674 : template void
1675 : runtime::Sequence::replicate<runtime::Sub_sequence_const, const module::Module>(
1676 : const tools::Digraph_node<runtime::Sub_sequence_const>*);
1677 : template void
1678 : runtime::Sequence::replicate<runtime::Sub_sequence, module::Module>(const tools::Digraph_node<runtime::Sub_sequence>*);
1679 :
1680 : template<class SS>
1681 : void
1682 16923 : Sequence::delete_tree(tools::Digraph_node<SS>* node, std::vector<tools::Digraph_node<SS>*>& already_deleted_nodes)
1683 : {
1684 33846 : if (node != nullptr &&
1685 33846 : std::find(already_deleted_nodes.begin(), already_deleted_nodes.end(), node) == already_deleted_nodes.end())
1686 : {
1687 14347 : already_deleted_nodes.push_back(node);
1688 26379 : for (auto c : node->get_children())
1689 12032 : this->delete_tree(c, already_deleted_nodes);
1690 14347 : auto c = node->get_c();
1691 14347 : if (c != nullptr) delete c;
1692 14347 : delete node;
1693 : }
1694 16923 : }
1695 :
1696 : template void
1697 : runtime::Sequence::delete_tree<runtime::Sub_sequence_const>(
1698 : tools::Digraph_node<runtime::Sub_sequence_const>*,
1699 : std::vector<tools::Digraph_node<Sub_sequence_const>*>& already_deleted_nodes);
1700 : template void
1701 : runtime::Sequence::delete_tree<runtime::Sub_sequence>(
1702 : tools::Digraph_node<runtime::Sub_sequence>*,
1703 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_deleted_nodes);
1704 :
// Writes the Graphviz DOT description of one sub-sequence on `stream`.
//
// subseq      : container of task pointers, traversed in order.
// tasks_id    : identifier printed for each task, indexed by the position of the task in `subseq`.
// subseq_type : selects the color of the enclosing cluster (red for COMMUTE and SELECT, blue
//               otherwise).
// subseq_name : when not empty, everything is wrapped in a cluster labelled with this name.
// tab         : indentation unit, repeated according to the nesting level.
// stream      : output stream.
//
// Each task produces two nested clusters (one for its module, one for the task itself) and one
// node per socket. The DOT identifiers are built from object addresses (unary '+' applied to a
// pointer before streaming it).
1705 : template<class VTA>
1706 : void
1707 64 : Sequence::export_dot_subsequence(const VTA& subseq,
1708 : const std::vector<size_t>& tasks_id,
1709 : const subseq_t& subseq_type,
1710 : const std::string& subseq_name,
1711 : const std::string& tab,
1712 : std::ostream& stream) const
1713 : {
1714 64 : if (!subseq_name.empty())
1715 : {
1716 64 : stream << tab << "subgraph \"cluster_" << subseq_name << "_" << +this << "\" {" << std::endl;
1717 64 : stream << tab << tab << "node [style=filled];" << std::endl;
1718 : }
// position of the current task in 'subseq', printed as the execution order
1719 64 : size_t exec_order = 0;
1720 286 : for (auto& t : subseq)
1721 : {
// module color: green for an Adaptor, blue otherwise, and pink (overrides both) for an AProbe
1722 222 : std::string color = dynamic_cast<module::Adaptor*>(&t->get_module()) ? "green" : "blue";
1723 222 : color = dynamic_cast<module::AProbe*>(&t->get_module()) ? "pink" : color;
1724 222 : stream << tab << tab << "subgraph \"cluster_" << +&t->get_module() << "_" << +t << "\" {" << std::endl;
1725 222 : stream << tab << tab << tab << "node [style=filled];" << std::endl;
// '&t' is the address of the container element (the loop variable is a reference)
1726 222 : stream << tab << tab << tab << "subgraph \"cluster_" << +&t << "\" {" << std::endl;
1727 222 : stream << tab << tab << tab << tab << "node [style=filled];" << std::endl;
1728 :
// the fake input sockets are drawn as red filled nodes with a thick border
1729 222 : if (t->fake_input_sockets.size())
1730 : {
1731 47 : std::string stype = "in";
1732 94 : for (auto& fsi : t->fake_input_sockets)
1733 47 : stream << tab << tab << tab << tab << "\"" << +fsi.get() << "\""
1734 47 : << "[label=\"" << stype << ":" << fsi->get_name() << "\", style=filled, "
1735 94 : << "fillcolor=red, penwidth=\"2.0\"];" << std::endl;
1736 47 : }
1737 :
// index of the socket in 't->sockets', printed between brackets in the node label
1738 222 : size_t sid = 0;
1739 800 : for (auto& s : t->sockets)
1740 : {
1741 578 : std::string stype;
1742 578 : bool static_input = false;
1743 578 : switch (s->get_type())
1744 : {
1745 160 : case socket_t::SIN:
1746 160 : stype = "in[" + std::to_string(sid) + "]";
// an input socket that holds a data pointer without being bound is filled in green below
1747 160 : static_input = s->_get_dataptr() != nullptr && s->bound_socket == nullptr;
1748 160 : break;
1749 375 : case socket_t::SOUT:
1750 375 : stype = "out[" + std::to_string(sid) + "]";
1751 375 : break;
1752 43 : case socket_t::SFWD:
1753 43 : stype = "fwd[" + std::to_string(sid) + "]";
1754 43 : break;
1755 0 : default:
1756 0 : stype = "unkn";
1757 0 : break;
1758 : }
1759 :
// thick border when 'is_last_input_socket' returns true for this socket
1760 578 : std::string bold_or_not;
1761 578 : if (t->is_last_input_socket(*s)) bold_or_not = ", penwidth=\"2.0\"";
1762 :
1763 578 : stream << tab << tab << tab << tab << "\"" << +s.get() << "\""
1764 578 : << "[label=\"" << stype << ":" << s->get_name() << "\"" << bold_or_not
1765 1156 : << (static_input ? ", style=filled, fillcolor=green" : "") << "];" << std::endl;
1766 578 : sid++;
1767 : }
1768 :
// NOTE(review): 'tasks_id' is indexed without any bounds check, this assumes that it has at
// least as many elements as 'subseq' - to be confirmed on the callers side
1769 444 : stream << tab << tab << tab << tab << "label=\"" << t->get_name() << " (id = " << tasks_id[exec_order] << ")"
1770 222 : << "\";" << std::endl;
// the task cluster is drawn in red when the task is not replicable
1771 222 : stream << tab << tab << tab << tab << "color=" << (t->is_replicable() ? color : "red") << ";" << std::endl;
1772 222 : stream << tab << tab << tab << "}" << std::endl;
// module label: name, optional custom name, execution order ('exec_order' is incremented here)
// and address of the module
1773 222 : stream << tab << tab << tab << "label=\"" << t->get_module().get_name() << "\n"
1774 444 : << (t->get_module().get_custom_name().empty() ? "" : t->get_module().get_custom_name() + "\n")
1775 666 : << "exec order: [" << exec_order++ << "]\n"
1776 222 : << "addr: " << +&t->get_module() << "\";" << std::endl;
1777 222 : stream << tab << tab << tab << "color=" << color << ";" << std::endl;
1778 222 : stream << tab << tab << "}" << std::endl;
1779 : }
// close the enclosing cluster opened at the beginning of the function
1780 64 : if (!subseq_name.empty())
1781 : {
1782 64 : stream << tab << tab << "label=\"" << subseq_name << "\";" << std::endl;
1783 64 : std::string color = subseq_type == subseq_t::COMMUTE || subseq_type == subseq_t::SELECT ? "red" : "blue";
1784 64 : stream << tab << tab << "color=" << color << ";" << std::endl;
1785 64 : stream << tab << "}" << std::endl;
1786 64 : }
1787 64 : }
1788 :
// Explicit instantiations of Sequence::export_dot_subsequence for vectors of non-const and const
// task pointers.
1789 : template void
1790 : runtime::Sequence::export_dot_subsequence<std::vector<runtime::Task*>>(const std::vector<runtime::Task*>&,
1791 : const std::vector<size_t>&,
1792 : const subseq_t&,
1793 : const std::string&,
1794 : const std::string&,
1795 : std::ostream&) const;
1796 : template void
1797 : runtime::Sequence::export_dot_subsequence<std::vector<const runtime::Task*>>(const std::vector<const runtime::Task*>&,
1798 : const std::vector<size_t>&,
1799 : const subseq_t&,
1800 : const std::string&,
1801 : const std::string&,
1802 : std::ostream&) const;
1803 :
// Writes the DOT edges of one sub-sequence on `stream`.
//
// subseq : container of task pointers.
// tab    : indentation written in front of each edge.
// stream : output stream.
//
// For each SOUT or SFWD socket of each task, one edge is written towards each of its bound
// sockets. The nodes are identified by the sockets addresses.
1804 : template<class VTA>
1805 : void
1806 64 : Sequence::export_dot_connections(const VTA& subseq, const std::string& tab, std::ostream& stream) const
1807 : {
1808 286 : for (auto& t : subseq)
1809 : {
1810 800 : for (auto& s : t->sockets)
1811 : {
1812 578 : if (s->get_type() == socket_t::SOUT || s->get_type() == socket_t::SFWD)
1813 : {
1814 418 : auto& bss = s->get_bound_sockets();
// rank of the edge, only printed (and incremented) when the socket has more than one bound socket
1815 418 : size_t id = 0;
1816 668 : for (auto& bs : bss)
1817 : {
1818 250 : stream << tab << "\"" << +s.get() << "\" -> \"" << +bs << "\""
1819 250 : << (bss.size() > 1 ? "[label=\"" + std::to_string(id++) + "\"]" : "") << std::endl;
1820 : }
1821 : }
1822 : }
1823 : }
1824 64 : }
1825 :
// Explicit instantiations of Sequence::export_dot_connections for vectors of non-const and const
// task pointers.
1826 : template void
1827 : runtime::Sequence::export_dot_connections<std::vector<runtime::Task*>>(const std::vector<runtime::Task*>&,
1828 : const std::string&,
1829 : std::ostream&) const;
1830 : template void
1831 : runtime::Sequence::export_dot_connections<std::vector<const runtime::Task*>>(const std::vector<const runtime::Task*>&,
1832 : const std::string&,
1833 : std::ostream&) const;
1834 :
// Writes the DOT description of the sequence on `stream`, starting from the root node stored in
// `this->sequences[0]` (the other entries of `this->sequences` are not exported).
1835 : void
1836 12 : Sequence::export_dot(std::ostream& stream) const
1837 : {
1838 12 : auto root = this->sequences[0];
1839 12 : this->export_dot(root, stream);
1840 12 : }
1841 :
// Writes a complete "digraph Sequence { ... }" DOT document for the graph rooted at `root`.
//
// root   : first node of the sub-sequences graph, a null pointer gives an empty digraph.
// stream : output stream.
//
// The graph is traversed twice: the first traversal writes the clusters and the nodes
// (export_dot_subsequence), the second one writes the edges (export_dot_connections). In both
// traversals a node is handled only the first time it is met.
1842 : template<class SS>
1843 : void
1844 12 : Sequence::export_dot(tools::Digraph_node<SS>* root, std::ostream& stream) const
1845 : {
// first traversal: one cluster per sub-sequence, named after its id and its depth in the graph
1846 : std::function<void(
1847 : tools::Digraph_node<SS>*, const std::string&, std::ostream&, std::vector<tools::Digraph_node<SS>*>&)>
1848 12 : export_dot_subsequences_recursive =
1849 50 : [&export_dot_subsequences_recursive, this](tools::Digraph_node<SS>* cur_node,
1850 : const std::string& tab,
1851 : std::ostream& stream,
1852 : std::vector<tools::Digraph_node<SS>*>& already_parsed_nodes)
1853 : {
1854 100 : if (cur_node != nullptr &&
1855 100 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
1856 : {
1857 42 : already_parsed_nodes.push_back(cur_node);
1858 126 : this->export_dot_subsequence(cur_node->get_c()->tasks,
1859 42 : cur_node->get_c()->tasks_id,
1860 42 : cur_node->get_c()->type,
1861 42 : "Sub-sequence " + std::to_string(cur_node->get_c()->id) +
1862 : " (depth = " + std::to_string(cur_node->get_depth()) + ")",
1863 : tab,
1864 : stream);
1865 :
1866 80 : for (auto c : cur_node->get_children())
1867 38 : export_dot_subsequences_recursive(c, tab, stream, already_parsed_nodes);
1868 : }
1869 : };
1870 :
// second traversal: the edges between the sockets
1871 : std::function<void(
1872 : tools::Digraph_node<SS>*, const std::string&, std::ostream&, std::vector<tools::Digraph_node<SS>*>&)>
1873 12 : export_dot_connections_recursive =
1874 50 : [&export_dot_connections_recursive, this](tools::Digraph_node<SS>* cur_node,
1875 : const std::string& tab,
1876 : std::ostream& stream,
1877 : std::vector<tools::Digraph_node<SS>*>& already_parsed_nodes)
1878 : {
1879 100 : if (cur_node != nullptr &&
1880 100 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
1881 : {
1882 42 : already_parsed_nodes.push_back(cur_node);
1883 42 : this->export_dot_connections(cur_node->get_c()->tasks, tab, stream);
1884 :
1885 80 : for (auto c : cur_node->get_children())
1886 38 : export_dot_connections_recursive(c, tab, stream, already_parsed_nodes);
1887 : }
1888 : };
1889 :
1890 12 : std::string tab = "\t";
1891 12 : stream << "digraph Sequence {" << std::endl;
1892 12 : std::vector<tools::Digraph_node<SS>*> already_parsed_nodes;
1893 12 : export_dot_subsequences_recursive(root, tab, stream, already_parsed_nodes);
// the list of visited nodes is emptied so that the second traversal goes through the whole graph again
1894 12 : already_parsed_nodes.clear();
1895 12 : export_dot_connections_recursive(root, tab, stream, already_parsed_nodes);
1896 12 : stream << "}" << std::endl;
1897 12 : }
1898 :
// Rebuilds the `processes` list (one callable per task, in the order of the tasks) of every
// sub-sequence of every graph stored in `this->sequences`.
//
// no_copy_mode : when true, the Switcher tasks whose name contains "select" or "commute" and the
//                Adaptor tasks whose name contains "pull" or "push" get a dedicated callable. This
//                callable executes the task and then overwrites the `dataptr` of the sockets
//                collected in `rebind_sockets`. The pointers held by these sockets when this
//                method runs are saved, with the same layout, in `rebind_dataptrs`.
//
// Every other task gets a callable that executes the task and returns the `int` data of its last
// socket. In all the callables, the returned pointer is the `int` data of the last socket of the
// task.
1899 : void
1900 1222 : Sequence::gen_processes(const bool no_copy_mode)
1901 : {
// appends to 'list_fwd' the sockets bound to 'socket' that are not SOUT and not already listed, and
// continues recursively through every bound SFWD socket
1902 : std::function<void(Socket * socket, std::vector<runtime::Socket*> & list_fwd)> explore_thread_rec =
1903 11165 : [&explore_thread_rec](Socket* socket, std::vector<runtime::Socket*>& list_fwd)
1904 : {
1905 8807 : auto bound = socket->get_bound_sockets();
1906 17171 : for (auto explore_bound : bound)
1907 : {
1908 9743 : if (find(list_fwd.begin(), list_fwd.end(), explore_bound) == list_fwd.end() &&
1909 1378 : explore_bound->get_type() != socket_t::SOUT)
1910 : {
1911 1382 : list_fwd.push_back(explore_bound);
1912 : }
1913 8367 : if (explore_bound->get_type() == socket_t::SFWD) explore_thread_rec(explore_bound, list_fwd);
1914 : }
1915 10010 : };
1916 :
// backward exploration: appends the socket returned by 'socket->get_bound_socket()' if it is not
// already listed and, when this socket is SFWD, explores it forward and keeps going backward
1917 : std::function<void(Socket * socket, std::vector<runtime::Socket*> & list_fwd)> explore_thread_rec_reverse =
1918 2277 : [&explore_thread_rec, &explore_thread_rec_reverse](Socket* socket, std::vector<runtime::Socket*>& list_fwd)
1919 : {
1920 1139 : auto bound = &socket->get_bound_socket();
1921 1139 : if (find(list_fwd.begin(), list_fwd.end(), bound) == list_fwd.end())
1922 : {
1923 1139 : list_fwd.push_back(bound);
1924 : }
1925 1138 : if (bound->get_type() == socket_t::SFWD)
1926 : {
1927 569 : explore_thread_rec(bound, list_fwd);
1928 569 : explore_thread_rec_reverse(bound, list_fwd);
1929 : }
1930 2359 : };
1931 :
// traversal of the sub-sequences graph, a node is handled only the first time it is met
1932 : std::function<void(tools::Digraph_node<Sub_sequence>*, std::vector<tools::Digraph_node<Sub_sequence>*>&)>
1933 : gen_processes_recursive =
1934 40915 : [&gen_processes_recursive, no_copy_mode, &explore_thread_rec, &explore_thread_rec_reverse](
1935 : tools::Digraph_node<Sub_sequence>* cur_node,
1936 53316 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
1937 : {
1938 81826 : if (cur_node != nullptr &&
1939 81826 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
1940 : {
1941 34791 : already_parsed_nodes.push_back(cur_node);
1942 :
// dedicated callables, indexed by task, they are moved into 'contents->processes' at the end
1943 34797 : std::map<runtime::Task*, std::function<const int*()>> modified_tasks;
1944 34798 : auto contents = cur_node->get_c();
// what has been generated by a previous call is dropped
1945 34798 : contents->processes.clear();
1946 34798 : contents->rebind_sockets.clear();
1947 34798 : contents->rebind_dataptrs.clear();
1948 132815 : for (auto task : contents->tasks)
1949 : {
// ---- Switcher 'select' task
1950 108043 : if (dynamic_cast<module::Switcher*>(&task->get_module()) &&
1951 206048 : task->get_name().find("select") != std::string::npos && no_copy_mode)
1952 : {
1953 1435 : auto select_task = task;
1954 1435 : auto switcher = dynamic_cast<module::Switcher*>(&select_task->get_module());
1955 1435 : switcher->set_no_copy_select(true);
1956 :
// a new entry is reserved for this task in 'rebind_sockets' and 'rebind_dataptrs'
1957 1435 : const auto rebind_id = contents->rebind_sockets.size();
1958 1435 : contents->rebind_sockets.resize(rebind_id + 1);
1959 1435 : contents->rebind_dataptrs.resize(rebind_id + 1);
1960 :
// the last socket is skipped (the callables below read the status from it)
// NOTE(review): 'sockets.size() - 1' wraps around if the task has no socket - presumably
// impossible here, to be confirmed
1961 6077 : for (size_t s = 0; s < select_task->sockets.size() - 1; s++)
1962 : {
1963 : // there should be only one output socket at this time
1964 4642 : if (select_task->sockets[s]->get_type() == socket_t::SOUT)
1965 : {
1966 1435 : std::vector<runtime::Socket*> bound_sockets;
1967 1435 : std::vector<void*> dataptrs;
1968 :
1969 2870 : for (auto socket : select_task->sockets[s]->get_bound_sockets())
1970 : {
1971 1435 : bound_sockets.push_back(socket);
1972 1435 : if (socket->get_type() == socket_t::SFWD) explore_thread_rec(socket, bound_sockets);
1973 : }
// the current data pointers are saved, in the same order as the sockets
1974 2870 : for (auto sck : bound_sockets)
1975 1435 : dataptrs.push_back(sck->_get_dataptr());
1976 :
1977 1435 : contents->rebind_sockets[rebind_id].push_back(bound_sockets);
1978 1435 : contents->rebind_dataptrs[rebind_id].push_back(dataptrs);
1979 1435 : }
1980 : }
1981 :
1982 437781 : modified_tasks[select_task] = [contents, select_task, switcher, rebind_id]() -> const int*
1983 : {
1984 48442 : select_task->exec();
1985 48356 : const int* status = select_task->sockets.back()->get_dataptr<int>();
1986 :
// the data pointer of the socket selected by the current path of the switcher is propagated
1987 48218 : const auto path = switcher->get_path();
1988 48223 : const auto in_dataptr = select_task->sockets[path]->_get_dataptr();
1989 :
1990 : // rebind input sockets on the fly
1991 : // there should be only one output socket at this time (sout_id == 0)
1992 96659 : for (size_t sout_id = 0; sout_id < contents->rebind_sockets[rebind_id].size(); sout_id++)
1993 96717 : for (size_t sin_id = 0; sin_id < contents->rebind_sockets[rebind_id][sout_id].size();
1994 : sin_id++)
1995 48296 : contents->rebind_sockets[rebind_id][sout_id][sin_id]->dataptr = in_dataptr;
1996 :
1997 48482 : return status;
1998 1435 : };
1999 : }
2000 :
// ---- Switcher 'commute' task
2001 108040 : if (dynamic_cast<module::Switcher*>(&task->get_module()) &&
2002 206057 : task->get_name().find("commute") != std::string::npos && no_copy_mode)
2003 : {
2004 1435 : auto commute_task = task;
2005 1435 : auto switcher = dynamic_cast<module::Switcher*>(&commute_task->get_module());
2006 1435 : switcher->set_no_copy_commute(true);
2007 :
2008 1435 : const auto rebind_id = contents->rebind_sockets.size();
2009 1435 : contents->rebind_sockets.resize(rebind_id + 1);
2010 1435 : contents->rebind_dataptrs.resize(rebind_id + 1);
2011 :
// one list of sockets per output socket of the commute task, in the order of the output sockets
2012 7512 : for (size_t s = 0; s < commute_task->sockets.size() - 1; s++)
2013 : {
2014 6077 : if (commute_task->sockets[s]->get_type() == socket_t::SOUT)
2015 : {
2016 3207 : std::vector<runtime::Socket*> bound_sockets;
2017 3207 : std::vector<void*> dataptrs;
2018 :
2019 6559 : for (auto socket : commute_task->sockets[s]->get_bound_sockets())
2020 : {
2021 3352 : bound_sockets.push_back(socket);
2022 3352 : if (socket->get_type() == socket_t::SFWD) explore_thread_rec(socket, bound_sockets);
2023 : }
2024 6559 : for (auto sck : bound_sockets)
2025 3352 : dataptrs.push_back(sck->_get_dataptr());
2026 :
2027 3207 : contents->rebind_sockets[rebind_id].push_back(bound_sockets);
2028 3207 : contents->rebind_dataptrs[rebind_id].push_back(dataptrs);
2029 3207 : }
2030 : }
2031 :
2032 343626 : modified_tasks[commute_task] = [contents, commute_task, switcher, rebind_id]() -> const int*
2033 : {
2034 48513 : commute_task->exec();
2035 48240 : const int* status = commute_task->sockets.back()->get_dataptr<int>();
// the data pointer of the first socket of the task is propagated to the sockets of the current path
2036 48136 : const auto in_dataptr = commute_task->sockets[0]->_get_dataptr();
2037 48056 : const auto path = switcher->get_path();
2038 :
// NOTE(review): 'path' is used as an index in the lists built above, this assumes that the
// path number matches the rank of the output socket - to be confirmed
2039 : // rebind input sockets on the fly
2040 98018 : for (size_t sin_id = 0; sin_id < contents->rebind_sockets[rebind_id][path].size(); sin_id++)
2041 49793 : contents->rebind_sockets[rebind_id][path][sin_id]->dataptr = in_dataptr;
2042 :
2043 48404 : return status;
2044 1435 : };
2045 : }
2046 :
// ---- Adaptor 'pull' task
2047 106402 : if (dynamic_cast<module::Adaptor*>(&task->get_module()) &&
2048 204416 : task->get_name().find("pull") != std::string::npos && no_copy_mode)
2049 : {
2050 2096 : auto pull_task = task;
2051 2096 : auto adp_pull = dynamic_cast<module::Adaptor*>(&pull_task->get_module());
2052 2095 : adp_pull->set_no_copy_pull(true);
2053 2091 : const auto rebind_id = contents->rebind_sockets.size();
2054 2091 : contents->rebind_sockets.resize(rebind_id + 1);
2055 2092 : contents->rebind_dataptrs.resize(rebind_id + 1);
2056 :
2057 5896 : for (size_t s = 0; s < pull_task->sockets.size() - 1; s++)
2058 : {
2059 3803 : if (pull_task->sockets[s]->get_type() == socket_t::SOUT)
2060 : {
2061 3800 : std::vector<runtime::Socket*> bound_sockets;
2062 3799 : std::vector<void*> dataptrs;
2063 :
// index 0 of the list is the output socket of the pull task itself
2064 3799 : bound_sockets.push_back(pull_task->sockets[s].get());
2065 8469 : for (auto socket : pull_task->sockets[s]->get_bound_sockets())
2066 : {
2067 4662 : bound_sockets.push_back(socket);
2068 4673 : if (socket->get_type() == socket_t::SFWD) explore_thread_rec(socket, bound_sockets);
2069 : }
2070 13130 : for (auto sck : bound_sockets)
2071 9331 : dataptrs.push_back(sck->_get_dataptr());
2072 :
2073 3789 : contents->rebind_sockets[rebind_id].push_back(bound_sockets);
2074 3807 : contents->rebind_dataptrs[rebind_id].push_back(dataptrs);
2075 3805 : }
2076 : }
2077 :
2078 9307777 : modified_tasks[pull_task] = [contents, pull_task, adp_pull, rebind_id]() -> const int*
2079 : {
2080 : // active or passive waiting here
2081 540057 : pull_task->exec();
2082 542244 : const int* status = pull_task->sockets.back()->get_dataptr<int>();
2083 :
2084 : // rebind input sockets on the fly
2085 1485649 : for (size_t sin_id = 0; sin_id < contents->rebind_sockets[rebind_id].size(); sin_id++)
2086 : {
// nothing to do when the output socket of the pull task has no bound socket
2087 954026 : if (contents->rebind_sockets[rebind_id][sin_id].size() > 1)
2088 : {
2089 : // we start at 1 because the rebinding of the 'pull_task' is made in the
2090 : // 'pull_task->exec()' call (this way the debug mode is still working)
2091 949772 : auto swap_buff = contents->rebind_sockets[rebind_id][sin_id][1]->_get_dataptr();
2092 945593 : auto buff = adp_pull->get_filled_buffer(sin_id, swap_buff);
2093 935803 : contents->rebind_sockets[rebind_id][sin_id][1]->dataptr = buff;
2094 : // for the next tasks the same buffer 'buff' is required, an easy mistake is to re-swap
2095 : // and the data will be wrong, this is why we just bind 'buff'
2096 1669368 : for (size_t ta = 2; ta < contents->rebind_sockets[rebind_id][sin_id].size(); ta++)
2097 735288 : contents->rebind_sockets[rebind_id][sin_id][ta]->dataptr = buff;
2098 : }
2099 : }
2100 545791 : adp_pull->wake_up_pusher();
2101 557320 : return status;
2102 2091 : };
2103 : }
2104 :
// ---- Adaptor 'push' task
2105 106408 : if (dynamic_cast<module::Adaptor*>(&task->get_module()) &&
2106 204423 : task->get_name().find("push") != std::string::npos && no_copy_mode)
2107 : {
2108 2098 : auto push_task = task;
2109 2098 : auto adp_push = dynamic_cast<module::Adaptor*>(&push_task->get_module());
2110 2098 : adp_push->set_no_copy_push(true);
2111 2098 : const auto rebind_id = contents->rebind_sockets.size();
2112 2098 : contents->rebind_sockets.resize(rebind_id + 1);
2113 2098 : contents->rebind_dataptrs.resize(rebind_id + 1);
2114 :
2115 6934 : for (size_t s = 0; s < push_task->sockets.size() - 1; s++)
2116 4837 : if (push_task->sockets[s]->get_type() == socket_t::SIN)
2117 : {
2118 4836 : std::vector<runtime::Socket*> bound_sockets;
2119 4835 : std::vector<void*> dataptrs;
2120 :
// index 0 of the list is the input socket of the push task itself
2121 4835 : bound_sockets.push_back(push_task->sockets[s].get());
2122 :
// index 1 of the list is the socket bound to this input socket
2123 4833 : auto bound_socket = &push_task->sockets[s]->get_bound_socket();
2124 4832 : bound_sockets.push_back(bound_socket);
2125 4829 : explore_thread_rec(bound_socket, bound_sockets);
2126 :
2127 : // If the socket is FWD, we have to update all the other sockets with a backward
2128 : // exploration
2129 4830 : if (bound_socket->get_type() == socket_t::SFWD)
2130 570 : explore_thread_rec_reverse(bound_socket, bound_sockets);
2131 :
2132 16112 : for (auto sck : bound_sockets)
2133 11278 : dataptrs.push_back(sck->_get_dataptr());
2134 :
2135 4806 : contents->rebind_sockets[rebind_id].push_back(bound_sockets);
2136 4834 : contents->rebind_dataptrs[rebind_id].push_back(dataptrs);
2137 4836 : }
2138 :
2139 8380795 : modified_tasks[push_task] = [contents, push_task, adp_push, rebind_id]() -> const int*
2140 : {
2141 : // active or passive waiting here
2142 552835 : push_task->exec();
2143 566932 : const int* status = push_task->sockets.back()->get_dataptr<int>();
2144 : // rebind output sockets on the fly
2145 1535732 : for (size_t sout_id = 0; sout_id < contents->rebind_sockets[rebind_id].size(); sout_id++)
2146 : {
2147 : // we start at 1 because the rebinding of the 'push_task' is made in the
2148 : // 'push_task->exec()' call (this way the debug mode is still working)
2149 986496 : auto swap_buff = contents->rebind_sockets[rebind_id][sout_id][1]->_get_dataptr();
2150 981683 : auto buff = adp_push->get_empty_buffer(sout_id, swap_buff);
2151 966393 : contents->rebind_sockets[rebind_id][sout_id][1]->dataptr = buff;
2152 : // the output socket linked to the push adp can have more than one socket bound and so
2153 : // we have to rebind all the input sockets bound to the current output socket
2154 1592669 : for (size_t ta = 2; ta < contents->rebind_sockets[rebind_id][sout_id].size(); ta++)
2155 628965 : contents->rebind_sockets[rebind_id][sout_id][ta]->dataptr = buff;
2156 : }
2157 564899 : adp_push->wake_up_puller();
2158 578853 : return status;
2159 2094 : };
2160 : }
2161 : }
2162 :
// 'processes' is filled in the order of the tasks: the dedicated callable if there is one, else
// the default callable (execution of the task, then status read from its last socket)
2163 132800 : for (auto task : contents->tasks)
2164 97999 : if (modified_tasks.count(task))
2165 7058 : contents->processes.push_back(modified_tasks[task]);
2166 : else
2167 90948 : contents->processes.push_back(
2168 3788280 : [task]() -> const int*
2169 : {
2170 1912943 : task->exec();
2171 1875337 : const int* status = task->sockets.back()->get_dataptr<int>();
2172 1786889 : return status;
2173 : });
2174 :
2175 63222 : for (auto c : cur_node->get_children())
2176 28432 : gen_processes_recursive(c, already_parsed_nodes);
2177 34798 : }
2178 42141 : };
2179 :
// one generation per graph of 'this->sequences'; when the thread pinning is enabled the calling
// thread is pinned before the generation and unpinned after ('thread_id' is only incremented in
// this case)
2180 1221 : size_t thread_id = 0;
2181 13710 : for (auto& sequence : this->sequences)
2182 : {
2183 12484 : if (this->is_thread_pinning())
2184 : {
2185 30 : if (!this->puids.empty())
2186 0 : tools::Thread_pinning::pin(this->puids[thread_id++]);
2187 : else
2188 30 : tools::Thread_pinning::pin(this->pin_objects_per_thread[thread_id++]);
2189 : }
2190 12485 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
2191 12485 : gen_processes_recursive(sequence, already_parsed_nodes);
2192 :
2193 12488 : if (this->is_thread_pinning()) tools::Thread_pinning::unpin();
2194 12488 : }
2195 1221 : }
2196 :
// Reverts the no copy configuration on every sub-sequence of every graph of `this->sequences`:
//   - the no copy flag of the Switcher tasks whose name contains "select" or "commute" is set to false,
//   - the no copy flag of the Adaptor tasks whose name contains "pull" or "push" is set to false and
//     `reset_buffer()` is called on the adaptor,
//   - the `dataptr` of each socket stored in `rebind_sockets` is set back to the pointer stored at
//     the same position in `rebind_dataptrs`.
2197 : void
2198 355 : Sequence::reset_no_copy_mode()
2199 : {
2200 : std::function<void(tools::Digraph_node<Sub_sequence>*, std::vector<tools::Digraph_node<Sub_sequence>*>&)>
2201 : reset_no_copy_mode_recursive =
2202 12023 : [&reset_no_copy_mode_recursive](tools::Digraph_node<Sub_sequence>* cur_node,
2203 8200 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
2204 : {
// a node is handled only the first time it is met
2205 24046 : if (cur_node != nullptr &&
2206 24046 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
2207 : {
2208 10251 : already_parsed_nodes.push_back(cur_node);
2209 10251 : auto contents = cur_node->get_c();
2210 40897 : for (auto task : contents->tasks)
2211 : {
2212 33516 : if (dynamic_cast<module::Switcher*>(&task->get_module()) &&
2213 33516 : task->get_name().find("select") != std::string::npos)
2214 : {
2215 1435 : auto select_task = task;
2216 1435 : auto switcher = dynamic_cast<module::Switcher*>(&select_task->get_module());
2217 1435 : switcher->set_no_copy_select(false);
2218 : }
2219 :
2220 33516 : if (dynamic_cast<module::Switcher*>(&task->get_module()) &&
2221 33516 : task->get_name().find("commute") != std::string::npos)
2222 : {
2223 1435 : auto commute_task = task;
2224 1435 : auto switcher = dynamic_cast<module::Switcher*>(&commute_task->get_module());
2225 1435 : switcher->set_no_copy_commute(false);
2226 : }
2227 :
2228 34842 : if (dynamic_cast<module::Adaptor*>(&task->get_module()) &&
2229 34842 : task->get_name().find("pull") != std::string::npos)
2230 : {
2231 2098 : auto pull_task = task;
2232 2098 : auto adp_pull = dynamic_cast<module::Adaptor*>(&pull_task->get_module());
2233 2098 : adp_pull->set_no_copy_pull(false);
2234 2098 : adp_pull->reset_buffer();
2235 : }
2236 :
2237 34842 : if (dynamic_cast<module::Adaptor*>(&task->get_module()) &&
2238 34842 : task->get_name().find("push") != std::string::npos)
2239 : {
2240 2098 : auto push_task = task;
2241 2098 : auto adp_push = dynamic_cast<module::Adaptor*>(&push_task->get_module());
2242 2098 : adp_push->set_no_copy_push(false);
2243 2098 : adp_push->reset_buffer();
2244 : }
2245 : }
2246 :
// the saved data pointers are restored ('rebind_dataptrs' is indexed exactly like 'rebind_sockets')
2247 17317 : for (size_t rebind_id = 0; rebind_id < contents->rebind_sockets.size(); rebind_id++)
2248 20357 : for (size_t s = 0; s < contents->rebind_sockets[rebind_id].size(); s++)
2249 38773 : for (size_t ta = 0; ta < contents->rebind_sockets[rebind_id][s].size(); ta++)
2250 25482 : contents->rebind_sockets[rebind_id][s][ta]->dataptr =
2251 25482 : contents->rebind_dataptrs[rebind_id][s][ta];
2252 :
2253 18451 : for (auto c : cur_node->get_children())
2254 8200 : reset_no_copy_mode_recursive(c, already_parsed_nodes);
2255 : }
2256 12378 : };
2257 :
// each graph is traversed with its own list of visited nodes
2258 4178 : for (auto& sequence : this->sequences)
2259 : {
2260 3823 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
2261 3823 : reset_no_copy_mode_recursive(sequence, already_parsed_nodes);
2262 3823 : }
2263 355 : }
2264 :
// Stores `no_copy_mode` in the `no_copy_mode` attribute (nothing else is done here, the processes
// are not regenerated by this method).
2265 : void
2266 78 : Sequence::set_no_copy_mode(const bool no_copy_mode)
2267 : {
2268 78 : this->no_copy_mode = no_copy_mode;
2269 78 : }
2270 :
// Returns the current value of the `no_copy_mode` attribute.
2271 : bool
2272 1004 : Sequence::is_no_copy_mode() const
2273 : {
2274 1004 : return this->no_copy_mode;
2275 : }
2276 :
// Stores `auto_stop` in the `auto_stop` attribute.
2277 : void
2278 22 : Sequence::set_auto_stop(const bool auto_stop)
2279 : {
2280 22 : this->auto_stop = auto_stop;
2281 22 : }
2282 :
// Returns the current value of the `auto_stop` attribute.
2283 : bool
2284 0 : Sequence::is_auto_stop() const
2285 : {
2286 0 : return this->auto_stop;
2287 : }
2288 :
// Returns the contents of the terminal node (node without children) of the graph
// `this->sequences[tid]`.
//
// tid : index of the graph in `this->sequences`, also reported in the error message.
//
// Returns a null pointer when the root node is null.
// Throws tools::runtime_error when two different terminal sub-sequences are found.
2289 : Sub_sequence*
2290 12588 : Sequence::get_last_subsequence(const size_t tid)
2291 : {
2292 : std::function<Sub_sequence*(tools::Digraph_node<Sub_sequence>*, std::vector<tools::Digraph_node<Sub_sequence>*>&)>
2293 : get_last_subsequence_recursive =
2294 16596 : [&get_last_subsequence_recursive,
2295 : &tid](tools::Digraph_node<Sub_sequence>* cur_node,
2296 4008 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes) -> Sub_sequence*
2297 : {
2298 33192 : if (cur_node != nullptr &&
2299 33192 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
2300 : {
2301 15810 : already_parsed_nodes.push_back(cur_node);
// a node without children is a terminal node
2302 15810 : if (!cur_node->get_children().size()) return cur_node->get_contents();
2303 3222 : Sub_sequence* last_ss = nullptr;
2304 7230 : for (auto c : cur_node->get_children())
2305 : {
2306 4008 : Sub_sequence* last_branch_ss = nullptr;
2307 4008 : last_branch_ss = get_last_subsequence_recursive(c, already_parsed_nodes);
// a branch returns a null pointer when its nodes have already been visited through another
// branch, such a branch is not a new candidate
2308 4008 : if (last_ss && last_branch_ss && last_ss != last_branch_ss)
2309 : {
2310 0 : std::stringstream message;
2311 : message << "Multiple candidates have been found for the last subsequence, this shouldn't be "
2312 0 : << "possible. (tid = " << tid << ", "
2313 0 : << "last_ss.id = " << last_ss->id << ", "
2314 0 : << "last_branch_ss.id = " << last_branch_ss->id << ")";
2315 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
2316 0 : }
2317 4008 : last_ss = last_branch_ss ? last_branch_ss : last_ss;
2318 : }
2319 3222 : return last_ss;
2320 : }
// null node or node already visited
2321 786 : return nullptr;
2322 12588 : };
2323 :
2324 12588 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
2325 25176 : return get_last_subsequence_recursive(this->sequences[tid], already_parsed_nodes);
2326 12588 : }
2327 :
// Recomputes the `tasks_id` vector of every sub-sequence of the graph `this->sequences[tid]`:
// the vector is resized to the number of tasks and filled with consecutive values starting from
// the value of its first element (read after the resize).
//
// tid : index of the graph in `this->sequences`.
2328 : void
2329 12588 : Sequence::update_tasks_id(const size_t tid)
2330 : {
2331 : std::function<void(tools::Digraph_node<Sub_sequence>*, std::vector<tools::Digraph_node<Sub_sequence>*>&)>
2332 : update_tasks_id_recursive =
2333 16596 : [&update_tasks_id_recursive](tools::Digraph_node<Sub_sequence>* cur_node,
2334 4008 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
2335 : {
2336 33192 : if (cur_node != nullptr &&
2337 33192 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
2338 : {
2339 15810 : already_parsed_nodes.push_back(cur_node);
2340 15810 : Sub_sequence* ss = cur_node->get_contents();
2341 15810 : ss->tasks_id.resize(ss->tasks.size());
// NOTE(review): 'front()' on an empty vector is undefined behavior, this assumes that a
// sub-sequence always holds at least one task - to be confirmed
2342 15810 : std::iota(ss->tasks_id.begin(), ss->tasks_id.end(), ss->tasks_id.front());
2343 :
2344 19818 : for (auto c : cur_node->get_children())
2345 4008 : update_tasks_id_recursive(c, already_parsed_nodes);
2346 : }
2347 29184 : };
2348 :
2349 12588 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
2350 25176 : return update_tasks_id_recursive(this->sequences[tid], already_parsed_nodes);
2351 12588 : }
2352 :
// Collects, over all the graphs of `this->sequences`, the tasks whose identifier is `taid`.
//
// taid : task identifier, compared with the values stored in the `tasks_id` of the sub-sequences.
//
// Returns the matching tasks, in the order of the graphs in `this->sequences`. A graph that does
// not contain the identifier adds nothing to the returned vector.
2353 : std::vector<runtime::Task*>
2354 5869 : Sequence::get_tasks_from_id(const size_t taid)
2355 : {
2356 : std::function<void(tools::Digraph_node<Sub_sequence>*,
2357 : const size_t,
2358 : std::vector<runtime::Task*>&,
2359 : std::vector<tools::Digraph_node<Sub_sequence>*>&)>
2360 : get_tasks_from_id_recursive =
2361 77921 : [&get_tasks_from_id_recursive](tools::Digraph_node<Sub_sequence>* cur_node,
2362 : const size_t taid,
2363 : std::vector<runtime::Task*>& tasks,
2364 31194 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
2365 : {
2366 155842 : if (cur_node != nullptr &&
2367 155842 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
2368 : {
2369 72906 : already_parsed_nodes.push_back(cur_node);
2370 72906 : Sub_sequence* ss = cur_node->get_contents();
// only the first match of the sub-sequence is kept
2371 72906 : bool found = false;
2372 194647 : for (size_t t = 0; t < ss->tasks_id.size(); t++)
2373 168468 : if (ss->tasks_id[t] == taid)
2374 : {
2375 46727 : tasks.push_back(ss->tasks[t]);
2376 46727 : found = true;
2377 46727 : break;
2378 : }
2379 :
// the children of a node are explored only when the identifier has not been found in this node
// (the traversal of the other branches reached from the parents is not interrupted)
2380 72906 : if (!found)
2381 57373 : for (auto c : cur_node->get_children())
2382 31194 : get_tasks_from_id_recursive(c, taid, tasks, already_parsed_nodes);
2383 : }
2384 83790 : };
2385 :
2386 5869 : std::vector<runtime::Task*> tasks;
2387 52596 : for (auto& s : this->sequences)
2388 : {
2389 46727 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
2390 46727 : get_tasks_from_id_recursive(s, taid, tasks, already_parsed_nodes);
2391 46727 : }
2392 11738 : return tasks;
2393 5869 : }
2394 :
// Rebuilds `firsts_tasks` and `lasts_tasks` (one vector of tasks per thread, `n_threads` vectors
// each) from the identifiers stored in `firsts_tasks_id` and `lasts_tasks_id`.
//
// NOTE(review): the position of a task in the vector returned by `get_tasks_from_id` is used as
// the thread index, this assumes that exactly one task per thread is returned, in the order of the
// threads - to be confirmed
2395 : void
2396 2204 : Sequence::update_firsts_and_lasts_tasks()
2397 : {
2398 2204 : this->firsts_tasks.clear();
2399 2204 : this->firsts_tasks.resize(this->n_threads);
2400 4965 : for (auto taid : firsts_tasks_id)
2401 : {
2402 2761 : auto tasks = this->get_tasks_from_id(taid);
2403 24423 : for (size_t tid = 0; tid < tasks.size(); tid++)
2404 21662 : firsts_tasks[tid].push_back(tasks[tid]);
2405 2761 : }
2406 :
2407 2204 : this->lasts_tasks.clear();
2408 2204 : this->lasts_tasks.resize(this->n_threads);
2409 5312 : for (auto taid : lasts_tasks_id)
2410 : {
2411 3108 : auto tasks = this->get_tasks_from_id(taid);
2412 28173 : for (size_t tid = 0; tid < tasks.size(); tid++)
2413 25065 : lasts_tasks[tid].push_back(tasks[tid]);
2414 3108 : }
2415 2204 : }
2416 :
// Removes the bindings that are internal to the sequence and memorizes them.
//
// unbind_sockets : output, filled with the (input socket, output socket) pairs that have been
//                  unbound with `Socket::unbind`.
// unbind_tasks   : output, filled with the (task, output socket) pairs that have been unbound
//                  with `Task::unbind`.
//
// For each thread, the graph `this->sequences[t]` is traversed. For each SOUT or SFWD socket,
// every bound socket whose task is one of the tasks of this thread (`get_tasks_per_threads()[t]`)
// is unbound.
2417 : void
2418 258 : Sequence::_set_n_frames_unbind(std::vector<std::pair<runtime::Socket*, runtime::Socket*>>& unbind_sockets,
2419 : std::vector<std::pair<runtime::Task*, runtime::Socket*>>& unbind_tasks)
2420 : {
2421 : std::function<void(const std::vector<runtime::Task*>&,
2422 : tools::Digraph_node<Sub_sequence>*,
2423 : std::vector<tools::Digraph_node<Sub_sequence>*>&)>
2424 24810 : graph_traversal = [&graph_traversal, &unbind_sockets, &unbind_tasks](
2425 : const std::vector<runtime::Task*>& possessed_tsks,
2426 : tools::Digraph_node<Sub_sequence>* cur_node,
2427 68258 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
2428 : {
2429 49620 : if (cur_node != nullptr &&
2430 49620 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
2431 : {
2432 20506 : already_parsed_nodes.push_back(cur_node);
2433 20506 : Sub_sequence* ss = cur_node->get_contents();
2434 65900 : for (auto tsk_out : ss->tasks)
2435 : {
2436 177584 : for (auto sck_out : tsk_out->sockets)
2437 : {
2438 132190 : if (sck_out->get_type() == socket_t::SOUT || sck_out->get_type() == socket_t::SFWD)
2439 : {
// the loop below works on a copy of the list of the bound sockets - presumably because the
// unbind calls modify the original list
2440 91292 : auto sck_out_bound_sockets_cpy = sck_out->get_bound_sockets();
2441 139566 : for (auto sck_in : sck_out_bound_sockets_cpy)
2442 : {
2443 48274 : auto tsk_in = &sck_in->get_task();
2444 : // if the task of the current input socket is in the tasks of the sequence
2445 48274 : if (std::find(possessed_tsks.begin(), possessed_tsks.end(), tsk_in) != possessed_tsks.end())
2446 : {
// the unbinding is first tried at the task level; if it throws (any exception), the unbinding
// is made at the socket level instead
2447 : try
2448 : {
2449 48274 : tsk_in->unbind(*sck_out);
2450 4054 : unbind_tasks.push_back(std::make_pair(tsk_in, sck_out.get()));
2451 : }
2452 44220 : catch (...)
2453 : {
2454 44220 : sck_in->unbind(*sck_out);
2455 : // memorize the unbinds to rebind after!
2456 44220 : unbind_sockets.push_back(std::make_pair(sck_in, sck_out.get()));
2457 44220 : }
2458 : }
2459 : }
2460 91292 : }
2461 132190 : }
2462 : }
2463 40490 : for (auto c : cur_node->get_children())
2464 19984 : graph_traversal(possessed_tsks, c, already_parsed_nodes);
2465 : }
2466 25068 : };
2467 :
2468 258 : auto tsks_per_threads = this->get_tasks_per_threads();
2469 258 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
2470 5084 : for (size_t t = 0; t < this->get_n_threads(); t++)
2471 : {
// the list of visited nodes is reused from one thread to the next, it is emptied first
2472 4826 : already_parsed_nodes.clear();
2473 4826 : graph_traversal(tsks_per_threads[t], this->sequences[t], already_parsed_nodes);
2474 : }
2475 258 : }
2476 :
2477 : void
2478 258 : Sequence::_set_n_frames(const size_t n_frames)
2479 : {
2480 258 : if (n_frames <= 0)
2481 : {
2482 0 : std::stringstream message;
2483 0 : message << "'n_frames' has to be greater than 0 ('n_frames' = " << n_frames << ").";
2484 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
2485 0 : }
2486 :
2487 : // set_n_frames for all the modules (be aware that this operation can fail)
2488 5084 : for (auto& mm : this->all_modules)
2489 47636 : for (auto& m : mm)
2490 42810 : m->set_n_frames(n_frames);
2491 258 : }
2492 :
2493 : void
2494 258 : Sequence::_set_n_frames_rebind(const std::vector<std::pair<runtime::Socket*, runtime::Socket*>>& unbind_sockets,
2495 : const std::vector<std::pair<runtime::Task*, runtime::Socket*>>& unbind_tasks)
2496 : {
2497 : // rebind the sockets
2498 44478 : for (auto& u : unbind_sockets)
2499 44220 : (*u.first) = *u.second;
2500 :
2501 : // rebind the tasks
2502 4312 : for (auto& u : unbind_tasks)
2503 4054 : (*u.first) = *u.second;
2504 258 : }
2505 :
2506 : void
2507 276 : Sequence::set_n_frames(const size_t n_frames)
2508 : {
2509 276 : const auto old_n_frames = this->get_n_frames();
2510 276 : if (old_n_frames != n_frames)
2511 : {
2512 190 : std::vector<std::pair<runtime::Socket*, runtime::Socket*>> unbind_sockets;
2513 190 : std::vector<std::pair<runtime::Task*, runtime::Socket*>> unbind_tasks;
2514 190 : this->_set_n_frames_unbind(unbind_sockets, unbind_tasks);
2515 190 : this->_set_n_frames(n_frames);
2516 190 : this->_set_n_frames_rebind(unbind_sockets, unbind_tasks);
2517 190 : }
2518 276 : }
2519 :
2520 : bool
2521 11 : Sequence::is_control_flow() const
2522 : {
2523 11 : return this->sequences[0]->get_children().size();
2524 : }
2525 :
2526 : // /!\ this check has been commented out because it is known not to work in the general case
2527 : /*
2528 : template<class SS>
2529 : void Sequence
2530 : ::check_ctrl_flow(tools::Digraph_node<SS>* root)
2531 : {
2532 : std::function<void(tools::Digraph_node<SS>*,
2533 : std::vector<tools::Digraph_node<SS>*>&)> check_control_flow_parity =
2534 : [&check_control_flow_parity](tools::Digraph_node<SS>* cur_node,
2535 : std::vector<tools::Digraph_node<SS>*> already_parsed_nodes) -> void
2536 : {
2537 : if (cur_node != nullptr &&
2538 : std::find(already_parsed_nodes.begin(),
2539 : already_parsed_nodes.end(),
2540 : cur_node) == already_parsed_nodes.end() &&
2541 : cur_node->get_children().size())
2542 : {
2543 : already_parsed_nodes.push_back(cur_node);
2544 : for (auto c : cur_node->get_children())
2545 : check_control_flow_parity(c, already_parsed_nodes);
2546 : }
2547 : else
2548 : {
2549 : already_parsed_nodes.push_back(cur_node);
2550 : std::vector<module::Module*> parsed_switchers;
2551 : for (size_t i = 0; i < already_parsed_nodes.size(); i++)
2552 : {
2553 : // This check occurs before dud-nodes are removed by _init, some nodes have no
2554 : contents and must be
2555 : // accounted for
2556 : if (already_parsed_nodes[i]->get_c() == nullptr ||
2557 : !(already_parsed_nodes[i]->get_c()->type == subseq_t::COMMUTE ||
2558 : already_parsed_nodes[i]->get_c()->type == subseq_t::SELECT))
2559 : continue;
2560 :
2561 : // We search for the first switcher task in the path taken: already_parsed_nodes
2562 : const runtime::Task *ctrl_task_first = nullptr;
2563 : const runtime::Task *ctrl_task_second = nullptr;
2564 : for (auto t : already_parsed_nodes[i]->get_c()->tasks)
2565 : {
2566 : if (dynamic_cast<const module::Switcher*>(&t->get_module()) &&
2567 : (t->get_name() == "select" || t->get_name() == "commute"))
2568 : {
2569 : ctrl_task_first = t;
2570 : break;
2571 : }
2572 : }
2573 :
2574 : if (std::find(parsed_switchers.begin(), parsed_switchers.end(),
2575 : &(ctrl_task_first->get_module())) != parsed_switchers.end()) continue;
2576 :
2577 : // We now search for the second switcher task in the path taken
2578 : auto expected_type = ctrl_task_first->get_name() == "select" ? subseq_t::COMMUTE
2579 : : subseq_t::SELECT; for (size_t j = i; j < already_parsed_nodes.size() && ctrl_task_second == nullptr; j++)
2580 : {
2581 : if (already_parsed_nodes[j]->get_c() == nullptr ||
2582 : already_parsed_nodes[j]->get_c()->type != expected_type)
2583 : continue;
2584 : for (auto t : already_parsed_nodes[j]->get_c()->tasks)
2585 : {
2586 : if ((t->get_name() == "select" || t->get_name() == "commute") &&
2587 : &(ctrl_task_first->get_module()) == &(t->get_module()))
2588 : {
2589 : parsed_switchers.push_back(&(t->get_module()));
2590 : ctrl_task_second = t;
2591 : break;
2592 : }
2593 : }
2594 : }
2595 :
2596 : if (ctrl_task_second == nullptr)
2597 : {
2598 : for (auto t : ctrl_task_first->get_module().tasks)
2599 : {
2600 : if ((ctrl_task_first->get_name() == "select" && t->get_name() ==
2601 : "commute") || (ctrl_task_first->get_name() == "commute" && t->get_name() == "select"))
2602 : {
2603 : ctrl_task_second = t.get();
2604 : break;
2605 : }
2606 : }
2607 : std::stringstream message;
2608 : message << ctrl_task_first->get_name() << " is missing a path to "
2609 : << ctrl_task_second->get_name() << ".";
2610 : throw tools::control_flow_error(__FILE__, __LINE__, __func__,
2611 : message.str());
2612 : }
2613 : }
2614 : }
2615 : };
2616 :
2617 : std::vector<tools::Digraph_node<SS>*> already_parsed_nodes;
2618 : return check_control_flow_parity(root, already_parsed_nodes);
2619 : }
2620 : */
|