接口并发测试脚本

wen 实用脚本 39

本文目录导读:

接口并发测试脚本

  1. 基于 Python + threading 的并发测试
  2. 基于 aiohttp 的异步并发测试(性能更好)
  3. 命令行测试脚本(带参数)
  4. 使用示例
  5. 安装依赖

我来为您提供一个完整的接口并发测试脚本框架,包含三种实现方式(线程池同步版、aiohttp 异步版、命令行版):

基于 Python + threading 的并发测试

import requests
import threading
import time
import json
from concurrent.futures import ThreadPoolExecutor
import statistics
class ConcurrencyTester:
    """Thread-pool based load tester for a single HTTP endpoint.

    Requests are sent with ``requests`` from a ``ThreadPoolExecutor``. Every
    finished request is recorded in ``self.results`` as a dict with the keys
    ``request_id``, ``status_code``, ``response_time`` (seconds), ``success``
    and ``error``.
    """

    def __init__(self, base_url, api_endpoint):
        """Remember the target; the request URL is ``base_url + api_endpoint``."""
        self.base_url = base_url
        self.api_endpoint = api_endpoint
        # Result dicts of the most recent run.
        self.results = []
        # Guards self.results, which worker threads append to.
        self.lock = threading.Lock()

    def single_request(self, request_id, payload=None):
        """Send one request and record its outcome.

        Args:
            request_id: Identifier stored in the result dict.
            payload: When truthy, it is POSTed as JSON; otherwise a GET is sent.

        Returns:
            The result dict, which is also appended to ``self.results``.
            Any exception raised while requesting is captured in ``error``
            instead of propagating.
        """
        # perf_counter is monotonic, so durations are immune to clock changes.
        started = time.perf_counter()
        try:
            url = f"{self.base_url}{self.api_endpoint}"
            # Adjust the HTTP method here if the endpoint needs PUT/DELETE.
            if payload:
                response = requests.post(url, json=payload, timeout=10)
            else:
                response = requests.get(url, timeout=10)
            result = {
                'request_id': request_id,
                'status_code': response.status_code,
                'response_time': time.perf_counter() - started,
                'success': response.status_code == 200,
                'error': None
            }
        except Exception as e:
            # Best-effort boundary: a failed request is a data point, not a crash.
            result = {
                'request_id': request_id,
                'status_code': None,
                'response_time': time.perf_counter() - started,
                'success': False,
                'error': str(e)
            }
        with self.lock:
            self.results.append(result)
        return result

    def run_concurrent_test(self, concurrent_users, total_requests, payload=None):
        """Run ``total_requests`` requests on ``concurrent_users`` threads.

        Results from any earlier run are discarded first, so the printed
        report (and its throughput figure) only covers this run.

        Args:
            concurrent_users: Size of the thread pool.
            total_requests: Number of requests submitted to the pool.
            payload: Optional JSON body forwarded to every request.
        """
        print(f"开始并发测试 - 并发数: {concurrent_users}, 总请求数: {total_requests}")
        # Start from a clean slate; rebinding (not clearing) leaves any list a
        # caller kept from a previous run untouched.
        with self.lock:
            self.results = []
        started = time.perf_counter()
        with ThreadPoolExecutor(max_workers=concurrent_users) as executor:
            futures = [
                executor.submit(self.single_request, i, payload)
                for i in range(total_requests)
            ]
            # Wait for every request to finish.
            for future in futures:
                try:
                    future.result()
                except Exception as e:
                    print(f"请求执行异常: {e}")
        total_time = time.perf_counter() - started
        self._analyze_results(total_time, concurrent_users)

    def _analyze_results(self, total_time, concurrent_users):
        """Print a summary report for the collected results.

        Args:
            total_time: Wall time of the whole run in seconds.
            concurrent_users: Pool size, echoed in the report.
        """
        if not self.results:
            print("没有测试结果")
            return
        total = len(self.results)
        success_count = sum(1 for r in self.results if r['success'])
        fail_count = total - success_count
        response_times = [r['response_time'] for r in self.results]
        print("\n" + "="*50)
        print("并发测试结果报告")
        print("="*50)
        print(f"总请求数: {total}")
        print(f"并发用户数: {concurrent_users}")
        print(f"总耗时: {total_time:.2f}秒")
        print(f"成功数: {success_count}")
        print(f"失败数: {fail_count}")
        print(f"成功率: {(success_count/total)*100:.2f}%")
        if response_times:
            print(f"平均响应时间: {statistics.mean(response_times)*1000:.2f}ms")
            print(f"最小响应时间: {min(response_times)*1000:.2f}ms")
            print(f"最大响应时间: {max(response_times)*1000:.2f}ms")
            print(f"中位响应时间: {statistics.median(response_times)*1000:.2f}ms")
            # Throughput is skipped when the run took no measurable time.
            if total_time > 0:
                throughput = total / total_time
                print(f"吞吐量: {throughput:.2f} 请求/秒")
        # Error details
        errors = [r for r in self.results if r['error']]
        if errors:
            print("\n错误详情:")
            for error in errors[:5]:  # only the first five errors are shown
                print(f"  请求 {error['request_id']}: {error['error']}")
        print("="*50)
# 使用示例
def main():
    """Run a demo load test that POSTs a sample body to a placeholder API."""
    # Request body sent with every call.
    request_body = {
        "user_id": "test123",
        "action": "query"
    }
    runner = ConcurrencyTester("https://api.example.com", "/v1/users")
    # 10 worker threads share 100 requests in total.
    runner.run_concurrent_test(
        concurrent_users=10,
        total_requests=100,
        payload=request_body,
    )


if __name__ == "__main__":
    main()

基于 aiohttp 的异步并发测试(性能更好)

import aiohttp
import asyncio
import time
import json
from typing import List, Dict
class AsyncConcurrencyTester:
    """asyncio/aiohttp based load tester for a single HTTP endpoint.

    Every finished request is recorded in ``self.results`` as a dict with the
    keys ``request_id``, ``status_code``, ``response_time`` (seconds),
    ``success`` and ``error``.
    """

    def __init__(self, base_url: str, api_endpoint: str):
        """Remember the target; the request URL is ``base_url + api_endpoint``."""
        self.base_url = base_url
        self.api_endpoint = api_endpoint
        # Result dicts of the most recent run.
        self.results = []

    async def single_request(self, session: aiohttp.ClientSession,
                             request_id: int, payload: dict = None) -> Dict:
        """Send one request through ``session`` and describe its outcome.

        Args:
            session: Open aiohttp session used to send the request.
            request_id: Identifier stored in the result dict.
            payload: When truthy, it is POSTed as JSON; otherwise a GET is sent.

        Returns:
            The result dict. Timeouts are reported with ``error == 'Timeout'``
            and any other exception with its string form; nothing propagates.
        """
        # perf_counter is monotonic, so durations are immune to clock changes.
        started = time.perf_counter()
        url = f"{self.base_url}{self.api_endpoint}"
        # aiohttp documents ClientTimeout as the type of the per-request
        # timeout; total=10 keeps the original 10 second overall limit.
        timeout = aiohttp.ClientTimeout(total=10)
        status_code = None
        error = None
        try:
            if payload:
                async with session.post(url, json=payload, timeout=timeout) as response:
                    status_code = response.status
                    # Optional: read the body with `await response.text()`.
            else:
                async with session.get(url, timeout=timeout) as response:
                    status_code = response.status
        except asyncio.TimeoutError:
            # Reset the status as well: a failure while leaving the response
            # context must not be reported with a status code.
            status_code, error = None, 'Timeout'
        except Exception as e:
            status_code, error = None, str(e)
        return {
            'request_id': request_id,
            'status_code': status_code,
            'response_time': time.perf_counter() - started,
            'success': status_code == 200,
            'error': error
        }

    async def run_concurrent_test(self, concurrent_users: int,
                                   total_requests: int,
                                   payload: dict = None):
        """Run ``total_requests`` requests, at most ``concurrent_users`` at once.

        Results from any earlier run are discarded first, so the printed
        report (and its throughput figure) only covers this run.

        Args:
            concurrent_users: Maximum number of requests in flight.
            total_requests: Number of requests to send.
            payload: Optional JSON body forwarded to every request.
        """
        print(f"开始异步并发测试 - 并发数: {concurrent_users}, 总请求数: {total_requests}")
        self.results = []
        started = time.perf_counter()
        # NOTE(review): the session uses aiohttp's default connector, which has
        # its own connection limit; confirm it is not lower than
        # concurrent_users for large runs.
        async with aiohttp.ClientSession() as session:
            # The semaphore caps how many requests run at the same time.
            semaphore = asyncio.Semaphore(concurrent_users)

            async def limited_request(request_id):
                async with semaphore:
                    result = await self.single_request(session, request_id, payload)
                    self.results.append(result)
                    return result

            tasks = [limited_request(i) for i in range(total_requests)]
            await asyncio.gather(*tasks)
        total_time = time.perf_counter() - started
        self._analyze_results(total_time, concurrent_users)

    def _analyze_results(self, total_time: float, concurrent_users: int):
        """Print a summary report for the collected results.

        Args:
            total_time: Wall time of the whole run in seconds.
            concurrent_users: Concurrency limit, echoed in the report.
        """
        if not self.results:
            print("没有测试结果")
            return
        total = len(self.results)
        success_count = sum(1 for r in self.results if r['success'])
        fail_count = total - success_count
        response_times = [r['response_time'] for r in self.results]
        print("\n" + "="*50)
        print("异步并发测试结果报告")
        print("="*50)
        print(f"总请求数: {total}")
        print(f"并发数: {concurrent_users}")
        print(f"总耗时: {total_time:.2f}秒")
        print(f"成功数: {success_count}")
        print(f"失败数: {fail_count}")
        print(f"成功率: {(success_count/total)*100:.2f}%")
        if response_times:
            # Imported here because this script's import block omits it.
            import statistics
            print(f"平均响应时间: {statistics.mean(response_times)*1000:.2f}ms")
            print(f"P50: {statistics.median(response_times)*1000:.2f}ms")
            print(f"最小响应时间: {min(response_times)*1000:.2f}ms")
            print(f"最大响应时间: {max(response_times)*1000:.2f}ms")
            # Throughput is skipped when the run took no measurable time.
            if total_time > 0:
                throughput = total / total_time
                print(f"吞吐量: {throughput:.2f} 请求/秒")
        print("="*50)
# 运行异步测试
async def main():
    """Run a demo async load test: 200 POSTs, at most 20 in flight."""
    tester = AsyncConcurrencyTester("https://api.example.com", "/v1/users")
    await tester.run_concurrent_test(20, 200, {"action": "test"})


# Script entry point
if __name__ == "__main__":
    asyncio.run(main())

命令行测试脚本(带参数)

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
接口并发测试脚本 - 命令行版本
"""
import argparse
import sys
import time
import json
import statistics
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict
def parse_args():
    """Build the CLI parser and return the parsed ``sys.argv`` namespace."""
    cli = argparse.ArgumentParser(description='API并发测试工具')
    # (flags, add_argument keyword options), registered in this order.
    option_table = (
        (('-u', '--url'), {'required': True, 'help': '请求URL'}),
        (('-m', '--method'), {'default': 'GET',
                              'choices': ['GET', 'POST', 'PUT', 'DELETE'],
                              'help': 'HTTP方法'}),
        (('-c', '--concurrent'), {'type': int, 'default': 10,
                                  'help': '并发用户数'}),
        (('-n', '--requests'), {'type': int, 'default': 100,
                                'help': '总请求数'}),
        (('-d', '--data'), {'help': '请求体JSON数据'}),
        (('-H', '--headers'), {'nargs': '*',
                               'help': '自定义Headers,格式: "Key: Value"'}),
        (('-t', '--timeout'), {'type': int, 'default': 30,
                               'help': '超时时间(秒)'}),
        (('-v', '--verbose'), {'action': 'store_true', 'help': '详细输出'}),
    )
    for flags, options in option_table:
        cli.add_argument(*flags, **options)
    return cli.parse_args()
def parse_headers(headers_args: List[str]) -> Dict:
    """Turn ``"Key: Value"`` strings into a header dict.

    Each entry is split at its first colon and both sides are stripped.
    Entries without a colon are skipped; a missing or empty argument list
    yields an empty dict.
    """
    if not headers_args:
        return {}
    pieces = (raw.partition(':') for raw in headers_args)
    return {name.strip(): value.strip() for name, colon, value in pieces if colon}
def make_request(url: str, method: str, data: dict = None,
                 headers: dict = None, timeout: int = 30) -> Dict:
    """Send one HTTP request and describe the outcome as a dict.

    Args:
        url: Target URL.
        method: ``GET``, ``POST``, ``PUT`` or ``DELETE`` (case-insensitive).
        data: JSON body; only sent for ``POST`` and ``PUT``.
        headers: Optional request headers.
        timeout: Timeout in seconds passed to ``requests``.

    Returns:
        A dict with ``status_code``, ``response_time`` (seconds), ``success``
        (status is exactly 200), ``size`` (body length in bytes) and ``error``.
        Failures never raise: timeouts give ``error == 'Timeout'``, an
        unsupported method gives an explicit message, and any other exception
        is reported with its string form.
    """
    # perf_counter is monotonic, so durations are immune to clock changes.
    started = time.perf_counter()

    def failure(message):
        return {
            'status_code': None,
            'response_time': time.perf_counter() - started,
            'success': False,
            'size': 0,
            'error': message
        }

    verb = method.upper()
    # Reject unknown verbs up front; falling through the dispatch used to
    # surface as a confusing "response referenced before assignment" error.
    if verb not in ('GET', 'POST', 'PUT', 'DELETE'):
        return failure(f'Unsupported HTTP method: {method}')
    # GET and DELETE are sent without a body.
    body = data if verb in ('POST', 'PUT') else None
    try:
        response = requests.request(verb, url, json=body,
                                    headers=headers, timeout=timeout)
        return {
            'status_code': response.status_code,
            'response_time': time.perf_counter() - started,
            'success': response.status_code == 200,
            'size': len(response.content),
            'error': None
        }
    except requests.exceptions.Timeout:
        return failure('Timeout')
    except Exception as e:
        # Best-effort boundary: a failed request is a data point, not a crash.
        return failure(str(e))
def run_test(args):
    """Run the load test described by ``args`` and print a report.

    Args:
        args: Namespace with ``url``, ``method``, ``concurrent``, ``requests``,
            ``timeout``, ``data`` (JSON text or None), ``headers`` and
            ``verbose``.

    Exits the process with status 1 when ``args.data`` is not valid JSON.
    """
    print(f"""
=== API并发测试 ===
URL: {args.url}
方法: {args.method}
并发数: {args.concurrent}
总请求数: {args.requests}
超时时间: {args.timeout}s
    """)
    # Parse the request body
    data = None
    if args.data:
        try:
            data = json.loads(args.data)
        except json.JSONDecodeError:
            print("错误: 请求体JSON格式不正确")
            sys.exit(1)
    headers = parse_headers(args.headers)
    results = []
    # perf_counter is monotonic, so the total is immune to clock changes.
    started = time.perf_counter()
    with ThreadPoolExecutor(max_workers=args.concurrent) as executor:
        # Map each future to its submission index so request_id identifies the
        # request itself rather than the order in which it happened to finish.
        request_ids = {
            executor.submit(make_request, args.url, args.method,
                            data, headers, args.timeout): request_id
            for request_id in range(args.requests)
        }
        for completed, future in enumerate(as_completed(request_ids), start=1):
            result = future.result()
            result['request_id'] = request_ids[future]
            results.append(result)
            # Report every tenth completion and always the final one.
            if args.verbose and (completed % 10 == 0 or completed == args.requests):
                print(f"进度: {completed}/{args.requests} 请求")
    total_time = time.perf_counter() - started
    if results:
        _print_report(results, total_time)


def _percentile(sorted_times, fraction):
    """Return the item at index ``int(len * fraction)`` of an ascending list."""
    last = len(sorted_times) - 1
    return sorted_times[min(int(len(sorted_times) * fraction), last)]


def _print_report(results, total_time):
    """Print summary statistics, error counts and status-code counts."""
    success_count = sum(1 for r in results if r['success'])
    fail_count = len(results) - success_count
    response_times = [r['response_time'] for r in results]
    ordered = sorted(response_times)  # sorted once, shared by P95 and P99
    # Guard the division: a run too fast to measure has no finite rate.
    throughput = len(results) / total_time if total_time > 0 else float('inf')
    print(f"""
=== 测试结果 ===
总请求数: {len(results)}
成功数: {success_count}
失败数: {fail_count}
成功率: {(success_count/len(results))*100:.2f}%
总耗时: {total_time:.2f}s
响应时间统计:
  平均: {statistics.mean(response_times)*1000:.2f}ms
  中位数: {statistics.median(response_times)*1000:.2f}ms
  最小: {min(response_times)*1000:.2f}ms
  最大: {max(response_times)*1000:.2f}ms
  P95: {_percentile(ordered, 0.95)*1000:.2f}ms
  P99: {_percentile(ordered, 0.99)*1000:.2f}ms
吞吐量: {throughput:.2f} req/s
错误分布:
""")
    # Error counts by message
    errors = {}
    for r in results:
        if r['error']:
            errors[r['error']] = errors.get(r['error'], 0) + 1
    for error, count in errors.items():
        print(f"  {error}: {count}次")
    # Status-code counts; requests without a response are labelled by cause
    # instead of all being reported as timeouts.
    status_codes = {}
    for r in results:
        if r['status_code']:
            code = r['status_code']
        elif r['error'] == 'Timeout':
            code = 'Timeout'
        else:
            code = 'Error'
        status_codes[code] = status_codes.get(code, 0) + 1
    print("\n状态码分布:")
    for code, count in status_codes.items():
        print(f"  {code}: {count}次")
def main():
    args = parse_args()
    run_test(args)
if __name__ == "__main__":
    main()

使用示例

命令行测试

# GET request test (10 concurrent workers, 100 requests, verbose progress)
python concurrency_test.py -u "https://api.example.com/users" -c 10 -n 100 -v
# POST request test (JSON body via -d, custom headers via -H)
python concurrency_test.py -u "https://api.example.com/login" -m POST \
    -c 20 -n 200 \
    -d '{"username":"test","password":"123456"}' \
    -H "Content-Type: application/json" "Authorization: Bearer token"
# Test with custom headers and a 60 second timeout
python concurrency_test.py -u "https://api.example.com/data" \
    -H "API-Key: your-key" "Accept: application/json" \
    -c 50 -n 500 -t 60

Python代码调用

# Use the synchronous (thread-pool) version
tester = ConcurrencyTester("https://api.example.com", "/v1/users")
tester.run_concurrent_test(concurrent_users=10, total_requests=100)
# Use the asynchronous version (described by the article as performing better)
# NOTE(review): this snippet calls asyncio.run, so `import asyncio` must
# already be in scope where it is pasted.
tester = AsyncConcurrencyTester("https://api.example.com", "/v1/users")
asyncio.run(tester.run_concurrent_test(20, 200))

安装依赖

# Third-party packages imported by the scripts above (requests, aiohttp)
pip install requests aiohttp

这个脚本提供了完整的并发测试功能,包括:

  • ✅ 多线程并发请求
  • ✅ 异步IO支持(高性能)
  • ✅ 详细的统计报告
  • ✅ 命令行参数支持
  • ✅ 错误处理和超时控制
  • ✅ 自定义Headers和请求体
  • ✅ 吞吐量计算

您可以根据实际需求调整并发数、请求总数和其他参数。

抱歉,评论功能暂时关闭!