Intro

Kafka的网络通信设计如图所示(图片引用自AutoMQ):

image.png

其中SocketServer负责处理外部连接,并将处理结果封装到Response返回。KafkaRequestHandlerPool作为I/O处理线程池,执行请求的具体逻辑。二者之间通过RequestChannel进行交互。

SocketServer

SocketServer负责处理Broker与客户端以及其他Broker之间的网络连接,采用Reactor处理模型:Acceptor负责从socket接受新连接,Processor负责读取连接上的request并写回response,这也是Kafka中的设计亮点之一。

在实现上,分为data-plane和control-plane,防止数据类请求阻塞控制类请求:

// Data-plane: one acceptor per endpoint, plus the single request channel that is handed to
// every data-plane acceptor. The first RequestChannel argument (maxQueuedRequests) is
// presumably the request queue capacity -- TODO confirm against the RequestChannel constructor.
private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, DataPlaneAcceptor]()  
val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneAcceptor.MetricPrefix, time, apiVersionManager.newRequestMetrics)  
// Control-plane: at most one acceptor. Its request channel exists only when
// config.controlPlaneListenerName is defined, and is created with a fixed first argument of 20.
private[network] var controlPlaneAcceptorOpt: Option[ControlPlaneAcceptor] = None  
val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ =>  
  new RequestChannel(20, ControlPlaneAcceptor.MetricPrefix, time, apiVersionManager.newRequestMetrics))

// NOTE(review): who populates this buffer is not visible in this excerpt -- confirm.
private[network] val processors = new ArrayBuffer[Processor]()

从上述定义可以看出control-plane的请求队列容量固定为20;同时,control-plane只会创建一个Processor线程(见下文createControlPlaneAcceptorAndProcessor中的addProcessors(1)),这是因为控制类的请求数量较数据类请求少。

初始化逻辑如下:

/**
 * Creates, configures and registers the data-plane acceptor for `endpoint`.
 *
 * The listener is privileged only when there is no dedicated control-plane request
 * channel and the endpoint's listener is the inter-broker listener.
 *
 * @param endpoint the endpoint to create the acceptor for
 * @throws RuntimeException if the SocketServer has already been stopped
 */
private def createDataPlaneAcceptorAndProcessors(endpoint: EndPoint): Unit = synchronized {
  if (stopped)
    throw new RuntimeException("Can't create new data plane acceptor and processors: SocketServer is stopped.")

  val listener = endpoint.listenerName
  // Listener-prefixed config values override the broker-wide ones.
  val listenerConfigs = config.valuesFromThisConfigWithPrefixOverride(listener.configPrefix)
  connectionQuotas.addListener(config, listener)

  val usesInterBrokerListener = config.interBrokerListenerName == listener
  val privileged = controlPlaneRequestChannelOpt.isEmpty && usesInterBrokerListener

  val acceptor = createDataPlaneAcceptor(endpoint, privileged, dataPlaneRequestChannel)
  config.addReconfigurable(acceptor)
  acceptor.configure(listenerConfigs)
  dataPlaneAcceptors.put(endpoint, acceptor)
  info(s"Created data-plane acceptor and processors for endpoint : ${endpoint.listenerName}")
}
  
/**
 * Creates the control-plane acceptor for `endpoint` with exactly one processor and
 * records it in `controlPlaneAcceptorOpt`.
 *
 * @param endpoint the control-plane endpoint
 * @throws RuntimeException if the SocketServer has already been stopped
 */
private def createControlPlaneAcceptorAndProcessor(endpoint: EndPoint): Unit = synchronized {
  if (stopped)
    throw new RuntimeException("Can't create new control plane acceptor and processor: SocketServer is stopped.")

  connectionQuotas.addListener(config, endpoint.listenerName)
  // The control-plane request channel is expected to be defined whenever this method is called.
  val channel = controlPlaneRequestChannelOpt.get
  val acceptor = createControlPlaneAcceptor(endpoint, channel)
  // The control plane gets a single processor.
  acceptor.addProcessors(1)
  controlPlaneAcceptorOpt = Some(acceptor)
  info(s"Created control-plane acceptor and processor for endpoint : ${endpoint.listenerName}")
}

Acceptor

Acceptor线程通过Selector + Channel轮询获取acceptable connection,将接收的连接信息传递给下游Processor处理,作为Runnable任务,每个endpoint指定一个acceptor。

初始化

// Socket buffer sizes and listen backlog, read from the broker config
private val sendBufferSize = config.socketSendBufferBytes  
private val recvBufferSize = config.socketReceiveBufferBytes  
private val listenBacklogSize = config.socketListenBacklogSize  
  
// NIO selector the acceptor uses to wait for connection events
// (NSelector is presumably an import alias for java.nio.channels.Selector -- confirm)
private val nioSelector = NSelector.open()  

// Listening channel; stays unset here unless localPort below has to open it
private[network] var serverChannel: ServerSocketChannel  = _  
// Port this acceptor listens on. A non-zero configured port is used as-is and the socket is
// not opened here. When the configured port is 0, the server socket is opened right away so
// that the port actually bound can be read back from it.
private[network] val localPort: Int  = if (endPoint.port != 0) {  
  endPoint.port  
} else {  
  serverChannel = openServerSocket(endPoint.host, endPoint.port, listenBacklogSize)  
  val newPort = serverChannel.socket().getLocalPort  
  info(s"Opened wildcard endpoint ${endPoint.host}:$newPort")  
  newPort  
}  
  
// Processors owned by this acceptor; accepted connections are handed to them
private[network] val processors = new ArrayBuffer[Processor]()

创建Socket

/**
 * Opens a non-blocking server socket channel and binds it to the given address.
 *
 * @param host              host to bind to; when blank, only the port is used for the address
 * @param port              port to bind to
 * @param listenBacklogSize backlog passed to `bind`
 * @return the bound server socket channel
 * @throws KafkaException if binding fails with a SocketException (the channel is closed first)
 */
private def openServerSocket(host: String, port: Int, listenBacklogSize: Int): ServerSocketChannel = {
  // A blank host selects the port-only InetSocketAddress constructor.
  val socketAddress = if (Utils.isBlank(host)) new InetSocketAddress(port) else new InetSocketAddress(host, port)
  val serverChannel = ServerSocketChannel.open()
  try {
    serverChannel.configureBlocking(false)

    // Only override the receive buffer when a non-default size was configured.
    val customRecvBuffer = recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE
    if (customRecvBuffer)
      serverChannel.socket().setReceiveBufferSize(recvBufferSize)

    serverChannel.socket.bind(socketAddress, listenBacklogSize)
    info(s"Awaiting socket connections on ${socketAddress.getHostString}:${serverChannel.socket.getLocalPort}.")
    serverChannel
  } catch {
    case e: SocketException =>
      // Release the channel before surfacing the bind failure.
      Utils.closeQuietly(serverChannel, "server socket")
      throw new KafkaException(s"Socket server failed to bind to ${socketAddress.getHostString}:$port: ${e.getMessage}.", e)
  }
}

start方法逻辑:

/**
 * Starts this acceptor: opens the server socket if it is not open yet, starts every
 * processor, then starts the acceptor thread and completes `startedFuture`.
 *
 * Failures are not rethrown; they are reported by completing `startedFuture` exceptionally.
 */
def start(): Unit = synchronized {  
  try {  
    // Refuse to start once shutdown has been requested
    if (!shouldRun.get()) {  
      throw new ClosedChannelException()  
    }  
    // 1.1 Open the server channel here if it was not opened earlier
    if (serverChannel == null) {  
      serverChannel = openServerSocket(endPoint.host, endPoint.port, listenBacklogSize)  
      debug(s"Opened endpoint ${endPoint.host}:${endPoint.port}")  
    }  
    debug(s"Starting processors for listener ${endPoint.listenerName}")  
  
    // 1.2 Start the processors
    processors.foreach(_.start())  
    debug(s"Starting acceptor thread for listener ${endPoint.listenerName}")  
      
    // 1.3 Start the acceptor thread, then mark this acceptor as started
    thread.start()  
    startedFuture.complete(null)  
    started.set(true)  
  } catch {  
    case e: ClosedChannelException =>  
      debug(s"Refusing to start acceptor for ${endPoint.listenerName} since the acceptor has already been shut down.")  
      startedFuture.completeExceptionally(e)  
    case t: Throwable =>  
      error(s"Unable to start acceptor for ${endPoint.listenerName}", t)  
      startedFuture.completeExceptionally(new RuntimeException(s"Unable to start acceptor for ${endPoint.listenerName}", t))  
  }  
}

主体逻辑

Acceptor实现了Runnable接口,其run方法如下:

/**
 * Acceptor thread body: registers the server channel for OP_ACCEPT, then keeps accepting
 * connections until `shouldRun` is cleared. `closeAll()` runs when the loop exits.
 */
override def run(): Unit = {  
  // 1.1 Register serverChannel with nioSelector for OP_ACCEPT so new connections are reported
  serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)  
  try {  
    // 1.2 Keep handling connections until shutdown is requested
    while (shouldRun.get()) {  
      try {  
        // 1.3 Accept new connections
        acceptNewConnections()  
          
        // 1.4 Close throttled connections (closeThrottledConnections is not shown in this excerpt)
        closeThrottledConnections()  
      }  
      catch {  
        // Log and keep looping so that one failure does not stop the acceptor thread
        case e: Throwable => error("Error occurred", e)  
      }  
    }  
  } finally {  
    closeAll()  
  }  
}

处理连接

Acceptor采用nioSelector处理连接事件:

/**
 * Waits up to 500 ms for connection events and hands every accepted connection to a
 * processor, chosen round-robin.
 *
 * Errors on an individual key are logged and do not stop the remaining keys from being handled.
 */
private def acceptNewConnections(): Unit = {
  // 1.1 Block in select for at most 500 ms waiting for connection events
  val ready = nioSelector.select(500)

  // 1.2 Only proceed when at least one key is ready
  if (ready > 0) {
    // 1.3 Fetch all ready keys
    val keys = nioSelector.selectedKeys()
    val iter = keys.iterator()

    // 2.1 Handle each key, stopping early if shutdown is requested
    while (iter.hasNext && shouldRun.get()) {
      try {
        val key = iter.next
        iter.remove()

        // 2.2 The acceptor only expects acceptable keys
        if (key.isAcceptable) {
          accept(key).foreach { socketChannel =>
            // Assign the channel to the next processor (using round-robin) to which the
            // channel can be added without blocking. If newConnections queue is full on
            // all processors, block until the last one is able to accept a connection.
            var retriesLeft = synchronized(processors.length)
            var processor: Processor = null
            do {
              // 2.3 Pick the next processor round-robin for this connection
              retriesLeft -= 1
              processor = synchronized {
                // adjust the index (if necessary) and retrieve the processor atomically for
                // correct behaviour in case the number of processors is reduced dynamically
                currentProcessorIndex = currentProcessorIndex % processors.length
                processors(currentProcessorIndex)
              }
              currentProcessorIndex += 1
            } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
          }
        } else
          throw new IllegalStateException("Unrecognized key state for acceptor thread.")
      } catch {
        case e: Throwable => error("Error while accepting connection", e)
      }
    }
  }
}

将connection转给Processor处理:

/**
 * Offers `socketChannel` to `processor` and logs the assignment when it is accepted.
 *
 * @param socketChannel the newly accepted connection
 * @param processor     the processor to hand the connection to
 * @param mayBlock      forwarded to `processor.accept`; whether the hand-off may block
 * @return true if the processor took the connection, false otherwise
 */
private def assignNewConnection(socketChannel: SocketChannel, processor: Processor, mayBlock: Boolean): Boolean = {
  val assigned = processor.accept(socketChannel, mayBlock, blockedPercentMeter)
  if (assigned)
    debug(s"Accepted connection from ${socketChannel.socket.getRemoteSocketAddress} on" +
      s" ${socketChannel.socket.getLocalSocketAddress} and assigned it to processor ${processor.id}," +
      s" sendBufferSize [actual|requested]: [${socketChannel.socket.getSendBufferSize}|$sendBufferSize]" +
      s" recvBufferSize [actual|requested]: [${socketChannel.socket.getReceiveBufferSize}|$recvBufferSize]")
  assigned
}

Processor维护

Acceptor中维护了Processor线程池,因此创建、移除Processor逻辑也在其中,创建、移除会同步修改requestChannel中存储的processors:

  /**
   * Creates `toCreate` processors for this acceptor's endpoint, registers each with the
   * request channel, and starts it immediately if the acceptor is already started.
   * The new processors are appended to `processors` only after all of them are created.
   *
   * @param toCreate number of processors to add
   */
  def addProcessors(toCreate: Int): Unit = synchronized {
    val listenerName = endPoint.listenerName
    val securityProtocol = endPoint.securityProtocol

    // Range.map is strict, so processors are created in order here.
    val created = (0 until toCreate).map { _ =>
      val processor = newProcessor(socketServer.nextProcessorId(), listenerName, securityProtocol)
      requestChannel.addProcessor(processor)
      if (started.get)
        processor.start()
      processor
    }

    processors ++= created
  }
  
  /**
   * Builds (but does not start) a Processor for this acceptor's endpoint.
   *
   * @param id               processor id
   * @param listenerName     listener the processor serves
   * @param securityProtocol security protocol of that listener
   * @return the new Processor
   */
  def newProcessor(id: Int, listenerName: ListenerName, securityProtocol: SecurityProtocol): Processor = {  
    // Thread name is built from the thread prefix, node id, endpoint listener/protocol and processor id
    val name = s"${threadPrefix()}-kafka-network-thread-$nodeId-${endPoint.listenerName}-${endPoint.securityProtocol}-$id"  
    new Processor(id,  
                  time,  
                  config.socketRequestMaxBytes,  
                  requestChannel,  
                  connectionQuotas,  
                  config.connectionsMaxIdleMs,  
                  config.failedAuthenticationDelayMs,  
                  listenerName,  
                  securityProtocol,  
                  config,  
                  metrics,  
                  credentialProvider,  
                  memoryPool,  
                  logContext,  
                  Processor.ConnectionQueueSize,  
                  isPrivilegedListener,  
                  apiVersionManager,  
                  name)  
  }  
}

/**
 * Removes the last `removeCount` processors from this acceptor.
 *
 * @param removeCount number of processors to remove from the end of `processors`
 */
private[network] def removeProcessors(removeCount: Int): Unit = synchronized {
  // Shutdown `removeCount` processors. Remove them from the processor list first so that no more
  // connections are assigned. Shutdown the removed processors, closing the selector and its connections.
  // The processors are then removed from `requestChannel` and any pending responses to these processors are dropped.
  val toRemove = processors.takeRight(removeCount)
  processors.remove(processors.size - removeCount, removeCount)
  toRemove.foreach(_.close())
  toRemove.foreach(processor => requestChannel.removeProcessor(processor.id))
}

Processor

Processor负责将连接上接收到的请求封装为Request并放入RequestChannel,同时负责将处理完成的Response返回给请求方。

Processor中包含三个核心数据结构(两个队列和一个Map):

  • newConnections:存储创建的新连接信息SocketChannel,接收到accept方法请求后,会将对象存储到该队列,调用configureNewConnections方法时从该队列中取出SocketChannel。
  • inflightResponses:以connectionId为键的Map,Response发送给请求方后暂存于此,待发送完成后取出并执行对应的回调处理。
  • responseQueue:存储需要进行响应的response对象。
// Bounded queue of accepted connections waiting to be registered with this processor's selector
private val newConnections = new ArrayBlockingQueue[SocketChannel](connectionQueueSize)  
// Responses that have been handed to the selector for sending, keyed by connection id
private val inflightResponses = mutable.Map[String, RequestChannel.Response]()  
// Responses waiting to be processed by this processor
private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()

Processor中的selector使用的Kafka自定义Selector:

/**
 * Creates the Kafka selector (KSelector) used by this processor.
 *
 * @param channelBuilder builder for the selector's channels; if it is Reconfigurable it is
 *                       also registered with the config
 * @return a new KSelector
 */
protected[network] def createSelector(channelBuilder: ChannelBuilder): KSelector = {  
  channelBuilder match {  
    case reconfigurable: Reconfigurable => config.addReconfigurable(reconfigurable)  
    case _ =>  
  }  
  // NOTE(review): the two boolean literals are positional flags of the KSelector constructor
  // (presumably metricsPerConnection = false, recordTimePerConnection = true) -- confirm
  // against the Selector constructor signature.
  new KSelector(  
    maxRequestSize,  
    connectionsMaxIdleMs,  
    failedAuthenticationDelayMs,  
    metrics,  
    time,  
    "socket-server",  
    metricTags,  
    false,  
    true,  
    channelBuilder,  
    memoryPool,  
    logContext)  
}

accept

首先看下上文提到的accept方法:

/**
 * Queues a newly accepted connection for this processor.
 *
 * @param socketChannel            the connection to queue
 * @param mayBlock                 if true and the queue is full, block until there is room
 * @param acceptorIdlePercentMeter meter marked with the nanoseconds spent blocked
 * @return true if the connection was queued, false if the queue was full and blocking was not allowed
 */
def accept(socketChannel: SocketChannel,
           mayBlock: Boolean,
           acceptorIdlePercentMeter: com.yammer.metrics.core.Meter): Boolean = {
  val queued =
    if (newConnections.offer(socketChannel))
      true
    else if (!mayBlock)
      false
    else {
      // Queue is full: wait for room and record how long the caller was blocked.
      val blockedSinceNs = time.nanoseconds
      newConnections.put(socketChannel)
      acceptorIdlePercentMeter.mark(time.nanoseconds() - blockedSinceNs)
      true
    }

  // Wake the processor so it picks up the new connection.
  if (queued)
    wakeup()
  queued
}

/** Wakes up this processor's selector; called after a new connection has been queued. */
def wakeup(): Unit = selector.wakeup()

主体逻辑

/**
 * Processor thread body. Until `shouldRun` is cleared, each iteration registers new
 * connections, processes queued responses, polls the selector and then handles completed
 * receives, completed sends and disconnections. `closeAll()` runs when the loop exits.
 */
override def run(): Unit = {  
  try {  
  
    // 1.1 Loop until shutdown is requested
    while (shouldRun.get()) {  
      try {  
        // 1.2 Register queued new connections with the selector
        configureNewConnections()  
        // 1.3 Process queued responses; sent responses are tracked in inflightResponses
        processNewResponses()  
        // 1.4 Poll the selector for ready I/O events
        poll()  
        // 1.5 Turn completed receives into requests and pass them to the request channel
        processCompletedReceives()  
        // 1.6 Run completion callbacks for responses that finished sending
        processCompletedSends()  
        // 1.7 Clean up after connections the selector reports as disconnected
        processDisconnected()  
        // 1.8 Close excess connections (closeExcessConnections is not shown in this excerpt)
        closeExcessConnections()  
      } catch {  
        // Report the failure and keep looping so the processor thread stays alive
        case e: Throwable => processException("Processor got uncaught exception.", e)  
      }  
    }  
  } finally {  
    debug(s"Closing selector - processor $id")  
    CoreUtils.swallow(closeAll(), this, Level.ERROR)  
  }  
}

configureNewConnections

configureNewConnections方法从newConnections队列中取出SocketChannel,并将其注册到Processor的selector上:

/**
 * Registers queued connections with this processor's selector.
 *
 * At most `connectionQueueSize` connections are registered per call. A connection that
 * fails to register is closed through `connectionQuotas` and does not count towards that limit.
 */
private def configureNewConnections(): Unit = {
  var connectionsProcessed = 0
  // True while the per-call budget is not exhausted and connections are still queued.
  def hasWork: Boolean = connectionsProcessed < connectionQueueSize && !newConnections.isEmpty

  while (hasWork) {
    val channel = newConnections.poll()
    try {
      debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
      selector.register(connectionId(channel.socket), channel)
      connectionsProcessed += 1
    } catch {
      // Every exception is caught on purpose: the socket must be closed to avoid leaking it.
      case e: Throwable =>
        val remoteAddress = channel.socket.getRemoteSocketAddress
        connectionQuotas.closeChannel(this, listenerName, channel)
        processException(s"Processor $id closed connection from $remoteAddress", e)
    }
  }
}

processNewResponses

该方法从responseQueue中取出待处理的Response,按其类型分别处理;对于需要发送的Response,会将其发送给请求方,并存储到inflightResponses中供发送完成后的回调使用。

/**
 * Drains the response queue and handles each response according to its type:
 * no-op (unmute the channel), send, close the connection, or start/end throttling.
 *
 * A failure on one response is reported through `processChannelException` and does not
 * stop the remaining responses from being processed.
 */
private def processNewResponses(): Unit = {
  var currentResponse: RequestChannel.Response = null
  // 1.1 Keep dequeuing until no response is left
  while ({currentResponse = dequeueResponse(); currentResponse != null}) {
    val channelId = currentResponse.request.context.connectionId
    try {
      currentResponse match {
        case response: NoOpResponse =>
          // There is no response to send to the client, we need to read more pipelined requests
          // that are sitting in the server's socket buffer
          updateRequestMetrics(response)
          trace(s"Socket server received empty response to send, registering for read: $response")
          tryUnmuteChannel(channelId)

        case response: SendResponse =>
          sendResponse(response, response.responseSend)
        case response: CloseConnectionResponse =>
          updateRequestMetrics(response)
          trace("Closing socket connection actively according to the response code.")
          close(channelId)
        case _: StartThrottlingResponse =>
          handleChannelMuteEvent(channelId, ChannelMuteEvent.THROTTLE_STARTED)
        case _: EndThrottlingResponse =>
          handleChannelMuteEvent(channelId, ChannelMuteEvent.THROTTLE_ENDED)
          tryUnmuteChannel(channelId)
        case _ =>
          throw new IllegalArgumentException(s"Unknown response type: ${currentResponse.getClass}")
      }
    } catch {
      case e: Throwable =>
        processChannelException(channelId, s"Exception while processing response for $channelId", e)
    }
  }
}

返回Response逻辑:

/**
 * Sends `responseSend` on the connection the response belongs to and tracks the response
 * in `inflightResponses` until the send completes.
 *
 * @param response     the response being sent; supplies the connection id
 * @param responseSend the payload handed to the selector
 */
protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send): Unit = {
  val connectionId = response.request.context.connectionId
  trace(s"Socket server received response to send to $connectionId, registering for write and sending data: $response")

  // No open channel for this id: warn and still update the request metrics.
  val noOpenChannel = channel(connectionId).isEmpty
  if (noOpenChannel) {
    warn(s"Attempting to send response via channel for which there is no open connection, connection id $connectionId")
    response.request.updateRequestMetrics(0L, response)
  }

  // The send is issued for open and for closing channels alike.
  openOrClosingChannel(connectionId).foreach { _ =>
    selector.send(new NetworkSend(connectionId, responseSend))
    inflightResponses(connectionId) = response
  }
}

poll

调用poll方法检查就绪事件:

/**
 * Polls the selector for ready I/O events.
 *
 * Waits up to 300 ms when no new connections are queued; otherwise returns immediately so
 * that queued connections are registered without delay.
 */
private def poll(): Unit = {
  val pollTimeout = if (newConnections.isEmpty) 300 else 0
  try selector.poll(pollTimeout)
  catch {
    case e @ (_: IllegalStateException | _: IOException) =>
      // The exception is not re-thrown, so whatever this poll completed is still processed
      // by the caller; log it so the failure is not silently lost.
      error(s"Processor $id poll failed", e)
  }
}

processCompletedReceives

processCompletedReceives负责接收待处理消息,并将其转发到RequestChannel。

/**
 * Turns every completed receive into a request and forwards it to the request channel.
 *
 * For each receive: a SASL handshake may start re-authentication instead; a channel whose
 * authentication session has expired is closed; otherwise a Request is built, sent to
 * `requestChannel`, and the connection is muted. The selector's completed receives are
 * cleared at the end.
 */
private def processCompletedReceives(): Unit = {
  // 1.1 Handle the completed receives one by one
  selector.completedReceives.forEach { receive =>
    try {
      // 1.2 Look up the (open or closing) channel the receive came from
      openOrClosingChannel(receive.source) match {
        case Some(channel) =>
          val header = parseRequestHeader(receive.payload)
          // 1.3 A SASL handshake may begin server-side re-authentication
          if (header.apiKey == ApiKeys.SASL_HANDSHAKE && channel.maybeBeginServerReauthentication(receive,
            () => time.nanoseconds()))
            trace(s"Begin re-authentication: $channel")
          else {
            // 1.4 Check whether the authentication session has expired
            val nowNanos = time.nanoseconds()
            if (channel.serverAuthenticationSessionExpired(nowNanos)) {
              // be sure to decrease connection count and drop any in-flight responses
              debug(s"Disconnecting expired channel: $channel : $header")
              close(channel.id)
              expiredConnectionsKilledCount.record(null, 1, 0)
            } else {
              val connectionId = receive.source
              val context = new RequestContext(header, connectionId, channel.socketAddress, Optional.of(channel.socketPort()),
                channel.principal, listenerName, securityProtocol, channel.channelMetadataRegistry.clientInformation,
                isPrivilegedListener, channel.principalSerde)

              val req = new RequestChannel.Request(processor = id, context = context,
                startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics, None)

              // KIP-511: ApiVersionsRequest is intercepted here to catch the client software name
              // and version. It is done here to avoid wiring things up to the api layer.
              if (header.apiKey == ApiKeys.API_VERSIONS) {
                val apiVersionsRequest = req.body[ApiVersionsRequest]
                if (apiVersionsRequest.isValid) {
                  channel.channelMetadataRegistry.registerClientInformation(new ClientInformation(
                    apiVersionsRequest.data.clientSoftwareName,
                    apiVersionsRequest.data.clientSoftwareVersion))
                }
              }
              // 1.5 Hand the request to the request channel, then mute the connection
              requestChannel.sendRequest(req)
              selector.mute(connectionId)
              handleChannelMuteEvent(connectionId, ChannelMuteEvent.REQUEST_RECEIVED)
            }
          }
        case None =>
          // This should never happen since completed receives are processed immediately after `poll()`
          throw new IllegalStateException(s"Channel ${receive.source} removed from selector before processing completed receive")
      }
    } catch {
      // note that even though we got an exception, we can assume that receive.source is valid.
      // Issues with constructing a valid receive object were handled earlier
      case e: Throwable =>
        processChannelException(receive.source, s"Exception while processing request from ${receive.source}", e)
    }
  }
  selector.clearCompletedReceives()
}

processCompletedSends

从inflightResponses取出Response执行对应的回调逻辑:

/**
 * Handles every send the selector reports as completed: removes the matching response from
 * `inflightResponses`, runs its completion callback, updates request metrics and tries to
 * unmute the channel. The selector's completed sends are cleared at the end.
 */
private def processCompletedSends(): Unit = {
  selector.completedSends.forEach { send =>
    try {
      // A completed send must have a matching in-flight response
      val response = inflightResponses.remove(send.destinationId).getOrElse {
        throw new IllegalStateException(s"Send for ${send.destinationId} completed, but not in `inflightResponses`")
      }

      // Invoke send completion callback, and then update request metrics since there might be some
      // request metrics got updated during callback
      response.onComplete.foreach(onComplete => onComplete(send))
      updateRequestMetrics(response)

      // Try unmuting the channel. If there was no quota violation and the channel has not been throttled,
      // it will be unmuted immediately. If the channel has been throttled, it will unmuted only if the throttling
      // delay has already passed by now.
      handleChannelMuteEvent(send.destinationId, ChannelMuteEvent.RESPONSE_SENT)
      tryUnmuteChannel(send.destinationId)
    } catch {
      case e: Throwable => processChannelException(send.destinationId,
        s"Exception while processing completed send to ${send.destinationId}", e)
    }
  }
  selector.clearCompletedSends()
}

processDisconnected

processDisconnected用于处理已断开连接:

/**
 * Cleans up after each connection the selector reports as disconnected: drops its in-flight
 * response (updating request metrics for it) and decrements the connection quota for the
 * remote host.
 */
private def processDisconnected(): Unit = {
  selector.disconnected.keySet.forEach { connectionId =>
    try {
      val parsedId = ConnectionId.fromString(connectionId).getOrElse {
        throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
      }
      val remoteHost = parsedId.remoteHost
      inflightResponses.remove(connectionId).foreach(response => updateRequestMetrics(response))
      // The selector already closed the channel; only the quota bookkeeping is left to do.
      val remoteAddress = InetAddress.getByName(remoteHost)
      connectionQuotas.dec(listenerName, remoteAddress)
    } catch {
      case e: Throwable => processException(s"Exception while processing disconnection of $connectionId", e)
    }
  }
}

RequestChannel

RequestChannel负责接收Processor传递过来的Request请求,并传递给KafkaRequestHandler进行处理,核心属性如下:

// Bounded queue of requests waiting to be picked up via receiveRequest
private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)  
// Processors registered with this channel, keyed by an Int (presumably the processor id -- confirm)
private val processors = new ConcurrentHashMap[Int, Processor]()  
  
// Metric names for the queue sizes, prefixed with this channel's metric name prefix
private val requestQueueSizeMetricName = metricNamePrefix.concat(RequestQueueSizeMetric)  
private val responseQueueSizeMetricName = metricNamePrefix.concat(ResponseQueueSizeMetric)  
// Bounded queue of callback requests; receiveRequest checks it before the request queue
private val callbackQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
/**
 * Adds a request to the request queue. Uses the blocking `put`, so the caller waits while
 * the bounded queue is full.
 *
 * @param request the request to enqueue
 */
def sendRequest(request: RequestChannel.Request): Unit = {  
  requestQueue.put(request)  
}

/**
 * Returns the next request to handle, giving queued callback requests priority.
 *
 * @param timeout maximum time in milliseconds to wait on the request queue
 * @return a pending callback request if there is one; otherwise the next request from the
 *         request queue, or null if none arrives in time. A WakeupRequest is replaced by
 *         the head of the callback queue, which may itself be null.
 */
def receiveRequest(timeout: Long): RequestChannel.BaseRequest = {
  Option(callbackQueue.poll()).getOrElse {
    requestQueue.poll(timeout, TimeUnit.MILLISECONDS) match {
      // A wakeup only signals that a callback was queued; return that callback instead.
      case WakeupRequest => callbackQueue.poll()
      case other => other
    }
  }
}

KafkaRequestHandlerPool

KafkaRequestHandlerPool是请求I/O处理线程池,负责创建、维护、销毁KafkaRequestHandler,KafkaRequestHandler作为请求I/O处理线程类,负责从SocketServer的RequestChannel的请求队列中获取请求对象,并处理。

/**
 * Pool of KafkaRequestHandler threads. `numThreads` handlers are created as part of
 * construction, via `createHandler`.
 *
 * @param brokerId                        id used in the log prefix and passed to each handler
 * @param requestChannel                  channel the handlers are created with
 * @param apis                            request handler passed to each KafkaRequestHandler
 * @param time                            clock passed to each handler
 * @param numThreads                      initial number of handler threads
 * @param requestHandlerAvgIdleMetricName name of the aggregate idle meter
 * @param logAndThreadNamePrefix          prefix used in the log identifier (and, per its name, thread names)
 * @param nodeName                        node label passed to each handler; defaults to "broker"
 */
class KafkaRequestHandlerPool(  
  val brokerId: Int,  
  val requestChannel: RequestChannel,  
  val apis: ApiRequestHandler,  
  time: Time,  
  numThreads: Int,  
  requestHandlerAvgIdleMetricName: String,  
  logAndThreadNamePrefix : String,  
  nodeName: String = "broker"  
) extends Logging {  
  private val metricsGroup = new KafkaMetricsGroup(this.getClass)  
  
  // Current pool size; starts at numThreads
  val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)  
  /* a meter to track the average free capacity of the request handlers */  
  private val aggregateIdleMeter = metricsGroup.newMeter(requestHandlerAvgIdleMetricName, "percent", TimeUnit.NANOSECONDS)  
  
  this.logIdent = "[" + logAndThreadNamePrefix + " Kafka Request Handler on Broker " + brokerId + "], "  
  // Handlers in the pool
  val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)  
  // Create the initial handlers, with ids 0 until numThreads
  for (i <- 0 until numThreads) {  
    createHandler(i)  
  }
}

初始化步骤位于:kafka.server.BrokerServer#startup / kafka.server.KafkaServer#startup

// Data-plane handler pool: bound to the SocketServer's data-plane request channel and sized
// by config.numIoThreads.
dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId,  
  socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,  
  config.numIoThreads, s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent",  
  DataPlaneAcceptor.ThreadPrefix)

可以看出KafkaRequestHandlerPool中的I/O处理线程数量,是由配置项num.io.threads确定。

线程池管理

创建线程:

/**
 * Creates a request handler with the given id, appends it to `runnables`, and starts a
 * daemon thread running the handler stored at index `id`.
 *
 * @param id handler id; also the index used to look the handler up in `runnables`
 */
def createHandler(id: Int): Unit = synchronized {
  val handler = new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time, nodeName)
  runnables += handler

  val threadName = logAndThreadNamePrefix + "-kafka-request-handler-" + id
  val handlerThread = KafkaThread.daemon(threadName, runnables(id))
  handlerThread.start()
}

线程池扩缩容:

/**
 * Grows or shrinks the handler pool to `newSize`.
 *
 * Growing creates handlers with ids from the current size up to `newSize - 1`; shrinking
 * removes handlers from the end of `runnables` and stops them.
 *
 * @param newSize the desired number of handler threads
 */
def resizeThreadPool(newSize: Int): Unit = synchronized {
  val currentSize = threadPoolSize.get
  info(s"Resizing request handler thread pool size from $currentSize to $newSize")

  if (newSize > currentSize) {
    (currentSize until newSize).foreach(id => createHandler(id))
  } else if (newSize < currentSize) {
    // Walk backwards from the last handler down to index newSize.
    (currentSize - 1 to newSize by -1).foreach { index =>
      runnables.remove(index).stop()
    }
  }

  threadPoolSize.set(newSize)
}

shutdown:

/**
 * Shuts down every handler in the pool: first initiates shutdown on all of them, then
 * waits for each one to finish.
 */
def shutdown(): Unit = synchronized {
  info("shutting down")
  // Signal all handlers before waiting on any, so they can wind down together.
  runnables.foreach(_.initiateShutdown())
  runnables.foreach(_.awaitShutdown())
  info("shut down completely")
}

KafkaRequestHandler

KafkaRequestHandler作为Runnable对象,内部属性如下:

  • threadRequestChannel:存储当前绑定的RequestChannel
  • threadCurrentRequest:存储当前循环处理的Request信息
/**
 * Runnable executed by a request handler thread.
 *
 * @param id                  handler id, used in the log prefix
 * @param brokerId            broker id, used in the log prefix
 * @param aggregateIdleMeter  meter shared by the handlers of a pool
 * @param totalHandlerThreads current number of handler threads in the pool
 * @param requestChannel      channel this handler is bound to
 * @param apis                handler for the requests this thread picks up
 * @param time                clock
 * @param nodeName            node label used (capitalized) in the log prefix; defaults to "broker"
 */
class KafkaRequestHandler(  
  id: Int,  
  brokerId: Int,  
  val aggregateIdleMeter: Meter,  
  val totalHandlerThreads: AtomicInteger,  
  val requestChannel: RequestChannel,  
  apis: ApiRequestHandler,  
  time: Time,  
  nodeName: String = "broker"  
) extends Runnable with Logging {  
  this.logIdent = s"[Kafka Request Handler $id on ${nodeName.capitalize} $brokerId], "  
  // Latch with a count of one; presumably released when shutdown completes -- confirm
  private val shutdownComplete = new CountDownLatch(1)  
  private val requestLocal = RequestLocal.withThreadConfinedCaching  
  // Stop flag; volatile so that a write from another thread is visible to the handler thread
  @volatile private var stopped = false

}

/** Thread-local state shared by the request handler threads. */
object KafkaRequestHandler {  
  // Support for scheduling callbacks on a request thread.  
  // Request channel bound to the current handler thread
  private val threadRequestChannel = new ThreadLocal[RequestChannel]  
  // Request the current handler thread is processing, if any
  private val threadCurrentRequest = new ThreadLocal[RequestChannel.Request]
}

主体逻辑

从run方法中可以看出,处理的请求分为三类:shutdown请求、callback请求、普通请求。普通请求通过KafkaApis.handle方法处理。

/**
 * Handler thread body: binds the request channel to this thread, then loops until stopped,
 * taking requests from the channel and dispatching them by type (shutdown, callback,
 * regular request, wakeup). Shutdown is completed when the loop exits.
 */
def run(): Unit = {
  // 1.1 Bind the request channel to the current thread
  threadRequestChannel.set(requestChannel)

  // 1.2 Keep handling requests until this handler is stopped
  while (!stopped) {
    // A single meter tracks the aggregate idle time of the whole pool, so each recorded
    // idle time is divided by the number of handler threads.
    val startSelectTime = time.nanoseconds

    // 1.3 Wait up to 300 ms for the next request
    val req = requestChannel.receiveRequest(300)
    val endTime = time.nanoseconds
    val idleTime = endTime - startSelectTime
    aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get)

    req match {

      // 2.1 Shutdown request: finish shutting down and leave the loop
      case RequestChannel.ShutdownRequest =>
        debug(s"Kafka request handler $id on broker $brokerId received shut down command")
        completeShutdown()
        return

      // 2.2 Callback request: run the callback on behalf of the original request
      case callback: RequestChannel.CallbackRequest =>
        val originalRequest = callback.originalRequest
        try {

          // If a callback already ran for this request, reset the times and subtract the
          // earlier callback time from the new dequeue time; otherwise set the dequeue time to now.
          if (originalRequest.callbackRequestDequeueTimeNanos.isDefined) {
            val prevCallbacksTimeNanos = originalRequest.callbackRequestCompleteTimeNanos.getOrElse(0L) - originalRequest.callbackRequestDequeueTimeNanos.getOrElse(0L)
            originalRequest.callbackRequestCompleteTimeNanos = None
            originalRequest.callbackRequestDequeueTimeNanos = Some(time.nanoseconds() - prevCallbacksTimeNanos)
          } else {
            originalRequest.callbackRequestDequeueTimeNanos = Some(time.nanoseconds())
          }

          threadCurrentRequest.set(originalRequest)
          callback.fun(requestLocal)
        } catch {
          case e: FatalExitError =>
            completeShutdown()
            Exit.exit(e.statusCode)
          case e: Throwable => error("Exception when handling request", e)
        } finally {
          // When handling requests, we try to complete actions after, so we should try to do so here as well.
          apis.tryCompleteActions()
          if (originalRequest.callbackRequestCompleteTimeNanos.isEmpty)
            originalRequest.callbackRequestCompleteTimeNanos = Some(time.nanoseconds())
          threadCurrentRequest.remove()
        }

      // 2.3 Regular request
      case request: RequestChannel.Request =>
        try {
          // 2.4 Record the dequeue time and let apis handle the request
          request.requestDequeueTimeNanos = endTime
          trace(s"Kafka request handler $id on broker $brokerId handling request $request")
          threadCurrentRequest.set(request)
          apis.handle(request, requestLocal)
        } catch {
          case e: FatalExitError =>
            completeShutdown()
            Exit.exit(e.statusCode)
          case e: Throwable => error("Exception when handling request", e)
        } finally {
          // Clear the thread-local request and release the request buffer
          threadCurrentRequest.remove()
          request.releaseBuffer()
        }

      case RequestChannel.WakeupRequest =>
        // We should handle this in receiveRequest by polling callbackQueue.
        warn("Received a wakeup request outside of typical usage.")

      case null => // nothing arrived within the timeout; continue
    }
  }
  // The loop ended because the handler was stopped
  completeShutdown()
}

Reference

https://www.automq.com/blog/understand-kafka-network-communication-and-thread-model