#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""VPS Hub database management module."""

import os
import sqlite3
from contextlib import contextmanager
from datetime import datetime, timedelta, timezone


class DatabaseManager:
    """Manage the three SQLite files used by VPS Hub.

    * ``vps.db``     -- table ``configs`` (provider/API configurations)
    * ``vpslist.db`` -- table ``vps_list`` (cached VPS inventory)
    * ``status.db``  -- tables ``ping_status`` and ``vps_summary``

    Every public method opens its own short-lived connection; the connection
    is committed on success, rolled back on error and always closed.
    """

    # Columns written when a VPS row is inserted (``id`` and ``last_check``
    # are handled separately by ``_insert_vps``).
    _VPS_INSERT_COLUMNS = (
        'config_id', 'vps_id', 'domain', 'ip_address', 'product_name',
        'cpu_cores', 'memory_size', 'disk_size', 'bandwidth', 'os_type',
        'amount', 'nextduedate', 'section',
    )

    def __init__(self, db_dir=None):
        """Create the database directory (if needed) and all tables.

        Args:
            db_dir: Directory holding the ``.db`` files. Defaults to a ``db``
                directory next to this module.
        """
        if db_dir is None:
            db_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'db')
        self.db_dir = db_dir
        os.makedirs(db_dir, exist_ok=True)

        self.vps_db = os.path.join(db_dir, 'vps.db')
        self.vpslist_db = os.path.join(db_dir, 'vpslist.db')
        self.status_db = os.path.join(db_dir, 'status.db')

        self.init_vps_db()
        self.init_vpslist_db()
        self.init_status_db()

    # ==================== connection helpers ====================

    def get_connection(self, db_path):
        """Return a new connection to ``db_path`` whose rows are ``sqlite3.Row``.

        The caller owns the connection and must close it.
        """
        conn = sqlite3.connect(db_path)
        conn.row_factory = sqlite3.Row
        return conn

    @contextmanager
    def _session(self, db_path):
        """Yield a connection; commit on success, roll back on error, always close."""
        conn = self.get_connection(db_path)
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def _update_columns(self, db_path, table, touch_column, where_column,
                        where_value, changes):
        """Run ``UPDATE table SET <changes>, touch_column = now WHERE where_column = ?``.

        Column names cannot be bound as SQL parameters, so every key of
        ``changes`` is checked against the table's real columns before it is
        placed into the statement.

        Returns:
            True if at least one row was updated.

        Raises:
            sqlite3.OperationalError: if a key is not a column of ``table``.
        """
        with self._session(db_path) as conn:
            cursor = conn.cursor()
            known = {
                row['name'].lower()
                for row in cursor.execute(f'PRAGMA table_info({table})')
            }
            for name in changes:
                if str(name).lower() not in known:
                    raise sqlite3.OperationalError(f'no such column: {name}')

            assignments = ', '.join(f'{name} = ?' for name in changes)
            cursor.execute(
                f'UPDATE {table} SET {assignments}, '
                f'{touch_column} = CURRENT_TIMESTAMP WHERE {where_column} = ?',
                [*changes.values(), where_value],
            )
            return cursor.rowcount > 0

    # ==================== schema ====================

    def init_vps_db(self):
        """Initialise vps.db: create the ``configs`` table if it is missing."""
        with self._session(self.vps_db) as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS configs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    api_label TEXT NOT NULL UNIQUE,
                    site_type TEXT NOT NULL,
                    site_url TEXT,
                    account TEXT NOT NULL,
                    api_key TEXT NOT NULL,
                    auto_monitor BOOLEAN DEFAULT 1,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    UNIQUE(site_url, account)
                )
            ''')

    def init_vpslist_db(self):
        """Initialise vpslist.db: create the ``vps_list`` table and its indexes."""
        with self._session(self.vpslist_db) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS vps_list (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    config_id INTEGER NOT NULL,
                    vps_id INTEGER NOT NULL,
                    domain TEXT,
                    ip_address TEXT,
                    product_name TEXT,
                    cpu_cores INTEGER,
                    memory_size TEXT,
                    disk_size TEXT,
                    bandwidth TEXT,
                    os_type TEXT,
                    status TEXT,
                    amount TEXT,
                    nextduedate INTEGER,
                    section BOOLEAN DEFAULT 0,
                    last_check TIMESTAMP,
                    FOREIGN KEY (config_id) REFERENCES configs(id)
                )
            ''')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_vps_config ON vps_list(config_id)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_vps_vps_id ON vps_list(vps_id)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_vps_unique ON vps_list(vps_id, ip_address)')

    def init_status_db(self):
        """Initialise status.db: create ``ping_status``, ``vps_summary`` and indexes."""
        with self._session(self.status_db) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS ping_status (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    vps_id INTEGER NOT NULL,
                    target TEXT NOT NULL,
                    status TEXT NOT NULL,
                    latency_ms REAL,
                    check_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS vps_summary (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    vps_id INTEGER NOT NULL,
                    date DATE NOT NULL,
                    avg_latency_ms REAL,
                    max_latency_ms REAL,
                    min_latency_ms REAL,
                    count_under_100 INTEGER DEFAULT 0,
                    count_100_to_300 INTEGER DEFAULT 0,
                    count_300_to_500 INTEGER DEFAULT 0,
                    count_abnormal INTEGER DEFAULT 0,
                    availability TEXT,
                    UNIQUE(vps_id, date)
                )
            ''')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_ping_vps ON ping_status(vps_id)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_ping_time ON ping_status(check_time)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_summary_vps ON vps_summary(vps_id)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_summary_date ON vps_summary(date)')

    # ==================== vps.db operations ====================

    def add_config(self, api_label, site_type, account, api_key, site_url=None, auto_monitor=True):
        """Add a configuration, de-duplicated on ``(site_url, account)``.

        Returns:
            The id of the existing row when one with the same ``site_url`` and
            ``account`` is found, otherwise the id of the newly inserted row.

        Raises:
            sqlite3.IntegrityError: if a table constraint (e.g. the unique
                ``api_label``) is violated.
        """
        with self._session(self.vps_db) as conn:
            cursor = conn.cursor()
            # NOTE: "site_url = NULL" never matches in SQL, so configurations
            # without a site_url are not de-duplicated by this lookup.
            existing = cursor.execute(
                'SELECT id FROM configs WHERE site_url = ? AND account = ?',
                (site_url, account),
            ).fetchone()
            if existing:
                return existing['id']

            cursor.execute('''
                INSERT INTO configs (api_label, site_type, site_url, account, api_key, auto_monitor)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (api_label, site_type, site_url, account, api_key, auto_monitor))
            return cursor.lastrowid

    def get_all_configs(self):
        """Return every configuration as a list of dicts, ordered by id."""
        with self._session(self.vps_db) as conn:
            rows = conn.execute('SELECT * FROM configs ORDER BY id').fetchall()
            return [dict(row) for row in rows]

    def get_config_by_id(self, config_id):
        """Return the configuration with ``config_id`` as a dict, or None."""
        with self._session(self.vps_db) as conn:
            row = conn.execute('SELECT * FROM configs WHERE id = ?', (config_id,)).fetchone()
            return dict(row) if row else None

    def update_config(self, config_id, **kwargs):
        """Update columns of one configuration and refresh ``updated_at``.

        Args:
            config_id: id of the row to update.
            **kwargs: ``column=value`` pairs; each key must be a column of
                ``configs``.

        Returns:
            True if a row was updated; False if ``kwargs`` is empty or no row
            has that id.

        Raises:
            sqlite3.OperationalError: if a key is not a column of ``configs``.
        """
        if not kwargs:
            return False
        return self._update_columns(self.vps_db, 'configs', 'updated_at', 'id', config_id, kwargs)

    def delete_config(self, config_id):
        """Delete one configuration; return True if a row was removed."""
        with self._session(self.vps_db) as conn:
            cursor = conn.execute('DELETE FROM configs WHERE id = ?', (config_id,))
            return cursor.rowcount > 0

    def reset_config_ids(self):
        """Renumber ``configs`` ids so they run 1..N without gaps.

        Rows keep their relative order. ``vps_list.config_id`` lives in a
        different database file and is NOT remapped by this method.

        Returns:
            True (also when the table is empty and nothing is done).
        """
        with self._session(self.vps_db) as conn:
            cursor = conn.cursor()
            configs = [dict(row) for row in cursor.execute('SELECT * FROM configs ORDER BY id')]
            if not configs:
                return True

            # Snapshot kept only while the rewrite is in progress.
            cursor.execute('DROP TABLE IF EXISTS configs_backup')
            cursor.execute('CREATE TABLE configs_backup AS SELECT * FROM configs')

            cursor.execute('DELETE FROM configs')
            # Clearing the AUTOINCREMENT counter makes numbering restart at 1.
            cursor.execute("DELETE FROM sqlite_sequence WHERE name='configs'")
            cursor.executemany('''
                INSERT INTO configs (api_label, site_type, site_url, account, api_key,
                                     auto_monitor, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            ''', [
                (
                    config['api_label'], config['site_type'], config['site_url'],
                    config['account'], config['api_key'], config['auto_monitor'],
                    config['created_at'], config['updated_at'],
                )
                for config in configs
            ])
            cursor.execute('DROP TABLE IF EXISTS configs_backup')
        return True

    # ==================== vpslist.db operations ====================

    @staticmethod
    def _smallest_free_id(ids):
        """Return the lowest unused id in ``1..max(ids)``.

        Returns 1 for an empty collection and None when there is no gap (the
        caller then lets SQLite assign the next AUTOINCREMENT id).
        """
        if not ids:
            return 1
        taken = set(ids)
        return next((i for i in range(1, max(taken) + 1) if i not in taken), None)

    def _next_free_id(self, cursor):
        """Compute the next gap-filling id through ``cursor``.

        Using the caller's cursor means rows inserted earlier in the same,
        still uncommitted, transaction are taken into account.
        """
        ids = [row['id'] for row in cursor.execute('SELECT id FROM vps_list')]
        return self._smallest_free_id(ids)

    def _insert_vps(self, cursor, vps, row_id=None):
        """Insert one ``vps_list`` row from the mapping ``vps``; return its id.

        ``config_id`` and ``vps_id`` are required keys; the other columns
        default to NULL (``section`` to False). A truthy ``row_id`` is used as
        the explicit primary key, otherwise SQLite assigns one.
        """
        columns = list(self._VPS_INSERT_COLUMNS)
        values = [vps['config_id'], vps['vps_id']]
        values.extend(vps.get(name) for name in columns[2:-1])
        values.append(vps.get('section', False))
        if row_id:
            columns.insert(0, 'id')
            values.insert(0, row_id)

        placeholders = ', '.join('?' * len(columns))
        cursor.execute(
            f'INSERT INTO vps_list ({", ".join(columns)}, last_check) '
            f'VALUES ({placeholders}, CURRENT_TIMESTAMP)',
            values,
        )
        return cursor.lastrowid

    def get_next_available_id(self):
        """Return the next id that fills a gap in ``vps_list``.

        Returns:
            1 when the table is empty, the smallest missing id below the
            current maximum when there is a gap, otherwise None.
        """
        with self._session(self.vpslist_db) as conn:
            return self._next_free_id(conn.cursor())

    def add_vps(self, config_id, vps_id, domain=None, ip_address=None,
                product_name=None, section=False, custom_id=None):
        """Add one VPS to the list, optionally with an explicit row id.

        Args:
            custom_id: Row id to use. When None, the lowest free id is used
                to fill gaps; when no gap exists SQLite assigns the id.

        Returns:
            The id of the inserted row.
        """
        with self._session(self.vpslist_db) as conn:
            cursor = conn.cursor()
            if custom_id is None:
                custom_id = self._next_free_id(cursor)
            record = {
                'config_id': config_id,
                'vps_id': vps_id,
                'domain': domain,
                'ip_address': ip_address,
                'product_name': product_name,
                'section': section,
            }
            return self._insert_vps(cursor, record, custom_id)

    def batch_add_vps(self, vps_list):
        """Insert many VPS rows, skipping ``(vps_id, ip_address)`` duplicates.

        Args:
            vps_list: Iterable of dicts. ``config_id`` and ``vps_id`` are
                required keys; the remaining columns are optional.

        All rows are written in one transaction; free ids are computed inside
        that transaction so two new rows can never be given the same id.
        """
        with self._session(self.vpslist_db) as conn:
            cursor = conn.cursor()
            for vps in vps_list:
                # NOTE: "ip_address = NULL" never matches in SQL, so rows
                # without an ip_address are not de-duplicated.
                duplicate = cursor.execute(
                    'SELECT id FROM vps_list WHERE vps_id = ? AND ip_address = ?',
                    (vps['vps_id'], vps.get('ip_address')),
                ).fetchone()
                if duplicate:
                    continue
                self._insert_vps(cursor, vps, self._next_free_id(cursor))

    def get_all_vps(self):
        """Return every VPS as a list of dicts, ordered by config_id, vps_id."""
        with self._session(self.vpslist_db) as conn:
            rows = conn.execute('SELECT * FROM vps_list ORDER BY config_id, vps_id').fetchall()
            return [dict(row) for row in rows]

    def get_vps_by_config(self, config_id):
        """Return the VPS rows of one configuration, ordered by vps_id."""
        with self._session(self.vpslist_db) as conn:
            rows = conn.execute(
                'SELECT * FROM vps_list WHERE config_id = ? ORDER BY vps_id',
                (config_id,),
            ).fetchall()
            return [dict(row) for row in rows]

    def update_vps_details(self, vps_id, **kwargs):
        """Update columns of every row with ``vps_id`` and refresh ``last_check``.

        Args:
            vps_id: Provider-side VPS id (the ``vps_id`` column, not ``id``).
            **kwargs: ``column=value`` pairs; each key must be a column of
                ``vps_list``.

        Returns:
            True if at least one row was updated; False if ``kwargs`` is empty
            or nothing matched.

        Raises:
            sqlite3.OperationalError: if a key is not a column of ``vps_list``.
        """
        if not kwargs:
            return False
        return self._update_columns(
            self.vpslist_db, 'vps_list', 'last_check', 'vps_id', vps_id, kwargs)

    def update_vps_status(self, vps_id, status):
        """Set the ``status`` column for ``vps_id``; return True if a row changed."""
        return self.update_vps_details(vps_id, status=status)

    def get_monitored_vps(self):
        """Return the VPS rows flagged for monitoring (``section = 1``)."""
        with self._session(self.vpslist_db) as conn:
            rows = conn.execute(
                'SELECT * FROM vps_list WHERE section = 1 ORDER BY config_id, vps_id'
            ).fetchall()
            return [dict(row) for row in rows]

    def reset_vps_list_ids(self):
        """Renumber ``vps_list`` ids so they run 1..N without gaps.

        Returns:
            True (also when the table is empty and nothing is done).
        """
        with self._session(self.vpslist_db) as conn:
            cursor = conn.cursor()
            vps_items = [dict(row) for row in cursor.execute('SELECT * FROM vps_list ORDER BY id')]
            if not vps_items:
                return True

            # Snapshot kept only while the rewrite is in progress.
            cursor.execute('DROP TABLE IF EXISTS vps_list_backup')
            cursor.execute('CREATE TABLE vps_list_backup AS SELECT * FROM vps_list')

            cursor.execute('DELETE FROM vps_list')
            # Clearing the AUTOINCREMENT counter makes numbering restart at 1.
            cursor.execute("DELETE FROM sqlite_sequence WHERE name='vps_list'")
            cursor.executemany('''
                INSERT INTO vps_list (config_id, vps_id, domain, ip_address, product_name,
                                      cpu_cores, memory_size, disk_size, bandwidth, os_type,
                                      amount, nextduedate, status, section, last_check)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', [
                (
                    vps['config_id'], vps['vps_id'], vps.get('domain'),
                    vps.get('ip_address'), vps.get('product_name'),
                    vps.get('cpu_cores'), vps.get('memory_size'), vps.get('disk_size'),
                    vps.get('bandwidth'), vps.get('os_type'), vps.get('amount'),
                    vps.get('nextduedate'), vps.get('status'), vps.get('section', 0),
                    vps.get('last_check'),
                )
                for vps in vps_items
            ])
            cursor.execute('DROP TABLE IF EXISTS vps_list_backup')
        return True

    # ==================== status.db operations ====================

    def save_ping_status(self, vps_id, target, status, latency_ms=None):
        """Store one ping result; ``check_time`` is filled in by SQLite."""
        with self._session(self.status_db) as conn:
            conn.execute('''
                INSERT INTO ping_status (vps_id, target, status, latency_ms)
                VALUES (?, ?, ?, ?)
            ''', (vps_id, target, status, latency_ms))

    def get_ping_records(self, vps_id, date=None):
        """Return ping records of one VPS as a list of dicts.

        Args:
            date: When given, only records whose ``DATE(check_time)`` equals
                this value are returned, oldest first. Otherwise all records
                are returned, newest first.
        """
        with self._session(self.status_db) as conn:
            if date:
                cursor = conn.execute('''
                    SELECT * FROM ping_status
                    WHERE vps_id = ? AND DATE(check_time) = ?
                    ORDER BY check_time
                ''', (vps_id, date))
            else:
                cursor = conn.execute('''
                    SELECT * FROM ping_status
                    WHERE vps_id = ?
                    ORDER BY check_time DESC
                ''', (vps_id,))
            return [dict(row) for row in cursor.fetchall()]

    def cleanup_old_ping_records(self, days=30):
        """Delete ping records older than ``days`` days.

        ``check_time`` is written by SQLite's ``CURRENT_TIMESTAMP``, which is
        UTC, so the cutoff is computed in UTC as well.

        Returns:
            The number of rows deleted.
        """
        cutoff = datetime.now(timezone.utc) - timedelta(days=days)
        cutoff_text = cutoff.strftime('%Y-%m-%d %H:%M:%S')
        with self._session(self.status_db) as conn:
            cursor = conn.execute('DELETE FROM ping_status WHERE check_time < ?', (cutoff_text,))
            return cursor.rowcount

    def save_vps_summary(self, vps_id, date, avg_latency, max_latency, min_latency,
                         count_under_100, count_100_to_300, count_300_to_500,
                         count_abnormal, availability):
        """Insert the daily summary of one VPS, replacing any row for the same day."""
        with self._session(self.status_db) as conn:
            conn.execute('''
                INSERT OR REPLACE INTO vps_summary
                (vps_id, date, avg_latency_ms, max_latency_ms, min_latency_ms,
                 count_under_100, count_100_to_300, count_300_to_500,
                 count_abnormal, availability)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (vps_id, date, avg_latency, max_latency, min_latency,
                  count_under_100, count_100_to_300, count_300_to_500,
                  count_abnormal, availability))

    def get_vps_summary(self, vps_id, date=None):
        """Return one summary row as a dict, or None.

        Args:
            date: Day to look up. When omitted, the most recent summary of the
                VPS is returned.
        """
        with self._session(self.status_db) as conn:
            if date:
                cursor = conn.execute(
                    'SELECT * FROM vps_summary WHERE vps_id = ? AND date = ?',
                    (vps_id, date),
                )
            else:
                cursor = conn.execute(
                    'SELECT * FROM vps_summary WHERE vps_id = ? ORDER BY date DESC LIMIT 1',
                    (vps_id,),
                )
            row = cursor.fetchone()
            return dict(row) if row else None

    def get_all_summaries(self, date=None):
        """Return summaries of all VPS as a list of dicts, ordered by vps_id.

        Args:
            date: Day to look up. When omitted, the latest summary of each VPS
                is returned.
        """
        with self._session(self.status_db) as conn:
            if date:
                cursor = conn.execute(
                    'SELECT * FROM vps_summary WHERE date = ? ORDER BY vps_id',
                    (date,),
                )
            else:
                cursor = conn.execute('''
                    SELECT vs.* FROM vps_summary vs
                    INNER JOIN (
                        SELECT vps_id, MAX(date) as max_date
                        FROM vps_summary
                        GROUP BY vps_id
                    ) latest ON vs.vps_id = latest.vps_id AND vs.date = latest.max_date
                    ORDER BY vs.vps_id
                ''')
            return [dict(row) for row in cursor.fetchall()]


if __name__ == '__main__':
    # Smoke test: create the databases and show where they live.
    db = DatabaseManager()
    print("数据库初始化完成")
    print(f"vps.db: {db.vps_db}")
    print(f"vpslist.db: {db.vpslist_db}")
    print(f"status.db: {db.status_db}")