Consumer

Consumer作为Kafka Clients中的消费者,继承关系如下图所示:

AsyncKafkaConsumer.png

KafkaConsumer作为Facade类,提供API供clients使用,而ConsumerDelegate作为实现类接口,提供了两种实现方式,通过配置group.protocol进行控制,其中ClassicKafkaConsumer所有的线程都会处理网络IO请求,AsyncKafkaConsumer则是基于Reactor模式,使用单独线程处理网络IO,以事件驱动模式处理任务,具体细节见 Consumer threading refactor design

本次分析也以AsyncKafkaConsumer为例。

KafkaConsumer内置的成员变量如下:

//用于创建delegate的工厂类  
private static final ConsumerDelegateCreator CREATOR = new ConsumerDelegateCreator();  
  
//consumer具体的实现类  
private final ConsumerDelegate<K, V> delegate;

初始化方法:

KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {  
    delegate = CREATOR.create(config, keyDeserializer, valueDeserializer);  
}

public <K, V> ConsumerDelegate<K, V> create(ConsumerConfig config,  
                                            Deserializer<K> keyDeserializer,  
                                            Deserializer<V> valueDeserializer) {  
    try {  
        //根据配置选取对应的实现类
        GroupProtocol groupProtocol = GroupProtocol.valueOf(config.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG).toUpperCase(Locale.ROOT));  
  
        if (groupProtocol == GroupProtocol.CONSUMER)  
            return new AsyncKafkaConsumer<>(config, keyDeserializer, valueDeserializer);  
        else  
            return new ClassicKafkaConsumer<>(config, keyDeserializer, valueDeserializer);  
    } catch (KafkaException e) {  
        throw e;  
    } catch (Throwable t) {  
        throw new KafkaException("Failed to construct Kafka consumer", t);  
    }  
}

事件处理逻辑

AsyncKafkaConsumer的核心使用事件驱动模式来处理各类事件,具体事件类型见org.apache.kafka.clients.consumer.internals.events.ApplicationEvent

AsyncKafkaConsumer线程模型

ApplicationEventHandler

ApplicationEventHandler用于接收来自consumer端的各类事件,属性和构造方法如下:


// 用于接收application event的BlockingQueue
private final BlockingQueue<ApplicationEvent> applicationEventQueue;  

//网络IO线程
private final ConsumerNetworkThread networkThread;


public ApplicationEventHandler(final LogContext logContext,  
                               final Time time,  
                               final BlockingQueue<ApplicationEvent> applicationEventQueue,  
                               final CompletableEventReaper applicationEventReaper,  
                               final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier,  
                               final Supplier<NetworkClientDelegate> networkClientDelegateSupplier,  
                               final Supplier<RequestManagers> requestManagersSupplier) {  
    this.log = logContext.logger(ApplicationEventHandler.class);  
    this.applicationEventQueue = applicationEventQueue;  
    this.networkThread = new ConsumerNetworkThread(logContext,  
            time,  
            applicationEventQueue,  
            applicationEventReaper,  
            applicationEventProcessorSupplier,  
            networkClientDelegateSupplier,  
            requestManagersSupplier);  
    this.networkThread.start();  
}

核心方法add()用于向event queue追加事件,并唤醒网络IO线程。

public void add(final ApplicationEvent event) {  
    Objects.requireNonNull(event, "ApplicationEvent provided to add must be non-null");  
    applicationEventQueue.add(event);
    wakeupNetworkThread();  
}

ApplicationEventProcessor

ApplicationEventProcessor是实际处理各类application event的执行器:

@Override  
public void process(ApplicationEvent event) {  
    switch (event.type()) {  
        case COMMIT_ASYNC:  
            process((AsyncCommitEvent) event);  
            return;
	    ……
    }
}            

以AsyncCommitEvent为例,通过requestManagers容器获取到对应职责的RequestManager,并将request追加到pendingRequests,由ConsumerNetworkThread调用poll()获取unsent请求,并通过实际与broker交互的KafkaClient发送request并处理。

private void process(final AsyncCommitEvent event) {  
    if (!requestManagers.commitRequestManager.isPresent()) {  
        return;  
    }  
  
    CommitRequestManager manager = requestManagers.commitRequestManager.get();  
    CompletableFuture<Void> future = manager.commitAsync(event.offsets());  
    future.whenComplete(complete(event.future()));  
}

ConsumerNetworkThread

ConsumerNetworkThread是用于后台处理event的线程,并负责处理broker的网络IO。

线程的run()方法通过while循环循环调用runOnce()。

public void run() {  
    try {  
        log.debug("Consumer network thread started");  
  
        // Wait until we're securely in the background network thread to initialize these objects...  
        initializeResources();  
  
        while (running) {  
            try {  
                runOnce();  
            } catch (final Throwable e) {  
                // Swallow the exception and continue  
                log.error("Unexpected error caught in consumer network thread", e);  
            }  
        }  
    } finally {  
        cleanup();  
    }  
}

runOnce()方法主要处理以下几个任务:

  1. 提取event并使用ApplicationEventProcessor处理application event
  2. 遍历RequestManager并调用poll()方法
  3. 调用NetworkClientDelegate. addAll(List)将request添加到unsentRequests队列中
  4. 调用KafkaClient. poll(long, long)向broker发送请求
void runOnce() {  
  
    //1.1 通过ApplicationEventProcessor处理各类event  
    processApplicationEvents();  
  
    final long currentTimeMs = time.milliseconds();  
  
    final long pollWaitTimeMs = requestManagers.entries().stream()  
            .filter(Optional::isPresent)  
            .map(Optional::get)  
            //1.2 循环调用RequestManager.poll(long)获取unsentRequests  
            .map(rm -> rm.poll(currentTimeMs))  
            //1.3 调用addAll,将unsentRequests添加到NetworkClientDelegate中  
            .map(networkClientDelegate::addAll)  
            .reduce(MAX_POLL_TIMEOUT_MS, Math::min);  
  
    //1.4 调用poll,发送请求,接收响应  
    networkClientDelegate.poll(pollWaitTimeMs, currentTimeMs);  
  
    cachedMaximumTimeToWait = requestManagers.entries().stream()  
            .filter(Optional::isPresent)  
            .map(Optional::get)  
            .map(rm -> rm.maximumTimeToWait(currentTimeMs))  
            .reduce(Long.MAX_VALUE, Math::min);  
  
    //1.5 清理过期event  
    reapExpiredApplicationEvents(currentTimeMs);  
}

processApplicationEvents()用于通过applicationEventProcessor来处理event。

/**  
 * Process the events—if any—that were produced by the application thread. */private void processApplicationEvents() {  
    //1.1 获取queue中所有 event    LinkedList<ApplicationEvent> events = new LinkedList<>();  
    applicationEventQueue.drainTo(events);  
  
    //1.2 循环遍历,通过applicationEventProcessor处理event  
    for (ApplicationEvent event : events) {  
        try {  
            if (event instanceof CompletableEvent)  
                applicationEventReaper.add((CompletableEvent<?>) event);  
  
            applicationEventProcessor.process(event);  
        } catch (Throwable t) {  
            log.warn("Error processing event {}", t.getMessage(), t);  
        }  
    }  
}

BackgroundEventProcessor

BackgroundEventProcessor作为AsyncKafkaConsumer成员变量,用于处理network thread产生的background events,从其process()方法中可以看出,该processor主要处理以下事件:

  1. network thread产生的error event
  2. 在application thread执行rebalance回调逻辑
public void process(final BackgroundEvent event) {  
    switch (event.type()) {  
        case ERROR:  
            process((ErrorEvent) event);  
            break;  
  
        case CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED:  
            process((ConsumerRebalanceListenerCallbackNeededEvent) event);  
            break;  
  
        default:  
            throw new IllegalArgumentException("Background event type " + event.type() + " was not expected");  
  
    }  
}

订阅主题

subscribe方法来订阅主题,若多次调用,以最后一次作为消费的主题。

public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {  
    if (listener == null)  
        throw new IllegalArgumentException("RebalanceListener cannot be null");  
  
    subscribeInternal(topics, Optional.of(listener));  
}

subscribeInternal()方法用于处理实际的subscribe逻辑。

private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebalanceListener> listener) {  
    //1.1 获取lock,并且判断是否已经close  
    acquireAndEnsureOpen();  
    try {  
        //1.2 判断group id是否有效  
        maybeThrowInvalidGroupIdException();  
  
        //1.3 校验参数  
        if (topics == null)  
            throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");  
  
        //1.4 若为空,则unsubscribe  
        if (topics.isEmpty()) {  
            // treat subscribing to empty topic list as the same as unsubscribing  
            unsubscribe();  

		} else {  
            for (String topic : topics) {  
                if (isBlank(topic))  
                    throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");  
            }  
  
            // 1.5 更新buffer中不再指定的partition  
            final Set<TopicPartition> currentTopicPartitions = new HashSet<>();  
  
            for (TopicPartition tp : subscriptions.assignedPartitions()) {  
                if (topics.contains(tp.topic()))  
                    currentTopicPartitions.add(tp);  
            }  
  
            fetchBuffer.retainAll(currentTopicPartitions);  
            log.info("Subscribed to topic(s): {}", String.join(", ", topics));  
  
            // 1.6 调用SubscriptionState.subscribe 更新订阅topic  
            if (subscriptions.subscribe(new HashSet<>(topics), listener))  
                //若请求成功,更新metadata  
                this.metadataVersionSnapshot = metadata.requestUpdateForNewTopics();  
  
            // 1.7 向handler添加event  
            applicationEventHandler.add(new SubscriptionChangeEvent());  
        }  
    } finally {  
        //1.8 释放lock  
        release();  
    }  
}

加锁方式采用乐观锁,校验内置threadId是否一致:

private void acquire() {  
    final Thread thread = Thread.currentThread();  
    final long threadId = thread.getId();  
    if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))  
        throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access. " +  
            "currentThread(name: " + thread.getName() + ", id: " + threadId + ")" +  
            " otherThread(id: " + currentThread.get() + ")"  
        );  
    refCount.incrementAndGet();  
}

取消订阅

unsubscribe()方法用于取消topic订阅。

public void unsubscribe() {  
    //1.1 获取锁,并确保当前消费者没有关闭  
    acquireAndEnsureOpen();  
    try {  
        //1.2 删除buffer中所有订阅的topic分区  
        fetchBuffer.retainAll(Collections.emptySet());  
        Timer timer = time.timer(Long.MAX_VALUE);  
  
        //1.3 向handler发送unsubscribeEvent  
        UnsubscribeEvent unsubscribeEvent = new UnsubscribeEvent(calculateDeadlineMs(timer));  
        applicationEventHandler.add(unsubscribeEvent);  
        log.info("Unsubscribing all topics or patterns and assigned partitions {}",  
                subscriptions.assignedPartitions());  
  
        //1.4 循环处理background event  
        try {  
            processBackgroundEvents(unsubscribeEvent.future(), timer);  
            log.info("Unsubscribed all topics or patterns and assigned partitions");  
        } catch (TimeoutException e) {  
            log.error("Failed while waiting for the unsubscribe event to complete");  
        }  
        //1.5 重置group的metadata  
        resetGroupMetadata();  
    } catch (Exception e) {  
        log.error("Unsubscribe failed", e);  
        throw e;  
    } finally {  
        //1.6 释放lock  
        release();  
    }  
}

拉取消息

poll()方法传递timeout,在指定timeout内,从broker消费数据。

public ConsumerRecords<K, V> poll(final Duration timeout) {  
    Timer timer = time.timer(timeout);  
  
    //1.1 获取lock并确保consumer未关闭  
    acquireAndEnsureOpen();  
    try {  
        //1.2 更新consumer监控指标  
        kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());  
  
        //1.3 确保已订阅topic  
        if (subscriptions.hasNoSubscriptionOrUserAssignment()) {  
            throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");  
        }  
  
        //1.4 定时处理poll逻辑  
        do {  
            // 1.5 向handler发送PollEvent  
            applicationEventHandler.add(new PollEvent(timer.currentTimeMs()));  
  
            // We must not allow wake-ups between polling for fetches and returning the records.  
            // If the polled fetches are not empty the consumed position has already been updated in the polling            // of the fetches. A wakeup between returned fetches and returning records would lead to never            // returning the records in the fetches. Thus, we trigger a possible wake-up before we poll fetches.            wakeupTrigger.maybeTriggerWakeup();  
  
            //1.6 更新metadata,并唤醒network thread处理poll任务,获取数据  
            updateAssignmentMetadataIfNeeded(timer);  
            final Fetch<K, V> fetch = pollForFetches(timer);  
            if (!fetch.isEmpty()) {  
                if (fetch.records().isEmpty()) {  
                    log.trace("Returning empty records from `poll()` "  
                        + "since the consumer's position has advanced for at least one topic partition");  
                }  
                //1.6 通过interceptors处理前置消费逻辑,并返回ConsumerRecords  
                return interceptors.onConsume(new ConsumerRecords<>(fetch.records()));  
            }  
            // We will wait for retryBackoffMs  
        } while (timer.notExpired());  
  
        return ConsumerRecords.empty();  
    } finally {  
        //1.7 更新consumer监控指标,释放lock  
        kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());  
        release();  
    }  
}

collectFetch()方法从buffer中获取消息,其中FetchBuffer用于存储来自broker响应的消息结果CompletedFetch,每个CompletedFetch代表来自一个partition的响应结果。

public Fetch<K, V> collectFetch(final FetchBuffer fetchBuffer) {  
    final Fetch<K, V> fetch = Fetch.empty();  
    final Queue<CompletedFetch> pausedCompletedFetches = new ArrayDeque<>();  
    int recordsRemaining = fetchConfig.maxPollRecords;  
  
    try {  
        while (recordsRemaining > 0) {  
            //1.1 从fetchBuffer获取CompletedFetch  
            final CompletedFetch nextInLineFetch = fetchBuffer.nextInLineFetch();  
  
            //1.2 若nextInLineFetch()返回null或已被消费,从queue中获取  
            if (nextInLineFetch == null || nextInLineFetch.isConsumed()) {  
                final CompletedFetch completedFetch = fetchBuffer.peek();  
  
                //1.3 此时为空,说明broker暂无消息响应  
                if (completedFetch == null)  
                    break;  
  
  
                //1.4 初始化CompletedFetch  
                if (!completedFetch.isInitialized()) {  
                    try {  
                        fetchBuffer.setNextInLineFetch(initialize(completedFetch));  
                    } catch (Exception e) {  
                        // Remove a completedFetch upon a parse with exception if (1) it contains no completedFetch, and  
                        // (2) there are no fetched completedFetch with actual content preceding this exception.                        // The first condition ensures that the completedFetches is not stuck with the same completedFetch                        // in cases such as the TopicAuthorizationException, and the second condition ensures that no                        // potential data loss due to an exception in a following record.                        
                        if (fetch.isEmpty() && FetchResponse.recordsOrFail(completedFetch.partitionData).sizeInBytes() == 0)  
                            fetchBuffer.poll();  
  
                        throw e;  
                    }  
                } else {  
                    fetchBuffer.setNextInLineFetch(completedFetch);  
                }  
  
                fetchBuffer.poll();  
                //1.5 检查当前topic partition是否被暂停消费  
            } else if (subscriptions.isPaused(nextInLineFetch.partition)) {  
                // when the partition is paused we add the records back to the completedFetches queue instead of draining  
                // them so that they can be returned on a subsequent poll if the partition is resumed at that time                
                log.debug("Skipping fetching records for assigned partition {} because it is paused", nextInLineFetch.partition);  
                pausedCompletedFetches.add(nextInLineFetch);  
                fetchBuffer.setNextInLineFetch(null);  
            } else {  
                //1.6 从CompletedFetch中获取Fetch  
                final Fetch<K, V> nextFetch = fetchRecords(nextInLineFetch, recordsRemaining);  
                recordsRemaining -= nextFetch.numRecords();  
                fetch.add(nextFetch);  
            }  
        }  
    } catch (KafkaException e) {  
        if (fetch.isEmpty())  
            throw e;  
    } finally {  
        // add any polled completed fetches for paused partitions back to the completed fetches queue to be  
        // re-evaluated in the next poll
        fetchBuffer.addAll(pausedCompletedFetches);  
    }  
  
    return fetch;  
}

提交位移

commit提供了同步、异步两种方式:

/**  
 * Commit offsets returned on the last {@link #poll(Duration) poll()} for all the subscribed list of topics and  
 * partitions. */@Override  
public void commitSync() {  
    commitSync(Duration.ofMillis(defaultApiTimeoutMs));  
}  
  
/**  
 * This method sends a commit event to the EventHandler and return. */@Override  
public void commitAsync() {  
    commitAsync(null);  
}

两者区别在于commitSync()会调用future.get()进行等待,而commitAsync()则调用whenComplete()执行异步回调。

@Override  
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {  
  
    //1.1 获取lock  
    acquireAndEnsureOpen();  
    long commitStart = time.nanoseconds();  
    try {  
        //1.2 向applicationEventHandler提交SyncCommitEvent  
        SyncCommitEvent syncCommitEvent = new SyncCommitEvent(offsets, calculateDeadlineMs(time, timeout));  
        CompletableFuture<Void> commitFuture = commit(syncCommitEvent);  
  
        //1.3 检查是否有lastPendingAsyncCommit任务需要执行  
        Timer requestTimer = time.timer(timeout.toMillis());  
        awaitPendingAsyncCommitsAndExecuteCommitCallbacks(requestTimer, true);  
          
        //1.4 将commitFuture配置到wakeupTrigger,调用future.get()等待完成  
        wakeupTrigger.setActiveTask(commitFuture);  
        ConsumerUtils.getResult(commitFuture, requestTimer);  
          
        //1.5 检查拦截器,执行前置commit方法  
        interceptors.onCommit(offsets);  
    } finally {  
        wakeupTrigger.clearTask();  
        kafkaConsumerMetrics.recordCommitSync(time.nanoseconds() - commitStart);  
        release();  
    }  
}

@Override  
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {  
    //1.1 获取lock  
    acquireAndEnsureOpen();  
    try {  
        //1.2 创建异步提交事件,并提交到EventHandler  
        AsyncCommitEvent asyncCommitEvent = new AsyncCommitEvent(offsets);  
        lastPendingAsyncCommit = commit(asyncCommitEvent).whenComplete((r, t) -> {  
  
            if (t == null) {  
                offsetCommitCallbackInvoker.enqueueInterceptorInvocation(offsets);  
            }  
  
            if (t instanceof FencedInstanceIdException) {  
                asyncCommitFenced.set(true);  
            }  
  
            if (callback == null) {  
                if (t != null) {  
                    log.error("Offset commit with offsets {} failed", offsets, t);  
                }  
                return;  
            }  
  
            offsetCommitCallbackInvoker.enqueueUserCallbackInvocation(callback, offsets, (Exception) t);  
        });  
    } finally {  
        release();  
    }  
}

核心用于构建CompletableFuture的commit方法如下:

private CompletableFuture<Void> commit(final CommitEvent commitEvent) {  
    //1.1 检查是否抛出异常  
    maybeThrowInvalidGroupIdException();  
    maybeThrowFencedInstanceException();  
  
    //1.2 执行offset commit回调  
    offsetCommitCallbackInvoker.executeCallbacks();  
  
    //1.3 更新partition metadata的leader Epoch  
    Map<TopicPartition, OffsetAndMetadata> offsets = commitEvent.offsets();  
    log.debug("Committing offsets: {}", offsets);  
    offsets.forEach(this::updateLastSeenEpochIfNewer);  
  
    if (offsets.isEmpty()) {  
        return CompletableFuture.completedFuture(null);  
    }  
  
    //1.4 添加event,并返回CompletableFuture  
    applicationEventHandler.add(commitEvent);  
    return commitEvent.future();  
}

rebalance流程

Kafka支持多consumer并行消费多个partition,因此当consumer数量或partition发生变化时,broker端会重新为当前消费Group分配所订阅的partition。

新版rebalance协议细节可见:# KIP-848: The Next Generation of the Consumer Rebalance Protocol