設(shè)置
  • 日夜間
    隨系統(tǒng)
    淺色
    深色
  • 主題色

圖解 | 深入揭秘 epoll 是如何實(shí)現(xiàn) IO 多路復(fù)用的

開發(fā)內(nèi)功修煉 2022/10/3 18:33:30 責(zé)編:遠(yuǎn)生

本文來(lái)自微信公眾號(hào):開發(fā)內(nèi)功修煉 (ID:kfngxl),作者:張彥飛 allen

進(jìn)程在 Linux 上是一個(gè)開銷不小的家伙,先不說(shuō)創(chuàng)建,光是上下文切換一次就得幾個(gè)微秒。所以為了高效地對(duì)海量用戶提供服務(wù),必須要讓一個(gè)進(jìn)程能同時(shí)處理很多個(gè) tcp 連接才行。現(xiàn)在假設(shè)一個(gè)進(jìn)程保持了 10000 條連接,那么如何發(fā)現(xiàn)哪條連接上有數(shù)據(jù)可讀了、哪條連接可寫了 ?

我們當(dāng)然可以采用循環(huán)遍歷的方式來(lái)發(fā)現(xiàn) IO 事件,但這種方式太低級(jí)了。我們希望有一種更高效的機(jī)制,在很多連接中的某條上有 IO 事件發(fā)生的時(shí)候直接快速把它找出來(lái)。其實(shí)這個(gè)事情 Linux 操作系統(tǒng)已經(jīng)替我們都做好了,它就是我們所熟知的 IO 多路復(fù)用機(jī)制。這里的復(fù)用指的就是對(duì)進(jìn)程的復(fù)用。

在 Linux 上多路復(fù)用方案有 select、poll、epoll。它們?nèi)齻€(gè)中 epoll 的性能表現(xiàn)是最優(yōu)秀的,能支持的并發(fā)量也最大。所以我們今天把 epoll 作為要拆解的對(duì)象,深入揭秘內(nèi)核是如何實(shí)現(xiàn)多路的 IO 管理的。

為了方便討論,我們舉一個(gè)使用了 epoll 的簡(jiǎn)單示例(只是個(gè)例子,實(shí)踐中不這么寫):

int main(){
    listen(lfd, );

    cfd1 = accept();
    cfd2 = accept();
    efd = epoll_create();

    epoll_ctl(efd, EPOLL_CTL_ADD, cfd1, );
    epoll_ctl(efd, EPOLL_CTL_ADD, cfd2, );
    epoll_wait(efd, )
}

其中和 epoll 相關(guān)的函數(shù)是如下三個(gè):

epoll_create:創(chuàng)建一個(gè) epoll 對(duì)象

epoll_ctl:向 epoll 對(duì)象中添加要管理的連接

epoll_wait:等待其管理的連接上的 IO 事件

借助這個(gè) demo,我們來(lái)展開對(duì) epoll 原理的深度拆解。相信等你理解了這篇文章以后,你對(duì) epoll 的駕馭能力將變得爐火純青??!

友情提示,萬(wàn)字長(zhǎng)文,慎入?。?/strong>

一、accept 創(chuàng)建新 socket

我們直接從服務(wù)器端的 accept 講起。當(dāng) accept 之后,進(jìn)程會(huì)創(chuàng)建一個(gè)新的 socket 出來(lái),專門用于和對(duì)應(yīng)的客戶端通信,然后把它放到當(dāng)前進(jìn)程的打開文件列表中。

其中一條連接的 socket 內(nèi)核對(duì)象更為具體一點(diǎn)的結(jié)構(gòu)圖如下。

接下來(lái)我們來(lái)看一下接收連接時(shí) socket 內(nèi)核對(duì)象的創(chuàng)建源碼。accept 的系統(tǒng)調(diào)用代碼位于源文件 net / socket.c 下。

//file: net/socket.c
SYSCALL_DEFINE4(accept4, int, fd, struct sockaddr __user *, upeer_sockaddr,
        int __user *, upeer_addrlen, int, flags)
{
    struct socket *sock, *newsock;

    //根據(jù) fd 查找到監(jiān)聽的 socket
    sock = sockfd_lookup_light(fd, &err, &fput_needed);

    //1.1 申請(qǐng)并初始化新的 socket
    newsock = sock_alloc();
    newsock->type = sock->type;
    newsock->ops = sock->ops;

    //1.2 申請(qǐng)新的 file 對(duì)象,并設(shè)置到新 socket 上
    newfile = sock_alloc_file(newsock, flags, sock->sk->sk_prot_creator->name);
    ......

    //1.3 接收連接
    err = sock->ops->accept(sock, newsock, sock->file->f_flags);

    //1.4 添加新文件到當(dāng)前進(jìn)程的打開文件列表
    fd_install(newfd, newfile);

1.1 初始化 struct socket 對(duì)象

在上述的源碼中,首先是調(diào)用 sock_alloc 申請(qǐng)一個(gè) struct socket 對(duì)象出來(lái)。然后接著把 listen 狀態(tài)的 socket 對(duì)象上的協(xié)議操作函數(shù)集合 ops 賦值給新的 socket。(對(duì)于所有的 AF_INET 協(xié)議族下的 socket 來(lái)說(shuō),它們的 ops 方法都是一樣的,所以這里可以直接復(fù)制過來(lái))

其中 inet_stream_ops 的定義如下

//file: net/ipv4/af_inet.c
const struct proto_ops inet_stream_ops = {
    ...
    .accept        = inet_accept,
    .listen        = inet_listen,
    .sendmsg       = inet_sendmsg,
    .recvmsg       = inet_recvmsg,
    ...
}

1.2 為新 socket 對(duì)象申請(qǐng) file

struct socket 對(duì)象中有一個(gè)重要的成員 -- file 內(nèi)核對(duì)象指針。這個(gè)指針初始化的時(shí)候是空的。在 accept 方法里會(huì)調(diào)用 sock_alloc_file 來(lái)申請(qǐng)內(nèi)存并初始化。然后將新 file 對(duì)象設(shè)置到 sock->file 上。

來(lái)看 sock_alloc_file 的實(shí)現(xiàn)過程:

struct file *sock_alloc_file(struct socket *sock, int flags, 
    const char *dname)
{
    struct file *file;
    file = alloc_file(&path, FMODE_READ | FMODE_WRITE,
            &socket_file_ops);
    ......
    sock-file = file;
}

sock_alloc_file 又會(huì)接著調(diào)用到 alloc_file。注意在 alloc_file 方法中,把 socket_file_ops 函數(shù)集合一并賦到了新 file->f_op 里了。

//file: fs/file_table.c
struct file *alloc_file(struct path *path, fmode_t mode,
        const struct file_operations *fop)
{
    struct file *file;
    file-f_op = fop;
    ......
}

socket_file_ops 的具體定義如下:

//file: net/socket.c
static const struct file_operations socket_file_ops = {
    ...
    .aio_read   = sock_aio_read,
    .aio_write  = sock_aio_write,
    .poll     = sock_poll,
    .release  = sock_close,
    ...
};

這里看到,在 accept 里創(chuàng)建的新 socket 里的 file->f_op->poll 函數(shù)指向的是 sock_poll。接下來(lái)我們會(huì)調(diào)用到它,后面我們?cè)僬f(shuō)。

其實(shí) file 對(duì)象內(nèi)部也有一個(gè) socket 指針,指向 socket 對(duì)象。

1.3 接收連接

在 socket 內(nèi)核對(duì)象中除了 file 對(duì)象指針以外,有一個(gè)核心成員 sock。

//file: include/linux/net.h
struct socket {
    struct file     *file;
    struct sock     *sk;
}

這個(gè) struct sock 數(shù)據(jù)結(jié)構(gòu)非常大,是 socket 的核心內(nèi)核對(duì)象。發(fā)送隊(duì)列、接收隊(duì)列、等待隊(duì)列等核心數(shù)據(jù)結(jié)構(gòu)都位于此。其定義位置文件 include / net / sock.h,由于太長(zhǎng)就不展示了。

在 accept 的源碼中:

//file: net/socket.c
SYSCALL_DEFINE4(accept4, )
    
    //1.3 接收連接
    err = sock->ops->accept(sock, newsock, sock->file->f_flags);
}

sock->ops->accept 對(duì)應(yīng)的方法是 inet_accept。它執(zhí)行的時(shí)候會(huì)從握手隊(duì)列里直接獲取創(chuàng)建好的 sock。sock 對(duì)象的完整創(chuàng)建過程涉及到三次握手,比較復(fù)雜,不展開了說(shuō)了。咱們只看 struct sock 初始化過程中用到的一個(gè)函數(shù):

void sock_init_data(struct socket *sock, struct sock *sk)
{
    sk-sk_wq   =   NULL;
    sk-sk_data_ready   =   sock_def_readable;
}

在這里把 sock 對(duì)象的 sk_data_ready 函數(shù)指針設(shè)置為 sock_def_readable。這個(gè)這里先記住就行了,后面會(huì)用到。

1.4 添加新文件到當(dāng)前進(jìn)程的打開文件列表中

當(dāng) file、socket、sock 等關(guān)鍵內(nèi)核對(duì)象創(chuàng)建完畢以后,剩下要做的一件事情就是把它掛到當(dāng)前進(jìn)程的打開文件列表中就行了。

//file: fs/file.c
void fd_install(unsigned int fd, struct file *file)
{
    __fd_install(current->files, fd, file);
}

void __fd_install(struct files_struct *files, unsigned int fd,
        struct file *file)
{
    ...
    fdt = files_fdtable(files);
    BUG_ON(fdt->fd[fd] != NULL);
    rcu_assign_pointer(fdt->fd[fd], file);
}

二、epoll_create 實(shí)現(xiàn)

在用戶進(jìn)程調(diào)用 epoll_create 時(shí),內(nèi)核會(huì)創(chuàng)建一個(gè) struct eventpoll 的內(nèi)核對(duì)象。并同樣把它關(guān)聯(lián)到當(dāng)前進(jìn)程的已打開文件列表中。

對(duì)于 struct eventpoll 對(duì)象,更詳細(xì)的結(jié)構(gòu)如下(同樣只列出和今天主題相關(guān)的成員)。

epoll_create 的源代碼相對(duì)比較簡(jiǎn)單。在 fs / eventpoll.c 下

// file:fs/eventpoll.c
SYSCALL_DEFINE1(epoll_create1, int, flags)
{
    struct eventpoll *ep = NULL;

    //創(chuàng)建一個(gè) eventpoll 對(duì)象
    error = ep_alloc(&ep);
}

struct eventpoll 的定義也在這個(gè)源文件中。

// file:fs/eventpoll.c
struct eventpoll {

    //sys_epoll_wait用到的等待隊(duì)列
    wait_queue_head_t wq;

    //接收就緒的描述符都會(huì)放到這里
    struct list_head rdllist;

    //每個(gè)epoll對(duì)象中都有一顆紅黑樹
    struct rb_root rbr;

    ......
}

eventpoll 這個(gè)結(jié)構(gòu)體中的幾個(gè)成員的含義如下:

wq: 等待隊(duì)列鏈表。軟中斷數(shù)據(jù)就緒的時(shí)候會(huì)通過 wq 來(lái)找到阻塞在 epoll 對(duì)象上的用戶進(jìn)程。

rbr: 一棵紅黑樹。為了支持對(duì)海量連接的高效查找、插入和刪除,eventpoll 內(nèi)部使用了一棵紅黑樹。通過這棵樹來(lái)管理用戶進(jìn)程下添加進(jìn)來(lái)的所有 socket 連接。

rdllist: 就緒的描述符的鏈表。當(dāng)有的連接就緒的時(shí)候,內(nèi)核會(huì)把就緒的連接放到 rdllist 鏈表里。這樣應(yīng)用進(jìn)程只需要判斷鏈表就能找出就緒進(jìn)程,而不用去遍歷整棵樹。

當(dāng)然這個(gè)結(jié)構(gòu)被申請(qǐng)完之后,需要做一點(diǎn)點(diǎn)的初始化工作,這都在 ep_alloc 中完成。

//file: fs/eventpoll.c
static int ep_alloc(struct eventpoll **pep)
{
    struct eventpoll *ep;

    //申請(qǐng) epollevent 內(nèi)存
    ep = kzalloc(sizeof(*ep), GFP_KERNEL);

    //初始化等待隊(duì)列頭
    init_waitqueue_head(&ep->wq);

    //初始化就緒列表
    INIT_LIST_HEAD(&ep->rdllist);

    //初始化紅黑樹指針
    ep->rbr = RB_ROOT;

    ......
}

說(shuō)到這兒,這些成員其實(shí)只是剛被定義或初始化了,還都沒有被使用。它們會(huì)在下面被用到。

三、epoll_ctl 添加 socket

理解這一步是理解整個(gè) epoll 的關(guān)鍵

為了簡(jiǎn)單,我們只考慮使用 EPOLL_CTL_ADD 添加 socket,先忽略刪除和更新。

假設(shè)我們現(xiàn)在和客戶端們的多個(gè)連接的 socket 都創(chuàng)建好了,也創(chuàng)建好了 epoll 內(nèi)核對(duì)象。在使用 epoll_ctl 注冊(cè)每一個(gè) socket 的時(shí)候,內(nèi)核會(huì)做如下三件事情

1.分配一個(gè)紅黑樹節(jié)點(diǎn)對(duì)象 epitem,

2.添加等待事件到 socket 的等待隊(duì)列中,其回調(diào)函數(shù)是 ep_poll_callback

3.將 epitem 插入到 epoll 對(duì)象的紅黑樹里

通過 epoll_ctl 添加兩個(gè) socket 以后,這些內(nèi)核數(shù)據(jù)結(jié)構(gòu)最終在進(jìn)程中的關(guān)系圖大致如下:

我們來(lái)詳細(xì)看看 socket 是如何添加到 epoll 對(duì)象里的,找到 epoll_ctl 的源碼。

// file:fs/eventpoll.c
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
        struct epoll_event __user *, event)
{
    struct eventpoll *ep;
    struct file *file, *tfile;

    //根據(jù) epfd 找到 eventpoll 內(nèi)核對(duì)象
    file = fget(epfd);
    ep = file->private_data;

    //根據(jù) socket 句柄號(hào), 找到其 file 內(nèi)核對(duì)象
    tfile = fget(fd);

    switch (op) {
    case EPOLL_CTL_ADD:
        if (!epi) {
            epds.events |= POLLERR | POLLHUP;
            error = ep_insert(ep, &epds, tfile, fd);
        } else
            error = -EEXIST;
        clear_tfile_check_list();
        break;
}

在 epoll_ctl 中首先根據(jù)傳入 fd 找到 eventpoll、socket 相關(guān)的內(nèi)核對(duì)象 。對(duì)于 EPOLL_CTL_ADD 操作來(lái)說(shuō),會(huì)然后執(zhí)行到 ep_insert 函數(shù)。所有的注冊(cè)都是在這個(gè)函數(shù)中完成的。

//file: fs/eventpoll.c
static int ep_insert(struct eventpoll *ep, 
                struct epoll_event *event,
                struct file *tfile, int fd)
{
    //3.1 分配并初始化 epitem
    //分配一個(gè)epi對(duì)象
    struct epitem *epi;
    if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
        return -ENOMEM;

    //對(duì)分配的epi進(jìn)行初始化
    //epi->ffd中存了句柄號(hào)和struct file對(duì)象地址
    INIT_LIST_HEAD(&epi->pwqlist);
    epi->ep = ep;
    ep_set_ffd(&epi->ffd, tfile, fd);

    //3.2 設(shè)置 socket 等待隊(duì)列
    //定義并初始化 ep_pqueue 對(duì)象
    struct ep_pqueue epq;
    epq.epi = epi;
    init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);

    //調(diào)用 ep_ptable_queue_proc 注冊(cè)回調(diào)函數(shù) 
    //實(shí)際注入的函數(shù)為 ep_poll_callback
    revents = ep_item_poll(epi, &epq.pt);

    ......
    //3.3 將epi插入到 eventpoll 對(duì)象中的紅黑樹中
    ep_rbtree_insert(ep, epi);
    ......
}

3.1 分配并初始化 epitem

對(duì)于每一個(gè) socket,調(diào)用 epoll_ctl 的時(shí)候,都會(huì)為之分配一個(gè) epitem。該結(jié)構(gòu)的主要數(shù)據(jù)如下:

//file: fs/eventpoll.c
struct epitem {

    //紅黑樹節(jié)點(diǎn)
    struct rb_node rbn;

    //socket文件描述符信息
    struct epoll_filefd ffd;

    //所歸屬的 eventpoll 對(duì)象
    struct eventpoll *ep;

    //等待隊(duì)列
    struct list_head pwqlist;
}

對(duì) epitem 進(jìn)行了一些初始化,首先在 epi->ep = ep 這行代碼中將其 ep 指針指向 eventpoll 對(duì)象。另外用要添加的 socket 的 file、fd 來(lái)填充 epitem->ffd。

其中使用到的 ep_set_ffd 函數(shù)如下。

static inline void ep_set_ffd(struct epoll_filefd *ffd,
                        struct file *file, int fd)
{
    ffd-file = file;
    ffd-fd = fd;
}

3.2 設(shè)置 socket 等待隊(duì)列

在創(chuàng)建 epitem 并初始化之后,ep_insert 中第二件事情就是設(shè)置 socket 對(duì)象上的等待任務(wù)隊(duì)列。并把函數(shù) fs / eventpoll.c 文件下的 ep_poll_callback 設(shè)置為數(shù)據(jù)就緒時(shí)候的回調(diào)函數(shù)。

這一塊的源代碼稍微有點(diǎn)繞,沒有耐心的話直接跳到下面的加粗字體來(lái)看。首先來(lái)看 ep_item_poll。

static inline unsigned int ep_item_poll(struct epitem *epi, poll_table *pt)
{
    pt-_key = epi-event.events;

    return epi-ffd.file-f_op-poll(epi-ffd.file, pt) & epi-event.events;
}

看,這里調(diào)用到了 socket 下的 file->f_op->poll。通過上面第一節(jié)的 socket 的結(jié)構(gòu)圖,我們知道這個(gè)函數(shù)實(shí)際上是 sock_poll。

/* No kernel lock held - perfect */
static unsigned int sock_poll(struct file *file, poll_table *wait)
{
    ...
    return sock-ops-poll(file, sock, wait);
}

同樣回看第一節(jié)里的 socket 的結(jié)構(gòu)圖,sock->ops->poll 其實(shí)指向的是 tcp_poll。

//file: net/ipv4/tcp.c
unsigned int tcp_poll(struct file *file, struct socket *sock, poll_table *wait)
{
    struct sock *sk = sock-sk;

    sock_poll_wait(file, sk_sleep(sk), wait);
}

在 sock_poll_wait 的第二個(gè)參數(shù)傳參前,先調(diào)用了 sk_sleep 函數(shù)。在這個(gè)函數(shù)里它獲取了 sock 對(duì)象下的等待隊(duì)列列表頭 wait_queue_head_t,待會(huì)等待隊(duì)列項(xiàng)就插入這里。這里稍微注意下,是 socket 的等待隊(duì)列,不是 epoll 對(duì)象的。來(lái)看 sk_sleep 源碼:

//file: include/net/sock.h
static inline wait_queue_head_t *sk_sleep(struct sock *sk)
{
    BUILD_BUG_ON(offsetof(struct socket_wq, wait) != 0);
    return &rcu_dereference_raw(sk-sk_wq)-wait;
}

接著真正進(jìn)入 sock_poll_wait。

static inline void sock_poll_wait(struct file *filp,
        wait_queue_head_t *wait_address, poll_table *p)
{
    poll_wait(filp, wait_address, p);
}
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
    if (p && p-_qproc && wait_address)
        p-_qproc(filp, wait_address, p);
}

這里的 qproc 是個(gè)函數(shù)指針,它在前面的 init_poll_funcptr 調(diào)用時(shí)被設(shè)置成了 ep_ptable_queue_proc 函數(shù)。

static int ep_insert(...)
{
    ...
    init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
    ...
}
//file: include/linux/poll.h
static inline void init_poll_funcptr(poll_table *pt, 
    poll_queue_proc qproc)
{
    pt->_qproc = qproc;
    pt->_key   = ~0UL; /* all events enabled */
}

敲黑板?。?!注意,廢了半天的勁,終于到了重點(diǎn)了!在 ep_ptable_queue_proc 函數(shù)中,新建了一個(gè)等待隊(duì)列項(xiàng),并注冊(cè)其回調(diào)函數(shù)為 ep_poll_callback 函數(shù)。然后再將這個(gè)等待項(xiàng)添加到 socket 的等待隊(duì)列中。

//file: fs/eventpoll.c
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
                 poll_table *pt)
{
    struct eppoll_entry *pwq;
    f (epi-nwait = 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
                //初始化回調(diào)方法
                init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);

                //將ep_poll_callback放入socket的等待隊(duì)列whead(注意不是epoll的等待隊(duì)列)
                add_wait_queue(whead, &pwq->wait);

        }

在前文 深入理解高性能網(wǎng)絡(luò)開發(fā)路上的絆腳石 - 同步阻塞網(wǎng)絡(luò) IO 里阻塞式的系統(tǒng)調(diào)用 recvfrom 里,由于需要在數(shù)據(jù)就緒的時(shí)候喚醒用戶進(jìn)程,所以等待對(duì)象項(xiàng)的 private (這個(gè)變量名起的也是醉了) 會(huì)設(shè)置成當(dāng)前用戶進(jìn)程描述符 current。而我們今天的 socket 是交給 epoll 來(lái)管理的,不需要在一個(gè) socket 就緒的時(shí)候就喚醒進(jìn)程,所以這里的 q->private 沒有啥卵用就設(shè)置成了 NULL。

//file:include/linux/wait.h
static inline void init_waitqueue_func_entry(
    wait_queue_t *q, wait_queue_func_t func)
{
    q-flags = 0;
    q-private = NULL;

    //ep_poll_callback 注冊(cè)到 wait_queue_t對(duì)象上
    //有數(shù)據(jù)到達(dá)的時(shí)候調(diào)用 q-func
    q-func = func;   
}

如上,等待隊(duì)列項(xiàng)中僅僅只設(shè)置了回調(diào)函數(shù) q->func 為 ep_poll_callback。在后面的第 5 節(jié)數(shù)據(jù)來(lái)啦中我們將看到,軟中斷將數(shù)據(jù)收到 socket 的接收隊(duì)列后,會(huì)通過注冊(cè)的這個(gè) ep_poll_callback 函數(shù)來(lái)回調(diào),進(jìn)而通知到 epoll 對(duì)象。

3.3 插入紅黑樹

分配完 epitem 對(duì)象后,緊接著并把它插入到紅黑樹中。一個(gè)插入了一些 socket 描述符的 epoll 里的紅黑樹的示意圖如下:

這里我們?cè)倭牧臑樯兑眉t黑樹,很多人說(shuō)是因?yàn)樾矢摺F鋵?shí)我覺得這個(gè)解釋不夠全面,要說(shuō)查找效率樹哪能比的上 HASHTABLE。我個(gè)人認(rèn)為覺得更為合理的一個(gè)解釋是為了讓 epoll 在查找效率、插入效率、內(nèi)存開銷等等多個(gè)方面比較均衡,最后發(fā)現(xiàn)最適合這個(gè)需求的數(shù)據(jù)結(jié)構(gòu)是紅黑樹。

四、epoll_wait 等待接收

epoll_wait 做的事情不復(fù)雜,當(dāng)它被調(diào)用時(shí)它觀察 eventpoll->rdllist 鏈表里有沒有數(shù)據(jù)即可。有數(shù)據(jù)就返回,沒有數(shù)據(jù)就創(chuàng)建一個(gè)等待隊(duì)列項(xiàng),將其添加到 eventpoll 的等待隊(duì)列上,然后把自己阻塞掉就完事。

注意:epoll_ctl 添加 socket 時(shí)也創(chuàng)建了等待隊(duì)列項(xiàng)。不同的是這里的等待隊(duì)列項(xiàng)是掛在 epoll 對(duì)象上的,而前者是掛在 socket 對(duì)象上的。

其源代碼如下:

//file: fs/eventpoll.c
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
        int, maxevents, int, timeout)
{
    ...
    error = ep_poll(ep, events, maxevents, timeout);
}

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
             int maxevents, long timeout)
{
    wait_queue_t wait;
    ......

fetch_events:
    //4.1 判斷就緒隊(duì)列上有沒有事件就緒
    if (!ep_events_available(ep)) {

        //4.2 定義等待事件并關(guān)聯(lián)當(dāng)前進(jìn)程
        init_waitqueue_entry(&wait, current);

        //4.3 把新 waitqueue 添加到 epoll->wq 鏈表里
        __add_wait_queue_exclusive(&ep->wq, &wait);
    
        for (;;) {
            ...
            //4.4 讓出CPU 主動(dòng)進(jìn)入睡眠狀態(tài)
            if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
                timed_out = 1;
            ... 
}

4.1 判斷就緒隊(duì)列上有沒有事件就緒

首先調(diào)用 ep_events_available 來(lái)判斷就緒鏈表中是否有可處理的事件。

//file: fs/eventpoll.c
static inline int ep_events_available(struct eventpoll *ep)
{
    return !list_empty(&ep-rdllist)  ep-ovflist != EP_UNACTIVE_PTR;
}

4.2 定義等待事件并關(guān)聯(lián)當(dāng)前進(jìn)程

假設(shè)確實(shí)沒有就緒的連接,那接著會(huì)進(jìn)入 init_waitqueue_entry 中定義等待任務(wù),并把 current (當(dāng)前進(jìn)程)添加到 waitqueue 上。

是的,當(dāng)沒有 IO 事件的時(shí)候,epoll 也是會(huì)阻塞掉當(dāng)前進(jìn)程。這個(gè)是合理的,因?yàn)闆]有事情可做了占著 CPU 也沒啥意義。網(wǎng)上的很多文章有個(gè)很不好的習(xí)慣,討論阻塞、非阻塞等概念的時(shí)候都不說(shuō)主語(yǔ)。這會(huì)導(dǎo)致你看的云里霧里。拿 epoll 來(lái)說(shuō),epoll 本身是阻塞的,但一般會(huì)把 socket 設(shè)置成非阻塞。只有說(shuō)了主語(yǔ),這些概念才有意義。

//file: include/linux/wait.h
static inline void init_waitqueue_entry(wait_queue_t *q, struct task_struct *p)
{
    q->flags = 0;
    q->private = p;
    q->func = default_wake_function;
}

注意這里的回調(diào)函數(shù)名稱是 default_wake_function。后續(xù)在第 5 節(jié)數(shù)據(jù)來(lái)啦時(shí)將會(huì)調(diào)用到該函數(shù)。

4.3 添加到等待隊(duì)列

static inline void __add_wait_queue_exclusive(wait_queue_head_t *q,
                                wait_queue_t *wait)
{
    wait-flags |= WQ_FLAG_EXCLUSIVE;
    __add_wait_queue(q, wait);
}

在這里,把上一小節(jié)定義的等待事件添加到了 epoll 對(duì)象的等待隊(duì)列中。

4.4 讓出 CPU 主動(dòng)進(jìn)入睡眠狀態(tài)

通過 set_current_state 把當(dāng)前進(jìn)程設(shè)置為可打斷。調(diào)用 schedule_hrtimeout_range 讓出 CPU,主動(dòng)進(jìn)入睡眠狀態(tài)

//file: kernel/hrtimer.c
int __sched schedule_hrtimeout_range(ktime_t *expires, 
    unsigned long delta, const enum hrtimer_mode mode)
{
    return schedule_hrtimeout_range_clock(
            expires, delta, mode, CLOCK_MONOTONIC);
}

int __sched schedule_hrtimeout_range_clock()
{
    schedule();
    
}

在 schedule 中選擇下一個(gè)進(jìn)程調(diào)度

//file: kernel/sched/core.c
static void __sched __schedule(void)
{
    next = pick_next_task(rq);
    ...
    context_switch(rq, prev, next);
}

五、數(shù)據(jù)來(lái)啦

在前面 epoll_ctl 執(zhí)行的時(shí)候,內(nèi)核為每一個(gè) socket 上都添加了一個(gè)等待隊(duì)列項(xiàng)。在 epoll_wait 運(yùn)行完的時(shí)候,又在 event poll 對(duì)象上添加了等待隊(duì)列元素。在討論數(shù)據(jù)開始接收之前,我們把這些隊(duì)列項(xiàng)的內(nèi)容再稍微總結(jié)一下。

socket->sock->sk_data_ready 設(shè)置的就緒處理函數(shù)是 sock_def_readable

在 socket 的等待隊(duì)列項(xiàng)中,其回調(diào)函數(shù)是 ep_poll_callback。另外其 private 沒有用了,指向的是空指針 null。

在 eventpoll 的等待隊(duì)列項(xiàng)中,回調(diào)函數(shù)是 default_wake_function。其 private 指向的是等待該事件的用戶進(jìn)程。

在這一小節(jié)里,我們將看到軟中斷是怎么樣在數(shù)據(jù)處理完之后依次進(jìn)入各個(gè)回調(diào)函數(shù),最后通知到用戶進(jìn)程的。

5.1 接收數(shù)據(jù)到任務(wù)隊(duì)列

關(guān)于軟中斷是怎么處理網(wǎng)絡(luò)幀,為了避免篇幅過于臃腫,這里不再介紹。感興趣的可以看文章 《圖解 Linux 網(wǎng)絡(luò)包接收過程》。我們今天直接從 tcp 協(xié)議棧的處理入口函數(shù) tcp_v4_rcv 開始說(shuō)起。

// file: net/ipv4/tcp_ipv4.c
int tcp_v4_rcv(struct sk_buff *skb)
{
    ......
    th = tcp_hdr(skb); //獲取tcp header
    iph = ip_hdr(skb); //獲取ip header

    //根據(jù)數(shù)據(jù)包 header 中的 ip、端口信息查找到對(duì)應(yīng)的socket
    sk = __inet_lookup_skb(&tcp_hashinfo, skb, th->source, th->dest);
    ......

    //socket 未被用戶鎖定
    if (!sock_owned_by_user(sk)) {
        {
            if (!tcp_prequeue(sk, skb))
                ret = tcp_v4_do_rcv(sk, skb);
        }
    }
}

在 tcp_v4_rcv 中首先根據(jù)收到的網(wǎng)絡(luò)包的 header 里的 source 和 dest 信息來(lái)在本機(jī)上查詢對(duì)應(yīng)的 socket。找到以后,我們直接進(jìn)入接收的主體函數(shù) tcp_v4_do_rcv 來(lái)看。

//file: net/ipv4/tcp_ipv4.c
int tcp_v4_do_rcv(struct sock *sk, struct sk_buff *skb)
{
    if (sk-sk_state == TCP_ESTABLISHED) { 

        //執(zhí)行連接狀態(tài)下的數(shù)據(jù)處理
        if (tcp_rcv_established(sk, skb, tcp_hdr(skb), skb->len)) {
            rsk = sk;
            goto reset;
        }
        return 0;
    }

    //其它非 ESTABLISH 狀態(tài)的數(shù)據(jù)包處理
    ......
}

我們假設(shè)處理的是 ESTABLISH 狀態(tài)下的包,這樣就又進(jìn)入 tcp_rcv_established 函數(shù)中進(jìn)行處理。

//file: net/ipv4/tcp_input.c
int tcp_rcv_established(struct sock *sk, struct sk_buff *skb,
            const struct tcphdr *th, unsigned int len)
{
    ......

    //接收數(shù)據(jù)到隊(duì)列中
    eaten = tcp_queue_rcv(sk, skb, tcp_header_len,
                                    &fragstolen);

    //數(shù)據(jù) ready,喚醒 socket 上阻塞掉的進(jìn)程
    sk->sk_data_ready(sk, 0);

在 tcp_rcv_established 中通過調(diào)用  tcp_queue_rcv 函數(shù)中完成了將接收數(shù)據(jù)放到 socket 的接收隊(duì)列上。

如下源碼所示

//file: net/ipv4/tcp_input.c
static int __must_check tcp_queue_rcv(struct sock *sk, struct sk_buff *skb, int hdrlen,
            bool *fragstolen)
{
    //把接收到的數(shù)據(jù)放到 socket 的接收隊(duì)列的尾部
    if (!eaten) {
        __skb_queue_tail(&sk->sk_receive_queue, skb);
        skb_set_owner_r(skb, sk);
    }
    return eaten;
}

5.2 查找就緒回調(diào)函數(shù)

調(diào)用 tcp_queue_rcv 接收完成之后,接著再調(diào)用 sk_data_ready 來(lái)喚醒在 socket 上等待的用戶進(jìn)程。這又是一個(gè)函數(shù)指針。回想上面第一節(jié)我們?cè)?accept 函數(shù)創(chuàng)建 socket 流程里提到的 sock_init_data 函數(shù),在這個(gè)函數(shù)里已經(jīng)把 sk_data_ready 設(shè)置成 sock_def_readable 函數(shù)了。它是默認(rèn)的數(shù)據(jù)就緒處理函數(shù)。

當(dāng) socket 上數(shù)據(jù)就緒時(shí)候,內(nèi)核將以 sock_def_readable 這個(gè)函數(shù)為入口,找到 epoll_ctl 添加 socket 時(shí)在其上設(shè)置的回調(diào)函數(shù) ep_poll_callback。

我們來(lái)詳細(xì)看下細(xì)節(jié):

//file: net/core/sock.c
static void sock_def_readable(struct sock *sk, int len)
{
    struct socket_wq *wq;

    rcu_read_lock();
    wq = rcu_dereference(sk-sk_wq);

    //這個(gè)名字起的不好,并不是有阻塞進(jìn)程,
    //而是判斷等待隊(duì)列不為空
    if (wq_has_sleeper(wq))
        //執(zhí)行等待隊(duì)列項(xiàng)上的回調(diào)函數(shù)
        wake_up_interruptible_sync_poll(&wq->wait, POLLIN | POLLPRI |
                        POLLRDNORM | POLLRDBAND);
    sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
    rcu_read_unlock();
}

這里的函數(shù)名其實(shí)都有迷惑人的地方。

wq_has_sleeper,對(duì)于簡(jiǎn)單的 recvfrom 系統(tǒng)調(diào)用來(lái)說(shuō),確實(shí)是判斷是否有進(jìn)程阻塞。但是對(duì)于 epoll 下的 socket 只是判斷等待隊(duì)列不為空,不一定有進(jìn)程阻塞的。

wake_up_interruptible_sync_poll,只是會(huì)進(jìn)入到 socket 等待隊(duì)列項(xiàng)上設(shè)置的回調(diào)函數(shù),并不一定有喚醒進(jìn)程的操作。

那接下來(lái)就是我們重點(diǎn)看 wake_up_interruptible_sync_poll 。

我們看一下內(nèi)核是怎么找到等待隊(duì)列項(xiàng)里注冊(cè)的回調(diào)函數(shù)的。

//file: include/linux/wait.h
#define wake_up_interruptible_sync_poll(x, m)       \
    __wake_up_sync_key((x), TASK_INTERRUPTIBLE, 1, (void *) (m))
//file: kernel/sched/core.c
void __wake_up_sync_key(wait_queue_head_t *q, unsigned int mode,
            int nr_exclusive, void *key)
{
    ...
    __wake_up_common(q, mode, nr_exclusive, wake_flags, key);
}

接著進(jìn)入 __wake_up_common

static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
            int nr_exclusive, int wake_flags, void *key)
{
    wait_queue_t *curr, *next;

    list_for_each_entry_safe(curr, next, &q-task_list, task_list) {
        unsigned flags = curr-flags;

        if (curr-func(curr, mode, wake_flags, key) &&
                (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
            break;
    }
}

在 __wake_up_common 中,選出等待隊(duì)列里注冊(cè)某個(gè)元素 curr,回調(diào)其 curr->func。回憶我們 ep_insert 調(diào)用的時(shí)候,把這個(gè) func 設(shè)置成 ep_poll_callback 了。

5.3 執(zhí)行 socket 就緒回調(diào)函數(shù)

在上一小節(jié)找到了 socket 等待隊(duì)列項(xiàng)里注冊(cè)的函數(shù) ep_poll_callback,軟中斷接著就會(huì)調(diào)用它。

//file: fs/eventpoll.c
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
    //獲取 wait 對(duì)應(yīng)的 epitem
    struct epitem *epi = ep_item_from_wait(wait);

    //獲取 epitem 對(duì)應(yīng)的 eventpoll 結(jié)構(gòu)體
    struct eventpoll *ep = epi->ep;

    //1. 將當(dāng)前epitem 添加到 eventpoll 的就緒隊(duì)列中
    list_add_tail(&epi->rdllink, &ep->rdllist);

    //2. 查看 eventpoll 的等待隊(duì)列上是否有在等待
    if (waitqueue_active(&ep->wq))
        wake_up_locked(&ep->wq);

在 ep_poll_callback 根據(jù)等待任務(wù)隊(duì)列項(xiàng)上的額外的 base 指針可以找到 epitem,進(jìn)而也可以找到 eventpoll 對(duì)象。

首先它做的第一件事就是把自己的 epitem 添加到 epoll 的就緒隊(duì)列中。

接著它又會(huì)查看 eventpoll 對(duì)象上的等待隊(duì)列里是否有等待項(xiàng)(epoll_wait 執(zhí)行的時(shí)候會(huì)設(shè)置)。

如果沒執(zhí)行軟中斷的事情就做完了。如果有等待項(xiàng),那就查找到等待項(xiàng)里設(shè)置的回調(diào)函數(shù)。

調(diào)用 wake_up_locked () => __wake_up_locked () => __wake_up_common。

static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
            int nr_exclusive, int wake_flags, void *key)
{
    wait_queue_t *curr, *next;

    list_for_each_entry_safe(curr, next, &q-task_list, task_list) {
        unsigned flags = curr-flags;

        if (curr-func(curr, mode, wake_flags, key) &&
                (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
            break;
    }
}

在 __wake_up_common 里,調(diào)用 curr->func。這里的 func 是在 epoll_wait 是傳入的 default_wake_function 函數(shù)。

5.4 執(zhí)行 epoll 就緒通知

在 default_wake_function 中找到等待隊(duì)列項(xiàng)里的進(jìn)程描述符,然后喚醒之。

源代碼如下:

//file:kernel/sched/core.c
int default_wake_function(wait_queue_t *curr, unsigned mode, int wake_flags,
                void *key)
{
    return try_to_wake_up(curr-private, mode, wake_flags);
}

等待隊(duì)列項(xiàng) curr->private 指針是在 epoll 對(duì)象上等待而被阻塞掉的進(jìn)程。

將 epoll_wait 進(jìn)程推入可運(yùn)行隊(duì)列,等待內(nèi)核重新調(diào)度進(jìn)程。然后 epoll_wait 對(duì)應(yīng)的這個(gè)進(jìn)程重新運(yùn)行后,就從 schedule 恢復(fù)

當(dāng)進(jìn)程醒來(lái)后,繼續(xù)從 epoll_wait 時(shí)暫停的代碼繼續(xù)執(zhí)行。把 rdlist 中就緒的事件返回給用戶進(jìn)程

//file: fs/eventpoll.c
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
             int maxevents, long timeout)
{

    ......
    __remove_wait_queue(&ep-wq, &wait);

    set_current_state(TASK_RUNNING);
    }
check_events:
    //返回就緒事件給用戶進(jìn)程
    ep_send_events(ep, events, maxevents))
}

從用戶角度來(lái)看,epoll_wait 只是多等了一會(huì)兒而已,但執(zhí)行流程還是順序的。

總結(jié)

我們來(lái)用一幅圖總結(jié)一下 epoll 的整個(gè)工作路程。

其中軟中斷回調(diào)的時(shí)候回調(diào)函數(shù)也整理一下:

sock_def_readable:sock 對(duì)象初始化時(shí)設(shè)置的 => ep_poll_callback : epoll_ctl 時(shí)添加到 socket 上的 => default_wake_function: epoll_wait 是設(shè)置到 epoll 上的

總結(jié)下,epoll 相關(guān)的函數(shù)里內(nèi)核運(yùn)行環(huán)境分兩部分:

用戶進(jìn)程內(nèi)核態(tài)。進(jìn)行調(diào)用 epoll_wait  等函數(shù)時(shí)會(huì)將進(jìn)程陷入內(nèi)核態(tài)來(lái)執(zhí)行。這部分代碼負(fù)責(zé)查看接收隊(duì)列,以及負(fù)責(zé)把當(dāng)前進(jìn)程阻塞掉,讓出 CPU。

硬軟中斷上下文。在這些組件中,將包從網(wǎng)卡接收過來(lái)進(jìn)行處理,然后放到 socket 的接收隊(duì)列。對(duì)于 epoll 來(lái)說(shuō),再找到 socket 關(guān)聯(lián)的 epitem,并把它添加到 epoll 對(duì)象的就緒鏈表中。這個(gè)時(shí)候再捎帶檢查一下 epoll 上是否有被阻塞的進(jìn)程,如果有喚醒之。

為了介紹到每個(gè)細(xì)節(jié),本文涉及到的流程比較多,把阻塞都介紹進(jìn)來(lái)了。

但其實(shí)在實(shí)踐中,只要活兒足夠的多,epoll_wait 根本都不會(huì)讓進(jìn)程阻塞。用戶進(jìn)程會(huì)一直干活,一直干活,直到 epoll_wait 里實(shí)在沒活兒可干的時(shí)候才主動(dòng)讓出 CPU。這就是 epoll 高效的地方所在!

廣告聲明:文內(nèi)含有的對(duì)外跳轉(zhuǎn)鏈接(包括不限于超鏈接、二維碼、口令等形式),用于傳遞更多信息,節(jié)省甄選時(shí)間,結(jié)果僅供參考,IT之家所有文章均包含本聲明。

相關(guān)文章

關(guān)鍵詞:Linuxlinux

軟媒旗下網(wǎng)站: IT之家 最會(huì)買 - 返利返現(xiàn)優(yōu)惠券 iPhone之家 Win7之家 Win10之家 Win11之家

軟媒旗下軟件: 軟媒手機(jī)APP應(yīng)用 魔方 最會(huì)買 要知