#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 核云IDC VPS自动监测重启程序 功能:实时监测服务器状态,发现关机自动开机 """ import os import sys import time import yaml import json import logging import subprocess import requests from datetime import datetime from pathlib import Path class IDCMonitor: """核云IDC监控器""" def __init__(self, config_path=None): """初始化监控器""" # 获取配置文件路径 if config_path is None: config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.yml') self.config_path = config_path self.config = {} self.base_url = "https://www.heyunidc.cn/v1" self.jwt_token = None self.retry_count = 0 # 开机重试次数 # 创建日志目录 log_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logs') os.makedirs(log_dir, exist_ok=True) # 配置日志 self.setup_logging(log_dir) # 加载配置 self.load_config() self.logger.info("=" * 60) self.logger.info("核云IDC VPS监控程序启动") self.logger.info(f"监控方式: {self.config.get('WAY', 'ping')}") self.logger.info(f"监控间隔: {self.config.get('SPAN', 300)}秒") self.logger.info("=" * 60) def setup_logging(self, log_dir): """配置日志系统""" # 创建logger self.logger = logging.getLogger('IDCMonitor') self.logger.setLevel(logging.DEBUG) # 清除已有handler self.logger.handlers.clear() # 正常日志文件 normal_log = os.path.join(log_dir, 'monitor.log') normal_handler = logging.FileHandler(normal_log, encoding='utf-8') normal_handler.setLevel(logging.INFO) normal_format = logging.Formatter( '%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) normal_handler.setFormatter(normal_format) self.logger.addHandler(normal_handler) # 异常日志文件 error_log = os.path.join(log_dir, 'error.log') error_handler = logging.FileHandler(error_log, encoding='utf-8') error_handler.setLevel(logging.WARNING) error_format = logging.Formatter( '%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) error_handler.setFormatter(error_format) self.logger.addHandler(error_handler) # 控制台输出 console_handler = logging.StreamHandler(sys.stdout) console_handler.setLevel(logging.INFO) console_format = logging.Formatter( '%(asctime)s - %(levelname)s - %(message)s', datefmt='%H:%M:%S' ) console_handler.setFormatter(console_format) self.logger.addHandler(console_handler) def load_config(self): """加载配置文件""" try: with open(self.config_path, 'r', encoding='utf-8') as f: self.config = yaml.safe_load(f) # 验证必要配置 required_keys = ['ACCOUNT', 'API_KEY', 'WAY'] for key in required_keys: if key not in self.config or not self.config[key]: raise ValueError(f"配置文件中缺少必要项: {key}") # 设置默认值 self.config.setdefault('SPAN', 300) self.config.setdefault('JWT', '') self.config.setdefault('EXCEPTION_IPS', []) # 如果WAY为http,检查DOMAIN if self.config['WAY'] == 'http': if 'DOMAIN' not in self.config or not self.config['DOMAIN']: raise ValueError("WAY为http时,必须配置DOMAIN") # 加载JWT token if self.config.get('JWT'): self.jwt_token = self.config['JWT'] self.logger.info("已加载缓存的JWT Token") # 加载例外IP列表 self.exception_ips = self.config.get('EXCEPTION_IPS', []) if self.exception_ips: self.logger.info(f"已加载 {len(self.exception_ips)} 个例外IP") self.logger.info("配置文件加载成功") except FileNotFoundError: self.logger.error(f"配置文件不存在: {self.config_path}") sys.exit(1) except Exception as e: self.logger.error(f"配置文件加载失败: {str(e)}") sys.exit(1) def save_jwt_token(self, token): """保存JWT Token到配置文件""" try: self.config['JWT'] = token with open(self.config_path, 'w', encoding='utf-8') as f: yaml.dump(self.config, f, allow_unicode=True, default_flow_style=False) self.jwt_token = token self.logger.debug("JWT Token已保存到配置文件") except Exception as e: self.logger.error(f"保存JWT Token失败: {str(e)}") def get_login_token(self): """获取登录Token""" try: url = f"{self.base_url}/login_api" data = { 'account': self.config['ACCOUNT'], 'password': self.config['API_KEY'] } response = requests.post(url, data=data, timeout=10) if response.status_code == 200: result = response.json() if result.get('status') == 200 and 'jwt' in result: token = result['jwt'] self.save_jwt_token(token) self.logger.info("成功获取新的JWT Token") return token else: self.logger.error(f"登录失败: {result.get('msg', '未知错误')}") return None else: self.logger.error(f"登录请求失败,HTTP状态码: {response.status_code}") return None except Exception as e: self.logger.error(f"获取Token异常: {str(e)}") return None def get_headers(self): """获取请求头(包含JWT)""" if not self.jwt_token: self.jwt_token = self.get_login_token() if not self.jwt_token: return None return { 'Authorization': f'JWT {self.jwt_token}', 'Content-Type': 'application/json' } def get_vps_list(self): """获取VPS列表""" try: headers = self.get_headers() if not headers: return None url = f"{self.base_url}/hosts?page=1&limit=100" response = requests.get(url, headers=headers, timeout=10) if response.status_code == 200: result = response.json() if result.get('status') == 200: self.logger.debug(f"成功获取VPS列表,共{result['data']['total']}台") return result['data'] elif result.get('status') == 405: self.logger.warning("Token失效,重新获取Token") self.jwt_token = None return self.get_vps_list() # 递归调用重试 else: self.logger.error(f"获取VPS列表失败: {result.get('msg', '未知错误')}") return None else: self.logger.error(f"获取VPS列表请求失败,HTTP状态码: {response.status_code}") return None except Exception as e: self.logger.error(f"获取VPS列表异常: {str(e)}") return None def get_vps_status(self, host_id): """获取指定VPS的状态""" try: headers = self.get_headers() if not headers: return None url = f"{self.base_url}/hosts/{host_id}/module/status?type=host" response = requests.get(url, headers=headers, timeout=10) if response.status_code == 200: result = response.json() if result.get('status') == 200: return result['data'] elif result.get('status') == 405: self.logger.warning("Token失效,重新获取Token") self.jwt_token = None return self.get_vps_status(host_id) # 递归调用重试 else: self.logger.error(f"获取VPS {host_id} 状态失败: {result.get('msg', '未知错误')}") return None else: self.logger.error(f"获取VPS {host_id} 状态请求失败,HTTP状态码: {response.status_code}") return None except Exception as e: self.logger.error(f"获取VPS {host_id} 状态异常: {str(e)}") return None def power_on_vps(self, host_id): """开机指定VPS""" try: headers = self.get_headers() if not headers: return False url = f"{self.base_url}/hosts/{host_id}/module/on" response = requests.put(url, headers=headers, timeout=10) if response.status_code == 200: result = response.json() if result.get('status') == 200: self.logger.info(f"VPS {host_id} 开机指令发送成功") return True elif result.get('status') == 405: self.logger.warning("Token失效,重新获取Token") self.jwt_token = None return self.power_on_vps(host_id) # 递归调用重试 else: self.logger.error(f"VPS {host_id} 开机失败: {result.get('msg', '未知错误')}") return False else: self.logger.error(f"VPS {host_id} 开机请求失败,HTTP状态码: {response.status_code}") return False except Exception as e: self.logger.error(f"VPS {host_id} 开机异常: {str(e)}") return False def ping_host(self, ip_address): """Ping检测主机是否存活""" try: # Windows和Linux的ping命令参数不同 if sys.platform == 'win32': cmd = ['ping', '-n', '1', '-w', '2000', ip_address] else: cmd = ['ping', '-c', '1', '-W', '2', ip_address] result = subprocess.run( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=5 ) return result.returncode == 0 except Exception as e: self.logger.debug(f"Ping {ip_address} 异常: {str(e)}") return False def check_http_host(self, domain): """HTTP HEAD检测域名是否存活""" try: response = requests.head( f"http://{domain}", timeout=5, allow_redirects=True ) return response.status_code < 400 except Exception as e: self.logger.debug(f"HTTP检测 {domain} 异常: {str(e)}") return False def detect_hosts(self): """检测所有主机存活状态""" way = self.config.get('WAY', 'ping') if way == 'ping': return self.detect_by_ping() elif way == 'http': return self.detect_by_http() else: self.logger.error(f"不支持的检测方式: {way}") return [] def detect_by_ping(self): """通过Ping检测主机""" self.logger.info("开始Ping检测所有VPS...") vps_data = self.get_vps_list() if not vps_data or 'host' not in vps_data: self.logger.error("无法获取VPS列表") return [] unreachable_hosts = [] for host in vps_data['host']: host_id = host['id'] ip = host.get('dedicatedip', '') if not ip: self.logger.warning(f"VPS {host_id} 没有IP地址,跳过") continue self.logger.debug(f"正在Ping检测: {ip} (ID: {host_id})") if not self.ping_host(ip): # 检查是否为例外IP if ip in self.exception_ips: self.logger.info(f"VPS {host_id} ({ip}) Ping不通,但属于例外IP,跳过") continue self.logger.warning(f"VPS {host_id} ({ip}) Ping不通") unreachable_hosts.append({ 'id': host_id, 'ip': ip, 'domain': host.get('domain', ''), 'product_name': host.get('product_name', '') }) if unreachable_hosts: self.logger.warning(f"发现 {len(unreachable_hosts)} 台VPS无法Ping通") else: self.logger.info("所有VPS Ping检测正常") return unreachable_hosts def detect_by_http(self): """通过HTTP检测域名""" domains_str = self.config.get('DOMAIN', '') if not domains_str: self.logger.error("未配置DOMAIN") return [] domains = [d.strip() for d in domains_str.split(',') if d.strip()] self.logger.info(f"开始HTTP检测 {len(domains)} 个域名...") # 获取VPS列表建立域名到ID的映射 vps_data = self.get_vps_list() if not vps_data or 'host' not in vps_data: self.logger.error("无法获取VPS列表") return [] # 建立域名到VPS ID的映射 domain_to_vps = {} for host in vps_data['host']: domain = host.get('domain', '') if domain: domain_to_vps[domain] = host unreachable_hosts = [] for domain in domains: self.logger.debug(f"正在HTTP检测: {domain}") if not self.check_http_host(domain): self.logger.warning(f"域名 {domain} HTTP检测失败") # 查找对应的VPS if domain in domain_to_vps: host = domain_to_vps[domain] ip = host.get('dedicatedip', '') # 检查是否为例外IP if ip and ip in self.exception_ips: self.logger.info(f"域名 {domain} 对应的VPS {host['id']} ({ip}) 属于例外IP,跳过") continue unreachable_hosts.append({ 'id': host['id'], 'ip': ip, 'domain': domain, 'product_name': host.get('product_name', '') }) else: self.logger.warning(f"域名 {domain} 未找到对应的VPS") if unreachable_hosts: self.logger.warning(f"发现 {len(unreachable_hosts)} 个域名访问异常") else: self.logger.info("所有域名HTTP检测正常") return unreachable_hosts def check_and_power_on(self, unreachable_hosts): """检查并开机无法访问的VPS""" if not unreachable_hosts: self.logger.info("未发现需要开机的VPS") return self.logger.info(f"开始检查 {len(unreachable_hosts)} 台VPS的实际状态...") need_power_on = [] all_are_on = True for host_info in unreachable_hosts: host_id = host_info['id'] self.logger.info(f"检查VPS {host_id} ({host_info.get('domain', '')}) 的实际状态...") status_data = self.get_vps_status(host_id) if status_data: status = status_data.get('status', 'unknown') des = status_data.get('des', '未知') if status == 'on': self.logger.info(f"VPS {host_id} 实际状态: {des} (开机中)") else: self.logger.warning(f"VPS {host_id} 实际状态: {des} (关机)") need_power_on.append(host_info) all_are_on = False else: self.logger.error(f"无法获取VPS {host_id} 的状态") all_are_on = False # 如果所有VPS都是开机状态,记录日志 if all_are_on: self.logger.info( "检测到所有VPS均为开机状态,可能是禁Ping或网站临时异常,无需操作" ) return # 对需要开机的VPS执行开机操作 if need_power_on: self.logger.info(f"开始对 {len(need_power_on)} 台VPS执行开机操作...") for host_info in need_power_on: host_id = host_info['id'] self.logger.info(f"正在开启VPS {host_id} ({host_info.get('domain', '')})...") self.power_on_vps(host_id) # 等待60秒后验证开机结果 self.logger.info("等待60秒后验证开机结果...") time.sleep(60) # 验证开机结果,最多尝试2次 self.verify_power_on_result(need_power_on, max_retries=2) def verify_power_on_result(self, hosts_to_verify, max_retries=2): """验证开机结果""" self.retry_count += 1 if self.retry_count > max_retries: self.logger.warning(f"已达到最大重试次数({max_retries}),仍有VPS未成功开机") for host_info in hosts_to_verify: self.logger.warning( f"VPS {host_info['id']} ({host_info.get('domain', '')}) 开机失败" ) self.retry_count = 0 return still_off = [] for host_info in hosts_to_verify: host_id = host_info['id'] self.logger.info(f"验证VPS {host_id} 开机状态...") status_data = self.get_vps_status(host_id) if status_data and status_data.get('status') == 'on': self.logger.info(f"✅ VPS {host_id} 开机成功") else: self.logger.warning(f"❌ VPS {host_id} 仍未开机") still_off.append(host_info) if still_off: self.logger.info(f"还有 {len(still_off)} 台VPS未开机,进行第{self.retry_count}次重试...") for host_info in still_off: self.power_on_vps(host_info['id']) self.logger.info("等待60秒后再次验证...") time.sleep(60) self.verify_power_on_result(still_off, max_retries) else: self.logger.info("所有VPS开机验证完成") self.retry_count = 0 def run_once(self): """执行一次监控循环""" try: self.logger.info("\n" + "=" * 60) self.logger.info(f"开始第 {int(time.time())} 时间戳的监控循环") self.logger.info("=" * 60) # 检测主机 unreachable_hosts = self.detect_hosts() # 检查并开机 self.check_and_power_on(unreachable_hosts) self.logger.info("本次监控循环完成\n") except Exception as e: self.logger.error(f"监控循环异常: {str(e)}", exc_info=True) def run(self): """运行监控程序(主循环)""" span = self.config.get('SPAN', 300) self.logger.info(f"监控程序开始运行,间隔 {span} 秒") try: while True: self.run_once() self.logger.info(f"等待 {span} 秒后进行下一次检测...") time.sleep(span) except KeyboardInterrupt: self.logger.info("收到中断信号,程序退出") except Exception as e: self.logger.error(f"程序运行异常: {str(e)}", exc_info=True) sys.exit(1) def main(): """主函数""" try: monitor = IDCMonitor() monitor.run() except Exception as e: print(f"程序启动失败: {str(e)}") sys.exit(1) if __name__ == '__main__': main()