99 lines
2.9 KiB
Python
99 lines
2.9 KiB
Python
"""数据模型定义"""
|
|
from dataclasses import dataclass, field
|
|
from typing import Optional
|
|
from enum import Enum
|
|
from datetime import datetime
|
|
|
|
|
|
class ProxyProtocol(Enum):
    """Enumeration of proxy protocols.

    Each member's value is the lowercase protocol name as a string, so a
    member can be recovered from its serialized form with
    ``ProxyProtocol("socks5")``.
    """

    HTTP = "http"
    HTTPS = "https"
    SOCKS4 = "socks4"
    SOCKS5 = "socks5"
|
|
|
|
|
|
class ProxyStatus(Enum):
    """Enumeration of proxy status values.

    Each member's value is the lowercase form of its name.
    """

    UNKNOWN = "unknown"
    CHECKING = "checking"
    AVAILABLE = "available"
    EXCELLENT = "excellent"
    UNAVAILABLE = "unavailable"
|
|
|
|
|
|
@dataclass
class ProxyInfo:
    """Proxy record: connection details, descriptive metadata and runtime state.

    Only ``ip_address`` and ``port`` are required. The descriptive fields are
    plain strings defaulting to ``""``. The runtime-state fields are not
    written by :meth:`to_dict` and are left at their defaults by
    :meth:`from_dict`.
    """

    ip_address: str
    port: int
    # "no need" is the placeholder for "no credentials"; get_auth_string()
    # treats a username equal to it (or an empty username) as absent.
    username: str = "no need"
    password: str = "no need"
    protocol: ProxyProtocol = ProxyProtocol.SOCKS5
    country: str = ""
    anonymity: str = ""
    speed: str = ""
    uptime_percentage: str = ""
    response_time: str = ""
    latency: str = ""
    last_updated: str = ""

    # Runtime state (excluded from to_dict / from_dict).
    status: ProxyStatus = ProxyStatus.UNKNOWN
    latency_ms: float = 0.0
    last_check: Optional[datetime] = None
    consecutive_failures: int = 0
    is_active: bool = False

    def to_dict(self) -> dict:
        """Return the persistent fields as a plain dictionary.

        The protocol is emitted as its string value. The runtime-state
        fields (``status``, ``latency_ms``, ``last_check``,
        ``consecutive_failures``, ``is_active``) are not included.
        """
        return {
            'ip_address': self.ip_address,
            'port': self.port,
            'username': self.username,
            'password': self.password,
            'protocol': self.protocol.value,
            'country': self.country,
            'anonymity': self.anonymity,
            'speed': self.speed,
            'uptime_percentage': self.uptime_percentage,
            'response_time': self.response_time,
            'latency': self.latency,
            'last_updated': self.last_updated,
        }

    @classmethod
    def from_dict(cls, data: dict) -> 'ProxyInfo':
        """Build a ``ProxyInfo`` from a dictionary such as ``to_dict`` produces.

        Args:
            data: Mapping with at least ``'ip_address'`` and ``'port'``.
                Every other key is optional and falls back to the field's
                default.

        Returns:
            A new ``ProxyInfo`` whose runtime-state fields hold their
            defaults.

        Raises:
            KeyError: If ``'ip_address'`` or ``'port'`` is missing.
            ValueError, TypeError: If ``'port'`` cannot be converted to int.
        """
        raw_protocol = data.get('protocol', 'socks5')
        if isinstance(raw_protocol, ProxyProtocol):
            # Already an enum member: use it instead of discarding it.
            protocol = raw_protocol
        else:
            # Normalise case and surrounding whitespace so that "HTTP" or
            # " http " is not silently turned into the SOCKS5 fallback.
            try:
                protocol = ProxyProtocol(str(raw_protocol).strip().lower())
            except ValueError:
                # Unknown protocol name: keep the historical SOCKS5 fallback.
                protocol = ProxyProtocol.SOCKS5

        return cls(
            ip_address=data['ip_address'],
            port=int(data['port']),
            username=data.get('username', 'no need'),
            password=data.get('password', 'no need'),
            protocol=protocol,
            country=data.get('country', ''),
            anonymity=data.get('anonymity', ''),
            speed=data.get('speed', ''),
            uptime_percentage=data.get('uptime_percentage', ''),
            response_time=data.get('response_time', ''),
            latency=data.get('latency', ''),
            last_updated=data.get('last_updated', ''),
        )

    def get_address(self) -> str:
        """Return the proxy address formatted as ``"ip_address:port"``."""
        return f"{self.ip_address}:{self.port}"

    def get_auth_string(self) -> Optional[str]:
        """Return ``"username:password"``, or ``None`` when no username is set.

        A username that is empty or equal to the ``"no need"`` placeholder
        counts as not set. The password is included as stored and is not
        checked against the placeholder.
        """
        if self.username and self.username != "no need":
            return f"{self.username}:{self.password}"
        return None
|