本文目录导读:

我来介绍几种在Python中批量运行案例的方法:
使用脚本批量执行
创建批处理脚本
# batch_run.py
import subprocess
import os
import glob
from pathlib import Path
# Method 1: run every Python file in the current directory.
def run_all_python_files():
    """Run each ``*.py`` file in the current working directory, one at a time.

    The runner script itself (``batch_run.py``) is skipped so that it does not
    launch itself. Child output is not captured; it is written wherever this
    process's own stdout/stderr point.
    """
    import sys  # local import: needed only for sys.executable

    # glob() gives no ordering guarantee; sort for a reproducible run order.
    for file in sorted(glob.glob("*.py")):
        if file == "batch_run.py":  # skip the runner itself
            continue
        print(f"\n=== 运行 {file} ===")
        # sys.executable re-launches the interpreter that is running this
        # script; a bare "python" may be absent from PATH or point elsewhere.
        subprocess.run([sys.executable, file])
# Method 2: run an explicit list of files.
def run_specific_files():
    """Run a fixed list of case scripts and echo their captured output.

    Files that do not exist are skipped. For every file that does exist, the
    captured stdout is printed, followed by the captured stderr (prefixed with
    "错误: ") when the child wrote anything to it.
    """
    import sys  # local import: needed only for sys.executable

    files_to_run = [
        "case1.py",
        "case2.py",
        "case3.py",
    ]
    for file in files_to_run:
        if not os.path.exists(file):
            continue
        print(f"\n=== 运行 {file} ===")
        # Use the running interpreter instead of whatever "python" is on PATH.
        result = subprocess.run(
            [sys.executable, file],
            capture_output=True,
            text=True,
        )
        print(result.stdout)
        if result.stderr:
            print(f"错误: {result.stderr}")
# Method 3: run scripts with command-line arguments.
def run_with_args():
    """Run a fixed list of case scripts, each with its own argument list.

    Every entry of ``cases`` pairs a script name with the arguments appended
    to its command line. Child output is not captured.
    """
    import sys  # local import: needed only for sys.executable

    cases = [
        ("case1.py", ["arg1", "arg2"]),
        ("case2.py", ["--input", "data.txt"]),
        ("case3.py", ["--debug"]),
    ]
    for file, args in cases:
        print(f"运行 {file} 参数: {args}")
        # Use the running interpreter instead of whatever "python" is on PATH.
        subprocess.run([sys.executable, file] + args)
# Script entry point: when executed directly, only Method 1 is run.
if __name__ == "__main__":
    run_all_python_files()
使用importlib动态导入并执行
# dynamic_run.py
import importlib.util
import sys
from pathlib import Path
def run_module_dynamically(module_path):
    """Import the Python file at *module_path* and call its ``main()`` if present.

    Args:
        module_path: Path to the source file. The file stem becomes the
            module name under which it is registered in ``sys.modules``.

    Returns:
        The loaded module object.

    Raises:
        ImportError: If no import spec or loader can be created for the path
            (for example, the file does not have a Python source suffix).
        Exception: Whatever the module's top-level code or its ``main()``
            raises is propagated unchanged.
    """
    module_name = Path(module_path).stem

    # Build the module from its file location.
    spec = importlib.util.spec_from_file_location(module_name, module_path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot create an import spec for {module_path!r}")
    module = importlib.util.module_from_spec(spec)

    # Register the module before executing it so that code inside the module
    # can resolve itself through sys.modules while it is being executed.
    missing = object()
    previous = sys.modules.get(module_name, missing)
    sys.modules[module_name] = module
    try:
        spec.loader.exec_module(module)
    except BaseException:
        # Do not leave a half-initialised module behind; put back whatever
        # was registered under this name before.
        if previous is missing:
            sys.modules.pop(module_name, None)
        else:
            sys.modules[module_name] = previous
        raise

    # Call the module's main() entry point when it defines one.
    if hasattr(module, 'main'):
        module.main()
    return module
# Batch run: import and execute every listed case file that exists,
# reporting a failure without stopping the remaining cases.
cases = ["case1.py", "case2.py", "case3.py"]
for case in cases:
    if not Path(case).exists():
        continue
    print(f"\n=== 运行 {case} ===")
    try:
        run_module_dynamically(case)
    except Exception as e:
        print(f"运行失败: {e}")
使用concurrent.futures并行运行(线程池与进程池)
# parallel_run.py
import multiprocessing
import subprocess
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
def run_python_file(file_path):
    """Run a single Python file in a child process and summarise the outcome.

    Args:
        file_path: Path of the script to execute.

    Returns:
        A dict with the keys ``file`` (the given path), ``success`` (True when
        the child exited with code 0), ``output`` (captured stdout) and
        ``error`` (captured stderr).
    """
    import sys  # local import: needed only for sys.executable

    print(f"开始运行: {file_path}")
    # sys.executable re-launches the interpreter that is running this code;
    # a bare "python" may be absent from PATH or point to another install.
    result = subprocess.run(
        [sys.executable, file_path],
        capture_output=True,
        text=True,
    )
    return {
        "file": file_path,
        "success": result.returncode == 0,
        "output": result.stdout,
        "error": result.stderr,
    }
def parallel_run():
    """Run a fixed list of case files concurrently and print each outcome.

    The same list is processed twice: first through a thread pool, then
    through a process pool. Each line printed has the form
    "<file>: 成功" or "<file>: 失败".
    """
    files = ["case1.py", "case2.py", "case3.py", "case4.py"]

    # Pass 1: thread pool. Results are reported in submission order.
    with ThreadPoolExecutor(max_workers=4) as executor:
        pending = [executor.submit(run_python_file, path) for path in files]
        for task in pending:
            outcome = task.result()
            if outcome["success"]:
                status = "成功"
            else:
                status = "失败"
            print(f"{outcome['file']}: {status}")

    # Pass 2: process pool (generally the choice for CPU-bound work).
    # All results are collected first, then reported in input order.
    with ProcessPoolExecutor(max_workers=4) as executor:
        outcomes = list(executor.map(run_python_file, files))
    for outcome in outcomes:
        print(f"{outcome['file']}: {'成功' if outcome['success'] else '失败'}")
# Script entry point: start the parallel demo only when executed directly.
# The guard also keeps the pool's worker processes from re-running the demo
# if they import this module.
if __name__ == "__main__":
    parallel_run()
使用配置文件管理案例
# config_run.py
import json
import yaml # 需要安装: pip install pyyaml
import subprocess
class CaseRunner:
    """Run the case scripts described by a JSON or YAML configuration file.

    The configuration is a mapping with a ``cases`` list. Each entry may carry
    ``file`` (script path), ``args`` (list of command-line arguments),
    ``description`` (label printed before the run; defaults to the file name)
    and ``timeout`` (seconds; defaults to 30).
    """

    def __init__(self, config_file):
        """Load the configuration found at *config_file*."""
        self.cases = self.load_config(config_file)

    def load_config(self, config_file):
        """Parse *config_file* and return its content.

        Args:
            config_file: Path ending in ``.json``, ``.yaml`` or ``.yml``.

        Returns:
            The parsed configuration; an empty dict when the file is empty.

        Raises:
            ValueError: If the file extension is not one of the supported ones.
        """
        # Read as UTF-8 explicitly: descriptions may contain non-ASCII text
        # and the platform default encoding is not UTF-8 everywhere.
        if config_file.endswith('.json'):
            with open(config_file, 'r', encoding='utf-8') as f:
                return json.load(f) or {}
        elif config_file.endswith(('.yaml', '.yml')):
            with open(config_file, 'r', encoding='utf-8') as f:
                # safe_load() returns None for an empty document.
                return yaml.safe_load(f) or {}
        else:
            raise ValueError("不支持的配置文件格式")

    def run_all(self):
        """Run every configured case in order and return the list of results.

        Returns:
            One dict per case with the keys ``file``, ``success``, ``output``
            and ``error``. A case that exceeds its timeout is recorded with
            ``success`` False, empty ``output`` and ``error`` set to
            ``'Timeout'``.
        """
        import sys  # local import: needed only for sys.executable

        results = []
        for case in (self.cases or {}).get('cases') or []:
            file = case.get('file')
            args = list(case.get('args') or [])  # tolerate a missing/null list
            description = case.get('description', file)
            timeout = case.get('timeout', 30)
            print(f"\n=== {description} ===")
            try:
                # Launch with the running interpreter rather than whatever
                # "python" happens to be on PATH.
                result = subprocess.run(
                    [sys.executable, file] + args,
                    capture_output=True,
                    text=True,
                    timeout=timeout,
                )
            except subprocess.TimeoutExpired:
                print("✗ 运行超时")
                # Same keys as a finished run so callers can read every
                # result uniformly.
                results.append({
                    'file': file,
                    'success': False,
                    'output': '',
                    'error': 'Timeout',
                })
                continue

            results.append({
                'file': file,
                'success': result.returncode == 0,
                'output': result.stdout,
                'error': result.stderr,
            })
            if result.returncode == 0:
                print("✓ 运行成功")
            else:
                print(f"✗ 运行失败: {result.stderr}")
        return results
# Usage example: load the case list from cases_config.json and run all of it.
runner = CaseRunner(config_file='cases_config.json')
results = runner.run_all()
配置文件示例 (cases_config.json):
{
"cases": [
{
"file": "case1.py",
"description": "基础案例",
"args": [],
"timeout": 30
},
{
"file": "case2.py",
"description": "高级案例",
"args": ["--input", "data.txt", "--verbose"],
"timeout": 60
},
{
"file": "test_suite.py",
"description": "测试套件",
"args": ["-v"],
"timeout": 120
}
]
}
使用测试框架运行
# test_runner.py
import unittest
import pytest # 需要安装: pip install pytest
# 使用unittest
def run_unittest_tests():
loader = unittest.TestLoader()
suite = unittest.TestSuite()
# 添加测试类
suite.addTests(loader.loadTestsFromModule('test_module'))
suite.addTests(loader.discover('tests', pattern='test_*.py'))
runner = unittest.TextTestRunner(verbosity=2)
runner.run(suite)
# Using pytest.
def run_pytest_tests():
    """Run the whole ``tests/`` suite, then only the tests marked ``slow``.

    Returns:
        The exit code of the first run that did not finish with code 0, or 0
        when both runs finished with code 0.
    """
    # Run all tests.
    all_code = pytest.main(['tests/', '-v'])
    # Run only the tests carrying the "slow" marker.
    slow_code = pytest.main(['tests/', '-m', 'slow'])
    # Report a non-zero code from either run; returning only the second code
    # would hide a failure of the full run.
    return all_code or slow_code
# 使用pytest参数化运行
import pytest
@pytest.mark.parametrize("case_file", [
    "case1.py",
    "case2.py",
    "case3.py",
])
def test_case_running(case_file):
    """Check that *case_file* runs to completion with exit code 0."""
    import subprocess
    import sys

    # Run the case with the interpreter that is running pytest; a bare
    # "python" may resolve to a different installation or be missing.
    result = subprocess.run(
        [sys.executable, case_file],
        capture_output=True,
        text=True,
    )
    assert result.returncode == 0, f"{case_file} 运行失败"
最实用的完整解决方案
# batch_runner.py
import subprocess
import time
from datetime import datetime
from pathlib import Path
import logging
import concurrent.futures
class BatchCaseRunner:
    """Run every ``*.py`` case in a directory, serially or in a thread pool.

    Each outcome is printed and logged, and a summary report is produced at
    the end of a batch.
    """

    def __init__(self, cases_dir="cases", log_file="batch_run.log"):
        """Remember the case directory and log file, then set up logging.

        Args:
            cases_dir: Directory searched (non-recursively) for ``*.py`` files.
            log_file: File name handed to the logging configuration.
        """
        self.cases_dir = Path(cases_dir)
        self.log_file = log_file
        self.setup_logging()

    def setup_logging(self):
        """Configure the root logger to write INFO+ records to ``log_file``.

        NOTE: ``logging.basicConfig`` has no effect when the root logger
        already has handlers, so an earlier configuration wins.
        """
        logging.basicConfig(
            filename=self.log_file,
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
        )

    def run_single_case(self, case_file, timeout=60):
        """Run one case script in a child process.

        Args:
            case_file: Path (``str`` or ``Path``) of the script to run.
            timeout: Seconds after which the child is abandoned.

        Returns:
            A dict with ``file`` (a ``Path``), ``success``, ``duration``
            (seconds, also set when the run timed out or failed to start),
            ``output`` (stdout) and ``error`` (stderr or a failure reason).
        """
        import sys  # local import: needed only for sys.executable

        case_file = Path(case_file)  # accept plain strings as well
        start_time = time.time()
        result = {
            'file': case_file,
            'success': False,
            'duration': 0,
            'output': '',
            'error': '',
        }
        try:
            # Launch with the running interpreter rather than whatever
            # "python" happens to be on PATH.
            process = subprocess.run(
                [sys.executable, str(case_file)],
                capture_output=True,
                text=True,
                timeout=timeout,
            )
        except subprocess.TimeoutExpired:
            result['duration'] = time.time() - start_time
            result['error'] = f"超时 ({timeout}s)"
            logging.warning(f"{case_file.name}: 超时")
        except Exception as e:
            result['duration'] = time.time() - start_time
            result['error'] = str(e)
            logging.error(f"{case_file.name}: {str(e)}")
        else:
            result['duration'] = time.time() - start_time
            result['success'] = process.returncode == 0
            result['output'] = process.stdout
            result['error'] = process.stderr
            status = "成功" if result['success'] else "失败"
            logging.info(f"{case_file.name}: {status} ({result['duration']:.2f}s)")
        return result

    def _discover_cases(self):
        """Return the sorted ``*.py`` files of ``cases_dir``, minus the runner.

        The file defining this class and the script named by ``sys.argv[0]``
        are left out: running either as a case would start a new batch from
        inside the batch.
        """
        import sys

        own_files = set()
        for candidate in (globals().get('__file__'), sys.argv[0] if sys.argv else None):
            if candidate:
                own_files.add(Path(candidate).resolve())
        return sorted(
            path for path in self.cases_dir.glob("*.py")
            if path.resolve() not in own_files
        )

    def run_all_cases(self, parallel=False, max_workers=4):
        """Run every discovered case and print a report.

        Args:
            parallel: Run cases in a thread pool when more than one was found.
            max_workers: Upper bound on concurrently running cases.

        Returns:
            The list of result dicts (empty when no case file was found). In
            parallel mode the order is completion order.
        """
        case_files = self._discover_cases()
        if not case_files:
            print(f"在 {self.cases_dir} 中没有找到Python文件")
            return []
        print(f"找到 {len(case_files)} 个案例文件")
        print("=" * 50)

        results = []
        if parallel and len(case_files) > 1:
            # Parallel run: each worker thread only waits on a child process.
            with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = [
                    executor.submit(self.run_single_case, f)
                    for f in case_files
                ]
                for future in concurrent.futures.as_completed(futures):
                    result = future.result()
                    results.append(result)
                    self.print_result(result)
        else:
            # Serial run.
            for case_file in case_files:
                print(f"\n运行: {case_file.name}")
                print("-" * 30)
                result = self.run_single_case(case_file)
                results.append(result)
                self.print_result(result)

        # Summarise the batch.
        self.generate_report(results)
        return results

    def print_result(self, result):
        """Print status icon, file name, duration and (truncated) error text."""
        status_icon = "✓" if result['success'] else "✗"
        print(f"{status_icon} {Path(result['file']).name}")
        print(f" 耗时: {result['duration']:.2f}s")
        if result['error']:
            # Only the first 200 characters, to keep the console readable.
            print(f" 错误: {result['error'][:200]}")

    def generate_report(self, results):
        """Print and log a summary: counts, total duration and mean duration.

        Args:
            results: Result dicts as returned by ``run_single_case``; may be
                empty, in which case the mean is reported as 0.
        """
        total = len(results)
        success = sum(1 for r in results if r['success'])
        failed = total - success
        total_time = sum(r['duration'] for r in results)
        # Evaluate the guard in code: written inside the template text it was
        # printed literally and did not protect against dividing by zero.
        average = total_time / total if total > 0 else 0
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        separator = '=' * 50
        report = "\n".join([
            "",
            separator,
            "批量运行报告",
            f"时间: {timestamp}",
            separator,
            f"总案例数: {total}",
            f"成功: {success}",
            f"失败: {failed}",
            f"总耗时: {total_time:.2f}s",
            f"平均耗时: {average:.2f}s",
            separator,
            "",
        ])
        print(report)
        logging.info(report)
# Usage example.
if __name__ == "__main__":
    # "parallel" is an argument of run_all_cases(), not of the constructor;
    # passing it to BatchCaseRunner() raises TypeError.
    runner = BatchCaseRunner(cases_dir=".")
    results = runner.run_all_cases(parallel=True)
    # Selected cases can also be run one at a time:
    # selected_cases = ["case1.py", "case2.py"]
    # for case in selected_cases:
    #     result = runner.run_single_case(Path(case))
    #     runner.print_result(result)
使用建议
- 选择合适的方法:
  - 少量案例:使用简单的subprocess
  - 大量案例:使用并行处理
  - 测试场景:使用unittest/pytest
  - 需要日志:使用完整的BatchCaseRunner
- 错误处理:
  - 添加超时机制
  - 捕获所有异常
  - 记录详细的错误信息
- 性能优化:
  - 使用并行处理加速
  - 限制并发数量
  - 监控内存使用
这样你就可以根据需要选择合适的批量运行方式了!