本文目录导读:

我来通过一个具体的案例来演示Python中观察者模式的实现。
基础观察者模式实现
from abc import ABC, abstractmethod
from typing import List, Any
# 观察者抽象类
class Observer(ABC):
@abstractmethod
def update(self, subject: 'Subject', *args, **kwargs) -> None:
pass
# 主题抽象类
class Subject(ABC):
@abstractmethod
def attach(self, observer: Observer) -> None:
pass
@abstractmethod
def detach(self, observer: Observer) -> None:
pass
@abstractmethod
def notify(self, *args, **kwargs) -> None:
pass
# 具体主题 - 天气站
class WeatherStation(Subject):
def __init__(self):
self._observers: List[Observer] = []
self._temperature = 0
self._humidity = 0
self._pressure = 0
def attach(self, observer: Observer) -> None:
if observer not in self._observers:
self._observers.append(observer)
def detach(self, observer: Observer) -> None:
try:
self._observers.remove(observer)
except ValueError:
pass
def notify(self, *args, **kwargs) -> None:
for observer in self._observers:
observer.update(self, *args, **kwargs)
# 设置天气数据的方法
def set_measurements(self, temperature: float, humidity: float, pressure: float) -> None:
self._temperature = temperature
self._humidity = humidity
self._pressure = pressure
self.measurements_changed()
def measurements_changed(self) -> None:
self.notify(self._temperature, self._humidity, self._pressure)
# 获取数据的方法
def get_temperature(self) -> float:
return self._temperature
def get_humidity(self) -> float:
return self._humidity
def get_pressure(self) -> float:
return self._pressure
# 具体观察者 - 手机App
class PhoneApp(Observer):
def __init__(self, name: str):
self._name = name
def update(self, subject: Subject, temperature: float, humidity: float, pressure: float) -> None:
print(f"[{self._name}] 手机App更新天气信息:")
print(f" 温度: {temperature}°C")
print(f" 湿度: {humidity}%")
print(f" 气压: {pressure}hPa")
print()
# 具体观察者 - 显示屏
class DisplayScreen(Observer):
def __init__(self, location: str):
self._location = location
def update(self, subject: Subject, temperature: float, humidity: float, pressure: float) -> None:
print(f"[{self._location}] 天气显示屏更新:")
print(f" 📍 位置: {self._location}")
print(f" 🌡️ 温度: {temperature}°C")
print(f" 💧 湿度: {humidity}%")
print(f" 🌪️ 气压: {pressure}hPa")
print()
# 具体观察者 - 警报系统
class AlertSystem(Observer):
def __init__(self, threshold_temp: float = 35.0):
self._threshold_temp = threshold_temp
def update(self, subject: Subject, temperature: float, humidity: float, pressure: float) -> None:
if temperature > self._threshold_temp:
print(f"⚠️ 高温警报!温度达到 {temperature}°C,超过警戒线 {self._threshold_temp}°C")
elif temperature < 0:
print(f"❄️ 低温警报!温度降至 {temperature}°C")
else:
print(f"✅ 温度正常: {temperature}°C")
print()
使用案例 - 股票市场监控系统
# 股票市场观察者模式案例
class Stock(Subject):
def __init__(self, symbol: str, initial_price: float):
self._symbol = symbol
self._price = initial_price
self._observers: List[Observer] = []
def attach(self, observer: Observer) -> None:
self._observers.append(observer)
print(f" {observer} 开始监控 {self._symbol}")
def detach(self, observer: Observer) -> None:
self._observers.remove(observer)
print(f" {observer} 停止监控 {self._symbol}")
def notify(self, *args, **kwargs) -> None:
for observer in self._observers:
observer.update(self, *args, **kwargs)
def change_price(self, new_price: float) -> None:
old_price = self._price
self._price = new_price
change_percent = ((new_price - old_price) / old_price) * 100
self.notify(old_price, new_price, change_percent)
def get_symbol(self) -> str:
return self._symbol
def get_price(self) -> float:
return self._price
# 投资者观察者
class Investor(Observer):
def __init__(self, name: str, alert_threshold: float = 5.0):
self._name = name
self._alert_threshold = alert_threshold
def __str__(self) -> str:
return f"投资者 {self._name}"
def update(self, subject: Stock, old_price: float, new_price: float, change_percent: float) -> None:
action = "📈 上涨" if change_percent > 0 else "📉 下跌"
print(f"[{self._name}] {subject.get_symbol()} {action} {abs(change_percent):.2f}%")
print(f" 价格: ${old_price:.2f} → ${new_price:.2f}")
if abs(change_percent) >= self._alert_threshold:
print(f" 🚨 大幅波动!变化幅度超过 {self._alert_threshold}%")
print()
# 自动化交易系统
class TradingBot(Observer):
def __init__(self, bot_name: str, buy_threshold: float = -5.0, sell_threshold: float = 5.0):
self._bot_name = bot_name
self._buy_threshold = buy_threshold
self._sell_threshold = sell_threshold
def __str__(self) -> str:
return f"交易机器人 {self._bot_name}"
def update(self, subject: Stock, old_price: float, new_price: float, change_percent: float) -> None:
print(f"[{self._bot_name}] 分析 {subject.get_symbol()}:")
if change_percent <= self._buy_threshold:
print(f" 🟢 触发买入信号!跌幅 {abs(change_percent):.2f}%")
print(f" 建议买入 {subject.get_symbol()}")
elif change_percent >= self._sell_threshold:
print(f" 🔴 触发卖出信号!涨幅 {change_percent:.2f}%")
print(f" 建议卖出 {subject.get_symbol()}")
else:
print(f" ⚪ 价格变动在正常范围内")
print()
使用Python内置的观察者模式
# Python 使用内置特性实现观察者模式
class EventSystem:
"""使用Python的__call__和列表实现事件系统"""
def __init__(self):
self._events = {}
def on(self, event_name: str, callback):
"""注册事件监听器"""
if event_name not in self._events:
self._events[event_name] = []
self._events[event_name].append(callback)
return callback # 返回以便可以取消注册
def off(self, event_name: str, callback):
"""取消注册事件监听器"""
if event_name in self._events:
try:
self._events[event_name].remove(callback)
except ValueError:
pass
def emit(self, event_name: str, *args, **kwargs):
"""触发事件"""
if event_name in self._events:
for callback in self._events[event_name]:
callback(*args, **kwargs)
# 使用装饰器实现观察者模式
class Observable:
"""使用装饰器和属性实现的观察者模式"""
def __init__(self):
self._observers = []
def register(self, observer):
self._observers.append(observer)
def unregister(self, observer):
self._observers.remove(observer)
def notify_observers(self, *args, **kwargs):
for observer in self._observers:
observer(*args, **kwargs)
# 带属性的观察者装饰器
def observable_property(func):
"""装饰器:使属性变化时可以通知观察者"""
def wrapper(self, *args, **kwargs):
old_value = func.__get__(self) if hasattr(func, '__get__') else None
result = func(self, *args, **kwargs)
if hasattr(self, '_observers') and old_value != result:
self.notify_observers(func.__name__, old_value, result)
return result
return wrapper
完整使用示例
def main():
print("=" * 50)
print("天气站观察者模式案例")
print("=" * 50)
# 创建天气站
weather_station = WeatherStation()
# 创建观察者
phone_app = PhoneApp("iPhone")
display = DisplayScreen("城市广场")
alert = AlertSystem(threshold_temp=30.0)
# 注册观察者
weather_station.attach(phone_app)
weather_station.attach(display)
weather_station.attach(alert)
# 更新天气数据
print("第一次天气更新:")
weather_station.set_measurements(25.5, 60, 1013)
print("第二次天气更新(触发警报):")
weather_station.set_measurements(35.0, 45, 1008)
# 移除一个观察者
print("移除手机App观察者:")
weather_station.detach(phone_app)
print("第三次天气更新(只有显示屏和警报):")
weather_station.set_measurements(28.0, 55, 1015)
print("\n" + "=" * 50)
print("股票市场监控系统案例")
print("=" * 50)
# 创建股票
apple_stock = Stock("AAPL", 150.0)
google_stock = Stock("GOOGL", 2800.0)
# 创建投资者
investor1 = Investor("张三", alert_threshold=3.0)
investor2 = Investor("李四", alert_threshold=5.0)
trading_bot = TradingBot("AlphaBot", buy_threshold=-4.0, sell_threshold=4.0)
# 注册观察者
apple_stock.attach(investor1)
apple_stock.attach(investor2)
apple_stock.attach(trading_bot)
google_stock.attach(investor1)
# 模拟价格变动
print("AAPL 价格变动:")
apple_stock.change_price(155.0)
print("GOOGL 价格变动:")
google_stock.change_price(2700.0)
print("AAPL 大幅下跌:")
apple_stock.change_price(142.0)
print("\n" + "=" * 50)
print("事件系统案例")
print("=" * 50)
# 使用事件系统
event_system = EventSystem()
# 定义事件处理函数
def on_user_login(username):
print(f"用户 {username} 登录成功")
def on_user_logout(username):
print(f"用户 {username} 登出")
def send_email_notification(username):
print(f"发送邮件通知: {username} 登录")
# 注册事件
event_system.on('user_login', on_user_login)
event_system.on('user_login', send_email_notification)
event_system.on('user_logout', on_user_logout)
# 触发事件
event_system.emit('user_login', '张三')
event_system.emit('user_logout', '张三')
print("\n移除邮件通知后:")
event_system.off('user_login', send_email_notification)
event_system.emit('user_login', '李四')
if __name__ == "__main__":
main()
异步观察者模式(高级)
import asyncio
from typing import Set
from abc import ABC, abstractmethod
# 异步观察者模式
class AsyncObserver(ABC):
@abstractmethod
async def update(self, subject: 'AsyncSubject', *args, **kwargs) -> None:
pass
class AsyncSubject(ABC):
def __init__(self):
self._observers: Set[AsyncObserver] = set()
def attach(self, observer: AsyncObserver) -> None:
self._observers.add(observer)
def detach(self, observer: AsyncObserver) -> None:
self._observers.discard(observer)
async def notify(self, *args, **kwargs) -> None:
tasks = [observer.update(self, *args, **kwargs) for observer in self._observers]
await asyncio.gather(*tasks)
# 异步使用示例
class AsyncStock(AsyncSubject):
def __init__(self, symbol: str, initial_price: float):
super().__init__()
self.symbol = symbol
self.price = initial_price
async def change_price(self, new_price: float) -> None:
old_price = self.price
self.price = new_price
await self.notify(self.symbol, old_price, new_price)
class AsyncTrader(AsyncObserver):
def __init__(self, name: str):
self.name = name
async def update(self, subject: AsyncStock, symbol: str, old_price: float, new_price: float) -> None:
await asyncio.sleep(0.1) # 模拟异步操作
print(f"[{self.name}] 异步更新: {symbol} ${old_price:.2f} → ${new_price:.2f}")
async def async_main():
stock = AsyncStock("BTC", 50000)
trader1 = AsyncTrader("矿工")
trader2 = AsyncTrader("投资者")
stock.attach(trader1)
stock.attach(trader2)
print("异步价格更新:")
await stock.change_price(51000)
await stock.change_price(49000)
# 运行异步示例
# asyncio.run(async_main())
这个案例展示了:
- 基础实现:使用抽象类定义观察者和主题
- 实际应用:天气站、股票市场等现实场景
- Python特性:使用装饰器、属性等Python特性
- 异步支持:处理并发更新的场景
观察者模式的核心优势:
- 解耦:主题和观察者之间的松耦合
- 可扩展性:容易添加新的观察者
- 广播通信:一对多的通知机制
- 维护简单:修改一个观察者不影响其他