WhereAmI/core/data_source.py
2026-06-15 00:49:26 +08:00

220 lines
8.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""代理数据源获取模块"""
import requests
from bs4 import BeautifulSoup
import json
import os
from typing import List, Optional
from datetime import datetime
from loguru import logger
from .models import ProxyInfo, ProxyProtocol
class DataSource:
    """Fetch proxy lists from a paginated web page and persist them as JSON.

    Attributes:
        config: Configuration dictionary supplied by the caller; stored as-is.
        logger: Logger used for all progress and error reporting.
    """

    def __init__(self, config: dict):
        """Store the configuration and bind the module-level logger.

        Args:
            config: Configuration dictionary; kept on the instance unchanged.
        """
        self.config = config
        self.logger = logger

    def fetch_from_web(self, url: str, output_file: str = "proxy.json", pages: int = 1) -> "List[ProxyInfo]":
        """Scrape proxies from consecutive pages of ``url`` and save them.

        Each page is requested as ``url`` plus a ``page=<n>`` query parameter.
        A page that fails (non-200 status, request error, parse error) is
        logged and skipped. The combined result is de-duplicated on
        ``ip_address:port`` (first occurrence wins) and, when non-empty,
        written to ``output_file``.

        Args:
            url: Base URL without the page parameter.
            output_file: Path of the JSON file to write.
            pages: Number of pages to fetch, starting at page 1.

        Returns:
            The de-duplicated proxies. An empty list is returned when nothing
            could be fetched or an unexpected error interrupted the run.
        """
        self.logger.info(f"开始从网页获取代理: {url} (共{pages}页)")

        # Bound before the try block so the final return is always valid,
        # even if an error is raised before de-duplication has run.
        unique_proxies = []
        try:
            all_proxies = []
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }

            for page in range(1, pages + 1):
                page_url = self._build_page_url(url, page)
                self.logger.info(f"正在获取第 {page}/{pages} 页: {page_url}")
                try:
                    response = requests.get(page_url, headers=headers, timeout=30)
                    response.encoding = 'utf-8'
                    if response.status_code != 200:
                        self.logger.warning(f"{page}页获取失败,状态码: {response.status_code}")
                        continue
                    proxies = self._parse_html(response.text)
                    self.logger.info(f"{page}页成功获取 {len(proxies)} 个代理")
                    all_proxies.extend(proxies)
                except Exception as e:
                    # Best effort: one bad page must not abort the other pages.
                    self.logger.error(f"{page}页获取失败: {e}")

            unique_proxies = self._deduplicate(all_proxies)
            self.logger.info(f"成功获取 {len(unique_proxies)} 个唯一代理(共{len(all_proxies)}个,去重后)")

            # Report the file as saved only when the write actually succeeded;
            # save_to_file logs its own failure and returns False.
            if unique_proxies and self.save_to_file(unique_proxies, output_file):
                self.logger.info(f"代理已保存到: {output_file}")
        except Exception as e:
            self.logger.error(f"获取网页代理失败: {e}")
        return unique_proxies

    @staticmethod
    def _build_page_url(url: str, page: int) -> str:
        """Append ``page=<page>`` to ``url`` using ``&`` or ``?`` as needed."""
        separator = '&' if '?' in url else '?'
        return f"{url}{separator}page={page}"

    @staticmethod
    def _deduplicate(proxies: "List[ProxyInfo]") -> "List[ProxyInfo]":
        """Return ``proxies`` without repeated ip/port pairs, keeping order."""
        seen = set()
        unique = []
        for proxy in proxies:
            key = f"{proxy.ip_address}:{proxy.port}"
            if key not in seen:
                seen.add(key)
                unique.append(proxy)
        return unique

    @staticmethod
    def _hidden_text(cell, default: str = 'no need') -> str:
        """Read the ``data-text`` attribute of the first ``<img>`` in ``cell``.

        Returns ``default`` when the cell has no ``<img>`` or the attribute
        is missing.
        """
        img = cell.find('img')
        return img.get('data-text', default) if img else default

    def _parse_html(self, html: str) -> "List[ProxyInfo]":
        """Extract proxies from the first ``<table class="table">`` in ``html``.

        Every ``<tr>`` inside the table's ``<tbody>`` with at least 12 ``<td>``
        cells is read positionally: ip, port, username, password, country,
        protocol, anonymity, speed, uptime, response time, latency and last
        update. Rows with an empty ip/port, a non-numeric port, or any other
        parsing error are skipped.

        Args:
            html: Raw HTML text of one listing page.

        Returns:
            The proxies parsed from the page; empty when the table or its
            ``<tbody>`` is missing.
        """
        proxies = []
        soup = BeautifulSoup(html, 'html.parser')

        table = soup.find('table', class_='table')
        if table is None:
            self.logger.warning("未找到代理表格")
            return proxies

        tbody = table.find('tbody')
        if tbody is None:
            return proxies

        # Built once per page rather than once per row.
        protocol_map = {
            'http': ProxyProtocol.HTTP,
            'https': ProxyProtocol.HTTPS,
            'socks4': ProxyProtocol.SOCKS4,
            'socks5': ProxyProtocol.SOCKS5
        }

        for row in tbody.find_all('tr'):
            try:
                cells = row.find_all('td')
                if len(cells) < 12:
                    continue

                ip_address = cells[0].text.strip()
                port_text = cells[1].text.strip()
                if not ip_address or not port_text:
                    continue
                try:
                    port = int(port_text)
                except ValueError:
                    continue

                # The country name is carried by the link's title attribute.
                country_link = cells[4].find('a')
                country = country_link.get('title', '') if country_link else ''

                protocol_text = cells[5].text.strip().lower()

                proxies.append(ProxyInfo(
                    ip_address=ip_address,
                    port=port,
                    # Credentials are hidden in the data-text attribute of an <img>.
                    username=self._hidden_text(cells[2]),
                    password=self._hidden_text(cells[3]),
                    # Unrecognised protocol names fall back to SOCKS5.
                    protocol=protocol_map.get(protocol_text, ProxyProtocol.SOCKS5),
                    country=country,
                    anonymity=cells[6].text.strip(),
                    speed=cells[7].text.strip(),
                    uptime_percentage=cells[8].text.strip(),
                    response_time=cells[9].text.strip(),
                    latency=cells[10].text.strip(),
                    last_updated=cells[11].text.strip()
                ))
            except Exception as e:
                # Best effort: a malformed row is skipped, not fatal.
                self.logger.debug(f"解析代理行失败: {e}")
                continue
        return proxies

    def load_from_file(self, filepath: str) -> "List[ProxyInfo]":
        """Load proxies from a JSON file holding a list of proxy dictionaries.

        Args:
            filepath: Path of the JSON file to read.

        Returns:
            The proxies built with ``ProxyInfo.from_dict``. Items that fail to
            convert are skipped. An empty list is returned when the file is
            missing, unreadable, not valid JSON, or not a JSON list.
        """
        self.logger.info(f"从本地文件加载代理: {filepath}")
        proxies = []
        if not os.path.exists(filepath):
            self.logger.warning(f"文件不存在: {filepath}")
            return proxies

        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except Exception as e:
            self.logger.error(f"加载本地文件失败: {e}")
            return proxies

        if not isinstance(data, list):
            # Make an unexpected top-level shape visible instead of reporting
            # a successful load of zero proxies.
            self.logger.warning(f"文件内容不是代理列表: {filepath}")
            return proxies

        for item in data:
            try:
                proxies.append(ProxyInfo.from_dict(item))
            except Exception as e:
                # Best effort: skip entries that cannot be converted.
                self.logger.debug(f"解析代理项失败: {e}")
        self.logger.info(f"成功加载 {len(proxies)} 个代理")
        return proxies

    def save_to_file(self, proxies: "List[ProxyInfo]", filepath: str) -> bool:
        """Write ``proxies`` to ``filepath`` as indented UTF-8 JSON.

        Each proxy is serialised through its ``to_dict()`` method before the
        file is opened, so a serialisation error does not truncate an
        existing file.

        Args:
            proxies: Proxies to serialise.
            filepath: Destination path; overwritten if it exists.

        Returns:
            True when the file was written, False when an error was logged.
        """
        try:
            data = [proxy.to_dict() for proxy in proxies]
            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
        except Exception as e:
            self.logger.error(f"保存文件失败: {e}")
            return False
        self.logger.info(f"保存 {len(proxies)} 个代理到文件: {filepath}")
        return True