"""代理管理器 - 协调数据源、检测器和转发器""" import threading import time import json import os from typing import List, Optional from datetime import datetime from loguru import logger from .models import ProxyInfo, ProxyStatus from .data_source import DataSource from .health_checker import HealthChecker from .simple_proxy_forwarder import SimpleProxyForwarder class ProxyManager: """代理管理器""" def __init__(self, config: dict, connection_log_callback=None): self.config = config self.logger = logger self.data_source = DataSource(config) self.health_checker = HealthChecker(config) self.forwarder = SimpleProxyForwarder(config, connection_log_callback) self.proxies: List[ProxyInfo] = [] self.active_proxy: Optional[ProxyInfo] = None self.available_proxies: List[ProxyInfo] = [] self._lock = threading.RLock() self._running = False self._refresh_thread = None def load_proxies(self) -> int: """加载代理列表""" with self._lock: all_proxies = [] # 从网页获取(保存到 proxy.json) if self.config.get('proxy_sources', {}).get('auto_fetch', {}).get('enabled', True): url = self.config['proxy_sources']['auto_fetch'].get('url') output_file = self.config['proxy_sources']['auto_fetch'].get('output_file', 'proxy.json') pages = self.config['proxy_sources']['auto_fetch'].get('pages', 1) # 获取页数配置 if url: web_proxies = self.data_source.fetch_from_web(url, output_file, pages) all_proxies.extend(web_proxies) # 从本地文件加载(用户手动添加的 local.json) if self.config.get('proxy_sources', {}).get('local_file', {}).get('enabled', True): filepath = self.config['proxy_sources']['local_file'].get('path', 'local.json') file_proxies = self.data_source.load_from_file(filepath) all_proxies.extend(file_proxies) # 去重(基于IP+端口) seen = set() unique_proxies = [] for proxy in all_proxies: key = f"{proxy.ip_address}:{proxy.port}" if key not in seen: seen.add(key) unique_proxies.append(proxy) self.proxies = unique_proxies self.logger.info(f"加载了 {len(self.proxies)} 个唯一代理") return len(self.proxies) def set_proxies(self, proxies: List[ProxyInfo]): """设置代理列表(用于GUI选择单个代理)""" with self._lock: self.proxies = proxies self.logger.info(f"设置代理列表: {len(proxies)} 个代理") def check_all_proxies(self) -> int: """检测所有代理""" with self._lock: available_count = 0 def on_check_complete(proxy: ProxyInfo): nonlocal available_count if proxy.status in [ProxyStatus.AVAILABLE, ProxyStatus.EXCELLENT]: available_count += 1 self.health_checker.batch_check(self.proxies, callback=on_check_complete) # 更新可用代理列表 self.available_proxies = [ p for p in self.proxies if p.status in [ProxyStatus.AVAILABLE, ProxyStatus.EXCELLENT] ] # 同步延迟到本地文件 self._sync_latency_to_file() self.logger.info(f"检测到 {available_count} 个可用代理") return available_count def _sync_latency_to_file(self): """将检测结果(延迟)同步到本地文件 同时同步到: 1. proxy.json(网页抓取的代理) 2. local.json(用户手动添加的代理) """ try: # 同步到 proxy.json self._sync_to_single_file( self.config.get('proxy_sources', {}).get('auto_fetch', {}).get('output_file', 'proxy.json') ) # 同步到 local.json local_file_path = self.config.get('proxy_sources', {}).get('local_file', {}).get('path', 'local.json') if local_file_path and local_file_path != 'proxy.json': self._sync_to_single_file(local_file_path) except Exception as e: self.logger.error(f"同步延迟到文件失败: {str(e)}") def _sync_to_single_file(self, filepath: str): """同步延迟到单个文件""" try: if not os.path.exists(filepath): return # 读取现有文件 with open(filepath, 'r', encoding='utf-8') as f: data = json.load(f) if not isinstance(data, list): return # 创建索引 existing_proxies = {} for item in data: key = f"{item['ip_address']}:{item['port']}" existing_proxies[key] = item # 更新延迟信息 updated_count = 0 for proxy in self.proxies: key = proxy.get_address() if key in existing_proxies: # 更新延迟和状态 existing_proxies[key]['latency'] = f"{proxy.latency_ms:.0f}ms" if proxy.latency_ms < 9999 else "timeout" existing_proxies[key]['status'] = proxy.status.value existing_proxies[key]['last_checked'] = datetime.now().isoformat() updated_count += 1 # 保存更新后的数据 if updated_count > 0: updated_data = list(existing_proxies.values()) with open(filepath, 'w', encoding='utf-8') as f: json.dump(updated_data, f, indent=2, ensure_ascii=False) self.logger.info(f"已同步 {updated_count} 个代理的延迟信息到 {filepath}") except Exception as e: self.logger.debug(f"同步到 {filepath} 失败: {str(e)}") def start_service(self) -> bool: """启动代理服务""" if self._running: self.logger.warning("服务已在运行") return True try: # 加载代理 self.load_proxies() # 检测代理 self.check_all_proxies() if not self.available_proxies: self.logger.error("没有可用的代理,无法启动服务") return False # 启动转发器 self.forwarder.start(self.available_proxies) self._running = True self.active_proxy = self.forwarder.current_proxy self.logger.info("代理服务已启动") return True except Exception as e: self.logger.error(f"启动服务失败: {str(e)}") return False def stop_service(self): """停止代理服务""" if not self._running: return self._running = False self.forwarder.stop() self.health_checker.shutdown() self.logger.info("代理服务已停止") def switch_to_next_proxy(self) -> bool: """切换到下一个代理""" if not self._running: self.logger.warning("服务未运行") return False if not self.available_proxies: self.logger.warning("没有可用代理") return False # 找到当前代理的索引 current_index = -1 if self.active_proxy and self.active_proxy in self.available_proxies: current_index = self.available_proxies.index(self.active_proxy) # 选择下一个代理 next_index = (current_index + 1) % len(self.available_proxies) next_proxy = self.available_proxies[next_index] # 切换代理 success = self.forwarder.switch_proxy(next_proxy) if success: self.active_proxy = next_proxy self.logger.info(f"成功切换到下一个代理: {next_proxy.get_address()}") return success def get_active_proxy(self) -> Optional[ProxyInfo]: """获取当前活跃代理""" with self._lock: return self.active_proxy def get_statistics(self) -> dict: """获取统计信息""" with self._lock: stats = { 'total': len(self.proxies), 'available': len([p for p in self.proxies if p.status == ProxyStatus.AVAILABLE]), 'excellent': len([p for p in self.proxies if p.status == ProxyStatus.EXCELLENT]), 'unavailable': len([p for p in self.proxies if p.status == ProxyStatus.UNAVAILABLE]), 'active': self.active_proxy.get_address() if self.active_proxy else None } return stats def start_auto_refresh(self, interval_minutes: int = 10): """启动自动刷新""" if self._running: return self._running = True def refresh_loop(): while self._running: time.sleep(interval_minutes * 60) if self._running: self.logger.info("执行自动刷新...") self.load_proxies() self.check_all_proxies() self._refresh_thread = threading.Thread(target=refresh_loop, daemon=True) self._refresh_thread.start() def stop_auto_refresh(self): """停止自动刷新""" self._running = False if self._refresh_thread: self._refresh_thread.join(timeout=5)