Line data Source code
1 : #include <sstream> 2 : #include <string> 3 : 4 : #include "Module/Adaptor/Adaptor_1_to_n.hpp" 5 : #include "Tools/Exception/exception.hpp" 6 : 7 : namespace spu 8 : { 9 : namespace module 10 : { 11 : 12 : runtime::Task& 13 : Adaptor_1_to_n::operator[](const adp::tsk t) 14 : { 15 : return Module::operator[]((size_t)t); 16 : } 17 : 18 : runtime::Socket& 19 : Adaptor_1_to_n::operator[](const adp::sck::push_1 s) 20 : { 21 : return Module::operator[]((size_t)adp::tsk::push_1)[(size_t)s]; 22 : } 23 : 24 : runtime::Socket& 25 : Adaptor_1_to_n::operator[](const adp::sck::pull_n s) 26 : { 27 : return Module::operator[]((size_t)adp::tsk::pull_n)[(size_t)s]; 28 : } 29 : 30 : Adaptor_1_to_n::Adaptor_1_to_n(const size_t n_elmts, 31 : const std::type_index datatype, 32 : const size_t buffer_size, 33 : const bool active_waiting) 34 : : Adaptor(n_elmts, datatype, buffer_size) 35 : , active_waiting(active_waiting) 36 : , cnd_pull(new std::vector<std::condition_variable>(1000)) 37 : , mtx_pull(new std::vector<std::mutex>(1000)) 38 : , cnd_put(new std::condition_variable()) 39 : , mtx_put(new std::mutex()) 40 : { 41 : this->init(); 42 : } 43 : 44 101 : Adaptor_1_to_n::Adaptor_1_to_n(const std::vector<size_t>& n_elmts, 45 : const std::vector<std::type_index>& datatype, 46 : const size_t buffer_size, 47 101 : const bool active_waiting) 48 : : Adaptor(n_elmts, datatype, buffer_size) 49 101 : , active_waiting(active_waiting) 50 101 : , cnd_pull(new std::vector<std::condition_variable>(1000)) 51 101 : , mtx_pull(new std::vector<std::mutex>(1000)) 52 101 : , cnd_put(new std::condition_variable()) 53 202 : , mtx_put(new std::mutex()) 54 : { 55 101 : this->init(); 56 101 : } 57 : 58 : void 59 101 : Adaptor_1_to_n::init() 60 : { 61 101 : const std::string name = "Adaptor_1_to_n"; 62 101 : this->set_name(name); 63 101 : this->set_short_name(name); 64 101 : this->set_single_wave(true); 65 : 66 101 : auto& p1 = this->create_task("push_1", (int)adp::tsk::push_1); 67 101 : std::vector<size_t> p1s_in; 68 267 : for (size_t s = 0; s < this->n_sockets; s++) 69 166 : p1s_in.push_back(this->create_socket_in(p1, "in" + std::to_string(s), this->n_elmts[s], this->datatype[s])); 70 : 71 101 : this->create_codelet(p1, 72 50525 : [p1s_in](Module& m, runtime::Task& t, const size_t frame_id) -> int 73 : { 74 50525 : auto& adp = static_cast<Adaptor_1_to_n&>(m); 75 50525 : if (adp.is_no_copy_push()) 76 : { 77 50537 : adp.wait_push(); 78 : // for debug mode coherence 79 127169 : for (size_t s = 0; s < t.sockets.size() - 1; s++) 80 76324 : t.sockets[s]->dataptr = adp.get_empty_buffer(s); 81 : } 82 : else 83 : { 84 0 : std::vector<const int8_t*> sockets_dataptr(p1s_in.size()); 85 0 : for (size_t s = 0; s < p1s_in.size(); s++) 86 0 : sockets_dataptr[s] = t[p1s_in[s]].get_dataptr<const int8_t>(); 87 0 : adp.push_1(sockets_dataptr, frame_id); 88 0 : } 89 50648 : return runtime::status_t::SUCCESS; 90 : }); 91 : 92 101 : auto& p2 = this->create_task("pull_n", (int)adp::tsk::pull_n); 93 101 : std::vector<size_t> p2s_out; 94 267 : for (size_t s = 0; s < this->n_sockets; s++) 95 166 : p2s_out.push_back(this->create_socket_out(p2, "out" + std::to_string(s), this->n_elmts[s], this->datatype[s])); 96 : 97 101 : this->create_codelet(p2, 98 36534 : [p2s_out](Module& m, runtime::Task& t, const size_t frame_id) -> int 99 : { 100 36534 : auto& adp = static_cast<Adaptor_1_to_n&>(m); 101 36534 : if (adp.is_no_copy_pull()) 102 : { 103 36610 : adp.wait_pull(); 104 : // for debug mode coherence 105 90659 : for (size_t s = 0; s < t.sockets.size() - 1; s++) // Adaptor pull has only OUT sockets 106 53719 : t.sockets[s]->_bind(adp.get_filled_buffer(s)); 107 : } 108 : else 109 : { 110 0 : std::vector<int8_t*> sockets_dataptr(p2s_out.size()); 111 0 : for (size_t s = 0; s < p2s_out.size(); s++) 112 0 : sockets_dataptr[s] = t[p2s_out[s]].get_dataptr<int8_t>(); 113 0 : adp.pull_n(sockets_dataptr, frame_id); 114 0 : } 115 34239 : return runtime::status_t::SUCCESS; 116 : }); 117 101 : } 118 : } 119 : }