1.C++11中引入了lambada表达式,很好的支持异步编程
2.C++11中引入了std::thread,可以很方便的构建线程,更方便的可移植特性
3.C++11中引入了std::mutex,可以很方便的构建线程锁互斥访问,更方便的可移植特性
4.C++11中引入了std::condition_variable,可以不依赖于win32 api实现自己的消费者生产者模型
5.利用改进版本的shared_ptr,可以很好的解决多线程生命周期的棘手问题
1 /************************************************************************/
2 /* */
3 /************************************************************************/
4
5 #ifndef __CARBON_THREAD_POOL_H
6 #define __CARBON_THREAD_POOL_H
7
8 #include <vector>
9 #include <memory>
10 #include <thread>
11 #include <mutex>
12 #include <condition_variable>
13 #include <future>
14 #include <functional>
15 #include <stdexcept>
16 #include <string>
17 #include <sstream>
18 #include <deque>
19
20 namespace CARBON {
21
22 //************************************
23 // Method: Create
24 // Returns: std::shared_ptr
25 // Qualifier: 用于创建智能指针实例
26 // Parameter: args, 可变参数,接受任意个数的参数,传递给T的构造函数
27 //************************************
28 template<typename T, typename... ARG>
29 std::shared_ptr<T> Create(ARG&&... args)
30 {
31 struct TEnableShared : public T
32 {
33 TEnableShared(ARG&&... args)
34 : T(std::forward<ARG>(args)...)
35 {}
36 };
37
38 return std::make_shared<TEnableShared>(std::forward<ARG>(args)...);
39 }
40
41 class ThreadPool : public std::enable_shared_from_this<ThreadPool>
42 {
43 protected:
44 ThreadPool()
45 : _stop(false)
46 {}
47
48 virtual ~ThreadPool()
49 {
50 {
51 std::unique_lock<std::mutex> lock(_lock);
52 _stop = true;
53 }
54 _condition.notify_all();
55 for (std::thread &worker : _workers)
56 worker.join();
57 }
58
59 public:
60 // initialize thread pool with number of threads
61 bool InitializePool(size_t threads)
62 {
63 if (!_workers.empty()) return true;
64
65 for (size_t i = 0; i < threads; ++i)
66 {
67 std::weak_ptr<ThreadPool> _wtp = this->shared_from_this();
68 auto th = [](std::weak_ptr<ThreadPool> wtp) {
69 for (;;)
70 {
71 std::function<void()> task;
72
73 {
74 std::shared_ptr<ThreadPool> stp = wtp.lock();
75 if (!stp)
76 return;
77
78 std::unique_lock<std::mutex> lock(stp->_lock);
79 auto shipment = [&] ()->bool { return stp->_stop || !stp->_tasks.empty(); };
80 stp->_condition.wait(lock, shipment);
81 if (stp->_stop)
82 return;
83 if (stp->_tasks.empty())
84 continue;
85 task = std::move(stp->_tasks.front()).task;
86 stp->_tasks.pop_front();
87 }
88
89 task();
90 }
91 };
92 _workers.emplace_back(th, _wtp);
93 }
94
95 return !_workers.empty();
96 }
97
98 //************************************
99 // Method: EnqueueTask
100 // Returns: std::future, 值类型由functor f指定
101 // Qualifier: 可以借由返回的std::future获取结果,但是更建议在functor中做异步通知
102 // Parameter: taskid 用于接受任务的id描述
103 // Parameter: functor f, 函数对象,用于执行任务
104 // Parameter: args, 可变参数,接受任意个数的参数,传递给functor f
105 //************************************
106 template<class F, class... Args>
107 auto EnqueueTask(std::string& taskid, F&& f, Args&&... args)
108 ->std::future<typename std::result_of<F(Args...)>::type>
109 {
110 if (_workers.empty())
111 throw std::runtime_error("ThreadPool not initialized yet");
112
113 using return_type = typename std::result_of<F(Args...)>::type;
114
115 auto task = std::make_shared<std::packaged_task<return_type()>>(
116 std::bind(std::forward<F>(f), std::forward<Args>(args)...)
117 );
118
119 std::future<return_type> res = task->get_future();
120 {
121 std::unique_lock<std::mutex> lock(_lock);
122
123 // don't allow enqueueing after stopping the pool
124 if (_stop)
125 throw std::runtime_error("enqueue on stopped ThreadPool");
126
127 stThreadTask st;
128 std::stringstream ss;
129 ss << (void*)task.get();
130 ss >> taskid;
131 st.taskid = taskid;
132 st.task = [task]() { (*task)(); };
133 _tasks.push_back(st);
134 }
135 _condition.notify_one();
136 return res;
137 }
138
139 //************************************
140 // Method: GetTasksSize
141 // Returns: size_t
142 // Qualifier: 获取等待任务队列的任务数,正在执行的任务已经弹出队列,所以不参与计算
143 //************************************
144 size_t GetTasksSize()
145 {
146 std::unique_lock<std::mutex> lock(_lock);
147 return _tasks.size();
148 }
149
150 //************************************
151 // Method: RemoveTask
152 // Returns: bool, 找到任务并移除则返回true,否则返回false
153 // Qualifier: 正在执行的任务已经弹出任务队列,应该在其它地方通知任务退出
154 // Qualifier: 执行完成的任务已经弹出任务队列,无法移除不存在的任务
155 // Qualifier: 该接口只能移除处在等待中的任务
156 // Parameter: taskid是任务的唯一标示,由EnqueueTask返回
157 //************************************
158 bool RemoveTask(const std::string& taskid)
159 {
160 std::unique_lock<std::mutex> lock(_lock);
161 for (auto& t = _tasks.begin(); t != _tasks.end(); ++t)
162 {
163 if (taskid == t->taskid)
164 {
165 _tasks.erase(t);
166 return true;
167 }
168 }
169
170 return false;
171 }
172
173 private:
174 typedef struct stThreadTask
175 {
176 std::function<void()> task;
177 std::string taskid;
178 }stThreadTask;
179
180 // need to keep track of threads so we can join them
181 std::vector< std::thread > _workers;
182 // the task queue
183 std::deque< stThreadTask > _tasks;
184
185 // synchronization
186 std::mutex _lock;
187 std::condition_variable _condition;
188 bool _stop;
189 };
190 }
191
192 #endif
使用enable_shared_from_this来确保内部线程访问指针时,不会因为指针失效造成的非法访问
weak_ptr很好的保证了ThreadPool的生命周期安全性和实效性
由于使用了share_from_this,将初始化代码整体拿出来放到InitializePool中实现
ThreadPool的构造函数和析构函数声明为protected,用于保证外部不要直接生成ThreadPool实例
应该使用Create函数来生成ThreadPool实例
测试代码如下:
1 namespace {
2 std::condition_variable _exit_cv;
3 }
4
5 void func(int n)
6 {
7 std::cout << "func with n " << n << std::endl;
8 }
9
10 using CARBON::ThreadPool;
11
12 std::string taskid;
13 std::shared_ptr<ThreadPool> stp = CARBON::Create<ThreadPool>();
14 std::weak_ptr<ThreadPool> _wtp = stp;
15 stp->InitializePool(2);
16
17
18 stp->EnqueueTask(taskid, [](std::function<void(int)> cbf, std::weak_ptr<ThreadPool> wtp) ->int {
19 std::cout << "task1\n";
20
21 for (int i = 0; i < 5; ++i) {
22 std::mutex mtx;
23 std::unique_lock<std::mutex> lck(mtx);
24 if(_exit_cv.wait_for(lck, std::chrono::milliseconds(400)) == std::cv_status::no_timeout)
25 break;
26
27 if (cbf) cbf(i);
28 if (wtp.expired())
29 break;
30 }
31
32 return 5;
33 }, func, _wtp);
当需要中断线程执行时,应该在外部通知线程中的任务自行退出
例子中可以在主线程中这么做
_exit_cv.notify_all();
_exit_cv用于模拟sleep操作func用于模拟任务结果的异步通知,这里为了省事使用了函数指针,实际工作中应该使用functor来传递,以保证生命周期的有效性比如std::bind和shared_ptr一起构造的functor对象
原文链接: https://www.cnblogs.com/jojodru/p/6675200.html
欢迎关注
微信关注下方公众号,第一时间获取干货硬货;公众号内回复【pdf】免费获取数百本计算机经典书籍
原创文章受到原创版权保护。转载请注明出处:https://www.ccppcoding.com/archives/252100
非原创文章文中已经注明原地址,如有侵权,联系删除
关注公众号【高性能架构探索】,第一时间获取最新文章
转载文章受原作者版权保护。转载请注明原作者出处!