Line data Source code
1 : #include <algorithm>
2 : #include <cstring>
3 : #include <exception>
4 : #include <fstream>
5 : #include <numeric>
6 : #include <set>
7 : #include <sstream>
8 : #include <thread>
9 : #include <utility>
10 :
11 : #include "Module/Module.hpp"
12 : #include "Module/Stateful/Adaptor/Adaptor_m_to_n.hpp"
13 : #include "Module/Stateful/Probe/Probe.hpp"
14 : #include "Module/Stateful/Switcher/Switcher.hpp"
15 : #include "Runtime/Sequence/Sequence.hpp"
16 : #include "Runtime/Socket/Socket.hpp"
17 : #include "Runtime/Task/Task.hpp"
18 : #include "Tools/Buffer_allocator/Buffer_allocator.hpp"
19 : #include "Tools/Display/rang_format/rang_format.h"
20 : #include "Tools/Exception/exception.hpp"
21 : #include "Tools/Signal_handler/Signal_handler.hpp"
22 : #include "Tools/Thread/Thread_pinning/Thread_pinning.hpp"
23 : #include "Tools/Thread/Thread_pinning/Thread_pinning_utils.hpp"
24 : #include "Tools/Thread/Thread_pool/Standard/Thread_pool_standard.hpp"
25 :
26 : using namespace spu;
27 : using namespace spu::runtime;
28 :
29 0 : Sequence::Sequence(const std::vector<const runtime::Task*>& firsts,
30 : const std::vector<const runtime::Task*>& lasts,
31 : const std::vector<const runtime::Task*>& exclusions,
32 : const size_t n_threads,
33 : const bool thread_pinning,
34 : const std::vector<size_t>& puids,
35 0 : const bool memory_allocation)
36 0 : : n_threads(n_threads)
37 0 : , sequences(n_threads, nullptr)
38 0 : , modules(n_threads)
39 0 : , all_modules(n_threads)
40 0 : , mtx_exception(new std::mutex())
41 0 : , force_exit_loop(new std::atomic<bool>(false))
42 0 : , tasks_inplace(false)
43 0 : , thread_pinning(thread_pinning)
44 0 : , puids(puids)
45 0 : , no_copy_mode(true)
46 0 : , saved_exclusions(exclusions)
47 0 : , switchers_reset(n_threads)
48 0 : , auto_stop(true)
49 0 : , is_part_of_pipeline(false)
50 0 : , next_round_is_over(n_threads, false)
51 0 : , cur_task_id(n_threads, 0)
52 0 : , cur_ss(n_threads, nullptr)
53 0 : , memory_allocation(memory_allocation)
54 : {
55 : #ifndef SPU_HWLOC
56 : if (thread_pinning)
57 : std::clog << rang::tag::warning
58 : << "StreamPU has not been linked with the 'hwloc' library, the 'thread_pinning' "
59 : "option of the 'runtime::Sequence' will have no effect."
60 : << std::endl;
61 : #endif
62 :
63 0 : if (thread_pinning && puids.size() < n_threads)
64 : {
65 0 : std::stringstream message;
66 0 : message << "'puids.size()' has to be greater or equal to 'n_threads' ('puids.size()' = " << puids.size()
67 0 : << " , 'n_threads' = " << n_threads << ").";
68 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
69 0 : }
70 :
71 0 : this->init<runtime::Sub_sequence_const, const runtime::Task>(firsts, lasts, exclusions);
72 0 : }
73 :
74 0 : Sequence::Sequence(const std::vector<const runtime::Task*>& firsts,
75 : const std::vector<const runtime::Task*>& lasts,
76 : const size_t n_threads,
77 : const bool thread_pinning,
78 : const std::vector<size_t>& puids,
79 0 : const bool memory_allocation)
80 0 : : Sequence(firsts, lasts, {}, n_threads, thread_pinning, puids, memory_allocation)
81 : {
82 0 : }
83 :
84 0 : Sequence::Sequence(const std::vector<const runtime::Task*>& firsts,
85 : const size_t n_threads,
86 : const bool thread_pinning,
87 : const std::vector<size_t>& puids,
88 0 : const bool memory_allocation)
89 0 : : Sequence(firsts, {}, {}, n_threads, thread_pinning, puids, memory_allocation)
90 : {
91 0 : }
92 :
93 0 : Sequence::Sequence(const runtime::Task& first,
94 : const runtime::Task& last,
95 : const size_t n_threads,
96 : const bool thread_pinning,
97 : const std::vector<size_t>& puids,
98 0 : const bool memory_allocation)
99 0 : : Sequence({ &first }, { &last }, n_threads, thread_pinning, puids, memory_allocation)
100 : {
101 0 : }
102 :
103 0 : Sequence::Sequence(const runtime::Task& first,
104 : const size_t n_threads,
105 : const bool thread_pinning,
106 : const std::vector<size_t>& puids,
107 0 : const bool memory_allocation)
108 0 : : Sequence({ &first }, n_threads, thread_pinning, puids, memory_allocation)
109 : {
110 0 : }
111 :
112 : std::vector<const runtime::Task*>
113 591 : exclusions_convert_to_const(const std::vector<runtime::Task*>& exclusions)
114 : {
115 591 : std::vector<const runtime::Task*> exclusions_const;
116 637 : for (auto exception : exclusions)
117 46 : exclusions_const.push_back(exception);
118 591 : return exclusions_const;
119 0 : }
120 :
121 526 : Sequence::Sequence(const std::vector<runtime::Task*>& firsts,
122 : const std::vector<runtime::Task*>& lasts,
123 : const std::vector<runtime::Task*>& exclusions,
124 : const size_t n_threads,
125 : const bool thread_pinning,
126 : const std::vector<size_t>& puids,
127 : const bool tasks_inplace,
128 526 : const bool memory_allocation)
129 526 : : n_threads(n_threads)
130 526 : , sequences(n_threads, nullptr)
131 526 : , modules(tasks_inplace ? n_threads - 1 : n_threads)
132 526 : , all_modules(n_threads)
133 526 : , mtx_exception(new std::mutex())
134 526 : , force_exit_loop(new std::atomic<bool>(false))
135 526 : , tasks_inplace(tasks_inplace)
136 526 : , thread_pinning(thread_pinning)
137 526 : , puids(puids)
138 526 : , no_copy_mode(true)
139 526 : , saved_exclusions(exclusions_convert_to_const(exclusions))
140 526 : , switchers_reset(n_threads)
141 526 : , auto_stop(true)
142 526 : , is_part_of_pipeline(false)
143 526 : , next_round_is_over(n_threads, false)
144 526 : , cur_task_id(n_threads, 0)
145 526 : , cur_ss(n_threads, nullptr)
146 2630 : , memory_allocation(memory_allocation)
147 : {
148 526 : if (thread_pinning && puids.size() < n_threads)
149 : {
150 0 : std::stringstream message;
151 0 : message << "'puids.size()' has greater or equal to 'n_threads' ('puids.size()' = " << puids.size()
152 0 : << " , 'n_threads' = " << n_threads << ").";
153 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
154 0 : }
155 :
156 526 : if (tasks_inplace)
157 526 : this->init<runtime::Sub_sequence, runtime::Task>(firsts, lasts, exclusions);
158 : else
159 : {
160 0 : std::vector<const runtime::Task*> firsts_bis;
161 0 : for (auto first : firsts)
162 0 : firsts_bis.push_back(first);
163 0 : std::vector<const runtime::Task*> lasts_bis;
164 0 : for (auto last : lasts)
165 0 : lasts_bis.push_back(last);
166 0 : std::vector<const runtime::Task*> exclusions_bis;
167 0 : for (auto exception : exclusions)
168 0 : exclusions_bis.push_back(exception);
169 0 : this->init<runtime::Sub_sequence_const, const runtime::Task>(firsts_bis, lasts_bis, exclusions_bis);
170 0 : }
171 526 : }
172 :
173 106 : Sequence::Sequence(const std::vector<runtime::Task*>& firsts,
174 : const std::vector<runtime::Task*>& lasts,
175 : const size_t n_threads,
176 : const bool thread_pinning,
177 : const std::vector<size_t>& puids,
178 : const bool tasks_inplace,
179 106 : const bool memory_allocation)
180 106 : : Sequence(firsts, lasts, {}, n_threads, thread_pinning, puids, tasks_inplace, memory_allocation)
181 : {
182 106 : }
183 :
184 112 : Sequence::Sequence(const std::vector<runtime::Task*>& firsts,
185 : const size_t n_threads,
186 : const bool thread_pinning,
187 : const std::vector<size_t>& puids,
188 : const bool tasks_inplace,
189 112 : const bool memory_allocation)
190 112 : : Sequence(firsts, {}, {}, n_threads, thread_pinning, puids, tasks_inplace, memory_allocation)
191 : {
192 112 : }
193 :
194 2 : Sequence::Sequence(runtime::Task& first,
195 : runtime::Task& last,
196 : const size_t n_threads,
197 : const bool thread_pinning,
198 : const std::vector<size_t>& puids,
199 : const bool tasks_inplace,
200 2 : const bool memory_allocation)
201 2 : : Sequence({ &first }, { &last }, n_threads, thread_pinning, puids, tasks_inplace, memory_allocation)
202 : {
203 2 : }
204 :
205 112 : Sequence::Sequence(runtime::Task& first,
206 : const size_t n_threads,
207 : const bool thread_pinning,
208 : const std::vector<size_t>& puids,
209 : const bool tasks_inplace,
210 112 : const bool memory_allocation)
211 112 : : Sequence({ &first }, n_threads, thread_pinning, puids, tasks_inplace, memory_allocation)
212 : {
213 112 : }
214 : //=======================================New pinning version constructors===============================================
215 0 : Sequence::Sequence(const std::vector<const runtime::Task*>& firsts,
216 : const std::vector<const runtime::Task*>& lasts,
217 : const std::vector<const runtime::Task*>& exclusions,
218 : const size_t n_threads,
219 : const bool thread_pinning,
220 : const std::string& sequence_pinning_policy,
221 0 : const bool memory_allocation)
222 0 : : n_threads(n_threads)
223 0 : , sequences(n_threads, nullptr)
224 0 : , modules(n_threads)
225 0 : , all_modules(n_threads)
226 0 : , mtx_exception(new std::mutex())
227 0 : , force_exit_loop(new std::atomic<bool>(false))
228 0 : , tasks_inplace(false)
229 0 : , thread_pinning(thread_pinning)
230 0 : , puids({})
231 0 : , no_copy_mode(true)
232 0 : , saved_exclusions(exclusions)
233 0 : , switchers_reset(n_threads)
234 0 : , auto_stop(true)
235 0 : , is_part_of_pipeline(false)
236 0 : , next_round_is_over(n_threads, false)
237 0 : , cur_task_id(n_threads, 0)
238 0 : , cur_ss(n_threads, nullptr)
239 0 : , memory_allocation(memory_allocation)
240 : {
241 : #ifndef SPU_HWLOC
242 : if (thread_pinning)
243 : std::clog << rang::tag::warning
244 : << "StreamPU has not been linked with the 'hwloc' library, the 'thread_pinning' "
245 : "option of the 'runtime::Sequence' will have no effect."
246 : << std::endl;
247 : #endif
248 :
249 0 : if (thread_pinning && !sequence_pinning_policy.empty())
250 : {
251 0 : pin_objects_per_thread = tools::Thread_pinning_utils::stage_parser_unpacker(sequence_pinning_policy, n_threads);
252 : }
253 0 : else if (thread_pinning && sequence_pinning_policy.empty())
254 : {
255 0 : std::stringstream message;
256 0 : message << "Pinning is activated but there is no specified policy." << std::endl;
257 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
258 0 : }
259 :
260 0 : this->init<runtime::Sub_sequence_const, const runtime::Task>(firsts, lasts, exclusions);
261 0 : }
262 :
263 0 : Sequence::Sequence(const std::vector<const runtime::Task*>& firsts,
264 : const std::vector<const runtime::Task*>& lasts,
265 : const size_t n_threads,
266 : const bool thread_pinning,
267 : const std::string& sequence_pinning_policy,
268 0 : const bool memory_allocation)
269 0 : : Sequence(firsts, lasts, {}, n_threads, thread_pinning, sequence_pinning_policy, memory_allocation)
270 : {
271 0 : }
272 :
273 0 : Sequence::Sequence(const std::vector<const runtime::Task*>& firsts,
274 : const size_t n_threads,
275 : const bool thread_pinning,
276 : const std::string& sequence_pinning_policy,
277 0 : const bool memory_allocation)
278 0 : : Sequence(firsts, {}, {}, n_threads, thread_pinning, sequence_pinning_policy, memory_allocation)
279 : {
280 0 : }
281 :
282 0 : Sequence::Sequence(const runtime::Task& first,
283 : const runtime::Task& last,
284 : const size_t n_threads,
285 : const bool thread_pinning,
286 : const std::string& sequence_pinning_policy,
287 0 : const bool memory_allocation)
288 0 : : Sequence({ &first }, { &last }, n_threads, thread_pinning, sequence_pinning_policy, memory_allocation)
289 : {
290 0 : }
291 :
292 0 : Sequence::Sequence(const runtime::Task& first,
293 : const size_t n_threads,
294 : const bool thread_pinning,
295 : const std::string& sequence_pinning_policy,
296 0 : const bool memory_allocation)
297 0 : : Sequence({ &first }, n_threads, thread_pinning, sequence_pinning_policy, memory_allocation)
298 : {
299 0 : }
300 :
301 65 : Sequence::Sequence(const std::vector<runtime::Task*>& firsts,
302 : const std::vector<runtime::Task*>& lasts,
303 : const std::vector<runtime::Task*>& exclusions,
304 : const size_t n_threads,
305 : const bool thread_pinning,
306 : const std::string& sequence_pinning_policy,
307 : const bool tasks_inplace,
308 65 : const bool memory_allocation)
309 65 : : n_threads(n_threads)
310 65 : , sequences(n_threads, nullptr)
311 65 : , modules(tasks_inplace ? n_threads - 1 : n_threads)
312 65 : , all_modules(n_threads)
313 65 : , mtx_exception(new std::mutex())
314 65 : , force_exit_loop(new std::atomic<bool>(false))
315 65 : , tasks_inplace(tasks_inplace)
316 65 : , thread_pinning(thread_pinning)
317 65 : , puids({})
318 65 : , no_copy_mode(true)
319 65 : , saved_exclusions(exclusions_convert_to_const(exclusions))
320 65 : , switchers_reset(n_threads)
321 65 : , auto_stop(true)
322 65 : , is_part_of_pipeline(false)
323 65 : , next_round_is_over(n_threads, false)
324 65 : , cur_task_id(n_threads, 0)
325 65 : , cur_ss(n_threads, nullptr)
326 325 : , memory_allocation(memory_allocation)
327 : {
328 65 : if (thread_pinning && !sequence_pinning_policy.empty())
329 : {
330 29 : pin_objects_per_thread = tools::Thread_pinning_utils::stage_parser_unpacker(sequence_pinning_policy, n_threads);
331 : }
332 36 : else if (thread_pinning && sequence_pinning_policy.empty())
333 : {
334 0 : std::stringstream message;
335 0 : message << "Pinning is activated but there is no specified policy." << std::endl;
336 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
337 0 : }
338 :
339 65 : if (tasks_inplace)
340 65 : this->init<runtime::Sub_sequence, runtime::Task>(firsts, lasts, exclusions);
341 : else
342 : {
343 0 : std::vector<const runtime::Task*> firsts_bis;
344 0 : for (auto first : firsts)
345 0 : firsts_bis.push_back(first);
346 0 : std::vector<const runtime::Task*> lasts_bis;
347 :
348 0 : for (auto last : lasts)
349 0 : lasts_bis.push_back(last);
350 0 : std::vector<const runtime::Task*> exclusions_bis;
351 :
352 0 : for (auto exception : exclusions)
353 0 : exclusions_bis.push_back(exception);
354 0 : this->init<runtime::Sub_sequence_const, const runtime::Task>(firsts_bis, lasts_bis, exclusions_bis);
355 0 : }
356 65 : }
357 :
358 0 : Sequence::Sequence(const std::vector<runtime::Task*>& firsts,
359 : const std::vector<runtime::Task*>& lasts,
360 : const size_t n_threads,
361 : const bool thread_pinning,
362 : const std::string& sequence_pinning_policy,
363 : const bool tasks_inplace,
364 0 : const bool memory_allocation)
365 0 : : Sequence(firsts, lasts, {}, n_threads, thread_pinning, sequence_pinning_policy, tasks_inplace, memory_allocation)
366 : {
367 0 : }
368 :
369 0 : Sequence::Sequence(const std::vector<runtime::Task*>& firsts,
370 : const size_t n_threads,
371 : const bool thread_pinning,
372 : const std::string& sequence_pinning_policy,
373 : const bool tasks_inplace,
374 0 : const bool memory_allocation)
375 0 : : Sequence(firsts, {}, {}, n_threads, thread_pinning, sequence_pinning_policy, tasks_inplace, memory_allocation)
376 : {
377 0 : }
378 :
379 0 : Sequence::Sequence(runtime::Task& first,
380 : runtime::Task& last,
381 : const size_t n_threads,
382 : const bool thread_pinning,
383 : const std::string& sequence_pinning_policy,
384 : const bool tasks_inplace,
385 0 : const bool memory_allocation)
386 : : Sequence({ &first },
387 : { &last },
388 : n_threads,
389 : thread_pinning,
390 : sequence_pinning_policy,
391 : tasks_inplace,
392 0 : memory_allocation)
393 : {
394 0 : }
395 :
396 0 : Sequence::Sequence(runtime::Task& first,
397 : const size_t n_threads,
398 : const bool thread_pinning,
399 : const std::string& sequence_pinning_policy,
400 : const bool tasks_inplace,
401 0 : const bool memory_allocation)
402 0 : : Sequence({ &first }, n_threads, thread_pinning, sequence_pinning_policy, tasks_inplace, memory_allocation)
403 : {
404 0 : }
405 :
406 : // ====================================================================================================================
407 :
408 1900 : Sequence::~Sequence()
409 : {
410 : // The following line is commented because it leads to double free correction in AFF3CT because the tasks can be
411 : // freed before the sequence :-(
412 : // if (this->memory_allocation) tools::Buffer_allocator::deallocate_sequence_memory(this);
413 :
414 687 : std::vector<tools::Digraph_node<Sub_sequence>*> already_deleted_nodes;
415 6782 : for (auto s : this->sequences)
416 6095 : this->delete_tree(s, already_deleted_nodes);
417 1213 : }
418 :
419 : template<class SS, class TA>
420 : void
421 687 : Sequence::init(const std::vector<TA*>& firsts, const std::vector<TA*>& lasts, const std::vector<TA*>& exclusions)
422 : {
423 687 : if (this->is_thread_pinning())
424 : {
425 29 : if (!this->puids.empty())
426 0 : tools::Thread_pinning::pin(this->puids[0]);
427 : else
428 29 : tools::Thread_pinning::pin(this->pin_objects_per_thread[0]);
429 : }
430 :
431 687 : if (firsts.size() == 0)
432 : {
433 0 : std::stringstream message;
434 0 : message << "'firsts.size()' has to be strictly greater than 0.";
435 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
436 0 : }
437 :
438 687 : if (this->n_threads == 0)
439 : {
440 0 : std::stringstream message;
441 0 : message << "'n_threads' has to be strictly greater than 0.";
442 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
443 0 : }
444 :
445 733 : for (auto exclusion : exclusions)
446 : {
447 46 : if (std::find(firsts.begin(), firsts.end(), exclusion) != firsts.end())
448 : {
449 0 : std::stringstream message;
450 : message << "'exclusion' can't be contained in the 'firsts' vector ("
451 : << "'exclusion'"
452 0 : << " = " << +exclusion << ", "
453 : << "'exclusion->get_name()'"
454 0 : << " = " << exclusion->get_name() << ").";
455 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
456 0 : }
457 :
458 46 : if (std::find(lasts.begin(), lasts.end(), exclusion) != lasts.end())
459 : {
460 0 : std::stringstream message;
461 : message << "'exclusion' can't be contained in the 'lasts' vector ("
462 : << "'exclusion'"
463 0 : << " = " << +exclusion << ", "
464 : << "'exclusion->get_name()'"
465 0 : << " = " << exclusion->get_name() << ").";
466 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
467 0 : }
468 : }
469 1205 : for (auto t : lasts)
470 : {
471 518 : if (dynamic_cast<const module::Switcher*>(&t->get_module()) && t->get_name() == "commute")
472 : {
473 0 : std::stringstream message;
474 0 : message << "A sequence cannot end with a 'commute' task.";
475 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
476 0 : }
477 : }
478 :
479 687 : auto root = new tools::Digraph_node<SS>({}, {}, nullptr, 0);
480 687 : root->set_contents(nullptr);
481 687 : size_t ssid = 0, taid = 0;
482 687 : std::vector<TA*> switchers;
483 687 : std::vector<std::pair<TA*, tools::Digraph_node<SS>*>> selectors;
484 687 : std::vector<TA*> real_lasts;
485 :
486 687 : this->lasts_tasks_id.clear();
487 687 : this->firsts_tasks_id.clear();
488 687 : auto last_subseq = root;
489 687 : std::map<TA*, unsigned> in_sockets_feed;
490 1540 : for (auto first : firsts)
491 : {
492 853 : std::map<TA*, std::pair<tools::Digraph_node<SS>*, size_t>> task_subseq;
493 853 : auto contents = last_subseq->get_contents();
494 853 : this->firsts_tasks_id.push_back(contents ? contents->tasks_id[contents->tasks_id.size() - 1] : 0);
495 853 : last_subseq = this->init_recursive<SS, TA>(last_subseq,
496 : ssid,
497 : taid,
498 : selectors,
499 : switchers,
500 : *first,
501 : *first,
502 : lasts,
503 : exclusions,
504 853 : this->lasts_tasks_id,
505 : real_lasts,
506 : in_sockets_feed,
507 : task_subseq);
508 : }
509 :
510 687 : std::stringstream real_lasts_ss;
511 1769 : for (size_t rl = 0; rl < real_lasts.size(); rl++)
512 : real_lasts_ss << "'real_lasts"
513 1082 : << "[" << rl << "]'"
514 1082 : << " = " << +real_lasts[rl] << ", "
515 : << "'real_lasts"
516 1082 : << "[" << rl << "]->get_name()'"
517 1082 : << " = " << real_lasts[rl]->get_name() << ((rl < real_lasts.size() - 1) ? ", " : "");
518 :
519 1205 : for (auto last : lasts)
520 : {
521 518 : if (std::find(real_lasts.begin(), real_lasts.end(), last) == real_lasts.end())
522 : {
523 0 : std::stringstream message;
524 0 : message << "'last' is not contained in the 'real_lasts[" << real_lasts.size() << "]' vector ("
525 : << "'last'"
526 0 : << " = " << +last << ", "
527 : << "'last->get_name()'"
528 0 : << " = " << last->get_name() << ", " << real_lasts_ss.str() << ").";
529 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
530 0 : }
531 : }
532 :
533 687 : this->n_tasks = taid;
534 : // this->check_ctrl_flow(root); // /!\ this check has been commented because it is known to do not work in the
535 : // general case
536 687 : this->_init<SS>(root);
537 687 : this->update_firsts_and_lasts_tasks();
538 687 : this->gen_processes();
539 687 : this->donners = get_modules<tools::Interface_is_done>(true);
540 :
541 6782 : for (size_t tid = 0; tid < this->n_threads; tid++)
542 47451 : for (auto& mdl : this->all_modules[tid])
543 41356 : if (auto swi = dynamic_cast<module::Switcher*>(mdl))
544 2323 : this->switchers_reset[tid].push_back(dynamic_cast<tools::Interface_reset*>(swi));
545 :
546 6782 : for (size_t tid = 0; tid < this->sequences.size(); tid++)
547 : {
548 6095 : this->cur_ss[tid] = this->sequences[tid];
549 : }
550 :
551 : // Allocating Memory if requested
552 687 : if (this->memory_allocation)
553 : {
554 314 : tools::Buffer_allocator::allocate_sequence_memory(this);
555 : }
556 :
557 687 : this->thread_pool.reset(new tools::Thread_pool_standard(this->n_threads - 1));
558 687 : this->thread_pool->init(); // threads are spawned here
559 687 : }
560 :
561 : Sequence*
562 96 : Sequence::clone() const
563 : {
564 96 : auto c = new Sequence(*this);
565 :
566 96 : c->tasks_inplace = false;
567 96 : c->modules.resize(c->get_n_threads());
568 :
569 96 : std::vector<const runtime::Task*> firsts_tasks;
570 192 : for (auto ta : this->get_firsts_tasks()[0])
571 96 : firsts_tasks.push_back(ta);
572 :
573 96 : std::vector<const runtime::Task*> lasts_tasks;
574 192 : for (auto ta : this->get_lasts_tasks()[0])
575 96 : lasts_tasks.push_back(ta);
576 :
577 96 : c->init<runtime::Sub_sequence_const, const runtime::Task>(firsts_tasks, lasts_tasks, this->saved_exclusions);
578 96 : c->mtx_exception.reset(new std::mutex());
579 96 : c->force_exit_loop.reset(new std::atomic<bool>(false));
580 96 : return c;
581 96 : }
582 :
583 : void
584 0 : Sequence::set_thread_pinning(const bool thread_pinning, const std::vector<size_t>& puids)
585 : {
586 0 : if (thread_pinning && puids.size() < n_threads)
587 : {
588 0 : std::stringstream message;
589 0 : message << "'puids.size()' has greater or equal to 'n_threads' ('puids.size()' = " << puids.size()
590 0 : << " , 'n_threads' = " << n_threads << ").";
591 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
592 0 : }
593 :
594 0 : this->thread_pinning = thread_pinning;
595 0 : this->puids = puids;
596 0 : this->pin_objects_per_thread = {};
597 0 : }
598 :
599 : void
600 0 : Sequence::set_thread_pinning(const bool thread_pinning, const std::string& sequence_pinning_policy)
601 : {
602 0 : this->thread_pinning = thread_pinning;
603 0 : this->puids = {};
604 : this->pin_objects_per_thread =
605 0 : tools::Thread_pinning_utils::stage_parser_unpacker(sequence_pinning_policy, n_threads);
606 0 : }
607 :
608 : bool
609 63179 : Sequence::is_thread_pinning()
610 : {
611 63179 : return this->thread_pinning;
612 : }
613 :
614 : std::vector<std::vector<module::Module*>>
615 0 : Sequence::get_modules_per_threads() const
616 : {
617 0 : std::vector<std::vector<module::Module*>> modules_per_threads(this->all_modules.size());
618 0 : size_t tid = 0;
619 0 : for (auto& e : this->all_modules)
620 : {
621 0 : for (auto& ee : e)
622 0 : modules_per_threads[tid].push_back(ee);
623 0 : tid++;
624 : }
625 0 : return modules_per_threads;
626 0 : }
627 :
628 : std::vector<std::vector<module::Module*>>
629 2 : Sequence::get_modules_per_types() const
630 : {
631 2 : std::vector<std::vector<module::Module*>> modules_per_types(this->all_modules[0].size());
632 4 : for (auto& e : this->all_modules)
633 : {
634 2 : size_t mid = 0;
635 33 : for (auto& ee : e)
636 31 : modules_per_types[mid++].push_back(ee);
637 : }
638 2 : return modules_per_types;
639 0 : }
640 :
641 : std::vector<std::vector<runtime::Task*>>
642 5818 : Sequence::get_tasks_per_threads() const
643 : {
644 5818 : std::vector<std::vector<runtime::Task*>> tasks_per_threads(this->n_threads);
645 :
646 : std::function<void(
647 : tools::Digraph_node<Sub_sequence>*, const size_t, std::vector<tools::Digraph_node<Sub_sequence>*>&)>
648 200605 : get_tasks_recursive = [&](tools::Digraph_node<Sub_sequence>* cur_ss,
649 : const size_t tid,
650 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
651 : {
652 200605 : if (std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_ss) == already_parsed_nodes.end())
653 : {
654 191641 : already_parsed_nodes.push_back(cur_ss);
655 383291 : tasks_per_threads[tid].insert(
656 191646 : tasks_per_threads[tid].end(), cur_ss->get_c()->tasks.begin(), cur_ss->get_c()->tasks.end());
657 :
658 233930 : for (auto c : cur_ss->get_children())
659 42288 : get_tasks_recursive(c, tid, already_parsed_nodes);
660 : }
661 206423 : };
662 :
663 164140 : for (size_t tid = 0; tid < this->n_threads; tid++)
664 : {
665 158319 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
666 158317 : get_tasks_recursive(this->sequences[tid], tid, already_parsed_nodes);
667 158318 : }
668 :
669 11643 : return tasks_per_threads;
670 5821 : }
671 :
672 : std::vector<std::vector<runtime::Task*>>
673 29 : Sequence::get_tasks_per_types() const
674 : {
675 29 : std::vector<std::vector<runtime::Task*>> tasks_per_types(this->n_tasks);
676 :
677 : std::function<void(tools::Digraph_node<Sub_sequence>*, size_t&, std::vector<tools::Digraph_node<Sub_sequence>*>&)>
678 187 : get_tasks_recursive = [&](tools::Digraph_node<Sub_sequence>* cur_ss,
679 : size_t& mid,
680 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
681 : {
682 187 : if (std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_ss) == already_parsed_nodes.end())
683 : {
684 187 : already_parsed_nodes.push_back(cur_ss);
685 1615 : for (auto& t : cur_ss->get_c()->tasks)
686 1428 : tasks_per_types[mid++].push_back(t);
687 :
688 187 : for (auto c : cur_ss->get_children())
689 0 : get_tasks_recursive(c, mid, already_parsed_nodes);
690 : }
691 216 : };
692 :
693 216 : for (size_t tid = 0; tid < this->n_threads; tid++)
694 : {
695 187 : size_t mid = 0;
696 187 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
697 187 : get_tasks_recursive(this->sequences[tid], mid, already_parsed_nodes);
698 187 : }
699 :
700 58 : return tasks_per_types;
701 29 : }
702 :
703 : bool
704 901129 : Sequence::is_done() const
705 : {
706 912818 : for (auto donner : this->donners)
707 11750 : if (donner->is_done()) return true;
708 898022 : return false;
709 : }
710 :
711 : void
712 373 : Sequence::allocate_outbuffers()
713 : {
714 373 : if (!this->memory_allocation)
715 : {
716 373 : tools::Buffer_allocator::allocate_sequence_memory(this);
717 373 : this->memory_allocation = true;
718 : }
719 : else
720 : {
721 0 : std::stringstream message;
722 0 : message << "The memory is already allocated for this sequence";
723 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
724 0 : }
725 373 : }
726 :
727 : void
728 373 : Sequence::deallocate_outbuffers()
729 : {
730 373 : if (this->memory_allocation)
731 : {
732 373 : tools::Buffer_allocator::deallocate_sequence_memory(this);
733 373 : this->memory_allocation = false;
734 : }
735 : else
736 : {
737 0 : std::stringstream message;
738 0 : message << "Sequence memory is not allocated";
739 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
740 0 : }
741 373 : }
742 :
743 : void
744 0 : Sequence::_exec(const size_t tid,
745 : std::function<bool(const std::vector<const int*>&)>& stop_condition,
746 : tools::Digraph_node<Sub_sequence>* sequence)
747 : {
748 0 : tools::Signal_handler::reset_sigint();
749 :
750 0 : if (this->is_thread_pinning())
751 : {
752 0 : if (!puids.empty())
753 0 : tools::Thread_pinning::pin(this->puids[tid]);
754 : else
755 0 : tools::Thread_pinning::pin(this->pin_objects_per_thread[tid]);
756 : }
757 :
758 : std::function<void(tools::Digraph_node<Sub_sequence>*, std::vector<const int*>&)> exec_sequence =
759 0 : [&exec_sequence](tools::Digraph_node<Sub_sequence>* cur_ss, std::vector<const int*>& statuses)
760 : {
761 0 : auto type = cur_ss->get_c()->type;
762 0 : auto& tasks_id = cur_ss->get_c()->tasks_id;
763 0 : auto& processes = cur_ss->get_c()->processes;
764 :
765 0 : if (type == subseq_t::COMMUTE)
766 : {
767 0 : statuses[tasks_id[0]] = processes[0]();
768 0 : const int path = statuses[tasks_id[0]][0];
769 0 : if (cur_ss->get_children().size() > (size_t)path) exec_sequence(cur_ss->get_children()[path], statuses);
770 : }
771 : else
772 : {
773 0 : for (size_t p = 0; p < processes.size(); p++)
774 0 : statuses[tasks_id[p]] = processes[p]();
775 0 : for (auto c : cur_ss->get_children())
776 0 : exec_sequence(c, statuses);
777 : }
778 0 : };
779 :
780 0 : std::vector<const int*> statuses(this->n_tasks, nullptr);
781 : try
782 : {
783 : do
784 : {
785 : // force switchers reset to reinitialize the path to the last input socket
786 0 : for (size_t s = 0; s < this->switchers_reset[tid].size(); s++)
787 0 : this->switchers_reset[tid][s]->reset();
788 :
789 0 : std::fill(statuses.begin(), statuses.end(), nullptr);
790 : try
791 : {
792 0 : exec_sequence(sequence, statuses);
793 : }
794 0 : catch (tools::processing_aborted const&)
795 : {
796 : // do nothing, this is normal
797 0 : }
798 0 : } while (!*force_exit_loop && !stop_condition(statuses) && !tools::Signal_handler::is_sigint());
799 :
800 0 : if (tools::Signal_handler::is_sigint())
801 0 : for (auto& m : this->get_modules<tools::Interface_waiting>())
802 0 : m->cancel_waiting();
803 : }
804 0 : catch (tools::waiting_canceled const&)
805 : {
806 : // do nothing, this is normal
807 0 : }
808 0 : catch (std::exception const& e)
809 : {
810 0 : *force_exit_loop = true;
811 :
812 0 : this->mtx_exception->lock();
813 :
814 0 : auto save = tools::exception::no_stacktrace;
815 0 : tools::exception::no_stacktrace = true;
816 0 : std::string msg = e.what(); // get only the function signature
817 0 : tools::exception::no_stacktrace = save;
818 :
819 0 : if (std::find(this->prev_exception_messages.begin(), this->prev_exception_messages.end(), msg) ==
820 0 : this->prev_exception_messages.end())
821 : {
822 0 : this->prev_exception_messages.push_back(msg); // save only the function signature
823 0 : this->prev_exception_messages_to_display.push_back(e.what()); // with stacktrace if debug mode
824 : }
825 0 : else if (std::strlen(e.what()) > this->prev_exception_messages_to_display.back().size())
826 0 : this->prev_exception_messages_to_display[prev_exception_messages_to_display.size() - 1] = e.what();
827 :
828 0 : this->mtx_exception->unlock();
829 0 : }
830 :
831 0 : if (this->is_thread_pinning()) tools::Thread_pinning::unpin();
832 0 : }
833 :
834 : void
835 3412 : Sequence::_exec_without_statuses(const size_t tid,
836 : std::function<bool()>& stop_condition,
837 : tools::Digraph_node<Sub_sequence>* sequence)
838 : {
839 3412 : tools::Signal_handler::reset_sigint();
840 :
841 3455 : if (this->is_thread_pinning())
842 : {
843 40 : if (!puids.empty())
844 0 : tools::Thread_pinning::pin(this->puids[tid]);
845 : else
846 40 : tools::Thread_pinning::pin(this->pin_objects_per_thread[tid]);
847 : }
848 :
849 : std::function<void(tools::Digraph_node<Sub_sequence>*)> exec_sequence =
850 1327530 : [&exec_sequence](tools::Digraph_node<Sub_sequence>* cur_ss)
851 : {
852 1111665 : auto type = cur_ss->get_c()->type;
853 1113206 : auto& processes = cur_ss->get_c()->processes;
854 :
855 1116177 : if (type == subseq_t::COMMUTE)
856 : {
857 60812 : const int path = processes[0]()[0];
858 60835 : if (cur_ss->get_children().size() > (size_t)path) exec_sequence(cur_ss->get_children()[path]);
859 : }
860 : else
861 : {
862 4514416 : for (auto& process : processes)
863 3496721 : process();
864 947090 : for (auto c : cur_ss->get_children())
865 155017 : exec_sequence(c);
866 : }
867 1112663 : };
868 :
869 : try
870 : {
871 : do
872 : {
873 : // force switchers reset to reinitialize the path to the last input socket
874 917357 : for (size_t s = 0; s < this->switchers_reset[tid].size(); s++)
875 14900 : this->switchers_reset[tid][s]->reset();
876 :
877 : try
878 : {
879 902429 : exec_sequence(sequence);
880 : }
881 3028 : catch (tools::processing_aborted const&)
882 : {
883 : // do nothing, this is normal
884 5 : }
885 903473 : } while (!*force_exit_loop && !stop_condition() && !tools::Signal_handler::is_sigint());
886 :
887 2504 : if (tools::Signal_handler::is_sigint())
888 0 : for (auto& m : this->get_modules<tools::Interface_waiting>())
889 0 : m->cancel_waiting();
890 : }
891 3007 : catch (tools::waiting_canceled const&)
892 : {
893 : // do nothing, this is normal
894 3021 : }
895 0 : catch (std::exception const& e)
896 : {
897 0 : *force_exit_loop = true;
898 :
899 0 : this->mtx_exception->lock();
900 :
901 0 : auto save = tools::exception::no_stacktrace;
902 0 : tools::exception::no_stacktrace = true;
903 0 : std::string msg = e.what(); // get only the function signature
904 0 : tools::exception::no_stacktrace = save;
905 :
906 0 : if (std::find(this->prev_exception_messages.begin(), this->prev_exception_messages.end(), msg) ==
907 0 : this->prev_exception_messages.end())
908 : {
909 0 : this->prev_exception_messages.push_back(msg); // save only the function signature
910 0 : this->prev_exception_messages_to_display.push_back(e.what()); // with stacktrace if debug mode
911 : }
912 0 : else if (std::strlen(e.what()) > this->prev_exception_messages_to_display.back().size())
913 0 : this->prev_exception_messages_to_display[prev_exception_messages_to_display.size() - 1] = e.what();
914 :
915 0 : this->mtx_exception->unlock();
916 0 : }
917 :
918 4919 : if (this->is_thread_pinning()) tools::Thread_pinning::unpin();
919 4977 : }
920 :
921 : void
922 0 : Sequence::exec(std::function<bool(const std::vector<const int*>&)> stop_condition)
923 : {
924 0 : if (this->is_no_copy_mode()) this->gen_processes(true);
925 :
926 0 : std::function<bool(const std::vector<const int*>&)> real_stop_condition;
927 0 : if (this->auto_stop)
928 0 : real_stop_condition = [this, stop_condition](const std::vector<const int*>& statuses)
929 : {
930 0 : bool res = stop_condition(statuses);
931 0 : return res || this->is_done();
932 0 : };
933 : else
934 0 : real_stop_condition = stop_condition;
935 :
936 0 : std::function<void(const size_t)> func_exec = [this, &real_stop_condition](const size_t tid)
937 0 : { this->Sequence::_exec(tid + 1, real_stop_condition, this->sequences[tid + 1]); };
938 :
939 0 : this->thread_pool->run(func_exec, true);
940 0 : this->_exec(0, real_stop_condition, this->sequences[0]);
941 0 : this->thread_pool->wait();
942 :
943 0 : this->thread_pool->unset_func_exec();
944 :
945 0 : if (this->is_no_copy_mode() && !this->is_part_of_pipeline)
946 : {
947 0 : this->reset_no_copy_mode();
948 0 : this->gen_processes(false);
949 : }
950 :
951 0 : if (!this->prev_exception_messages_to_display.empty())
952 : {
953 0 : *force_exit_loop = false;
954 0 : throw std::runtime_error(this->prev_exception_messages_to_display.back());
955 : }
956 0 : }
957 :
958 : void
959 426 : Sequence::exec(std::function<bool()> stop_condition)
960 : {
961 426 : if (this->is_no_copy_mode()) this->gen_processes(true);
962 :
963 460 : std::function<bool()> real_stop_condition;
964 460 : if (this->auto_stop)
965 1793065 : real_stop_condition = [this, stop_condition]()
966 : {
967 892857 : bool res = stop_condition();
968 902423 : return res || this->is_done();
969 449 : };
970 : else
971 11 : real_stop_condition = stop_condition;
972 :
973 5401 : std::function<void(const size_t)> func_exec = [this, &real_stop_condition](const size_t tid)
974 3013 : { this->Sequence::_exec_without_statuses(tid + 1, real_stop_condition, this->sequences[tid + 1]); };
975 :
976 460 : this->thread_pool->run(func_exec, true);
977 460 : this->_exec_without_statuses(0, real_stop_condition, this->sequences[0]);
978 447 : this->thread_pool->wait();
979 :
980 456 : this->thread_pool->unset_func_exec();
981 :
982 454 : if (this->is_no_copy_mode() && !this->is_part_of_pipeline)
983 : {
984 79 : this->reset_no_copy_mode();
985 79 : this->gen_processes(false);
986 : }
987 :
988 455 : if (!this->prev_exception_messages_to_display.empty())
989 : {
990 0 : *force_exit_loop = false;
991 0 : throw std::runtime_error(this->prev_exception_messages_to_display.back());
992 : }
993 453 : }
994 :
995 : void
996 229 : Sequence::exec()
997 : {
998 582614 : this->exec([]() { return false; });
999 270 : }
1000 :
1001 : void
1002 102 : Sequence::exec_seq(const size_t tid, const int frame_id)
1003 : {
1004 102 : if (tid >= this->sequences.size())
1005 : {
1006 0 : std::stringstream message;
1007 0 : message << "'tid' has to be smaller than 'sequences.size()' ('tid' = " << tid
1008 0 : << ", 'sequences.size()' = " << this->sequences.size() << ").";
1009 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
1010 0 : }
1011 :
1012 : std::function<void(tools::Digraph_node<Sub_sequence>*)> exec_sequence =
1013 811 : [&exec_sequence, frame_id](tools::Digraph_node<Sub_sequence>* cur_ss)
1014 : {
1015 104 : auto type = cur_ss->get_c()->type;
1016 105 : auto& tasks = cur_ss->get_c()->tasks;
1017 103 : if (type == subseq_t::COMMUTE)
1018 : {
1019 0 : const int path = tasks[0]->exec(frame_id)[0];
1020 0 : if (cur_ss->get_children().size() > (size_t)path) exec_sequence(cur_ss->get_children()[path]);
1021 : }
1022 : else
1023 : {
1024 828 : for (size_t ta = 0; ta < tasks.size(); ta++)
1025 708 : tasks[ta]->exec(frame_id);
1026 121 : for (auto c : cur_ss->get_children())
1027 0 : exec_sequence(c);
1028 : }
1029 227 : };
1030 :
1031 108 : exec_sequence(this->sequences[tid]);
1032 123 : }
1033 :
1034 : runtime::Task*
1035 122723 : Sequence::exec_step(const size_t tid, const int frame_id)
1036 : {
1037 122723 : if (tid >= this->sequences.size())
1038 : {
1039 0 : std::stringstream message;
1040 0 : message << "'tid' has to be smaller than 'sequences.size()' ('tid' = " << tid
1041 0 : << ", 'sequences.size()' = " << this->sequences.size() << ").";
1042 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
1043 0 : }
1044 :
1045 122723 : runtime::Task* executed_task = nullptr;
1046 122723 : if (this->next_round_is_over[tid])
1047 : {
1048 6109 : this->next_round_is_over[tid] = false;
1049 6109 : this->cur_ss[tid] = this->sequences[tid];
1050 6109 : this->cur_task_id[tid] = 0;
1051 : }
1052 : else
1053 : {
1054 116614 : executed_task = this->cur_ss[tid]->get_c()->tasks[cur_task_id[tid]];
1055 116614 : const std::vector<int>& ret = executed_task->exec(frame_id);
1056 :
1057 116612 : auto type = this->cur_ss[tid]->get_c()->type;
1058 116612 : if (type == subseq_t::COMMUTE)
1059 : {
1060 15814 : const size_t path = (size_t)ret[0];
1061 15814 : if (this->cur_ss[tid]->get_children().size() > path)
1062 : {
1063 15814 : this->cur_ss[tid] = this->cur_ss[tid]->get_children()[path];
1064 15814 : this->cur_task_id[tid] = 0;
1065 :
1066 15814 : if (this->cur_ss[tid]->get_c()->tasks.size() == 0)
1067 : {
1068 : // skip nodes without tasks if any
1069 0 : while (this->cur_ss[tid]->get_children().size() > 0)
1070 : {
1071 0 : this->cur_ss[tid] = this->cur_ss[tid]->get_children()[0];
1072 0 : this->cur_task_id[tid] = 0;
1073 0 : if (this->cur_ss[tid]->get_c() && this->cur_ss[tid]->get_c()->tasks.size() > 0) break;
1074 : }
1075 0 : if (this->cur_task_id[tid] >= this->cur_ss[tid]->get_c()->tasks.size())
1076 0 : this->next_round_is_over[tid] = true;
1077 : }
1078 : }
1079 : else
1080 : {
1081 0 : std::stringstream message;
1082 0 : message << "This should never happen ('path' = " << path
1083 0 : << ", 'cur_ss[tid]->get_children().size()' = " << this->cur_ss[tid]->get_children().size()
1084 0 : << ", 'tid' = " << tid << ").";
1085 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
1086 0 : }
1087 : }
1088 : else
1089 : {
1090 100798 : this->cur_task_id[tid]++;
1091 100798 : if (this->cur_task_id[tid] == (this->cur_ss[tid]->get_c()->tasks.size()))
1092 : {
1093 : // skip nodes without tasks if any
1094 47695 : while (this->cur_ss[tid]->get_children().size() > 0)
1095 : {
1096 41586 : this->cur_ss[tid] = this->cur_ss[tid]->get_children()[0];
1097 41586 : this->cur_task_id[tid] = 0;
1098 41586 : if (this->cur_ss[tid]->get_c() && this->cur_ss[tid]->get_c()->tasks.size() > 0) break;
1099 : }
1100 47695 : if (this->cur_task_id[tid] >= this->cur_ss[tid]->get_c()->tasks.size())
1101 6109 : this->next_round_is_over[tid] = true;
1102 : }
1103 : }
1104 : }
1105 :
1106 122721 : return executed_task;
1107 : }
1108 :
1109 : template<class SS, class TA>
1110 : tools::Digraph_node<SS>*
1111 3661 : Sequence::init_recursive(tools::Digraph_node<SS>* cur_subseq,
1112 : size_t& ssid,
1113 : size_t& taid,
1114 : std::vector<std::pair<TA*, tools::Digraph_node<SS>*>>& selectors,
1115 : std::vector<TA*>& switchers,
1116 : TA& first,
1117 : TA& current_task,
1118 : const std::vector<TA*>& lasts,
1119 : const std::vector<TA*>& exclusions,
1120 : std::vector<size_t>& real_lasts_id,
1121 : std::vector<TA*>& real_lasts,
1122 : std::map<TA*, unsigned>& in_sockets_feed,
1123 : std::map<TA*, std::pair<tools::Digraph_node<SS>*, size_t>>& task_subseq)
1124 : {
1125 :
1126 3661 : if (dynamic_cast<const module::Adaptor_m_to_n*>(¤t_task.get_module()) && !this->tasks_inplace)
1127 : {
1128 0 : std::stringstream message;
1129 0 : message << "'module::Adaptor_m_to_n' objects are not supported if 'tasks_inplace' is set to false.";
1130 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
1131 0 : }
1132 :
1133 3661 : auto it = std::find(real_lasts.begin(), real_lasts.end(), ¤t_task);
1134 3661 : if (it != real_lasts.end())
1135 : {
1136 0 : real_lasts.erase(it);
1137 0 : auto dist = std::distance(real_lasts.begin(), it);
1138 0 : real_lasts_id.erase(real_lasts_id.begin() + dist);
1139 : }
1140 :
1141 3661 : if (cur_subseq->get_contents() == nullptr)
1142 : {
1143 687 : cur_subseq->set_contents(new SS());
1144 687 : ssid++;
1145 : }
1146 :
1147 3661 : bool is_last = true;
1148 3661 : tools::Digraph_node<SS>* last_subseq = nullptr;
1149 :
1150 3661 : if (auto switcher = dynamic_cast<const module::Switcher*>(¤t_task.get_module()))
1151 : {
1152 168 : const auto current_task_name = current_task.get_name();
1153 252 : if (current_task_name == "commute" && // -------------------------------------------------------------- COMMUTE
1154 252 : std::find(switchers.begin(), switchers.end(), ¤t_task) == switchers.end())
1155 : {
1156 84 : switchers.push_back(¤t_task);
1157 84 : auto node_commute = new tools::Digraph_node<SS>({ cur_subseq }, {}, nullptr, cur_subseq->get_depth() + 1);
1158 :
1159 84 : node_commute->set_contents(new SS());
1160 84 : node_commute->get_c()->tasks.push_back(¤t_task);
1161 84 : node_commute->get_c()->tasks_id.push_back(taid++);
1162 84 : node_commute->get_c()->type = subseq_t::COMMUTE;
1163 84 : ssid++;
1164 :
1165 84 : cur_subseq->add_child(node_commute);
1166 :
1167 263 : for (size_t sdo = 0; sdo < switcher->get_n_data_sockets(); sdo++)
1168 : {
1169 358 : auto node_commute_son =
1170 179 : new tools::Digraph_node<SS>({ node_commute }, {}, nullptr, node_commute->get_depth() + 1);
1171 :
1172 179 : node_commute_son->set_contents(new SS());
1173 179 : ssid++;
1174 :
1175 179 : node_commute->add_child(node_commute_son);
1176 :
1177 179 : auto& bss = (*switcher)[module::swi::tsk::commute].sockets[sdo + 2]->get_bound_sockets();
1178 364 : for (auto bs : bss)
1179 : {
1180 185 : if (bs == nullptr) continue;
1181 185 : auto& t = bs->get_task();
1182 185 : if (std::find(exclusions.begin(), exclusions.end(), &t) == exclusions.end())
1183 : {
1184 185 : if (task_subseq.find(&t) == task_subseq.end() || task_subseq[&t].second < ssid)
1185 179 : task_subseq[&t] = { node_commute_son, ssid };
1186 :
1187 185 : in_sockets_feed.find(&t) != in_sockets_feed.end() ? in_sockets_feed[&t]++
1188 142 : : in_sockets_feed[&t] = 1;
1189 185 : bool t_is_select =
1190 185 : dynamic_cast<const module::Switcher*>(&(t.get_module())) && t.get_name() == "select";
1191 129 : if ((!t_is_select && in_sockets_feed[&t] >= (t.get_n_input_sockets() + t.get_n_fwd_sockets()) -
1192 370 : t.get_n_static_input_sockets()) ||
1193 56 : (t_is_select && t.is_last_input_socket(*bs)))
1194 : {
1195 148 : is_last = false;
1196 148 : last_subseq = Sequence::init_recursive<SS, TA>(task_subseq[&t].first,
1197 296 : task_subseq[&t].second,
1198 : taid,
1199 : selectors,
1200 : switchers,
1201 : first,
1202 : t,
1203 : lasts,
1204 : exclusions,
1205 : real_lasts_id,
1206 : real_lasts,
1207 : in_sockets_feed,
1208 : task_subseq);
1209 : }
1210 37 : else if (t_is_select)
1211 : {
1212 37 : tools::Digraph_node<SS>* node_selector = nullptr;
1213 43 : for (auto& sel : selectors)
1214 43 : if (sel.first == &t)
1215 : {
1216 37 : node_selector = sel.second;
1217 37 : break;
1218 : }
1219 :
1220 37 : if (!node_selector)
1221 : {
1222 0 : node_selector = new tools::Digraph_node<SS>(
1223 0 : { node_commute_son }, {}, nullptr, node_commute_son->get_depth() + 1);
1224 0 : selectors.push_back({ &t, node_selector });
1225 : }
1226 : else
1227 37 : node_selector->add_parent(node_commute_son);
1228 37 : node_commute_son->add_child(node_selector);
1229 : }
1230 : }
1231 : }
1232 : }
1233 : // exception for the status socket
1234 : auto& bss =
1235 84 : (*switcher)[module::swi::tsk::commute].sockets[switcher->get_n_data_sockets() + 2]->get_bound_sockets();
1236 84 : for (auto bs : bss)
1237 : {
1238 0 : if (bs == nullptr) continue;
1239 0 : auto& t = bs->get_task();
1240 0 : if (std::find(exclusions.begin(), exclusions.end(), &t) == exclusions.end())
1241 : {
1242 0 : if (task_subseq.find(&t) == task_subseq.end() || task_subseq[&t].second < ssid)
1243 0 : task_subseq[&t] = { node_commute, ssid };
1244 :
1245 0 : in_sockets_feed.find(&t) != in_sockets_feed.end() ? in_sockets_feed[&t]++ : in_sockets_feed[&t] = 1;
1246 0 : bool t_is_select =
1247 0 : dynamic_cast<const module::Switcher*>(&(t.get_module())) && t.get_name() == "select";
1248 0 : if ((!t_is_select && in_sockets_feed[&t] >= (t.get_n_input_sockets() + t.get_n_fwd_sockets()) -
1249 0 : t.get_n_static_input_sockets()) ||
1250 0 : (t_is_select && t.is_last_input_socket(*bs)))
1251 : {
1252 0 : is_last = false;
1253 0 : last_subseq = Sequence::init_recursive<SS, TA>(task_subseq[&t].first,
1254 0 : task_subseq[&t].second,
1255 : taid,
1256 : selectors,
1257 : switchers,
1258 : first,
1259 : t,
1260 : lasts,
1261 : exclusions,
1262 : real_lasts_id,
1263 : real_lasts,
1264 : in_sockets_feed,
1265 : task_subseq);
1266 : }
1267 : }
1268 : }
1269 : }
1270 84 : else if (current_task_name == "select") // ------------------------------------------------------------- SELECT
1271 : {
1272 84 : tools::Digraph_node<SS>* node_selector = nullptr;
1273 :
1274 109 : for (auto& sel : selectors)
1275 47 : if (sel.first == ¤t_task)
1276 : {
1277 22 : node_selector = sel.second;
1278 22 : break;
1279 : }
1280 :
1281 84 : if (!node_selector)
1282 : {
1283 62 : node_selector = new tools::Digraph_node<SS>({ cur_subseq }, {}, nullptr, cur_subseq->get_depth() + 1);
1284 62 : selectors.push_back({ ¤t_task, node_selector });
1285 : }
1286 :
1287 84 : node_selector->set_contents(new SS());
1288 84 : node_selector->get_c()->tasks.push_back(¤t_task);
1289 84 : node_selector->get_c()->tasks_id.push_back(taid++);
1290 84 : node_selector->get_c()->type = subseq_t::SELECT;
1291 84 : ssid++;
1292 :
1293 84 : cur_subseq->add_child(node_selector);
1294 :
1295 168 : auto node_selector_son =
1296 84 : new tools::Digraph_node<SS>({ node_selector }, {}, nullptr, node_selector->get_depth() + 1);
1297 :
1298 84 : node_selector_son->set_contents(new SS());
1299 84 : ssid++;
1300 :
1301 84 : node_selector->add_child(node_selector_son);
1302 :
1303 599 : for (auto& s : current_task.sockets)
1304 : {
1305 347 : if (!(s->get_type() == socket_t::SOUT)) continue;
1306 168 : auto bss = s->get_bound_sockets();
1307 308 : for (auto bs : bss)
1308 : {
1309 140 : if (bs == nullptr) continue;
1310 140 : auto& t = bs->get_task();
1311 140 : if (std::find(exclusions.begin(), exclusions.end(), &t) == exclusions.end())
1312 : {
1313 140 : if (task_subseq.find(&t) == task_subseq.end() || task_subseq[&t].second < ssid)
1314 140 : task_subseq[&t] = { node_selector_son, ssid };
1315 :
1316 140 : in_sockets_feed.find(&t) != in_sockets_feed.end() ? in_sockets_feed[&t]++
1317 140 : : in_sockets_feed[&t] = 1;
1318 140 : bool t_is_select =
1319 140 : dynamic_cast<const module::Switcher*>(&(t.get_module())) && t.get_name() == "select";
1320 :
1321 134 : if ((!t_is_select && in_sockets_feed[&t] >= (t.get_n_input_sockets() + t.get_n_fwd_sockets()) -
1322 280 : t.get_n_static_input_sockets()) ||
1323 6 : (t_is_select && t.is_last_input_socket(*bs)))
1324 : {
1325 96 : is_last = false;
1326 96 : last_subseq = Sequence::init_recursive<SS, TA>(task_subseq[&t].first,
1327 192 : task_subseq[&t].second,
1328 : taid,
1329 : selectors,
1330 : switchers,
1331 : first,
1332 : t,
1333 : lasts,
1334 : exclusions,
1335 : real_lasts_id,
1336 : real_lasts,
1337 : in_sockets_feed,
1338 : task_subseq);
1339 : }
1340 44 : else if (t_is_select)
1341 : {
1342 0 : tools::Digraph_node<SS>* node_selector = nullptr;
1343 0 : for (auto& sel : selectors)
1344 0 : if (sel.first == &t)
1345 : {
1346 0 : node_selector = sel.second;
1347 0 : break;
1348 : }
1349 :
1350 0 : if (!node_selector)
1351 : {
1352 0 : node_selector = new tools::Digraph_node<SS>(
1353 0 : { node_selector_son }, {}, nullptr, node_selector_son->get_depth() + 1);
1354 0 : selectors.push_back({ &t, node_selector });
1355 : }
1356 0 : node_selector->add_parent(node_selector_son);
1357 0 : node_selector_son->add_child(node_selector);
1358 : }
1359 : }
1360 : }
1361 : }
1362 : }
1363 168 : }
1364 : else // --------------------------------------------------------------------------------------------- STANDARD CASE
1365 : {
1366 3493 : cur_subseq->get_c()->tasks.push_back(¤t_task);
1367 3493 : cur_subseq->get_c()->tasks_id.push_back(taid++);
1368 :
1369 3493 : if (std::find(lasts.begin(), lasts.end(), ¤t_task) == lasts.end())
1370 : {
1371 10479 : for (auto& s : current_task.sockets)
1372 : {
1373 7504 : if (s->get_type() == socket_t::SOUT || s->get_type() == socket_t::SFWD)
1374 : {
1375 5479 : auto bss = s->get_bound_sockets();
1376 8486 : for (auto& bs : bss)
1377 : {
1378 3007 : if (bs == nullptr) continue;
1379 3007 : auto& t = bs->get_task();
1380 3007 : if (std::find(exclusions.begin(), exclusions.end(), &t) == exclusions.end())
1381 : {
1382 2979 : if (task_subseq.find(&t) == task_subseq.end() || task_subseq[&t].second < ssid)
1383 2679 : task_subseq[&t] = { cur_subseq, ssid };
1384 :
1385 2979 : in_sockets_feed.find(&t) != in_sockets_feed.end() ? in_sockets_feed[&t]++
1386 2621 : : in_sockets_feed[&t] = 1;
1387 2979 : bool t_is_select =
1388 2979 : dynamic_cast<const module::Switcher*>(&(t.get_module())) && t.get_name() == "select";
1389 8820 : if ((!t_is_select &&
1390 2862 : in_sockets_feed[&t] >= (t.get_n_input_sockets() + t.get_n_fwd_sockets()) -
1391 5958 : t.get_n_static_input_sockets()) ||
1392 117 : (t_is_select && t.is_last_input_socket(*bs)))
1393 : {
1394 2564 : is_last = false;
1395 2564 : last_subseq = Sequence::init_recursive<SS, TA>(task_subseq[&t].first,
1396 5128 : task_subseq[&t].second,
1397 : taid,
1398 : selectors,
1399 : switchers,
1400 : first,
1401 : t,
1402 : lasts,
1403 : exclusions,
1404 : real_lasts_id,
1405 : real_lasts,
1406 : in_sockets_feed,
1407 : task_subseq);
1408 : }
1409 415 : else if (t_is_select)
1410 : {
1411 58 : tools::Digraph_node<SS>* node_selector = nullptr;
1412 77 : for (auto& sel : selectors)
1413 55 : if (sel.first == &t)
1414 : {
1415 36 : node_selector = sel.second;
1416 36 : break;
1417 : }
1418 :
1419 58 : if (!node_selector)
1420 : {
1421 44 : node_selector = new tools::Digraph_node<SS>(
1422 22 : { cur_subseq }, {}, nullptr, cur_subseq->get_depth() + 1);
1423 22 : selectors.push_back({ &t, node_selector });
1424 : }
1425 58 : node_selector->add_parent(cur_subseq);
1426 58 : cur_subseq->add_child(node_selector);
1427 : }
1428 : }
1429 : }
1430 5479 : }
1431 2025 : else if (s->get_type() == socket_t::SIN && s->get_bound_sockets().size() > 1)
1432 : {
1433 0 : std::stringstream message;
1434 : message << "'s->get_bound_sockets().size()' has to be smaller or equal to 1 ("
1435 : << "'s->get_bound_sockets().size()'"
1436 0 : << " = " << s->get_bound_sockets().size() << ", "
1437 : << "'s->get_type()'"
1438 : << " = "
1439 : << "socket_t::SIN"
1440 : << ", "
1441 : << "'s->get_name()'"
1442 0 : << " = " << s->get_name() << ", "
1443 : << "'s->get_task().get_name()'"
1444 0 : << " = " << s->get_task().get_name() << ", "
1445 : << "'s->get_task().get_module().get_name()'"
1446 0 : << " = " << s->get_task().get_module().get_name() << ").";
1447 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
1448 0 : }
1449 : }
1450 : }
1451 : }
1452 :
1453 3661 : if (is_last && std::find(real_lasts.begin(), real_lasts.end(), ¤t_task) == real_lasts.end())
1454 : {
1455 1082 : real_lasts.push_back(¤t_task);
1456 1082 : real_lasts_id.push_back(cur_subseq->get_contents()->tasks_id.back());
1457 : }
1458 :
1459 3661 : if (last_subseq)
1460 2579 : return last_subseq;
1461 : else
1462 1082 : return cur_subseq;
1463 : }
1464 :
1465 : template<class SS, class MO>
1466 : void
1467 687 : Sequence::replicate(const tools::Digraph_node<SS>* sequence)
1468 : {
1469 687 : std::set<MO*> modules_set;
1470 687 : std::vector<const runtime::Task*> tsks_vec; // get a vector of tasks included in the tasks graph
1471 : std::function<void(const tools::Digraph_node<SS>*, std::vector<const tools::Digraph_node<SS>*>&)>
1472 687 : collect_modules_list;
1473 687 : collect_modules_list =
1474 1151 : [&](const tools::Digraph_node<SS>* node, std::vector<const tools::Digraph_node<SS>*>& already_parsed_nodes)
1475 : {
1476 2302 : if (node != nullptr &&
1477 2302 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), node) == already_parsed_nodes.end())
1478 : {
1479 1056 : already_parsed_nodes.push_back(node);
1480 1056 : tsks_vec.insert(tsks_vec.end(), node->get_c()->tasks.begin(), node->get_c()->tasks.end());
1481 1056 : if (node->get_c())
1482 4717 : for (auto ta : node->get_c()->tasks)
1483 3661 : modules_set.insert(&ta->get_module());
1484 1520 : for (auto c : node->get_children())
1485 464 : collect_modules_list(c, already_parsed_nodes);
1486 : }
1487 : };
1488 687 : std::vector<const tools::Digraph_node<SS>*> already_parsed_nodes;
1489 687 : collect_modules_list(sequence, already_parsed_nodes);
1490 :
1491 : // check if all the tasks of the sequence are replicable before to perform the modules clones
1492 687 : if (this->n_threads - (this->tasks_inplace ? 1 : 0))
1493 2030 : for (auto& t : tsks_vec)
1494 1743 : if (!t->is_replicable())
1495 : {
1496 0 : std::stringstream message;
1497 : message << "It is not possible to replicate this sequence because at least one of its tasks is not "
1498 : << "replicable (t->get_name() = '" << t->get_name() << "', t->get_module().get_name() = '"
1499 0 : << t->get_module().get_name() << "').";
1500 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
1501 0 : }
1502 :
1503 687 : std::vector<MO*> modules_vec;
1504 4251 : for (auto m : modules_set)
1505 3564 : modules_vec.push_back(m);
1506 :
1507 : // clone the modules
1508 6191 : for (size_t tid = 0; tid < this->n_threads - (this->tasks_inplace ? 1 : 0); tid++)
1509 : {
1510 5504 : if (this->is_thread_pinning())
1511 : {
1512 13 : const auto real_tid = tid + (this->tasks_inplace ? 1 : 0);
1513 13 : if (!this->puids.empty())
1514 0 : tools::Thread_pinning::pin(this->puids[real_tid]);
1515 : else
1516 13 : tools::Thread_pinning::pin(this->pin_objects_per_thread[real_tid]);
1517 : }
1518 :
1519 5504 : this->modules[tid].resize(modules_vec.size());
1520 5504 : this->all_modules[tid + (this->tasks_inplace ? 1 : 0)].resize(modules_vec.size());
1521 43872 : for (size_t m = 0; m < modules_vec.size(); m++)
1522 : {
1523 : try
1524 : {
1525 38368 : this->modules[tid][m].reset(modules_vec[m]->clone());
1526 : }
1527 0 : catch (std::exception& e)
1528 : {
1529 0 : std::stringstream message;
1530 : message << "Module clone failed when trying to replicate the sequence (module name is '"
1531 0 : << modules_vec[m]->get_name() << "').";
1532 :
1533 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
1534 0 : }
1535 38368 : this->all_modules[tid + (this->tasks_inplace ? 1 : 0)][m] = this->modules[tid][m].get();
1536 : }
1537 :
1538 5504 : if (this->is_thread_pinning()) tools::Thread_pinning::unpin();
1539 : }
1540 :
1541 84826 : auto get_module_id = [](const std::vector<MO*>& modules, const module::Module& module)
1542 : {
1543 : int m_id;
1544 462481 : for (m_id = 0; m_id < (int)modules.size(); m_id++)
1545 458794 : if (modules[m_id] == &module) return m_id;
1546 3687 : return -1;
1547 : };
1548 :
1549 80904 : auto get_task_id = [](const std::vector<std::shared_ptr<runtime::Task>>& tasks, const runtime::Task& task)
1550 : {
1551 : int t_id;
1552 97199 : for (t_id = 0; t_id < (int)tasks.size(); t_id++)
1553 97199 : if (tasks[t_id].get() == &task) return t_id;
1554 0 : return -1;
1555 : };
1556 :
1557 40199 : auto get_socket_id = [](const std::vector<std::shared_ptr<runtime::Socket>>& sockets, const runtime::Socket& socket)
1558 : {
1559 : int s_id;
1560 85292 : for (s_id = 0; s_id < (int)sockets.size(); s_id++)
1561 85292 : if (sockets[s_id].get() == &socket) return s_id;
1562 0 : return -1;
1563 : };
1564 :
1565 : std::function<void(const tools::Digraph_node<SS>*,
1566 : tools::Digraph_node<Sub_sequence>*,
1567 : const size_t,
1568 : std::vector<const tools::Digraph_node<SS>*>&,
1569 : std::map<size_t, tools::Digraph_node<Sub_sequence>*>&)>
1570 687 : replicate_sequence;
1571 :
1572 18813 : replicate_sequence = [&](const tools::Digraph_node<SS>* sequence_ref,
1573 : tools::Digraph_node<Sub_sequence>* sequence_cpy,
1574 : const size_t thread_id,
1575 : std::vector<const tools::Digraph_node<SS>*>& already_parsed_nodes,
1576 : std::map<size_t, tools::Digraph_node<Sub_sequence>*>& allocated_nodes)
1577 : {
1578 36252 : if (sequence_ref != nullptr && sequence_ref->get_c() &&
1579 18126 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), sequence_ref) ==
1580 36252 : already_parsed_nodes.end())
1581 : {
1582 15417 : already_parsed_nodes.push_back(sequence_ref);
1583 :
1584 15417 : auto ss_ref = sequence_ref->get_c();
1585 15417 : auto ss_cpy = new Sub_sequence();
1586 :
1587 15417 : ss_cpy->type = ss_ref->type;
1588 15417 : ss_cpy->id = ss_ref->id;
1589 15417 : ss_cpy->tasks_id = ss_ref->tasks_id;
1590 56122 : for (auto t_ref : ss_ref->tasks)
1591 : {
1592 40705 : auto& m_ref = t_ref->get_module();
1593 :
1594 40705 : auto m_id = get_module_id(modules_vec, m_ref);
1595 40705 : auto t_id = get_task_id(m_ref.tasks, *t_ref);
1596 :
1597 40705 : assert(m_id != -1);
1598 40705 : assert(t_id != -1);
1599 :
1600 : // add the task to the sub-sequence
1601 40705 : ss_cpy->tasks.push_back(this->all_modules[thread_id][m_id]->tasks[t_id].get());
1602 :
1603 : // replicate the sockets binding
1604 156519 : for (size_t s_id = 0; s_id < t_ref->sockets.size(); s_id++)
1605 : {
1606 197544 : if (t_ref->sockets[s_id]->get_type() == socket_t::SIN ||
1607 81730 : t_ref->sockets[s_id]->get_type() == socket_t::SFWD)
1608 : {
1609 40290 : const runtime::Socket* s_ref_out = nullptr;
1610 : try
1611 : {
1612 40290 : s_ref_out = &t_ref->sockets[s_id]->get_bound_socket();
1613 : }
1614 96 : catch (...)
1615 : {
1616 : }
1617 40290 : if (s_ref_out)
1618 : {
1619 40194 : auto& t_ref_out = s_ref_out->get_task();
1620 40194 : auto& m_ref_out = t_ref_out.get_module();
1621 :
1622 : // check if `t_ref_out` is included in the tasks graph
1623 40194 : auto t_in_seq = std::find(tsks_vec.begin(), tsks_vec.end(), &t_ref_out) != tsks_vec.end();
1624 40194 : auto m_id_out = get_module_id(modules_vec, m_ref_out);
1625 :
1626 40194 : if (t_in_seq && m_id_out != -1)
1627 : {
1628 36935 : auto t_id_out = get_task_id(m_ref_out.tasks, t_ref_out);
1629 36935 : auto s_id_out = get_socket_id(t_ref_out.sockets, *s_ref_out);
1630 :
1631 36935 : assert(t_id_out != -1);
1632 36935 : assert(s_id_out != -1);
1633 :
1634 36935 : auto& s_in = *this->all_modules[thread_id][m_id]->tasks[t_id]->sockets[s_id];
1635 : auto& s_out =
1636 36935 : *this->all_modules[thread_id][m_id_out]->tasks[t_id_out]->sockets[s_id_out];
1637 36935 : s_in = s_out;
1638 : }
1639 : }
1640 : }
1641 : }
1642 :
1643 : // replicate the tasks binding
1644 40705 : if (t_ref->fake_input_sockets.size())
1645 : {
1646 7854 : for (auto& fsi : t_ref->fake_input_sockets)
1647 : {
1648 3927 : const runtime::Socket* s_ref_out = nullptr;
1649 : try
1650 : {
1651 3927 : s_ref_out = &fsi->get_bound_socket();
1652 : }
1653 0 : catch (...)
1654 : {
1655 : }
1656 3927 : if (s_ref_out)
1657 : {
1658 3927 : auto& t_ref_out = s_ref_out->get_task();
1659 3927 : auto& m_ref_out = t_ref_out.get_module();
1660 :
1661 : // check if `t_ref_out` is included in the tasks graph
1662 3927 : auto t_in_seq = std::find(tsks_vec.begin(), tsks_vec.end(), &t_ref_out) != tsks_vec.end();
1663 3927 : auto m_id_out = get_module_id(modules_vec, m_ref_out);
1664 :
1665 3927 : if (t_in_seq && m_id_out != -1)
1666 : {
1667 3264 : auto t_id_out = get_task_id(m_ref_out.tasks, t_ref_out);
1668 3264 : auto s_id_out = get_socket_id(t_ref_out.sockets, *s_ref_out);
1669 :
1670 3264 : assert(t_id_out != -1);
1671 3264 : assert(s_id_out != -1);
1672 :
1673 3264 : auto& t_in = *this->all_modules[thread_id][m_id]->tasks[t_id];
1674 : auto& s_out =
1675 3264 : *this->all_modules[thread_id][m_id_out]->tasks[t_id_out]->sockets[s_id_out];
1676 3264 : t_in = s_out;
1677 : }
1678 : }
1679 : }
1680 : }
1681 : }
1682 :
1683 : // add the sub-sequence to the current tree node
1684 15417 : sequence_cpy->set_contents(ss_cpy);
1685 15417 : allocated_nodes[sequence_cpy->get_c()->id] = sequence_cpy;
1686 :
1687 28039 : for (size_t c = 0; c < sequence_ref->get_children().size(); c++)
1688 : {
1689 12622 : if (std::find(already_parsed_nodes.begin(),
1690 : already_parsed_nodes.end(),
1691 25244 : sequence_ref->get_children()[c]) != already_parsed_nodes.end())
1692 2709 : sequence_cpy->add_child(allocated_nodes[sequence_ref->get_children()[c]->get_c()->id]);
1693 : else
1694 19826 : sequence_cpy->add_child(new tools::Digraph_node<Sub_sequence>(
1695 9913 : { sequence_cpy }, {}, nullptr, sequence_cpy->get_depth() + 1));
1696 : }
1697 :
1698 28039 : for (size_t c = 0; c < sequence_ref->get_children().size(); c++)
1699 12622 : replicate_sequence(sequence_ref->get_children()[c],
1700 12622 : sequence_cpy->get_children()[c],
1701 : thread_id,
1702 : already_parsed_nodes,
1703 : allocated_nodes);
1704 : }
1705 : };
1706 :
1707 6191 : for (size_t thread_id = (this->tasks_inplace ? 1 : 0); thread_id < this->sequences.size(); thread_id++)
1708 : {
1709 5504 : if (this->is_thread_pinning())
1710 : {
1711 13 : if (!this->puids.empty())
1712 0 : tools::Thread_pinning::pin(this->puids[thread_id]);
1713 : else
1714 13 : tools::Thread_pinning::pin(this->pin_objects_per_thread[thread_id]);
1715 : }
1716 :
1717 5504 : this->sequences[thread_id] = new tools::Digraph_node<Sub_sequence>({}, {}, nullptr, 0);
1718 5504 : already_parsed_nodes.clear();
1719 5504 : std::map<size_t, tools::Digraph_node<Sub_sequence>*> allocated_nodes;
1720 5504 : replicate_sequence(sequence, this->sequences[thread_id], thread_id, already_parsed_nodes, allocated_nodes);
1721 :
1722 5504 : if (this->is_thread_pinning()) tools::Thread_pinning::unpin();
1723 : }
1724 687 : }
1725 :
1726 : template void
1727 : runtime::Sequence::replicate<runtime::Sub_sequence_const, const module::Module>(
1728 : const tools::Digraph_node<runtime::Sub_sequence_const>*);
1729 : template void
1730 : runtime::Sequence::replicate<runtime::Sub_sequence, module::Module>(const tools::Digraph_node<runtime::Sub_sequence>*);
1731 :
1732 : template<class SS>
1733 : void
1734 19277 : Sequence::delete_tree(tools::Digraph_node<SS>* node, std::vector<tools::Digraph_node<SS>*>& already_deleted_nodes)
1735 : {
1736 38554 : if (node != nullptr &&
1737 38554 : std::find(already_deleted_nodes.begin(), already_deleted_nodes.end(), node) == already_deleted_nodes.end())
1738 : {
1739 16473 : already_deleted_nodes.push_back(node);
1740 29559 : for (auto c : node->get_children())
1741 13086 : this->delete_tree(c, already_deleted_nodes);
1742 16473 : auto c = node->get_c();
1743 16473 : if (c != nullptr) delete c;
1744 16473 : delete node;
1745 : }
1746 19277 : }
1747 :
1748 : template void
1749 : runtime::Sequence::delete_tree<runtime::Sub_sequence_const>(
1750 : tools::Digraph_node<runtime::Sub_sequence_const>*,
1751 : std::vector<tools::Digraph_node<Sub_sequence_const>*>& already_deleted_nodes);
1752 : template void
1753 : runtime::Sequence::delete_tree<runtime::Sub_sequence>(
1754 : tools::Digraph_node<runtime::Sub_sequence>*,
1755 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_deleted_nodes);
1756 :
1757 : template<class VTA>
1758 : void
1759 67 : Sequence::export_dot_subsequence(const VTA& subseq,
1760 : const std::vector<size_t>& tasks_id,
1761 : const subseq_t& subseq_type,
1762 : const std::string& subseq_name,
1763 : const std::string& tab,
1764 : std::ostream& stream) const
1765 : {
1766 67 : if (!subseq_name.empty())
1767 : {
1768 67 : stream << tab << "subgraph \"cluster_" << subseq_name << "_" << +this << "\" {" << std::endl;
1769 67 : stream << tab << tab << "node [style=filled];" << std::endl;
1770 : }
1771 67 : size_t exec_order = 0;
1772 301 : for (auto& t : subseq)
1773 : {
1774 234 : std::string color = dynamic_cast<module::Adaptor_m_to_n*>(&t->get_module()) ? "green" : "blue";
1775 234 : color = dynamic_cast<module::AProbe*>(&t->get_module()) ? "pink" : color;
1776 234 : stream << tab << tab << "subgraph \"cluster_" << +&t->get_module() << "_" << +t << "\" {" << std::endl;
1777 234 : stream << tab << tab << tab << "node [style=filled];" << std::endl;
1778 234 : stream << tab << tab << tab << "subgraph \"cluster_" << +&t << "\" {" << std::endl;
1779 234 : stream << tab << tab << tab << tab << "node [style=filled];" << std::endl;
1780 :
1781 234 : if (t->fake_input_sockets.size())
1782 : {
1783 47 : std::string stype = "in";
1784 94 : for (auto& fsi : t->fake_input_sockets)
1785 47 : stream << tab << tab << tab << tab << "\"" << +fsi.get() << "\""
1786 47 : << "[label=\"" << stype << ":" << fsi->get_name() << "\", style=filled, "
1787 94 : << "fillcolor=red, penwidth=\"2.0\"];" << std::endl;
1788 47 : }
1789 :
1790 234 : size_t sid = 0;
1791 848 : for (auto& s : t->sockets)
1792 : {
1793 614 : std::string stype;
1794 614 : bool static_input = false;
1795 614 : switch (s->get_type())
1796 : {
1797 172 : case socket_t::SIN:
1798 172 : stype = "in[" + std::to_string(sid) + "]";
1799 172 : static_input = s->_get_dataptr() != nullptr && s->bound_socket == nullptr;
1800 172 : break;
1801 399 : case socket_t::SOUT:
1802 399 : stype = "out[" + std::to_string(sid) + "]";
1803 399 : break;
1804 43 : case socket_t::SFWD:
1805 43 : stype = "fwd[" + std::to_string(sid) + "]";
1806 43 : break;
1807 0 : default:
1808 0 : stype = "unkn";
1809 0 : break;
1810 : }
1811 :
1812 614 : std::string bold_or_not;
1813 614 : if (t->is_last_input_socket(*s)) bold_or_not = ", penwidth=\"2.0\"";
1814 :
1815 614 : stream << tab << tab << tab << tab << "\"" << +s.get() << "\""
1816 614 : << "[label=\"" << stype << ":" << s->get_name() << "\"" << bold_or_not
1817 1228 : << (static_input ? ", style=filled, fillcolor=green" : "") << "];" << std::endl;
1818 614 : sid++;
1819 : }
1820 :
1821 468 : stream << tab << tab << tab << tab << "label=\"" << t->get_name() << " (id = " << tasks_id[exec_order] << ")"
1822 234 : << "\";" << std::endl;
1823 234 : stream << tab << tab << tab << tab << "color=" << (t->is_replicable() ? color : "red") << ";" << std::endl;
1824 234 : stream << tab << tab << tab << "}" << std::endl;
1825 234 : stream << tab << tab << tab << "label=\"" << t->get_module().get_name() << "\\n"
1826 468 : << (t->get_module().get_custom_name().empty() ? "" : t->get_module().get_custom_name() + "\\n")
1827 702 : << "exec order: [" << exec_order++ << "]\\n"
1828 234 : << "addr: " << +&t->get_module() << "\";" << std::endl;
1829 234 : stream << tab << tab << tab << "color=" << color << ";" << std::endl;
1830 234 : stream << tab << tab << "}" << std::endl;
1831 : }
1832 67 : if (!subseq_name.empty())
1833 : {
1834 67 : stream << tab << tab << "label=\"" << subseq_name << "\";" << std::endl;
1835 67 : std::string color = subseq_type == subseq_t::COMMUTE || subseq_type == subseq_t::SELECT ? "red" : "blue";
1836 67 : stream << tab << tab << "color=" << color << ";" << std::endl;
1837 67 : stream << tab << "}" << std::endl;
1838 67 : }
1839 67 : }
1840 :
1841 : template void
1842 : runtime::Sequence::export_dot_subsequence<std::vector<runtime::Task*>>(const std::vector<runtime::Task*>&,
1843 : const std::vector<size_t>&,
1844 : const subseq_t&,
1845 : const std::string&,
1846 : const std::string&,
1847 : std::ostream&) const;
1848 : template void
1849 : runtime::Sequence::export_dot_subsequence<std::vector<const runtime::Task*>>(const std::vector<const runtime::Task*>&,
1850 : const std::vector<size_t>&,
1851 : const subseq_t&,
1852 : const std::string&,
1853 : const std::string&,
1854 : std::ostream&) const;
1855 :
1856 : template<class VTA>
1857 : void
1858 67 : Sequence::export_dot_connections(const VTA& subseq, const std::string& tab, std::ostream& stream) const
1859 : {
1860 301 : for (auto& t : subseq)
1861 : {
1862 848 : for (auto& s : t->sockets)
1863 : {
1864 614 : if (s->get_type() == socket_t::SOUT || s->get_type() == socket_t::SFWD)
1865 : {
1866 442 : auto& bss = s->get_bound_sockets();
1867 442 : size_t id = 0;
1868 704 : for (auto& bs : bss)
1869 : {
1870 262 : stream << tab << "\"" << +s.get() << "\" -> \"" << +bs << "\""
1871 262 : << (bss.size() > 1 ? "[label=\"" + std::to_string(id++) + "\"]" : "") << std::endl;
1872 : }
1873 : }
1874 : }
1875 : }
1876 67 : }
1877 :
1878 : template void
1879 : runtime::Sequence::export_dot_connections<std::vector<runtime::Task*>>(const std::vector<runtime::Task*>&,
1880 : const std::string&,
1881 : std::ostream&) const;
1882 : template void
1883 : runtime::Sequence::export_dot_connections<std::vector<const runtime::Task*>>(const std::vector<const runtime::Task*>&,
1884 : const std::string&,
1885 : std::ostream&) const;
1886 :
1887 : void
1888 12 : Sequence::export_dot(std::ostream& stream) const
1889 : {
1890 12 : auto root = this->sequences[0];
1891 12 : this->export_dot(root, stream);
1892 12 : }
1893 :
1894 : template<class SS>
1895 : void
1896 12 : Sequence::export_dot(tools::Digraph_node<SS>* root, std::ostream& stream) const
1897 : {
1898 : std::function<void(
1899 : tools::Digraph_node<SS>*, const std::string&, std::ostream&, std::vector<tools::Digraph_node<SS>*>&)>
1900 12 : export_dot_subsequences_recursive =
1901 50 : [&export_dot_subsequences_recursive, this](tools::Digraph_node<SS>* cur_node,
1902 : const std::string& tab,
1903 : std::ostream& stream,
1904 : std::vector<tools::Digraph_node<SS>*>& already_parsed_nodes)
1905 : {
1906 100 : if (cur_node != nullptr &&
1907 100 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
1908 : {
1909 42 : already_parsed_nodes.push_back(cur_node);
1910 126 : this->export_dot_subsequence(cur_node->get_c()->tasks,
1911 42 : cur_node->get_c()->tasks_id,
1912 42 : cur_node->get_c()->type,
1913 42 : "Sub-sequence " + std::to_string(cur_node->get_c()->id) +
1914 : " (depth = " + std::to_string(cur_node->get_depth()) + ")",
1915 : tab,
1916 : stream);
1917 :
1918 80 : for (auto c : cur_node->get_children())
1919 38 : export_dot_subsequences_recursive(c, tab, stream, already_parsed_nodes);
1920 : }
1921 : };
1922 :
1923 : std::function<void(
1924 : tools::Digraph_node<SS>*, const std::string&, std::ostream&, std::vector<tools::Digraph_node<SS>*>&)>
1925 12 : export_dot_connections_recursive =
1926 50 : [&export_dot_connections_recursive, this](tools::Digraph_node<SS>* cur_node,
1927 : const std::string& tab,
1928 : std::ostream& stream,
1929 : std::vector<tools::Digraph_node<SS>*>& already_parsed_nodes)
1930 : {
1931 100 : if (cur_node != nullptr &&
1932 100 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
1933 : {
1934 42 : already_parsed_nodes.push_back(cur_node);
1935 42 : this->export_dot_connections(cur_node->get_c()->tasks, tab, stream);
1936 :
1937 80 : for (auto c : cur_node->get_children())
1938 38 : export_dot_connections_recursive(c, tab, stream, already_parsed_nodes);
1939 : }
1940 : };
1941 :
1942 12 : std::string tab = "\t";
1943 12 : stream << "digraph Sequence {" << std::endl;
1944 12 : std::vector<tools::Digraph_node<SS>*> already_parsed_nodes;
1945 12 : export_dot_subsequences_recursive(root, tab, stream, already_parsed_nodes);
1946 12 : already_parsed_nodes.clear();
1947 12 : export_dot_connections_recursive(root, tab, stream, already_parsed_nodes);
1948 12 : stream << "}" << std::endl;
1949 12 : }
1950 :
1951 : void
1952 1555 : Sequence::gen_processes(const bool no_copy_mode)
1953 : {
1954 : std::function<void(Socket * socket, std::vector<runtime::Socket*> & list_fwd)> explore_thread_rec =
1955 15641 : [&explore_thread_rec](Socket* socket, std::vector<runtime::Socket*>& list_fwd)
1956 : {
1957 12276 : auto bound = socket->get_bound_sockets();
1958 24014 : for (auto explore_bound : bound)
1959 : {
1960 13522 : if (find(list_fwd.begin(), list_fwd.end(), explore_bound) == list_fwd.end() &&
1961 1776 : explore_bound->get_type() != socket_t::SOUT)
1962 : {
1963 1789 : list_fwd.push_back(explore_bound);
1964 : }
1965 11753 : if (explore_bound->get_type() == socket_t::SFWD) explore_thread_rec(explore_bound, list_fwd);
1966 : }
1967 13792 : };
1968 :
1969 : std::function<void(Socket * socket, std::vector<runtime::Socket*> & list_fwd)> explore_thread_rec_reverse =
1970 3268 : [&explore_thread_rec, &explore_thread_rec_reverse](Socket* socket, std::vector<runtime::Socket*>& list_fwd)
1971 : {
1972 1582 : auto bound = &socket->get_bound_socket();
1973 1582 : if (find(list_fwd.begin(), list_fwd.end(), bound) == list_fwd.end())
1974 : {
1975 1581 : list_fwd.push_back(bound);
1976 : }
1977 1583 : if (bound->get_type() == socket_t::SFWD)
1978 : {
1979 843 : explore_thread_rec(bound, list_fwd);
1980 843 : explore_thread_rec_reverse(bound, list_fwd);
1981 : }
1982 3126 : };
1983 :
1984 : std::function<void(tools::Digraph_node<Sub_sequence>*, std::vector<tools::Digraph_node<Sub_sequence>*>&)>
1985 : gen_processes_recursive =
1986 47355 : [&gen_processes_recursive, no_copy_mode, &explore_thread_rec, &explore_thread_rec_reverse](
1987 : tools::Digraph_node<Sub_sequence>* cur_node,
1988 63507 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
1989 : {
1990 94703 : if (cur_node != nullptr &&
1991 94703 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
1992 : {
1993 40600 : already_parsed_nodes.push_back(cur_node);
1994 :
1995 40603 : std::map<runtime::Task*, std::function<const int*()>> modified_tasks;
1996 40606 : auto contents = cur_node->get_c();
1997 40611 : contents->processes.clear();
1998 40609 : contents->rebind_sockets.clear();
1999 40608 : contents->rebind_dataptrs.clear();
2000 162212 : for (auto task : contents->tasks)
2001 : {
2002 132599 : if (dynamic_cast<module::Switcher*>(&task->get_module()) &&
2003 254180 : task->get_name().find("select") != std::string::npos && no_copy_mode)
2004 : {
2005 1587 : auto select_task = task;
2006 1587 : auto switcher = dynamic_cast<module::Switcher*>(&select_task->get_module());
2007 1587 : switcher->set_no_copy_select(true);
2008 :
2009 1587 : const auto rebind_id = contents->rebind_sockets.size();
2010 1587 : contents->rebind_sockets.resize(rebind_id + 1);
2011 1587 : contents->rebind_dataptrs.resize(rebind_id + 1);
2012 :
2013 6733 : for (size_t s = 0; s < select_task->sockets.size() - 1; s++)
2014 : {
2015 : // there should be only one output socket at this time
2016 5146 : if (select_task->sockets[s]->get_type() == socket_t::SOUT)
2017 : {
2018 1587 : std::vector<runtime::Socket*> bound_sockets;
2019 1587 : std::vector<void*> dataptrs;
2020 :
2021 3174 : for (auto socket : select_task->sockets[s]->get_bound_sockets())
2022 : {
2023 1587 : bound_sockets.push_back(socket);
2024 1587 : if (socket->get_type() == socket_t::SFWD) explore_thread_rec(socket, bound_sockets);
2025 : }
2026 3174 : for (auto sck : bound_sockets)
2027 1587 : dataptrs.push_back(sck->_get_dataptr());
2028 :
2029 1587 : contents->rebind_sockets[rebind_id].push_back(bound_sockets);
2030 1587 : contents->rebind_dataptrs[rebind_id].push_back(dataptrs);
2031 1587 : }
2032 : }
2033 :
2034 510691 : modified_tasks[select_task] = [contents, select_task, switcher, rebind_id]() -> const int*
2035 : {
2036 56674 : select_task->exec();
2037 56651 : const int* status = select_task->sockets.back()->get_dataptr<int>();
2038 :
2039 56132 : const auto path = switcher->get_path();
2040 56108 : const auto in_dataptr = select_task->sockets[path]->_get_dataptr();
2041 :
2042 : // rebind input sockets on the fly
2043 : // there should be only one output socket at this time (sout_id == 0)
2044 112774 : for (size_t sout_id = 0; sout_id < contents->rebind_sockets[rebind_id].size(); sout_id++)
2045 112878 : for (size_t sin_id = 0; sin_id < contents->rebind_sockets[rebind_id][sout_id].size();
2046 : sin_id++)
2047 56300 : contents->rebind_sockets[rebind_id][sout_id][sin_id]->dataptr = in_dataptr;
2048 :
2049 56729 : return status;
2050 1587 : };
2051 : }
2052 :
2053 132592 : if (dynamic_cast<module::Switcher*>(&task->get_module()) &&
2054 254197 : task->get_name().find("commute") != std::string::npos && no_copy_mode)
2055 : {
2056 1587 : auto commute_task = task;
2057 1587 : auto switcher = dynamic_cast<module::Switcher*>(&commute_task->get_module());
2058 1587 : switcher->set_no_copy_commute(true);
2059 :
2060 1587 : const auto rebind_id = contents->rebind_sockets.size();
2061 1587 : contents->rebind_sockets.resize(rebind_id + 1);
2062 1587 : contents->rebind_dataptrs.resize(rebind_id + 1);
2063 :
2064 8320 : for (size_t s = 0; s < commute_task->sockets.size() - 1; s++)
2065 : {
2066 6733 : if (commute_task->sockets[s]->get_type() == socket_t::SOUT)
2067 : {
2068 3559 : std::vector<runtime::Socket*> bound_sockets;
2069 3559 : std::vector<void*> dataptrs;
2070 :
2071 7263 : for (auto socket : commute_task->sockets[s]->get_bound_sockets())
2072 : {
2073 3704 : bound_sockets.push_back(socket);
2074 3704 : if (socket->get_type() == socket_t::SFWD) explore_thread_rec(socket, bound_sockets);
2075 : }
2076 7263 : for (auto sck : bound_sockets)
2077 3704 : dataptrs.push_back(sck->_get_dataptr());
2078 :
2079 3559 : contents->rebind_sockets[rebind_id].push_back(bound_sockets);
2080 3559 : contents->rebind_dataptrs[rebind_id].push_back(dataptrs);
2081 3559 : }
2082 : }
2083 :
2084 400931 : modified_tasks[commute_task] = [contents, commute_task, switcher, rebind_id]() -> const int*
2085 : {
2086 56808 : commute_task->exec();
2087 56505 : const int* status = commute_task->sockets.back()->get_dataptr<int>();
2088 56181 : const auto in_dataptr = commute_task->sockets[0]->_get_dataptr();
2089 56086 : const auto path = switcher->get_path();
2090 :
2091 : // rebind input sockets on the fly
2092 114292 : for (size_t sin_id = 0; sin_id < contents->rebind_sockets[rebind_id][path].size(); sin_id++)
2093 57885 : contents->rebind_sockets[rebind_id][path][sin_id]->dataptr = in_dataptr;
2094 :
2095 56701 : return status;
2096 1587 : };
2097 : }
2098 :
2099 133956 : if (dynamic_cast<module::Adaptor_m_to_n*>(&task->get_module()) &&
2100 255554 : task->get_name().find("pull") != std::string::npos && no_copy_mode)
2101 : {
2102 3089 : auto pull_task = task;
2103 3089 : auto adp_pull = dynamic_cast<module::Adaptor_m_to_n*>(&pull_task->get_module());
2104 3088 : adp_pull->set_no_copy_pull(true);
2105 3088 : const auto rebind_id = contents->rebind_sockets.size();
2106 3085 : contents->rebind_sockets.resize(rebind_id + 1);
2107 3085 : contents->rebind_dataptrs.resize(rebind_id + 1);
2108 :
2109 8643 : for (size_t s = 0; s < pull_task->sockets.size() - 1; s++)
2110 : {
2111 5551 : if (pull_task->sockets[s]->get_type() == socket_t::SOUT)
2112 : {
2113 5550 : std::vector<runtime::Socket*> bound_sockets;
2114 5549 : std::vector<void*> dataptrs;
2115 :
2116 5548 : bound_sockets.push_back(pull_task->sockets[s].get());
2117 12207 : for (auto socket : pull_task->sockets[s]->get_bound_sockets())
2118 : {
2119 6654 : bound_sockets.push_back(socket);
2120 6666 : if (socket->get_type() == socket_t::SFWD) explore_thread_rec(socket, bound_sockets);
2121 : }
2122 18876 : for (auto sck : bound_sockets)
2123 13332 : dataptrs.push_back(sck->_get_dataptr());
2124 :
2125 5525 : contents->rebind_sockets[rebind_id].push_back(bound_sockets);
2126 5551 : contents->rebind_dataptrs[rebind_id].push_back(dataptrs);
2127 5558 : }
2128 : }
2129 :
2130 9371323 : modified_tasks[pull_task] = [contents, pull_task, adp_pull, rebind_id]() -> const int*
2131 : {
2132 : // active or passive waiting here
2133 556436 : pull_task->exec();
2134 541138 : const int* status = pull_task->sockets.back()->get_dataptr<int>();
2135 :
2136 : // rebind input sockets on the fly
2137 1499470 : for (size_t sin_id = 0; sin_id < contents->rebind_sockets[rebind_id].size(); sin_id++)
2138 : {
2139 967500 : if (contents->rebind_sockets[rebind_id][sin_id].size() > 1)
2140 : {
2141 : // we start to 1 because the rebinding of the 'pull_task' is made in the
2142 : // 'pull_task->exec()' call (this way the debug mode is still working)
2143 962684 : auto swap_buff = contents->rebind_sockets[rebind_id][sin_id][1]->_get_dataptr();
2144 958896 : auto buff = adp_pull->get_filled_buffer(sin_id, swap_buff);
2145 920592 : contents->rebind_sockets[rebind_id][sin_id][1]->dataptr = buff;
2146 : // for the next tasks the same buffer 'buff' is required, an easy mistake is to re-swap
2147 : // and the data will be false, this is why we just bind 'buff'
2148 1662576 : for (size_t ta = 2; ta < contents->rebind_sockets[rebind_id][sin_id].size(); ta++)
2149 738733 : contents->rebind_sockets[rebind_id][sin_id][ta]->dataptr = buff;
2150 : }
2151 : }
2152 557127 : adp_pull->wake_up_pusher();
2153 573334 : return status;
2154 3082 : };
2155 : }
2156 :
2157 133970 : if (dynamic_cast<module::Adaptor_m_to_n*>(&task->get_module()) &&
2158 255568 : task->get_name().find("push") != std::string::npos && no_copy_mode)
2159 : {
2160 3092 : auto push_task = task;
2161 3092 : auto adp_push = dynamic_cast<module::Adaptor_m_to_n*>(&push_task->get_module());
2162 3090 : adp_push->set_no_copy_push(true);
2163 3090 : const auto rebind_id = contents->rebind_sockets.size();
2164 3089 : contents->rebind_sockets.resize(rebind_id + 1);
2165 3092 : contents->rebind_dataptrs.resize(rebind_id + 1);
2166 :
2167 9856 : for (size_t s = 0; s < push_task->sockets.size() - 1; s++)
2168 6767 : if (push_task->sockets[s]->get_type() == socket_t::SIN)
2169 : {
2170 6767 : std::vector<runtime::Socket*> bound_sockets;
2171 6766 : std::vector<void*> dataptrs;
2172 :
2173 6766 : bound_sockets.push_back(push_task->sockets[s].get());
2174 :
2175 6758 : auto bound_socket = &push_task->sockets[s]->get_bound_socket();
2176 6758 : bound_sockets.push_back(bound_socket);
2177 6756 : explore_thread_rec(bound_socket, bound_sockets);
2178 :
2179 : // If the socket is FWD, we have to update all the other sockets with a backward
2180 : // exploration
2181 6758 : if (bound_socket->get_type() == socket_t::SFWD)
2182 740 : explore_thread_rec_reverse(bound_socket, bound_sockets);
2183 :
2184 22469 : for (auto sck : bound_sockets)
2185 15710 : dataptrs.push_back(sck->_get_dataptr());
2186 :
2187 6721 : contents->rebind_sockets[rebind_id].push_back(bound_sockets);
2188 6763 : contents->rebind_dataptrs[rebind_id].push_back(dataptrs);
2189 6764 : }
2190 :
2191 8787947 : modified_tasks[push_task] = [contents, push_task, adp_push, rebind_id]() -> const int*
2192 : {
2193 : // active or passive waiting here
2194 572606 : push_task->exec();
2195 593472 : const int* status = push_task->sockets.back()->get_dataptr<int>();
2196 : // rebind output sockets on the fly
2197 1611810 : for (size_t sout_id = 0; sout_id < contents->rebind_sockets[rebind_id].size(); sout_id++)
2198 : {
2199 : // we start to 1 because the rebinding of the 'push_task' is made in the
2200 : // 'push_task->exec()' call (this way the debug mode is still working)
2201 1042552 : auto swap_buff = contents->rebind_sockets[rebind_id][sout_id][1]->_get_dataptr();
2202 1038366 : auto buff = adp_push->get_empty_buffer(sout_id, swap_buff);
2203 1007822 : contents->rebind_sockets[rebind_id][sout_id][1]->dataptr = buff;
2204 : // the output socket linked to the push adp can have more than one socket bound and so
2205 : // we have to rebind all the input sokects bound to the current output socket
2206 1664806 : for (size_t ta = 2; ta < contents->rebind_sockets[rebind_id][sout_id].size(); ta++)
2207 655107 : contents->rebind_sockets[rebind_id][sout_id][ta]->dataptr = buff;
2208 : }
2209 595239 : adp_push->wake_up_puller();
2210 601612 : return status;
2211 3079 : };
2212 : }
2213 : }
2214 :
2215 162214 : for (auto task : contents->tasks)
2216 121583 : if (modified_tasks.count(task))
2217 9354 : contents->processes.push_back(modified_tasks[task]);
2218 : else
2219 112246 : contents->processes.push_back(
2220 5076359 : [task]() -> const int*
2221 : {
2222 2527637 : task->exec();
2223 2548722 : const int* status = task->sockets.back()->get_dataptr<int>();
2224 2352422 : return status;
2225 : });
2226 :
2227 71912 : for (auto c : cur_node->get_children())
2228 31310 : gen_processes_recursive(c, already_parsed_nodes);
2229 40619 : }
2230 48916 : };
2231 :
2232 1590 : size_t thread_id = 0;
2233 17649 : for (auto& sequence : this->sequences)
2234 : {
2235 16051 : if (this->is_thread_pinning())
2236 : {
2237 125 : if (!this->puids.empty())
2238 0 : tools::Thread_pinning::pin(this->puids[thread_id++]);
2239 : else
2240 125 : tools::Thread_pinning::pin(this->pin_objects_per_thread[thread_id++]);
2241 : }
2242 16053 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
2243 16054 : gen_processes_recursive(sequence, already_parsed_nodes);
2244 :
2245 16059 : if (this->is_thread_pinning()) tools::Thread_pinning::unpin();
2246 16059 : }
2247 1591 : }
2248 :
2249 : void
2250 452 : Sequence::reset_no_copy_mode()
2251 : {
2252 : std::function<void(tools::Digraph_node<Sub_sequence>*, std::vector<tools::Digraph_node<Sub_sequence>*>&)>
2253 : reset_no_copy_mode_recursive =
2254 14094 : [&reset_no_copy_mode_recursive](tools::Digraph_node<Sub_sequence>* cur_node,
2255 9112 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
2256 : {
2257 28188 : if (cur_node != nullptr &&
2258 28188 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
2259 : {
2260 12122 : already_parsed_nodes.push_back(cur_node);
2261 12122 : auto contents = cur_node->get_c();
2262 51032 : for (auto task : contents->tasks)
2263 : {
2264 42084 : if (dynamic_cast<module::Switcher*>(&task->get_module()) &&
2265 42084 : task->get_name().find("select") != std::string::npos)
2266 : {
2267 1587 : auto select_task = task;
2268 1587 : auto switcher = dynamic_cast<module::Switcher*>(&select_task->get_module());
2269 1587 : switcher->set_no_copy_select(false);
2270 : }
2271 :
2272 42084 : if (dynamic_cast<module::Switcher*>(&task->get_module()) &&
2273 42084 : task->get_name().find("commute") != std::string::npos)
2274 : {
2275 1587 : auto commute_task = task;
2276 1587 : auto switcher = dynamic_cast<module::Switcher*>(&commute_task->get_module());
2277 1587 : switcher->set_no_copy_commute(false);
2278 : }
2279 :
2280 45094 : if (dynamic_cast<module::Adaptor_m_to_n*>(&task->get_module()) &&
2281 45094 : task->get_name().find("pull") != std::string::npos)
2282 : {
2283 3092 : auto pull_task = task;
2284 3092 : auto adp_pull = dynamic_cast<module::Adaptor_m_to_n*>(&pull_task->get_module());
2285 3092 : adp_pull->set_no_copy_pull(false);
2286 3092 : adp_pull->reset_buffer();
2287 : }
2288 :
2289 45094 : if (dynamic_cast<module::Adaptor_m_to_n*>(&task->get_module()) &&
2290 45094 : task->get_name().find("push") != std::string::npos)
2291 : {
2292 3092 : auto push_task = task;
2293 3092 : auto adp_push = dynamic_cast<module::Adaptor_m_to_n*>(&push_task->get_module());
2294 3092 : adp_push->set_no_copy_push(false);
2295 3092 : adp_push->reset_buffer();
2296 : }
2297 : }
2298 :
2299 21480 : for (size_t rebind_id = 0; rebind_id < contents->rebind_sockets.size(); rebind_id++)
2300 26834 : for (size_t s = 0; s < contents->rebind_sockets[rebind_id].size(); s++)
2301 51923 : for (size_t ta = 0; ta < contents->rebind_sockets[rebind_id][s].size(); ta++)
2302 34447 : contents->rebind_sockets[rebind_id][s][ta]->dataptr =
2303 34447 : contents->rebind_dataptrs[rebind_id][s][ta];
2304 :
2305 21234 : for (auto c : cur_node->get_children())
2306 9112 : reset_no_copy_mode_recursive(c, already_parsed_nodes);
2307 : }
2308 14546 : };
2309 :
2310 5434 : for (auto& sequence : this->sequences)
2311 : {
2312 4982 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
2313 4982 : reset_no_copy_mode_recursive(sequence, already_parsed_nodes);
2314 4982 : }
2315 452 : }
2316 :
2317 : void
2318 86 : Sequence::set_no_copy_mode(const bool no_copy_mode)
2319 : {
2320 86 : this->no_copy_mode = no_copy_mode;
2321 86 : }
2322 :
2323 : bool
2324 1254 : Sequence::is_no_copy_mode() const
2325 : {
2326 1254 : return this->no_copy_mode;
2327 : }
2328 :
2329 : void
2330 22 : Sequence::set_auto_stop(const bool auto_stop)
2331 : {
2332 22 : this->auto_stop = auto_stop;
2333 22 : }
2334 :
2335 : bool
2336 0 : Sequence::is_auto_stop() const
2337 : {
2338 0 : return this->auto_stop;
2339 : }
2340 :
2341 : Sub_sequence*
2342 18540 : Sequence::get_last_subsequence(const size_t tid)
2343 : {
2344 : std::function<Sub_sequence*(tools::Digraph_node<Sub_sequence>*, std::vector<tools::Digraph_node<Sub_sequence>*>&)>
2345 : get_last_subsequence_recursive =
2346 25716 : [&get_last_subsequence_recursive,
2347 : &tid](tools::Digraph_node<Sub_sequence>* cur_node,
2348 7176 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes) -> Sub_sequence*
2349 : {
2350 51432 : if (cur_node != nullptr &&
2351 51432 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
2352 : {
2353 24306 : already_parsed_nodes.push_back(cur_node);
2354 24306 : if (!cur_node->get_children().size()) return cur_node->get_contents();
2355 5766 : Sub_sequence* last_ss = nullptr;
2356 12942 : for (auto c : cur_node->get_children())
2357 : {
2358 7176 : Sub_sequence* last_branch_ss = nullptr;
2359 7176 : last_branch_ss = get_last_subsequence_recursive(c, already_parsed_nodes);
2360 7176 : if (last_ss && last_branch_ss && last_ss != last_branch_ss)
2361 : {
2362 0 : std::stringstream message;
2363 : message << "Multiple candidates have been found for the last subsequence, this shouldn't be "
2364 0 : << "possible. (tid = " << tid << ", "
2365 0 : << "last_ss.id = " << last_ss->id << ", "
2366 0 : << "last_branch_ss.id = " << last_branch_ss->id << ")";
2367 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
2368 0 : }
2369 7176 : last_ss = last_branch_ss ? last_branch_ss : last_ss;
2370 : }
2371 5766 : return last_ss;
2372 : }
2373 1410 : return nullptr;
2374 18540 : };
2375 :
2376 18540 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
2377 37080 : return get_last_subsequence_recursive(this->sequences[tid], already_parsed_nodes);
2378 18540 : }
2379 :
2380 : void
2381 18540 : Sequence::update_tasks_id(const size_t tid)
2382 : {
2383 : std::function<void(tools::Digraph_node<Sub_sequence>*, std::vector<tools::Digraph_node<Sub_sequence>*>&)>
2384 : update_tasks_id_recursive =
2385 25716 : [&update_tasks_id_recursive](tools::Digraph_node<Sub_sequence>* cur_node,
2386 7176 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
2387 : {
2388 51432 : if (cur_node != nullptr &&
2389 51432 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
2390 : {
2391 24306 : already_parsed_nodes.push_back(cur_node);
2392 24306 : Sub_sequence* ss = cur_node->get_contents();
2393 24306 : ss->tasks_id.resize(ss->tasks.size());
2394 24306 : std::iota(ss->tasks_id.begin(), ss->tasks_id.end(), ss->tasks_id.front());
2395 :
2396 31482 : for (auto c : cur_node->get_children())
2397 7176 : update_tasks_id_recursive(c, already_parsed_nodes);
2398 : }
2399 44256 : };
2400 :
2401 18540 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
2402 37080 : return update_tasks_id_recursive(this->sequences[tid], already_parsed_nodes);
2403 18540 : }
2404 :
2405 : std::vector<runtime::Task*>
2406 7444 : Sequence::get_tasks_from_id(const size_t taid)
2407 : {
2408 : std::function<void(tools::Digraph_node<Sub_sequence>*,
2409 : const size_t,
2410 : std::vector<runtime::Task*>&,
2411 : std::vector<tools::Digraph_node<Sub_sequence>*>&)>
2412 : get_tasks_from_id_recursive =
2413 100739 : [&get_tasks_from_id_recursive](tools::Digraph_node<Sub_sequence>* cur_node,
2414 : const size_t taid,
2415 : std::vector<runtime::Task*>& tasks,
2416 38063 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
2417 : {
2418 201478 : if (cur_node != nullptr &&
2419 201478 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
2420 : {
2421 94569 : already_parsed_nodes.push_back(cur_node);
2422 94569 : Sub_sequence* ss = cur_node->get_contents();
2423 94569 : bool found = false;
2424 257417 : for (size_t t = 0; t < ss->tasks_id.size(); t++)
2425 225524 : if (ss->tasks_id[t] == taid)
2426 : {
2427 62676 : tasks.push_back(ss->tasks[t]);
2428 62676 : found = true;
2429 62676 : break;
2430 : }
2431 :
2432 94569 : if (!found)
2433 69956 : for (auto c : cur_node->get_children())
2434 38063 : get_tasks_from_id_recursive(c, taid, tasks, already_parsed_nodes);
2435 : }
2436 108183 : };
2437 :
2438 7444 : std::vector<runtime::Task*> tasks;
2439 70120 : for (auto& s : this->sequences)
2440 : {
2441 62676 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
2442 62676 : get_tasks_from_id_recursive(s, taid, tasks, already_parsed_nodes);
2443 62676 : }
2444 14888 : return tasks;
2445 7444 : }
2446 :
2447 : void
2448 2909 : Sequence::update_firsts_and_lasts_tasks()
2449 : {
2450 2909 : this->firsts_tasks.clear();
2451 2909 : this->firsts_tasks.resize(this->n_threads);
2452 6434 : for (auto taid : firsts_tasks_id)
2453 : {
2454 3525 : auto tasks = this->get_tasks_from_id(taid);
2455 32950 : for (size_t tid = 0; tid < tasks.size(); tid++)
2456 29425 : firsts_tasks[tid].push_back(tasks[tid]);
2457 3525 : }
2458 :
2459 2909 : this->lasts_tasks.clear();
2460 2909 : this->lasts_tasks.resize(this->n_threads);
2461 6828 : for (auto taid : lasts_tasks_id)
2462 : {
2463 3919 : auto tasks = this->get_tasks_from_id(taid);
2464 37170 : for (size_t tid = 0; tid < tasks.size(); tid++)
2465 33251 : lasts_tasks[tid].push_back(tasks[tid]);
2466 3919 : }
2467 2909 : }
2468 :
2469 : void
2470 440 : Sequence::_set_n_frames_unbind(std::vector<std::pair<runtime::Socket*, runtime::Socket*>>& unbind_sockets,
2471 : std::vector<std::pair<runtime::Task*, runtime::Socket*>>& unbind_tasks)
2472 : {
2473 : std::function<void(const std::vector<runtime::Task*>&,
2474 : tools::Digraph_node<Sub_sequence>*,
2475 : std::vector<tools::Digraph_node<Sub_sequence>*>&)>
2476 28226 : graph_traversal = [&graph_traversal, &unbind_sockets, &unbind_tasks](
2477 : const std::vector<runtime::Task*>& possessed_tsks,
2478 : tools::Digraph_node<Sub_sequence>* cur_node,
2479 77512 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
2480 : {
2481 56452 : if (cur_node != nullptr &&
2482 56452 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
2483 : {
2484 23490 : already_parsed_nodes.push_back(cur_node);
2485 23490 : Sub_sequence* ss = cur_node->get_contents();
2486 76996 : for (auto tsk_out : ss->tasks)
2487 : {
2488 209378 : for (auto sck_out : tsk_out->sockets)
2489 : {
2490 155872 : if (sck_out->get_type() == socket_t::SOUT || sck_out->get_type() == socket_t::SFWD)
2491 : {
2492 107668 : auto sck_out_bound_sockets_cpy = sck_out->get_bound_sockets();
2493 163228 : for (auto sck_in : sck_out_bound_sockets_cpy)
2494 : {
2495 55560 : auto tsk_in = &sck_in->get_task();
2496 : // if the task of the current input socket is in the tasks of the sequence
2497 55560 : if (std::find(possessed_tsks.begin(), possessed_tsks.end(), tsk_in) != possessed_tsks.end())
2498 : {
2499 : try
2500 : {
2501 55560 : tsk_in->unbind(*sck_out);
2502 4656 : unbind_tasks.push_back(std::make_pair(tsk_in, sck_out.get()));
2503 : }
2504 50904 : catch (...)
2505 : {
2506 50904 : sck_in->unbind(*sck_out);
2507 : // memorize the unbinds to rebind after!
2508 50904 : unbind_sockets.push_back(std::make_pair(sck_in, sck_out.get()));
2509 50904 : }
2510 : }
2511 : }
2512 107668 : }
2513 155872 : }
2514 : }
2515 45442 : for (auto c : cur_node->get_children())
2516 21952 : graph_traversal(possessed_tsks, c, already_parsed_nodes);
2517 : }
2518 28666 : };
2519 :
2520 440 : auto tsks_per_threads = this->get_tasks_per_threads();
2521 440 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
2522 6714 : for (size_t t = 0; t < this->get_n_threads(); t++)
2523 : {
2524 6274 : already_parsed_nodes.clear();
2525 6274 : graph_traversal(tsks_per_threads[t], this->sequences[t], already_parsed_nodes);
2526 : }
2527 440 : }
2528 :
2529 : void
2530 440 : Sequence::_set_n_frames(const size_t n_frames)
2531 : {
2532 440 : if (n_frames <= 0)
2533 : {
2534 0 : std::stringstream message;
2535 0 : message << "'n_frames' has to be greater than 0 ('n_frames' = " << n_frames << ").";
2536 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
2537 0 : }
2538 :
2539 : // set_n_frames for all the modules (be aware that this operation can fail)
2540 6714 : for (auto& mm : this->all_modules)
2541 57898 : for (auto& m : mm)
2542 51624 : m->set_n_frames(n_frames);
2543 440 : }
2544 :
2545 : void
2546 440 : Sequence::_set_n_frames_rebind(const std::vector<std::pair<runtime::Socket*, runtime::Socket*>>& unbind_sockets,
2547 : const std::vector<std::pair<runtime::Task*, runtime::Socket*>>& unbind_tasks)
2548 : {
2549 : // rebind the sockets
2550 51344 : for (auto& u : unbind_sockets)
2551 50904 : (*u.first) = *u.second;
2552 :
2553 : // rebind the tasks
2554 5096 : for (auto& u : unbind_tasks)
2555 4656 : (*u.first) = *u.second;
2556 440 : }
2557 :
2558 : void
2559 394 : Sequence::set_n_frames(const size_t n_frames)
2560 : {
2561 394 : const auto old_n_frames = this->get_n_frames();
2562 394 : if (old_n_frames != n_frames)
2563 : {
2564 302 : std::vector<std::pair<runtime::Socket*, runtime::Socket*>> unbind_sockets;
2565 302 : std::vector<std::pair<runtime::Task*, runtime::Socket*>> unbind_tasks;
2566 302 : this->_set_n_frames_unbind(unbind_sockets, unbind_tasks);
2567 302 : this->_set_n_frames(n_frames);
2568 302 : this->_set_n_frames_rebind(unbind_sockets, unbind_tasks);
2569 302 : }
2570 394 : }
2571 :
2572 : bool
2573 11 : Sequence::is_control_flow() const
2574 : {
2575 11 : return this->sequences[0]->get_children().size();
2576 : }
2577 :
2578 : // /!\ this check has been commented because it is known to do not work in the general case
2579 : /*
2580 : template<class SS>
2581 : void Sequence
2582 : ::check_ctrl_flow(tools::Digraph_node<SS>* root)
2583 : {
2584 : std::function<void(tools::Digraph_node<SS>*,
2585 : std::vector<tools::Digraph_node<SS>*>&)> check_control_flow_parity =
2586 : [&check_control_flow_parity](tools::Digraph_node<SS>* cur_node,
2587 : std::vector<tools::Digraph_node<SS>*> already_parsed_nodes) -> void
2588 : {
2589 : if (cur_node != nullptr &&
2590 : std::find(already_parsed_nodes.begin(),
2591 : already_parsed_nodes.end(),
2592 : cur_node) == already_parsed_nodes.end() &&
2593 : cur_node->get_children().size())
2594 : {
2595 : already_parsed_nodes.push_back(cur_node);
2596 : for (auto c : cur_node->get_children())
2597 : check_control_flow_parity(c, already_parsed_nodes);
2598 : }
2599 : else
2600 : {
2601 : already_parsed_nodes.push_back(cur_node);
2602 : std::vector<module::Module*> parsed_switchers;
2603 : for (size_t i = 0; i < already_parsed_nodes.size(); i++)
2604 : {
2605 : // This check occurs before dud-nodes are removed by _init, some nodes have no
2606 : contents and must be
2607 : // accounted for
2608 : if (already_parsed_nodes[i]->get_c() == nullptr ||
2609 : !(already_parsed_nodes[i]->get_c()->type == subseq_t::COMMUTE ||
2610 : already_parsed_nodes[i]->get_c()->type == subseq_t::SELECT))
2611 : continue;
2612 :
2613 : // We search for the first switcher task in the path taken: already_parsed_nodes
2614 : const runtime::Task *ctrl_task_first = nullptr;
2615 : const runtime::Task *ctrl_task_second = nullptr;
2616 : for (auto t : already_parsed_nodes[i]->get_c()->tasks)
2617 : {
2618 : if (dynamic_cast<const module::Switcher*>(&t->get_module()) &&
2619 : (t->get_name() == "select" || t->get_name() == "commute"))
2620 : {
2621 : ctrl_task_first = t;
2622 : break;
2623 : }
2624 : }
2625 :
2626 : if (std::find(parsed_switchers.begin(), parsed_switchers.end(),
2627 : &(ctrl_task_first->get_module())) != parsed_switchers.end()) continue;
2628 :
2629 : // We now search for the second switcher task in the path taken
2630 : auto expected_type = ctrl_task_first->get_name() == "select" ? subseq_t::COMMUTE
2631 : : subseq_t::SELECT; for (size_t j = i; j < already_parsed_nodes.size() && ctrl_task_second == nullptr; j++)
2632 : {
2633 : if (already_parsed_nodes[j]->get_c() == nullptr ||
2634 : already_parsed_nodes[j]->get_c()->type != expected_type)
2635 : continue;
2636 : for (auto t : already_parsed_nodes[j]->get_c()->tasks)
2637 : {
2638 : if ((t->get_name() == "select" || t->get_name() == "commute") &&
2639 : &(ctrl_task_first->get_module()) == &(t->get_module()))
2640 : {
2641 : parsed_switchers.push_back(&(t->get_module()));
2642 : ctrl_task_second = t;
2643 : break;
2644 : }
2645 : }
2646 : }
2647 :
2648 : if (ctrl_task_second == nullptr)
2649 : {
2650 : for (auto t : ctrl_task_first->get_module().tasks)
2651 : {
2652 : if ((ctrl_task_first->get_name() == "select" && t->get_name() ==
2653 : "commute") || (ctrl_task_first->get_name() == "commute" && t->get_name() == "select"))
2654 : {
2655 : ctrl_task_second = t.get();
2656 : break;
2657 : }
2658 : }
2659 : std::stringstream message;
2660 : message << ctrl_task_first->get_name() << " is missing a path to "
2661 : << ctrl_task_second->get_name() << ".";
2662 : throw tools::control_flow_error(__FILE__, __LINE__, __func__,
2663 : message.str());
2664 : }
2665 : }
2666 : }
2667 : };
2668 :
2669 : std::vector<tools::Digraph_node<SS>*> already_parsed_nodes;
2670 : return check_control_flow_parity(root, already_parsed_nodes);
2671 : }
2672 : */
|