145 lines
4.2 KiB
Python
145 lines
4.2 KiB
Python
# -*- coding: utf-8 -*-
|
||
|
||
import json
|
||
import sqlite3
|
||
import os
|
||
from datetime import datetime, timedelta
|
||
|
||
def create_database():
    """Create the SQLite database file and the ``articles`` table if missing.

    The database lives at ``./db/xianzhi.db`` relative to the current working
    directory.  The table is created with ``CREATE TABLE IF NOT EXISTS``, so
    calling this function more than once is harmless.

    Raises:
        sqlite3.Error: If the database cannot be opened or the statement fails.
    """
    # sqlite3 creates the database file on demand but not its parent
    # directory, so make sure ``./db`` exists before connecting.
    os.makedirs('./db', exist_ok=True)

    conn = sqlite3.connect('./db/xianzhi.db')
    try:
        conn.execute('''CREATE TABLE IF NOT EXISTS articles (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT,
            link TEXT,
            published DATETIME
        )''')
        conn.commit()
    finally:
        # Release the connection even when the statement fails.
        conn.close()
|
||
|
||
def insert_data(data):
    """Insert article entries into the ``articles`` table.

    Args:
        data: Iterable of dicts providing the ``title``, ``link`` and
            ``published`` keys.  ``published`` is expected to look like
            ``2024-01-01T12:00:00+08:00``.

    Timestamps that parse are converted to the machine's local time zone and
    stored as ``YYYY-MM-DD HH:MM:SS``.  ``select_articles`` compares against
    naive local time (``datetime.now()``), so dropping the UTC offset without
    converting would shift the window whenever the feed's offset differs from
    the host's.  Values that cannot be parsed are stored unchanged.

    Raises:
        KeyError: If an entry lacks one of the required keys.
        sqlite3.Error: If the database cannot be opened or the insert fails.
    """
    rows = []
    for entry in data:
        published = entry['published']
        try:
            # Parse the published string into an offset-aware datetime.
            parsed = datetime.strptime(published, '%Y-%m-%dT%H:%M:%S%z')
        except (TypeError, ValueError):
            # Unparseable (or non-string) value: keep the original as-is.
            formatted_pub_date = published
        else:
            # Normalise to local wall-clock time before dropping the offset.
            formatted_pub_date = parsed.astimezone().strftime('%Y-%m-%d %H:%M:%S')
        rows.append((entry['title'], entry['link'], formatted_pub_date))

    conn = sqlite3.connect('./db/xianzhi.db')
    try:
        conn.executemany(
            'INSERT INTO articles (title, link, published) VALUES (?, ?, ?)',
            rows,
        )
        conn.commit()
    finally:
        # Release the connection even when an insert fails.
        conn.close()
|
||
|
||
def get_xianzhi_json():
    """Load ``./JSON/xianzhi.json`` and return its entries as numbered dicts.

    Returns:
        A list of dicts with the keys ``id`` (1-based position of the item in
        the file), ``title``, ``link`` and ``published``.  A field missing
        from an item defaults to the empty string.

    Raises:
        FileNotFoundError: If the JSON file does not exist.
        ValueError: If the top-level JSON value is not a list.
    """
    json_path = './JSON/xianzhi.json'

    # Fail early with a descriptive message when the dump is absent.
    if not os.path.exists(json_path):
        raise FileNotFoundError("xianzhi.json文件不存在,请检查程序是否运行正常!")

    with open(json_path, 'r', encoding='utf-8') as handle:
        payload = json.load(handle)

    # The dump is expected to be a list of JSON objects.
    if not isinstance(payload, list):
        raise ValueError("JSON文件格式错误,请检查common.py是否异常!")

    # Keep only the fields of interest and number the entries from 1.
    return [
        {
            "id": position,
            "title": record.get("title", ""),
            "link": record.get("link", ""),
            "published": record.get("published", ""),
        }
        for position, record in enumerate(payload, start=1)
    ]
|
||
|
||
def select_articles(e_hour):
    """Return the articles published within the last ``e_hour`` hours.

    Args:
        e_hour: Size of the look-back window in hours (passed to
            ``timedelta(hours=...)``).

    Returns:
        A list of row tuples from ``SELECT * FROM articles`` whose
        ``published`` value lies between ``now - e_hour`` and ``now``
        inclusive, where ``now`` is the naive local time of the call.

    Raises:
        sqlite3.Error: If the database cannot be opened or the query fails.
    """
    timestamp_format = '%Y-%m-%d %H:%M:%S'

    # Window boundaries, formatted the same way the timestamps are stored.
    end_time = datetime.now()
    start_time = end_time - timedelta(hours=e_hour)

    conn = sqlite3.connect('./db/xianzhi.db')
    try:
        cursor = conn.execute(
            '''
            SELECT * FROM articles
            WHERE published BETWEEN ? AND ?
            ''',
            (
                start_time.strftime(timestamp_format),
                end_time.strftime(timestamp_format),
            ),
        )
        return cursor.fetchall()
    finally:
        # Release the connection even when the query fails.
        conn.close()
|
||
|
||
def clear_table():
    """Delete every row from the ``articles`` table in ``./db/xianzhi.db``.

    Raises:
        sqlite3.Error: If the database cannot be opened or the table is
            missing.
    """
    conn = sqlite3.connect('./db/xianzhi.db')
    try:
        conn.execute('DELETE FROM articles')
        conn.commit()
    finally:
        # Release the connection even when the delete fails.
        conn.close()
|
||
|
||
def record_md(result, filename="./history/tech_passage.md"):
    """Append ``result`` to a Markdown history file.

    Args:
        result: Text to append.
        filename: Target file path; it is opened in append mode with UTF-8
            encoding.  Missing parent directories are created first.
    """
    # Opening a file for append does not create its parent directory, so
    # create it up front instead of failing with FileNotFoundError.
    parent_dir = os.path.dirname(filename)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    with open(filename, 'a', encoding='utf-8') as file:
        file.write(result)
|
||
|
||
def get_filtered_articles(entries):
    """Render article rows as plain text and log a Markdown copy.

    Args:
        entries: Iterable of row sequences in which index 1 is the title,
            index 2 the link and index 3 the published timestamp.

    Returns:
        The plain-text rendering of all rows.  The Markdown rendering is
        handed to ``record_md`` once, after all rows have been formatted.
    """
    # Divider appended after each article so entries are easy to tell apart.
    divider = "\n" + "-" * 40 + "\n"

    text_chunks = []
    markdown_chunks = []
    for row in entries:
        title, link, published = row[1], row[2], row[3]
        text_chunks.append(
            f"文章:{title}\n链接:{link}\n上传时间:{published}\n{divider}"
        )
        markdown_chunks.append(
            f"#### 文章:{title}\n**链接**:{link}\n**上传时间**:{published}\n{divider}"
        )

    record_md("".join(markdown_chunks))
    return "".join(text_chunks)
|
||
|
||
|
||
def Src_xianzhi(e_hour):
    """Rebuild the article table from the JSON dump and report recent articles.

    The table is emptied, refilled from ``get_xianzhi_json()``, and then
    queried for articles published within the last ``e_hour`` hours.

    Args:
        e_hour: Size of the look-back window in hours.

    Returns:
        The text produced by ``get_filtered_articles`` when at least one
        article falls inside the window, otherwise ``False``.
    """
    # create_database() uses CREATE TABLE IF NOT EXISTS, so it is safe to run
    # on every call.  Doing so also covers a database file that exists but
    # does not contain the table yet, which would make clear_table() fail.
    create_database()

    # Start from an empty table so rows from earlier runs are not duplicated.
    clear_table()

    # Load the JSON dump and store its entries.
    insert_data(get_xianzhi_json())

    # Fetch the rows that fall inside the requested time window.
    filtered_articles = select_articles(e_hour)
    if not filtered_articles:
        return False
    return get_filtered_articles(filtered_articles)
|
||
|
||
if __name__ == "__main__":
    # Manual run: report the articles from the last four hours.
    results = Src_xianzhi(4)
    if results is False:
        # Nothing fell inside the window, so there is nothing to print.
        print("-" * 40)
        print("先知社区数据为空,跳过执行。")
    else:
        print(results)
|