- Admin:提供了管理topic、partition、config的相关API
- Consumer:提供了消费topic的API
- Producer:提供了向topic投递消息的功能
源码以Kafka 3.9为例。
Producer作为Kafka client中的消息生产者,提供send()方法用于写入消息,并有后台sender线程定时发送消息。
// Producer interface excerpt: the two send() overloads.
// NOTE(review): the opening "/**" of the first Javadoc block is missing from this excerpt.
* See {@link KafkaProducer#send(ProducerRecord)}
*/Future<RecordMetadata> send(ProducerRecord<K, V> record);
/** Overload that additionally accepts a callback.
* See {@link KafkaProducer#send(ProducerRecord, Callback)}
*/Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
// KafkaProducer.send with callback: passes the record through the interceptors' onSend hook
// and hands the (possibly modified) record to doSend(), returning doSend()'s Future.
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// Interceptor hook that runs before the actual send
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
- ProducerRecord<K, V> onSend(ProducerRecord<K, V> record):send前置处理逻辑
- onAcknowledgement(RecordMetadata metadata, Exception exception):消息被应答之后或发送消息失败时调用。
- close():用于关闭interceptor资源。
// Core of the send path: waits for topic metadata, serializes key and value, picks a partition,
// appends the record to the RecordAccumulator and returns the Future from the append result.
// ApiException is reported through the callback/interceptors and returned as a failed Future;
// other exceptions are reported to the interceptors and rethrown.
// NOTE(review): this excerpt is abridged - closing braces and several statements are not shown.
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
// 1.1 Wrap the user callback, the interceptors and the record into an AppendCallbacks object
AppendCallbacks appendCallbacks = new AppendCallbacks(callback, this.interceptors, record);
try {
// 1.2 Check whether the producer has been closed (the check itself is not shown in this excerpt)
// first make sure the metadata for the topic is available
long nowMs = time.milliseconds();
ClusterAndWaitTime clusterAndWaitTime;
// 1.3 Fetch metadata for the given topic/partition, together with the time spent waiting for it
try {
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
} catch (KafkaException e) {
if (metadata.isClosed())
throw new KafkaException("Producer closed while send in progress", e);
throw e;
// Advance the local clock by the metadata wait and shrink the remaining blocking budget accordingly
nowMs += clusterAndWaitTime.waitedOnMetadataMs;
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
// 1.4 Serialize the key and the value; a ClassCastException is rethrown as SerializationException
byte[] serializedKey;
try {
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer", cce);
byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer", cce);
// 1.5 Compute the partition this record belongs to
int partition = partition(record, serializedKey, serializedValue, cluster);
// 1.6 Mark the record headers read-only (that call is not shown here) and snapshot them as an array
Header[] headers = record.headers().toArray();
// 1.7 Check that the record size is acceptable (only the size estimate is shown in this excerpt)
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compression.type(), serializedKey, serializedValue, headers);
// Fall back to the current time when the record carries no timestamp
long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
// abortOnNewBatch is true only when a custom partitioner is configured
boolean abortOnNewBatch = partitioner != null;
// 1.8 Append the record to the accumulator
RecordAccumulator.RecordAppendResult result = accumulator.append(record.topic(), partition, timestamp, serializedKey,
serializedValue, headers, appendCallbacks, remainingWaitMs, abortOnNewBatch, nowMs, cluster);
assert appendCallbacks.getPartition() != RecordMetadata.UNKNOWN_PARTITION;
// 1.9 The append was aborted because a new batch is needed: notify the partitioner via onNewBatch,
// recompute the partition and append again, this time with abortOnNewBatch=false
if (result.abortForNewBatch) {
int prevPartition = partition;
onNewBatch(record.topic(), cluster, prevPartition);
partition = partition(record, serializedKey, serializedValue, cluster);
if (log.isTraceEnabled()) {
log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
result = accumulator.append(record.topic(), partition, timestamp, serializedKey,
serializedValue, headers, appendCallbacks, remainingWaitMs, false, nowMs, cluster);
// 2.1 Transactions are enabled (the body of this branch is not shown in this excerpt)
if (transactionManager != null) {
// 2.2 If the batch is full or a new batch was created, wake up the background sender thread
// (the wakeup call itself is not shown in this excerpt)
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), appendCallbacks.getPartition());
return result.future;
// handling exceptions and record the errors;
// NOTE(review): the "} catch (ApiException e) {" that opens the handler below has been folded
// into the following comment line by text extraction; it is not live code in this excerpt.
// for API exceptions return them in the future, // for other exceptions throw directly } catch (ApiException e) {
log.debug("Exception occurred during message send:", e);
if (callback != null) {
TopicPartition tp = appendCallbacks.topicPartition();
RecordMetadata nullMetadata = new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1);
callback.onCompletion(nullMetadata, e);
this.interceptors.onSendError(record, appendCallbacks.topicPartition(), e);
if (transactionManager != null) {
return new FutureFailure(e);
} catch (InterruptedException e) {
this.interceptors.onSendError(record, appendCallbacks.topicPartition(), e);
throw new InterruptException(e);
} catch (KafkaException e) {
this.interceptors.onSendError(record, appendCallbacks.topicPartition(), e);
throw e;
} catch (Exception e) {
// we notify interceptor about all exceptions, since onSend is called before anything else in this method
this.interceptors.onSendError(record, appendCallbacks.topicPartition(), e);
throw e;
- 若record指定partition,则直接返回。
- 若自定义了partitioner则使用自定义规则的分区计算方式
- 若指定了key并且未配置`partitioner.ignore.keys=true`,则使用murmur2算法得出partition
- 否则将partition设置为UNKNOWN_PARTITION,这会在org.apache.kafka.clients.producer.internals.RecordAccumulator#append方法中进行处理。
// Chooses the partition for a record, in priority order:
//   1. the partition explicitly set on the record;
//   2. the result of the custom partitioner, if one is configured (a negative result is rejected);
//   3. a hash of the serialized key, when a key is present and keys are not ignored;
//   4. RecordMetadata.UNKNOWN_PARTITION otherwise.
// NOTE(review): closing braces are missing from this excerpt.
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
// 1. An explicitly requested partition always wins
if (record.partition() != null)
return record.partition();
// 2. Delegate to the custom partitioner when one is configured
if (partitioner != null) {
int customPartition = partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
if (customPartition < 0) {
throw new IllegalArgumentException(String.format(
"The partitioner generated an invalid partition number: %d. Partition number should always be non-negative.", customPartition));
return customPartition;
// 3. Key-based partitioning, unless partitionerIgnoreKeys is set
if (serializedKey != null && !partitionerIgnoreKeys) {
// hash the keyBytes to choose a partition
return BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size());
} else {
// 4. No decision made here; the caller receives UNKNOWN_PARTITION
return RecordMetadata.UNKNOWN_PARTITION;
- int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster):自定义分区计算方法。
- void close():用于partitioner关闭资源的方法。
- void onNewBatch(String topic, Cluster cluster, int prevPartition):从3.3.0开始废弃,用于通知partitioner即将创建新的batch,sticky分区方式可以借此为新batch更换黏性分区。
需要注意的是, KIP-794中指出,sticky分区方式会将消息发送给更慢的broker,慢broker因此收到更多的消息,逐渐变得更慢,因此在该提案中,做了如下更新:
- partitioner默认配置设置为null,并且DefaultPartitioner和UniformStickyPartitioner都被废弃
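- 添加新配置用于分区计算:`partitioner.adaptive.partitioning.enable`、`partitioner.availability.timeout.ms`、`partitioner.ignore.keys`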
RecordAccumulator是Producer用于存储batch的cache,当达到一定阈值后,会由sender线程将消息发送到Kafka broker。
// RecordAccumulator.append: resolves the effective partition, tries to append the record to the
// last batch of that partition's deque, and otherwise allocates a buffer and appends to a new batch.
// NOTE(review): this excerpt is abridged - the Javadoc opener, closing braces and several
// statements (e.g. the "continue" after partitionChanged, the finally body) are not shown.
* Add a record to the accumulator, return the append result * <p>
* The append result will contain the future metadata, and flag for whether the appended batch is full or a new batch is created
* <p>
* @param topic The topic to which this record is being sent
* @param partition The partition to which this record is being sent or RecordMetadata.UNKNOWN_PARTITION
* if any partition could be used * @param timestamp The timestamp of the record
* @param key The key for the record
* @param value The value for the record
* @param headers the Headers for the record
* @param callbacks The callbacks to execute
* @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available
* @param abortOnNewBatch A boolean that indicates returning before a new batch is created and
* running the partitioner's onNewBatch method before trying to append again * @param nowMs The current time, in milliseconds
* @param cluster The cluster metadata
*/public RecordAppendResult append(String topic,
int partition,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
AppendCallbacks callbacks,
long maxTimeToBlock,
boolean abortOnNewBatch,
long nowMs,
Cluster cluster) throws InterruptedException {
// 1.1 Look up the TopicInfo for this topic, creating it on first use
TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> new TopicInfo(createBuiltInPartitioner(logContext, k, batchSize)));
// We keep track of the number of appending thread to make sure we do not miss batches in
// NOTE(review): "appendsInProgress.incrementAndGet();" has been folded into the following
// comment line by text extraction; it is not live code in this excerpt.
// abortIncompleteBatches(). appendsInProgress.incrementAndGet();
ByteBuffer buffer = null;
if (headers == null) headers = Record.EMPTY_HEADERS;
try {
// 2. Loop so the append can be retried when a concurrent change (race) is detected
while (true) {
// 2.1 Resolve the effective partition (fallback when the caller did not choose one)
final BuiltInPartitioner.StickyPartitionInfo partitionInfo;
final int effectivePartition;
if (partition == RecordMetadata.UNKNOWN_PARTITION) {
// 2.1.1 No partition specified: use the built-in partitioner's current sticky partition
// NOTE(review): the original note says this defaults to 0 - not verifiable from this excerpt
partitionInfo = topicInfo.builtInPartitioner.peekCurrentPartitionInfo(cluster);
effectivePartition = partitionInfo.partition();
} else {
partitionInfo = null;
effectivePartition = partition;
// 2.2 Record the effective partition in the callbacks
setPartition(callbacks, effectivePartition);
// 2.3 Get (or create) the batch deque for this partition
Deque<ProducerBatch> dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());
synchronized (dq) {
// 2.4 Check whether the partition has changed
if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
// 2.5 Try to write the record via tryAppend
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
// 2.5.1 A non-null result means the record was written: update the partition info (see updatePartitionInfo)
if (appendResult != null) {
// If queue has incomplete batches we disable switch (see comments in updatePartitionInfo).
boolean enableSwitch = allBatchesFull(dq);
topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
return appendResult;
// 2.6 abortOnNewBatch is true: return an empty result so that the caller invokes append again
if (abortOnNewBatch) {
// Return a result that will cause another call to append.
return new RecordAppendResult(null, false, false, true, 0);
// 2.8 No buffer yet: allocate one and refresh the current time
if (buffer == null) {
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression.type(), key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, effectivePartition, maxTimeToBlock);
// This call may block if we exhausted buffer space.
buffer = free.allocate(size, maxTimeToBlock);
// Update the current time in case the buffer allocation blocked above.
// NOTE(review): "nowMs = time.milliseconds();" has been folded into the following
// comment line by text extraction; it is not live code in this excerpt.
// NOTE: getting time may be expensive, so calling it under a lock // should be avoided. nowMs = time.milliseconds();
// 3. The previous attempt found no batch to append to and abortOnNewBatch is false:
// retry by writing the record into a new batch
synchronized (dq) {
// After taking the lock, validate that the partition hasn't changed and retry.
if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
// 3.1 Add a new batch to the deque and append the record to it
RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer, nowMs);
// Set buffer to null, so that deallocate doesn't return it back to free pool, since it's used in the batch.
if (appendResult.newBatchCreated)
buffer = null;
// If queue has incomplete batches we disable switch (see comments in updatePartitionInfo).
boolean enableSwitch = allBatchesFull(dq);
topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
return appendResult;
} finally {
// 4. Release the buffer and decrement appendsInProgress (statements not shown in this excerpt)
// Per-topic accumulator state, keyed by topic name.
private final ConcurrentMap<String /*topic*/, TopicInfo> topicInfoMap = new CopyOnWriteMap<>();
// Holds, for one topic, the per-partition batch deques and the built-in partitioner for that topic.
private static class TopicInfo {
// partition -> deque of batches for that partition
public final ConcurrentMap<Integer /*partition*/, Deque<ProducerBatch>> batches = new CopyOnWriteMap<>();
public final BuiltInPartitioner builtInPartitioner;
public TopicInfo(BuiltInPartitioner builtInPartitioner) {
this.builtInPartitioner = builtInPartitioner;
// RecordAccumulator.tryAppend: tries to add the record to the last batch in the deque and
// returns a RecordAppendResult on success or null when nothing was written.
// NOTE(review): the Javadoc opener, closing braces and the body of the "future == null"
// branch are missing from this excerpt.
* Try to append to a ProducerBatch. * * If it is full, we return null and a new batch is created. We also close the batch for record appends to free up * resources like compression buffers. The batch will be fully closed (ie. the record batch headers will be written * and memory records built) in one of the following cases (whichever comes first): right before send, * if it is expired, or when the producer is closed. */private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
Callback callback, Deque<ProducerBatch> deque, long nowMs) {
if (closed)
throw new KafkaException("Producer closed while send in progress");
// 1. Peek at the last batch in the deque
ProducerBatch last = deque.peekLast();
if (last != null) {
// 2. Remember the batch's estimated size before the append
int initialBytes = last.estimatedSizeInBytes();
// 3. Try to add the record to that batch
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
// 4. The batch is full: close it for appends and return null (the close call is not shown here)
if (future == null) {
} else {
// 5. Compute the number of bytes written and return a RecordAppendResult
int appendedBytes = last.estimatedSizeInBytes() - initialBytes;
return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false, appendedBytes);
// The deque is empty: return null
return null;
// ProducerBatch.tryAppend: writes the record into this batch's records builder when there is
// room and returns a FutureRecordMetadata for it; returns null when there is no room.
// NOTE(review): the Javadoc opener, closing braces and the tail of the FutureRecordMetadata
// constructor call are missing from this excerpt.
* Append the record to the current record set and return the relative offset within that record set * * @return The RecordSend corresponding to this record or null if there isn't sufficient room.
*/public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
// 1.1 Check whether the MemoryRecordsBuilder still has room for this record
if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
// 1.1.1 No room left: return null
return null;
} else {
// 1.2 Write the record via append(), update the bookkeeping fields and return the future
this.recordsBuilder.append(timestamp, key, value, headers);
this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
recordsBuilder.compression().type(), key, value, headers));
this.lastAppendTime = now;
// 1.3 Build the return value; -1 is passed as the size of a null key or value
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
key == null ? -1 : key.length,
value == null ? -1 : value.length,
// we have to keep every future returned to the users in case the batch needs to be
// NOTE(review): "thunks.add(new Thunk(callback, future));" has been folded into the following
// comment line by text extraction; it is not live code in this excerpt.
// split to several new batches and resent. thunks.add(new Thunk(callback, future));
return future;
// Validates a record (control flag, offset ordering, timestamp, header support) and then writes
// it in the format matching the batch's magic value. An IOException from the write is wrapped
// in a KafkaException.
// NOTE(review): closing braces are missing from this excerpt.
private void appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key,
ByteBuffer value, Header[] headers) {
try {
// 1. The record's control flag must match the batch's control flag
if (isControlRecord != isControlBatch)
throw new IllegalArgumentException("Control records can only be appended to control batches");
// 2. Offsets must be strictly increasing
if (lastOffset != null && offset <= lastOffset)
throw new IllegalArgumentException(String.format("Illegal offset %d following previous offset %d " +
"(Offsets must increase monotonically).", offset, lastOffset));
// 3. Validate the timestamp: negative values are rejected, except RecordBatch.NO_TIMESTAMP
if (timestamp < 0 && timestamp != RecordBatch.NO_TIMESTAMP)
throw new IllegalArgumentException("Invalid negative timestamp " + timestamp);
// 4. Only the v2 message format supports headers
if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0)
throw new IllegalArgumentException("Magic v" + magic + " does not support record headers");
// The first record appended fixes the batch's base timestamp
if (baseTimestamp == null)
baseTimestamp = timestamp;
// 5. Write the record: default format for magic above v1, legacy format otherwise
if (magic > RecordBatch.MAGIC_VALUE_V1) {
appendDefaultRecord(offset, timestamp, key, value, headers);
} else {
appendLegacyRecord(offset, timestamp, key, value, magic);
} catch (IOException e) {
throw new KafkaException("I/O exception when writing to the append stream, closing", e);
// Writes one record in the default format into appendStream, encoding offset and timestamp
// as deltas against the batch's base values, then reports the write via recordWritten().
// NOTE(review): the closing brace is missing from this excerpt.
private void appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value,
Header[] headers) throws IOException {
// 1. Check the appendStream state (the check itself is not shown in this excerpt)
// 2. Compute the offset and timestamp deltas relative to the batch base values
int offsetDelta = (int) (offset - baseOffset);
long timestampDelta = timestamp - baseTimestamp;
// 3. Write the record into appendStream through DefaultRecord.writeTo
int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
recordWritten(offset, timestamp, sizeInBytes);
// Excerpt of the producer set-up: create the Sender and wrap it in a KafkaThread named
// NETWORK_THREAD_PREFIX + " | " + clientId.
// NOTE(review): the third KafkaThread argument (true) is presumably the daemon flag - confirm.
this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
// Excerpt of the sender's main loop: keeps iterating while `running` is set and logs any
// exception instead of letting it terminate the loop.
// NOTE(review): the body of the try block and the closing braces are not shown in this excerpt.
while (running) {
try {
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
// One iteration of the sender: prepare and queue produce data, then poll the network client
// with the timeout computed by sendProducerData().
// NOTE(review): this excerpt is abridged; the closing brace is missing.
void runOnce() {
long currentTimeMs = time.milliseconds();
long pollTimeout = sendProducerData(currentTimeMs);
// Entry point for the actual network I/O on the sockets: sends requests and receives responses
client.poll(pollTimeout, currentTimeMs);
- 调用RecordAccumulator的ready()方法获取已满足发送条件的Node集合
- 调用RecordAccumulator的drain(),获取nodeId -> 待发送的ProducerBatch集合映射
- 调用sendProduceRequests()按Node分组发送请求
// Collects the nodes that have sendable data, drops those whose connection is not ready,
// drains their batches from the accumulator, fails expired batches, sends the produce
// requests and returns the timeout to use for the next client.poll().
// NOTE(review): this excerpt is abridged - closing braces and several statements are not shown.
private long sendProducerData(long now) {
MetadataSnapshot metadataSnapshot = metadata.fetchMetadataSnapshot();
// 1.1 Ask the accumulator's ready() for the nodes that currently meet the send condition
// (a single sendable batch is enough for a node to qualify)
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(metadataSnapshot, now);
// 1.2 If the leader of some topic partition is unknown in the metadata, update the metadata first
if (!result.unknownLeaderTopics.isEmpty()) {
// The set of topics with unknown leader contains topics with leader election pending as well as
// NOTE(review): "for (String topic : result.unknownLeaderTopics)" has been folded into the
// following comment line by text extraction; it is not live code in this excerpt.
// topics which may have expired. Add the topic again to metadata to ensure it is included // and request metadata update, since there are messages to send to the topic. for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic, now);
log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
// 1.3 Remove nodes whose connection is not ready yet
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
// 1.4 Update readyTimeMs
if (!this.client.ready(node, now)) {
// Update just the readyTimeMs of the latency stats, so that it moves forward
// NOTE(review): "this.accumulator.updateNodeLatencyStats(node.id(), now, false);" has been
// folded into the following comment line by text extraction; it is not live code here.
// every time the batch is ready (then the difference between readyTimeMs and // drainTimeMs would represent how long data is waiting for the node). this.accumulator.updateNodeLatencyStats(node.id(), now, false);
notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
} else {
// Update both readyTimeMs and drainTimeMs, this would "reset" the node
// NOTE(review): "this.accumulator.updateNodeLatencyStats(node.id(), now, true);" has been
// folded into the following comment line by text extraction; it is not live code here.
// latency. this.accumulator.updateNodeLatencyStats(node.id(), now, true);
// 2.1 Call drain() to obtain the nodeId -> list of ProducerBatch to send
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(metadataSnapshot, result.readyNodes, this.maxRequestSize, now);
// 2.2 Call addToInflightBatches() to add the drained batches to inFlightBatches, the collection
// that tracks batches already sent but not yet acknowledged (call not shown in this excerpt)
// 2.3 If guaranteeMessageOrder is true, mute the partitions of the drained batches
if (guaranteeMessageOrder) {
// Mute all the partitions drained
for (List<ProducerBatch> batchList : batches.values()) {
for (ProducerBatch batch : batchList)
// 2.4 Reset the batch expiry time (call not shown in this excerpt)
// 2.5 Collect the expired batches
List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
// 2.6 Fail every expired batch through failBatch(), which internally calls ProducerBatch.done()
if (!expiredBatches.isEmpty())
log.trace("Expired {} batches in accumulator", expiredBatches.size());
for (ProducerBatch expiredBatch : expiredBatches) {
String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
+ ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
failBatch(expiredBatch, new TimeoutException(errorMessage), false);
if (transactionManager != null && expiredBatch.inRetry()) {
// This ensures that no new batches are drained until the current in flight batches are fully resolved.
// 2.7 Update metrics (call not shown in this excerpt)
// 2.8 Compute pollTimeout: the smallest of the next ready-check delay, the not-ready node
// delay and the time until the next batch expiry, clamped to be non-negative
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
pollTimeout = Math.max(pollTimeout, 0);
if (!result.readyNodes.isEmpty()) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
// if some partitions are already ready to be sent, the select time would be 0;
// NOTE(review): "pollTimeout = 0;" has been folded into the following comment line by text
// extraction; it is not live code in this excerpt.
// otherwise if some partition already has some data accumulated but not ready yet, // the select time will be the time difference between now and its linger expiry time; // otherwise the select time will be the time difference between now and the metadata expiry time; pollTimeout = 0;
// 2.9 Call sendProduceRequests() to send the requests grouped by node
sendProduceRequests(batches, now);
return pollTimeout;
从batchReady()方法中可以看出,在batch不处于重试backoff(!backingOff)的前提下,是否被确定为ready node,只需要满足以下几个条件中的任何一条:
- full:full = dequeSize > 1 || batch.isFull()
- expired:当前等待时间是否大于lingerMs,其中有重试backoff参数的影响
- exhausted:buffer pool是否已满
- closed:accumulator是否被关闭
- flushInProgress():是否有其他线程在调用flush(),见:org.apache.kafka.clients.producer.KafkaProducer#flush
- transactionCompleting:若开启事务,且事务正准备完成
// Decides whether the batch of one partition is sendable and returns the (possibly reduced)
// delay until the next readiness check. The partition is only examined when its leader is not
// already in readyNodes and the partition is not muted.
// NOTE(review): the Javadoc opener, closing braces and the body of the "sendable" branch
// are missing from this excerpt.
* Add the leader to the ready nodes if the batch is ready * * @param exhausted 'true' is the buffer pool is exhausted
* @param part The partition
* @param leader The leader for the partition
* @param waitedTimeMs How long batch waited
* @param backingOff Is backing off
* @param backoffAttempts Number of attempts for calculating backoff delay
* @param full Is batch full
* @param nextReadyCheckDelayMs The delay for next check
* @param readyNodes The set of ready nodes (to be filled in)
* @return The delay for next check
*/private long batchReady(boolean exhausted, TopicPartition part, Node leader,
long waitedTimeMs, boolean backingOff, int backoffAttempts,
boolean full, long nextReadyCheckDelayMs, Set<Node> readyNodes) {
if (!readyNodes.contains(leader) && !isMuted(part)) {
// While backing off, the batch waits for the retry backoff; otherwise it waits for lingerMs
long timeToWaitMs = backingOff ? retryBackoff.backoff(backoffAttempts > 0 ? backoffAttempts - 1 : 0) : lingerMs;
boolean expired = waitedTimeMs >= timeToWaitMs;
boolean transactionCompleting = transactionManager != null && transactionManager.isCompleting();
// Any single one of these conditions makes the batch sendable
boolean sendable = full
|| expired
|| exhausted
|| closed
|| flushInProgress()
|| transactionCompleting;
// A sendable batch is still held back while it is backing off
if (sendable && !backingOff) {
} else {
long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
// Note that this results in a conservative estimate since an un-sendable partition may have
// NOTE(review): "nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);" has been
// folded into the following comment line by text extraction; it is not live code here.
// a leader that will later be found to have sendable data. However, this is good enough // since we'll just wake up and then sleep again for the remaining time. nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
return nextReadyCheckDelayMs;
// Iterates over the nodeId -> batches mapping, one entry per node.
// NOTE(review): the Javadoc opener and the loop body are missing from this excerpt.
* Transfer the record batches into a list of produce requests on a per-node basis */private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
https://docs.confluent.io/kafka-client/overview.html
https://learn.conduktor.io/kafka/kafka-producers-advanced