本文目录导读:

下面我通过几个案例,由浅入深地介绍如何在 Python 中实现缓存装饰器。
基础缓存装饰器(字典缓存)
import functools
def cache_decorator(func):
"""最简单的缓存装饰器"""
cache = {} # 缓存存储
@functools.wraps(func)
def wrapper(*args, **kwargs):
# 创建缓存的键
key = str(args) + str(kwargs)
if key not in cache:
cache[key] = func(*args, **kwargs)
print(f"缓存未命中,计算结果: {cache[key]}")
else:
print(f"缓存命中,返回结果: {cache[key]}")
return cache[key]
return wrapper
@cache_decorator
def fibonacci(n):
"""计算斐波那契数列"""
if n <= 1:
return n
return fibonacci(n-1) + fibonacci(n-2)
# 测试
print(fibonacci(5)) # 首次计算
print(fibonacci(5)) # 缓存命中
print(fibonacci(6)) # 部分缓存命中
带过期时间的缓存装饰器
import time
import functools
def timed_cache(expire_time=60):
"""带过期时间的缓存装饰器"""
def decorator(func):
cache = {} # {key: (value, timestamp)}
@functools.wraps(func)
def wrapper(*args, **kwargs):
key = str(args) + str(kwargs)
current_time = time.time()
# 检查缓存是否有效
if key in cache:
value, timestamp = cache[key]
if current_time - timestamp < expire_time:
print(f"缓存命中,剩余时间: {expire_time - (current_time - timestamp):.2f}秒")
return value
else:
print("缓存已过期")
# 重新计算并缓存
value = func(*args, **kwargs)
cache[key] = (value, current_time)
print(f"新建缓存,过期时间: {expire_time}秒")
return value
return wrapper
return decorator
@timed_cache(expire_time=5) # 5秒过期
def get_time():
return time.time()
# 测试
print(f"第一次调用: {get_time()}")
time.sleep(2)
print(f"2秒后调用: {get_time()}")
time.sleep(4)
print(f"6秒后调用: {get_time()}") # 缓存过期
LRU(最近最少使用)缓存装饰器
import functools
from collections import OrderedDict
def lru_cache(maxsize=128):
"""LRU缓存装饰器"""
def decorator(func):
cache = OrderedDict()
hits = 0
misses = 0
@functools.wraps(func)
def wrapper(*args, **kwargs):
nonlocal hits, misses
key = str(args) + str(kwargs)
if key in cache:
hits += 1
# 将访问的项移到末尾(最近使用)
cache.move_to_end(key)
print(f"缓存命中! 命中率: {hits/(hits+misses)*100:.2f}%")
return cache[key]
misses += 1
value = func(*args, **kwargs)
# 如果缓存满了,删除最久未使用的项
if len(cache) >= maxsize:
cache.popitem(last=False) # 删除最早插入的项
print("缓存已满,移除最久未使用项")
cache[key] = value
print(f"缓存未命中,缓存大小: {len(cache)}/{maxsize}")
return value
# 添加统计方法
def cache_info():
return f"命中: {hits}, 未命中: {misses}, 命中率: {hits/(hits+misses)*100:.2f}%"
wrapper.cache_info = cache_info
return wrapper
return decorator
@lru_cache(maxsize=3)
def get_data(id):
"""模拟获取数据"""
return f"数据{id}"
# 测试
print(get_data(1)) # 未命中
print(get_data(2)) # 未命中
print(get_data(3)) # 未命中
print(get_data(1)) # 命中
print(get_data(4)) # 未命中,移除最久未使用的
print(f"缓存统计: {get_data.cache_info()}")
支持自定义键函数的缓存装饰器
import functools
import hashlib
import json
def custom_cache(key_func=None):
"""支持自定义键函数的缓存装饰器"""
def decorator(func):
cache = {}
@functools.wraps(func)
def wrapper(*args, **kwargs):
# 使用自定义键函数或默认键
if key_func:
cache_key = key_func(*args, **kwargs)
else:
# 默认使用参数的JSON序列化作为键
cache_key = hashlib.md5(
json.dumps((args, kwargs), sort_keys=True).encode()
).hexdigest()
if cache_key in cache:
print(f"缓存命中: {cache_key}")
return cache[cache_key]
result = func(*args, **kwargs)
cache[cache_key] = result
print(f"创建缓存: {cache_key}")
return result
return wrapper
return decorator
# 自定义键函数:只根据用户ID缓存
def user_cache_key(user_id, **kwargs):
return f"user_{user_id}"
@custom_cache(key_func=user_cache_key)
def get_user_info(user_id, refresh=False):
"""获取用户信息"""
return {"id": user_id, "name": f"用户{user_id}", "time": time.time()}
# 测试
print(get_user_info(1)) # 创建缓存
print(get_user_info(1, refresh=True)) # 缓存命中(忽略refresh参数)
print(get_user_info(2)) # 创建新缓存
使用Python内置的LRU缓存
from functools import lru_cache
import time
@lru_cache(maxsize=10)
def expensive_function(n):
"""耗时计算函数"""
time.sleep(2) # 模拟耗时操作
return n * n
# 测试
print("开始计算...")
print(expensive_function(5)) # 首次计算,等待2秒
print(expensive_function(5)) # 立即返回,缓存命中
print(expensive_function(10)) # 新计算,等待2秒
print(expensive_function(5)) # 立即返回
# 查看缓存统计
print(f"缓存信息: {expensive_function.cache_info()}")
异步函数缓存装饰器
import asyncio
import functools
def async_cache(expire_time=60):
"""异步函数缓存装饰器"""
def decorator(func):
cache = {}
@functools.wraps(func)
async def wrapper(*args, **kwargs):
key = str(args) + str(kwargs)
if key in cache:
value, timestamp = cache[key]
if time.time() - timestamp < expire_time:
print("缓存命中")
return value
value = await func(*args, **kwargs)
cache[key] = (value, time.time())
print("创建缓存")
return value
return wrapper
return decorator
@async_cache(expire_time=10)
async def fetch_data(url):
"""模拟异步获取数据"""
await asyncio.sleep(2) # 模拟网络请求
return f"数据来自 {url}"
# 测试
async def main():
print(await fetch_data("http://example.com"))
print(await fetch_data("http://example.com")) # 缓存命中
print(await fetch_data("http://other.com"))
asyncio.run(main())
使用建议
- 简单场景:使用
functools.lru_cache就够了 - 需要过期时间:使用自定义的带过期时间的装饰器
- 内存控制:使用LRU缓存限制大小
- 复杂键值:实现自定义键函数
- 异步场景:实现异步缓存装饰器
选择哪种方案取决于你的具体需求:缓存的生命周期、内存使用限制、函数调用的复杂性等。