LCOV - code coverage report
Current view: top level - src/Runtime/Pipeline - Pipeline.cpp (source / functions) Hit Total Coverage
Test: streampu_clean.info Lines: 547 754 72.5 %
Date: 2024-07-31 15:48:41 Functions: 30 49 61.2 %

          Line data    Source code
       1             : #include <cassert>
       2             : #include <fstream>
       3             : #include <thread>
       4             : #include <tuple>
       5             : #include <utility>
       6             : 
       7             : #include "Module/Stateful/Adaptor/Adaptor_1_to_n.hpp"
       8             : #include "Module/Stateful/Adaptor/Adaptor_n_to_1.hpp"
       9             : #include "Runtime/Pipeline/Pipeline.hpp"
      10             : #include "Tools/Exception/exception.hpp"
      11             : #include "Tools/Interface/Interface_waiting.hpp"
      12             : #include "Tools/Thread_pinning/Thread_pinning_utils.hpp"
      13             : 
      14             : using namespace spu;
      15             : using namespace spu::runtime;
      16             : 
      17             : // Pipeline
      18             : // ::Pipeline(const runtime::Task &first,
      19             : //            const runtime::Task &last,
      20             : //            const std::vector<std::pair<std::vector<const runtime::Task*>, std::vector<const runtime::Task*>>>
      21             : //            &sep_stages, const std::vector<size_t> &n_threads, const std::vector<size_t> &synchro_buffer_sizes, const
      22             : //            std::vector<bool> &synchro_active_waiting, const std::vector<bool> &thread_pinning, const
      23             : //            std::vector<std::vector<size_t>> &puids)
      24             : // : original_sequence(first, last, 1),
      25             : //   stages(sep_stages.size()),
      26             : //   adaptors(sep_stages.size() -1),
      27             : //   saved_firsts_tasks_id(sep_stages.size()),
      28             : //   saved_lasts_tasks_id(sep_stages.size()),
      29             : //   bound_adaptors(false)
      30             : // {
      31             : //      this->init<const runtime::Task>(first,
      32             : //                                      &last,
      33             : //                                      sep_stages,
      34             : //                                      n_threads,
      35             : //                                      synchro_buffer_sizes,
      36             : //                                      synchro_active_waiting,
      37             : //                                      thread_pinning,
      38             : //                                      puids);
      39             : // }
      40             : 
      41             : // Pipeline
      42             : // ::Pipeline(const runtime::Task &first,
      43             : //            const std::vector<std::pair<std::vector<const runtime::Task*>, std::vector<const runtime::Task*>>>
      44             : //            &sep_stages, const std::vector<size_t> &n_threads, const std::vector<size_t> &synchro_buffer_sizes, const
      45             : //            std::vector<bool> &synchro_active_waiting, const std::vector<bool> &thread_pinning, const
      46             : //            std::vector<std::vector<size_t>> &puids)
      47             : // : original_sequence(first, 1),
      48             : //   stages(sep_stages.size()),
      49             : //   adaptors(sep_stages.size() -1),
      50             : //   saved_firsts_tasks_id(sep_stages.size()),
      51             : //   saved_lasts_tasks_id(sep_stages.size()),
      52             : //   bound_adaptors(false)
      53             : // {
      54             : //      const runtime::Task* last = nullptr;
      55             : //      this->init<const runtime::Task>(first,
      56             : //                                      last,
      57             : //                                      sep_stages,
      58             : //                                      n_threads,
      59             : //                                      synchro_buffer_sizes,
      60             : //                                      synchro_active_waiting,
      61             : //                                      thread_pinning,
      62             : //                                      puids);
      63             : // }
      64             : 
      65          13 : Pipeline
      66             : ::Pipeline(const std::vector<runtime::Task*> &firsts,
      67             :            const std::vector<runtime::Task*> &lasts,
      68             :            const std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
      69             :            const std::vector<size_t> &n_threads,
      70             :            const std::vector<size_t> &synchro_buffer_sizes,
      71             :            const std::vector<bool> &synchro_active_waiting,
      72             :            const std::vector<bool> &thread_pinning,
      73             :            const std::vector<std::vector<size_t>> &puids/*,
      74          13 :            const std::vector<bool> &tasks_inplace*/)
      75          13 : : original_sequence(firsts, lasts, 1),
      76          13 :   stages(sep_stages.size()),
      77          13 :   adaptors(sep_stages.size() -1),
      78          13 :   saved_firsts_tasks_id(sep_stages.size()),
      79          13 :   saved_lasts_tasks_id(sep_stages.size()),
      80          13 :   bound_adaptors(false),
      81          26 :   auto_stop(true)
      82             : {
      83          13 :     this->init<runtime::Task>(
      84             :       firsts, lasts, sep_stages, n_threads, synchro_buffer_sizes, synchro_active_waiting, thread_pinning, puids
      85             :       /*, tasks_inplace*/);
      86          13 : }
      87             : 
      88          26 : Pipeline
      89             : ::Pipeline(const std::vector<runtime::Task*> &firsts,
      90             :            const std::vector<runtime::Task*> &lasts,
      91             :            const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
      92             :            const std::vector<size_t> &n_threads,
      93             :            const std::vector<size_t> &synchro_buffer_sizes,
      94             :            const std::vector<bool> &synchro_active_waiting,
      95             :            const std::vector<bool> &thread_pinning,
      96             :            const std::vector<std::vector<size_t>> &puids/*,
      97          26 :            const std::vector<bool> &tasks_inplace*/)
      98          26 : : original_sequence(firsts, lasts, 1),
      99          26 :   stages(sep_stages.size()),
     100          26 :   adaptors(sep_stages.size() -1),
     101          26 :   saved_firsts_tasks_id(sep_stages.size()),
     102          26 :   saved_lasts_tasks_id(sep_stages.size()),
     103          26 :   bound_adaptors(false),
     104          52 :   auto_stop(true)
     105             : {
     106             :     std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>>
     107          26 :       sep_stages_bis;
     108         117 :     for (auto& sep_stage : sep_stages)
     109          91 :         sep_stages_bis.push_back(std::make_tuple(sep_stage.first, sep_stage.second, std::vector<runtime::Task*>()));
     110             : 
     111          26 :     this->init<runtime::Task>(firsts,
     112             :                                   lasts,
     113             :                                   sep_stages_bis,
     114             :                                   n_threads,
     115             :                                   synchro_buffer_sizes,
     116             :                                   synchro_active_waiting,
     117             :                                   thread_pinning, puids/*,
     118             :                                   tasks_inplace*/);
     119          26 : }
     120             : 
     121          13 : Pipeline
     122             : ::Pipeline(const std::vector<runtime::Task*> &firsts,
     123             :            const std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     124             :            const std::vector<size_t> &n_threads,
     125             :            const std::vector<size_t> &synchro_buffer_sizes,
     126             :            const std::vector<bool> &synchro_active_waiting,
     127             :            const std::vector<bool> &thread_pinning,
     128             :            const std::vector<std::vector<size_t>> &puids/*,
     129          13 :            const std::vector<bool> &tasks_inplace*/)
     130             : : Pipeline(firsts,
     131             :            {},
     132             :            sep_stages,
     133             :            n_threads,
     134             :            synchro_buffer_sizes,
     135             :            synchro_active_waiting,
     136             :            thread_pinning, puids/*,
     137          13 :            tasks_inplace*/)
     138             : {
     139          13 : }
     140             : 
     141          26 : Pipeline
     142             : ::Pipeline(const std::vector<runtime::Task*> &firsts,
     143             :            const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     144             :            const std::vector<size_t> &n_threads,
     145             :            const std::vector<size_t> &synchro_buffer_sizes,
     146             :            const std::vector<bool> &synchro_active_waiting,
     147             :            const std::vector<bool> &thread_pinning,
     148             :            const std::vector<std::vector<size_t>> &puids/*,
     149          26 :            const std::vector<bool> &tasks_inplace*/)
     150             : : Pipeline(firsts,
     151             :            {},
     152             :            sep_stages,
     153             :            n_threads,
     154             :            synchro_buffer_sizes,
     155             :            synchro_active_waiting,
     156             :            thread_pinning, puids/*,
     157          26 :            tasks_inplace*/)
     158             : {
     159          26 : }
     160             : 
     161           0 : Pipeline
     162             : ::Pipeline(runtime::Task &first,
     163             :            runtime::Task &last,
     164             :            const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     165             :            const std::vector<size_t> &n_threads,
     166             :            const std::vector<size_t> &synchro_buffer_sizes,
     167             :            const std::vector<bool> &synchro_active_waiting,
     168             :            const std::vector<bool> &thread_pinning,
     169             :            const std::vector<std::vector<size_t>> &puids/*,
     170           0 :            const std::vector<bool> &tasks_inplace*/)
     171             : : Pipeline({&first},
     172             :            {&last},
     173             :            sep_stages,
     174             :            n_threads,
     175             :            synchro_buffer_sizes,
     176             :            synchro_active_waiting,
     177             :            thread_pinning, puids/*,
     178           0 :            tasks_inplace*/)
     179             : {
     180           0 : }
     181             : 
     182          26 : Pipeline
     183             : ::Pipeline(runtime::Task &first,
     184             :            const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     185             :            const std::vector<size_t> &n_threads,
     186             :            const std::vector<size_t> &synchro_buffer_sizes,
     187             :            const std::vector<bool> &synchro_active_waiting,
     188             :            const std::vector<bool> &thread_pinning,
     189             :            const std::vector<std::vector<size_t>> &puids/*,
     190          26 :            const std::vector<bool> &tasks_inplace*/)
     191             : : Pipeline({&first},
     192             :            sep_stages,
     193             :            n_threads,
     194             :            synchro_buffer_sizes,
     195             :            synchro_active_waiting,
     196             :            thread_pinning, puids/*,
     197          26 :            tasks_inplace*/)
     198             : {
     199          26 : }
     200             : 
     201             : //========================== Pipeline constructors with new version thread pinning =====================================
     202           0 : Pipeline
     203             : ::Pipeline(const std::vector<runtime::Task*> &firsts,
     204             :            const std::vector<runtime::Task*> &lasts,
     205             :            const std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     206             :            const std::vector<size_t> &n_threads,
     207             :            const std::vector<size_t> &synchro_buffer_sizes,
     208             :            const std::vector<bool> &synchro_active_waiting,
     209             :            const std::vector<bool> &thread_pinning,
     210             :            const std::string &pipeline_pinning_policy/*,
     211           0 :            const std::vector<bool> &tasks_inplace*/)
     212           0 : : original_sequence(firsts, lasts, 1),
     213           0 :   stages(sep_stages.size()),
     214           0 :   adaptors(sep_stages.size() -1),
     215           0 :   saved_firsts_tasks_id(sep_stages.size()),
     216           0 :   saved_lasts_tasks_id(sep_stages.size()),
     217           0 :   bound_adaptors(false),
     218           0 :   auto_stop(true)
     219             : {
     220           0 :     this->init<runtime::Task>(firsts,
     221             :                                   lasts,
     222             :                                   sep_stages,
     223             :                                   n_threads,
     224             :                                   synchro_buffer_sizes,
     225             :                                   synchro_active_waiting,
     226             :                                   thread_pinning, {}, pipeline_pinning_policy/*,
     227             :                                   tasks_inplace*/);
     228           0 : }
     229             : 
     230          41 : Pipeline
     231             : ::Pipeline(const std::vector<runtime::Task*> &firsts,
     232             :            const std::vector<runtime::Task*> &lasts,
     233             :            const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     234             :            const std::vector<size_t> &n_threads,
     235             :            const std::vector<size_t> &synchro_buffer_sizes,
     236             :            const std::vector<bool> &synchro_active_waiting,
     237             :            const std::vector<bool> &thread_pinning,
     238             :            const std::string &pipeline_pinning_policy/*,
     239          41 :            const std::vector<bool> &tasks_inplace*/)
     240          41 : : original_sequence(firsts, lasts, 1),
     241          41 :   stages(sep_stages.size()),
     242          41 :   adaptors(sep_stages.size() -1),
     243          41 :   saved_firsts_tasks_id(sep_stages.size()),
     244          41 :   saved_lasts_tasks_id(sep_stages.size()),
     245          41 :   bound_adaptors(false),
     246          82 :   auto_stop(true)
     247             : {
     248             :     std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>>
     249          41 :       sep_stages_bis;
     250         183 :     for (auto& sep_stage : sep_stages)
     251         142 :         sep_stages_bis.push_back(std::make_tuple(sep_stage.first, sep_stage.second, std::vector<runtime::Task*>()));
     252             : 
     253          41 :     this->init<runtime::Task>(firsts,
     254             :                                   lasts,
     255             :                                   sep_stages_bis,
     256             :                                   n_threads,
     257             :                                   synchro_buffer_sizes,
     258             :                                   synchro_active_waiting,
     259             :                                   thread_pinning, {}, pipeline_pinning_policy/*,
     260             :                                   tasks_inplace*/);
     261          41 : }
     262             : 
     263           0 : Pipeline
     264             : ::Pipeline(const std::vector<runtime::Task*> &firsts,
     265             :            const std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     266             :            const std::vector<size_t> &n_threads,
     267             :            const std::vector<size_t> &synchro_buffer_sizes,
     268             :            const std::vector<bool> &synchro_active_waiting,
     269             :            const std::vector<bool> &thread_pinning,
     270             :            const std::string &pipeline_pinning_policy/*,
     271           0 :            const std::vector<bool> &tasks_inplace*/)
     272             : : Pipeline(firsts,
     273             :            {},
     274             :            sep_stages,
     275             :            n_threads,
     276             :            synchro_buffer_sizes,
     277             :            synchro_active_waiting,
     278             :            thread_pinning, pipeline_pinning_policy/*,
     279           0 :            tasks_inplace*/)
     280             : {
     281           0 : }
     282             : 
     283          30 : Pipeline
     284             : ::Pipeline(const std::vector<runtime::Task*> &firsts,
     285             :            const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     286             :            const std::vector<size_t> &n_threads,
     287             :            const std::vector<size_t> &synchro_buffer_sizes,
     288             :            const std::vector<bool> &synchro_active_waiting,
     289             :            const std::vector<bool> &thread_pinning,
     290             :            const std::string &pipeline_pinning_policy/*,
     291          30 :            const std::vector<bool> &tasks_inplace*/)
     292             : : Pipeline(firsts,
     293             :            {},
     294             :            sep_stages,
     295             :            n_threads,
     296             :            synchro_buffer_sizes,
     297             :            synchro_active_waiting,
     298             :            thread_pinning, pipeline_pinning_policy/*,
     299          30 :            tasks_inplace*/)
     300             : {
     301          30 : }
     302             : 
     303           0 : Pipeline
     304             : ::Pipeline(runtime::Task &first,
     305             :            runtime::Task &last,
     306             :            const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     307             :            const std::vector<size_t> &n_threads,
     308             :            const std::vector<size_t> &synchro_buffer_sizes,
     309             :            const std::vector<bool> &synchro_active_waiting,
     310             :            const std::vector<bool> &thread_pinning,
     311             :            const std::string &pipeline_pinning_policy/*,
     312           0 :            const std::vector<bool> &tasks_inplace*/)
     313             : : Pipeline({&first},
     314             :            {&last},
     315             :            sep_stages,
     316             :            n_threads,
     317             :            synchro_buffer_sizes,
     318             :            synchro_active_waiting,
     319             :            thread_pinning, pipeline_pinning_policy/*,
     320           0 :            tasks_inplace*/)
     321             : {
     322           0 : }
     323             : 
     324          30 : Pipeline
     325             : ::Pipeline(runtime::Task &first,
     326             :            const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     327             :            const std::vector<size_t> &n_threads,
     328             :            const std::vector<size_t> &synchro_buffer_sizes,
     329             :            const std::vector<bool> &synchro_active_waiting,
     330             :            const std::vector<bool> &thread_pinning,
     331             :            const std::string &pipeline_pinning_policy/*,
     332          30 :            const std::vector<bool> &tasks_inplace*/)
     333             : : Pipeline({&first},
     334             :            sep_stages,
     335             :            n_threads,
     336             :            synchro_buffer_sizes,
     337             :            synchro_active_waiting,
     338             :            thread_pinning,
     339             :            pipeline_pinning_policy/*,
     340          30 :            tasks_inplace*/)
     341             : {
     342          30 : }
     343             : 
     344         160 : Pipeline::~Pipeline()
     345             : {
     346          80 :     this->unbind_adaptors();
     347         160 : }
     348             : 
     349             : std::vector<Sequence*>
     350         159 : Pipeline::get_stages()
     351             : {
     352         159 :     std::vector<Sequence*> stages;
     353         748 :     for (auto& stage : this->stages)
     354         589 :         stages.push_back(stage.get());
     355         159 :     return stages;
     356           0 : }
     357             : 
     358             : Sequence&
     359           0 : Pipeline::operator[](const size_t stage_id)
     360             : {
     361           0 :     assert(stage_id < this->stages.size());
     362           0 :     return *this->stages[stage_id];
     363             : }
     364             : 
     365             : template<class TA>
     366             : runtime::Sequence*
     367             : create_sequence(const std::vector<TA*>& firsts,
     368             :                 const std::vector<TA*>& lasts,
     369             :                 const std::vector<TA*>& exclusions,
     370             :                 const size_t& n_threads,
     371             :                 const bool& thread_pinning,
     372             :                 const std::vector<size_t>& puids,
     373             :                 const bool& tasks_inplace)
     374             : {
     375             :     throw tools::unimplemented_error(__FILE__, __LINE__, __func__);
     376             : }
     377             : 
     378             : template<>
     379             : runtime::Sequence*
     380           0 : create_sequence<const runtime::Task>(const std::vector<const runtime::Task*>& firsts,
     381             :                                      const std::vector<const runtime::Task*>& lasts,
     382             :                                      const std::vector<const runtime::Task*>& exclusions,
     383             :                                      const size_t& n_threads,
     384             :                                      const bool& thread_pinning,
     385             :                                      const std::vector<size_t>& puids,
     386             :                                      const bool& tasks_inplace)
     387             : {
     388           0 :     return new runtime::Sequence(firsts, lasts, exclusions, n_threads, thread_pinning, puids);
     389             : }
     390             : 
     391             : template<>
     392             : Sequence*
     393         248 : create_sequence<runtime::Task>(const std::vector<runtime::Task*>& firsts,
     394             :                                const std::vector<runtime::Task*>& lasts,
     395             :                                const std::vector<runtime::Task*>& exclusions,
     396             :                                const size_t& n_threads,
     397             :                                const bool& thread_pinning,
     398             :                                const std::vector<size_t>& puids,
     399             :                                const bool& tasks_inplace)
     400             : {
     401         248 :     return new runtime::Sequence(firsts, lasts, exclusions, n_threads, thread_pinning, puids, tasks_inplace);
     402             : }
     403             : 
     404             : // Init and sequence creation for second pinning version
     405             : template<class TA>
     406             : runtime::Sequence*
     407             : create_sequence(const std::vector<TA*>& firsts,
     408             :                 const std::vector<TA*>& lasts,
     409             :                 const std::vector<TA*>& exclusions,
     410             :                 const size_t& n_threads,
     411             :                 const bool& thread_pinning,
     412             :                 const std::string& pipeline_pinning_policy,
     413             :                 const bool& tasks_inplace)
     414             : {
     415             :     throw tools::unimplemented_error(__FILE__, __LINE__, __func__);
     416             : }
     417             : 
     418             : template<>
     419             : runtime::Sequence*
     420           0 : create_sequence<const runtime::Task>(const std::vector<const runtime::Task*>& firsts,
     421             :                                      const std::vector<const runtime::Task*>& lasts,
     422             :                                      const std::vector<const runtime::Task*>& exclusions,
     423             :                                      const size_t& n_threads,
     424             :                                      const bool& thread_pinning,
     425             :                                      const std::string& pipeline_pinning_policy,
     426             :                                      const bool& tasks_inplace)
     427             : {
     428           0 :     return new runtime::Sequence(firsts, lasts, exclusions, n_threads, thread_pinning, pipeline_pinning_policy);
     429             : }
     430             : 
     431             : template<>
     432             : Sequence*
     433          34 : create_sequence<runtime::Task>(const std::vector<runtime::Task*>& firsts,
     434             :                                const std::vector<runtime::Task*>& lasts,
     435             :                                const std::vector<runtime::Task*>& exclusions,
     436             :                                const size_t& n_threads,
     437             :                                const bool& thread_pinning,
     438             :                                const std::string& pipeline_pinning_policy,
     439             :                                const bool& tasks_inplace)
     440             : {
     441             :     return new runtime::Sequence(
     442          34 :       firsts, lasts, exclusions, n_threads, thread_pinning, pipeline_pinning_policy, tasks_inplace);
     443             : }
     444             : 
     445             : template <class TA>
     446          80 : void Pipeline
     447             : ::init(const std::vector<TA*> &firsts,
     448             :        const std::vector<TA*> &lasts,
     449             :        const std::vector<std::tuple<std::vector<TA*>,std::vector<TA*>,std::vector<TA*>>> &sep_stages,
     450             :        const std::vector<size_t> &n_threads,
     451             :        const std::vector<size_t> &synchro_buffer_sizes,
     452             :        const std::vector<bool> &synchro_active_waiting,
     453             :        const std::vector<bool> &thread_pinning,
     454             :        const std::vector<std::vector<size_t>> &puids,
     455             :        const std::string &pipeline_pinning_policy/*,
     456             :        const std::vector<bool> &tasks_inplace*/)
     457             : {
     458          80 :     if (sep_stages.size() != n_threads.size() && n_threads.size() != 0)
     459             :     {
     460           0 :         std::stringstream message;
     461           0 :         message << "'n_threads.size()' has to be equal to 'sep_stages.size()' or equal to '0' ('n_threads.size()' = "
     462           0 :                 << n_threads.size() << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
     463           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     464           0 :     }
     465             : 
     466          80 :     if (sep_stages.size() != synchro_buffer_sizes.size() + 1 && synchro_buffer_sizes.size() != 0)
     467             :     {
     468           0 :         std::stringstream message;
     469             :         message << "'synchro_buffer_sizes.size()' has to be equal to 'sep_stages.size() -1' or equal to '0' "
     470           0 :                 << "('synchro_buffer_sizes.size()' = " << synchro_buffer_sizes.size()
     471           0 :                 << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
     472           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     473           0 :     }
     474             : 
     475          80 :     if (sep_stages.size() != synchro_active_waiting.size() + 1 && synchro_active_waiting.size() != 0)
     476             :     {
     477           0 :         std::stringstream message;
     478             :         message << "'synchro_active_waiting.size()' has to be equal to 'sep_stages.size() -1' or equal to '0' "
     479           0 :                 << "('synchro_active_waiting.size()' = " << synchro_active_waiting.size()
     480           0 :                 << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
     481           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     482           0 :     }
     483             : 
     484          80 :     if (sep_stages.size() != thread_pinning.size() && thread_pinning.size() != 0)
     485             :     {
     486           0 :         std::stringstream message;
     487             :         message << "'thread_pinning.size()' has to be equal to 'sep_stages.size()' or equal to '0' ("
     488           0 :                 << "'thread_pinning.size()' = " << thread_pinning.size()
     489           0 :                 << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
     490           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     491           0 :     }
     492             : 
     493          80 :     if (sep_stages.size() != puids.size() && puids.size() != 0)
     494             :     {
     495           0 :         std::stringstream message;
     496           0 :         message << "'puids.size()' has to be equal to 'sep_stages.size()' or equal to '0' ('puids.size()' = "
     497           0 :                 << puids.size() << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
     498           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     499           0 :     }
     500             : 
     501             :     // if (sep_stages.size() != tasks_inplace.size() && tasks_inplace.size() != 0)
     502             :     // {
     503             :     //  std::stringstream message;
     504             :     //  message << "'tasks_inplace.size()' has to be equal to 'sep_stages.size()' or equal to '0' ('"
     505             :     //          << "tasks_inplace.size()' = " << tasks_inplace.size() << " , 'sep_stages.size()' = "
     506             :     //          << sep_stages.size() << ").";
     507             :     //  throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     508             :     // }
     509             : 
     510          80 :     bool prev_is_parallel = false;
     511         362 :     for (auto t : n_threads)
     512             :     {
     513         282 :         if (t > 1 && prev_is_parallel)
     514             :         {
     515           0 :             std::stringstream message;
     516           0 :             message << "Consecutive parallel stages are not supported.";
     517           0 :             throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     518           0 :         }
     519         282 :         prev_is_parallel = t > 1;
     520             :     }
     521             : 
     522             :     // Creating a vector of pinning policies for each sequence
     523          80 :     std::vector<std::string> sequences_pinning_policies;
     524          80 :     if (!pipeline_pinning_policy.empty())
     525          13 :         sequences_pinning_policies =
     526             :           tools::Thread_pinning_utils::pipeline_parser_unpacker(pipeline_pinning_policy, sep_stages.size());
     527             : 
     528         362 :     for (size_t s = 0; s < sep_stages.size(); s++)
     529             :     {
     530         282 :         const std::vector<TA*>& stage_firsts = std::get<0>(sep_stages[s]);
     531         282 :         const std::vector<TA*>& stage_lasts = std::get<1>(sep_stages[s]);
     532         282 :         const std::vector<TA*>& stage_exclusions = std::get<2>(sep_stages[s]);
     533         282 :         const size_t stage_n_threads = n_threads.size() ? n_threads[s] : 1;
     534         282 :         const bool stage_thread_pinning = thread_pinning.size() ? thread_pinning[s] : false;
     535         282 :         const std::vector<size_t> stage_puids = puids.size() ? puids[s] : std::vector<size_t>();
     536         316 :         const std::string sequence_pinning_policy =
     537         316 :           sequences_pinning_policies.size() ? sequences_pinning_policies[s] : "";
     538         282 :         const bool stage_tasks_inplace = /*tasks_inplace.size() ? tasks_inplace[s] :*/ true;
     539             :         try
     540             :         {
     541         282 :             if (pipeline_pinning_policy.empty())
     542         248 :                 this->stages[s].reset(create_sequence<TA>(stage_firsts,
     543             :                                                           stage_lasts,
     544             :                                                           stage_exclusions,
     545             :                                                           stage_n_threads,
     546             :                                                           stage_thread_pinning,
     547             :                                                           stage_puids,
     548             :                                                           stage_tasks_inplace));
     549             :             else
     550          34 :                 this->stages[s].reset(create_sequence<TA>(stage_firsts,
     551             :                                                           stage_lasts,
     552             :                                                           stage_exclusions,
     553             :                                                           stage_n_threads,
     554             :                                                           stage_thread_pinning,
     555             :                                                           sequence_pinning_policy,
     556             :                                                           stage_tasks_inplace));
     557             :         }
     558           0 :         catch (const tools::control_flow_error& e)
     559             :         {
     560           0 :             std::stringstream message;
     561           0 :             message << "Invalid control flow error on stage " << s
     562           0 :                     << " (perhaps a switcher's tasks were separated between different stages)." << std::endl
     563           0 :                     << e.what();
     564           0 :             throw tools::control_flow_error(__FILE__, __LINE__, __func__, message.str());
     565           0 :         }
     566         282 :         this->stages[s]->is_part_of_pipeline = true;
     567             :     }
     568             : 
     569             :     // verify that the sequential sequence is equivalent to the pipeline sequence
     570          80 :     auto ref_tasks = this->original_sequence.get_tasks_per_threads()[0];
     571          80 :     auto cur_tasks = this->get_tasks_per_threads()[0];
     572             : 
     573          80 :     if (ref_tasks.size() != cur_tasks.size())
     574             :     {
     575           0 :         std::ofstream f1("dbg_ref_sequence.dot");
     576           0 :         this->original_sequence.export_dot(f1);
     577           0 :         std::ofstream f2("dbg_cur_pipeline.dot");
     578           0 :         this->export_dot(f2);
     579             : 
     580           0 :         std::stringstream message;
     581           0 :         message << "'ref_tasks.size()' has to be equal to 'cur_tasks.size()' ('ref_tasks.size()' = " << ref_tasks.size()
     582           0 :                 << ", 'cur_tasks.size()' = " << cur_tasks.size() << ").";
     583           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     584           0 :     }
     585             : 
     586         820 :     for (size_t ta = 0; ta < cur_tasks.size(); ta++)
     587             :     {
     588         740 :         if (std::find(ref_tasks.begin(), ref_tasks.end(), cur_tasks[ta]) == ref_tasks.end())
     589             :         {
     590           0 :             std::ofstream f1("dbg_ref_sequence.dot");
     591           0 :             this->original_sequence.export_dot(f1);
     592           0 :             std::ofstream f2("dbg_cur_pipeline.dot");
     593           0 :             this->export_dot(f2);
     594             : 
     595           0 :             std::stringstream message;
     596           0 :             message << "'cur_tasks[ta]' is not contained in the 'ref_tasks' vector ('ta' = " << ta
     597           0 :                     << ", 'cur_tasks[ta]' = " << +cur_tasks[ta]
     598           0 :                     << ", 'cur_tasks[ta]->get_name()' = " << cur_tasks[ta]->get_name() << ").";
     599           0 :             throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     600           0 :         }
     601             :     }
     602             : 
     603          80 :     this->create_adaptors(synchro_buffer_sizes, synchro_active_waiting);
     604          80 :     this->bind_adaptors();
     605          80 : }
     606             : 
     607             : void
     608          80 : Pipeline::create_adaptors(const std::vector<size_t>& synchro_buffer_sizes,
     609             :                           const std::vector<bool>& synchro_active_waiting)
     610             : {
     611             :     //                     sck out addr     occ     stage   tsk id  sck id
     612          80 :     std::vector<std::tuple<runtime::Socket*, size_t, size_t, size_t, size_t>> out_sck_orphans;
     613             : 
     614             :     // for all the stages in the pipeline
     615         282 :     for (size_t sta = 0; sta < this->stages.size() - 1; sta++)
     616             :     {
     617             :         // ------------------------------------------------------------------------------------------------------------
     618             :         // --------------------------------------------------------------- collect orphan output sockets in stage 'sta'
     619             :         // ------------------------------------------------------------------------------------------------------------
     620         202 :         std::vector<std::vector<runtime::Task*>> tasks_per_threads = this->stages[sta]->get_tasks_per_threads();
     621             :         // for all the threads in the current stage
     622             :         // for (size_t t = 0; t < tasks_per_threads.size(); t++)
     623         202 :         size_t t = 0;
     624             :         {
     625             :             // for all the tasks in the stage
     626         746 :             for (size_t tsk_id = 0; tsk_id < tasks_per_threads[t].size(); tsk_id++)
     627             :             {
     628         544 :                 auto tsk = tasks_per_threads[t][tsk_id];
     629             :                 // for all the sockets of the tasks
     630        1914 :                 for (size_t sck_id = 0; sck_id < tsk->sockets.size(); sck_id++)
     631             :                 {
     632        1370 :                     auto sck = tsk->sockets[sck_id];
     633             :                     // if the current socket is an output or forward socket type
     634        1370 :                     if (sck->get_type() == socket_t::SOUT || sck->get_type() == socket_t::SFWD)
     635             :                     {
     636             :                         // for all the bounded sockets to the current socket
     637        1788 :                         for (auto bsck : sck->get_bound_sockets())
     638             :                         {
     639             :                             // check if the task of the bounded socket is not in the current stage
     640         683 :                             if (std::find(tasks_per_threads[t].begin(),
     641         683 :                                           tasks_per_threads[t].end(),
     642        1366 :                                           &bsck->get_task()) == tasks_per_threads[t].end())
     643             :                             {
     644             :                                 // check the position of the socket in the orphans
     645         331 :                                 size_t pos = 0;
     646         904 :                                 for (; pos < out_sck_orphans.size(); pos++)
     647         616 :                                     if (std::get<0>(out_sck_orphans[pos]) == sck.get()) break;
     648             : 
     649         331 :                                 if (pos == out_sck_orphans.size())
     650         288 :                                     out_sck_orphans.push_back(std::make_tuple(sck.get(), 1, sta, tsk_id, sck_id));
     651             :                                 else
     652          43 :                                     std::get<1>(out_sck_orphans[pos])++;
     653             :                             }
     654             :                         }
     655             :                     }
     656        1370 :                 }
     657             :             }
     658             :         }
     659             : 
     660             :         // ------------------------------------------------------------------------------------------------------------
     661             :         // -------------------------------------- collect orphan input sockets in stage 'sta +1' and create connections
     662             :         // ------------------------------------------------------------------------------------------------------------
     663         202 :         tasks_per_threads = this->stages[sta + 1]->get_tasks_per_threads();
     664             :         // for all the threads in the current stage
     665             :         // for (size_t t = 0; t < tasks_per_threads.size(); t++)
     666             :         {
     667             :             // for all the tasks in the stage
     668         791 :             for (size_t tsk_id = 0; tsk_id < tasks_per_threads[t].size(); tsk_id++)
     669             :             {
     670         589 :                 auto tsk = tasks_per_threads[t][tsk_id];
     671             :                 // ----------------------------------------- manage socket to socket bindings (with fake input sockets)
     672             :                 // for all the sockets of the tasks
     673        2056 :                 for (size_t sck_id = 0; sck_id < tsk->sockets.size(); sck_id++)
     674             :                 {
     675        1467 :                     auto sck = tsk->sockets[sck_id];
     676             :                     // if the current socket is an input or forward socket type
     677        1467 :                     if (sck->get_type() == socket_t::SIN || sck->get_type() == socket_t::SFWD)
     678             :                     {
     679         596 :                         runtime::Socket* bsck = nullptr;
     680             :                         try
     681             :                         {
     682             :                             // get output bounded socket
     683         596 :                             bsck = &sck->get_bound_socket(); // can throw if there is no bounded socket
     684             :                         }
     685           0 :                         catch (const std::exception&)
     686             :                         {
     687           0 :                         }
     688         596 :                         if (bsck != nullptr)
     689             :                         {
     690             :                             // check if the task of the bounded socket is not in the current stage
     691         596 :                             if (std::find(tasks_per_threads[t].begin(),
     692         596 :                                           tasks_per_threads[t].end(),
     693        1192 :                                           &bsck->get_task()) == tasks_per_threads[t].end())
     694             :                             {
     695             :                                 // check the position of the bounded socket in the orphans
     696         305 :                                 size_t pos = 0;
     697         787 :                                 for (; pos < out_sck_orphans.size(); pos++)
     698         787 :                                     if (std::get<0>(out_sck_orphans[pos]) == bsck) break;
     699             : 
     700         305 :                                 if (pos < out_sck_orphans.size())
     701             :                                 {
     702         305 :                                     auto sck_out = std::get<0>(out_sck_orphans[pos]);
     703         305 :                                     auto sck_in = sck.get();
     704         610 :                                     auto unbind_sout_pos = std::distance(sck_out->get_bound_sockets().begin(),
     705         305 :                                                                          std::find(sck_out->get_bound_sockets().begin(),
     706         305 :                                                                                    sck_out->get_bound_sockets().end(),
     707         305 :                                                                                    sck_in));
     708         305 :                                     this->sck_orphan_binds.push_back(
     709         305 :                                       std::make_pair(std::make_tuple(std::get<0>(out_sck_orphans[pos]),
     710         305 :                                                                      std::get<2>(out_sck_orphans[pos]),
     711         305 :                                                                      std::get<3>(out_sck_orphans[pos]),
     712         305 :                                                                      std::get<4>(out_sck_orphans[pos]),
     713             :                                                                      unbind_sout_pos),
     714         610 :                                                      std::make_tuple(sck_in, sta + 1, tsk_id, sck_id, nullptr)));
     715             :                                 }
     716             :                             }
     717             :                         }
     718             :                     }
     719        1467 :                 }
     720             :                 // ------------------------------------------- manage socket to task bindings (with fake input sockets)
     721         717 :                 for (size_t sck_id = 0; sck_id < tsk->fake_input_sockets.size(); sck_id++)
     722             :                 {
     723         128 :                     auto sck = tsk->fake_input_sockets[sck_id];
     724         128 :                     runtime::Socket* bsck = nullptr;
     725             :                     try
     726             :                     {
     727             :                         // get output bounded socket
     728         128 :                         bsck = &sck->get_bound_socket(); // can throw if there is no bounded socket
     729             :                     }
     730           0 :                     catch (const std::exception&)
     731             :                     {
     732           0 :                     }
     733         128 :                     if (bsck != nullptr)
     734             :                     {
     735             :                         // check if the task of the bounded socket is not in the current stage
     736         128 :                         if (std::find(tasks_per_threads[t].begin(), tasks_per_threads[t].end(), &bsck->get_task()) ==
     737         256 :                             tasks_per_threads[t].end())
     738             :                         {
     739             :                             // check the position of the bounded socket in the orphans
     740          26 :                             size_t pos = 0;
     741         117 :                             for (; pos < out_sck_orphans.size(); pos++)
     742         117 :                                 if (std::get<0>(out_sck_orphans[pos]) == bsck) break;
     743             : 
     744          26 :                             if (pos < out_sck_orphans.size())
     745             :                             {
     746          26 :                                 auto sck_out = std::get<0>(out_sck_orphans[pos]);
     747          26 :                                 auto sck_in = sck.get();
     748          52 :                                 auto unbind_sout_pos = std::distance(sck_out->get_bound_sockets().begin(),
     749          26 :                                                                      std::find(sck_out->get_bound_sockets().begin(),
     750          26 :                                                                                sck_out->get_bound_sockets().end(),
     751          26 :                                                                                sck_in));
     752          26 :                                 this->sck_orphan_binds.push_back(
     753          26 :                                   std::make_pair(std::make_tuple(std::get<0>(out_sck_orphans[pos]),
     754          26 :                                                                  std::get<2>(out_sck_orphans[pos]),
     755          26 :                                                                  std::get<3>(out_sck_orphans[pos]),
     756          26 :                                                                  std::get<4>(out_sck_orphans[pos]),
     757             :                                                                  unbind_sout_pos),
     758          52 :                                                  std::make_tuple(nullptr, sta + 1, tsk_id, sck_id, tsk)));
     759             :                             }
     760             :                         }
     761             :                     }
     762         128 :                 }
     763             :                 // ----------------------------------------------------------------------------------------------------
     764             :             }
     765             :         }
     766         202 :     }
     767             : 
     768             :     // ----------------------------------------------------------------------------------------------------------------
     769             :     // ----------------------------------------------------------------------------------------------- prints for debug
     770             :     // ----------------------------------------------------------------------------------------------------------------
     771             :     // std::cout << "Orphan output sockets list:" << std::endl;
     772             :     // for (auto &sck : out_sck_orphans)
     773             :     // {
     774             :     //  auto sck_out_name = std::get<0>(sck)->get_name();
     775             :     //  auto tsk_out_name = std::get<0>(sck)->get_task().get_name();
     776             :     //  auto sck_out_occ  = std::get<1>(sck);
     777             :     //  auto tsk_out_sta  = std::get<2>(sck);
     778             :     //  auto tsk_out_id   = std::get<3>(sck);
     779             :     //  auto sck_out_id   = std::get<4>(sck);
     780             : 
     781             :     //  std::cout << "  " << tsk_out_name << "[" << sck_out_name << "] (stage " << tsk_out_sta << ", " << sck_out_occ
     782             :     //            << " occurrences, tsk id = " << tsk_out_id << ", sck id = " << sck_out_id << ")" << std::endl;
     783             :     // }
     784             : 
     785             :     // std::cout << std::endl << "Detected socket binds:" << std::endl;
     786             :     // for (auto &bind : this->sck_orphan_binds)
     787             :     // {
     788             :     //  auto sck_out_name = std::get<0>(bind.first)->get_name();
     789             :     //  auto tsk_out_name = std::get<0>(bind.first)->get_task().get_name();
     790             :     //  auto tsk_out_sta  = std::get<1>(bind.first);
     791             :     //  auto tsk_out_id   = std::get<2>(bind.first);
     792             :     //  auto sck_out_id   = std::get<3>(bind.first);
     793             :     //  auto sck_out_ubp  = std::get<4>(bind.first);
     794             : 
     795             :     //  auto sck_in_name = std::get<0>(bind.second)->get_name();
     796             :     //  auto tsk_in_name = std::get<0>(bind.second)->get_task().get_name();
     797             :     //  auto tsk_in_sta  = std::get<1>(bind.second);
     798             :     //  auto tsk_in_id   = std::get<2>(bind.second);
     799             :     //  auto sck_in_id   = std::get<3>(bind.second);
     800             : 
     801             :     //  std::cout << "  " << tsk_out_name << "[" << sck_out_name << "] (stage " << tsk_out_sta << ", tsk id = "
     802             :     //                    << tsk_out_id << ", sck id = " << sck_out_id << ", ubp = " << sck_out_ubp << ")"  << " -> "
     803             :     //                    << tsk_in_name  << "[" << sck_in_name  << "] (stage " << tsk_in_sta  << ", tsk id = "
     804             :     //                    << tsk_in_id  << ", sck id = " << sck_in_id  << ")" << std::endl;
     805             :     // }
     806             : 
     807             :     // ----------------------------------------------------------------------------------------------------------------
     808             :     // ------------------------------------------------------------------------------------------------ create adaptors
     809             :     // ----------------------------------------------------------------------------------------------------------------
     810          80 :     auto sck_orphan_binds_cpy = this->sck_orphan_binds;
     811          80 :     module::Adaptor* adp = nullptr;
     812          80 :     std::map<runtime::Socket*, size_t> sck_to_adp_sck_id;
     813         362 :     for (size_t sta = 0; sta < this->stages.size(); sta++)
     814             :     {
     815         282 :         const auto n_threads = this->stages[sta]->get_n_threads();
     816         282 :         std::vector<std::vector<runtime::Task*>> tasks_per_threads = this->stages[sta]->get_tasks_per_threads();
     817             : 
     818             :         // ------------------------------------------------------------------------------------------------------------
     819             :         // ----------------------------------------------------------------------------------------------- pull adaptor
     820             :         // ------------------------------------------------------------------------------------------------------------
     821         282 :         if (sta > 0)
     822             :         {
     823         202 :             assert(adp != nullptr);
     824             :             //                               sck out addr      stage   tsk id  sck id  unbind_pos
     825             :             std::vector<std::pair<std::tuple<runtime::Socket*, size_t, size_t, size_t, size_t>,
     826             :                                   //                               sck in addr       stage   tsk id  sck id  tsk in addr
     827             :                                   std::tuple<runtime::Socket*, size_t, size_t, size_t, runtime::Task*>>>
     828         202 :               sck_orphan_binds_new;
     829             : 
     830         202 :             auto n_threads_prev_sta = this->stages[sta - 1]->get_n_threads();
     831        2300 :             for (size_t t = 0; t < n_threads; t++)
     832             :             {
     833        2098 :                 module::Adaptor* cur_adp = (t == 0) ? adp : adp->clone();
     834        6294 :                 for (auto& t : cur_adp->tasks)
     835        4196 :                     t->set_fast(true);
     836        2098 :                 if (t > 0)
     837             :                 {
     838        1896 :                     this->adaptors[sta - 1].second.push_back(std::unique_ptr<module::Adaptor>(cur_adp));
     839        1896 :                     cur_adp->set_custom_name((n_threads_prev_sta == 1 ? "Adp_1_to_n_" : "Adp_n_to_1_") +
     840        3792 :                                              std::to_string(sta - 1));
     841             :                 }
     842             : 
     843        2098 :                 auto task_pull = n_threads_prev_sta == 1 ? &(*cur_adp)[(int)module::adp::tsk::pull_n]
     844          88 :                                                          : &(*cur_adp)[(int)module::adp::tsk::pull_1];
     845             : 
     846        2098 :                 sck_orphan_binds_new.clear();
     847       12883 :                 for (auto& bind : sck_orphan_binds_cpy)
     848             :                 {
     849       10785 :                     auto tsk_out_sta = std::get<1>(bind.first);
     850       10785 :                     if (tsk_out_sta < sta)
     851             :                     {
     852        5856 :                         auto tsk_in_sta = std::get<1>(bind.second);
     853        5856 :                         if (tsk_in_sta == sta)
     854             :                         {
     855        3131 :                             auto sck_out_ptr = std::get<0>(bind.first);
     856        3131 :                             auto priority = std::get<4>(bind.first);
     857        3131 :                             auto tsk_in_id = std::get<2>(bind.second);
     858        3131 :                             auto sck_in_id = std::get<3>(bind.second);
     859        3131 :                             runtime::Socket* sck_in = nullptr;
     860        3131 :                             runtime::Task* tsk_in = nullptr;
     861        3131 :                             if (std::get<0>(bind.second) != nullptr) // if socket to socket binding
     862        2536 :                                 sck_in = tasks_per_threads[t][tsk_in_id]->sockets[sck_in_id].get();
     863             :                             else // if socket to task binding
     864         595 :                                 tsk_in = tasks_per_threads[t][tsk_in_id];
     865        3131 :                             this->adaptors_binds.push_back(std::make_tuple(
     866        6262 :                               task_pull->sockets[sck_to_adp_sck_id[sck_out_ptr]].get(), sck_in, priority, tsk_in));
     867             :                         }
     868             :                         else
     869        2725 :                             sck_orphan_binds_new.push_back(bind);
     870             :                     }
     871             :                     else
     872        4929 :                         sck_orphan_binds_new.push_back(bind);
     873             :                 }
     874             : 
     875        2098 :                 if (t > 0) this->stages[sta]->all_modules[t].push_back(cur_adp);
     876             :             }
     877         202 :             this->saved_firsts_tasks_id[sta] = this->stages[sta]->firsts_tasks_id;
     878         202 :             sck_orphan_binds_cpy = sck_orphan_binds_new;
     879         202 :         }
     880             : 
     881             :         // ------------------------------------------------------------------------------------------------------------
     882             :         // ----------------------------------------------------------------------------------------------- push adaptor
     883             :         // ------------------------------------------------------------------------------------------------------------
     884         282 :         std::map<runtime::Socket*, size_t> sck_to_adp_sck_id_new;
     885         282 :         if (sta < this->stages.size() - 1)
     886             :         {
     887         202 :             std::vector<size_t> adp_n_elmts;
     888         202 :             std::vector<std::type_index> adp_datatype;
     889         202 :             size_t adp_buffer_size = synchro_buffer_sizes.size() ? synchro_buffer_sizes[sta] : 1;
     890         202 :             bool adp_active_waiting = synchro_active_waiting.size() ? synchro_active_waiting[sta] : false;
     891         202 :             size_t adp_n_frames = 1;
     892             : 
     893             :             // a map to remember if a passed socket points already to the same memory space
     894         202 :             std::map<void*, size_t> fwd_source;
     895             : 
     896         202 :             std::vector<runtime::Socket*> passed_scks_out;
     897         909 :             for (auto& bind : sck_orphan_binds_cpy)
     898             :             {
     899         707 :                 auto tsk_out_sta = std::get<1>(bind.first);
     900         707 :                 if (tsk_out_sta <= sta)
     901             :                 {
     902         458 :                     auto sck_out = std::get<0>(bind.first);
     903         458 :                     if (std::find(passed_scks_out.begin(), passed_scks_out.end(), sck_out) == passed_scks_out.end())
     904             :                     {
     905             :                         // avoid the creation of new adaptor sockets for forward sockets pointing to the same memory
     906             :                         // space
     907         406 :                         auto sck_out_dptr = (void*)sck_out->get_dataptr<int8_t>();
     908         406 :                         assert(sck_out_dptr != nullptr);
     909         406 :                         if (fwd_source.find(sck_out_dptr) == fwd_source.end())
     910             :                         {
     911         349 :                             fwd_source[sck_out_dptr] = 1;
     912         349 :                             adp_n_frames = sck_out->get_task().get_module().get_n_frames();
     913         349 :                             adp_n_elmts.push_back(sck_out->get_n_elmts() / adp_n_frames);
     914         349 :                             adp_datatype.push_back(sck_out->get_datatype());
     915             :                         }
     916         406 :                         passed_scks_out.push_back(sck_out);
     917             :                     }
     918             :                 }
     919             :             }
     920         202 :             passed_scks_out.clear();
     921             : 
     922             :             // allocate the adaptor for the first thread
     923         202 :             if (n_threads == 1)
     924         114 :                 adp = new module::Adaptor_1_to_n(adp_n_elmts, adp_datatype, adp_buffer_size, adp_active_waiting);
     925             :             else
     926          88 :                 adp = new module::Adaptor_n_to_1(adp_n_elmts, adp_datatype, adp_buffer_size, adp_active_waiting);
     927         202 :             adp->set_n_frames(adp_n_frames);
     928             : 
     929        2300 :             for (size_t t = 0; t < n_threads; t++)
     930             :             {
     931        2098 :                 module::Adaptor* cur_adp = (t == 0) ? adp : adp->clone();
     932        2098 :                 cur_adp->set_custom_name((n_threads == 1 ? "Adp_1_to_n_" : "Adp_n_to_1_") + std::to_string(sta));
     933        2098 :                 this->adaptors[sta].first.push_back(std::unique_ptr<module::Adaptor>(cur_adp));
     934        2098 :                 auto task_push = n_threads == 1 ? &(*cur_adp)[(int)module::adp::tsk::push_1]
     935        1984 :                                                 : &(*cur_adp)[(int)module::adp::tsk::push_n];
     936             : 
     937        2098 :                 std::map<void*, size_t> fwd_source;
     938        2098 :                 sck_to_adp_sck_id_new.clear();
     939        2098 :                 size_t adp_sck_id = 0;
     940       10083 :                 for (auto& bind : sck_orphan_binds_cpy) // bind standard task to last adaptor
     941             :                 {
     942        7985 :                     auto tsk_out_sta = std::get<1>(bind.first);
     943             : 
     944        7985 :                     if (tsk_out_sta <= sta)
     945             :                     {
     946        6836 :                         auto sck_out_ptr = std::get<0>(bind.first);
     947        6836 :                         auto sck_out_dptr = (void*)sck_out_ptr->get_dataptr<int8_t>();
     948        6836 :                         assert(sck_out_dptr != nullptr);
     949             : 
     950        6836 :                         if (std::find(passed_scks_out.begin(),
     951             :                                       passed_scks_out.end(),
     952       19707 :                                       sck_out_ptr) == passed_scks_out.end() &&
     953       12871 :                             fwd_source.find(sck_out_dptr) == fwd_source.end()) // <= the latest condition is here to
     954             :                                                                                //    avoid to bind adaptor sockets two
     955             :                                                                                //    times the same memory space
     956             :                                                                                //    (usefull in the case of multiple
     957             :                                                                                //    fwd sockets pointing to the same
     958             :                                                                                //    memory address)
     959             :                         {
     960        4838 :                             if (tsk_out_sta == sta)
     961             :                             {
     962        3283 :                                 auto tsk_out_id = std::get<2>(bind.first);
     963        3283 :                                 auto sck_out_id = std::get<3>(bind.first);
     964        3283 :                                 auto sck_out = tasks_per_threads[t][tsk_out_id]->sockets[sck_out_id];
     965        3283 :                                 auto priority = std::get<4>(bind.first);
     966        3283 :                                 sck_to_adp_sck_id_new[sck_out_ptr] = adp_sck_id;
     967        3283 :                                 this->adaptors_binds.push_back(
     968        3283 :                                   std::make_tuple(sck_out.get(),
     969        3283 :                                                   task_push->sockets[adp_sck_id++].get(),
     970             :                                                   priority,
     971        3283 :                                                   nullptr)); // <= only socket to socket binding is possible here
     972        3283 :                             }
     973             :                             else // if (tsk_out_sta < sta) // bind prev. adaptor to last adaptor
     974             :                             {
     975        1555 :                                 auto n_threads_prev_sta = this->stages[sta - 1]->get_n_threads();
     976        1555 :                                 auto tsk_out_id = (n_threads_prev_sta == 1) ? (size_t)module::adp::tsk::pull_n
     977             :                                                                             : (size_t)module::adp::tsk::pull_1;
     978        1555 :                                 auto sck_out_id = sck_to_adp_sck_id[sck_out_ptr];
     979        1555 :                                 sck_to_adp_sck_id_new[sck_out_ptr] = adp_sck_id;
     980             :                                 auto adp_prev =
     981        1555 :                                   t == 0 ? this->adaptors[sta - 1].first[0] : this->adaptors[sta - 1].second[t - 1];
     982        1555 :                                 auto sck_out = (*adp_prev)[tsk_out_id].sockets[sck_out_id];
     983        1555 :                                 auto priority = std::get<4>(bind.first);
     984        1555 :                                 this->adaptors_binds.push_back(
     985        1555 :                                   std::make_tuple(sck_out.get(),
     986        1555 :                                                   task_push->sockets[adp_sck_id++].get(),
     987             :                                                   priority,
     988        1555 :                                                   nullptr)); // <= only socket to socket binding is possible here
     989        1555 :                             }
     990             : 
     991        4838 :                             fwd_source[sck_out_dptr] = 1; // remember that this memory space has been connected to the
     992             :                                                           // adaptor once
     993        4838 :                             passed_scks_out.push_back(sck_out_ptr);
     994             :                         }
     995             :                     }
     996             :                 }
     997        2098 :                 passed_scks_out.clear();
     998        2098 :             }
     999         202 :             this->saved_lasts_tasks_id[sta] = this->stages[sta]->lasts_tasks_id;
    1000         202 :         }
    1001         282 :         sck_to_adp_sck_id = sck_to_adp_sck_id_new;
    1002         282 :     }
    1003          80 : }
    1004             : 
    1005             : void
    1006         240 : Pipeline::bind_adaptors()
    1007             : {
    1008         240 :     this->_bind_adaptors(true);
    1009         240 : }
    1010             : 
    1011             : void
    1012         240 : Pipeline::_bind_adaptors(const bool bind_adaptors)
    1013             : {
    1014         240 :     if (!this->bound_adaptors)
    1015             :     {
    1016        1086 :         for (size_t sta = 0; sta < this->stages.size(); sta++)
    1017             :         {
    1018         846 :             const auto n_threads = this->stages[sta]->get_n_threads();
    1019             : 
    1020             :             // --------------------------------------------------------------------------------------------------------
    1021             :             // ------------------------------------------------------------------------------------------- pull adaptor
    1022             :             // --------------------------------------------------------------------------------------------------------
    1023         846 :             if (sta > 0)
    1024             :             {
    1025         606 :                 auto n_threads_prev_sta = this->stages[sta - 1]->get_n_threads();
    1026        6900 :                 for (size_t t = 0; t < n_threads; t++)
    1027             :                 {
    1028             :                     module::Adaptor* cur_adp =
    1029        6294 :                       t > 0 ? adaptors[sta - 1].second[t - 1].get() : adaptors[sta - 1].first[0].get();
    1030             : 
    1031        6294 :                     if (t > 0 || sta == this->stages.size() - 1) // add the adaptor to the current stage
    1032        5916 :                         this->stages[sta]->all_modules[t].push_back(cur_adp);
    1033             : 
    1034        6294 :                     auto task_pull = n_threads_prev_sta == 1 ? &(*cur_adp)[(int)module::adp::tsk::pull_n]
    1035         264 :                                                              : &(*cur_adp)[(int)module::adp::tsk::pull_1];
    1036             : 
    1037        6294 :                     auto ss = this->stages[sta]->sequences[t]->get_contents();
    1038        6294 :                     assert(ss != nullptr);
    1039        6294 :                     ss->tasks.insert(ss->tasks.begin(), task_pull);
    1040        6294 :                     ss->processes.insert(ss->processes.begin(),
    1041           0 :                                          [task_pull]() -> const int*
    1042             :                                          {
    1043           0 :                                              task_pull->exec();
    1044           0 :                                              const int* status = task_pull->sockets.back()->get_dataptr<const int>();
    1045           0 :                                              return status;
    1046             :                                          });
    1047        6294 :                     this->stages[sta]->update_tasks_id(t);
    1048             :                 }
    1049         606 :                 this->stages[sta]->firsts_tasks_id.clear();
    1050         606 :                 this->stages[sta]->firsts_tasks_id.push_back(0);
    1051         606 :                 this->stages[sta]->n_tasks++;
    1052             :             }
    1053             : 
    1054             :             // --------------------------------------------------------------------------------------------------------
    1055             :             // ------------------------------------------------------------------------------------------- push adaptor
    1056             :             // --------------------------------------------------------------------------------------------------------
    1057         846 :             if (sta < this->stages.size() - 1)
    1058             :             {
    1059         606 :                 size_t last_task_id = 0;
    1060        6900 :                 for (size_t t = 0; t < n_threads; t++)
    1061             :                 {
    1062        6294 :                     module::Adaptor* cur_adp = adaptors[sta].first[t].get();
    1063             : 
    1064             :                     // add the adaptor to the current stage
    1065        6294 :                     this->stages[sta]->all_modules[t].push_back(cur_adp);
    1066             : 
    1067        6294 :                     auto task_push = n_threads == 1 ? &(*cur_adp)[(int)module::adp::tsk::push_1]
    1068        5952 :                                                     : &(*cur_adp)[(int)module::adp::tsk::push_n];
    1069             : 
    1070        6294 :                     auto ss = this->stages[sta]->get_last_subsequence(t);
    1071        6294 :                     assert(ss != nullptr);
    1072        6294 :                     ss->tasks.push_back(task_push);
    1073        6294 :                     ss->processes.push_back(
    1074           0 :                       [task_push]() -> const int*
    1075             :                       {
    1076           0 :                           task_push->exec();
    1077           0 :                           const int* status = task_push->sockets.back()->get_dataptr<const int>();
    1078           0 :                           return status;
    1079             :                       });
    1080        6294 :                     last_task_id = ss->tasks_id[ss->tasks_id.size() - 1] + 1;
    1081        6294 :                     ss->tasks_id.push_back(last_task_id);
    1082             :                 }
    1083         606 :                 this->stages[sta]->lasts_tasks_id.clear();
    1084         606 :                 this->stages[sta]->lasts_tasks_id.push_back(last_task_id);
    1085         606 :                 this->stages[sta]->n_tasks++;
    1086             :             }
    1087         846 :             this->stages[sta]->update_firsts_and_lasts_tasks();
    1088             :         }
    1089             : 
    1090             :         // ------------------------------------------------------------------------------------------------------------
    1091             :         // ---------------------------------------------------------------------------------------------- bind adaptors
    1092             :         // ------------------------------------------------------------------------------------------------------------
    1093        1233 :         for (auto& bind : this->sck_orphan_binds)
    1094             :         {
    1095         993 :             auto sck_out = std::get<0>(bind.first);
    1096         993 :             auto sck_in = std::get<0>(bind.second);
    1097         993 :             if (sck_in != nullptr) // if socket to socket unbinding
    1098         915 :                 sck_in->unbind(*sck_out);
    1099             :             else // if socket to task unbinding
    1100             :             {
    1101          78 :                 auto tsk_in = std::get<4>(bind.second);
    1102          78 :                 assert(tsk_in != nullptr);
    1103          78 :                 tsk_in->unbind(*sck_out);
    1104             :             }
    1105             :         }
    1106             : 
    1107         240 :         if (bind_adaptors)
    1108             :         {
    1109       24147 :             for (auto& bind : this->adaptors_binds)
    1110             :             {
    1111       23907 :                 auto sck_out = std::get<0>(bind);
    1112       23907 :                 auto sck_in = std::get<1>(bind);
    1113       23907 :                 auto priority = std::get<2>(bind);
    1114       23907 :                 if (sck_in != nullptr) // if socket to socket binding
    1115       22122 :                     sck_in->_bind(*sck_out, priority);
    1116             :                 else // if socket to task binding
    1117             :                 {
    1118        1785 :                     auto tsk_in = std::get<3>(bind);
    1119        1785 :                     assert(tsk_in != nullptr);
    1120        1785 :                     tsk_in->_bind(*sck_out, priority);
    1121             :                 }
    1122             :             }
    1123             :         }
    1124             : 
    1125         240 :         this->bound_adaptors = true;
    1126             :     }
    1127         240 : }
    1128             : 
    1129             : void
    1130         160 : Pipeline::unbind_adaptors()
    1131             : {
    1132         160 :     this->_unbind_adaptors(true);
    1133         160 : }
    1134             : 
    1135             : void
    1136         320 : Pipeline::_unbind_adaptors(const bool bind_orphans)
    1137             : {
    1138         320 :     if (this->bound_adaptors)
    1139             :     {
    1140        1086 :         for (size_t sta = 0; sta < this->stages.size(); sta++)
    1141             :         {
    1142         846 :             const auto n_threads = this->stages[sta]->get_n_threads();
    1143             : 
    1144             :             // --------------------------------------------------------------------------------------------------------
    1145             :             // ------------------------------------------------------------------------------------------- pull adaptor
    1146             :             // --------------------------------------------------------------------------------------------------------
    1147         846 :             if (sta > 0)
    1148             :             {
    1149        6900 :                 for (size_t t = 0; t < n_threads; t++)
    1150             :                 {
    1151        6294 :                     if (t > 0 || sta == this->stages.size() - 1) // rm the adaptor to the current stage
    1152        5916 :                         this->stages[sta]->all_modules[t].pop_back();
    1153             : 
    1154        6294 :                     auto ss = this->stages[sta]->sequences[t]->get_contents();
    1155        6294 :                     assert(ss != nullptr);
    1156        6294 :                     ss->tasks.erase(ss->tasks.begin());
    1157        6294 :                     ss->processes.erase(ss->processes.begin());
    1158        6294 :                     this->stages[sta]->update_tasks_id(t);
    1159             :                 }
    1160         606 :                 this->stages[sta]->firsts_tasks_id = this->saved_firsts_tasks_id[sta];
    1161         606 :                 this->stages[sta]->n_tasks--;
    1162             :             }
    1163             : 
    1164             :             // --------------------------------------------------------------------------------------------------------
    1165             :             // ------------------------------------------------------------------------------------------- push adaptor
    1166             :             // --------------------------------------------------------------------------------------------------------
    1167         846 :             if (sta < this->stages.size() - 1)
    1168             :             {
    1169        6900 :                 for (size_t t = 0; t < n_threads; t++)
    1170             :                 {
    1171             :                     // rm the adaptor to the current stage
    1172        6294 :                     this->stages[sta]->all_modules[t].pop_back();
    1173             : 
    1174        6294 :                     auto ss = this->stages[sta]->get_last_subsequence(t);
    1175        6294 :                     assert(ss != nullptr);
    1176        6294 :                     ss->tasks.pop_back();
    1177        6294 :                     ss->processes.pop_back();
    1178        6294 :                     ss->tasks_id.pop_back();
    1179             :                 }
    1180         606 :                 this->stages[sta]->lasts_tasks_id = this->saved_lasts_tasks_id[sta];
    1181         606 :                 this->stages[sta]->n_tasks--;
    1182             :             }
    1183         846 :             this->stages[sta]->update_firsts_and_lasts_tasks();
    1184             :         }
    1185             : 
    1186             :         // ------------------------------------------------------------------------------------------------------------
    1187             :         // -------------------------------------------------------------------------------------------- unbind adaptors
    1188             :         // ------------------------------------------------------------------------------------------------------------
    1189       24147 :         for (auto& bind : this->adaptors_binds)
    1190             :         {
    1191       23907 :             auto sck_out = std::get<0>(bind);
    1192       23907 :             auto sck_in = std::get<1>(bind);
    1193       23907 :             if (sck_in != nullptr) // if socket to socket unbinding
    1194       22122 :                 sck_in->unbind(*sck_out);
    1195             :             else // if socket to task unbinding
    1196             :             {
    1197        1785 :                 auto tsk_in = std::get<3>(bind);
    1198        1785 :                 assert(tsk_in != nullptr);
    1199        1785 :                 tsk_in->unbind(*sck_out);
    1200             :             }
    1201             :         }
    1202             : 
    1203         240 :         if (bind_orphans)
    1204             :         {
    1205         411 :             for (auto& bind : this->sck_orphan_binds)
    1206             :             {
    1207         331 :                 auto sck_out = std::get<0>(bind.first);
    1208         331 :                 auto priority = std::get<4>(bind.first);
    1209         331 :                 auto sck_in = std::get<0>(bind.second);
    1210         331 :                 if (sck_in != nullptr) // if socket to socket binding
    1211         305 :                     sck_in->_bind(*sck_out, priority);
    1212             :                 else // if socket to task binding
    1213             :                 {
    1214          26 :                     auto tsk_in = std::get<4>(bind.second);
    1215          26 :                     assert(tsk_in != nullptr);
    1216          26 :                     tsk_in->_bind(*sck_out, priority);
    1217             :                 }
    1218             :             }
    1219             :         }
    1220             : 
    1221         240 :         this->bound_adaptors = false;
    1222             :     }
    1223         320 : }
    1224             : 
    1225             : void
    1226           0 : Pipeline::exec(const std::vector<std::function<bool(const std::vector<const int*>&)>>& stop_conditions)
    1227             : {
    1228           0 :     if (stop_conditions.size() != this->stages.size() && stop_conditions.size() != 1)
    1229             :     {
    1230           0 :         std::stringstream message;
    1231           0 :         message << "'stop_conditions.size()' has to be equal to 'stages.size()' or to 1 ('stop_conditions.size()' = "
    1232           0 :                 << stop_conditions.size() << ", 'stages.size()' = " << stages.size() << ").";
    1233           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
    1234           0 :     }
    1235             : 
    1236           0 :     if (!this->bound_adaptors)
    1237             :     {
    1238           0 :         std::stringstream message;
    1239           0 :         message << "'bound_adaptors' has to be true to execute the pipeline.";
    1240           0 :         throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
    1241           0 :     }
    1242             : 
    1243           0 :     auto& stages = this->stages;
    1244           0 :     std::vector<std::thread> threads;
    1245           0 :     for (size_t s = 0; s < stages.size() - 1; s++)
    1246             :     {
    1247           0 :         const std::function<bool(const std::vector<const int*>&)>* stop_condition = nullptr;
    1248           0 :         if (stop_conditions.size() == this->stages.size()) stop_condition = &stop_conditions[s];
    1249             : 
    1250           0 :         threads.push_back(std::thread(
    1251           0 :           [&stages, s, stop_condition]()
    1252             :           {
    1253           0 :               if (stop_condition)
    1254           0 :                   stages[s]->exec(*stop_condition);
    1255             :               else
    1256           0 :                   stages[s]->exec();
    1257             : 
    1258             :               // send the signal to stop the next stage
    1259           0 :               const auto& tasks = stages[s + 1]->get_tasks_per_threads();
    1260           0 :               for (size_t th = 0; th < tasks.size(); th++)
    1261           0 :                   for (size_t ta = 0; ta < tasks[th].size(); ta++)
    1262             :                   {
    1263           0 :                       auto m = dynamic_cast<module::Adaptor*>(&tasks[th][ta]->get_module());
    1264           0 :                       if (m != nullptr)
    1265           0 :                           if (tasks[th][ta]->get_name() == "pull_n" || tasks[th][ta]->get_name() == "pull_1")
    1266           0 :                               m->cancel_waiting();
    1267             :                   }
    1268           0 :           }));
    1269             :     }
    1270           0 :     stages[stages.size() - 1]->exec(stop_conditions[stop_conditions.size() - 1]);
    1271             :     // stop all the stages before
    1272           0 :     for (size_t notify_s = 0; notify_s < stages.size() - 1; notify_s++)
    1273           0 :         for (auto& m : stages[notify_s]->get_modules<tools::Interface_waiting>())
    1274           0 :             m->cancel_waiting();
    1275             : 
    1276           0 :     for (auto& t : threads)
    1277           0 :         t.join();
    1278             : 
    1279             :     // this is NOT made in the tools::Sequence::exec() to correctly flush the pipeline before restoring buffers
    1280             :     // initial configuration
    1281           0 :     for (auto& stage : this->stages)
    1282           0 :         if (stage->is_no_copy_mode())
    1283             :         {
    1284           0 :             stage->reset_no_copy_mode();
    1285           0 :             stage->gen_processes(false);
    1286             :         }
    1287             : 
    1288           0 :     for (auto& padps : this->adaptors)
    1289             :     {
    1290           0 :         for (auto& adp : padps.first)
    1291           0 :             adp->reset();
    1292           0 :         for (auto& adp : padps.second)
    1293           0 :             adp->reset();
    1294             :     }
    1295           0 : }
    1296             : 
    1297             : void
    1298          80 : Pipeline::exec(const std::vector<std::function<bool()>>& stop_conditions)
    1299             : {
    1300          80 :     if (stop_conditions.size() != this->stages.size() && stop_conditions.size() != 1)
    1301             :     {
    1302           0 :         std::stringstream message;
    1303           0 :         message << "'stop_conditions.size()' has to be equal to 'stages.size()' or to 1 ('stop_conditions.size()' = "
    1304           0 :                 << stop_conditions.size() << ", 'stages.size()' = " << stages.size() << ").";
    1305           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
    1306           0 :     }
    1307             : 
    1308          80 :     if (!this->bound_adaptors)
    1309             :     {
    1310           0 :         std::stringstream message;
    1311           0 :         message << "'bound_adaptors' has to be true to execute the pipeline.";
    1312           0 :         throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
    1313           0 :     }
    1314             : 
    1315          80 :     auto& stages = this->stages;
    1316          80 :     std::vector<std::thread> threads;
    1317         282 :     for (size_t s = 0; s < stages.size() - 1; s++)
    1318             :     {
    1319         202 :         const std::function<bool()>* stop_condition = nullptr;
    1320         202 :         if (stop_conditions.size() == this->stages.size()) stop_condition = &stop_conditions[s];
    1321             : 
    1322         202 :         threads.push_back(std::thread(
    1323         599 :           [&stages, s, stop_condition]()
    1324             :           {
    1325         202 :               if (stop_condition)
    1326           0 :                   stages[s]->exec(*stop_condition);
    1327             :               else
    1328         202 :                   stages[s]->exec();
    1329             : 
    1330             :               // send the signal to stop the next stage
    1331         195 :               const auto& tasks = stages[s + 1]->get_tasks_per_threads();
    1332        2296 :               for (size_t th = 0; th < tasks.size(); th++)
    1333       15591 :                   for (size_t ta = 0; ta < tasks[th].size(); ta++)
    1334             :                   {
    1335       13483 :                       auto m = dynamic_cast<module::Adaptor*>(&tasks[th][ta]->get_module());
    1336       13485 :                       if (m != nullptr)
    1337        4115 :                           if (tasks[th][ta]->get_name() == "pull_n" || tasks[th][ta]->get_name() == "pull_1")
    1338        2098 :                               m->cancel_waiting();
    1339             :                   }
    1340         201 :           }));
    1341             :     }
    1342          80 :     stages[stages.size() - 1]->exec(stop_conditions[stop_conditions.size() - 1]);
    1343             :     // stop all the stages before
    1344         282 :     for (size_t notify_s = 0; notify_s < stages.size() - 1; notify_s++)
    1345        6092 :         for (auto& m : stages[notify_s]->get_modules<tools::Interface_waiting>())
    1346        6092 :             m->cancel_waiting();
    1347             : 
    1348         282 :     for (auto& t : threads)
    1349         202 :         t.join();
    1350             : 
    1351             :     // this is NOT made in the tools::Sequence::exec() to correctly flush the pipeline before restoring buffers
    1352             :     // initial configuration
    1353         362 :     for (auto& stage : this->stages)
    1354         282 :         if (stage->is_no_copy_mode())
    1355             :         {
    1356         282 :             stage->reset_no_copy_mode();
    1357         282 :             stage->gen_processes(false);
    1358             :         }
    1359             : 
    1360         282 :     for (auto& padps : this->adaptors)
    1361             :     {
    1362        2300 :         for (auto& adp : padps.first)
    1363        2098 :             adp->reset();
    1364        2098 :         for (auto& adp : padps.second)
    1365        1896 :             adp->reset();
    1366             :     }
    1367          80 : }
    1368             : 
    1369             : void
    1370           0 : Pipeline::exec(std::function<bool(const std::vector<const int*>&)> stop_condition)
    1371             : {
    1372           0 :     this->exec(std::vector<std::function<bool(const std::vector<const int*>&)>>(1, stop_condition));
    1373           0 : }
    1374             : 
    1375             : void
    1376          80 : Pipeline::exec(std::function<bool()> stop_condition)
    1377             : {
    1378          80 :     this->exec(std::vector<std::function<bool()>>(1, stop_condition));
    1379          80 : }
    1380             : 
    1381             : void
    1382          11 : Pipeline::exec()
    1383             : {
    1384        6873 :     this->exec([]() { return false; });
    1385          11 : }
    1386             : 
    1387             : std::vector<std::vector<module::Module*>>
    1388           0 : Pipeline::get_modules_per_threads() const
    1389             : {
    1390           0 :     std::vector<std::vector<module::Module*>> modules_per_threads;
    1391           0 :     for (auto& stage : this->stages)
    1392             :     {
    1393           0 :         auto modules_per_threads_add = stage->get_modules_per_threads();
    1394           0 :         if (modules_per_threads_add.size() > modules_per_threads.size())
    1395           0 :             modules_per_threads.resize(modules_per_threads_add.size());
    1396             : 
    1397           0 :         for (size_t t = 0; t < modules_per_threads_add.size(); t++)
    1398           0 :             modules_per_threads[t].insert(
    1399           0 :               modules_per_threads[t].end(), modules_per_threads_add[t].begin(), modules_per_threads_add[t].end());
    1400           0 :     }
    1401           0 :     return modules_per_threads;
    1402           0 : }
    1403             : 
    1404             : std::vector<std::vector<module::Module*>>
    1405           0 : Pipeline::get_modules_per_types() const
    1406             : {
    1407           0 :     std::vector<std::vector<module::Module*>> modules_per_types;
    1408           0 :     for (auto& stage : this->stages)
    1409             :     {
    1410           0 :         auto modules_per_types_add = stage->get_modules_per_types();
    1411           0 :         modules_per_types.insert(modules_per_types.end(), modules_per_types_add.begin(), modules_per_types_add.end());
    1412           0 :     }
    1413           0 :     return modules_per_types;
    1414           0 : }
    1415             : 
    1416             : std::vector<std::vector<module::Module*>>
    1417           0 : Pipeline::get_original_modules() const
    1418             : {
    1419           0 :     return this->original_sequence.get_modules_per_types();
    1420             : }
    1421             : 
    1422             : std::vector<std::vector<runtime::Task*>>
    1423          80 : Pipeline::get_tasks_per_threads() const
    1424             : {
    1425          80 :     std::vector<std::vector<runtime::Task*>> tasks_per_threads;
    1426         362 :     for (auto& stage : this->stages)
    1427             :     {
    1428         282 :         auto tasks_per_threads_add = stage->get_tasks_per_threads();
    1429         282 :         if (tasks_per_threads_add.size() > tasks_per_threads.size())
    1430         147 :             tasks_per_threads.resize(tasks_per_threads_add.size());
    1431             : 
    1432        2471 :         for (size_t t = 0; t < tasks_per_threads_add.size(); t++)
    1433        8756 :             tasks_per_threads[t].insert(
    1434        8756 :               tasks_per_threads[t].end(), tasks_per_threads_add[t].begin(), tasks_per_threads_add[t].end());
    1435         282 :     }
    1436          80 :     return tasks_per_threads;
    1437           0 : }
    1438             : 
    1439             : std::vector<std::vector<runtime::Task*>>
    1440           0 : Pipeline::get_tasks_per_types() const
    1441             : {
    1442           0 :     std::vector<std::vector<runtime::Task*>> tasks_per_types;
    1443           0 :     for (auto& stage : this->stages)
    1444             :     {
    1445           0 :         auto tasks_per_types_add = stage->get_tasks_per_types();
    1446           0 :         tasks_per_types.insert(tasks_per_types.end(), tasks_per_types_add.begin(), tasks_per_types_add.end());
    1447           0 :     }
    1448           0 :     return tasks_per_types;
    1449           0 : }
    1450             : 
    1451             : void
    1452           5 : Pipeline::export_dot(std::ostream& stream) const
    1453             : {
    1454             :     std::function<void(tools::Digraph_node<Sub_sequence>*,
    1455             :                        const size_t,
    1456             :                        const std::string&,
    1457             :                        std::ostream&,
    1458             :                        std::vector<tools::Digraph_node<Sub_sequence>*>&)>
    1459          22 :       export_dot_subsequences_recursive = [&export_dot_subsequences_recursive,
    1460             :                                            this](tools::Digraph_node<Sub_sequence>* cur_node,
    1461             :                                                  const size_t sta,
    1462             :                                                  const std::string& tab,
    1463             :                                                  std::ostream& stream,
    1464          22 :                                                  std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
    1465             :     {
    1466          44 :         if (cur_node != nullptr &&
    1467          44 :             std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
    1468             :         {
    1469          22 :             already_parsed_nodes.push_back(cur_node);
    1470          44 :             this->stages[sta]->export_dot_subsequence(cur_node->get_c()->tasks,
    1471          22 :                                                       cur_node->get_c()->tasks_id,
    1472          22 :                                                       cur_node->get_c()->type,
    1473          44 :                                                       "Sub-sequence " + std::to_string(cur_node->get_c()->id) +
    1474          88 :                                                         " (depth = " + std::to_string(cur_node->get_depth()) + ")",
    1475             :                                                       tab,
    1476             :                                                       stream);
    1477             : 
    1478          22 :             for (auto c : cur_node->get_children())
    1479           0 :                 export_dot_subsequences_recursive(c, sta, tab, stream, already_parsed_nodes);
    1480             :         }
    1481          27 :     };
    1482             : 
    1483             :     std::function<void(tools::Digraph_node<Sub_sequence>*,
    1484             :                        const size_t,
    1485             :                        const std::string&,
    1486             :                        std::ostream&,
    1487             :                        std::vector<tools::Digraph_node<Sub_sequence>*>&)>
    1488             :       export_dot_connections_recursive =
    1489          22 :         [&export_dot_connections_recursive, this](tools::Digraph_node<Sub_sequence>* cur_node,
    1490             :                                                   const size_t sta,
    1491             :                                                   const std::string& tab,
    1492             :                                                   std::ostream& stream,
    1493          22 :                                                   std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
    1494             :     {
    1495          44 :         if (cur_node != nullptr &&
    1496          44 :             std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
    1497             :         {
    1498          22 :             already_parsed_nodes.push_back(cur_node);
    1499          22 :             this->stages[sta]->export_dot_connections(cur_node->get_c()->tasks, tab, stream);
    1500             : 
    1501          22 :             for (auto c : cur_node->get_children())
    1502           0 :                 export_dot_connections_recursive(c, sta, tab, stream, already_parsed_nodes);
    1503             :         }
    1504          27 :     };
    1505             : 
    1506           5 :     std::string tab = "\t";
    1507           5 :     stream << "digraph Pipeline {" << std::endl;
    1508           5 :     stream << tab << "compound=true;" << std::endl;
    1509             : 
    1510          27 :     for (size_t sta = 0; sta < this->stages.size(); sta++)
    1511             :     {
    1512          22 :         std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
    1513          22 :         const auto n_threads = this->stages[sta]->get_n_threads();
    1514          22 :         stream << tab << "subgraph \"cluster_Stage " << sta << "\" {" << std::endl;
    1515          22 :         stream << tab << tab << "node [style=filled];" << std::endl;
    1516          22 :         export_dot_subsequences_recursive(this->stages[sta]->sequences[0], sta, tab, stream, already_parsed_nodes);
    1517          22 :         stream << tab << tab << "label=\"Pipeline stage " << sta << " (" << n_threads << " thread(s))\";" << std::endl;
    1518          22 :         std::string color = "blue";
    1519          22 :         stream << tab << tab << "color=" << color << ";" << std::endl;
    1520          22 :         stream << tab << "}" << std::endl;
    1521          22 :     }
    1522             : 
    1523          27 :     for (size_t sta = 0; sta < this->stages.size(); sta++)
    1524             :     {
    1525          22 :         std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
    1526          22 :         export_dot_connections_recursive(this->stages[sta]->sequences[0], sta, tab, stream, already_parsed_nodes);
    1527          22 :         if (this->bound_adaptors)
    1528             :         {
    1529          22 :             if (sta > 0)
    1530             :             {
    1531          17 :                 auto tsk1 = this->stages[sta - 1]->get_lasts_tasks()[0].back();
    1532          17 :                 auto tsk2 = this->stages[sta + 0]->get_firsts_tasks()[0][0];
    1533             : 
    1534          17 :                 auto sck1 = tsk1->sockets[0];
    1535          17 :                 auto sck2 = tsk2->sockets[0];
    1536             : 
    1537          17 :                 stream << tab << "\"" << +sck1.get() << "\" -> \"" << +sck2.get() << "\" [ltail=\"cluster_"
    1538          17 :                        << +&tsk1->get_module() << "_" << +tsk1 << "\" lhead=\"cluster_" << +&tsk2->get_module() << "_"
    1539          17 :                        << +tsk2 << "\" color=\"green\" style=\"dashed\"];" << std::endl;
    1540          17 :             }
    1541             :         }
    1542          22 :     }
    1543             : 
    1544           5 :     stream << "}" << std::endl;
    1545           5 : }
    1546             : 
    1547             : bool
    1548         160 : Pipeline::is_bound_adaptors() const
    1549             : {
    1550         160 :     return this->bound_adaptors;
    1551             : }
    1552             : 
    1553             : void
    1554           0 : Pipeline::set_auto_stop(const bool auto_stop)
    1555             : {
    1556           0 :     this->auto_stop = auto_stop;
    1557           0 :     for (auto stage : this->stages)
    1558           0 :         stage->set_auto_stop(auto_stop);
    1559           0 : }
    1560             : 
    1561             : bool
    1562           0 : Pipeline::is_auto_stop() const
    1563             : {
    1564           0 :     return this->auto_stop;
    1565             : }
    1566             : 
    1567             : size_t
    1568           0 : Pipeline::get_n_frames() const
    1569             : {
    1570           0 :     const auto n_frames = this->stages[0]->get_n_frames();
    1571             : 
    1572           0 :     for (auto& sta : this->stages)
    1573           0 :         if (sta->get_n_frames() != n_frames)
    1574             :         {
    1575           0 :             std::stringstream message;
    1576           0 :             message << "All the stages/sequences do not have the same 'n_frames' value ('sta->get_n_frames()' = "
    1577           0 :                     << sta->get_n_frames() << ", 'n_frames' = " << n_frames << ").";
    1578           0 :             throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
    1579           0 :         }
    1580             : 
    1581           0 :     return n_frames;
    1582             : }
    1583             : 
    1584             : void
    1585         160 : Pipeline::set_n_frames(const size_t n_frames)
    1586             : {
    1587         160 :     const auto save_bound_adaptors = this->is_bound_adaptors();
    1588         160 :     if (!save_bound_adaptors) this->bind_adaptors();
    1589         160 :     this->_unbind_adaptors(false);
    1590             : 
    1591             :     // set the new "n_frames" val in the sequences
    1592         160 :     std::vector<std::vector<std::pair<runtime::Socket*, runtime::Socket*>>> unbind_sockets(this->stages.size());
    1593         160 :     std::vector<std::vector<std::pair<runtime::Task*, runtime::Socket*>>> unbind_tasks(this->stages.size());
    1594         160 :     std::vector<bool> skip(this->stages.size());
    1595         724 :     for (size_t s = 0; s < this->stages.size(); s++)
    1596         564 :         skip[s] = this->stages[s]->get_n_frames() == n_frames;
    1597         724 :     for (size_t s = 0; s < this->stages.size(); s++)
    1598         564 :         if (!skip[s]) this->stages[s]->_set_n_frames_unbind(unbind_sockets[s], unbind_tasks[s]);
    1599         724 :     for (size_t s = 0; s < this->stages.size(); s++)
    1600         564 :         if (!skip[s]) this->stages[s]->_set_n_frames(n_frames);
    1601         724 :     for (size_t s = 0; s < this->stages.size(); s++)
    1602         564 :         if (!skip[s]) this->stages[s]->_set_n_frames_rebind(unbind_sockets[s], unbind_tasks[s]);
    1603             : 
    1604             :     // set the new "n_frames" val in the adaptors
    1605         564 :     for (auto& adps : this->adaptors)
    1606             :     {
    1607        4600 :         for (auto& adp : adps.first)
    1608        4196 :             adp->set_n_frames(n_frames);
    1609        4196 :         for (auto& adp : adps.second)
    1610        3792 :             adp->set_n_frames(n_frames);
    1611             :     }
    1612             : 
    1613             :     // bind orphans to complete the unbind of the adaptors
    1614         822 :     for (auto& bind : this->sck_orphan_binds)
    1615             :     {
    1616         662 :         auto sck_out = std::get<0>(bind.first);
    1617         662 :         auto priority = std::get<4>(bind.first);
    1618         662 :         auto sck_in = std::get<0>(bind.second);
    1619         662 :         if (sck_in != nullptr)
    1620         610 :             sck_in->_bind(*sck_out, priority);
    1621             :         else
    1622             :         {
    1623          52 :             auto tsk_in = std::get<4>(bind.second);
    1624          52 :             assert(tsk_in != nullptr);
    1625          52 :             tsk_in->_bind(*sck_out, priority);
    1626             :         }
    1627             :     }
    1628             : 
    1629         160 :     if (save_bound_adaptors) this->bind_adaptors();
    1630         160 : }

Generated by: LCOV version 1.14