LCOV - code coverage report
Current view: top level - src/Module/Adaptor - Adaptor_n_to_1.cpp (source / functions) Hit Total Coverage
Test: streampu_clean.info Lines: 69 85 81.2 %
Date: 2024-06-12 12:04:18 Functions: 13 15 86.7 %

          Line data    Source code
       1             : #include <algorithm>
       2             : 
       3             : #include "Module/Adaptor/Adaptor_n_to_1.hpp"
       4             : #include "Tools/Exception/exception.hpp"
       5             : 
       6             : using namespace spu;
       7             : using namespace spu::module;
       8             : 
       9             : Adaptor_n_to_1*
      10        1888 : Adaptor_n_to_1::clone() const
      11             : {
      12        1888 :     auto m = new Adaptor_n_to_1(*this);
      13        1888 :     m->deep_copy(*this);
      14        1888 :     return m;
      15             : }
      16             : 
      17             : void
      18           0 : Adaptor_n_to_1::push_n(const std::vector<const int8_t*>& in, const size_t frame_id)
      19             : {
      20           0 :     this->wait_push();
      21             : 
      22           0 :     for (size_t s = 0; s < this->n_sockets; s++)
      23             :     {
      24           0 :         int8_t* out = (int8_t*)this->get_empty_buffer(s);
      25             : 
      26           0 :         std::copy(
      27           0 :           in[s] + 0 * this->n_bytes[s], in[s] + this->get_n_frames() * this->n_bytes[s], out + 0 * this->n_bytes[s]);
      28             :     }
      29             : 
      30           0 :     this->wake_up_puller();
      31           0 : }
      32             : 
      33             : void
      34           0 : Adaptor_n_to_1::pull_1(const std::vector<int8_t*>& out, const size_t frame_id)
      35             : {
      36           0 :     this->wait_pull();
      37             : 
      38           0 :     for (size_t s = 0; s < this->n_sockets; s++)
      39             :     {
      40           0 :         const int8_t* in = (const int8_t*)this->get_filled_buffer(s);
      41             : 
      42           0 :         std::copy(
      43           0 :           in + 0 * this->n_bytes[s], in + this->get_n_frames() * this->n_bytes[s], out[s] + 0 * this->n_bytes[s]);
      44             :     }
      45             : 
      46           0 :     this->wake_up_pusher();
      47           0 : }
      48             : 
      49             : void
      50       30846 : Adaptor_n_to_1::wait_push()
      51             : {
      52       30846 :     if (this->active_waiting)
      53             :     {
      54        6963 :         while (this->is_full(this->id) && !*this->waiting_canceled)
      55             :             ;
      56             :     }
      57             :     else // passive waiting
      58             :     {
      59       24025 :         if (this->is_full(this->id) && !*this->waiting_canceled)
      60             :         {
      61         559 :             std::unique_lock<std::mutex> lock((*this->mtx_put.get())[this->id]);
      62         561 :             (*this->cnd_put.get())[this->id].wait(
      63        1172 :               lock, [this]() { return !(this->is_full(this->id) && !*this->waiting_canceled); });
      64         554 :         }
      65             :     }
      66             : 
      67       29928 :     if (*this->waiting_canceled) throw tools::waiting_canceled(__FILE__, __LINE__, __func__);
      68       29546 : }
      69             : 
      70             : void
      71       21890 : Adaptor_n_to_1::wait_pull()
      72             : {
      73       21890 :     if (this->active_waiting)
      74             :     {
      75      610410 :         while (this->is_empty(this->cur_id) && !*this->waiting_canceled)
      76             :             ;
      77             :     }
      78             :     else // passive waiting
      79             :     {
      80       17681 :         if (this->is_empty(this->cur_id) && !*this->waiting_canceled)
      81             :         {
      82        2228 :             std::unique_lock<std::mutex> lock(*this->mtx_pull.get());
      83             :             (*this->cnd_pull.get())
      84        9334 :               .wait(lock, [this]() { return !(this->is_empty(this->cur_id) && !*this->waiting_canceled); });
      85        2228 :         }
      86             :     }
      87             : 
      88       21831 :     if (this->is_empty(this->cur_id) && *this->waiting_canceled)
      89          42 :         throw tools::waiting_canceled(__FILE__, __LINE__, __func__);
      90       21679 : }
      91             : 
      92             : void*
      93       52512 : Adaptor_n_to_1::get_empty_buffer(const size_t sid)
      94             : {
      95       52512 :     return (void*)(*this->buffer)[this->id][sid][(*this->last)[this->id] % this->buffer_size];
      96             : }
      97             : 
      98             : void*
      99       39853 : Adaptor_n_to_1::get_filled_buffer(const size_t sid)
     100             : {
     101       39853 :     return (void*)(*this->buffer)[this->cur_id][sid][(*this->first)[this->cur_id] % this->buffer_size];
     102             : }
     103             : 
     104             : void*
     105       52951 : Adaptor_n_to_1::get_empty_buffer(const size_t sid, void* swap_buffer)
     106             : {
     107       52951 :     void* empty_buffer = (void*)(*this->buffer)[this->id][sid][(*this->last)[this->id] % this->buffer_size];
     108       51433 :     (*this->buffer)[this->id][sid][(*this->last)[this->id] % this->buffer_size] = (int8_t*)swap_buffer;
     109       50933 :     return empty_buffer;
     110             : }
     111             : 
     112             : void*
     113       39870 : Adaptor_n_to_1::get_filled_buffer(const size_t sid, void* swap_buffer)
     114             : {
     115       39870 :     void* filled_buffer = (void*)(*this->buffer)[this->cur_id][sid][(*this->first)[this->cur_id] % this->buffer_size];
     116       39777 :     (*this->buffer)[this->cur_id][sid][(*this->first)[this->cur_id] % this->buffer_size] = (int8_t*)swap_buffer;
     117       39743 :     return filled_buffer;
     118             : }
     119             : 
     120             : void
     121       30648 : Adaptor_n_to_1::wake_up_puller()
     122             : {
     123       30648 :     (*this->last)[this->id]++;
     124             : 
     125       31955 :     if (!this->active_waiting) // passive waiting
     126             :     {
     127       24454 :         if (!this->is_empty(this->id))
     128             :         {
     129       23683 :             std::lock_guard<std::mutex> lock(*this->mtx_pull.get());
     130       24547 :             (*this->cnd_pull.get()).notify_one();
     131       24541 :         }
     132             :     }
     133       32042 : }
     134             : 
     135             : void
     136       21843 : Adaptor_n_to_1::wake_up_pusher()
     137             : {
     138       21843 :     (*this->first)[this->cur_id]++;
     139             : 
     140       21908 :     if (!this->active_waiting) // passive waiting
     141             :     {
     142       17701 :         if (!this->is_full(this->cur_id))
     143             :         {
     144       17616 :             std::lock_guard<std::mutex> lock((*this->mtx_put.get())[this->cur_id]);
     145       17694 :             (*this->cnd_put.get())[this->cur_id].notify_one();
     146       17688 :         }
     147             :     }
     148             : 
     149             :     do
     150             :     {
     151       21977 :         this->cur_id = (this->cur_id + 1) % this->buffer->size();
     152       21875 :     } while ((*this->buffer)[this->cur_id].size() == 0);
     153       21777 : }
     154             : 
     155             : void
     156        2056 : Adaptor_n_to_1::wake_up()
     157             : {
     158        2056 :     if (!this->active_waiting) // passive waiting
     159             :     {
     160       77589 :         for (size_t i = 0; i < this->buffer->size(); i++)
     161       75845 :             if ((*this->buffer)[i].size() != 0)
     162             :             {
     163       75837 :                 std::unique_lock<std::mutex> lock((*this->mtx_put.get())[i]);
     164       75853 :                 (*this->cnd_put.get())[i].notify_all();
     165       75847 :             }
     166             : 
     167        1738 :         std::lock_guard<std::mutex> lock(*this->mtx_pull.get());
     168        1738 :         (*this->cnd_pull.get()).notify_all();
     169        1738 :     }
     170        2056 : }
     171             : 
     172             : void
     173        2056 : Adaptor_n_to_1::cancel_waiting()
     174             : {
     175        2056 :     this->send_cancel_signal();
     176        2056 :     this->wake_up();
     177        2056 : }

Generated by: LCOV version 1.14