9.3 Practical Full-Stack Log Analysis Scripts (Analyzing Nginx Logs with Python/Go/Node)
9.3.1 Why Log Analysis Matters
Against the backdrop of dual-engine GEO and SEO optimization, Nginx log analysis has evolved from traditional traffic watching into crawler-behavior analysis. Through log analysis we can:
- Identify AI crawler access patterns: whether GPTBot, GoogleOther, ClaudeBot, and others visit as expected
- Monitor response status codes: whether large volumes of 4xx/5xx errors are eroding crawler trust
- Analyze crawl frequency and time distribution: judge whether rate limits need adjusting
- Detect abnormal crawler behavior: identify non-standard User-Agents or malicious crawlers
- Verify dynamic rendering: compare the content different crawlers receive (see the sketch below)
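As a first illustration of the last point, the sketch below compares the average response size each audience receives for a single URL; a large gap between bot and human traffic can hint at rendering differences. It assumes the JSON log format defined in 9.3.2 and a list of already-parsed entries; the helper name compare_rendered_sizes and the shortened bot list are illustrative only.
# Sketch: average body_bytes_sent per audience for one URL
from collections import defaultdict

def compare_rendered_sizes(entries, uri):
    sizes = defaultdict(list)
    for e in entries:
        if e.get('request_uri') != uri:
            continue
        ua = e.get('http_user_agent', '')
        # Minimal, illustrative bot list -- extend to match 9.3.3
        audience = 'bot' if any(b in ua for b in ('GPTBot', 'Googlebot', 'ClaudeBot')) else 'human'
        sizes[audience].append(e.get('body_bytes_sent', 0))
    # Average bytes sent per audience for this URL
    return {k: sum(v) / len(v) for k, v in sizes.items() if v}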
9.3.2 Log Format Configuration
First, make sure Nginx is configured with a structured log format:
# Defined inside the http block
log_format geo_json escape=json
'{'
'"timestamp":"$time_iso8601",'
'"remote_addr":"$remote_addr",'
'"request_method":"$request_method",'
'"request_uri":"$request_uri",'
'"status":$status,'
'"body_bytes_sent":$body_bytes_sent,'
'"http_referer":"$http_referer",'
'"http_user_agent":"$http_user_agent",'
'"request_time":$request_time,'
'"upstream_response_time":"$upstream_response_time",'
'"http_x_forwarded_for":"$http_x_forwarded_for"'
'}';
access_log /var/log/nginx/access.log geo_json;
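Before wiring up the analyzers below, it is worth confirming that the new format really emits one valid JSON object per line (entries written before the format change will fail to parse). A minimal check, assuming the default log path:
# Sanity check: count access-log lines that are not valid JSON
import json
import sys

log_path = sys.argv[1] if len(sys.argv) > 1 else '/var/log/nginx/access.log'
bad = 0
with open(log_path, encoding='utf-8') as f:
    for line in f:
        try:
            json.loads(line)
        except json.JSONDecodeError:
            bad += 1
print(f"{bad} malformed line(s)")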
9.3.3 Python Log Analysis Script
With its rich data-processing ecosystem, Python is the best fit for in-depth log analysis.
Basic analysis script
#!/usr/bin/env python3
"""
Nginx日志分析工具 - 面向SEO/GEO爬虫监控
支持JSON格式日志
"""
import json
import re
from collections import Counter, defaultdict
from datetime import datetime
import argparse
class NginxLogAnalyzer:
def __init__(self, log_file):
self.log_file = log_file
self.entries = []
self.ai_bots = {
'GPTBot': r'GPTBot',
'GoogleOther': r'GoogleOther',
'Googlebot': r'Googlebot',
'ClaudeBot': r'ClaudeBot',
'CCBot': r'CCBot',
'Bytespider': r'Bytespider',
'DeepSeek-Bot': r'DeepSeek-Bot',
'Baiduspider': r'Baiduspider',
'Bingbot': r'bingbot',
'YandexBot': r'YandexBot'
}
def parse_log(self):
"""解析日志文件"""
with open(self.log_file, 'r', encoding='utf-8') as f:
for line in f:
try:
entry = json.loads(line.strip())
self.entries.append(entry)
except json.JSONDecodeError:
continue
print(f"共解析 {len(self.entries)} 条日志记录")
def identify_bot(self, user_agent):
"""识别爬虫类型"""
for bot_name, pattern in self.ai_bots.items():
if re.search(pattern, user_agent, re.IGNORECASE):
return bot_name
return 'Other'
def analyze_bots(self):
"""分析爬虫访问情况"""
bot_stats = defaultdict(lambda: {
'count': 0,
'status_codes': Counter(),
'urls': Counter(),
'avg_response_time': 0,
'response_times': []
})
for entry in self.entries:
ua = entry.get('http_user_agent', '')
bot_type = self.identify_bot(ua)
stats = bot_stats[bot_type]
stats['count'] += 1
stats['status_codes'][entry.get('status', 0)] += 1
stats['urls'][entry.get('request_uri', '')] += 1
stats['response_times'].append(entry.get('request_time', 0))
        # Compute the average response time, then drop the raw samples
for bot, stats in bot_stats.items():
if stats['response_times']:
stats['avg_response_time'] = sum(stats['response_times']) / len(stats['response_times'])
del stats['response_times']
return bot_stats
def generate_report(self, bot_stats):
"""生成分析报告"""
print("\n" + "="*60)
print("SEO/GEO 爬虫访问分析报告")
print("="*60)
# 按访问量排序
sorted_bots = sorted(bot_stats.items(), key=lambda x: x[1]['count'], reverse=True)
for bot_name, stats in sorted_bots:
if stats['count'] < 10: # 忽略访问量过少的
continue
print(f"\n📊 {bot_name}")
print(f" 访问次数: {stats['count']}")
print(f" 平均响应时间: {stats['avg_response_time']:.3f}s")
# 状态码分布
print(f" 状态码分布:")
for code, count in stats['status_codes'].most_common(5):
print(f" {code}: {count}次 ({count/stats['count']*100:.1f}%)")
# 最常访问的URL
print(f" 最常访问的URL (Top 5):")
for url, count in stats['urls'].most_common(5):
print(f" {url}: {count}次")
def export_csv(self, bot_stats, output_file):
"""导出CSV格式报告"""
import csv
with open(output_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow(['爬虫类型', '访问次数', '平均响应时间(ms)', '200状态码占比', '4xx占比', '5xx占比'])
for bot_name, stats in bot_stats.items():
total = stats['count']
if total < 10:
continue
status_200 = stats['status_codes'].get(200, 0)
status_4xx = sum(v for k, v in stats['status_codes'].items() if 400 <= k < 500)
status_5xx = sum(v for k, v in stats['status_codes'].items() if 500 <= k < 600)
writer.writerow([
bot_name,
total,
round(stats['avg_response_time'] * 1000, 2),
f"{status_200/total*100:.1f}%",
f"{status_4xx/total*100:.1f}%",
f"{status_5xx/total*100:.1f}%"
])
print(f"\n📁 CSV报告已导出至: {output_file}")
def main():
    parser = argparse.ArgumentParser(description='Nginx log analysis tool')
    parser.add_argument('--log', '-l', required=True, help='path to the log file')
    parser.add_argument('--export', '-e', help='path for the CSV report')
args = parser.parse_args()
analyzer = NginxLogAnalyzer(args.log)
analyzer.parse_log()
bot_stats = analyzer.analyze_bots()
analyzer.generate_report(bot_stats)
if args.export:
analyzer.export_csv(bot_stats, args.export)
if __name__ == '__main__':
main()
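Assuming the script is saved as nginx_log_analyzer.py, a typical run looks like this:
python3 nginx_log_analyzer.py --log /var/log/nginx/access.log --export report.csv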
Real-time monitoring script
#!/usr/bin/env python3
"""
实时监控Nginx日志中的爬虫访问
使用tail -f 模式
"""
import subprocess
import json
import re
import time
from datetime import datetime
class RealTimeBotMonitor:
def __init__(self, log_file):
self.log_file = log_file
self.ai_bots = ['GPTBot', 'GoogleOther', 'ClaudeBot', 'Bytespider', 'DeepSeek-Bot']
        self.alert_threshold = 5  # alert when hits within 5 seconds exceed this
self.recent_hits = []
def monitor(self):
"""实时监控日志"""
process = subprocess.Popen(['tail', '-f', self.log_file],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True)
print(f"🔍 开始实时监控爬虫访问... (按 Ctrl+C 停止)")
print(f" 监控文件: {self.log_file}")
print("-" * 50)
try:
for line in iter(process.stdout.readline, ''):
self.process_line(line)
except KeyboardInterrupt:
print("\n\n监控已停止")
process.terminate()
def process_line(self, line):
"""处理单行日志"""
try:
entry = json.loads(line.strip())
ua = entry.get('http_user_agent', '')
for bot in self.ai_bots:
if bot.lower() in ua.lower():
timestamp = entry.get('timestamp', datetime.now().isoformat())
uri = entry.get('request_uri', '')
status = entry.get('status', 0)
# 记录访问
self.recent_hits.append(time.time())
# 清理超过5秒的记录
self.recent_hits = [t for t in self.recent_hits if time.time() - t < 5]
# 输出访问信息
print(f"[{timestamp}] 🤖 {bot} | {status} | {uri}")
# 检查是否需要告警
if len(self.recent_hits) > self.alert_threshold:
print(f"⚠️ 告警: 5秒内 {bot} 访问次数超过 {self.alert_threshold} 次!")
except json.JSONDecodeError:
pass
if __name__ == '__main__':
monitor = RealTimeBotMonitor('/var/log/nginx/access.log')
monitor.monitor()
9.3.4 High-Performance Analysis in Go
Go performs better on large log volumes, making it a good fit for production use.
// nginx_log_analyzer.go
package main
import (
    "bufio"
    "encoding/json"
    "flag"
    "fmt"
    "os"
    "regexp"
    "sort"
    "strings"
)
type LogEntry struct {
Timestamp string `json:"timestamp"`
RemoteAddr string `json:"remote_addr"`
RequestMethod string `json:"request_method"`
RequestURI string `json:"request_uri"`
Status int `json:"status"`
BodyBytesSent int `json:"body_bytes_sent"`
HTTPReferer string `json:"http_referer"`
HTTPUserAgent string `json:"http_user_agent"`
RequestTime float64 `json:"request_time"`
}
type BotStats struct {
Count int
StatusCodes map[int]int
URLs map[string]int
TotalResponseTime float64
}
func main() {
    logFile := flag.String("log", "/var/log/nginx/access.log", "path to the log file")
flag.Parse()
file, err := os.Open(*logFile)
if err != nil {
fmt.Printf("无法打开日志文件: %v\n", err)
os.Exit(1)
}
defer file.Close()
    scanner := bufio.NewScanner(file)
    // Raise the line-size limit; the 64 KiB default can truncate long log lines
    scanner.Buffer(make([]byte, 0, 1024*1024), 1024*1024)
botPatterns := map[string]*regexp.Regexp{
"GPTBot": regexp.MustCompile(`(?i)GPTBot`),
"GoogleOther": regexp.MustCompile(`(?i)GoogleOther`),
"Googlebot": regexp.MustCompile(`(?i)Googlebot`),
"ClaudeBot": regexp.MustCompile(`(?i)ClaudeBot`),
"Bytespider": regexp.MustCompile(`(?i)Bytespider`),
"DeepSeek": regexp.MustCompile(`(?i)DeepSeek-Bot`),
"Baiduspider": regexp.MustCompile(`(?i)Baiduspider`),
}
stats := make(map[string]*BotStats)
totalLines := 0
for scanner.Scan() {
line := scanner.Text()
totalLines++
var entry LogEntry
if err := json.Unmarshal([]byte(line), &entry); err != nil {
continue
}
        // Identify the crawler
botName := "Other"
for name, pattern := range botPatterns {
if pattern.MatchString(entry.HTTPUserAgent) {
botName = name
break
}
}
        // Update stats
if _, exists := stats[botName]; !exists {
stats[botName] = &BotStats{
StatusCodes: make(map[int]int),
URLs: make(map[string]int),
}
}
s := stats[botName]
s.Count++
s.StatusCodes[entry.Status]++
s.URLs[entry.RequestURI]++
s.TotalResponseTime += entry.RequestTime
}
    // Print the report
    fmt.Println(strings.Repeat("=", 60))
    fmt.Println("SEO/GEO Crawler Access Analysis Report (Go)")
    fmt.Println(strings.Repeat("=", 60))
    fmt.Printf("Total log lines: %d\n\n", totalLines)
    // Sort by request count
type kv struct {
Key string
Value *BotStats
}
var sortedStats []kv
for k, v := range stats {
sortedStats = append(sortedStats, kv{k, v})
}
sort.Slice(sortedStats, func(i, j int) bool {
return sortedStats[i].Value.Count > sortedStats[j].Value.Count
})
for _, item := range sortedStats {
if item.Value.Count < 10 {
continue
}
avgResponseTime := item.Value.TotalResponseTime / float64(item.Value.Count)
fmt.Printf("\n📊 %s\n", item.Key)
fmt.Printf(" 访问次数: %d\n", item.Value.Count)
fmt.Printf(" 平均响应时间: %.3fs\n", avgResponseTime)
fmt.Printf(" 状态码分布:\n")
type sc struct {
Code int
Count int
}
var sortedCodes []sc
for code, count := range item.Value.StatusCodes {
sortedCodes = append(sortedCodes, sc{code, count})
}
sort.Slice(sortedCodes, func(i, j int) bool {
return sortedCodes[i].Count > sortedCodes[j].Count
})
        for _, scItem := range sortedCodes[:min(5, len(sortedCodes))] {
            pct := float64(scItem.Count) / float64(item.Value.Count) * 100
            fmt.Printf("    %d: %d (%.1f%%)\n", scItem.Code, scItem.Count, pct)
        }
        fmt.Printf("  Most requested URLs (top 5):\n")
type urlItem struct {
URL string
Count int
}
var sortedURLs []urlItem
for url, count := range item.Value.URLs {
sortedURLs = append(sortedURLs, urlItem{url, count})
}
sort.Slice(sortedURLs, func(i, j int) bool {
return sortedURLs[i].Count > sortedURLs[j].Count
})
        for _, u := range sortedURLs[:min(5, len(sortedURLs))] {
            fmt.Printf("    %s: %d\n", u.URL, u.Count)
}
}
}
// min helper (Go 1.21+ ships a built-in min; this keeps the example
// compatible with older toolchains)
func min(a, b int) int {
if a < b {
return a
}
return b
}
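To try the analyzer (the file name is an assumption), run it directly with the Go toolchain; it uses only the standard library, so a plain go build also yields a dependency-free binary for production:
go run nginx_log_analyzer.go -log /var/log/nginx/access.log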
9.3.5 Quick Analysis in Node.js
Node.js suits rapid prototyping and integration with an existing Node.js ecosystem.
// nginx_log_analyzer.js
const fs = require('fs');
const readline = require('readline');
class NginxLogAnalyzer {
constructor(logFile) {
this.logFile = logFile;
this.aiBots = {
'GPTBot': /GPTBot/i,
'GoogleOther': /GoogleOther/i,
'Googlebot': /Googlebot/i,
'ClaudeBot': /ClaudeBot/i,
'CCBot': /CCBot/i,
'Bytespider': /Bytespider/i,
'DeepSeek-Bot': /DeepSeek-Bot/i,
'Baiduspider': /Baiduspider/i,
'Bingbot': /bingbot/i,
};
this.stats = {};
this.totalLines = 0;
}
async analyze() {
const fileStream = fs.createReadStream(this.logFile);
const rl = readline.createInterface({
input: fileStream,
crlfDelay: Infinity
});
for await (const line of rl) {
this.totalLines++;
try {
const entry = JSON.parse(line);
this.processEntry(entry);
} catch (e) {
        // skip lines that are not valid JSON
}
}
this.generateReport();
}
processEntry(entry) {
const ua = entry.http_user_agent || '';
let botName = 'Other';
for (const [name, pattern] of Object.entries(this.aiBots)) {
if (pattern.test(ua)) {
botName = name;
break;
}
}
if (!this.stats[botName]) {
this.stats[botName] = {
count: 0,
statusCodes: {},
urls: {},
responseTimes: []
};
}
const stats = this.stats[botName];
stats.count++;
const status = entry.status || 0;
stats.statusCodes[status] = (stats.statusCodes[status] || 0) + 1;
const uri = entry.request_uri || '';
stats.urls[uri] = (stats.urls[uri] || 0) + 1;
stats.responseTimes.push(entry.request_time || 0);
}
  generateReport() {
    console.log('='.repeat(60));
    console.log('SEO/GEO Crawler Access Analysis Report (Node.js)');
    console.log('='.repeat(60));
    console.log(`Total log lines: ${this.totalLines}\n`);
    // Sort by request count
const sortedBots = Object.entries(this.stats)
.filter(([_, stats]) => stats.count >= 10)
.sort((a, b) => b[1].count - a[1].count);
for (const [botName, stats] of sortedBots) {
const avgResponseTime = stats.responseTimes.reduce((a, b) => a + b, 0) / stats.count;
      console.log(`\n📊 ${botName}`);
      console.log(`  Requests: ${stats.count}`);
      console.log(`  Avg response time: ${avgResponseTime.toFixed(3)}s`);
      console.log(`  Status code distribution:`);
const sortedCodes = Object.entries(stats.statusCodes)
.sort((a, b) => b[1] - a[1])
.slice(0, 5);
for (const [code, count] of sortedCodes) {
const pct = (count / stats.count * 100).toFixed(1);
        console.log(`    ${code}: ${count} (${pct}%)`);
}
      console.log(`  Most requested URLs (top 5):`);
const sortedURLs = Object.entries(stats.urls)
.sort((a, b) => b[1] - a[1])
.slice(0, 5);
for (const [url, count] of sortedURLs) {
        console.log(`    ${url}: ${count}`);
}
}
}
}
// Usage example
const analyzer = new NginxLogAnalyzer('/var/log/nginx/access.log');
analyzer.analyze().catch(console.error);
9.3.6 Advanced Analysis: Recognizing Crawler Behavior Patterns
Detecting anomalous crawl patterns
# An additional method for the NginxLogAnalyzer class from 9.3.3
def detect_anomalous_patterns(self, bot_stats):
    """Detect anomalous crawl patterns."""
    anomalies = []
    for bot_name, stats in bot_stats.items():
        if stats['count'] < 50:
            continue
        # 1. High error rate
        error_4xx = sum(v for k, v in stats['status_codes'].items() if 400 <= k < 500)
        error_5xx = sum(v for k, v in stats['status_codes'].items() if 500 <= k < 600)
        error_rate = (error_4xx + error_5xx) / stats['count']
        if error_rate > 0.1:  # error rate above 10%
            anomalies.append({
                'bot': bot_name,
                'type': 'high_error_rate',
                'detail': f"error rate: {error_rate:.1%}"
            })
        # 2. Excessive crawl rate -- requires timestamp analysis;
        #    see the sketch after this function
        # 3. Repeated fetches of the same URLs
        url_repeat_rate = sum(1 for v in stats['urls'].values() if v > 10) / len(stats['urls'])
        if url_repeat_rate > 0.3:
            anomalies.append({
                'bot': bot_name,
                'type': 'excessive_repeats',
                'detail': f"repeat-fetch rate: {url_repeat_rate:.1%}"
            })
    return anomalies
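The rate check marked as simplified above can be built from the timestamps already collected in self.entries. Below is a minimal sketch, written as another NginxLogAnalyzer method; the per-minute threshold is an assumption and should be tuned to your own rate-limit policy.
def detect_burst_crawling(self, max_per_minute=120):
    """Sketch of the omitted rate check: flag bots whose peak per-minute
    request rate exceeds max_per_minute (threshold is an assumption)."""
    from collections import Counter, defaultdict
    from datetime import datetime
    per_minute = defaultdict(Counter)
    for entry in self.entries:
        bot = self.identify_bot(entry.get('http_user_agent', ''))
        try:
            ts = datetime.fromisoformat(entry['timestamp'])
        except (KeyError, ValueError):
            continue
        # Bucket requests into one-minute windows
        per_minute[bot][ts.replace(second=0, microsecond=0)] += 1
    anomalies = []
    for bot, minutes in per_minute.items():
        minute, peak = minutes.most_common(1)[0]
        if peak > max_per_minute:
            anomalies.append({
                'bot': bot,
                'type': 'burst_crawling',
                'detail': f"peak {peak} req/min at {minute.isoformat()}"
            })
    return anomalies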
9.3.7 Log Analysis Best Practices
1. Handling log rotation
def analyze_rotated_logs(log_dir, days=7):
    """Analyze logs from the last N days across rotated files."""
    import glob
    from datetime import datetime, timedelta
    all_entries = []
    # $time_iso8601 carries a UTC offset, so parsed timestamps are
    # timezone-aware; the cutoff must be aware too or the comparison fails
    cutoff_date = datetime.now().astimezone() - timedelta(days=days)
    for log_file in glob.glob(f"{log_dir}/access.log*"):
        # Adjust to your rotation policy; compressed rotations
        # (access.log.*.gz) would need gzip.open and are skipped here
        if log_file.endswith('.gz'):
            continue
        analyzer = NginxLogAnalyzer(log_file)
        analyzer.parse_log()
        all_entries.extend(analyzer.entries)
    # Keep only entries within the time window
    filtered = [
        entry for entry in all_entries
        if datetime.fromisoformat(entry['timestamp']) >= cutoff_date
    ]
    print(f"Analyzing {len(filtered)} entries (last {days} days)")
    return filtered
2. Integrating with monitoring systems
def send_to_prometheus(bot_stats):
    """Push bot statistics to a Prometheus Pushgateway."""
    from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
    # push_to_gateway requires an explicit registry
    registry = CollectorRegistry()
    bot_count = Gauge('nginx_bot_requests_total', 'Total bot requests', ['bot'], registry=registry)
    bot_error_rate = Gauge('nginx_bot_error_rate', 'Bot error rate', ['bot'], registry=registry)
    bot_response_time = Gauge('nginx_bot_response_time_seconds', 'Average bot response time', ['bot'], registry=registry)
    for bot_name, stats in bot_stats.items():
        bot_count.labels(bot=bot_name).set(stats['count'])
        error_count = sum(v for k, v in stats['status_codes'].items() if k >= 400)
        error_rate = error_count / stats['count'] if stats['count'] > 0 else 0
        bot_error_rate.labels(bot=bot_name).set(error_rate)
        bot_response_time.labels(bot=bot_name).set(stats['avg_response_time'])
    # Assumes a Pushgateway listening on localhost:9091
    push_to_gateway('localhost:9091', job='nginx_log_analysis', registry=registry)
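The Pushgateway fits here because the analysis runs as a short-lived batch job; a long-running service would instead expose a /metrics endpoint for Prometheus to scrape directly.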
3. Automating with scheduled tasks
#!/bin/bash
# cron_task.sh - daily log analysis task

# Configuration
LOG_FILE="/var/log/nginx/access.log"
REPORT_DIR="/var/reports/nginx"
DATE=$(date +%Y%m%d)

# Create the report directory
mkdir -p "$REPORT_DIR"

# Run the Python analysis script
python3 /opt/scripts/nginx_log_analyzer.py \
    --log "$LOG_FILE" \
    --export "${REPORT_DIR}/report_${DATE}.csv" \
    > "${REPORT_DIR}/report_${DATE}.txt"

# Send an alert if the report contains warning markers
# (requires the anomaly checks from 9.3.6 to be wired into the report)
if grep -q "⚠️" "${REPORT_DIR}/report_${DATE}.txt"; then
    # Post to WeCom/DingTalk/Slack
    curl -X POST -H "Content-Type: application/json" \
        -d "{\"msgtype\":\"text\",\"text\":{\"content\":\"Nginx log anomaly alert: $(date)\"}}" \
        "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=YOUR_KEY"
fi

# Compress reports older than 30 days
find "$REPORT_DIR" -name "*.csv" -mtime +30 -exec gzip {} \;
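A crontab entry schedules the task; the time of day below (01:00) is an assumption, chosen so each run covers the previous day's traffic:
0 1 * * * /opt/scripts/cron_task.sh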
9.3.8 Case Study: Profiling GPTBot Access Patterns
# Profile GPTBot's access characteristics
from collections import Counter
from datetime import datetime

def analyze_gptbot_patterns(entries):
    """Analyze GPTBot access patterns."""
    gptbot_entries = [
        e for e in entries
        if 'GPTBot' in e.get('http_user_agent', '')
    ]
    if not gptbot_entries:
        print("No GPTBot visits detected")
        return
    print("GPTBot access statistics:")
    print(f"  Total requests: {len(gptbot_entries)}")
    # Hourly distribution of visits ($time_iso8601 is server-local time)
    hour_distribution = Counter()
    for entry in gptbot_entries:
        try:
            hour = datetime.fromisoformat(entry['timestamp']).hour
            hour_distribution[hour] += 1
        except (KeyError, ValueError):
            pass
    print("  Hourly distribution (server time):")
    for hour in range(24):
        count = hour_distribution.get(hour, 0)
        if count > 0:
            bar = '█' * (count // 10)
            print(f"    {hour:02d}:00 - {bar} {count}")
    # Crawl depth distribution
    url_depth = Counter()
    for entry in gptbot_entries:
        uri = entry.get('request_uri', '')
        depth = len(uri.strip('/').split('/'))
        url_depth[depth] += 1
    print("  Crawl depth distribution:")
    for depth, count in sorted(url_depth.items()):
        print(f"    depth {depth}: {count}")
With these Python, Go, and Node.js log analysis scripts, full-stack engineers can choose the approach that matches their team's stack and performance requirements. Beyond revealing how AI crawlers access a site, the scripts help tune server configuration and crawl policy, improving GEO optimization outcomes.
