本文目录导读:

- 使用 asyncio 创建异步任务
- 使用 asyncio.Queue 实现任务队列
- 使用 aiohttp 实现异步HTTP请求
- 异步任务的实际应用案例
- 异步任务错误处理
- 高级用法:异步上下文管理器
- 性能对比:同步 vs 异步
- 最佳实践建议
我来详细介绍Python中实现异步任务的几种主要方式:
使用 asyncio 创建异步任务
基础示例:并发执行多个任务
import asyncio
import time
async def fetch_data(url, delay):
    """Simulate an asynchronous fetch of ``url``.

    Prints a start message, suspends for ``delay`` seconds via
    ``asyncio.sleep`` (standing in for network latency), prints a completion
    message, and returns a string that names the source URL.
    """
    print(f"开始获取数据: {url}")
    # The sleep yields control to the event loop, letting other tasks run.
    await asyncio.sleep(delay)
    print(f"完成获取数据: {url}")
    payload = f"数据来自 {url}"
    return payload
async def main():
    """Run three simulated fetches concurrently and print each result."""
    # (url, delay-in-seconds) pairs, scheduled in this order.
    requests = (
        ("api.example.com/user", 2),
        ("api.example.com/posts", 3),
        ("api.example.com/comments", 1),
    )
    # create_task schedules each coroutine on the running loop immediately.
    scheduled = [
        asyncio.create_task(fetch_data(url, delay)) for url, delay in requests
    ]
    # gather returns the results in the order the tasks were passed in.
    for outcome in await asyncio.gather(*scheduled):
        print(f"结果: {outcome}")


# Entry point for this example.
asyncio.run(main())
使用 asyncio.gather() 和 asyncio.wait()
import asyncio
import random
async def download_file(file_id, size):
    """Simulate downloading a file of ``size`` MB.

    The simulated transfer lasts ``size / 10`` seconds. Returns a dict holding
    the ``file_id`` and ``size`` that were passed in.
    """
    print(f"开始下载文件 {file_id} (大小: {size}MB)")
    simulated_seconds = size / 10
    await asyncio.sleep(simulated_seconds)
    print(f"完成下载文件 {file_id}")
    return dict(file_id=file_id, size=size)
async def process_files():
    """Demonstrate ``asyncio.gather`` and ``asyncio.wait`` on simulated downloads.

    First downloads every file with ``gather`` and prints the collected
    results. Then starts the same downloads again as tasks, uses ``wait`` to
    return as soon as the first one finishes (or after 5 seconds), reports how
    many finished, and cancels the rest.
    """
    # (file_id, size_in_mb) pairs.
    files = [
        (1, 50),
        (2, 30),
        (3, 80),
        (4, 20),
    ]

    # Method 1: gather waits for every awaitable and returns the results in
    # argument order.
    print("=== 使用 gather ===")
    results = await asyncio.gather(
        *[download_file(fid, size) for fid, size in files]
    )
    print(f"所有文件下载完成: {results}")

    # Method 2: wait supports a timeout and an early-return condition.
    # asyncio.wait must be given Task/Future objects: passing bare coroutines
    # was deprecated in Python 3.8 and raises TypeError from 3.11 on.
    print("\n=== 使用 wait ===")
    tasks = [
        asyncio.create_task(download_file(fid, size)) for fid, size in files
    ]
    done, pending = await asyncio.wait(
        tasks,
        timeout=5,  # give up waiting after 5 seconds
        return_when=asyncio.FIRST_COMPLETED,  # return once any task finishes
    )
    print(f"完成的任务: {len(done)}")
    print(f"未完成的任务: {len(pending)}")

    # wait() does not cancel anything on its own; cancel what is still running.
    for task in pending:
        task.cancel()
    # Await the cancelled tasks so the cancellation is actually delivered
    # before this coroutine returns; return_exceptions keeps the resulting
    # CancelledError from propagating.
    await asyncio.gather(*pending, return_exceptions=True)


asyncio.run(process_files())
使用 asyncio.Queue 实现任务队列
import asyncio
import random
async def worker(worker_id, queue):
    """Consume ``(url, delay)`` jobs from ``queue`` until a ``None`` sentinel.

    This is a coroutine running on the event loop, not a thread. Each job is
    simulated by sleeping for ``delay`` seconds. Every item taken from the
    queue, including the ``None`` stop sentinel and any job whose processing
    raises, is acknowledged with ``queue.task_done()`` so that
    ``queue.join()`` can complete.

    Args:
        worker_id: Identifier used only in the printed progress messages.
        queue: An ``asyncio.Queue`` yielding ``(url, delay)`` tuples or ``None``.
    """
    while True:
        task = await queue.get()
        try:
            if task is None:
                # Stop signal: leave the loop (the finally clause still runs).
                break
            url, delay = task
            print(f"Worker-{worker_id}: 处理 {url} (需要{delay}秒)")
            await asyncio.sleep(delay)
            print(f"Worker-{worker_id}: 完成 {url}")
        finally:
            # Exactly one task_done() per get(), even on sentinel or error.
            queue.task_done()
    print(f"Worker-{worker_id}: 结束工作")
async def main():
    """Feed five simulated jobs through a shared queue drained by three workers."""
    worker_count = 3
    # (url, delay-in-seconds) jobs, enqueued in this order.
    jobs = (
        ("api.example.com/data1", 2),
        ("api.example.com/data2", 1),
        ("api.example.com/data3", 3),
        ("api.example.com/data4", 1.5),
        ("api.example.com/data5", 2.5),
    )

    job_queue = asyncio.Queue()
    for job in jobs:
        await job_queue.put(job)

    # Start the workers as tasks sharing the same queue.
    running = []
    for index in range(worker_count):
        running.append(asyncio.create_task(worker(index, job_queue)))

    # One None sentinel per worker tells each of them to stop.
    for _ in range(worker_count):
        await job_queue.put(None)

    # Block until every worker has seen its sentinel and returned.
    await asyncio.gather(*running)


asyncio.run(main())
使用 aiohttp 实现异步HTTP请求
import asyncio
import aiohttp
async def fetch_url(session, url):
    """Fetch ``url`` through ``session`` and return the response body text.

    ``session.get(url)`` is used as an async context manager; the response's
    ``status`` is printed and its ``text()`` awaited. Any ``Exception`` raised
    along the way is reported with a printed message and results in ``None``.
    """
    body = None
    try:
        async with session.get(url) as resp:
            print(f"请求: {url} -> 状态码: {resp.status}")
            body = await resp.text()
    except Exception as err:
        # Best effort: report the failure and signal it with None.
        print(f"请求失败 {url}: {err}")
        return None
    return body
async def download_multiple_urls():
    """Fetch a fixed list of httpbin URLs concurrently and report body sizes.

    All requests share one ``aiohttp.ClientSession``. URLs whose fetch yielded
    a falsy result (``None`` or an empty body) are skipped in the report.
    """
    targets = (
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/3",
        "https://httpbin.org/get",
    )
    async with aiohttp.ClientSession() as session:
        # Build one coroutine per URL, then run them all concurrently.
        requests = [fetch_url(session, target) for target in targets]
        bodies = await asyncio.gather(*requests)
        # gather preserves argument order, so zip pairs each URL with its body.
        for target, body in zip(targets, bodies):
            if not body:
                continue
            print(f"{target}: 获取到内容长度 {len(body)}")
    print("所有请求完成!")


asyncio.run(download_multiple_urls())
异步任务的实际应用案例
批量数据处理系统
import asyncio
import random
import time
class AsyncDataProcessor:
    """Process items concurrently while capping how many run at once."""

    def __init__(self, max_concurrent=5):
        # Results accumulated by progressive_processing().
        self.results = []
        # Limits how many process_item() bodies may be active simultaneously.
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def process_item(self, item_id):
        """Simulate processing one item and return a result dict.

        The work is a sleep of a random duration between 1 and 3 seconds,
        performed while holding the semaphore. The returned dict has the keys
        ``id``, ``processed_at`` (a ``time.time()`` timestamp) and ``data``.
        """
        async with self.semaphore:
            print(f"开始处理项目 {item_id}")
            duration = random.uniform(1, 3)
            await asyncio.sleep(duration)
            outcome = dict(
                id=item_id,
                processed_at=time.time(),
                data=f"处理结果-{item_id}",
            )
            print(f"完成处理项目 {item_id} (耗时: {duration:.2f}s)")
            return outcome

    async def process_batch(self, item_ids):
        """Process every id in ``item_ids`` and return the results as a list.

        Results are in the same order as ``item_ids``.
        """
        jobs = [self.process_item(item_id) for item_id in item_ids]
        return await asyncio.gather(*jobs)

    async def progressive_processing(self, item_ids):
        """Process ``item_ids`` and record each result as soon as it is ready.

        Results are appended to ``self.results`` in completion order.
        """
        jobs = [self.process_item(item_id) for item_id in item_ids]
        for next_finished in asyncio.as_completed(jobs):
            outcome = await next_finished
            print(f"获取到结果: {outcome['id']}")
            self.results.append(outcome)
async def main():
    """Time the batch and progressive modes of ``AsyncDataProcessor``.

    Ten item ids (1..10) are split in half: the first five go through
    ``process_batch`` and the last five through ``progressive_processing``.
    """
    processor = AsyncDataProcessor(max_concurrent=3)
    item_ids = list(range(1, 11))
    first_half, second_half = item_ids[:5], item_ids[5:]

    print("=== 批量处理模式 ===")
    began = time.time()
    batch_results = await processor.process_batch(first_half)
    elapsed = time.time() - began
    print(f"批量处理完成,耗时: {elapsed:.2f}s")
    print(f"结果: {len(batch_results)} 项")

    print("\n=== 渐进式处理模式 ===")
    began = time.time()
    await processor.progressive_processing(second_half)
    elapsed = time.time() - began
    print(f"渐进式处理完成,耗时: {elapsed:.2f}s")
    print(f"结果: {len(processor.results)} 项")


asyncio.run(main())
异步任务错误处理
import asyncio
import random
async def risky_task(task_id, *, min_delay=1, max_delay=3, failure_rate=0.3):
    """Simulate a task that sleeps for a random time and may fail.

    Args:
        task_id: Identifier embedded in the returned / raised message.
        min_delay: Lower bound, in seconds, of the random sleep (default 1).
        max_delay: Upper bound, in seconds, of the random sleep (default 3).
        failure_rate: Probability in ``[0, 1]`` that the task raises
            (default 0.3). ``0`` never fails, ``1`` always fails.

    Returns:
        A success message string naming ``task_id``.

    Raises:
        ValueError: With probability ``failure_rate``, after the sleep.
    """
    delay = random.uniform(min_delay, max_delay)
    await asyncio.sleep(delay)
    # random.random() is in [0.0, 1.0), so a rate of 1 always triggers.
    if random.random() < failure_rate:
        raise ValueError(f"任务 {task_id} 失败!")
    return f"任务 {task_id} 成功完成"
async def safe_executor():
    """Run five risky tasks concurrently and print a success/failure summary.

    ``gather(..., return_exceptions=True)`` places raised exceptions in the
    result list instead of propagating the first one, so one failing task
    does not hide the outcome of the others.
    """
    tasks = [risky_task(task_id) for task_id in range(1, 6)]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    success_count = 0
    fail_count = 0
    for i, result in enumerate(results, 1):
        # Check BaseException, not Exception: asyncio.CancelledError derives
        # from BaseException since Python 3.8, and a cancelled task must not
        # be reported as a success.
        if isinstance(result, BaseException):
            print(f"任务 {i} 失败: {result}")
            fail_count += 1
        else:
            print(f"任务 {i} 成功: {result}")
            success_count += 1
    print(f"\n总计: 成功 {success_count}, 失败 {fail_count}")
async def controlled_execution():
    """Run three risky tasks with an overall 2.5 second time limit.

    Tasks that finish within the limit have their result or exception
    printed; tasks still running when the limit expires are cancelled.
    """
    # asyncio.wait must be given Task/Future objects: passing bare coroutines
    # was deprecated in Python 3.8 and raises TypeError from 3.11 on.
    tasks = [asyncio.create_task(risky_task(task_id)) for task_id in (1, 2, 3)]

    done, pending = await asyncio.wait(
        tasks,
        timeout=2.5,  # stop waiting after 2.5 seconds
        return_when=asyncio.ALL_COMPLETED,
    )

    # Report every task that finished in time.
    for task in done:
        try:
            result = task.result()  # re-raises the task's exception, if any
        except Exception as e:
            print(f"任务异常: {e}")
        else:
            print(f"任务完成: {result}")

    # wait() does not cancel on timeout; do it explicitly.
    for task in pending:
        task.cancel()
        print("任务因超时被取消")
    # Await the cancelled tasks so the cancellation is delivered before this
    # coroutine returns; return_exceptions absorbs the CancelledError.
    await asyncio.gather(*pending, return_exceptions=True)


asyncio.run(safe_executor())
高级用法:异步上下文管理器
import asyncio
class AsyncResource:
    """Example asynchronous context manager with simulated setup and teardown."""

    async def __aenter__(self):
        """Simulate a one-second initialisation and return this instance."""
        print("初始化异步资源...")
        await asyncio.sleep(1)
        # The attribute only exists once the context has been entered.
        self.resource = "异步资源"
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Simulate a half-second cleanup.

        Returns ``None``, so an exception raised inside the ``async with``
        body is not suppressed.
        """
        print("清理异步资源...")
        await asyncio.sleep(0.5)
        print("资源清理完成")
        return None

    async def use(self):
        """Simulate a half-second operation on the resource and report success."""
        print(f"使用 {self.resource}")
        await asyncio.sleep(0.5)
        status = "操作成功"
        return status
async def main():
    """Acquire an ``AsyncResource``, use it once, and print the outcome."""
    # __aenter__ runs on entry; __aexit__ runs when the block is left.
    async with AsyncResource() as handle:
        outcome = await handle.use()
        print(f"结果: {outcome}")
    print("操作完成")


asyncio.run(main())
性能对比:同步 vs 异步
import asyncio
import time
# Synchronous version
def sync_operations():
    """Run three blocking jobs one after another and return their results.

    Each job blocks in ``time.sleep``, so the total time is the sum of the
    individual delays. The elapsed wall-clock time is printed.
    """

    def sync_work(delay, name):
        """Block for ``delay`` seconds and return a result string for ``name``."""
        print(f"同步-{name}: 开始")
        time.sleep(delay)
        print(f"同步-{name}: 完成")
        return f"{name} 结果"

    # (delay-in-seconds, label) pairs, executed strictly in this order.
    schedule = ((2, "A"), (1, "B"), (3, "C"))
    began = time.time()
    outputs = [sync_work(delay, label) for delay, label in schedule]
    print(f"同步总耗时: {time.time() - began:.2f}秒")
    return outputs
# Asynchronous version
async def async_operations():
    """Run three sleeping jobs concurrently and return their results.

    The jobs are awaited together with ``asyncio.gather``; results come back
    in argument order. The elapsed wall-clock time is printed.
    """

    async def async_work(delay, name):
        """Sleep for ``delay`` seconds without blocking and return a result string."""
        print(f"异步-{name}: 开始")
        await asyncio.sleep(delay)
        print(f"异步-{name}: 完成")
        return f"{name} 结果"

    # (delay-in-seconds, label) pairs, all started together.
    schedule = ((2, "A"), (1, "B"), (3, "C"))
    began = time.time()
    jobs = [async_work(delay, label) for delay, label in schedule]
    results = await asyncio.gather(*jobs)
    print(f"异步总耗时: {time.time() - began:.2f}秒")
    return results
def compare():
    """Run the synchronous demo, then the asynchronous one, and print a summary.

    The two summary lines at the end are fixed text describing the expected
    timings; they are not derived from the measured runs.
    """
    print("=== 同步执行 ===")
    sync_operations()

    print("\n=== 异步执行 ===")
    asyncio.run(async_operations())

    print("\n同步耗时: 约6秒 (2+1+3)")
    print("异步耗时: 约3秒 (取最大值 3)")


if __name__ == "__main__":
    compare()
最佳实践建议
- 合理使用并发限制:使用 asyncio.Semaphore 控制并发数
- 适当的错误处理:使用 return_exceptions=True 避免单个任务失败影响整体
- 超时控制:使用 asyncio.wait(timeout=...) 避免任务卡死(注意:超时后 asyncio.wait 不会自动取消未完成的任务,需要手动取消)
- 优雅关闭:正确处理任务取消和资源清理
- 选择合适的异步库:如 aiohttp、aiomysql、aioredis 等
这些示例涵盖了Python异步编程的主要模式和最佳实践,可以根据实际需求选择合适的实现方式。