LCOV - code coverage report
Current view: top level - src/Module/Stateful/Adaptor - Adaptor_1_to_n.cpp (source / functions) Hit Total Coverage
Test: streampu_clean.info Lines: 69 85 81.2 %
Date: 2025-01-11 12:25:42 Functions: 13 15 86.7 %

          Line data    Source code
       1             : #include <algorithm>
       2             : 
       3             : #include "Module/Stateful/Adaptor/Adaptor_1_to_n.hpp"
       4             : #include "Tools/Exception/exception.hpp"
       5             : 
       6             : using namespace spu;
       7             : using namespace spu::module;
       8             : 
       9             : Adaptor_1_to_n*
      10        2124 : Adaptor_1_to_n::clone() const
      11             : {
      12        2124 :     auto m = new Adaptor_1_to_n(*this);
      13        2124 :     m->deep_copy(*this);
      14        2124 :     return m;
      15             : }
      16             : 
      17             : void
      18           0 : Adaptor_1_to_n::push_1(const std::vector<const int8_t*>& in, const size_t frame_id)
      19             : {
      20           0 :     this->wait_push();
      21             : 
      22           0 :     for (size_t s = 0; s < this->n_sockets; s++)
      23             :     {
      24           0 :         int8_t* out = (int8_t*)this->get_empty_buffer(s);
      25             : 
      26           0 :         std::copy(
      27           0 :           in[s] + 0 * this->n_bytes[s], in[s] + this->get_n_frames() * this->n_bytes[s], out + 0 * this->n_bytes[s]);
      28             :     }
      29             : 
      30           0 :     this->wake_up_puller();
      31           0 : }
      32             : 
      33             : void
      34           0 : Adaptor_1_to_n::pull_n(const std::vector<int8_t*>& out, const size_t frame_id)
      35             : {
      36           0 :     this->wait_pull();
      37             : 
      38           0 :     for (size_t s = 0; s < this->n_sockets; s++)
      39             :     {
      40           0 :         const int8_t* in = (const int8_t*)this->get_filled_buffer(s);
      41             : 
      42           0 :         std::copy(
      43           0 :           in + 0 * this->n_bytes[s], in + this->get_n_frames() * this->n_bytes[s], out[s] + 0 * this->n_bytes[s]);
      44             :     }
      45             : 
      46           0 :     this->wake_up_pusher();
      47           0 : }
      48             : 
      49             : void
      50      340223 : Adaptor_1_to_n::wait_push()
      51             : {
      52      340223 :     if (this->active_waiting)
      53             :     {
      54       16482 :         while (this->is_full(this->cur_id) && !*this->waiting_canceled)
      55             :             ;
      56             :     }
      57             :     else // passive waiting
      58             :     {
      59      332468 :         if (this->is_full(this->cur_id) && !*this->waiting_canceled)
      60             :         {
      61      123390 :             std::unique_lock<std::mutex> lock(*this->mtx_put.get());
      62             :             (*this->cnd_put.get())
      63      370679 :               .wait(lock, [this]() { return !(this->is_full(this->cur_id) && !*this->waiting_canceled); });
      64      123305 :         }
      65             :     }
      66             : 
      67      344000 :     if (*this->waiting_canceled) throw tools::waiting_canceled(__FILE__, __LINE__, __func__);
      68      342173 : }
      69             : 
      70             : void
      71      331874 : Adaptor_1_to_n::wait_pull()
      72             : {
      73      331874 :     if (this->active_waiting)
      74             :     {
      75      202248 :         while (this->is_empty(this->id) && !*this->waiting_canceled)
      76             :             ;
      77             :     }
      78             :     else // passive waiting
      79             :     {
      80      324545 :         if (this->is_empty(this->id) && !*this->waiting_canceled)
      81             :         {
      82      182391 :             std::unique_lock<std::mutex> lock((*this->mtx_pull.get())[this->id]);
      83      185737 :             (*this->cnd_pull.get())[this->id].wait(
      84      386073 :               lock, [this]() { return !(this->is_empty(this->id) && !*this->waiting_canceled); });
      85      183568 :         }
      86             :     }
      87             : 
      88      266391 :     if (this->is_empty(this->id) && *this->waiting_canceled)
      89        1465 :         throw tools::waiting_canceled(__FILE__, __LINE__, __func__);
      90      328067 : }
      91             : 
      92             : void*
      93      658059 : Adaptor_1_to_n::get_empty_buffer(const size_t sid)
      94             : {
      95      658059 :     return (void*)(*this->buffer)[this->cur_id][sid][(*this->last)[this->cur_id] % this->buffer_size];
      96             : }
      97             : 
      98             : void*
      99      645450 : Adaptor_1_to_n::get_filled_buffer(const size_t sid)
     100             : {
     101      645450 :     return (void*)(*this->buffer)[this->id][sid][(*this->first)[this->id] % this->buffer_size];
     102             : }
     103             : 
     104             : void*
     105      655280 : Adaptor_1_to_n::get_empty_buffer(const size_t sid, void* swap_buffer)
     106             : {
     107      655280 :     void* empty_buffer = (void*)(*this->buffer)[this->cur_id][sid][(*this->last)[this->cur_id] % this->buffer_size];
     108      649494 :     (*this->buffer)[this->cur_id][sid][(*this->last)[this->cur_id] % this->buffer_size] = (int8_t*)swap_buffer;
     109      646555 :     return empty_buffer;
     110             : }
     111             : 
     112             : void*
     113      645268 : Adaptor_1_to_n::get_filled_buffer(const size_t sid, void* swap_buffer)
     114             : {
     115      645268 :     void* filled_buffer = (void*)(*this->buffer)[this->id][sid][(*this->first)[this->id] % this->buffer_size];
     116      642317 :     (*this->buffer)[this->id][sid][(*this->first)[this->id] % this->buffer_size] = (int8_t*)swap_buffer;
     117      640212 :     return filled_buffer;
     118             : }
     119             : 
     120             : void
     121      343518 : Adaptor_1_to_n::wake_up_puller()
     122             : {
     123      343518 :     (*this->last)[this->cur_id]++;
     124             : 
     125      348306 :     if (!this->active_waiting) // passive waiting
     126             :     {
     127      340628 :         if (!this->is_empty(this->cur_id))
     128             :         {
     129      335538 :             std::lock_guard<std::mutex> lock((*this->mtx_pull.get())[this->cur_id]);
     130      340255 :             (*this->cnd_pull.get())[this->cur_id].notify_one();
     131      341076 :         }
     132             :     }
     133             : 
     134             :     do
     135             :     {
     136      453373 :         this->cur_id = (this->cur_id + 1) % this->buffer->size();
     137      452162 :     } while ((*this->buffer)[this->cur_id].size() == 0);
     138      346402 : }
     139             : 
     140             : void
     141      331862 : Adaptor_1_to_n::wake_up_pusher()
     142             : {
     143      331862 :     (*this->first)[this->id]++;
     144             : 
     145      335718 :     if (!this->active_waiting) // passive waiting
     146             :     {
     147      328194 :         if (!this->is_full(this->id))
     148             :         {
     149      324778 :             std::lock_guard<std::mutex> lock(*this->mtx_put.get());
     150      328148 :             (*this->cnd_put.get()).notify_one();
     151      328213 :         }
     152             :     }
     153      335899 : }
     154             : 
     155             : void
     156        5910 : Adaptor_1_to_n::wake_up()
     157             : {
     158        5910 :     if (!this->active_waiting) // passive waiting
     159             :     {
     160      225550 :         for (size_t i = 0; i < this->buffer->size(); i++)
     161      219975 :             if ((*this->buffer)[i].size() != 0)
     162             :             {
     163      219144 :                 std::unique_lock<std::mutex> lock((*this->mtx_pull.get())[i]);
     164      220511 :                 (*this->cnd_pull.get())[i].notify_all();
     165      220400 :             }
     166             : 
     167        5009 :         std::lock_guard<std::mutex> lock(*this->mtx_put.get());
     168        5010 :         (*this->cnd_put.get()).notify_all();
     169        5010 :     }
     170        5914 : }
     171             : 
     172             : void
     173        5912 : Adaptor_1_to_n::cancel_waiting()
     174             : {
     175        5912 :     this->send_cancel_signal();
     176        5912 :     this->wake_up();
     177        5913 : }

Generated by: LCOV version 1.14