In earlier posts — Redis source reading (2): finally understanding the Redis startup flow, and Redis source reading (3): finally understanding main-thread task execution — we covered the startup flow and the main-thread work. This time we look at Redis 6's I/O multi-threading, based on the 6.2 branch.
My annotated code lives mainly on the redis 5.0 branch; the 6.2 branch has fewer annotations:
https://github.com/yxkong/redis/tree/6.2
https://github.com/yxkong/redis/tree/5.0
Differences in acceptTcpHandler handling
Both branches ultimately register a file event whose handler ends up being readQueryFromClient, but the 6.2 branch takes a more roundabout route — probably because Redis 6 moved toward a more object-oriented connection abstraction.
In 5.0, createClient creates the file event directly:
aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c)
In the 6.2 branch, createClient creates the file event indirectly through connSetReadHandler.
In networking.c:
//Register readQueryFromClient as the callback invoked when the connection becomes readable
connSetReadHandler(conn, readQueryFromClient);
This part is a bit indirect. For readers familiar with Java: connection.h defines the struct ConnectionType, and connection carries a ConnectionType pointer — essentially an interface made of function pointers:
typedef struct ConnectionType {
    void (*ae_handler)(struct aeEventLoop *el, int fd, void *clientData, int mask);
    int (*connect)(struct connection *conn, const char *addr, int port, const char *source_addr, ConnectionCallbackFunc connect_handler);
    int (*write)(struct connection *conn, const void *data, size_t data_len);
    int (*read)(struct connection *conn, void *buf, size_t buf_len);
    void (*close)(struct connection *conn);
    int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler);
    int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier);
    int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler);
    const char *(*get_last_error)(struct connection *conn);
    int (*blocking_connect)(struct connection *conn, const char *addr, int port, long long timeout);
    ssize_t (*sync_write)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
    ssize_t (*sync_read)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
    ssize_t (*sync_readline)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
    int (*get_type)(struct connection *conn);
} ConnectionType;

struct connection {
    ConnectionType *type;
    ConnectionState state;
    short int flags;
    short int refs;
    int last_errno;
    void *private_data;
    ConnectionCallbackFunc conn_handler;
    ConnectionCallbackFunc write_handler;
    ConnectionCallbackFunc read_handler;
    int fd;
};
In connection.c, ConnectionType gets a default implementation, CT_Socket (I'm not sure that's the exact analogy, but it reads like the plain TCP socket implementation of that interface):
ConnectionType CT_Socket = {
    .ae_handler = connSocketEventHandler,
    .close = connSocketClose,
    .write = connSocketWrite,
    .read = connSocketRead,
    .accept = connSocketAccept,
    .connect = connSocketConnect,
    .set_write_handler = connSocketSetWriteHandler,
    .set_read_handler = connSocketSetReadHandler,
    .get_last_error = connSocketGetLastError,
    .blocking_connect = connSocketBlockingConnect,
    .sync_write = connSocketSyncWrite,
    .sync_read = connSocketSyncRead,
    .sync_readline = connSocketSyncReadLine,
    .get_type = connSocketGetType
};
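Generic code never calls CT_Socket directly; connection.h exposes small inline wrappers that dispatch through the type table (a simplified excerpt — the point is that caller code stays the same whether the connection is a plain socket or, say, TLS):

/* connection.h (simplified): generic helpers just forward to conn->type,
 * the "interface" implemented by CT_Socket above. */
static inline int connRead(connection *conn, void *buf, size_t buf_len) {
    return conn->type->read(conn, buf, buf_len);
}

static inline int connWrite(connection *conn, const void *data, size_t data_len) {
    return conn->type->write(conn, data, data_len);
}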
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    if (func == conn->read_handler) return C_OK;

    conn->read_handler = func;
    if (!conn->read_handler)
        aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
    else
        //This is the real goal: create a file event whose ae callback is
        //conn->type->ae_handler (connSocketEventHandler), which in turn
        //invokes the read_handler we just stored (readQueryFromClient)
        if (aeCreateFileEvent(server.el,conn->fd,
                    AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
    return C_OK;
}

static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    //For a plain TCP connection this dispatches to connSocketSetReadHandler in connection.c
    return conn->type->set_read_handler(conn, func);
}
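Note that the callback actually registered with aeCreateFileEvent is conn->type->ae_handler, i.e. connSocketEventHandler, not readQueryFromClient itself. A simplified sketch of how it forwards to the handler we installed (the real 6.2 function also deals with the connecting state and a write-before-read barrier):

static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask)
{
    UNUSED(el); UNUSED(fd);
    connection *conn = clientData;

    /* ... CONN_STATE_CONNECTING handling and the write barrier are omitted ... */

    /* Readable: invoke the handler installed via connSetReadHandler,
     * which for a normal client is readQueryFromClient(conn). */
    if ((mask & AE_READABLE) && conn->read_handler) {
        if (!callHandler(conn, conn->read_handler)) return;
    }

    /* Writable: invoke the write handler (e.g. sendReplyToClient). */
    if ((mask & AE_WRITABLE) && conn->write_handler) {
        if (!callHandler(conn, conn->write_handler)) return;
    }
}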
Once the file event is created, I won't repeat how it gets triggered — that was covered in the previous posts.
Differences in InitServerLast
The redis 6.2 branch calls initThreadedIO() from InitServerLast. It:
- initializes the I/O thread pool arrays
- treats the main thread as the first thread (index 0) of the I/O thread pool
- sets the run function of the other threads, which spin-wait for the I/O read/write tasks assigned to their own io_threads_list
- lets those threads perform the actual I/O reads and writes
/**
 * @brief Initialize the I/O threads
 */
void initThreadedIO(void) {
    server.io_threads_active = 0; /* We start with threads not active. */
    //With io_threads_num == 1 just return: the main thread handles all I/O itself
    if (server.io_threads_num == 1) return;
    //More than 128 threads: abort startup
    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }

    /* Spawn and initialize the I/O threads. */
    //Create and initialize the I/O thread pool
    for (int i = 0; i < server.io_threads_num; i++) {
        /* Things we do for all the threads including the main thread. */
        //i == 0 is the main thread; it handles part of the I/O as well
        io_threads_list[i] = listCreate();
        if (i == 0) continue; /* Thread 0 is the main thread. */

        /* Things we do only for the additional threads. */
        //Initialize the mutex and lock it, so the new thread starts out stopped
        pthread_t tid;
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        //Pending I/O count: set io_threads_pending[i] to 0
        setIOPendingCount(i, 0);
        pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        io_threads[i] = tid;
    }
}
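The setIOPendingCount/getIOPendingCount helpers used above are thin wrappers over a per-thread atomic counter; roughly (from networking.c, where io_threads_pending[i] is the number of clients thread i still has to process):

static inline void setIOPendingCount(int i, unsigned long count) {
    atomicSetWithSync(io_threads_pending[i], count);
}

static inline unsigned long getIOPendingCount(int i) {
    unsigned long count = 0;
    atomicGetWithSync(io_threads_pending[i], count);
    return count;
}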
Now let's look at IOThreadMain:
- it mostly spin-waits, and processes tasks as soon as they show up
- each spin is up to 1,000,000 iterations; if nothing arrived, the thread briefly locks and unlocks its mutex (which lets the main thread, by holding that mutex, pause the I/O thread) and then starts spinning again
/**
 * @brief What each I/O thread runs, analogous to a Java thread's run()
 * @param myid index into the I/O thread arrays
 * @return void*
 */
void *IOThreadMain(void *myid) {
    /* The ID is the thread number (from 0 to server.iothreads_num-1), and is
     * used by the thread to just manipulate a single sub-array of clients. */
    long id = (unsigned long)myid;
    char thdname[16];

    //The thread name is "io_thd_" plus the thread index
    snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
    redis_set_thread_title(thdname);
    //Bind the I/O thread to the configured CPU list
    redisSetCpuAffinity(server.server_cpulist);
    //Make the thread cancellable
    makeThreadKillable();

    while(1) {
        /* Wait for start */
        //Spin until io_threads_pending for this thread becomes non-zero,
        //or the 1,000,000 iterations run out and we come back around
        for (int j = 0; j < 1000000; j++) {
            if (getIOPendingCount(id) != 0) break;
        }

        /* Give the main thread a chance to stop this thread. */
        //If the main thread holds io_threads_mutex[id], we block here until it is released
        if (getIOPendingCount(id) == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }

        serverAssert(getIOPendingCount(id) != 0);

        /* Process: note that the main thread will never touch our list
         * before we drop the pending count to 0. */
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        //Walk the tasks assigned to this I/O thread
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            //Handle the write side
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                //Handle the read side
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        //Reset the pending count to 0
        setIOPendingCount(id, 0);
    }
}
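For reference, none of this runs unless threaded I/O is configured; a minimal redis.conf example (by default only writes are offloaded, reads additionally need io-threads-do-reads):

# redis.conf: enable 4 I/O threads (the main thread plus 3 extra),
# and let the extra threads handle reads as well as writes
io-threads 4
io-threads-do-reads yes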
Differences in readQueryFromClient handling
The 6.2 branch adds a postponeClientRead(c) function: if multi-threaded I/O is enabled (and the client is eligible), the client is put into server.clients_pending_read instead of being read right away.
int postponeClientRead(client *c) {
    if (server.io_threads_active &&
        server.io_threads_do_reads &&
        !ProcessingEventsWhileBlocked &&
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ|CLIENT_BLOCKED)))
    {
        c->flags |= CLIENT_PENDING_READ;
        listAddNodeHead(server.clients_pending_read,c);
        return 1;
    } else {
        return 0;
    }
}
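readQueryFromClient itself checks this first thing: if postponeClientRead queues the client, it returns immediately and the actual read happens later on an I/O thread. A simplified sketch of the entry of the 6.2 function:

void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);

    /* If threaded I/O is enabled, just queue the client into
     * server.clients_pending_read and read it later, outside the event loop. */
    if (postponeClientRead(c)) return;

    /* ... otherwise read from the socket and feed the query buffer as usual ... */
}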
aeProcessEvents
In the redis 6.2 branch, beforesleep is invoked inside aeProcessEvents, right before aeApiPoll:
int aeProcessEvents(aeEventLoop *eventLoop, int flags){
    //Before 6.0 this was called outside aeProcessEvents; now it happens inside
    if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
        eventLoop->beforesleep(eventLoop);

    /* Call the multiplexing API, will return only on timeout or when
     * some event fires. */
    numevents = aeApiPoll(eventLoop, tvp);

    /* After sleep callback. */
    if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
        eventLoop->aftersleep(eventLoop);
}
Registering beforeSleep and afterSleep also moved into initServer:
void initServer(void) {
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeSetAfterSleepProc(server.el,afterSleep);
}
In the 6.2 branch's beforeSleep we care mostly about the following two functions, both in networking.c:
//Handle clients with pending reads using the I/O thread pool
handleClientsWithPendingReadsUsingThreads();
//Handle clients with pending writes using the I/O thread pool
handleClientsWithPendingWritesUsingThreads();
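Within beforeSleep the read side is drained first, and the write side only near the end (after key expiration, AOF flushing and so on). A heavily abbreviated sketch of that ordering — not the full function, which does a lot more:

void beforeSleep(struct aeEventLoop *eventLoop) {
    /* ... */
    /* Drain the clients queued by postponeClientRead() during the previous
     * event-loop iteration, fanning the reads out to the I/O threads. */
    handleClientsWithPendingReadsUsingThreads();

    /* ... expire keys, unblock clients, flush the AOF buffer, ... */

    /* Then push the accumulated replies back to the sockets, again
     * fanning the writes out to the I/O threads. */
    handleClientsWithPendingWritesUsingThreads();
    /* ... */
}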
handleClientsWithPendingReadsUsingThreads does the following (full code below):
- takes the tasks from server.clients_pending_read and distributes them round robin (item_id % io_threads_num, starting from 0) across all the I/O threads
- the main thread processes the read tasks assigned to itself (io_threads_list[0])
- the main thread waits until every I/O thread has finished its assigned tasks
- it then walks server.clients_pending_read (new clients are only added there after aeApiPoll, so the list cannot change under us at this point)
- only now is each client removed from clients_pending_read
- the main thread executes the parsed commands
- clients with pending replies are added to server.clients_pending_write
/**
 * @brief Read from the pending clients using the I/O threads
 * @return int number of clients processed
 */
int handleClientsWithPendingReadsUsingThreads(void) {
    if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
    int processed = listLength(server.clients_pending_read);
    if (processed == 0) return 0;

    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    //Grab the tasks from the pending-read list
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        //Append to the tail of the matching io_threads_list by modulo:
        //this is the "Round Robin" the source comments talk about
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    io_threads_op = IO_THREADS_OP_READ;
    //Publish how many tasks each I/O thread has to do
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        setIOPendingCount(j, count);
    }

    //The main thread handles the reads assigned to itself
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);

    //The main thread busy-waits until every I/O thread is done before moving on
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;
    }

    //Keep going while server.clients_pending_read is not empty
    while(listLength(server.clients_pending_read)) {
        ln = listFirst(server.clients_pending_read);
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_READ;
        //Remove the client from clients_pending_read
        listDelNode(server.clients_pending_read,ln);
        serverAssert(!(c->flags & CLIENT_BLOCKED));

        //Execute the command that was already parsed by the I/O thread
        if (processPendingCommandsAndResetClient(c) == C_ERR) {
            continue;
        }

        //Parse (and execute) whatever is left in the input buffer
        processInputBuffer(c);

        /* We may have pending replies if a thread readQueryFromClient() produced
         * replies and did not install a write handler (it can't).
         */
        if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
            //This adds the client to server.clients_pending_write
            clientInstallWriteHandler(c);
    }

    /* Update processed count on server */
    server.stat_io_reads_processed += processed;

    return processed;
}
handleClientsWithPendingWritesUsingThreads follows much the same logic as the above, so I won't walk through it line by line.
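One difference worth pointing out: the write path can also start or stop the I/O threads dynamically based on load, and falls back to the plain single-threaded handleClientsWithPendingWrites when threading isn't worth it. A simplified excerpt of its beginning (the distribution, fan-out and wait loop mirror the read path above):

int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; /* Return ASAP if there are no clients. */

    /* With one thread only, or with few pending writes, it is cheaper to
     * handle the writes synchronously on the main thread. */
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }

    /* Start the I/O threads if needed. */
    if (!server.io_threads_active) startThreadedIO();

    /* ... round-robin distribution, IO_THREADS_OP_WRITE fan-out,
     *     busy-wait for completion, then install write handlers for
     *     clients that still have data to send ... */
    return processed;
}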
Finally, a summary (with multi-threaded I/O enabled):
- fds are created in the order TCP connections are accepted
- via epoll, readQueryFromClient (through postponeClientRead) queues clients into server.clients_pending_read
- the main thread takes the clients from server.clients_pending_read and distributes the tasks to the I/O threads (the main thread itself is thread 0 of the I/O thread pool)
- the main thread runs the I/O tasks assigned to itself, while the other I/O threads pick up and run their assigned tasks after spin-waiting for them
- the main thread waits for all I/O threads to finish (by checking that the sum of the per-thread pending counts is 0)
- it then reads each client from server.clients_pending_read again and removes it from that list
- it executes the commands bound to each client in order, and clients with replies are put onto server.clients_pending_write (this part is fully sequential)
Redis source code analysis series