Line data Source code
1 : #include <algorithm> 2 : 3 : #include "Module/Adaptor/Adaptor_n_to_1.hpp" 4 : #include "Tools/Exception/exception.hpp" 5 : 6 : using namespace spu; 7 : using namespace spu::module; 8 : 9 : Adaptor_n_to_1* 10 1888 : Adaptor_n_to_1::clone() const 11 : { 12 1888 : auto m = new Adaptor_n_to_1(*this); 13 1888 : m->deep_copy(*this); 14 1888 : return m; 15 : } 16 : 17 : void 18 0 : Adaptor_n_to_1::push_n(const std::vector<const int8_t*>& in, const size_t frame_id) 19 : { 20 0 : this->wait_push(); 21 : 22 0 : for (size_t s = 0; s < this->n_sockets; s++) 23 : { 24 0 : int8_t* out = (int8_t*)this->get_empty_buffer(s); 25 : 26 0 : std::copy( 27 0 : in[s] + 0 * this->n_bytes[s], in[s] + this->get_n_frames() * this->n_bytes[s], out + 0 * this->n_bytes[s]); 28 : } 29 : 30 0 : this->wake_up_puller(); 31 0 : } 32 : 33 : void 34 0 : Adaptor_n_to_1::pull_1(const std::vector<int8_t*>& out, const size_t frame_id) 35 : { 36 0 : this->wait_pull(); 37 : 38 0 : for (size_t s = 0; s < this->n_sockets; s++) 39 : { 40 0 : const int8_t* in = (const int8_t*)this->get_filled_buffer(s); 41 : 42 0 : std::copy( 43 0 : in + 0 * this->n_bytes[s], in + this->get_n_frames() * this->n_bytes[s], out[s] + 0 * this->n_bytes[s]); 44 : } 45 : 46 0 : this->wake_up_pusher(); 47 0 : } 48 : 49 : void 50 30846 : Adaptor_n_to_1::wait_push() 51 : { 52 30846 : if (this->active_waiting) 53 : { 54 6963 : while (this->is_full(this->id) && !*this->waiting_canceled) 55 : ; 56 : } 57 : else // passive waiting 58 : { 59 24025 : if (this->is_full(this->id) && !*this->waiting_canceled) 60 : { 61 559 : std::unique_lock<std::mutex> lock((*this->mtx_put.get())[this->id]); 62 561 : (*this->cnd_put.get())[this->id].wait( 63 1172 : lock, [this]() { return !(this->is_full(this->id) && !*this->waiting_canceled); }); 64 554 : } 65 : } 66 : 67 29928 : if (*this->waiting_canceled) throw tools::waiting_canceled(__FILE__, __LINE__, __func__); 68 29546 : } 69 : 70 : void 71 21890 : Adaptor_n_to_1::wait_pull() 72 : { 73 21890 : if (this->active_waiting) 74 : { 75 610410 : while (this->is_empty(this->cur_id) && !*this->waiting_canceled) 76 : ; 77 : } 78 : else // passive waiting 79 : { 80 17681 : if (this->is_empty(this->cur_id) && !*this->waiting_canceled) 81 : { 82 2228 : std::unique_lock<std::mutex> lock(*this->mtx_pull.get()); 83 : (*this->cnd_pull.get()) 84 9334 : .wait(lock, [this]() { return !(this->is_empty(this->cur_id) && !*this->waiting_canceled); }); 85 2228 : } 86 : } 87 : 88 21831 : if (this->is_empty(this->cur_id) && *this->waiting_canceled) 89 42 : throw tools::waiting_canceled(__FILE__, __LINE__, __func__); 90 21679 : } 91 : 92 : void* 93 52512 : Adaptor_n_to_1::get_empty_buffer(const size_t sid) 94 : { 95 52512 : return (void*)(*this->buffer)[this->id][sid][(*this->last)[this->id] % this->buffer_size]; 96 : } 97 : 98 : void* 99 39853 : Adaptor_n_to_1::get_filled_buffer(const size_t sid) 100 : { 101 39853 : return (void*)(*this->buffer)[this->cur_id][sid][(*this->first)[this->cur_id] % this->buffer_size]; 102 : } 103 : 104 : void* 105 52951 : Adaptor_n_to_1::get_empty_buffer(const size_t sid, void* swap_buffer) 106 : { 107 52951 : void* empty_buffer = (void*)(*this->buffer)[this->id][sid][(*this->last)[this->id] % this->buffer_size]; 108 51433 : (*this->buffer)[this->id][sid][(*this->last)[this->id] % this->buffer_size] = (int8_t*)swap_buffer; 109 50933 : return empty_buffer; 110 : } 111 : 112 : void* 113 39870 : Adaptor_n_to_1::get_filled_buffer(const size_t sid, void* swap_buffer) 114 : { 115 39870 : void* filled_buffer = (void*)(*this->buffer)[this->cur_id][sid][(*this->first)[this->cur_id] % this->buffer_size]; 116 39777 : (*this->buffer)[this->cur_id][sid][(*this->first)[this->cur_id] % this->buffer_size] = (int8_t*)swap_buffer; 117 39743 : return filled_buffer; 118 : } 119 : 120 : void 121 30648 : Adaptor_n_to_1::wake_up_puller() 122 : { 123 30648 : (*this->last)[this->id]++; 124 : 125 31955 : if (!this->active_waiting) // passive waiting 126 : { 127 24454 : if (!this->is_empty(this->id)) 128 : { 129 23683 : std::lock_guard<std::mutex> lock(*this->mtx_pull.get()); 130 24547 : (*this->cnd_pull.get()).notify_one(); 131 24541 : } 132 : } 133 32042 : } 134 : 135 : void 136 21843 : Adaptor_n_to_1::wake_up_pusher() 137 : { 138 21843 : (*this->first)[this->cur_id]++; 139 : 140 21908 : if (!this->active_waiting) // passive waiting 141 : { 142 17701 : if (!this->is_full(this->cur_id)) 143 : { 144 17616 : std::lock_guard<std::mutex> lock((*this->mtx_put.get())[this->cur_id]); 145 17694 : (*this->cnd_put.get())[this->cur_id].notify_one(); 146 17688 : } 147 : } 148 : 149 : do 150 : { 151 21977 : this->cur_id = (this->cur_id + 1) % this->buffer->size(); 152 21875 : } while ((*this->buffer)[this->cur_id].size() == 0); 153 21777 : } 154 : 155 : void 156 2056 : Adaptor_n_to_1::wake_up() 157 : { 158 2056 : if (!this->active_waiting) // passive waiting 159 : { 160 77589 : for (size_t i = 0; i < this->buffer->size(); i++) 161 75845 : if ((*this->buffer)[i].size() != 0) 162 : { 163 75837 : std::unique_lock<std::mutex> lock((*this->mtx_put.get())[i]); 164 75853 : (*this->cnd_put.get())[i].notify_all(); 165 75847 : } 166 : 167 1738 : std::lock_guard<std::mutex> lock(*this->mtx_pull.get()); 168 1738 : (*this->cnd_pull.get()).notify_all(); 169 1738 : } 170 2056 : } 171 : 172 : void 173 2056 : Adaptor_n_to_1::cancel_waiting() 174 : { 175 2056 : this->send_cancel_signal(); 176 2056 : this->wake_up(); 177 2056 : }