Line data Source code
1 : #include <cassert>
2 : #include <fstream>
3 : #include <thread>
4 : #include <tuple>
5 : #include <utility>
6 :
7 : #include "Module/Stateful/Adaptor/Adaptor_m_to_n.hpp"
8 : #include "Runtime/Pipeline/Pipeline.hpp"
9 : #include "Tools/Exception/exception.hpp"
10 : #include "Tools/Interface/Interface_waiting.hpp"
11 : #include "Tools/Thread/Thread_pinning/Thread_pinning_utils.hpp"
12 : #include "Tools/Thread/Thread_pool/Standard/Thread_pool_standard.hpp"
13 :
14 : using namespace spu;
15 : using namespace spu::runtime;
16 :
17 : // Pipeline
18 : // ::Pipeline(const runtime::Task &first,
19 : // const runtime::Task &last,
20 : // const std::vector<std::pair<std::vector<const runtime::Task*>, std::vector<const runtime::Task*>>>
21 : // &sep_stages, const std::vector<size_t> &n_threads, const std::vector<size_t> &synchro_buffer_sizes, const
22 : // std::vector<bool> &synchro_active_waiting, const std::vector<bool> &thread_pinning, const
23 : // std::vector<std::vector<size_t>> &puids)
24 : // : original_sequence(first, last, 1),
25 : // stages(sep_stages.size()),
26 : // adaptors(sep_stages.size() -1),
27 : // saved_firsts_tasks_id(sep_stages.size()),
28 : // saved_lasts_tasks_id(sep_stages.size()),
29 : // bound_adaptors(false)
30 : // {
31 : // this->init<const runtime::Task>(first,
32 : // &last,
33 : // sep_stages,
34 : // n_threads,
35 : // synchro_buffer_sizes,
36 : // synchro_active_waiting,
37 : // thread_pinning,
38 : // puids);
39 : // }
40 :
41 : // Pipeline
42 : // ::Pipeline(const runtime::Task &first,
43 : // const std::vector<std::pair<std::vector<const runtime::Task*>, std::vector<const runtime::Task*>>>
44 : // &sep_stages, const std::vector<size_t> &n_threads, const std::vector<size_t> &synchro_buffer_sizes, const
45 : // std::vector<bool> &synchro_active_waiting, const std::vector<bool> &thread_pinning, const
46 : // std::vector<std::vector<size_t>> &puids)
47 : // : original_sequence(first, 1),
48 : // stages(sep_stages.size()),
49 : // adaptors(sep_stages.size() -1),
50 : // saved_firsts_tasks_id(sep_stages.size()),
51 : // saved_lasts_tasks_id(sep_stages.size()),
52 : // bound_adaptors(false)
53 : // {
54 : // const runtime::Task* last = nullptr;
55 : // this->init<const runtime::Task>(first,
56 : // last,
57 : // sep_stages,
58 : // n_threads,
59 : // synchro_buffer_sizes,
60 : // synchro_active_waiting,
61 : // thread_pinning,
62 : // puids);
63 : // }
64 :
// Constructor taking explicit 'firsts'/'lasts' task lists, stages described as
// (firsts, lasts, exclusions) tuples, and per-stage 'puids' lists.
// The member initializers build the reference sequence from ('firsts', 'lasts', 1) and size the per-stage
// containers from 'sep_stages.size()'; all the remaining work is done by init<runtime::Task>().
// NOTE(review): 'sep_stages.size() -1' is evaluated on an unsigned size, so an empty 'sep_stages' would wrap
// around to a huge adaptor count -- presumably callers always provide at least one stage; confirm.
65 19 : Pipeline
66 : ::Pipeline(const std::vector<runtime::Task*> &firsts,
67 : const std::vector<runtime::Task*> &lasts,
68 : const std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
69 : const std::vector<size_t> &n_threads,
70 : const std::vector<size_t> &synchro_buffer_sizes,
71 : const std::vector<bool> &synchro_active_waiting,
72 : const std::vector<bool> &thread_pinning,
73 : const std::vector<std::vector<size_t>> &puids/*,
74 19 : const std::vector<bool> &tasks_inplace*/)
75 19 : : original_sequence(firsts, lasts, 1),
76 19 : stages(sep_stages.size()),
77 19 : adaptors(sep_stages.size() -1),
78 19 : saved_firsts_tasks_id(sep_stages.size()),
79 19 : saved_lasts_tasks_id(sep_stages.size()),
80 19 : bound_adaptors(false),
81 57 : auto_stop(true)
82 : {
// 'pipeline_pinning_policy' is not passed here although init() (defined later in this file) takes it as last
// parameter -- presumably the declaration in the header gives it a default value; confirm.
83 19 : this->init<runtime::Task>(
84 : firsts, lasts, sep_stages, n_threads, synchro_buffer_sizes, synchro_active_waiting, thread_pinning, puids
85 : /*, tasks_inplace*/);
86 19 : }
87 :
// Constructor taking explicit 'firsts'/'lasts' task lists, stages described as (firsts, lasts) pairs
// (no exclusion list), and per-stage 'puids' lists.
// Same member initialization as the tuple-based overload; the pairs are converted to tuples before init().
// NOTE(review): 'sep_stages.size() -1' wraps around when 'sep_stages' is empty (unsigned arithmetic).
88 28 : Pipeline
89 : ::Pipeline(const std::vector<runtime::Task*> &firsts,
90 : const std::vector<runtime::Task*> &lasts,
91 : const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
92 : const std::vector<size_t> &n_threads,
93 : const std::vector<size_t> &synchro_buffer_sizes,
94 : const std::vector<bool> &synchro_active_waiting,
95 : const std::vector<bool> &thread_pinning,
96 : const std::vector<std::vector<size_t>> &puids/*,
97 28 : const std::vector<bool> &tasks_inplace*/)
98 28 : : original_sequence(firsts, lasts, 1),
99 28 : stages(sep_stages.size()),
100 28 : adaptors(sep_stages.size() -1),
101 28 : saved_firsts_tasks_id(sep_stages.size()),
102 28 : saved_lasts_tasks_id(sep_stages.size()),
103 28 : bound_adaptors(false),
104 84 : auto_stop(true)
105 : {
// widen every (firsts, lasts) pair into a (firsts, lasts, exclusions) tuple with an empty third vector,
// because init() only accepts the tuple form
106 : std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>>
107 28 : sep_stages_bis;
108 128 : for (auto& sep_stage : sep_stages)
109 100 : sep_stages_bis.push_back(std::make_tuple(sep_stage.first, sep_stage.second, std::vector<runtime::Task*>()));
110 :
111 28 : this->init<runtime::Task>(firsts,
112 : lasts,
113 : sep_stages_bis,
114 : n_threads,
115 : synchro_buffer_sizes,
116 : synchro_active_waiting,
117 : thread_pinning, puids/*,
118 : tasks_inplace*/);
119 28 : }
120 :
// Convenience constructor without a 'lasts' list (tuple-based stages, per-stage 'puids').
// Delegates to the overload that also takes 'lasts', passing an empty list ('{}') for it; the body is empty.
121 19 : Pipeline
122 : ::Pipeline(const std::vector<runtime::Task*> &firsts,
123 : const std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
124 : const std::vector<size_t> &n_threads,
125 : const std::vector<size_t> &synchro_buffer_sizes,
126 : const std::vector<bool> &synchro_active_waiting,
127 : const std::vector<bool> &thread_pinning,
128 : const std::vector<std::vector<size_t>> &puids/*,
129 19 : const std::vector<bool> &tasks_inplace*/)
130 : : Pipeline(firsts,
131 : {},
132 : sep_stages,
133 : n_threads,
134 : synchro_buffer_sizes,
135 : synchro_active_waiting,
136 : thread_pinning, puids/*,
137 19 : tasks_inplace*/)
138 : {
139 19 : }
140 :
// Convenience constructor without a 'lasts' list (pair-based stages, per-stage 'puids').
// Delegates to the overload that also takes 'lasts', passing an empty list ('{}') for it; the body is empty.
141 28 : Pipeline
142 : ::Pipeline(const std::vector<runtime::Task*> &firsts,
143 : const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
144 : const std::vector<size_t> &n_threads,
145 : const std::vector<size_t> &synchro_buffer_sizes,
146 : const std::vector<bool> &synchro_active_waiting,
147 : const std::vector<bool> &thread_pinning,
148 : const std::vector<std::vector<size_t>> &puids/*,
149 28 : const std::vector<bool> &tasks_inplace*/)
150 : : Pipeline(firsts,
151 : {},
152 : sep_stages,
153 : n_threads,
154 : synchro_buffer_sizes,
155 : synchro_active_waiting,
156 : thread_pinning, puids/*,
157 28 : tasks_inplace*/)
158 : {
159 28 : }
160 :
// Convenience constructor for a single first task and a single last task (pair-based stages, per-stage
// 'puids'). The two references are wrapped into one-element pointer lists ('{&first}', '{&last}') and the
// construction is delegated to the list-based overload; the body is empty.
161 0 : Pipeline
162 : ::Pipeline(runtime::Task &first,
163 : runtime::Task &last,
164 : const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
165 : const std::vector<size_t> &n_threads,
166 : const std::vector<size_t> &synchro_buffer_sizes,
167 : const std::vector<bool> &synchro_active_waiting,
168 : const std::vector<bool> &thread_pinning,
169 : const std::vector<std::vector<size_t>> &puids/*,
170 0 : const std::vector<bool> &tasks_inplace*/)
171 : : Pipeline({&first},
172 : {&last},
173 : sep_stages,
174 : n_threads,
175 : synchro_buffer_sizes,
176 : synchro_active_waiting,
177 : thread_pinning, puids/*,
178 0 : tasks_inplace*/)
179 : {
180 0 : }
181 :
// Convenience constructor for a single first task and no last task (pair-based stages, per-stage 'puids').
// 'first' is wrapped into a one-element pointer list and the construction is delegated to the overload
// that takes only a 'firsts' list; the body is empty.
182 28 : Pipeline
183 : ::Pipeline(runtime::Task &first,
184 : const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
185 : const std::vector<size_t> &n_threads,
186 : const std::vector<size_t> &synchro_buffer_sizes,
187 : const std::vector<bool> &synchro_active_waiting,
188 : const std::vector<bool> &thread_pinning,
189 : const std::vector<std::vector<size_t>> &puids/*,
190 28 : const std::vector<bool> &tasks_inplace*/)
191 : : Pipeline({&first},
192 : sep_stages,
193 : n_threads,
194 : synchro_buffer_sizes,
195 : synchro_active_waiting,
196 : thread_pinning, puids/*,
197 28 : tasks_inplace*/)
198 : {
199 28 : }
200 :
201 : //========================== Pipeline constructors with new version thread pinning =====================================
// Constructor taking explicit 'firsts'/'lasts' task lists, stages described as
// (firsts, lasts, exclusions) tuples, and a pinning policy given as a single string for the whole pipeline
// (instead of per-stage 'puids' lists).
// The member initializers build the reference sequence from ('firsts', 'lasts', 1) and size the per-stage
// containers from 'sep_stages.size()'; all the remaining work is done by init<runtime::Task>().
// NOTE(review): 'sep_stages.size() -1' wraps around when 'sep_stages' is empty (unsigned arithmetic).
202 10 : Pipeline
203 : ::Pipeline(const std::vector<runtime::Task*> &firsts,
204 : const std::vector<runtime::Task*> &lasts,
205 : const std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
206 : const std::vector<size_t> &n_threads,
207 : const std::vector<size_t> &synchro_buffer_sizes,
208 : const std::vector<bool> &synchro_active_waiting,
209 : const std::vector<bool> &thread_pinning,
210 : const std::string &pipeline_pinning_policy/*,
211 10 : const std::vector<bool> &tasks_inplace*/)
212 10 : : original_sequence(firsts, lasts, 1),
213 10 : stages(sep_stages.size()),
214 10 : adaptors(sep_stages.size() -1),
215 10 : saved_firsts_tasks_id(sep_stages.size()),
216 10 : saved_lasts_tasks_id(sep_stages.size()),
217 10 : bound_adaptors(false),
218 30 : auto_stop(true)
219 : {
// the 'puids' argument of init() is given as an empty list ('{}'); the policy string is passed instead
220 10 : this->init<runtime::Task>(firsts,
221 : lasts,
222 : sep_stages,
223 : n_threads,
224 : synchro_buffer_sizes,
225 : synchro_active_waiting,
226 : thread_pinning, {}, pipeline_pinning_policy/*,
227 : tasks_inplace*/);
228 10 : }
229 :
// Constructor taking explicit 'firsts'/'lasts' task lists, stages described as (firsts, lasts) pairs
// (no exclusion list), and a pinning policy given as a single string for the whole pipeline.
// Same member initialization as the tuple-based overload; the pairs are converted to tuples before init().
// NOTE(review): 'sep_stages.size() -1' wraps around when 'sep_stages' is empty (unsigned arithmetic).
230 47 : Pipeline
231 : ::Pipeline(const std::vector<runtime::Task*> &firsts,
232 : const std::vector<runtime::Task*> &lasts,
233 : const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
234 : const std::vector<size_t> &n_threads,
235 : const std::vector<size_t> &synchro_buffer_sizes,
236 : const std::vector<bool> &synchro_active_waiting,
237 : const std::vector<bool> &thread_pinning,
238 : const std::string &pipeline_pinning_policy/*,
239 47 : const std::vector<bool> &tasks_inplace*/)
240 47 : : original_sequence(firsts, lasts, 1),
241 47 : stages(sep_stages.size()),
242 47 : adaptors(sep_stages.size() -1),
243 47 : saved_firsts_tasks_id(sep_stages.size()),
244 47 : saved_lasts_tasks_id(sep_stages.size()),
245 47 : bound_adaptors(false),
246 141 : auto_stop(true)
247 : {
// widen every (firsts, lasts) pair into a (firsts, lasts, exclusions) tuple with an empty third vector,
// because init() only accepts the tuple form
248 : std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>>
249 47 : sep_stages_bis;
250 220 : for (auto& sep_stage : sep_stages)
251 173 : sep_stages_bis.push_back(std::make_tuple(sep_stage.first, sep_stage.second, std::vector<runtime::Task*>()));
252 :
// the 'puids' argument of init() is given as an empty list ('{}'); the policy string is passed instead
253 47 : this->init<runtime::Task>(firsts,
254 : lasts,
255 : sep_stages_bis,
256 : n_threads,
257 : synchro_buffer_sizes,
258 : synchro_active_waiting,
259 : thread_pinning, {}, pipeline_pinning_policy/*,
260 : tasks_inplace*/);
261 47 : }
262 :
// Convenience constructor without a 'lasts' list (tuple-based stages, pinning policy string).
// Delegates to the overload that also takes 'lasts', passing an empty list ('{}') for it; the body is empty.
263 10 : Pipeline
264 : ::Pipeline(const std::vector<runtime::Task*> &firsts,
265 : const std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
266 : const std::vector<size_t> &n_threads,
267 : const std::vector<size_t> &synchro_buffer_sizes,
268 : const std::vector<bool> &synchro_active_waiting,
269 : const std::vector<bool> &thread_pinning,
270 : const std::string &pipeline_pinning_policy/*,
271 10 : const std::vector<bool> &tasks_inplace*/)
272 : : Pipeline(firsts,
273 : {},
274 : sep_stages,
275 : n_threads,
276 : synchro_buffer_sizes,
277 : synchro_active_waiting,
278 : thread_pinning, pipeline_pinning_policy/*,
279 10 : tasks_inplace*/)
280 : {
281 10 : }
282 :
// Convenience constructor without a 'lasts' list (pair-based stages, pinning policy string).
// Delegates to the overload that also takes 'lasts', passing an empty list ('{}') for it; the body is empty.
283 36 : Pipeline
284 : ::Pipeline(const std::vector<runtime::Task*> &firsts,
285 : const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
286 : const std::vector<size_t> &n_threads,
287 : const std::vector<size_t> &synchro_buffer_sizes,
288 : const std::vector<bool> &synchro_active_waiting,
289 : const std::vector<bool> &thread_pinning,
290 : const std::string &pipeline_pinning_policy/*,
291 36 : const std::vector<bool> &tasks_inplace*/)
292 : : Pipeline(firsts,
293 : {},
294 : sep_stages,
295 : n_threads,
296 : synchro_buffer_sizes,
297 : synchro_active_waiting,
298 : thread_pinning, pipeline_pinning_policy/*,
299 36 : tasks_inplace*/)
300 : {
301 36 : }
302 :
// Convenience constructor for a single first task and a single last task (pair-based stages, pinning
// policy string). The two references are wrapped into one-element pointer lists ('{&first}', '{&last}') and
// the construction is delegated to the list-based overload; the body is empty.
303 0 : Pipeline
304 : ::Pipeline(runtime::Task &first,
305 : runtime::Task &last,
306 : const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
307 : const std::vector<size_t> &n_threads,
308 : const std::vector<size_t> &synchro_buffer_sizes,
309 : const std::vector<bool> &synchro_active_waiting,
310 : const std::vector<bool> &thread_pinning,
311 : const std::string &pipeline_pinning_policy/*,
312 0 : const std::vector<bool> &tasks_inplace*/)
313 : : Pipeline({&first},
314 : {&last},
315 : sep_stages,
316 : n_threads,
317 : synchro_buffer_sizes,
318 : synchro_active_waiting,
319 : thread_pinning, pipeline_pinning_policy/*,
320 0 : tasks_inplace*/)
321 : {
322 0 : }
323 :
// Convenience constructor for a single first task and no last task (pair-based stages, pinning policy
// string). 'first' is wrapped into a one-element pointer list and the construction is delegated to the
// overload that takes only a 'firsts' list; the body is empty.
324 36 : Pipeline
325 : ::Pipeline(runtime::Task &first,
326 : const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
327 : const std::vector<size_t> &n_threads,
328 : const std::vector<size_t> &synchro_buffer_sizes,
329 : const std::vector<bool> &synchro_active_waiting,
330 : const std::vector<bool> &thread_pinning,
331 : const std::string &pipeline_pinning_policy/*,
332 36 : const std::vector<bool> &tasks_inplace*/)
333 : : Pipeline({&first},
334 : sep_stages,
335 : n_threads,
336 : synchro_buffer_sizes,
337 : synchro_active_waiting,
338 : thread_pinning,
339 : pipeline_pinning_policy/*,
340 36 : tasks_inplace*/)
341 : {
342 36 : }
343 :
// Destructor: clears the 'memory_allocation' flag of the reference sequence, releases the out buffers of
// every stage, then unbinds the adaptors. The statements run in exactly that order.
// NOTE(review): deallocate_outbuffers() and unbind_adaptors() are defined elsewhere; whether they can throw
// from inside this destructor cannot be told from this file -- confirm.
344 198 : Pipeline::~Pipeline()
345 : {
346 : // hack: this is important to avoid deallocating the same out buffers twice
347 104 : this->original_sequence.memory_allocation = false;
348 :
349 : // deallocating stages memory
// NOTE(review): 'auto stage' takes each element by value; get_stages() calls '.get()' on the elements, so
// they look like smart pointers and this presumably copies one per iteration -- 'auto&' would avoid it.
350 477 : for (auto stage : this->stages)
351 373 : stage->deallocate_outbuffers();
352 :
353 104 : this->unbind_adaptors();
354 198 : }
355 :
// Returns the stages as a freshly built vector of raw 'Sequence*', in the same order as 'this->stages'.
// Each pointer is obtained with '.get()' on the stored element, so the returned pointers do not own the
// sequences.
356 : std::vector<Sequence*>
357 192 : Pipeline::get_stages()
358 : {
// note: this local variable shadows the 'stages' member, which is why the member is reached via 'this->'
359 192 : std::vector<Sequence*> stages;
360 924 : for (auto& stage : this->stages)
361 732 : stages.push_back(stage.get());
362 192 : return stages;
363 0 : }
364 :
// Returns a reference to the sequence of stage number 'stage_id'.
// The index is only validated by assert(): when assertions are compiled out (NDEBUG), an out-of-range
// 'stage_id' is not detected here.
365 : Sequence&
366 0 : Pipeline::operator[](const size_t stage_id)
367 : {
368 0 : assert(stage_id < this->stages.size());
369 0 : return *this->stages[stage_id];
370 : }
371 :
// Generic version of create_sequence() for the variant that takes explicit 'puids'.
// It never builds anything: every parameter is ignored and tools::unimplemented_error is thrown
// unconditionally. The explicit specializations that follow in this file (for 'const runtime::Task' and
// 'runtime::Task') are the ones that actually create a Sequence.
372 : template<class TA>
373 : runtime::Sequence*
374 : create_sequence(const std::vector<TA*>& firsts,
375 : const std::vector<TA*>& lasts,
376 : const std::vector<TA*>& exclusions,
377 : const size_t& n_threads,
378 : const bool& thread_pinning,
379 : const std::vector<size_t>& puids,
380 : const bool& tasks_inplace)
381 : {
382 : throw tools::unimplemented_error(__FILE__, __LINE__, __func__);
383 : }
384 :
// Specialization of create_sequence() for 'const runtime::Task' (explicit 'puids' variant).
// Allocates a runtime::Sequence with 'new' and returns the raw pointer; the caller is responsible for it.
// The last parameter ('tasks_inplace') is unnamed and ignored; a literal 'false' is passed as the final
// constructor argument instead.
385 : template<>
386 : runtime::Sequence*
387 0 : create_sequence<const runtime::Task>(const std::vector<const runtime::Task*>& firsts,
388 : const std::vector<const runtime::Task*>& lasts,
389 : const std::vector<const runtime::Task*>& exclusions,
390 : const size_t& n_threads,
391 : const bool& thread_pinning,
392 : const std::vector<size_t>& puids,
393 : const bool& /*tasks_inplace*/)
394 : {
395 0 : return new runtime::Sequence(firsts, lasts, exclusions, n_threads, thread_pinning, puids, false);
396 : }
397 :
// Specialization of create_sequence() for 'runtime::Task' (explicit 'puids' variant).
// Allocates a runtime::Sequence with 'new' and returns the raw pointer; the caller is responsible for it.
// All the parameters are forwarded as is, followed by a trailing literal 'false' whose meaning is defined
// by the Sequence constructor (not visible in this file).
398 : template<>
399 : Sequence*
400 308 : create_sequence<runtime::Task>(const std::vector<runtime::Task*>& firsts,
401 : const std::vector<runtime::Task*>& lasts,
402 : const std::vector<runtime::Task*>& exclusions,
403 : const size_t& n_threads,
404 : const bool& thread_pinning,
405 : const std::vector<size_t>& puids,
406 : const bool& tasks_inplace)
407 : {
408 308 : return new runtime::Sequence(firsts, lasts, exclusions, n_threads, thread_pinning, puids, tasks_inplace, false);
409 : }
410 :
411 : // Init and sequence creation for second pinning version
// Generic version of create_sequence() for the variant that takes a pinning policy string.
// It never builds anything: every parameter is ignored and tools::unimplemented_error is thrown
// unconditionally. The explicit specializations that follow in this file (for 'const runtime::Task' and
// 'runtime::Task') are the ones that actually create a Sequence.
412 : template<class TA>
413 : runtime::Sequence*
414 : create_sequence(const std::vector<TA*>& firsts,
415 : const std::vector<TA*>& lasts,
416 : const std::vector<TA*>& exclusions,
417 : const size_t& n_threads,
418 : const bool& thread_pinning,
419 : const std::string& pipeline_pinning_policy,
420 : const bool& tasks_inplace)
421 : {
422 : throw tools::unimplemented_error(__FILE__, __LINE__, __func__);
423 : }
424 :
// Specialization of create_sequence() for 'const runtime::Task' (pinning policy string variant).
// Allocates a runtime::Sequence with 'new' and returns the raw pointer; the caller is responsible for it.
// The last parameter ('tasks_inplace') is unnamed and ignored; a literal 'false' is passed as the final
// constructor argument instead.
425 : template<>
426 : runtime::Sequence*
427 0 : create_sequence<const runtime::Task>(const std::vector<const runtime::Task*>& firsts,
428 : const std::vector<const runtime::Task*>& lasts,
429 : const std::vector<const runtime::Task*>& exclusions,
430 : const size_t& n_threads,
431 : const bool& thread_pinning,
432 : const std::string& pipeline_pinning_policy,
433 : const bool& /*tasks_inplace*/)
434 : {
435 0 : return new runtime::Sequence(firsts, lasts, exclusions, n_threads, thread_pinning, pipeline_pinning_policy, false);
436 : }
437 :
// Specialization of create_sequence() for 'runtime::Task' (pinning policy string variant).
// Allocates a runtime::Sequence with 'new' and returns the raw pointer; the caller is responsible for it.
// All the parameters are forwarded as is, followed by a trailing literal 'false' whose meaning is defined
// by the Sequence constructor (not visible in this file).
438 : template<>
439 : Sequence*
440 65 : create_sequence<runtime::Task>(const std::vector<runtime::Task*>& firsts,
441 : const std::vector<runtime::Task*>& lasts,
442 : const std::vector<runtime::Task*>& exclusions,
443 : const size_t& n_threads,
444 : const bool& thread_pinning,
445 : const std::string& pipeline_pinning_policy,
446 : const bool& tasks_inplace)
447 : {
448 : return new runtime::Sequence(
449 65 : firsts, lasts, exclusions, n_threads, thread_pinning, pipeline_pinning_policy, tasks_inplace, false);
450 : }
451 :
// Common construction routine called by the non-delegating Pipeline constructors.
//
// Steps, in order:
//   1. check that each per-stage option vector is either empty or has the size expected from 'sep_stages';
//   2. if 'pipeline_pinning_policy' is not empty, split it into one policy string per stage;
//   3. build one Sequence per stage with create_sequence<TA>() and flag it as part of a pipeline;
//   4. compare the tasks of the stages with the tasks of 'original_sequence';
//   5. create and bind the adaptors, allocate the out buffers of the stages, create and init the thread pool.
//
// Parameters:
//   firsts, lasts            unused here (their names are commented out).
//   sep_stages               one (firsts, lasts, exclusions) tuple per stage.
//   n_threads                per-stage number of threads; when empty, 1 is used for every stage.
//   synchro_buffer_sizes     forwarded to create_adaptors(); empty or 'sep_stages.size() - 1' elements.
//   synchro_active_waiting   forwarded to create_adaptors(); empty or 'sep_stages.size() - 1' elements.
//   thread_pinning           per-stage flag; when empty, false is used for every stage.
//   puids                    per-stage lists; when empty, an empty list is used for every stage. Only
//                            forwarded to create_sequence() when 'pipeline_pinning_policy' is empty.
//   pipeline_pinning_policy  policy string for the whole pipeline; when not empty, the per-stage strings
//                            extracted from it are forwarded to create_sequence() instead of 'puids'.
//
// Throws tools::invalid_argument on a size mismatch or when the task comparison of step 4 fails, and
// tools::control_flow_error (re-thrown with the stage index in the message) when a stage cannot be built.
//
// NOTE(review): std::stringstream and std::find are used below, but neither <sstream> nor <algorithm> is
// among the standard headers included at the top of this file -- presumably they come in transitively.
452 : template <class TA>
453 104 : void Pipeline
454 : ::init(const std::vector<TA*> &/*firsts*/,
455 : const std::vector<TA*> &/*lasts*/,
456 : const std::vector<std::tuple<std::vector<TA*>,std::vector<TA*>,std::vector<TA*>>> &sep_stages,
457 : const std::vector<size_t> &n_threads,
458 : const std::vector<size_t> &synchro_buffer_sizes,
459 : const std::vector<bool> &synchro_active_waiting,
460 : const std::vector<bool> &thread_pinning,
461 : const std::vector<std::vector<size_t>> &puids,
462 : const std::string &pipeline_pinning_policy/*,
463 : const std::vector<bool> &tasks_inplace*/)
464 : {
// ---- step 1: size checks (an empty option vector always passes and means "use the default")
465 104 : if (sep_stages.size() != n_threads.size() && n_threads.size() != 0)
466 : {
467 0 : std::stringstream message;
468 0 : message << "'n_threads.size()' has to be equal to 'sep_stages.size()' or equal to '0' ('n_threads.size()' = "
469 0 : << n_threads.size() << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
470 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
471 0 : }
472 :
// the two 'synchro_*' vectors are compared against 'size() + 1': they hold one entry per pair of
// consecutive stages, not one entry per stage
473 104 : if (sep_stages.size() != synchro_buffer_sizes.size() + 1 && synchro_buffer_sizes.size() != 0)
474 : {
475 0 : std::stringstream message;
476 : message << "'synchro_buffer_sizes.size()' has to be equal to 'sep_stages.size() -1' or equal to '0' "
477 0 : << "('synchro_buffer_sizes.size()' = " << synchro_buffer_sizes.size()
478 0 : << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
479 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
480 0 : }
481 :
482 104 : if (sep_stages.size() != synchro_active_waiting.size() + 1 && synchro_active_waiting.size() != 0)
483 : {
484 0 : std::stringstream message;
485 : message << "'synchro_active_waiting.size()' has to be equal to 'sep_stages.size() -1' or equal to '0' "
486 0 : << "('synchro_active_waiting.size()' = " << synchro_active_waiting.size()
487 0 : << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
488 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
489 0 : }
490 :
491 104 : if (sep_stages.size() != thread_pinning.size() && thread_pinning.size() != 0)
492 : {
493 0 : std::stringstream message;
494 : message << "'thread_pinning.size()' has to be equal to 'sep_stages.size()' or equal to '0' ("
495 0 : << "'thread_pinning.size()' = " << thread_pinning.size()
496 0 : << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
497 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
498 0 : }
499 :
500 104 : if (sep_stages.size() != puids.size() && puids.size() != 0)
501 : {
502 0 : std::stringstream message;
503 0 : message << "'puids.size()' has to be equal to 'sep_stages.size()' or equal to '0' ('puids.size()' = "
504 0 : << puids.size() << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
505 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
506 0 : }
507 :
508 : // if (sep_stages.size() != tasks_inplace.size() && tasks_inplace.size() != 0)
509 : // {
510 : // std::stringstream message;
511 : // message << "'tasks_inplace.size()' has to be equal to 'sep_stages.size()' or equal to '0' ('"
512 : // << "tasks_inplace.size()' = " << tasks_inplace.size() << " , 'sep_stages.size()' = "
513 : // << sep_stages.size() << ").";
514 : // throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
515 : // }
516 :
// ---- step 2: per-stage pinning policies (the vector stays empty when no policy string was given)
517 : // Creating a vector of pinning policies for each sequence
518 104 : std::vector<std::string> sequences_pinning_policies;
519 104 : if (!pipeline_pinning_policy.empty())
520 23 : sequences_pinning_policies =
521 : tools::Thread_pinning_utils::pipeline_parser_unpacker(pipeline_pinning_policy, sep_stages.size());
522 :
// ---- step 3: one Sequence per stage
523 477 : for (size_t s = 0; s < sep_stages.size(); s++)
524 : {
525 373 : const std::vector<TA*>& stage_firsts = std::get<0>(sep_stages[s]);
526 373 : const std::vector<TA*>& stage_lasts = std::get<1>(sep_stages[s]);
527 373 : const std::vector<TA*>& stage_exclusions = std::get<2>(sep_stages[s]);
// per-stage options fall back to a default value when the corresponding vector is empty
528 373 : const size_t stage_n_threads = n_threads.size() ? n_threads[s] : 1;
529 373 : const bool stage_thread_pinning = thread_pinning.size() ? thread_pinning[s] : false;
530 373 : const std::vector<size_t> stage_puids = puids.size() ? puids[s] : std::vector<size_t>();
531 438 : const std::string sequence_pinning_policy =
532 438 : sequences_pinning_policies.size() ? sequences_pinning_policies[s] : "";
// 'tasks_inplace' is currently not configurable: always true
533 373 : const bool stage_tasks_inplace = /*tasks_inplace.size() ? tasks_inplace[s] :*/ true;
534 : try
535 : {
// the overload of create_sequence() is selected by the type of the 6th argument:
// list of 'puids' when no policy string was given, per-stage policy string otherwise
536 373 : if (pipeline_pinning_policy.empty())
537 308 : this->stages[s].reset(create_sequence<TA>(stage_firsts,
538 : stage_lasts,
539 : stage_exclusions,
540 : stage_n_threads,
541 : stage_thread_pinning,
542 : stage_puids,
543 : stage_tasks_inplace));
544 : else
545 65 : this->stages[s].reset(create_sequence<TA>(stage_firsts,
546 : stage_lasts,
547 : stage_exclusions,
548 : stage_n_threads,
549 : stage_thread_pinning,
550 : sequence_pinning_policy,
551 : stage_tasks_inplace));
552 : }
// a control flow error is re-thrown as a new control_flow_error whose message starts with the index of
// the failing stage and ends with the original 'what()' text
553 0 : catch (const tools::control_flow_error& e)
554 : {
555 0 : std::stringstream message;
556 0 : message << "Invalid control flow error on stage " << s
557 0 : << " (perhaps a switcher's tasks were separated between different stages)." << std::endl
558 0 : << e.what();
559 0 : throw tools::control_flow_error(__FILE__, __LINE__, __func__, message.str());
560 0 : }
561 373 : this->stages[s]->is_part_of_pipeline = true;
562 : }
563 :
// ---- step 4: only entry [0] of each 'tasks per threads' container is compared (presumably the first
// thread -- confirm); the check is: same number of tasks, and every pipeline task found in the reference
564 : // verify that the sequential sequence is equivalent to the pipeline sequence
565 104 : auto ref_tasks = this->original_sequence.get_tasks_per_threads()[0];
566 104 : auto cur_tasks = this->get_tasks_per_threads()[0];
567 :
568 104 : if (ref_tasks.size() != cur_tasks.size())
569 : {
// both graphs are dumped to .dot files (relative paths) before throwing
570 0 : std::ofstream f1("dbg_ref_sequence.dot");
571 0 : this->original_sequence.export_dot(f1);
572 0 : std::ofstream f2("dbg_cur_pipeline.dot");
573 0 : this->export_dot(f2);
574 :
575 0 : std::stringstream message;
576 0 : message << "'ref_tasks.size()' has to be equal to 'cur_tasks.size()' ('ref_tasks.size()' = " << ref_tasks.size()
577 0 : << ", 'cur_tasks.size()' = " << cur_tasks.size() << ").";
578 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
579 0 : }
580 :
581 1082 : for (size_t ta = 0; ta < cur_tasks.size(); ta++)
582 : {
583 978 : if (std::find(ref_tasks.begin(), ref_tasks.end(), cur_tasks[ta]) == ref_tasks.end())
584 : {
585 0 : std::ofstream f1("dbg_ref_sequence.dot");
586 0 : this->original_sequence.export_dot(f1);
587 0 : std::ofstream f2("dbg_cur_pipeline.dot");
588 0 : this->export_dot(f2);
589 :
590 0 : std::stringstream message;
591 0 : message << "'cur_tasks[ta]' is not contained in the 'ref_tasks' vector ('ta' = " << ta
592 0 : << ", 'cur_tasks[ta]' = " << +cur_tasks[ta]
593 0 : << ", 'cur_tasks[ta]->get_name()' = " << cur_tasks[ta]->get_name() << ").";
594 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
595 0 : }
596 : }
597 :
// ---- step 5: adaptors, out buffers, thread pool
598 : // Adding adaptors to pipeline stages
599 104 : this->create_adaptors(synchro_buffer_sizes, synchro_active_waiting);
600 104 : this->bind_adaptors();
601 :
602 : // Allocating memory for stages
603 477 : for (auto stage : this->stages)
604 : {
605 373 : stage->allocate_outbuffers();
606 : }
607 :
// NOTE(review): 'this->stages.size() - 1' is unsigned arithmetic and wraps around if there is no stage at
// all -- presumably never the case in practice; confirm.
608 104 : this->thread_pool.reset(new tools::Thread_pool_standard(this->stages.size() - 1));
609 104 : this->thread_pool->init(); // threads are spawned here
610 104 : }
611 :
612 : void
613 104 : Pipeline::create_adaptors(const std::vector<size_t>& synchro_buffer_sizes,
614 : const std::vector<bool>& synchro_active_waiting)
615 : {
616 : // sck out addr occ stage tsk id sck id
617 104 : std::vector<std::tuple<runtime::Socket*, size_t, size_t, size_t, size_t>> out_sck_orphans;
618 :
619 : // for all the stages in the pipeline
620 373 : for (size_t sta = 0; sta < this->stages.size() - 1; sta++)
621 : {
622 : // ------------------------------------------------------------------------------------------------------------
623 : // --------------------------------------------------------------- collect orphan output sockets in stage 'sta'
624 : // ------------------------------------------------------------------------------------------------------------
625 269 : std::vector<std::vector<runtime::Task*>> tasks_per_threads = this->stages[sta]->get_tasks_per_threads();
626 : // for all the threads in the current stage
627 : // for (size_t t = 0; t < tasks_per_threads.size(); t++)
628 269 : size_t t = 0;
629 : {
630 : // for all the tasks in the stage
631 1015 : for (size_t tsk_id = 0; tsk_id < tasks_per_threads[t].size(); tsk_id++)
632 : {
633 746 : auto tsk = tasks_per_threads[t][tsk_id];
634 : // for all the sockets of the tasks
635 2666 : for (size_t sck_id = 0; sck_id < tsk->sockets.size(); sck_id++)
636 : {
637 1920 : auto sck = tsk->sockets[sck_id];
638 : // if the current socket is an output or forward socket type
639 1920 : if (sck->get_type() == socket_t::SOUT || sck->get_type() == socket_t::SFWD)
640 : {
641 : // for all the bounded sockets to the current socket
642 2438 : for (auto bsck : sck->get_bound_sockets())
643 : {
644 : // check if the task of the bounded socket is not in the current stage
645 916 : if (std::find(tasks_per_threads[t].begin(),
646 916 : tasks_per_threads[t].end(),
647 1832 : &bsck->get_task()) == tasks_per_threads[t].end())
648 : {
649 : // check the position of the socket in the orphans
650 426 : size_t pos = 0;
651 1149 : for (; pos < out_sck_orphans.size(); pos++)
652 773 : if (std::get<0>(out_sck_orphans[pos]) == sck.get()) break;
653 :
654 426 : if (pos == out_sck_orphans.size())
655 376 : out_sck_orphans.push_back(std::make_tuple(sck.get(), 1, sta, tsk_id, sck_id));
656 : else
657 50 : std::get<1>(out_sck_orphans[pos])++;
658 : }
659 : }
660 : }
661 1920 : }
662 : }
663 : }
664 :
665 : // ------------------------------------------------------------------------------------------------------------
666 : // -------------------------------------- collect orphan input sockets in stage 'sta +1' and create connections
667 : // ------------------------------------------------------------------------------------------------------------
668 269 : tasks_per_threads = this->stages[sta + 1]->get_tasks_per_threads();
669 : // for all the threads in the current stage
670 : // for (size_t t = 0; t < tasks_per_threads.size(); t++)
671 : {
672 : // for all the tasks in the stage
673 1068 : for (size_t tsk_id = 0; tsk_id < tasks_per_threads[t].size(); tsk_id++)
674 : {
675 799 : auto tsk = tasks_per_threads[t][tsk_id];
676 : // ----------------------------------------- manage socket to socket bindings (with fake input sockets)
677 : // for all the sockets of the tasks
678 2836 : for (size_t sck_id = 0; sck_id < tsk->sockets.size(); sck_id++)
679 : {
680 2037 : auto sck = tsk->sockets[sck_id];
681 : // if the current socket is an input or forward socket type
682 2037 : if (sck->get_type() == socket_t::SIN || sck->get_type() == socket_t::SFWD)
683 : {
684 820 : runtime::Socket* bsck = nullptr;
685 : try
686 : {
687 : // get output bounded socket
688 820 : bsck = &sck->get_bound_socket(); // can throw if there is no bounded socket
689 : }
690 0 : catch (const std::exception&)
691 : {
692 0 : }
693 820 : if (bsck != nullptr)
694 : {
695 : // check if the task of the bounded socket is not in the current stage
696 820 : if (std::find(tasks_per_threads[t].begin(),
697 820 : tasks_per_threads[t].end(),
698 1640 : &bsck->get_task()) == tasks_per_threads[t].end())
699 : {
700 : // check the position of the bounded socket in the orphans
701 397 : size_t pos = 0;
702 1017 : for (; pos < out_sck_orphans.size(); pos++)
703 1017 : if (std::get<0>(out_sck_orphans[pos]) == bsck) break;
704 :
705 397 : if (pos < out_sck_orphans.size())
706 : {
707 397 : auto sck_out = std::get<0>(out_sck_orphans[pos]);
708 397 : auto sck_in = sck.get();
709 794 : auto unbind_sout_pos = std::distance(sck_out->get_bound_sockets().begin(),
710 397 : std::find(sck_out->get_bound_sockets().begin(),
711 397 : sck_out->get_bound_sockets().end(),
712 397 : sck_in));
713 397 : this->sck_orphan_binds.push_back(
714 397 : std::make_pair(std::make_tuple(std::get<0>(out_sck_orphans[pos]),
715 397 : std::get<2>(out_sck_orphans[pos]),
716 397 : std::get<3>(out_sck_orphans[pos]),
717 397 : std::get<4>(out_sck_orphans[pos]),
718 : unbind_sout_pos),
719 794 : std::make_tuple(sck_in, sta + 1, tsk_id, sck_id, nullptr)));
720 : }
721 : }
722 : }
723 : }
724 2037 : }
725 : // ------------------------------------------- manage socket to task bindings (with fake input sockets)
726 944 : for (size_t sck_id = 0; sck_id < tsk->fake_input_sockets.size(); sck_id++)
727 : {
728 145 : auto sck = tsk->fake_input_sockets[sck_id];
729 145 : runtime::Socket* bsck = nullptr;
730 : try
731 : {
732 : // get output bounded socket
733 145 : bsck = &sck->get_bound_socket(); // can throw if there is no bounded socket
734 : }
735 0 : catch (const std::exception&)
736 : {
737 0 : }
738 145 : if (bsck != nullptr)
739 : {
740 : // check if the task of the bounded socket is not in the current stage
741 145 : if (std::find(tasks_per_threads[t].begin(), tasks_per_threads[t].end(), &bsck->get_task()) ==
742 290 : tasks_per_threads[t].end())
743 : {
744 : // check the position of the bounded socket in the orphans
745 29 : size_t pos = 0;
746 132 : for (; pos < out_sck_orphans.size(); pos++)
747 132 : if (std::get<0>(out_sck_orphans[pos]) == bsck) break;
748 :
749 29 : if (pos < out_sck_orphans.size())
750 : {
751 29 : auto sck_out = std::get<0>(out_sck_orphans[pos]);
752 29 : auto sck_in = sck.get();
753 58 : auto unbind_sout_pos = std::distance(sck_out->get_bound_sockets().begin(),
754 29 : std::find(sck_out->get_bound_sockets().begin(),
755 29 : sck_out->get_bound_sockets().end(),
756 29 : sck_in));
757 29 : this->sck_orphan_binds.push_back(
758 29 : std::make_pair(std::make_tuple(std::get<0>(out_sck_orphans[pos]),
759 29 : std::get<2>(out_sck_orphans[pos]),
760 29 : std::get<3>(out_sck_orphans[pos]),
761 29 : std::get<4>(out_sck_orphans[pos]),
762 : unbind_sout_pos),
763 58 : std::make_tuple(nullptr, sta + 1, tsk_id, sck_id, tsk)));
764 : }
765 : }
766 : }
767 145 : }
768 : // ----------------------------------------------------------------------------------------------------
769 : }
770 : }
771 269 : }
772 :
773 : // ----------------------------------------------------------------------------------------------------------------
774 : // ----------------------------------------------------------------------------------------------- prints for debug
775 : // ----------------------------------------------------------------------------------------------------------------
776 : // std::cout << "Orphan output sockets list:" << std::endl;
777 : // for (auto &sck : out_sck_orphans)
778 : // {
779 : // auto sck_out_name = std::get<0>(sck)->get_name();
780 : // auto tsk_out_name = std::get<0>(sck)->get_task().get_name();
781 : // auto sck_out_occ = std::get<1>(sck);
782 : // auto tsk_out_sta = std::get<2>(sck);
783 : // auto tsk_out_id = std::get<3>(sck);
784 : // auto sck_out_id = std::get<4>(sck);
785 :
786 : // std::cout << " " << tsk_out_name << "[" << sck_out_name << "] (stage " << tsk_out_sta << ", " << sck_out_occ
787 : // << " occurrences, tsk id = " << tsk_out_id << ", sck id = " << sck_out_id << ")" << std::endl;
788 : // }
789 :
790 : // std::cout << std::endl << "Detected socket binds:" << std::endl;
791 : // for (auto &bind : this->sck_orphan_binds)
792 : // {
793 : // auto sck_out_name = std::get<0>(bind.first)->get_name();
794 : // auto tsk_out_name = std::get<0>(bind.first)->get_task().get_name();
795 : // auto tsk_out_sta = std::get<1>(bind.first);
796 : // auto tsk_out_id = std::get<2>(bind.first);
797 : // auto sck_out_id = std::get<3>(bind.first);
798 : // auto sck_out_ubp = std::get<4>(bind.first);
799 :
800 : // auto sck_in_name = std::get<0>(bind.second)->get_name();
801 : // auto tsk_in_name = std::get<0>(bind.second)->get_task().get_name();
802 : // auto tsk_in_sta = std::get<1>(bind.second);
803 : // auto tsk_in_id = std::get<2>(bind.second);
804 : // auto sck_in_id = std::get<3>(bind.second);
805 :
806 : // std::cout << " " << tsk_out_name << "[" << sck_out_name << "] (stage " << tsk_out_sta << ", tsk id = "
807 : // << tsk_out_id << ", sck id = " << sck_out_id << ", ubp = " << sck_out_ubp << ")" << " -> "
808 : // << tsk_in_name << "[" << sck_in_name << "] (stage " << tsk_in_sta << ", tsk id = "
809 : // << tsk_in_id << ", sck id = " << sck_in_id << ")" << std::endl;
810 : // }
811 :
812 : // ----------------------------------------------------------------------------------------------------------------
813 : // ------------------------------------------------------------------------------------------------ create adaptors
814 : // ----------------------------------------------------------------------------------------------------------------
815 104 : auto sck_orphan_binds_cpy = this->sck_orphan_binds;
816 104 : module::Adaptor_m_to_n* adp = nullptr;
817 104 : std::map<runtime::Socket*, size_t> sck_to_adp_sck_id;
818 477 : for (size_t sta = 0; sta < this->stages.size(); sta++)
819 : {
820 373 : const auto n_threads = this->stages[sta]->get_n_threads();
821 373 : std::vector<std::vector<runtime::Task*>> tasks_per_threads = this->stages[sta]->get_tasks_per_threads();
822 :
823 : // ------------------------------------------------------------------------------------------------------------
824 : // ----------------------------------------------------------------------------------------------- pull adaptor
825 : // ------------------------------------------------------------------------------------------------------------
826 373 : if (sta > 0)
827 : {
828 269 : assert(adp != nullptr);
829 : // sck out addr stage tsk id sck id unbind_pos
830 : std::vector<std::pair<std::tuple<runtime::Socket*, size_t, size_t, size_t, size_t>,
831 : // sck in addr stage tsk id sck id tsk in addr
832 : std::tuple<runtime::Socket*, size_t, size_t, size_t, runtime::Task*>>>
833 269 : sck_orphan_binds_new;
834 :
835 3361 : for (size_t t = 0; t < n_threads; t++)
836 : {
837 3092 : module::Adaptor_m_to_n* cur_adp = (t == 0) ? adp : adp->clone();
838 3092 : if (t > 0) cur_adp->add_puller();
839 :
840 9276 : for (auto& t : cur_adp->tasks)
841 6184 : t->set_fast(true);
842 3092 : if (t > 0)
843 : {
844 2823 : this->adaptors[sta - 1].second.push_back(std::unique_ptr<module::Adaptor_m_to_n>(cur_adp));
845 2823 : cur_adp->set_custom_name("Adp_m_to_n_" + std::to_string(sta - 1));
846 : }
847 :
848 3092 : auto task_pull = &(*cur_adp)("pull");
849 :
850 3092 : sck_orphan_binds_new.clear();
851 17727 : for (auto& bind : sck_orphan_binds_cpy)
852 : {
853 14635 : auto tsk_out_sta = std::get<1>(bind.first);
854 14635 : if (tsk_out_sta < sta)
855 : {
856 8094 : auto tsk_in_sta = std::get<1>(bind.second);
857 8094 : if (tsk_in_sta == sta)
858 : {
859 4348 : auto sck_out_ptr = std::get<0>(bind.first);
860 4348 : auto priority = std::get<4>(bind.first);
861 4348 : auto tsk_in_id = std::get<2>(bind.second);
862 4348 : auto sck_in_id = std::get<3>(bind.second);
863 4348 : runtime::Socket* sck_in = nullptr;
864 4348 : runtime::Task* tsk_in = nullptr;
865 4348 : if (std::get<0>(bind.second) != nullptr) // if socket to socket binding
866 3656 : sck_in = tasks_per_threads[t][tsk_in_id]->sockets[sck_in_id].get();
867 : else // if socket to task binding
868 692 : tsk_in = tasks_per_threads[t][tsk_in_id];
869 4348 : this->adaptors_binds.push_back(std::make_tuple(
870 8696 : task_pull->sockets[sck_to_adp_sck_id[sck_out_ptr]].get(), sck_in, priority, tsk_in));
871 : }
872 : else
873 3746 : sck_orphan_binds_new.push_back(bind);
874 : }
875 : else
876 6541 : sck_orphan_binds_new.push_back(bind);
877 : }
878 :
879 3092 : if (t > 0) this->stages[sta]->all_modules[t].push_back(cur_adp);
880 : }
881 269 : this->saved_firsts_tasks_id[sta] = this->stages[sta]->firsts_tasks_id;
882 269 : sck_orphan_binds_cpy = sck_orphan_binds_new;
883 :
884 269 : adp->alloc_buffers();
885 269 : }
886 :
887 : // ------------------------------------------------------------------------------------------------------------
888 : // ----------------------------------------------------------------------------------------------- push adaptor
889 : // ------------------------------------------------------------------------------------------------------------
890 373 : std::map<runtime::Socket*, size_t> sck_to_adp_sck_id_new;
891 373 : if (sta < this->stages.size() - 1)
892 : {
893 269 : std::vector<size_t> adp_n_elmts;
894 269 : std::vector<std::type_index> adp_datatype;
895 269 : size_t adp_buffer_size = synchro_buffer_sizes.size() ? synchro_buffer_sizes[sta] : 1;
896 269 : bool adp_active_waiting = synchro_active_waiting.size() ? synchro_active_waiting[sta] : false;
897 269 : size_t adp_n_frames = 1;
898 :
899 : // a map to remember if a passed socket points already to the same memory space
900 269 : std::map<void*, size_t> fwd_source;
901 :
902 269 : std::vector<runtime::Socket*> passed_scks_out;
903 1186 : for (auto& bind : sck_orphan_binds_cpy)
904 : {
905 917 : auto tsk_out_sta = std::get<1>(bind.first);
906 917 : if (tsk_out_sta <= sta)
907 : {
908 585 : auto sck_out = std::get<0>(bind.first);
909 585 : if (std::find(passed_scks_out.begin(), passed_scks_out.end(), sck_out) == passed_scks_out.end())
910 : {
911 : // avoid the creation of new adaptor sockets for forward sockets pointing to the same memory
912 : // space
913 525 : auto sck_out_dptr = (void*)sck_out->get_dataptr<int8_t>();
914 525 : assert(sck_out_dptr != nullptr);
915 525 : if (fwd_source.find(sck_out_dptr) == fwd_source.end())
916 : {
917 458 : fwd_source[sck_out_dptr] = 1;
918 458 : adp_n_frames = sck_out->get_task().get_module().get_n_frames();
919 458 : adp_n_elmts.push_back(sck_out->get_n_elmts() / adp_n_frames);
920 458 : adp_datatype.push_back(sck_out->get_datatype());
921 : }
922 525 : passed_scks_out.push_back(sck_out);
923 : }
924 : }
925 : }
926 269 : passed_scks_out.clear();
927 :
928 : // allocate the adaptor for the first thread
929 269 : adp = new module::Adaptor_m_to_n(adp_n_elmts, adp_datatype, adp_buffer_size, adp_active_waiting);
930 269 : adp->set_n_frames(adp_n_frames);
931 :
932 3361 : for (size_t t = 0; t < n_threads; t++)
933 : {
934 3092 : module::Adaptor_m_to_n* cur_adp = (t == 0) ? adp : adp->clone();
935 3092 : cur_adp->set_custom_name("Adp_m_to_n_" + std::to_string(sta));
936 3092 : if (t > 0) cur_adp->add_pusher();
937 3092 : this->adaptors[sta].first.push_back(std::unique_ptr<module::Adaptor_m_to_n>(cur_adp));
938 3092 : auto task_push = &(*cur_adp)("push");
939 :
940 3092 : std::map<void*, size_t> fwd_source;
941 3092 : sck_to_adp_sck_id_new.clear();
942 3092 : size_t adp_sck_id = 0;
943 13805 : for (auto& bind : sck_orphan_binds_cpy) // bind standard task to last adaptor
944 : {
945 10713 : auto tsk_out_sta = std::get<1>(bind.first);
946 :
947 10713 : if (tsk_out_sta <= sta)
948 : {
949 9161 : auto sck_out_ptr = std::get<0>(bind.first);
950 9161 : auto sck_out_dptr = (void*)sck_out_ptr->get_dataptr<int8_t>();
951 9161 : assert(sck_out_dptr != nullptr);
952 :
953 9161 : if (std::find(passed_scks_out.begin(),
954 : passed_scks_out.end(),
955 26581 : sck_out_ptr) == passed_scks_out.end() &&
956 17420 : fwd_source.find(sck_out_dptr) == fwd_source.end()) // <= the latest condition is here to
957 : // avoid to bind adaptor sockets two
958 : // times the same memory space
959 : // (usefull in the case of multiple
960 : // fwd sockets pointing to the same
961 : // memory address)
962 : {
963 6769 : if (tsk_out_sta == sta)
964 : {
965 4434 : auto tsk_out_id = std::get<2>(bind.first);
966 4434 : auto sck_out_id = std::get<3>(bind.first);
967 4434 : auto sck_out = tasks_per_threads[t][tsk_out_id]->sockets[sck_out_id];
968 4434 : auto priority = std::get<4>(bind.first);
969 4434 : sck_to_adp_sck_id_new[sck_out_ptr] = adp_sck_id;
970 4434 : this->adaptors_binds.push_back(
971 4434 : std::make_tuple(sck_out.get(),
972 4434 : task_push->sockets[adp_sck_id++].get(),
973 : priority,
974 4434 : nullptr)); // <= only socket to socket binding is possible here
975 4434 : }
976 : else // if (tsk_out_sta < sta) // bind prev. adaptor to last adaptor
977 : {
978 2335 : auto tsk_out_id = 1;
979 2335 : auto sck_out_id = sck_to_adp_sck_id[sck_out_ptr];
980 2335 : sck_to_adp_sck_id_new[sck_out_ptr] = adp_sck_id;
981 : auto adp_prev =
982 2335 : t == 0 ? this->adaptors[sta - 1].first[0] : this->adaptors[sta - 1].second[t - 1];
983 2335 : auto sck_out = (*adp_prev)[tsk_out_id].sockets[sck_out_id];
984 2335 : auto priority = std::get<4>(bind.first);
985 2335 : this->adaptors_binds.push_back(
986 2335 : std::make_tuple(sck_out.get(),
987 2335 : task_push->sockets[adp_sck_id++].get(),
988 : priority,
989 2335 : nullptr)); // <= only socket to socket binding is possible here
990 2335 : }
991 :
992 6769 : fwd_source[sck_out_dptr] = 1; // remember that this memory space has been connected to the
993 : // adaptor once
994 6769 : passed_scks_out.push_back(sck_out_ptr);
995 : }
996 : }
997 : }
998 3092 : passed_scks_out.clear();
999 3092 : }
1000 269 : this->saved_lasts_tasks_id[sta] = this->stages[sta]->lasts_tasks_id;
1001 269 : }
1002 373 : sck_to_adp_sck_id = sck_to_adp_sck_id_new;
1003 373 : }
1004 104 : }
1005 :
1006 : void
1007 310 : Pipeline::bind_adaptors()
1008 : {
1009 310 : this->_bind_adaptors(true);
1010 310 : }
1011 :
1012 : void
1013 310 : Pipeline::_bind_adaptors(const bool bind_adaptors)
1014 : {
1015 310 : if (!this->bound_adaptors)
1016 : {
1017 1421 : for (size_t sta = 0; sta < this->stages.size(); sta++)
1018 : {
1019 1111 : const auto n_threads = this->stages[sta]->get_n_threads();
1020 :
1021 : // --------------------------------------------------------------------------------------------------------
1022 : // ------------------------------------------------------------------------------------------- pull adaptor
1023 : // --------------------------------------------------------------------------------------------------------
1024 1111 : if (sta > 0)
1025 : {
1026 10071 : for (size_t t = 0; t < n_threads; t++)
1027 : {
1028 : module::Adaptor_m_to_n* cur_adp =
1029 9270 : t > 0 ? adaptors[sta - 1].second[t - 1].get() : adaptors[sta - 1].first[0].get();
1030 :
1031 9270 : if (t > 0 || sta == this->stages.size() - 1) // add the adaptor to the current stage
1032 8767 : this->stages[sta]->all_modules[t].push_back(cur_adp);
1033 :
1034 9270 : auto task_pull = &(*cur_adp)("pull");
1035 :
1036 9270 : auto ss = this->stages[sta]->sequences[t]->get_contents();
1037 9270 : assert(ss != nullptr);
1038 9270 : ss->tasks.insert(ss->tasks.begin(), task_pull);
1039 9270 : ss->processes.insert(ss->processes.begin(),
1040 0 : [task_pull]() -> const int*
1041 : {
1042 0 : task_pull->exec();
1043 0 : const int* status = task_pull->sockets.back()->get_dataptr<const int>();
1044 0 : return status;
1045 : });
1046 9270 : this->stages[sta]->update_tasks_id(t);
1047 : }
1048 801 : this->stages[sta]->firsts_tasks_id.clear();
1049 801 : this->stages[sta]->firsts_tasks_id.push_back(0);
1050 801 : this->stages[sta]->n_tasks++;
1051 : }
1052 :
1053 : // --------------------------------------------------------------------------------------------------------
1054 : // ------------------------------------------------------------------------------------------- push adaptor
1055 : // --------------------------------------------------------------------------------------------------------
1056 1111 : if (sta < this->stages.size() - 1)
1057 : {
1058 801 : size_t last_task_id = 0;
1059 10071 : for (size_t t = 0; t < n_threads; t++)
1060 : {
1061 9270 : module::Adaptor_m_to_n* cur_adp = adaptors[sta].first[t].get();
1062 :
1063 : // add the adaptor to the current stage
1064 9270 : this->stages[sta]->all_modules[t].push_back(cur_adp);
1065 :
1066 9270 : auto task_push = &(*cur_adp)("push");
1067 :
1068 9270 : auto ss = this->stages[sta]->get_last_subsequence(t);
1069 9270 : assert(ss != nullptr);
1070 9270 : ss->tasks.push_back(task_push);
1071 9270 : ss->processes.push_back(
1072 0 : [task_push]() -> const int*
1073 : {
1074 0 : task_push->exec();
1075 0 : const int* status = task_push->sockets.back()->get_dataptr<const int>();
1076 0 : return status;
1077 : });
1078 9270 : last_task_id = ss->tasks_id[ss->tasks_id.size() - 1] + 1;
1079 9270 : ss->tasks_id.push_back(last_task_id);
1080 : }
1081 801 : this->stages[sta]->lasts_tasks_id.clear();
1082 801 : this->stages[sta]->lasts_tasks_id.push_back(last_task_id);
1083 801 : this->stages[sta]->n_tasks++;
1084 : }
1085 1111 : this->stages[sta]->update_firsts_and_lasts_tasks();
1086 : }
1087 :
1088 : // ------------------------------------------------------------------------------------------------------------
1089 : // ---------------------------------------------------------------------------------------------- bind adaptors
1090 : // ------------------------------------------------------------------------------------------------------------
1091 1582 : for (auto& bind : this->sck_orphan_binds)
1092 : {
1093 1272 : auto sck_out = std::get<0>(bind.first);
1094 1272 : auto sck_in = std::get<0>(bind.second);
1095 1272 : if (sck_in != nullptr) // if socket to socket unbinding
1096 1185 : sck_in->unbind(*sck_out);
1097 : else // if socket to task unbinding
1098 : {
1099 87 : auto tsk_in = std::get<4>(bind.second);
1100 87 : assert(tsk_in != nullptr);
1101 87 : tsk_in->unbind(*sck_out);
1102 : }
1103 : }
1104 :
1105 310 : if (bind_adaptors)
1106 : {
1107 33649 : for (auto& bind : this->adaptors_binds)
1108 : {
1109 33339 : auto sck_out = std::get<0>(bind);
1110 33339 : auto sck_in = std::get<1>(bind);
1111 33339 : auto priority = std::get<2>(bind);
1112 33339 : if (sck_in != nullptr) // if socket to socket binding
1113 31263 : sck_in->_bind(*sck_out, priority);
1114 : else // if socket to task binding
1115 : {
1116 2076 : auto tsk_in = std::get<3>(bind);
1117 2076 : assert(tsk_in != nullptr);
1118 2076 : tsk_in->_bind(*sck_out, priority);
1119 : }
1120 : }
1121 : }
1122 :
1123 310 : this->bound_adaptors = true;
1124 : }
1125 310 : }
1126 :
1127 : void
1128 207 : Pipeline::unbind_adaptors()
1129 : {
1130 207 : this->_unbind_adaptors(true);
1131 207 : }
1132 :
1133 : void
1134 413 : Pipeline::_unbind_adaptors(const bool bind_orphans)
1135 : {
1136 413 : if (this->bound_adaptors)
1137 : {
1138 1421 : for (size_t sta = 0; sta < this->stages.size(); sta++)
1139 : {
1140 1111 : const auto n_threads = this->stages[sta]->get_n_threads();
1141 :
1142 : // --------------------------------------------------------------------------------------------------------
1143 : // ------------------------------------------------------------------------------------------- pull adaptor
1144 : // --------------------------------------------------------------------------------------------------------
1145 1111 : if (sta > 0)
1146 : {
1147 10071 : for (size_t t = 0; t < n_threads; t++)
1148 : {
1149 9270 : if (t > 0 || sta == this->stages.size() - 1) // rm the adaptor to the current stage
1150 8767 : this->stages[sta]->all_modules[t].pop_back();
1151 :
1152 9270 : auto ss = this->stages[sta]->sequences[t]->get_contents();
1153 9270 : assert(ss != nullptr);
1154 9270 : ss->tasks.erase(ss->tasks.begin());
1155 9270 : ss->processes.erase(ss->processes.begin());
1156 9270 : this->stages[sta]->update_tasks_id(t);
1157 : }
1158 801 : this->stages[sta]->firsts_tasks_id = this->saved_firsts_tasks_id[sta];
1159 801 : this->stages[sta]->n_tasks--;
1160 : }
1161 :
1162 : // --------------------------------------------------------------------------------------------------------
1163 : // ------------------------------------------------------------------------------------------- push adaptor
1164 : // --------------------------------------------------------------------------------------------------------
1165 1111 : if (sta < this->stages.size() - 1)
1166 : {
1167 10071 : for (size_t t = 0; t < n_threads; t++)
1168 : {
1169 : // rm the adaptor to the current stage
1170 9270 : this->stages[sta]->all_modules[t].pop_back();
1171 :
1172 9270 : auto ss = this->stages[sta]->get_last_subsequence(t);
1173 9270 : assert(ss != nullptr);
1174 9270 : ss->tasks.pop_back();
1175 9270 : ss->processes.pop_back();
1176 9270 : ss->tasks_id.pop_back();
1177 : }
1178 801 : this->stages[sta]->lasts_tasks_id = this->saved_lasts_tasks_id[sta];
1179 801 : this->stages[sta]->n_tasks--;
1180 : }
1181 1111 : this->stages[sta]->update_firsts_and_lasts_tasks();
1182 : }
1183 :
1184 : // ------------------------------------------------------------------------------------------------------------
1185 : // -------------------------------------------------------------------------------------------- unbind adaptors
1186 : // ------------------------------------------------------------------------------------------------------------
1187 33649 : for (auto& bind : this->adaptors_binds)
1188 : {
1189 33339 : auto sck_out = std::get<0>(bind);
1190 33339 : auto sck_in = std::get<1>(bind);
1191 33339 : if (sck_in != nullptr) // if socket to socket unbinding
1192 31263 : sck_in->unbind(*sck_out);
1193 : else // if socket to task unbinding
1194 : {
1195 2076 : auto tsk_in = std::get<3>(bind);
1196 2076 : assert(tsk_in != nullptr);
1197 2076 : tsk_in->unbind(*sck_out);
1198 : }
1199 : }
1200 :
1201 310 : if (bind_orphans)
1202 : {
1203 530 : for (auto& bind : this->sck_orphan_binds)
1204 : {
1205 426 : auto sck_out = std::get<0>(bind.first);
1206 426 : auto priority = std::get<4>(bind.first);
1207 426 : auto sck_in = std::get<0>(bind.second);
1208 426 : if (sck_in != nullptr) // if socket to socket binding
1209 397 : sck_in->_bind(*sck_out, priority);
1210 : else // if socket to task binding
1211 : {
1212 29 : auto tsk_in = std::get<4>(bind.second);
1213 29 : assert(tsk_in != nullptr);
1214 29 : tsk_in->_bind(*sck_out, priority);
1215 : }
1216 : }
1217 : }
1218 :
1219 310 : this->bound_adaptors = false;
1220 : }
1221 413 : }
1222 :
1223 : void
1224 0 : Pipeline::exec(const std::vector<std::function<bool(const std::vector<const int*>&)>>& stop_conditions)
1225 : {
1226 0 : if (stop_conditions.size() != this->stages.size() && stop_conditions.size() != 1)
1227 : {
1228 0 : std::stringstream message;
1229 0 : message << "'stop_conditions.size()' has to be equal to 'stages.size()' or to 1 ('stop_conditions.size()' = "
1230 0 : << stop_conditions.size() << ", 'stages.size()' = " << stages.size() << ").";
1231 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
1232 0 : }
1233 :
1234 0 : if (!this->bound_adaptors)
1235 : {
1236 0 : std::stringstream message;
1237 0 : message << "'bound_adaptors' has to be true to execute the pipeline.";
1238 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
1239 0 : }
1240 :
1241 : // ----------------------------------------------------------------------------------------------------------------
1242 0 : auto& stages = this->stages;
1243 0 : std::vector<const std::function<bool(const std::vector<const int*>&)>*> stop_condition_vec(stages.size() - 1,
1244 0 : nullptr);
1245 0 : if (stop_conditions.size() == stages.size())
1246 0 : for (size_t s = 0; s < stages.size() - 1; s++)
1247 0 : stop_condition_vec[s] = &stop_conditions[s];
1248 :
1249 0 : std::function<void(const size_t)> func_exec = [&stages, &stop_condition_vec](const size_t tid)
1250 : {
1251 0 : size_t s = tid;
1252 0 : if (stop_condition_vec[s])
1253 0 : stages[s]->exec(*(stop_condition_vec[s]));
1254 : else
1255 0 : stages[s]->exec();
1256 :
1257 : // send the signal to stop the next stage
1258 0 : const auto& tasks = stages[s + 1]->get_tasks_per_threads();
1259 0 : for (size_t th = 0; th < tasks.size(); th++)
1260 0 : for (size_t ta = 0; ta < tasks[th].size(); ta++)
1261 : {
1262 0 : auto m = dynamic_cast<module::Adaptor_m_to_n*>(&tasks[th][ta]->get_module());
1263 0 : if (m != nullptr)
1264 0 : if (tasks[th][ta]->get_name().find("pull") != std::string::npos) m->cancel_waiting();
1265 : }
1266 0 : };
1267 :
1268 0 : this->thread_pool->run(func_exec, true);
1269 :
1270 0 : stages[stages.size() - 1]->exec(stop_conditions[stop_conditions.size() - 1]);
1271 :
1272 : // stop all the stages before
1273 0 : for (size_t notify_s = 0; notify_s < stages.size() - 1; notify_s++)
1274 0 : for (auto& m : stages[notify_s]->get_modules<tools::Interface_waiting>())
1275 0 : m->cancel_waiting();
1276 :
1277 0 : this->thread_pool->wait();
1278 0 : this->thread_pool->unset_func_exec();
1279 : // ----------------------------------------------------------------------------------------------------------------
1280 :
1281 : // this is NOT made in the tools::Sequence::exec() to correctly flush the pipeline before restoring buffers
1282 : // initial configuration
1283 0 : for (auto& stage : this->stages)
1284 0 : if (stage->is_no_copy_mode())
1285 : {
1286 0 : stage->reset_no_copy_mode();
1287 0 : stage->gen_processes(false);
1288 : }
1289 :
1290 0 : for (auto& padps : this->adaptors)
1291 : {
1292 0 : for (auto& adp : padps.first)
1293 0 : adp->reset();
1294 0 : for (auto& adp : padps.second)
1295 0 : adp->reset();
1296 : }
1297 0 : }
1298 :
1299 : void
1300 104 : Pipeline::exec(const std::vector<std::function<bool()>>& stop_conditions)
1301 : {
1302 104 : if (stop_conditions.size() != this->stages.size() && stop_conditions.size() != 1)
1303 : {
1304 0 : std::stringstream message;
1305 0 : message << "'stop_conditions.size()' has to be equal to 'stages.size()' or to 1 ('stop_conditions.size()' = "
1306 0 : << stop_conditions.size() << ", 'stages.size()' = " << stages.size() << ").";
1307 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
1308 0 : }
1309 :
1310 104 : if (!this->bound_adaptors)
1311 : {
1312 0 : std::stringstream message;
1313 0 : message << "'bound_adaptors' has to be true to execute the pipeline.";
1314 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
1315 0 : }
1316 :
1317 : // ----------------------------------------------------------------------------------------------------------------
1318 104 : auto& stages = this->stages;
1319 104 : std::vector<const std::function<bool()>*> stop_condition_vec(stages.size() - 1, nullptr);
1320 104 : if (stop_conditions.size() == stages.size())
1321 4 : for (size_t s = 0; s < stages.size() - 1; s++)
1322 0 : stop_condition_vec[s] = &stop_conditions[s];
1323 :
1324 758 : std::function<void(const size_t)> func_exec = [&stages, &stop_condition_vec](const size_t tid)
1325 : {
1326 247 : size_t s = tid;
1327 247 : if (stop_condition_vec[s])
1328 0 : stages[s]->exec(*(stop_condition_vec[s]));
1329 : else
1330 248 : stages[s]->exec();
1331 :
1332 : // send the signal to stop the next stage
1333 263 : const auto& tasks = stages[s + 1]->get_tasks_per_threads();
1334 3354 : for (size_t th = 0; th < tasks.size(); th++)
1335 23676 : for (size_t ta = 0; ta < tasks[th].size(); ta++)
1336 : {
1337 20545 : auto m = dynamic_cast<module::Adaptor_m_to_n*>(&tasks[th][ta]->get_module());
1338 20536 : if (m != nullptr)
1339 6033 : if (tasks[th][ta]->get_name().find("pull") != std::string::npos) m->cancel_waiting();
1340 : }
1341 371 : };
1342 :
1343 104 : this->thread_pool->run(func_exec, true);
1344 104 : stages[stages.size() - 1]->exec(stop_conditions[stop_conditions.size() - 1]);
1345 :
1346 : // stop all the stages before
1347 373 : for (size_t notify_s = 0; notify_s < stages.size() - 1; notify_s++)
1348 9007 : for (auto& m : stages[notify_s]->get_modules<tools::Interface_waiting>())
1349 9007 : m->cancel_waiting();
1350 :
1351 104 : this->thread_pool->wait();
1352 104 : this->thread_pool->unset_func_exec();
1353 : // ----------------------------------------------------------------------------------------------------------------
1354 :
1355 : // this is NOT made in the tools::Sequence::exec() to correctly flush the pipeline before restoring buffers
1356 : // initial configuration
1357 477 : for (auto& stage : this->stages)
1358 373 : if (stage->is_no_copy_mode())
1359 : {
1360 373 : stage->reset_no_copy_mode();
1361 373 : stage->gen_processes(false);
1362 : }
1363 :
1364 373 : for (auto& padps : this->adaptors)
1365 : {
1366 3361 : for (auto& adp : padps.first)
1367 3092 : adp->reset();
1368 3092 : for (auto& adp : padps.second)
1369 2823 : adp->reset();
1370 : }
1371 104 : }
1372 :
1373 : void
1374 0 : Pipeline::exec(std::function<bool(const std::vector<const int*>&)> stop_condition)
1375 : {
1376 0 : this->exec(std::vector<std::function<bool(const std::vector<const int*>&)>>(1, stop_condition));
1377 0 : }
1378 :
1379 : void
1380 104 : Pipeline::exec(std::function<bool()> stop_condition)
1381 : {
1382 104 : this->exec(std::vector<std::function<bool()>>(1, stop_condition));
1383 104 : }
1384 :
1385 : void
1386 22 : Pipeline::exec()
1387 : {
1388 9216 : this->exec([]() { return false; });
1389 22 : }
1390 :
1391 : std::vector<std::vector<module::Module*>>
1392 0 : Pipeline::get_modules_per_threads() const
1393 : {
1394 0 : std::vector<std::vector<module::Module*>> modules_per_threads;
1395 0 : for (auto& stage : this->stages)
1396 : {
1397 0 : auto modules_per_threads_add = stage->get_modules_per_threads();
1398 0 : if (modules_per_threads_add.size() > modules_per_threads.size())
1399 0 : modules_per_threads.resize(modules_per_threads_add.size());
1400 :
1401 0 : for (size_t t = 0; t < modules_per_threads_add.size(); t++)
1402 0 : modules_per_threads[t].insert(
1403 0 : modules_per_threads[t].end(), modules_per_threads_add[t].begin(), modules_per_threads_add[t].end());
1404 0 : }
1405 0 : return modules_per_threads;
1406 0 : }
1407 :
1408 : std::vector<std::vector<module::Module*>>
1409 0 : Pipeline::get_modules_per_types() const
1410 : {
1411 0 : std::vector<std::vector<module::Module*>> modules_per_types;
1412 0 : for (auto& stage : this->stages)
1413 : {
1414 0 : auto modules_per_types_add = stage->get_modules_per_types();
1415 0 : modules_per_types.insert(modules_per_types.end(), modules_per_types_add.begin(), modules_per_types_add.end());
1416 0 : }
1417 0 : return modules_per_types;
1418 0 : }
1419 :
1420 : std::vector<std::vector<module::Module*>>
1421 0 : Pipeline::get_original_modules() const
1422 : {
1423 0 : return this->original_sequence.get_modules_per_types();
1424 : }
1425 :
1426 : std::vector<std::vector<runtime::Task*>>
1427 104 : Pipeline::get_tasks_per_threads() const
1428 : {
1429 104 : std::vector<std::vector<runtime::Task*>> tasks_per_threads;
1430 477 : for (auto& stage : this->stages)
1431 : {
1432 373 : auto tasks_per_threads_add = stage->get_tasks_per_threads();
1433 373 : if (tasks_per_threads_add.size() > tasks_per_threads.size())
1434 202 : tasks_per_threads.resize(tasks_per_threads_add.size());
1435 :
1436 3574 : for (size_t t = 0; t < tasks_per_threads_add.size(); t++)
1437 12804 : tasks_per_threads[t].insert(
1438 12804 : tasks_per_threads[t].end(), tasks_per_threads_add[t].begin(), tasks_per_threads_add[t].end());
1439 373 : }
1440 104 : return tasks_per_threads;
1441 0 : }
1442 :
1443 : std::vector<std::vector<runtime::Task*>>
1444 0 : Pipeline::get_tasks_per_types() const
1445 : {
1446 0 : std::vector<std::vector<runtime::Task*>> tasks_per_types;
1447 0 : for (auto& stage : this->stages)
1448 : {
1449 0 : auto tasks_per_types_add = stage->get_tasks_per_types();
1450 0 : tasks_per_types.insert(tasks_per_types.end(), tasks_per_types_add.begin(), tasks_per_types_add.end());
1451 0 : }
1452 0 : return tasks_per_types;
1453 0 : }
1454 :
1455 : void
1456 6 : Pipeline::export_dot(std::ostream& stream) const
1457 : {
1458 : std::function<void(tools::Digraph_node<Sub_sequence>*,
1459 : const size_t,
1460 : const std::string&,
1461 : std::ostream&,
1462 : std::vector<tools::Digraph_node<Sub_sequence>*>&)>
1463 25 : export_dot_subsequences_recursive = [&export_dot_subsequences_recursive,
1464 : this](tools::Digraph_node<Sub_sequence>* cur_node,
1465 : const size_t sta,
1466 : const std::string& tab,
1467 : std::ostream& stream,
1468 25 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
1469 : {
1470 50 : if (cur_node != nullptr &&
1471 50 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
1472 : {
1473 25 : already_parsed_nodes.push_back(cur_node);
1474 50 : this->stages[sta]->export_dot_subsequence(cur_node->get_c()->tasks,
1475 25 : cur_node->get_c()->tasks_id,
1476 25 : cur_node->get_c()->type,
1477 50 : "Sub-sequence " + std::to_string(cur_node->get_c()->id) +
1478 100 : " (depth = " + std::to_string(cur_node->get_depth()) + ")",
1479 : tab,
1480 : stream);
1481 :
1482 25 : for (auto c : cur_node->get_children())
1483 0 : export_dot_subsequences_recursive(c, sta, tab, stream, already_parsed_nodes);
1484 : }
1485 31 : };
1486 :
1487 : std::function<void(tools::Digraph_node<Sub_sequence>*,
1488 : const size_t,
1489 : const std::string&,
1490 : std::ostream&,
1491 : std::vector<tools::Digraph_node<Sub_sequence>*>&)>
1492 : export_dot_connections_recursive =
1493 25 : [&export_dot_connections_recursive, this](tools::Digraph_node<Sub_sequence>* cur_node,
1494 : const size_t sta,
1495 : const std::string& tab,
1496 : std::ostream& stream,
1497 25 : std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
1498 : {
1499 50 : if (cur_node != nullptr &&
1500 50 : std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
1501 : {
1502 25 : already_parsed_nodes.push_back(cur_node);
1503 25 : this->stages[sta]->export_dot_connections(cur_node->get_c()->tasks, tab, stream);
1504 :
1505 25 : for (auto c : cur_node->get_children())
1506 0 : export_dot_connections_recursive(c, sta, tab, stream, already_parsed_nodes);
1507 : }
1508 31 : };
1509 :
1510 6 : std::string tab = "\t";
1511 6 : stream << "digraph Pipeline {" << std::endl;
1512 6 : stream << tab << "compound=true;" << std::endl;
1513 :
1514 31 : for (size_t sta = 0; sta < this->stages.size(); sta++)
1515 : {
1516 25 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
1517 25 : const auto n_threads = this->stages[sta]->get_n_threads();
1518 25 : stream << tab << "subgraph \"cluster_Stage " << sta << "\" {" << std::endl;
1519 25 : stream << tab << tab << "node [style=filled];" << std::endl;
1520 25 : export_dot_subsequences_recursive(this->stages[sta]->sequences[0], sta, tab, stream, already_parsed_nodes);
1521 25 : stream << tab << tab << "label=\"Pipeline stage " << sta << " (" << n_threads << " thread(s))\";" << std::endl;
1522 25 : std::string color = "blue";
1523 25 : stream << tab << tab << "color=" << color << ";" << std::endl;
1524 25 : stream << tab << "}" << std::endl;
1525 25 : }
1526 :
1527 31 : for (size_t sta = 0; sta < this->stages.size(); sta++)
1528 : {
1529 25 : std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
1530 25 : export_dot_connections_recursive(this->stages[sta]->sequences[0], sta, tab, stream, already_parsed_nodes);
1531 25 : if (this->bound_adaptors)
1532 : {
1533 25 : if (sta > 0)
1534 : {
1535 19 : auto tsk1 = this->stages[sta - 1]->get_lasts_tasks()[0].back();
1536 19 : auto tsk2 = this->stages[sta + 0]->get_firsts_tasks()[0][0];
1537 :
1538 19 : auto sck1 = tsk1->sockets[0];
1539 19 : auto sck2 = tsk2->sockets[0];
1540 :
1541 19 : stream << tab << "\"" << +sck1.get() << "\" -> \"" << +sck2.get() << "\" [ltail=\"cluster_"
1542 19 : << +&tsk1->get_module() << "_" << +tsk1 << "\" lhead=\"cluster_" << +&tsk2->get_module() << "_"
1543 19 : << +tsk2 << "\" color=\"green\" style=\"dashed\"];" << std::endl;
1544 19 : }
1545 : }
1546 25 : }
1547 :
1548 6 : stream << "}" << std::endl;
1549 6 : }
1550 :
1551 : bool
1552 206 : Pipeline::is_bound_adaptors() const
1553 : {
1554 206 : return this->bound_adaptors;
1555 : }
1556 :
1557 : void
1558 0 : Pipeline::set_auto_stop(const bool auto_stop)
1559 : {
1560 0 : this->auto_stop = auto_stop;
1561 0 : for (auto stage : this->stages)
1562 0 : stage->set_auto_stop(auto_stop);
1563 0 : }
1564 :
1565 : bool
1566 0 : Pipeline::is_auto_stop() const
1567 : {
1568 0 : return this->auto_stop;
1569 : }
1570 :
1571 : size_t
1572 0 : Pipeline::get_n_frames() const
1573 : {
1574 0 : const auto n_frames = this->stages[0]->get_n_frames();
1575 :
1576 0 : for (auto& sta : this->stages)
1577 0 : if (sta->get_n_frames() != n_frames)
1578 : {
1579 0 : std::stringstream message;
1580 0 : message << "All the stages/sequences do not have the same 'n_frames' value ('sta->get_n_frames()' = "
1581 0 : << sta->get_n_frames() << ", 'n_frames' = " << n_frames << ").";
1582 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
1583 0 : }
1584 :
1585 0 : return n_frames;
1586 : }
1587 :
1588 : void
1589 206 : Pipeline::set_n_frames(const size_t n_frames)
1590 : {
1591 206 : const auto save_bound_adaptors = this->is_bound_adaptors();
1592 206 : if (!save_bound_adaptors) this->bind_adaptors();
1593 206 : this->_unbind_adaptors(false);
1594 :
1595 : // set the new "n_frames" val in the sequences
1596 206 : std::vector<std::vector<std::pair<runtime::Socket*, runtime::Socket*>>> unbind_sockets(this->stages.size());
1597 206 : std::vector<std::vector<std::pair<runtime::Task*, runtime::Socket*>>> unbind_tasks(this->stages.size());
1598 206 : std::vector<bool> skip(this->stages.size());
1599 944 : for (size_t s = 0; s < this->stages.size(); s++)
1600 738 : skip[s] = this->stages[s]->get_n_frames() == n_frames;
1601 944 : for (size_t s = 0; s < this->stages.size(); s++)
1602 738 : if (!skip[s]) this->stages[s]->_set_n_frames_unbind(unbind_sockets[s], unbind_tasks[s]);
1603 944 : for (size_t s = 0; s < this->stages.size(); s++)
1604 738 : if (!skip[s]) this->stages[s]->_set_n_frames(n_frames);
1605 944 : for (size_t s = 0; s < this->stages.size(); s++)
1606 738 : if (!skip[s]) this->stages[s]->_set_n_frames_rebind(unbind_sockets[s], unbind_tasks[s]);
1607 :
1608 : // set the new "n_frames" val in the adaptors
1609 738 : for (auto& adps : this->adaptors)
1610 : {
1611 6710 : for (auto& adp : adps.first)
1612 6178 : adp->set_n_frames(n_frames);
1613 6178 : for (auto& adp : adps.second)
1614 5646 : adp->set_n_frames(n_frames);
1615 : }
1616 :
1617 : // bind orphans to complete the unbind of the adaptors
1618 1052 : for (auto& bind : this->sck_orphan_binds)
1619 : {
1620 846 : auto sck_out = std::get<0>(bind.first);
1621 846 : auto priority = std::get<4>(bind.first);
1622 846 : auto sck_in = std::get<0>(bind.second);
1623 846 : if (sck_in != nullptr)
1624 788 : sck_in->_bind(*sck_out, priority);
1625 : else
1626 : {
1627 58 : auto tsk_in = std::get<4>(bind.second);
1628 58 : assert(tsk_in != nullptr);
1629 58 : tsk_in->_bind(*sck_out, priority);
1630 : }
1631 : }
1632 :
1633 206 : if (save_bound_adaptors) this->bind_adaptors();
1634 206 : }
|