Python案例怎么防止缓存过期?

wen python案例 58

本文目录导读:

Python案例怎么防止缓存过期?

  1. 使用TTL(Time To Live)缓存
  2. 使用第三方库(推荐)
  3. 智能刷新策略
  4. 基于状态变化的失效策略
  5. 实用建议

在Python中防止缓存过期通常有几种常用策略,下面我详细介绍几种方法:

使用TTL(Time To Live)缓存

基础实现

import time
from functools import wraps
def ttl_cache(seconds=300):
    def decorator(func):
        cache = {}
        @wraps(func)
        def wrapper(*args, **kwargs):
            key = str(args) + str(kwargs)
            # 检查缓存是否存在且未过期
            if key in cache:
                value, timestamp = cache[key]
                if time.time() - timestamp < seconds:
                    print(f"使用缓存数据: {key}")
                    return value
            # 重新获取数据
            value = func(*args, **kwargs)
            cache[key] = (value, time.time())
            print(f"更新缓存: {key}")
            return value
        return wrapper
    return decorator
# 使用示例
@ttl_cache(seconds=60)
def get_user_data(user_id):
    # 模拟耗时的API调用
    time.sleep(2)
    return {"id": user_id, "name": "张三", "role": "admin"}

使用第三方库(推荐)

使用cachetools

from cachetools import cached, TTLCache
from datetime import timedelta
# 创建缓存,设置最大大小和过期时间
cache = TTLCache(maxsize=100, ttl=300)  # 5分钟过期
@cached(cache)
def get_expensive_data(query):
    # 模拟耗时操作
    return f"结果: {query}"

使用redis(分布式缓存)

import redis
import json
from datetime import timedelta
class RedisCache:
    def __init__(self, host='localhost', port=6379, db=0):
        self.client = redis.Redis(host=host, port=port, db=db)
    def get_or_cache(self, key, func, ttl=300):
        # 尝试从缓存获取
        cached_data = self.client.get(key)
        if cached_data:
            print(f"从Redis获取缓存: {key}")
            return json.loads(cached_data)
        # 执行函数获取数据
        data = func()
        # 存入缓存并设置过期时间
        self.client.setex(key, ttl, json.dumps(data))
        print(f"缓存到Redis: {key}")
        return data
# 使用示例
redis_cache = RedisCache()
def get_user_profile(user_id):
    return redis_cache.get_or_cache(
        f"user:{user_id}",
        lambda: fetch_from_database(user_id),
        ttl=3600  # 1小时过期
    )

智能刷新策略

提前刷新(Prefetch)

import threading
import time
class SmartCache:
    def __init__(self, ttl=300, prefetch_time=60):
        self.ttl = ttl
        self.prefetch_time = prefetch_time  # 提前多少秒刷新
        self.cache = {}
        self.lock = threading.Lock()
    def get(self, key, fetch_func):
        with self.lock:
            now = time.time()
            if key in self.cache:
                value, timestamp = self.cache[key]
                age = now - timestamp
                # 如果缓存未过期
                if age < self.ttl:
                    # 如果接近过期时间,异步刷新
                    if age > (self.ttl - self.prefetch_time):
                        threading.Thread(
                            target=self._async_refresh,
                            args=(key, fetch_func)
                        ).start()
                    return value
            # 缓存过期或不存在,同步刷新
            return self._sync_refresh(key, fetch_func)
    def _sync_refresh(self, key, fetch_func):
        value = fetch_func()
        self.cache[key] = (value, time.time())
        return value
    def _async_refresh(self, key, fetch_func):
        try:
            value = fetch_func()
            with self.lock:
                self.cache[key] = (value, time.time())
        except Exception as e:
            print(f"异步刷新失败: {e}")
# 使用示例
smart_cache = SmartCache(ttl=300, prefetch_time=60)
def get_data():
    return smart_cache.get("my_data", lambda: fetch_expensive_data())

基于状态变化的失效策略

class EventDrivenCache:
    def __init__(self):
        self.cache = {}
        self.dependencies = {}
    def set(self, key, value, dependencies=None):
        self.cache[key] = {
            'value': value,
            'dependencies': dependencies or [],
            'timestamp': time.time()
        }
    def get(self, key):
        if key not in self.cache:
            return None
        entry = self.cache[key]
        # 检查依赖是否发生变化
        if self._check_dependencies_changed(entry['dependencies']):
            del self.cache[key]
            return None
        return entry['value']
    def invalidate(self, dependency_key):
        """当某个依赖发生变化时,清除所有相关缓存"""
        keys_to_delete = []
        for key, entry in self.cache.items():
            if dependency_key in entry['dependencies']:
                keys_to_delete.append(key)
        for key in keys_to_delete:
            del self.cache[key]
    def _check_dependencies_changed(self, dependencies):
        # 实现依赖检查逻辑
        pass

实用建议

最佳实践

# 1. 使用多层缓存
class MultiLayerCache:
    def __init__(self):
        self.l1_cache = {}  # 内存缓存,短TTL
        self.l2_cache = {}  # Redis缓存,长TTL
    def get(self, key):
        # 先查L1缓存
        if key in self.l1_cache:
            return self.l1_cache[key]
        # 再查L2缓存
        if key in self.l2_cache:
            self.l1_cache[key] = self.l2_cache[key]
            return self.l2_cache[key]
        return None
# 2. 设置合理的过期时间
def get_user_data(user_id):
    cache_key = f"user:{user_id}"
    # 不同类型数据设置不同TTL
    basic_info_ttl = 3600  # 基本信息1小时
    permissions_ttl = 300  # 权限信息5分钟
    activity_ttl = 60      # 活动状态1分钟
    # 根据数据类型使用不同TTL
    pass

防止缓存过期的关键策略:

  1. 合理设置TTL:根据数据更新频率调整
  2. 异步刷新:在过期前提前更新
  3. 事件驱动:数据更新时主动清除相关缓存
  4. 多层缓存:平衡性能和一致性
  5. 监控和调整:根据实际使用情况动态调整策略

选择哪种方案取决于你的具体需求:数据一致性要求、性能要求、系统规模等。

抱歉,评论功能暂时关闭!