当前位置: 首页 > news >正文

用Python和pymodbus库模拟Modbus RTU主从通信(附完整代码与数据帧解析)

用Python和pymodbus库模拟Modbus RTU主从通信(附完整代码与数据帧解析)

在工业自动化领域,Modbus RTU协议因其简单可靠的特点,成为连接PLC、传感器和上位机系统的通用语言。本文将带您从零开始,使用Python的pymodbus库构建完整的Modbus RTU通信测试环境,通过代码实例深入解析协议底层的数据帧结构。

1. 环境搭建与基础配置

1.1 虚拟串口工具配置

在开始编码前,我们需要模拟物理串口环境。Windows平台推荐使用com0com虚拟串口工具,Linux/Mac可使用socat命令创建虚拟串口对:

# Linux/Mac虚拟串口创建 socat -d -d pty,raw,echo=0 pty,raw,echo=0

安装Python环境依赖:

pip install pymodbus==3.0.0 serial

注意:pymodbus 3.x版本重构了异步IO支持,本文示例基于同步API以保证兼容性

1.2 协议基础参数设置

Modbus RTU的关键参数需要主从设备严格一致:

参数典型值说明
波特率19200常见工业设备标准速率
数据位8固定配置
停止位1常见配置
校验位None/Even根据设备要求设置
响应超时1秒等待从站响应的最长时间

2. 构建Modbus从站模拟器

2.1 初始化从站服务

from pymodbus.server.sync import StartSerialServer from pymodbus.datastore import ModbusSequentialDataBlock from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext def run_slave(): store = ModbusSlaveContext( di=ModbusSequentialDataBlock(0, [0]*100), # 离散输入 co=ModbusSequentialDataBlock(0, [0]*100), # 线圈 hr=ModbusSequentialDataBlock(0, [0]*100), # 保持寄存器 ir=ModbusSequentialDataBlock(0, [0]*100)) # 输入寄存器 context = ModbusServerContext(slaves=store, single=True) StartSerialServer( context, port='/dev/ttyS1', # 虚拟串口设备 baudrate=19200, timeout=1)

2.2 数据帧结构验证

当主站发送读取保持寄存器请求(功能码0x03)时,从站响应的原始数据帧可通过Wireshark捕获分析:

响应帧示例: 01 03 02 00 0A 45 CD
  • 01:从站地址
  • 03:功能码
  • 02:数据字节数
  • 00 0A:寄存器值(十进制10)
  • 45 CD:CRC校验码

3. 实现Modbus主站功能

3.1 同步读取操作实例

from pymodbus.client.sync import ModbusSerialClient client = ModbusSerialClient( method='rtu', port='/dev/ttyS0', baudrate=19200, timeout=1) # 读取保持寄存器 result = client.read_holding_registers( address=0, # 起始地址 count=5, # 寄存器数量 unit=1) # 从站地址 print("寄存器值:", result.registers)

3.2 数据帧构造原理

底层请求帧的构建过程:

def build_read_frame(slave_id, function_code, address, count): # 构造PDU pdu = bytes([function_code]) + \ address.to_bytes(2, 'big') + \ count.to_bytes(2, 'big') # 添加CRC校验 crc = compute_crc(bytes([slave_id]) + pdu) return bytes([slave_id]) + pdu + crc.to_bytes(2, 'little') # 示例:构造读取保持寄存器请求 frame = build_read_frame( slave_id=1, function_code=0x03, address=0, count=5) print("原始请求帧:", frame.hex())

4. 高级功能实现与调试

4.1 批量写入操作

# 写入多个保持寄存器(功能码0x10) values = [10, 20, 30, 40] result = client.write_registers( address=0, values=values, unit=1) if result.isError(): print("写入失败:", result) else: print("写入成功")

对应的请求帧解析:

01 10 00 00 00 04 08 00 0A 00 14 00 1E 00 28 42 1F
  • 00 00:起始地址
  • 00 04:寄存器数量
  • 08:数据字节长度
  • 后续8字节为4个寄存器的值

4.2 异常处理机制

Modbus协议定义的异常响应格式:

字段示例值说明
从站地址01
异常功能码83原功能码+0x80
异常码02非法数据地址

捕获异常响应的代码实现:

try: response = client.read_coils(0, 10, unit=1) if response.isError(): print("Modbus异常码:", response.exception_code) except Exception as e: print("通信错误:", str(e))

5. 协议分析工具链

5.1 使用Wireshark解码Modbus RTU

配置步骤:

  1. 安装Wireshark并启用串口捕获
  2. 添加Modbus RTU解析器
  3. 过滤语法:modbus || modbusadu

典型数据包分析:

Frame 1 (Request): 0000 01 03 00 00 00 02 44 09 ......D. Frame 2 (Response): 0000 01 03 04 00 0a 00 14 f5 33 ......ó3

5.2 自定义数据监控工具

import matplotlib.pyplot as plt from collections import deque class ModbusMonitor: def __init__(self, max_points=50): self.data = deque(maxlen=max_points) def update(self, values): self.data.extend(values) plt.clf() plt.plot(self.data) plt.pause(0.01) # 使用示例 monitor = ModbusMonitor() while True: result = client.read_holding_registers(0, 5, unit=1) monitor.update(result.registers)

6. 性能优化实践

6.1 多寄存器读取优化

# 批量读取策略对比 single_read_time = timeit.timeit( lambda: client.read_holding_registers(0, 1, unit=1), number=100) batch_read_time = timeit.timeit( lambda: client.read_holding_registers(0, 10, unit=1), number=100) print(f"单点读取耗时: {single_read_time:.3f}s") print(f"批量读取耗时: {batch_read_time:.3f}s")

6.2 连接池管理

from pymodbus.client.sync import ModbusConnectionPool pool = ModbusConnectionPool( client_class=ModbusSerialClient, max_size=5, **client_params) def safe_read(address): with pool.acquire() as conn: return conn.read_holding_registers(address, 1)

在实际项目中,通过合理设置超时时间和重试机制,可以将通信成功率提升至99.9%以上。一个常见的经验是,对于19200波特率的网络,单个请求的超时时间不应低于300ms。

http://www.rkmt.cn/news/1481732.html

相关文章:

  • Notepad--完全指南:5分钟上手跨平台文本编辑神器
  • RL-Kernel
  • 比亚迪入局机器人赛道:内部消化订单跳过商业化等待期,能否复刻电池芯片成功路径?
  • 从CCFL到RGB-LED:显示背光技术演进与色彩革命
  • 串口通信中0x0C清屏指令的原理与应用实践
  • Xiaomi Miot Auto本地模式终极解决方案:深度解析离线运行疑难
  • 当AI学会编程——从ZeroLang到供应链攻击,开发者的护城河还剩什么?
  • 工业串口抗干扰实战:从RS-232烧毁到RS-485防护电路设计
  • 点狮HRM企业级HRM薪资计算系统架构设计
  • 番茄小说下载器:终极免费工具,5大实用技巧轻松收藏小说
  • 为什么92%的运营人买错了CSDN AI套餐?资深签约顾问亲授季度锁价黄金窗口期
  • 在Mac上运行Windows程序:Whisky终极免费指南
  • 告别英文恐惧:BurpSuiteCN-Release让安全测试真正变得简单
  • 深圳奖项申报代理机构排行:5家合规服务商盘点 - 奔跑123
  • 甄选:广州靠谱的精油厂商 - 品牌推广大师
  • 供应链数字化转型:从Excel到APS系统,破解8亿美金企业交付困局
  • 二维点集凸包计算工具:Graham、Jarvis等算法实现+可视化与性能测试
  • 5000 万订单表清理 3000 万历史数据(不影响线上)落地方案
  • ArcGIS Pro 3.0 模型构建器实战:告别手动,一键按属性批量拆分SHP文件
  • Blender贝塞尔曲线革命:从初学者到专业设计师的5个必备工具
  • AI论文写作工具的合规秘籍:如何让AI生成内容通过严格学术审查
  • KeyboardChatterBlocker:终极免费开源键盘连击修复工具完全指南
  • 从0到1构建企业级权限系统:Mini-RBAC实战全解析
  • 从外挂到原生:双卡双待技术演进与Android平台集成实战
  • 2026国产在线PH分析仪十大品牌深度横评:技术突围下的真实力与场景化选型指南 - 水质仪表品牌排行榜
  • # 2026年在线ORP仪优选品牌TOP10:权威榜单与深度选型指南 - 水质仪表品牌排行榜
  • 终极ThinkPad风扇控制指南:告别噪音与高温的128级精准调控
  • 校园二手物品交易平台:从需求分析到原型设计的思考
  • 常州市中级经济师工商管理/人力资源管理:适配人群、岗位匹配与备考全攻略 - 众智商学院课程中心
  • BambuStudio开发者指南:如何为3D打印开源项目贡献你的代码力量