密钥轮换脚本怎写?

wen 实用脚本 46

本文目录导读:

密钥轮换脚本怎写?

  1. AWS IAM 访问密钥轮换脚本
  2. SSL/TLS 证书轮换脚本
  3. 数据库密码轮换脚本
  4. 加密密钥轮换脚本
  5. API Token 轮换脚本
  6. 使用建议

AWS IAM 访问密钥轮换脚本

#!/bin/bash
# aws_key_rotation.sh - AWS IAM用户访问密钥轮换
set -e
USERNAME="${1:-$(aws iam get-user --query 'User.UserName' --output text)}"
PROFILE="${2:-default}"
echo "正在为用户 $USERNAME 轮换密钥..."
# 创建新密钥
NEW_KEY=$(aws iam create-access-key --user-name "$USERNAME" \
    --query 'AccessKey.{AccessKeyId:AccessKeyId,SecretAccessKey:SecretAccessKey}' \
    --output json)
echo "新密钥已创建"
echo "$NEW_KEY"
# 获取旧密钥ID
OLD_KEY_ID=$(aws iam list-access-keys --user-name "$USERNAME" \
    --query 'AccessKeyMetadata[0].AccessKeyId' --output text)
# 等待新密钥生效
echo "等待10秒让新密钥生效..."
sleep 10
# 更新本地凭据文件
AWS_CREDENTIALS_FILE="$HOME/.aws/credentials"
if [ -f "$AWS_CREDENTIALS_FILE" ]; then
    ACCESS_KEY=$(echo "$NEW_KEY" | jq -r '.AccessKeyId')
    SECRET_KEY=$(echo "$NEW_KEY" | jq -r '.SecretAccessKey')
    # 备份旧凭证
    cp "$AWS_CREDENTIALS_FILE" "${AWS_CREDENTIALS_FILE}.backup.$(date +%Y%m%d_%H%M%S)"
    # 更新凭证
    sed -i "s/aws_access_key_id = .*/aws_access_key_id = $ACCESS_KEY/" "$AWS_CREDENTIALS_FILE"
    sed -i "s/aws_secret_access_key = .*/aws_secret_access_key = $SECRET_KEY/" "$AWS_CREDENTIALS_FILE"
    echo "本地凭证已更新"
fi
# 停用旧密钥
if [ -n "$OLD_KEY_ID" ]; then
    aws iam update-access-key --user-name "$USERNAME" \
        --access-key-id "$OLD_KEY_ID" --status Inactive
    echo "旧密钥 $OLD_KEY_ID 已停用"
    # 等待一段时间确认新密钥工作正常
    echo "确认新密钥正常工作..."
    if aws sts get-caller-identity --profile "$PROFILE" &>/dev/null; then
        # 删除旧密钥
        aws iam delete-access-key --user-name "$USERNAME" \
            --access-key-id "$OLD_KEY_ID"
        echo "旧密钥已删除"
    else
        echo "警告:新密钥验证失败,请手动检查"
        exit 1
    fi
fi
echo "密钥轮换完成!"

SSL/TLS 证书轮换脚本

#!/bin/bash
# ssl_rotation.sh - SSL证书自动轮换
CERT_NAME="myapp.example.com"
CERT_DIR="/etc/ssl/$CERT_NAME"
BACKUP_DIR="/backup/ssl/$(date +%Y%m%d)"
# 检查现有证书是否即将过期
check_cert_expiry() {
    if [ -f "$CERT_DIR/fullchain.pem" ]; then
        EXPIRY_DATE=$(openssl x509 -enddate -noout -in "$CERT_DIR/fullchain.pem" | cut -d= -f2)
        EXPIRY_EPOCH=$(date -d "$EXPIRY_DATE" +%s)
        NOW_EPOCH=$(date +%s)
        DAYS_LEFT=$(( ($EXPIRY_EPOCH - $NOW_EPOCH) / 86400 ))
        echo "证书有效期剩余: $DAYS_LEFT 天"
        if [ $DAYS_LEFT -gt 30 ]; then
            echo "证书仍然有效,无需轮换"
            exit 0
        fi
    fi
}
# 生成新证书(示例使用Let's Encrypt)
generate_certificate() {
    echo "生成新证书..."
    # 备份旧证书
    mkdir -p "$BACKUP_DIR"
    cp -r "$CERT_DIR"/* "$BACKUP_DIR/"
    # 使用certbot更新证书
    certbot renew --cert-name "$CERT_NAME" --non-interactive
    # 或者使用openssl生成自签名证书
    # openssl req -x509 -newkey rsa:4096 -keyout "$CERT_DIR/private.key" \
    #     -out "$CERT_DIR/certificate.crt" -days 365 -nodes \
    #     -subj "/CN=$CERT_NAME"
}
# 重新加载服务
reload_services() {
    echo "重新加载服务应用新证书..."
    # Nginx
    if systemctl is-active --quiet nginx; then
        nginx -t && systemctl reload nginx
    fi
    # Apache
    if systemctl is-active --quiet apache2; then
        apache2ctl configtest && systemctl reload apache2
    fi
}
# 主函数
main() {
    check_cert_expiry
    generate_certificate
    reload_services
    echo "证书轮换完成!"
}
main

数据库密码轮换脚本

#!/usr/bin/env python3
# db_password_rotation.py - 数据库密码自动轮换
import os
import random
import string
import subprocess
import json
from datetime import datetime
import boto3  # AWS SDK
import mysql.connector
from cryptography.fernet import Fernet
class DatabasePasswordRotator:
    def __init__(self, db_config, secret_name):
        self.db_config = db_config
        self.secret_name = secret_name
        self.secrets_client = boto3.client('secretsmanager')
    def generate_password(self, length=32):
        """生成随机密码"""
        characters = string.ascii_letters + string.digits + "!@#$%^&*"
        password = ''.join(random.choice(characters) for _ in range(length))
        return password
    def update_database_password(self, username, new_password):
        """更新数据库密码"""
        try:
            # 连接数据库
            conn = mysql.connector.connect(**self.db_config)
            cursor = conn.cursor()
            # 创建新用户或更新密码
            sql = f"ALTER USER '{username}'@'%' IDENTIFIED BY '{new_password}'"
            cursor.execute(sql)
            # 刷新权限
            cursor.execute("FLUSH PRIVILEGES")
            conn.commit()
            cursor.close()
            conn.close()
            print(f"数据库用户 {username} 密码已更新")
            return True
        except Exception as e:
            print(f"更新数据库密码失败: {e}")
            return False
    def update_secret_manager(self, new_password):
        """更新AWS Secrets Manager中的密码"""
        try:
            response = self.secrets_client.update_secret(
                SecretId=self.secret_name,
                SecretString=json.dumps({
                    'username': self.db_config['user'],
                    'password': new_password,
                    'updated_at': datetime.now().isoformat()
                })
            )
            print(f"Secrets Manager已更新: {response['Name']}")
            return True
        except Exception as e:
            print(f"更新Secrets Manager失败: {e}")
            return False
    def rotate_password(self, username):
        """执行密码轮换"""
        print(f"开始轮换 {username} 的密码...")
        # 生成新密码
        new_password = self.generate_password()
        # 先更新数据库
        if not self.update_database_password(username, new_password):
            raise Exception("数据库密码更新失败,回滚操作")
        # 更新Secrets Manager
        if not self.update_secret_manager(new_password):
            # 如果Secrets Manager更新失败,回滚数据库
            print("尝试回滚数据库密码...")
            # 需要保存旧密码才能回滚
            raise Exception("Secrets Manager更新失败,需要手动干预")
        print(f"密码轮换完成!新密码已更新到Secrets Manager")
        return new_password
# 使用示例
if __name__ == "__main__":
    # 配置
    db_config = {
        'host': 'mydb.example.com',
        'port': 3306,
        'user': 'admin',
        'password': 'old_password',
        'database': 'mysql'
    }
    secret_name = "prod/database/password"
    # 执行轮换
    rotator = DatabasePasswordRotator(db_config, secret_name)
    rotator.rotate_password("app_user")

加密密钥轮换脚本

#!/bin/bash
# encryption_key_rotation.sh - 加密密钥自动轮换
# 配置
KEY_DIR="/etc/encryption-keys"
NEW_KEY_DIR="/tmp/new-keys-$(date +%s)"
GPG_RECIPIENT="admin@example.com"
# 生成新密钥
generate_new_keys() {
    echo "生成新加密密钥..."
    mkdir -p "$NEW_KEY_DIR"
    # 生成AES-256密钥
    openssl rand -base64 32 > "$NEW_KEY_DIR/aes_key.txt"
    # 生成RSA密钥对
    openssl genrsa -out "$NEW_KEY_DIR/private.pem" 4096
    openssl rsa -in "$NEW_KEY_DIR/private.pem" -pubout -out "$NEW_KEY_DIR/public.pem"
    # 用GPG加密私钥
    gpg --encrypt --recipient "$GPG_RECIPIENT" \
        --output "$NEW_KEY_DIR/private.pem.gpg" \
        "$NEW_KEY_DIR/private.pem"
    echo "新密钥生成完毕"
}
# 重新加密数据
reencrypt_data() {
    echo "重新加密数据..."
    # 读取旧密钥
    OLD_KEY=$(cat "$KEY_DIR/aes_key.txt")
    # 读取新密钥
    NEW_KEY=$(cat "$NEW_KEY_DIR/aes_key.txt")
    # 找到所有加密文件
    find /data/encrypted -name "*.enc" | while read file; do
        echo "重新加密: $file"
        # 解密旧文件
        decrypted_file="${file%.enc}.dec"
        openssl enc -aes-256-cbc -d -in "$file" -out "$decrypted_file" \
            -pass pass:"$OLD_KEY" 2>/dev/null
        if [ $? -eq 0 ]; then
            # 用新密钥重新加密
            new_encrypted="${file%.enc}.new.enc"
            openssl enc -aes-256-cbc -salt -in "$decrypted_file" \
                -out "$new_encrypted" -pass pass:"$NEW_KEY"
            # 替换旧文件
            mv "$new_encrypted" "$file"
            rm "$decrypted_file"
        else
            echo "警告: 解密失败 $file"
        fi
    done
}
# 更新系统密钥存储
update_key_storage() {
    echo "更新密钥存储..."
    # 备份旧密钥
    BACKUP_DIR="/backup/keys/$(date +%Y%m%d_%H%M%S)"
    mkdir -p "$BACKUP_DIR"
    cp -r "$KEY_DIR"/* "$BACKUP_DIR/"
    # 复制新密钥到生产目录
    cp "$NEW_KEY_DIR/aes_key.txt" "$KEY_DIR/"
    cp "$NEW_KEY_DIR/private.pem.gpg" "$KEY_DIR/"
    cp "$NEW_KEY_DIR/public.pem" "$KEY_DIR/"
    # 清理临时目录
    rm -rf "$NEW_KEY_DIR"
    # 设置正确的权限
    chmod 600 "$KEY_DIR/aes_key.txt"
    chmod 600 "$KEY_DIR/private.pem.gpg"
    chmod 644 "$KEY_DIR/public.pem"
    echo "密钥存储已更新"
}
# 主函数
main() {
    # 检查是否以root运行
    if [[ $EUID -ne 0 ]]; then
        echo "必须使用root权限运行"
        exit 1
    fi
    # 创建目录
    mkdir -p "$KEY_DIR"
    mkdir -p "/data/encrypted"
    mkdir -p "/backup/keys"
    # 执行轮换
    generate_new_keys
    reencrypt_data
    update_key_storage
    echo "密钥轮换完成!"
    echo "旧密钥备份位置: $BACKUP_DIR"
}
main

API Token 轮换脚本

#!/usr/bin/env python3
# api_token_rotation.py - API Token自动轮换
import requests
import json
import hashlib
import hmac
import base64
import time
from datetime import datetime, timedelta
class APITokenRotator:
    def __init__(self, api_base_url, admin_token):
        self.api_base_url = api_base_url
        self.admin_token = admin_token
        self.headers = {
            'Authorization': f'Bearer {admin_token}',
            'Content-Type': 'application/json'
        }
    def generate_token(self, user_id, expiry_days=30):
        """生成新的API Token"""
        # 创建token负载
        payload = {
            'user_id': user_id,
            'created_at': datetime.now().isoformat(),
            'expires_at': (datetime.now() + timedelta(days=expiry_days)).isoformat(),
            'token_id': hashlib.sha256(f"{user_id}{time.time()}".encode()).hexdigest()[:16]
        }
        # 创建签名
        secret = os.environ.get('TOKEN_SIGNING_SECRET', 'default_secret')
        message = json.dumps(payload, sort_keys=True)
        signature = hmac.new(
            secret.encode(),
            message.encode(),
            hashlib.sha256
        ).hexdigest()
        # 组合token
        token = base64.b64encode(f"{message}.{signature}".encode()).decode()
        return token, payload
    def rotate_user_token(self, user_id, old_token):
        """为用户轮换Token"""
        print(f"为用户 {user_id} 轮换Token...")
        # 生成新token
        new_token, payload = self.generate_token(user_id)
        # 在API服务中更新token
        update_url = f"{self.api_base_url}/api/v1/users/{user_id}/token"
        response = requests.post(
            update_url,
            headers=self.headers,
            json={
                'new_token': new_token,
                'old_token': old_token
            }
        )
        if response.status_code == 200:
            print(f"Token轮换成功: {payload['token_id']}")
            return new_token
        else:
            print(f"Token轮换失败: {response.text}")
            return None
    def revoke_old_tokens(self, user_id, keep_days=7):
        """撤销旧的Token"""
        print(f"撤销用户 {user_id} 的旧Token...")
        revoke_url = f"{self.api_base_url}/api/v1/users/{user_id}/tokens/revoke"
        response = requests.post(
            revoke_url,
            headers=self.headers,
            json={'keep_days': keep_days}
        )
        if response.status_code == 200:
            print("旧Token已撤销")
        else:
            print(f"撤销Token失败: {response.text}")
    def rotate_all_tokens(self):
        """为所有活跃用户轮换Token"""
        # 获取所有用户列表
        users_url = f"{self.api_base_url}/api/v1/users"
        response = requests.get(users_url, headers=self.headers)
        if response.status_code != 200:
            print("获取用户列表失败")
            return
        users = response.json()
        rotated_tokens = {}
        for user in users:
            if user['status'] == 'active':
                new_token = self.rotate_user_token(user['id'], user['current_token'])
                if new_token:
                    rotated_tokens[user['id']] = new_token
        # 最后撤销旧Token
        for user_id in rotated_tokens:
            self.revoke_old_tokens(user_id)
        return rotated_tokens
# 使用示例
if __name__ == "__main__":
    import os
    # 从环境变量读取配置
    API_BASE_URL = os.environ.get('API_BASE_URL', 'https://api.example.com')
    ADMIN_TOKEN = os.environ.get('ADMIN_TOKEN')
    if not ADMIN_TOKEN:
        print("请设置 ADMIN_TOKEN 环境变量")
        exit(1)
    rotator = APITokenRotator(API_BASE_URL, ADMIN_TOKEN)
    # 轮换所有Token
    result = rotator.rotate_all_tokens()
    print(f"成功轮换了 {len(result)} 个Token")

使用建议

  1. 最佳实践

    • 定期执行密钥轮换(如90天)
    • 使用密钥管理服务(如AWS KMS, HashiCorp Vault)
    • 实现灰度发布,逐步切换
    • 保存回滚能力
  2. 安全考虑

    • 敏感信息使用环境变量或密钥管理服务
    • 日志记录但避免记录密钥本身
    • 使用最小权限原则
    • 实现审计日志
  3. 自动化部署

    • 使用Cron或调度服务定期执行
    • 结合CI/CD管道自动轮换
    • 设置监控和告警

根据你的具体需求选择合适的脚本,并确保在测试环境充分测试后再生产环境使用。

抱歉,评论功能暂时关闭!