LCOV - code coverage report
Current view: top level - src/Module/Adaptor - Adaptor_1_to_n.cpp (source / functions) Hit Total Coverage
Test: streampu_clean.info Lines: 68 85 80.0 %
Date: 2024-06-12 12:04:18 Functions: 13 15 86.7 %

          Line data    Source code
       1             : #include <algorithm>
       2             : 
       3             : #include "Module/Adaptor/Adaptor_1_to_n.hpp"
       4             : #include "Tools/Exception/exception.hpp"
       5             : 
       6             : using namespace spu;
       7             : using namespace spu::module;
       8             : 
       9             : Adaptor_1_to_n*
      10        1888 : Adaptor_1_to_n::clone() const
      11             : {
      12        1888 :     auto m = new Adaptor_1_to_n(*this);
      13        1888 :     m->deep_copy(*this);
      14        1888 :     return m;
      15             : }
      16             : 
      17             : void
      18           0 : Adaptor_1_to_n::push_1(const std::vector<const int8_t*>& in, const size_t frame_id)
      19             : {
      20           0 :     this->wait_push();
      21             : 
      22           0 :     for (size_t s = 0; s < this->n_sockets; s++)
      23             :     {
      24           0 :         int8_t* out = (int8_t*)this->get_empty_buffer(s);
      25             : 
      26           0 :         std::copy(
      27           0 :           in[s] + 0 * this->n_bytes[s], in[s] + this->get_n_frames() * this->n_bytes[s], out + 0 * this->n_bytes[s]);
      28             :     }
      29             : 
      30           0 :     this->wake_up_puller();
      31           0 : }
      32             : 
      33             : void
      34           0 : Adaptor_1_to_n::pull_n(const std::vector<int8_t*>& out, const size_t frame_id)
      35             : {
      36           0 :     this->wait_pull();
      37             : 
      38           0 :     for (size_t s = 0; s < this->n_sockets; s++)
      39             :     {
      40           0 :         const int8_t* in = (const int8_t*)this->get_filled_buffer(s);
      41             : 
      42           0 :         std::copy(
      43           0 :           in + 0 * this->n_bytes[s], in + this->get_n_frames() * this->n_bytes[s], out[s] + 0 * this->n_bytes[s]);
      44             :     }
      45             : 
      46           0 :     this->wake_up_pusher();
      47           0 : }
      48             : 
      49             : void
      50       50545 : Adaptor_1_to_n::wait_push()
      51             : {
      52       50545 :     if (this->active_waiting)
      53             :     {
      54        7904 :         while (this->is_full(this->cur_id) && !*this->waiting_canceled)
      55             :             ;
      56             :     }
      57             :     else // passive waiting
      58             :     {
      59       42922 :         if (this->is_full(this->cur_id) && !*this->waiting_canceled)
      60             :         {
      61        2878 :             std::unique_lock<std::mutex> lock(*this->mtx_put.get());
      62             :             (*this->cnd_put.get())
      63        9438 :               .wait(lock, [this]() { return !(this->is_full(this->cur_id) && !*this->waiting_canceled); });
      64        2878 :         }
      65             :     }
      66             : 
      67       50703 :     if (*this->waiting_canceled) throw tools::waiting_canceled(__FILE__, __LINE__, __func__);
      68       50643 : }
      69             : 
      70             : void
      71       36675 : Adaptor_1_to_n::wait_pull()
      72             : {
      73       36675 :     if (this->active_waiting)
      74             :     {
      75       96177 :         while (this->is_empty(this->id) && !*this->waiting_canceled)
      76             :             ;
      77             :     }
      78             :     else // passive waiting
      79             :     {
      80       29572 :         if (this->is_empty(this->id) && !*this->waiting_canceled)
      81             :         {
      82       19206 :             std::unique_lock<std::mutex> lock((*this->mtx_pull.get())[this->id]);
      83       19829 :             (*this->cnd_pull.get())[this->id].wait(
      84       60177 :               lock, [this]() { return !(this->is_empty(this->id) && !*this->waiting_canceled); });
      85       19290 :         }
      86             :     }
      87             : 
      88           0 :     if (this->is_empty(this->id) && *this->waiting_canceled)
      89        1493 :         throw tools::waiting_canceled(__FILE__, __LINE__, __func__);
      90       33023 : }
      91             : 
      92             : void*
      93       76310 : Adaptor_1_to_n::get_empty_buffer(const size_t sid)
      94             : {
      95       76310 :     return (void*)(*this->buffer)[this->cur_id][sid][(*this->last)[this->cur_id] % this->buffer_size];
      96             : }
      97             : 
      98             : void*
      99       57390 : Adaptor_1_to_n::get_filled_buffer(const size_t sid)
     100             : {
     101       57390 :     return (void*)(*this->buffer)[this->id][sid][(*this->first)[this->id] % this->buffer_size];
     102             : }
     103             : 
     104             : void*
     105       76693 : Adaptor_1_to_n::get_empty_buffer(const size_t sid, void* swap_buffer)
     106             : {
     107       76693 :     void* empty_buffer = (void*)(*this->buffer)[this->cur_id][sid][(*this->last)[this->cur_id] % this->buffer_size];
     108       76623 :     (*this->buffer)[this->cur_id][sid][(*this->last)[this->cur_id] % this->buffer_size] = (int8_t*)swap_buffer;
     109       76558 :     return empty_buffer;
     110             : }
     111             : 
     112             : void*
     113       58260 : Adaptor_1_to_n::get_filled_buffer(const size_t sid, void* swap_buffer)
     114             : {
     115       58260 :     void* filled_buffer = (void*)(*this->buffer)[this->id][sid][(*this->first)[this->id] % this->buffer_size];
     116       57324 :     (*this->buffer)[this->id][sid][(*this->first)[this->id] % this->buffer_size] = (int8_t*)swap_buffer;
     117       56925 :     return filled_buffer;
     118             : }
     119             : 
     120             : void
     121       50817 : Adaptor_1_to_n::wake_up_puller()
     122             : {
     123       50817 :     (*this->last)[this->cur_id]++;
     124             : 
     125       50995 :     if (!this->active_waiting) // passive waiting
     126             :     {
     127       43367 :         if (!this->is_empty(this->cur_id))
     128             :         {
     129       43227 :             std::lock_guard<std::mutex> lock((*this->mtx_pull.get())[this->cur_id]);
     130       43347 :             (*this->cnd_pull.get())[this->cur_id].notify_one();
     131       43379 :         }
     132             :     }
     133             : 
     134             :     do
     135             :     {
     136       51003 :         this->cur_id = (this->cur_id + 1) % this->buffer->size();
     137       50986 :     } while ((*this->buffer)[this->cur_id].size() == 0);
     138       50958 : }
     139             : 
     140             : void
     141       34819 : Adaptor_1_to_n::wake_up_pusher()
     142             : {
     143       34819 :     (*this->first)[this->id]++;
     144             : 
     145       36724 :     if (!this->active_waiting) // passive waiting
     146             :     {
     147       29304 :         if (!this->is_full(this->id))
     148             :         {
     149       28245 :             std::lock_guard<std::mutex> lock(*this->mtx_put.get());
     150       29411 :             (*this->cnd_put.get()).notify_one();
     151       29391 :         }
     152             :     }
     153       36821 : }
     154             : 
     155             : void
     156        5861 : Adaptor_1_to_n::wake_up()
     157             : {
     158        5861 :     if (!this->active_waiting) // passive waiting
     159             :     {
     160      225038 :         for (size_t i = 0; i < this->buffer->size(); i++)
     161      219273 :             if ((*this->buffer)[i].size() != 0)
     162             :             {
     163      218396 :                 std::unique_lock<std::mutex> lock((*this->mtx_pull.get())[i]);
     164      219923 :                 (*this->cnd_pull.get())[i].notify_all();
     165      219978 :             }
     166             : 
     167        4960 :         std::lock_guard<std::mutex> lock(*this->mtx_put.get());
     168        4960 :         (*this->cnd_put.get()).notify_all();
     169        4959 :     }
     170        5862 : }
     171             : 
     172             : void
     173        5862 : Adaptor_1_to_n::cancel_waiting()
     174             : {
     175        5862 :     this->send_cancel_signal();
     176        5861 :     this->wake_up();
     177        5862 : }

Generated by: LCOV version 1.14