Python案例怎么实现观察者模式?

wen python案例 78

本文目录导读:

Python案例怎么实现观察者模式?

  1. 基础观察者模式实现
  2. 使用案例 - 股票市场监控系统
  3. 使用Python内置的观察者模式
  4. 完整使用示例
  5. 异步观察者模式(高级)

我来通过一个具体的案例来演示Python中观察者模式的实现。

基础观察者模式实现

from abc import ABC, abstractmethod
from typing import List, Any
# 观察者抽象类
class Observer(ABC):
    @abstractmethod
    def update(self, subject: 'Subject', *args, **kwargs) -> None:
        pass
# 主题抽象类
class Subject(ABC):
    @abstractmethod
    def attach(self, observer: Observer) -> None:
        pass
    @abstractmethod
    def detach(self, observer: Observer) -> None:
        pass
    @abstractmethod
    def notify(self, *args, **kwargs) -> None:
        pass
# 具体主题 - 天气站
class WeatherStation(Subject):
    def __init__(self):
        self._observers: List[Observer] = []
        self._temperature = 0
        self._humidity = 0
        self._pressure = 0
    def attach(self, observer: Observer) -> None:
        if observer not in self._observers:
            self._observers.append(observer)
    def detach(self, observer: Observer) -> None:
        try:
            self._observers.remove(observer)
        except ValueError:
            pass
    def notify(self, *args, **kwargs) -> None:
        for observer in self._observers:
            observer.update(self, *args, **kwargs)
    # 设置天气数据的方法
    def set_measurements(self, temperature: float, humidity: float, pressure: float) -> None:
        self._temperature = temperature
        self._humidity = humidity
        self._pressure = pressure
        self.measurements_changed()
    def measurements_changed(self) -> None:
        self.notify(self._temperature, self._humidity, self._pressure)
    # 获取数据的方法
    def get_temperature(self) -> float:
        return self._temperature
    def get_humidity(self) -> float:
        return self._humidity
    def get_pressure(self) -> float:
        return self._pressure
# 具体观察者 - 手机App
class PhoneApp(Observer):
    def __init__(self, name: str):
        self._name = name
    def update(self, subject: Subject, temperature: float, humidity: float, pressure: float) -> None:
        print(f"[{self._name}] 手机App更新天气信息:")
        print(f"  温度: {temperature}°C")
        print(f"  湿度: {humidity}%")
        print(f"  气压: {pressure}hPa")
        print()
# 具体观察者 - 显示屏
class DisplayScreen(Observer):
    def __init__(self, location: str):
        self._location = location
    def update(self, subject: Subject, temperature: float, humidity: float, pressure: float) -> None:
        print(f"[{self._location}] 天气显示屏更新:")
        print(f"  📍 位置: {self._location}")
        print(f"  🌡️ 温度: {temperature}°C")
        print(f"  💧 湿度: {humidity}%")
        print(f"  🌪️ 气压: {pressure}hPa")
        print()
# 具体观察者 - 警报系统
class AlertSystem(Observer):
    def __init__(self, threshold_temp: float = 35.0):
        self._threshold_temp = threshold_temp
    def update(self, subject: Subject, temperature: float, humidity: float, pressure: float) -> None:
        if temperature > self._threshold_temp:
            print(f"⚠️ 高温警报!温度达到 {temperature}°C,超过警戒线 {self._threshold_temp}°C")
        elif temperature < 0:
            print(f"❄️ 低温警报!温度降至 {temperature}°C")
        else:
            print(f"✅ 温度正常: {temperature}°C")
        print()

使用案例 - 股票市场监控系统

# 股票市场观察者模式案例
class Stock(Subject):
    def __init__(self, symbol: str, initial_price: float):
        self._symbol = symbol
        self._price = initial_price
        self._observers: List[Observer] = []
    def attach(self, observer: Observer) -> None:
        self._observers.append(observer)
        print(f"  {observer} 开始监控 {self._symbol}")
    def detach(self, observer: Observer) -> None:
        self._observers.remove(observer)
        print(f"  {observer} 停止监控 {self._symbol}")
    def notify(self, *args, **kwargs) -> None:
        for observer in self._observers:
            observer.update(self, *args, **kwargs)
    def change_price(self, new_price: float) -> None:
        old_price = self._price
        self._price = new_price
        change_percent = ((new_price - old_price) / old_price) * 100
        self.notify(old_price, new_price, change_percent)
    def get_symbol(self) -> str:
        return self._symbol
    def get_price(self) -> float:
        return self._price
# 投资者观察者
class Investor(Observer):
    def __init__(self, name: str, alert_threshold: float = 5.0):
        self._name = name
        self._alert_threshold = alert_threshold
    def __str__(self) -> str:
        return f"投资者 {self._name}"
    def update(self, subject: Stock, old_price: float, new_price: float, change_percent: float) -> None:
        action = "📈 上涨" if change_percent > 0 else "📉 下跌"
        print(f"[{self._name}] {subject.get_symbol()} {action} {abs(change_percent):.2f}%")
        print(f"  价格: ${old_price:.2f} → ${new_price:.2f}")
        if abs(change_percent) >= self._alert_threshold:
            print(f"  🚨 大幅波动!变化幅度超过 {self._alert_threshold}%")
        print()
# 自动化交易系统
class TradingBot(Observer):
    def __init__(self, bot_name: str, buy_threshold: float = -5.0, sell_threshold: float = 5.0):
        self._bot_name = bot_name
        self._buy_threshold = buy_threshold
        self._sell_threshold = sell_threshold
    def __str__(self) -> str:
        return f"交易机器人 {self._bot_name}"
    def update(self, subject: Stock, old_price: float, new_price: float, change_percent: float) -> None:
        print(f"[{self._bot_name}] 分析 {subject.get_symbol()}:")
        if change_percent <= self._buy_threshold:
            print(f"  🟢 触发买入信号!跌幅 {abs(change_percent):.2f}%")
            print(f"  建议买入 {subject.get_symbol()}")
        elif change_percent >= self._sell_threshold:
            print(f"  🔴 触发卖出信号!涨幅 {change_percent:.2f}%")
            print(f"  建议卖出 {subject.get_symbol()}")
        else:
            print(f"  ⚪ 价格变动在正常范围内")
        print()

使用Python内置的观察者模式

# Python 使用内置特性实现观察者模式
class EventSystem:
    """使用Python的__call__和列表实现事件系统"""
    def __init__(self):
        self._events = {}
    def on(self, event_name: str, callback):
        """注册事件监听器"""
        if event_name not in self._events:
            self._events[event_name] = []
        self._events[event_name].append(callback)
        return callback  # 返回以便可以取消注册
    def off(self, event_name: str, callback):
        """取消注册事件监听器"""
        if event_name in self._events:
            try:
                self._events[event_name].remove(callback)
            except ValueError:
                pass
    def emit(self, event_name: str, *args, **kwargs):
        """触发事件"""
        if event_name in self._events:
            for callback in self._events[event_name]:
                callback(*args, **kwargs)
# 使用装饰器实现观察者模式
class Observable:
    """使用装饰器和属性实现的观察者模式"""
    def __init__(self):
        self._observers = []
    def register(self, observer):
        self._observers.append(observer)
    def unregister(self, observer):
        self._observers.remove(observer)
    def notify_observers(self, *args, **kwargs):
        for observer in self._observers:
            observer(*args, **kwargs)
# 带属性的观察者装饰器
def observable_property(func):
    """装饰器:使属性变化时可以通知观察者"""
    def wrapper(self, *args, **kwargs):
        old_value = func.__get__(self) if hasattr(func, '__get__') else None
        result = func(self, *args, **kwargs)
        if hasattr(self, '_observers') and old_value != result:
            self.notify_observers(func.__name__, old_value, result)
        return result
    return wrapper

完整使用示例

def main():
    print("=" * 50)
    print("天气站观察者模式案例")
    print("=" * 50)
    # 创建天气站
    weather_station = WeatherStation()
    # 创建观察者
    phone_app = PhoneApp("iPhone")
    display = DisplayScreen("城市广场")
    alert = AlertSystem(threshold_temp=30.0)
    # 注册观察者
    weather_station.attach(phone_app)
    weather_station.attach(display)
    weather_station.attach(alert)
    # 更新天气数据
    print("第一次天气更新:")
    weather_station.set_measurements(25.5, 60, 1013)
    print("第二次天气更新(触发警报):")
    weather_station.set_measurements(35.0, 45, 1008)
    # 移除一个观察者
    print("移除手机App观察者:")
    weather_station.detach(phone_app)
    print("第三次天气更新(只有显示屏和警报):")
    weather_station.set_measurements(28.0, 55, 1015)
    print("\n" + "=" * 50)
    print("股票市场监控系统案例")
    print("=" * 50)
    # 创建股票
    apple_stock = Stock("AAPL", 150.0)
    google_stock = Stock("GOOGL", 2800.0)
    # 创建投资者
    investor1 = Investor("张三", alert_threshold=3.0)
    investor2 = Investor("李四", alert_threshold=5.0)
    trading_bot = TradingBot("AlphaBot", buy_threshold=-4.0, sell_threshold=4.0)
    # 注册观察者
    apple_stock.attach(investor1)
    apple_stock.attach(investor2)
    apple_stock.attach(trading_bot)
    google_stock.attach(investor1)
    # 模拟价格变动
    print("AAPL 价格变动:")
    apple_stock.change_price(155.0)
    print("GOOGL 价格变动:")
    google_stock.change_price(2700.0)
    print("AAPL 大幅下跌:")
    apple_stock.change_price(142.0)
    print("\n" + "=" * 50)
    print("事件系统案例")
    print("=" * 50)
    # 使用事件系统
    event_system = EventSystem()
    # 定义事件处理函数
    def on_user_login(username):
        print(f"用户 {username} 登录成功")
    def on_user_logout(username):
        print(f"用户 {username} 登出")
    def send_email_notification(username):
        print(f"发送邮件通知: {username} 登录")
    # 注册事件
    event_system.on('user_login', on_user_login)
    event_system.on('user_login', send_email_notification)
    event_system.on('user_logout', on_user_logout)
    # 触发事件
    event_system.emit('user_login', '张三')
    event_system.emit('user_logout', '张三')
    print("\n移除邮件通知后:")
    event_system.off('user_login', send_email_notification)
    event_system.emit('user_login', '李四')
if __name__ == "__main__":
    main()

异步观察者模式(高级)

import asyncio
from typing import Set
from abc import ABC, abstractmethod
# 异步观察者模式
class AsyncObserver(ABC):
    @abstractmethod
    async def update(self, subject: 'AsyncSubject', *args, **kwargs) -> None:
        pass
class AsyncSubject(ABC):
    def __init__(self):
        self._observers: Set[AsyncObserver] = set()
    def attach(self, observer: AsyncObserver) -> None:
        self._observers.add(observer)
    def detach(self, observer: AsyncObserver) -> None:
        self._observers.discard(observer)
    async def notify(self, *args, **kwargs) -> None:
        tasks = [observer.update(self, *args, **kwargs) for observer in self._observers]
        await asyncio.gather(*tasks)
# 异步使用示例
class AsyncStock(AsyncSubject):
    def __init__(self, symbol: str, initial_price: float):
        super().__init__()
        self.symbol = symbol
        self.price = initial_price
    async def change_price(self, new_price: float) -> None:
        old_price = self.price
        self.price = new_price
        await self.notify(self.symbol, old_price, new_price)
class AsyncTrader(AsyncObserver):
    def __init__(self, name: str):
        self.name = name
    async def update(self, subject: AsyncStock, symbol: str, old_price: float, new_price: float) -> None:
        await asyncio.sleep(0.1)  # 模拟异步操作
        print(f"[{self.name}] 异步更新: {symbol} ${old_price:.2f} → ${new_price:.2f}")
async def async_main():
    stock = AsyncStock("BTC", 50000)
    trader1 = AsyncTrader("矿工")
    trader2 = AsyncTrader("投资者")
    stock.attach(trader1)
    stock.attach(trader2)
    print("异步价格更新:")
    await stock.change_price(51000)
    await stock.change_price(49000)
# 运行异步示例
# asyncio.run(async_main())

这个案例展示了:

  1. 基础实现:使用抽象类定义观察者和主题
  2. 实际应用:天气站、股票市场等现实场景
  3. Python特性:使用装饰器、属性等Python特性
  4. 异步支持:处理并发更新的场景

观察者模式的核心优势:

  • 解耦:主题和观察者之间的松耦合
  • 可扩展性:容易添加新的观察者
  • 广播通信:一对多的通知机制
  • 维护简单:修改一个观察者不影响其他

抱歉,评论功能暂时关闭!