WhereAmI/core/simple_proxy_forwarder.py
2026-06-15 00:49:26 +08:00

386 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""简化代理转发器 - 三步工作流程"""
import socket
import threading
import socks
import time
from typing import List, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
from loguru import logger
from .models import ProxyInfo, ProxyStatus
class SimpleProxyForwarder:
"""简化代理转发器
工作流程:
1. 验证代理 (_validate_proxies)
2. 获取可用代理 (_test_proxy)
3. 转发到8745端口 (_run_forwarder)
"""
def __init__(self, config: dict, connection_log_callback=None):
self.config = config
self.logger = logger
output_config = config.get('output', {})
self.host = output_config.get('host', '127.0.0.1')
self.port = output_config.get('port', 8745)
self.validated_proxies: List[ProxyInfo] = []
self.current_proxy: Optional[ProxyInfo] = None
self.server_socket: Optional[socket.socket] = None
self._running = False
self._server_thread: Optional[threading.Thread] = None
self._connections = []
self._lock = threading.Lock()
self._executor = ThreadPoolExecutor(max_workers=10)
# 连接日志回调函数
self.connection_log_callback = connection_log_callback
def validate_proxies(self, proxies: List[ProxyInfo]) -> List[ProxyInfo]:
"""步骤1: 验证所有代理的连通性"""
self.logger.info(f"开始验证 {len(proxies)} 个代理")
self.validated_proxies = []
futures = {}
for proxy in proxies:
future = self._executor.submit(self._validate_single_proxy, proxy)
futures[future] = proxy
for future in as_completed(futures):
proxy = futures[future]
try:
is_valid = future.result()
if is_valid:
self.validated_proxies.append(proxy)
proxy.status = ProxyStatus.AVAILABLE
except Exception as e:
self.logger.debug(f"验证代理 {proxy.get_address()} 失败: {str(e)}")
self.logger.info(f"验证完成,{len(self.validated_proxies)} 个代理可用")
return self.validated_proxies
def _validate_single_proxy(self, proxy: ProxyInfo) -> bool:
"""验证单个代理"""
try:
# TCP连接测试
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
result = sock.connect_ex((proxy.ip_address, proxy.port))
sock.close()
if result != 0:
return False
# SOCKS握手验证针对SOCKS代理
if proxy.protocol.value in ['socks4', 'socks5']:
return self._test_socks_handshake(proxy)
return True
except Exception:
return False
def _test_socks_handshake(self, proxy: ProxyInfo) -> bool:
"""测试SOCKS握手"""
try:
proxy_type = socks.SOCKS5 if proxy.protocol.value == 'socks5' else socks.SOCKS4
test_sock = socks.socksocket()
test_sock.set_proxy(
proxy_type,
proxy.ip_address,
proxy.port,
username=proxy.username if proxy.username != "no need" else None,
password=proxy.password if proxy.password != "no need" else None
)
test_sock.settimeout(5)
# 尝试连接到测试服务器
test_sock.connect(('www.google.com', 80))
test_sock.close()
return True
except Exception:
return False
def get_available_proxy(self) -> Optional[ProxyInfo]:
"""步骤2: 获取可用的代理"""
if not self.validated_proxies:
return None
# 选择第一个可用代理(可以扩展为负载均衡策略)
return self.validated_proxies[0]
def start(self, proxies: List[ProxyInfo]):
"""启动转发器"""
if self._running:
self.logger.warning("转发器已在运行")
return
# 步骤1: 验证代理
self.validate_proxies(proxies)
if not self.validated_proxies:
self.logger.error("没有可用的代理")
return
# 步骤2: 获取代理
self.current_proxy = self.get_available_proxy()
if not self.current_proxy:
self.logger.error("无法获取可用代理")
return
self.logger.info(f"使用代理: {self.current_proxy.get_address()} ({self.current_proxy.protocol.value})")
# 步骤3: 启动转发器
self._run_forwarder()
def _run_forwarder(self):
"""步骤3: 运行SOCKS5转发服务器"""
try:
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(100)
self.server_socket.settimeout(1.0)
self._running = True
self._server_thread = threading.Thread(target=self._accept_loop, daemon=True)
self._server_thread.start()
self.logger.info(f"SOCKS5转发器已启动: {self.host}:{self.port}")
except Exception as e:
self.logger.error(f"启动转发器失败: {str(e)}")
raise
def _accept_loop(self):
"""接受客户端连接循环"""
while self._running:
try:
client_socket, client_address = self.server_socket.accept()
# 为新连接创建处理线程
handler = ForwarderConnectionHandler(
client_socket,
client_address,
self.current_proxy,
self.logger,
self.connection_log_callback # 传递日志回调
)
handler.start()
with self._lock:
self._connections.append(handler)
except socket.timeout:
continue
except Exception as e:
if self._running:
self.logger.error(f"接受连接失败: {str(e)}")
break
def stop(self):
"""停止转发器"""
self._running = False
# 关闭所有连接
with self._lock:
for conn in self._connections:
try:
conn.running = False
conn.join(timeout=2)
except:
pass
self._connections.clear()
if self.server_socket:
try:
self.server_socket.close()
except:
pass
if self._server_thread:
self._server_thread.join(timeout=5)
self._executor.shutdown(wait=False)
self.logger.info("转发器已停止")
def switch_proxy(self, proxy: ProxyInfo) -> bool:
"""切换到新代理"""
if proxy not in self.validated_proxies:
self.logger.warning(f"代理 {proxy.get_address()} 未通过验证")
return False
with self._lock:
self.current_proxy = proxy
self.logger.info(f"切换到代理: {proxy.get_address()}")
return True
class ForwarderConnectionHandler(threading.Thread):
"""转发器连接处理器"""
def __init__(self, client_socket: socket.socket, client_address, upstream_proxy: ProxyInfo, logger, connection_log_callback=None):
super().__init__(daemon=True)
self.client_socket = client_socket
self.client_address = client_address
self.upstream_proxy = upstream_proxy
self.logger = logger
self.running = True
self.connection_log_callback = connection_log_callback
def run(self):
"""处理客户端连接"""
try:
# SOCKS5握手
if not self._socks5_handshake():
self.logger.debug(f"SOCKS5握手失败: {self.client_address}")
return
# 建立到目标服务器的连接
target_socket = self._connect_to_target()
if not target_socket:
return
# 记录连接日志
if self.connection_log_callback:
local_addr = f"{self.client_address[0]}:{self.client_address[1]}"
remote_addr = f"{self.upstream_proxy.ip_address}:{self.upstream_proxy.port}"
log_message = f"{local_addr} -> {remote_addr}"
self.connection_log_callback(log_message)
# 双向转发数据
self._forward_data(target_socket)
except Exception as e:
self.logger.debug(f"处理连接时出错: {str(e)}")
finally:
try:
self.client_socket.close()
except:
pass
self.running = False
def _socks5_handshake(self) -> bool:
"""执行SOCKS5握手"""
try:
version = self.client_socket.recv(1)
if version != b'\x05':
return False
nmethods = ord(self.client_socket.recv(1))
methods = self.client_socket.recv(nmethods)
self.client_socket.send(b'\x05\x00')
version = self.client_socket.recv(1)
if version != b'\x05':
return False
cmd = self.client_socket.recv(1)
if cmd != b'\x01':
self.client_socket.send(b'\x05\x07\x00\x01\x00\x00\x00\x00\x00\x00')
return False
self.client_socket.recv(1)
atype = self.client_socket.recv(1)
if atype == b'\x01':
dest_addr = self.client_socket.recv(4)
dest_port = self.client_socket.recv(2)
elif atype == b'\x03':
addr_len_byte = self.client_socket.recv(1)
addr_len = addr_len_byte[0] if isinstance(addr_len_byte, bytes) else ord(addr_len_byte)
dest_addr = self.client_socket.recv(addr_len)
dest_port = self.client_socket.recv(2)
elif atype == b'\x04':
dest_addr = self.client_socket.recv(16)
dest_port = self.client_socket.recv(2)
else:
return False
self.client_socket.send(b'\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00')
self.dest_addr = dest_addr
self.dest_port = dest_port
return True
except Exception as e:
self.logger.debug(f"SOCKS5握手异常: {str(e)}")
return False
def _connect_to_target(self):
"""连接到目标服务器"""
try:
if len(self.dest_addr) == 4:
dest_ip = '.'.join(str(b) for b in self.dest_addr)
else:
dest_ip = self.dest_addr.decode('utf-8', errors='ignore')
# Python 3中bytes索引直接返回int不需要ord()
if isinstance(self.dest_port, bytes):
dest_port = (self.dest_port[0] << 8) + self.dest_port[1]
else:
dest_port = (ord(self.dest_port[0]) << 8) + ord(self.dest_port[1])
proxy_type_map = {
'socks4': socks.SOCKS4,
'socks5': socks.SOCKS5,
'http': socks.HTTP,
'https': socks.HTTP
}
proxy_type = proxy_type_map.get(self.upstream_proxy.protocol.value, socks.SOCKS5)
target_socket = socks.socksocket()
target_socket.set_proxy(
proxy_type,
self.upstream_proxy.ip_address,
self.upstream_proxy.port,
username=self.upstream_proxy.username if self.upstream_proxy.username != "no need" else None,
password=self.upstream_proxy.password if self.upstream_proxy.password != "no need" else None
)
target_socket.settimeout(10)
target_socket.connect((dest_ip, dest_port))
target_socket.settimeout(None)
return target_socket
except Exception as e:
self.logger.error(f"连接目标失败: {str(e)}")
return None
def _forward_data(self, target_socket):
"""双向转发数据"""
import select
sockets = [self.client_socket, target_socket]
while self.running:
try:
readable, _, _ = select.select(sockets, [], [], 1.0)
for sock in readable:
if sock == self.client_socket:
data = self.client_socket.recv(4096)
if not data:
return
target_socket.sendall(data)
elif sock == target_socket:
data = target_socket.recv(4096)
if not data:
return
self.client_socket.sendall(data)
except Exception as e:
self.logger.debug(f"转发数据时出错: {str(e)}")
break