Line data Source code
1 : #include "Tools/Thread/Thread_pool/Standard/Thread_pool_standard.hpp" 2 : 3 : using namespace spu; 4 : using namespace spu::tools; 5 : 6 592 : Thread_pool_standard::Thread_pool_standard(const size_t n_threads) 7 : : Thread_pool(n_threads) 8 592 : , pool(n_threads) 9 592 : , mtx(n_threads) 10 592 : , cnd(n_threads) 11 1184 : , barrier(n_threads) 12 : { 13 592 : } 14 : 15 0 : Thread_pool_standard::Thread_pool_standard(const size_t n_threads, std::function<void(const size_t)>& func_init) 16 0 : : Thread_pool_standard(n_threads) 17 : { 18 0 : this->set_func_init(func_init); 19 0 : this->init(); 20 0 : } 21 : 22 0 : Thread_pool_standard::Thread_pool_standard(const Thread_pool_standard& other) 23 : : Thread_pool(other) 24 0 : , pool(other.n_threads) 25 0 : , mtx(other.n_threads) 26 0 : , cnd(other.n_threads) 27 0 : , barrier(other.barrier) 28 : { 29 0 : } 30 : 31 : void 32 592 : Thread_pool_standard::init(const bool async) 33 : { 34 592 : if (this->initialized) 35 : { 36 0 : std::stringstream message; 37 0 : message << "This pool of threads has already been initialized and cannot be initialized twice."; 38 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str()); 39 0 : } 40 : 41 5124 : for (size_t tid = 0; tid < n_threads; tid++) 42 4532 : this->pool[tid] = std::thread(&Thread_pool_standard::_start_thread, this, tid); 43 : 44 592 : if (!async) 45 : { 46 592 : this->barrier.wait(); 47 592 : this->initialized = true; 48 : } 49 592 : } 50 : 51 1184 : Thread_pool_standard::~Thread_pool_standard() 52 : { 53 : // stop the threads pool 54 592 : this->stop_threads = true; 55 5124 : for (size_t tid = 0; tid < this->n_threads; tid++) 56 : { 57 4532 : std::lock_guard<std::mutex> lock(this->mtx[tid]); 58 4532 : this->cnd[tid].notify_one(); 59 4532 : } 60 : 61 5124 : for (size_t tid = 0; tid < n_threads; tid++) 62 4532 : this->pool[tid].join(); 63 1184 : } 64 : 65 : void 66 443 : Thread_pool_standard::run(const bool async) 67 : { 68 443 : if (!this->initialized) 69 : { 70 0 : std::stringstream message; 71 0 : message << "This pool of threads cannot be run because it has not been initialized."; 72 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str()); 73 0 : } 74 : 75 4488 : for (size_t tid = 0; tid < n_threads; tid++) 76 : { 77 4045 : std::lock_guard<std::mutex> lock(this->mtx[tid]); 78 4045 : this->cnd[tid].notify_one(); 79 4045 : } 80 : 81 443 : if (!async) this->barrier.wait(); 82 443 : } 83 : 84 : void 85 443 : Thread_pool_standard::run(std::function<void(const size_t)>& set_func_exec, const bool async) 86 : { 87 443 : this->set_func_exec(set_func_exec); 88 443 : this->run(async); 89 443 : } 90 : 91 : void 92 436 : Thread_pool_standard::wait() 93 : { 94 436 : this->barrier.wait(); 95 442 : this->initialized = true; 96 442 : } 97 : 98 : void 99 2567 : Thread_pool_standard::_start_thread(const size_t tid) 100 : { 101 2567 : this->func_init(tid); 102 : 103 10856 : while (!this->stop_threads) 104 : { 105 6711 : std::unique_lock<std::mutex> lock(this->mtx[tid]); 106 7623 : this->barrier.arrive(); 107 7549 : this->cnd[tid].wait(lock); 108 : 109 7665 : if (!this->stop_threads) this->func_exec(tid); 110 8162 : } 111 : 112 4145 : this->func_deinit(tid); 113 3882 : }