# Breakthrough/AnalysisBase.py
# 2025-02-12 13:46:59 +08:00
# 161 lines, 6.1 KiB, Python
import itertools
import os
import re
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

import pandas as pd
# Compiled once at module level: parse_log_line runs once per log line, so
# recompiling the pattern on every call was wasted work in the hot path.
LOG_PATTERN = re.compile(
    r'(?P<ip>\d+\.\d+\.\d+\.\d+) - - \[(?P<timestamp>[^\]]+)\] '
    r'"(?P<request>[^"]+)" (?P<status>\d+) (?P<size>\d+) '
    r'"(?P<referrer>[^"]+)" "(?P<user_agent>[^"]+)"'
)


def parse_log_line(line):
    """Parse one combined-format access-log line into a dict.

    Returns a dict with keys ``ip``, ``timestamp`` (naive datetime),
    ``request``, ``status``, ``size``, ``referrer``, ``user_agent`` plus the
    split-out ``request_method`` / ``request_path`` / ``request_protocol``
    (each None when the request string is malformed), or ``None`` when the
    line does not match the expected format at all.

    Note: ``status`` and ``size`` remain strings, as captured by the regex.
    """
    match = LOG_PATTERN.match(line)
    if not match:
        return None
    log_data = match.groupdict()
    # Strip the timezone so downstream pandas/Excel output gets naive datetimes.
    log_data['timestamp'] = datetime.strptime(
        log_data['timestamp'], '%d/%b/%Y:%H:%M:%S %z'
    ).replace(tzinfo=None)
    # Split "METHOD /path PROTOCOL"; tolerate malformed request strings.
    request_parts = log_data['request'].split()
    if len(request_parts) >= 3:
        log_data['request_method'] = request_parts[0]
        log_data['request_path'] = request_parts[1]
        log_data['request_protocol'] = request_parts[2]
    else:
        log_data['request_method'] = None
        log_data['request_path'] = None
        log_data['request_protocol'] = None
    return log_data
def update_statistics(statistics, key, status):
    """Bump the total hit count and the per-status count for *key*."""
    entry = statistics[key]
    entry['count'] += 1
    entry['status'][status] += 1
def parse_log_chunk(chunk):
    """Parse a list of raw log lines.

    Returns ``(ip_stats, ua_stats, log_entries)`` where the two stats maps
    count hits and per-status-code hits per IP / per user agent, and
    ``log_entries`` holds every successfully parsed line's dict.
    """
    ip_stats = defaultdict(lambda: {'count': 0, 'status': defaultdict(int)})
    ua_stats = defaultdict(lambda: {'count': 0, 'status': defaultdict(int)})
    entries = []
    for raw_line in chunk:
        parsed = parse_log_line(raw_line)
        if parsed is None:
            # Unparseable lines are silently skipped.
            continue
        update_statistics(ip_stats, parsed['ip'], parsed['status'])
        update_statistics(ua_stats, parsed['user_agent'], parsed['status'])
        entries.append(parsed)
    return ip_stats, ua_stats, entries
def parse_log_file(file_path, output_path):
    """Parse an access log and write per-IP / per-UA statistics to Excel.

    Produces three sheets: the raw log (capped at the newest 1,000,000
    entries), a per-IP summary, and a per-user-agent summary — each summary
    holding a total hit count plus one column per common HTTP status code.
    """
    common_statuses = [200, 301, 302, 400, 403, 404, 500]
    chunk_size = 10000  # lines handed to each worker at a time

    def read_log_file_in_chunks(path, size=chunk_size):
        # Stream the file lazily so huge logs never sit in memory at once.
        # islice replaces the previous next()/None padding and any() dance.
        with open(path, 'r', encoding='utf-8') as fh:
            while True:
                chunk = list(itertools.islice(fh, size))
                if not chunk:
                    break
                yield chunk

    # NOTE(review): parsing is CPU-bound, so a thread pool mostly serializes
    # on the GIL; a ProcessPoolExecutor would parallelize for real. Kept
    # as-is to preserve behavior.
    with ThreadPoolExecutor(max_workers=os.cpu_count() or 4) as executor:
        results = list(executor.map(parse_log_chunk, read_log_file_in_chunks(file_path)))

    # Merge the per-chunk results.
    ip_stats, ua_stats, log_entries = merge_statistics(results)
    # Cap the raw-log sheet at the newest 1,000,000 entries.
    log_entries = log_entries[-1000000:]

    # Status codes were captured as strings by the regex, hence str(status).
    ip_data = [
        [ip, stats['count']] + [stats['status'].get(str(status), 0) for status in common_statuses]
        for ip, stats in ip_stats.items()
    ]
    ua_data = [
        [ua, stats['count']] + [stats['status'].get(str(status), 0) for status in common_statuses]
        for ua, stats in ua_stats.items()
    ]

    # Build the summary DataFrames, most-visited first.
    ip_columns = ['IP', '访问次数'] + [str(status) for status in common_statuses]
    ua_columns = ['客户端', '访问次数'] + [str(status) for status in common_statuses]
    ip_df = pd.DataFrame(ip_data, columns=ip_columns).sort_values(by='访问次数', ascending=False).reset_index(drop=True)
    ua_df = pd.DataFrame(ua_data, columns=ua_columns).sort_values(by='访问次数', ascending=False).reset_index(drop=True)

    # Rename raw-log columns to their user-facing (Chinese) headers.
    log_columns = {
        'ip': 'IP',
        'timestamp': '时间戳',
        'request_method': '请求方法',
        'request_path': '请求路径',
        'request_protocol': '请求协议',
        'status': '状态码',
        'size': '大小',
        'referrer': '引荐来源',
        'user_agent': '客户端'
    }
    log_df = pd.DataFrame(log_entries).rename(columns=log_columns)

    # Write all three sheets into one workbook.
    with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
        log_df.to_excel(writer, sheet_name='原日志', index=False)
        ip_df.to_excel(writer, sheet_name='IP表', index=False)
        ua_df.to_excel(writer, sheet_name='UA表', index=False)
def merge_statistics(results):
    """Combine per-chunk ``(ip_stats, ua_stats, entries)`` triples into one.

    Counts for the same IP / user agent and the same status code are summed
    across chunks; log entries are concatenated in chunk order.
    """
    merged_ip = defaultdict(lambda: {'count': 0, 'status': defaultdict(int)})
    merged_ua = defaultdict(lambda: {'count': 0, 'status': defaultdict(int)})
    all_entries = []

    def fold(target, source):
        # Accumulate one chunk's counters into the running totals.
        for key, stats in source.items():
            target[key]['count'] += stats['count']
            for status, count in stats['status'].items():
                target[key]['status'][status] += count

    for chunk_ip, chunk_ua, chunk_entries in results:
        fold(merged_ip, chunk_ip)
        fold(merged_ua, chunk_ua)
        all_entries.extend(chunk_entries)
    return merged_ip, merged_ua, all_entries
def process_log_file(file_path, output_folder):
    """Analyze one log file and write the result workbook into *output_folder*.

    The output name is the source file's stem plus a run timestamp, so
    repeated runs never overwrite earlier results. Errors are caught and
    reported so one bad file does not abort a batch run.
    """
    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
    filename = os.path.basename(file_path)
    new_filename = f"{os.path.splitext(filename)[0]}_{timestamp}.xlsx"
    output_path = os.path.join(output_folder, new_filename)
    try:
        parse_log_file(file_path, output_path)
        # Fixed: the messages contained a literal "(unknown)" placeholder
        # where the source path belonged; interpolate file_path instead.
        print(f"原文件:{file_path}已分析并保存至{output_path}")
    except Exception as e:
        print(f"处理文件 {file_path} 时出错: {e}")
def process_logs_in_folder(log_folder, output_folder):
    """Analyze every ``*.log`` file in *log_folder* concurrently.

    Results are written into *output_folder*, which is created if missing.
    """
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(output_folder, exist_ok=True)
    log_files = [
        os.path.join(log_folder, filename)
        for filename in os.listdir(log_folder)
        if filename.endswith('.log')
    ]
    # One worker per file; process_log_file handles its own errors.
    with ThreadPoolExecutor(max_workers=os.cpu_count() or 4) as executor:
        executor.map(process_log_file, log_files, [output_folder] * len(log_files))
if __name__ == '__main__':
    # Default input/output locations relative to the working directory.
    source_dir = './AimLog'
    result_dir = './AnaResult'
    try:
        process_logs_in_folder(source_dir, result_dir)
    except Exception as e:
        print(f"处理日志文件夹时出错: {e}")