Line data Source code
1 : /*!
2 : * \file
3 : * \brief Class runtime::Sequence.
4 : */
5 : #ifndef SEQUENCE_HPP_
6 : #define SEQUENCE_HPP_
7 :
8 : #include <atomic>
9 : #include <functional>
10 : #include <iostream>
11 : #include <map>
12 : #include <memory>
13 : #include <mutex>
14 : #include <string>
15 : #include <vector>
16 :
17 : #include "Runtime/Socket/Socket.hpp"
18 : #include "Tools/Algo/Digraph/Digraph_node.hpp"
19 : #include "Tools/Interface/Interface_clone.hpp"
20 : #include "Tools/Interface/Interface_get_set_n_frames.hpp"
21 : #include "Tools/Interface/Interface_is_done.hpp"
22 :
23 : namespace spu
24 : {
25 : namespace module
26 : {
27 : class Task;
28 : class Module;
29 : } // namespace module
30 : namespace sched
31 : {
32 : class Scheduler;
33 : }
34 : namespace runtime
35 : {
36 : class Pipeline;
37 :
38 : enum class subseq_t : size_t
39 : {
40 : STD,
41 : COMMUTE,
42 : SELECT
43 : };
44 :
45 : template<class VTA = std::vector<runtime::Task*>>
46 : class Sub_sequence_generic
47 : {
48 : public:
49 : subseq_t type;
50 : VTA tasks;
51 : std::vector<std::function<const int*()>> processes;
52 : std::vector<size_t> tasks_id;
53 : size_t id;
54 :
55 : // usefull in case of adaptor to make zero copy and restore original states at
56 : // the end of the chain execution
57 : std::vector<std::vector<std::vector<Socket*>>> rebind_sockets;
58 : std::vector<std::vector<std::vector<void*>>> rebind_dataptrs;
59 :
60 14403 : explicit Sub_sequence_generic()
61 14403 : : type(subseq_t::STD)
62 14403 : , id(0)
63 : {
64 14403 : }
65 28806 : virtual ~Sub_sequence_generic() = default;
66 : };
67 :
68 : using Sub_sequence = Sub_sequence_generic<std::vector<runtime::Task*>>;
69 : using Sub_sequence_const = Sub_sequence_generic<std::vector<const runtime::Task*>>;
70 :
71 : class Sequence
72 : : public tools::Interface_clone
73 : , public tools::Interface_get_set_n_frames
74 : , public tools::Interface_is_done
75 : {
76 : friend Pipeline;
77 : friend sched::Scheduler;
78 :
79 : protected:
80 : size_t n_threads;
81 : std::vector<tools::Digraph_node<Sub_sequence>*> sequences;
82 : std::vector<size_t> firsts_tasks_id;
83 : std::vector<size_t> lasts_tasks_id;
84 : std::vector<std::vector<runtime::Task*>> firsts_tasks;
85 : std::vector<std::vector<runtime::Task*>> lasts_tasks;
86 : std::vector<std::vector<std::shared_ptr<module::Module>>> modules;
87 : std::vector<std::vector<module::Module*>> all_modules;
88 : std::shared_ptr<std::mutex> mtx_exception;
89 : std::vector<std::string> prev_exception_messages;
90 : std::vector<std::string> prev_exception_messages_to_display;
91 : std::shared_ptr<std::atomic<bool>> force_exit_loop;
92 : size_t n_tasks;
93 : bool tasks_inplace;
94 : bool thread_pinning;
95 : std::vector<size_t> puids;
96 : bool no_copy_mode;
97 : const std::vector<const runtime::Task*> saved_exclusions;
98 : std::vector<tools::Interface_is_done*> donners;
99 : std::vector<std::vector<tools::Interface_reset*>> switchers_reset;
100 : bool auto_stop;
101 : bool is_part_of_pipeline;
102 :
103 : // internal state for the `exec_step` method
104 : std::vector<bool> next_round_is_over;
105 : std::vector<size_t> cur_task_id;
106 : std::vector<tools::Digraph_node<Sub_sequence>*> cur_ss;
107 :
108 : // extra attribute for pinning v2
109 : std::vector<std::string> pin_objects_per_thread;
110 :
111 : public:
112 : Sequence(const std::vector<const runtime::Task*>& firsts,
113 : const size_t n_threads = 1,
114 : const bool thread_pinning = false,
115 : const std::vector<size_t>& puids = {});
116 : Sequence(const std::vector<const runtime::Task*>& firsts,
117 : const std::vector<const runtime::Task*>& lasts,
118 : const size_t n_threads = 1,
119 : const bool thread_pinning = false,
120 : const std::vector<size_t>& puids = {});
121 : Sequence(const std::vector<const runtime::Task*>& firsts,
122 : const std::vector<const runtime::Task*>& lasts,
123 : const std::vector<const runtime::Task*>& exclusions,
124 : const size_t n_threads = 1,
125 : const bool thread_pinning = false,
126 : const std::vector<size_t>& puids = {});
127 : Sequence(const runtime::Task& first,
128 : const size_t n_threads = 1,
129 : const bool thread_pinning = false,
130 : const std::vector<size_t>& puids = {});
131 : Sequence(const runtime::Task& first,
132 : const runtime::Task& last,
133 : const size_t n_threads = 1,
134 : const bool thread_pinning = false,
135 : const std::vector<size_t>& puids = {});
136 : Sequence(const std::vector<runtime::Task*>& firsts,
137 : const size_t n_threads = 1,
138 : const bool thread_pinning = false,
139 : const std::vector<size_t>& puids = {},
140 : const bool tasks_inplace = true);
141 : Sequence(const std::vector<runtime::Task*>& firsts,
142 : const std::vector<runtime::Task*>& lasts,
143 : const size_t n_threads = 1,
144 : const bool thread_pinning = false,
145 : const std::vector<size_t>& puids = {},
146 : const bool tasks_inplace = true);
147 : Sequence(const std::vector<runtime::Task*>& firsts,
148 : const std::vector<runtime::Task*>& lasts,
149 : const std::vector<runtime::Task*>& exclusions,
150 : const size_t n_threads = 1,
151 : const bool thread_pinning = false,
152 : const std::vector<size_t>& puids = {},
153 : const bool tasks_inplace = true);
154 : Sequence(runtime::Task& first,
155 : const size_t n_threads = 1,
156 : const bool thread_pinning = false,
157 : const std::vector<size_t>& puids = {},
158 : const bool tasks_inplace = true);
159 : Sequence(runtime::Task& first,
160 : runtime::Task& last,
161 : const size_t n_threads = 1,
162 : const bool thread_pinning = false,
163 : const std::vector<size_t>& puids = {},
164 : const bool tasks_inplace = true);
165 :
166 : // Constructors for pinning v2
167 : Sequence(const std::vector<const runtime::Task*>& firsts,
168 : const size_t n_threads,
169 : const bool thread_pinning,
170 : const std::string& sequence_pinning_policy);
171 : Sequence(const std::vector<const runtime::Task*>& firsts,
172 : const std::vector<const runtime::Task*>& lasts,
173 : const size_t n_threads,
174 : const bool thread_pinning,
175 : const std::string& sequence_pinning_policy);
176 : Sequence(const std::vector<const runtime::Task*>& firsts,
177 : const std::vector<const runtime::Task*>& lasts,
178 : const std::vector<const runtime::Task*>& exclusions,
179 : const size_t n_threads,
180 : const bool thread_pinning,
181 : const std::string& sequence_pinning_policy);
182 : Sequence(const runtime::Task& first,
183 : const size_t n_threads,
184 : const bool thread_pinning,
185 : const std::string& sequence_pinning_policy);
186 : Sequence(const runtime::Task& first,
187 : const runtime::Task& last,
188 : const size_t n_threads,
189 : const bool thread_pinning,
190 : const std::string& sequence_pinning_policy);
191 : Sequence(const std::vector<runtime::Task*>& firsts,
192 : const size_t n_threads,
193 : const bool thread_pinning,
194 : const std::string& sequence_pinning_policy,
195 : const bool tasks_inplace = true);
196 : Sequence(const std::vector<runtime::Task*>& firsts,
197 : const std::vector<runtime::Task*>& lasts,
198 : const size_t n_threads,
199 : const bool thread_pinning,
200 : const std::string& sequence_pinning_policy,
201 : const bool tasks_inplace = true);
202 : Sequence(const std::vector<runtime::Task*>& firsts,
203 : const std::vector<runtime::Task*>& lasts,
204 : const std::vector<runtime::Task*>& exclusions,
205 : const size_t n_threads,
206 : const bool thread_pinning,
207 : const std::string& sequence_pinning_policy,
208 : const bool tasks_inplace = true);
209 : Sequence(runtime::Task& first,
210 : const size_t n_threads,
211 : const bool thread_pinning,
212 : const std::string& sequence_pinning_policy,
213 : const bool tasks_inplace = true);
214 : Sequence(runtime::Task& first,
215 : runtime::Task& last,
216 : const size_t n_threads,
217 : const bool thread_pinning,
218 : const std::string& sequence_pinning_policy,
219 : const bool tasks_inplace = true);
220 :
221 : virtual ~Sequence();
222 : virtual Sequence* clone() const;
223 :
224 : void set_thread_pinning(const bool thread_pinning);
225 : void set_thread_pinning(const bool thread_pinning, const std::vector<size_t>& puids = {});
226 : // Set pinning for the second version
227 : void set_thread_pinning(const bool thread_pinning, const std::string& sequence_pinning_policy);
228 : bool is_thread_pinning();
229 :
230 : void exec(std::function<bool(const std::vector<const int*>&)> stop_condition);
231 : void exec(std::function<bool()> stop_condition);
232 : void exec();
233 : void exec_seq(const size_t tid = 0, const int frame_id = -1);
234 : runtime::Task* exec_step(const size_t tid = 0, const int frame_id = -1);
235 :
236 : inline size_t get_n_threads() const;
237 :
238 : template<class C = module::Module>
239 : std::vector<C*> get_modules(const bool set_modules = true) const;
240 : template<class C = module::Module>
241 : std::vector<C*> get_cloned_modules(const C& module_ref) const;
242 :
243 : std::vector<std::vector<module::Module*>> get_modules_per_threads() const;
244 : std::vector<std::vector<module::Module*>> get_modules_per_types() const;
245 :
246 : std::vector<std::vector<runtime::Task*>> get_tasks_per_threads() const;
247 : std::vector<std::vector<runtime::Task*>> get_tasks_per_types() const;
248 :
249 : inline const std::vector<std::vector<runtime::Task*>>& get_firsts_tasks() const;
250 : inline const std::vector<std::vector<runtime::Task*>>& get_lasts_tasks() const;
251 :
252 : void export_dot(std::ostream& stream = std::cout) const;
253 :
254 : void set_no_copy_mode(const bool no_copy_mode);
255 : bool is_no_copy_mode() const;
256 :
257 : void set_auto_stop(const bool auto_stop);
258 : bool is_auto_stop() const;
259 :
260 : inline size_t get_n_frames() const;
261 : void set_n_frames(const size_t n_frames);
262 :
263 : virtual bool is_done() const;
264 :
265 : bool is_control_flow() const;
266 :
267 : protected:
268 : template<class SS>
269 : void delete_tree(tools::Digraph_node<SS>* node, std::vector<tools::Digraph_node<SS>*>& already_deleted_nodes);
270 :
271 : template<class SS, class TA>
272 : tools::Digraph_node<SS>* init_recursive(tools::Digraph_node<SS>* cur_subseq,
273 : size_t& ssid,
274 : size_t& taid,
275 : std::vector<std::pair<TA*, tools::Digraph_node<SS>*>>& selectors,
276 : std::vector<TA*>& switchers,
277 : TA& first,
278 : TA& current_task,
279 : const std::vector<TA*>& lasts,
280 : const std::vector<TA*>& exclusions,
281 : std::vector<size_t>& real_lasts_id,
282 : std::vector<TA*>& real_lasts,
283 : std::map<TA*, unsigned>& in_sockets_feed,
284 : std::map<TA*, std::pair<tools::Digraph_node<SS>*, size_t>>& task_subseq);
285 :
286 : template<class VTA>
287 : void export_dot_subsequence(const VTA& subseq,
288 : const std::vector<size_t>& tasks_id,
289 : const subseq_t& subseq_type,
290 : const std::string& subseq_name,
291 : const std::string& tab,
292 : std::ostream& stream = std::cout) const;
293 :
294 : template<class VTA>
295 : void export_dot_connections(const VTA& subseq, const std::string& tab, std::ostream& stream = std::cout) const;
296 :
297 : template<class SS>
298 : void export_dot(tools::Digraph_node<SS>* root, std::ostream& stream = std::cout) const;
299 :
300 : template<class SS, class MO>
301 : void replicate(const tools::Digraph_node<SS>* sequence);
302 :
303 : void _exec(const size_t tid,
304 : std::function<bool(const std::vector<const int*>&)>& stop_condition,
305 : tools::Digraph_node<Sub_sequence>* sequence);
306 :
307 : void _exec_without_statuses(const size_t tid,
308 : std::function<bool()>& stop_condition,
309 : tools::Digraph_node<Sub_sequence>* sequence);
310 :
311 : void gen_processes(const bool no_copy_mode = false);
312 : void reset_no_copy_mode();
313 :
314 : template<class SS>
315 : void check_ctrl_flow(tools::Digraph_node<SS>* root);
316 : Sub_sequence* get_last_subsequence(const size_t tid);
317 : void update_tasks_id(const size_t tid);
318 :
319 : std::vector<runtime::Task*> get_tasks_from_id(const size_t taid);
320 :
321 : void update_firsts_and_lasts_tasks();
322 :
323 : void _set_n_frames_unbind(std::vector<std::pair<runtime::Socket*, runtime::Socket*>>& unbind_sockets,
324 : std::vector<std::pair<runtime::Task*, runtime::Socket*>>& unbind_tasks);
325 : void _set_n_frames(const size_t n_frames);
326 : void _set_n_frames_rebind(const std::vector<std::pair<runtime::Socket*, runtime::Socket*>>& unbind_sockets,
327 : const std::vector<std::pair<runtime::Task*, runtime::Socket*>>& unbind_tasks);
328 :
329 : private:
330 : template<class SS, class TA>
331 : void init(const std::vector<TA*>& firsts, const std::vector<TA*>& lasts, const std::vector<TA*>& exclusions);
332 : template<class SS>
333 : inline void _init(tools::Digraph_node<SS>* root);
334 : };
335 : } // namespace runtime
336 : } // namespace spu
337 :
338 : #ifndef DOXYGEN_SHOULD_SKIP_THIS
339 : #include "Runtime/Sequence/Sequence.hxx"
340 : #endif
341 :
342 : #endif /* SEQUENCE_HPP_ */
|