Line data Source code
1 : #include "Scheduler/OTAC/Scheduler_OTAC.hpp"
2 : #include "Tools/Exception/exception.hpp"
3 : #include <cmath>
4 : #include <iostream>
5 : #include <limits>
6 : #include <sstream>
7 :
8 : using namespace spu;
9 : using namespace spu::sched;
10 :
11 : /*#define VERBOSE*/
12 :
13 0 : Scheduler_OTAC::Scheduler_OTAC(runtime::Sequence& sequence, const size_t R)
14 : : Scheduler(&sequence)
15 0 : , R(R)
16 0 : , P(std::numeric_limits<double>::infinity())
17 : {
18 0 : }
19 :
20 11 : Scheduler_OTAC::Scheduler_OTAC(runtime::Sequence* sequence, const size_t R)
21 : : Scheduler(sequence)
22 11 : , R(R)
23 11 : , P(std::numeric_limits<double>::infinity())
24 : {
25 11 : }
26 :
27 : // USEFUL FUNCTIONS
28 : // Weight of a sub-sequence (stage)
29 : double
30 544 : weight(const std::vector<runtime::Task*>& s, const unsigned int r)
31 : {
32 544 : if (r == 0)
33 : {
34 0 : return std::numeric_limits<double>::infinity();
35 : }
36 : else
37 : {
38 544 : double sum = 0.;
39 1388 : for (auto& t : s)
40 : {
41 844 : sum = sum + (t->get_duration_avg()).count();
42 : }
43 544 : return sum / r;
44 : }
45 : }
46 :
47 : double
48 17 : weight_t(const std::vector<task_desc_t>& s, const unsigned int r)
49 : {
50 17 : if (r == 0)
51 : {
52 0 : return std::numeric_limits<double>::infinity();
53 : }
54 : else
55 : {
56 17 : double sum = 0.;
57 106 : for (auto& t : s)
58 : {
59 89 : sum = sum + (t.tptr->get_duration_avg()).count();
60 : }
61 17 : return sum / r;
62 : }
63 : }
64 :
65 : // Function to find the index of an element
66 : int
67 0 : getIndex(const std::vector<task_desc_t>& chain, const task_desc_t target)
68 : {
69 0 : int index = -1;
70 0 : for (size_t i = 0; i < chain.size(); i++)
71 : {
72 0 : if (chain[i].tptr == target.tptr)
73 : {
74 0 : index = i;
75 : }
76 : }
77 0 : return index;
78 : }
79 :
80 : // Is the subsequence replicable?
81 : bool
82 102 : is_replicable(const std::vector<runtime::Task*>& s)
83 : {
84 160 : for (auto& t : s)
85 : {
86 110 : if (!t->is_replicable())
87 : {
88 52 : return false;
89 : }
90 : }
91 50 : return true;
92 : }
93 :
94 : // return the maximal execution time among Tf (stateful task)
95 : double
96 11 : max_stateful_weight(const std::vector<task_desc_t>& chain)
97 : {
98 11 : double max = 0.0;
99 :
100 70 : for (auto& t : chain)
101 : {
102 59 : if (!t.tptr->is_replicable())
103 : {
104 28 : if ((t.tptr->get_duration_avg()).count() > max)
105 : {
106 19 : max = (t.tptr->get_duration_avg()).count();
107 : }
108 : }
109 : }
110 11 : return max;
111 : }
112 :
113 : // return the maximal execution time among the tasks
114 : double
115 11 : max_weight_t(const std::vector<task_desc_t>& tasks)
116 : {
117 11 : double max = 0.0;
118 11 : int size = tasks.size();
119 11 : if (size >= 1)
120 : {
121 70 : for (auto& t : tasks)
122 : {
123 59 : if ((t.tptr->get_duration_avg()).count() > max)
124 : {
125 33 : max = (t.tptr->get_duration_avg()).count();
126 : }
127 : }
128 : }
129 11 : return max;
130 : }
131 :
132 : // PACKING FUNCTIONS
133 : // Main loop packing (inplace)
134 : void
135 147 : main_loop_packing(const std::vector<task_desc_t>& chain, const double P, int& e, std::vector<runtime::Task*>& s, int& n)
136 : {
137 147 : int N = chain.size();
138 :
139 245 : while ((e <= N) && (weight(s, 1) + chain[e - 1].exec_duration.count() <= P))
140 : {
141 : #ifdef VERBOSE
142 : std::cout << "main loop packing: weight = " << weight(s, 1);
143 : std::cout << " new_task=" << chain[e - 1].exec_duration.count();
144 : std::cout << " weight+new_task= " << (weight(s, 1) + chain[e - 1].exec_duration.count());
145 : std::cout << " P = " << P << std::endl;
146 : #endif
147 98 : s.push_back(chain[e - 1].tptr);
148 98 : n += 1;
149 98 : e += 1;
150 : }
151 147 : }
152 :
153 : // Packing if the current stage is stateless
154 : int
155 50 : stateless_packing(const std::vector<task_desc_t>& chain, const int e, std::vector<runtime::Task*>& s, int& n)
156 : {
157 50 : int N = chain.size();
158 50 : int f = e;
159 72 : while ((f <= N) && chain[f - 1].tptr->is_replicable())
160 : {
161 22 : s.push_back(chain[f - 1].tptr);
162 22 : n += 1;
163 22 : f += 1;
164 : }
165 50 : return f;
166 : }
167 :
168 : // New packing with the current sub-sequence to find extra tasks (f >= e)
169 : int
170 0 : extra_tasks_packing(const std::vector<task_desc_t>& chain,
171 : const double P,
172 : const int f,
173 : std::vector<runtime::Task*>& s,
174 : int& n)
175 : {
176 0 : std::vector<runtime::Task*> s_temp;
177 0 : s_temp.push_back(chain[f - 1].tptr);
178 0 : int e = f;
179 0 : while ((chain[e - 2].exec_duration.count() + weight(s_temp, 1)) <= P)
180 : {
181 0 : s.pop_back();
182 0 : n -= 1;
183 0 : s_temp.push_back(chain[e - 1].tptr);
184 0 : e -= 1;
185 : }
186 0 : return e;
187 0 : }
188 :
189 : // If taking tasks from the predecessor with less ressources (ri-1) is a success
190 : void
191 0 : improved_packing(const std::vector<task_desc_t>& chain,
192 : const double P,
193 : int& e,
194 : std::vector<runtime::Task*>& s,
195 : int& n,
196 : int& r)
197 : {
198 0 : r -= 1;
199 0 : int N = chain.size();
200 0 : while ((e <= N) && (weight(s, r) + chain[e - 1].exec_duration.count() / r <= P))
201 : {
202 0 : s.push_back(chain[e - 1].tptr);
203 0 : n += 1;
204 0 : e += 1;
205 : }
206 0 : }
207 :
208 : // Else, go back to the previous configuration
209 : void
210 0 : go_back_packing(const std::vector<task_desc_t>& chain, const int f, int& e, std::vector<runtime::Task*>& s, int& n)
211 : {
212 0 : while (e != f)
213 : {
214 0 : s.push_back(chain[e - 1].tptr);
215 0 : n += 1;
216 0 : e += 1;
217 : }
218 0 : }
219 :
220 : void
221 0 : print_solution(const std::vector<std::pair<size_t, size_t>>& solution, const std::string tag)
222 : {
223 0 : std::cout << tag << ": {";
224 0 : for (auto& pair_s : solution)
225 : {
226 0 : std::cout << "(" << pair_s.first << ", " << pair_s.second << ")";
227 : }
228 0 : std::cout << "}" << std::endl;
229 0 : }
230 :
231 : bool
232 51 : Scheduler_OTAC::PROBE(const std::vector<task_desc_t>& chain,
233 : const size_t R,
234 : double& P,
235 : std::vector<std::pair<size_t, size_t>>& solution)
236 : {
237 51 : solution.clear();
238 : #ifdef VERBOSE
239 : std::cout << "--" << std::endl;
240 : #endif
241 : // Initial state
242 51 : int i = 0; // current sub-sequence index
243 : // int b = 0; // begin index of the current sub sequence
244 51 : int e = 1; // end index of the current sub sequence
245 : int f;
246 : int n; // nunber of tasks by sub-sequence (in solution)
247 : int r; // number of resources by sub-sequence (in solution)
248 51 : std::pair<size_t, size_t> tmp_nr;
249 51 : std::vector<double> w;
250 318 : for (auto& t : chain)
251 : {
252 267 : w.push_back(t.exec_duration.count());
253 : }
254 51 : int N = w.size(); // number of tasks in the chain
255 51 : float maxWeight = 0;
256 51 : std::vector<std::vector<runtime::Task*>> sequence; // sub-sequence list (containing s)
257 :
258 : // Loop to create packing
259 198 : while (e <= N)
260 : {
261 147 : i = i + 1;
262 : // int list_index = i - 1;
263 147 : int b = e;
264 147 : e = b + 1;
265 :
266 : // New loop
267 147 : std::vector<runtime::Task*> s;
268 147 : s.push_back(chain[b - 1].tptr);
269 147 : r = 1;
270 147 : n = 1;
271 :
272 : // Add tasks to si while they fit
273 147 : main_loop_packing(chain, P, e, s, n);
274 :
275 : // Resources needed for a packing
276 147 : r = std::ceil(weight(s, 1) / P);
277 : #ifdef VERBOSE
278 : std::cout << "r=" << r << " weight=" << weight(s, 1) << " P=" << P << std::endl;
279 : #endif
280 :
281 : // All tasks do not fit with the last packing and if replicable
282 147 : if ((e <= N) && is_replicable(s))
283 : {
284 50 : f = stateless_packing(chain, e, s, n);
285 : // Processors needed for a packing
286 50 : r = std::ceil(weight(s, 1) / P);
287 50 : if (f > N)
288 : {
289 6 : e = f;
290 : }
291 50 : if (e != f)
292 : {
293 0 : e = extra_tasks_packing(chain, P, f, s, n);
294 0 : if (weight(s, r - 1) <= P)
295 : {
296 0 : improved_packing(chain, P, e, s, n, r);
297 : }
298 : // No benefit from taking tasks
299 : else
300 : {
301 0 : go_back_packing(chain, f, e, s, n);
302 : }
303 : }
304 : }
305 147 : sequence.push_back(s);
306 147 : tmp_nr.first = n; //
307 147 : tmp_nr.second = r;
308 147 : solution.push_back(tmp_nr);
309 147 : float w = weight(s, r);
310 147 : if (w >= maxWeight)
311 : {
312 98 : maxWeight = w;
313 : }
314 147 : }
315 : #ifdef VERBOSE
316 : print_solution(solution, "Probe");
317 : #endif
318 51 : size_t sum = 0;
319 198 : for (auto& nr : solution)
320 : {
321 147 : sum += nr.second;
322 : }
323 51 : if (sum <= R)
324 : {
325 21 : P = maxWeight;
326 21 : return true;
327 : }
328 : else
329 : {
330 30 : return false;
331 : }
332 51 : }
333 :
334 : void
335 11 : Scheduler_OTAC::SOLVE(const std::vector<task_desc_t>& chain,
336 : const size_t R,
337 : double& P,
338 : std::vector<std::pair<size_t, size_t>>& solution)
339 : {
340 11 : if (R == 0)
341 : {
342 0 : std::stringstream message;
343 0 : message << "The number of ressources R has to be higher than 0!";
344 0 : throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
345 0 : }
346 :
347 11 : if (!solution.empty())
348 : {
349 0 : std::stringstream message;
350 0 : message << "'solution' should be empty!";
351 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
352 0 : }
353 :
354 11 : double maxTf = max_stateful_weight(chain);
355 11 : double maxWeight = max_weight_t(chain);
356 11 : double eps = 1 / (double)R;
357 11 : double Pmin = weight_t(chain, R) > maxTf ? weight_t(chain, R) : maxTf;
358 11 : double Pmax = Pmin + maxWeight;
359 11 : std::vector<std::pair<size_t, size_t>> solution_current;
360 : #ifdef VERBOSE
361 : std::cout << "Pmin=" << Pmin << " Pmax=" << Pmax << " maxWeight=" << maxWeight << std::endl;
362 : #endif
363 :
364 11 : if (R == 1)
365 : {
366 2 : P = Pmax;
367 2 : std::pair<size_t, size_t> pair_r1;
368 2 : pair_r1.first = chain.size();
369 2 : pair_r1.second = R;
370 2 : solution.push_back(pair_r1);
371 : }
372 : else
373 : {
374 9 : P = (Pmax + Pmin) / 2;
375 9 : std::vector<std::pair<size_t, size_t>> solution_tmp;
376 60 : while ((Pmax - Pmin) > eps && P > maxTf) //
377 : {
378 51 : bool is_there_a_solution = this->PROBE(chain, R, P, solution_tmp);
379 51 : if (is_there_a_solution == true)
380 : {
381 21 : Pmax = P;
382 21 : solution_current = solution_tmp;
383 : #ifdef VERBOSE
384 : print_solution(solution_tmp, "Pmax " + std::to_string(Pmax) + " Pmin " + std::to_string(Pmin));
385 : #endif
386 : }
387 : else
388 : {
389 30 : Pmin = P;
390 : }
391 51 : P = (Pmax + Pmin) / 2;
392 : }
393 9 : solution = solution_current;
394 9 : }
395 : // print_solution(solution, "# Solution stages {(n,r)}");
396 : /*return new std::vector<std::pair<size_t, size_t>>(solution); // solution_returned*/;
397 11 : }
398 :
399 : void
400 11 : Scheduler_OTAC::schedule()
401 : {
402 11 : if (this->tasks_desc.empty())
403 : {
404 0 : std::stringstream message;
405 0 : message << "'tasks_desc' cannot be empty, you need to execute the 'Scheduler::profile()' method first!";
406 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
407 0 : }
408 :
409 11 : if (!this->solution.empty()) this->solution.clear();
410 11 : this->SOLVE(this->tasks_desc, this->R, this->P, this->solution);
411 11 : }
412 :
413 : double
414 0 : Scheduler_OTAC::get_period() const
415 : {
416 0 : if (this->P == std::numeric_limits<double>::infinity())
417 : {
418 0 : std::stringstream message;
419 0 : message << "You cannot get the period before executing the 'Scheduler::schedule()' method!";
420 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
421 0 : }
422 :
423 0 : return this->P;
424 : }
425 :
426 : void
427 0 : Scheduler_OTAC::reset()
428 : {
429 0 : Scheduler::reset();
430 0 : this->P = std::numeric_limits<double>::infinity();
431 0 : }
432 :
433 : double
434 0 : Scheduler_OTAC::get_throughput_est() const
435 : {
436 0 : return (1.0 / this->get_period()) * 1e9; // n streams per second
437 : }
|