附录L:中文GEO监控脚本集(豆包/DeepSeek API示例)
L.1 概述
本附录提供一套可直接用于监控豆包(Doubao)和 DeepSeek 生成式引擎中内容引用情况的中文脚本集。这些脚本旨在帮助全栈工程师快速搭建自动化监控系统,追踪品牌、关键词或特定内容在生成式答案中的出现频率、位置和变化趋势。
L.2 环境要求
- Python 3.8+
- 依赖库:requests、json、time、logging、smtplib(可选,用于邮件告警)、schedule(脚本三的定时调度所需)
- 安装命令:
pip install requests schedule
L.3 脚本一:豆包答案变化追踪脚本
此脚本通过模拟用户查询,捕获豆包(基于字节跳动云雀模型)的生成式回答,并记录特定目标(如品牌名、URL)是否出现在答案中。
# doubao_monitor.py
import requests
import json
import time
import logging
from datetime import datetime

# Configure root logging: timestamped INFO-level messages.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Doubao API endpoint (example only; replace with the official or proxy endpoint)
# and the caller's API key.
DOUBAO_API_URL = "https://api.doubao.com/v1/chat/completions"
DOUBAO_API_KEY = "YOUR_DOUBAO_API_KEY"

# Monitoring configuration: keywords searched for in generated answers, and
# the queries sent to the model. Bracketed placeholders should be replaced
# with your own brand/product/industry terms before running.
MONITOR_KEYWORDS = ["你的品牌名", "你的产品名", "yourdomain.com"]
QUERIES = [
    "推荐最好的[你的行业]产品",
    "[你的产品名] 怎么样",
    "如何解决[你的问题]"
]
def query_doubao(prompt):
    """Send one chat query to the Doubao API and return the parsed response.

    Args:
        prompt: User query text, sent as a single chat message.

    Returns:
        The decoded JSON response dict, or None when the request fails or
        the response body is not valid JSON (errors are logged, not raised).
    """
    headers = {
        "Authorization": f"Bearer {DOUBAO_API_KEY}",
        "Content-Type": "application/json"
    }
    payload = {
        "model": "doubao-pro-32k",
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.7
    }
    try:
        response = requests.post(DOUBAO_API_URL, headers=headers, json=payload, timeout=30)
        response.raise_for_status()
        return response.json()
    # Catch only the expected failure modes (network/HTTP errors and a
    # malformed JSON body) instead of a blanket Exception, so genuine
    # programming errors still surface.
    except (requests.RequestException, ValueError) as e:
        logging.error(f"豆包API请求失败: {e}")
        return None
def check_mention(text, keywords):
    """Return the subset of *keywords* present in *text*, case-insensitively.

    Keywords are returned in their original order and casing.
    """
    haystack = text.lower()
    return [kw for kw in keywords if kw.lower() in haystack]
def run_monitor():
    """Run one monitoring pass over QUERIES and collect per-query records.

    Returns:
        List of result dicts (timestamp, query, matched keywords, answer
        length and a truncated copy of the answer text).
    """
    collected = []
    for question in QUERIES:
        logging.info(f"查询: {question}")
        reply = query_doubao(question)
        if reply and 'choices' in reply:
            answer = reply['choices'][0]['message']['content']
            hits = check_mention(answer, MONITOR_KEYWORDS)
            collected.append({
                "timestamp": datetime.now().isoformat(),
                "query": question,
                "mentioned_keywords": hits,
                "answer_length": len(answer),
                # Keep only the first 500 characters for later analysis.
                "full_answer": answer[:500]
            })
            if hits:
                logging.info(f"发现提及: {hits}")
            else:
                logging.info("未发现提及")
        # Throttle between queries to stay under rate limits.
        time.sleep(2)
    return collected
def save_results(results, filename="doubao_monitor_results.json"):
    """Append monitoring records to *filename* as JSON Lines.

    Each record is written as one UTF-8 JSON object per line, so repeated
    runs accumulate history in the same file.

    Args:
        results: Iterable of JSON-serializable dicts.
        filename: Target file; created if missing, appended to otherwise.
    """
    try:
        with open(filename, 'a', encoding='utf-8') as f:
            for r in results:
                f.write(json.dumps(r, ensure_ascii=False) + '\n')
        # Bug fix: the message previously contained no placeholder; report
        # the actual target file name.
        logging.info(f"结果已保存到 {filename}")
    except Exception as e:
        logging.error(f"保存结果失败: {e}")
# Script entry point: run one monitoring pass and persist the results.
if __name__ == "__main__":
    logging.info("开始豆包监控...")
    results = run_monitor()
    save_results(results)
    logging.info("监控完成")
L.4 脚本二:DeepSeek API监控脚本
此脚本针对 DeepSeek 的 API 设计,利用其联网搜索能力,监控特定内容在生成式答案中的引用情况,并评估信源分级。
# deepseek_monitor.py
import requests
import json
import time
import logging
from datetime import datetime

# Configure root logging: timestamped INFO-level messages.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# DeepSeek API configuration (replace the key with your own).
DEEPSEEK_API_URL = "https://api.deepseek.com/v1/chat/completions"
DEEPSEEK_API_KEY = "YOUR_DEEPSEEK_API_KEY"

# Monitoring targets: the domain/brand whose citations we track, plus the
# queries sent to the model (bracketed placeholders should be customised).
TARGET_DOMAIN = "yourdomain.com"
TARGET_BRAND = "你的品牌名"
QUERIES = [
    "请搜索并总结关于[你的行业]的最佳实践",
    "查找[你的产品名]的详细评测",
    "谁在[你的领域]提供最可靠的解决方案"
]

# Source grading rules (mimics DeepSeek's A/B/C source tiers).
# NOTE(review): this table is illustrative only — evaluate_source_level()
# below grades by URL features and never reads these labels.
SOURCE_LEVELS = {
    "A": ["政府网站", "学术论文", "官方文档"],
    "B": ["权威媒体", "行业报告", "知名博客"],
    "C": ["普通网站", "论坛", "社交媒体"]
}
def query_deepseek(prompt, enable_search=True):
    """Send one chat query to the DeepSeek API, optionally with web search.

    Args:
        prompt: User query text, sent after a fixed system prompt that asks
            the model to cite its sources.
        enable_search: Forwarded as the "enable_search" payload field.
            NOTE(review): this is not a standard OpenAI-style field —
            confirm against the current DeepSeek API documentation.

    Returns:
        The decoded JSON response dict, or None when the request fails or
        the response body is not valid JSON (errors are logged, not raised).
    """
    headers = {
        "Authorization": f"Bearer {DEEPSEEK_API_KEY}",
        "Content-Type": "application/json"
    }
    payload = {
        "model": "deepseek-chat",
        "messages": [
            {"role": "system", "content": "你是一个搜索助手,请基于搜索结果提供准确答案,并注明信息来源。"},
            {"role": "user", "content": prompt}
        ],
        "temperature": 0.3,
        "enable_search": enable_search
    }
    try:
        response = requests.post(DEEPSEEK_API_URL, headers=headers, json=payload, timeout=60)
        response.raise_for_status()
        return response.json()
    # Catch only expected failures (network/HTTP errors, malformed JSON)
    # instead of a blanket Exception, so programming errors still surface.
    except (requests.RequestException, ValueError) as e:
        logging.error(f"DeepSeek API请求失败: {e}")
        return None
def extract_sources(response):
    """Extract cited source entries from a chat-completion response.

    Scans the answer text for common citation formats and returns a list of
    {"text": ..., "url": ...} dicts, empty when nothing matches or the
    response is missing/malformed.
    """
    import re

    if not response or 'choices' not in response:
        return []
    body = response['choices'][0]['message']['content']
    # Recognised citation formats: Markdown links, Chinese "来源:" markers,
    # and numbered list items followed by a URL.
    citation_patterns = (
        r'\[([^\]]+)\]\(([^)]+)\)',
        r'来源[::]\s*(\S+)',
        r'(\d+)\.\s*(https?://\S+)',
    )
    found = []
    for pattern in citation_patterns:
        for hit in re.findall(pattern, body):
            if isinstance(hit, tuple):
                found.append({"text": hit[0], "url": hit[1]})
            else:
                # Single-group patterns yield plain strings: the match is
                # both the label and the URL.
                found.append({"text": hit, "url": hit})
    return found
def check_domain_in_sources(sources, target_domain):
    """Return True when *target_domain* appears in any cited source URL.

    Comparison is case-insensitive; entries without a "url" key are skipped.
    """
    needle = target_domain.lower()
    return any(needle in entry.get('url', '').lower() for entry in sources)
def evaluate_source_level(source_url):
    """Heuristically grade a source URL into tier "A", "B" or "C".

    A: Chinese government/academic domains; B: common commercial TLDs;
    C: everything else. Checks are case-insensitive substring matches,
    with tier A taking precedence.
    """
    url = source_url.lower()
    tier_a = ('.gov.cn', '.edu.cn', '.ac.cn')
    tier_b = ('.com', '.org', '.cn')
    if any(marker in url for marker in tier_a):
        return "A"
    if any(marker in url for marker in tier_b):
        return "B"
    return "C"
def run_monitor():
    """Run one DeepSeek monitoring pass over QUERIES and collect records.

    Returns:
        List of per-query result dicts: timestamp, mention flags for the
        target domain/brand, extracted sources with heuristic levels, and
        a truncated answer preview.
    """
    results = []
    for query in QUERIES:
        logging.info(f"查询: {query}")
        response = query_deepseek(query)
        # Bug fix: also verify the 'choices' key exists (consistent with
        # the Doubao script); a truthy error payload without 'choices'
        # previously raised KeyError here.
        if response and 'choices' in response:
            answer_text = response['choices'][0]['message']['content']
            sources = extract_sources(response)
            domain_found = check_domain_in_sources(sources, TARGET_DOMAIN)
            brand_found = TARGET_BRAND.lower() in answer_text.lower()
            # Grade every extracted source URL into A/B/C tiers.
            source_levels = [evaluate_source_level(s.get('url', '')) for s in sources]
            result = {
                "timestamp": datetime.now().isoformat(),
                "query": query,
                "domain_found": domain_found,
                "brand_found": brand_found,
                "source_count": len(sources),
                "source_levels": source_levels,
                "sources": sources[:5],  # keep at most the first 5 sources
                "answer_preview": answer_text[:500]
            }
            results.append(result)
            if domain_found:
                logging.info(f"目标域名 {TARGET_DOMAIN} 被引用")
            if brand_found:
                logging.info(f"目标品牌 {TARGET_BRAND} 被提及")
        # Throttle between queries to avoid rate limiting.
        time.sleep(3)
    return results
def save_results(results, filename="deepseek_monitor_results.json"):
    """Append monitoring records to *filename* as JSON Lines.

    Each record is written as one UTF-8 JSON object per line, so repeated
    runs accumulate history in the same file.

    Args:
        results: Iterable of JSON-serializable dicts.
        filename: Target file; created if missing, appended to otherwise.
    """
    try:
        with open(filename, 'a', encoding='utf-8') as f:
            for r in results:
                f.write(json.dumps(r, ensure_ascii=False) + '\n')
        # Bug fix: the message previously contained no placeholder; report
        # the actual target file name.
        logging.info(f"结果已保存到 {filename}")
    except Exception as e:
        logging.error(f"保存结果失败: {e}")
def generate_report(results):
    """Build and log a plain-text summary of one monitoring run.

    Args:
        results: List of result dicts produced by run_monitor(); each must
            carry 'domain_found', 'brand_found' and 'source_count'.

    Returns:
        The report string. Bug fix: an empty result list now yields a short
        notice instead of raising ZeroDivisionError.
    """
    total_queries = len(results)
    if total_queries == 0:
        report = "===== DeepSeek监控报告 =====\n无监控结果\n============================"
        logging.info(report)
        return report
    domain_mentions = sum(1 for r in results if r['domain_found'])
    brand_mentions = sum(1 for r in results if r['brand_found'])
    report = f"""
===== DeepSeek监控报告 =====
时间: {datetime.now().isoformat()}
查询总数: {total_queries}
域名被引用次数: {domain_mentions} ({domain_mentions/total_queries*100:.1f}%)
品牌被提及次数: {brand_mentions} ({brand_mentions/total_queries*100:.1f}%)
平均来源数量: {sum(r['source_count'] for r in results)/total_queries:.1f}
============================
"""
    logging.info(report)
    return report
# Script entry point: run one pass, persist results, and print a summary.
if __name__ == "__main__":
    logging.info("开始DeepSeek监控...")
    results = run_monitor()
    save_results(results)
    report = generate_report(results)
    # Optional: forward the report to email or a messaging platform.
    print(report)
L.5 脚本三:定时调度与告警集成
将上述脚本与定时任务和告警系统集成,实现自动化监控。
# scheduler_monitor.py
import schedule  # NOTE: third-party package ("pip install schedule")
import time
import logging
from datetime import datetime
import subprocess
import json

# Configure root logging: timestamped INFO-level messages.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Alert configuration (example: Feishu/Lark incoming webhook).
ALERT_WEBHOOK_URL = "https://open.feishu.cn/open-apis/bot/v2/hook/YOUR_WEBHOOK_ID"
# Trigger an alert when the citation rate drops below 50%.
ALERT_THRESHOLD = 0.5
def run_doubao_monitor():
    """Run the Doubao monitoring script as a subprocess and log its output.

    Best-effort: a non-zero exit code or an exception is logged, never
    raised, so the scheduler loop keeps running.
    """
    import sys  # local import keeps this appendix script self-contained
    logging.info("执行豆包监控任务...")
    try:
        # Bug fix: use the current interpreter (sys.executable) instead of
        # relying on a "python" binary being on PATH (venv/py3-only hosts).
        result = subprocess.run(
            [sys.executable, 'doubao_monitor.py'],
            capture_output=True, text=True,
        )
        logging.info(f"豆包监控完成: {result.stdout}")
        if result.returncode != 0:
            logging.error(f"豆包监控错误: {result.stderr}")
    except Exception as e:
        logging.error(f"豆包监控异常: {e}")
def run_deepseek_monitor():
    """Run the DeepSeek monitoring script as a subprocess and log its output.

    Best-effort: a non-zero exit code or an exception is logged, never
    raised, so the scheduler loop keeps running.
    """
    import sys  # local import keeps this appendix script self-contained
    logging.info("执行DeepSeek监控任务...")
    try:
        # Bug fix: use the current interpreter (sys.executable) instead of
        # relying on a "python" binary being on PATH (venv/py3-only hosts).
        result = subprocess.run(
            [sys.executable, 'deepseek_monitor.py'],
            capture_output=True, text=True,
        )
        logging.info(f"DeepSeek监控完成: {result.stdout}")
        if result.returncode != 0:
            logging.error(f"DeepSeek监控错误: {result.stderr}")
    except Exception as e:
        logging.error(f"DeepSeek监控异常: {e}")
def check_alerts():
    """Inspect recent DeepSeek results and warn when the citation rate drops.

    Reads the last 10 JSON Lines records from deepseek_monitor_results.json
    and logs a warning (optionally posting a webhook message) when the
    domain citation rate falls below ALERT_THRESHOLD. Best-effort: all
    errors (missing file, bad JSON, missing keys) are logged, never raised.
    """
    try:
        # Bug fix: read with the same UTF-8 encoding the monitor scripts
        # write with; the platform default encoding broke on Windows.
        with open('deepseek_monitor_results.json', 'r', encoding='utf-8') as f:
            lines = f.readlines()
        if lines:
            # Only the most recent 10 runs feed the alert decision.
            last_results = [json.loads(line) for line in lines[-10:]]
            domain_mention_rate = sum(1 for r in last_results if r['domain_found']) / len(last_results)
            if domain_mention_rate < ALERT_THRESHOLD:
                alert_message = {
                    "msg_type": "text",
                    "content": {
                        "text": f"⚠️ GEO监控告警:DeepSeek中域名引用率降至 {domain_mention_rate:.1%},低于阈值 {ALERT_THRESHOLD:.0%}"
                    }
                }
                # Example delivery to a Feishu webhook (left disabled):
                # requests.post(ALERT_WEBHOOK_URL, json=alert_message)
                logging.warning(f"触发告警:引用率 {domain_mention_rate:.1%}")
    except Exception as e:
        logging.error(f"告警检查异常: {e}")
# Schedule configuration: hourly monitoring runs plus a daily alert check.
schedule.every().hour.do(run_doubao_monitor)  # Doubao check every hour
schedule.every().hour.do(run_deepseek_monitor)  # DeepSeek check every hour
schedule.every().day.at("09:00").do(check_alerts)  # alert check daily at 09:00

if __name__ == "__main__":
    logging.info("启动定时监控调度器...")
    # Run both monitors once immediately at startup.
    run_doubao_monitor()
    run_deepseek_monitor()
    # Main loop (never exits): poll the pending-task queue once per minute.
    while True:
        schedule.run_pending()
        time.sleep(60)
L.6 使用说明
获取API密钥:
- 豆包:通过字节跳动开放平台申请
- DeepSeek:通过DeepSeek开发者平台申请
配置监控目标:
- 修改脚本中的 MONITOR_KEYWORDS、TARGET_DOMAIN、TARGET_BRAND
- 自定义 QUERIES 列表以匹配你的业务场景
运行方式:
# 单次运行
python doubao_monitor.py
python deepseek_monitor.py
# 定时运行
python scheduler_monitor.py
结果分析:
- 监控结果以 JSON Lines 格式保存(每行一个独立的 JSON 对象),便于后续分析
- 可使用 jq 或 Python 脚本进行趋势分析
- 示例分析命令(逐行读取,因此过滤器是 .domain_found 而非 .[].domain_found):
cat deepseek_monitor_results.json | jq '.domain_found' | sort | uniq -c
L.7 扩展建议
- 多模型对比:扩展脚本支持同时查询多个生成引擎(如文心一言、通义千问)
- 可视化仪表盘:将结果输出到 Prometheus/Grafana 或自建 Web 界面
- 语义相似度分析:集成
sentence-transformers库,比较答案内容与目标内容的语义相似度 - 异常检测:基于历史数据建立基线,自动检测引用率的异常波动
本附录脚本需根据实际API文档进行调整,API端点、参数和认证方式可能随平台更新而变化。
