LCOV - code coverage report
Current view: top level - src/Runtime/Pipeline - Pipeline.cpp (source / functions) Hit Total Coverage
Test: streampu_clean.info Lines: 557 747 74.6 %
Date: 2025-07-17 17:04:07 Functions: 32 49 65.3 %

          Line data    Source code
       1             : #include <cassert>
       2             : #include <fstream>
       3             : #include <thread>
       4             : #include <tuple>
       5             : #include <utility>
       6             : 
       7             : #include "Module/Stateful/Adaptor/Adaptor_m_to_n.hpp"
       8             : #include "Runtime/Pipeline/Pipeline.hpp"
       9             : #include "Tools/Exception/exception.hpp"
      10             : #include "Tools/Interface/Interface_waiting.hpp"
      11             : #include "Tools/Thread/Thread_pinning/Thread_pinning_utils.hpp"
      12             : #include "Tools/Thread/Thread_pool/Standard/Thread_pool_standard.hpp"
      13             : 
      14             : using namespace spu;
      15             : using namespace spu::runtime;
      16             : 
      17             : // Pipeline
      18             : // ::Pipeline(const runtime::Task &first,
      19             : //            const runtime::Task &last,
      20             : //            const std::vector<std::pair<std::vector<const runtime::Task*>, std::vector<const runtime::Task*>>>
      21             : //            &sep_stages, const std::vector<size_t> &n_threads, const std::vector<size_t> &synchro_buffer_sizes, const
      22             : //            std::vector<bool> &synchro_active_waiting, const std::vector<bool> &thread_pinning, const
      23             : //            std::vector<std::vector<size_t>> &puids)
      24             : // : original_sequence(first, last, 1),
      25             : //   stages(sep_stages.size()),
      26             : //   adaptors(sep_stages.size() -1),
      27             : //   saved_firsts_tasks_id(sep_stages.size()),
      28             : //   saved_lasts_tasks_id(sep_stages.size()),
      29             : //   bound_adaptors(false)
      30             : // {
      31             : //      this->init<const runtime::Task>(first,
      32             : //                                      &last,
      33             : //                                      sep_stages,
      34             : //                                      n_threads,
      35             : //                                      synchro_buffer_sizes,
      36             : //                                      synchro_active_waiting,
      37             : //                                      thread_pinning,
      38             : //                                      puids);
      39             : // }
      40             : 
      41             : // Pipeline
      42             : // ::Pipeline(const runtime::Task &first,
      43             : //            const std::vector<std::pair<std::vector<const runtime::Task*>, std::vector<const runtime::Task*>>>
      44             : //            &sep_stages, const std::vector<size_t> &n_threads, const std::vector<size_t> &synchro_buffer_sizes, const
      45             : //            std::vector<bool> &synchro_active_waiting, const std::vector<bool> &thread_pinning, const
      46             : //            std::vector<std::vector<size_t>> &puids)
      47             : // : original_sequence(first, 1),
      48             : //   stages(sep_stages.size()),
      49             : //   adaptors(sep_stages.size() -1),
      50             : //   saved_firsts_tasks_id(sep_stages.size()),
      51             : //   saved_lasts_tasks_id(sep_stages.size()),
      52             : //   bound_adaptors(false)
      53             : // {
      54             : //      const runtime::Task* last = nullptr;
      55             : //      this->init<const runtime::Task>(first,
      56             : //                                      last,
      57             : //                                      sep_stages,
      58             : //                                      n_threads,
      59             : //                                      synchro_buffer_sizes,
      60             : //                                      synchro_active_waiting,
      61             : //                                      thread_pinning,
      62             : //                                      puids);
      63             : // }
      64             : 
      65          19 : Pipeline
      66             : ::Pipeline(const std::vector<runtime::Task*> &firsts,
      67             :            const std::vector<runtime::Task*> &lasts,
      68             :            const std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
      69             :            const std::vector<size_t> &n_threads,
      70             :            const std::vector<size_t> &synchro_buffer_sizes,
      71             :            const std::vector<bool> &synchro_active_waiting,
      72             :            const std::vector<bool> &thread_pinning,
      73             :            const std::vector<std::vector<size_t>> &puids/*,
      74          19 :            const std::vector<bool> &tasks_inplace*/)
      75          19 : : original_sequence(firsts, lasts, 1),
      76          19 :   stages(sep_stages.size()),
      77          19 :   adaptors(sep_stages.size() -1),
      78          19 :   saved_firsts_tasks_id(sep_stages.size()),
      79          19 :   saved_lasts_tasks_id(sep_stages.size()),
      80          19 :   bound_adaptors(false),
      81          57 :   auto_stop(true)
      82             : {
      83          19 :     this->init<runtime::Task>(
      84             :       firsts, lasts, sep_stages, n_threads, synchro_buffer_sizes, synchro_active_waiting, thread_pinning, puids
      85             :       /*, tasks_inplace*/);
      86          19 : }
      87             : 
      88          28 : Pipeline
      89             : ::Pipeline(const std::vector<runtime::Task*> &firsts,
      90             :            const std::vector<runtime::Task*> &lasts,
      91             :            const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
      92             :            const std::vector<size_t> &n_threads,
      93             :            const std::vector<size_t> &synchro_buffer_sizes,
      94             :            const std::vector<bool> &synchro_active_waiting,
      95             :            const std::vector<bool> &thread_pinning,
      96             :            const std::vector<std::vector<size_t>> &puids/*,
      97          28 :            const std::vector<bool> &tasks_inplace*/)
      98          28 : : original_sequence(firsts, lasts, 1),
      99          28 :   stages(sep_stages.size()),
     100          28 :   adaptors(sep_stages.size() -1),
     101          28 :   saved_firsts_tasks_id(sep_stages.size()),
     102          28 :   saved_lasts_tasks_id(sep_stages.size()),
     103          28 :   bound_adaptors(false),
     104          84 :   auto_stop(true)
     105             : {
     106             :     std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>>
     107          28 :       sep_stages_bis;
     108         128 :     for (auto& sep_stage : sep_stages)
     109         100 :         sep_stages_bis.push_back(std::make_tuple(sep_stage.first, sep_stage.second, std::vector<runtime::Task*>()));
     110             : 
     111          28 :     this->init<runtime::Task>(firsts,
     112             :                                   lasts,
     113             :                                   sep_stages_bis,
     114             :                                   n_threads,
     115             :                                   synchro_buffer_sizes,
     116             :                                   synchro_active_waiting,
     117             :                                   thread_pinning, puids/*,
     118             :                                   tasks_inplace*/);
     119          28 : }
     120             : 
     121          19 : Pipeline
     122             : ::Pipeline(const std::vector<runtime::Task*> &firsts,
     123             :            const std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     124             :            const std::vector<size_t> &n_threads,
     125             :            const std::vector<size_t> &synchro_buffer_sizes,
     126             :            const std::vector<bool> &synchro_active_waiting,
     127             :            const std::vector<bool> &thread_pinning,
     128             :            const std::vector<std::vector<size_t>> &puids/*,
     129          19 :            const std::vector<bool> &tasks_inplace*/)
     130             : : Pipeline(firsts,
     131             :            {},
     132             :            sep_stages,
     133             :            n_threads,
     134             :            synchro_buffer_sizes,
     135             :            synchro_active_waiting,
     136             :            thread_pinning, puids/*,
     137          19 :            tasks_inplace*/)
     138             : {
     139          19 : }
     140             : 
     141          28 : Pipeline
     142             : ::Pipeline(const std::vector<runtime::Task*> &firsts,
     143             :            const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     144             :            const std::vector<size_t> &n_threads,
     145             :            const std::vector<size_t> &synchro_buffer_sizes,
     146             :            const std::vector<bool> &synchro_active_waiting,
     147             :            const std::vector<bool> &thread_pinning,
     148             :            const std::vector<std::vector<size_t>> &puids/*,
     149          28 :            const std::vector<bool> &tasks_inplace*/)
     150             : : Pipeline(firsts,
     151             :            {},
     152             :            sep_stages,
     153             :            n_threads,
     154             :            synchro_buffer_sizes,
     155             :            synchro_active_waiting,
     156             :            thread_pinning, puids/*,
     157          28 :            tasks_inplace*/)
     158             : {
     159          28 : }
     160             : 
     161           0 : Pipeline
     162             : ::Pipeline(runtime::Task &first,
     163             :            runtime::Task &last,
     164             :            const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     165             :            const std::vector<size_t> &n_threads,
     166             :            const std::vector<size_t> &synchro_buffer_sizes,
     167             :            const std::vector<bool> &synchro_active_waiting,
     168             :            const std::vector<bool> &thread_pinning,
     169             :            const std::vector<std::vector<size_t>> &puids/*,
     170           0 :            const std::vector<bool> &tasks_inplace*/)
     171             : : Pipeline({&first},
     172             :            {&last},
     173             :            sep_stages,
     174             :            n_threads,
     175             :            synchro_buffer_sizes,
     176             :            synchro_active_waiting,
     177             :            thread_pinning, puids/*,
     178           0 :            tasks_inplace*/)
     179             : {
     180           0 : }
     181             : 
     182          28 : Pipeline
     183             : ::Pipeline(runtime::Task &first,
     184             :            const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     185             :            const std::vector<size_t> &n_threads,
     186             :            const std::vector<size_t> &synchro_buffer_sizes,
     187             :            const std::vector<bool> &synchro_active_waiting,
     188             :            const std::vector<bool> &thread_pinning,
     189             :            const std::vector<std::vector<size_t>> &puids/*,
     190          28 :            const std::vector<bool> &tasks_inplace*/)
     191             : : Pipeline({&first},
     192             :            sep_stages,
     193             :            n_threads,
     194             :            synchro_buffer_sizes,
     195             :            synchro_active_waiting,
     196             :            thread_pinning, puids/*,
     197          28 :            tasks_inplace*/)
     198             : {
     199          28 : }
     200             : 
     201             : //========================== Pipeline constructors with new version thread pinning =====================================
     202          10 : Pipeline
     203             : ::Pipeline(const std::vector<runtime::Task*> &firsts,
     204             :            const std::vector<runtime::Task*> &lasts,
     205             :            const std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     206             :            const std::vector<size_t> &n_threads,
     207             :            const std::vector<size_t> &synchro_buffer_sizes,
     208             :            const std::vector<bool> &synchro_active_waiting,
     209             :            const std::vector<bool> &thread_pinning,
     210             :            const std::string &pipeline_pinning_policy/*,
     211          10 :            const std::vector<bool> &tasks_inplace*/)
     212          10 : : original_sequence(firsts, lasts, 1),
     213          10 :   stages(sep_stages.size()),
     214          10 :   adaptors(sep_stages.size() -1),
     215          10 :   saved_firsts_tasks_id(sep_stages.size()),
     216          10 :   saved_lasts_tasks_id(sep_stages.size()),
     217          10 :   bound_adaptors(false),
     218          30 :   auto_stop(true)
     219             : {
     220          10 :     this->init<runtime::Task>(firsts,
     221             :                                   lasts,
     222             :                                   sep_stages,
     223             :                                   n_threads,
     224             :                                   synchro_buffer_sizes,
     225             :                                   synchro_active_waiting,
     226             :                                   thread_pinning, {}, pipeline_pinning_policy/*,
     227             :                                   tasks_inplace*/);
     228          10 : }
     229             : 
     230          47 : Pipeline
     231             : ::Pipeline(const std::vector<runtime::Task*> &firsts,
     232             :            const std::vector<runtime::Task*> &lasts,
     233             :            const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     234             :            const std::vector<size_t> &n_threads,
     235             :            const std::vector<size_t> &synchro_buffer_sizes,
     236             :            const std::vector<bool> &synchro_active_waiting,
     237             :            const std::vector<bool> &thread_pinning,
     238             :            const std::string &pipeline_pinning_policy/*,
     239          47 :            const std::vector<bool> &tasks_inplace*/)
     240          47 : : original_sequence(firsts, lasts, 1),
     241          47 :   stages(sep_stages.size()),
     242          47 :   adaptors(sep_stages.size() -1),
     243          47 :   saved_firsts_tasks_id(sep_stages.size()),
     244          47 :   saved_lasts_tasks_id(sep_stages.size()),
     245          47 :   bound_adaptors(false),
     246         141 :   auto_stop(true)
     247             : {
     248             :     std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>>
     249          47 :       sep_stages_bis;
     250         220 :     for (auto& sep_stage : sep_stages)
     251         173 :         sep_stages_bis.push_back(std::make_tuple(sep_stage.first, sep_stage.second, std::vector<runtime::Task*>()));
     252             : 
     253          47 :     this->init<runtime::Task>(firsts,
     254             :                                   lasts,
     255             :                                   sep_stages_bis,
     256             :                                   n_threads,
     257             :                                   synchro_buffer_sizes,
     258             :                                   synchro_active_waiting,
     259             :                                   thread_pinning, {}, pipeline_pinning_policy/*,
     260             :                                   tasks_inplace*/);
     261          47 : }
     262             : 
     263          10 : Pipeline
     264             : ::Pipeline(const std::vector<runtime::Task*> &firsts,
     265             :            const std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     266             :            const std::vector<size_t> &n_threads,
     267             :            const std::vector<size_t> &synchro_buffer_sizes,
     268             :            const std::vector<bool> &synchro_active_waiting,
     269             :            const std::vector<bool> &thread_pinning,
     270             :            const std::string &pipeline_pinning_policy/*,
     271          10 :            const std::vector<bool> &tasks_inplace*/)
     272             : : Pipeline(firsts,
     273             :            {},
     274             :            sep_stages,
     275             :            n_threads,
     276             :            synchro_buffer_sizes,
     277             :            synchro_active_waiting,
     278             :            thread_pinning, pipeline_pinning_policy/*,
     279          10 :            tasks_inplace*/)
     280             : {
     281          10 : }
     282             : 
     283          36 : Pipeline
     284             : ::Pipeline(const std::vector<runtime::Task*> &firsts,
     285             :            const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     286             :            const std::vector<size_t> &n_threads,
     287             :            const std::vector<size_t> &synchro_buffer_sizes,
     288             :            const std::vector<bool> &synchro_active_waiting,
     289             :            const std::vector<bool> &thread_pinning,
     290             :            const std::string &pipeline_pinning_policy/*,
     291          36 :            const std::vector<bool> &tasks_inplace*/)
     292             : : Pipeline(firsts,
     293             :            {},
     294             :            sep_stages,
     295             :            n_threads,
     296             :            synchro_buffer_sizes,
     297             :            synchro_active_waiting,
     298             :            thread_pinning, pipeline_pinning_policy/*,
     299          36 :            tasks_inplace*/)
     300             : {
     301          36 : }
     302             : 
     303           0 : Pipeline
     304             : ::Pipeline(runtime::Task &first,
     305             :            runtime::Task &last,
     306             :            const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     307             :            const std::vector<size_t> &n_threads,
     308             :            const std::vector<size_t> &synchro_buffer_sizes,
     309             :            const std::vector<bool> &synchro_active_waiting,
     310             :            const std::vector<bool> &thread_pinning,
     311             :            const std::string &pipeline_pinning_policy/*,
     312           0 :            const std::vector<bool> &tasks_inplace*/)
     313             : : Pipeline({&first},
     314             :            {&last},
     315             :            sep_stages,
     316             :            n_threads,
     317             :            synchro_buffer_sizes,
     318             :            synchro_active_waiting,
     319             :            thread_pinning, pipeline_pinning_policy/*,
     320           0 :            tasks_inplace*/)
     321             : {
     322           0 : }
     323             : 
     324          36 : Pipeline
     325             : ::Pipeline(runtime::Task &first,
     326             :            const std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages,
     327             :            const std::vector<size_t> &n_threads,
     328             :            const std::vector<size_t> &synchro_buffer_sizes,
     329             :            const std::vector<bool> &synchro_active_waiting,
     330             :            const std::vector<bool> &thread_pinning,
     331             :            const std::string &pipeline_pinning_policy/*,
     332          36 :            const std::vector<bool> &tasks_inplace*/)
     333             : : Pipeline({&first},
     334             :            sep_stages,
     335             :            n_threads,
     336             :            synchro_buffer_sizes,
     337             :            synchro_active_waiting,
     338             :            thread_pinning,
     339             :            pipeline_pinning_policy/*,
     340          36 :            tasks_inplace*/)
     341             : {
     342          36 : }
     343             : 
     344         198 : Pipeline::~Pipeline()
     345             : {
     346             :     // hack: this is important to avoid to deallocate twice the same out buffers
     347         104 :     this->original_sequence.memory_allocation = false;
     348             : 
     349             :     // deallocating stages memory
     350         477 :     for (auto stage : this->stages)
     351         373 :         stage->deallocate_outbuffers();
     352             : 
     353         104 :     this->unbind_adaptors();
     354         198 : }
     355             : 
     356             : std::vector<Sequence*>
     357         192 : Pipeline::get_stages()
     358             : {
     359         192 :     std::vector<Sequence*> stages;
     360         924 :     for (auto& stage : this->stages)
     361         732 :         stages.push_back(stage.get());
     362         192 :     return stages;
     363           0 : }
     364             : 
     365             : Sequence&
     366           0 : Pipeline::operator[](const size_t stage_id)
     367             : {
     368           0 :     assert(stage_id < this->stages.size());
     369           0 :     return *this->stages[stage_id];
     370             : }
     371             : 
     372             : template<class TA>
     373             : runtime::Sequence*
     374             : create_sequence(const std::vector<TA*>& firsts,
     375             :                 const std::vector<TA*>& lasts,
     376             :                 const std::vector<TA*>& exclusions,
     377             :                 const size_t& n_threads,
     378             :                 const bool& thread_pinning,
     379             :                 const std::vector<size_t>& puids,
     380             :                 const bool& tasks_inplace)
     381             : {
     382             :     throw tools::unimplemented_error(__FILE__, __LINE__, __func__);
     383             : }
     384             : 
     385             : template<>
     386             : runtime::Sequence*
     387           0 : create_sequence<const runtime::Task>(const std::vector<const runtime::Task*>& firsts,
     388             :                                      const std::vector<const runtime::Task*>& lasts,
     389             :                                      const std::vector<const runtime::Task*>& exclusions,
     390             :                                      const size_t& n_threads,
     391             :                                      const bool& thread_pinning,
     392             :                                      const std::vector<size_t>& puids,
     393             :                                      const bool& /*tasks_inplace*/)
     394             : {
     395           0 :     return new runtime::Sequence(firsts, lasts, exclusions, n_threads, thread_pinning, puids, false);
     396             : }
     397             : 
     398             : template<>
     399             : Sequence*
     400         308 : create_sequence<runtime::Task>(const std::vector<runtime::Task*>& firsts,
     401             :                                const std::vector<runtime::Task*>& lasts,
     402             :                                const std::vector<runtime::Task*>& exclusions,
     403             :                                const size_t& n_threads,
     404             :                                const bool& thread_pinning,
     405             :                                const std::vector<size_t>& puids,
     406             :                                const bool& tasks_inplace)
     407             : {
     408         308 :     return new runtime::Sequence(firsts, lasts, exclusions, n_threads, thread_pinning, puids, tasks_inplace, false);
     409             : }
     410             : 
     411             : // Init and sequence creation for second pinning version
     412             : template<class TA>
     413             : runtime::Sequence*
     414             : create_sequence(const std::vector<TA*>& firsts,
     415             :                 const std::vector<TA*>& lasts,
     416             :                 const std::vector<TA*>& exclusions,
     417             :                 const size_t& n_threads,
     418             :                 const bool& thread_pinning,
     419             :                 const std::string& pipeline_pinning_policy,
     420             :                 const bool& tasks_inplace)
     421             : {
     422             :     throw tools::unimplemented_error(__FILE__, __LINE__, __func__);
     423             : }
     424             : 
     425             : template<>
     426             : runtime::Sequence*
     427           0 : create_sequence<const runtime::Task>(const std::vector<const runtime::Task*>& firsts,
     428             :                                      const std::vector<const runtime::Task*>& lasts,
     429             :                                      const std::vector<const runtime::Task*>& exclusions,
     430             :                                      const size_t& n_threads,
     431             :                                      const bool& thread_pinning,
     432             :                                      const std::string& pipeline_pinning_policy,
     433             :                                      const bool& /*tasks_inplace*/)
     434             : {
     435           0 :     return new runtime::Sequence(firsts, lasts, exclusions, n_threads, thread_pinning, pipeline_pinning_policy, false);
     436             : }
     437             : 
     438             : template<>
     439             : Sequence*
     440          65 : create_sequence<runtime::Task>(const std::vector<runtime::Task*>& firsts,
     441             :                                const std::vector<runtime::Task*>& lasts,
     442             :                                const std::vector<runtime::Task*>& exclusions,
     443             :                                const size_t& n_threads,
     444             :                                const bool& thread_pinning,
     445             :                                const std::string& pipeline_pinning_policy,
     446             :                                const bool& tasks_inplace)
     447             : {
     448             :     return new runtime::Sequence(
     449          65 :       firsts, lasts, exclusions, n_threads, thread_pinning, pipeline_pinning_policy, tasks_inplace, false);
     450             : }
     451             : 
     452             : template <class TA>
     453         104 : void Pipeline
     454             : ::init(const std::vector<TA*> &/*firsts*/,
     455             :        const std::vector<TA*> &/*lasts*/,
     456             :        const std::vector<std::tuple<std::vector<TA*>,std::vector<TA*>,std::vector<TA*>>> &sep_stages,
     457             :        const std::vector<size_t> &n_threads,
     458             :        const std::vector<size_t> &synchro_buffer_sizes,
     459             :        const std::vector<bool> &synchro_active_waiting,
     460             :        const std::vector<bool> &thread_pinning,
     461             :        const std::vector<std::vector<size_t>> &puids,
     462             :        const std::string &pipeline_pinning_policy/*,
     463             :        const std::vector<bool> &tasks_inplace*/)
     464             : {
     465         104 :     if (sep_stages.size() != n_threads.size() && n_threads.size() != 0)
     466             :     {
     467           0 :         std::stringstream message;
     468           0 :         message << "'n_threads.size()' has to be equal to 'sep_stages.size()' or equal to '0' ('n_threads.size()' = "
     469           0 :                 << n_threads.size() << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
     470           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     471           0 :     }
     472             : 
     473         104 :     if (sep_stages.size() != synchro_buffer_sizes.size() + 1 && synchro_buffer_sizes.size() != 0)
     474             :     {
     475           0 :         std::stringstream message;
     476             :         message << "'synchro_buffer_sizes.size()' has to be equal to 'sep_stages.size() -1' or equal to '0' "
     477           0 :                 << "('synchro_buffer_sizes.size()' = " << synchro_buffer_sizes.size()
     478           0 :                 << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
     479           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     480           0 :     }
     481             : 
     482         104 :     if (sep_stages.size() != synchro_active_waiting.size() + 1 && synchro_active_waiting.size() != 0)
     483             :     {
     484           0 :         std::stringstream message;
     485             :         message << "'synchro_active_waiting.size()' has to be equal to 'sep_stages.size() -1' or equal to '0' "
     486           0 :                 << "('synchro_active_waiting.size()' = " << synchro_active_waiting.size()
     487           0 :                 << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
     488           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     489           0 :     }
     490             : 
     491         104 :     if (sep_stages.size() != thread_pinning.size() && thread_pinning.size() != 0)
     492             :     {
     493           0 :         std::stringstream message;
     494             :         message << "'thread_pinning.size()' has to be equal to 'sep_stages.size()' or equal to '0' ("
     495           0 :                 << "'thread_pinning.size()' = " << thread_pinning.size()
     496           0 :                 << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
     497           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     498           0 :     }
     499             : 
     500         104 :     if (sep_stages.size() != puids.size() && puids.size() != 0)
     501             :     {
     502           0 :         std::stringstream message;
     503           0 :         message << "'puids.size()' has to be equal to 'sep_stages.size()' or equal to '0' ('puids.size()' = "
     504           0 :                 << puids.size() << " , 'sep_stages.size()' = " << sep_stages.size() << ").";
     505           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     506           0 :     }
     507             : 
     508             :     // if (sep_stages.size() != tasks_inplace.size() && tasks_inplace.size() != 0)
     509             :     // {
     510             :     //  std::stringstream message;
     511             :     //  message << "'tasks_inplace.size()' has to be equal to 'sep_stages.size()' or equal to '0' ('"
     512             :     //          << "tasks_inplace.size()' = " << tasks_inplace.size() << " , 'sep_stages.size()' = "
     513             :     //          << sep_stages.size() << ").";
     514             :     //  throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     515             :     // }
     516             : 
     517             :     // Creating a vector of pinning policies for each sequence
     518         104 :     std::vector<std::string> sequences_pinning_policies;
     519         104 :     if (!pipeline_pinning_policy.empty())
     520          23 :         sequences_pinning_policies =
     521             :           tools::Thread_pinning_utils::pipeline_parser_unpacker(pipeline_pinning_policy, sep_stages.size());
     522             : 
     523         477 :     for (size_t s = 0; s < sep_stages.size(); s++)
     524             :     {
     525         373 :         const std::vector<TA*>& stage_firsts = std::get<0>(sep_stages[s]);
     526         373 :         const std::vector<TA*>& stage_lasts = std::get<1>(sep_stages[s]);
     527         373 :         const std::vector<TA*>& stage_exclusions = std::get<2>(sep_stages[s]);
     528         373 :         const size_t stage_n_threads = n_threads.size() ? n_threads[s] : 1;
     529         373 :         const bool stage_thread_pinning = thread_pinning.size() ? thread_pinning[s] : false;
     530         373 :         const std::vector<size_t> stage_puids = puids.size() ? puids[s] : std::vector<size_t>();
     531         438 :         const std::string sequence_pinning_policy =
     532         438 :           sequences_pinning_policies.size() ? sequences_pinning_policies[s] : "";
     533         373 :         const bool stage_tasks_inplace = /*tasks_inplace.size() ? tasks_inplace[s] :*/ true;
     534             :         try
     535             :         {
     536         373 :             if (pipeline_pinning_policy.empty())
     537         308 :                 this->stages[s].reset(create_sequence<TA>(stage_firsts,
     538             :                                                           stage_lasts,
     539             :                                                           stage_exclusions,
     540             :                                                           stage_n_threads,
     541             :                                                           stage_thread_pinning,
     542             :                                                           stage_puids,
     543             :                                                           stage_tasks_inplace));
     544             :             else
     545          65 :                 this->stages[s].reset(create_sequence<TA>(stage_firsts,
     546             :                                                           stage_lasts,
     547             :                                                           stage_exclusions,
     548             :                                                           stage_n_threads,
     549             :                                                           stage_thread_pinning,
     550             :                                                           sequence_pinning_policy,
     551             :                                                           stage_tasks_inplace));
     552             :         }
     553           0 :         catch (const tools::control_flow_error& e)
     554             :         {
     555           0 :             std::stringstream message;
     556           0 :             message << "Invalid control flow error on stage " << s
     557           0 :                     << " (perhaps a switcher's tasks were separated between different stages)." << std::endl
     558           0 :                     << e.what();
     559           0 :             throw tools::control_flow_error(__FILE__, __LINE__, __func__, message.str());
     560           0 :         }
     561         373 :         this->stages[s]->is_part_of_pipeline = true;
     562             :     }
     563             : 
     564             :     // verify that the sequential sequence is equivalent to the pipeline sequence
     565         104 :     auto ref_tasks = this->original_sequence.get_tasks_per_threads()[0];
     566         104 :     auto cur_tasks = this->get_tasks_per_threads()[0];
     567             : 
     568         104 :     if (ref_tasks.size() != cur_tasks.size())
     569             :     {
     570           0 :         std::ofstream f1("dbg_ref_sequence.dot");
     571           0 :         this->original_sequence.export_dot(f1);
     572           0 :         std::ofstream f2("dbg_cur_pipeline.dot");
     573           0 :         this->export_dot(f2);
     574             : 
     575           0 :         std::stringstream message;
     576           0 :         message << "'ref_tasks.size()' has to be equal to 'cur_tasks.size()' ('ref_tasks.size()' = " << ref_tasks.size()
     577           0 :                 << ", 'cur_tasks.size()' = " << cur_tasks.size() << ").";
     578           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     579           0 :     }
     580             : 
     581        1082 :     for (size_t ta = 0; ta < cur_tasks.size(); ta++)
     582             :     {
     583         978 :         if (std::find(ref_tasks.begin(), ref_tasks.end(), cur_tasks[ta]) == ref_tasks.end())
     584             :         {
     585           0 :             std::ofstream f1("dbg_ref_sequence.dot");
     586           0 :             this->original_sequence.export_dot(f1);
     587           0 :             std::ofstream f2("dbg_cur_pipeline.dot");
     588           0 :             this->export_dot(f2);
     589             : 
     590           0 :             std::stringstream message;
     591           0 :             message << "'cur_tasks[ta]' is not contained in the 'ref_tasks' vector ('ta' = " << ta
     592           0 :                     << ", 'cur_tasks[ta]' = " << +cur_tasks[ta]
     593           0 :                     << ", 'cur_tasks[ta]->get_name()' = " << cur_tasks[ta]->get_name() << ").";
     594           0 :             throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     595           0 :         }
     596             :     }
     597             : 
     598             :     // Adding adaptors to pipeline stages
     599         104 :     this->create_adaptors(synchro_buffer_sizes, synchro_active_waiting);
     600         104 :     this->bind_adaptors();
     601             : 
     602             :     // Allocating memory for stages
     603         477 :     for (auto stage : this->stages)
     604             :     {
     605         373 :         stage->allocate_outbuffers();
     606             :     }
     607             : 
     608         104 :     this->thread_pool.reset(new tools::Thread_pool_standard(this->stages.size() - 1));
     609         104 :     this->thread_pool->init(); // threads are spawned here
     610         104 : }
     611             : 
     612             : void
     613         104 : Pipeline::create_adaptors(const std::vector<size_t>& synchro_buffer_sizes,
     614             :                           const std::vector<bool>& synchro_active_waiting)
     615             : {
     616             :     //                     sck out addr     occ     stage   tsk id  sck id
     617         104 :     std::vector<std::tuple<runtime::Socket*, size_t, size_t, size_t, size_t>> out_sck_orphans;
     618             : 
     619             :     // for all the stages in the pipeline
     620         373 :     for (size_t sta = 0; sta < this->stages.size() - 1; sta++)
     621             :     {
     622             :         // ------------------------------------------------------------------------------------------------------------
     623             :         // --------------------------------------------------------------- collect orphan output sockets in stage 'sta'
     624             :         // ------------------------------------------------------------------------------------------------------------
     625         269 :         std::vector<std::vector<runtime::Task*>> tasks_per_threads = this->stages[sta]->get_tasks_per_threads();
     626             :         // only the first thread (t = 0) of the stage is inspected; the loop over all threads is disabled
     627             :         // for (size_t t = 0; t < tasks_per_threads.size(); t++)
     628         269 :         size_t t = 0;
     629             :         {
     630             :             // for all the tasks in the stage
     631        1015 :             for (size_t tsk_id = 0; tsk_id < tasks_per_threads[t].size(); tsk_id++)
     632             :             {
     633         746 :                 auto tsk = tasks_per_threads[t][tsk_id];
     634             :                 // for all the sockets of the tasks
     635        2666 :                 for (size_t sck_id = 0; sck_id < tsk->sockets.size(); sck_id++)
     636             :                 {
     637        1920 :                     auto sck = tsk->sockets[sck_id];
     638             :                     // if the current socket is an output or forward socket type
     639        1920 :                     if (sck->get_type() == socket_t::SOUT || sck->get_type() == socket_t::SFWD)
     640             :                     {
     641             :                         // for all the sockets bound to the current socket
     642        2438 :                         for (auto bsck : sck->get_bound_sockets())
     643             :                         {
     644             :                             // check if the task of the bounded socket is not in the current stage
     645         916 :                             if (std::find(tasks_per_threads[t].begin(),
     646         916 :                                           tasks_per_threads[t].end(),
     647        1832 :                                           &bsck->get_task()) == tasks_per_threads[t].end())
     648             :                             {
     649             :                                 // check the position of the socket in the orphans
     650         426 :                                 size_t pos = 0;
     651        1149 :                                 for (; pos < out_sck_orphans.size(); pos++)
     652         773 :                                     if (std::get<0>(out_sck_orphans[pos]) == sck.get()) break;
     653             : 
     654         426 :                                 if (pos == out_sck_orphans.size())
     655         376 :                                     out_sck_orphans.push_back(std::make_tuple(sck.get(), 1, sta, tsk_id, sck_id));
     656             :                                 else
     657          50 :                                     std::get<1>(out_sck_orphans[pos])++;
     658             :                             }
     659             :                         }
     660             :                     }
     661        1920 :                 }
     662             :             }
     663             :         }
     664             : 
     665             :         // ------------------------------------------------------------------------------------------------------------
     666             :         // -------------------------------------- collect orphan input sockets in stage 'sta +1' and create connections
     667             :         // ------------------------------------------------------------------------------------------------------------
     668         269 :         tasks_per_threads = this->stages[sta + 1]->get_tasks_per_threads();
     669             :         // only the first thread of stage 'sta + 1' is inspected ('t' is still 0); the loop over all threads is disabled
     670             :         // for (size_t t = 0; t < tasks_per_threads.size(); t++)
     671             :         {
     672             :             // for all the tasks in the stage
     673        1068 :             for (size_t tsk_id = 0; tsk_id < tasks_per_threads[t].size(); tsk_id++)
     674             :             {
     675         799 :                 auto tsk = tasks_per_threads[t][tsk_id];
     676             :                 // ------------------------------------ manage socket to socket bindings (regular input/forward sockets)
     677             :                 // for all the sockets of the tasks
     678        2836 :                 for (size_t sck_id = 0; sck_id < tsk->sockets.size(); sck_id++)
     679             :                 {
     680        2037 :                     auto sck = tsk->sockets[sck_id];
     681             :                     // if the current socket is an input or forward socket type
     682        2037 :                     if (sck->get_type() == socket_t::SIN || sck->get_type() == socket_t::SFWD)
     683             :                     {
     684         820 :                         runtime::Socket* bsck = nullptr;
     685             :                         try
     686             :                         {
     687             :                             // get output bounded socket
     688         820 :                             bsck = &sck->get_bound_socket(); // can throw if there is no bounded socket
     689             :                         }
     690           0 :                         catch (const std::exception&)
     691             :                         {
     692           0 :                         }
     693         820 :                         if (bsck != nullptr)
     694             :                         {
     695             :                             // check if the task of the bounded socket is not in the current stage
     696         820 :                             if (std::find(tasks_per_threads[t].begin(),
     697         820 :                                           tasks_per_threads[t].end(),
     698        1640 :                                           &bsck->get_task()) == tasks_per_threads[t].end())
     699             :                             {
     700             :                                 // check the position of the bounded socket in the orphans
     701         397 :                                 size_t pos = 0;
     702        1017 :                                 for (; pos < out_sck_orphans.size(); pos++)
     703        1017 :                                     if (std::get<0>(out_sck_orphans[pos]) == bsck) break;
     704             : 
     705         397 :                                 if (pos < out_sck_orphans.size())
     706             :                                 {
     707         397 :                                     auto sck_out = std::get<0>(out_sck_orphans[pos]);
     708         397 :                                     auto sck_in = sck.get();
     709         794 :                                     auto unbind_sout_pos = std::distance(sck_out->get_bound_sockets().begin(),
     710         397 :                                                                          std::find(sck_out->get_bound_sockets().begin(),
     711         397 :                                                                                    sck_out->get_bound_sockets().end(),
     712         397 :                                                                                    sck_in));
     713         397 :                                     this->sck_orphan_binds.push_back(
     714         397 :                                       std::make_pair(std::make_tuple(std::get<0>(out_sck_orphans[pos]),
     715         397 :                                                                      std::get<2>(out_sck_orphans[pos]),
     716         397 :                                                                      std::get<3>(out_sck_orphans[pos]),
     717         397 :                                                                      std::get<4>(out_sck_orphans[pos]),
     718             :                                                                      unbind_sout_pos),
     719         794 :                                                      std::make_tuple(sck_in, sta + 1, tsk_id, sck_id, nullptr)));
     720             :                                 }
     721             :                             }
     722             :                         }
     723             :                     }
     724        2037 :                 }
     725             :                 // ------------------------------------------- manage socket to task bindings (with fake input sockets)
     726         944 :                 for (size_t sck_id = 0; sck_id < tsk->fake_input_sockets.size(); sck_id++)
     727             :                 {
     728         145 :                     auto sck = tsk->fake_input_sockets[sck_id];
     729         145 :                     runtime::Socket* bsck = nullptr;
     730             :                     try
     731             :                     {
     732             :                         // get output bounded socket
     733         145 :                         bsck = &sck->get_bound_socket(); // can throw if there is no bounded socket
     734             :                     }
     735           0 :                     catch (const std::exception&)
     736             :                     {
     737           0 :                     }
     738         145 :                     if (bsck != nullptr)
     739             :                     {
     740             :                         // check if the task of the bounded socket is not in the current stage
     741         145 :                         if (std::find(tasks_per_threads[t].begin(), tasks_per_threads[t].end(), &bsck->get_task()) ==
     742         290 :                             tasks_per_threads[t].end())
     743             :                         {
     744             :                             // check the position of the bounded socket in the orphans
     745          29 :                             size_t pos = 0;
     746         132 :                             for (; pos < out_sck_orphans.size(); pos++)
     747         132 :                                 if (std::get<0>(out_sck_orphans[pos]) == bsck) break;
     748             : 
     749          29 :                             if (pos < out_sck_orphans.size())
     750             :                             {
     751          29 :                                 auto sck_out = std::get<0>(out_sck_orphans[pos]);
     752          29 :                                 auto sck_in = sck.get();
     753          58 :                                 auto unbind_sout_pos = std::distance(sck_out->get_bound_sockets().begin(),
     754          29 :                                                                      std::find(sck_out->get_bound_sockets().begin(),
     755          29 :                                                                                sck_out->get_bound_sockets().end(),
     756          29 :                                                                                sck_in));
     757          29 :                                 this->sck_orphan_binds.push_back(
     758          29 :                                   std::make_pair(std::make_tuple(std::get<0>(out_sck_orphans[pos]),
     759          29 :                                                                  std::get<2>(out_sck_orphans[pos]),
     760          29 :                                                                  std::get<3>(out_sck_orphans[pos]),
     761          29 :                                                                  std::get<4>(out_sck_orphans[pos]),
     762             :                                                                  unbind_sout_pos),
     763          58 :                                                  std::make_tuple(nullptr, sta + 1, tsk_id, sck_id, tsk)));
     764             :                             }
     765             :                         }
     766             :                     }
     767         145 :                 }
     768             :                 // ----------------------------------------------------------------------------------------------------
     769             :             }
     770             :         }
     771         269 :     }
     772             : 
     773             :     // ----------------------------------------------------------------------------------------------------------------
     774             :     // ----------------------------------------------------------------------------------------------- prints for debug
     775             :     // ----------------------------------------------------------------------------------------------------------------
     776             :     // std::cout << "Orphan output sockets list:" << std::endl;
     777             :     // for (auto &sck : out_sck_orphans)
     778             :     // {
     779             :     //  auto sck_out_name = std::get<0>(sck)->get_name();
     780             :     //  auto tsk_out_name = std::get<0>(sck)->get_task().get_name();
     781             :     //  auto sck_out_occ  = std::get<1>(sck);
     782             :     //  auto tsk_out_sta  = std::get<2>(sck);
     783             :     //  auto tsk_out_id   = std::get<3>(sck);
     784             :     //  auto sck_out_id   = std::get<4>(sck);
     785             : 
     786             :     //  std::cout << "  " << tsk_out_name << "[" << sck_out_name << "] (stage " << tsk_out_sta << ", " << sck_out_occ
     787             :     //            << " occurrences, tsk id = " << tsk_out_id << ", sck id = " << sck_out_id << ")" << std::endl;
     788             :     // }
     789             : 
     790             :     // std::cout << std::endl << "Detected socket binds:" << std::endl;
     791             :     // for (auto &bind : this->sck_orphan_binds)
     792             :     // {
     793             :     //  auto sck_out_name = std::get<0>(bind.first)->get_name();
     794             :     //  auto tsk_out_name = std::get<0>(bind.first)->get_task().get_name();
     795             :     //  auto tsk_out_sta  = std::get<1>(bind.first);
     796             :     //  auto tsk_out_id   = std::get<2>(bind.first);
     797             :     //  auto sck_out_id   = std::get<3>(bind.first);
     798             :     //  auto sck_out_ubp  = std::get<4>(bind.first);
     799             : 
     800             :     //  auto sck_in_name = std::get<0>(bind.second)->get_name();
     801             :     //  auto tsk_in_name = std::get<0>(bind.second)->get_task().get_name();
     802             :     //  auto tsk_in_sta  = std::get<1>(bind.second);
     803             :     //  auto tsk_in_id   = std::get<2>(bind.second);
     804             :     //  auto sck_in_id   = std::get<3>(bind.second);
     805             : 
     806             :     //  std::cout << "  " << tsk_out_name << "[" << sck_out_name << "] (stage " << tsk_out_sta << ", tsk id = "
     807             :     //                    << tsk_out_id << ", sck id = " << sck_out_id << ", ubp = " << sck_out_ubp << ")"  << " -> "
     808             :     //                    << tsk_in_name  << "[" << sck_in_name  << "] (stage " << tsk_in_sta  << ", tsk id = "
     809             :     //                    << tsk_in_id  << ", sck id = " << sck_in_id  << ")" << std::endl;
     810             :     // }
     811             : 
     812             :     // ----------------------------------------------------------------------------------------------------------------
     813             :     // ------------------------------------------------------------------------------------------------ create adaptors
     814             :     // ----------------------------------------------------------------------------------------------------------------
     815         104 :     auto sck_orphan_binds_cpy = this->sck_orphan_binds;
     816         104 :     module::Adaptor_m_to_n* adp = nullptr;
     817         104 :     std::map<runtime::Socket*, size_t> sck_to_adp_sck_id;
     818         477 :     for (size_t sta = 0; sta < this->stages.size(); sta++)
     819             :     {
     820         373 :         const auto n_threads = this->stages[sta]->get_n_threads();
     821         373 :         std::vector<std::vector<runtime::Task*>> tasks_per_threads = this->stages[sta]->get_tasks_per_threads();
     822             : 
     823             :         // ------------------------------------------------------------------------------------------------------------
     824             :         // ----------------------------------------------------------------------------------------------- pull adaptor
     825             :         // ------------------------------------------------------------------------------------------------------------
     826         373 :         if (sta > 0)
     827             :         {
     828         269 :             assert(adp != nullptr);
     829             :             //                               sck out addr      stage   tsk id  sck id  unbind_pos
     830             :             std::vector<std::pair<std::tuple<runtime::Socket*, size_t, size_t, size_t, size_t>,
     831             :                                   //                               sck in addr       stage   tsk id  sck id  tsk in addr
     832             :                                   std::tuple<runtime::Socket*, size_t, size_t, size_t, runtime::Task*>>>
     833         269 :               sck_orphan_binds_new;
     834             : 
     835        3361 :             for (size_t t = 0; t < n_threads; t++)
     836             :             {
     837        3092 :                 module::Adaptor_m_to_n* cur_adp = (t == 0) ? adp : adp->clone();
     838        3092 :                 if (t > 0) cur_adp->add_puller();
     839             : 
     840        9276 :                 for (auto& t : cur_adp->tasks)
     841        6184 :                     t->set_fast(true);
     842        3092 :                 if (t > 0)
     843             :                 {
     844        2823 :                     this->adaptors[sta - 1].second.push_back(std::unique_ptr<module::Adaptor_m_to_n>(cur_adp));
     845        2823 :                     cur_adp->set_custom_name("Adp_m_to_n_" + std::to_string(sta - 1));
     846             :                 }
     847             : 
     848        3092 :                 auto task_pull = &(*cur_adp)("pull");
     849             : 
     850        3092 :                 sck_orphan_binds_new.clear();
     851       17727 :                 for (auto& bind : sck_orphan_binds_cpy)
     852             :                 {
     853       14635 :                     auto tsk_out_sta = std::get<1>(bind.first);
     854       14635 :                     if (tsk_out_sta < sta)
     855             :                     {
     856        8094 :                         auto tsk_in_sta = std::get<1>(bind.second);
     857        8094 :                         if (tsk_in_sta == sta)
     858             :                         {
     859        4348 :                             auto sck_out_ptr = std::get<0>(bind.first);
     860        4348 :                             auto priority = std::get<4>(bind.first);
     861        4348 :                             auto tsk_in_id = std::get<2>(bind.second);
     862        4348 :                             auto sck_in_id = std::get<3>(bind.second);
     863        4348 :                             runtime::Socket* sck_in = nullptr;
     864        4348 :                             runtime::Task* tsk_in = nullptr;
     865        4348 :                             if (std::get<0>(bind.second) != nullptr) // if socket to socket binding
     866        3656 :                                 sck_in = tasks_per_threads[t][tsk_in_id]->sockets[sck_in_id].get();
     867             :                             else // if socket to task binding
     868         692 :                                 tsk_in = tasks_per_threads[t][tsk_in_id];
     869        4348 :                             this->adaptors_binds.push_back(std::make_tuple(
     870        8696 :                               task_pull->sockets[sck_to_adp_sck_id[sck_out_ptr]].get(), sck_in, priority, tsk_in));
     871             :                         }
     872             :                         else
     873        3746 :                             sck_orphan_binds_new.push_back(bind);
     874             :                     }
     875             :                     else
     876        6541 :                         sck_orphan_binds_new.push_back(bind);
     877             :                 }
     878             : 
     879        3092 :                 if (t > 0) this->stages[sta]->all_modules[t].push_back(cur_adp);
     880             :             }
     881         269 :             this->saved_firsts_tasks_id[sta] = this->stages[sta]->firsts_tasks_id;
     882         269 :             sck_orphan_binds_cpy = sck_orphan_binds_new;
     883             : 
     884         269 :             adp->alloc_buffers();
     885         269 :         }
     886             : 
     887             :         // ------------------------------------------------------------------------------------------------------------
     888             :         // ----------------------------------------------------------------------------------------------- push adaptor
     889             :         // ------------------------------------------------------------------------------------------------------------
     890         373 :         std::map<runtime::Socket*, size_t> sck_to_adp_sck_id_new;
     891         373 :         if (sta < this->stages.size() - 1)
     892             :         {
     893         269 :             std::vector<size_t> adp_n_elmts;
     894         269 :             std::vector<std::type_index> adp_datatype;
     895         269 :             size_t adp_buffer_size = synchro_buffer_sizes.size() ? synchro_buffer_sizes[sta] : 1;
     896         269 :             bool adp_active_waiting = synchro_active_waiting.size() ? synchro_active_waiting[sta] : false;
     897         269 :             size_t adp_n_frames = 1;
     898             : 
     899             :             // a map to remember if a passed socket points already to the same memory space
     900         269 :             std::map<void*, size_t> fwd_source;
     901             : 
     902         269 :             std::vector<runtime::Socket*> passed_scks_out;
     903        1186 :             for (auto& bind : sck_orphan_binds_cpy)
     904             :             {
     905         917 :                 auto tsk_out_sta = std::get<1>(bind.first);
     906         917 :                 if (tsk_out_sta <= sta)
     907             :                 {
     908         585 :                     auto sck_out = std::get<0>(bind.first);
     909         585 :                     if (std::find(passed_scks_out.begin(), passed_scks_out.end(), sck_out) == passed_scks_out.end())
     910             :                     {
     911             :                         // avoid the creation of new adaptor sockets for forward sockets pointing to the same memory
     912             :                         // space
     913         525 :                         auto sck_out_dptr = (void*)sck_out->get_dataptr<int8_t>();
     914         525 :                         assert(sck_out_dptr != nullptr);
     915         525 :                         if (fwd_source.find(sck_out_dptr) == fwd_source.end())
     916             :                         {
     917         458 :                             fwd_source[sck_out_dptr] = 1;
     918         458 :                             adp_n_frames = sck_out->get_task().get_module().get_n_frames();
     919         458 :                             adp_n_elmts.push_back(sck_out->get_n_elmts() / adp_n_frames);
     920         458 :                             adp_datatype.push_back(sck_out->get_datatype());
     921             :                         }
     922         525 :                         passed_scks_out.push_back(sck_out);
     923             :                     }
     924             :                 }
     925             :             }
     926         269 :             passed_scks_out.clear();
     927             : 
     928             :             // allocate the adaptor for the first thread
     929         269 :             adp = new module::Adaptor_m_to_n(adp_n_elmts, adp_datatype, adp_buffer_size, adp_active_waiting);
     930         269 :             adp->set_n_frames(adp_n_frames);
     931             : 
     932        3361 :             for (size_t t = 0; t < n_threads; t++)
     933             :             {
     934        3092 :                 module::Adaptor_m_to_n* cur_adp = (t == 0) ? adp : adp->clone();
     935        3092 :                 cur_adp->set_custom_name("Adp_m_to_n_" + std::to_string(sta));
     936        3092 :                 if (t > 0) cur_adp->add_pusher();
     937        3092 :                 this->adaptors[sta].first.push_back(std::unique_ptr<module::Adaptor_m_to_n>(cur_adp));
     938        3092 :                 auto task_push = &(*cur_adp)("push");
     939             : 
     940        3092 :                 std::map<void*, size_t> fwd_source;
     941        3092 :                 sck_to_adp_sck_id_new.clear();
     942        3092 :                 size_t adp_sck_id = 0;
     943       13805 :                 for (auto& bind : sck_orphan_binds_cpy) // bind standard task to last adaptor
     944             :                 {
     945       10713 :                     auto tsk_out_sta = std::get<1>(bind.first);
     946             : 
     947       10713 :                     if (tsk_out_sta <= sta)
     948             :                     {
     949        9161 :                         auto sck_out_ptr = std::get<0>(bind.first);
     950        9161 :                         auto sck_out_dptr = (void*)sck_out_ptr->get_dataptr<int8_t>();
     951        9161 :                         assert(sck_out_dptr != nullptr);
     952             : 
     953        9161 :                         if (std::find(passed_scks_out.begin(),
     954             :                                       passed_scks_out.end(),
     955       26581 :                                       sck_out_ptr) == passed_scks_out.end() &&
     956       17420 :                             fwd_source.find(sck_out_dptr) == fwd_source.end()) // <= the latest condition is here to
     957             :                                                                                //    avoid to bind adaptor sockets two
     958             :                                                                                //    times the same memory space
     959             :                                                                                //    (usefull in the case of multiple
     960             :                                                                                //    fwd sockets pointing to the same
     961             :                                                                                //    memory address)
     962             :                         {
     963        6769 :                             if (tsk_out_sta == sta)
     964             :                             {
     965        4434 :                                 auto tsk_out_id = std::get<2>(bind.first);
     966        4434 :                                 auto sck_out_id = std::get<3>(bind.first);
     967        4434 :                                 auto sck_out = tasks_per_threads[t][tsk_out_id]->sockets[sck_out_id];
     968        4434 :                                 auto priority = std::get<4>(bind.first);
     969        4434 :                                 sck_to_adp_sck_id_new[sck_out_ptr] = adp_sck_id;
     970        4434 :                                 this->adaptors_binds.push_back(
     971        4434 :                                   std::make_tuple(sck_out.get(),
     972        4434 :                                                   task_push->sockets[adp_sck_id++].get(),
     973             :                                                   priority,
     974        4434 :                                                   nullptr)); // <= only socket to socket binding is possible here
     975        4434 :                             }
     976             :                             else // if (tsk_out_sta < sta) // bind prev. adaptor to last adaptor
     977             :                             {
     978        2335 :                                 auto tsk_out_id = 1;
     979        2335 :                                 auto sck_out_id = sck_to_adp_sck_id[sck_out_ptr];
     980        2335 :                                 sck_to_adp_sck_id_new[sck_out_ptr] = adp_sck_id;
     981             :                                 auto adp_prev =
     982        2335 :                                   t == 0 ? this->adaptors[sta - 1].first[0] : this->adaptors[sta - 1].second[t - 1];
     983        2335 :                                 auto sck_out = (*adp_prev)[tsk_out_id].sockets[sck_out_id];
     984        2335 :                                 auto priority = std::get<4>(bind.first);
     985        2335 :                                 this->adaptors_binds.push_back(
     986        2335 :                                   std::make_tuple(sck_out.get(),
     987        2335 :                                                   task_push->sockets[adp_sck_id++].get(),
     988             :                                                   priority,
     989        2335 :                                                   nullptr)); // <= only socket to socket binding is possible here
     990        2335 :                             }
     991             : 
     992        6769 :                             fwd_source[sck_out_dptr] = 1; // remember that this memory space has been connected to the
     993             :                                                           // adaptor once
     994        6769 :                             passed_scks_out.push_back(sck_out_ptr);
     995             :                         }
     996             :                     }
     997             :                 }
     998        3092 :                 passed_scks_out.clear();
     999        3092 :             }
    1000         269 :             this->saved_lasts_tasks_id[sta] = this->stages[sta]->lasts_tasks_id;
    1001         269 :         }
    1002         373 :         sck_to_adp_sck_id = sck_to_adp_sck_id_new;
    1003         373 :     }
    1004         104 : }
    1005             : 
    // Public entry point: forwards to _bind_adaptors(true), so the adaptor tasks are inserted into the stages and
    // the binds recorded in 'adaptors_binds' are applied.
    1006             : void
    1007         310 : Pipeline::bind_adaptors()
    1008             : {
    1009         310 :     this->_bind_adaptors(true);
    1010         310 : }
    1011             : 
    // Inserts the adaptor "pull"/"push" tasks into the stages and switches the socket bindings over to the adaptors.
    // Does nothing when 'bound_adaptors' is already true. Otherwise, in this order:
    //   1. every stage except the first gets the "pull" task of the previous adaptor prepended to each thread,
    //   2. every stage except the last gets the "push" task of its own adaptor appended to each thread,
    //   3. every bind stored in 'sck_orphan_binds' is unbound,
    //   4. if 'bind_adaptors' is true, every bind stored in 'adaptors_binds' is applied,
    //   5. 'bound_adaptors' is set to true.
    // Param bind_adaptors: when false, steps 1-3 and 5 still run but step 4 is skipped. Note that this parameter has
    // the same name as the public method bind_adaptors().
    1012             : void
    1013         310 : Pipeline::_bind_adaptors(const bool bind_adaptors)
    1014             : {
    1015         310 :     if (!this->bound_adaptors) // nothing to do when the adaptors are already bound
    1016             :     {
    1017        1421 :         for (size_t sta = 0; sta < this->stages.size(); sta++)
    1018             :         {
    1019        1111 :             const auto n_threads = this->stages[sta]->get_n_threads();
    1020             : 
    1021             :             // --------------------------------------------------------------------------------------------------------
    1022             :             // ------------------------------------------------------------------------------------------- pull adaptor
    1023             :             // --------------------------------------------------------------------------------------------------------
    1024        1111 :             if (sta > 0) // the first stage has no previous adaptor to pull from
    1025             :             {
    1026       10071 :                 for (size_t t = 0; t < n_threads; t++)
    1027             :                 {
    1028             :                     module::Adaptor_m_to_n* cur_adp = // thread 0 uses 'first[0]', thread t > 0 uses 'second[t - 1]'
    1029        9270 :                       t > 0 ? adaptors[sta - 1].second[t - 1].get() : adaptors[sta - 1].first[0].get();
    1030             : 
    1031             :                     // NOTE(review): why thread 0 of an intermediate stage is skipped is not visible here - confirm
    1032        9270 :                     if (t > 0 || sta == this->stages.size() - 1) // add the adaptor to the current stage
    1033        8767 :                         this->stages[sta]->all_modules[t].push_back(cur_adp);
    1034        9270 :                     auto task_pull = &(*cur_adp)("pull");
    1035             : 
    1036        9270 :                     auto ss = this->stages[sta]->sequences[t]->get_contents();
    1037        9270 :                     assert(ss != nullptr);
    1038        9270 :                     ss->tasks.insert(ss->tasks.begin(), task_pull); // the pull task becomes the first task
    1039        9270 :                     ss->processes.insert(ss->processes.begin(), // matching process, also inserted in front
    1040           0 :                                          [task_pull]() -> const int*
    1041             :                                          {
    1042           0 :                                              task_pull->exec();
    1043           0 :                                              const int* status = task_pull->sockets.back()->get_dataptr<const int>(); // data of the last socket is returned as the status
    1044           0 :                                              return status;
    1045             :                                          });
    1046        9270 :                     this->stages[sta]->update_tasks_id(t);
    1047             :                 }
    1048         801 :                 this->stages[sta]->firsts_tasks_id.clear();
    1049         801 :                 this->stages[sta]->firsts_tasks_id.push_back(0); // the only first task id is now 0
    1050         801 :                 this->stages[sta]->n_tasks++;
    1051             :             }
    1052             : 
    1053             :             // --------------------------------------------------------------------------------------------------------
    1054             :             // ------------------------------------------------------------------------------------------- push adaptor
    1055             :             // --------------------------------------------------------------------------------------------------------
    1056        1111 :             if (sta < this->stages.size() - 1) // the last stage has no next adaptor to push to
    1057             :             {
    1058         801 :                 size_t last_task_id = 0; // after the loop: the value computed for the last thread (0 if no thread)
    1059       10071 :                 for (size_t t = 0; t < n_threads; t++)
    1060             :                 {
    1061        9270 :                     module::Adaptor_m_to_n* cur_adp = adaptors[sta].first[t].get();
    1062             : 
    1063             :                     // add the adaptor to the current stage
    1064        9270 :                     this->stages[sta]->all_modules[t].push_back(cur_adp);
    1065             : 
    1066        9270 :                     auto task_push = &(*cur_adp)("push");
    1067             : 
    1068        9270 :                     auto ss = this->stages[sta]->get_last_subsequence(t);
    1069        9270 :                     assert(ss != nullptr);
    1070        9270 :                     ss->tasks.push_back(task_push); // the push task becomes the last task of the last sub-sequence
    1071        9270 :                     ss->processes.push_back(
    1072           0 :                       [task_push]() -> const int*
    1073             :                       {
    1074           0 :                           task_push->exec();
    1075           0 :                           const int* status = task_push->sockets.back()->get_dataptr<const int>(); // data of the last socket is returned as the status
    1076           0 :                           return status;
    1077             :                       });
    1078        9270 :                     last_task_id = ss->tasks_id[ss->tasks_id.size() - 1] + 1; // previous last id + 1 (assumes 'tasks_id' is not empty)
    1079        9270 :                     ss->tasks_id.push_back(last_task_id);
    1080             :                 }
    1081         801 :                 this->stages[sta]->lasts_tasks_id.clear();
    1082         801 :                 this->stages[sta]->lasts_tasks_id.push_back(last_task_id); // the only last task id is the push task
    1083         801 :                 this->stages[sta]->n_tasks++;
    1084             :             }
    1085        1111 :             this->stages[sta]->update_firsts_and_lasts_tasks();
    1086             :         }
    1087             : 
    1088             :         // ------------------------------------------------------------------------------------------------------------
    1089             :         // ---------------------------------------------------------------------------------------------- bind adaptors
    1090             :         // ------------------------------------------------------------------------------------------------------------
    1091        1582 :         for (auto& bind : this->sck_orphan_binds) // first remove the binds stored in 'sck_orphan_binds'
    1092             :         {
    1093        1272 :             auto sck_out = std::get<0>(bind.first);
    1094        1272 :             auto sck_in = std::get<0>(bind.second);
    1095        1272 :             if (sck_in != nullptr) // if socket to socket unbinding
    1096        1185 :                 sck_in->unbind(*sck_out);
    1097             :             else // if socket to task unbinding
    1098             :             {
    1099          87 :                 auto tsk_in = std::get<4>(bind.second);
    1100          87 :                 assert(tsk_in != nullptr);
    1101          87 :                 tsk_in->unbind(*sck_out);
    1102             :             }
    1103             :         }
    1104             : 
    1105         310 :         if (bind_adaptors) // then, if requested, apply the binds stored in 'adaptors_binds'
    1106             :         {
    1107       33649 :             for (auto& bind : this->adaptors_binds)
    1108             :             {
    1109       33339 :                 auto sck_out = std::get<0>(bind);
    1110       33339 :                 auto sck_in = std::get<1>(bind);
    1111       33339 :                 auto priority = std::get<2>(bind);
    1112       33339 :                 if (sck_in != nullptr) // if socket to socket binding
    1113       31263 :                     sck_in->_bind(*sck_out, priority);
    1114             :                 else // if socket to task binding
    1115             :                 {
    1116        2076 :                     auto tsk_in = std::get<3>(bind);
    1117        2076 :                     assert(tsk_in != nullptr);
    1118        2076 :                     tsk_in->_bind(*sck_out, priority);
    1119             :                 }
    1120             :             }
    1121             :         }
    1122             : 
    1123         310 :         this->bound_adaptors = true;
    1124             :     }
    1125         310 : }
    1126             : 
    // Public entry point: forwards to _unbind_adaptors(true), so the adaptor tasks are removed from the stages and
    // the binds recorded in 'sck_orphan_binds' are applied again.
    1127             : void
    1128         207 : Pipeline::unbind_adaptors()
    1129             : {
    1130         207 :     this->_unbind_adaptors(true);
    1131         207 : }
    1132             : 
    // Removes the adaptor "pull"/"push" tasks from the stages and undoes the adaptor socket bindings.
    // Does nothing when 'bound_adaptors' is false. Otherwise, in this order:
    //   1. every stage except the first loses the first task/process of each thread,
    //   2. every stage except the last loses the last task/process/task id of each thread's last sub-sequence,
    //   3. the first/last task ids are restored from 'saved_firsts_tasks_id' / 'saved_lasts_tasks_id',
    //   4. every bind stored in 'adaptors_binds' is unbound,
    //   5. if 'bind_orphans' is true, every bind stored in 'sck_orphan_binds' is applied again,
    //   6. 'bound_adaptors' is set to false.
    // Param bind_orphans: when false, the adaptors are removed but the binds in 'sck_orphan_binds' are not restored.
    1133             : void
    1134         413 : Pipeline::_unbind_adaptors(const bool bind_orphans)
    1135             : {
    1136         413 :     if (this->bound_adaptors) // nothing to do when the adaptors are not bound
    1137             :     {
    1138        1421 :         for (size_t sta = 0; sta < this->stages.size(); sta++)
    1139             :         {
    1140        1111 :             const auto n_threads = this->stages[sta]->get_n_threads();
    1141             : 
    1142             :             // --------------------------------------------------------------------------------------------------------
    1143             :             // ------------------------------------------------------------------------------------------- pull adaptor
    1144             :             // --------------------------------------------------------------------------------------------------------
    1145        1111 :             if (sta > 0) // the first stage has no pull task
    1146             :             {
    1147       10071 :                 for (size_t t = 0; t < n_threads; t++)
    1148             :                 {
    1149        9270 :                     if (t > 0 || sta == this->stages.size() - 1) // remove the adaptor from the current stage (same condition as in _bind_adaptors)
    1150        8767 :                         this->stages[sta]->all_modules[t].pop_back();
    1151             : 
    1152        9270 :                     auto ss = this->stages[sta]->sequences[t]->get_contents();
    1153        9270 :                     assert(ss != nullptr);
    1154        9270 :                     ss->tasks.erase(ss->tasks.begin()); // drop the first task (presumably the pull task - relies on _bind_adaptors having inserted it)
    1155        9270 :                     ss->processes.erase(ss->processes.begin()); // and its process
    1156        9270 :                     this->stages[sta]->update_tasks_id(t);
    1157             :                 }
    1158         801 :                 this->stages[sta]->firsts_tasks_id = this->saved_firsts_tasks_id[sta]; // restore the saved first task ids
    1159         801 :                 this->stages[sta]->n_tasks--;
    1160             :             }
    1161             : 
    1162             :             // --------------------------------------------------------------------------------------------------------
    1163             :             // ------------------------------------------------------------------------------------------- push adaptor
    1164             :             // --------------------------------------------------------------------------------------------------------
    1165        1111 :             if (sta < this->stages.size() - 1) // the last stage has no push task
    1166             :             {
    1167       10071 :                 for (size_t t = 0; t < n_threads; t++)
    1168             :                 {
    1169             :                     // remove the adaptor from the current stage
    1170        9270 :                     this->stages[sta]->all_modules[t].pop_back();
    1171             : 
    1172        9270 :                     auto ss = this->stages[sta]->get_last_subsequence(t);
    1173        9270 :                     assert(ss != nullptr);
    1174        9270 :                     ss->tasks.pop_back(); // drop the last task (presumably the push task - relies on _bind_adaptors having appended it)
    1175        9270 :                     ss->processes.pop_back();
    1176        9270 :                     ss->tasks_id.pop_back();
    1177             :                 }
    1178         801 :                 this->stages[sta]->lasts_tasks_id = this->saved_lasts_tasks_id[sta]; // restore the saved last task ids
    1179         801 :                 this->stages[sta]->n_tasks--;
    1180             :             }
    1181        1111 :             this->stages[sta]->update_firsts_and_lasts_tasks();
    1182             :         }
    1183             : 
    1184             :         // ------------------------------------------------------------------------------------------------------------
    1185             :         // -------------------------------------------------------------------------------------------- unbind adaptors
    1186             :         // ------------------------------------------------------------------------------------------------------------
    1187       33649 :         for (auto& bind : this->adaptors_binds) // remove every bind stored in 'adaptors_binds'
    1188             :         {
    1189       33339 :             auto sck_out = std::get<0>(bind);
    1190       33339 :             auto sck_in = std::get<1>(bind);
    1191       33339 :             if (sck_in != nullptr) // if socket to socket unbinding
    1192       31263 :                 sck_in->unbind(*sck_out);
    1193             :             else // if socket to task unbinding
    1194             :             {
    1195        2076 :                 auto tsk_in = std::get<3>(bind);
    1196        2076 :                 assert(tsk_in != nullptr);
    1197        2076 :                 tsk_in->unbind(*sck_out);
    1198             :             }
    1199             :         }
    1200             : 
    1201         310 :         if (bind_orphans) // if requested, apply the binds stored in 'sck_orphan_binds' again
    1202             :         {
    1203         530 :             for (auto& bind : this->sck_orphan_binds)
    1204             :             {
    1205         426 :                 auto sck_out = std::get<0>(bind.first);
    1206         426 :                 auto priority = std::get<4>(bind.first);
    1207         426 :                 auto sck_in = std::get<0>(bind.second);
    1208         426 :                 if (sck_in != nullptr) // if socket to socket binding
    1209         397 :                     sck_in->_bind(*sck_out, priority);
    1210             :                 else // if socket to task binding
    1211             :                 {
    1212          29 :                     auto tsk_in = std::get<4>(bind.second);
    1213          29 :                     assert(tsk_in != nullptr);
    1214          29 :                     tsk_in->_bind(*sck_out, priority);
    1215             :                 }
    1216             :             }
    1217             :         }
    1218             : 
    1219         310 :         this->bound_adaptors = false;
    1220             :     }
    1221         413 : }
    1222             : 
    // Executes the whole pipeline with stop conditions that receive the task statuses.
    // Param stop_conditions: either one condition per stage or a single condition. In both cases the LAST element is
    //   the one given to the last stage; with a single condition the other stages are run through the no-argument
    //   Sequence exec().
    // Throws tools::invalid_argument if 'stop_conditions.size()' is neither 'stages.size()' nor 1.
    // Throws tools::runtime_error if the adaptors are not bound ('bound_adaptors' is false).
    // Stages 0 .. size-2 are handed to 'thread_pool'; the last stage is executed directly in this function.
    // NOTE(review): the 'stages.size() - 1' expressions assume at least one stage - confirm it is guaranteed elsewhere.
    1223             : void
    1224           0 : Pipeline::exec(const std::vector<std::function<bool(const std::vector<const int*>&)>>& stop_conditions)
    1225             : {
    1226           0 :     if (stop_conditions.size() != this->stages.size() && stop_conditions.size() != 1)
    1227             :     {
    1228           0 :         std::stringstream message;
    1229           0 :         message << "'stop_conditions.size()' has to be equal to 'stages.size()' or to 1 ('stop_conditions.size()' = "
    1230           0 :                 << stop_conditions.size() << ", 'stages.size()' = " << stages.size() << ").";
    1231           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
    1232           0 :     }
    1233             : 
    1234           0 :     if (!this->bound_adaptors)
    1235             :     {
    1236           0 :         std::stringstream message;
    1237           0 :         message << "'bound_adaptors' has to be true to execute the pipeline.";
    1238           0 :         throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
    1239           0 :     }
    1240             : 
    1241             :     // ----------------------------------------------------------------------------------------------------------------
    1242           0 :     auto& stages = this->stages; // local alias so that the lambda below can capture it by reference
    1243           0 :     std::vector<const std::function<bool(const std::vector<const int*>&)>*> stop_condition_vec(stages.size() - 1, // one slot per stage except the last
    1244           0 :                                                                                                nullptr);
    1245           0 :     if (stop_conditions.size() == stages.size()) // per-stage conditions: otherwise the slots stay null
    1246           0 :         for (size_t s = 0; s < stages.size() - 1; s++)
    1247           0 :             stop_condition_vec[s] = &stop_conditions[s];
    1248             : 
    1249           0 :     std::function<void(const size_t)> func_exec = [&stages, &stop_condition_vec](const size_t tid)
    1250             :     {
    1251           0 :         size_t s = tid; // the id received from the pool is used directly as the stage index
    1252           0 :         if (stop_condition_vec[s])
    1253           0 :             stages[s]->exec(*(stop_condition_vec[s]));
    1254             :         else
    1255           0 :             stages[s]->exec();
    1256             : 
    1257             :         // send the signal to stop the next stage: cancel the waiting of every Adaptor_m_to_n "pull" task of stage s + 1
    1258           0 :         const auto& tasks = stages[s + 1]->get_tasks_per_threads();
    1259           0 :         for (size_t th = 0; th < tasks.size(); th++)
    1260           0 :             for (size_t ta = 0; ta < tasks[th].size(); ta++)
    1261             :             {
    1262           0 :                 auto m = dynamic_cast<module::Adaptor_m_to_n*>(&tasks[th][ta]->get_module());
    1263           0 :                 if (m != nullptr)
    1264           0 :                     if (tasks[th][ta]->get_name().find("pull") != std::string::npos) m->cancel_waiting();
    1265             :             }
    1266           0 :     };
    1267             : 
    1268           0 :     this->thread_pool->run(func_exec, true); // presumably returns without waiting (wait() is called below) - TODO confirm the meaning of 'true'
    1269             : 
    1270           0 :     stages[stages.size() - 1]->exec(stop_conditions[stop_conditions.size() - 1]); // last stage, run here with the last condition
    1271             : 
    1272             :     // stop all the stages before
    1273           0 :     for (size_t notify_s = 0; notify_s < stages.size() - 1; notify_s++)
    1274           0 :         for (auto& m : stages[notify_s]->get_modules<tools::Interface_waiting>())
    1275           0 :             m->cancel_waiting();
    1276             : 
    1277           0 :     this->thread_pool->wait();
    1278           0 :     this->thread_pool->unset_func_exec();
    1279             :     // ----------------------------------------------------------------------------------------------------------------
    1280             : 
    1281             :     // this is NOT done in tools::Sequence::exec(), in order to correctly flush the pipeline before restoring the
    1282             :     // initial configuration of the buffers
    1283           0 :     for (auto& stage : this->stages)
    1284           0 :         if (stage->is_no_copy_mode())
    1285             :         {
    1286           0 :             stage->reset_no_copy_mode();
    1287           0 :             stage->gen_processes(false);
    1288             :         }
    1289             : 
    1290           0 :     for (auto& padps : this->adaptors) // finally reset every adaptor of both halves of each pair
    1291             :     {
    1292           0 :         for (auto& adp : padps.first)
    1293           0 :             adp->reset();
    1294           0 :         for (auto& adp : padps.second)
    1295           0 :             adp->reset();
    1296             :     }
    1297           0 : }
    1298             : 
    // Executes the whole pipeline with argument-less stop conditions.
    // Param stop_conditions: either one condition per stage or a single condition. In both cases the LAST element is
    //   the one given to the last stage; with a single condition the other stages are run through the no-argument
    //   Sequence exec().
    // Throws tools::invalid_argument if 'stop_conditions.size()' is neither 'stages.size()' nor 1.
    // Throws tools::runtime_error if the adaptors are not bound ('bound_adaptors' is false).
    // Stages 0 .. size-2 are handed to 'thread_pool'; the last stage is executed directly in this function.
    // NOTE(review): the 'stages.size() - 1' expressions assume at least one stage - confirm it is guaranteed elsewhere.
    1299             : void
    1300         104 : Pipeline::exec(const std::vector<std::function<bool()>>& stop_conditions)
    1301             : {
    1302         104 :     if (stop_conditions.size() != this->stages.size() && stop_conditions.size() != 1)
    1303             :     {
    1304           0 :         std::stringstream message;
    1305           0 :         message << "'stop_conditions.size()' has to be equal to 'stages.size()' or to 1 ('stop_conditions.size()' = "
    1306           0 :                 << stop_conditions.size() << ", 'stages.size()' = " << stages.size() << ").";
    1307           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
    1308           0 :     }
    1309             : 
    1310         104 :     if (!this->bound_adaptors)
    1311             :     {
    1312           0 :         std::stringstream message;
    1313           0 :         message << "'bound_adaptors' has to be true to execute the pipeline.";
    1314           0 :         throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
    1315           0 :     }
    1316             : 
    1317             :     // ----------------------------------------------------------------------------------------------------------------
    1318         104 :     auto& stages = this->stages; // local alias so that the lambda below can capture it by reference
    1319         104 :     std::vector<const std::function<bool()>*> stop_condition_vec(stages.size() - 1, nullptr); // one slot per stage except the last
    1320         104 :     if (stop_conditions.size() == stages.size()) // per-stage conditions: otherwise the slots stay null
    1321           4 :         for (size_t s = 0; s < stages.size() - 1; s++)
    1322           0 :             stop_condition_vec[s] = &stop_conditions[s];
    1323             : 
    1324         758 :     std::function<void(const size_t)> func_exec = [&stages, &stop_condition_vec](const size_t tid)
    1325             :     {
    1326         247 :         size_t s = tid; // the id received from the pool is used directly as the stage index
    1327         247 :         if (stop_condition_vec[s])
    1328           0 :             stages[s]->exec(*(stop_condition_vec[s]));
    1329             :         else
    1330         248 :             stages[s]->exec();
    1331             : 
    1332             :         // send the signal to stop the next stage: cancel the waiting of every Adaptor_m_to_n "pull" task of stage s + 1
    1333         263 :         const auto& tasks = stages[s + 1]->get_tasks_per_threads();
    1334        3354 :         for (size_t th = 0; th < tasks.size(); th++)
    1335       23676 :             for (size_t ta = 0; ta < tasks[th].size(); ta++)
    1336             :             {
    1337       20545 :                 auto m = dynamic_cast<module::Adaptor_m_to_n*>(&tasks[th][ta]->get_module());
    1338       20536 :                 if (m != nullptr)
    1339        6033 :                     if (tasks[th][ta]->get_name().find("pull") != std::string::npos) m->cancel_waiting();
    1340             :             }
    1341         371 :     };
    1342             : 
    1343         104 :     this->thread_pool->run(func_exec, true); // presumably returns without waiting (wait() is called below) - TODO confirm the meaning of 'true'
    1344         104 :     stages[stages.size() - 1]->exec(stop_conditions[stop_conditions.size() - 1]); // last stage, run here with the last condition
    1345             : 
    1346             :     // stop all the stages before
    1347         373 :     for (size_t notify_s = 0; notify_s < stages.size() - 1; notify_s++)
    1348        9007 :         for (auto& m : stages[notify_s]->get_modules<tools::Interface_waiting>())
    1349        9007 :             m->cancel_waiting();
    1350             : 
    1351         104 :     this->thread_pool->wait();
    1352         104 :     this->thread_pool->unset_func_exec();
    1353             :     // ----------------------------------------------------------------------------------------------------------------
    1354             : 
    1355             :     // this is NOT done in tools::Sequence::exec(), in order to correctly flush the pipeline before restoring the
    1356             :     // initial configuration of the buffers
    1357         477 :     for (auto& stage : this->stages)
    1358         373 :         if (stage->is_no_copy_mode())
    1359             :         {
    1360         373 :             stage->reset_no_copy_mode();
    1361         373 :             stage->gen_processes(false);
    1362             :         }
    1363             : 
    1364         373 :     for (auto& padps : this->adaptors) // finally reset every adaptor of both halves of each pair
    1365             :     {
    1366        3361 :         for (auto& adp : padps.first)
    1367        3092 :             adp->reset();
    1368        3092 :         for (auto& adp : padps.second)
    1369        2823 :             adp->reset();
    1370             :     }
    1371         104 : }
    1372             : 
    1373             : void
    1374           0 : Pipeline::exec(std::function<bool(const std::vector<const int*>&)> stop_condition)
    1375             : {
    1376           0 :     this->exec(std::vector<std::function<bool(const std::vector<const int*>&)>>(1, stop_condition));
    1377           0 : }
    1378             : 
    1379             : void
    1380         104 : Pipeline::exec(std::function<bool()> stop_condition)
    1381             : {
    1382         104 :     this->exec(std::vector<std::function<bool()>>(1, stop_condition));
    1383         104 : }
    1384             : 
    1385             : void
    1386          22 : Pipeline::exec()
    1387             : {
    1388        9216 :     this->exec([]() { return false; });
    1389          22 : }
    1390             : 
    1391             : std::vector<std::vector<module::Module*>>
    1392           0 : Pipeline::get_modules_per_threads() const
    1393             : {
    1394           0 :     std::vector<std::vector<module::Module*>> modules_per_threads;
    1395           0 :     for (auto& stage : this->stages)
    1396             :     {
    1397           0 :         auto modules_per_threads_add = stage->get_modules_per_threads();
    1398           0 :         if (modules_per_threads_add.size() > modules_per_threads.size())
    1399           0 :             modules_per_threads.resize(modules_per_threads_add.size());
    1400             : 
    1401           0 :         for (size_t t = 0; t < modules_per_threads_add.size(); t++)
    1402           0 :             modules_per_threads[t].insert(
    1403           0 :               modules_per_threads[t].end(), modules_per_threads_add[t].begin(), modules_per_threads_add[t].end());
    1404           0 :     }
    1405           0 :     return modules_per_threads;
    1406           0 : }
    1407             : 
    1408             : std::vector<std::vector<module::Module*>>
    1409           0 : Pipeline::get_modules_per_types() const
    1410             : {
    1411           0 :     std::vector<std::vector<module::Module*>> modules_per_types;
    1412           0 :     for (auto& stage : this->stages)
    1413             :     {
    1414           0 :         auto modules_per_types_add = stage->get_modules_per_types();
    1415           0 :         modules_per_types.insert(modules_per_types.end(), modules_per_types_add.begin(), modules_per_types_add.end());
    1416           0 :     }
    1417           0 :     return modules_per_types;
    1418           0 : }
    1419             : 
    1420             : std::vector<std::vector<module::Module*>>
    1421           0 : Pipeline::get_original_modules() const
    1422             : {
    1423           0 :     return this->original_sequence.get_modules_per_types();
    1424             : }
    1425             : 
    1426             : std::vector<std::vector<runtime::Task*>>
    1427         104 : Pipeline::get_tasks_per_threads() const
    1428             : {
    1429         104 :     std::vector<std::vector<runtime::Task*>> tasks_per_threads;
    1430         477 :     for (auto& stage : this->stages)
    1431             :     {
    1432         373 :         auto tasks_per_threads_add = stage->get_tasks_per_threads();
    1433         373 :         if (tasks_per_threads_add.size() > tasks_per_threads.size())
    1434         202 :             tasks_per_threads.resize(tasks_per_threads_add.size());
    1435             : 
    1436        3574 :         for (size_t t = 0; t < tasks_per_threads_add.size(); t++)
    1437       12804 :             tasks_per_threads[t].insert(
    1438       12804 :               tasks_per_threads[t].end(), tasks_per_threads_add[t].begin(), tasks_per_threads_add[t].end());
    1439         373 :     }
    1440         104 :     return tasks_per_threads;
    1441           0 : }
    1442             : 
    1443             : std::vector<std::vector<runtime::Task*>>
    1444           0 : Pipeline::get_tasks_per_types() const
    1445             : {
    1446           0 :     std::vector<std::vector<runtime::Task*>> tasks_per_types;
    1447           0 :     for (auto& stage : this->stages)
    1448             :     {
    1449           0 :         auto tasks_per_types_add = stage->get_tasks_per_types();
    1450           0 :         tasks_per_types.insert(tasks_per_types.end(), tasks_per_types_add.begin(), tasks_per_types_add.end());
    1451           0 :     }
    1452           0 :     return tasks_per_types;
    1453           0 : }
    1454             : 
    1455             : void
    1456           6 : Pipeline::export_dot(std::ostream& stream) const
    1457             : {
    1458             :     std::function<void(tools::Digraph_node<Sub_sequence>*,
    1459             :                        const size_t,
    1460             :                        const std::string&,
    1461             :                        std::ostream&,
    1462             :                        std::vector<tools::Digraph_node<Sub_sequence>*>&)>
    1463          25 :       export_dot_subsequences_recursive = [&export_dot_subsequences_recursive,
    1464             :                                            this](tools::Digraph_node<Sub_sequence>* cur_node,
    1465             :                                                  const size_t sta,
    1466             :                                                  const std::string& tab,
    1467             :                                                  std::ostream& stream,
    1468          25 :                                                  std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
    1469             :     {
    1470          50 :         if (cur_node != nullptr &&
    1471          50 :             std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
    1472             :         {
    1473          25 :             already_parsed_nodes.push_back(cur_node);
    1474          50 :             this->stages[sta]->export_dot_subsequence(cur_node->get_c()->tasks,
    1475          25 :                                                       cur_node->get_c()->tasks_id,
    1476          25 :                                                       cur_node->get_c()->type,
    1477          50 :                                                       "Sub-sequence " + std::to_string(cur_node->get_c()->id) +
    1478         100 :                                                         " (depth = " + std::to_string(cur_node->get_depth()) + ")",
    1479             :                                                       tab,
    1480             :                                                       stream);
    1481             : 
    1482          25 :             for (auto c : cur_node->get_children())
    1483           0 :                 export_dot_subsequences_recursive(c, sta, tab, stream, already_parsed_nodes);
    1484             :         }
    1485          31 :     };
    1486             : 
    1487             :     std::function<void(tools::Digraph_node<Sub_sequence>*,
    1488             :                        const size_t,
    1489             :                        const std::string&,
    1490             :                        std::ostream&,
    1491             :                        std::vector<tools::Digraph_node<Sub_sequence>*>&)>
    1492             :       export_dot_connections_recursive =
    1493          25 :         [&export_dot_connections_recursive, this](tools::Digraph_node<Sub_sequence>* cur_node,
    1494             :                                                   const size_t sta,
    1495             :                                                   const std::string& tab,
    1496             :                                                   std::ostream& stream,
    1497          25 :                                                   std::vector<tools::Digraph_node<Sub_sequence>*>& already_parsed_nodes)
    1498             :     {
    1499          50 :         if (cur_node != nullptr &&
    1500          50 :             std::find(already_parsed_nodes.begin(), already_parsed_nodes.end(), cur_node) == already_parsed_nodes.end())
    1501             :         {
    1502          25 :             already_parsed_nodes.push_back(cur_node);
    1503          25 :             this->stages[sta]->export_dot_connections(cur_node->get_c()->tasks, tab, stream);
    1504             : 
    1505          25 :             for (auto c : cur_node->get_children())
    1506           0 :                 export_dot_connections_recursive(c, sta, tab, stream, already_parsed_nodes);
    1507             :         }
    1508          31 :     };
    1509             : 
    1510           6 :     std::string tab = "\t";
    1511           6 :     stream << "digraph Pipeline {" << std::endl;
    1512           6 :     stream << tab << "compound=true;" << std::endl;
    1513             : 
    1514          31 :     for (size_t sta = 0; sta < this->stages.size(); sta++)
    1515             :     {
    1516          25 :         std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
    1517          25 :         const auto n_threads = this->stages[sta]->get_n_threads();
    1518          25 :         stream << tab << "subgraph \"cluster_Stage " << sta << "\" {" << std::endl;
    1519          25 :         stream << tab << tab << "node [style=filled];" << std::endl;
    1520          25 :         export_dot_subsequences_recursive(this->stages[sta]->sequences[0], sta, tab, stream, already_parsed_nodes);
    1521          25 :         stream << tab << tab << "label=\"Pipeline stage " << sta << " (" << n_threads << " thread(s))\";" << std::endl;
    1522          25 :         std::string color = "blue";
    1523          25 :         stream << tab << tab << "color=" << color << ";" << std::endl;
    1524          25 :         stream << tab << "}" << std::endl;
    1525          25 :     }
    1526             : 
    1527          31 :     for (size_t sta = 0; sta < this->stages.size(); sta++)
    1528             :     {
    1529          25 :         std::vector<tools::Digraph_node<Sub_sequence>*> already_parsed_nodes;
    1530          25 :         export_dot_connections_recursive(this->stages[sta]->sequences[0], sta, tab, stream, already_parsed_nodes);
    1531          25 :         if (this->bound_adaptors)
    1532             :         {
    1533          25 :             if (sta > 0)
    1534             :             {
    1535          19 :                 auto tsk1 = this->stages[sta - 1]->get_lasts_tasks()[0].back();
    1536          19 :                 auto tsk2 = this->stages[sta + 0]->get_firsts_tasks()[0][0];
    1537             : 
    1538          19 :                 auto sck1 = tsk1->sockets[0];
    1539          19 :                 auto sck2 = tsk2->sockets[0];
    1540             : 
    1541          19 :                 stream << tab << "\"" << +sck1.get() << "\" -> \"" << +sck2.get() << "\" [ltail=\"cluster_"
    1542          19 :                        << +&tsk1->get_module() << "_" << +tsk1 << "\" lhead=\"cluster_" << +&tsk2->get_module() << "_"
    1543          19 :                        << +tsk2 << "\" color=\"green\" style=\"dashed\"];" << std::endl;
    1544          19 :             }
    1545             :         }
    1546          25 :     }
    1547             : 
    1548           6 :     stream << "}" << std::endl;
    1549           6 : }
    1550             : 
    1551             : bool
    1552         206 : Pipeline::is_bound_adaptors() const
    1553             : {
    1554         206 :     return this->bound_adaptors;
    1555             : }
    1556             : 
    1557             : void
    1558           0 : Pipeline::set_auto_stop(const bool auto_stop)
    1559             : {
    1560           0 :     this->auto_stop = auto_stop;
    1561           0 :     for (auto stage : this->stages)
    1562           0 :         stage->set_auto_stop(auto_stop);
    1563           0 : }
    1564             : 
    1565             : bool
    1566           0 : Pipeline::is_auto_stop() const
    1567             : {
    1568           0 :     return this->auto_stop;
    1569             : }
    1570             : 
    1571             : size_t
    1572           0 : Pipeline::get_n_frames() const
    1573             : {
    1574           0 :     const auto n_frames = this->stages[0]->get_n_frames();
    1575             : 
    1576           0 :     for (auto& sta : this->stages)
    1577           0 :         if (sta->get_n_frames() != n_frames)
    1578             :         {
    1579           0 :             std::stringstream message;
    1580           0 :             message << "All the stages/sequences do not have the same 'n_frames' value ('sta->get_n_frames()' = "
    1581           0 :                     << sta->get_n_frames() << ", 'n_frames' = " << n_frames << ").";
    1582           0 :             throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
    1583           0 :         }
    1584             : 
    1585           0 :     return n_frames;
    1586             : }
    1587             : 
    1588             : void
    1589         206 : Pipeline::set_n_frames(const size_t n_frames)
    1590             : {
    1591         206 :     const auto save_bound_adaptors = this->is_bound_adaptors();
    1592         206 :     if (!save_bound_adaptors) this->bind_adaptors();
    1593         206 :     this->_unbind_adaptors(false);
    1594             : 
    1595             :     // set the new "n_frames" val in the sequences
    1596         206 :     std::vector<std::vector<std::pair<runtime::Socket*, runtime::Socket*>>> unbind_sockets(this->stages.size());
    1597         206 :     std::vector<std::vector<std::pair<runtime::Task*, runtime::Socket*>>> unbind_tasks(this->stages.size());
    1598         206 :     std::vector<bool> skip(this->stages.size());
    1599         944 :     for (size_t s = 0; s < this->stages.size(); s++)
    1600         738 :         skip[s] = this->stages[s]->get_n_frames() == n_frames;
    1601         944 :     for (size_t s = 0; s < this->stages.size(); s++)
    1602         738 :         if (!skip[s]) this->stages[s]->_set_n_frames_unbind(unbind_sockets[s], unbind_tasks[s]);
    1603         944 :     for (size_t s = 0; s < this->stages.size(); s++)
    1604         738 :         if (!skip[s]) this->stages[s]->_set_n_frames(n_frames);
    1605         944 :     for (size_t s = 0; s < this->stages.size(); s++)
    1606         738 :         if (!skip[s]) this->stages[s]->_set_n_frames_rebind(unbind_sockets[s], unbind_tasks[s]);
    1607             : 
    1608             :     // set the new "n_frames" val in the adaptors
    1609         738 :     for (auto& adps : this->adaptors)
    1610             :     {
    1611        6710 :         for (auto& adp : adps.first)
    1612        6178 :             adp->set_n_frames(n_frames);
    1613        6178 :         for (auto& adp : adps.second)
    1614        5646 :             adp->set_n_frames(n_frames);
    1615             :     }
    1616             : 
    1617             :     // bind orphans to complete the unbind of the adaptors
    1618        1052 :     for (auto& bind : this->sck_orphan_binds)
    1619             :     {
    1620         846 :         auto sck_out = std::get<0>(bind.first);
    1621         846 :         auto priority = std::get<4>(bind.first);
    1622         846 :         auto sck_in = std::get<0>(bind.second);
    1623         846 :         if (sck_in != nullptr)
    1624         788 :             sck_in->_bind(*sck_out, priority);
    1625             :         else
    1626             :         {
    1627          58 :             auto tsk_in = std::get<4>(bind.second);
    1628          58 :             assert(tsk_in != nullptr);
    1629          58 :             tsk_in->_bind(*sck_out, priority);
    1630             :         }
    1631             :     }
    1632             : 
    1633         206 :     if (save_bound_adaptors) this->bind_adaptors();
    1634         206 : }

Generated by: LCOV version 1.14