LCOV - code coverage report
Current view: top level - src/Scheduler - Scheduler.cpp (source / functions) Hit Total Coverage
Test: streampu_clean.info Lines: 76 127 59.8 %
Date: 2025-01-11 12:25:42 Functions: 8 13 61.5 %

          Line data    Source code
       1             : #include "Scheduler/Scheduler.hpp"
       2             : #include "Tools/Exception/exception.hpp"
       3             : #include "Tools/Thread/Thread_pinning/Thread_pinning.hpp"
       4             : 
       5             : #include <iostream>
       6             : #include <sstream>
       7             : 
       8             : using namespace spu;
       9             : using namespace spu::sched;
      10             : 
      11           0 : Scheduler::Scheduler(runtime::Sequence& sequence)
      12           0 :   : sequence(&sequence)
      13             : {
      14           0 :     this->sequence = &sequence;
      15           0 : }
      16             : 
      17          11 : Scheduler::Scheduler(runtime::Sequence* sequence)
      18          11 :   : sequence(sequence)
      19             : {
      20          11 :     if (sequence == nullptr)
      21             :     {
      22           0 :         std::stringstream message;
      23           0 :         message << "'sequence' can't be nullptr.";
      24           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
      25           0 :     }
      26             : 
      27          11 :     this->sequence = sequence;
      28          11 : }
      29             : 
      30             : void
      31          11 : Scheduler::profile(const size_t n_exec)
      32             : {
      33          11 :     if (n_exec == 0)
      34             :     {
      35           0 :         std::stringstream message;
      36           0 :         message << "'n_exec' has to be higher than zero.";
      37           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
      38           0 :     }
      39             : 
      40          11 :     if (sequence->get_n_threads() > 1)
      41             :     {
      42           0 :         std::stringstream message;
      43           0 :         message << "'sequence->get_n_threads()' has to be strictly equal to 1.";
      44           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
      45           0 :     }
      46             : 
      47          11 :     if (this->sequence->is_control_flow())
      48             :     {
      49           0 :         std::stringstream message;
      50           0 :         message << "Control flow in the sequence is not supported yet.";
      51           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
      52           0 :     }
      53             : 
      54          11 :     if (!this->tasks_desc.empty())
      55             :     {
      56           0 :         std::stringstream message;
      57             :         message << "'tasks_desc' should be empty, you should call 'Scheduler::reset' first if you want to re-run the "
      58           0 :                    "profiling'.";
      59           0 :         throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
      60           0 :     }
      61             : 
      62          11 :     this->sequence->set_auto_stop(false);
      63          69 :     for (auto& mod : this->sequence->get_modules<module::Module>(false))
      64         150 :         for (auto& tsk : mod->tasks)
      65             :         {
      66          92 :             tsk->reset();
      67          92 :             tsk->set_stats(true); // enable the statistics
      68          92 :             tsk->set_fast(true);  // enable the fast mode (= disable the useless verifs
      69             :                                   // in the tasks)
      70          11 :         }
      71          11 :     unsigned int counter = 0;
      72        1111 :     this->sequence->exec([&counter, &n_exec]() { return ++counter >= n_exec; });
      73          11 :     this->sequence->set_auto_stop(true);
      74             : 
      75          11 :     std::vector<runtime::Task*>& tasks = this->sequence->sequences[0]->get_contents()->tasks;
      76          70 :     for (auto& t : tasks)
      77             :     {
      78             :         task_desc_t new_t;
      79          59 :         new_t.tptr = t;
      80          59 :         new_t.exec_duration = t->get_duration_avg();
      81          59 :         this->tasks_desc.push_back(new_t);
      82             :     }
      83          11 : }
      84             : 
      85             : void
      86           6 : Scheduler::print_profiling(std::ostream& stream)
      87             : {
      88           6 :     stream << "# Profiling:" << std::endl;
      89          40 :     for (auto& t : this->tasks_desc)
      90             :     {
      91          34 :         stream << "# - Name: " << t.tptr->get_name();
      92          34 :         stream << " - Execution duration: " << t.exec_duration.count() << " ns";
      93          34 :         stream << " - Replicable: " << (t.tptr->is_replicable() ? "yes" : "no") << std::endl;
      94             :     }
      95           6 : }
      96             : 
      97             : const std::vector<task_desc_t>&
      98           0 : Scheduler::get_profiling()
      99             : {
     100           0 :     return this->tasks_desc;
     101             : }
     102             : 
     103             : void
     104           0 : Scheduler::reset()
     105             : {
     106           0 :     this->solution.clear();
     107           0 :     this->tasks_desc.clear();
     108           0 : }
     109             : 
     110             : std::string
     111           1 : Scheduler::perform_threads_mapping() const
     112             : {
     113           1 :     if (this->solution.size() == 0)
     114             :     {
     115           0 :         std::stringstream message;
     116             :         message
     117           0 :           << "The solution has to contain at least one element, please run the 'Scheduler::schedule' method first.";
     118           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     119           0 :     }
     120             : 
     121           1 :     std::string pinning_policy;
     122           1 :     bool first_stage = true;
     123           1 :     size_t puid = 0;
     124           7 :     for (auto& stage : this->solution)
     125             :     {
     126           6 :         if (!first_stage) pinning_policy += " | ";
     127             : 
     128          14 :         for (size_t st = 0; st < stage.second; st++)
     129           8 :             pinning_policy += std::string((st == 0) ? "" : "; ") + "PU_" + std::to_string(puid++);
     130             : 
     131           6 :         first_stage = false;
     132             :     }
     133             : 
     134           1 :     return pinning_policy;
     135           0 : }
     136             : 
     137             : runtime::Pipeline*
     138          11 : Scheduler::instantiate_pipeline(const size_t buffer_size,
     139             :                                 const bool active_waiting,
     140             :                                 const bool thread_pining,
     141             :                                 const std::string& pinning_policy)
     142             : {
     143          11 :     if (this->solution.size() == 0)
     144             :     {
     145           0 :         std::stringstream message;
     146             :         message
     147           0 :           << "The solution has to contain at least one element, please run the 'Scheduler::schedule' method first.";
     148           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     149           0 :     }
     150             : 
     151          11 :     std::vector<runtime::Task*> firsts(this->tasks_desc.size());
     152          11 :     std::vector<runtime::Task*> lasts(this->tasks_desc.size());
     153             : 
     154          11 :     std::vector<std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>>> sep_stages(this->solution.size());
     155          11 :     std::vector<size_t> n_threads(this->solution.size());
     156          11 :     size_t s = 0;
     157          11 :     size_t i = 0;
     158          39 :     for (auto& stage : this->solution)
     159             :     {
     160          28 :         std::pair<std::vector<runtime::Task*>, std::vector<runtime::Task*>> cur_stage_desc;
     161          28 :         sep_stages[s].first.resize(stage.first);
     162          28 :         sep_stages[s].second.resize(stage.first);
     163          87 :         for (size_t t = 0; t < stage.first; t++)
     164             :         {
     165          59 :             firsts[i] = this->tasks_desc[i].tptr;
     166          59 :             lasts[i] = this->tasks_desc[i].tptr;
     167             : 
     168          59 :             sep_stages[s].first[t] = this->tasks_desc[i].tptr;
     169          59 :             sep_stages[s].second[t] = this->tasks_desc[i].tptr;
     170             : 
     171          59 :             i++;
     172             :         }
     173             : 
     174          28 :         n_threads[s] = stage.second;
     175          28 :         s++;
     176          28 :     }
     177             : 
     178          11 :     std::vector<size_t> synchro_buffer_sizes(this->solution.size() - 1, buffer_size);
     179          11 :     std::vector<bool> synchro_active_waitings(this->solution.size() - 1, active_waiting);
     180          11 :     std::vector<bool> thread_pinings(this->solution.size(), thread_pining);
     181             : 
     182             :     return new runtime::Pipeline(firsts,
     183             :                                  lasts,
     184             :                                  sep_stages,
     185             :                                  n_threads,
     186             :                                  synchro_buffer_sizes,
     187             :                                  synchro_active_waitings,
     188             :                                  thread_pinings,
     189          22 :                                  pinning_policy);
     190          11 : }
     191             : 
     192             : std::vector<std::pair<size_t, size_t>>
     193           6 : Scheduler::get_solution()
     194             : {
     195           6 :     return this->solution;
     196             : }
     197             : 
     198             : runtime::Pipeline*
     199           1 : Scheduler::generate_pipeline()
     200             : {
     201           1 :     if (tasks_desc.empty()) this->profile();
     202             : 
     203           1 :     if (solution.empty()) this->schedule();
     204             : 
     205           1 :     return this->instantiate_pipeline(1, false, tools::Thread_pinning::is_init(), this->perform_threads_mapping());
     206             : }
     207             : 
     208             : size_t
     209           0 : Scheduler::get_n_alloc_ressources() const
     210             : {
     211           0 :     if (this->solution.size() == 0)
     212             :     {
     213           0 :         std::stringstream message;
     214             :         message
     215           0 :           << "The solution has to contain at least one element, please run the 'Scheduler::schedule' method first.";
     216           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
     217           0 :     }
     218           0 :     size_t R = 0;
     219           0 :     for (auto s : this->solution)
     220           0 :         R += s.second;
     221           0 :     return R;
     222             : }
     223             : 
     224             : double
     225           0 : Scheduler::get_throughput_est() const
     226             : {
     227           0 :     throw tools::unimplemented_error(__FILE__, __LINE__, __func__);
     228             : }

Generated by: LCOV version 1.14