LCOV - code coverage report
Current view: top level - include/Module/Stateful/Adaptor - Adaptor_n_to_1.hxx (source / functions) Hit Total Coverage
Test: streampu_clean.info Lines: 40 50 80.0 %
Date: 2025-01-11 12:25:42 Functions: 4 4 100.0 %

          Line data    Source code
       1             : #include <sstream>
       2             : #include <string>
       3             : 
       4             : #include "Module/Stateful/Adaptor/Adaptor_n_to_1.hpp"
       5             : #include "Tools/Exception/exception.hpp"
       6             : 
       7             : namespace spu
       8             : {
       9             : namespace module
      10             : {
      11             : 
      12             : runtime::Task&
      13             : Adaptor_n_to_1::operator[](const adp::tsk t)
      14             : {
      15             :     return Module::operator[]((size_t)t);
      16             : }
      17             : 
      18             : runtime::Socket&
      19             : Adaptor_n_to_1::operator[](const adp::sck::push_n s)
      20             : {
      21             :     return Module::operator[]((size_t)adp::tsk::push_n)[(size_t)s];
      22             : }
      23             : 
      24             : runtime::Socket&
      25             : Adaptor_n_to_1::operator[](const adp::sck::pull_1 s)
      26             : {
      27             :     return Module::operator[]((size_t)adp::tsk::pull_1)[(size_t)s];
      28             : }
      29             : 
      30             : Adaptor_n_to_1::Adaptor_n_to_1(const size_t n_elmts,
      31             :                                const std::type_index datatype,
      32             :                                const size_t buffer_size,
      33             :                                const bool active_waiting)
      34             :   : Adaptor(n_elmts, datatype, buffer_size)
      35             :   , active_waiting(active_waiting)
      36             :   , cnd_put(new std::vector<std::condition_variable>(1000))
      37             :   , mtx_put(new std::vector<std::mutex>(1000))
      38             :   , cnd_pull(new std::condition_variable())
      39             :   , mtx_pull(new std::mutex())
      40             : {
      41             :     this->init();
      42             : }
      43             : 
      44          88 : Adaptor_n_to_1::Adaptor_n_to_1(const std::vector<size_t>& n_elmts,
      45             :                                const std::vector<std::type_index>& datatype,
      46             :                                const size_t buffer_size,
      47          88 :                                const bool active_waiting)
      48             :   : Adaptor(n_elmts, datatype, buffer_size)
      49          88 :   , active_waiting(active_waiting)
      50          88 :   , cnd_put(new std::vector<std::condition_variable>(1000))
      51          88 :   , mtx_put(new std::vector<std::mutex>(1000))
      52          88 :   , cnd_pull(new std::condition_variable())
      53         176 :   , mtx_pull(new std::mutex())
      54             : {
      55          88 :     this->init();
      56          88 : }
      57             : 
      58             : void
      59          88 : Adaptor_n_to_1::init()
      60             : {
      61          88 :     const std::string name = "Adaptor_n_to_1";
      62          88 :     this->set_name(name);
      63          88 :     this->set_short_name(name);
      64          88 :     this->set_single_wave(true);
      65             : 
      66          88 :     auto& p1 = this->create_task("push_n", (int)adp::tsk::push_n);
      67          88 :     std::vector<size_t> p1s_in;
      68         255 :     for (size_t s = 0; s < this->n_sockets; s++)
      69         167 :         p1s_in.push_back(this->create_socket_in(p1, "in" + std::to_string(s), this->n_elmts[s], this->datatype[s]));
      70             : 
      71          88 :     this->create_codelet(p1,
      72      229223 :                          [p1s_in](Module& m, runtime::Task& t, const size_t frame_id) -> int
      73             :                          {
      74      229223 :                              auto& adp = static_cast<Adaptor_n_to_1&>(m);
      75      229223 :                              if (adp.is_no_copy_push())
      76             :                              {
      77      229390 :                                  adp.wait_push();
      78             :                                  // for debug mode coherence
      79      576606 :                                  for (size_t s = 0; s < t.sockets.size() - 1; s++)
      80      345138 :                                      t.sockets[s]->dataptr = adp.get_empty_buffer(s);
      81             :                              }
      82             :                              else
      83             :                              {
      84           0 :                                  std::vector<const int8_t*> sockets_dataptr(p1s_in.size());
      85           0 :                                  for (size_t s = 0; s < p1s_in.size(); s++)
      86           0 :                                      sockets_dataptr[s] = t[p1s_in[s]].get_dataptr<const int8_t>();
      87           0 :                                  adp.push_n(sockets_dataptr, frame_id);
      88           0 :                              }
      89      229280 :                              return runtime::status_t::SUCCESS;
      90             :                          });
      91             : 
      92          88 :     auto& p2 = this->create_task("pull_1", (int)adp::tsk::pull_1);
      93          88 :     p2.set_replicability(false);
      94          88 :     std::vector<size_t> p2s_out;
      95         255 :     for (size_t s = 0; s < this->n_sockets; s++)
      96         167 :         p2s_out.push_back(this->create_socket_out(p2, "out" + std::to_string(s), this->n_elmts[s], this->datatype[s]));
      97             : 
      98          88 :     this->create_codelet(p2,
      99      217325 :                          [p2s_out](Module& m, runtime::Task& t, const size_t frame_id) -> int
     100             :                          {
     101      217325 :                              auto& adp = static_cast<Adaptor_n_to_1&>(m);
     102      217325 :                              if (adp.is_no_copy_pull())
     103             :                              {
     104      217465 :                                  adp.wait_pull();
     105             :                                  // for debug mode coherence
     106      546538 :                                  for (size_t s = 0; s < t.sockets.size() - 1; s++)
     107      323008 :                                      t.sockets[s]->_bind(adp.get_filled_buffer(s));
     108             :                              }
     109             :                              else
     110             :                              {
     111           0 :                                  std::vector<int8_t*> sockets_dataptr(p2s_out.size());
     112           0 :                                  for (size_t s = 0; s < p2s_out.size(); s++)
     113           0 :                                      sockets_dataptr[s] = t[p2s_out[s]].get_dataptr<int8_t>();
     114           0 :                                  adp.pull_1(sockets_dataptr, frame_id);
     115           0 :                              }
     116      219179 :                              return runtime::status_t::SUCCESS;
     117             :                          });
     118          88 : }
     119             : }
     120             : }

Generated by: LCOV version 1.14