epoll源码解析(3) epoll_wait

示例代码内核版本为2.6.38

epoll源码解析(1) epoll_create
epoll源码解析(2) epoll_ctl
epoll源码解析(3) epoll_wait

引言

这篇文章主要对epoll_wait进行分析,其中可以说藏着很多以前想知道而又没办法知道的东西,正如侯捷老师所言,“源码面前,了无秘密”.在这篇文章里你可以知道ET与LT究竟有什么区别,epoll如何防止惊群问题等等有意思的东西.

/*
 * Implement the event wait interface for the eventpoll file. It is the kernel
 * part of the user space epoll_wait(2).
 */
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
        int, maxevents, int, timeout)
{
    int error;
    struct file *file;
    struct eventpoll *ep; 

    //这个函数中基本是对用户传进来的参数进行一些正确性检验,因为内核对于用户态是不信任的,这也就是干什么都要拷贝的原因吧.

    /* The maximum number of event must be greater than zero */
    if (maxevents <= 0 || maxevents > EP_MAX_EVENTS) //判断最大事件数是否在正确范围内
        return -EINVAL;

    /* Verify that the area passed by the user is writeable */
    //检测传用户传入的指针所指区域是否可写
    if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event))) {
        error = -EFAULT;
        goto error_return;
    }

    /* Get the "struct file *" for the eventpoll file */
    error = -EBADF;
    file = fget(epfd); //获取epoll的file结构体 linux的文件概念真滴是强大
    if (!file)
        goto error_return;

    /*
     * We have to check that the file structure underneath the fd
     * the user passed to us _is_ an eventpoll file.
     */
    error = -EINVAL;
    if (!is_file_epoll(file)) //判断这是不是一个epoll的文件指针
        goto error_fput;

    /*
     * At this point it is safe to assume that the "private_data" contains
     * our own data structure.
     */
    ep = file->private_data; //从file的private_data中获取eventpoll结构

    /* Time to fish for events ... */
    error = ep_poll(ep, events, maxevents, timeout); //epoll_wait的主函数体

error_fput:
    fput(file);
error_return:

    return error;
}
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
           int maxevents, long timeout)
{
    int res, eavail;
    unsigned long flags;
    long jtimeout;
    wait_queue_t wait;

    /*
     * Calculate the timeout by checking for the "infinite" value ( -1 )
     * and the overflow condition. The passed timeout is in milliseconds,
     * that why (t * HZ) / 1000.
     */
     //计算睡眠事件 要转换成毫秒 式子里那个+999)/1000的作用是为了向上取整 HZ在/drivers/md/raid6.h中定义 值为1000
     //所以这里面的向上取整是否真的有必要呢?
    jtimeout = (timeout < 0 || timeout >= EP_MAX_MSTIMEO) ?
        MAX_SCHEDULE_TIMEOUT : (timeout * HZ + 999) / 1000;

retry: //这个标记很重要 下面会说道

    //#define write_lock_irqsave(lock, flags)   flags = _write_lock_irqsave(lock)
    //#define _write_lock_irqsave(lock, flags)  __LOCK_IRQSAVE(lock, flags)
    /*
    #define __LOCK_IRQSAVE(lock, flags) \
    do { local_irq_save(flags); __LOCK(lock); } while (0)
    */
    write_lock_irqsave(&ep->lock, flags); //自旋锁上锁 一系列宏上面已列出

    res = 0;
    if (list_empty(&ep->rdllist)) { //如果就绪链表为空的话就进入if 也就是睡眠了,否则的话直接跳出,
        //相当于我们如果在epoll_ctl(ADD)后,事件已经发生了后在wait,消耗实际上就只是一个用户态到内核态的转换和拷贝而已,
        //不涉及从等待队列中唤醒
        /*
         * We don't have any available event to return to the caller.
         * We need to sleep here, and we will be wake up by
         * ep_poll_callback() when events will become available.
         */
        init_waitqueue_entry(&wait, current); //用当前进程初始化一个等待队列的entry
        add_wait_queue(&ep->wq, &wait);
        //把刚刚初始化的这个等待队列节点加到epoll内部的等待队列中去,也就是说在epoll_wait被唤醒时唤醒本进程

        for (;;) { //开始睡眠
            /*
             * We don't want to sleep if the ep_poll_callback() sends us
             * a wakeup in between. That's why we set the task state
             * to TASK_INTERRUPTIBLE before doing the checks.
             */
             //这里我翻译下源码中的解释,
             //我们不希望ep_poll_callback()发送给我们wakeup消息时我们还在沉睡,这就是为什么我们我们要在检查前设置成TASK_INTERRUPTIBLE
            set_current_state(TASK_INTERRUPTIBLE);
            if (!list_empty(&ep->rdllist) || !jtimeout) //rdllist不为空或者超时
                break;
            //当阻塞于某个慢系统调用的一个进程捕获某个信号且相应信号处理函数返回时,该系统调用可能返回一个EINTR错误
            if (signal_pending(current)) { //收到一个信号的时候也可能被唤醒
                res = -EINTR;
                break;
            }

            write_unlock_irqrestore(&ep->lock, flags); //解锁 开始睡眠
            jtimeout = schedule_timeout(jtimeout);//schedule_timeout的功能源码中的介绍为 sleep until timeout
            write_lock_irqsave(&ep->lock, flags);
        }
        remove_wait_queue(&ep->wq, &wait); //醒来啦!从等待队列中移除

        set_current_state(TASK_RUNNING);
    }
    /* Is it worth to try to dig for events ? */

    //这里要注意 当rdlist被其他进程访问的时候,ep->ovflist会被设置为NULL,那时rdlist会被txlist替换,
    //因为在遍历rdlist的时候有可能ovlist传入数据,然后写入rdllist,所以rdllist也有可能不为空,
    //但为空且其他进程在访问的时候就会将eavail设置为true,为后面goto到retry再次进行睡眠做准备,
    //这样就避免了用户态的唤醒,从而避免了一定程度上的惊群.文末附上我对惊群的测试于结论的链接
    eavail = !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;

    write_unlock_irqrestore(&ep->lock, flags);

    /*
     * Try to transfer events to user space. In case we get 0 events and
     * there's still timeout left over, we go trying again in search of
     * more luck.
     */
    if (!res && eavail &&
        !(res = ep_send_events(ep, events, maxevents)) && jtimeout)
       //res还得为0 epoll才会继续沉睡,有可能ovflist其中有数据 ,后面赋给了rdllist,这是rdllist有数据,也就是说此时epoll_wait醒来还有数据,所以不必继续沉睡
        goto retry;

    return res;
}
//传入eventpoll结构,用户指定的内存和最大事件数 然后执行ep_send_events_proc回调
static int ep_send_events(struct eventpoll *ep,
              struct epoll_event __user *events, int maxevents)
{
    struct ep_send_events_data esed;
    esed.maxevents = maxevents;
    esed.events = events; 
    return ep_scan_ready_list(ep, ep_send_events_proc, &esed);
}
/*ep_scan_ready_list - Scans the ready list in a way that makes possible for
                       the scan code, to call f_op->poll(). Also allows for
                       O(NumReady) performance.
 */
static int ep_scan_ready_list(struct eventpoll *ep,
                  int (*sproc)(struct eventpoll *, //注意上面调用的回调在这个函数中名为sproc
                       struct list_head *, void *),
                  void *priv)
{
    int error, pwake = 0;
    unsigned long flags;
    struct epitem *epi, *nepi;
    LIST_HEAD(txlist); //初始化一个链表 作用是把rellist中的数据换出来

    /*
     * We need to lock this because we could be hit by
     * eventpoll_release_file() and epoll_ctl().
     */
    mutex_lock(&ep->mtx);//操作时加锁 防止ctl中对结构进行修改

    /*
     * Steal the ready list, and re-init the original one to the
     * empty list. Also, set ep->ovflist to NULL so that events
     * happening while looping w/out locks, are not lost. We cannot
     * have the poll callback to queue directly on ep->rdllist,
     * because we want the "sproc" callback to be able to do it
     * in a lockless way.
     */
    spin_lock_irqsave(&ep->lock, flags); //加锁
    list_splice_init(&ep->rdllist, &txlist);
    ep->ovflist = NULL; //这里设置为NULL
    spin_unlock_irqrestore(&ep->lock, flags);

    /*
     * Now call the callback function.
     */
    error = (*sproc)(ep, &txlist, priv); 
    //对整个txlist执行回调,也就是对rdllist执行回调 
    //遍历的时候可能所监控的fd也会执行回调,向把fd加入到rellist中,但那个时候可能这里正在遍历,为了不竞争锁,把数据放到ovflist中

    spin_lock_irqsave(&ep->lock, flags); //加锁,把其中数据放入rdllist
    /*
     * During the time we spent inside the "sproc" callback, some
     * other events might have been queued by the poll callback.
     * We re-insert them inside the main ready-list here.
     */
     //上面提到了 当执行sproc回调的时候可能也会有到来的数据,为了避免那时插入rdllist加锁,
     //把数据放到ovlist中.在执行完后加入rdllist中
    for (nepi = ep->ovflist; (epi = nepi) != NULL;
         nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
        /*
         * We need to check if the item is already in the list.
         * During the "sproc" callback execution time, items are
         * queued into ->ovflist but the "txlist" might already
         * contain them, and the list_splice() below takes care of them.
         */
        if (!ep_is_linked(&epi->rdllink))
            list_add_tail(&epi->rdllink, &ep->rdllist);
    }
    /*
     * We need to set back ep->ovflist to EP_UNACTIVE_PTR, so that after
     * releasing the lock, events will be queued in the normal way inside
     * ep->rdllist.
     */
    ep->ovflist = EP_UNACTIVE_PTR; //设置为初始化时的样子

    /*
     * Quickly re-inject items left on "txlist".
     */
     //有可能有未处理完的数据,再插入rdllist中,比如说LT
    list_splice(&txlist, &ep->rdllist);

    if (!list_empty(&ep->rdllist)) { //rellist不为空的话,进行唤醒
        /*
         * Wake up (if active) both the eventpoll wait list and
         * the ->poll() wait list (delayed after we release the lock).
         */
        if (waitqueue_active(&ep->wq))
            wake_up_locked(&ep->wq);
        if (waitqueue_active(&ep->poll_wait))
            pwake++;
    }
    spin_unlock_irqrestore(&ep->lock, flags);

    mutex_unlock(&ep->mtx);

    /* We have to call this outside the lock */
    if (pwake)
        ep_poll_safewake(&ep->poll_wait);

    return error;
}

我们再来看看具体执行的回调,也就是上面所提到的sproc.

static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
                   void *priv)
{
    struct ep_send_events_data *esed = priv;
    int eventcnt;
    unsigned int revents;
    struct epitem *epi;
    struct epoll_event __user *uevent;

    /*
     * We can loop without lock because we are passed a task private list.
     * Items cannot vanish during the loop because ep_scan_ready_list() is
     * holding "mtx" during this call.
     */
    for (eventcnt = 0, uevent = esed->events;
         !list_empty(head) && eventcnt < esed->maxevents;) { //可以看到,循环次数有我们设定的最大值和rdllist长度共同决定
        epi = list_first_entry(head, struct epitem, rdllink); //取出一个值

        list_del_init(&epi->rdllink); //删除它
        /*
         *读取触发的事件类型,这里我其实有一点疑问,我们在ep_poll_callback中其实已经设置了events,
         *也就是这里的第二项,为什么还要再取一次呢,
         *查阅资料后给出的解释是因为events是会变的,其次不是所有的poll实现, 都通过等待队列传递了events, 
         *有可能某些驱动压根没传,必须我们主动去读取.
         *比如说LT,我们实际插入了rdllist,但是确没有触发新的事件,加入我们已经在上次读取完全部,
         *这次不会触发可读了,如果没有第一步的话就出现了问题
         */
        revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL) &
            epi->event.events;

        /*
         * If the event mask intersect the caller-requested one,
         * deliver the event to userspace. Again, ep_scan_ready_list()
         * is holding "mtx", so no operations coming from userspace
         * can change the item.
         */
        if (revents) { 
            if (__put_user(revents, &uevent->events) || //将事件从内核空间传递至用户空间
                __put_user(epi->event.data, &uevent->data)) {
                list_add(&epi->rdllink, head);
                return eventcnt ? eventcnt : -EFAULT;
            }
            eventcnt++;//事件数加1 用于判断最大事件数
            uevent++; //用户指定的地址 指针向后移一位
            //我们可以看到EPOLLONESHOT事件会在每次触发时移除所有的事件类型,需要我们下次再指定
            if (epi->event.events & EPOLLONESHOT)
                epi->event.events &= EP_PRIVATE_BITS;
                //#define EP_PRIVATE_BITS (EPOLLONESHOT | EPOLLET)
            else if (!(epi->event.events & EPOLLET)) { //LT会把事件再次插入rdllist.
                //ET与LT的差别其实就是一句代码,即是否插入rdllist而已
                //就算ET中此时是有数据的,但是没有被取出,而所对应的fd也没有新事件的话就再也不会触发了,而LT还会继续触发.
                //但是如果上面说的ET的情况下,再次来了新数据,仍然会触发回调再次加入rdllist中
                /*
                 * If this file has been added with Level
                 * Trigger mode, we need to insert back inside
                 * the ready list, so that the next call to
                 * epoll_wait() will check again the events
                 * availability. At this point, noone can insert
                 * into ep->rdllist besides us. The epoll_ctl()
                 * callers are locked out by
                 * ep_scan_ready_list() holding "mtx" and the
                 * poll callback will queue them in ep->ovflist.
                 */
                list_add_tail(&epi->rdllink, &ep->rdllist);
            }
        }
    }

    return eventcnt; //返回触发的事件数
}

惊群问题详解

原文链接: https://www.cnblogs.com/lizhaolong/p/16437325.html

欢迎关注

微信关注下方公众号,第一时间获取干货硬货;公众号内回复【pdf】免费获取数百本计算机经典书籍;

也有高质量的技术群,里面有嵌入式、搜广推等BAT大佬

    epoll源码解析(3) epoll_wait

原创文章受到原创版权保护。转载请注明出处:https://www.ccppcoding.com/archives/395983

非原创文章文中已经注明原地址,如有侵权,联系删除

关注公众号【高性能架构探索】,第一时间获取最新文章

转载文章受原作者版权保护。转载请注明原作者出处!

(0)
上一篇 2023年4月5日 下午1:49
下一篇 2023年4月5日 下午1:49

相关推荐