Python案例怎么实现存储加密?

wen python案例 73

本文目录导读:

Python案例怎么实现存储加密?

  1. 使用cryptography库(推荐)
  2. 使用AES加密(更通用)
  3. 数据库加密存储
  4. 配置文件加密
  5. 安装依赖
  6. 最佳实践建议

我来介绍几种Python实现存储加密的常用方法:

使用cryptography库(推荐)

基础文件加密

from cryptography.fernet import Fernet
import os
class FileEncryptor:
    """Encrypt and decrypt whole files with a Fernet key stored in a key file.

    The key is read from ``key_file`` when that file exists; otherwise a new
    key is generated and written there. Files are read fully into memory, so
    this class is meant for files that comfortably fit in RAM.
    """

    ENCRYPTED_SUFFIX = '.encrypted'
    DECRYPTED_SUFFIX = '.decrypted'

    def __init__(self, key_file='secret.key'):
        """Load (or create) the key in ``key_file`` and build the cipher."""
        self.key_file = key_file
        self.key = self.load_or_create_key()
        self.cipher = Fernet(self.key)

    def load_or_create_key(self):
        """Return the key stored in ``self.key_file``, creating it if missing.

        Returns:
            The key bytes read from, or newly written to, the key file.
        """
        # EAFP: try to read first instead of an exists()/open() pair.
        try:
            with open(self.key_file, 'rb') as f:
                return f.read()
        except FileNotFoundError:
            pass
        key = Fernet.generate_key()
        # Create the key file readable/writable by the owner only; the key
        # is the single secret protecting every encrypted file.
        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, 'O_BINARY', 0)
        fd = os.open(self.key_file, flags, 0o600)
        with os.fdopen(fd, 'wb') as f:
            f.write(key)
        return key

    def encrypt_file(self, input_file, output_file=None):
        """Encrypt ``input_file`` and write the token to ``output_file``.

        Args:
            input_file: Path of the plaintext file.
            output_file: Destination path; defaults to
                ``input_file + '.encrypted'``.

        Returns:
            The path the encrypted data was written to.
        """
        if output_file is None:
            output_file = input_file + self.ENCRYPTED_SUFFIX
        with open(input_file, 'rb') as f:
            data = f.read()
        encrypted_data = self.cipher.encrypt(data)
        with open(output_file, 'wb') as f:
            f.write(encrypted_data)
        print(f"文件已加密: {output_file}")
        return output_file

    def decrypt_file(self, input_file, output_file=None):
        """Decrypt ``input_file`` and write the plaintext to ``output_file``.

        Args:
            input_file: Path of a file produced by ``encrypt_file``.
            output_file: Destination path. By default a trailing
                ``.encrypted`` suffix is replaced with ``.decrypted``; if the
                input has no such suffix, ``.decrypted`` is appended so the
                input file is never overwritten.

        Returns:
            The path the decrypted data was written to.
        """
        if output_file is None:
            stem = input_file
            # Only strip the suffix at the very end of the path; a plain
            # str.replace would also rewrite directory names and would
            # return the input path unchanged when the suffix is absent.
            if stem.endswith(self.ENCRYPTED_SUFFIX):
                stem = stem[:-len(self.ENCRYPTED_SUFFIX)]
            output_file = stem + self.DECRYPTED_SUFFIX
        with open(input_file, 'rb') as f:
            encrypted_data = f.read()
        decrypted_data = self.cipher.decrypt(encrypted_data)
        with open(output_file, 'wb') as f:
            f.write(decrypted_data)
        print(f"文件已解密: {output_file}")
        return output_file
# Usage example: encrypt a file, then decrypt the encrypted copy.
# With no explicit output path, encrypt_file writes to the input path plus
# '.encrypted', which is the name passed to decrypt_file below.
encryptor = FileEncryptor()
encryptor.encrypt_file('sensitive_data.txt')
encryptor.decrypt_file('sensitive_data.txt.encrypted')

密码保护的文件加密

from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
import os
class PasswordProtectedEncryptor:
    """Fernet encryption whose key is derived from a password with PBKDF2.

    A 16-byte random salt is stored in ``salt_file`` so the same password
    derives the same key on later runs.
    """

    def __init__(self, password, salt_file='salt.bin', iterations=100000):
        """Derive the key and build the cipher.

        Args:
            password: The password, as ``str`` or ``bytes``.
            salt_file: Path of the file holding the salt; created on first
                use.
            iterations: PBKDF2 iteration count. Data can only be decrypted
                with the same count that was used to encrypt it.
        """
        # Accept bytes as-is; only text needs encoding.
        self.password = password if isinstance(password, bytes) else password.encode()
        self.salt_file = salt_file
        self.iterations = iterations
        self.salt = self.load_or_create_salt()
        self.key = self.derive_key()
        self.cipher = Fernet(self.key)

    def load_or_create_salt(self):
        """Return the salt stored in ``self.salt_file``, creating it if missing."""
        try:
            with open(self.salt_file, 'rb') as f:
                return f.read()
        except FileNotFoundError:
            pass
        salt = os.urandom(16)
        with open(self.salt_file, 'wb') as f:
            f.write(salt)
        return salt

    def derive_key(self):
        """Derive a Fernet key from the password and salt.

        Returns:
            The 32-byte PBKDF2-HMAC-SHA256 output, URL-safe Base64 encoded
            (the form ``Fernet`` is constructed with).
        """
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=self.salt,
            iterations=self.iterations,
        )
        return base64.urlsafe_b64encode(kdf.derive(self.password))

    def encrypt_data(self, data):
        """Encrypt ``data`` (``str`` is encoded first) and return the token bytes."""
        if isinstance(data, str):
            data = data.encode()
        return self.cipher.encrypt(data)

    def decrypt_data(self, encrypted_data):
        """Decrypt a token from ``encrypt_data`` and return the plaintext bytes."""
        return self.cipher.decrypt(encrypted_data)
# Usage example: encrypt a string with a password-derived key, then decrypt it.
encryptor = PasswordProtectedEncryptor("my_secure_password")
encrypted = encryptor.encrypt_data("这是敏感信息")
print(f"加密后: {encrypted}")
# decrypt_data returns bytes, so decode before printing.
decrypted = encryptor.decrypt_data(encrypted)
print(f"解密后: {decrypted.decode()}")

使用AES加密(更通用)

from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
import base64
import os
class AESEncryptor:
    """AES-CBC encryption of strings and files.

    Every encrypted payload is laid out as ``IV (16 bytes) + ciphertext``;
    string results are additionally Base64-encoded. A fresh random IV is
    generated for each encryption.

    NOTE: no MAC is computed here, so tampering with the ciphertext is not
    detected by this class.
    """

    IV_SIZE = 16
    VALID_KEY_SIZES = (16, 24, 32)

    def __init__(self, key=None):
        """Store the key, generating a random 32-byte (AES-256) key if none is given.

        Raises:
            ValueError: If the key is not 16, 24 or 32 bytes long.
        """
        self.key = key or os.urandom(32)  # AES-256
        if len(self.key) not in self.VALID_KEY_SIZES:
            raise ValueError(
                f"AES key must be 16, 24 or 32 bytes long, got {len(self.key)}"
            )
        # Kept so the attribute still exists for callers that read it; it is
        # replaced by the IV of the most recent encryption.
        self.iv = os.urandom(self.IV_SIZE)

    def _encrypt_bytes(self, data):
        """Return ``IV + ciphertext`` for ``data`` using a fresh random IV."""
        # Reusing one IV across messages under the same key reveals which
        # messages share a common prefix, so draw a new one every time.
        self.iv = os.urandom(self.IV_SIZE)
        cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
        return self.iv + cipher.encrypt(pad(data, AES.block_size))

    def _decrypt_bytes(self, blob):
        """Split ``IV + ciphertext`` and return the unpadded plaintext bytes."""
        iv, ciphertext = blob[:self.IV_SIZE], blob[self.IV_SIZE:]
        cipher = AES.new(self.key, AES.MODE_CBC, iv)
        return unpad(cipher.decrypt(ciphertext), AES.block_size)

    def encrypt(self, data):
        """Encrypt ``data`` (``str`` or ``bytes``).

        Returns:
            Base64 text of ``IV + ciphertext``.
        """
        if isinstance(data, str):
            data = data.encode()
        return base64.b64encode(self._encrypt_bytes(data)).decode()

    def decrypt(self, encrypted_data):
        """Decrypt Base64 text produced by ``encrypt`` and return it as ``str``."""
        return self._decrypt_bytes(base64.b64decode(encrypted_data)).decode()

    def encrypt_file(self, input_file, output_file=None):
        """Encrypt ``input_file`` into ``output_file``.

        Args:
            input_file: Path of the plaintext file.
            output_file: Destination path; defaults to ``input_file + '.aes'``.

        Returns:
            The path the encrypted data was written to.
        """
        if output_file is None:
            output_file = input_file + '.aes'
        with open(input_file, 'rb') as f:
            data = f.read()
        encrypted = self._encrypt_bytes(data)
        with open(output_file, 'wb') as f:
            f.write(encrypted)
        print(f"文件已加密: {output_file}")
        return output_file

    def decrypt_file(self, input_file, output_file=None):
        """Decrypt a file written by ``encrypt_file``.

        Args:
            input_file: Path of the encrypted file.
            output_file: Destination path. By default a trailing ``.aes``
                suffix is replaced with ``.decrypted``; if the input has no
                such suffix, ``.decrypted`` is appended so the input file is
                never overwritten.

        Returns:
            The path the decrypted data was written to.
        """
        if output_file is None:
            stem = input_file
            # Strip the suffix only at the end of the path; str.replace would
            # also touch directory names and leave a suffix-less path as-is.
            if stem.endswith('.aes'):
                stem = stem[:-len('.aes')]
            output_file = stem + '.decrypted'
        with open(input_file, 'rb') as f:
            data = f.read()
        unpadded_data = self._decrypt_bytes(data)
        with open(output_file, 'wb') as f:
            f.write(unpadded_data)
        print(f"文件已解密: {output_file}")
        return output_file
# Usage example
aes = AESEncryptor()
# Encrypt a string and decrypt it again
text = "敏感数据"
encrypted_text = aes.encrypt(text)
print(f"加密: {encrypted_text}")
decrypted_text = aes.decrypt(encrypted_text)
print(f"解密: {decrypted_text}")
# Encrypt a file; with no explicit output path the result is written to
# the input path plus '.aes', which is the name decrypted below.
aes.encrypt_file('important.docx')
aes.decrypt_file('important.docx.aes')

数据库加密存储

import sqlite3
from cryptography.fernet import Fernet
import json
class SecureDatabase:
    """SQLite table whose payload column is encrypted with Fernet.

    Each row stores a Fernet token (``encrypted_data``) and optional
    plaintext JSON ``metadata``. The key is persisted so that rows written
    in one process can still be decrypted by a later one.
    """

    def __init__(self, db_file='secure.db', key=None, key_file=None):
        """Open (and create if needed) the database and set up the cipher.

        Args:
            db_file: Path of the SQLite database file.
            key: Fernet key to use, e.g. one supplied by a key-management
                system. When given, no key file is read or written.
            key_file: Path of the file holding the key when ``key`` is not
                given; defaults to ``db_file + '.key'``. It is created with
                a newly generated key if it does not exist.
        """
        self.db_file = db_file
        self.key_file = key_file if key_file is not None else db_file + '.key'
        # A key generated per instance and never stored would make every
        # previously written row undecryptable after a restart.
        self.key = key if key is not None else self._load_or_create_key()
        self.cipher = Fernet(self.key)
        self.init_database()

    def _load_or_create_key(self):
        """Return the key stored in ``self.key_file``, creating it if missing."""
        import os

        try:
            with open(self.key_file, 'rb') as f:
                return f.read()
        except FileNotFoundError:
            pass
        key = Fernet.generate_key()
        # Owner-only permissions: the key protects every row in the table.
        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, 'O_BINARY', 0)
        fd = os.open(self.key_file, flags, 0o600)
        with os.fdopen(fd, 'wb') as f:
            f.write(key)
        return key

    def _execute(self, sql, params=(), fetch=False):
        """Run one statement on its own connection and commit.

        Returns:
            ``(lastrowid, row)`` where ``row`` is the first result row when
            ``fetch`` is true, otherwise ``None``.
        """
        conn = sqlite3.connect(self.db_file)
        try:
            cursor = conn.execute(sql, params)
            row = cursor.fetchone() if fetch else None
            conn.commit()
            return cursor.lastrowid, row
        finally:
            # Always release the connection, even if the statement raised.
            conn.close()

    def _encrypt(self, data):
        """Serialize non-string ``data`` to JSON and return the token as text."""
        if not isinstance(data, str):
            data = json.dumps(data)
        return self.cipher.encrypt(data.encode()).decode()

    def init_database(self):
        """Create the ``secure_data`` table if it does not exist yet."""
        self._execute('''
            CREATE TABLE IF NOT EXISTS secure_data (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                encrypted_data TEXT NOT NULL,
                metadata TEXT
            )
        ''')

    def store_secure(self, data, metadata=None):
        """Encrypt ``data`` and insert it as a new row.

        Args:
            data: A string, or any JSON-serializable value (stored as its
                JSON text).
            metadata: Optional JSON-serializable value stored unencrypted.

        Returns:
            The id of the inserted row.
        """
        row_id, _ = self._execute(
            "INSERT INTO secure_data (encrypted_data, metadata) VALUES (?, ?)",
            (self._encrypt(data), json.dumps(metadata) if metadata else None)
        )
        return row_id

    def retrieve_secure(self, id):
        """Fetch and decrypt the row with the given id.

        Returns:
            A dict with keys ``'id'``, ``'data'`` (the decrypted text; values
            stored as JSON come back as their JSON string) and
            ``'metadata'``, or ``None`` if no such row exists.
        """
        _, row = self._execute(
            "SELECT encrypted_data, metadata FROM secure_data WHERE id = ?",
            (id,),
            fetch=True
        )
        if not row:
            return None
        decrypted = self.cipher.decrypt(row[0].encode()).decode()
        metadata = json.loads(row[1]) if row[1] else None
        return {
            'id': id,
            'data': decrypted,
            'metadata': metadata
        }

    def update_secure(self, id, data, metadata=None):
        """Replace the encrypted payload and the metadata of an existing row.

        The metadata column is overwritten as well, so omitting ``metadata``
        clears it.
        """
        self._execute(
            "UPDATE secure_data SET encrypted_data = ?, metadata = ? WHERE id = ?",
            (self._encrypt(data), json.dumps(metadata) if metadata else None, id)
        )
# Usage example
db = SecureDatabase()
# Store sensitive values; the second argument is stored as plaintext metadata
db.store_secure("信用卡号: 1234-5678-9012-3456", {"type": "credit_card"})
db.store_secure({"user": "admin", "password": "secret123"}, {"type": "credentials"})
# Retrieve and decrypt the row with id 1
result = db.retrieve_secure(1)
print(f"检索结果: {result}")

配置文件加密

import configparser
import json
from cryptography.fernet import Fernet
import os
class SecureConfig:
    """INI-style configuration stored on disk as a single Fernet token.

    The file consists of the marker line ``[ENCRYPTED]`` followed by the
    encrypted INI text. A file without the marker is read as plain INI and
    is written back encrypted on the next save.
    """

    ENCRYPTED_MARKER = '[ENCRYPTED]'

    def __init__(self, config_file='config.ini', key_file='config.key'):
        """Load the configuration, creating defaults if the file is missing.

        Args:
            config_file: Path of the configuration file.
            key_file: Path of the file holding the Fernet key; created on
                first use.
        """
        self.config_file = config_file
        self.key_file = key_file
        self.config = configparser.ConfigParser()
        self.load_config()

    def load_config(self):
        """Read ``self.config_file`` or create and save a default configuration."""
        self.cipher = self.get_cipher()
        if os.path.exists(self.config_file):
            with open(self.config_file, 'r', encoding='utf-8') as f:
                content = f.read()
            if content.startswith(self.ENCRYPTED_MARKER):
                # Everything after the marker line is the token.
                token = content[len(self.ENCRYPTED_MARKER):].strip()
                decrypted_data = self.cipher.decrypt(token.encode()).decode()
                self.config.read_string(decrypted_data)
            else:
                self.config.read_string(content)
        else:
            # No file yet: start from the defaults and persist them.
            self.config['DEFAULT'] = {
                'server': 'localhost',
                'port': '8080'
            }
            self.save_config()

    def get_cipher(self):
        """Return a ``Fernet`` built from the key in ``self.key_file``.

        The key file is created with a newly generated key if it is missing.
        """
        try:
            with open(self.key_file, 'rb') as f:
                key = f.read()
        except FileNotFoundError:
            key = Fernet.generate_key()
            # Owner-only permissions for the file that unlocks the config.
            flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, 'O_BINARY', 0)
            fd = os.open(self.key_file, flags, 0o600)
            with os.fdopen(fd, 'wb') as f:
                f.write(key)
        return Fernet(key)

    def save_config(self):
        """Serialize the configuration, encrypt it and write it to disk."""
        import io

        # ConfigParser.write emits the DEFAULT section too and keeps
        # inherited defaults out of the individual sections; iterating
        # sections()/items() by hand dropped DEFAULT and duplicated its keys.
        buffer = io.StringIO()
        self.config.write(buffer)
        encrypted = self.cipher.encrypt(buffer.getvalue().encode()).decode()
        with open(self.config_file, 'w', encoding='utf-8') as f:
            f.write(self.ENCRYPTED_MARKER + '\n')
            f.write(encrypted)

    def get(self, section, key):
        """Return the value of ``key`` in ``section`` as a string."""
        return self.config.get(section, key)

    def set(self, section, key, value):
        """Set ``key`` in ``section`` (created if needed) and save to disk.

        ``value`` is converted with ``str()`` because the parser only stores
        strings. The default section can be targeted by name.
        """
        # add_section() rejects the default section name, so skip it there.
        is_default = section == self.config.default_section
        if not is_default and not self.config.has_section(section):
            self.config.add_section(section)
        self.config.set(section, key, str(value))
        self.save_config()
# Usage example
config = SecureConfig()
# Set values; each call to set() also saves the encrypted file
config.set('DATABASE', 'host', '192.168.1.100')
config.set('DATABASE', 'password', 'my_secret_password')
# Read the values back
print(config.get('DATABASE', 'host'))
print(config.get('DATABASE', 'password'))

安装依赖

pip install cryptography pycryptodome

最佳实践建议

  1. 密钥管理: 使用环境变量或专门的密钥管理系统
  2. 加密算法选择: 优先使用Fernet,或带认证的AES-256模式(如AES-GCM)
  3. 完整性验证: Fernet已内置HMAC校验;直接使用AES-CBC时需另外添加HMAC或数字签名
  4. 性能考虑: 对大数据使用流式加密
  5. 备份策略: 定期备份密钥,但确保安全存储

这些方法可以满足大多数Python存储加密需求,根据具体场景选择合适的方法。

抱歉,评论功能暂时关闭!