Python读取光谱仪数据的完整代码示例
一、前言
上一篇文章我们介绍了光谱仪的工作原理,本文将手把手教你用 Python 连接辰昶光纤光谱仪,实现光谱数据的实时读取、显示和保存。
本文亮点:
- ✅ 完整的设备连接代码
- ✅ 实时光谱采集
- ✅ 数据可视化
- ✅ 文件保存与管理
- ✅ 错误处理机制
二、开发环境准备
2.1 硬件要求
- 辰昶光纤光谱仪(本文以 EQ2000/ER4000 系列为例)
- USB 数据线或以太网连接
- 光纤跳线(推荐 SMA905 接口)
2.2 软件依赖
pip install numpy pandas matplotlib pyserial pyusb依赖说明:
| 包名 | 用途 |
|---|---|
| numpy | 数值计算 |
| pandas | 数据存储 |
| matplotlib | 光谱可视化 |
| pyserial | 串口通信 |
| pyusb | USB设备连接 |
三、基础连接代码
3.1 USB连接方式
import numpy as np import matplotlib.pyplot as plt from matplotlib.animation import FuncAnimation import time class ChopticsSpectrometer: """ 辰昶光纤光谱仪 Python 控制类 支持 EQ2000、ER4000、EK2000 Pro 系列 """ def __init__(self, serial_port='COM3', baudrate=115200): self.serial_port = serial_port self.baudrate = baudrate self.is_connected = False self.serial = None def connect(self): """连接光谱仪""" import serial try: self.serial = serial.Serial( port=self.serial_port, baudrate=self.baudrate, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=1.0 ) self.is_connected = True print(f"✅ 光谱仪已连接: {self.serial_port}") # 获取设备信息 self._get_device_info() return True except serial.SerialException as e: print(f"❌ 连接失败: {e}") return False def _get_device_info(self): """获取设备基本信息""" # 发送查询命令(具体命令格式请参考辰昶通信协议文档) self.serial.write(b'*IDN?\r\n') response = self.serial.readline().decode('utf-8').strip() print(f"设备信息: {response}") def set_integration_time(self, time_ms): """ 设置积分时间(毫秒) 辰昶光谱仪范围: 1ms ~ 65000ms """ command = f'INTEGRATION,{time_ms}\r\n' self.serial.write(command.encode()) response = self.serial.readline().decode('utf-8').strip() print(f"积分时间设置: {response}") def acquire_spectrum(self): """ 获取单次光谱数据 返回: (wavelengths, intensities) """ if not self.is_connected: raise RuntimeError("请先连接光谱仪") # 发送采集命令 self.serial.write(b'SPECTRUM\r\n') # 读取波长数据 num_pixels = 2048 # EQ2000系列 wavelength_data = [] for _ in range(num_pixels): line = self.serial.readline().decode('utf-8').strip() if line: wavelength_data.append(float(line)) # 读取强度数据 intensity_data = [] for _ in range(num_pixels): line = self.serial.readline().decode('utf-8').strip() if line: intensity_data.append(float(line)) return np.array(wavelength_data), np.array(intensity_data) def close(self): """关闭连接""" if self.serial and self.serial.is_open: self.serial.close() self.is_connected = False print("🔌 光谱仪连接已关闭") # 使用示例 if __name__ == "__main__": spectrometer = ChopticsSpectrometer(serial_port='COM3') if spectrometer.connect(): # 设置积分时间 100ms spectrometer.set_integration_time(100) # 获取光谱 wavelengths, intensities = spectrometer.acquire_spectrum() # 绘制光谱图 plt.figure(figsize=(12, 6)) plt.plot(wavelengths, intensities, 'b-', linewidth=1) plt.xlabel('波长 (nm)', fontsize=12) plt.ylabel('强度 (counts)', fontsize=12) plt.title('光纤光谱仪实时数据', fontsize=14) plt.grid(True, alpha=0.3) plt.show() spectrometer.close()3.2 以太网连接方式(工业级)
import socket import struct class EthernetSpectrometer: """ 以太网连接光谱仪(适合工业在线检测) 支持 TCP/IP 协议 """ def __init__(self, ip_address='192.168.1.100', port=5000): self.ip_address = ip_address self.port = port self.socket = None def connect(self): """建立TCP连接""" try: self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.connect((self.ip_address, self.port)) self.socket.settimeout(5.0) print(f"✅ 已连接到 {self.ip_address}:{self.port}") return True except Exception as e: print(f"❌ 连接失败: {e}") return False def send_command(self, command): """发送命令""" self.socket.sendall((command + '\r\n').encode()) def receive_data(self, expected_bytes): """接收原始数据""" data = b'' while len(data) < expected_bytes: packet = self.socket.recv(4096) if not packet: break data += packet return data def acquire_spectrum(self): """采集光谱数据""" self.send_command('ACQUIRE') # EQ2000: 2048像素,16位 = 4096字节 raw_data = self.receive_data(4096) # 解析数据 intensities = np.array(struct.unpack(f'{2048}H', raw_data)) # 生成波长数组(根据校准参数) start_wavelength = 200 # nm end_wavelength = 1100 # nm wavelengths = np.linspace(start_wavelength, end_wavelength, 2048) return wavelengths, intensities def close(self): if self.socket: self.socket.close()四、实时显示与数据处理
4.1 实时采集动画
class RealTimeSpectrometerDisplay: """ 实时光谱显示类 支持动态更新、峰值标记、数据统计 """ def __init__(self, spectrometer, interval=100): """ Args: spectrometer: ChopticsSpectrometer 实例 interval: 采集间隔(毫秒) """ self.spectrometer = spectrometer self.interval = interval self.spectra_history = [] # 存储历史数据 def update_plot(self, frame): """更新光谱曲线""" try: wavelengths, intensities = self.spectrometer.acquire_spectrum() # 绘制当前光谱 self.ax.clear() self.ax.plot(wavelengths, intensities, 'b-', linewidth=1.5, label='实时光谱') # 标记峰值 peak_indices, _ = find_peaks(intensities, height=1000) if len(peak_indices) > 0: peak_wavelengths = wavelengths[peak_indices] peak_intensities = intensities[peak_indices] self.ax.scatter(peak_wavelengths, peak_intensities, c='red', s=50, zorder=5, label='峰值') # 标注峰值波长 for w, i in zip(peak_wavelengths, peak_intensities): self.ax.annotate(f'{w:.1f}nm', (w, i), textcoords="offset points", xytext=(0,10), ha='center', fontsize=9) # 显示统计信息 stats_text = f'最大值: {intensities.max()}\n' stats_text += f'平均值: {intensities.mean():.1f}\n' stats_text += f'标准差: {intensities.std():.1f}' self.ax.text(0.02, 0.98, stats_text, transform=self.ax.transAxes, verticalalignment='top', bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5)) self.ax.set_xlabel('波长 (nm)', fontsize=12) self.ax.set_ylabel('强度 (counts)', fontsize=12) self.ax.set_title('光纤光谱仪实时监测', fontsize=14) self.ax.grid(True, alpha=0.3) self.ax.legend(loc='upper right') self.ax.set_xlim(200, 1100) # 保存历史数据 self.spectra_history.append(intensities.copy()) except Exception as e: print(f"采集异常: {e}") def start(self): """启动实时显示""" fig, self.ax = plt.subplots(figsize=(14, 6)) ani = FuncAnimation(fig, self.update_plot, interval=self.interval, save_count=100) plt.show() def save_data(self, filename='spectrum_data.csv'): """保存数据到CSV""" import pandas as pd if not self.spectra_history: print("无历史数据") return df = pd.DataFrame(self.spectra_history) df.to_csv(filename, index=False) print(f"✅ 数据已保存: {filename}") # 启动实时显示 if __name__ == "__main__": spec = ChopticsSpectrometer(serial_port='COM3') spec.connect() spec.set_integration_time(50) # 50ms积分时间 display = RealTimeSpectrometerDisplay(spec, interval=100) display.start()4.2 数据平滑与去噪
from scipy.signal import savgol_filter, medfilt from scipy.ndimage import gaussian_filter1d def preprocess_spectrum(intensities, method='sg'): """ 光谱数据预处理 Parameters: intensities: 原始光谱强度数据 method: 'sg' (Savitzky-Golay) 或 'gauss' (高斯) 或 'median' (中值) Returns: 平滑后的光谱数据 """ if method == 'sg': # Savitzky-Golay滤波器(保持峰形) return savgol_filter(intensities, window_length=11, polyorder=3) elif method == 'gauss': # 高斯平滑 return gaussian_filter1d(intensities, sigma=2) elif method == 'median': # 中值滤波(去除脉冲噪声) return medfilt(intensities, kernel_size=5) else: return intensities def remove_baseline(intensities, lambda_param=1e5): """ 基线校正(AsLS算法) 用于去除荧光背景、散射背景等 """ from scipy import sparse from scipy.sparse.linalg import spsolve n = len(intensities) L = sparse.diags([1, -2, 1], [0, -1, -2], shape=(n, n-2)) L = lambda_param * L.dot(L.transpose()) D = sparse.diags([1, -1], [0, -1], shape=(n, n)) w = np.ones(n) W = sparse.diags(w, 0) for _ in range(10): # 迭代优化 W.setdiag(w) Z = W + L baseline = spsolve(Z, w * intensities) w = 1 * (intensities > baseline) + 0.001 * (intensities <= baseline) return intensities - baseline五、数据保存与导出
5.1 多种格式保存
import json from datetime import datetime class SpectrumDataManager: """光谱数据管理器""" def __init__(self, base_path='./data'): self.base_path = base_path self.current_session = datetime.now().strftime('%Y%m%d_%H%M%S') def save_csv(self, wavelengths, intensities, filename=None): """保存为CSV格式""" if filename is None: filename = f'spectrum_{self.current_session}.csv' filepath = os.path.join(self.base_path, filename) df = pd.DataFrame({ 'Wavelength_nm': wavelengths, 'Intensity_counts': intensities }) df.to_csv(filepath, index=False) print(f"✅ CSV已保存: {filepath}") return filepath def save_numpy(self, wavelengths, intensities, filename=None): """保存为NumPy格式(保留高精度)""" if filename is None: filename = f'spectrum_{self.current_session}.npz' filepath = os.path.join(self.base_path, filename) np.savez(filepath, wavelengths=wavelengths, intensities=intensities) print(f"✅ NumPy已保存: {filepath}") return filepath def save_with_metadata(self, wavelengths, intensities, metadata=None): """ 保存带元数据的光谱文件 包含仪器参数、测量条件等信息 """ if metadata is None: metadata = {} # 元数据 metadata.update({ 'timestamp': datetime.now().isoformat(), 'instrument': '辰昶光纤光谱仪', 'num_points': len(wavelengths), 'wavelength_range': f'{wavelengths.min():.2f}-{wavelengths.max():.2f}nm' }) filename = f'spectrum_{self.current_session}_meta.json' filepath = os.path.join(self.base_path, filename) data = { 'metadata': metadata, 'wavelengths': wavelengths.tolist(), 'intensities': intensities.tolist() } with open(filepath, 'w', encoding='utf-8') as f: json.dump(data, f, indent=2, ensure_ascii=False) print(f"✅ 带元数据文件已保存: {filepath}") return filepath六、工业级应用示例
6.1 在线浓度监测
class ConcentrationMonitor: """ 在线浓度监测系统 基于Beer-Lambert定律: A = ε·c·l """ def __init__(self, spectrometer, wavelength_absorbance): self.spectrometer = spectrometer self.wavelength_absorbance = wavelength_absorbance self.calibration_curve = {} # 浓度标定曲线 def set_calibration(self, concentrations, absorbances): """ 设置标定曲线 concentrations: 标准浓度列表 absorbances: 对应的吸光度值 """ from scipy.optimize import curve_fit def linear(x, a, b): return a * x + b popt, _ = curve_fit(linear, concentrations, absorbances) self.calibration_curve = {'a': popt[0], 'b': popt[1]} print(f"标定完成: A = {popt[0]:.4f}×C + {popt[1]:.4f}") def measure_concentration(self): """测量当前浓度""" wavelengths, intensities = self.spectrometer.acquire_spectrum() # 计算吸光度(需要参考光谱) # A = log10(I_ref / I_sample) absorbance = np.log10(self.reference_intensity / intensities) # 在指定波长处读取吸光度 idx = np.argmin(np.abs(wavelengths - self.wavelength_absorbance)) A = absorbance[idx] # 根据标定曲线计算浓度 if self.calibration_curve: C = (A - self.calibration_curve['b']) / self.calibration_curve['a'] return C, A return None, A七、常见问题与解决方案
| 问题 | 可能原因 | 解决方案 |
|---|---|---|
| 连接超时 | 串口被占用 | 检查端口号,更换USB口 |
| 采集数据为0 | 光纤未连接 | 检查光纤接口,确保光路导通 |
| 噪声过大 | 积分时间过短 | 增加积分时间,或使用平均采集 |
| 基线漂移 | 环境温度变化 | 使用基线校正算法 |
| 通信不稳定 | USB供电不足 | 使用带供电的USB集线器 |
💡辰昶仪器提示:辰昶光谱仪提供完整的SDK开发包和技术支持,包括Python、LabVIEW、C#、Java等多语言接口。
八、总结
本文提供了完整的 Python 光谱仪数据读取方案:
- ✅基础连接类- 支持USB和以太网两种连接方式
- ✅实时显示- 动态更新、峰值标记、统计信息
- ✅数据预处理- 平滑、去噪、基线校正
- ✅多格式保存- CSV、NumPy、带元数据的JSON
- ✅工业应用- 浓度监测、在线分析示例
