Compare commits
No commits in common. "d7fb609def0cca47fc59f9f4ea1f753f6ff7770d" and "9f0f0c7039a621926633ec4a4fd6f4b87bc11347" have entirely different histories.
d7fb609def...9f0f0c7039
AnalysisBase.py (161 lines)
@@ -1,161 +0,0 @@
import re
from datetime import datetime
import pandas as pd
from collections import defaultdict
import os
from concurrent.futures import ThreadPoolExecutor


# Parse a single log line into a dict of fields
def parse_log_line(line):
    log_pattern = re.compile(
        r'(?P<ip>\d+\.\d+\.\d+\.\d+) - - \[(?P<timestamp>[^\]]+)\] "(?P<request>[^"]+)" (?P<status>\d+) (?P<size>\d+) "(?P<referrer>[^"]+)" "(?P<user_agent>[^"]+)"'
    )
    match = log_pattern.match(line)
    if match:
        log_data = match.groupdict()
        # Drop the timezone information
        log_data['timestamp'] = datetime.strptime(log_data['timestamp'], '%d/%b/%Y:%H:%M:%S %z').replace(tzinfo=None)

        # Split the request string, guarding against malformed requests
        request_parts = log_data['request'].split()
        if len(request_parts) >= 3:
            log_data['request_method'] = request_parts[0]
            log_data['request_path'] = request_parts[1]
            log_data['request_protocol'] = request_parts[2]
        else:
            log_data['request_method'] = None
            log_data['request_path'] = None
            log_data['request_protocol'] = None

        return log_data
    return None


# Generic helper for updating per-key statistics
def update_statistics(statistics, key, status):
    statistics[key]['count'] += 1
    statistics[key]['status'][status] += 1


# Parse one chunk of the log
def parse_log_chunk(chunk):
    ip_stats = defaultdict(lambda: {'count': 0, 'status': defaultdict(int)})
    ua_stats = defaultdict(lambda: {'count': 0, 'status': defaultdict(int)})
    log_entries = []

    for line in chunk:
        log_data = parse_log_line(line)
        if log_data:
            ip = log_data['ip']
            status = log_data['status']
            user_agent = log_data['user_agent']

            update_statistics(ip_stats, ip, status)
            update_statistics(ua_stats, user_agent, status)

            log_entries.append(log_data)

    return ip_stats, ua_stats, log_entries


# Read and parse a log file, then write the results to an Excel workbook
def parse_log_file(file_path, output_path):
    common_statuses = [200, 301, 302, 400, 403, 404, 500]
    chunk_size = 10000  # 10,000 lines per chunk

    def read_log_file_in_chunks(file_path, chunk_size=chunk_size):
        with open(file_path, 'r', encoding='utf-8') as file:
            while True:
                chunk = [next(file, None) for _ in range(chunk_size)]
                if not any(chunk):
                    break
                yield [line for line in chunk if line]

    # Process the chunks with a thread pool
    with ThreadPoolExecutor(max_workers=os.cpu_count() or 4) as executor:
        results = list(executor.map(parse_log_chunk, read_log_file_in_chunks(file_path)))

    # Merge the per-chunk results
    ip_stats, ua_stats, log_entries = merge_statistics(results)

    # Limit the raw-log sheet to at most the 1,000,000 most recent entries
    log_entries = log_entries[-1000000:]

    # Build the IP sheet rows
    ip_data = [[ip, stats['count']] + [stats['status'].get(str(status), 0) for status in common_statuses] for ip, stats in ip_stats.items()]

    # Build the UA sheet rows
    ua_data = [[ua, stats['count']] + [stats['status'].get(str(status), 0) for status in common_statuses] for ua, stats in ua_stats.items()]

    # Build the DataFrames
    ip_columns = ['IP', '访问次数'] + [str(status) for status in common_statuses]
    ua_columns = ['客户端', '访问次数'] + [str(status) for status in common_statuses]

    ip_df = pd.DataFrame(ip_data, columns=ip_columns).sort_values(by='访问次数', ascending=False).reset_index(drop=True)
    ua_df = pd.DataFrame(ua_data, columns=ua_columns).sort_values(by='访问次数', ascending=False).reset_index(drop=True)

    log_columns = {
        'ip': 'IP',
        'timestamp': '时间戳',
        'request_method': '请求方法',
        'request_path': '请求路径',
        'request_protocol': '请求协议',
        'status': '状态码',
        'size': '大小',
        'referrer': '引荐来源',
        'user_agent': '客户端'
    }
    log_df = pd.DataFrame(log_entries).rename(columns=log_columns)

    # Write the Excel workbook
    with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
        log_df.to_excel(writer, sheet_name='原日志', index=False)
        ip_df.to_excel(writer, sheet_name='IP表', index=False)
        ua_df.to_excel(writer, sheet_name='UA表', index=False)


# Merge the statistics produced by the individual chunks
def merge_statistics(results):
    ip_stats = defaultdict(lambda: {'count': 0, 'status': defaultdict(int)})
    ua_stats = defaultdict(lambda: {'count': 0, 'status': defaultdict(int)})
    log_entries = []

    for ip_s, ua_s, logs in results:
        for ip, stats in ip_s.items():
            ip_stats[ip]['count'] += stats['count']
            for status, count in stats['status'].items():
                ip_stats[ip]['status'][status] += count

        for ua, stats in ua_s.items():
            ua_stats[ua]['count'] += stats['count']
            for status, count in stats['status'].items():
                ua_stats[ua]['status'][status] += count

        log_entries.extend(logs)

    return ip_stats, ua_stats, log_entries


# Analyze one log file and save the result as a timestamped .xlsx file
def process_log_file(file_path, output_folder):
    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
    filename = os.path.basename(file_path)
    new_filename = f"{os.path.splitext(filename)[0]}_{timestamp}.xlsx"
    output_path = os.path.join(output_folder, new_filename)

    try:
        parse_log_file(file_path, output_path)
        print(f"原文件:{filename}已分析并保存至{output_path}")
    except Exception as e:
        print(f"处理文件 {filename} 时出错: {e}")


# Process every .log file in a folder, one thread per file
def process_logs_in_folder(log_folder, output_folder):
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    log_files = [os.path.join(log_folder, filename) for filename in os.listdir(log_folder) if filename.endswith('.log')]

    with ThreadPoolExecutor(max_workers=os.cpu_count() or 4) as executor:
        executor.map(process_log_file, log_files, [output_folder] * len(log_files))


if __name__ == '__main__':
    log_folder = './AimLog'
    output_folder = './AnaResult'

    try:
        process_logs_in_folder(log_folder, output_folder)
    except Exception as e:
        print(f"处理日志文件夹时出错: {e}")
README.md (33 lines)
@@ -1,33 +0,0 @@
### Coding Approach

#### Problems Identified

1. Processing large files is too slow

   Solution: split the file into chunks and use multithreading to speed up processing.

2. Log formats differ between sources (a sketch of handling this follows the list)

   Solution: collect several common web log formats and adapt the parser to them.

   Additionally: collect logs exported from security appliances and adapt the analysis to them as well.

3. Doing all the processing in a single Python file leads to frequent errors

   Solution: split the work into separate modules, analyze each part independently, and aggregate the results at the end.
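For problem 2, the current AnalysisBase.py matches only the Nginx/Apache combined format. Below is a minimal sketch of how several formats could be tried in order; the second pattern (common log format without referrer/user agent) is an illustrative assumption, not a format the project already supports.

```python
import re

# The first pattern is the combined format already used in AnalysisBase.py;
# the second is only an illustrative placeholder for an extra format to adapt.
LOG_PATTERNS = [
    re.compile(
        r'(?P<ip>\d+\.\d+\.\d+\.\d+) - - \[(?P<timestamp>[^\]]+)\] '
        r'"(?P<request>[^"]+)" (?P<status>\d+) (?P<size>\d+) '
        r'"(?P<referrer>[^"]+)" "(?P<user_agent>[^"]+)"'
    ),
    re.compile(
        r'(?P<ip>\d+\.\d+\.\d+\.\d+) - - \[(?P<timestamp>[^\]]+)\] '
        r'"(?P<request>[^"]+)" (?P<status>\d+) (?P<size>\d+)'
    ),
]


def match_any_format(line):
    """Return the groupdict of the first pattern that matches, or None."""
    for pattern in LOG_PATTERNS:
        match = pattern.match(line)
        if match:
            return match.groupdict()
    return None
```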
#### Module Breakdown

- Main execution/reading module: reads the logs, separates the data, and coordinates calls to the other modules
- Performance module: splits the files and starts multiple threads
- Document generation module: records the IP/visit-count data first, then passes the file on for the downstream programs to fill in
- Preliminary analysis module: focuses on the UA and response codes; takes the separated [IP-UA-response code] data for a first pass, splits crawlers out of the data and labels the company they belong to, and also flags IPs that look like malicious scanners
- Secondary analysis module: focuses on the request paths; uses the [IP-path] data captured from the previous step for a second pass that marks whether a crawler is legitimate (did it ever request robots.txt?) and uses keywords in the path to decide whether an IP is an attacker and which attack types are involved (a minimal sketch follows this list)
- Final analysis module: combines everything that remains into a comprehensive analysis (details TBD)
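The secondary analysis module is still only a description; the following is a minimal sketch, assuming the [IP-path] pairs from the previous step arrive as an iterable of `(ip, path)` tuples. The function name and keyword signatures here are hypothetical.

```python
from collections import defaultdict

# Hypothetical keyword signatures; the real module would use a curated set.
ATTACK_SIGNATURES = {
    'SQL injection': ['union select', 'sleep(', ' or 1=1'],
    'path scanning': ['/.env', '/.git/', '/phpmyadmin', '/wp-login.php'],
}


def analyze_paths(ip_path_pairs):
    """Second pass: robots.txt checks and path-keyword attack detection."""
    report = defaultdict(lambda: {'requested_robots': False, 'attack_types': set()})
    for ip, path in ip_path_pairs:
        lowered = (path or '').lower()
        # A crawler that fetched robots.txt is more likely to be legitimate.
        if lowered.endswith('robots.txt'):
            report[ip]['requested_robots'] = True
        # Keyword hits mark the IP as a likely attacker of the given type.
        for attack_type, keywords in ATTACK_SIGNATURES.items():
            if any(keyword in lowered for keyword in keywords):
                report[ip]['attack_types'].add(attack_type)
    return report
```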
#### Table Layout

| IP | Attribute | Owner | Visits | Attack type | UA | Status code | Notes |
| :-----: | :----: | :----: | :----: | :----: | :----: | :----: | :----: |
| 127.0.0.1 | Crawler | Baidu | 10 | Content scraping | | | |
| 127.0.0.1 | Normal IP | Internet | 100 | | | | |
| 127.0.0.1 | Malicious attacker | Internet | 1000 | SQL injection, path scanning | | | |
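As an illustration of how the Attribute and Owner columns above might be filled in from a user agent string, here is a minimal sketch; the crawler signatures are a small assumed sample, not an exhaustive list.

```python
# Small assumed sample of crawler UA substrings and the companies behind them.
KNOWN_CRAWLERS = {
    'baiduspider': 'Baidu',
    'googlebot': 'Google',
    'bingbot': 'Microsoft',
}


def classify_user_agent(user_agent):
    """Return an (attribute, owner) pair for one row of the summary table."""
    lowered = (user_agent or '').lower()
    for signature, owner in KNOWN_CRAWLERS.items():
        if signature in lowered:
            return 'Crawler', owner
    return 'Normal IP', 'Internet'
```

Whether a row is finally marked as a malicious attacker would come from the path analysis sketched earlier rather than from the UA alone.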
### Additional Modules

#### AnalysisBase.py

Performs an initial pass over website logs to prepare them for manual analysis.
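For reference, a minimal way to drive it, using the folder names hard-coded in the script (it scans `./AimLog` for `*.log` files and writes one timestamped `.xlsx` workbook per file into `./AnaResult`):

```python
from AnalysisBase import process_logs_in_folder

# Analyze every .log file in ./AimLog and write the results to ./AnaResult.
process_logs_in_folder('./AimLog', './AnaResult')
```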
#### TBD

A filtering/analysis program for manually screening high-frequency IPs.