5 #include <condition_variable> 14 #include <type_traits> 27 using concurrency_t = std::invoke_result_t<decltype(std::thread::hardware_concurrency)>;
29 std::atomic<bool> running =
false;
31 std::condition_variable task_available_cv = {};
33 std::condition_variable task_done_cv = {};
35 std::queue<std::function<void()>> tasks = {};
37 std::atomic<size_t> tasks_total = 0;
39 mutable std::mutex tasks_mutex = {};
41 concurrency_t thread_count = 0;
43 std::unique_ptr<std::thread[]> threads =
nullptr;
45 std::atomic<bool> waiting =
false;
47 void create_threads() {
49 for(concurrency_t i = 0; i < thread_count; ++i) threads[i] = std::thread(&thread_pool::worker,
this);
52 void destroy_threads() {
54 task_available_cv.notify_all();
55 for(concurrency_t i = 0; i < thread_count; ++i) threads[i].join();
60 std::function<void()> task;
61 std::unique_lock<std::mutex> tasks_lock(tasks_mutex);
62 task_available_cv.wait(tasks_lock, [&] {
return !tasks.empty() || !running; });
64 task = std::move(tasks.front());
69 if(waiting) task_done_cv.notify_one();
75 explicit thread_pool(
const concurrency_t thread_count_ = std::thread::hardware_concurrency())
76 : thread_count(thread_count_ > 1 ? thread_count_ : 1), threads(std::make_unique<std::thread[]>(thread_count)) { create_threads(); }
84 const std::scoped_lock tasks_lock(tasks_mutex);
89 const std::scoped_lock tasks_lock(tasks_mutex);
90 return tasks_total - tasks.size();
97 template<
typename F,
typename...
A>
100 const std::scoped_lock tasks_lock(tasks_mutex);
101 if constexpr(
sizeof...(args) == 0)
102 tasks.push(std::function<
void()>(task));
104 tasks.push(std::function<
void()>([task, args...] { task(args...); }));
107 task_available_cv.notify_one();
110 void reset(
const concurrency_t thread_count_ = std::thread::hardware_concurrency()) {
111 const bool was_paused =
paused;
115 thread_count = thread_count_ > 1 ? thread_count_ : 1;
116 threads = std::make_unique<std::thread[]>(thread_count);
121 template<
typename F,
typename...
A,
typename R = std::invoke_result_t<std::decay_t<F>, std::decay_t<A>...>>
122 std::future<R>
submit(
const F& task,
const A&... args) {
123 std::shared_ptr<std::promise<R>> task_promise = std::make_shared<std::promise<R>>();
125 [task, args..., task_promise] {
127 if constexpr(std::is_void_v<R>) {
129 task_promise->set_value();
132 task_promise->set_value(task(args...));
137 task_promise->set_exception(std::current_exception());
143 return task_promise->get_future();
148 std::unique_lock<std::mutex> tasks_lock(tasks_mutex);
149 task_done_cv.wait(tasks_lock, [
this] {
return (tasks_total == (
paused ? tasks.size() : 0)); });
void wait_for_tasks()
Definition: thread_pool.hpp:146
size_t get_tasks_running() const
Definition: thread_pool.hpp:88
Definition: thread_pool.hpp:26
void reset(const concurrency_t thread_count_=std::thread::hardware_concurrency())
Definition: thread_pool.hpp:110
std::future< R > submit(const F &task, const A &... args)
Definition: thread_pool.hpp:122
size_t get_tasks_total() const
Definition: thread_pool.hpp:93
thread_pool(const concurrency_t thread_count_=std::thread::hardware_concurrency())
Definition: thread_pool.hpp:75
void push_task(const F &task, const A &... args)
Definition: thread_pool.hpp:98
size_t get_tasks_queued() const
Definition: thread_pool.hpp:83
concurrency_t get_thread_count() const
Definition: thread_pool.hpp:95
std::atomic< bool > paused
Definition: thread_pool.hpp:153
~thread_pool()
Definition: thread_pool.hpp:78