Line data Source code
1 : /*!
2 : * \file
3 : * \brief Class runtime::Sequence.
4 : */
5 : #ifndef SEQUENCE_HPP_
6 : #define SEQUENCE_HPP_
7 :
8 : #include <atomic>
9 : #include <functional>
10 : #include <iostream>
11 : #include <map>
12 : #include <memory>
13 : #include <mutex>
14 : #include <string>
15 : #include <vector>
16 :
17 : #include "Runtime/Socket/Socket.hpp"
18 : #include "Tools/Algo/Digraph/Digraph_node.hpp"
19 : #include "Tools/Interface/Interface_clone.hpp"
20 : #include "Tools/Interface/Interface_get_set_n_frames.hpp"
21 : #include "Tools/Interface/Interface_is_done.hpp"
22 : #include "Tools/Thread/Thread_pool/Thread_pool.hpp"
23 :
24 : namespace spu
25 : {
26 : namespace module
27 : {
28 : class Task;
29 : class Module;
30 : } // namespace module
31 : namespace sched
32 : {
33 : class Scheduler;
34 : }
35 : namespace runtime
36 : {
37 : class Pipeline;
38 :
/*!
 * \brief Tag describing the kind of a sub-sequence.
 *
 * A value of this type is stored in `Sub_sequence_generic::type` and is passed
 * to `Sequence::export_dot_subsequence`. The underlying type is `size_t` and
 * the enumerators take the consecutive values 0, 1 and 2.
 *
 * NOTE(review): the exact meaning of `COMMUTE` and `SELECT` is defined by the
 * code that builds the sub-sequences, which is not visible from this header.
 */
enum class subseq_t : size_t
{
    STD = 0,
    COMMUTE = 1,
    SELECT = 2,
};
45 :
46 : template<class VTA = std::vector<runtime::Task*>>
47 : class Sub_sequence_generic
48 : {
49 : public:
50 : subseq_t type;
51 : VTA tasks;
52 : std::vector<std::function<const int*()>> processes;
53 : std::vector<size_t> tasks_id;
54 : size_t id;
55 :
56 : // usefull in case of adaptor to make zero copy and restore original states at
57 : // the end of the chain execution
58 : std::vector<std::vector<std::vector<Socket*>>> rebind_sockets;
59 : std::vector<std::vector<std::vector<void*>>> rebind_dataptrs;
60 :
61 16537 : explicit Sub_sequence_generic()
62 16537 : : type(subseq_t::STD)
63 16537 : , id(0)
64 : {
65 16537 : }
66 33074 : virtual ~Sub_sequence_generic() = default;
67 : };
68 :
69 : using Sub_sequence = Sub_sequence_generic<std::vector<runtime::Task*>>;
70 : using Sub_sequence_const = Sub_sequence_generic<std::vector<const runtime::Task*>>;
71 :
72 : class Sequence
73 : : public tools::Interface_clone
74 : , public tools::Interface_get_set_n_frames
75 : , public tools::Interface_is_done
76 : {
77 : friend Pipeline;
78 : friend sched::Scheduler;
79 :
80 : protected:
81 : const size_t n_threads;
82 : std::shared_ptr<tools::Thread_pool> thread_pool;
83 :
84 : std::vector<tools::Digraph_node<Sub_sequence>*> sequences;
85 : std::vector<size_t> firsts_tasks_id;
86 : std::vector<size_t> lasts_tasks_id;
87 : std::vector<std::vector<runtime::Task*>> firsts_tasks;
88 : std::vector<std::vector<runtime::Task*>> lasts_tasks;
89 : std::vector<std::vector<std::shared_ptr<module::Module>>> modules;
90 : std::vector<std::vector<module::Module*>> all_modules;
91 : std::shared_ptr<std::mutex> mtx_exception;
92 : std::vector<std::string> prev_exception_messages;
93 : std::vector<std::string> prev_exception_messages_to_display;
94 : std::shared_ptr<std::atomic<bool>> force_exit_loop;
95 : size_t n_tasks;
96 : bool tasks_inplace;
97 : bool thread_pinning;
98 : std::vector<size_t> puids;
99 : bool no_copy_mode;
100 : const std::vector<const runtime::Task*> saved_exclusions;
101 : std::vector<tools::Interface_is_done*> donners;
102 : std::vector<std::vector<tools::Interface_reset*>> switchers_reset;
103 : bool auto_stop;
104 : bool is_part_of_pipeline;
105 :
106 : // internal state for the `exec_step` method
107 : std::vector<bool> next_round_is_over;
108 : std::vector<size_t> cur_task_id;
109 : std::vector<tools::Digraph_node<Sub_sequence>*> cur_ss;
110 :
111 : // extra attribute for pinning v2
112 : std::vector<std::string> pin_objects_per_thread;
113 :
114 : // is memory allocation enabled?
115 : bool memory_allocation;
116 :
117 : public:
118 : Sequence(const std::vector<const runtime::Task*>& firsts,
119 : const size_t n_threads = 1,
120 : const bool thread_pinning = false,
121 : const std::vector<size_t>& puids = {},
122 : const bool memory_allocation = true);
123 : Sequence(const std::vector<const runtime::Task*>& firsts,
124 : const std::vector<const runtime::Task*>& lasts,
125 : const size_t n_threads = 1,
126 : const bool thread_pinning = false,
127 : const std::vector<size_t>& puids = {},
128 : const bool memory_allocation = true);
129 : Sequence(const std::vector<const runtime::Task*>& firsts,
130 : const std::vector<const runtime::Task*>& lasts,
131 : const std::vector<const runtime::Task*>& exclusions,
132 : const size_t n_threads = 1,
133 : const bool thread_pinning = false,
134 : const std::vector<size_t>& puids = {},
135 : const bool memory_allocation = true);
136 : Sequence(const runtime::Task& first,
137 : const size_t n_threads = 1,
138 : const bool thread_pinning = false,
139 : const std::vector<size_t>& puids = {},
140 : const bool memory_allocation = true);
141 : Sequence(const runtime::Task& first,
142 : const runtime::Task& last,
143 : const size_t n_threads = 1,
144 : const bool thread_pinning = false,
145 : const std::vector<size_t>& puids = {},
146 : const bool memory_allocation = true);
147 : Sequence(const std::vector<runtime::Task*>& firsts,
148 : const size_t n_threads = 1,
149 : const bool thread_pinning = false,
150 : const std::vector<size_t>& puids = {},
151 : const bool tasks_inplace = true,
152 : const bool memory_allocation = true);
153 : Sequence(const std::vector<runtime::Task*>& firsts,
154 : const std::vector<runtime::Task*>& lasts,
155 : const size_t n_threads = 1,
156 : const bool thread_pinning = false,
157 : const std::vector<size_t>& puids = {},
158 : const bool tasks_inplace = true,
159 : const bool memory_allocation = true);
160 : Sequence(const std::vector<runtime::Task*>& firsts,
161 : const std::vector<runtime::Task*>& lasts,
162 : const std::vector<runtime::Task*>& exclusions,
163 : const size_t n_threads = 1,
164 : const bool thread_pinning = false,
165 : const std::vector<size_t>& puids = {},
166 : const bool tasks_inplace = true,
167 : const bool memory_allocation = true);
168 : Sequence(runtime::Task& first,
169 : const size_t n_threads = 1,
170 : const bool thread_pinning = false,
171 : const std::vector<size_t>& puids = {},
172 : const bool tasks_inplace = true,
173 : const bool memory_allocation = true);
174 : Sequence(runtime::Task& first,
175 : runtime::Task& last,
176 : const size_t n_threads = 1,
177 : const bool thread_pinning = false,
178 : const std::vector<size_t>& puids = {},
179 : const bool tasks_inplace = true,
180 : const bool memory_allocation = true);
181 :
182 : // Constructors for pinning v2
183 : Sequence(const std::vector<const runtime::Task*>& firsts,
184 : const size_t n_threads,
185 : const bool thread_pinning,
186 : const std::string& sequence_pinning_policy,
187 : const bool memory_allocation = true);
188 : Sequence(const std::vector<const runtime::Task*>& firsts,
189 : const std::vector<const runtime::Task*>& lasts,
190 : const size_t n_threads,
191 : const bool thread_pinning,
192 : const std::string& sequence_pinning_policy,
193 : const bool memory_allocation = true);
194 : Sequence(const std::vector<const runtime::Task*>& firsts,
195 : const std::vector<const runtime::Task*>& lasts,
196 : const std::vector<const runtime::Task*>& exclusions,
197 : const size_t n_threads,
198 : const bool thread_pinning,
199 : const std::string& sequence_pinning_policy,
200 : const bool memory_allocation = true);
201 : Sequence(const runtime::Task& first,
202 : const size_t n_threads,
203 : const bool thread_pinning,
204 : const std::string& sequence_pinning_policy,
205 : const bool memory_allocation = true);
206 : Sequence(const runtime::Task& first,
207 : const runtime::Task& last,
208 : const size_t n_threads,
209 : const bool thread_pinning,
210 : const std::string& sequence_pinning_policy,
211 : const bool memory_allocation = true);
212 : Sequence(const std::vector<runtime::Task*>& firsts,
213 : const size_t n_threads,
214 : const bool thread_pinning,
215 : const std::string& sequence_pinning_policy,
216 : const bool tasks_inplace = true,
217 : const bool memory_allocation = true);
218 : Sequence(const std::vector<runtime::Task*>& firsts,
219 : const std::vector<runtime::Task*>& lasts,
220 : const size_t n_threads,
221 : const bool thread_pinning,
222 : const std::string& sequence_pinning_policy,
223 : const bool tasks_inplace = true,
224 : const bool memory_allocation = true);
225 : Sequence(const std::vector<runtime::Task*>& firsts,
226 : const std::vector<runtime::Task*>& lasts,
227 : const std::vector<runtime::Task*>& exclusions,
228 : const size_t n_threads,
229 : const bool thread_pinning,
230 : const std::string& sequence_pinning_policy,
231 : const bool tasks_inplace = true,
232 : const bool memory_allocation = true);
233 : Sequence(runtime::Task& first,
234 : const size_t n_threads,
235 : const bool thread_pinning,
236 : const std::string& sequence_pinning_policy,
237 : const bool tasks_inplace = true,
238 : const bool memory_allocation = true);
239 : Sequence(runtime::Task& first,
240 : runtime::Task& last,
241 : const size_t n_threads,
242 : const bool thread_pinning,
243 : const std::string& sequence_pinning_policy,
244 : const bool tasks_inplace = true,
245 : const bool memory_allocation = true);
246 :
247 : virtual ~Sequence();
248 : virtual Sequence* clone() const;
249 :
250 : void set_thread_pinning(const bool thread_pinning);
251 : void set_thread_pinning(const bool thread_pinning, const std::vector<size_t>& puids = {});
252 : // Set pinning for the second version
253 : void set_thread_pinning(const bool thread_pinning, const std::string& sequence_pinning_policy);
254 : bool is_thread_pinning();
255 :
256 : // Allocate sequence memory
257 : void allocate_outbuffers();
258 : // Deallocate sequence memory
259 : void deallocate_outbuffers();
260 :
261 : void exec(std::function<bool(const std::vector<const int*>&)> stop_condition);
262 : void exec(std::function<bool()> stop_condition);
263 : void exec();
264 : void exec_seq(const size_t tid = 0, const int frame_id = -1);
265 : runtime::Task* exec_step(const size_t tid = 0, const int frame_id = -1);
266 :
267 : inline size_t get_n_threads() const;
268 :
269 : template<class C = module::Module>
270 : std::vector<C*> get_modules(const bool set_modules = true) const;
271 : template<class C = module::Module>
272 : std::vector<C*> get_cloned_modules(const C& module_ref) const;
273 :
274 : std::vector<std::vector<module::Module*>> get_modules_per_threads() const;
275 : std::vector<std::vector<module::Module*>> get_modules_per_types() const;
276 :
277 : std::vector<std::vector<runtime::Task*>> get_tasks_per_threads() const;
278 : std::vector<std::vector<runtime::Task*>> get_tasks_per_types() const;
279 :
280 : inline const std::vector<std::vector<runtime::Task*>>& get_firsts_tasks() const;
281 : inline const std::vector<std::vector<runtime::Task*>>& get_lasts_tasks() const;
282 :
283 : void export_dot(std::ostream& stream = std::cout) const;
284 :
285 : void set_no_copy_mode(const bool no_copy_mode);
286 : bool is_no_copy_mode() const;
287 :
288 : void set_auto_stop(const bool auto_stop);
289 : bool is_auto_stop() const;
290 :
291 : inline size_t get_n_frames() const;
292 : void set_n_frames(const size_t n_frames);
293 :
294 : virtual bool is_done() const;
295 :
296 : bool is_control_flow() const;
297 :
298 : protected:
299 : template<class SS>
300 : void delete_tree(tools::Digraph_node<SS>* node, std::vector<tools::Digraph_node<SS>*>& already_deleted_nodes);
301 :
302 : template<class SS, class TA>
303 : tools::Digraph_node<SS>* init_recursive(tools::Digraph_node<SS>* cur_subseq,
304 : size_t& ssid,
305 : size_t& taid,
306 : std::vector<std::pair<TA*, tools::Digraph_node<SS>*>>& selectors,
307 : std::vector<TA*>& switchers,
308 : TA& first,
309 : TA& current_task,
310 : const std::vector<TA*>& lasts,
311 : const std::vector<TA*>& exclusions,
312 : std::vector<size_t>& real_lasts_id,
313 : std::vector<TA*>& real_lasts,
314 : std::map<TA*, unsigned>& in_sockets_feed,
315 : std::map<TA*, std::pair<tools::Digraph_node<SS>*, size_t>>& task_subseq);
316 :
317 : template<class VTA>
318 : void export_dot_subsequence(const VTA& subseq,
319 : const std::vector<size_t>& tasks_id,
320 : const subseq_t& subseq_type,
321 : const std::string& subseq_name,
322 : const std::string& tab,
323 : std::ostream& stream = std::cout) const;
324 :
325 : template<class VTA>
326 : void export_dot_connections(const VTA& subseq, const std::string& tab, std::ostream& stream = std::cout) const;
327 :
328 : template<class SS>
329 : void export_dot(tools::Digraph_node<SS>* root, std::ostream& stream = std::cout) const;
330 :
331 : template<class SS, class MO>
332 : void replicate(const tools::Digraph_node<SS>* sequence);
333 :
334 : void _exec(const size_t tid,
335 : std::function<bool(const std::vector<const int*>&)>& stop_condition,
336 : tools::Digraph_node<Sub_sequence>* sequence);
337 :
338 : void _exec_without_statuses(const size_t tid,
339 : std::function<bool()>& stop_condition,
340 : tools::Digraph_node<Sub_sequence>* sequence);
341 :
342 : void gen_processes(const bool no_copy_mode = false);
343 : void reset_no_copy_mode();
344 :
345 : template<class SS>
346 : void check_ctrl_flow(tools::Digraph_node<SS>* root);
347 : Sub_sequence* get_last_subsequence(const size_t tid);
348 : void update_tasks_id(const size_t tid);
349 :
350 : std::vector<runtime::Task*> get_tasks_from_id(const size_t taid);
351 :
352 : void update_firsts_and_lasts_tasks();
353 :
354 : void _set_n_frames_unbind(std::vector<std::pair<runtime::Socket*, runtime::Socket*>>& unbind_sockets,
355 : std::vector<std::pair<runtime::Task*, runtime::Socket*>>& unbind_tasks);
356 : void _set_n_frames(const size_t n_frames);
357 : void _set_n_frames_rebind(const std::vector<std::pair<runtime::Socket*, runtime::Socket*>>& unbind_sockets,
358 : const std::vector<std::pair<runtime::Task*, runtime::Socket*>>& unbind_tasks);
359 :
360 : private:
361 : template<class SS, class TA>
362 : void init(const std::vector<TA*>& firsts, const std::vector<TA*>& lasts, const std::vector<TA*>& exclusions);
363 : template<class SS>
364 : inline void _init(tools::Digraph_node<SS>* root);
365 : };
366 : } // namespace runtime
367 : } // namespace spu
368 :
369 : #ifndef DOXYGEN_SHOULD_SKIP_THIS
370 : #include "Runtime/Sequence/Sequence.hxx"
371 : #endif
372 :
373 : #endif /* SEQUENCE_HPP_ */
|