Poc_Scanner/base_tool.py

153 lines
5.5 KiB
Python
Raw Permalink Normal View History

2024-10-09 15:15:50 +08:00
import requests
import yaml
import os
import warnings
import time
import urllib3
import logging
from urllib.parse import urlparse, urljoin
from colorama import init, Fore
init()
os.system("")
warnings.filterwarnings("ignore")
# 禁用不安全请求警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# 配置日志记录
logging.basicConfig(level=logging.INFO, format=Fore.GREEN + '%(asctime)s' + Fore.RESET + ' - %(message)s', datefmt='%H:%M')
DEFAULT_HEADERS = {
'Accept': '*/*',
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/603.3.8 (KHTML, like Gecko) Version/10.1.2 Safari/603.3.8',
'Referer': 'https://www.baidu.com',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
}
def check_url_status(url):
try:
response = requests.get(url, timeout=5, verify=False)
# return response.status_code == 200
return response.status_code in (200, 301, 302, 307)
except requests.RequestException as e:
# logging.error(f"Error checking URL status: {e}")
return False
def load_yaml_file(file_path):
try:
with open(file_path, 'r', encoding='utf-8') as stream:
return yaml.safe_load(stream)
except (yaml.YAMLError, FileNotFoundError) as e:
logging.error(f"无法解析 {file_path}: {e}")
return None
def send_request_and_validate(request_details, response_details, target_url):
method = request_details.get('method', 'GET').lower()
path = request_details.get('path', '/default_path')
path_2 = response_details.get('path', '')
url = urljoin(target_url, path)
if path_2:
url_2 = urljoin(target_url, path_2)
# url_2 = urljoin(target_url, path_2) if path_2 else url
logging.info(f"已发送攻击请求: {url}")
headers = {key: value for key, value in {**DEFAULT_HEADERS, **request_details.get('headers', {})}.items() if value}
data = request_details.get('body-raw', '')
logging.info(f"请求方法: {method.upper()}")
logging.info(f"请求头: {headers}")
try:
# 记录请求开始时间
start_time = time.time()
response = requests.request(method, url, headers=headers, data=data, verify=False)
if path_2:
response = requests.request("GET", url_2, headers=DEFAULT_HEADERS, verify=False)
# 记录请求结束时间并计算响应时间
response_time = round(time.time() - start_time, 2)
logging.info(f"响应时间: {response_time}")
expected_status_code = response_details.get('status-code', 200)
logging.info(f"攻击请求响应码: {expected_status_code} ; 实际响应码: {response.status_code}")
assert response.status_code == expected_status_code, f"验证获得失败响应码: {response.status_code}"
result_1 = validate_response_body(response.text, response_details.get('body', ''))
result = result_1
except AssertionError as e:
logging.error(f"错误: {e}")
result = "不存在漏洞"
except requests.RequestException as e:
logging.error(f"HTTP请求错误: {e}")
result = "请求失败"
except Exception as e:
logging.error(f"请求过程中发送了错误: {e}")
result = "请求失败"
return result, response.status_code, url, response_time
def validate_response_body(response_body, expected_body):
if expected_body and expected_body not in response_body:
logging.warning("响应体匹配失败。")
return "不存在漏洞"
logging.info("响应体成功匹配!")
# print("\n\n" + response_body + "\n\n")
return "存在漏洞"
def validate_main(target_url):
results = []
poc_paths = load_yaml_file_paths('./MatchedPOC.txt')
for poc_path in poc_paths:
# print("--------------------------------------------------")
yaml_content = load_yaml_file(poc_path)
if yaml_content is None:
continue
name = print_extracted_info(yaml_content)
result, status_code, url, res_time = send_request_and_validate(yaml_content.get('requests', {}), yaml_content.get('response', {}), target_url)
results.append((name, result, status_code, url, res_time))
# 提取描述部分
description = yaml_content.get("description", "Description not found")
print("--------------------------------------------------")
return results, description
def load_yaml_file_paths(file_path):
try:
with open(file_path, 'r') as file:
return file.read().splitlines()
except FileNotFoundError as e:
logging.error(f"未找到对应 POC 路径文件: {file_path}: {e}")
return []
def print_extracted_info(yaml_content):
keyword = yaml_content.get('keyword', '')
name = yaml_content.get('name', '')
description = yaml_content.get('description', '')
impact = yaml_content.get('impact', '')
logging.info(f"关键词: {keyword}")
logging.info(f"漏洞名称: {name}")
logging.info(f"描述: {description}")
logging.info(f"影响: {impact}")
return name
def remove_url_suffix(url):
parsed_url = urlparse(url)
return f"{parsed_url.scheme}://{parsed_url.netloc}"
def poc_scan(target_url):
status = check_url_status(target_url)
if not status:
logging.error(Fore.RED +"目标无法访问,请检查目标地址是否正确!" + Fore.RESET)
return
new_url = remove_url_suffix(target_url)
validate_main(new_url)
if __name__ == "__main__":
target_url = input("请输入待测目标https://example.com\n")
poc_scan(target_url)