PyBot/GotoSend/github.py

369 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import json
import sqlite3
import os
from datetime import datetime, timedelta
def create_database():
conn = sqlite3.connect('./db/github.db')
cursor = conn.cursor()
cursor.executescript('''
CREATE TABLE IF NOT EXISTS keywords (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT,
link TEXT,
description TEXT,
pubDate DATETIME,
author TEXT,
keyword TEXT,
language TEXT,
is_sended BOOLEAN
);
CREATE TABLE IF NOT EXISTS repos (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT,
link TEXT,
description TEXT,
pubDate DATETIME,
author TEXT,
keyword TEXT,
link2 TEXT
);
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT,
link TEXT,
description TEXT,
pubDate DATETIME,
author TEXT,
keyword TEXT,
language TEXT,
is_sended BOOLEAN
);
''')
conn.commit()
conn.close()
def insert_data():
# 检查文件是否存在
# 打开并读取JSON文件
# 假设data是一个包含多个JSON对象的列表然后校验JSON格式是否异常
if not os.path.exists('./JSON/github_keyword.json'):
raise FileNotFoundError(f"github_keyword文件不存在请检查程序是否运行正常")
with open('./JSON/github_keyword.json', 'r', encoding='utf-8') as file:
data_keyword = json.load(file)
if not isinstance(data_keyword, list):
raise ValueError("JSON文件格式错误请检查爬取程序是否异常")
if not os.path.exists('./JSON/github_repo.json'):
raise FileNotFoundError(f"github_repo文件不存在请检查程序是否运行正常")
with open('./JSON/github_repo.json', 'r', encoding='utf-8') as file:
data_repo = json.load(file)
if not isinstance(data_repo, list):
raise ValueError("JSON文件格式错误请检查爬取程序是否异常")
if not os.path.exists('./JSON/github_user.json'):
raise FileNotFoundError(f"github_user文件不存在请检查程序是否运行正常")
with open('./JSON/github_user.json', 'r', encoding='utf-8') as file:
data_user = json.load(file)
if not isinstance(data_user, list):
raise ValueError("JSON文件格式错误请检查爬取程序是否异常")
conn = sqlite3.connect('./db/github.db')
cursor = conn.cursor()
# 提取所需字段并编号
for index, item in enumerate(data_keyword, start=1):
entry = {
"id": index,
"title": item.get("name", ""),
"link": item.get("link", ""),
"description": item.get("description", ""),
"pubDate": item.get("created_at", ""),
"author": item.get("author", ""),
"keyword": item.get("keyword", ""),
"language": item.get("language", "")
}
try:
# 解析 pubDate 字符串为 datetime 对象
pub_date = datetime.strptime(entry['pubDate'], '%Y-%m-%dT%H:%M:%SZ')
# 格式化 pubDate 为所需的格式
formatted_pub_date = pub_date.strftime('%Y-%m-%d %H:%M:%S')
except ValueError:
# 如果解析失败,使用原始 pubDate 字符串
formatted_pub_date = entry['pubDate']
# 检查是否存在相同 title 和 author 的记录
cursor.execute('''
SELECT 1 FROM keywords WHERE title = ? AND author = ?
''', (entry['title'], entry['author']))
if cursor.fetchone() is None:
# 如果没有找到相同记录,则插入新记录
cursor.execute('''
INSERT INTO keywords (title, link, description, pubDate, author, language, keyword)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (entry['title'], entry['link'], entry['description'], formatted_pub_date, entry['author'], entry['language'], entry['keyword']))
for index, item in enumerate(data_repo, start=1):
entry = {
"id": index,
"title": item.get("name", ""),
"link": item.get("link", ""),
"description": item.get("description", ""),
"pubDate": item.get("updated_at", ""),
"author": item.get("author", ""),
"keyword": item.get("keyword", ""),
"link2": item.get("link_2", "")
}
try:
# 解析 pubDate 字符串为 datetime 对象
pub_date = datetime.strptime(entry['pubDate'], '%Y-%m-%dT%H:%M:%SZ')
# 格式化 pubDate 为所需的格式
formatted_pub_date = pub_date.strftime('%Y-%m-%d %H:%M:%S')
except ValueError:
# 如果解析失败,使用原始 pubDate 字符串
formatted_pub_date = entry['pubDate']
cursor.execute('''
INSERT INTO repos (title, link, description, pubDate, author, link2, keyword)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (entry['title'], entry['link'], entry['description'], formatted_pub_date, entry['author'], entry['link2'], entry['keyword']))
# 插入 users 数据
for index, item in enumerate(data_user, start=1):
entry = {
"id": index,
"title": item.get("name", ""),
"link": item.get("link", ""),
"description": item.get("description", ""),
"pubDate": item.get("created_at", ""),
"author": item.get("author", ""),
"keyword": item.get("keyword", ""),
"language": item.get("language", "")
}
try:
# 解析 pubDate 字符串为 datetime 对象
pub_date = datetime.strptime(entry['pubDate'], '%Y-%m-%dT%H:%M:%SZ')
# 格式化 pubDate 为所需的格式
formatted_pub_date = pub_date.strftime('%Y-%m-%d %H:%M:%S')
except ValueError:
# 如果解析失败,使用原始 pubDate 字符串
formatted_pub_date = entry['pubDate']
# 检查是否存在相同 title 和 author 的记录
cursor.execute('''
SELECT 1 FROM users WHERE title = ? AND author = ?
''', (entry['title'], entry['author']))
if cursor.fetchone() is None:
# 如果没有找到相同记录,则插入新记录
cursor.execute('''
INSERT INTO users (title, link, description, pubDate, author, keyword, language)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (entry['title'], entry['link'], entry['description'], formatted_pub_date, entry['author'], entry['keyword'], entry['language']))
conn.commit()
conn.close()
def select_articles(e_hour):
conn = sqlite3.connect('./db/github.db')
cursor = conn.cursor()
# 获取当前日期和时间
now = datetime.now()
two_months_ago = now - timedelta(days=60) # 假设两个月大约60天
start_time = now - timedelta(hours=e_hour, minutes=3)
# 查询指定时间段内的数据
cursor.execute('''
SELECT * FROM keywords
WHERE is_sended IS NULL AND pubDate BETWEEN ? AND ?
ORDER BY pubDate DESC
''', (start_time.strftime('%Y-%m-%d %H:%M:%S'), now.strftime('%Y-%m-%d %H:%M:%S')))
result_1 = cursor.fetchall()
if result_1:
for row in result_1:
keyword_id = row[0]
cursor.execute('''
UPDATE keywords
SET is_sended = True
WHERE id = ?
''', (keyword_id,))
conn.commit()
cursor.execute('''
SELECT * FROM repos
WHERE pubDate BETWEEN ? AND ?
''', (start_time.strftime('%Y-%m-%d %H:%M:%S'), now.strftime('%Y-%m-%d %H:%M:%S')))
result_2 = cursor.fetchall()
# 查询最近的5条未被标记为True的消息且发布时间不超过两个月
cursor.execute('''
SELECT * FROM users
WHERE is_sended IS NULL AND pubDate BETWEEN ? AND ?
ORDER BY pubDate DESC
LIMIT 5
''', (two_months_ago.strftime('%Y-%m-%d %H:%M:%S'), now.strftime('%Y-%m-%d %H:%M:%S')))
# 查询最近的3条未被标记为True的消息
# cursor.execute('''
# SELECT * FROM users
# WHERE is_sended IS NULL
# ORDER BY pubDate DESC
# LIMIT 5
# ''')
result_3 = cursor.fetchall()
# print(results)
if result_3:
for row in result_3:
user_id = row[0]
cursor.execute('''
UPDATE users
SET is_sended = True
WHERE id = ?
''', (user_id,))
conn.commit() # 提交事务
cursor.close()
conn.close()
return result_1, result_2, result_3
def clear_table():
conn = sqlite3.connect('./db/github.db')
cursor = conn.cursor()
cursor.execute('DELETE FROM repos')
conn.commit()
conn.close()
def record_md(result, filename="./history/github.md"):
# 读取现有内容
if os.path.exists(filename):
with open(filename, 'r', encoding='utf-8') as file:
existing_content = file.read()
else:
existing_content = ""
# 将新内容插入到现有内容的开头
new_content = result + existing_content
# 写回文件
with open(filename, 'w', encoding='utf-8') as file:
file.write(new_content)
def get_filtered_articles(entries, Is_short, choice):
result = ""
record = ""
for entry in entries:
if Is_short == False:
if choice == 1:
result += f"关键词【{entry[6]}】发现新项目:[{entry[1]}]({entry[2]})\n"
result += f"项目描述:{entry[3]}\n"
result += f"上传时间:{entry[4]}\n"
result += f"开发语言:{entry[7]}\t\t作者:{entry[5]}\n"
result += "\n" + "-" * 10 + "\n" # 添加分隔线以便区分不同文章
elif choice == 2:
result += f"项目:[{entry[1]}]({entry[2]})存在更新!!!\n"
result += f"更新描述:{entry[3]}\n"
result += f"更新时间:{entry[4]}\n"
result += f"提交者:{entry[5]}[点此查看提交详情]({entry[2]})\n"
result += "\n" + "-" * 10 + "\n" # 添加分隔线以便区分不同文章
elif choice == 3:
result += f"大佬 {entry[5]} 上传了一个新工具:[{entry[1]}]({entry[2]})\n"
result += f"项目描述:{entry[3]}\n"
result += f"上传时间:{entry[4]}\n"
result += "\n" + "-" * 10 + "\n" # 添加分隔线以便区分不同文章
elif Is_short == True:
if choice == 1:
result += f"关键词【{entry[7]}】发现新项目:[{entry[1]}]({entry[2]})\n"
result += f"上传时间:{entry[4]}\n"
result += f"开发语言:{entry[6]}\t\t作者:{entry[5]}\n"
result += "\n" + "-" * 3 + "\n" # 添加分隔线以便区分不同文章
elif choice == 2:
result += f"项目:[{entry[1]}]({entry[2]})存在更新!!!\n"
result += f"更新描述:{entry[3]}\n"
result += f"更新时间:{entry[4]}\n"
result += f"提交者:{entry[5]}[点此查看提交详情]({entry[2]})\n"
result += "\n" + "-" * 3 + "\n" # 添加分隔线以便区分不同文章
elif choice == 3:
result += f"大佬 {entry[5]} 上传了一个新工具:[{entry[1]}]({entry[2]})\n"
result += f"项目描述:{entry[3]}\n"
result += f"上传时间:{entry[4]}\n"
result += "\n" + "-" * 3 + "\n" # 添加分隔线以便区分不同文章
if choice == 1:
record += f"#### 关键词【{entry[7]}】发现新项目:[{entry[1]}]({entry[2]})\n"
record += f"**项目描述**{entry[3]}\n"
record += f"**上传时间**{entry[4]}\n"
record += f"**开发语言**{entry[6]}\t\t**作者**{entry[5]}\n"
record += "\n" + "-" * 10 + "\n" # 添加分隔线以便区分不同文章
elif choice == 2:
record += f"#### 项目:[{entry[1]}]({entry[2]})存在更新!!!\n"
record += f"**更新描述**{entry[3]}\n"
record += f"**更新时间**{entry[4]}\n"
record += f"**提交者**{entry[5]}[点此查看提交详情]({entry[2]})\n"
record += "\n" + "-" * 10 + "\n" # 添加分隔线以便区分不同文章
elif choice == 3:
record += f"#### 大佬 {entry[5]} 上传了一个新工具:[{entry[1]}]({entry[2]})\n"
record += f"**项目描述**{entry[3]}\n"
record += f"**上传时间**{entry[4]}\n"
record += "\n" + "-" * 10 + "\n" # 添加分隔线以便区分不同文章
record_md(record)
return result
def Src_github(e_hour, Is_short):
if not os.path.exists('./db/github.db'):
# 创建数据库和表
create_database()
# 清空表
clear_table()
# 插入数据到数据库
insert_data()
# 查询指定时间段内的数据
keyword_data, repo_data, user_data = select_articles(e_hour)
if keyword_data:
result_1 = get_filtered_articles(keyword_data, Is_short, 1)
else:
result_1 = ""
if repo_data:
result_2 = get_filtered_articles(repo_data, Is_short, 2)
else:
result_2 = ""
if user_data:
result_3 = get_filtered_articles(user_data, Is_short, 3)
else:
result_3 = ""
return result_1, result_2, result_3
if __name__ == "__main__":
result_1, result_2, result_3 = Src_github(24, False)
if result_1 != "":
print(result_1)
elif result_2 != "":
print(result_2)
if result_3 != "":
print(result_3)
else:
# 如果为空,则跳过执行
print("-" * 10)
print("github数据为空跳过执行。")