附录E:全栈代码示例库
E.7 豆包答案变化追踪脚本
本脚本用于监控豆包(Doubao)生成式引擎对特定查询的答案变化,帮助您追踪内容在豆包中的表现,并检测答案的更新频率与内容漂移。
脚本功能概述
- 定时查询:定期向豆包API发送预设问题。
- 答案捕获:获取豆包返回的完整文本答案。
- 变化检测:通过文本相似度算法对比新旧答案(本示例使用 Python 标准库 difflib.SequenceMatcher;也可按需替换为余弦相似度、编辑距离等算法)。
- 告警通知:当答案发生显著变化时,通过邮件、钉钉或飞书发送告警。
- 数据存储:将每次查询的答案、时间戳、相似度分数存入本地数据库或日志文件。
技术栈
- 语言:Python 3.8+
- 依赖库:requests(调用豆包API及钉钉/飞书Webhook),json,time,hashlib,sqlite3,logging,difflib,smtplib(邮件告警)
- 数据库:SQLite(轻量级,适合本地运行)或 MySQL/PostgreSQL(生产环境)
完整代码示例
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
豆包答案变化追踪脚本
监控豆包生成式引擎对特定查询的答案变化,并触发告警。
"""
import requests
import json
import time
import hashlib
import sqlite3
import logging
from datetime import datetime
from difflib import SequenceMatcher
# ==================== 配置区域 ====================
# --- Doubao API ---
# Both values are placeholders and must be replaced before running.
DOUBAO_API_URL = "https://api.doubao.com/v1/chat/completions"  # example URL
DOUBAO_API_KEY = "your_doubao_api_key_here"  # your API key

# --- Monitoring ---
# Queries whose answers are tracked.
QUERIES = [
    "什么是GEO优化?",
    "2025年SEO趋势有哪些?",
    "如何提高网站在豆包中的引用率?",
]

# Seconds to wait between polling rounds (at least 5 minutes is recommended).
QUERY_INTERVAL = 5 * 60

# An answer whose similarity to the previous one falls below this value is
# treated as a significant change.
SIMILARITY_THRESHOLD = 0.8

# --- Storage ---
DB_PATH = "doubao_answer_tracker.db"

# --- Alerting (optional): e-mail ---
ALERT_EMAIL_ENABLED = False
ALERT_SMTP_SERVER = "smtp.example.com"
ALERT_SMTP_PORT = 587
ALERT_EMAIL_SENDER = "your_email@example.com"
ALERT_EMAIL_PASSWORD = "your_email_password"
ALERT_EMAIL_RECEIVER = "receiver@example.com"

# --- Alerting (optional): DingTalk / Feishu webhook ---
ALERT_WEBHOOK_ENABLED = True
ALERT_WEBHOOK_URL = "https://oapi.dingtalk.com/robot/send?access_token=your_token"
# ==================== 日志配置 ====================
# Log to a file and to the console. The file handler is opened explicitly as
# UTF-8 because the log messages contain Chinese text; without an explicit
# encoding the platform default is used, which may be unable to encode them.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("doubao_tracker.log", encoding="utf-8"),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)
# ==================== 数据库操作 ====================
def init_database():
    """Create the ``answer_history`` table and its lookup index if missing.

    Opens the SQLite database at ``DB_PATH`` and issues idempotent
    ``CREATE ... IF NOT EXISTS`` statements, so it is safe to call on every
    start-up.

    Raises:
        sqlite3.Error: If the database cannot be opened or a statement fails.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS answer_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                query TEXT NOT NULL,
                answer TEXT NOT NULL,
                answer_hash TEXT NOT NULL,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                similarity REAL DEFAULT 1.0
            )
        ''')
        cursor.execute('''
            CREATE INDEX IF NOT EXISTS idx_query_timestamp
            ON answer_history (query, timestamp)
        ''')
        conn.commit()
    finally:
        # Close even when a statement raises, so the handle is never leaked.
        conn.close()
    logger.info("数据库初始化完成")
def save_answer(query, answer, similarity=1.0):
    """Insert one answer snapshot into ``answer_history``.

    Args:
        query: The question that was sent to Doubao.
        answer: The answer text that was returned.
        similarity: Similarity score against the previously stored answer;
            defaults to 1.0 (used for the first snapshot of a query).

    Raises:
        sqlite3.Error: If the insert fails (e.g. the table does not exist).
    """
    # MD5 is used only as a cheap change fingerprint, not for security.
    answer_hash = hashlib.md5(answer.encode('utf-8')).hexdigest()
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute('''
            INSERT INTO answer_history (query, answer, answer_hash, similarity)
            VALUES (?, ?, ?, ?)
        ''', (query, answer, answer_hash, similarity))
        conn.commit()
    finally:
        # Close even when the insert raises, so the handle is never leaked.
        conn.close()
    logger.info(f"答案已保存: query='{query}', hash={answer_hash}")
def get_latest_answer(query):
    """Return the most recently stored answer for *query*.

    Args:
        query: The question text to look up.

    Returns:
        An ``(answer, answer_hash)`` tuple, or ``None`` when no answer has
        been stored for this query yet.

    Raises:
        sqlite3.Error: If the lookup fails.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        # The timestamp column is filled by CURRENT_TIMESTAMP, which has
        # one-second resolution; the autoincrement id breaks ties so the
        # newest row wins even when two rows share a timestamp.
        cursor.execute('''
            SELECT answer, answer_hash FROM answer_history
            WHERE query = ?
            ORDER BY timestamp DESC, id DESC
            LIMIT 1
        ''', (query,))
        return cursor.fetchone()
    finally:
        conn.close()
def calculate_similarity(text1, text2):
    """Return the similarity of two texts as a float in ``[0.0, 1.0]``.

    Uses ``difflib.SequenceMatcher.ratio()`` on the character sequences.

    Args:
        text1: The old text.
        text2: The new text.

    Returns:
        1.0 for identical texts, 0.0 when nothing matches.
    """
    # autojunk must be disabled: with the default heuristic, once the second
    # text reaches 200 characters every character making up more than about 1%
    # of it is ignored when seeding matches. In natural-language answers that
    # covers most characters, so the ratio collapses and causes false alerts.
    return SequenceMatcher(None, text1, text2, autojunk=False).ratio()
# ==================== 豆包API调用 ====================
def query_doubao(query):
    """Send *query* to the Doubao chat-completions API and return the answer.

    Args:
        query: The question text, sent as a single user message.

    Returns:
        The answer text, or ``None`` when the request fails or the response
        cannot be parsed. Failures are logged and never raised.
    """
    headers = {
        "Authorization": f"Bearer {DOUBAO_API_KEY}",
        "Content-Type": "application/json"
    }
    payload = {
        "model": "doubao-pro-32k",  # adjust to the model actually in use
        "messages": [
            {"role": "user", "content": query}
        ],
        "temperature": 0.7,
        "max_tokens": 1024
    }
    try:
        response = requests.post(DOUBAO_API_URL, headers=headers, json=payload, timeout=30)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        logger.error(f"API请求失败: {e}")
        return None

    # Parsing is handled separately from transport errors. ValueError covers a
    # body that is not valid JSON; TypeError covers an unexpected payload
    # shape (e.g. ``choices`` being null, or the top level being a list).
    try:
        data = response.json()
        answer = data['choices'][0]['message']['content']
    except (ValueError, KeyError, IndexError, TypeError) as e:
        logger.error(f"API响应解析失败: {e}")
        return None

    # Callers hash and diff the answer as text, so reject anything else here.
    if not isinstance(answer, str):
        logger.error(f"API响应解析失败: content 不是字符串 ({type(answer).__name__})")
        return None
    return answer
# ==================== 告警功能 ====================
def send_email_alert(query, old_answer, new_answer, similarity):
    """Send an e-mail alert describing a significant answer change.

    Does nothing when ``ALERT_EMAIL_ENABLED`` is false. Delivery is
    best-effort: any failure is logged and swallowed so that alerting can
    never stop the monitoring loop.

    Args:
        query: The monitored question.
        old_answer: The previously stored answer text.
        new_answer: The newly returned answer text.
        similarity: Similarity score between the two answers.
    """
    if not ALERT_EMAIL_ENABLED:
        return
    import smtplib
    from email.mime.text import MIMEText

    subject = f"[豆包答案变化] 查询: {query}"
    body = (
        f"\n查询: {query}\n"
        f"旧答案摘要: {old_answer[:200]}...\n"
        f"新答案摘要: {new_answer[:200]}...\n"
        f"相似度: {similarity:.2f}\n"
        f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
    )
    msg = MIMEText(body, 'plain', 'utf-8')
    msg['Subject'] = subject
    msg['From'] = ALERT_EMAIL_SENDER
    msg['To'] = ALERT_EMAIL_RECEIVER
    try:
        # The context manager closes the connection even when starttls/login/
        # sendmail raises. The timeout keeps an unresponsive SMTP server from
        # blocking the monitoring loop indefinitely.
        with smtplib.SMTP(ALERT_SMTP_SERVER, ALERT_SMTP_PORT, timeout=30) as server:
            server.starttls()
            server.login(ALERT_EMAIL_SENDER, ALERT_EMAIL_PASSWORD)
            server.sendmail(ALERT_EMAIL_SENDER, [ALERT_EMAIL_RECEIVER], msg.as_string())
        logger.info(f"邮件告警已发送: {subject}")
    except Exception as e:
        # Deliberately broad: alert delivery is a best-effort boundary.
        logger.error(f"邮件发送失败: {e}")
def send_webhook_alert(query, old_answer, new_answer, similarity):
    """Post a text alert about a changed answer to the configured webhook.

    No-op when ``ALERT_WEBHOOK_ENABLED`` is false. Failures are logged and
    swallowed; nothing is raised to the caller.

    Args:
        query: The monitored question.
        old_answer: The previously stored answer text.
        new_answer: The newly returned answer text.
        similarity: Similarity score between the two answers.
    """
    if ALERT_WEBHOOK_ENABLED:
        content = f"⚠️ 豆包答案变化告警\n\n查询: {query}\n旧答案摘要: {old_answer[:200]}...\n新答案摘要: {new_answer[:200]}...\n相似度: {similarity:.2f}\n时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
        payload = {"msgtype": "text", "text": {"content": content}}
        try:
            resp = requests.post(ALERT_WEBHOOK_URL, json=payload, timeout=10)
            resp.raise_for_status()
            logger.info(f"Webhook告警已发送: {query}")
        except Exception as err:
            logger.error(f"Webhook发送失败: {err}")
# ==================== 主监控循环 ====================
def _check_query(query):
    """Query Doubao once for *query*, store the answer, alert on a big change."""
    logger.info(f"正在查询: {query}")
    new_answer = query_doubao(query)
    if new_answer is None:
        logger.warning(f"查询失败,跳过: {query}")
        return

    latest = get_latest_answer(query)
    if latest is None:
        # First snapshot for this query: nothing to compare against.
        save_answer(query, new_answer)
        logger.info(f"首次保存答案: {query}")
        return

    old_answer, old_hash = latest
    new_hash = hashlib.md5(new_answer.encode('utf-8')).hexdigest()
    if old_hash == new_hash:
        logger.info(f"答案未变化: query='{query}'")
        return

    # The text differs: record the new snapshot with its similarity score.
    similarity = calculate_similarity(old_answer, new_answer)
    save_answer(query, new_answer, similarity)
    logger.info(f"答案变化: query='{query}', similarity={similarity:.4f}")
    if similarity < SIMILARITY_THRESHOLD:
        logger.warning(f"答案显著变化: query='{query}', similarity={similarity:.4f}")
        send_email_alert(query, old_answer, new_answer, similarity)
        send_webhook_alert(query, old_answer, new_answer, similarity)


def monitor_queries():
    """Run the monitoring loop until the process is interrupted.

    Initialises the database, then polls every entry of ``QUERIES`` and
    sleeps ``QUERY_INTERVAL`` seconds between rounds. This function never
    returns normally.
    """
    logger.info("豆包答案变化追踪脚本启动")
    init_database()
    while True:
        for query in QUERIES:
            try:
                _check_query(query)
            except sqlite3.Error:
                # A database error on one query (e.g. a locked file) must not
                # stop monitoring of the remaining queries or later rounds.
                logger.exception(f"处理查询时发生数据库错误,已跳过: {query}")
        # Wait for the next polling round.
        logger.info(f"等待 {QUERY_INTERVAL} 秒后再次查询...")
        time.sleep(QUERY_INTERVAL)
# ==================== 辅助功能 ====================
def query_history(query, limit=10):
    """Return the newest stored answers for *query*.

    Args:
        query: The question text to look up.
        limit: Maximum number of rows to return (default 10).

    Returns:
        A list of ``(answer, similarity, timestamp)`` tuples, newest first;
        empty when nothing is stored for this query.

    Raises:
        sqlite3.Error: If the lookup fails.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        # id breaks ties between rows that share a one-second timestamp, so
        # the newest-first order is deterministic.
        cursor.execute('''
            SELECT answer, similarity, timestamp
            FROM answer_history
            WHERE query = ?
            ORDER BY timestamp DESC, id DESC
            LIMIT ?
        ''', (query, limit))
        return cursor.fetchall()
    finally:
        conn.close()
def export_to_json(filepath="doubao_answers_export.json"):
    """Export the whole answer history to a UTF-8 JSON file.

    The file contains a list of objects with the keys ``query``, ``answer``,
    ``similarity`` and ``timestamp``, newest first.

    Args:
        filepath: Destination path; an existing file is overwritten.

    Raises:
        sqlite3.Error: If reading the history fails.
        OSError: If the destination file cannot be written.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        # id breaks ties between rows that share a one-second timestamp.
        cursor.execute('''
            SELECT query, answer, similarity, timestamp
            FROM answer_history
            ORDER BY timestamp DESC, id DESC
        ''')
        rows = cursor.fetchall()
    finally:
        conn.close()

    data = [
        {
            "query": query,
            "answer": answer,
            "similarity": similarity,
            "timestamp": timestamp,
        }
        for query, answer, similarity, timestamp in rows
    ]
    with open(filepath, 'w', encoding='utf-8') as f:
        # ensure_ascii=False keeps the Chinese text readable in the file.
        json.dump(data, f, ensure_ascii=False, indent=2)
    logger.info(f"数据已导出到 {filepath}")
# ==================== 入口 ====================
if __name__ == "__main__":
    # Command-line entry point.
    #   (no argument)            start the endless monitoring loop
    #   history [query] [limit]  print the newest stored answers for a query
    #   export                   dump the whole history to JSON
    #   once                     query every configured question once and store it
    import sys

    usage = "用法: python doubao_tracker.py [history|export|once]"
    if len(sys.argv) > 1:
        command = sys.argv[1]
        if command in ("history", "export", "once"):
            # These commands touch the table directly; create it first so they
            # also work on a fresh database instead of failing with
            # "no such table".
            init_database()
        if command == "history":
            query = sys.argv[2] if len(sys.argv) > 2 else QUERIES[0]
            try:
                limit = int(sys.argv[3]) if len(sys.argv) > 3 else 10
            except ValueError:
                # A non-numeric limit is a usage error, not a crash.
                print(usage)
                sys.exit(1)
            results = query_history(query, limit)
            for r in results:
                print(f"[{r[2]}] 相似度: {r[1]:.4f}")
                print(f"答案: {r[0][:100]}...")
                print("-" * 50)
        elif command == "export":
            export_to_json()
        elif command == "once":
            # Single pass: query each question and store the answer.
            for query in QUERIES:
                answer = query_doubao(query)
                if answer:
                    save_answer(query, answer)
                    print(f"查询: {query}")
                    print(f"答案: {answer[:200]}...")
        else:
            print(usage)
    else:
        # Default: start the monitoring loop.
        try:
            monitor_queries()
        except KeyboardInterrupt:
            logger.info("脚本被用户中断")
        except Exception as e:
            # Top-level boundary: logger.exception keeps the traceback.
            logger.exception(f"脚本异常退出: {e}")