Tailwind CSSTailwind CSS
Home
  • Tailwind CSS 书籍目录
  • Vue 3 开发实战指南
  • React 和 Next.js 学习
  • TypeScript
  • React开发框架书籍大纲
  • Shadcn学习大纲
  • Swift 编程语言:从入门到进阶
  • SwiftUI 学习指南
  • 函数式编程大纲
  • Swift 异步编程语言
  • Swift 协议化编程
  • SwiftUI MVVM 开发模式
  • SwiftUI 图表开发书籍
  • SwiftData
  • ArkTS编程语言:从入门到精通
  • 仓颉编程语言:从入门到精通
  • 鸿蒙手机客户端开发实战
  • WPF书籍
  • C#开发书籍
learn
  • 搜索未来:SEO与GEO双引擎实战手册
  • Java编程语言
  • Kotlin 编程入门与实战
  • /python/outline.html
  • Rust 开发入门
  • AI Agent
  • MCP (Model Context Protocol) 应用指南
  • 深度学习
  • 深度学习
  • 强化学习: 理论与实践
  • 扩散模型书籍
  • Agentic AI for Everyone
langchain
Home
  • Tailwind CSS 书籍目录
  • Vue 3 开发实战指南
  • React 和 Next.js 学习
  • TypeScript
  • React开发框架书籍大纲
  • Shadcn学习大纲
  • Swift 编程语言:从入门到进阶
  • SwiftUI 学习指南
  • 函数式编程大纲
  • Swift 异步编程语言
  • Swift 协议化编程
  • SwiftUI MVVM 开发模式
  • SwiftUI 图表开发书籍
  • SwiftData
  • ArkTS编程语言:从入门到精通
  • 仓颉编程语言:从入门到精通
  • 鸿蒙手机客户端开发实战
  • WPF书籍
  • C#开发书籍
learn
  • 搜索未来:SEO与GEO双引擎实战手册
  • Java编程语言
  • Kotlin 编程入门与实战
  • /python/outline.html
  • Rust 开发入门
  • AI Agent
  • MCP (Model Context Protocol) 应用指南
  • 深度学习
  • 深度学习
  • 强化学习: 理论与实践
  • 扩散模型书籍
  • Agentic AI for Everyone
langchain
  • 附录E:全栈代码示例库

附录E:全栈代码示例库

E.3 Nginx日志分析脚本

目标

分析Nginx访问日志,提取搜索引擎爬虫(如Googlebot、GPTBot、Bingbot等)的访问行为,用于监控爬虫频率、识别异常抓取,并为GEO优化提供数据支持。

脚本功能

  • 解析标准Nginx日志格式。
  • 按User-Agent过滤出特定爬虫(可配置)。
  • 统计每个爬虫的请求次数、状态码分布、请求的URL Top N。
  • 输出结构化结果(JSON或CSV),便于后续集成到监控系统(如Prometheus/Grafana)。

Python 实现脚本

#!/usr/bin/env python3
"""
Nginx日志分析脚本:提取搜索引擎爬虫行为。
支持日志格式:combined (默认Nginx格式)
使用方法:python3 analyze_nginx_logs.py /var/log/nginx/access.log
"""

import re
import sys
import json
from collections import defaultdict, Counter
from datetime import datetime

# 配置:要监控的爬虫User-Agent关键字
TARGET_CRAWLERS = [
    'Googlebot',
    'GPTBot',
    'Bingbot',
    'Baiduspider',
    'Bytespider',
    'ClaudeBot',
    'CCBot',
    'DeepSeek-Bot',
    'YandexBot',
    'Applebot',
]

# 日志正则(Nginx combined格式)
LOG_PATTERN = re.compile(
    r'(?P<ip>\S+) - (?P<user>\S+) \[(?P<time>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<path>\S+) (?P<protocol>\S+)" '
    r'(?P<status>\d{3}) (?P<size>\S+) "(?P<referer>[^"]*)" '
    r'"(?P<ua>[^"]*)"'
)

def parse_log_line(line):
    """解析单行日志,返回字典或None"""
    match = LOG_PATTERN.match(line)
    if not match:
        return None
    return match.groupdict()

def identify_crawler(ua_string):
    """根据User-Agent识别爬虫类型"""
    for crawler in TARGET_CRAWLERS:
        if crawler.lower() in ua_string.lower():
            return crawler
    return None

def analyze_logs(log_file_path):
    """主分析函数"""
    crawler_stats = defaultdict(lambda: {
        'total_requests': 0,
        'status_codes': Counter(),
        'urls': Counter(),
        'first_seen': None,
        'last_seen': None,
    })

    with open(log_file_path, 'r', encoding='utf-8', errors='ignore') as f:
        for line in f:
            parsed = parse_log_line(line)
            if not parsed:
                continue

            ua = parsed['ua']
            crawler = identify_crawler(ua)
            if not crawler:
                continue

            stats = crawler_stats[crawler]
            stats['total_requests'] += 1
            stats['status_codes'][parsed['status']] += 1
            stats['urls'][parsed['path']] += 1

            # 记录时间
            try:
                log_time_str = parsed['time'].split(' ')[0]  # 去除时区
                log_time = datetime.strptime(log_time_str, '%d/%b/%Y:%H:%M:%S')
                if stats['first_seen'] is None or log_time < stats['first_seen']:
                    stats['first_seen'] = log_time
                if stats['last_seen'] is None or log_time > stats['last_seen']:
                    stats['last_seen'] = log_time
            except ValueError:
                pass

    # 格式化输出
    result = {}
    for crawler, stats in crawler_stats.items():
        result[crawler] = {
            'total_requests': stats['total_requests'],
            'unique_urls': len(stats['urls']),
            'status_code_distribution': dict(stats['status_codes'].most_common()),
            'top_urls': [{'url': url, 'count': cnt} for url, cnt in stats['urls'].most_common(10)],
            'first_seen': stats['first_seen'].isoformat() if stats['first_seen'] else None,
            'last_seen': stats['last_seen'].isoformat() if stats['last_seen'] else None,
        }

    return result

if __name__ == '__main__':
    if len(sys.argv) != 2:
        print("用法: python3 analyze_nginx_logs.py <log_file_path>")
        sys.exit(1)

    log_path = sys.argv[1]
    print(f"正在分析日志文件: {log_path}")
    analysis_result = analyze_logs(log_path)

    # 输出JSON结果
    output_json = json.dumps(analysis_result, indent=2, ensure_ascii=False)
    print("\n===== 爬虫分析报告 =====")
    print(output_json)

    # 可选:保存到文件
    with open('crawler_analysis_report.json', 'w', encoding='utf-8') as f:
        f.write(output_json)
    print("\n报告已保存至 crawler_analysis_report.json")

使用说明

  1. 保存脚本:将上述代码保存为 analyze_nginx_logs.py。
  2. 运行脚本:
    python3 analyze_nginx_logs.py /var/log/nginx/access.log
    
  3. 输出示例(JSON片段):
    {
      "Googlebot": {
        "total_requests": 1523,
        "unique_urls": 342,
        "status_code_distribution": {
          "200": 1400,
          "304": 100,
          "404": 20,
          "301": 3
        },
        "top_urls": [
          {"url": "/", "count": 150},
          {"url": "/blog/", "count": 89},
          {"url": "/product/ai-tools", "count": 45}
        ],
        "first_seen": "2024-05-01T00:00:00",
        "last_seen": "2024-05-31T23:59:59"
      },
      "GPTBot": {
        "total_requests": 87,
        "unique_urls": 34,
        "status_code_distribution": {
          "200": 80,
          "403": 7
        },
        "top_urls": [
          {"url": "/faq/", "count": 20},
          {"url": "/docs/api-reference", "count": 15}
        ],
        "first_seen": "2024-05-15T10:00:00",
        "last_seen": "2024-05-30T18:00:00"
      }
    }
    

集成到监控系统

  1. Prometheus Exporter:将脚本改为常驻进程,定期扫描日志,将 total_requests 等指标暴露为Prometheus metrics。
  2. 定时任务:通过Cron每日运行一次,生成报告并发送到Grafana(通过Loki或直接JSON API)。
  3. 告警:当某个爬虫的 404 错误率突然升高,或 GPTBot 请求量激增时,触发告警。

扩展建议

  • 支持其他日志格式:修改 LOG_PATTERN 正则表达式以匹配JSON日志或自定义格式。
  • 实时流处理:使用 tail -f 或日志流工具(如Filebeat + Logstash)实现近实时分析。
  • 地理信息:结合IP库,分析爬虫来源地区。
  • 性能影响:对于高流量站点,建议使用流式处理框架(如Apache Flink)或只分析采样日志。
Last Updated:: 5/9/26, 5:13 PM