libco源码解析(6) co_eventloop

libco源码解析(1) 协程运行与基本结构
libco源码解析(2) 创建协程,co_create
libco源码解析(3) 协程执行,co_resume
libco源码解析(4) 协程切换,coctx_make与coctx_swap
libco源码解析(5) poll
libco源码解析(6) co_eventloop
libco源码解析(7) read,write与条件变量
libco源码解析(8) hook机制探究
libco源码解析(9) closure实现

引言

我们总能在运行libco协程代码的最后看到对于函数co_eventloop的调用,它可以理解为主协程执行的函数。我们举一个简单的例子来说明它的作用:


void* routinefun(void* args){
    co_enable_hook_sys();
    while(true){
        poll(NULL, 0, 1000);
    }
    return 0;
}

int main(int argc,char *argv[])
{
    vector<task_t> v;
    for(int i=1;i<argc;i+=2)
    {
        task_t task = { 0 };
        SetAddr( argv[i],atoi(argv[i+1]),task.addr );
        v.push_back( task );
    }

    for(int i=0;i<2;i++)
    {
        stCoRoutine_t *co = 0;
        co_create( &co,NULL,routinefun,v2 );
        printf("routine i %d\n",i);
        co_resume( co );
    }

    co_eventloop( co_get_epoll_ct(),0,0 );

    return 0;
}

这段代码非常简单,主协程运行两个协程,协程函数所做的事情就是使用poll切换执行权,并在一秒后切换回来(超时)。这里线程的执行过程是这样的,我们把主协程看做A,其他两个协程看做BC。执行过程为:

  1. B协程执行,使用poll把一个stPoll_t结构插入时间轮,切换执行权,回到A协程。
  2. C协程执行,使用poll把一个stPoll_t结构插入时间轮,切换执行权,回到A协程。
  3. 此时A协程执行Eventloop中,不停的循环,直到B协程注册的事件超时,调用回调回到B协程。
  4. B协程继续执行,再次使用poll,重复第一步,回到A协程。
  5. A协程继续执行Eventloop,不停的循环,直到C协程注册的事件超时,调用回调回到C协程。
  6. C协程继续执行,再次使用poll,重复第二步,回到A协程。

这样我们就可以看清楚co_eventloop到底做了什么,其实就是不停的轮询等待其他协程注册的事件成立,仅此而已。

co_eventloop

/*
* libco的核心调度
* 在此处调度三种事件:
* 1. 被hook的io事件,该io事件是通过co_poll_inner注册进来的
* 2. 超时事件
* 3. 用户主动使用poll的事件
* 所以,如果用户用到了三种事件,必须得配合使用co_eventloop
*
* @param ctx epoll管理器
* @param pfn 每轮事件循环的最后会调用该函数
* @param arg pfn的参数
*/

void co_eventloop( stCoEpoll_t *ctx,pfn_co_eventloop_t pfn,void *arg )
{
    if( !ctx->result )  // 给结果集分配空间
    {                                                    // epoll结果集大小
        ctx->result =  co_epoll_res_alloc( stCoEpoll_t::_EPOLL_SIZE );
    }
    co_epoll_res *result = ctx->result;


    for(;;)
    {
        // 最大超时时间设置为 1 ms
        // 所以最长1ms,epoll_wait就会被唤醒
        int ret = co_epoll_wait( ctx->iEpollFd,result,stCoEpoll_t::_EPOLL_SIZE, 1 );

        // 不使用局部变量的原因是epoll循环并不是元素的唯一来源.例如条件变量相关(co_routine.cpp stCoCondItem_t)
        stTimeoutItemLink_t *active = (ctx->pstActiveList);
        stTimeoutItemLink_t *timeout = (ctx->pstTimeoutList);

        memset( timeout,0,sizeof(stTimeoutItemLink_t) );

        // 获取在co_poll_inner放入epoll_event中的stTimeoutItem_t结构体
        for(int i=0;i<ret;i++)
        {
            stTimeoutItem_t *item = (stTimeoutItem_t*)result->events[i].data.ptr;

            if( item->pfnPrepare ) // 如果用户设置预处理回调的话就执行
            {
                // 若是hook后的poll的话,会把此事件加入到active队列中,并更新一些状态
                item->pfnPrepare( item,result->events[i],active );
            }
            else
            {
                AddTail( active,item );
            }
        }


        // 从时间轮上取出超时事件
        unsigned long long now = GetTickMS();

        // 以当前时间为超时截止点
        // 从时间轮中取出超时的时间放入到timeout中
        TakeAllTimeout( ctx->pTimeout,now,timeout );

        stTimeoutItem_t *lp = timeout->head;
        while( lp ) // 遍历超时链表,设置超时标志,并加入active链表
        {
            //printf("raise timeout %p\n",lp);
            lp->bTimeout = true;
            lp = lp->pNext;
        }

        // 把timeout合并到active中
        Join<stTimeoutItem_t,stTimeoutItemLink_t>( active,timeout );

        lp = active->head;
        // 开始遍历active链表
        while( lp )
        {
            // 在链表不为空的时候删除active的第一个元素 如果删除成功,那个元素就是lp
            PopHead<stTimeoutItem_t,stTimeoutItemLink_t>( active );
            if (lp->bTimeout && now < lp->ullExpireTime) 
            { // 一种排错机制,在超时和所等待的时间内已经完成只有一个条件满足才是正确的
                int ret = AddTimeout(ctx->pTimeout, lp, now);
                if (!ret) //插入成功
                {
                    lp->bTimeout = false;
                    lp = active->head;
                    continue;
                }
            }
            // TODO 有问题,如果同一个协程有两个事件在一次epoll循环中触发,
            // 那么第一个事件切回去执行协程,第二个呢,已提交issue
            if( lp->pfnProcess )
            {    // 默认为OnPollProcessEvent 会切换协程
                lp->pfnProcess( lp );
            }

            lp = active->head;
        }
        // 每次事件循环结束以后执行该函数, 用于终止协程
        if( pfn )
        {
            if( -1 == pfn( arg ) )
            {
                break;
            }
        }

    }
}

首先我们可以看到activetimeout链表都在stCoEpoll_t中存储,而这个结构是线程私有的。那么为什么不把这个值设置成局部变量呢?答案不在co_eventloop中,而藏在其他函数,比如libco实现的条件变量中,条件变量会在signal后把值放入到active链表或者timeout链表,而这些只能放在stCoEpoll_t中。

还有这里的timeout链表其实最终会合并到active中,先分开纯粹是为了处理方便一点。

然后就是把事件从epoll结果集中拿出来,去执行预处理回调。我们来看看预处理回调,我们曾在poll中提到过:

void OnPollPreparePfn( stTimeoutItem_t * ap,struct epoll_event &e,stTimeoutItemLink_t *active )
{
    stPollItem_t *lp = (stPollItem_t *)ap;
    // 把epoll此次触发的事件转换成poll中的事件
    lp->pSelf->revents = EpollEvent2Poll( e.events );


    stPoll_t *pPoll = lp->pPoll;
    // 已经触发的事件数加一
    pPoll->iRaiseCnt++;

    // 若此事件还未被触发过
    if( !pPoll->iAllEventDetach )
    {
        // 设置已经被触发的标志
        pPoll->iAllEventDetach = 1;

        // 将该事件从时间轮中移除
        // 因为事件已经触发了,肯定不能再超时了
        RemoveFromLink<stTimeoutItem_t,stTimeoutItemLink_t>( pPoll );

        // 将该事件添加到active列表中
        AddTail( active,pPoll );

    }
}

我们可以看到其实所做的事情就是把epoll事件对应的stPoll_t结构中的值执行一些修改,并把此项插入到active链表中。

然后就是从时间轮中取出根据目前时间来说已经超时的事件,并插入到timeout链表中:

inline void TakeAllTimeout( stTimeout_t *apTimeout,unsigned long long allNow,stTimeoutItemLink_t *apResult )
{
    // 第一次调用是设置初始时间
    if( apTimeout->ullStart == 0 )
    {
        apTimeout->ullStart = allNow;
        apTimeout->llStartIdx = 0;
    }

    // 当前时间小于初始时间显然是有问题的
    if( allNow < apTimeout->ullStart )
    {
        return ;
    }
    // 求一个取出事件的有效区间
    int cnt = allNow - apTimeout->ullStart + 1;
    if( cnt > apTimeout->iItemSize )
    {
        cnt = apTimeout->iItemSize;
    }
    if( cnt < 0 )
    {
        return;
    }
    for( int i = 0;i<cnt;i++)
    {    // 把上面求的有效区间过一遍,某一项存在数据的话插入到超时链表中
        int idx = ( apTimeout->llStartIdx + i) % apTimeout->iItemSize;
        // 链表操作,没什么可说的
        Join<stTimeoutItem_t,stTimeoutItemLink_t>( apResult,apTimeout->pItems + idx  );
    }
    // 更新时间轮属性
    apTimeout->ullStart = allNow;
    apTimeout->llStartIdx += cnt - 1;
}

然后就是把超时链表处理以后加入到active链表啦,不得不说这些封装的链表操作还是非常实用的。

然后就是遍历active链表,一一执行每一个事件的回调啦,当然没执行一次回调就意味着一次协程的切换,因为我们在poll中注册的回调执行co_resume。

这里其实我个人认为是有一些问题的。如果我们在poll中注册了两个fd的事件,这两个时间在一次epoll_wait中被触发,那么第一个被执行了,第二个呢?如果再执行的话就会core dump,因为这个上下文已经被用过了。这里我们应该做一个哈希表,给每一个协程一个特定编号,在遍历active时如果某一个协程已经被使用,我们在后面的遍历过程不再调用回调,这样就可以避免这个问题,已经提交issume。

我们注意到循环的最后调用了pfn,这是一个我们在调用co_eventloop时传入的函数指针,它的作用是什么呢?我的想法是跳出Eventloop循环,因为不是所有的协程使用都想例子一样把 co_eventloop放在函数最后,协程更多的是嵌到代码中,我们需要在有些时候终止eventloop,传入一个终止回调就是一个不错的方法。

原文链接: https://www.cnblogs.com/lizhaolong/p/16437251.html

欢迎关注

微信关注下方公众号,第一时间获取干货硬货;公众号内回复【pdf】免费获取数百本计算机经典书籍;

也有高质量的技术群,里面有嵌入式、搜广推等BAT大佬

    libco源码解析(6) co_eventloop

原创文章受到原创版权保护。转载请注明出处:https://www.ccppcoding.com/archives/396065

非原创文章文中已经注明原地址,如有侵权,联系删除

关注公众号【高性能架构探索】,第一时间获取最新文章

转载文章受原作者版权保护。转载请注明原作者出处!

(0)
上一篇 2023年4月5日 下午1:52
下一篇 2023年4月5日 下午1:53

相关推荐