Python案例如何批量运行

wen python案例 47

本文目录导读:

Python案例如何批量运行

  1. 使用脚本批量执行
  2. 使用execimportlib动态导入
  3. 使用multiprocessing并行运行
  4. 使用配置文件管理案例
  5. 使用测试框架运行
  6. 最实用的完整解决方案
  7. 使用建议

我来介绍几种在Python中批量运行案例的方法:

使用脚本批量执行

创建批处理脚本

# batch_run.py
import subprocess
import os
import glob
from pathlib import Path
# 方法1:运行当前目录下所有Python文件
def run_all_python_files():
    python_files = glob.glob("*.py")
    for file in python_files:
        if file != "batch_run.py":  # 跳过自身
            print(f"\n=== 运行 {file} ===")
            subprocess.run(["python", file])
# 方法2:运行指定列表中的文件
def run_specific_files():
    files_to_run = [
        "case1.py",
        "case2.py",
        "case3.py"
    ]
    for file in files_to_run:
        if os.path.exists(file):
            print(f"\n=== 运行 {file} ===")
            result = subprocess.run(["python", file], 
                                  capture_output=True, 
                                  text=True)
            print(result.stdout)
            if result.stderr:
                print(f"错误: {result.stderr}")
# 方法3:带参数的运行
def run_with_args():
    cases = [
        ("case1.py", ["arg1", "arg2"]),
        ("case2.py", ["--input", "data.txt"]),
        ("case3.py", ["--debug"])
    ]
    for file, args in cases:
        print(f"运行 {file} 参数: {args}")
        subprocess.run(["python", file] + args)
if __name__ == "__main__":
    run_all_python_files()

使用execimportlib动态导入

# dynamic_run.py
import importlib.util
import sys
from pathlib import Path
def run_module_dynamically(module_path):
    """动态导入并运行模块"""
    module_name = Path(module_path).stem
    # 加载模块
    spec = importlib.util.spec_from_file_location(module_name, module_path)
    module = importlib.util.module_from_spec(spec)
    # 添加模块到sys.modules
    sys.modules[module_name] = module
    # 执行模块
    spec.loader.exec_module(module)
    # 如果模块有main函数,调用它
    if hasattr(module, 'main'):
        module.main()
# 批量运行
cases = ["case1.py", "case2.py", "case3.py"]
for case in cases:
    if Path(case).exists():
        print(f"\n=== 运行 {case} ===")
        try:
            run_module_dynamically(case)
        except Exception as e:
            print(f"运行失败: {e}")

使用multiprocessing并行运行

# parallel_run.py
import multiprocessing
import subprocess
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
def run_python_file(file_path):
    """运行单个Python文件"""
    print(f"开始运行: {file_path}")
    result = subprocess.run(["python", file_path], 
                          capture_output=True, 
                          text=True)
    return {
        "file": file_path,
        "success": result.returncode == 0,
        "output": result.stdout,
        "error": result.stderr
    }
def parallel_run():
    """并行运行多个Python文件"""
    files = ["case1.py", "case2.py", "case3.py", "case4.py"]
    # 使用线程池
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(run_python_file, f) for f in files]
        for future in futures:
            result = future.result()
            status = "成功" if result["success"] else "失败"
            print(f"{result['file']}: {status}")
    # 使用进程池(适合CPU密集型任务)
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(run_python_file, files))
        for result in results:
            print(f"{result['file']}: {'成功' if result['success'] else '失败'}")
if __name__ == "__main__":
    parallel_run()

使用配置文件管理案例

# config_run.py
import json
import yaml  # 需要安装: pip install pyyaml
import subprocess
class CaseRunner:
    def __init__(self, config_file):
        self.cases = self.load_config(config_file)
    def load_config(self, config_file):
        """加载配置文件"""
        if config_file.endswith('.json'):
            with open(config_file, 'r') as f:
                return json.load(f)
        elif config_file.endswith('.yaml') or config_file.endswith('.yml'):
            with open(config_file, 'r') as f:
                return yaml.safe_load(f)
        else:
            raise ValueError("不支持的配置文件格式")
    def run_all(self):
        """运行所有案例"""
        results = []
        for case in self.cases.get('cases', []):
            file = case.get('file')
            args = case.get('args', [])
            description = case.get('description', file)
            print(f"\n=== {description} ===")
            try:
                result = subprocess.run(
                    ["python", file] + args,
                    capture_output=True,
                    text=True,
                    timeout=case.get('timeout', 30)  # 超时设置
                )
                results.append({
                    'file': file,
                    'success': result.returncode == 0,
                    'output': result.stdout,
                    'error': result.stderr
                })
                if result.returncode == 0:
                    print("✓ 运行成功")
                else:
                    print(f"✗ 运行失败: {result.stderr}")
            except subprocess.TimeoutExpired:
                print(f"✗ 运行超时")
                results.append({
                    'file': file,
                    'success': False,
                    'error': 'Timeout'
                })
        return results
# 使用示例
runner = CaseRunner('cases_config.json')
results = runner.run_all()

配置文件示例 (cases_config.json):

{
    "cases": [
        {
            "file": "case1.py",
            "description": "基础案例",
            "args": [],
            "timeout": 30
        },
        {
            "file": "case2.py",
            "description": "高级案例",
            "args": ["--input", "data.txt", "--verbose"],
            "timeout": 60
        },
        {
            "file": "test_suite.py",
            "description": "测试套件",
            "args": ["-v"],
            "timeout": 120
        }
    ]
}

使用测试框架运行

# test_runner.py
import unittest
import pytest  # 需要安装: pip install pytest
# 使用unittest
def run_unittest_tests():
    loader = unittest.TestLoader()
    suite = unittest.TestSuite()
    # 添加测试类
    suite.addTests(loader.loadTestsFromModule('test_module'))
    suite.addTests(loader.discover('tests', pattern='test_*.py'))
    runner = unittest.TextTestRunner(verbosity=2)
    runner.run(suite)
# 使用pytest
def run_pytest_tests():
    import sys
    # 运行所有测试
    exit_code = pytest.main(['tests/', '-v'])
    # 运行特定标记的测试
    exit_code = pytest.main(['tests/', '-m', 'slow'])
    return exit_code
# 使用pytest参数化运行
import pytest
@pytest.mark.parametrize("case_file", [
    "case1.py",
    "case2.py", 
    "case3.py"
])
def test_case_running(case_file):
    import subprocess
    result = subprocess.run(["python", case_file], 
                          capture_output=True, 
                          text=True)
    assert result.returncode == 0, f"{case_file} 运行失败"

最实用的完整解决方案

# batch_runner.py
import subprocess
import time
from datetime import datetime
from pathlib import Path
import logging
import concurrent.futures
class BatchCaseRunner:
    def __init__(self, cases_dir="cases", log_file="batch_run.log"):
        self.cases_dir = Path(cases_dir)
        self.log_file = log_file
        self.setup_logging()
    def setup_logging(self):
        logging.basicConfig(
            filename=self.log_file,
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
    def run_single_case(self, case_file, timeout=60):
        """运行单个案例"""
        start_time = time.time()
        result = {
            'file': case_file,
            'success': False,
            'duration': 0,
            'output': '',
            'error': ''
        }
        try:
            process = subprocess.run(
                ["python", str(case_file)],
                capture_output=True,
                text=True,
                timeout=timeout
            )
            result['success'] = process.returncode == 0
            result['output'] = process.stdout
            result['error'] = process.stderr
            result['duration'] = time.time() - start_time
            status = "成功" if result['success'] else "失败"
            logging.info(f"{case_file.name}: {status} ({result['duration']:.2f}s)")
        except subprocess.TimeoutExpired:
            result['error'] = f"超时 ({timeout}s)"
            logging.warning(f"{case_file.name}: 超时")
        except Exception as e:
            result['error'] = str(e)
            logging.error(f"{case_file.name}: {str(e)}")
        return result
    def run_all_cases(self, parallel=False, max_workers=4):
        """运行所有案例"""
        case_files = list(self.cases_dir.glob("*.py"))
        if not case_files:
            print(f"在 {self.cases_dir} 中没有找到Python文件")
            return []
        print(f"找到 {len(case_files)} 个案例文件")
        print("=" * 50)
        results = []
        if parallel and len(case_files) > 1:
            # 并行运行
            with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                future_to_file = {
                    executor.submit(self.run_single_case, f): f 
                    for f in case_files
                }
                for future in concurrent.futures.as_completed(future_to_file):
                    result = future.result()
                    results.append(result)
                    self.print_result(result)
        else:
            # 串行运行
            for case_file in case_files:
                print(f"\n运行: {case_file.name}")
                print("-" * 30)
                result = self.run_single_case(case_file)
                results.append(result)
                self.print_result(result)
        # 生成报告
        self.generate_report(results)
        return results
    def print_result(self, result):
        """打印运行结果"""
        status_icon = "✓" if result['success'] else "✗"
        print(f"{status_icon} {result['file'].name}")
        print(f"  耗时: {result['duration']:.2f}s")
        if result['error']:
            print(f"  错误: {result['error'][:200]}")
    def generate_report(self, results):
        """生成运行报告"""
        total = len(results)
        success = sum(1 for r in results if r['success'])
        failed = total - success
        total_time = sum(r['duration'] for r in results)
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        report = f"""
{'='*50}
批量运行报告
时间: {timestamp}
{'='*50}
总案例数: {total}
成功: {success}
失败: {failed}
总耗时: {total_time:.2f}s
平均耗时: {total_time/total:.2f}s if total > 0 else 0
{'='*50}
"""
        print(report)
        logging.info(report)
# 使用示例
if __name__ == "__main__":
    runner = BatchCaseRunner(cases_dir=".", parallel=True)
    results = runner.run_all_cases()
    # 也可以运行指定的案例
    # selected_cases = ["case1.py", "case2.py"]
    # for case in selected_cases:
    #     result = runner.run_single_case(Path(case))
    #     runner.print_result(result)

使用建议

  1. 选择合适的方法:

    • 少量案例:使用简单的subprocess
    • 大量案例:使用并行处理
    • 测试场景:使用unittest/pytest
    • 需要日志:使用完整的BatchCaseRunner
  2. 错误处理:

    • 添加超时机制
    • 捕获所有异常
    • 记录详细的错误信息
  3. 性能优化:

    • 使用并行处理加速
    • 限制并发数量
    • 监控内存使用

这样你就可以根据需要选择合适的批量运行方式了!

抱歉,评论功能暂时关闭!