Redis底层数据结构-Stream源码分析
简介
Redis在5.0.0版本中引进了消息队列的功能,该功能由Stream实现,本文主要介绍Stream的相关实现。
数据结构
stream
Stream的实现依赖于Rax树与listpack
结构,每个消息流都包含一个Rax树,以消息ID为key,listpack为value存储在Rax树中。其基本结构如下:
- rax:rax存储生产者生产的具体消息,每个消息有唯一ID为键
- length:代表当前stream中消息个数。
- last_id:为当前stream中最后插入的消息ID,stream为空时,该值为0。
- cgroups:存储了当前stream相关的消费组,以消费组组名为键,streamCG为值存储在rax中。
typedef struct stream { rax *rax; /* The radix tree holding the stream. */ uint64_t length; /* Number of elements inside this stream. */ streamID last_id; /* Zero if there are yet no items. */ rax *cgroups; /* Consumer groups dictionary: name -> streamCG */ } stream;
一个Stream的基本结构如图所示:
每一个listpack都有一个master entry,该结构存储了该listpack待插入消息的所有field。
streamID
消息ID的基本结构如下:
- ms:消息创建时的时间
- seq:序号
typedef struct streamID { uint64_t ms; /* Unix time in milliseconds. */ uint64_t seq; /* Sequence number. */ } streamID;
streamCG
每个stream会有多个消费组,每个消费组通过组名称进行唯一标识,同时关联一个streamCG结构。该结构如下:
- last_id:该消费组已经确认的最后一个消息ID
- pel:为该消费组尚未确认的消息,消息ID为键,streamNACK为值,存储在rax树中
- consumers:消费组中的所有消费者,消费者名称为键,streamConsumer为值。
typedef struct streamCG { streamID last_id; rax *pel; rax *consumers; } streamCG;
streamConsumer
消费者结构如下:
- seen_time:为该消费者最后一次活跃的时间
- name:消费者名称,为sds结构
- pel:为该消费者尚未确认的消息,消息ID为键,streamNACK为值,存储在rax树中
typedef struct streamConsumer { mstime_t seen_time; sds name; rax *pel; } streamConsumer;
streamNACK
该结构为未确认消息,streamNACK维护了消费组或消费者尚未确认的消息,消费组中的pel的元素与消费者的pel元素是共享的。该结构如下:
- delivery_time:为该消息最后发送给消费方的时间
- delivery_count:该消息已发送的次数
- consumer:当前归属的消费者
typedef struct streamNACK { mstime_t delivery_time; uint64_t delivery_count; streamConsumer *consumer; } streamNACK;
streamIterator
该结构主要提供遍历功能,基本结构如下:
- stream:当前迭代器正在遍历的消息流
- master_id:为listpack中第一个插入的消息ID(master entry)
- master_fields_count:第一个entry的field个数
- master_fields_start:master entry的field首地址
- master_fields_ptr:记录field的位置
- entry_flags:当前遍历的消息的标志位
- rev:迭代器方向
- start_key,end_key:遍历范围
- ri:rax迭代器,用于遍历rax树中的所有key
- lp:当前的listpack指针
- lp_ele:当前正在遍历的listpack中的元素
- lp_flags:指向翻墙消息的flag域
- field_buf,value_buf:从listpack读取数据的缓存
typedef struct streamIterator { stream *stream; /* The stream we are iterating. */ streamID master_id; /* ID of the master entry at listpack head. */ uint64_t master_fields_count; /* Master entries # of fields. */ unsigned char *master_fields_start; /* Master entries start in listpack. */ unsigned char *master_fields_ptr; /* Master field to emit next. */ int entry_flags; /* Flags of entry we are emitting. */ int rev; /* True if iterating end to start (reverse). */ uint64_t start_key[2]; /* Start key as 128 bit big endian. */ uint64_t end_key[2]; /* End key as 128 bit big endian. */ raxIterator ri; /* Rax iterator. */ unsigned char *lp; /* Current listpack. */ unsigned char *lp_ele; /* Current listpack cursor. */ unsigned char *lp_flags; /* Current entry flags pointer. */ /* Buffers used to hold the string of lpGet() when the element is * integer encoded, so that there is no string representation of the * element inside the listpack itself. */ unsigned char field_buf[LP_INTBUF_SIZE]; unsigned char value_buf[LP_INTBUF_SIZE]; } streamIterator;
listpack
listpack是一个字符串列表的序列化格式,该结构可用于存储字符串或整型。其主要结构如下图所示:
listpack主要由四部分构成,分别是:
- Total Bytes为整个listpack的空间大小
- Num Elem:listpack的Entry个数,占用两个字节,但是Entry个数大于等于65535时,该值为65535,所以这种情况下获取元素个数,需要遍历整个listpack
- Entry:为每个具体的元素
- End:为listpack的结束标志,占用一个字节,内容为0xFF
Entry由三部分构成,基本如下:
- Encode:元素的编码方式,占用一个字节
- content:内容
- backlen:记录entry的长度(不包括该字段本身)
其中编码方式取值如下图所示:
Stream的消息内容存储在listpack中,消息存储格式是每个字段都是一个entry,而不是键整个消息作为字符串存储的,每个listpack会存储多个消息,具体存储个数由stream-node-max-bytes
(listpack节点最大占用字节数,默认4096)和stream-node-max-entries
(最大存储元素个数,默认100)决定.
每个listpack在创建时,会将第一个插入的entry构建成master entry,其基本结构如下所示:
count | deleted | num-fields | field-1 | field-2 | 0
其中:
- count:为当前listpack中所有未删除的消息个数
- deleted:当前listpack中所有已经删除的消息个数
- num-fields:field个数
- field-N:field域
- 0为标识位,再从后向前遍历listpack时使用
存储一个消息时,如果该消息的field域与master entry完全相同,则不需要再次存储field域
Rax
Redis对于Rax的解释为A radix tree implement
,基数树的一种实现。Rax中不仅可以存储字符串,也可以为该字符串设置值形成kv结构。其基本结构如下:
- head:指向头节点
- numele:元素个数(key)
- numnodes:节点个数
typedef struct rax { raxNode *head; uint64_t numele; uint64_t numnodes; } rax;
raxNode
raxNode代表Rax树中的一个节点,其基本结构如下所示:
- iskey:表明当前节点是否包含一个key,占用1bit
- isnull:表明当前key对应的value是否为空,占用1bit
- iscompr:表明当前节点是否为压缩节点,占用1bit
- size:压缩节点压缩的字符串长度或者非压缩节点的子节点个数,占用29bit
- data:包含填充字段,同时存储了当前节点包含的字符串以及子节点的指针,key对应的value指针。
typedef struct raxNode { uint32_t iskey:1; uint32_t isnull:1; uint32_t iscompr:1; uint32_t size:29; unsigned char data[]; } raxNode;
其中raxNode分为压缩节点域非压缩节点。主要区别在于非压缩节点的每个字符都有子节点,如果字符个数小于2,都是非压缩节点。
raxStack
raxStack结构用于存储从根节点到当前节点的路径,基本结构如下:
- stack:用于记录路径,该指针可能指向static_items或者堆内存
- items,maxitems:代表stack指向的空间的已用空间以及最大空间
- static_items:一个数组,每个元素都是指针,存储路径
- oom:代表当前栈是否出现过内存溢出
typedef struct raxStack { void **stack; size_t items, maxitems; void *static_items[RAX_STACK_STATIC_ITEMS]; int oom; } raxStack;
raxIterator
raxIterator用于遍历Rax树中的所有key,基本结构如下:
- flags:代表当前迭代器标志位,取值如下:
- RAX_ITER_JUST_SEEKED:当前迭代器指向的元素是刚刚搜索过的,当需要从迭代器中获取元素时,直接返回当前元素并清空标志位。
- RAX_ITER_EOF:代表当前迭代器已经遍历到最后一个节点
- RAX_ITER_SAFE:代表当前迭代器为安全迭代器,可以进行写操作
- rt:当前迭代器对应的rax
- key:存储了当前迭代器遍历到的key,该指针指向key_static_string或者堆内存
- data:value值
- key_len,key_max:key指向的空间的已用空间以及最大空间
- key_static_string:key的默认存储空间,key过大时,会使用堆内存
- node:当前key所在的raxNode
- stack:记录了从根节点到当前节点的路径,用于raxNode线上遍历
- node_cb:为节点的回调函数,默认为空
typedef struct raxIterator { int flags; rax *rt; /* Radix tree we are iterating. */ unsigned char *key; /* The current string. */ void *data; /* Data associated to this key. */ size_t key_len; /* Current key length. */ size_t key_max; /* Max key len the current key buffer can hold. */ unsigned char key_static_string[RAX_ITER_STATIC_LEN]; raxNode *node; /* Current node. Only for unsafe iteration. */ raxStack stack; /* Stack used for unsafe iteration. */ raxNodeCallback node_cb; /* Optional node callback. Normally set to NULL. */ } raxIterator;
总结
上文主要对Redis Stream的基本结构与其底层数据结构做了简要分析,了解了消息由listpack结构存储,以消息ID为key,listpack为value存储在Rax树中。