Each Kafka partition corresponds to a Log (UnifiedLog), which is made up of multiple LogSegments. Each LogSegment object corresponds to a group of files on disk:
- Log (message) file (.log)
- Offset index file (.index)
- Timestamp index file (.timeindex)
- Aborted transaction index file (.txnindex)
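For example, for a partition topicA-0 (topic name and offsets here are made up for illustration), the partition directory contains one group of files per segment, named after the segment's base offset (20 digits, zero-padded):
00000000000000000000.log
00000000000000000000.index
00000000000000000000.timeindex
00000000000000368769.log
00000000000000368769.index
00000000000000368769.timeindex
00000000000000368769.txnindex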
LogManager
LogManager is the entry point for Kafka log management and is responsible for log creation, retrieval, and cleanup; all read and write operations are delegated to the corresponding log instance.
private val currentLogs = new Pool[TopicPartition, UnifiedLog]()
private val _liveLogDirs: ConcurrentLinkedQueue[File] = createAndValidateLogDirs(logDirs, initialOfflineDirs)
The currentLogs field is a map from partition to UnifiedLog; Pool uses a ConcurrentHashMap as its underlying storage.
liveLogDirs holds the valid File objects for the configured log paths, backed by a ConcurrentLinkedQueue. The createAndValidateLogDirs() method validates the configured directories and appends the valid ones to the queue.
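Pool's "create if absent" semantics can be approximated with a plain ConcurrentHashMap. The sketch below is for illustration only (the key/value types are placeholders, not kafka.utils.Pool's actual implementation):
import java.util.concurrent.ConcurrentHashMap;

public class PoolDemo {
    public static void main(String[] args) {
        // Pool maintains a ConcurrentHashMap much like this one internally
        ConcurrentHashMap<String, StringBuilder> pool = new ConcurrentHashMap<>();
        // Similar to Pool's get-and-maybe-put usage: the value is created only if the key is absent
        StringBuilder log = pool.computeIfAbsent("topicA-0", StringBuilder::new);
        System.out.println(log); // prints topicA-0
    }
}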
startup flow
KafkaServer's startup() method calls logManager's startup() to load all logs.
logManager.startup(zkClient.getAllTopicsInCluster())
The first parameter of startup is the set of topic names; the second, optional parameter isStray determines whether a UnifiedLog is a stray replica that this broker should no longer keep (in KRaft mode it is computed from metadata, see kafka.log.LogManager.isStrayKraftReplica).
/**
 * Start the background threads to flush logs and do log cleanup
 */
def startup(topicNames: Set[String], isStray: UnifiedLog => Boolean = _ => false): Unit = {
  // ensure consistency between default config and overrides
  val defaultConfig = currentDefaultConfig
  startupWithConfigOverrides(defaultConfig, fetchTopicConfigOverrides(defaultConfig, topicNames), isStray)
}
The fetchTopicConfigOverrides() method checks whether topics have custom configuration and applies those overrides. Next, let's look at the implementation of startupWithConfigOverrides().
- Call loadLogs() to load all logs
- Start the following scheduled tasks:
  - log cleanup (retention) task
  - log flush task
  - recovery-point checkpoint task
  - log-start-offset checkpoint task
  - log deletion task
- Check the log.cleaner.enable parameter; if it is true, create and start the LogCleaner
Periodic scheduling
Periodic scheduling uses the internally wrapped org.apache.kafka.server.util.KafkaScheduler, which is a thin wrapper around ScheduledThreadPoolExecutor (see the sketch after the startupWithConfigOverrides() code below for the scheduling semantics).
private[log] def startupWithConfigOverrides(
defaultConfig: LogConfig,
topicConfigOverrides: Map[String, LogConfig],
isStray: UnifiedLog => Boolean): Unit = {
loadLogs(defaultConfig, topicConfigOverrides, isStray) // this could take a while if shutdown was not clean
/* Schedule the cleanup task to delete old logs */
if (scheduler != null) {
info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
scheduler.schedule("kafka-log-retention",
() => cleanupLogs(),
initialTaskDelayMs,
retentionCheckMs)
info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
scheduler.schedule("kafka-log-flusher",
() => flushDirtyLogs(),
initialTaskDelayMs,
flushCheckMs)
scheduler.schedule("kafka-recovery-point-checkpoint",
() => checkpointLogRecoveryOffsets(),
initialTaskDelayMs,
flushRecoveryOffsetCheckpointMs)
scheduler.schedule("kafka-log-start-offset-checkpoint",
() => checkpointLogStartOffsets(),
initialTaskDelayMs,
flushStartOffsetCheckpointMs)
scheduler.scheduleOnce("kafka-delete-logs", // will be rescheduled after each delete logs with a dynamic period
() => deleteLogs(),
initialTaskDelayMs)
}
if (cleanerConfig.enableCleaner) {
_cleaner = new LogCleaner(cleanerConfig, liveLogDirs, currentLogs, logDirFailureChannel, time = time)
_cleaner.startup()
}
}
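For reference, the periodic scheduling above can be sketched directly with the JDK's ScheduledThreadPoolExecutor. This only illustrates the scheduling semantics of scheduler.schedule(...), not KafkaScheduler's actual implementation; the task body and the initial delay are assumed values, and retentionCheckMs corresponds to log.retention.check.interval.ms (5 minutes by default):
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class LogRetentionScheduleDemo {
    public static void main(String[] args) {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
        // Mirrors scheduler.schedule("kafka-log-retention", () => cleanupLogs(), initialTaskDelayMs, retentionCheckMs)
        scheduler.scheduleAtFixedRate(
                () -> System.out.println("cleanupLogs()"), // the real task deletes old log segments
                30_000L,     // initialTaskDelayMs, assumed to be 30 seconds here
                300_000L,    // retentionCheckMs, i.e. the log.retention.check.interval.ms default of 5 minutes
                TimeUnit.MILLISECONDS);
    }
}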
loadLogs flow
The loadLogs() method is the actual entry point for loading log data. The call chain is fairly long, so first let's look at the class fields it uses:
// A map that stores hadCleanShutdown flag for each log dir.
private val hadCleanShutdownFlags = new ConcurrentHashMap[String, Boolean]()
// A map that tells whether all logs in a log dir had been loaded or not at startup time.
private val loadLogsCompletedFlags = new ConcurrentHashMap[String, Boolean]()
@volatile private var recoveryPointCheckpoints = liveLogDirs.map(dir =>
(dir, new OffsetCheckpointFile(new File(dir, RecoveryPointCheckpointFile), logDirFailureChannel))).toMap
@volatile private var logStartOffsetCheckpoints = liveLogDirs.map(dir =>
(dir, new OffsetCheckpointFile(new File(dir, LogStartOffsetCheckpointFile), logDirFailureChannel))).toMap
hadCleanShutdownFlags and loadLogsCompletedFlags are both Map[String, Boolean] structures that record, per log dir, whether the last shutdown was clean and whether loading has completed. recoveryPointCheckpoints and logStartOffsetCheckpoints are Map[File, OffsetCheckpointFile] structures; OffsetCheckpointFile persists a partition-to-offset mapping (TopicPartition => Offset) to disk. The file format looks like this:
-----checkpoint file begin------
0 <- OffsetCheckpointFile.currentVersion
2 <- following entries size
tp1 par1 1 <- the format is: TOPIC PARTITION OFFSET
tp1 par2 2
-----checkpoint file end----------
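Given this format, the checkpoint content can be read with a few lines of code. The following is a minimal parsing sketch (it only demonstrates the file format; the path is an assumed example and this is not OffsetCheckpointFile's actual implementation):
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

public class CheckpointReadDemo {
    public static void main(String[] args) throws IOException {
        // Assumed path of a recovery-point checkpoint file
        List<String> lines = Files.readAllLines(Paths.get("/tmp/kafka-logs/recovery-point-offset-checkpoint"));
        int version = Integer.parseInt(lines.get(0));       // first line: version
        int entryCount = Integer.parseInt(lines.get(1));    // second line: number of entries
        System.out.println("version=" + version + ", entries=" + entryCount);
        for (int i = 2; i < 2 + entryCount; i++) {
            String[] parts = lines.get(i).split(" ");       // each line: TOPIC PARTITION OFFSET
            System.out.printf("%s-%s -> %s%n", parts[0], parts[1], parts[2]);
        }
    }
}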
The core logic of loadLogs() is:
- Iterate over the data dirs, allocating one thread pool per data dir to perform the load
- Read the recovery point and log start offset checkpoint information for the current dir
- Iterate over all topic-partition dirs in the current dir, skipping remote-storage-related ones, and build runnable load tasks; each runnable calls loadLog to construct a UnifiedLog
/**
 * Recover and load all logs in the given data directories
 */
private[log] def loadLogs(defaultConfig: LogConfig, topicConfigOverrides: Map[String, LogConfig], isStray: UnifiedLog => Boolean): Unit = {
info(s"Loading logs from log dirs $liveLogDirs")
//1.1 Define the thread pools and job queues
val startMs = time.hiResClockMs()
val threadPools = ArrayBuffer.empty[ExecutorService]
val offlineDirs = mutable.Set.empty[(String, IOException)]
val jobs = ArrayBuffer.empty[Seq[Future[_]]]
var numTotalLogs = 0
// log dir path -> number of Remaining logs map for remainingLogsToRecover metric
val numRemainingLogs: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int]
// log recovery thread name -> number of remaining segments map for remainingSegmentsToRecover metric
val numRemainingSegments: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int]
def handleIOException(logDirAbsolutePath: String, e: IOException): Unit = {
offlineDirs.add((logDirAbsolutePath, e))
error(s"Error while loading log dir $logDirAbsolutePath", e)
}
//1.2 Iterate over liveLogDirs
val uncleanLogDirs = mutable.Buffer.empty[String]
for (dir <- liveLogDirs) {
val logDirAbsolutePath = dir.getAbsolutePath
var hadCleanShutdown: Boolean = false
//1.3 Allocate one thread pool per data dir to do the loading
try {
val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir,
new LogRecoveryThreadFactory(logDirAbsolutePath))
threadPools.append(pool)
//1.4 Check whether the last shutdown was clean; the file is deleted so that a crash during log loading is not still treated as a clean shutdown on the next boot
val cleanShutdownFileHandler = new CleanShutdownFileHandler(dir.getPath)
if (cleanShutdownFileHandler.exists()) {
// Cache the clean shutdown status and use that for rest of log loading workflow. Delete the CleanShutdownFile
// so that if broker crashes while loading the log, it is considered hard shutdown during the next boot up. KAFKA-10471
cleanShutdownFileHandler.delete()
hadCleanShutdown = true
}
//1.5 Record whether the current log dir had a clean shutdown
hadCleanShutdownFlags.put(logDirAbsolutePath, hadCleanShutdown)
//1.6 Read the recovery point and log start offset information for the current dir
var recoveryPoints = Map[TopicPartition, Long]()
try {
recoveryPoints = this.recoveryPointCheckpoints(dir).read()
} catch {
case e: Exception =>
warn(s"Error occurred while reading recovery-point-offset-checkpoint file of directory " +
s"$logDirAbsolutePath, resetting the recovery checkpoint to 0", e)
}
var logStartOffsets = Map[TopicPartition, Long]()
try {
logStartOffsets = this.logStartOffsetCheckpoints(dir).read()
} catch {
case e: Exception =>
warn(s"Error occurred while reading log-start-offset-checkpoint file of directory " +
s"$logDirAbsolutePath, resetting to the base offset of the first segment", e)
}
//1.7 List topic-partition dirs, ignoring the remote-log-index-cache dir and the KRaft metadata topic
val logsToLoad = Option(dir.listFiles).getOrElse(Array.empty).filter(logDir =>
logDir.isDirectory &&
// Ignore remote-log-index-cache directory as that is index cache maintained by tiered storage subsystem
// but not any topic-partition dir.
!logDir.getName.equals(RemoteIndexCache.DIR_NAME) &&
UnifiedLog.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic)
numTotalLogs += logsToLoad.length
numRemainingLogs.put(logDirAbsolutePath, logsToLoad.length)
loadLogsCompletedFlags.put(logDirAbsolutePath, logsToLoad.isEmpty)
if (logsToLoad.isEmpty) {
info(s"No logs found to be loaded in $logDirAbsolutePath")
} else if (hadCleanShutdown) {
info(s"Skipping recovery of ${logsToLoad.length} logs from $logDirAbsolutePath since " +
"clean shutdown file was found")
} else {
info(s"Recovering ${logsToLoad.length} logs from $logDirAbsolutePath since no " +
"clean shutdown file was found")
uncleanLogDirs.append(logDirAbsolutePath)
}
//1.8 For each log dir in the current data dir, build a runnable load task; the runnable calls loadLog to construct the UnifiedLog
val jobsForDir = logsToLoad.map { logDir =>
val runnable: Runnable = () => {
debug(s"Loading log $logDir")
var log = None: Option[UnifiedLog]
val logLoadStartMs = time.hiResClockMs()
try {
log = Some(loadLog(logDir, hadCleanShutdown, recoveryPoints, logStartOffsets,
defaultConfig, topicConfigOverrides, numRemainingSegments, isStray))
} catch {
case e: IOException =>
handleIOException(logDirAbsolutePath, e)
case e: KafkaStorageException if e.getCause.isInstanceOf[IOException] =>
// KafkaStorageException might be thrown, ex: during writing LeaderEpochFileCache
// And while converting IOException to KafkaStorageException, we've already handled the exception. So we can ignore it here.
} finally {
val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs
val remainingLogs = decNumRemainingLogs(numRemainingLogs, logDirAbsolutePath)
val currentNumLoaded = logsToLoad.length - remainingLogs
log match {
case Some(loadedLog) => info(s"Completed load of $loadedLog with ${loadedLog.numberOfSegments} segments, " +
s"local-log-start-offset ${loadedLog.localLogStartOffset()} and log-end-offset ${loadedLog.logEndOffset} in ${logLoadDurationMs}ms " +
s"($currentNumLoaded/${logsToLoad.length} completed in $logDirAbsolutePath)")
case None => info(s"Error while loading logs in $logDir in ${logLoadDurationMs}ms ($currentNumLoaded/${logsToLoad.length} completed in $logDirAbsolutePath)")
}
if (remainingLogs == 0) {
// loadLog is completed for all logs under the logDir, mark it.
loadLogsCompletedFlags.put(logDirAbsolutePath, true)
}
}
}
runnable
}
jobs += jobsForDir.map(pool.submit)
} catch {
case e: IOException =>
handleIOException(logDirAbsolutePath, e)
}
}
//1.9 Register recovery metrics and wait for all load jobs to finish
try {
addLogRecoveryMetrics(numRemainingLogs, numRemainingSegments)
for (dirJobs <- jobs) {
dirJobs.foreach(_.get)
}
offlineDirs.foreach { case (dir, e) =>
logDirFailureChannel.maybeAddOfflineLogDir(dir, s"Error while loading log dir $dir", e)
}
} catch {
case e: ExecutionException =>
error(s"There was an error in one of the threads during logs loading: ${e.getCause}")
throw e.getCause
} finally {
removeLogRecoveryMetrics()
threadPools.foreach(_.shutdown())
}
val elapsedMs = time.hiResClockMs() - startMs
val printedUncleanLogDirs = if (uncleanLogDirs.isEmpty) "" else s" (unclean log dirs = $uncleanLogDirs)"
info(s"Loaded $numTotalLogs logs in ${elapsedMs}ms$printedUncleanLogDirs")
}
loadLog() first creates the UnifiedLog object; during its initialization, LogLoader.load() is invoked to perform the actual log loading. Once the UnifiedLog has been initialized, it is stored in currentLogs (or futureLogs for a future replica).
private[log] def loadLog(logDir: File,
hadCleanShutdown: Boolean,
recoveryPoints: Map[TopicPartition, Long],
logStartOffsets: Map[TopicPartition, Long],
defaultConfig: LogConfig,
topicConfigOverrides: Map[String, LogConfig],
numRemainingSegments: ConcurrentMap[String, Int],
isStray: UnifiedLog => Boolean): UnifiedLog = {
val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
val config = topicConfigOverrides.getOrElse(topicPartition.topic, defaultConfig)
val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
val logStartOffset = logStartOffsets.getOrElse(topicPartition, 0L)
//1.1 Create the UnifiedLog object; this calls UnifiedLog's apply method
val log = UnifiedLog(
dir = logDir,
config = config,
logStartOffset = logStartOffset,
recoveryPoint = logRecoveryPoint,
maxTransactionTimeoutMs = maxTransactionTimeoutMs,
producerStateManagerConfig = producerStateManagerConfig,
producerIdExpirationCheckIntervalMs = producerIdExpirationCheckIntervalMs,
scheduler = scheduler,
time = time,
brokerTopicStats = brokerTopicStats,
logDirFailureChannel = logDirFailureChannel,
lastShutdownClean = hadCleanShutdown,
topicId = None,
keepPartitionMetadataFile = keepPartitionMetadataFile,
numRemainingSegments = numRemainingSegments,
remoteStorageSystemEnable = remoteStorageSystemEnable)
//1.2 Check whether the dir is marked for deletion; if so, add it to the to-be-deleted queue so the scheduled task deletes it later
if (logDir.getName.endsWith(UnifiedLog.DeleteDirSuffix)) {
addLogToBeDeleted(log)
} else if (logDir.getName.endsWith(UnifiedLog.StrayDirSuffix)) {
//1.3 Check whether this is a stray partition
addStrayLog(topicPartition, log)
warn(s"Loaded stray log: $logDir")
} else if (isStray(log)) {
// Unlike Zookeeper mode, which tracks pending topic deletions under a ZNode, KRaft is unable to prevent a topic from being recreated before every replica has been deleted.
// A KRaft broker with an offline directory may be unable to detect it still holds a to-be-deleted replica,
// and can create a conflicting topic partition for a new incarnation of the topic in one of the remaining online directories.
// So upon a restart in which the offline directory is back online we need to clean up the old replica directory.
log.renameDir(UnifiedLog.logStrayDirName(log.topicPartition), shouldReinitialize = false)
addStrayLog(log.topicPartition, log)
warn(s"Log in ${logDir.getAbsolutePath} marked stray and renamed to ${log.dir.getAbsolutePath}")
} else {
//previous holds the old mapping, if any
val previous = {
if (log.isFuture)
this.futureLogs.put(topicPartition, log)
else
this.currentLogs.put(topicPartition, log)
}
//check for duplicate log directories
if (previous != null) {
if (log.isFuture)
throw new IllegalStateException(s"Duplicate log directories found: ${log.dir.getAbsolutePath}, ${previous.dir.getAbsolutePath}")
else
throw new IllegalStateException(s"Duplicate log directories for $topicPartition are found in both ${log.dir.getAbsolutePath} " +
s"and ${previous.dir.getAbsolutePath}. It is likely because log directory failure happened while broker was " +
s"replacing current replica with future replica. Recover broker from this failure by manually deleting one of the two directories " +
s"for this partition. It is recommended to delete the partition in the log directory that is known to have failed recently.")
}
}
log
}
UnifiedLog's apply() method performs the initialization:
def apply(dir: File,
config: LogConfig,
logStartOffset: Long,
recoveryPoint: Long,
scheduler: Scheduler,
brokerTopicStats: BrokerTopicStats,
time: Time,
maxTransactionTimeoutMs: Int,
producerStateManagerConfig: ProducerStateManagerConfig,
producerIdExpirationCheckIntervalMs: Int,
logDirFailureChannel: LogDirFailureChannel,
lastShutdownClean: Boolean = true,
topicId: Option[Uuid],
keepPartitionMetadataFile: Boolean,
numRemainingSegments: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int],
remoteStorageSystemEnable: Boolean = false,
logOffsetsListener: LogOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER): UnifiedLog = {
// create the log directory if it doesn't exist
Files.createDirectories(dir.toPath)
//1.1 Parse the topicPartition from the dir and initialize LogSegments
val topicPartition = UnifiedLog.parseTopicPartitionName(dir)
val segments = new LogSegments(topicPartition)
// The created leaderEpochCache will be truncated by LogLoader if necessary
// so it is guaranteed that the epoch entries will be correct even when on-disk
// checkpoint was stale (due to async nature of LeaderEpochFileCache#truncateFromStart/End).
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
dir,
topicPartition,
logDirFailureChannel,
config.recordVersion,
s"[UnifiedLog partition=$topicPartition, dir=${dir.getParent}] ",
None,
scheduler)
val producerStateManager = new ProducerStateManager(topicPartition, dir,
maxTransactionTimeoutMs, producerStateManagerConfig, time)
val isRemoteLogEnabled = UnifiedLog.isRemoteLogEnabled(remoteStorageSystemEnable, config, topicPartition.topic)
//1.2 Call LogLoader.load() to load the log and obtain the offsets
val offsets = new LogLoader(
dir,
topicPartition,
config,
scheduler,
time,
logDirFailureChannel,
lastShutdownClean,
segments,
logStartOffset,
recoveryPoint,
leaderEpochCache.asJava,
producerStateManager,
numRemainingSegments,
isRemoteLogEnabled,
).load()
val localLog = new LocalLog(dir, config, segments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, scheduler, time, topicPartition, logDirFailureChannel)
new UnifiedLog(offsets.logStartOffset,
localLog,
brokerTopicStats,
producerIdExpirationCheckIntervalMs,
leaderEpochCache,
producerStateManager,
topicId,
keepPartitionMetadataFile,
remoteStorageSystemEnable,
logOffsetsListener)
}
LogLoader's load() method calls loadSegmentFiles(), which iterates over the files in the partition dir, loads each LogSegment, and stores them in LogSegments:
private def loadSegmentFiles(): Unit = {
// load segments in ascending order because transactional data from one segment may depend on the
// segments that come before it
//1. Load segments in ascending order
for (file <- dir.listFiles.sortBy(_.getName) if file.isFile) {
//1.1 If the file is an index file, check that its corresponding .log file exists
if (isIndexFile(file)) {
// if it is an index file, make sure it has a corresponding .log file
val offset = offsetFromFile(file)
val logFile = LogFileUtils.logFile(dir, offset)
if (!logFile.exists) {
warn(s"Found an orphaned index file ${file.getAbsolutePath}, with no corresponding log file.")
Files.deleteIfExists(file.toPath)
}
} else if (isLogFile(file)) {
//1.2 Load the log file
// if it's a log file, load the corresponding log segment
val baseOffset = offsetFromFile(file)
val timeIndexFileNewlyCreated = !LogFileUtils.timeIndexFile(dir, baseOffset).exists()
//1.3 Call LogSegment.open()
val segment = LogSegment.open(
dir,
baseOffset,
config,
time,
true,
0,
false,
"")
//1.4 Run a sanity check
try segment.sanityCheck(timeIndexFileNewlyCreated)
catch {
case _: NoSuchFileException =>
if (hadCleanShutdown || segment.baseOffset < recoveryPointCheckpoint)
error(s"Could not find offset index file corresponding to log file" +
s" ${segment.log.file.getAbsolutePath}, recovering segment and rebuilding index files...")
recoverSegment(segment)
case e: CorruptIndexException =>
warn(s"Found a corrupted index file corresponding to log file" +
s" ${segment.log.file.getAbsolutePath} due to ${e.getMessage}}, recovering segment and" +
" rebuilding index files...")
recoverSegment(segment)
}
//1.5 Add the segment to LogSegments
segments.add(segment)
}
}
}
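offsetFromFile() in loadSegmentFiles() simply parses the base offset out of the file name, since all segment files are named after their base offset. An equivalent parsing sketch (the method name here is for illustration only, not Kafka source):
public class SegmentNameDemo {
    // File names look like 00000000000000368769.log; the digits before the extension are the segment's base offset
    static long offsetFromFileName(String fileName) {
        int dot = fileName.lastIndexOf('.');
        return Long.parseLong(fileName.substring(0, dot));
    }

    public static void main(String[] args) {
        System.out.println(offsetFromFileName("00000000000000368769.log")); // 368769
    }
}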
LogSegment structure
LogSegment.open() is the static entry point for creating a LogSegment object.
public static LogSegment open(File dir, long baseOffset, LogConfig config, Time time, boolean fileAlreadyExists,
int initFileSize, boolean preallocate, String fileSuffix) throws IOException {
int maxIndexSize = config.maxIndexSize;
return new LogSegment(
FileRecords.open(LogFileUtils.logFile(dir, baseOffset, fileSuffix), fileAlreadyExists, initFileSize, preallocate),
LazyIndex.forOffset(LogFileUtils.offsetIndexFile(dir, baseOffset, fileSuffix), baseOffset, maxIndexSize),
LazyIndex.forTime(LogFileUtils.timeIndexFile(dir, baseOffset, fileSuffix), baseOffset, maxIndexSize),
new TransactionIndex(baseOffset, LogFileUtils.transactionIndexFile(dir, baseOffset, fileSuffix)),
baseOffset,
config.indexInterval,
config.randomSegmentJitter(),
time);
}
Member fields
org.apache.kafka.storage.internals.log.LogSegment has the following fields:
// The log (message) file; FileRecords wraps the underlying file
private final FileRecords log;
// The three index file objects
private final LazyIndex<OffsetIndex> lazyOffsetIndex;
private final LazyIndex<TimeIndex> lazyTimeIndex;
private final TransactionIndex txnIndex;
// The base offset of this segment
private final long baseOffset;
// Value of the broker-side log.index.interval.bytes parameter; it controls how often the segment adds an index entry, by default one entry per 4KB written
private final int indexIntervalBytes;
// Random jitter applied to the segment roll time, to spread out segment rolls and the resulting disk IO
private final long rollJitterMs;
// The timestamp used for time-based log rolling (taken from the first batch written to this segment)
private volatile OptionalLong rollingBasedTimestamp = OptionalLong.empty();
// Bytes written since the last index entry was added
private int bytesSinceLastIndexEntry = 0;
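indexIntervalBytes and bytesSinceLastIndexEntry together implement the sparse index: an index entry is only appended once more than log.index.interval.bytes (4096 bytes by default) have been written since the last entry. The simplified sketch below illustrates that decision logic (it is not the full LogSegment.append implementation):
import java.util.ArrayList;
import java.util.List;

public class SparseIndexDemo {
    static final int INDEX_INTERVAL_BYTES = 4096;               // default of log.index.interval.bytes
    static int bytesSinceLastIndexEntry = 0;
    static final List<long[]> offsetIndex = new ArrayList<>();  // each element: [offset, physical position]

    static void append(long offset, int physicalPosition, int recordSizeBytes) {
        if (bytesSinceLastIndexEntry > INDEX_INTERVAL_BYTES) {
            offsetIndex.add(new long[]{offset, physicalPosition}); // append one offset -> physical position entry
            bytesSinceLastIndexEntry = 0;
        }
        bytesSinceLastIndexEntry += recordSizeBytes;
    }

    public static void main(String[] args) {
        int position = 0;
        for (long offset = 0; offset < 100; offset++) {
            append(offset, position, 1024); // with 1KB records, roughly one index entry every 4-5 records
            position += 1024;
        }
        System.out.println("index entries: " + offsetIndex.size());
    }
}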
FileRecords
The log field is a FileRecords; internally it holds the File object, the file's start and end positions, the file size, and the FileChannel. Its core methods are also fairly simple:
/**
 * Append a set of records to the file. This method is not thread-safe and must be
 * protected with a lock.
 *
 * @param records The records to append
 * @return the number of bytes written to the underlying file
 */
public int append(MemoryRecords records) throws IOException {
if (records.sizeInBytes() > Integer.MAX_VALUE - size.get())
throw new IllegalArgumentException("Append of size " + records.sizeInBytes() +
" bytes is too large for segment with current file position at " + size.get());
int written = records.writeFullyTo(channel);
size.getAndAdd(written);
return written;
}
/**
 * Commit all written data to the physical disk
 */
public void flush() throws IOException {
channel.force(true);
}
/**
 * Close this record set
 */
public void close() throws IOException {
flush();
trim();
channel.close();
}
When a LogSegment is initialized, FileRecords' open method is called to initialize the internal log field:
public static FileRecords open(File file,
boolean mutable,
boolean fileAlreadyExists,
int initFileSize,
boolean preallocate) throws IOException {
FileChannel channel = openChannel(file, mutable, fileAlreadyExists, initFileSize, preallocate);
int end = (!fileAlreadyExists && preallocate) ? 0 : Integer.MAX_VALUE;
return new FileRecords(file, channel, 0, end, false);
}
Index structure
private final LazyIndex<OffsetIndex> lazyOffsetIndex;
private final LazyIndex<TimeIndex> lazyTimeIndex;
private final TransactionIndex txnIndex;
LogSegment has three index fields. offsetIndex and timeIndex are wrapped in LazyIndex to achieve lazy initialization: the index is only loaded into memory on first access:
public T get() throws IOException {
IndexWrapper wrapper = indexWrapper;
if (wrapper instanceof IndexValue<?>)
return ((IndexValue<T>) wrapper).index;
else {
lock.lock();
try {
if (indexWrapper instanceof IndexValue<?>)
return ((IndexValue<T>) indexWrapper).index;
else if (indexWrapper instanceof IndexFile) {
IndexFile indexFile = (IndexFile) indexWrapper;
IndexValue<T> indexValue = new IndexValue<>(loadIndex(indexFile.file));
indexWrapper = indexValue;
return indexValue.index;
} else
throw new IllegalStateException("Unexpected type for indexWrapper " + indexWrapper.getClass());
} finally {
lock.unlock();
}
}
}
The index files are actually loaded via mmap, which maps the file directly into the process address space so that index access avoids extra copies between the kernel page cache and user-space buffers. This is one of Kafka's more important optimizations.
private void createAndAssignMmap() throws IOException {
boolean newlyCreated = file.createNewFile();
RandomAccessFile raf;
if (writable)
raf = new RandomAccessFile(file, "rw");
else
raf = new RandomAccessFile(file, "r");
try {
/* pre-allocate the file if necessary */
if (newlyCreated) {
if (maxIndexSize < entrySize())
throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize);
raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize()));
}
long length = raf.length();
MappedByteBuffer mmap = createMappedBuffer(raf, newlyCreated, length, writable, entrySize());
this.length = length;
this.mmap = mmap;
} finally {
Utils.closeQuietly(raf, "index " + file.getName());
}
}
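Taking the offset index as an example, each OffsetIndex entry is a fixed 8 bytes: a 4-byte offset relative to baseOffset plus a 4-byte physical position in the .log file; lookups binary-search the mmap for the largest entry less than or equal to the target offset. The sketch below uses a heap ByteBuffer instead of a MappedByteBuffer just to demonstrate the layout (illustration only, not the actual OffsetIndex implementation):
import java.nio.ByteBuffer;

public class OffsetIndexLayoutDemo {
    static final int ENTRY_SIZE = 8; // 4-byte relative offset + 4-byte physical position

    public static void main(String[] args) {
        long baseOffset = 368769L;
        ByteBuffer mmap = ByteBuffer.allocate(ENTRY_SIZE * 2);
        // Write two entries: offset 368773 -> position 4096, offset 368778 -> position 8192
        mmap.putInt((int) (368773L - baseOffset)).putInt(4096);
        mmap.putInt((int) (368778L - baseOffset)).putInt(8192);

        int n = 1; // read the n-th entry
        int relativeOffset = mmap.getInt(n * ENTRY_SIZE);
        int physicalPosition = mmap.getInt(n * ENTRY_SIZE + 4);
        System.out.println((baseOffset + relativeOffset) + " -> " + physicalPosition); // 368778 -> 8192
    }
}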
Offset initialization
While a UnifiedLog is being initialized, it also updates its internal log start offset information.
locally {
//Update the local log start offset
def updateLocalLogStartOffset(offset: Long): Unit = {
_localLogStartOffset = offset
if (highWatermark < offset) {
updateHighWatermark(offset)
}
if (this.recoveryPoint < offset) {
localLog.updateRecoveryPoint(offset)
}
}
initializePartitionMetadata()
updateLogStartOffset(logStartOffset)
updateLocalLogStartOffset(math.max(logStartOffset, localLog.segments.firstSegmentBaseOffset.orElse(0L)))
if (!remoteLogEnabled())
logStartOffset = localLogStartOffset()
maybeIncrementFirstUnstableOffset()
initializeTopicId()
logOffsetsListener.onHighWatermarkUpdated(highWatermarkMetadata.messageOffset)
}
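For example, if the checkpoint records a logStartOffset of 100 but the first segment on disk has a baseOffset of 150 (earlier segments have already been deleted), localLogStartOffset becomes max(100, 150) = 150; with remote storage disabled, logStartOffset is also advanced to 150, and if highWatermark or recoveryPoint was below 150 it is raised to 150 as well.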