工具调用审计日志与可观测性
在生产环境中使用 MCP,你需要回答这些问题:AI 调用了哪些工具?传入了什么参数?操作了哪些数据?发生了什么错误?本节建立完整的可观测性体系。
审计日志的三个层次
graph TD
A[可观测性层次] --> B["第一层:工具调用日志
谁调用了什么工具,传了什么参数"] A --> C["第二层:业务影响日志
工具调用影响了哪些数据,改变了什么"] A --> D["第三层:异常和性能日志
哪些调用失败,耗时多少"] B --> B1["适合:所有场景,最基础的合规要求"] C --> C1["适合:数据库写入、发送通知等有副作用的操作"] D --> D1["适合:生产环境性能优化和故障排查"]
谁调用了什么工具,传了什么参数"] A --> C["第二层:业务影响日志
工具调用影响了哪些数据,改变了什么"] A --> D["第三层:异常和性能日志
哪些调用失败,耗时多少"] B --> B1["适合:所有场景,最基础的合规要求"] C --> C1["适合:数据库写入、发送通知等有副作用的操作"] D --> D1["适合:生产环境性能优化和故障排查"]
在 MCP Server 中实现审计日志
基础版:结构化日志
# audit_logger.py
import json
import time
import datetime
from pathlib import Path
from typing import Any, Optional
class AuditLogger:
def __init__(self, log_file: str):
self.log_file = Path(log_file)
self.log_file.parent.mkdir(parents=True, exist_ok=True)
def log_tool_call(
self,
tool_name: str,
arguments: dict,
result: Any,
duration_ms: float,
error: Optional[str] = None,
session_id: Optional[str] = None
):
entry = {
"timestamp": datetime.datetime.utcnow().isoformat() + "Z",
"session_id": session_id,
"tool": tool_name,
"arguments": self._sanitize_args(arguments),
"success": error is None,
"duration_ms": round(duration_ms, 2),
"error": error,
"result_size": len(str(result)) if result else 0,
}
with open(self.log_file, "a", encoding="utf-8") as f:
f.write(json.dumps(entry, ensure_ascii=False) + "\n")
def _sanitize_args(self, args: dict) -> dict:
"""过滤敏感参数,避免密码、token 记录到日志"""
SENSITIVE_KEYS = {"password", "token", "secret", "api_key", "authorization"}
return {
k: "***" if k.lower() in SENSITIVE_KEYS else v
for k, v in args.items()
}
# 在 MCP Server 中使用
audit = AuditLogger("/var/log/mcp-audit/tools.jsonl")
@app.call_tool()
async def call_tool(name: str, arguments: dict):
start_time = time.time()
error = None
result = None
try:
result = await execute_tool(name, arguments)
return result
except Exception as e:
error = str(e)
raise
finally:
duration_ms = (time.time() - start_time) * 1000
audit.log_tool_call(
tool_name=name,
arguments=arguments,
result=result,
duration_ms=duration_ms,
error=error
)
日志格式示例
{"timestamp":"2026-03-22T09:15:23.456Z","session_id":null,"tool":"read_file","arguments":{"path":"/Users/user/ai-workspace/report.md"},"success":true,"duration_ms":12.34,"error":null,"result_size":4567}
{"timestamp":"2026-03-22T09:15:45.789Z","session_id":null,"tool":"write_file","arguments":{"path":"/Users/user/ai-workspace/output/summary.md","content":"***"},"success":true,"duration_ms":5.67,"error":null,"result_size":28}
{"timestamp":"2026-03-22T09:16:02.123Z","session_id":null,"tool":"read_query","arguments":{"sql":"SELECT * FROM orders WHERE date > '2026-03-01'"},"success":false,"duration_ms":234.56,"error":"database is locked","result_size":0}
业务影响追踪
对于有副作用的操作,记录"操作了什么"而非仅"调用了什么":
# 数据库写操作:记录影响的行数和具体变更
async def log_db_write(operation: str, table: str, affected_rows: int, sample_ids: list):
await audit.log_business_impact({
"operation": operation, # "INSERT" / "UPDATE" / "DELETE"
"table": table,
"affected_rows": affected_rows,
"sample_ids": sample_ids[:5] # 只记录前5个ID,不记录完整数据
})
# 文件写入:记录文件路径和大小变化
async def log_file_write(path: str, old_size: Optional[int], new_size: int):
await audit.log_business_impact({
"operation": "write_file",
"path": str(Path(path).relative_to(Path.home())), # 相对路径,避免泄露绝对路径
"old_size_bytes": old_size,
"new_size_bytes": new_size,
"is_new_file": old_size is None
})
日志分析:常见查询
使用 jq 快速分析 JSONL 格式的审计日志:
# 查看今天失败的工具调用
cat tools.jsonl | jq 'select(.success == false and (.timestamp | startswith("2026-03-22")))'
# 统计各工具调用次数
cat tools.jsonl | jq -r '.tool' | sort | uniq -c | sort -rn
# 找出耗时超过1秒的调用
cat tools.jsonl | jq 'select(.duration_ms > 1000)'
# 统计每个工具的平均耗时
cat tools.jsonl | jq -r '[.tool, .duration_ms] | @csv' | \
awk -F',' '{sum[$1]+=$2; count[$1]++} END {for(k in sum) print sum[k]/count[k], k}' | \
sort -rn
性能监控
跟踪工具调用的性能趋势,及时发现退化:
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List
@dataclass
class ToolMetrics:
call_count: int = 0
error_count: int = 0
total_duration_ms: float = 0.0
durations: List[float] = field(default_factory=list)
@property
def error_rate(self) -> float:
return self.error_count / self.call_count if self.call_count > 0 else 0
@property
def avg_duration_ms(self) -> float:
return self.total_duration_ms / self.call_count if self.call_count > 0 else 0
@property
def p95_duration_ms(self) -> float:
if not self.durations:
return 0
sorted_d = sorted(self.durations)
idx = int(len(sorted_d) * 0.95)
return sorted_d[idx]
# 内存中的实时指标
metrics: Dict[str, ToolMetrics] = defaultdict(ToolMetrics)
def update_metrics(tool_name: str, duration_ms: float, success: bool):
m = metrics[tool_name]
m.call_count += 1
m.total_duration_ms += duration_ms
m.durations.append(duration_ms)
if not success:
m.error_count += 1
# 告警:错误率超过10%
if m.call_count >= 10 and m.error_rate > 0.1:
print(f"⚠️ 告警:{tool_name} 错误率 {m.error_rate:.1%}", file=__import__('sys').stderr)
告警规则建议
| 指标 | 告警阈值 | 通知方式 |
|---|---|---|
| 工具错误率 | > 10%(最近100次) | 日志告警 |
| 单次调用耗时 | > 30 秒 | 立即记录 |
| 破坏性工具调用 | 每次执行 | 写入专用告警日志 |
| 日志文件大小 | > 100 MB | 触发轮转 |
日志轮转与保留策略
# /etc/logrotate.d/mcp-audit
/var/log/mcp-audit/*.jsonl {
daily # 每天轮转
rotate 90 # 保留90天
compress # gzip 压缩
delaycompress
missingok # 文件不存在不报错
notifempty # 空文件不轮转
create 640 youruser yourgroup
}
本节执行清单
- [ ] 在 MCP Server 中实现基础的工具调用日志(timestamp、tool、args、duration、error)
- [ ] 对 arguments 中的敏感字段(password、token)做脱敏处理
- [ ] 定期分析日志:识别高频工具、高错误率工具、性能瓶颈
- [ ] 生产环境设置日志轮转,避免磁盘占满
下一节:生产环境部署、版本管理与监控