Tailwind CSSTailwind CSS
Home
  • Tailwind CSS 书籍目录
  • Vue 3 开发实战指南
  • React 和 Next.js 学习
  • TypeScript
  • React开发框架书籍大纲
  • Shadcn学习大纲
  • Swift 编程语言:从入门到进阶
  • SwiftUI 学习指南
  • 函数式编程大纲
  • Swift 异步编程语言
  • Swift 协议化编程
  • SwiftUI MVVM 开发模式
  • SwiftUI 图表开发书籍
  • SwiftData
  • ArkTS编程语言:从入门到精通
  • 仓颉编程语言:从入门到精通
  • 鸿蒙手机客户端开发实战
  • WPF书籍
  • C#开发书籍
learn
  • 搜索未来:SEO与GEO双引擎实战手册
  • Java编程语言
  • Kotlin 编程入门与实战
  • /python/outline.html
  • Rust 开发入门
  • AI Agent
  • MCP (Model Context Protocol) 应用指南
  • 深度学习
  • 深度学习
  • 强化学习: 理论与实践
  • 扩散模型书籍
  • Agentic AI for Everyone
langchain
Home
  • Tailwind CSS 书籍目录
  • Vue 3 开发实战指南
  • React 和 Next.js 学习
  • TypeScript
  • React开发框架书籍大纲
  • Shadcn学习大纲
  • Swift 编程语言:从入门到进阶
  • SwiftUI 学习指南
  • 函数式编程大纲
  • Swift 异步编程语言
  • Swift 协议化编程
  • SwiftUI MVVM 开发模式
  • SwiftUI 图表开发书籍
  • SwiftData
  • ArkTS编程语言:从入门到精通
  • 仓颉编程语言:从入门到精通
  • 鸿蒙手机客户端开发实战
  • WPF书籍
  • C#开发书籍
learn
  • 搜索未来:SEO与GEO双引擎实战手册
  • Java编程语言
  • Kotlin 编程入门与实战
  • /python/outline.html
  • Rust 开发入门
  • AI Agent
  • MCP (Model Context Protocol) 应用指南
  • 深度学习
  • 深度学习
  • 强化学习: 理论与实践
  • 扩散模型书籍
  • Agentic AI for Everyone
langchain
  • 附录E:全栈代码示例库

附录E:全栈代码示例库

E.7 豆包答案变化追踪脚本

本脚本用于监控豆包(Doubao)生成式引擎对特定查询的答案变化,帮助您追踪内容在豆包中的表现,并检测答案的更新频率与内容漂移。

脚本功能概述

  • 定时查询:定期向豆包API发送预设问题。
  • 答案捕获:获取豆包返回的完整文本答案。
  • 变化检测:通过文本相似度算法(如余弦相似度、编辑距离)对比新旧答案。
  • 告警通知:当答案发生显著变化时,通过邮件、钉钉或飞书发送告警。
  • 数据存储:将每次查询的答案、时间戳、相似度分数存入本地数据库或日志文件。

技术栈

  • 语言:Python 3.8+
  • 依赖库:requests, json, hashlib, difflib, smtplib(邮件告警), requests(钉钉/飞书Webhook)
  • 数据库:SQLite(轻量级,适合本地运行)或 MySQL/PostgreSQL(生产环境)

完整代码示例

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
豆包答案变化追踪脚本
监控豆包生成式引擎对特定查询的答案变化,并触发告警。
"""

import requests
import json
import time
import hashlib
import sqlite3
import logging
from datetime import datetime
from difflib import SequenceMatcher

# ==================== 配置区域 ====================
DOUBAO_API_URL = "https://api.doubao.com/v1/chat/completions"  # 示例URL,请替换为实际API地址
DOUBAO_API_KEY = "your_doubao_api_key_here"  # 替换为您的API密钥

# 要监控的查询列表
QUERIES = [
    "什么是GEO优化?",
    "2025年SEO趋势有哪些?",
    "如何提高网站在豆包中的引用率?"
]

# 数据库配置
DB_PATH = "doubao_answer_tracker.db"

# 告警配置(可选)
ALERT_EMAIL_ENABLED = False
ALERT_EMAIL_SENDER = "your_email@example.com"
ALERT_EMAIL_PASSWORD = "your_email_password"
ALERT_EMAIL_RECEIVER = "receiver@example.com"
ALERT_SMTP_SERVER = "smtp.example.com"
ALERT_SMTP_PORT = 587

ALERT_WEBHOOK_ENABLED = True
ALERT_WEBHOOK_URL = "https://oapi.dingtalk.com/robot/send?access_token=your_token"  # 钉钉/飞书Webhook

# 相似度阈值,低于此值视为答案发生显著变化
SIMILARITY_THRESHOLD = 0.8

# 查询间隔(秒),建议至少5分钟
QUERY_INTERVAL = 300

# ==================== 日志配置 ====================
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("doubao_tracker.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# ==================== 数据库操作 ====================
def init_database():
    """初始化SQLite数据库,创建表结构"""
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS answer_history (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            query TEXT NOT NULL,
            answer TEXT NOT NULL,
            answer_hash TEXT NOT NULL,
            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
            similarity REAL DEFAULT 1.0
        )
    ''')
    cursor.execute('''
        CREATE INDEX IF NOT EXISTS idx_query_timestamp 
        ON answer_history (query, timestamp)
    ''')
    conn.commit()
    conn.close()
    logger.info("数据库初始化完成")

def save_answer(query, answer, similarity=1.0):
    """保存答案到数据库"""
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    answer_hash = hashlib.md5(answer.encode('utf-8')).hexdigest()
    cursor.execute('''
        INSERT INTO answer_history (query, answer, answer_hash, similarity)
        VALUES (?, ?, ?, ?)
    ''', (query, answer, answer_hash, similarity))
    conn.commit()
    conn.close()
    logger.info(f"答案已保存: query='{query}', hash={answer_hash}")

def get_latest_answer(query):
    """获取指定查询的最新答案"""
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    cursor.execute('''
        SELECT answer, answer_hash FROM answer_history 
        WHERE query = ? 
        ORDER BY timestamp DESC 
        LIMIT 1
    ''', (query,))
    result = cursor.fetchone()
    conn.close()
    return result

def calculate_similarity(text1, text2):
    """计算两个文本的相似度(使用SequenceMatcher)"""
    return SequenceMatcher(None, text1, text2).ratio()

# ==================== 豆包API调用 ====================
def query_doubao(query):
    """向豆包API发送查询请求,返回答案文本"""
    headers = {
        "Authorization": f"Bearer {DOUBAO_API_KEY}",
        "Content-Type": "application/json"
    }
    payload = {
        "model": "doubao-pro-32k",  # 根据实际模型调整
        "messages": [
            {"role": "user", "content": query}
        ],
        "temperature": 0.7,
        "max_tokens": 1024
    }
    
    try:
        response = requests.post(DOUBAO_API_URL, headers=headers, json=payload, timeout=30)
        response.raise_for_status()
        data = response.json()
        answer = data['choices'][0]['message']['content']
        return answer
    except requests.exceptions.RequestException as e:
        logger.error(f"API请求失败: {e}")
        return None
    except (KeyError, IndexError) as e:
        logger.error(f"API响应解析失败: {e}")
        return None

# ==================== 告警功能 ====================
def send_email_alert(query, old_answer, new_answer, similarity):
    """发送邮件告警"""
    if not ALERT_EMAIL_ENABLED:
        return
    
    import smtplib
    from email.mime.text import MIMEText
    
    subject = f"[豆包答案变化] 查询: {query}"
    body = f"""
    查询: {query}
    旧答案摘要: {old_answer[:200]}...
    新答案摘要: {new_answer[:200]}...
    相似度: {similarity:.2f}
    时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
    """
    
    msg = MIMEText(body, 'plain', 'utf-8')
    msg['Subject'] = subject
    msg['From'] = ALERT_EMAIL_SENDER
    msg['To'] = ALERT_EMAIL_RECEIVER
    
    try:
        server = smtplib.SMTP(ALERT_SMTP_SERVER, ALERT_SMTP_PORT)
        server.starttls()
        server.login(ALERT_EMAIL_SENDER, ALERT_EMAIL_PASSWORD)
        server.sendmail(ALERT_EMAIL_SENDER, [ALERT_EMAIL_RECEIVER], msg.as_string())
        server.quit()
        logger.info(f"邮件告警已发送: {subject}")
    except Exception as e:
        logger.error(f"邮件发送失败: {e}")

def send_webhook_alert(query, old_answer, new_answer, similarity):
    """发送Webhook告警(钉钉/飞书)"""
    if not ALERT_WEBHOOK_ENABLED:
        return
    
    message = {
        "msgtype": "text",
        "text": {
            "content": f"⚠️ 豆包答案变化告警\n\n查询: {query}\n旧答案摘要: {old_answer[:200]}...\n新答案摘要: {new_answer[:200]}...\n相似度: {similarity:.2f}\n时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
        }
    }
    
    try:
        response = requests.post(ALERT_WEBHOOK_URL, json=message, timeout=10)
        response.raise_for_status()
        logger.info(f"Webhook告警已发送: {query}")
    except Exception as e:
        logger.error(f"Webhook发送失败: {e}")

# ==================== 主监控循环 ====================
def monitor_queries():
    """主监控循环"""
    logger.info("豆包答案变化追踪脚本启动")
    init_database()
    
    while True:
        for query in QUERIES:
            logger.info(f"正在查询: {query}")
            new_answer = query_doubao(query)
            
            if new_answer is None:
                logger.warning(f"查询失败,跳过: {query}")
                continue
            
            # 获取上次答案
            latest = get_latest_answer(query)
            
            if latest is None:
                # 首次查询,直接保存
                save_answer(query, new_answer)
                logger.info(f"首次保存答案: {query}")
            else:
                old_answer, old_hash = latest
                new_hash = hashlib.md5(new_answer.encode('utf-8')).hexdigest()
                
                if old_hash != new_hash:
                    # 答案发生变化,计算相似度
                    similarity = calculate_similarity(old_answer, new_answer)
                    save_answer(query, new_answer, similarity)
                    
                    logger.info(f"答案变化: query='{query}', similarity={similarity:.4f}")
                    
                    if similarity < SIMILARITY_THRESHOLD:
                        # 显著变化,触发告警
                        logger.warning(f"答案显著变化: query='{query}', similarity={similarity:.4f}")
                        send_email_alert(query, old_answer, new_answer, similarity)
                        send_webhook_alert(query, old_answer, new_answer, similarity)
                else:
                    logger.info(f"答案未变化: query='{query}'")
        
        # 等待下一次查询
        logger.info(f"等待 {QUERY_INTERVAL} 秒后再次查询...")
        time.sleep(QUERY_INTERVAL)

# ==================== 辅助功能 ====================
def query_history(query, limit=10):
    """查询指定问题的最新N条历史答案"""
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    cursor.execute('''
        SELECT answer, similarity, timestamp 
        FROM answer_history 
        WHERE query = ? 
        ORDER BY timestamp DESC 
        LIMIT ?
    ''', (query, limit))
    results = cursor.fetchall()
    conn.close()
    return results

def export_to_json(filepath="doubao_answers_export.json"):
    """导出所有答案历史到JSON文件"""
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    cursor.execute('''
        SELECT query, answer, similarity, timestamp 
        FROM answer_history 
        ORDER BY timestamp DESC
    ''')
    rows = cursor.fetchall()
    conn.close()
    
    data = []
    for row in rows:
        data.append({
            "query": row[0],
            "answer": row[1],
            "similarity": row[2],
            "timestamp": row[3]
        })
    
    with open(filepath, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    
    logger.info(f"数据已导出到 {filepath}")

# ==================== 入口 ====================
if __name__ == "__main__":
    import sys
    
    if len(sys.argv) > 1:
        command = sys.argv[1]
        if command == "history":
            query = sys.argv[2] if len(sys.argv) > 2 else QUERIES[0]
            limit = int(sys.argv[3]) if len(sys.argv) > 3 else 10
            results = query_history(query, limit)
            for r in results:
                print(f"[{r[2]}] 相似度: {r[1]:.4f}")
                print(f"答案: {r[0][:100]}...")
                print("-" * 50)
        elif command == "export":
            export_to_json()
        elif command == "once":
            # 单次查询并保存
            for query in QUERIES:
                answer = query_doubao(query)
                if answer:
                    save_answer(query, answer)
                    print(f"查询: {query}")
                    print(f"答案: {answer[:200]}...")
        else:
            print("用法: python doubao_tracker.py [history|export|once]")
    else:
        # 默认启动监控循环
        try:
            monitor_queries()
        except KeyboardInterrupt:
            logger.info("脚本被用户中断")
        except Exception as e:
            logger.error(f"脚本异常退出: {e}")
Last Updated:: 5/9/26, 5:13 PM