Python接口幂等性实现方案
什么是接口幂等性
幂等性是指多次执行同一操作与执行一次产生的结果相同,常见场景:

- 支付系统(防止重复扣款)
- 订单系统(防止重复下单)
- 消息队列(消息重复消费)
基于Token的方案
import uuid
import redis
from flask import Flask, request, jsonify, make_response
app = Flask(__name__)
redis_client = redis.Redis(host='localhost', port=6379, db=0)
# 生成幂等性Token
@app.route('/api/idempotent/token', methods=['GET'])
def get_idempotent_token():
token = str(uuid.uuid4())
# 存储到Redis,设置过期时间(防止内存泄漏)
redis_client.setex(f"idempotent:{token}", 3600, "active")
return jsonify({"token": token})
# 带幂等性校验的接口
@app.route('/api/order/create', methods=['POST'])
def create_order():
token = request.headers.get('Idempotent-Token')
if not token:
return jsonify({"error": "缺少幂等性Token"}), 400
if not redis_client.get(f"idempotent:{token}"):
return jsonify({"error": "Token已过期或已使用"}), 409
# 删除Token(保证幂等性)
if not redis_client.delete(f"idempotent:{token}"):
# 并发情况下可能已被删除,说明请求正在处理中
return jsonify({"message": "请求正在处理中"}), 200
# 执行业务逻辑
order_data = request.json
# ... 创建订单逻辑
return jsonify({"message": "订单创建成功", "order_id": "123456"}), 200
基于数据库唯一键的方案
from sqlalchemy import create_engine, Column, String, Integer, UniqueConstraint
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
Base = declarative_base()
class Order(Base):
__tablename__ = 'orders'
id = Column(Integer, primary_key=True)
order_id = Column(String(64), unique=True) # 唯一约束
user_id = Column(Integer)
amount = Column(Integer)
status = Column(String(32))
# 使用数据库唯一键实现幂等性
@app.route('/api/payment/pay', methods=['POST'])
def payment_pay():
payment_data = request.json
payment_id = payment_data.get('payment_id') # 业务幂等键
try:
# 使用INSERT...ON DUPLICATE KEY UPDATE(MySQL)
# 或使用SELECT和INSERT组合判断
# 检查是否已处理
existing = session.query(Order).filter_by(
order_id=payment_id
).first()
if existing:
return jsonify({
"message": "订单已处理",
"order_id": existing.order_id
}), 200
# 创建新订单
new_order = Order(order_id=payment_id, user_id=...)
session.add(new_order)
session.commit()
return jsonify({"order_id": payment_id}), 201
except Exception as e:
session.rollback()
return jsonify({"error": "处理失败"}), 500
基于Redis分布式锁的方案
import time
import hashlib
from contextlib import contextmanager
class IdempotentManager:
def __init__(self, redis_client):
self.redis = redis_client
def get_idempotent_key(self, request_data):
"""生成幂等性检查键"""
# 对请求参数进行哈希
content = f"{request_data.get('user_id')}:{request_data.get('amount')}"
return hashlib.md5(content.encode()).hexdigest()
@contextmanager
def lock(self, key, timeout=30):
"""分布式锁实现"""
lock_key = f"idempotent:lock:{key}"
while True:
if self.redis.setnx(lock_key, time.time()):
# 获取锁成功
self.redis.expire(lock_key, timeout)
try:
yield
finally:
self.redis.delete(lock_key)
break
else:
# 锁已被占用,等待
time.sleep(0.1)
# 检查锁是否过期
if time.time() - float(self.redis.get(lock_key) or 0) > timeout:
self.redis.delete(lock_key)
def check_and_process(self, request_data, process_func):
"""幂等性处理"""
idempotent_key = self.get_idempotent_key(request_data)
process_key = f"idempotent:process:{idempotent_key}"
with self.lock(idempotent_key):
# 检查是否已处理
if self.redis.get(process_key):
return {'status': 'processed', 'message': '请求已处理'}
# 执行业务逻辑
result = process_func(request_data)
# 标记已处理
self.redis.setex(process_key, 3600, 'done')
return result
# 使用示例
@app.route('/api/transfer', methods=['POST'])
def transfer():
request_data = request.json
manager = IdempotentManager(redis_client)
def process(data):
# 实际的转账逻辑
from_account = data['from_account']
to_account = data['to_account']
amount = data['amount']
# 执行转账...
return {'status': 'success', 'message': '转账成功'}
result = manager.check_and_process(request_data, process)
return jsonify(result)
完整的生产级实现
import functools
from flask import request, g
from datetime import datetime
def idempotent_decorator(timeout=3600):
"""幂等性装饰器"""
def decorator(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
# 获取幂等性标识
idempotent_key = request.headers.get('X-Idempotent-Key')
if not idempotent_key:
return jsonify({'error': '缺少幂等性标识'}), 400
# 检查是否已处理
cache_key = f"idempotent:{f.__name__}:{idempotent_key}"
cached_response = redis_client.get(cache_key)
if cached_response:
return jsonify({
'code': 200,
'message': '重复请求',
'data': cached_response
}), 200
# 标记处理中
processing_flag = f"{cache_key}:processing"
if not redis_client.setnx(processing_flag, time.time()):
return jsonify({'code': 429, 'message': '请求正在处理中'}), 429
redis_client.expire(processing_flag, 30) # 防止死锁
try:
# 执行原函数
result = f(*args, **kwargs)
# 缓存结果
redis_client.setex(cache_key, timeout, result)
return result
except Exception as e:
# 发生异常,清理处理标记
redis_client.delete(processing_flag)
raise
finally:
redis_client.delete(processing_flag)
return wrapper
return decorator
# 使用装饰器实现幂等性
@app.route('/api/order/create', methods=['POST'])
@idempotent_decorator(timeout=3600)
def create_order_api():
"""创建订单接口"""
data = request.json
# 业务逻辑
order = create_order_in_db(
user_id=data['user_id'],
amount=data['amount'],
items=data['items']
)
return order.to_dict()
幂等性设计最佳实践
幂等性键生成策略
import hashlib
from flask import request
def generate_idempotent_key():
"""生成幂等性键"""
# 方案1:基于请求内容和时间戳
request_content = f"{request.method}:{request.path}:{request.get_data()}"
return hashlib.sha256(request_content.encode()).hexdigest()
# 方案2:客户端提供唯一标识
# return request.headers.get('X-Idempotent-Key')
# 方案3:业务键值(如订单号、交易流水号)
# return request.json.get('business_id')
并发控制实现
import threading
class IdempotentConcurrencyController:
"""并发控制"""
def __init__(self, redis_client):
self.redis = redis_client
self.local_lock = threading.Lock()
def acquire(self, key):
"""获取幂等性锁"""
lock_key = f"idempotent:lock:{key}"
# 尝试获取Redis锁
acquired = self.redis.setnx(lock_key, "locked")
if acquired:
self.redis.expire(lock_key, 10)
return True
# Redis锁失败,使用本地锁降级
return self.local_lock.acquire(blocking=False)
def release(self, key):
"""释放锁"""
lock_key = f"idempotent:lock:{key}"
self.redis.delete(lock_key)
self.local_lock.release()
测试代码
import pytest
import requests
class TestIdempotent:
def test_duplicate_request(self):
"""测试重复请求"""
# 第一次请求
token = requests.get('/api/idempotent/token').json()['token']
headers = {'Idempotent-Token': token}
data = {'user_id': 1, 'amount': 100}
# 第一次请求
response1 = requests.post('/api/order/create',
json=data, headers=headers)
assert response1.status_code == 201
# 重复请求(使用相同Token)
response2 = requests.post('/api/order/create',
json=data, headers=headers)
assert response2.status_code == 200
assert response2.json()['message'] == '请求正在处理中'
实现接口幂等性的关键点:
- 唯一标识:为每个请求生成唯一标识(Token、业务键、哈希值)
- 存储机制:使用Redis、数据库等存储请求处理状态
- 并发控制:通过分布式锁、数据库锁等确保原子性
- 过期策略:避免存储无限增长,设置合适的过期时间
- 异常处理:处理网络故障、服务重启等异常情况