【C++】一种线程池的实现方式

  • 1.需要预先定义任务的函数指针
  • 2.构造函数接受容量参数,申请线程池空间,并将线程全部启动
  • 3.初始状态任务队列tasks_为空,当队列为空,则阻塞线程——thread_loop()
  • 4.当任务队列中有元素后,唤醒其中一个线程,执行这个任务——take()->task()
/**************************************************************************************************
* File Name : ring_queue_nolock.h
* Created : 20 / 03 / 03
* Author : GYT
* Description : 线程池
* 1.需要预先定义任务的函数指针
* 2.构造函数接受容量参数,申请线程池空间,并将线程全部启动
* 3.初始状态任务队列tasks_为空,当队列为空,则阻塞线程——thread_loop()
* 4.当任务队列中有元素后,唤醒其中一个线程,执行这个任务——take()->task()
**************************************************************************************************/
#ifndef _thread_pool_HPP
#define _thread_pool_HPP

#include <vector>
#include <deque>
#include <thread>
#include <functional>
#include <mutex>
#include <condition_variable>

namespace gyt {

    class Thread_Pool{
    public:
        typedef std::function<void()> task_t;
        /**************************************************************************************************
        * Function Name : Thread_Pool
        * Description : 构造函数
        * Date : 20 / 03 / 03
        * Parameter : int init_size:线程池大小
        * Return Code : none
        * Author : GYT
        **************************************************************************************************/
        Thread_Pool(int init_size);
        /**************************************************************************************************
        * Function Name : ~Thread_Pool
        * Description : 析构函数
        * Date : 20 / 03 / 0
        * Parameter : none
        * Return Code : none
        * Author : GYT
        **************************************************************************************************/
        ~Thread_Pool();
        /**************************************************************************************************
        * Function Name : stop
        * Description : 停止函数
        * Date : 20 / 03 / 03
        * Parameter : none
        * Return Code : none
        * Author : GYT
        **************************************************************************************************/
        void stop();
        /**************************************************************************************************
        * Function Name : add_task
        * Description : 添加任务函数
        * Date : 20 / 03 / 03
        * Parameter : const task_t& 需要添加的任务函数指针
        * Return Code : none
        * Author : GYT
        **************************************************************************************************/
        void add_task(const task_t&/*, type para*/);

    private:
        Thread_Pool(const Thread_Pool&);
        const Thread_Pool& operator=(const Thread_Pool&);
        /**************************************************************************************************
        * Function Name : is_started
        * Description : 判断是否启动
        * Date : 20 / 03 / 03
        * Parameter : none
        * Return Code : bool
        * Author : GYT
        **************************************************************************************************/
        bool is_started() { return is_started_; }
        /**************************************************************************************************
        * Function Name : start
        * Description : 启动线程池
        * Date : 20 / 03 / 03
        * Parameter : none
        * Return Code : none
        * Author : GYT
        **************************************************************************************************/
        void start();
        /**************************************************************************************************
        * Function Name : thread_loop
        * Description : 线程循环遍历任务队列,如果不为空,则取任务
        * Date : 20 / 03 / 03
        * Parameter : none
        * Return Code : bool
        * Author : GYT
        **************************************************************************************************/
        void thread_loop();
        /**************************************************************************************************
        * Function Name : take
        * Description : 从任务队列中获取任务,返回函数指针
        * Date : 20 / 03 / 03
        * Parameter : none
        * Return Code : task_t
        * Author : GYT
        **************************************************************************************************/
        task_t take();

        typedef std::vector<std::thread*> THREADS_T;  //线程容器,大小由构造函数决定
        typedef std::deque<task_t>          TASKS_T;      //任务队列,先入先出


        int                     init_threads_size_;     //初始化线程池数量
        THREADS_T               threads_;               //线程容器本体
        TASKS_T                 tasks_;                 //任务队列
        std::mutex              mutex_;                 //同步控制-锁
        std::condition_variable cond_;                  //同步控制-条件变量
        bool                    is_started_;            //线程池是否正在运行
    };

}
#endif
#include <assert.h>
#include <iostream>
#include <sstream>
#include "thread_pool.h"

namespace gyt {
    Thread_Pool::Thread_Pool(int init_size)
        :init_threads_size_(init_size),
        mutex_(),
        cond_(),
        is_started_(false)
    {
        start();
    }

    Thread_Pool::~Thread_Pool()
    {
        if (is_started_)
        {
            stop();
        }
    }

    void Thread_Pool::start()
    {
        assert(threads_.empty());
        is_started_ = true;
        threads_.reserve(init_threads_size_);
        for (int i = 0; i < init_threads_size_; i++)
        {
            threads_.push_back(new std::thread(std::bind(&Thread_Pool::thread_loop, this)));
        }

    }

    void Thread_Pool::stop()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        is_started_ = false;
        cond_.notify_all();

        for (THREADS_T::iterator it = threads_.begin(); it != threads_.end(); ++it)
        {
            (*it)->join();
            delete *it;
        }
        threads_.clear();
    }


    void Thread_Pool::thread_loop()
    {
        while (is_started_)
        {
            task_t task = take();
            if (task)
            {
                //std::unique_lock<std::mutex> lock(mutex_);
                task();
            }
        }
    }

    void Thread_Pool::add_task(const task_t& task/*, type para*/)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        tasks_.push_back(task);
        cond_.notify_one();
    }

    Thread_Pool::task_t Thread_Pool::take()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        while (tasks_.empty() && is_started_)
        {
            cond_.wait(lock);
        }

        task_t task;
        TASKS_T::size_type size = tasks_.size();
        if (!tasks_.empty() && is_started_)
        {
            task = tasks_.front();
            tasks_.pop_front();
            assert(size - 1 == tasks_.size());
        }
        return task;
    }   
}

测试代码:

#include <iostream>
#include <chrono>
#include <condition_variable>
#include "thread_pool.h"

std::mutex g_mutex;

void DoTest()
{
    for (int i = 1; i < 4; ++i)
    {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        std::lock_guard<std::mutex> lock(g_mutex);
        std::cout << "testFunc() [" << i << "] at thread [ " << std::this_thread::get_id() << "] output" << std::endl;
    }

}

int main()
{
    gyt::Thread_Pool thread_pool(7);
    for (int i = 0; i < 7; i++){
        thread_pool.add_task(DoTest);
    }
    system("pause");
    return 0;
}

原文链接: https://www.cnblogs.com/fishily/p/12402247.html

欢迎关注

微信关注下方公众号,第一时间获取干货硬货;公众号内回复【pdf】免费获取数百本计算机经典书籍;

也有高质量的技术群,里面有嵌入式、搜广推等BAT大佬

    【C++】一种线程池的实现方式

原创文章受到原创版权保护。转载请注明出处:https://www.ccppcoding.com/archives/333097

非原创文章文中已经注明原地址,如有侵权,联系删除

关注公众号【高性能架构探索】,第一时间获取最新文章

转载文章受原作者版权保护。转载请注明原作者出处!

(0)
上一篇 2023年3月1日 下午8:58
下一篇 2023年3月1日 下午8:58

相关推荐