muduo网络库源码解析(8):EventLoopThreadPool与EventLoopThread

muduo网络库源码解析(1):多线程异步日志库(上)
muduo网络库源码解析(2):多线程异步日志库(中)
muduo网络库源码解析(3):多线程异步日志库(下)
muduo网络库源码解析(4):TimerQueue定时机制
muduo网络库源码解析(5):EventLoop,Channel与事件分发机制
muduo网络库源码解析(6):TcpServer与TcpConnection(上)
muduo网络库源码解析(7):TcpServer与TcpConnection(下)
muduo网络库源码解析(8):EventLoopThreadPool与EventLoopThread
muduo网络库源码解析(9):Connector与TcpClient

引言

在看muduo源码时我始终有这样一个疑问,即为什么要大量使用回调?这样做使得代码的复杂度大规模提升,且在结构上修改的话这些结构是不必要的.这两个类也许能在一定角度回答这个问题.在第六篇时我们简单介绍过EventLoopThreadPool,这一篇就更详细的分析下这两个类所实现的功能.

首先我们来看一下调用EventLoopThreadPool的地方,即TcpServer::start,我们从这里入手

void TcpServer::start()
{
  if (started_.getAndSet(1) == 0)
  {
    threadPool_->start(threadInitCallback_); //启动IO线程池

    assert(!acceptor_->listenning());
    loop_->runInLoop(
        std::bind(&Acceptor::listen, get_pointer(acceptor_)));
  }
}

先看一下EventLoopThreadPool的数据

 private:

  EventLoop* baseLoop_; //主线程loop
  string name_; 
  bool started_; //标记当前状态 即IO线程是否开始运行
  int numThreads_; //IO线程数
  int next_; //负载均衡用
  std::vector<std::unique_ptr<EventLoopThread>> threads_; //线程管理
  std::vector<EventLoop*> loops_; //管理线程返回的IO线程loop

我们直接来看看start

void EventLoopThreadPool::start(const ThreadInitCallback& cb)
{
  assert(!started_);
  baseLoop_->assertInLoopThread(); //一定在主线程

  started_ = true; //设置状态

  for (int i = 0; i < numThreads_; ++i)
  { 
    char buf[name_.size() + 32];
    snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);
    EventLoopThread* t = new EventLoopThread(cb, buf); //创建一个EventLoopThread对象
    threads_.push_back(std::unique_ptr<EventLoopThread>(t)); //管理线程
    loops_.push_back(t->startLoop()); //存储每一个reactor线程的loop指针,用作事件分发
  }
  if (numThreads_ == 0 && cb) //无reactor线程且回调存在
  {
    cb(baseLoop_); //既然没有处理事件的线程当然只能执行回调了
  }
}

我们来看看EventLoopThread对象

class EventLoopThread : noncopyable
{
 public:
  typedef std::function<void(EventLoop*)> ThreadInitCallback;

  EventLoopThread(const ThreadInitCallback& cb = ThreadInitCallback(),
                  const string& name = string());
  ~EventLoopThread();
  EventLoop* startLoop();

 private: 
  void threadFunc();

  EventLoop* loop_ GUARDED_BY(mutex_); //所属IO线程
  bool exiting_;
  Thread thread_; //
  MutexLock mutex_; //辅助条件变量
  Condition cond_ GUARDED_BY(mutex_); //传递loop指针时的同步
  ThreadInitCallback callback_; //线程初始化时的回调
};

我们直接看在EventLoopThreadPool::start中调用的startLoop.

EventLoop* EventLoopThread::startLoop()
{
  assert(!thread_.started());
  thread_.start();

  EventLoop* loop = NULL; 
  {
    MutexLockGuard lock(mutex_);
    while (loop_ == NULL)
    {
      cond_.wait(); //线程中在调用notify是线程的eventloop对象已经被赋予loop_
    } 
    loop = loop_;
  }

  return loop; 
}

void EventLoopThread::threadFunc()
{
  EventLoop loop;//得到IO线程loop

  if (callback_)
  {
    callback_(&loop);
  } 

  {
    MutexLockGuard lock(mutex_);
    loop_ = &loop; //这里是重点 会把本线程的Eventloop对象返回
    cond_.notify(); //进行同步 防止上一步还没赋值完EventLoopThread::startLoop中就进行赋值导致野指针
  }

  loop.loop();
  //assert(exiting_);
  MutexLockGuard lock(mutex_);
  loop_ = NULL;
}

muduo中自定义的Thread会在执行start的时候开始执行,我们可以在构造函数中看到,线程绑定的是ThreadFun,

thread_(std::bind(&EventLoopThread::threadFunc, this), name),

我们看到ThreadFun会在子线程中把线程私有的loop赋给自己的成员loop_,其中用到了条件变量保证同步.C++11中我们可以用std::promise更轻松完成同步.我们可以很清楚的看到threadFunc的最后执行了loop.loop(),开始了reactor的事件循环.
EventLoopThread::startLoop最后也会把子线程的loop返回给调用者.

我们再回到EventLoopThreadPool::start,

void EventLoopThreadPool::start(const ThreadInitCallback& cb)
{
  assert(!started_);
  baseLoop_->assertInLoopThread(); //一定在主线程

  started_ = true; //设置状态

  for (int i = 0; i < numThreads_; ++i)
  { 
    char buf[name_.size() + 32];
    snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);
    EventLoopThread* t = new EventLoopThread(cb, buf); //创建一个EventLoopThread对象
    threads_.push_back(std::unique_ptr<EventLoopThread>(t)); //管理线程
    loops_.push_back(t->startLoop()); //存储每一个reactor线程的loop指针,用作事件分发
  }
  if (numThreads_ == 0 && cb) //无reactor线程且回调存在
  {
    cb(baseLoop_); //既然没有处理事件的线程当然只能执行回调了
  }
}

我们发现for循环执行完以后loops_中就存着所有子线程线程私有的loop指针了,而最后一个判断条件揭示了其实reactor线程可为零,这就把muduo变成了一个单线程代码.所以muduo的结构依赖于numThreads的多少.

我们在第六篇说道TcpServer::newConnection中使用了getNextLoop做负载均衡.我们来再看看这个函数.

EventLoop* EventLoopThreadPool::getNextLoop()
{
  baseLoop_->assertInLoopThread();
  assert(started_);
  EventLoop* loop = baseLoop_; //如果不存在的话返回当前线程

  if (!loops_.empty())
  {
    // round-robin
    loop = loops_[next_]; //next协助负载均衡
    ++next_;
    if (implicit_cast<size_t>(next_) >= loops_.size())
    {
      next_ = 0;
    }
  }
  return loop;
}

我们会发现初始时loop被赋值为baseLoop_,也就是主线程的loop,如果我们设置子线程为零的话程序也可正常运行,这就是我在这篇文章要说的重点所在,getNextLoop的实现意味着muduo是支持单线程的.

其实muduo中还写了一种负载均衡的方式,我猜测是对新来的连接的某个属性,比如IP等做一个hash,进行匹配,文末的文章可供参考.

EventLoop* EventLoopThreadPool::getLoopForHash(size_t hashCode)
{
  baseLoop_->assertInLoopThread();
  EventLoop* loop = baseLoop_;

  if (!loops_.empty())
  {
    loop = loops_[hashCode % loops_.size()];
  }
  return loop;
}

总结

我这两天一直在想一个问题,为什么muduo中要把管理连接的map放在主线程中,这样导致在子线程删除连接对象时必须先触发回调回到主线程从map中删除,然后再触发回到回到子线程从事件循环中删除,而切换线程使用runinloop,其中是加锁的,这显然是一个性能消耗,如果我们把map放在子线程中不就没有这些事情了吗?但是显然我忽略了一个问题,即这样是不支持单线程的.所以至少这里是要使用回调的.其实保存一个指针,用时调用可以吗?不行,会出现race condition,所以需要runinloop传递消息,这样有回到了Eventloop,它的触发机制就是使用channel中的回调,直接传一个function不行吗,会触发多次事件,不及一个channel中注册回调来的划算.这样就算可写可读时间都注册其实也只触发一次,所以,muduo中大量使用回调的原因我认为其实就是因为它是一个网络库,它的性质决定了它必须使用回调,如果写的是服务器的话当然就没有必要了,主线程与子线程之间的通信只需要fd即可,那如果服务器也要支持单线程和多线程的扩展的话如何做,此时Map是放在主线程的,我们此时就没必要回调,子线程删除的时候直接删除即可,当然子线程使用的回调还有一个好处,就是对于子线程来说是异步删除,不耽误业务处理时间.你也许会说异步的效率高为什么不用异步的呢,异步在这里意味着我们加入时必须上锁,这与子线程直接删除当然就没必要加锁,因为是没有race condition的,两者孰优孰略得需要测试,但我觉得的也不会有太大差别的.这也许也是一点原因.

参考:
分布式缓存中三种负载均衡的方法

原文链接: https://www.cnblogs.com/lizhaolong/p/16437331.html

欢迎关注

微信关注下方公众号,第一时间获取干货硬货;公众号内回复【pdf】免费获取数百本计算机经典书籍;

也有高质量的技术群,里面有嵌入式、搜广推等BAT大佬

    muduo网络库源码解析(8):EventLoopThreadPool与EventLoopThread

原创文章受到原创版权保护。转载请注明出处:https://www.ccppcoding.com/archives/395995

非原创文章文中已经注明原地址,如有侵权,联系删除

关注公众号【高性能架构探索】,第一时间获取最新文章

转载文章受原作者版权保护。转载请注明原作者出处!

(0)
上一篇 2023年4月5日 下午1:50
下一篇 2023年4月5日 下午1:50

相关推荐