PyBot/GotoSend/uni_rss.py

233 lines
7.3 KiB
Python
Raw Normal View History

2025-03-14 11:50:28 +08:00
# -*- coding: utf-8 -*-
import json
import sqlite3
import os
from datetime import datetime, timedelta
def create_database():
conn = sqlite3.connect('./resources/db/uni_rss.db')
cursor = conn.cursor()
cursor.execute('''CREATE TABLE IF NOT EXISTS articles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT,
link TEXT,
author TEXT,
description TEXT,
pubDate DATETIME,
source TEXT,
is_sended BOOLEAN
)''')
conn.commit()
conn.close()
def insert_data(data):
conn = sqlite3.connect('./resources/db/uni_rss.db')
cursor = conn.cursor()
for entry in data:
cursor.execute('''
SELECT 1 FROM articles WHERE title = ? AND author = ?
''', (entry['title'], entry['author']))
if cursor.fetchone() is None:
cursor.execute('''
INSERT INTO articles (title, link, author, description, pubDate, source)
VALUES (?, ?, ?, ?, ?, ?)
''', (entry['title'], entry['link'], entry['author'], entry['description'], entry['published'], entry['source']))
conn.commit()
conn.close()
def get_uni_rss_json():
# 检查文件是否存在
if not os.path.exists('./resources/JSON/uni_rss.json'):
raise FileNotFoundError(f"uni_rss.json文件不存在请检查程序是否运行正常")
# 打开并读取JSON文件
with open('./resources/JSON/uni_rss.json', 'r', encoding='utf-8') as file:
content = file.read()
if not content:
pass
else:
data = json.loads(content)
# 假设data是一个包含多个JSON对象的列表
if not isinstance(data, list):
raise ValueError("JSON文件格式错误请检查common.py是否异常")
# 提取所需字段并编号
total_data = []
for index, item in enumerate(data, start=1):
entry = {
"id": index,
"title": item.get("title", ""),
"link": item.get("link", ""),
"author": item.get("author", ""),
"description": item.get("description", ""),
"published": item.get("published", ""),
"source": item.get("source", ""),
}
total_data.append(entry)
return total_data
def select_articles(uni_rss_switch, Unity):
conn = sqlite3.connect('./resources/db/uni_rss.db')
cursor = conn.cursor()
# 获取当前日期和时间
now = datetime.now()
two_days_ago = now - timedelta(days=2)
if uni_rss_switch == False:
# 查询指定时间段内的数据
cursor.execute('''
SELECT * FROM articles
WHERE is_sended IS NULL AND pubDate BETWEEN ? AND ?
ORDER BY pubDate DESC
''', (two_days_ago.strftime('%Y-%m-%d %H:%M:%S'), now.strftime('%Y-%m-%d %H:%M:%S')))
elif uni_rss_switch == True:
# 查询指定时间段内且title包含特定关键词的数据
like_conditions = ' OR '.join([f"title LIKE ?"] * len(Unity))
# 完整的查询语句
query = f'''
SELECT * FROM articles
WHERE is_sended IS NULL AND pubDate BETWEEN ? AND ?
AND ({like_conditions})
ORDER BY pubDate DESC
'''
# 构建参数列表
params = [
two_days_ago.strftime('%Y-%m-%d %H:%M:%S'),
now.strftime('%Y-%m-%d %H:%M:%S')
] + [f'%{keyword}%' for keyword in Unity]
# 执行查询
cursor.execute(query, params)
results = cursor.fetchall()
if results:
for row in results:
user_id = row[0]
cursor.execute('''
UPDATE articles
SET is_sended = True
WHERE id = ?
''', (user_id,))
conn.commit() # 提交事务
conn.close()
return results
def clear_table():
conn = sqlite3.connect('./resources/db/uni_rss.db')
cursor = conn.cursor()
cursor.execute('DELETE FROM articles')
conn.commit()
conn.close()
def record_md(result, filename="./resources/history/uni_passage.md"):
# 读取现有内容
if os.path.exists(filename):
with open(filename, 'r', encoding='utf-8') as file:
existing_content = file.read()
else:
existing_content = ""
# 将新内容插入到现有内容的开头
new_content = result + existing_content
# 写回文件
with open(filename, 'w', encoding='utf-8') as file:
file.write(new_content)
def get_filtered_articles(entries):
result_long = ""
result_short = ""
record = ""
short_results = []
for entry in entries:
# 构建长文本结果
result_long += f"文章:[{entry[1]}]({entry[2]})\n"
result_long += f"作者:{entry[3]}\n"
result_long += f"描述:{entry[4]}\n"
result_long += f"上传时间:{entry[5]}\n"
result_long += f"来源:{entry[6]}\n"
result_long += "\n" + "-" * 40 + "\n" # 添加分隔线以便区分不同文章
# 构建短文本结果并进行分块处理
current_entry = (
f"文章:[{entry[1]}]({entry[2]})\n"
f"上传时间:{entry[5]}\n"
"\n" + "-" * 3 + "\n" # 添加分隔线以便区分不同文章
)
temp_result = result_short + current_entry
if len(temp_result.encode('utf-8')) > 4096:
short_results.append(result_short)
result_short = current_entry
else:
result_short = temp_result
record += f"#### 文章:[{entry[1]}]({entry[2]})\n"
record += f"**作者**{entry[3]}\n"
record += f"**描述**{entry[4]}\n"
record += f"**上传时间**{entry[5]}\n"
record += f"**来源**{entry[6]}\n"
record += "\n" + "-" * 40 + "\n" # 添加分隔线以便区分不同文章
# 处理最后一个结果
if result_short:
short_results.append(result_short)
record_md(record)
return result_long, short_results
def Src_uni_rss(uni_rss_switch, Unity):
if not os.path.exists('./resources/db/uni_rss.db'):
# 创建数据库和表
create_database()
# 清空表
# clear_table()
# 获取 JSON 数据
uni_rss_data = get_uni_rss_json()
# 插入数据到数据库
insert_data(uni_rss_data)
# 查询指定时间段内的数据
filtered_articles = select_articles(uni_rss_switch, Unity)
# print(filtered_articles)
if filtered_articles:
result_long, short_results = get_filtered_articles(filtered_articles)
return result_long, short_results
else:
return False
if __name__ == "__main__":
results = Src_uni_rss(False, ["webshell", "2000", "POC", "SQL", "XSS", "CSRF", "漏洞"])
if results:
result_long, short_results = results
# 写入长文本结果
with open("./1.txt", "a", encoding="utf-8") as f:
f.write(result_long)
f.write("\n" + "-" * 40 + "\n")
# 写入分块的短文本结果
for short_result in short_results:
with open("./2.txt", "a", encoding="utf-8") as f:
f.write(short_result)
f.write("\n" + "-" * 40 + "\n")
else:
# 如果为空,则跳过执行
print("-" * 40)
print("聚合RSS数据为空跳过执行。")