# PyBot/GotoSend_freebuf.py
# Repository viewer metadata: 135 lines, 4.0 KiB, Python; timestamp 2024-12-05 00:03:51 +08:00
import json
import sqlite3
import os
from datetime import datetime, timedelta
from SendBot import SendToFeishu
def create_database():
    """Create the SQLite database and the ``articles`` table if they are missing.

    The database file is ``./db/freebuf.db``, resolved against the current
    working directory. The call is idempotent: the directory is created only
    when absent and the table is created with ``IF NOT EXISTS``.

    Raises:
        sqlite3.Error: If the database cannot be opened or the DDL fails.
        OSError: If the ``./db`` directory cannot be created.
    """
    # sqlite3.connect() creates the database file but not its parent
    # directory, so make sure the directory exists first.
    os.makedirs('./db', exist_ok=True)
    conn = sqlite3.connect('./db/freebuf.db')
    try:
        cursor = conn.cursor()
        cursor.execute('''CREATE TABLE IF NOT EXISTS articles (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT,
            link TEXT,
            description TEXT,
            pubDate DATETIME,
            category TEXT
        )''')
        conn.commit()
    finally:
        # Always release the connection, even when the DDL raises.
        conn.close()
def insert_data(data):
    """Insert article entries into the ``articles`` table of ``./db/freebuf.db``.

    Each entry's ``pubDate`` is parsed as an RFC 822 style timestamp
    (``'%a, %d %b %Y %H:%M:%S %z'``), converted to the system's local time
    zone and stored as ``'%Y-%m-%d %H:%M:%S'``. The conversion matters because
    ``select_articles`` compares the stored text against naive local
    ``datetime.now()`` values; keeping the feed's own wall-clock time would
    shift the window whenever the feed offset differs from the host's.
    When parsing fails, the original ``pubDate`` string is stored unchanged.

    Args:
        data: Iterable of dicts providing the keys ``title``, ``link``,
            ``description``, ``pubDate`` and ``category``.

    Raises:
        KeyError: If an entry lacks one of the required keys.
        sqlite3.Error: If the database cannot be opened or the insert fails.
    """
    rows = []
    for entry in data:
        raw_pub_date = entry['pubDate']
        try:
            pub_date = datetime.strptime(raw_pub_date, '%a, %d %b %Y %H:%M:%S %z')
        except ValueError:
            # Unparseable timestamp: keep the original string as-is.
            formatted_pub_date = raw_pub_date
        else:
            # Normalise the aware timestamp to local time before dropping the offset.
            formatted_pub_date = pub_date.astimezone().strftime('%Y-%m-%d %H:%M:%S')
        rows.append((
            entry['title'],
            entry['link'],
            entry['description'],
            formatted_pub_date,
            entry['category'],
        ))

    conn = sqlite3.connect('./db/freebuf.db')
    try:
        conn.executemany('''
            INSERT INTO articles (title, link, description, pubDate, category)
            VALUES (?, ?, ?, ?, ?)
        ''', rows)
        conn.commit()
    finally:
        # Always release the connection, even when the insert raises.
        conn.close()
def get_freebuf_json():
    """Load ``./JSON/freebuf.json`` and return its items as numbered entries.

    Returns:
        A list of dicts, one per item in the file, each holding ``id``
        (1-based position) plus ``title``, ``link``, ``description``,
        ``pubDate`` and ``category``; fields absent from an item default to
        an empty string.

    Raises:
        FileNotFoundError: If the JSON file does not exist.
        ValueError: If the top-level JSON value is not a list.
    """
    json_path = './JSON/freebuf.json'
    if not os.path.exists(json_path):
        raise FileNotFoundError("freebuf.json文件不存在请检查程序是否运行正常")

    with open(json_path, 'r', encoding='utf-8') as handle:
        payload = json.load(handle)

    # The file is expected to hold a list of JSON objects.
    if not isinstance(payload, list):
        raise ValueError("JSON文件格式错误请检查common.py是否异常")

    # Copy only the fields of interest and number the entries from 1.
    wanted_fields = ("title", "link", "description", "pubDate", "category")
    return [
        {"id": number, **{field: item.get(field, "") for field in wanted_fields}}
        for number, item in enumerate(payload, start=1)
    ]
def select_articles(e_hour):
    """Return the rows of ``articles`` published within the last ``e_hour`` hours.

    The window runs from ``now - e_hour`` hours up to ``now`` (local, naive
    ``datetime.now()``), and is compared against the ``pubDate`` column as
    ``'%Y-%m-%d %H:%M:%S'`` text.

    Args:
        e_hour: Size of the look-back window in hours.

    Returns:
        A list of row tuples as returned by ``SELECT *`` on ``articles``.
    """
    connection = sqlite3.connect('./db/freebuf.db')

    # Compute both ends of the time window from a single "now" reading.
    time_format = '%Y-%m-%d %H:%M:%S'
    window_end = datetime.now()
    window_start = window_end - timedelta(hours=e_hour)
    bounds = (window_start.strftime(time_format), window_end.strftime(time_format))

    query = '''
        SELECT * FROM articles
        WHERE pubDate BETWEEN ? AND ?
    '''
    matching_rows = connection.cursor().execute(query, bounds).fetchall()
    connection.close()
    return matching_rows
def clear_table():
    """Delete every row from the ``articles`` table in ``./db/freebuf.db``.

    The table itself is kept; only its contents are removed.
    """
    connection = sqlite3.connect('./db/freebuf.db')
    connection.cursor().execute('DELETE FROM articles')
    connection.commit()
    connection.close()
def get_filtered_articles(entries):
    """Render article rows as a plain-text message.

    Args:
        entries: Iterable of indexable rows where index 1 is the title,
            2 the link, 4 the publication time and 5 the category.

    Returns:
        One text block per row (two labelled lines followed by a 40-dash
        separator line), concatenated; an empty string when there are no rows.
    """
    # Separator line that visually splits consecutive articles.
    divider = "-" * 40 + "\n"
    blocks = [
        f"类型:{row[5]}\t文章:{row[1]}\n"
        f"链接:{row[2]}\t上传时间:{row[4]}\n"
        + divider
        for row in entries
    ]
    return "".join(blocks)
def Src_freebuf(e_hour):
    """Refresh the FreeBuf article table and push recent articles to Feishu.

    Steps: create the database when ``./db/freebuf.db`` is absent, empty the
    ``articles`` table, reload it from the JSON file, select the rows from
    the last ``e_hour`` hours and, when any exist, send the formatted text
    through ``SendToFeishu``.

    Args:
        e_hour: Size of the look-back window in hours.
    """
    # Create the database and table on first use.
    database_missing = not os.path.exists('./db/freebuf.db')
    if database_missing:
        create_database()

    # Start from an empty table, then load the current JSON snapshot.
    clear_table()
    insert_data(get_freebuf_json())

    # Keep only the articles inside the requested time window.
    recent_rows = select_articles(e_hour)
    if not recent_rows:
        # Nothing to deliver: skip sending.
        print("Freebuf数据为空跳过执行。")
        return

    message = get_filtered_articles(recent_rows)
    print("Freebuf资讯递送中")
    SendToFeishu(message, "Freebuf资讯递送")
    print("-" * 40 + "\n")
if __name__ == "__main__":
    # Run as a script: deliver articles published within the last 4 hours.
    Src_freebuf(e_hour=4)