import re
from datetime import datetime
from itertools import islice

import pandas as pd

from collections import defaultdict
import os
from concurrent.futures import ThreadPoolExecutor

# Pre-compiled "combined" access-log pattern:
#   ip - - [timestamp] "request" status size "referrer" "user_agent"
# NOTE: the original pattern had lost every group name (`(?P\d+...` instead of
# `(?P<ip>\d+...`), which raises re.error on the first call; the names below are
# restored from the groupdict() keys the rest of the script reads.
# `size` accepts `-` (zero-byte responses) in addition to digits so those lines
# are no longer silently skipped. Compiled once at module level instead of per line.
LOG_PATTERN = re.compile(
    r'(?P<ip>\d+\.\d+\.\d+\.\d+) - - \[(?P<timestamp>[^\]]+)\] '
    r'"(?P<request>[^"]+)" (?P<status>\d+) (?P<size>\d+|-) '
    r'"(?P<referrer>[^"]+)" "(?P<user_agent>[^"]+)"'
)


def parse_log_line(line):
    """Parse one access-log line into a dict, or return None if it does not match.

    The returned dict contains the raw regex groups plus:
      - 'timestamp' converted to a naive datetime (timezone stripped so all
        entries are comparable),
      - 'request_method' / 'request_path' / 'request_protocol' split out of the
        request string (None for malformed requests).
    """
    match = LOG_PATTERN.match(line)
    if not match:
        return None
    log_data = match.groupdict()
    # Strip the timezone so downstream sorting/display works on naive datetimes.
    log_data['timestamp'] = datetime.strptime(
        log_data['timestamp'], '%d/%b/%Y:%H:%M:%S %z'
    ).replace(tzinfo=None)
    # Split "METHOD /path PROTOCOL"; guard against malformed request strings.
    request_parts = log_data['request'].split()
    if len(request_parts) >= 3:
        log_data['request_method'] = request_parts[0]
        log_data['request_path'] = request_parts[1]
        log_data['request_protocol'] = request_parts[2]
    else:
        log_data['request_method'] = None
        log_data['request_path'] = None
        log_data['request_protocol'] = None
    return log_data


def update_statistics(statistics, key, status):
    """Record one hit for *key* with HTTP *status* in a stats mapping.

    *statistics* maps key -> {'count': int, 'status': defaultdict(int)};
    *status* is the status code as a string (it comes from a regex group).
    """
    statistics[key]['count'] += 1
    statistics[key]['status'][status] += 1


def _new_stats():
    """Factory for one per-key statistics record (used by the defaultdicts)."""
    return {'count': 0, 'status': defaultdict(int)}


def parse_log_chunk(chunk):
    """Parse a list of log lines.

    Returns (ip_stats, ua_stats, log_entries) where the two stats maps follow
    the update_statistics() shape and log_entries is the list of parsed dicts.
    Unparseable lines are skipped.
    """
    ip_stats = defaultdict(_new_stats)
    ua_stats = defaultdict(_new_stats)
    log_entries = []
    for line in chunk:
        log_data = parse_log_line(line)
        if log_data:
            update_statistics(ip_stats, log_data['ip'], log_data['status'])
            update_statistics(ua_stats, log_data['user_agent'], log_data['status'])
            log_entries.append(log_data)
    return ip_stats, ua_stats, log_entries


def parse_log_file(file_path, output_path):
    """Parse an access log and write an Excel workbook with three sheets.

    Sheets: '原日志' (raw entries, capped at the 1,000,000 most recent),
    'IP表' (per-IP hit counts + common status breakdown), and 'UA表'
    (the same, per user agent).
    """
    common_statuses = [200, 301, 302, 400, 403, 404, 500]
    chunk_size = 10000  # lines handed to each worker task

    def read_log_file_in_chunks(path, size=chunk_size):
        # Yield lists of up to *size* lines; islice replaces the original
        # next(file, None)-padding loop with the same chunking behavior.
        with open(path, 'r', encoding='utf-8') as file:
            while True:
                chunk = list(islice(file, size))
                if not chunk:
                    break
                yield chunk

    # Fan the chunks out over a thread pool. Parsing is CPU-bound, so with the
    # GIL this mainly overlaps file I/O with parsing rather than parallelizing
    # the regex work itself.
    with ThreadPoolExecutor(max_workers=os.cpu_count() or 4) as executor:
        results = list(executor.map(parse_log_chunk, read_log_file_in_chunks(file_path)))

    ip_stats, ua_stats, log_entries = merge_statistics(results)

    # Cap the raw-log sheet at the 1,000,000 most recent entries.
    log_entries = log_entries[-1000000:]

    # Status keys in the stats maps are strings (regex groups), while
    # common_statuses are ints — convert once for the column lookups/headers.
    status_keys = [str(status) for status in common_statuses]

    ip_data = [
        [ip, stats['count']] + [stats['status'].get(key, 0) for key in status_keys]
        for ip, stats in ip_stats.items()
    ]
    ua_data = [
        [ua, stats['count']] + [stats['status'].get(key, 0) for key in status_keys]
        for ua, stats in ua_stats.items()
    ]

    ip_columns = ['IP', '访问次数'] + status_keys
    ua_columns = ['客户端', '访问次数'] + status_keys
    ip_df = (
        pd.DataFrame(ip_data, columns=ip_columns)
        .sort_values(by='访问次数', ascending=False)
        .reset_index(drop=True)
    )
    ua_df = (
        pd.DataFrame(ua_data, columns=ua_columns)
        .sort_values(by='访问次数', ascending=False)
        .reset_index(drop=True)
    )

    log_columns = {
        'ip': 'IP',
        'timestamp': '时间戳',
        'request_method': '请求方法',
        'request_path': '请求路径',
        'request_protocol': '请求协议',
        'status': '状态码',
        'size': '大小',
        'referrer': '引荐来源',
        'user_agent': '客户端',
    }
    log_df = pd.DataFrame(log_entries).rename(columns=log_columns)

    with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
        log_df.to_excel(writer, sheet_name='原日志', index=False)
        ip_df.to_excel(writer, sheet_name='IP表', index=False)
        ua_df.to_excel(writer, sheet_name='UA表', index=False)


def merge_statistics(results):
    """Merge per-chunk (ip_stats, ua_stats, log_entries) triples into one triple.

    Counts are summed per key/status; log entries are concatenated in chunk
    order, which preserves the original file order.
    """
    ip_stats = defaultdict(_new_stats)
    ua_stats = defaultdict(_new_stats)
    log_entries = []
    for chunk_ip_stats, chunk_ua_stats, chunk_logs in results:
        for ip, stats in chunk_ip_stats.items():
            ip_stats[ip]['count'] += stats['count']
            for status, count in stats['status'].items():
                ip_stats[ip]['status'][status] += count
        for ua, stats in chunk_ua_stats.items():
            ua_stats[ua]['count'] += stats['count']
            for status, count in stats['status'].items():
                ua_stats[ua]['status'][status] += count
        log_entries.extend(chunk_logs)
    return ip_stats, ua_stats, log_entries


def process_log_file(file_path, output_folder):
    """Analyze one log file and write '<stem>_<timestamp>.xlsx' into *output_folder*.

    Errors are caught and reported so one bad file does not abort the batch.
    """
    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
    filename = os.path.basename(file_path)
    new_filename = f"{os.path.splitext(filename)[0]}_{timestamp}.xlsx"
    output_path = os.path.join(output_folder, new_filename)
    try:
        parse_log_file(file_path, output_path)
        # The original messages contained the literal placeholder "(unknown)"
        # where the source path belongs — restored to the file path.
        print(f"原文件:{file_path}已分析并保存至{output_path}")
    except Exception as e:
        print(f"处理文件 {file_path} 时出错: {e}")


def process_logs_in_folder(log_folder, output_folder):
    """Analyze every '*.log' file in *log_folder* concurrently."""
    os.makedirs(output_folder, exist_ok=True)
    log_files = [
        os.path.join(log_folder, filename)
        for filename in os.listdir(log_folder)
        if filename.endswith('.log')
    ]
    with ThreadPoolExecutor(max_workers=os.cpu_count() or 4) as executor:
        # Consume the iterator so any unexpected worker exception propagates
        # (process_log_file handles its own expected failures).
        list(executor.map(process_log_file, log_files, [output_folder] * len(log_files)))


if __name__ == '__main__':
    log_folder = './AimLog'
    output_folder = './AnaResult'
    try:
        process_logs_in_folder(log_folder, output_folder)
    except Exception as e:
        print(f"处理日志文件夹时出错: {e}")