Redis底层数据结构-Stream源码分析

简介

Redis在5.0.0版本中引进了消息队列的功能,该功能由Stream实现,本文主要介绍Stream的相关实现。

数据结构

stream

Stream的实现依赖于Rax树与listpack结构,每个消息流都包含一个Rax树,以消息ID为key,listpack为value存储在Rax树中。其基本结构如下:

  • rax:rax存储生产者生产的具体消息,每个消息有唯一ID为键
  • length:代表当前stream中消息个数。
  • last_id:为当前stream中最后插入的消息ID,stream为空时,该值为0。
  • cgroups:存储了当前stream相关的消费组,以消费组组名为键,streamCG为值存储在rax中。
    typedef struct stream {
        rax *rax;               /* The radix tree holding the stream. */
        uint64_t length;        /* Number of elements inside this stream. */
        streamID last_id;       /* Zero if there are yet no items. */
        rax *cgroups;           /* Consumer groups dictionary: name -> streamCG */
    } stream;

一个Stream的基本结构如图所示: Stream结构

每一个listpack都有一个master entry,该结构存储了该listpack待插入消息的所有field。

streamID

消息ID的基本结构如下: - ms:消息创建时的时间 - seq:序号

typedef struct streamID {
    uint64_t ms;        /* Unix time in milliseconds. */
    uint64_t seq;       /* Sequence number. */
} streamID;

streamCG

每个stream会有多个消费组,每个消费组通过组名称进行唯一标识,同时关联一个streamCG结构。该结构如下: - last_id:该消费组已经确认的最后一个消息ID - pel:为该消费组尚未确认的消息,消息ID为键,streamNACK为值,存储在rax树中 - consumers:消费组中的所有消费者,消费者名称为键,streamConsumer为值。

typedef struct streamCG {
    streamID last_id;
    rax *pel;
    rax *consumers;
} streamCG;

streamConsumer

消费者结构如下: - seen_time:为该消费者最后一次活跃的时间 - name:消费者名称,为sds结构 - pel:为该消费者尚未确认的消息,消息ID为键,streamNACK为值,存储在rax树中

typedef struct streamConsumer {
    mstime_t seen_time;
    sds name;
    rax *pel;
} streamConsumer;

streamNACK

该结构为未确认消息,streamNACK维护了消费组或消费者尚未确认的消息,消费组中的pel的元素与消费者的pel元素是共享的。该结构如下: - delivery_time:为该消息最后发送给消费方的时间 - delivery_count:该消息已发送的次数 - consumer:当前归属的消费者

typedef struct streamNACK {
    mstime_t delivery_time;
    uint64_t delivery_count;
    streamConsumer *consumer;
} streamNACK;

streamIterator

该结构主要提供遍历功能,基本结构如下: - stream:当前迭代器正在遍历的消息流 - master_id:为listpack中第一个插入的消息ID(master entry) - master_fields_count:第一个entry的field个数 - master_fields_start:master entry的field首地址 - master_fields_ptr:记录field的位置 - entry_flags:当前遍历的消息的标志位 - rev:迭代器方向 - start_key,end_key:遍历范围 - ri:rax迭代器,用于遍历rax树中的所有key - lp:当前的listpack指针 - lp_ele:当前正在遍历的listpack中的元素 - lp_flags:指向翻墙消息的flag域 - field_buf,value_buf:从listpack读取数据的缓存

typedef struct streamIterator {
    stream *stream;         /* The stream we are iterating. */
    streamID master_id;     /* ID of the master entry at listpack head. */
    uint64_t master_fields_count;       /* Master entries # of fields. */
    unsigned char *master_fields_start; /* Master entries start in listpack. */
    unsigned char *master_fields_ptr;   /* Master field to emit next. */
    int entry_flags;                    /* Flags of entry we are emitting. */
    int rev;                /* True if iterating end to start (reverse). */
    uint64_t start_key[2];  /* Start key as 128 bit big endian. */
    uint64_t end_key[2];    /* End key as 128 bit big endian. */
    raxIterator ri;         /* Rax iterator. */
    unsigned char *lp;      /* Current listpack. */
    unsigned char *lp_ele;  /* Current listpack cursor. */
    unsigned char *lp_flags; /* Current entry flags pointer. */
    /* Buffers used to hold the string of lpGet() when the element is
     * integer encoded, so that there is no string representation of the
     * element inside the listpack itself. */
    unsigned char field_buf[LP_INTBUF_SIZE];
    unsigned char value_buf[LP_INTBUF_SIZE];
} streamIterator;

listpack

listpack是一个字符串列表的序列化格式,该结构可用于存储字符串或整型。其主要结构如下图所示: listpack基本结构 listpack主要由四部分构成,分别是: - Total Bytes为整个listpack的空间大小 - Num Elem:listpack的Entry个数,占用两个字节,但是Entry个数大于等于65535时,该值为65535,所以这种情况下获取元素个数,需要遍历整个listpack - Entry:为每个具体的元素 - End:为listpack的结束标志,占用一个字节,内容为0xFF

Entry由三部分构成,基本如下: - Encode:元素的编码方式,占用一个字节 - content:内容 - backlen:记录entry的长度(不包括该字段本身)

其中编码方式取值如下图所示: 编码取值

Stream的消息内容存储在listpack中,消息存储格式是每个字段都是一个entry,而不是键整个消息作为字符串存储的,每个listpack会存储多个消息,具体存储个数由stream-node-max-bytes(listpack节点最大占用字节数,默认4096)和stream-node-max-entries(最大存储元素个数,默认100)决定.

每个listpack在创建时,会将第一个插入的entry构建成master entry,其基本结构如下所示:

count   |   deleted  |  num-fields  |   field-1 |   field-2 |   0
其中: - count:为当前listpack中所有未删除的消息个数 - deleted:当前listpack中所有已经删除的消息个数 - num-fields:field个数 - field-N:field域 - 0为标识位,再从后向前遍历listpack时使用

存储一个消息时,如果该消息的field域与master entry完全相同,则不需要再次存储field域

Rax

Redis对于Rax的解释为A radix tree implement,基数树的一种实现。Rax中不仅可以存储字符串,也可以为该字符串设置值形成kv结构。其基本结构如下: - head:指向头节点 - numele:元素个数(key) - numnodes:节点个数

typedef struct rax {
    raxNode *head;
    uint64_t numele;
    uint64_t numnodes;
} rax;

raxNode

raxNode代表Rax树中的一个节点,其基本结构如下所示: - iskey:表明当前节点是否包含一个key,占用1bit - isnull:表明当前key对应的value是否为空,占用1bit - iscompr:表明当前节点是否为压缩节点,占用1bit - size:压缩节点压缩的字符串长度或者非压缩节点的子节点个数,占用29bit - data:包含填充字段,同时存储了当前节点包含的字符串以及子节点的指针,key对应的value指针。

typedef struct raxNode {
    uint32_t iskey:1;
    uint32_t isnull:1;
    uint32_t iscompr:1;
    uint32_t size:29;
    unsigned char data[];
} raxNode;

其中raxNode分为压缩节点域非压缩节点。主要区别在于非压缩节点的每个字符都有子节点,如果字符个数小于2,都是非压缩节点。

raxStack

raxStack结构用于存储从根节点到当前节点的路径,基本结构如下: - stack:用于记录路径,该指针可能指向static_items或者堆内存 - items,maxitems:代表stack指向的空间的已用空间以及最大空间 - static_items:一个数组,每个元素都是指针,存储路径 - oom:代表当前栈是否出现过内存溢出

typedef struct raxStack {
    void **stack;
    size_t items, maxitems;
    void *static_items[RAX_STACK_STATIC_ITEMS];
    int oom;
} raxStack;

raxIterator

raxIterator用于遍历Rax树中的所有key,基本结构如下: - flags:代表当前迭代器标志位,取值如下: - RAX_ITER_JUST_SEEKED:当前迭代器指向的元素是刚刚搜索过的,当需要从迭代器中获取元素时,直接返回当前元素并清空标志位。 - RAX_ITER_EOF:代表当前迭代器已经遍历到最后一个节点 - RAX_ITER_SAFE:代表当前迭代器为安全迭代器,可以进行写操作 - rt:当前迭代器对应的rax - key:存储了当前迭代器遍历到的key,该指针指向key_static_string或者堆内存 - data:value值 - key_len,key_max:key指向的空间的已用空间以及最大空间 - key_static_string:key的默认存储空间,key过大时,会使用堆内存 - node:当前key所在的raxNode - stack:记录了从根节点到当前节点的路径,用于raxNode线上遍历 - node_cb:为节点的回调函数,默认为空

typedef struct raxIterator {
    int flags;
    rax *rt;                /* Radix tree we are iterating. */
    unsigned char *key;     /* The current string. */
    void *data;             /* Data associated to this key. */
    size_t key_len;         /* Current key length. */
    size_t key_max;         /* Max key len the current key buffer can hold. */
    unsigned char key_static_string[RAX_ITER_STATIC_LEN];
    raxNode *node;          /* Current node. Only for unsafe iteration. */
    raxStack stack;         /* Stack used for unsafe iteration. */
    raxNodeCallback node_cb; /* Optional node callback. Normally set to NULL. */
} raxIterator;

总结

上文主要对Redis Stream的基本结构与其底层数据结构做了简要分析,了解了消息由listpack结构存储,以消息ID为key,listpack为value存储在Rax树中。


Redis底层数据结构-Stream源码分析
https://l1n.wang/2020/中间件/Redis/redis-stream/
作者
Lin Wang
发布于
2020年6月27日
许可协议