Line data Source code
1 : #include <cassert>
2 : #include <fstream>
3 : #include <thread>
4 : #include <tuple>
5 : #include <utility>
6 :
7 : #include "Module/Stateful/Adaptor/Adaptor_1_to_n.hpp"
8 : #include "Module/Stateful/Adaptor/Adaptor_n_to_1.hpp"
9 : #include "Runtime/Pipeline/Pipeline.hpp"
10 : #include "Tools/Exception/exception.hpp"
11 : #include "Tools/Interface/Interface_waiting.hpp"
12 : #include "Tools/Thread_pinning/Thread_pinning_utils.hpp"
13 :
14 : using namespace spu;
15 : using namespace spu::runtime;
16 :
17 : // Pipeline
18 : // ::Pipeline(const runtime::Task &first,
19 : // const runtime::Task &last,
20 : // const std::vector<std::pair<std::vector<const runtime::Task*>, std::vector<const runtime::Task*>>>
21 : // &sep_stages, const std::vector<size_t> &n_threads, const std::vector<size_t> &synchro_buffer_sizes, const
22 : // std::vector<bool> &synchro_active_waiting, const std::vector<bool> &thread_pinning, const
23 : // std::vector<std::vector<size_t>> &puids)
24 : // : original_sequence(first, last, 1),
25 : // stages(sep_stages.size()),
26 : // adaptors(sep_stages.size() -1),
27 : // saved_firsts_tasks_id(sep_stages.size()),
28 : // saved_lasts_tasks_id(sep_stages.size()),
29 : // bound_adaptors(false)
30 : // {
31 : // this->init<const runtime::Task>(first,
32 : // &last,
33 : // sep_stages,
34 : // n_threads,
35 : // synchro_buffer_sizes,
36 : // synchro_active_waiting,
37 : // thread_pinning,
38 : // puids);
39 : // }
40 :
41 : // Pipeline
42 : // ::Pipeline(const runtime::Task &first,
43 : // const std::vector<std::pair<std::vector<const runtime::Task*>, std::vector<const runtime::Task*>>>
44 : // &sep_stages, const std::vector<size_t> &n_threads, const std::vector<size_t> &synchro_buffer_sizes, const
45 : // std::vector<bool> &synchro_active_waiting, const std::vector<bool> &thread_pinning, const
46 : // std::vector<std::vector<size_t>> &puids)
47 : // : original_sequence(first, 1),
48 : // stages(sep_stages.size()),
49 : // adaptors(sep_stages.size() -1),
50 : // saved_firsts_tasks_id(sep_stages.size()),
51 : // saved_lasts_tasks_id(sep_stages.size()),
52 : // bound_adaptors(false)
53 : // {
54 : // const runtime::Task* last = nullptr;
55 : // this->init<const runtime::Task>(first,
56 : // last,
57 : // sep_stages,
58 : // n_threads,
59 : // synchro_buffer_sizes,
60 : // synchro_active_waiting,
61 : // thread_pinning,
62 : // puids);
63 : // }
64 :
65 13 : Pipeline
66 : ::Pipeline(const std::vector<runtime::Task*> &firsts,
67 : const std::vector<runtime::Task*> &lasts,
68 : const std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
69 : const std::vector<size_t> &n_threads,
70 : const std::vector<size_t> &synchro_buffer_sizes,
71 : const std::vector<bool> &synchro_active_waiting,
72 : const std::vector<bool> &thread_pinning,
73 : const std::vector<std::vector<size_t>> &puids/*,
74 13 : const std::vector<bool> &tasks_inplace*/)
75 13 : : original_sequence(firsts, lasts, 1),
76 13 : stages(sep_stages.size()),
77 13 : adaptors(sep_stages.size() -1),
78 13 : saved_firsts_tasks_id(sep_stages.size()),
79 13 : saved_lasts_tasks_id(sep_stages.size()),
80 13 : bound_adaptors(false),
81 26 : auto_stop(true)
82 : {
83 13 : this->init<runtime::Task>(
84 : firsts, lasts, sep_stages, n_threads, synchro_buffer_sizes, synchro_active_waiting, thread_pinning, puids
85 : /*, tasks_inplace*/);
86 13 : }
87 :
88 26 : Pipeline
89 : ::Pipeline(const std::vector<runtime::Task*> &firsts,
90 : const std::vector<runtime::Task*> &lasts,
91 : const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
92 : const std::vector<size_t> &n_threads,
93 : const std::vector<size_t> &synchro_buffer_sizes,
94 : const std::vector<bool> &synchro_active_waiting,
95 : const std::vector<bool> &thread_pinning,
96 : const std::vector<std::vector<size_t>> &puids/*,
97 26 : const std::vector<bool> &tasks_inplace*/)
98 26 : : original_sequence(firsts, lasts, 1),
99 26 : stages(sep_stages.size()),
100 26 : adaptors(sep_stages.size() -1),
101 26 : saved_firsts_tasks_id(sep_stages.size()),
102 26 : saved_lasts_tasks_id(sep_stages.size()),
103 26 : bound_adaptors(false),
104 52 : auto_stop(true)
105 : {
106 : std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>>
107 26 : sep_stages_bis;
108 117 : for (auto& sep_stage : sep_stages)
109 91 : sep_stages_bis.push_back(std::make_tuple(sep_stage.first, sep_stage.second, std::vector<runtime::Task*>()));
110 :
111 26 : this->init<runtime::Task>(firsts,
112 : lasts,
113 : sep_stages_bis,
114 : n_threads,
115 : synchro_buffer_sizes,
116 : synchro_active_waiting,
117 : thread_pinning, puids/*,
118 : tasks_inplace*/);
119 26 : }
120 :
121 13 : Pipeline
122 : ::Pipeline(const std::vector<runtime::Task*> &firsts,
123 : const std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
124 : const std::vector<size_t> &n_threads,
125 : const std::vector<size_t> &synchro_buffer_sizes,
126 : const std::vector<bool> &synchro_active_waiting,
127 : const std::vector<bool> &thread_pinning,
128 : const std::vector<std::vector<size_t>> &puids/*,
129 13 : const std::vector<bool> &tasks_inplace*/)
130 : : Pipeline(firsts,
131 : {},
132 : sep_stages,
133 : n_threads,
134 : synchro_buffer_sizes,
135 : synchro_active_waiting,
136 : thread_pinning, puids/*,
137 13 : tasks_inplace*/)
138 : {
139 13 : }
140 :
141 26 : Pipeline
142 : ::Pipeline(const std::vector<runtime::Task*> &firsts,
143 : const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
144 : const std::vector<size_t> &n_threads,
145 : const std::vector<size_t> &synchro_buffer_sizes,
146 : const std::vector<bool> &synchro_active_waiting,
147 : const std::vector<bool> &thread_pinning,
148 : const std::vector<std::vector<size_t>> &puids/*,
149 26 : const std::vector<bool> &tasks_inplace*/)
150 : : Pipeline(firsts,
151 : {},
152 : sep_stages,
153 : n_threads,
154 : synchro_buffer_sizes,
155 : synchro_active_waiting,
156 : thread_pinning, puids/*,
157 26 : tasks_inplace*/)
158 : {
159 26 : }
160 :
161 0 : Pipeline
162 : ::Pipeline(runtime::Task &first,
163 : runtime::Task &last,
164 : const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
165 : const std::vector<size_t> &n_threads,
166 : const std::vector<size_t> &synchro_buffer_sizes,
167 : const std::vector<bool> &synchro_active_waiting,
168 : const std::vector<bool> &thread_pinning,
169 : const std::vector<std::vector<size_t>> &puids/*,
170 0 : const std::vector<bool> &tasks_inplace*/)
171 : : Pipeline({&first},
172 : {&last},
173 : sep_stages,
174 : n_threads,
175 : synchro_buffer_sizes,
176 : synchro_active_waiting,
177 : thread_pinning, puids/*,
178 0 : tasks_inplace*/)
179 : {
180 0 : }
181 :
182 26 : Pipeline
183 : ::Pipeline(runtime::Task &first,
184 : const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
185 : const std::vector<size_t> &n_threads,
186 : const std::vector<size_t> &synchro_buffer_sizes,
187 : const std::vector<bool> &synchro_active_waiting,
188 : const std::vector<bool> &thread_pinning,
189 : const std::vector<std::vector<size_t>> &puids/*,
190 26 : const std::vector<bool> &tasks_inplace*/)
191 : : Pipeline({&first},
192 : sep_stages,
193 : n_threads,
194 : synchro_buffer_sizes,
195 : synchro_active_waiting,
196 : thread_pinning, puids/*,
197 26 : tasks_inplace*/)
198 : {
199 26 : }
200 :
201 : //========================== Pipeline constructors with new version thread pinning =====================================
202 0 : Pipeline
203 : ::Pipeline(const std::vector<runtime::Task*> &firsts,
204 : const std::vector<runtime::Task*> &lasts,
205 : const std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
206 : const std::vector<size_t> &n_threads,
207 : const std::vector<size_t> &synchro_buffer_sizes,
208 : const std::vector<bool> &synchro_active_waiting,
209 : const std::vector<bool> &thread_pinning,
210 : const std::string &pipeline_pinning_policy/*,
211 0 : const std::vector<bool> &tasks_inplace*/)
212 0 : : original_sequence(firsts, lasts, 1),
213 0 : stages(sep_stages.size()),
214 0 : adaptors(sep_stages.size() -1),
215 0 : saved_firsts_tasks_id(sep_stages.size()),
216 0 : saved_lasts_tasks_id(sep_stages.size()),
217 0 : bound_adaptors(false),
218 0 : auto_stop(true)
219 : {
220 0 : this->init<runtime::Task>(firsts,
221 : lasts,
222 : sep_stages,
223 : n_threads,
224 : synchro_buffer_sizes,
225 : synchro_active_waiting,
226 : thread_pinning, {}, pipeline_pinning_policy/*,
227 : tasks_inplace*/);
228 0 : }
229 :
230 41 : Pipeline
231 : ::Pipeline(const std::vector<runtime::Task*> &firsts,
232 : const std::vector<runtime::Task*> &lasts,
233 : const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
234 : const std::vector<size_t> &n_threads,
235 : const std::vector<size_t> &synchro_buffer_sizes,
236 : const std::vector<bool> &synchro_active_waiting,
237 : const std::vector<bool> &thread_pinning,
238 : const std::string &pipeline_pinning_policy/*,
239 41 : const std::vector<bool> &tasks_inplace*/)
240 41 : : original_sequence(firsts, lasts, 1),
241 41 : stages(sep_stages.size()),
242 41 : adaptors(sep_stages.size() -1),
243 41 : saved_firsts_tasks_id(sep_stages.size()),
244 41 : saved_lasts_tasks_id(sep_stages.size()),
245 41 : bound_adaptors(false),
246 82 : auto_stop(true)
247 : {
248 : std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>>
249 41 : sep_stages_bis;
250 183 : for (auto& sep_stage : sep_stages)
251 142 : sep_stages_bis.push_back(std::make_tuple(sep_stage.first, sep_stage.second, std::vector<runtime::Task*>()));
252 :
253 41 : this->init<runtime::Task>(firsts,
254 : lasts,
255 : sep_stages_bis,
256 : n_threads,
257 : synchro_buffer_sizes,
258 : synchro_active_waiting,
259 : thread_pinning, {}, pipeline_pinning_policy/*,
260 : tasks_inplace*/);
261 41 : }
262 :
263 0 : Pipeline
264 : ::Pipeline(const std::vector<runtime::Task*> &firsts,
265 : const std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
266 : const std::vector<size_t> &n_threads,
267 : const std::vector<size_t> &synchro_buffer_sizes,
268 : const std::vector<bool> &synchro_active_waiting,
269 : const std::vector<bool> &thread_pinning,
270 : const std::string &pipeline_pinning_policy/*,
271 0 : const std::vector<bool> &tasks_inplace*/)
272 : : Pipeline(firsts,
273 : {},
274 : sep_stages,
275 : n_threads,
276 : synchro_buffer_sizes,
277 : synchro_active_waiting,
278 : thread_pinning, pipeline_pinning_policy/*,
279 0 : tasks_inplace*/)
280 : {
281 0 : }
282 :
283 30 : Pipeline
284 : ::Pipeline(const std::vector<runtime::Task*> &firsts,
285 : const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
286 : const std::vector<size_t> &n_threads,
287 : const std::vector<size_t> &synchro_buffer_sizes,
288 : const std::vector<bool> &synchro_active_waiting,
289 : const std::vector<bool> &thread_pinning,
290 : const std::string &pipeline_pinning_policy/*,
291 30 : const std::vector<bool> &tasks_inplace*/)
292 : : Pipeline(firsts,
293 : {},
294 : sep_stages,
295 : n_threads,
296 : synchro_buffer_sizes,
297 : synchro_active_waiting,
298 : thread_pinning, pipeline_pinning_policy/*,
299 30 : tasks_inplace*/)
300 : {
301 30 : }
302 :
303 0 : Pipeline
304 : ::Pipeline(runtime::Task &first,
305 : runtime::Task &last,
306 : const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
307 : const std::vector<size_t> &n_threads,
308 : const std::vector<size_t> &synchro_buffer_sizes,
309 : const std::vector<bool> &synchro_active_waiting,
310 : const std::vector<bool> &thread_pinning,
311 : const std::string &pipeline_pinning_policy/*,
312 0 : const std::vector<bool> &tasks_inplace*/)
313 : : Pipeline({&first},
314 : {&last},
315 : sep_stages,
316 : n_threads,
317 : synchro_buffer_sizes,
318 : synchro_active_waiting,
319 : thread_pinning, pipeline_pinning_policy/*,
320 0 : tasks_inplace*/)
321 : {
322 0 : }
323 :
324 30 : Pipeline
325 : ::Pipeline(runtime::Task &first,
326 : const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
327 : const std::vector<size_t> &n_threads,
328 : const std::vector<size_t> &synchro_buffer_sizes,
329 : const std::vector<bool> &synchro_active_waiting,
330 : const std::vector<bool> &thread_pinning,
331 : const std::string &pipeline_pinning_policy/*,
332 30 : const std::vector<bool> &tasks_inplace*/)
333 : : Pipeline({&first},
334 : sep_stages,
335 : n_threads,
336 : synchro_buffer_sizes,
337 : synchro_active_waiting,
338 : thread_pinning,
339 : pipeline_pinning_policy/*,
340 30 : tasks_inplace*/)
341 : {
342 30 : }
343 :
344 160 : Pipeline::~Pipeline()
345 : {
346 80 : this->unbind_adaptors();
347 160 : }
348 :
349 : std::vector<Sequence*>
350 159 : Pipeline::get_stages()
351 : {
352 159 : std::vector<Sequence*> stages;
353 748 : for (auto& stage : this->stages)
354 589 : stages.push_back(stage.get());
355 159 : return stages;
356 0 : }
357 :
358 : Sequence&
359 0 : Pipeline::operator[](const size_t stage_id)
360 : {
361 0 : assert(stage_id < this->stages.size());
362 0 : return *this->stages[stage_id];
363 : }
364 :
365 : template<class TA>
366 : runtime::Sequence*
367 : create_sequence(const std::vector<TA*>& firsts,
368 : const std::vector<TA*>& lasts,
369 : const std::vector<TA*>& exclusions,
370 : const size_t& n_threads,
371 : const bool& thread_pinning,
372 : const std::vector<size_t>& puids,
373 : const bool& tasks_inplace)
374 : {
375 : throw tools::unimplemented_error(__FILE__, __LINE__, __func__);
376 : }
377 :
378 : template<>
379 : runtime::Sequence*
380 0 : create_sequence<const runtime::Task>(const std::vector<const runtime::Task*>& firsts,
381 : const std::vector<const runtime::Task*>& lasts,
382 : const std::vector<const runtime::Task*>& exclusions,
383 : const size_t& n_threads,
384 : const bool& thread_pinning,
385 : const std::vector<size_t>& puids,
386 : const bool& tasks_inplace)
387 : {
388 0 : return new runtime::Sequence(firsts, lasts, exclusions, n_threads, thread_pinning, puids);
389 : }
390 :
391 : template<>
392 : Sequence*
393 248 : create_sequence<runtime::Task>(const std::vector<runtime::Task*>& firsts,
394 : const std::vector<runtime::Task*>& lasts,
395 : const std::vector<runtime::Task*>& exclusions,
396 : const size_t& n_threads,
397 : const bool& thread_pinning,
398 : const std::vector<size_t>& puids,
399 : const bool& tasks_inplace)
400 : {
401 248 : return new runtime::Sequence(firsts, lasts, exclusions, n_threads, thread_pinning, puids, tasks_inplace);
402 : }
403 :
404 : // Init and sequence creation for second pinning version
405 : template<class TA>
406 : runtime::Sequence*
407 : create_sequence(const std::vector<TA*>& firsts,
408 : const std::vector<TA*>& lasts,
409 : const std::vector<TA*>& exclusions,
410 : const size_t& n_threads,
411 : const bool& thread_pinning,
412 : const std::string& pipeline_pinning_policy,
413 : const bool& tasks_inplace)
414 : {
415 : throw tools::unimplemented_error(__FILE__, __LINE__, __func__);
416 : }
417 :
418 : template<>
419 : runtime::Sequence*
420 0 : create_sequence<const runtime::Task>(const std::vector<const runtime::Task*>& firsts,
421 : const std::vector<const runtime::Task*>& lasts,
422 : const std::vector<const runtime::Task*>& exclusions,
423 : const size_t& n_threads,
424 : const bool& thread_pinning,
425 : const std::string& pipeline_pinning_policy,
426 : const bool& tasks_inplace)
427 : {
428 0 : return new runtime::Sequence(firsts, lasts, exclusions, n_threads, thread_pinning, pipeline_pinning_policy);
429 : }
430 :
431 : template<>
432 : Sequence*
433 34 : create_sequence<runtime::Task>(const std::vector<runtime::Task*>& firsts,
434 : const std::vector<runtime::Task*>& lasts,
435 : const std::vector<runtime::Task*>& exclusions,
436 : const size_t& n_threads,
437 : const bool& thread_pinning,
438 : const std::string& pipeline_pinning_policy,
439 : const bool& tasks_inplace)
440 : {
441 : return new runtime::Sequence(
442 34 : firsts, lasts, exclusions, n_threads, thread_pinning, pipeline_pinning_policy, tasks_inplace);
443 : }
444 :
445 : template <class TA>
446 80 : void Pipeline
447 : ::init(const std::vector<TA*> &firsts,
448 : const std::vector<TA*> &lasts,
449 : const std::vector<std::tuple<std::vector<TA*>,std::vector<TA*>,std::vector<TA*>>> &sep_stages,
450 : const std::vector<size_t> &n_threads,
451 : const std::vector<size_t> &synchro_buffer_sizes,
452 : const std::vector<bool> &synchro_active_waiting,
453 : const std::vector<bool> &thread_pinning,
454 : const std::vector<std::vector<size_t>> &puids,
455 : const std::string &pipeline_pinning_policy/*,
456 : const std::vector<bool> &tasks_inplace*/)
457 : {
458 80 : if (sep_stages.size() != n_threads.size() && n_threads.size() != 0)
459 : {
460 0 : std::stringstream message;
461 0 : message << "'n_threads.size()' has to be equal to 'sep_stages.size()' or equal to '0' ('n_threads.size()' = "
462 0 : << n_threads.size() << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
463 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
464 0 : }
465 :
466 80 : if (sep_stages.size() != synchro_buffer_sizes.size() + 1 && synchro_buffer_sizes.size() != 0)
467 : {
468 0 : std::stringstream message;
469 : message << "'synchro_buffer_sizes.size()' has to be equal to 'sep_stages.size() -1' or equal to '0' "
470 0 : << "('synchro_buffer_sizes.size()' = " << synchro_buffer_sizes.size()
471 0 : << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
472 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
473 0 : }
474 :
475 80 : if (sep_stages.size() != synchro_active_waiting.size() + 1 && synchro_active_waiting.size() != 0)
476 : {
477 0 : std::stringstream message;
478 : message << "'synchro_active_waiting.size()' has to be equal to 'sep_stages.size() -1' or equal to '0' "
479 0 : << "('synchro_active_waiting.size()' = " << synchro_active_waiting.size()
480 0 : << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
481 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
482 0 : }
483 :
484 80 : if (sep_stages.size() != thread_pinning.size() && thread_pinning.size() != 0)
485 : {
486 0 : std::stringstream message;
487 : message << "'thread_pinning.size()' has to be equal to 'sep_stages.size()' or equal to '0' ("
488 0 : << "'thread_pinning.size()' = " << thread_pinning.size()
489 0 : << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
490 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
491 0 : }
492 :
493 80 : if (sep_stages.size() != puids.size() && puids.size() != 0)
494 : {
495 0 : std::stringstream message;
496 0 : message << "'puids.size()' has to be equal to 'sep_stages.size()' or equal to '0' ('puids.size()' = "
497 0 : << puids.size() << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
498 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
499 0 : }
500 :
501 : // if (sep_stages.size() != tasks_inplace.size() && tasks_inplace.size() != 0)
502 : // {
503 : // std::stringstream message;
504 : // message << "'tasks_inplace.size()' has to be equal to 'sep_stages.size()' or equal to '0' ('"
505 : // << "tasks_inplace.size()' = " << tasks_inplace.size() << " , 'sep_stages.size()' = "
506 : // << sep_stages.size() << ").";
507 : // throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
508 : // }
509 :
510 80 : bool prev_is_parallel = false;
511 362 : for (auto t : n_threads)
512 : {
513 282 : if (t > 1 && prev_is_parallel)
514 : {
515 0 : std::stringstream message;
516 0 : message << "Consecutive parallel stages are not supported.";
517 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
518 0 : }
519 282 : prev_is_parallel = t > 1;
520 : }
521 :
522 : // Creating a vector of pinning policies for each sequence
523 80 : std::vector<std::string> sequences_pinning_policies;
524 80 : if (!pipeline_pinning_policy.empty())
525 13 : sequences_pinning_policies =
526 : tools::Thread_pinning_utils::pipeline_parser_unpacker(pipeline_pinning_policy, sep_stages.size());
527 :
528 362 : for (size_t s = 0; s < sep_stages.size(); s++)
529 : {
530 282 : const std::vector<TA*>& stage_firsts = std::get<0>(sep_stages[s]);
531 282 : const std::vector<TA*>& stage_lasts = std::get<1>(sep_stages[s]);
532 282 : const std::vector<TA*>& stage_exclusions = std::get<2>(sep_stages[s]);
533 282 : const size_t stage_n_threads = n_threads.size() ? n_threads[s] : 1;
534 282 : const bool stage_thread_pinning = thread_pinning.size() ? thread_pinning[s] : false;
535 282 : const std::vector<size_t> stage_puids = puids.size() ? puids[s] : std::vector<size_t>();
536 316 : const std::string sequence_pinning_policy =
537 316 : sequences_pinning_policies.size() ? sequences_pinning_policies[s] : "";
538 282 : const bool stage_tasks_inplace = /*tasks_inplace.size() ? tasks_inplace[s] :*/ true;
539 : try
540 : {
541 282 : if (pipeline_pinning_policy.empty())
542 248 : this->stages[s].reset(create_sequence<TA>(stage_firsts,
543 : stage_lasts,
544 : stage_exclusions,
545 : stage_n_threads,
546 : stage_thread_pinning,
547 : stage_puids,
548 : stage_tasks_inplace));
549 : else
550 34 : this->stages[s].reset(create_sequence<TA>(stage_firsts,
551 : stage_lasts,
552 : stage_exclusions,
553 : stage_n_threads,
554 : stage_thread_pinning,
555 : sequence_pinning_policy,
556 : stage_tasks_inplace));
557 : }
558 0 : catch (const tools::control_flow_error& e)
559 : {
560 0 : std::stringstream message;
561 0 : message << "Invalid control flow error on stage " << s
562 0 : << " (perhaps a switcher's tasks were separated between different stages)." << std::endl
563 0 : << e.what();
564 0 : throw tools::control_flow_error(__FILE__, __LINE__, __func__, message.str());
565 0 : }
566 282 : this->stages[s]->is_part_of_pipeline = true;
567 : }
568 :
569 : // verify that the sequential sequence is equivalent to the pipeline sequence
570 80 : auto ref_tasks = this->original_sequence.get_tasks_per_threads()[0];
571 80 : auto cur_tasks = this->get_tasks_per_threads()[0];
572 :
573 80 : if (ref_tasks.size() != cur_tasks.size())
574 : {
575 0 : std::ofstream f1("dbg_ref_sequence.dot");
576 0 : this->original_sequence.export_dot(f1);
577 0 : std::ofstream f2("dbg_cur_pipeline.dot");
578 0 : this->export_dot(f2);
579 :
580 0 : std::stringstream message;
581 0 : message << "'ref_tasks.size()' has to be equal to 'cur_tasks.size()' ('ref_tasks.size()' = " << ref_tasks.size()
582 0 : << ", 'cur_tasks.size()' = " << cur_tasks.size() << ").";
583 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
584 0 : }
585 :
586 820 : for (size_t ta = 0; ta < cur_tasks.size(); ta++)
587 : {
588 740 : if (std::find(ref_tasks.begin(), ref_tasks.end(), cur_tasks[ta]) == ref_tasks.end())
589 : {
590 0 : std::ofstream f1("dbg_ref_sequence.dot");
591 0 : this->original_sequence.export_dot(f1);
592 0 : std::ofstream f2("dbg_cur_pipeline.dot");
593 0 : this->export_dot(f2);
594 :
595 0 : std::stringstream message;
596 0 : message << "'cur_tasks[ta]' is not contained in the 'ref_tasks' vector ('ta' = " << ta
597 0 : << ", 'cur_tasks[ta]' = " << +cur_tasks[ta]
598 0 : << ", 'cur_tasks[ta]->get_name()' = " << cur_tasks[ta]->get_name() << ").";
599 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
600 0 : }
601 : }
602 :
603 80 : this->create_adaptors(synchro_buffer_sizes, synchro_active_waiting);
604 80 : this->bind_adaptors();
605 80 : }
606 :
607 : void
608 80 : Pipeline::create_adaptors(const std::vector<size_t>& synchro_buffer_sizes,
609 : const std::vector<bool>& synchro_active_waiting)
610 : {
611 : // sck out addr occ stage tsk id sck id
612 80 : std::vector<std::tuple<runtime::Socket*, size_t, size_t, size_t, size_t>> out_sck_orphans;
613 :
614 : // for all the stages in the pipeline
615 282 : for (size_t sta = 0; sta < this->stages.size() - 1; sta++)
616 : {
617 : // ------------------------------------------------------------------------------------------------------------
618 : // --------------------------------------------------------------- collect orphan output sockets in stage 'sta'
619 : // ------------------------------------------------------------------------------------------------------------
620 202 : std::vector<std::vector<runtime::Task*>> tasks_per_threads = this->stages[sta]->get_tasks_per_threads();
621 : // for all the threads in the current stage
622 : // for (size_t t = 0; t < tasks_per_threads.size(); t++)
623 202 : size_t t = 0;
624 : {
625 : // for all the tasks in the stage
626 746 : for (size_t tsk_id = 0; tsk_id < tasks_per_threads[t].size(); tsk_id++)
627 : {
628 544 : auto tsk = tasks_per_threads[t][tsk_id];
629 : // for all the sockets of the tasks
630 1914 : for (size_t sck_id = 0; sck_id < tsk->sockets.size(); sck_id++)
631 : {
632 1370 : auto sck = tsk->sockets[sck_id];
633 : // if the current socket is an output or forward socket type
634 1370 : if (sck->get_type() == socket_t::SOUT || sck->get_type() == socket_t::SFWD)
635 : {
636 : // for all the bounded sockets to the current socket
637 1788 : for (auto bsck : sck->get_bound_sockets())
638 : {
639 : // check if the task of the bounded socket is not in the current stage
640 683 : if (std::find(tasks_per_threads[t].begin(),
641 683 : tasks_per_threads[t].end(),
642 1366 : &bsck->get_task()) == tasks_per_threads[t].end())
643 : {
644 : // check the position of the socket in the orphans
645 331 : size_t pos = 0;
646 904 : for (; pos < out_sck_orphans.size(); pos++)
647 616 : if (std::get<0>(out_sck_orphans[pos]) == sck.get()) break;
648 :
649 331 : if (pos == out_sck_orphans.size())
650 288 : out_sck_orphans.push_back(std::make_tuple(sck.get(), 1, sta, tsk_id, sck_id));
651 : else
652 43 : std::get<1>(out_sck_orphans[pos])++;
653 : }
654 : }
655 : }
656 1370 : }
657 : }
658 : }
659 :
660 : // ------------------------------------------------------------------------------------------------------------
661 : // -------------------------------------- collect orphan input sockets in stage 'sta +1' and create connections
662 : // ------------------------------------------------------------------------------------------------------------
663 202 : tasks_per_threads = this->stages[sta + 1]->get_tasks_per_threads();
664 : // for all the threads in the current stage
665 : // for (size_t t = 0; t < tasks_per_threads.size(); t++)
666 : {
667 : // for all the tasks in the stage
668 791 : for (size_t tsk_id = 0; tsk_id < tasks_per_threads[t].size(); tsk_id++)
669 : {
670 589 : auto tsk = tasks_per_threads[t][tsk_id];
671 : // ----------------------------------------- manage socket to socket bindings (with fake input sockets)
672 : // for all the sockets of the tasks
673 2056 : for (size_t sck_id = 0; sck_id < tsk->sockets.size(); sck_id++)
674 : {
675 1467 : auto sck = tsk->sockets[sck_id];
676 : // if the current socket is an input or forward socket type
677 1467 : if (sck->get_type() == socket_t::SIN || sck->get_type() == socket_t::SFWD)
678 : {
679 596 : runtime::Socket* bsck = nullptr;
680 : try
681 : {
682 : // get output bounded socket
683 596 : bsck = &sck->get_bound_socket(); // can throw if there is no bounded socket
684 : }
685 0 : catch (const std::exception&)
686 : {
687 0 : }
688 596 : if (bsck != nullptr)
689 : {
690 : // check if the task of the bounded socket is not in the current stage
691 596 : if (std::find(tasks_per_threads[t].begin(),
692 596 : tasks_per_threads[t].end(),
693 1192 : &bsck->get_task()) == tasks_per_threads[t].end())
694 : {
695 : // check the position of the bounded socket in the orphans
696 305 : size_t pos = 0;
697 787 : for (; pos < out_sck_orphans.size(); pos++)
698 787 : if (std::get<0>(out_sck_orphans[pos]) == bsck) break;
699 :
700 305 : if (pos < out_sck_orphans.size())
701 : {
702 305 : auto sck_out = std::get<0>(out_sck_orphans[pos]);
703 305 : auto sck_in = sck.get();
704 610 : auto unbind_sout_pos = std::distance(sck_out->get_bound_sockets().begin(),
705 305 : std::find(sck_out->get_bound_sockets().begin(),
706 305 : sck_out->get_bound_sockets().end(),
707 305 : sck_in));
708 305 : this->sck_orphan_binds.push_back(
709 305 : std::make_pair(std::make_tuple(std::get<0>(out_sck_orphans[pos]),
710 305 : std::get<2>(out_sck_orphans[pos]),
711 305 : std::get<3>(out_sck_orphans[pos]),
712 305 : std::get<4>(out_sck_orphans[pos]),
713 : unbind_sout_pos),
714 610 : std::make_tuple(sck_in, sta + 1, tsk_id, sck_id, nullptr)));
715 : }
716 : }
717 : }
718 : }
719 1467 : }
720 : // ------------------------------------------- manage socket to task bindings (with fake input sockets)
721 717 : for (size_t sck_id = 0; sck_id < tsk->fake_input_sockets.size(); sck_id++)
722 : {
723 128 : auto sck = tsk->fake_input_sockets[sck_id];
724 128 : runtime::Socket* bsck = nullptr;
725 : try
726 : {
727 : // get output bounded socket
728 128 : bsck = &sck->get_bound_socket(); // can throw if there is no bounded socket
729 : }
730 0 : catch (const std::exception&)
731 : {
732 0 : }
733 128 : if (bsck != nullptr)
734 : {
735 : // check if the task of the bounded socket is not in the current stage
736 128 : if (std::find(tasks_per_threads[t].begin(), tasks_per_threads[t].end(), &bsck->get_task()) ==
737 256 : tasks_per_threads[t].end())
738 : {
739 : // check the position of the bounded socket in the orphans
740 26 : size_t pos = 0;
741 117 : for (; pos < out_sck_orphans.size(); pos++)
742 117 : if (std::get<0>(out_sck_orphans[pos]) == bsck) break;
743 :
744 26 : if (pos < out_sck_orphans.size())
745 : {
746 26 : auto sck_out = std::get<0>(out_sck_orphans[pos]);
747 26 : auto sck_in = sck.get();
748 52 : auto unbind_sout_pos = std::distance(sck_out->get_bound_sockets().begin(),
749 26 : std::find(sck_out->get_bound_sockets().begin(),
750 26 : sck_out->get_bound_sockets().end(),
751 26 : sck_in));
752 26 : this->sck_orphan_binds.push_back(
753 26 : std::make_pair(std::make_tuple(std::get<0>(out_sck_orphans[pos]),
754 26 : std::get<2>(out_sck_orphans[pos]),
755 26 : std::get<3>(out_sck_orphans[pos]),
756 26 : std::get<4>(out_sck_orphans[pos]),
757 : unbind_sout_pos),
758 52 : std::make_tuple(nullptr, sta + 1, tsk_id, sck_id, tsk)));
759 : }
760 : }
761 : }
762 128 : }
763 : // ----------------------------------------------------------------------------------------------------
764 : }
765 : }
766 202 : }
767 :
768 : // ----------------------------------------------------------------------------------------------------------------
769 : // ----------------------------------------------------------------------------------------------- prints for debug
770 : // ----------------------------------------------------------------------------------------------------------------
771 : // std::cout << "Orphan output sockets list:" << std::endl;
772 : // for (auto &sck : out_sck_orphans)
773 : // {
774 : // auto sck_out_name = std::get<0>(sck)->get_name();
775 : // auto tsk_out_name = std::get<0>(sck)->get_task().get_name();
776 : // auto sck_out_occ = std::get<1>(sck);
777 : // auto tsk_out_sta = std::get<2>(sck);
778 : // auto tsk_out_id = std::get<3>(sck);
779 : // auto sck_out_id = std::get<4>(sck);
780 :
781 : // std::cout << " " << tsk_out_name << "[" << sck_out_name << "] (stage " << tsk_out_sta << ", " << sck_out_occ
782 : // << " occurrences, tsk id = " << tsk_out_id << ", sck id = " << sck_out_id << ")" << std::endl;
783 : // }
784 :
785 : // std::cout << std::endl << "Detected socket binds:" << std::endl;
786 : // for (auto &bind : this->sck_orphan_binds)
787 : // {
788 : // auto sck_out_name = std::get<0>(bind.first)->get_name();
789 : // auto tsk_out_name = std::get<0>(bind.first)->get_task().get_name();
790 : // auto tsk_out_sta = std::get<1>(bind.first);
791 : // auto tsk_out_id = std::get<2>(bind.first);
792 : // auto sck_out_id = std::get<3>(bind.first);
793 : // auto sck_out_ubp = std::get<4>(bind.first);
794 :
795 : // auto sck_in_name = std::get<0>(bind.second)->get_name();
796 : // auto tsk_in_name = std::get<0>(bind.second)->get_task().get_name();
797 : // auto tsk_in_sta = std::get<1>(bind.second);
798 : // auto tsk_in_id = std::get<2>(bind.second);
799 : // auto sck_in_id = std::get<3>(bind.second);
800 :
801 : // std::cout << " " << tsk_out_name << "[" << sck_out_name << "] (stage " << tsk_out_sta << ", tsk id = "
802 : // << tsk_out_id << ", sck id = " << sck_out_id << ", ubp = " << sck_out_ubp << ")" << " -> "
803 : // << tsk_in_name << "[" << sck_in_name << "] (stage " << tsk_in_sta << ", tsk id = "
804 : // << tsk_in_id << ", sck id = " << sck_in_id << ")" << std::endl;
805 : // }
806 :
807 : // ----------------------------------------------------------------------------------------------------------------
808 : // ------------------------------------------------------------------------------------------------ create adaptors
809 : // ----------------------------------------------------------------------------------------------------------------
810 80 : auto sck_orphan_binds_cpy = this->sck_orphan_binds;
811 80 : module::Adaptor* adp = nullptr;
812 80 : std::map<runtime::Socket*, size_t> sck_to_adp_sck_id;
813 362 : for (size_t sta = 0; sta < this->stages.size(); sta++)
814 : {
815 282 : const auto n_threads = this->stages[sta]->get_n_threads();
816 282 : std::vector<std::vector<runtime::Task*>> tasks_per_threads = this->stages[sta]->get_tasks_per_threads();
817 :
818 : // ------------------------------------------------------------------------------------------------------------
819 : // ----------------------------------------------------------------------------------------------- pull adaptor
820 : // ------------------------------------------------------------------------------------------------------------
821 282 : if (sta > 0)
822 : {
823 202 : assert(adp != nullptr);
824 : // sck out addr stage tsk id sck id unbind_pos
825 : std::vector<std::pair<std::tuple<runtime::Socket*, size_t, size_t, size_t, size_t>,
826 : // sck in addr stage tsk id sck id tsk in addr
827 : std::tuple<runtime::Socket*, size_t, size_t, size_t, runtime::Task*>>>
828 202 : sck_orphan_binds_new;
829 :
830 202 : auto n_threads_prev_sta = this->stages[sta - 1]->get_n_threads();
831 2300 : for (size_t t = 0; t < n_threads; t++)
832 : {
833 2098 : module::Adaptor* cur_adp = (t == 0) ? adp : adp->clone();
834 6294 : for (auto& t : cur_adp->tasks)
835 4196 : t->set_fast(true);
836 2098 : if (t > 0)
837 : {
838 1896 : this->adaptors[sta - 1].second.push_back(std::unique_ptr<module::Adaptor>(cur_adp));
839 1896 : cur_adp->set_custom_name((n_threads_prev_sta == 1 ? "Adp_1_to_n_" : "Adp_n_to_1_") +
840 3792 : std::to_string(sta - 1));
841 : }
842 :
843 2098 : auto task_pull = n_threads_prev_sta == 1 ? &(*cur_adp)[(int)module::adp::tsk::pull_n]
844 88 : : &(*cur_adp)[(int)module::adp::tsk::pull_1];
845 :
846 2098 : sck_orphan_binds_new.clear();
847 12883 : for (auto& bind : sck_orphan_binds_cpy)
848 : {
849 10785 : auto tsk_out_sta = std::get<1>(bind.first);
850 10785 : if (tsk_out_sta < sta)
851 : {
852 5856 : auto tsk_in_sta = std::get<1>(bind.second);
853 5856 : if (tsk_in_sta == sta)
854 : {
855 3131 : auto sck_out_ptr = std::get<0>(bind.first);
856 3131 : auto priority = std::get<4>(bind.first);
857 3131 : auto tsk_in_id = std::get<2>(bind.second);
858 3131 : auto sck_in_id = std::get<3>(bind.second);
859 3131 : runtime::Socket* sck_in = nullptr;
860 3131 : runtime::Task* tsk_in = nullptr;
861 3131 : if (std::get<0>(bind.second) != nullptr) // if socket to socket binding
862 2536 : sck_in = tasks_per_threads[t][tsk_in_id]->sockets[sck_in_id].get();
863 : else // if socket to task binding
864 595 : tsk_in = tasks_per_threads[t][tsk_in_id];
865 3131 : this->adaptors_binds.push_back(std::make_tuple(
866 6262 : task_pull->sockets[sck_to_adp_sck_id[sck_out_ptr]].get(), sck_in, priority, tsk_in));
867 : }
868 : else
869 2725 : sck_orphan_binds_new.push_back(bind);
870 : }
871 : else
872 4929 : sck_orphan_binds_new.push_back(bind);
873 : }
874 :
875 2098 : if (t > 0) this->stages[sta]->all_modules[t].push_back(cur_adp);
876 : }
877 202 : this->saved_firsts_tasks_id[sta] = this->stages[sta]->firsts_tasks_id;
878 202 : sck_orphan_binds_cpy = sck_orphan_binds_new;
879 202 : }
880 :
881 : // ------------------------------------------------------------------------------------------------------------
882 : // ----------------------------------------------------------------------------------------------- push adaptor
883 : // ------------------------------------------------------------------------------------------------------------
884 282 : std::map<runtime::Socket*, size_t> sck_to_adp_sck_id_new;
885 282 : if (sta < this->stages.size() - 1)
886 : {
887 202 : std::vector<size_t> adp_n_elmts;
888 202 : std::vector<std::type_index> adp_datatype;
889 202 : size_t adp_buffer_size = synchro_buffer_sizes.size() ? synchro_buffer_sizes[sta] : 1;
890 202 : bool adp_active_waiting = synchro_active_waiting.size() ? synchro_active_waiting[sta] : false;
891 202 : size_t adp_n_frames = 1;
892 :
893 : // a map to remember if a passed socket points already to the same memory space
894 202 : std::map<void*, size_t> fwd_source;
895 :
896 202 : std::vector<runtime::Socket*> passed_scks_out;
897 909 : for (auto& bind : sck_orphan_binds_cpy)
898 : {
899 707 : auto tsk_out_sta = std::get<1>(bind.first);
900 707 : if (tsk_out_sta <= sta)
901 : {
902 458 : auto sck_out = std::get<0>(bind.first);
903 458 : if (std::find(passed_scks_out.begin(), passed_scks_out.end(), sck_out) == passed_scks_out.end())
904 : {
905 : // avoid the creation of new adaptor sockets for forward sockets pointing to the same memory
906 : // space
907 406 : auto sck_out_dptr = (void*)sck_out->get_dataptr<int8_t>();
908 406 : assert(sck_out_dptr != nullptr);
909 406 : if (fwd_source.find(sck_out_dptr) == fwd_source.end())
910 : {
911 349 : fwd_source[sck_out_dptr] = 1;
912 349 : adp_n_frames = sck_out->get_task().get_module().get_n_frames();
913 349 : adp_n_elmts.push_back(sck_out->get_n_elmts() / adp_n_frames);
914 349 : adp_datatype.push_back(sck_out->get_datatype());
915 : }
916 406 : passed_scks_out.push_back(sck_out);
917 : }
918 : }
919 : }
920 202 : passed_scks_out.clear();
921 :
922 : // allocate the adaptor for the first thread
923 202 : if (n_threads == 1)
924 114 : adp = new module::Adaptor_1_to_n(adp_n_elmts, adp_datatype, adp_buffer_size, adp_active_waiting);
925 : else
926 88 : adp = new module::Adaptor_n_to_1(adp_n_elmts, adp_datatype, adp_buffer_size, adp_active_waiting);
927 202 : adp->set_n_frames(adp_n_frames);
928 :
929 2300 : for (size_t t = 0; t < n_threads; t++)
930 : {
931 2098 : module::Adaptor* cur_adp = (t == 0) ? adp : adp->clone();
932 2098 : cur_adp->set_custom_name((n_threads == 1 ? "Adp_1_to_n_" : "Adp_n_to_1_") + std::to_string(sta));
933 2098 : this->adaptors[sta].first.push_back(std::unique_ptr<module::Adaptor>(cur_adp));
934 2098 : auto task_push = n_threads == 1 ? &(*cur_adp)[(int)module::adp::tsk::push_1]
935 1984 : : &(*cur_adp)[(int)module::adp::tsk::push_n];
936 :
937 2098 : std::map<void*, size_t> fwd_source;
938 2098 : sck_to_adp_sck_id_new.clear();
939 2098 : size_t adp_sck_id = 0;
940 10083 : for (auto& bind : sck_orphan_binds_cpy) // bind standard task to last adaptor
941 : {
942 7985 : auto tsk_out_sta = std::get<1>(bind.first);
943 :
944 7985 : if (tsk_out_sta <= sta)
945 : {
946 6836 : auto sck_out_ptr = std::get<0>(bind.first);
947 6836 : auto sck_out_dptr = (void*)sck_out_ptr->get_dataptr<int8_t>();
948 6836 : assert(sck_out_dptr != nullptr);
949 :
950 6836 : if (std::find(passed_scks_out.begin(),
951 : passed_scks_out.end(),
952 19707 : sck_out_ptr) == passed_scks_out.end() &&
953 12871 : fwd_source.find(sck_out_dptr) == fwd_source.end()) // <= the latest condition is here to
954 : // avoid to bind adaptor sockets two
955 : // times the same memory space
956 : // (usefull in the case of multiple
957 : // fwd sockets pointing to the same
958 : // memory address)
959 : {
960 4838 : if (tsk_out_sta == sta)
961 : {
962 3283 : auto tsk_out_id = std::get<2>(bind.first);
963 3283 : auto sck_out_id = std::get<3>(bind.first);
964 3283 : auto sck_out = tasks_per_threads[t][tsk_out_id]->sockets[sck_out_id];
965 3283 : auto priority = std::get<4>(bind.first);
966 3283 : sck_to_adp_sck_id_new[sck_out_ptr] = adp_sck_id;
967 3283 : this->adaptors_binds.push_back(
968 3283 : std::make_tuple(sck_out.get(),
969 3283 : task_push->sockets[adp_sck_id++].get(),
970 : priority,
971 3283 : nullptr)); // <= only socket to socket binding is possible here
972 3283 : }
973 : else // if (tsk_out_sta < sta) // bind prev. adaptor to last adaptor
974 : {
975 1555 : auto n_threads_prev_sta = this->stages[sta - 1]->get_n_threads();
976 1555 : auto tsk_out_id = (n_threads_prev_sta == 1) ? (size_t)module::adp::tsk::pull_n
977 : : (size_t)module::adp::tsk::pull_1;
978 1555 : auto sck_out_id = sck_to_adp_sck_id[sck_out_ptr];
979 1555 : sck_to_adp_sck_id_new[sck_out_ptr] = adp_sck_id;
980 : auto adp_prev =
981 1555 : t == 0 ? this->adaptors[sta - 1].first[0] : this->adaptors[sta - 1].second[t - 1];
982 1555 : auto sck_out = (*adp_prev)[tsk_out_id].sockets[sck_out_id];
983 1555 : auto priority = std::get<4>(bind.first);
984 1555 : this->adaptors_binds.push_back(
985 1555 : std::make_tuple(sck_out.get(),
986 1555 : task_push->sockets[adp_sck_id++].get(),
987 : priority,
988 1555 : nullptr)); // <= only socket to socket binding is possible here
989 1555 : }
990 :
991 4838 : fwd_source[sck_out_dptr] = 1; // remember that this memory space has been connected to the
992 : // adaptor once
993 4838 : passed_scks_out.push_back(sck_out_ptr);
994 : }
995 : }
996 : }
997 2098 : passed_scks_out.clear();
998 2098 : }
999 202 : this->saved_lasts_tasks_id[sta] = this->stages[sta]->lasts_tasks_id;
1000 202 : }
1001 282 : sck_to_adp_sck_id = sck_to_adp_sck_id_new;
1002 282 : }
1003 80 : }
1004 :
1005 : void
1006 240 : Pipeline::bind_adaptors()
1007 : {
1008 240 : this->_bind_adaptors(true);
1009 240 : }
1010 :
1011 : void
1012 240 : Pipeline::_bind_adaptors(const bool bind_adaptors)
1013 : {
1014 240 : if (!this->bound_adaptors)
1015 : {
1016 1086 : for (size_t sta = 0; sta < this->stages.size(); sta++)
1017 : {
1018 846 : const auto n_threads = this->stages[sta]->get_n_threads();
1019 :
1020 : // --------------------------------------------------------------------------------------------------------
1021 : // ------------------------------------------------------------------------------------------- pull adaptor
1022 : // --------------------------------------------------------------------------------------------------------
1023 846 : if (sta > 0)
1024 : {
1025 606 : auto n_threads_prev_sta = this->stages[sta - 1]->get_n_threads();
1026 6900 : for (size_t t = 0; t < n_threads; t++)
1027 : {
1028 : module::Adaptor* cur_adp =
1029 6294 : t > 0 ? adaptors[sta - 1].second[t - 1].get() : adaptors[sta - 1].first[0].get();
1030 :
1031 6294 : if (t > 0 || sta == this->stages.size() - 1) // add the adaptor to the current stage
1032 5916 : this->stages[sta]->all_modules[t].push_back(cur_adp);
1033 :
1034 6294 : auto task_pull = n_threads_prev_sta == 1 ? &(*cur_adp)[(int)module::adp::tsk::pull_n]
1035 264 : : &(*cur_adp)[(int)module::adp::tsk::pull_1];
1036 :
1037 6294 : auto ss = this->stages[sta]->sequences[t]->get_contents();
1038 6294 : assert(ss != nullptr);
1039 6294 : ss->tasks.insert(ss->tasks.begin(), task_pull);
1040 6294 : ss->processes.insert(ss->processes.begin(),
1041 0 : [task_pull]() -> const int*
1042 : {
1043 0 : task_pull->exec();
1044 0 : const int* status = task_pull->sockets.back()->get_dataptr<const int>();
1045 0 : return status;
1046 : });
1047 6294 : this->stages[sta]->update_tasks_id(t);
1048 : }
1049 606 : this->stages[sta]->firsts_tasks_id.clear();
1050 606 : this->stages[sta]->firsts_tasks_id.push_back(0);
1051 606 : this->stages[sta]->n_tasks++;
1052 : }
1053 :
1054 : // --------------------------------------------------------------------------------------------------------
1055 : // ------------------------------------------------------------------------------------------- push adaptor
1056 : // --------------------------------------------------------------------------------------------------------
1057 846 : if (sta < this->stages.size() - 1)
1058 : {
1059 606 : size_t last_task_id = 0;
1060 6900 : for (size_t t = 0; t < n_threads; t++)
1061 : {
1062 6294 : module::Adaptor* cur_adp = adaptors[sta].first[t].get();
1063 :
1064 : // add the adaptor to the current stage
1065 6294 : this->stages[sta]->all_modules[t].push_back(cur_adp);
1066 :
1067 6294 : auto task_push = n_threads == 1 ? &(*cur_adp)[(int)module::adp::tsk::push_1]
1068 5952 : : &(*cur_adp)[(int)module::adp::tsk::push_n];
1069 :
1070 6294 : auto ss = this->stages[sta]->get_last_subsequence(t);
1071 6294 : assert(ss != nullptr);
1072 6294 : ss->tasks.push_back(task_push);
1073 6294 : ss->processes.push_back(
1074 0 : [task_push]() -> const int*
1075 : {
1076 0 : task_push->exec();
1077 0 : const int* status = task_push->sockets.back()->get_dataptr<const int>();
1078 0 : return status;
1079 : });
1080 6294 : last_task_id = ss->tasks_id[ss->tasks_id.size() - 1] + 1;
1081 6294 : ss->tasks_id.push_back(last_task_id);
1082 : }
1083 606 : this->stages[sta]->lasts_tasks_id.clear();
1084 606 : this->stages[sta]->lasts_tasks_id.push_back(last_task_id);
1085 606 : this->stages[sta]->n_tasks++;
1086 : }
1087 846 : this->stages[sta]->update_firsts_and_lasts_tasks();
1088 : }
1089 :
1090 : // ------------------------------------------------------------------------------------------------------------
1091 : // ---------------------------------------------------------------------------------------------- bind adaptors
1092 : // ------------------------------------------------------------------------------------------------------------
1093 1233 : for (auto& bind : this->sck_orphan_binds)
1094 : {
1095 993 : auto sck_out = std::get<0>(bind.first);
1096 993 : auto sck_in = std::get<0>(bind.second);
1097 993 : if (sck_in != nullptr) // if socket to socket unbinding
1098 915 : sck_in->unbind(*sck_out);
1099 : else // if socket to task unbinding
1100 : {
1101 78 : auto tsk_in = std::get<4>(bind.second);
1102 78 : assert(tsk_in != nullptr);
1103 78 : tsk_in->unbind(*sck_out);
1104 : }
1105 : }
1106 :
1107 240 : if (bind_adaptors)
1108 : {
1109 24147 : for (auto& bind : this->adaptors_binds)
1110 : {
1111 23907 : auto sck_out = std::get<0>(bind);
1112 23907 : auto sck_in = std::get<1>(bind);
1113 23907 : auto priority = std::get<2>(bind);
1114 23907 : if (sck_in != nullptr) // if socket to socket binding
1115 22122 : sck_in->_bind(*sck_out, priority);
1116 : else // if socket to task binding
1117 : {
1118 1785 : auto tsk_in = std::get<3>(bind);
1119 1785 : assert(tsk_in != nullptr);
1120 1785 : tsk_in->_bind(*sck_out, priority);
1121 : }
1122 : }
1123 : }
1124 :
1125 240 : this->bound_adaptors = true;
1126 : }
1127 240 : }
1128 :
1129 : void
1130 160 : Pipeline::unbind_adaptors()
1131 : {
1132 160 : this->_unbind_adaptors(true);
1133 160 : }
1134 :
1135 : void
1136 320 : Pipeline::_unbind_adaptors(const bool bind_orphans)
1137 : {
1138 320 : if (this->bound_adaptors)
1139 : {
1140 1086 : for (size_t sta = 0; sta < this->stages.size(); sta++)
1141 : {
1142 846 : const auto n_threads = this->stages[sta]->get_n_threads();
1143 :
1144 : // --------------------------------------------------------------------------------------------------------
1145 : // ------------------------------------------------------------------------------------------- pull adaptor
1146 : // --------------------------------------------------------------------------------------------------------
1147 846 : if (sta > 0)
1148 : {
1149 6900 : for (size_t t = 0; t < n_threads; t++)
1150 : {
1151 6294 : if (t > 0 || sta == this->stages.size() - 1) // rm the adaptor to the current stage
1152 5916 : this->stages[sta]->all_modules[t].pop_back();
1153 :
1154 6294 : auto ss = this->stages[sta]->sequences[t]->get_contents();
1155 6294 : assert(ss != nullptr);
1156 6294 : ss->tasks.erase(ss->tasks.begin());
1157 6294 : ss->processes.erase(ss->processes.begin());
1158 6294 : this->stages[sta]->update_tasks_id(t);
1159 : }
1160 606 : this->stages[sta]->firsts_tasks_id = this->saved_firsts_tasks_id[sta];
1161 606 : this->stages[sta]->n_tasks--;
1162 : }
1163 :
1164 : // --------------------------------------------------------------------------------------------------------
1165 : // ------------------------------------------------------------------------------------------- push adaptor
1166 : // --------------------------------------------------------------------------------------------------------
1167 846 : if (sta < this->stages.size() - 1)
1168 : {
1169 6900 : for (size_t t = 0; t < n_threads; t++)
1170 : {
1171 : // rm the adaptor to the current stage
1172 6294 : this->stages[sta]->all_modules[t].pop_back();
1173 :
1174 6294 : auto ss = this->stages[sta]->get_last_subsequence(t);
1175 6294 : assert(ss != nullptr);
1176 6294 : ss->tasks.pop_back();
1177 6294 : ss->processes.pop_back();
1178 6294 : ss->tasks_id.pop_back();
1179 : }
1180 606 : this->stages[sta]->lasts_tasks_id = this->saved_lasts_tasks_id[sta];
1181 606 : this->stages[sta]->n_tasks--;
1182 : }
1183 846 : this->stages[sta]->update_firsts_and_lasts_tasks();
1184 : }
1185 :
1186 : // ------------------------------------------------------------------------------------------------------------
1187 : // -------------------------------------------------------------------------------------------- unbind adaptors
1188 : // ------------------------------------------------------------------------------------------------------------
1189 24147 : for (auto& bind : this->adaptors_binds)
1190 : {
1191 23907 : auto sck_out = std::get<0>(bind);
1192 23907 : auto sck_in = std::get<1>(bind);
1193 23907 : if (sck_in != nullptr) // if socket to socket unbinding
1194 22122 : sck_in->unbind(*sck_out);
1195 : else // if socket to task unbinding
1196 : {
1197 1785 : auto tsk_in = std::get<3>(bind);
1198 1785 : assert(tsk_in != nullptr);
1199 1785 : tsk_in->unbind(*sck_out);
1200 : }
1201 : }
1202 :
1203 240 : if (bind_orphans)
1204 : {
1205 411 : for (auto& bind : this->sck_orphan_binds)
1206 : {
1207 331 : auto sck_out = std::get<0>(bind.first);
1208 331 : auto priority = std::get<4>(bind.first);
1209 331 : auto sck_in = std::get<0>(bind.second);
1210 331 : if (sck_in != nullptr) // if socket to socket binding
1211 305 : sck_in->_bind(*sck_out, priority);
1212 : else // if socket to task binding
1213 : {
1214 26 : auto tsk_in = std::get<4>(bind.second);
1215 26 : assert(tsk_in != nullptr);
1216 26 : tsk_in->_bind(*sck_out, priority);
1217 : }
1218 : }
1219 : }
1220 :
1221 240 : this->bound_adaptors = false;
1222 : }
1223 320 : }
1224 :
1225 : void
1226 0 : Pipeline::exec(const std::vector<std::function<bool(const std::vector<const int*>&)>>& stop_conditions)
1227 : {
1228 0 : if (stop_conditions.size() != this->stages.size() && stop_conditions.size() != 1)
1229 : {
1230 0 : std::stringstream message;
1231 0 : message << "'stop_conditions.size()' has to be equal to 'stages.size()' or to 1 ('stop_conditions.size()' = "
1232 0 : << stop_conditions.size() << ", 'stages.size()' = " << stages.size() << ").";
1233 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
1234 0 : }
1235 :
1236 0 : if (!this->bound_adaptors)
1237 : {
1238 0 : std::stringstream message;
1239 0 : message << "'bound_adaptors' has to be true to execute the pipeline.";
1240 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
1241 0 : }
1242 :
1243 0 : auto& stages = this->stages;
1244 0 : std::vector<std::thread> threads;
1245 0 : for (size_t s = 0; s < stages.size() - 1; s++)
1246 : {
1247 0 : const std::function<bool(const std::vector<const int*>&)>* stop_condition = nullptr;
1248 0 : if (stop_conditions.size() == this->stages.size()) stop_condition = &stop_conditions[s];
1249 :
1250 0 : threads.push_back(std::thread(
1251 0 : [&stages, s, stop_condition]()
1252 : {
1253 0 : if (stop_condition)
1254 0 : stages[s]->exec(*stop_condition);
1255 : else
1256 0 : stages[s]->exec();
1257 :
1258 : // send the signal to stop the next stage
1259 0 : const auto& tasks = stages[s + 1]->get_tasks_per_threads();
1260 0 : for (size_t th = 0; th < tasks.size(); th++)
1261 0 : for (size_t ta = 0; ta < tasks[th].size(); ta++)
1262 : {
1263 0 : auto m = dynamic_cast<module::Adaptor*>(&tasks[th][ta]->get_module());
1264 0 : if (m != nullptr)
1265 0 : if (tasks[th][ta]->get_name() == "pull_n" || tasks[th][ta]->get_name() == "pull_1")
1266 0 : m->cancel_waiting();
1267 : }
1268 0 : }));
1269 : }
1270 0 : stages[stages.size() - 1]->exec(stop_conditions[stop_conditions.size() - 1]);
1271 : // stop all the stages before
1272 0 : for (size_t notify_s = 0; notify_s < stages.size() - 1; notify_s++)
1273 0 : for (auto& m : stages[notify_s]->get_modules<tools::Interface_waiting>())
1274 0 : m->cancel_waiting();
1275 :
1276 0 : for (auto& t : threads)
1277 0 : t.join();
1278 :
1279 : // this is NOT made in the tools::Sequence::exec() to correctly flush the pipeline before restoring buffers
1280 : // initial configuration
1281 0 : for (auto& stage : this->stages)
1282 0 : if (stage->is_no_copy_mode())
1283 : {
1284 0 : stage->reset_no_copy_mode();
1285 0 : stage->gen_processes(false);
1286 : }
1287 :
1288 0 : for (auto& padps : this->adaptors)
1289 : {
1290 0 : for (auto& adp : padps.first)
1291 0 : adp->reset();
1292 0 : for (auto& adp : padps.second)
1293 0 : adp->reset();
1294 : }
1295 0 : }
1296 :
1297 : void
1298 80 : Pipeline::exec(const std::vector<std::function<bool()>>& stop_conditions)
1299 : {
1300 80 : if (stop_conditions.size() != this->stages.size() && stop_conditions.size() != 1)
1301 : {
1302 0 : std::stringstream message;
1303 0 : message << "'stop_conditions.size()' has to be equal to 'stages.size()' or to 1 ('stop_conditions.size()' = "
1304 0 : << stop_conditions.size() << ", 'stages.size()' = " << stages.size() << ").";
1305 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
1306 0 : }
1307 :
1308 80 : if (!this->bound_adaptors)
1309 : {
1310 0 : std::stringstream message;
1311 0 : message << "'bound_adaptors' has to be true to execute the pipeline.";
1312 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
1313 0 : }
1314 :
1315 80 : auto& stages = this->stages;
1316 80 : std::vector<std::thread> threads;
1317 282 : for (size_t s = 0; s < stages.size() - 1; s++)
1318 : {
1319 202 : const std::function<bool()>* stop_condition = nullptr;
1320 202 : if (stop_conditions.size() == this->stages.size()) stop_condition = &stop_conditions[s];
1321 :
1322 202 : threads.push_back(std::thread(
1323 599 : [&stages, s, stop_condition]()
1324 : {
1325 202 : if (stop_condition)
1326 0 : stages[s]->exec(*stop_condition);
1327 : else
1328 202 : stages[s]->exec();
1329 :
1330 : // send the signal to stop the next stage
1331 195 : const auto& tasks = stages[s + 1]->get_tasks_per_threads();
1332 2296 : for (size_t th = 0; th < tasks.size(); th++)
1333 15591 : for (size_t ta = 0; ta < tasks[th].size(); ta++)
1334 : {
1335 13483 : auto m = dynamic_cast<module::Adaptor*>(&tasks[th][ta]->get_module());
1336 13485 : if (m != nullptr)
1337 4115 : if (tasks[th][ta]->get_name() == "pull_n" || tasks[th][ta]->get_name() == "pull_1")
1338 2098 : m->cancel_waiting();
1339 : }
1340 201 : }));
1341 : }
1342 80 : stages[stages.size() - 1]->exec(stop_conditions[stop_conditions.size() - 1]);
1343 : // stop all the stages before
1344 282 : for (size_t notify_s = 0; notify_s < stages.size() - 1; notify_s++)
1345 6092 : for (auto& m : stages[notify_s]->get_modules<tools::Interface_waiting>())
1346 6092 : m->cancel_waiting();
1347 :
1348 282 : for (auto& t : threads)
1349 202 : t.join();
1350 :
1351 : // this is NOT made in the tools::Sequence::exec() to correctly flush the pipeline before restoring buffers
1352 : // initial configuration
1353 362 : for (auto& stage : this->stages)
1354 282 : if (stage->is_no_copy_mode())
1355 : {
1356 282 : stage->reset_no_copy_mode();
1357 282 : stage->gen_processes(false);
1358 : }
1359 :
1360 282 : for (auto& padps : this->adaptors)
1361 : {
1362 2300 : for (auto& adp : padps.first)
1363 2098 : adp->reset();
1364 2098 : for (auto& adp : padps.second)
1365 1896 : adp->reset();
1366 : }
1367 80 : }
1368 :
1369 : void
1370 0 : Pipeline::exec(std::function<bool(const std::vector<const int*>&)> stop_condition)
1371 : {
1372 0 : this->exec(std::vector<std::function<bool(const std::vector<const int*>&)>>(1, stop_condition));
1373 0 : }
1374 :
1375 : void
1376 80 : Pipeline::exec(std::function<bool()> stop_condition)
1377 : {
1378 80 : this->exec(std::vector<std::function<bool()>>(1, stop_condition));
1379 80 : }
1380 :
1381 : void
1382 11 : Pipeline::exec()
1383 : {
1384 6873 : this->exec([]() { return false; });
1385 11 : }
1386 :
1387 : std::vector<std::vector<module::Module*>>
1388 0 : Pipeline::get_modules_per_threads() const
1389 : {
1390 0 : std::vector<std::vector<module::Module*>> modules_per_threads;
1391 0 : for (auto& stage : this->stages)
1392 : {
1393 0 : auto modules_per_threads_add = stage->get_modules_per_threads();
1394 0 : if (modules_per_threads_add.size() > modules_per_threads.size())
1395 0 : modules_per_threads.resize(modules_per_threads_add.size());
1396 :
1397 0 : for (size_t t = 0; t < modules_per_threads_add.size(); t++)
1398 0 : modules_per_threads[t].insert(
1399 0 : modules_per_threads[t].end(), modules_per_threads_add[t].begin(), modules_per_threads_add[t].end());
1400 0 : }
1401 0 : return modules_per_threads;
1402 0 : }
1403 :
1404 : std::vector<std::vector<module::Module*>>
1405 0 : Pipeline::get_modules_per_types() const
1406 : {
1407 0 : std::vector<std::vector<module::Module*>> modules_per_types;
1408 0 : for (auto& stage : this->stages)
1409 : {
1410 0 : auto modules_per_types_add = stage->get_modules_per_types();
1411 0 : modules_per_types.insert(modules_per_types.end(), modules_per_types_add.begin(), modules_per_types_add.end());
1412 0 : }
1413 0 : return modules_per_types;
1414 0 : }
1415 :
1416 : std::vector<std::vector<module::Module*>>
1417 0 : Pipeline::get_original_modules() const
1418 : {
1419 0 : return this->original_sequence.get_modules_per_types();
1420 : }
1421 :
1422 : std::vector<std::vector<runtime::Task*>>
1423 80 : Pipeline::get_tasks_per_threads() const
1424 : {
1425 80 : std::vector<std::vector<runtime::Task*>> tasks_per_threads;
1426 362 : for (auto& stage : this->stages)
1427 : {
1428 282 : auto tasks_per_threads_add = stage->get_tasks_per_threads();
1429 282 : if (tasks_per_threads_add.size() > tasks_per_threads.size())
1430 147 : tasks_per_threads.resize(tasks_per_threads_add.size());
1431 :
1432 2471 : for (size_t t = 0; t < tasks_per_threads_add.size(); t++)
1433 8756 : tasks_per_threads[t].insert(
1434 8756 : tasks_per_threads[t].end(), tasks_per_threads_add[t].begin(), tasks_per_threads_add[t].end());
1435 282 : }
1436 80 : return tasks_per_threads;
1437 0 : }
1438 :
1439 : std::vector<std::vector<runtime::Task*>>
1440 0 : Pipeline::get_tasks_per_types() const
1441 : {
1442 0 : std::vector<std::vector<runtime::Task*>> tasks_per_types;
1443 0 : for (auto& stage : this->stages)
1444 : {
1445 0 : auto tasks_per_types_add = stage->get_tasks_per_types();
1446 0 : tasks_per_types.insert(tasks_per_types.end(), tasks_per_types_add.begin(), tasks_per_types_add.end());
1447 0 : }
1448 0 : return tasks_per_types;
1449 0 : }
1450 :
1451 : void
1452 5 : Pipeline::export_dot(std::ostream& stream) const
1453 : {
1454 : std::function<void(tools::Digraph_node<Sub_sequence>*,
1455 : const size_t,
1456 : const std::string&,
1457 : std::ostream&,
1458 : std::vector<tools::Digraph_node<Sub_sequence>*>&)>
1459 22 : export_dot_subsequences_recursive = [&export_dot_subsequences_recursive,
1460 : this](tools::Digraph_node<Sub_sequence>* cur_node,
1461 : const size_t sta,
1462 : const std::string& tab,
1463 : std::ostream& stream,
1464 22 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
1465 : {
1466 44 : if (cur_node != nullptr &&
1467 44 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
1468 : {
1469 22 : already_parsed_nodes.push_back(cur_node);
1470 44 : this->stages[sta]->export_dot_subsequence(cur_node->get_c()->tasks,
1471 22 : cur_node->get_c()->tasks_id,
1472 22 : cur_node->get_c()->type,
1473 44 : "Sub-sequence " + std::to_string(cur_node->get_c()->id) +
1474 88 : " (depth = " + std::to_string(cur_node->get_depth()) + ")",
1475 : tab,
1476 : stream);
1477 :
1478 22 : for (auto c : cur_node->get_children())
1479 0 : export_dot_subsequences_recursive(c, sta, tab, stream, already_parsed_nodes);
1480 : }
1481 27 : };
1482 :
1483 : std::function<void(tools::Digraph_node<Sub_sequence>*,
1484 : const size_t,
1485 : const std::string&,
1486 : std::ostream&,
1487 : std::vector<tools::Digraph_node<Sub_sequence>*>&)>
1488 : export_dot_connections_recursive =
1489 22 : [&export_dot_connections_recursive, this](tools::Digraph_node<Sub_sequence>* cur_node,
1490 : const size_t sta,
1491 : const std::string& tab,
1492 : std::ostream& stream,
1493 22 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
1494 : {
1495 44 : if (cur_node != nullptr &&
1496 44 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
1497 : {
1498 22 : already_parsed_nodes.push_back(cur_node);
1499 22 : this->stages[sta]->export_dot_connections(cur_node->get_c()->tasks, tab, stream);
1500 :
1501 22 : for (auto c : cur_node->get_children())
1502 0 : export_dot_connections_recursive(c, sta, tab, stream, already_parsed_nodes);
1503 : }
1504 27 : };
1505 :
1506 5 : std::string tab = "\t";
1507 5 : stream << "digraph Pipeline {" << std::endl;
1508 5 : stream << tab << "compound=true;" << std::endl;
1509 :
1510 27 : for (size_t sta = 0; sta < this->stages.size(); sta++)
1511 : {
1512 22 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
1513 22 : const auto n_threads = this->stages[sta]->get_n_threads();
1514 22 : stream << tab << "subgraph \"cluster_Stage " << sta << "\" {" << std::endl;
1515 22 : stream << tab << tab << "node [style=filled];" << std::endl;
1516 22 : export_dot_subsequences_recursive(this->stages[sta]->sequences[0], sta, tab, stream, already_parsed_nodes);
1517 22 : stream << tab << tab << "label=\"Pipeline stage " << sta << " (" << n_threads << " thread(s))\";" << std::endl;
1518 22 : std::string color = "blue";
1519 22 : stream << tab << tab << "color=" << color << ";" << std::endl;
1520 22 : stream << tab << "}" << std::endl;
1521 22 : }
1522 :
1523 27 : for (size_t sta = 0; sta < this->stages.size(); sta++)
1524 : {
1525 22 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
1526 22 : export_dot_connections_recursive(this->stages[sta]->sequences[0], sta, tab, stream, already_parsed_nodes);
1527 22 : if (this->bound_adaptors)
1528 : {
1529 22 : if (sta > 0)
1530 : {
1531 17 : auto tsk1 = this->stages[sta - 1]->get_lasts_tasks()[0].back();
1532 17 : auto tsk2 = this->stages[sta + 0]->get_firsts_tasks()[0][0];
1533 :
1534 17 : auto sck1 = tsk1->sockets[0];
1535 17 : auto sck2 = tsk2->sockets[0];
1536 :
1537 17 : stream << tab << "\"" << +sck1.get() << "\" -> \"" << +sck2.get() << "\" [ltail=\"cluster_"
1538 17 : << +&tsk1->get_module() << "_" << +tsk1 << "\" lhead=\"cluster_" << +&tsk2->get_module() << "_"
1539 17 : << +tsk2 << "\" color=\"green\" style=\"dashed\"];" << std::endl;
1540 17 : }
1541 : }
1542 22 : }
1543 :
1544 5 : stream << "}" << std::endl;
1545 5 : }
1546 :
1547 : bool
1548 160 : Pipeline::is_bound_adaptors() const
1549 : {
1550 160 : return this->bound_adaptors;
1551 : }
1552 :
1553 : void
1554 0 : Pipeline::set_auto_stop(const bool auto_stop)
1555 : {
1556 0 : this->auto_stop = auto_stop;
1557 0 : for (auto stage : this->stages)
1558 0 : stage->set_auto_stop(auto_stop);
1559 0 : }
1560 :
1561 : bool
1562 0 : Pipeline::is_auto_stop() const
1563 : {
1564 0 : return this->auto_stop;
1565 : }
1566 :
1567 : size_t
1568 0 : Pipeline::get_n_frames() const
1569 : {
1570 0 : const auto n_frames = this->stages[0]->get_n_frames();
1571 :
1572 0 : for (auto& sta : this->stages)
1573 0 : if (sta->get_n_frames() != n_frames)
1574 : {
1575 0 : std::stringstream message;
1576 0 : message << "All the stages/sequences do not have the same 'n_frames' value ('sta->get_n_frames()' = "
1577 0 : << sta->get_n_frames() << ", 'n_frames' = " << n_frames << ").";
1578 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
1579 0 : }
1580 :
1581 0 : return n_frames;
1582 : }
1583 :
1584 : void
1585 160 : Pipeline::set_n_frames(const size_t n_frames)
1586 : {
1587 160 : const auto save_bound_adaptors = this->is_bound_adaptors();
1588 160 : if (!save_bound_adaptors) this->bind_adaptors();
1589 160 : this->_unbind_adaptors(false);
1590 :
1591 : // set the new "n_frames" val in the sequences
1592 160 : std::vector<std::vector<std::pair<runtime::Socket*, runtime::Socket*>>> unbind_sockets(this->stages.size());
1593 160 : std::vector<std::vector<std::pair<runtime::Task*, runtime::Socket*>>> unbind_tasks(this->stages.size());
1594 160 : std::vector<bool> skip(this->stages.size());
1595 724 : for (size_t s = 0; s < this->stages.size(); s++)
1596 564 : skip[s] = this->stages[s]->get_n_frames() == n_frames;
1597 724 : for (size_t s = 0; s < this->stages.size(); s++)
1598 564 : if (!skip[s]) this->stages[s]->_set_n_frames_unbind(unbind_sockets[s], unbind_tasks[s]);
1599 724 : for (size_t s = 0; s < this->stages.size(); s++)
1600 564 : if (!skip[s]) this->stages[s]->_set_n_frames(n_frames);
1601 724 : for (size_t s = 0; s < this->stages.size(); s++)
1602 564 : if (!skip[s]) this->stages[s]->_set_n_frames_rebind(unbind_sockets[s], unbind_tasks[s]);
1603 :
1604 : // set the new "n_frames" val in the adaptors
1605 564 : for (auto& adps : this->adaptors)
1606 : {
1607 4600 : for (auto& adp : adps.first)
1608 4196 : adp->set_n_frames(n_frames);
1609 4196 : for (auto& adp : adps.second)
1610 3792 : adp->set_n_frames(n_frames);
1611 : }
1612 :
1613 : // bind orphans to complete the unbind of the adaptors
1614 822 : for (auto& bind : this->sck_orphan_binds)
1615 : {
1616 662 : auto sck_out = std::get<0>(bind.first);
1617 662 : auto priority = std::get<4>(bind.first);
1618 662 : auto sck_in = std::get<0>(bind.second);
1619 662 : if (sck_in != nullptr)
1620 610 : sck_in->_bind(*sck_out, priority);
1621 : else
1622 : {
1623 52 : auto tsk_in = std::get<4>(bind.second);
1624 52 : assert(tsk_in != nullptr);
1625 52 : tsk_in->_bind(*sck_out, priority);
1626 : }
1627 : }
1628 :
1629 160 : if (save_bound_adaptors) this->bind_adaptors();
1630 160 : }
|