E.2 自建Perplexity引用监控脚本
概述
本脚本用于监控您的网站在Perplexity AI生成答案中被引用的频率和上下文。通过定期查询Perplexity API或模拟用户行为,收集引用数据并存储,以便分析趋势和异常。
技术栈
- 语言: Python 3.8+
- 核心库:
requests、json、datetime、logging、sqlite3(或 pandas+CSV)
- 可选: schedule(定时任务)、smtplib(邮件告警)
脚本架构
perplexity_monitor/
├── config.py # 配置参数
├── monitor.py # 主监控逻辑
├── storage.py # 数据存储模块
├── utils.py # 工具函数
├── requirements.txt # 依赖
└── logs/ # 日志目录
核心实现
1. 配置模块 (config.py)
# config.py -- central configuration for the Perplexity citation monitor.
import os
from dotenv import load_dotenv
load_dotenv()  # pull secrets (API key, alert email) from a local .env file
# Perplexity API configuration
PERPLEXITY_API_KEY = os.getenv("PERPLEXITY_API_KEY")
PERPLEXITY_API_URL = "https://api.perplexity.ai/chat/completions"
# Domains whose appearance in Perplexity answers we monitor
MONITOR_DOMAINS = [
    "example.com",
    "docs.example.com",
    "blog.example.com"
]
# Query templates used to trigger Perplexity into generating answers
QUERY_TEMPLATES = [
    "What is {topic}?",
    "How to {action}?",
    "Best practices for {topic}",
    "Latest updates on {topic}"
]
# Topics related to your own site content
MONITOR_TOPICS = [
    "SEO techniques",
    "GEO optimization",
    "web performance",
    "structured data"
]
# Storage backend configuration
STORAGE_TYPE = "sqlite"  # options: sqlite, csv
DB_PATH = "data/perplexity_monitor.db"
CSV_PATH = "data/perplexity_references.csv"
# Scheduling
CHECK_INTERVAL_HOURS = 6  # run a check round every 6 hours
# Alerting: fire when the reference count drops below this value
# (0 disables the alert, since a count can never be negative)
ALERT_THRESHOLD = 0
ALERT_EMAIL = os.getenv("ALERT_EMAIL")
2. 主监控脚本 (monitor.py)
# monitor.py
import json
import logging
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional

import requests

from config import (
    ALERT_THRESHOLD,
    PERPLEXITY_API_KEY,
    PERPLEXITY_API_URL,
    MONITOR_DOMAINS,
    QUERY_TEMPLATES,
    MONITOR_TOPICS,
    CHECK_INTERVAL_HOURS
)
from storage import Storage
from utils import setup_logger, send_alert
# Module-wide logger shared by all PerplexityMonitor methods.
logger = setup_logger("perplexity_monitor")
class PerplexityMonitor:
    """Queries Perplexity with topic-based prompts and records every answer
    that references one of the monitored domains."""

    def __init__(self):
        self.storage = Storage()
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {PERPLEXITY_API_KEY}",
            "Content-Type": "application/json"
        })

    def query_perplexity(self, query: str) -> Optional[Dict]:
        """Send one chat-completion request to the Perplexity API.

        Returns the parsed JSON response, or None on any request/HTTP error
        (the error is logged, never raised, so a run survives bad calls).
        """
        payload = {
            "model": "sonar-pro",  # alternative: sonar-reasoning-pro
            "messages": [
                {
                    "role": "system",
                    "content": "你是一个搜索引擎。请提供详细的答案,并引用来源。"
                },
                {
                    "role": "user",
                    "content": query
                }
            ],
            "max_tokens": 1024,
            "temperature": 0.2,
            "top_p": 0.9,
            "search_domain_filter": None,  # do not restrict search domains
            "return_images": False,
            "return_related_questions": False,
            "search_recency_filter": "month",
            "top_k": 0,
            "stream": False,
            "presence_penalty": 0,
            "frequency_penalty": 1
        }
        try:
            response = self.session.post(PERPLEXITY_API_URL, json=payload, timeout=30)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"API请求失败: {e}")
            return None

    def extract_references(self, response: Dict, query: str = "") -> List[Dict]:
        """Extract reference records for monitored domains from a response.

        BUG FIX: the API response does not echo the query, so the original
        ``response.get("query", "")`` always stored "".  The query is now
        passed in explicitly (defaults to "" so old call sites still work).
        BUG FIX: the ``citations`` URL list is scanned too — cited URLs
        frequently do not appear verbatim in the answer text, so the
        original content-only check under-counted references.
        """
        references = []
        # Guard against None, missing "choices", and an empty choices list.
        if not response or not response.get("choices"):
            return references
        content = response["choices"][0]["message"]["content"]
        citations = response.get("citations", [])
        for domain in MONITOR_DOMAINS:
            cited = any(domain in str(c) for c in citations)
            if domain in content or cited:
                references.append({
                    "domain": domain,
                    # Empty string when the domain only appears in citations.
                    "context": self._extract_context(content, domain),
                    "full_content": content,
                    "citations": citations,
                    "timestamp": datetime.utcnow().isoformat(),
                    "query": query or response.get("query", "")
                })
        return references

    def _extract_context(self, content: str, domain: str, window: int = 200) -> str:
        """Return up to *window* chars of text on each side of the first
        occurrence of *domain*, or "" if the domain is absent."""
        index = content.find(domain)
        if index == -1:
            return ""
        start = max(0, index - window)
        end = min(len(content), index + len(domain) + window)
        return content[start:end]

    def run_check(self, topics: Optional[List[str]] = None) -> List[Dict]:
        """Run one monitoring pass over every topic/template combination.

        Returns the list of reference records found (possibly empty).
        """
        if topics is None:
            topics = MONITOR_TOPICS
        all_references = []
        for topic in topics:
            for template in QUERY_TEMPLATES:
                query = template.format(topic=topic, action=topic.replace(" ", "_"))
                logger.info(f"查询: {query}")
                response = self.query_perplexity(query)
                if response:
                    all_references.extend(self.extract_references(response, query))
                # Throttle to stay under the API rate limit.
                time.sleep(2)
        if all_references:
            self.storage.save_references(all_references)
            logger.info(f"发现 {len(all_references)} 个引用")
        else:
            logger.warning("本轮检查未发现引用")
        # BUG FIX: the threshold check used to live inside the
        # `if all_references:` branch, so a round with ZERO references —
        # the very case a drop-alert exists for — could never alert.
        if len(all_references) < ALERT_THRESHOLD:
            send_alert(f"引用数量低于阈值: {len(all_references)} < {ALERT_THRESHOLD}")
        return all_references

    def start_monitoring(self):
        """Run checks forever, sleeping CHECK_INTERVAL_HOURS between rounds.

        Ctrl-C stops the loop; any other exception is logged and the loop
        retries after a 5-minute back-off.
        """
        logger.info(f"开始监控,间隔 {CHECK_INTERVAL_HOURS} 小时")
        while True:
            try:
                self.run_check()
                logger.info(f"等待 {CHECK_INTERVAL_HOURS} 小时后再次检查...")
                time.sleep(CHECK_INTERVAL_HOURS * 3600)
            except KeyboardInterrupt:
                logger.info("监控停止")
                break
            except Exception as e:
                logger.error(f"监控异常: {e}")
                time.sleep(300)  # back off 5 minutes after a failure
if __name__ == "__main__":
    # Script entry point: monitor until interrupted.
    PerplexityMonitor().start_monitoring()
3. 数据存储模块 (storage.py)
# storage.py
import sqlite3
import csv
import json
from datetime import datetime
from typing import List, Dict
import os
from config import STORAGE_TYPE, DB_PATH, CSV_PATH
class Storage:
    """Persistence layer for citation records (SQLite or CSV, per config).

    BUG FIX: ``references`` is a reserved word in SQLite (the REFERENCES
    clause), so the original unquoted ``CREATE TABLE references ...`` raised
    ``sqlite3.OperationalError: near "references": syntax error``.  Every
    SQL statement below quotes the identifier as ``"references"``.
    """

    def __init__(self):
        if STORAGE_TYPE == "sqlite":
            self._init_sqlite()
        elif STORAGE_TYPE == "csv":
            self._init_csv()

    def _init_sqlite(self):
        """Create the database file, table and indexes if they do not exist."""
        os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
        self.conn = sqlite3.connect(DB_PATH)
        self.cursor = self.conn.cursor()
        self.cursor.execute("""
            CREATE TABLE IF NOT EXISTS "references" (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                domain TEXT NOT NULL,
                context TEXT,
                full_content TEXT,
                citations TEXT,
                query TEXT,
                timestamp TEXT NOT NULL
            )
        """)
        self.cursor.execute(
            'CREATE INDEX IF NOT EXISTS idx_domain ON "references"(domain)')
        self.cursor.execute(
            'CREATE INDEX IF NOT EXISTS idx_timestamp ON "references"(timestamp)')
        self.conn.commit()

    def _init_csv(self):
        """Create the CSV file with a header row if it does not exist."""
        os.makedirs(os.path.dirname(CSV_PATH), exist_ok=True)
        if not os.path.exists(CSV_PATH):
            with open(CSV_PATH, 'w', newline='', encoding='utf-8') as f:
                writer = csv.writer(f)
                writer.writerow([
                    "timestamp", "domain", "context",
                    "citations", "query"
                ])

    def save_references(self, references: List[Dict]):
        """Persist a batch of reference records to the configured backend."""
        if STORAGE_TYPE == "sqlite":
            self._save_to_sqlite(references)
        elif STORAGE_TYPE == "csv":
            self._save_to_csv(references)

    def _save_to_sqlite(self, references: List[Dict]):
        """Insert records into SQLite; a single commit covers the batch."""
        for ref in references:
            self.cursor.execute("""
                INSERT INTO "references"
                (domain, context, full_content, citations, query, timestamp)
                VALUES (?, ?, ?, ?, ?, ?)
            """, (
                ref["domain"],
                ref["context"],
                ref["full_content"],
                json.dumps(ref["citations"]),
                ref.get("query", ""),
                ref["timestamp"]
            ))
        self.conn.commit()

    def _save_to_csv(self, references: List[Dict]):
        """Append records to the CSV file (full_content is intentionally
        omitted to keep the CSV compact — see the header in _init_csv)."""
        with open(CSV_PATH, 'a', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            for ref in references:
                writer.writerow([
                    ref["timestamp"],
                    ref["domain"],
                    ref["context"],
                    json.dumps(ref["citations"]),
                    ref.get("query", "")
                ])

    def get_references_by_domain(self, domain: str, days: int = 7) -> List[Dict]:
        """Return records for *domain* from the last *days* days, newest first.

        CSV backend is not implemented and returns [].
        """
        if STORAGE_TYPE == "sqlite":
            # BUG FIX: timedelta was used here but storage.py only imported
            # `datetime` from the datetime module, raising NameError.
            from datetime import timedelta
            cutoff = (datetime.utcnow() - timedelta(days=days)).isoformat()
            self.cursor.execute("""
                SELECT * FROM "references"
                WHERE domain = ? AND timestamp > ?
                ORDER BY timestamp DESC
            """, (domain, cutoff))
            rows = self.cursor.fetchall()
            return [
                {
                    "id": row[0],
                    "domain": row[1],
                    "context": row[2],
                    "full_content": row[3],
                    "citations": json.loads(row[4]),
                    "query": row[5],
                    "timestamp": row[6]
                }
                for row in rows
            ]
        else:
            # CSV lookup not implemented.
            return []

    def get_summary(self, days: int = 7) -> Dict:
        """Return total and per-domain reference counts for the last *days*
        days.  CSV backend is not implemented and returns {}."""
        if STORAGE_TYPE == "sqlite":
            from datetime import timedelta  # see get_references_by_domain
            cutoff = (datetime.utcnow() - timedelta(days=days)).isoformat()
            self.cursor.execute("""
                SELECT domain, COUNT(*) as count
                FROM "references"
                WHERE timestamp > ?
                GROUP BY domain
                ORDER BY count DESC
            """, (cutoff,))
            domain_counts = dict(self.cursor.fetchall())
            self.cursor.execute("""
                SELECT COUNT(*) FROM "references"
                WHERE timestamp > ?
            """, (cutoff,))
            total = self.cursor.fetchone()[0]
            return {
                "total_references": total,
                "domain_counts": domain_counts,
                "period_days": days
            }
        else:
            return {}
4. 工具函数 (utils.py)
# utils.py
import logging
import smtplib
from email.mime.text import MIMEText
from datetime import datetime
import os
from config import ALERT_EMAIL
def setup_logger(name: str) -> logging.Logger:
    """Return a logger with a console handler and a daily log-file handler.

    BUG FIX: the original attached fresh handlers on *every* call, so any
    module that called setup_logger() more than once duplicated every log
    line.  An already-configured logger is now returned unchanged.
    """
    logger = logging.getLogger(name)
    if logger.handlers:  # already configured — do not stack handlers
        return logger
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    # Console output
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(formatter)
    # File output, one file per calendar day
    os.makedirs("logs", exist_ok=True)
    file_handler = logging.FileHandler(
        f"logs/perplexity_monitor_{datetime.now().strftime('%Y%m%d')}.log"
    )
    file_handler.setLevel(logging.INFO)
    file_handler.setFormatter(formatter)
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)
    return logger
def send_alert(message: str):
    """Send an alert notification (email skeleton; SMTP delivery commented).

    BUG FIX: the original referenced a module-level ``logger`` that utils.py
    never defines, so every call raised NameError.  The shared monitor
    logger is now looked up explicitly.
    """
    logger = logging.getLogger("perplexity_monitor")
    if not ALERT_EMAIL:
        logger.warning("未配置告警邮箱")
        return
    try:
        msg = MIMEText(message)
        msg['Subject'] = f"Perplexity监控告警 - {datetime.now().strftime('%Y-%m-%d %H:%M')}"
        msg['From'] = "monitor@example.com"
        msg['To'] = ALERT_EMAIL
        # Actual delivery requires an SMTP server configuration:
        # with smtplib.SMTP('smtp.example.com', 587) as server:
        #     server.starttls()
        #     server.login("user", "password")
        #     server.send_message(msg)
        logger.info(f"告警已发送: {message}")
    except Exception as e:
        logger.error(f"发送告警失败: {e}")
5. 依赖文件 (requirements.txt)
requests==2.31.0
python-dotenv==1.0.0
schedule==1.2.0
部署与使用
1. 环境准备
# 创建虚拟环境
python -m venv venv
source venv/bin/activate # Linux/Mac
# venv\Scripts\activate # Windows
# 安装依赖
pip install -r requirements.txt
# 创建环境变量文件
cp .env.example .env
# 编辑 .env 文件,填入 Perplexity API Key
2. 运行方式
持续监控模式
python monitor.py
单次检查模式
python -c "from monitor import PerplexityMonitor; m = PerplexityMonitor(); m.run_check()"
定时任务(使用 cron)
# 每6小时执行一次(注意:cron 下应执行单次检查 run_check(),
# 而不是常驻的 monitor.py——后者自带无限循环,会被 cron 重复启动)
0 */6 * * * cd /path/to/project && /path/to/venv/bin/python -c "from monitor import PerplexityMonitor; PerplexityMonitor().run_check()"
3. 配置示例 (.env)
PERPLEXITY_API_KEY=pplx-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
ALERT_EMAIL=admin@example.com
数据可视化建议
使用 Grafana 展示趋势
-- Grafana SQL查询示例
SELECT
strftime('%Y-%m-%d', timestamp) as date,
domain,
COUNT(*) as reference_count
FROM "references" -- references 是 SQLite 保留字,必须加引号
WHERE timestamp > datetime('now', '-30 days')
GROUP BY date, domain
ORDER BY date;
生成周报脚本
# weekly_report.py
from storage import Storage
from datetime import datetime, timedelta
import json
def generate_report():
    """Write a 7-day markdown summary to reports/weekly_YYYYMMDD.md.

    BUG FIX: the original opened a file under reports/ without ever creating
    the directory (FileNotFoundError on first run) and without specifying an
    encoding (UnicodeEncodeError for the Chinese headings on platforms whose
    default encoding is not UTF-8).
    """
    import os  # local import keeps this fix self-contained
    storage = Storage()
    summary = storage.get_summary(days=7)
    report = f"""
# Perplexity引用周报
生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M')}
## 总引用数
{summary['total_references']}
## 各域名引用分布
{json.dumps(summary['domain_counts'], indent=2)}
## 趋势分析
"""
    # Trend-analysis logic to be added...
    os.makedirs("reports", exist_ok=True)
    report_path = f"reports/weekly_{datetime.now().strftime('%Y%m%d')}.md"
    with open(report_path, 'w', encoding='utf-8') as f:
        f.write(report)

if __name__ == "__main__":
    generate_report()
注意事项
- API 限流: Perplexity API 有速率限制,建议设置合理的请求间隔
- 成本控制: API 调用按 token 计费,建议控制查询频率和长度
- 数据隐私: 存储的引用内容可能包含用户数据,需注意合规
- 结果准确性: Perplexity 的答案可能随时间变化,需持续监控
- 备用方案: 建议同时使用其他生成引擎(如 Bing Chat)进行交叉验证
扩展建议
- 多引擎支持: 扩展脚本支持 Bing Chat、Google SGE 等其他生成引擎
- 语义分析: 集成 NLP 库分析引用上下文的情感倾向
- 自动告警: 当引用数量骤降或内容出现负面描述时自动告警
- API 代理: 使用代理池避免 IP 被封禁
- 历史对比: 保存历史快照,分析引用变化趋势
