LCOV - code coverage report
Current view: top level - src/Scheduler/OTAC - Scheduler_OTAC.cpp (source / functions) Hit Total Coverage
Test: streampu_clean.info Lines: 126 198 63.6 %
Date: 2025-01-11 12:25:42 Functions: 11 20 55.0 %

          Line data    Source code
       1             : #include "Scheduler/OTAC/Scheduler_OTAC.hpp"
       2             : #include "Tools/Exception/exception.hpp"
       3             : #include <cmath>
       4             : #include <iostream>
       5             : #include <limits>
       6             : #include <sstream>
       7             : 
       8             : using namespace spu;
       9             : using namespace spu::sched;
      10             : 
      11             : /*#define VERBOSE*/
      12             : 
      13           0 : Scheduler_OTAC::Scheduler_OTAC(runtime::Sequence& sequence, const size_t R)
      14             :   : Scheduler(&sequence)
      15           0 :   , R(R)
      16           0 :   , P(std::numeric_limits<double>::infinity())
      17             : {
      18           0 : }
      19             : 
      20          11 : Scheduler_OTAC::Scheduler_OTAC(runtime::Sequence* sequence, const size_t R)
      21             :   : Scheduler(sequence)
      22          11 :   , R(R)
      23          11 :   , P(std::numeric_limits<double>::infinity())
      24             : {
      25          11 : }
      26             : 
      27             : // USEFUL FUNCTIONS
      28             : // Weight of a sub-sequence (stage)
      29             : double
      30         544 : weight(const std::vector<runtime::Task*>& s, const unsigned int r)
      31             : {
      32         544 :     if (r == 0)
      33             :     {
      34           0 :         return std::numeric_limits<double>::infinity();
      35             :     }
      36             :     else
      37             :     {
      38         544 :         double sum = 0.;
      39        1388 :         for (auto& t : s)
      40             :         {
      41         844 :             sum = sum + (t->get_duration_avg()).count();
      42             :         }
      43         544 :         return sum / r;
      44             :     }
      45             : }
      46             : 
      47             : double
      48          17 : weight_t(const std::vector<task_desc_t>& s, const unsigned int r)
      49             : {
      50          17 :     if (r == 0)
      51             :     {
      52           0 :         return std::numeric_limits<double>::infinity();
      53             :     }
      54             :     else
      55             :     {
      56          17 :         double sum = 0.;
      57         106 :         for (auto& t : s)
      58             :         {
      59          89 :             sum = sum + (t.tptr->get_duration_avg()).count();
      60             :         }
      61          17 :         return sum / r;
      62             :     }
      63             : }
      64             : 
      65             : // Function to find the index of an element
      66             : int
      67           0 : getIndex(const std::vector<task_desc_t>& chain, const task_desc_t target)
      68             : {
      69           0 :     int index = -1;
      70           0 :     for (size_t i = 0; i < chain.size(); i++)
      71             :     {
      72           0 :         if (chain[i].tptr == target.tptr)
      73             :         {
      74           0 :             index = i;
      75             :         }
      76             :     }
      77           0 :     return index;
      78             : }
      79             : 
      80             : // Is the subsequence replicable?
      81             : bool
      82         102 : is_replicable(const std::vector<runtime::Task*>& s)
      83             : {
      84         160 :     for (auto& t : s)
      85             :     {
      86         110 :         if (!t->is_replicable())
      87             :         {
      88          52 :             return false;
      89             :         }
      90             :     }
      91          50 :     return true;
      92             : }
      93             : 
      94             : // return the maximal execution time among Tf (stateful task)
      95             : double
      96          11 : max_stateful_weight(const std::vector<task_desc_t>& chain)
      97             : {
      98          11 :     double max = 0.0;
      99             : 
     100          70 :     for (auto& t : chain)
     101             :     {
     102          59 :         if (!t.tptr->is_replicable())
     103             :         {
     104          28 :             if ((t.tptr->get_duration_avg()).count() > max)
     105             :             {
     106          19 :                 max = (t.tptr->get_duration_avg()).count();
     107             :             }
     108             :         }
     109             :     }
     110          11 :     return max;
     111             : }
     112             : 
     113             : // return the maximal execution time among the tasks
     114             : double
     115          11 : max_weight_t(const std::vector<task_desc_t>& tasks)
     116             : {
     117          11 :     double max = 0.0;
     118          11 :     int size = tasks.size();
     119          11 :     if (size >= 1)
     120             :     {
     121          70 :         for (auto& t : tasks)
     122             :         {
     123          59 :             if ((t.tptr->get_duration_avg()).count() > max)
     124             :             {
     125          33 :                 max = (t.tptr->get_duration_avg()).count();
     126             :             }
     127             :         }
     128             :     }
     129          11 :     return max;
     130             : }
     131             : 
     132             : // PACKING FUNCTIONS
     133             : // Main loop packing (inplace)
     134             : void
     135         147 : main_loop_packing(const std::vector<task_desc_t>& chain, const double P, int& e, std::vector<runtime::Task*>& s, int& n)
     136             : {
     137         147 :     int N = chain.size();
     138             : 
     139         245 :     while ((e <= N) && (weight(s, 1) + chain[e - 1].exec_duration.count() <= P))
     140             :     {
     141             : #ifdef VERBOSE
     142             :         std::cout << "main loop packing: weight = " << weight(s, 1);
     143             :         std::cout << " new_task=" << chain[e - 1].exec_duration.count();
     144             :         std::cout << " weight+new_task= " << (weight(s, 1) + chain[e - 1].exec_duration.count());
     145             :         std::cout << " P = " << P << std::endl;
     146             : #endif
     147          98 :         s.push_back(chain[e - 1].tptr);
     148          98 :         n += 1;
     149          98 :         e += 1;
     150             :     }
     151         147 : }
     152             : 
     153             : // Packing if the current stage is stateless
     154             : int
     155          50 : stateless_packing(const std::vector<task_desc_t>& chain, const int e, std::vector<runtime::Task*>& s, int& n)
     156             : {
     157          50 :     int N = chain.size();
     158          50 :     int f = e;
     159          72 :     while ((f <= N) && chain[f - 1].tptr->is_replicable())
     160             :     {
     161          22 :         s.push_back(chain[f - 1].tptr);
     162          22 :         n += 1;
     163          22 :         f += 1;
     164             :     }
     165          50 :     return f;
     166             : }
     167             : 
     168             : // New packing with the current sub-sequence to find extra tasks (f >= e)
     169             : int
     170           0 : extra_tasks_packing(const std::vector<task_desc_t>& chain,
     171             :                     const double P,
     172             :                     const int f,
     173             :                     std::vector<runtime::Task*>& s,
     174             :                     int& n)
     175             : {
     176           0 :     std::vector<runtime::Task*> s_temp;
     177           0 :     s_temp.push_back(chain[f - 1].tptr);
     178           0 :     int e = f;
     179           0 :     while ((chain[e - 2].exec_duration.count() + weight(s_temp, 1)) <= P)
     180             :     {
     181           0 :         s.pop_back();
     182           0 :         n -= 1;
     183           0 :         s_temp.push_back(chain[e - 1].tptr);
     184           0 :         e -= 1;
     185             :     }
     186           0 :     return e;
     187           0 : }
     188             : 
     189             : // If taking tasks from the predecessor with less ressources (ri-1) is a success
     190             : void
     191           0 : improved_packing(const std::vector<task_desc_t>& chain,
     192             :                  const double P,
     193             :                  int& e,
     194             :                  std::vector<runtime::Task*>& s,
     195             :                  int& n,
     196             :                  int& r)
     197             : {
     198           0 :     r -= 1;
     199           0 :     int N = chain.size();
     200           0 :     while ((e <= N) && (weight(s, r) + chain[e - 1].exec_duration.count() / r <= P))
     201             :     {
     202           0 :         s.push_back(chain[e - 1].tptr);
     203           0 :         n += 1;
     204           0 :         e += 1;
     205             :     }
     206           0 : }
     207             : 
     208             : // Else, go back to the previous configuration
     209             : void
     210           0 : go_back_packing(const std::vector<task_desc_t>& chain, const int f, int& e, std::vector<runtime::Task*>& s, int& n)
     211             : {
     212           0 :     while (e != f)
     213             :     {
     214           0 :         s.push_back(chain[e - 1].tptr);
     215           0 :         n += 1;
     216           0 :         e += 1;
     217             :     }
     218           0 : }
     219             : 
     220             : void
     221           0 : print_solution(const std::vector<std::pair<size_t, size_t>>& solution, const std::string tag)
     222             : {
     223           0 :     std::cout << tag << ": {";
     224           0 :     for (auto& pair_s : solution)
     225             :     {
     226           0 :         std::cout << "(" << pair_s.first << ", " << pair_s.second << ")";
     227             :     }
     228           0 :     std::cout << "}" << std::endl;
     229           0 : }
     230             : 
     231             : bool
     232          51 : Scheduler_OTAC::PROBE(const std::vector<task_desc_t>& chain,
     233             :                       const size_t R,
     234             :                       double& P,
     235             :                       std::vector<std::pair<size_t, size_t>>& solution)
     236             : {
     237          51 :     solution.clear();
     238             : #ifdef VERBOSE
     239             :     std::cout << "--" << std::endl;
     240             : #endif
     241             :     // Initial state
     242          51 :     int i = 0; // current sub-sequence index
     243             :     // int b = 0; // begin index of the current sub sequence
     244          51 :     int e = 1; // end index of the current sub sequence
     245             :     int f;
     246             :     int n; // nunber of tasks by sub-sequence (in solution)
     247             :     int r; // number of resources by sub-sequence (in solution)
     248          51 :     std::pair<size_t, size_t> tmp_nr;
     249          51 :     std::vector<double> w;
     250         318 :     for (auto& t : chain)
     251             :     {
     252         267 :         w.push_back(t.exec_duration.count());
     253             :     }
     254          51 :     int N = w.size(); // number of tasks in the chain
     255          51 :     float maxWeight = 0;
     256          51 :     std::vector<std::vector<runtime::Task*>> sequence; // sub-sequence list (containing s)
     257             : 
     258             :     // Loop to create packing
     259         198 :     while (e <= N)
     260             :     {
     261         147 :         i = i + 1;
     262             :         // int list_index = i - 1;
     263         147 :         int b = e;
     264         147 :         e = b + 1;
     265             : 
     266             :         // New loop
     267         147 :         std::vector<runtime::Task*> s;
     268         147 :         s.push_back(chain[b - 1].tptr);
     269         147 :         r = 1;
     270         147 :         n = 1;
     271             : 
     272             :         // Add tasks to si while they fit
     273         147 :         main_loop_packing(chain, P, e, s, n);
     274             : 
     275             :         // Resources needed for a packing
     276         147 :         r = std::ceil(weight(s, 1) / P);
     277             : #ifdef VERBOSE
     278             :         std::cout << "r=" << r << " weight=" << weight(s, 1) << " P=" << P << std::endl;
     279             : #endif
     280             : 
     281             :         // All tasks do not fit with the last packing and if replicable
     282         147 :         if ((e <= N) && is_replicable(s))
     283             :         {
     284          50 :             f = stateless_packing(chain, e, s, n);
     285             :             // Processors needed for a packing
     286          50 :             r = std::ceil(weight(s, 1) / P);
     287          50 :             if (f > N)
     288             :             {
     289           6 :                 e = f;
     290             :             }
     291          50 :             if (e != f)
     292             :             {
     293           0 :                 e = extra_tasks_packing(chain, P, f, s, n);
     294           0 :                 if (weight(s, r - 1) <= P)
     295             :                 {
     296           0 :                     improved_packing(chain, P, e, s, n, r);
     297             :                 }
     298             :                 // No benefit from taking tasks
     299             :                 else
     300             :                 {
     301           0 :                     go_back_packing(chain, f, e, s, n);
     302             :                 }
     303             :             }
     304             :         }
     305         147 :         sequence.push_back(s);
     306         147 :         tmp_nr.first = n; //
     307         147 :         tmp_nr.second = r;
     308         147 :         solution.push_back(tmp_nr);
     309         147 :         float w = weight(s, r);
     310         147 :         if (w >= maxWeight)
     311             :         {
     312          98 :             maxWeight = w;
     313             :         }
     314         147 :     }
     315             : #ifdef VERBOSE
     316             :     print_solution(solution, "Probe");
     317             : #endif
     318          51 :     size_t sum = 0;
     319         198 :     for (auto& nr : solution)
     320             :     {
     321         147 :         sum += nr.second;
     322             :     }
     323          51 :     if (sum <= R)
     324             :     {
     325          21 :         P = maxWeight;
     326          21 :         return true;
     327             :     }
     328             :     else
     329             :     {
     330          30 :         return false;
     331             :     }
     332          51 : }
     333             : 
     334             : void
     335          11 : Scheduler_OTAC::SOLVE(const std::vector<task_desc_t>& chain,
     336             :                       const size_t R,
     337             :                       double& P,
     338             :                       std::vector<std::pair<size_t, size_t>>& solution)
     339             : {
     340          11 :     if (R == 0)
     341             :     {
     342           0 :         std::stringstream message;
     343           0 :         message << "The number of ressources R has to be higher than 0!";
     344           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     345           0 :     }
     346             : 
     347          11 :     if (!solution.empty())
     348             :     {
     349           0 :         std::stringstream message;
     350           0 :         message << "'solution' should be empty!";
     351           0 :         throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
     352           0 :     }
     353             : 
     354          11 :     double maxTf = max_stateful_weight(chain);
     355          11 :     double maxWeight = max_weight_t(chain);
     356          11 :     double eps = 1 / (double)R;
     357          11 :     double Pmin = weight_t(chain, R) > maxTf ? weight_t(chain, R) : maxTf;
     358          11 :     double Pmax = Pmin + maxWeight;
     359          11 :     std::vector<std::pair<size_t, size_t>> solution_current;
     360             : #ifdef VERBOSE
     361             :     std::cout << "Pmin=" << Pmin << " Pmax=" << Pmax << " maxWeight=" << maxWeight << std::endl;
     362             : #endif
     363             : 
     364          11 :     if (R == 1)
     365             :     {
     366           2 :         P = Pmax;
     367           2 :         std::pair<size_t, size_t> pair_r1;
     368           2 :         pair_r1.first = chain.size();
     369           2 :         pair_r1.second = R;
     370           2 :         solution.push_back(pair_r1);
     371             :     }
     372             :     else
     373             :     {
     374           9 :         P = (Pmax + Pmin) / 2;
     375           9 :         std::vector<std::pair<size_t, size_t>> solution_tmp;
     376          60 :         while ((Pmax - Pmin) > eps && P > maxTf) //
     377             :         {
     378          51 :             bool is_there_a_solution = this->PROBE(chain, R, P, solution_tmp);
     379          51 :             if (is_there_a_solution == true)
     380             :             {
     381          21 :                 Pmax = P;
     382          21 :                 solution_current = solution_tmp;
     383             : #ifdef VERBOSE
     384             :                 print_solution(solution_tmp, "Pmax " + std::to_string(Pmax) + " Pmin " + std::to_string(Pmin));
     385             : #endif
     386             :             }
     387             :             else
     388             :             {
     389          30 :                 Pmin = P;
     390             :             }
     391          51 :             P = (Pmax + Pmin) / 2;
     392             :         }
     393           9 :         solution = solution_current;
     394           9 :     }
     395             :     // print_solution(solution, "# Solution stages {(n,r)}");
     396             :     /*return new std::vector<std::pair<size_t, size_t>>(solution); // solution_returned*/;
     397          11 : }
     398             : 
     399             : void
     400          11 : Scheduler_OTAC::schedule()
     401             : {
     402          11 :     if (this->tasks_desc.empty())
     403             :     {
     404           0 :         std::stringstream message;
     405           0 :         message << "'tasks_desc' cannot be empty, you need to execute the 'Scheduler::profile()' method first!";
     406           0 :         throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
     407           0 :     }
     408             : 
     409          11 :     if (!this->solution.empty()) this->solution.clear();
     410          11 :     this->SOLVE(this->tasks_desc, this->R, this->P, this->solution);
     411          11 : }
     412             : 
     413             : double
     414           0 : Scheduler_OTAC::get_period() const
     415             : {
     416           0 :     if (this->P == std::numeric_limits<double>::infinity())
     417             :     {
     418           0 :         std::stringstream message;
     419           0 :         message << "You cannot get the period before executing the 'Scheduler::schedule()' method!";
     420           0 :         throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
     421           0 :     }
     422             : 
     423           0 :     return this->P;
     424             : }
     425             : 
     426             : void
     427           0 : Scheduler_OTAC::reset()
     428             : {
     429           0 :     Scheduler::reset();
     430           0 :     this->P = std::numeric_limits<double>::infinity();
     431           0 : }
     432             : 
     433             : double
     434           0 : Scheduler_OTAC::get_throughput_est() const
     435             : {
     436           0 :     return (1.0 / this->get_period()) * 1e9; // n streams per second
     437             : }

Generated by: LCOV version 1.14