Appendix E.6: DeepSeek API Monitoring Script
Overview
The DeepSeek API monitoring script periodically queries the DeepSeek generation engine for specific content, helping engineers track how often a brand, product, or keyword appears in DeepSeek's answers, in what context, and how that changes over time. The script can run as part of a CI/CD pipeline or as a standalone scheduled job.
Core Features
- Batch-query a predefined list of keywords/questions
- Parse the cited sources returned by the DeepSeek API
- Record citation content, ranking position, and timestamps
- Produce structured monitoring data (JSON/CSV)
- Support anomaly detection and alert triggering
Technology Choices
| Component | Recommended option | Notes |
|---|---|---|
| Language | Python 3.9+ | Rich ecosystem, well suited to API calls and data processing |
| HTTP client | httpx / aiohttp | Async concurrency for higher throughput (see the sketch below) |
| Data store | SQLite / PostgreSQL | Lightweight local storage or a production-grade database |
| Scheduler | cron / APScheduler | System-level or in-process scheduled jobs |
| Alerting | DingTalk / Feishu / email webhook | Immediate notification on anomalies |
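The main script below uses the synchronous httpx client; the sketch that follows shows how the async client mentioned in the table could batch keyword queries concurrently. The payload shape mirrors the main script, but the file name and the concurrency cap of 5 are illustrative assumptions, not DeepSeek requirements.
# async_batch.py -- a minimal sketch, not part of the main script
import asyncio
import os
import httpx

async def query_one(client: httpx.AsyncClient, sem: asyncio.Semaphore, keyword: str) -> str:
    payload = {
        "model": "deepseek-chat",
        "messages": [{"role": "user", "content": keyword}],
    }
    async with sem:  # cap in-flight requests to stay under rate limits
        resp = await client.post("/chat/completions", json=payload)
        resp.raise_for_status()
        return resp.json()["choices"][0]["message"]["content"]

async def query_all(keywords: list) -> list:
    sem = asyncio.Semaphore(5)  # assumed safe concurrency; tune against real limits
    async with httpx.AsyncClient(
        base_url="https://api.deepseek.com/v1",
        headers={"Authorization": f"Bearer {os.getenv('DEEPSEEK_API_KEY', '')}"},
        timeout=30,
    ) as client:
        return await asyncio.gather(*(query_one(client, sem, kw) for kw in keywords))

if __name__ == "__main__":
    print(asyncio.run(query_all(["example keyword"])))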
Script Implementation
1. Basic Configuration Module
# config.py
import os
from dataclasses import dataclass, field
from typing import List

@dataclass
class MonitorConfig:
    # DeepSeek API settings
    api_key: str = os.getenv("DEEPSEEK_API_KEY", "")
    api_base_url: str = "https://api.deepseek.com/v1"
    model: str = "deepseek-chat"  # or deepseek-reasoner
    # Monitoring targets (mutable defaults need default_factory, not None)
    target_keywords: List[str] = field(default_factory=list)
    target_domains: List[str] = field(default_factory=list)
    target_brands: List[str] = field(default_factory=list)
    # Monitoring parameters
    check_interval_minutes: int = 60
    max_retries: int = 3
    timeout_seconds: int = 30
    # Output settings
    output_dir: str = "./monitor_data"
    db_path: str = "./monitor.db"
    # Alerting settings
    alert_webhook_url: str = os.getenv("ALERT_WEBHOOK_URL", "")
    alert_threshold_drop: float = 0.5  # alert when the reference rate drops by 50%

# Default configuration instance
default_config = MonitorConfig(
    target_keywords=["your product name", "your brand keyword", "core business term"],
    target_domains=["yourdomain.com", "your-competitor.com"],
    target_brands=["YourBrand", "CompetitorBrand"]
)
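requirements.txt (listed under Deployment below) includes python-dotenv, but nothing in the script actually calls it. A minimal sketch of wiring it in follows; note that the os.getenv defaults in MonitorConfig are evaluated when config.py is imported, so load_dotenv() has to run before that import. The file name load_env.py is an assumption.
# load_env.py -- a sketch, assuming secrets live in a local .env file
from dotenv import load_dotenv

load_dotenv()  # copy .env entries (DEEPSEEK_API_KEY, ALERT_WEBHOOK_URL) into os.environ

from config import MonitorConfig  # imported after load_dotenv on purpose

config = MonitorConfig(
    target_keywords=["your product name"],
    target_domains=["yourdomain.com"],
    target_brands=["YourBrand"],
)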
2. Core Monitoring Engine
# deepseek_monitor.py
import json
import re
import time
import logging
from datetime import datetime
from typing import Dict, List, Optional
import httpx
from dataclasses import dataclass, asdict

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class QueryResult:
    """Result of a single query"""
    query: str
    timestamp: str
    response_text: str
    references: List[Dict]
    brand_mentions: List[str]
    domain_mentions: List[str]
    answer_length: int
    model: str
    latency_ms: float

class DeepSeekMonitor:
    def __init__(self, config):
        self.config = config
        self.client = httpx.Client(
            base_url=config.api_base_url,
            timeout=config.timeout_seconds,
            headers={
                "Authorization": f"Bearer {config.api_key}",
                "Content-Type": "application/json"
            }
        )

    def query_deepseek(self, question: str) -> Optional[Dict]:
        """Send a single query to DeepSeek"""
        payload = {
            "model": self.config.model,
            "messages": [
                {"role": "user", "content": question}
            ],
            "temperature": 0.3,  # low temperature for more consistent answers
            "max_tokens": 1024
        }
        for attempt in range(self.config.max_retries):
            try:
                start_time = time.time()
                response = self.client.post("/chat/completions", json=payload)
                latency = (time.time() - start_time) * 1000
                if response.status_code == 200:
                    data = response.json()
                    return {
                        "response": data["choices"][0]["message"]["content"],
                        "latency_ms": latency,
                        "model": data["model"]
                    }
                else:
                    logger.warning(f"API error: {response.status_code} - {response.text}")
            except httpx.TimeoutException:
                logger.warning(f"Request timed out (attempt {attempt+1}/{self.config.max_retries})")
                time.sleep(2 ** attempt)  # exponential backoff
            except Exception as e:
                logger.error(f"Request failed: {str(e)}")
        return None

    def extract_references(self, text: str) -> List[Dict]:
        """Extract cited sources from an answer"""
        references = []
        # Format 1: markdown links, [title](url)
        refs1 = re.findall(r'\[([^\]]+)\]\(([^)]+)\)', text)
        for title, url in refs1:
            references.append({
                "type": "markdown_link",
                "title": title.strip(),
                "url": url.strip()
            })
        # Format 2: plain "来源: url" ("Source: url") lines
        refs2 = re.findall(r'来源[::]\s*(https?://[^\s,,。]+)', text)
        for url in refs2:
            references.append({
                "type": "plain_url",
                "url": url.strip()
            })
        # Format 3: "根据「site name」..." ("According to 「site name」...")
        refs3 = re.findall(r'根据\s*[「「]([^」」]+)[」」]', text)
        for source in refs3:
            references.append({
                "type": "source_mention",
                "source": source.strip()
            })
        return references

    def check_brand_mentions(self, text: str) -> List[str]:
        """Detect brand mentions"""
        mentions = []
        for brand in self.config.target_brands:
            if brand.lower() in text.lower():
                mentions.append(brand)
        return mentions

    def check_domain_mentions(self, text: str) -> List[str]:
        """Detect domain mentions"""
        mentions = []
        for domain in self.config.target_domains:
            if domain.lower() in text.lower():
                mentions.append(domain)
        return mentions

    def run_single_query(self, question: str) -> Optional[QueryResult]:
        """Run one query and parse the result"""
        result = self.query_deepseek(question)
        if not result:
            return None
        response_text = result["response"]
        references = self.extract_references(response_text)
        brand_mentions = self.check_brand_mentions(response_text)
        domain_mentions = self.check_domain_mentions(response_text)
        return QueryResult(
            query=question,
            timestamp=datetime.now().isoformat(),
            response_text=response_text[:500],  # keep only the first 500 characters
            references=references,
            brand_mentions=brand_mentions,
            domain_mentions=domain_mentions,
            answer_length=len(response_text),
            model=result["model"],
            latency_ms=result["latency_ms"]
        )

    def run_monitoring_cycle(self) -> List[QueryResult]:
        """Run one full monitoring pass"""
        results = []
        for keyword in self.config.target_keywords:
            logger.info(f"Querying keyword: {keyword}")
            result = self.run_single_query(keyword)
            if result:
                results.append(result)
                logger.info(f"  references: {len(result.references)}, "
                            f"brand mentions: {result.brand_mentions}")
            time.sleep(2)  # pause between requests to avoid rate limiting
        return results
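Because extract_references is purely regex-based, it is worth a quick offline check before a real run. The snippet below feeds it a synthetic answer (invented text, not real API output) that exercises all three citation formats; no network call is made because query_deepseek is never invoked.
# check_extractor.py -- a sketch; the sample answer and expected types are illustrative
from config import default_config
from deepseek_monitor import DeepSeekMonitor

monitor = DeepSeekMonitor(default_config)
sample = (
    "[官方文档](https://example.com/docs) 给出了参数说明。"
    "来源: https://example.com/blog ,另见根据「行业报告」的分析。"
)
refs = monitor.extract_references(sample)
assert {r["type"] for r in refs} == {"markdown_link", "plain_url", "source_mention"}
print(refs)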
3. Data Persistence Module
# storage.py
import sqlite3
import json
from datetime import datetime
from typing import List
class MonitorStorage:
def __init__(self, db_path: str):
self.db_path = db_path
self.init_database()
def init_database(self):
"""初始化数据库表结构"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS query_results (
id INTEGER PRIMARY KEY AUTOINCREMENT,
query TEXT NOT NULL,
timestamp TEXT NOT NULL,
response_text TEXT,
references_json TEXT,
brand_mentions_json TEXT,
domain_mentions_json TEXT,
answer_length INTEGER,
model TEXT,
latency_ms REAL
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS daily_stats (
date TEXT PRIMARY KEY,
total_queries INTEGER,
total_references INTEGER,
brand_mention_count INTEGER,
domain_mention_count INTEGER,
avg_latency_ms REAL
)
""")
conn.commit()
conn.close()
def save_result(self, result):
"""保存单次查询结果"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
INSERT INTO query_results
(query, timestamp, response_text, references_json,
brand_mentions_json, domain_mentions_json,
answer_length, model, latency_ms)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
result.query,
result.timestamp,
result.response_text,
json.dumps(result.references, ensure_ascii=False),
json.dumps(result.brand_mentions, ensure_ascii=False),
json.dumps(result.domain_mentions, ensure_ascii=False),
result.answer_length,
result.model,
result.latency_ms
))
conn.commit()
conn.close()
    def get_latest_results(self, query: str, limit: int = 10) -> List:
        """Fetch the most recent results for a given query"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("""
            SELECT query, timestamp, references_json, brand_mentions_json
            FROM query_results
            WHERE query = ?
            ORDER BY timestamp DESC
            LIMIT ?
        """, (query, limit))
        rows = cursor.fetchall()
        conn.close()
        return [
            {
                "query": row[0],  # included so AlertManager.check_anomalies can match on it
                "timestamp": row[1],
                "references": json.loads(row[2]),
                "brand_mentions": json.loads(row[3])
            }
            for row in rows
        ]
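init_database creates a daily_stats table that nothing above ever fills. One way to populate it is a once-a-day rollup like the sketch below, which assumes the bundled SQLite provides the JSON1 functions (built in since SQLite 3.38 and present in most recent Python builds).
# daily_rollup.py -- a sketch of aggregating query_results into daily_stats
import sqlite3

def rollup_daily_stats(db_path: str, date: str) -> None:
    """Aggregate one ISO date (YYYY-MM-DD) of query_results into daily_stats."""
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    # ISO timestamps start with the date, so a prefix match selects one day
    cursor.execute("""
        SELECT COUNT(*),
               COALESCE(SUM(json_array_length(references_json)), 0),
               COALESCE(SUM(json_array_length(brand_mentions_json)), 0),
               COALESCE(SUM(json_array_length(domain_mentions_json)), 0),
               COALESCE(AVG(latency_ms), 0)
        FROM query_results
        WHERE timestamp LIKE ? || '%'
    """, (date,))
    row = cursor.fetchone()
    cursor.execute("""
        INSERT OR REPLACE INTO daily_stats
        (date, total_queries, total_references, brand_mention_count,
         domain_mention_count, avg_latency_ms)
        VALUES (?, ?, ?, ?, ?, ?)
    """, (date, *row))
    conn.commit()
    conn.close()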
4. Alerting and Notification Module
# alerting.py
import json
import logging
from typing import List, Dict
import httpx
logger = logging.getLogger(__name__)
class AlertManager:
def __init__(self, webhook_url: str):
self.webhook_url = webhook_url
def send_dingtalk_alert(self, title: str, content: str):
"""发送钉钉告警"""
payload = {
"msgtype": "markdown",
"markdown": {
"title": title,
"text": f"## {title}\n\n{content}"
}
}
self._send_webhook(payload)
def send_feishu_alert(self, title: str, content: str):
"""发送飞书告警"""
payload = {
"msg_type": "interactive",
"card": {
"header": {
"title": {"tag": "plain_text", "content": title}
},
"elements": [
{"tag": "markdown", "content": content}
]
}
}
self._send_webhook(payload)
def _send_webhook(self, payload: Dict):
"""发送Webhook请求"""
try:
response = httpx.post(
self.webhook_url,
json=payload,
timeout=10
)
if response.status_code != 200:
logger.error(f"告警发送失败: {response.status_code}")
except Exception as e:
logger.error(f"告警发送异常: {str(e)}")
    def check_anomalies(self, current_results: List, historical_data: List,
                        threshold: float = 0.5) -> List[str]:
        """Detect anomalies and build alert messages"""
        alerts = []
        # Detect a sharp drop in reference count
        for result in current_results:
            query = result.query
            ref_count = len(result.references)
            # Average reference count over the historical window
            historical_refs = [
                len(h["references"])
                for h in historical_data
                if h["query"] == query
            ]
            if historical_refs:
                avg_refs = sum(historical_refs) / len(historical_refs)
                if avg_refs > 0 and ref_count / avg_refs < threshold:
                    alerts.append(
                        f"⚠️ Reference drop: '{query}' fell from {avg_refs:.1f} references to {ref_count}"
                    )
        # Detect brand mentions disappearing
        for result in current_results:
            if not result.brand_mentions:
                # Check whether the brand appeared historically
                for h in historical_data:
                    if h["query"] == result.query and h["brand_mentions"]:
                        alerts.append(
                            f"🔴 Brand mention lost: {h['brand_mentions']} no longer appears for '{result.query}'"
                        )
                        break
        return alerts
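The selection table lists email alongside the DingTalk and Feishu webhooks, but AlertManager only implements the latter two. A sketch of an SMTP channel follows; the host, port, and credential environment variable names are placeholder assumptions.
# email_alert.py -- a sketch; adjust host/port/credentials to your mail provider
import os
import smtplib
from email.message import EmailMessage

def send_email_alert(title: str, content: str, to_addr: str) -> None:
    msg = EmailMessage()
    msg["Subject"] = title
    msg["From"] = os.getenv("ALERT_SMTP_FROM", "monitor@example.com")
    msg["To"] = to_addr
    msg.set_content(content)
    # SMTP over SSL on port 465 is one common setup
    with smtplib.SMTP_SSL(os.getenv("ALERT_SMTP_HOST", "smtp.example.com"), 465) as smtp:
        smtp.login(os.getenv("ALERT_SMTP_USER", ""), os.getenv("ALERT_SMTP_PASS", ""))
        smtp.send_message(msg)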
5. Main Entry Point
# main.py
import argparse
import logging
from datetime import datetime
from pathlib import Path
from config import default_config, MonitorConfig
from deepseek_monitor import DeepSeekMonitor
from storage import MonitorStorage
from alerting import AlertManager
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def run_monitor(config: MonitorConfig):
    """Run one full monitoring pass"""
    # Initialize components
    monitor = DeepSeekMonitor(config)
    storage = MonitorStorage(config.db_path)
    alert_manager = AlertManager(config.alert_webhook_url)
    # Run the monitoring cycle
    logger.info(f"Starting monitoring cycle: {datetime.now().isoformat()}")
    results = monitor.run_monitoring_cycle()
    # Persist results
    for result in results:
        storage.save_result(result)
    # Anomaly detection (note: the history window includes the result just saved)
    for result in results:
        historical = storage.get_latest_results(result.query, limit=20)
        alerts = alert_manager.check_anomalies(
            [result], historical, threshold=config.alert_threshold_drop
        )
        for alert in alerts:
            logger.warning(alert)
            if config.alert_webhook_url:
                alert_manager.send_dingtalk_alert(
                    "DeepSeek citation monitoring alert",
                    alert
                )
    # Log a summary
    logger.info(f"Cycle complete: {len(results)} queries")
    for r in results:
        logger.info(f"  {r.query}: {len(r.references)} references, "
                    f"brand mentions: {r.brand_mentions}")
    return results
def export_to_csv(results, output_path: str):
    """Export results to CSV"""
    import csv
    with open(output_path, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow([
            "query", "timestamp", "reference_count", "brand_mentions",
            "domain_mentions", "answer_length", "latency_ms"
        ])
for r in results:
writer.writerow([
r.query,
r.timestamp,
len(r.references),
",".join(r.brand_mentions),
",".join(r.domain_mentions),
r.answer_length,
f"{r.latency_ms:.1f}"
])
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="DeepSeek API monitoring script")
    parser.add_argument("--config", type=str, help="path to a JSON config file")
    parser.add_argument("--export-csv", type=str, help="path for the CSV export")
    parser.add_argument("--once", action="store_true", help="run a single cycle and exit")
    args = parser.parse_args()
    config = default_config
    if args.config:
        # Load settings from a JSON config file
        import json
        with open(args.config, 'r') as f:
            config_data = json.load(f)
        config = MonitorConfig(**config_data)
    results = run_monitor(config)
    if args.export_csv:
        export_to_csv(results, args.export_csv)
        logger.info(f"Results exported to: {args.export_csv}")
    if not args.once:
        # Continuous mode
        import time
        while True:
            logger.info(f"Waiting {config.check_interval_minutes} minutes until the next cycle...")
            time.sleep(config.check_interval_minutes * 60)
            run_monitor(config)
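requirements.txt pins apscheduler, yet main.py schedules with a bare while/sleep loop. The sketch below shows the APScheduler alternative the selection table suggests; a BlockingScheduler with an interval trigger is one straightforward setup, and the file name scheduler.py is an assumption.
# scheduler.py -- a sketch of replacing the while/sleep loop with APScheduler
from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler

from config import default_config
from main import run_monitor

scheduler = BlockingScheduler()
scheduler.add_job(
    run_monitor,
    "interval",
    minutes=default_config.check_interval_minutes,
    args=[default_config],
    next_run_time=datetime.now(),  # fire once immediately, then on the interval
)

if __name__ == "__main__":
    scheduler.start()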
Deployment and Usage
Docker Compose Deployment
# docker-compose.yml
version: '3.8'
services:
deepseek-monitor:
build: .
environment:
- DEEPSEEK_API_KEY=${DEEPSEEK_API_KEY}
- ALERT_WEBHOOK_URL=${ALERT_WEBHOOK_URL}
volumes:
- ./data:/app/data
- ./config.json:/app/config.json
restart: unless-stopped
# Dockerfile
FROM python:3.10-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "main.py"]
# requirements.txt
httpx>=0.24.0
apscheduler>=3.10.0
pandas>=1.5.0
python-dotenv>=1.0.0
Example Configuration File
{
"api_key": "sk-your-deepseek-api-key",
"target_keywords": [
"你的产品名称 怎么样",
"你的品牌 推荐",
"如何解决 [你的领域问题]"
],
"target_domains": ["yourdomain.com"],
"target_brands": ["YourBrand"],
"check_interval_minutes": 120,
"alert_webhook_url": "https://oapi.dingtalk.com/robot/send?access_token=xxx"
}
Run Commands
# Single run
python main.py --once --export-csv ./report.csv
# Continuous monitoring
python main.py --config ./config.json
# Docker deployment
docker-compose up -d
Best Practices
- Keyword selection: prefer long-tail questions that combine the brand and product name, e.g. "is brand XX worth buying"
- Rate control: keep API calls to roughly 20 per hour or fewer to avoid rate limiting
- Benchmarking: monitor competitor keywords in parallel to establish an industry baseline
- Alert thresholds: a 30% drop in reference rate warrants attention; a 50% drop should trigger an immediate alert
- Visualization: chart reference trends in Grafana for long-term analysis
Notes
- The DeepSeek API may enforce rate limits; consult the official documentation
- Results shift as the model is updated, so record the model version with every result (the script stores the model field for this reason)
- Use a dedicated API key so monitoring traffic cannot interfere with production calls
- Prune historical data periodically to keep the database from growing unbounded; a sketch follows below
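A minimal sketch of that cleanup, assuming a 90-day retention window (the number is an arbitrary default, not a recommendation from this appendix):
# prune.py -- a sketch of the periodic cleanup suggested above
import sqlite3
from datetime import datetime, timedelta

def prune_history(db_path: str, keep_days: int = 90) -> int:
    """Delete query_results rows older than the retention window; returns the count."""
    cutoff = (datetime.now() - timedelta(days=keep_days)).isoformat()
    conn = sqlite3.connect(db_path)
    # ISO-8601 timestamps compare correctly as strings
    deleted = conn.execute(
        "DELETE FROM query_results WHERE timestamp < ?", (cutoff,)
    ).rowcount
    conn.commit()
    conn.execute("VACUUM")  # reclaim file space after the bulk delete
    conn.close()
    return deleted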
