LCOV - code coverage report
Current view: top level - include/Module/Adaptor - Adaptor_1_to_n.hxx (source / functions) Hit Total Coverage
Test: streampu_clean.info Lines: 39 49 79.6 %
Date: 2024-06-12 12:04:18 Functions: 4 4 100.0 %

          Line data    Source code
       1             : #include <sstream>
       2             : #include <string>
       3             : 
       4             : #include "Module/Adaptor/Adaptor_1_to_n.hpp"
       5             : #include "Tools/Exception/exception.hpp"
       6             : 
       7             : namespace spu
       8             : {
       9             : namespace module
      10             : {
      11             : 
      12             : runtime::Task&
      13             : Adaptor_1_to_n::operator[](const adp::tsk t)
      14             : {
      15             :     return Module::operator[]((size_t)t);
      16             : }
      17             : 
      18             : runtime::Socket&
      19             : Adaptor_1_to_n::operator[](const adp::sck::push_1 s)
      20             : {
      21             :     return Module::operator[]((size_t)adp::tsk::push_1)[(size_t)s];
      22             : }
      23             : 
      24             : runtime::Socket&
      25             : Adaptor_1_to_n::operator[](const adp::sck::pull_n s)
      26             : {
      27             :     return Module::operator[]((size_t)adp::tsk::pull_n)[(size_t)s];
      28             : }
      29             : 
      30             : Adaptor_1_to_n::Adaptor_1_to_n(const size_t n_elmts,
      31             :                                const std::type_index datatype,
      32             :                                const size_t buffer_size,
      33             :                                const bool active_waiting)
      34             :   : Adaptor(n_elmts, datatype, buffer_size)
      35             :   , active_waiting(active_waiting)
      36             :   , cnd_pull(new std::vector<std::condition_variable>(1000))
      37             :   , mtx_pull(new std::vector<std::mutex>(1000))
      38             :   , cnd_put(new std::condition_variable())
      39             :   , mtx_put(new std::mutex())
      40             : {
      41             :     this->init();
      42             : }
      43             : 
      44         101 : Adaptor_1_to_n::Adaptor_1_to_n(const std::vector<size_t>& n_elmts,
      45             :                                const std::vector<std::type_index>& datatype,
      46             :                                const size_t buffer_size,
      47         101 :                                const bool active_waiting)
      48             :   : Adaptor(n_elmts, datatype, buffer_size)
      49         101 :   , active_waiting(active_waiting)
      50         101 :   , cnd_pull(new std::vector<std::condition_variable>(1000))
      51         101 :   , mtx_pull(new std::vector<std::mutex>(1000))
      52         101 :   , cnd_put(new std::condition_variable())
      53         202 :   , mtx_put(new std::mutex())
      54             : {
      55         101 :     this->init();
      56         101 : }
      57             : 
      58             : void
      59         101 : Adaptor_1_to_n::init()
      60             : {
      61         101 :     const std::string name = "Adaptor_1_to_n";
      62         101 :     this->set_name(name);
      63         101 :     this->set_short_name(name);
      64         101 :     this->set_single_wave(true);
      65             : 
      66         101 :     auto& p1 = this->create_task("push_1", (int)adp::tsk::push_1);
      67         101 :     std::vector<size_t> p1s_in;
      68         267 :     for (size_t s = 0; s < this->n_sockets; s++)
      69         166 :         p1s_in.push_back(this->create_socket_in(p1, "in" + std::to_string(s), this->n_elmts[s], this->datatype[s]));
      70             : 
      71         101 :     this->create_codelet(p1,
      72       50525 :                          [p1s_in](Module& m, runtime::Task& t, const size_t frame_id) -> int
      73             :                          {
      74       50525 :                              auto& adp = static_cast<Adaptor_1_to_n&>(m);
      75       50525 :                              if (adp.is_no_copy_push())
      76             :                              {
      77       50537 :                                  adp.wait_push();
      78             :                                  // for debug mode coherence
      79      127169 :                                  for (size_t s = 0; s < t.sockets.size() - 1; s++)
      80       76324 :                                      t.sockets[s]->dataptr = adp.get_empty_buffer(s);
      81             :                              }
      82             :                              else
      83             :                              {
      84           0 :                                  std::vector<const int8_t*> sockets_dataptr(p1s_in.size());
      85           0 :                                  for (size_t s = 0; s < p1s_in.size(); s++)
      86           0 :                                      sockets_dataptr[s] = t[p1s_in[s]].get_dataptr<const int8_t>();
      87           0 :                                  adp.push_1(sockets_dataptr, frame_id);
      88           0 :                              }
      89       50648 :                              return runtime::status_t::SUCCESS;
      90             :                          });
      91             : 
      92         101 :     auto& p2 = this->create_task("pull_n", (int)adp::tsk::pull_n);
      93         101 :     std::vector<size_t> p2s_out;
      94         267 :     for (size_t s = 0; s < this->n_sockets; s++)
      95         166 :         p2s_out.push_back(this->create_socket_out(p2, "out" + std::to_string(s), this->n_elmts[s], this->datatype[s]));
      96             : 
      97         101 :     this->create_codelet(p2,
      98       36534 :                          [p2s_out](Module& m, runtime::Task& t, const size_t frame_id) -> int
      99             :                          {
     100       36534 :                              auto& adp = static_cast<Adaptor_1_to_n&>(m);
     101       36534 :                              if (adp.is_no_copy_pull())
     102             :                              {
     103       36610 :                                  adp.wait_pull();
     104             :                                  // for debug mode coherence
     105       90659 :                                  for (size_t s = 0; s < t.sockets.size() - 1; s++) // Adaptor pull has only OUT sockets
     106       53719 :                                      t.sockets[s]->_bind(adp.get_filled_buffer(s));
     107             :                              }
     108             :                              else
     109             :                              {
     110           0 :                                  std::vector<int8_t*> sockets_dataptr(p2s_out.size());
     111           0 :                                  for (size_t s = 0; s < p2s_out.size(); s++)
     112           0 :                                      sockets_dataptr[s] = t[p2s_out[s]].get_dataptr<int8_t>();
     113           0 :                                  adp.pull_n(sockets_dataptr, frame_id);
     114           0 :                              }
     115       34239 :                              return runtime::status_t::SUCCESS;
     116             :                          });
     117         101 : }
     118             : }
     119             : }

Generated by: LCOV version 1.14