"""Load Doonsec articles from a JSON file into SQLite and push a digest to Feishu."""

import json
import os
import sqlite3
from contextlib import closing
from datetime import datetime, timedelta

from SendBot import SendToFeishu

# Locations of the SQLite database and of the JSON file this module reads.
DB_PATH = './db/doonsec.db'
JSON_PATH = './JSON/doonsec.json'

# pubDate arrives as ISO-like text and is stored in the space-separated form
# that the time-range query compares against.
_PUB_DATE_INPUT_FORMAT = '%Y-%m-%dT%H:%M:%S'
_PUB_DATE_STORED_FORMAT = '%Y-%m-%d %H:%M:%S'


def create_database():
    """Create the ``articles`` table (and the database directory) if missing.

    The operation is idempotent: it relies on ``CREATE TABLE IF NOT EXISTS``
    and may be called on every run.
    """
    # sqlite3.connect() cannot create missing parent directories.
    db_dir = os.path.dirname(DB_PATH)
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)

    # closing() guarantees the connection is released even if execute() fails.
    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.execute('''CREATE TABLE IF NOT EXISTS articles (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT,
            link TEXT,
            description TEXT,
            pubDate DATETIME,
            author TEXT
        )''')
        conn.commit()


def _normalize_pub_date(raw):
    """Return *raw* reformatted as ``YYYY-MM-DD HH:MM:SS``, or *raw* unchanged.

    Values that are not strings (e.g. a JSON ``null``) or that do not match
    the expected input format are stored as-is instead of aborting the import.
    """
    try:
        parsed = datetime.strptime(raw, _PUB_DATE_INPUT_FORMAT)
    except (ValueError, TypeError):
        return raw
    return parsed.strftime(_PUB_DATE_STORED_FORMAT)


def insert_data(data):
    """Insert article entries into the ``articles`` table.

    Args:
        data: Iterable of dicts with the keys ``title``, ``link``,
            ``description``, ``pubDate`` and ``author``.

    Raises:
        KeyError: If an entry lacks one of the required keys. Rows are built
            before the database is touched, so nothing is inserted then.
    """
    rows = [
        (
            entry['title'],
            entry['link'],
            entry['description'],
            _normalize_pub_date(entry['pubDate']),
            entry['author'],
        )
        for entry in data
    ]

    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.executemany(
            '''
            INSERT INTO articles (title, link, description, pubDate, author)
            VALUES (?, ?, ?, ?, ?)
            ''',
            rows,
        )
        conn.commit()


def get_doonsec_json():
    """Read the Doonsec JSON file and return its entries as numbered dicts.

    Returns:
        A list of dicts with the keys ``id`` (1-based position in the file),
        ``title``, ``link``, ``description``, ``pubDate`` and ``author``.
        Fields missing from an item default to an empty string.

    Raises:
        FileNotFoundError: If the JSON file does not exist.
        ValueError: If the top-level JSON value is not a list.
    """
    if not os.path.exists(JSON_PATH):
        raise FileNotFoundError("doonsec.json文件不存在,请检查程序是否运行正常!")

    with open(JSON_PATH, 'r', encoding='utf-8') as file:
        data = json.load(file)

    # The file is expected to hold a list of JSON objects.
    if not isinstance(data, list):
        raise ValueError("JSON文件格式错误,请检查common.py是否异常!")

    return [
        {
            "id": index,
            "title": item.get("title", ""),
            "link": item.get("link", ""),
            "description": item.get("description", ""),
            "pubDate": item.get("pubDate", ""),
            "author": item.get("author", ""),
        }
        for index, item in enumerate(data, start=1)
    ]


def query_articles_within_time_range(s_hour, e_hour):
    """Return the articles published between yesterday and today.

    The window runs from ``s_hour`` o'clock yesterday to ``e_hour`` o'clock
    today (local time, both bounds inclusive).

    Args:
        s_hour: Hour of day (0-23) at which the window opens yesterday.
        e_hour: Hour of day (0-23) at which the window closes today.

    Returns:
        A list of row tuples ``(id, title, link, description, pubDate, author)``.
    """
    now = datetime.now()
    start_time = datetime(now.year, now.month, now.day, s_hour) - timedelta(days=1)
    end_time = datetime(now.year, now.month, now.day, e_hour)

    with closing(sqlite3.connect(DB_PATH)) as conn:
        cursor = conn.execute(
            '''
            SELECT * FROM articles
            WHERE pubDate BETWEEN ? AND ?
            ''',
            (
                start_time.strftime(_PUB_DATE_STORED_FORMAT),
                end_time.strftime(_PUB_DATE_STORED_FORMAT),
            ),
        )
        return cursor.fetchall()


def clear_table():
    """Delete every row from the ``articles`` table."""
    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.execute('DELETE FROM articles')
        conn.commit()


def get_filtered_articles(entries):
    """Format article rows as the text body of the notification.

    Args:
        entries: Iterable of row tuples ordered as
            ``(id, title, link, description, pubDate, author)``.

    Returns:
        One four-line section per article, each ending with a 40-dash
        separator line; an empty string when *entries* is empty.
    """
    separator = "-" * 40 + "\n"  # visually separates consecutive articles
    return "".join(
        f"作者:{entry[5]}\t文章:{entry[1]}\n"
        f"链接:[点此访问]({entry[2]})\t上传时间:{entry[4]}\n"
        f"简介:{entry[3]}\n"
        f"{separator}"
        for entry in entries
    )


def Src_doonsec(s_hour, e_hour):
    """Reload the article table from JSON and send the recent ones to Feishu.

    Args:
        s_hour: Hour of day at which the reporting window opens yesterday.
        e_hour: Hour of day at which the reporting window closes today.
    """
    # Always ensure the schema exists: the statement is idempotent, and a
    # database file may be present without the table.
    create_database()

    # Rebuild the table from scratch so it mirrors the current JSON file.
    clear_table()
    insert_data(get_doonsec_json())

    filtered_articles = query_articles_within_time_range(s_hour, e_hour)

    if filtered_articles:
        results = get_filtered_articles(filtered_articles)
        SendToFeishu(results, "洞见微信安全资讯递送")
    else:
        # Nothing was published inside the window, so no message is sent.
        print("洞见数据为空,跳过执行。")


if __name__ == "__main__":
    Src_doonsec(11, 11)