"""Proxy health-check module."""

import socket
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
from typing import Callable, List, Optional

import requests
from loguru import logger

from .models import ProxyInfo, ProxyStatus


class HealthChecker:
    """Checks whether proxies are reachable and can relay HTTP(S) traffic.

    A check consists of a TCP connect to the proxy, an HTTP connectivity test
    through the proxy, and a second TCP connect used as the reported latency.
    """

    # Built-in connectivity targets, tried after the configured URL.
    _DEFAULT_TEST_URLS = (
        'https://www.google.com/',
        'https://www.google.com/generate_204',  # dedicated connectivity endpoint
        'http://www.google.com/',
    )

    # Latency (ms) below which a working proxy is rated EXCELLENT.
    _EXCELLENT_LATENCY_MS = 200

    def __init__(self, config: dict):
        """Read the ``health_check`` section of *config* and set up the worker pool.

        Args:
            config: Application configuration. The optional ``health_check``
                mapping may define ``timeout``, ``connectivity_test_url``,
                ``connectivity_timeout``, ``max_failures`` and ``retry_delay``.
        """
        self.config = config
        self.logger = logger

        self.health_config = config.get('health_check', {})
        self.timeout = self.health_config.get('timeout', 5)
        self.connectivity_url = self.health_config.get('connectivity_test_url', 'https://www.google.com/')
        self.connectivity_timeout = self.health_config.get('connectivity_timeout', 10)
        # NOTE(review): max_failures / retry_delay are stored here but not read
        # anywhere in this class; presumably consumed by callers - confirm.
        self.max_failures = self.health_config.get('max_failures', 3)
        self.retry_delay = self.health_config.get('retry_delay', 600)

        self._check_lock = threading.Lock()
        self._executor = ThreadPoolExecutor(max_workers=10)

    def check_proxy(self, proxy: ProxyInfo) -> ProxyStatus:
        """Check a single proxy and update its status fields in place.

        On success ``latency_ms`` holds the measured TCP connect time,
        ``status`` is EXCELLENT (latency under 200 ms) or AVAILABLE, and
        ``consecutive_failures`` is reset to 0. On any failure
        ``consecutive_failures`` is incremented, ``latency_ms`` is set to 9999
        and ``status`` becomes UNAVAILABLE.

        Args:
            proxy: The proxy to test; mutated by this call.

        Returns:
            The proxy's resulting status.
        """
        with self._check_lock:
            proxy.status = ProxyStatus.CHECKING
            proxy.last_check = datetime.now()

        try:
            # Step 1: quick TCP connect to rule out dead endpoints cheaply.
            tcp_elapsed = self._tcp_latency_ms(proxy)
            if tcp_elapsed is None:
                raise ConnectionError(f"无法连接到 {proxy.get_address()}")
            proxy.latency_ms = tcp_elapsed

            # Step 2: make sure traffic can actually be relayed.
            if not self._test_connectivity(proxy):
                raise ConnectionError("Google连通性测试失败")

            # Step 3: re-measure latency now that the proxy is known to work.
            # If this second probe fails, keep the first measurement instead
            # of leaving the proxy stuck in the CHECKING state.
            real_elapsed = self._tcp_latency_ms(proxy)
            if real_elapsed is not None:
                proxy.latency_ms = real_elapsed

            if proxy.latency_ms < self._EXCELLENT_LATENCY_MS:
                proxy.status = ProxyStatus.EXCELLENT
            else:
                proxy.status = ProxyStatus.AVAILABLE

            proxy.consecutive_failures = 0

        except Exception as e:
            proxy.consecutive_failures += 1
            proxy.latency_ms = 9999
            proxy.status = ProxyStatus.UNAVAILABLE
            self.logger.debug(f"代理 {proxy.get_address()} 检测失败: {str(e)}")

        return proxy.status

    def _tcp_latency_ms(self, proxy: ProxyInfo) -> Optional[float]:
        """Return the TCP connect time to *proxy* in milliseconds, or None on failure.

        The socket is managed by a ``with`` block so it is closed even when
        ``connect_ex`` raises; such exceptions propagate to the caller.
        """
        # perf_counter is monotonic, so the interval is immune to clock changes.
        started = time.perf_counter()
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(self.timeout)
            result = sock.connect_ex((proxy.ip_address, proxy.port))
            elapsed_ms = (time.perf_counter() - started) * 1000
        return elapsed_ms if result == 0 else None

    def _build_proxies(self, proxy: ProxyInfo) -> Optional[dict]:
        """Build the ``proxies`` mapping for ``requests``, or None if unsupported.

        HTTP and HTTPS proxies are both addressed with an ``http://`` URL.
        SOCKS proxies keep their own scheme so a SOCKS4 proxy is not spoken to
        with the SOCKS5 handshake.
        """
        protocol = proxy.protocol.value
        if protocol in ('http', 'https'):
            scheme = 'http'
        elif protocol in ('socks4', 'socks5'):
            scheme = protocol
        else:
            return None

        auth = proxy.get_auth_string()
        address = proxy.get_address()
        proxy_url = f"{scheme}://{auth}@{address}" if auth else f"{scheme}://{address}"
        return {'http': proxy_url, 'https': proxy_url}

    def _test_connectivity(self, proxy: ProxyInfo) -> bool:
        """Return True if an HTTP request through *proxy* succeeds.

        The configured ``connectivity_test_url`` is tried first, followed by
        the built-in Google URLs. For each URL a lightweight HEAD request is
        attempted, then a GET if the HEAD did not succeed. Any response with a
        status code below 400 counts as success.

        Args:
            proxy: The proxy to route the test requests through.

        Returns:
            True on the first successful response; False if every attempt
            fails or the proxy's protocol is not supported.
        """
        try:
            proxies_dict = self._build_proxies(proxy)
        except Exception as e:
            self.logger.debug(f"构建代理配置失败: {str(e)}")
            return False

        if not proxies_dict:
            # An empty mapping would make requests bypass the proxy entirely
            # and report a false positive, so refuse to test instead.
            self.logger.debug(f"代理 {proxy.get_address()} 协议不受支持,跳过连通性测试")
            return False

        # Configured URL first, then the built-in fallbacks, without duplicates.
        candidates = [self.connectivity_url, *self._DEFAULT_TEST_URLS]
        test_urls = list(dict.fromkeys(url for url in candidates if url))

        for test_url in test_urls:
            # HEAD first (cheap, like ``curl -I``); GET as a fallback.
            for label, method_name in (('HEAD', 'head'), ('GET', 'get')):
                try:
                    response = getattr(requests, method_name)(
                        test_url,
                        proxies=proxies_dict,
                        timeout=self.connectivity_timeout,
                        verify=False,  # TLS verification is disabled for the probe
                        allow_redirects=True
                    )
                except Exception as e:
                    self.logger.debug(f"{label}测试失败: {str(e)}")
                    continue

                if response.status_code < 400:
                    self.logger.debug(
                        f"代理 {proxy.get_address()} {label}测试成功: {test_url} ({response.status_code})"
                    )
                    return True

        # Every URL / method combination failed.
        self.logger.debug(f"代理 {proxy.get_address()} 所有连通性测试均失败")
        return False

    def batch_check(self, proxies: List[ProxyInfo], callback: Optional[Callable] = None) -> List[ProxyInfo]:
        """Check many proxies concurrently on the internal thread pool.

        Proxies whose ``last_check`` is less than 60 seconds old are skipped
        and are not included in the result.

        Args:
            proxies: Proxies to check; each checked proxy is updated in place.
            callback: Optional callable invoked with each proxy as soon as its
                check completes.

        Returns:
            The proxies that were checked, in completion order.
        """
        self.logger.info(f"开始批量检测 {len(proxies)} 个代理")

        futures = {}
        for proxy in proxies:
            # Skip proxies checked within the last 60 seconds.
            if proxy.last_check and (datetime.now() - proxy.last_check) < timedelta(seconds=60):
                continue
            futures[self._executor.submit(self.check_proxy, proxy)] = proxy

        checked_proxies = []
        for future in as_completed(futures):
            proxy = futures[future]
            try:
                future.result()
                checked_proxies.append(proxy)
                if callback:
                    callback(proxy)
            except Exception as e:
                self.logger.error(f"检测代理 {proxy.get_address()} 时出错: {str(e)}")

        self.logger.info(f"完成检测,共检测 {len(checked_proxies)} 个代理")
        return checked_proxies

    def shutdown(self):
        """Shut down the worker pool without waiting for running checks."""
        self._executor.shutdown(wait=False)