Line data Source code
1 : #include <cassert>
2 : #include <fstream>
3 : #include <thread>
4 : #include <tuple>
5 : #include <utility>
6 :
7 : #include "Module/Stateful/Adaptor/Adaptor_1_to_n.hpp"
8 : #include "Module/Stateful/Adaptor/Adaptor_n_to_1.hpp"
9 : #include "Runtime/Pipeline/Pipeline.hpp"
10 : #include "Tools/Exception/exception.hpp"
11 : #include "Tools/Interface/Interface_waiting.hpp"
12 : #include "Tools/Thread/Thread_pinning/Thread_pinning_utils.hpp"
13 : #include "Tools/Thread/Thread_pool/Standard/Thread_pool_standard.hpp"
14 :
15 : using namespace spu;
16 : using namespace spu::runtime;
17 :
18 : // Pipeline
19 : // ::Pipeline(const runtime::Task &first,
20 : // const runtime::Task &last,
21 : // const std::vector<std::pair<std::vector<const runtime::Task*>, std::vector<const runtime::Task*>>>
22 : // &sep_stages, const std::vector<size_t> &n_threads, const std::vector<size_t> &synchro_buffer_sizes, const
23 : // std::vector<bool> &synchro_active_waiting, const std::vector<bool> &thread_pinning, const
24 : // std::vector<std::vector<size_t>> &puids)
25 : // : original_sequence(first, last, 1),
26 : // stages(sep_stages.size()),
27 : // adaptors(sep_stages.size() -1),
28 : // saved_firsts_tasks_id(sep_stages.size()),
29 : // saved_lasts_tasks_id(sep_stages.size()),
30 : // bound_adaptors(false)
31 : // {
32 : // this->init<const runtime::Task>(first,
33 : // &last,
34 : // sep_stages,
35 : // n_threads,
36 : // synchro_buffer_sizes,
37 : // synchro_active_waiting,
38 : // thread_pinning,
39 : // puids);
40 : // }
41 :
42 : // Pipeline
43 : // ::Pipeline(const runtime::Task &first,
44 : // const std::vector<std::pair<std::vector<const runtime::Task*>, std::vector<const runtime::Task*>>>
45 : // &sep_stages, const std::vector<size_t> &n_threads, const std::vector<size_t> &synchro_buffer_sizes, const
46 : // std::vector<bool> &synchro_active_waiting, const std::vector<bool> &thread_pinning, const
47 : // std::vector<std::vector<size_t>> &puids)
48 : // : original_sequence(first, 1),
49 : // stages(sep_stages.size()),
50 : // adaptors(sep_stages.size() -1),
51 : // saved_firsts_tasks_id(sep_stages.size()),
52 : // saved_lasts_tasks_id(sep_stages.size()),
53 : // bound_adaptors(false)
54 : // {
55 : // const runtime::Task* last = nullptr;
56 : // this->init<const runtime::Task>(first,
57 : // last,
58 : // sep_stages,
59 : // n_threads,
60 : // synchro_buffer_sizes,
61 : // synchro_active_waiting,
62 : // thread_pinning,
63 : // puids);
64 : // }
65 :
66 13 : Pipeline
67 : ::Pipeline(const std::vector<runtime::Task*> &firsts,
68 : const std::vector<runtime::Task*> &lasts,
69 : const std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
70 : const std::vector<size_t> &n_threads,
71 : const std::vector<size_t> &synchro_buffer_sizes,
72 : const std::vector<bool> &synchro_active_waiting,
73 : const std::vector<bool> &thread_pinning,
74 : const std::vector<std::vector<size_t>> &puids/*,
75 13 : const std::vector<bool> &tasks_inplace*/)
76 13 : : original_sequence(firsts, lasts, 1),
77 13 : stages(sep_stages.size()),
78 13 : adaptors(sep_stages.size() -1),
79 13 : saved_firsts_tasks_id(sep_stages.size()),
80 13 : saved_lasts_tasks_id(sep_stages.size()),
81 13 : bound_adaptors(false),
82 39 : auto_stop(true)
83 : {
84 13 : this->init<runtime::Task>(
85 : firsts, lasts, sep_stages, n_threads, synchro_buffer_sizes, synchro_active_waiting, thread_pinning, puids
86 : /*, tasks_inplace*/);
87 13 : }
88 :
89 26 : Pipeline
90 : ::Pipeline(const std::vector<runtime::Task*> &firsts,
91 : const std::vector<runtime::Task*> &lasts,
92 : const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
93 : const std::vector<size_t> &n_threads,
94 : const std::vector<size_t> &synchro_buffer_sizes,
95 : const std::vector<bool> &synchro_active_waiting,
96 : const std::vector<bool> &thread_pinning,
97 : const std::vector<std::vector<size_t>> &puids/*,
98 26 : const std::vector<bool> &tasks_inplace*/)
99 26 : : original_sequence(firsts, lasts, 1),
100 26 : stages(sep_stages.size()),
101 26 : adaptors(sep_stages.size() -1),
102 26 : saved_firsts_tasks_id(sep_stages.size()),
103 26 : saved_lasts_tasks_id(sep_stages.size()),
104 26 : bound_adaptors(false),
105 78 : auto_stop(true)
106 : {
107 : std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>>
108 26 : sep_stages_bis;
109 117 : for (auto& sep_stage : sep_stages)
110 91 : sep_stages_bis.push_back(std::make_tuple(sep_stage.first, sep_stage.second, std::vector<runtime::Task*>()));
111 :
112 26 : this->init<runtime::Task>(firsts,
113 : lasts,
114 : sep_stages_bis,
115 : n_threads,
116 : synchro_buffer_sizes,
117 : synchro_active_waiting,
118 : thread_pinning, puids/*,
119 : tasks_inplace*/);
120 26 : }
121 :
122 13 : Pipeline
123 : ::Pipeline(const std::vector<runtime::Task*> &firsts,
124 : const std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
125 : const std::vector<size_t> &n_threads,
126 : const std::vector<size_t> &synchro_buffer_sizes,
127 : const std::vector<bool> &synchro_active_waiting,
128 : const std::vector<bool> &thread_pinning,
129 : const std::vector<std::vector<size_t>> &puids/*,
130 13 : const std::vector<bool> &tasks_inplace*/)
131 : : Pipeline(firsts,
132 : {},
133 : sep_stages,
134 : n_threads,
135 : synchro_buffer_sizes,
136 : synchro_active_waiting,
137 : thread_pinning, puids/*,
138 13 : tasks_inplace*/)
139 : {
140 13 : }
141 :
142 26 : Pipeline
143 : ::Pipeline(const std::vector<runtime::Task*> &firsts,
144 : const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
145 : const std::vector<size_t> &n_threads,
146 : const std::vector<size_t> &synchro_buffer_sizes,
147 : const std::vector<bool> &synchro_active_waiting,
148 : const std::vector<bool> &thread_pinning,
149 : const std::vector<std::vector<size_t>> &puids/*,
150 26 : const std::vector<bool> &tasks_inplace*/)
151 : : Pipeline(firsts,
152 : {},
153 : sep_stages,
154 : n_threads,
155 : synchro_buffer_sizes,
156 : synchro_active_waiting,
157 : thread_pinning, puids/*,
158 26 : tasks_inplace*/)
159 : {
160 26 : }
161 :
162 0 : Pipeline
163 : ::Pipeline(runtime::Task &first,
164 : runtime::Task &last,
165 : const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
166 : const std::vector<size_t> &n_threads,
167 : const std::vector<size_t> &synchro_buffer_sizes,
168 : const std::vector<bool> &synchro_active_waiting,
169 : const std::vector<bool> &thread_pinning,
170 : const std::vector<std::vector<size_t>> &puids/*,
171 0 : const std::vector<bool> &tasks_inplace*/)
172 : : Pipeline({&first},
173 : {&last},
174 : sep_stages,
175 : n_threads,
176 : synchro_buffer_sizes,
177 : synchro_active_waiting,
178 : thread_pinning, puids/*,
179 0 : tasks_inplace*/)
180 : {
181 0 : }
182 :
183 26 : Pipeline
184 : ::Pipeline(runtime::Task &first,
185 : const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
186 : const std::vector<size_t> &n_threads,
187 : const std::vector<size_t> &synchro_buffer_sizes,
188 : const std::vector<bool> &synchro_active_waiting,
189 : const std::vector<bool> &thread_pinning,
190 : const std::vector<std::vector<size_t>> &puids/*,
191 26 : const std::vector<bool> &tasks_inplace*/)
192 : : Pipeline({&first},
193 : sep_stages,
194 : n_threads,
195 : synchro_buffer_sizes,
196 : synchro_active_waiting,
197 : thread_pinning, puids/*,
198 26 : tasks_inplace*/)
199 : {
200 26 : }
201 :
202 : //========================== Pipeline constructors with new version thread pinning =====================================
203 0 : Pipeline
204 : ::Pipeline(const std::vector<runtime::Task*> &firsts,
205 : const std::vector<runtime::Task*> &lasts,
206 : const std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
207 : const std::vector<size_t> &n_threads,
208 : const std::vector<size_t> &synchro_buffer_sizes,
209 : const std::vector<bool> &synchro_active_waiting,
210 : const std::vector<bool> &thread_pinning,
211 : const std::string &pipeline_pinning_policy/*,
212 0 : const std::vector<bool> &tasks_inplace*/)
213 0 : : original_sequence(firsts, lasts, 1),
214 0 : stages(sep_stages.size()),
215 0 : adaptors(sep_stages.size() -1),
216 0 : saved_firsts_tasks_id(sep_stages.size()),
217 0 : saved_lasts_tasks_id(sep_stages.size()),
218 0 : bound_adaptors(false),
219 0 : auto_stop(true)
220 : {
221 0 : this->init<runtime::Task>(firsts,
222 : lasts,
223 : sep_stages,
224 : n_threads,
225 : synchro_buffer_sizes,
226 : synchro_active_waiting,
227 : thread_pinning, {}, pipeline_pinning_policy/*,
228 : tasks_inplace*/);
229 0 : }
230 :
231 41 : Pipeline
232 : ::Pipeline(const std::vector<runtime::Task*> &firsts,
233 : const std::vector<runtime::Task*> &lasts,
234 : const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
235 : const std::vector<size_t> &n_threads,
236 : const std::vector<size_t> &synchro_buffer_sizes,
237 : const std::vector<bool> &synchro_active_waiting,
238 : const std::vector<bool> &thread_pinning,
239 : const std::string &pipeline_pinning_policy/*,
240 41 : const std::vector<bool> &tasks_inplace*/)
241 41 : : original_sequence(firsts, lasts, 1),
242 41 : stages(sep_stages.size()),
243 41 : adaptors(sep_stages.size() -1),
244 41 : saved_firsts_tasks_id(sep_stages.size()),
245 41 : saved_lasts_tasks_id(sep_stages.size()),
246 41 : bound_adaptors(false),
247 123 : auto_stop(true)
248 : {
249 : std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>>
250 41 : sep_stages_bis;
251 183 : for (auto& sep_stage : sep_stages)
252 142 : sep_stages_bis.push_back(std::make_tuple(sep_stage.first, sep_stage.second, std::vector<runtime::Task*>()));
253 :
254 41 : this->init<runtime::Task>(firsts,
255 : lasts,
256 : sep_stages_bis,
257 : n_threads,
258 : synchro_buffer_sizes,
259 : synchro_active_waiting,
260 : thread_pinning, {}, pipeline_pinning_policy/*,
261 : tasks_inplace*/);
262 41 : }
263 :
264 0 : Pipeline
265 : ::Pipeline(const std::vector<runtime::Task*> &firsts,
266 : const std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
267 : const std::vector<size_t> &n_threads,
268 : const std::vector<size_t> &synchro_buffer_sizes,
269 : const std::vector<bool> &synchro_active_waiting,
270 : const std::vector<bool> &thread_pinning,
271 : const std::string &pipeline_pinning_policy/*,
272 0 : const std::vector<bool> &tasks_inplace*/)
273 : : Pipeline(firsts,
274 : {},
275 : sep_stages,
276 : n_threads,
277 : synchro_buffer_sizes,
278 : synchro_active_waiting,
279 : thread_pinning, pipeline_pinning_policy/*,
280 0 : tasks_inplace*/)
281 : {
282 0 : }
283 :
284 30 : Pipeline
285 : ::Pipeline(const std::vector<runtime::Task*> &firsts,
286 : const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
287 : const std::vector<size_t> &n_threads,
288 : const std::vector<size_t> &synchro_buffer_sizes,
289 : const std::vector<bool> &synchro_active_waiting,
290 : const std::vector<bool> &thread_pinning,
291 : const std::string &pipeline_pinning_policy/*,
292 30 : const std::vector<bool> &tasks_inplace*/)
293 : : Pipeline(firsts,
294 : {},
295 : sep_stages,
296 : n_threads,
297 : synchro_buffer_sizes,
298 : synchro_active_waiting,
299 : thread_pinning, pipeline_pinning_policy/*,
300 30 : tasks_inplace*/)
301 : {
302 30 : }
303 :
304 0 : Pipeline
305 : ::Pipeline(runtime::Task &first,
306 : runtime::Task &last,
307 : const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
308 : const std::vector<size_t> &n_threads,
309 : const std::vector<size_t> &synchro_buffer_sizes,
310 : const std::vector<bool> &synchro_active_waiting,
311 : const std::vector<bool> &thread_pinning,
312 : const std::string &pipeline_pinning_policy/*,
313 0 : const std::vector<bool> &tasks_inplace*/)
314 : : Pipeline({&first},
315 : {&last},
316 : sep_stages,
317 : n_threads,
318 : synchro_buffer_sizes,
319 : synchro_active_waiting,
320 : thread_pinning, pipeline_pinning_policy/*,
321 0 : tasks_inplace*/)
322 : {
323 0 : }
324 :
325 30 : Pipeline
326 : ::Pipeline(runtime::Task &first,
327 : const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
328 : const std::vector<size_t> &n_threads,
329 : const std::vector<size_t> &synchro_buffer_sizes,
330 : const std::vector<bool> &synchro_active_waiting,
331 : const std::vector<bool> &thread_pinning,
332 : const std::string &pipeline_pinning_policy/*,
333 30 : const std::vector<bool> &tasks_inplace*/)
334 : : Pipeline({&first},
335 : sep_stages,
336 : n_threads,
337 : synchro_buffer_sizes,
338 : synchro_active_waiting,
339 : thread_pinning,
340 : pipeline_pinning_policy/*,
341 30 : tasks_inplace*/)
342 : {
343 30 : }
344 :
345 160 : Pipeline::~Pipeline()
346 : {
347 80 : this->unbind_adaptors();
348 160 : }
349 :
350 : std::vector<Sequence*>
351 159 : Pipeline::get_stages()
352 : {
353 159 : std::vector<Sequence*> stages;
354 748 : for (auto& stage : this->stages)
355 589 : stages.push_back(stage.get());
356 159 : return stages;
357 0 : }
358 :
359 : Sequence&
360 0 : Pipeline::operator[](const size_t stage_id)
361 : {
362 0 : assert(stage_id < this->stages.size());
363 0 : return *this->stages[stage_id];
364 : }
365 :
366 : template<class TA>
367 : runtime::Sequence*
368 : create_sequence(const std::vector<TA*>& firsts,
369 : const std::vector<TA*>& lasts,
370 : const std::vector<TA*>& exclusions,
371 : const size_t& n_threads,
372 : const bool& thread_pinning,
373 : const std::vector<size_t>& puids,
374 : const bool& tasks_inplace)
375 : {
376 : throw tools::unimplemented_error(__FILE__, __LINE__, __func__);
377 : }
378 :
379 : template<>
380 : runtime::Sequence*
381 0 : create_sequence<const runtime::Task>(const std::vector<const runtime::Task*>& firsts,
382 : const std::vector<const runtime::Task*>& lasts,
383 : const std::vector<const runtime::Task*>& exclusions,
384 : const size_t& n_threads,
385 : const bool& thread_pinning,
386 : const std::vector<size_t>& puids,
387 : const bool& tasks_inplace)
388 : {
389 0 : return new runtime::Sequence(firsts, lasts, exclusions, n_threads, thread_pinning, puids);
390 : }
391 :
392 : template<>
393 : Sequence*
394 248 : create_sequence<runtime::Task>(const std::vector<runtime::Task*>& firsts,
395 : const std::vector<runtime::Task*>& lasts,
396 : const std::vector<runtime::Task*>& exclusions,
397 : const size_t& n_threads,
398 : const bool& thread_pinning,
399 : const std::vector<size_t>& puids,
400 : const bool& tasks_inplace)
401 : {
402 248 : return new runtime::Sequence(firsts, lasts, exclusions, n_threads, thread_pinning, puids, tasks_inplace);
403 : }
404 :
405 : // Init and sequence creation for second pinning version
406 : template<class TA>
407 : runtime::Sequence*
408 : create_sequence(const std::vector<TA*>& firsts,
409 : const std::vector<TA*>& lasts,
410 : const std::vector<TA*>& exclusions,
411 : const size_t& n_threads,
412 : const bool& thread_pinning,
413 : const std::string& pipeline_pinning_policy,
414 : const bool& tasks_inplace)
415 : {
416 : throw tools::unimplemented_error(__FILE__, __LINE__, __func__);
417 : }
418 :
419 : template<>
420 : runtime::Sequence*
421 0 : create_sequence<const runtime::Task>(const std::vector<const runtime::Task*>& firsts,
422 : const std::vector<const runtime::Task*>& lasts,
423 : const std::vector<const runtime::Task*>& exclusions,
424 : const size_t& n_threads,
425 : const bool& thread_pinning,
426 : const std::string& pipeline_pinning_policy,
427 : const bool& tasks_inplace)
428 : {
429 0 : return new runtime::Sequence(firsts, lasts, exclusions, n_threads, thread_pinning, pipeline_pinning_policy);
430 : }
431 :
432 : template<>
433 : Sequence*
434 34 : create_sequence<runtime::Task>(const std::vector<runtime::Task*>& firsts,
435 : const std::vector<runtime::Task*>& lasts,
436 : const std::vector<runtime::Task*>& exclusions,
437 : const size_t& n_threads,
438 : const bool& thread_pinning,
439 : const std::string& pipeline_pinning_policy,
440 : const bool& tasks_inplace)
441 : {
442 : return new runtime::Sequence(
443 34 : firsts, lasts, exclusions, n_threads, thread_pinning, pipeline_pinning_policy, tasks_inplace);
444 : }
445 :
446 : template <class TA>
447 80 : void Pipeline
448 : ::init(const std::vector<TA*> &firsts,
449 : const std::vector<TA*> &lasts,
450 : const std::vector<std::tuple<std::vector<TA*>,std::vector<TA*>,std::vector<TA*>>> &sep_stages,
451 : const std::vector<size_t> &n_threads,
452 : const std::vector<size_t> &synchro_buffer_sizes,
453 : const std::vector<bool> &synchro_active_waiting,
454 : const std::vector<bool> &thread_pinning,
455 : const std::vector<std::vector<size_t>> &puids,
456 : const std::string &pipeline_pinning_policy/*,
457 : const std::vector<bool> &tasks_inplace*/)
458 : {
459 80 : if (sep_stages.size() != n_threads.size() && n_threads.size() != 0)
460 : {
461 0 : std::stringstream message;
462 0 : message << "'n_threads.size()' has to be equal to 'sep_stages.size()' or equal to '0' ('n_threads.size()' = "
463 0 : << n_threads.size() << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
464 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
465 0 : }
466 :
467 80 : if (sep_stages.size() != synchro_buffer_sizes.size() + 1 && synchro_buffer_sizes.size() != 0)
468 : {
469 0 : std::stringstream message;
470 : message << "'synchro_buffer_sizes.size()' has to be equal to 'sep_stages.size() -1' or equal to '0' "
471 0 : << "('synchro_buffer_sizes.size()' = " << synchro_buffer_sizes.size()
472 0 : << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
473 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
474 0 : }
475 :
476 80 : if (sep_stages.size() != synchro_active_waiting.size() + 1 && synchro_active_waiting.size() != 0)
477 : {
478 0 : std::stringstream message;
479 : message << "'synchro_active_waiting.size()' has to be equal to 'sep_stages.size() -1' or equal to '0' "
480 0 : << "('synchro_active_waiting.size()' = " << synchro_active_waiting.size()
481 0 : << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
482 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
483 0 : }
484 :
485 80 : if (sep_stages.size() != thread_pinning.size() && thread_pinning.size() != 0)
486 : {
487 0 : std::stringstream message;
488 : message << "'thread_pinning.size()' has to be equal to 'sep_stages.size()' or equal to '0' ("
489 0 : << "'thread_pinning.size()' = " << thread_pinning.size()
490 0 : << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
491 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
492 0 : }
493 :
494 80 : if (sep_stages.size() != puids.size() && puids.size() != 0)
495 : {
496 0 : std::stringstream message;
497 0 : message << "'puids.size()' has to be equal to 'sep_stages.size()' or equal to '0' ('puids.size()' = "
498 0 : << puids.size() << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
499 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
500 0 : }
501 :
502 : // if (sep_stages.size() != tasks_inplace.size() && tasks_inplace.size() != 0)
503 : // {
504 : // std::stringstream message;
505 : // message << "'tasks_inplace.size()' has to be equal to 'sep_stages.size()' or equal to '0' ('"
506 : // << "tasks_inplace.size()' = " << tasks_inplace.size() << " , 'sep_stages.size()' = "
507 : // << sep_stages.size() << ").";
508 : // throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
509 : // }
510 :
511 80 : bool prev_is_parallel = false;
512 362 : for (auto t : n_threads)
513 : {
514 282 : if (t > 1 && prev_is_parallel)
515 : {
516 0 : std::stringstream message;
517 0 : message << "Consecutive parallel stages are not supported.";
518 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
519 0 : }
520 282 : prev_is_parallel = t > 1;
521 : }
522 :
523 : // Creating a vector of pinning policies for each sequence
524 80 : std::vector<std::string> sequences_pinning_policies;
525 80 : if (!pipeline_pinning_policy.empty())
526 13 : sequences_pinning_policies =
527 : tools::Thread_pinning_utils::pipeline_parser_unpacker(pipeline_pinning_policy, sep_stages.size());
528 :
529 362 : for (size_t s = 0; s < sep_stages.size(); s++)
530 : {
531 282 : const std::vector<TA*>& stage_firsts = std::get<0>(sep_stages[s]);
532 282 : const std::vector<TA*>& stage_lasts = std::get<1>(sep_stages[s]);
533 282 : const std::vector<TA*>& stage_exclusions = std::get<2>(sep_stages[s]);
534 282 : const size_t stage_n_threads = n_threads.size() ? n_threads[s] : 1;
535 282 : const bool stage_thread_pinning = thread_pinning.size() ? thread_pinning[s] : false;
536 282 : const std::vector<size_t> stage_puids = puids.size() ? puids[s] : std::vector<size_t>();
537 316 : const std::string sequence_pinning_policy =
538 316 : sequences_pinning_policies.size() ? sequences_pinning_policies[s] : "";
539 282 : const bool stage_tasks_inplace = /*tasks_inplace.size() ? tasks_inplace[s] :*/ true;
540 : try
541 : {
542 282 : if (pipeline_pinning_policy.empty())
543 248 : this->stages[s].reset(create_sequence<TA>(stage_firsts,
544 : stage_lasts,
545 : stage_exclusions,
546 : stage_n_threads,
547 : stage_thread_pinning,
548 : stage_puids,
549 : stage_tasks_inplace));
550 : else
551 34 : this->stages[s].reset(create_sequence<TA>(stage_firsts,
552 : stage_lasts,
553 : stage_exclusions,
554 : stage_n_threads,
555 : stage_thread_pinning,
556 : sequence_pinning_policy,
557 : stage_tasks_inplace));
558 : }
559 0 : catch (const tools::control_flow_error& e)
560 : {
561 0 : std::stringstream message;
562 0 : message << "Invalid control flow error on stage " << s
563 0 : << " (perhaps a switcher's tasks were separated between different stages)." << std::endl
564 0 : << e.what();
565 0 : throw tools::control_flow_error(__FILE__, __LINE__, __func__, message.str());
566 0 : }
567 282 : this->stages[s]->is_part_of_pipeline = true;
568 : }
569 :
570 : // verify that the sequential sequence is equivalent to the pipeline sequence
571 80 : auto ref_tasks = this->original_sequence.get_tasks_per_threads()[0];
572 80 : auto cur_tasks = this->get_tasks_per_threads()[0];
573 :
574 80 : if (ref_tasks.size() != cur_tasks.size())
575 : {
576 0 : std::ofstream f1("dbg_ref_sequence.dot");
577 0 : this->original_sequence.export_dot(f1);
578 0 : std::ofstream f2("dbg_cur_pipeline.dot");
579 0 : this->export_dot(f2);
580 :
581 0 : std::stringstream message;
582 0 : message << "'ref_tasks.size()' has to be equal to 'cur_tasks.size()' ('ref_tasks.size()' = " << ref_tasks.size()
583 0 : << ", 'cur_tasks.size()' = " << cur_tasks.size() << ").";
584 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
585 0 : }
586 :
587 820 : for (size_t ta = 0; ta < cur_tasks.size(); ta++)
588 : {
589 740 : if (std::find(ref_tasks.begin(), ref_tasks.end(), cur_tasks[ta]) == ref_tasks.end())
590 : {
591 0 : std::ofstream f1("dbg_ref_sequence.dot");
592 0 : this->original_sequence.export_dot(f1);
593 0 : std::ofstream f2("dbg_cur_pipeline.dot");
594 0 : this->export_dot(f2);
595 :
596 0 : std::stringstream message;
597 0 : message << "'cur_tasks[ta]' is not contained in the 'ref_tasks' vector ('ta' = " << ta
598 0 : << ", 'cur_tasks[ta]' = " << +cur_tasks[ta]
599 0 : << ", 'cur_tasks[ta]->get_name()' = " << cur_tasks[ta]->get_name() << ").";
600 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
601 0 : }
602 : }
603 :
604 80 : this->create_adaptors(synchro_buffer_sizes, synchro_active_waiting);
605 80 : this->bind_adaptors();
606 :
607 80 : this->thread_pool.reset(new tools::Thread_pool_standard(this->stages.size() - 1));
608 80 : this->thread_pool->init(); // threads are spawned here
609 80 : }
610 :
611 : void
612 80 : Pipeline::create_adaptors(const std::vector<size_t>& synchro_buffer_sizes,
613 : const std::vector<bool>& synchro_active_waiting)
614 : {
615 : // sck out addr occ stage tsk id sck id
616 80 : std::vector<std::tuple<runtime::Socket*, size_t, size_t, size_t, size_t>> out_sck_orphans;
617 :
618 : // for all the stages in the pipeline
619 282 : for (size_t sta = 0; sta < this->stages.size() - 1; sta++)
620 : {
621 : // ------------------------------------------------------------------------------------------------------------
622 : // --------------------------------------------------------------- collect orphan output sockets in stage 'sta'
623 : // ------------------------------------------------------------------------------------------------------------
624 202 : std::vector<std::vector<runtime::Task*>> tasks_per_threads = this->stages[sta]->get_tasks_per_threads();
625 : // for all the threads in the current stage
626 : // for (size_t t = 0; t < tasks_per_threads.size(); t++)
627 202 : size_t t = 0;
628 : {
629 : // for all the tasks in the stage
630 746 : for (size_t tsk_id = 0; tsk_id < tasks_per_threads[t].size(); tsk_id++)
631 : {
632 544 : auto tsk = tasks_per_threads[t][tsk_id];
633 : // for all the sockets of the tasks
634 1914 : for (size_t sck_id = 0; sck_id < tsk->sockets.size(); sck_id++)
635 : {
636 1370 : auto sck = tsk->sockets[sck_id];
637 : // if the current socket is an output or forward socket type
638 1370 : if (sck->get_type() == socket_t::SOUT || sck->get_type() == socket_t::SFWD)
639 : {
640 : // for all the bounded sockets to the current socket
641 1788 : for (auto bsck : sck->get_bound_sockets())
642 : {
643 : // check if the task of the bounded socket is not in the current stage
644 683 : if (std::find(tasks_per_threads[t].begin(),
645 683 : tasks_per_threads[t].end(),
646 1366 : &bsck->get_task()) == tasks_per_threads[t].end())
647 : {
648 : // check the position of the socket in the orphans
649 331 : size_t pos = 0;
650 904 : for (; pos < out_sck_orphans.size(); pos++)
651 616 : if (std::get<0>(out_sck_orphans[pos]) == sck.get()) break;
652 :
653 331 : if (pos == out_sck_orphans.size())
654 288 : out_sck_orphans.push_back(std::make_tuple(sck.get(), 1, sta, tsk_id, sck_id));
655 : else
656 43 : std::get<1>(out_sck_orphans[pos])++;
657 : }
658 : }
659 : }
660 1370 : }
661 : }
662 : }
663 :
664 : // ------------------------------------------------------------------------------------------------------------
665 : // -------------------------------------- collect orphan input sockets in stage 'sta +1' and create connections
666 : // ------------------------------------------------------------------------------------------------------------
667 202 : tasks_per_threads = this->stages[sta + 1]->get_tasks_per_threads();
668 : // for all the threads in the current stage
669 : // for (size_t t = 0; t < tasks_per_threads.size(); t++)
670 : {
671 : // for all the tasks in the stage
672 791 : for (size_t tsk_id = 0; tsk_id < tasks_per_threads[t].size(); tsk_id++)
673 : {
674 589 : auto tsk = tasks_per_threads[t][tsk_id];
675 : // ----------------------------------------- manage socket to socket bindings (with fake input sockets)
676 : // for all the sockets of the tasks
677 2056 : for (size_t sck_id = 0; sck_id < tsk->sockets.size(); sck_id++)
678 : {
679 1467 : auto sck = tsk->sockets[sck_id];
680 : // if the current socket is an input or forward socket type
681 1467 : if (sck->get_type() == socket_t::SIN || sck->get_type() == socket_t::SFWD)
682 : {
683 596 : runtime::Socket* bsck = nullptr;
684 : try
685 : {
686 : // get output bounded socket
687 596 : bsck = &sck->get_bound_socket(); // can throw if there is no bounded socket
688 : }
689 0 : catch (const std::exception&)
690 : {
691 0 : }
692 596 : if (bsck != nullptr)
693 : {
694 : // check if the task of the bounded socket is not in the current stage
695 596 : if (std::find(tasks_per_threads[t].begin(),
696 596 : tasks_per_threads[t].end(),
697 1192 : &bsck->get_task()) == tasks_per_threads[t].end())
698 : {
699 : // check the position of the bounded socket in the orphans
700 305 : size_t pos = 0;
701 787 : for (; pos < out_sck_orphans.size(); pos++)
702 787 : if (std::get<0>(out_sck_orphans[pos]) == bsck) break;
703 :
704 305 : if (pos < out_sck_orphans.size())
705 : {
706 305 : auto sck_out = std::get<0>(out_sck_orphans[pos]);
707 305 : auto sck_in = sck.get();
708 610 : auto unbind_sout_pos = std::distance(sck_out->get_bound_sockets().begin(),
709 305 : std::find(sck_out->get_bound_sockets().begin(),
710 305 : sck_out->get_bound_sockets().end(),
711 305 : sck_in));
712 305 : this->sck_orphan_binds.push_back(
713 305 : std::make_pair(std::make_tuple(std::get<0>(out_sck_orphans[pos]),
714 305 : std::get<2>(out_sck_orphans[pos]),
715 305 : std::get<3>(out_sck_orphans[pos]),
716 305 : std::get<4>(out_sck_orphans[pos]),
717 : unbind_sout_pos),
718 610 : std::make_tuple(sck_in, sta + 1, tsk_id, sck_id, nullptr)));
719 : }
720 : }
721 : }
722 : }
723 1467 : }
724 : // ------------------------------------------- manage socket to task bindings (with fake input sockets)
725 717 : for (size_t sck_id = 0; sck_id < tsk->fake_input_sockets.size(); sck_id++)
726 : {
727 128 : auto sck = tsk->fake_input_sockets[sck_id];
728 128 : runtime::Socket* bsck = nullptr;
729 : try
730 : {
731 : // get output bounded socket
732 128 : bsck = &sck->get_bound_socket(); // can throw if there is no bounded socket
733 : }
734 0 : catch (const std::exception&)
735 : {
736 0 : }
737 128 : if (bsck != nullptr)
738 : {
739 : // check if the task of the bounded socket is not in the current stage
740 128 : if (std::find(tasks_per_threads[t].begin(), tasks_per_threads[t].end(), &bsck->get_task()) ==
741 256 : tasks_per_threads[t].end())
742 : {
743 : // check the position of the bounded socket in the orphans
744 26 : size_t pos = 0;
745 117 : for (; pos < out_sck_orphans.size(); pos++)
746 117 : if (std::get<0>(out_sck_orphans[pos]) == bsck) break;
747 :
748 26 : if (pos < out_sck_orphans.size())
749 : {
750 26 : auto sck_out = std::get<0>(out_sck_orphans[pos]);
751 26 : auto sck_in = sck.get();
752 52 : auto unbind_sout_pos = std::distance(sck_out->get_bound_sockets().begin(),
753 26 : std::find(sck_out->get_bound_sockets().begin(),
754 26 : sck_out->get_bound_sockets().end(),
755 26 : sck_in));
756 26 : this->sck_orphan_binds.push_back(
757 26 : std::make_pair(std::make_tuple(std::get<0>(out_sck_orphans[pos]),
758 26 : std::get<2>(out_sck_orphans[pos]),
759 26 : std::get<3>(out_sck_orphans[pos]),
760 26 : std::get<4>(out_sck_orphans[pos]),
761 : unbind_sout_pos),
762 52 : std::make_tuple(nullptr, sta + 1, tsk_id, sck_id, tsk)));
763 : }
764 : }
765 : }
766 128 : }
767 : // ----------------------------------------------------------------------------------------------------
768 : }
769 : }
770 202 : }
771 :
772 : // ----------------------------------------------------------------------------------------------------------------
773 : // ----------------------------------------------------------------------------------------------- prints for debug
774 : // ----------------------------------------------------------------------------------------------------------------
775 : // std::cout << "Orphan output sockets list:" << std::endl;
776 : // for (auto &sck : out_sck_orphans)
777 : // {
778 : // auto sck_out_name = std::get<0>(sck)->get_name();
779 : // auto tsk_out_name = std::get<0>(sck)->get_task().get_name();
780 : // auto sck_out_occ = std::get<1>(sck);
781 : // auto tsk_out_sta = std::get<2>(sck);
782 : // auto tsk_out_id = std::get<3>(sck);
783 : // auto sck_out_id = std::get<4>(sck);
784 :
785 : // std::cout << " " << tsk_out_name << "[" << sck_out_name << "] (stage " << tsk_out_sta << ", " << sck_out_occ
786 : // << " occurrences, tsk id = " << tsk_out_id << ", sck id = " << sck_out_id << ")" << std::endl;
787 : // }
788 :
789 : // std::cout << std::endl << "Detected socket binds:" << std::endl;
790 : // for (auto &bind : this->sck_orphan_binds)
791 : // {
792 : // auto sck_out_name = std::get<0>(bind.first)->get_name();
793 : // auto tsk_out_name = std::get<0>(bind.first)->get_task().get_name();
794 : // auto tsk_out_sta = std::get<1>(bind.first);
795 : // auto tsk_out_id = std::get<2>(bind.first);
796 : // auto sck_out_id = std::get<3>(bind.first);
797 : // auto sck_out_ubp = std::get<4>(bind.first);
798 :
799 : // auto sck_in_name = std::get<0>(bind.second)->get_name();
800 : // auto tsk_in_name = std::get<0>(bind.second)->get_task().get_name();
801 : // auto tsk_in_sta = std::get<1>(bind.second);
802 : // auto tsk_in_id = std::get<2>(bind.second);
803 : // auto sck_in_id = std::get<3>(bind.second);
804 :
805 : // std::cout << " " << tsk_out_name << "[" << sck_out_name << "] (stage " << tsk_out_sta << ", tsk id = "
806 : // << tsk_out_id << ", sck id = " << sck_out_id << ", ubp = " << sck_out_ubp << ")" << " -> "
807 : // << tsk_in_name << "[" << sck_in_name << "] (stage " << tsk_in_sta << ", tsk id = "
808 : // << tsk_in_id << ", sck id = " << sck_in_id << ")" << std::endl;
809 : // }
810 :
811 : // ----------------------------------------------------------------------------------------------------------------
812 : // ------------------------------------------------------------------------------------------------ create adaptors
813 : // ----------------------------------------------------------------------------------------------------------------
814 80 : auto sck_orphan_binds_cpy = this->sck_orphan_binds;
815 80 : module::Adaptor* adp = nullptr;
816 80 : std::map<runtime::Socket*, size_t> sck_to_adp_sck_id;
817 362 : for (size_t sta = 0; sta < this->stages.size(); sta++)
818 : {
819 282 : const auto n_threads = this->stages[sta]->get_n_threads();
820 282 : std::vector<std::vector<runtime::Task*>> tasks_per_threads = this->stages[sta]->get_tasks_per_threads();
821 :
822 : // ------------------------------------------------------------------------------------------------------------
823 : // ----------------------------------------------------------------------------------------------- pull adaptor
824 : // ------------------------------------------------------------------------------------------------------------
825 282 : if (sta > 0)
826 : {
827 202 : assert(adp != nullptr);
828 : // sck out addr stage tsk id sck id unbind_pos
829 : std::vector<std::pair<std::tuple<runtime::Socket*, size_t, size_t, size_t, size_t>,
830 : // sck in addr stage tsk id sck id tsk in addr
831 : std::tuple<runtime::Socket*, size_t, size_t, size_t, runtime::Task*>>>
832 202 : sck_orphan_binds_new;
833 :
834 202 : auto n_threads_prev_sta = this->stages[sta - 1]->get_n_threads();
835 2300 : for (size_t t = 0; t < n_threads; t++)
836 : {
837 2098 : module::Adaptor* cur_adp = (t == 0) ? adp : adp->clone();
838 6294 : for (auto& t : cur_adp->tasks)
839 4196 : t->set_fast(true);
840 2098 : if (t > 0)
841 : {
842 1896 : this->adaptors[sta - 1].second.push_back(std::unique_ptr<module::Adaptor>(cur_adp));
843 1896 : cur_adp->set_custom_name((n_threads_prev_sta == 1 ? "Adp_1_to_n_" : "Adp_n_to_1_") +
844 3792 : std::to_string(sta - 1));
845 : }
846 :
847 2098 : auto task_pull = n_threads_prev_sta == 1 ? &(*cur_adp)[(int)module::adp::tsk::pull_n]
848 88 : : &(*cur_adp)[(int)module::adp::tsk::pull_1];
849 :
850 2098 : sck_orphan_binds_new.clear();
851 12883 : for (auto& bind : sck_orphan_binds_cpy)
852 : {
853 10785 : auto tsk_out_sta = std::get<1>(bind.first);
854 10785 : if (tsk_out_sta < sta)
855 : {
856 5856 : auto tsk_in_sta = std::get<1>(bind.second);
857 5856 : if (tsk_in_sta == sta)
858 : {
859 3131 : auto sck_out_ptr = std::get<0>(bind.first);
860 3131 : auto priority = std::get<4>(bind.first);
861 3131 : auto tsk_in_id = std::get<2>(bind.second);
862 3131 : auto sck_in_id = std::get<3>(bind.second);
863 3131 : runtime::Socket* sck_in = nullptr;
864 3131 : runtime::Task* tsk_in = nullptr;
865 3131 : if (std::get<0>(bind.second) != nullptr) // if socket to socket binding
866 2536 : sck_in = tasks_per_threads[t][tsk_in_id]->sockets[sck_in_id].get();
867 : else // if socket to task binding
868 595 : tsk_in = tasks_per_threads[t][tsk_in_id];
869 3131 : this->adaptors_binds.push_back(std::make_tuple(
870 6262 : task_pull->sockets[sck_to_adp_sck_id[sck_out_ptr]].get(), sck_in, priority, tsk_in));
871 : }
872 : else
873 2725 : sck_orphan_binds_new.push_back(bind);
874 : }
875 : else
876 4929 : sck_orphan_binds_new.push_back(bind);
877 : }
878 :
879 2098 : if (t > 0) this->stages[sta]->all_modules[t].push_back(cur_adp);
880 : }
881 202 : this->saved_firsts_tasks_id[sta] = this->stages[sta]->firsts_tasks_id;
882 202 : sck_orphan_binds_cpy = sck_orphan_binds_new;
883 202 : }
884 :
885 : // ------------------------------------------------------------------------------------------------------------
886 : // ----------------------------------------------------------------------------------------------- push adaptor
887 : // ------------------------------------------------------------------------------------------------------------
888 282 : std::map<runtime::Socket*, size_t> sck_to_adp_sck_id_new;
889 282 : if (sta < this->stages.size() - 1)
890 : {
891 202 : std::vector<size_t> adp_n_elmts;
892 202 : std::vector<std::type_index> adp_datatype;
893 202 : size_t adp_buffer_size = synchro_buffer_sizes.size() ? synchro_buffer_sizes[sta] : 1;
894 202 : bool adp_active_waiting = synchro_active_waiting.size() ? synchro_active_waiting[sta] : false;
895 202 : size_t adp_n_frames = 1;
896 :
897 : // a map to remember if a passed socket points already to the same memory space
898 202 : std::map<void*, size_t> fwd_source;
899 :
900 202 : std::vector<runtime::Socket*> passed_scks_out;
901 909 : for (auto& bind : sck_orphan_binds_cpy)
902 : {
903 707 : auto tsk_out_sta = std::get<1>(bind.first);
904 707 : if (tsk_out_sta <= sta)
905 : {
906 458 : auto sck_out = std::get<0>(bind.first);
907 458 : if (std::find(passed_scks_out.begin(), passed_scks_out.end(), sck_out) == passed_scks_out.end())
908 : {
909 : // avoid the creation of new adaptor sockets for forward sockets pointing to the same memory
910 : // space
911 406 : auto sck_out_dptr = (void*)sck_out->get_dataptr<int8_t>();
912 406 : assert(sck_out_dptr != nullptr);
913 406 : if (fwd_source.find(sck_out_dptr) == fwd_source.end())
914 : {
915 349 : fwd_source[sck_out_dptr] = 1;
916 349 : adp_n_frames = sck_out->get_task().get_module().get_n_frames();
917 349 : adp_n_elmts.push_back(sck_out->get_n_elmts() / adp_n_frames);
918 349 : adp_datatype.push_back(sck_out->get_datatype());
919 : }
920 406 : passed_scks_out.push_back(sck_out);
921 : }
922 : }
923 : }
924 202 : passed_scks_out.clear();
925 :
926 : // allocate the adaptor for the first thread
927 202 : if (n_threads == 1)
928 114 : adp = new module::Adaptor_1_to_n(adp_n_elmts, adp_datatype, adp_buffer_size, adp_active_waiting);
929 : else
930 88 : adp = new module::Adaptor_n_to_1(adp_n_elmts, adp_datatype, adp_buffer_size, adp_active_waiting);
931 202 : adp->set_n_frames(adp_n_frames);
932 :
933 2300 : for (size_t t = 0; t < n_threads; t++)
934 : {
935 2098 : module::Adaptor* cur_adp = (t == 0) ? adp : adp->clone();
936 2098 : cur_adp->set_custom_name((n_threads == 1 ? "Adp_1_to_n_" : "Adp_n_to_1_") + std::to_string(sta));
937 2098 : this->adaptors[sta].first.push_back(std::unique_ptr<module::Adaptor>(cur_adp));
938 2098 : auto task_push = n_threads == 1 ? &(*cur_adp)[(int)module::adp::tsk::push_1]
939 1984 : : &(*cur_adp)[(int)module::adp::tsk::push_n];
940 :
941 2098 : std::map<void*, size_t> fwd_source;
942 2098 : sck_to_adp_sck_id_new.clear();
943 2098 : size_t adp_sck_id = 0;
944 10083 : for (auto& bind : sck_orphan_binds_cpy) // bind standard task to last adaptor
945 : {
946 7985 : auto tsk_out_sta = std::get<1>(bind.first);
947 :
948 7985 : if (tsk_out_sta <= sta)
949 : {
950 6836 : auto sck_out_ptr = std::get<0>(bind.first);
951 6836 : auto sck_out_dptr = (void*)sck_out_ptr->get_dataptr<int8_t>();
952 6836 : assert(sck_out_dptr != nullptr);
953 :
954 6836 : if (std::find(passed_scks_out.begin(),
955 : passed_scks_out.end(),
956 19707 : sck_out_ptr) == passed_scks_out.end() &&
957 12871 : fwd_source.find(sck_out_dptr) == fwd_source.end()) // <= the latest condition is here to
958 : // avoid to bind adaptor sockets two
959 : // times the same memory space
960 : // (useful in the case of multiple
961 : // fwd sockets pointing to the same
962 : // memory address)
963 : {
964 4838 : if (tsk_out_sta == sta)
965 : {
966 3283 : auto tsk_out_id = std::get<2>(bind.first);
967 3283 : auto sck_out_id = std::get<3>(bind.first);
968 3283 : auto sck_out = tasks_per_threads[t][tsk_out_id]->sockets[sck_out_id];
969 3283 : auto priority = std::get<4>(bind.first);
970 3283 : sck_to_adp_sck_id_new[sck_out_ptr] = adp_sck_id;
971 3283 : this->adaptors_binds.push_back(
972 3283 : std::make_tuple(sck_out.get(),
973 3283 : task_push->sockets[adp_sck_id++].get(),
974 : priority,
975 3283 : nullptr)); // <= only socket to socket binding is possible here
976 3283 : }
977 : else // if (tsk_out_sta < sta) // bind prev. adaptor to last adaptor
978 : {
979 1555 : auto n_threads_prev_sta = this->stages[sta - 1]->get_n_threads();
980 1555 : auto tsk_out_id = (n_threads_prev_sta == 1) ? (size_t)module::adp::tsk::pull_n
981 : : (size_t)module::adp::tsk::pull_1;
982 1555 : auto sck_out_id = sck_to_adp_sck_id[sck_out_ptr];
983 1555 : sck_to_adp_sck_id_new[sck_out_ptr] = adp_sck_id;
984 : auto adp_prev =
985 1555 : t == 0 ? this->adaptors[sta - 1].first[0] : this->adaptors[sta - 1].second[t - 1];
986 1555 : auto sck_out = (*adp_prev)[tsk_out_id].sockets[sck_out_id];
987 1555 : auto priority = std::get<4>(bind.first);
988 1555 : this->adaptors_binds.push_back(
989 1555 : std::make_tuple(sck_out.get(),
990 1555 : task_push->sockets[adp_sck_id++].get(),
991 : priority,
992 1555 : nullptr)); // <= only socket to socket binding is possible here
993 1555 : }
994 :
995 4838 : fwd_source[sck_out_dptr] = 1; // remember that this memory space has been connected to the
996 : // adaptor once
997 4838 : passed_scks_out.push_back(sck_out_ptr);
998 : }
999 : }
1000 : }
1001 2098 : passed_scks_out.clear();
1002 2098 : }
1003 202 : this->saved_lasts_tasks_id[sta] = this->stages[sta]->lasts_tasks_id;
1004 202 : }
1005 282 : sck_to_adp_sck_id = sck_to_adp_sck_id_new;
1006 282 : }
1007 80 : }
1008 :
1009 : void
1010 240 : Pipeline::bind_adaptors()
1011 : {
1012 240 : this->_bind_adaptors(true);
1013 240 : }
1014 :
1015 : void
1016 240 : Pipeline::_bind_adaptors(const bool bind_adaptors)
1017 : {
1018 240 : if (!this->bound_adaptors)
1019 : {
1020 1086 : for (size_t sta = 0; sta < this->stages.size(); sta++)
1021 : {
1022 846 : const auto n_threads = this->stages[sta]->get_n_threads();
1023 :
1024 : // --------------------------------------------------------------------------------------------------------
1025 : // ------------------------------------------------------------------------------------------- pull adaptor
1026 : // --------------------------------------------------------------------------------------------------------
1027 846 : if (sta > 0)
1028 : {
1029 606 : auto n_threads_prev_sta = this->stages[sta - 1]->get_n_threads();
1030 6900 : for (size_t t = 0; t < n_threads; t++)
1031 : {
1032 : module::Adaptor* cur_adp =
1033 6294 : t > 0 ? adaptors[sta - 1].second[t - 1].get() : adaptors[sta - 1].first[0].get();
1034 :
1035 6294 : if (t > 0 || sta == this->stages.size() - 1) // add the adaptor to the current stage
1036 5916 : this->stages[sta]->all_modules[t].push_back(cur_adp);
1037 :
1038 6294 : auto task_pull = n_threads_prev_sta == 1 ? &(*cur_adp)[(int)module::adp::tsk::pull_n]
1039 264 : : &(*cur_adp)[(int)module::adp::tsk::pull_1];
1040 :
1041 6294 : auto ss = this->stages[sta]->sequences[t]->get_contents();
1042 6294 : assert(ss != nullptr);
1043 6294 : ss->tasks.insert(ss->tasks.begin(), task_pull);
1044 6294 : ss->processes.insert(ss->processes.begin(),
1045 0 : [task_pull]() -> const int*
1046 : {
1047 0 : task_pull->exec();
1048 0 : const int* status = task_pull->sockets.back()->get_dataptr<const int>();
1049 0 : return status;
1050 : });
1051 6294 : this->stages[sta]->update_tasks_id(t);
1052 : }
1053 606 : this->stages[sta]->firsts_tasks_id.clear();
1054 606 : this->stages[sta]->firsts_tasks_id.push_back(0);
1055 606 : this->stages[sta]->n_tasks++;
1056 : }
1057 :
1058 : // --------------------------------------------------------------------------------------------------------
1059 : // ------------------------------------------------------------------------------------------- push adaptor
1060 : // --------------------------------------------------------------------------------------------------------
1061 846 : if (sta < this->stages.size() - 1)
1062 : {
1063 606 : size_t last_task_id = 0;
1064 6900 : for (size_t t = 0; t < n_threads; t++)
1065 : {
1066 6294 : module::Adaptor* cur_adp = adaptors[sta].first[t].get();
1067 :
1068 : // add the adaptor to the current stage
1069 6294 : this->stages[sta]->all_modules[t].push_back(cur_adp);
1070 :
1071 6294 : auto task_push = n_threads == 1 ? &(*cur_adp)[(int)module::adp::tsk::push_1]
1072 5952 : : &(*cur_adp)[(int)module::adp::tsk::push_n];
1073 :
1074 6294 : auto ss = this->stages[sta]->get_last_subsequence(t);
1075 6294 : assert(ss != nullptr);
1076 6294 : ss->tasks.push_back(task_push);
1077 6294 : ss->processes.push_back(
1078 0 : [task_push]() -> const int*
1079 : {
1080 0 : task_push->exec();
1081 0 : const int* status = task_push->sockets.back()->get_dataptr<const int>();
1082 0 : return status;
1083 : });
1084 6294 : last_task_id = ss->tasks_id[ss->tasks_id.size() - 1] + 1;
1085 6294 : ss->tasks_id.push_back(last_task_id);
1086 : }
1087 606 : this->stages[sta]->lasts_tasks_id.clear();
1088 606 : this->stages[sta]->lasts_tasks_id.push_back(last_task_id);
1089 606 : this->stages[sta]->n_tasks++;
1090 : }
1091 846 : this->stages[sta]->update_firsts_and_lasts_tasks();
1092 : }
1093 :
1094 : // ------------------------------------------------------------------------------------------------------------
1095 : // ---------------------------------------------------------------------------------------------- bind adaptors
1096 : // ------------------------------------------------------------------------------------------------------------
1097 1233 : for (auto& bind : this->sck_orphan_binds)
1098 : {
1099 993 : auto sck_out = std::get<0>(bind.first);
1100 993 : auto sck_in = std::get<0>(bind.second);
1101 993 : if (sck_in != nullptr) // if socket to socket unbinding
1102 915 : sck_in->unbind(*sck_out);
1103 : else // if socket to task unbinding
1104 : {
1105 78 : auto tsk_in = std::get<4>(bind.second);
1106 78 : assert(tsk_in != nullptr);
1107 78 : tsk_in->unbind(*sck_out);
1108 : }
1109 : }
1110 :
1111 240 : if (bind_adaptors)
1112 : {
1113 24147 : for (auto& bind : this->adaptors_binds)
1114 : {
1115 23907 : auto sck_out = std::get<0>(bind);
1116 23907 : auto sck_in = std::get<1>(bind);
1117 23907 : auto priority = std::get<2>(bind);
1118 23907 : if (sck_in != nullptr) // if socket to socket binding
1119 22122 : sck_in->_bind(*sck_out, priority);
1120 : else // if socket to task binding
1121 : {
1122 1785 : auto tsk_in = std::get<3>(bind);
1123 1785 : assert(tsk_in != nullptr);
1124 1785 : tsk_in->_bind(*sck_out, priority);
1125 : }
1126 : }
1127 : }
1128 :
1129 240 : this->bound_adaptors = true;
1130 : }
1131 240 : }
1132 :
1133 : void
1134 160 : Pipeline::unbind_adaptors()
1135 : {
1136 160 : this->_unbind_adaptors(true);
1137 160 : }
1138 :
1139 : void
1140 320 : Pipeline::_unbind_adaptors(const bool bind_orphans)
1141 : {
1142 320 : if (this->bound_adaptors)
1143 : {
1144 1086 : for (size_t sta = 0; sta < this->stages.size(); sta++)
1145 : {
1146 846 : const auto n_threads = this->stages[sta]->get_n_threads();
1147 :
1148 : // --------------------------------------------------------------------------------------------------------
1149 : // ------------------------------------------------------------------------------------------- pull adaptor
1150 : // --------------------------------------------------------------------------------------------------------
1151 846 : if (sta > 0)
1152 : {
1153 6900 : for (size_t t = 0; t < n_threads; t++)
1154 : {
1155 6294 : if (t > 0 || sta == this->stages.size() - 1) // rm the adaptor to the current stage
1156 5916 : this->stages[sta]->all_modules[t].pop_back();
1157 :
1158 6294 : auto ss = this->stages[sta]->sequences[t]->get_contents();
1159 6294 : assert(ss != nullptr);
1160 6294 : ss->tasks.erase(ss->tasks.begin());
1161 6294 : ss->processes.erase(ss->processes.begin());
1162 6294 : this->stages[sta]->update_tasks_id(t);
1163 : }
1164 606 : this->stages[sta]->firsts_tasks_id = this->saved_firsts_tasks_id[sta];
1165 606 : this->stages[sta]->n_tasks--;
1166 : }
1167 :
1168 : // --------------------------------------------------------------------------------------------------------
1169 : // ------------------------------------------------------------------------------------------- push adaptor
1170 : // --------------------------------------------------------------------------------------------------------
1171 846 : if (sta < this->stages.size() - 1)
1172 : {
1173 6900 : for (size_t t = 0; t < n_threads; t++)
1174 : {
1175 : // rm the adaptor to the current stage
1176 6294 : this->stages[sta]->all_modules[t].pop_back();
1177 :
1178 6294 : auto ss = this->stages[sta]->get_last_subsequence(t);
1179 6294 : assert(ss != nullptr);
1180 6294 : ss->tasks.pop_back();
1181 6294 : ss->processes.pop_back();
1182 6294 : ss->tasks_id.pop_back();
1183 : }
1184 606 : this->stages[sta]->lasts_tasks_id = this->saved_lasts_tasks_id[sta];
1185 606 : this->stages[sta]->n_tasks--;
1186 : }
1187 846 : this->stages[sta]->update_firsts_and_lasts_tasks();
1188 : }
1189 :
1190 : // ------------------------------------------------------------------------------------------------------------
1191 : // -------------------------------------------------------------------------------------------- unbind adaptors
1192 : // ------------------------------------------------------------------------------------------------------------
1193 24147 : for (auto& bind : this->adaptors_binds)
1194 : {
1195 23907 : auto sck_out = std::get<0>(bind);
1196 23907 : auto sck_in = std::get<1>(bind);
1197 23907 : if (sck_in != nullptr) // if socket to socket unbinding
1198 22122 : sck_in->unbind(*sck_out);
1199 : else // if socket to task unbinding
1200 : {
1201 1785 : auto tsk_in = std::get<3>(bind);
1202 1785 : assert(tsk_in != nullptr);
1203 1785 : tsk_in->unbind(*sck_out);
1204 : }
1205 : }
1206 :
1207 240 : if (bind_orphans)
1208 : {
1209 411 : for (auto& bind : this->sck_orphan_binds)
1210 : {
1211 331 : auto sck_out = std::get<0>(bind.first);
1212 331 : auto priority = std::get<4>(bind.first);
1213 331 : auto sck_in = std::get<0>(bind.second);
1214 331 : if (sck_in != nullptr) // if socket to socket binding
1215 305 : sck_in->_bind(*sck_out, priority);
1216 : else // if socket to task binding
1217 : {
1218 26 : auto tsk_in = std::get<4>(bind.second);
1219 26 : assert(tsk_in != nullptr);
1220 26 : tsk_in->_bind(*sck_out, priority);
1221 : }
1222 : }
1223 : }
1224 :
1225 240 : this->bound_adaptors = false;
1226 : }
1227 320 : }
1228 :
1229 : void
1230 0 : Pipeline::exec(const std::vector<std::function<bool(const std::vector<const int*>&)>>& stop_conditions)
1231 : {
1232 0 : if (stop_conditions.size() != this->stages.size() && stop_conditions.size() != 1)
1233 : {
1234 0 : std::stringstream message;
1235 0 : message << "'stop_conditions.size()' has to be equal to 'stages.size()' or to 1 ('stop_conditions.size()' = "
1236 0 : << stop_conditions.size() << ", 'stages.size()' = " << stages.size() << ").";
1237 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
1238 0 : }
1239 :
1240 0 : if (!this->bound_adaptors)
1241 : {
1242 0 : std::stringstream message;
1243 0 : message << "'bound_adaptors' has to be true to execute the pipeline.";
1244 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
1245 0 : }
1246 :
1247 : // ----------------------------------------------------------------------------------------------------------------
1248 0 : auto& stages = this->stages;
1249 0 : std::vector<const std::function<bool(const std::vector<const int*>&)>*> stop_condition_vec(stages.size() - 1,
1250 0 : nullptr);
1251 0 : if (stop_conditions.size() == stages.size())
1252 0 : for (size_t s = 0; s < stages.size() - 1; s++)
1253 0 : stop_condition_vec[s] = &stop_conditions[s];
1254 :
1255 0 : std::function<void(const size_t)> func_exec = [&stages, &stop_condition_vec](const size_t tid)
1256 : {
1257 0 : size_t s = tid;
1258 0 : if (stop_condition_vec[s])
1259 0 : stages[s]->exec(*(stop_condition_vec[s]));
1260 : else
1261 0 : stages[s]->exec();
1262 :
1263 : // send the signal to stop the next stage
1264 0 : const auto& tasks = stages[s + 1]->get_tasks_per_threads();
1265 0 : for (size_t th = 0; th < tasks.size(); th++)
1266 0 : for (size_t ta = 0; ta < tasks[th].size(); ta++)
1267 : {
1268 0 : auto m = dynamic_cast<module::Adaptor*>(&tasks[th][ta]->get_module());
1269 0 : if (m != nullptr)
1270 0 : if (tasks[th][ta]->get_name() == "pull_n" || tasks[th][ta]->get_name() == "pull_1")
1271 0 : m->cancel_waiting();
1272 : }
1273 0 : };
1274 :
1275 0 : this->thread_pool->run(func_exec, true);
1276 :
1277 0 : stages[stages.size() - 1]->exec(stop_conditions[stop_conditions.size() - 1]);
1278 :
1279 : // stop all the stages before
1280 0 : for (size_t notify_s = 0; notify_s < stages.size() - 1; notify_s++)
1281 0 : for (auto& m : stages[notify_s]->get_modules<tools::Interface_waiting>())
1282 0 : m->cancel_waiting();
1283 :
1284 0 : this->thread_pool->wait();
1285 0 : this->thread_pool->unset_func_exec();
1286 : // ----------------------------------------------------------------------------------------------------------------
1287 :
1288 : // this is NOT made in the tools::Sequence::exec() to correctly flush the pipeline before restoring buffers
1289 : // initial configuration
1290 0 : for (auto& stage : this->stages)
1291 0 : if (stage->is_no_copy_mode())
1292 : {
1293 0 : stage->reset_no_copy_mode();
1294 0 : stage->gen_processes(false);
1295 : }
1296 :
1297 0 : for (auto& padps : this->adaptors)
1298 : {
1299 0 : for (auto& adp : padps.first)
1300 0 : adp->reset();
1301 0 : for (auto& adp : padps.second)
1302 0 : adp->reset();
1303 : }
1304 0 : }
1305 :
1306 : void
1307 80 : Pipeline::exec(const std::vector<std::function<bool()>>& stop_conditions)
1308 : {
1309 80 : if (stop_conditions.size() != this->stages.size() && stop_conditions.size() != 1)
1310 : {
1311 0 : std::stringstream message;
1312 0 : message << "'stop_conditions.size()' has to be equal to 'stages.size()' or to 1 ('stop_conditions.size()' = "
1313 0 : << stop_conditions.size() << ", 'stages.size()' = " << stages.size() << ").";
1314 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
1315 0 : }
1316 :
1317 80 : if (!this->bound_adaptors)
1318 : {
1319 0 : std::stringstream message;
1320 0 : message << "'bound_adaptors' has to be true to execute the pipeline.";
1321 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
1322 0 : }
1323 :
1324 : // ----------------------------------------------------------------------------------------------------------------
1325 80 : auto& stages = this->stages;
1326 80 : std::vector<const std::function<bool()>*> stop_condition_vec(stages.size() - 1, nullptr);
1327 80 : if (stop_conditions.size() == stages.size())
1328 4 : for (size_t s = 0; s < stages.size() - 1; s++)
1329 0 : stop_condition_vec[s] = &stop_conditions[s];
1330 :
1331 577 : std::function<void(const size_t)> func_exec = [&stages, &stop_condition_vec](const size_t tid)
1332 : {
1333 189 : size_t s = tid;
1334 189 : if (stop_condition_vec[s])
1335 0 : stages[s]->exec(*(stop_condition_vec[s]));
1336 : else
1337 191 : stages[s]->exec();
1338 :
1339 : // send the signal to stop the next stage
1340 197 : const auto& tasks = stages[s + 1]->get_tasks_per_threads();
1341 2297 : for (size_t th = 0; th < tasks.size(); th++)
1342 15584 : for (size_t ta = 0; ta < tasks[th].size(); ta++)
1343 : {
1344 13461 : auto m = dynamic_cast<module::Adaptor*>(&tasks[th][ta]->get_module());
1345 13487 : if (m != nullptr)
1346 4113 : if (tasks[th][ta]->get_name() == "pull_n" || tasks[th][ta]->get_name() == "pull_1")
1347 2097 : m->cancel_waiting();
1348 : }
1349 280 : };
1350 :
1351 80 : this->thread_pool->run(func_exec, true);
1352 80 : stages[stages.size() - 1]->exec(stop_conditions[stop_conditions.size() - 1]);
1353 :
1354 : // stop all the stages before
1355 282 : for (size_t notify_s = 0; notify_s < stages.size() - 1; notify_s++)
1356 6092 : for (auto& m : stages[notify_s]->get_modules<tools::Interface_waiting>())
1357 6092 : m->cancel_waiting();
1358 :
1359 80 : this->thread_pool->wait();
1360 80 : this->thread_pool->unset_func_exec();
1361 : // ----------------------------------------------------------------------------------------------------------------
1362 :
1363 : // this is NOT made in the tools::Sequence::exec() to correctly flush the pipeline before restoring buffers
1364 : // initial configuration
1365 362 : for (auto& stage : this->stages)
1366 282 : if (stage->is_no_copy_mode())
1367 : {
1368 282 : stage->reset_no_copy_mode();
1369 282 : stage->gen_processes(false);
1370 : }
1371 :
1372 282 : for (auto& padps : this->adaptors)
1373 : {
1374 2300 : for (auto& adp : padps.first)
1375 2098 : adp->reset();
1376 2098 : for (auto& adp : padps.second)
1377 1896 : adp->reset();
1378 : }
1379 80 : }
1380 :
1381 : void
1382 0 : Pipeline::exec(std::function<bool(const std::vector<const int*>&)> stop_condition)
1383 : {
1384 0 : this->exec(std::vector<std::function<bool(const std::vector<const int*>&)>>(1, stop_condition));
1385 0 : }
1386 :
1387 : void
1388 80 : Pipeline::exec(std::function<bool()> stop_condition)
1389 : {
1390 80 : this->exec(std::vector<std::function<bool()>>(1, stop_condition));
1391 80 : }
1392 :
1393 : void
1394 11 : Pipeline::exec()
1395 : {
1396 6873 : this->exec([]() { return false; });
1397 11 : }
1398 :
1399 : std::vector<std::vector<module::Module*>>
1400 0 : Pipeline::get_modules_per_threads() const
1401 : {
1402 0 : std::vector<std::vector<module::Module*>> modules_per_threads;
1403 0 : for (auto& stage : this->stages)
1404 : {
1405 0 : auto modules_per_threads_add = stage->get_modules_per_threads();
1406 0 : if (modules_per_threads_add.size() > modules_per_threads.size())
1407 0 : modules_per_threads.resize(modules_per_threads_add.size());
1408 :
1409 0 : for (size_t t = 0; t < modules_per_threads_add.size(); t++)
1410 0 : modules_per_threads[t].insert(
1411 0 : modules_per_threads[t].end(), modules_per_threads_add[t].begin(), modules_per_threads_add[t].end());
1412 0 : }
1413 0 : return modules_per_threads;
1414 0 : }
1415 :
1416 : std::vector<std::vector<module::Module*>>
1417 0 : Pipeline::get_modules_per_types() const
1418 : {
1419 0 : std::vector<std::vector<module::Module*>> modules_per_types;
1420 0 : for (auto& stage : this->stages)
1421 : {
1422 0 : auto modules_per_types_add = stage->get_modules_per_types();
1423 0 : modules_per_types.insert(modules_per_types.end(), modules_per_types_add.begin(), modules_per_types_add.end());
1424 0 : }
1425 0 : return modules_per_types;
1426 0 : }
1427 :
1428 : std::vector<std::vector<module::Module*>>
1429 0 : Pipeline::get_original_modules() const
1430 : {
1431 0 : return this->original_sequence.get_modules_per_types();
1432 : }
1433 :
1434 : std::vector<std::vector<runtime::Task*>>
1435 80 : Pipeline::get_tasks_per_threads() const
1436 : {
1437 80 : std::vector<std::vector<runtime::Task*>> tasks_per_threads;
1438 362 : for (auto& stage : this->stages)
1439 : {
1440 282 : auto tasks_per_threads_add = stage->get_tasks_per_threads();
1441 282 : if (tasks_per_threads_add.size() > tasks_per_threads.size())
1442 147 : tasks_per_threads.resize(tasks_per_threads_add.size());
1443 :
1444 2470 : for (size_t t = 0; t < tasks_per_threads_add.size(); t++)
1445 8752 : tasks_per_threads[t].insert(
1446 8752 : tasks_per_threads[t].end(), tasks_per_threads_add[t].begin(), tasks_per_threads_add[t].end());
1447 282 : }
1448 80 : return tasks_per_threads;
1449 0 : }
1450 :
1451 : std::vector<std::vector<runtime::Task*>>
1452 0 : Pipeline::get_tasks_per_types() const
1453 : {
1454 0 : std::vector<std::vector<runtime::Task*>> tasks_per_types;
1455 0 : for (auto& stage : this->stages)
1456 : {
1457 0 : auto tasks_per_types_add = stage->get_tasks_per_types();
1458 0 : tasks_per_types.insert(tasks_per_types.end(), tasks_per_types_add.begin(), tasks_per_types_add.end());
1459 0 : }
1460 0 : return tasks_per_types;
1461 0 : }
1462 :
1463 : void
1464 5 : Pipeline::export_dot(std::ostream& stream) const
1465 : {
1466 : std::function<void(tools::Digraph_node<Sub_sequence>*,
1467 : const size_t,
1468 : const std::string&,
1469 : std::ostream&,
1470 : std::vector<tools::Digraph_node<Sub_sequence>*>&)>
1471 22 : export_dot_subsequences_recursive = [&export_dot_subsequences_recursive,
1472 : this](tools::Digraph_node<Sub_sequence>* cur_node,
1473 : const size_t sta,
1474 : const std::string& tab,
1475 : std::ostream& stream,
1476 22 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
1477 : {
1478 44 : if (cur_node != nullptr &&
1479 44 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
1480 : {
1481 22 : already_parsed_nodes.push_back(cur_node);
1482 44 : this->stages[sta]->export_dot_subsequence(cur_node->get_c()->tasks,
1483 22 : cur_node->get_c()->tasks_id,
1484 22 : cur_node->get_c()->type,
1485 44 : "Sub-sequence " + std::to_string(cur_node->get_c()->id) +
1486 88 : " (depth = " + std::to_string(cur_node->get_depth()) + ")",
1487 : tab,
1488 : stream);
1489 :
1490 22 : for (auto c : cur_node->get_children())
1491 0 : export_dot_subsequences_recursive(c, sta, tab, stream, already_parsed_nodes);
1492 : }
1493 27 : };
1494 :
1495 : std::function<void(tools::Digraph_node<Sub_sequence>*,
1496 : const size_t,
1497 : const std::string&,
1498 : std::ostream&,
1499 : std::vector<tools::Digraph_node<Sub_sequence>*>&)>
1500 : export_dot_connections_recursive =
1501 22 : [&export_dot_connections_recursive, this](tools::Digraph_node<Sub_sequence>* cur_node,
1502 : const size_t sta,
1503 : const std::string& tab,
1504 : std::ostream& stream,
1505 22 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
1506 : {
1507 44 : if (cur_node != nullptr &&
1508 44 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
1509 : {
1510 22 : already_parsed_nodes.push_back(cur_node);
1511 22 : this->stages[sta]->export_dot_connections(cur_node->get_c()->tasks, tab, stream);
1512 :
1513 22 : for (auto c : cur_node->get_children())
1514 0 : export_dot_connections_recursive(c, sta, tab, stream, already_parsed_nodes);
1515 : }
1516 27 : };
1517 :
1518 5 : std::string tab = "\t";
1519 5 : stream << "digraph Pipeline {" << std::endl;
1520 5 : stream << tab << "compound=true;" << std::endl;
1521 :
1522 27 : for (size_t sta = 0; sta < this->stages.size(); sta++)
1523 : {
1524 22 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
1525 22 : const auto n_threads = this->stages[sta]->get_n_threads();
1526 22 : stream << tab << "subgraph \"cluster_Stage " << sta << "\" {" << std::endl;
1527 22 : stream << tab << tab << "node [style=filled];" << std::endl;
1528 22 : export_dot_subsequences_recursive(this->stages[sta]->sequences[0], sta, tab, stream, already_parsed_nodes);
1529 22 : stream << tab << tab << "label=\"Pipeline stage " << sta << " (" << n_threads << " thread(s))\";" << std::endl;
1530 22 : std::string color = "blue";
1531 22 : stream << tab << tab << "color=" << color << ";" << std::endl;
1532 22 : stream << tab << "}" << std::endl;
1533 22 : }
1534 :
1535 27 : for (size_t sta = 0; sta < this->stages.size(); sta++)
1536 : {
1537 22 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
1538 22 : export_dot_connections_recursive(this->stages[sta]->sequences[0], sta, tab, stream, already_parsed_nodes);
1539 22 : if (this->bound_adaptors)
1540 : {
1541 22 : if (sta > 0)
1542 : {
1543 17 : auto tsk1 = this->stages[sta - 1]->get_lasts_tasks()[0].back();
1544 17 : auto tsk2 = this->stages[sta + 0]->get_firsts_tasks()[0][0];
1545 :
1546 17 : auto sck1 = tsk1->sockets[0];
1547 17 : auto sck2 = tsk2->sockets[0];
1548 :
1549 17 : stream << tab << "\"" << +sck1.get() << "\" -> \"" << +sck2.get() << "\" [ltail=\"cluster_"
1550 17 : << +&tsk1->get_module() << "_" << +tsk1 << "\" lhead=\"cluster_" << +&tsk2->get_module() << "_"
1551 17 : << +tsk2 << "\" color=\"green\" style=\"dashed\"];" << std::endl;
1552 17 : }
1553 : }
1554 22 : }
1555 :
1556 5 : stream << "}" << std::endl;
1557 5 : }
1558 :
1559 : bool
1560 160 : Pipeline::is_bound_adaptors() const
1561 : {
1562 160 : return this->bound_adaptors;
1563 : }
1564 :
1565 : void
1566 0 : Pipeline::set_auto_stop(const bool auto_stop)
1567 : {
1568 0 : this->auto_stop = auto_stop;
1569 0 : for (auto stage : this->stages)
1570 0 : stage->set_auto_stop(auto_stop);
1571 0 : }
1572 :
1573 : bool
1574 0 : Pipeline::is_auto_stop() const
1575 : {
1576 0 : return this->auto_stop;
1577 : }
1578 :
1579 : size_t
1580 0 : Pipeline::get_n_frames() const
1581 : {
1582 0 : const auto n_frames = this->stages[0]->get_n_frames();
1583 :
1584 0 : for (auto& sta : this->stages)
1585 0 : if (sta->get_n_frames() != n_frames)
1586 : {
1587 0 : std::stringstream message;
1588 0 : message << "All the stages/sequences do not have the same 'n_frames' value ('sta->get_n_frames()' = "
1589 0 : << sta->get_n_frames() << ", 'n_frames' = " << n_frames << ").";
1590 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
1591 0 : }
1592 :
1593 0 : return n_frames;
1594 : }
1595 :
1596 : void
1597 160 : Pipeline::set_n_frames(const size_t n_frames)
1598 : {
1599 160 : const auto save_bound_adaptors = this->is_bound_adaptors();
1600 160 : if (!save_bound_adaptors) this->bind_adaptors();
1601 160 : this->_unbind_adaptors(false);
1602 :
1603 : // set the new "n_frames" val in the sequences
1604 160 : std::vector<std::vector<std::pair<runtime::Socket*, runtime::Socket*>>> unbind_sockets(this->stages.size());
1605 160 : std::vector<std::vector<std::pair<runtime::Task*, runtime::Socket*>>> unbind_tasks(this->stages.size());
1606 160 : std::vector<bool> skip(this->stages.size());
1607 724 : for (size_t s = 0; s < this->stages.size(); s++)
1608 564 : skip[s] = this->stages[s]->get_n_frames() == n_frames;
1609 724 : for (size_t s = 0; s < this->stages.size(); s++)
1610 564 : if (!skip[s]) this->stages[s]->_set_n_frames_unbind(unbind_sockets[s], unbind_tasks[s]);
1611 724 : for (size_t s = 0; s < this->stages.size(); s++)
1612 564 : if (!skip[s]) this->stages[s]->_set_n_frames(n_frames);
1613 724 : for (size_t s = 0; s < this->stages.size(); s++)
1614 564 : if (!skip[s]) this->stages[s]->_set_n_frames_rebind(unbind_sockets[s], unbind_tasks[s]);
1615 :
1616 : // set the new "n_frames" val in the adaptors
1617 564 : for (auto& adps : this->adaptors)
1618 : {
1619 4600 : for (auto& adp : adps.first)
1620 4196 : adp->set_n_frames(n_frames);
1621 4196 : for (auto& adp : adps.second)
1622 3792 : adp->set_n_frames(n_frames);
1623 : }
1624 :
1625 : // bind orphans to complete the unbind of the adaptors
1626 822 : for (auto& bind : this->sck_orphan_binds)
1627 : {
1628 662 : auto sck_out = std::get<0>(bind.first);
1629 662 : auto priority = std::get<4>(bind.first);
1630 662 : auto sck_in = std::get<0>(bind.second);
1631 662 : if (sck_in != nullptr)
1632 610 : sck_in->_bind(*sck_out, priority);
1633 : else
1634 : {
1635 52 : auto tsk_in = std::get<4>(bind.second);
1636 52 : assert(tsk_in != nullptr);
1637 52 : tsk_in->_bind(*sck_out, priority);
1638 : }
1639 : }
1640 :
1641 160 : if (save_bound_adaptors) this->bind_adaptors();
1642 160 : }
|