Line data Source code
1 : #include <algorithm> 2 : 3 : #include "Module/Adaptor/Adaptor_1_to_n.hpp" 4 : #include "Tools/Exception/exception.hpp" 5 : 6 : using namespace spu; 7 : using namespace spu::module; 8 : 9 : Adaptor_1_to_n* 10 1888 : Adaptor_1_to_n::clone() const 11 : { 12 1888 : auto m = new Adaptor_1_to_n(*this); 13 1888 : m->deep_copy(*this); 14 1888 : return m; 15 : } 16 : 17 : void 18 0 : Adaptor_1_to_n::push_1(const std::vector<const int8_t*>& in, const size_t frame_id) 19 : { 20 0 : this->wait_push(); 21 : 22 0 : for (size_t s = 0; s < this->n_sockets; s++) 23 : { 24 0 : int8_t* out = (int8_t*)this->get_empty_buffer(s); 25 : 26 0 : std::copy( 27 0 : in[s] + 0 * this->n_bytes[s], in[s] + this->get_n_frames() * this->n_bytes[s], out + 0 * this->n_bytes[s]); 28 : } 29 : 30 0 : this->wake_up_puller(); 31 0 : } 32 : 33 : void 34 0 : Adaptor_1_to_n::pull_n(const std::vector<int8_t*>& out, const size_t frame_id) 35 : { 36 0 : this->wait_pull(); 37 : 38 0 : for (size_t s = 0; s < this->n_sockets; s++) 39 : { 40 0 : const int8_t* in = (const int8_t*)this->get_filled_buffer(s); 41 : 42 0 : std::copy( 43 0 : in + 0 * this->n_bytes[s], in + this->get_n_frames() * this->n_bytes[s], out[s] + 0 * this->n_bytes[s]); 44 : } 45 : 46 0 : this->wake_up_pusher(); 47 0 : } 48 : 49 : void 50 50545 : Adaptor_1_to_n::wait_push() 51 : { 52 50545 : if (this->active_waiting) 53 : { 54 7904 : while (this->is_full(this->cur_id) && !*this->waiting_canceled) 55 : ; 56 : } 57 : else // passive waiting 58 : { 59 42922 : if (this->is_full(this->cur_id) && !*this->waiting_canceled) 60 : { 61 2878 : std::unique_lock<std::mutex> lock(*this->mtx_put.get()); 62 : (*this->cnd_put.get()) 63 9438 : .wait(lock, [this]() { return !(this->is_full(this->cur_id) && !*this->waiting_canceled); }); 64 2878 : } 65 : } 66 : 67 50703 : if (*this->waiting_canceled) throw tools::waiting_canceled(__FILE__, __LINE__, __func__); 68 50643 : } 69 : 70 : void 71 36675 : Adaptor_1_to_n::wait_pull() 72 : { 73 36675 : if (this->active_waiting) 74 : { 75 96177 : while (this->is_empty(this->id) && !*this->waiting_canceled) 76 : ; 77 : } 78 : else // passive waiting 79 : { 80 29572 : if (this->is_empty(this->id) && !*this->waiting_canceled) 81 : { 82 19206 : std::unique_lock<std::mutex> lock((*this->mtx_pull.get())[this->id]); 83 19829 : (*this->cnd_pull.get())[this->id].wait( 84 60177 : lock, [this]() { return !(this->is_empty(this->id) && !*this->waiting_canceled); }); 85 19290 : } 86 : } 87 : 88 0 : if (this->is_empty(this->id) && *this->waiting_canceled) 89 1493 : throw tools::waiting_canceled(__FILE__, __LINE__, __func__); 90 33023 : } 91 : 92 : void* 93 76310 : Adaptor_1_to_n::get_empty_buffer(const size_t sid) 94 : { 95 76310 : return (void*)(*this->buffer)[this->cur_id][sid][(*this->last)[this->cur_id] % this->buffer_size]; 96 : } 97 : 98 : void* 99 57390 : Adaptor_1_to_n::get_filled_buffer(const size_t sid) 100 : { 101 57390 : return (void*)(*this->buffer)[this->id][sid][(*this->first)[this->id] % this->buffer_size]; 102 : } 103 : 104 : void* 105 76693 : Adaptor_1_to_n::get_empty_buffer(const size_t sid, void* swap_buffer) 106 : { 107 76693 : void* empty_buffer = (void*)(*this->buffer)[this->cur_id][sid][(*this->last)[this->cur_id] % this->buffer_size]; 108 76623 : (*this->buffer)[this->cur_id][sid][(*this->last)[this->cur_id] % this->buffer_size] = (int8_t*)swap_buffer; 109 76558 : return empty_buffer; 110 : } 111 : 112 : void* 113 58260 : Adaptor_1_to_n::get_filled_buffer(const size_t sid, void* swap_buffer) 114 : { 115 58260 : void* filled_buffer = (void*)(*this->buffer)[this->id][sid][(*this->first)[this->id] % this->buffer_size]; 116 57324 : (*this->buffer)[this->id][sid][(*this->first)[this->id] % this->buffer_size] = (int8_t*)swap_buffer; 117 56925 : return filled_buffer; 118 : } 119 : 120 : void 121 50817 : Adaptor_1_to_n::wake_up_puller() 122 : { 123 50817 : (*this->last)[this->cur_id]++; 124 : 125 50995 : if (!this->active_waiting) // passive waiting 126 : { 127 43367 : if (!this->is_empty(this->cur_id)) 128 : { 129 43227 : std::lock_guard<std::mutex> lock((*this->mtx_pull.get())[this->cur_id]); 130 43347 : (*this->cnd_pull.get())[this->cur_id].notify_one(); 131 43379 : } 132 : } 133 : 134 : do 135 : { 136 51003 : this->cur_id = (this->cur_id + 1) % this->buffer->size(); 137 50986 : } while ((*this->buffer)[this->cur_id].size() == 0); 138 50958 : } 139 : 140 : void 141 34819 : Adaptor_1_to_n::wake_up_pusher() 142 : { 143 34819 : (*this->first)[this->id]++; 144 : 145 36724 : if (!this->active_waiting) // passive waiting 146 : { 147 29304 : if (!this->is_full(this->id)) 148 : { 149 28245 : std::lock_guard<std::mutex> lock(*this->mtx_put.get()); 150 29411 : (*this->cnd_put.get()).notify_one(); 151 29391 : } 152 : } 153 36821 : } 154 : 155 : void 156 5861 : Adaptor_1_to_n::wake_up() 157 : { 158 5861 : if (!this->active_waiting) // passive waiting 159 : { 160 225038 : for (size_t i = 0; i < this->buffer->size(); i++) 161 219273 : if ((*this->buffer)[i].size() != 0) 162 : { 163 218396 : std::unique_lock<std::mutex> lock((*this->mtx_pull.get())[i]); 164 219923 : (*this->cnd_pull.get())[i].notify_all(); 165 219978 : } 166 : 167 4960 : std::lock_guard<std::mutex> lock(*this->mtx_put.get()); 168 4960 : (*this->cnd_put.get()).notify_all(); 169 4959 : } 170 5862 : } 171 : 172 : void 173 5862 : Adaptor_1_to_n::cancel_waiting() 174 : { 175 5862 : this->send_cancel_signal(); 176 5861 : this->wake_up(); 177 5862 : }