
E.2 Building Your Own Perplexity Citation Monitoring Script

Overview

This script monitors how often, and in what context, your site is cited in answers generated by Perplexity AI. By periodically querying the Perplexity API (or simulating user behavior), it collects citation data and stores it for trend and anomaly analysis.

Tech Stack

  • Language: Python 3.8+
  • Core libraries: requests, json, datetime, logging, sqlite3 (or pandas + CSV)
  • Optional: schedule (scheduled runs), smtplib (email alerts)

Script Architecture

perplexity_monitor/
├── config.py          # Configuration
├── monitor.py         # Main monitoring logic
├── storage.py         # Data storage module
├── utils.py           # Utility functions
├── requirements.txt   # Dependencies
└── logs/              # Log directory

Core Implementation

1. Configuration Module (config.py)

# config.py
import os
from dotenv import load_dotenv

load_dotenv()

# Perplexity API configuration
PERPLEXITY_API_KEY = os.getenv("PERPLEXITY_API_KEY")
PERPLEXITY_API_URL = "https://api.perplexity.ai/chat/completions"

# Domains to monitor
MONITOR_DOMAINS = [
    "example.com",
    "docs.example.com",
    "blog.example.com"
]

# Query templates (used to trigger Perplexity answers)
QUERY_TEMPLATES = [
    "What is {topic}?",
    "How to {action}?",
    "Best practices for {topic}",
    "Latest updates on {topic}"
]

# Topics to monitor (should match your own content)
MONITOR_TOPICS = [
    "SEO techniques",
    "GEO optimization",
    "web performance",
    "structured data"
]

# Storage configuration
STORAGE_TYPE = "sqlite"  # Options: sqlite, csv
DB_PATH = "data/perplexity_monitor.db"
CSV_PATH = "data/perplexity_references.csv"

# Scheduling
CHECK_INTERVAL_HOURS = 6  # Check every 6 hours

# Alerting
ALERT_THRESHOLD = 1  # Alert when a round finds fewer citations than this (0 disables alerts)
ALERT_EMAIL = os.getenv("ALERT_EMAIL")
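
A missing API key in the configuration above would only surface later as a failed request, so it can help to fail fast at startup. A minimal sketch, assuming a hypothetical `validate_env` helper that is not part of the modules in this appendix:

```python
import os

# Environment variables the monitor cannot run without (illustrative list)
REQUIRED_VARS = ["PERPLEXITY_API_KEY"]

def validate_env() -> None:
    """Raise immediately if any required environment variable is missing."""
    missing = [name for name in REQUIRED_VARS if not os.getenv(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
```

Calling `validate_env()` at the top of the main script turns a vague mid-run API failure into an explicit startup error.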

2. Main Monitoring Script (monitor.py)

# monitor.py
import requests
import time
from datetime import datetime
from typing import List, Dict, Optional

from config import (
    PERPLEXITY_API_KEY,
    PERPLEXITY_API_URL,
    MONITOR_DOMAINS,
    QUERY_TEMPLATES,
    MONITOR_TOPICS,
    CHECK_INTERVAL_HOURS,
    ALERT_THRESHOLD
)
from storage import Storage
from utils import setup_logger, send_alert

logger = setup_logger("perplexity_monitor")

class PerplexityMonitor:
    def __init__(self):
        self.storage = Storage()
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {PERPLEXITY_API_KEY}",
            "Content-Type": "application/json"
        })
        
    def query_perplexity(self, query: str) -> Optional[Dict]:
        """Send a query to the Perplexity API."""
        payload = {
            "model": "sonar-pro",  # or sonar-reasoning-pro
            "messages": [
                {
                    "role": "system",
                    "content": "You are a search engine. Provide detailed answers and cite your sources."
                },
                {
                    "role": "user",
                    "content": query
                }
            ],
            "max_tokens": 1024,
            "temperature": 0.2,
            "top_p": 0.9,
            "search_domain_filter": None,  # no domain restriction
            "return_images": False,
            "return_related_questions": False,
            "search_recency_filter": "month",
            "top_k": 0,
            "stream": False,
            "presence_penalty": 0,
            "frequency_penalty": 1
        }
        
        try:
            response = self.session.post(PERPLEXITY_API_URL, json=payload, timeout=30)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"API request failed: {e}")
            return None
            
    def extract_references(self, response: Dict, query: str = "") -> List[Dict]:
        """Extract citation information from a Perplexity response."""
        references = []
        
        if not response or "choices" not in response:
            return references
            
        content = response["choices"][0]["message"]["content"]
        citations = response.get("citations", [])
        
        # Check whether the answer mentions any monitored domain
        for domain in MONITOR_DOMAINS:
            if domain in content:
                # Extract the text surrounding the mention
                context = self._extract_context(content, domain)
                references.append({
                    "domain": domain,
                    "context": context,
                    "full_content": content,
                    "citations": citations,
                    "timestamp": datetime.utcnow().isoformat(),
                    "query": query
                })
                
        return references
        
    def _extract_context(self, content: str, domain: str, window: int = 200) -> str:
        """Extract the text before and after the first occurrence of the domain."""
        index = content.find(domain)
        if index == -1:
            return ""
            
        start = max(0, index - window)
        end = min(len(content), index + len(domain) + window)
        return content[start:end]
        
    def run_check(self, topics: List[str] = None):
        """Run one round of checks."""
        if topics is None:
            topics = MONITOR_TOPICS
            
        all_references = []
        
        for topic in topics:
            for template in QUERY_TEMPLATES:
                query = template.format(topic=topic, action=topic.replace(" ", "_"))
                logger.info(f"Query: {query}")
                
                response = self.query_perplexity(query)
                if response:
                    references = self.extract_references(response, query)
                    all_references.extend(references)
                    
                # Avoid API rate limits
                time.sleep(2)
                
        # Store the results
        if all_references:
            self.storage.save_references(all_references)
            logger.info(f"Found {len(all_references)} citations")
        else:
            logger.warning("No citations found in this round")
            
        # Check the alert threshold (also fires when nothing was found)
        if len(all_references) < ALERT_THRESHOLD:
            send_alert(f"Citation count below threshold: {len(all_references)} < {ALERT_THRESHOLD}")
            
        return all_references
        
    def start_monitoring(self):
        """Start continuous monitoring."""
        logger.info(f"Monitoring started, interval {CHECK_INTERVAL_HOURS} hours")
        
        while True:
            try:
                self.run_check()
                logger.info(f"Waiting {CHECK_INTERVAL_HOURS} hours until the next check...")
                time.sleep(CHECK_INTERVAL_HOURS * 3600)
            except KeyboardInterrupt:
                logger.info("Monitoring stopped")
                break
            except Exception as e:
                logger.error(f"Monitoring error: {e}")
                time.sleep(300)  # wait 5 minutes after an error, then retry

if __name__ == "__main__":
    monitor = PerplexityMonitor()
    monitor.start_monitoring()
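
Note that `extract_references` matches monitored domains against the answer text itself. Perplexity responses typically also carry the cited URLs in a top-level `citations` field (the code above already reads it), and matching against those URLs tends to be more reliable than substring search in prose. A minimal, self-contained sketch, assuming `citations` is a list of URL strings:

```python
from typing import List
from urllib.parse import urlparse

def match_cited_domains(citations: List[str], monitor_domains: List[str]) -> List[str]:
    """Return the monitored domains that appear among the cited URLs.

    Exact host matches and subdomain matches (blog.example.com vs
    example.com) both count.
    """
    hits = set()
    for url in citations:
        host = urlparse(url).netloc.lower()
        for domain in monitor_domains:
            d = domain.lower()
            if host == d or host.endswith("." + d):
                hits.add(domain)
    return sorted(hits)
```

Inside `extract_references` this could be called as `match_cited_domains(response.get("citations", []), MONITOR_DOMAINS)`; the exact shape of the `citations` field should be verified against the current API response format.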

3. Data Storage Module (storage.py)

# storage.py
import sqlite3
import csv
import json
from datetime import datetime, timedelta
from typing import List, Dict
import os

from config import STORAGE_TYPE, DB_PATH, CSV_PATH

class Storage:
    def __init__(self):
        if STORAGE_TYPE == "sqlite":
            self._init_sqlite()
        elif STORAGE_TYPE == "csv":
            self._init_csv()
            
    def _init_sqlite(self):
        """Initialize the SQLite database."""
        os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
        self.conn = sqlite3.connect(DB_PATH)
        self.cursor = self.conn.cursor()
        
        # Note: the table cannot be named "references" -- REFERENCES is a
        # reserved keyword in SQLite.
        self.cursor.execute("""
            CREATE TABLE IF NOT EXISTS reference_records (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                domain TEXT NOT NULL,
                context TEXT,
                full_content TEXT,
                citations TEXT,
                query TEXT,
                timestamp TEXT NOT NULL
            )
        """)
        
        self.cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_domain ON reference_records(domain)
        """)
        
        self.cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_timestamp ON reference_records(timestamp)
        """)
        
        self.conn.commit()
        
    def _init_csv(self):
        """Initialize the CSV file."""
        os.makedirs(os.path.dirname(CSV_PATH), exist_ok=True)
        if not os.path.exists(CSV_PATH):
            with open(CSV_PATH, 'w', newline='', encoding='utf-8') as f:
                writer = csv.writer(f)
                writer.writerow([
                    "timestamp", "domain", "context", 
                    "citations", "query"
                ])
                
    def save_references(self, references: List[Dict]):
        """Persist citation records."""
        if STORAGE_TYPE == "sqlite":
            self._save_to_sqlite(references)
        elif STORAGE_TYPE == "csv":
            self._save_to_csv(references)
            
    def _save_to_sqlite(self, references: List[Dict]):
        """Save to SQLite."""
        for ref in references:
            self.cursor.execute("""
                INSERT INTO reference_records 
                (domain, context, full_content, citations, query, timestamp)
                VALUES (?, ?, ?, ?, ?, ?)
            """, (
                ref["domain"],
                ref["context"],
                ref["full_content"],
                json.dumps(ref["citations"]),
                ref.get("query", ""),
                ref["timestamp"]
            ))
        self.conn.commit()
        
    def _save_to_csv(self, references: List[Dict]):
        """Save to CSV."""
        with open(CSV_PATH, 'a', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            for ref in references:
                writer.writerow([
                    ref["timestamp"],
                    ref["domain"],
                    ref["context"],
                    json.dumps(ref["citations"]),
                    ref.get("query", "")
                ])
                
    def get_references_by_domain(self, domain: str, days: int = 7) -> List[Dict]:
        """Fetch citation records for a given domain."""
        if STORAGE_TYPE == "sqlite":
            cutoff = (datetime.utcnow() - timedelta(days=days)).isoformat()
            self.cursor.execute("""
                SELECT * FROM reference_records 
                WHERE domain = ? AND timestamp > ?
                ORDER BY timestamp DESC
            """, (domain, cutoff))
            rows = self.cursor.fetchall()
            return [
                {
                    "id": row[0],
                    "domain": row[1],
                    "context": row[2],
                    "full_content": row[3],
                    "citations": json.loads(row[4]),
                    "query": row[5],
                    "timestamp": row[6]
                }
                for row in rows
            ]
        else:
            # CSV implementation omitted
            return []
            
    def get_summary(self, days: int = 7) -> Dict:
        """Return summary statistics."""
        if STORAGE_TYPE == "sqlite":
            cutoff = (datetime.utcnow() - timedelta(days=days)).isoformat()
            
            self.cursor.execute("""
                SELECT domain, COUNT(*) as count
                FROM reference_records
                WHERE timestamp > ?
                GROUP BY domain
                ORDER BY count DESC
            """, (cutoff,))
            domain_counts = dict(self.cursor.fetchall())
            
            self.cursor.execute("""
                SELECT COUNT(*) FROM reference_records
                WHERE timestamp > ?
            """, (cutoff,))
            total = self.cursor.fetchone()[0]
            
            return {
                "total_references": total,
                "domain_counts": domain_counts,
                "period_days": days
            }
        else:
            return {}

4. Utility Functions (utils.py)

# utils.py
import logging
import smtplib
from email.mime.text import MIMEText
from datetime import datetime
import os

from config import ALERT_EMAIL

def setup_logger(name: str) -> logging.Logger:
    """Configure logging."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    
    # Avoid attaching duplicate handlers if called more than once
    if logger.handlers:
        return logger
    
    # Console output
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    console_handler.setFormatter(console_formatter)
    
    # File output
    os.makedirs("logs", exist_ok=True)
    file_handler = logging.FileHandler(
        f"logs/perplexity_monitor_{datetime.now().strftime('%Y%m%d')}.log"
    )
    file_handler.setLevel(logging.INFO)
    file_formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    file_handler.setFormatter(file_formatter)
    
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)
    
    return logger

# Module-level logger so send_alert can log without an external setup call
logger = setup_logger("perplexity_monitor.utils")

def send_alert(message: str):
    """Send an alert (email example)."""
    if not ALERT_EMAIL:
        logger.warning("No alert email configured")
        return
        
    try:
        msg = MIMEText(message)
        msg['Subject'] = f"Perplexity monitor alert - {datetime.now().strftime('%Y-%m-%d %H:%M')}"
        msg['From'] = "monitor@example.com"
        msg['To'] = ALERT_EMAIL
        
        # Actual delivery requires an SMTP server:
        # with smtplib.SMTP('smtp.example.com', 587) as server:
        #     server.starttls()
        #     server.login("user", "password")
        #     server.send_message(msg)
        
        logger.info(f"Alert sent: {message}")
    except Exception as e:
        logger.error(f"Failed to send alert: {e}")

5. Dependency File (requirements.txt)

requests==2.31.0
python-dotenv==1.0.0
schedule==1.2.0

Deployment and Usage

1. Environment Setup

# Create a virtual environment
python -m venv venv
source venv/bin/activate  # Linux/Mac
# venv\Scripts\activate  # Windows

# Install dependencies
pip install -r requirements.txt

# Create the environment variable file
cp .env.example .env
# Edit .env and fill in your Perplexity API key

2. Running the Script

Continuous monitoring mode

python monitor.py

Single-check mode

python -c "from monitor import PerplexityMonitor; m = PerplexityMonitor(); m.run_check()"

Scheduled runs (via cron)

# Run a single check every 6 hours. Note: monitor.py's continuous mode
# loops forever, so invoke the one-off check from cron instead.
0 */6 * * * cd /path/to/project && /path/to/venv/bin/python -c "from monitor import PerplexityMonitor; PerplexityMonitor().run_check()"

3. Example Configuration (.env)

PERPLEXITY_API_KEY=pplx-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
ALERT_EMAIL=admin@example.com

Data Visualization Suggestions

Showing trends in Grafana

-- Example Grafana SQL query
SELECT 
    strftime('%Y-%m-%d', timestamp) as date,
    domain,
    COUNT(*) as reference_count
FROM reference_records
WHERE timestamp > datetime('now', '-30 days')
GROUP BY date, domain
ORDER BY date;
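
The same daily aggregation can be prototyped without Grafana against an in-memory SQLite database. A minimal sketch; the table and column names here are illustrative and should match your actual schema:

```python
import sqlite3

# Throwaway in-memory DB shaped like the monitor's storage table
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE reference_records (
        domain TEXT NOT NULL,
        timestamp TEXT NOT NULL
    )
""")
rows = [
    ("example.com", "2026-05-01T10:00:00"),
    ("example.com", "2026-05-01T18:00:00"),
    ("blog.example.com", "2026-05-02T09:00:00"),
]
conn.executemany("INSERT INTO reference_records VALUES (?, ?)", rows)

# The same daily aggregation a Grafana panel would run
daily = conn.execute("""
    SELECT strftime('%Y-%m-%d', timestamp) AS date, domain, COUNT(*) AS n
    FROM reference_records
    GROUP BY date, domain
    ORDER BY date
""").fetchall()
print(daily)
# [('2026-05-01', 'example.com', 2), ('2026-05-02', 'blog.example.com', 1)]
```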

Weekly report script

# weekly_report.py
from storage import Storage
from datetime import datetime
import json
import os

def generate_report():
    storage = Storage()
    summary = storage.get_summary(days=7)
    
    report = f"""
# Perplexity Citation Weekly Report
Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}

## Total citations
{summary['total_references']}

## Citations per domain
{json.dumps(summary['domain_counts'], indent=2)}

## Trend analysis
"""
    # Add trend-analysis logic here...
    
    os.makedirs("reports", exist_ok=True)
    with open(f"reports/weekly_{datetime.now().strftime('%Y%m%d')}.md", 'w') as f:
        f.write(report)
        
if __name__ == "__main__":
    generate_report()

Caveats

  1. API rate limits: the Perplexity API is rate limited; use a reasonable interval between requests
  2. Cost control: API calls are billed per token; keep query frequency and length in check
  3. Data privacy: stored answer content may contain user data; mind compliance requirements
  4. Result accuracy: Perplexity's answers change over time, so monitoring must be continuous
  5. Fallback plan: cross-validate with other generative engines (e.g. Bing Chat)
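
For point 1, a common pattern is to wrap each API call in retries with exponential backoff. A minimal sketch, where `call` stands in for a no-argument wrapper around `query_perplexity`:

```python
import time
from typing import Callable, Dict, Optional

def with_backoff(call: Callable[[], Optional[Dict]],
                 retries: int = 3,
                 base_delay: float = 1.0) -> Optional[Dict]:
    """Retry `call` with exponential backoff; return its first non-None result."""
    for attempt in range(retries):
        result = call()
        if result is not None:
            return result
        time.sleep(base_delay * (2 ** attempt))  # waits 1s, 2s, 4s, ... by default
    return None
```

Inside `run_check` this could be used as `response = with_backoff(lambda: self.query_perplexity(query))`.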

Extension Ideas

  1. Multi-engine support: extend the script to cover Bing Chat, Google SGE, and other generative engines
  2. Semantic analysis: integrate an NLP library to analyze the sentiment of citation contexts
  3. Automatic alerting: trigger alerts when citation counts drop sharply or negative descriptions appear
  4. API proxying: use a proxy pool to avoid IP bans
  5. Historical comparison: keep historical snapshots to analyze how citations change over time
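
For idea 1, the monitor can be decoupled from any single engine behind a small interface. A hypothetical sketch; `DummyEngine` is illustrative only, and a real subclass would call the engine's actual API:

```python
from abc import ABC, abstractmethod
from typing import Dict, List

class AnswerEngine(ABC):
    """Common interface for any generative engine we want to monitor."""

    @abstractmethod
    def ask(self, query: str) -> Dict:
        """Return {'content': str, 'citations': [url, ...]} for the query."""

class DummyEngine(AnswerEngine):
    """Stand-in engine; a real subclass would call Perplexity, Bing, etc."""

    def ask(self, query: str) -> Dict:
        return {"content": f"answer to: {query}",
                "citations": ["https://example.com/page"]}

def collect_citations(engines: List[AnswerEngine], query: str) -> List[str]:
    """Gather cited URLs for one query across all configured engines."""
    urls: List[str] = []
    for engine in engines:
        urls.extend(engine.ask(query).get("citations", []))
    return urls
```

The domain-matching and storage code can then consume `collect_citations` output without caring which engine produced it.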
Last Updated: 5/9/26, 5:13 PM