并发任务控制脚本

wen 实用脚本 44

本文目录导读:

并发任务控制脚本

  1. 基础版本 - 使用线程池
  2. 高级版本 - 自定义信号量控制
  3. 简单实用的装饰器版本
  4. 使用建议

我来为你创建一个并发任务控制脚本的示例,这个脚本可以限制同时运行的任务数量,避免系统资源被耗尽。

基础版本 - 使用线程池

import threading
import time
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Callable, Any
def worker_task(task_id: int, delay: float = 2.0) -> dict:
    """Simulate a slow job by sleeping for ``delay`` seconds.

    Prints a start message and a finish message around the sleep.

    Args:
        task_id: Identifier echoed in the messages and in the result.
        delay: Number of seconds to sleep.

    Returns:
        A dict with the keys ``task_id``, ``status`` (always
        ``"completed"``) and ``execution_time`` (measured wall-clock
        seconds, rounded to two decimals).
    """
    began = time.time()
    print(f"[任务 {task_id}] 开始执行,预计耗时 {delay} 秒")
    time.sleep(delay)
    # Measure once so the printed and the returned duration always agree.
    elapsed = round(time.time() - began, 2)
    print(f"[任务 {task_id}] 完成,耗时 {elapsed} 秒")
    return {
        "task_id": task_id,
        "status": "completed",
        "execution_time": elapsed,
    }
def run_with_concurrency(tasks: List[Any], max_workers: int = 3) -> List[dict]:
    """Run ``worker_task`` once per entry of ``tasks`` on a bounded thread pool.

    Args:
        tasks: Task ids; each one is handed to ``worker_task`` as ``task_id``
            together with a delay drawn from ``random.uniform(1, 5)``.
        max_workers: Upper bound on the number of pool threads.

    Returns:
        One dict per task, in completion order. A task that succeeded
        contributes the dict returned by ``worker_task``; a task that raised
        contributes a dict with ``status`` set to ``"failed"`` and the
        exception text under ``error``.
    """
    collected = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Submit everything up front; the pool itself enforces the limit.
        pending = {}
        for task_id in tasks:
            job = pool.submit(
                worker_task, task_id=task_id, delay=random.uniform(1, 5)
            )
            pending[job] = task_id

        # Harvest each future as soon as it finishes.
        for job in as_completed(pending):
            task_id = pending[job]
            try:
                collected.append(job.result())
            except Exception as exc:
                print(f"[任务 {task_id}] 执行失败: {exc}")
                collected.append(
                    {"task_id": task_id, "status": "failed", "error": str(exc)}
                )
    return collected
# 使用示例
if __name__ == "__main__":
    # Ten tasks (ids 1..10) with at most three running at the same time.
    max_workers = 3
    tasks = list(range(1, 11))
    print(f"开始执行 {len(tasks)} 个任务,最大并发数: {max_workers}")
    start = time.time()
    results = run_with_concurrency(tasks, max_workers)
    elapsed = round(time.time() - start, 2)
    print(f"\n所有任务完成,总耗时: {elapsed} 秒")
    print("结果统计:")
    # Split the results by status in a single pass.
    successful, failed = [], []
    for outcome in results:
        if outcome["status"] == "completed":
            successful.append(outcome)
        elif outcome["status"] == "failed":
            failed.append(outcome)
    print(f"成功: {len(successful)}, 失败: {len(failed)}")

高级版本 - 自定义信号量控制

import threading
import time
import random
from typing import Callable, Any, List
from dataclasses import dataclass
from enum import Enum
class TaskStatus(Enum):
    """Lifecycle states stored in the ``status`` field of a ``TaskResult``."""
    # Initial state given to a freshly created result record.
    PENDING = "pending"
    # Set once the task has acquired the semaphore and is about to run.
    RUNNING = "running"
    # Set after the task callable returned without raising.
    COMPLETED = "completed"
    # Set when the task callable raised an exception.
    FAILED = "failed"
@dataclass
class TaskResult:
    """Record describing the outcome of one task run by the controller."""
    # Caller-supplied identifier of the task.
    task_id: int
    # Current lifecycle state of the task.
    status: TaskStatus
    # time.time() value captured when the task started running; 0 until then.
    start_time: float = 0
    # time.time() value captured when the task completed or failed; 0 until then.
    end_time: float = 0
    # Return value of the task callable on success.
    result: Any = None
    # str() of the exception raised by the task; empty when none was recorded.
    error: str = ""
class ConcurrencyController:
    """Run callables on threads while capping how many execute at once.

    One thread is started per submitted task; a semaphore sized
    ``max_concurrent`` gates entry to the task body. Every finished task is
    recorded as a ``TaskResult`` in ``self.results``.
    """

    def __init__(self, max_concurrent: int = 5):
        """Create a controller.

        Args:
            max_concurrent: Maximum number of task bodies allowed to run at
                the same time. Must be at least 1.

        Raises:
            ValueError: If ``max_concurrent`` is smaller than 1.
        """
        if max_concurrent < 1:
            # A zero-sized semaphore would block every task forever.
            raise ValueError("max_concurrent must be at least 1")
        self.max_concurrent = max_concurrent
        self.semaphore = threading.Semaphore(max_concurrent)
        self.results: List[TaskResult] = []
        # Guards both ``self.results`` and ``self._active_count``.
        self.lock = threading.Lock()
        # Number of tasks currently inside the semaphore-protected section.
        self._active_count = 0

    def execute_task(self, task_func: Callable, task_id: int, *args, **kwargs) -> TaskResult:
        """Run one task under the concurrency limit and record its outcome.

        Args:
            task_func: Callable to execute.
            task_id: Identifier stored on the result and used in messages.
            *args: Positional arguments forwarded to ``task_func``.
            **kwargs: Keyword arguments forwarded to ``task_func``.

        Returns:
            The ``TaskResult`` for this run; it is also appended to
            ``self.results``. Exceptions derived from ``Exception`` are
            captured on the result (status ``FAILED``) instead of propagating.
        """
        result = TaskResult(task_id=task_id, status=TaskStatus.PENDING)
        # Block here until one of the ``max_concurrent`` slots is free.
        with self.semaphore:
            with self.lock:
                self._active_count += 1
                # Snapshot while holding the lock; calling get_active_count()
                # here would try to re-acquire the non-reentrant lock.
                active_now = self._active_count
            result.status = TaskStatus.RUNNING
            result.start_time = time.time()
            print(f"[任务 {task_id}] 开始执行 (当前活跃任务数: {active_now})")
            try:
                task_result = task_func(*args, **kwargs)
            except Exception as e:
                result.status = TaskStatus.FAILED
                result.end_time = time.time()
                result.error = str(e)
                print(f"[任务 {task_id}] 失败: {e}")
            else:
                result.status = TaskStatus.COMPLETED
                result.end_time = time.time()
                result.result = task_result
                print(f"[任务 {task_id}] 完成,耗时: {round(result.end_time - result.start_time, 2)} 秒")
            finally:
                # Always give the slot back in the bookkeeping as well.
                with self.lock:
                    self._active_count -= 1
        # Append under the lock because several worker threads finish
        # concurrently.
        with self.lock:
            self.results.append(result)
        return result

    def get_active_count(self) -> int:
        """Return how many tasks are executing right now.

        The count is kept in a dedicated counter: ``self.results`` only ever
        holds finished tasks, so scanning it for ``RUNNING`` entries would
        always yield 0.
        """
        with self.lock:
            return self._active_count

    def run_tasks(self, tasks: List[tuple], max_retries: int = 0):
        """Run a batch of tasks, optionally retrying the ones that failed.

        Args:
            tasks: Task specs, each a tuple ``(task_func, task_id, args,
                kwargs)``. Task ids are expected to be unique within a batch.
            max_retries: How many additional rounds failed tasks get. Each
                retry re-runs the original callable with its original
                arguments.
        """
        batch = list(tasks)
        threads = []
        for task_func, task_id, args, kwargs in batch:
            thread = threading.Thread(
                target=self.execute_task,
                args=(task_func, task_id, *args),
                kwargs=kwargs,
            )
            thread.start()
            threads.append(thread)
        # Wait for the whole batch before looking at the outcomes.
        for thread in threads:
            thread.join()

        if max_retries <= 0:
            return

        # Map ids back to their specs so a retry uses the task's own callable
        # and arguments (the last spec wins should an id repeat).
        spec_by_id = {spec[1]: spec for spec in batch}
        with self.lock:
            retry_ids = {
                r.task_id
                for r in self.results
                if r.status == TaskStatus.FAILED and r.task_id in spec_by_id
            }
            if retry_ids:
                # Drop the failed records that are about to be replaced by
                # the retry; failures from unrelated batches are kept.
                self.results = [
                    r
                    for r in self.results
                    if not (r.status == TaskStatus.FAILED and r.task_id in retry_ids)
                ]
        if not retry_ids:
            return

        print(f"\n有 {len(retry_ids)} 个任务失败,开始重试 (剩余重试次数: {max_retries})")
        retry_tasks = [spec for tid, spec in spec_by_id.items() if tid in retry_ids]
        self.run_tasks(retry_tasks, max_retries - 1)
# 使用示例
def worker_func(task_id: int, delay: float = 2.0) -> dict:
    """Sleep for ``delay`` seconds, then report what was processed.

    Returns:
        A dict with ``processed`` set to ``task_id`` and ``delay`` set to
        the delay that was used.
    """
    time.sleep(delay)
    summary = dict(processed=task_id, delay=delay)
    return summary
if __name__ == "__main__":
    # Controller that lets at most three tasks run at once.
    controller = ConcurrencyController(max_concurrent=3)
    # Ten task specs, each with its own random delay between 1 and 3 seconds.
    tasks = [
        (worker_func, i, (random.uniform(1, 3),), {})
        for i in range(1, 11)
    ]
    print("开始执行并发任务...")
    start = time.time()
    controller.run_tasks(tasks, max_retries=1)
    elapsed = round(time.time() - start, 2)
    print(f"\n所有任务完成,总耗时: {elapsed} 秒")
    # Summarise the outcomes.
    successful = [r for r in controller.results if r.status == TaskStatus.COMPLETED]
    failed = [r for r in controller.results if r.status == TaskStatus.FAILED]
    print(f"成功: {len(successful)}, 失败: {len(failed)}")
    for result in controller.results:
        if result.status != TaskStatus.COMPLETED:
            print(f"  任务 {result.task_id}: 失败 - {result.error}")
            continue
        runtime = round(result.end_time - result.start_time, 2)
        print(f"  任务 {result.task_id}: 成功 (用时 {runtime} 秒)")

简单实用的装饰器版本

import threading
import time
from functools import wraps
from typing import Callable, Any
def concurrent(max_workers: int = 5):
    """Build a decorator that limits simultaneous calls of a function.

    The semaphore is created once per ``concurrent(...)`` call, so every
    function wrapped by the same returned decorator shares the limit.

    Args:
        max_workers: Number of calls allowed inside the wrapped function at
            the same time; further callers block until a slot is released.
    """
    slots = threading.Semaphore(max_workers)

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            slots.acquire()
            try:
                return func(*args, **kwargs)
            finally:
                # Release even when ``func`` raises so the slot is not lost.
                slots.release()

        return wrapper

    return decorator
# 使用装饰器
@concurrent(max_workers=3)
def limited_task(task_id: int):
    """Demo task: announce the start, sleep two seconds, announce completion.

    Returns:
        A result string that ends with ``task_id``.
    """
    print(f"任务 {task_id} 开始")
    time.sleep(2)  # stand-in for real work
    print(f"任务 {task_id} 完成")
    outcome = f"结果-{task_id}"
    return outcome
# 批量执行
def run_limited_tasks(task_count: int = 10):
    """Start ``task_count`` threads running ``limited_task`` and wait for all.

    Each thread receives its index (0 .. ``task_count`` - 1) as the task id.
    """
    workers = [
        threading.Thread(target=limited_task, args=(index,))
        for index in range(task_count)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
if __name__ == "__main__":
    # Time a run of ten rate-limited tasks.
    start = time.time()
    run_limited_tasks(10)
    elapsed = round(time.time() - start, 2)
    print(f"总耗时: {elapsed} 秒")

使用建议

  1. 选择合适的并发数:根据系统资源(CPU核心数、内存)和任务类型调整
  2. 错误处理:实现重试机制和错误日志记录
  3. 监控和日志:记录任务执行状态和性能指标
  4. 资源清理:确保所有任务完成后释放资源
  5. 任务优先级:可扩展支持优先级队列

这个控制器可以用于:

  • Web爬虫的请求限制
  • API调用的并发控制
  • 批量数据处理
  • 文件上传下载
  • 数据库连接池管理

请根据需要选择合适的版本:基础版本适合简单场景,高级版本提供了更多控制功能,装饰器版本适合为已有函数快速加上并发限制。

抱歉,评论功能暂时关闭!