Python案例怎么实现数据签名验证?

wen python案例 68

本文目录导读:

Python案例怎么实现数据签名验证?

  1. 使用HMAC对称签名验证
  2. 使用RSA非对称签名验证
  3. API请求签名验证(实战案例)
  4. 文件完整性验证
  5. JWT Token签名验证
  6. 安装依赖
  7. 最佳实践建议

我来为你展示一个完整的数据签名验证实现案例,包含多种常见场景。

使用HMAC对称签名验证

import hashlib
import hmac
import json
class HMACSigner:
    def __init__(self, secret_key):
        self.secret_key = secret_key.encode('utf-8')
    def sign(self, data):
        """生成HMAC签名"""
        # 将数据转换为JSON字符串
        if isinstance(data, dict):
            data_str = json.dumps(data, sort_keys=True, separators=(',', ':'))
        else:
            data_str = str(data)
        # 计算HMAC签名
        signature = hmac.new(
            self.secret_key,
            data_str.encode('utf-8'),
            hashlib.sha256
        ).hexdigest()
        return signature
    def verify(self, data, signature):
        """验证HMAC签名"""
        expected_signature = self.sign(data)
        return hmac.compare_digest(signature, expected_signature)
# 使用示例
signer = HMACSigner("my-secret-key-123")
# 要签名的数据
data = {
    "user_id": 12345,
    "action": "transfer",
    "amount": 1000,
    "timestamp": "2024-01-01T10:00:00Z"
}
# 生成签名
signature = signer.sign(data)
print(f"生成的签名: {signature}")
# 验证签名
is_valid = signer.verify(data, signature)
print(f"签名验证结果: {is_valid}")
# 篡改数据测试
tampered_data = dict(data)
tampered_data["amount"] = 9999
is_valid_tampered = signer.verify(tampered_data, signature)
print(f"篡改后验证结果: {is_valid_tampered}")

使用RSA非对称签名验证

from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import serialization
from cryptography.exceptions import InvalidSignature
import base64
class RSASigner:
    def __init__(self):
        # 生成RSA密钥对(实际应用中应从安全存储加载)
        self.private_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048
        )
        self.public_key = self.private_key.public_key()
    def save_keys(self, private_key_path, public_key_path):
        """保存密钥到文件"""
        # 保存私钥
        with open(private_key_path, 'wb') as f:
            f.write(self.private_key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.PKCS8,
                encryption_algorithm=serialization.NoEncryption()
            ))
        # 保存公钥
        with open(public_key_path, 'wb') as f:
            f.write(self.public_key.public_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PublicFormat.SubjectPublicKeyInfo
            ))
    def load_keys(self, private_key_path, public_key_path):
        """从文件加载密钥"""
        with open(private_key_path, 'rb') as f:
            self.private_key = serialization.load_pem_private_key(
                f.read(),
                password=None
            )
        with open(public_key_path, 'rb') as f:
            self.public_key = serialization.load_pem_public_key(f.read())
    def sign(self, message: str) -> str:
        """使用私钥签名"""
        signature = self.private_key.sign(
            message.encode('utf-8'),
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH
            ),
            hashes.SHA256()
        )
        return base64.b64encode(signature).decode('utf-8')
    def verify(self, message: str, signature: str) -> bool:
        """使用公钥验证签名"""
        try:
            signature_bytes = base64.b64decode(signature)
            self.public_key.verify(
                signature_bytes,
                message.encode('utf-8'),
                padding.PSS(
                    mgf=padding.MGF1(hashes.SHA256()),
                    salt_length=padding.PSS.MAX_LENGTH
                ),
                hashes.SHA256()
            )
            return True
        except InvalidSignature:
            return False
# 使用示例
rsa_signer = RSASigner()
# 要签名的消息
message = "Hello, this is a signed message!"
# 生成签名
signature = rsa_signer.sign(message)
print(f"RSA签名: {signature[:50]}...")
# 验证签名
is_valid = rsa_signer.verify(message, signature)
print(f"签名验证结果: {is_valid}")
# 测试篡改
modified_message = "Hello, this is a modified message!"
is_valid_modified = rsa_signer.verify(modified_message, signature)
print(f"篡改后验证结果: {is_valid_modified}")
# 保存密钥示例
# rsa_signer.save_keys("private.pem", "public.pem")

API请求签名验证(实战案例)

import requests
from datetime import datetime
import time
import hashlib
import hmac
import json
class APIRequestSigner:
    """API请求签名工具"""
    def __init__(self, app_id, app_secret):
        self.app_id = app_id
        self.app_secret = app_secret.encode('utf-8')
    def generate_signature(self, method, path, params=None, body=None, timestamp=None):
        """生成API请求签名"""
        if timestamp is None:
            timestamp = str(int(time.time()))
        # 构建签名字符串
        sign_str = f"{method}\n{path}\n{timestamp}\n"
        # 添加参数
        if params:
            sorted_params = sorted(params.items())
            param_str = '&'.join([f"{k}={v}" for k, v in sorted_params])
            sign_str += f"{param_str}\n"
        # 添加请求体
        if body:
            if isinstance(body, dict):
                body_str = json.dumps(body, sort_keys=True, separators=(',', ':'))
            else:
                body_str = str(body)
            sign_str += body_str
        # 计算HMAC-SHA256签名
        signature = hmac.new(
            self.app_secret,
            sign_str.encode('utf-8'),
            hashlib.sha256
        ).hexdigest()
        return {
            'X-App-ID': self.app_id,
            'X-Timestamp': timestamp,
            'X-Signature': signature
        }
    def verify_request_signature(self, headers, method, path, params=None, body=None):
        """验证请求签名"""
        received_signature = headers.get('X-Signature')
        timestamp = headers.get('X-Timestamp')
        if not received_signature or not timestamp:
            return False
        # 检查时间戳是否过期(5分钟)
        current_time = int(time.time())
        if abs(current_time - int(timestamp)) > 300:
            return False
        # 重新计算签名
        expected_headers = self.generate_signature(method, path, params, body, timestamp)
        expected_signature = expected_headers['X-Signature']
        return hmac.compare_digest(received_signature, expected_signature)
# 使用示例
signer = APIRequestSigner("app_123456", "secret_key_abcdef")
# 模拟API请求
method = "POST"
path = "/api/v1/transfer"
params = {"limit": 100, "offset": 0}
body = {
    "from_account": "123456",
    "to_account": "789012",
    "amount": 500,
    "currency": "CNY"
}
# 生成签名头
headers = signer.generate_signature(method, path, params, body)
print("生成的请求头:", json.dumps(headers, indent=2))
# 验证请求
is_valid = signer.verify_request_signature(headers, method, path, params, body)
print(f"请求签名验证: {'通过' if is_valid else '失败'}")
# 模拟伪造请求
fake_headers = headers.copy()
fake_headers['X-Signature'] = "fake_signature"
is_fake_valid = signer.verify_request_signature(fake_headers, method, path, params, body)
print(f"伪造请求验证: {'通过' if is_fake_valid else '失败'}")

文件完整性验证

import hashlib
import os
class FileIntegrityVerifier:
    """文件完整性验证工具"""
    @staticmethod
    def calculate_file_hash(filepath, algorithm='sha256'):
        """计算文件哈希值"""
        hash_func = hashlib.new(algorithm)
        with open(filepath, 'rb') as f:
            # 分块读取大文件
            for chunk in iter(lambda: f.read(4096), b''):
                hash_func.update(chunk)
        return hash_func.hexdigest()
    @staticmethod
    def verify_file_integrity(filepath, expected_hash, algorithm='sha256'):
        """验证文件完整性"""
        actual_hash = FileIntegrityVerifier.calculate_file_hash(filepath, algorithm)
        return actual_hash == expected_hash
    @staticmethod
    def create_signature_file(filepath, signature_filepath, algorithm='sha256'):
        """创建签名文件"""
        file_hash = FileIntegrityVerifier.calculate_file_hash(filepath, algorithm)
        with open(signature_filepath, 'w') as f:
            f.write(f"{algorithm}:{file_hash}")
        return file_hash
# 使用示例
verifier = FileIntegrityVerifier()
# 创建测试文件
test_content = b"This is a test file for integrity verification."
with open("test_file.txt", "wb") as f:
    f.write(test_content)
# 计算文件哈希
file_hash = verifier.calculate_file_hash("test_file.txt")
print(f"文件哈希值: {file_hash}")
# 验证文件完整性
is_valid = verifier.verify_file_integrity("test_file.txt", file_hash)
print(f"文件完整性验证: {'通过' if is_valid else '失败'}")
# 模拟文件被修改
with open("test_file.txt", "ab") as f:
    f.write(b"modified content")
is_modified_valid = verifier.verify_file_integrity("test_file.txt", file_hash)
print(f"修改后验证: {'通过' if is_modified_valid else '失败'}")
# 清理测试文件
os.remove("test_file.txt")

JWT Token签名验证

import jwt
from datetime import datetime, timedelta
import json
class JWTSigner:
    """JWT Token签名验证工具"""
    def __init__(self, secret_key, algorithm='HS256'):
        self.secret_key = secret_key
        self.algorithm = algorithm
    def create_token(self, payload, expiration_hours=24):
        """创建JWT Token"""
        # 添加标准声明
        payload['iat'] = datetime.utcnow()
        payload['exp'] = datetime.utcnow() + timedelta(hours=expiration_hours)
        payload['type'] = 'access'
        # 生成token
        token = jwt.encode(
            payload,
            self.secret_key,
            algorithm=self.algorithm
        )
        return token
    def verify_token(self, token):
        """验证JWT Token"""
        try:
            # 解码并验证token
            payload = jwt.decode(
                token,
                self.secret_key,
                algorithms=[self.algorithm]
            )
            return {
                'valid': True,
                'payload': payload,
                'error': None
            }
        except jwt.ExpiredSignatureError:
            return {
                'valid': False,
                'payload': None,
                'error': 'Token已过期'
            }
        except jwt.InvalidTokenError as e:
            return {
                'valid': False,
                'payload': None,
                'error': f'无效的Token: {str(e)}'
            }
# 使用示例
jwt_signer = JWTSigner("super-secret-key-2024")
# 创建payload
user_payload = {
    'user_id': 10001,
    'username': 'alice',
    'role': 'admin',
    'permissions': ['read', 'write', 'delete']
}
# 生成JWT Token
token = jwt_signer.create_token(user_payload, expiration_hours=24)
print(f"JWT Token: {token[:50]}...")
# 验证Token
result = jwt_signer.verify_token(token)
print(f"Token验证结果: {'通过' if result['valid'] else '失败'}")
if result['valid']:
    payload = result['payload']
    print(f"用户ID: {payload['user_id']}")
    print(f"用户名: {payload['username']}")
    print(f"角色: {payload['role']}")
# 测试过期token
expired_token = jwt_signer.create_token(user_payload, expiration_hours=0)
import time
time.sleep(1)
result_expired = jwt_signer.verify_token(expired_token)
print(f"过期Token验证: {'通过' if result_expired['valid'] else '失败'} - {result_expired['error']}")
# 测试伪造token
fake_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.fake_signature"
result_fake = jwt_signer.verify_token(fake_token)
print(f"伪造Token验证: {'通过' if result_fake['valid'] else '失败'} - {result_fake['error']}")

安装依赖

# 需要安装的依赖
pip install cryptography pyjwt requests

最佳实践建议

  1. 密钥管理

    • 密钥应安全存储,避免硬编码
    • 使用环境变量或密钥管理服务
    • 定期轮换密钥
  2. 签名算法选择

    • HMAC: 适合内部系统间通信
    • RSA: 适合跨系统认证
    • JWT: 适合用户认证和授权
  3. 安全考虑

    • 添加时间戳防止重放攻击
    • 使用安全的哈希算法(SHA-256及以上)
    • 对敏感数据进行加密传输

这些案例涵盖了常见的签名验证场景,你可以根据实际需求选择合适的方案。

抱歉,评论功能暂时关闭!