suanPan
thread_pool.hpp
Go to the documentation of this file.
1 #pragma once
2 
3 #include <atomic>
4 #include <chrono>
5 #include <condition_variable>
6 #include <exception>
7 #include <functional>
8 #include <future>
9 #include <iostream>
10 #include <memory>
11 #include <mutex>
12 #include <queue>
13 #include <thread>
14 #include <type_traits>
15 #include <utility>
16 #include <vector>
17 
26 class thread_pool {
27  using concurrency_t = std::invoke_result_t<decltype(std::thread::hardware_concurrency)>;
28 
29  std::atomic<bool> running = false;
30 
31  std::condition_variable task_available_cv = {};
32 
33  std::condition_variable task_done_cv = {};
34 
35  std::queue<std::function<void()>> tasks = {};
36 
37  std::atomic<size_t> tasks_total = 0;
38 
39  mutable std::mutex tasks_mutex = {};
40 
41  concurrency_t thread_count = 0;
42 
43  std::unique_ptr<std::thread[]> threads = nullptr;
44 
45  std::atomic<bool> waiting = false;
46 
47  void create_threads() {
48  running = true;
49  for(concurrency_t i = 0; i < thread_count; ++i) threads[i] = std::thread(&thread_pool::worker, this);
50  }
51 
52  void destroy_threads() {
53  running = false;
54  task_available_cv.notify_all();
55  for(concurrency_t i = 0; i < thread_count; ++i) threads[i].join();
56  }
57 
58  void worker() {
59  while(running) {
60  std::function<void()> task;
61  std::unique_lock<std::mutex> tasks_lock(tasks_mutex);
62  task_available_cv.wait(tasks_lock, [&] { return !tasks.empty() || !running; });
63  if(running && !paused) {
64  task = std::move(tasks.front());
65  tasks.pop();
66  tasks_lock.unlock();
67  task();
68  --tasks_total;
69  if(waiting) task_done_cv.notify_one();
70  }
71  }
72  }
73 
74 public:
75  explicit thread_pool(const concurrency_t thread_count_ = std::thread::hardware_concurrency())
76  : thread_count(thread_count_ > 1 ? thread_count_ : 1)
77  , threads(std::make_unique<std::thread[]>(thread_count)) { create_threads(); }
78 
81  destroy_threads();
82  }
83 
84  size_t get_tasks_queued() const {
85  const std::scoped_lock tasks_lock(tasks_mutex);
86  return tasks.size();
87  }
88 
89  size_t get_tasks_running() const {
90  const std::scoped_lock tasks_lock(tasks_mutex);
91  return tasks_total - tasks.size();
92  }
93 
94  size_t get_tasks_total() const { return tasks_total; }
95 
96  concurrency_t get_thread_count() const { return thread_count; }
97 
98  template<typename F, typename... A> void push_task(const F& task, const A&... args) {
99  {
100  const std::scoped_lock tasks_lock(tasks_mutex);
101  if constexpr(sizeof...(args) == 0) tasks.push(std::function<void()>(task));
102  else tasks.push(std::function<void()>([task, args...] { task(args...); }));
103  }
104  ++tasks_total;
105  task_available_cv.notify_one();
106  }
107 
108  void reset(const concurrency_t thread_count_ = std::thread::hardware_concurrency()) {
109  const bool was_paused = paused;
110  paused = true;
111  wait_for_tasks();
112  destroy_threads();
113  thread_count = thread_count_ > 1 ? thread_count_ : 1;
114  threads = std::make_unique<std::thread[]>(thread_count);
115  paused = was_paused;
116  create_threads();
117  }
118 
119  template<typename F, typename... A, typename R = std::invoke_result_t<std::decay_t<F>, std::decay_t<A>...>> std::future<R> submit(const F& task, const A&... args) {
120  std::shared_ptr<std::promise<R>> task_promise = std::make_shared<std::promise<R>>();
121  push_task(
122  [task, args..., task_promise] {
123  try {
124  if constexpr(std::is_void_v<R>) {
125  task(args...);
126  task_promise->set_value();
127  }
128  else { task_promise->set_value(task(args...)); }
129  }
130  catch(...) {
131  try { task_promise->set_exception(std::current_exception()); }
132  catch(...) { }
133  }
134  });
135  return task_promise->get_future();
136  }
137 
138  void wait_for_tasks() {
139  waiting = true;
140  std::unique_lock<std::mutex> tasks_lock(tasks_mutex);
141  task_done_cv.wait(tasks_lock, [this] { return (tasks_total == (paused ? tasks.size() : 0)); });
142  waiting = false;
143  }
144 
145  std::atomic<bool> paused = false;
146 };
Definition: thread_pool.hpp:26
size_t get_tasks_running() const
Definition: thread_pool.hpp:89
std::atomic< bool > paused
Definition: thread_pool.hpp:145
std::future< R > submit(const F &task, const A &... args)
Definition: thread_pool.hpp:119
void wait_for_tasks()
Definition: thread_pool.hpp:138
size_t get_tasks_queued() const
Definition: thread_pool.hpp:84
size_t get_tasks_total() const
Definition: thread_pool.hpp:94
thread_pool(const concurrency_t thread_count_=std::thread::hardware_concurrency())
Definition: thread_pool.hpp:75
void push_task(const F &task, const A &... args)
Definition: thread_pool.hpp:98
void reset(const concurrency_t thread_count_=std::thread::hardware_concurrency())
Definition: thread_pool.hpp:108
~thread_pool()
Definition: thread_pool.hpp:79
concurrency_t get_thread_count() const
Definition: thread_pool.hpp:96