1. Intro

Kafka transactions mainly target the following two scenarios:

  1. multi-produce: a producer needs to commit multiple batches of messages atomically.
  2. consume-transform-produce: consume upstream data, process it, and produce downstream data. For a reference implementation see kafka.examples.ExactlyOnceMessageProcessor; the transaction guarantees that consuming and producing happen atomically.

The consume-transform-produce scenario

An example of the consume-transform-produce scenario:

@Override  
public void run() {  
    int retries = 0;  
    int processedRecords = 0;  
    long remainingRecords = Long.MAX_VALUE;  
  
    // it is recommended to have a relatively short txn timeout in order to clear pending offsets faster  
    int transactionTimeoutMs = 10_000;  
    // consumer must be in read_committed mode, which means it won't be able to read uncommitted data  
    boolean readCommitted = true;  
  
    try (KafkaProducer<Integer, String> producer = new Producer("processor-producer", bootstrapServers, outputTopic,  
            true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer();  
         KafkaConsumer<Integer, String> consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic,  
             "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) {  
        // called first and once to fence zombies and abort any pending transaction  
        producer.initTransactions();  
        consumer.subscribe(singleton(inputTopic), this);  
  
        Utils.printOut("Processing new records");  
        while (!closed && remainingRecords > 0) {  
            try {  
                ConsumerRecords<Integer, String> records = consumer.poll(ofMillis(200));  
                if (!records.isEmpty()) {  
                    // begin a new transaction session  
                    producer.beginTransaction();  
  
                    for (ConsumerRecord<Integer, String> record : records) {  
                        // process the record and send downstream  
                        ProducerRecord<Integer, String> newRecord =  
                            new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok");  
                        producer.send(newRecord);  
                    }  
  
                    // checkpoint the progress by sending offsets to group coordinator broker  
                    // note that this API is only available for broker >= 2.5
                    producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata());  
  
                    // commit the transaction including offsets  
                    producer.commitTransaction();  
                    processedRecords += records.count();  
                    retries = 0;  
                }  
            } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException  
                     | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {  
                // we can't recover from these exceptions  
                Utils.printErr(e.getMessage());  
                shutdown();  
            } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {  
                // invalid or no offset found without auto.reset.policy  
                Utils.printOut("Invalid or no offset found, using latest");  
                consumer.seekToEnd(emptyList());  
                consumer.commitSync();  
                retries = 0;  
            } catch (KafkaException e) {  
                // abort the transaction  
                Utils.printOut("Aborting transaction: %s", e.getMessage());  
                producer.abortTransaction();  
                retries = maybeRetry(retries, consumer);  
            }  
  
            remainingRecords = getRemainingRecords(consumer);  
            if (remainingRecords != Long.MAX_VALUE) {  
                Utils.printOut("Remaining records: %d", remainingRecords);  
            }  
        }  
    } catch (Throwable e) {  
        Utils.printErr("Unhandled exception");  
        e.printStackTrace();  
    }  
    Utils.printOut("Processed %d records", processedRecords);  
    shutdown();  
}
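Note that getOffsetsToCommit, maybeRetry, and getRemainingRecords are helper methods defined elsewhere in the ExactlyOnceMessageProcessor example class: they collect the per-partition positions from the consumer, implement bounded retries, and compute the remaining input-topic lag, respectively.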

1.1. Client Configuration

Producer settings:

  1. enable.idempotence: defaults to true since Kafka 3.0 (false in earlier releases). When enabled, the producer requires acks=all, retries > 0, and max.in.flight.requests.per.connection <= 5.
  2. transactional.id: the transactional ID identifying one logical producer across sessions and clients; a new instance with the same ID fences the old one.

Consumer settings:

  1. isolation.level: defaults to read_uncommitted
    • read_uncommitted: may return messages belonging to transactions that have not yet been committed
    • read_committed: only returns messages from committed transactions (plus non-transactional messages)
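A minimal configuration sketch for this scenario (the bootstrap address and IDs are placeholders, not values taken from the example above):

Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id"); // placeholder
// enable.idempotence is implied once transactional.id is set; shown here for clarity
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "processor-group"); // placeholder
// hide messages from open or aborted transactions
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
// offsets are committed via the producer's sendOffsetsToTransaction instead
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");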

2. Design

2.1. Transaction Flow

The overall flow (diagram omitted):

  1. The client sends a FindCoordinatorRequest to a broker to locate the Transaction Coordinator.
  2. The client calls initTransactions, which sends an InitProducerIdRequest to the Transaction Coordinator to obtain a producerId.
  3. The client calls beginTransaction to start a transaction; this only changes the transaction state locally and does not involve the Transaction Coordinator.
  4. During the transaction:
    1. Once the transaction is open, whenever data is written to a new TopicPartition, the producer sends an AddPartitionsToTxnRequest to the Transaction Coordinator, which records the partition as part of the transaction.
    2. The client sends ProduceRequests to the leader endpoint of each TopicPartition.
    3. The client calls sendOffsetsToTransaction, sending an AddOffsetsToTxnRequest to the Transaction Coordinator, which stores the consumed-offsets entry in the transaction log.
    4. After that, the client sends a TxnOffsetCommitRequest to the group coordinator.
  5. Commit or abort:
    1. commitTransaction/abortTransaction sends an EndTxnRequest to the Transaction Coordinator.
    2. The Transaction Coordinator sends WriteTxnMarkersRequests to the leader endpoints of the involved TopicPartitions, and each broker writes a commit or abort marker accordingly.
    3. The Transaction Coordinator writes the transaction result to the transaction log.
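Mapping the flow back to the client API calls used in the earlier example (a summary of the steps above, not actual Kafka source):

producer.initTransactions();        // FindCoordinatorRequest + InitProducerIdRequest
producer.beginTransaction();        // local state change only, nothing is sent
producer.send(record);              // AddPartitionsToTxnRequest (first send to a partition) + ProduceRequest
producer.sendOffsetsToTransaction(
        offsets, groupMetadata);    // AddOffsetsToTxnRequest + TxnOffsetCommitRequest
producer.commitTransaction();       // EndTxnRequest; the coordinator then writes the markers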

3. Producer-Side Implementation

3.1. Interface Overview

The Java client's KafkaProducer implements the following interface for transactional use:

public interface Producer<K, V> extends Closeable {  

    // initialize the transaction: acquire the producer id and other transactional fields
    void initTransactions();  

    // start a transaction; changes the local transaction state
    void beginTransaction() throws ProducerFencedException;  

    // commit consumed offsets as part of the transaction: offsets holds the consumed position per
    // partition, and groupMetadata identifies the consumer group, binding the consumer's progress
    // and the producer's writes into the same transaction
    void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,  
                                  ConsumerGroupMetadata groupMetadata) throws ProducerFencedException;  

    // send the transaction-commit request
    void commitTransaction() throws ProducerFencedException;  

    // send the transaction-abort request
    void abortTransaction() throws ProducerFencedException;
}

The methods above use the transactionManager object to send transaction requests to the Transaction Coordinator and process the responses. The main request types are:

  1. FindCoordinatorRequest: locates the Transaction Coordinator; sent before the Sender thread issues transaction requests, or when a response reports a coordinator error.
  2. InitProducerIdRequest: transaction initialization request
  3. TxnOffsetCommitRequest: transactional offset-commit request
  4. AddPartitionsToTxnRequest: registers the transaction's partitions
  5. AddOffsetsToTxnRequest: registers the consumer group's offsets with the transaction
  6. EndTxnRequest: transaction commit/abort request

Each of these requests is wrapped in the abstract class TxnRequestHandler, which also processes the response from the Transaction Coordinator. Queued requests are sent asynchronously by the Sender thread.

// priority queue of requests pending to be sent to the broker-side transaction coordinator
private final PriorityQueue<TxnRequestHandler> pendingRequests;

abstract class TxnRequestHandler implements RequestCompletionHandler {  
    protected final TransactionalRequestResult result;  
    private boolean isRetry = false;

    abstract void handleResponse(AbstractResponse responseBody);
}

Transaction request handler hierarchy (diagram omitted).
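Handlers are drained from pendingRequests in priority order, so that, for example, a coordinator lookup always goes out before the request that depends on it. A self-contained sketch of such an ordering (the Priority values and class names below are illustrative assumptions, not the exact Kafka source):

import java.util.Comparator;
import java.util.PriorityQueue;

public class TxnRequestOrdering {
    // illustrative priorities: lower ordinal = sent first
    enum Priority { FIND_COORDINATOR, INIT_PRODUCER_ID, ADD_PARTITIONS_OR_OFFSETS, END_TXN }

    record Pending(Priority priority, String name) {}

    public static void main(String[] args) {
        PriorityQueue<Pending> pending =
                new PriorityQueue<>(Comparator.comparingInt(p -> p.priority().ordinal()));
        pending.add(new Pending(Priority.END_TXN, "EndTxnRequest"));
        pending.add(new Pending(Priority.FIND_COORDINATOR, "FindCoordinatorRequest"));
        // FindCoordinatorRequest is polled first even though it was enqueued last
        System.out.println(pending.poll().name());
    }
}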

3.2. initializeTransactions

TransactionManager#initializeTransactions (invoked by KafkaProducer#initTransactions) initializes the transaction:

synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoch producerIdAndEpoch) {  
    maybeFailWithError();  
  
    // 1.1 check whether an existing ProducerIdAndEpoch was passed in (i.e. an epoch bump)  
    boolean isEpochBump = producerIdAndEpoch != ProducerIdAndEpoch.NONE;  
  
    // process the request result via handleCachedTransactionRequestResult  
    return handleCachedTransactionRequestResult(() -> {  
        // If this is an epoch bump, we will transition the state as part of handling the EndTxnRequest  
        if (!isEpochBump) {  
            // 1.2 move the transaction state to INITIALIZING  
            transitionTo(State.INITIALIZING);  
            log.info("Invoking InitProducerId for the first time in order to acquire a producer ID");  
        } else {  
            log.info("Invoking InitProducerId with current producer ID and epoch {} in order to bump the epoch", producerIdAndEpoch);  
        }  
        // 1.3 send an InitProducerIdRequest to the broker  
        InitProducerIdRequestData requestData = new InitProducerIdRequestData()  
                .setTransactionalId(transactionalId)  
                .setTransactionTimeoutMs(transactionTimeoutMs)  
                .setProducerId(producerIdAndEpoch.producerId)  
                .setProducerEpoch(producerIdAndEpoch.epoch);  
        InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),  
                isEpochBump);  
        enqueueRequest(handler);  
        return handler.result;  
    }, State.INITIALIZING, "initTransactions");  
}
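handleCachedTransactionRequestResult (not shown) reuses the result of an already-pending request when the same state transition is in flight, so repeated initTransactions calls do not enqueue duplicate InitProducerIdRequests.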

InitProducerIdHandler's response handling is fairly simple:

  1. On success, read the ProducerIdAndEpoch and move the transaction state to READY.
  2. On a transaction-coordinator error, send a FindCoordinatorRequest and then re-enqueue the InitProducerIdRequest.
  3. If the error in the response is a RetriableException, re-enqueue the InitProducerIdRequest.
  4. Other errors surface as exceptions.

public void handleResponse(AbstractResponse response) {  
    InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response;  
    Errors error = initProducerIdResponse.error();  
  
    // 1.1 on success, read the ProducerIdAndEpoch and move the transaction state to READY  
    if (error == Errors.NONE) {  
        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(initProducerIdResponse.data().producerId(),  
                initProducerIdResponse.data().producerEpoch());  
        setProducerIdAndEpoch(producerIdAndEpoch);  
        transitionTo(State.READY);  
        lastError = null;  
        if (this.isEpochBump) {  
            resetSequenceNumbers();  
        }  
        result.done();  
    } else if (error == Errors.NOT_COORDINATOR || error == Errors.COORDINATOR_NOT_AVAILABLE) {  
        // 1.2 on NOT_COORDINATOR or COORDINATOR_NOT_AVAILABLE, send a FindCoordinatorRequest and re-enqueue the InitProducerIdRequest  
        lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);  
        reenqueue();  
    } else if (error.exception() instanceof RetriableException) {  
        // 1.3 on a RetriableException, re-enqueue the InitProducerIdRequest  
        reenqueue();  
    } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||  
            error == Errors.CLUSTER_AUTHORIZATION_FAILED) {  
        // 1.4 authorization failures are treated as abortable errors  
        log.info("Abortable authorization error: {}.  Transition the producer state to {}", error.message(), State.ABORTABLE_ERROR);  
        lastError = error.exception();  
        abortableError(error.exception());  
    } else if (error == Errors.INVALID_PRODUCER_EPOCH || error == Errors.PRODUCER_FENCED) {  
        // We could still receive INVALID_PRODUCER_EPOCH from old versioned transaction coordinator,
        // just treat it the same as PRODUCER_FENCED.
        fatalError(Errors.PRODUCER_FENCED.exception());  
    } else if (error == Errors.TRANSACTION_ABORTABLE) {  
        abortableError(error.exception());  
    } else {  
        fatalError(new KafkaException("Unexpected error in InitProducerIdResponse; " + error.message()));  
    }  
}

3.3. beginTransaction

beginTransaction starts a transaction; the logic is simple: move the transaction state to IN_TRANSACTION.

public synchronized void beginTransaction() {  
    // ensure transactional.id is set  
    ensureTransactional();  
    throwIfPendingState("beginTransaction");  
    maybeFailWithError();  
  
    // move the state to IN_TRANSACTION  
    transitionTo(State.IN_TRANSACTION);  
}
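For orientation, TransactionManager tracks a small state machine; the state names below match recent Kafka sources, while the comments are a simplified reading rather than the exact transition rules:

// simplified sketch of TransactionManager.State
enum State {
    UNINITIALIZED,          // no transactional state yet
    INITIALIZING,           // InitProducerIdRequest in flight
    READY,                  // producer id acquired, no open transaction
    IN_TRANSACTION,         // beginTransaction() was called
    COMMITTING_TRANSACTION, // commitTransaction() called, EndTxnRequest pending
    ABORTING_TRANSACTION,   // abortTransaction() called, EndTxnRequest pending
    ABORTABLE_ERROR,        // recoverable by aborting the transaction
    FATAL_ERROR             // unrecoverable; the producer must be closed
}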

3.4. addPartitionsToTransactionHandler

In the send path (org.apache.kafka.clients.producer.KafkaProducer#doSend), when a transaction is active, the topicPartition is recorded in the transactionManager's newPartitionsInTransaction set:

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    // (rest of doSend elided)
    if (transactionManager != null) {  
        transactionManager.maybeAddPartition(appendCallbacks.topicPartition());  
    }
}

public synchronized void maybeAddPartition(TopicPartition topicPartition) {  
    maybeFailWithError();  
    throwIfPendingState("send");  
  
    if (isTransactional()) {  
        if (!hasProducerId()) {  
            throw new IllegalStateException("Cannot add partition " + topicPartition +  
                " to transaction before completing a call to initTransactions");  
        } else if (currentState != State.IN_TRANSACTION) {  
            throw new IllegalStateException("Cannot add partition " + topicPartition +  
                " to transaction while in state  " + currentState);  
        } else if (isPartitionAdded(topicPartition) || isPartitionPendingAdd(topicPartition)) {  
            return;  
        } else {  
            log.debug("Begin adding new partition {} to transaction", topicPartition);  
            txnPartitionMap.getOrCreate(topicPartition);  
            newPartitionsInTransaction.add(topicPartition);  
        }  
    }  
}

TransactionManager maintains the following sets to track TopicPartitions in their various states:

// TopicPartitions not yet wrapped into a request
private final Set<TopicPartition> newPartitionsInTransaction;  
// TopicPartitions wrapped into a request and waiting to be sent to the broker
private final Set<TopicPartition> pendingPartitionsInTransaction; 
// TopicPartitions already registered with the coordinator
private final Set<TopicPartition> partitionsInTransaction;

addPartitionsToTransactionHandler wraps the partitions in newPartitionsInTransaction into an AddPartitionsToTxnRequest:

private TxnRequestHandler addPartitionsToTransactionHandler() {  
    pendingPartitionsInTransaction.addAll(newPartitionsInTransaction);  
    newPartitionsInTransaction.clear();  
    AddPartitionsToTxnRequest.Builder builder =  
        AddPartitionsToTxnRequest.Builder.forClient(transactionalId,  
            producerIdAndEpoch.producerId,  
            producerIdAndEpoch.epoch,  
            new ArrayList<>(pendingPartitionsInTransaction));  
    return new AddPartitionsToTxnHandler(builder);  
}

An AddPartitionsToTxnRequest is sent in two situations:

  1. When committing/ending the transaction, if newPartitionsInTransaction is not empty.
  2. When the Sender thread sends transaction requests: the nextRequest method checks whether an AddPartitionsToTxnRequest needs to go out first.

AddPartitionsToTxnHandler's response handling is straightforward; with the error handling elided, it mainly maintains the partition sets:

public void handleResponse(AbstractResponse response) {  
    AddPartitionsToTxnResponse addPartitionsToTxnResponse = (AddPartitionsToTxnResponse) response;  
    Map<TopicPartition, Errors> errors = addPartitionsToTxnResponse.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID);  
    boolean hasPartitionErrors = false;  
    Set<String> unauthorizedTopics = new HashSet<>();  
    retryBackoffMs = TransactionManager.this.retryBackoffMs;  
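    // (per-partition error handling elided: the full method iterates the errors map,
    // sets hasPartitionErrors, collects unauthorizedTopics, and re-enqueues on retriable errors)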

    Set<TopicPartition> partitions = errors.keySet();  
    pendingPartitionsInTransaction.removeAll(partitions);  
  
    if (!unauthorizedTopics.isEmpty()) {  
        abortableError(new TopicAuthorizationException(unauthorizedTopics));  
    } else if (hasPartitionErrors) {  
        abortableError(new KafkaException("Could not add partitions to transaction due to errors: " + errors));  
    } else {  
        log.debug("Successfully added partitions {} to transaction", partitions);  
        partitionsInTransaction.addAll(partitions);  
        transactionStarted = true;  
        result.done();  
    }  
}

3.5. sendOffsetsToTransaction

sendOffsetsToTransaction commits the consumer's offsets as part of the Kafka transaction, so that message processing and the offset commit form one atomic operation. It sends an AddOffsetsToTxnRequest:

public synchronized TransactionalRequestResult sendOffsetsToTransaction(final Map<TopicPartition, OffsetAndMetadata> offsets,  
                                                                        final ConsumerGroupMetadata groupMetadata) {  
    ensureTransactional();  
    throwIfPendingState("sendOffsetsToTransaction");  
    maybeFailWithError();  
  
    // check that we are in the IN_TRANSACTION state  
    if (currentState != State.IN_TRANSACTION) {  
        throw new IllegalStateException("Cannot send offsets if a transaction is not in progress " +  
            "(currentState= " + currentState + ")");  
    }  
  
    // send an AddOffsetsToTxnRequest to the broker  
    log.debug("Begin adding offsets {} for consumer group {} to transaction", offsets, groupMetadata);  
    AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder(  
        new AddOffsetsToTxnRequestData()  
            .setTransactionalId(transactionalId)  
            .setProducerId(producerIdAndEpoch.producerId)  
            .setProducerEpoch(producerIdAndEpoch.epoch)  
            .setGroupId(groupMetadata.groupId())  
    );  
    AddOffsetsToTxnHandler handler = new AddOffsetsToTxnHandler(builder, offsets, groupMetadata);  
  
    enqueueRequest(handler);  
    return handler.result;  
}

AddOffsetsToTxnHandler's response handling is simple: on success, send a TxnOffsetCommitRequest to the Group Coordinator.

public void handleResponse(AbstractResponse response) {  
    AddOffsetsToTxnResponse addOffsetsToTxnResponse = (AddOffsetsToTxnResponse) response;  
    Errors error = Errors.forCode(addOffsetsToTxnResponse.data().errorCode());  
  
    if (error == Errors.NONE) {  
        // 1.1 success case  
        log.debug("Successfully added partition for consumer group {} to transaction", builder.data.groupId());  
  
        // 1.2 on success, enqueue a TxnOffsetCommitRequest  
        // note the result is not completed until the TxnOffsetCommit returns
        pendingRequests.add(txnOffsetCommitHandler(result, offsets, groupMetadata));  
  
        transactionStarted = true;  
    }
    // (error-handling branches elided)
}

3.6. beginCommit && beginAbort

Both commit and abort go through beginCompletingTransaction; they differ only in the target state. The method first checks whether the most recent error was an InvalidPidMappingException; if so it re-initializes the transaction, otherwise it sends an EndTxnRequest:

private TransactionalRequestResult beginCompletingTransaction(TransactionResult transactionResult) {  
  
    // 1.1 check whether an AddPartitionsToTxnRequest still needs to be sent  
    if (!newPartitionsInTransaction.isEmpty())  
        enqueueRequest(addPartitionsToTransactionHandler());  
  
    // If the error is an INVALID_PRODUCER_ID_MAPPING error, the server will not accept an EndTxnRequest,
    // so skip directly to InitProducerId. Otherwise, we must first abort the transaction, because the
    // producer will be fenced if we directly call InitProducerId.
    // 1.2 on INVALID_PRODUCER_ID_MAPPING the broker will not accept an EndTxnRequest; go straight to initializeTransactions
    if (!(lastError instanceof InvalidPidMappingException)) {  
        // 1.3 send an EndTxnRequest to the broker  
        EndTxnRequest.Builder builder = new EndTxnRequest.Builder(  
                new EndTxnRequestData()  
                        .setTransactionalId(transactionalId)  
                        .setProducerId(producerIdAndEpoch.producerId)  
                        .setProducerEpoch(producerIdAndEpoch.epoch)  
                        .setCommitted(transactionResult.id));  
  
        EndTxnHandler handler = new EndTxnHandler(builder);  
        enqueueRequest(handler);  
        if (!epochBumpRequired) {  
            return handler.result;  
        }  
    }  
  
    return initializeTransactions(this.producerIdAndEpoch);  
}

In EndTxnHandler, a successful response moves the TransactionManager state back to READY:

public void handleResponse(AbstractResponse response) {  
    EndTxnResponse endTxnResponse = (EndTxnResponse) response;  
    Errors error = endTxnResponse.error();  
  
    if (error == Errors.NONE) {  
        // 1.1 on success, move the transaction state to READY  
        completeTransaction();  
        result.done();  
    }
    // (error-handling branches elided)
}

4. Broker-Side Implementation

4.1. TransactionMetadata

TransactionMetadata is the metadata of one transaction and includes the following fields:

  • transactionalId: unique identifier of the transaction
  • producerId: the producer id
  • txnTimeoutMs: the transaction timeout
  • state (TransactionState): the transaction state
  • topicPartitions: the TopicPartitions involved in this transaction

private[transaction] class TransactionMetadata(val transactionalId: String,  
                                               var producerId: Long,  
                                               var lastProducerId: Long,  
                                               var producerEpoch: Short,  
                                               var lastProducerEpoch: Short,  
                                               var txnTimeoutMs: Int,  
                                               var state: TransactionState,  
                                               val topicPartitions: mutable.Set[TopicPartition],  
                                               @volatile var txnStartTimestamp: Long = -1,  
                                               @volatile var txnLastUpdateTimestamp: Long) extends Logging {  
  
  // pending state is used to indicate the state that this transaction is going to
  // transit to, and for blocking future attempts to transit it again if it is not legal;
  // initialized as the same as the current state
  var pendingState: Option[TransactionState] = None
}
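For reference, the broker-side TransactionState values (as named in recent Kafka sources) are Empty, Ongoing, PrepareCommit, PrepareAbort, CompleteCommit, CompleteAbort, Dead, and PrepareEpochFence.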

4.2. TransactionStateManager

It is responsible for managing:

  1. the transaction log
  2. transaction metadata
  3. transaction expiration

4.3. handleFindCoordinator

FindCoordinatorRequest is the client's request for locating a coordinator. On the broker it is handled by kafka.server.KafkaApis#getCoordinator, which serves both the GROUP and TRANSACTION coordinator types; the parameter-validation logic is omitted below:

private def getCoordinator(request: RequestChannel.Request, keyType: Byte, key: String): (Errors, Node) = {  
  // (authorization and validation checks elided)
    val (partition, internalTopicName) = CoordinatorType.forId(keyType) match {  
      case CoordinatorType.GROUP =>  
        (groupCoordinator.partitionFor(key), GROUP_METADATA_TOPIC_NAME)  
  
      case CoordinatorType.TRANSACTION =>  
        (txnCoordinator.partitionFor(key), TRANSACTION_STATE_TOPIC_NAME)  
  
      case CoordinatorType.SHARE =>  
        // When share coordinator support is implemented in KIP-932, a proper check will go here  
        return (Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)  
    }  
  
    val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName)  
  
    if (topicMetadata.headOption.isEmpty) {  
      val controllerMutationQuota = quotas.controllerMutation.newPermissiveQuotaFor(request)  
      autoTopicCreationManager.createTopics(Seq(internalTopicName).toSet, controllerMutationQuota, None)  
      (Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)  
    } else {  
      if (topicMetadata.head.errorCode != Errors.NONE.code) {  
        (Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)  
      } else {  
        val coordinatorEndpoint = topicMetadata.head.partitions.asScala  
          .find(_.partitionIndex == partition)  
          .filter(_.leaderId != MetadataResponse.NO_LEADER_ID)  
          .flatMap(metadata => metadataCache.  
              getAliveBrokerNode(metadata.leaderId, request.context.listenerName))  
  
        coordinatorEndpoint match {  
          case Some(endpoint) =>  
            (Errors.NONE, endpoint)  
          case _ =>  
            (Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)  
        }  
      }  
    }  
}

The Transaction Coordinator is located by taking the transactionalId's hashCode modulo the transaction.state.log.num.partitions configuration:

def partitionFor(transactionalId: String): Int = Utils.abs(transactionalId.hashCode) % transactionTopicPartitionCount

This configuration defaults to 50:

public static final String TRANSACTIONS_TOPIC_PARTITIONS_CONFIG = "transaction.state.log.num.partitions";  
public static final int TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT = 50;
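As a small worked example (assuming the default of 50 partitions; the transactional.id literal is a placeholder), the coordinator is the leader of the resulting __transaction_state partition:

// which __transaction_state partition hosts this transactional.id
// (Kafka's Utils.abs masks the sign bit rather than calling Math.abs, a detail glossed over here)
int partition = Math.abs("my-transactional-id".hashCode()) % 50;
// the Transaction Coordinator is the leader of __transaction_state-<partition>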

4.4. handleInitProducerId

The broker handles an InitProducerIdRequest as follows:

  1. If the incoming transactionalId is null, generate a producerId immediately and return it.
  2. Otherwise, look up any existing transaction state for that transactionalId.
  3. If none exists, generate a new producerId and record it in the transaction metadata.
  4. Call prepareInitProducerIdTransit to prepare the state transition.
  5. If the resulting state is PrepareEpochFence, the transactional id has been taken over by a new producer: abort the ongoing transaction and return an error.
  6. Append the transaction metadata to the transaction log.

def handleInitProducerId(transactionalId: String,  
                         transactionTimeoutMs: Int,  
                         expectedProducerIdAndEpoch: Option[ProducerIdAndEpoch],  
                         responseCallback: InitProducerIdCallback,  
                         requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {  
  
  // 1.1 check whether the given transactionalId is null  
  if (transactionalId == null) {  
    // if the transactional id is null, then always blindly accept the request  
    // and return a new producerId from the producerId manager  
    // 1.2 generate a new producerId
    producerIdManager.generateProducerId() match {  
      case Success(producerId) =>  
        responseCallback(InitProducerIdResult(producerId, producerEpoch = 0, Errors.NONE))  
      case Failure(exception) =>  
        responseCallback(initTransactionError(Errors.forException(exception)))  
    }  
  } else if (transactionalId.isEmpty) {  
    // if transactional id is empty then return error as invalid request. This is  
    // to make TransactionCoordinator's behavior consistent with producer client
    responseCallback(initTransactionError(Errors.INVALID_REQUEST))  
  } else if (!txnManager.validateTransactionTimeoutMs(transactionTimeoutMs)) {  
    // check transactionTimeoutMs is not larger than the broker configured maximum allowed value  
    responseCallback(initTransactionError(Errors.INVALID_TRANSACTION_TIMEOUT))  
  } else {  
    // 2.1 look up any existing transaction state for this transactionalId  
    val coordinatorEpochAndMetadata = txnManager.getTransactionState(transactionalId).flatMap {  
      case None =>  
        // 2.2 if none exists, generate a new producerId and store the new metadata  
        producerIdManager.generateProducerId() match {  
          case Success(producerId) =>  
            val createdMetadata = new TransactionMetadata(transactionalId = transactionalId,  
              producerId = producerId,  
              lastProducerId = RecordBatch.NO_PRODUCER_ID,  
              producerEpoch = RecordBatch.NO_PRODUCER_EPOCH,  
              lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,  
              txnTimeoutMs = transactionTimeoutMs,  
              state = Empty,  
              topicPartitions = collection.mutable.Set.empty[TopicPartition],  
              txnLastUpdateTimestamp = time.milliseconds())  
            txnManager.putTransactionStateIfNotExists(createdMetadata)  
  
          case Failure(exception) =>  
            Left(Errors.forException(exception))  
        }  
  
      case Some(epochAndTxnMetadata) => Right(epochAndTxnMetadata)  
    }  
  
    val result: ApiResult[(Int, TxnTransitMetadata)] = coordinatorEpochAndMetadata.flatMap {  
      existingEpochAndMetadata =>  
        val coordinatorEpoch = existingEpochAndMetadata.coordinatorEpoch  
        val txnMetadata = existingEpochAndMetadata.transactionMetadata  
  
        // 3.1 call prepareInitProducerIdTransit to prepare the state transition  
        txnMetadata.inLock {  
          prepareInitProducerIdTransit(transactionalId, transactionTimeoutMs, coordinatorEpoch, txnMetadata,  
            expectedProducerIdAndEpoch)  
        }  
    }  
  
    result match {  
      case Left(error) =>  
        responseCallback(initTransactionError(error))  
  
      case Right((coordinatorEpoch, newMetadata)) =>  
        // 4.1 if the new state is PrepareEpochFence, the transactional id has been taken over by a new producer: return an error  
        if (newMetadata.txnState == PrepareEpochFence) {  
          // abort the ongoing transaction and then return CONCURRENT_TRANSACTIONS to let client wait and retry  
          def sendRetriableErrorCallback(error: Errors): Unit = {  
            if (error != Errors.NONE) {  
              responseCallback(initTransactionError(error))  
            } else {  
              responseCallback(initTransactionError(Errors.CONCURRENT_TRANSACTIONS))  
            }  
          }  
  
          // 4.2 abort the ongoing transaction  
          endTransaction(transactionalId,  
            newMetadata.producerId,  
            newMetadata.producerEpoch,  
            TransactionResult.ABORT,  
            isFromClient = false,  
            sendRetriableErrorCallback,  
            requestLocal)  
        } else {  
          def sendPidResponseCallback(error: Errors): Unit = {  
            if (error == Errors.NONE) {  
              info(s"Initialized transactionalId $transactionalId with producerId ${newMetadata.producerId} and producer " +  
                s"epoch ${newMetadata.producerEpoch} on partition " +  
                s"${Topic.TRANSACTION_STATE_TOPIC_NAME}-${txnManager.partitionFor(transactionalId)}")  
              responseCallback(initTransactionMetadata(newMetadata))  
            } else {  
              info(s"Returning $error error code to client for $transactionalId's InitProducerId request")  
              responseCallback(initTransactionError(error))  
            }  
          }  
  
          // 5.1 append the transaction metadata to the transaction log  
          txnManager.appendTransactionToLog(transactionalId, coordinatorEpoch, newMetadata,  
            sendPidResponseCallback, requestLocal = requestLocal)  
        }  
    }  
  }  
}

4.5. handleAddPartitionsToTransaction

Both AddPartitionsToTxnRequest and AddOffsetsToTxnRequest are handled by handleAddPartitionsToTransaction; the difference is that for the latter the TopicPartition is the group's __consumer_offsets partition, while for the former it is an actual business topic.
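For the offsets case, the partition registered with the transaction is derived from the group id, mirroring the transaction-state mapping above (a sketch: the group id literal is a placeholder and 50 is the offsets.topic.num.partitions default):

// the group's offsets live in one partition of __consumer_offsets
int offsetsPartition = Math.abs("processor-group".hashCode()) % 50;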

Its handling is fairly simple:

  1. Record the TopicPartitions in the in-memory metadata.
  2. Persist the transaction metadata to the transaction log.

def handleAddPartitionsToTransaction(transactionalId: String,  
                                     producerId: Long,  
                                     producerEpoch: Short,  
                                     partitions: collection.Set[TopicPartition],  
                                     responseCallback: AddPartitionsCallback,  
                                     requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {  
  if (transactionalId == null || transactionalId.isEmpty) {  
    debug(s"Returning ${Errors.INVALID_REQUEST} error code to client for $transactionalId's AddPartitions request")  
    responseCallback(Errors.INVALID_REQUEST)  
  } else {  

    val result: ApiResult[(Int, TxnTransitMetadata)] = txnManager.getTransactionState(transactionalId).flatMap {  
      case None => Left(Errors.INVALID_PRODUCER_ID_MAPPING)  
  
      case Some(epochAndMetadata) =>  
        val coordinatorEpoch = epochAndMetadata.coordinatorEpoch  
        val txnMetadata = epochAndMetadata.transactionMetadata  
  
        // generate the new transaction metadata with added partitions  
        txnMetadata.inLock {  
          // (producer id / epoch / pending-state validation elided)
          Right(coordinatorEpoch, txnMetadata.prepareAddPartitions(partitions.toSet, time.milliseconds()))  
        }  
    }  
  
    result match {  
      case Left(err) =>  
        debug(s"Returning $err error code to client for $transactionalId's AddPartitions request")  
        responseCallback(err)  
      // 2.1 persist the transaction metadata to the transaction log
      case Right((coordinatorEpoch, newMetadata)) =>  
        txnManager.appendTransactionToLog(transactionalId, coordinatorEpoch, newMetadata,  
          responseCallback, requestLocal = requestLocal)  
    }  
  }  
}

4.6. handleTxnCommitOffsets

A TxnOffsetCommitRequest is handled by the Group Coordinator:

def handleTxnCommitOffsets(groupId: String,  
                           transactionalId: String,  
                           producerId: Long,  
                           producerEpoch: Short,  
                           memberId: String,  
                           groupInstanceId: Option[String],  
                           generationId: Int,  
                           offsetMetadata: immutable.Map[TopicIdPartition, OffsetAndMetadata],  
                           responseCallback: immutable.Map[TopicIdPartition, Errors] => Unit,  
                           requestLocal: RequestLocal = RequestLocal.NoCaching,  
                           apiVersion: Short): Unit = {  
  validateGroupStatus(groupId, ApiKeys.TXN_OFFSET_COMMIT) match {  
  
    // 1.1 validate the consumer group's status  
    case Some(error) => responseCallback(offsetMetadata.map { case (k, _) => k -> error })  
    case None =>  
      // 1.2 fetch (or create) the consumer group metadata  
      val group = groupManager.getGroup(groupId).getOrElse {  
        groupManager.addGroup(new GroupMetadata(groupId, Empty, time))  
      }  
  
      // 1.3 determine the group's offsets-topic partition  
      val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))  
  
      def postVerificationCallback(  
        newRequestLocal: RequestLocal,  
        errorAndGuard: (Errors, VerificationGuard)  
      ): Unit = {  
        val (error, verificationGuard) = errorAndGuard  
        if (error != Errors.NONE) {  
          val finalError = GroupMetadataManager.maybeConvertOffsetCommitError(error)  
          responseCallback(offsetMetadata.map { case (k, _) => k -> finalError })  
        } else {  
  
          // 2.1 store the transactional offset commits  
          doTxnCommitOffsets(group, memberId, groupInstanceId, generationId, producerId, producerEpoch,  
            offsetTopicPartition, offsetMetadata, newRequestLocal, responseCallback, Some(verificationGuard))  
        }  
      }  
  
      val transactionSupportedOperation = if (apiVersion >= 4) genericError else defaultError  
  
      // 3.1 verify that the partition supports this transactional write  
      groupManager.replicaManager.maybeStartTransactionVerificationForPartition(  
        topicPartition = offsetTopicPartition,  
        transactionalId,  
        producerId,  
        producerEpoch,  
        RecordBatch.NO_SEQUENCE,  
        // Wrap the callback to be handled on an arbitrary request handler thread
        // when transaction verification is complete. The request local passed in
        // is only used when the callback is executed immediately.
        KafkaRequestHandler.wrapAsyncCallback(  
          postVerificationCallback,  
          requestLocal  
        ),  
        transactionSupportedOperation  
      )  
  }  
}

When the offset records are actually written, the code checks whether this is a transactional commit:

if (isTxnOffsetCommit) {  
  addProducerGroup(producerId, group.groupId)  
  group.prepareTxnOffsetCommit(producerId, filteredOffsetMetadata)  
} else {  
  group.prepareOffsetCommit(filteredOffsetMetadata)  
}

Transactionally committed offsets are parked in a map until the transaction is finally committed or aborted:

private val pendingTransactionalOffsetCommits = new mutable.HashMap[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]() 

def prepareTxnOffsetCommit(producerId: Long, offsets: Map[TopicIdPartition, OffsetAndMetadata]): Unit = {  
  trace(s"TxnOffsetCommit for producer $producerId and group $groupId with offsets $offsets is pending")  
  receivedTransactionalOffsetCommits = true  
  val producerOffsets = pendingTransactionalOffsetCommits.getOrElseUpdate(producerId,  
    mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset])  
  
  offsets.forKeyValue { (topicIdPartition, offsetAndMetadata) =>  
    producerOffsets.put(topicIdPartition.topicPartition, CommitRecordMetadataAndOffset(None, offsetAndMetadata))  
  }  
}

4.7. handleEndTransaction

An EndTxnRequest is handled by handleEndTransaction in the following steps:

  1. Update the transaction state in the transaction metadata.
  2. Send WriteTxnMarkersRequests to the leaders of the involved TopicPartitions.
  3. Write the transaction result to the transaction log.

def handleEndTransaction(transactionalId: String,  
                         producerId: Long,  
                         producerEpoch: Short,  
                         txnMarkerResult: TransactionResult,  
                         responseCallback: EndTxnCallback,  
                         requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {  
  endTransaction(transactionalId,  
    producerId,  
    producerEpoch,  
    txnMarkerResult,  
    isFromClient = true,  
    responseCallback,  
    requestLocal)  
}

4.8. scheduleHandleTxnCompletion

A WriteTxnMarkersRequest is handled by the broker hosting the leader partition; it asynchronously updates the committed offsets in the group metadata (kafka.coordinator.group.GroupMetadata#offsets).

def scheduleHandleTxnCompletion(producerId: Long, completedPartitions: Set[Int], isCommit: Boolean): CompletableFuture[Void] = {  
  val future = new CompletableFuture[Void]()  
  scheduler.scheduleOnce(s"handleTxnCompletion-$producerId", () => {  
    try {  
      handleTxnCompletion(producerId, completedPartitions, isCommit)  
      future.complete(null)  
    } catch {  
      case e: Throwable => future.completeExceptionally(e)  
    }  
  })  
  future  
}

private[group] def handleTxnCompletion(producerId: Long, completedPartitions: Set[Int], isCommit: Boolean): Unit = {  
  val pendingGroups = groupsBelongingToPartitions(producerId, completedPartitions)  
  pendingGroups.foreach { groupId =>  
    getGroup(groupId) match {  
      case Some(group) => group.inLock {  
        if (!group.is(Dead)) {  
          group.completePendingTxnOffsetCommit(producerId, isCommit)  
          removeProducerGroup(producerId, groupId)  
        }  
      }  
      case _ =>  
        info(s"Group $groupId has moved away from $brokerId after transaction marker was written but before the " +  
          s"cache was updated. The cache on the new group owner will be updated instead.")  
    }  
  }  
}
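completePendingTxnOffsetCommit (not shown here) then either materializes the producer's pending offsets into the group's committed offsets on commit, or discards them on abort; only once the markers are written do the offsets become visible to offset fetches.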

5. References

深入 Kafka Core 的设计(事务篇) – K’s Blog

Kafka Transactional Support: How It Enables Exactly-Once Semantics

Transactions in Apache Kafka | Confluent

Kafka Exactly-Once 之事务性实现 | Matt’s Blog

Kafka 事务实现原理 | 学习笔记

KIP-98 - Exactly Once Delivery and Transactional Messaging - Apache Kafka - Apache Software Foundation