VPSHUB/app/monitor.py
2026-05-26 00:09:05 +08:00

592 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
核云IDC VPS自动监测重启程序
功能:实时监测服务器状态,发现关机自动开机
"""
import os
import sys
import time
import yaml
import json
import logging
import subprocess
import requests
from datetime import datetime
from pathlib import Path
class IDCMonitor:
    """Watch HeyunIDC VPS instances and power on any that are shut down.

    Each monitoring cycle first probes reachability (ICMP ping of every
    VPS IP, or an HTTP HEAD request against configured domains). Only the
    targets that fail the probe are then checked through the provider API,
    and a power-on command is sent to those the API does not report as
    ``on``.

    Configuration is read from a YAML file with these keys:

    * ``ACCOUNT``, ``API_KEY``, ``WAY`` -- required.
    * ``DOMAIN`` -- required when ``WAY`` is ``http``; a comma-separated
      string or a YAML list of domains.
    * ``SPAN`` -- seconds between cycles (default 300).
    * ``JWT`` -- cached login token (rewritten by this program).
    * ``EXCEPTION_IPS`` -- IPs that are never powered on automatically.
    """

    # Seconds to wait after sending power-on commands before the API is
    # asked whether the machines actually came up.
    POWER_ON_WAIT_SECONDS = 60

    def __init__(self, config_path=None):
        """Set up logging, load the configuration and log a start banner.

        Args:
            config_path: Path of the YAML config file. Defaults to
                ``config.yml`` next to this source file.
        """
        module_dir = os.path.dirname(os.path.abspath(__file__))
        if config_path is None:
            config_path = os.path.join(module_dir, 'config.yml')
        self.config_path = config_path
        self.config = {}
        self.base_url = "https://www.heyunidc.cn/v1"
        self.jwt_token = None
        self.retry_count = 0  # power-on retries used by the current verification

        # Logging must be ready before load_config(), which logs its errors.
        log_dir = os.path.join(module_dir, 'logs')
        os.makedirs(log_dir, exist_ok=True)
        self.setup_logging(log_dir)

        self.load_config()
        self.logger.info("=" * 60)
        self.logger.info("核云IDC VPS监控程序启动")
        self.logger.info(f"监控方式: {self.config.get('WAY', 'ping')}")
        self.logger.info(f"监控间隔: {self.config.get('SPAN', 300)}")
        self.logger.info("=" * 60)

    def setup_logging(self, log_dir):
        """Configure the ``IDCMonitor`` logger with three handlers.

        * ``monitor.log`` -- INFO and above.
        * ``error.log`` -- WARNING and above, with file name and line number.
        * stdout -- INFO and above, time-only timestamps.

        Args:
            log_dir: Existing directory in which the log files are created.
        """
        self.logger = logging.getLogger('IDCMonitor')
        self.logger.setLevel(logging.DEBUG)

        # The logger is a process-wide singleton: detach AND close handlers
        # left over from an earlier instance so their file handles are not
        # leaked and messages are not duplicated.
        for stale_handler in list(self.logger.handlers):
            self.logger.removeHandler(stale_handler)
            stale_handler.close()

        # Regular log file.
        normal_log = os.path.join(log_dir, 'monitor.log')
        normal_handler = logging.FileHandler(normal_log, encoding='utf-8')
        normal_handler.setLevel(logging.INFO)
        normal_handler.setFormatter(logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        ))
        self.logger.addHandler(normal_handler)

        # Warning/error log file.
        error_log = os.path.join(log_dir, 'error.log')
        error_handler = logging.FileHandler(error_log, encoding='utf-8')
        error_handler.setLevel(logging.WARNING)
        error_handler.setFormatter(logging.Formatter(
            '%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        ))
        self.logger.addHandler(error_handler)

        # Console output.
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setLevel(logging.INFO)
        console_handler.setFormatter(logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s',
            datefmt='%H:%M:%S'
        ))
        self.logger.addHandler(console_handler)

    def load_config(self):
        """Read and validate the YAML configuration file.

        Populates ``self.config``, ``self.jwt_token`` (when a cached token
        is present) and ``self.exception_ips``. Any failure is logged and
        terminates the process with exit status 1.
        """
        try:
            with open(self.config_path, 'r', encoding='utf-8') as f:
                loaded = yaml.safe_load(f)
            # safe_load returns None for an empty file and may return a
            # scalar or list; only a mapping is a usable configuration.
            if not isinstance(loaded, dict):
                raise ValueError("配置文件内容必须是键值映射")
            self.config = loaded

            # Required settings must be present and non-empty.
            required_keys = ['ACCOUNT', 'API_KEY', 'WAY']
            for key in required_keys:
                if key not in self.config or not self.config[key]:
                    raise ValueError(f"配置文件中缺少必要项: {key}")

            # Defaults for optional settings.
            self.config.setdefault('SPAN', 300)
            self.config.setdefault('JWT', '')
            self.config.setdefault('EXCEPTION_IPS', [])

            # HTTP mode needs at least one domain to probe.
            if self.config['WAY'] == 'http':
                if 'DOMAIN' not in self.config or not self.config['DOMAIN']:
                    raise ValueError("WAY为http时必须配置DOMAIN")

            # Reuse a token cached by a previous run.
            if self.config.get('JWT'):
                self.jwt_token = self.config['JWT']
                self.logger.info("已加载缓存的JWT Token")

            # IPs that must never be powered on automatically.
            self.exception_ips = self.config.get('EXCEPTION_IPS', [])
            if self.exception_ips:
                self.logger.info(f"已加载 {len(self.exception_ips)} 个例外IP")
            self.logger.info("配置文件加载成功")
        except FileNotFoundError:
            self.logger.error(f"配置文件不存在: {self.config_path}")
            sys.exit(1)
        except Exception as e:
            self.logger.error(f"配置文件加载失败: {str(e)}")
            sys.exit(1)

    def save_jwt_token(self, token):
        """Cache ``token`` in the config file and on the instance.

        The whole config mapping is re-serialised with ``yaml.dump``, so any
        comments or custom formatting in the file are not preserved.
        Failures are logged and otherwise ignored.

        Args:
            token: JWT string returned by the login API.
        """
        try:
            self.config['JWT'] = token
            with open(self.config_path, 'w', encoding='utf-8') as f:
                yaml.dump(self.config, f, allow_unicode=True, default_flow_style=False)
            self.jwt_token = token
            self.logger.debug("JWT Token已保存到配置文件")
        except Exception as e:
            self.logger.error(f"保存JWT Token失败: {str(e)}")

    def get_login_token(self):
        """Log in with ``ACCOUNT`` / ``API_KEY`` and return a fresh JWT.

        Returns:
            The token string, or ``None`` when the request or login fails
            (the reason is logged).
        """
        try:
            url = f"{self.base_url}/login_api"
            data = {
                'account': self.config['ACCOUNT'],
                'password': self.config['API_KEY']
            }
            response = requests.post(url, data=data, timeout=10)
            if response.status_code != 200:
                self.logger.error(f"登录请求失败HTTP状态码: {response.status_code}")
                return None
            result = response.json()
            if result.get('status') == 200 and 'jwt' in result:
                token = result['jwt']
                self.save_jwt_token(token)
                self.logger.info("成功获取新的JWT Token")
                return token
            self.logger.error(f"登录失败: {result.get('msg', '未知错误')}")
            return None
        except Exception as e:
            self.logger.error(f"获取Token异常: {str(e)}")
            return None

    def get_headers(self):
        """Return request headers carrying the JWT, logging in if needed.

        Returns:
            A header dict, or ``None`` when no token could be obtained.
        """
        if not self.jwt_token:
            self.jwt_token = self.get_login_token()
        if not self.jwt_token:
            return None
        return {
            'Authorization': f'JWT {self.jwt_token}',
            'Content-Type': 'application/json'
        }

    def get_vps_list(self, *, _retried=False):
        """Fetch the account's VPS list from the API.

        Only the first page (up to 100 entries) is requested.

        Args:
            _retried: Internal flag -- True when this call is the single
                retry performed after the API reported an expired token.

        Returns:
            The ``data`` object of the API response, or ``None`` on failure.
        """
        try:
            headers = self.get_headers()
            if not headers:
                return None
            url = f"{self.base_url}/hosts?page=1&limit=100"
            response = requests.get(url, headers=headers, timeout=10)
            if response.status_code != 200:
                self.logger.error(f"获取VPS列表请求失败HTTP状态码: {response.status_code}")
                return None
            result = response.json()
            api_status = result.get('status')
            if api_status == 200:
                data = result['data']
                # .get(): a missing counter must not discard a valid list.
                self.logger.debug(f"成功获取VPS列表{data.get('total', '?')}")
                return data
            if api_status == 405:
                # The API answers 405 when the token is no longer valid.
                # Refresh once; retrying forever would flood the API if the
                # new token is rejected as well.
                if _retried:
                    self.logger.error("重新获取Token后仍返回Token失效,放弃重试")
                    return None
                self.logger.warning("Token失效重新获取Token")
                self.jwt_token = None
                return self.get_vps_list(_retried=True)
            self.logger.error(f"获取VPS列表失败: {result.get('msg', '未知错误')}")
            return None
        except Exception as e:
            self.logger.error(f"获取VPS列表异常: {str(e)}")
            return None

    def get_vps_status(self, host_id, *, _retried=False):
        """Fetch the power status of one VPS from the API.

        Args:
            host_id: Provider-side identifier of the VPS.
            _retried: Internal flag -- True when this call is the single
                retry performed after the API reported an expired token.

        Returns:
            The ``data`` object of the API response (callers read its
            ``status`` and ``des`` keys), or ``None`` on failure.
        """
        try:
            headers = self.get_headers()
            if not headers:
                return None
            url = f"{self.base_url}/hosts/{host_id}/module/status?type=host"
            response = requests.get(url, headers=headers, timeout=10)
            if response.status_code != 200:
                self.logger.error(f"获取VPS {host_id} 状态请求失败HTTP状态码: {response.status_code}")
                return None
            result = response.json()
            api_status = result.get('status')
            if api_status == 200:
                return result['data']
            if api_status == 405:
                # Expired token: refresh and retry exactly once.
                if _retried:
                    self.logger.error("重新获取Token后仍返回Token失效,放弃重试")
                    return None
                self.logger.warning("Token失效重新获取Token")
                self.jwt_token = None
                return self.get_vps_status(host_id, _retried=True)
            self.logger.error(f"获取VPS {host_id} 状态失败: {result.get('msg', '未知错误')}")
            return None
        except Exception as e:
            self.logger.error(f"获取VPS {host_id} 状态异常: {str(e)}")
            return None

    def power_on_vps(self, host_id, *, _retried=False):
        """Send a power-on command for one VPS.

        Args:
            host_id: Provider-side identifier of the VPS.
            _retried: Internal flag -- True when this call is the single
                retry performed after the API reported an expired token.

        Returns:
            True when the API accepted the command, False otherwise.
        """
        try:
            headers = self.get_headers()
            if not headers:
                return False
            url = f"{self.base_url}/hosts/{host_id}/module/on"
            response = requests.put(url, headers=headers, timeout=10)
            if response.status_code != 200:
                self.logger.error(f"VPS {host_id} 开机请求失败HTTP状态码: {response.status_code}")
                return False
            result = response.json()
            api_status = result.get('status')
            if api_status == 200:
                self.logger.info(f"VPS {host_id} 开机指令发送成功")
                return True
            if api_status == 405:
                # Expired token: refresh and retry exactly once.
                if _retried:
                    self.logger.error("重新获取Token后仍返回Token失效,放弃重试")
                    return False
                self.logger.warning("Token失效重新获取Token")
                self.jwt_token = None
                return self.power_on_vps(host_id, _retried=True)
            self.logger.error(f"VPS {host_id} 开机失败: {result.get('msg', '未知错误')}")
            return False
        except Exception as e:
            self.logger.error(f"VPS {host_id} 开机异常: {str(e)}")
            return False

    def ping_host(self, ip_address):
        """Send a single ICMP echo request to ``ip_address``.

        Returns:
            True when the ``ping`` command exits with status 0; False on a
            non-zero exit, a timeout, or any other error.
        """
        try:
            # Count and timeout flags differ between Windows and other
            # platforms.
            if sys.platform == 'win32':
                cmd = ['ping', '-n', '1', '-w', '2000', ip_address]
            else:
                cmd = ['ping', '-c', '1', '-W', '2', ip_address]
            result = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                timeout=5
            )
            return result.returncode == 0
        except Exception as e:
            self.logger.debug(f"Ping {ip_address} 异常: {str(e)}")
            return False

    def check_http_host(self, domain):
        """Probe ``http://<domain>`` with an HTTP HEAD request.

        Returns:
            True when the final response (redirects followed) has a status
            code below 400; False otherwise or on any request error.
        """
        try:
            response = requests.head(
                f"http://{domain}",
                timeout=5,
                allow_redirects=True
            )
            return response.status_code < 400
        except Exception as e:
            self.logger.debug(f"HTTP检测 {domain} 异常: {str(e)}")
            return False

    def detect_hosts(self):
        """Run the reachability probe selected by the ``WAY`` setting.

        Returns:
            For ``ping``: a list of host-info dicts that did not answer.
            For ``http``: a list of domain strings that failed the check.
            An empty list for an unsupported ``WAY`` value.
        """
        way = self.config.get('WAY', 'ping')
        if way == 'ping':
            return self.detect_by_ping()
        if way == 'http':
            return self.detect_by_http()
        self.logger.error(f"不支持的检测方式: {way}")
        return []

    def detect_by_ping(self):
        """Ping every VPS returned by the API and collect the silent ones.

        Hosts without an IP are skipped; unreachable hosts listed in
        ``EXCEPTION_IPS`` are logged and left out of the result.

        Returns:
            A list of dicts with keys ``id``, ``ip``, ``domain`` and
            ``product_name`` for each unreachable VPS.
        """
        self.logger.info("开始Ping检测所有VPS...")
        unreachable_hosts = []

        vps_data = self.get_vps_list()
        if not vps_data or 'host' not in vps_data:
            self.logger.error("无法获取VPS列表")
            return []

        for host in vps_data['host']:
            host_id = host['id']
            ip = host.get('dedicatedip', '')
            if not ip:
                self.logger.warning(f"VPS {host_id} 没有IP地址跳过")
                continue
            self.logger.debug(f"正在Ping检测: {ip} (ID: {host_id})")
            if self.ping_host(ip):
                continue
            # Excluded IPs are expected to be unreachable (or must not be
            # touched), so they never enter the power-on flow.
            if ip in self.exception_ips:
                self.logger.info(f"VPS {host_id} ({ip}) Ping不通但属于例外IP跳过")
                continue
            self.logger.warning(f"VPS {host_id} ({ip}) Ping不通")
            unreachable_hosts.append({
                'id': host_id,
                'ip': ip,
                'domain': host.get('domain', ''),
                'product_name': host.get('product_name', '')
            })

        if unreachable_hosts:
            self.logger.warning(f"发现 {len(unreachable_hosts)} 台VPS无法Ping通")
        else:
            self.logger.info("所有VPS Ping检测正常")
        return unreachable_hosts

    def detect_by_http(self):
        """HTTP-check every configured domain and collect the failing ones.

        ``DOMAIN`` may be a comma-separated string or a YAML list.

        Returns:
            A list of domain strings that failed ``check_http_host``.
        """
        configured = self.config.get('DOMAIN', '')
        if not configured:
            self.logger.error("未配置DOMAIN")
            return []

        # Accept both "a.com, b.com" and a YAML sequence of domains.
        if isinstance(configured, str):
            candidates = configured.split(',')
        else:
            candidates = [str(item) for item in configured]
        domains = [d.strip() for d in candidates if d.strip()]

        self.logger.info(f"开始HTTP检测 {len(domains)} 个域名...")
        unreachable_domains = []
        for domain in domains:
            self.logger.debug(f"正在HTTP检测: {domain}")
            if not self.check_http_host(domain):
                self.logger.warning(f"域名 {domain} HTTP检测失败")
                unreachable_domains.append(domain)

        if unreachable_domains:
            self.logger.warning(f"发现 {len(unreachable_domains)} 个域名访问异常: {', '.join(unreachable_domains)}")
        else:
            self.logger.info("所有域名HTTP检测正常")
        return unreachable_domains

    def _hosts_for_domains(self, unreachable_domains):
        """Translate failing domains into host-info dicts via the VPS list.

        Domains with no matching VPS, or whose VPS IP is in
        ``EXCEPTION_IPS``, are logged and dropped.

        Returns:
            A list of host-info dicts, or ``None`` when the VPS list could
            not be fetched.
        """
        self.logger.info(f"开始查找 {len(unreachable_domains)} 个异常域名对应的VPS...")
        vps_data = self.get_vps_list()
        if not vps_data or 'host' not in vps_data:
            self.logger.error("无法获取VPS列表")
            return None

        # Map each non-empty domain to its VPS record (last one wins).
        domain_to_vps = {}
        for host in vps_data['host']:
            domain = host.get('domain', '')
            if domain:
                domain_to_vps[domain] = host

        matched_hosts = []
        for domain in unreachable_domains:
            host = domain_to_vps.get(domain)
            if host is None:
                self.logger.warning(f"域名 {domain} 未找到对应的VPS跳过")
                continue
            ip = host.get('dedicatedip', '')
            if ip and ip in self.exception_ips:
                self.logger.info(f"域名 {domain} 对应的VPS {host['id']} ({ip}) 属于例外IP跳过")
                continue
            matched_hosts.append({
                'id': host['id'],
                'ip': ip,
                'domain': domain,
                'product_name': host.get('product_name', '')
            })
        return matched_hosts

    def check_and_power_on(self, unreachable_hosts_or_domains):
        """Check the API status of unreachable targets and power on the
        ones that are not running.

        Args:
            unreachable_hosts_or_domains: Result of ``detect_hosts``:
                in ping mode ``[{'id': ..., 'ip': ..., ...}, ...]``,
                in http mode ``['domain1.com', 'domain2.com', ...]``.
        """
        if not unreachable_hosts_or_domains:
            self.logger.info("未发现需要处理的异常情况")
            return

        way = self.config.get('WAY', 'ping')
        if way == 'http':
            # HTTP mode only knows domains; resolve them to VPS records.
            unreachable_hosts = self._hosts_for_domains(unreachable_hosts_or_domains)
            if unreachable_hosts is None:
                return
        else:
            # Ping mode already delivers host-info dicts.
            unreachable_hosts = unreachable_hosts_or_domains

        if not unreachable_hosts:
            self.logger.info("所有异常的VPS都属于例外IP或找不到对应VPS无需操作")
            return

        self.logger.info(f"开始检查 {len(unreachable_hosts)} 台VPS的实际状态...")
        need_power_on = []
        all_are_on = True
        for host_info in unreachable_hosts:
            host_id = host_info['id']
            self.logger.info(f"检查VPS {host_id} ({host_info.get('domain', '')}) 的实际状态...")
            status_data = self.get_vps_status(host_id)
            if not status_data:
                # Unknown status: do not power on blindly, but also do not
                # claim that everything is running.
                self.logger.error(f"无法获取VPS {host_id} 的状态")
                all_are_on = False
                continue
            status = status_data.get('status', 'unknown')
            des = status_data.get('des', '未知')
            if status == 'on':
                self.logger.info(f"VPS {host_id} 实际状态: {des} (开机中)")
            else:
                self.logger.warning(f"VPS {host_id} 实际状态: {des} (关机)")
                need_power_on.append(host_info)
                all_are_on = False

        # Everything is running: the probe failure has another cause.
        if all_are_on:
            self.logger.info(
                "检测到所有VPS均为开机状态可能是禁Ping、CDN缓存或网站临时异常无需操作"
            )
            return

        if need_power_on:
            self.logger.info(f"开始对 {len(need_power_on)} 台VPS执行开机操作...")
            for host_info in need_power_on:
                host_id = host_info['id']
                self.logger.info(f"正在开启VPS {host_id} ({host_info.get('domain', '')})...")
                self.power_on_vps(host_id)

            # Give the machines time to boot before asking the API again.
            wait_seconds = self.POWER_ON_WAIT_SECONDS
            self.logger.info(f"等待{wait_seconds}秒后验证开机结果...")
            time.sleep(wait_seconds)
            # Verify, re-sending the command at most twice.
            self.verify_power_on_result(need_power_on, max_retries=2)

    def verify_power_on_result(self, hosts_to_verify, max_retries=2):
        """Confirm through the API that the given VPS are now running.

        Hosts still not reported as ``on`` get the power-on command again,
        up to ``max_retries`` times, waiting ``POWER_ON_WAIT_SECONDS``
        between a retry and the next check. Every power-on attempt is
        followed by a verification, so a VPS that comes up on the last
        retry is recognised as started.

        Args:
            hosts_to_verify: Host-info dicts (each with at least ``id``).
            max_retries: Maximum number of additional power-on attempts.
        """
        pending = list(hosts_to_verify)
        self.retry_count = 0
        try:
            while True:
                still_off = []
                for host_info in pending:
                    host_id = host_info['id']
                    self.logger.info(f"验证VPS {host_id} 开机状态...")
                    status_data = self.get_vps_status(host_id)
                    if status_data and status_data.get('status') == 'on':
                        self.logger.info(f"✅ VPS {host_id} 开机成功")
                    else:
                        self.logger.warning(f"❌ VPS {host_id} 仍未开机")
                        still_off.append(host_info)

                if not still_off:
                    self.logger.info("所有VPS开机验证完成")
                    return

                if self.retry_count >= max_retries:
                    self.logger.warning(f"已达到最大重试次数({max_retries})仍有VPS未成功开机")
                    for host_info in still_off:
                        self.logger.warning(
                            f"VPS {host_info['id']} ({host_info.get('domain', '')}) 开机失败"
                        )
                    return

                self.retry_count += 1
                self.logger.info(f"还有 {len(still_off)} 台VPS未开机进行第{self.retry_count}次重试...")
                for host_info in still_off:
                    self.power_on_vps(host_info['id'])
                wait_seconds = self.POWER_ON_WAIT_SECONDS
                self.logger.info(f"等待{wait_seconds}秒后再次验证...")
                time.sleep(wait_seconds)
                pending = still_off
        finally:
            # Always leave the counter clean for the next monitoring cycle,
            # even if an unexpected error interrupts the verification.
            self.retry_count = 0

    def run_once(self):
        """Run one monitoring cycle: probe, then repair if needed.

        Any exception is logged with its traceback and swallowed so that
        the main loop keeps running.
        """
        try:
            self.logger.info("\n" + "=" * 60)
            self.logger.info(f"开始第 {int(time.time())} 时间戳的监控循环")
            self.logger.info("=" * 60)

            # Reachability probe (ping mode fetches the VPS list via the API;
            # http mode makes no API call at this stage).
            unreachable = self.detect_hosts()

            # The status / power-on API flow runs only when a probe failed.
            if unreachable:
                self.logger.info("检测到异常开始调用API进行进一步检查和开机...")
                self.check_and_power_on(unreachable)
            else:
                self.logger.info("所有检测正常无需调用API")
            self.logger.info("本次监控循环完成\n")
        except Exception as e:
            self.logger.error(f"监控循环异常: {str(e)}", exc_info=True)

    def run(self):
        """Run monitoring cycles forever, sleeping ``SPAN`` seconds between them.

        Returns normally on Ctrl+C; exits the process with status 1 on any
        other unexpected error in the loop.
        """
        span = self.config.get('SPAN', 300)
        self.logger.info(f"监控程序开始运行,间隔 {span}")
        try:
            while True:
                self.run_once()
                self.logger.info(f"等待 {span} 秒后进行下一次检测...")
                time.sleep(span)
        except KeyboardInterrupt:
            self.logger.info("收到中断信号,程序退出")
        except Exception as e:
            self.logger.error(f"程序运行异常: {str(e)}", exc_info=True)
            sys.exit(1)
def main():
    """Program entry point: build the monitor and start its endless loop.

    Any exception escaping construction or the loop is printed and the
    process exits with status 1.
    """
    try:
        IDCMonitor().run()
    except Exception as exc:
        print(f"程序启动失败: {exc!s}")
        sys.exit(1)
# Start the monitor only when this file is executed as a script.
if __name__ == "__main__":
    main()