Webhook 与多渠道统一入口
High Contrast
Dark Mode
Light Mode
Sepia
Forest
2 min read342 words

Webhook 与多渠道统一入口

Telegram、WhatsApp、网页聊天——维护三套代码是噩梦。用统一消息路由层,一套 Claude 逻辑同时服务所有渠道。

多渠道统一架构

graph TD TG[Telegram Webhook] --> ROUTER[统一消息路由器] WA[WhatsApp Webhook] --> ROUTER WEB[网页 WebSocket] --> ROUTER SLACK[Slack Event] --> ROUTER ROUTER --> NORMALIZE[消息标准化] NORMALIZE --> SESSION[会话管理] SESSION --> CLAUDE[Claude API] CLAUDE --> RESPONSE[回复生成] RESPONSE --> DISPATCH[渠道分发器] DISPATCH --> TG_OUT[Telegram 回复] DISPATCH --> WA_OUT[WhatsApp 回复] DISPATCH --> WEB_OUT[网页回复] DISPATCH --> SLACK_OUT[Slack 回复] style ROUTER fill:#c8e6c9,stroke:#388e3c,stroke-width:2px style CLAUDE fill:#e3f2fd,stroke:#1565c0,stroke-width:2px

统一消息路由实现

"""
多渠道统一 AI 消息网关
支持: Telegram / WhatsApp / Web / Slack
核心思路: 标准化消息格式 → 统一 Claude 处理 → 渠道适配回复
"""
import os
from dataclasses import dataclass, field
from enum import Enum
from collections import defaultdict
import anthropic
class Channel(Enum):
TELEGRAM = "telegram"
WHATSAPP = "whatsapp"
WEB = "web"
SLACK = "slack"
class MessageRole(Enum):
USER = "user"
ASSISTANT = "assistant"
SYSTEM = "system"
@dataclass
class UnifiedMessage:
"""跨渠道标准化消息格式"""
channel: Channel
channel_user_id: str      # 渠道内的用户标识
display_name: str
text: str
has_image: bool = False
image_url: str = ""
raw_payload: dict = field(default_factory=dict)
@property
def session_key(self) -> str:
"""全局唯一会话键"""
return f"{self.channel.value}:{self.channel_user_id}"
@dataclass
class UnifiedReply:
"""标准化回复格式"""
text: str
channel: Channel
target_id: str            # 渠道内的回复目标(chat_id / phone / user_id)
buttons: list[str] = field(default_factory=list)   # 快速回复按钮
parse_mode: str = "markdown"                        # text / markdown / html
@dataclass
class ChannelSession:
"""用户会话(跨渠道共享)"""
session_key: str
user_name: str
messages: list[dict] = field(default_factory=list)
channel_history: list[Channel] = field(default_factory=list)  # 用过哪些渠道
def add(self, role: str, content: str):
self.messages.append({"role": role, "content": content})
if len(self.messages) > 20:
self.messages = self.messages[-20:]
class ChannelAdapter:
"""渠道适配器基类"""
def parse(self, raw: dict) -> UnifiedMessage | None:
raise NotImplementedError
def format_reply(self, reply: UnifiedReply) -> dict:
raise NotImplementedError
class TelegramAdapter(ChannelAdapter):
"""Telegram 消息适配"""
def parse(self, raw: dict) -> UnifiedMessage | None:
msg = raw.get("message", {})
if not msg:
return None
user = msg.get("from", {})
return UnifiedMessage(
channel=Channel.TELEGRAM,
channel_user_id=str(user.get("id", "")),
display_name=user.get("first_name", "用户"),
text=msg.get("text", ""),
has_image="photo" in msg,
raw_payload=raw,
)
def format_reply(self, reply: UnifiedReply) -> dict:
payload = {
"chat_id": reply.target_id,
"text": reply.text,
"parse_mode": "Markdown" if reply.parse_mode == "markdown" else "HTML",
}
if reply.buttons:
payload["reply_markup"] = {
"keyboard": [[{"text": btn}] for btn in reply.buttons],
"resize_keyboard": True,
"one_time_keyboard": True,
}
return payload
class WhatsAppAdapter(ChannelAdapter):
"""WhatsApp Business API 消息适配"""
def parse(self, raw: dict) -> UnifiedMessage | None:
try:
value = raw["entry"][0]["changes"][0]["value"]
msg = value["messages"][0]
contact = value["contacts"][0]
text = msg.get("text", {}).get("body", "")
if msg.get("type") == "interactive":
text = msg["interactive"]["button_reply"]["title"]
return UnifiedMessage(
channel=Channel.WHATSAPP,
channel_user_id=msg["from"],
display_name=contact["profile"]["name"],
text=text,
has_image=msg.get("type") == "image",
raw_payload=raw,
)
except (KeyError, IndexError):
return None
def format_reply(self, reply: UnifiedReply) -> dict:
if reply.buttons:
return {
"messaging_product": "whatsapp",
"to": reply.target_id,
"type": "interactive",
"interactive": {
"type": "button",
"body": {"text": reply.text},
"action": {
"buttons": [
{"type": "reply", "reply": {"id": f"b{i}", "title": b}}
for i, b in enumerate(reply.buttons[:3])
]
},
},
}
return {
"messaging_product": "whatsapp",
"to": reply.target_id,
"type": "text",
"text": {"body": reply.text},
}
class SlackAdapter(ChannelAdapter):
"""Slack Event API 消息适配"""
def parse(self, raw: dict) -> UnifiedMessage | None:
event = raw.get("event", {})
if event.get("type") != "message" or event.get("bot_id"):
return None
return UnifiedMessage(
channel=Channel.SLACK,
channel_user_id=event.get("user", ""),
display_name=event.get("username", "Slack用户"),
text=event.get("text", ""),
raw_payload=raw,
)
def format_reply(self, reply: UnifiedReply) -> dict:
blocks = [{"type": "section", "text": {"type": "mrkdwn", "text": reply.text}}]
if reply.buttons:
blocks.append({
"type": "actions",
"elements": [
{"type": "button", "text": {"type": "plain_text", "text": btn}, "value": btn}
for btn in reply.buttons
],
})
return {"channel": reply.target_id, "blocks": blocks}
class UnifiedMessageGateway:
"""统一消息网关 — 核心路由引擎"""
SYSTEM_PROMPT = """你是一个全渠道智能助手,通过 Telegram/WhatsApp/Web 与用户沟通。
回复要求:
- 简洁:每条消息 150 字以内
- 适应渠道:WhatsApp 用表情符号,Slack 用 @mention,Web 可以更详细
- 中英混用时,跟随用户语言
"""
def __init__(self, anthropic_api_key: str):
self.claude = anthropic.Anthropic(api_key=anthropic_api_key)
self.sessions: dict[str, ChannelSession] = defaultdict(
lambda: ChannelSession(session_key="", user_name="")
)
self.adapters: dict[Channel, ChannelAdapter] = {
Channel.TELEGRAM: TelegramAdapter(),
Channel.WHATSAPP: WhatsAppAdapter(),
Channel.SLACK: SlackAdapter(),
}
def route(self, channel: Channel, raw_payload: dict) -> UnifiedReply | None:
"""主路由方法:原始 Webhook → 标准回复"""
adapter = self.adapters.get(channel)
if not adapter:
return None
msg = adapter.parse(raw_payload)
if not msg or not msg.text:
return None
# 获取/创建会话
session = self.sessions[msg.session_key]
session.session_key = msg.session_key
session.user_name = msg.display_name
if channel not in session.channel_history:
session.channel_history.append(channel)
# 系统提示加入渠道信息
system = self.SYSTEM_PROMPT + f"\n当前渠道: {channel.value}"
# 调用 Claude
session.add("user", msg.text)
try:
response = self.claude.messages.create(
model="claude-haiku-4-5-20251001",
max_tokens=300,
system=system,
messages=session.messages,
)
reply_text = response.content[0].text
session.add("assistant", reply_text)
except anthropic.APIError:
reply_text = "系统暂时繁忙,请稍后重试。"
return UnifiedReply(
text=reply_text,
channel=channel,
target_id=msg.channel_user_id,
)
def get_formatted_reply(self, channel: Channel, raw_payload: dict) -> dict | None:
"""返回适合该渠道的格式化回复"""
reply = self.route(channel, raw_payload)
if not reply:
return None
adapter = self.adapters.get(channel)
return adapter.format_reply(reply) if adapter else None
# 演示
def demo():
gateway = UnifiedMessageGateway(
anthropic_api_key=os.environ.get("ANTHROPIC_API_KEY", "demo"),
)
print("=== 多渠道统一网关演示 ===\n")
# 模拟不同渠道同一用户的消息
test_cases = [
(Channel.TELEGRAM, {
"message": {
"from": {"id": 12345, "first_name": "Alice"},
"text": "Python 中怎么读取 JSON 文件?",
}
}),
(Channel.WHATSAPP, {
"entry": [{"changes": [{"value": {
"messages": [{"from": "60123456789", "type": "text",
"text": {"body": "How to read JSON in Python?"},
"timestamp": "1711180800"}],
"contacts": [{"profile": {"name": "Alice"}}],
}}]}]
}),
(Channel.SLACK, {
"event": {
"type": "message",
"user": "U12345",
"username": "Alice",
"text": "Python JSON 读取方法",
}
}),
]
for channel, payload in test_cases:
reply = gateway.route(channel, payload)
if reply:
adapter = gateway.adapters[channel]
formatted = adapter.format_reply(reply)
print(f"[{channel.value.upper()}]")
print(f"  回复文本: {reply.text[:80]}{'...' if len(reply.text) > 80 else ''}")
print(f"  格式化结构: {list(formatted.keys())}")
print()
# 展示会话跨渠道情况
print("=== 跨渠道会话追踪 ===")
for key, session in gateway.sessions.items():
print(f"用户: {session.user_name}")
print(f"  使用渠道: {[c.value for c in session.channel_history]}")
print(f"  消息条数: {len(session.messages)}")
demo()

渠道特性对比

特性 Telegram WhatsApp Slack Web Chat
按钮/交互 InlineKeyboard Interactive Buttons Block Kit 自定义 UI
消息长度限制 4096 字符 4096 字符 3000 字符 无限制
图片/文件 ✅ 支持 ✅ 支持 ✅ 支持 ✅ 支持
Webhook 验证 Bot Token HMAC-SHA256 Signing Secret 自定义
月活用户 9 亿 20 亿 3200 万企业 按网站流量

行动清单

下一章07-Gemini-CLI与OpenCode-CLI/01-Gemini-CLI安装与工作流 — 用 Gemini CLI 补充 Claude Code 的能力短板。