Line data Source code
1 : #include "Scheduler/Scheduler.hpp"
2 : #include "Tools/Exception/exception.hpp"
3 : #include "Tools/Thread/Thread_pinning/Thread_pinning.hpp"
4 :
5 : #include <iostream>
6 : #include <sstream>
7 :
8 : using namespace spu;
9 : using namespace spu::sched;
10 :
11 0 : Scheduler::Scheduler(runtime::Sequence& sequence)
12 0 : : sequence(&sequence)
13 : {
14 0 : this->sequence = &sequence;
15 0 : }
16 :
17 11 : Scheduler::Scheduler(runtime::Sequence* sequence)
18 11 : : sequence(sequence)
19 : {
20 11 : if (sequence == nullptr)
21 : {
22 0 : std::stringstream message;
23 0 : message << "'sequence' can't be nullptr.";
24 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
25 0 : }
26 :
27 11 : this->sequence = sequence;
28 11 : }
29 :
30 : void
31 11 : Scheduler::profile(const size_t n_exec)
32 : {
33 11 : if (n_exec == 0)
34 : {
35 0 : std::stringstream message;
36 0 : message << "'n_exec' has to be higher than zero.";
37 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
38 0 : }
39 :
40 11 : if (sequence->get_n_threads() > 1)
41 : {
42 0 : std::stringstream message;
43 0 : message << "'sequence->get_n_threads()' has to be strictly equal to 1.";
44 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
45 0 : }
46 :
47 11 : if (this->sequence->is_control_flow())
48 : {
49 0 : std::stringstream message;
50 0 : message << "Control flow in the sequence is not supported yet.";
51 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
52 0 : }
53 :
54 11 : if (!this->tasks_desc.empty())
55 : {
56 0 : std::stringstream message;
57 : message << "'tasks_desc' should be empty, you should call 'Scheduler::reset' first if you want to re-run the "
58 0 : "profiling'.";
59 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
60 0 : }
61 :
62 11 : this->sequence->set_auto_stop(false);
63 69 : for (auto& mod : this->sequence->get_modules<module::Module>(false))
64 150 : for (auto& tsk : mod->tasks)
65 : {
66 92 : tsk->reset();
67 92 : tsk->set_stats(true); // enable the statistics
68 92 : tsk->set_fast(true); // enable the fast mode (= disable the useless verifs
69 : // in the tasks)
70 11 : }
71 11 : unsigned int counter = 0;
72 1111 : this->sequence->exec([&counter, &n_exec]() { return ++counter >= n_exec; });
73 11 : this->sequence->set_auto_stop(true);
74 :
75 11 : std::vector<runtime::Task*>& tasks = this->sequence->sequences[0]->get_contents()->tasks;
76 70 : for (auto& t : tasks)
77 : {
78 : task_desc_t new_t;
79 59 : new_t.tptr = t;
80 59 : new_t.exec_duration = t->get_duration_avg();
81 59 : this->tasks_desc.push_back(new_t);
82 : }
83 11 : }
84 :
85 : void
86 6 : Scheduler::print_profiling(std::ostream& stream)
87 : {
88 6 : stream << "# Profiling:" << std::endl;
89 40 : for (auto& t : this->tasks_desc)
90 : {
91 34 : stream << "# - Name: " << t.tptr->get_name();
92 34 : stream << " - Execution duration: " << t.exec_duration.count() << " ns";
93 34 : stream << " - Replicable: " << (t.tptr->is_replicable() ? "yes" : "no") << std::endl;
94 : }
95 6 : }
96 :
97 : const std::vector<task_desc_t>&
98 0 : Scheduler::get_profiling()
99 : {
100 0 : return this->tasks_desc;
101 : }
102 :
103 : void
104 0 : Scheduler::reset()
105 : {
106 0 : this->solution.clear();
107 0 : this->tasks_desc.clear();
108 0 : }
109 :
110 : std::string
111 1 : Scheduler::perform_threads_mapping() const
112 : {
113 1 : if (this->solution.size() == 0)
114 : {
115 0 : std::stringstream message;
116 : message
117 0 : << "The solution has to contain at least one element, please run the 'Scheduler::schedule' method first.";
118 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
119 0 : }
120 :
121 1 : std::string pinning_policy;
122 1 : bool first_stage = true;
123 1 : size_t puid = 0;
124 7 : for (auto& stage : this->solution)
125 : {
126 6 : if (!first_stage) pinning_policy += " | ";
127 :
128 14 : for (size_t st = 0; st < stage.second; st++)
129 8 : pinning_policy += std::string((st == 0) ? "" : "; ") + "PU_" + std::to_string(puid++);
130 :
131 6 : first_stage = false;
132 : }
133 :
134 1 : return pinning_policy;
135 0 : }
136 :
137 : runtime::Pipeline*
138 11 : Scheduler::instantiate_pipeline(const size_t buffer_size,
139 : const bool active_waiting,
140 : const bool thread_pining,
141 : const std::string& pinning_policy)
142 : {
143 11 : if (this->solution.size() == 0)
144 : {
145 0 : std::stringstream message;
146 : message
147 0 : << "The solution has to contain at least one element, please run the 'Scheduler::schedule' method first.";
148 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
149 0 : }
150 :
151 11 : std::vector<runtime::Task*> firsts(this->tasks_desc.size());
152 11 : std::vector<runtime::Task*> lasts(this->tasks_desc.size());
153 :
154 11 : std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> sep_stages(this->solution.size());
155 11 : std::vector<size_t> n_threads(this->solution.size());
156 11 : size_t s = 0;
157 11 : size_t i = 0;
158 39 : for (auto& stage : this->solution)
159 : {
160 28 : std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>> cur_stage_desc;
161 28 : sep_stages[s].first.resize(stage.first);
162 28 : sep_stages[s].second.resize(stage.first);
163 87 : for (size_t t = 0; t < stage.first; t++)
164 : {
165 59 : firsts[i] = this->tasks_desc[i].tptr;
166 59 : lasts[i] = this->tasks_desc[i].tptr;
167 :
168 59 : sep_stages[s].first[t] = this->tasks_desc[i].tptr;
169 59 : sep_stages[s].second[t] = this->tasks_desc[i].tptr;
170 :
171 59 : i++;
172 : }
173 :
174 28 : n_threads[s] = stage.second;
175 28 : s++;
176 28 : }
177 :
178 11 : std::vector<size_t> synchro_buffer_sizes(this->solution.size() - 1, buffer_size);
179 11 : std::vector<bool> synchro_active_waitings(this->solution.size() - 1, active_waiting);
180 11 : std::vector<bool> thread_pinings(this->solution.size(), thread_pining);
181 :
182 : return new runtime::Pipeline(firsts,
183 : lasts,
184 : sep_stages,
185 : n_threads,
186 : synchro_buffer_sizes,
187 : synchro_active_waitings,
188 : thread_pinings,
189 22 : pinning_policy);
190 11 : }
191 :
192 : std::vector<std::pair<size_t, size_t>>
193 6 : Scheduler::get_solution()
194 : {
195 6 : return this->solution;
196 : }
197 :
198 : runtime::Pipeline*
199 1 : Scheduler::generate_pipeline()
200 : {
201 1 : if (tasks_desc.empty()) this->profile();
202 :
203 1 : if (solution.empty()) this->schedule();
204 :
205 1 : return this->instantiate_pipeline(1, false, tools::Thread_pinning::is_init(), this->perform_threads_mapping());
206 : }
207 :
208 : size_t
209 0 : Scheduler::get_n_alloc_ressources() const
210 : {
211 0 : if (this->solution.size() == 0)
212 : {
213 0 : std::stringstream message;
214 : message
215 0 : << "The solution has to contain at least one element, please run the 'Scheduler::schedule' method first.";
216 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
217 0 : }
218 0 : size_t R = 0;
219 0 : for (auto s : this->solution)
220 0 : R += s.second;
221 0 : return R;
222 : }
223 :
224 : double
225 0 : Scheduler::get_throughput_est() const
226 : {
227 0 : throw tools::unimplemented_error(__FILE__, __LINE__, __func__);
228 : }
|