193 lines
7.7 KiB
Python
193 lines
7.7 KiB
Python
"""代理健康检测模块"""
|
||
import socket
|
||
import requests
|
||
import threading
|
||
import time
|
||
from typing import List, Callable, Optional
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
from datetime import datetime, timedelta
|
||
from loguru import logger
|
||
from .models import ProxyInfo, ProxyStatus
|
||
|
||
|
||
class HealthChecker:
    """Proxy health checker.

    Each proxy is probed with a raw TCP connect, then an HTTP(S) request sent
    through the proxy to a connectivity test URL, then a second TCP connect
    whose timing is reported as the latency. Results (status, latency_ms,
    last_check, consecutive_failures) are written back onto the ProxyInfo.
    """

    # Latency in milliseconds below which a working proxy is graded EXCELLENT.
    EXCELLENT_LATENCY_MS = 200
    # Latency value recorded on a proxy whose check failed.
    FAILED_LATENCY_MS = 9999
    # URLs tried after the configured connectivity_test_url (duplicates skipped).
    _FALLBACK_TEST_URLS = (
        'https://www.google.com/',
        'https://www.google.com/generate_204',  # endpoint intended for connectivity checks
        'http://www.google.com/',
    )

    def __init__(self, config: dict):
        """Read settings from ``config['health_check']``; missing keys use defaults.

        Keys (all optional):
            timeout: TCP connect timeout in seconds (default 5).
            connectivity_test_url: first URL requested through the proxy
                (default 'https://www.google.com/').
            connectivity_timeout: per-request timeout in seconds (default 10).
            max_failures: default 3.
            retry_delay: default 600.
            max_workers: size of the thread pool used by batch_check (default 10).
            recheck_interval: seconds during which batch_check skips a proxy
                that was already checked (default 60).
        """
        self.config = config
        self.logger = logger
        self.health_config = config.get('health_check', {})

        self.timeout = self.health_config.get('timeout', 5)
        self.connectivity_url = self.health_config.get('connectivity_test_url', 'https://www.google.com/')
        self.connectivity_timeout = self.health_config.get('connectivity_timeout', 10)
        # max_failures and retry_delay are not used within this class; they are
        # kept as public attributes (presumably read elsewhere — verify).
        self.max_failures = self.health_config.get('max_failures', 3)
        self.retry_delay = self.health_config.get('retry_delay', 600)
        self.recheck_interval = self.health_config.get('recheck_interval', 60)

        # Serialises check_proxy's writes to ProxyInfo fields.
        self._check_lock = threading.Lock()
        self._executor = ThreadPoolExecutor(max_workers=self.health_config.get('max_workers', 10))

    def check_proxy(self, proxy: ProxyInfo) -> ProxyStatus:
        """Check a single proxy and record the outcome on it.

        Steps:
            1. TCP connect to the proxy (fast reachability pre-check).
            2. Request the connectivity test URL(s) through the proxy.
            3. TCP connect again; if it succeeds its timing replaces the
               step-1 timing as the reported latency.

        On success the status becomes EXCELLENT (latency below
        ``EXCELLENT_LATENCY_MS``) or AVAILABLE, and ``consecutive_failures``
        is reset to 0. On any exception the proxy is marked UNAVAILABLE, its
        latency is set to ``FAILED_LATENCY_MS``, and ``consecutive_failures``
        is incremented.

        Args:
            proxy: the proxy to check; mutated in place.

        Returns:
            The status assigned by this check.
        """
        # Only the bookkeeping happens under the lock. The network probes below
        # can take many seconds; holding the lock across them would serialise
        # every check and defeat the thread pool used by batch_check.
        with self._check_lock:
            proxy.status = ProxyStatus.CHECKING
            proxy.last_check = datetime.now()

        try:
            # Step 1: fast TCP reachability check.
            latency_ms = self._measure_tcp_latency(proxy)
            if latency_ms is None:
                raise ConnectionError(f"无法连接到 {proxy.get_address()}")

            # Step 2: end-to-end connectivity through the proxy.
            if not self._test_connectivity(proxy):
                raise ConnectionError("Google连通性测试失败")

            # Step 3: re-measure latency now that the proxy is known to work.
            # If this connect fails, keep the step-1 figure rather than grading
            # on the timing of a failed attempt.
            second_ms = self._measure_tcp_latency(proxy)
            if second_ms is not None:
                latency_ms = second_ms
        except Exception as e:
            with self._check_lock:
                proxy.consecutive_failures += 1
                proxy.latency_ms = self.FAILED_LATENCY_MS
                proxy.status = ProxyStatus.UNAVAILABLE
            self.logger.debug(f"代理 {proxy.get_address()} 检测失败: {str(e)}")
            return ProxyStatus.UNAVAILABLE

        # Grade on the same latency value that is recorded on the proxy.
        if latency_ms < self.EXCELLENT_LATENCY_MS:
            status = ProxyStatus.EXCELLENT
        else:
            status = ProxyStatus.AVAILABLE

        with self._check_lock:
            proxy.latency_ms = latency_ms
            proxy.status = status
            proxy.consecutive_failures = 0
        return status

    def _measure_tcp_latency(self, proxy: ProxyInfo) -> Optional[float]:
        """Time one IPv4 TCP connect to the proxy's ip_address/port.

        Returns:
            Connect time in milliseconds, or None if ``connect_ex`` reported
            an error code (refused, unreachable, timed out, ...).

        Raises:
            Whatever ``socket``/``connect_ex`` raise for problems that are not
            C-level connect errors (e.g. ``socket.gaierror`` for an
            unresolvable host); check_proxy treats these as a failed check.
        """
        # ``with`` closes the socket even when connect_ex raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(self.timeout)
            started = time.perf_counter()  # monotonic, unlike time.time()
            result = sock.connect_ex((proxy.ip_address, proxy.port))
            elapsed_ms = (time.perf_counter() - started) * 1000
        return elapsed_ms if result == 0 else None

    def _build_proxies(self, proxy: ProxyInfo) -> Optional[dict]:
        """Build the ``proxies`` mapping passed to requests for this proxy.

        Returns:
            A dict with 'http' and 'https' keys pointing at the proxy URL, or
            None when the proxy's protocol is not one this checker supports.
        """
        protocol = proxy.protocol.value
        if protocol in ('http', 'https'):
            # The proxy itself is addressed with http:// for both target schemes.
            scheme = 'http'
        elif protocol in ('socks4', 'socks5'):
            # Use the matching SOCKS scheme; a SOCKS4-only server does not speak
            # the socks5 handshake. (requests needs its SOCKS extra installed.)
            scheme = protocol
        else:
            return None

        auth = proxy.get_auth_string()
        address = proxy.get_address()
        proxy_url = f"{scheme}://{auth}@{address}" if auth else f"{scheme}://{address}"
        return {'http': proxy_url, 'https': proxy_url}

    def _test_connectivity(self, proxy: ProxyInfo) -> bool:
        """Check that test URLs are reachable through the proxy.

        URLs are tried in order: the configured ``connectivity_test_url``,
        then ``_FALLBACK_TEST_URLS`` (duplicates skipped). For each URL a HEAD
        request (like ``curl -I``) is tried first. A GET follows if the HEAD
        raised or returned status >= 400. Redirects are followed, and TLS
        certificates are not verified (``verify=False``).

        Returns:
            True on the first response with status < 400. False if every
            attempt failed or the proxy's protocol is unsupported. An
            unsupported protocol must not fall through to an empty proxies
            dict, which would send the request directly and test this host's
            connectivity instead of the proxy's.
        """
        proxies_dict = self._build_proxies(proxy)
        if proxies_dict is None:
            self.logger.debug(f"代理 {proxy.get_address()} 协议不受支持: {proxy.protocol.value}")
            return False

        # dict.fromkeys keeps first-seen order while dropping duplicates, so the
        # default configured URL is not requested twice.
        test_urls = list(dict.fromkeys(
            url for url in (self.connectivity_url, *self._FALLBACK_TEST_URLS) if url
        ))

        for test_url in test_urls:
            for send, label in ((requests.head, 'HEAD'), (requests.get, 'GET')):
                try:
                    response = send(
                        test_url,
                        proxies=proxies_dict,
                        timeout=self.connectivity_timeout,
                        verify=False,
                        allow_redirects=True,
                    )
                except Exception as e:
                    self.logger.debug(f"{label}测试失败: {str(e)}")
                    continue
                if response.status_code < 400:
                    self.logger.debug(
                        f"代理 {proxy.get_address()} {label}测试成功: {test_url} ({response.status_code})"
                    )
                    return True

        # Every URL/method combination failed.
        self.logger.debug(f"代理 {proxy.get_address()} 所有连通性测试均失败")
        return False

    def batch_check(self, proxies: List[ProxyInfo], callback: Optional[Callable] = None) -> List[ProxyInfo]:
        """Check many proxies concurrently on the shared thread pool.

        Proxies whose ``last_check`` lies within the last ``recheck_interval``
        seconds (default 60) are skipped and not included in the result.

        Args:
            proxies: proxies to check; each is mutated by check_proxy.
            callback: optional callable invoked with each proxy after its check
                completes. It is called on this thread, in completion order.

        Returns:
            The proxies that were checked, in completion order.
        """
        self.logger.info(f"开始批量检测 {len(proxies)} 个代理")

        # Skip proxies that were checked recently (simple time-based cache).
        cutoff = datetime.now() - timedelta(seconds=self.recheck_interval)
        futures = {
            self._executor.submit(self.check_proxy, proxy): proxy
            for proxy in proxies
            if not (proxy.last_check and proxy.last_check > cutoff)
        }

        checked_proxies = []
        for future in as_completed(futures):
            proxy = futures[future]
            try:
                future.result()
                checked_proxies.append(proxy)
                if callback:
                    callback(proxy)
            except Exception as e:
                self.logger.error(f"检测代理 {proxy.get_address()} 时出错: {str(e)}")

        self.logger.info(f"完成检测,共检测 {len(checked_proxies)} 个代理")
        return checked_proxies

    def shutdown(self):
        """Shut down the thread pool without waiting for running checks."""
        self._executor.shutdown(wait=False)