代码从github上拷的,写了一些理解,如有错误请指正
Threadpool.h
1 #ifndef THREAD_POOL_H
2 #define THREAD_POOL_H
3
4 #include <vector>
5 #include <queue>
6 #include <memory>
7 #include <thread>
8 #include <mutex>
9 #include <condition_variable>
10 #include <future>
11 #include <functional>
12 #include <stdexcept>
13
14 class ThreadPool {
15 public:
16 ThreadPool(size_t);
17 template<class F, class... Args>
18 auto enqueue(F&& f, Args&&... args)
19 -> std::future<typename std::result_of<F(Args...)>::type>;
20 ~ThreadPool();
21 private:
22 // need to keep track of threads so we can join them
23 std::vector< std::thread > workers;
24 // the task queue
25 std::queue< std::function<void()> > tasks;
26
27 // synchronization
28 std::mutex queue_mutex;
29 std::condition_variable condition;
30 bool stop;
31 };
32
33 // the constructor just launches some amount of workers
34 inline ThreadPool::ThreadPool(size_t threads)
35 : stop(false)
36 {
37 for(size_t i = 0;i<threads;++i)
38 workers.emplace_back( //这里一下启动threads个,即使lambda阻塞(在已启动子线程内的阻塞),主线程还是会循环
39 [this]
40 {
41 for(;;)
42 {
43 std::function<void()> task;
44
45 {
46 std::unique_lock<std::mutex> lock(this->queue_mutex);
47
48 //当前线程被阻塞, 直到condition.notify_one()调用,如果lambda返回false,wait会解锁互斥元lock并置阻塞或等待状态,如果条件满足互斥元仍被锁定
49 //而这里锁用的是std::unique_lock而不是std::lock_guard,是因为std::lock_guard不能在wait等待中解锁,并在之后重新锁定
50 //如果互斥元在线程休眠期间始终被锁定,enqueue就无法锁定互斥元往下执行,则造成死锁
51 this->condition.wait(lock,
52 [this]{ return this->stop || !this->tasks.empty(); });
53
54 if(this->stop && this->tasks.empty())
55 return;
56 task = std::move(this->tasks.front());
57 this->tasks.pop();
58 }
59
60 task();
61
62 }
63 }
64 );
65 }
66
67 // add new work item to the pool //函数后跟throw()代表不抛出任何异常,跟thorw(...)代表可以抛出任何异常
68 template<class F, class... Args> //... Args这个代表不限类型,不限数量
69 auto ThreadPool::enqueue(F&& f, Args&&... args) //&&代表右值引用(可以用常量做参数)
70 -> std::future<typename std::result_of<F(Args...)>::type> //<F(Args...)>代表这个是个返回类型为F,参数不确定多的函数;放入类型(如int)会报错
71 {
72 using return_type = std::result_of<F(Args...)>::type; //std::result_of::type 获得函数返回类型,直接用decltype会报错
73
74 auto task = std::make_shared< std::packaged_task<return_type()> >(
75 std::bind(std::forward<F>(f), std::forward<Args>(args)...)
76 );
77
78 std::future<return_type> res = task->get_future();
79 { //这加一个作用域的作用是出了这个作用域就解锁
80 std::unique_lock<std::mutex> lock(queue_mutex);
81
82 // don't allow enqueueing after stopping the pool
83 if(stop)
84 throw std::runtime_error("enqueue on stopped ThreadPool");
85
86 tasks.emplace([task](){ (*task)(); });
87 }
88 condition.notify_one(); ////选择一个wait状态的线程进行唤醒,并使他获得对象上的锁来完成任务(即其他线程无法访问对象)
89 return res;
90 }
91
92 // the destructor joins all threads
93 inline ThreadPool::~ThreadPool()
94 {
95 {
96 std::unique_lock<std::mutex> lock(queue_mutex);
97 stop = true;
98 }
99 condition.notify_all(); //通知所有wait状态的线程竞争对象的控制权,唤醒所有线程执行
100 for(std::thread &worker: workers)
101 worker.join();
102 }
103
104 #endif
运行代码
1 #include "stdafx.h"
2 #include <iostream>
3 #include <vector>
4 #include <chrono>
5
6 #include "ThreadPool.h"
7
8 int main()
9 {
10
11 ThreadPool pool(4);
12 std::vector< std::future<int> > results;
13
14 for(int i = 0; i < 8; ++i) {
15 results.emplace_back(
16 pool.enqueue([i] {
17 std::cout << "hello " << i << std::endl;
18 std::this_thread::sleep_for(std::chrono::seconds(1));
19 std::cout << "world " << i << std::endl;
20 return i*i;
21 })
22 );
23 }
24
25 for(auto && result: results)
26 std::cout << result.get() << ' ';
27 std::cout << std::endl;
28 system("pause");
29 return 0;
30 }
原文链接: https://www.cnblogs.com/wangshaowei/p/8884290.html
欢迎关注
微信关注下方公众号,第一时间获取干货硬货;公众号内回复【pdf】免费获取数百本计算机经典书籍
原创文章受到原创版权保护。转载请注明出处:https://www.ccppcoding.com/archives/272625
非原创文章文中已经注明原地址,如有侵权,联系删除
关注公众号【高性能架构探索】,第一时间获取最新文章
转载文章受原作者版权保护。转载请注明原作者出处!