Line data Source code
1 : /*!
2 : * \file
3 : * \brief Class runtime::Sequence.
4 : */
5 : #ifndef SEQUENCE_HPP_
6 : #define SEQUENCE_HPP_
7 :
8 : #include <atomic>
9 : #include <functional>
10 : #include <iostream>
11 : #include <map>
12 : #include <memory>
13 : #include <mutex>
14 : #include <string>
15 : #include <vector>
16 :
17 : #include "Runtime/Socket/Socket.hpp"
18 : #include "Tools/Algo/Digraph/Digraph_node.hpp"
19 : #include "Tools/Interface/Interface_clone.hpp"
20 : #include "Tools/Interface/Interface_get_set_n_frames.hpp"
21 : #include "Tools/Interface/Interface_is_done.hpp"
22 : #include "Tools/Thread/Thread_pool/Thread_pool.hpp"
23 :
24 : namespace spu
25 : {
26 : namespace module
27 : {
28 : class Task;
29 : class Module;
30 : } // namespace module
31 : namespace sched
32 : {
33 : class Scheduler;
34 : }
35 : namespace runtime
36 : {
37 : class Pipeline;
38 :
/*!
 * \brief Kind tag of a sub-sequence (see the `type` attribute of `Sub_sequence_generic`).
 *
 * The underlying type is `size_t`; the enumerators keep their natural values 0, 1 and 2.
 */
enum class subseq_t : size_t
{
    STD = 0,     // value a freshly constructed sub-sequence starts with
    COMMUTE = 1,
    SELECT = 2
};
45 :
46 : template<class VTA = std::vector<runtime::Task*>>
47 : class Sub_sequence_generic
48 : {
49 : public:
50 : subseq_t type;
51 : VTA tasks;
52 : std::vector<std::function<const int*()>> processes;
53 : std::vector<size_t> tasks_id;
54 : size_t id;
55 :
56 : // usefull in case of adaptor to make zero copy and restore original states at
57 : // the end of the chain execution
58 : std::vector<std::vector<std::vector<Socket*>>> rebind_sockets;
59 : std::vector<std::vector<std::vector<void*>>> rebind_dataptrs;
60 :
61 14402 : explicit Sub_sequence_generic()
62 14402 : : type(subseq_t::STD)
63 14402 : , id(0)
64 : {
65 14402 : }
66 28804 : virtual ~Sub_sequence_generic() = default;
67 : };
68 :
69 : using Sub_sequence = Sub_sequence_generic<std::vector<runtime::Task*>>;
70 : using Sub_sequence_const = Sub_sequence_generic<std::vector<const runtime::Task*>>;
71 :
72 : class Sequence
73 : : public tools::Interface_clone
74 : , public tools::Interface_get_set_n_frames
75 : , public tools::Interface_is_done
76 : {
77 : friend Pipeline;
78 : friend sched::Scheduler;
79 :
80 : protected:
81 : const size_t n_threads;
82 : std::shared_ptr<tools::Thread_pool> thread_pool;
83 :
84 : std::vector<tools::Digraph_node<Sub_sequence>*> sequences;
85 : std::vector<size_t> firsts_tasks_id;
86 : std::vector<size_t> lasts_tasks_id;
87 : std::vector<std::vector<runtime::Task*>> firsts_tasks;
88 : std::vector<std::vector<runtime::Task*>> lasts_tasks;
89 : std::vector<std::vector<std::shared_ptr<module::Module>>> modules;
90 : std::vector<std::vector<module::Module*>> all_modules;
91 : std::shared_ptr<std::mutex> mtx_exception;
92 : std::vector<std::string> prev_exception_messages;
93 : std::vector<std::string> prev_exception_messages_to_display;
94 : std::shared_ptr<std::atomic<bool>> force_exit_loop;
95 : size_t n_tasks;
96 : bool tasks_inplace;
97 : bool thread_pinning;
98 : std::vector<size_t> puids;
99 : bool no_copy_mode;
100 : const std::vector<const runtime::Task*> saved_exclusions;
101 : std::vector<tools::Interface_is_done*> donners;
102 : std::vector<std::vector<tools::Interface_reset*>> switchers_reset;
103 : bool auto_stop;
104 : bool is_part_of_pipeline;
105 :
106 : // internal state for the `exec_step` method
107 : std::vector<bool> next_round_is_over;
108 : std::vector<size_t> cur_task_id;
109 : std::vector<tools::Digraph_node<Sub_sequence>*> cur_ss;
110 :
111 : // extra attribute for pinning v2
112 : std::vector<std::string> pin_objects_per_thread;
113 :
114 : public:
115 : Sequence(const std::vector<const runtime::Task*>& firsts,
116 : const size_t n_threads = 1,
117 : const bool thread_pinning = false,
118 : const std::vector<size_t>& puids = {});
119 : Sequence(const std::vector<const runtime::Task*>& firsts,
120 : const std::vector<const runtime::Task*>& lasts,
121 : const size_t n_threads = 1,
122 : const bool thread_pinning = false,
123 : const std::vector<size_t>& puids = {});
124 : Sequence(const std::vector<const runtime::Task*>& firsts,
125 : const std::vector<const runtime::Task*>& lasts,
126 : const std::vector<const runtime::Task*>& exclusions,
127 : const size_t n_threads = 1,
128 : const bool thread_pinning = false,
129 : const std::vector<size_t>& puids = {});
130 : Sequence(const runtime::Task& first,
131 : const size_t n_threads = 1,
132 : const bool thread_pinning = false,
133 : const std::vector<size_t>& puids = {});
134 : Sequence(const runtime::Task& first,
135 : const runtime::Task& last,
136 : const size_t n_threads = 1,
137 : const bool thread_pinning = false,
138 : const std::vector<size_t>& puids = {});
139 : Sequence(const std::vector<runtime::Task*>& firsts,
140 : const size_t n_threads = 1,
141 : const bool thread_pinning = false,
142 : const std::vector<size_t>& puids = {},
143 : const bool tasks_inplace = true);
144 : Sequence(const std::vector<runtime::Task*>& firsts,
145 : const std::vector<runtime::Task*>& lasts,
146 : const size_t n_threads = 1,
147 : const bool thread_pinning = false,
148 : const std::vector<size_t>& puids = {},
149 : const bool tasks_inplace = true);
150 : Sequence(const std::vector<runtime::Task*>& firsts,
151 : const std::vector<runtime::Task*>& lasts,
152 : const std::vector<runtime::Task*>& exclusions,
153 : const size_t n_threads = 1,
154 : const bool thread_pinning = false,
155 : const std::vector<size_t>& puids = {},
156 : const bool tasks_inplace = true);
157 : Sequence(runtime::Task& first,
158 : const size_t n_threads = 1,
159 : const bool thread_pinning = false,
160 : const std::vector<size_t>& puids = {},
161 : const bool tasks_inplace = true);
162 : Sequence(runtime::Task& first,
163 : runtime::Task& last,
164 : const size_t n_threads = 1,
165 : const bool thread_pinning = false,
166 : const std::vector<size_t>& puids = {},
167 : const bool tasks_inplace = true);
168 :
169 : // Constructors for pinning v2
170 : Sequence(const std::vector<const runtime::Task*>& firsts,
171 : const size_t n_threads,
172 : const bool thread_pinning,
173 : const std::string& sequence_pinning_policy);
174 : Sequence(const std::vector<const runtime::Task*>& firsts,
175 : const std::vector<const runtime::Task*>& lasts,
176 : const size_t n_threads,
177 : const bool thread_pinning,
178 : const std::string& sequence_pinning_policy);
179 : Sequence(const std::vector<const runtime::Task*>& firsts,
180 : const std::vector<const runtime::Task*>& lasts,
181 : const std::vector<const runtime::Task*>& exclusions,
182 : const size_t n_threads,
183 : const bool thread_pinning,
184 : const std::string& sequence_pinning_policy);
185 : Sequence(const runtime::Task& first,
186 : const size_t n_threads,
187 : const bool thread_pinning,
188 : const std::string& sequence_pinning_policy);
189 : Sequence(const runtime::Task& first,
190 : const runtime::Task& last,
191 : const size_t n_threads,
192 : const bool thread_pinning,
193 : const std::string& sequence_pinning_policy);
194 : Sequence(const std::vector<runtime::Task*>& firsts,
195 : const size_t n_threads,
196 : const bool thread_pinning,
197 : const std::string& sequence_pinning_policy,
198 : const bool tasks_inplace = true);
199 : Sequence(const std::vector<runtime::Task*>& firsts,
200 : const std::vector<runtime::Task*>& lasts,
201 : const size_t n_threads,
202 : const bool thread_pinning,
203 : const std::string& sequence_pinning_policy,
204 : const bool tasks_inplace = true);
205 : Sequence(const std::vector<runtime::Task*>& firsts,
206 : const std::vector<runtime::Task*>& lasts,
207 : const std::vector<runtime::Task*>& exclusions,
208 : const size_t n_threads,
209 : const bool thread_pinning,
210 : const std::string& sequence_pinning_policy,
211 : const bool tasks_inplace = true);
212 : Sequence(runtime::Task& first,
213 : const size_t n_threads,
214 : const bool thread_pinning,
215 : const std::string& sequence_pinning_policy,
216 : const bool tasks_inplace = true);
217 : Sequence(runtime::Task& first,
218 : runtime::Task& last,
219 : const size_t n_threads,
220 : const bool thread_pinning,
221 : const std::string& sequence_pinning_policy,
222 : const bool tasks_inplace = true);
223 :
224 : virtual ~Sequence();
225 : virtual Sequence* clone() const;
226 :
227 : void set_thread_pinning(const bool thread_pinning);
228 : void set_thread_pinning(const bool thread_pinning, const std::vector<size_t>& puids = {});
229 : // Set pinning for the second version
230 : void set_thread_pinning(const bool thread_pinning, const std::string& sequence_pinning_policy);
231 : bool is_thread_pinning();
232 :
233 : void exec(std::function<bool(const std::vector<const int*>&)> stop_condition);
234 : void exec(std::function<bool()> stop_condition);
235 : void exec();
236 : void exec_seq(const size_t tid = 0, const int frame_id = -1);
237 : runtime::Task* exec_step(const size_t tid = 0, const int frame_id = -1);
238 :
239 : inline size_t get_n_threads() const;
240 :
241 : template<class C = module::Module>
242 : std::vector<C*> get_modules(const bool set_modules = true) const;
243 : template<class C = module::Module>
244 : std::vector<C*> get_cloned_modules(const C& module_ref) const;
245 :
246 : std::vector<std::vector<module::Module*>> get_modules_per_threads() const;
247 : std::vector<std::vector<module::Module*>> get_modules_per_types() const;
248 :
249 : std::vector<std::vector<runtime::Task*>> get_tasks_per_threads() const;
250 : std::vector<std::vector<runtime::Task*>> get_tasks_per_types() const;
251 :
252 : inline const std::vector<std::vector<runtime::Task*>>& get_firsts_tasks() const;
253 : inline const std::vector<std::vector<runtime::Task*>>& get_lasts_tasks() const;
254 :
255 : void export_dot(std::ostream& stream = std::cout) const;
256 :
257 : void set_no_copy_mode(const bool no_copy_mode);
258 : bool is_no_copy_mode() const;
259 :
260 : void set_auto_stop(const bool auto_stop);
261 : bool is_auto_stop() const;
262 :
263 : inline size_t get_n_frames() const;
264 : void set_n_frames(const size_t n_frames);
265 :
266 : virtual bool is_done() const;
267 :
268 : bool is_control_flow() const;
269 :
270 : protected:
271 : template<class SS>
272 : void delete_tree(tools::Digraph_node<SS>* node, std::vector<tools::Digraph_node<SS>*>& already_deleted_nodes);
273 :
274 : template<class SS, class TA>
275 : tools::Digraph_node<SS>* init_recursive(tools::Digraph_node<SS>* cur_subseq,
276 : size_t& ssid,
277 : size_t& taid,
278 : std::vector<std::pair<TA*, tools::Digraph_node<SS>*>>& selectors,
279 : std::vector<TA*>& switchers,
280 : TA& first,
281 : TA& current_task,
282 : const std::vector<TA*>& lasts,
283 : const std::vector<TA*>& exclusions,
284 : std::vector<size_t>& real_lasts_id,
285 : std::vector<TA*>& real_lasts,
286 : std::map<TA*, unsigned>& in_sockets_feed,
287 : std::map<TA*, std::pair<tools::Digraph_node<SS>*, size_t>>& task_subseq);
288 :
289 : template<class VTA>
290 : void export_dot_subsequence(const VTA& subseq,
291 : const std::vector<size_t>& tasks_id,
292 : const subseq_t& subseq_type,
293 : const std::string& subseq_name,
294 : const std::string& tab,
295 : std::ostream& stream = std::cout) const;
296 :
297 : template<class VTA>
298 : void export_dot_connections(const VTA& subseq, const std::string& tab, std::ostream& stream = std::cout) const;
299 :
300 : template<class SS>
301 : void export_dot(tools::Digraph_node<SS>* root, std::ostream& stream = std::cout) const;
302 :
303 : template<class SS, class MO>
304 : void replicate(const tools::Digraph_node<SS>* sequence);
305 :
306 : void _exec(const size_t tid,
307 : std::function<bool(const std::vector<const int*>&)>& stop_condition,
308 : tools::Digraph_node<Sub_sequence>* sequence);
309 :
310 : void _exec_without_statuses(const size_t tid,
311 : std::function<bool()>& stop_condition,
312 : tools::Digraph_node<Sub_sequence>* sequence);
313 :
314 : void gen_processes(const bool no_copy_mode = false);
315 : void reset_no_copy_mode();
316 :
317 : template<class SS>
318 : void check_ctrl_flow(tools::Digraph_node<SS>* root);
319 : Sub_sequence* get_last_subsequence(const size_t tid);
320 : void update_tasks_id(const size_t tid);
321 :
322 : std::vector<runtime::Task*> get_tasks_from_id(const size_t taid);
323 :
324 : void update_firsts_and_lasts_tasks();
325 :
326 : void _set_n_frames_unbind(std::vector<std::pair<runtime::Socket*, runtime::Socket*>>& unbind_sockets,
327 : std::vector<std::pair<runtime::Task*, runtime::Socket*>>& unbind_tasks);
328 : void _set_n_frames(const size_t n_frames);
329 : void _set_n_frames_rebind(const std::vector<std::pair<runtime::Socket*, runtime::Socket*>>& unbind_sockets,
330 : const std::vector<std::pair<runtime::Task*, runtime::Socket*>>& unbind_tasks);
331 :
332 : private:
333 : template<class SS, class TA>
334 : void init(const std::vector<TA*>& firsts, const std::vector<TA*>& lasts, const std::vector<TA*>& exclusions);
335 : template<class SS>
336 : inline void _init(tools::Digraph_node<SS>* root);
337 : };
338 : } // namespace runtime
339 : } // namespace spu
340 :
341 : #ifndef DOXYGEN_SHOULD_SKIP_THIS
342 : #include "Runtime/Sequence/Sequence.hxx"
343 : #endif
344 :
345 : #endif /* SEQUENCE_HPP_ */
|