# -*- coding: utf-8 -*-
"""Persist GitHub monitoring results in SQLite and render them as Markdown.

The module reads four JSON files (keyword hits, repository updates, releases
and user uploads) from ``./resources/JSON``, stores them in
``./resources/db/github.db``, selects the rows that fall inside a look-back
window and formats them as Markdown text (one long message plus size-limited
chunks). Every rendered batch is also prepended to a Markdown history file.
"""

import json
import os
import sqlite3
from datetime import datetime, timedelta

_DB_PATH = './resources/db/github.db'
_JSON_DIR = './resources/JSON'
_GITHUB_TIME_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
_DB_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'
# Timestamps are shifted by a fixed +8h before being stored.
# NOTE(review): select_articles() compares them against the naive
# datetime.now(), so this presumably expects the host clock to run at UTC+8
# -- confirm against the deployment environment.
_LOCAL_UTC_OFFSET = timedelta(hours=8)
# Upper bound, in UTF-8 bytes, for one short-text chunk.
_MAX_CHUNK_BYTES = 4096
_LONG_RULE = "\n" + "-" * 10 + "\n"
_SHORT_RULE = "\n" + "-" * 3 + "\n"

_SCHEMA = '''
    CREATE TABLE IF NOT EXISTS keywords (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        title TEXT,
        link TEXT,
        description TEXT,
        pubDate DATETIME,
        author TEXT,
        keyword TEXT,
        language TEXT,
        is_sended BOOLEAN
    );
    CREATE TABLE IF NOT EXISTS repos (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        title TEXT,
        link TEXT,
        description TEXT,
        pubDate DATETIME,
        author TEXT,
        keyword TEXT,
        link2 TEXT
    );
    CREATE TABLE IF NOT EXISTS releases (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        link TEXT,
        pubDate DATETIME,
        author TEXT,
        keyword TEXT
    );
    CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        title TEXT,
        link TEXT,
        description TEXT,
        pubDate DATETIME,
        author TEXT,
        keyword TEXT,
        language TEXT,
        is_sended BOOLEAN
    );
'''


def _ensure_parent_dir(path):
    """Create the directory that will contain *path* if it is missing."""
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)


def create_database():
    """Create the SQLite database file and its four tables if missing.

    The parent directory of the database file is created first, because
    ``sqlite3.connect`` cannot create missing directories.
    """
    _ensure_parent_dir(_DB_PATH)
    conn = sqlite3.connect(_DB_PATH)
    try:
        conn.executescript(_SCHEMA)
        conn.commit()
    finally:
        conn.close()


def _load_json_list(label):
    """Load ``<label>.json`` from the JSON directory and return its list.

    Raises:
        FileNotFoundError: the file does not exist.
        ValueError: the file does not contain a JSON array (this includes
            ``json.JSONDecodeError`` for malformed JSON).
    """
    path = f"{_JSON_DIR}/{label}.json"
    if not os.path.exists(path):
        raise FileNotFoundError(f"{label}文件不存在,请检查程序是否运行正常!")
    with open(path, 'r', encoding='utf-8') as file:
        data = json.load(file)
    if not isinstance(data, list):
        raise ValueError("JSON文件格式错误,请检查爬取程序是否异常!")
    return data


def _to_local_time(raw):
    """Convert a ``YYYY-MM-DDTHH:MM:SSZ`` string to the stored +8h format.

    Values that cannot be parsed are returned unchanged. ``TypeError`` is
    caught as well as ``ValueError`` so that a JSON ``null`` timestamp does
    not abort the whole import.
    """
    try:
        parsed = datetime.strptime(raw, _GITHUB_TIME_FORMAT)
    except (ValueError, TypeError):
        return raw
    return (parsed + _LOCAL_UTC_OFFSET).strftime(_DB_TIME_FORMAT)


def _insert_if_new(cursor, table, item):
    """Insert *item* into ``keywords``/``users`` unless title+author exists."""
    # The table name is interpolated into SQL, so restrict it to the two
    # internal tables that share this column layout.
    if table not in ('keywords', 'users'):
        raise ValueError(f"unsupported table: {table!r}")
    title = item.get("name", "")
    author = item.get("author", "")
    cursor.execute(
        f'SELECT 1 FROM {table} WHERE title = ? AND author = ?',
        (title, author),
    )
    if cursor.fetchone() is not None:
        return
    cursor.execute(
        f'''
        INSERT INTO {table} (title, link, description, pubDate, author, language, keyword)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        ''',
        (
            title,
            item.get("link", ""),
            item.get("description", ""),
            _to_local_time(item.get("created_at", "")),
            author,
            item.get("language", ""),
            item.get("keyword", ""),
        ),
    )


def insert_data():
    """Import the four JSON result files into the database.

    ``keywords`` and ``users`` rows are only inserted when no row with the
    same title and author exists; ``repos`` and ``releases`` rows are always
    inserted. All inserts are committed together at the end.

    Raises:
        FileNotFoundError: one of the JSON files is missing.
        ValueError: one of the JSON files is not a JSON array.
    """
    # Load and validate every file before touching the database.
    data_keyword = _load_json_list("github_keyword")
    data_repo = _load_json_list("github_repo")
    data_release = _load_json_list("github_release")
    data_user = _load_json_list("github_user")

    conn = sqlite3.connect(_DB_PATH)
    try:
        cursor = conn.cursor()

        for item in data_keyword:
            _insert_if_new(cursor, 'keywords', item)

        for item in data_repo:
            cursor.execute(
                '''
                INSERT INTO repos (title, link, description, pubDate, author, link2, keyword)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                ''',
                (
                    item.get("name", ""),
                    # The repository link is built from the "keyword" field.
                    f"https://github.com/{item.get('keyword', '')}",
                    item.get("description", ""),
                    _to_local_time(item.get("updated_at", "")),
                    item.get("author", ""),
                    item.get("link_2", ""),
                    item.get("keyword", ""),
                ),
            )

        for item in data_release:
            cursor.execute(
                '''
                INSERT INTO releases (link, pubDate, author, keyword)
                VALUES (?, ?, ?, ?)
                ''',
                (
                    item.get("link", ""),
                    _to_local_time(item.get("published_at", "")),
                    item.get("author", ""),
                    item.get("keyword", ""),
                ),
            )

        for item in data_user:
            _insert_if_new(cursor, 'users', item)

        conn.commit()
    finally:
        # Close even when an insert fails so the database file is not left
        # locked by a leaked connection.
        conn.close()


def _mark_sent(cursor, table, rows):
    """Flag every row in *rows* (identified by its first column) as sent."""
    if table not in ('keywords', 'users'):
        raise ValueError(f"unsupported table: {table!r}")
    # Store the integer 1 instead of the TRUE keyword, which SQLite only
    # recognises from version 3.23.0 onwards.
    cursor.executemany(
        f'UPDATE {table} SET is_sended = 1 WHERE id = ?',
        [(row[0],) for row in rows],
    )


def select_articles(e_hour):
    """Fetch the rows to report and mark keyword/user rows as sent.

    Args:
        e_hour: size of the look-back window in hours; three extra minutes
            are added to the window.

    Returns:
        A 4-tuple ``(keywords, repos, users, releases)`` of row lists:
        unsent keyword rows inside the window (newest first), repo rows
        inside the window, the five newest unsent user rows from the last
        60 days, and release rows inside the window. The returned keyword
        and user rows are flagged as sent in the database; the returned
        tuples still carry the value read before that update.
    """
    now = datetime.now()
    now_text = now.strftime(_DB_TIME_FORMAT)
    window_start = (now - timedelta(hours=e_hour, minutes=3)).strftime(_DB_TIME_FORMAT)
    # "Two months" is approximated as 60 days.
    two_months_ago = (now - timedelta(days=60)).strftime(_DB_TIME_FORMAT)

    conn = sqlite3.connect(_DB_PATH)
    try:
        cursor = conn.cursor()

        cursor.execute(
            '''
            SELECT * FROM keywords
            WHERE is_sended IS NULL AND pubDate BETWEEN ? AND ?
            ORDER BY pubDate DESC
            ''',
            (window_start, now_text),
        )
        result_1 = cursor.fetchall()
        if result_1:
            _mark_sent(cursor, 'keywords', result_1)
            conn.commit()

        cursor.execute(
            '''
            SELECT * FROM repos
            WHERE pubDate BETWEEN ? AND ?
            ''',
            (window_start, now_text),
        )
        result_2 = cursor.fetchall()

        cursor.execute(
            '''
            SELECT * FROM users
            WHERE is_sended IS NULL AND pubDate BETWEEN ? AND ?
            ORDER BY pubDate DESC
            LIMIT 5
            ''',
            (two_months_ago, now_text),
        )
        result_3 = cursor.fetchall()
        if result_3:
            _mark_sent(cursor, 'users', result_3)
            conn.commit()

        cursor.execute(
            '''
            SELECT * FROM releases
            WHERE pubDate BETWEEN ? AND ?
            ''',
            (window_start, now_text),
        )
        result_4 = cursor.fetchall()
        cursor.close()
    finally:
        conn.close()

    return result_1, result_2, result_3, result_4


def clear_table():
    """Delete every row from the ``repos`` and ``releases`` tables."""
    conn = sqlite3.connect(_DB_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute('DELETE FROM repos')
        cursor.execute('DELETE FROM releases')
        conn.commit()
    finally:
        conn.close()


def record_md(result, filename="./resources/history/github.md"):
    """Prepend *result* to the Markdown history file.

    Args:
        result: text to put at the start of the file.
        filename: history file; it and its parent directory are created
            when missing.
    """
    if os.path.exists(filename):
        with open(filename, 'r', encoding='utf-8') as file:
            existing_content = file.read()
    else:
        existing_content = ""

    _ensure_parent_dir(filename)
    # Newest content goes first, so the file is rewritten as a whole.
    with open(filename, 'w', encoding='utf-8') as file:
        file.write(result + existing_content)


def _assemble(headline, long_body, short_body, record_body):
    """Return ``(long, short, record)`` texts sharing one headline."""
    return (
        headline + long_body + _LONG_RULE,
        headline + short_body + _SHORT_RULE,
        "#### " + headline + record_body + _LONG_RULE,
    )


def _format_keyword(entry):
    """Render a ``keywords`` row (choice 1)."""
    byline = f"开发语言:{entry[7]}\t\t作者:{entry[5]}\n"
    return _assemble(
        f"关键词【{entry[6]}】发现新项目:[{entry[1]}]({entry[2]})\n",
        f"项目描述:{entry[3]}\n" + f"上传时间:{entry[4]}\n" + byline,
        f"上传时间:{entry[4]}\n" + byline,
        f"**项目描述**:{entry[3]}\n"
        + f"**上传时间**:{entry[4]}\n"
        + f"**开发语言**:{entry[7]}\n**作者**:{entry[5]}\n",
    )


def _format_repo(entry):
    """Render a ``repos`` row (choice 2)."""
    committer = f"提交者:{entry[5]},[点此查看提交详情]({entry[7]})\n"
    return _assemble(
        f"项目:[{entry[1]}]({entry[2]})存在更新!!!\n",
        f"更新描述:{entry[3]}\n" + f"更新时间:{entry[4]}\n" + committer,
        f"更新时间:{entry[4]}\n" + committer,
        f"**更新描述**:{entry[3]}\n"
        + f"**更新时间**:{entry[4]}\n"
        + f"**提交者**:{entry[5]},[点此查看提交详情]({entry[7]})\n",
    )


def _format_user(entry):
    """Render a ``users`` row (choice 3)."""
    return _assemble(
        f"大佬 {entry[5]} 上传了一个新工具:[{entry[1]}]({entry[2]})\n",
        f"项目描述:{entry[3]}\n" + f"上传时间:{entry[4]}\n",
        f"上传时间:{entry[4]}\n",
        f"**项目描述**:{entry[3]}\n" + f"**上传时间**:{entry[4]}\n",
    )


def _format_release(entry):
    """Render a ``releases`` row (choice 4)."""
    return _assemble(
        f"【{entry[3]}】为[{entry[4]}]({entry[1]})发布了新版本,请及时查收!\n",
        f"发布时间:{entry[2]}\n",
        f"发布时间:{entry[2]}\n",
        f"**发布时间**:{entry[2]}\n",
    )


_FORMATTERS = {
    1: _format_keyword,
    2: _format_repo,
    3: _format_user,
    4: _format_release,
}


def get_filtered_articles(entries, choice):
    """Render database rows as Markdown and append them to the history file.

    Args:
        entries: rows as returned by ``select_articles`` (indexable by
            column position).
        choice: row kind -- 1 keywords, 2 repos, 3 users, 4 releases.

    Returns:
        ``(result_long, short_results)``: one string with the detailed
        rendering of every entry, and a list of abbreviated renderings
        grouped into chunks of at most 4096 UTF-8 bytes. An entry that is
        larger than the limit on its own becomes a chunk of its own; empty
        chunks are never emitted.

    Raises:
        ValueError: *choice* is not one of 1, 2, 3, 4.
    """
    try:
        formatter = _FORMATTERS[choice]
    except KeyError:
        raise ValueError(f"unsupported choice: {choice!r}") from None

    long_parts = []
    record_parts = []
    short_results = []
    current_chunk = ""

    for entry in entries:
        long_text, short_text, record_text = formatter(entry)
        long_parts.append(long_text)
        record_parts.append(record_text)

        candidate = current_chunk + short_text
        if len(candidate.encode('utf-8')) <= _MAX_CHUNK_BYTES:
            current_chunk = candidate
            continue
        # The chunk is full: flush it (unless nothing has been collected
        # yet, which happens when a single entry exceeds the limit).
        if current_chunk:
            short_results.append(current_chunk)
        current_chunk = short_text

    # Flush the trailing chunk.
    if current_chunk:
        short_results.append(current_chunk)

    record_md("".join(record_parts))
    return "".join(long_parts), short_results


def Src_github(e_hour):
    """Run the whole pipeline and return the rendered results.

    Creates the database when its file is missing, clears the ``repos`` and
    ``releases`` tables, imports the JSON files and renders the rows found
    by ``select_articles``.

    Args:
        e_hour: look-back window in hours, forwarded to ``select_articles``.

    Returns:
        A list of four ``(result_long, short_results)`` tuples in the order
        keywords, repos, users, releases; a kind without rows yields
        ``("", [])``.
    """
    if not os.path.exists(_DB_PATH):
        create_database()

    clear_table()
    insert_data()

    results = []
    for choice, rows in enumerate(select_articles(e_hour), start=1):
        # Skip rendering for empty kinds so the history file is not touched.
        results.append(get_filtered_articles(rows, choice) if rows else ("", []))
    return results


if __name__ == "__main__":
    results = Src_github(240)
    for i, (result_long, short_results) in enumerate(results, start=1):
        if result_long != "":
            print(f"长文本结果 {i}:")
            print(result_long)
            print("\n" + "-" * 10 + "\n")
        if short_results:
            print(f"分块的短文本结果 {i}:")
            for short_result in short_results:
                print(short_result)
                print("\n" + "-" * 10 + "\n")

    if all(result_long == "" and not short_results for result_long, short_results in results):
        # Nothing to report.
        print("-" * 10)
        print("github数据为空,跳过执行。")