Python案例如何汇总任务结果?

wen python案例 68

本文目录导读:

Python案例如何汇总任务结果?

  1. 基础列表汇总(最常用)
  2. 多线程/多进程任务汇总
  3. 异步任务汇总(适合I/O密集型)
  4. 带进度条和错误处理的高级汇总
  5. 实用统计汇总(数据分析场景)
  6. 选择建议

Python 中汇总任务结果的方法有很多,具体选择取决于你的使用场景。以下是最常见、最实用的几种方案,按从简单到复杂的顺序介绍:

基础列表汇总(最常用)

适合简单的任务结果收集:

# Three stand-in tasks, each returning a fixed value
def task1():
    """Return the fixed result of the first sample task."""
    return 10


def task2():
    """Return the fixed result of the second sample task."""
    return 20


def task3():
    """Return the fixed result of the third sample task."""
    return 30


# Approach 1: collect directly into a list literal
results = [task1(), task2(), task3()]
print(f"直接收集结果: {results}")

# Approach 2: append results one by one as they are produced
dynamic_results = []
for i in range(5):
    result = i * 2  # simulated task result
    dynamic_results.append(result)
print(f"动态收集结果: {dynamic_results}")

# Approach 3: store results in a dict keyed by task name
task_dict = {
    'task1': task1(),
    'task2': task2(),
    'task3': task3()
}
print(f"字典存储: {task_dict}")

多线程/多进程任务汇总

处理耗时任务时的结果收集:

import concurrent.futures
import time
# Simulated long-running task
def heavy_task(n):
    """Sleep for ``n`` seconds, then return a completion message naming ``n``."""
    time.sleep(n)
    message = f"任务{n}完成"
    return message
# Approach 1: ThreadPoolExecutor
def process_with_threads():
    """Run heavy_task for 0, 1 and 2 on a thread pool and return the results.

    The pool is capped at five workers. Results are gathered with
    ``as_completed``, so they are listed in completion order rather than
    submission order.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as pool:
        # Submit every task up front so they run concurrently
        pending = [pool.submit(heavy_task, delay) for delay in range(3)]
        # Collect each result as soon as its future finishes
        return [
            done.result()
            for done in concurrent.futures.as_completed(pending)
        ]
# Approach 2: ProcessPoolExecutor (suited to CPU-bound work)
def process_with_processes():
    """Run heavy_task for 0, 1 and 2 on a process pool and return the results.

    ``Executor.map`` yields results in the order of its inputs, so the
    returned list follows ``range(3)``.
    """
    with concurrent.futures.ProcessPoolExecutor() as pool:
        return list(pool.map(heavy_task, range(3)))
# Usage example
# The __main__ guard is required for the process pool: worker processes
# started with the "spawn" method re-import this module, and without the
# guard each worker would re-run these calls while being imported.
if __name__ == "__main__":
    thread_results = process_with_threads()
    print(f"线程结果: {thread_results}")
    process_results = process_with_processes()
    print(f"进程结果: {process_results}")

异步任务汇总(适合I/O密集型)

使用 asyncio 实现高效并发:

import asyncio
import aiohttp
async def fetch_url(session, url):
    """Send a GET request for ``url`` via ``session`` and return the body text."""
    async with session.get(url) as resp:
        body = await resp.text()
        return body
async def gather_async_results():
    """Fetch three fixed URLs concurrently and return the gathered results.

    All requests share one ``aiohttp.ClientSession``. Because
    ``return_exceptions=True`` is passed to ``asyncio.gather``, a failed
    request shows up as an exception object in the returned list instead
    of being raised.
    """
    urls = [
        'http://example.com',
        'http://httpbin.org/get',
        'http://httpbin.org/status/200'
    ]
    async with aiohttp.ClientSession() as session:
        # Option 1: asyncio.gather
        pending = (fetch_url(session, target) for target in urls)
        return await asyncio.gather(*pending, return_exceptions=True)
        # Option 2: asyncio.TaskGroup (Python 3.11+)
        # async with asyncio.TaskGroup() as tg:
        #     tasks = [tg.create_task(fetch_url(session, url)) for url in urls]
        # results = [task.result() for task in tasks]
# Run the coroutine to completion with asyncio.run and keep what it returns
results = asyncio.run(gather_async_results())

带进度条和错误处理的高级汇总

适合生产环境:

import logging
from dataclasses import dataclass
from typing import Any, Callable, List, Optional, Tuple

from tqdm import tqdm
@dataclass
class TaskResult:
    """Outcome of one task run.

    Attributes:
        task_id: Identifier of the task this result belongs to.
        status: Either 'success' or 'failed'.
        data: Payload stored for the task; defaults to None.
        error: Error text stored for the task; defaults to None.
    """
    task_id: str
    status: str  # 'success' or 'failed'
    data: Any = None
    # Optional[...] because the default is None, which plain ``str`` excludes
    error: Optional[str] = None
def execute_and_collect(
    tasks: List[Tuple[str, Callable[[], Any]]],
) -> List[TaskResult]:
    """Run each task in order and collect one TaskResult per task.

    A tqdm progress bar wraps the loop. An exception raised by a task is
    caught, logged, and recorded as a 'failed' result, so the remaining
    tasks still run.

    Args:
        tasks: [(task_id, task_function), ...]; each function is called
            with no arguments.

    Returns:
        List of TaskResult, one per input task, in input order.
    """
    results = []
    # tqdm wraps the iterable to display progress
    for task_id, task_func in tqdm(tasks, desc="处理任务"):
        # Only the task call sits inside ``try``: a failure while recording
        # a success must not add a second, 'failed' entry for the same task.
        try:
            data = task_func()
        except Exception as e:
            results.append(TaskResult(
                task_id=task_id,
                status='failed',
                error=str(e)
            ))
            logging.error("任务 %s 失败: %s", task_id, e)
        else:
            results.append(TaskResult(
                task_id=task_id,
                status='success',
                data=data
            ))
            logging.info("任务 %s 成功完成", task_id)
    return results
# Usage example: two tasks that return data and one that raises
def sample_task1():
    """Return the first sample payload."""
    return "数据1"


def sample_task2():
    """Always raise ValueError to exercise the failure path."""
    raise ValueError("模拟错误")


def sample_task3():
    """Return the third sample payload."""
    return "数据3"


tasks = list(zip(
    ("task1", "task2", "task3"),
    (sample_task1, sample_task2, sample_task3),
))
final_results = execute_and_collect(tasks)

# Summarise the collected results
succeeded = [entry for entry in final_results if entry.status == 'success']
success_count = len(succeeded)
failed_count = sum(entry.status == 'failed' for entry in final_results)
print(f"成功: {success_count}, 失败: {failed_count}")
print(f"汇总数据: {[entry.data for entry in succeeded]}")

实用统计汇总(数据分析场景)

import pandas as pd
from collections import Counter
def summary_statistics(results):
    """Compute descriptive statistics for a collection of task results.

    Args:
        results: Task results; the code treats them as a flat list of numbers.

    Returns:
        An empty dict when ``results`` is empty. Otherwise a dict holding
        the count, mean, median, standard deviation, minimum, maximum and
        the 0.25/0.5/0.75 quantiles (as a nested dict keyed by quantile).
    """
    if not results:
        return {}
    # Load into a one-column frame and work on that column throughout
    values = pd.DataFrame(results, columns=['value'])['value']
    quartiles = values.quantile([0.25, 0.5, 0.75])
    return {
        '总数': len(results),
        '平均值': values.mean(),
        '中位数': values.median(),
        '标准差': values.std(),
        '最小值': values.min(),
        '最大值': values.max(),
        '四分位数': quartiles.to_dict(),
    }
# Usage example: print every statistic on its own line
test_results = [10, 20, 15, 25, 30, 20, 15, 35]
stats = summary_statistics(test_results)
for label, figure in stats.items():
    print(f"{label}: {figure}")

选择建议

| 场景 | 推荐方法 |
| --- | --- |
| 简单顺序任务 | list.append() 或列表推导式 |
| 并发I/O任务 | asyncio.gather() |
| CPU密集型并行 | ProcessPoolExecutor.map() |
| 需要进度和错误处理 | 自定义 TaskResult 类 + tqdm |
| 数据分析汇总 | pandas DataFrame 统计 |

根据你的具体需求(任务数量、类型、是否并行、是否需要进度条)选择最适合的方法。

抱歉,评论功能暂时关闭!