386 lines
14 KiB
Python
386 lines
14 KiB
Python
"""简化代理转发器 - 三步工作流程"""
|
||
import socket
|
||
import threading
|
||
import socks
|
||
import time
|
||
from typing import List, Optional
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
from loguru import logger
|
||
from .models import ProxyInfo, ProxyStatus
|
||
|
||
|
||
class SimpleProxyForwarder:
    """Simplified proxy forwarder.

    Workflow:
    1. Validate the candidate proxies (validate_proxies).
    2. Pick a usable proxy (get_available_proxy).
    3. Serve a local SOCKS5 listener that relays through that proxy
       (_run_forwarder); the listener defaults to 127.0.0.1:8745.
    """

    def __init__(self, config: dict, connection_log_callback=None):
        """Read the listen address from ``config['output']`` and set up state.

        Args:
            config: Settings mapping. ``output.host`` and ``output.port``
                select the local listen address (defaults ``127.0.0.1`` and
                ``8745``).
            connection_log_callback: Optional callable passed on to every
                connection handler created by the accept loop.
        """
        self.config = config
        self.logger = logger

        output_config = config.get('output', {})
        self.host = output_config.get('host', '127.0.0.1')
        self.port = output_config.get('port', 8745)

        self.validated_proxies: List[ProxyInfo] = []
        self.current_proxy: Optional[ProxyInfo] = None
        self.server_socket: Optional[socket.socket] = None
        self._running = False
        self._server_thread: Optional[threading.Thread] = None
        self._connections = []
        self._lock = threading.Lock()
        self._executor = ThreadPoolExecutor(max_workers=10)

        # Callback used by handlers to report each relayed connection.
        self.connection_log_callback = connection_log_callback

    def validate_proxies(self, proxies: "List[ProxyInfo]") -> "List[ProxyInfo]":
        """Step 1: check every proxy concurrently and keep the working ones.

        Proxies are appended in completion order, so the resulting list is
        ordered by which check finished first, not by input order.

        Args:
            proxies: Candidate proxies to probe.

        Returns:
            The list stored in ``self.validated_proxies``.
        """
        self.logger.info(f"开始验证 {len(proxies)} 个代理")
        self.validated_proxies = []

        futures = {
            self._executor.submit(self._validate_single_proxy, proxy): proxy
            for proxy in proxies
        }

        for future in as_completed(futures):
            proxy = futures[future]
            try:
                if future.result():
                    self.validated_proxies.append(proxy)
                    proxy.status = ProxyStatus.AVAILABLE
            except Exception as e:
                self.logger.debug(f"验证代理 {proxy.get_address()} 失败: {str(e)}")

        self.logger.info(f"验证完成,{len(self.validated_proxies)} 个代理可用")
        return self.validated_proxies

    def _validate_single_proxy(self, proxy: "ProxyInfo") -> bool:
        """Return True when the proxy accepts TCP (and, for SOCKS, a handshake).

        Any exception raised while probing is treated as "not usable".
        """
        try:
            # Plain TCP reachability test; the context manager guarantees the
            # probe socket is closed even when connect_ex raises.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(5)
                if sock.connect_ex((proxy.ip_address, proxy.port)) != 0:
                    return False

            # SOCKS proxies additionally have to complete a real handshake.
            if proxy.protocol.value in ['socks4', 'socks5']:
                return self._test_socks_handshake(proxy)

            return True
        except Exception:
            return False

    def _test_socks_handshake(self, proxy: "ProxyInfo") -> bool:
        """Return True if a connection through the SOCKS proxy can be opened.

        The probe connects to ``www.google.com:80`` through the proxy. The
        literal ``"no need"`` in the username/password fields means "no
        credentials".
        """
        test_sock = None
        try:
            proxy_type = socks.SOCKS5 if proxy.protocol.value == 'socks5' else socks.SOCKS4

            test_sock = socks.socksocket()
            test_sock.set_proxy(
                proxy_type,
                proxy.ip_address,
                proxy.port,
                username=proxy.username if proxy.username != "no need" else None,
                password=proxy.password if proxy.password != "no need" else None
            )
            test_sock.settimeout(5)

            # Try to reach the test server through the proxy.
            test_sock.connect(('www.google.com', 80))
            return True
        except Exception:
            return False
        finally:
            # Always release the probe socket, including on failure.
            if test_sock is not None:
                try:
                    test_sock.close()
                except OSError:
                    pass

    def get_available_proxy(self) -> "Optional[ProxyInfo]":
        """Step 2: return the first validated proxy, or None if there is none."""
        if not self.validated_proxies:
            return None

        # First usable proxy; could be extended to a load-balancing policy.
        return self.validated_proxies[0]

    def start(self, proxies: "List[ProxyInfo]"):
        """Validate ``proxies``, pick one and start the local listener.

        Returns without starting when the forwarder is already running or no
        proxy passes validation. Errors from opening the listener propagate.
        """
        if self._running:
            self.logger.warning("转发器已在运行")
            return

        # Step 1: validate.
        self.validate_proxies(proxies)
        if not self.validated_proxies:
            self.logger.error("没有可用的代理")
            return

        # Step 2: choose the upstream proxy.
        self.current_proxy = self.get_available_proxy()
        if not self.current_proxy:
            self.logger.error("无法获取可用代理")
            return

        self.logger.info(f"使用代理: {self.current_proxy.get_address()} ({self.current_proxy.protocol.value})")

        # Step 3: start forwarding.
        self._run_forwarder()

    def _run_forwarder(self):
        """Step 3: bind the listening socket and start the accept thread.

        Raises:
            Exception: Whatever socket/thread setup raised, after logging it
                and releasing the half-initialised listener.
        """
        server_socket = None
        try:
            server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server_socket.bind((self.host, self.port))
            server_socket.listen(100)
            # Short accept timeout so the accept loop can notice stop().
            server_socket.settimeout(1.0)
            self.server_socket = server_socket

            self._running = True
            self._server_thread = threading.Thread(target=self._accept_loop, daemon=True)
            self._server_thread.start()

            self.logger.info(f"SOCKS5转发器已启动: {self.host}:{self.port}")
        except Exception as e:
            self.logger.error(f"启动转发器失败: {str(e)}")
            # Do not leave a bound/half-open listener or a stale running flag.
            self._running = False
            if server_socket is not None:
                try:
                    server_socket.close()
                except OSError:
                    pass
            self.server_socket = None
            raise

    def _accept_loop(self):
        """Accept clients until stopped, spawning one handler thread each."""
        while self._running:
            try:
                client_socket, client_address = self.server_socket.accept()
            except socket.timeout:
                continue
            except Exception as e:
                if self._running:
                    self.logger.error(f"接受连接失败: {str(e)}")
                break

            try:
                # switch_proxy() writes current_proxy under the same lock.
                with self._lock:
                    upstream_proxy = self.current_proxy

                handler = ForwarderConnectionHandler(
                    client_socket,
                    client_address,
                    upstream_proxy,
                    self.logger,
                    self.connection_log_callback
                )
                handler.start()
            except Exception as e:
                # A failure to spawn one handler must not leak the client
                # socket or take the whole listener down.
                self.logger.error(f"接受连接失败: {str(e)}")
                try:
                    client_socket.close()
                except OSError:
                    pass
                continue

            with self._lock:
                # Drop finished handlers so the list does not grow forever.
                self._connections = [c for c in self._connections if c.is_alive()]
                self._connections.append(handler)

    def stop(self):
        """Stop accepting, ask all handlers to finish and release resources.

        The instance can be started again afterwards.
        """
        self._running = False

        # Snapshot under the lock, then wait outside it so the accept loop is
        # never blocked behind the joins.
        with self._lock:
            connections = list(self._connections)
            self._connections.clear()

        for conn in connections:
            conn.running = False
        for conn in connections:
            try:
                conn.join(timeout=2)
            except Exception as e:
                self.logger.debug(f"处理连接时出错: {str(e)}")

        if self.server_socket:
            try:
                self.server_socket.close()
            except OSError:
                pass

        if self._server_thread:
            self._server_thread.join(timeout=5)

        self.server_socket = None
        self._server_thread = None

        # A shut-down executor rejects new work, so replace it to keep a
        # later start()/validate_proxies() call functional.
        self._executor.shutdown(wait=False)
        self._executor = ThreadPoolExecutor(max_workers=10)

        self.logger.info("转发器已停止")

    def switch_proxy(self, proxy: "ProxyInfo") -> bool:
        """Route new connections through ``proxy``.

        Only proxies present in ``validated_proxies`` are accepted. Handlers
        that are already running keep the proxy they were created with.

        Returns:
            True when the switch happened, False when ``proxy`` was rejected.
        """
        if proxy not in self.validated_proxies:
            self.logger.warning(f"代理 {proxy.get_address()} 未通过验证")
            return False

        with self._lock:
            self.current_proxy = proxy

        self.logger.info(f"切换到代理: {proxy.get_address()}")
        return True
|
||
|
||
|
||
class ForwarderConnectionHandler(threading.Thread):
    """Thread that serves one local SOCKS5 client through the upstream proxy.

    Life cycle (see ``run``): negotiate SOCKS5 with the client, open the
    requested destination through ``upstream_proxy``, report the outcome to
    the client, then relay bytes in both directions until either side closes
    or ``running`` is set to False.
    """

    # Seconds the client has to finish the SOCKS5 negotiation; prevents a
    # silent client from pinning a handler thread forever.
    HANDSHAKE_TIMEOUT = 10

    # SOCKS5 address-type markers (ATYP field of the request).
    _ATYP_IPV4 = b'\x01'
    _ATYP_DOMAIN = b'\x03'
    _ATYP_IPV6 = b'\x04'

    def __init__(self, client_socket: socket.socket, client_address, upstream_proxy: "ProxyInfo", logger, connection_log_callback=None):
        """Store the connection context; the thread is created as a daemon.

        Args:
            client_socket: Accepted socket of the local SOCKS5 client.
            client_address: ``(host, port)`` pair of that client.
            upstream_proxy: Proxy the destination connection is made through.
            logger: Object providing ``debug``/``error`` logging methods.
            connection_log_callback: Optional callable invoked with one
                ``"client -> proxy"`` string per established connection.
        """
        super().__init__(daemon=True)
        self.client_socket = client_socket
        self.client_address = client_address
        self.upstream_proxy = upstream_proxy
        self.logger = logger
        self.running = True
        self.connection_log_callback = connection_log_callback

        # Destination requested by the client; set by _socks5_handshake().
        self.dest_atype = None
        self.dest_addr = None
        self.dest_port = None

    def run(self):
        """Handle the client connection from handshake to teardown."""
        target_socket = None
        try:
            # SOCKS5 negotiation and request parsing.
            if not self._socks5_handshake():
                self.logger.debug(f"SOCKS5握手失败: {self.client_address}")
                return

            # Open the destination through the upstream proxy and only then
            # tell the client whether the request succeeded.
            target_socket = self._connect_to_target()
            if not target_socket:
                self._send_reply(0x01)  # general SOCKS server failure
                return
            self._send_reply(0x00)  # succeeded

            # The handshake deadline must not apply to the relayed stream.
            self.client_socket.settimeout(None)

            # Report the connection to the optional log callback.
            if self.connection_log_callback:
                local_addr = f"{self.client_address[0]}:{self.client_address[1]}"
                remote_addr = f"{self.upstream_proxy.ip_address}:{self.upstream_proxy.port}"
                log_message = f"{local_addr} -> {remote_addr}"
                self.connection_log_callback(log_message)

            # Relay data in both directions.
            self._forward_data(target_socket)

        except Exception as e:
            self.logger.debug(f"处理连接时出错: {str(e)}")
        finally:
            # Release both ends; the upstream socket used to be leaked.
            for sock in (target_socket, self.client_socket):
                if sock is None:
                    continue
                try:
                    sock.close()
                except OSError:
                    pass
            self.running = False

    def _recv_exact(self, size: int) -> bytes:
        """Read exactly ``size`` bytes from the client.

        ``socket.recv`` may return fewer bytes than requested, so keep reading
        until the field is complete.

        Raises:
            ConnectionError: The client closed the connection mid-field.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = self.client_socket.recv(remaining)
            if not chunk:
                raise ConnectionError("client closed the connection during the SOCKS5 handshake")
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def _send_reply(self, code: int) -> None:
        """Send a SOCKS5 reply with status ``code`` and a zero IPv4 bind address."""
        self.client_socket.sendall(bytes((0x05, code, 0x00, 0x01)) + b'\x00' * 6)

    def _socks5_handshake(self) -> bool:
        """Negotiate "no authentication" and parse the CONNECT request.

        On success the requested destination is stored in ``dest_atype``,
        ``dest_addr`` and ``dest_port`` (raw wire bytes). The final success
        reply is NOT sent here; ``run`` sends it once the destination is
        actually connected.

        Returns:
            True when a well-formed CONNECT request was received, else False.
        """
        try:
            self.client_socket.settimeout(self.HANDSHAKE_TIMEOUT)

            # Greeting: VER, NMETHODS, METHODS...
            if self._recv_exact(1) != b'\x05':
                return False
            nmethods = self._recv_exact(1)[0]
            methods = self._recv_exact(nmethods)

            # Only "no authentication" (0x00) is supported.
            if 0x00 not in methods:
                self.client_socket.sendall(b'\x05\xff')  # no acceptable methods
                return False
            self.client_socket.sendall(b'\x05\x00')

            # Request: VER, CMD, RSV, ATYP, DST.ADDR, DST.PORT
            if self._recv_exact(1) != b'\x05':
                return False

            cmd = self._recv_exact(1)
            if cmd != b'\x01':
                self._send_reply(0x07)  # command not supported (CONNECT only)
                return False

            self._recv_exact(1)  # reserved byte
            atype = self._recv_exact(1)

            if atype == self._ATYP_IPV4:
                dest_addr = self._recv_exact(4)
            elif atype == self._ATYP_DOMAIN:
                addr_len = self._recv_exact(1)[0]
                dest_addr = self._recv_exact(addr_len)
            elif atype == self._ATYP_IPV6:
                dest_addr = self._recv_exact(16)
            else:
                self._send_reply(0x08)  # address type not supported
                return False
            dest_port = self._recv_exact(2)

            self.dest_atype = atype
            self.dest_addr = dest_addr
            self.dest_port = dest_port
            return True

        except Exception as e:
            self.logger.debug(f"SOCKS5握手异常: {str(e)}")
            return False

    def _resolve_destination(self):
        """Decode the stored request into a ``(host, port)`` pair.

        The address is interpreted by its SOCKS5 address type rather than by
        its byte length, so a 4-byte domain name is not mistaken for IPv4 and
        an IPv6 address is rendered as text instead of decoded as a hostname.
        """
        if self.dest_atype == self._ATYP_IPV4:
            host = socket.inet_ntoa(self.dest_addr)
        elif self.dest_atype == self._ATYP_IPV6:
            host = socket.inet_ntop(socket.AF_INET6, self.dest_addr)
        else:
            host = self.dest_addr.decode('utf-8', errors='ignore')

        # DST.PORT is two bytes in network (big-endian) order.
        port = int.from_bytes(self.dest_port, 'big')
        return host, port

    def _connect_to_target(self):
        """Connect to the requested destination through the upstream proxy.

        The literal ``"no need"`` in the proxy's username/password fields
        means "no credentials". Unknown protocol names fall back to SOCKS5.

        Returns:
            The connected socket (switched to blocking mode), or None when
            the connection could not be established.
        """
        target_socket = None
        try:
            dest_ip, dest_port = self._resolve_destination()

            proxy_type_map = {
                'socks4': socks.SOCKS4,
                'socks5': socks.SOCKS5,
                'http': socks.HTTP,
                'https': socks.HTTP
            }
            proxy_type = proxy_type_map.get(self.upstream_proxy.protocol.value, socks.SOCKS5)

            target_socket = socks.socksocket()
            target_socket.set_proxy(
                proxy_type,
                self.upstream_proxy.ip_address,
                self.upstream_proxy.port,
                username=self.upstream_proxy.username if self.upstream_proxy.username != "no need" else None,
                password=self.upstream_proxy.password if self.upstream_proxy.password != "no need" else None
            )

            # Bound the connect attempt, then relay without a deadline.
            target_socket.settimeout(10)
            target_socket.connect((dest_ip, dest_port))
            target_socket.settimeout(None)

            return target_socket

        except Exception as e:
            self.logger.error(f"连接目标失败: {str(e)}")
            if target_socket is not None:
                try:
                    target_socket.close()
                except OSError:
                    pass
            return None

    def _forward_data(self, target_socket):
        """Relay bytes between client and destination until one side closes.

        The loop wakes at least once per second so that clearing ``running``
        (done by the forwarder's ``stop``) ends the relay.
        """
        import select

        sockets = [self.client_socket, target_socket]

        while self.running:
            try:
                readable, _, _ = select.select(sockets, [], [], 1.0)

                for sock in readable:
                    if sock is self.client_socket:
                        data = self.client_socket.recv(4096)
                        if not data:
                            return
                        target_socket.sendall(data)
                    elif sock is target_socket:
                        data = target_socket.recv(4096)
                        if not data:
                            return
                        self.client_socket.sendall(data)

            except Exception as e:
                self.logger.debug(f"转发数据时出错: {str(e)}")
                break
|