Linux select/poll/epoll 原理(三)poll 实现

针对 select 系统调用的三个不足,poll 解决的是第一个、最多 1024 个 FD 限制的问题。

其实现思路是:
1. 不再使用位图来传递事件和结果,而是使用 pollfd 。 结构体数组来传递。
2. 在内部实现时,以 poll_list 链表的形式来分批次保存 pollfd 。不像 select 那样一次申请完整的一大块内存。
3. 通过从进程的信号量里获取能打开的最大文件数量,解决 1024 个限制的问题。

0. 基本数据结构

// 源码位置:include/uapi/asm-generic/poll.h
struct pollfd {
    int fd;         // FD
    short events;   // 输入的敢兴趣事件
    short revents;  // 输出的结果
};

// 源码位置:fs/select.c
struct poll_list {
    struct poll_list *next;

    // entries 指向的数组里 pollfd 的数量
    int len;

    // 指向 pollfd 数组的指针
    struct pollfd entries[0];
};

pollfd 结构体用来传递单个FD的输入事件、输出结果。

poll_list 是一个链表,其节点指向 pollfd 结构体的数组,这个数组要么是在栈上预分配、要么是按内存页分配(保持页对齐)。

1. poll 系统调用主逻辑

// 源码位置:fs/select.c
SYSCALL_DEFINE3(poll, struct pollfd __user *, ufds, unsigned int, nfds,
        int, timeout_msecs)
{
    struct timespec64 end_time, *to = NULL;
    int ret;

    if (timeout_msecs >= 0) {
        to = &end_time;
        poll_select_set_timeout(to, timeout_msecs / MSEC_PER_SEC,
            NSEC_PER_MSEC * (timeout_msecs % MSEC_PER_SEC));
    }

    ret = do_sys_poll(ufds, nfds, to);

    if (ret == -EINTR) {
        struct restart_block *restart_block;

        restart_block = &current->restart_block;
        restart_block->fn = do_restart_poll;
        restart_block->poll.ufds = ufds;
        restart_block->poll.nfds = nfds;

        if (timeout_msecs >= 0) {
            restart_block->poll.tv_sec = end_time.tv_sec;
            restart_block->poll.tv_nsec = end_time.tv_nsec;
            restart_block->poll.has_timeout = 1;
        } else
            restart_block->poll.has_timeout = 0;

        ret = -ERESTART_RESTARTBLOCK;
    }
    return ret;
}

static int do_sys_poll(struct pollfd __user *ufds, unsigned int nfds,
        struct timespec64 *end_time)
{
    struct poll_wqueues table;
    int err = -EFAULT, fdcount, len, size;
    /* Allocate small arguments on the stack to save memory and be
       faster - use long to make sure the buffer is aligned properly
       on 64 bit archs to avoid unaligned access */
    // 创建 256 个字节大小的数组
    long stack_pps[POLL_STACK_ALLOC/sizeof(long)];
    struct poll_list *const head = (struct poll_list *)stack_pps;
    struct poll_list *walk = head;
    unsigned long todo = nfds;

    // 如果超过能打开的最大文件数则返回
    // 这个 rlimit 通过判等进程信息里的信号量来实现的
    // 因此修改文件能打开的最大文件数不需要重新编译,可以实时修改
    if (nfds > rlimit(RLIMIT_NOFILE))
        return -EINVAL;

    // N_STACK_PPS 是用于计算 stack_pps 里能存放多少个 pollfd 结构体
    len = min_t(unsigned int, nfds, N_STACK_PPS);

    // 从用户空间拷贝 pollfd 数组到内核空间
    // 分批拷贝,不同批次之间用 poll_list 链表维护起来
    for (;;) {
        walk->next = NULL;
        walk->len = len;
        if (!len)
            break;

        if (copy_from_user(walk->entries, ufds + nfds-todo,
                    sizeof(struct pollfd) * walk->len))
            goto out_fds;

        todo -= walk->len;
        if (!todo)
            break;

        // POLLFD_PER_PAGE 是一个页面能存放多少个 pollfd 结构体
        len = min(todo, POLLFD_PER_PAGE);
        size = sizeof(struct poll_list) + sizeof(struct pollfd) * len;
        walk = walk->next = kmalloc(size, GFP_KERNEL);
        if (!walk) {
            err = -ENOMEM;
            goto out_fds;
        }
    }

    // 初始化 poll_wqueues,设置队列处理函数等
    poll_initwait(&table);

    // 主逻辑:调用目标文件的 poll 函数
    fdcount = do_poll(head, &table, end_time);

    // 删除主逻辑里添加到目标文件的等待节点
    poll_freewait(&table);

    // 把结果拷贝到用户空间
    for (walk = head; walk; walk = walk->next) {
        struct pollfd *fds = walk->entries;
        int j;

        for (j = 0; j < walk->len; j++, ufds++)
            if (__put_user(fds[j].revents, &ufds->revents))
                goto out_fds;
    }

    err = fdcount;
out_fds:
    // 释放申请的内存
    walk = head->next;
    while (walk) {
        struct poll_list *pos = walk;
        walk = walk->next;
        kfree(pos);
    }

    return err;
}

static int do_poll(struct poll_list *list, struct poll_wqueues *wait,
           struct timespec64 *end_time)
{
    poll_table* pt = &wait->pt;
    ktime_t expire, *to = NULL;
    int timed_out = 0, count = 0;
    u64 slack = 0;
    unsigned int busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0;
    unsigned long busy_start = 0;

    /* Optimise the no-wait case */
    if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
        pt->_qproc = NULL;
        timed_out = 1;
    }

    if (end_time && !timed_out)
        slack = select_estimate_accuracy(end_time);

    for (;;) {
        struct poll_list *walk;
        bool can_busy_loop = false;

        // 遍历链表
        for (walk = list; walk != NULL; walk = walk->next) {
            struct pollfd * pfd, * pfd_end;

            pfd = walk->entries;
            pfd_end = pfd + walk->len;
            // 遍历节点里的数组
            for (; pfd != pfd_end; pfd++) {
                /*
                 * Fish for events. If we found one, record it
                 * and kill poll_table->_qproc, so we don't
                 * needlessly register any other waiters after
                 * this. They'll get immediately deregistered
                 * when we break out and return.
                 */
                // 处理单个 pollfd
                if (do_pollfd(pfd, pt, &can_busy_loop,
                          busy_flag)) {
                    // 有事件发生了
                    count++;

                    // 后续的文件即使没有事件发生也不需要等待了。
                    pt->_qproc = NULL;
                    /* found something, stop busy polling */
                    busy_flag = 0;
                    can_busy_loop = false;
                }
            }
        }
        // 注意:上面的遍历循环里,并没有像 select 那样,在小批次poll后进行睡眠。
        /*
         * All waiters have already been registered, so don't provide
         * a poll_table->_qproc to them on the next loop iteration.
         */
        pt->_qproc = NULL;
        if (!count) {
            count = wait->error;
            if (signal_pending(current))
                count = -EINTR;
        }
        if (count || timed_out)
            break;

        /* only if found POLL_BUSY_LOOP sockets && not out of time */
        if (can_busy_loop && !need_resched()) {
            if (!busy_start) {
                busy_start = busy_loop_current_time();
                continue;
            }
            if (!busy_loop_timeout(busy_start))
                continue;
        }
        busy_flag = 0;

        /*
         * If this is the first loop and we have a timeout
         * given, then we convert to ktime_t and set the to
         * pointer to the expiry value.
         */
        if (end_time && !to) {
            expire = timespec64_to_ktime(*end_time);
            to = &expire;
        }

        // 睡眠等待直至超时或被唤醒
        if (!poll_schedule_timeout(wait, TASK_INTERRUPTIBLE, to, slack))
            timed_out = 1;
    }
    return count;
}

static inline unsigned int do_pollfd(struct pollfd *pollfd, poll_table *pwait,
                     bool *can_busy_poll,
                     unsigned int busy_flag)
{
    unsigned int mask;
    int fd;

    mask = 0;
    fd = pollfd->fd;
    if (fd >= 0) {
        struct fd f = fdget(fd);
        mask = POLLNVAL;
        if (f.file) {
            mask = DEFAULT_POLLMASK;
            if (f.file->f_op->poll) {
                pwait->_key = pollfd->events|POLLERR|POLLHUP;
                pwait->_key |= busy_flag;
                mask = f.file->f_op->poll(f.file, pwait);
                if (mask & busy_flag)
                    *can_busy_poll = true;
            }
            /* Mask out unneeded events. */
            // 清除不需要的事件
            mask &= pollfd->events | POLLERR | POLLHUP;
            fdput(f);
        }
    }
    pollfd->revents = mask;

    return mask;
}

由于采用 pollfd 结构体来传递文件的事件,do_poll 遍历所有文件时的逻辑更清晰些、有层次,对某个 pollfd 的处理提取成 do_pollfd 函数。

2. 等待与唤醒

poll 系统调用的等待与唤醒逻辑与 select 系统调用是一样的,调用的代码是一样的。

3. 小结

  1. poll 系统调用解决了 select 系统调用最多 1024 个文件的限制。
  2. 每次调用仍然需要拷贝所有文件的事件。
  3. 其实现上,每次唤醒后仍然需要 poll 所有文件,效率依赖于 poll 的文件数量。在网络的服务端应用里,要监听的 socket 连接的事件是比较稳定的、其数量也是比较稳定的,不会每次 select/poll 都会发生变化,因此第2、3点的问题有很大的优化空间,这在 epoll 系统调用里解决。

欢迎关注我的微信公众号: coderbee笔记,可以更及时回复你的讨论。

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据