本文目录导读:

我来为您提供一个完整的接口并发测试脚本框架,包含多种实现方式:
基于 Python + threading 的并发测试
import requests
import threading
import time
import json
from concurrent.futures import ThreadPoolExecutor
import statistics
class ConcurrencyTester:
def __init__(self, base_url, api_endpoint):
self.base_url = base_url
self.api_endpoint = api_endpoint
self.results = []
self.lock = threading.Lock()
def single_request(self, request_id, payload=None):
"""单个请求测试"""
start_time = time.time()
try:
url = f"{self.base_url}{self.api_endpoint}"
# 根据需求修改请求方法(GET/POST/PUT/DELETE)
if payload:
response = requests.post(url, json=payload, timeout=10)
else:
response = requests.get(url, timeout=10)
elapsed_time = time.time() - start_time
result = {
'request_id': request_id,
'status_code': response.status_code,
'response_time': elapsed_time,
'success': response.status_code == 200,
'error': None
}
except Exception as e:
elapsed_time = time.time() - start_time
result = {
'request_id': request_id,
'status_code': None,
'response_time': elapsed_time,
'success': False,
'error': str(e)
}
with self.lock:
self.results.append(result)
return result
def run_concurrent_test(self, concurrent_users, total_requests, payload=None):
"""并发测试执行"""
print(f"开始并发测试 - 并发数: {concurrent_users}, 总请求数: {total_requests}")
start_time = time.time()
# 使用线程池执行并发请求
with ThreadPoolExecutor(max_workers=concurrent_users) as executor:
futures = []
for i in range(total_requests):
future = executor.submit(
self.single_request,
i,
payload
)
futures.append(future)
# 等待所有请求完成
for future in futures:
try:
future.result()
except Exception as e:
print(f"请求执行异常: {e}")
total_time = time.time() - start_time
# 统计结果
self._analyze_results(total_time, concurrent_users)
def _analyze_results(self, total_time, concurrent_users):
"""分析测试结果"""
if not self.results:
print("没有测试结果")
return
success_count = sum(1 for r in self.results if r['success'])
fail_count = sum(1 for r in self.results if not r['success'])
response_times = [r['response_time'] for r in self.results]
print("\n" + "="*50)
print("并发测试结果报告")
print("="*50)
print(f"总请求数: {len(self.results)}")
print(f"并发用户数: {concurrent_users}")
print(f"总耗时: {total_time:.2f}秒")
print(f"成功数: {success_count}")
print(f"失败数: {fail_count}")
print(f"成功率: {(success_count/len(self.results))*100:.2f}%")
if response_times:
print(f"平均响应时间: {statistics.mean(response_times)*1000:.2f}ms")
print(f"最小响应时间: {min(response_times)*1000:.2f}ms")
print(f"最大响应时间: {max(response_times)*1000:.2f}ms")
print(f"中位响应时间: {statistics.median(response_times)*1000:.2f}ms")
# 计算吞吐量
if total_time > 0:
throughput = len(self.results) / total_time
print(f"吞吐量: {throughput:.2f} 请求/秒")
# 错误详情
errors = [r for r in self.results if r['error']]
if errors:
print("\n错误详情:")
for error in errors[:5]: # 只显示前5个错误
print(f" 请求 {error['request_id']}: {error['error']}")
print("="*50)
# 使用示例
def main():
# 配置测试参数
BASE_URL = "https://api.example.com"
API_ENDPOINT = "/v1/users"
CONCURRENT_USERS = 10 # 并发用户数
TOTAL_REQUESTS = 100 # 总请求数
# 如果有请求体
payload = {
"user_id": "test123",
"action": "query"
}
# 创建测试器并执行
tester = ConcurrencyTester(BASE_URL, API_ENDPOINT)
tester.run_concurrent_test(CONCURRENT_USERS, TOTAL_REQUESTS, payload)
if __name__ == "__main__":
main()
基于 aiohttp 的异步并发测试(性能更好)
import aiohttp
import asyncio
import time
import json
from typing import List, Dict
class AsyncConcurrencyTester:
def __init__(self, base_url: str, api_endpoint: str):
self.base_url = base_url
self.api_endpoint = api_endpoint
self.results = []
async def single_request(self, session: aiohttp.ClientSession,
request_id: int, payload: dict = None) -> Dict:
"""单个异步请求"""
start_time = time.time()
url = f"{self.base_url}{self.api_endpoint}"
try:
if payload:
async with session.post(url, json=payload, timeout=10) as response:
status_code = response.status
# 可选:读取响应内容
# response_text = await response.text()
else:
async with session.get(url, timeout=10) as response:
status_code = response.status
elapsed_time = time.time() - start_time
return {
'request_id': request_id,
'status_code': status_code,
'response_time': elapsed_time,
'success': status_code == 200,
'error': None
}
except asyncio.TimeoutError:
elapsed_time = time.time() - start_time
return {
'request_id': request_id,
'status_code': None,
'response_time': elapsed_time,
'success': False,
'error': 'Timeout'
}
except Exception as e:
elapsed_time = time.time() - start_time
return {
'request_id': request_id,
'status_code': None,
'response_time': elapsed_time,
'success': False,
'error': str(e)
}
async def run_concurrent_test(self, concurrent_users: int,
total_requests: int,
payload: dict = None):
"""执行异步并发测试"""
print(f"开始异步并发测试 - 并发数: {concurrent_users}, 总请求数: {total_requests}")
start_time = time.time()
# 创建连接会话
async with aiohttp.ClientSession() as session:
# 创建信号量控制并发数
semaphore = asyncio.Semaphore(concurrent_users)
async def limited_request(request_id):
async with semaphore:
result = await self.single_request(session, request_id, payload)
self.results.append(result)
return result
# 创建所有任务
tasks = [limited_request(i) for i in range(total_requests)]
# 并发执行所有任务
await asyncio.gather(*tasks)
total_time = time.time() - start_time
self._analyze_results(total_time, concurrent_users)
def _analyze_results(self, total_time: float, concurrent_users: int):
"""分析测试结果"""
if not self.results:
print("没有测试结果")
return
success_count = sum(1 for r in self.results if r['success'])
fail_count = sum(1 for r in self.results if not r['success'])
response_times = [r['response_time'] for r in self.results]
print("\n" + "="*50)
print("异步并发测试结果报告")
print("="*50)
print(f"总请求数: {len(self.results)}")
print(f"并发数: {concurrent_users}")
print(f"总耗时: {total_time:.2f}秒")
print(f"成功数: {success_count}")
print(f"失败数: {fail_count}")
print(f"成功率: {(success_count/len(self.results))*100:.2f}%")
if response_times:
import statistics
print(f"平均响应时间: {statistics.mean(response_times)*1000:.2f}ms")
print(f"P50: {statistics.median(response_times)*1000:.2f}ms")
print(f"最小响应时间: {min(response_times)*1000:.2f}ms")
print(f"最大响应时间: {max(response_times)*1000:.2f}ms")
if total_time > 0:
throughput = len(self.results) / total_time
print(f"吞吐量: {throughput:.2f} 请求/秒")
print("="*50)
# 运行异步测试
async def main():
tester = AsyncConcurrencyTester(
base_url="https://api.example.com",
api_endpoint="/v1/users"
)
await tester.run_concurrent_test(
concurrent_users=20,
total_requests=200,
payload={"action": "test"}
)
# 执行
if __name__ == "__main__":
asyncio.run(main())
命令行测试脚本(带参数)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
接口并发测试脚本 - 命令行版本
"""
import argparse
import sys
import time
import json
import statistics
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict
def parse_args():
"""解析命令行参数"""
parser = argparse.ArgumentParser(description='API并发测试工具')
parser.add_argument('-u', '--url', required=True,
help='请求URL')
parser.add_argument('-m', '--method', default='GET',
choices=['GET', 'POST', 'PUT', 'DELETE'],
help='HTTP方法')
parser.add_argument('-c', '--concurrent', type=int, default=10,
help='并发用户数')
parser.add_argument('-n', '--requests', type=int, default=100,
help='总请求数')
parser.add_argument('-d', '--data',
help='请求体JSON数据')
parser.add_argument('-H', '--headers', nargs='*',
help='自定义Headers,格式: "Key: Value"')
parser.add_argument('-t', '--timeout', type=int, default=30,
help='超时时间(秒)')
parser.add_argument('-v', '--verbose', action='store_true',
help='详细输出')
return parser.parse_args()
def parse_headers(headers_args: List[str]) -> Dict:
"""解析Headers参数"""
headers = {}
if headers_args:
for header in headers_args:
if ':' in header:
key, value = header.split(':', 1)
headers[key.strip()] = value.strip()
return headers
def make_request(url: str, method: str, data: dict = None,
headers: dict = None, timeout: int = 30) -> Dict:
"""发送HTTP请求"""
start_time = time.time()
try:
if method == 'GET':
response = requests.get(url, headers=headers, timeout=timeout)
elif method == 'POST':
response = requests.post(url, json=data, headers=headers, timeout=timeout)
elif method == 'PUT':
response = requests.put(url, json=data, headers=headers, timeout=timeout)
elif method == 'DELETE':
response = requests.delete(url, headers=headers, timeout=timeout)
elapsed = time.time() - start_time
return {
'status_code': response.status_code,
'response_time': elapsed,
'success': response.status_code == 200,
'size': len(response.content),
'error': None
}
except requests.exceptions.Timeout:
return {
'status_code': None,
'response_time': time.time() - start_time,
'success': False,
'size': 0,
'error': 'Timeout'
}
except Exception as e:
return {
'status_code': None,
'response_time': time.time() - start_time,
'success': False,
'size': 0,
'error': str(e)
}
def run_test(args):
"""执行测试"""
print(f"""
=== API并发测试 ===
URL: {args.url}
方法: {args.method}
并发数: {args.concurrent}
总请求数: {args.requests}
超时时间: {args.timeout}s
""")
# 解析请求体
data = None
if args.data:
try:
data = json.loads(args.data)
except json.JSONDecodeError:
print("错误: 请求体JSON格式不正确")
sys.exit(1)
# 解析Headers
headers = parse_headers(args.headers)
results = []
start_time = time.time()
# 并发执行请求
with ThreadPoolExecutor(max_workers=args.concurrent) as executor:
futures = []
for i in range(args.requests):
future = executor.submit(
make_request,
args.url,
args.method,
data,
headers,
args.timeout
)
futures.append(future)
# 收集结果
for i, future in enumerate(as_completed(futures)):
result = future.result()
result['request_id'] = i
results.append(result)
if args.verbose and i % 10 == 0:
print(f"进度: {i+1}/{args.requests} 请求")
total_time = time.time() - start_time
# 统计分析
success_count = sum(1 for r in results if r['success'])
fail_count = sum(1 for r in results if not r['success'])
if results:
response_times = [r['response_time'] for r in results]
print(f"""
=== 测试结果 ===
总请求数: {len(results)}
成功数: {success_count}
失败数: {fail_count}
成功率: {(success_count/len(results))*100:.2f}%
总耗时: {total_time:.2f}s
响应时间统计:
平均: {statistics.mean(response_times)*1000:.2f}ms
中位数: {statistics.median(response_times)*1000:.2f}ms
最小: {min(response_times)*1000:.2f}ms
最大: {max(response_times)*1000:.2f}ms
P95: {sorted(response_times)[int(len(response_times)*0.95)]*1000:.2f}ms
P99: {sorted(response_times)[int(len(response_times)*0.99)]*1000:.2f}ms
吞吐量: {len(results)/total_time:.2f} req/s
错误分布:
""")
# 错误统计
errors = {}
for r in results:
if r['error']:
errors[r['error']] = errors.get(r['error'], 0) + 1
for error, count in errors.items():
print(f" {error}: {count}次")
# 状态码分布
status_codes = {}
for r in results:
code = r['status_code'] or 'Timeout'
status_codes[code] = status_codes.get(code, 0) + 1
print("\n状态码分布:")
for code, count in status_codes.items():
print(f" {code}: {count}次")
def main():
args = parse_args()
run_test(args)
if __name__ == "__main__":
main()
使用示例
命令行测试
# GET请求测试
python concurrency_test.py -u "https://api.example.com/users" -c 10 -n 100 -v
# POST请求测试
python concurrency_test.py -u "https://api.example.com/login" -m POST \
-c 20 -n 200 \
-d '{"username":"test","password":"123456"}' \
-H "Content-Type: application/json" "Authorization: Bearer token"
# 带Headers的测试
python concurrency_test.py -u "https://api.example.com/data" \
-H "API-Key: your-key" "Accept: application/json" \
-c 50 -n 500 -t 60
Python代码调用
# 使用同步版本
tester = ConcurrencyTester("https://api.example.com", "/v1/users")
tester.run_concurrent_test(concurrent_users=10, total_requests=100)
# 使用异步版本(更好的性能)
tester = AsyncConcurrencyTester("https://api.example.com", "/v1/users")
asyncio.run(tester.run_concurrent_test(20, 200))
安装依赖
pip install requests aiohttp
这个脚本提供了完整的并发测试功能,包括:
- ✅ 多线程并发请求
- ✅ 异步IO支持(高性能)
- ✅ 详细的统计报告
- ✅ 命令行参数支持
- ✅ 错误处理和超时控制
- ✅ 自定义Headers和请求体
- ✅ 吞吐量计算
您可以根据实际需求调整并发数、请求总数和其他参数。