PyBot/GotoSend/sougou_wx.py

196 lines
6.2 KiB
Python
Raw Normal View History

2024-12-30 21:31:54 +08:00
import os
import json
import sqlite3
from datetime import datetime, timedelta
def clear_table():
conn = sqlite3.connect('./db/sougou-wx.db')
cursor = conn.cursor()
cursor.execute('DELETE FROM articles')
conn.commit()
conn.close()
def create_database():
conn = sqlite3.connect('./db/sougou-wx.db')
cursor = conn.cursor()
cursor.execute('''CREATE TABLE IF NOT EXISTS articles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT,
link TEXT,
description TEXT,
pubDate DATETIME,
author TEXT,
keyword TEXT,
is_sended BOOLEAN
)''')
conn.commit()
conn.close()
def insert_data(data):
conn = sqlite3.connect('./db/sougou-wx.db')
cursor = conn.cursor()
for entry in data:
# 检查是否存在相同 title 和 author 的记录
cursor.execute('''
SELECT 1 FROM articles WHERE title = ? AND author = ?
''', (entry['title'], entry['author']))
if cursor.fetchone() is None:
# 如果没有找到相同记录,则插入新记录
cursor.execute('''
INSERT INTO articles (title, link, description, pubDate, author, keyword)
VALUES (?, ?, ?, ?, ?, ?)
''', (entry['title'], entry['link'], entry['description'], entry['pubDate'], entry['author'], entry['keyword']))
conn.commit()
conn.close()
def get_json():
# 检查文件是否存在
if not os.path.exists('./JSON/sougou-wx.json'):
raise FileNotFoundError(f"sougou-wx.json文件不存在请检查程序是否运行正常")
# 打开并读取JSON文件
with open('./JSON/sougou-wx.json', 'r', encoding='utf-8') as file:
data = json.load(file)
# 假设data是一个包含多个关键词的字典
total_data = []
for keyword, keyword_data in data.items():
# 检查关键词对应的数据是否为列表
if not isinstance(keyword_data, list):
2025-01-02 13:00:43 +08:00
raise ValueError(f"关键词 {keyword} 对应的数据格式错误,请检查爬取程序是否异常!")
2024-12-30 21:31:54 +08:00
# 提取所需字段并编号
for index, item in enumerate(keyword_data, start=1):
entry = {
"id": index,
"title": item.get("title", ""),
"link": item.get("link", ""),
"description": item.get("description", ""),
"pubDate": item.get("pubDate", ""),
"author": item.get("author", ""),
"keyword": keyword
}
total_data.append(entry)
return total_data
def select_articles():
conn = sqlite3.connect('./db/sougou-wx.db')
cursor = conn.cursor()
# 获取当前日期和时间
now = datetime.now()
two_months_ago = now - timedelta(days=60) # 假设两个月大约60天
try:
# 查询最近的3条未被标记为True的消息且发布时间不超过两个月
cursor.execute('''
SELECT * FROM articles
WHERE is_sended IS NULL AND pubDate BETWEEN ? AND ?
ORDER BY pubDate DESC
LIMIT 3
''', (two_months_ago.strftime('%Y-%m-%d %H:%M:%S'), now.strftime('%Y-%m-%d %H:%M:%S')))
# 查询最近的3条未被标记为True的消息
# cursor.execute('''
# SELECT * FROM articles
# WHERE is_sended IS NULL
# ORDER BY pubDate DESC
# LIMIT 3
# ''')
results = cursor.fetchall()
# print(results)
if results:
for row in results:
article_id = row[0]
cursor.execute('''
UPDATE articles
SET is_sended = True
WHERE id = ?
''', (article_id,))
conn.commit() # 提交事务
except Exception as e:
conn.rollback() # 回滚事务
print(f"Error: {e}")
finally:
cursor.close()
conn.close()
return results
def record_md(result, filename="./history/wx_news.md"):
# 读取现有内容
if os.path.exists(filename):
with open(filename, 'r', encoding='utf-8') as file:
existing_content = file.read()
else:
existing_content = ""
# 将新内容插入到现有内容的开头
new_content = result + existing_content
# 写回文件
with open(filename, 'w', encoding='utf-8') as file:
file.write(new_content)
def get_filtered_articles(entries, Is_short):
result = ""
record = ""
for entry in entries:
if Is_short == False:
result += f"文章:[{entry[1]}]({entry[2]})\n描述:{entry[3]}\n"
result += f"上传时间:{entry[4]}\n"
result += f"作者:{entry[5]}\n"
result += f"关键词:{entry[6]}\n"
result += "\n" + "-" * 40 + "\n" # 添加分隔线以便区分不同文章
if Is_short == True:
result += f"文章:[{entry[1]}]({entry[2]})"
result += f"上传时间:{entry[4]}\n"
result += "\n" + "-" * 3 + "\n" # 添加分隔线以便区分不同文章
record += f"#### 文章:[{entry[1]}]({entry[2]})\n描述:{entry[3]}\n"
record += f"**上传时间**{entry[4]}\n"
record += f"**作者**{entry[5]}\n"
record += f"**关键词**{entry[6]}\n"
record += "\n" + "-" * 40 + "\n" # 添加分隔线以便区分不同文章
record_md(record)
return result
def Src_sougou_wx(Is_short):
if not os.path.exists('./db/sougou-wx.db'):
# 创建数据库和表
create_database()
# 清空表
# clear_table()
# 获取 JSON 数据
sougou_wx_data = get_json()
# 插入数据到数据库
insert_data(sougou_wx_data)
# 查询指定时间段内的数据
filtered_articles = select_articles()
# print(filtered_articles)
if filtered_articles:
results = get_filtered_articles(filtered_articles, Is_short)
return results
else:
return False
if __name__ == "__main__":
reslts = Src_sougou_wx(False)
if reslts != False:
print(reslts)
else:
# 如果为空,则跳过执行
print("-" * 40)
print("微信公众号数据为空,跳过执行。")