Line data Source code
1 : #include <cassert>
2 : #include <fstream>
3 : #include <thread>
4 : #include <tuple>
5 : #include <utility>
6 :
7 : #include "Module/Stateful/Adaptor/Adaptor_m_to_n.hpp"
8 : #include "Runtime/Pipeline/Pipeline.hpp"
9 : #include "Tools/Exception/exception.hpp"
10 : #include "Tools/Interface/Interface_waiting.hpp"
11 : #include "Tools/Thread/Thread_pinning/Thread_pinning_utils.hpp"
12 : #include "Tools/Thread/Thread_pool/Standard/Thread_pool_standard.hpp"
13 :
14 : using namespace spu;
15 : using namespace spu::runtime;
16 :
17 : // Pipeline
18 : // ::Pipeline(const runtime::Task &first,
19 : // const runtime::Task &last,
20 : // const std::vector<std::pair<std::vector<const runtime::Task*>, std::vector<const runtime::Task*>>>
21 : // &sep_stages, const std::vector<size_t> &n_threads, const std::vector<size_t> &synchro_buffer_sizes, const
22 : // std::vector<bool> &synchro_active_waiting, const std::vector<bool> &thread_pinning, const
23 : // std::vector<std::vector<size_t>> &puids)
24 : // : original_sequence(first, last, 1),
25 : // stages(sep_stages.size()),
26 : // adaptors(sep_stages.size() -1),
27 : // saved_firsts_tasks_id(sep_stages.size()),
28 : // saved_lasts_tasks_id(sep_stages.size()),
29 : // bound_adaptors(false)
30 : // {
31 : // this->init<const runtime::Task>(first,
32 : // &last,
33 : // sep_stages,
34 : // n_threads,
35 : // synchro_buffer_sizes,
36 : // synchro_active_waiting,
37 : // thread_pinning,
38 : // puids);
39 : // }
40 :
41 : // Pipeline
42 : // ::Pipeline(const runtime::Task &first,
43 : // const std::vector<std::pair<std::vector<const runtime::Task*>, std::vector<const runtime::Task*>>>
44 : // &sep_stages, const std::vector<size_t> &n_threads, const std::vector<size_t> &synchro_buffer_sizes, const
45 : // std::vector<bool> &synchro_active_waiting, const std::vector<bool> &thread_pinning, const
46 : // std::vector<std::vector<size_t>> &puids)
47 : // : original_sequence(first, 1),
48 : // stages(sep_stages.size()),
49 : // adaptors(sep_stages.size() -1),
50 : // saved_firsts_tasks_id(sep_stages.size()),
51 : // saved_lasts_tasks_id(sep_stages.size()),
52 : // bound_adaptors(false)
53 : // {
54 : // const runtime::Task* last = nullptr;
55 : // this->init<const runtime::Task>(first,
56 : // last,
57 : // sep_stages,
58 : // n_threads,
59 : // synchro_buffer_sizes,
60 : // synchro_active_waiting,
61 : // thread_pinning,
62 : // puids);
63 : // }
64 :
65 16 : Pipeline
66 : ::Pipeline(const std::vector<runtime::Task*> &firsts,
67 : const std::vector<runtime::Task*> &lasts,
68 : const std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
69 : const std::vector<size_t> &n_threads,
70 : const std::vector<size_t> &synchro_buffer_sizes,
71 : const std::vector<bool> &synchro_active_waiting,
72 : const std::vector<bool> &thread_pinning,
73 : const std::vector<std::vector<size_t>> &puids/*,
74 16 : const std::vector<bool> &tasks_inplace*/)
75 16 : : original_sequence(firsts, lasts, 1),
76 16 : stages(sep_stages.size()),
77 16 : adaptors(sep_stages.size() -1),
78 16 : saved_firsts_tasks_id(sep_stages.size()),
79 16 : saved_lasts_tasks_id(sep_stages.size()),
80 16 : bound_adaptors(false),
81 48 : auto_stop(true)
82 : {
83 16 : this->init<runtime::Task>(
84 : firsts, lasts, sep_stages, n_threads, synchro_buffer_sizes, synchro_active_waiting, thread_pinning, puids
85 : /*, tasks_inplace*/);
86 16 : }
87 :
88 23 : Pipeline
89 : ::Pipeline(const std::vector<runtime::Task*> &firsts,
90 : const std::vector<runtime::Task*> &lasts,
91 : const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
92 : const std::vector<size_t> &n_threads,
93 : const std::vector<size_t> &synchro_buffer_sizes,
94 : const std::vector<bool> &synchro_active_waiting,
95 : const std::vector<bool> &thread_pinning,
96 : const std::vector<std::vector<size_t>> &puids/*,
97 23 : const std::vector<bool> &tasks_inplace*/)
98 23 : : original_sequence(firsts, lasts, 1),
99 23 : stages(sep_stages.size()),
100 23 : adaptors(sep_stages.size() -1),
101 23 : saved_firsts_tasks_id(sep_stages.size()),
102 23 : saved_lasts_tasks_id(sep_stages.size()),
103 23 : bound_adaptors(false),
104 69 : auto_stop(true)
105 : {
106 : std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>>
107 23 : sep_stages_bis;
108 105 : for (auto& sep_stage : sep_stages)
109 82 : sep_stages_bis.push_back(std::make_tuple(sep_stage.first, sep_stage.second, std::vector<runtime::Task*>()));
110 :
111 23 : this->init<runtime::Task>(firsts,
112 : lasts,
113 : sep_stages_bis,
114 : n_threads,
115 : synchro_buffer_sizes,
116 : synchro_active_waiting,
117 : thread_pinning, puids/*,
118 : tasks_inplace*/);
119 23 : }
120 :
121 16 : Pipeline
122 : ::Pipeline(const std::vector<runtime::Task*> &firsts,
123 : const std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
124 : const std::vector<size_t> &n_threads,
125 : const std::vector<size_t> &synchro_buffer_sizes,
126 : const std::vector<bool> &synchro_active_waiting,
127 : const std::vector<bool> &thread_pinning,
128 : const std::vector<std::vector<size_t>> &puids/*,
129 16 : const std::vector<bool> &tasks_inplace*/)
130 : : Pipeline(firsts,
131 : {},
132 : sep_stages,
133 : n_threads,
134 : synchro_buffer_sizes,
135 : synchro_active_waiting,
136 : thread_pinning, puids/*,
137 16 : tasks_inplace*/)
138 : {
139 16 : }
140 :
141 23 : Pipeline
142 : ::Pipeline(const std::vector<runtime::Task*> &firsts,
143 : const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
144 : const std::vector<size_t> &n_threads,
145 : const std::vector<size_t> &synchro_buffer_sizes,
146 : const std::vector<bool> &synchro_active_waiting,
147 : const std::vector<bool> &thread_pinning,
148 : const std::vector<std::vector<size_t>> &puids/*,
149 23 : const std::vector<bool> &tasks_inplace*/)
150 : : Pipeline(firsts,
151 : {},
152 : sep_stages,
153 : n_threads,
154 : synchro_buffer_sizes,
155 : synchro_active_waiting,
156 : thread_pinning, puids/*,
157 23 : tasks_inplace*/)
158 : {
159 23 : }
160 :
161 0 : Pipeline
162 : ::Pipeline(runtime::Task &first,
163 : runtime::Task &last,
164 : const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
165 : const std::vector<size_t> &n_threads,
166 : const std::vector<size_t> &synchro_buffer_sizes,
167 : const std::vector<bool> &synchro_active_waiting,
168 : const std::vector<bool> &thread_pinning,
169 : const std::vector<std::vector<size_t>> &puids/*,
170 0 : const std::vector<bool> &tasks_inplace*/)
171 : : Pipeline({&first},
172 : {&last},
173 : sep_stages,
174 : n_threads,
175 : synchro_buffer_sizes,
176 : synchro_active_waiting,
177 : thread_pinning, puids/*,
178 0 : tasks_inplace*/)
179 : {
180 0 : }
181 :
182 23 : Pipeline
183 : ::Pipeline(runtime::Task &first,
184 : const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
185 : const std::vector<size_t> &n_threads,
186 : const std::vector<size_t> &synchro_buffer_sizes,
187 : const std::vector<bool> &synchro_active_waiting,
188 : const std::vector<bool> &thread_pinning,
189 : const std::vector<std::vector<size_t>> &puids/*,
190 23 : const std::vector<bool> &tasks_inplace*/)
191 : : Pipeline({&first},
192 : sep_stages,
193 : n_threads,
194 : synchro_buffer_sizes,
195 : synchro_active_waiting,
196 : thread_pinning, puids/*,
197 23 : tasks_inplace*/)
198 : {
199 23 : }
200 :
201 : //========================== Pipeline constructors with new version thread pinning =====================================
202 9 : Pipeline
203 : ::Pipeline(const std::vector<runtime::Task*> &firsts,
204 : const std::vector<runtime::Task*> &lasts,
205 : const std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
206 : const std::vector<size_t> &n_threads,
207 : const std::vector<size_t> &synchro_buffer_sizes,
208 : const std::vector<bool> &synchro_active_waiting,
209 : const std::vector<bool> &thread_pinning,
210 : const std::string &pipeline_pinning_policy/*,
211 9 : const std::vector<bool> &tasks_inplace*/)
212 9 : : original_sequence(firsts, lasts, 1),
213 9 : stages(sep_stages.size()),
214 9 : adaptors(sep_stages.size() -1),
215 9 : saved_firsts_tasks_id(sep_stages.size()),
216 9 : saved_lasts_tasks_id(sep_stages.size()),
217 9 : bound_adaptors(false),
218 27 : auto_stop(true)
219 : {
220 9 : this->init<runtime::Task>(firsts,
221 : lasts,
222 : sep_stages,
223 : n_threads,
224 : synchro_buffer_sizes,
225 : synchro_active_waiting,
226 : thread_pinning, {}, pipeline_pinning_policy/*,
227 : tasks_inplace*/);
228 9 : }
229 :
230 46 : Pipeline
231 : ::Pipeline(const std::vector<runtime::Task*> &firsts,
232 : const std::vector<runtime::Task*> &lasts,
233 : const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
234 : const std::vector<size_t> &n_threads,
235 : const std::vector<size_t> &synchro_buffer_sizes,
236 : const std::vector<bool> &synchro_active_waiting,
237 : const std::vector<bool> &thread_pinning,
238 : const std::string &pipeline_pinning_policy/*,
239 46 : const std::vector<bool> &tasks_inplace*/)
240 46 : : original_sequence(firsts, lasts, 1),
241 46 : stages(sep_stages.size()),
242 46 : adaptors(sep_stages.size() -1),
243 46 : saved_firsts_tasks_id(sep_stages.size()),
244 46 : saved_lasts_tasks_id(sep_stages.size()),
245 46 : bound_adaptors(false),
246 138 : auto_stop(true)
247 : {
248 : std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>>
249 46 : sep_stages_bis;
250 214 : for (auto& sep_stage : sep_stages)
251 168 : sep_stages_bis.push_back(std::make_tuple(sep_stage.first, sep_stage.second, std::vector<runtime::Task*>()));
252 :
253 46 : this->init<runtime::Task>(firsts,
254 : lasts,
255 : sep_stages_bis,
256 : n_threads,
257 : synchro_buffer_sizes,
258 : synchro_active_waiting,
259 : thread_pinning, {}, pipeline_pinning_policy/*,
260 : tasks_inplace*/);
261 46 : }
262 :
263 9 : Pipeline
264 : ::Pipeline(const std::vector<runtime::Task*> &firsts,
265 : const std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
266 : const std::vector<size_t> &n_threads,
267 : const std::vector<size_t> &synchro_buffer_sizes,
268 : const std::vector<bool> &synchro_active_waiting,
269 : const std::vector<bool> &thread_pinning,
270 : const std::string &pipeline_pinning_policy/*,
271 9 : const std::vector<bool> &tasks_inplace*/)
272 : : Pipeline(firsts,
273 : {},
274 : sep_stages,
275 : n_threads,
276 : synchro_buffer_sizes,
277 : synchro_active_waiting,
278 : thread_pinning, pipeline_pinning_policy/*,
279 9 : tasks_inplace*/)
280 : {
281 9 : }
282 :
283 35 : Pipeline
284 : ::Pipeline(const std::vector<runtime::Task*> &firsts,
285 : const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
286 : const std::vector<size_t> &n_threads,
287 : const std::vector<size_t> &synchro_buffer_sizes,
288 : const std::vector<bool> &synchro_active_waiting,
289 : const std::vector<bool> &thread_pinning,
290 : const std::string &pipeline_pinning_policy/*,
291 35 : const std::vector<bool> &tasks_inplace*/)
292 : : Pipeline(firsts,
293 : {},
294 : sep_stages,
295 : n_threads,
296 : synchro_buffer_sizes,
297 : synchro_active_waiting,
298 : thread_pinning, pipeline_pinning_policy/*,
299 35 : tasks_inplace*/)
300 : {
301 35 : }
302 :
303 0 : Pipeline
304 : ::Pipeline(runtime::Task &first,
305 : runtime::Task &last,
306 : const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
307 : const std::vector<size_t> &n_threads,
308 : const std::vector<size_t> &synchro_buffer_sizes,
309 : const std::vector<bool> &synchro_active_waiting,
310 : const std::vector<bool> &thread_pinning,
311 : const std::string &pipeline_pinning_policy/*,
312 0 : const std::vector<bool> &tasks_inplace*/)
313 : : Pipeline({&first},
314 : {&last},
315 : sep_stages,
316 : n_threads,
317 : synchro_buffer_sizes,
318 : synchro_active_waiting,
319 : thread_pinning, pipeline_pinning_policy/*,
320 0 : tasks_inplace*/)
321 : {
322 0 : }
323 :
324 35 : Pipeline
325 : ::Pipeline(runtime::Task &first,
326 : const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
327 : const std::vector<size_t> &n_threads,
328 : const std::vector<size_t> &synchro_buffer_sizes,
329 : const std::vector<bool> &synchro_active_waiting,
330 : const std::vector<bool> &thread_pinning,
331 : const std::string &pipeline_pinning_policy/*,
332 35 : const std::vector<bool> &tasks_inplace*/)
333 : : Pipeline({&first},
334 : sep_stages,
335 : n_threads,
336 : synchro_buffer_sizes,
337 : synchro_active_waiting,
338 : thread_pinning,
339 : pipeline_pinning_policy/*,
340 35 : tasks_inplace*/)
341 : {
342 35 : }
343 :
344 179 : Pipeline::~Pipeline()
345 : {
346 94 : this->unbind_adaptors();
347 179 : }
348 :
349 : std::vector<Sequence*>
350 173 : Pipeline::get_stages()
351 : {
352 173 : std::vector<Sequence*> stages;
353 823 : for (auto& stage : this->stages)
354 650 : stages.push_back(stage.get());
355 173 : return stages;
356 0 : }
357 :
358 : Sequence&
359 0 : Pipeline::operator[](const size_t stage_id)
360 : {
361 0 : assert(stage_id < this->stages.size());
362 0 : return *this->stages[stage_id];
363 : }
364 :
365 : template<class TA>
366 : runtime::Sequence*
367 : create_sequence(const std::vector<TA*>& firsts,
368 : const std::vector<TA*>& lasts,
369 : const std::vector<TA*>& exclusions,
370 : const size_t& n_threads,
371 : const bool& thread_pinning,
372 : const std::vector<size_t>& puids,
373 : const bool& tasks_inplace)
374 : {
375 : throw tools::unimplemented_error(__FILE__, __LINE__, __func__);
376 : }
377 :
378 : template<>
379 : runtime::Sequence*
380 0 : create_sequence<const runtime::Task>(const std::vector<const runtime::Task*>& firsts,
381 : const std::vector<const runtime::Task*>& lasts,
382 : const std::vector<const runtime::Task*>& exclusions,
383 : const size_t& n_threads,
384 : const bool& thread_pinning,
385 : const std::vector<size_t>& puids,
386 : const bool& /*tasks_inplace*/)
387 : {
388 0 : return new runtime::Sequence(firsts, lasts, exclusions, n_threads, thread_pinning, puids);
389 : }
390 :
391 : template<>
392 : Sequence*
393 274 : create_sequence<runtime::Task>(const std::vector<runtime::Task*>& firsts,
394 : const std::vector<runtime::Task*>& lasts,
395 : const std::vector<runtime::Task*>& exclusions,
396 : const size_t& n_threads,
397 : const bool& thread_pinning,
398 : const std::vector<size_t>& puids,
399 : const bool& tasks_inplace)
400 : {
401 274 : return new runtime::Sequence(firsts, lasts, exclusions, n_threads, thread_pinning, puids, tasks_inplace);
402 : }
403 :
404 : // Init and sequence creation for second pinning version
405 : template<class TA>
406 : runtime::Sequence*
407 : create_sequence(const std::vector<TA*>& firsts,
408 : const std::vector<TA*>& lasts,
409 : const std::vector<TA*>& exclusions,
410 : const size_t& n_threads,
411 : const bool& thread_pinning,
412 : const std::string& pipeline_pinning_policy,
413 : const bool& tasks_inplace)
414 : {
415 : throw tools::unimplemented_error(__FILE__, __LINE__, __func__);
416 : }
417 :
418 : template<>
419 : runtime::Sequence*
420 0 : create_sequence<const runtime::Task>(const std::vector<const runtime::Task*>& firsts,
421 : const std::vector<const runtime::Task*>& lasts,
422 : const std::vector<const runtime::Task*>& exclusions,
423 : const size_t& n_threads,
424 : const bool& thread_pinning,
425 : const std::string& pipeline_pinning_policy,
426 : const bool& /*tasks_inplace*/)
427 : {
428 0 : return new runtime::Sequence(firsts, lasts, exclusions, n_threads, thread_pinning, pipeline_pinning_policy);
429 : }
430 :
431 : template<>
432 : Sequence*
433 62 : create_sequence<runtime::Task>(const std::vector<runtime::Task*>& firsts,
434 : const std::vector<runtime::Task*>& lasts,
435 : const std::vector<runtime::Task*>& exclusions,
436 : const size_t& n_threads,
437 : const bool& thread_pinning,
438 : const std::string& pipeline_pinning_policy,
439 : const bool& tasks_inplace)
440 : {
441 : return new runtime::Sequence(
442 62 : firsts, lasts, exclusions, n_threads, thread_pinning, pipeline_pinning_policy, tasks_inplace);
443 : }
444 :
445 : template <class TA>
446 94 : void Pipeline
447 : ::init(const std::vector<TA*> &/*firsts*/,
448 : const std::vector<TA*> &/*lasts*/,
449 : const std::vector<std::tuple<std::vector<TA*>,std::vector<TA*>,std::vector<TA*>>> &sep_stages,
450 : const std::vector<size_t> &n_threads,
451 : const std::vector<size_t> &synchro_buffer_sizes,
452 : const std::vector<bool> &synchro_active_waiting,
453 : const std::vector<bool> &thread_pinning,
454 : const std::vector<std::vector<size_t>> &puids,
455 : const std::string &pipeline_pinning_policy/*,
456 : const std::vector<bool> &tasks_inplace*/)
457 : {
458 94 : if (sep_stages.size() != n_threads.size() && n_threads.size() != 0)
459 : {
460 0 : std::stringstream message;
461 0 : message << "'n_threads.size()' has to be equal to 'sep_stages.size()' or equal to '0' ('n_threads.size()' = "
462 0 : << n_threads.size() << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
463 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
464 0 : }
465 :
466 94 : if (sep_stages.size() != synchro_buffer_sizes.size() + 1 && synchro_buffer_sizes.size() != 0)
467 : {
468 0 : std::stringstream message;
469 : message << "'synchro_buffer_sizes.size()' has to be equal to 'sep_stages.size() -1' or equal to '0' "
470 0 : << "('synchro_buffer_sizes.size()' = " << synchro_buffer_sizes.size()
471 0 : << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
472 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
473 0 : }
474 :
475 94 : if (sep_stages.size() != synchro_active_waiting.size() + 1 && synchro_active_waiting.size() != 0)
476 : {
477 0 : std::stringstream message;
478 : message << "'synchro_active_waiting.size()' has to be equal to 'sep_stages.size() -1' or equal to '0' "
479 0 : << "('synchro_active_waiting.size()' = " << synchro_active_waiting.size()
480 0 : << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
481 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
482 0 : }
483 :
484 94 : if (sep_stages.size() != thread_pinning.size() && thread_pinning.size() != 0)
485 : {
486 0 : std::stringstream message;
487 : message << "'thread_pinning.size()' has to be equal to 'sep_stages.size()' or equal to '0' ("
488 0 : << "'thread_pinning.size()' = " << thread_pinning.size()
489 0 : << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
490 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
491 0 : }
492 :
493 94 : if (sep_stages.size() != puids.size() && puids.size() != 0)
494 : {
495 0 : std::stringstream message;
496 0 : message << "'puids.size()' has to be equal to 'sep_stages.size()' or equal to '0' ('puids.size()' = "
497 0 : << puids.size() << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
498 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
499 0 : }
500 :
501 : // if (sep_stages.size() != tasks_inplace.size() && tasks_inplace.size() != 0)
502 : // {
503 : // std::stringstream message;
504 : // message << "'tasks_inplace.size()' has to be equal to 'sep_stages.size()' or equal to '0' ('"
505 : // << "tasks_inplace.size()' = " << tasks_inplace.size() << " , 'sep_stages.size()' = "
506 : // << sep_stages.size() << ").";
507 : // throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
508 : // }
509 :
510 : // Creating a vector of pinning policies for each sequence
511 94 : std::vector<std::string> sequences_pinning_policies;
512 94 : if (!pipeline_pinning_policy.empty())
513 22 : sequences_pinning_policies =
514 : tools::Thread_pinning_utils::pipeline_parser_unpacker(pipeline_pinning_policy, sep_stages.size());
515 :
516 430 : for (size_t s = 0; s < sep_stages.size(); s++)
517 : {
518 336 : const std::vector<TA*>& stage_firsts = std::get<0>(sep_stages[s]);
519 336 : const std::vector<TA*>& stage_lasts = std::get<1>(sep_stages[s]);
520 336 : const std::vector<TA*>& stage_exclusions = std::get<2>(sep_stages[s]);
521 336 : const size_t stage_n_threads = n_threads.size() ? n_threads[s] : 1;
522 336 : const bool stage_thread_pinning = thread_pinning.size() ? thread_pinning[s] : false;
523 336 : const std::vector<size_t> stage_puids = puids.size() ? puids[s] : std::vector<size_t>();
524 398 : const std::string sequence_pinning_policy =
525 398 : sequences_pinning_policies.size() ? sequences_pinning_policies[s] : "";
526 336 : const bool stage_tasks_inplace = /*tasks_inplace.size() ? tasks_inplace[s] :*/ true;
527 : try
528 : {
529 336 : if (pipeline_pinning_policy.empty())
530 274 : this->stages[s].reset(create_sequence<TA>(stage_firsts,
531 : stage_lasts,
532 : stage_exclusions,
533 : stage_n_threads,
534 : stage_thread_pinning,
535 : stage_puids,
536 : stage_tasks_inplace));
537 : else
538 62 : this->stages[s].reset(create_sequence<TA>(stage_firsts,
539 : stage_lasts,
540 : stage_exclusions,
541 : stage_n_threads,
542 : stage_thread_pinning,
543 : sequence_pinning_policy,
544 : stage_tasks_inplace));
545 : }
546 0 : catch (const tools::control_flow_error& e)
547 : {
548 0 : std::stringstream message;
549 0 : message << "Invalid control flow error on stage " << s
550 0 : << " (perhaps a switcher's tasks were separated between different stages)." << std::endl
551 0 : << e.what();
552 0 : throw tools::control_flow_error(__FILE__, __LINE__, __func__, message.str());
553 0 : }
554 336 : this->stages[s]->is_part_of_pipeline = true;
555 : }
556 :
557 : // verify that the sequential sequence is equivalent to the pipeline sequence
558 94 : auto ref_tasks = this->original_sequence.get_tasks_per_threads()[0];
559 94 : auto cur_tasks = this->get_tasks_per_threads()[0];
560 :
561 94 : if (ref_tasks.size() != cur_tasks.size())
562 : {
563 0 : std::ofstream f1("dbg_ref_sequence.dot");
564 0 : this->original_sequence.export_dot(f1);
565 0 : std::ofstream f2("dbg_cur_pipeline.dot");
566 0 : this->export_dot(f2);
567 :
568 0 : std::stringstream message;
569 0 : message << "'ref_tasks.size()' has to be equal to 'cur_tasks.size()' ('ref_tasks.size()' = " << ref_tasks.size()
570 0 : << ", 'cur_tasks.size()' = " << cur_tasks.size() << ").";
571 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
572 0 : }
573 :
574 963 : for (size_t ta = 0; ta < cur_tasks.size(); ta++)
575 : {
576 869 : if (std::find(ref_tasks.begin(), ref_tasks.end(), cur_tasks[ta]) == ref_tasks.end())
577 : {
578 0 : std::ofstream f1("dbg_ref_sequence.dot");
579 0 : this->original_sequence.export_dot(f1);
580 0 : std::ofstream f2("dbg_cur_pipeline.dot");
581 0 : this->export_dot(f2);
582 :
583 0 : std::stringstream message;
584 0 : message << "'cur_tasks[ta]' is not contained in the 'ref_tasks' vector ('ta' = " << ta
585 0 : << ", 'cur_tasks[ta]' = " << +cur_tasks[ta]
586 0 : << ", 'cur_tasks[ta]->get_name()' = " << cur_tasks[ta]->get_name() << ").";
587 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
588 0 : }
589 : }
590 :
591 94 : this->create_adaptors(synchro_buffer_sizes, synchro_active_waiting);
592 94 : this->bind_adaptors();
593 :
594 94 : this->thread_pool.reset(new tools::Thread_pool_standard(this->stages.size() - 1));
595 94 : this->thread_pool->init(); // threads are spawned here
596 94 : }
597 :
598 : void
599 94 : Pipeline::create_adaptors(const std::vector<size_t>& synchro_buffer_sizes,
600 : const std::vector<bool>& synchro_active_waiting)
601 : {
602 : // sck out addr occ stage tsk id sck id
603 94 : std::vector<std::tuple<runtime::Socket*, size_t, size_t, size_t, size_t>> out_sck_orphans;
604 :
605 : // for all the stages in the pipeline
606 336 : for (size_t sta = 0; sta < this->stages.size() - 1; sta++)
607 : {
608 : // ------------------------------------------------------------------------------------------------------------
609 : // --------------------------------------------------------------- collect orphan output sockets in stage 'sta'
610 : // ------------------------------------------------------------------------------------------------------------
611 242 : std::vector<std::vector<runtime::Task*>> tasks_per_threads = this->stages[sta]->get_tasks_per_threads();
612 : // for all the threads in the current stage
613 : // for (size_t t = 0; t < tasks_per_threads.size(); t++)
614 242 : size_t t = 0;
615 : {
616 : // for all the tasks in the stage
617 901 : for (size_t tsk_id = 0; tsk_id < tasks_per_threads[t].size(); tsk_id++)
618 : {
619 659 : auto tsk = tasks_per_threads[t][tsk_id];
620 : // for all the sockets of the tasks
621 2350 : for (size_t sck_id = 0; sck_id < tsk->sockets.size(); sck_id++)
622 : {
623 1691 : auto sck = tsk->sockets[sck_id];
624 : // if the current socket is an output or forward socket type
625 1691 : if (sck->get_type() == socket_t::SOUT || sck->get_type() == socket_t::SFWD)
626 : {
627 : // for all the bounded sockets to the current socket
628 2151 : for (auto bsck : sck->get_bound_sockets())
629 : {
630 : // check if the task of the bounded socket is not in the current stage
631 807 : if (std::find(tasks_per_threads[t].begin(),
632 807 : tasks_per_threads[t].end(),
633 1614 : &bsck->get_task()) == tasks_per_threads[t].end())
634 : {
635 : // check the position of the socket in the orphans
636 379 : size_t pos = 0;
637 1018 : for (; pos < out_sck_orphans.size(); pos++)
638 682 : if (std::get<0>(out_sck_orphans[pos]) == sck.get()) break;
639 :
640 379 : if (pos == out_sck_orphans.size())
641 336 : out_sck_orphans.push_back(std::make_tuple(sck.get(), 1, sta, tsk_id, sck_id));
642 : else
643 43 : std::get<1>(out_sck_orphans[pos])++;
644 : }
645 : }
646 : }
647 1691 : }
648 : }
649 : }
650 :
651 : // ------------------------------------------------------------------------------------------------------------
652 : // -------------------------------------- collect orphan input sockets in stage 'sta +1' and create connections
653 : // ------------------------------------------------------------------------------------------------------------
654 242 : tasks_per_threads = this->stages[sta + 1]->get_tasks_per_threads();
655 : // for all the threads in the current stage
656 : // for (size_t t = 0; t < tasks_per_threads.size(); t++)
657 : {
658 : // for all the tasks in the stage
659 949 : for (size_t tsk_id = 0; tsk_id < tasks_per_threads[t].size(); tsk_id++)
660 : {
661 707 : auto tsk = tasks_per_threads[t][tsk_id];
662 : // ----------------------------------------- manage socket to socket bindings (with fake input sockets)
663 : // for all the sockets of the tasks
664 2504 : for (size_t sck_id = 0; sck_id < tsk->sockets.size(); sck_id++)
665 : {
666 1797 : auto sck = tsk->sockets[sck_id];
667 : // if the current socket is an input or forward socket type
668 1797 : if (sck->get_type() == socket_t::SIN || sck->get_type() == socket_t::SFWD)
669 : {
670 723 : runtime::Socket* bsck = nullptr;
671 : try
672 : {
673 : // get output bounded socket
674 723 : bsck = &sck->get_bound_socket(); // can throw if there is no bounded socket
675 : }
676 0 : catch (const std::exception&)
677 : {
678 0 : }
679 723 : if (bsck != nullptr)
680 : {
681 : // check if the task of the bounded socket is not in the current stage
682 723 : if (std::find(tasks_per_threads[t].begin(),
683 723 : tasks_per_threads[t].end(),
684 1446 : &bsck->get_task()) == tasks_per_threads[t].end())
685 : {
686 : // check the position of the bounded socket in the orphans
687 354 : size_t pos = 0;
688 907 : for (; pos < out_sck_orphans.size(); pos++)
689 907 : if (std::get<0>(out_sck_orphans[pos]) == bsck) break;
690 :
691 354 : if (pos < out_sck_orphans.size())
692 : {
693 354 : auto sck_out = std::get<0>(out_sck_orphans[pos]);
694 354 : auto sck_in = sck.get();
695 708 : auto unbind_sout_pos = std::distance(sck_out->get_bound_sockets().begin(),
696 354 : std::find(sck_out->get_bound_sockets().begin(),
697 354 : sck_out->get_bound_sockets().end(),
698 354 : sck_in));
699 354 : this->sck_orphan_binds.push_back(
700 354 : std::make_pair(std::make_tuple(std::get<0>(out_sck_orphans[pos]),
701 354 : std::get<2>(out_sck_orphans[pos]),
702 354 : std::get<3>(out_sck_orphans[pos]),
703 354 : std::get<4>(out_sck_orphans[pos]),
704 : unbind_sout_pos),
705 708 : std::make_tuple(sck_in, sta + 1, tsk_id, sck_id, nullptr)));
706 : }
707 : }
708 : }
709 : }
710 1797 : }
711 : // ------------------------------------------- manage socket to task bindings (with fake input sockets)
712 835 : for (size_t sck_id = 0; sck_id < tsk->fake_input_sockets.size(); sck_id++)
713 : {
714 128 : auto sck = tsk->fake_input_sockets[sck_id];
715 128 : runtime::Socket* bsck = nullptr;
716 : try
717 : {
718 : // get output bounded socket
719 128 : bsck = &sck->get_bound_socket(); // can throw if there is no bounded socket
720 : }
721 0 : catch (const std::exception&)
722 : {
723 0 : }
724 128 : if (bsck != nullptr)
725 : {
726 : // check if the task of the bounded socket is not in the current stage
727 128 : if (std::find(tasks_per_threads[t].begin(), tasks_per_threads[t].end(), &bsck->get_task()) ==
728 256 : tasks_per_threads[t].end())
729 : {
730 : // check the position of the bounded socket in the orphans
731 25 : size_t pos = 0;
732 111 : for (; pos < out_sck_orphans.size(); pos++)
733 111 : if (std::get<0>(out_sck_orphans[pos]) == bsck) break;
734 :
735 25 : if (pos < out_sck_orphans.size())
736 : {
737 25 : auto sck_out = std::get<0>(out_sck_orphans[pos]);
738 25 : auto sck_in = sck.get();
739 50 : auto unbind_sout_pos = std::distance(sck_out->get_bound_sockets().begin(),
740 25 : std::find(sck_out->get_bound_sockets().begin(),
741 25 : sck_out->get_bound_sockets().end(),
742 25 : sck_in));
743 25 : this->sck_orphan_binds.push_back(
744 25 : std::make_pair(std::make_tuple(std::get<0>(out_sck_orphans[pos]),
745 25 : std::get<2>(out_sck_orphans[pos]),
746 25 : std::get<3>(out_sck_orphans[pos]),
747 25 : std::get<4>(out_sck_orphans[pos]),
748 : unbind_sout_pos),
749 50 : std::make_tuple(nullptr, sta + 1, tsk_id, sck_id, tsk)));
750 : }
751 : }
752 : }
753 128 : }
754 : // ----------------------------------------------------------------------------------------------------
755 : }
756 : }
757 242 : }
758 :
759 : // ----------------------------------------------------------------------------------------------------------------
760 : // ----------------------------------------------------------------------------------------------- prints for debug
761 : // ----------------------------------------------------------------------------------------------------------------
762 : // std::cout << "Orphan output sockets list:" << std::endl;
763 : // for (auto &sck : out_sck_orphans)
764 : // {
765 : // auto sck_out_name = std::get<0>(sck)->get_name();
766 : // auto tsk_out_name = std::get<0>(sck)->get_task().get_name();
767 : // auto sck_out_occ = std::get<1>(sck);
768 : // auto tsk_out_sta = std::get<2>(sck);
769 : // auto tsk_out_id = std::get<3>(sck);
770 : // auto sck_out_id = std::get<4>(sck);
771 :
772 : // std::cout << " " << tsk_out_name << "[" << sck_out_name << "] (stage " << tsk_out_sta << ", " << sck_out_occ
773 : // << " occurrences, tsk id = " << tsk_out_id << ", sck id = " << sck_out_id << ")" << std::endl;
774 : // }
775 :
776 : // std::cout << std::endl << "Detected socket binds:" << std::endl;
777 : // for (auto &bind : this->sck_orphan_binds)
778 : // {
779 : // auto sck_out_name = std::get<0>(bind.first)->get_name();
780 : // auto tsk_out_name = std::get<0>(bind.first)->get_task().get_name();
781 : // auto tsk_out_sta = std::get<1>(bind.first);
782 : // auto tsk_out_id = std::get<2>(bind.first);
783 : // auto sck_out_id = std::get<3>(bind.first);
784 : // auto sck_out_ubp = std::get<4>(bind.first);
785 :
786 : // auto sck_in_name = std::get<0>(bind.second)->get_name();
787 : // auto tsk_in_name = std::get<0>(bind.second)->get_task().get_name();
788 : // auto tsk_in_sta = std::get<1>(bind.second);
789 : // auto tsk_in_id = std::get<2>(bind.second);
790 : // auto sck_in_id = std::get<3>(bind.second);
791 :
792 : // std::cout << " " << tsk_out_name << "[" << sck_out_name << "] (stage " << tsk_out_sta << ", tsk id = "
793 : // << tsk_out_id << ", sck id = " << sck_out_id << ", ubp = " << sck_out_ubp << ")" << " -> "
794 : // << tsk_in_name << "[" << sck_in_name << "] (stage " << tsk_in_sta << ", tsk id = "
795 : // << tsk_in_id << ", sck id = " << sck_in_id << ")" << std::endl;
796 : // }
797 :
798 : // ----------------------------------------------------------------------------------------------------------------
799 : // ------------------------------------------------------------------------------------------------ create adaptors
800 : // ----------------------------------------------------------------------------------------------------------------
801 94 : auto sck_orphan_binds_cpy = this->sck_orphan_binds;
802 94 : module::Adaptor_m_to_n* adp = nullptr;
803 94 : std::map<runtime::Socket*, size_t> sck_to_adp_sck_id;
804 430 : for (size_t sta = 0; sta < this->stages.size(); sta++)
805 : {
806 336 : const auto n_threads = this->stages[sta]->get_n_threads();
807 336 : std::vector<std::vector<runtime::Task*>> tasks_per_threads = this->stages[sta]->get_tasks_per_threads();
808 :
809 : // ------------------------------------------------------------------------------------------------------------
810 : // ----------------------------------------------------------------------------------------------- pull adaptor
811 : // ------------------------------------------------------------------------------------------------------------
812 336 : if (sta > 0)
813 : {
814 242 : assert(adp != nullptr);
815 : // sck out addr stage tsk id sck id unbind_pos
816 : std::vector<std::pair<std::tuple<runtime::Socket*, size_t, size_t, size_t, size_t>,
817 : // sck in addr stage tsk id sck id tsk in addr
818 : std::tuple<runtime::Socket*, size_t, size_t, size_t, runtime::Task*>>>
819 242 : sck_orphan_binds_new;
820 :
821 2826 : for (size_t t = 0; t < n_threads; t++)
822 : {
823 2584 : module::Adaptor_m_to_n* cur_adp = (t == 0) ? adp : adp->clone();
824 2584 : if (t > 0) cur_adp->add_puller();
825 :
826 7752 : for (auto& t : cur_adp->tasks)
827 5168 : t->set_fast(true);
828 2584 : if (t > 0)
829 : {
830 2342 : this->adaptors[sta - 1].second.push_back(std::unique_ptr<module::Adaptor_m_to_n>(cur_adp));
831 2342 : cur_adp->set_custom_name("Adp_m_to_n_" + std::to_string(sta - 1));
832 : }
833 :
834 2584 : auto task_pull = &(*cur_adp)("pull");
835 :
836 2584 : sck_orphan_binds_new.clear();
837 14870 : for (auto& bind : sck_orphan_binds_cpy)
838 : {
839 12286 : auto tsk_out_sta = std::get<1>(bind.first);
840 12286 : if (tsk_out_sta < sta)
841 : {
842 6781 : auto tsk_in_sta = std::get<1>(bind.second);
843 6781 : if (tsk_in_sta == sta)
844 : {
845 3625 : auto sck_out_ptr = std::get<0>(bind.first);
846 3625 : auto priority = std::get<4>(bind.first);
847 3625 : auto tsk_in_id = std::get<2>(bind.second);
848 3625 : auto sck_in_id = std::get<3>(bind.second);
849 3625 : runtime::Socket* sck_in = nullptr;
850 3625 : runtime::Task* tsk_in = nullptr;
851 3625 : if (std::get<0>(bind.second) != nullptr) // if socket to socket binding
852 3032 : sck_in = tasks_per_threads[t][tsk_in_id]->sockets[sck_in_id].get();
853 : else // if socket to task binding
854 593 : tsk_in = tasks_per_threads[t][tsk_in_id];
855 3625 : this->adaptors_binds.push_back(std::make_tuple(
856 7250 : task_pull->sockets[sck_to_adp_sck_id[sck_out_ptr]].get(), sck_in, priority, tsk_in));
857 : }
858 : else
859 3156 : sck_orphan_binds_new.push_back(bind);
860 : }
861 : else
862 5505 : sck_orphan_binds_new.push_back(bind);
863 : }
864 :
865 2584 : if (t > 0) this->stages[sta]->all_modules[t].push_back(cur_adp);
866 : }
867 242 : this->saved_firsts_tasks_id[sta] = this->stages[sta]->firsts_tasks_id;
868 242 : sck_orphan_binds_cpy = sck_orphan_binds_new;
869 :
870 242 : adp->alloc_buffers();
871 242 : }
872 :
873 : // ------------------------------------------------------------------------------------------------------------
874 : // ----------------------------------------------------------------------------------------------- push adaptor
875 : // ------------------------------------------------------------------------------------------------------------
876 336 : std::map<runtime::Socket*, size_t> sck_to_adp_sck_id_new;
877 336 : if (sta < this->stages.size() - 1)
878 : {
879 242 : std::vector<size_t> adp_n_elmts;
880 242 : std::vector<std::type_index> adp_datatype;
881 242 : size_t adp_buffer_size = synchro_buffer_sizes.size() ? synchro_buffer_sizes[sta] : 1;
882 242 : bool adp_active_waiting = synchro_active_waiting.size() ? synchro_active_waiting[sta] : false;
883 242 : size_t adp_n_frames = 1;
884 :
885 : // a map to remember if a passed socket points already to the same memory space
886 242 : std::map<void*, size_t> fwd_source;
887 :
888 242 : std::vector<runtime::Socket*> passed_scks_out;
889 1060 : for (auto& bind : sck_orphan_binds_cpy)
890 : {
891 818 : auto tsk_out_sta = std::get<1>(bind.first);
892 818 : if (tsk_out_sta <= sta)
893 : {
894 520 : auto sck_out = std::get<0>(bind.first);
895 520 : if (std::find(passed_scks_out.begin(), passed_scks_out.end(), sck_out) == passed_scks_out.end())
896 : {
897 : // avoid the creation of new adaptor sockets for forward sockets pointing to the same memory
898 : // space
899 467 : auto sck_out_dptr = (void*)sck_out->get_dataptr<int8_t>();
900 467 : assert(sck_out_dptr != nullptr);
901 467 : if (fwd_source.find(sck_out_dptr) == fwd_source.end())
902 : {
903 410 : fwd_source[sck_out_dptr] = 1;
904 410 : adp_n_frames = sck_out->get_task().get_module().get_n_frames();
905 410 : adp_n_elmts.push_back(sck_out->get_n_elmts() / adp_n_frames);
906 410 : adp_datatype.push_back(sck_out->get_datatype());
907 : }
908 467 : passed_scks_out.push_back(sck_out);
909 : }
910 : }
911 : }
912 242 : passed_scks_out.clear();
913 :
914 : // allocate the adaptor for the first thread
915 242 : adp = new module::Adaptor_m_to_n(adp_n_elmts, adp_datatype, adp_buffer_size, adp_active_waiting);
916 242 : adp->set_n_frames(adp_n_frames);
917 :
918 2826 : for (size_t t = 0; t < n_threads; t++)
919 : {
920 2584 : module::Adaptor_m_to_n* cur_adp = (t == 0) ? adp : adp->clone();
921 2584 : cur_adp->set_custom_name("Adp_m_to_n_" + std::to_string(sta));
922 2584 : if (t > 0) cur_adp->add_pusher();
923 2584 : this->adaptors[sta].first.push_back(std::unique_ptr<module::Adaptor_m_to_n>(cur_adp));
924 2584 : auto task_push = &(*cur_adp)("push");
925 :
926 2584 : std::map<void*, size_t> fwd_source;
927 2584 : sck_to_adp_sck_id_new.clear();
928 2584 : size_t adp_sck_id = 0;
929 11624 : for (auto& bind : sck_orphan_binds_cpy) // bind standard task to last adaptor
930 : {
931 9040 : auto tsk_out_sta = std::get<1>(bind.first);
932 :
933 9040 : if (tsk_out_sta <= sta)
934 : {
935 7762 : auto sck_out_ptr = std::get<0>(bind.first);
936 7762 : auto sck_out_dptr = (void*)sck_out_ptr->get_dataptr<int8_t>();
937 7762 : assert(sck_out_dptr != nullptr);
938 :
939 7762 : if (std::find(passed_scks_out.begin(),
940 : passed_scks_out.end(),
941 22484 : sck_out_ptr) == passed_scks_out.end() &&
942 14722 : fwd_source.find(sck_out_dptr) == fwd_source.end()) // <= the latest condition is here to
943 : // avoid to bind adaptor sockets two
944 : // times the same memory space
945 : // (usefull in the case of multiple
946 : // fwd sockets pointing to the same
947 : // memory address)
948 : {
949 5763 : if (tsk_out_sta == sta)
950 : {
951 3778 : auto tsk_out_id = std::get<2>(bind.first);
952 3778 : auto sck_out_id = std::get<3>(bind.first);
953 3778 : auto sck_out = tasks_per_threads[t][tsk_out_id]->sockets[sck_out_id];
954 3778 : auto priority = std::get<4>(bind.first);
955 3778 : sck_to_adp_sck_id_new[sck_out_ptr] = adp_sck_id;
956 3778 : this->adaptors_binds.push_back(
957 3778 : std::make_tuple(sck_out.get(),
958 3778 : task_push->sockets[adp_sck_id++].get(),
959 : priority,
960 3778 : nullptr)); // <= only socket to socket binding is possible here
961 3778 : }
962 : else // if (tsk_out_sta < sta) // bind prev. adaptor to last adaptor
963 : {
964 1985 : auto tsk_out_id = 1;
965 1985 : auto sck_out_id = sck_to_adp_sck_id[sck_out_ptr];
966 1985 : sck_to_adp_sck_id_new[sck_out_ptr] = adp_sck_id;
967 : auto adp_prev =
968 1985 : t == 0 ? this->adaptors[sta - 1].first[0] : this->adaptors[sta - 1].second[t - 1];
969 1985 : auto sck_out = (*adp_prev)[tsk_out_id].sockets[sck_out_id];
970 1985 : auto priority = std::get<4>(bind.first);
971 1985 : this->adaptors_binds.push_back(
972 1985 : std::make_tuple(sck_out.get(),
973 1985 : task_push->sockets[adp_sck_id++].get(),
974 : priority,
975 1985 : nullptr)); // <= only socket to socket binding is possible here
976 1985 : }
977 :
978 5763 : fwd_source[sck_out_dptr] = 1; // remember that this memory space has been connected to the
979 : // adaptor once
980 5763 : passed_scks_out.push_back(sck_out_ptr);
981 : }
982 : }
983 : }
984 2584 : passed_scks_out.clear();
985 2584 : }
986 242 : this->saved_lasts_tasks_id[sta] = this->stages[sta]->lasts_tasks_id;
987 242 : }
988 336 : sck_to_adp_sck_id = sck_to_adp_sck_id_new;
989 336 : }
990 94 : }
991 :
992 : void
993 280 : Pipeline::bind_adaptors()
994 : {
995 280 : this->_bind_adaptors(true);
996 280 : }
997 :
998 : void
999 280 : Pipeline::_bind_adaptors(const bool bind_adaptors)
1000 : {
1001 280 : if (!this->bound_adaptors)
1002 : {
1003 1280 : for (size_t sta = 0; sta < this->stages.size(); sta++)
1004 : {
1005 1000 : const auto n_threads = this->stages[sta]->get_n_threads();
1006 :
1007 : // --------------------------------------------------------------------------------------------------------
1008 : // ------------------------------------------------------------------------------------------- pull adaptor
1009 : // --------------------------------------------------------------------------------------------------------
1010 1000 : if (sta > 0)
1011 : {
1012 8466 : for (size_t t = 0; t < n_threads; t++)
1013 : {
1014 : module::Adaptor_m_to_n* cur_adp =
1015 7746 : t > 0 ? adaptors[sta - 1].second[t - 1].get() : adaptors[sta - 1].first[0].get();
1016 :
1017 7746 : if (t > 0 || sta == this->stages.size() - 1) // add the adaptor to the current stage
1018 7294 : this->stages[sta]->all_modules[t].push_back(cur_adp);
1019 :
1020 7746 : auto task_pull = &(*cur_adp)("pull");
1021 :
1022 7746 : auto ss = this->stages[sta]->sequences[t]->get_contents();
1023 7746 : assert(ss != nullptr);
1024 7746 : ss->tasks.insert(ss->tasks.begin(), task_pull);
1025 7746 : ss->processes.insert(ss->processes.begin(),
1026 0 : [task_pull]() -> const int*
1027 : {
1028 0 : task_pull->exec();
1029 0 : const int* status = task_pull->sockets.back()->get_dataptr<const int>();
1030 0 : return status;
1031 : });
1032 7746 : this->stages[sta]->update_tasks_id(t);
1033 : }
1034 720 : this->stages[sta]->firsts_tasks_id.clear();
1035 720 : this->stages[sta]->firsts_tasks_id.push_back(0);
1036 720 : this->stages[sta]->n_tasks++;
1037 : }
1038 :
1039 : // --------------------------------------------------------------------------------------------------------
1040 : // ------------------------------------------------------------------------------------------- push adaptor
1041 : // --------------------------------------------------------------------------------------------------------
1042 1000 : if (sta < this->stages.size() - 1)
1043 : {
1044 720 : size_t last_task_id = 0;
1045 8466 : for (size_t t = 0; t < n_threads; t++)
1046 : {
1047 7746 : module::Adaptor_m_to_n* cur_adp = adaptors[sta].first[t].get();
1048 :
1049 : // add the adaptor to the current stage
1050 7746 : this->stages[sta]->all_modules[t].push_back(cur_adp);
1051 :
1052 7746 : auto task_push = &(*cur_adp)("push");
1053 :
1054 7746 : auto ss = this->stages[sta]->get_last_subsequence(t);
1055 7746 : assert(ss != nullptr);
1056 7746 : ss->tasks.push_back(task_push);
1057 7746 : ss->processes.push_back(
1058 0 : [task_push]() -> const int*
1059 : {
1060 0 : task_push->exec();
1061 0 : const int* status = task_push->sockets.back()->get_dataptr<const int>();
1062 0 : return status;
1063 : });
1064 7746 : last_task_id = ss->tasks_id[ss->tasks_id.size() - 1] + 1;
1065 7746 : ss->tasks_id.push_back(last_task_id);
1066 : }
1067 720 : this->stages[sta]->lasts_tasks_id.clear();
1068 720 : this->stages[sta]->lasts_tasks_id.push_back(last_task_id);
1069 720 : this->stages[sta]->n_tasks++;
1070 : }
1071 1000 : this->stages[sta]->update_firsts_and_lasts_tasks();
1072 : }
1073 :
1074 : // ------------------------------------------------------------------------------------------------------------
1075 : // ---------------------------------------------------------------------------------------------- bind adaptors
1076 : // ------------------------------------------------------------------------------------------------------------
1077 1411 : for (auto& bind : this->sck_orphan_binds)
1078 : {
1079 1131 : auto sck_out = std::get<0>(bind.first);
1080 1131 : auto sck_in = std::get<0>(bind.second);
1081 1131 : if (sck_in != nullptr) // if socket to socket unbinding
1082 1056 : sck_in->unbind(*sck_out);
1083 : else // if socket to task unbinding
1084 : {
1085 75 : auto tsk_in = std::get<4>(bind.second);
1086 75 : assert(tsk_in != nullptr);
1087 75 : tsk_in->unbind(*sck_out);
1088 : }
1089 : }
1090 :
1091 280 : if (bind_adaptors)
1092 : {
1093 28432 : for (auto& bind : this->adaptors_binds)
1094 : {
1095 28152 : auto sck_out = std::get<0>(bind);
1096 28152 : auto sck_in = std::get<1>(bind);
1097 28152 : auto priority = std::get<2>(bind);
1098 28152 : if (sck_in != nullptr) // if socket to socket binding
1099 26373 : sck_in->_bind(*sck_out, priority);
1100 : else // if socket to task binding
1101 : {
1102 1779 : auto tsk_in = std::get<3>(bind);
1103 1779 : assert(tsk_in != nullptr);
1104 1779 : tsk_in->_bind(*sck_out, priority);
1105 : }
1106 : }
1107 : }
1108 :
1109 280 : this->bound_adaptors = true;
1110 : }
1111 280 : }
1112 :
1113 : void
1114 187 : Pipeline::unbind_adaptors()
1115 : {
1116 187 : this->_unbind_adaptors(true);
1117 187 : }
1118 :
1119 : void
1120 373 : Pipeline::_unbind_adaptors(const bool bind_orphans)
1121 : {
1122 373 : if (this->bound_adaptors)
1123 : {
1124 1280 : for (size_t sta = 0; sta < this->stages.size(); sta++)
1125 : {
1126 1000 : const auto n_threads = this->stages[sta]->get_n_threads();
1127 :
1128 : // --------------------------------------------------------------------------------------------------------
1129 : // ------------------------------------------------------------------------------------------- pull adaptor
1130 : // --------------------------------------------------------------------------------------------------------
1131 1000 : if (sta > 0)
1132 : {
1133 8466 : for (size_t t = 0; t < n_threads; t++)
1134 : {
1135 7746 : if (t > 0 || sta == this->stages.size() - 1) // rm the adaptor to the current stage
1136 7294 : this->stages[sta]->all_modules[t].pop_back();
1137 :
1138 7746 : auto ss = this->stages[sta]->sequences[t]->get_contents();
1139 7746 : assert(ss != nullptr);
1140 7746 : ss->tasks.erase(ss->tasks.begin());
1141 7746 : ss->processes.erase(ss->processes.begin());
1142 7746 : this->stages[sta]->update_tasks_id(t);
1143 : }
1144 720 : this->stages[sta]->firsts_tasks_id = this->saved_firsts_tasks_id[sta];
1145 720 : this->stages[sta]->n_tasks--;
1146 : }
1147 :
1148 : // --------------------------------------------------------------------------------------------------------
1149 : // ------------------------------------------------------------------------------------------- push adaptor
1150 : // --------------------------------------------------------------------------------------------------------
1151 1000 : if (sta < this->stages.size() - 1)
1152 : {
1153 8466 : for (size_t t = 0; t < n_threads; t++)
1154 : {
1155 : // rm the adaptor to the current stage
1156 7746 : this->stages[sta]->all_modules[t].pop_back();
1157 :
1158 7746 : auto ss = this->stages[sta]->get_last_subsequence(t);
1159 7746 : assert(ss != nullptr);
1160 7746 : ss->tasks.pop_back();
1161 7746 : ss->processes.pop_back();
1162 7746 : ss->tasks_id.pop_back();
1163 : }
1164 720 : this->stages[sta]->lasts_tasks_id = this->saved_lasts_tasks_id[sta];
1165 720 : this->stages[sta]->n_tasks--;
1166 : }
1167 1000 : this->stages[sta]->update_firsts_and_lasts_tasks();
1168 : }
1169 :
1170 : // ------------------------------------------------------------------------------------------------------------
1171 : // -------------------------------------------------------------------------------------------- unbind adaptors
1172 : // ------------------------------------------------------------------------------------------------------------
1173 28432 : for (auto& bind : this->adaptors_binds)
1174 : {
1175 28152 : auto sck_out = std::get<0>(bind);
1176 28152 : auto sck_in = std::get<1>(bind);
1177 28152 : if (sck_in != nullptr) // if socket to socket unbinding
1178 26373 : sck_in->unbind(*sck_out);
1179 : else // if socket to task unbinding
1180 : {
1181 1779 : auto tsk_in = std::get<3>(bind);
1182 1779 : assert(tsk_in != nullptr);
1183 1779 : tsk_in->unbind(*sck_out);
1184 : }
1185 : }
1186 :
1187 280 : if (bind_orphans)
1188 : {
1189 473 : for (auto& bind : this->sck_orphan_binds)
1190 : {
1191 379 : auto sck_out = std::get<0>(bind.first);
1192 379 : auto priority = std::get<4>(bind.first);
1193 379 : auto sck_in = std::get<0>(bind.second);
1194 379 : if (sck_in != nullptr) // if socket to socket binding
1195 354 : sck_in->_bind(*sck_out, priority);
1196 : else // if socket to task binding
1197 : {
1198 25 : auto tsk_in = std::get<4>(bind.second);
1199 25 : assert(tsk_in != nullptr);
1200 25 : tsk_in->_bind(*sck_out, priority);
1201 : }
1202 : }
1203 : }
1204 :
1205 280 : this->bound_adaptors = false;
1206 : }
1207 373 : }
1208 :
1209 : void
1210 0 : Pipeline::exec(const std::vector<std::function<bool(const std::vector<const int*>&)>>& stop_conditions)
1211 : {
1212 0 : if (stop_conditions.size() != this->stages.size() && stop_conditions.size() != 1)
1213 : {
1214 0 : std::stringstream message;
1215 0 : message << "'stop_conditions.size()' has to be equal to 'stages.size()' or to 1 ('stop_conditions.size()' = "
1216 0 : << stop_conditions.size() << ", 'stages.size()' = " << stages.size() << ").";
1217 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
1218 0 : }
1219 :
1220 0 : if (!this->bound_adaptors)
1221 : {
1222 0 : std::stringstream message;
1223 0 : message << "'bound_adaptors' has to be true to execute the pipeline.";
1224 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
1225 0 : }
1226 :
1227 : // ----------------------------------------------------------------------------------------------------------------
1228 0 : auto& stages = this->stages;
1229 0 : std::vector<const std::function<bool(const std::vector<const int*>&)>*> stop_condition_vec(stages.size() - 1,
1230 0 : nullptr);
1231 0 : if (stop_conditions.size() == stages.size())
1232 0 : for (size_t s = 0; s < stages.size() - 1; s++)
1233 0 : stop_condition_vec[s] = &stop_conditions[s];
1234 :
1235 0 : std::function<void(const size_t)> func_exec = [&stages, &stop_condition_vec](const size_t tid)
1236 : {
1237 0 : size_t s = tid;
1238 0 : if (stop_condition_vec[s])
1239 0 : stages[s]->exec(*(stop_condition_vec[s]));
1240 : else
1241 0 : stages[s]->exec();
1242 :
1243 : // send the signal to stop the next stage
1244 0 : const auto& tasks = stages[s + 1]->get_tasks_per_threads();
1245 0 : for (size_t th = 0; th < tasks.size(); th++)
1246 0 : for (size_t ta = 0; ta < tasks[th].size(); ta++)
1247 : {
1248 0 : auto m = dynamic_cast<module::Adaptor_m_to_n*>(&tasks[th][ta]->get_module());
1249 0 : if (m != nullptr)
1250 0 : if (tasks[th][ta]->get_name().find("pull") != std::string::npos) m->cancel_waiting();
1251 : }
1252 0 : };
1253 :
1254 0 : this->thread_pool->run(func_exec, true);
1255 :
1256 0 : stages[stages.size() - 1]->exec(stop_conditions[stop_conditions.size() - 1]);
1257 :
1258 : // stop all the stages before
1259 0 : for (size_t notify_s = 0; notify_s < stages.size() - 1; notify_s++)
1260 0 : for (auto& m : stages[notify_s]->get_modules<tools::Interface_waiting>())
1261 0 : m->cancel_waiting();
1262 :
1263 0 : this->thread_pool->wait();
1264 0 : this->thread_pool->unset_func_exec();
1265 : // ----------------------------------------------------------------------------------------------------------------
1266 :
1267 : // this is NOT made in the tools::Sequence::exec() to correctly flush the pipeline before restoring buffers
1268 : // initial configuration
1269 0 : for (auto& stage : this->stages)
1270 0 : if (stage->is_no_copy_mode())
1271 : {
1272 0 : stage->reset_no_copy_mode();
1273 0 : stage->gen_processes(false);
1274 : }
1275 :
1276 0 : for (auto& padps : this->adaptors)
1277 : {
1278 0 : for (auto& adp : padps.first)
1279 0 : adp->reset();
1280 0 : for (auto& adp : padps.second)
1281 0 : adp->reset();
1282 : }
1283 0 : }
1284 :
1285 : void
1286 94 : Pipeline::exec(const std::vector<std::function<bool()>>& stop_conditions)
1287 : {
1288 94 : if (stop_conditions.size() != this->stages.size() && stop_conditions.size() != 1)
1289 : {
1290 0 : std::stringstream message;
1291 0 : message << "'stop_conditions.size()' has to be equal to 'stages.size()' or to 1 ('stop_conditions.size()' = "
1292 0 : << stop_conditions.size() << ", 'stages.size()' = " << stages.size() << ").";
1293 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
1294 0 : }
1295 :
1296 94 : if (!this->bound_adaptors)
1297 : {
1298 0 : std::stringstream message;
1299 0 : message << "'bound_adaptors' has to be true to execute the pipeline.";
1300 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
1301 0 : }
1302 :
1303 : // ----------------------------------------------------------------------------------------------------------------
1304 94 : auto& stages = this->stages;
1305 94 : std::vector<const std::function<bool()>*> stop_condition_vec(stages.size() - 1, nullptr);
1306 94 : if (stop_conditions.size() == stages.size())
1307 4 : for (size_t s = 0; s < stages.size() - 1; s++)
1308 0 : stop_condition_vec[s] = &stop_conditions[s];
1309 :
1310 709 : std::function<void(const size_t)> func_exec = [&stages, &stop_condition_vec](const size_t tid)
1311 : {
1312 238 : size_t s = tid;
1313 238 : if (stop_condition_vec[s])
1314 0 : stages[s]->exec(*(stop_condition_vec[s]));
1315 : else
1316 239 : stages[s]->exec();
1317 :
1318 : // send the signal to stop the next stage
1319 232 : const auto& tasks = stages[s + 1]->get_tasks_per_threads();
1320 2819 : for (size_t th = 0; th < tasks.size(); th++)
1321 19598 : for (size_t ta = 0; ta < tasks[th].size(); ta++)
1322 : {
1323 17006 : auto m = dynamic_cast<module::Adaptor_m_to_n*>(&tasks[th][ta]->get_module());
1324 17010 : if (m != nullptr)
1325 5068 : if (tasks[th][ta]->get_name().find("pull") != std::string::npos) m->cancel_waiting();
1326 : }
1327 333 : };
1328 :
1329 94 : this->thread_pool->run(func_exec, true);
1330 94 : stages[stages.size() - 1]->exec(stop_conditions[stop_conditions.size() - 1]);
1331 :
1332 : // stop all the stages before
1333 336 : for (size_t notify_s = 0; notify_s < stages.size() - 1; notify_s++)
1334 7510 : for (auto& m : stages[notify_s]->get_modules<tools::Interface_waiting>())
1335 7510 : m->cancel_waiting();
1336 :
1337 94 : this->thread_pool->wait();
1338 94 : this->thread_pool->unset_func_exec();
1339 : // ----------------------------------------------------------------------------------------------------------------
1340 :
1341 : // this is NOT made in the tools::Sequence::exec() to correctly flush the pipeline before restoring buffers
1342 : // initial configuration
1343 430 : for (auto& stage : this->stages)
1344 336 : if (stage->is_no_copy_mode())
1345 : {
1346 336 : stage->reset_no_copy_mode();
1347 336 : stage->gen_processes(false);
1348 : }
1349 :
1350 336 : for (auto& padps : this->adaptors)
1351 : {
1352 2826 : for (auto& adp : padps.first)
1353 2584 : adp->reset();
1354 2584 : for (auto& adp : padps.second)
1355 2342 : adp->reset();
1356 : }
1357 94 : }
1358 :
1359 : void
1360 0 : Pipeline::exec(std::function<bool(const std::vector<const int*>&)> stop_condition)
1361 : {
1362 0 : this->exec(std::vector<std::function<bool(const std::vector<const int*>&)>>(1, stop_condition));
1363 0 : }
1364 :
1365 : void
1366 94 : Pipeline::exec(std::function<bool()> stop_condition)
1367 : {
1368 94 : this->exec(std::vector<std::function<bool()>>(1, stop_condition));
1369 94 : }
1370 :
1371 : void
1372 19 : Pipeline::exec()
1373 : {
1374 6942 : this->exec([]() { return false; });
1375 19 : }
1376 :
1377 : std::vector<std::vector<module::Module*>>
1378 0 : Pipeline::get_modules_per_threads() const
1379 : {
1380 0 : std::vector<std::vector<module::Module*>> modules_per_threads;
1381 0 : for (auto& stage : this->stages)
1382 : {
1383 0 : auto modules_per_threads_add = stage->get_modules_per_threads();
1384 0 : if (modules_per_threads_add.size() > modules_per_threads.size())
1385 0 : modules_per_threads.resize(modules_per_threads_add.size());
1386 :
1387 0 : for (size_t t = 0; t < modules_per_threads_add.size(); t++)
1388 0 : modules_per_threads[t].insert(
1389 0 : modules_per_threads[t].end(), modules_per_threads_add[t].begin(), modules_per_threads_add[t].end());
1390 0 : }
1391 0 : return modules_per_threads;
1392 0 : }
1393 :
1394 : std::vector<std::vector<module::Module*>>
1395 0 : Pipeline::get_modules_per_types() const
1396 : {
1397 0 : std::vector<std::vector<module::Module*>> modules_per_types;
1398 0 : for (auto& stage : this->stages)
1399 : {
1400 0 : auto modules_per_types_add = stage->get_modules_per_types();
1401 0 : modules_per_types.insert(modules_per_types.end(), modules_per_types_add.begin(), modules_per_types_add.end());
1402 0 : }
1403 0 : return modules_per_types;
1404 0 : }
1405 :
1406 : std::vector<std::vector<module::Module*>>
1407 0 : Pipeline::get_original_modules() const
1408 : {
1409 0 : return this->original_sequence.get_modules_per_types();
1410 : }
1411 :
1412 : std::vector<std::vector<runtime::Task*>>
1413 94 : Pipeline::get_tasks_per_threads() const
1414 : {
1415 94 : std::vector<std::vector<runtime::Task*>> tasks_per_threads;
1416 430 : for (auto& stage : this->stages)
1417 : {
1418 336 : auto tasks_per_threads_add = stage->get_tasks_per_threads();
1419 336 : if (tasks_per_threads_add.size() > tasks_per_threads.size())
1420 182 : tasks_per_threads.resize(tasks_per_threads_add.size());
1421 :
1422 3017 : for (size_t t = 0; t < tasks_per_threads_add.size(); t++)
1423 10724 : tasks_per_threads[t].insert(
1424 10724 : tasks_per_threads[t].end(), tasks_per_threads_add[t].begin(), tasks_per_threads_add[t].end());
1425 336 : }
1426 94 : return tasks_per_threads;
1427 0 : }
1428 :
1429 : std::vector<std::vector<runtime::Task*>>
1430 0 : Pipeline::get_tasks_per_types() const
1431 : {
1432 0 : std::vector<std::vector<runtime::Task*>> tasks_per_types;
1433 0 : for (auto& stage : this->stages)
1434 : {
1435 0 : auto tasks_per_types_add = stage->get_tasks_per_types();
1436 0 : tasks_per_types.insert(tasks_per_types.end(), tasks_per_types_add.begin(), tasks_per_types_add.end());
1437 0 : }
1438 0 : return tasks_per_types;
1439 0 : }
1440 :
1441 : void
1442 6 : Pipeline::export_dot(std::ostream& stream) const
1443 : {
1444 : std::function<void(tools::Digraph_node<Sub_sequence>*,
1445 : const size_t,
1446 : const std::string&,
1447 : std::ostream&,
1448 : std::vector<tools::Digraph_node<Sub_sequence>*>&)>
1449 25 : export_dot_subsequences_recursive = [&export_dot_subsequences_recursive,
1450 : this](tools::Digraph_node<Sub_sequence>* cur_node,
1451 : const size_t sta,
1452 : const std::string& tab,
1453 : std::ostream& stream,
1454 25 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
1455 : {
1456 50 : if (cur_node != nullptr &&
1457 50 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
1458 : {
1459 25 : already_parsed_nodes.push_back(cur_node);
1460 50 : this->stages[sta]->export_dot_subsequence(cur_node->get_c()->tasks,
1461 25 : cur_node->get_c()->tasks_id,
1462 25 : cur_node->get_c()->type,
1463 50 : "Sub-sequence " + std::to_string(cur_node->get_c()->id) +
1464 100 : " (depth = " + std::to_string(cur_node->get_depth()) + ")",
1465 : tab,
1466 : stream);
1467 :
1468 25 : for (auto c : cur_node->get_children())
1469 0 : export_dot_subsequences_recursive(c, sta, tab, stream, already_parsed_nodes);
1470 : }
1471 31 : };
1472 :
1473 : std::function<void(tools::Digraph_node<Sub_sequence>*,
1474 : const size_t,
1475 : const std::string&,
1476 : std::ostream&,
1477 : std::vector<tools::Digraph_node<Sub_sequence>*>&)>
1478 : export_dot_connections_recursive =
1479 25 : [&export_dot_connections_recursive, this](tools::Digraph_node<Sub_sequence>* cur_node,
1480 : const size_t sta,
1481 : const std::string& tab,
1482 : std::ostream& stream,
1483 25 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
1484 : {
1485 50 : if (cur_node != nullptr &&
1486 50 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
1487 : {
1488 25 : already_parsed_nodes.push_back(cur_node);
1489 25 : this->stages[sta]->export_dot_connections(cur_node->get_c()->tasks, tab, stream);
1490 :
1491 25 : for (auto c : cur_node->get_children())
1492 0 : export_dot_connections_recursive(c, sta, tab, stream, already_parsed_nodes);
1493 : }
1494 31 : };
1495 :
1496 6 : std::string tab = "\t";
1497 6 : stream << "digraph Pipeline {" << std::endl;
1498 6 : stream << tab << "compound=true;" << std::endl;
1499 :
1500 31 : for (size_t sta = 0; sta < this->stages.size(); sta++)
1501 : {
1502 25 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
1503 25 : const auto n_threads = this->stages[sta]->get_n_threads();
1504 25 : stream << tab << "subgraph \"cluster_Stage " << sta << "\" {" << std::endl;
1505 25 : stream << tab << tab << "node [style=filled];" << std::endl;
1506 25 : export_dot_subsequences_recursive(this->stages[sta]->sequences[0], sta, tab, stream, already_parsed_nodes);
1507 25 : stream << tab << tab << "label=\"Pipeline stage " << sta << " (" << n_threads << " thread(s))\";" << std::endl;
1508 25 : std::string color = "blue";
1509 25 : stream << tab << tab << "color=" << color << ";" << std::endl;
1510 25 : stream << tab << "}" << std::endl;
1511 25 : }
1512 :
1513 31 : for (size_t sta = 0; sta < this->stages.size(); sta++)
1514 : {
1515 25 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
1516 25 : export_dot_connections_recursive(this->stages[sta]->sequences[0], sta, tab, stream, already_parsed_nodes);
1517 25 : if (this->bound_adaptors)
1518 : {
1519 25 : if (sta > 0)
1520 : {
1521 19 : auto tsk1 = this->stages[sta - 1]->get_lasts_tasks()[0].back();
1522 19 : auto tsk2 = this->stages[sta + 0]->get_firsts_tasks()[0][0];
1523 :
1524 19 : auto sck1 = tsk1->sockets[0];
1525 19 : auto sck2 = tsk2->sockets[0];
1526 :
1527 19 : stream << tab << "\"" << +sck1.get() << "\" -> \"" << +sck2.get() << "\" [ltail=\"cluster_"
1528 19 : << +&tsk1->get_module() << "_" << +tsk1 << "\" lhead=\"cluster_" << +&tsk2->get_module() << "_"
1529 19 : << +tsk2 << "\" color=\"green\" style=\"dashed\"];" << std::endl;
1530 19 : }
1531 : }
1532 25 : }
1533 :
1534 6 : stream << "}" << std::endl;
1535 6 : }
1536 :
// Accessor for the pipeline's `bound_adaptors` flag.
// Returns the current value of the member unchanged; being a const method, it does not modify the pipeline.
1537 : bool
1538 186 : Pipeline::is_bound_adaptors() const
1539 : {
1540 186 : return this->bound_adaptors;
1541 : }
1542 :
// Sets the `auto_stop` flag on the pipeline and forwards the same value to every stage.
// `auto_stop`: value stored in the `auto_stop` member and passed to each stage's set_auto_stop().
1543 : void
1544 0 : Pipeline::set_auto_stop(const bool auto_stop)
1545 : {
1546 0 : this->auto_stop = auto_stop;
// NOTE(review): `auto stage` iterates by value, so each element of `stages` is copied for the loop;
// get_n_frames() iterates the same container with `auto&` -- confirm whether the copy is intended.
1547 0 : for (auto stage : this->stages)
1548 0 : stage->set_auto_stop(auto_stop);
1549 0 : }
1550 :
// Accessor for the pipeline's `auto_stop` flag.
// Returns the value held by the pipeline itself; the individual stages are not queried.
1551 : bool
1552 0 : Pipeline::is_auto_stop() const
1553 : {
1554 0 : return this->auto_stop;
1555 : }
1556 :
// Returns the `n_frames` value shared by all the stages of the pipeline.
// The value reported by stage 0 is taken as the reference and compared against every stage.
// Throws tools::runtime_error if any stage reports a different value.
// NOTE(review): `stages[0]` is read without checking that `stages` is non-empty -- this assumes the pipeline
// always holds at least one stage; confirm against the constructors.
1557 : size_t
1558 0 : Pipeline::get_n_frames() const
1559 : {
1560 0 : const auto n_frames = this->stages[0]->get_n_frames();
1561 :
1562 0 : for (auto& sta : this->stages)
1563 0 : if (sta->get_n_frames() != n_frames)
1564 : {
// The message reports both the mismatching stage's value and the reference value from stage 0.
// NOTE(review): std::stringstream is used here but <sstream> is not among the includes visible at the top of
// the file; presumably it arrives through another header -- confirm.
1565 0 : std::stringstream message;
1566 0 : message << "All the stages/sequences do not have the same 'n_frames' value ('sta->get_n_frames()' = "
1567 0 : << sta->get_n_frames() << ", 'n_frames' = " << n_frames << ").";
1568 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
1569 0 : }
1570 :
1571 0 : return n_frames;
1572 : }
1573 :
// Applies a new `n_frames` value to the stages and to the adaptors of the pipeline.
// `n_frames`: the value to apply. Stages that already report this value are skipped; adaptors are always updated.
// The adaptors' bound/unbound state observed on entry is restored before returning: they are re-bound at the end
// only if they were bound when the method was called.
1574 : void
1575 186 : Pipeline::set_n_frames(const size_t n_frames)
1576 : {
// Remember the bound state on entry. If the adaptors are not bound they are bound first, so that the
// _unbind_adaptors() call below always starts from a bound pipeline.
// NOTE(review): the meaning of the `false` argument is not visible here; judging by the "bind orphans" step
// further down, it presumably leaves the orphan sockets unbound -- confirm against _unbind_adaptors().
1577 186 : const auto save_bound_adaptors = this->is_bound_adaptors();
1578 186 : if (!save_bound_adaptors) this->bind_adaptors();
1579 186 : this->_unbind_adaptors(false);
1580 :
1581 : // set the new "n_frames" val in the sequences
// One entry per stage in each container. `unbind_sockets[s]` / `unbind_tasks[s]` are handed to the stage's
// unbind step and then to its rebind step (presumably to record what was unbound so it can be restored --
// confirm). `skip[s]` is true when stage `s` already reports the requested value.
1582 186 : std::vector<std::vector<std::pair<runtime::Socket*, runtime::Socket*>>> unbind_sockets(this->stages.size());
1583 186 : std::vector<std::vector<std::pair<runtime::Task*, runtime::Socket*>>> unbind_tasks(this->stages.size());
1584 186 : std::vector<bool> skip(this->stages.size());
1585 850 : for (size_t s = 0; s < this->stages.size(); s++)
1586 664 : skip[s] = this->stages[s]->get_n_frames() == n_frames;
// The unbind / set / rebind phases run as three separate passes: each phase is completed for every
// non-skipped stage before the next phase starts on any stage.
1587 850 : for (size_t s = 0; s < this->stages.size(); s++)
1588 664 : if (!skip[s]) this->stages[s]->_set_n_frames_unbind(unbind_sockets[s], unbind_tasks[s]);
1589 850 : for (size_t s = 0; s < this->stages.size(); s++)
1590 664 : if (!skip[s]) this->stages[s]->_set_n_frames(n_frames);
1591 850 : for (size_t s = 0; s < this->stages.size(); s++)
1592 664 : if (!skip[s]) this->stages[s]->_set_n_frames_rebind(unbind_sockets[s], unbind_tasks[s]);
1593 :
1594 : // set the new "n_frames" val in the adaptors
// Each element of `adaptors` is a pair of collections; every adaptor in both `.first` and `.second` is
// updated, regardless of the per-stage `skip` flags computed above.
1595 664 : for (auto& adps : this->adaptors)
1596 : {
1597 5640 : for (auto& adp : adps.first)
1598 5162 : adp->set_n_frames(n_frames);
1599 5162 : for (auto& adp : adps.second)
1600 4684 : adp->set_n_frames(n_frames);
1601 : }
1602 :
1603 : // bind orphans to complete the unbind of the adaptors
// Each entry of `sck_orphan_binds` is a pair of tuples: element 0 of `.first` is the output socket and
// element 4 its bind priority; element 0 of `.second` is the input socket, or nullptr when the destination
// is a task, in which case the task pointer is read from element 4 of `.second`.
1604 938 : for (auto& bind : this->sck_orphan_binds)
1605 : {
1606 752 : auto sck_out = std::get<0>(bind.first);
1607 752 : auto priority = std::get<4>(bind.first);
1608 752 : auto sck_in = std::get<0>(bind.second);
1609 752 : if (sck_in != nullptr)
1610 702 : sck_in->_bind(*sck_out, priority);
1611 : else
1612 : {
1613 50 : auto tsk_in = std::get<4>(bind.second);
// Debug-only check (assert is compiled out when NDEBUG is defined): with no input socket, a task is required.
1614 50 : assert(tsk_in != nullptr);
1615 50 : tsk_in->_bind(*sck_out, priority);
1616 : }
1617 : }
1618 :
// Restore the state seen on entry: re-bind only if the adaptors were bound when the method was called.
1619 186 : if (save_bound_adaptors) this->bind_adaptors();
1620 186 : }
|