LCOV - code coverage report
Current view: top level - include/Runtime/Sequence - Sequence.hpp (source / functions) Hit Total Coverage
Test: streampu_clean.info Lines: 5 5 100.0 %
Date: 2025-07-17 17:04:07 Functions: 6 6 100.0 %

          Line data    Source code
       1             : /*!
       2             :  * \file
       3             :  * \brief Class runtime::Sequence.
       4             :  */
       5             : #ifndef SEQUENCE_HPP_
       6             : #define SEQUENCE_HPP_
       7             : 
       8             : #include <atomic>
       9             : #include <functional>
      10             : #include <iostream>
      11             : #include <map>
      12             : #include <memory>
      13             : #include <mutex>
      14             : #include <string>
      15             : #include <vector>
      16             : 
      17             : #include "Runtime/Socket/Socket.hpp"
      18             : #include "Tools/Algo/Digraph/Digraph_node.hpp"
      19             : #include "Tools/Interface/Interface_clone.hpp"
      20             : #include "Tools/Interface/Interface_get_set_n_frames.hpp"
      21             : #include "Tools/Interface/Interface_is_done.hpp"
      22             : #include "Tools/Thread/Thread_pool/Thread_pool.hpp"
      23             : 
      24             : namespace spu
      25             : {
      26             : namespace module
      27             : {
      28             : class Task;
      29             : class Module;
      30             : } // namespace module
      31             : namespace sched
      32             : {
      33             : class Scheduler;
      34             : }
      35             : namespace runtime
      36             : {
      37             : class Pipeline;
      38             : 
      39             : enum class subseq_t : size_t
      40             : {
      41             :     STD,
      42             :     COMMUTE,
      43             :     SELECT
      44             : };
      45             : 
      46             : template<class VTA = std::vector<runtime::Task*>>
      47             : class Sub_sequence_generic
      48             : {
      49             :   public:
      50             :     subseq_t type;
      51             :     VTA tasks;
      52             :     std::vector<std::function<const int*()>> processes;
      53             :     std::vector<size_t> tasks_id;
      54             :     size_t id;
      55             : 
      56             :     // usefull in case of adaptor to make zero copy and restore original states at
      57             :     // the end of the chain execution
      58             :     std::vector<std::vector<std::vector<Socket*>>> rebind_sockets;
      59             :     std::vector<std::vector<std::vector<void*>>> rebind_dataptrs;
      60             : 
      61       16537 :     explicit Sub_sequence_generic()
      62       16537 :       : type(subseq_t::STD)
      63       16537 :       , id(0)
      64             :     {
      65       16537 :     }
      66       33074 :     virtual ~Sub_sequence_generic() = default;
      67             : };
      68             : 
      69             : using Sub_sequence = Sub_sequence_generic<std::vector<runtime::Task*>>;
      70             : using Sub_sequence_const = Sub_sequence_generic<std::vector<const runtime::Task*>>;
      71             : 
      72             : class Sequence
      73             :   : public tools::Interface_clone
      74             :   , public tools::Interface_get_set_n_frames
      75             :   , public tools::Interface_is_done
      76             : {
      77             :     friend Pipeline;
      78             :     friend sched::Scheduler;
      79             : 
      80             :   protected:
      81             :     const size_t n_threads;
      82             :     std::shared_ptr<tools::Thread_pool> thread_pool;
      83             : 
      84             :     std::vector<tools::Digraph_node<Sub_sequence>*> sequences;
      85             :     std::vector<size_t> firsts_tasks_id;
      86             :     std::vector<size_t> lasts_tasks_id;
      87             :     std::vector<std::vector<runtime::Task*>> firsts_tasks;
      88             :     std::vector<std::vector<runtime::Task*>> lasts_tasks;
      89             :     std::vector<std::vector<std::shared_ptr<module::Module>>> modules;
      90             :     std::vector<std::vector<module::Module*>> all_modules;
      91             :     std::shared_ptr<std::mutex> mtx_exception;
      92             :     std::vector<std::string> prev_exception_messages;
      93             :     std::vector<std::string> prev_exception_messages_to_display;
      94             :     std::shared_ptr<std::atomic<bool>> force_exit_loop;
      95             :     size_t n_tasks;
      96             :     bool tasks_inplace;
      97             :     bool thread_pinning;
      98             :     std::vector<size_t> puids;
      99             :     bool no_copy_mode;
     100             :     const std::vector<const runtime::Task*> saved_exclusions;
     101             :     std::vector<tools::Interface_is_done*> donners;
     102             :     std::vector<std::vector<tools::Interface_reset*>> switchers_reset;
     103             :     bool auto_stop;
     104             :     bool is_part_of_pipeline;
     105             : 
     106             :     // internal state for the `exec_step` method
     107             :     std::vector<bool> next_round_is_over;
     108             :     std::vector<size_t> cur_task_id;
     109             :     std::vector<tools::Digraph_node<Sub_sequence>*> cur_ss;
     110             : 
     111             :     // extra attribute for pinning v2
     112             :     std::vector<std::string> pin_objects_per_thread;
     113             : 
     114             :     // is memory allocation enabled?
     115             :     bool memory_allocation;
     116             : 
     117             :   public:
     118             :     Sequence(const std::vector<const runtime::Task*>& firsts,
     119             :              const size_t n_threads = 1,
     120             :              const bool thread_pinning = false,
     121             :              const std::vector<size_t>& puids = {},
     122             :              const bool memory_allocation = true);
     123             :     Sequence(const std::vector<const runtime::Task*>& firsts,
     124             :              const std::vector<const runtime::Task*>& lasts,
     125             :              const size_t n_threads = 1,
     126             :              const bool thread_pinning = false,
     127             :              const std::vector<size_t>& puids = {},
     128             :              const bool memory_allocation = true);
     129             :     Sequence(const std::vector<const runtime::Task*>& firsts,
     130             :              const std::vector<const runtime::Task*>& lasts,
     131             :              const std::vector<const runtime::Task*>& exclusions,
     132             :              const size_t n_threads = 1,
     133             :              const bool thread_pinning = false,
     134             :              const std::vector<size_t>& puids = {},
     135             :              const bool memory_allocation = true);
     136             :     Sequence(const runtime::Task& first,
     137             :              const size_t n_threads = 1,
     138             :              const bool thread_pinning = false,
     139             :              const std::vector<size_t>& puids = {},
     140             :              const bool memory_allocation = true);
     141             :     Sequence(const runtime::Task& first,
     142             :              const runtime::Task& last,
     143             :              const size_t n_threads = 1,
     144             :              const bool thread_pinning = false,
     145             :              const std::vector<size_t>& puids = {},
     146             :              const bool memory_allocation = true);
     147             :     Sequence(const std::vector<runtime::Task*>& firsts,
     148             :              const size_t n_threads = 1,
     149             :              const bool thread_pinning = false,
     150             :              const std::vector<size_t>& puids = {},
     151             :              const bool tasks_inplace = true,
     152             :              const bool memory_allocation = true);
     153             :     Sequence(const std::vector<runtime::Task*>& firsts,
     154             :              const std::vector<runtime::Task*>& lasts,
     155             :              const size_t n_threads = 1,
     156             :              const bool thread_pinning = false,
     157             :              const std::vector<size_t>& puids = {},
     158             :              const bool tasks_inplace = true,
     159             :              const bool memory_allocation = true);
     160             :     Sequence(const std::vector<runtime::Task*>& firsts,
     161             :              const std::vector<runtime::Task*>& lasts,
     162             :              const std::vector<runtime::Task*>& exclusions,
     163             :              const size_t n_threads = 1,
     164             :              const bool thread_pinning = false,
     165             :              const std::vector<size_t>& puids = {},
     166             :              const bool tasks_inplace = true,
     167             :              const bool memory_allocation = true);
     168             :     Sequence(runtime::Task& first,
     169             :              const size_t n_threads = 1,
     170             :              const bool thread_pinning = false,
     171             :              const std::vector<size_t>& puids = {},
     172             :              const bool tasks_inplace = true,
     173             :              const bool memory_allocation = true);
     174             :     Sequence(runtime::Task& first,
     175             :              runtime::Task& last,
     176             :              const size_t n_threads = 1,
     177             :              const bool thread_pinning = false,
     178             :              const std::vector<size_t>& puids = {},
     179             :              const bool tasks_inplace = true,
     180             :              const bool memory_allocation = true);
     181             : 
     182             :     // Constructors for pinning v2
     183             :     Sequence(const std::vector<const runtime::Task*>& firsts,
     184             :              const size_t n_threads,
     185             :              const bool thread_pinning,
     186             :              const std::string& sequence_pinning_policy,
     187             :              const bool memory_allocation = true);
     188             :     Sequence(const std::vector<const runtime::Task*>& firsts,
     189             :              const std::vector<const runtime::Task*>& lasts,
     190             :              const size_t n_threads,
     191             :              const bool thread_pinning,
     192             :              const std::string& sequence_pinning_policy,
     193             :              const bool memory_allocation = true);
     194             :     Sequence(const std::vector<const runtime::Task*>& firsts,
     195             :              const std::vector<const runtime::Task*>& lasts,
     196             :              const std::vector<const runtime::Task*>& exclusions,
     197             :              const size_t n_threads,
     198             :              const bool thread_pinning,
     199             :              const std::string& sequence_pinning_policy,
     200             :              const bool memory_allocation = true);
     201             :     Sequence(const runtime::Task& first,
     202             :              const size_t n_threads,
     203             :              const bool thread_pinning,
     204             :              const std::string& sequence_pinning_policy,
     205             :              const bool memory_allocation = true);
     206             :     Sequence(const runtime::Task& first,
     207             :              const runtime::Task& last,
     208             :              const size_t n_threads,
     209             :              const bool thread_pinning,
     210             :              const std::string& sequence_pinning_policy,
     211             :              const bool memory_allocation = true);
     212             :     Sequence(const std::vector<runtime::Task*>& firsts,
     213             :              const size_t n_threads,
     214             :              const bool thread_pinning,
     215             :              const std::string& sequence_pinning_policy,
     216             :              const bool tasks_inplace = true,
     217             :              const bool memory_allocation = true);
     218             :     Sequence(const std::vector<runtime::Task*>& firsts,
     219             :              const std::vector<runtime::Task*>& lasts,
     220             :              const size_t n_threads,
     221             :              const bool thread_pinning,
     222             :              const std::string& sequence_pinning_policy,
     223             :              const bool tasks_inplace = true,
     224             :              const bool memory_allocation = true);
     225             :     Sequence(const std::vector<runtime::Task*>& firsts,
     226             :              const std::vector<runtime::Task*>& lasts,
     227             :              const std::vector<runtime::Task*>& exclusions,
     228             :              const size_t n_threads,
     229             :              const bool thread_pinning,
     230             :              const std::string& sequence_pinning_policy,
     231             :              const bool tasks_inplace = true,
     232             :              const bool memory_allocation = true);
     233             :     Sequence(runtime::Task& first,
     234             :              const size_t n_threads,
     235             :              const bool thread_pinning,
     236             :              const std::string& sequence_pinning_policy,
     237             :              const bool tasks_inplace = true,
     238             :              const bool memory_allocation = true);
     239             :     Sequence(runtime::Task& first,
     240             :              runtime::Task& last,
     241             :              const size_t n_threads,
     242             :              const bool thread_pinning,
     243             :              const std::string& sequence_pinning_policy,
     244             :              const bool tasks_inplace = true,
     245             :              const bool memory_allocation = true);
     246             : 
     247             :     virtual ~Sequence();
     248             :     virtual Sequence* clone() const;
     249             : 
     250             :     void set_thread_pinning(const bool thread_pinning);
     251             :     void set_thread_pinning(const bool thread_pinning, const std::vector<size_t>& puids = {});
     252             :     // Set pinning for the second version
     253             :     void set_thread_pinning(const bool thread_pinning, const std::string& sequence_pinning_policy);
     254             :     bool is_thread_pinning();
     255             : 
     256             :     // Allocate sequence memory
     257             :     void allocate_outbuffers();
     258             :     // Deallocate sequence memory
     259             :     void deallocate_outbuffers();
     260             : 
     261             :     void exec(std::function<bool(const std::vector<const int*>&)> stop_condition);
     262             :     void exec(std::function<bool()> stop_condition);
     263             :     void exec();
     264             :     void exec_seq(const size_t tid = 0, const int frame_id = -1);
     265             :     runtime::Task* exec_step(const size_t tid = 0, const int frame_id = -1);
     266             : 
     267             :     inline size_t get_n_threads() const;
     268             : 
     269             :     template<class C = module::Module>
     270             :     std::vector<C*> get_modules(const bool set_modules = true) const;
     271             :     template<class C = module::Module>
     272             :     std::vector<C*> get_cloned_modules(const C& module_ref) const;
     273             : 
     274             :     std::vector<std::vector<module::Module*>> get_modules_per_threads() const;
     275             :     std::vector<std::vector<module::Module*>> get_modules_per_types() const;
     276             : 
     277             :     std::vector<std::vector<runtime::Task*>> get_tasks_per_threads() const;
     278             :     std::vector<std::vector<runtime::Task*>> get_tasks_per_types() const;
     279             : 
     280             :     inline const std::vector<std::vector<runtime::Task*>>& get_firsts_tasks() const;
     281             :     inline const std::vector<std::vector<runtime::Task*>>& get_lasts_tasks() const;
     282             : 
     283             :     void export_dot(std::ostream& stream = std::cout) const;
     284             : 
     285             :     void set_no_copy_mode(const bool no_copy_mode);
     286             :     bool is_no_copy_mode() const;
     287             : 
     288             :     void set_auto_stop(const bool auto_stop);
     289             :     bool is_auto_stop() const;
     290             : 
     291             :     inline size_t get_n_frames() const;
     292             :     void set_n_frames(const size_t n_frames);
     293             : 
     294             :     virtual bool is_done() const;
     295             : 
     296             :     bool is_control_flow() const;
     297             : 
     298             :   protected:
     299             :     template<class SS>
     300             :     void delete_tree(tools::Digraph_node<SS>* node, std::vector<tools::Digraph_node<SS>*>& already_deleted_nodes);
     301             : 
     302             :     template<class SS, class TA>
     303             :     tools::Digraph_node<SS>* init_recursive(tools::Digraph_node<SS>* cur_subseq,
     304             :                                             size_t& ssid,
     305             :                                             size_t& taid,
     306             :                                             std::vector<std::pair<TA*, tools::Digraph_node<SS>*>>& selectors,
     307             :                                             std::vector<TA*>& switchers,
     308             :                                             TA& first,
     309             :                                             TA& current_task,
     310             :                                             const std::vector<TA*>& lasts,
     311             :                                             const std::vector<TA*>& exclusions,
     312             :                                             std::vector<size_t>& real_lasts_id,
     313             :                                             std::vector<TA*>& real_lasts,
     314             :                                             std::map<TA*, unsigned>& in_sockets_feed,
     315             :                                             std::map<TA*, std::pair<tools::Digraph_node<SS>*, size_t>>& task_subseq);
     316             : 
     317             :     template<class VTA>
     318             :     void export_dot_subsequence(const VTA& subseq,
     319             :                                 const std::vector<size_t>& tasks_id,
     320             :                                 const subseq_t& subseq_type,
     321             :                                 const std::string& subseq_name,
     322             :                                 const std::string& tab,
     323             :                                 std::ostream& stream = std::cout) const;
     324             : 
     325             :     template<class VTA>
     326             :     void export_dot_connections(const VTA& subseq, const std::string& tab, std::ostream& stream = std::cout) const;
     327             : 
     328             :     template<class SS>
     329             :     void export_dot(tools::Digraph_node<SS>* root, std::ostream& stream = std::cout) const;
     330             : 
     331             :     template<class SS, class MO>
     332             :     void replicate(const tools::Digraph_node<SS>* sequence);
     333             : 
     334             :     void _exec(const size_t tid,
     335             :                std::function<bool(const std::vector<const int*>&)>& stop_condition,
     336             :                tools::Digraph_node<Sub_sequence>* sequence);
     337             : 
     338             :     void _exec_without_statuses(const size_t tid,
     339             :                                 std::function<bool()>& stop_condition,
     340             :                                 tools::Digraph_node<Sub_sequence>* sequence);
     341             : 
     342             :     void gen_processes(const bool no_copy_mode = false);
     343             :     void reset_no_copy_mode();
     344             : 
     345             :     template<class SS>
     346             :     void check_ctrl_flow(tools::Digraph_node<SS>* root);
     347             :     Sub_sequence* get_last_subsequence(const size_t tid);
     348             :     void update_tasks_id(const size_t tid);
     349             : 
     350             :     std::vector<runtime::Task*> get_tasks_from_id(const size_t taid);
     351             : 
     352             :     void update_firsts_and_lasts_tasks();
     353             : 
     354             :     void _set_n_frames_unbind(std::vector<std::pair<runtime::Socket*, runtime::Socket*>>& unbind_sockets,
     355             :                               std::vector<std::pair<runtime::Task*, runtime::Socket*>>& unbind_tasks);
     356             :     void _set_n_frames(const size_t n_frames);
     357             :     void _set_n_frames_rebind(const std::vector<std::pair<runtime::Socket*, runtime::Socket*>>& unbind_sockets,
     358             :                               const std::vector<std::pair<runtime::Task*, runtime::Socket*>>& unbind_tasks);
     359             : 
     360             :   private:
     361             :     template<class SS, class TA>
     362             :     void init(const std::vector<TA*>& firsts, const std::vector<TA*>& lasts, const std::vector<TA*>& exclusions);
     363             :     template<class SS>
     364             :     inline void _init(tools::Digraph_node<SS>* root);
     365             : };
     366             : } // namespace runtime
     367             : } // namespace spu
     368             : 
     369             : #ifndef DOXYGEN_SHOULD_SKIP_THIS
     370             : #include "Runtime/Sequence/Sequence.hxx"
     371             : #endif
     372             : 
     373             : #endif /* SEQUENCE_HPP_ */

Generated by: LCOV version 1.14