redis源码阅读-发布与订阅pub/sub

2022/03/30 1344点热度 0人点赞 0条评论

redis的发布订阅(pub/sub)是一种消息通信模式,由发布者(pub)发布消息,订阅者订阅(sub)消息。redis通过publish和subscribe等命令实现了发布与订阅模式。

我们先通过一张图了解下工作机制:

我们看下案列

# 客户端1
127.0.0.1:6379> subscribe notice
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "notice"
3) (integer) 1
# publish
127.0.0.1:6379> publish notice 'update version=1.1'
(integer) 1

# 客户端1
1) "message"
2) "notice"
3) "update version=1.1"

再看下消息通道的数据结构

struct redisServer {  
    /**订阅客户端
     * key 为正在订阅的频道
     * val 为订阅这个频道的客户端
     * */
    dict *pubsub_channels;  /* Map channels to list of subscribed clients */
    //模式匹配
    list *pubsub_patterns;  /* A list of pubsub_patterns */
}

typedef struct client {
    dict *pubsub_channels;  /* channels a client is interested in (SUBSCRIBE) */
    list *pubsub_patterns;  /* patterns a client is interested in (SUBSCRIBE) */
} client;

typedef struct pubsubPattern {
    client *client;
    robj *pattern;
} pubsubPattern;

看下消费的源码

void subscribeCommand(client *c) {
    int j;
    //订阅多个通道的时候
    for (j = 1; j < c->argc; j++)
        pubsubSubscribeChannel(c,c->argv[j]);
    c->flags |= CLIENT_PUBSUB;
}
int pubsubSubscribeChannel(client *c, robj *channel) {
    dictEntry *de;
    list *clients = NULL;
    int retval = 0;

    /* Add the channel to the client -> channels hash table */
    //将客户端订阅的频道都放入c->pubsub_channels的hash表中
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
        retval = 1;
        incrRefCount(channel);
        /* Add the client to the channel -> list of clients hash table */
        /**
         * 将客户端添加到hash表中,以channel为key
         */
        de = dictFind(server.pubsub_channels,channel);
        //不存在客户端,创建列表
        if (de == NULL) {
            clients = listCreate();
            dictAdd(server.pubsub_channels,channel,clients);
            incrRefCount(channel);
        } else {
            clients = dictGetVal(de);
        }
        //追加
        listAddNodeTail(clients,c);
    }
    /* Notify the client */
    addReply(c,shared.mbulkhdr[3]);
    addReply(c,shared.subscribebulk);
    addReplyBulk(c,channel);
    addReplyLongLong(c,clientSubscriptionsCount(c));
    return retval;
}

再看下发送的源码

void publishCommand(client *c) {
    int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
}   
/**
 * 发布广播消息
 * @param channel
 * @param message
 * @return
 */
int pubsubPublishMessage(robj *channel, robj *message) {
    int receivers = 0;
    dictEntry *de;
    listNode *ln;
    listIter li;

    /* Send to clients listening for that channel */
    /**
     * 从server.pubsub_channels中获取订阅channel的客户端链表
     */
    de = dictFind(server.pubsub_channels,channel);
    if (de) {
        list *list = dictGetVal(de);
        listNode *ln;
        listIter li;

        listRewind(list,&li);
        //遍历所有的客户端,一个个的通知
        while ((ln = listNext(&li)) != NULL) {

            client *c = ln->value;
            //将消息以固定格式回复,message,通道,消息内容
            addReply(c,shared.mbulkhdr[3]);
            addReply(c,shared.messagebulk);
            addReplyBulk(c,channel);
            addReplyBulk(c,message);
            receivers++;
        }
    }
    /* Send to clients listening to matching channels */
    /**
     * 从server.pubsub_patterns 获取所有的客户端,
     */
    if (listLength(server.pubsub_patterns)) {
        listRewind(server.pubsub_patterns,&li);
        channel = getDecodedObject(channel);
        while ((ln = listNext(&li)) != NULL) {
            pubsubPattern *pat = ln->value;
            //按模式匹配所有的客户端,只要pat->pattern->ptr能匹配到对应的channel->ptr,就发
            if (stringmatchlen((char*)pat->pattern->ptr,
                                sdslen(pat->pattern->ptr),
                                (char*)channel->ptr,
                                sdslen(channel->ptr),0)) {
                addReply(pat->client,shared.mbulkhdr[4]);
                addReply(pat->client,shared.pmessagebulk);
                addReplyBulk(pat->client,pat->pattern);
                addReplyBulk(pat->client,channel);
                addReplyBulk(pat->client,message);
                receivers++;
            }
        }
        decrRefCount(channel);
    }
    return receivers;
}
  • 可以看到发布订阅通过redis服务中的server.pubsub_channels来服务的
  • 当客户端发起subscribe的命令的时候,会把客户端订阅的channels放入到server.pubsub_channels中,对应的channel通过hash路由到对应的桶,然后找到对应的channel,channel对应的value是一个链表,维护所有订阅的客户端
  • 当publish的时候,根据channel找到对应的value,一个个的遍历所有的客户端,并将publish的内容通知给订阅的客户端,除了具体的channel,还有个模式匹配,原理差不多
  • 从原来来看,发布时,订阅的服务必须在线,要不然消息就没了,redis不会存储这些消息
  • 这个场景比较适合版本的更新,比如规则包发布,所有的在线服务都需要更新(得注意点,没法做到强一致,一致性得自己根据业务需求考虑)

redis系列文章

redis源码阅读-入门篇

redis源码阅读二-终于把redis的启动流程搞明白了

redis源码阅读三-终于把主线任务执行搞明白了

redis源码阅读四-我把redis6里的io多线程执行流程梳理明白了

redis源码阅读五-为什么大量过期key会阻塞redis?

redis源码六-redis中的缓存淘汰策略处理分析

redis源码阅读-之哨兵流程

redis源码阅读-持久化之RDB

redis源码阅读-持久化之aof

redis源码阅读-rehash详解

阅读redis源码的时候一些c知识

阅读redis持久化RDB源码的时候一些c知识

linux中的文件描述符与套接字socket

redis中的IO多路复用select和epoll

Reactor模式详解及redis如何使用

redis的key过期了还能取出来?

本文是Redis源码剖析系列博文,有想深入学习Redis的同学,欢迎star和关注;
Redis中文注解版:https://github.com/yxkong/redis/tree/5.0
如果觉得本文对你有用,欢迎一键三连;
同时可以关注微信公众号5ycode获取第一时间的更新哦;

yxkong

这个人很懒,什么都没留下

文章评论