Line data Source code
1 : #include <sstream> 2 : #include <string> 3 : 4 : #include "Module/Stateful/Adaptor/Adaptor_n_to_1.hpp" 5 : #include "Tools/Exception/exception.hpp" 6 : 7 : namespace spu 8 : { 9 : namespace module 10 : { 11 : 12 : runtime::Task& 13 : Adaptor_n_to_1::operator[](const adp::tsk t) 14 : { 15 : return Module::operator[]((size_t)t); 16 : } 17 : 18 : runtime::Socket& 19 : Adaptor_n_to_1::operator[](const adp::sck::push_n s) 20 : { 21 : return Module::operator[]((size_t)adp::tsk::push_n)[(size_t)s]; 22 : } 23 : 24 : runtime::Socket& 25 : Adaptor_n_to_1::operator[](const adp::sck::pull_1 s) 26 : { 27 : return Module::operator[]((size_t)adp::tsk::pull_1)[(size_t)s]; 28 : } 29 : 30 : Adaptor_n_to_1::Adaptor_n_to_1(const size_t n_elmts, 31 : const std::type_index datatype, 32 : const size_t buffer_size, 33 : const bool active_waiting) 34 : : Adaptor(n_elmts, datatype, buffer_size) 35 : , active_waiting(active_waiting) 36 : , cnd_put(new std::vector<std::condition_variable>(1000)) 37 : , mtx_put(new std::vector<std::mutex>(1000)) 38 : , cnd_pull(new std::condition_variable()) 39 : , mtx_pull(new std::mutex()) 40 : { 41 : this->init(); 42 : } 43 : 44 88 : Adaptor_n_to_1::Adaptor_n_to_1(const std::vector<size_t>& n_elmts, 45 : const std::vector<std::type_index>& datatype, 46 : const size_t buffer_size, 47 88 : const bool active_waiting) 48 : : Adaptor(n_elmts, datatype, buffer_size) 49 88 : , active_waiting(active_waiting) 50 88 : , cnd_put(new std::vector<std::condition_variable>(1000)) 51 88 : , mtx_put(new std::vector<std::mutex>(1000)) 52 88 : , cnd_pull(new std::condition_variable()) 53 176 : , mtx_pull(new std::mutex()) 54 : { 55 88 : this->init(); 56 88 : } 57 : 58 : void 59 88 : Adaptor_n_to_1::init() 60 : { 61 88 : const std::string name = "Adaptor_n_to_1"; 62 88 : this->set_name(name); 63 88 : this->set_short_name(name); 64 88 : this->set_single_wave(true); 65 : 66 88 : auto& p1 = this->create_task("push_n", (int)adp::tsk::push_n); 67 88 : std::vector<size_t> p1s_in; 68 255 : for (size_t s = 0; s < this->n_sockets; s++) 69 167 : p1s_in.push_back(this->create_socket_in(p1, "in" + std::to_string(s), this->n_elmts[s], this->datatype[s])); 70 : 71 88 : this->create_codelet(p1, 72 229223 : [p1s_in](Module& m, runtime::Task& t, const size_t frame_id) -> int 73 : { 74 229223 : auto& adp = static_cast<Adaptor_n_to_1&>(m); 75 229223 : if (adp.is_no_copy_push()) 76 : { 77 229390 : adp.wait_push(); 78 : // for debug mode coherence 79 576606 : for (size_t s = 0; s < t.sockets.size() - 1; s++) 80 345138 : t.sockets[s]->dataptr = adp.get_empty_buffer(s); 81 : } 82 : else 83 : { 84 0 : std::vector<const int8_t*> sockets_dataptr(p1s_in.size()); 85 0 : for (size_t s = 0; s < p1s_in.size(); s++) 86 0 : sockets_dataptr[s] = t[p1s_in[s]].get_dataptr<const int8_t>(); 87 0 : adp.push_n(sockets_dataptr, frame_id); 88 0 : } 89 229280 : return runtime::status_t::SUCCESS; 90 : }); 91 : 92 88 : auto& p2 = this->create_task("pull_1", (int)adp::tsk::pull_1); 93 88 : p2.set_replicability(false); 94 88 : std::vector<size_t> p2s_out; 95 255 : for (size_t s = 0; s < this->n_sockets; s++) 96 167 : p2s_out.push_back(this->create_socket_out(p2, "out" + std::to_string(s), this->n_elmts[s], this->datatype[s])); 97 : 98 88 : this->create_codelet(p2, 99 217325 : [p2s_out](Module& m, runtime::Task& t, const size_t frame_id) -> int 100 : { 101 217325 : auto& adp = static_cast<Adaptor_n_to_1&>(m); 102 217325 : if (adp.is_no_copy_pull()) 103 : { 104 217465 : adp.wait_pull(); 105 : // for debug mode coherence 106 546538 : for (size_t s = 0; s < t.sockets.size() - 1; s++) 107 323008 : t.sockets[s]->_bind(adp.get_filled_buffer(s)); 108 : } 109 : else 110 : { 111 0 : std::vector<int8_t*> sockets_dataptr(p2s_out.size()); 112 0 : for (size_t s = 0; s < p2s_out.size(); s++) 113 0 : sockets_dataptr[s] = t[p2s_out[s]].get_dataptr<int8_t>(); 114 0 : adp.pull_1(sockets_dataptr, frame_id); 115 0 : } 116 219179 : return runtime::status_t::SUCCESS; 117 : }); 118 88 : } 119 : } 120 : }