LCOV - code coverage report
Current view: top level - src/Runtime/Pipeline - Pipeline.cpp (source / functions) Hit Total Coverage
Test: streampu_clean.info Lines: 549 759 72.3 %
Date: 2025-01-11 12:25:42 Functions: 30 49 61.2 %

          Line data    Source code
       1             : #include <cassert>
       2             : #include <fstream>
       3             : #include <thread>
       4             : #include <tuple>
       5             : #include <utility>
       6             : 
       7             : #include "Module/Stateful/Adaptor/Adaptor_1_to_n.hpp"
       8             : #include "Module/Stateful/Adaptor/Adaptor_n_to_1.hpp"
       9             : #include "Runtime/Pipeline/Pipeline.hpp"
      10             : #include "Tools/Exception/exception.hpp"
      11             : #include "Tools/Interface/Interface_waiting.hpp"
      12             : #include "Tools/Thread/Thread_pinning/Thread_pinning_utils.hpp"
      13             : #include "Tools/Thread/Thread_pool/Standard/Thread_pool_standard.hpp"
      14             : 
      15             : using namespace spu;
      16             : using namespace spu::runtime;
      17             : 
      18             : // Pipeline
      19             : // ::Pipeline(const runtime::Task &first,
      20             : //            const runtime::Task &last,
      21             : //            const std::vector<std::pair<std::vector<const runtime::Task*>, std::vector<const runtime::Task*>>>
      22             : //            &sep_stages, const std::vector<size_t> &n_threads, const std::vector<size_t> &synchro_buffer_sizes, const
      23             : //            std::vector<bool> &synchro_active_waiting, const std::vector<bool> &thread_pinning, const
      24             : //            std::vector<std::vector<size_t>> &puids)
      25             : // : original_sequence(first, last, 1),
      26             : //   stages(sep_stages.size()),
      27             : //   adaptors(sep_stages.size() -1),
      28             : //   saved_firsts_tasks_id(sep_stages.size()),
      29             : //   saved_lasts_tasks_id(sep_stages.size()),
      30             : //   bound_adaptors(false)
      31             : // {
      32             : //      this->init<const runtime::Task>(first,
      33             : //                                      &last,
      34             : //                                      sep_stages,
      35             : //                                      n_threads,
      36             : //                                      synchro_buffer_sizes,
      37             : //                                      synchro_active_waiting,
      38             : //                                      thread_pinning,
      39             : //                                      puids);
      40             : // }
      41             : 
      42             : // Pipeline
      43             : // ::Pipeline(const runtime::Task &first,
      44             : //            const std::vector<std::pair<std::vector<const runtime::Task*>, std::vector<const runtime::Task*>>>
      45             : //            &sep_stages, const std::vector<size_t> &n_threads, const std::vector<size_t> &synchro_buffer_sizes, const
      46             : //            std::vector<bool> &synchro_active_waiting, const std::vector<bool> &thread_pinning, const
      47             : //            std::vector<std::vector<size_t>> &puids)
      48             : // : original_sequence(first, 1),
      49             : //   stages(sep_stages.size()),
      50             : //   adaptors(sep_stages.size() -1),
      51             : //   saved_firsts_tasks_id(sep_stages.size()),
      52             : //   saved_lasts_tasks_id(sep_stages.size()),
      53             : //   bound_adaptors(false)
      54             : // {
      55             : //      const runtime::Task* last = nullptr;
      56             : //      this->init<const runtime::Task>(first,
      57             : //                                      last,
      58             : //                                      sep_stages,
      59             : //                                      n_threads,
      60             : //                                      synchro_buffer_sizes,
      61             : //                                      synchro_active_waiting,
      62             : //                                      thread_pinning,
      63             : //                                      puids);
      64             : // }
      65             : 
      66          13 : Pipeline
      67             : ::Pipeline(const std::vector<runtime::Task*> &firsts,
      68             :            const std::vector<runtime::Task*> &lasts,
      69             :            const std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
      70             :            const std::vector<size_t> &n_threads,
      71             :            const std::vector<size_t> &synchro_buffer_sizes,
      72             :            const std::vector<bool> &synchro_active_waiting,
      73             :            const std::vector<bool> &thread_pinning,
      74             :            const std::vector<std::vector<size_t>> &puids/*,
      75          13 :            const std::vector<bool> &tasks_inplace*/)
      76          13 : : original_sequence(firsts, lasts, 1),
      77          13 :   stages(sep_stages.size()),
      78          13 :   adaptors(sep_stages.size() -1),
      79          13 :   saved_firsts_tasks_id(sep_stages.size()),
      80          13 :   saved_lasts_tasks_id(sep_stages.size()),
      81          13 :   bound_adaptors(false),
      82          39 :   auto_stop(true)
      83             : {
      84          13 :     this->init<runtime::Task>(
      85             :       firsts, lasts, sep_stages, n_threads, synchro_buffer_sizes, synchro_active_waiting, thread_pinning, puids
      86             :       /*, tasks_inplace*/);
      87          13 : }
      88             : 
      89          26 : Pipeline
      90             : ::Pipeline(const std::vector<runtime::Task*> &firsts,
      91             :            const std::vector<runtime::Task*> &lasts,
      92             :            const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
      93             :            const std::vector<size_t> &n_threads,
      94             :            const std::vector<size_t> &synchro_buffer_sizes,
      95             :            const std::vector<bool> &synchro_active_waiting,
      96             :            const std::vector<bool> &thread_pinning,
      97             :            const std::vector<std::vector<size_t>> &puids/*,
      98          26 :            const std::vector<bool> &tasks_inplace*/)
      99          26 : : original_sequence(firsts, lasts, 1),
     100          26 :   stages(sep_stages.size()),
     101          26 :   adaptors(sep_stages.size() -1),
     102          26 :   saved_firsts_tasks_id(sep_stages.size()),
     103          26 :   saved_lasts_tasks_id(sep_stages.size()),
     104          26 :   bound_adaptors(false),
     105          78 :   auto_stop(true)
     106             : {
     107             :     std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>>
     108          26 :       sep_stages_bis;
     109         117 :     for (auto& sep_stage : sep_stages)
     110          91 :         sep_stages_bis.push_back(std::make_tuple(sep_stage.first, sep_stage.second, std::vector<runtime::Task*>()));
     111             : 
     112          26 :     this->init<runtime::Task>(firsts,
     113             :                                   lasts,
     114             :                                   sep_stages_bis,
     115             :                                   n_threads,
     116             :                                   synchro_buffer_sizes,
     117             :                                   synchro_active_waiting,
     118             :                                   thread_pinning, puids/*,
     119             :                                   tasks_inplace*/);
     120          26 : }
     121             : 
     122          13 : Pipeline
     123             : ::Pipeline(const std::vector<runtime::Task*> &firsts,
     124             :            const std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     125             :            const std::vector<size_t> &n_threads,
     126             :            const std::vector<size_t> &synchro_buffer_sizes,
     127             :            const std::vector<bool> &synchro_active_waiting,
     128             :            const std::vector<bool> &thread_pinning,
     129             :            const std::vector<std::vector<size_t>> &puids/*,
     130          13 :            const std::vector<bool> &tasks_inplace*/)
     131             : : Pipeline(firsts,
     132             :            {},
     133             :            sep_stages,
     134             :            n_threads,
     135             :            synchro_buffer_sizes,
     136             :            synchro_active_waiting,
     137             :            thread_pinning, puids/*,
     138          13 :            tasks_inplace*/)
     139             : {
     140          13 : }
     141             : 
     142          26 : Pipeline
     143             : ::Pipeline(const std::vector<runtime::Task*> &firsts,
     144             :            const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     145             :            const std::vector<size_t> &n_threads,
     146             :            const std::vector<size_t> &synchro_buffer_sizes,
     147             :            const std::vector<bool> &synchro_active_waiting,
     148             :            const std::vector<bool> &thread_pinning,
     149             :            const std::vector<std::vector<size_t>> &puids/*,
     150          26 :            const std::vector<bool> &tasks_inplace*/)
     151             : : Pipeline(firsts,
     152             :            {},
     153             :            sep_stages,
     154             :            n_threads,
     155             :            synchro_buffer_sizes,
     156             :            synchro_active_waiting,
     157             :            thread_pinning, puids/*,
     158          26 :            tasks_inplace*/)
     159             : {
     160          26 : }
     161             : 
     162           0 : Pipeline
     163             : ::Pipeline(runtime::Task &first,
     164             :            runtime::Task &last,
     165             :            const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     166             :            const std::vector<size_t> &n_threads,
     167             :            const std::vector<size_t> &synchro_buffer_sizes,
     168             :            const std::vector<bool> &synchro_active_waiting,
     169             :            const std::vector<bool> &thread_pinning,
     170             :            const std::vector<std::vector<size_t>> &puids/*,
     171           0 :            const std::vector<bool> &tasks_inplace*/)
     172             : : Pipeline({&first},
     173             :            {&last},
     174             :            sep_stages,
     175             :            n_threads,
     176             :            synchro_buffer_sizes,
     177             :            synchro_active_waiting,
     178             :            thread_pinning, puids/*,
     179           0 :            tasks_inplace*/)
     180             : {
     181           0 : }
     182             : 
     183          26 : Pipeline
     184             : ::Pipeline(runtime::Task &first,
     185             :            const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     186             :            const std::vector<size_t> &n_threads,
     187             :            const std::vector<size_t> &synchro_buffer_sizes,
     188             :            const std::vector<bool> &synchro_active_waiting,
     189             :            const std::vector<bool> &thread_pinning,
     190             :            const std::vector<std::vector<size_t>> &puids/*,
     191          26 :            const std::vector<bool> &tasks_inplace*/)
     192             : : Pipeline({&first},
     193             :            sep_stages,
     194             :            n_threads,
     195             :            synchro_buffer_sizes,
     196             :            synchro_active_waiting,
     197             :            thread_pinning, puids/*,
     198          26 :            tasks_inplace*/)
     199             : {
     200          26 : }
     201             : 
     202             : //========================== Pipeline constructors with new version thread pinning =====================================
     203           0 : Pipeline
     204             : ::Pipeline(const std::vector<runtime::Task*> &firsts,
     205             :            const std::vector<runtime::Task*> &lasts,
     206             :            const std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     207             :            const std::vector<size_t> &n_threads,
     208             :            const std::vector<size_t> &synchro_buffer_sizes,
     209             :            const std::vector<bool> &synchro_active_waiting,
     210             :            const std::vector<bool> &thread_pinning,
     211             :            const std::string &pipeline_pinning_policy/*,
     212           0 :            const std::vector<bool> &tasks_inplace*/)
     213           0 : : original_sequence(firsts, lasts, 1),
     214           0 :   stages(sep_stages.size()),
     215           0 :   adaptors(sep_stages.size() -1),
     216           0 :   saved_firsts_tasks_id(sep_stages.size()),
     217           0 :   saved_lasts_tasks_id(sep_stages.size()),
     218           0 :   bound_adaptors(false),
     219           0 :   auto_stop(true)
     220             : {
     221           0 :     this->init<runtime::Task>(firsts,
     222             :                                   lasts,
     223             :                                   sep_stages,
     224             :                                   n_threads,
     225             :                                   synchro_buffer_sizes,
     226             :                                   synchro_active_waiting,
     227             :                                   thread_pinning, {}, pipeline_pinning_policy/*,
     228             :                                   tasks_inplace*/);
     229           0 : }
     230             : 
     231          41 : Pipeline
     232             : ::Pipeline(const std::vector<runtime::Task*> &firsts,
     233             :            const std::vector<runtime::Task*> &lasts,
     234             :            const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     235             :            const std::vector<size_t> &n_threads,
     236             :            const std::vector<size_t> &synchro_buffer_sizes,
     237             :            const std::vector<bool> &synchro_active_waiting,
     238             :            const std::vector<bool> &thread_pinning,
     239             :            const std::string &pipeline_pinning_policy/*,
     240          41 :            const std::vector<bool> &tasks_inplace*/)
     241          41 : : original_sequence(firsts, lasts, 1),
     242          41 :   stages(sep_stages.size()),
     243          41 :   adaptors(sep_stages.size() -1),
     244          41 :   saved_firsts_tasks_id(sep_stages.size()),
     245          41 :   saved_lasts_tasks_id(sep_stages.size()),
     246          41 :   bound_adaptors(false),
     247         123 :   auto_stop(true)
     248             : {
     249             :     std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>>
     250          41 :       sep_stages_bis;
     251         183 :     for (auto& sep_stage : sep_stages)
     252         142 :         sep_stages_bis.push_back(std::make_tuple(sep_stage.first, sep_stage.second, std::vector<runtime::Task*>()));
     253             : 
     254          41 :     this->init<runtime::Task>(firsts,
     255             :                                   lasts,
     256             :                                   sep_stages_bis,
     257             :                                   n_threads,
     258             :                                   synchro_buffer_sizes,
     259             :                                   synchro_active_waiting,
     260             :                                   thread_pinning, {}, pipeline_pinning_policy/*,
     261             :                                   tasks_inplace*/);
     262          41 : }
     263             : 
     264           0 : Pipeline
     265             : ::Pipeline(const std::vector<runtime::Task*> &firsts,
     266             :            const std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     267             :            const std::vector<size_t> &n_threads,
     268             :            const std::vector<size_t> &synchro_buffer_sizes,
     269             :            const std::vector<bool> &synchro_active_waiting,
     270             :            const std::vector<bool> &thread_pinning,
     271             :            const std::string &pipeline_pinning_policy/*,
     272           0 :            const std::vector<bool> &tasks_inplace*/)
     273             : : Pipeline(firsts,
     274             :            {},
     275             :            sep_stages,
     276             :            n_threads,
     277             :            synchro_buffer_sizes,
     278             :            synchro_active_waiting,
     279             :            thread_pinning, pipeline_pinning_policy/*,
     280           0 :            tasks_inplace*/)
     281             : {
     282           0 : }
     283             : 
     284          30 : Pipeline
     285             : ::Pipeline(const std::vector<runtime::Task*> &firsts,
     286             :            const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     287             :            const std::vector<size_t> &n_threads,
     288             :            const std::vector<size_t> &synchro_buffer_sizes,
     289             :            const std::vector<bool> &synchro_active_waiting,
     290             :            const std::vector<bool> &thread_pinning,
     291             :            const std::string &pipeline_pinning_policy/*,
     292          30 :            const std::vector<bool> &tasks_inplace*/)
     293             : : Pipeline(firsts,
     294             :            {},
     295             :            sep_stages,
     296             :            n_threads,
     297             :            synchro_buffer_sizes,
     298             :            synchro_active_waiting,
     299             :            thread_pinning, pipeline_pinning_policy/*,
     300          30 :            tasks_inplace*/)
     301             : {
     302          30 : }
     303             : 
     304           0 : Pipeline
     305             : ::Pipeline(runtime::Task &first,
     306             :            runtime::Task &last,
     307             :            const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     308             :            const std::vector<size_t> &n_threads,
     309             :            const std::vector<size_t> &synchro_buffer_sizes,
     310             :            const std::vector<bool> &synchro_active_waiting,
     311             :            const std::vector<bool> &thread_pinning,
     312             :            const std::string &pipeline_pinning_policy/*,
     313           0 :            const std::vector<bool> &tasks_inplace*/)
     314             : : Pipeline({&first},
     315             :            {&last},
     316             :            sep_stages,
     317             :            n_threads,
     318             :            synchro_buffer_sizes,
     319             :            synchro_active_waiting,
     320             :            thread_pinning, pipeline_pinning_policy/*,
     321           0 :            tasks_inplace*/)
     322             : {
     323           0 : }
     324             : 
     325          30 : Pipeline
     326             : ::Pipeline(runtime::Task &first,
     327             :            const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     328             :            const std::vector<size_t> &n_threads,
     329             :            const std::vector<size_t> &synchro_buffer_sizes,
     330             :            const std::vector<bool> &synchro_active_waiting,
     331             :            const std::vector<bool> &thread_pinning,
     332             :            const std::string &pipeline_pinning_policy/*,
     333          30 :            const std::vector<bool> &tasks_inplace*/)
     334             : : Pipeline({&first},
     335             :            sep_stages,
     336             :            n_threads,
     337             :            synchro_buffer_sizes,
     338             :            synchro_active_waiting,
     339             :            thread_pinning,
     340             :            pipeline_pinning_policy/*,
     341          30 :            tasks_inplace*/)
     342             : {
     343          30 : }
     344             : 
     345         160 : Pipeline::~Pipeline()
     346             : {
     347          80 :     this->unbind_adaptors();
     348         160 : }
     349             : 
     350             : std::vector<Sequence*>
     351         159 : Pipeline::get_stages()
     352             : {
     353         159 :     std::vector<Sequence*> stages;
     354         748 :     for (auto& stage : this->stages)
     355         589 :         stages.push_back(stage.get());
     356         159 :     return stages;
     357           0 : }
     358             : 
     359             : Sequence&
     360           0 : Pipeline::operator[](const size_t stage_id)
     361             : {
     362           0 :     assert(stage_id < this->stages.size());
     363           0 :     return *this->stages[stage_id];
     364             : }
     365             : 
     366             : template<class TA>
     367             : runtime::Sequence*
     368             : create_sequence(const std::vector<TA*>& firsts,
     369             :                 const std::vector<TA*>& lasts,
     370             :                 const std::vector<TA*>& exclusions,
     371             :                 const size_t& n_threads,
     372             :                 const bool& thread_pinning,
     373             :                 const std::vector<size_t>& puids,
     374             :                 const bool& tasks_inplace)
     375             : {
     376             :     throw tools::unimplemented_error(__FILE__, __LINE__, __func__);
     377             : }
     378             : 
     379             : template<>
     380             : runtime::Sequence*
     381           0 : create_sequence<const runtime::Task>(const std::vector<const runtime::Task*>& firsts,
     382             :                                      const std::vector<const runtime::Task*>& lasts,
     383             :                                      const std::vector<const runtime::Task*>& exclusions,
     384             :                                      const size_t& n_threads,
     385             :                                      const bool& thread_pinning,
     386             :                                      const std::vector<size_t>& puids,
     387             :                                      const bool& tasks_inplace)
     388             : {
     389           0 :     return new runtime::Sequence(firsts, lasts, exclusions, n_threads, thread_pinning, puids);
     390             : }
     391             : 
     392             : template<>
     393             : Sequence*
     394         248 : create_sequence<runtime::Task>(const std::vector<runtime::Task*>& firsts,
     395             :                                const std::vector<runtime::Task*>& lasts,
     396             :                                const std::vector<runtime::Task*>& exclusions,
     397             :                                const size_t& n_threads,
     398             :                                const bool& thread_pinning,
     399             :                                const std::vector<size_t>& puids,
     400             :                                const bool& tasks_inplace)
     401             : {
     402         248 :     return new runtime::Sequence(firsts, lasts, exclusions, n_threads, thread_pinning, puids, tasks_inplace);
     403             : }
     404             : 
     405             : // Init and sequence creation for second pinning version
     406             : template<class TA>
     407             : runtime::Sequence*
     408             : create_sequence(const std::vector<TA*>& firsts,
     409             :                 const std::vector<TA*>& lasts,
     410             :                 const std::vector<TA*>& exclusions,
     411             :                 const size_t& n_threads,
     412             :                 const bool& thread_pinning,
     413             :                 const std::string& pipeline_pinning_policy,
     414             :                 const bool& tasks_inplace)
     415             : {
     416             :     throw tools::unimplemented_error(__FILE__, __LINE__, __func__);
     417             : }
     418             : 
     419             : template<>
     420             : runtime::Sequence*
     421           0 : create_sequence<const runtime::Task>(const std::vector<const runtime::Task*>& firsts,
     422             :                                      const std::vector<const runtime::Task*>& lasts,
     423             :                                      const std::vector<const runtime::Task*>& exclusions,
     424             :                                      const size_t& n_threads,
     425             :                                      const bool& thread_pinning,
     426             :                                      const std::string& pipeline_pinning_policy,
     427             :                                      const bool& tasks_inplace)
     428             : {
     429           0 :     return new runtime::Sequence(firsts, lasts, exclusions, n_threads, thread_pinning, pipeline_pinning_policy);
     430             : }
     431             : 
     432             : template<>
     433             : Sequence*
     434          34 : create_sequence<runtime::Task>(const std::vector<runtime::Task*>& firsts,
     435             :                                const std::vector<runtime::Task*>& lasts,
     436             :                                const std::vector<runtime::Task*>& exclusions,
     437             :                                const size_t& n_threads,
     438             :                                const bool& thread_pinning,
     439             :                                const std::string& pipeline_pinning_policy,
     440             :                                const bool& tasks_inplace)
     441             : {
     442             :     return new runtime::Sequence(
     443          34 :       firsts, lasts, exclusions, n_threads, thread_pinning, pipeline_pinning_policy, tasks_inplace);
     444             : }
     445             : 
     446             : template <class TA>
     447          80 : void Pipeline
     448             : ::init(const std::vector<TA*> &firsts,
     449             :        const std::vector<TA*> &lasts,
     450             :        const std::vector<std::tuple<std::vector<TA*>,std::vector<TA*>,std::vector<TA*>>> &sep_stages,
     451             :        const std::vector<size_t> &n_threads,
     452             :        const std::vector<size_t> &synchro_buffer_sizes,
     453             :        const std::vector<bool> &synchro_active_waiting,
     454             :        const std::vector<bool> &thread_pinning,
     455             :        const std::vector<std::vector<size_t>> &puids,
     456             :        const std::string &pipeline_pinning_policy/*,
     457             :        const std::vector<bool> &tasks_inplace*/)
     458             : {
     459          80 :     if (sep_stages.size() != n_threads.size() && n_threads.size() != 0)
     460             :     {
     461           0 :         std::stringstream message;
     462           0 :         message << "'n_threads.size()' has to be equal to 'sep_stages.size()' or equal to '0' ('n_threads.size()' = "
     463           0 :                 << n_threads.size() << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
     464           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     465           0 :     }
     466             : 
     467          80 :     if (sep_stages.size() != synchro_buffer_sizes.size() + 1 && synchro_buffer_sizes.size() != 0)
     468             :     {
     469           0 :         std::stringstream message;
     470             :         message << "'synchro_buffer_sizes.size()' has to be equal to 'sep_stages.size() -1' or equal to '0' "
     471           0 :                 << "('synchro_buffer_sizes.size()' = " << synchro_buffer_sizes.size()
     472           0 :                 << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
     473           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     474           0 :     }
     475             : 
     476          80 :     if (sep_stages.size() != synchro_active_waiting.size() + 1 && synchro_active_waiting.size() != 0)
     477             :     {
     478           0 :         std::stringstream message;
     479             :         message << "'synchro_active_waiting.size()' has to be equal to 'sep_stages.size() -1' or equal to '0' "
     480           0 :                 << "('synchro_active_waiting.size()' = " << synchro_active_waiting.size()
     481           0 :                 << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
     482           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     483           0 :     }
     484             : 
     485          80 :     if (sep_stages.size() != thread_pinning.size() && thread_pinning.size() != 0)
     486             :     {
     487           0 :         std::stringstream message;
     488             :         message << "'thread_pinning.size()' has to be equal to 'sep_stages.size()' or equal to '0' ("
     489           0 :                 << "'thread_pinning.size()' = " << thread_pinning.size()
     490           0 :                 << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
     491           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     492           0 :     }
     493             : 
     494          80 :     if (sep_stages.size() != puids.size() && puids.size() != 0)
     495             :     {
     496           0 :         std::stringstream message;
     497           0 :         message << "'puids.size()' has to be equal to 'sep_stages.size()' or equal to '0' ('puids.size()' = "
     498           0 :                 << puids.size() << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
     499           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     500           0 :     }
     501             : 
     502             :     // if (sep_stages.size() != tasks_inplace.size() && tasks_inplace.size() != 0)
     503             :     // {
     504             :     //  std::stringstream message;
     505             :     //  message << "'tasks_inplace.size()' has to be equal to 'sep_stages.size()' or equal to '0' ('"
     506             :     //          << "tasks_inplace.size()' = " << tasks_inplace.size() << " , 'sep_stages.size()' = "
     507             :     //          << sep_stages.size() << ").";
     508             :     //  throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     509             :     // }
     510             : 
     511          80 :     bool prev_is_parallel = false;
     512         362 :     for (auto t : n_threads)
     513             :     {
     514         282 :         if (t > 1 && prev_is_parallel)
     515             :         {
     516           0 :             std::stringstream message;
     517           0 :             message << "Consecutive parallel stages are not supported.";
     518           0 :             throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     519           0 :         }
     520         282 :         prev_is_parallel = t > 1;
     521             :     }
     522             : 
     523             :     // Creating a vector of pinning policies for each sequence
     524          80 :     std::vector<std::string> sequences_pinning_policies;
     525          80 :     if (!pipeline_pinning_policy.empty())
     526          13 :         sequences_pinning_policies =
     527             :           tools::Thread_pinning_utils::pipeline_parser_unpacker(pipeline_pinning_policy, sep_stages.size());
     528             : 
     529         362 :     for (size_t s = 0; s < sep_stages.size(); s++)
     530             :     {
     531         282 :         const std::vector<TA*>& stage_firsts = std::get<0>(sep_stages[s]);
     532         282 :         const std::vector<TA*>& stage_lasts = std::get<1>(sep_stages[s]);
     533         282 :         const std::vector<TA*>& stage_exclusions = std::get<2>(sep_stages[s]);
     534         282 :         const size_t stage_n_threads = n_threads.size() ? n_threads[s] : 1;
     535         282 :         const bool stage_thread_pinning = thread_pinning.size() ? thread_pinning[s] : false;
     536         282 :         const std::vector<size_t> stage_puids = puids.size() ? puids[s] : std::vector<size_t>();
     537         316 :         const std::string sequence_pinning_policy =
     538         316 :           sequences_pinning_policies.size() ? sequences_pinning_policies[s] : "";
     539         282 :         const bool stage_tasks_inplace = /*tasks_inplace.size() ? tasks_inplace[s] :*/ true;
     540             :         try
     541             :         {
     542         282 :             if (pipeline_pinning_policy.empty())
     543         248 :                 this->stages[s].reset(create_sequence<TA>(stage_firsts,
     544             :                                                           stage_lasts,
     545             :                                                           stage_exclusions,
     546             :                                                           stage_n_threads,
     547             :                                                           stage_thread_pinning,
     548             :                                                           stage_puids,
     549             :                                                           stage_tasks_inplace));
     550             :             else
     551          34 :                 this->stages[s].reset(create_sequence<TA>(stage_firsts,
     552             :                                                           stage_lasts,
     553             :                                                           stage_exclusions,
     554             :                                                           stage_n_threads,
     555             :                                                           stage_thread_pinning,
     556             :                                                           sequence_pinning_policy,
     557             :                                                           stage_tasks_inplace));
     558             :         }
     559           0 :         catch (const tools::control_flow_error& e)
     560             :         {
     561           0 :             std::stringstream message;
     562           0 :             message << "Invalid control flow error on stage " << s
     563           0 :                     << " (perhaps a switcher's tasks were separated between different stages)." << std::endl
     564           0 :                     << e.what();
     565           0 :             throw tools::control_flow_error(__FILE__, __LINE__, __func__, message.str());
     566           0 :         }
     567         282 :         this->stages[s]->is_part_of_pipeline = true;
     568             :     }
     569             : 
     570             :     // verify that the sequential sequence is equivalent to the pipeline sequence
     571          80 :     auto ref_tasks = this->original_sequence.get_tasks_per_threads()[0];
     572          80 :     auto cur_tasks = this->get_tasks_per_threads()[0];
     573             : 
     574          80 :     if (ref_tasks.size() != cur_tasks.size())
     575             :     {
     576           0 :         std::ofstream f1("dbg_ref_sequence.dot");
     577           0 :         this->original_sequence.export_dot(f1);
     578           0 :         std::ofstream f2("dbg_cur_pipeline.dot");
     579           0 :         this->export_dot(f2);
     580             : 
     581           0 :         std::stringstream message;
     582           0 :         message << "'ref_tasks.size()' has to be equal to 'cur_tasks.size()' ('ref_tasks.size()' = " << ref_tasks.size()
     583           0 :                 << ", 'cur_tasks.size()' = " << cur_tasks.size() << ").";
     584           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     585           0 :     }
     586             : 
     587         820 :     for (size_t ta = 0; ta < cur_tasks.size(); ta++)
     588             :     {
     589         740 :         if (std::find(ref_tasks.begin(), ref_tasks.end(), cur_tasks[ta]) == ref_tasks.end())
     590             :         {
     591           0 :             std::ofstream f1("dbg_ref_sequence.dot");
     592           0 :             this->original_sequence.export_dot(f1);
     593           0 :             std::ofstream f2("dbg_cur_pipeline.dot");
     594           0 :             this->export_dot(f2);
     595             : 
     596           0 :             std::stringstream message;
     597           0 :             message << "'cur_tasks[ta]' is not contained in the 'ref_tasks' vector ('ta' = " << ta
     598           0 :                     << ", 'cur_tasks[ta]' = " << +cur_tasks[ta]
     599           0 :                     << ", 'cur_tasks[ta]->get_name()' = " << cur_tasks[ta]->get_name() << ").";
     600           0 :             throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     601           0 :         }
     602             :     }
     603             : 
     604          80 :     this->create_adaptors(synchro_buffer_sizes, synchro_active_waiting);
     605          80 :     this->bind_adaptors();
     606             : 
     607          80 :     this->thread_pool.reset(new tools::Thread_pool_standard(this->stages.size() - 1));
     608          80 :     this->thread_pool->init(); // threads are spawned here
     609          80 : }
     610             : 
     611             : void
     612          80 : Pipeline::create_adaptors(const std::vector<size_t>& synchro_buffer_sizes,
     613             :                           const std::vector<bool>& synchro_active_waiting)
     614             : {
     615             :     //                     sck out addr     occ     stage   tsk id  sck id
     616          80 :     std::vector<std::tuple<runtime::Socket*, size_t, size_t, size_t, size_t>> out_sck_orphans;
     617             : 
     618             :     // for all the stages in the pipeline
     619         282 :     for (size_t sta = 0; sta < this->stages.size() - 1; sta++)
     620             :     {
     621             :         // ------------------------------------------------------------------------------------------------------------
     622             :         // --------------------------------------------------------------- collect orphan output sockets in stage 'sta'
     623             :         // ------------------------------------------------------------------------------------------------------------
     624         202 :         std::vector<std::vector<runtime::Task*>> tasks_per_threads = this->stages[sta]->get_tasks_per_threads();
     625             :         // for all the threads in the current stage
     626             :         // for (size_t t = 0; t < tasks_per_threads.size(); t++)
     627         202 :         size_t t = 0;
     628             :         {
     629             :             // for all the tasks in the stage
     630         746 :             for (size_t tsk_id = 0; tsk_id < tasks_per_threads[t].size(); tsk_id++)
     631             :             {
     632         544 :                 auto tsk = tasks_per_threads[t][tsk_id];
     633             :                 // for all the sockets of the tasks
     634        1914 :                 for (size_t sck_id = 0; sck_id < tsk->sockets.size(); sck_id++)
     635             :                 {
     636        1370 :                     auto sck = tsk->sockets[sck_id];
     637             :                     // if the current socket is an output or forward socket type
     638        1370 :                     if (sck->get_type() == socket_t::SOUT || sck->get_type() == socket_t::SFWD)
     639             :                     {
     640             :                         // for all the bounded sockets to the current socket
     641        1788 :                         for (auto bsck : sck->get_bound_sockets())
     642             :                         {
     643             :                             // check if the task of the bounded socket is not in the current stage
     644         683 :                             if (std::find(tasks_per_threads[t].begin(),
     645         683 :                                           tasks_per_threads[t].end(),
     646        1366 :                                           &bsck->get_task()) == tasks_per_threads[t].end())
     647             :                             {
     648             :                                 // check the position of the socket in the orphans
     649         331 :                                 size_t pos = 0;
     650         904 :                                 for (; pos < out_sck_orphans.size(); pos++)
     651         616 :                                     if (std::get<0>(out_sck_orphans[pos]) == sck.get()) break;
     652             : 
     653         331 :                                 if (pos == out_sck_orphans.size())
     654         288 :                                     out_sck_orphans.push_back(std::make_tuple(sck.get(), 1, sta, tsk_id, sck_id));
     655             :                                 else
     656          43 :                                     std::get<1>(out_sck_orphans[pos])++;
     657             :                             }
     658             :                         }
     659             :                     }
     660        1370 :                 }
     661             :             }
     662             :         }
     663             : 
     664             :         // ------------------------------------------------------------------------------------------------------------
     665             :         // -------------------------------------- collect orphan input sockets in stage 'sta +1' and create connections
     666             :         // ------------------------------------------------------------------------------------------------------------
     667         202 :         tasks_per_threads = this->stages[sta + 1]->get_tasks_per_threads();
     668             :         // for all the threads in the current stage
     669             :         // for (size_t t = 0; t < tasks_per_threads.size(); t++)
     670             :         {
     671             :             // for all the tasks in the stage
     672         791 :             for (size_t tsk_id = 0; tsk_id < tasks_per_threads[t].size(); tsk_id++)
     673             :             {
     674         589 :                 auto tsk = tasks_per_threads[t][tsk_id];
     675             :                 // ----------------------------------------- manage socket to socket bindings (with fake input sockets)
     676             :                 // for all the sockets of the tasks
     677        2056 :                 for (size_t sck_id = 0; sck_id < tsk->sockets.size(); sck_id++)
     678             :                 {
     679        1467 :                     auto sck = tsk->sockets[sck_id];
     680             :                     // if the current socket is an input or forward socket type
     681        1467 :                     if (sck->get_type() == socket_t::SIN || sck->get_type() == socket_t::SFWD)
     682             :                     {
     683         596 :                         runtime::Socket* bsck = nullptr;
     684             :                         try
     685             :                         {
     686             :                             // get output bounded socket
     687         596 :                             bsck = &sck->get_bound_socket(); // can throw if there is no bounded socket
     688             :                         }
     689           0 :                         catch (const std::exception&)
     690             :                         {
     691           0 :                         }
     692         596 :                         if (bsck != nullptr)
     693             :                         {
     694             :                             // check if the task of the bounded socket is not in the current stage
     695         596 :                             if (std::find(tasks_per_threads[t].begin(),
     696         596 :                                           tasks_per_threads[t].end(),
     697        1192 :                                           &bsck->get_task()) == tasks_per_threads[t].end())
     698             :                             {
     699             :                                 // check the position of the bounded socket in the orphans
     700         305 :                                 size_t pos = 0;
     701         787 :                                 for (; pos < out_sck_orphans.size(); pos++)
     702         787 :                                     if (std::get<0>(out_sck_orphans[pos]) == bsck) break;
     703             : 
     704         305 :                                 if (pos < out_sck_orphans.size())
     705             :                                 {
     706         305 :                                     auto sck_out = std::get<0>(out_sck_orphans[pos]);
     707         305 :                                     auto sck_in = sck.get();
     708         610 :                                     auto unbind_sout_pos = std::distance(sck_out->get_bound_sockets().begin(),
     709         305 :                                                                          std::find(sck_out->get_bound_sockets().begin(),
     710         305 :                                                                                    sck_out->get_bound_sockets().end(),
     711         305 :                                                                                    sck_in));
     712         305 :                                     this->sck_orphan_binds.push_back(
     713         305 :                                       std::make_pair(std::make_tuple(std::get<0>(out_sck_orphans[pos]),
     714         305 :                                                                      std::get<2>(out_sck_orphans[pos]),
     715         305 :                                                                      std::get<3>(out_sck_orphans[pos]),
     716         305 :                                                                      std::get<4>(out_sck_orphans[pos]),
     717             :                                                                      unbind_sout_pos),
     718         610 :                                                      std::make_tuple(sck_in, sta + 1, tsk_id, sck_id, nullptr)));
     719             :                                 }
     720             :                             }
     721             :                         }
     722             :                     }
     723        1467 :                 }
     724             :                 // ------------------------------------------- manage socket to task bindings (with fake input sockets)
     725         717 :                 for (size_t sck_id = 0; sck_id < tsk->fake_input_sockets.size(); sck_id++)
     726             :                 {
     727         128 :                     auto sck = tsk->fake_input_sockets[sck_id];
     728         128 :                     runtime::Socket* bsck = nullptr;
     729             :                     try
     730             :                     {
     731             :                         // get output bounded socket
     732         128 :                         bsck = &sck->get_bound_socket(); // can throw if there is no bounded socket
     733             :                     }
     734           0 :                     catch (const std::exception&)
     735             :                     {
     736           0 :                     }
     737         128 :                     if (bsck != nullptr)
     738             :                     {
     739             :                         // check if the task of the bounded socket is not in the current stage
     740         128 :                         if (std::find(tasks_per_threads[t].begin(), tasks_per_threads[t].end(), &bsck->get_task()) ==
     741         256 :                             tasks_per_threads[t].end())
     742             :                         {
     743             :                             // check the position of the bounded socket in the orphans
     744          26 :                             size_t pos = 0;
     745         117 :                             for (; pos < out_sck_orphans.size(); pos++)
     746         117 :                                 if (std::get<0>(out_sck_orphans[pos]) == bsck) break;
     747             : 
     748          26 :                             if (pos < out_sck_orphans.size())
     749             :                             {
     750          26 :                                 auto sck_out = std::get<0>(out_sck_orphans[pos]);
     751          26 :                                 auto sck_in = sck.get();
     752          52 :                                 auto unbind_sout_pos = std::distance(sck_out->get_bound_sockets().begin(),
     753          26 :                                                                      std::find(sck_out->get_bound_sockets().begin(),
     754          26 :                                                                                sck_out->get_bound_sockets().end(),
     755          26 :                                                                                sck_in));
     756          26 :                                 this->sck_orphan_binds.push_back(
     757          26 :                                   std::make_pair(std::make_tuple(std::get<0>(out_sck_orphans[pos]),
     758          26 :                                                                  std::get<2>(out_sck_orphans[pos]),
     759          26 :                                                                  std::get<3>(out_sck_orphans[pos]),
     760          26 :                                                                  std::get<4>(out_sck_orphans[pos]),
     761             :                                                                  unbind_sout_pos),
     762          52 :                                                  std::make_tuple(nullptr, sta + 1, tsk_id, sck_id, tsk)));
     763             :                             }
     764             :                         }
     765             :                     }
     766         128 :                 }
     767             :                 // ----------------------------------------------------------------------------------------------------
     768             :             }
     769             :         }
     770         202 :     }
     771             : 
     772             :     // ----------------------------------------------------------------------------------------------------------------
     773             :     // ----------------------------------------------------------------------------------------------- prints for debug
     774             :     // ----------------------------------------------------------------------------------------------------------------
     775             :     // std::cout << "Orphan output sockets list:" << std::endl;
     776             :     // for (auto &sck : out_sck_orphans)
     777             :     // {
     778             :     //  auto sck_out_name = std::get<0>(sck)->get_name();
     779             :     //  auto tsk_out_name = std::get<0>(sck)->get_task().get_name();
     780             :     //  auto sck_out_occ  = std::get<1>(sck);
     781             :     //  auto tsk_out_sta  = std::get<2>(sck);
     782             :     //  auto tsk_out_id   = std::get<3>(sck);
     783             :     //  auto sck_out_id   = std::get<4>(sck);
     784             : 
     785             :     //  std::cout << "  " << tsk_out_name << "[" << sck_out_name << "] (stage " << tsk_out_sta << ", " << sck_out_occ
     786             :     //            << " occurrences, tsk id = " << tsk_out_id << ", sck id = " << sck_out_id << ")" << std::endl;
     787             :     // }
     788             : 
     789             :     // std::cout << std::endl << "Detected socket binds:" << std::endl;
     790             :     // for (auto &bind : this->sck_orphan_binds)
     791             :     // {
     792             :     //  auto sck_out_name = std::get<0>(bind.first)->get_name();
     793             :     //  auto tsk_out_name = std::get<0>(bind.first)->get_task().get_name();
     794             :     //  auto tsk_out_sta  = std::get<1>(bind.first);
     795             :     //  auto tsk_out_id   = std::get<2>(bind.first);
     796             :     //  auto sck_out_id   = std::get<3>(bind.first);
     797             :     //  auto sck_out_ubp  = std::get<4>(bind.first);
     798             : 
     799             :     //  auto sck_in_name = std::get<0>(bind.second)->get_name();
     800             :     //  auto tsk_in_name = std::get<0>(bind.second)->get_task().get_name();
     801             :     //  auto tsk_in_sta  = std::get<1>(bind.second);
     802             :     //  auto tsk_in_id   = std::get<2>(bind.second);
     803             :     //  auto sck_in_id   = std::get<3>(bind.second);
     804             : 
     805             :     //  std::cout << "  " << tsk_out_name << "[" << sck_out_name << "] (stage " << tsk_out_sta << ", tsk id = "
     806             :     //                    << tsk_out_id << ", sck id = " << sck_out_id << ", ubp = " << sck_out_ubp << ")"  << " -> "
     807             :     //                    << tsk_in_name  << "[" << sck_in_name  << "] (stage " << tsk_in_sta  << ", tsk id = "
     808             :     //                    << tsk_in_id  << ", sck id = " << sck_in_id  << ")" << std::endl;
     809             :     // }
     810             : 
     811             :     // ----------------------------------------------------------------------------------------------------------------
     812             :     // ------------------------------------------------------------------------------------------------ create adaptors
     813             :     // ----------------------------------------------------------------------------------------------------------------
     814          80 :     auto sck_orphan_binds_cpy = this->sck_orphan_binds;
     815          80 :     module::Adaptor* adp = nullptr;
     816          80 :     std::map<runtime::Socket*, size_t> sck_to_adp_sck_id;
     817         362 :     for (size_t sta = 0; sta < this->stages.size(); sta++)
     818             :     {
     819         282 :         const auto n_threads = this->stages[sta]->get_n_threads();
     820         282 :         std::vector<std::vector<runtime::Task*>> tasks_per_threads = this->stages[sta]->get_tasks_per_threads();
     821             : 
     822             :         // ------------------------------------------------------------------------------------------------------------
     823             :         // ----------------------------------------------------------------------------------------------- pull adaptor
     824             :         // ------------------------------------------------------------------------------------------------------------
     825         282 :         if (sta > 0)
     826             :         {
     827         202 :             assert(adp != nullptr);
     828             :             //                               sck out addr      stage   tsk id  sck id  unbind_pos
     829             :             std::vector<std::pair<std::tuple<runtime::Socket*, size_t, size_t, size_t, size_t>,
     830             :                                   //                               sck in addr       stage   tsk id  sck id  tsk in addr
     831             :                                   std::tuple<runtime::Socket*, size_t, size_t, size_t, runtime::Task*>>>
     832         202 :               sck_orphan_binds_new;
     833             : 
     834         202 :             auto n_threads_prev_sta = this->stages[sta - 1]->get_n_threads();
     835        2300 :             for (size_t t = 0; t < n_threads; t++)
     836             :             {
     837        2098 :                 module::Adaptor* cur_adp = (t == 0) ? adp : adp->clone();
     838        6294 :                 for (auto& t : cur_adp->tasks)
     839        4196 :                     t->set_fast(true);
     840        2098 :                 if (t > 0)
     841             :                 {
     842        1896 :                     this->adaptors[sta - 1].second.push_back(std::unique_ptr<module::Adaptor>(cur_adp));
     843        1896 :                     cur_adp->set_custom_name((n_threads_prev_sta == 1 ? "Adp_1_to_n_" : "Adp_n_to_1_") +
     844        3792 :                                              std::to_string(sta - 1));
     845             :                 }
     846             : 
     847        2098 :                 auto task_pull = n_threads_prev_sta == 1 ? &(*cur_adp)[(int)module::adp::tsk::pull_n]
     848          88 :                                                          : &(*cur_adp)[(int)module::adp::tsk::pull_1];
     849             : 
     850        2098 :                 sck_orphan_binds_new.clear();
     851       12883 :                 for (auto& bind : sck_orphan_binds_cpy)
     852             :                 {
     853       10785 :                     auto tsk_out_sta = std::get<1>(bind.first);
     854       10785 :                     if (tsk_out_sta < sta)
     855             :                     {
     856        5856 :                         auto tsk_in_sta = std::get<1>(bind.second);
     857        5856 :                         if (tsk_in_sta == sta)
     858             :                         {
     859        3131 :                             auto sck_out_ptr = std::get<0>(bind.first);
     860        3131 :                             auto priority = std::get<4>(bind.first);
     861        3131 :                             auto tsk_in_id = std::get<2>(bind.second);
     862        3131 :                             auto sck_in_id = std::get<3>(bind.second);
     863        3131 :                             runtime::Socket* sck_in = nullptr;
     864        3131 :                             runtime::Task* tsk_in = nullptr;
     865        3131 :                             if (std::get<0>(bind.second) != nullptr) // if socket to socket binding
     866        2536 :                                 sck_in = tasks_per_threads[t][tsk_in_id]->sockets[sck_in_id].get();
     867             :                             else // if socket to task binding
     868         595 :                                 tsk_in = tasks_per_threads[t][tsk_in_id];
     869        3131 :                             this->adaptors_binds.push_back(std::make_tuple(
     870        6262 :                               task_pull->sockets[sck_to_adp_sck_id[sck_out_ptr]].get(), sck_in, priority, tsk_in));
     871             :                         }
     872             :                         else
     873        2725 :                             sck_orphan_binds_new.push_back(bind);
     874             :                     }
     875             :                     else
     876        4929 :                         sck_orphan_binds_new.push_back(bind);
     877             :                 }
     878             : 
     879        2098 :                 if (t > 0) this->stages[sta]->all_modules[t].push_back(cur_adp);
     880             :             }
     881         202 :             this->saved_firsts_tasks_id[sta] = this->stages[sta]->firsts_tasks_id;
     882         202 :             sck_orphan_binds_cpy = sck_orphan_binds_new;
     883         202 :         }
     884             : 
     885             :         // ------------------------------------------------------------------------------------------------------------
     886             :         // ----------------------------------------------------------------------------------------------- push adaptor
     887             :         // ------------------------------------------------------------------------------------------------------------
     888         282 :         std::map<runtime::Socket*, size_t> sck_to_adp_sck_id_new;
     889         282 :         if (sta < this->stages.size() - 1)
     890             :         {
     891         202 :             std::vector<size_t> adp_n_elmts;
     892         202 :             std::vector<std::type_index> adp_datatype;
     893         202 :             size_t adp_buffer_size = synchro_buffer_sizes.size() ? synchro_buffer_sizes[sta] : 1;
     894         202 :             bool adp_active_waiting = synchro_active_waiting.size() ? synchro_active_waiting[sta] : false;
     895         202 :             size_t adp_n_frames = 1;
     896             : 
     897             :             // a map to remember if a passed socket points already to the same memory space
     898         202 :             std::map<void*, size_t> fwd_source;
     899             : 
     900         202 :             std::vector<runtime::Socket*> passed_scks_out;
     901         909 :             for (auto& bind : sck_orphan_binds_cpy)
     902             :             {
     903         707 :                 auto tsk_out_sta = std::get<1>(bind.first);
     904         707 :                 if (tsk_out_sta <= sta)
     905             :                 {
     906         458 :                     auto sck_out = std::get<0>(bind.first);
     907         458 :                     if (std::find(passed_scks_out.begin(), passed_scks_out.end(), sck_out) == passed_scks_out.end())
     908             :                     {
     909             :                         // avoid the creation of new adaptor sockets for forward sockets pointing to the same memory
     910             :                         // space
     911         406 :                         auto sck_out_dptr = (void*)sck_out->get_dataptr<int8_t>();
     912         406 :                         assert(sck_out_dptr != nullptr);
     913         406 :                         if (fwd_source.find(sck_out_dptr) == fwd_source.end())
     914             :                         {
     915         349 :                             fwd_source[sck_out_dptr] = 1;
     916         349 :                             adp_n_frames = sck_out->get_task().get_module().get_n_frames();
     917         349 :                             adp_n_elmts.push_back(sck_out->get_n_elmts() / adp_n_frames);
     918         349 :                             adp_datatype.push_back(sck_out->get_datatype());
     919             :                         }
     920         406 :                         passed_scks_out.push_back(sck_out);
     921             :                     }
     922             :                 }
     923             :             }
     924         202 :             passed_scks_out.clear();
     925             : 
     926             :             // allocate the adaptor for the first thread
     927         202 :             if (n_threads == 1)
     928         114 :                 adp = new module::Adaptor_1_to_n(adp_n_elmts, adp_datatype, adp_buffer_size, adp_active_waiting);
     929             :             else
     930          88 :                 adp = new module::Adaptor_n_to_1(adp_n_elmts, adp_datatype, adp_buffer_size, adp_active_waiting);
     931         202 :             adp->set_n_frames(adp_n_frames);
     932             : 
     933        2300 :             for (size_t t = 0; t < n_threads; t++)
     934             :             {
     935        2098 :                 module::Adaptor* cur_adp = (t == 0) ? adp : adp->clone();
     936        2098 :                 cur_adp->set_custom_name((n_threads == 1 ? "Adp_1_to_n_" : "Adp_n_to_1_") + std::to_string(sta));
     937        2098 :                 this->adaptors[sta].first.push_back(std::unique_ptr<module::Adaptor>(cur_adp));
     938        2098 :                 auto task_push = n_threads == 1 ? &(*cur_adp)[(int)module::adp::tsk::push_1]
     939        1984 :                                                 : &(*cur_adp)[(int)module::adp::tsk::push_n];
     940             : 
     941        2098 :                 std::map<void*, size_t> fwd_source;
     942        2098 :                 sck_to_adp_sck_id_new.clear();
     943        2098 :                 size_t adp_sck_id = 0;
     944       10083 :                 for (auto& bind : sck_orphan_binds_cpy) // bind standard task to last adaptor
     945             :                 {
     946        7985 :                     auto tsk_out_sta = std::get<1>(bind.first);
     947             : 
     948        7985 :                     if (tsk_out_sta <= sta)
     949             :                     {
     950        6836 :                         auto sck_out_ptr = std::get<0>(bind.first);
     951        6836 :                         auto sck_out_dptr = (void*)sck_out_ptr->get_dataptr<int8_t>();
     952        6836 :                         assert(sck_out_dptr != nullptr);
     953             : 
     954        6836 :                         if (std::find(passed_scks_out.begin(),
     955             :                                       passed_scks_out.end(),
     956       19707 :                                       sck_out_ptr) == passed_scks_out.end() &&
     957       12871 :                             fwd_source.find(sck_out_dptr) == fwd_source.end()) // <= the latest condition is here to
     958             :                                                                                //    avoid to bind adaptor sockets two
     959             :                                                                                //    times the same memory space
     960             :                                                                                //    (usefull in the case of multiple
     961             :                                                                                //    fwd sockets pointing to the same
     962             :                                                                                //    memory address)
     963             :                         {
     964        4838 :                             if (tsk_out_sta == sta)
     965             :                             {
     966        3283 :                                 auto tsk_out_id = std::get<2>(bind.first);
     967        3283 :                                 auto sck_out_id = std::get<3>(bind.first);
     968        3283 :                                 auto sck_out = tasks_per_threads[t][tsk_out_id]->sockets[sck_out_id];
     969        3283 :                                 auto priority = std::get<4>(bind.first);
     970        3283 :                                 sck_to_adp_sck_id_new[sck_out_ptr] = adp_sck_id;
     971        3283 :                                 this->adaptors_binds.push_back(
     972        3283 :                                   std::make_tuple(sck_out.get(),
     973        3283 :                                                   task_push->sockets[adp_sck_id++].get(),
     974             :                                                   priority,
     975        3283 :                                                   nullptr)); // <= only socket to socket binding is possible here
     976        3283 :                             }
     977             :                             else // if (tsk_out_sta < sta) // bind prev. adaptor to last adaptor
     978             :                             {
     979        1555 :                                 auto n_threads_prev_sta = this->stages[sta - 1]->get_n_threads();
     980        1555 :                                 auto tsk_out_id = (n_threads_prev_sta == 1) ? (size_t)module::adp::tsk::pull_n
     981             :                                                                             : (size_t)module::adp::tsk::pull_1;
     982        1555 :                                 auto sck_out_id = sck_to_adp_sck_id[sck_out_ptr];
     983        1555 :                                 sck_to_adp_sck_id_new[sck_out_ptr] = adp_sck_id;
     984             :                                 auto adp_prev =
     985        1555 :                                   t == 0 ? this->adaptors[sta - 1].first[0] : this->adaptors[sta - 1].second[t - 1];
     986        1555 :                                 auto sck_out = (*adp_prev)[tsk_out_id].sockets[sck_out_id];
     987        1555 :                                 auto priority = std::get<4>(bind.first);
     988        1555 :                                 this->adaptors_binds.push_back(
     989        1555 :                                   std::make_tuple(sck_out.get(),
     990        1555 :                                                   task_push->sockets[adp_sck_id++].get(),
     991             :                                                   priority,
     992        1555 :                                                   nullptr)); // <= only socket to socket binding is possible here
     993        1555 :                             }
     994             : 
     995        4838 :                             fwd_source[sck_out_dptr] = 1; // remember that this memory space has been connected to the
     996             :                                                           // adaptor once
     997        4838 :                             passed_scks_out.push_back(sck_out_ptr);
     998             :                         }
     999             :                     }
    1000             :                 }
    1001        2098 :                 passed_scks_out.clear();
    1002        2098 :             }
    1003         202 :             this->saved_lasts_tasks_id[sta] = this->stages[sta]->lasts_tasks_id;
    1004         202 :         }
    1005         282 :         sck_to_adp_sck_id = sck_to_adp_sck_id_new;
    1006         282 :     }
    1007          80 : }
    1008             : 
    1009             : void
    1010         240 : Pipeline::bind_adaptors()
    1011             : {
    1012         240 :     this->_bind_adaptors(true);
    1013         240 : }
    1014             : 
    1015             : void
    1016         240 : Pipeline::_bind_adaptors(const bool bind_adaptors)
    1017             : {
    1018         240 :     if (!this->bound_adaptors)
    1019             :     {
    1020        1086 :         for (size_t sta = 0; sta < this->stages.size(); sta++)
    1021             :         {
    1022         846 :             const auto n_threads = this->stages[sta]->get_n_threads();
    1023             : 
    1024             :             // --------------------------------------------------------------------------------------------------------
    1025             :             // ------------------------------------------------------------------------------------------- pull adaptor
    1026             :             // --------------------------------------------------------------------------------------------------------
    1027         846 :             if (sta > 0)
    1028             :             {
    1029         606 :                 auto n_threads_prev_sta = this->stages[sta - 1]->get_n_threads();
    1030        6900 :                 for (size_t t = 0; t < n_threads; t++)
    1031             :                 {
    1032             :                     module::Adaptor* cur_adp =
    1033        6294 :                       t > 0 ? adaptors[sta - 1].second[t - 1].get() : adaptors[sta - 1].first[0].get();
    1034             : 
    1035        6294 :                     if (t > 0 || sta == this->stages.size() - 1) // add the adaptor to the current stage
    1036        5916 :                         this->stages[sta]->all_modules[t].push_back(cur_adp);
    1037             : 
    1038        6294 :                     auto task_pull = n_threads_prev_sta == 1 ? &(*cur_adp)[(int)module::adp::tsk::pull_n]
    1039         264 :                                                              : &(*cur_adp)[(int)module::adp::tsk::pull_1];
    1040             : 
    1041        6294 :                     auto ss = this->stages[sta]->sequences[t]->get_contents();
    1042        6294 :                     assert(ss != nullptr);
    1043        6294 :                     ss->tasks.insert(ss->tasks.begin(), task_pull);
    1044        6294 :                     ss->processes.insert(ss->processes.begin(),
    1045           0 :                                          [task_pull]() -> const int*
    1046             :                                          {
    1047           0 :                                              task_pull->exec();
    1048           0 :                                              const int* status = task_pull->sockets.back()->get_dataptr<const int>();
    1049           0 :                                              return status;
    1050             :                                          });
    1051        6294 :                     this->stages[sta]->update_tasks_id(t);
    1052             :                 }
    1053         606 :                 this->stages[sta]->firsts_tasks_id.clear();
    1054         606 :                 this->stages[sta]->firsts_tasks_id.push_back(0);
    1055         606 :                 this->stages[sta]->n_tasks++;
    1056             :             }
    1057             : 
    1058             :             // --------------------------------------------------------------------------------------------------------
    1059             :             // ------------------------------------------------------------------------------------------- push adaptor
    1060             :             // --------------------------------------------------------------------------------------------------------
    1061         846 :             if (sta < this->stages.size() - 1)
    1062             :             {
    1063         606 :                 size_t last_task_id = 0;
    1064        6900 :                 for (size_t t = 0; t < n_threads; t++)
    1065             :                 {
    1066        6294 :                     module::Adaptor* cur_adp = adaptors[sta].first[t].get();
    1067             : 
    1068             :                     // add the adaptor to the current stage
    1069        6294 :                     this->stages[sta]->all_modules[t].push_back(cur_adp);
    1070             : 
    1071        6294 :                     auto task_push = n_threads == 1 ? &(*cur_adp)[(int)module::adp::tsk::push_1]
    1072        5952 :                                                     : &(*cur_adp)[(int)module::adp::tsk::push_n];
    1073             : 
    1074        6294 :                     auto ss = this->stages[sta]->get_last_subsequence(t);
    1075        6294 :                     assert(ss != nullptr);
    1076        6294 :                     ss->tasks.push_back(task_push);
    1077        6294 :                     ss->processes.push_back(
    1078           0 :                       [task_push]() -> const int*
    1079             :                       {
    1080           0 :                           task_push->exec();
    1081           0 :                           const int* status = task_push->sockets.back()->get_dataptr<const int>();
    1082           0 :                           return status;
    1083             :                       });
    1084        6294 :                     last_task_id = ss->tasks_id[ss->tasks_id.size() - 1] + 1;
    1085        6294 :                     ss->tasks_id.push_back(last_task_id);
    1086             :                 }
    1087         606 :                 this->stages[sta]->lasts_tasks_id.clear();
    1088         606 :                 this->stages[sta]->lasts_tasks_id.push_back(last_task_id);
    1089         606 :                 this->stages[sta]->n_tasks++;
    1090             :             }
    1091         846 :             this->stages[sta]->update_firsts_and_lasts_tasks();
    1092             :         }
    1093             : 
    1094             :         // ------------------------------------------------------------------------------------------------------------
    1095             :         // ---------------------------------------------------------------------------------------------- bind adaptors
    1096             :         // ------------------------------------------------------------------------------------------------------------
    1097        1233 :         for (auto& bind : this->sck_orphan_binds)
    1098             :         {
    1099         993 :             auto sck_out = std::get<0>(bind.first);
    1100         993 :             auto sck_in = std::get<0>(bind.second);
    1101         993 :             if (sck_in != nullptr) // if socket to socket unbinding
    1102         915 :                 sck_in->unbind(*sck_out);
    1103             :             else // if socket to task unbinding
    1104             :             {
    1105          78 :                 auto tsk_in = std::get<4>(bind.second);
    1106          78 :                 assert(tsk_in != nullptr);
    1107          78 :                 tsk_in->unbind(*sck_out);
    1108             :             }
    1109             :         }
    1110             : 
    1111         240 :         if (bind_adaptors)
    1112             :         {
    1113       24147 :             for (auto& bind : this->adaptors_binds)
    1114             :             {
    1115       23907 :                 auto sck_out = std::get<0>(bind);
    1116       23907 :                 auto sck_in = std::get<1>(bind);
    1117       23907 :                 auto priority = std::get<2>(bind);
    1118       23907 :                 if (sck_in != nullptr) // if socket to socket binding
    1119       22122 :                     sck_in->_bind(*sck_out, priority);
    1120             :                 else // if socket to task binding
    1121             :                 {
    1122        1785 :                     auto tsk_in = std::get<3>(bind);
    1123        1785 :                     assert(tsk_in != nullptr);
    1124        1785 :                     tsk_in->_bind(*sck_out, priority);
    1125             :                 }
    1126             :             }
    1127             :         }
    1128             : 
    1129         240 :         this->bound_adaptors = true;
    1130             :     }
    1131         240 : }
    1132             : 
    1133             : void
    1134         160 : Pipeline::unbind_adaptors()
    1135             : {
    1136         160 :     this->_unbind_adaptors(true);
    1137         160 : }
    1138             : 
    1139             : void
    1140         320 : Pipeline::_unbind_adaptors(const bool bind_orphans)
    1141             : {
    1142         320 :     if (this->bound_adaptors)
    1143             :     {
    1144        1086 :         for (size_t sta = 0; sta < this->stages.size(); sta++)
    1145             :         {
    1146         846 :             const auto n_threads = this->stages[sta]->get_n_threads();
    1147             : 
    1148             :             // --------------------------------------------------------------------------------------------------------
    1149             :             // ------------------------------------------------------------------------------------------- pull adaptor
    1150             :             // --------------------------------------------------------------------------------------------------------
    1151         846 :             if (sta > 0)
    1152             :             {
    1153        6900 :                 for (size_t t = 0; t < n_threads; t++)
    1154             :                 {
    1155        6294 :                     if (t > 0 || sta == this->stages.size() - 1) // rm the adaptor to the current stage
    1156        5916 :                         this->stages[sta]->all_modules[t].pop_back();
    1157             : 
    1158        6294 :                     auto ss = this->stages[sta]->sequences[t]->get_contents();
    1159        6294 :                     assert(ss != nullptr);
    1160        6294 :                     ss->tasks.erase(ss->tasks.begin());
    1161        6294 :                     ss->processes.erase(ss->processes.begin());
    1162        6294 :                     this->stages[sta]->update_tasks_id(t);
    1163             :                 }
    1164         606 :                 this->stages[sta]->firsts_tasks_id = this->saved_firsts_tasks_id[sta];
    1165         606 :                 this->stages[sta]->n_tasks--;
    1166             :             }
    1167             : 
    1168             :             // --------------------------------------------------------------------------------------------------------
    1169             :             // ------------------------------------------------------------------------------------------- push adaptor
    1170             :             // --------------------------------------------------------------------------------------------------------
    1171         846 :             if (sta < this->stages.size() - 1)
    1172             :             {
    1173        6900 :                 for (size_t t = 0; t < n_threads; t++)
    1174             :                 {
    1175             :                     // rm the adaptor to the current stage
    1176        6294 :                     this->stages[sta]->all_modules[t].pop_back();
    1177             : 
    1178        6294 :                     auto ss = this->stages[sta]->get_last_subsequence(t);
    1179        6294 :                     assert(ss != nullptr);
    1180        6294 :                     ss->tasks.pop_back();
    1181        6294 :                     ss->processes.pop_back();
    1182        6294 :                     ss->tasks_id.pop_back();
    1183             :                 }
    1184         606 :                 this->stages[sta]->lasts_tasks_id = this->saved_lasts_tasks_id[sta];
    1185         606 :                 this->stages[sta]->n_tasks--;
    1186             :             }
    1187         846 :             this->stages[sta]->update_firsts_and_lasts_tasks();
    1188             :         }
    1189             : 
    1190             :         // ------------------------------------------------------------------------------------------------------------
    1191             :         // -------------------------------------------------------------------------------------------- unbind adaptors
    1192             :         // ------------------------------------------------------------------------------------------------------------
    1193       24147 :         for (auto& bind : this->adaptors_binds)
    1194             :         {
    1195       23907 :             auto sck_out = std::get<0>(bind);
    1196       23907 :             auto sck_in = std::get<1>(bind);
    1197       23907 :             if (sck_in != nullptr) // if socket to socket unbinding
    1198       22122 :                 sck_in->unbind(*sck_out);
    1199             :             else // if socket to task unbinding
    1200             :             {
    1201        1785 :                 auto tsk_in = std::get<3>(bind);
    1202        1785 :                 assert(tsk_in != nullptr);
    1203        1785 :                 tsk_in->unbind(*sck_out);
    1204             :             }
    1205             :         }
    1206             : 
    1207         240 :         if (bind_orphans)
    1208             :         {
    1209         411 :             for (auto& bind : this->sck_orphan_binds)
    1210             :             {
    1211         331 :                 auto sck_out = std::get<0>(bind.first);
    1212         331 :                 auto priority = std::get<4>(bind.first);
    1213         331 :                 auto sck_in = std::get<0>(bind.second);
    1214         331 :                 if (sck_in != nullptr) // if socket to socket binding
    1215         305 :                     sck_in->_bind(*sck_out, priority);
    1216             :                 else // if socket to task binding
    1217             :                 {
    1218          26 :                     auto tsk_in = std::get<4>(bind.second);
    1219          26 :                     assert(tsk_in != nullptr);
    1220          26 :                     tsk_in->_bind(*sck_out, priority);
    1221             :                 }
    1222             :             }
    1223             :         }
    1224             : 
    1225         240 :         this->bound_adaptors = false;
    1226             :     }
    1227         320 : }
    1228             : 
    1229             : void
    1230           0 : Pipeline::exec(const std::vector<std::function<bool(const std::vector<const int*>&)>>& stop_conditions)
    1231             : {
    1232           0 :     if (stop_conditions.size() != this->stages.size() && stop_conditions.size() != 1)
    1233             :     {
    1234           0 :         std::stringstream message;
    1235           0 :         message << "'stop_conditions.size()' has to be equal to 'stages.size()' or to 1 ('stop_conditions.size()' = "
    1236           0 :                 << stop_conditions.size() << ", 'stages.size()' = " << stages.size() << ").";
    1237           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
    1238           0 :     }
    1239             : 
    1240           0 :     if (!this->bound_adaptors)
    1241             :     {
    1242           0 :         std::stringstream message;
    1243           0 :         message << "'bound_adaptors' has to be true to execute the pipeline.";
    1244           0 :         throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
    1245           0 :     }
    1246             : 
    1247             :     // ----------------------------------------------------------------------------------------------------------------
    1248           0 :     auto& stages = this->stages;
    1249           0 :     std::vector<const std::function<bool(const std::vector<const int*>&)>*> stop_condition_vec(stages.size() - 1,
    1250           0 :                                                                                                nullptr);
    1251           0 :     if (stop_conditions.size() == stages.size())
    1252           0 :         for (size_t s = 0; s < stages.size() - 1; s++)
    1253           0 :             stop_condition_vec[s] = &stop_conditions[s];
    1254             : 
    1255           0 :     std::function<void(const size_t)> func_exec = [&stages, &stop_condition_vec](const size_t tid)
    1256             :     {
    1257           0 :         size_t s = tid;
    1258           0 :         if (stop_condition_vec[s])
    1259           0 :             stages[s]->exec(*(stop_condition_vec[s]));
    1260             :         else
    1261           0 :             stages[s]->exec();
    1262             : 
    1263             :         // send the signal to stop the next stage
    1264           0 :         const auto& tasks = stages[s + 1]->get_tasks_per_threads();
    1265           0 :         for (size_t th = 0; th < tasks.size(); th++)
    1266           0 :             for (size_t ta = 0; ta < tasks[th].size(); ta++)
    1267             :             {
    1268           0 :                 auto m = dynamic_cast<module::Adaptor*>(&tasks[th][ta]->get_module());
    1269           0 :                 if (m != nullptr)
    1270           0 :                     if (tasks[th][ta]->get_name() == "pull_n" || tasks[th][ta]->get_name() == "pull_1")
    1271           0 :                         m->cancel_waiting();
    1272             :             }
    1273           0 :     };
    1274             : 
    1275           0 :     this->thread_pool->run(func_exec, true);
    1276             : 
    1277           0 :     stages[stages.size() - 1]->exec(stop_conditions[stop_conditions.size() - 1]);
    1278             : 
    1279             :     // stop all the stages before
    1280           0 :     for (size_t notify_s = 0; notify_s < stages.size() - 1; notify_s++)
    1281           0 :         for (auto& m : stages[notify_s]->get_modules<tools::Interface_waiting>())
    1282           0 :             m->cancel_waiting();
    1283             : 
    1284           0 :     this->thread_pool->wait();
    1285           0 :     this->thread_pool->unset_func_exec();
    1286             :     // ----------------------------------------------------------------------------------------------------------------
    1287             : 
    1288             :     // this is NOT made in the tools::Sequence::exec() to correctly flush the pipeline before restoring buffers
    1289             :     // initial configuration
    1290           0 :     for (auto& stage : this->stages)
    1291           0 :         if (stage->is_no_copy_mode())
    1292             :         {
    1293           0 :             stage->reset_no_copy_mode();
    1294           0 :             stage->gen_processes(false);
    1295             :         }
    1296             : 
    1297           0 :     for (auto& padps : this->adaptors)
    1298             :     {
    1299           0 :         for (auto& adp : padps.first)
    1300           0 :             adp->reset();
    1301           0 :         for (auto& adp : padps.second)
    1302           0 :             adp->reset();
    1303             :     }
    1304           0 : }
    1305             : 
    1306             : void
    1307          80 : Pipeline::exec(const std::vector<std::function<bool()>>& stop_conditions)
    1308             : {
    1309          80 :     if (stop_conditions.size() != this->stages.size() && stop_conditions.size() != 1)
    1310             :     {
    1311           0 :         std::stringstream message;
    1312           0 :         message << "'stop_conditions.size()' has to be equal to 'stages.size()' or to 1 ('stop_conditions.size()' = "
    1313           0 :                 << stop_conditions.size() << ", 'stages.size()' = " << stages.size() << ").";
    1314           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
    1315           0 :     }
    1316             : 
    1317          80 :     if (!this->bound_adaptors)
    1318             :     {
    1319           0 :         std::stringstream message;
    1320           0 :         message << "'bound_adaptors' has to be true to execute the pipeline.";
    1321           0 :         throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
    1322           0 :     }
    1323             : 
    1324             :     // ----------------------------------------------------------------------------------------------------------------
    1325          80 :     auto& stages = this->stages;
    1326          80 :     std::vector<const std::function<bool()>*> stop_condition_vec(stages.size() - 1, nullptr);
    1327          80 :     if (stop_conditions.size() == stages.size())
    1328           4 :         for (size_t s = 0; s < stages.size() - 1; s++)
    1329           0 :             stop_condition_vec[s] = &stop_conditions[s];
    1330             : 
    1331         577 :     std::function<void(const size_t)> func_exec = [&stages, &stop_condition_vec](const size_t tid)
    1332             :     {
    1333         189 :         size_t s = tid;
    1334         189 :         if (stop_condition_vec[s])
    1335           0 :             stages[s]->exec(*(stop_condition_vec[s]));
    1336             :         else
    1337         191 :             stages[s]->exec();
    1338             : 
    1339             :         // send the signal to stop the next stage
    1340         197 :         const auto& tasks = stages[s + 1]->get_tasks_per_threads();
    1341        2297 :         for (size_t th = 0; th < tasks.size(); th++)
    1342       15584 :             for (size_t ta = 0; ta < tasks[th].size(); ta++)
    1343             :             {
    1344       13461 :                 auto m = dynamic_cast<module::Adaptor*>(&tasks[th][ta]->get_module());
    1345       13487 :                 if (m != nullptr)
    1346        4113 :                     if (tasks[th][ta]->get_name() == "pull_n" || tasks[th][ta]->get_name() == "pull_1")
    1347        2097 :                         m->cancel_waiting();
    1348             :             }
    1349         280 :     };
    1350             : 
    1351          80 :     this->thread_pool->run(func_exec, true);
    1352          80 :     stages[stages.size() - 1]->exec(stop_conditions[stop_conditions.size() - 1]);
    1353             : 
    1354             :     // stop all the stages before
    1355         282 :     for (size_t notify_s = 0; notify_s < stages.size() - 1; notify_s++)
    1356        6092 :         for (auto& m : stages[notify_s]->get_modules<tools::Interface_waiting>())
    1357        6092 :             m->cancel_waiting();
    1358             : 
    1359          80 :     this->thread_pool->wait();
    1360          80 :     this->thread_pool->unset_func_exec();
    1361             :     // ----------------------------------------------------------------------------------------------------------------
    1362             : 
    1363             :     // this is NOT made in the tools::Sequence::exec() to correctly flush the pipeline before restoring buffers
    1364             :     // initial configuration
    1365         362 :     for (auto& stage : this->stages)
    1366         282 :         if (stage->is_no_copy_mode())
    1367             :         {
    1368         282 :             stage->reset_no_copy_mode();
    1369         282 :             stage->gen_processes(false);
    1370             :         }
    1371             : 
    1372         282 :     for (auto& padps : this->adaptors)
    1373             :     {
    1374        2300 :         for (auto& adp : padps.first)
    1375        2098 :             adp->reset();
    1376        2098 :         for (auto& adp : padps.second)
    1377        1896 :             adp->reset();
    1378             :     }
    1379          80 : }
    1380             : 
    1381             : void
    1382           0 : Pipeline::exec(std::function<bool(const std::vector<const int*>&)> stop_condition)
    1383             : {
    1384           0 :     this->exec(std::vector<std::function<bool(const std::vector<const int*>&)>>(1, stop_condition));
    1385           0 : }
    1386             : 
    1387             : void
    1388          80 : Pipeline::exec(std::function<bool()> stop_condition)
    1389             : {
    1390          80 :     this->exec(std::vector<std::function<bool()>>(1, stop_condition));
    1391          80 : }
    1392             : 
    1393             : void
    1394          11 : Pipeline::exec()
    1395             : {
    1396        6873 :     this->exec([]() { return false; });
    1397          11 : }
    1398             : 
    1399             : std::vector<std::vector<module::Module*>>
    1400           0 : Pipeline::get_modules_per_threads() const
    1401             : {
    1402           0 :     std::vector<std::vector<module::Module*>> modules_per_threads;
    1403           0 :     for (auto& stage : this->stages)
    1404             :     {
    1405           0 :         auto modules_per_threads_add = stage->get_modules_per_threads();
    1406           0 :         if (modules_per_threads_add.size() > modules_per_threads.size())
    1407           0 :             modules_per_threads.resize(modules_per_threads_add.size());
    1408             : 
    1409           0 :         for (size_t t = 0; t < modules_per_threads_add.size(); t++)
    1410           0 :             modules_per_threads[t].insert(
    1411           0 :               modules_per_threads[t].end(), modules_per_threads_add[t].begin(), modules_per_threads_add[t].end());
    1412           0 :     }
    1413           0 :     return modules_per_threads;
    1414           0 : }
    1415             : 
    1416             : std::vector<std::vector<module::Module*>>
    1417           0 : Pipeline::get_modules_per_types() const
    1418             : {
    1419           0 :     std::vector<std::vector<module::Module*>> modules_per_types;
    1420           0 :     for (auto& stage : this->stages)
    1421             :     {
    1422           0 :         auto modules_per_types_add = stage->get_modules_per_types();
    1423           0 :         modules_per_types.insert(modules_per_types.end(), modules_per_types_add.begin(), modules_per_types_add.end());
    1424           0 :     }
    1425           0 :     return modules_per_types;
    1426           0 : }
    1427             : 
    1428             : std::vector<std::vector<module::Module*>>
    1429           0 : Pipeline::get_original_modules() const
    1430             : {
    1431           0 :     return this->original_sequence.get_modules_per_types();
    1432             : }
    1433             : 
    1434             : std::vector<std::vector<runtime::Task*>>
    1435          80 : Pipeline::get_tasks_per_threads() const
    1436             : {
    1437          80 :     std::vector<std::vector<runtime::Task*>> tasks_per_threads;
    1438         362 :     for (auto& stage : this->stages)
    1439             :     {
    1440         282 :         auto tasks_per_threads_add = stage->get_tasks_per_threads();
    1441         282 :         if (tasks_per_threads_add.size() > tasks_per_threads.size())
    1442         147 :             tasks_per_threads.resize(tasks_per_threads_add.size());
    1443             : 
    1444        2470 :         for (size_t t = 0; t < tasks_per_threads_add.size(); t++)
    1445        8752 :             tasks_per_threads[t].insert(
    1446        8752 :               tasks_per_threads[t].end(), tasks_per_threads_add[t].begin(), tasks_per_threads_add[t].end());
    1447         282 :     }
    1448          80 :     return tasks_per_threads;
    1449           0 : }
    1450             : 
    1451             : std::vector<std::vector<runtime::Task*>>
    1452           0 : Pipeline::get_tasks_per_types() const
    1453             : {
    1454           0 :     std::vector<std::vector<runtime::Task*>> tasks_per_types;
    1455           0 :     for (auto& stage : this->stages)
    1456             :     {
    1457           0 :         auto tasks_per_types_add = stage->get_tasks_per_types();
    1458           0 :         tasks_per_types.insert(tasks_per_types.end(), tasks_per_types_add.begin(), tasks_per_types_add.end());
    1459           0 :     }
    1460           0 :     return tasks_per_types;
    1461           0 : }
    1462             : 
    1463             : void
    1464           5 : Pipeline::export_dot(std::ostream& stream) const
    1465             : {
    1466             :     std::function<void(tools::Digraph_node<Sub_sequence>*,
    1467             :                        const size_t,
    1468             :                        const std::string&,
    1469             :                        std::ostream&,
    1470             :                        std::vector<tools::Digraph_node<Sub_sequence>*>&)>
    1471          22 :       export_dot_subsequences_recursive = [&export_dot_subsequences_recursive,
    1472             :                                            this](tools::Digraph_node<Sub_sequence>* cur_node,
    1473             :                                                  const size_t sta,
    1474             :                                                  const std::string& tab,
    1475             :                                                  std::ostream& stream,
    1476          22 :                                                  std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
    1477             :     {
    1478          44 :         if (cur_node != nullptr &&
    1479          44 :             std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
    1480             :         {
    1481          22 :             already_parsed_nodes.push_back(cur_node);
    1482          44 :             this->stages[sta]->export_dot_subsequence(cur_node->get_c()->tasks,
    1483          22 :                                                       cur_node->get_c()->tasks_id,
    1484          22 :                                                       cur_node->get_c()->type,
    1485          44 :                                                       "Sub-sequence " + std::to_string(cur_node->get_c()->id) +
    1486          88 :                                                         " (depth = " + std::to_string(cur_node->get_depth()) + ")",
    1487             :                                                       tab,
    1488             :                                                       stream);
    1489             : 
    1490          22 :             for (auto c : cur_node->get_children())
    1491           0 :                 export_dot_subsequences_recursive(c, sta, tab, stream, already_parsed_nodes);
    1492             :         }
    1493          27 :     };
    1494             : 
    1495             :     std::function<void(tools::Digraph_node<Sub_sequence>*,
    1496             :                        const size_t,
    1497             :                        const std::string&,
    1498             :                        std::ostream&,
    1499             :                        std::vector<tools::Digraph_node<Sub_sequence>*>&)>
    1500             :       export_dot_connections_recursive =
    1501          22 :         [&export_dot_connections_recursive, this](tools::Digraph_node<Sub_sequence>* cur_node,
    1502             :                                                   const size_t sta,
    1503             :                                                   const std::string& tab,
    1504             :                                                   std::ostream& stream,
    1505          22 :                                                   std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
    1506             :     {
    1507          44 :         if (cur_node != nullptr &&
    1508          44 :             std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
    1509             :         {
    1510          22 :             already_parsed_nodes.push_back(cur_node);
    1511          22 :             this->stages[sta]->export_dot_connections(cur_node->get_c()->tasks, tab, stream);
    1512             : 
    1513          22 :             for (auto c : cur_node->get_children())
    1514           0 :                 export_dot_connections_recursive(c, sta, tab, stream, already_parsed_nodes);
    1515             :         }
    1516          27 :     };
    1517             : 
    1518           5 :     std::string tab = "\t";
    1519           5 :     stream << "digraph Pipeline {" << std::endl;
    1520           5 :     stream << tab << "compound=true;" << std::endl;
    1521             : 
    1522          27 :     for (size_t sta = 0; sta < this->stages.size(); sta++)
    1523             :     {
    1524          22 :         std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
    1525          22 :         const auto n_threads = this->stages[sta]->get_n_threads();
    1526          22 :         stream << tab << "subgraph \"cluster_Stage " << sta << "\" {" << std::endl;
    1527          22 :         stream << tab << tab << "node [style=filled];" << std::endl;
    1528          22 :         export_dot_subsequences_recursive(this->stages[sta]->sequences[0], sta, tab, stream, already_parsed_nodes);
    1529          22 :         stream << tab << tab << "label=\"Pipeline stage " << sta << " (" << n_threads << " thread(s))\";" << std::endl;
    1530          22 :         std::string color = "blue";
    1531          22 :         stream << tab << tab << "color=" << color << ";" << std::endl;
    1532          22 :         stream << tab << "}" << std::endl;
    1533          22 :     }
    1534             : 
    1535          27 :     for (size_t sta = 0; sta < this->stages.size(); sta++)
    1536             :     {
    1537          22 :         std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
    1538          22 :         export_dot_connections_recursive(this->stages[sta]->sequences[0], sta, tab, stream, already_parsed_nodes);
    1539          22 :         if (this->bound_adaptors)
    1540             :         {
    1541          22 :             if (sta > 0)
    1542             :             {
    1543          17 :                 auto tsk1 = this->stages[sta - 1]->get_lasts_tasks()[0].back();
    1544          17 :                 auto tsk2 = this->stages[sta + 0]->get_firsts_tasks()[0][0];
    1545             : 
    1546          17 :                 auto sck1 = tsk1->sockets[0];
    1547          17 :                 auto sck2 = tsk2->sockets[0];
    1548             : 
    1549          17 :                 stream << tab << "\"" << +sck1.get() << "\" -> \"" << +sck2.get() << "\" [ltail=\"cluster_"
    1550          17 :                        << +&tsk1->get_module() << "_" << +tsk1 << "\" lhead=\"cluster_" << +&tsk2->get_module() << "_"
    1551          17 :                        << +tsk2 << "\" color=\"green\" style=\"dashed\"];" << std::endl;
    1552          17 :             }
    1553             :         }
    1554          22 :     }
    1555             : 
    1556           5 :     stream << "}" << std::endl;
    1557           5 : }
    1558             : 
    1559             : bool
    1560         160 : Pipeline::is_bound_adaptors() const
    1561             : {
    1562         160 :     return this->bound_adaptors;
    1563             : }
    1564             : 
    1565             : void
    1566           0 : Pipeline::set_auto_stop(const bool auto_stop)
    1567             : {
    1568           0 :     this->auto_stop = auto_stop;
    1569           0 :     for (auto stage : this->stages)
    1570           0 :         stage->set_auto_stop(auto_stop);
    1571           0 : }
    1572             : 
    1573             : bool
    1574           0 : Pipeline::is_auto_stop() const
    1575             : {
    1576           0 :     return this->auto_stop;
    1577             : }
    1578             : 
    1579             : size_t
    1580           0 : Pipeline::get_n_frames() const
    1581             : {
    1582           0 :     const auto n_frames = this->stages[0]->get_n_frames();
    1583             : 
    1584           0 :     for (auto& sta : this->stages)
    1585           0 :         if (sta->get_n_frames() != n_frames)
    1586             :         {
    1587           0 :             std::stringstream message;
    1588           0 :             message << "All the stages/sequences do not have the same 'n_frames' value ('sta->get_n_frames()' = "
    1589           0 :                     << sta->get_n_frames() << ", 'n_frames' = " << n_frames << ").";
    1590           0 :             throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
    1591           0 :         }
    1592             : 
    1593           0 :     return n_frames;
    1594             : }
    1595             : 
    1596             : void
    1597         160 : Pipeline::set_n_frames(const size_t n_frames)
    1598             : {
    1599         160 :     const auto save_bound_adaptors = this->is_bound_adaptors();
    1600         160 :     if (!save_bound_adaptors) this->bind_adaptors();
    1601         160 :     this->_unbind_adaptors(false);
    1602             : 
    1603             :     // set the new "n_frames" val in the sequences
    1604         160 :     std::vector<std::vector<std::pair<runtime::Socket*, runtime::Socket*>>> unbind_sockets(this->stages.size());
    1605         160 :     std::vector<std::vector<std::pair<runtime::Task*, runtime::Socket*>>> unbind_tasks(this->stages.size());
    1606         160 :     std::vector<bool> skip(this->stages.size());
    1607         724 :     for (size_t s = 0; s < this->stages.size(); s++)
    1608         564 :         skip[s] = this->stages[s]->get_n_frames() == n_frames;
    1609         724 :     for (size_t s = 0; s < this->stages.size(); s++)
    1610         564 :         if (!skip[s]) this->stages[s]->_set_n_frames_unbind(unbind_sockets[s], unbind_tasks[s]);
    1611         724 :     for (size_t s = 0; s < this->stages.size(); s++)
    1612         564 :         if (!skip[s]) this->stages[s]->_set_n_frames(n_frames);
    1613         724 :     for (size_t s = 0; s < this->stages.size(); s++)
    1614         564 :         if (!skip[s]) this->stages[s]->_set_n_frames_rebind(unbind_sockets[s], unbind_tasks[s]);
    1615             : 
    1616             :     // set the new "n_frames" val in the adaptors
    1617         564 :     for (auto& adps : this->adaptors)
    1618             :     {
    1619        4600 :         for (auto& adp : adps.first)
    1620        4196 :             adp->set_n_frames(n_frames);
    1621        4196 :         for (auto& adp : adps.second)
    1622        3792 :             adp->set_n_frames(n_frames);
    1623             :     }
    1624             : 
    1625             :     // bind orphans to complete the unbind of the adaptors
    1626         822 :     for (auto& bind : this->sck_orphan_binds)
    1627             :     {
    1628         662 :         auto sck_out = std::get<0>(bind.first);
    1629         662 :         auto priority = std::get<4>(bind.first);
    1630         662 :         auto sck_in = std::get<0>(bind.second);
    1631         662 :         if (sck_in != nullptr)
    1632         610 :             sck_in->_bind(*sck_out, priority);
    1633             :         else
    1634             :         {
    1635          52 :             auto tsk_in = std::get<4>(bind.second);
    1636          52 :             assert(tsk_in != nullptr);
    1637          52 :             tsk_in->_bind(*sck_out, priority);
    1638             :         }
    1639             :     }
    1640             : 
    1641         160 :     if (save_bound_adaptors) this->bind_adaptors();
    1642         160 : }

Generated by: LCOV version 1.14