c++11之100行实现简单线程池

代码从github上拷的,写了一些理解,如有错误请指正

Threadpool.h

  1 #ifndef THREAD_POOL_H
  2 #define THREAD_POOL_H
  3 
  4 #include <vector>
  5 #include <queue>
  6 #include <memory>
  7 #include <thread>
  8 #include <mutex>
  9 #include <condition_variable>
 10 #include <future>
 11 #include <functional>
 12 #include <stdexcept>
 13 
 14 class ThreadPool {
 15 public:
 16     ThreadPool(size_t);
 17     template<class F, class... Args>
 18     auto enqueue(F&& f, Args&&... args) 
 19         -> std::future<typename std::result_of<F(Args...)>::type>;
 20     ~ThreadPool();
 21 private:
 22     // need to keep track of threads so we can join them
 23     std::vector< std::thread > workers;
 24     // the task queue
 25     std::queue< std::function<void()> > tasks;
 26     
 27     // synchronization
 28     std::mutex queue_mutex;
 29     std::condition_variable condition;
 30     bool stop;
 31 };
 32  
 33 // the constructor just launches some amount of workers
 34 inline ThreadPool::ThreadPool(size_t threads)
 35     : stop(false)
 36 {
 37     for(size_t i = 0;i<threads;++i)
 38         workers.emplace_back(                //这里一下启动threads个,即使lambda阻塞(在已启动子线程内的阻塞),主线程还是会循环
 39             [this]
 40             {
 41                 for(;;)
 42                 {
 43                     std::function<void()> task;
 44 
 45                     {
 46                         std::unique_lock<std::mutex> lock(this->queue_mutex);
 47                         
 48                         //当前线程被阻塞, 直到condition.notify_one()调用,如果lambda返回false,wait会解锁互斥元lock并置阻塞或等待状态,如果条件满足互斥元仍被锁定
 49                         //而这里锁用的是std::unique_lock而不是std::lock_guard,是因为std::lock_guard不能在wait等待中解锁,并在之后重新锁定
 50                         //如果互斥元在线程休眠期间始终被锁定,enqueue就无法锁定互斥元往下执行,则造成死锁
 51                         this->condition.wait(lock,                                
 52                             [this]{ return this->stop || !this->tasks.empty(); });    
 53                         
 54                         if(this->stop && this->tasks.empty())
 55                             return;
 56                         task = std::move(this->tasks.front());
 57                         this->tasks.pop();
 58                     }
 59 
 60                     task();
 61                     
 62                 }
 63             }
 64         );
 65 }
 66 
 67 // add new work item to the pool            //函数后跟throw()代表不抛出任何异常,跟thorw(...)代表可以抛出任何异常
 68 template<class F, class... Args>            //... Args这个代表不限类型,不限数量
 69 auto ThreadPool::enqueue(F&& f, Args&&... args)                    //&&代表右值引用(可以用常量做参数)
 70     -> std::future<typename std::result_of<F(Args...)>::type>            //<F(Args...)>代表这个是个返回类型为F,参数不确定多的函数;放入类型(如int)会报错
 71 {
 72     using return_type = std::result_of<F(Args...)>::type;                //std::result_of::type 获得函数返回类型,直接用decltype会报错
 73 
 74     auto task = std::make_shared< std::packaged_task<return_type()> >(
 75             std::bind(std::forward<F>(f), std::forward<Args>(args)...)
 76         );
 77         
 78     std::future<return_type> res = task->get_future();
 79     {                                                                //这加一个作用域的作用是出了这个作用域就解锁
 80         std::unique_lock<std::mutex> lock(queue_mutex);
 81 
 82         // don't allow enqueueing after stopping the pool
 83         if(stop)
 84             throw std::runtime_error("enqueue on stopped ThreadPool");
 85 
 86         tasks.emplace([task](){ (*task)(); });
 87     }
 88     condition.notify_one();        ////选择一个wait状态的线程进行唤醒,并使他获得对象上的锁来完成任务(即其他线程无法访问对象)
 89     return res;
 90 }
 91 
 92 // the destructor joins all threads
 93 inline ThreadPool::~ThreadPool()
 94 {
 95     {
 96         std::unique_lock<std::mutex> lock(queue_mutex);
 97         stop = true;
 98     }
 99     condition.notify_all();                        //通知所有wait状态的线程竞争对象的控制权,唤醒所有线程执行
100     for(std::thread &worker: workers)
101         worker.join();
102 }
103 
104 #endif

运行代码

 1 #include "stdafx.h"
 2 #include <iostream>
 3 #include <vector>
 4 #include <chrono>
 5 
 6 #include "ThreadPool.h"
 7 
 8 int main()
 9 {
10     
11     ThreadPool pool(4);
12     std::vector< std::future<int> > results;
13 
14     for(int i = 0; i < 8; ++i) {
15         results.emplace_back(
16             pool.enqueue([i] {
17                 std::cout << "hello " << i << std::endl;
18                 std::this_thread::sleep_for(std::chrono::seconds(1));
19                 std::cout << "world " << i << std::endl;
20                 return i*i;
21             })
22         );
23     }
24 
25     for(auto && result: results)
26         std::cout << result.get() << ' ';
27     std::cout << std::endl;
28     system("pause");
29     return 0;
30 }

 

原文链接: https://www.cnblogs.com/wangshaowei/p/8884290.html

欢迎关注

微信关注下方公众号,第一时间获取干货硬货;公众号内回复【pdf】免费获取数百本计算机经典书籍;

也有高质量的技术群,里面有嵌入式、搜广推等BAT大佬

    c++11之100行实现简单线程池

原创文章受到原创版权保护。转载请注明出处:https://www.ccppcoding.com/archives/391930

非原创文章文中已经注明原地址,如有侵权,联系删除

关注公众号【高性能架构探索】,第一时间获取最新文章

转载文章受原作者版权保护。转载请注明原作者出处!

(0)
上一篇 2023年3月31日 上午10:50
下一篇 2023年3月31日 上午10:50

相关推荐