简介
Redis在5.0.0版本中引进了消息队列的功能,该功能由Stream实现,本文主要介绍Stream的相关实现。
数据结构
stream
Stream的实现依赖于Rax树与listpack
结构,每个消息流都包含一个Rax树,以消息ID为key,listpack为value存储在Rax树中。其基本结构如下:
- rax:rax存储生产者生产的具体消息,每个消息有唯一ID为键
- length:代表当前stream中消息个数。
- last_id:为当前stream中最后插入的消息ID,stream为空时,该值为0。
- cgroups:存储了当前stream相关的消费组,以消费组组名为键,streamCG为值存储在rax中。
typedef struct stream {
rax *rax; /* The radix tree holding the stream. */
uint64_t length; /* Number of elements inside this stream. */
streamID last_id; /* Zero if there are yet no items. */
rax *cgroups; /* Consumer groups dictionary: name -> streamCG */
} stream;
一个Stream的基本结构如图所示:
每一个listpack都有一个master entry,该结构存储了该listpack待插入消息的所有field。
streamID
消息ID的基本结构如下:
- ms:消息创建时的时间
- seq:序号
typedef struct streamID {
uint64_t ms; /* Unix time in milliseconds. */
uint64_t seq; /* Sequence number. */
} streamID;
streamCG
每个stream会有多个消费组,每个消费组通过组名称进行唯一标识,同时关联一个streamCG结构。该结构如下:
- last_id:该消费组已经确认的最后一个消息ID
- pel:为该消费组尚未确认的消息,消息ID为键,streamNACK为值,存储在rax树中
- consumers:消费组中的所有消费者,消费者名称为键,streamConsumer为值。
typedef struct streamCG {
streamID last_id;
rax *pel;
rax *consumers;
} streamCG;
streamConsumer
消费者结构如下:
- seen_time:为该消费者最后一次活跃的时间
- name:消费者名称,为sds结构
- pel:为该消费者尚未确认的消息,消息ID为键,streamNACK为值,存储在rax树中
typedef struct streamConsumer {
mstime_t seen_time;
sds name;
rax *pel;
} streamConsumer;
streamNACK
该结构为未确认消息,streamNACK维护了消费组或消费者尚未确认的消息,消费组中的pel的元素与消费者的pel元素是共享的。该结构如下:
- delivery_time:为该消息最后发送给消费方的时间
- delivery_count:该消息已发送的次数
- consumer:当前归属的消费者
typedef struct streamNACK {
mstime_t delivery_time;
uint64_t delivery_count;
streamConsumer *consumer;
} streamNACK;
streamIterator
该结构主要提供遍历功能,基本结构如下:
- stream:当前迭代器正在遍历的消息流
- master_id:为listpack中第一个插入的消息ID(master entry)
- master_fields_count:第一个entry的field个数
- master_fields_start:master entry的field首地址
- master_fields_ptr:记录field的位置
- entry_flags:当前遍历的消息的标志位
- rev:迭代器方向
- start_key,end_key:遍历范围
- ri:rax迭代器,用于遍历rax树中的所有key
- lp:当前的listpack指针
- lp_ele:当前正在遍历的listpack中的元素
- lp_flags:指向翻墙消息的flag域
- field_buf,value_buf:从listpack读取数据的缓存
typedef struct streamIterator {
stream *stream; /* The stream we are iterating. */
streamID master_id; /* ID of the master entry at listpack head. */
uint64_t master_fields_count; /* Master entries # of fields. */
unsigned char *master_fields_start; /* Master entries start in listpack. */
unsigned char *master_fields_ptr; /* Master field to emit next. */
int entry_flags; /* Flags of entry we are emitting. */
int rev; /* True if iterating end to start (reverse). */
uint64_t start_key[2]; /* Start key as 128 bit big endian. */
uint64_t end_key[2]; /* End key as 128 bit big endian. */
raxIterator ri; /* Rax iterator. */
unsigned char *lp; /* Current listpack. */
unsigned char *lp_ele; /* Current listpack cursor. */
unsigned char *lp_flags; /* Current entry flags pointer. */
/* Buffers used to hold the string of lpGet() when the element is
* integer encoded, so that there is no string representation of the
* element inside the listpack itself. */
unsigned char field_buf[LP_INTBUF_SIZE];
unsigned char value_buf[LP_INTBUF_SIZE];
} streamIterator;
listpack
listpack是一个字符串列表的序列化格式,该结构可用于存储字符串或整型。其主要结构如下图所示: listpack主要由四部分构成,分别是:
- Total Bytes为整个listpack的空间大小
- Num Elem:listpack的Entry个数,占用两个字节,但是Entry个数大于等于65535时,该值为65535,所以这种情况下获取元素个数,需要遍历整个listpack
- Entry:为每个具体的元素
- End:为listpack的结束标志,占用一个字节,内容为0xFF
Entry由三部分构成,基本如下:
- Encode:元素的编码方式,占用一个字节
- content:内容
- backlen:记录entry的长度(不包括该字段本身)
其中编码方式取值如下图所示:
Stream的消息内容存储在listpack中,消息存储格式是每个字段都是一个entry,而不是键整个消息作为字符串存储的,每个listpack会存储多个消息,具体存储个数由stream-node-max-bytes
(listpack节点最大占用字节数,默认4096)和stream-node-max-entries
(最大存储元素个数,默认100)决定.
每个listpack在创建时,会将第一个插入的entry构建成master entry,其基本结构如下所示:
count | deleted | num-fields | field-1 | field-2 | 0
其中:
- count:为当前listpack中所有未删除的消息个数
- deleted:当前listpack中所有已经删除的消息个数
- num-fields:field个数
- field-N:field域
- 0为标识位,再从后向前遍历listpack时使用
存储一个消息时,如果该消息的field域与master entry完全相同,则不需要再次存储field域
Rax
Redis对于Rax的解释为A radix tree implement
,基数树的一种实现。Rax中不仅可以存储字符串,也可以为该字符串设置值形成kv结构。其基本结构如下:
- head:指向头节点
- numele:元素个数(key)
- numnodes:节点个数
typedef struct rax {
raxNode *head;
uint64_t numele;
uint64_t numnodes;
} rax;
raxNode
raxNode代表Rax树中的一个节点,其基本结构如下所示:
- iskey:表明当前节点是否包含一个key,占用1bit
- isnull:表明当前key对应的value是否为空,占用1bit
- iscompr:表明当前节点是否为压缩节点,占用1bit
- size:压缩节点压缩的字符串长度或者非压缩节点的子节点个数,占用29bit
- data:包含填充字段,同时存储了当前节点包含的字符串以及子节点的指针,key对应的value指针。
typedef struct raxNode {
uint32_t iskey:1;
uint32_t isnull:1;
uint32_t iscompr:1;
uint32_t size:29;
unsigned char data[];
} raxNode;
其中raxNode分为压缩节点域非压缩节点。主要区别在于非压缩节点的每个字符都有子节点,如果字符个数小于2,都是非压缩节点。
raxStack
raxStack结构用于存储从根节点到当前节点的路径,基本结构如下:
- stack:用于记录路径,该指针可能指向static_items或者堆内存
- items,maxitems:代表stack指向的空间的已用空间以及最大空间
- static_items:一个数组,每个元素都是指针,存储路径
- oom:代表当前栈是否出现过内存溢出
typedef struct raxStack {
void **stack;
size_t items, maxitems;
void *static_items[RAX_STACK_STATIC_ITEMS];
int oom;
} raxStack;
raxIterator
raxIterator用于遍历Rax树中的所有key,基本结构如下:
- flags:代表当前迭代器标志位,取值如下:
- RAX_ITER_JUST_SEEKED:当前迭代器指向的元素是刚刚搜索过的,当需要从迭代器中获取元素时,直接返回当前元素并清空标志位。
- RAX_ITER_EOF:代表当前迭代器已经遍历到最后一个节点
- RAX_ITER_SAFE:代表当前迭代器为安全迭代器,可以进行写操作
- rt:当前迭代器对应的rax
- key:存储了当前迭代器遍历到的key,该指针指向key_static_string或者堆内存
- data:value值
- key_len,key_max:key指向的空间的已用空间以及最大空间
- key_static_string:key的默认存储空间,key过大时,会使用堆内存
- node:当前key所在的raxNode
- stack:记录了从根节点到当前节点的路径,用于raxNode线上遍历
- node_cb:为节点的回调函数,默认为空
typedef struct raxIterator {
int flags;
rax *rt; /* Radix tree we are iterating. */
unsigned char *key; /* The current string. */
void *data; /* Data associated to this key. */
size_t key_len; /* Current key length. */
size_t key_max; /* Max key len the current key buffer can hold. */
unsigned char key_static_string[RAX_ITER_STATIC_LEN];
raxNode *node; /* Current node. Only for unsafe iteration. */
raxStack stack; /* Stack used for unsafe iteration. */
raxNodeCallback node_cb; /* Optional node callback. Normally set to NULL. */
} raxIterator;
总结
上文主要对Redis Stream的基本结构与其底层数据结构做了简要分析,了解了消息由listpack结构存储,以消息ID为key,listpack为value存储在Rax树中。