PyBot/GotoSend/anquanke.py

205 lines
6.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import json
import sqlite3
import os
from datetime import datetime, timedelta
def create_database():
    """Create the Anquanke SQLite database file and its ``articles`` table.

    The database lives at ``./resources/db/anquanke.db`` (relative to the
    current working directory). The call is idempotent: the parent directory
    is created when missing and the table uses ``CREATE TABLE IF NOT EXISTS``.
    """
    db_path = './resources/db/anquanke.db'
    # sqlite3.connect() creates the database file but not missing parent
    # directories, so make sure they exist first.
    os.makedirs(os.path.dirname(db_path), exist_ok=True)
    conn = sqlite3.connect(db_path)
    try:
        conn.execute('''CREATE TABLE IF NOT EXISTS articles (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT,
            guid TEXT,
            source TEXT,
            description TEXT,
            pubDate DATETIME,
            author TEXT
        )''')
        conn.commit()
    finally:
        # Always release the connection, even if the DDL statement fails.
        conn.close()
def insert_data(data):
    """Insert article entries into the ``articles`` table.

    Args:
        data: Iterable of dicts, each providing the keys ``title``, ``guid``,
            ``source``, ``description``, ``pubDate`` and ``author``.

    Raises:
        KeyError: If an entry lacks one of the required keys. Nothing is
            committed in that case.
    """
    conn = sqlite3.connect('./resources/db/anquanke.db')
    try:
        rows = (
            (
                entry['title'],
                entry['guid'],
                entry['source'],
                entry['description'],
                entry['pubDate'],
                entry['author'],
            )
            for entry in data
        )
        conn.executemany('''
            INSERT INTO articles (title, guid, source, description, pubDate, author)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', rows)
        conn.commit()
    finally:
        # Close the connection even when a malformed entry aborts the insert.
        conn.close()
def get_anquanke_json():
    """Load ``./resources/JSON/anquanke.json`` and normalise its entries.

    Returns:
        A list of dicts with the keys ``id`` (1-based position in the file),
        ``title``, ``guid``, ``description``, ``pubDate``, ``author`` and
        ``source``. Fields missing from an item default to ``""``. An empty
        list is returned when the file is missing or has no content.

    Raises:
        ValueError: If the decoded JSON document is not a list.
    """
    json_path = './resources/JSON/anquanke.json'
    # Default to "no articles" so that a missing or empty file cannot leave
    # ``data`` unbound further down.
    data = []
    if not os.path.exists(json_path):
        # logger.error("anquanke.json is missing; check that the crawler is running")
        print("anquanke.json文件不存在请检查爬虫程序是否运行正常")
    else:
        with open(json_path, 'r', encoding='utf-8') as file:
            content = file.read()
        # An empty (or whitespace-only) file is treated as "no articles"
        # rather than being handed to the JSON decoder.
        if content.strip():
            data = json.loads(content)

    # The file is expected to hold a list of JSON objects.
    if not isinstance(data, list):
        raise ValueError("JSON文件格式错误请检查common.py是否异常")

    # Keep only the fields used downstream and number the entries from 1.
    return [
        {
            "id": index,
            "title": item.get("title", ""),
            "guid": item.get("guid", ""),
            "description": item.get("description", ""),
            "pubDate": item.get("pubDate", ""),
            "author": item.get("author", ""),
            "source": item.get("source", ""),
        }
        for index, item in enumerate(data, start=1)
    ]
def select_articles(e_hour):
    """Return the articles published within the last ``e_hour`` hours.

    Args:
        e_hour: Size of the look-back window in hours, counted back from the
            current local time (``datetime.now()``).

    Returns:
        A list of full ``articles`` rows (``SELECT *``) whose ``pubDate``
        lies between ``now - e_hour`` and ``now``, both bounds inclusive.
    """
    time_format = '%Y-%m-%d %H:%M:%S'
    end_time = datetime.now()
    start_time = end_time - timedelta(hours=e_hour)

    conn = sqlite3.connect('./resources/db/anquanke.db')
    try:
        # NOTE(review): both bounds are bound as 'YYYY-MM-DD HH:MM:SS' text, so
        # this presumably relies on pubDate being stored in that same format
        # for the comparison to be chronological - confirm against the crawler.
        cursor = conn.execute('''
            SELECT * FROM articles
            WHERE pubDate BETWEEN ? AND ?
        ''', (start_time.strftime(time_format), end_time.strftime(time_format)))
        return cursor.fetchall()
    finally:
        # Close the connection even if the query fails (e.g. missing table).
        conn.close()
def clear_table():
    """Delete every row from the ``articles`` table of the Anquanke database."""
    conn = sqlite3.connect('./resources/db/anquanke.db')
    try:
        conn.execute('DELETE FROM articles')
        conn.commit()
    finally:
        # Close the connection even if the DELETE fails (e.g. missing table).
        conn.close()
def record(title, link, author, upload_time, description=None, source=None, category=None):
    """Append one article to the ``sec_news`` table of ``./resources/db/web.db``.

    The table (and the database's parent directory) is created on first use.

    Args:
        title: Article title, stored in the ``article_title`` column.
        link: Article URL.
        author: Article author.
        upload_time: Publication time, stored as given in ``upload_time``.
        description: Optional article summary.
        source: Optional name of the originating site.
        category: Optional category label.
    """
    db_path = './resources/db/web.db'
    # sqlite3.connect() creates the file but not missing parent directories.
    os.makedirs(os.path.dirname(db_path), exist_ok=True)
    conn = sqlite3.connect(db_path)
    try:
        # Create the table if it does not exist yet.
        conn.execute('''CREATE TABLE IF NOT EXISTS sec_news (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            article_title TEXT,
            link TEXT,
            author TEXT,
            description TEXT,
            source TEXT,
            upload_time DATETIME,
            category TEXT
        )''')
        # Insert the article.
        conn.execute('''
            INSERT INTO sec_news (article_title, link, author, description, source, upload_time, category)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        ''', (title, link, author, description, source, upload_time, category))
        conn.commit()
    finally:
        # Always release the connection, even when a statement fails.
        conn.close()
def get_filtered_articles(entries):
    """Render article rows as message text and record each one via ``record``.

    Args:
        entries: Rows of the ``articles`` table, indexed positionally as
            ``(id, title, guid, source, description, pubDate, author)``.

    Returns:
        A ``(result_long, short_results)`` tuple. ``result_long`` is one string
        holding the detailed listing of every article. ``short_results`` is a
        list of compact listings, split so that a chunk is closed before
        adding an entry would push it past 4096 UTF-8 bytes. A single entry
        that is larger than the limit on its own is emitted as its own chunk.
    """
    long_parts = []
    short_results = []
    result_short = ""

    for entry in entries:
        title, link, source = entry[1], entry[2], entry[3]
        pub_date, author = entry[5], entry[6]

        # Detailed listing, followed by a separator between articles.
        long_parts.append(
            f"文章:[{title}]({link})\n作者:{author}\n来源:{source}\n"
            f"上传时间:{pub_date}\n"
            "\n" + "-" * 40 + "\n"
        )

        # Compact listing, accumulated into size-limited chunks.
        current_entry = (
            f"文章:[{title}]({link})\n"
            f"上传时间:{pub_date}\n"
            "\n" + "-" * 3 + "\n"
        )
        candidate = result_short + current_entry
        if len(candidate.encode('utf-8')) > 4096:
            # Close the current chunk, but never emit an empty one (this
            # happens when the very first entry alone exceeds the limit).
            if result_short:
                short_results.append(result_short)
            result_short = current_entry
        else:
            result_short = candidate

        record(
            title=title,
            link=link,
            author=author,
            source="安全客",
            upload_time=pub_date
        )

    # Flush the last, partially filled chunk.
    if result_short:
        short_results.append(result_short)

    return "".join(long_parts), short_results
def Src_anquanke(e_hour):
    """Refresh the Anquanke database from JSON and report recent articles.

    The ``articles`` table is emptied, reloaded from the crawler's JSON file
    and then queried for articles published within the last ``e_hour`` hours.

    Args:
        e_hour: Size of the look-back window in hours.

    Returns:
        ``(result_long, short_results)`` as produced by
        ``get_filtered_articles`` when matching articles exist, otherwise
        ``False``.
    """
    # create_database() uses CREATE TABLE IF NOT EXISTS, so calling it on
    # every run is safe and also repairs a database file that exists but
    # lacks the table (which would otherwise make clear_table() fail).
    create_database()

    # Start from an empty table so rows are not duplicated across runs.
    clear_table()

    # Load the crawler output and store it in the database.
    insert_data(get_anquanke_json())

    # Query the requested time window.
    filtered_articles = select_articles(e_hour)
    if not filtered_articles:
        return False
    return get_filtered_articles(filtered_articles)
if __name__ == "__main__":
    # Manual run: report the articles published during the last 4 hours.
    results = Src_anquanke(4)
    # Src_anquanke returns False when nothing matched, otherwise a tuple.
    if results:
        result_long, short_results = results

        # Print the detailed listing.
        print("长文本结果:")
        print(result_long)
        print("\n" + "-" * 40 + "\n")

        # Print each chunk of the compact listing.
        print("分块的短文本结果:")
        for short_result in short_results:
            print(short_result)
            print("\n" + "-" * 40 + "\n")
    else:
        # Nothing to report, so skip the output.
        print("-" * 40)
        print("安全客数据为空,跳过执行。")