LCOV - code coverage report
Current view: top level - src/Scheduler - Scheduler.cpp (source / functions) Hit Total Coverage
Test: streampu_clean.info Lines: 94 189 49.7 %
Date: 2025-03-14 12:33:06 Functions: 12 19 63.2 %

          Line data    Source code
       1             : #include "Scheduler/Scheduler.hpp"
       2             : #include "Tools/Display/Statistics/Statistics.hpp"
       3             : #include "Tools/Exception/exception.hpp"
       4             : #include "Tools/Thread/Thread_pinning/Thread_pinning.hpp"
       5             : 
       6             : #include <iostream>
       7             : #include <sstream>
       8             : 
       9             : using namespace spu;
      10             : using namespace spu::sched;
      11             : 
      12           0 : Scheduler::Scheduler(runtime::Sequence& sequence)
      13           0 :   : sequence(&sequence)
      14             : {
      15           0 :     this->sequence = &sequence;
      16           0 : }
      17             : 
      18          11 : Scheduler::Scheduler(runtime::Sequence* sequence)
      19          11 :   : sequence(sequence)
      20             : {
      21          11 :     if (sequence == nullptr)
      22             :     {
      23           0 :         std::stringstream message;
      24           0 :         message << "'sequence' can't be nullptr.";
      25           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
      26           0 :     }
      27             : 
      28          11 :     this->sequence = sequence;
      29          11 : }
      30             : 
      31             : void
      32          11 : Scheduler::_profile(const int puid, const size_t n_exec)
      33             : {
      34          11 :     if (n_exec == 0)
      35             :     {
      36           0 :         std::stringstream message;
      37           0 :         message << "'n_exec' has to be higher than zero.";
      38           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
      39           0 :     }
      40             : 
      41          11 :     if (sequence->get_n_threads() > 1)
      42             :     {
      43           0 :         std::stringstream message;
      44           0 :         message << "'sequence->get_n_threads()' has to be strictly equal to 1.";
      45           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
      46           0 :     }
      47             : 
      48          11 :     if (this->sequence->is_control_flow())
      49             :     {
      50           0 :         std::stringstream message;
      51           0 :         message << "Control flow in the sequence is not supported yet.";
      52           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
      53           0 :     }
      54             : 
      55          11 :     this->sequence->set_auto_stop(false);
      56          69 :     for (auto& mod : this->sequence->get_modules<module::Module>(false))
      57         150 :         for (auto& tsk : mod->tasks)
      58             :         {
      59          92 :             tsk->reset();
      60          92 :             tsk->set_stats(true); // enable the statistics
      61          92 :             tsk->set_fast(true);  // enable the fast mode (= disable the useless verifs
      62             :                                   // in the tasks)
      63          11 :         }
      64             : 
      65             :     bool prev_thread_pinning;
      66          11 :     std::vector<std::string> prev_pin_objects_per_thread;
      67          11 :     std::vector<size_t> prev_puids;
      68          11 :     if (puid >= 0)
      69             :     {
      70           0 :         prev_thread_pinning = this->sequence->thread_pinning;
      71           0 :         prev_pin_objects_per_thread = this->sequence->pin_objects_per_thread;
      72           0 :         prev_puids = this->sequence->puids;
      73             : 
      74           0 :         this->sequence->set_thread_pinning(true, std::vector<size_t>(1, puid));
      75             :     }
      76             : 
      77          11 :     unsigned int counter = 0;
      78        1111 :     this->sequence->exec([&counter, &n_exec]() { return ++counter >= n_exec; });
      79          11 :     this->sequence->set_auto_stop(true);
      80             : 
      81          11 :     if (puid >= 0)
      82             :     {
      83           0 :         this->sequence->thread_pinning = prev_thread_pinning;
      84           0 :         this->sequence->pin_objects_per_thread = prev_pin_objects_per_thread;
      85           0 :         this->sequence->puids = prev_puids;
      86             :     }
      87             : 
      88          11 :     if (this->tasks_desc.empty())
      89             :     {
      90          11 :         std::vector<runtime::Task*>& tasks = this->sequence->sequences[0]->get_contents()->tasks;
      91          70 :         for (auto& t : tasks)
      92             :         {
      93          59 :             task_desc_t new_t;
      94          59 :             new_t.tptr = t;
      95          59 :             new_t.exec_duration.push_back(t->get_duration_avg());
      96          59 :             this->tasks_desc.push_back(new_t);
      97          59 :         }
      98             :     }
      99             :     else
     100             :     {
     101           0 :         std::vector<runtime::Task*>& tasks = this->sequence->sequences[0]->get_contents()->tasks;
     102           0 :         size_t i = 0;
     103           0 :         for (auto& t : tasks)
     104             :         {
     105           0 :             task_desc_t& cur_t = this->tasks_desc[i];
     106           0 :             if (t != cur_t.tptr)
     107             :             {
     108           0 :                 std::stringstream message;
     109           0 :                 message << "'t' should be equal to 'cur_t.tptr' ('t' = " << std::hex << (uint64_t)t
     110           0 :                         << ", 'cur_t.tptr' = " << (uint64_t)cur_t.tptr << ", 'i' = " << i << ").";
     111           0 :                 throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
     112           0 :             }
     113           0 :             cur_t.exec_duration.push_back(t->get_duration_avg());
     114           0 :             i++;
     115             :         }
     116             :     }
     117             : 
     118             :     // TODO: Restore stats and fast mode of tasks as before!
     119          11 : }
     120             : 
     121             : void
     122          11 : Scheduler::profile(const size_t n_exec)
     123             : {
     124          11 :     if (!this->tasks_desc.empty())
     125             :     {
     126           0 :         std::stringstream message;
     127             :         message << "'tasks_desc' should be empty, you should call 'Scheduler::reset' first if you want to re-run the "
     128           0 :                    "profiling'.";
     129           0 :         throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
     130           0 :     }
     131             : 
     132          11 :     this->_profile(-1, n_exec);
     133             : 
     134          11 :     this->profiling_summary.resize(1);
     135             : 
     136          11 :     std::stringstream ss;
     137          11 :     tools::Stats::show(this->sequence->get_tasks_per_threads()[0], false, false, ss);
     138          11 :     this->profiling_summary[0] = ss.str();
     139          11 : }
     140             : 
     141             : void
     142           0 : Scheduler::profile(const std::vector<size_t>& puids, const size_t n_exec)
     143             : {
     144           0 :     if (!this->tasks_desc.empty())
     145             :     {
     146           0 :         std::stringstream message;
     147             :         message << "'tasks_desc' should be empty, you should call 'Scheduler::reset' first if you want to re-run the "
     148           0 :                    "profiling'.";
     149           0 :         throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
     150           0 :     }
     151             : 
     152           0 :     this->profiling_summary.resize(puids.size());
     153           0 :     for (size_t i = 0; i < puids.size(); i++)
     154             :     {
     155           0 :         this->_profile(puids[i], n_exec);
     156           0 :         std::stringstream ss;
     157           0 :         tools::Stats::show(this->sequence->get_tasks_per_threads()[0], false, false, ss);
     158           0 :         this->profiling_summary[i] = ss.str();
     159           0 :     }
     160             : 
     161           0 :     this->profiled_puids = puids;
     162           0 : }
     163             : 
     164             : void
     165           6 : Scheduler::print_profiling(std::ostream& stream)
     166             : {
     167           6 :     stream << "# Profiling:" << std::endl;
     168           6 :     if (this->profiled_puids.size())
     169           0 :         for (size_t p = 0; p < this->profiled_puids.size(); p++)
     170             :         {
     171           0 :             stream << "# On PUID n°" << this->profiled_puids[p] << std::endl;
     172           0 :             std::cout << this->profiling_summary[p];
     173           0 :             if (p < this->profiled_puids.size() - 1) stream << "# ----------- " << std::endl;
     174             :         }
     175             :     else
     176           6 :         stream << this->profiling_summary[0];
     177           6 : }
     178             : 
     179             : const std::vector<task_desc_t>&
     180           0 : Scheduler::get_profiling()
     181             : {
     182           0 :     return this->tasks_desc;
     183             : }
     184             : 
     185             : void
     186           0 : Scheduler::reset()
     187             : {
     188           0 :     this->solution.clear();
     189           0 :     this->tasks_desc.clear();
     190           0 : }
     191             : 
     192             : std::vector<bool>
     193           1 : Scheduler::get_thread_pinnings() const
     194             : {
     195           2 :     return std::vector<bool>(this->solution.size(), tools::Thread_pinning::is_init());
     196             : }
     197             : 
     198             : std::vector<size_t>
     199           1 : Scheduler::get_sync_buff_sizes() const
     200             : {
     201           2 :     return std::vector<size_t>(this->solution.size() - 1, 1);
     202             : }
     203             : 
     204             : std::vector<bool>
     205           1 : Scheduler::get_sync_active_waitings() const
     206             : {
     207           1 :     return std::vector<bool>(this->solution.size() - 1, false);
     208             : }
     209             : 
     210             : std::string
     211           1 : Scheduler::get_threads_mapping() const
     212             : {
     213           1 :     if (this->solution.size() == 0)
     214             :     {
     215           0 :         std::stringstream message;
     216             :         message
     217           0 :           << "The solution has to contain at least one element, please run the 'Scheduler::schedule' method first.";
     218           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     219           0 :     }
     220             : 
     221           1 :     std::string pinning_policy;
     222           1 :     bool first_stage = true;
     223           1 :     size_t puid = 0;
     224           7 :     for (auto& stage : this->solution)
     225             :     {
     226           6 :         if (!first_stage) pinning_policy += " | ";
     227             : 
     228          14 :         for (size_t st = 0; st < stage.second; st++)
     229           8 :             pinning_policy += std::string((st == 0) ? "" : "; ") + "PU_" + std::to_string(puid++);
     230             : 
     231           6 :         first_stage = false;
     232             :     }
     233             : 
     234           1 :     return pinning_policy;
     235           0 : }
     236             : 
     237             : runtime::Pipeline*
     238          11 : Scheduler::instantiate_pipeline(const std::vector<size_t> synchro_buffer_sizes,
     239             :                                 const std::vector<bool> synchro_active_waitings,
     240             :                                 const std::vector<bool> thread_pinings,
     241             :                                 const std::string& pinning_policy)
     242             : {
     243          11 :     if (this->solution.size() == 0)
     244             :     {
     245           0 :         std::stringstream message;
     246             :         message
     247           0 :           << "The solution has to contain at least one element, please run the 'Scheduler::schedule' method first.";
     248           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     249           0 :     }
     250             : 
     251             :     // std::cout << "synchro_buffer_sizes = {";
     252             :     // for (size_t i = 0; i < synchro_buffer_sizes.size(); i++)
     253             :     //     std::cout << synchro_buffer_sizes[i] << ", ";
     254             :     // std::cout << "}" << std::endl;
     255             :     // std::cout << "synchro_active_waitings = {";
     256             :     // for (size_t i = 0; i < synchro_active_waitings.size(); i++)
     257             :     //     std::cout << synchro_active_waitings[i] << ", ";
     258             :     // std::cout << "}" << std::endl;
     259             :     // std::cout << "thread_pinings = {";
     260             :     // for (size_t i = 0; i < thread_pinings.size(); i++)
     261             :     //     std::cout << thread_pinings[i] << ", ";
     262             :     // std::cout << "}" << std::endl;
     263             :     // std::cout << "pinning_policy = " << pinning_policy << std::endl;
     264             : 
     265          11 :     std::vector<runtime::Task*> firsts(this->tasks_desc.size());
     266          11 :     std::vector<runtime::Task*> lasts(this->tasks_desc.size());
     267             : 
     268          11 :     std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> sep_stages(this->solution.size());
     269          11 :     std::vector<size_t> n_threads(this->solution.size());
     270          11 :     size_t s = 0;
     271          11 :     size_t i = 0;
     272          39 :     for (auto& stage : this->solution)
     273             :     {
     274          28 :         std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>> cur_stage_desc;
     275          28 :         sep_stages[s].first.resize(stage.first);
     276          28 :         sep_stages[s].second.resize(stage.first);
     277          87 :         for (size_t t = 0; t < stage.first; t++)
     278             :         {
     279          59 :             firsts[i] = this->tasks_desc[i].tptr;
     280          59 :             lasts[i] = this->tasks_desc[i].tptr;
     281             : 
     282          59 :             sep_stages[s].first[t] = this->tasks_desc[i].tptr;
     283          59 :             sep_stages[s].second[t] = this->tasks_desc[i].tptr;
     284             : 
     285          59 :             i++;
     286             :         }
     287             : 
     288          28 :         n_threads[s] = stage.second;
     289          28 :         s++;
     290          28 :     }
     291             : 
     292             :     return new runtime::Pipeline(firsts,
     293             :                                  lasts,
     294             :                                  sep_stages,
     295             :                                  n_threads,
     296             :                                  synchro_buffer_sizes,
     297             :                                  synchro_active_waitings,
     298             :                                  thread_pinings,
     299          22 :                                  pinning_policy);
     300          11 : }
     301             : 
     302             : runtime::Pipeline*
     303           0 : Scheduler::instantiate_pipeline(const size_t buffer_size,
     304             :                                 const bool active_waiting,
     305             :                                 const bool thread_pining,
     306             :                                 const std::string& pinning_policy)
     307             : {
     308           0 :     std::vector<size_t> synchro_buffer_sizes(this->solution.size() - 1, buffer_size);
     309           0 :     std::vector<bool> synchro_active_waitings(this->solution.size() - 1, active_waiting);
     310           0 :     std::vector<bool> thread_pinings(this->solution.size(), thread_pining);
     311             : 
     312           0 :     return instantiate_pipeline(synchro_buffer_sizes, synchro_active_waitings, thread_pinings, pinning_policy);
     313           0 : }
     314             : 
     315             : std::vector<std::pair<size_t, size_t>>
     316          36 : Scheduler::get_solution()
     317             : {
     318          36 :     return this->solution;
     319             : }
     320             : 
     321             : runtime::Pipeline*
     322           1 : Scheduler::generate_pipeline()
     323             : {
     324           1 :     if (tasks_desc.empty()) this->profile();
     325             : 
     326           1 :     if (solution.empty()) this->schedule();
     327             : 
     328           2 :     return this->instantiate_pipeline(this->get_sync_buff_sizes(),
     329           2 :                                       this->get_sync_active_waitings(),
     330           2 :                                       this->get_thread_pinnings(),
     331           3 :                                       this->get_threads_mapping());
     332             : }
     333             : 
     334             : size_t
     335           0 : Scheduler::get_n_alloc_ressources() const
     336             : {
     337           0 :     if (this->solution.size() == 0)
     338             :     {
     339           0 :         std::stringstream message;
     340             :         message
     341           0 :           << "The solution has to contain at least one element, please run the 'Scheduler::schedule' method first.";
     342           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     343           0 :     }
     344           0 :     size_t R = 0;
     345           0 :     for (auto s : this->solution)
     346           0 :         R += s.second;
     347           0 :     return R;
     348             : }
     349             : 
     350             : double
     351           0 : Scheduler::get_throughput_est() const
     352             : {
     353           0 :     throw tools::unimplemented_error(__FILE__, __LINE__, __func__);
     354             : }

Generated by: LCOV version 1.14