LCOV - code coverage report
Current view: top level - src/Module/Stateful/Adaptor - Adaptor_m_to_n.cpp (source / functions) Hit Total Coverage
Test: streampu_clean.info Lines: 176 251 70.1 %
Date: 2025-03-14 12:33:06 Functions: 27 29 93.1 %

          Line data    Source code
       1             : #include "Module/Stateful/Adaptor/Adaptor_m_to_n.hpp"
       2             : #include "Tools/Math/utils.h"
       3             : 
       4             : using namespace spu;
       5             : using namespace spu::module;
       6             : 
       7       16230 : Adaptor_m_to_n::~Adaptor_m_to_n()
       8             : {
       9     1518145 :     for (auto b : this->buffer_to_free)
      10     1512735 :         delete[] b;
      11        5410 :     if (this->cloned) (*this->n_clones)--;
      12       10820 : }
      13             : 
      14             : Adaptor_m_to_n*
      15        5168 : Adaptor_m_to_n::clone() const
      16             : {
      17        5168 :     auto m = new Adaptor_m_to_n(*this);
      18        5168 :     m->deep_copy(*this);
      19        5168 :     return m;
      20             : }
      21             : 
      22             : void
      23        5168 : Adaptor_m_to_n::deep_copy(const Adaptor_m_to_n& m)
      24             : {
      25        5168 :     Stateful::deep_copy(m);
      26             : 
      27        5168 :     if (*this->buffers_allocated)
      28             :     {
      29           0 :         std::stringstream message;
      30           0 :         message << "Shared buffers have already been allocated, cloning is no more possible." << std::endl;
      31           0 :         throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
      32           0 :     }
      33             : 
      34        5168 :     this->cloned = true;
      35        5168 :     (*this->n_clones)++;
      36        5168 :     this->tid_push = -1;
      37        5168 :     this->tid_pull = -1;
      38        5168 :     this->cur_push_id = -1;
      39        5168 :     this->cur_pull_id = -1;
      40             : 
      41        5168 :     this->waiting_canceled.reset(new std::atomic<bool>(m.waiting_canceled->load()));
      42        5168 : }
      43             : 
      44             : void
      45         242 : Adaptor_m_to_n::alloc_buffers()
      46             : {
      47         242 :     if (*this->buffers_allocated)
      48             :     {
      49           0 :         std::stringstream message;
      50           0 :         message << "Synchronization buffers have already been allocated.";
      51           0 :         throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
      52           0 :     }
      53             : 
      54         242 :     if (this->cloned)
      55             :     {
      56           0 :         std::stringstream message;
      57           0 :         message << "'alloc_buffers()' cannot be called on a cloned module ('tid_push' = " << this->tid_push
      58           0 :                 << ", 'tid_pull' = " << this->tid_pull << ").";
      59           0 :         throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
      60           0 :     }
      61             : 
      62         242 :     size_t ppcm = tools::find_smallest_common_multiple(*this->n_pushers, *this->n_pullers);
      63             : 
      64         242 :     if (ppcm > 1000)
      65             :     {
      66           0 :         std::stringstream message;
      67           0 :         message << "'ppcm' cannot exceed 1000 ('ppcm' = " << ppcm << ").";
      68           0 :         throw tools::invalid_argument(__FILE__, __LINE__, __func__, message.str());
      69           0 :     }
      70             : 
      71         484 :     this->buffer->resize(ppcm,
      72         484 :                          std::vector<std::vector<int8_t*>>(this->n_sockets, std::vector<int8_t*>(this->buffer_size)));
      73             : 
      74         242 :     if (this->buffer_to_free.size())
      75             :     {
      76           0 :         std::stringstream message;
      77           0 :         message << "This should never happen.";
      78           0 :         throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
      79           0 :     }
      80             : 
      81             :     // this->buffer_to_free.clear();
      82        5420 :     for (size_t d = 0; d < ppcm; d++)
      83             :     {
      84       15721 :         for (size_t s = 0; s < this->n_sockets; s++)
      85     1523278 :             for (size_t b = 0; b < this->buffer_size; b++)
      86             :             {
      87     1512735 :                 (*this->buffer)[d][s][b] = new int8_t[this->n_frames * this->n_bytes[s]];
      88     1512735 :                 this->buffer_to_free.push_back((*this->buffer)[d][s][b]);
      89             :             }
      90        5178 :         (*this->first)[d] = 0;
      91        5178 :         (*this->last)[d] = 0;
      92        5178 :         (*this->counter)[d] = this->buffer_size;
      93             :     }
      94             : 
      95         242 :     *this->buffers_allocated = true;
      96         242 : }
      97             : 
      98             : void
      99        2342 : Adaptor_m_to_n::add_pusher()
     100             : {
     101        2342 :     if (*this->n_pushers - 1 + *this->n_pullers - 1 >= *this->n_clones)
     102             :     {
     103           0 :         std::stringstream message;
     104           0 :         message << "Pusher cannot be added because a clone is missing ('n_pushers' = " << *this->n_pushers
     105           0 :                 << ", 'n_pullers' = " << *this->n_pullers << ", 'n_clones' = " << *this->n_clones << ").";
     106           0 :         throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
     107           0 :     }
     108             : 
     109        2342 :     this->tid_push = (*this->n_pushers);
     110        2342 :     (*this->n_pushers)++;
     111        2342 :     this->cur_push_id = this->tid_push;
     112        2342 : }
     113             : 
     114             : void
     115        2342 : Adaptor_m_to_n::add_puller()
     116             : {
     117        2342 :     if (*this->n_pushers - 1 + *this->n_pullers - 1 >= *this->n_clones)
     118             :     {
     119           0 :         std::stringstream message;
     120           0 :         message << "Puller cannot be added because a clone is missing ('n_pushers' = " << *this->n_pushers
     121           0 :                 << ", 'n_pullers' = " << *this->n_pullers << ", 'n_clones' = " << *this->n_clones << ").";
     122           0 :         throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
     123           0 :     }
     124             : 
     125        2342 :     this->tid_pull = (*this->n_pullers);
     126        2342 :     (*this->n_pullers)++;
     127        2342 :     this->cur_pull_id = this->tid_pull;
     128        2342 : }
     129             : 
     130             : void
     131        9848 : Adaptor_m_to_n::send_cancel_signal()
     132             : {
     133        9848 :     *this->waiting_canceled = true;
     134        9846 : }
     135             : 
     136             : void
     137        4926 : Adaptor_m_to_n::reset()
     138             : {
     139        4926 :     if (!this->cloned)
     140             :     {
     141         242 :         *this->waiting_canceled = false;
     142        5420 :         for (size_t d = 0; d < this->buffer->size(); d++)
     143             :         {
     144        5178 :             (*this->first)[d] = 0;
     145        5178 :             (*this->last)[d] = 0;
     146        5178 :             (*this->counter)[d] = this->buffer_size;
     147             :         }
     148             :     }
     149        4926 :     this->cur_push_id = (size_t)this->tid_push;
     150        4926 :     this->cur_pull_id = (size_t)this->tid_pull;
     151        4926 :     this->reset_buffer();
     152        4926 : }
     153             : 
     154             : void
     155        5167 : Adaptor_m_to_n::set_no_copy_pull(const bool no_copy_pull)
     156             : {
     157        5167 :     this->no_copy_pull = no_copy_pull;
     158        5167 : }
     159             : 
     160             : void
     161        5168 : Adaptor_m_to_n::set_no_copy_push(const bool no_copy_push)
     162             : {
     163        5168 :     this->no_copy_push = no_copy_push;
     164        5168 : }
     165             : 
     166             : bool
     167      536035 : Adaptor_m_to_n::is_no_copy_pull()
     168             : {
     169      536035 :     return this->no_copy_pull;
     170             : }
     171             : 
     172             : bool
     173      544648 : Adaptor_m_to_n::is_no_copy_push()
     174             : {
     175      544648 :     return this->no_copy_push;
     176             : }
     177             : 
     178             : void
     179       10094 : Adaptor_m_to_n::reset_buffer()
     180             : {
     181       10094 :     if (!this->cloned && *this->buffers_allocated)
     182             :     {
     183         726 :         size_t id_buff = 0;
     184       16260 :         for (size_t d = 0; d < (*this->buffer).size(); d++)
     185       47163 :             for (size_t s = 0; s < this->n_sockets; s++)
     186     4569834 :                 for (size_t b = 0; b < this->buffer_size; b++)
     187     4538205 :                     (*this->buffer)[d][s][b] = this->buffer_to_free[id_buff++];
     188             :     }
     189       10094 : }
     190             : 
     191             : void
     192       11486 : Adaptor_m_to_n::set_n_frames(const size_t n_frames)
     193             : {
     194       11486 :     const auto old_n_frames = this->get_n_frames();
     195       11486 :     if (old_n_frames != n_frames)
     196             :     {
     197        2374 :         Module::set_n_frames(n_frames);
     198        2374 :         if (!this->cloned)
     199             :         {
     200          62 :             if (*this->buffers_allocated)
     201             :             {
     202        2448 :                 for (size_t d = 0; d < (*this->buffer).size(); d++)
     203             :                 {
     204        7178 :                     for (size_t s = 0; s < (*this->buffer)[d].size(); s++)
     205             :                     {
     206      108352 :                         for (size_t b = 0; b < (*this->buffer)[d][s].size(); b++)
     207             :                         {
     208      103560 :                             auto old_ptr = (*this->buffer)[d][s][b];
     209      103560 :                             (*this->buffer)[d][s][b] = new int8_t[this->n_bytes[s] * n_frames];
     210             : 
     211      103560 :                             bool found = false;
     212   191178516 :                             for (size_t bf = 0; bf < this->buffer_to_free.size(); bf++)
     213   191178516 :                                 if (this->buffer_to_free[bf] == old_ptr)
     214             :                                 {
     215      103560 :                                     delete[] this->buffer_to_free[bf];
     216      103560 :                                     this->buffer_to_free[bf] = (*this->buffer)[d][s][b];
     217      103560 :                                     found = true;
     218      103560 :                                     break;
     219             :                                 }
     220             : 
     221      103560 :                             if (found == false)
     222             :                             {
     223           0 :                                 std::stringstream message;
     224           0 :                                 message << "This should never happen.";
     225           0 :                                 throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
     226           0 :                             }
     227             :                         }
     228             :                     }
     229             :                 }
     230             :             }
     231             :         }
     232             :     }
     233       11486 : }
     234             : 
     235             : // --------------------------------------------------------------------------------------------------------------------
     236             : 
     237             : void
     238           0 : Adaptor_m_to_n::push(const std::vector<const int8_t*>& in, const size_t frame_id)
     239             : {
     240           0 :     this->wait_push();
     241             : 
     242           0 :     for (size_t s = 0; s < this->n_sockets; s++)
     243             :     {
     244           0 :         int8_t* out = (int8_t*)this->get_empty_buffer(s);
     245             : 
     246           0 :         std::copy(
     247           0 :           in[s] + 0 * this->n_bytes[s], in[s] + this->get_n_frames() * this->n_bytes[s], out + 0 * this->n_bytes[s]);
     248             :     }
     249             : 
     250           0 :     this->wake_up_puller();
     251           0 : }
     252             : 
     253             : void
     254           0 : Adaptor_m_to_n::pull(const std::vector<int8_t*>& out, const size_t frame_id)
     255             : {
     256           0 :     this->wait_pull();
     257             : 
     258           0 :     for (size_t s = 0; s < this->n_sockets; s++)
     259             :     {
     260           0 :         const int8_t* in = (const int8_t*)this->get_filled_buffer(s);
     261             : 
     262           0 :         std::copy(
     263           0 :           in + 0 * this->n_bytes[s], in + this->get_n_frames() * this->n_bytes[s], out[s] + 0 * this->n_bytes[s]);
     264             :     }
     265             : 
     266           0 :     this->wake_up_pusher();
     267           0 : }
     268             : 
     269             : void
     270      545787 : Adaptor_m_to_n::wait_push()
     271             : {
     272      545787 :     if (this->tid_push < 0)
     273             :     {
     274           0 :         std::stringstream message;
     275           0 :         message << "This instance is not a pusher.";
     276           0 :         throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
     277           0 :     }
     278             : 
     279      545787 :     if (this->active_waiting)
     280             :     {
     281       47934 :         while (this->is_full(this->cur_push_id) && !*this->waiting_canceled)
     282             :             ;
     283             :     }
     284             :     else // passive waiting
     285             :     {
     286      527901 :         if (this->is_full(this->cur_push_id) && !*this->waiting_canceled)
     287             :         {
     288      299917 :             std::unique_lock<std::mutex> lock((*this->mtx_push.get())[this->cur_push_id]);
     289      314605 :             ((*this->cnd_push.get())[this->cur_push_id])
     290      937479 :               .wait(lock, [this]() { return !(this->is_full(this->cur_push_id) && !*this->waiting_canceled); });
     291      313896 :         }
     292             :     }
     293             : 
     294      570791 :     if (*this->waiting_canceled) throw tools::waiting_canceled(__FILE__, __LINE__, __func__);
     295      564464 : }
     296             : 
     297             : void
     298      537039 : Adaptor_m_to_n::wait_pull()
     299             : {
     300      537039 :     if (this->tid_pull < 0)
     301             :     {
     302           0 :         std::stringstream message;
     303           0 :         message << "This instance is not a puller.";
     304           0 :         throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
     305           0 :     }
     306             : 
     307      537039 :     if (this->active_waiting)
     308             :     {
     309      808320 :         while (this->is_empty(this->cur_pull_id) && !*this->waiting_canceled)
     310             :             ;
     311             :     }
     312             :     else // passive waiting
     313             :     {
     314      522766 :         if (this->is_empty(this->cur_pull_id) && !*this->waiting_canceled)
     315             :         {
     316      151014 :             std::unique_lock<std::mutex> lock((*this->mtx_pull.get())[this->cur_pull_id]);
     317      152969 :             ((*this->cnd_pull.get())[this->cur_pull_id])
     318      485293 :               .wait(lock, [this]() { return !(this->is_empty(this->cur_pull_id) && !*this->waiting_canceled); });
     319      152447 :         }
     320             :     }
     321             : 
     322      508778 :     if (this->is_empty(this->cur_pull_id) && *this->waiting_canceled)
     323        2009 :         throw tools::waiting_canceled(__FILE__, __LINE__, __func__);
     324      530286 : }
     325             : 
     326             : void*
     327      797272 : Adaptor_m_to_n::get_empty_buffer(const size_t sid)
     328             : {
     329             : #ifndef SPU_FAST
     330      797272 :     if (!*this->buffers_allocated)
     331             :     {
     332           0 :         std::stringstream message;
     333           0 :         message << "You need to call 'alloc_buffers()' before to change 'get_empty_buffer()'.";
     334           0 :         throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
     335           0 :     }
     336             : #endif
     337      797133 :     return (void*)(*this->buffer)[this->cur_push_id][sid][(*this->last)[this->cur_push_id]];
     338             : }
     339             : 
     340             : void*
     341      748041 : Adaptor_m_to_n::get_filled_buffer(const size_t sid)
     342             : {
     343             : #ifndef SPU_FAST
     344      748041 :     if (!*this->buffers_allocated)
     345             :     {
     346           0 :         std::stringstream message;
     347           0 :         message << "You need to call 'alloc_buffers()' before to change 'get_filled_buffer()'.";
     348           0 :         throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
     349           0 :     }
     350             : #endif
     351      745370 :     return (void*)(*this->buffer)[this->cur_pull_id][sid][(*this->first)[this->cur_pull_id]];
     352             : }
     353             : 
     354             : void*
     355      797307 : Adaptor_m_to_n::get_empty_buffer(const size_t sid, void* swap_buffer)
     356             : {
     357             : #ifndef SPU_FAST
     358      797307 :     if (!*this->buffers_allocated)
     359             :     {
     360           0 :         std::stringstream message;
     361           0 :         message << "You need to call 'alloc_buffers()' before to change 'get_empty_buffer()'.";
     362           0 :         throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
     363           0 :     }
     364             : #endif
     365      796775 :     void* empty_buffer = (void*)(*this->buffer)[this->cur_push_id][sid][(*this->last)[this->cur_push_id]];
     366      787781 :     (*this->buffer)[this->cur_push_id][sid][(*this->last)[this->cur_push_id]] = (int8_t*)swap_buffer;
     367      783766 :     return empty_buffer;
     368             : }
     369             : 
     370             : void*
     371      757227 : Adaptor_m_to_n::get_filled_buffer(const size_t sid, void* swap_buffer)
     372             : {
     373             : #ifndef SPU_FAST
     374      757227 :     if (!*this->buffers_allocated)
     375             :     {
     376           0 :         std::stringstream message;
     377           0 :         message << "You need to call 'alloc_buffers()' before to change 'get_filled_buffer()'.";
     378           0 :         throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
     379           0 :     }
     380             : #endif
     381      754833 :     void* filled_buffer = (void*)(*this->buffer)[this->cur_pull_id][sid][(*this->first)[this->cur_pull_id]];
     382      736371 :     (*this->buffer)[this->cur_pull_id][sid][(*this->first)[this->cur_pull_id]] = (int8_t*)swap_buffer;
     383      733858 :     return filled_buffer;
     384             : }
     385             : 
     386             : void
     387      565179 : Adaptor_m_to_n::wake_up_puller()
     388             : {
     389      565179 :     (*this->last)[this->cur_push_id] = ((*this->last)[this->cur_push_id] + 1) % this->buffer_size;
     390      564209 :     (*this->counter)[this->cur_push_id]--; // atomic fetch sub
     391             : 
     392      578307 :     if (!this->active_waiting) // passive waiting
     393             :     {
     394      559270 :         if (!this->is_empty(this->cur_push_id))
     395             :         {
     396      552058 :             std::lock_guard<std::mutex> lock((*this->mtx_pull.get())[this->cur_push_id]);
     397      558938 :             (*this->cnd_pull.get())[this->cur_push_id].notify_one();
     398      559183 :         }
     399             :     }
     400             : 
     401      579476 :     this->cur_push_id = (this->cur_push_id + *this->n_pushers) % this->buffer->size();
     402      571213 : }
     403             : 
     404             : void
     405      535636 : Adaptor_m_to_n::wake_up_pusher()
     406             : {
     407      535636 :     (*this->first)[this->cur_pull_id] = ((*this->first)[this->cur_pull_id] + 1) % this->buffer_size;
     408      532722 :     (*this->counter)[this->cur_pull_id]++; // atomic fetch add
     409             : 
     410      555799 :     if (!this->active_waiting) // passive waiting
     411             :     {
     412      540274 :         if (!this->is_full(this->cur_pull_id))
     413             :         {
     414      526593 :             std::lock_guard<std::mutex> lock((*this->mtx_push.get())[this->cur_pull_id]);
     415      540391 :             (*this->cnd_push.get())[this->cur_pull_id].notify_one();
     416      542715 :         }
     417             :     }
     418             : 
     419      559386 :     this->cur_pull_id = (this->cur_pull_id + *this->n_pullers) % this->buffer->size();
     420      549128 : }
     421             : 
     422             : void
     423        9844 : Adaptor_m_to_n::wake_up()
     424             : {
     425        9844 :     if (!this->active_waiting) // passive waiting
     426             :     {
     427      394245 :         for (size_t i = 0; i < this->buffer->size(); i++)
     428             :         {
     429      385214 :             std::unique_lock<std::mutex> lock((*this->mtx_push.get())[i]);
     430      385826 :             (*this->cnd_push.get())[i].notify_all();
     431      385248 :         }
     432      394095 :         for (size_t i = 0; i < this->buffer->size(); i++)
     433             :         {
     434      384701 :             std::unique_lock<std::mutex> lock((*this->mtx_pull.get())[i]);
     435      385653 :             (*this->cnd_pull.get())[i].notify_all();
     436      385369 :         }
     437             :     }
     438        9741 : }
     439             : 
     440             : void
     441        9848 : Adaptor_m_to_n::cancel_waiting()
     442             : {
     443        9848 :     this->send_cancel_signal();
     444        9846 :     this->wake_up();
     445        9848 : }

Generated by: LCOV version 1.14