1. Intro

As the management node of a Kafka cluster, the Controller is responsible for electing leader replicas for all partitions and for storing and managing the cluster metadata. This article gives a brief analysis of the kafka.controller package in the core module.

Its core attributes are:

  • controllerContext: the cluster metadata object
  • controllerChannelManager: the Controller-side channel manager, responsible for sending requests to the other brokers
  • kafkaScheduler: the scheduler for periodic tasks
  • eventManager: the Controller event manager, which manages the event-processing thread
  • replicaStateMachine: the replica state machine, which drives replica state transitions
  • partitionStateMachine: the partition state machine, which drives partition state transitions
  • topicDeletionManager: the topic deletion manager, which deletes topics and their logs

val controllerContext = new ControllerContext 

var controllerChannelManager = new ControllerChannelManager(  
  () => controllerContext.epoch,  
  config,  
  time,  
  metrics,  
  stateChangeLogger,  
  threadNamePrefix  
)  
  
// have a separate scheduler for the controller to be able to start and stop independently of the kafka server  
// visible for testing  
private[controller] val kafkaScheduler = new KafkaScheduler(1)  
  
// visible for testing  
private[controller] val eventManager = new ControllerEventManager(config.brokerId, this, time,  
  controllerContext.stats.rateAndTimeMetrics)  
  
private val brokerRequestBatch = new ControllerBrokerRequestBatch(config, controllerChannelManager,  
  eventManager, controllerContext, stateChangeLogger)  
val replicaStateMachine: ReplicaStateMachine = new ZkReplicaStateMachine(config, stateChangeLogger, controllerContext, zkClient,  
  new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger))  
val partitionStateMachine: PartitionStateMachine = new ZkPartitionStateMachine(config, stateChangeLogger, controllerContext, zkClient,  
  new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger))  
private val topicDeletionManager = new TopicDeletionManager(config, controllerContext, replicaStateMachine,  
  partitionStateMachine, new ControllerDeletionClient(this, zkClient))

2. Metadata management

kafka.controller.ControllerContext holds all of the cluster metadata. Its core fields are:

  • ControllerStats: the Controller's rate/time metrics
  • offlinePartitionCount: the number of offline partitions in the cluster
  • shuttingDownBrokerIds: the ids of brokers that are currently shutting down
  • liveBrokers: the broker objects that are currently alive
  • liveBrokerEpochs: the epoch of each live broker, used to detect stale broker information during election and failover
  • epoch, epochZkVersion: the current controller epoch of the cluster. epoch is the value of the /controller_epoch znode, and epochZkVersion is that znode's dataVersion
  • allTopics: the names of all topics in the cluster
  • partitionAssignments: the replica assignment of every partition, stored as mutable.Map.empty[String, mutable.Map[Int, ReplicaAssignment]]; the outer key is the topic name, the inner key is the partition number, and the inner value is a ReplicaAssignment object (see the sketch after this list)
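
The nested map can be pictured with a small, self-contained sketch. The topic name, partition number and broker ids below are made up for illustration, and ReplicaAssignment is reduced to just its replica list; this is not the Kafka implementation.

import scala.collection.mutable

object PartitionAssignmentSketch {
  // Simplified stand-in for Kafka's ReplicaAssignment (the real class also tracks
  // replicas being added or removed during a reassignment).
  case class ReplicaAssignment(replicas: Seq[Int])

  // Same shape as partitionAssignments: topicName -> (partitionId -> ReplicaAssignment)
  val partitionAssignments = mutable.Map.empty[String, mutable.Map[Int, ReplicaAssignment]]

  def main(args: Array[String]): Unit = {
    // topic "orders", partition 0 assigned to brokers 1, 2 and 3
    partitionAssignments
      .getOrElseUpdate("orders", mutable.Map.empty)
      .update(0, ReplicaAssignment(Seq(1, 2, 3)))

    // Look up the replicas of orders-0; None would mean the partition is unknown
    val replicas = partitionAssignments.get("orders").flatMap(_.get(0)).map(_.replicas)
    println(replicas) // Some(List(1, 2, 3))
  }
}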

2.1. initializeControllerContext

private def initializeControllerContext(): Unit = {  
  // 1.1 Fetch all brokers and their epochs from ZK  
  val curBrokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster  
  val (compatibleBrokerAndEpochs, incompatibleBrokerAndEpochs) = partitionOnFeatureCompatibility(curBrokerAndEpochs)  
  if (incompatibleBrokerAndEpochs.nonEmpty) {  
    warn("Ignoring registration of new brokers due to incompatibilities with finalized features: " +  
      incompatibleBrokerAndEpochs.map { case (broker, _) => broker.id }.toSeq.sorted.mkString(","))  
  }  
  controllerContext.setLiveBrokers(compatibleBrokerAndEpochs)  
  info(s"Initialized broker epochs cache: ${controllerContext.liveBrokerIdAndEpochs}")  
  controllerContext.setAllTopics(zkClient.getAllTopicsInCluster(true))  
  registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq)  
  val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(controllerContext.allTopics.toSet)  
  processTopicIds(replicaAssignmentAndTopicIds)  
  
  replicaAssignmentAndTopicIds.foreach { case TopicIdReplicaAssignment(_, _, assignments) =>  
    assignments.foreach { case (topicPartition, replicaAssignment) =>  
      controllerContext.updatePartitionFullReplicaAssignment(topicPartition, replicaAssignment)  
      if (replicaAssignment.isBeingReassigned)  
        controllerContext.partitionsBeingReassigned.add(topicPartition)  
    }  
  }  
  controllerContext.clearPartitionLeadershipInfo()  
  controllerContext.shuttingDownBrokerIds.clear()  
  // register broker modifications handlers  
  registerBrokerModificationsHandler(controllerContext.liveOrShuttingDownBrokerIds)  
  // update the leader and isr cache for all existing partitions from Zookeeper  
  updateLeaderAndIsrCache()  
  // start the channel manager  
  controllerChannelManager.startup(controllerContext.liveOrShuttingDownBrokers)  
  info(s"Currently active brokers in the cluster: ${controllerContext.liveBrokerIds}")  
  info(s"Currently shutting brokers in the cluster: ${controllerContext.shuttingDownBrokerIds}")  
  info(s"Current list of topics in the cluster: ${controllerContext.allTopics}")  
}

3. Controller communication logic

ControllerChannelManager manages the communication channels between the Controller and the other brokers; it creates a dedicated sending thread for each broker to deliver requests.

Its brokerStateInfo field stores the mapping from broker id to ControllerBrokerStateInfo:

protected val brokerStateInfo = new mutable.HashMap[Int, ControllerBrokerStateInfo]

ControllerBrokerStateInfo is defined as follows:

case class ControllerBrokerStateInfo(networkClient: NetworkClient,  
                                     brokerNode: Node,  
                                     messageQueue: BlockingQueue[QueueItem],  
                                     requestSendThread: RequestSendThread,  
                                     queueSizeGauge: Gauge[Int],  
                                     requestRateAndTimeMetrics: Timer,  
                                     reconfigurableChannelBuilder: Option[Reconfigurable])

Key fields:

  • brokerNode: the target broker node, encapsulating its connection information such as host and port
  • messageQueue: the blocking queue of pending request messages
  • requestSendThread: the thread the Controller uses to send requests to this broker (a simplified dispatch sketch follows this list)
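
How a request travels from the Controller to a broker-specific queue can be modelled with the following self-contained sketch. QueueItem, BrokerStateInfo and the broker ids here are simplified stand-ins for the Kafka classes above, not the actual implementation.

import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}

object ChannelManagerSketch {
  // Simplified stand-ins for Kafka's QueueItem and ControllerBrokerStateInfo
  case class QueueItem(apiKey: String, payload: String, enqueueTimeMs: Long)
  case class BrokerStateInfo(messageQueue: BlockingQueue[QueueItem])

  // One state entry (and, in Kafka, one RequestSendThread) per live broker
  private val brokerStateInfo = Map(
    1 -> BrokerStateInfo(new LinkedBlockingQueue[QueueItem]()),
    2 -> BrokerStateInfo(new LinkedBlockingQueue[QueueItem]())
  )

  // Sending only enqueues; the broker's dedicated sending thread drains the queue and performs the I/O
  def sendRequest(brokerId: Int, apiKey: String, payload: String): Unit =
    brokerStateInfo.get(brokerId) match {
      case Some(state) => state.messageQueue.put(QueueItem(apiKey, payload, System.currentTimeMillis()))
      case None        => println(s"Not sending $apiKey to broker $brokerId, it is offline")
    }

  def main(args: Array[String]): Unit = {
    sendRequest(1, "UPDATE_METADATA", "cluster metadata snapshot")
    sendRequest(3, "LEADER_AND_ISR", "no such broker")
    println(brokerStateInfo(1).messageQueue.peek())
  }
}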

3.1. RequestSendThread

The constructor of RequestSendThread looks like this:

class RequestSendThread(val controllerId: Int,    // broker id of the controller
                        controllerEpoch: () => Int,     // current controller epoch
                        val queue: BlockingQueue[QueueItem],  // blocking queue of pending requests
                        val networkClient: NetworkClient, // client that performs the network I/O
                        val brokerNode: Node,     // target broker node
                        val config: KafkaConfig,    // configuration
                        val time: Time,
                        val requestRateAndQueueTimeMetrics: Timer,
                        val stateChangeLogger: StateChangeLogger,
                        name: String)

Its processing loop is as follows:

override def doWork(): Unit = {  
  
  def backoff(): Unit = pause(100, TimeUnit.MILLISECONDS)  
  
  // 1.1 Take the next request from the blocking queue
  val QueueItem(apiKey, requestBuilder, callback, enqueueTimeMs) = queue.take()  
  // 1.2 Update the monitoring metrics
  requestRateAndQueueTimeMetrics.update(time.milliseconds() - enqueueTimeMs, TimeUnit.MILLISECONDS)  
  
  var clientResponse: ClientResponse = null  
  try {  
    var isSendSuccessful = false  
    while (isRunning && !isSendSuccessful) {  
      // if a broker goes down for a long time, then at some point the controller's zookeeper listener will trigger a
      // removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying.
      try {
        // 2.1 Check whether the connection to the broker has been established
        if (!brokerReady()) {  
          isSendSuccessful = false  
          backoff()  
        }  
        else {  
          // 2.2 Send the request and wait for the response
          val clientRequest = networkClient.newClientRequest(brokerNode.idString, requestBuilder,  
            time.milliseconds(), true)  
          clientResponse = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time)  
          isSendSuccessful = true  
        }  
      } catch {  
        case e: Throwable => // if the send was not successful, reconnect to broker and resend the message  
          warn(s"Controller $controllerId epoch ${controllerEpoch()} fails to send request " +  
            s"$requestBuilder " +  
            s"to broker $brokerNode. Reconnecting to broker.", e)  
          networkClient.close(brokerNode.idString)  
          isSendSuccessful = false  
          backoff()  
      }  
    }  
    // 3.1 If a response was received
    if (clientResponse != null) {  
      val requestHeader = clientResponse.requestHeader  
      val api = requestHeader.apiKey  
      // 3.2 Throw if the request is not LeaderAndIsr, StopReplica or UpdateMetadata
      if (api != ApiKeys.LEADER_AND_ISR && api != ApiKeys.STOP_REPLICA && api != ApiKeys.UPDATE_METADATA)  
        throw new KafkaException(s"Unexpected apiKey received: $apiKey")  
  
      val response = clientResponse.responseBody  
  
      stateChangeLogger.withControllerEpoch(controllerEpoch()).trace(s"Received response " +  
        s"$response for request $api with correlation id " +  
        s"${requestHeader.correlationId} sent to broker $brokerNode")  
      // 3.3 Invoke the callback
      if (callback != null) {  
        callback(response)  
      }  
    }  
  } catch {  
    case e: Throwable =>  
      error(s"Controller $controllerId fails to send a request to broker $brokerNode", e)  
      // If there is any socket error (eg, socket timeout), the connection is no longer usable and needs to be recreated.  
      networkClient.close(brokerNode.idString)  
  }  
}

4. Controller event handling logic

ControllerEventManager is the manager that handles Controller-side events. It follows an event-driven model, similar in spirit to the AsyncKafkaConsumer discussed in an earlier article.
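
Before walking through the Kafka classes one by one, the overall pattern can be summed up in a deliberately simplified model; the trait and class names below mirror Kafka's, but everything else is made up for illustration.

import java.util.concurrent.LinkedBlockingQueue

object EventDrivenSketch {
  trait ControllerEvent                        // what happened
  trait ControllerEventProcessor {             // how to react
    def process(event: ControllerEvent): Unit
  }
  case class TopicChange(topic: String) extends ControllerEvent

  val queue = new LinkedBlockingQueue[ControllerEvent]()

  // The single event thread: take one event at a time and hand it to the processor,
  // so all controller state is mutated from one thread only.
  def eventThread(processor: ControllerEventProcessor): Thread = new Thread(() => {
    try {
      while (true) processor.process(queue.take())
    } catch {
      case _: InterruptedException => () // shutdown requested
    }
  })

  def main(args: Array[String]): Unit = {
    val processor = new ControllerEventProcessor {
      def process(event: ControllerEvent): Unit = println(s"processing $event")
    }
    val t = eventThread(processor)
    t.start()
    queue.put(TopicChange("orders"))           // producers just enqueue and return
    Thread.sleep(100)
    t.interrupt()
  }
}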

4.1. ControllerEvent

Let's start with the ControllerEvent trait, which is the abstraction of a Controller event:

sealed trait ControllerEvent {  
  def state: ControllerState  
  // preempt() is not executed by `ControllerEventThread` but by the main thread.  
  def preempt(): Unit  
}
sealed abstract class ControllerState {  
  
  def value: Byte  
  
  def rateAndTimeMetricName: Option[String] =  
    if (hasRateAndTimeMetric) Some(s"${toString}RateAndTimeMs") else None  
  
  protected def hasRateAndTimeMetric: Boolean = true  
}

state is the Controller state associated with the event; when the event is processed, the Controller switches to that state (and back to Idle afterwards, as ControllerEventThread below shows).
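
As a concrete illustration, a parameterless event in KafkaController.scala looks roughly like this; it is a sketch based on the trait above, and the exact ControllerState value is an assumption.

// Sketch of a concrete event; it lives next to the sealed trait in KafkaController.scala
case object AutoPreferredReplicaLeaderElection extends ControllerEvent {
  override def state: ControllerState = ControllerState.AutoLeaderBalance
  override def preempt(): Unit = {} // nothing extra to do if this event is preempted
}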

4.2. ControllerEventProcessor

ControllerEventProcessor is the processor that handles the various Controller events. Its only implementation today is kafka.controller.KafkaController. It declares two basic methods:

trait ControllerEventProcessor {
  // receive a Controller event and process it
  def process(event: ControllerEvent): Unit
  // receive a Controller event and process it preemptively, ahead of anything already queued
  def preempt(event: ControllerEvent): Unit
}

The process method of the implementation class KafkaController:

override def process(event: ControllerEvent): Unit = {  
  try {  
    event match {  
      case event: MockEvent =>  
        // Used only in test cases  
        event.process()  
      case ShutdownEventThread =>  
        error("Received a ShutdownEventThread event. This type of event is supposed to be handle by ControllerEventThread")  
      case AutoPreferredReplicaLeaderElection =>  
        processAutoPreferredReplicaLeaderElection()  
      case ReplicaLeaderElection(partitions, electionType, electionTrigger, callback) =>  
        processReplicaLeaderElection(partitions, electionType, electionTrigger, callback)  
      case UncleanLeaderElectionEnable =>
        processUncleanLeaderElectionEnable()
      // ... the remaining event cases and the error handling are omitted here ...
    }
  }
}

4.3. ControllerEventManager

ControllerEventManager drives the event processing; internally it holds the event-processing thread and the blocking event queue:

@volatile private var _state: ControllerState = ControllerState.Idle  

private val putLock = new ReentrantLock()  

private val queue = new LinkedBlockingQueue[QueuedEvent]  
// Visible for test  
private[controller] var thread = new ControllerEventThread(ControllerEventThreadName)

The queue element QueuedEvent wraps a ControllerEvent and defines process, preempt and awaitProcessing methods, which delegate to the corresponding methods of ControllerEventProcessor. The spent flag marks whether the event has already been handled, so it is executed at most once.

class QueuedEvent(val event: ControllerEvent,  
                  val enqueueTimeMs: Long) {  
  private val processingStarted = new CountDownLatch(1)  
  private val spent = new AtomicBoolean(false)  
  
  def process(processor: ControllerEventProcessor): Unit = {  
    if (spent.getAndSet(true))  
      return  
    processingStarted.countDown()  
    processor.process(event)  
  }  
  
  def preempt(processor: ControllerEventProcessor): Unit = {  
    if (spent.getAndSet(true))  
      return  
    processor.preempt(event)  
  }  
  
  def awaitProcessing(): Unit = {  
    processingStarted.await()  
  }  
  
  override def toString: String = {  
    s"QueuedEvent(event=$event, enqueueTimeMs=$enqueueTimeMs)"  
  }  
}

put wraps a ControllerEvent into a QueuedEvent and appends it to the blocking queue; clearAndPut first drains the queue, preempts every queued event, and then enqueues the new one.

def put(event: ControllerEvent): QueuedEvent = inLock(putLock) {  
  val queuedEvent = new QueuedEvent(event, time.milliseconds())  
  queue.put(queuedEvent)  
  queuedEvent  
}  
  
def clearAndPut(event: ControllerEvent): QueuedEvent = inLock(putLock) {  
  val preemptedEvents = new util.ArrayList[QueuedEvent]()  
  queue.drainTo(preemptedEvents)  
  preemptedEvents.forEach(_.preempt(processor))  
  put(event)  
}
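
A purely illustrative sketch of the two entry points (TopicChange is a real controller event, SomeUrgentEvent is a made-up placeholder; these are not verbatim Kafka call sites): normal events go through put, while an event that must run ahead of everything already queued, for example when the controller has to resign, goes through clearAndPut.

// Illustrative call sites only
eventManager.put(TopicChange)             // normal path: appended behind whatever is queued
eventManager.clearAndPut(SomeUrgentEvent) // urgent path: queued events are preempted first, then this one runs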

4.4. ControllerEventThread

The ControllerEventThread simply dequeues a QueuedEvent and invokes its process method:

class ControllerEventThread(name: String)  
  extends ShutdownableThread(  
    name, false, s"[ControllerEventThread controllerId=$controllerId] ")  
    with Logging {  
  
  logIdent = logPrefix  
  
  override def doWork(): Unit = {  
    val dequeued = pollFromEventQueue()  
    dequeued.event match {  
      case ShutdownEventThread => // The shutting down of the thread has been initiated at this point. Ignore this event.  
      case controllerEvent =>  
        _state = controllerEvent.state  
  
        eventQueueTimeHist.update(time.milliseconds() - dequeued.enqueueTimeMs)  
  
        try {  
          def process(): Unit = dequeued.process(processor)  
  
          rateAndTimeMetrics.get(state) match {  
            case Some(timer) => timer.time(() => process())  
            case None => process()  
          }  
        } catch {  
          case e: Throwable => error(s"Uncaught error processing event $controllerEvent", e)  
        }  
  
        _state = ControllerState.Idle  
    }  
  }  
}

5. ZooKeeper-based election

When ZooKeeper acts as the registry, the election is implemented by creating the /controller ephemeral node. ZK's watch mechanism on ephemeral nodes then triggers a re-election when the current controller goes offline.

An election is triggered in three situations: when the cluster first starts up, when the /controller node is deleted, and when the data of the /controller node changes.
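
The underlying ZooKeeper pattern can be demonstrated outside of Kafka with the plain ZooKeeper client. The connection string, payload and log messages below are illustrative assumptions, and Kafka itself goes through its KafkaZkClient wrapper rather than this raw API.

import org.apache.zookeeper.{CreateMode, KeeperException, WatchedEvent, Watcher, ZooKeeper}
import org.apache.zookeeper.ZooDefs.Ids

object EphemeralElectionSketch {
  def main(args: Array[String]): Unit = {
    val zk = new ZooKeeper("localhost:2181", 30000, (_: WatchedEvent) => ())
    try {
      // Creating the ephemeral node *is* the election: exactly one broker can succeed,
      // and the node disappears automatically when that broker's session dies.
      zk.create("/controller", """{"brokerid":1}""".getBytes("UTF-8"),
        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
      println("won the election, acting as controller")
    } catch {
      case _: KeeperException.NodeExistsException =>
        // Lost the election: watch the node so that its deletion triggers a new election round.
        zk.exists("/controller", (event: WatchedEvent) => {
          if (event.getType == Watcher.Event.EventType.NodeDeleted)
            println("controller znode deleted, triggering re-election")
        })
        println("another broker is already the controller")
    }
  }
}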

ControllerChangeHandler extends ZNodeChangeHandler, the callback class for handling znode changes:

class ControllerChangeHandler(eventManager: ControllerEventManager) extends ZNodeChangeHandler {  
  override val path: String = ControllerZNode.path  
  
  override def handleCreation(): Unit = eventManager.put(ControllerChange)  
  override def handleDeletion(): Unit = eventManager.put(Reelect)  
  override def handleDataChange(): Unit = eventManager.put(ControllerChange)  
}

The elect method itself is fairly simple; the onControllerFailover method it calls does the heavy lifting of initializing the context and the partition and replica state machines:

private def elect(): Unit = {  
  activeControllerId = zkClient.getControllerId.getOrElse(-1)  
  /*
   * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,
   * it's possible that the controller has already been elected when we get here. This check will prevent the following
   * createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
   */
  // 1.1 Check whether ZK already has an active controller
  if (activeControllerId != -1) {  
    debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")  
    return  
  }  
  
  try {  
    // 1.2 Try to register as the controller
    val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId)  
    controllerContext.epoch = epoch  
    controllerContext.epochZkVersion = epochZkVersion  
    activeControllerId = config.brokerId  
  
    info(s"${config.brokerId} successfully elected as the controller. Epoch incremented to ${controllerContext.epoch} " +  
      s"and epoch zk version is now ${controllerContext.epochZkVersion}")  
  
    // 1.3 Call onControllerFailover() to finish controller initialization; resign if it throws
    onControllerFailover()  
  } catch {  
    case e: ControllerMovedException =>  
      maybeResign()  
  
      if (activeControllerId != -1)  
        debug(s"Broker $activeControllerId was elected as controller instead of broker ${config.brokerId}", e)  
      else  
        warn("A controller has been elected but just resigned, this will result in another round of election", e)  
    case t: Throwable =>  
      error(s"Error while electing or becoming controller on broker ${config.brokerId}. " +  
        s"Trigger controller movement immediately", t)  
      triggerControllerMove()  
  }  
}