本文目录导读:

我来为你创建一个并发任务控制脚本的示例,这个脚本可以限制同时运行的任务数量,避免系统资源被耗尽。
基础版本 - 使用线程池
import threading
import time
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Callable, Any
def worker_task(task_id: int, delay: float = 2.0) -> dict:
    """Simulate a slow job by sleeping, then report how long it took.

    Args:
        task_id: identifier echoed into the log lines and the result.
        delay: number of seconds to sleep.

    Returns:
        A dict with keys "task_id", "status" (always "completed") and
        "execution_time" (elapsed wall-clock seconds rounded to 2 decimals).
    """
    began = time.time()
    print(f"[任务 {task_id}] 开始执行,预计耗时 {delay} 秒")
    time.sleep(delay)
    elapsed = round(time.time() - began, 2)
    outcome = dict(task_id=task_id, status="completed", execution_time=elapsed)
    print(f"[任务 {task_id}] 完成,耗时 {outcome['execution_time']} 秒")
    return outcome
def run_with_concurrency(tasks: List[Any], max_workers: int = 3) -> List[dict]:
    """Run worker_task once per task id on a bounded thread pool.

    Args:
        tasks: task ids; each one is passed to worker_task as ``task_id``
            together with a random delay drawn from [1, 5] seconds.
        max_workers: maximum number of tasks executing at the same time.

    Returns:
        One dict per task, in completion order (not input order). A task that
        raises is recorded as {"task_id", "status": "failed", "error"}.
    """
    collected: List[dict] = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Submit everything up front; the pool itself enforces the cap.
        pending = {}
        for tid in tasks:
            fut = pool.submit(worker_task, task_id=tid, delay=random.uniform(1, 5))
            pending[fut] = tid

        # Harvest results as each future finishes.
        for fut in as_completed(pending):
            tid = pending[fut]
            try:
                collected.append(fut.result())
            except Exception as exc:
                print(f"[任务 {tid}] 执行失败: {exc}")
                collected.append({"task_id": tid, "status": "failed", "error": str(exc)})
    return collected
# Usage example
if __name__ == "__main__":
    tasks = list(range(1, 11))  # ten task ids
    max_workers = 3  # concurrency cap
    print(f"开始执行 {len(tasks)} 个任务,最大并发数: {max_workers}")

    start = time.time()
    results = run_with_concurrency(tasks, max_workers)
    print(f"\n所有任务完成,总耗时: {round(time.time() - start, 2)} 秒")

    # Split outcomes by status for the summary line.
    print("结果统计:")
    successful, failed = [], []
    for item in results:
        if item["status"] == "completed":
            successful.append(item)
        elif item["status"] == "failed":
            failed.append(item)
    print(f"成功: {len(successful)}, 失败: {len(failed)}")
高级版本 - 自定义信号量控制
import threading
import time
import random
from typing import Callable, Any, List
from dataclasses import dataclass
from enum import Enum
# States recorded on a TaskResult while ConcurrencyController processes a task.
TaskStatus = Enum(
    "TaskStatus",
    [
        ("PENDING", "pending"),  # created, not yet holding a concurrency slot
        ("RUNNING", "running"),  # holding a slot, callable executing
        ("COMPLETED", "completed"),  # callable returned normally
        ("FAILED", "failed"),  # an exception was raised
    ],
    module=__name__,
    qualname="TaskStatus",
)
@dataclass
class TaskResult:
    """Outcome record for a single task executed by ConcurrencyController."""
    task_id: int  # caller-supplied task identifier
    status: TaskStatus  # lifecycle state of the task
    start_time: float = 0  # time.time() when the task started running; stays 0 if it never started
    end_time: float = 0  # time.time() when the task completed or failed
    result: Any = None  # return value of the task callable on success
    error: str = ""  # str() of the raised exception on failure
class ConcurrencyController:
"""并发任务控制器"""
def __init__(self, max_concurrent: int = 5):
self.max_concurrent = max_concurrent
self.semaphore = threading.Semaphore(max_concurrent)
self.results: List[TaskResult] = []
self.lock = threading.Lock()
def execute_task(self, task_func: Callable, task_id: int, *args, **kwargs) -> TaskResult:
"""执行单个任务"""
result = TaskResult(task_id=task_id, status=TaskStatus.PENDING)
try:
# 获取信号量,限制并发
with self.semaphore:
result.status = TaskStatus.RUNNING
result.start_time = time.time()
# 实际执行任务
print(f"[任务 {task_id}] 开始执行 (当前活跃任务数: {self.get_active_count()})")
task_result = task_func(*args, **kwargs)
result.status = TaskStatus.COMPLETED
result.end_time = time.time()
result.result = task_result
print(f"[任务 {task_id}] 完成,耗时: {round(result.end_time - result.start_time, 2)} 秒")
except Exception as e:
result.status = TaskStatus.FAILED
result.end_time = time.time()
result.error = str(e)
print(f"[任务 {task_id}] 失败: {e}")
# 安全地保存结果
with self.lock:
self.results.append(result)
return result
def get_active_count(self) -> int:
"""获取当前活跃任务数"""
active = sum(1 for r in self.results if r.status == TaskStatus.RUNNING)
if not self.results:
return 0
running_current = sum(1 for r in self.results if r.status == TaskStatus.RUNNING)
return running_current
def run_tasks(self, tasks: List[tuple], max_retries: int = 0):
"""
执行任务列表
Args:
tasks: 任务列表,每个元素为 (task_func, task_id, args, kwargs)
max_retries: 失败重试次数
"""
threads = []
for task_func, task_id, args, kwargs in tasks:
thread = threading.Thread(
target=self.execute_task,
args=(task_func, task_id, *args),
kwargs=kwargs
)
thread.start()
threads.append(thread)
# 等待所有线程完成
for thread in threads:
thread.join()
# 处理失败重试
failed_tasks = [r for r in self.results if r.status == TaskStatus.FAILED]
if failed_tasks and max_retries > 0:
print(f"\n有 {len(failed_tasks)} 个任务失败,开始重试 (剩余重试次数: {max_retries})")
self.results = [r for r in self.results if r.status != TaskStatus.FAILED]
retry_tasks = []
for failed in failed_tasks:
retry_tasks.append((worker_func, failed.task_id, (2,), {}))
self.run_tasks(retry_tasks, max_retries - 1)
# Usage example
def worker_func(task_id: int, delay: float = 2.0) -> dict:
    """Sleep for ``delay`` seconds, then echo both inputs back.

    Returns:
        {"processed": task_id, "delay": delay}
    """
    time.sleep(delay)
    return dict(processed=task_id, delay=delay)
if __name__ == "__main__":
    # Controller that allows at most 3 tasks to run concurrently.
    controller = ConcurrencyController(max_concurrent=3)
    # Ten tasks, each sleeping a random 1-3 seconds.
    tasks = [(worker_func, i, (random.uniform(1, 3),), {}) for i in range(1, 11)]

    print("开始执行并发任务...")
    start = time.time()
    controller.run_tasks(tasks, max_retries=1)
    print(f"\n所有任务完成,总耗时: {round(time.time() - start, 2)} 秒")

    # Summarize outcomes.
    successful, failed = [], []
    for item in controller.results:
        if item.status == TaskStatus.COMPLETED:
            successful.append(item)
        elif item.status == TaskStatus.FAILED:
            failed.append(item)
    print(f"成功: {len(successful)}, 失败: {len(failed)}")

    for result in controller.results:
        if result.status != TaskStatus.COMPLETED:
            print(f" 任务 {result.task_id}: 失败 - {result.error}")
            continue
        runtime = round(result.end_time - result.start_time, 2)
        print(f" 任务 {result.task_id}: 成功 (用时 {runtime} 秒)")
简单实用的装饰器版本
import threading
import time
from functools import wraps
from typing import Callable, Any
def concurrent(max_workers: int = 5):
    """Decorator factory that caps how many calls of a function run at once.

    The decorator does not start any threads itself; callers still invoke the
    wrapped function from their own threads. All functions decorated by the
    same ``concurrent(...)`` call share one semaphore, so at most
    ``max_workers`` of those calls execute simultaneously and further callers
    block until a slot frees up.

    Args:
        max_workers: maximum number of simultaneous calls; must be >= 1.

    Returns:
        A decorator that wraps a function with the shared concurrency limit.

    Raises:
        ValueError: if ``max_workers`` is less than 1 (with 0 every call would
            block forever).
    """
    if max_workers < 1:
        raise ValueError(f"max_workers must be >= 1, got {max_workers}")
    semaphore = threading.Semaphore(max_workers)

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            # Hold a slot for the whole call; released even if func raises.
            with semaphore:
                return func(*args, **kwargs)
        return wrapper

    return decorator
# Apply the decorator
@concurrent(max_workers=3)
def limited_task(task_id: int):
    """Demo task guarded by the decorator: at most 3 of these run at a time.

    Sleeps 2 seconds to stand in for real work and returns "结果-<task_id>".
    """
    print(f"任务 {task_id} 开始")
    time.sleep(2)  # placeholder for real work
    print(f"任务 {task_id} 完成")
    return "结果-{}".format(task_id)
# Batch execution
def run_limited_tasks(task_count: int = 10):
    """Start one thread per call limited_task(0 .. task_count-1) and wait for all.

    The concurrency cap is enforced by the decorator on limited_task, not here.
    """
    workers = [threading.Thread(target=limited_task, args=(n,)) for n in range(task_count)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
if __name__ == "__main__":
    t0 = time.time()
    run_limited_tasks(10)
    elapsed = round(time.time() - t0, 2)
    print(f"总耗时: {elapsed} 秒")
使用建议
- 选择合适的并发数:根据系统资源(CPU核心数、内存)和任务类型调整。注意:受 GIL 限制,线程方案只适合 I/O 密集型任务(网络请求、文件读写);CPU 密集型任务应改用 ProcessPoolExecutor 或 multiprocessing
- 错误处理:实现重试机制和错误日志记录
- 监控和日志:记录任务执行状态和性能指标
- 资源清理:确保所有任务完成后释放资源
- 任务优先级:可扩展支持优先级队列
这个控制器可以用于:
- Web爬虫的请求限制
- API调用的并发控制
- 批量数据处理
- 文件上传下载
- 限制同时打开的数据库连接数(可配合连接池使用)
根据需要选择适合的版本:基础版本适合简单场景;高级版本提供了状态跟踪和失败重试等更多控制功能;装饰器版本最简洁,适合为已有函数快速加上并发上限。