
Appendix E: Full-Stack Code Example Library

This appendix collects the core code examples that run through the book, covering dynamic structured-data generation, crawler log analysis, generative-engine monitoring, prerendering configuration, and API design. All code is written to a production-ready standard and annotated in detail.


E.1 Dynamically Generating JSON-LD in Middleware

Scenario

In an SSR (Next.js/Nuxt) or CSR (React/Vue) application, dynamically inject enhanced Schema markup, aimed at generative engines, based on the page type.

Next.js middleware example (middleware.ts)

// middleware.ts
import { NextResponse } from 'next/server';
import type { NextRequest } from 'next/server';

export function middleware(request: NextRequest) {
  const response = NextResponse.next();
  const url = request.nextUrl.pathname;

  // Determine the page type from the path
  let schema = {};

  if (url.startsWith('/product/')) {
    const productId = url.split('/')[2];
    schema = generateProductSchema(productId);
  } else if (url.startsWith('/faq/')) {
    schema = generateFAQSchema(url);
  } else if (url.startsWith('/article/')) {
    schema = generateArticleSchema(url);
  }

  // Attach the schema via a response header (read by a Layout component during SSR)
  response.headers.set('x-schema-json', JSON.stringify(schema));
  
  return response;
}

function generateProductSchema(id: string) {
  return {
    "@context": "https://schema.org",
    "@type": "Product",
    "name": `产品名称_${id}`,
    "description": "产品详细描述",
    "sku": id,
    "brand": {
      "@type": "Brand",
      "name": "品牌名称"
    },
    "offers": {
      "@type": "Offer",
      "priceCurrency": "CNY",
      "price": "299.00",
      "availability": "https://schema.org/InStock"
    }
  };
}

function generateFAQSchema(url: string) {
  // Fetch FAQ data from a CMS or an API
  return {
    "@context": "https://schema.org",
    "@type": "FAQPage",
    "mainEntity": [
      {
        "@type": "Question",
        "name": "常见问题1",
        "acceptedAnswer": {
          "@type": "Answer",
          "text": "答案内容"
        }
      }
    ]
  };
}

function generateArticleSchema(url: string) {
  return {
    "@context": "https://schema.org",
    "@type": "Article",
    "headline": "文章标题",
    "datePublished": "2024-01-01",
    "author": {
      "@type": "Person",
      "name": "作者名"
    }
  };
}

Django middleware example (middleware.py)

# middleware.py
import json
from django.http import HttpResponse
from django.urls import resolve

class SchemaInjectionMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        response = self.get_response(request)
        
        # Inject only into HTML responses
        if 'text/html' in response.get('Content-Type', ''):
            schema = self.get_schema_for_request(request)
            if schema:
                # Insert the JSON-LD right before </head>
                schema_script = f'<script type="application/ld+json">{json.dumps(schema)}</script>'
                content = response.content.decode('utf-8')
                content = content.replace('</head>', f'{schema_script}</head>')
                response.content = content.encode('utf-8')
        
        return response

    def get_schema_for_request(self, request):
        url_name = resolve(request.path_info).url_name
        
        if url_name == 'product_detail':
            return {
                "@context": "https://schema.org",
                "@type": "Product",
                "name": request.GET.get('name', '默认产品'),
                "description": "产品描述"
            }
        elif url_name == 'faq_list':
            return {
                "@context": "https://schema.org",
                "@type": "FAQPage",
                "mainEntity": []
            }
        return None
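To activate the middleware above, it must be registered in the MIDDLEWARE list of settings.py. A minimal sketch — the `myproject.middleware` module path is a placeholder; adjust it to your project layout:

```python
# settings.py (fragment)
MIDDLEWARE = [
    'django.middleware.security.SecurityMiddleware',
    'django.middleware.common.CommonMiddleware',
    # ... your other middleware ...
    'myproject.middleware.SchemaInjectionMiddleware',  # placeholder path: adjust to your project
]
```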

E.2 Self-Hosted Perplexity Citation Monitoring Script

Scenario

Periodically query the Perplexity API to detect whether specified brands or keywords appear in generated summaries.

Python script (perplexity_monitor.py)

#!/usr/bin/env python3
# perplexity_monitor.py
import requests
import json
import time
import logging
from datetime import datetime

# Configuration
PERPLEXITY_API_KEY = "your_api_key_here"
MONITOR_KEYWORDS = ["Brand Name", "Product Name", "core keyword"]
CHECK_INTERVAL = 3600  # 1 hour
OUTPUT_FILE = "perplexity_references.json"

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def query_perplexity(prompt: str) -> dict | None:
    """Call the Perplexity API; return the parsed JSON, or None on failure"""
    headers = {
        "Authorization": f"Bearer {PERPLEXITY_API_KEY}",
        "Content-Type": "application/json"
    }
    
    payload = {
        "model": "sonar-pro",
        "messages": [
            {
                "role": "system",
                "content": "你是一个搜索助手。请搜索相关信息并返回引用来源。"
            },
            {
                "role": "user",
                "content": f"搜索关于'{prompt}'的最新信息,并列出所有引用的来源URL。"
            }
        ],
        "max_tokens": 1000,
        "temperature": 0.1
    }
    
    try:
        response = requests.post(
            "https://api.perplexity.ai/chat/completions",
            headers=headers,
            json=payload,
            timeout=30
        )
        response.raise_for_status()
        return response.json()
    except Exception as e:
        logger.error(f"API调用失败: {e}")
        return None

def extract_references(response: dict) -> list:
    """Extract cited URLs from the response"""
    references = []
    if response and 'choices' in response:
        content = response['choices'][0]['message']['content']
        # Naive regex-based URL extraction
        import re
        urls = re.findall(r'https?://[^\s]+', content)
        references = list(set(urls))
    return references

def check_brand_presence(response: dict, brand: str) -> bool:
    """Check whether the brand is mentioned in the answer"""
    if response and 'choices' in response:
        content = response['choices'][0]['message']['content']
        return brand.lower() in content.lower()
    return False

def main():
    results = []
    
    for keyword in MONITOR_KEYWORDS:
        logger.info(f"Checking keyword: {keyword}")
        response = query_perplexity(keyword)
        
        if response:
            references = extract_references(response)
            brand_mentioned = check_brand_presence(response, "Brand Name")
            
            result = {
                "timestamp": datetime.now().isoformat(),
                "keyword": keyword,
                "brand_mentioned": brand_mentioned,
                "reference_count": len(references),
                "references": references[:10],  # keep only the first 10
                "raw_response": response['choices'][0]['message']['content'][:500]
            }
            results.append(result)
            
            logger.info(f"  Brand mentioned: {brand_mentioned}, citations: {len(references)}")
        else:
            results.append({
                "timestamp": datetime.now().isoformat(),
                "keyword": keyword,
                "error": "API call failed"
            })
        
        time.sleep(2)  # avoid rate limiting
    
    # Persist results
    try:
        with open(OUTPUT_FILE, 'r', encoding='utf-8') as f:
            existing = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        existing = []
    
    existing.extend(results)
    # Keep only the most recent 100 records
    existing = existing[-100:]
    
    with open(OUTPUT_FILE, 'w', encoding='utf-8') as f:
        json.dump(existing, f, ensure_ascii=False, indent=2)
    
    logger.info(f"Monitoring run complete; results saved to {OUTPUT_FILE}")

if __name__ == "__main__":
    main()

E.3 Nginx Log Analysis Script

Scenario

Analyze Nginx access logs to extract the access patterns, request frequency, and page preferences of AI crawlers (GPTBot, ClaudeBot, etc.).

Python script (nginx_log_analyzer.py)

#!/usr/bin/env python3
# nginx_log_analyzer.py
import re
import json
from collections import Counter, defaultdict
from datetime import datetime
import argparse

# Common AI crawler User-Agent patterns
AI_CRAWLERS = {
    'GPTBot': r'GPTBot',
    'ClaudeBot': r'ClaudeBot',
    'GoogleOther': r'GoogleOther',
    'CCBot': r'CCBot',
    'Bytespider': r'Bytespider',
    'DeepSeek-Bot': r'DeepSeek-Bot',
    'PerplexityBot': r'PerplexityBot',
    'AmazonBot': r'Amazonbot'
}

def parse_nginx_log_line(line: str) -> dict | None:
    """Parse a single Nginx log line (combined format)"""
    pattern = r'(\S+) (\S+) (\S+) \[([^\]]+)\] "([^"]*)" (\d+) (\d+) "([^"]*)" "([^"]*)"'
    match = re.match(pattern, line)
    
    if not match:
        return None
    
    return {
        'ip': match.group(1),
        'ident': match.group(2),
        'user': match.group(3),
        'time': match.group(4),
        'request': match.group(5),
        'status': int(match.group(6)),
        'size': int(match.group(7)),
        'referer': match.group(8),
        'user_agent': match.group(9)
    }

def detect_ai_crawler(user_agent: str) -> str | None:
    """Return the matching AI crawler name, or None"""
    for name, pattern in AI_CRAWLERS.items():
        if re.search(pattern, user_agent, re.IGNORECASE):
            return name
    return None

def analyze_log_file(filepath: str):
    """Analyze a log file"""
    crawler_stats = defaultdict(lambda: {
        'total_requests': 0,
        'status_codes': Counter(),
        'paths': Counter(),
        'hourly_distribution': Counter(),
        'response_sizes': []
    })
    
    total_lines = 0
    crawler_lines = 0
    
    with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
        for line in f:
            total_lines += 1
            parsed = parse_nginx_log_line(line)
            
            if not parsed:
                continue
            
            crawler_type = detect_ai_crawler(parsed['user_agent'])
            if crawler_type:
                crawler_lines += 1
                stats = crawler_stats[crawler_type]
                stats['total_requests'] += 1
                stats['status_codes'][parsed['status']] += 1
                
                # Extract the request path
                request_parts = parsed['request'].split()
                if len(request_parts) >= 2:
                    stats['paths'][request_parts[1]] += 1
                
                # Extract the hour of day
                try:
                    hour = datetime.strptime(parsed['time'].split()[0], '%d/%b/%Y:%H:%M:%S').hour
                    stats['hourly_distribution'][hour] += 1
                except ValueError:
                    pass
                
                stats['response_sizes'].append(parsed['size'])
    
    return {
        'total_lines': total_lines,
        'crawler_lines': crawler_lines,
        'crawler_stats': dict(crawler_stats)
    }

def print_report(analysis: dict):
    """Print the analysis report"""
    total = analysis['total_lines'] or 1  # guard against empty log files
    print(f"\n{'='*60}")
    print("Log Analysis Report")
    print(f"{'='*60}")
    print(f"Total log lines: {analysis['total_lines']}")
    print(f"AI crawler requests: {analysis['crawler_lines']} ({analysis['crawler_lines']/total*100:.2f}%)")
    print(f"{'='*60}")
    
    for crawler, stats in analysis['crawler_stats'].items():
        print(f"\n--- {crawler} ---")
        print(f"  Total requests: {stats['total_requests']}")
        print(f"  Status code distribution: {dict(stats['status_codes'])}")
        if stats['response_sizes']:
            print(f"  Average response size: {sum(stats['response_sizes'])/len(stats['response_sizes']):.0f} bytes")
        else:
            print("  Average response size: N/A")
        
        print("  Most requested paths (top 5):")
        for path, count in stats['paths'].most_common(5):
            print(f"    {path}: {count} hits")
        
        print("  Hourly distribution (top 3):")
        for hour, count in stats['hourly_distribution'].most_common(3):
            print(f"    {hour}:00 - {count} hits")

def main():
    parser = argparse.ArgumentParser(description='AI crawler analyzer for Nginx logs')
    parser.add_argument('logfile', help='path to the Nginx access log file')
    parser.add_argument('--json', action='store_true', help='emit JSON output')
    args = parser.parse_args()
    
    analysis = analyze_log_file(args.logfile)
    
    if args.json:
        print(json.dumps(analysis, indent=2, ensure_ascii=False))
    else:
        print_report(analysis)

if __name__ == "__main__":
    main()
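As a quick sanity check, the combined-format regex above can be exercised against a synthetic log line (the IP, path, timestamp, and User-Agent below are made up for illustration):

```python
import re

# Same combined-format pattern as in nginx_log_analyzer.py
LOG_PATTERN = r'(\S+) (\S+) (\S+) \[([^\]]+)\] "([^"]*)" (\d+) (\d+) "([^"]*)" "([^"]*)"'

# A synthetic log line in Nginx "combined" format (all values are made up)
sample = ('66.249.66.1 - - [10/Oct/2024:13:55:36 +0000] '
          '"GET /faq/question-1 HTTP/1.1" 200 5123 "-" '
          '"Mozilla/5.0 (compatible; GPTBot/1.0; +https://openai.com/gptbot)"')

m = re.match(LOG_PATTERN, sample)
fields = {
    'ip': m.group(1),
    'time': m.group(4),
    'request': m.group(5),
    'status': int(m.group(6)),
    'size': int(m.group(7)),
    'user_agent': m.group(9),
}
print(fields['status'], fields['request'])  # 200 GET /faq/question-1 HTTP/1.1
print('GPTBot' in fields['user_agent'])     # True
```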

E.4 React/Vue Prerendering Configuration

Scenario

Configure prerendering for SPA applications so that search engines and AI crawlers can correctly crawl the content.

React (Next.js) prerendering configuration example

// next.config.js
module.exports = {
  // Static export (SSG) configuration
  output: 'export'
};

// Dynamic routes are prerendered via generateStaticParams, which is a
// page-level export (e.g. in app/product/[category]/[id]/page.js),
// not a next.config.js option:
export async function generateStaticParams() {
  const products = await fetch('https://api.example.com/products').then(res => res.json());

  return products.map(product => ({
    category: product.category,
    id: product.id
  }));
}

Vue (Nuxt 3) prerendering configuration example

// nuxt.config.ts
export default defineNuxtConfig({
  ssr: true, // enable server-side rendering
  
  routeRules: {
    // Static generation strategy
    '/': { prerender: true },
    '/about': { prerender: true },
    '/faq/**': { prerender: true },
    '/product/**': { prerender: true },
    
    // Hybrid strategy: some pages SSR, some static
    '/blog/**': { ssr: true }, // blog uses SSR
    '/dashboard/**': { ssr: false }, // dashboard uses CSR
  },
  
  nitro: {
    prerender: {
      crawlLinks: true, // crawl links automatically
      routes: ['/'], // extra routes to prerender
      ignore: ['/admin/**'] // routes to skip
    }
  }
});

Generic prerendering approach (Puppeteer)

// prerender.js
const puppeteer = require('puppeteer');
const fs = require('fs');
const path = require('path');

const BASE_URL = 'https://example.com';
const OUTPUT_DIR = './prerendered';
const ROUTES = [
  '/',
  '/about',
  '/faq',
  '/faq/question-1',
  '/faq/question-2',
  '/product/product-1',
  '/product/product-2'
];

async function prerenderPage(browser, route) {
  const page = await browser.newPage();
  
  // Set the viewport and User-Agent
  await page.setViewport({ width: 1920, height: 1080 });
  await page.setUserAgent('Mozilla/5.0 (compatible; PrerenderBot/1.0)');
  
  try {
    const url = `${BASE_URL}${route}`;
    console.log(`Prerendering: ${url}`);
    
    await page.goto(url, {
      waitUntil: 'networkidle0', // wait until the network is idle
      timeout: 30000
    });
    
    // Wait for the key content element to load
    await page.waitForSelector('#app-content', { timeout: 10000 });
    
    // Grab the fully rendered HTML
    const html = await page.content();
    
    // Write it out to a file
    const filePath = path.join(OUTPUT_DIR, route === '/' ? 'index.html' : `${route}.html`);
    const dir = path.dirname(filePath);
    
    if (!fs.existsSync(dir)) {
      fs.mkdirSync(dir, { recursive: true });
    }
    
    fs.writeFileSync(filePath, html, 'utf-8');
    console.log(`  Saved: ${filePath}`);
    
  } catch (error) {
    console.error(`  Prerender failed for ${route}: ${error.message}`);
  } finally {
    await page.close();
  }
}

async function main() {
  console.log('Starting prerender...');
  
  // Create the output directory
  if (!fs.existsSync(OUTPUT_DIR)) {
    fs.mkdirSync(OUTPUT_DIR, { recursive: true });
  }
  
  const browser = await puppeteer.launch({
    headless: 'new',
    args: ['--no-sandbox', '--disable-setuid-sandbox']
  });
  
  for (const route of ROUTES) {
    await prerenderPage(browser, route);
  }
  
  await browser.close();
  console.log('Prerendering complete!');
}

main().catch(console.error);

E.5 Unified Content API for App and Web

Scenario

Design an API that serves structured content to both web and app clients, with deep-link support so generative engines can cite it.

Node.js (Express) API example

// api.js
const express = require('express');
const router = express.Router();

// Unified content API endpoint
router.get('/api/v1/content/:type/:id', async (req, res) => {
  const { type, id } = req.params;
  const source = req.query.source || 'web'; // web, ios, android
  
  try {
    let content;
    
    switch(type) {
      case 'product':
        content = await getProduct(id);
        break;
      case 'faq':
        content = await getFAQ(id);
        break;
      case 'article':
        content = await getArticle(id);
        break;
      default:
        return res.status(400).json({ error: 'Invalid content type' });
    }
    
    // Build the unified response
    const response = {
      id: content.id,
      type: type,
      title: content.title,
      description: content.description,
      structured_data: buildStructuredData(content, type),
      
      // Deep links
      deep_links: {
        web: `https://example.com/${type}/${id}`,
        ios: `myapp://${type}/${id}`,
        android: `intent://${type}/${id}#Intent;package=com.example.app;end`
      },
      
      // Fields optimized for generative engines
      geo_optimized: {
        summary: content.summary || content.description,
        key_points: content.key_points || [],
        related_entities: content.related || [],
        last_updated: content.updated_at
      },
      
      // Raw data
      raw: source === 'api' ? content : undefined
    };
    
    // Set caching headers
    res.set('Cache-Control', 'public, max-age=3600');
    res.set('X-Content-Type-Options', 'nosniff');
    
    res.json(response);
    
  } catch (error) {
    console.error(`Failed to fetch content: ${error.message}`);
    res.status(404).json({ error: 'Content not found' });
  }
});

// Batch content endpoint (for sitemaps)
router.get('/api/v1/content/batch', async (req, res) => {
  const { type, ids } = req.query;
  
  if (!type || !ids) {
    return res.status(400).json({ error: 'Missing type or ids parameter' });
  }
  
  const idArray = ids.split(',');
  const contents = [];
  
  for (const id of idArray) {
    try {
      const content = await getContent(type, id);
      if (content) {
        contents.push({
          id: content.id,
          title: content.title,
          url: `https://example.com/${type}/${id}`,
          last_modified: content.updated_at
        });
      }
    } catch (e) {
      // Skip missing IDs
    }
  }
  
  res.json({
    total: contents.length,
    contents
  });
});

function buildStructuredData(content, type) {
  const base = {
    "@context": "https://schema.org"
  };
  
  switch(type) {
    case 'product':
      return {
        ...base,
        "@type": "Product",
        "name": content.title,
        "description": content.description,
        "sku": content.sku,
        "brand": content.brand,
        "offers": {
          "@type": "Offer",
          "price": content.price,
          "priceCurrency": "CNY"
        }
      };
    case 'faq':
      return {
        ...base,
        "@type": "FAQPage",
        "mainEntity": [{
          "@type": "Question",
          "name": content.question,
          "acceptedAnswer": {
            "@type": "Answer",
            "text": content.answer
          }
        }]
      };
    default:
      return base;
  }
}

async function getProduct(id) {
  // Fetch the product from the database (stubbed here)
  return {
    id,
    title: `Product ${id}`,
    description: 'Product description',
    sku: `SKU-${id}`,
    price: 299.00,
    brand: 'Brand Name',
    summary: 'Product summary',
    key_points: ['Feature 1', 'Feature 2'],
    updated_at: new Date().toISOString()
  };
}

async function getFAQ(id) {
  // Fetch the FAQ from the database (stubbed here)
  return {
    id,
    question: 'Sample question',
    answer: 'Answer text',
    summary: 'Answer summary',
    key_points: ['Point 1', 'Point 2'],
    updated_at: new Date().toISOString()
  };
}

// Stubs for the remaining helpers referenced above
async function getArticle(id) {
  return {
    id,
    title: `Article ${id}`,
    description: 'Article description',
    updated_at: new Date().toISOString()
  };
}

async function getContent(type, id) {
  if (type === 'product') return getProduct(id);
  if (type === 'faq') return getFAQ(id);
  if (type === 'article') return getArticle(id);
  return null;
}

module.exports = router;
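On the consuming side, a client picks the right deep link for its platform from the deep_links object in the response. A minimal sketch — the sample dict below only mirrors the response shape of the endpoint above; it is not fetched live:

```python
def pick_deep_link(content: dict, platform: str) -> str:
    """Return the platform-specific deep link, falling back to the web URL."""
    links = content.get("deep_links", {})
    return links.get(platform) or links["web"]

# Sample payload mirroring the /api/v1/content/:type/:id response shape
sample = {
    "id": "42",
    "type": "product",
    "deep_links": {
        "web": "https://example.com/product/42",
        "ios": "myapp://product/42",
        "android": "intent://product/42#Intent;package=com.example.app;end",
    },
}

print(pick_deep_link(sample, "ios"))      # myapp://product/42
print(pick_deep_link(sample, "windows"))  # unknown platform: falls back to the web URL
```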

E.6 DeepSeek API Monitoring Script

Scenario

Periodically query the DeepSeek API for brand/keyword citations and monitor how often they appear in generated answers.

Python script (deepseek_monitor.py)

#!/usr/bin/env python3
# deepseek_monitor.py
import requests
import json
import time
import logging
from datetime import datetime

# Configuration
DEEPSEEK_API_KEY = "your_deepseek_api_key"
DEEPSEEK_API_URL = "https://api.deepseek.com/v1/chat/completions"
MONITOR_KEYWORDS = ["Brand Name", "Product Name", "core technology"]
CHECK_INTERVAL = 7200  # 2 hours
OUTPUT_FILE = "deepseek_references.json"

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def query_deepseek(prompt: str) -> dict | None:
    """Call the DeepSeek API; return the parsed JSON, or None on failure"""
    headers = {
        "Authorization": f"Bearer {DEEPSEEK_API_KEY}",
        "Content-Type": "application/json"
    }
    
    payload = {
        "model": "deepseek-chat",
        "messages": [
            {
                "role": "system",
                "content": "你是一个搜索助手。请搜索相关信息并返回引用来源。请确保回答中包含具体的来源URL。"
            },
            {
                "role": "user",
                "content": f"请搜索关于'{prompt}'的信息,并列出所有引用的来源。请用以下格式返回:\n\n回答:[内容]\n\n引用来源:\n1. [URL] - [来源描述]\n2. [URL] - [来源描述]\n..."
            }
        ],
        "max_tokens": 2000,
        "temperature": 0.1,
        "stream": False
    }
    
    try:
        response = requests.post(
            DEEPSEEK_API_URL,
            headers=headers,
            json=payload,
            timeout=60
        )
        response.raise_for_status()
        return response.json()
    except Exception as e:
        logger.error(f"DeepSeek API调用失败: {e}")
        return None

def extract_references(response: dict) -> list:
    """Extract cited URLs from the DeepSeek response"""
    references = []
    if response and 'choices' in response:
        content = response['choices'][0]['message']['content']
        
        # Pull out the "Sources" section
        import re
        # Match the "number. URL" format
        ref_pattern = r'\d+\.\s*(https?://[^\s\n]+)'
        matches = re.findall(ref_pattern, content)
        references.extend(matches)
        
        # Also pick up inline URLs
        inline_urls = re.findall(r'https?://[^\s\n\)]+', content)
        references.extend(inline_urls)
        
        # Deduplicate
        references = list(set(references))
    
    return references

def check_brand_authority(response: dict, brand: str) -> dict:
    """Assess how prominently the brand appears in the answer"""
    result = {
        "mentioned": False,
        "position": None,
        "context": None
    }
    
    if response and 'choices' in response:
        content = response['choices'][0]['message']['content']
        
        # Check whether the brand is mentioned
        if brand.lower() in content.lower():
            result["mentioned"] = True
            
            # Locate the brand's first occurrence
            index = content.lower().find(brand.lower())
            result["position"] = index
            
            # Extract surrounding context
            start = max(0, index - 100)
            end = min(len(content), index + 100)
            result["context"] = content[start:end]
    
    return result

def main():
    logger.info("Starting DeepSeek citation monitoring...")
    
    while True:
        results = []
        
        for keyword in MONITOR_KEYWORDS:
            logger.info(f"Querying: {keyword}")
            response = query_deepseek(keyword)
            
            if response:
                references = extract_references(response)
                brand_info = check_brand_authority(response, "Brand Name")
                
                result = {
                    "timestamp": datetime.now().isoformat(),
                    "keyword": keyword,
                    "brand_mentioned": brand_info["mentioned"],
                    "brand_context": brand_info["context"],
                    "reference_count": len(references),
                    "references": references[:15],
                    "response_preview": response['choices'][0]['message']['content'][:300]
                }
                
                results.append(result)
                logger.info(f"  Brand mentioned: {brand_info['mentioned']}, citations: {len(references)}")
            else:
                results.append({
                    "timestamp": datetime.now().isoformat(),
                    "keyword": keyword,
                    "error": "API call failed"
                })
            
            time.sleep(3)  # avoid rate limiting
        
        # Persist results
        try:
            with open(OUTPUT_FILE, 'r', encoding='utf-8') as f:
                existing = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            existing = []
        
        existing.extend(results)
        existing = existing[-200:]  # keep the most recent 200 records
        
        with open(OUTPUT_FILE, 'w', encoding='utf-8') as f:
            json.dump(existing, f, ensure_ascii=False, indent=2)
        
        logger.info(f"Monitoring pass complete; sleeping {CHECK_INTERVAL} seconds...")
        time.sleep(CHECK_INTERVAL)

if __name__ == "__main__":
    main()

E.7 Doubao Answer Change Tracking Script

Scenario

Monitor how Doubao (ByteDance) answers specific questions over time, tracking brand mentions in the answers and how the content evolves.

Python script (doubao_tracker.py)

#!/usr/bin/env python3
# doubao_tracker.py
import requests
import json
import time
import hashlib
from datetime import datetime
from difflib import Differ

# Configuration
DOUBAO_API_URL = "https://api.doubao.com/v1/chat/completions"  # sample URL
DOUBAO_API_KEY = "your_doubao_api_key"
MONITOR_QUESTIONS = [
    "What is [brand name]?",
    "Is [product name] any good?",
    "Recommend [category] products"
]
CHECK_INTERVAL = 3600  # 1 hour
OUTPUT_DIR = "./doubao_tracking"

def query_doubao(question: str) -> dict | None:
    """Call the Doubao API; return the parsed JSON, or None on failure"""
    headers = {
        "Authorization": f"Bearer {DOUBAO_API_KEY}",
        "Content-Type": "application/json"
    }
    
    payload = {
        "model": "doubao-pro",
        "messages": [
            {"role": "user", "content": question}
        ],
        "max_tokens": 1000,
        "temperature": 0.1
    }
    
    try:
        response = requests.post(
            DOUBAO_API_URL,
            headers=headers,
            json=payload,
            timeout=30
        )
        response.raise_for_status()
        return response.json()
    except Exception as e:
        print(f"豆包API调用失败: {e}")
        return None

def compute_hash(text: str) -> str:
    """Hash the text to detect changes"""
    return hashlib.md5(text.encode('utf-8')).hexdigest()

def compare_answers(old_answer: str, new_answer: str) -> dict:
    """Compare two answers and summarize the differences"""
    differ = Differ()
    
    old_lines = old_answer.splitlines()
    new_lines = new_answer.splitlines()
    
    diff = list(differ.compare(old_lines, new_lines))
    
    additions = [line[2:] for line in diff if line.startswith('+ ')]
    deletions = [line[2:] for line in diff if line.startswith('- ')]
    
    return {
        "has_changed": old_answer != new_answer,
        "additions": additions[:10],
        "deletions": deletions[:10],
        "diff_percentage": len(additions) / max(len(old_lines), 1) * 100
    }

def check_brand_mentions(answer: str, brand: str) -> dict:
    """Collect brand mentions together with surrounding context"""
    import re
    
    mentions = []
    for match in re.finditer(re.escape(brand), answer, re.IGNORECASE):
        start = max(0, match.start() - 50)
        end = min(len(answer), match.end() + 50)
        mentions.append({
            "position": match.start(),
            "context": answer[start:end]
        })
    
    return {
        "total_mentions": len(mentions),
        "mentions": mentions[:5],
        "mentioned": len(mentions) > 0
    }

def track_question(question: str):
    """Track answer changes for a single question"""
    import os
    
    # Create the output directory
    if not os.path.exists(OUTPUT_DIR):
        os.makedirs(OUTPUT_DIR)
    
    # Use a hash of the question as the filename
    question_hash = hashlib.md5(question.encode()).hexdigest()
    history_file = os.path.join(OUTPUT_DIR, f"{question_hash}.json")
    
    # Load the history
    try:
        with open(history_file, 'r', encoding='utf-8') as f:
            history = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        history = []
    
    # Query the current answer
    response = query_doubao(question)
    if not response:
        return
    
    current_answer = response['choices'][0]['message']['content']
    current_hash = compute_hash(current_answer)
    
    # Has the answer changed?
    last_hash = history[-1]['hash'] if history else None
    has_changed = current_hash != last_hash
    
    # Compute the diff
    diff_info = {}
    if history and has_changed:
        diff_info = compare_answers(history[-1]['answer'], current_answer)
    
    # Check brand mentions
    brand_info = check_brand_mentions(current_answer, "Brand Name")
    
    # Record the snapshot
    record = {
        "timestamp": datetime.now().isoformat(),
        "hash": current_hash,
        "answer": current_answer,
        "has_changed": has_changed,
        "brand_mentions": brand_info,
        "diff": diff_info
    }
    
    history.append(record)
    
    # Keep only the most recent 50 records
    history = history[-50:]
    
    with open(history_file, 'w', encoding='utf-8') as f:
        json.dump(history, f, ensure_ascii=False, indent=2)
    
    # Print a summary
    print(f"\nQuestion: {question}")
    print(f"Time: {record['timestamp']}")
    print(f"Changed: {'yes' if has_changed else 'no'}")
    print(f"Brand mentions: {brand_info['total_mentions']}")
    if has_changed:
        print(f"Added lines: {diff_info.get('additions', [])[:3]}")
        print(f"Removed lines: {diff_info.get('deletions', [])[:3]}")
    print("-" * 50)

def main():
    print("Starting Doubao answer change tracking...")
    print(f"Questions monitored: {len(MONITOR_QUESTIONS)}")
    print(f"Check interval: {CHECK_INTERVAL} seconds")
    print(f"Output directory: {OUTPUT_DIR}")
    print("=" * 50)
    
    while True:
        for question in MONITOR_QUESTIONS:
            try:
                track_question(question)
            except Exception as e:
                print(f"Failed to track question: {question}, error: {e}")
            time.sleep(5)  # pause between questions
        
        print(f"\nSleeping {CHECK_INTERVAL} seconds before the next pass...")
        time.sleep(CHECK_INTERVAL)

if __name__ == "__main__":
    main()
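The change-detection core of the tracker (hashing plus difflib comparison) can be exercised standalone on two synthetic answers; the brand and answer text below are made up for illustration:

```python
import hashlib
from difflib import Differ

def compute_hash(text: str) -> str:
    """Hash the text to detect changes (same helper as in doubao_tracker.py)."""
    return hashlib.md5(text.encode('utf-8')).hexdigest()

old = "Brand X makes widgets.\nFounded in 2010."
new = "Brand X makes widgets.\nFounded in 2010.\nNow also sells gadgets."

# Hash comparison detects that something changed...
assert compute_hash(old) != compute_hash(new)

# ...and difflib pinpoints which lines were added or removed
diff = list(Differ().compare(old.splitlines(), new.splitlines()))
additions = [line[2:] for line in diff if line.startswith('+ ')]
deletions = [line[2:] for line in diff if line.startswith('- ')]
print(additions)  # ['Now also sells gadgets.']
print(deletions)  # []
```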

Usage notes:

  1. All scripts must be adapted with your actual API keys and business logic
  2. Deploy the monitoring scripts as scheduled jobs (cron) or via a CI/CD pipeline
  3. For production use, add retry logic, log rotation, and alerting
  4. Mind API rate limits to avoid being throttled
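For the retry logic mentioned above, a small decorator with exponential backoff is one common shape; the attempt count and delays below are illustrative defaults, not recommendations from any particular API:

```python
import time
import functools

def retry(max_attempts: int = 3, base_delay: float = 1.0):
    """Retry a function with exponential backoff: base_delay * 1, 2, 4, ..."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts - 1:
                        raise  # out of attempts: let the caller / alerting handle it
                    time.sleep(base_delay * (2 ** attempt))
        return wrapper
    return decorator

@retry(max_attempts=3, base_delay=1.0)
def flaky_call():
    # Stand-in for query_perplexity / query_deepseek / query_doubao
    ...
```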
Last Updated: 5/9/26, 5:13 PM