Line data Source code
1 : #include <algorithm> 2 : 3 : #include "Module/Stateful/Adaptor/Adaptor_n_to_1.hpp" 4 : #include "Tools/Exception/exception.hpp" 5 : 6 : using namespace spu; 7 : using namespace spu::module; 8 : 9 : Adaptor_n_to_1* 10 2072 : Adaptor_n_to_1::clone() const 11 : { 12 2072 : auto m = new Adaptor_n_to_1(*this); 13 2072 : m->deep_copy(*this); 14 2072 : return m; 15 : } 16 : 17 : void 18 0 : Adaptor_n_to_1::push_n(const std::vector<const int8_t*>& in, const size_t frame_id) 19 : { 20 0 : this->wait_push(); 21 : 22 0 : for (size_t s = 0; s < this->n_sockets; s++) 23 : { 24 0 : int8_t* out = (int8_t*)this->get_empty_buffer(s); 25 : 26 0 : std::copy( 27 0 : in[s] + 0 * this->n_bytes[s], in[s] + this->get_n_frames() * this->n_bytes[s], out + 0 * this->n_bytes[s]); 28 : } 29 : 30 0 : this->wake_up_puller(); 31 0 : } 32 : 33 : void 34 0 : Adaptor_n_to_1::pull_1(const std::vector<int8_t*>& out, const size_t frame_id) 35 : { 36 0 : this->wait_pull(); 37 : 38 0 : for (size_t s = 0; s < this->n_sockets; s++) 39 : { 40 0 : const int8_t* in = (const int8_t*)this->get_filled_buffer(s); 41 : 42 0 : std::copy( 43 0 : in + 0 * this->n_bytes[s], in + this->get_n_frames() * this->n_bytes[s], out[s] + 0 * this->n_bytes[s]); 44 : } 45 : 46 0 : this->wake_up_pusher(); 47 0 : } 48 : 49 : void 50 229531 : Adaptor_n_to_1::wait_push() 51 : { 52 229531 : if (this->active_waiting) 53 : { 54 7501 : while (this->is_full(this->id) && !*this->waiting_canceled) 55 : ; 56 : } 57 : else // passive waiting 58 : { 59 222350 : if (this->is_full(this->id) && !*this->waiting_canceled) 60 : { 61 128399 : std::unique_lock<std::mutex> lock((*this->mtx_put.get())[this->id]); 62 128831 : (*this->cnd_put.get())[this->id].wait( 63 257188 : lock, [this]() { return !(this->is_full(this->id) && !*this->waiting_canceled); }); 64 128699 : } 65 : } 66 : 67 229262 : if (*this->waiting_canceled) throw tools::waiting_canceled(__FILE__, __LINE__, __func__); 68 228723 : } 69 : 70 : void 71 217485 : Adaptor_n_to_1::wait_pull() 72 : { 73 217485 : if (this->active_waiting) 74 : { 75 1213842 : while (this->is_empty(this->cur_id) && !*this->waiting_canceled) 76 : ; 77 : } 78 : else // passive waiting 79 : { 80 213251 : if (this->is_empty(this->cur_id) && !*this->waiting_canceled) 81 : { 82 47979 : std::unique_lock<std::mutex> lock(*this->mtx_pull.get()); 83 : (*this->cnd_pull.get()) 84 144265 : .wait(lock, [this]() { return !(this->is_empty(this->cur_id) && !*this->waiting_canceled); }); 85 47977 : } 86 : } 87 : 88 216605 : if (this->is_empty(this->cur_id) && *this->waiting_canceled) 89 43 : throw tools::waiting_canceled(__FILE__, __LINE__, __func__); 90 215627 : } 91 : 92 : void* 93 348679 : Adaptor_n_to_1::get_empty_buffer(const size_t sid) 94 : { 95 348679 : return (void*)(*this->buffer)[this->id][sid][(*this->last)[this->id] % this->buffer_size]; 96 : } 97 : 98 : void* 99 331202 : Adaptor_n_to_1::get_filled_buffer(const size_t sid) 100 : { 101 331202 : return (void*)(*this->buffer)[this->cur_id][sid][(*this->first)[this->cur_id] % this->buffer_size]; 102 : } 103 : 104 : void* 105 349113 : Adaptor_n_to_1::get_empty_buffer(const size_t sid, void* swap_buffer) 106 : { 107 349113 : void* empty_buffer = (void*)(*this->buffer)[this->id][sid][(*this->last)[this->id] % this->buffer_size]; 108 347647 : (*this->buffer)[this->id][sid][(*this->last)[this->id] % this->buffer_size] = (int8_t*)swap_buffer; 109 347063 : return empty_buffer; 110 : } 111 : 112 : void* 113 332388 : Adaptor_n_to_1::get_filled_buffer(const size_t sid, void* swap_buffer) 114 : { 115 332388 : void* filled_buffer = (void*)(*this->buffer)[this->cur_id][sid][(*this->first)[this->cur_id] % this->buffer_size]; 116 330907 : (*this->buffer)[this->cur_id][sid][(*this->first)[this->cur_id] % this->buffer_size] = (int8_t*)swap_buffer; 117 329764 : return filled_buffer; 118 : } 119 : 120 : void 121 229642 : Adaptor_n_to_1::wake_up_puller() 122 : { 123 229642 : (*this->last)[this->id]++; 124 : 125 230909 : if (!this->active_waiting) // passive waiting 126 : { 127 223331 : if (!this->is_empty(this->id)) 128 : { 129 222477 : std::lock_guard<std::mutex> lock(*this->mtx_pull.get()); 130 223386 : (*this->cnd_pull.get()).notify_one(); 131 223378 : } 132 : } 133 230973 : } 134 : 135 : void 136 220776 : Adaptor_n_to_1::wake_up_pusher() 137 : { 138 220776 : (*this->first)[this->cur_id]++; 139 : 140 221189 : if (!this->active_waiting) // passive waiting 141 : { 142 216978 : if (!this->is_full(this->cur_id)) 143 : { 144 215372 : std::lock_guard<std::mutex> lock((*this->mtx_put.get())[this->cur_id]); 145 216906 : (*this->cnd_put.get())[this->cur_id].notify_one(); 146 217033 : } 147 : } 148 : 149 : do 150 : { 151 221232 : this->cur_id = (this->cur_id + 1) % this->buffer->size(); 152 221071 : } while ((*this->buffer)[this->cur_id].size() == 0); 153 220556 : } 154 : 155 : void 156 2072 : Adaptor_n_to_1::wake_up() 157 : { 158 2072 : if (!this->active_waiting) // passive waiting 159 : { 160 77672 : for (size_t i = 0; i < this->buffer->size(); i++) 161 75918 : if ((*this->buffer)[i].size() != 0) 162 : { 163 75918 : std::unique_lock<std::mutex> lock((*this->mtx_put.get())[i]); 164 75918 : (*this->cnd_put.get())[i].notify_all(); 165 75918 : } 166 : 167 1754 : std::lock_guard<std::mutex> lock(*this->mtx_pull.get()); 168 1754 : (*this->cnd_pull.get()).notify_all(); 169 1754 : } 170 2072 : } 171 : 172 : void 173 2072 : Adaptor_n_to_1::cancel_waiting() 174 : { 175 2072 : this->send_cancel_signal(); 176 2072 : this->wake_up(); 177 2072 : }