Line data Source code
1 : #include "Scheduler/Scheduler.hpp"
2 : #include "Tools/Display/Statistics/Statistics.hpp"
3 : #include "Tools/Exception/exception.hpp"
4 : #include "Tools/Thread/Thread_pinning/Thread_pinning.hpp"
5 :
6 : #include <iostream>
7 : #include <sstream>
8 :
9 : using namespace spu;
10 : using namespace spu::sched;
11 :
12 0 : Scheduler::Scheduler(runtime::Sequence& sequence)
13 0 : : sequence(&sequence)
14 : {
15 0 : this->sequence = &sequence;
16 0 : }
17 :
18 11 : Scheduler::Scheduler(runtime::Sequence* sequence)
19 11 : : sequence(sequence)
20 : {
21 11 : if (sequence == nullptr)
22 : {
23 0 : std::stringstream message;
24 0 : message << "'sequence' can't be nullptr.";
25 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
26 0 : }
27 :
28 11 : this->sequence = sequence;
29 11 : }
30 :
// Runs the sequence 'n_exec' times on a single thread to collect per-task
// execution statistics; the average duration of every task is appended to
// 'this->tasks_desc'. Optionally pins the execution to one processing unit.
// @param puid   processing unit to pin the thread on; a negative value means
//               "no pinning" (the pinning save/restore paths are skipped)
// @param n_exec number of sequence executions to average over (must be > 0)
// @throws tools::invalid_argument if 'n_exec' is 0, if the sequence uses more
//         than one thread, or if the sequence contains control flow
// @throws tools::runtime_error if the sequence's task list changed between
//         two profiling calls
void
Scheduler::_profile(const int puid, const size_t n_exec)
{
    if (n_exec == 0)
    {
        std::stringstream message;
        message << "'n_exec' has to be higher than zero.";
        throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
    }

    // per-task timings are only meaningful on a single-threaded execution
    if (sequence->get_n_threads() > 1)
    {
        std::stringstream message;
        message << "'sequence->get_n_threads()' has to be strictly equal to 1.";
        throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
    }

    if (this->sequence->is_control_flow())
    {
        std::stringstream message;
        message << "Control flow in the sequence is not supported yet.";
        throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
    }

    // disable auto-stop so the lambda passed to 'exec' below controls the
    // number of iterations, then arm statistics collection on every task
    this->sequence->set_auto_stop(false);
    for (auto& mod : this->sequence->get_modules<module::Module>(false))
        for (auto& tsk : mod->tasks)
        {
            tsk->reset();
            tsk->set_stats(true); // enable the statistics
            tsk->set_fast(true);  // enable the fast mode (= disable the useless verifs
                                  // in the tasks)
        }

    // save the current pinning state so it can be restored after profiling;
    // NOTE: these are only written/read when 'puid >= 0'
    bool prev_thread_pinning;
    std::vector<std::string> prev_pin_objects_per_thread;
    std::vector<size_t> prev_puids;
    if (puid >= 0)
    {
        prev_thread_pinning = this->sequence->thread_pinning;
        prev_pin_objects_per_thread = this->sequence->pin_objects_per_thread;
        prev_puids = this->sequence->puids;

        this->sequence->set_thread_pinning(true, std::vector<size_t>(1, puid));
    }

    // execute the sequence exactly 'n_exec' times
    unsigned int counter = 0;
    this->sequence->exec([&counter, &n_exec]() { return ++counter >= n_exec; });
    this->sequence->set_auto_stop(true);

    // restore the pinning state saved above
    if (puid >= 0)
    {
        this->sequence->thread_pinning = prev_thread_pinning;
        this->sequence->pin_objects_per_thread = prev_pin_objects_per_thread;
        this->sequence->puids = prev_puids;
    }

    if (this->tasks_desc.empty())
    {
        // first profiling run: create one descriptor per task
        std::vector<runtime::Task*>& tasks = this->sequence->sequences[0]->get_contents()->tasks;
        for (auto& t : tasks)
        {
            task_desc_t new_t;
            new_t.tptr = t;
            new_t.exec_duration.push_back(t->get_duration_avg());
            this->tasks_desc.push_back(new_t);
        }
    }
    else
    {
        // subsequent run (e.g. profiling on several PUs): append a new
        // duration sample to each existing descriptor, checking that the
        // task list did not change in the meantime
        std::vector<runtime::Task*>& tasks = this->sequence->sequences[0]->get_contents()->tasks;
        size_t i = 0;
        for (auto& t : tasks)
        {
            task_desc_t& cur_t = this->tasks_desc[i];
            if (t != cur_t.tptr)
            {
                std::stringstream message;
                message << "'t' should be equal to 'cur_t.tptr' ('t' = " << std::hex << (uint64_t)t
                        << ", 'cur_t.tptr' = " << (uint64_t)cur_t.tptr << ", 'i' = " << i << ").";
                throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
            }
            cur_t.exec_duration.push_back(t->get_duration_avg());
            i++;
        }
    }

    // TODO: Restore stats and fast mode of tasks as before!
}
120 :
121 : void
122 11 : Scheduler::profile(const size_t n_exec)
123 : {
124 11 : if (!this->tasks_desc.empty())
125 : {
126 0 : std::stringstream message;
127 : message << "'tasks_desc' should be empty, you should call 'Scheduler::reset' first if you want to re-run the "
128 0 : "profiling'.";
129 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
130 0 : }
131 :
132 11 : this->_profile(-1, n_exec);
133 :
134 11 : this->profiling_summary.resize(1);
135 :
136 11 : std::stringstream ss;
137 11 : tools::Stats::show(this->sequence->get_tasks_per_threads()[0], false, false, ss);
138 11 : this->profiling_summary[0] = ss.str();
139 11 : }
140 :
141 : void
142 0 : Scheduler::profile(const std::vector<size_t>& puids, const size_t n_exec)
143 : {
144 0 : if (!this->tasks_desc.empty())
145 : {
146 0 : std::stringstream message;
147 : message << "'tasks_desc' should be empty, you should call 'Scheduler::reset' first if you want to re-run the "
148 0 : "profiling'.";
149 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
150 0 : }
151 :
152 0 : this->profiling_summary.resize(puids.size());
153 0 : for (size_t i = 0; i < puids.size(); i++)
154 : {
155 0 : this->_profile(puids[i], n_exec);
156 0 : std::stringstream ss;
157 0 : tools::Stats::show(this->sequence->get_tasks_per_threads()[0], false, false, ss);
158 0 : this->profiling_summary[i] = ss.str();
159 0 : }
160 :
161 0 : this->profiled_puids = puids;
162 0 : }
163 :
164 : void
165 6 : Scheduler::print_profiling(std::ostream& stream)
166 : {
167 6 : stream << "# Profiling:" << std::endl;
168 6 : if (this->profiled_puids.size())
169 0 : for (size_t p = 0; p < this->profiled_puids.size(); p++)
170 : {
171 0 : stream << "# On PUID n°" << this->profiled_puids[p] << std::endl;
172 0 : std::cout << this->profiling_summary[p];
173 0 : if (p < this->profiled_puids.size() - 1) stream << "# ----------- " << std::endl;
174 : }
175 : else
176 6 : stream << this->profiling_summary[0];
177 6 : }
178 :
179 : const std::vector<task_desc_t>&
180 0 : Scheduler::get_profiling()
181 : {
182 0 : return this->tasks_desc;
183 : }
184 :
185 : void
186 0 : Scheduler::reset()
187 : {
188 0 : this->solution.clear();
189 0 : this->tasks_desc.clear();
190 0 : }
191 :
192 : std::vector<bool>
193 1 : Scheduler::get_thread_pinnings() const
194 : {
195 2 : return std::vector<bool>(this->solution.size(), tools::Thread_pinning::is_init());
196 : }
197 :
198 : std::vector<size_t>
199 1 : Scheduler::get_sync_buff_sizes() const
200 : {
201 2 : return std::vector<size_t>(this->solution.size() - 1, 1);
202 : }
203 :
204 : std::vector<bool>
205 1 : Scheduler::get_sync_active_waitings() const
206 : {
207 1 : return std::vector<bool>(this->solution.size() - 1, false);
208 : }
209 :
210 : std::string
211 1 : Scheduler::get_threads_mapping() const
212 : {
213 1 : if (this->solution.size() == 0)
214 : {
215 0 : std::stringstream message;
216 : message
217 0 : << "The solution has to contain at least one element, please run the 'Scheduler::schedule' method first.";
218 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
219 0 : }
220 :
221 1 : std::string pinning_policy;
222 1 : bool first_stage = true;
223 1 : size_t puid = 0;
224 7 : for (auto& stage : this->solution)
225 : {
226 6 : if (!first_stage) pinning_policy += " | ";
227 :
228 14 : for (size_t st = 0; st < stage.second; st++)
229 8 : pinning_policy += std::string((st == 0) ? "" : "; ") + "PU_" + std::to_string(puid++);
230 :
231 6 : first_stage = false;
232 : }
233 :
234 1 : return pinning_policy;
235 0 : }
236 :
237 : runtime::Pipeline*
238 11 : Scheduler::instantiate_pipeline(const std::vector<size_t> synchro_buffer_sizes,
239 : const std::vector<bool> synchro_active_waitings,
240 : const std::vector<bool> thread_pinings,
241 : const std::string& pinning_policy)
242 : {
243 11 : if (this->solution.size() == 0)
244 : {
245 0 : std::stringstream message;
246 : message
247 0 : << "The solution has to contain at least one element, please run the 'Scheduler::schedule' method first.";
248 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
249 0 : }
250 :
251 : // std::cout << "synchro_buffer_sizes = {";
252 : // for (size_t i = 0; i < synchro_buffer_sizes.size(); i++)
253 : // std::cout << synchro_buffer_sizes[i] << ", ";
254 : // std::cout << "}" << std::endl;
255 : // std::cout << "synchro_active_waitings = {";
256 : // for (size_t i = 0; i < synchro_active_waitings.size(); i++)
257 : // std::cout << synchro_active_waitings[i] << ", ";
258 : // std::cout << "}" << std::endl;
259 : // std::cout << "thread_pinings = {";
260 : // for (size_t i = 0; i < thread_pinings.size(); i++)
261 : // std::cout << thread_pinings[i] << ", ";
262 : // std::cout << "}" << std::endl;
263 : // std::cout << "pinning_policy = " << pinning_policy << std::endl;
264 :
265 11 : std::vector<runtime::Task*> firsts(this->tasks_desc.size());
266 11 : std::vector<runtime::Task*> lasts(this->tasks_desc.size());
267 :
268 11 : std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> sep_stages(this->solution.size());
269 11 : std::vector<size_t> n_threads(this->solution.size());
270 11 : size_t s = 0;
271 11 : size_t i = 0;
272 39 : for (auto& stage : this->solution)
273 : {
274 28 : std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>> cur_stage_desc;
275 28 : sep_stages[s].first.resize(stage.first);
276 28 : sep_stages[s].second.resize(stage.first);
277 87 : for (size_t t = 0; t < stage.first; t++)
278 : {
279 59 : firsts[i] = this->tasks_desc[i].tptr;
280 59 : lasts[i] = this->tasks_desc[i].tptr;
281 :
282 59 : sep_stages[s].first[t] = this->tasks_desc[i].tptr;
283 59 : sep_stages[s].second[t] = this->tasks_desc[i].tptr;
284 :
285 59 : i++;
286 : }
287 :
288 28 : n_threads[s] = stage.second;
289 28 : s++;
290 28 : }
291 :
292 : return new runtime::Pipeline(firsts,
293 : lasts,
294 : sep_stages,
295 : n_threads,
296 : synchro_buffer_sizes,
297 : synchro_active_waitings,
298 : thread_pinings,
299 22 : pinning_policy);
300 11 : }
301 :
302 : runtime::Pipeline*
303 0 : Scheduler::instantiate_pipeline(const size_t buffer_size,
304 : const bool active_waiting,
305 : const bool thread_pining,
306 : const std::string& pinning_policy)
307 : {
308 0 : std::vector<size_t> synchro_buffer_sizes(this->solution.size() - 1, buffer_size);
309 0 : std::vector<bool> synchro_active_waitings(this->solution.size() - 1, active_waiting);
310 0 : std::vector<bool> thread_pinings(this->solution.size(), thread_pining);
311 :
312 0 : return instantiate_pipeline(synchro_buffer_sizes, synchro_active_waitings, thread_pinings, pinning_policy);
313 0 : }
314 :
315 : std::vector<std::pair<size_t, size_t>>
316 36 : Scheduler::get_solution()
317 : {
318 36 : return this->solution;
319 : }
320 :
321 : runtime::Pipeline*
322 1 : Scheduler::generate_pipeline()
323 : {
324 1 : if (tasks_desc.empty()) this->profile();
325 :
326 1 : if (solution.empty()) this->schedule();
327 :
328 2 : return this->instantiate_pipeline(this->get_sync_buff_sizes(),
329 2 : this->get_sync_active_waitings(),
330 2 : this->get_thread_pinnings(),
331 3 : this->get_threads_mapping());
332 : }
333 :
334 : size_t
335 0 : Scheduler::get_n_alloc_ressources() const
336 : {
337 0 : if (this->solution.size() == 0)
338 : {
339 0 : std::stringstream message;
340 : message
341 0 : << "The solution has to contain at least one element, please run the 'Scheduler::schedule' method first.";
342 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
343 0 : }
344 0 : size_t R = 0;
345 0 : for (auto s : this->solution)
346 0 : R += s.second;
347 0 : return R;
348 : }
349 :
// Estimated throughput of the scheduled solution.
// NOTE(review): not implemented yet — this method unconditionally throws.
// @throws tools::unimplemented_error always
double
Scheduler::get_throughput_est() const
{
    throw tools::unimplemented_error(__FILE__, __LINE__, __func__);
}
|