VPSHUB/app/db_manager.py
2026-05-30 15:06:24 +08:00

594 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""VPS Hub 数据库管理模块"""
import os
import sqlite3
from datetime import datetime
class DatabaseManager:
    """Manage the three SQLite databases used by VPS Hub.

    * ``vps.db``     -- table ``configs``: provider/API account configurations.
    * ``vpslist.db`` -- table ``vps_list``: cached list of VPS instances.
    * ``status.db``  -- tables ``ping_status`` (raw ping records) and
      ``vps_summary`` (per-day aggregated statistics).

    Every public method opens its own short-lived connection and always
    closes it, even when a statement raises.
    """

    # Columns that update_config() may assign to.
    _CONFIG_COLUMNS = frozenset((
        'id', 'api_label', 'site_type', 'site_url', 'account', 'api_key',
        'auto_monitor', 'created_at', 'updated_at',
    ))

    # Columns that update_vps_details() may assign to.
    _VPS_COLUMNS = frozenset((
        'id', 'config_id', 'vps_id', 'domain', 'ip_address', 'product_name',
        'cpu_cores', 'memory_size', 'disk_size', 'bandwidth', 'os_type',
        'status', 'amount', 'nextduedate', 'section', 'last_check',
    ))

    # Optional vps_list columns written on insert; missing keys become NULL.
    _VPS_OPTIONAL_COLUMNS = (
        'domain', 'ip_address', 'product_name', 'cpu_cores', 'memory_size',
        'disk_size', 'bandwidth', 'os_type', 'amount', 'nextduedate',
    )

    def __init__(self, db_dir=None):
        """Create the database directory if needed and initialise all schemas.

        Args:
            db_dir: Directory holding the ``.db`` files. Defaults to a ``db``
                directory next to this module.
        """
        if db_dir is None:
            db_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'db')
        self.db_dir = db_dir
        os.makedirs(db_dir, exist_ok=True)
        self.vps_db = os.path.join(db_dir, 'vps.db')
        self.vpslist_db = os.path.join(db_dir, 'vpslist.db')
        self.status_db = os.path.join(db_dir, 'status.db')
        self.init_vps_db()
        self.init_vpslist_db()
        self.init_status_db()

    def get_connection(self, db_path):
        """Open a connection to ``db_path`` whose rows are ``sqlite3.Row``.

        The caller owns the returned connection and must close it.
        """
        conn = sqlite3.connect(db_path)
        conn.row_factory = sqlite3.Row
        return conn

    # ==================== internal helpers ====================
    def _run_statements(self, db_path, statements):
        """Execute each SQL statement in order, commit, and close."""
        conn = self.get_connection(db_path)
        try:
            for sql in statements:
                conn.execute(sql)
            conn.commit()
        finally:
            conn.close()

    def _fetch_all(self, db_path, sql, params=()):
        """Run a query and return every row as a plain dict."""
        conn = self.get_connection(db_path)
        try:
            return [dict(row) for row in conn.execute(sql, params).fetchall()]
        finally:
            conn.close()

    def _fetch_one(self, db_path, sql, params=()):
        """Run a query and return the first row as a dict, or None."""
        conn = self.get_connection(db_path)
        try:
            row = conn.execute(sql, params).fetchone()
            return dict(row) if row else None
        finally:
            conn.close()

    def _execute_write(self, db_path, sql, params=()):
        """Run one modifying statement, commit, return (rowcount, lastrowid)."""
        conn = self.get_connection(db_path)
        try:
            cursor = conn.execute(sql, params)
            outcome = (cursor.rowcount, cursor.lastrowid)
            conn.commit()
            return outcome
        finally:
            conn.close()

    @staticmethod
    def _build_assignments(kwargs, allowed):
        """Return the ``col = ?, ...`` fragment for an UPDATE statement.

        Column names cannot be bound as SQL parameters, so they are checked
        against ``allowed`` before being interpolated into the statement.

        Raises:
            sqlite3.OperationalError: a key is not a column of the table
                (the same exception type SQLite raises for an unknown column).
        """
        for key in kwargs:
            if key not in allowed:
                raise sqlite3.OperationalError(f'no such column: {key}')
        return ', '.join(f'{key} = ?' for key in kwargs)

    @staticmethod
    def _first_free_id(conn):
        """Return the smallest unused positive id in ``vps_list``.

        Reads through ``conn`` so that rows inserted earlier in the same,
        still uncommitted, transaction are taken into account.

        Returns:
            1 for an empty table, the first gap in ``1..max(id)`` if there is
            one, otherwise None (meaning: let AUTOINCREMENT pick the id).
        """
        used = {row['id'] for row in conn.execute('SELECT id FROM vps_list ORDER BY id')}
        if not used:
            return 1
        for candidate in range(1, max(used) + 1):
            if candidate not in used:
                return candidate
        return None

    @classmethod
    def _insert_vps(cls, conn, row_id, values):
        """Insert one ``vps_list`` row through ``conn`` without committing.

        Args:
            conn: Open connection to ``vpslist.db``.
            row_id: Explicit primary key; a falsy value lets SQLite assign it.
            values: Mapping with required keys ``config_id`` and ``vps_id``;
                the optional columns default to NULL and ``section`` to False.

        Returns:
            The rowid of the inserted row.
        """
        columns = ['config_id', 'vps_id', *cls._VPS_OPTIONAL_COLUMNS, 'section']
        params = [values['config_id'], values['vps_id']]
        params.extend(values.get(name) for name in cls._VPS_OPTIONAL_COLUMNS)
        params.append(values.get('section', False))
        if row_id:
            columns.insert(0, 'id')
            params.insert(0, row_id)
        placeholders = ', '.join('?' * len(columns))
        cursor = conn.execute(
            f"INSERT INTO vps_list ({', '.join(columns)}, last_check) "
            f"VALUES ({placeholders}, CURRENT_TIMESTAMP)",
            params,
        )
        return cursor.lastrowid

    # ==================== schema initialisation ====================
    def init_vps_db(self):
        """Create the ``configs`` table in vps.db if it does not exist."""
        self._run_statements(self.vps_db, [
            '''
            CREATE TABLE IF NOT EXISTS configs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                api_label TEXT NOT NULL UNIQUE,
                site_type TEXT NOT NULL,
                site_url TEXT,
                account TEXT NOT NULL,
                api_key TEXT NOT NULL,
                auto_monitor BOOLEAN DEFAULT 1,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                UNIQUE(site_url, account)
            )
            ''',
        ])

    def init_vpslist_db(self):
        """Create the ``vps_list`` cache table and its indexes in vpslist.db."""
        # NOTE(review): the FOREIGN KEY points at ``configs``, which lives in
        # vps.db, not in this database file -- confirm it is meant to be
        # informational only.
        # NOTE(review): idx_vps_unique is a plain index, not a UNIQUE one.
        self._run_statements(self.vpslist_db, [
            '''
            CREATE TABLE IF NOT EXISTS vps_list (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                config_id INTEGER NOT NULL,
                vps_id INTEGER NOT NULL,
                domain TEXT,
                ip_address TEXT,
                product_name TEXT,
                cpu_cores INTEGER,
                memory_size TEXT,
                disk_size TEXT,
                bandwidth TEXT,
                os_type TEXT,
                status TEXT,
                amount TEXT,
                nextduedate INTEGER,
                section BOOLEAN DEFAULT 0,
                last_check TIMESTAMP,
                FOREIGN KEY (config_id) REFERENCES configs(id)
            )
            ''',
            'CREATE INDEX IF NOT EXISTS idx_vps_config ON vps_list(config_id)',
            'CREATE INDEX IF NOT EXISTS idx_vps_vps_id ON vps_list(vps_id)',
            'CREATE INDEX IF NOT EXISTS idx_vps_unique ON vps_list(vps_id, ip_address)',
        ])

    def init_status_db(self):
        """Create the ping record and daily summary tables in status.db."""
        self._run_statements(self.status_db, [
            '''
            CREATE TABLE IF NOT EXISTS ping_status (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                vps_id INTEGER NOT NULL,
                target TEXT NOT NULL,
                status TEXT NOT NULL,
                latency_ms REAL,
                check_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
            ''',
            '''
            CREATE TABLE IF NOT EXISTS vps_summary (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                vps_id INTEGER NOT NULL,
                date DATE NOT NULL,
                avg_latency_ms REAL,
                max_latency_ms REAL,
                min_latency_ms REAL,
                count_under_100 INTEGER DEFAULT 0,
                count_100_to_300 INTEGER DEFAULT 0,
                count_300_to_500 INTEGER DEFAULT 0,
                count_abnormal INTEGER DEFAULT 0,
                availability TEXT,
                UNIQUE(vps_id, date)
            )
            ''',
            'CREATE INDEX IF NOT EXISTS idx_ping_vps ON ping_status(vps_id)',
            'CREATE INDEX IF NOT EXISTS idx_ping_time ON ping_status(check_time)',
            'CREATE INDEX IF NOT EXISTS idx_summary_vps ON vps_summary(vps_id)',
            'CREATE INDEX IF NOT EXISTS idx_summary_date ON vps_summary(date)',
        ])

    # ==================== vps.db operations ====================
    def add_config(self, api_label, site_type, account, api_key, site_url=None, auto_monitor=True):
        """Add a configuration, de-duplicated on ``(site_url, account)``.

        Returns:
            The id of the existing row when one matches, otherwise the id of
            the newly inserted row.

        Raises:
            sqlite3.Error: the insert failed (e.g. duplicate ``api_label``).
        """
        conn = self.get_connection(self.vps_db)
        try:
            # A NULL site_url never matches ``=``, so such rows are not
            # de-duplicated here (unchanged from the original behaviour).
            existing = conn.execute(
                'SELECT id FROM configs WHERE site_url = ? AND account = ?',
                (site_url, account),
            ).fetchone()
            if existing:
                return existing['id']
            cursor = conn.execute(
                '''
                INSERT INTO configs (api_label, site_type, site_url, account, api_key, auto_monitor)
                VALUES (?, ?, ?, ?, ?, ?)
                ''',
                (api_label, site_type, site_url, account, api_key, auto_monitor),
            )
            config_id = cursor.lastrowid
            conn.commit()
            return config_id
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def get_all_configs(self):
        """Return every configuration as a dict, ordered by id."""
        return self._fetch_all(self.vps_db, 'SELECT * FROM configs ORDER BY id')

    def get_config_by_id(self, config_id):
        """Return the configuration with ``config_id`` as a dict, or None."""
        return self._fetch_one(self.vps_db, 'SELECT * FROM configs WHERE id = ?', (config_id,))

    def update_config(self, config_id, **kwargs):
        """Update the given columns of one configuration and bump ``updated_at``.

        Returns:
            True if a row was updated; False if no kwargs were given or no
            row has that id.

        Raises:
            sqlite3.OperationalError: a keyword is not a ``configs`` column.
        """
        if not kwargs:
            return False
        assignments = self._build_assignments(kwargs, self._CONFIG_COLUMNS)
        affected, _ = self._execute_write(
            self.vps_db,
            f'UPDATE configs SET {assignments}, updated_at = CURRENT_TIMESTAMP WHERE id = ?',
            [*kwargs.values(), config_id],
        )
        return affected > 0

    def delete_config(self, config_id):
        """Delete one configuration; return True if a row was removed."""
        affected, _ = self._execute_write(
            self.vps_db, 'DELETE FROM configs WHERE id = ?', (config_id,)
        )
        return affected > 0

    def reset_config_ids(self):
        """Renumber ``configs`` ids consecutively from 1, keeping id order.

        NOTE(review): ``vps_list.config_id`` (in vpslist.db) is not updated
        here, so rows there may point at stale ids afterwards -- confirm
        callers handle this.

        Returns:
            True on success (also when the table is empty).
        """
        conn = self.get_connection(self.vps_db)
        try:
            configs = [dict(row) for row in conn.execute('SELECT * FROM configs ORDER BY id').fetchall()]
            if not configs:
                return True
            # Keep a copy of the rows while the table is being rebuilt.
            conn.execute('DROP TABLE IF EXISTS configs_backup')
            conn.execute('CREATE TABLE configs_backup AS SELECT * FROM configs')
            conn.execute('DELETE FROM configs')
            # Reset the AUTOINCREMENT counter so numbering restarts at 1.
            conn.execute("DELETE FROM sqlite_sequence WHERE name='configs'")
            conn.executemany(
                '''
                INSERT INTO configs (api_label, site_type, site_url, account, api_key, auto_monitor, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                ''',
                [
                    (
                        config['api_label'],
                        config['site_type'],
                        config['site_url'],
                        config['account'],
                        config['api_key'],
                        config['auto_monitor'],
                        config['created_at'],
                        config['updated_at'],
                    )
                    for config in configs
                ],
            )
            conn.execute('DROP TABLE IF EXISTS configs_backup')
            conn.commit()
            return True
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    # ==================== vpslist.db operations ====================
    def get_next_available_id(self):
        """Return the smallest free ``vps_list`` id, filling gaps first.

        Returns:
            1 for an empty table, the first missing id in ``1..max(id)``,
            or None when there is no gap.
        """
        conn = self.get_connection(self.vpslist_db)
        try:
            return self._first_free_id(conn)
        finally:
            conn.close()

    def add_vps(self, config_id, vps_id, domain=None, ip_address=None, product_name=None, section=False, custom_id=None):
        """Add one VPS to the list, optionally with an explicit row id.

        When ``custom_id`` is None the smallest free id is used so that gaps
        are filled; if there is no gap SQLite assigns the next id.

        Returns:
            The id of the inserted row.
        """
        conn = self.get_connection(self.vpslist_db)
        try:
            if custom_id is None:
                custom_id = self._first_free_id(conn)
            record_id = self._insert_vps(conn, custom_id, {
                'config_id': config_id,
                'vps_id': vps_id,
                'domain': domain,
                'ip_address': ip_address,
                'product_name': product_name,
                'section': section,
            })
            conn.commit()
            return record_id
        finally:
            conn.close()

    def batch_add_vps(self, vps_list):
        """Add several VPS rows, skipping ``(vps_id, ip_address)`` duplicates.

        All rows are inserted in a single transaction; if any insert fails
        nothing is committed.

        Args:
            vps_list: Iterable of dicts with required keys ``config_id`` and
                ``vps_id`` plus any optional ``vps_list`` columns.
        """
        conn = self.get_connection(self.vpslist_db)
        try:
            for vps in vps_list:
                # A NULL ip_address never matches ``=``, so rows without an
                # IP are not de-duplicated (unchanged from the original).
                duplicate = conn.execute(
                    'SELECT id FROM vps_list WHERE vps_id = ? AND ip_address = ?',
                    (vps['vps_id'], vps.get('ip_address')),
                ).fetchone()
                if duplicate:
                    continue
                # The free id must be looked up on this connection: a second
                # connection cannot see the rows inserted above and would
                # hand out the same id twice.
                self._insert_vps(conn, self._first_free_id(conn), vps)
            conn.commit()
        finally:
            conn.close()

    def get_all_vps(self):
        """Return every VPS row as a dict, ordered by config_id then vps_id."""
        return self._fetch_all(self.vpslist_db, 'SELECT * FROM vps_list ORDER BY config_id, vps_id')

    def get_vps_by_config(self, config_id):
        """Return the VPS rows of one configuration, ordered by vps_id."""
        return self._fetch_all(
            self.vpslist_db,
            'SELECT * FROM vps_list WHERE config_id = ? ORDER BY vps_id',
            (config_id,),
        )

    def update_vps_details(self, vps_id, **kwargs):
        """Update columns of all rows with this ``vps_id`` and bump ``last_check``.

        Returns:
            True if at least one row was updated; False if no kwargs were
            given or nothing matched.

        Raises:
            sqlite3.OperationalError: a keyword is not a ``vps_list`` column.
        """
        if not kwargs:
            return False
        assignments = self._build_assignments(kwargs, self._VPS_COLUMNS)
        affected, _ = self._execute_write(
            self.vpslist_db,
            f'UPDATE vps_list SET {assignments}, last_check = CURRENT_TIMESTAMP WHERE vps_id = ?',
            [*kwargs.values(), vps_id],
        )
        return affected > 0

    def update_vps_status(self, vps_id, status):
        """Set the ``status`` column for a VPS; see update_vps_details()."""
        return self.update_vps_details(vps_id, status=status)

    def get_monitored_vps(self):
        """Return the VPS rows flagged for monitoring (``section = 1``)."""
        return self._fetch_all(
            self.vpslist_db,
            'SELECT * FROM vps_list WHERE section = 1 ORDER BY config_id, vps_id',
        )

    def reset_vps_list_ids(self):
        """Renumber ``vps_list`` ids consecutively from 1, keeping id order.

        Returns:
            True on success (also when the table is empty).
        """
        conn = self.get_connection(self.vpslist_db)
        try:
            vps_items = [dict(row) for row in conn.execute('SELECT * FROM vps_list ORDER BY id').fetchall()]
            if not vps_items:
                return True
            # Keep a copy of the rows while the table is being rebuilt.
            conn.execute('DROP TABLE IF EXISTS vps_list_backup')
            conn.execute('CREATE TABLE vps_list_backup AS SELECT * FROM vps_list')
            conn.execute('DELETE FROM vps_list')
            # Reset the AUTOINCREMENT counter so numbering restarts at 1.
            conn.execute("DELETE FROM sqlite_sequence WHERE name='vps_list'")
            conn.executemany(
                '''
                INSERT INTO vps_list (config_id, vps_id, domain, ip_address, product_name, cpu_cores, memory_size, disk_size, bandwidth, os_type, amount, nextduedate, status, section, last_check)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''',
                [
                    (
                        vps['config_id'],
                        vps['vps_id'],
                        vps.get('domain'),
                        vps.get('ip_address'),
                        vps.get('product_name'),
                        vps.get('cpu_cores'),
                        vps.get('memory_size'),
                        vps.get('disk_size'),
                        vps.get('bandwidth'),
                        vps.get('os_type'),
                        vps.get('amount'),
                        vps.get('nextduedate'),
                        vps.get('status'),
                        vps.get('section', 0),
                        vps.get('last_check'),
                    )
                    for vps in vps_items
                ],
            )
            conn.execute('DROP TABLE IF EXISTS vps_list_backup')
            conn.commit()
            return True
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    # ==================== status.db operations ====================
    def save_ping_status(self, vps_id, target, status, latency_ms=None):
        """Store one ping record; ``check_time`` defaults to the current UTC time."""
        self._execute_write(
            self.status_db,
            '''
            INSERT INTO ping_status (vps_id, target, status, latency_ms)
            VALUES (?, ?, ?, ?)
            ''',
            (vps_id, target, status, latency_ms),
        )

    def get_ping_records(self, vps_id, date=None):
        """Return ping records of one VPS as dicts.

        Args:
            vps_id: VPS identifier.
            date: Optional ``YYYY-MM-DD`` string; when given only records of
                that date are returned, oldest first. Otherwise all records
                are returned, newest first.
        """
        if date:
            return self._fetch_all(
                self.status_db,
                '''
                SELECT * FROM ping_status
                WHERE vps_id = ? AND DATE(check_time) = ?
                ORDER BY check_time
                ''',
                (vps_id, date),
            )
        return self._fetch_all(
            self.status_db,
            '''
            SELECT * FROM ping_status
            WHERE vps_id = ?
            ORDER BY check_time DESC
            ''',
            (vps_id,),
        )

    def cleanup_old_ping_records(self, days=30):
        """Delete ping records older than ``days`` days.

        Returns:
            The number of rows deleted.
        """
        from datetime import timedelta, timezone

        # check_time is filled by CURRENT_TIMESTAMP, which SQLite evaluates in
        # UTC, so the cutoff must be computed in UTC as well (not local time).
        cutoff = datetime.now(timezone.utc) - timedelta(days=days)
        cutoff_date = cutoff.strftime('%Y-%m-%d %H:%M:%S')
        deleted, _ = self._execute_write(
            self.status_db,
            'DELETE FROM ping_status WHERE check_time < ?',
            (cutoff_date,),
        )
        return deleted

    def save_vps_summary(self, vps_id, date, avg_latency, max_latency, min_latency,
                         count_under_100, count_100_to_300, count_300_to_500,
                         count_abnormal, availability):
        """Insert or replace the daily summary row for ``(vps_id, date)``."""
        self._execute_write(
            self.status_db,
            '''
            INSERT OR REPLACE INTO vps_summary
            (vps_id, date, avg_latency_ms, max_latency_ms, min_latency_ms,
             count_under_100, count_100_to_300, count_300_to_500, count_abnormal, availability)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''',
            (vps_id, date, avg_latency, max_latency, min_latency,
             count_under_100, count_100_to_300, count_300_to_500,
             count_abnormal, availability),
        )

    def get_vps_summary(self, vps_id, date=None):
        """Return one summary row as a dict, or None.

        With ``date`` the row of that date is returned; without it the most
        recent row of the VPS is returned.
        """
        if date:
            return self._fetch_one(
                self.status_db,
                'SELECT * FROM vps_summary WHERE vps_id = ? AND date = ?',
                (vps_id, date),
            )
        return self._fetch_one(
            self.status_db,
            'SELECT * FROM vps_summary WHERE vps_id = ? ORDER BY date DESC LIMIT 1',
            (vps_id,),
        )

    def get_all_summaries(self, date=None):
        """Return summary rows for all VPS, ordered by vps_id.

        With ``date`` the rows of that date are returned; without it the
        latest row of each VPS is returned.
        """
        if date:
            return self._fetch_all(
                self.status_db,
                'SELECT * FROM vps_summary WHERE date = ? ORDER BY vps_id',
                (date,),
            )
        return self._fetch_all(
            self.status_db,
            '''
            SELECT vs.* FROM vps_summary vs
            INNER JOIN (
                SELECT vps_id, MAX(date) as max_date
                FROM vps_summary
                GROUP BY vps_id
            ) latest ON vs.vps_id = latest.vps_id AND vs.date = latest.max_date
            ORDER BY vs.vps_id
            ''',
        )
if __name__ == '__main__':
    # Smoke test: initialise the databases in the default directory and
    # print where each file lives.
    db = DatabaseManager()
    print("数据库初始化完成")
    print(f"vps.db: {db.vps_db}")
    print(f"vpslist.db: {db.vpslist_db}")
    print(f"status.db: {db.status_db}")