LCOV - code coverage report
Current view: top level - include/Module/Stateful/Adaptor - Adaptor_1_to_n.hxx (source / functions) Hit Total Coverage
Test: streampu_clean.info Lines: 40 50 80.0 %
Date: 2025-01-11 12:25:42 Functions: 4 4 100.0 %

          Line data    Source code
       1             : #include <sstream>
       2             : #include <string>
       3             : 
       4             : #include "Module/Stateful/Adaptor/Adaptor_1_to_n.hpp"
       5             : #include "Tools/Exception/exception.hpp"
       6             : 
       7             : namespace spu
       8             : {
       9             : namespace module
      10             : {
      11             : 
      12             : runtime::Task&
      13             : Adaptor_1_to_n::operator[](const adp::tsk t)
      14             : {
      15             :     return Module::operator[]((size_t)t);
      16             : }
      17             : 
      18             : runtime::Socket&
      19             : Adaptor_1_to_n::operator[](const adp::sck::push_1 s)
      20             : {
      21             :     return Module::operator[]((size_t)adp::tsk::push_1)[(size_t)s];
      22             : }
      23             : 
      24             : runtime::Socket&
      25             : Adaptor_1_to_n::operator[](const adp::sck::pull_n s)
      26             : {
      27             :     return Module::operator[]((size_t)adp::tsk::pull_n)[(size_t)s];
      28             : }
      29             : 
      30             : Adaptor_1_to_n::Adaptor_1_to_n(const size_t n_elmts,
      31             :                                const std::type_index datatype,
      32             :                                const size_t buffer_size,
      33             :                                const bool active_waiting)
      34             :   : Adaptor(n_elmts, datatype, buffer_size)
      35             :   , active_waiting(active_waiting)
      36             :   , cnd_pull(new std::vector<std::condition_variable>(1000))
      37             :   , mtx_pull(new std::vector<std::mutex>(1000))
      38             :   , cnd_put(new std::condition_variable())
      39             :   , mtx_put(new std::mutex())
      40             : {
      41             :     this->init();
      42             : }
      43             : 
      44         114 : Adaptor_1_to_n::Adaptor_1_to_n(const std::vector<size_t>& n_elmts,
      45             :                                const std::vector<std::type_index>& datatype,
      46             :                                const size_t buffer_size,
      47         114 :                                const bool active_waiting)
      48             :   : Adaptor(n_elmts, datatype, buffer_size)
      49         114 :   , active_waiting(active_waiting)
      50         114 :   , cnd_pull(new std::vector<std::condition_variable>(1000))
      51         114 :   , mtx_pull(new std::vector<std::mutex>(1000))
      52         114 :   , cnd_put(new std::condition_variable())
      53         228 :   , mtx_put(new std::mutex())
      54             : {
      55         114 :     this->init();
      56         114 : }
      57             : 
      58             : void
      59         114 : Adaptor_1_to_n::init()
      60             : {
      61         114 :     const std::string name = "Adaptor_1_to_n";
      62         114 :     this->set_name(name);
      63         114 :     this->set_short_name(name);
      64         114 :     this->set_single_wave(true);
      65             : 
      66         114 :     auto& p1 = this->create_task("push_1", (int)adp::tsk::push_1);
      67         114 :     p1.set_replicability(false);
      68         114 :     std::vector<size_t> p1s_in;
      69         296 :     for (size_t s = 0; s < this->n_sockets; s++)
      70         182 :         p1s_in.push_back(this->create_socket_in(p1, "in" + std::to_string(s), this->n_elmts[s], this->datatype[s]));
      71             : 
      72         114 :     this->create_codelet(p1,
      73      339919 :                          [p1s_in](Module& m, runtime::Task& t, const size_t frame_id) -> int
      74             :                          {
      75      339919 :                              auto& adp = static_cast<Adaptor_1_to_n&>(m);
      76      339919 :                              if (adp.is_no_copy_push())
      77             :                              {
      78      340180 :                                  adp.wait_push();
      79             :                                  // for debug mode coherence
      80      998537 :                                  for (size_t s = 0; s < t.sockets.size() - 1; s++)
      81      643702 :                                      t.sockets[s]->dataptr = adp.get_empty_buffer(s);
      82             :                              }
      83             :                              else
      84             :                              {
      85           0 :                                  std::vector<const int8_t*> sockets_dataptr(p1s_in.size());
      86           0 :                                  for (size_t s = 0; s < p1s_in.size(); s++)
      87           0 :                                      sockets_dataptr[s] = t[p1s_in[s]].get_dataptr<const int8_t>();
      88           0 :                                  adp.push_1(sockets_dataptr, frame_id);
      89           0 :                              }
      90      345486 :                              return runtime::status_t::SUCCESS;
      91             :                          });
      92             : 
      93         114 :     auto& p2 = this->create_task("pull_n", (int)adp::tsk::pull_n);
      94         114 :     std::vector<size_t> p2s_out;
      95         296 :     for (size_t s = 0; s < this->n_sockets; s++)
      96         182 :         p2s_out.push_back(this->create_socket_out(p2, "out" + std::to_string(s), this->n_elmts[s], this->datatype[s]));
      97             : 
      98         114 :     this->create_codelet(p2,
      99      331532 :                          [p2s_out](Module& m, runtime::Task& t, const size_t frame_id) -> int
     100             :                          {
     101      331532 :                              auto& adp = static_cast<Adaptor_1_to_n&>(m);
     102      331532 :                              if (adp.is_no_copy_pull())
     103             :                              {
     104      331793 :                                  adp.wait_pull();
     105             :                                  // for debug mode coherence
     106      972842 :                                  for (size_t s = 0; s < t.sockets.size() - 1; s++) // Adaptor pull has only OUT sockets
     107      630595 :                                      t.sockets[s]->_bind(adp.get_filled_buffer(s));
     108             :                              }
     109             :                              else
     110             :                              {
     111           0 :                                  std::vector<int8_t*> sockets_dataptr(p2s_out.size());
     112           0 :                                  for (size_t s = 0; s < p2s_out.size(); s++)
     113           0 :                                      sockets_dataptr[s] = t[p2s_out[s]].get_dataptr<int8_t>();
     114           0 :                                  adp.pull_n(sockets_dataptr, frame_id);
     115           0 :                              }
     116      331498 :                              return runtime::status_t::SUCCESS;
     117             :                          });
     118         114 : }
     119             : }
     120             : }

Generated by: LCOV version 1.14