引言
在Python生态中,asyncio已经成为处理高并发I/O密集型任务的标准库。从网络爬虫、Web服务到数据库操作,异步编程能显著提升程序的吞吐量和响应速度。然而,许多开发者对async/await、事件循环、协程等概念一知半解,写出的代码看似正确却隐藏隐患。本文将带你系统梳理asyncio的核心机制,并通过多个可运行的实战案例,帮你彻底掌握异步编程。
本文面向有一定Python基础、希望深入理解并应用asyncio的开发者。读完你将收获:
- 清晰理解协程、任务、事件循环的关系
- 掌握
async/await的正确用法 - 学会使用
asyncio.gather、asyncio.create_task等并发工具 - 避开常见陷阱,写出健壮的异步代码
一、核心概念解析
1.1 事件循环(Event Loop)
事件循环是asyncio的心脏。它是一个无限循环,不断检查并执行已就绪的协程、回调或其他异步操作。简单理解:事件循环类似一个调度器,负责管理所有异步任务的执行顺序。在Python中,通常通过asyncio.run()启动顶层事件循环。
import asyncio async def main(): print("Hello ...") await asyncio.sleep(1) print("... World!") # 运行事件循环并执行main()协程 asyncio.run(main())1.2 协程(Coroutine)与 async/await
async def定义的函数称为协程函数,调用它返回一个协程对象,必须通过事件循环来驱动执行。await关键字用于挂起当前协程,等待另一个可等待对象(协程、Task、Future)完成,同时释放控制权给事件循环去执行其他任务。
重要的是:真正的并行只发生在await一个异步操作时,例如网络IO、文件读取(需使用异步库)或asyncio.sleep()。如果协程内都是同步阻塞代码,异步将失去意义。
async def fetch_data(url): print(f"开始请求 {url}") # 模拟网络IO,使用asyncio.sleep代替真正的IO操作 await asyncio.sleep(1) print(f"完成请求 {url}") return f"数据来自{url}"1.3 任务(Task)
任务是对协程的进一步封装,用于并发调度。当调用asyncio.create_task(coro)时,协程会被包装成一个Task,并立即被事件循环调度执行。一个Task可以看作“轻量级线程”,它允许多个协程并发运行。
与直接await协程不同,Task的创建不会阻塞当前协程,因此可以同时启动多个任务。
async def main(): # 同时创建两个任务,它们会并发执行 task1 = asyncio.create_task(fetch_data("https://api.example.com/1")) task2 = asyncio.create_task(fetch_data("https://api.example.com/2")) # 此时两个任务已经在后台运行,我们可以做其他事情 print("任务已启动,等待完成...") result1 = await task1 result2 = await task2 print(result1, result2)1.4 Future
Future是一种低层级的可等待对象,代表一个异步操作的最终结果。Task是Future的子类。通常我们较少直接使用Future,更多通过Task或高层API操作。
二、实战示例:并发下载器
下面我们实现一个完整的异步文件下载工具,演示并发、限流和异常处理。假设我们要从多个URL下载内容并保存到本地。
import asyncio import aiohttp # 需要安装: pip install aiohttp import time from typing import List # 模拟的下载链接列表 URLS = [ "https://httpbin.org/delay/1", "https://httpbin.org/delay/2", "https://httpbin.org/delay/1", "https://httpbin.org/delay/3", ] async def download_one(session: aiohttp.ClientSession, url: str, sem: asyncio.Semaphore) -> str: """ 下载单个URL的内容,使用信号量限制并发数 """ async with sem: # 限制同时运行的协程数量 print(f"[{time.strftime('%X')}] 开始下载 {url}") try: async with session.get(url, timeout=10) as response: content = await response.text() # 模拟保存操作 await asyncio.sleep(0.5) print(f"[{time.strftime('%X')}] 完成下载 {url},长度 {len(content)}") return content except Exception as e: print(f"下载 {url} 出错: {e}") return "" async def download_all(urls: List[str], concurrency: int = 2): """ 并发下载所有URL,concurrency为最大并发数 """ sem = asyncio.Semaphore(concurrency) # 信号量控制并发 async with aiohttp.ClientSession() as session: tasks = [asyncio.create_task(download_one(session, url, sem)) for url in urls] # asyncio.gather等待所有任务完成,可设置return_exceptions=True忽略异常 results = await asyncio.gather(*tasks, return_exceptions=True) # 过滤掉异常结果 return [r for r in results if isinstance(r, str) and r] async def main(): start = time.time() results = await download_all(URLS, concurrency=2) elapsed = time.time() - start print(f"下载完成,共 {len(results)} 个文件,耗时 {elapsed:.2f} 秒") if __name__ == "__main__": asyncio.run(main())代码说明:
- 使用
aiohttp实现真正的异步HTTP请求。 asyncio.Semaphore限制最大并发连接数,避免对服务器造成过大压力。asyncio.gather并发运行多个任务并收集结果,return_exceptions=True确保单个任务失败不影响其他任务。- 结果中过滤掉了因异常返回的非字符串内容,保证后续处理安全。
运行该程序,你会发现总耗时接近于最长单个下载时间加上少量开销,而不是所有任务时间之和,体现了并发的优势。
三、常见问题与注意事项
3.1 避免在协程中使用同步阻塞代码
在协程中调用time.sleep()、requests.get()等同步阻塞函数会冻结整个事件循环,导致其他任务无法执行。必须使用异步版本,如asyncio.sleep()、aiohttp等。
错误示例:
async def bad_sleep(): time.sleep(5) # 阻塞事件循环!正确做法:
async def good_sleep(): await asyncio.sleep(5)如果必须调用同步函数,可用loop.run_in_executor()将阻塞操作放到线程池中执行,但需谨慎管理资源。
3.2 正确取消任务
当不再需要某个任务或程序关闭时,应取消任务以避免资源泄漏。可通过task.cancel()请求取消,并在协程内捕获asyncio.CancelledError进行清理。
async def worker(): try: while True: print("工作中...") await asyncio.sleep(1) except asyncio.CancelledError: print("任务被取消,执行清理") raise # 重新抛出是推荐做法 async def main(): task = asyncio.create_task(worker()) await asyncio.sleep(3) task.cancel() try: await task except asyncio.CancelledError: print("main: 任务已被取消")3.3 避免忘记await
创建Task后如果忘记await,程序可能提前退出,导致任务未完成。在asyncio.run()结束后,所有未完成的任务会被取消。确保主要逻辑都等待了必要的任务。
3.4 异常传播
当使用asyncio.gather()且不设置return_exceptions=True时,一旦某个任务抛出异常,gather会立即将该异常传播,但其他任务仍在后台运行。为获取全部结果,建议设置return_exceptions=True,然后手动检查结果类型。
3.5 线程安全与共享状态
由于异步代码默认在单线程的事件循环中运行,简单操作一般不存在竞态条件。但如果使用run_in_executor()与多线程交互,或使用了非异步安全的第三方库,仍需考虑线程安全问题。建议通过asyncio.Queue等同步原语进行协程间通信。
四、高级技巧:超时控制与优雅关闭
在实际应用中,我们需要为异步操作设置超时,并实现优雅的程序退出。
4.1 使用asyncio.wait_for设置超时
async def fetch_with_timeout(session, url, timeout=5): try: # 如果超过timeout秒未完成,将引发TimeoutError async with session.get(url) as resp: return await resp.text() except asyncio.TimeoutError: print(f"请求 {url} 超时") return ""封装时可以直接将获取响应的协程包在wait_for里:
async def download_with_timeout(session, url): try: async with session.get(url) as resp: content = await asyncio.wait_for(resp.text(), timeout=5) return content except asyncio.TimeoutError: print(f"读取响应超时: {url}") return ""4.2 优雅关闭:使用信号处理
对于长期运行的服务,我们需要捕获系统信号(如SIGINT),优雅地取消正在运行的任务。
import signal async def shutdown(loop, signal=None): """取消所有任务,停止事件循环""" if signal: print(f"收到信号 {signal.name},正在关闭...") tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] for task in tasks: task.cancel() await asyncio.gather(*tasks, return_exceptions=True) loop.stop() async def main(): loop = asyncio.get_event_loop() # 注册信号处理 for sig in (signal.SIGINT, signal.SIGTERM): loop.add_signal_handler(sig, lambda s=sig: asyncio.create_task(shutdown(loop, s))) # 正常业务逻辑 try: await download_all(URLS) except asyncio.CancelledError: pass finally: await shutdown(loop) if __name__ == "__main__": loop = asyncio.new_event_loop() try: asyncio.set_event_loop(loop) loop.run_until_complete(main()) finally: loop.close()这种方式确保程序收到终止信号后,能够取消所有未完成任务,并干净地退出。
总结
本文从事件循环、协程、任务等基础概念出发,通过并发下载实战深入演示了asyncio的核心用法,并探讨了常见陷阱与高级技巧。掌握异步编程的关键在于理解“非阻塞等待”的本质,始终使用异步库进行I/O操作,并合理利用Task、信号量和异常处理机制。
异步编程模型虽然带来了更高的并发能力,但也增加了代码的复杂性。建议在真正遇到I/O密集型瓶颈时引入,并配合适当的监控和测试。希望这篇指南能帮助你在Python异步之路上少走弯路,编写出高效、健壮的异步程序。
下一步学习:你可以深入探索asyncio.Queue实现生产者-消费者模式、学习uvloop加速事件循环,或结合FastAPI等异步框架构建高并发Web服务。