Python案例如何实现异步任务?

wen python案例 11

本文目录导读:

Python案例如何实现异步任务?

  1. 使用 asyncio 创建异步任务
  2. 使用 asyncio.Queue 实现任务队列
  3. 使用 aiohttp 实现异步HTTP请求
  4. 异步任务的实际应用案例
  5. 异步任务错误处理
  6. 高级用法:异步上下文管理器
  7. 性能对比:同步 vs 异步
  8. 最佳实践建议

我来详细介绍Python中实现异步任务的几种主要方式:

使用 asyncio 创建异步任务

基础示例:并发执行多个任务

import asyncio
import time
async def fetch_data(url, delay):
    """模拟异步获取数据"""
    print(f"开始获取数据: {url}")
    await asyncio.sleep(delay)  # 模拟网络延迟
    print(f"完成获取数据: {url}")
    return f"数据来自 {url}"
async def main():
    # 创建多个异步任务
    task1 = asyncio.create_task(fetch_data("api.example.com/user", 2))
    task2 = asyncio.create_task(fetch_data("api.example.com/posts", 3))
    task3 = asyncio.create_task(fetch_data("api.example.com/comments", 1))
    # 等待所有任务完成
    results = await asyncio.gather(task1, task2, task3)
    for result in results:
        print(f"结果: {result}")
# 运行
asyncio.run(main())

使用 asyncio.gather()asyncio.wait()

import asyncio
import random
async def download_file(file_id, size):
    """模拟下载文件"""
    print(f"开始下载文件 {file_id} (大小: {size}MB)")
    await asyncio.sleep(size / 10)  # 模拟下载速度
    print(f"完成下载文件 {file_id}")
    return {"file_id": file_id, "size": size}
async def process_files():
    # 创建多个下载任务
    files = [
        (1, 50),
        (2, 30),
        (3, 80),
        (4, 20)
    ]
    tasks = [download_file(fid, size) for fid, size in files]
    # 方法1: 使用 gather - 等待所有任务完成
    print("=== 使用 gather ===")
    results = await asyncio.gather(*tasks)
    print(f"所有文件下载完成: {results}")
    # 方法2: 使用 wait - 可以设置超时或等待特定任务
    print("\n=== 使用 wait ===")
    tasks = [download_file(fid, size) for fid, size in files]
    done, pending = await asyncio.wait(
        tasks, 
        timeout=5,  # 超时5秒
        return_when=asyncio.FIRST_COMPLETED  # 第一个完成就返回
    )
    print(f"完成的任务: {len(done)}")
    print(f"未完成的任务: {len(pending)}")
    # 取消未完成的任务
    for task in pending:
        task.cancel()
asyncio.run(process_files())

使用 asyncio.Queue 实现任务队列

import asyncio
import random
async def worker(worker_id, queue):
    """工作线程"""
    while True:
        # 从队列获取任务
        task = await queue.get()
        if task is None:
            break
        url, delay = task
        print(f"Worker-{worker_id}: 处理 {url} (需要{delay}秒)")
        await asyncio.sleep(delay)
        print(f"Worker-{worker_id}: 完成 {url}")
        queue.task_done()
    print(f"Worker-{worker_id}: 结束工作")
async def main():
    # 创建任务队列
    queue = asyncio.Queue()
    # 添加任务
    tasks_data = [
        ("api.example.com/data1", 2),
        ("api.example.com/data2", 1),
        ("api.example.com/data3", 3),
        ("api.example.com/data4", 1.5),
        ("api.example.com/data5", 2.5),
    ]
    for task in tasks_data:
        await queue.put(task)
    # 创建3个worker
    workers = [
        asyncio.create_task(worker(i, queue))
        for i in range(3)
    ]
    # 添加停止信号
    for _ in range(3):
        await queue.put(None)
    # 等待所有worker完成
    await asyncio.gather(*workers)
asyncio.run(main())

使用 aiohttp 实现异步HTTP请求

import asyncio
import aiohttp
async def fetch_url(session, url):
    """异步获取URL内容"""
    try:
        async with session.get(url) as response:
            print(f"请求: {url} -> 状态码: {response.status}")
            return await response.text()
    except Exception as e:
        print(f"请求失败 {url}: {e}")
        return None
async def download_multiple_urls():
    urls = [
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/3",
        "https://httpbin.org/get"
    ]
    async with aiohttp.ClientSession() as session:
        # 创建所有请求任务
        tasks = [fetch_url(session, url) for url in urls]
        # 并发执行所有请求
        results = await asyncio.gather(*tasks)
        for url, content in zip(urls, results):
            if content:
                print(f"{url}: 获取到内容长度 {len(content)}")
    print("所有请求完成!")
asyncio.run(download_multiple_urls())

异步任务的实际应用案例

批量数据处理系统

import asyncio
import random
import time
class AsyncDataProcessor:
    """异步数据处理系统"""
    def __init__(self, max_concurrent=5):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.results = []
    async def process_item(self, item_id):
        """处理单个数据项"""
        async with self.semaphore:  # 限制并发数
            print(f"开始处理项目 {item_id}")
            # 模拟处理时间
            processing_time = random.uniform(1, 3)
            await asyncio.sleep(processing_time)
            result = {
                "id": item_id,
                "processed_at": time.time(),
                "data": f"处理结果-{item_id}"
            }
            print(f"完成处理项目 {item_id} (耗时: {processing_time:.2f}s)")
            return result
    async def process_batch(self, item_ids):
        """批量处理数据"""
        tasks = [self.process_item(item_id) for item_id in item_ids]
        # 使用 gather 收集所有结果
        results = await asyncio.gather(*tasks)
        return results
    async def progressive_processing(self, item_ids):
        """渐进式处理(边处理边获取结果)"""
        for coro in asyncio.as_completed(
            [self.process_item(item_id) for item_id in item_ids]
        ):
            result = await coro
            print(f"获取到结果: {result['id']}")
            self.results.append(result)
async def main():
    processor = AsyncDataProcessor(max_concurrent=3)
    # 准备数据
    items = list(range(1, 11))  # 10个数据项
    print("=== 批量处理模式 ===")
    start_time = time.time()
    results = await processor.process_batch(items[:5])
    print(f"批量处理完成,耗时: {time.time() - start_time:.2f}s")
    print(f"结果: {len(results)} 项")
    print("\n=== 渐进式处理模式 ===")
    start_time = time.time()
    await processor.progressive_processing(items[5:])
    print(f"渐进式处理完成,耗时: {time.time() - start_time:.2f}s")
    print(f"结果: {len(processor.results)} 项")
asyncio.run(main())

异步任务错误处理

import asyncio
import random
async def risky_task(task_id):
    """可能失败的任务"""
    delay = random.uniform(1, 3)
    await asyncio.sleep(delay)
    if random.random() < 0.3:  # 30%概率失败
        raise ValueError(f"任务 {task_id} 失败!")
    return f"任务 {task_id} 成功完成"
async def safe_executor():
    """安全的异步执行器"""
    tasks = [
        risky_task(1),
        risky_task(2),
        risky_task(3),
        risky_task(4),
        risky_task(5)
    ]
    # 使用 gather 的 return_exceptions 参数
    results = await asyncio.gather(*tasks, return_exceptions=True)
    success_count = 0
    fail_count = 0
    for i, result in enumerate(results, 1):
        if isinstance(result, Exception):
            print(f"任务 {i} 失败: {result}")
            fail_count += 1
        else:
            print(f"任务 {i} 成功: {result}")
            success_count += 1
    print(f"\n总计: 成功 {success_count}, 失败 {fail_count}")
async def controlled_execution():
    """带控制的任务执行"""
    tasks = [
        risky_task(1),
        risky_task(2),
        risky_task(3)
    ]
    # 使用 wait 和超时控制
    done, pending = await asyncio.wait(
        tasks,
        timeout=2.5,  # 2.5秒超时
        return_when=asyncio.ALL_COMPLETED
    )
    # 处理完成的任务
    for task in done:
        try:
            result = task.result()
            print(f"任务完成: {result}")
        except Exception as e:
            print(f"任务异常: {e}")
    # 取消超时的任务
    for task in pending:
        task.cancel()
        print(f"任务因超时被取消")
asyncio.run(safe_executor())

高级用法:异步上下文管理器

import asyncio
class AsyncResource:
    """异步资源管理器"""
    async def __aenter__(self):
        print("初始化异步资源...")
        await asyncio.sleep(1)
        self.resource = "异步资源"
        return self
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("清理异步资源...")
        await asyncio.sleep(0.5)
        print("资源清理完成")
    async def use(self):
        print(f"使用 {self.resource}")
        await asyncio.sleep(0.5)
        return "操作成功"
async def main():
    # 使用异步上下文管理器
    async with AsyncResource() as resource:
        result = await resource.use()
        print(f"结果: {result}")
    print("操作完成")
asyncio.run(main())

性能对比:同步 vs 异步

import asyncio
import time
# 同步版本
def sync_operations():
    def sync_work(delay, name):
        print(f"同步-{name}: 开始")
        time.sleep(delay)
        print(f"同步-{name}: 完成")
        return f"{name} 结果"
    start = time.time()
    result1 = sync_work(2, "A")
    result2 = sync_work(1, "B")
    result3 = sync_work(3, "C")
    print(f"同步总耗时: {time.time() - start:.2f}秒")
    return [result1, result2, result3]
# 异步版本
async def async_operations():
    async def async_work(delay, name):
        print(f"异步-{name}: 开始")
        await asyncio.sleep(delay)
        print(f"异步-{name}: 完成")
        return f"{name} 结果"
    start = time.time()
    # 同时启动所有任务
    results = await asyncio.gather(
        async_work(2, "A"),
        async_work(1, "B"),
        async_work(3, "C")
    )
    print(f"异步总耗时: {time.time() - start:.2f}秒")
    return results
def compare():
    print("=== 同步执行 ===")
    sync_results = sync_operations()
    print("\n=== 异步执行 ===")
    async_results = asyncio.run(async_operations())
    print(f"\n同步耗时: 约6秒 (2+1+3)")
    print(f"异步耗时: 约3秒 (取最大值 3)")
if __name__ == "__main__":
    compare()

最佳实践建议

  1. 合理使用并发限制:使用 asyncio.Semaphore 控制并发数
  2. 适当的错误处理:使用 return_exceptions=True 避免单个任务失败影响整体
  3. 超时控制:使用 asyncio.wait(timeout=...) 避免任务卡死
  4. 优雅关闭:正确处理任务取消和资源清理
  5. 选择合适的异步库:如 aiohttpaiomysqlaioredis

这些示例涵盖了Python异步编程的主要模式和最佳实践,可以根据实际需求选择合适的实现方式。

抱歉,评论功能暂时关闭!