观察者模式
High Contrast
Dark Mode
Light Mode
Sepia
Forest
3 min read540 words

观察者模式

观察者模式(Observer Pattern)定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并自动更新。

问题定义

场景1:天气站

# ❌ 问题:紧密耦合
class WeatherStation:
"""天气站"""
def __init__(self):
self.temperature = 0
self.humidity = 0
self.phone_display = None
self.web_display = None
def set_measurements(self, temperature, humidity):
self.temperature = temperature
self.humidity = humidity
# 问题:直接调用所有显示设备
if self.phone_display:
self.phone_display.update(temperature, humidity)
if self.web_display:
self.web_display.update(temperature, humidity)
# 添加新显示设备需要修改这里!
class PhoneDisplay:
def update(self, temperature, humidity):
print(f"[手机] 温度: {temperature}°C, 湿度: {humidity}%")
class WebDisplay:
def update(self, temperature, humidity):
print(f"[网页] 温度: {temperature}°C, 湿度: {humidity}%")
# 问题:每次添加新显示设备都要修改WeatherStation

场景2:股票监控

# ❌ 问题:通知逻辑分散
class Stock:
"""股票"""
def __init__(self, symbol, price):
self.symbol = symbol
self.price = price
self.investors = []  # 投资者列表
def update_price(self, new_price):
self.price = new_price
# 问题:直接遍历投资者通知
for investor in self.investors:
investor.on_price_change(self.symbol, self.price)
class Investor:
def on_price_change(self, symbol, price):
print(f"投资者收到通知: {symbol} 价格变为 ${price}")

解决方案

观察者模式将主题(被观察者)和观察者解耦,主题维护观察者列表,状态变化时通知所有观察者。

classDiagram class Subject { <> +attach(Observer) +detach(Observer) +notify() } class Observer { <> +update() } class ConcreteSubject { -state -observers: Observer[] +attach(Observer) +detach(Observer) +notify() +getState() +setState() } class ConcreteObserver { -subject: Subject +update() } Subject <|-- ConcreteSubject Observer <|-- ConcreteObserver ConcreteSubject o-- Observer : has many ConcreteObserver ..> ConcreteSubject : observes

标准实现

观察者接口

from abc import ABC, abstractmethod
class Observer(ABC):
"""观察者接口"""
@abstractmethod
def update(self, temperature, humidity):
"""更新"""
pass
class Subject(ABC):
"""主题接口"""
@abstractmethod
def attach(self, observer: Observer):
"""添加观察者"""
pass
@abstractmethod
def detach(self, observer: Observer):
"""移除观察者"""
pass
@abstractmethod
def notify(self):
"""通知所有观察者"""
pass

具体主题

class WeatherStation(Subject):
"""天气站 - 主题"""
def __init__(self):
self._observers = []
self._temperature = 0
self._humidity = 0
def attach(self, observer: Observer):
"""添加观察者"""
if observer not in self._observers:
self._observers.append(observer)
def detach(self, observer: Observer):
"""移除观察者"""
if observer in self._observers:
self._observers.remove(observer)
def notify(self):
"""通知所有观察者"""
for observer in self._observers:
observer.update(self._temperature, self._humidity)
def set_measurements(self, temperature, humidity):
"""更新测量数据"""
self._temperature = temperature
self._humidity = humidity
self.notify()
@property
def temperature(self):
return self._temperature
@property
def humidity(self):
return self._humidity

具体观察者

class PhoneDisplay(Observer):
"""手机显示 - 观察者"""
def __init__(self, weather_station: WeatherStation):
self.weather_station = weather_station
self.weather_station.attach(self)
def update(self, temperature, humidity):
print(f"[手机] 温度: {temperature}°C, 湿度: {humidity}%")
def detach(self):
"""取消订阅"""
self.weather_station.detach(self)
class WebDisplay(Observer):
"""网页显示 - 观察者"""
def __init__(self, weather_station: WeatherStation):
self.weather_station = weather_station
self.weather_station.attach(self)
def update(self, temperature, humidity):
print(f"[网页] 温度: {temperature}°C, 湿度: {humidity}%")
def detach(self):
"""取消订阅"""
self.weather_station.detach(self)

客户端使用

# 创建主题
weather_station = WeatherStation()
# 创建观察者(自动注册)
phone_display = PhoneDisplay(weather_station)
web_display = WebDisplay(weather_station)
# 更新数据,通知所有观察者
print("第一次更新:")
weather_station.set_measurements(25, 60)
# [手机] 温度: 25°C, 湿度: 60%
# [网页] 温度: 25°C, 湿度: 60%
print("\n第二次更新:")
weather_station.set_measurements(28, 55)
# [手机] 温度: 28°C, 湿度: 55%
# [网页] 温度: 28°C, 湿度: 55%
print("\n取消手机显示订阅:")
phone_display.detach()
print("\n第三次更新(只有网页收到通知):")
weather_station.set_measurements(30, 50)
# [网页] 温度: 30°C, 湿度: 50%

实战应用

应用1:股票交易系统

from abc import ABC, abstractmethod
class Observer(ABC):
@abstractmethod
def update(self, symbol, price):
pass
class Subject(ABC):
@abstractmethod
def attach(self, observer: Observer):
pass
@abstractmethod
def detach(self, observer: Observer):
pass
@abstractmethod
def notify(self):
pass
class Stock(Subject):
"""股票 - 主题"""
def __init__(self, symbol, price):
self.symbol = symbol
self._price = price
self._observers = []
def attach(self, observer: Observer):
if observer not in self._observers:
self._observers.append(observer)
def detach(self, observer: Observer):
if observer in self._observers:
self._observers.remove(observer)
def notify(self):
for observer in self._observers:
observer.update(self.symbol, self._price)
@property
def price(self):
return self._price
@price.setter
def price(self, new_price):
if self._price != new_price:
self._price = new_price
self.notify()
class Investor(Observer):
"""投资者 - 观察者"""
def __init__(self, name):
self.name = name
self.stocks = {}
def subscribe(self, stock: Stock):
stock.attach(self)
self.stocks[stock.symbol] = stock
def unsubscribe(self, stock: Stock):
stock.detach(self)
del self.stocks[stock.symbol]
def update(self, symbol, price):
print(f"[{self.name}] {symbol} 价格变为 ${price}")
# 简单的交易策略
if price < 50:
print(f"[{self.name}] 买入 {symbol}!")
elif price > 100:
print(f"[{self.name}] 卖出 {symbol}!")
# 使用
apple_stock = Stock("AAPL", 95)
tesla_stock = Stock("TSLA", 200)
# 创建投资者
alice = Investor("Alice")
bob = Investor("Bob")
# Alice关注苹果和特斯拉
alice.subscribe(apple_stock)
alice.subscribe(tesla_stock)
# Bob只关注苹果
bob.subscribe(apple_stock)
# 价格变化
print("苹果股票价格上涨:")
apple_stock.price = 105
# [Alice] AAPL 价格变为 $105
# [Bob] AAPL 价格变为 $105
print("\n特斯拉股票价格下跌:")
tesla_stock.price = 45
# [Alice] TSLA 价格变为 $45
# [Alice] 买入 TSLA!
print("\nBob取消关注苹果:")
bob.unsubscribe(apple_stock)
print("\n苹果股票价格再次变化:")
apple_stock.price = 110
# [Alice] AAPL 价格变为 $110
# Bob不再收到通知

应用2:YouTube订阅系统

from abc import ABC, abstractmethod
from datetime import datetime
class Observer(ABC):
@abstractmethod
def update(self, video_title, channel):
pass
class Subject(ABC):
@abstractmethod
def attach(self, observer: Observer):
pass
@abstractmethod
def detach(self, observer: Observer):
pass
@abstractmethod
def notify(self):
pass
class YouTubeChannel(Subject):
"""YouTube频道 - 主题"""
def __init__(self, name):
self.name = name
self._subscribers = []
self._latest_video = None
def attach(self, observer: Observer):
if observer not in self._subscribers:
self._subscribers.append(observer)
print(f"✅ {observer.name} 订阅了 {self.name}")
def detach(self, observer: Observer):
if observer in self._subscribers:
self._subscribers.remove(observer)
print(f"❌ {observer.name} 取消订阅了 {self.name}")
def notify(self):
for subscriber in self._subscribers:
subscriber.update(self._latest_video, self.name)
def upload_video(self, title):
"""上传新视频"""
self._latest_video = title
print(f"\n📺 {self.name} 上传了新视频: {title}")
print(f"🕐 时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
self.notify()
class Subscriber(Observer):
"""订阅者 - 观察者"""
def __init__(self, name):
self.name = name
self._notifications = []
def update(self, video_title, channel):
notification = f"{channel} 上传了 {video_title}"
self._notifications.append(notification)
print(f"🔔 {self.name} 收到通知: {notification}")
def view_notifications(self):
print(f"\n{self.name} 的通知:")
for note in self._notifications:
print(f"  - {note}")
# 使用
channel = YouTubeChannel("Python编程频道")
# 订阅者
alice = Subscriber("Alice")
bob = Subscriber("Bob")
charlie = Subscriber("Charlie")
# 订阅
channel.attach(alice)
channel.attach(bob)
channel.attach(charlie)
# 上传视频
channel.upload_video("Python设计模式:单例模式")
channel.upload_video("Python设计模式:工厂方法模式")
# 有人取消订阅
print("\n--- 有人取消订阅 ---")
channel.detach(bob)
# 再次上传
channel.upload_video("Python设计模式:观察者模式")
# 查看通知历史
alice.view_notifications()
charlie.view_notifications()

应用3:事件驱动架构

from abc import ABC, abstractmethod
from typing import Dict, List, Callable
class EventEmitter:
"""事件发射器 - 发布者"""
def __init__(self):
self._events: Dict[str, List[Callable]] = {}
def on(self, event_name: str, callback: Callable):
"""注册事件监听器"""
if event_name not in self._events:
self._events[event_name] = []
self._events[event_name].append(callback)
print(f"✅ 注册监听器: {event_name}")
def off(self, event_name: str, callback: Callable):
"""移除事件监听器"""
if event_name in self._events:
if callback in self._events[event_name]:
self._events[event_name].remove(callback)
print(f"❌ 移除监听器: {event_name}")
def emit(self, event_name: str, *args, **kwargs):
"""触发事件"""
if event_name in self._events:
print(f"\n🔥 触发事件: {event_name}")
for callback in self._events[event_name]:
callback(*args, **kwargs)
class Application(EventEmitter):
"""应用程序"""
def __init__(self):
super().__init__()
self._running = False
def start(self):
"""启动应用"""
self._running = True
print("\n🚀 应用启动")
self.emit("app:start")
def stop(self):
"""停止应用"""
self._running = False
print("\n🛑 应用停止")
self.emit("app:stop")
def request(self, url):
"""处理请求"""
print(f"\n🌐 处理请求: {url}")
self.emit("request:received", url)
self.emit("request:completed", url, 200)
# 事件处理器
def on_app_start():
print("  → 清理缓存")
print("  → 初始化数据库")
def on_app_stop():
print("  → 保存日志")
print("  → 关闭连接")
def on_request_received(url):
print(f"  → 验证请求: {url}")
def on_request_completed(url, status_code):
print(f"  → 请求完成: {url} - {status_code}")
def log_request(url, status_code):
print(f"  → 记录日志: {url} -> {status_code}")
# 使用
app = Application()
# 注册事件处理器
app.on("app:start", on_app_start)
app.on("app:stop", on_app_stop)
app.on("request:received", on_request_received)
app.on("request:completed", on_request_completed)
app.on("request:completed", log_request)
# 运行应用
app.start()
app.request("/api/users")
app.request("/api/posts")
app.stop()

推模式 vs 拉模式

推模式(Push)

主题将数据推送给观察者:

class PushObserver(Observer):
def update(self, temperature, humidity):
# 主题直接推送数据
print(f"温度: {temperature}, 湿度: {humidity}")

拉模式(Pull)

观察者从主题拉取数据:

class PullObserver(Observer):
def __init__(self, subject: Subject):
self.subject = subject
def update(self):
# 从主题拉取数据
temp = self.subject.temperature
humidity = self.subject.humidity
print(f"温度: {temp}, 湿度: {humidity}")
特性 推模式 拉模式
数据传递 主题主动推送 观察者主动拉取
耦合度
灵活性
适用场景 数据固定 数据复杂/可选

优缺点

✅ 优点

优点 说明
松耦合 主题和观察者解耦
动态关系 运行时添加/移除观察者
广播通信 一对多通知
易于扩展 新增观察者不影响主题

❌ 缺点

缺点 说明
性能 观察者太多可能影响性能
顺序 通知顺序不可控
循环依赖 可能导致循环调用
调试困难 难以追踪通知流程

与其他模式的关系

模式 关系
中介者模式 中介者控制通知,观察者由主题直接通知
发布-订阅 发布-订阅是观察者的变体,增加消息队列

本章要点


下一步策略模式 🚀