附录E:全栈代码示例库
本附录收录了贯穿全书的核心代码示例,涵盖动态结构化数据生成、爬虫日志分析、生成引擎监控、预渲染配置以及API设计等关键环节。所有代码均以生产可用为标准,并附有详细注释。
E.1 动态生成JSON-LD中间件
场景
在SSR(Next.js/Nuxt)或CSR(React/Vue)应用中,根据页面类型动态注入面向生成式引擎的增强Schema标记。
Next.js 中间件示例 (middleware.ts)
// middleware.ts
import { NextResponse } from 'next/server';
import type { NextRequest } from 'next/server';
/**
 * Next.js middleware: pick a JSON-LD schema generator by URL prefix and hand
 * the serialized schema to the rendering layer via a response header.
 */
export function middleware(request: NextRequest) {
  const response = NextResponse.next();
  const url = request.nextUrl.pathname;

  // Choose the schema by page type.
  let schema = {};
  if (url.startsWith('/product/')) {
    const productId = url.split('/')[2];
    schema = generateProductSchema(productId);
  } else if (url.startsWith('/faq/')) {
    schema = generateFAQSchema(url);
  } else if (url.startsWith('/article/')) {
    schema = generateArticleSchema(url);
  }

  // HTTP header values must be ISO-8859-1 (ByteString); the schemas contain
  // non-ASCII text (e.g. "产品名称"), so setting the raw JSON used to throw.
  // Percent-encode it here — the Layout component reading 'x-schema-json'
  // must apply decodeURIComponent() before JSON.parse().
  response.headers.set('x-schema-json', encodeURIComponent(JSON.stringify(schema)));
  return response;
}
/** Build a schema.org Product JSON-LD object for the given product id. */
function generateProductSchema(id: string) {
  const offer = {
    "@type": "Offer",
    "priceCurrency": "CNY",
    "price": "299.00",
    "availability": "https://schema.org/InStock"
  };
  const brand = { "@type": "Brand", "name": "品牌名称" };
  return {
    "@context": "https://schema.org",
    "@type": "Product",
    "name": `产品名称_${id}`,
    "description": "产品详细描述",
    "sku": id,
    "brand": brand,
    "offers": offer
  };
}
/**
 * Build a schema.org FAQPage JSON-LD object.
 * TODO: populate mainEntity from the CMS/API for the given URL.
 */
function generateFAQSchema(url: string) {
  const firstEntry = {
    "@type": "Question",
    "name": "常见问题1",
    "acceptedAnswer": { "@type": "Answer", "text": "答案内容" }
  };
  return {
    "@context": "https://schema.org",
    "@type": "FAQPage",
    "mainEntity": [firstEntry]
  };
}
/** Build a schema.org Article JSON-LD object (placeholder metadata). */
function generateArticleSchema(url: string) {
  const writer = { "@type": "Person", "name": "作者名" };
  return {
    "@context": "https://schema.org",
    "@type": "Article",
    "headline": "文章标题",
    "datePublished": "2024-01-01",
    "author": writer
  };
}
Django 中间件示例 (middleware.py)
# middleware.py
import json
from django.http import HttpResponse
from django.urls import resolve
class SchemaInjectionMiddleware:
    """Django middleware that injects page-type-specific JSON-LD into HTML responses."""

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        response = self.get_response(request)
        # Streaming responses have no .content attribute; accessing it raised
        # AttributeError before this guard.
        if getattr(response, 'streaming', False):
            return response
        # Only inject into HTML responses.
        if 'text/html' in response.get('Content-Type', ''):
            schema = self.get_schema_for_request(request)
            if schema:
                # Escape "</" so a "</script>" inside the data cannot
                # terminate the injected <script> element early.
                payload = json.dumps(schema).replace('</', '<\\/')
                # Insert the JSON-LD right before </head>.
                schema_script = f'<script type="application/ld+json">{payload}</script>'
                content = response.content.decode('utf-8')
                content = content.replace('</head>', f'{schema_script}</head>')
                response.content = content.encode('utf-8')
                # Keep Content-Length consistent with the rewritten body.
                if response.has_header('Content-Length'):
                    response['Content-Length'] = str(len(response.content))
        return response

    def get_schema_for_request(self, request):
        """Return a JSON-LD dict for the resolved view, or None for untyped pages."""
        url_name = resolve(request.path_info).url_name
        if url_name == 'product_detail':
            return {
                "@context": "https://schema.org",
                "@type": "Product",
                "name": request.GET.get('name', '默认产品'),
                "description": "产品描述"
            }
        elif url_name == 'faq_list':
            return {
                "@context": "https://schema.org",
                "@type": "FAQPage",
                "mainEntity": []
            }
        return None
E.2 自建Perplexity引用监控脚本
场景
定期查询Perplexity API,检测指定品牌或关键词在生成摘要中的出现情况。
Python 脚本 (perplexity_monitor.py)
#!/usr/bin/env python3
# perplexity_monitor.py
import requests
import json
import time
import logging
from datetime import datetime
# --- Configuration ---
PERPLEXITY_API_KEY = "your_api_key_here"  # replace with a real Perplexity API key
MONITOR_KEYWORDS = ["品牌名", "产品名", "核心关键词"]  # queries to monitor
CHECK_INTERVAL = 3600  # seconds (1 hour); declared but not used by main() in this script
OUTPUT_FILE = "perplexity_references.json"  # rolling JSON result log
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def query_perplexity(prompt: str) -> dict:
    """Call the Perplexity chat-completions API for *prompt*.

    Returns the parsed JSON response, or None when the request or the
    response decoding fails for any reason.
    """
    auth_headers = {
        "Authorization": f"Bearer {PERPLEXITY_API_KEY}",
        "Content-Type": "application/json"
    }
    request_body = {
        "model": "sonar-pro",
        "messages": [
            {
                "role": "system",
                "content": "你是一个搜索助手。请搜索相关信息并返回引用来源。"
            },
            {
                "role": "user",
                "content": f"搜索关于'{prompt}'的最新信息,并列出所有引用的来源URL。"
            }
        ],
        "max_tokens": 1000,
        "temperature": 0.1
    }
    try:
        resp = requests.post(
            "https://api.perplexity.ai/chat/completions",
            headers=auth_headers,
            json=request_body,
            timeout=30
        )
        resp.raise_for_status()
        return resp.json()
    except Exception as exc:
        logger.error(f"API调用失败: {exc}")
        return None
def extract_references(response: dict) -> list:
    """Extract cited URLs from a chat-completion response.

    Fixes over the naive r'https?://\\S+' scan:
    - trailing punctuation that the scan drags along (e.g. "https://a.com/x.")
      is stripped;
    - duplicates are removed while preserving first-seen order
      (list(set(...)) produced a nondeterministic ordering).
    Returns [] for None or malformed responses.
    """
    import re
    if not response or 'choices' not in response:
        return []
    content = response['choices'][0]['message']['content']
    raw_urls = re.findall(r'https?://[^\s]+', content)
    # Characters that commonly trail a URL in running text (Latin + CJK).
    trailing = '.,;:!?)]}>\'"。,;:!?)】》'
    cleaned = [url.rstrip(trailing) for url in raw_urls]
    # dict.fromkeys de-duplicates while keeping first-seen order.
    return list(dict.fromkeys(cleaned))
def check_brand_presence(response: dict, brand: str) -> bool:
    """Case-insensitively test whether *brand* appears in the answer text."""
    if not response or 'choices' not in response:
        return False
    answer = response['choices'][0]['message']['content']
    return brand.lower() in answer.lower()
def main():
    """Query every monitored keyword once and append the findings to OUTPUT_FILE."""
    run_records = []
    for keyword in MONITOR_KEYWORDS:
        logger.info(f"正在检查关键词: {keyword}")
        api_response = query_perplexity(keyword)
        if api_response is None:
            run_records.append({
                "timestamp": datetime.now().isoformat(),
                "keyword": keyword,
                "error": "API调用失败"
            })
        else:
            cited_urls = extract_references(api_response)
            mentioned = check_brand_presence(api_response, "品牌名")
            answer_text = api_response['choices'][0]['message']['content']
            run_records.append({
                "timestamp": datetime.now().isoformat(),
                "keyword": keyword,
                "brand_mentioned": mentioned,
                "reference_count": len(cited_urls),
                "references": cited_urls[:10],  # keep only the first 10
                "raw_response": answer_text[:500]
            })
            logger.info(f" 品牌提及: {mentioned}, 引用数: {len(cited_urls)}")
        time.sleep(2)  # stay under the API rate limit
    # Merge with previous runs, keeping only the most recent 100 records.
    try:
        with open(OUTPUT_FILE, 'r') as f:
            history = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        history = []
    history.extend(run_records)
    history = history[-100:]
    with open(OUTPUT_FILE, 'w') as f:
        json.dump(history, f, ensure_ascii=False, indent=2)
    logger.info(f"监控完成,结果已保存至 {OUTPUT_FILE}")

if __name__ == "__main__":
    main()
E.3 Nginx日志分析脚本
场景
分析Nginx访问日志,提取AI爬虫(GPTBot、ClaudeBot等)的访问模式、频率和页面偏好。
Python 脚本 (nginx_log_analyzer.py)
#!/usr/bin/env python3
# nginx_log_analyzer.py
import re
import json
from collections import Counter, defaultdict
from datetime import datetime
import argparse
# Known AI-crawler User-Agent patterns, keyed by display name.
# Values are regex fragments; detect_ai_crawler() applies them with
# re.IGNORECASE, so the 'AmazonBot' / 'Amazonbot' case mismatch is harmless.
AI_CRAWLERS = {
    'GPTBot': r'GPTBot',
    'ClaudeBot': r'ClaudeBot',
    'GoogleOther': r'GoogleOther',
    'CCBot': r'CCBot',
    'Bytespider': r'Bytespider',
    'DeepSeek-Bot': r'DeepSeek-Bot',
    'PerplexityBot': r'PerplexityBot',
    'AmazonBot': r'Amazonbot'
}
def parse_nginx_log_line(line: str) -> dict:
    """Parse one line of an Nginx access log in "combined" format.

    Returns a dict of the nine combined-format fields, or None when the line
    does not match (malformed lines are skipped by the caller).
    The body-size field is matched with \\S+ rather than \\d+ because some
    log configurations emit '-' for responses without a body; that value is
    mapped to size 0 instead of silently dropping the whole line.
    """
    pattern = (r'(\S+) (\S+) (\S+) \[([^\]]+)\] "([^"]*)" '
               r'(\d+) (\S+) "([^"]*)" "([^"]*)"')
    match = re.match(pattern, line)
    if not match:
        return None
    size_field = match.group(7)
    return {
        'ip': match.group(1),
        'ident': match.group(2),
        'user': match.group(3),
        'time': match.group(4),
        'request': match.group(5),
        'status': int(match.group(6)),
        'size': int(size_field) if size_field.isdigit() else 0,
        'referer': match.group(8),
        'user_agent': match.group(9)
    }
def detect_ai_crawler(user_agent: str) -> str:
    """Return the name of the first AI_CRAWLERS pattern matching *user_agent*, else None."""
    hits = (name for name, pattern in AI_CRAWLERS.items()
            if re.search(pattern, user_agent, re.IGNORECASE))
    return next(hits, None)
def analyze_log_file(filepath: str):
    """Aggregate AI-crawler statistics from an Nginx access log.

    Returns {'total_lines', 'crawler_lines', 'crawler_stats'} where
    crawler_stats maps a crawler name to its request count, status-code,
    path and hour-of-day distributions, and raw response sizes.
    """
    crawler_stats = defaultdict(lambda: {
        'total_requests': 0,
        'status_codes': Counter(),
        'paths': Counter(),
        'hourly_distribution': Counter(),
        'response_sizes': []
    })
    total_lines = 0
    crawler_lines = 0
    with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
        for line in f:
            total_lines += 1
            parsed = parse_nginx_log_line(line)
            if not parsed:
                continue
            crawler_type = detect_ai_crawler(parsed['user_agent'])
            if not crawler_type:
                continue
            crawler_lines += 1
            stats = crawler_stats[crawler_type]
            stats['total_requests'] += 1
            stats['status_codes'][parsed['status']] += 1
            # "METHOD /path HTTP/x.y" -> count the path component.
            request_parts = parsed['request'].split()
            if len(request_parts) >= 2:
                stats['paths'][request_parts[1]] += 1
            # Bucket by hour of day. This used a bare `except:` which also
            # swallowed KeyboardInterrupt/SystemExit; strptime only raises
            # ValueError on a bad timestamp, so catch exactly that.
            try:
                hour = datetime.strptime(
                    parsed['time'].split()[0], '%d/%b/%Y:%H:%M:%S').hour
                stats['hourly_distribution'][hour] += 1
            except ValueError:
                pass
            stats['response_sizes'].append(parsed['size'])
    return {
        'total_lines': total_lines,
        'crawler_lines': crawler_lines,
        'crawler_stats': dict(crawler_stats)
    }
def print_report(analysis: dict):
    """Pretty-print the analysis dict produced by analyze_log_file()."""
    total = analysis['total_lines']
    crawler_total = analysis['crawler_lines']
    # An empty log file used to raise ZeroDivisionError here.
    share = (crawler_total / total * 100) if total else 0.0
    print(f"\n{'='*60}")
    print(f"日志分析报告")
    print(f"{'='*60}")
    print(f"总日志行数: {total}")
    print(f"AI爬虫请求数: {crawler_total} ({share:.2f}%)")
    print(f"{'='*60}")
    for crawler, stats in analysis['crawler_stats'].items():
        print(f"\n--- {crawler} ---")
        print(f" 总请求: {stats['total_requests']}")
        print(f" 状态码分布: {dict(stats['status_codes'])}")
        sizes = stats['response_sizes']
        if sizes:
            print(f" 平均响应大小: {sum(sizes)/len(sizes):.0f} bytes")
        else:
            print(" N/A")
        print(f" 最常访问的路径(Top 5):")
        for path, count in stats['paths'].most_common(5):
            print(f" {path}: {count}次")
        print(f" 访问时段分布(Top 3):")
        for hour, count in stats['hourly_distribution'].most_common(3):
            print(f" {hour}:00 - {count}次")
def main():
    """CLI entry point: parse arguments, analyze the log, emit a report."""
    arg_parser = argparse.ArgumentParser(description='Nginx日志AI爬虫分析工具')
    arg_parser.add_argument('logfile', help='Nginx访问日志文件路径')
    arg_parser.add_argument('--json', action='store_true', help='以JSON格式输出')
    options = arg_parser.parse_args()
    report = analyze_log_file(options.logfile)
    if options.json:
        print(json.dumps(report, indent=2, ensure_ascii=False))
    else:
        print_report(report)

if __name__ == "__main__":
    main()
E.4 React/Vue预渲染配置
场景
为SPA应用配置预渲染,确保搜索引擎和AI爬虫能正确抓取内容。
React (Next.js) 预渲染配置示例
// next.config.js
// NOTE(review): `generateStaticParams` is documented as a per-page export in
// the Next.js App Router, not a next.config.js option — confirm this key is
// actually read by the build before relying on it here.
module.exports = {
  // Static export (SSG): `next build` emits plain HTML files.
  output: 'export',
  // Pre-render dynamic routes by enumerating product/FAQ ids from the API.
  async generateStaticParams() {
    const products = await fetch('https://api.example.com/products').then(res => res.json());
    const faqs = await fetch('https://api.example.com/faqs').then(res => res.json());
    return [
      ...products.map(product => ({
        category: product.category,
        id: product.id
      })),
      ...faqs.map(faq => ({
        id: faq.id
      }))
    ];
  }
};
Vue (Nuxt 3) 预渲染配置示例
// nuxt.config.ts
export default defineNuxtConfig({
  ssr: true, // enable server-side rendering
  routeRules: {
    // Static generation strategy.
    '/': { prerender: true },
    '/about': { prerender: true },
    '/faq/**': { prerender: true },
    '/product/**': { prerender: true },
    // Hybrid strategy: some sections SSR, some client-rendered.
    '/blog/**': { ssr: true }, // blog uses SSR
    '/dashboard/**': { ssr: false }, // dashboard is client-rendered (CSR)
  },
  nitro: {
    prerender: {
      crawlLinks: true, // automatically follow discovered links
      routes: ['/'], // extra routes to pre-render
      ignore: ['/admin/**'] // routes to skip
    }
  }
});
通用预渲染方案 (Puppeteer)
// prerender.js
// Pre-render a fixed list of SPA routes to static HTML with Puppeteer.
const puppeteer = require('puppeteer');
const fs = require('fs');
const path = require('path');

// Site origin to render and the directory the HTML snapshots are written to.
const BASE_URL = 'https://example.com';
const OUTPUT_DIR = './prerendered';
// Routes to snapshot; extend this list as the site grows.
const ROUTES = [
  '/',
  '/about',
  '/faq',
  '/faq/question-1',
  '/faq/question-2',
  '/product/product-1',
  '/product/product-2'
];
/**
 * Render one route in a fresh page and write the resulting HTML under
 * OUTPUT_DIR (the root route becomes index.html). Failures are logged,
 * never thrown; the page is always closed.
 */
async function prerenderPage(browser, route) {
  const page = await browser.newPage();
  await page.setViewport({ width: 1920, height: 1080 });
  await page.setUserAgent('Mozilla/5.0 (compatible; PrerenderBot/1.0)');
  try {
    const url = `${BASE_URL}${route}`;
    console.log(`预渲染: ${url}`);
    await page.goto(url, { waitUntil: 'networkidle0', timeout: 30000 });
    // Block until the app's main container has rendered.
    await page.waitForSelector('#app-content', { timeout: 10000 });
    const html = await page.content();
    const outputName = route === '/' ? 'index.html' : `${route}.html`;
    const filePath = path.join(OUTPUT_DIR, outputName);
    const parentDir = path.dirname(filePath);
    if (!fs.existsSync(parentDir)) {
      fs.mkdirSync(parentDir, { recursive: true });
    }
    fs.writeFileSync(filePath, html, 'utf-8');
    console.log(` 已保存: ${filePath}`);
  } catch (error) {
    console.error(` 预渲染失败 ${route}: ${error.message}`);
  } finally {
    await page.close();
  }
}
/** Launch a headless browser and pre-render every configured route in order. */
async function main() {
  console.log('开始预渲染...');
  if (!fs.existsSync(OUTPUT_DIR)) {
    fs.mkdirSync(OUTPUT_DIR, { recursive: true });
  }
  const browser = await puppeteer.launch({
    headless: 'new',
    args: ['--no-sandbox', '--disable-setuid-sandbox']
  });
  // Sequential on purpose: one page at a time keeps memory use predictable.
  for (let i = 0; i < ROUTES.length; i += 1) {
    await prerenderPage(browser, ROUTES[i]);
  }
  await browser.close();
  console.log('预渲染完成!');
}

main().catch(console.error);
E.5 App与Web内容统一API设计
场景
设计一套API,同时为Web端和App端提供结构化内容,并支持生成式引擎的深度链接引用。
Node.js (Express) API 示例
// api.js
const express = require('express');
const router = express.Router();
// Unified content API endpoint for Web and App clients.
// NOTE(review): the original module called getArticle() without ever defining
// it, so /article requests crashed with a ReferenceError; a stub consistent
// with getProduct/getFAQ is defined below.
router.get('/api/v1/content/:type/:id', async (req, res) => {
  const { type, id } = req.params;
  const source = req.query.source || 'web'; // web, ios, android
  try {
    let content;
    switch (type) {
      case 'product':
        content = await getProduct(id);
        break;
      case 'faq':
        content = await getFAQ(id);
        break;
      case 'article':
        content = await getArticle(id);
        break;
      default:
        return res.status(400).json({ error: 'Invalid content type' });
    }
    // Unified response shape shared by all content types.
    const response = {
      id: content.id,
      type: type,
      title: content.title,
      description: content.description,
      structured_data: buildStructuredData(content, type),
      // Deep links into each platform.
      deep_links: {
        web: `https://example.com/${type}/${id}`,
        ios: `myapp://${type}/${id}`,
        android: `intent://${type}/${id}#Intent;package=com.example.app;end`
      },
      // Fields tuned for generative-engine consumption.
      geo_optimized: {
        summary: content.summary || content.description,
        key_points: content.key_points || [],
        related_entities: content.related || [],
        last_updated: content.updated_at
      },
      // Raw record, only when explicitly requested via ?source=api.
      raw: source === 'api' ? content : undefined
    };
    res.set('Cache-Control', 'public, max-age=3600');
    res.set('X-Content-Type-Options', 'nosniff');
    res.json(response);
  } catch (error) {
    console.error(`获取内容失败: ${error.message}`);
    res.status(404).json({ error: 'Content not found' });
  }
});

// Fetch an article record (stub; replace with a real database query).
async function getArticle(id) {
  return {
    id,
    title: `文章${id}`,
    description: '文章描述',
    summary: '文章摘要',
    key_points: [],
    related: [],
    updated_at: new Date().toISOString()
  };
}
// Batch content endpoint (used for sitemap generation).
// NOTE(review): getContent() was referenced but never defined in this module
// (a runtime ReferenceError); a dispatcher over the typed getters is added.
router.get('/api/v1/content/batch', async (req, res) => {
  const { type, ids } = req.query;
  if (!type || !ids) {
    return res.status(400).json({ error: 'Missing type or ids parameter' });
  }
  const idArray = ids.split(',');
  const contents = [];
  for (const id of idArray) {
    try {
      const content = await getContent(type, id);
      if (content) {
        contents.push({
          id: content.id,
          title: content.title,
          url: `https://example.com/${type}/${id}`,
          last_modified: content.updated_at
        });
      }
    } catch (e) {
      // Skip ids that cannot be fetched.
    }
  }
  res.json({
    total: contents.length,
    contents
  });
});

// Dispatch a typed fetch; returns null for unknown content types.
async function getContent(type, id) {
  switch (type) {
    case 'product':
      return getProduct(id);
    case 'faq':
      return getFAQ(id);
    default:
      return null;
  }
}
function buildStructuredData(content, type) {
const base = {
"@context": "https://schema.org"
};
switch(type) {
case 'product':
return {
...base,
"@type": "Product",
"name": content.title,
"description": content.description,
"sku": content.sku,
"brand": content.brand,
"offers": {
"@type": "Offer",
"price": content.price,
"priceCurrency": "CNY"
}
};
case 'faq':
return {
...base,
"@type": "FAQPage",
"mainEntity": [{
"@type": "Question",
"name": content.question,
"acceptedAnswer": {
"@type": "Answer",
"text": content.answer
}
}]
};
default:
return base;
}
}
async function getProduct(id) {
// 从数据库获取产品
return {
id,
title: `产品${id}`,
description: '产品描述',
sku: `SKU-${id}`,
price: 299.00,
brand: '品牌名',
summary: '产品摘要',
key_points: ['特点1', '特点2'],
updated_at: new Date().toISOString()
};
}
/** Fetch an FAQ record (stub standing in for a database query). */
async function getFAQ(id) {
  const now = new Date().toISOString();
  return {
    id,
    question: '常见问题',
    answer: '答案内容',
    summary: '答案摘要',
    key_points: ['要点1', '要点2'],
    updated_at: now
  };
}

module.exports = router;
E.6 DeepSeek API监控脚本
场景
通过DeepSeek API定期查询品牌/关键词的引用情况,监控在生成答案中的出现率。
Python 脚本 (deepseek_monitor.py)
#!/usr/bin/env python3
# deepseek_monitor.py
import requests
import json
import time
import logging
from datetime import datetime
# --- Configuration ---
DEEPSEEK_API_KEY = "your_deepseek_api_key"  # replace with a real API key
DEEPSEEK_API_URL = "https://api.deepseek.com/v1/chat/completions"
MONITOR_KEYWORDS = ["品牌名", "产品名", "核心技术"]  # queries to monitor
CHECK_INTERVAL = 7200  # seconds between sweeps (2 hours)
OUTPUT_FILE = "deepseek_references.json"  # rolling JSON result log
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def query_deepseek(prompt: str) -> dict:
    """Call the DeepSeek chat-completions API for *prompt*.

    Returns the parsed JSON response, or None on any request/decoding failure.
    """
    auth_headers = {
        "Authorization": f"Bearer {DEEPSEEK_API_KEY}",
        "Content-Type": "application/json"
    }
    request_body = {
        "model": "deepseek-chat",
        "messages": [
            {
                "role": "system",
                "content": "你是一个搜索助手。请搜索相关信息并返回引用来源。请确保回答中包含具体的来源URL。"
            },
            {
                "role": "user",
                "content": f"请搜索关于'{prompt}'的信息,并列出所有引用的来源。请用以下格式返回:\n\n回答:[内容]\n\n引用来源:\n1. [URL] - [来源描述]\n2. [URL] - [来源描述]\n..."
            }
        ],
        "max_tokens": 2000,
        "temperature": 0.1,
        "stream": False
    }
    try:
        resp = requests.post(
            DEEPSEEK_API_URL,
            headers=auth_headers,
            json=request_body,
            timeout=60
        )
        resp.raise_for_status()
        return resp.json()
    except Exception as exc:
        logger.error(f"DeepSeek API调用失败: {exc}")
        return None
def extract_references(response: dict) -> list:
    """Extract cited URLs from a DeepSeek chat response.

    Collects both numbered-reference lines ("1. URL - desc") and inline URLs.
    Fixes over the original:
    - trailing punctuation dragged along by the \\S+ scan (e.g.
      "https://a.com/x。") is stripped;
    - de-duplication preserves first-seen order (list(set(...)) was
      nondeterministic).
    Returns [] for None or malformed responses.
    """
    import re
    if not response or 'choices' not in response:
        return []
    content = response['choices'][0]['message']['content']
    found = []
    # "N. URL" numbered-reference lines.
    found.extend(re.findall(r'\d+\.\s*(https?://[^\s\n]+)', content))
    # Any remaining inline URLs (closing parens excluded by the class).
    found.extend(re.findall(r'https?://[^\s\n\)]+', content))
    trailing = '.,;:!?)]}>\'"。,;:!?)】》'
    cleaned = [url.rstrip(trailing) for url in found]
    return list(dict.fromkeys(cleaned))
def check_brand_authority(response: dict, brand: str) -> dict:
    """Locate *brand* (case-insensitively) in the answer and capture context.

    Returns {'mentioned', 'position', 'context'} where context is up to
    100 characters on each side of the first occurrence.
    """
    result = {
        "mentioned": False,
        "position": None,
        "context": None
    }
    if not response or 'choices' not in response:
        return result
    answer = response['choices'][0]['message']['content']
    index = answer.lower().find(brand.lower())
    if index != -1:
        lo = max(0, index - 100)
        hi = min(len(answer), index + 100)
        result["mentioned"] = True
        result["position"] = index
        result["context"] = answer[lo:hi]
    return result
def main():
    """Run the DeepSeek reference monitor forever, one sweep per CHECK_INTERVAL."""
    logger.info("启动DeepSeek引用监控...")
    while True:
        sweep = []
        for keyword in MONITOR_KEYWORDS:
            logger.info(f"正在查询: {keyword}")
            reply = query_deepseek(keyword)
            if reply is None:
                sweep.append({
                    "timestamp": datetime.now().isoformat(),
                    "keyword": keyword,
                    "error": "API调用失败"
                })
            else:
                refs = extract_references(reply)
                brand_info = check_brand_authority(reply, "品牌名")
                sweep.append({
                    "timestamp": datetime.now().isoformat(),
                    "keyword": keyword,
                    "brand_mentioned": brand_info["mentioned"],
                    "brand_context": brand_info["context"],
                    "reference_count": len(refs),
                    "references": refs[:15],
                    "response_preview": reply['choices'][0]['message']['content'][:300]
                })
                logger.info(f" 品牌提及: {brand_info['mentioned']}, 引用数: {len(refs)}")
            time.sleep(3)  # stay under the API rate limit
        # Append this sweep to the rolling history (most recent 200 records).
        try:
            with open(OUTPUT_FILE, 'r') as f:
                history = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            history = []
        history.extend(sweep)
        history = history[-200:]
        with open(OUTPUT_FILE, 'w') as f:
            json.dump(history, f, ensure_ascii=False, indent=2)
        logger.info(f"监控完成,等待{CHECK_INTERVAL}秒后再次检查...")
        time.sleep(CHECK_INTERVAL)

if __name__ == "__main__":
    main()
E.7 豆包答案变化追踪脚本
场景
监控豆包(字节跳动)对特定问题的回答变化,追踪品牌在答案中的出现情况和内容演变。
Python 脚本 (doubao_tracker.py)
#!/usr/bin/env python3
# doubao_tracker.py
import requests
import json
import time
import hashlib
from datetime import datetime
from difflib import Differ
# --- Configuration ---
DOUBAO_API_URL = "https://api.doubao.com/v1/chat/completions"  # example URL — confirm the real endpoint
DOUBAO_API_KEY = "your_doubao_api_key"  # replace with a real API key
# Questions to track; "[品牌名]"/"[产品名]"/"[品类]" are placeholders to fill in.
MONITOR_QUESTIONS = [
    "什么是[品牌名]?",
    "[产品名]怎么样?",
    "推荐[品类]产品"
]
CHECK_INTERVAL = 3600  # seconds between sweeps (1 hour)
OUTPUT_DIR = "./doubao_tracking"  # per-question history files live here
def query_doubao(question: str) -> dict:
    """POST *question* to the Doubao chat API; return parsed JSON or None on failure."""
    auth_headers = {
        "Authorization": f"Bearer {DOUBAO_API_KEY}",
        "Content-Type": "application/json"
    }
    request_body = {
        "model": "doubao-pro",
        "messages": [
            {"role": "user", "content": question}
        ],
        "max_tokens": 1000,
        "temperature": 0.1
    }
    try:
        resp = requests.post(
            DOUBAO_API_URL,
            headers=auth_headers,
            json=request_body,
            timeout=30
        )
        resp.raise_for_status()
        return resp.json()
    except Exception as exc:
        print(f"豆包API调用失败: {exc}")
        return None
def compute_hash(text: str) -> str:
    """Return the hex MD5 digest of *text* (change detection only, not security)."""
    digest = hashlib.md5(text.encode('utf-8'))
    return digest.hexdigest()
def compare_answers(old_answer: str, new_answer: str) -> dict:
    """Line-diff two answers and summarize what changed.

    Returns has_changed, up to 10 added/removed lines, and diff_percentage
    (added-line count relative to the old line count; denominator floored
    at 1 to avoid division by zero).
    """
    old_lines = old_answer.splitlines()
    new_lines = new_answer.splitlines()
    added, removed = [], []
    for entry in Differ().compare(old_lines, new_lines):
        if entry.startswith('+ '):
            added.append(entry[2:])
        elif entry.startswith('- '):
            removed.append(entry[2:])
    return {
        "has_changed": old_answer != new_answer,
        "additions": added[:10],
        "deletions": removed[:10],
        "diff_percentage": len(added) / max(len(old_lines), 1) * 100
    }
def check_brand_mentions(answer: str, brand: str) -> dict:
    """Find every case-insensitive occurrence of *brand* with ±50 chars of context."""
    import re
    occurrences = []
    pattern = re.compile(re.escape(brand), re.IGNORECASE)
    for hit in pattern.finditer(answer):
        lo = max(0, hit.start() - 50)
        hi = min(len(answer), hit.end() + 50)
        occurrences.append({
            "position": hit.start(),
            "context": answer[lo:hi]
        })
    return {
        "total_mentions": len(occurrences),
        "mentions": occurrences[:5],
        "mentioned": len(occurrences) > 0
    }
def track_question(question: str):
    """Query Doubao for *question* once and append a change-tracking record.

    History is kept per question in OUTPUT_DIR/<md5(question)>.json; each
    record stores the answer, its hash, a line-diff against the previous
    answer, and brand-mention info. Only the latest 50 records are kept.
    """
    import os
    # Ensure the output directory exists.
    if not os.path.exists(OUTPUT_DIR):
        os.makedirs(OUTPUT_DIR)
    # File name is the MD5 of the question text.
    question_hash = hashlib.md5(question.encode()).hexdigest()
    history_file = os.path.join(OUTPUT_DIR, f"{question_hash}.json")
    # Load prior records (missing/corrupt file -> start fresh).
    try:
        with open(history_file, 'r') as f:
            history = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        history = []
    # Fetch the current answer; bail out silently on API failure.
    response = query_doubao(question)
    if not response:
        return
    current_answer = response['choices'][0]['message']['content']
    current_hash = compute_hash(current_answer)
    # Changed iff the hash differs from the most recent record's hash.
    last_hash = history[-1]['hash'] if history else None
    has_changed = current_hash != last_hash
    # Detailed diff only when there is a previous answer to compare against.
    diff_info = {}
    if history and has_changed:
        diff_info = compare_answers(history[-1]['answer'], current_answer)
    # Brand-mention scan of the new answer.
    brand_info = check_brand_mentions(current_answer, "品牌名")
    # Append the new record.
    record = {
        "timestamp": datetime.now().isoformat(),
        "hash": current_hash,
        "answer": current_answer,
        "has_changed": has_changed,
        "brand_mentions": brand_info,
        "diff": diff_info
    }
    history.append(record)
    # Keep only the most recent 50 records.
    history = history[-50:]
    with open(history_file, 'w') as f:
        json.dump(history, f, ensure_ascii=False, indent=2)
    # Console summary of this check.
    print(f"\n问题: {question}")
    print(f"时间: {record['timestamp']}")
    print(f"变化: {'是' if has_changed else '否'}")
    print(f"品牌提及: {brand_info['total_mentions']}次")
    if has_changed:
        print(f"新增内容: {diff_info.get('additions', [])[:3]}")
        print(f"删除内容: {diff_info.get('deletions', [])[:3]}")
    print("-" * 50)
def main():
    """Track every monitored question in an endless loop, pausing between sweeps."""
    print("启动豆包答案变化追踪...")
    print(f"监控问题数: {len(MONITOR_QUESTIONS)}")
    print(f"检查间隔: {CHECK_INTERVAL}秒")
    print(f"输出目录: {OUTPUT_DIR}")
    print("=" * 50)
    while True:
        for question in MONITOR_QUESTIONS:
            # One failing question must not kill the whole loop.
            try:
                track_question(question)
            except Exception as exc:
                print(f"追踪问题失败: {question}, 错误: {exc}")
            time.sleep(5)  # short pause between questions
        print(f"\n等待{CHECK_INTERVAL}秒后再次检查...")
        time.sleep(CHECK_INTERVAL)

if __name__ == "__main__":
    main()
使用建议:
- 所有脚本均需根据实际API密钥和业务逻辑进行调整
- 建议将监控脚本部署为定时任务(cron job)或使用CI/CD流水线
- 对于生产环境,请添加错误重试、日志轮转和告警机制
- 注意API调用频率限制,避免触发限流
