Python案例怎么设置本地缓存?

wen python案例 61

本文目录导读:

Python案例怎么设置本地缓存?

  1. 使用 functools.lru_cache(最简单的缓存装饰器)
  2. 使用 cachetools(更灵活的缓存库)
  3. 使用字典手动实现缓存
  4. 使用 file-based 缓存(文件缓存)
  5. 使用 redis 作为缓存(适合生产环境)
  6. 完整示例:带过期时间的缓存装饰器
  7. 选择建议

在Python中设置本地缓存有多种方式,我为你介绍几种常用的方法:

使用 functools.lru_cache(最简单的缓存装饰器)

from functools import lru_cache
@lru_cache(maxsize=128)  # maxsize 缓存最大数量
def expensive_function(n):
    """模拟一个耗时计算"""
    print(f"计算 {n}...")
    return n * n
# 测试
print(expensive_function(5))  # 计算 5
print(expensive_function(5))  # 直接从缓存读取,不计算
print(expensive_function(10)) # 计算 10
print(expensive_function.cache_info())  # 查看缓存统计

使用 cachetools(更灵活的缓存库)

from cachetools import cached, TTLCache, LRUCache
import time
# 创建缓存:最大100个,过期时间30秒
cache = TTLCache(maxsize=100, ttl=30)
@cached(cache)
def get_data(url):
    """模拟获取网络数据"""
    print(f"从 {url} 获取数据...")
    return f"数据来自 {url}"
# 测试
print(get_data("https://api.example.com/data1"))  # 第一次获取
print(get_data("https://api.example.com/data1"))  # 从缓存读取
time.sleep(31)
print(get_data("https://api.example.com/data1"))  # 缓存已过期,重新获取

使用字典手动实现缓存

class SimpleCache:
    def __init__(self):
        self.cache = {}
    def get(self, key):
        if key in self.cache:
            return self.cache[key]
        return None
    def set(self, key, value):
        self.cache[key] = value
    def clear(self):
        self.cache.clear()
# 使用示例
cache = SimpleCache()
def get_user_info(user_id):
    # 先检查缓存
    cached_data = cache.get(f"user_{user_id}")
    if cached_data:
        print(f"从缓存获取用户 {user_id} 信息")
        return cached_data
    # 模拟从数据库获取
    print(f"从数据库获取用户 {user_id} 信息")
    user_data = {"id": user_id, "name": f"用户{user_id}"}
    # 存入缓存
    cache.set(f"user_{user_id}", user_data)
    return user_data
# 测试
print(get_user_info(1))  # 从数据库获取
print(get_user_info(1))  # 从缓存获取

使用 file-based 缓存(文件缓存)

import json
import os
import time
class FileCache:
    def __init__(self, cache_dir="cache"):
        self.cache_dir = cache_dir
        os.makedirs(cache_dir, exist_ok=True)
    def _get_cache_path(self, key):
        # 使用 key 的哈希值作为文件名
        filename = str(hash(key)) + ".json"
        return os.path.join(self.cache_dir, filename)
    def get(self, key):
        cache_path = self._get_cache_path(key)
        if not os.path.exists(cache_path):
            return None
        with open(cache_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        # 检查是否过期(这里设置24小时过期)
        if time.time() - data['timestamp'] > 86400:
            os.remove(cache_path)
            return None
        return data['value']
    def set(self, key, value):
        cache_path = self._get_cache_path(key)
        data = {
            'key': key,
            'value': value,
            'timestamp': time.time()
        }
        with open(cache_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False)
# 使用示例
file_cache = FileCache()
# 保存缓存
file_cache.set("api_response", {"result": "success", "data": [1,2,3]})
# 读取缓存
result = file_cache.get("api_response")
print(result)  # {'result': 'success', 'data': [1, 2, 3]}

使用 redis 作为缓存(适合生产环境)

# 首先安装 redis: pip install redis
import redis
import json
# 连接 Redis
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def get_data_with_redis(key, expiration=3600):
    """从 Redis 获取数据,过期时间 1小时"""
    # 尝试从缓存获取
    cached = redis_client.get(key)
    if cached:
        print(f"从 Redis 缓存获取 {key}")
        return json.loads(cached.decode())
    # 模拟获取数据
    print(f"重新获取数据 {key}")
    data = {"key": key, "value": "some data"}
    # 存入 Redis
    redis_client.setex(
        key,
        expiration,
        json.dumps(data)
    )
    return data
# 测试
result = get_data_with_redis("user:1")
result = get_data_with_redis("user:1")  # 第二次从缓存获取

完整示例:带过期时间的缓存装饰器

import time
from functools import wraps
def ttl_cache(seconds=60):
    """自定义TTL缓存装饰器"""
    cache = {}
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # 生成缓存key
            key = str(args) + str(sorted(kwargs.items()))
            # 检查缓存
            if key in cache:
                value, timestamp = cache[key]
                if time.time() - timestamp < seconds:
                    print(f"缓存命中: {func.__name__}{args}")
                    return value
            # 执行函数
            result = func(*args, **kwargs)
            # 更新缓存
            cache[key] = (result, time.time())
            print(f"更新缓存: {func.__name__}{args}")
            return result
        return wrapper
    return decorator
# 使用自定义缓存
@ttl_cache(seconds=5)
def slow_function(n):
    """模拟耗时操作"""
    time.sleep(2)  # 模拟2秒的耗时
    return n * 2
# 测试
print(slow_function(5))  # 等待2秒
print(slow_function(5))  # 立即返回(缓存命中)
time.sleep(6)            # 等待缓存过期
print(slow_function(5))  # 重新计算,等待2秒

选择建议

  1. 简单场景:使用 functools.lru_cache
  2. 需要过期时间:使用 cachetools.TTLCache
  3. 需要持久化:使用文件缓存
  4. 分布式系统:使用 Redis
  5. 简单需求:手动实现字典缓存

根据你的具体需求选择合适的缓存方案!

抱歉,评论功能暂时关闭!