本文目录导读:

我来为你提供几种常见缓存的预热脚本方案。
Redis 缓存预热脚本
基础预热脚本 (Python)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Redis缓存预热脚本
"""
import redis
import json
import time
from datetime import datetime
import logging
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class RedisCacheWarmer:
def __init__(self, redis_config=None):
"""初始化Redis连接"""
if redis_config is None:
redis_config = {
'host': 'localhost',
'port': 6379,
'db': 0,
'password': None,
'decode_responses': True
}
self.client = redis.Redis(**redis_config)
def warm_hot_data(self):
"""预热热点数据"""
logger.info("开始预热热点数据...")
# 需要预热的数据(示例)
hot_data = {
"user:1": {
"id": 1,
"name": "张三",
"email": "zhangsan@example.com"
},
"user:2": {
"id": 2,
"name": "李四",
"email": "lisi@example.com"
},
"config:system": {
"site_name": "我的网站",
"version": "1.0.0",
"debug": False
}
}
# 设置过期时间(秒)
expire_time = 3600 # 1小时
for key, value in hot_data.items():
try:
# 序列化并写入Redis
self.client.setex(
key,
expire_time,
json.dumps(value, ensure_ascii=False)
)
logger.info(f"缓存成功: {key}")
except Exception as e:
logger.error(f"缓存失败 {key}: {str(e)}")
def warm_database_data(self, db_query_func, batch_size=100):
"""从数据库批量预热"""
logger.info("开始从数据库预热...")
page = 1
while True:
# 查询数据库(需要你实现具体的查询逻辑)
records = db_query_func(page, batch_size)
if not records:
break
# 写入缓存
pipeline = self.client.pipeline()
for record in records:
key = f"data:{record['id']}"
pipeline.setex(key, 3600, json.dumps(record))
pipeline.execute()
logger.info(f"已缓存第 {page} 页数据")
page += 1
time.sleep(0.1) # 避免压垮数据库
def warm_with_pattern(self, pattern_func):
"""按规则生成缓存"""
logger.info("开始按规则生成缓存...")
# 示例:生成商品缓存
product_ids = range(1, 1001) # 商品ID范围
pipeline = self.client.pipeline()
for product_id in product_ids:
key = f"product:{product_id}:info"
value = pattern_func(product_id)
pipeline.setex(key, 3600, json.dumps(value))
# 每1000条执行一次
if product_id % 1000 == 0:
pipeline.execute()
logger.info(f"已预热到第 {product_id} 条")
pipeline.execute()
logger.info("规则生成功完成")
# 使用示例
if __name__ == "__main__":
warmer = RedisCacheWarmer()
# 预热热点数据
warmer.warm_hot_data()
Shell 脚本版本
#!/bin/bash
# Redis缓存预热脚本 (Shell版)
REDIS_CLI="/usr/bin/redis-cli"
REDIS_HOST="localhost"
REDIS_PORT=6379
REDIS_PASSWORD=""
echo "开始Redis缓存预热..."
# 1. 批量设置缓存
$REDIS_CLI -h $REDIS_HOST -p $REDIS_PORT <<EOF
SET user:1 '{"id":1,"name":"张三"}'
SET user:2 '{"id":2,"name":"李四"}'
SET config:system '{"version":"1.0.0"}'
EXPIRE user:1 3600
EXPIRE user:2 3600
EXPIRE config:system 3600
EOF
# 2. 从文件批量导入
echo "从文件批量导入缓存..."
cat << 'EOF' | $REDIS_CLI -h $REDIS_HOST -p $REDIS_PORT --pipe
SET key1 value1 EX 3600
SET key2 value2 EX 3600
SET key3 value3 EX 3600
EOF
echo "缓存预热完成!"
Go语言版本
// cache_warmer.go
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/go-redis/redis/v8"
)
type CacheWarmer struct {
client *redis.Client
ctx context.Context
}
func NewCacheWarmer() *CacheWarmer {
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
})
return &CacheWarmer{
client: client,
ctx: context.Background(),
}
}
// 批量预热
func (w *CacheWarmer) BatchWarm(data map[string]interface{}, ttl time.Duration) error {
pipe := w.client.Pipeline()
for key, value := range data {
// 序列化
jsonBytes, err := json.Marshal(value)
if err != nil {
log.Printf("序列化失败 %s: %v", key, err)
continue
}
// 设置缓存
pipe.Set(w.ctx, key, jsonBytes, ttl)
}
_, err := pipe.Exec(w.ctx)
return err
}
// 渐进式预热
func (w *CacheWarmer) ProgressiveWarm(items []CacheItem) {
for i, item := range items {
if err := w.client.Set(w.ctx, item.Key, item.Value, item.TTL).Err(); err != nil {
log.Printf("缓存失败 %s: %v", item.Key, err)
continue
}
// 每100条打印进度
if (i+1) % 100 == 0 {
log.Printf("已预热 %d/%d 条数据", i+1, len(items))
}
time.Sleep(10 * time.Millisecond) // 避免Redis压力过大
}
}
type CacheItem struct {
Key string
Value interface{}
TTL time.Duration
}
func main() {
warmer := NewCacheWarmer()
// 示例数据
data := map[string]interface{}{
"hot:1": "热门数据1",
"hot:2": "热门数据2",
}
if err := warmer.BatchWarm(data, time.Hour); err != nil {
log.Fatalf("预热失败: %v", err)
}
fmt.Println("缓存预热完成!")
}
Java Spring Boot 版本
// CacheWarmer.java
@Component
public class CacheWarmer implements CommandLineRunner {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private DataService dataService;
@Override
public void run(String... args) throws Exception {
log.info("开始缓存预热...");
// 1. 预热固定数据
warmStaticData();
// 2. 预热热点数据
warmHotData();
// 3. 预热数据库数据
warmFromDatabase();
log.info("缓存预热完成!");
}
private void warmStaticData() {
// 系统配置
redisTemplate.opsForValue().set(
"config:system",
"{\"version\":\"1.0.0\"}",
3600,
TimeUnit.SECONDS
);
}
private void warmHotData() {
// 批量预热
Map<String, String> hotData = new HashMap<>();
hotData.put("user:1", "用户数据1");
hotData.put("user:2", "用户数据2");
redisTemplate.opsForValue().multiSet(hotData);
// 设置过期时间
hotData.keySet().forEach(key ->
redisTemplate.expire(key, 3600, TimeUnit.SECONDS)
);
}
private void warmFromDatabase() {
// 分页查询数据库并缓存
int page = 1;
int size = 100;
while (true) {
List<Data> dataList = dataService.getPage(page, size);
if (dataList.isEmpty()) break;
// 批量写入缓存
dataList.forEach(data -> {
String key = "data:" + data.getId();
redisTemplate.opsForValue().set(
key,
JSON.toJSONString(data),
3600,
TimeUnit.SECONDS
);
});
page++;
}
}
}
缓存预热最佳实践
预热策略配置文件
# warmup-config.yaml
cache_warmup:
redis:
host: localhost
port: 6379
password: ""
db: 0
strategy:
# 预热类型
types:
- hot_data # 热点数据
- db_data # 数据库数据
- computed # 计算结果
# 并发数
concurrency: 10
# 批处理大小
batch_size: 1000
# 默认过期时间(秒)
default_ttl: 3600
# 预热数据源
data_sources:
hot_keys_file: /data/cache/hot_keys.json
sql_query: "SELECT * FROM products WHERE status = 1"
预热监控脚本
#!/usr/bin/env python3
# monitor_warmup.py
import redis
import time
import logging
def monitor_warmup_progress(total_items):
"""监控预热进度"""
logger = logging.getLogger(__name__)
r = redis.Redis()
warmed_count = 0
start_time = time.time()
while warmed_count < total_items:
# 检查Redis中的缓存数量
current_count = r.dbsize()
if current_count > warmed_count:
warmed_count = current_count
progress = (warmed_count / total_items) * 100
elapsed = time.time() - start_time
rate = warmed_count / elapsed if elapsed > 0 else 0
logger.info(
f"预热进度: {progress:.2f}% "
f"({warmed_count}/{total_items}) "
f"速度: {rate:.2f}条/秒"
)
time.sleep(1)
# 使用
monitor_warmup_progress(10000)
注意事项
- 压力控制:预热时要控制速率,避免压垮后端服务
- 异常处理:做好异常处理,避免预热中断
- 幂等性:预热脚本应该可以重复执行而不会产生副作用
- 监控告警:添加监控和告警机制
- 分步执行:大数据量时采用分批策略
- 预热验证:预热后验证数据完整性
选择哪种方案取决于你的技术栈和具体需求,建议先从简单的脚本开始,逐步优化。