18.3 异常检测与告警(邮件/钉钉/飞书)
在双引擎优化体系中,异常检测与告警系统是保障优化成果持续稳定的最后一道防线。当生成式搜索引擎的引用逻辑发生变化、竞争对手内容突然超越、或者网站出现技术故障时,及时的告警能够帮助团队快速响应,避免流量断崖式下跌。
异常检测的核心维度
1. 生成引擎引用异常
- 引用消失:原本在生成式摘要中被引用的页面突然消失
- 引用排名下降:在生成结果中的排序位置下降超过阈值
- 引用内容错误:生成引擎错误引用或曲解页面内容
- 引用频率异常:同一来源的引用次数突然减少或激增
2. 传统搜索流量异常
- 点击量骤降:Search Console中展示量或点击量下降超过30%
- 排名波动:核心关键词排名波动超过5个位置
- 索引异常:索引页面数量大幅减少
- 爬虫异常:爬虫访问频率异常变化
3. 技术指标异常
- 页面性能退化:Core Web Vitals指标恶化
- 可用性下降:服务器响应时间增加或出现5xx错误
- 结构化数据失效:Schema标记解析失败或警告增加
- CDN/边缘节点异常:特定地区访问异常
告警系统架构设计
基础架构组件
┌────────────────────────────────────────────┐
│                 数据采集层                 │
│ ┌──────────┐ ┌──────────┐ ┌──────────────┐ │
│ │Search    │ │生成引擎  │ │基础设施      │ │
│ │Console   │ │API监控   │ │监控(Prom)    │ │
│ └──────────┘ └──────────┘ └──────────────┘ │
└─────────────────────┬──────────────────────┘
                      │
┌─────────────────────▼──────────────────────┐
│                异常检测引擎                │
│ ┌──────────┐ ┌──────────┐ ┌──────────────┐ │
│ │阈值检测  │ │趋势分析  │ │机器学习      │ │
│ │(固定值)  │ │(移动平均)│ │(异常检测)    │ │
│ └──────────┘ └──────────┘ └──────────────┘ │
└─────────────────────┬──────────────────────┘
                      │
┌─────────────────────▼──────────────────────┐
│               告警路由与分发               │
│ ┌──────────┐ ┌──────────┐ ┌──────────────┐ │
│ │邮件通知  │ │钉钉消息  │ │飞书机器人    │ │
│ │(SMTP)    │ │(Webhook) │ │(Webhook)     │ │
│ └──────────┘ └──────────┘ └──────────────┘ │
└────────────────────────────────────────────┘
异常检测算法实现
1. 基于阈值的简单检测
# Threshold detection example
def check_threshold_anomaly(current_value, metric_config):
    """Check whether a metric value violates its configured threshold.

    Args:
        current_value: The latest numeric reading of the metric.
        metric_config: Mapping with a required ``'threshold'`` entry and an
            optional ``'direction'`` entry (``'up'``, ``'down'`` or ``'both'``;
            defaults to ``'both'``). The threshold is either a single number
            or a mapping with ``'low'`` and ``'high'`` bounds.

    Returns:
        A ``(is_anomaly, message)`` tuple. ``message`` is ``None`` when the
        value is within bounds or the direction is not recognised.

    Raises:
        TypeError: If ``direction`` is ``'both'`` but the threshold is a single
            number, since a range check needs both a low and a high bound.
        KeyError: If a range threshold lacks the bound the direction needs.
    """
    threshold = metric_config['threshold']
    direction = metric_config.get('direction', 'both')  # 'up', 'down', 'both'
    is_range = isinstance(threshold, dict)

    if direction == 'up':
        # A range threshold contributes its upper bound to a one-sided check.
        upper = threshold['high'] if is_range else threshold
        if current_value > upper:
            return True, f"指标超过上限: {current_value} > {upper}"
    elif direction == 'down':
        lower = threshold['low'] if is_range else threshold
        if current_value < lower:
            return True, f"指标低于下限: {current_value} < {lower}"
    elif direction == 'both':
        if not is_range:
            # Fail with an explicit message instead of an opaque
            # "object is not subscriptable" error further down.
            raise TypeError(
                "direction 'both' requires 'threshold' to be a mapping "
                "with 'low' and 'high' keys"
            )
        if current_value > threshold['high'] or current_value < threshold['low']:
            return True, f"指标超出范围: {current_value}"
    return False, None
2. 基于移动平均的趋势检测
import numpy as np
from collections import deque
class MovingAverageDetector:
    """Rolling-window z-score detector.

    Keeps the most recent ``window_size`` observations and flags a value
    whose distance from the window mean exceeds ``std_multiplier`` standard
    deviations of that window.
    """

    def __init__(self, window_size=7, std_multiplier=3):
        self.window_size = window_size
        self.std_multiplier = std_multiplier
        # Bounded deque: appending to a full window discards the oldest value.
        self.history = deque(maxlen=window_size)

    def add_value(self, value):
        """Record one observation in the rolling window."""
        self.history.append(value)

    def is_anomaly(self, current_value):
        """Return whether ``current_value`` deviates from the stored window.

        Always returns ``False`` until the window holds ``window_size``
        values. When the window has zero spread, a value counts as anomalous
        if it differs from the mean by more than 10% of the mean's magnitude.
        """
        window_full = len(self.history) >= self.window_size
        if not window_full:
            return False

        baseline = np.mean(self.history)
        spread = np.std(self.history)
        deviation = current_value - baseline

        if spread == 0:
            # A z-score is undefined without spread; use a relative tolerance.
            return abs(deviation) > 0.1 * abs(baseline)
        return abs(deviation / spread) > self.std_multiplier
3. 机器学习异常检测(简单实现)
from sklearn.ensemble import IsolationForest
import pandas as pd
class MLAnomalyDetector:
    """Isolation-forest detector over a value plus time-of-week features."""

    def __init__(self, contamination=0.05):
        self.is_trained = False
        self.model = IsolationForest(contamination=contamination, random_state=42)

    def train(self, historical_data):
        """Fit the model on historical observations.

        Expected input: DataFrame with columns
        ['value', 'hour', 'day_of_week', 'is_weekend'], or with a
        'timestamp' column from which the three time features are derived.
        """
        self.model.fit(self._extract_features(historical_data))
        self.is_trained = True

    def predict(self, current_data_point):
        """Return whether a single data point is classified as an outlier.

        Returns ``False`` while the model has not been trained.
        """
        if not self.is_trained:
            return False
        single_row = pd.DataFrame([current_data_point])
        labels = self.model.predict(self._extract_features(single_row))
        return labels[0] == -1  # -1 marks an anomaly

    def _extract_features(self, df):
        """Return the four model feature columns from a copy of ``df``.

        When a 'timestamp' column is present, hour, day of week and a weekend
        flag are computed from it and the timestamp column is dropped.
        The caller's frame is left untouched.
        """
        frame = df.copy()
        if 'timestamp' in frame.columns:
            moments = pd.to_datetime(frame.pop('timestamp'))
            frame['hour'] = moments.dt.hour
            frame['day_of_week'] = moments.dt.dayofweek
            # dayofweek 5 and 6 are Saturday and Sunday.
            frame['is_weekend'] = frame['day_of_week'].isin([5, 6]).astype(int)
        wanted = ['value', 'hour', 'day_of_week', 'is_weekend']
        return frame[wanted]
告警路由与分发实现
1. 邮件告警
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
class EmailAlert:
    """Send alert e-mails through an SMTP server upgraded with STARTTLS."""

    def __init__(self, smtp_server, smtp_port, username, password, timeout=30):
        """Store the SMTP connection settings.

        Args:
            smtp_server: SMTP host name.
            smtp_port: SMTP port; the connection is upgraded with STARTTLS.
            username: Login name, also used as the From address.
            password: Login password.
            timeout: Seconds to wait on the SMTP socket before giving up, so
                an unresponsive mail server cannot stall the caller forever.
        """
        self.smtp_server = smtp_server
        self.smtp_port = smtp_port
        self.username = username
        self.password = password
        self.timeout = timeout

    def _build_message(self, to_emails, subject, body):
        """Assemble the multipart message with ``body`` attached as HTML."""
        msg = MIMEMultipart()
        msg['From'] = self.username
        msg['To'] = ', '.join(to_emails)
        msg['Subject'] = subject
        msg.attach(MIMEText(body, 'html'))
        return msg

    def send_alert(self, to_emails, subject, body):
        """Deliver one alert e-mail.

        Args:
            to_emails: Iterable of recipient addresses.
            subject: Subject line.
            body: Message body, sent with the ``text/html`` content type.

        Raises:
            smtplib.SMTPException: On protocol, authentication or delivery
                failures.
            OSError: On connection failures, including the socket timeout.
        """
        msg = self._build_message(to_emails, subject, body)
        with smtplib.SMTP(self.smtp_server, self.smtp_port, timeout=self.timeout) as server:
            server.starttls()
            server.login(self.username, self.password)
            server.send_message(msg)
2. 钉钉机器人告警
import requests
import json
import hmac
import hashlib
import base64
import time
import urllib.parse
class DingTalkAlert:
    """Post alert messages to a DingTalk custom-robot webhook."""

    def __init__(self, webhook_url, secret=None, timeout=10):
        """Store the webhook settings.

        Args:
            webhook_url: Robot webhook URL. It is expected to already carry a
                query string (``?access_token=...``), because signing
                parameters are appended with ``&``.
            secret: Optional signing secret; when set, every request is signed.
            timeout: Seconds to wait for the HTTP request, so an unreachable
                endpoint cannot block the caller indefinitely.
        """
        self.webhook_url = webhook_url
        self.secret = secret
        self.timeout = timeout

    def _sign(self, timestamp):
        """Return the URL-quoted signature for ``timestamp``.

        The signature is the base64-encoded HMAC-SHA256 of
        ``"{timestamp}\\n{secret}"`` keyed with the secret. Returns an empty
        string when no secret is configured.
        """
        if not self.secret:
            return ''
        string_to_sign = f'{timestamp}\n{self.secret}'
        hmac_code = hmac.new(
            self.secret.encode('utf-8'),
            string_to_sign.encode('utf-8'),
            digestmod=hashlib.sha256
        ).digest()
        sign = base64.b64encode(hmac_code).decode('utf-8')
        return urllib.parse.quote_plus(sign)

    def _build_url(self):
        """Return the request URL, adding timestamp and sign when signing."""
        if not self.secret:
            return self.webhook_url
        timestamp = str(round(time.time() * 1000))  # milliseconds
        sign = self._sign(timestamp)
        # The parameter must be spelled "&timestamp": an HTML-entity mangling
        # of "&times" into the multiplication sign breaks signed requests.
        return f"{self.webhook_url}&timestamp={timestamp}&sign={sign}"

    def _post(self, payload):
        """POST ``payload`` as JSON and return the decoded JSON response."""
        response = requests.post(self._build_url(), json=payload, timeout=self.timeout)
        return response.json()

    def send_markdown(self, title, text):
        """Send a Markdown message and return the decoded JSON response."""
        payload = {
            "msgtype": "markdown",
            "markdown": {
                "title": title,
                "text": text
            }
        }
        return self._post(payload)

    def send_action_card(self, title, text, btn_orientation="1", btns=None):
        """Send an action card with link buttons; return the JSON response.

        ``btns`` defaults to an empty list when omitted.
        """
        payload = {
            "msgtype": "actionCard",
            "actionCard": {
                "title": title,
                "text": text,
                "btnOrientation": btn_orientation,
                "btns": btns or []
            }
        }
        return self._post(payload)
3. 飞书机器人告警
import requests
import json
class FeishuAlert:
    """Post alert messages to a Feishu bot webhook."""

    def __init__(self, webhook_url, timeout=10):
        """Store the webhook settings.

        Args:
            webhook_url: Bot webhook URL.
            timeout: Seconds to wait for the HTTP request, so an unreachable
                endpoint cannot block the caller indefinitely.
        """
        self.webhook_url = webhook_url
        self.timeout = timeout

    def _post(self, payload):
        """POST ``payload`` as JSON and return the decoded JSON response."""
        response = requests.post(self.webhook_url, json=payload, timeout=self.timeout)
        return response.json()

    def _build_card(self, title, content, buttons=None):
        """Build the interactive-card payload.

        The card has a plain-text header, one Markdown element holding
        ``content`` and, when ``buttons`` is non-empty, one action element
        with a button per entry.
        """
        elements = [{
            "tag": "markdown",
            "content": content
        }]
        if buttons:
            elements.append({
                "tag": "action",
                "actions": [
                    {
                        "tag": "button",
                        "text": {
                            "tag": "plain_text",
                            "content": btn['text']
                        },
                        "url": btn.get('url', ''),
                        "type": btn.get('type', 'default')
                    }
                    for btn in buttons
                ]
            })
        return {
            "msg_type": "interactive",
            "card": {
                "header": {
                    "title": {
                        "tag": "plain_text",
                        "content": title
                    }
                },
                "elements": elements
            }
        }

    def send_text(self, content):
        """Send a plain-text message and return the decoded JSON response."""
        payload = {
            "msg_type": "text",
            "content": {
                "text": content
            }
        }
        return self._post(payload)

    def send_interactive(self, title, content, buttons=None):
        """Send an interactive card, optionally with link buttons.

        Args:
            title: Card header text.
            content: Markdown body of the card.
            buttons: Optional iterable of dicts with a required ``'text'`` key
                and optional ``'url'`` and ``'type'`` keys.

        Returns:
            The decoded JSON response.
        """
        return self._post(self._build_card(title, content, buttons))
完整告警系统实现
主控脚本
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
SEO/GEO异常检测与告警系统
"""
import os
import sys
import time
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional
# Logging setup: every record goes to both the log file and the console.
logging.basicConfig(
    handlers=[logging.FileHandler('/var/log/seo_alert.log'), logging.StreamHandler()],
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Shared logger for this script.
logger = logging.getLogger('SEOAlert')
class AlertManager:
    """Evaluate metrics against configured detectors and dispatch alerts.

    The JSON configuration supplies a ``metrics`` section (one detector per
    metric) and a ``channels`` section (email / dingtalk / feishu settings).
    """

    def __init__(self, config_path='alert_config.json'):
        self.config = self._load_config(config_path)
        self.detectors = self._init_detectors()
        self.alerts = self._init_alert_channels()
        self.metrics_history = {}

    def _load_config(self, path):
        """Read and parse the JSON configuration file."""
        # Explicit UTF-8: the config carries non-ASCII descriptions and the
        # platform default encoding is not guaranteed to be UTF-8.
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)

    def _init_detectors(self):
        """Build one detector entry per configured metric.

        Metrics whose ``detector`` is neither ``'threshold'`` nor
        ``'moving_average'`` get no entry.
        """
        detectors = {}
        for metric_name, metric_config in self.config.get('metrics', {}).items():
            if metric_config.get('detector') == 'threshold':
                detectors[metric_name] = {
                    'type': 'threshold',
                    'config': metric_config
                }
            elif metric_config.get('detector') == 'moving_average':
                detectors[metric_name] = {
                    'type': 'moving_average',
                    'instance': MovingAverageDetector(
                        window_size=metric_config.get('window', 7),
                        std_multiplier=metric_config.get('std_multiplier', 3)
                    ),
                    'config': metric_config
                }
        return detectors

    def _init_alert_channels(self):
        """Instantiate a sender for every channel present in the config."""
        channels = {}
        channel_config = self.config.get('channels', {})
        if 'email' in channel_config:
            channels['email'] = EmailAlert(
                smtp_server=channel_config['email']['smtp_server'],
                smtp_port=channel_config['email']['smtp_port'],
                username=channel_config['email']['username'],
                password=channel_config['email']['password']
            )
        if 'dingtalk' in channel_config:
            channels['dingtalk'] = DingTalkAlert(
                webhook_url=channel_config['dingtalk']['webhook_url'],
                secret=channel_config['dingtalk'].get('secret')
            )
        if 'feishu' in channel_config:
            channels['feishu'] = FeishuAlert(
                webhook_url=channel_config['feishu']['webhook_url']
            )
        return channels

    def check_metric(self, metric_name, current_value, metadata=None):
        """Check one metric reading and send an alert if it is anomalous.

        Args:
            metric_name: Key of the metric in the ``metrics`` config section.
            current_value: Latest reading of the metric.
            metadata: Optional dict; the keys 'source', 'url' and 'engine'
                are included in the alert text.

        Returns:
            ``(is_anomaly, message)``; ``message`` is ``None`` when nothing
            anomalous was found or the metric has no detector.
        """
        if metric_name not in self.detectors:
            logger.warning(f"未知指标: {metric_name}")
            return False, None
        detector = self.detectors[metric_name]
        is_anomaly, message = False, None
        if detector['type'] == 'threshold':
            is_anomaly, message = check_threshold_anomaly(current_value, detector['config'])
        elif detector['type'] == 'moving_average':
            instance = detector['instance']
            # Score against the PREVIOUS window before recording the new
            # value. A point that is part of its own window of n samples can
            # reach at most z = sqrt(n - 1) (about 2.45 for n = 7), so
            # recording first would keep the default multiplier of 3 from
            # ever being exceeded.
            is_anomaly = bool(instance.is_anomaly(current_value))
            instance.add_value(current_value)
            if is_anomaly:
                message = f"移动平均检测异常: {current_value}"
        if is_anomaly:
            logger.warning(f"检测到异常 - {metric_name}: {message}")
            self._send_alert(metric_name, message, current_value, metadata)
        return is_anomaly, message

    def _send_alert(self, metric_name, message, value, metadata=None):
        """Send the alert to every configured channel.

        A failure in one channel is logged and does not prevent delivery to
        the remaining channels.
        """
        alert_level = self.config.get('metrics', {}).get(metric_name, {}).get('alert_level', 'warning')
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        meta = metadata or {}
        # Build the alert body. Lines are joined without leading indentation
        # so Markdown renderers do not treat them as a code block.
        title = f"[{alert_level.upper()}] SEO/GEO异常告警 - {metric_name}"
        text = "\n".join([
            "",
            f"## {title}",
            f"**时间**: {timestamp}",
            f"**指标**: {metric_name}",
            f"**当前值**: {value}",
            f"**异常信息**: {message}",
            "**元数据**:",
            f"- 来源: {meta.get('source', 'unknown')}",
            f"- 页面: {meta.get('url', 'N/A')}",
            f"- 引擎: {meta.get('engine', 'N/A')}",
            "**建议操作**:",
            "1. 检查该指标的历史趋势",
            "2. 确认是否为临时波动",
            "3. 如持续异常,立即排查原因",
            "",
        ])
        # Fan out to each channel.
        for channel_name, channel in self.alerts.items():
            try:
                if channel_name == 'email':
                    channel.send_alert(
                        to_emails=self.config['channels']['email']['to_emails'],
                        subject=title,
                        body=text
                    )
                elif channel_name == 'dingtalk':
                    channel.send_markdown(title, text)
                elif channel_name == 'feishu':
                    channel.send_interactive(title, text)
                logger.info(f"告警已发送到 {channel_name}")
            except Exception as e:
                # Best effort: one broken channel must not block the others.
                logger.error(f"发送告警到 {channel_name} 失败: {e}")

    def run_check_cycle(self):
        """Run one full detection cycle over the sample metrics."""
        logger.info("开始异常检测周期...")
        # Real data collection plugs in here.
        # Example: readings fetched from the different sources.
        metrics_to_check = [
            # (metric_name, current_value, metadata)
            ('search_console_clicks', 1500, {'source': 'google', 'url': 'https://example.com'}),
            ('generative_reference_count', 25, {'source': 'perplexity', 'engine': 'perplexity'}),
            ('page_load_time', 2.5, {'source': 'lighthouse', 'url': 'https://example.com/page'}),
        ]
        for metric_name, value, metadata in metrics_to_check:
            self.check_metric(metric_name, value, metadata)
        logger.info("异常检测周期完成")
def main():
    """Run detection cycles until interrupted with Ctrl-C.

    A successful cycle is followed by a 5-minute pause. A failed cycle is
    logged with its traceback and retried after 1 minute.
    """
    manager = AlertManager('alert_config.json')
    # The interrupt handler wraps the whole loop so that Ctrl-C exits cleanly
    # during the retry back-off too, not only during a cycle or normal pause.
    try:
        while True:
            try:
                manager.run_check_cycle()
            except Exception as e:
                # logger.exception keeps the traceback for diagnosis.
                logger.exception(f"运行异常: {e}")
                time.sleep(60)
            else:
                time.sleep(300)  # 5 minutes
    except KeyboardInterrupt:
        logger.info("收到中断信号,退出")


if __name__ == '__main__':
    main()
配置文件示例
{
"metrics": {
"search_console_clicks": {
"detector": "moving_average",
"window": 7,
"std_multiplier": 3,
"alert_level": "critical",
"description": "Search Console点击量"
},
"generative_reference_count": {
"detector": "threshold",
"threshold": {"low": 5, "high": 100},
"direction": "both",
"alert_level": "warning",
"description": "生成引擎引用次数"
},
"page_load_time": {
"detector": "threshold",
"threshold": 3.0,
"direction": "up",
"alert_level": "warning",
"description": "页面加载时间(秒)"
}
},
"channels": {
"email": {
"smtp_server": "smtp.gmail.com",
"smtp_port": 587,
"username": "alerts@example.com",
"password": "your_password",
"to_emails": ["team@example.com"]
},
"dingtalk": {
"webhook_url": "https://oapi.dingtalk.com/robot/send?access_token=your_token",
"secret": "your_secret"
},
"feishu": {
"webhook_url": "https://open.feishu.cn/open-apis/bot/v2/hook/your_webhook"
}
}
}
告警策略最佳实践
1. 分级告警
- Critical(严重):流量下降50%以上、网站不可用、核心页面消失 → 立即通知全员
- Warning(警告):流量下降20-50%、排名波动、性能退化 → 通知相关团队
- Info(信息):小幅度波动、新趋势出现 → 记录日志,无需立即响应
2. 告警抑制
- 避免重复告警:同一指标在短时间内(如30分钟)只发送一次
- 静默期设置:夜间(22:00-8:00)仅发送Critical级别告警
- 维护窗口:在计划维护期间暂停告警
3. 告警内容优化
- 包含可操作信息:问题描述、影响范围、建议操作
- 提供上下文:历史趋势、相关指标、可能原因
- 添加跳转链接:直接跳转到监控仪表盘或问题页面
4. 告警路由
- 按时间路由:工作时间→钉钉/飞书,非工作时间→邮件+短信
- 按级别路由:Critical→全员,Warning→技术团队,Info→日志
- 按团队路由:SEO问题→内容团队,技术问题→开发团队
通过构建完善的异常检测与告警系统,团队能够在第一时间发现并响应SEO/GEO优化中的问题,确保双引擎优化成果的持续性和稳定性。
