尧图网站建设 尧图网络
  • 首页
  • 关于我们
  • 服务项目
  • 案例展示
  • 建站流程
  • 资讯中心
  • 联系我们
首页/资讯中心/详情

kafka源码-@KafkaListener消费端的poll调用逻辑

kafka源码-@KafkaListener消费端的poll调用逻辑
📅 发布时间:2026/6/19 5:03:02

单独展开项目中 @KafkaListener 消费端从启动注册到 poll 拉取再到 listen() 被调用的完整源码调用链。版本对齐:Spring Boot 2.7.18 → spring-kafka 2.8.11 → kafka-clients 3.1.x。

一、与项目的对应关系

消费代码:

// 监听test_topic主题 @KafkaListener(topics = "test_topic") public void listen(String msg) { System.out.println("SpringBoot消费者收到消息:" + msg); }

关键配置(application.yml):

  • group-id: test-boot-group
  • enable-auto-commit: true
  • auto-offset-reset: earliest
  • StringDeserializer二、完整调用链

二、完整调用链

三、阶段 1:启动时注册监听器(还没到 poll)

3.1 启用@KafkaListener处理

KafkaAutoConfiguration通过内部类@EnableKafka激活注解驱动:

// KafkaAnnotationDrivenConfiguration.EnableKafkaConfiguration @EnableKafka // 导入 KafkaListenerAnnotationBeanPostProcessor 等 Bean

同时创建默认工厂:

@Bean ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(...) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); configurer.configure(factory, kafkaConsumerFactory); // 注入 ConsumerFactory、pollTimeout 等 return factory; }

3.2 扫描KafkaMsgConsumer.listen

KafkaListenerAnnotationBeanPostProcessor在 Bean 初始化后扫描@KafkaListener:

// processKafkaListener MethodKafkaListenerEndpoint endpoint = new MethodKafkaListenerEndpoint<>(); endpoint.setMethod(method); // listen(String msg) endpoint.setBean(bean); // KafkaMsgConsumer 实例 endpoint.setTopics(["test_topic"]); endpoint.setGroupId(...); // 未指定则用默认 KafkaListenerContainerFactory factory = resolveContainerFactory(...); // kafkaListenerContainerFactory this.registrar.registerEndpoint(endpoint, factory);

3.3 注册并启动容器

所有单例 Bean 就绪后:

// KafkaListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated() this.registrar.afterPropertiesSet(); // → registerAllEndpoints() // KafkaListenerEndpointRegistrar.registerAllEndpoints() this.endpointRegistry.registerListenerContainer(descriptor.endpoint, factory, true); // autoStartup=true → 立刻 start()

四、阶段 2:创建容器与消费者(poll 前的准备)

4.1 工厂创建ConcurrentMessageListenerContainer

// ConcurrentKafkaListenerContainerFactory.createContainerInstance() ContainerProperties properties = new ContainerProperties("test_topic"); return new ConcurrentMessageListenerContainer<>(consumerFactory, properties);

默认concurrency = 1,所以只创建 1 个子容器KafkaMessageListenerContainer。

4.2 绑定 MessageListener 适配器

// MethodKafkaListenerEndpoint.createMessageListener() RecordMessagingMessageListenerAdapter adapter = new RecordMessagingMessageListenerAdapter(bean, method, errorHandler); adapter.setHandlerMethod(new HandlerAdapter(invocableHandlerMethod)); // invocableHandlerMethod = 对 KafkaMsgConsumer.listen(String) 的反射封装

listen(String msg)参数只有 payload,监听器类型为ListenerType.SIMPLE。

4.3 启动容器,提交消费线程

// KafkaMessageListenerContainer.doStart() this.listenerConsumer = new ListenerConsumer(listener, listenerType); consumerExecutor.submitListenable(this.listenerConsumer); // 异步线程,非 HTTP 线程

线程名类似:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1。

4.4ListenerConsumer构造函数:创建KafkaConsumer并订阅

// ListenerConsumer 构造 this.autoCommit = determineAutoCommit(consumerProperties); // true(你的配置) this.consumer = consumerFactory.createConsumer( this.consumerGroupId, // "test-boot-group" clientId, clientIdSuffix, consumerProperties); subscribeOrAssignTopics(this.consumer); // → consumer.subscribe(["test_topic"], rebalanceListener)

底层:

// DefaultKafkaConsumerFactory.createRawConsumer() return new KafkaConsumer<>(configProps, keyDeserializerSupplier.get(), // StringDeserializer valueDeserializerSupplier.get()); // StringDeserializer

此时消费者加入test-boot-group,触发 ConsumerCoordinator 做 rebalance,分配test_topic的分区。


五、阶段 3:消费主循环 —run()→pollAndInvoke()

5.1 无限循环

// ListenerConsumer.run() while (isRunning()) { try { pollAndInvoke(); // 每一轮 = poll + 处理 } catch (Exception e) { handleConsumerException(e); } }

5.2 单轮 poll 流程

// ListenerConsumer.pollAndInvoke() idleBetweenPollIfNecessary(); // 空闲时可能 sleep pauseConsumerIfNecessary(); // 容器 pause 时跳过 this.lastPoll = System.currentTimeMillis(); ConsumerRecords<K, V> records = doPoll(); // 核心 invokeIfHaveRecords(records); // 有记录则调用监听器 // doPoll() → pollConsumer() private ConsumerRecords<K, V> pollConsumer() { beforePoll(); try { return this.consumer.poll(this.pollTimeout); // 默认 5000ms } catch (WakeupException ex) { return ConsumerRecords.empty(); // 停止容器时唤醒 } }

5.3 有消息时进入监听调用

// invokeIfHaveRecords() if (records != null && records.count() > 0) { invokeListener(records); // 非 batch → invokeRecordListener } // invokeRecordListener() → doInvokeWithRecords() Iterator<ConsumerRecord<K, V>> iterator = records.iterator(); while (iterator.hasNext()) { ConsumerRecord<K, V> record = checkEarlyIntercept(iterator.next()); doInvokeRecordListener(record, iterator); }

六、阶段 4:从ConsumerRecord到你的listen(String msg)

6.1 单条记录处理

// doInvokeRecordListener() invokeOnMessage(record); // invokeOnMessage() doInvokeOnMessage(record); if (!isManualImmediateAck) { ackCurrent(record); // autoCommit=true 时此处基本不手动提交 }

6.2 适配器分发(ListenerType.SIMPLE)

// doInvokeOnMessage() switch (this.listenerType) { case SIMPLE: this.listener.onMessage(record); // RecordMessagingMessageListenerAdapter break; ... }

6.3RecordMessagingMessageListenerAdapter→ 反射调用

// RecordMessagingMessageListenerAdapter.onMessage(record, ack, consumer) Object result = invokeHandler(record, acknowledgment, message, consumer); // MessagingMessageListenerAdapter.invokeHandler() return this.handlerMethod.invoke(message, data, acknowledgment, consumer); // data = ConsumerRecord,InvocableHandlerMethod 从 record.value() 提取 String // HandlerAdapter.invoke() return invocableHandlerMethod.invoke(message, providedArgs); // 最终调用 KafkaMsgConsumer.listen("hello")

项目的listen(String msg)收到的msg,就是ConsumerRecord.value()经StringDeserializer反序列化后的结果。

七、阶段 5:KafkaConsumer.poll()内部(kafka-clients)

Spring 调用的是:

// KafkaConsumer.poll(Duration timeout) private ConsumerRecords<K, V> poll(Timer timer, boolean includeMetadataInTimeout) { do { // ① 协调器:心跳、rebalance、更新分区分配 updateAssignmentMetadataIfNeeded(timer, false); // ② 拉取数据 Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer); if (!records.isEmpty()) { // ③ 预取下一批(pipeline) if (fetcher.sendFetches() > 0) { client.transmitSends(); } // ④ 拦截器 + 返回 return interceptors.onConsume(new ConsumerRecords<>(records)); } } while (timer.notExpired()); return ConsumerRecords.empty(); // 超时无数据 }

pollForFetches内部:

// 本地缓冲区有数据 → 直接返回(已反序列化) Map<...> records = fetcher.fetchedRecords(); if (!records.isEmpty()) return records; // 否则发 FetchRequest 到 Broker,NetworkClient.poll() 等待响应 fetcher.sendFetches(); client.poll(pollTimeout, ...); return fetcher.fetchedRecords();

数据流:

Broker 磁盘

→ FetchResponse(字节)

→ Fetcher 解析

→ StringDeserializer.deserialize() // value 变 "hello"

→ ConsumerRecord("test_topic", partition, offset, "hello")

→ ConsumerRecords

→ Spring ListenerConsumer

→ KafkaMsgConsumer.listen("hello")


八、offset 提交(你的enable-auto-commit: true)

项目的配置下 不由 Spring 手动 commit,而是由KafkaConsumer内部协调器按auto.commit.interval.ms(默认 5s)自动提交。

// ackCurrent() 在 autoCommit=true 时 else if (... && !this.autoCommit) { this.acks.add(record); // 不满足,跳过 } // → 不进入 Spring 手动提交逻辑

含义:

  • listen()正常返回后,offset 不会立刻提交
  • 通常在接下来几秒内由消费者客户端后台提交
  • 若listen()抛异常,默认错误处理下该条消息可能被重复消费(取决于提交时机)

九、精简版调用栈(对照源码用)

[启动]

SpringApplication.run()

└─ @EnableKafka → KafkaListenerAnnotationBeanPostProcessor

└─ processKafkaListener(KafkaMsgConsumer.listen)

└─ registrar.registerEndpoint(MethodKafkaListenerEndpoint)

└─ KafkaListenerEndpointRegistry.registerListenerContainer(..., autoStartup=true)

[容器启动]

ConcurrentKafkaListenerContainerFactory.createListenerContainer()

└─ ConcurrentMessageListenerContainer.doStart()

└─ KafkaMessageListenerContainer.doStart()

└─ SimpleAsyncTaskExecutor.submit(ListenerConsumer)

[消费线程初始化]

ListenerConsumer.<init>()

├─ DefaultKafkaConsumerFactory.createConsumer("test-boot-group", ...)

│ └─ new KafkaConsumer<>(configs, StringDeserializer, StringDeserializer)

└─ consumer.subscribe(["test_topic"], rebalanceListener)

[消费循环 - 独立线程]

ListenerConsumer.run()

└─ while (isRunning) pollAndInvoke()

├─ consumer.poll(Duration.ofMillis(5000)) // KafkaConsumer

│ ├─ ConsumerCoordinator.poll() // 心跳/rebalance

│ ├─ Fetcher.sendFetches() → NetworkClient // 向 Broker 发 Fetch

│ └─ Fetcher.fetchedRecords() // 反序列化 → ConsumerRecords

└─ invokeListener(records)

└─ doInvokeWithRecords()

└─ doInvokeRecordListener(record)

└─ invokeOnMessage(record)

└─ listener.onMessage(record) // RecordMessagingMessageListenerAdapter

└─ HandlerAdapter.invoke()

└─ InvocableHandlerMethod.invoke()

└─ KafkaMsgConsumer.listen("hello") // 你的业务方法


十、与生产侧的对比(同一进程)

维度生产侧 (KafkaTemplate.send)消费侧 (@KafkaListener)

触发线程

Tomcat HTTP 线程

ListenerConsumer专用线程

是否阻塞等待

send异步立即返回

poll最多阻塞pollTimeout(默认 5s)

核心 API

KafkaProducer.send

KafkaConsumer.poll

你的业务入口

KafkaTestController.send

KafkaMsgConsumer.listen

与 Broker 交互

ProducerRequest

FetchRequest + 心跳

Producer 把消息写入 Broker 后,消费线程在下一轮(或当前轮)poll中拉到,listen()才会执行——两者完全异步、不同线程。


十一、几个容易忽略的细节

1. 首次poll可能为空

Rebalance 完成前,poll可能返回空集合;auto-offset-reset: earliest决定从最早还是最新 offset 开始读。

2. 同一分区内顺序消费

concurrency=1时,同一 partition 的消息在doInvokeWithRecords的 while 循环中串行调用listen()。

3.poll超时不是错误

5s 内无新消息 → 返回ConsumerRecords.empty()→ 进入 idle 检测 → 继续下一轮 poll。

4. 停止应用时

doStop()→listenerConsumer.wakeup()→poll抛WakeupException→ 返回空记录 → 循环退出。

5. 与KafkaTemplate.send的衔接点

唯一的交汇点是 Kafka Broker 上的test_topic;Spring 生产链路和消费链路在代码层没有直接调用关系。


源码对照:

  • KafkaMessageListenerContainer(ListenerConsumer.run/pollAndInvoke)
  • RecordMessagingMessageListenerAdapter
  • KafkaConsumer.poll

相关新闻

  • 3分钟学会:Windows上最轻量的安卓APK安装工具完全指南
  • OA与CMS系统漏洞挖掘:从权限边界突破到实战提权
  • TC820双斜积分ADC:从原理到3位半数字电压表设计实战

最新新闻

  • GPT-4.1三模型架构解析:Turbo/Reasoning/LongContext工程落地指南
  • 四步让老旧Mac焕发新生:OpenCore Legacy Patcher终极指南
  • 卖床品的店价格透明,2026十大品牌口碑推荐照着选 - 工业品牌热点
  • LLM前摄干扰缺陷:为什么大模型无法准确追踪最新数据
  • Narou.rb:日本网络小说下载与管理的终极解决方案
  • 2026专业奢侈品回收综合实力榜 透明报价与口碑双优 - 工业品牌热点

日新闻

  • 5分钟掌握Python进化算法:Geatpy高性能优化工具完全指南
  • Microchip 24AA044 EEPROM选型与应用全指南:从参数解析到实战编程
  • 华为的鸿蒙到底有多牛?为什么称作遥遥领先?

周新闻

  • 3步解锁iOS设备:applera1n激活锁绕过完全指南
  • 39 2026 人工智能证书终极盘点,普通人选 AI 证书可以从这些方向入手
  • Redis 暴露公网有多危险?从端口检查到补救步骤

月新闻

  • 【总结】入门篇:50句话让你记住架构核心概念
  • WeChatMsg技术方案解析:实现Mac微信数据自主管理的完整解决方案
  • WeChatMsg:革新性微信数据备份方案,打造你的专属数字记忆库

关于尧图

  • 公司简介
  • 团队介绍
  • 企业文化
  • 荣誉资质

服务项目

  • 定制开发
  • 电商建站
  • UI 设计
  • 运维服务

快速链接

  • 案例展示
  • 建站流程
  • 常见问题
  • 资讯中心

联系方式

  • 📍北京市朝阳区互联网产业园 A 座 10 层
  • 📞400-888-8888
  • ✉️contact@rkmt.cn
  • 🕐周一至周日 9:00-21:00

© 2024 北京尧图网络科技有限公司 版权所有 | 京 ICP 备 XXXXXXXX 号