suanPan
thread_pool.hpp
Go to the documentation of this file.
1 #pragma once
2 
3 #include <atomic>
4 #include <chrono>
5 #include <condition_variable>
6 #include <exception>
7 #include <functional>
8 #include <future>
9 #include <iostream>
10 #include <memory>
11 #include <mutex>
12 #include <queue>
13 #include <thread>
14 #include <type_traits>
15 #include <utility>
16 #include <vector>
17 
26 class thread_pool {
27  using concurrency_t = std::invoke_result_t<decltype(std::thread::hardware_concurrency)>;
28 
29  std::atomic<bool> running = false;
30 
31  std::condition_variable task_available_cv = {};
32 
33  std::condition_variable task_done_cv = {};
34 
35  std::queue<std::function<void()>> tasks = {};
36 
37  std::atomic<size_t> tasks_total = 0;
38 
39  mutable std::mutex tasks_mutex = {};
40 
41  concurrency_t thread_count = 0;
42 
43  std::unique_ptr<std::thread[]> threads = nullptr;
44 
45  std::atomic<bool> waiting = false;
46 
47  void create_threads() {
48  running = true;
49  for(concurrency_t i = 0; i < thread_count; ++i) threads[i] = std::thread(&thread_pool::worker, this);
50  }
51 
52  void destroy_threads() {
53  running = false;
54  task_available_cv.notify_all();
55  for(concurrency_t i = 0; i < thread_count; ++i) threads[i].join();
56  }
57 
58  void worker() {
59  while(running) {
60  std::function<void()> task;
61  std::unique_lock<std::mutex> tasks_lock(tasks_mutex);
62  task_available_cv.wait(tasks_lock, [&] { return !tasks.empty() || !running; });
63  if(running && !paused) {
64  task = std::move(tasks.front());
65  tasks.pop();
66  tasks_lock.unlock();
67  task();
68  --tasks_total;
69  if(waiting) task_done_cv.notify_one();
70  }
71  }
72  }
73 
74 public:
75  explicit thread_pool(const concurrency_t thread_count_ = std::thread::hardware_concurrency())
76  : thread_count(thread_count_ > 1 ? thread_count_ : 1), threads(std::make_unique<std::thread[]>(thread_count)) { create_threads(); }
77 
80  destroy_threads();
81  }
82 
83  size_t get_tasks_queued() const {
84  const std::scoped_lock tasks_lock(tasks_mutex);
85  return tasks.size();
86  }
87 
88  size_t get_tasks_running() const {
89  const std::scoped_lock tasks_lock(tasks_mutex);
90  return tasks_total - tasks.size();
91  }
92 
93  size_t get_tasks_total() const { return tasks_total; }
94 
95  concurrency_t get_thread_count() const { return thread_count; }
96 
97  template<typename F, typename... A>
98  void push_task(const F& task, const A&... args) {
99  {
100  const std::scoped_lock tasks_lock(tasks_mutex);
101  if constexpr(sizeof...(args) == 0)
102  tasks.push(std::function<void()>(task));
103  else
104  tasks.push(std::function<void()>([task, args...] { task(args...); }));
105  }
106  ++tasks_total;
107  task_available_cv.notify_one();
108  }
109 
110  void reset(const concurrency_t thread_count_ = std::thread::hardware_concurrency()) {
111  const bool was_paused = paused;
112  paused = true;
113  wait_for_tasks();
114  destroy_threads();
115  thread_count = thread_count_ > 1 ? thread_count_ : 1;
116  threads = std::make_unique<std::thread[]>(thread_count);
117  paused = was_paused;
118  create_threads();
119  }
120 
121  template<typename F, typename... A, typename R = std::invoke_result_t<std::decay_t<F>, std::decay_t<A>...>>
122  std::future<R> submit(const F& task, const A&... args) {
123  std::shared_ptr<std::promise<R>> task_promise = std::make_shared<std::promise<R>>();
124  push_task(
125  [task, args..., task_promise] {
126  try {
127  if constexpr(std::is_void_v<R>) {
128  task(args...);
129  task_promise->set_value();
130  }
131  else {
132  task_promise->set_value(task(args...));
133  }
134  }
135  catch(...) {
136  try {
137  task_promise->set_exception(std::current_exception());
138  }
139  catch(...) {
140  }
141  }
142  });
143  return task_promise->get_future();
144  }
145 
146  void wait_for_tasks() {
147  waiting = true;
148  std::unique_lock<std::mutex> tasks_lock(tasks_mutex);
149  task_done_cv.wait(tasks_lock, [this] { return (tasks_total == (paused ? tasks.size() : 0)); });
150  waiting = false;
151  }
152 
153  std::atomic<bool> paused = false;
154 };
void wait_for_tasks()
Definition: thread_pool.hpp:146
size_t get_tasks_running() const
Definition: thread_pool.hpp:88
Definition: thread_pool.hpp:26
void reset(const concurrency_t thread_count_=std::thread::hardware_concurrency())
Definition: thread_pool.hpp:110
std::future< R > submit(const F &task, const A &... args)
Definition: thread_pool.hpp:122
size_t get_tasks_total() const
Definition: thread_pool.hpp:93
thread_pool(const concurrency_t thread_count_=std::thread::hardware_concurrency())
Definition: thread_pool.hpp:75
void push_task(const F &task, const A &... args)
Definition: thread_pool.hpp:98
size_t get_tasks_queued() const
Definition: thread_pool.hpp:83
concurrency_t get_thread_count() const
Definition: thread_pool.hpp:95
std::atomic< bool > paused
Definition: thread_pool.hpp:153
~thread_pool()
Definition: thread_pool.hpp:78