epoll源码解析(2) epoll_ctl

示例代码内核版本为2.6.38

epoll源码解析(1) epoll_create
epoll源码解析(2) epoll_ctl
epoll源码解析(3) epoll_wait

引言

上一篇文章中分析的epoll的重要数据结构和epoll_create的实现,如果说上篇文章是理解epoll的基础,那这篇文章就是解释为何epoll如此高效的原因,我们来一起看看吧!

在这里插入图片描述

/*
 * epfd 为epoll_create创建的fd
 * op 是我们的操作种类 增删改 ADD DEL MOD
 * fd 当然是我们要加入的fd了
 * event 我们所关心的事件类型 注意只有我们注册的事件才会在epoll_wait被唤醒后传递到用户空间 否则虽然内核可以收到 但不会传递到用户空间
 */
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
        struct epoll_event __user *, event)
{
    int error;
    int did_lock_epmutex = 0;
    struct file *file, *tfile;
    struct eventpoll *ep;
    struct epitem *epi;
    struct epoll_event epds;

    error = -EFAULT;
    /*
    static inline int ep_op_has_event(int op)
    {
        return op != EPOLL_CTL_DEL; // DEL的时候当然不用从用户态拷贝event啦
    }
*/
    if (ep_op_has_event(op) && //我们可以在上面看到ep_op_has_event的函数体 用以判断是否需要从用户态拷贝数据
        copy_from_user(&epds, event, sizeof(struct epoll_event))) //从用户空间把数据拷贝过来
        goto error_return;

    /* Get the "struct file *" for the eventpoll file */
    error = -EBADF;
    file = fget(epfd); //得到epoll的file指针
    if (!file)
        goto error_return;

    /* Get the "struct file *" for the target file */
    tfile = fget(fd); //得到要添加fd的file指针
    if (!tfile)
        goto error_fput;

    /* The target file descriptor must support poll */
    error = -EPERM;
    if (!tfile->f_op || !tfile->f_op->poll) //如果要监听的fd不支持poll的话就没办法了 只能报error了.
        goto error_tgt_fput;

    /*
     * We have to check that the file structure underneath the file descriptor
     * the user passed to us _is_ an eventpoll file. And also we do not permit
     * adding an epoll file descriptor inside itself.
     */
    error = -EINVAL;
    /*
/*
static inline int is_file_epoll(struct file *f)
{
    return f->f_op == &eventpoll_fops; //更快的检测fd是否为一个epoll_fd
}
*/
    if (file == tfile || !is_file_epoll(file)) //当然不允许自己监听自己了,同时得保证epfd是一个epoll的fd
        goto error_tgt_fput;

    /*
     * At this point it is safe to assume that the "private_data" contains
     * our own data structure.
     */
    ep = file->private_data; //上一篇说到private_data中存的是分配好的eventpoll

    /*
     * When we insert an epoll file descriptor, inside another epoll file
     * descriptor, there is the change of creating closed loops, which are
     * better be handled here, than in more critical paths.
     *
     * We hold epmutex across the loop check and the insert in this case, in
     * order to prevent two separate inserts from racing and each doing the
     * insert "at the same time" such that ep_loop_check passes on both
     * before either one does the insert, thereby creating a cycle.
     */
    if (unlikely(is_file_epoll(tfile) && op == EPOLL_CTL_ADD)) { //防止epoll插入到另一个epoll中
        mutex_lock(&epmutex);
        did_lock_epmutex = 1;
        error = -ELOOP;
        if (ep_loop_check(ep, tfile) != 0)
            goto error_tgt_fput;
    }


    mutex_lock(&ep->mtx); //对fd进行操作时上mtx这把锁

    /*
     * Try to lookup the file inside our RB tree, Since we grabbed "mtx"
     * above, we can be sure to be able to use the item looked up by
     * ep_find() till we release the mutex.
     */
    epi = ep_find(ep, tfile, fd); //O(logn)查询要操作的fd是否已经存在
    /*
        static inline void ep_set_ffd(struct epoll_filefd *ffd,
                          struct file *file, int fd) //创建一个进行比较的项
        {
            ffd->file = file;
            ffd->fd = fd;
        }


        static inline int ep_cmp_ffd(struct epoll_filefd *p1,
                         struct epoll_filefd *p2)
        {
            return (p1->file > p2->file ? +1:
                    (p1->file < p2->file ? -1 : p1->fd - p2->fd));
        }
        //从RB-tree中寻找期望插入的fd是否存在
        static struct epitem *ep_find(struct eventpoll *ep, struct file *file, int fd)
        { 
            int kcmp;
            struct rb_node *rbp;
            struct epitem *epi, *epir = NULL;
            struct epoll_filefd ffd;

            ep_set_ffd(&ffd, file, fd);
            for (rbp = ep->rbr.rb_node; rbp; ) {
                epi = rb_entry(rbp, struct epitem, rbn);
                kcmp = ep_cmp_ffd(&ffd, &epi->ffd); //红黑树的排序规则
                if (kcmp > 0)
                    rbp = rbp->rb_right;
                else if (kcmp < 0)
                    rbp = rbp->rb_left;
                else {
                    epir = epi;
                    break;
                }
            }
            return epir; //如果找到返回找到的fd所对应的节点
        }
    */

    error = -EINVAL;
    switch (op) {
    case EPOLL_CTL_ADD:
        if (!epi) {
            epds.events |= POLLERR | POLLHUP; //我们可以看到这两个事件显然我们也没有必要注册
            error = ep_insert(ep, &epds, tfile, fd);
        } else
            error = -EEXIST;
        break;
    case EPOLL_CTL_DEL:
        if (epi)
            error = ep_remove(ep, epi);
        else
            error = -ENOENT;
        break;
    case EPOLL_CTL_MOD:
        if (epi) {
            epds.events |= POLLERR | POLLHUP;
            error = ep_modify(ep, epi, &epds);
        } else
            error = -ENOENT;
        break;
    }
    mutex_unlock(&ep->mtx);

error_tgt_fput:
    if (unlikely(did_lock_epmutex))
        mutex_unlock(&epmutex);

    fput(tfile); //正常关闭文件
error_fput:
    fput(file);
error_return:

    return error;
}
//注意 这个整个函数调用都是上锁的
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
             struct file *tfile, int fd)
{
    int error, revents, pwake = 0;
    unsigned long flags;
    long user_watches;
    struct epitem *epi;
    struct ep_pqueue epq;

    //是否到达最大的监听数
    user_watches = atomic_long_read(&ep->user->epoll_watches);
    if (unlikely(user_watches >= max_user_watches))
        return -ENOSPC;
    if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL))) //最有意思的地方 slab机制,在缓存中分配对象,为了更快的访问
        return -ENOMEM;

    /* Item initialization follow here ... */ //正如内核注释所言 正常的初始化
    INIT_LIST_HEAD(&epi->rdllink);
    INIT_LIST_HEAD(&epi->fllink);
    INIT_LIST_HEAD(&epi->pwqlist);
    epi->ep = ep;
    ep_set_ffd(&epi->ffd, tfile, fd);
    epi->event = *event;
    epi->nwait = 0;
    epi->next = EP_UNACTIVE_PTR;

    /* Initialize the poll table using the queue callback */
    epq.epi = epi;
    //设置poll的回调为ep_ptable_queue_proc,这其中藏着epoll如此高效的原因!
    init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);

    /*
     * Attach the item to the poll hooks and get current event bits.
     * We can safely use the file* here because its usage count has
     * been increased by the caller of this function. Note that after
     * this operation completes, the poll callback can start hitting
     * the new item.
     */
    //进行完这一步 算是把epitem和这个fd连接起来了 会在事件到来的时候执行上面注册的回调
    revents = tfile->f_op->poll(tfile, &epq.pt);

    /*
     * We have to check if something went wrong during the poll wait queue
     * install process. Namely an allocation for a wait queue failed due
     * high memory pressure.
     */
    error = -ENOMEM;
    if (epi->nwait < 0)
        goto error_unregister;

    /* Add the current item to the list of active epoll hook for this file */
    spin_lock(&tfile->f_lock); //tfile是要加入的fd的file指针
    list_add_tail(&epi->fllink, &tfile->f_ep_links); //file会把监听自己的epitem连接起来
    spin_unlock(&tfile->f_lock);

    /*
     * Add the current item to the RB tree. All RB tree operations are
     * protected by "mtx", and ep_insert() is called with "mtx" held.
     */
    ep_rbtree_insert(ep, epi); //向ep的红黑树中插入一个epitem

    /* We have to drop the new item inside our item list to keep track of it */
    spin_lock_irqsave(&ep->lock, flags);

    /* If the file is already "ready" we drop it inside the ready list */
    //这里处理的是监听的fd刚好有事件发生,如果现在不处理,可能后面不会到来数据,也就刽触发回调,它们可能就再也不会被处理了
    if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
        list_add_tail(&epi->rdllink, &ep->rdllist);

        /* Notify waiting tasks that events are available */
        if (waitqueue_active(&ep->wq))
            wake_up_locked(&ep->wq); //唤醒epoll_wait中的等待队列
        if (waitqueue_active(&ep->poll_wait))
            pwake++;
    }

    spin_unlock_irqrestore(&ep->lock, flags);

    atomic_long_inc(&ep->user->epoll_watches);

    /* We have to call this outside the lock */
    if (pwake)
        ep_poll_safewake(&ep->poll_wait);

    return 0;

error_unregister:
    ep_unregister_pollwait(ep, epi);

    /*
     * We need to do this because an event could have been arrived on some
     * allocated wait queue. Note that we don't care about the ep->ovflist
     * list, since that is used/cleaned only inside a section bound by "mtx".
     * And ep_insert() is called with "mtx" held.
     */
    spin_lock_irqsave(&ep->lock, flags);
    if (ep_is_linked(&epi->rdllink))
        list_del_init(&epi->rdllink);
    spin_unlock_irqrestore(&ep->lock, flags);

    kmem_cache_free(epi_cache, epi);

    return error;
}

我们来看看poll被调用时执行的回调ep_ptable_queue_proc

static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
                 poll_table *pt)
{
    struct epitem *epi = ep_item_from_epqueue(pt);
    struct eppoll_entry *pwq;
/*

struct eppoll_entry { //这个结构体就是做一个epitem和其回调之间的关联
    // List header used to link this structure to the "struct epitem" 
    struct list_head llink;

    // The "base" pointer is set to the container "struct epitem" 
    struct epitem *base;

     //Wait queue item that will be linked to the target file wait
     //queue head.
    wait_queue_t wait;

    // The wait queue head that linked the "wait" wait queue item
    wait_queue_head_t *whead;
};

*/
    if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
        //初始化一个等待队列的节点 其中注册的回调为ep_poll_callback
        init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
        pwq->whead = whead;  //这个就是监控的fd的等待队列头
        pwq->base = epi; 
        add_wait_queue(whead, &pwq->wait);//把初始化的等待对列头插入到所监控的fd的等待对列中,稍后调用
        list_add_tail(&pwq->llink, &epi->pwqlist);
        epi->nwait++;//记录了poll执行回调的次数,即数据来了几次
    } else {
        /* We have to signal that an error occurred */
        epi->nwait = -1;
    }
}

显然 全文的重点就是那个加入到所监听fd等待队列中的回调ep_poll_callback,我们来一睹它的真容吧!

static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key) //key是events
{
    int pwake = 0;
    unsigned long flags;
    struct epitem *epi = ep_item_from_wait(wait); //由等待队列中获取对应的epitem
    struct eventpoll *ep = epi->ep; //从而得到epoll的结构

    spin_lock_irqsave(&ep->lock, flags); //这里要操作rdllist 上锁

    /*
     * If the event mask does not contain any poll(2) event, we consider the
     * descriptor to be disabled. This condition is likely the effect of the
     * EPOLLONESHOT bit that disables the descriptor when an event is received,
     * until the next EPOLL_CTL_MOD will be issued.
     */
     //#define EP_PRIVATE_BITS (EPOLLONESHOT | EPOLLET)
    if (!(epi->event.events & ~EP_PRIVATE_BITS)) //我们看到如果所监控的fd没有注册什么事件的话,就不加入rdllist中
        goto out_unlock;

    /*
     * Check the events coming with the callback. At this stage, not
     * every device reports the events in the "key" parameter of the
     * callback. We need to be able to handle both cases here, hence the
     * test for "key" != NULL before the event match test.
     */
    if (key && !((unsigned long) key & epi->event.events)) //所触发的事件我们没有注册的话当然也不加入rdllist中
        goto out_unlock;

    /*
     * If we are trasfering events to userspace, we can hold no locks
     * (because we're accessing user memory, and because of linux f_op->poll()
     * semantics). All the events that happens during that period of time are
     * chained in ep->ovflist and requeued later on.
     */
     /*
      * 这里就比较有意思了
      * 在epoll_wait中EP_UNACTIVE_PTR初始化为EP_UNACTIVE_PTR,而当epoll_wait]中被唤醒后处理rdlist时会将ep->ovflist置为NULL
      * 也就是说如果ep->ovflist != EP_UNACTIVE_PTR意味这epoll_wait已被唤醒 正在执行loop
      * 此时我们就把在rdllist遍历时发生的事件用ovflist串起来,在遍历结束后插入rdllist中
      */
    if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) {
        if (epi->next == EP_UNACTIVE_PTR) {
            epi->next = ep->ovflist;
            ep->ovflist = epi;
        }
        goto out_unlock;
    }

    /* If this file is already in the ready list we exit soon */
    //将当前的epitem加入到rdllist中
    //这就是epoll如此高效的原因,我们不必每次去遍历fd寻找触发的事件,触发事件时会触发回调自动把epitem加入到rdllist中,
    //这使得复杂度从O(N)降到了O(有效事件集合),且我们不必每次注册事件,仅在epoll_ctl(ADD)中注册一次即可(修改除外),
    if (!ep_is_linked(&epi->rdllink)) 
        list_add_tail(&epi->rdllink, &ep->rdllist);

    /*
     * Wake up ( if active ) both the eventpoll wait list and the ->poll()
     * wait list.
     */
     //唤醒epoll_wait
    if (waitqueue_active(&ep->wq))
        wake_up_locked(&ep->wq);
    if (waitqueue_active(&ep->poll_wait))
        pwake++;

out_unlock:
    spin_unlock_irqrestore(&ep->lock, flags);

    /* We have to call this outside the lock */
    if (pwake)
        ep_poll_safewake(&ep->poll_wait);

    return 1;
}

总结

epoll_ctl我们调了ep_insert来讲,这是因为其基本道出了epoll如此高效的原因,首先是epitem的创建使用了slab机制,利用缓存来加速我们的操作,其次是红黑树,这使得我们的增删改操作的复杂度均为O(logn),最重要的一点就是回调的设计,epoll会在插入一个节点的时候创建一个epoll_entry,然后在其中注册ep_ptable_queue_proc,fd会在poll的时候执行这个回调,同时调用ep_poll_callback,这使得被监控的fd被触发时直接就把epitem加入到rdllist中,这样rdllist中就全部都是就绪的fd了! 在有大量不活跃fd时有着显著的性能提升,除此之外我们还不必每次注册事件,这些都是在epitem中保存好的.

可以说epoll的精髓就在于这个神奇的回调.

原文链接: https://www.cnblogs.com/lizhaolong/p/16437326.html

欢迎关注

微信关注下方公众号,第一时间获取干货硬货;公众号内回复【pdf】免费获取数百本计算机经典书籍;

也有高质量的技术群,里面有嵌入式、搜广推等BAT大佬

    epoll源码解析(2) epoll_ctl

原创文章受到原创版权保护。转载请注明出处:https://www.ccppcoding.com/archives/395985

非原创文章文中已经注明原地址,如有侵权,联系删除

关注公众号【高性能架构探索】,第一时间获取最新文章

转载文章受原作者版权保护。转载请注明原作者出处!

(0)
上一篇 2023年4月5日 下午1:49
下一篇 2023年4月5日 下午1:49

相关推荐