尧图网站建设 尧图网络
  • 首页
  • 关于我们
  • 服务项目
  • 案例展示
  • 建站流程
  • 资讯中心
  • 联系我们
首页/资讯中心/详情

TDengine TMQ 消费流程 — 从 Subscribe 到 Commit 的完整链路

TDengine TMQ 消费流程 — 从 Subscribe 到 Commit 的完整链路
📅 发布时间:2026/7/1 2:55:41

分类:6.数据订阅 TMQ |篇章:02 消费流程

适用版本:TDengine v3.x(v3.3.x / v3.4.x) | 最后更新:2026-06-30

理解 TMQ 消费的内部链路有助于诊断性能问题、设计可靠的消费应用。本文按 Subscribe → Assign → Poll → Process → Commit 五个阶段拆解。

核心概念速查表

概念说明
Subscribe订阅 Topic
Assign分区分配
Fetch从服务端拉取消息
Poll客户端循环调用拉取
Process业务处理
Commit提交位点
Heartbeat心跳保活

详细解析

1. Subscribe 阶段

Subscribe 流程: ① 客户端调用 subscribe(["topic1", "topic2"]) ② 客户端向 MNode 注册 Consumer: - 上报 group_id, consumer_id, 订阅的 Topic 列表 ③ MNode 加入 Consumer 到对应 Group ④ 触发 Rebalance: - 重新计算分区分配 - 通知所有组员 ⑤ Consumer 收到分配方案: - 知道自己负责哪些 VGroup ⑥ 准备从各 VGroup 的对应 Offset 开始消费

2. Assign 与 Rebalance

分区分配算法(Range / Round-Robin 等): 示例: Topic 在 6 个 VGroup 有数据 Group 有 3 个 Consumer 分配: Consumer-1: VG1, VG2 Consumer-2: VG3, VG4 Consumer-3: VG5, VG6 当 Consumer 加入/离开: ① MNode 检测到变化 ② 发起 Rebalance: - 收回所有分区 - 按新成员重新分配 - 通知所有 Consumer ③ Consumer 暂停 Poll ④ 提交未提交 Offset(可配置) ⑤ 接收新分区 ⑥ 恢复 Poll Rebalance 期间: - 消费暂停(短暂) - 可能重复消费(未 Commit 部分) - 应用应设计幂等

3. Fetch 阶段

Fetch 从服务端拉取数据: Client VNode │ │ │── Fetch (offset=N) ───→│ │ │── 从 WAL 读取 N 之后的数据 ─┐ │ │← ────────── 返回 ─────────┘ │ │── 应用 Topic SQL 过滤 ────┐ │ │← ────── 过滤后数据 ─────────┘ │ │ │← ── Messages ──────────│ │ │ Fetch 特性: - 长轮询(无数据时短暂等待) - 批量返回(多条消息打包) - 每个 VGroup 独立 Fetch - 多 VGroup 并发拉取

4. Poll 循环

应用层 Poll 循环: while running: msgs = consumer.poll(timeout=1.0) if msgs is None: continue // 超时无消息 for msg in msgs: process(msg) if should_commit: consumer.commit() Poll 内部: ① 从各 VGroup 缓冲队列取消息 ② 缓冲不足 → 异步 Fetch 补充 ③ 返回当前可用的消息批 ④ 超时返回 None

5. Topic SQL 过滤

Topic 中的 SQL 在哪里执行? CREATE TOPIC topic_high_current AS SELECT * FROM meters WHERE current > 100; 执行位置: ① 写入时:数据写入 WAL(未过滤) ② Fetch 时:VNode 读取 WAL → 应用 SQL 过滤 → 返回过滤后数据 优势: - 网络传输已过滤数据(节省带宽) - Consumer 端处理逻辑简化 - 多 Consumer 共享过滤计算

6. Commit 与位点持久化

Commit 流程: ① Consumer 调用 commit(offsets) ② 客户端组装 Commit 请求 ③ 发送到 MNode ④ MNode 持久化位点: - 写入元数据库 - 多副本同步 ⑤ 返回成功 Commit 类型: - Sync Commit:等待服务端确认 - Async Commit:发送后立即返回 - Auto Commit:定期自动 Commit 位点存储: - 按 (group_id, topic, vgroup_id) 维度存储 - 仅存储最新位点(不存历史) - Group 删除时位点清理

7. 心跳保活

Consumer 心跳: Consumer 定期向 MNode 发送 Heartbeat: - 报告存活 - 上报消费进度(可选) MNode 维护 Consumer 状态: - 收到 Heartbeat → 标记活跃 - 超过 session.timeout → 标记失联 - 失联触发 Rebalance 关键参数: heartbeat.interval.ms // 心跳间隔 session.timeout.ms // 失联阈值

8. 异常处理

常见异常及处理: ① 网络断开: - 客户端 SDK 自动重连 - 重连成功后继续 Poll ② Rebalance: - Poll 短暂阻塞 - 注册 RebalanceCallback 处理状态 ③ 消息处理失败: - 不 Commit → 下次 Poll 重发 - 业务必须幂等 ④ Consumer 崩溃: - 心跳超时 → 触发 Rebalance - 其他 Consumer 接管分区 - 从最近 Commit 位置继续 ⑤ 服务端 WAL 滚动后丢失早期数据: - Consumer Offset 落后过多 - 错误:offset out of range - 处理:重置为 earliest 或调大 WAL 保留期

代码示例

完整消费骨架(Python)

fromtaos.tmqimportConsumerdefprocess(msg):forblockinmsg:forrowinblock:handle_row(row)consumer=Consumer({"group.id":"worker_group","auto.offset.reset":"earliest","enable.auto.commit":"false","session.timeout.ms":"30000",})consumer.subscribe(["topic_meters"])try:whileTrue:msg=consumer.poll(timeout=1.0)ifmsgisNone:continuetry:process(msg)consumer.commit()exceptExceptionase:log.error(f"Process failed:{e}")# 不 commit → 下次重试finally:consumer.close()

Rebalance 监听

defon_assign(consumer,partitions):print(f"Assigned:{partitions}")defon_revoke(consumer,partitions):print(f"Revoked:{partitions}")consumer.commit()# 失去分区前提交consumer.subscribe(["topic_meters"],on_assign=on_assign,on_revoke=on_revoke)

性能考量

消费延迟分析

阶段典型延迟
写入到 WAL 可读< 10ms
Fetch 网络往返1~10ms
过滤计算取决于 SQL 复杂度
客户端处理业务决定
Commit 持久化5~20ms

高吞吐配置

高吞吐场景: - 大批量 Fetch(max.poll.records 调大) - 异步 Commit - 增加 Consumer 数(不超过 VGroup 数) - 处理逻辑异步化(线程池) 低延迟场景: - 短 timeout - 频繁 Poll - 同步 Commit 保证有序

FAQ

Q1: 一次 Poll 返回多少消息?

由服务端 batch 决定,通常几十~几千行。客户端可通过参数限制最大批量。

Q2: Commit 失败怎么办?

捕获异常重试。重复 Commit 是幂等的(多次 Commit 同一 Offset 无副作用)。

Q3: 同一消息能被多次 Commit 吗?

可以。Commit 只是更新位点最大值,不影响消息本身。

Q4: Consumer 长时间不 Commit 会怎样?

  • 占用大量 WAL 空间(无法清理)
  • Rebalance 后会重复消费很多数据
  • 建议至少定期 Commit

Q5: 删除 Consumer Group 怎么做?

DROPCONSUMERGROUPgroup_idONtopic_name;

参考

系统构架篇

  • 01-《TDengine 整体架构全景》
  • 02-《集群拓扑深度解析》
  • 03-《MNode 内部机制深度解析》
  • 04-《RPC 通信层深度解析》
  • 05-《VNode 生命周期》
  • 06-《RAFT 共识协议》
  • 07-《端到端的消息流》

数据模型

  • 01-《数据库创建与参数详解》
  • 02-《超级表/子表/普通表》
  • 03-《支持数据类型深度解析》
  • 04-《TDengine Tag 设计哲学与 Schema 变更机制》
  • 05-《TDengine 虚拟表实现原理》

存储引擎

  • 01-《TDengine 存储引擎概览》
  • 02-《TDengine MemTable 深度解析》
  • 03-《TDengine WAL 预写日志机制》
  • 04-《TDengine 数据文件格式》
  • 05-《TDengine Commit 与 Flush 机制 》
  • 06-《TDengine Compaction 合并策略 》
  • 07-《TDengine 数据保留与 TTL》
  • 08-《TDengine 压缩编码机制》
  • 09-《TDengine Cache 与 Last 查询加速》
  • 10-《TDengine 逻辑计划生成》

查询引擎

  • 01-《TDengine 查询引擎概览》
  • 02-《TDengine SQL 解析与词法分析》
  • 03-《TDengine 语义分析与 AST 重写》
  • 04-《TDengine 逻辑计划生成》
  • 05-《TDengine 物理计划生成》
  • 06-《TDengine 扫描算子》
  • 07-《TDengine 聚合算子》
  • 08-《TDengine 聚合算子》
  • 09-《TDengine 连接算子》
  • 10-《TDengine 排序、填充与投影》
  • 11-《TDengine 分布式查询执行》
  • 12-《TDengine EXPLAIN 与查询优化》

数据写入

  • 01-《TDengine SQL INSERT》
  • 02-《TDengine 无模式写入》
  • 03-《TDengine STMT 写入》
  • 04-《TDengine 写入内部流程》
  • 05-《TDengine 数据更新删除》

数据订阅

  • 01-《TDengine 数据订阅》
  • 02-《TDengine 订阅 vs Kafka》

关于 TDengine

TDengine 专为物联网IoT平台、工业大数据平台设计。其中,TDengine TSDB 是一款高性能、分布式的时序数据库(Time Series Database),同时它还带有内建的缓存、流式计算、数据订阅等系统功能;TDengine IDMP 是一款AI原生工业数据管理平台,它通过树状层次结构建立数据目录,对数据进行标准化、情景化,并通过 AI 提供实时分析、可视化、事件管理与报警等功能。

相关新闻

  • 数字孪生项目案例 | 区域发展指挥中心
  • 一句话讲透向量数据库:它把“语义相似“变成了可计算的东西
  • 计算机Java毕设实战-基于 SpringBoot 的二次元游戏周边购物商城系统的设计与实现 基于 SpringBoot 的游戏周边商品买卖管理【完整源码+LW+部署说明+演示视频,全bao一条龙等】

最新新闻

  • 线上AI接口大面积超时:一次从告警到修复的完整排查记录
  • 从零构建实时手势识别系统:基于YOLOv5与MobileNetV2的深度学习实战
  • 户外空气净化优选雾森系统 吸附悬浮粉尘清新园区空气
  • ChatGPT品牌优化如何落地:大鱼营销的内容与渠道实践观察
  • 如何快速解包Godot游戏资源:godot-unpacker完整使用指南
  • 2026年6月30日复测:八字排盘的命理软件推荐:2026最新第三方测评看这几条硬指标

日新闻

  • 2026年6月公司网站搭建最新热门渠道测评:四大低成本/零代码平台对比+避坑
  • 【Linux】Linux arm 编译QT程序,出现expected “}“报错
  • 【MATLAB例程】四基站二维AOA定位与距离辅助增强对比仿真。基于角度观测和测距修正的固定目标平面定位精度分析

周新闻

  • Windows字体自定义终极方案:No!! MeiryoUI完全指南
  • Deepin Boot Maker:告别命令行,3分钟制作Linux启动盘的智能解决方案
  • Plain Craft Launcher 2:重新定义你的Minecraft游戏体验

月新闻

  • 2026年6月公司网站搭建最新热门渠道测评:四大低成本/零代码平台对比+避坑
  • 【Linux】Linux arm 编译QT程序,出现expected “}“报错
  • 【MATLAB例程】四基站二维AOA定位与距离辅助增强对比仿真。基于角度观测和测距修正的固定目标平面定位精度分析

关于尧图

  • 公司简介
  • 团队介绍
  • 企业文化
  • 荣誉资质

服务项目

  • 定制开发
  • 电商建站
  • UI 设计
  • 运维服务

快速链接

  • 案例展示
  • 建站流程
  • 常见问题
  • 资讯中心

联系方式

  • 📍北京市朝阳区互联网产业园 A 座 10 层
  • 📞400-888-8888
  • ✉️contact@rkmt.cn
  • 🕐周一至周日 9:00-21:00

© 2024 北京尧图网络科技有限公司 版权所有 | 京 ICP 备 XXXXXXXX 号