VPSHUB/app/db_manager.py
2026-05-29 23:09:58 +08:00

583 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
VPS Hub 数据库管理模块
负责初始化和管理三个SQLite数据库: vps.db, vpslist.db, status.db
"""
import os
import sqlite3
from contextlib import contextmanager
from datetime import datetime
class DatabaseManager:
    """Manage the three SQLite databases used by VPS Hub.

    * ``vps.db``     -- provider/API configurations (table ``configs``)
    * ``vpslist.db`` -- cached VPS inventory (table ``vps_list``)
    * ``status.db``  -- ping history and daily summaries
      (tables ``ping_status`` and ``vps_summary``)

    Every public method opens a short-lived connection, and that connection
    is always closed again, even when the statement raises.
    """

    def __init__(self, db_dir=None):
        """Create the database directory and make sure all tables exist.

        Args:
            db_dir: Directory holding the database files. Defaults to a
                ``db`` directory next to this module.
        """
        if db_dir is None:
            db_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'db')
        self.db_dir = db_dir
        os.makedirs(db_dir, exist_ok=True)

        # Paths of the individual database files.
        self.vps_db = os.path.join(db_dir, 'vps.db')
        self.vpslist_db = os.path.join(db_dir, 'vpslist.db')
        self.status_db = os.path.join(db_dir, 'status.db')

        # Create the schema of every database if it is missing.
        self.init_vps_db()
        self.init_vpslist_db()
        self.init_status_db()

    def get_connection(self, db_path):
        """Open a new SQLite connection.

        Args:
            db_path: Path of the database file.

        Returns:
            A ``sqlite3.Connection`` whose rows are ``sqlite3.Row`` objects,
            so columns can be accessed by name. The caller must close it.
        """
        conn = sqlite3.connect(db_path)
        conn.row_factory = sqlite3.Row  # allow access by column name
        return conn

    # ==================== internal helpers ====================

    @contextmanager
    def _session(self, db_path, commit=False):
        """Yield a connection that is always closed afterwards.

        Args:
            db_path: Path of the database file.
            commit: Commit when the ``with`` body finishes without error.

        On any exception the pending transaction is rolled back and the
        exception is re-raised unchanged.
        """
        conn = self.get_connection(db_path)
        try:
            yield conn
            if commit:
                conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def _fetch_all(self, db_path, sql, params=()):
        """Run a SELECT and return every row as a plain ``dict``."""
        with self._session(db_path) as conn:
            return [dict(row) for row in conn.execute(sql, params)]

    def _fetch_one(self, db_path, sql, params=()):
        """Run a SELECT and return the first row as a ``dict`` (or ``None``)."""
        with self._session(db_path) as conn:
            row = conn.execute(sql, params).fetchone()
            return dict(row) if row else None

    @staticmethod
    def _build_set_clause(fields):
        """Build the ``col = ?`` list of an UPDATE statement.

        Column names cannot be bound as SQL parameters, so they are
        validated instead: only plain identifiers are accepted, and they are
        emitted double-quoted. This keeps caller-supplied keyword names from
        injecting SQL.

        Raises:
            ValueError: If a name is not a valid identifier.
        """
        for name in fields:
            if not name.isidentifier():
                raise ValueError(f"invalid column name: {name!r}")
        return ', '.join(f'"{name}" = ?' for name in fields)

    # ==================== schema initialisation ====================

    def init_vps_db(self):
        """Create the ``configs`` table in vps.db if it does not exist."""
        with self._session(self.vps_db, commit=True) as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS configs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    api_label TEXT NOT NULL UNIQUE,
                    site_type TEXT NOT NULL,
                    site_url TEXT,
                    account TEXT NOT NULL,
                    api_key TEXT NOT NULL,
                    auto_monitor BOOLEAN DEFAULT 1,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')

    def init_vpslist_db(self):
        """Create the ``vps_list`` cache table and its indexes in vpslist.db.

        NOTE: ``configs`` is created in vps.db, not in this database file,
        and this class never enables ``PRAGMA foreign_keys``, so the
        FOREIGN KEY clause below is documentary only.
        """
        with self._session(self.vpslist_db, commit=True) as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS vps_list (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    config_id INTEGER NOT NULL,
                    vps_id INTEGER NOT NULL,
                    domain TEXT,
                    ip_address TEXT,
                    product_name TEXT,
                    cpu_cores INTEGER,
                    memory_size TEXT,
                    disk_size TEXT,
                    bandwidth TEXT,
                    os_type TEXT,
                    status TEXT,
                    section BOOLEAN DEFAULT 0,
                    last_check TIMESTAMP,
                    FOREIGN KEY (config_id) REFERENCES configs(id),
                    UNIQUE(config_id, vps_id)
                )
            ''')
            # Indexes for the two lookup columns used by the queries below.
            conn.execute('CREATE INDEX IF NOT EXISTS idx_vps_config ON vps_list(config_id)')
            conn.execute('CREATE INDEX IF NOT EXISTS idx_vps_vps_id ON vps_list(vps_id)')

    def init_status_db(self):
        """Create the ping history and daily summary tables in status.db."""
        with self._session(self.status_db, commit=True) as conn:
            # Raw ping results, one row per check.
            conn.execute('''
                CREATE TABLE IF NOT EXISTS ping_status (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    vps_id INTEGER NOT NULL,
                    target TEXT NOT NULL,
                    status TEXT NOT NULL,
                    latency_ms REAL,
                    check_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            # Aggregated statistics, one row per VPS and day.
            conn.execute('''
                CREATE TABLE IF NOT EXISTS vps_summary (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    vps_id INTEGER NOT NULL,
                    date DATE NOT NULL,
                    avg_latency_ms REAL,
                    max_latency_ms REAL,
                    min_latency_ms REAL,
                    count_under_100 INTEGER DEFAULT 0,
                    count_100_to_300 INTEGER DEFAULT 0,
                    count_300_to_500 INTEGER DEFAULT 0,
                    count_abnormal INTEGER DEFAULT 0,
                    availability TEXT,
                    UNIQUE(vps_id, date)
                )
            ''')
            conn.execute('CREATE INDEX IF NOT EXISTS idx_ping_vps ON ping_status(vps_id)')
            conn.execute('CREATE INDEX IF NOT EXISTS idx_ping_time ON ping_status(check_time)')
            conn.execute('CREATE INDEX IF NOT EXISTS idx_summary_vps ON vps_summary(vps_id)')
            conn.execute('CREATE INDEX IF NOT EXISTS idx_summary_date ON vps_summary(date)')

    # ==================== vps.db operations ====================

    def add_config(self, api_label, site_type, account, api_key, site_url=None, auto_monitor=True):
        """Insert a new VPS provider configuration.

        Args:
            api_label: API label; required and unique.
            site_type: Site type (mofang/aliyun/tencent).
            account: Account name.
            api_key: API key.
            site_url: Site URL.
            auto_monitor: Whether automatic monitoring is enabled.

        Returns:
            The id of the new configuration row.

        Raises:
            sqlite3.IntegrityError: If ``api_label`` already exists.
        """
        with self._session(self.vps_db, commit=True) as conn:
            cursor = conn.execute('''
                INSERT INTO configs (api_label, site_type, site_url, account, api_key, auto_monitor)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (api_label, site_type, site_url, account, api_key, auto_monitor))
            return cursor.lastrowid

    def get_all_configs(self):
        """Return every configuration as a list of dicts, ordered by id."""
        return self._fetch_all(self.vps_db, 'SELECT * FROM configs ORDER BY id')

    def get_config_by_id(self, config_id):
        """Return one configuration.

        Args:
            config_id: Configuration id.

        Returns:
            The configuration as a dict, or ``None`` if it does not exist.
        """
        return self._fetch_one(
            self.vps_db, 'SELECT * FROM configs WHERE id = ?', (config_id,))

    def update_config(self, config_id, **kwargs):
        """Update columns of a configuration and refresh ``updated_at``.

        Args:
            config_id: Configuration id.
            **kwargs: Column names mapped to their new values.

        Returns:
            ``True`` if a row was updated; ``False`` if no fields were given
            or no row has that id.

        Raises:
            ValueError: If a keyword is not a valid column identifier.
        """
        if not kwargs:
            return False
        assignments = self._build_set_clause(kwargs)
        params = [*kwargs.values(), config_id]
        with self._session(self.vps_db, commit=True) as conn:
            cursor = conn.execute(
                f'UPDATE configs SET {assignments}, updated_at = CURRENT_TIMESTAMP WHERE id = ?',
                params,
            )
            return cursor.rowcount > 0

    def delete_config(self, config_id):
        """Delete a configuration.

        Args:
            config_id: Configuration id.

        Returns:
            ``True`` if a row was deleted.
        """
        with self._session(self.vps_db, commit=True) as conn:
            cursor = conn.execute('DELETE FROM configs WHERE id = ?', (config_id,))
            return cursor.rowcount > 0

    # ==================== vpslist.db operations ====================

    def add_vps(self, config_id, vps_id, domain=None, ip_address=None,
                product_name=None, section=False):
        """Insert one VPS into the cached list.

        Args:
            config_id: Configuration id the VPS belongs to.
            vps_id: Id of the VPS on the provider platform.
            domain: Domain name.
            ip_address: IP address.
            product_name: Product name.
            section: Whether the VPS is flagged for monitoring.

        Returns:
            The id of the new row.

        Raises:
            sqlite3.IntegrityError: If ``(config_id, vps_id)`` already exists.
        """
        with self._session(self.vpslist_db, commit=True) as conn:
            cursor = conn.execute('''
                INSERT INTO vps_list (config_id, vps_id, domain, ip_address, product_name, section, last_check)
                VALUES (?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
            ''', (config_id, vps_id, domain, ip_address, product_name, section))
            return cursor.lastrowid

    def batch_add_vps(self, vps_list):
        """Insert or replace many VPS rows in a single transaction.

        An existing row with the same ``(config_id, vps_id)`` is replaced as
        a whole, so columns not listed in the statement (such as ``status``)
        fall back to their defaults.

        Args:
            vps_list: Iterable of dicts. ``config_id`` and ``vps_id`` are
                required keys; every other column is optional.

        Raises:
            KeyError: If a dict lacks ``config_id`` or ``vps_id``; nothing
                is written in that case.
        """
        rows = [
            (
                vps['config_id'],
                vps['vps_id'],
                vps.get('domain'),
                vps.get('ip_address'),
                vps.get('product_name'),
                vps.get('cpu_cores'),
                vps.get('memory_size'),
                vps.get('disk_size'),
                vps.get('bandwidth'),
                vps.get('os_type'),
                vps.get('section', False),
            )
            for vps in vps_list
        ]
        with self._session(self.vpslist_db, commit=True) as conn:
            conn.executemany('''
                INSERT OR REPLACE INTO vps_list
                (config_id, vps_id, domain, ip_address, product_name, cpu_cores, memory_size, disk_size, bandwidth, os_type, section, last_check)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
            ''', rows)

    def get_all_vps(self):
        """Return every cached VPS, ordered by config id and VPS id."""
        return self._fetch_all(
            self.vpslist_db, 'SELECT * FROM vps_list ORDER BY config_id, vps_id')

    def get_vps_by_config(self, config_id):
        """Return the cached VPS rows of one configuration.

        Args:
            config_id: Configuration id.

        Returns:
            List of VPS dicts ordered by VPS id.
        """
        return self._fetch_all(
            self.vpslist_db,
            'SELECT * FROM vps_list WHERE config_id = ? ORDER BY vps_id',
            (config_id,),
        )

    def update_vps_details(self, vps_id, **kwargs):
        """Update columns of a cached VPS and refresh ``last_check``.

        NOTE: rows are matched on ``vps_id`` alone, while uniqueness is
        ``(config_id, vps_id)``; if two configurations share a platform id,
        both rows are updated.

        Args:
            vps_id: Id of the VPS on the provider platform.
            **kwargs: Column names mapped to their new values.

        Returns:
            ``True`` if at least one row was updated; ``False`` if no fields
            were given or nothing matched.

        Raises:
            ValueError: If a keyword is not a valid column identifier.
        """
        if not kwargs:
            return False
        assignments = self._build_set_clause(kwargs)
        params = [*kwargs.values(), vps_id]
        with self._session(self.vpslist_db, commit=True) as conn:
            cursor = conn.execute(
                f'UPDATE vps_list SET {assignments}, last_check = CURRENT_TIMESTAMP WHERE vps_id = ?',
                params,
            )
            return cursor.rowcount > 0

    def update_vps_status(self, vps_id, status):
        """Update the ``status`` column of a cached VPS.

        Args:
            vps_id: Id of the VPS on the provider platform.
            status: Status (on/off/unknown).

        Returns:
            ``True`` if at least one row was updated.
        """
        return self.update_vps_details(vps_id, status=status)

    def get_monitored_vps(self):
        """Return every VPS flagged for monitoring (``section = 1``)."""
        return self._fetch_all(
            self.vpslist_db,
            'SELECT * FROM vps_list WHERE section = 1 ORDER BY config_id, vps_id',
        )

    # ==================== status.db operations ====================

    def save_ping_status(self, vps_id, target, status, latency_ms=None):
        """Append one ping result; ``check_time`` is filled in by SQLite.

        Args:
            vps_id: VPS id.
            target: Target that was pinged (IP or domain).
            status: Status (normal/abnormal).
            latency_ms: Latency in milliseconds.
        """
        with self._session(self.status_db, commit=True) as conn:
            conn.execute('''
                INSERT INTO ping_status (vps_id, target, status, latency_ms)
                VALUES (?, ?, ?, ?)
            ''', (vps_id, target, status, latency_ms))

    def get_ping_records(self, vps_id, date=None):
        """Return the ping records of a VPS.

        Args:
            vps_id: VPS id.
            date: Day (YYYY-MM-DD) compared against ``DATE(check_time)``.
                When empty, all records are returned.

        Returns:
            List of record dicts: oldest first when ``date`` is given,
            newest first otherwise.
        """
        if date:
            return self._fetch_all(self.status_db, '''
                SELECT * FROM ping_status
                WHERE vps_id = ? AND DATE(check_time) = ?
                ORDER BY check_time
            ''', (vps_id, date))
        return self._fetch_all(self.status_db, '''
            SELECT * FROM ping_status
            WHERE vps_id = ?
            ORDER BY check_time DESC
        ''', (vps_id,))

    def cleanup_old_ping_records(self, days=30):
        """Delete ping records older than the retention window.

        Args:
            days: Number of days to keep (default 30).

        Returns:
            The number of deleted rows.
        """
        from datetime import timedelta, timezone

        # check_time is written by CURRENT_TIMESTAMP, which SQLite stores in
        # UTC, so the cutoff must be UTC as well. A local-time cutoff would
        # shift the retention window by the host's UTC offset.
        cutoff = datetime.now(timezone.utc) - timedelta(days=days)
        cutoff_text = cutoff.strftime('%Y-%m-%d %H:%M:%S')
        with self._session(self.status_db, commit=True) as conn:
            cursor = conn.execute(
                'DELETE FROM ping_status WHERE check_time < ?', (cutoff_text,))
            return cursor.rowcount

    def save_vps_summary(self, vps_id, date, avg_latency, max_latency, min_latency,
                         count_under_100, count_100_to_300, count_300_to_500,
                         count_abnormal, availability):
        """Insert or replace the daily summary of a VPS.

        Args:
            vps_id: VPS id.
            date: Day (YYYY-MM-DD).
            avg_latency: Average latency.
            max_latency: Maximum latency.
            min_latency: Minimum latency.
            count_under_100: Number of checks below 100 ms.
            count_100_to_300: Number of checks between 100 and 300 ms.
            count_300_to_500: Number of checks between 300 and 500 ms.
            count_abnormal: Number of abnormal checks.
            availability: Availability rating.
        """
        with self._session(self.status_db, commit=True) as conn:
            conn.execute('''
                INSERT OR REPLACE INTO vps_summary
                (vps_id, date, avg_latency_ms, max_latency_ms, min_latency_ms,
                 count_under_100, count_100_to_300, count_300_to_500, count_abnormal, availability)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (vps_id, date, avg_latency, max_latency, min_latency,
                  count_under_100, count_100_to_300, count_300_to_500,
                  count_abnormal, availability))

    def get_vps_summary(self, vps_id, date=None):
        """Return one summary row of a VPS.

        Args:
            vps_id: VPS id.
            date: Day to look up. When empty, the row with the greatest
                ``date`` is returned.

        Returns:
            The summary as a dict, or ``None`` if there is none.
        """
        if date:
            return self._fetch_one(
                self.status_db,
                'SELECT * FROM vps_summary WHERE vps_id = ? AND date = ?',
                (vps_id, date),
            )
        return self._fetch_one(
            self.status_db,
            'SELECT * FROM vps_summary WHERE vps_id = ? ORDER BY date DESC LIMIT 1',
            (vps_id,),
        )

    def get_all_summaries(self, date=None):
        """Return the summaries of all VPS instances.

        Args:
            date: Day to look up. When empty, the latest summary of each
                VPS is returned.

        Returns:
            List of summary dicts ordered by VPS id.
        """
        if date:
            return self._fetch_all(
                self.status_db,
                'SELECT * FROM vps_summary WHERE date = ? ORDER BY vps_id',
                (date,),
            )
        # Pick, per VPS, the row whose date equals that VPS's maximum date.
        return self._fetch_all(self.status_db, '''
            SELECT vs.* FROM vps_summary vs
            INNER JOIN (
                SELECT vps_id, MAX(date) as max_date
                FROM vps_summary
                GROUP BY vps_id
            ) latest ON vs.vps_id = latest.vps_id AND vs.date = latest.max_date
            ORDER BY vs.vps_id
        ''')
def _main():
    """Smoke test: initialise the databases and print where they live."""
    db = DatabaseManager()
    print("数据库初始化完成")
    print(f"vps.db: {db.vps_db}")
    print(f"vpslist.db: {db.vpslist_db}")
    print(f"status.db: {db.status_db}")


if __name__ == '__main__':
    # Only runs when the module is executed directly as a script.
    _main()