VPSHUB/app/monitor.py
2026-05-30 15:06:24 +08:00

784 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""VPS Hub 多平台监控程序"""
import json
import logging
import os
import re
import subprocess
import sys
import time
from datetime import datetime, timedelta
from pathlib import Path

import requests
import schedule

from db_manager import DatabaseManager
class PlatformAdapter:
    """Base class for hosting-platform adapters.

    Holds the connection settings shared by every platform adapter; subclasses
    implement the platform-specific API calls.

    Args:
        site_url: Base URL of the platform API. Trailing ``/`` characters are
            removed so subclasses can build ``f"{site_url}/path"`` safely.
            ``None`` is normalized to ``""`` so that subclasses can run their
            own "URL is required" validation instead of crashing here.
        account: Account name used to authenticate.
        api_key: Secret used to authenticate.
    """

    def __init__(self, site_url, account, api_key):
        # (None or '') avoids AttributeError on None.rstrip, letting the
        # subclass's `if not self.site_url` check report the real problem.
        self.site_url = (site_url or '').rstrip('/')
        self.account = account
        self.api_key = api_key
        # Session token obtained by the subclass's login step; None until then.
        self.jwt_token = None
class MofangAdapter(PlatformAdapter):
    """Adapter for the Mofang (魔方) hosting platform API.

    Logs in with ``account``/``api_key`` via ``/v1/login_api`` to obtain a JWT
    and sends it in the ``Authorization`` header of every ``/v1/hosts`` call.
    Public methods do not raise for request/response problems: they log the
    reason and return ``None`` (queries) or ``False`` (power actions).

    Raises:
        ValueError: from the constructor when ``site_url`` is empty.
    """

    # Log through the monitor's named logger so messages reach the handlers
    # that MonitorService configures. The module-level logging.* helpers would
    # instead implicitly call basicConfig() on the root logger.
    _logger = logging.getLogger('VPSHubMonitor')

    # API-level "status" value the platform returns when the JWT is rejected.
    _TOKEN_EXPIRED_STATUS = 405

    # Per-request timeout in seconds.
    _TIMEOUT = 10

    def __init__(self, site_url, account, api_key):
        """Store the connection settings; the API URL is mandatory."""
        super().__init__(site_url, account, api_key)
        if not self.site_url:
            raise ValueError("魔方平台必须提供网站链接(API地址)")

    def _get_headers(self):
        """Return request headers carrying the JWT, logging in first if needed.

        Returns:
            A headers dict, or ``None`` if no token could be obtained (so the
            caller does not send a request with ``JWT None``).
        """
        if not self.jwt_token and not self.login():
            return None
        return {
            'Authorization': f'JWT {self.jwt_token}',
            'Content-Type': 'application/json'
        }

    def login(self):
        """Log in and cache the JWT on ``self.jwt_token``.

        Returns:
            The JWT on success, otherwise ``None`` (the reason is logged).
        """
        try:
            url = f"{self.site_url}/v1/login_api"
            data = {
                'account': self.account,
                'password': self.api_key
            }
            response = requests.post(url, data=data, timeout=self._TIMEOUT)
            if response.status_code != 200:
                self._logger.error(f"登录请求失败HTTP状态码: {response.status_code}")
                return None
            result = response.json()
            if result.get('status') == 200 and 'jwt' in result:
                self.jwt_token = result['jwt']
                return self.jwt_token
            self._logger.error(f"登录失败: {result.get('msg', '未知错误')}")
            return None
        except Exception as e:
            self._logger.error(f"登录异常: {str(e)}")
            return None

    def _call_api(self, method, path, action):
        """Send an authenticated request and unwrap the platform's JSON envelope.

        When the platform reports an expired token, the token is dropped and
        the request is retried exactly once after a fresh login. A bounded
        retry avoids endless recursion if the token keeps being rejected.

        Args:
            method: HTTP method name passed to ``requests.request``.
            path: Path (with query string) appended to ``site_url``.
            action: Human-readable action name used as the log-message prefix.

        Returns:
            ``(True, data)`` on success, where ``data`` is the response's
            ``'data'`` field (``None`` if absent); otherwise ``(False, None)``.
        """
        url = f"{self.site_url}{path}"
        for attempt in range(2):
            try:
                headers = self._get_headers()
                if not headers:
                    return False, None
                response = requests.request(method, url, headers=headers,
                                            timeout=self._TIMEOUT)
                if response.status_code != 200:
                    self._logger.error(f"{action}请求失败HTTP状态码: {response.status_code}")
                    return False, None
                result = response.json()
                status = result.get('status')
                if status == 200:
                    return True, result.get('data')
                if status == self._TOKEN_EXPIRED_STATUS and attempt == 0:
                    self._logger.warning("Token失效重新登录")
                    self.jwt_token = None
                    continue
                self._logger.error(f"{action}失败: {result.get('msg', '未知错误')}")
                return False, None
            except Exception as e:
                self._logger.error(f"{action}异常: {str(e)}")
                return False, None
        return False, None

    def get_vps_list(self, page=1, limit=100):
        """Return the ``data`` payload of ``GET /v1/hosts``, or None on failure.

        Args:
            page: Page number to request (default: the first page).
            limit: Maximum number of hosts per page.
        """
        ok, data = self._call_api('GET', f"/v1/hosts?page={page}&limit={limit}",
                                  "获取VPS列表")
        return data if ok else None

    def get_vps_status(self, vps_id):
        """Return the ``data`` payload of the host power-status query, or None."""
        ok, data = self._call_api('GET', f"/v1/hosts/{vps_id}/module/status?type=host",
                                  f"获取VPS {vps_id} 状态")
        return data if ok else None

    def get_vps_details(self, vps_id):
        """Return the full ``data`` payload of ``GET /v1/hosts/{vps_id}``, or None."""
        ok, data = self._call_api('GET', f"/v1/hosts/{vps_id}", f"获取VPS {vps_id} 详情")
        return data if ok else None

    def _power_action(self, vps_id, module_action, label):
        """Send ``PUT /v1/hosts/{vps_id}/module/{module_action}``.

        Args:
            vps_id: Host id on the platform.
            module_action: Path segment of the action ('on', 'off', ...).
            label: Action name used in log messages.

        Returns:
            True if the platform accepted the command, otherwise False.
        """
        ok, _ = self._call_api('PUT', f"/v1/hosts/{vps_id}/module/{module_action}",
                               f"VPS {vps_id} {label}")
        if ok:
            self._logger.info(f"VPS {vps_id} {label}指令发送成功")
        return ok

    def power_on(self, vps_id):
        """Send a power-on command; return True if it was accepted."""
        return self._power_action(vps_id, 'on', '开机')

    def power_off(self, vps_id):
        """Send a power-off command; return True if it was accepted."""
        return self._power_action(vps_id, 'off', '关机')

    def hard_reboot(self, vps_id):
        """Send a hard-reboot command; return True if it was accepted."""
        return self._power_action(vps_id, 'hard_reboot', '硬重启')
class MonitorService:
    """VPS Hub monitoring service.

    Loads platform configurations from the database, pings every known VPS on
    a fixed interval, powers on monitored VPSes that are unreachable and not
    reported as running, and writes a daily latency/availability summary.
    """

    # Seconds to wait after sending power-on commands before verifying them.
    power_on_wait_seconds = 60

    # Round-trip time printed by ping, e.g. "time=12.3 ms" (Linux),
    # "time=12ms" / "time<1ms" (Windows) or "时间=12ms" (Chinese Windows).
    _PING_TIME_RE = re.compile(r'[=<]\s*(\d+(?:\.\d+)?)\s*ms')

    # First run of digits in a config value, e.g. "16核" -> "16".
    _DIGITS_RE = re.compile(r'(\d+)')

    def __init__(self):
        """Create the database manager, the log directory and the loggers."""
        self.db = DatabaseManager()
        self.adapters = {}  # config_id -> platform adapter instance
        log_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logs')
        os.makedirs(log_dir, exist_ok=True)
        self.setup_logging(log_dir)
        self.logger.info("=" * 60)
        self.logger.info("VPS Hub 监控程序启动")
        self.logger.info("=" * 60)

    # ------------------------------------------------------------------ logging

    def setup_logging(self, log_dir):
        """Attach file and console handlers to the 'VPSHubMonitor' logger.

        Handlers: ``monitor.log`` (INFO+), ``error.log`` (WARNING+, including
        the source file and line) and stdout (INFO+).

        Args:
            log_dir: Existing directory in which the log files are created.
        """
        self.logger = logging.getLogger('VPSHubMonitor')
        self.logger.setLevel(logging.DEBUG)
        # Close handlers from a previous setup instead of merely dropping
        # them, so their log files are not left open.
        for old_handler in list(self.logger.handlers):
            self.logger.removeHandler(old_handler)
            old_handler.close()
        # Keep records away from the root logger: once root has a handler
        # (module-level logging.* helpers add one via basicConfig), every
        # message would otherwise also be printed a second time.
        self.logger.propagate = False

        message_fmt = '%(asctime)s - %(levelname)s - %(message)s'
        self._add_log_handler(
            logging.FileHandler(os.path.join(log_dir, 'monitor.log'), encoding='utf-8'),
            logging.INFO, message_fmt, '%Y-%m-%d %H:%M:%S')
        self._add_log_handler(
            logging.FileHandler(os.path.join(log_dir, 'error.log'), encoding='utf-8'),
            logging.WARNING,
            '%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
            '%Y-%m-%d %H:%M:%S')
        self._add_log_handler(
            logging.StreamHandler(sys.stdout),
            logging.INFO, message_fmt, '%H:%M:%S')

    def _add_log_handler(self, handler, level, fmt, datefmt):
        """Give *handler* a level and formatter and attach it to self.logger."""
        handler.setLevel(level)
        handler.setFormatter(logging.Formatter(fmt, datefmt=datefmt))
        self.logger.addHandler(handler)

    # ------------------------------------------------------------- configuration

    def load_configs_and_create_adapters(self):
        """Create one adapter per database config, keyed by config id.

        Unsupported platform types are logged and skipped. A config whose
        adapter cannot be constructed (e.g. missing API URL) is logged and
        skipped instead of aborting start-up for every other config.
        """
        for config in self.db.get_all_configs():
            config_id = config['id']
            site_type = config['site_type']
            if site_type == 'mofang':
                try:
                    adapter = MofangAdapter(
                        site_url=config['site_url'],
                        account=config['account'],
                        api_key=config['api_key']
                    )
                except (ValueError, AttributeError) as e:
                    self.logger.error(f"魔方平台配置无效,已跳过: ID={config_id}, 原因: {e}")
                    continue
                self.adapters[config_id] = adapter
                self.logger.info(f"已加载魔方平台配置: ID={config_id}, URL={config['site_url']}")
            elif site_type == 'aliyun':
                # TODO: implement an Aliyun adapter
                self.logger.warning(f"阿里云适配器尚未实现: ID={config_id}")
            elif site_type == 'tencent':
                # TODO: implement a Tencent Cloud adapter
                self.logger.warning(f"腾讯云适配器尚未实现: ID={config_id}")
            else:
                self.logger.warning(f"未知的平台类型: {site_type}, ID={config_id}")

    # ---------------------------------------------------------------- ping checks

    @classmethod
    def _parse_ping_latency(cls, output):
        """Return the round-trip time in ms printed by ping, or None.

        Args:
            output: Raw ping stdout (bytes or str). Bytes are decoded leniently
                because the console encoding varies by platform/locale; the
                matched text itself is ASCII.
        """
        if isinstance(output, bytes):
            output = output.decode('utf-8', errors='ignore')
        match = cls._PING_TIME_RE.search(output or '')
        return float(match.group(1)) if match else None

    def ping_host(self, ip_address):
        """Ping a host once and report whether it replied.

        Args:
            ip_address: IP address or domain name to ping.

        Returns:
            (is_alive, latency_ms): whether ping exited successfully, and the
            round-trip time in ms (None when the host did not reply). The RTT
            printed by ping is used when it can be parsed; otherwise the
            wall-clock duration of the ping process is used as a fallback.
        """
        # Refuse empty targets and targets ping would interpret as an option.
        if not ip_address or str(ip_address).startswith('-'):
            return False, None
        # Windows and Unix ping use different flags for count and timeout.
        if sys.platform == 'win32':
            cmd = ['ping', '-n', '1', '-w', '2000', ip_address]
        else:
            cmd = ['ping', '-c', '1', '-W', '2', ip_address]
        try:
            started = time.monotonic()
            result = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                timeout=5
            )
            elapsed_ms = (time.monotonic() - started) * 1000
        except Exception as e:
            self.logger.debug(f"Ping {ip_address} 异常: {str(e)}")
            return False, None
        if result.returncode != 0:
            return False, None
        latency_ms = self._parse_ping_latency(result.stdout)
        return True, (latency_ms if latency_ms is not None else elapsed_ms)

    def ping_and_record_status(self):
        """Ping every VPS in the database and store one status record each.

        A VPS that answers is marked 'on' directly, without an API call.
        """
        self.logger.info("开始Ping检测所有VPS...")
        vps_list = self.db.get_all_vps()
        if not vps_list:
            self.logger.info("数据库中没有VPS记录")
            return

        success_count = 0
        fail_count = 0
        for vps in vps_list:
            vps_id = vps['vps_id']
            target = vps['ip_address'] or vps['domain']
            if not target:
                self.logger.warning(f"VPS {vps_id} 没有IP或域名跳过")
                continue

            self.logger.debug(f"正在Ping检测: {target} (VPS ID: {vps_id})")
            is_alive, latency_ms = self.ping_host(target)
            self.db.save_ping_status(
                vps_id=vps_id,
                target=target,
                status='normal' if is_alive else 'abnormal',
                latency_ms=latency_ms
            )
            if is_alive:
                self.db.update_vps_status(vps_id, 'on')
                success_count += 1
                self.logger.debug(f"VPS {vps_id} Ping成功状态更新为 on")
            else:
                fail_count += 1
                self.logger.debug(f"VPS {vps_id} Ping失败需要进一步检查")

        self.logger.info(f"Ping检测完成: 成功{success_count}台, 失败{fail_count}")

    # ------------------------------------------------------------ power recovery

    def check_and_power_on_unreachable_vps(self):
        """Power on monitored VPSes whose latest ping today failed.

        Each such VPS is first checked through the platform API. One reported
        as 'on' is left alone (it may just block ping). One reported as
        anything else, or whose state cannot be fetched, is sent a power-on
        command and then verified via verify_power_on_result().
        """
        self.logger.info("检查需要开机的VPS...")
        monitored_vps = self.db.get_monitored_vps()
        if not monitored_vps:
            self.logger.info("没有标记为需要监控的VPS")
            return

        today = datetime.now().strftime('%Y-%m-%d')
        unreachable_vps = []
        for vps in monitored_vps:
            records = self.db.get_ping_records(vps['vps_id'], today)
            # NOTE(review): records[-1] is treated as the latest ping, which
            # assumes get_ping_records returns rows in chronological order.
            if records and records[-1]['status'] == 'abnormal':
                unreachable_vps.append(vps)

        if not unreachable_vps:
            self.logger.info("所有监控中的VPS都正常")
            return
        self.logger.info(f"发现 {len(unreachable_vps)} 台VPS Ping失败需要检查实际状态")

        need_power_on = []
        for vps in unreachable_vps:
            config_id = vps['config_id']
            vps_id = vps['vps_id']
            adapter = self.adapters.get(config_id)
            if adapter is None:
                self.logger.warning(f"配置ID {config_id} 没有对应的适配器,跳过")
                continue

            self.logger.info(f"调用API检查VPS {vps_id} 的实际状态...")
            status_data = adapter.get_vps_status(vps_id)
            if not status_data:
                self.logger.error(f"无法获取VPS {vps_id} 的状态")
                # Still try to power it on when the state is unknown.
                need_power_on.append((config_id, vps_id))
                continue

            status = status_data.get('status', 'unknown')
            des = status_data.get('des', '未知')
            self.db.update_vps_status(vps_id, status)
            self.logger.info(f"VPS {vps_id} API返回状态: {des} ({status})")
            if status == 'on':
                self.logger.info(f"VPS {vps_id} 实际状态为开机可能是禁Ping或网络问题")
            else:
                self.logger.warning(f"VPS {vps_id} 实际状态为关机,需要开机")
                need_power_on.append((config_id, vps_id))

        if not need_power_on:
            self.logger.info("所有异常VPS实际都是开机状态无需操作")
            return

        self.logger.info(f"开始对 {len(need_power_on)} 台VPS执行开机操作...")
        for config_id, vps_id in need_power_on:
            self.logger.info(f"正在开启VPS {vps_id}...")
            self.adapters[config_id].power_on(vps_id)

        self.logger.info(f"等待{self.power_on_wait_seconds}秒后验证开机结果...")
        time.sleep(self.power_on_wait_seconds)
        self.verify_power_on_result(need_power_on, max_retries=2)

    def verify_power_on_result(self, vps_list, max_retries=2):
        """Verify that powered-on VPSes came up, re-sending power-on if not.

        Each round queries only the VPSes not yet confirmed on; ones already
        confirmed are not re-checked. Between rounds the remaining VPSes get
        another power-on command followed by a wait.

        Args:
            vps_list: [(config_id, vps_id), ...] that were sent power-on.
            max_retries: Number of verification rounds.
        """
        pending = list(vps_list)
        for attempt in range(1, max_retries + 1):
            still_off = []
            for config_id, vps_id in pending:
                adapter = self.adapters[config_id]
                self.logger.info(f"验证VPS {vps_id} 开机状态 (第{attempt}次)...")
                status_data = adapter.get_vps_status(vps_id)
                if status_data and status_data.get('status') == 'on':
                    self.db.update_vps_status(vps_id, 'on')
                    self.logger.info(f"✅ VPS {vps_id} 开机成功,状态已更新")
                    continue
                # Not (yet) on: record the actual state reported by the API.
                actual_status = status_data.get('status', 'unknown') if status_data else 'unknown'
                actual_des = status_data.get('des', '未知') if status_data else '无法获取'
                self.db.update_vps_status(vps_id, actual_status)
                self.logger.warning(f"❌ VPS {vps_id} 开机失败,实际状态: {actual_des} ({actual_status}),状态已更新")
                still_off.append((config_id, vps_id))

            pending = still_off
            if not pending:
                self.logger.info("所有VPS开机验证完成")
                return

            if attempt < max_retries:
                self.logger.info(f"还有 {len(pending)} 台VPS未开机进行第{attempt}次重试...")
                for config_id, vps_id in pending:
                    self.adapters[config_id].power_on(vps_id)
                self.logger.info(f"等待{self.power_on_wait_seconds}秒后再次验证...")
                time.sleep(self.power_on_wait_seconds)

        if not pending:
            # Only reachable when max_retries <= 0 and nothing was passed in.
            return

        self.logger.warning(f"已达到最大重试次数({max_retries})仍有VPS未成功开机")
        for config_id, vps_id in pending:
            # One last status query to record the final state.
            status_data = self.adapters[config_id].get_vps_status(vps_id)
            if status_data:
                final_status = status_data.get('status', 'unknown')
                self.db.update_vps_status(vps_id, final_status)
                self.logger.warning(f"VPS {vps_id} 最终状态: {status_data.get('des', '未知')} ({final_status})")
            else:
                self.db.update_vps_status(vps_id, 'unknown')
                self.logger.warning(f"VPS {vps_id} 无法获取最终状态")

    # ------------------------------------------------------------- daily summary

    @staticmethod
    def _summarize_ping_records(records):
        """Compute latency statistics and an availability rating.

        Args:
            records: Non-empty sequence of ping records, each indexable by
                'latency_ms' (None for failed pings) and 'status'.

        Returns:
            dict holding the keyword arguments passed to
            ``db.save_vps_summary`` other than ``vps_id`` and ``date``.
        """
        # `is not None` (not truthiness) so a 0.0 ms reading is still counted.
        latencies = [r['latency_ms'] for r in records if r['latency_ms'] is not None]
        if latencies:
            avg_latency = sum(latencies) / len(latencies)
            max_latency = max(latencies)
            min_latency = min(latencies)
        else:
            avg_latency = max_latency = min_latency = 0

        count_under_100 = sum(1 for ms in latencies if ms < 100)
        count_100_to_300 = sum(1 for ms in latencies if 100 <= ms < 300)
        count_300_to_500 = sum(1 for ms in latencies if 300 <= ms < 500)
        count_abnormal = sum(1 for r in records if r['status'] == 'abnormal')

        # A tier requires more than 80% of ALL pings (failed ones included)
        # to fall within its cumulative latency band.
        total = len(records)
        availability = '不可用'
        if total > 0:
            within_excellent = count_under_100
            within_good = within_excellent + count_100_to_300
            within_normal = within_good + count_300_to_500
            if within_excellent / total > 0.8:
                availability = '优秀'
            elif within_good / total > 0.8:
                availability = '良好'
            elif within_normal / total > 0.8:
                availability = '一般'

        return {
            'avg_latency': avg_latency,
            'max_latency': max_latency,
            'min_latency': min_latency,
            'count_under_100': count_under_100,
            'count_100_to_300': count_100_to_300,
            'count_300_to_500': count_300_to_500,
            'count_abnormal': count_abnormal,
            'availability': availability,
        }

    def generate_daily_summary(self):
        """Summarize yesterday's ping records for every VPS (scheduled at 00:00)."""
        self.logger.info("开始生成昨日VPS摘要统计...")
        yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
        processed_count = 0
        for vps in self.db.get_all_vps() or []:
            vps_id = vps['vps_id']
            records = self.db.get_ping_records(vps_id, yesterday)
            if not records:
                self.logger.debug(f"VPS {vps_id} 昨天没有ping记录跳过")
                continue
            stats = self._summarize_ping_records(records)
            self.db.save_vps_summary(vps_id=vps_id, date=yesterday, **stats)
            processed_count += 1
        self.logger.info(f"摘要统计生成完成,共处理{processed_count}台VPS")

    def cleanup_old_data(self):
        """Delete ping records older than 30 days."""
        deleted = self.db.cleanup_old_ping_records(days=30)
        if deleted:
            self.logger.info(f"已清理{deleted}条旧的ping记录")

    # ----------------------------------------------------------------- VPS list

    @classmethod
    def _parse_config_options(cls, options):
        """Extract hardware details from a host's ``config_option`` value.

        Args:
            options: Presumably a list of ``{'key': ..., 'value': ...}`` dicts
                as returned by the platform; any non-list yields all-None.

        Returns:
            dict with cpu_cores (int or None) and memory_size, disk_size,
            bandwidth, os_type (raw values or None). Later duplicates win.
        """
        details = dict.fromkeys(
            ('cpu_cores', 'memory_size', 'disk_size', 'bandwidth', 'os_type'))
        if not isinstance(options, list):
            return details
        # config key -> details field storing the raw value unchanged
        raw_fields = {
            'memory': 'memory_size',          # e.g. "16G"
            'system_disk_size': 'disk_size',  # e.g. "Lin50G,Win50G"
            'bw': 'bandwidth',                # e.g. "70Mbps"
            'os': 'os_type',                  # e.g. "Debian-12.0_x64"
        }
        for option in options:
            key = option.get('key')
            value = option.get('value')
            if not value:
                continue
            if key == 'cpu':
                # e.g. "16核" -> 16; str() also copes with numeric values.
                match = cls._DIGITS_RE.search(str(value))
                if match:
                    details['cpu_cores'] = int(match.group(1))
            elif key in raw_fields:
                details[raw_fields[key]] = value
        return details

    def refresh_vps_list(self):
        """Fetch the host list of every config and upsert it into the database."""
        self.logger.info("开始刷新VPS列表...")
        for config in self.db.get_all_configs():
            config_id = config['id']
            adapter = self.adapters.get(config_id)
            if adapter is None:
                self.logger.warning(f"配置ID {config_id} 没有对应的适配器,跳过")
                continue

            self.logger.info(f"正在获取配置 {config_id} 的VPS列表...")
            # NOTE(review): only the adapter's default first page (limit=100)
            # is requested; accounts with more hosts may be truncated.
            vps_data = adapter.get_vps_list()
            if not vps_data or 'host' not in vps_data:
                self.logger.error(f"配置 {config_id} 获取VPS列表失败")
                continue

            vps_list_to_add = []
            for host in vps_data['host']:
                details = self._parse_config_options(host.get('config_option'))
                vps_list_to_add.append({
                    'config_id': config_id,
                    'vps_id': host['id'],
                    'domain': host.get('domain'),
                    'ip_address': host.get('dedicatedip'),
                    'product_name': host.get('product_name'),
                    'cpu_cores': details['cpu_cores'],
                    'memory_size': details['memory_size'],
                    'disk_size': details['disk_size'],
                    'bandwidth': details['bandwidth'],
                    'os_type': details['os_type'],
                    'amount': host.get('amount'),
                    'nextduedate': host.get('nextduedate'),
                    'section': config['auto_monitor']
                })

            self.db.batch_add_vps(vps_list_to_add)
            self.logger.info(f"配置 {config_id} 已更新 {len(vps_list_to_add)} 台VPS")
        self.logger.info("VPS列表刷新完成")

    # ---------------------------------------------------------------- main loop

    def run_monitoring_cycle(self):
        """Run one cycle: ping all VPSes, recover unreachable ones, clean up.

        Any exception is logged with its traceback so the loop keeps running.
        """
        try:
            self.logger.info("\n" + "=" * 60)
            self.logger.info(f"开始监控循环 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            self.logger.info("=" * 60)
            self.ping_and_record_status()
            self.check_and_power_on_unreachable_vps()
            self.cleanup_old_data()
            self.logger.info("本次监控循环完成\n")
        except Exception as e:
            self.logger.error(f"监控循环异常: {str(e)}", exc_info=True)

    def setup_schedule(self):
        """Register the daily 00:00 summary job with ``schedule``."""
        schedule.every().day.at("00:00").do(self.generate_daily_summary)
        self.logger.info("定时任务已设置: 每天00:00生成摘要统计")

    def run(self):
        """Load configs, then run a monitoring cycle every 300 seconds forever.

        Returns after a 60 second pause when no adapter could be created.
        """
        self.load_configs_and_create_adapters()
        if not self.adapters:
            self.logger.warning("没有可用的平台配置,请先通过网页添加配置")
            self.logger.info("程序将在60秒后退出")
            time.sleep(60)
            return

        self.setup_schedule()
        self.logger.info("首次运行刷新VPS列表...")
        self.refresh_vps_list()

        span = 300  # seconds between monitoring cycles
        self.logger.info(f"监控程序开始运行,间隔 {span}")
        try:
            while True:
                self.run_monitoring_cycle()
                # A failing scheduled job (e.g. the daily summary) must not
                # take down the whole monitoring loop.
                try:
                    schedule.run_pending()
                except Exception as e:
                    self.logger.error(f"定时任务执行异常: {str(e)}", exc_info=True)
                self.logger.info(f"等待 {span} 秒后进行下一次检测...")
                time.sleep(span)
        except KeyboardInterrupt:
            self.logger.info("收到中断信号,程序退出")
        except Exception as e:
            self.logger.error(f"程序运行异常: {str(e)}", exc_info=True)
            sys.exit(1)
if __name__ == '__main__':
    try:
        monitor = MonitorService()
        monitor.run()
    except Exception as e:
        # Report start-up/runtime failures on stderr rather than mixing them
        # into the stdout log stream, and exit with a non-zero status.
        print(f"程序启动失败: {str(e)}", file=sys.stderr)
        sys.exit(1)