Kafka学习笔记(四)-集群工作机制

Controller机制

Controller主要作用是在Zookeeper的帮助下管理和协调整个Kafka集群(在zk中存储集群元数据)。Kafka集群中会有一个或多个Broker,其中一个Broker会被选举为Controller,它负责管理整个集群中所有分区和副本的状态,其工作职责包括以下内容:

  • Topic管理:完成对Kafka的Topic的创建删除、分区增加等操作。
  • 分区重分配:新的Broker加入集群时,不会自动分担已有的topic负载,只会对后续的topic生效,此时如果需要对已有topic负载,需要用户手动进行分区重分配
  • Leader选举:负责Partition Leader选举的工作
  • 集群成员管理:
    • Kafka 使用Zookeeper的临时节点来选举Controller
    • Zookeeper在Broker加入集群或退出集群时通知Controller
    • Controller负责在Broker加入或离开集群时进行分区Leader选举
  • 元数据管理:Controller负责管理集群中所有的元数据

Controller选举流程:

  • 每个Broker启动时,都会尝试读取/controller节点的brokerid的值,如果值不为-1,则表明已经有其他broker节点成为Controller,当前broker放弃选举
  • 如果不存在/controller节点或节点数据异常,则主动创建节点并存储brokerid
  • 其他broker会将选举成功的Brokerid都在内存保存下来
  • 同时使用/controller_epoch持久性节点来记录任期号,记录Controller发生变化的次数,类似于Raft中的任期。
    • 初始值为1,每个与Controller交互的请求都会携带controller_epoch,如果请求的controller_epoch大于内存中controller_epoch,说明内存中的值过期了,目前已有新的Controller当选。
    • 由两部分组成:
      • epoch:单调递增的版本号,leader发生变更,进行自增
      • start offset:Leader副本在该Epoch值上写入的首条消息的位移。
    • 每个分区都缓存该值,并定期持久化到checkpoint文件中

Partition Leader选举

Controller拥有选举分区Leader的功能,每个分区都会有一个Broker作为Leader,处理所有的读写请求,选举流程由Controller负责:

  1. Controller从ZK中读取当前分区所有的ISR集合
  2. 调用配置的分区选择算法选举分区的Leader

Partition Leader的定义如下:

Each partition has one server which acts as the "leader" and zero or more servers which act as "followers". The leader handles all read and write requests for the partition while the followers passively replicate the leader. If the leader fails, one of the followers will automatically become the new leader. Each server acts as a leader for some of its partitions and a follower for others so load is well balanced within the cluster.

触发Partition Leader选举的时机如下:

  • Offline:创建新分区或现有分区Leader离线
  • Reassign:用户执行重分配操作
  • PreferredReplica:leader迁移回首选副本
  • 分区的现有leader即将下线

故障转移

如果Controller发生故障(宕机或网络超时等),Kafka能够立即启用备用Controller来代替之前的,这个过程称为Failover。其基本流程如下:

  1. ZK检测到当前controller宕机(ZK的watch机制),删除/controller临时节点。
  2. 某个Broker抢先注册了/controller临时节点,成为新的controller
  3. 该Broker从ZK拉取元数据,并执行初始化。

分区副本机制

副本对分区数据进行冗余存储,以保证系统的高可用和数据一致性,当分区Leader发生宕机或数据丢失,可以从其他副本获取分区数据。分区Leader负责处理分区所有的读写请求。

每个分区都有一个ISR集合,用于维护所有同步并可用的副本,ISR最少拥有Leader副本这一个元素,对于Follower来说,必须满足以下条件才是ISR:

  • 必须定时向Zookeeper发送心跳
  • 在规定时间内从Leader获取过消息

若副本不满足以上两条件,就会从ISR列表移除。

replica.lag.time.max.ms:Follower能落后Leader的最长时间间隔,默认为10s

使用Zookeeper来管理ISR,节点位置为/brokers/topics/[topic]/partitions/[partition]/state

HW和LEO机制

几个offset概念:

  • base offset:副本中第一条消息的offset
  • high watermark(HW):副本中最新一条已提交消息的offset
    • 用来标识分区中哪些消息可以消费,HW之后的数据对消费者不可见
    • 完成副本数据同步的任务
    • HW永远小于LEO:定义决定的
  • log end offset(LEO):副本中下一条待写入消息的offset
    • 用于更新HW:follower和Leader的LEO同步后,两者HW就可以更新

Leader除了保存自己的一组HW和LEO,也会保存其他Follower的值,目的是帮助Leader副本确定分区高水位。

更新机制

  • Follower从Leader同步数据时,会携带自己的LEO值,Leader将Follower中最小的LEO作为HW值,这是为了保证Leader宕机后,新选举的节点拥有所有HW之前的数据,保证了数据安全性。
  • Follower获取数据时,请求Leader的HW,然后follower.HW = min(follower.LEO, leader.HW)

常见问题

Kafka如何保证消息有序

Kafka中并不保证消息全局有序,只能保证分区有序

  • 生产者:使用同步方式发送,设置acks = all & max.in.flight.requests.per.connection = 1,存在下面的问题:
    • 重发问题:发送失败,需要判断是否自动重试,设置retries > 0
    • 幂等问题:设置enable.idempotence = true,设置后生产者客户端会给消息添加序列号,每次发送把序列号递增1
  • 服务端Broder:Kafka只能保证单个Partition内消息有序,若要Topic全局有序,需要设置单分区。
  • 消费者:根据Group机制,Topic下的分区只能从属group下的某一个消费者,若Consumer单线程消费没有问题,多线程并发消费顺序会乱。

如何处理Kafka大量消息积压

出现消息积压的原因:Kafka集群中出现了性能问题,来不及消费消息。

解决:

  • 生产端性能优化:检查发送消息的业务逻辑,合理设置并发数量和批量大小,注意消息体大小。

  • 消费端性能优化:一定要保证消费端速度大于生产端的生产速度,扩容消费端实例来提升总体的消费能力。要注意在扩大Consumer实例的同时,必须扩容Topic的Partition数量,确保两者数量大致相等。

  • 查看Broker端日志、监控系统等情况,是否存在硬件层面的问题:磁盘空间等。


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!