Line data Source code
1 : #include <sstream> 2 : #include <string> 3 : 4 : #include "Module/Stateful/Adaptor/Adaptor_1_to_n.hpp" 5 : #include "Tools/Exception/exception.hpp" 6 : 7 : namespace spu 8 : { 9 : namespace module 10 : { 11 : 12 : runtime::Task& 13 : Adaptor_1_to_n::operator[](const adp::tsk t) 14 : { 15 : return Module::operator[]((size_t)t); 16 : } 17 : 18 : runtime::Socket& 19 : Adaptor_1_to_n::operator[](const adp::sck::push_1 s) 20 : { 21 : return Module::operator[]((size_t)adp::tsk::push_1)[(size_t)s]; 22 : } 23 : 24 : runtime::Socket& 25 : Adaptor_1_to_n::operator[](const adp::sck::pull_n s) 26 : { 27 : return Module::operator[]((size_t)adp::tsk::pull_n)[(size_t)s]; 28 : } 29 : 30 : Adaptor_1_to_n::Adaptor_1_to_n(const size_t n_elmts, 31 : const std::type_index datatype, 32 : const size_t buffer_size, 33 : const bool active_waiting) 34 : : Adaptor(n_elmts, datatype, buffer_size) 35 : , active_waiting(active_waiting) 36 : , cnd_pull(new std::vector<std::condition_variable>(1000)) 37 : , mtx_pull(new std::vector<std::mutex>(1000)) 38 : , cnd_put(new std::condition_variable()) 39 : , mtx_put(new std::mutex()) 40 : { 41 : this->init(); 42 : } 43 : 44 114 : Adaptor_1_to_n::Adaptor_1_to_n(const std::vector<size_t>& n_elmts, 45 : const std::vector<std::type_index>& datatype, 46 : const size_t buffer_size, 47 114 : const bool active_waiting) 48 : : Adaptor(n_elmts, datatype, buffer_size) 49 114 : , active_waiting(active_waiting) 50 114 : , cnd_pull(new std::vector<std::condition_variable>(1000)) 51 114 : , mtx_pull(new std::vector<std::mutex>(1000)) 52 114 : , cnd_put(new std::condition_variable()) 53 228 : , mtx_put(new std::mutex()) 54 : { 55 114 : this->init(); 56 114 : } 57 : 58 : void 59 114 : Adaptor_1_to_n::init() 60 : { 61 114 : const std::string name = "Adaptor_1_to_n"; 62 114 : this->set_name(name); 63 114 : this->set_short_name(name); 64 114 : this->set_single_wave(true); 65 : 66 114 : auto& p1 = this->create_task("push_1", (int)adp::tsk::push_1); 67 114 : p1.set_replicability(false); 68 114 : std::vector<size_t> p1s_in; 69 296 : for (size_t s = 0; s < this->n_sockets; s++) 70 182 : p1s_in.push_back(this->create_socket_in(p1, "in" + std::to_string(s), this->n_elmts[s], this->datatype[s])); 71 : 72 114 : this->create_codelet(p1, 73 339919 : [p1s_in](Module& m, runtime::Task& t, const size_t frame_id) -> int 74 : { 75 339919 : auto& adp = static_cast<Adaptor_1_to_n&>(m); 76 339919 : if (adp.is_no_copy_push()) 77 : { 78 340180 : adp.wait_push(); 79 : // for debug mode coherence 80 998537 : for (size_t s = 0; s < t.sockets.size() - 1; s++) 81 643702 : t.sockets[s]->dataptr = adp.get_empty_buffer(s); 82 : } 83 : else 84 : { 85 0 : std::vector<const int8_t*> sockets_dataptr(p1s_in.size()); 86 0 : for (size_t s = 0; s < p1s_in.size(); s++) 87 0 : sockets_dataptr[s] = t[p1s_in[s]].get_dataptr<const int8_t>(); 88 0 : adp.push_1(sockets_dataptr, frame_id); 89 0 : } 90 345486 : return runtime::status_t::SUCCESS; 91 : }); 92 : 93 114 : auto& p2 = this->create_task("pull_n", (int)adp::tsk::pull_n); 94 114 : std::vector<size_t> p2s_out; 95 296 : for (size_t s = 0; s < this->n_sockets; s++) 96 182 : p2s_out.push_back(this->create_socket_out(p2, "out" + std::to_string(s), this->n_elmts[s], this->datatype[s])); 97 : 98 114 : this->create_codelet(p2, 99 331532 : [p2s_out](Module& m, runtime::Task& t, const size_t frame_id) -> int 100 : { 101 331532 : auto& adp = static_cast<Adaptor_1_to_n&>(m); 102 331532 : if (adp.is_no_copy_pull()) 103 : { 104 331793 : adp.wait_pull(); 105 : // for debug mode coherence 106 972842 : for (size_t s = 0; s < t.sockets.size() - 1; s++) // Adaptor pull has only OUT sockets 107 630595 : t.sockets[s]->_bind(adp.get_filled_buffer(s)); 108 : } 109 : else 110 : { 111 0 : std::vector<int8_t*> sockets_dataptr(p2s_out.size()); 112 0 : for (size_t s = 0; s < p2s_out.size(); s++) 113 0 : sockets_dataptr[s] = t[p2s_out[s]].get_dataptr<int8_t>(); 114 0 : adp.pull_n(sockets_dataptr, frame_id); 115 0 : } 116 331498 : return runtime::status_t::SUCCESS; 117 : }); 118 114 : } 119 : } 120 : }