redis源码阅读三-终于把主线任务执行搞明白了

2021/12/14 812点热度 0人点赞 0条评论

redis源码阅读二-终于把redis的启动流程搞明白了 介绍redis的启动流程,也画了一张图。今天我来详细讲解下redis的主线任务是怎么执行的。

我们先看一下流程图

在redis的主线任务里,主要有三大块:

  • eventLoop->beforesleep 创建回调的写事件并绑定处理器sendReplyToClient,在handleClientsWithPendingWrites

  • aeProcessEvents 执行整主流程,主要功能

  • 从epoll读取fd,将读取的数据写入server.clients

  • 监听对外暴露的ip和端口(tcp socket )通过acceptTcpHandler来监听新的请求,并创建fd

  • 监听到就绪事件后通过readQueryFromClient解析并执行命令

  • processTimeEvents,定时任务执行,在aeProcessEvents内部

/**
 * @brief 创建文件事件,并将fd加入到对应的poll里
 * 回调以后执行对应的处理器
 * @param eventLoop 
 * @param fd 对应的请求的fd
 * @param mask 事件类型
 * @param proc poll回调后执行的处理器
 * @param clientData 和client绑定(引用地址)
 * @return int 
 */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    // 将创建的aeFileEvent 插入到对应的事件表对应的位置eventLoop->events[fd](按tcp的原理同一个fd不会出现两次)
    aeFileEvent *fe = &eventLoop->events[fd];
    //添加到操作系统的poll里,每个操作系统都有自己的实现,最终是把监听的对应的fd的事件放入到eventLoop.apidata
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    // 将处理器给rfileProc和wfileProc(后续会拿着这个回调执行)
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    //重新对maxfd赋值
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

主流程处理代码分析-aeProcessEvents

在ae.c中


/**
 * @brief 执行eventLoop
 * 
 * @param eventLoop 
 */
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
     //只要没有停止,就循环执行,这个是主线程
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            //每次循环前执行beforesleep
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

我们按照顺序一个个的来看下对应的源代码

再看下ae.c中的aeProcessEvents

/**
 * @brief 处理eventLoop里等待的 定时任务,文件事件,过期事件
 * @param eventLoop 
 * @param flags 事件类型,
 * 从main中过来是所有的事件,AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP
 * 从networking过来是文件事件 AE_FILE_EVENTS|AE_DONT_WAIT
 * @return int 
 */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    //针对事件类型判断
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */

    /**
     * @brief 这块的意思是:
     * eventLoop 有监听的tcp 或者(flags是时间事件且等待处理)
     * 去获取下一次要执行的timer任务,如果有,计算出间隔时间,
     *  那在等待读写事件的时候,就最多阻塞对应的时间
     *  否则就一直阻塞直到有任务到达(如果本身epoll设置有超时时间?)
     */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        //时间
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);

        // 根据shortest计算出差值事件tvp

        /**
         * @brief 从epoll里拿到numevents数量的数据,会把任务放到fired里,这里并没有指定事件类型
         * 有时间,就阻塞指定的时间,
         * 没有时间,直到有数据,这时定时任务也不用执行(redis是极简主义,猜测作者的意图,你都没有读写请求,我还处理定期任务干啥)
         */
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        //执行aftersleep
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        /**
         * @brief 执行触发的事件
         */
        for (j = 0; j < numevents; j++) {
            /**
             * @brief 从注册的事件表中拿到对应fd的事件,插入逻辑在aeCreateFileEvent里 
             */
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */

            //反转主要是为了持久化设置的,在持久化的时候,设置的屏障,等后续看持久化的代码再详解 TODO
            int invert = fe->mask & AE_BARRIER;

            /**
             * @brief 读事件(AE_READABLE)处理器包括:
             * 由acceptTcpHandler处理的,在initServer里创建
             * 由readQueryFromClient处理的,在acceptTcpHandler.acceptCommonHandler.createClient里创建
             * TODO 其他的读事件后续分析
             * 
             * 写事件(AE_WRITABLE)处理器包括:
             * 由sendReplyToClient 处理的,在beforeSleep->handleClientsWithPendingWrites里创建
             * TODO  其他的写事件后续分析
             */
            if (!invert && fe->mask & mask & AE_READABLE) {
                //rfileProc和wfileProc由aeCreateFileEvent 创建fe时,传入的处理器,和fd绑定的,这里的fe->clientData 就是client,在aeCreateFileEvent里
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }

            /* Fire the writable event. */
            //处理写事件
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            // 反转调用,
            if (invert && fe->mask & mask & AE_READABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
            //处理次数+1
            processed++;
        }
    }
    /* Check time events */
    //处理定期任务主要是serverCron
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

我们先看下aeEventLoop,有助于理解


/**
 * @brief 事件管理器,整个进程只有一个
 */
typedef struct aeEventLoop {
    //最大的fd
    int maxfd;   /* highest file descriptor currently registered */
    //最多持有这么多连接(最大链接+128),events和fired 数组的大小
    int setsize; /* max number of file descriptors tracked */
     //记录最大的定时事件id(放几个为几),存放定时事件会自增
    long long timeEventNextId;
    time_t lastTime;     /* Used to detect system clock skew */
    //已注册的文件事件处理器,在initServer里,一个fd绑定一个
    aeFileEvent *events; /* Registered events */
    //触发的的事件(在ae中会把所有从epoll里拉取到的事件丢到这里)
    aeFiredEvent *fired; /* Fired events */
    //定时事件链表的头节点
    aeTimeEvent *timeEventHead;
    //事件循环结束标识
    int stop;
    //epoll的数据,
    void *apidata; /* This is used for polling API specific data */
    //aeProcessEvents处理前执行(每循环一次执行一次)
    aeBeforeSleepProc *beforesleep;
    //aeApiPoll 后执行
    aeBeforeSleepProc *aftersleep;
} aeEventLoop;

我们看下aeApiPoll 是如何从epoll里拿数据的?

在此之前,我们得先了解下redis是如何选择的。在ae.c文件中

/**
 * @brief ae的几种实现
 * redis按照性能从上到下排序
 * evport: 支持Solaris
 * epoll: 支持linux
 * kqueue: 支持FreeBSD 系统 如macos
 * select: 都不包含就是select
 */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

由于我的电脑是mac,直接定位到kqueue。我们看下ae_kqueue.c中的aeApiPoll

/**
 * @brief 单位时间内获取事件数量
 * @param eventLoop 
 * @param tvp 单位时间
 * @return int 返回待处理的事件数量
 */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;
    /**
     * @brief 获取已经就绪的事件数据,是和fd绑定的,fd已经绑定了对应的处理器
     * timeout指针为空,那么kevent()会永久阻塞,直到事件发生
     * timeout有值,那么最多阻塞这么长时间
     */
    if (tvp != NULL) {
        struct timespec timeout;
        timeout.tv_sec = tvp->tv_sec;
        timeout.tv_nsec = tvp->tv_usec * 1000;
        retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize,
                        &timeout);
    } else {

        retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize,
                        NULL);
    }
    //将事件填充到eventLoop->fired中
    if (retval > 0) {
        int j;

        numevents = retval;
        for(j = 0; j < numevents; j++) {
            int mask = 0;
            struct kevent *e = state->events+j;

            if (e->filter == EVFILT_READ) mask |= AE_READABLE;
            if (e->filter == EVFILT_WRITE) mask |= AE_WRITABLE;
            // 这里的ident是创建事件的时候赋值的
            eventLoop->fired[j].fd = e->ident;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

处理器

acceptTcpHandler

源码位置:networking.c中

注册时机:启动时initServer

处理内容:

  • 监听开放的ip和端口,拿到请求后创建fd,

  • 根据fd创建文件事件,并给fd绑定回调函数readQueryFromClient,用于处理AE_READABLE事件

  • 创建client,并绑定readQueryFromClient到该fd上,当监听到这个fd的AE_READABLE事件后,回调

我们看下acceptTcpHandler如何处理(关键点)


/**
 * @brief tcp处理器
 * @param el 
 * @param fd 当前tcp的fd
 * @param privdata 对应epoll数据
 * @param mask 
 */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    /**
     * cport 当前的端口
     * cfd 当前的fd
     * max 一次最多处理1000
     */
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);
    //取tcp请求
    while(max--) {
        /**
         * @brief 监听tcp socket ,获取一个新的fd,后续再研究下这里 TODO
         * 新的fd就是一个有效的链接
         */
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        //针对新监听到的请求处理cfd
        acceptCommonHandler(cfd,0,cip);
    }
}
/**
 * @brief 针对新监听到的请求(fd)处理
 *  主要是创建file event 让readQueryFromClient 监听AE_READABLE 事件
 *  并将client加入到server.clients的队尾
 * @param fd 针对
 * @param flags 
 * @param ip 
 */
static void acceptCommonHandler(int fd, int flags, char *ip) {
    client *c;
    // 根据监听到的请求fd创建客户端,并扔到server.clients队尾,如果有事务业务初始化
    if ((c = createClient(fd)) == NULL) {
        serverLog(LL_WARNING,
            "Error registering fd event for the new client: %s (fd=%d)",
            strerror(errno),fd);
        close(fd); /* May be already closed, just ignore errors */
        return;
    }
     //超了maxclients,直接就不处理了
    if (listLength(server.clients) > server.maxclients) {
        char *err = "-ERR max number of clients reached\r\n";
        //拒绝链接数+1
        server.stat_rejected_conn++;
        freeClient(c);
        return;
    }
    //链接数+1
    server.stat_numconnections++;
    c->flags |= flags;
}

最关键的在createClient
/**
 * @brief 根据fd创建客户端信息(重要)
 * 1,设置新的请求为非阻塞,无延迟,并设置KeepAlive为erver.tcpkeepalive
 * 2,根据fd创建一个file event 并给fd绑定回调函数readQueryFromClient,当监听到这个fd的AE_READABLE事件后,回调
 * 3,根据此fd创建一个client,并将该client放入到server.clients的队尾
 * @param fd tcp对应的fd
 * @return client* 
 */
client *createClient(int fd) {
    client *c = zmalloc(sizeof(client));

    if (fd != -1) {
        //设置tcp非阻塞
        anetNonBlock(NULL,fd);
        //设置tcp无延迟
        anetEnableTcpNoDelay(NULL,fd);
        if (server.tcpkeepalive)
            //设置tcp的KeepAlive
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        //注册一个file event, 由回调函数readQueryFromClient 去处理该fd的AE_READABLE事件
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            //创建失败,就关闭并释放
            close(fd);
            zfree(c);
            return NULL;
        }
    }

    selectDb(c,0);
    uint64_t client_id;
    //原子获取client_id
    atomicGetIncr(server.next_client_id,client_id,1);
    c->id = client_id;
    //client上绑定的是新请求,也就是对应请求的fd
    c->fd = fd;
    ......
    //解析tcp后flags是0
    c->flags = 0;
    c->ctime = c->lastinteraction = server.unixtime;
    ......
    //将新生产的client放入server.clients队尾
    if (fd != -1) linkClient(c);
    //初始化事务
    initClientMultiState(c);
    return c;
}

对应流程图如下:

命令执行 readQueryFromClient

源码位置:networking.c中

注册时机:在acceptTcpHandler里监听到请求后,拿到fd,绑定readQueryFromClient

处理内容:

  • 从tcp缓冲区读取信息并解析到client里

  • 查找命令并执行在server.c中的processCommand里

触发代码
 fe->rfileProc(eventLoop,fd,fe->clientData,mask);

 /**
 * @brief 读取客户端信息
 * @param el 
 * @param fd 对应请求的fd
 * @param privdata client客户端
 * @param mask 
 */
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    //处理输入流
    processInputBufferAndReplicate(c);
}

void processInputBufferAndReplicate(client *c) {
    //master节点
    if (!(c->flags & CLIENT_MASTER)) {
        //处理输入缓冲区(主要看这里)
        processInputBuffer(c);
    } else {
        //集群同步复制
        size_t prev_offset = c->reploff;
        processInputBuffer(c);
        //有数据才同步
        size_t applied = c->reploff - prev_offset;
        if (applied) {
            replicationFeedSlavesFromMasterStream(server.slaves,
                    c->pending_querybuf, applied);
            sdsrange(c->pending_querybuf,applied,-1);
        }
    }
}

/**
 * @brief 处理输入内容
 * 
 * @param c 
 */
void processInputBuffer(client *c) {
    /**
     * @brief 根据不同类型从缓冲区读取数据
     */
    if (c->reqtype == PROTO_REQ_INLINE) {
        //根据不同类型从缓冲区读取数据,并将客户端执行内容解析到成robj并出入c->argv
        if (processInlineBuffer(c) != C_OK) break;
    } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
        if (processMultibulkBuffer(c) != C_OK) break;
    } else {
        serverPanic("Unknown request type");
    }
    if (processCommand(c) == C_OK) {

    }
    //设置为空,下一个client可以继续
    server.current_client = NULL;
}

int processCommand(client *c) {
    //从server.commands字典里查询命令执行命令的映射,c->argv[0]为命令名称
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    //找不到命令就未知命令异常,参数不对,也异常
    //其他一堆逻辑判断
    /**
     * @brief 执行命令
     * 开始了事务(CLIENT_MULTI)直接放入Multi队列
     * 
     */
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        //执行命令回调
        call(c,CMD_CALL_FULL);
        //
        c->woff = server.master_repl_offset;
        if (listLength(server.ready_keys))
            handleClientsBlockedOnKeys();
    }
}

//命令执行
void call(client *c, int flags) {
    //命令执行,会执行对应的redisCommand
    c->cmd->proc(c);
}

看下面的代码,proc就是第二个参数,就是对应命令的具体方法

struct redisCommand redisCommandTable[] = {
    {"module",moduleCommand,-2,"as",0,NULL,0,0,0,0,0},
    {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
    {"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
    {"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
    ......
}

struct redisCommand {
    char *name;
    redisCommandProc *proc;
    int arity;
    char *sflags; /* Flags as string representation, one char per flag. */
    int flags;    /* The actual flags, obtained from the 'sflags' field. */
    redisGetKeysProc *getkeys_proc;
    int firstkey; /* The first argument that's a key (0 = no keys) */
    int lastkey;  /* The last argument that's a key */
    int keystep;  /* The step between first and last key */
    long long microseconds, calls;
};

定时任务 serverCron

源码位置:server.c中

注册时机:initServer()

/**
 * @brief 时间事件执行
 * @param eventLoop fd
 * @param id  fd
 * @param clientData 
 * @return int 
 */
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    /**
     * @brief 处理server.clients 里的任务
     * 这里尾结点反转到头结点,因为clients采用的尾插法,如果最后一个过期了,那之前的请求响应客户端都过期了
     * 1,处理超期任务
     * 2,回收查询缓冲区
     * 3,先不管,TODO
     */
    clientsCron();
    /**
     * 1,处理过期key,发送过期事件并删除
     * 2,内存碎片整理
     * 3,rehash
     */
    databasesCron();
    //rdb操作
    rdbSaveBackground(server.rdb_filename,rsiptr);
    //aof
    rewriteAppendOnlyFileBackground();
    //释放需要异步释放的客户端链表
    freeClientsInAsyncFreeQueue();
    //其他采样
}

回写事件

接收任务都搞定了,那我们再看一个回写事件。就拿get命令来看

server.c中
 {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},

t_string.c中

void getCommand(client *c) {
    getGenericCommand(c);
} 

int getGenericCommand(client *c) {
    robj *o;
    //为空直接返回
    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL)
        return C_OK;

    //不为空,
    if (o->type != OBJ_STRING) {
        //添加异常回复
        addReply(c,shared.wrongtypeerr);
        return C_ERR;
    } else {
        //添加到响应队列
        addReplyBulk(c,o);
        return C_OK;
    }
}

最后都转到了这里
networking.c

/**
 * @brief 添加回复
 * @param c 
 * @param obj robj
 */
void addReply(client *c, robj *obj) {
    //判断并把要写回的客户端写入到server.clients_pending_write
    if (prepareClientToWrite(c) != C_OK) return;
    //如果回复的内容是sds编码
    if (sdsEncodedObject(obj)) {
        /**
         * @brief 先尝试使用_addReplyToBuffer 写入缓冲区,写入失败再_addReplyStringToList
         */
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        //回复的内容是编码是int(真省空间)
        char buf[32];
        size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
        if (_addReplyToBuffer(c,buf,len) != C_OK)
            _addReplyStringToList(c,buf,len);
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}

int prepareClientToWrite(client *c) {
    /**
     * 如果c->bufpos 和c->reply,说明这个客户端之前已经放入了等待写队列server.clients_pending_write
     */
    if (!clientHasPendingReplies(c)) 
       // 将客户用头插法写入server.clients_pending_write
       clientInstallWriteHandler(c);
}

/**
 * @brief 将响应内容写入c->buf数组,buf长度有限制,只有16kb
 * @param c 
 * @param s 
 * @param len 
 * @return int
 */
int _addReplyToBuffer(client *c, const char *s, size_t len) {
    size_t available = sizeof(c->buf)-c->bufpos;

    if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK;

    /* If there already are entries in the reply list, we cannot
     * add anything more to the static buffer. */
    //在c->reply里有值,就不管
    if (listLength(c->reply) > 0) return C_ERR;

    /* Check that the buffer has enough space available for this string. */
    if (len > available) return C_ERR;
    //复制s到c->buf
    memcpy(c->buf+c->bufpos,s,len);
    //表示输出缓冲区的大小
    c->bufpos+=len;
    return C_OK;
}

/**
 * @brief 将响应内容输入到c->reply链表里
 * 
 * @param c 
 * @param s 
 * @param len 
 */
void _addReplyStringToList(client *c, const char *s, size_t len) {
    if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;

    listNode *ln = listLast(c->reply);
    //获取队尾
    clientReplyBlock *tail = ln? listNodeValue(ln): NULL;
    /**
     * @brief reply 已经有了数据,就后写,
     * 如果clientReplyBlock剩余的空间不够,就再新建一个
     * clientReplyBlock 最少16k,如果响应字符串较小,一个填充不完
     */
    if (tail) {
        size_t avail = tail->size - tail->used;
        size_t copy = avail >= len? len: avail;
        memcpy(tail->buf + tail->used, s, copy);
        tail->used += copy;
        s += copy;
        len -= copy;
    }
    //len有数据,要么往已有的追加,还有剩余,要么是因为tail为null
    if (len) {
        size_t size = len < PROTO_REPLY_CHUNK_BYTES? PROTO_REPLY_CHUNK_BYTES: len;
        tail = zmalloc(size + sizeof(clientReplyBlock));
        /* take over the allocation's internal fragmentation */
        tail->size = zmalloc_usable(tail) - sizeof(clientReplyBlock);
        tail->used = len;
        memcpy(tail->buf, s, len);
        listAddNodeTail(c->reply, tail);
        c->reply_bytes += tail->size;
    }
    //将客户端加入到server.clients_to_close
    asyncCloseClientOnOutputBufferLimitReached(c);
}

这块就不画图了,代码很清晰

  • 所有的响应都走networking.c中的addReply

  • 如果对客户端有响应数据,直接将client写入到server.clients_pending_write

  • addReply 又根据不同的编码输入到不同响应缓冲区

从上面几个流程来看,redis5是单线程处理的

写回客户端

一直到这里,只是把客户端写入了server.clients_pending_write

写回的数据写入到了c->buf和c->reply。

但是并没有写会客户端啊,别急,还记得我们aeMain里的beforeSleep么?

先看下server.c中的beforeSleep

/**
 * @brief 循环处理前执行
 * 
 * @param eventLoop 
 */
void beforeSleep(struct aeEventLoop *eventLoop) {
        //激活快循环
    if (server.active_expire_enabled && server.masterhost == NULL)
        //执行快循环
        activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
    ......
     //将aof缓冲区写入磁盘
    flushAppendOnlyFile(0);

    /* Handle writes with pending output buffers. */
    //处理等待的回写队列(最后再看)
    handleClientsWithPendingWrites();
}

在networking.c中

int handleClientsWithPendingWrites(void) {
    int processed = listLength(server.clients_pending_write);

    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        if (writeToClient(c->fd,c,0) == C_ERR) continue;
    }
}

int writeToClient(int fd, client *c, int handler_installed) {
      while(clientHasPendingReplies(c)) {
        //c->buf 或c->reply有数据,二选一,下面是处理。
        if (c->bufpos > 0) {
            //直接调用底层write将数据写入了fd,响应给客户端
            nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);

        } else {
            o = listNodeValue(listFirst(c->reply));
            objlen = o->used;

            if (objlen == 0) {
                c->reply_bytes -= o->size;
                listDelNode(c->reply,listFirst(c->reply));
                continue;
            }
             //直接调用底层write将数据写入了fd,响应给客户端
            nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen);

        }
}

至此,整个redis,监听、接收请求、执行命令、写回数据形成了闭环

redis源码系列文章

redis源码阅读-入门篇

redis源码阅读二-终于把redis的启动流程搞明白了

后续会持续更新,感兴趣的小伙伴,可以一键三连哦。

yxkong

这个人很懒,什么都没留下