Tailwind CSSTailwind CSS
Home
  • Tailwind CSS 书籍目录
  • Vue 3 开发实战指南
  • React 和 Next.js 学习
  • TypeScript
  • React开发框架书籍大纲
  • Shadcn学习大纲
  • Swift 编程语言:从入门到进阶
  • SwiftUI 学习指南
  • 函数式编程大纲
  • Swift 异步编程语言
  • Swift 协议化编程
  • SwiftUI MVVM 开发模式
  • SwiftUI 图表开发书籍
  • SwiftData
  • ArkTS编程语言:从入门到精通
  • 仓颉编程语言:从入门到精通
  • 鸿蒙手机客户端开发实战
  • WPF书籍
  • C#开发书籍
learn
  • 搜索未来:SEO与GEO双引擎实战手册
  • Java编程语言
  • Kotlin 编程入门与实战
  • /python/outline.html
  • Rust 开发入门
  • AI Agent
  • MCP (Model Context Protocol) 应用指南
  • 深度学习
  • 深度学习
  • 强化学习: 理论与实践
  • 扩散模型书籍
  • Agentic AI for Everyone
langchain
Home
  • Tailwind CSS 书籍目录
  • Vue 3 开发实战指南
  • React 和 Next.js 学习
  • TypeScript
  • React开发框架书籍大纲
  • Shadcn学习大纲
  • Swift 编程语言:从入门到进阶
  • SwiftUI 学习指南
  • 函数式编程大纲
  • Swift 异步编程语言
  • Swift 协议化编程
  • SwiftUI MVVM 开发模式
  • SwiftUI 图表开发书籍
  • SwiftData
  • ArkTS编程语言:从入门到精通
  • 仓颉编程语言:从入门到精通
  • 鸿蒙手机客户端开发实战
  • WPF书籍
  • C#开发书籍
learn
  • 搜索未来:SEO与GEO双引擎实战手册
  • Java编程语言
  • Kotlin 编程入门与实战
  • /python/outline.html
  • Rust 开发入门
  • AI Agent
  • MCP (Model Context Protocol) 应用指南
  • 深度学习
  • 深度学习
  • 强化学习: 理论与实践
  • 扩散模型书籍
  • Agentic AI for Everyone
langchain
  • 附录E.6:DeepSeek API监控脚本

附录E.6:DeepSeek API监控脚本

概述

DeepSeek API监控脚本用于定期查询DeepSeek生成引擎对特定内容的引用情况,帮助工程师追踪品牌、产品、关键词在DeepSeek回答中的出现频率、上下文和变化趋势。该脚本可作为CI/CD流水线的一部分,或作为独立定时任务运行。

核心功能

  • 批量查询预定义的关键词/问题列表
  • 解析DeepSeek API返回的引用来源
  • 记录引用内容、排名位置、时间戳
  • 生成结构化监控数据(JSON/CSV)
  • 支持异常检测与告警触发

技术选型

组件推荐方案说明
编程语言Python 3.9+生态丰富,适合API调用与数据处理
HTTP客户端httpx / aiohttp支持异步并发,提升效率
数据存储SQLite / PostgreSQL轻量级本地存储或生产级数据库
定时调度cron / APScheduler系统级或应用级定时任务
告警集成钉钉/飞书/邮件 Webhook异常时即时通知

脚本实现

1. 基础配置模块

# config.py
import os
from dataclasses import dataclass
from typing import List

@dataclass
class MonitorConfig:
    # DeepSeek API配置
    api_key: str = os.getenv("DEEPSEEK_API_KEY", "")
    api_base_url: str = "https://api.deepseek.com/v1"
    model: str = "deepseek-chat"  # 或 deepseek-reasoner
    
    # 监控目标
    target_keywords: List[str] = None
    target_domains: List[str] = None
    target_brands: List[str] = None
    
    # 监控参数
    check_interval_minutes: int = 60
    max_retries: int = 3
    timeout_seconds: int = 30
    
    # 输出配置
    output_dir: str = "./monitor_data"
    db_path: str = "./monitor.db"
    
    # 告警配置
    alert_webhook_url: str = os.getenv("ALERT_WEBHOOK_URL", "")
    alert_threshold_drop: float = 0.5  # 引用率下降50%触发告警

# 默认配置实例
default_config = MonitorConfig(
    target_keywords=["你的产品名称", "你的品牌关键词", "核心业务术语"],
    target_domains=["yourdomain.com", "your-competitor.com"],
    target_brands=["YourBrand", "CompetitorBrand"]
)

2. 核心监控引擎

# deepseek_monitor.py
import json
import time
import logging
from datetime import datetime
from typing import Dict, List, Optional
import httpx
from dataclasses import dataclass, asdict

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class QueryResult:
    """单次查询结果"""
    query: str
    timestamp: str
    response_text: str
    references: List[Dict]
    brand_mentions: List[str]
    domain_mentions: List[str]
    answer_length: int
    model: str
    latency_ms: float

class DeepSeekMonitor:
    def __init__(self, config):
        self.config = config
        self.client = httpx.Client(
            base_url=config.api_base_url,
            timeout=config.timeout_seconds,
            headers={
                "Authorization": f"Bearer {config.api_key}",
                "Content-Type": "application/json"
            }
        )
        
    def query_deepseek(self, question: str) -> Optional[Dict]:
        """向DeepSeek发送查询"""
        payload = {
            "model": self.config.model,
            "messages": [
                {"role": "user", "content": question}
            ],
            "temperature": 0.3,  # 低温度保证结果一致性
            "max_tokens": 1024
        }
        
        for attempt in range(self.config.max_retries):
            try:
                start_time = time.time()
                response = self.client.post("/chat/completions", json=payload)
                latency = (time.time() - start_time) * 1000
                
                if response.status_code == 200:
                    data = response.json()
                    return {
                        "response": data["choices"][0]["message"]["content"],
                        "latency_ms": latency,
                        "model": data["model"]
                    }
                else:
                    logger.warning(f"API返回错误: {response.status_code} - {response.text}")
                    
            except httpx.TimeoutException:
                logger.warning(f"请求超时 (尝试 {attempt+1}/{self.config.max_retries})")
                time.sleep(2 ** attempt)
            except Exception as e:
                logger.error(f"请求异常: {str(e)}")
                
        return None

    def extract_references(self, text: str) -> List[Dict]:
        """从回答中提取引用来源"""
        references = []
        
        # 匹配常见引用格式
        import re
        
        # 格式1: [来源](url)
        refs1 = re.findall(r'\[([^\]]+)\]\(([^)]+)\)', text)
        for title, url in refs1:
            references.append({
                "type": "markdown_link",
                "title": title.strip(),
                "url": url.strip()
            })
        
        # 格式2: 来源: url
        refs2 = re.findall(r'来源[::]\s*(https?://[^\s,,。]+)', text)
        for url in refs2:
            references.append({
                "type": "plain_url",
                "url": url.strip()
            })
        
        # 格式3: 根据[网站名]...
        refs3 = re.findall(r'根据\s*[「「]([^」」]+)[」」]', text)
        for source in refs3:
            references.append({
                "type": "source_mention",
                "source": source.strip()
            })
            
        return references

    def check_brand_mentions(self, text: str) -> List[str]:
        """检测品牌提及"""
        mentions = []
        for brand in self.config.target_brands:
            if brand.lower() in text.lower():
                mentions.append(brand)
        return mentions

    def check_domain_mentions(self, text: str) -> List[str]:
        """检测域名提及"""
        mentions = []
        for domain in self.config.target_domains:
            if domain.lower() in text.lower():
                mentions.append(domain)
        return mentions

    def run_single_query(self, question: str) -> Optional[QueryResult]:
        """执行单次查询并解析结果"""
        result = self.query_deepseek(question)
        if not result:
            return None
            
        response_text = result["response"]
        references = self.extract_references(response_text)
        brand_mentions = self.check_brand_mentions(response_text)
        domain_mentions = self.check_domain_mentions(response_text)
        
        return QueryResult(
            query=question,
            timestamp=datetime.now().isoformat(),
            response_text=response_text[:500],  # 截取前500字符
            references=references,
            brand_mentions=brand_mentions,
            domain_mentions=domain_mentions,
            answer_length=len(response_text),
            model=result["model"],
            latency_ms=result["latency_ms"]
        )

    def run_monitoring_cycle(self) -> List[QueryResult]:
        """执行一轮完整监控"""
        results = []
        for keyword in self.config.target_keywords:
            logger.info(f"查询关键词: {keyword}")
            result = self.run_single_query(keyword)
            if result:
                results.append(result)
                logger.info(f"  引用数: {len(result.references)}, "
                           f"品牌提及: {result.brand_mentions}")
            time.sleep(2)  # 请求间隔,避免限流
        return results

3. 数据持久化模块

# storage.py
import sqlite3
import json
from datetime import datetime
from typing import List

class MonitorStorage:
    def __init__(self, db_path: str):
        self.db_path = db_path
        self.init_database()
    
    def init_database(self):
        """初始化数据库表结构"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS query_results (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                query TEXT NOT NULL,
                timestamp TEXT NOT NULL,
                response_text TEXT,
                references_json TEXT,
                brand_mentions_json TEXT,
                domain_mentions_json TEXT,
                answer_length INTEGER,
                model TEXT,
                latency_ms REAL
            )
        """)
        
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS daily_stats (
                date TEXT PRIMARY KEY,
                total_queries INTEGER,
                total_references INTEGER,
                brand_mention_count INTEGER,
                domain_mention_count INTEGER,
                avg_latency_ms REAL
            )
        """)
        
        conn.commit()
        conn.close()
    
    def save_result(self, result):
        """保存单次查询结果"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute("""
            INSERT INTO query_results 
            (query, timestamp, response_text, references_json, 
             brand_mentions_json, domain_mentions_json, 
             answer_length, model, latency_ms)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            result.query,
            result.timestamp,
            result.response_text,
            json.dumps(result.references, ensure_ascii=False),
            json.dumps(result.brand_mentions, ensure_ascii=False),
            json.dumps(result.domain_mentions, ensure_ascii=False),
            result.answer_length,
            result.model,
            result.latency_ms
        ))
        
        conn.commit()
        conn.close()
    
    def get_latest_results(self, query: str, limit: int = 10) -> List:
        """获取指定查询的最新结果"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute("""
            SELECT timestamp, references_json, brand_mentions_json
            FROM query_results
            WHERE query = ?
            ORDER BY timestamp DESC
            LIMIT ?
        """, (query, limit))
        
        rows = cursor.fetchall()
        conn.close()
        
        return [
            {
                "timestamp": row[0],
                "references": json.loads(row[1]),
                "brand_mentions": json.loads(row[2])
            }
            for row in rows
        ]

4. 告警与通知模块

# alerting.py
import json
import logging
from typing import List, Dict
import httpx

logger = logging.getLogger(__name__)

class AlertManager:
    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url
    
    def send_dingtalk_alert(self, title: str, content: str):
        """发送钉钉告警"""
        payload = {
            "msgtype": "markdown",
            "markdown": {
                "title": title,
                "text": f"## {title}\n\n{content}"
            }
        }
        self._send_webhook(payload)
    
    def send_feishu_alert(self, title: str, content: str):
        """发送飞书告警"""
        payload = {
            "msg_type": "interactive",
            "card": {
                "header": {
                    "title": {"tag": "plain_text", "content": title}
                },
                "elements": [
                    {"tag": "markdown", "content": content}
                ]
            }
        }
        self._send_webhook(payload)
    
    def _send_webhook(self, payload: Dict):
        """发送Webhook请求"""
        try:
            response = httpx.post(
                self.webhook_url,
                json=payload,
                timeout=10
            )
            if response.status_code != 200:
                logger.error(f"告警发送失败: {response.status_code}")
        except Exception as e:
            logger.error(f"告警发送异常: {str(e)}")
    
    def check_anomalies(self, current_results: List, historical_data: List) -> List[str]:
        """检测异常并生成告警"""
        alerts = []
        
        # 检测引用率骤降
        for result in current_results:
            query = result.query
            ref_count = len(result.references)
            
            # 获取历史平均引用数
            historical_refs = [
                len(h["references"]) 
                for h in historical_data 
                if h["query"] == query
            ]
            
            if historical_refs:
                avg_refs = sum(historical_refs) / len(historical_refs)
                if avg_refs > 0 and ref_count / avg_refs < 0.5:
                    alerts.append(
                        f"⚠️ 引用率异常: '{query}' 引用数从 {avg_refs:.1f} 降至 {ref_count}"
                    )
        
        # 检测品牌提及消失
        for result in current_results:
            if not result.brand_mentions:
                # 检查历史是否有提及
                for h in historical_data:
                    if h["query"] == result.query and h["brand_mentions"]:
                        alerts.append(
                            f"🔴 品牌提及消失: '{result.query}' 中品牌 {h['brand_mentions']} 不再被提及"
                        )
                        break
        
        return alerts

5. 主程序入口

# main.py
import argparse
import logging
from datetime import datetime
from pathlib import Path

from config import default_config, MonitorConfig
from deepseek_monitor import DeepSeekMonitor
from storage import MonitorStorage
from alerting import AlertManager

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

def run_monitor(config: MonitorConfig):
    """执行一次完整监控流程"""
    
    # 初始化组件
    monitor = DeepSeekMonitor(config)
    storage = MonitorStorage(config.db_path)
    alert_manager = AlertManager(config.alert_webhook_url)
    
    # 执行监控
    logger.info(f"开始监控周期: {datetime.now().isoformat()}")
    results = monitor.run_monitoring_cycle()
    
    # 保存结果
    for result in results:
        storage.save_result(result)
    
    # 异常检测
    for result in results:
        historical = storage.get_latest_results(result.query, limit=20)
        alerts = alert_manager.check_anomalies([result], historical)
        
        for alert in alerts:
            logger.warning(alert)
            if config.alert_webhook_url:
                alert_manager.send_dingtalk_alert(
                    "DeepSeek引用监控告警",
                    alert
                )
    
    # 输出摘要
    logger.info(f"监控完成: {len(results)} 个查询")
    for r in results:
        logger.info(f"  {r.query}: {len(r.references)} 引用, "
                   f"品牌提及: {r.brand_mentions}")
    
    return results

def export_to_csv(results, output_path: str):
    """导出结果为CSV"""
    import csv
    
    with open(output_path, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow([
            "查询词", "时间戳", "引用数", "品牌提及", 
            "域名提及", "回答长度", "延迟(ms)"
        ])
        for r in results:
            writer.writerow([
                r.query,
                r.timestamp,
                len(r.references),
                ",".join(r.brand_mentions),
                ",".join(r.domain_mentions),
                r.answer_length,
                f"{r.latency_ms:.1f}"
            ])

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="DeepSeek API监控脚本")
    parser.add_argument("--config", type=str, help="配置文件路径")
    parser.add_argument("--export-csv", type=str, help="导出CSV路径")
    parser.add_argument("--once", action="store_true", help="仅运行一次")
    
    args = parser.parse_args()
    
    config = default_config
    if args.config:
        # 从JSON配置文件加载
        import json
        with open(args.config, 'r') as f:
            config_data = json.load(f)
        config = MonitorConfig(**config_data)
    
    results = run_monitor(config)
    
    if args.export_csv:
        export_to_csv(results, args.export_csv)
        logger.info(f"结果已导出至: {args.export_csv}")
    
    if not args.once:
        # 持续运行模式
        import time
        while True:
            logger.info(f"等待 {config.check_interval_minutes} 分钟后下一轮...")
            time.sleep(config.check_interval_minutes * 60)
            run_monitor(config)

部署与使用

Docker Compose部署

# docker-compose.yml
version: '3.8'

services:
  deepseek-monitor:
    build: .
    environment:
      - DEEPSEEK_API_KEY=${DEEPSEEK_API_KEY}
      - ALERT_WEBHOOK_URL=${ALERT_WEBHOOK_URL}
    volumes:
      - ./data:/app/data
      - ./config.json:/app/config.json
    restart: unless-stopped
# Dockerfile
FROM python:3.10-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["python", "main.py"]
# requirements.txt
httpx>=0.24.0
apscheduler>=3.10.0
pandas>=1.5.0
python-dotenv>=1.0.0

配置文件示例

{
  "api_key": "sk-your-deepseek-api-key",
  "target_keywords": [
    "你的产品名称 怎么样",
    "你的品牌 推荐",
    "如何解决 [你的领域问题]"
  ],
  "target_domains": ["yourdomain.com"],
  "target_brands": ["YourBrand"],
  "check_interval_minutes": 120,
  "alert_webhook_url": "https://oapi.dingtalk.com/robot/send?access_token=xxx"
}

运行命令

# 单次运行
python main.py --once --export-csv ./report.csv

# 持续监控
python main.py --config ./config.json

# Docker部署
docker-compose up -d

最佳实践

  1. 关键词选择:优先选择包含品牌名+产品名的长尾问题,如“XX品牌怎么样值得买吗”
  2. 频率控制:建议每小时不超过20次API调用,避免触发限流
  3. 结果对比:同时监控竞品关键词,建立行业基准线
  4. 异常阈值:引用率下降30%即需关注,下降50%应立即告警
  5. 数据可视化:结合Grafana展示引用趋势图,便于长期分析

注意事项

  • DeepSeek API可能有调用频率限制,请查阅官方文档
  • 监控结果受模型版本更新影响,需记录模型版本号
  • 建议使用独立API Key,避免影响生产环境调用
  • 定期清理历史数据,避免数据库膨胀
Last Updated:: 5/9/26, 5:13 PM