redis中的IO多路复用select和epoll

2022/01/05 879点热度 0人点赞 0条评论

基本概念

I/O多路复用是一种同步I/O模型,实现一个线程可以监视多个文件句柄;一旦某个文件句柄就绪,就能够通知线程进行相应的读写操作;没有文件句柄就绪时会阻塞应用程序。多路是指网络连接,复用指的是同一个线程。

I/O多路复用模型是建立在内核提供的多路分离函数select基础之上的,使用select函数可以避免同步非阻塞IO模型中轮询等待的问题。

可以先了解下fd和socketlinux中的文件描述符与套接字socket

redis中的使用

在redis中支持四种类型的实现。根据环境的不同,include进去不同的实现。

/**
 * @brief ae的几种实现
 * redis按照性能从上到下排序
 * 
 * evport: 支持Solaris
 * epoll: 支持linux
 * kqueue: 支持FreeBSD 系统 如macos
 * select: 都不包含就是select
 */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

redis虽然没有声明。但是4个文件都是同一类接口实现,主要核心接口如下,通过声明同一套接口,在各个操作系统里实现对应的实现,屏蔽差异性。

/**
 * @brief  
 * 根据不同的操作系统会有不同的实现
 * 对于select来说应该是就是初始化fdset,用于select的相关调用;
 * 对于epoll来说,需要创建epoll的fd以及epoll使用的events数组
 * @param eventLoop 
 * @return int 
 */
static int aeApiCreate(aeEventLoop *eventLoop) {}

/**
 * @brief 注册事件到 到操作系统,每个操作系统针对读写的事件类型不同
 *  对于evport来说,往npending里增加fd  
 *  对于kqueue来说,就是往kqfd里增加fd
 *  对于select来说,就是往对应读写类型的fd_set里面增加fd
 *  对于epoll来说,就是在events中增加/修改感兴趣的事件
 * @param eventLoop 是为了接收回调数据
 * @param fd 对应监听的fd值
 * @param mask 类型
 * @return int 
 */
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {}

/**
 * @brief 单位时间内监听到的事件数量
 * @param eventLoop 
 * @param tvp 单位时间
 * @return int 返回待处理的事件数量
 */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {}
//删除事件
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) {}

static void aeApiFree(aeEventLoop *eventLoop) {}

//获取实现的名称
static char *aeApiName(void) {}

select 实现

select函数准许线程指示内核等待多个事件中的任何一个发送,并只在有一个或多个事件发生或经历一段指定的时间后才唤醒。函数原型如下:

/**
 * @brief 
 * 
 * @param maxfd 监听最大的文件描述符fd(maxfd+1),也可以理解为描述符数量
 * @param readset AE_READABLE 事件fd集合
 * @param writeset  AE_WRITABLE事件fd集合
 * @param exceptset 异常事件的fd集合,redis直接丢弃
 * @param timeout 阻塞等待时长
 * @return int 返回就绪描述符的数目,超时返回0,出错返回-1
 */
int select(int maxfd,fd_set *readset,fd_set *writeset,fd_set *exceptset,const struct timeval *timeout){}

select原理图(也可以链接socket后端处理流程):

调用过程图:

redis的源码

typedef struct aeApiState {
    /**
     * @brief 新增和删除操作这里
     * rfds 读集合
     * wfds 写集合
     */
    fd_set rfds, wfds;
    /**
     * @brief 从rfds, wfds里 拷贝的fd集合,确保在处理的时候,是安全的
     */
    fd_set _rfds, _wfds;
} aeApiState;

static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    //FD_ZERO 清空集合
    FD_ZERO(&state->rfds);
    FD_ZERO(&state->wfds);
    eventLoop->apidata = state;
    return 0;
}

static int aeApiResize(aeEventLoop *eventLoop, int setsize) {
    /* Just ensure we have enough room in the fd_set type. */
    //FD_SETSIZE 获取setsize
    if (setsize >= FD_SETSIZE) return -1;
    return 0;
}

static void aeApiFree(aeEventLoop *eventLoop) {
    zfree(eventLoop->apidata);
}
/**
 * @brief 根据事件的掩码将fd放到对应的set里
 * @param eventLoop 主程序
 * @param fd 
 * @param mask 事件类型掩码
 * @return int 
 */
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    //FD_SET将一个给定的文件描述符加入集合之中
    if (mask & AE_READABLE) FD_SET(fd,&state->rfds);
    if (mask & AE_WRITABLE) FD_SET(fd,&state->wfds);
    return 0;
}

static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    //FD_CLR 将一个给定的文件描述符从集合中删除
    if (mask & AE_READABLE) FD_CLR(fd,&state->rfds);
    if (mask & AE_WRITABLE) FD_CLR(fd,&state->wfds);
}

/**
 * @brief 单位时间内获取事件数量
 * @param eventLoop 
 * @param tvp 在单位时间内
 * @return int 返回待处理的事件数量
 */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, j, numevents = 0;
    //将rfds 和wfds 的fd集合拷贝到_rfds和_wfds,有多少个覆盖多少个
    memcpy(&state->_rfds,&state->rfds,sizeof(fd_set));
    memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));

    /**
     * @brief 
     * eventLoop->maxfd+1  监听文件描述符数量,包含了所有的fd
     * &state->_rfds  AE_READABLE 事件fd集合
     * &state->_wfds AE_WRITABLE事件fd集合
     * tvp 阻塞等待时长
     * return retval 就绪描述符的数目,超时返回0,出错返回-1
     */
    retval = select(eventLoop->maxfd+1,
                &state->_rfds,&state->_wfds,NULL,tvp);
    if (retval > 0) {
        for (j = 0; j <= eventLoop->maxfd; j++) {
            int mask = 0;
            aeFileEvent *fe = &eventLoop->events[j];
            if (fe->mask == AE_NONE) continue;
            //FD_ISSET 检查集合中指定的文件描述符是否可以读写,可读写
            if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds))
                mask |= AE_READABLE;
            if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds))
                mask |= AE_WRITABLE;
            //将监听到的事件放入到触发事件数组中
            eventLoop->fired[numevents].fd = j;
            eventLoop->fired[numevents].mask = mask;
            numevents++;
        }
    }
    return numevents;
}

static char *aeApiName(void) {
    return "select";
}

select的缺点:

  • 单个进程打开的fd是有限制的,通过fd_setsize设置,默认1024

  • 每次调用select,都需要把fd集合从用户态copy到内核态

  • 每次调用select,都需要在内核遍历(线性扫描,采用轮询的方法,效率较低)传递进来的所有fd

epoll

epoll是在2.6内核中提出的,是之前的select和poll的增强版本。相对于select和poll来说,epoll更加灵活,没有描述符限制。epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。

// 数据结构
// 每一个epoll对象都有一个独立的eventpoll结构体
// 用于存放通过epoll_ctl方法向epoll对象中添加进来的事件
// epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可
struct eventpoll {
    /*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/
    struct rb_root  rbr;
    /*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/
    struct list_head rdlist;
};

/**
 * @brief 内核中间加一个epoll对象,
 * @param size 告诉内核监听的数目一共有多大
 * @return int 
 */
int epoll_create(int size);
/**
 * @brief 负责把 socket 增加、删除到内核红黑树
 * @param epfd 监听的内核fd
 * @param op epoll的动作
 *    EPOLL_CTL_ADD 注册新的fd到epfd中
 *    EPOLL_CTL_MOD 修改已经注册的fd的监听事件
 *    EPOLL_CTL_DEL 从epfd中删除一个fd
 * @param fd 需要监听的fd
 * @param event 告诉内核需要监听什么事件
 * @return int 
 */
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); 
/**
 * @brief epoll_wait 负责检测可读队列,没有可读 socket 则阻塞进程
 * @param epfd 监听的内核fd
 * @param events 从内核得到的事件集合(如果有的话)
 * @param maxevents 告知内核最多捞出的数据 这个值是有epoll创建的一定合法(就是告诉内核state->events 的大小)
 * @param timeout 0 表示立即返回 >0 表示等待多久 -1 表示阻塞
 * @return int  0 表示超时,正常返回需要处理的事件数目
 */
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

redis的实现

typedef struct aeApiState {
    //事件处理的句柄(可以理解为redis进程的fd)
    int epfd;
    //事件数组
    struct epoll_event *events;
} aeApiState;

/**
 * @brief e
 * @param eventLoop 
 * @return int 
 */
static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    //存放事件数组,最大链接数+128
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    //创建一个epoll的句柄,1024告诉内核这个监听的数目有多大
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    eventLoop->apidata = state;
    return 0;
}

static int aeApiResize(aeEventLoop *eventLoop, int setsize) {
    aeApiState *state = eventLoop->apidata;

    state->events = zrealloc(state->events, sizeof(struct epoll_event)*setsize);
    return 0;
}

static void aeApiFree(aeEventLoop *eventLoop) {
    aeApiState *state = eventLoop->apidata
    close(state->epfd);
    zfree(state->events);
    zfree(state);
}

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    //epoll的动作,有数据就是EPOLL_CTL_ADD
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    /**
     * @brief epoll里的读写类型
     *  读  EPOLLIN
     *  写  EPOLLOUT
     */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    /**
     * @brief epoll_ctl epoll的事件注册
     * state->epfd  epoll的内核fd
     * op epoll的动作,
     *    EPOLL_CTL_ADD 注册新的fd到epfd中
     *    EPOLL_CTL_MOD 修改已经注册的fd的监听事件
     *    EPOLL_CTL_DEL 从epfd中删除一个fd
     * fd 需要监听的fd
     * &ee 告诉内核需要监听什么事件
     */
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    int mask = eventLoop->events[fd].mask & (~delmask);

    ee.events = 0;
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (mask != AE_NONE) {
        epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
    } else {
        /* Note, Kernel < 2.6.9 requires a non null event pointer even for
         * EPOLL_CTL_DEL. */
        epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
    }
}
/**
 * @brief 单位时间内获取事件数量
 * 
 * @param eventLoop 
 * @param tvp 在单位时间内
 * @return int 返回待处理的事件数量
 */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    /**
     * 从epoll里捞出就绪的事件(等待事件的产生)
     * state->epfd 监听的内核fd
     * state->events 从内核得到的事件集合(如果有的话)
     * eventLoop->setsize 告知内核最多捞出的数据 这个值是有epoll创建的一定合法(就是告诉内核state->events 的大小)
     * timeout:
     *  0 表示立即返回
     *  >0 表示等待多久
     *  -1 表示阻塞
     * return  retval
     *  0 表示超时,正常返回需要处理的事件数目
     */

    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            //根据事件数组的首地址+j获取对应位置的epoll_event
            struct epoll_event *e = state->events+j;
            /**
             * @brief 将epool里的事件类型转化成redis的事件类型
             * EPOLLIN :表示对应的文件描述符可以读(包括客户端SOCKET正常关闭);
             * EPOLLOUT:表示对应的文件描述符可以写;
             * EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
             * EPOLLERR:表示对应的文件描述符发生错误;
             * EPOLLHUP:表示对应的文件描述符被挂断;
             * EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
             * EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
             */
            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

static char *aeApiName(void) {
    return "epoll";
}

epoll改进了select模式,就避免了以上的几个缺点

  • epoll所支持的fd上限是最大可打开文件的数目,这个数字,可以在操作系统配置,通过ulimit -a查看;

  • epoll通过epoll_ctl函数,每次注册新的事件到epoll句柄的时候,都会把对应的fd copy到内核中,epoll保证了每个fd在整个过程中只copy一次;

  • epoll 通过两种工作模式LT(level trigger) 和ET(edge trigger) 遍历扫描

  • LT模式(默认):epoll_wait监测到fd事件发生后会通知应用程序,应用程序可以不立即处理该事件,下次调用会再次响应;

  • ET模式:epoll_wait监测到fd事件发生后会通知应用程序,应用程序必须立即处理该事件,如不处理,下次调用不会再响应;

ps:kqueue和epoll看着差不多,就不再写了

参考: https://www.cnblogs.com/Anker/p/3265058.html

本文是Redis源码剖析系列博文,有想深入学习Redis的同学,欢迎star和关注;
Redis中文注解版:https://github.com/yxkong/redis/tree/5.0
如果觉得本文对你有用,欢迎一键三连;
同时可以关注微信公众号5ycode获取第一时间的更新哦;

yxkong

这个人很懒,什么都没留下