Linux ·

Redis源码分析之发布订阅(pub/sub)

Redis算是缓存界的老大哥了,最近做的事情对Redis依赖较多,使用了里面的发布订阅功能,事务功能以及SortedSet等数据结构,后面准备好好学习总结一下Redis的一些知识点。

先看下redis发布订阅的结构:

Redis源码分析之发布订阅(pub/sub) Linux 第1张

redis发布订阅结构

其中发布者跟订阅者之间通过channel进行交互,channel分为两种模式。

一、redis发布订阅命令简介

redis中为发布订阅(pub/sub)功能提供了六个命令,分为两种模式。

  1. 由subscribe,unsubscribe组成,它们是负责订阅有确定名称的channel,例如subscribe test表示订阅名字为test的channel。
  2. 由psubscribe,punsubscribe组成,是负责订阅模糊名字的channel,例如psubscribe test* 表示订阅所有以test开头的channel。

最后再加上发布命令publish以及查看订阅相关信息的pubsub命令组成。

二、redis发布订阅源码分析

redis所有的命令及其处理函数都放在了server.c文件的开头,从其中找出发布订阅功能相关的命令信息。

    {"subscribe",subscribeCommand,-2,"pslt",0,NULL,0,0,0,0,0},
    {"unsubscribe",unsubscribeCommand,-1,"pslt",0,NULL,0,0,0,0,0},
    {"psubscribe",psubscribeCommand,-2,"pslt",0,NULL,0,0,0,0,0},
    {"punsubscribe",punsubscribeCommand,-1,"pslt",0,NULL,0,0,0,0,0},
    {"publish",publishCommand,3,"pltF",0,NULL,0,0,0,0,0},
    {"pubsub",pubsubCommand,-2,"pltR",0,NULL,0,0,0,0,0},

这里可以看出创建一条命令需要很多参数,我们这里只需要关注前两个参数,第一个参数表示命令的内容,第二个表示该命令对应的处理函数。

普通模式订阅subscribe函数:
该命令支持多个参数,即subscribe channel1,channel2...

void subscribeCommand(client *c) {
    int j;
    //这里挨个处理subscribe的参数,因为命令本身被作为参数0所以从1开始处理后面的参数
    for (j = 1; j < c->argc; j++)
        //订阅每个频道
        pubsubSubscribeChannel(c,c->argv[j]);
    //这里设置客户端的状态,下面会解释这个状态的作用
    c->flags |= CLIENT_PUBSUB;
}

在server.c文件中,processCommand函数是在调用具体命令函数之前的判断逻辑,其中有一段:

/* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
    if (c->flags & CLIENT_PUBSUB &&
        c->cmd->proc != pingCommand &&
        c->cmd->proc != subscribeCommand &&
        c->cmd->proc != unsubscribeCommand &&
        c->cmd->proc != psubscribeCommand &&
        c->cmd->proc != punsubscribeCommand) {
        addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
        return C_OK;
    }

这里注释也写的很清楚,就是当client处于pub/sub上下文时,只接收订阅相关命令以及一个ping命令,这就解释了上面subscribeCommand函数中为什么要设置客户端flag字段。

接下来看下订阅的具体逻辑:

int pubsubSubscribeChannel(client *c, robj *channel) {
    dictEntry *de;
    list *clients = NULL;
    int retval = 0;

    //把指定channel加入到client的pubsub_channels哈希表中
    //不成功说明已经订阅了该频道
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
        retval = 1;
        //这里是把该channel加入到client的哈希表中,引用加1
        incrRefCount(channel);
        //在server的发布订阅哈希表中查找指定channel
        de = dictFind(server.pubsub_channels,channel);
        //如果该channel还不存在,则创建
        if (de == NULL) {
            //创建一个空list
            clients = listCreate();
            //把channel加入到server的哈希表中,value就是该channel的所有订阅者
            dictAdd(server.pubsub_channels,channel,clients);
            //该channel引用加1
            incrRefCount(channel);
        } else {
            clients = dictGetVal(de);
        }
        //把client加入到该channel的订阅列表中
        listAddNodeTail(clients,c);
    }
    //一系列通知客户端的操作
    addReply(c,shared.mbulkhdr[3]);
    addReply(c,shared.subscribebulk);
    addReplyBulk(c,channel);
    addReplyLongLong(c,clientSubscriptionsCount(c));
    return retval;
}

总结一下,订阅其实就是把指定channel分别加入到client跟server的pub/sub哈希表中,然后在server端保存订阅了该channle的所有client列表,如下图:

Redis源码分析之发布订阅(pub/sub) Linux 第2张

普通模式发布订阅数据结构

下面看一下publish发布命令:
例如:publish channelName msg

void publishCommand(client *c) {
    //发布逻辑
    int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
    //这里是关于集群或者AOF的操作
    if (server.cluster_enabled)
        clusterPropagatePublish(c->argv[1],c->argv[2]);
    else
        forceCommandPropagation(c,PROPAGATE_REPL);
    //返回给client通知了的订阅者数
    addReplyLongLong(c,receivers);
}

重点看下发布函数的源码:

int pubsubPublishMessage(robj *channel, robj *message) {
    int receivers = 0;
    dictEntry *de;
    listNode *ln;
    listIter li;

    //根据上面的订阅源码,这里就是取出订阅该channel的所有clients
    de = dictFind(server.pubsub_channels,channel);
    if (de) {
        //获取client的链表
        list *list = dictGetVal(de);
        listNode *ln;
        listIter li;
        //由client链表创建它的迭代器,c++代码真是无力吐槽
        listRewind(list,&li);
        //遍历所有client并发送消息
        while ((ln = listNext(&li)) != NULL) {
            client *c = ln->value;

            addReply(c,shared.mbulkhdr[3]);
            addReply(c,shared.messagebulk);
            addReplyBulk(c,channel);
            addReplyBulk(c,message);
            receivers++;
        }
    }
    
    //开始模糊匹配的逻辑处理,模糊模式使用的是链表而不是哈希表,后面会讲
    if (listLength(server.pubsub_patterns)) {
        //创建模糊规则的迭代器li
        listRewind(server.pubsub_patterns,&li);
        channel = getDecodedObject(channel);
        //遍历所有的模糊模式,如果匹配成功则发送消息
        while ((ln = listNext(&li)) != NULL) {
            pubsubPattern *pat = ln->value;
            //判断当前channel是否可以匹配模糊规则
            if (stringmatchlen((char*)pat->pattern->ptr,
                                sdslen(pat->pattern->ptr),
                                (char*)channel->ptr,
                                sdslen(channel->ptr),0)) {
                addReply(pat->client,shared.mbulkhdr[4]);
                addReply(pat->client,shared.pmessagebulk);
                addReplyBulk(pat->client,pat->pattern);
                addReplyBulk(pat->client,channel);
                addReplyBulk(pat->client,message);
                receivers++;
            }
        }
        decrRefCount(channel);
    }
    return receivers;
}

从上面的publish处理函数可以看出每次进行消息发布的时候,都会向普通模式跟模糊模式发布消息,同时也能看出普通模式跟模糊模式使用的是两种不同的数据结构,下面看下模糊订阅模式。

模糊模式订阅psubscribe函数:

//psubscribe命令对应的处理函数
void psubscribeCommand(client *c) {
    int j;
    //挨个订阅client指定的pattern
    for (j = 1; j < c->argc; j++)
        pubsubSubscribePattern(c,c->argv[j]);
    //修改client状态
    c->flags |= CLIENT_PUBSUB;
}

int pubsubSubscribePattern(client *c, robj *pattern) {
    int retval = 0;
    //判断client是否已经订阅该pattern,这里与普通模式不同,是个链表
    if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
        retval = 1;
        pubsubPattern *pat;
        //把指定pattern加入到client的pattern链表中
        listAddNodeTail(c->pubsub_patterns,pattern);
        //引用计数+1
        incrRefCount(pattern);
        //这里是创建一个pattern对象,并指向该client,加入到server的pattern链表中
        //从这里可以看出,多个client订阅同一个pattern会创建多个patter对象,与普通模式不同
        pat = zmalloc(sizeof(*pat));
        pat->pattern = getDecodedObject(pattern);
        pat->client = c;
        listAddNodeTail(server.pubsub_patterns,pat);
    }
    //通知客户端
    addReply(c,shared.mbulkhdr[3]);
    addReply(c,shared.psubscribebulk);
    addReplyBulk(c,pattern);
    addReplyLongLong(c,clientSubscriptionsCount(c));
    return retval;
}

通过分析上面的源码可以总结一下模糊订阅中的数据结构,如下图:

Redis源码分析之发布订阅(pub/sub) Linux 第3张

模糊发布订阅模式数据结构

注:正如上面提到的,模糊模式中,一个pat对象中包含一个pattern规则跟一个client指针,也就是说当多个client模糊订阅同一个pattern时同样会为每个client都创建一个节点。

普通模式取消订阅unsubscribe函数:
取消就相对简单了,说白了就是把上面锁保存在server跟client端的数据删除。

取消订阅入口
void unsubscribeCommand(client *c) {
    //如果该命令没有参数,则把channel全部取消
    if (c->argc == 1) {
        pubsubUnsubscribeAllChannels(c,1);
    } else {
        int j;
        //迭代取消置顶channel
        for (j = 1; j < c->argc; j++)
            pubsubUnsubscribeChannel(c,c->argv[j],1);
    }
    //如果channel被全部取消,则修改client状态,这样client就可以发送其他命令了
    if (clientSubscriptionsCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
}

//一次性取消订阅所有channel
int pubsubUnsubscribeAllChannels(client *c, int notify) {
     //取出client端所有的channel
    dictIterator *di = dictGetSafeIterator(c->pubsub_channels);
    dictEntry *de;
    int count = 0;

    while((de = dictNext(di)) != NULL) {
        robj *channel = dictGetKey(de);
        //最终也是挨个取消channel
        count += pubsubUnsubscribeChannel(c,channel,notify);
    }
    
    //如果client上面都没有订阅,依然返回响应
    if (notify && count == 0) {
        addReply(c,shared.mbulkhdr[3]);
        addReply(c,shared.unsubscribebulk);
        addReply(c,shared.nullbulk);
        addReplyLongLong(c,dictSize(c->pubsub_channels)+
                       listLength(c->pubsub_patterns));
    }
    //释放空间
    dictReleaseIterator(di);
    return count;
}

//取消订阅指定channel
int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {
    dictEntry *de;
    list *clients;
    listNode *ln;
    int retval = 0;
    //从client中删除指定channel
    if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
        retval = 1;
        //删除服务端该channel中的指定client
        de = dictFind(server.pubsub_channels,channel);
        serverAssertWithInfo(c,NULL,de != NULL);
        clients = dictGetVal(de);
        ln = listSearchKey(clients,c);
        serverAssertWithInfo(c,NULL,ln != NULL);
        listDelNode(clients,ln);
        if (listLength(clients) == 0) {
            //如果删除完以后channel没有了订阅者,则把channel也删除
            dictDelete(server.pubsub_channels,channel);
        }
    }
    //返回client响应
    if (notify) {
        addReply(c,shared.mbulkhdr[3]);
        addReply(c,shared.unsubscribebulk);
        addReplyBulk(c,channel);
        addReplyLongLong(c,dictSize(c->pubsub_channels)+
                       listLength(c->pubsub_patterns));

    }
    //引用计数-1
    decrRefCount(channel); 
    return retval;
}

由于模糊模式的取消订阅与普通模式类似,这里就不再贴代码了。

三、redis发布订阅总结

整个发布订阅的代码比较简单清晰,一个值得思考的问题时普通模式跟模糊模式中分别使用了哈希表跟链表两种结构进行处理,而不是统一的,原因在于模糊模式不能精确匹配,需要遍历挨个判断,而哈希表的优势在于快速定位查找,在需要遍历跟模糊匹配的场景中并不适用。

下面关于

参与评论