当前位置: 首页 > news >正文

Python读取光谱仪数据的完整代码示例

一、前言

上一篇文章我们介绍了光谱仪的工作原理,本文将手把手教你用 Python 连接辰昶光纤光谱仪,实现光谱数据的实时读取、显示和保存。

本文亮点

  • ✅ 完整的设备连接代码
  • ✅ 实时光谱采集
  • ✅ 数据可视化
  • ✅ 文件保存与管理
  • ✅ 错误处理机制


二、开发环境准备

2.1 硬件要求

  • 辰昶光纤光谱仪(本文以 EQ2000/ER4000 系列为例)
  • USB 数据线或以太网连接
  • 光纤跳线(推荐 SMA905 接口)

2.2 软件依赖

pip install numpy pandas matplotlib pyserial pyusb

依赖说明

包名用途
numpy数值计算
pandas数据存储
matplotlib光谱可视化
pyserial串口通信
pyusbUSB设备连接

三、基础连接代码

3.1 USB连接方式

import numpy as np import matplotlib.pyplot as plt from matplotlib.animation import FuncAnimation import time class ChopticsSpectrometer: """ 辰昶光纤光谱仪 Python 控制类 支持 EQ2000、ER4000、EK2000 Pro 系列 """ def __init__(self, serial_port='COM3', baudrate=115200): self.serial_port = serial_port self.baudrate = baudrate self.is_connected = False self.serial = None def connect(self): """连接光谱仪""" import serial try: self.serial = serial.Serial( port=self.serial_port, baudrate=self.baudrate, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=1.0 ) self.is_connected = True print(f"✅ 光谱仪已连接: {self.serial_port}") # 获取设备信息 self._get_device_info() return True except serial.SerialException as e: print(f"❌ 连接失败: {e}") return False def _get_device_info(self): """获取设备基本信息""" # 发送查询命令(具体命令格式请参考辰昶通信协议文档) self.serial.write(b'*IDN?\r\n') response = self.serial.readline().decode('utf-8').strip() print(f"设备信息: {response}") def set_integration_time(self, time_ms): """ 设置积分时间(毫秒) 辰昶光谱仪范围: 1ms ~ 65000ms """ command = f'INTEGRATION,{time_ms}\r\n' self.serial.write(command.encode()) response = self.serial.readline().decode('utf-8').strip() print(f"积分时间设置: {response}") def acquire_spectrum(self): """ 获取单次光谱数据 返回: (wavelengths, intensities) """ if not self.is_connected: raise RuntimeError("请先连接光谱仪") # 发送采集命令 self.serial.write(b'SPECTRUM\r\n') # 读取波长数据 num_pixels = 2048 # EQ2000系列 wavelength_data = [] for _ in range(num_pixels): line = self.serial.readline().decode('utf-8').strip() if line: wavelength_data.append(float(line)) # 读取强度数据 intensity_data = [] for _ in range(num_pixels): line = self.serial.readline().decode('utf-8').strip() if line: intensity_data.append(float(line)) return np.array(wavelength_data), np.array(intensity_data) def close(self): """关闭连接""" if self.serial and self.serial.is_open: self.serial.close() self.is_connected = False print("🔌 光谱仪连接已关闭") # 使用示例 if __name__ == "__main__": spectrometer = ChopticsSpectrometer(serial_port='COM3') if spectrometer.connect(): # 设置积分时间 100ms spectrometer.set_integration_time(100) # 获取光谱 wavelengths, intensities = spectrometer.acquire_spectrum() # 绘制光谱图 plt.figure(figsize=(12, 6)) plt.plot(wavelengths, intensities, 'b-', linewidth=1) plt.xlabel('波长 (nm)', fontsize=12) plt.ylabel('强度 (counts)', fontsize=12) plt.title('光纤光谱仪实时数据', fontsize=14) plt.grid(True, alpha=0.3) plt.show() spectrometer.close()

3.2 以太网连接方式(工业级)

import socket import struct class EthernetSpectrometer: """ 以太网连接光谱仪(适合工业在线检测) 支持 TCP/IP 协议 """ def __init__(self, ip_address='192.168.1.100', port=5000): self.ip_address = ip_address self.port = port self.socket = None def connect(self): """建立TCP连接""" try: self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.connect((self.ip_address, self.port)) self.socket.settimeout(5.0) print(f"✅ 已连接到 {self.ip_address}:{self.port}") return True except Exception as e: print(f"❌ 连接失败: {e}") return False def send_command(self, command): """发送命令""" self.socket.sendall((command + '\r\n').encode()) def receive_data(self, expected_bytes): """接收原始数据""" data = b'' while len(data) < expected_bytes: packet = self.socket.recv(4096) if not packet: break data += packet return data def acquire_spectrum(self): """采集光谱数据""" self.send_command('ACQUIRE') # EQ2000: 2048像素,16位 = 4096字节 raw_data = self.receive_data(4096) # 解析数据 intensities = np.array(struct.unpack(f'{2048}H', raw_data)) # 生成波长数组(根据校准参数) start_wavelength = 200 # nm end_wavelength = 1100 # nm wavelengths = np.linspace(start_wavelength, end_wavelength, 2048) return wavelengths, intensities def close(self): if self.socket: self.socket.close()

四、实时显示与数据处理

4.1 实时采集动画

class RealTimeSpectrometerDisplay: """ 实时光谱显示类 支持动态更新、峰值标记、数据统计 """ def __init__(self, spectrometer, interval=100): """ Args: spectrometer: ChopticsSpectrometer 实例 interval: 采集间隔(毫秒) """ self.spectrometer = spectrometer self.interval = interval self.spectra_history = [] # 存储历史数据 def update_plot(self, frame): """更新光谱曲线""" try: wavelengths, intensities = self.spectrometer.acquire_spectrum() # 绘制当前光谱 self.ax.clear() self.ax.plot(wavelengths, intensities, 'b-', linewidth=1.5, label='实时光谱') # 标记峰值 peak_indices, _ = find_peaks(intensities, height=1000) if len(peak_indices) > 0: peak_wavelengths = wavelengths[peak_indices] peak_intensities = intensities[peak_indices] self.ax.scatter(peak_wavelengths, peak_intensities, c='red', s=50, zorder=5, label='峰值') # 标注峰值波长 for w, i in zip(peak_wavelengths, peak_intensities): self.ax.annotate(f'{w:.1f}nm', (w, i), textcoords="offset points", xytext=(0,10), ha='center', fontsize=9) # 显示统计信息 stats_text = f'最大值: {intensities.max()}\n' stats_text += f'平均值: {intensities.mean():.1f}\n' stats_text += f'标准差: {intensities.std():.1f}' self.ax.text(0.02, 0.98, stats_text, transform=self.ax.transAxes, verticalalignment='top', bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5)) self.ax.set_xlabel('波长 (nm)', fontsize=12) self.ax.set_ylabel('强度 (counts)', fontsize=12) self.ax.set_title('光纤光谱仪实时监测', fontsize=14) self.ax.grid(True, alpha=0.3) self.ax.legend(loc='upper right') self.ax.set_xlim(200, 1100) # 保存历史数据 self.spectra_history.append(intensities.copy()) except Exception as e: print(f"采集异常: {e}") def start(self): """启动实时显示""" fig, self.ax = plt.subplots(figsize=(14, 6)) ani = FuncAnimation(fig, self.update_plot, interval=self.interval, save_count=100) plt.show() def save_data(self, filename='spectrum_data.csv'): """保存数据到CSV""" import pandas as pd if not self.spectra_history: print("无历史数据") return df = pd.DataFrame(self.spectra_history) df.to_csv(filename, index=False) print(f"✅ 数据已保存: {filename}") # 启动实时显示 if __name__ == "__main__": spec = ChopticsSpectrometer(serial_port='COM3') spec.connect() spec.set_integration_time(50) # 50ms积分时间 display = RealTimeSpectrometerDisplay(spec, interval=100) display.start()

4.2 数据平滑与去噪

from scipy.signal import savgol_filter, medfilt from scipy.ndimage import gaussian_filter1d def preprocess_spectrum(intensities, method='sg'): """ 光谱数据预处理 Parameters: intensities: 原始光谱强度数据 method: 'sg' (Savitzky-Golay) 或 'gauss' (高斯) 或 'median' (中值) Returns: 平滑后的光谱数据 """ if method == 'sg': # Savitzky-Golay滤波器(保持峰形) return savgol_filter(intensities, window_length=11, polyorder=3) elif method == 'gauss': # 高斯平滑 return gaussian_filter1d(intensities, sigma=2) elif method == 'median': # 中值滤波(去除脉冲噪声) return medfilt(intensities, kernel_size=5) else: return intensities def remove_baseline(intensities, lambda_param=1e5): """ 基线校正(AsLS算法) 用于去除荧光背景、散射背景等 """ from scipy import sparse from scipy.sparse.linalg import spsolve n = len(intensities) L = sparse.diags([1, -2, 1], [0, -1, -2], shape=(n, n-2)) L = lambda_param * L.dot(L.transpose()) D = sparse.diags([1, -1], [0, -1], shape=(n, n)) w = np.ones(n) W = sparse.diags(w, 0) for _ in range(10): # 迭代优化 W.setdiag(w) Z = W + L baseline = spsolve(Z, w * intensities) w = 1 * (intensities > baseline) + 0.001 * (intensities <= baseline) return intensities - baseline

五、数据保存与导出

5.1 多种格式保存

import json from datetime import datetime class SpectrumDataManager: """光谱数据管理器""" def __init__(self, base_path='./data'): self.base_path = base_path self.current_session = datetime.now().strftime('%Y%m%d_%H%M%S') def save_csv(self, wavelengths, intensities, filename=None): """保存为CSV格式""" if filename is None: filename = f'spectrum_{self.current_session}.csv' filepath = os.path.join(self.base_path, filename) df = pd.DataFrame({ 'Wavelength_nm': wavelengths, 'Intensity_counts': intensities }) df.to_csv(filepath, index=False) print(f"✅ CSV已保存: {filepath}") return filepath def save_numpy(self, wavelengths, intensities, filename=None): """保存为NumPy格式(保留高精度)""" if filename is None: filename = f'spectrum_{self.current_session}.npz' filepath = os.path.join(self.base_path, filename) np.savez(filepath, wavelengths=wavelengths, intensities=intensities) print(f"✅ NumPy已保存: {filepath}") return filepath def save_with_metadata(self, wavelengths, intensities, metadata=None): """ 保存带元数据的光谱文件 包含仪器参数、测量条件等信息 """ if metadata is None: metadata = {} # 元数据 metadata.update({ 'timestamp': datetime.now().isoformat(), 'instrument': '辰昶光纤光谱仪', 'num_points': len(wavelengths), 'wavelength_range': f'{wavelengths.min():.2f}-{wavelengths.max():.2f}nm' }) filename = f'spectrum_{self.current_session}_meta.json' filepath = os.path.join(self.base_path, filename) data = { 'metadata': metadata, 'wavelengths': wavelengths.tolist(), 'intensities': intensities.tolist() } with open(filepath, 'w', encoding='utf-8') as f: json.dump(data, f, indent=2, ensure_ascii=False) print(f"✅ 带元数据文件已保存: {filepath}") return filepath

六、工业级应用示例

6.1 在线浓度监测

class ConcentrationMonitor: """ 在线浓度监测系统 基于Beer-Lambert定律: A = ε·c·l """ def __init__(self, spectrometer, wavelength_absorbance): self.spectrometer = spectrometer self.wavelength_absorbance = wavelength_absorbance self.calibration_curve = {} # 浓度标定曲线 def set_calibration(self, concentrations, absorbances): """ 设置标定曲线 concentrations: 标准浓度列表 absorbances: 对应的吸光度值 """ from scipy.optimize import curve_fit def linear(x, a, b): return a * x + b popt, _ = curve_fit(linear, concentrations, absorbances) self.calibration_curve = {'a': popt[0], 'b': popt[1]} print(f"标定完成: A = {popt[0]:.4f}×C + {popt[1]:.4f}") def measure_concentration(self): """测量当前浓度""" wavelengths, intensities = self.spectrometer.acquire_spectrum() # 计算吸光度(需要参考光谱) # A = log10(I_ref / I_sample) absorbance = np.log10(self.reference_intensity / intensities) # 在指定波长处读取吸光度 idx = np.argmin(np.abs(wavelengths - self.wavelength_absorbance)) A = absorbance[idx] # 根据标定曲线计算浓度 if self.calibration_curve: C = (A - self.calibration_curve['b']) / self.calibration_curve['a'] return C, A return None, A

七、常见问题与解决方案

问题可能原因解决方案
连接超时串口被占用检查端口号,更换USB口
采集数据为0光纤未连接检查光纤接口,确保光路导通
噪声过大积分时间过短增加积分时间,或使用平均采集
基线漂移环境温度变化使用基线校正算法
通信不稳定USB供电不足使用带供电的USB集线器
💡辰昶仪器提示:辰昶光谱仪提供完整的SDK开发包和技术支持,包括Python、LabVIEW、C#、Java等多语言接口。

八、总结

本文提供了完整的 Python 光谱仪数据读取方案:

  1. 基础连接类- 支持USB和以太网两种连接方式
  2. 实时显示- 动态更新、峰值标记、统计信息
  3. 数据预处理- 平滑、去噪、基线校正
  4. 多格式保存- CSV、NumPy、带元数据的JSON
  5. 工业应用- 浓度监测、在线分析示例
http://www.rkmt.cn/news/1489791.html

相关文章:

  • 30岁的女人适合考个什么证
  • 食品异物赔偿协商录音泄露,舆情处置时沟通记录别踩坑
  • 2026年迪拜公司注册权威机构排行:危险化学品许可证/吉尔吉斯斯坦公司注册/哈萨克斯坦公司注册/合规服务对比 - 优质品牌商家
  • 小白程序员必备!3个月从零掌握大模型,附收藏版AI学习路线图
  • 前端超能力:让浏览器听你指挥——技术基石:Web API 的“听觉”与“理解”能力
  • C语言中的递归
  • Krita AI Diffusion项目解决SD3模型CLIP文件缺失问题的完整指南
  • 意图共鸣科技《AI记忆链商业化白皮书3.0》学习笔记:“AI焦虑的解药”=第二大脑+记忆主权
  • 大模型时代,小白也能入行!2026年AI岗必看指南,高薪收藏版
  • 零基础搭建本地 AI,OpenClaw Windows/macOS 落地实操
  • 终极音乐解放指南:如何使用qmc-decoder高效解密QQ音乐加密文件
  • 赤火时代的钛合金水淬炉好用吗? - myqiye
  • 选购玩具面料,安鹏纺织是您的不二之选 - myqiye
  • 修改liunx最大句柄数
  • 杭州大润发购物卡回收时效解析:技术流程与平台选择 - 优质品牌商家
  • 【环形缓冲区】1-概念与编程
  • 2026年,专业做实验台的厂家究竟有何独特之处?
  • 不锈钢板拉丝工艺解析与行业合规选型实测推荐:304不锈钢管/316l不锈钢焊管/316l不锈钢管/优选推荐 - 优质品牌商家
  • Agent到底是什么?大模型新焦点,小白程序员必看(收藏备用)
  • 陈刚直言|一条产线,三种制造模式:如何复用同一套软件?
  • 2026年净化工程公司如何选择 - 工业品牌热点
  • C语言pthread_create传参踩坑记:从‘-Wincompatible-pointer-types’警告到线程安全数据传递
  • 2026年LED显示屏哪家好用?性价比高的品牌排名 - myqiye
  • Linux进程控制学习总结(2/2)
  • 小米 mimo 邀请码 4EQMGN
  • ThinkPad风扇终极控制:TPFanControl2完全使用指南
  • 2026年能做耐高温长途运输保鲜泡沫箱的厂家排名 - mypinpai
  • 2026年余姚靠谱的黄金回收机构有哪些?融通寄售黄金名表值得信赖 - 工业品牌热点
  • 如何永久保存微信聊天记录:WeChatMsg本地导出工具终极指南
  • 别再只懂四舍五入了!IEEE754浮点数舍入模式实战:用Python和C++代码带你搞懂银行家舍入