本文目录导读:

我来为你展示一个完整的数据签名验证实现案例,包含多种常见场景。
使用HMAC对称签名验证
import hashlib
import hmac
import json
class HMACSigner:
def __init__(self, secret_key):
self.secret_key = secret_key.encode('utf-8')
def sign(self, data):
"""生成HMAC签名"""
# 将数据转换为JSON字符串
if isinstance(data, dict):
data_str = json.dumps(data, sort_keys=True, separators=(',', ':'))
else:
data_str = str(data)
# 计算HMAC签名
signature = hmac.new(
self.secret_key,
data_str.encode('utf-8'),
hashlib.sha256
).hexdigest()
return signature
def verify(self, data, signature):
"""验证HMAC签名"""
expected_signature = self.sign(data)
return hmac.compare_digest(signature, expected_signature)
# 使用示例
signer = HMACSigner("my-secret-key-123")
# 要签名的数据
data = {
"user_id": 12345,
"action": "transfer",
"amount": 1000,
"timestamp": "2024-01-01T10:00:00Z"
}
# 生成签名
signature = signer.sign(data)
print(f"生成的签名: {signature}")
# 验证签名
is_valid = signer.verify(data, signature)
print(f"签名验证结果: {is_valid}")
# 篡改数据测试
tampered_data = dict(data)
tampered_data["amount"] = 9999
is_valid_tampered = signer.verify(tampered_data, signature)
print(f"篡改后验证结果: {is_valid_tampered}")
使用RSA非对称签名验证
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import serialization
from cryptography.exceptions import InvalidSignature
import base64
class RSASigner:
def __init__(self):
# 生成RSA密钥对(实际应用中应从安全存储加载)
self.private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048
)
self.public_key = self.private_key.public_key()
def save_keys(self, private_key_path, public_key_path):
"""保存密钥到文件"""
# 保存私钥
with open(private_key_path, 'wb') as f:
f.write(self.private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
))
# 保存公钥
with open(public_key_path, 'wb') as f:
f.write(self.public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
))
def load_keys(self, private_key_path, public_key_path):
"""从文件加载密钥"""
with open(private_key_path, 'rb') as f:
self.private_key = serialization.load_pem_private_key(
f.read(),
password=None
)
with open(public_key_path, 'rb') as f:
self.public_key = serialization.load_pem_public_key(f.read())
def sign(self, message: str) -> str:
"""使用私钥签名"""
signature = self.private_key.sign(
message.encode('utf-8'),
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256()
)
return base64.b64encode(signature).decode('utf-8')
def verify(self, message: str, signature: str) -> bool:
"""使用公钥验证签名"""
try:
signature_bytes = base64.b64decode(signature)
self.public_key.verify(
signature_bytes,
message.encode('utf-8'),
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256()
)
return True
except InvalidSignature:
return False
# 使用示例
rsa_signer = RSASigner()
# 要签名的消息
message = "Hello, this is a signed message!"
# 生成签名
signature = rsa_signer.sign(message)
print(f"RSA签名: {signature[:50]}...")
# 验证签名
is_valid = rsa_signer.verify(message, signature)
print(f"签名验证结果: {is_valid}")
# 测试篡改
modified_message = "Hello, this is a modified message!"
is_valid_modified = rsa_signer.verify(modified_message, signature)
print(f"篡改后验证结果: {is_valid_modified}")
# 保存密钥示例
# rsa_signer.save_keys("private.pem", "public.pem")
API请求签名验证(实战案例)
import requests
from datetime import datetime
import time
import hashlib
import hmac
import json
class APIRequestSigner:
"""API请求签名工具"""
def __init__(self, app_id, app_secret):
self.app_id = app_id
self.app_secret = app_secret.encode('utf-8')
def generate_signature(self, method, path, params=None, body=None, timestamp=None):
"""生成API请求签名"""
if timestamp is None:
timestamp = str(int(time.time()))
# 构建签名字符串
sign_str = f"{method}\n{path}\n{timestamp}\n"
# 添加参数
if params:
sorted_params = sorted(params.items())
param_str = '&'.join([f"{k}={v}" for k, v in sorted_params])
sign_str += f"{param_str}\n"
# 添加请求体
if body:
if isinstance(body, dict):
body_str = json.dumps(body, sort_keys=True, separators=(',', ':'))
else:
body_str = str(body)
sign_str += body_str
# 计算HMAC-SHA256签名
signature = hmac.new(
self.app_secret,
sign_str.encode('utf-8'),
hashlib.sha256
).hexdigest()
return {
'X-App-ID': self.app_id,
'X-Timestamp': timestamp,
'X-Signature': signature
}
def verify_request_signature(self, headers, method, path, params=None, body=None):
"""验证请求签名"""
received_signature = headers.get('X-Signature')
timestamp = headers.get('X-Timestamp')
if not received_signature or not timestamp:
return False
# 检查时间戳是否过期(5分钟)
current_time = int(time.time())
if abs(current_time - int(timestamp)) > 300:
return False
# 重新计算签名
expected_headers = self.generate_signature(method, path, params, body, timestamp)
expected_signature = expected_headers['X-Signature']
return hmac.compare_digest(received_signature, expected_signature)
# 使用示例
signer = APIRequestSigner("app_123456", "secret_key_abcdef")
# 模拟API请求
method = "POST"
path = "/api/v1/transfer"
params = {"limit": 100, "offset": 0}
body = {
"from_account": "123456",
"to_account": "789012",
"amount": 500,
"currency": "CNY"
}
# 生成签名头
headers = signer.generate_signature(method, path, params, body)
print("生成的请求头:", json.dumps(headers, indent=2))
# 验证请求
is_valid = signer.verify_request_signature(headers, method, path, params, body)
print(f"请求签名验证: {'通过' if is_valid else '失败'}")
# 模拟伪造请求
fake_headers = headers.copy()
fake_headers['X-Signature'] = "fake_signature"
is_fake_valid = signer.verify_request_signature(fake_headers, method, path, params, body)
print(f"伪造请求验证: {'通过' if is_fake_valid else '失败'}")
文件完整性验证
import hashlib
import os
class FileIntegrityVerifier:
"""文件完整性验证工具"""
@staticmethod
def calculate_file_hash(filepath, algorithm='sha256'):
"""计算文件哈希值"""
hash_func = hashlib.new(algorithm)
with open(filepath, 'rb') as f:
# 分块读取大文件
for chunk in iter(lambda: f.read(4096), b''):
hash_func.update(chunk)
return hash_func.hexdigest()
@staticmethod
def verify_file_integrity(filepath, expected_hash, algorithm='sha256'):
"""验证文件完整性"""
actual_hash = FileIntegrityVerifier.calculate_file_hash(filepath, algorithm)
return actual_hash == expected_hash
@staticmethod
def create_signature_file(filepath, signature_filepath, algorithm='sha256'):
"""创建签名文件"""
file_hash = FileIntegrityVerifier.calculate_file_hash(filepath, algorithm)
with open(signature_filepath, 'w') as f:
f.write(f"{algorithm}:{file_hash}")
return file_hash
# 使用示例
verifier = FileIntegrityVerifier()
# 创建测试文件
test_content = b"This is a test file for integrity verification."
with open("test_file.txt", "wb") as f:
f.write(test_content)
# 计算文件哈希
file_hash = verifier.calculate_file_hash("test_file.txt")
print(f"文件哈希值: {file_hash}")
# 验证文件完整性
is_valid = verifier.verify_file_integrity("test_file.txt", file_hash)
print(f"文件完整性验证: {'通过' if is_valid else '失败'}")
# 模拟文件被修改
with open("test_file.txt", "ab") as f:
f.write(b"modified content")
is_modified_valid = verifier.verify_file_integrity("test_file.txt", file_hash)
print(f"修改后验证: {'通过' if is_modified_valid else '失败'}")
# 清理测试文件
os.remove("test_file.txt")
JWT Token签名验证
import jwt
from datetime import datetime, timedelta
import json
class JWTSigner:
"""JWT Token签名验证工具"""
def __init__(self, secret_key, algorithm='HS256'):
self.secret_key = secret_key
self.algorithm = algorithm
def create_token(self, payload, expiration_hours=24):
"""创建JWT Token"""
# 添加标准声明
payload['iat'] = datetime.utcnow()
payload['exp'] = datetime.utcnow() + timedelta(hours=expiration_hours)
payload['type'] = 'access'
# 生成token
token = jwt.encode(
payload,
self.secret_key,
algorithm=self.algorithm
)
return token
def verify_token(self, token):
"""验证JWT Token"""
try:
# 解码并验证token
payload = jwt.decode(
token,
self.secret_key,
algorithms=[self.algorithm]
)
return {
'valid': True,
'payload': payload,
'error': None
}
except jwt.ExpiredSignatureError:
return {
'valid': False,
'payload': None,
'error': 'Token已过期'
}
except jwt.InvalidTokenError as e:
return {
'valid': False,
'payload': None,
'error': f'无效的Token: {str(e)}'
}
# 使用示例
jwt_signer = JWTSigner("super-secret-key-2024")
# 创建payload
user_payload = {
'user_id': 10001,
'username': 'alice',
'role': 'admin',
'permissions': ['read', 'write', 'delete']
}
# 生成JWT Token
token = jwt_signer.create_token(user_payload, expiration_hours=24)
print(f"JWT Token: {token[:50]}...")
# 验证Token
result = jwt_signer.verify_token(token)
print(f"Token验证结果: {'通过' if result['valid'] else '失败'}")
if result['valid']:
payload = result['payload']
print(f"用户ID: {payload['user_id']}")
print(f"用户名: {payload['username']}")
print(f"角色: {payload['role']}")
# 测试过期token
expired_token = jwt_signer.create_token(user_payload, expiration_hours=0)
import time
time.sleep(1)
result_expired = jwt_signer.verify_token(expired_token)
print(f"过期Token验证: {'通过' if result_expired['valid'] else '失败'} - {result_expired['error']}")
# 测试伪造token
fake_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.fake_signature"
result_fake = jwt_signer.verify_token(fake_token)
print(f"伪造Token验证: {'通过' if result_fake['valid'] else '失败'} - {result_fake['error']}")
安装依赖
# 需要安装的依赖 pip install cryptography pyjwt requests
最佳实践建议
-
密钥管理:
- 密钥应安全存储,避免硬编码
- 使用环境变量或密钥管理服务
- 定期轮换密钥
-
签名算法选择:
- HMAC: 适合内部系统间通信
- RSA: 适合跨系统认证
- JWT: 适合用户认证和授权
-
安全考虑:
- 添加时间戳防止重放攻击
- 使用安全的哈希算法(SHA-256及以上)
- 对敏感数据进行加密传输
这些案例涵盖了常见的签名验证场景,你可以根据实际需求选择合适的方案。