yarp-devices
Loading...
Searching...
No Matches
ctpl_stl.h
1/*********************************************************
2*
3* Copyright (C) 2014 by Vitaliy Vitsentiy
4*
5* Licensed under the Apache License, Version 2.0 (the "License");
6* you may not use this file except in compliance with the License.
7* You may obtain a copy of the License at
8*
9* http://www.apache.org/licenses/LICENSE-2.0
10*
11* Unless required by applicable law or agreed to in writing, software
12* distributed under the License is distributed on an "AS IS" BASIS,
13* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14* See the License for the specific language governing permissions and
15* limitations under the License.
16*
17*********************************************************/
18
19
20#ifndef __ctpl_stl_thread_pool_H__
21#define __ctpl_stl_thread_pool_H__
22
#include <atomic>
#include <condition_variable>
#include <exception>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>
32
33
34
35// thread pool to run user's functors with signature
36// ret func(int id, other_params)
37// where id is the index of the thread that runs the functor
38// ret is some return type
39
40
41namespace ctpl {
42
43 namespace detail {
// Minimal mutex-guarded FIFO used to hand work items from producers to workers.
// Every public operation holds the internal mutex for its whole duration, so
// individual calls are safe to issue from several threads at once.
template <typename T>
class Queue {
public:
    // Appends a copy of `value` at the back of the queue.
    // Always returns true.
    bool push(T const & value) {
        std::lock_guard<std::mutex> lock(this->mutex);
        this->q.push(value);
        return true;
    }

    // Moves the front element into `v` and removes it from the queue.
    // Returns false (leaving `v` untouched) when the queue is empty,
    // true otherwise.
    bool pop(T & v) {
        std::lock_guard<std::mutex> lock(this->mutex);
        if (this->q.empty())
            return false;
        v = std::move(this->q.front()); // the element is discarded right after, so moving is safe
        this->q.pop();
        return true;
    }

    // Returns true when the queue holds no elements at the time of the call.
    // Const-qualified so it can be used through const references.
    bool empty() const {
        std::lock_guard<std::mutex> lock(this->mutex);
        return this->q.empty();
    }

private:
    std::queue<T> q;
    mutable std::mutex mutex; // mutable: locking is required even for the const query above
};
69 }
70
72
73 public:
74
75 thread_pool() { this->init(); }
76 thread_pool(int nThreads) { this->init(); this->resize(nThreads); }
77
78 // the destructor waits for all the functions in the queue to be finished
79 ~thread_pool() {
80 this->stop(true);
81 }
82
83 // get the number of running threads in the pool
84 int size() { return static_cast<int>(this->threads.size()); }
85
86 // number of idle threads
87 int n_idle() { return this->nWaiting; }
88 std::thread & get_thread(int i) { return *this->threads[i]; }
89
90 // change the number of threads in the pool
91 // should be called from one thread, otherwise be careful to not interleave, also with this->stop()
92 // nThreads must be >= 0
93 void resize(int nThreads) {
94 if (!this->isStop && !this->isDone) {
95 int oldNThreads = static_cast<int>(this->threads.size());
96 if (oldNThreads <= nThreads) { // if the number of threads is increased
97 this->threads.resize(nThreads);
98 this->flags.resize(nThreads);
99
100 for (int i = oldNThreads; i < nThreads; ++i) {
101 this->flags[i] = std::make_shared<std::atomic<bool>>(false);
102 this->set_thread(i);
103 }
104 }
105 else { // the number of threads is decreased
106 for (int i = oldNThreads - 1; i >= nThreads; --i) {
107 *this->flags[i] = true; // this thread will finish
108 this->threads[i]->detach();
109 }
110 {
111 // stop the detached threads that were waiting
112 std::unique_lock<std::mutex> lock(this->mutex);
113 this->cv.notify_all();
114 }
115 this->threads.resize(nThreads); // safe to delete because the threads are detached
116 this->flags.resize(nThreads); // safe to delete because the threads have copies of shared_ptr of the flags, not originals
117 }
118 }
119 }
120
121 // empty the queue
122 void clear_queue() {
123 std::function<void(int id)> * _f;
124 while (this->q.pop(_f))
125 delete _f; // empty the queue
126 }
127
128 // pops a functional wrapper to the original function
129 std::function<void(int)> pop() {
130 std::function<void(int id)> * _f = nullptr;
131 this->q.pop(_f);
132 std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
133 std::function<void(int)> f;
134 if (_f)
135 f = *_f;
136 return f;
137 }
138
139 // wait for all computing threads to finish and stop all threads
140 // may be called asynchronously to not pause the calling thread while waiting
141 // if isWait == true, all the functions in the queue are run, otherwise the queue is cleared without running the functions
142 void stop(bool isWait = false) {
143 if (!isWait) {
144 if (this->isStop)
145 return;
146 this->isStop = true;
147 for (int i = 0, n = this->size(); i < n; ++i) {
148 *this->flags[i] = true; // command the threads to stop
149 }
150 this->clear_queue(); // empty the queue
151 }
152 else {
153 if (this->isDone || this->isStop)
154 return;
155 this->isDone = true; // give the waiting threads a command to finish
156 }
157 {
158 std::unique_lock<std::mutex> lock(this->mutex);
159 this->cv.notify_all(); // stop all waiting threads
160 }
161 for (int i = 0; i < static_cast<int>(this->threads.size()); ++i) { // wait for the computing threads to finish
162 if (this->threads[i]->joinable())
163 this->threads[i]->join();
164 }
165 // if there were no threads in the pool but some functors in the queue, the functors are not deleted by the threads
166 // therefore delete them here
167 this->clear_queue();
168 this->threads.clear();
169 this->flags.clear();
170 }
171
172 template<typename F, typename... Rest>
173 auto push(F && f, Rest&&... rest) ->std::future<decltype(f(0, rest...))> {
174 auto pck = std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
175 std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...)
176 );
177 auto _f = new std::function<void(int id)>([pck](int id) {
178 (*pck)(id);
179 });
180 this->q.push(_f);
181 std::unique_lock<std::mutex> lock(this->mutex);
182 this->cv.notify_one();
183 return pck->get_future();
184 }
185
186 // run the user's function that excepts argument int - id of the running thread. returned value is templatized
187 // operator returns std::future, where the user can get the result and rethrow the catched exceptins
188 template<typename F>
189 auto push(F && f) ->std::future<decltype(f(0))> {
190 auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(std::forward<F>(f));
191 auto _f = new std::function<void(int id)>([pck](int id) {
192 (*pck)(id);
193 });
194 this->q.push(_f);
195 std::unique_lock<std::mutex> lock(this->mutex);
196 this->cv.notify_one();
197 return pck->get_future();
198 }
199
200
201 private:
202
203 // deleted
204 thread_pool(const thread_pool &);// = delete;
205 thread_pool(thread_pool &&);// = delete;
206 thread_pool & operator=(const thread_pool &);// = delete;
207 thread_pool & operator=(thread_pool &&);// = delete;
208
209 void set_thread(int i) {
210 std::shared_ptr<std::atomic<bool>> flag(this->flags[i]); // a copy of the shared ptr to the flag
211 auto f = [this, i, flag/* a copy of the shared ptr to the flag */]() {
212 std::atomic<bool> & _flag = *flag;
213 std::function<void(int id)> * _f;
214 bool isPop = this->q.pop(_f);
215 while (true) {
216 while (isPop) { // if there is anything in the queue
217 std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
218 (*_f)(i);
219 if (_flag)
220 return; // the thread is wanted to stop, return even if the queue is not empty yet
221 else
222 isPop = this->q.pop(_f);
223 }
224 // the queue is empty here, wait for the next command
225 std::unique_lock<std::mutex> lock(this->mutex);
226 ++this->nWaiting;
227 this->cv.wait(lock, [this, &_f, &isPop, &_flag](){ isPop = this->q.pop(_f); return isPop || this->isDone || _flag; });
228 --this->nWaiting;
229 if (!isPop)
230 return; // if the queue is empty and this->isDone == true or *flag then return
231 }
232 };
233 this->threads[i].reset(new std::thread(f)); // compiler may not support std::make_unique()
234 }
235
236 void init() { this->nWaiting = 0; this->isStop = false; this->isDone = false; }
237
// worker threads, indexed by the id passed to the user's functions
std::vector<std::unique_ptr<std::thread>> threads;
// per-worker stop flags; each worker holds its own shared_ptr copy of its flag
std::vector<std::shared_ptr<std::atomic<bool>>> flags;
// pending functions; the queue stores raw pointers to heap-allocated wrappers
detail::Queue<std::function<void(int id)> *> q;
// set by stop(true): workers finish once the queue is empty
std::atomic<bool> isDone;
// set by stop(false): workers are flagged to stop and the queue is cleared
std::atomic<bool> isStop;
std::atomic<int> nWaiting;  // how many threads are waiting

// mutex and condition variable used by the workers to wait for new work or a stop request
std::mutex mutex;
std::condition_variable cv;
247 };
248
249}
250
251#endif // __ctpl_stl_thread_pool_H__
Definition ctpl_stl.h:45
Definition ctpl_stl.h:71