Intro

上文介绍了Kafka日志加载的基本流程,本文主要分析Kafka日志的读写流程。

日志写入的场景主要有以下几种:

  1. 生产者向Leader replica写入消息(包含事务消息)
  2. Follower replica拉取消息并写入本地日志
  3. Coordinator写入消费者组元数据与位移提交(即内部主题__consumer_offsets)

日志写入

从KafkaApis入口类中可以看到produce的处理动作:

case ApiKeys.PRODUCE => handleProduceRequest(request, requestLocal)

这里重点关注日志的写入逻辑,handleProduceRequest()中会调用replicaManager执行append:

// call the replica manager to append messages to the replicas  
replicaManager.handleProduceAppend(  
  timeout = produceRequest.timeout.toLong,  
  requiredAcks = produceRequest.acks,  
  internalTopicsAllowed = internalTopicsAllowed,  
  transactionalId = produceRequest.transactionalId,  
  entriesPerPartition = authorizedRequestInfo,  
  responseCallback = sendResponseCallback,  
  recordValidationStatsCallback = processingStatsCallback,  
  requestLocal = requestLocal,  
  transactionSupportedOperation = transactionSupportedOperation)

appendRecords

在ReplicaManager中会调用appendRecords()将消息追加到对应partition的leader replica中,并根据acks配置等待其他replica完成同步。

/**
 * Handles the produce request by starting any transactional verification before appending.
 *
 * @param timeout                       maximum time we will wait to append before returning
 * @param requiredAcks                  number of replicas who must acknowledge the append before sending the response
 * @param internalTopicsAllowed         boolean indicating whether internal topics can be appended to
 * @param transactionalId               the transactional ID for the produce request or null if there is none.
 * @param entriesPerPartition           the records per partition to be appended
 * @param responseCallback              callback for sending the response
 * @param recordValidationStatsCallback callback for updating stats on record conversions
 * @param requestLocal                  container for the stateful instances scoped to this request -- this must correspond to the
 *                                      thread calling this method
 * @param actionQueue                   the action queue to use. ReplicaManager#defaultActionQueue is used by default.
 * @param transactionSupportedOperation determines the supported Operation based on the client's Request api version
 *
 * The responseCallback is wrapped so that it is scheduled on a request handler thread. There, it should be called with
 * that request handler thread's thread local and not the one supplied to this method.
 */
 def handleProduceAppend(timeout: Long,  
                        requiredAcks: Short,  
                        internalTopicsAllowed: Boolean,  
                        transactionalId: String,  
                        entriesPerPartition: Map[TopicPartition, MemoryRecords],  
                        responseCallback: Map[TopicPartition, PartitionResponse] => Unit,  
                        recordValidationStatsCallback: Map[TopicPartition, RecordValidationStats] => Unit = _ => (),  
                        requestLocal: RequestLocal = RequestLocal.NoCaching,  
                        actionQueue: ActionQueue = this.defaultActionQueue,  
                        transactionSupportedOperation: TransactionSupportedOperation): Unit = {
	
	// ……(此处省略事务校验等前置逻辑:校验通过后,以处理后的参数调用appendRecords)
	appendRecords( 
	  timeout = timeout,  
	  requiredAcks = requiredAcks,  
	  internalTopicsAllowed = internalTopicsAllowed,  
	  origin = AppendOrigin.CLIENT,  
	  entriesPerPartition = entriesWithoutErrorsPerPartition,  
	  responseCallback = newResponseCallback,  
	  recordValidationStatsCallback = recordValidationStatsCallback,  
	  requestLocal = newRequestLocal,  
	  actionQueue = actionQueue,  
	  verificationGuards = verificationGuards  
		)
}

传入的参数较多,重要参数的含义如下(producer端对应的配置项可参考列表后的示例):

  • timeout:producer端的request.timeout.ms参数
  • requiredAcks:producer端的acks参数
  • internalTopicsAllowed:是否允许向内部主题写入消息
  • origin:写入方来源,共有四类:REPLICATION(来自leader replica同步的数据)、COORDINATOR(coordinator写入的数据)、CLIENT(客户端写入的数据)、RAFT_LEADER(来自raft leader写入的数据)
  • entriesPerPartition:需要写入的消息分组,结构为:Map[TopicPartition, MemoryRecords]
  • responseCallback:写入成功后的callback
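
下面给出一个示意性的producer配置片段(Scala编写,bootstrap地址、主题名等均为假设),用来说明timeout与requiredAcks在客户端侧对应的配置项:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // 示例地址
// acks=-1(等价于all):对应broker端appendRecords的requiredAcks参数
props.put(ProducerConfig.ACKS_CONFIG, "-1")
// request.timeout.ms:对应broker端appendRecords的timeout参数
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

val producer = new KafkaProducer[String, String](props)
producer.send(new ProducerRecord[String, String]("demo-topic", "key", "value"))
producer.close()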

在appendRecords()中会调用appendToLocalLog()将消息写入local log。

/**
 * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
 * the callback function will be triggered either when timeout or the required acks are satisfied;
 * if the callback function itself is already synchronized on some object then pass this object to avoid deadlock.
 *
 * Noted that all pending delayed check operations are stored in a queue. All callers to ReplicaManager.appendRecords()
 * are expected to call ActionQueue.tryCompleteActions for all affected partitions, without holding any conflicting
 * locks.
 *
 * @param timeout                       maximum time we will wait to append before returning
 * @param requiredAcks                  number of replicas who must acknowledge the append before sending the response
 * @param internalTopicsAllowed         boolean indicating whether internal topics can be appended to
 * @param origin                        source of the append request (ie, client, replication, coordinator)
 * @param entriesPerPartition           the records per partition to be appended
 * @param responseCallback              callback for sending the response
 * @param delayedProduceLock            lock for the delayed actions
 * @param recordValidationStatsCallback callback for updating stats on record conversions
 * @param requestLocal                  container for the stateful instances scoped to this request -- this must correspond to the
 *                                      thread calling this method
 * @param actionQueue                   the action queue to use. ReplicaManager#defaultActionQueue is used by default.
 * @param verificationGuards            the mapping from topic partition to verification guards if transaction verification is used
 */
 def appendRecords(timeout: Long,  
                  requiredAcks: Short,  
                  internalTopicsAllowed: Boolean,  
                  origin: AppendOrigin,  
                  entriesPerPartition: Map[TopicPartition, MemoryRecords],  
                  responseCallback: Map[TopicPartition, PartitionResponse] => Unit,  
                  delayedProduceLock: Option[Lock] = None,  
                  recordValidationStatsCallback: Map[TopicPartition, RecordValidationStats] => Unit = _ => (),  
                  requestLocal: RequestLocal = RequestLocal.NoCaching,  
                  actionQueue: ActionQueue = this.defaultActionQueue,  
                  verificationGuards: Map[TopicPartition, VerificationGuard] = Map.empty): Unit = {  
  //1.1 检查ack参数是否合法  
  if (!isValidRequiredAcks(requiredAcks)) {  
    sendInvalidRequiredAcksResponse(entriesPerPartition, responseCallback)  
    return  
  }  
  
  //1.2 调用appendToLocalLog写入local log  
  val sTime = time.milliseconds  
  val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,  
    origin, entriesPerPartition, requiredAcks, requestLocal, verificationGuards.toMap)  
  debug("Produce to local log in %d ms".format(time.milliseconds - sTime))  
  
  //1.3 构建ProducePartitionStatus  
  val produceStatus = buildProducePartitionStatus(localProduceResults)  
  
  //1.4 将写入完成的响应结果localProduceResults,加到actionQueue,如果leader high watermark发生变更,会触发一些延迟操作  
  addCompletePurgatoryAction(actionQueue, localProduceResults)  
  recordValidationStatsCallback(localProduceResults.map { case (k, v) =>  
    k -> v.info.recordValidationStats  
  })  
  
  //1.5 检查是否需要执行延迟操作  
  maybeAddDelayedProduce(  
    requiredAcks,  
    delayedProduceLock,  
    timeout,  
    entriesPerPartition,  
    localProduceResults,  
    produceStatus,  
    responseCallback  
  )  
}
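
其中1.1步的acks合法性校验逻辑很简单,合法取值只有-1、0、1,实现大致如下(依据社区源码整理,细节可能随版本略有差异):

private def isValidRequiredAcks(requiredAcks: Short): Boolean = {
  requiredAcks == -1 || requiredAcks == 1 || requiredAcks == 0
}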

appendToLocalLog

该方法中会获取到消息对应的Partition对象,并调用appendRecordsToLeader()完成写入。

private def appendToLocalLog(internalTopicsAllowed: Boolean,  
                             origin: AppendOrigin,  
                             entriesPerPartition: Map[TopicPartition, MemoryRecords],  
                             requiredAcks: Short,  
                             requestLocal: RequestLocal,  
                             verificationGuards: Map[TopicPartition, VerificationGuard]): Map[TopicPartition, LogAppendResult] = {  
  
  entriesPerPartition.map { case (topicPartition, records) =>  
    brokerTopicStats.topicStats(topicPartition.topic).totalProduceRequestRate.mark()  
    brokerTopicStats.allTopicsStats.totalProduceRequestRate.mark()  
  
    // reject appending to internal topics if it is not allowed  
    if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {  
      (topicPartition, LogAppendResult(  
        LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,  
        Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}")),  
        hasCustomErrorMessage = false))  
    } else {  
      try {
        //1.1 获取partition对象
        val partition = getPartitionOrException(topicPartition)
        //1.2 向该分区对象写入消息集合
        val info = partition.appendRecordsToLeader(records, origin, requiredAcks, requestLocal,
          verificationGuards.getOrElse(topicPartition, VerificationGuard.SENTINEL))
        val numAppendedMessages = info.numMessages

        //1.3 返回写入结果
        // update stats for successfully appended bytes and messages as bytesInRate and messageInRate
        brokerTopicStats.topicStats(topicPartition.topic).bytesInRate.mark(records.sizeInBytes)
        brokerTopicStats.allTopicsStats.bytesInRate.mark(records.sizeInBytes)
        brokerTopicStats.topicStats(topicPartition.topic).messagesInRate.mark(numAppendedMessages)
        brokerTopicStats.allTopicsStats.messagesInRate.mark(numAppendedMessages)

        (topicPartition, LogAppendResult(info, exception = None, hasCustomErrorMessage = false))
      } catch {
        //1.4 写入异常时包装为带异常信息的LogAppendResult返回(源码中针对不同异常类型有更细致的分支,此处省略)
        case t: Throwable =>
          (topicPartition, LogAppendResult(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, Some(t), hasCustomErrorMessage = false))
      }
    }
  }  
}
def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int,  
                          requestLocal: RequestLocal, verificationGuard: VerificationGuard = VerificationGuard.SENTINEL): LogAppendInfo = {  
  //1.1 加读锁  
  val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {  
    // 1.2 判断是否为leader副本  
    leaderLogIfLocal match {  
      case Some(leaderLog) =>  
        //1.3 获取最小ISR  
        val minIsr = effectiveMinIsr(leaderLog)  
        val inSyncSize = partitionState.isr.size  
  
        // Avoid writing to leader if there are not enough insync replicas to make it safe  
        if (inSyncSize < minIsr && requiredAcks == -1) {  
          throw new NotEnoughReplicasException(s"The size of the current ISR ${partitionState.isr} " +  
            s"is insufficient to satisfy the min.isr requirement of $minIsr for partition $topicPartition")  
        }  
  
        // 1.4 调用UnifiedLog的appendAsLeader方法进行写入  
        val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, origin,  
          interBrokerProtocolVersion, requestLocal, verificationGuard)  
  
        // we may need to increment high watermark since ISR could be down to 1  
        (info, maybeIncrementLeaderHW(leaderLog))  
  
      case None =>  
        throw new NotLeaderOrFollowerException("Leader not local for partition %s on broker %d"  
          .format(topicPartition, localBrokerId))  
    }  
  }  
  
  info.copy(if (leaderHWIncremented) LeaderHwChange.INCREASED else LeaderHwChange.SAME)  
}
def appendAsLeader(records: MemoryRecords,  
                   leaderEpoch: Int,  
                   origin: AppendOrigin = AppendOrigin.CLIENT,  
                   interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latestProduction,  
                   requestLocal: RequestLocal = RequestLocal.NoCaching,  
                   verificationGuard: VerificationGuard = VerificationGuard.SENTINEL): LogAppendInfo = {  
  val validateAndAssignOffsets = origin != AppendOrigin.RAFT_LEADER  
  append(records, origin, interBrokerProtocolVersion, validateAndAssignOffsets, leaderEpoch, Some(requestLocal), verificationGuard, ignoreRecordSize = false)  
}

先看下UnifiedLog的append方法参数:

  • records:待插入的消息记录
  • origin:上文提到的消息插入来源
  • interBrokerProtocolVersion:broker间协议版本(MetadataVersion)
  • validateAndAssignOffsets:是否需要校验并给消息分配offset
  • leaderEpoch:leader选举版本
  • requestLocal:用于存储请求范围内有状态实例的容器
  • ignoreRecordSize:是否跳过消息大小校验

方法内部首先做数据校验,分配offset,再通过LocalLog进行写入。

private def append(records: MemoryRecords,  
                   origin: AppendOrigin,  
                   interBrokerProtocolVersion: MetadataVersion,  
                   validateAndAssignOffsets: Boolean,  
                   leaderEpoch: Int,  
                   requestLocal: Option[RequestLocal],  
                   verificationGuard: VerificationGuard,  
                   ignoreRecordSize: Boolean): LogAppendInfo = {  
  // We want to ensure the partition metadata file is written to the log dir before any log data is written to disk.  
  // This will ensure that any log data can be recovered with the correct topic ID in the case of failure.
  //1.1 将metadata文件刷入磁盘
  maybeFlushMetadataFile()  
  
  //1.2 校验消息  
  val appendInfo = analyzeAndValidateRecords(records, origin, ignoreRecordSize, !validateAndAssignOffsets, leaderEpoch)  
  
  // return if we have no valid messages or if this is a duplicate of the last appended entry  
  if (appendInfo.validBytes <= 0) appendInfo  
  else {  
  
    //1.3 清除消息中的不合法字节  
    // trim any invalid bytes or partial messages before appending it to the on-disk log  
    var validRecords = trimInvalidBytes(records, appendInfo)  
  
  
    //1.4 设置同步,执行写入  
    // they are valid, insert them in the log  
    lock synchronized {  
      maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {  
        localLog.checkIfMemoryMappedBufferClosed()  
  
        //1.5 检查是否需要手动分配offset  
        if (validateAndAssignOffsets) {  
          // assign offsets to the message set  
          val offset = PrimitiveRef.ofLong(localLog.logEndOffset)  
          appendInfo.setFirstOffset(offset.value)  
          val validateAndOffsetAssignResult = try {  
            val targetCompression = BrokerCompressionType.targetCompression(config.compression, appendInfo.sourceCompression())  
            val validator = new LogValidator(validRecords,  
              topicPartition,  
              time,  
              appendInfo.sourceCompression,  
              targetCompression,  
              config.compact,  
              config.recordVersion.value,  
              config.messageTimestampType,  
              config.messageTimestampBeforeMaxMs,  
              config.messageTimestampAfterMaxMs,  
              leaderEpoch,  
              origin,  
              interBrokerProtocolVersion  
            )  
            validator.validateMessagesAndAssignOffsets(offset,  
              validatorMetricsRecorder,  
              requestLocal.getOrElse(throw new IllegalArgumentException(  
                "requestLocal should be defined if assignOffsets is true")  
              ).bufferSupplier  
            )  
          } catch {  
            case e: IOException =>  
              throw new KafkaException(s"Error validating messages while appending to log $name", e)  
          }  
  
          validRecords = validateAndOffsetAssignResult.validatedRecords  
          appendInfo.setMaxTimestamp(validateAndOffsetAssignResult.maxTimestampMs)  
          appendInfo.setShallowOffsetOfMaxTimestamp(validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp)  
          appendInfo.setLastOffset(offset.value - 1)  
          appendInfo.setRecordValidationStats(validateAndOffsetAssignResult.recordValidationStats)  
          if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)  
            appendInfo.setLogAppendTime(validateAndOffsetAssignResult.logAppendTimeMs)  
  
          // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message  
          // format conversion)
          if (!ignoreRecordSize && validateAndOffsetAssignResult.messageSizeMaybeChanged) {
            validRecords.batches.forEach { batch =>  
              if (batch.sizeInBytes > config.maxMessageSize) {  
                // we record the original message set size instead of the trimmed size  
                // to be consistent with pre-compression bytesRejectedRate recording
                brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
                brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)  
                throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to" +  
                  s"partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.")  
              }  
            }  
          }  
        } else {  
          // 从appendInfo获取offset  
          // we are taking the offsets we are given
          if (appendInfo.firstOrLastOffsetOfFirstBatch < localLog.logEndOffset) {
            // we may still be able to recover if the log is empty  
            // one example: fetching from log start offset on the leader which is not batch aligned,
            // which may happen as a result of AdminClient#deleteRecords()
            val hasFirstOffset = appendInfo.firstOffset != UnifiedLog.UnknownOffset
            val firstOffset = if (hasFirstOffset) appendInfo.firstOffset else records.batches.iterator().next().baseOffset()  
  
            val firstOrLast = if (hasFirstOffset) "First offset" else "Last offset of the first batch"  
            throw new UnexpectedAppendOffsetException(  
              s"Unexpected offset in append to $topicPartition. $firstOrLast " +  
                s"${appendInfo.firstOrLastOffsetOfFirstBatch} is less than the next offset ${localLog.logEndOffset}. " +  
                s"First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in" +  
                s" append: ${appendInfo.lastOffset}. Log start offset = $logStartOffset",  
              firstOffset, appendInfo.lastOffset)  
          }  
        }  
  
        // update the epoch cache with the epoch stamped onto the message by the leader  
        validRecords.batches.forEach { batch =>  
          if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {  
            maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, batch.baseOffset)  
          } else {  
            // In partial upgrade scenarios, we may get a temporary regression to the message format. In  
            // order to ensure the safety of leader election, we clear the epoch cache so that we revert
            // to truncation by high watermark after the next leader election.
            leaderEpochCache.filter(_.nonEmpty).foreach { cache =>
              warn(s"Clearing leader epoch cache after unexpected append with message format v${batch.magic}")  
              cache.clearAndFlush()  
            }  
          }  
        }  
  
        // check messages set size may be exceed config.segmentSize  
        if (validRecords.sizeInBytes > config.segmentSize) {  
          throw new RecordBatchTooLargeException(s"Message batch size is ${validRecords.sizeInBytes} bytes in append " +  
            s"to partition $topicPartition, which exceeds the maximum configured segment size of ${config.segmentSize}.")  
        }  
  
        // 2.1 检查LogSegment是否已满,并返回对应的LogSegment  
        // maybe roll the log if this segment is full
        val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
  
        val logOffsetMetadata = new LogOffsetMetadata(  
          appendInfo.firstOrLastOffsetOfFirstBatch,  
          segment.baseOffset,  
          segment.size)  
  
        //2.2 检查事务状态  
        // now that we have valid records, offsets assigned, and timestamps updated, we need to  
        // validate the idempotent/transactional state of the producers and collect some metadata
        val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(
          logOffsetMetadata, validRecords, origin, verificationGuard)  
  
        maybeDuplicate match {  
          case Some(duplicate) =>  
            appendInfo.setFirstOffset(duplicate.firstOffset)  
            appendInfo.setLastOffset(duplicate.lastOffset)  
            appendInfo.setLogAppendTime(duplicate.timestamp)  
            appendInfo.setLogStartOffset(logStartOffset)  
          case None =>  
            // Append the records, and increment the local log end offset immediately after the append because a  
            // write to the transaction index below may fail and we want to ensure that the offsets
            // of future appends still grow monotonically. The resulting transaction index inconsistency
            // will be cleaned up after the log directory is recovered. Note that the end offset of the
            // ProducerStateManager will not be updated and the last stable offset will not advance
            // if the append to the transaction index fails.
            //2.3 调用append()方法执行写入,并更新localLog.logEndOffset和highWatermark  
            localLog.append(appendInfo.lastOffset, appendInfo.maxTimestamp, appendInfo.shallowOffsetOfMaxTimestamp, validRecords)  
            updateHighWatermarkWithLogEndOffset()  
  
            // update the producer state  
            updatedProducers.values.foreach(producerAppendInfo => producerStateManager.update(producerAppendInfo))  
  
            // update the transaction index with the true last stable offset. The last offset visible  
            // to consumers using READ_COMMITTED will be limited by this value and the high watermark.
            completedTxns.foreach { completedTxn =>
              val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)  
              segment.updateTxnIndex(completedTxn, lastStableOffset)  
              producerStateManager.completeTxn(completedTxn)  
            }  
  
            // always update the last producer id map offset so that the snapshot reflects the current offset  
            // even if there isn't any idempotent data being written
            producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)
  
            // update the first unstable offset (which is used to compute LSO)  
            maybeIncrementFirstUnstableOffset()  
  
            trace(s"Appended message set with last offset: ${appendInfo.lastOffset}, " +  
              s"first offset: ${appendInfo.firstOffset}, " +  
              s"next offset: ${localLog.logEndOffset}, " +  
              s"and messages: $validRecords")  
  
            if (localLog.unflushedMessages >= config.flushInterval) flush(false)  
        }  
        appendInfo  
      }  
    }  
  }  
}
private[log] def append(lastOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = {  
  segments.activeSegment.append(lastOffset, largestTimestamp, shallowOffsetOfMaxTimestamp, records)  
  updateLogEndOffset(lastOffset + 1)  
}

LogSegment写入流程:

/**  
 * @param largestOffset 最大消息位移值
 * @param largestTimestampMs 最大消息时间戳 
 * @param shallowOffsetOfMaxTimestamp 最大时间戳对应的消息位移
 * @param records 消息体
 */
 public void append(long largestOffset,  
                   long largestTimestampMs,  
                   long shallowOffsetOfMaxTimestamp,  
                   MemoryRecords records) throws IOException {  
    //1. 判断消息是否为空  
    if (records.sizeInBytes() > 0) {  
        LOGGER.trace("Inserting {} bytes at end offset {} at position {} with largest timestamp {} at offset {}",  
            records.sizeInBytes(), largestOffset, log.sizeInBytes(), largestTimestampMs, shallowOffsetOfMaxTimestamp);  
        int physicalPosition = log.sizeInBytes();  
        if (physicalPosition == 0)  
            //1.1 如果日志体为空  
            rollingBasedTimestamp = OptionalLong.of(largestTimestampMs);  
  
        //2. 确保offset合法,规则:0 <= largestOffset - baseOffset <= Integer.MAX_VALUE  
        ensureOffsetInRange(largestOffset);  
  
        // 3. 调用FileRecords的append进行写入  
        long appendedBytes = log.append(records);  
        LOGGER.trace("Appended {} to {} at end offset {}", appendedBytes, log.file(), largestOffset);  
        // 4. 更新日志的最大时间戳  
        if (largestTimestampMs > maxTimestampSoFar()) {  
            maxTimestampAndOffsetSoFar = new TimestampOffset(largestTimestampMs, shallowOffsetOfMaxTimestamp);  
        }  
        // 4.1 判断是否需要更新索引  
        if (bytesSinceLastIndexEntry > indexIntervalBytes) {
            //4.1.1 更新offset index
            offsetIndex().append(largestOffset, physicalPosition);
            //4.1.2 更新time index
            timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar());
            //4.1.3 重置bytesSinceLastIndexEntry
            bytesSinceLastIndexEntry = 0;
        }
        //4.2 更新bytesSinceLastIndexEntry  
        bytesSinceLastIndexEntry += records.sizeInBytes();  
    }  
}

maybeAddDelayedProduce

满足以下三个条件时,不会立即响应,而是创建DelayedProduce延迟操作,等待ISR中的其他副本完成同步(判断逻辑见本段代码后的delayedProduceRequestRequired()示意):

  1. acks = -1
  2. 待写入的分区消息非空
  3. 至少有一个分区的消息成功写入本地leader副本
private def maybeAddDelayedProduce(  
  requiredAcks: Short,  
  delayedProduceLock: Option[Lock],  
  timeoutMs: Long,  
  entriesPerPartition: Map[TopicPartition, MemoryRecords],  
  initialAppendResults: Map[TopicPartition, LogAppendResult],  
  initialProduceStatus: Map[TopicPartition, ProducePartitionStatus],  
  responseCallback: Map[TopicPartition, PartitionResponse] => Unit,  
): Unit = {  
  
  //1.1 检查是否满足远程写入的要求  
  if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, initialAppendResults)) {  
    // create delayed produce operation  
    val produceMetadata = ProduceMetadata(requiredAcks, initialProduceStatus)  
    val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata, this, responseCallback, delayedProduceLock)  
  
    // create a list of (topic, partition) pairs to use as keys for this delayed produce operation  
    val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq  
  
    // try to complete the request immediately, otherwise put it into the purgatory  
    // this is because while the delayed produce operation is being created, new
    // requests may arrive and hence make this operation completable.
    delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
  } else {  
    // we can respond immediately  
    val produceResponseStatus = initialProduceStatus.map { case (k, status) => k -> status.responseStatus }  
    responseCallback(produceResponseStatus)  
  }  
}
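
delayedProduceRequestRequired()对上述三个条件的判断大致如下(依据社区源码整理,仅作示意,细节可能随版本变化):

// 1. required acks = -1
// 2. there is data to append
// 3. at least one partition append was successful (fewer errors than partitions)
private def delayedProduceRequestRequired(requiredAcks: Short,
                                          entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                          localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
  requiredAcks == -1 &&
  entriesPerPartition.nonEmpty &&
  localProduceResults.values.count(_.exception.isDefined) < entriesPerPartition.size
}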

read

先看底层LogSegment读取日志信息的read()方法,后文的读取链路最终会调用到它:

/**
 * Read a message set from this segment that contains startOffset. The message set will include
 * no more than maxSize bytes and will end before maxOffset if a maxOffset is specified.
 *
 * This method is thread-safe.
 *
 * @param startOffset 需要读取的位移开始位置
 * @param maxSize 读取的最大字节数
 * @param maxPositionOpt 能读取到的最大文件位置
 * @param minOneMessage 是否允许消息体过大时,返回的第一条消息size > maxSize
 *
 * @return The fetched data and the base offset metadata of the message batch that contains startOffset,
 *         or null if the startOffset is larger than the largest offset in this log
 */
public FetchDataInfo read(long startOffset, int maxSize, Optional<Long> maxPositionOpt, boolean minOneMessage) throws IOException {  
    if (maxSize < 0)  
        throw new IllegalArgumentException("Invalid max size " + maxSize + " for log read from segment " + log);  
  
    //1. 调用translateOffset定位到读取的起始文件位置  
    LogOffsetPosition startOffsetAndSize = translateOffset(startOffset);  
  
    // if the start position is already off the end of the log, return null  
    if (startOffsetAndSize == null)  
        return null;  
  
    int startPosition = startOffsetAndSize.position;  
    LogOffsetMetadata offsetMetadata = new LogOffsetMetadata(startOffsetAndSize.offset, this.baseOffset, startPosition);  
  
    //1.2 判断消息最大读取长度  
    int adjustedMaxSize = maxSize;  
    if (minOneMessage)  
        adjustedMaxSize = Math.max(maxSize, startOffsetAndSize.size);  
  
    // return empty records in the fetch-data-info when:  
    // 1. adjustedMaxSize is 0 (or)
    // 2. maxPosition to read is unavailable
    if (adjustedMaxSize == 0 || !maxPositionOpt.isPresent())
        return new FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY);  
  
    // calculate the length of the message set to read based on whether or not they gave us a maxOffset  
    int fetchSize = Math.min((int) (maxPositionOpt.get() - startPosition), adjustedMaxSize);  
  
    // 2. 调用log.slice读取数据  
    return new FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize),  
        adjustedMaxSize < startOffsetAndSize.size, Optional.empty());  
}

日志读取

日志Fetch动作由ReplicaManager中fetchMessages()方法来实现。

def fetchMessages(params: FetchParams,  
                  fetchInfos: Seq[(TopicIdPartition, PartitionData)],  
                  quota: ReplicaQuota,  
                  responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit): Unit = {  
  
  //1.1 调用readFromLog方法,获取消息  
  // check if this fetch request can be satisfied right away  
  val logReadResults = readFromLog(params, fetchInfos, quota, readFromPurgatory = false)  
  var bytesReadable: Long = 0  
  var errorReadingData = false  
  
  // The 1st topic-partition that has to be read from remote storage  
  var remoteFetchInfo: Optional[RemoteStorageFetchInfo] = Optional.empty()  
  
  var hasDivergingEpoch = false  
  var hasPreferredReadReplica = false  
  val logReadResultMap = new mutable.HashMap[TopicIdPartition, LogReadResult]  
  
  //2.1 更新统计指标  
  logReadResults.foreach { case (topicIdPartition, logReadResult) =>  
    brokerTopicStats.topicStats(topicIdPartition.topicPartition.topic).totalFetchRequestRate.mark()  
    brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()  
    if (logReadResult.error != Errors.NONE)  
      errorReadingData = true  
    if (!remoteFetchInfo.isPresent && logReadResult.info.delayedRemoteStorageFetch.isPresent) {  
      remoteFetchInfo = logReadResult.info.delayedRemoteStorageFetch  
    }  
    if (logReadResult.divergingEpoch.nonEmpty)  
      hasDivergingEpoch = true  
    if (logReadResult.preferredReadReplica.nonEmpty)  
      hasPreferredReadReplica = true  
    bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes  
    logReadResultMap.put(topicIdPartition, logReadResult)  
  }  
  
  //不需要远程获取,或者满足任一条件  
  // Respond immediately if no remote fetches are required and any of the below conditions is true  
  //                        1) fetch request does not want to wait
  //                        2) fetch request does not require any data
  //                        3) has enough data to respond
  //                        4) some error happens while reading data
  //                        5) we found a diverging epoch
  //                        6) has a preferred read replica
  if (!remoteFetchInfo.isPresent && (params.maxWaitMs <= 0 || fetchInfos.isEmpty || bytesReadable >= params.minBytes || errorReadingData ||
    hasDivergingEpoch || hasPreferredReadReplica)) {  
    val fetchPartitionData = logReadResults.map { case (tp, result) =>  
      val isReassignmentFetch = params.isFromFollower && isAddingReplica(tp.topicPartition, params.replicaId)  
      tp -> result.toFetchPartitionData(isReassignmentFetch)  
    }  
    responseCallback(fetchPartitionData)  
  } else {  
    //构建返回结果  
    // construct the fetch results from the read results  
    val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicIdPartition, FetchPartitionStatus)]  
    fetchInfos.foreach { case (topicIdPartition, partitionData) =>  
      logReadResultMap.get(topicIdPartition).foreach(logReadResult => {  
        val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata  
        fetchPartitionStatus += (topicIdPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))  
      })  
    }  
  
    //执行远程fetch  
    if (remoteFetchInfo.isPresent) {  
      val maybeLogReadResultWithError = processRemoteFetch(remoteFetchInfo.get(), params, responseCallback, logReadResults, fetchPartitionStatus)  
      if (maybeLogReadResultWithError.isDefined) {  
        // If there is an error in scheduling the remote fetch task, return what we currently have  
        // (the data read from local log segment for the other topic-partitions) and an error for the topic-partition
        // that we couldn't read from remote storage
        val partitionToFetchPartitionData = buildPartitionToFetchPartitionData(logReadResults, remoteFetchInfo.get().topicPartition, maybeLogReadResultWithError.get)
        responseCallback(partitionToFetchPartitionData)  
      }  
    } else {  
      // If there is not enough data to respond and there is no remote data, we will let the fetch request  
      // wait for new data.
      val delayedFetch = new DelayedFetch(
        params = params,  
        fetchPartitionStatus = fetchPartitionStatus,  
        replicaManager = this,  
        quota = quota,  
        responseCallback = responseCallback  
      )  
  
      // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation  
      val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }  
  
      // try to complete the request immediately, otherwise put it into the purgatory;  
      // this is because while the delayed fetch operation is being created, new requests
      // may arrive and hence make this operation completable.
      delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
    }  
  }  
}
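
上面用于判断能否立即响应的params.minBytes与params.maxWaitMs,在消费端分别对应fetch.min.bytes与fetch.max.wait.ms(follower拉取则对应replica.fetch.min.bytes与replica.fetch.wait.max.ms);数据不足时请求会进入delayedFetchPurgatory等待。下面是一个示意性的consumer配置片段(地址、主题名等均为假设):

import java.time.Duration
import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer

val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // 示例地址
props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group")
// fetch.min.bytes:对应params.minBytes,本地可读数据不足该值时broker会延迟响应
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024")
// fetch.max.wait.ms:对应params.maxWaitMs,延迟响应的最长等待时间
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500")
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(List("demo-topic").asJava)
val records = consumer.poll(Duration.ofMillis(1000))
records.asScala.foreach(r => println(s"offset=${r.offset}, value=${r.value}"))
consumer.close()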

readFromLog()方法中通过partition读取消息:

//调用partition的fetchRecords读取消息  
// Try the read first, this tells us whether we need all of adjustedFetchSize for this partition  
val readInfo: LogReadInfo = partition.fetchRecords(  
  fetchParams = params,  
  fetchPartitionData = fetchInfo,  
  fetchTimeMs = fetchTimeMs,  
  maxBytes = adjustedMaxBytes,  
  minOneMessage = minOneMessage,  
  updateFetchState = !readFromPurgatory)
def fetchRecords(  
  fetchParams: FetchParams,  
  fetchPartitionData: FetchRequest.PartitionData,  
  fetchTimeMs: Long,  
  maxBytes: Int,  
  minOneMessage: Boolean,  
  updateFetchState: Boolean  
): LogReadInfo = {  
  
  //定义从本地log读取数据的内部方法
  def readFromLocalLog(log: UnifiedLog): LogReadInfo = {  
    readRecords(  
      log,  
      fetchPartitionData.lastFetchedEpoch,  
      fetchPartitionData.fetchOffset,  
      fetchPartitionData.currentLeaderEpoch,  
      maxBytes,  
      fetchParams.isolation,  
      minOneMessage  
    )  
  }  
  
  //检查请求是否来自partition的follower  
  if (fetchParams.isFromFollower) {  
    //需要检查是否来自有效的follower  
    // Check that the request is from a valid replica before doing the read
    val (replica, logReadInfo) = inReadLock(leaderIsrUpdateLock) {
      val localLog = localLogWithEpochOrThrow(  
        fetchPartitionData.currentLeaderEpoch,  
        fetchParams.fetchOnlyLeader  
      )  
      val replica = followerReplicaOrThrow(  
        fetchParams.replicaId,  
        fetchPartitionData  
      )  
      val logReadInfo = readFromLocalLog(localLog)  
      (replica, logReadInfo)  
    }  
  
    if (updateFetchState && !logReadInfo.divergingEpoch.isPresent) {  
      updateFollowerFetchState(  
        replica,  
        followerFetchOffsetMetadata = logReadInfo.fetchedData.fetchOffsetMetadata,  
        followerStartOffset = fetchPartitionData.logStartOffset,  
        followerFetchTimeMs = fetchTimeMs,  
        leaderEndOffset = logReadInfo.logEndOffset,  
        fetchParams.replicaEpoch  
      )  
    }  
  
    logReadInfo  
  } else {  
    inReadLock(leaderIsrUpdateLock) {  
      val localLog = localLogWithEpochOrThrow(  
        fetchPartitionData.currentLeaderEpoch,  
        fetchParams.fetchOnlyLeader  
      )  
      readFromLocalLog(localLog)  
    }  
  }  
}

kafka.log.UnifiedLog#read:

def read(startOffset: Long,  
         maxLength: Int,  
         isolation: FetchIsolation,  
         minOneMessage: Boolean): FetchDataInfo = {  
  checkLogStartOffset(startOffset)  
  val maxOffsetMetadata = isolation match {  
    case FetchIsolation.LOG_END => localLog.logEndOffsetMetadata  
    case FetchIsolation.HIGH_WATERMARK => fetchHighWatermarkMetadata  
    case FetchIsolation.TXN_COMMITTED => fetchLastStableOffsetMetadata  
  }  
  localLog.read(startOffset, maxLength, minOneMessage, maxOffsetMetadata, isolation == FetchIsolation.TXN_COMMITTED)  
}
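
maxOffsetMetadata的上限由FetchIsolation决定:follower同步可以读到LOG_END,普通消费(read_uncommitted)读到HIGH_WATERMARK,而read_committed只能读到last stable offset(TXN_COMMITTED)。消费端通过isolation.level切换,示意如下(仅展示相关配置项):

import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig

// read_uncommitted -> HIGH_WATERMARK;read_committed -> TXN_COMMITTED(读取上限为LSO)
val props = new Properties()
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")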

kafka.log.LocalLog#read:

def read(startOffset: Long,  
         maxLength: Int,  
         minOneMessage: Boolean,  
         maxOffsetMetadata: LogOffsetMetadata,  
         includeAbortedTxns: Boolean): FetchDataInfo = {  
  maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") {  
    trace(s"Reading maximum $maxLength bytes at offset $startOffset from log with " +  
      s"total length ${segments.sizeInBytes} bytes")  
  
  
    //1.1 选择offset所在的segment  
    val endOffsetMetadata = nextOffsetMetadata  
    val endOffset = endOffsetMetadata.messageOffset  
    var segmentOpt = segments.floorSegment(startOffset)  
  
    // return error on attempt to read beyond the log end offset  
    if (startOffset > endOffset || !segmentOpt.isPresent)  
      throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +  
        s"but we only have log segments upto $endOffset.")  
  
    if (startOffset == maxOffsetMetadata.messageOffset)  
      emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)  
    else if (startOffset > maxOffsetMetadata.messageOffset)  
      emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)  
    else {  
      // Do the read on the segment with a base offset less than the target offset  
      // but if that segment doesn't contain any messages with an offset greater than that
      // continue to read from successive segments until we get some messages or we reach the end of the log
      var fetchDataInfo: FetchDataInfo = null
      while (fetchDataInfo == null && segmentOpt.isPresent) {  
        val segment = segmentOpt.get  
        val baseOffset = segment.baseOffset  
  
        // 1. If `maxOffsetMetadata#segmentBaseOffset < segment#baseOffset`, then return maxPosition as empty.  
        // 2. Use the max-offset position if it is on this segment; otherwise, the segment size is the limit.
        // 3. When maxOffsetMetadata is message-offset-only, then we don't know the relativePositionInSegment so
        //    return maxPosition as empty to avoid reading beyond the max-offset
        val maxPositionOpt: Optional[java.lang.Long] =
          if (segment.baseOffset < maxOffsetMetadata.segmentBaseOffset)  
            Optional.of(segment.size)  
          else if (segment.baseOffset == maxOffsetMetadata.segmentBaseOffset && !maxOffsetMetadata.messageOffsetOnly())  
            Optional.of(maxOffsetMetadata.relativePositionInSegment)  
          else  
            Optional.empty()  
  
        fetchDataInfo = segment.read(startOffset, maxLength, maxPositionOpt, minOneMessage)  
        if (fetchDataInfo != null) {  
          if (includeAbortedTxns)  
            fetchDataInfo = addAbortedTransactions(startOffset, segment, fetchDataInfo)  
        } else segmentOpt = segments.higherSegment(baseOffset)  
      }  
  
      if (fetchDataInfo != null) fetchDataInfo  
      else {  
        // okay we are beyond the end of the last segment with no data fetched although the start offset is in range,  
        // this can happen when all messages with offset larger than start offsets have been deleted.
        // In this case, we will return the empty set with log end offset metadata
        new FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
      }  
    }  
  }  
}

通过LogSegment读取指定位移的消息:

public FetchDataInfo read(long startOffset, int maxSize, Optional<Long> maxPositionOpt, boolean minOneMessage) throws IOException {  
    if (maxSize < 0)  
        throw new IllegalArgumentException("Invalid max size " + maxSize + " for log read from segment " + log);  
  
    //1.1 将startOffset转换为物理文件位置  
    LogOffsetPosition startOffsetAndSize = translateOffset(startOffset);  
  
    // if the start position is already off the end of the log, return null  
    if (startOffsetAndSize == null)  
        return null;  
  
    int startPosition = startOffsetAndSize.position;  
    LogOffsetMetadata offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition);  
  
    int adjustedMaxSize = maxSize;  
    if (minOneMessage)  
        adjustedMaxSize = Math.max(maxSize, startOffsetAndSize.size);  
  
    // return empty records in the fetch-data-info when:  
    // 1. adjustedMaxSize is 0 (or)
    // 2. maxPosition to read is unavailable
    if (adjustedMaxSize == 0 || !maxPositionOpt.isPresent())
        return new FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY);  
  
    // calculate the length of the message set to read based on whether or not they gave us a maxOffset  
    int fetchSize = Math.min((int) (maxPositionOpt.get() - startPosition), adjustedMaxSize);  
  
    //调用slice获取消息  
    return new FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize),  
        adjustedMaxSize < startOffsetAndSize.size, Optional.empty());  
}

这里并没有真正从本地文件中读取消息内容:log.slice()只是基于起始位置和大小构造一个视图,属于lazy-load方式;实际的读取逻辑位于org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch#loadBatchWithSize,见下方代码及其后的示例。

private RecordBatch loadBatchWithSize(int size, String description) {  
    FileChannel channel = fileRecords.channel();  
    try {  
        ByteBuffer buffer = ByteBuffer.allocate(size);  
        Utils.readFullyOrFail(channel, buffer, position, description);  
        buffer.rewind();  
        return toMemoryRecordBatch(buffer);  
    } catch (IOException e) {  
        throw new KafkaException("Failed to load record batch at position " + position + " from " + fileRecords, e);  
    }  
}
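
为了直观感受lazy-load,下面给出一个示意性的小例子(segment文件路径为假设):slice()只是基于position/size构造一个视图,真正的磁盘读取发生在访问batch内部的record时:

import java.io.File
import scala.jdk.CollectionConverters._
import org.apache.kafka.common.record.FileRecords

// 打开某个segment的.log文件(路径仅为示意)
val fileRecords = FileRecords.open(new File("/tmp/kafka-logs/demo-topic-0/00000000000000000000.log"))

// slice()不读取文件内容,只记录起始位置与可读大小
val view = fileRecords.slice(0, fileRecords.sizeInBytes())

view.batches().asScala.foreach { batch =>
  // 迭代batch时只读取batch header;访问其中的record才会触发loadBatchWithSize,将整个batch读入内存
  batch.asScala.foreach(record => println(s"offset=${record.offset}, size=${record.sizeInBytes}"))
}
fileRecords.close()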

索引重建

broker启动加载所有segment时,若Index文件损坏或缺失,会调用索引重建逻辑:

try segment.sanityCheck(timeIndexFileNewlyCreated)  
catch {  
  case _: NoSuchFileException =>  
    if (hadCleanShutdown || segment.baseOffset < recoveryPointCheckpoint)  
      error(s"Could not find offset index file corresponding to log file" +  
        s" ${segment.log.file.getAbsolutePath}, recovering segment and rebuilding index files...")  
    recoverSegment(segment)  
  case e: CorruptIndexException =>  
    warn(s"Found a corrupted index file corresponding to log file" +  
      s" ${segment.log.file.getAbsolutePath} due to ${e.getMessage}}, recovering segment and" +  
      " rebuilding index files...")  
    recoverSegment(segment)  
}
private def recoverSegment(segment: LogSegment): Int = {  
  val producerStateManager = new ProducerStateManager(  
    topicPartition,  
    dir,  
    this.producerStateManager.maxTransactionTimeoutMs(),  
    this.producerStateManager.producerStateManagerConfig(),  
    time)  
  UnifiedLog.rebuildProducerState(  
    producerStateManager,  
    segments,  
    logStartOffsetCheckpoint,  
    segment.baseOffset,  
    config.recordVersion,  
    time,  
    reloadFromCleanShutdown = false,  
    logIdent)  
  val bytesTruncated = segment.recover(producerStateManager, leaderEpochCache)  
  // once we have recovered the segment's data, take a snapshot to ensure that we won't  
  // need to reload the same segment again while recovering another segment.
  producerStateManager.takeSnapshot()
  bytesTruncated  
}
/**
 * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes
 * from the end of the log and index.
 *
 * This method is not thread-safe.
 *
 * @param producerStateManager producer状态管理,用于恢复事务索引
 * @param leaderEpochCache 用于更新leader选举周期
 * @return 从log中截断的字节数
 * @throws LogSegmentOffsetOverflowException if the log segment contains an offset that causes the index offset to overflow
 */
 public int recover(ProducerStateManager producerStateManager, Optional<LeaderEpochFileCache> leaderEpochCache) throws IOException {  
  
    // 1. 清空所有的索引文件  
    offsetIndex().reset();  
    timeIndex().reset();  
    txnIndex.reset();  
    int validBytes = 0;  
    int lastIndexEntry = 0;  
    maxTimestampAndOffsetSoFar = TimestampOffset.UNKNOWN;  
    try {  
        //2. 遍历log的RecordBatch  
        for (RecordBatch batch : log.batches()) {  
            //2.1 校验batch是否合法  
            batch.ensureValid();  
  
            //2.2 保证最后一条消息offset在范围内  
            ensureOffsetInRange(batch.lastOffset());  
  
            //2.3 校验batch最大时间戳是否大于segment最大时间戳  
            if (batch.maxTimestamp() > maxTimestampSoFar()) {  
                //2.3.1 更新maxTimestampAndOffsetSoFar  
                maxTimestampAndOffsetSoFar = new TimestampOffset(batch.maxTimestamp(), batch.lastOffset());  
            }  
  
            // 3. 构建index  
            if (validBytes - lastIndexEntry > indexIntervalBytes) {  
                offsetIndex().append(batch.lastOffset(), validBytes);  
                timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar());  
                lastIndexEntry = validBytes;  
            }  
  
            //3.1 更新validBytes  
            validBytes += batch.sizeInBytes();  
  
            //4. 更新leader选举周期  
            if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2) {  
                leaderEpochCache.ifPresent(cache -> {  
                    // 4.1 获取batch的选举周期,如果 partitionLeaderEpoch > cache latestEpoch,则更新cache  
                    if (batch.partitionLeaderEpoch() >= 0 &&  
                            (!cache.latestEpoch().isPresent() || batch.partitionLeaderEpoch() > cache.latestEpoch().getAsInt()))  
                        cache.assign(batch.partitionLeaderEpoch(), batch.baseOffset());  
                });  
                //4.2 更新producer状态  
                updateProducerState(producerStateManager, batch);  
            }  
        }  
    } catch (CorruptRecordException | InvalidRecordException e) {  
        LOGGER.warn("Found invalid messages in log segment {} at byte offset {}.", log.file().getAbsolutePath(),  
            validBytes, e);  
    }  
    int truncated = log.sizeInBytes() - validBytes;  
    if (truncated > 0)  
        LOGGER.debug("Truncated {} invalid bytes at the end of segment {} during recovery", truncated, log.file().getAbsolutePath());  
    //5. 底层调用FileChannel的truncate方法截断文件,用于清理末尾的无效字节
    log.truncateTo(validBytes);  
    offsetIndex().trimToValidSize();  
    // A normally closed segment always appends the biggest timestamp ever seen into log segment, we do this as well.  
    timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar(), true); 
    timeIndex().trimToValidSize();  
    return truncated;  
}