本文目录导读:

AWS IAM 访问密钥轮换脚本
#!/bin/bash
# aws_key_rotation.sh - AWS IAM用户访问密钥轮换
set -e
USERNAME="${1:-$(aws iam get-user --query 'User.UserName' --output text)}"
PROFILE="${2:-default}"
echo "正在为用户 $USERNAME 轮换密钥..."
# 创建新密钥
NEW_KEY=$(aws iam create-access-key --user-name "$USERNAME" \
--query 'AccessKey.{AccessKeyId:AccessKeyId,SecretAccessKey:SecretAccessKey}' \
--output json)
echo "新密钥已创建"
echo "$NEW_KEY"
# 获取旧密钥ID
OLD_KEY_ID=$(aws iam list-access-keys --user-name "$USERNAME" \
--query 'AccessKeyMetadata[0].AccessKeyId' --output text)
# 等待新密钥生效
echo "等待10秒让新密钥生效..."
sleep 10
# 更新本地凭据文件
AWS_CREDENTIALS_FILE="$HOME/.aws/credentials"
if [ -f "$AWS_CREDENTIALS_FILE" ]; then
ACCESS_KEY=$(echo "$NEW_KEY" | jq -r '.AccessKeyId')
SECRET_KEY=$(echo "$NEW_KEY" | jq -r '.SecretAccessKey')
# 备份旧凭证
cp "$AWS_CREDENTIALS_FILE" "${AWS_CREDENTIALS_FILE}.backup.$(date +%Y%m%d_%H%M%S)"
# 更新凭证
sed -i "s/aws_access_key_id = .*/aws_access_key_id = $ACCESS_KEY/" "$AWS_CREDENTIALS_FILE"
sed -i "s/aws_secret_access_key = .*/aws_secret_access_key = $SECRET_KEY/" "$AWS_CREDENTIALS_FILE"
echo "本地凭证已更新"
fi
# 停用旧密钥
if [ -n "$OLD_KEY_ID" ]; then
aws iam update-access-key --user-name "$USERNAME" \
--access-key-id "$OLD_KEY_ID" --status Inactive
echo "旧密钥 $OLD_KEY_ID 已停用"
# 等待一段时间确认新密钥工作正常
echo "确认新密钥正常工作..."
if aws sts get-caller-identity --profile "$PROFILE" &>/dev/null; then
# 删除旧密钥
aws iam delete-access-key --user-name "$USERNAME" \
--access-key-id "$OLD_KEY_ID"
echo "旧密钥已删除"
else
echo "警告:新密钥验证失败,请手动检查"
exit 1
fi
fi
echo "密钥轮换完成!"
SSL/TLS 证书轮换脚本
#!/bin/bash
# ssl_rotation.sh - SSL证书自动轮换
CERT_NAME="myapp.example.com"
CERT_DIR="/etc/ssl/$CERT_NAME"
BACKUP_DIR="/backup/ssl/$(date +%Y%m%d)"
# 检查现有证书是否即将过期
check_cert_expiry() {
if [ -f "$CERT_DIR/fullchain.pem" ]; then
EXPIRY_DATE=$(openssl x509 -enddate -noout -in "$CERT_DIR/fullchain.pem" | cut -d= -f2)
EXPIRY_EPOCH=$(date -d "$EXPIRY_DATE" +%s)
NOW_EPOCH=$(date +%s)
DAYS_LEFT=$(( ($EXPIRY_EPOCH - $NOW_EPOCH) / 86400 ))
echo "证书有效期剩余: $DAYS_LEFT 天"
if [ $DAYS_LEFT -gt 30 ]; then
echo "证书仍然有效,无需轮换"
exit 0
fi
fi
}
# 生成新证书(示例使用Let's Encrypt)
generate_certificate() {
echo "生成新证书..."
# 备份旧证书
mkdir -p "$BACKUP_DIR"
cp -r "$CERT_DIR"/* "$BACKUP_DIR/"
# 使用certbot更新证书
certbot renew --cert-name "$CERT_NAME" --non-interactive
# 或者使用openssl生成自签名证书
# openssl req -x509 -newkey rsa:4096 -keyout "$CERT_DIR/private.key" \
# -out "$CERT_DIR/certificate.crt" -days 365 -nodes \
# -subj "/CN=$CERT_NAME"
}
# 重新加载服务
reload_services() {
echo "重新加载服务应用新证书..."
# Nginx
if systemctl is-active --quiet nginx; then
nginx -t && systemctl reload nginx
fi
# Apache
if systemctl is-active --quiet apache2; then
apache2ctl configtest && systemctl reload apache2
fi
}
# 主函数
main() {
check_cert_expiry
generate_certificate
reload_services
echo "证书轮换完成!"
}
main
数据库密码轮换脚本
#!/usr/bin/env python3
# db_password_rotation.py - 数据库密码自动轮换
import os
import random
import string
import subprocess
import json
from datetime import datetime
import boto3 # AWS SDK
import mysql.connector
from cryptography.fernet import Fernet
class DatabasePasswordRotator:
def __init__(self, db_config, secret_name):
self.db_config = db_config
self.secret_name = secret_name
self.secrets_client = boto3.client('secretsmanager')
def generate_password(self, length=32):
"""生成随机密码"""
characters = string.ascii_letters + string.digits + "!@#$%^&*"
password = ''.join(random.choice(characters) for _ in range(length))
return password
def update_database_password(self, username, new_password):
"""更新数据库密码"""
try:
# 连接数据库
conn = mysql.connector.connect(**self.db_config)
cursor = conn.cursor()
# 创建新用户或更新密码
sql = f"ALTER USER '{username}'@'%' IDENTIFIED BY '{new_password}'"
cursor.execute(sql)
# 刷新权限
cursor.execute("FLUSH PRIVILEGES")
conn.commit()
cursor.close()
conn.close()
print(f"数据库用户 {username} 密码已更新")
return True
except Exception as e:
print(f"更新数据库密码失败: {e}")
return False
def update_secret_manager(self, new_password):
"""更新AWS Secrets Manager中的密码"""
try:
response = self.secrets_client.update_secret(
SecretId=self.secret_name,
SecretString=json.dumps({
'username': self.db_config['user'],
'password': new_password,
'updated_at': datetime.now().isoformat()
})
)
print(f"Secrets Manager已更新: {response['Name']}")
return True
except Exception as e:
print(f"更新Secrets Manager失败: {e}")
return False
def rotate_password(self, username):
"""执行密码轮换"""
print(f"开始轮换 {username} 的密码...")
# 生成新密码
new_password = self.generate_password()
# 先更新数据库
if not self.update_database_password(username, new_password):
raise Exception("数据库密码更新失败,回滚操作")
# 更新Secrets Manager
if not self.update_secret_manager(new_password):
# 如果Secrets Manager更新失败,回滚数据库
print("尝试回滚数据库密码...")
# 需要保存旧密码才能回滚
raise Exception("Secrets Manager更新失败,需要手动干预")
print(f"密码轮换完成!新密码已更新到Secrets Manager")
return new_password
# 使用示例
if __name__ == "__main__":
# 配置
db_config = {
'host': 'mydb.example.com',
'port': 3306,
'user': 'admin',
'password': 'old_password',
'database': 'mysql'
}
secret_name = "prod/database/password"
# 执行轮换
rotator = DatabasePasswordRotator(db_config, secret_name)
rotator.rotate_password("app_user")
加密密钥轮换脚本
#!/bin/bash
# encryption_key_rotation.sh - 加密密钥自动轮换
# 配置
KEY_DIR="/etc/encryption-keys"
NEW_KEY_DIR="/tmp/new-keys-$(date +%s)"
GPG_RECIPIENT="admin@example.com"
# 生成新密钥
generate_new_keys() {
echo "生成新加密密钥..."
mkdir -p "$NEW_KEY_DIR"
# 生成AES-256密钥
openssl rand -base64 32 > "$NEW_KEY_DIR/aes_key.txt"
# 生成RSA密钥对
openssl genrsa -out "$NEW_KEY_DIR/private.pem" 4096
openssl rsa -in "$NEW_KEY_DIR/private.pem" -pubout -out "$NEW_KEY_DIR/public.pem"
# 用GPG加密私钥
gpg --encrypt --recipient "$GPG_RECIPIENT" \
--output "$NEW_KEY_DIR/private.pem.gpg" \
"$NEW_KEY_DIR/private.pem"
echo "新密钥生成完毕"
}
# 重新加密数据
reencrypt_data() {
echo "重新加密数据..."
# 读取旧密钥
OLD_KEY=$(cat "$KEY_DIR/aes_key.txt")
# 读取新密钥
NEW_KEY=$(cat "$NEW_KEY_DIR/aes_key.txt")
# 找到所有加密文件
find /data/encrypted -name "*.enc" | while read file; do
echo "重新加密: $file"
# 解密旧文件
decrypted_file="${file%.enc}.dec"
openssl enc -aes-256-cbc -d -in "$file" -out "$decrypted_file" \
-pass pass:"$OLD_KEY" 2>/dev/null
if [ $? -eq 0 ]; then
# 用新密钥重新加密
new_encrypted="${file%.enc}.new.enc"
openssl enc -aes-256-cbc -salt -in "$decrypted_file" \
-out "$new_encrypted" -pass pass:"$NEW_KEY"
# 替换旧文件
mv "$new_encrypted" "$file"
rm "$decrypted_file"
else
echo "警告: 解密失败 $file"
fi
done
}
# 更新系统密钥存储
update_key_storage() {
echo "更新密钥存储..."
# 备份旧密钥
BACKUP_DIR="/backup/keys/$(date +%Y%m%d_%H%M%S)"
mkdir -p "$BACKUP_DIR"
cp -r "$KEY_DIR"/* "$BACKUP_DIR/"
# 复制新密钥到生产目录
cp "$NEW_KEY_DIR/aes_key.txt" "$KEY_DIR/"
cp "$NEW_KEY_DIR/private.pem.gpg" "$KEY_DIR/"
cp "$NEW_KEY_DIR/public.pem" "$KEY_DIR/"
# 清理临时目录
rm -rf "$NEW_KEY_DIR"
# 设置正确的权限
chmod 600 "$KEY_DIR/aes_key.txt"
chmod 600 "$KEY_DIR/private.pem.gpg"
chmod 644 "$KEY_DIR/public.pem"
echo "密钥存储已更新"
}
# 主函数
main() {
# 检查是否以root运行
if [[ $EUID -ne 0 ]]; then
echo "必须使用root权限运行"
exit 1
fi
# 创建目录
mkdir -p "$KEY_DIR"
mkdir -p "/data/encrypted"
mkdir -p "/backup/keys"
# 执行轮换
generate_new_keys
reencrypt_data
update_key_storage
echo "密钥轮换完成!"
echo "旧密钥备份位置: $BACKUP_DIR"
}
main
API Token 轮换脚本
#!/usr/bin/env python3
# api_token_rotation.py - API Token自动轮换
import requests
import json
import hashlib
import hmac
import base64
import time
from datetime import datetime, timedelta
class APITokenRotator:
def __init__(self, api_base_url, admin_token):
self.api_base_url = api_base_url
self.admin_token = admin_token
self.headers = {
'Authorization': f'Bearer {admin_token}',
'Content-Type': 'application/json'
}
def generate_token(self, user_id, expiry_days=30):
"""生成新的API Token"""
# 创建token负载
payload = {
'user_id': user_id,
'created_at': datetime.now().isoformat(),
'expires_at': (datetime.now() + timedelta(days=expiry_days)).isoformat(),
'token_id': hashlib.sha256(f"{user_id}{time.time()}".encode()).hexdigest()[:16]
}
# 创建签名
secret = os.environ.get('TOKEN_SIGNING_SECRET', 'default_secret')
message = json.dumps(payload, sort_keys=True)
signature = hmac.new(
secret.encode(),
message.encode(),
hashlib.sha256
).hexdigest()
# 组合token
token = base64.b64encode(f"{message}.{signature}".encode()).decode()
return token, payload
def rotate_user_token(self, user_id, old_token):
"""为用户轮换Token"""
print(f"为用户 {user_id} 轮换Token...")
# 生成新token
new_token, payload = self.generate_token(user_id)
# 在API服务中更新token
update_url = f"{self.api_base_url}/api/v1/users/{user_id}/token"
response = requests.post(
update_url,
headers=self.headers,
json={
'new_token': new_token,
'old_token': old_token
}
)
if response.status_code == 200:
print(f"Token轮换成功: {payload['token_id']}")
return new_token
else:
print(f"Token轮换失败: {response.text}")
return None
def revoke_old_tokens(self, user_id, keep_days=7):
"""撤销旧的Token"""
print(f"撤销用户 {user_id} 的旧Token...")
revoke_url = f"{self.api_base_url}/api/v1/users/{user_id}/tokens/revoke"
response = requests.post(
revoke_url,
headers=self.headers,
json={'keep_days': keep_days}
)
if response.status_code == 200:
print("旧Token已撤销")
else:
print(f"撤销Token失败: {response.text}")
def rotate_all_tokens(self):
"""为所有活跃用户轮换Token"""
# 获取所有用户列表
users_url = f"{self.api_base_url}/api/v1/users"
response = requests.get(users_url, headers=self.headers)
if response.status_code != 200:
print("获取用户列表失败")
return
users = response.json()
rotated_tokens = {}
for user in users:
if user['status'] == 'active':
new_token = self.rotate_user_token(user['id'], user['current_token'])
if new_token:
rotated_tokens[user['id']] = new_token
# 最后撤销旧Token
for user_id in rotated_tokens:
self.revoke_old_tokens(user_id)
return rotated_tokens
# 使用示例
if __name__ == "__main__":
import os
# 从环境变量读取配置
API_BASE_URL = os.environ.get('API_BASE_URL', 'https://api.example.com')
ADMIN_TOKEN = os.environ.get('ADMIN_TOKEN')
if not ADMIN_TOKEN:
print("请设置 ADMIN_TOKEN 环境变量")
exit(1)
rotator = APITokenRotator(API_BASE_URL, ADMIN_TOKEN)
# 轮换所有Token
result = rotator.rotate_all_tokens()
print(f"成功轮换了 {len(result)} 个Token")
使用建议
-
最佳实践:
- 定期执行密钥轮换(如90天)
- 使用密钥管理服务(如AWS KMS, HashiCorp Vault)
- 实现灰度发布,逐步切换
- 保存回滚能力
-
安全考虑:
- 敏感信息使用环境变量或密钥管理服务
- 日志记录但避免记录密钥本身
- 使用最小权限原则
- 实现审计日志
-
自动化部署:
- 使用Cron或调度服务定期执行
- 结合CI/CD管道自动轮换
- 设置监控和告警
根据你的具体需求选择合适的脚本,并确保在测试环境充分测试后再生产环境使用。