Line data Source code
1 : #include "Tools/Thread/Thread_pool/Standard/Thread_pool_standard.hpp" 2 : 3 : using namespace spu; 4 : using namespace spu::tools; 5 : 6 674 : Thread_pool_standard::Thread_pool_standard(const size_t n_threads) 7 : : Thread_pool(n_threads) 8 674 : , pool(n_threads) 9 674 : , mtx(n_threads) 10 674 : , cnd(n_threads) 11 1348 : , barrier(n_threads) 12 : { 13 674 : } 14 : 15 0 : Thread_pool_standard::Thread_pool_standard(const size_t n_threads, std::function<void(const size_t)>& func_init) 16 0 : : Thread_pool_standard(n_threads) 17 : { 18 0 : this->set_func_init(func_init); 19 0 : this->init(); 20 0 : } 21 : 22 0 : Thread_pool_standard::Thread_pool_standard(const Thread_pool_standard& other) 23 : : Thread_pool(other) 24 0 : , pool(other.n_threads) 25 0 : , mtx(other.n_threads) 26 0 : , cnd(other.n_threads) 27 0 : , barrier(other.barrier) 28 : { 29 0 : } 30 : 31 : void 32 674 : Thread_pool_standard::init(const bool async) 33 : { 34 674 : if (this->initialized) 35 : { 36 0 : std::stringstream message; 37 0 : message << "This pool of threads has already been initialized and cannot be initialized twice."; 38 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str()); 39 0 : } 40 : 41 5685 : for (size_t tid = 0; tid < n_threads; tid++) 42 5011 : this->pool[tid] = std::thread(&Thread_pool_standard::_start_thread, this, tid); 43 : 44 674 : if (!async) 45 : { 46 674 : this->barrier.wait(); 47 674 : this->initialized = true; 48 : } 49 674 : } 50 : 51 1348 : Thread_pool_standard::~Thread_pool_standard() 52 : { 53 : // stop the threads pool 54 674 : this->stop_threads = true; 55 5685 : for (size_t tid = 0; tid < this->n_threads; tid++) 56 : { 57 5011 : std::lock_guard<std::mutex> lock(this->mtx[tid]); 58 5011 : this->cnd[tid].notify_one(); 59 5011 : } 60 : 61 5685 : for (size_t tid = 0; tid < n_threads; tid++) 62 5011 : this->pool[tid].join(); 63 1348 : } 64 : 65 : void 66 511 : Thread_pool_standard::run(const bool async) 67 : { 68 511 : if (!this->initialized) 69 : { 70 0 : std::stringstream message; 71 0 : message << "This pool of threads cannot be run because it has not been initialized."; 72 0 : throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str()); 73 0 : } 74 : 75 5034 : for (size_t tid = 0; tid < n_threads; tid++) 76 : { 77 4523 : std::lock_guard<std::mutex> lock(this->mtx[tid]); 78 4524 : this->cnd[tid].notify_one(); 79 4522 : } 80 : 81 511 : if (!async) this->barrier.wait(); 82 511 : } 83 : 84 : void 85 511 : Thread_pool_standard::run(std::function<void(const size_t)>& set_func_exec, const bool async) 86 : { 87 511 : this->set_func_exec(set_func_exec); 88 511 : this->run(async); 89 511 : } 90 : 91 : void 92 498 : Thread_pool_standard::wait() 93 : { 94 498 : this->barrier.wait(); 95 505 : this->initialized = true; 96 505 : } 97 : 98 : void 99 3715 : Thread_pool_standard::_start_thread(const size_t tid) 100 : { 101 3715 : this->func_init(tid); 102 : 103 13225 : while (!this->stop_threads) 104 : { 105 8292 : std::unique_lock<std::mutex> lock(this->mtx[tid]); 106 9013 : this->barrier.arrive(); 107 8936 : this->cnd[tid].wait(lock); 108 : 109 9267 : if (!this->stop_threads) this->func_exec(tid); 110 9367 : } 111 : 112 4933 : this->func_deinit(tid); 113 4706 : }