LCOV - code coverage report
Current view: top level - src/Runtime/Pipeline - Pipeline.cpp (source / functions) Hit Total Coverage
Test: streampu_clean.info Lines: 552 742 74.4 %
Date: 2025-03-14 12:33:06 Functions: 32 49 65.3 %

          Line data    Source code
       1             : #include <cassert>
       2             : #include <fstream>
       3             : #include <thread>
       4             : #include <tuple>
       5             : #include <utility>
       6             : 
       7             : #include "Module/Stateful/Adaptor/Adaptor_m_to_n.hpp"
       8             : #include "Runtime/Pipeline/Pipeline.hpp"
       9             : #include "Tools/Exception/exception.hpp"
      10             : #include "Tools/Interface/Interface_waiting.hpp"
      11             : #include "Tools/Thread/Thread_pinning/Thread_pinning_utils.hpp"
      12             : #include "Tools/Thread/Thread_pool/Standard/Thread_pool_standard.hpp"
      13             : 
      14             : using namespace spu;
      15             : using namespace spu::runtime;
      16             : 
      17             : // Pipeline
      18             : // ::Pipeline(const runtime::Task &first,
      19             : //            const runtime::Task &last,
      20             : //            const std::vector<std::pair<std::vector<const runtime::Task*>, std::vector<const runtime::Task*>>>
      21             : //            &sep_stages, const std::vector<size_t> &n_threads, const std::vector<size_t> &synchro_buffer_sizes, const
      22             : //            std::vector<bool> &synchro_active_waiting, const std::vector<bool> &thread_pinning, const
      23             : //            std::vector<std::vector<size_t>> &puids)
      24             : // : original_sequence(first, last, 1),
      25             : //   stages(sep_stages.size()),
      26             : //   adaptors(sep_stages.size() -1),
      27             : //   saved_firsts_tasks_id(sep_stages.size()),
      28             : //   saved_lasts_tasks_id(sep_stages.size()),
      29             : //   bound_adaptors(false)
      30             : // {
      31             : //      this->init<const runtime::Task>(first,
      32             : //                                      &last,
      33             : //                                      sep_stages,
      34             : //                                      n_threads,
      35             : //                                      synchro_buffer_sizes,
      36             : //                                      synchro_active_waiting,
      37             : //                                      thread_pinning,
      38             : //                                      puids);
      39             : // }
      40             : 
      41             : // Pipeline
      42             : // ::Pipeline(const runtime::Task &first,
      43             : //            const std::vector<std::pair<std::vector<const runtime::Task*>, std::vector<const runtime::Task*>>>
      44             : //            &sep_stages, const std::vector<size_t> &n_threads, const std::vector<size_t> &synchro_buffer_sizes, const
      45             : //            std::vector<bool> &synchro_active_waiting, const std::vector<bool> &thread_pinning, const
      46             : //            std::vector<std::vector<size_t>> &puids)
      47             : // : original_sequence(first, 1),
      48             : //   stages(sep_stages.size()),
      49             : //   adaptors(sep_stages.size() -1),
      50             : //   saved_firsts_tasks_id(sep_stages.size()),
      51             : //   saved_lasts_tasks_id(sep_stages.size()),
      52             : //   bound_adaptors(false)
      53             : // {
      54             : //      const runtime::Task* last = nullptr;
      55             : //      this->init<const runtime::Task>(first,
      56             : //                                      last,
      57             : //                                      sep_stages,
      58             : //                                      n_threads,
      59             : //                                      synchro_buffer_sizes,
      60             : //                                      synchro_active_waiting,
      61             : //                                      thread_pinning,
      62             : //                                      puids);
      63             : // }
      64             : 
      65          16 : Pipeline
      66             : ::Pipeline(const std::vector<runtime::Task*> &firsts,
      67             :            const std::vector<runtime::Task*> &lasts,
      68             :            const std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
      69             :            const std::vector<size_t> &n_threads,
      70             :            const std::vector<size_t> &synchro_buffer_sizes,
      71             :            const std::vector<bool> &synchro_active_waiting,
      72             :            const std::vector<bool> &thread_pinning,
      73             :            const std::vector<std::vector<size_t>> &puids/*,
      74          16 :            const std::vector<bool> &tasks_inplace*/)
      75          16 : : original_sequence(firsts, lasts, 1),
      76          16 :   stages(sep_stages.size()),
      77          16 :   adaptors(sep_stages.size() -1),
      78          16 :   saved_firsts_tasks_id(sep_stages.size()),
      79          16 :   saved_lasts_tasks_id(sep_stages.size()),
      80          16 :   bound_adaptors(false),
      81          48 :   auto_stop(true)
      82             : {
      83          16 :     this->init<runtime::Task>(
      84             :       firsts, lasts, sep_stages, n_threads, synchro_buffer_sizes, synchro_active_waiting, thread_pinning, puids
      85             :       /*, tasks_inplace*/);
      86          16 : }
      87             : 
      88          23 : Pipeline
      89             : ::Pipeline(const std::vector<runtime::Task*> &firsts,
      90             :            const std::vector<runtime::Task*> &lasts,
      91             :            const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
      92             :            const std::vector<size_t> &n_threads,
      93             :            const std::vector<size_t> &synchro_buffer_sizes,
      94             :            const std::vector<bool> &synchro_active_waiting,
      95             :            const std::vector<bool> &thread_pinning,
      96             :            const std::vector<std::vector<size_t>> &puids/*,
      97          23 :            const std::vector<bool> &tasks_inplace*/)
      98          23 : : original_sequence(firsts, lasts, 1),
      99          23 :   stages(sep_stages.size()),
     100          23 :   adaptors(sep_stages.size() -1),
     101          23 :   saved_firsts_tasks_id(sep_stages.size()),
     102          23 :   saved_lasts_tasks_id(sep_stages.size()),
     103          23 :   bound_adaptors(false),
     104          69 :   auto_stop(true)
     105             : {
     106             :     std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>>
     107          23 :       sep_stages_bis;
     108         105 :     for (auto& sep_stage : sep_stages)
     109          82 :         sep_stages_bis.push_back(std::make_tuple(sep_stage.first, sep_stage.second, std::vector<runtime::Task*>()));
     110             : 
     111          23 :     this->init<runtime::Task>(firsts,
     112             :                                   lasts,
     113             :                                   sep_stages_bis,
     114             :                                   n_threads,
     115             :                                   synchro_buffer_sizes,
     116             :                                   synchro_active_waiting,
     117             :                                   thread_pinning, puids/*,
     118             :                                   tasks_inplace*/);
     119          23 : }
     120             : 
     121          16 : Pipeline
     122             : ::Pipeline(const std::vector<runtime::Task*> &firsts,
     123             :            const std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     124             :            const std::vector<size_t> &n_threads,
     125             :            const std::vector<size_t> &synchro_buffer_sizes,
     126             :            const std::vector<bool> &synchro_active_waiting,
     127             :            const std::vector<bool> &thread_pinning,
     128             :            const std::vector<std::vector<size_t>> &puids/*,
     129          16 :            const std::vector<bool> &tasks_inplace*/)
     130             : : Pipeline(firsts,
     131             :            {},
     132             :            sep_stages,
     133             :            n_threads,
     134             :            synchro_buffer_sizes,
     135             :            synchro_active_waiting,
     136             :            thread_pinning, puids/*,
     137          16 :            tasks_inplace*/)
     138             : {
     139          16 : }
     140             : 
     141          23 : Pipeline
     142             : ::Pipeline(const std::vector<runtime::Task*> &firsts,
     143             :            const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     144             :            const std::vector<size_t> &n_threads,
     145             :            const std::vector<size_t> &synchro_buffer_sizes,
     146             :            const std::vector<bool> &synchro_active_waiting,
     147             :            const std::vector<bool> &thread_pinning,
     148             :            const std::vector<std::vector<size_t>> &puids/*,
     149          23 :            const std::vector<bool> &tasks_inplace*/)
     150             : : Pipeline(firsts,
     151             :            {},
     152             :            sep_stages,
     153             :            n_threads,
     154             :            synchro_buffer_sizes,
     155             :            synchro_active_waiting,
     156             :            thread_pinning, puids/*,
     157          23 :            tasks_inplace*/)
     158             : {
     159          23 : }
     160             : 
     161           0 : Pipeline
     162             : ::Pipeline(runtime::Task &first,
     163             :            runtime::Task &last,
     164             :            const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     165             :            const std::vector<size_t> &n_threads,
     166             :            const std::vector<size_t> &synchro_buffer_sizes,
     167             :            const std::vector<bool> &synchro_active_waiting,
     168             :            const std::vector<bool> &thread_pinning,
     169             :            const std::vector<std::vector<size_t>> &puids/*,
     170           0 :            const std::vector<bool> &tasks_inplace*/)
     171             : : Pipeline({&first},
     172             :            {&last},
     173             :            sep_stages,
     174             :            n_threads,
     175             :            synchro_buffer_sizes,
     176             :            synchro_active_waiting,
     177             :            thread_pinning, puids/*,
     178           0 :            tasks_inplace*/)
     179             : {
     180           0 : }
     181             : 
     182          23 : Pipeline
     183             : ::Pipeline(runtime::Task &first,
     184             :            const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     185             :            const std::vector<size_t> &n_threads,
     186             :            const std::vector<size_t> &synchro_buffer_sizes,
     187             :            const std::vector<bool> &synchro_active_waiting,
     188             :            const std::vector<bool> &thread_pinning,
     189             :            const std::vector<std::vector<size_t>> &puids/*,
     190          23 :            const std::vector<bool> &tasks_inplace*/)
     191             : : Pipeline({&first},
     192             :            sep_stages,
     193             :            n_threads,
     194             :            synchro_buffer_sizes,
     195             :            synchro_active_waiting,
     196             :            thread_pinning, puids/*,
     197          23 :            tasks_inplace*/)
     198             : {
     199          23 : }
     200             : 
     201             : //========================== Pipeline constructors with new version thread pinning =====================================
     202           9 : Pipeline
     203             : ::Pipeline(const std::vector<runtime::Task*> &firsts,
     204             :            const std::vector<runtime::Task*> &lasts,
     205             :            const std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     206             :            const std::vector<size_t> &n_threads,
     207             :            const std::vector<size_t> &synchro_buffer_sizes,
     208             :            const std::vector<bool> &synchro_active_waiting,
     209             :            const std::vector<bool> &thread_pinning,
     210             :            const std::string &pipeline_pinning_policy/*,
     211           9 :            const std::vector<bool> &tasks_inplace*/)
     212           9 : : original_sequence(firsts, lasts, 1),
     213           9 :   stages(sep_stages.size()),
     214           9 :   adaptors(sep_stages.size() -1),
     215           9 :   saved_firsts_tasks_id(sep_stages.size()),
     216           9 :   saved_lasts_tasks_id(sep_stages.size()),
     217           9 :   bound_adaptors(false),
     218          27 :   auto_stop(true)
     219             : {
     220           9 :     this->init<runtime::Task>(firsts,
     221             :                                   lasts,
     222             :                                   sep_stages,
     223             :                                   n_threads,
     224             :                                   synchro_buffer_sizes,
     225             :                                   synchro_active_waiting,
     226             :                                   thread_pinning, {}, pipeline_pinning_policy/*,
     227             :                                   tasks_inplace*/);
     228           9 : }
     229             : 
     230          46 : Pipeline
     231             : ::Pipeline(const std::vector<runtime::Task*> &firsts,
     232             :            const std::vector<runtime::Task*> &lasts,
     233             :            const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     234             :            const std::vector<size_t> &n_threads,
     235             :            const std::vector<size_t> &synchro_buffer_sizes,
     236             :            const std::vector<bool> &synchro_active_waiting,
     237             :            const std::vector<bool> &thread_pinning,
     238             :            const std::string &pipeline_pinning_policy/*,
     239          46 :            const std::vector<bool> &tasks_inplace*/)
     240          46 : : original_sequence(firsts, lasts, 1),
     241          46 :   stages(sep_stages.size()),
     242          46 :   adaptors(sep_stages.size() -1),
     243          46 :   saved_firsts_tasks_id(sep_stages.size()),
     244          46 :   saved_lasts_tasks_id(sep_stages.size()),
     245          46 :   bound_adaptors(false),
     246         138 :   auto_stop(true)
     247             : {
     248             :     std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>>
     249          46 :       sep_stages_bis;
     250         214 :     for (auto& sep_stage : sep_stages)
     251         168 :         sep_stages_bis.push_back(std::make_tuple(sep_stage.first, sep_stage.second, std::vector<runtime::Task*>()));
     252             : 
     253          46 :     this->init<runtime::Task>(firsts,
     254             :                                   lasts,
     255             :                                   sep_stages_bis,
     256             :                                   n_threads,
     257             :                                   synchro_buffer_sizes,
     258             :                                   synchro_active_waiting,
     259             :                                   thread_pinning, {}, pipeline_pinning_policy/*,
     260             :                                   tasks_inplace*/);
     261          46 : }
     262             : 
     263           9 : Pipeline
     264             : ::Pipeline(const std::vector<runtime::Task*> &firsts,
     265             :            const std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     266             :            const std::vector<size_t> &n_threads,
     267             :            const std::vector<size_t> &synchro_buffer_sizes,
     268             :            const std::vector<bool> &synchro_active_waiting,
     269             :            const std::vector<bool> &thread_pinning,
     270             :            const std::string &pipeline_pinning_policy/*,
     271           9 :            const std::vector<bool> &tasks_inplace*/)
     272             : : Pipeline(firsts,
     273             :            {},
     274             :            sep_stages,
     275             :            n_threads,
     276             :            synchro_buffer_sizes,
     277             :            synchro_active_waiting,
     278             :            thread_pinning, pipeline_pinning_policy/*,
     279           9 :            tasks_inplace*/)
     280             : {
     281           9 : }
     282             : 
     283          35 : Pipeline
     284             : ::Pipeline(const std::vector<runtime::Task*> &firsts,
     285             :            const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     286             :            const std::vector<size_t> &n_threads,
     287             :            const std::vector<size_t> &synchro_buffer_sizes,
     288             :            const std::vector<bool> &synchro_active_waiting,
     289             :            const std::vector<bool> &thread_pinning,
     290             :            const std::string &pipeline_pinning_policy/*,
     291          35 :            const std::vector<bool> &tasks_inplace*/)
     292             : : Pipeline(firsts,
     293             :            {},
     294             :            sep_stages,
     295             :            n_threads,
     296             :            synchro_buffer_sizes,
     297             :            synchro_active_waiting,
     298             :            thread_pinning, pipeline_pinning_policy/*,
     299          35 :            tasks_inplace*/)
     300             : {
     301          35 : }
     302             : 
     303           0 : Pipeline
     304             : ::Pipeline(runtime::Task &first,
     305             :            runtime::Task &last,
     306             :            const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     307             :            const std::vector<size_t> &n_threads,
     308             :            const std::vector<size_t> &synchro_buffer_sizes,
     309             :            const std::vector<bool> &synchro_active_waiting,
     310             :            const std::vector<bool> &thread_pinning,
     311             :            const std::string &pipeline_pinning_policy/*,
     312           0 :            const std::vector<bool> &tasks_inplace*/)
     313             : : Pipeline({&first},
     314             :            {&last},
     315             :            sep_stages,
     316             :            n_threads,
     317             :            synchro_buffer_sizes,
     318             :            synchro_active_waiting,
     319             :            thread_pinning, pipeline_pinning_policy/*,
     320           0 :            tasks_inplace*/)
     321             : {
     322           0 : }
     323             : 
     324          35 : Pipeline
     325             : ::Pipeline(runtime::Task &first,
     326             :            const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     327             :            const std::vector<size_t> &n_threads,
     328             :            const std::vector<size_t> &synchro_buffer_sizes,
     329             :            const std::vector<bool> &synchro_active_waiting,
     330             :            const std::vector<bool> &thread_pinning,
     331             :            const std::string &pipeline_pinning_policy/*,
     332          35 :            const std::vector<bool> &tasks_inplace*/)
     333             : : Pipeline({&first},
     334             :            sep_stages,
     335             :            n_threads,
     336             :            synchro_buffer_sizes,
     337             :            synchro_active_waiting,
     338             :            thread_pinning,
     339             :            pipeline_pinning_policy/*,
     340          35 :            tasks_inplace*/)
     341             : {
     342          35 : }
     343             : 
     344         179 : Pipeline::~Pipeline()
     345             : {
     346          94 :     this->unbind_adaptors();
     347         179 : }
     348             : 
     349             : std::vector<Sequence*>
     350         173 : Pipeline::get_stages()
     351             : {
     352         173 :     std::vector<Sequence*> stages;
     353         823 :     for (auto& stage : this->stages)
     354         650 :         stages.push_back(stage.get());
     355         173 :     return stages;
     356           0 : }
     357             : 
     358             : Sequence&
     359           0 : Pipeline::operator[](const size_t stage_id)
     360             : {
     361           0 :     assert(stage_id < this->stages.size());
     362           0 :     return *this->stages[stage_id];
     363             : }
     364             : 
     365             : template<class TA>
     366             : runtime::Sequence*
     367             : create_sequence(const std::vector<TA*>& firsts,
     368             :                 const std::vector<TA*>& lasts,
     369             :                 const std::vector<TA*>& exclusions,
     370             :                 const size_t& n_threads,
     371             :                 const bool& thread_pinning,
     372             :                 const std::vector<size_t>& puids,
     373             :                 const bool& tasks_inplace)
     374             : {
     375             :     throw tools::unimplemented_error(__FILE__, __LINE__, __func__);
     376             : }
     377             : 
     378             : template<>
     379             : runtime::Sequence*
     380           0 : create_sequence<const runtime::Task>(const std::vector<const runtime::Task*>& firsts,
     381             :                                      const std::vector<const runtime::Task*>& lasts,
     382             :                                      const std::vector<const runtime::Task*>& exclusions,
     383             :                                      const size_t& n_threads,
     384             :                                      const bool& thread_pinning,
     385             :                                      const std::vector<size_t>& puids,
     386             :                                      const bool& /*tasks_inplace*/)
     387             : {
     388           0 :     return new runtime::Sequence(firsts, lasts, exclusions, n_threads, thread_pinning, puids);
     389             : }
     390             : 
     391             : template<>
     392             : Sequence*
     393         274 : create_sequence<runtime::Task>(const std::vector<runtime::Task*>& firsts,
     394             :                                const std::vector<runtime::Task*>& lasts,
     395             :                                const std::vector<runtime::Task*>& exclusions,
     396             :                                const size_t& n_threads,
     397             :                                const bool& thread_pinning,
     398             :                                const std::vector<size_t>& puids,
     399             :                                const bool& tasks_inplace)
     400             : {
     401         274 :     return new runtime::Sequence(firsts, lasts, exclusions, n_threads, thread_pinning, puids, tasks_inplace);
     402             : }
     403             : 
     404             : // Init and sequence creation for second pinning version
     405             : template<class TA>
     406             : runtime::Sequence*
     407             : create_sequence(const std::vector<TA*>& firsts,
     408             :                 const std::vector<TA*>& lasts,
     409             :                 const std::vector<TA*>& exclusions,
     410             :                 const size_t& n_threads,
     411             :                 const bool& thread_pinning,
     412             :                 const std::string& pipeline_pinning_policy,
     413             :                 const bool& tasks_inplace)
     414             : {
     415             :     throw tools::unimplemented_error(__FILE__, __LINE__, __func__);
     416             : }
     417             : 
     418             : template<>
     419             : runtime::Sequence*
     420           0 : create_sequence<const runtime::Task>(const std::vector<const runtime::Task*>& firsts,
     421             :                                      const std::vector<const runtime::Task*>& lasts,
     422             :                                      const std::vector<const runtime::Task*>& exclusions,
     423             :                                      const size_t& n_threads,
     424             :                                      const bool& thread_pinning,
     425             :                                      const std::string& pipeline_pinning_policy,
     426             :                                      const bool& /*tasks_inplace*/)
     427             : {
     428           0 :     return new runtime::Sequence(firsts, lasts, exclusions, n_threads, thread_pinning, pipeline_pinning_policy);
     429             : }
     430             : 
     431             : template<>
     432             : Sequence*
     433          62 : create_sequence<runtime::Task>(const std::vector<runtime::Task*>& firsts,
     434             :                                const std::vector<runtime::Task*>& lasts,
     435             :                                const std::vector<runtime::Task*>& exclusions,
     436             :                                const size_t& n_threads,
     437             :                                const bool& thread_pinning,
     438             :                                const std::string& pipeline_pinning_policy,
     439             :                                const bool& tasks_inplace)
     440             : {
     441             :     return new runtime::Sequence(
     442          62 :       firsts, lasts, exclusions, n_threads, thread_pinning, pipeline_pinning_policy, tasks_inplace);
     443             : }
     444             : 
     445             : template <class TA>
     446          94 : void Pipeline
     447             : ::init(const std::vector<TA*> &/*firsts*/,
     448             :        const std::vector<TA*> &/*lasts*/,
     449             :        const std::vector<std::tuple<std::vector<TA*>,std::vector<TA*>,std::vector<TA*>>> &sep_stages,
     450             :        const std::vector<size_t> &n_threads,
     451             :        const std::vector<size_t> &synchro_buffer_sizes,
     452             :        const std::vector<bool> &synchro_active_waiting,
     453             :        const std::vector<bool> &thread_pinning,
     454             :        const std::vector<std::vector<size_t>> &puids,
     455             :        const std::string &pipeline_pinning_policy/*,
     456             :        const std::vector<bool> &tasks_inplace*/)
     457             : {
     458          94 :     if (sep_stages.size() != n_threads.size() && n_threads.size() != 0)
     459             :     {
     460           0 :         std::stringstream message;
     461           0 :         message << "'n_threads.size()' has to be equal to 'sep_stages.size()' or equal to '0' ('n_threads.size()' = "
     462           0 :                 << n_threads.size() << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
     463           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     464           0 :     }
     465             : 
     466          94 :     if (sep_stages.size() != synchro_buffer_sizes.size() + 1 && synchro_buffer_sizes.size() != 0)
     467             :     {
     468           0 :         std::stringstream message;
     469             :         message << "'synchro_buffer_sizes.size()' has to be equal to 'sep_stages.size() -1' or equal to '0' "
     470           0 :                 << "('synchro_buffer_sizes.size()' = " << synchro_buffer_sizes.size()
     471           0 :                 << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
     472           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     473           0 :     }
     474             : 
     475          94 :     if (sep_stages.size() != synchro_active_waiting.size() + 1 && synchro_active_waiting.size() != 0)
     476             :     {
     477           0 :         std::stringstream message;
     478             :         message << "'synchro_active_waiting.size()' has to be equal to 'sep_stages.size() -1' or equal to '0' "
     479           0 :                 << "('synchro_active_waiting.size()' = " << synchro_active_waiting.size()
     480           0 :                 << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
     481           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     482           0 :     }
     483             : 
     484          94 :     if (sep_stages.size() != thread_pinning.size() && thread_pinning.size() != 0)
     485             :     {
     486           0 :         std::stringstream message;
     487             :         message << "'thread_pinning.size()' has to be equal to 'sep_stages.size()' or equal to '0' ("
     488           0 :                 << "'thread_pinning.size()' = " << thread_pinning.size()
     489           0 :                 << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
     490           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     491           0 :     }
     492             : 
     493          94 :     if (sep_stages.size() != puids.size() && puids.size() != 0)
     494             :     {
     495           0 :         std::stringstream message;
     496           0 :         message << "'puids.size()' has to be equal to 'sep_stages.size()' or equal to '0' ('puids.size()' = "
     497           0 :                 << puids.size() << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
     498           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     499           0 :     }
     500             : 
     501             :     // if (sep_stages.size() != tasks_inplace.size() && tasks_inplace.size() != 0)
     502             :     // {
     503             :     //  std::stringstream message;
     504             :     //  message << "'tasks_inplace.size()' has to be equal to 'sep_stages.size()' or equal to '0' ('"
     505             :     //          << "tasks_inplace.size()' = " << tasks_inplace.size() << " , 'sep_stages.size()' = "
     506             :     //          << sep_stages.size() << ").";
     507             :     //  throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     508             :     // }
     509             : 
     510             :     // Creating a vector of pinning policies for each sequence
     511          94 :     std::vector<std::string> sequences_pinning_policies;
     512          94 :     if (!pipeline_pinning_policy.empty())
     513          22 :         sequences_pinning_policies =
     514             :           tools::Thread_pinning_utils::pipeline_parser_unpacker(pipeline_pinning_policy, sep_stages.size());
     515             : 
     516         430 :     for (size_t s = 0; s < sep_stages.size(); s++)
     517             :     {
     518         336 :         const std::vector<TA*>& stage_firsts = std::get<0>(sep_stages[s]);
     519         336 :         const std::vector<TA*>& stage_lasts = std::get<1>(sep_stages[s]);
     520         336 :         const std::vector<TA*>& stage_exclusions = std::get<2>(sep_stages[s]);
     521         336 :         const size_t stage_n_threads = n_threads.size() ? n_threads[s] : 1;
     522         336 :         const bool stage_thread_pinning = thread_pinning.size() ? thread_pinning[s] : false;
     523         336 :         const std::vector<size_t> stage_puids = puids.size() ? puids[s] : std::vector<size_t>();
     524         398 :         const std::string sequence_pinning_policy =
     525         398 :           sequences_pinning_policies.size() ? sequences_pinning_policies[s] : "";
     526         336 :         const bool stage_tasks_inplace = /*tasks_inplace.size() ? tasks_inplace[s] :*/ true;
     527             :         try
     528             :         {
     529         336 :             if (pipeline_pinning_policy.empty())
     530         274 :                 this->stages[s].reset(create_sequence<TA>(stage_firsts,
     531             :                                                           stage_lasts,
     532             :                                                           stage_exclusions,
     533             :                                                           stage_n_threads,
     534             :                                                           stage_thread_pinning,
     535             :                                                           stage_puids,
     536             :                                                           stage_tasks_inplace));
     537             :             else
     538          62 :                 this->stages[s].reset(create_sequence<TA>(stage_firsts,
     539             :                                                           stage_lasts,
     540             :                                                           stage_exclusions,
     541             :                                                           stage_n_threads,
     542             :                                                           stage_thread_pinning,
     543             :                                                           sequence_pinning_policy,
     544             :                                                           stage_tasks_inplace));
     545             :         }
     546           0 :         catch (const tools::control_flow_error& e)
     547             :         {
     548           0 :             std::stringstream message;
     549           0 :             message << "Invalid control flow error on stage " << s
     550           0 :                     << " (perhaps a switcher's tasks were separated between different stages)." << std::endl
     551           0 :                     << e.what();
     552           0 :             throw tools::control_flow_error(__FILE__, __LINE__, __func__, message.str());
     553           0 :         }
     554         336 :         this->stages[s]->is_part_of_pipeline = true;
     555             :     }
     556             : 
     557             :     // verify that the sequential sequence is equivalent to the pipeline sequence
     558          94 :     auto ref_tasks = this->original_sequence.get_tasks_per_threads()[0];
     559          94 :     auto cur_tasks = this->get_tasks_per_threads()[0];
     560             : 
     561          94 :     if (ref_tasks.size() != cur_tasks.size())
     562             :     {
     563           0 :         std::ofstream f1("dbg_ref_sequence.dot");
     564           0 :         this->original_sequence.export_dot(f1);
     565           0 :         std::ofstream f2("dbg_cur_pipeline.dot");
     566           0 :         this->export_dot(f2);
     567             : 
     568           0 :         std::stringstream message;
     569           0 :         message << "'ref_tasks.size()' has to be equal to 'cur_tasks.size()' ('ref_tasks.size()' = " << ref_tasks.size()
     570           0 :                 << ", 'cur_tasks.size()' = " << cur_tasks.size() << ").";
     571           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     572           0 :     }
     573             : 
     574         963 :     for (size_t ta = 0; ta < cur_tasks.size(); ta++)
     575             :     {
     576         869 :         if (std::find(ref_tasks.begin(), ref_tasks.end(), cur_tasks[ta]) == ref_tasks.end())
     577             :         {
     578           0 :             std::ofstream f1("dbg_ref_sequence.dot");
     579           0 :             this->original_sequence.export_dot(f1);
     580           0 :             std::ofstream f2("dbg_cur_pipeline.dot");
     581           0 :             this->export_dot(f2);
     582             : 
     583           0 :             std::stringstream message;
     584           0 :             message << "'cur_tasks[ta]' is not contained in the 'ref_tasks' vector ('ta' = " << ta
     585           0 :                     << ", 'cur_tasks[ta]' = " << +cur_tasks[ta]
     586           0 :                     << ", 'cur_tasks[ta]->get_name()' = " << cur_tasks[ta]->get_name() << ").";
     587           0 :             throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     588           0 :         }
     589             :     }
     590             : 
     591          94 :     this->create_adaptors(synchro_buffer_sizes, synchro_active_waiting);
     592          94 :     this->bind_adaptors();
     593             : 
     594          94 :     this->thread_pool.reset(new tools::Thread_pool_standard(this->stages.size() - 1));
     595          94 :     this->thread_pool->init(); // threads are spawned here
     596          94 : }
     597             : 
     598             : void
     599          94 : Pipeline::create_adaptors(const std::vector<size_t>& synchro_buffer_sizes,
     600             :                           const std::vector<bool>& synchro_active_waiting)
     601             : {
     602             :     //                     sck out addr     occ     stage   tsk id  sck id
     603          94 :     std::vector<std::tuple<runtime::Socket*, size_t, size_t, size_t, size_t>> out_sck_orphans;
     604             : 
     605             :     // for all the stages in the pipeline
     606         336 :     for (size_t sta = 0; sta < this->stages.size() - 1; sta++)
     607             :     {
     608             :         // ------------------------------------------------------------------------------------------------------------
     609             :         // --------------------------------------------------------------- collect orphan output sockets in stage 'sta'
     610             :         // ------------------------------------------------------------------------------------------------------------
     611         242 :         std::vector<std::vector<runtime::Task*>> tasks_per_threads = this->stages[sta]->get_tasks_per_threads();
     612             :         // only thread 0 is inspected below (the loop over all the threads is disabled)
     613             :         // for (size_t t = 0; t < tasks_per_threads.size(); t++)
     614         242 :         size_t t = 0;
     615             :         {
     616             :             // for all the tasks in the stage
     617         901 :             for (size_t tsk_id = 0; tsk_id < tasks_per_threads[t].size(); tsk_id++)
     618             :             {
     619         659 :                 auto tsk = tasks_per_threads[t][tsk_id];
     620             :                 // for all the sockets of the tasks
     621        2350 :                 for (size_t sck_id = 0; sck_id < tsk->sockets.size(); sck_id++)
     622             :                 {
     623        1691 :                     auto sck = tsk->sockets[sck_id];
     624             :                     // if the current socket is an output or forward socket type
     625        1691 :                     if (sck->get_type() == socket_t::SOUT || sck->get_type() == socket_t::SFWD)
     626             :                     {
     627             :                         // for all the bounded sockets to the current socket
     628        2151 :                         for (auto bsck : sck->get_bound_sockets())
     629             :                         {
     630             :                             // check if the task of the bounded socket is not in the current stage
     631         807 :                             if (std::find(tasks_per_threads[t].begin(),
     632         807 :                                           tasks_per_threads[t].end(),
     633        1614 :                                           &bsck->get_task()) == tasks_per_threads[t].end())
     634             :                             {
     635             :                                 // check the position of the socket in the orphans
     636         379 :                                 size_t pos = 0;
     637        1018 :                                 for (; pos < out_sck_orphans.size(); pos++)
     638         682 :                                     if (std::get<0>(out_sck_orphans[pos]) == sck.get()) break;
     639             : 
     640         379 :                                 if (pos == out_sck_orphans.size())
     641         336 :                                     out_sck_orphans.push_back(std::make_tuple(sck.get(), 1, sta, tsk_id, sck_id));
     642             :                                 else
     643          43 :                                     std::get<1>(out_sck_orphans[pos])++;
     644             :                             }
     645             :                         }
     646             :                     }
     647        1691 :                 }
     648             :             }
     649             :         }
     650             : 
     651             :         // ------------------------------------------------------------------------------------------------------------
     652             :         // -------------------------------------- collect orphan input sockets in stage 'sta +1' and create connections
     653             :         // ------------------------------------------------------------------------------------------------------------
     654         242 :         tasks_per_threads = this->stages[sta + 1]->get_tasks_per_threads();
     655             :         // only thread 0 is inspected ('t' is still 0 here, the loop over all the threads is disabled)
     656             :         // for (size_t t = 0; t < tasks_per_threads.size(); t++)
     657             :         {
     658             :             // for all the tasks in the stage
     659         949 :             for (size_t tsk_id = 0; tsk_id < tasks_per_threads[t].size(); tsk_id++)
     660             :             {
     661         707 :                 auto tsk = tasks_per_threads[t][tsk_id];
     662             :                 // ------------------------------- manage socket to socket bindings (with regular in/fwd sockets)
     663             :                 // for all the sockets of the tasks
     664        2504 :                 for (size_t sck_id = 0; sck_id < tsk->sockets.size(); sck_id++)
     665             :                 {
     666        1797 :                     auto sck = tsk->sockets[sck_id];
     667             :                     // if the current socket is an input or forward socket type
     668        1797 :                     if (sck->get_type() == socket_t::SIN || sck->get_type() == socket_t::SFWD)
     669             :                     {
     670         723 :                         runtime::Socket* bsck = nullptr;
     671             :                         try
     672             :                         {
     673             :                             // get output bounded socket
     674         723 :                             bsck = &sck->get_bound_socket(); // can throw if there is no bounded socket
     675             :                         }
     676           0 :                         catch (const std::exception&)
     677             :                         {
     678           0 :                         }
     679         723 :                         if (bsck != nullptr)
     680             :                         {
     681             :                             // check if the task of the bounded socket is not in the current stage
     682         723 :                             if (std::find(tasks_per_threads[t].begin(),
     683         723 :                                           tasks_per_threads[t].end(),
     684        1446 :                                           &bsck->get_task()) == tasks_per_threads[t].end())
     685             :                             {
     686             :                                 // check the position of the bounded socket in the orphans
     687         354 :                                 size_t pos = 0;
     688         907 :                                 for (; pos < out_sck_orphans.size(); pos++)
     689         907 :                                     if (std::get<0>(out_sck_orphans[pos]) == bsck) break;
     690             : 
     691         354 :                                 if (pos < out_sck_orphans.size())
     692             :                                 {
     693         354 :                                     auto sck_out = std::get<0>(out_sck_orphans[pos]);
     694         354 :                                     auto sck_in = sck.get();
     695         708 :                                     auto unbind_sout_pos = std::distance(sck_out->get_bound_sockets().begin(),
     696         354 :                                                                          std::find(sck_out->get_bound_sockets().begin(),
     697         354 :                                                                                    sck_out->get_bound_sockets().end(),
     698         354 :                                                                                    sck_in));
     699         354 :                                     this->sck_orphan_binds.push_back(
     700         354 :                                       std::make_pair(std::make_tuple(std::get<0>(out_sck_orphans[pos]),
     701         354 :                                                                      std::get<2>(out_sck_orphans[pos]),
     702         354 :                                                                      std::get<3>(out_sck_orphans[pos]),
     703         354 :                                                                      std::get<4>(out_sck_orphans[pos]),
     704             :                                                                      unbind_sout_pos),
     705         708 :                                                      std::make_tuple(sck_in, sta + 1, tsk_id, sck_id, nullptr)));
     706             :                                 }
     707             :                             }
     708             :                         }
     709             :                     }
     710        1797 :                 }
     711             :                 // ------------------------------------------- manage socket to task bindings (with fake input sockets)
     712         835 :                 for (size_t sck_id = 0; sck_id < tsk->fake_input_sockets.size(); sck_id++)
     713             :                 {
     714         128 :                     auto sck = tsk->fake_input_sockets[sck_id];
     715         128 :                     runtime::Socket* bsck = nullptr;
     716             :                     try
     717             :                     {
     718             :                         // get output bounded socket
     719         128 :                         bsck = &sck->get_bound_socket(); // can throw if there is no bounded socket
     720             :                     }
     721           0 :                     catch (const std::exception&)
     722             :                     {
     723           0 :                     }
     724         128 :                     if (bsck != nullptr)
     725             :                     {
     726             :                         // check if the task of the bounded socket is not in the current stage
     727         128 :                         if (std::find(tasks_per_threads[t].begin(), tasks_per_threads[t].end(), &bsck->get_task()) ==
     728         256 :                             tasks_per_threads[t].end())
     729             :                         {
     730             :                             // check the position of the bounded socket in the orphans
     731          25 :                             size_t pos = 0;
     732         111 :                             for (; pos < out_sck_orphans.size(); pos++)
     733         111 :                                 if (std::get<0>(out_sck_orphans[pos]) == bsck) break;
     734             : 
     735          25 :                             if (pos < out_sck_orphans.size())
     736             :                             {
     737          25 :                                 auto sck_out = std::get<0>(out_sck_orphans[pos]);
     738          25 :                                 auto sck_in = sck.get();
     739          50 :                                 auto unbind_sout_pos = std::distance(sck_out->get_bound_sockets().begin(),
     740          25 :                                                                      std::find(sck_out->get_bound_sockets().begin(),
     741          25 :                                                                                sck_out->get_bound_sockets().end(),
     742          25 :                                                                                sck_in));
     743          25 :                                 this->sck_orphan_binds.push_back(
     744          25 :                                   std::make_pair(std::make_tuple(std::get<0>(out_sck_orphans[pos]),
     745          25 :                                                                  std::get<2>(out_sck_orphans[pos]),
     746          25 :                                                                  std::get<3>(out_sck_orphans[pos]),
     747          25 :                                                                  std::get<4>(out_sck_orphans[pos]),
     748             :                                                                  unbind_sout_pos),
     749          50 :                                                  std::make_tuple(nullptr, sta + 1, tsk_id, sck_id, tsk)));
     750             :                             }
     751             :                         }
     752             :                     }
     753         128 :                 }
     754             :                 // ----------------------------------------------------------------------------------------------------
     755             :             }
     756             :         }
     757         242 :     }
     758             : 
     759             :     // ----------------------------------------------------------------------------------------------------------------
     760             :     // ----------------------------------------------------------------------------------------------- prints for debug
     761             :     // ----------------------------------------------------------------------------------------------------------------
     762             :     // std::cout << "Orphan output sockets list:" << std::endl;
     763             :     // for (auto &sck : out_sck_orphans)
     764             :     // {
     765             :     //  auto sck_out_name = std::get<0>(sck)->get_name();
     766             :     //  auto tsk_out_name = std::get<0>(sck)->get_task().get_name();
     767             :     //  auto sck_out_occ  = std::get<1>(sck);
     768             :     //  auto tsk_out_sta  = std::get<2>(sck);
     769             :     //  auto tsk_out_id   = std::get<3>(sck);
     770             :     //  auto sck_out_id   = std::get<4>(sck);
     771             : 
     772             :     //  std::cout << "  " << tsk_out_name << "[" << sck_out_name << "] (stage " << tsk_out_sta << ", " << sck_out_occ
     773             :     //            << " occurrences, tsk id = " << tsk_out_id << ", sck id = " << sck_out_id << ")" << std::endl;
     774             :     // }
     775             : 
     776             :     // std::cout << std::endl << "Detected socket binds:" << std::endl;
     777             :     // for (auto &bind : this->sck_orphan_binds)
     778             :     // {
     779             :     //  auto sck_out_name = std::get<0>(bind.first)->get_name();
     780             :     //  auto tsk_out_name = std::get<0>(bind.first)->get_task().get_name();
     781             :     //  auto tsk_out_sta  = std::get<1>(bind.first);
     782             :     //  auto tsk_out_id   = std::get<2>(bind.first);
     783             :     //  auto sck_out_id   = std::get<3>(bind.first);
     784             :     //  auto sck_out_ubp  = std::get<4>(bind.first);
     785             : 
     786             :     //  auto sck_in_name = std::get<0>(bind.second)->get_name();
     787             :     //  auto tsk_in_name = std::get<0>(bind.second)->get_task().get_name();
     788             :     //  auto tsk_in_sta  = std::get<1>(bind.second);
     789             :     //  auto tsk_in_id   = std::get<2>(bind.second);
     790             :     //  auto sck_in_id   = std::get<3>(bind.second);
     791             : 
     792             :     //  std::cout << "  " << tsk_out_name << "[" << sck_out_name << "] (stage " << tsk_out_sta << ", tsk id = "
     793             :     //                    << tsk_out_id << ", sck id = " << sck_out_id << ", ubp = " << sck_out_ubp << ")"  << " -> "
     794             :     //                    << tsk_in_name  << "[" << sck_in_name  << "] (stage " << tsk_in_sta  << ", tsk id = "
     795             :     //                    << tsk_in_id  << ", sck id = " << sck_in_id  << ")" << std::endl;
     796             :     // }
     797             : 
     798             :     // ----------------------------------------------------------------------------------------------------------------
     799             :     // ------------------------------------------------------------------------------------------------ create adaptors
     800             :     // ----------------------------------------------------------------------------------------------------------------
     801          94 :     auto sck_orphan_binds_cpy = this->sck_orphan_binds;
     802          94 :     module::Adaptor_m_to_n* adp = nullptr;
     803          94 :     std::map<runtime::Socket*, size_t> sck_to_adp_sck_id;
     804         430 :     for (size_t sta = 0; sta < this->stages.size(); sta++)
     805             :     {
     806         336 :         const auto n_threads = this->stages[sta]->get_n_threads();
     807         336 :         std::vector<std::vector<runtime::Task*>> tasks_per_threads = this->stages[sta]->get_tasks_per_threads();
     808             : 
     809             :         // ------------------------------------------------------------------------------------------------------------
     810             :         // ----------------------------------------------------------------------------------------------- pull adaptor
     811             :         // ------------------------------------------------------------------------------------------------------------
     812         336 :         if (sta > 0)
     813             :         {
     814         242 :             assert(adp != nullptr);
     815             :             //                               sck out addr      stage   tsk id  sck id  unbind_pos
     816             :             std::vector<std::pair<std::tuple<runtime::Socket*, size_t, size_t, size_t, size_t>,
     817             :                                   //                               sck in addr       stage   tsk id  sck id  tsk in addr
     818             :                                   std::tuple<runtime::Socket*, size_t, size_t, size_t, runtime::Task*>>>
     819         242 :               sck_orphan_binds_new;
     820             : 
     821        2826 :             for (size_t t = 0; t < n_threads; t++)
     822             :             {
     823        2584 :                 module::Adaptor_m_to_n* cur_adp = (t == 0) ? adp : adp->clone();
     824        2584 :                 if (t > 0) cur_adp->add_puller();
     825             : 
     826        7752 :                 for (auto& t : cur_adp->tasks)
     827        5168 :                     t->set_fast(true);
     828        2584 :                 if (t > 0)
     829             :                 {
     830        2342 :                     this->adaptors[sta - 1].second.push_back(std::unique_ptr<module::Adaptor_m_to_n>(cur_adp));
     831        2342 :                     cur_adp->set_custom_name("Adp_m_to_n_" + std::to_string(sta - 1));
     832             :                 }
     833             : 
     834        2584 :                 auto task_pull = &(*cur_adp)("pull");
     835             : 
     836        2584 :                 sck_orphan_binds_new.clear();
     837       14870 :                 for (auto& bind : sck_orphan_binds_cpy)
     838             :                 {
     839       12286 :                     auto tsk_out_sta = std::get<1>(bind.first);
     840       12286 :                     if (tsk_out_sta < sta)
     841             :                     {
     842        6781 :                         auto tsk_in_sta = std::get<1>(bind.second);
     843        6781 :                         if (tsk_in_sta == sta)
     844             :                         {
     845        3625 :                             auto sck_out_ptr = std::get<0>(bind.first);
     846        3625 :                             auto priority = std::get<4>(bind.first);
     847        3625 :                             auto tsk_in_id = std::get<2>(bind.second);
     848        3625 :                             auto sck_in_id = std::get<3>(bind.second);
     849        3625 :                             runtime::Socket* sck_in = nullptr;
     850        3625 :                             runtime::Task* tsk_in = nullptr;
     851        3625 :                             if (std::get<0>(bind.second) != nullptr) // if socket to socket binding
     852        3032 :                                 sck_in = tasks_per_threads[t][tsk_in_id]->sockets[sck_in_id].get();
     853             :                             else // if socket to task binding
     854         593 :                                 tsk_in = tasks_per_threads[t][tsk_in_id];
     855        3625 :                             this->adaptors_binds.push_back(std::make_tuple(
     856        7250 :                               task_pull->sockets[sck_to_adp_sck_id[sck_out_ptr]].get(), sck_in, priority, tsk_in));
     857             :                         }
     858             :                         else
     859        3156 :                             sck_orphan_binds_new.push_back(bind);
     860             :                     }
     861             :                     else
     862        5505 :                         sck_orphan_binds_new.push_back(bind);
     863             :                 }
     864             : 
     865        2584 :                 if (t > 0) this->stages[sta]->all_modules[t].push_back(cur_adp);
     866             :             }
     867         242 :             this->saved_firsts_tasks_id[sta] = this->stages[sta]->firsts_tasks_id;
     868         242 :             sck_orphan_binds_cpy = sck_orphan_binds_new;
     869             : 
     870         242 :             adp->alloc_buffers();
     871         242 :         }
     872             : 
     873             :         // ------------------------------------------------------------------------------------------------------------
     874             :         // ----------------------------------------------------------------------------------------------- push adaptor
     875             :         // ------------------------------------------------------------------------------------------------------------
     876         336 :         std::map<runtime::Socket*, size_t> sck_to_adp_sck_id_new;
     877         336 :         if (sta < this->stages.size() - 1)
     878             :         {
     879         242 :             std::vector<size_t> adp_n_elmts;
     880         242 :             std::vector<std::type_index> adp_datatype;
     881         242 :             size_t adp_buffer_size = synchro_buffer_sizes.size() ? synchro_buffer_sizes[sta] : 1;
     882         242 :             bool adp_active_waiting = synchro_active_waiting.size() ? synchro_active_waiting[sta] : false;
     883         242 :             size_t adp_n_frames = 1;
     884             : 
     885             :             // a map to remember if a passed socket points already to the same memory space
     886         242 :             std::map<void*, size_t> fwd_source;
     887             : 
     888         242 :             std::vector<runtime::Socket*> passed_scks_out;
     889        1060 :             for (auto& bind : sck_orphan_binds_cpy)
     890             :             {
     891         818 :                 auto tsk_out_sta = std::get<1>(bind.first);
     892         818 :                 if (tsk_out_sta <= sta)
     893             :                 {
     894         520 :                     auto sck_out = std::get<0>(bind.first);
     895         520 :                     if (std::find(passed_scks_out.begin(), passed_scks_out.end(), sck_out) == passed_scks_out.end())
     896             :                     {
     897             :                         // avoid the creation of new adaptor sockets for forward sockets pointing to the same memory
     898             :                         // space
     899         467 :                         auto sck_out_dptr = (void*)sck_out->get_dataptr<int8_t>();
     900         467 :                         assert(sck_out_dptr != nullptr);
     901         467 :                         if (fwd_source.find(sck_out_dptr) == fwd_source.end())
     902             :                         {
     903         410 :                             fwd_source[sck_out_dptr] = 1;
     904         410 :                             adp_n_frames = sck_out->get_task().get_module().get_n_frames();
     905         410 :                             adp_n_elmts.push_back(sck_out->get_n_elmts() / adp_n_frames);
     906         410 :                             adp_datatype.push_back(sck_out->get_datatype());
     907             :                         }
     908         467 :                         passed_scks_out.push_back(sck_out);
     909             :                     }
     910             :                 }
     911             :             }
     912         242 :             passed_scks_out.clear();
     913             : 
     914             :             // allocate the adaptor for the first thread
     915         242 :             adp = new module::Adaptor_m_to_n(adp_n_elmts, adp_datatype, adp_buffer_size, adp_active_waiting);
     916         242 :             adp->set_n_frames(adp_n_frames);
     917             : 
     918        2826 :             for (size_t t = 0; t < n_threads; t++)
     919             :             {
     920        2584 :                 module::Adaptor_m_to_n* cur_adp = (t == 0) ? adp : adp->clone();
     921        2584 :                 cur_adp->set_custom_name("Adp_m_to_n_" + std::to_string(sta));
     922        2584 :                 if (t > 0) cur_adp->add_pusher();
     923        2584 :                 this->adaptors[sta].first.push_back(std::unique_ptr<module::Adaptor_m_to_n>(cur_adp));
     924        2584 :                 auto task_push = &(*cur_adp)("push");
     925             : 
     926        2584 :                 std::map<void*, size_t> fwd_source;
     927        2584 :                 sck_to_adp_sck_id_new.clear();
     928        2584 :                 size_t adp_sck_id = 0;
     929       11624 :                 for (auto& bind : sck_orphan_binds_cpy) // bind standard task to last adaptor
     930             :                 {
     931        9040 :                     auto tsk_out_sta = std::get<1>(bind.first);
     932             : 
     933        9040 :                     if (tsk_out_sta <= sta)
     934             :                     {
     935        7762 :                         auto sck_out_ptr = std::get<0>(bind.first);
     936        7762 :                         auto sck_out_dptr = (void*)sck_out_ptr->get_dataptr<int8_t>();
     937        7762 :                         assert(sck_out_dptr != nullptr);
     938             : 
     939        7762 :                         if (std::find(passed_scks_out.begin(),
     940             :                                       passed_scks_out.end(),
     941       22484 :                                       sck_out_ptr) == passed_scks_out.end() &&
     942       14722 :                             fwd_source.find(sck_out_dptr) == fwd_source.end()) // <= the latest condition is here to
     943             :                                                                                //    avoid to bind adaptor sockets two
     944             :                                                                                //    times the same memory space
     945             :                                                                                //    (useful in the case of multiple
     946             :                                                                                //    fwd sockets pointing to the same
     947             :                                                                                //    memory address)
     948             :                         {
     949        5763 :                             if (tsk_out_sta == sta)
     950             :                             {
     951        3778 :                                 auto tsk_out_id = std::get<2>(bind.first);
     952        3778 :                                 auto sck_out_id = std::get<3>(bind.first);
     953        3778 :                                 auto sck_out = tasks_per_threads[t][tsk_out_id]->sockets[sck_out_id];
     954        3778 :                                 auto priority = std::get<4>(bind.first);
     955        3778 :                                 sck_to_adp_sck_id_new[sck_out_ptr] = adp_sck_id;
     956        3778 :                                 this->adaptors_binds.push_back(
     957        3778 :                                   std::make_tuple(sck_out.get(),
     958        3778 :                                                   task_push->sockets[adp_sck_id++].get(),
     959             :                                                   priority,
     960        3778 :                                                   nullptr)); // <= only socket to socket binding is possible here
     961        3778 :                             }
     962             :                             else // if (tsk_out_sta < sta) // bind prev. adaptor to last adaptor
     963             :                             {
     964        1985 :                                 auto tsk_out_id = 1;
     965        1985 :                                 auto sck_out_id = sck_to_adp_sck_id[sck_out_ptr];
     966        1985 :                                 sck_to_adp_sck_id_new[sck_out_ptr] = adp_sck_id;
     967             :                                 auto adp_prev =
     968        1985 :                                   t == 0 ? this->adaptors[sta - 1].first[0] : this->adaptors[sta - 1].second[t - 1];
     969        1985 :                                 auto sck_out = (*adp_prev)[tsk_out_id].sockets[sck_out_id];
     970        1985 :                                 auto priority = std::get<4>(bind.first);
     971        1985 :                                 this->adaptors_binds.push_back(
     972        1985 :                                   std::make_tuple(sck_out.get(),
     973        1985 :                                                   task_push->sockets[adp_sck_id++].get(),
     974             :                                                   priority,
     975        1985 :                                                   nullptr)); // <= only socket to socket binding is possible here
     976        1985 :                             }
     977             : 
     978        5763 :                             fwd_source[sck_out_dptr] = 1; // remember that this memory space has been connected to the
     979             :                                                           // adaptor once
     980        5763 :                             passed_scks_out.push_back(sck_out_ptr);
     981             :                         }
     982             :                     }
     983             :                 }
     984        2584 :                 passed_scks_out.clear();
     985        2584 :             }
     986         242 :             this->saved_lasts_tasks_id[sta] = this->stages[sta]->lasts_tasks_id;
     987         242 :         }
     988         336 :         sck_to_adp_sck_id = sck_to_adp_sck_id_new;
     989         336 :     }
     990          94 : }
     991             : 
                       // Public entry point: forwards to `_bind_adaptors()` with its `bind_adaptors` argument set to
                       // true.
     992             : void
     993         280 : Pipeline::bind_adaptors()
     994             : {
     995         280 :     this->_bind_adaptors(true);
     996         280 : }
     997             : 
                       // Inserts the adaptor "pull" / "push" tasks into every stage and replaces the original
                       // inter-stage bindings (`sck_orphan_binds`) with the adaptor bindings (`adaptors_binds`).
                       // Does nothing when `bound_adaptors` is already true; sets it to true otherwise.
                       //
                       // @param bind_adaptors  when true, every entry of `adaptors_binds` is bound once the orphan
                       //                       bindings have been removed; when false, the orphan bindings are still
                       //                       removed but the adaptor bindings are not applied.
     998             : void
     999         280 : Pipeline::_bind_adaptors(const bool bind_adaptors)
    1000             : {
    1001         280 :     if (!this->bound_adaptors)
    1002             :     {
    1003        1280 :         for (size_t sta = 0; sta < this->stages.size(); sta++)
    1004             :         {
    1005        1000 :             const auto n_threads = this->stages[sta]->get_n_threads();
    1006             : 
    1007             :             // --------------------------------------------------------------------------------------------------------
    1008             :             // ------------------------------------------------------------------------------------------- pull adaptor
    1009             :             // --------------------------------------------------------------------------------------------------------
                                   // every stage but the first one starts by pulling from the adaptors of the previous stage
    1010        1000 :             if (sta > 0)
    1011             :             {
    1012        8466 :                 for (size_t t = 0; t < n_threads; t++)
    1013             :                 {
                                           // thread 0 uses the first entry of `.first` of the previous stage's adaptors,
                                           // thread t > 0 uses entry `t - 1` of `.second`
    1014             :                     module::Adaptor_m_to_n* cur_adp =
    1015        7746 :                       t > 0 ? adaptors[sta - 1].second[t - 1].get() : adaptors[sta - 1].first[0].get();
    1016             : 
    1017        7746 :                     if (t > 0 || sta == this->stages.size() - 1) // add the adaptor to the current stage
    1018        7294 :                         this->stages[sta]->all_modules[t].push_back(cur_adp);
    1019             : 
    1020        7746 :                     auto task_pull = &(*cur_adp)("pull");
    1021             : 
                                           // the pull task and its process are inserted at the very front of the sequence
    1022        7746 :                     auto ss = this->stages[sta]->sequences[t]->get_contents();
    1023        7746 :                     assert(ss != nullptr);
    1024        7746 :                     ss->tasks.insert(ss->tasks.begin(), task_pull);
    1025        7746 :                     ss->processes.insert(ss->processes.begin(),
    1026           0 :                                          [task_pull]() -> const int*
    1027             :                                          {
                                                                // execute the task, then return the data pointer of its last socket
    1028           0 :                                              task_pull->exec();
    1029           0 :                                              const int* status = task_pull->sockets.back()->get_dataptr<const int>();
    1030           0 :                                              return status;
    1031             :                                          });
    1032        7746 :                     this->stages[sta]->update_tasks_id(t);
    1033             :                 }
                                       // the only first task of the stage is now the pull task (id 0), and one task was added
    1034         720 :                 this->stages[sta]->firsts_tasks_id.clear();
    1035         720 :                 this->stages[sta]->firsts_tasks_id.push_back(0);
    1036         720 :                 this->stages[sta]->n_tasks++;
    1037             :             }
    1038             : 
    1039             :             // --------------------------------------------------------------------------------------------------------
    1040             :             // ------------------------------------------------------------------------------------------- push adaptor
    1041             :             // --------------------------------------------------------------------------------------------------------
                                   // every stage but the last one ends by pushing into its own adaptors
    1042        1000 :             if (sta < this->stages.size() - 1)
    1043             :             {
    1044         720 :                 size_t last_task_id = 0;
    1045        8466 :                 for (size_t t = 0; t < n_threads; t++)
    1046             :                 {
    1047        7746 :                     module::Adaptor_m_to_n* cur_adp = adaptors[sta].first[t].get();
    1048             : 
    1049             :                     // add the adaptor to the current stage
    1050        7746 :                     this->stages[sta]->all_modules[t].push_back(cur_adp);
    1051             : 
    1052        7746 :                     auto task_push = &(*cur_adp)("push");
    1053             : 
                                           // the push task and its process are appended to the last sub-sequence
    1054        7746 :                     auto ss = this->stages[sta]->get_last_subsequence(t);
    1055        7746 :                     assert(ss != nullptr);
    1056        7746 :                     ss->tasks.push_back(task_push);
    1057        7746 :                     ss->processes.push_back(
    1058           0 :                       [task_push]() -> const int*
    1059             :                       {
                                             // execute the task, then return the data pointer of its last socket
    1060           0 :                           task_push->exec();
    1061           0 :                           const int* status = task_push->sockets.back()->get_dataptr<const int>();
    1062           0 :                           return status;
    1063             :                       });
                                           // the push task gets the id following the last id of the sub-sequence
    1064        7746 :                     last_task_id = ss->tasks_id[ss->tasks_id.size() - 1] + 1;
    1065        7746 :                     ss->tasks_id.push_back(last_task_id);
    1066             :                 }
                                       // the only last task of the stage is now the push task (id computed for the last thread)
    1067         720 :                 this->stages[sta]->lasts_tasks_id.clear();
    1068         720 :                 this->stages[sta]->lasts_tasks_id.push_back(last_task_id);
    1069         720 :                 this->stages[sta]->n_tasks++;
    1070             :             }
    1071        1000 :             this->stages[sta]->update_firsts_and_lasts_tasks();
    1072             :         }
    1073             : 
    1074             :         // ------------------------------------------------------------------------------------------------------------
    1075             :         // ---------------------------------------------------------------------------------------------- bind adaptors
    1076             :         // ------------------------------------------------------------------------------------------------------------
                               // step 1: remove every original binding recorded in `sck_orphan_binds`
    1077        1411 :         for (auto& bind : this->sck_orphan_binds)
    1078             :         {
    1079        1131 :             auto sck_out = std::get<0>(bind.first);
    1080        1131 :             auto sck_in = std::get<0>(bind.second);
    1081        1131 :             if (sck_in != nullptr) // if socket to socket unbinding
    1082        1056 :                 sck_in->unbind(*sck_out);
    1083             :             else // if socket to task unbinding
    1084             :             {
    1085          75 :                 auto tsk_in = std::get<4>(bind.second);
    1086          75 :                 assert(tsk_in != nullptr);
    1087          75 :                 tsk_in->unbind(*sck_out);
    1088             :             }
    1089             :         }
    1090             : 
                               // step 2 (optional): apply every binding recorded in `adaptors_binds`, keeping its priority
    1091         280 :         if (bind_adaptors)
    1092             :         {
    1093       28432 :             for (auto& bind : this->adaptors_binds)
    1094             :             {
    1095       28152 :                 auto sck_out = std::get<0>(bind);
    1096       28152 :                 auto sck_in = std::get<1>(bind);
    1097       28152 :                 auto priority = std::get<2>(bind);
    1098       28152 :                 if (sck_in != nullptr) // if socket to socket binding
    1099       26373 :                     sck_in->_bind(*sck_out, priority);
    1100             :                 else // if socket to task binding
    1101             :                 {
    1102        1779 :                     auto tsk_in = std::get<3>(bind);
    1103        1779 :                     assert(tsk_in != nullptr);
    1104        1779 :                     tsk_in->_bind(*sck_out, priority);
    1105             :                 }
    1106             :             }
    1107             :         }
    1108             : 
    1109         280 :         this->bound_adaptors = true;
    1110             :     }
    1111         280 : }
    1112             : 
                       // Public entry point: forwards to `_unbind_adaptors()` with its `bind_orphans` argument set to
                       // true.
    1113             : void
    1114         187 : Pipeline::unbind_adaptors()
    1115             : {
    1116         187 :     this->_unbind_adaptors(true);
    1117         187 : }
    1118             : 
                       // Removes the adaptor "pull" / "push" tasks from every stage, restores the saved first / last task
                       // ids and unbinds every entry of `adaptors_binds`. Does nothing when `bound_adaptors` is false;
                       // sets it to false otherwise.
                       //
                       // @param bind_orphans  when true, the original bindings recorded in `sck_orphan_binds` are bound
                       //                      again (with their recorded priority) after the adaptors are unbound.
    1119             : void
    1120         373 : Pipeline::_unbind_adaptors(const bool bind_orphans)
    1121             : {
    1122         373 :     if (this->bound_adaptors)
    1123             :     {
    1124        1280 :         for (size_t sta = 0; sta < this->stages.size(); sta++)
    1125             :         {
    1126        1000 :             const auto n_threads = this->stages[sta]->get_n_threads();
    1127             : 
    1128             :             // --------------------------------------------------------------------------------------------------------
    1129             :             // ------------------------------------------------------------------------------------------- pull adaptor
    1130             :             // --------------------------------------------------------------------------------------------------------
    1131        1000 :             if (sta > 0)
    1132             :             {
    1133        8466 :                 for (size_t t = 0; t < n_threads; t++)
    1134             :                 {
                                           // same condition as the one used in `_bind_adaptors()` when the adaptor was pushed back
    1135        7746 :                     if (t > 0 || sta == this->stages.size() - 1) // remove the adaptor from the current stage
    1136        7294 :                         this->stages[sta]->all_modules[t].pop_back();
    1137             : 
                                           // drop the pull task and its process from the front of the sequence
    1138        7746 :                     auto ss = this->stages[sta]->sequences[t]->get_contents();
    1139        7746 :                     assert(ss != nullptr);
    1140        7746 :                     ss->tasks.erase(ss->tasks.begin());
    1141        7746 :                     ss->processes.erase(ss->processes.begin());
    1142        7746 :                     this->stages[sta]->update_tasks_id(t);
    1143             :                 }
                                       // restore the first task ids from `saved_firsts_tasks_id`
    1144         720 :                 this->stages[sta]->firsts_tasks_id = this->saved_firsts_tasks_id[sta];
    1145         720 :                 this->stages[sta]->n_tasks--;
    1146             :             }
    1147             : 
    1148             :             // --------------------------------------------------------------------------------------------------------
    1149             :             // ------------------------------------------------------------------------------------------- push adaptor
    1150             :             // --------------------------------------------------------------------------------------------------------
    1151        1000 :             if (sta < this->stages.size() - 1)
    1152             :             {
    1153        8466 :                 for (size_t t = 0; t < n_threads; t++)
    1154             :                 {
    1155             :                     // remove the adaptor from the current stage
    1156        7746 :                     this->stages[sta]->all_modules[t].pop_back();
    1157             : 
                                           // drop the push task, its process and its id from the back of the last sub-sequence
    1158        7746 :                     auto ss = this->stages[sta]->get_last_subsequence(t);
    1159        7746 :                     assert(ss != nullptr);
    1160        7746 :                     ss->tasks.pop_back();
    1161        7746 :                     ss->processes.pop_back();
    1162        7746 :                     ss->tasks_id.pop_back();
    1163             :                 }
                                       // restore the last task ids from `saved_lasts_tasks_id`
    1164         720 :                 this->stages[sta]->lasts_tasks_id = this->saved_lasts_tasks_id[sta];
    1165         720 :                 this->stages[sta]->n_tasks--;
    1166             :             }
    1167        1000 :             this->stages[sta]->update_firsts_and_lasts_tasks();
    1168             :         }
    1169             : 
    1170             :         // ------------------------------------------------------------------------------------------------------------
    1171             :         // -------------------------------------------------------------------------------------------- unbind adaptors
    1172             :         // ------------------------------------------------------------------------------------------------------------
                               // step 1: unbind every entry of `adaptors_binds`
    1173       28432 :         for (auto& bind : this->adaptors_binds)
    1174             :         {
    1175       28152 :             auto sck_out = std::get<0>(bind);
    1176       28152 :             auto sck_in = std::get<1>(bind);
    1177       28152 :             if (sck_in != nullptr) // if socket to socket unbinding
    1178       26373 :                 sck_in->unbind(*sck_out);
    1179             :             else // if socket to task unbinding
    1180             :             {
    1181        1779 :                 auto tsk_in = std::get<3>(bind);
    1182        1779 :                 assert(tsk_in != nullptr);
    1183        1779 :                 tsk_in->unbind(*sck_out);
    1184             :             }
    1185             :         }
    1186             : 
                               // step 2 (optional): bind again the original bindings recorded in `sck_orphan_binds`
    1187         280 :         if (bind_orphans)
    1188             :         {
    1189         473 :             for (auto& bind : this->sck_orphan_binds)
    1190             :             {
    1191         379 :                 auto sck_out = std::get<0>(bind.first);
    1192         379 :                 auto priority = std::get<4>(bind.first);
    1193         379 :                 auto sck_in = std::get<0>(bind.second);
    1194         379 :                 if (sck_in != nullptr) // if socket to socket binding
    1195         354 :                     sck_in->_bind(*sck_out, priority);
    1196             :                 else // if socket to task binding
    1197             :                 {
    1198          25 :                     auto tsk_in = std::get<4>(bind.second);
    1199          25 :                     assert(tsk_in != nullptr);
    1200          25 :                     tsk_in->_bind(*sck_out, priority);
    1201             :                 }
    1202             :             }
    1203             :         }
    1204             : 
    1205         280 :         this->bound_adaptors = false;
    1206             :     }
    1207         373 : }
    1208             : 
                       // Executes the pipeline with stop conditions that receive a `std::vector<const int*>`.
                       // A worker function is given to the thread pool; it treats its `tid` argument as a stage index,
                       // runs that stage and then cancels the waiting of the "pull" adaptor tasks of the following stage.
                       // The last stage is executed by a direct call from this function.
                       //
                       // @param stop_conditions  either one condition per stage or a single condition. The last element
                       //                         is always given to the last stage. The other stages get their own
                       //                         condition only when one condition per stage is supplied; otherwise
                       //                         they are run through the argument-less `exec()`.
                       // @throws tools::invalid_argument  if `stop_conditions.size()` is neither `stages.size()` nor 1.
                       // @throws tools::runtime_error     if `bound_adaptors` is false.
                       //
                       // NOTE(review): `stages.size() - 1` is computed on an unsigned type, so this code assumes the
                       // pipeline holds at least one stage -- confirm against the constructor.
    1209             : void
    1210           0 : Pipeline::exec(const std::vector<std::function<bool(const std::vector<const int*>&)>>& stop_conditions)
    1211             : {
    1212           0 :     if (stop_conditions.size() != this->stages.size() && stop_conditions.size() != 1)
    1213             :     {
    1214           0 :         std::stringstream message;
    1215           0 :         message << "'stop_conditions.size()' has to be equal to 'stages.size()' or to 1 ('stop_conditions.size()' = "
    1216           0 :                 << stop_conditions.size() << ", 'stages.size()' = " << stages.size() << ").";
    1217           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
    1218           0 :     }
    1219             : 
    1220           0 :     if (!this->bound_adaptors)
    1221             :     {
    1222           0 :         std::stringstream message;
    1223           0 :         message << "'bound_adaptors' has to be true to execute the pipeline.";
    1224           0 :         throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
    1225           0 :     }
    1226             : 
    1227             :     // ----------------------------------------------------------------------------------------------------------------
    1228           0 :     auto& stages = this->stages;
                           // one slot per stage except the last one; a null slot means "no stop condition for this stage"
    1229           0 :     std::vector<const std::function<bool(const std::vector<const int*>&)>*> stop_condition_vec(stages.size() - 1,
    1230           0 :                                                                                                nullptr);
    1231           0 :     if (stop_conditions.size() == stages.size())
    1232           0 :         for (size_t s = 0; s < stages.size() - 1; s++)
    1233           0 :             stop_condition_vec[s] = &stop_conditions[s];
    1234             : 
                           // the lambda captures `stages` and `stop_condition_vec` by reference
    1235           0 :     std::function<void(const size_t)> func_exec = [&stages, &stop_condition_vec](const size_t tid)
    1236             :     {
    1237           0 :         size_t s = tid;
    1238           0 :         if (stop_condition_vec[s])
    1239           0 :             stages[s]->exec(*(stop_condition_vec[s]));
    1240             :         else
    1241           0 :             stages[s]->exec();
    1242             : 
    1243             :         // send the signal to stop the next stage
                               // (only tasks named "pull" whose module is an `Adaptor_m_to_n` are concerned)
    1244           0 :         const auto& tasks = stages[s + 1]->get_tasks_per_threads();
    1245           0 :         for (size_t th = 0; th < tasks.size(); th++)
    1246           0 :             for (size_t ta = 0; ta < tasks[th].size(); ta++)
    1247             :             {
    1248           0 :                 auto m = dynamic_cast<module::Adaptor_m_to_n*>(&tasks[th][ta]->get_module());
    1249           0 :                 if (m != nullptr)
    1250           0 :                     if (tasks[th][ta]->get_name().find("pull") != std::string::npos) m->cancel_waiting();
    1251             :             }
    1252           0 :     };
    1253             : 
                           // NOTE(review): the meaning of the second argument of `run()` is not visible here -- see the
                           // thread pool interface
    1254           0 :     this->thread_pool->run(func_exec, true);
    1255             : 
                           // the last stage always receives the last stop condition
    1256           0 :     stages[stages.size() - 1]->exec(stop_conditions[stop_conditions.size() - 1]);
    1257             : 
    1258             :     // stop all the stages before
    1259           0 :     for (size_t notify_s = 0; notify_s < stages.size() - 1; notify_s++)
    1260           0 :         for (auto& m : stages[notify_s]->get_modules<tools::Interface_waiting>())
    1261           0 :             m->cancel_waiting();
    1262             : 
    1263           0 :     this->thread_pool->wait();
    1264           0 :     this->thread_pool->unset_func_exec();
    1265             :     // ----------------------------------------------------------------------------------------------------------------
    1266             : 
    1267             :     // this is NOT made in the tools::Sequence::exec() to correctly flush the pipeline before restoring buffers
    1268             :     // initial configuration
    1269           0 :     for (auto& stage : this->stages)
    1270           0 :         if (stage->is_no_copy_mode())
    1271             :         {
    1272           0 :             stage->reset_no_copy_mode();
    1273           0 :             stage->gen_processes(false);
    1274             :         }
    1275             : 
                           // finally reset every adaptor, both the `.first` and the `.second` ones of each pair
    1276           0 :     for (auto& padps : this->adaptors)
    1277             :     {
    1278           0 :         for (auto& adp : padps.first)
    1279           0 :             adp->reset();
    1280           0 :         for (auto& adp : padps.second)
    1281           0 :             adp->reset();
    1282             :     }
    1283           0 : }
    1284             : 
                       // Executes the pipeline with argument-less stop conditions.
                       // A worker function is given to the thread pool; it treats its `tid` argument as a stage index,
                       // runs that stage and then cancels the waiting of the "pull" adaptor tasks of the following stage.
                       // The last stage is executed by a direct call from this function.
                       //
                       // @param stop_conditions  either one condition per stage or a single condition. The last element
                       //                         is always given to the last stage. The other stages get their own
                       //                         condition only when one condition per stage is supplied; otherwise
                       //                         they are run through the argument-less `exec()`.
                       // @throws tools::invalid_argument  if `stop_conditions.size()` is neither `stages.size()` nor 1.
                       // @throws tools::runtime_error     if `bound_adaptors` is false.
                       //
                       // NOTE(review): `stages.size() - 1` is computed on an unsigned type, so this code assumes the
                       // pipeline holds at least one stage -- confirm against the constructor.
    1285             : void
    1286          94 : Pipeline::exec(const std::vector<std::function<bool()>>& stop_conditions)
    1287             : {
    1288          94 :     if (stop_conditions.size() != this->stages.size() && stop_conditions.size() != 1)
    1289             :     {
    1290           0 :         std::stringstream message;
    1291           0 :         message << "'stop_conditions.size()' has to be equal to 'stages.size()' or to 1 ('stop_conditions.size()' = "
    1292           0 :                 << stop_conditions.size() << ", 'stages.size()' = " << stages.size() << ").";
    1293           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
    1294           0 :     }
    1295             : 
    1296          94 :     if (!this->bound_adaptors)
    1297             :     {
    1298           0 :         std::stringstream message;
    1299           0 :         message << "'bound_adaptors' has to be true to execute the pipeline.";
    1300           0 :         throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
    1301           0 :     }
    1302             : 
    1303             :     // ----------------------------------------------------------------------------------------------------------------
    1304          94 :     auto& stages = this->stages;
                           // one slot per stage except the last one; a null slot means "no stop condition for this stage"
    1305          94 :     std::vector<const std::function<bool()>*> stop_condition_vec(stages.size() - 1, nullptr);
    1306          94 :     if (stop_conditions.size() == stages.size())
    1307           4 :         for (size_t s = 0; s < stages.size() - 1; s++)
    1308           0 :             stop_condition_vec[s] = &stop_conditions[s];
    1309             : 
                           // the lambda captures `stages` and `stop_condition_vec` by reference
    1310         709 :     std::function<void(const size_t)> func_exec = [&stages, &stop_condition_vec](const size_t tid)
    1311             :     {
    1312         238 :         size_t s = tid;
    1313         238 :         if (stop_condition_vec[s])
    1314           0 :             stages[s]->exec(*(stop_condition_vec[s]));
    1315             :         else
    1316         239 :             stages[s]->exec();
    1317             : 
    1318             :         // send the signal to stop the next stage
                               // (only tasks named "pull" whose module is an `Adaptor_m_to_n` are concerned)
    1319         232 :         const auto& tasks = stages[s + 1]->get_tasks_per_threads();
    1320        2819 :         for (size_t th = 0; th < tasks.size(); th++)
    1321       19598 :             for (size_t ta = 0; ta < tasks[th].size(); ta++)
    1322             :             {
    1323       17006 :                 auto m = dynamic_cast<module::Adaptor_m_to_n*>(&tasks[th][ta]->get_module());
    1324       17010 :                 if (m != nullptr)
    1325        5068 :                     if (tasks[th][ta]->get_name().find("pull") != std::string::npos) m->cancel_waiting();
    1326             :             }
    1327         333 :     };
    1328             : 
                           // NOTE(review): the meaning of the second argument of `run()` is not visible here -- see the
                           // thread pool interface
    1329          94 :     this->thread_pool->run(func_exec, true);
                           // the last stage always receives the last stop condition
    1330          94 :     stages[stages.size() - 1]->exec(stop_conditions[stop_conditions.size() - 1]);
    1331             : 
    1332             :     // stop all the stages before
    1333         336 :     for (size_t notify_s = 0; notify_s < stages.size() - 1; notify_s++)
    1334        7510 :         for (auto& m : stages[notify_s]->get_modules<tools::Interface_waiting>())
    1335        7510 :             m->cancel_waiting();
    1336             : 
    1337          94 :     this->thread_pool->wait();
    1338          94 :     this->thread_pool->unset_func_exec();
    1339             :     // ----------------------------------------------------------------------------------------------------------------
    1340             : 
    1341             :     // this is NOT made in the tools::Sequence::exec() to correctly flush the pipeline before restoring buffers
    1342             :     // initial configuration
    1343         430 :     for (auto& stage : this->stages)
    1344         336 :         if (stage->is_no_copy_mode())
    1345             :         {
    1346         336 :             stage->reset_no_copy_mode();
    1347         336 :             stage->gen_processes(false);
    1348             :         }
    1349             : 
                           // finally reset every adaptor, both the `.first` and the `.second` ones of each pair
    1350         336 :     for (auto& padps : this->adaptors)
    1351             :     {
    1352        2826 :         for (auto& adp : padps.first)
    1353        2584 :             adp->reset();
    1354        2584 :         for (auto& adp : padps.second)
    1355        2342 :             adp->reset();
    1356             :     }
    1357          94 : }
    1358             : 
                       // Convenience overload: wraps the single `stop_condition` into a one-element vector and forwards
                       // it to the vector-based `exec()` overload taking conditions of the same signature.
    1359             : void
    1360           0 : Pipeline::exec(std::function<bool(const std::vector<const int*>&)> stop_condition)
    1361             : {
    1362           0 :     this->exec(std::vector<std::function<bool(const std::vector<const int*>&)>>(1, stop_condition));
    1363           0 : }
    1364             : 
    1365             : void
    1366          94 : Pipeline::exec(std::function<bool()> stop_condition)
    1367             : {
    1368          94 :     this->exec(std::vector<std::function<bool()>>(1, stop_condition));
    1369          94 : }
    1370             : 
    1371             : void
    1372          19 : Pipeline::exec()
    1373             : {
    1374        6942 :     this->exec([]() { return false; });
    1375          19 : }
    1376             : 
    1377             : std::vector<std::vector<module::Module*>>
    1378           0 : Pipeline::get_modules_per_threads() const
    1379             : {
    1380           0 :     std::vector<std::vector<module::Module*>> modules_per_threads;
    1381           0 :     for (auto& stage : this->stages)
    1382             :     {
    1383           0 :         auto modules_per_threads_add = stage->get_modules_per_threads();
    1384           0 :         if (modules_per_threads_add.size() > modules_per_threads.size())
    1385           0 :             modules_per_threads.resize(modules_per_threads_add.size());
    1386             : 
    1387           0 :         for (size_t t = 0; t < modules_per_threads_add.size(); t++)
    1388           0 :             modules_per_threads[t].insert(
    1389           0 :               modules_per_threads[t].end(), modules_per_threads_add[t].begin(), modules_per_threads_add[t].end());
    1390           0 :     }
    1391           0 :     return modules_per_threads;
    1392           0 : }
    1393             : 
    1394             : std::vector<std::vector<module::Module*>>
    1395           0 : Pipeline::get_modules_per_types() const
    1396             : {
    1397           0 :     std::vector<std::vector<module::Module*>> modules_per_types;
    1398           0 :     for (auto& stage : this->stages)
    1399             :     {
    1400           0 :         auto modules_per_types_add = stage->get_modules_per_types();
    1401           0 :         modules_per_types.insert(modules_per_types.end(), modules_per_types_add.begin(), modules_per_types_add.end());
    1402           0 :     }
    1403           0 :     return modules_per_types;
    1404           0 : }
    1405             : 
    1406             : std::vector<std::vector<module::Module*>>
    1407           0 : Pipeline::get_original_modules() const
    1408             : {
    1409           0 :     return this->original_sequence.get_modules_per_types();
    1410             : }
    1411             : 
    1412             : std::vector<std::vector<runtime::Task*>>
    1413          94 : Pipeline::get_tasks_per_threads() const
    1414             : {
    1415          94 :     std::vector<std::vector<runtime::Task*>> tasks_per_threads;
    1416         430 :     for (auto& stage : this->stages)
    1417             :     {
    1418         336 :         auto tasks_per_threads_add = stage->get_tasks_per_threads();
    1419         336 :         if (tasks_per_threads_add.size() > tasks_per_threads.size())
    1420         182 :             tasks_per_threads.resize(tasks_per_threads_add.size());
    1421             : 
    1422        3017 :         for (size_t t = 0; t < tasks_per_threads_add.size(); t++)
    1423       10724 :             tasks_per_threads[t].insert(
    1424       10724 :               tasks_per_threads[t].end(), tasks_per_threads_add[t].begin(), tasks_per_threads_add[t].end());
    1425         336 :     }
    1426          94 :     return tasks_per_threads;
    1427           0 : }
    1428             : 
    1429             : std::vector<std::vector<runtime::Task*>>
    1430           0 : Pipeline::get_tasks_per_types() const
    1431             : {
    1432           0 :     std::vector<std::vector<runtime::Task*>> tasks_per_types;
    1433           0 :     for (auto& stage : this->stages)
    1434             :     {
    1435           0 :         auto tasks_per_types_add = stage->get_tasks_per_types();
    1436           0 :         tasks_per_types.insert(tasks_per_types.end(), tasks_per_types_add.begin(), tasks_per_types_add.end());
    1437           0 :     }
    1438           0 :     return tasks_per_types;
    1439           0 : }
    1440             : 
    1441             : void
    1442           6 : Pipeline::export_dot(std::ostream& stream) const
    1443             : {
    1444             :     std::function<void(tools::Digraph_node<Sub_sequence>*,
    1445             :                        const size_t,
    1446             :                        const std::string&,
    1447             :                        std::ostream&,
    1448             :                        std::vector<tools::Digraph_node<Sub_sequence>*>&)>
    1449          25 :       export_dot_subsequences_recursive = [&export_dot_subsequences_recursive,
    1450             :                                            this](tools::Digraph_node<Sub_sequence>* cur_node,
    1451             :                                                  const size_t sta,
    1452             :                                                  const std::string& tab,
    1453             :                                                  std::ostream& stream,
    1454          25 :                                                  std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
    1455             :     {
    1456          50 :         if (cur_node != nullptr &&
    1457          50 :             std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
    1458             :         {
    1459          25 :             already_parsed_nodes.push_back(cur_node);
    1460          50 :             this->stages[sta]->export_dot_subsequence(cur_node->get_c()->tasks,
    1461          25 :                                                       cur_node->get_c()->tasks_id,
    1462          25 :                                                       cur_node->get_c()->type,
    1463          50 :                                                       "Sub-sequence " + std::to_string(cur_node->get_c()->id) +
    1464         100 :                                                         " (depth = " + std::to_string(cur_node->get_depth()) + ")",
    1465             :                                                       tab,
    1466             :                                                       stream);
    1467             : 
    1468          25 :             for (auto c : cur_node->get_children())
    1469           0 :                 export_dot_subsequences_recursive(c, sta, tab, stream, already_parsed_nodes);
    1470             :         }
    1471          31 :     };
    1472             : 
    1473             :     std::function<void(tools::Digraph_node<Sub_sequence>*,
    1474             :                        const size_t,
    1475             :                        const std::string&,
    1476             :                        std::ostream&,
    1477             :                        std::vector<tools::Digraph_node<Sub_sequence>*>&)>
    1478             :       export_dot_connections_recursive =
    1479          25 :         [&export_dot_connections_recursive, this](tools::Digraph_node<Sub_sequence>* cur_node,
    1480             :                                                   const size_t sta,
    1481             :                                                   const std::string& tab,
    1482             :                                                   std::ostream& stream,
    1483          25 :                                                   std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
    1484             :     {
    1485          50 :         if (cur_node != nullptr &&
    1486          50 :             std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
    1487             :         {
    1488          25 :             already_parsed_nodes.push_back(cur_node);
    1489          25 :             this->stages[sta]->export_dot_connections(cur_node->get_c()->tasks, tab, stream);
    1490             : 
    1491          25 :             for (auto c : cur_node->get_children())
    1492           0 :                 export_dot_connections_recursive(c, sta, tab, stream, already_parsed_nodes);
    1493             :         }
    1494          31 :     };
    1495             : 
    1496           6 :     std::string tab = "\t";
    1497           6 :     stream << "digraph Pipeline {" << std::endl;
    1498           6 :     stream << tab << "compound=true;" << std::endl;
    1499             : 
    1500          31 :     for (size_t sta = 0; sta < this->stages.size(); sta++)
    1501             :     {
    1502          25 :         std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
    1503          25 :         const auto n_threads = this->stages[sta]->get_n_threads();
    1504          25 :         stream << tab << "subgraph \"cluster_Stage " << sta << "\" {" << std::endl;
    1505          25 :         stream << tab << tab << "node [style=filled];" << std::endl;
    1506          25 :         export_dot_subsequences_recursive(this->stages[sta]->sequences[0], sta, tab, stream, already_parsed_nodes);
    1507          25 :         stream << tab << tab << "label=\"Pipeline stage " << sta << " (" << n_threads << " thread(s))\";" << std::endl;
    1508          25 :         std::string color = "blue";
    1509          25 :         stream << tab << tab << "color=" << color << ";" << std::endl;
    1510          25 :         stream << tab << "}" << std::endl;
    1511          25 :     }
    1512             : 
    1513          31 :     for (size_t sta = 0; sta < this->stages.size(); sta++)
    1514             :     {
    1515          25 :         std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
    1516          25 :         export_dot_connections_recursive(this->stages[sta]->sequences[0], sta, tab, stream, already_parsed_nodes);
    1517          25 :         if (this->bound_adaptors)
    1518             :         {
    1519          25 :             if (sta > 0)
    1520             :             {
    1521          19 :                 auto tsk1 = this->stages[sta - 1]->get_lasts_tasks()[0].back();
    1522          19 :                 auto tsk2 = this->stages[sta + 0]->get_firsts_tasks()[0][0];
    1523             : 
    1524          19 :                 auto sck1 = tsk1->sockets[0];
    1525          19 :                 auto sck2 = tsk2->sockets[0];
    1526             : 
    1527          19 :                 stream << tab << "\"" << +sck1.get() << "\" -> \"" << +sck2.get() << "\" [ltail=\"cluster_"
    1528          19 :                        << +&tsk1->get_module() << "_" << +tsk1 << "\" lhead=\"cluster_" << +&tsk2->get_module() << "_"
    1529          19 :                        << +tsk2 << "\" color=\"green\" style=\"dashed\"];" << std::endl;
    1530          19 :             }
    1531             :         }
    1532          25 :     }
    1533             : 
    1534           6 :     stream << "}" << std::endl;
    1535           6 : }
    1536             : 
    1537             : bool
    1538         186 : Pipeline::is_bound_adaptors() const
    1539             : {
    1540         186 :     return this->bound_adaptors;
    1541             : }
    1542             : 
    1543             : void
    1544           0 : Pipeline::set_auto_stop(const bool auto_stop)
    1545             : {
    1546           0 :     this->auto_stop = auto_stop;
    1547           0 :     for (auto stage : this->stages)
    1548           0 :         stage->set_auto_stop(auto_stop);
    1549           0 : }
    1550             : 
    1551             : bool
    1552           0 : Pipeline::is_auto_stop() const
    1553             : {
    1554           0 :     return this->auto_stop;
    1555             : }
    1556             : 
    1557             : size_t
    1558           0 : Pipeline::get_n_frames() const
    1559             : {
    1560           0 :     const auto n_frames = this->stages[0]->get_n_frames();
    1561             : 
    1562           0 :     for (auto& sta : this->stages)
    1563           0 :         if (sta->get_n_frames() != n_frames)
    1564             :         {
    1565           0 :             std::stringstream message;
    1566           0 :             message << "All the stages/sequences do not have the same 'n_frames' value ('sta->get_n_frames()' = "
    1567           0 :                     << sta->get_n_frames() << ", 'n_frames' = " << n_frames << ").";
    1568           0 :             throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
    1569           0 :         }
    1570             : 
    1571           0 :     return n_frames;
    1572             : }
    1573             : 
    1574             : void
    1575         186 : Pipeline::set_n_frames(const size_t n_frames)
    1576             : {
    1577         186 :     const auto save_bound_adaptors = this->is_bound_adaptors();
    1578         186 :     if (!save_bound_adaptors) this->bind_adaptors();
    1579         186 :     this->_unbind_adaptors(false);
    1580             : 
    1581             :     // set the new "n_frames" val in the sequences
    1582         186 :     std::vector<std::vector<std::pair<runtime::Socket*, runtime::Socket*>>> unbind_sockets(this->stages.size());
    1583         186 :     std::vector<std::vector<std::pair<runtime::Task*, runtime::Socket*>>> unbind_tasks(this->stages.size());
    1584         186 :     std::vector<bool> skip(this->stages.size());
    1585         850 :     for (size_t s = 0; s < this->stages.size(); s++)
    1586         664 :         skip[s] = this->stages[s]->get_n_frames() == n_frames;
    1587         850 :     for (size_t s = 0; s < this->stages.size(); s++)
    1588         664 :         if (!skip[s]) this->stages[s]->_set_n_frames_unbind(unbind_sockets[s], unbind_tasks[s]);
    1589         850 :     for (size_t s = 0; s < this->stages.size(); s++)
    1590         664 :         if (!skip[s]) this->stages[s]->_set_n_frames(n_frames);
    1591         850 :     for (size_t s = 0; s < this->stages.size(); s++)
    1592         664 :         if (!skip[s]) this->stages[s]->_set_n_frames_rebind(unbind_sockets[s], unbind_tasks[s]);
    1593             : 
    1594             :     // set the new "n_frames" val in the adaptors
    1595         664 :     for (auto& adps : this->adaptors)
    1596             :     {
    1597        5640 :         for (auto& adp : adps.first)
    1598        5162 :             adp->set_n_frames(n_frames);
    1599        5162 :         for (auto& adp : adps.second)
    1600        4684 :             adp->set_n_frames(n_frames);
    1601             :     }
    1602             : 
    1603             :     // bind orphans to complete the unbind of the adaptors
    1604         938 :     for (auto& bind : this->sck_orphan_binds)
    1605             :     {
    1606         752 :         auto sck_out = std::get<0>(bind.first);
    1607         752 :         auto priority = std::get<4>(bind.first);
    1608         752 :         auto sck_in = std::get<0>(bind.second);
    1609         752 :         if (sck_in != nullptr)
    1610         702 :             sck_in->_bind(*sck_out, priority);
    1611             :         else
    1612             :         {
    1613          50 :             auto tsk_in = std::get<4>(bind.second);
    1614          50 :             assert(tsk_in != nullptr);
    1615          50 :             tsk_in->_bind(*sck_out, priority);
    1616             :         }
    1617             :     }
    1618             : 
    1619         186 :     if (save_bound_adaptors) this->bind_adaptors();
    1620         186 : }

Generated by: LCOV version 1.14