PyBot/GotoSend_anquanke.py

129 lines
3.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import sqlite3
import os
from datetime import datetime, timedelta
from SendBot import SendToFeishu
def create_database():
    """Create ``./db/anquanke.db`` and its ``articles`` table if missing.

    The path is relative to the current working directory. The call is
    idempotent: the table is created with ``IF NOT EXISTS`` and the parent
    directory is created only when absent.

    Raises:
        sqlite3.Error: if the database cannot be opened or the DDL fails.
        OSError: if the ``./db`` directory cannot be created.
    """
    # sqlite3.connect() does not create missing parent directories; without
    # this the very first run fails with "unable to open database file".
    os.makedirs('./db', exist_ok=True)
    conn = sqlite3.connect('./db/anquanke.db')
    try:
        cursor = conn.cursor()
        cursor.execute('''CREATE TABLE IF NOT EXISTS articles (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT,
            guid TEXT,
            source TEXT,
            description TEXT,
            pubDate DATETIME,
            author TEXT
        )''')
        conn.commit()
    finally:
        # Always release the file handle, even when the DDL raises.
        conn.close()
def insert_data(data):
    """Insert article entries into the ``articles`` table.

    Args:
        data: iterable of mappings, each providing the keys ``title``,
            ``guid``, ``source``, ``description``, ``pubDate`` and
            ``author``.

    Raises:
        KeyError: if an entry lacks one of the required keys. This is
            detected before the database is opened, so nothing is written.
        sqlite3.Error: if the database cannot be opened or an insert fails.
    """
    # Materialise the parameter tuples first so that a malformed entry
    # fails before a connection is opened.
    rows = [
        (
            entry['title'],
            entry['guid'],
            entry['source'],
            entry['description'],
            entry['pubDate'],
            entry['author'],
        )
        for entry in data
    ]
    conn = sqlite3.connect('./db/anquanke.db')
    try:
        conn.executemany(
            '''
            INSERT INTO articles (title, guid, source, description, pubDate, author)
            VALUES (?, ?, ?, ?, ?, ?)
            ''',
            rows,
        )
        conn.commit()
    finally:
        # Close the connection even when an insert raises.
        conn.close()
def get_anquanke_json():
    """Load ``./JSON/anquanke.json`` and return its items as numbered dicts.

    Each returned dict has a 1-based ``id`` plus the fields ``title``,
    ``guid``, ``description``, ``pubDate``, ``author`` and ``source``;
    a field missing from the input item defaults to an empty string.

    Raises:
        FileNotFoundError: if the JSON file does not exist.
        ValueError: if the top-level JSON value is not a list.
    """
    json_path = './JSON/anquanke.json'
    field_names = ("title", "guid", "description", "pubDate", "author", "source")

    # Fail early with an explicit message when the input file is missing.
    if not os.path.exists(json_path):
        raise FileNotFoundError("anquanke.json文件不存在请检查程序是否运行正常")

    with open(json_path, 'r', encoding='utf-8') as handle:
        items = json.load(handle)

    # The file is expected to hold a list of JSON objects.
    if not isinstance(items, list):
        raise ValueError("JSON文件格式错误请检查common.py是否异常")

    # Keep only the wanted fields and number the entries starting at 1.
    return [
        {"id": number, **{name: item.get(name, "") for name in field_names}}
        for number, item in enumerate(items, start=1)
    ]
def select_articles(e_hour):
    """Return the articles published within the last ``e_hour`` hours.

    The window is ``[now - e_hour hours, now]`` using the local naive clock,
    formatted as ``YYYY-MM-DD HH:MM:SS`` before being passed to SQLite.
    NOTE(review): this only matches rows whose stored ``pubDate`` uses the
    same text format -- confirm against whatever produces the JSON feed.

    Args:
        e_hour: size of the look-back window in hours.

    Returns:
        A list of row tuples in the column order
        ``(id, title, guid, source, description, pubDate, author)``.

    Raises:
        sqlite3.Error: if the database cannot be opened or the query fails.
    """
    timestamp_format = '%Y-%m-%d %H:%M:%S'
    now = datetime.now()
    start_time = now - timedelta(hours=e_hour)

    conn = sqlite3.connect('./db/anquanke.db')
    try:
        # Columns are listed explicitly because get_filtered_articles reads
        # the rows by position; this keeps the order fixed.
        cursor = conn.execute(
            '''
            SELECT id, title, guid, source, description, pubDate, author
            FROM articles
            WHERE pubDate BETWEEN ? AND ?
            ''',
            (start_time.strftime(timestamp_format), now.strftime(timestamp_format)),
        )
        return cursor.fetchall()
    finally:
        # Close the connection even when the query raises.
        conn.close()
def clear_table():
    """Delete every row from the ``articles`` table in ``./db/anquanke.db``.

    Raises:
        sqlite3.Error: if the database cannot be opened or the table does
            not exist.
    """
    conn = sqlite3.connect('./db/anquanke.db')
    try:
        conn.execute('DELETE FROM articles')
        conn.commit()
    finally:
        # Close the connection even when the DELETE raises.
        conn.close()
def get_filtered_articles(entries):
    """Render article rows as a human-readable text report.

    Args:
        entries: iterable of indexable rows; index 1 is used as the title,
            2 as the link, 3 as the source, 5 as the publish time and 6 as
            the author.

    Returns:
        One two-line paragraph per row, each followed by a 40-dash
        separator line; an empty string when ``entries`` is empty.
    """
    # Separator line that visually divides consecutive articles.
    divider = "-" * 40 + "\n"
    paragraphs = [
        f"作者:{row[6]}\t来源:{row[3]}\t文章:{row[1]}\n"
        f"链接:{row[2]}\t上传时间:{row[5]}\n"
        + divider
        for row in entries
    ]
    return "".join(paragraphs)
def Src_anquanke(e_hour):
    """Reload the article table from JSON and report the recent articles.

    Steps: create the database if its file is missing, empty the table,
    load the entries from the JSON file, insert them, then query the rows
    published within the last ``e_hour`` hours.

    Args:
        e_hour: size of the look-back window in hours.

    Returns:
        The formatted report string, or ``False`` when no article falls
        inside the window.
    """
    db_is_missing = not os.path.exists('./db/anquanke.db')
    if db_is_missing:
        # First run: create the database and its table.
        create_database()

    # Start from an empty table, then reload it from the JSON data.
    clear_table()
    insert_data(get_anquanke_json())

    # Keep only the rows inside the requested time window.
    recent_rows = select_articles(e_hour)
    if not recent_rows:
        return False
    return get_filtered_articles(recent_rows)
if __name__ == "__main__":
    # Manual run: report the articles published in the last 4 hours.
    results = Src_anquanke(4)
    # Src_anquanke returns the report string or the False sentinel, so test
    # for the sentinel by identity rather than by equality.
    if results is not False:
        print(results)
    else:
        # Nothing to report: skip.
        print("-" * 40)
        print("安全客数据为空,跳过执行。")