LCOV - code coverage report
Current view: top level - src/Module/Stateful/Adaptor - Adaptor_n_to_1.cpp (source / functions) Hit Total Coverage
Test: streampu_clean.info Lines: 69 85 81.2 %
Date: 2025-01-11 12:25:42 Functions: 13 15 86.7 %

          Line data    Source code
       1             : #include <algorithm>
       2             : 
       3             : #include "Module/Stateful/Adaptor/Adaptor_n_to_1.hpp"
       4             : #include "Tools/Exception/exception.hpp"
       5             : 
       6             : using namespace spu;
       7             : using namespace spu::module;
       8             : 
       9             : Adaptor_n_to_1*
      10        2072 : Adaptor_n_to_1::clone() const
      11             : {
      12        2072 :     auto m = new Adaptor_n_to_1(*this);
      13        2072 :     m->deep_copy(*this);
      14        2072 :     return m;
      15             : }
      16             : 
      17             : void
      18           0 : Adaptor_n_to_1::push_n(const std::vector<const int8_t*>& in, const size_t frame_id)
      19             : {
      20           0 :     this->wait_push();
      21             : 
      22           0 :     for (size_t s = 0; s < this->n_sockets; s++)
      23             :     {
      24           0 :         int8_t* out = (int8_t*)this->get_empty_buffer(s);
      25             : 
      26           0 :         std::copy(
      27           0 :           in[s] + 0 * this->n_bytes[s], in[s] + this->get_n_frames() * this->n_bytes[s], out + 0 * this->n_bytes[s]);
      28             :     }
      29             : 
      30           0 :     this->wake_up_puller();
      31           0 : }
      32             : 
      33             : void
      34           0 : Adaptor_n_to_1::pull_1(const std::vector<int8_t*>& out, const size_t frame_id)
      35             : {
      36           0 :     this->wait_pull();
      37             : 
      38           0 :     for (size_t s = 0; s < this->n_sockets; s++)
      39             :     {
      40           0 :         const int8_t* in = (const int8_t*)this->get_filled_buffer(s);
      41             : 
      42           0 :         std::copy(
      43           0 :           in + 0 * this->n_bytes[s], in + this->get_n_frames() * this->n_bytes[s], out[s] + 0 * this->n_bytes[s]);
      44             :     }
      45             : 
      46           0 :     this->wake_up_pusher();
      47           0 : }
      48             : 
      49             : void
      50      229531 : Adaptor_n_to_1::wait_push()
      51             : {
      52      229531 :     if (this->active_waiting)
      53             :     {
      54        7501 :         while (this->is_full(this->id) && !*this->waiting_canceled)
      55             :             ;
      56             :     }
      57             :     else // passive waiting
      58             :     {
      59      222350 :         if (this->is_full(this->id) && !*this->waiting_canceled)
      60             :         {
      61      128399 :             std::unique_lock<std::mutex> lock((*this->mtx_put.get())[this->id]);
      62      128831 :             (*this->cnd_put.get())[this->id].wait(
      63      257188 :               lock, [this]() { return !(this->is_full(this->id) && !*this->waiting_canceled); });
      64      128699 :         }
      65             :     }
      66             : 
      67      229262 :     if (*this->waiting_canceled) throw tools::waiting_canceled(__FILE__, __LINE__, __func__);
      68      228723 : }
      69             : 
      70             : void
      71      217485 : Adaptor_n_to_1::wait_pull()
      72             : {
      73      217485 :     if (this->active_waiting)
      74             :     {
      75     1213842 :         while (this->is_empty(this->cur_id) && !*this->waiting_canceled)
      76             :             ;
      77             :     }
      78             :     else // passive waiting
      79             :     {
      80      213251 :         if (this->is_empty(this->cur_id) && !*this->waiting_canceled)
      81             :         {
      82       47979 :             std::unique_lock<std::mutex> lock(*this->mtx_pull.get());
      83             :             (*this->cnd_pull.get())
      84      144265 :               .wait(lock, [this]() { return !(this->is_empty(this->cur_id) && !*this->waiting_canceled); });
      85       47977 :         }
      86             :     }
      87             : 
      88      216605 :     if (this->is_empty(this->cur_id) && *this->waiting_canceled)
      89          43 :         throw tools::waiting_canceled(__FILE__, __LINE__, __func__);
      90      215627 : }
      91             : 
      92             : void*
      93      348679 : Adaptor_n_to_1::get_empty_buffer(const size_t sid)
      94             : {
      95      348679 :     return (void*)(*this->buffer)[this->id][sid][(*this->last)[this->id] % this->buffer_size];
      96             : }
      97             : 
      98             : void*
      99      331202 : Adaptor_n_to_1::get_filled_buffer(const size_t sid)
     100             : {
     101      331202 :     return (void*)(*this->buffer)[this->cur_id][sid][(*this->first)[this->cur_id] % this->buffer_size];
     102             : }
     103             : 
     104             : void*
     105      349113 : Adaptor_n_to_1::get_empty_buffer(const size_t sid, void* swap_buffer)
     106             : {
     107      349113 :     void* empty_buffer = (void*)(*this->buffer)[this->id][sid][(*this->last)[this->id] % this->buffer_size];
     108      347647 :     (*this->buffer)[this->id][sid][(*this->last)[this->id] % this->buffer_size] = (int8_t*)swap_buffer;
     109      347063 :     return empty_buffer;
     110             : }
     111             : 
     112             : void*
     113      332388 : Adaptor_n_to_1::get_filled_buffer(const size_t sid, void* swap_buffer)
     114             : {
     115      332388 :     void* filled_buffer = (void*)(*this->buffer)[this->cur_id][sid][(*this->first)[this->cur_id] % this->buffer_size];
     116      330907 :     (*this->buffer)[this->cur_id][sid][(*this->first)[this->cur_id] % this->buffer_size] = (int8_t*)swap_buffer;
     117      329764 :     return filled_buffer;
     118             : }
     119             : 
     120             : void
     121      229642 : Adaptor_n_to_1::wake_up_puller()
     122             : {
     123      229642 :     (*this->last)[this->id]++;
     124             : 
     125      230909 :     if (!this->active_waiting) // passive waiting
     126             :     {
     127      223331 :         if (!this->is_empty(this->id))
     128             :         {
     129      222477 :             std::lock_guard<std::mutex> lock(*this->mtx_pull.get());
     130      223386 :             (*this->cnd_pull.get()).notify_one();
     131      223378 :         }
     132             :     }
     133      230973 : }
     134             : 
     135             : void
     136      220776 : Adaptor_n_to_1::wake_up_pusher()
     137             : {
     138      220776 :     (*this->first)[this->cur_id]++;
     139             : 
     140      221189 :     if (!this->active_waiting) // passive waiting
     141             :     {
     142      216978 :         if (!this->is_full(this->cur_id))
     143             :         {
     144      215372 :             std::lock_guard<std::mutex> lock((*this->mtx_put.get())[this->cur_id]);
     145      216906 :             (*this->cnd_put.get())[this->cur_id].notify_one();
     146      217033 :         }
     147             :     }
     148             : 
     149             :     do
     150             :     {
     151      221232 :         this->cur_id = (this->cur_id + 1) % this->buffer->size();
     152      221071 :     } while ((*this->buffer)[this->cur_id].size() == 0);
     153      220556 : }
     154             : 
     155             : void
     156        2072 : Adaptor_n_to_1::wake_up()
     157             : {
     158        2072 :     if (!this->active_waiting) // passive waiting
     159             :     {
     160       77672 :         for (size_t i = 0; i < this->buffer->size(); i++)
     161       75918 :             if ((*this->buffer)[i].size() != 0)
     162             :             {
     163       75918 :                 std::unique_lock<std::mutex> lock((*this->mtx_put.get())[i]);
     164       75918 :                 (*this->cnd_put.get())[i].notify_all();
     165       75918 :             }
     166             : 
     167        1754 :         std::lock_guard<std::mutex> lock(*this->mtx_pull.get());
     168        1754 :         (*this->cnd_pull.get()).notify_all();
     169        1754 :     }
     170        2072 : }
     171             : 
     172             : void
     173        2072 : Adaptor_n_to_1::cancel_waiting()
     174             : {
     175        2072 :     this->send_cancel_signal();
     176        2072 :     this->wake_up();
     177        2072 : }

Generated by: LCOV version 1.14