Line data Source code
#include <algorithm>
#include <cstring>
#include <exception>
#include <fstream>
#include <iterator>
#include <mutex>
#include <numeric>
#include <set>
#include <sstream>
#include <thread>
#include <utility>
10 :
11 : #include "Module/Module.hpp"
12 : #include "Module/Stateful/Adaptor/Adaptor_m_to_n.hpp"
13 : #include "Module/Stateful/Probe/Probe.hpp"
14 : #include "Module/Stateful/Switcher/Switcher.hpp"
15 : #include "Runtime/Sequence/Sequence.hpp"
16 : #include "Runtime/Socket/Socket.hpp"
17 : #include "Runtime/Task/Task.hpp"
18 : #include "Tools/Buffer_allocator/Buffer_allocator.hpp"
19 : #include "Tools/Display/rang_format/rang_format.h"
20 : #include "Tools/Exception/exception.hpp"
21 : #include "Tools/Signal_handler/Signal_handler.hpp"
22 : #include "Tools/Thread/Thread_pinning/Thread_pinning.hpp"
23 : #include "Tools/Thread/Thread_pinning/Thread_pinning_utils.hpp"
24 : #include "Tools/Thread/Thread_pool/Standard/Thread_pool_standard.hpp"
25 :
26 : using namespace spu;
27 : using namespace spu::runtime;
28 :
29 0 : Sequence::Sequence(const std::vector<const runtime::Task*>& firsts,
30 : const std::vector<const runtime::Task*>& lasts,
31 : const std::vector<const runtime::Task*>& exclusions,
32 : const size_t n_threads,
33 : const bool thread_pinning,
34 : const std::vector<size_t>& puids,
35 0 : const bool memory_allocation)
36 0 : : n_threads(n_threads)
37 0 : , sequences(n_threads, nullptr)
38 0 : , modules(n_threads)
39 0 : , all_modules(n_threads)
40 0 : , mtx_exception(new std::mutex())
41 0 : , force_exit_loop(new std::atomic<bool>(false))
42 0 : , tasks_inplace(false)
43 0 : , thread_pinning(thread_pinning)
44 0 : , puids(puids)
45 0 : , no_copy_mode(true)
46 0 : , saved_exclusions(exclusions)
47 0 : , switchers_reset(n_threads)
48 0 : , auto_stop(true)
49 0 : , is_part_of_pipeline(false)
50 0 : , next_round_is_over(n_threads, false)
51 0 : , cur_task_id(n_threads, 0)
52 0 : , cur_ss(n_threads, nullptr)
53 0 : , memory_allocation(memory_allocation)
54 : {
55 : #ifndef SPU_HWLOC
56 : if (thread_pinning)
57 : std::clog << rang::tag::warning
58 : << "StreamPU has not been linked with the 'hwloc' library, the 'thread_pinning' "
59 : "option of the 'runtime::Sequence' will have no effect."
60 : << std::endl;
61 : #endif
62 :
63 0 : if (thread_pinning && puids.size() < n_threads)
64 : {
65 0 : std::stringstream message;
66 0 : message << "'puids.size()' has to be greater or equal to 'n_threads' ('puids.size()' = " << puids.size()
67 0 : << " , 'n_threads' = " << n_threads << ").";
68 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
69 0 : }
70 :
71 0 : this->init<runtime::Sub_sequence_const, const runtime::Task>(firsts, lasts, exclusions);
72 0 : }
73 :
74 0 : Sequence::Sequence(const std::vector<const runtime::Task*>& firsts,
75 : const std::vector<const runtime::Task*>& lasts,
76 : const size_t n_threads,
77 : const bool thread_pinning,
78 : const std::vector<size_t>& puids,
79 0 : const bool memory_allocation)
80 0 : : Sequence(firsts, lasts, {}, n_threads, thread_pinning, puids, memory_allocation)
81 : {
82 0 : }
83 :
84 0 : Sequence::Sequence(const std::vector<const runtime::Task*>& firsts,
85 : const size_t n_threads,
86 : const bool thread_pinning,
87 : const std::vector<size_t>& puids,
88 0 : const bool memory_allocation)
89 0 : : Sequence(firsts, {}, {}, n_threads, thread_pinning, puids, memory_allocation)
90 : {
91 0 : }
92 :
93 0 : Sequence::Sequence(const runtime::Task& first,
94 : const runtime::Task& last,
95 : const size_t n_threads,
96 : const bool thread_pinning,
97 : const std::vector<size_t>& puids,
98 0 : const bool memory_allocation)
99 0 : : Sequence({ &first }, { &last }, n_threads, thread_pinning, puids, memory_allocation)
100 : {
101 0 : }
102 :
103 0 : Sequence::Sequence(const runtime::Task& first,
104 : const size_t n_threads,
105 : const bool thread_pinning,
106 : const std::vector<size_t>& puids,
107 0 : const bool memory_allocation)
108 0 : : Sequence({ &first }, n_threads, thread_pinning, puids, memory_allocation)
109 : {
110 0 : }
111 :
112 : std::vector<const runtime::Task*>
113 591 : exclusions_convert_to_const(const std::vector<runtime::Task*>& exclusions)
114 : {
115 591 : std::vector<const runtime::Task*> exclusions_const;
116 637 : for (auto exception : exclusions)
117 46 : exclusions_const.push_back(exception);
118 591 : return exclusions_const;
119 0 : }
120 :
121 526 : Sequence::Sequence(const std::vector<runtime::Task*>& firsts,
122 : const std::vector<runtime::Task*>& lasts,
123 : const std::vector<runtime::Task*>& exclusions,
124 : const size_t n_threads,
125 : const bool thread_pinning,
126 : const std::vector<size_t>& puids,
127 : const bool tasks_inplace,
128 526 : const bool memory_allocation)
129 526 : : n_threads(n_threads)
130 526 : , sequences(n_threads, nullptr)
131 526 : , modules(tasks_inplace ? n_threads - 1 : n_threads)
132 526 : , all_modules(n_threads)
133 526 : , mtx_exception(new std::mutex())
134 526 : , force_exit_loop(new std::atomic<bool>(false))
135 526 : , tasks_inplace(tasks_inplace)
136 526 : , thread_pinning(thread_pinning)
137 526 : , puids(puids)
138 526 : , no_copy_mode(true)
139 526 : , saved_exclusions(exclusions_convert_to_const(exclusions))
140 526 : , switchers_reset(n_threads)
141 526 : , auto_stop(true)
142 526 : , is_part_of_pipeline(false)
143 526 : , next_round_is_over(n_threads, false)
144 526 : , cur_task_id(n_threads, 0)
145 526 : , cur_ss(n_threads, nullptr)
146 2630 : , memory_allocation(memory_allocation)
147 : {
148 526 : if (thread_pinning && puids.size() < n_threads)
149 : {
150 0 : std::stringstream message;
151 0 : message << "'puids.size()' has greater or equal to 'n_threads' ('puids.size()' = " << puids.size()
152 0 : << " , 'n_threads' = " << n_threads << ").";
153 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
154 0 : }
155 :
156 526 : if (tasks_inplace)
157 526 : this->init<runtime::Sub_sequence, runtime::Task>(firsts, lasts, exclusions);
158 : else
159 : {
160 0 : std::vector<const runtime::Task*> firsts_bis;
161 0 : for (auto first : firsts)
162 0 : firsts_bis.push_back(first);
163 0 : std::vector<const runtime::Task*> lasts_bis;
164 0 : for (auto last : lasts)
165 0 : lasts_bis.push_back(last);
166 0 : std::vector<const runtime::Task*> exclusions_bis;
167 0 : for (auto exception : exclusions)
168 0 : exclusions_bis.push_back(exception);
169 0 : this->init<runtime::Sub_sequence_const, const runtime::Task>(firsts_bis, lasts_bis, exclusions_bis);
170 0 : }
171 526 : }
172 :
173 106 : Sequence::Sequence(const std::vector<runtime::Task*>& firsts,
174 : const std::vector<runtime::Task*>& lasts,
175 : const size_t n_threads,
176 : const bool thread_pinning,
177 : const std::vector<size_t>& puids,
178 : const bool tasks_inplace,
179 106 : const bool memory_allocation)
180 106 : : Sequence(firsts, lasts, {}, n_threads, thread_pinning, puids, tasks_inplace, memory_allocation)
181 : {
182 106 : }
183 :
184 112 : Sequence::Sequence(const std::vector<runtime::Task*>& firsts,
185 : const size_t n_threads,
186 : const bool thread_pinning,
187 : const std::vector<size_t>& puids,
188 : const bool tasks_inplace,
189 112 : const bool memory_allocation)
190 112 : : Sequence(firsts, {}, {}, n_threads, thread_pinning, puids, tasks_inplace, memory_allocation)
191 : {
192 112 : }
193 :
194 2 : Sequence::Sequence(runtime::Task& first,
195 : runtime::Task& last,
196 : const size_t n_threads,
197 : const bool thread_pinning,
198 : const std::vector<size_t>& puids,
199 : const bool tasks_inplace,
200 2 : const bool memory_allocation)
201 2 : : Sequence({ &first }, { &last }, n_threads, thread_pinning, puids, tasks_inplace, memory_allocation)
202 : {
203 2 : }
204 :
205 112 : Sequence::Sequence(runtime::Task& first,
206 : const size_t n_threads,
207 : const bool thread_pinning,
208 : const std::vector<size_t>& puids,
209 : const bool tasks_inplace,
210 112 : const bool memory_allocation)
211 112 : : Sequence({ &first }, n_threads, thread_pinning, puids, tasks_inplace, memory_allocation)
212 : {
213 112 : }
214 : //=======================================New pinning version constructors===============================================
215 0 : Sequence::Sequence(const std::vector<const runtime::Task*>& firsts,
216 : const std::vector<const runtime::Task*>& lasts,
217 : const std::vector<const runtime::Task*>& exclusions,
218 : const size_t n_threads,
219 : const bool thread_pinning,
220 : const std::string& sequence_pinning_policy,
221 0 : const bool memory_allocation)
222 0 : : n_threads(n_threads)
223 0 : , sequences(n_threads, nullptr)
224 0 : , modules(n_threads)
225 0 : , all_modules(n_threads)
226 0 : , mtx_exception(new std::mutex())
227 0 : , force_exit_loop(new std::atomic<bool>(false))
228 0 : , tasks_inplace(false)
229 0 : , thread_pinning(thread_pinning)
230 0 : , puids({})
231 0 : , no_copy_mode(true)
232 0 : , saved_exclusions(exclusions)
233 0 : , switchers_reset(n_threads)
234 0 : , auto_stop(true)
235 0 : , is_part_of_pipeline(false)
236 0 : , next_round_is_over(n_threads, false)
237 0 : , cur_task_id(n_threads, 0)
238 0 : , cur_ss(n_threads, nullptr)
239 0 : , memory_allocation(memory_allocation)
240 : {
241 : #ifndef SPU_HWLOC
242 : if (thread_pinning)
243 : std::clog << rang::tag::warning
244 : << "StreamPU has not been linked with the 'hwloc' library, the 'thread_pinning' "
245 : "option of the 'runtime::Sequence' will have no effect."
246 : << std::endl;
247 : #endif
248 :
249 0 : if (thread_pinning && !sequence_pinning_policy.empty())
250 : {
251 0 : pin_objects_per_thread = tools::Thread_pinning_utils::stage_parser_unpacker(sequence_pinning_policy, n_threads);
252 : }
253 0 : else if (thread_pinning && sequence_pinning_policy.empty())
254 : {
255 0 : std::stringstream message;
256 0 : message << "Pinning is activated but there is no specified policy." << std::endl;
257 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
258 0 : }
259 :
260 0 : this->init<runtime::Sub_sequence_const, const runtime::Task>(firsts, lasts, exclusions);
261 0 : }
262 :
263 0 : Sequence::Sequence(const std::vector<const runtime::Task*>& firsts,
264 : const std::vector<const runtime::Task*>& lasts,
265 : const size_t n_threads,
266 : const bool thread_pinning,
267 : const std::string& sequence_pinning_policy,
268 0 : const bool memory_allocation)
269 0 : : Sequence(firsts, lasts, {}, n_threads, thread_pinning, sequence_pinning_policy, memory_allocation)
270 : {
271 0 : }
272 :
273 0 : Sequence::Sequence(const std::vector<const runtime::Task*>& firsts,
274 : const size_t n_threads,
275 : const bool thread_pinning,
276 : const std::string& sequence_pinning_policy,
277 0 : const bool memory_allocation)
278 0 : : Sequence(firsts, {}, {}, n_threads, thread_pinning, sequence_pinning_policy, memory_allocation)
279 : {
280 0 : }
281 :
282 0 : Sequence::Sequence(const runtime::Task& first,
283 : const runtime::Task& last,
284 : const size_t n_threads,
285 : const bool thread_pinning,
286 : const std::string& sequence_pinning_policy,
287 0 : const bool memory_allocation)
288 0 : : Sequence({ &first }, { &last }, n_threads, thread_pinning, sequence_pinning_policy, memory_allocation)
289 : {
290 0 : }
291 :
292 0 : Sequence::Sequence(const runtime::Task& first,
293 : const size_t n_threads,
294 : const bool thread_pinning,
295 : const std::string& sequence_pinning_policy,
296 0 : const bool memory_allocation)
297 0 : : Sequence({ &first }, n_threads, thread_pinning, sequence_pinning_policy, memory_allocation)
298 : {
299 0 : }
300 :
301 65 : Sequence::Sequence(const std::vector<runtime::Task*>& firsts,
302 : const std::vector<runtime::Task*>& lasts,
303 : const std::vector<runtime::Task*>& exclusions,
304 : const size_t n_threads,
305 : const bool thread_pinning,
306 : const std::string& sequence_pinning_policy,
307 : const bool tasks_inplace,
308 65 : const bool memory_allocation)
309 65 : : n_threads(n_threads)
310 65 : , sequences(n_threads, nullptr)
311 65 : , modules(tasks_inplace ? n_threads - 1 : n_threads)
312 65 : , all_modules(n_threads)
313 65 : , mtx_exception(new std::mutex())
314 65 : , force_exit_loop(new std::atomic<bool>(false))
315 65 : , tasks_inplace(tasks_inplace)
316 65 : , thread_pinning(thread_pinning)
317 65 : , puids({})
318 65 : , no_copy_mode(true)
319 65 : , saved_exclusions(exclusions_convert_to_const(exclusions))
320 65 : , switchers_reset(n_threads)
321 65 : , auto_stop(true)
322 65 : , is_part_of_pipeline(false)
323 65 : , next_round_is_over(n_threads, false)
324 65 : , cur_task_id(n_threads, 0)
325 65 : , cur_ss(n_threads, nullptr)
326 325 : , memory_allocation(memory_allocation)
327 : {
328 65 : if (thread_pinning && !sequence_pinning_policy.empty())
329 : {
330 29 : pin_objects_per_thread = tools::Thread_pinning_utils::stage_parser_unpacker(sequence_pinning_policy, n_threads);
331 : }
332 36 : else if (thread_pinning && sequence_pinning_policy.empty())
333 : {
334 0 : std::stringstream message;
335 0 : message << "Pinning is activated but there is no specified policy." << std::endl;
336 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
337 0 : }
338 :
339 65 : if (tasks_inplace)
340 65 : this->init<runtime::Sub_sequence, runtime::Task>(firsts, lasts, exclusions);
341 : else
342 : {
343 0 : std::vector<const runtime::Task*> firsts_bis;
344 0 : for (auto first : firsts)
345 0 : firsts_bis.push_back(first);
346 0 : std::vector<const runtime::Task*> lasts_bis;
347 :
348 0 : for (auto last : lasts)
349 0 : lasts_bis.push_back(last);
350 0 : std::vector<const runtime::Task*> exclusions_bis;
351 :
352 0 : for (auto exception : exclusions)
353 0 : exclusions_bis.push_back(exception);
354 0 : this->init<runtime::Sub_sequence_const, const runtime::Task>(firsts_bis, lasts_bis, exclusions_bis);
355 0 : }
356 65 : }
357 :
358 0 : Sequence::Sequence(const std::vector<runtime::Task*>& firsts,
359 : const std::vector<runtime::Task*>& lasts,
360 : const size_t n_threads,
361 : const bool thread_pinning,
362 : const std::string& sequence_pinning_policy,
363 : const bool tasks_inplace,
364 0 : const bool memory_allocation)
365 0 : : Sequence(firsts, lasts, {}, n_threads, thread_pinning, sequence_pinning_policy, tasks_inplace, memory_allocation)
366 : {
367 0 : }
368 :
369 0 : Sequence::Sequence(const std::vector<runtime::Task*>& firsts,
370 : const size_t n_threads,
371 : const bool thread_pinning,
372 : const std::string& sequence_pinning_policy,
373 : const bool tasks_inplace,
374 0 : const bool memory_allocation)
375 0 : : Sequence(firsts, {}, {}, n_threads, thread_pinning, sequence_pinning_policy, tasks_inplace, memory_allocation)
376 : {
377 0 : }
378 :
379 0 : Sequence::Sequence(runtime::Task& first,
380 : runtime::Task& last,
381 : const size_t n_threads,
382 : const bool thread_pinning,
383 : const std::string& sequence_pinning_policy,
384 : const bool tasks_inplace,
385 0 : const bool memory_allocation)
386 : : Sequence({ &first },
387 : { &last },
388 : n_threads,
389 : thread_pinning,
390 : sequence_pinning_policy,
391 : tasks_inplace,
392 0 : memory_allocation)
393 : {
394 0 : }
395 :
396 0 : Sequence::Sequence(runtime::Task& first,
397 : const size_t n_threads,
398 : const bool thread_pinning,
399 : const std::string& sequence_pinning_policy,
400 : const bool tasks_inplace,
401 0 : const bool memory_allocation)
402 0 : : Sequence({ &first }, n_threads, thread_pinning, sequence_pinning_policy, tasks_inplace, memory_allocation)
403 : {
404 0 : }
405 :
406 : // ====================================================================================================================
407 :
408 1900 : Sequence::~Sequence()
409 : {
410 : // The following line is commented because it leads to double free correction in AFF3CT because the tasks can be
411 : // freed before the sequence :-(
412 : // if (this->memory_allocation) tools::Buffer_allocator::deallocate_sequence_memory(this);
413 :
414 687 : std::vector<tools::Digraph_node<Sub_sequence>*> already_deleted_nodes;
415 6784 : for (auto s : this->sequences)
416 6097 : this->delete_tree(s, already_deleted_nodes);
417 1213 : }
418 :
419 : template<class SS, class TA>
420 : void
421 687 : Sequence::init(const std::vector<TA*>& firsts, const std::vector<TA*>& lasts, const std::vector<TA*>& exclusions)
422 : {
423 687 : if (this->is_thread_pinning())
424 : {
425 29 : if (!this->puids.empty())
426 0 : tools::Thread_pinning::pin(this->puids[0]);
427 : else
428 29 : tools::Thread_pinning::pin(this->pin_objects_per_thread[0]);
429 : }
430 :
431 687 : if (firsts.size() == 0)
432 : {
433 0 : std::stringstream message;
434 0 : message << "'firsts.size()' has to be strictly greater than 0.";
435 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
436 0 : }
437 :
438 687 : if (this->n_threads == 0)
439 : {
440 0 : std::stringstream message;
441 0 : message << "'n_threads' has to be strictly greater than 0.";
442 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
443 0 : }
444 :
445 733 : for (auto exclusion : exclusions)
446 : {
447 46 : if (std::find(firsts.begin(), firsts.end(), exclusion) != firsts.end())
448 : {
449 0 : std::stringstream message;
450 : message << "'exclusion' can't be contained in the 'firsts' vector ("
451 : << "'exclusion'"
452 0 : << " = " << +exclusion << ", "
453 : << "'exclusion->get_name()'"
454 0 : << " = " << exclusion->get_name() << ").";
455 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
456 0 : }
457 :
458 46 : if (std::find(lasts.begin(), lasts.end(), exclusion) != lasts.end())
459 : {
460 0 : std::stringstream message;
461 : message << "'exclusion' can't be contained in the 'lasts' vector ("
462 : << "'exclusion'"
463 0 : << " = " << +exclusion << ", "
464 : << "'exclusion->get_name()'"
465 0 : << " = " << exclusion->get_name() << ").";
466 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
467 0 : }
468 : }
469 1205 : for (auto t : lasts)
470 : {
471 518 : if (dynamic_cast<const module::Switcher*>(&t->get_module()) && t->get_name() == "commute")
472 : {
473 0 : std::stringstream message;
474 0 : message << "A sequence cannot end with a 'commute' task.";
475 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
476 0 : }
477 : }
478 :
479 687 : auto root = new tools::Digraph_node<SS>({}, {}, nullptr, 0);
480 687 : root->set_contents(nullptr);
481 687 : size_t ssid = 0, taid = 0;
482 687 : std::vector<TA*> switchers;
483 687 : std::vector<std::pair<TA*, tools::Digraph_node<SS>*>> selectors;
484 687 : std::vector<TA*> real_lasts;
485 :
486 687 : this->lasts_tasks_id.clear();
487 687 : this->firsts_tasks_id.clear();
488 687 : auto last_subseq = root;
489 687 : std::map<TA*, unsigned> in_sockets_feed;
490 1540 : for (auto first : firsts)
491 : {
492 853 : std::map<TA*, std::pair<tools::Digraph_node<SS>*, size_t>> task_subseq;
493 853 : auto contents = last_subseq->get_contents();
494 853 : this->firsts_tasks_id.push_back(contents ? contents->tasks_id[contents->tasks_id.size() - 1] : 0);
495 853 : last_subseq = this->init_recursive<SS, TA>(last_subseq,
496 : ssid,
497 : taid,
498 : selectors,
499 : switchers,
500 : *first,
501 : *first,
502 : lasts,
503 : exclusions,
504 853 : this->lasts_tasks_id,
505 : real_lasts,
506 : in_sockets_feed,
507 : task_subseq);
508 : }
509 :
510 687 : std::stringstream real_lasts_ss;
511 1769 : for (size_t rl = 0; rl < real_lasts.size(); rl++)
512 : real_lasts_ss << "'real_lasts"
513 1082 : << "[" << rl << "]'"
514 1082 : << " = " << +real_lasts[rl] << ", "
515 : << "'real_lasts"
516 1082 : << "[" << rl << "]->get_name()'"
517 1082 : << " = " << real_lasts[rl]->get_name() << ((rl < real_lasts.size() - 1) ? ", " : "");
518 :
519 1205 : for (auto last : lasts)
520 : {
521 518 : if (std::find(real_lasts.begin(), real_lasts.end(), last) == real_lasts.end())
522 : {
523 0 : std::stringstream message;
524 0 : message << "'last' is not contained in the 'real_lasts[" << real_lasts.size() << "]' vector ("
525 : << "'last'"
526 0 : << " = " << +last << ", "
527 : << "'last->get_name()'"
528 0 : << " = " << last->get_name() << ", " << real_lasts_ss.str() << ").";
529 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
530 0 : }
531 : }
532 :
533 687 : this->n_tasks = taid;
534 : // this->check_ctrl_flow(root); // /!\ this check has been commented because it is known to do not work in the
535 : // general case
536 687 : this->_init<SS>(root);
537 687 : this->update_firsts_and_lasts_tasks();
538 687 : this->gen_processes();
539 687 : this->donners = get_modules<tools::Interface_is_done>(true);
540 :
541 6784 : for (size_t tid = 0; tid < this->n_threads; tid++)
542 47463 : for (auto& mdl : this->all_modules[tid])
543 41366 : if (auto swi = dynamic_cast<module::Switcher*>(mdl))
544 2323 : this->switchers_reset[tid].push_back(dynamic_cast<tools::Interface_reset*>(swi));
545 :
546 6784 : for (size_t tid = 0; tid < this->sequences.size(); tid++)
547 : {
548 6097 : this->cur_ss[tid] = this->sequences[tid];
549 : }
550 :
551 : // Allocating Memory if requested
552 687 : if (this->memory_allocation)
553 : {
554 314 : tools::Buffer_allocator::allocate_sequence_memory(this);
555 : }
556 :
557 687 : this->thread_pool.reset(new tools::Thread_pool_standard(this->n_threads - 1));
558 687 : this->thread_pool->init(); // threads are spawned here
559 687 : }
560 :
561 : Sequence*
562 96 : Sequence::clone() const
563 : {
564 96 : auto c = new Sequence(*this);
565 :
566 96 : c->tasks_inplace = false;
567 96 : c->modules.resize(c->get_n_threads());
568 :
569 96 : std::vector<const runtime::Task*> firsts_tasks;
570 192 : for (auto ta : this->get_firsts_tasks()[0])
571 96 : firsts_tasks.push_back(ta);
572 :
573 96 : std::vector<const runtime::Task*> lasts_tasks;
574 192 : for (auto ta : this->get_lasts_tasks()[0])
575 96 : lasts_tasks.push_back(ta);
576 :
577 96 : c->init<runtime::Sub_sequence_const, const runtime::Task>(firsts_tasks, lasts_tasks, this->saved_exclusions);
578 96 : c->mtx_exception.reset(new std::mutex());
579 96 : c->force_exit_loop.reset(new std::atomic<bool>(false));
580 96 : return c;
581 96 : }
582 :
583 : void
584 0 : Sequence::set_thread_pinning(const bool thread_pinning, const std::vector<size_t>& puids)
585 : {
586 0 : if (thread_pinning && puids.size() < n_threads)
587 : {
588 0 : std::stringstream message;
589 0 : message << "'puids.size()' has greater or equal to 'n_threads' ('puids.size()' = " << puids.size()
590 0 : << " , 'n_threads' = " << n_threads << ").";
591 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
592 0 : }
593 :
594 0 : this->thread_pinning = thread_pinning;
595 0 : this->puids = puids;
596 0 : this->pin_objects_per_thread = {};
597 0 : }
598 :
599 : void
600 0 : Sequence::set_thread_pinning(const bool thread_pinning, const std::string& sequence_pinning_policy)
601 : {
602 0 : this->thread_pinning = thread_pinning;
603 0 : this->puids = {};
604 : this->pin_objects_per_thread =
605 0 : tools::Thread_pinning_utils::stage_parser_unpacker(sequence_pinning_policy, n_threads);
606 0 : }
607 :
608 : bool
609 63762 : Sequence::is_thread_pinning()
610 : {
611 63762 : return this->thread_pinning;
612 : }
613 :
614 : std::vector<std::vector<module::Module*>>
615 0 : Sequence::get_modules_per_threads() const
616 : {
617 0 : std::vector<std::vector<module::Module*>> modules_per_threads(this->all_modules.size());
618 0 : size_t tid = 0;
619 0 : for (auto& e : this->all_modules)
620 : {
621 0 : for (auto& ee : e)
622 0 : modules_per_threads[tid].push_back(ee);
623 0 : tid++;
624 : }
625 0 : return modules_per_threads;
626 0 : }
627 :
628 : std::vector<std::vector<module::Module*>>
629 2 : Sequence::get_modules_per_types() const
630 : {
631 2 : std::vector<std::vector<module::Module*>> modules_per_types(this->all_modules[0].size());
632 4 : for (auto& e : this->all_modules)
633 : {
634 2 : size_t mid = 0;
635 33 : for (auto& ee : e)
636 31 : modules_per_types[mid++].push_back(ee);
637 : }
638 2 : return modules_per_types;
639 0 : }
640 :
641 : std::vector<std::vector<runtime::Task*>>
642 5819 : Sequence::get_tasks_per_threads() const
643 : {
644 5819 : std::vector<std::vector<runtime::Task*>> tasks_per_threads(this->n_threads);
645 :
646 : std::function<void(
647 : tools::Digraph_node<Sub_sequence>*, const size_t, std::vector<tools::Digraph_node<Sub_sequence>*>&)>
648 200600 : get_tasks_recursive = [&](tools::Digraph_node<Sub_sequence>* cur_ss,
649 : const size_t tid,
650 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
651 : {
652 200600 : if (std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_ss) == already_parsed_nodes.end())
653 : {
654 191646 : already_parsed_nodes.push_back(cur_ss);
655 383291 : tasks_per_threads[tid].insert(
656 191645 : tasks_per_threads[tid].end(), cur_ss->get_c()->tasks.begin(), cur_ss->get_c()->tasks.end());
657 :
658 233934 : for (auto c : cur_ss->get_children())
659 42288 : get_tasks_recursive(c, tid, already_parsed_nodes);
660 : }
661 206428 : };
662 :
663 164148 : for (size_t tid = 0; tid < this->n_threads; tid++)
664 : {
665 158324 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
666 158325 : get_tasks_recursive(this->sequences[tid], tid, already_parsed_nodes);
667 158319 : }
668 :
669 11648 : return tasks_per_threads;
670 5824 : }
671 :
672 : std::vector<std::vector<runtime::Task*>>
673 29 : Sequence::get_tasks_per_types() const
674 : {
675 29 : std::vector<std::vector<runtime::Task*>> tasks_per_types(this->n_tasks);
676 :
677 : std::function<void(tools::Digraph_node<Sub_sequence>*, size_t&, std::vector<tools::Digraph_node<Sub_sequence>*>&)>
678 187 : get_tasks_recursive = [&](tools::Digraph_node<Sub_sequence>* cur_ss,
679 : size_t& mid,
680 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
681 : {
682 187 : if (std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_ss) == already_parsed_nodes.end())
683 : {
684 187 : already_parsed_nodes.push_back(cur_ss);
685 1615 : for (auto& t : cur_ss->get_c()->tasks)
686 1428 : tasks_per_types[mid++].push_back(t);
687 :
688 187 : for (auto c : cur_ss->get_children())
689 0 : get_tasks_recursive(c, mid, already_parsed_nodes);
690 : }
691 216 : };
692 :
693 216 : for (size_t tid = 0; tid < this->n_threads; tid++)
694 : {
695 187 : size_t mid = 0;
696 187 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
697 187 : get_tasks_recursive(this->sequences[tid], mid, already_parsed_nodes);
698 187 : }
699 :
700 58 : return tasks_per_types;
701 29 : }
702 :
703 : bool
704 923211 : Sequence::is_done() const
705 : {
706 934900 : for (auto donner : this->donners)
707 11750 : if (donner->is_done()) return true;
708 921744 : return false;
709 : }
710 :
711 : void
712 373 : Sequence::allocate_outbuffers()
713 : {
714 373 : if (!this->memory_allocation)
715 : {
716 373 : tools::Buffer_allocator::allocate_sequence_memory(this);
717 373 : this->memory_allocation = true;
718 : }
719 : else
720 : {
721 0 : std::stringstream message;
722 0 : message << "The memory is already allocated for this sequence";
723 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
724 0 : }
725 373 : }
726 :
727 : void
728 373 : Sequence::deallocate_outbuffers()
729 : {
730 373 : if (this->memory_allocation)
731 : {
732 373 : tools::Buffer_allocator::deallocate_sequence_memory(this);
733 373 : this->memory_allocation = false;
734 : }
735 : else
736 : {
737 0 : std::stringstream message;
738 0 : message << "Sequence memory is not allocated";
739 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
740 0 : }
741 373 : }
742 :
743 : void
744 0 : Sequence::_exec(const size_t tid,
745 : std::function<bool(const std::vector<const int*>&)>& stop_condition,
746 : tools::Digraph_node<Sub_sequence>* sequence)
747 : {
748 0 : tools::Signal_handler::reset_sigint();
749 :
750 0 : if (this->is_thread_pinning())
751 : {
752 0 : if (!puids.empty())
753 0 : tools::Thread_pinning::pin(this->puids[tid]);
754 : else
755 0 : tools::Thread_pinning::pin(this->pin_objects_per_thread[tid]);
756 : }
757 :
758 : std::function<void(tools::Digraph_node<Sub_sequence>*, std::vector<const int*>&)> exec_sequence =
759 0 : [&exec_sequence](tools::Digraph_node<Sub_sequence>* cur_ss, std::vector<const int*>& statuses)
760 : {
761 0 : auto type = cur_ss->get_c()->type;
762 0 : auto& tasks_id = cur_ss->get_c()->tasks_id;
763 0 : auto& processes = cur_ss->get_c()->processes;
764 :
765 0 : if (type == subseq_t::COMMUTE)
766 : {
767 0 : statuses[tasks_id[0]] = processes[0]();
768 0 : const int path = statuses[tasks_id[0]][0];
769 0 : if (cur_ss->get_children().size() > (size_t)path) exec_sequence(cur_ss->get_children()[path], statuses);
770 : }
771 : else
772 : {
773 0 : for (size_t p = 0; p < processes.size(); p++)
774 0 : statuses[tasks_id[p]] = processes[p]();
775 0 : for (auto c : cur_ss->get_children())
776 0 : exec_sequence(c, statuses);
777 : }
778 0 : };
779 :
780 0 : std::vector<const int*> statuses(this->n_tasks, nullptr);
781 : try
782 : {
783 : do
784 : {
785 : // force switchers reset to reinitialize the path to the last input socket
786 0 : for (size_t s = 0; s < this->switchers_reset[tid].size(); s++)
787 0 : this->switchers_reset[tid][s]->reset();
788 :
789 0 : std::fill(statuses.begin(), statuses.end(), nullptr);
790 : try
791 : {
792 0 : exec_sequence(sequence, statuses);
793 : }
794 0 : catch (tools::processing_aborted const&)
795 : {
796 : // do nothing, this is normal
797 0 : }
798 0 : } while (!*force_exit_loop && !stop_condition(statuses) && !tools::Signal_handler::is_sigint());
799 : }
800 0 : catch (tools::waiting_canceled const&)
801 : {
802 : // do nothing, this is normal
803 0 : }
804 0 : catch (std::exception const& e)
805 : {
806 0 : *force_exit_loop = true;
807 :
808 0 : this->mtx_exception->lock();
809 :
810 0 : auto save = tools::exception::no_stacktrace;
811 0 : tools::exception::no_stacktrace = true;
812 0 : std::string msg = e.what(); // get only the function signature
813 0 : tools::exception::no_stacktrace = save;
814 :
815 0 : if (std::find(this->prev_exception_messages.begin(), this->prev_exception_messages.end(), msg) ==
816 0 : this->prev_exception_messages.end())
817 : {
818 0 : this->prev_exception_messages.push_back(msg); // save only the function signature
819 0 : this->prev_exception_messages_to_display.push_back(e.what()); // with stacktrace if debug mode
820 : }
821 0 : else if (std::strlen(e.what()) > this->prev_exception_messages_to_display.back().size())
822 0 : this->prev_exception_messages_to_display[prev_exception_messages_to_display.size() - 1] = e.what();
823 :
824 0 : this->mtx_exception->unlock();
825 0 : }
826 :
827 0 : if (this->is_thread_pinning()) tools::Thread_pinning::unpin();
828 0 : }
829 :
830 : void
831 3940 : Sequence::_exec_without_statuses(const size_t tid,
832 : std::function<bool()>& stop_condition,
833 : tools::Digraph_node<Sub_sequence>* sequence)
834 : {
835 3940 : tools::Signal_handler::reset_sigint();
836 :
837 3933 : if (this->is_thread_pinning())
838 : {
839 44 : if (!puids.empty())
840 0 : tools::Thread_pinning::pin(this->puids[tid]);
841 : else
842 43 : tools::Thread_pinning::pin(this->pin_objects_per_thread[tid]);
843 : }
844 :
845 : std::function<void(tools::Digraph_node<Sub_sequence>*)> exec_sequence =
846 1359599 : [&exec_sequence](tools::Digraph_node<Sub_sequence>* cur_ss)
847 : {
848 1138789 : auto type = cur_ss->get_c()->type;
849 1139423 : auto& processes = cur_ss->get_c()->processes;
850 :
851 1139834 : if (type == subseq_t::COMMUTE)
852 : {
853 61838 : const int path = processes[0]()[0];
854 61888 : if (cur_ss->get_children().size() > (size_t)path) exec_sequence(cur_ss->get_children()[path]);
855 : }
856 : else
857 : {
858 5039372 : for (auto& process : processes)
859 3888317 : process();
860 1067903 : for (auto c : cur_ss->get_children())
861 158913 : exec_sequence(c);
862 : }
863 1145104 : };
864 :
865 : try
866 : {
867 : do
868 : {
869 : // force switchers reset to reinitialize the path to the last input socket
870 941073 : for (size_t s = 0; s < this->switchers_reset[tid].size(); s++)
871 15156 : this->switchers_reset[tid][s]->reset();
872 :
873 : try
874 : {
875 925938 : exec_sequence(sequence);
876 : }
877 3030 : catch (tools::processing_aborted const&)
878 : {
879 : // do nothing, this is normal
880 5 : }
881 931334 : } while (!*force_exit_loop && !stop_condition() && !tools::Signal_handler::is_sigint());
882 : }
883 3005 : catch (tools::waiting_canceled const&)
884 : {
885 : // do nothing, this is normal
886 3023 : }
887 0 : catch (std::exception const& e)
888 : {
889 0 : *force_exit_loop = true;
890 :
891 0 : this->mtx_exception->lock();
892 :
893 0 : auto save = tools::exception::no_stacktrace;
894 0 : tools::exception::no_stacktrace = true;
895 0 : std::string msg = e.what(); // get only the function signature
896 0 : tools::exception::no_stacktrace = save;
897 :
898 0 : if (std::find(this->prev_exception_messages.begin(), this->prev_exception_messages.end(), msg) ==
899 0 : this->prev_exception_messages.end())
900 : {
901 0 : this->prev_exception_messages.push_back(msg); // save only the function signature
902 0 : this->prev_exception_messages_to_display.push_back(e.what()); // with stacktrace if debug mode
903 : }
904 0 : else if (std::strlen(e.what()) > this->prev_exception_messages_to_display.back().size())
905 0 : this->prev_exception_messages_to_display[prev_exception_messages_to_display.size() - 1] = e.what();
906 :
907 0 : this->mtx_exception->unlock();
908 0 : }
909 :
910 5193 : if (this->is_thread_pinning()) tools::Thread_pinning::unpin();
911 4997 : }
912 :
913 : void
914 0 : Sequence::exec(std::function<bool(const std::vector<const int*>&)> stop_condition)
915 : {
916 0 : if (this->is_no_copy_mode()) this->gen_processes(true);
917 :
918 0 : std::function<bool(const std::vector<const int*>&)> real_stop_condition;
919 0 : if (this->auto_stop)
920 0 : real_stop_condition = [this, stop_condition](const std::vector<const int*>& statuses)
921 : {
922 0 : bool res = stop_condition(statuses);
923 0 : return res || this->is_done();
924 0 : };
925 : else
926 0 : real_stop_condition = stop_condition;
927 :
928 0 : std::function<void(const size_t)> func_exec = [this, &real_stop_condition](const size_t tid)
929 0 : { this->Sequence::_exec(tid + 1, real_stop_condition, this->sequences[tid + 1]); };
930 :
931 0 : this->thread_pool->run(func_exec, true);
932 0 : this->_exec(0, real_stop_condition, this->sequences[0]);
933 0 : this->thread_pool->wait();
934 :
935 0 : this->thread_pool->unset_func_exec();
936 :
937 0 : if (this->is_no_copy_mode() && !this->is_part_of_pipeline)
938 : {
939 0 : this->reset_no_copy_mode();
940 0 : this->gen_processes(false);
941 : }
942 :
943 0 : if (!this->prev_exception_messages_to_display.empty())
944 : {
945 0 : *force_exit_loop = false;
946 0 : throw std::runtime_error(this->prev_exception_messages_to_display.back());
947 : }
948 0 : }
949 :
950 : void
951 434 : Sequence::exec(std::function<bool()> stop_condition)
952 : {
953 434 : if (this->is_no_copy_mode()) this->gen_processes(true);
954 :
955 460 : std::function<bool()> real_stop_condition;
956 460 : if (this->auto_stop)
957 1843197 : real_stop_condition = [this, stop_condition]()
958 : {
959 920216 : bool res = stop_condition();
960 924974 : return res || this->is_done();
961 449 : };
962 : else
963 11 : real_stop_condition = stop_condition;
964 :
965 6800 : std::function<void(const size_t)> func_exec = [this, &real_stop_condition](const size_t tid)
966 3857 : { this->Sequence::_exec_without_statuses(tid + 1, real_stop_condition, this->sequences[tid + 1]); };
967 :
968 460 : this->thread_pool->run(func_exec, true);
969 460 : this->_exec_without_statuses(0, real_stop_condition, this->sequences[0]);
970 448 : this->thread_pool->wait();
971 :
972 457 : this->thread_pool->unset_func_exec();
973 :
974 450 : if (this->is_no_copy_mode() && !this->is_part_of_pipeline)
975 : {
976 79 : this->reset_no_copy_mode();
977 79 : this->gen_processes(false);
978 : }
979 :
980 450 : if (!this->prev_exception_messages_to_display.empty())
981 : {
982 0 : *force_exit_loop = false;
983 0 : throw std::runtime_error(this->prev_exception_messages_to_display.back());
984 : }
985 451 : }
986 :
987 : void
988 218 : Sequence::exec()
989 : {
990 603571 : this->exec([]() { return false; });
991 269 : }
992 :
993 : void
994 118 : Sequence::exec_seq(const size_t tid, const int frame_id)
995 : {
996 118 : if (tid >= this->sequences.size())
997 : {
998 0 : std::stringstream message;
999 0 : message << "'tid' has to be smaller than 'sequences.size()' ('tid' = " << tid
1000 0 : << ", 'sequences.size()' = " << this->sequences.size() << ").";
1001 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
1002 0 : }
1003 :
1004 : std::function<void(tools::Digraph_node<Sub_sequence>*)> exec_sequence =
1005 849 : [&exec_sequence, frame_id](tools::Digraph_node<Sub_sequence>* cur_ss)
1006 : {
1007 118 : auto type = cur_ss->get_c()->type;
1008 118 : auto& tasks = cur_ss->get_c()->tasks;
1009 118 : if (type == subseq_t::COMMUTE)
1010 : {
1011 0 : const int path = tasks[0]->exec(frame_id)[0];
1012 0 : if (cur_ss->get_children().size() > (size_t)path) exec_sequence(cur_ss->get_children()[path]);
1013 : }
1014 : else
1015 : {
1016 855 : for (size_t ta = 0; ta < tasks.size(); ta++)
1017 731 : tasks[ta]->exec(frame_id);
1018 124 : for (auto c : cur_ss->get_children())
1019 0 : exec_sequence(c);
1020 : }
1021 243 : };
1022 :
1023 121 : exec_sequence(this->sequences[tid]);
1024 124 : }
1025 :
1026 : runtime::Task*
1027 122723 : Sequence::exec_step(const size_t tid, const int frame_id)
1028 : {
1029 122723 : if (tid >= this->sequences.size())
1030 : {
1031 0 : std::stringstream message;
1032 0 : message << "'tid' has to be smaller than 'sequences.size()' ('tid' = " << tid
1033 0 : << ", 'sequences.size()' = " << this->sequences.size() << ").";
1034 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
1035 0 : }
1036 :
1037 122723 : runtime::Task* executed_task = nullptr;
1038 122723 : if (this->next_round_is_over[tid])
1039 : {
1040 6109 : this->next_round_is_over[tid] = false;
1041 6109 : this->cur_ss[tid] = this->sequences[tid];
1042 6109 : this->cur_task_id[tid] = 0;
1043 : }
1044 : else
1045 : {
1046 116614 : executed_task = this->cur_ss[tid]->get_c()->tasks[cur_task_id[tid]];
1047 116614 : const std::vector<int>& ret = executed_task->exec(frame_id);
1048 :
1049 116612 : auto type = this->cur_ss[tid]->get_c()->type;
1050 116612 : if (type == subseq_t::COMMUTE)
1051 : {
1052 15814 : const size_t path = (size_t)ret[0];
1053 15814 : if (this->cur_ss[tid]->get_children().size() > path)
1054 : {
1055 15814 : this->cur_ss[tid] = this->cur_ss[tid]->get_children()[path];
1056 15814 : this->cur_task_id[tid] = 0;
1057 :
1058 15814 : if (this->cur_ss[tid]->get_c()->tasks.size() == 0)
1059 : {
1060 : // skip nodes without tasks if any
1061 0 : while (this->cur_ss[tid]->get_children().size() > 0)
1062 : {
1063 0 : this->cur_ss[tid] = this->cur_ss[tid]->get_children()[0];
1064 0 : this->cur_task_id[tid] = 0;
1065 0 : if (this->cur_ss[tid]->get_c() && this->cur_ss[tid]->get_c()->tasks.size() > 0) break;
1066 : }
1067 0 : if (this->cur_task_id[tid] >= this->cur_ss[tid]->get_c()->tasks.size())
1068 0 : this->next_round_is_over[tid] = true;
1069 : }
1070 : }
1071 : else
1072 : {
1073 0 : std::stringstream message;
1074 0 : message << "This should never happen ('path' = " << path
1075 0 : << ", 'cur_ss[tid]->get_children().size()' = " << this->cur_ss[tid]->get_children().size()
1076 0 : << ", 'tid' = " << tid << ").";
1077 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
1078 0 : }
1079 : }
1080 : else
1081 : {
1082 100798 : this->cur_task_id[tid]++;
1083 100798 : if (this->cur_task_id[tid] == (this->cur_ss[tid]->get_c()->tasks.size()))
1084 : {
1085 : // skip nodes without tasks if any
1086 47695 : while (this->cur_ss[tid]->get_children().size() > 0)
1087 : {
1088 41586 : this->cur_ss[tid] = this->cur_ss[tid]->get_children()[0];
1089 41586 : this->cur_task_id[tid] = 0;
1090 41586 : if (this->cur_ss[tid]->get_c() && this->cur_ss[tid]->get_c()->tasks.size() > 0) break;
1091 : }
1092 47695 : if (this->cur_task_id[tid] >= this->cur_ss[tid]->get_c()->tasks.size())
1093 6109 : this->next_round_is_over[tid] = true;
1094 : }
1095 : }
1096 : }
1097 :
1098 122721 : return executed_task;
1099 : }
1100 :
1101 : template<class SS, class TA>
1102 : tools::Digraph_node<SS>*
1103 3661 : Sequence::init_recursive(tools::Digraph_node<SS>* cur_subseq,
1104 : size_t& ssid,
1105 : size_t& taid,
1106 : std::vector<std::pair<TA*, tools::Digraph_node<SS>*>>& selectors,
1107 : std::vector<TA*>& switchers,
1108 : TA& first,
1109 : TA& current_task,
1110 : const std::vector<TA*>& lasts,
1111 : const std::vector<TA*>& exclusions,
1112 : std::vector<size_t>& real_lasts_id,
1113 : std::vector<TA*>& real_lasts,
1114 : std::map<TA*, unsigned>& in_sockets_feed,
1115 : std::map<TA*, std::pair<tools::Digraph_node<SS>*, size_t>>& task_subseq)
1116 : {
1117 :
1118 3661 : if (dynamic_cast<const module::Adaptor_m_to_n*>(¤t_task.get_module()) && !this->tasks_inplace)
1119 : {
1120 0 : std::stringstream message;
1121 0 : message << "'module::Adaptor_m_to_n' objects are not supported if 'tasks_inplace' is set to false.";
1122 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
1123 0 : }
1124 :
1125 3661 : auto it = std::find(real_lasts.begin(), real_lasts.end(), ¤t_task);
1126 3661 : if (it != real_lasts.end())
1127 : {
1128 0 : real_lasts.erase(it);
1129 0 : auto dist = std::distance(real_lasts.begin(), it);
1130 0 : real_lasts_id.erase(real_lasts_id.begin() + dist);
1131 : }
1132 :
1133 3661 : if (cur_subseq->get_contents() == nullptr)
1134 : {
1135 687 : cur_subseq->set_contents(new SS());
1136 687 : ssid++;
1137 : }
1138 :
1139 3661 : bool is_last = true;
1140 3661 : tools::Digraph_node<SS>* last_subseq = nullptr;
1141 :
1142 3661 : if (auto switcher = dynamic_cast<const module::Switcher*>(¤t_task.get_module()))
1143 : {
1144 168 : const auto current_task_name = current_task.get_name();
1145 252 : if (current_task_name == "commute" && // -------------------------------------------------------------- COMMUTE
1146 252 : std::find(switchers.begin(), switchers.end(), ¤t_task) == switchers.end())
1147 : {
1148 84 : switchers.push_back(¤t_task);
1149 84 : auto node_commute = new tools::Digraph_node<SS>({ cur_subseq }, {}, nullptr, cur_subseq->get_depth() + 1);
1150 :
1151 84 : node_commute->set_contents(new SS());
1152 84 : node_commute->get_c()->tasks.push_back(¤t_task);
1153 84 : node_commute->get_c()->tasks_id.push_back(taid++);
1154 84 : node_commute->get_c()->type = subseq_t::COMMUTE;
1155 84 : ssid++;
1156 :
1157 84 : cur_subseq->add_child(node_commute);
1158 :
1159 263 : for (size_t sdo = 0; sdo < switcher->get_n_data_sockets(); sdo++)
1160 : {
1161 358 : auto node_commute_son =
1162 179 : new tools::Digraph_node<SS>({ node_commute }, {}, nullptr, node_commute->get_depth() + 1);
1163 :
1164 179 : node_commute_son->set_contents(new SS());
1165 179 : ssid++;
1166 :
1167 179 : node_commute->add_child(node_commute_son);
1168 :
1169 179 : auto& bss = (*switcher)[module::swi::tsk::commute].sockets[sdo + 2]->get_bound_sockets();
1170 364 : for (auto bs : bss)
1171 : {
1172 185 : if (bs == nullptr) continue;
1173 185 : auto& t = bs->get_task();
1174 185 : if (std::find(exclusions.begin(), exclusions.end(), &t) == exclusions.end())
1175 : {
1176 185 : if (task_subseq.find(&t) == task_subseq.end() || task_subseq[&t].second < ssid)
1177 179 : task_subseq[&t] = { node_commute_son, ssid };
1178 :
1179 185 : in_sockets_feed.find(&t) != in_sockets_feed.end() ? in_sockets_feed[&t]++
1180 142 : : in_sockets_feed[&t] = 1;
1181 185 : bool t_is_select =
1182 185 : dynamic_cast<const module::Switcher*>(&(t.get_module())) && t.get_name() == "select";
1183 129 : if ((!t_is_select && in_sockets_feed[&t] >= (t.get_n_input_sockets() + t.get_n_fwd_sockets()) -
1184 370 : t.get_n_static_input_sockets()) ||
1185 56 : (t_is_select && t.is_last_input_socket(*bs)))
1186 : {
1187 148 : is_last = false;
1188 148 : last_subseq = Sequence::init_recursive<SS, TA>(task_subseq[&t].first,
1189 296 : task_subseq[&t].second,
1190 : taid,
1191 : selectors,
1192 : switchers,
1193 : first,
1194 : t,
1195 : lasts,
1196 : exclusions,
1197 : real_lasts_id,
1198 : real_lasts,
1199 : in_sockets_feed,
1200 : task_subseq);
1201 : }
1202 37 : else if (t_is_select)
1203 : {
1204 37 : tools::Digraph_node<SS>* node_selector = nullptr;
1205 43 : for (auto& sel : selectors)
1206 43 : if (sel.first == &t)
1207 : {
1208 37 : node_selector = sel.second;
1209 37 : break;
1210 : }
1211 :
1212 37 : if (!node_selector)
1213 : {
1214 0 : node_selector = new tools::Digraph_node<SS>(
1215 0 : { node_commute_son }, {}, nullptr, node_commute_son->get_depth() + 1);
1216 0 : selectors.push_back({ &t, node_selector });
1217 : }
1218 : else
1219 37 : node_selector->add_parent(node_commute_son);
1220 37 : node_commute_son->add_child(node_selector);
1221 : }
1222 : }
1223 : }
1224 : }
1225 : // exception for the status socket
1226 : auto& bss =
1227 84 : (*switcher)[module::swi::tsk::commute].sockets[switcher->get_n_data_sockets() + 2]->get_bound_sockets();
1228 84 : for (auto bs : bss)
1229 : {
1230 0 : if (bs == nullptr) continue;
1231 0 : auto& t = bs->get_task();
1232 0 : if (std::find(exclusions.begin(), exclusions.end(), &t) == exclusions.end())
1233 : {
1234 0 : if (task_subseq.find(&t) == task_subseq.end() || task_subseq[&t].second < ssid)
1235 0 : task_subseq[&t] = { node_commute, ssid };
1236 :
1237 0 : in_sockets_feed.find(&t) != in_sockets_feed.end() ? in_sockets_feed[&t]++ : in_sockets_feed[&t] = 1;
1238 0 : bool t_is_select =
1239 0 : dynamic_cast<const module::Switcher*>(&(t.get_module())) && t.get_name() == "select";
1240 0 : if ((!t_is_select && in_sockets_feed[&t] >= (t.get_n_input_sockets() + t.get_n_fwd_sockets()) -
1241 0 : t.get_n_static_input_sockets()) ||
1242 0 : (t_is_select && t.is_last_input_socket(*bs)))
1243 : {
1244 0 : is_last = false;
1245 0 : last_subseq = Sequence::init_recursive<SS, TA>(task_subseq[&t].first,
1246 0 : task_subseq[&t].second,
1247 : taid,
1248 : selectors,
1249 : switchers,
1250 : first,
1251 : t,
1252 : lasts,
1253 : exclusions,
1254 : real_lasts_id,
1255 : real_lasts,
1256 : in_sockets_feed,
1257 : task_subseq);
1258 : }
1259 : }
1260 : }
1261 : }
1262 84 : else if (current_task_name == "select") // ------------------------------------------------------------- SELECT
1263 : {
1264 84 : tools::Digraph_node<SS>* node_selector = nullptr;
1265 :
1266 109 : for (auto& sel : selectors)
1267 47 : if (sel.first == ¤t_task)
1268 : {
1269 22 : node_selector = sel.second;
1270 22 : break;
1271 : }
1272 :
1273 84 : if (!node_selector)
1274 : {
1275 62 : node_selector = new tools::Digraph_node<SS>({ cur_subseq }, {}, nullptr, cur_subseq->get_depth() + 1);
1276 62 : selectors.push_back({ ¤t_task, node_selector });
1277 : }
1278 :
1279 84 : node_selector->set_contents(new SS());
1280 84 : node_selector->get_c()->tasks.push_back(¤t_task);
1281 84 : node_selector->get_c()->tasks_id.push_back(taid++);
1282 84 : node_selector->get_c()->type = subseq_t::SELECT;
1283 84 : ssid++;
1284 :
1285 84 : cur_subseq->add_child(node_selector);
1286 :
1287 168 : auto node_selector_son =
1288 84 : new tools::Digraph_node<SS>({ node_selector }, {}, nullptr, node_selector->get_depth() + 1);
1289 :
1290 84 : node_selector_son->set_contents(new SS());
1291 84 : ssid++;
1292 :
1293 84 : node_selector->add_child(node_selector_son);
1294 :
1295 599 : for (auto& s : current_task.sockets)
1296 : {
1297 347 : if (!(s->get_type() == socket_t::SOUT)) continue;
1298 168 : auto bss = s->get_bound_sockets();
1299 308 : for (auto bs : bss)
1300 : {
1301 140 : if (bs == nullptr) continue;
1302 140 : auto& t = bs->get_task();
1303 140 : if (std::find(exclusions.begin(), exclusions.end(), &t) == exclusions.end())
1304 : {
1305 140 : if (task_subseq.find(&t) == task_subseq.end() || task_subseq[&t].second < ssid)
1306 140 : task_subseq[&t] = { node_selector_son, ssid };
1307 :
1308 140 : in_sockets_feed.find(&t) != in_sockets_feed.end() ? in_sockets_feed[&t]++
1309 140 : : in_sockets_feed[&t] = 1;
1310 140 : bool t_is_select =
1311 140 : dynamic_cast<const module::Switcher*>(&(t.get_module())) && t.get_name() == "select";
1312 :
1313 134 : if ((!t_is_select && in_sockets_feed[&t] >= (t.get_n_input_sockets() + t.get_n_fwd_sockets()) -
1314 280 : t.get_n_static_input_sockets()) ||
1315 6 : (t_is_select && t.is_last_input_socket(*bs)))
1316 : {
1317 96 : is_last = false;
1318 96 : last_subseq = Sequence::init_recursive<SS, TA>(task_subseq[&t].first,
1319 192 : task_subseq[&t].second,
1320 : taid,
1321 : selectors,
1322 : switchers,
1323 : first,
1324 : t,
1325 : lasts,
1326 : exclusions,
1327 : real_lasts_id,
1328 : real_lasts,
1329 : in_sockets_feed,
1330 : task_subseq);
1331 : }
1332 44 : else if (t_is_select)
1333 : {
1334 0 : tools::Digraph_node<SS>* node_selector = nullptr;
1335 0 : for (auto& sel : selectors)
1336 0 : if (sel.first == &t)
1337 : {
1338 0 : node_selector = sel.second;
1339 0 : break;
1340 : }
1341 :
1342 0 : if (!node_selector)
1343 : {
1344 0 : node_selector = new tools::Digraph_node<SS>(
1345 0 : { node_selector_son }, {}, nullptr, node_selector_son->get_depth() + 1);
1346 0 : selectors.push_back({ &t, node_selector });
1347 : }
1348 0 : node_selector->add_parent(node_selector_son);
1349 0 : node_selector_son->add_child(node_selector);
1350 : }
1351 : }
1352 : }
1353 : }
1354 : }
1355 168 : }
1356 : else // --------------------------------------------------------------------------------------------- STANDARD CASE
1357 : {
1358 3493 : cur_subseq->get_c()->tasks.push_back(¤t_task);
1359 3493 : cur_subseq->get_c()->tasks_id.push_back(taid++);
1360 :
1361 3493 : if (std::find(lasts.begin(), lasts.end(), ¤t_task) == lasts.end())
1362 : {
1363 10479 : for (auto& s : current_task.sockets)
1364 : {
1365 7504 : if (s->get_type() == socket_t::SOUT || s->get_type() == socket_t::SFWD)
1366 : {
1367 5479 : auto bss = s->get_bound_sockets();
1368 8486 : for (auto& bs : bss)
1369 : {
1370 3007 : if (bs == nullptr) continue;
1371 3007 : auto& t = bs->get_task();
1372 3007 : if (std::find(exclusions.begin(), exclusions.end(), &t) == exclusions.end())
1373 : {
1374 2979 : if (task_subseq.find(&t) == task_subseq.end() || task_subseq[&t].second < ssid)
1375 2679 : task_subseq[&t] = { cur_subseq, ssid };
1376 :
1377 2979 : in_sockets_feed.find(&t) != in_sockets_feed.end() ? in_sockets_feed[&t]++
1378 2621 : : in_sockets_feed[&t] = 1;
1379 2979 : bool t_is_select =
1380 2979 : dynamic_cast<const module::Switcher*>(&(t.get_module())) && t.get_name() == "select";
1381 8820 : if ((!t_is_select &&
1382 2862 : in_sockets_feed[&t] >= (t.get_n_input_sockets() + t.get_n_fwd_sockets()) -
1383 5958 : t.get_n_static_input_sockets()) ||
1384 117 : (t_is_select && t.is_last_input_socket(*bs)))
1385 : {
1386 2564 : is_last = false;
1387 2564 : last_subseq = Sequence::init_recursive<SS, TA>(task_subseq[&t].first,
1388 5128 : task_subseq[&t].second,
1389 : taid,
1390 : selectors,
1391 : switchers,
1392 : first,
1393 : t,
1394 : lasts,
1395 : exclusions,
1396 : real_lasts_id,
1397 : real_lasts,
1398 : in_sockets_feed,
1399 : task_subseq);
1400 : }
1401 415 : else if (t_is_select)
1402 : {
1403 58 : tools::Digraph_node<SS>* node_selector = nullptr;
1404 77 : for (auto& sel : selectors)
1405 55 : if (sel.first == &t)
1406 : {
1407 36 : node_selector = sel.second;
1408 36 : break;
1409 : }
1410 :
1411 58 : if (!node_selector)
1412 : {
1413 44 : node_selector = new tools::Digraph_node<SS>(
1414 22 : { cur_subseq }, {}, nullptr, cur_subseq->get_depth() + 1);
1415 22 : selectors.push_back({ &t, node_selector });
1416 : }
1417 58 : node_selector->add_parent(cur_subseq);
1418 58 : cur_subseq->add_child(node_selector);
1419 : }
1420 : }
1421 : }
1422 5479 : }
1423 2025 : else if (s->get_type() == socket_t::SIN && s->get_bound_sockets().size() > 1)
1424 : {
1425 0 : std::stringstream message;
1426 : message << "'s->get_bound_sockets().size()' has to be smaller or equal to 1 ("
1427 : << "'s->get_bound_sockets().size()'"
1428 0 : << " = " << s->get_bound_sockets().size() << ", "
1429 : << "'s->get_type()'"
1430 : << " = "
1431 : << "socket_t::SIN"
1432 : << ", "
1433 : << "'s->get_name()'"
1434 0 : << " = " << s->get_name() << ", "
1435 : << "'s->get_task().get_name()'"
1436 0 : << " = " << s->get_task().get_name() << ", "
1437 : << "'s->get_task().get_module().get_name()'"
1438 0 : << " = " << s->get_task().get_module().get_name() << ").";
1439 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
1440 0 : }
1441 : }
1442 : }
1443 : }
1444 :
1445 3661 : if (is_last && std::find(real_lasts.begin(), real_lasts.end(), ¤t_task) == real_lasts.end())
1446 : {
1447 1082 : real_lasts.push_back(¤t_task);
1448 1082 : real_lasts_id.push_back(cur_subseq->get_contents()->tasks_id.back());
1449 : }
1450 :
1451 3661 : if (last_subseq)
1452 2579 : return last_subseq;
1453 : else
1454 1082 : return cur_subseq;
1455 : }
1456 :
1457 : template<class SS, class MO>
1458 : void
1459 687 : Sequence::replicate(const tools::Digraph_node<SS>* sequence)
1460 : {
1461 687 : std::set<MO*> modules_set;
1462 687 : std::vector<const runtime::Task*> tsks_vec; // get a vector of tasks included in the tasks graph
1463 : std::function<void(const tools::Digraph_node<SS>*, std::vector<const tools::Digraph_node<SS>*>&)>
1464 687 : collect_modules_list;
1465 687 : collect_modules_list =
1466 1151 : [&](const tools::Digraph_node<SS>* node, std::vector<const tools::Digraph_node<SS>*>& already_parsed_nodes)
1467 : {
1468 2302 : if (node != nullptr &&
1469 2302 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), node) == already_parsed_nodes.end())
1470 : {
1471 1056 : already_parsed_nodes.push_back(node);
1472 1056 : tsks_vec.insert(tsks_vec.end(), node->get_c()->tasks.begin(), node->get_c()->tasks.end());
1473 1056 : if (node->get_c())
1474 4717 : for (auto ta : node->get_c()->tasks)
1475 3661 : modules_set.insert(&ta->get_module());
1476 1520 : for (auto c : node->get_children())
1477 464 : collect_modules_list(c, already_parsed_nodes);
1478 : }
1479 : };
1480 687 : std::vector<const tools::Digraph_node<SS>*> already_parsed_nodes;
1481 687 : collect_modules_list(sequence, already_parsed_nodes);
1482 :
1483 : // check if all the tasks of the sequence are replicable before to perform the modules clones
1484 687 : if (this->n_threads - (this->tasks_inplace ? 1 : 0))
1485 2036 : for (auto& t : tsks_vec)
1486 1748 : if (!t->is_replicable())
1487 : {
1488 0 : std::stringstream message;
1489 : message << "It is not possible to replicate this sequence because at least one of its tasks is not "
1490 : << "replicable (t->get_name() = '" << t->get_name() << "', t->get_module().get_name() = '"
1491 0 : << t->get_module().get_name() << "').";
1492 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
1493 0 : }
1494 :
1495 687 : std::vector<MO*> modules_vec;
1496 4251 : for (auto m : modules_set)
1497 3564 : modules_vec.push_back(m);
1498 :
1499 : // clone the modules
1500 6193 : for (size_t tid = 0; tid < this->n_threads - (this->tasks_inplace ? 1 : 0); tid++)
1501 : {
1502 5506 : if (this->is_thread_pinning())
1503 : {
1504 15 : const auto real_tid = tid + (this->tasks_inplace ? 1 : 0);
1505 15 : if (!this->puids.empty())
1506 0 : tools::Thread_pinning::pin(this->puids[real_tid]);
1507 : else
1508 15 : tools::Thread_pinning::pin(this->pin_objects_per_thread[real_tid]);
1509 : }
1510 :
1511 5506 : this->modules[tid].resize(modules_vec.size());
1512 5506 : this->all_modules[tid + (this->tasks_inplace ? 1 : 0)].resize(modules_vec.size());
1513 43884 : for (size_t m = 0; m < modules_vec.size(); m++)
1514 : {
1515 : try
1516 : {
1517 38378 : this->modules[tid][m].reset(modules_vec[m]->clone());
1518 : }
1519 0 : catch (std::exception& e)
1520 : {
1521 0 : std::stringstream message;
1522 : message << "Module clone failed when trying to replicate the sequence (module name is '"
1523 0 : << modules_vec[m]->get_name() << "').";
1524 :
1525 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
1526 0 : }
1527 38378 : this->all_modules[tid + (this->tasks_inplace ? 1 : 0)][m] = this->modules[tid][m].get();
1528 : }
1529 :
1530 5506 : if (this->is_thread_pinning()) tools::Thread_pinning::unpin();
1531 : }
1532 :
1533 84844 : auto get_module_id = [](const std::vector<MO*>& modules, const module::Module& module)
1534 : {
1535 : int m_id;
1536 462531 : for (m_id = 0; m_id < (int)modules.size(); m_id++)
1537 458844 : if (modules[m_id] == &module) return m_id;
1538 3687 : return -1;
1539 : };
1540 :
1541 80922 : auto get_task_id = [](const std::vector<std::shared_ptr<runtime::Task>>& tasks, const runtime::Task& task)
1542 : {
1543 : int t_id;
1544 97225 : for (t_id = 0; t_id < (int)tasks.size(); t_id++)
1545 97225 : if (tasks[t_id].get() == &task) return t_id;
1546 0 : return -1;
1547 : };
1548 :
1549 40207 : auto get_socket_id = [](const std::vector<std::shared_ptr<runtime::Socket>>& sockets, const runtime::Socket& socket)
1550 : {
1551 : int s_id;
1552 85302 : for (s_id = 0; s_id < (int)sockets.size(); s_id++)
1553 85302 : if (sockets[s_id].get() == &socket) return s_id;
1554 0 : return -1;
1555 : };
1556 :
1557 : std::function<void(const tools::Digraph_node<SS>*,
1558 : tools::Digraph_node<Sub_sequence>*,
1559 : const size_t,
1560 : std::vector<const tools::Digraph_node<SS>*>&,
1561 : std::map<size_t, tools::Digraph_node<Sub_sequence>*>&)>
1562 687 : replicate_sequence;
1563 :
1564 18815 : replicate_sequence = [&](const tools::Digraph_node<SS>* sequence_ref,
1565 : tools::Digraph_node<Sub_sequence>* sequence_cpy,
1566 : const size_t thread_id,
1567 : std::vector<const tools::Digraph_node<SS>*>& already_parsed_nodes,
1568 : std::map<size_t, tools::Digraph_node<Sub_sequence>*>& allocated_nodes)
1569 : {
1570 36256 : if (sequence_ref != nullptr && sequence_ref->get_c() &&
1571 18128 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), sequence_ref) ==
1572 36256 : already_parsed_nodes.end())
1573 : {
1574 15419 : already_parsed_nodes.push_back(sequence_ref);
1575 :
1576 15419 : auto ss_ref = sequence_ref->get_c();
1577 15419 : auto ss_cpy = new Sub_sequence();
1578 :
1579 15419 : ss_cpy->type = ss_ref->type;
1580 15419 : ss_cpy->id = ss_ref->id;
1581 15419 : ss_cpy->tasks_id = ss_ref->tasks_id;
1582 56134 : for (auto t_ref : ss_ref->tasks)
1583 : {
1584 40715 : auto& m_ref = t_ref->get_module();
1585 :
1586 40715 : auto m_id = get_module_id(modules_vec, m_ref);
1587 40715 : auto t_id = get_task_id(m_ref.tasks, *t_ref);
1588 :
1589 40715 : assert(m_id != -1);
1590 40715 : assert(t_id != -1);
1591 :
1592 : // add the task to the sub-sequence
1593 40715 : ss_cpy->tasks.push_back(this->all_modules[thread_id][m_id]->tasks[t_id].get());
1594 :
1595 : // replicate the sockets binding
1596 156551 : for (size_t s_id = 0; s_id < t_ref->sockets.size(); s_id++)
1597 : {
1598 197584 : if (t_ref->sockets[s_id]->get_type() == socket_t::SIN ||
1599 81748 : t_ref->sockets[s_id]->get_type() == socket_t::SFWD)
1600 : {
1601 40298 : const runtime::Socket* s_ref_out = nullptr;
1602 : try
1603 : {
1604 40298 : s_ref_out = &t_ref->sockets[s_id]->get_bound_socket();
1605 : }
1606 96 : catch (...)
1607 : {
1608 : }
1609 40298 : if (s_ref_out)
1610 : {
1611 40202 : auto& t_ref_out = s_ref_out->get_task();
1612 40202 : auto& m_ref_out = t_ref_out.get_module();
1613 :
1614 : // check if `t_ref_out` is included in the tasks graph
1615 40202 : auto t_in_seq = std::find(tsks_vec.begin(), tsks_vec.end(), &t_ref_out) != tsks_vec.end();
1616 40202 : auto m_id_out = get_module_id(modules_vec, m_ref_out);
1617 :
1618 40202 : if (t_in_seq && m_id_out != -1)
1619 : {
1620 36943 : auto t_id_out = get_task_id(m_ref_out.tasks, t_ref_out);
1621 36943 : auto s_id_out = get_socket_id(t_ref_out.sockets, *s_ref_out);
1622 :
1623 36943 : assert(t_id_out != -1);
1624 36943 : assert(s_id_out != -1);
1625 :
1626 36943 : auto& s_in = *this->all_modules[thread_id][m_id]->tasks[t_id]->sockets[s_id];
1627 : auto& s_out =
1628 36943 : *this->all_modules[thread_id][m_id_out]->tasks[t_id_out]->sockets[s_id_out];
1629 36943 : s_in = s_out;
1630 : }
1631 : }
1632 : }
1633 : }
1634 :
1635 : // replicate the tasks binding
1636 40715 : if (t_ref->fake_input_sockets.size())
1637 : {
1638 7854 : for (auto& fsi : t_ref->fake_input_sockets)
1639 : {
1640 3927 : const runtime::Socket* s_ref_out = nullptr;
1641 : try
1642 : {
1643 3927 : s_ref_out = &fsi->get_bound_socket();
1644 : }
1645 0 : catch (...)
1646 : {
1647 : }
1648 3927 : if (s_ref_out)
1649 : {
1650 3927 : auto& t_ref_out = s_ref_out->get_task();
1651 3927 : auto& m_ref_out = t_ref_out.get_module();
1652 :
1653 : // check if `t_ref_out` is included in the tasks graph
1654 3927 : auto t_in_seq = std::find(tsks_vec.begin(), tsks_vec.end(), &t_ref_out) != tsks_vec.end();
1655 3927 : auto m_id_out = get_module_id(modules_vec, m_ref_out);
1656 :
1657 3927 : if (t_in_seq && m_id_out != -1)
1658 : {
1659 3264 : auto t_id_out = get_task_id(m_ref_out.tasks, t_ref_out);
1660 3264 : auto s_id_out = get_socket_id(t_ref_out.sockets, *s_ref_out);
1661 :
1662 3264 : assert(t_id_out != -1);
1663 3264 : assert(s_id_out != -1);
1664 :
1665 3264 : auto& t_in = *this->all_modules[thread_id][m_id]->tasks[t_id];
1666 : auto& s_out =
1667 3264 : *this->all_modules[thread_id][m_id_out]->tasks[t_id_out]->sockets[s_id_out];
1668 3264 : t_in = s_out;
1669 : }
1670 : }
1671 : }
1672 : }
1673 : }
1674 :
1675 : // add the sub-sequence to the current tree node
1676 15419 : sequence_cpy->set_contents(ss_cpy);
1677 15419 : allocated_nodes[sequence_cpy->get_c()->id] = sequence_cpy;
1678 :
1679 28041 : for (size_t c = 0; c < sequence_ref->get_children().size(); c++)
1680 : {
1681 12622 : if (std::find(already_parsed_nodes.begin(),
1682 : already_parsed_nodes.end(),
1683 25244 : sequence_ref->get_children()[c]) != already_parsed_nodes.end())
1684 2709 : sequence_cpy->add_child(allocated_nodes[sequence_ref->get_children()[c]->get_c()->id]);
1685 : else
1686 19826 : sequence_cpy->add_child(new tools::Digraph_node<Sub_sequence>(
1687 9913 : { sequence_cpy }, {}, nullptr, sequence_cpy->get_depth() + 1));
1688 : }
1689 :
1690 28041 : for (size_t c = 0; c < sequence_ref->get_children().size(); c++)
1691 12622 : replicate_sequence(sequence_ref->get_children()[c],
1692 12622 : sequence_cpy->get_children()[c],
1693 : thread_id,
1694 : already_parsed_nodes,
1695 : allocated_nodes);
1696 : }
1697 : };
1698 :
1699 6193 : for (size_t thread_id = (this->tasks_inplace ? 1 : 0); thread_id < this->sequences.size(); thread_id++)
1700 : {
1701 5506 : if (this->is_thread_pinning())
1702 : {
1703 15 : if (!this->puids.empty())
1704 0 : tools::Thread_pinning::pin(this->puids[thread_id]);
1705 : else
1706 15 : tools::Thread_pinning::pin(this->pin_objects_per_thread[thread_id]);
1707 : }
1708 :
1709 5506 : this->sequences[thread_id] = new tools::Digraph_node<Sub_sequence>({}, {}, nullptr, 0);
1710 5506 : already_parsed_nodes.clear();
1711 5506 : std::map<size_t, tools::Digraph_node<Sub_sequence>*> allocated_nodes;
1712 5506 : replicate_sequence(sequence, this->sequences[thread_id], thread_id, already_parsed_nodes, allocated_nodes);
1713 :
1714 5506 : if (this->is_thread_pinning()) tools::Thread_pinning::unpin();
1715 : }
1716 687 : }
1717 :
// Explicit instantiations of Sequence::replicate: one for the constant sub-sequence type
// (Sub_sequence_const / const module::Module) and one for the mutable one (Sub_sequence / module::Module).
1718 : template void
1719 : runtime::Sequence::replicate<runtime::Sub_sequence_const, const module::Module>(
1720 : const tools::Digraph_node<runtime::Sub_sequence_const>*);
1721 : template void
1722 : runtime::Sequence::replicate<runtime::Sub_sequence, module::Module>(const tools::Digraph_node<runtime::Sub_sequence>*);
1723 :
1724 : template<class SS>
1725 : void
1726 19279 : Sequence::delete_tree(tools::Digraph_node<SS>* node, std::vector<tools::Digraph_node<SS>*>& already_deleted_nodes)
1727 : {
1728 38558 : if (node != nullptr &&
1729 38558 : std::find(already_deleted_nodes.begin(), already_deleted_nodes.end(), node) == already_deleted_nodes.end())
1730 : {
1731 16475 : already_deleted_nodes.push_back(node);
1732 29561 : for (auto c : node->get_children())
1733 13086 : this->delete_tree(c, already_deleted_nodes);
1734 16475 : auto c = node->get_c();
1735 16475 : if (c != nullptr) delete c;
1736 16475 : delete node;
1737 : }
1738 19279 : }
1739 :
1740 : template void
1741 : runtime::Sequence::delete_tree<runtime::Sub_sequence_const>(
1742 : tools::Digraph_node<runtime::Sub_sequence_const>*,
1743 : std::vector<tools::Digraph_node<Sub_sequence_const>*>& already_deleted_nodes);
1744 : template void
1745 : runtime::Sequence::delete_tree<runtime::Sub_sequence>(
1746 : tools::Digraph_node<runtime::Sub_sequence>*,
1747 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_deleted_nodes);
1748 :
1749 : template<class VTA>
1750 : void
1751 67 : Sequence::export_dot_subsequence(const VTA& subseq,
1752 : const std::vector<size_t>& tasks_id,
1753 : const subseq_t& subseq_type,
1754 : const std::string& subseq_name,
1755 : const std::string& tab,
1756 : std::ostream& stream) const
1757 : {
1758 67 : if (!subseq_name.empty())
1759 : {
1760 67 : stream << tab << "subgraph \"cluster_" << subseq_name << "_" << +this << "\" {" << std::endl;
1761 67 : stream << tab << tab << "node [style=filled];" << std::endl;
1762 : }
1763 67 : size_t exec_order = 0;
1764 301 : for (auto& t : subseq)
1765 : {
1766 234 : std::string color = dynamic_cast<module::Adaptor_m_to_n*>(&t->get_module()) ? "green" : "blue";
1767 234 : color = dynamic_cast<module::AProbe*>(&t->get_module()) ? "pink" : color;
1768 234 : stream << tab << tab << "subgraph \"cluster_" << +&t->get_module() << "_" << +t << "\" {" << std::endl;
1769 234 : stream << tab << tab << tab << "node [style=filled];" << std::endl;
1770 234 : stream << tab << tab << tab << "subgraph \"cluster_" << +&t << "\" {" << std::endl;
1771 234 : stream << tab << tab << tab << tab << "node [style=filled];" << std::endl;
1772 :
1773 234 : if (t->fake_input_sockets.size())
1774 : {
1775 47 : std::string stype = "in";
1776 94 : for (auto& fsi : t->fake_input_sockets)
1777 47 : stream << tab << tab << tab << tab << "\"" << +fsi.get() << "\""
1778 47 : << "[label=\"" << stype << ":" << fsi->get_name() << "\", style=filled, "
1779 94 : << "fillcolor=red, penwidth=\"2.0\"];" << std::endl;
1780 47 : }
1781 :
1782 234 : size_t sid = 0;
1783 848 : for (auto& s : t->sockets)
1784 : {
1785 614 : std::string stype;
1786 614 : bool static_input = false;
1787 614 : switch (s->get_type())
1788 : {
1789 172 : case socket_t::SIN:
1790 172 : stype = "in[" + std::to_string(sid) + "]";
1791 172 : static_input = s->_get_dataptr() != nullptr && s->bound_socket == nullptr;
1792 172 : break;
1793 399 : case socket_t::SOUT:
1794 399 : stype = "out[" + std::to_string(sid) + "]";
1795 399 : break;
1796 43 : case socket_t::SFWD:
1797 43 : stype = "fwd[" + std::to_string(sid) + "]";
1798 43 : break;
1799 0 : default:
1800 0 : stype = "unkn";
1801 0 : break;
1802 : }
1803 :
1804 614 : std::string bold_or_not;
1805 614 : if (t->is_last_input_socket(*s)) bold_or_not = ", penwidth=\"2.0\"";
1806 :
1807 614 : stream << tab << tab << tab << tab << "\"" << +s.get() << "\""
1808 614 : << "[label=\"" << stype << ":" << s->get_name() << "\"" << bold_or_not
1809 1228 : << (static_input ? ", style=filled, fillcolor=green" : "") << "];" << std::endl;
1810 614 : sid++;
1811 : }
1812 :
1813 468 : stream << tab << tab << tab << tab << "label=\"" << t->get_name() << " (id = " << tasks_id[exec_order] << ")"
1814 234 : << "\";" << std::endl;
1815 234 : stream << tab << tab << tab << tab << "color=" << (t->is_replicable() ? color : "red") << ";" << std::endl;
1816 234 : stream << tab << tab << tab << "}" << std::endl;
1817 234 : stream << tab << tab << tab << "label=\"" << t->get_module().get_name() << "\n"
1818 468 : << (t->get_module().get_custom_name().empty() ? "" : t->get_module().get_custom_name() + "\n")
1819 702 : << "exec order: [" << exec_order++ << "]\n"
1820 234 : << "addr: " << +&t->get_module() << "\";" << std::endl;
1821 234 : stream << tab << tab << tab << "color=" << color << ";" << std::endl;
1822 234 : stream << tab << tab << "}" << std::endl;
1823 : }
1824 67 : if (!subseq_name.empty())
1825 : {
1826 67 : stream << tab << tab << "label=\"" << subseq_name << "\";" << std::endl;
1827 67 : std::string color = subseq_type == subseq_t::COMMUTE || subseq_type == subseq_t::SELECT ? "red" : "blue";
1828 67 : stream << tab << tab << "color=" << color << ";" << std::endl;
1829 67 : stream << tab << "}" << std::endl;
1830 67 : }
1831 67 : }
1832 :
1833 : template void
1834 : runtime::Sequence::export_dot_subsequence<std::vector<runtime::Task*>>(const std::vector<runtime::Task*>&,
1835 : const std::vector<size_t>&,
1836 : const subseq_t&,
1837 : const std::string&,
1838 : const std::string&,
1839 : std::ostream&) const;
1840 : template void
1841 : runtime::Sequence::export_dot_subsequence<std::vector<const runtime::Task*>>(const std::vector<const runtime::Task*>&,
1842 : const std::vector<size_t>&,
1843 : const subseq_t&,
1844 : const std::string&,
1845 : const std::string&,
1846 : std::ostream&) const;
1847 :
1848 : template<class VTA>
1849 : void
1850 67 : Sequence::export_dot_connections(const VTA& subseq, const std::string& tab, std::ostream& stream) const
1851 : {
1852 301 : for (auto& t : subseq)
1853 : {
1854 848 : for (auto& s : t->sockets)
1855 : {
1856 614 : if (s->get_type() == socket_t::SOUT || s->get_type() == socket_t::SFWD)
1857 : {
1858 442 : auto& bss = s->get_bound_sockets();
1859 442 : size_t id = 0;
1860 704 : for (auto& bs : bss)
1861 : {
1862 262 : stream << tab << "\"" << +s.get() << "\" -> \"" << +bs << "\""
1863 262 : << (bss.size() > 1 ? "[label=\"" + std::to_string(id++) + "\"]" : "") << std::endl;
1864 : }
1865 : }
1866 : }
1867 : }
1868 67 : }
1869 :
1870 : template void
1871 : runtime::Sequence::export_dot_connections<std::vector<runtime::Task*>>(const std::vector<runtime::Task*>&,
1872 : const std::string&,
1873 : std::ostream&) const;
1874 : template void
1875 : runtime::Sequence::export_dot_connections<std::vector<const runtime::Task*>>(const std::vector<const runtime::Task*>&,
1876 : const std::string&,
1877 : std::ostream&) const;
1878 :
1879 : void
1880 12 : Sequence::export_dot(std::ostream& stream) const
1881 : {
1882 12 : auto root = this->sequences[0];
1883 12 : this->export_dot(root, stream);
1884 12 : }
1885 :
1886 : template<class SS>
1887 : void
1888 12 : Sequence::export_dot(tools::Digraph_node<SS>* root, std::ostream& stream) const
1889 : {
1890 : std::function<void(
1891 : tools::Digraph_node<SS>*, const std::string&, std::ostream&, std::vector<tools::Digraph_node<SS>*>&)>
1892 12 : export_dot_subsequences_recursive =
1893 50 : [&export_dot_subsequences_recursive, this](tools::Digraph_node<SS>* cur_node,
1894 : const std::string& tab,
1895 : std::ostream& stream,
1896 : std::vector<tools::Digraph_node<SS>*>& already_parsed_nodes)
1897 : {
1898 100 : if (cur_node != nullptr &&
1899 100 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
1900 : {
1901 42 : already_parsed_nodes.push_back(cur_node);
1902 126 : this->export_dot_subsequence(cur_node->get_c()->tasks,
1903 42 : cur_node->get_c()->tasks_id,
1904 42 : cur_node->get_c()->type,
1905 42 : "Sub-sequence " + std::to_string(cur_node->get_c()->id) +
1906 : " (depth = " + std::to_string(cur_node->get_depth()) + ")",
1907 : tab,
1908 : stream);
1909 :
1910 80 : for (auto c : cur_node->get_children())
1911 38 : export_dot_subsequences_recursive(c, tab, stream, already_parsed_nodes);
1912 : }
1913 : };
1914 :
1915 : std::function<void(
1916 : tools::Digraph_node<SS>*, const std::string&, std::ostream&, std::vector<tools::Digraph_node<SS>*>&)>
1917 12 : export_dot_connections_recursive =
1918 50 : [&export_dot_connections_recursive, this](tools::Digraph_node<SS>* cur_node,
1919 : const std::string& tab,
1920 : std::ostream& stream,
1921 : std::vector<tools::Digraph_node<SS>*>& already_parsed_nodes)
1922 : {
1923 100 : if (cur_node != nullptr &&
1924 100 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
1925 : {
1926 42 : already_parsed_nodes.push_back(cur_node);
1927 42 : this->export_dot_connections(cur_node->get_c()->tasks, tab, stream);
1928 :
1929 80 : for (auto c : cur_node->get_children())
1930 38 : export_dot_connections_recursive(c, tab, stream, already_parsed_nodes);
1931 : }
1932 : };
1933 :
1934 12 : std::string tab = "\t";
1935 12 : stream << "digraph Sequence {" << std::endl;
1936 12 : std::vector<tools::Digraph_node<SS>*> already_parsed_nodes;
1937 12 : export_dot_subsequences_recursive(root, tab, stream, already_parsed_nodes);
1938 12 : already_parsed_nodes.clear();
1939 12 : export_dot_connections_recursive(root, tab, stream, already_parsed_nodes);
1940 12 : stream << "}" << std::endl;
1941 12 : }
1942 :
1943 : void
1944 1565 : Sequence::gen_processes(const bool no_copy_mode)
1945 : {
1946 : std::function<void(Socket * socket, std::vector<runtime::Socket*> & list_fwd)> explore_thread_rec =
1947 15637 : [&explore_thread_rec](Socket* socket, std::vector<runtime::Socket*>& list_fwd)
1948 : {
1949 12274 : auto bound = socket->get_bound_sockets();
1950 24020 : for (auto explore_bound : bound)
1951 : {
1952 13519 : if (find(list_fwd.begin(), list_fwd.end(), explore_bound) == list_fwd.end() &&
1953 1775 : explore_bound->get_type() != socket_t::SOUT)
1954 : {
1955 1791 : list_fwd.push_back(explore_bound);
1956 : }
1957 11747 : if (explore_bound->get_type() == socket_t::SFWD) explore_thread_rec(explore_bound, list_fwd);
1958 : }
1959 13818 : };
1960 :
1961 : std::function<void(Socket * socket, std::vector<runtime::Socket*> & list_fwd)> explore_thread_rec_reverse =
1962 3267 : [&explore_thread_rec, &explore_thread_rec_reverse](Socket* socket, std::vector<runtime::Socket*>& list_fwd)
1963 : {
1964 1582 : auto bound = &socket->get_bound_socket();
1965 1582 : if (find(list_fwd.begin(), list_fwd.end(), bound) == list_fwd.end())
1966 : {
1967 1582 : list_fwd.push_back(bound);
1968 : }
1969 1582 : if (bound->get_type() == socket_t::SFWD)
1970 : {
1971 843 : explore_thread_rec(bound, list_fwd);
1972 842 : explore_thread_rec_reverse(bound, list_fwd);
1973 : }
1974 3137 : };
1975 :
1976 : std::function<void(tools::Digraph_node<Sub_sequence>*, std::vector<tools::Digraph_node<Sub_sequence>*>&)>
1977 : gen_processes_recursive =
1978 47357 : [&gen_processes_recursive, no_copy_mode, &explore_thread_rec, &explore_thread_rec_reverse](
1979 : tools::Digraph_node<Sub_sequence>* cur_node,
1980 63509 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
1981 : {
1982 94709 : if (cur_node != nullptr &&
1983 94709 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
1984 : {
1985 40605 : already_parsed_nodes.push_back(cur_node);
1986 :
1987 40609 : std::map<runtime::Task*, std::function<const int*()>> modified_tasks;
1988 40613 : auto contents = cur_node->get_c();
1989 40613 : contents->processes.clear();
1990 40615 : contents->rebind_sockets.clear();
1991 40615 : contents->rebind_dataptrs.clear();
1992 162242 : for (auto task : contents->tasks)
1993 : {
1994 132633 : if (dynamic_cast<module::Switcher*>(&task->get_module()) &&
1995 254249 : task->get_name().find("select") != std::string::npos && no_copy_mode)
1996 : {
1997 1587 : auto select_task = task;
1998 1587 : auto switcher = dynamic_cast<module::Switcher*>(&select_task->get_module());
1999 1587 : switcher->set_no_copy_select(true);
2000 :
2001 1587 : const auto rebind_id = contents->rebind_sockets.size();
2002 1587 : contents->rebind_sockets.resize(rebind_id + 1);
2003 1587 : contents->rebind_dataptrs.resize(rebind_id + 1);
2004 :
2005 6733 : for (size_t s = 0; s < select_task->sockets.size() - 1; s++)
2006 : {
2007 : // there should be only one output socket at this time
2008 5146 : if (select_task->sockets[s]->get_type() == socket_t::SOUT)
2009 : {
2010 1587 : std::vector<runtime::Socket*> bound_sockets;
2011 1587 : std::vector<void*> dataptrs;
2012 :
2013 3174 : for (auto socket : select_task->sockets[s]->get_bound_sockets())
2014 : {
2015 1587 : bound_sockets.push_back(socket);
2016 1587 : if (socket->get_type() == socket_t::SFWD) explore_thread_rec(socket, bound_sockets);
2017 : }
2018 3174 : for (auto sck : bound_sockets)
2019 1587 : dataptrs.push_back(sck->_get_dataptr());
2020 :
2021 1587 : contents->rebind_sockets[rebind_id].push_back(bound_sockets);
2022 1587 : contents->rebind_dataptrs[rebind_id].push_back(dataptrs);
2023 1587 : }
2024 : }
2025 :
2026 522258 : modified_tasks[select_task] = [contents, select_task, switcher, rebind_id]() -> const int*
2027 : {
2028 57840 : select_task->exec();
2029 57732 : const int* status = select_task->sockets.back()->get_dataptr<int>();
2030 :
2031 57621 : const auto path = switcher->get_path();
2032 57609 : const auto in_dataptr = select_task->sockets[path]->_get_dataptr();
2033 :
2034 : // rebind input sockets on the fly
2035 : // there should be only one output socket at this time (sout_id == 0)
2036 115333 : for (size_t sout_id = 0; sout_id < contents->rebind_sockets[rebind_id].size(); sout_id++)
2037 115358 : for (size_t sin_id = 0; sin_id < contents->rebind_sockets[rebind_id][sout_id].size();
2038 : sin_id++)
2039 57591 : contents->rebind_sockets[rebind_id][sout_id][sin_id]->dataptr = in_dataptr;
2040 :
2041 57762 : return status;
2042 1587 : };
2043 : }
2044 :
2045 132624 : if (dynamic_cast<module::Switcher*>(&task->get_module()) &&
2046 254263 : task->get_name().find("commute") != std::string::npos && no_copy_mode)
2047 : {
2048 1587 : auto commute_task = task;
2049 1587 : auto switcher = dynamic_cast<module::Switcher*>(&commute_task->get_module());
2050 1587 : switcher->set_no_copy_commute(true);
2051 :
2052 1587 : const auto rebind_id = contents->rebind_sockets.size();
2053 1587 : contents->rebind_sockets.resize(rebind_id + 1);
2054 1587 : contents->rebind_dataptrs.resize(rebind_id + 1);
2055 :
2056 8320 : for (size_t s = 0; s < commute_task->sockets.size() - 1; s++)
2057 : {
2058 6733 : if (commute_task->sockets[s]->get_type() == socket_t::SOUT)
2059 : {
2060 3559 : std::vector<runtime::Socket*> bound_sockets;
2061 3559 : std::vector<void*> dataptrs;
2062 :
2063 7263 : for (auto socket : commute_task->sockets[s]->get_bound_sockets())
2064 : {
2065 3704 : bound_sockets.push_back(socket);
2066 3704 : if (socket->get_type() == socket_t::SFWD) explore_thread_rec(socket, bound_sockets);
2067 : }
2068 7263 : for (auto sck : bound_sockets)
2069 3704 : dataptrs.push_back(sck->_get_dataptr());
2070 :
2071 3559 : contents->rebind_sockets[rebind_id].push_back(bound_sockets);
2072 3559 : contents->rebind_dataptrs[rebind_id].push_back(dataptrs);
2073 3559 : }
2074 : }
2075 :
2076 410143 : modified_tasks[commute_task] = [contents, commute_task, switcher, rebind_id]() -> const int*
2077 : {
2078 57772 : commute_task->exec();
2079 57790 : const int* status = commute_task->sockets.back()->get_dataptr<int>();
2080 57643 : const auto in_dataptr = commute_task->sockets[0]->_get_dataptr();
2081 57556 : const auto path = switcher->get_path();
2082 :
2083 : // rebind input sockets on the fly
2084 116932 : for (size_t sin_id = 0; sin_id < contents->rebind_sockets[rebind_id][path].size(); sin_id++)
2085 59276 : contents->rebind_sockets[rebind_id][path][sin_id]->dataptr = in_dataptr;
2086 :
2087 57778 : return status;
2088 1587 : };
2089 : }
2090 :
2091 133988 : if (dynamic_cast<module::Adaptor_m_to_n*>(&task->get_module()) &&
2092 255618 : task->get_name().find("pull") != std::string::npos && no_copy_mode)
2093 : {
2094 3090 : auto pull_task = task;
2095 3090 : auto adp_pull = dynamic_cast<module::Adaptor_m_to_n*>(&pull_task->get_module());
2096 3090 : adp_pull->set_no_copy_pull(true);
2097 3089 : const auto rebind_id = contents->rebind_sockets.size();
2098 3088 : contents->rebind_sockets.resize(rebind_id + 1);
2099 3087 : contents->rebind_dataptrs.resize(rebind_id + 1);
2100 :
2101 8648 : for (size_t s = 0; s < pull_task->sockets.size() - 1; s++)
2102 : {
2103 5560 : if (pull_task->sockets[s]->get_type() == socket_t::SOUT)
2104 : {
2105 5559 : std::vector<runtime::Socket*> bound_sockets;
2106 5558 : std::vector<void*> dataptrs;
2107 :
2108 5559 : bound_sockets.push_back(pull_task->sockets[s].get());
2109 12221 : for (auto socket : pull_task->sockets[s]->get_bound_sockets())
2110 : {
2111 6663 : bound_sockets.push_back(socket);
2112 6666 : if (socket->get_type() == socket_t::SFWD) explore_thread_rec(socket, bound_sockets);
2113 : }
2114 18857 : for (auto sck : bound_sockets)
2115 13323 : dataptrs.push_back(sck->_get_dataptr());
2116 :
2117 5511 : contents->rebind_sockets[rebind_id].push_back(bound_sockets);
2118 5553 : contents->rebind_dataptrs[rebind_id].push_back(dataptrs);
2119 5555 : }
2120 : }
2121 :
2122 9355933 : modified_tasks[pull_task] = [contents, pull_task, adp_pull, rebind_id]() -> const int*
2123 : {
2124 : // active or passive waiting here
2125 557851 : pull_task->exec();
2126 549974 : const int* status = pull_task->sockets.back()->get_dataptr<int>();
2127 :
2128 : // rebind input sockets on the fly
2129 1499302 : for (size_t sin_id = 0; sin_id < contents->rebind_sockets[rebind_id].size(); sin_id++)
2130 : {
2131 959571 : if (contents->rebind_sockets[rebind_id][sin_id].size() > 1)
2132 : {
2133 : // we start to 1 because the rebinding of the 'pull_task' is made in the
2134 : // 'pull_task->exec()' call (this way the debug mode is still working)
2135 954420 : auto swap_buff = contents->rebind_sockets[rebind_id][sin_id][1]->_get_dataptr();
2136 950993 : auto buff = adp_pull->get_filled_buffer(sin_id, swap_buff);
2137 912125 : contents->rebind_sockets[rebind_id][sin_id][1]->dataptr = buff;
2138 : // for the next tasks the same buffer 'buff' is required, an easy mistake is to re-swap
2139 : // and the data will be false, this is why we just bind 'buff'
2140 1658601 : for (size_t ta = 2; ta < contents->rebind_sockets[rebind_id][sin_id].size(); ta++)
2141 745284 : contents->rebind_sockets[rebind_id][sin_id][ta]->dataptr = buff;
2142 : }
2143 : }
2144 561647 : adp_pull->wake_up_pusher();
2145 576279 : return status;
2146 3078 : };
2147 : }
2148 :
2149 133994 : if (dynamic_cast<module::Adaptor_m_to_n*>(&task->get_module()) &&
2150 255619 : task->get_name().find("push") != std::string::npos && no_copy_mode)
2151 : {
2152 3092 : auto push_task = task;
2153 3092 : auto adp_push = dynamic_cast<module::Adaptor_m_to_n*>(&push_task->get_module());
2154 3092 : adp_push->set_no_copy_push(true);
2155 3091 : const auto rebind_id = contents->rebind_sockets.size();
2156 3091 : contents->rebind_sockets.resize(rebind_id + 1);
2157 3091 : contents->rebind_dataptrs.resize(rebind_id + 1);
2158 :
2159 9852 : for (size_t s = 0; s < push_task->sockets.size() - 1; s++)
2160 6766 : if (push_task->sockets[s]->get_type() == socket_t::SIN)
2161 : {
2162 6764 : std::vector<runtime::Socket*> bound_sockets;
2163 6762 : std::vector<void*> dataptrs;
2164 :
2165 6761 : bound_sockets.push_back(push_task->sockets[s].get());
2166 :
2167 6758 : auto bound_socket = &push_task->sockets[s]->get_bound_socket();
2168 6759 : bound_sockets.push_back(bound_socket);
2169 6754 : explore_thread_rec(bound_socket, bound_sockets);
2170 :
2171 : // If the socket is FWD, we have to update all the other sockets with a backward
2172 : // exploration
2173 6763 : if (bound_socket->get_type() == socket_t::SFWD)
2174 740 : explore_thread_rec_reverse(bound_socket, bound_sockets);
2175 :
2176 22479 : for (auto sck : bound_sockets)
2177 15720 : dataptrs.push_back(sck->_get_dataptr());
2178 :
2179 6715 : contents->rebind_sockets[rebind_id].push_back(bound_sockets);
2180 6762 : contents->rebind_dataptrs[rebind_id].push_back(dataptrs);
2181 6761 : }
2182 :
2183 8964217 : modified_tasks[push_task] = [contents, push_task, adp_push, rebind_id]() -> const int*
2184 : {
2185 : // active or passive waiting here
2186 603187 : push_task->exec();
2187 616498 : const int* status = push_task->sockets.back()->get_dataptr<int>();
2188 : // rebind output sockets on the fly
2189 1641871 : for (size_t sout_id = 0; sout_id < contents->rebind_sockets[rebind_id].size(); sout_id++)
2190 : {
2191 : // we start to 1 because the rebinding of the 'push_task' is made in the
2192 : // 'push_task->exec()' call (this way the debug mode is still working)
2193 1053208 : auto swap_buff = contents->rebind_sockets[rebind_id][sout_id][1]->_get_dataptr();
2194 1049286 : auto buff = adp_push->get_empty_buffer(sout_id, swap_buff);
2195 1021233 : contents->rebind_sockets[rebind_id][sout_id][1]->dataptr = buff;
2196 : // the output socket linked to the push adp can have more than one socket bound and so
2197 : // we have to rebind all the input sokects bound to the current output socket
2198 1690312 : for (size_t ta = 2; ta < contents->rebind_sockets[rebind_id][sout_id].size(); ta++)
2199 664859 : contents->rebind_sockets[rebind_id][sout_id][ta]->dataptr = buff;
2200 : }
2201 617597 : adp_push->wake_up_puller();
2202 627213 : return status;
2203 3080 : };
2204 : }
2205 : }
2206 :
2207 162239 : for (auto task : contents->tasks)
2208 121612 : if (modified_tasks.count(task))
2209 9348 : contents->processes.push_back(modified_tasks[task]);
2210 : else
2211 112273 : contents->processes.push_back(
2212 5755794 : [task]() -> const int*
2213 : {
2214 2819769 : task->exec();
2215 2936025 : const int* status = task->sockets.back()->get_dataptr<int>();
2216 2832706 : return status;
2217 : });
2218 :
2219 71924 : for (auto c : cur_node->get_children())
2220 31310 : gen_processes_recursive(c, already_parsed_nodes);
2221 40627 : }
2222 48917 : };
2223 :
2224 1588 : size_t thread_id = 0;
2225 17653 : for (auto& sequence : this->sequences)
2226 : {
2227 16059 : if (this->is_thread_pinning())
2228 : {
2229 132 : if (!this->puids.empty())
2230 0 : tools::Thread_pinning::pin(this->puids[thread_id++]);
2231 : else
2232 132 : tools::Thread_pinning::pin(this->pin_objects_per_thread[thread_id++]);
2233 : }
2234 16060 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
2235 16056 : gen_processes_recursive(sequence, already_parsed_nodes);
2236 :
2237 16065 : if (this->is_thread_pinning()) tools::Thread_pinning::unpin();
2238 16065 : }
2239 1591 : }
2240 :
2241 : void
2242 452 : Sequence::reset_no_copy_mode()
2243 : {
2244 : std::function<void(tools::Digraph_node<Sub_sequence>*, std::vector<tools::Digraph_node<Sub_sequence>*>&)>
2245 : reset_no_copy_mode_recursive =
2246 14096 : [&reset_no_copy_mode_recursive](tools::Digraph_node<Sub_sequence>* cur_node,
2247 9112 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
2248 : {
2249 28192 : if (cur_node != nullptr &&
2250 28192 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
2251 : {
2252 12124 : already_parsed_nodes.push_back(cur_node);
2253 12124 : auto contents = cur_node->get_c();
2254 51044 : for (auto task : contents->tasks)
2255 : {
2256 42094 : if (dynamic_cast<module::Switcher*>(&task->get_module()) &&
2257 42094 : task->get_name().find("select") != std::string::npos)
2258 : {
2259 1587 : auto select_task = task;
2260 1587 : auto switcher = dynamic_cast<module::Switcher*>(&select_task->get_module());
2261 1587 : switcher->set_no_copy_select(false);
2262 : }
2263 :
2264 42094 : if (dynamic_cast<module::Switcher*>(&task->get_module()) &&
2265 42094 : task->get_name().find("commute") != std::string::npos)
2266 : {
2267 1587 : auto commute_task = task;
2268 1587 : auto switcher = dynamic_cast<module::Switcher*>(&commute_task->get_module());
2269 1587 : switcher->set_no_copy_commute(false);
2270 : }
2271 :
2272 45104 : if (dynamic_cast<module::Adaptor_m_to_n*>(&task->get_module()) &&
2273 45104 : task->get_name().find("pull") != std::string::npos)
2274 : {
2275 3092 : auto pull_task = task;
2276 3092 : auto adp_pull = dynamic_cast<module::Adaptor_m_to_n*>(&pull_task->get_module());
2277 3092 : adp_pull->set_no_copy_pull(false);
2278 3092 : adp_pull->reset_buffer();
2279 : }
2280 :
2281 45104 : if (dynamic_cast<module::Adaptor_m_to_n*>(&task->get_module()) &&
2282 45104 : task->get_name().find("push") != std::string::npos)
2283 : {
2284 3092 : auto push_task = task;
2285 3092 : auto adp_push = dynamic_cast<module::Adaptor_m_to_n*>(&push_task->get_module());
2286 3092 : adp_push->set_no_copy_push(false);
2287 3092 : adp_push->reset_buffer();
2288 : }
2289 : }
2290 :
2291 21482 : for (size_t rebind_id = 0; rebind_id < contents->rebind_sockets.size(); rebind_id++)
2292 26834 : for (size_t s = 0; s < contents->rebind_sockets[rebind_id].size(); s++)
2293 51923 : for (size_t ta = 0; ta < contents->rebind_sockets[rebind_id][s].size(); ta++)
2294 34447 : contents->rebind_sockets[rebind_id][s][ta]->dataptr =
2295 34447 : contents->rebind_dataptrs[rebind_id][s][ta];
2296 :
2297 21236 : for (auto c : cur_node->get_children())
2298 9112 : reset_no_copy_mode_recursive(c, already_parsed_nodes);
2299 : }
2300 14548 : };
2301 :
2302 5436 : for (auto& sequence : this->sequences)
2303 : {
2304 4984 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
2305 4984 : reset_no_copy_mode_recursive(sequence, already_parsed_nodes);
2306 4984 : }
2307 452 : }
2308 :
2309 : void
2310 86 : Sequence::set_no_copy_mode(const bool no_copy_mode)
2311 : {
2312 86 : this->no_copy_mode = no_copy_mode;
2313 86 : }
2314 :
2315 : bool
2316 1260 : Sequence::is_no_copy_mode() const
2317 : {
2318 1260 : return this->no_copy_mode;
2319 : }
2320 :
2321 : void
2322 22 : Sequence::set_auto_stop(const bool auto_stop)
2323 : {
2324 22 : this->auto_stop = auto_stop;
2325 22 : }
2326 :
2327 : bool
2328 0 : Sequence::is_auto_stop() const
2329 : {
2330 0 : return this->auto_stop;
2331 : }
2332 :
2333 : Sub_sequence*
2334 18540 : Sequence::get_last_subsequence(const size_t tid)
2335 : {
2336 : std::function<Sub_sequence*(tools::Digraph_node<Sub_sequence>*, std::vector<tools::Digraph_node<Sub_sequence>*>&)>
2337 : get_last_subsequence_recursive =
2338 25716 : [&get_last_subsequence_recursive,
2339 : &tid](tools::Digraph_node<Sub_sequence>* cur_node,
2340 7176 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes) -> Sub_sequence*
2341 : {
2342 51432 : if (cur_node != nullptr &&
2343 51432 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
2344 : {
2345 24306 : already_parsed_nodes.push_back(cur_node);
2346 24306 : if (!cur_node->get_children().size()) return cur_node->get_contents();
2347 5766 : Sub_sequence* last_ss = nullptr;
2348 12942 : for (auto c : cur_node->get_children())
2349 : {
2350 7176 : Sub_sequence* last_branch_ss = nullptr;
2351 7176 : last_branch_ss = get_last_subsequence_recursive(c, already_parsed_nodes);
2352 7176 : if (last_ss && last_branch_ss && last_ss != last_branch_ss)
2353 : {
2354 0 : std::stringstream message;
2355 : message << "Multiple candidates have been found for the last subsequence, this shouldn't be "
2356 0 : << "possible. (tid = " << tid << ", "
2357 0 : << "last_ss.id = " << last_ss->id << ", "
2358 0 : << "last_branch_ss.id = " << last_branch_ss->id << ")";
2359 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
2360 0 : }
2361 7176 : last_ss = last_branch_ss ? last_branch_ss : last_ss;
2362 : }
2363 5766 : return last_ss;
2364 : }
2365 1410 : return nullptr;
2366 18540 : };
2367 :
2368 18540 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
2369 37080 : return get_last_subsequence_recursive(this->sequences[tid], already_parsed_nodes);
2370 18540 : }
2371 :
2372 : void
2373 18540 : Sequence::update_tasks_id(const size_t tid)
2374 : {
2375 : std::function<void(tools::Digraph_node<Sub_sequence>*, std::vector<tools::Digraph_node<Sub_sequence>*>&)>
2376 : update_tasks_id_recursive =
2377 25716 : [&update_tasks_id_recursive](tools::Digraph_node<Sub_sequence>* cur_node,
2378 7176 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
2379 : {
2380 51432 : if (cur_node != nullptr &&
2381 51432 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
2382 : {
2383 24306 : already_parsed_nodes.push_back(cur_node);
2384 24306 : Sub_sequence* ss = cur_node->get_contents();
2385 24306 : ss->tasks_id.resize(ss->tasks.size());
2386 24306 : std::iota(ss->tasks_id.begin(), ss->tasks_id.end(), ss->tasks_id.front());
2387 :
2388 31482 : for (auto c : cur_node->get_children())
2389 7176 : update_tasks_id_recursive(c, already_parsed_nodes);
2390 : }
2391 44256 : };
2392 :
2393 18540 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
2394 37080 : return update_tasks_id_recursive(this->sequences[tid], already_parsed_nodes);
2395 18540 : }
2396 :
2397 : std::vector<runtime::Task*>
2398 7444 : Sequence::get_tasks_from_id(const size_t taid)
2399 : {
2400 : std::function<void(tools::Digraph_node<Sub_sequence>*,
2401 : const size_t,
2402 : std::vector<runtime::Task*>&,
2403 : std::vector<tools::Digraph_node<Sub_sequence>*>&)>
2404 : get_tasks_from_id_recursive =
2405 100879 : [&get_tasks_from_id_recursive](tools::Digraph_node<Sub_sequence>* cur_node,
2406 : const size_t taid,
2407 : std::vector<runtime::Task*>& tasks,
2408 38063 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
2409 : {
2410 201758 : if (cur_node != nullptr &&
2411 201758 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
2412 : {
2413 94709 : already_parsed_nodes.push_back(cur_node);
2414 94709 : Sub_sequence* ss = cur_node->get_contents();
2415 94709 : bool found = false;
2416 257781 : for (size_t t = 0; t < ss->tasks_id.size(); t++)
2417 225888 : if (ss->tasks_id[t] == taid)
2418 : {
2419 62816 : tasks.push_back(ss->tasks[t]);
2420 62816 : found = true;
2421 62816 : break;
2422 : }
2423 :
2424 94709 : if (!found)
2425 69956 : for (auto c : cur_node->get_children())
2426 38063 : get_tasks_from_id_recursive(c, taid, tasks, already_parsed_nodes);
2427 : }
2428 108323 : };
2429 :
2430 7444 : std::vector<runtime::Task*> tasks;
2431 70260 : for (auto& s : this->sequences)
2432 : {
2433 62816 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
2434 62816 : get_tasks_from_id_recursive(s, taid, tasks, already_parsed_nodes);
2435 62816 : }
2436 14888 : return tasks;
2437 7444 : }
2438 :
2439 : void
2440 2909 : Sequence::update_firsts_and_lasts_tasks()
2441 : {
2442 2909 : this->firsts_tasks.clear();
2443 2909 : this->firsts_tasks.resize(this->n_threads);
2444 6434 : for (auto taid : firsts_tasks_id)
2445 : {
2446 3525 : auto tasks = this->get_tasks_from_id(taid);
2447 33020 : for (size_t tid = 0; tid < tasks.size(); tid++)
2448 29495 : firsts_tasks[tid].push_back(tasks[tid]);
2449 3525 : }
2450 :
2451 2909 : this->lasts_tasks.clear();
2452 2909 : this->lasts_tasks.resize(this->n_threads);
2453 6828 : for (auto taid : lasts_tasks_id)
2454 : {
2455 3919 : auto tasks = this->get_tasks_from_id(taid);
2456 37240 : for (size_t tid = 0; tid < tasks.size(); tid++)
2457 33321 : lasts_tasks[tid].push_back(tasks[tid]);
2458 3919 : }
2459 2909 : }
2460 :
2461 : void
2462 440 : Sequence::_set_n_frames_unbind(std::vector<std::pair<runtime::Socket*, runtime::Socket*>>& unbind_sockets,
2463 : std::vector<std::pair<runtime::Task*, runtime::Socket*>>& unbind_tasks)
2464 : {
2465 : std::function<void(const std::vector<runtime::Task*>&,
2466 : tools::Digraph_node<Sub_sequence>*,
2467 : std::vector<tools::Digraph_node<Sub_sequence>*>&)>
2468 28226 : graph_traversal = [&graph_traversal, &unbind_sockets, &unbind_tasks](
2469 : const std::vector<runtime::Task*>& possessed_tsks,
2470 : tools::Digraph_node<Sub_sequence>* cur_node,
2471 77512 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
2472 : {
2473 56452 : if (cur_node != nullptr &&
2474 56452 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
2475 : {
2476 23490 : already_parsed_nodes.push_back(cur_node);
2477 23490 : Sub_sequence* ss = cur_node->get_contents();
2478 76996 : for (auto tsk_out : ss->tasks)
2479 : {
2480 209378 : for (auto sck_out : tsk_out->sockets)
2481 : {
2482 155872 : if (sck_out->get_type() == socket_t::SOUT || sck_out->get_type() == socket_t::SFWD)
2483 : {
2484 107668 : auto sck_out_bound_sockets_cpy = sck_out->get_bound_sockets();
2485 163228 : for (auto sck_in : sck_out_bound_sockets_cpy)
2486 : {
2487 55560 : auto tsk_in = &sck_in->get_task();
2488 : // if the task of the current input socket is in the tasks of the sequence
2489 55560 : if (std::find(possessed_tsks.begin(), possessed_tsks.end(), tsk_in) != possessed_tsks.end())
2490 : {
2491 : try
2492 : {
2493 55560 : tsk_in->unbind(*sck_out);
2494 4656 : unbind_tasks.push_back(std::make_pair(tsk_in, sck_out.get()));
2495 : }
2496 50904 : catch (...)
2497 : {
2498 50904 : sck_in->unbind(*sck_out);
2499 : // memorize the unbinds to rebind after!
2500 50904 : unbind_sockets.push_back(std::make_pair(sck_in, sck_out.get()));
2501 50904 : }
2502 : }
2503 : }
2504 107668 : }
2505 155872 : }
2506 : }
2507 45442 : for (auto c : cur_node->get_children())
2508 21952 : graph_traversal(possessed_tsks, c, already_parsed_nodes);
2509 : }
2510 28666 : };
2511 :
2512 440 : auto tsks_per_threads = this->get_tasks_per_threads();
2513 440 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
2514 6714 : for (size_t t = 0; t < this->get_n_threads(); t++)
2515 : {
2516 6274 : already_parsed_nodes.clear();
2517 6274 : graph_traversal(tsks_per_threads[t], this->sequences[t], already_parsed_nodes);
2518 : }
2519 440 : }
2520 :
2521 : void
2522 440 : Sequence::_set_n_frames(const size_t n_frames)
2523 : {
2524 440 : if (n_frames <= 0)
2525 : {
2526 0 : std::stringstream message;
2527 0 : message << "'n_frames' has to be greater than 0 ('n_frames' = " << n_frames << ").";
2528 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
2529 0 : }
2530 :
2531 : // set_n_frames for all the modules (be aware that this operation can fail)
2532 6714 : for (auto& mm : this->all_modules)
2533 57898 : for (auto& m : mm)
2534 51624 : m->set_n_frames(n_frames);
2535 440 : }
2536 :
2537 : void
2538 440 : Sequence::_set_n_frames_rebind(const std::vector<std::pair<runtime::Socket*, runtime::Socket*>>& unbind_sockets,
2539 : const std::vector<std::pair<runtime::Task*, runtime::Socket*>>& unbind_tasks)
2540 : {
2541 : // rebind the sockets
2542 51344 : for (auto& u : unbind_sockets)
2543 50904 : (*u.first) = *u.second;
2544 :
2545 : // rebind the tasks
2546 5096 : for (auto& u : unbind_tasks)
2547 4656 : (*u.first) = *u.second;
2548 440 : }
2549 :
2550 : void
2551 394 : Sequence::set_n_frames(const size_t n_frames)
2552 : {
2553 394 : const auto old_n_frames = this->get_n_frames();
2554 394 : if (old_n_frames != n_frames)
2555 : {
2556 302 : std::vector<std::pair<runtime::Socket*, runtime::Socket*>> unbind_sockets;
2557 302 : std::vector<std::pair<runtime::Task*, runtime::Socket*>> unbind_tasks;
2558 302 : this->_set_n_frames_unbind(unbind_sockets, unbind_tasks);
2559 302 : this->_set_n_frames(n_frames);
2560 302 : this->_set_n_frames_rebind(unbind_sockets, unbind_tasks);
2561 302 : }
2562 394 : }
2563 :
2564 : bool
2565 11 : Sequence::is_control_flow() const
2566 : {
2567 11 : return this->sequences[0]->get_children().size();
2568 : }
2569 :
2570 : // /!\ this check has been commented out because it is known not to work in the general case
2571 : /*
2572 : template<class SS>
2573 : void Sequence
2574 : ::check_ctrl_flow(tools::Digraph_node<SS>* root)
2575 : {
2576 : std::function<void(tools::Digraph_node<SS>*,
2577 : std::vector<tools::Digraph_node<SS>*>&)> check_control_flow_parity =
2578 : [&check_control_flow_parity](tools::Digraph_node<SS>* cur_node,
2579 : std::vector<tools::Digraph_node<SS>*> already_parsed_nodes) -> void
2580 : {
2581 : if (cur_node != nullptr &&
2582 : std::find(already_parsed_nodes.begin(),
2583 : already_parsed_nodes.end(),
2584 : cur_node) == already_parsed_nodes.end() &&
2585 : cur_node->get_children().size())
2586 : {
2587 : already_parsed_nodes.push_back(cur_node);
2588 : for (auto c : cur_node->get_children())
2589 : check_control_flow_parity(c, already_parsed_nodes);
2590 : }
2591 : else
2592 : {
2593 : already_parsed_nodes.push_back(cur_node);
2594 : std::vector<module::Module*> parsed_switchers;
2595 : for (size_t i = 0; i < already_parsed_nodes.size(); i++)
2596 : {
2597 : // This check occurs before dud-nodes are removed by _init, some nodes have no
2598 : contents and must be
2599 : // accounted for
2600 : if (already_parsed_nodes[i]->get_c() == nullptr ||
2601 : !(already_parsed_nodes[i]->get_c()->type == subseq_t::COMMUTE ||
2602 : already_parsed_nodes[i]->get_c()->type == subseq_t::SELECT))
2603 : continue;
2604 :
2605 : // We search for the first switcher task in the path taken: already_parsed_nodes
2606 : const runtime::Task *ctrl_task_first = nullptr;
2607 : const runtime::Task *ctrl_task_second = nullptr;
2608 : for (auto t : already_parsed_nodes[i]->get_c()->tasks)
2609 : {
2610 : if (dynamic_cast<const module::Switcher*>(&t->get_module()) &&
2611 : (t->get_name() == "select" || t->get_name() == "commute"))
2612 : {
2613 : ctrl_task_first = t;
2614 : break;
2615 : }
2616 : }
2617 :
2618 : if (std::find(parsed_switchers.begin(), parsed_switchers.end(),
2619 : &(ctrl_task_first->get_module())) != parsed_switchers.end()) continue;
2620 :
2621 : // We now search for the second switcher task in the path taken
2622 : auto expected_type = ctrl_task_first->get_name() == "select" ? subseq_t::COMMUTE
2623 : : subseq_t::SELECT; for (size_t j = i; j < already_parsed_nodes.size() && ctrl_task_second == nullptr; j++)
2624 : {
2625 : if (already_parsed_nodes[j]->get_c() == nullptr ||
2626 : already_parsed_nodes[j]->get_c()->type != expected_type)
2627 : continue;
2628 : for (auto t : already_parsed_nodes[j]->get_c()->tasks)
2629 : {
2630 : if ((t->get_name() == "select" || t->get_name() == "commute") &&
2631 : &(ctrl_task_first->get_module()) == &(t->get_module()))
2632 : {
2633 : parsed_switchers.push_back(&(t->get_module()));
2634 : ctrl_task_second = t;
2635 : break;
2636 : }
2637 : }
2638 : }
2639 :
2640 : if (ctrl_task_second == nullptr)
2641 : {
2642 : for (auto t : ctrl_task_first->get_module().tasks)
2643 : {
2644 : if ((ctrl_task_first->get_name() == "select" && t->get_name() ==
2645 : "commute") || (ctrl_task_first->get_name() == "commute" && t->get_name() == "select"))
2646 : {
2647 : ctrl_task_second = t.get();
2648 : break;
2649 : }
2650 : }
2651 : std::stringstream message;
2652 : message << ctrl_task_first->get_name() << " is missing a path to "
2653 : << ctrl_task_second->get_name() << ".";
2654 : throw tools::control_flow_error(__FILE__, __LINE__, __func__,
2655 : message.str());
2656 : }
2657 : }
2658 : }
2659 : };
2660 :
2661 : std::vector<tools::Digraph_node<SS>*> already_parsed_nodes;
2662 : return check_control_flow_parity(root, already_parsed_nodes);
2663 : }
2664 : */
|