#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ VPS Hub 数据库管理模块 负责初始化和管理三个SQLite数据库: vps.db, vpslist.db, status.db """ import os import sqlite3 from datetime import datetime class DatabaseManager: """数据库管理器""" def __init__(self, db_dir=None): """初始化数据库管理器 Args: db_dir: 数据库文件目录,默认为app/db/ """ if db_dir is None: db_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'db') self.db_dir = db_dir os.makedirs(db_dir, exist_ok=True) # 数据库文件路径 self.vps_db = os.path.join(db_dir, 'vps.db') self.vpslist_db = os.path.join(db_dir, 'vpslist.db') self.status_db = os.path.join(db_dir, 'status.db') # 初始化所有数据库 self.init_vps_db() self.init_vpslist_db() self.init_status_db() def get_connection(self, db_path): """获取数据库连接 Args: db_path: 数据库文件路径 Returns: SQLite连接对象 """ conn = sqlite3.connect(db_path) conn.row_factory = sqlite3.Row # 使结果可以通过列名访问 return conn def init_vps_db(self): """初始化vps.db - VPS配置表""" conn = self.get_connection(self.vps_db) cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS configs ( id INTEGER PRIMARY KEY AUTOINCREMENT, api_label TEXT NOT NULL UNIQUE, site_type TEXT NOT NULL, site_url TEXT, account TEXT NOT NULL, api_key TEXT NOT NULL, auto_monitor BOOLEAN DEFAULT 1, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') conn.commit() conn.close() def init_vpslist_db(self): """初始化vpslist.db - VPS列表缓存表""" conn = self.get_connection(self.vpslist_db) cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS vps_list ( id INTEGER PRIMARY KEY AUTOINCREMENT, config_id INTEGER NOT NULL, vps_id INTEGER NOT NULL, domain TEXT, ip_address TEXT, product_name TEXT, cpu_cores INTEGER, memory_size TEXT, disk_size TEXT, bandwidth TEXT, os_type TEXT, status TEXT, section BOOLEAN DEFAULT 0, last_check TIMESTAMP, FOREIGN KEY (config_id) REFERENCES configs(id), UNIQUE(config_id, vps_id) ) ''') # 创建索引以提高查询性能 cursor.execute('CREATE INDEX IF NOT EXISTS idx_vps_config ON vps_list(config_id)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_vps_vps_id ON vps_list(vps_id)') conn.commit() conn.close() def init_status_db(self): """初始化status.db - Ping状态记录和摘要统计表""" conn = self.get_connection(self.status_db) cursor = conn.cursor() # Ping状态记录表 cursor.execute(''' CREATE TABLE IF NOT EXISTS ping_status ( id INTEGER PRIMARY KEY AUTOINCREMENT, vps_id INTEGER NOT NULL, target TEXT NOT NULL, status TEXT NOT NULL, latency_ms REAL, check_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # VPS摘要统计表 cursor.execute(''' CREATE TABLE IF NOT EXISTS vps_summary ( id INTEGER PRIMARY KEY AUTOINCREMENT, vps_id INTEGER NOT NULL, date DATE NOT NULL, avg_latency_ms REAL, max_latency_ms REAL, min_latency_ms REAL, count_under_100 INTEGER DEFAULT 0, count_100_to_300 INTEGER DEFAULT 0, count_300_to_500 INTEGER DEFAULT 0, count_abnormal INTEGER DEFAULT 0, availability TEXT, UNIQUE(vps_id, date) ) ''') # 创建索引 cursor.execute('CREATE INDEX IF NOT EXISTS idx_ping_vps ON ping_status(vps_id)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_ping_time ON ping_status(check_time)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_summary_vps ON vps_summary(vps_id)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_summary_date ON vps_summary(date)') conn.commit() conn.close() # ==================== vps.db 操作 ==================== def add_config(self, api_label, site_type, account, api_key, site_url=None, auto_monitor=True): """添加VPS配置 Args: api_label: API标识(必填,唯一) site_type: 网站类型 (mofang/aliyun/tencent) account: 账户 api_key: API密钥 site_url: 网站链接 auto_monitor: 是否开启自动监控 Returns: 新配置的ID """ conn = self.get_connection(self.vps_db) cursor = conn.cursor() try: cursor.execute(''' INSERT INTO configs (api_label, site_type, site_url, account, api_key, auto_monitor) VALUES (?, ?, ?, ?, ?, ?) ''', (api_label, site_type, site_url, account, api_key, auto_monitor)) config_id = cursor.lastrowid conn.commit() return config_id except Exception as e: conn.rollback() raise e finally: conn.close() def get_all_configs(self): """获取所有配置 Returns: 配置列表 """ conn = self.get_connection(self.vps_db) cursor = conn.cursor() cursor.execute('SELECT * FROM configs ORDER BY id') configs = [dict(row) for row in cursor.fetchall()] conn.close() return configs def get_config_by_id(self, config_id): """根据ID获取配置 Args: config_id: 配置ID Returns: 配置字典或None """ conn = self.get_connection(self.vps_db) cursor = conn.cursor() cursor.execute('SELECT * FROM configs WHERE id = ?', (config_id,)) row = cursor.fetchone() conn.close() return dict(row) if row else None def update_config(self, config_id, **kwargs): """更新配置 Args: config_id: 配置ID **kwargs: 要更新的字段 Returns: 是否成功 """ if not kwargs: return False conn = self.get_connection(self.vps_db) cursor = conn.cursor() # 构建UPDATE语句 fields = ', '.join([f"{key} = ?" for key in kwargs.keys()]) values = list(kwargs.values()) values.append(config_id) cursor.execute(f'UPDATE configs SET {fields}, updated_at = CURRENT_TIMESTAMP WHERE id = ?', values) affected = cursor.rowcount conn.commit() conn.close() return affected > 0 def delete_config(self, config_id): """删除配置 Args: config_id: 配置ID Returns: 是否成功 """ conn = self.get_connection(self.vps_db) cursor = conn.cursor() cursor.execute('DELETE FROM configs WHERE id = ?', (config_id,)) affected = cursor.rowcount conn.commit() conn.close() return affected > 0 # ==================== vpslist.db 操作 ==================== def add_vps(self, config_id, vps_id, domain=None, ip_address=None, product_name=None, section=False): """添加VPS到列表 Args: config_id: 配置ID vps_id: VPS在平台的ID domain: 域名 ip_address: IP地址 product_name: 产品名称 section: 是否标记为需要监控 Returns: 新记录的ID """ conn = self.get_connection(self.vpslist_db) cursor = conn.cursor() cursor.execute(''' INSERT INTO vps_list (config_id, vps_id, domain, ip_address, product_name, section, last_check) VALUES (?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP) ''', (config_id, vps_id, domain, ip_address, product_name, section)) record_id = cursor.lastrowid conn.commit() conn.close() return record_id def batch_add_vps(self, vps_list): """批量添加VPS Args: vps_list: VPS信息列表,每个元素是字典 """ conn = self.get_connection(self.vpslist_db) cursor = conn.cursor() for vps in vps_list: cursor.execute(''' INSERT OR REPLACE INTO vps_list (config_id, vps_id, domain, ip_address, product_name, cpu_cores, memory_size, disk_size, bandwidth, os_type, section, last_check) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP) ''', ( vps['config_id'], vps['vps_id'], vps.get('domain'), vps.get('ip_address'), vps.get('product_name'), vps.get('cpu_cores'), vps.get('memory_size'), vps.get('disk_size'), vps.get('bandwidth'), vps.get('os_type'), vps.get('section', False) )) conn.commit() conn.close() def get_all_vps(self): """获取所有VPS Returns: VPS列表 """ conn = self.get_connection(self.vpslist_db) cursor = conn.cursor() cursor.execute('SELECT * FROM vps_list ORDER BY config_id, vps_id') vps_list = [dict(row) for row in cursor.fetchall()] conn.close() return vps_list def get_vps_by_config(self, config_id): """根据配置ID获取VPS列表 Args: config_id: 配置ID Returns: VPS列表 """ conn = self.get_connection(self.vpslist_db) cursor = conn.cursor() cursor.execute('SELECT * FROM vps_list WHERE config_id = ? ORDER BY vps_id', (config_id,)) vps_list = [dict(row) for row in cursor.fetchall()] conn.close() return vps_list def update_vps_details(self, vps_id, **kwargs): """更新VPS详细信息 Args: vps_id: VPS ID **kwargs: 要更新的字段 Returns: 是否成功 """ if not kwargs: return False conn = self.get_connection(self.vpslist_db) cursor = conn.cursor() fields = ', '.join([f"{key} = ?" for key in kwargs.keys()]) values = list(kwargs.values()) values.append(vps_id) cursor.execute(f'UPDATE vps_list SET {fields}, last_check = CURRENT_TIMESTAMP WHERE vps_id = ?', values) affected = cursor.rowcount conn.commit() conn.close() return affected > 0 def update_vps_status(self, vps_id, status): """更新VPS状态 Args: vps_id: VPS ID status: 状态 (on/off/unknown) Returns: 是否成功 """ return self.update_vps_details(vps_id, status=status) def get_monitored_vps(self): """获取所有标记为需要监控的VPS Returns: VPS列表 """ conn = self.get_connection(self.vpslist_db) cursor = conn.cursor() cursor.execute('SELECT * FROM vps_list WHERE section = 1 ORDER BY config_id, vps_id') vps_list = [dict(row) for row in cursor.fetchall()] conn.close() return vps_list # ==================== status.db 操作 ==================== def save_ping_status(self, vps_id, target, status, latency_ms=None): """保存Ping状态记录 Args: vps_id: VPS ID target: 目标(IP或域名) status: 状态 (normal/abnormal) latency_ms: 延迟(ms) """ conn = self.get_connection(self.status_db) cursor = conn.cursor() cursor.execute(''' INSERT INTO ping_status (vps_id, target, status, latency_ms) VALUES (?, ?, ?, ?) ''', (vps_id, target, status, latency_ms)) conn.commit() conn.close() def get_ping_records(self, vps_id, date=None): """获取Ping记录 Args: vps_id: VPS ID date: 日期 (YYYY-MM-DD),为空则获取所有记录 Returns: Ping记录列表 """ conn = self.get_connection(self.status_db) cursor = conn.cursor() if date: cursor.execute(''' SELECT * FROM ping_status WHERE vps_id = ? AND DATE(check_time) = ? ORDER BY check_time ''', (vps_id, date)) else: cursor.execute(''' SELECT * FROM ping_status WHERE vps_id = ? ORDER BY check_time DESC ''', (vps_id,)) records = [dict(row) for row in cursor.fetchall()] conn.close() return records def cleanup_old_ping_records(self, days=30): """清理旧的Ping记录 Args: days: 保留天数,默认30天 """ from datetime import timedelta conn = self.get_connection(self.status_db) cursor = conn.cursor() cutoff_date = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d %H:%M:%S') cursor.execute('DELETE FROM ping_status WHERE check_time < ?', (cutoff_date,)) deleted = cursor.rowcount conn.commit() conn.close() return deleted def save_vps_summary(self, vps_id, date, avg_latency, max_latency, min_latency, count_under_100, count_100_to_300, count_300_to_500, count_abnormal, availability): """保存VPS摘要统计 Args: vps_id: VPS ID date: 日期 (YYYY-MM-DD) avg_latency: 平均延迟 max_latency: 最大延迟 min_latency: 最小延迟 count_under_100: <100ms次数 count_100_to_300: 100-300ms次数 count_300_to_500: 300-500ms次数 count_abnormal: 异常次数 availability: 可用性评分 """ conn = self.get_connection(self.status_db) cursor = conn.cursor() cursor.execute(''' INSERT OR REPLACE INTO vps_summary (vps_id, date, avg_latency_ms, max_latency_ms, min_latency_ms, count_under_100, count_100_to_300, count_300_to_500, count_abnormal, availability) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', (vps_id, date, avg_latency, max_latency, min_latency, count_under_100, count_100_to_300, count_300_to_500, count_abnormal, availability)) conn.commit() conn.close() def get_vps_summary(self, vps_id, date=None): """获取VPS摘要统计 Args: vps_id: VPS ID date: 日期,为空则获取最新一条 Returns: 摘要统计字典或None """ conn = self.get_connection(self.status_db) cursor = conn.cursor() if date: cursor.execute('SELECT * FROM vps_summary WHERE vps_id = ? AND date = ?', (vps_id, date)) else: cursor.execute('SELECT * FROM vps_summary WHERE vps_id = ? ORDER BY date DESC LIMIT 1', (vps_id,)) row = cursor.fetchone() conn.close() return dict(row) if row else None def get_all_summaries(self, date=None): """获取所有VPS的摘要统计 Args: date: 日期,为空则获取最新 Returns: 摘要统计列表 """ conn = self.get_connection(self.status_db) cursor = conn.cursor() if date: cursor.execute('SELECT * FROM vps_summary WHERE date = ? ORDER BY vps_id', (date,)) else: # 获取每个VPS的最新摘要 cursor.execute(''' SELECT vs.* FROM vps_summary vs INNER JOIN ( SELECT vps_id, MAX(date) as max_date FROM vps_summary GROUP BY vps_id ) latest ON vs.vps_id = latest.vps_id AND vs.date = latest.max_date ORDER BY vs.vps_id ''') summaries = [dict(row) for row in cursor.fetchall()] conn.close() return summaries if __name__ == '__main__': # 测试代码 db = DatabaseManager() print("数据库初始化完成") print(f"vps.db: {db.vps_db}") print(f"vpslist.db: {db.vpslist_db}") print(f"status.db: {db.status_db}")