Line data Source code
1 : #include <algorithm> 2 : 3 : #include "Module/Stateful/Adaptor/Adaptor_1_to_n.hpp" 4 : #include "Tools/Exception/exception.hpp" 5 : 6 : using namespace spu; 7 : using namespace spu::module; 8 : 9 : Adaptor_1_to_n* 10 2124 : Adaptor_1_to_n::clone() const 11 : { 12 2124 : auto m = new Adaptor_1_to_n(*this); 13 2124 : m->deep_copy(*this); 14 2124 : return m; 15 : } 16 : 17 : void 18 0 : Adaptor_1_to_n::push_1(const std::vector<const int8_t*>& in, const size_t frame_id) 19 : { 20 0 : this->wait_push(); 21 : 22 0 : for (size_t s = 0; s < this->n_sockets; s++) 23 : { 24 0 : int8_t* out = (int8_t*)this->get_empty_buffer(s); 25 : 26 0 : std::copy( 27 0 : in[s] + 0 * this->n_bytes[s], in[s] + this->get_n_frames() * this->n_bytes[s], out + 0 * this->n_bytes[s]); 28 : } 29 : 30 0 : this->wake_up_puller(); 31 0 : } 32 : 33 : void 34 0 : Adaptor_1_to_n::pull_n(const std::vector<int8_t*>& out, const size_t frame_id) 35 : { 36 0 : this->wait_pull(); 37 : 38 0 : for (size_t s = 0; s < this->n_sockets; s++) 39 : { 40 0 : const int8_t* in = (const int8_t*)this->get_filled_buffer(s); 41 : 42 0 : std::copy( 43 0 : in + 0 * this->n_bytes[s], in + this->get_n_frames() * this->n_bytes[s], out[s] + 0 * this->n_bytes[s]); 44 : } 45 : 46 0 : this->wake_up_pusher(); 47 0 : } 48 : 49 : void 50 340223 : Adaptor_1_to_n::wait_push() 51 : { 52 340223 : if (this->active_waiting) 53 : { 54 16482 : while (this->is_full(this->cur_id) && !*this->waiting_canceled) 55 : ; 56 : } 57 : else // passive waiting 58 : { 59 332468 : if (this->is_full(this->cur_id) && !*this->waiting_canceled) 60 : { 61 123390 : std::unique_lock<std::mutex> lock(*this->mtx_put.get()); 62 : (*this->cnd_put.get()) 63 370679 : .wait(lock, [this]() { return !(this->is_full(this->cur_id) && !*this->waiting_canceled); }); 64 123305 : } 65 : } 66 : 67 344000 : if (*this->waiting_canceled) throw tools::waiting_canceled(__FILE__, __LINE__, __func__); 68 342173 : } 69 : 70 : void 71 331874 : Adaptor_1_to_n::wait_pull() 72 : { 73 331874 : if (this->active_waiting) 74 : { 75 202248 : while (this->is_empty(this->id) && !*this->waiting_canceled) 76 : ; 77 : } 78 : else // passive waiting 79 : { 80 324545 : if (this->is_empty(this->id) && !*this->waiting_canceled) 81 : { 82 182391 : std::unique_lock<std::mutex> lock((*this->mtx_pull.get())[this->id]); 83 185737 : (*this->cnd_pull.get())[this->id].wait( 84 386073 : lock, [this]() { return !(this->is_empty(this->id) && !*this->waiting_canceled); }); 85 183568 : } 86 : } 87 : 88 266391 : if (this->is_empty(this->id) && *this->waiting_canceled) 89 1465 : throw tools::waiting_canceled(__FILE__, __LINE__, __func__); 90 328067 : } 91 : 92 : void* 93 658059 : Adaptor_1_to_n::get_empty_buffer(const size_t sid) 94 : { 95 658059 : return (void*)(*this->buffer)[this->cur_id][sid][(*this->last)[this->cur_id] % this->buffer_size]; 96 : } 97 : 98 : void* 99 645450 : Adaptor_1_to_n::get_filled_buffer(const size_t sid) 100 : { 101 645450 : return (void*)(*this->buffer)[this->id][sid][(*this->first)[this->id] % this->buffer_size]; 102 : } 103 : 104 : void* 105 655280 : Adaptor_1_to_n::get_empty_buffer(const size_t sid, void* swap_buffer) 106 : { 107 655280 : void* empty_buffer = (void*)(*this->buffer)[this->cur_id][sid][(*this->last)[this->cur_id] % this->buffer_size]; 108 649494 : (*this->buffer)[this->cur_id][sid][(*this->last)[this->cur_id] % this->buffer_size] = (int8_t*)swap_buffer; 109 646555 : return empty_buffer; 110 : } 111 : 112 : void* 113 645268 : Adaptor_1_to_n::get_filled_buffer(const size_t sid, void* swap_buffer) 114 : { 115 645268 : void* filled_buffer = (void*)(*this->buffer)[this->id][sid][(*this->first)[this->id] % this->buffer_size]; 116 642317 : (*this->buffer)[this->id][sid][(*this->first)[this->id] % this->buffer_size] = (int8_t*)swap_buffer; 117 640212 : return filled_buffer; 118 : } 119 : 120 : void 121 343518 : Adaptor_1_to_n::wake_up_puller() 122 : { 123 343518 : (*this->last)[this->cur_id]++; 124 : 125 348306 : if (!this->active_waiting) // passive waiting 126 : { 127 340628 : if (!this->is_empty(this->cur_id)) 128 : { 129 335538 : std::lock_guard<std::mutex> lock((*this->mtx_pull.get())[this->cur_id]); 130 340255 : (*this->cnd_pull.get())[this->cur_id].notify_one(); 131 341076 : } 132 : } 133 : 134 : do 135 : { 136 453373 : this->cur_id = (this->cur_id + 1) % this->buffer->size(); 137 452162 : } while ((*this->buffer)[this->cur_id].size() == 0); 138 346402 : } 139 : 140 : void 141 331862 : Adaptor_1_to_n::wake_up_pusher() 142 : { 143 331862 : (*this->first)[this->id]++; 144 : 145 335718 : if (!this->active_waiting) // passive waiting 146 : { 147 328194 : if (!this->is_full(this->id)) 148 : { 149 324778 : std::lock_guard<std::mutex> lock(*this->mtx_put.get()); 150 328148 : (*this->cnd_put.get()).notify_one(); 151 328213 : } 152 : } 153 335899 : } 154 : 155 : void 156 5910 : Adaptor_1_to_n::wake_up() 157 : { 158 5910 : if (!this->active_waiting) // passive waiting 159 : { 160 225550 : for (size_t i = 0; i < this->buffer->size(); i++) 161 219975 : if ((*this->buffer)[i].size() != 0) 162 : { 163 219144 : std::unique_lock<std::mutex> lock((*this->mtx_pull.get())[i]); 164 220511 : (*this->cnd_pull.get())[i].notify_all(); 165 220400 : } 166 : 167 5009 : std::lock_guard<std::mutex> lock(*this->mtx_put.get()); 168 5010 : (*this->cnd_put.get()).notify_all(); 169 5010 : } 170 5914 : } 171 : 172 : void 173 5912 : Adaptor_1_to_n::cancel_waiting() 174 : { 175 5912 : this->send_cancel_signal(); 176 5912 : this->wake_up(); 177 5913 : }