当前位置: 首页 > news >正文

Pythonasyncio子进程管理

=================================================================
Python asyncio 子进程管理:异步执行外部命令
=================================================================

asyncio 提供了 create_subprocess_exec 和 create_subprocess_shell
来异步管理子进程,避免阻塞事件循环。本文详解完整用法。

=================================================================
一、create_subprocess_exec:执行外部程序
=================================================================

import asyncio
import sys

async def run_cmd_simple():
"""
异步执行一个外部命令,等待其完成。
create_subprocess_exec 直接执行程序(不经过 shell)。
"""
print("开始执行 'ls -la' ...")
# 创建子进程:第一个参数是可执行文件路径,后续是参数
process = await asyncio.create_subprocess_exec(
"ls" if sys.platform != "win32" else "dir", # Linux/macOS 用 ls
"-la",
# 默认继承父进程的标准流
stdout=asyncio.subprocess.PIPE, # 捕获标准输出
stderr=asyncio.subprocess.PIPE, # 捕获标准错误
)
# communicate() 等待进程结束并返回 (stdout, stderr)
stdout, stderr = await process.communicate()
print(f"返回码: {process.returncode}")
if stdout:
print(f"标准输出 ({len(stdout)} 字节):")
print(stdout.decode(errors="replace")[:500])
if stderr:
print(f"标准错误: {stderr.decode(errors='replace')}")

=================================================================
二、create_subprocess_shell:通过 Shell 执行
=================================================================

async def run_cmd_shell():
"""
通过系统 shell 执行命令。
注意:存在 shell 注入风险,不要用于不可信的输入。
"""
process = await asyncio.create_subprocess_shell(
"echo 'Hello from asyncio' && sleep 1 && echo 'Done'",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
print(f"Shell 输出: {stdout.decode()}")

# create_subprocess_exec 与 create_subprocess_shell 的区别:
# - exec: 直接执行程序,更安全,不需要 shell 转义
# - shell: 通过 /bin/sh (Windows: cmd.exe) 执行,支持管道、重定向
# - exec 推荐用于所有可以避免使用 shell 的场景

=================================================================
三、Process 类详解
=================================================================

async def process_class_demo():
"""
展示 asyncio.subprocess.Process 的完整方法。
Process 对象通过 create_subprocess_* 返回。
"""
process = await asyncio.create_subprocess_exec(
"python3", "--version",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
# 工作目录
cwd="/tmp",
# 环境变量
env={"PYTHONUNBUFFERED": "1"},
)

# 进程标识
print(f"PID: {process.pid}")

# 等待进程结束,获取返回码
returncode = await process.wait()
print(f"返回码: {returncode}")

# 检查进程状态
if process.returncode == 0:
print("执行成功")
else:
print(f"执行失败,返回码: {process.returncode}")

# 发送信号(仅 Unix)
# process.send_signal(signal.SIGTERM)
# process.terminate()
# process.kill()

=================================================================
四、读取子进程输出:逐行读取
=================================================================

async def stream_subprocess_output():
"""
逐行读取子进程输出,适用于长时间运行的进程。
使用 stdout.readline() 而不是 communicate(),
后者会等待进程结束才返回所有数据。
"""
process = await asyncio.create_subprocess_exec(
"ping", "-c", "5", "127.0.0.1",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)

print("开始逐行读取 ping 输出:")
# 逐行读取标准输出(实时读取)
while True:
line = await process.stdout.readline()
if not line:
break # EOF,进程已结束
# 解码并去除换行符
text = line.decode("utf-8", errors="replace").rstrip()
print(f"[ping] {text}")

# 确保进程已完全结束
await process.wait()
print(f"进程结束,返回码: {process.returncode}")

=================================================================
五、超时控制
=================================================================

async def subprocess_with_timeout():
"""
为子进程设置超时。
使用 asyncio.wait_for 包装 communicate()。
"""
process = await asyncio.create_subprocess_exec(
"sleep", "10", # 持续 10 秒
stdout=asyncio.subprocess.PIPE,
)

try:
# 设置 3 秒超时
stdout, stderr = await asyncio.wait_for(
process.communicate(), timeout=3.0
)
except asyncio.TimeoutError:
print("子进程执行超时,正在终止...")
process.terminate() # 发送 SIGTERM
# 等待进程真正结束
await process.wait()
print(f"进程已终止,返回码: {process.returncode}")
except Exception as e:
print(f"其他错误: {e}")
process.kill()
await process.wait()

=================================================================
六、并发执行多个子进程
=================================================================

async def run_multiple_subprocesses():
"""
使用 asyncio.gather 同时执行多个子进程。
这比串行执行快得多。
"""
async def run_one(cmd: str, delay: int):
"""运行一个子进程并返回结果"""
print(f"开始执行: {cmd}")
process = await asyncio.create_subprocess_shell(
f"echo 'Starting {cmd}' && sleep {delay} && echo '{cmd} done'",
stdout=asyncio.subprocess.PIPE,
shell=True,
)
stdout, _ = await process.communicate()
return stdout.decode().strip()

# 并发执行 3 个任务
results = await asyncio.gather(
run_one("Task-1", 2),
run_one("Task-2", 1),
run_one("Task-3", 3),
)

for result in results:
print(f"结果: {result}")

=================================================================
七、与子进程交互:写入 stdin
=================================================================

async def interact_with_subprocess():
"""
向子进程的标准输入写入数据,并读取输出。
适用于交互式程序。
"""
process = await asyncio.create_subprocess_exec(
"python3", "-c",
"""
import sys
for line in sys.stdin:
sys.stdout.write(f"处理: {line.strip().upper()}\\n")
sys.stdout.flush()
""",
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)

# 向 stdin 写入数据
process.stdin.write(b"hello\nworld\nasyncio\n")
# 务必调用 drain() 刷新缓冲区
await process.stdin.drain()
# 关闭 stdin 表示输入结束
process.stdin.close()

# 读取所有输出
stdout, stderr = await process.communicate()
print("子进程输出:")
print(stdout.decode())

=================================================================
八、总结
=================================================================

# 1. create_subprocess_exec 直接执行程序,更安全
# 2. create_subprocess_shell 经过 shell,有注入风险
# 3. communicate() 同时读写 stdin/stdout/stderr 并等待结束
# 4. wait() 仅等待进程结束,获取返回码
# 5. 逐行读取适用于实时流式输出场景
# 6. asyncio.wait_for 可为子进程设置超时
# 7. asyncio.gather 可并发管理多个子进程
# 8. 通过 stdin.write() 和 drain() 向子进程发送数据

if __name__ == "__main__":
asyncio.run(stream_subprocess_output())

http://www.rkmt.cn/news/1429669.html

相关文章:

  • 从“水缸”到“高速公路”:用生活化比喻彻底搞懂电容的滤波、旁路与去耦(附LTspice仿真)
  • 原型设计工具对比与校园失物招领系统原型设计
  • 别再只会用PEC了!CST材料库保姆级使用指南:从Normal介质到Lossy Metal的实战选择
  • 科瑞昌省电空调选购指南:工业大空间降温选型全攻略 - 资讯纵览
  • Android音乐播放器实战工程:带用户系统、本地数据库与四大组件完整实现
  • 智能电视上网难?TV Bro电视浏览器如何让大屏浏览变得轻松愉悦?
  • 2026护网行动全指南(干货版):从认知到实战,攻防落地可照搬
  • Windows安卓应用安装器:三步实现电脑运行手机应用
  • 3步掌握Unity游戏马赛克移除:UniversalUnityDemosaics完整指南
  • 微信聊天记录永久保存终极指南:如何一键导出所有聊天数据
  • 破解雨衣批发痛点:FEP一体化方法论如何实现高性价比稳定供应? - 资讯纵览
  • 【AI运维生死线】:当LangChain链式调用突然卡死——3层异步栈追踪+实时可观测性注入方案
  • 怎样高效使用Diffuse:专业开发者的5个实战技巧与深度配置指南
  • 10-大模型智能体开发工程师:RAG检索增强生成
  • AI工具更新日志追踪SOP(已落地金融/医疗/电商三大场景):从告警阈值设定到负责人自动分派,含Notion+Zapier实战模板
  • 深度解析:雨衣批发 行业趋势与优质供应选型指南 - 资讯纵览
  • 基于Micro:bit与加速度计的无线门磁报警器DIY实战
  • Bootstrap方法避坑指南:从原理到R实战,告诉你什么时候该用,什么时候不该用
  • 2026年5月劳力士售后保养价格与全国服务网点 - 资讯纵览
  • 2026年4月国内有实力的楼体亮化直销厂家有哪些,热门的楼体亮化厂家,楼体亮化提升城市夜间品质 - 品牌推荐师
  • 解密Ryzen硬件调谐:从系统黑盒到性能架构的艺术
  • 管束抽芯机厂商哪家靠谱
  • 告别硬件SPI!用STM32的普通IO口模拟SPI,成功驱动PCAP01电容测量芯片
  • 基于Python与Raspberry Pi的Bing图像搜索脚本开发指南
  • 2026年苏州本地口碑良好防水补漏服务商核心能力与适配场景专业解析 专业防水公司排名推荐(2026年5月防水补漏最新TOP权威排名) - 鼎壹万修缮说
  • 基于Arduino与RFID的智能音乐点播系统:从硬件选型到软件实现全解析
  • 用Python+OpenCV给贵州常见植物做个‘身份证’:从茅栗到楮的自动识别实践
  • 从FPGA时序报告看实战价值:4bit超前进位加法器(LCA)的Verilog实现与面积换性能分析
  • 2026免漆木门:解读行业三大核心发展趋势 - 资讯纵览
  • 校园失物招领平台源码:SpringBoot+Vue全栈实现,含数据库脚本、UI资源与部署指南