PyBot/GotoSend/doonsec.py
2025-01-02 17:11:11 +08:00

185 lines
6.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import json
import sqlite3
import os
from datetime import datetime, timedelta
def create_database():
    """Create the SQLite database and its ``articles`` table if missing.

    The database lives at ``./resources/db/doonsec.db`` (relative to the
    current working directory). The call is idempotent: the table is created
    with ``CREATE TABLE IF NOT EXISTS``, so existing data is left untouched.

    Raises:
        OSError: If the parent directory cannot be created.
        sqlite3.Error: If the database cannot be opened or the DDL fails.
    """
    db_path = './resources/db/doonsec.db'
    # sqlite3.connect() does not create missing parent directories, so make
    # sure the folder exists before opening the database file.
    os.makedirs(os.path.dirname(db_path), exist_ok=True)
    conn = sqlite3.connect(db_path)
    try:
        conn.execute('''CREATE TABLE IF NOT EXISTS articles (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT,
            link TEXT,
            description TEXT,
            pubDate DATETIME,
            author TEXT
        )''')
        conn.commit()
    finally:
        # Always release the handle, even when the statement above fails.
        conn.close()
def insert_data(data):
    """Insert article entries into the ``articles`` table.

    Args:
        data: Iterable of dicts providing the keys ``title``, ``link``,
            ``description``, ``pubDate`` and ``author``.

    A ``pubDate`` in ``%Y-%m-%dT%H:%M:%S`` form is stored as
    ``%Y-%m-%d %H:%M:%S``; any value that cannot be parsed that way
    (another format, ``None``, a non-string) is stored unchanged.

    Raises:
        KeyError: If an entry lacks one of the required keys.
        sqlite3.Error: If the database cannot be opened or the insert fails.
    """
    rows = []
    for entry in data:
        raw_pub_date = entry['pubDate']
        try:
            # Parse the ISO-like timestamp and re-emit it with a space
            # separator.
            pub_date = datetime.strptime(
                raw_pub_date, '%Y-%m-%dT%H:%M:%S'
            ).strftime('%Y-%m-%d %H:%M:%S')
        except (TypeError, ValueError):
            # TypeError covers null / non-string values coming from JSON;
            # ValueError covers strings in a different format. Either way,
            # fall back to the original value instead of aborting the batch.
            pub_date = raw_pub_date
        rows.append((
            entry['title'],
            entry['link'],
            entry['description'],
            pub_date,
            entry['author'],
        ))

    conn = sqlite3.connect('./resources/db/doonsec.db')
    try:
        conn.executemany('''
            INSERT INTO articles (title, link, description, pubDate, author)
            VALUES (?, ?, ?, ?, ?)
        ''', rows)
        conn.commit()
    finally:
        # Close the handle even if the insert fails.
        conn.close()
def get_doonsec_json():
    """Load ``./resources/JSON/doonsec.json`` and return numbered entries.

    Returns:
        A list of dicts, one per item of the JSON array, each holding a
        1-based ``id`` plus ``title``, ``link``, ``description``, ``pubDate``
        and ``author`` (an empty string when the item lacks the field).

    Raises:
        FileNotFoundError: If the JSON file does not exist.
        ValueError: If the top-level JSON value is not a list.
    """
    json_path = './resources/JSON/doonsec.json'
    if not os.path.exists(json_path):
        raise FileNotFoundError("doonsec.json文件不存在请检查程序是否运行正常")

    with open(json_path, 'r', encoding='utf-8') as handle:
        payload = json.load(handle)

    # The file is expected to hold a list of JSON objects.
    if not isinstance(payload, list):
        raise ValueError("JSON文件格式错误请检查common.py是否异常")

    # Keep only the fields of interest and number the entries from 1.
    wanted = ("title", "link", "description", "pubDate", "author")
    return [
        {"id": number, **{field: article.get(field, "") for field in wanted}}
        for number, article in enumerate(payload, start=1)
    ]
def select_articles(e_hour, Doonsec_switch, Doonsec):
    """Return articles published within the recent time window.

    The window spans from ``e_hour`` hours plus 3 minutes before now up to
    now, compared against ``pubDate`` as ``%Y-%m-%d %H:%M:%S`` strings.

    Args:
        e_hour: Size of the look-back window in hours.
        Doonsec_switch: When truthy, additionally require the title to
            contain at least one of the keywords in ``Doonsec``. When falsy,
            only the time window is applied.
        Doonsec: Keywords matched against the title with ``LIKE %kw%``. Any
            number of keywords is supported. With the switch on and no
            keywords, nothing can match and an empty list is returned.

    Returns:
        A list of row tuples as produced by ``SELECT * FROM articles``.

    Raises:
        sqlite3.Error: If the database cannot be opened or queried.
    """
    timestamp_format = '%Y-%m-%d %H:%M:%S'
    now = datetime.now()
    # NOTE(review): the extra 3 minutes looks like slack for scheduler
    # jitter between runs — confirm with the caller's schedule.
    start_time = now - timedelta(hours=e_hour, minutes=3)

    query = 'SELECT * FROM articles WHERE pubDate BETWEEN ? AND ?'
    params = [
        start_time.strftime(timestamp_format),
        now.strftime(timestamp_format),
    ]

    if Doonsec_switch:
        keywords = list(Doonsec or [])
        if not keywords:
            # "Title contains any of zero keywords" is never true.
            return []
        # One placeholder per keyword so the number of bindings always
        # matches, whatever the length of the keyword list.
        like_clause = ' OR '.join(['title LIKE ?'] * len(keywords))
        query += f' AND ({like_clause})'
        params.extend(f'%{keyword}%' for keyword in keywords)

    conn = sqlite3.connect('./resources/db/doonsec.db')
    try:
        return conn.execute(query, params).fetchall()
    finally:
        # Close the handle even if the query fails.
        conn.close()
def clear_table():
    """Delete every row from the ``articles`` table.

    The table itself is kept, so it can be refilled afterwards.

    Raises:
        sqlite3.Error: If the database cannot be opened or the table does
            not exist.
    """
    conn = sqlite3.connect('./resources/db/doonsec.db')
    try:
        conn.execute('DELETE FROM articles')
        conn.commit()
    finally:
        # Release the handle even when the DELETE fails (e.g. missing table).
        conn.close()
def record_md(result, filename="./resources/history/tech_passage.md"):
    """Prepend ``result`` to the Markdown history file.

    Args:
        result: Text to place at the very beginning of the file.
        filename: Path of the history file. It is created when absent; its
            parent directory must already exist.
    """
    # Start from whatever the file currently holds (nothing if it is absent).
    previous = ""
    if os.path.exists(filename):
        with open(filename, 'r', encoding='utf-8') as reader:
            previous = reader.read()

    # Rewrite the whole file with the new text in front of the old content.
    with open(filename, 'w', encoding='utf-8') as writer:
        writer.write(result + previous)
def get_filtered_articles(entries, Is_short):
    """Format article rows for sending and archive them to the history file.

    Args:
        entries: Rows indexed as ``[1]`` title, ``[2]`` link,
            ``[3]`` description, ``[4]`` publish time, ``[5]`` author.
        Is_short: ``False`` produces the long form (title link, author,
            time, description); ``True`` produces the short form (title link
            and time only).

    Returns:
        The formatted message text. The full Markdown record of every entry
        is passed to ``record_md`` regardless of ``Is_short``.
    """
    message_parts = []
    archive_parts = []
    long_rule = "\n" + "-" * 40 + "\n"  # separator between articles
    short_rule = "\n" + "-" * 3 + "\n"  # compact separator for short mode

    for entry in entries:
        title, link = entry[1], entry[2]
        description, pub_date, author = entry[3], entry[4], entry[5]

        if Is_short == False:
            message_parts.extend((
                f"文章:[{title}]({link})\n",
                f"作者:{author}\n",
                f"上传时间:{pub_date}\n",
                f"简介:{description}\n",
                long_rule,
            ))
        elif Is_short == True:
            message_parts.extend((
                f"文章:[{title}]({link})\n",
                f"上传时间:{pub_date}\n",
                short_rule,
            ))

        # The archive always stores the full details of each article.
        archive_parts.extend((
            f"#### 文章:[{title}]({link})\n",
            f"**作者**:{author}\n",
            f"**上传时间**:{pub_date}\n",
            f"**简介**:{description}\n",
            long_rule,
        ))

    record_md("".join(archive_parts))
    return "".join(message_parts)
def Src_doonsec(e_hour, Is_short, Doonsec_switch, Doonsec):
    """Reload the article database from JSON and return the formatted digest.

    Steps: ensure the database and table exist, empty the table, load the
    entries from the JSON file, insert them, then select and format the
    articles that fall inside the time window.

    Args:
        e_hour: Look-back window in hours, forwarded to ``select_articles``.
        Is_short: Output form selector, forwarded to
            ``get_filtered_articles``.
        Doonsec_switch: Keyword-filter switch, forwarded to
            ``select_articles``.
        Doonsec: Keyword list, forwarded to ``select_articles``.

    Returns:
        The formatted text when at least one article matches, otherwise
        ``False``.
    """
    # create_database() uses CREATE TABLE IF NOT EXISTS, so it is safe to run
    # every time. Checking only for the file's existence would skip table
    # creation when a database file exists without the table (sqlite3.connect
    # creates the file on first use), and clear_table() would then fail.
    create_database()
    # Start from an empty table so rows are not duplicated across runs.
    clear_table()
    # Load the JSON entries and store them in the database.
    insert_data(get_doonsec_json())
    # Query the rows that fall inside the requested time window.
    filtered_articles = select_articles(e_hour, Doonsec_switch, Doonsec)
    if not filtered_articles:
        return False
    return get_filtered_articles(filtered_articles, Is_short)
if __name__ == "__main__":
    # Manual run: last 24 hours, long format, keyword filter enabled.
    digest = Src_doonsec(24, False, True, ["webshell", "2000", "POC"])
    if digest == False:
        # Nothing matched, so there is nothing to send.
        print("-" * 40)
        print("洞见微信安全数据为空,跳过执行。")
    else:
        print(digest)