c++线程池实现

转自:https://zhuanlan.zhihu.com/p/77551877

定时器模块的文章中我们知道,如果在定时器很多时只有一个线程去逐个判断定时器是否需要被执行,其效率是不高的,因此我们需要有一个更加高效的模型来实现这个工作。同时我们知道为了更有效地利用机器资源,现代编程中引入了很多概念,从最重的进程,到后来的线程,到近几年大火的协程,我们不去评价哪个模型是最合理的,因为存在即是合理,所以这里我们只是讨论我们使用到的模型,多线程模型。使用多线程时最容易会想到的使用方式肯定就是one requests one thread,但是在linux操作系统中线程其实是用进程来实现的,因此如果不加管理地使用使用线程其实对操作系统而言,会存在很大的上下文切换的压力,因此,使用线程池是一个比较好的实践方式。由于线程的概念比c/c++的出现要晚,所以过去在c/c++中并没有对线程的支持,因此一直都是使用pthread库来作为线程的标准实现方式,直到c++11标准之后,c++才引入thread作为标准库,使用c++来进行并发编程才变得简单。那在过去如果要在c++中使用多线程编程,或者说要编写一个c++线程池是怎样做的呢?下面我们逐个知识点来讲。

用pthread实现简单的线程池

phtread库的使用方式

要使用pthread库,必须在代码中包含头文件

1 #include <pthread.h>

并且要在链接时加入库

-lpthread

类和线程如何结合起来

首先来看看pthread库中创建线程的api

int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
                          void *(*start_routine) (void *), void *arg);

pthread_t *thread 为这个线程的线程id,如果在创建时这个参数为null,可以在后续通过调用pthread_self()函数来获取本线程的线程id
const pthread_attr_t *attr 为线程的属性,具体选项有不少,常用的包括设置joinable,线程堆栈大小等等,这里不再啰嗦,可以通过pthread_attr_init()等函数来设置attr
void *(*start_routine) (void *) 是这个线程要执行哪个函数,这个函数必须是返回void*指针,而且能够接受一个void*参数
void *arg 就是上面函数接受的void *参数

看完上述api的介绍之后,可能有同学会有一个问题,api我是知道了,一个线程会有一个线程id,也会有一个attr我也知道了,作为写c++的人,我会希望把这些属性用面向对象的方式给装置起来,但是,这函数接受的是一个函数,我要怎样才能在这个函数中找到我对应的封装类的实例呢?又要怎样才能调用类的成员函数呢?不说废话,我们直接上一个玩具级别的线程封装的代码吧。

#include <pthread.h>
#include <iostream>
#include <unistd.h>

class Thread{
public:
    Thread();
    ~Thread();
    bool Start();
    static void* ThreadFunc( void* );
    bool Run();
private:
    pthread_t mTid;
};



Thread::Thread()
    :mTid(0){

}

Thread::~Thread(){

}

bool Thread::Start(){
    return 0==pthread_create( &mTid, NULL, &Thread::ThreadFunc, (void*)this );
}

void* Thread::ThreadFunc( void* arg ){
    Thread* self = (Thread*)arg;
    self->Run();
    return NULL;
}

bool Thread::Run(){
    while(true){
        std::cout << mTid << std::endl;
        sleep(1);
    }

    return true;
}

int main(){
    Thread t1;
    t1.Start();
    Thread t2;
    t2.Start();
    Thread t3;
    t3.Start();

    while(true){
        sleep(1);
    }

}

上述代码中,Thread类提供类3个函数,其中Start()为入口,调用pthreadcreate,而关键点就在这里了。pthread_create的第三个参数,也就是线程调用的函数传了一个Thread类的静态函数进去,因为在类中,静态成员函数是不需要通过类的访问的,所以pthread_create能够直接调用这个函数。而最关键的点就在于第四个参数,这里把this指针给传进去了,为什么呢?大家可以看到,在ThreadFunc函数的实现中,我们会把本身为void类型的this指针重新强制类型转换为Thread 类型,然后用来调用Run()方法。由于this指针就是类实例本身,通过这样两次类型的转换,线程和对应的类实例就绑定起来了。

怎样让线程完成不同类型的任务

上面我们解决了怎样用类把线程相关的数据给封装起来,但是,上面的线程是一个简单得不能再简单的封装了,尽管它已经完成了线程的启动部分,因此,为了能够让线程完成不同的任务,我们需要在这个封装类的基础上面派生成更多的任务,如下

#include <pthreah.h>
#include <iostream>

class Thread{
public:
    Thread();
    ~Thread();
    bool Start();
    static void* ThreadFunc( void* );
    virtual bool Run() = 0;
private:
    pthread_t mTid;
};

class WorkThread : public Thread{
public:
        WorkThread( );
        virtual ~WorkThread();
        virtual void Run();
        void Stop();
} ;


WorkThread::WorkThread( ){
}

WorkThread::~WorkThread(){
}

void WorkThread::Run(){
    while(true)
        std::cout << "work thread" << std::endl;
}

怎样让线程动态绑定任务

现在我们有线程了,也知道怎样能够派生出不同类型的线程来了。但是,如果一个任务就写派生一个线程的话,这样的做法貌似不那么的理智,在任务很多的情况下,我们得产生多少的线程呢?那怎么办呢?我们能不能复用线程,只是改变线程的运行参数呢?如果是c程序员,应该会很快想到用函数指针加void*,那我们写c++的吧,有没有更加c++点的做法呢?必须有的,继承和多态。我们可以把任务抽象成一个基类,每个任务派生一个任务参数,这样线程完成一个任务之后只要领取另外一个任务就可以了。

//task.h
#ifndef __ABSTRACT_TASK__
#define __ABSTRACT_TASK__

class Task {
public:
        virtual void Execute() = 0;
        void Run();
};

#endif

//task.cpp
void Task::Run(){
        Execute();
        delete this;
}

//work_thread.h
#ifndef __WORK_THREAD__
#define __WORK_THREAD__

#include <queue>

#include "thread.h"
#include "abstract_task.h"
#include "mutex.h"
#include "cond.h"

class WorkThread : public Thread{
public:
        WorkThread(  );
        virtual ~WorkThread();
        virtual void Run();
        void Stop();
        void AddTask( Task* task );

private:

        bool mIsBusy;
        bool mQuit;
        Mutex mMutex;
        Cond mCond;
        std::queue<Task*> mTaskQueue;
} ;

#endif

//work_thread.cpp
#include "work_thread.h"
#include <iostream>
#include <stdexcept>

WorkThread::WorkThread( )
mIsBusy(false)
,mQuit(false){

}

WorkThread::~WorkThread(){

}

void WorkThread::Run(){
        while(true){
                Task* task = NULL;

                {
                        ScopeLock guard(mMutex);
                        if( mTaskQueue.empty() ){
                                if( mQuit )
                                        break;
                                else{
                                        mIsBusy = false;
                                        //等待信号,要么有新任务到来,要么被回收了,因此在被唤醒时不能直接去干活,而是先多花一次循环来判断到底要做什么
                                        mCond.Wait( mMutex.Get() );
                                        continue;
                                }
                        }

                        task = mTaskQueue.front();
                        mTaskQueue.pop();
                }

                try{
                        task->Run();

                }catch( std::exception& e){
                        std::cout <<"exception: "<< e.wht() << std::endl;;
                }
        }
}


void WorkThread::Stop(){
        ScopeLock guard(mMutex);
        mQuit = true;
        if ( !mIsBusy )
                mCond.Notify();
}

void WorkThread::AddTask( Task* task ){
        ScopeLock guard(mMutex);
        mTaskQueue.push( task );
        if ( mTaskQueue.size()==1 ){
                mIsBusy = true;
                mCond.Notify();
        }
}

怎样把多个线程管理起来

有了上面的工作,我们就可以实现动态绑定任务和线程,但是哪个任务需要长时间执行,哪个能够很快执行,这个是没法判断的,那么这就带来了一个问题,就是我们要应该把任务分配给哪个线程?我们怎样才能知道哪个线程是闲的,总不能出现把线程饱的饱死,饿的饿死吧。因此,我们需要实现一个线程管理器,这个管理器需要把知道所有线程的状态,这样有新任务来的时候就能够把任务分配给有资源的线程来处理。另外为了避免因为任务执行时间太长而导致其他任务无法被执行的问题出现,我们规定每个线程只能拿到一个任务。因此,总结一下,这个线程管理器就需要做到下面几个功能

  • 维护一定数量的线程,并且能够掌握线程的状态。因此线程需要向这个管理器汇报自己的状态
  • 当任务数量多于线程数量时,管理器能够自动扩充线程池,以保证每个任务都有处理的线程;当线程池处于空闲状态时,管理器能够自动回收多余的线程,以保证线程池大小稳定

我们直接上代码吧

#include <list>
#include <pthread.h>
#include <stdint.h>

#include "task.h"
#include "mutex.h"
#include "work_thread.h"


class ThreadManager{
public:
        ThreadManager(uint32_t poolSize);
        ~ThreadManager();
        bool Start();
        bool Stop();
        void AddTask( Task* task );
        void OnIdle( pthread_t );

private:
        uint32_t mPoolSize;
        Mutex mMutex;
        std::list<WorkThread*> mThreadPool; 
        std::list<WorkThread*> mDeadThreads;
};

ThreadManager::ThreadManager( uint32_t poolSize )
:mPoolSize(poolSize) {

}

ThreadManager::~ThreadManager(){
        if ( mThreadPool.empty() )
                return;

        for( std::list<WorkThread*>::iterator it=mThreadPool.begin(); it!=mThreadPool.end(); ++it ){
                delete *it;
        }
}

bool ThreadManager::Start( ){
        ScopeLock guard(mMutex);
        for( uint32_t i=0; i<mPoolSize; i++ ){
                WorkThread* t = new WorkThread( this );
                t->Start();
                mThreadPool.push_back( t );
        }
}

bool ThreadManager::Stop(){
        ScopeLock guard(mMutex);
        for( std::list<WorkThread*>::iterator it=mThreadPool.begin(); it!=mThreadPool.end(); ++it ){
                (*it)->Stop();
        }
}

void ThreadManager::AddTask( Task* task ){
        ScopeLock guard(mMutex);
        for ( std::list<WorkThread*>::iterator it=mThreadPool.begin(); it!=mThreadPool.end(); ++it ){
                if ( !(*it)->IsBusy() ){
                        (*it)->AddTask( task );
                        return;
                }
        }

        WorkThread* t = new WorkThread( this );
        t->Start();
        mThreadPool.push_back( t );
        t->AddTask( task );
}

void ThreadManager::OnIdle( pthread_t id ){
        ScopeLock guard(mMutex);
        for ( std::list<WorkThread*>::iterator it = mDeadThreads.begin(); it != mDeadThreads.end(); ++it ){
                (*it)->Stop();
                delete *it;
        }
        mDeadThreads.clear();

        if ( mPoolSize >= mThreadPool.size() ){
                return;
        }

        for ( std::list<WorkThread*>::iterator it=mThreadPool.begin(); it!=mThreadPool.end();  ){
                if( id == (*it)->GetId() ){
                       //这里不能直接让线程执行Stop,因为在执行OnIdle时已经拿到了锁,自己无法做递归加锁,因此把自己加入到队列中,由其他线程来Stop自己
                        mDeadThreads.push_back(*it);
                        it = mThreadPool.erase(it);
                }
                else
                        ++it;
        }
}


//main.cpp
#include "thread.h"
#include "work_thread.h"
#include "thread_manager.h"
#include "task.h"

class TaskA : public Task{
public:
        virtual void Execute(){
                std::cout <<"TaskA" << std::endl;
                sleep(1);
        }
};
class TaskB : public Task{
public:
        virtual void Execute(){
                std::cout <<"TaskB" << std::endl;
                sleep(1);
        }
};


int main(){
        ThreadManager* tm = new ThreadManager(10) ;

        tm->Start();

        for(int i=0; i<20; i++ ){
                if(i%2==0)
                        tm->AddTask( new TaskA );
                else
                        tm->AddTask( new TaskB );
        }
        sleep(5);
        tm->Stop();

        return 0;
}

one more thing

到这里,基于phtread的线程池的实现已经差不多要结束了,至于像mutex和cond的实现,有兴趣的同学可以参见 完整代码 。但是,这里只是一个玩具级别的实现,如果要用于生产,还需要增加不少功能,例如

  • 所有的类都需要增加禁止拷贝的能力,这个可有通过把拷贝构造和=的重载增加到private中
  • thread_manager的只能用单例来实现
  • 增加日志以简化定位问题过程

其实在c++11中,thread,mutex,cond都已经有其对应的封装库,大家可以直接使用,这样在实现线程池时就能减少好多代码,专注于work_thread的实现了。另外,像用基类来实现任务公共类的做法,在c++11中也提供了更加好用的function/bind,后面有机会我们在拿一篇文章来讲下这个。

编辑于 2019-08-18原文链接: https://www.cnblogs.com/warmlight/p/12426674.html

欢迎关注

微信关注下方公众号,第一时间获取干货硬货;公众号内回复【pdf】免费获取数百本计算机经典书籍

原创文章受到原创版权保护。转载请注明出处:https://www.ccppcoding.com/archives/194012

非原创文章文中已经注明原地址,如有侵权,联系删除

关注公众号【高性能架构探索】,第一时间获取最新文章

转载文章受原作者版权保护。转载请注明原作者出处!

(0)
上一篇 2023年2月12日 下午6:33
下一篇 2023年2月12日 下午6:33

相关推荐