Compare commits


No commits in common. "main" and "beta0.1" have entirely different histories.

65 changed files with 312 additions and 2693 deletions

Core.py

@@ -14,11 +14,9 @@ import requests
 from datetime import datetime, timedelta
 from SendCore.FeishuSendBot import SendToFeishu, gen_sign
 from SendCore.QiweiSendBot import SendToWX
-from spider.common import run, seebug_main, M_4hou_main, anquanke_main, sec_wiki_main, huawei_main, doonsec_main, qianxin_main
-from spider.freebuf import freebuf_main
-from spider.xianzhi import xianzhi_main
-from spider.sougou_wx import sougou_wx_main
-from spider.github import github_main
+from media.common import run, seebug_main, M_4hou_main, anquanke_main, sec_wiki_main, huawei_main, doonsec_main, qianxin_main
+from media.freebuf import freebuf_main
+from media.xianzhi import xianzhi_main
 from GotoSend.M_4hou import Src_4hou
 from GotoSend.anquanke import Src_anquanke
 from GotoSend.doonsec import Src_doonsec
@@ -26,37 +24,32 @@ from GotoSend.xianzhi import Src_xianzhi
 from GotoSend.freebuf import Src_freebuf
 from GotoSend.qianxin import Src_qianxin
 from GotoSend.seebug import Src_seebug
-from GotoSend.sougou_wx import Src_sougou_wx
-from GotoSend.github import Src_github
-from config.check_config import get_core_config, get_debug_config, get_kewords_config
 from loguru import logger

 # 清除所有已有的日志记录器配置
 logger.remove()
-logger.add("./resources/log/core.log",
+logger.add("./log/core.log",
            format="{time:YYYY-MM-DD HH:mm:ss} - {level} - {name}:{function}:{line} - {message}",
            rotation="100 MB",
            compression="zip",
            encoding="utf-8")
 # shell终端打印日志
-debug = get_debug_config()
-if debug == "True":
-    logger.add(lambda msg: print(msg),
-               format="{time:YYYY-MM-DD HH:mm:ss} - {level} - {name}:{function}:{line} - {message}")
-
-def signal_handler(sig, frame):
-    logger.info("接收到退出信号,程序即将退出...")
-    sys.exit(0)
-
-signal.signal(signal.SIGINT, signal_handler)   # Ctrl+C
-signal.signal(signal.SIGTERM, signal_handler)  # kill命令
+# logger.add(lambda msg: print(msg),
+#            format="{time:YYYY-MM-DD HH:mm:ss} - {level} - {name}:{function}:{line} - {message}")
+
+# 加载参数
+with open('./config.yaml', 'r', encoding="utf-8") as file:
+    config = yaml.safe_load(file)
+# sleep_time = int(f"{config['sleep_time']}")
+e_hour = int(f"{config['e_hour']}")
+choice = int(f"{config['circle']}")
+fs_activate = f"{config['fs_activate']}"
+wx_activate = f"{config['wx_activate']}"
+ding_activate = f"{config['ding_activate']}"
+lx_activate = f"{config['lx_activate']}"
+
+# 全局变量
 webhook_url_once, timestamp_once, sign_once = gen_sign()
-e_hour, time_choice, choice, fs_activate, wx_activate, ding_activate, lx_activate, url_web = get_core_config()
-Sogou_WX, Doonsec_switch, Doonsec = get_kewords_config()

 def check_avaliable(info_long, info_short, title, webhook_url, timestamp, sign):
     if info_long:  # 发送完整文章相关内容
@@ -87,8 +80,10 @@ def check_avaliable(info_long, info_short, title, webhook_url, timestamp, sign):
     if not info_long and not info_short:
         logger.info(f"{title}数据为空,跳过执行。")

-def send_job_RSS(time_1):
+def send_job(time_1):
     # 爬取数据
+    logger.info(f"发送程序启动,当前时间为:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
+    logger.info("正在启动各爬虫并获取资源中...")
     seebug_main()
     anquanke_main()
     huawei_main()
@@ -101,7 +96,7 @@ def send_job_RSS(time_1):
     # 分析各个数据源的结果(输出长结果)
     result_4hou_long = Src_4hou(time_1, False)
     result_anquanke_long = Src_anquanke(time_1, False)
-    result_doonsec_long = Src_doonsec(time_1, False, Doonsec_switch, Doonsec)
+    result_doonsec_long = Src_doonsec(time_1, False)
     result_xianzhi_long = Src_xianzhi(time_1, False)
     result_freebuf_long = Src_freebuf(time_1, False)
     result_qianxin_long = Src_qianxin(time_1, False)
@@ -109,7 +104,7 @@ def send_job_RSS(time_1):
     # 分析各个数据源的结果(输出短结果)
     result_4hou_short = Src_4hou(time_1, True)
     result_anquanke_short = Src_anquanke(time_1, True)
-    result_doonsec_short = Src_doonsec(time_1, True, Doonsec_switch, Doonsec)
+    result_doonsec_short = Src_doonsec(time_1, True)
     result_xianzhi_short = Src_xianzhi(time_1, True)
     result_freebuf_short = Src_freebuf(time_1, True)
     result_qianxin_short = Src_qianxin(time_1, True)
@@ -125,23 +120,52 @@ def send_job_RSS(time_1):
     check_avaliable(result_qianxin_long, result_qianxin_short, "奇安信攻防社区资讯", webhook_url, timestamp, sign)
     check_avaliable(result_seebug_long, result_seebug_short, "Seebug社区资讯", webhook_url, timestamp, sign)

-def send_job_SX():
-    sougou_wx_main(Sogou_WX)
-    result_sx_long = Src_sougou_wx(False)
-    result_sx_short = Src_sougou_wx(True)
-    webhook_url, timestamp, sign = gen_sign()
-    check_avaliable(result_sx_long, result_sx_short, "微信公众号关键词相关内容", webhook_url, timestamp, sign)
-
-def send_job_github(time_1):
-    github_main()
-    result_github_1_long, result_github_2_long, result_github_3_long, result_github_4_long = Src_github(time_1, False)
-    result_github_1_short, result_github_2_short, result_github_3_short, result_github_4_short = Src_github(time_1, True)
-    webhook_url, timestamp, sign = gen_sign()
-    check_avaliable(result_github_1_long, result_github_1_short, "Github项目监控-关键词监控", webhook_url, timestamp, sign)
-    check_avaliable(result_github_2_long, result_github_2_short, "Github项目监控-项目更新情况", webhook_url, timestamp, sign)
-    webhook_url, timestamp, sign = gen_sign()
-    check_avaliable(result_github_3_long, result_github_3_short, "Github项目监控-大佬工具", webhook_url, timestamp, sign)
-    check_avaliable(result_github_4_long, result_github_4_short, "Github项目监控-项目版本发布监测", webhook_url, timestamp, sign)
+    if fs_activate == "True":
+        send_result = SendToFeishu("[点此访问](https://info.masonliu.com)网站以查看全部文章。", "单次运行结束", webhook_url, timestamp, sign)
+        logger.info(send_result)
+    else:
+        pass
+    if wx_activate == "True":
+        send_result = SendToWX("[点此访问](https://info.masonliu.com)网站以查看全部文章。", "单次运行结束")
+        logger.info(send_result)
+    else:
+        pass
+    logger.info("执行完毕,等待下一次执行...")
+
+def signal_handler(sig, frame):
+    logger.info("接收到退出信号,程序即将退出...")
+    sys.exit(0)
+
+signal.signal(signal.SIGINT, signal_handler)   # Ctrl+C
+signal.signal(signal.SIGTERM, signal_handler)  # kill命令
+
+def main_loop(choice):
+    if choice == 1:
+        while True:
+            try:
+                # 执行任务
+                send_job(e_hour)
+                time.sleep(e_hour * 60 * 60 - 3 * 60)
+            except Exception as e:
+                logger.error(f"发生错误: {e}, 程序已暂停")
+                # result = SendToFeishu(f"发生错误: {e}, 程序已退出", "报错信息")
+                # logger.info(result)
+                exit()
+    elif choice == 0:
+        # 设置每天的特定时间点执行job函数
+        schedule.every().day.at("09:00").do(send_job, 12)
+        schedule.every().day.at("12:00").do(send_job, 3)
+        schedule.every().day.at("15:00").do(send_job, 3)
+        schedule.every().day.at("18:00").do(send_job, 3)
+        schedule.every().day.at("21:00").do(send_job, 3)
+        while True:
+            schedule.run_pending()
+            time.sleep(60)  # 每分钟检查一次是否有任务需要执行

 # 探测rss源状态
 def check_rss_status(url):
@@ -178,60 +202,23 @@ def test_rss_source():
     return rss_info

-def main_job(e_hour):
-    logger.info(f"发送程序启动,当前时间为:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
-    logger.info("正在启动各爬虫并获取资源中...")
-    if 0 in choice:
-        send_job_RSS(e_hour)
-    if 1 in choice:
-        send_job_SX()
-    if 2 in choice:
-        send_job_github(e_hour)
-
-def main_loop(time_choice):
-    if time_choice == 1:
-        while True:
-            try:
-                # 执行任务
-                main_job(e_hour)
-                time.sleep(e_hour * 60 * 60 - 3 * 60)
-            except Exception as e:
-                logger.error(f"发生错误: {e}, 程序已暂停")
-                # result = SendToFeishu(f"发生错误: {e}, 程序已退出", "报错信息")
-                # logger.info(result)
-                exit()
-    elif time_choice == 0:
-        # 设置每天的特定时间点执行job函数
-        schedule.every().day.at("09:00").do(main_job, 12)
-        schedule.every().day.at("12:00").do(main_job, 3)
-        schedule.every().day.at("15:00").do(main_job, 3)
-        schedule.every().day.at("18:00").do(main_job, 3)
-        schedule.every().day.at("21:00").do(main_job, 3)
-        while True:
-            schedule.run_pending()
-            time.sleep(60)  # 每分钟检查一次是否有任务需要执行
-
-def send_first_message():
+if __name__ == "__main__":
+    print("程序正在运行当中。")
+    time.sleep(5)  # 添加短暂的延迟
     rss_info = test_rss_source()
     start_info = ""
     start_info += "程序已启动,当前时间为:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S") + "\n"
     start_info += "程序作者MasonLiu \t 开源地址:[GM-gitea](https://git.masonliu.com/MasonLiu/PyBot)" + "\n"
-    if time_choice == 1:
+    if choice == 1:
         start_info += "时间配置:每隔" + str(e_hour) + "小时执行一次推送\n"
-    elif time_choice == 0:
+    else:
         start_info += "时间配置:每天固定时间点执行推送\n"
-    if 0 in choice:
     if fs_activate == "True":
         result = SendToFeishu(start_info, "程序信息", webhook_url_once, timestamp_once, sign_once)
         logger.info(result)
         result = SendToFeishu(rss_info, "RSS源状态", webhook_url_once, timestamp_once, sign_once)
         # logger.info(rss_info)
         logger.info(result)
-        send_result = SendToFeishu(f"[点此访问]({url_web})网站以查看全部文章。", "首次运行提醒", webhook_url_once, timestamp_once, sign_once)
-        logger.info(send_result)
     else:
         pass
     if wx_activate == "True":
@@ -240,18 +227,11 @@ def send_first_message():
         result = SendToWX(rss_info, "RSS源状态")
         # logger.info(rss_info)
         logger.info(result)
-        send_result = SendToWX(f"[点此访问]({url_web})网站以查看全部文章。", "首次运行提醒")
-        logger.info(send_result)
     else:
         pass

-if __name__ == "__main__":
-    logger.info("程序正在运行当中。")
-    time.sleep(5)  # 添加短暂的延迟
     # 首次运行先暂停两分钟
     # time.sleep(2 * 60)
     # 主程序
-    send_first_message()
-    main_loop(time_choice)
+    main_loop(choice)


@@ -1,37 +1,10 @@
-# -*- coding: utf-8 -*-
-"""
-@Author: MasonLiu
-@Description: 测试用脚本,无需关注
-"""
-import schedule
-import os
-import signal
-import sys
-import time
-import yaml
-import requests
-from datetime import datetime, timedelta
 from SendCore.FeishuSendBot import SendToFeishu, gen_sign
-from SendCore.QiweiSendBot import SendToWX
-from spider.common import run, seebug_main, M_4hou_main, anquanke_main, sec_wiki_main, huawei_main, doonsec_main, qianxin_main
-from spider.freebuf import freebuf_main
-from spider.xianzhi import xianzhi_main
-from spider.sougou_wx import sougou_wx_main
-from spider.github import github_main
-from GotoSend.M_4hou import Src_4hou
-from GotoSend.anquanke import Src_anquanke
-from GotoSend.doonsec import Src_doonsec
-from GotoSend.xianzhi import Src_xianzhi
-from GotoSend.freebuf import Src_freebuf
-from GotoSend.qianxin import Src_qianxin
-from GotoSend.seebug import Src_seebug
-from GotoSend.sougou_wx import Src_sougou_wx
-from GotoSend.github import Src_github
-from config.check_config import get_core_config, get_debug_config, get_kewords_config
-from loguru import logger
+
+webhook_url, timestamp, sign = gen_sign()
+
+# 测试用消息体
+test_msg = {
+    "请单件文档查看昨天讨论的方案相关飞书文档,注意作者为 <font color=red> **张三** <font> 版本为 \n*002* ,版本 ~~001~~ 已经删除。文件地址是 [https://www.feishu.cn](https://www.feishu.cn),打开次数:1次"
+}

-if __name__ == "__main__":
-    logger.info("程序正在运行当中。")
+SendToFeishu(test_msg, "先知社区资讯递送", webhook_url, timestamp, sign)

GotoSend/M_4hou.py

@@ -6,7 +6,7 @@ import os
 from datetime import datetime, timedelta

 def create_database():
-    conn = sqlite3.connect('./resources/db/4hou.db')
+    conn = sqlite3.connect('./db/4hou.db')
     cursor = conn.cursor()
     cursor.execute('''CREATE TABLE IF NOT EXISTS articles (
                         id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -20,7 +20,7 @@ def create_database():
     conn.close()

 def insert_data(data):
-    conn = sqlite3.connect('./resources/db/4hou.db')
+    conn = sqlite3.connect('./db/4hou.db')
     cursor = conn.cursor()
     for entry in data:
         try:
@@ -41,11 +41,11 @@ def insert_data(data):

 def get_4hou_json():
     # 检查文件是否存在
-    if not os.path.exists('./resources/JSON/4hou.json'):
+    if not os.path.exists('./JSON/4hou.json'):
         raise FileNotFoundError(f"4hou.json文件不存在请检查程序是否运行正常")
     # 打开并读取JSON文件
-    with open('./resources/JSON/4hou.json', 'r', encoding='utf-8') as file:
+    with open('./JSON/4hou.json', 'r', encoding='utf-8') as file:
         data = json.load(file)

     # 假设data是一个包含多个JSON对象的列表
@@ -68,7 +68,7 @@ def get_4hou_json():
     return total_data

 def select_articles(e_hour):
-    conn = sqlite3.connect('./resources/db/4hou.db')
+    conn = sqlite3.connect('./db/4hou.db')
     cursor = conn.cursor()

     # 获取当前日期和时间
@@ -87,13 +87,13 @@ def select_articles(e_hour):
     return results

 def clear_table():
-    conn = sqlite3.connect('./resources/db/4hou.db')
+    conn = sqlite3.connect('./db/4hou.db')
     cursor = conn.cursor()
     cursor.execute('DELETE FROM articles')
     conn.commit()
     conn.close()

-def record_md(result, filename="./resources/history/sec_news.md"):
+def record_md(result, filename="./history/sec_news.md"):
     # 读取现有内容
     if os.path.exists(filename):
         with open(filename, 'r', encoding='utf-8') as file:
@@ -112,16 +112,17 @@ def get_filtered_articles(entries, Is_short):
     record = ""
     for entry in entries:
         if Is_short == False:
-            result += f"文章:[{entry[1]}]({entry[2]})\n作者:{entry[5]}\n"
-            result += f"上传时间:{entry[4]}\n"
+            result += f"作者:{entry[5]}\n文章:{entry[1]}\n"
+            result += f"链接:{entry[2]}\n上传时间:{entry[4]}\n"
             result += "\n" + "-" * 40 + "\n"   # 添加分隔线以便区分不同文章
         if Is_short == True:
-            result += f"文章:[{entry[1]}]({entry[2]})\n"
+            result += f"文章:{entry[1]}\n"
             result += f"链接:{entry[2]}\n上传时间:{entry[4]}\n"
             result += "\n" + "-" * 3 + "\n"    # 添加分隔线以便区分不同文章
-        record += f"#### 文章:[{entry[1]}]({entry[2]})\n"
+        record += f"#### 文章:{entry[1]}\n"
         record += f"**作者**{entry[5]}\n"
+        record += f"**链接**{entry[2]}\n"
        record += f"**上传时间**{entry[4]}\n"
         record += "\n" + "-" * 40 + "\n"   # 添加分隔线以便区分不同文章
     record_md(record)
@@ -129,7 +130,7 @@ def get_filtered_articles(entries, Is_short):

 def Src_4hou(e_hour, Is_short):
-    if not os.path.exists('./resources/db/4hou.db'):
+    if not os.path.exists('./db/4hou.db'):
         # 创建数据库和表
         create_database()

GotoSend/anquanke.py

@@ -7,7 +7,7 @@ from datetime import datetime, timedelta

 def create_database():
-    conn = sqlite3.connect('./resources/db/anquanke.db')
+    conn = sqlite3.connect('./db/anquanke.db')
     cursor = conn.cursor()
     cursor.execute('''CREATE TABLE IF NOT EXISTS articles (
                         id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -22,7 +22,7 @@ def create_database():
     conn.close()

 def insert_data(data):
-    conn = sqlite3.connect('./resources/db/anquanke.db')
+    conn = sqlite3.connect('./db/anquanke.db')
     cursor = conn.cursor()
     for entry in data:
         cursor.execute('''
@@ -34,11 +34,11 @@ def insert_data(data):

 def get_anquanke_json():
     # 检查文件是否存在
-    if not os.path.exists('./resources/JSON/anquanke.json'):
+    if not os.path.exists('./JSON/anquanke.json'):
         raise FileNotFoundError(f"anquanke.json文件不存在请检查程序是否运行正常")
     # 打开并读取JSON文件
-    with open('./resources/JSON/anquanke.json', 'r', encoding='utf-8') as file:
+    with open('./JSON/anquanke.json', 'r', encoding='utf-8') as file:
         data = json.load(file)

     # 假设data是一个包含多个JSON对象的列表
@@ -62,7 +62,7 @@ def get_anquanke_json():
     return total_data

 def select_articles(e_hour):
-    conn = sqlite3.connect('./resources/db/anquanke.db')
+    conn = sqlite3.connect('./db/anquanke.db')
     cursor = conn.cursor()

     # 获取当前日期和时间
@@ -81,13 +81,13 @@ def select_articles(e_hour):
     return results

 def clear_table():
-    conn = sqlite3.connect('./resources/db/anquanke.db')
+    conn = sqlite3.connect('./db/anquanke.db')
     cursor = conn.cursor()
     cursor.execute('DELETE FROM articles')
     conn.commit()
     conn.close()

-def record_md(result, filename="./resources/history/sec_news.md"):
+def record_md(result, filename="./history/sec_news.md"):
     # 读取现有内容
     if os.path.exists(filename):
         with open(filename, 'r', encoding='utf-8') as file:
@@ -107,17 +107,18 @@ def get_filtered_articles(entries, Is_short):
     record = ""
     for entry in entries:
         if Is_short == False:
-            result += f"文章:[{entry[1]}]({entry[2]})\n作者:{entry[6]}\n来源:{entry[3]}\n"
-            result += f"上传时间:{entry[5]}\n"
+            result += f"作者:{entry[6]}\n来源:{entry[3]}\n文章:{entry[1]}\n"
+            result += f"链接:{entry[2]}\n上传时间:{entry[5]}\n"
             result += "\n" + "-" * 40 + "\n"   # 添加分隔线以便区分不同文章
         elif Is_short == True:
             result += f"文章:{entry[1]}\n"
             result += f"链接:{entry[2]}\n上传时间:{entry[5]}\n"
             result += "\n" + "-" * 3 + "\n"    # 添加分隔线以便区分不同文章
-        record += f"#### 文章:[{entry[1]}]({entry[2]})\n"
+        record += f"#### 文章:{entry[1]}\n"
         record += f"**作者**{entry[6]}\n"
         record += f"**来源**{entry[3]}\n"
+        record += f"**链接**{entry[2]}\n"
         record += f"**上传时间**{entry[5]}\n"
         record += "\n" + "-" * 40 + "\n"   # 添加分隔线以便区分不同文章
     record_md(record)
@@ -125,7 +126,7 @@ def get_filtered_articles(entries, Is_short):

 def Src_anquanke(e_hour, Is_short):
-    if not os.path.exists('./resources/db/anquanke.db'):
+    if not os.path.exists('./db/anquanke.db'):
         # 创建数据库和表
         create_database()

GotoSend/doonsec.py

@@ -7,7 +7,7 @@ from datetime import datetime, timedelta

 def create_database():
-    conn = sqlite3.connect('./resources/db/doonsec.db')
+    conn = sqlite3.connect('./db/doonsec.db')
     cursor = conn.cursor()
     cursor.execute('''CREATE TABLE IF NOT EXISTS articles (
                         id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -21,7 +21,7 @@ def create_database():
     conn.close()

 def insert_data(data):
-    conn = sqlite3.connect('./resources/db/doonsec.db')
+    conn = sqlite3.connect('./db/doonsec.db')
     cursor = conn.cursor()
     for entry in data:
         try:
@@ -42,11 +42,11 @@ def insert_data(data):

 def get_doonsec_json():
     # 检查文件是否存在
-    if not os.path.exists('./resources/JSON/doonsec.json'):
+    if not os.path.exists('./JSON/doonsec.json'):
         raise FileNotFoundError(f"doonsec.json文件不存在请检查程序是否运行正常")
     # 打开并读取JSON文件
-    with open('./resources/JSON/doonsec.json', 'r', encoding='utf-8') as file:
+    with open('./JSON/doonsec.json', 'r', encoding='utf-8') as file:
         data = json.load(file)

     # 假设data是一个包含多个JSON对象的列表
@@ -68,50 +68,33 @@ def get_doonsec_json():
     return total_data

-def select_articles(e_hour, Doonsec_switch, Doonsec):
-    conn = sqlite3.connect('./resources/db/doonsec.db')
+def select_articles(e_hour):
+    conn = sqlite3.connect('./db/doonsec.db')
     cursor = conn.cursor()

     # 获取当前日期和时间
     now = datetime.now()
-    start_time = now - timedelta(hours=e_hour, minutes=3)
+    start_time = now - timedelta(hours=e_hour)
     end_time = now

-    if Doonsec_switch == False:
-        # 查询指定时间段内的数据
-        cursor.execute('''
-            SELECT * FROM articles
-            WHERE pubDate BETWEEN ? AND ?
-        ''', (start_time.strftime('%Y-%m-%d %H:%M:%S'), end_time.strftime('%Y-%m-%d %H:%M:%S')))
-    elif Doonsec_switch == True:
-        # 查询指定时间段内且title包含特定关键词的数据
-        placeholders = ', '.join(['?'] * len(Doonsec))
-        query = f'''
-            SELECT * FROM articles
-            WHERE pubDate BETWEEN ? AND ?
-            AND (title LIKE ? OR title LIKE ? OR title LIKE ?)
-        '''
-        # 构建参数列表
-        params = [start_time.strftime('%Y-%m-%d %H:%M:%S'), end_time.strftime('%Y-%m-%d %H:%M:%S')]
-        for keyword in Doonsec:
-            params.append(f'%{keyword}%')
-        # 执行查询
-        cursor.execute(query, params)
+    # 查询指定时间段内的数据
+    cursor.execute('''
+        SELECT * FROM articles
+        WHERE pubDate BETWEEN ? AND ?
+    ''', (start_time.strftime('%Y-%m-%d %H:%M:%S'), end_time.strftime('%Y-%m-%d %H:%M:%S')))

     results = cursor.fetchall()
     conn.close()
     return results

 def clear_table():
-    conn = sqlite3.connect('./resources/db/doonsec.db')
+    conn = sqlite3.connect('./db/doonsec.db')
     cursor = conn.cursor()
     cursor.execute('DELETE FROM articles')
     conn.commit()
     conn.close()

-def record_md(result, filename="./resources/history/tech_passage.md"):
+def record_md(result, filename="./history/tech_passage.md"):
     # 读取现有内容
     if os.path.exists(filename):
         with open(filename, 'r', encoding='utf-8') as file:
@@ -131,18 +114,18 @@ def get_filtered_articles(entries, Is_short):
     record = ""
     for entry in entries:
         if Is_short == False:
-            result += f"文章:[{entry[1]}]({entry[2]})\n"
-            result += f"作者:{entry[5]}\n"
-            result += f"上传时间:{entry[4]}\n"
+            result += f"作者:{entry[5]}\n文章:{entry[1]}\n"
+            result += f"链接:[点此访问]({entry[2]})\n上传时间:{entry[4]}\n"
             result += f"简介:{entry[3]}\n"
             result += "\n" + "-" * 40 + "\n"   # 添加分隔线以便区分不同文章
         if Is_short == True:
-            result += f"文章:[{entry[1]}]({entry[2]})\n"
-            result += f"上传时间:{entry[4]}\n"
+            result += f"文章:{entry[1]}\n"
+            result += f"链接:[点此访问]({entry[2]})\n上传时间:{entry[4]}\n"
             result += "\n" + "-" * 3 + "\n"    # 添加分隔线以便区分不同文章
-        record += f"#### 文章:[{entry[1]}]({entry[2]})\n"
+        record += f"#### 文章:{entry[1]}\n"
         record += f"**作者**{entry[5]}\n"
+        record += f"**链接**[点此访问]({entry[2]})\n"
         record += f"**上传时间**{entry[4]}\n"
         record += f"**简介**{entry[3]}\n"
         record += "\n" + "-" * 40 + "\n"   # 添加分隔线以便区分不同文章
@@ -150,8 +133,8 @@ def get_filtered_articles(entries, Is_short):
     return result

-def Src_doonsec(e_hour, Is_short, Doonsec_switch, Doonsec):
-    if not os.path.exists('./resources/db/doonsec.db'):
+def Src_doonsec(e_hour, Is_short):
+    if not os.path.exists('./db/doonsec.db'):
         # 创建数据库和表
         create_database()
@@ -165,7 +148,7 @@ def Src_doonsec(e_hour, Is_short, Doonsec_switch, Doonsec):
     insert_data(M_doonsec_data)

     # 查询指定时间段内的数据
-    filtered_articles = select_articles(e_hour, Doonsec_switch, Doonsec)
+    filtered_articles = select_articles(e_hour)
     # print(filtered_articles)

     if filtered_articles:
@@ -175,7 +158,7 @@ def Src_doonsec(e_hour, Is_short, Doonsec_switch, Doonsec):
         return False

 if __name__ == "__main__":
-    reslts = Src_doonsec(24, False, True, ["webshell", "2000", "POC"])
+    reslts = Src_doonsec(4, False)
     if reslts != False:
         print(reslts)
     else:

GotoSend/freebuf.py

@@ -6,7 +6,7 @@ import os
 from datetime import datetime, timedelta

 def create_database():
-    conn = sqlite3.connect('./resources/db/freebuf.db')
+    conn = sqlite3.connect('./db/freebuf.db')
     cursor = conn.cursor()
     cursor.execute('''CREATE TABLE IF NOT EXISTS articles (
                         id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -20,7 +20,7 @@ def create_database():
     conn.close()

 def insert_data(data):
-    conn = sqlite3.connect('./resources/db/freebuf.db')
+    conn = sqlite3.connect('./db/freebuf.db')
     cursor = conn.cursor()
     for entry in data:
         try:
@@ -41,16 +41,16 @@ def insert_data(data):

 def get_freebuf_json():
     # 检查文件是否存在
-    if not os.path.exists('./resources/JSON/freebuf.json'):
+    if not os.path.exists('./JSON/freebuf.json'):
         raise FileNotFoundError(f"freebuf.json文件不存在请检查程序是否运行正常")
     # 打开并读取JSON文件
-    with open('./resources/JSON/freebuf.json', 'r', encoding='utf-8') as file:
+    with open('./JSON/freebuf.json', 'r', encoding='utf-8') as file:
         data = json.load(file)

     # 假设data是一个包含多个JSON对象的列表
     if not isinstance(data, list):
-        raise ValueError("JSON文件格式错误请检查爬取程序是否异常")
+        raise ValueError("JSON文件格式错误请检查common.py是否异常!")

     # 提取所需字段并编号
     total_data = []
@@ -68,7 +68,7 @@ def get_freebuf_json():
     return total_data

 def select_articles(e_hour):
-    conn = sqlite3.connect('./resources/db/freebuf.db')
+    conn = sqlite3.connect('./db/freebuf.db')
     cursor = conn.cursor()

     # 获取当前日期和时间
@@ -87,13 +87,13 @@ def select_articles(e_hour):
     return results

 def clear_table():
-    conn = sqlite3.connect('./resources/db/freebuf.db')
+    conn = sqlite3.connect('./db/freebuf.db')
     cursor = conn.cursor()
     cursor.execute('DELETE FROM articles')
     conn.commit()
     conn.close()

-def record_md(result, filename="./resources/history/sec_news.md"):
+def record_md(result, filename="./history/sec_news.md"):
     # 读取现有内容
     if os.path.exists(filename):
         with open(filename, 'r', encoding='utf-8') as file:
@@ -113,16 +113,17 @@ def get_filtered_articles(entries, Is_short):
     record = ""
     for entry in entries:
         if Is_short == False:
-            result += f"文章:[{entry[1]}]({entry[2]})\n类型:{entry[5]}\n"
-            result += f"上传时间:{entry[4]}\n"
+            result += f"类型:{entry[5]}\n文章:{entry[1]}\n"
+            result += f"链接:{entry[2]}\n上传时间:{entry[4]}\n"
             result += "\n" + "-" * 40 + "\n"   # 添加分隔线以便区分不同文章
         elif Is_short == True:
-            result += f"文章:[{entry[1]}]({entry[2]})\n"
-            result += f"上传时间:{entry[4]}\n"
+            result += f"文章:{entry[1]}\n"
+            result += f"链接:{entry[2]}\n上传时间:{entry[4]}\n"
             result += "\n" + "-" * 3 + "\n"    # 添加分隔线以便区分不同文章
-        record += f"#### 文章:[{entry[1]}]({entry[2]})\n"
+        record += f"#### 文章:{entry[1]}\n"
         record += f"**类型**{entry[5]}\n"
+        record += f"**链接**{entry[2]}\n"
         record += f"**上传时间**{entry[4]}\n"
         record += "\n" + "-" * 40 + "\n"   # 添加分隔线以便区分不同文章
     record_md(record)
@@ -130,7 +131,7 @@ def get_filtered_articles(entries, Is_short):

 def Src_freebuf(e_hour, Is_short):
-    if not os.path.exists('./resources/db/freebuf.db'):
+    if not os.path.exists('./db/freebuf.db'):
         # 创建数据库和表
         create_database()

GotoSend/github.py

@@ -1,423 +0,0 @@
# -*- coding: utf-8 -*-
import json
import sqlite3
import os
from datetime import datetime, timedelta
def create_database():
conn = sqlite3.connect('./resources/db/github.db')
cursor = conn.cursor()
cursor.executescript('''
CREATE TABLE IF NOT EXISTS keywords (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT,
link TEXT,
description TEXT,
pubDate DATETIME,
author TEXT,
keyword TEXT,
language TEXT,
is_sended BOOLEAN
);
CREATE TABLE IF NOT EXISTS repos (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT,
link TEXT,
description TEXT,
pubDate DATETIME,
author TEXT,
keyword TEXT,
link2 TEXT
);
CREATE TABLE IF NOT EXISTS releases (
id INTEGER PRIMARY KEY AUTOINCREMENT,
link TEXT,
pubDate DATETIME,
author TEXT,
keyword TEXT
);
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT,
link TEXT,
description TEXT,
pubDate DATETIME,
author TEXT,
keyword TEXT,
language TEXT,
is_sended BOOLEAN
);
''')
conn.commit()
conn.close()
def insert_data():
# 检查文件是否存在
# 打开并读取JSON文件
# 假设data是一个包含多个JSON对象的列表然后校验JSON格式是否异常
if not os.path.exists('./resources/JSON/github_keyword.json'):
raise FileNotFoundError(f"github_keyword文件不存在请检查程序是否运行正常")
with open('./resources/JSON/github_keyword.json', 'r', encoding='utf-8') as file:
data_keyword = json.load(file)
if not isinstance(data_keyword, list):
raise ValueError("JSON文件格式错误请检查爬取程序是否异常")
if not os.path.exists('./resources/JSON/github_repo.json'):
raise FileNotFoundError(f"github_repo文件不存在请检查程序是否运行正常")
with open('./resources/JSON/github_repo.json', 'r', encoding='utf-8') as file:
data_repo = json.load(file)
if not isinstance(data_repo, list):
raise ValueError("JSON文件格式错误请检查爬取程序是否异常")
if not os.path.exists('./resources/JSON/github_release.json'):
raise FileNotFoundError(f"github_release文件不存在请检查程序是否运行正常")
with open('./resources/JSON/github_release.json', 'r', encoding='utf-8') as file:
data_release = json.load(file)
if not isinstance(data_release, list):
raise ValueError("JSON文件格式错误请检查爬取程序是否异常")
if not os.path.exists('./resources/JSON/github_user.json'):
raise FileNotFoundError(f"github_user文件不存在请检查程序是否运行正常")
with open('./resources/JSON/github_user.json', 'r', encoding='utf-8') as file:
data_user = json.load(file)
if not isinstance(data_user, list):
raise ValueError("JSON文件格式错误请检查爬取程序是否异常")
conn = sqlite3.connect('./resources/db/github.db')
cursor = conn.cursor()
# 提取所需字段并编号
for index, item in enumerate(data_keyword, start=1):
entry = {
"id": index,
"title": item.get("name", ""),
"link": item.get("link", ""),
"description": item.get("description", ""),
"pubDate": item.get("created_at", ""),
"author": item.get("author", ""),
"keyword": item.get("keyword", ""),
"language": item.get("language", "")
}
try:
# 解析 pubDate 字符串为 datetime 对象
pub_date = datetime.strptime(entry['pubDate'], '%Y-%m-%dT%H:%M:%SZ')
# 格式化 pubDate 为所需的格式
formatted_pub_date = pub_date.strftime('%Y-%m-%d %H:%M:%S')
except ValueError:
# 如果解析失败,使用原始 pubDate 字符串
formatted_pub_date = entry['pubDate']
# 检查是否存在相同 title 和 author 的记录
cursor.execute('''
SELECT 1 FROM keywords WHERE title = ? AND author = ?
''', (entry['title'], entry['author']))
if cursor.fetchone() is None:
# 如果没有找到相同记录,则插入新记录
cursor.execute('''
INSERT INTO keywords (title, link, description, pubDate, author, language, keyword)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (entry['title'], entry['link'], entry['description'], formatted_pub_date, entry['author'], entry['language'], entry['keyword']))
for index, item in enumerate(data_repo, start=1):
entry = {
"id": index,
"title": item.get("name", ""),
"link": item.get("link", ""),
"description": item.get("description", ""),
"pubDate": item.get("updated_at", ""),
"author": item.get("author", ""),
"keyword": item.get("keyword", ""),
"link2": item.get("link_2", "")
}
try:
# 解析 pubDate 字符串为 datetime 对象
pub_date = datetime.strptime(entry['pubDate'], '%Y-%m-%dT%H:%M:%SZ')
# 格式化 pubDate 为所需的格式
formatted_pub_date = pub_date.strftime('%Y-%m-%d %H:%M:%S')
except ValueError:
# 如果解析失败,使用原始 pubDate 字符串
formatted_pub_date = entry['pubDate']
cursor.execute('''
INSERT INTO repos (title, link, description, pubDate, author, link2, keyword)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (entry['title'], entry['link'], entry['description'], formatted_pub_date, entry['author'], entry['link2'], entry['keyword']))
for index, item in enumerate(data_release, start=1):
entry = {
"id": index,
"link": item.get("link", ""),
"pubDate": item.get("published_at", ""),
"author": item.get("author", ""),
"keyword": item.get("keyword", "")
}
try:
# 解析 pubDate 字符串为 datetime 对象
pub_date = datetime.strptime(entry['pubDate'], '%Y-%m-%dT%H:%M:%SZ')
# 格式化 pubDate 为所需的格式
formatted_pub_date = pub_date.strftime('%Y-%m-%d %H:%M:%S')
except ValueError:
# 如果解析失败,使用原始 pubDate 字符串
formatted_pub_date = entry['pubDate']
cursor.execute('''
INSERT INTO releases (link, pubDate, author, keyword)
VALUES (?, ?, ?, ?)
''', (entry['link'], formatted_pub_date, entry['author'], entry['keyword']))
# 插入 users 数据
for index, item in enumerate(data_user, start=1):
entry = {
"id": index,
"title": item.get("name", ""),
"link": item.get("link", ""),
"description": item.get("description", ""),
"pubDate": item.get("created_at", ""),
"author": item.get("author", ""),
"keyword": item.get("keyword", ""),
"language": item.get("language", "")
}
try:
# 解析 pubDate 字符串为 datetime 对象
pub_date = datetime.strptime(entry['pubDate'], '%Y-%m-%dT%H:%M:%SZ')
# 格式化 pubDate 为所需的格式
formatted_pub_date = pub_date.strftime('%Y-%m-%d %H:%M:%S')
except ValueError:
# 如果解析失败,使用原始 pubDate 字符串
formatted_pub_date = entry['pubDate']
# 检查是否存在相同 title 和 author 的记录
cursor.execute('''
SELECT 1 FROM users WHERE title = ? AND author = ?
''', (entry['title'], entry['author']))
if cursor.fetchone() is None:
# 如果没有找到相同记录,则插入新记录
cursor.execute('''
INSERT INTO users (title, link, description, pubDate, author, keyword, language)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (entry['title'], entry['link'], entry['description'], formatted_pub_date, entry['author'], entry['keyword'], entry['language']))
conn.commit()
conn.close()
def select_articles(e_hour):
conn = sqlite3.connect('./resources/db/github.db')
cursor = conn.cursor()
# 获取当前日期和时间
now = datetime.now()
two_months_ago = now - timedelta(days=60) # 假设两个月大约60天
start_time = now - timedelta(hours=e_hour, minutes=3)
# 查询指定时间段内的数据
cursor.execute('''
SELECT * FROM keywords
WHERE is_sended IS NULL AND pubDate BETWEEN ? AND ?
ORDER BY pubDate DESC
''', (start_time.strftime('%Y-%m-%d %H:%M:%S'), now.strftime('%Y-%m-%d %H:%M:%S')))
result_1 = cursor.fetchall()
if result_1:
for row in result_1:
keyword_id = row[0]
cursor.execute('''
UPDATE keywords
SET is_sended = True
WHERE id = ?
''', (keyword_id,))
conn.commit()
cursor.execute('''
SELECT * FROM repos
WHERE pubDate BETWEEN ? AND ?
''', (start_time.strftime('%Y-%m-%d %H:%M:%S'), now.strftime('%Y-%m-%d %H:%M:%S')))
result_2 = cursor.fetchall()
# 查询最近的5条未被标记为True的消息且发布时间不超过两个月
cursor.execute('''
SELECT * FROM users
WHERE is_sended IS NULL AND pubDate BETWEEN ? AND ?
ORDER BY pubDate DESC
LIMIT 5
''', (two_months_ago.strftime('%Y-%m-%d %H:%M:%S'), now.strftime('%Y-%m-%d %H:%M:%S')))
result_3 = cursor.fetchall()
# print(results)
if result_3:
for row in result_3:
user_id = row[0]
cursor.execute('''
UPDATE users
SET is_sended = True
WHERE id = ?
''', (user_id,))
conn.commit() # 提交事务
cursor.execute('''
SELECT * FROM releases
WHERE pubDate BETWEEN ? AND ?
''', (start_time.strftime('%Y-%m-%d %H:%M:%S'), now.strftime('%Y-%m-%d %H:%M:%S')))
result_4 = cursor.fetchall()
cursor.close()
conn.close()
return result_1, result_2, result_3, result_4
def clear_table():
conn = sqlite3.connect('./resources/db/github.db')
cursor = conn.cursor()
cursor.execute('DELETE FROM repos')
cursor.execute('DELETE FROM releases')
conn.commit()
conn.close()
def record_md(result, filename="./resources/history/github.md"):
# 读取现有内容
if os.path.exists(filename):
with open(filename, 'r', encoding='utf-8') as file:
existing_content = file.read()
else:
existing_content = ""
# 将新内容插入到现有内容的开头
new_content = result + existing_content
# 写回文件
with open(filename, 'w', encoding='utf-8') as file:
file.write(new_content)
def get_filtered_articles(entries, Is_short, choice):
result = ""
record = ""
for entry in entries:
if Is_short == False:
if choice == 1:
result += f"关键词【{entry[6]}】发现新项目:[{entry[1]}]({entry[2]})\n"
result += f"项目描述:{entry[3]}\n"
result += f"上传时间:{entry[4]}\n"
result += f"开发语言:{entry[7]}\t\t作者:{entry[5]}\n"
result += "\n" + "-" * 10 + "\n" # 添加分隔线以便区分不同文章
elif choice == 2:
result += f"项目:[{entry[1]}]({entry[2]})存在更新!!!\n"
result += f"更新描述:{entry[3]}\n"
result += f"更新时间:{entry[4]}\n"
result += f"提交者:{entry[5]}[点此查看提交详情]({entry[2]})\n"
result += "\n" + "-" * 10 + "\n" # 添加分隔线以便区分不同文章
elif choice == 3:
result += f"大佬 {entry[5]} 上传了一个新工具:[{entry[1]}]({entry[2]})\n"
result += f"项目描述:{entry[3]}\n"
result += f"上传时间:{entry[4]}\n"
result += "\n" + "-" * 10 + "\n" # 添加分隔线以便区分不同文章
elif choice == 4:
result += f"{entry[3]}】为[{entry[4]}]({entry[1]})发布了新版本,请及时查收!\n"
result += f"发布时间:{entry[2]}\n"
result += "\n" + "-" * 10 + "\n" # 添加分隔线以便区分不同文章
elif Is_short == True:
if choice == 1:
result += f"关键词【{entry[7]}】发现新项目:[{entry[1]}]({entry[2]})\n"
result += f"上传时间:{entry[4]}\n"
result += f"开发语言:{entry[6]}\t\t作者:{entry[5]}\n"
result += "\n" + "-" * 3 + "\n" # 添加分隔线以便区分不同文章
elif choice == 2:
result += f"项目:[{entry[1]}]({entry[2]})存在更新!!!\n"
result += f"更新描述:{entry[3]}\n"
result += f"更新时间:{entry[4]}\n"
result += f"提交者:{entry[5]}[点此查看提交详情]({entry[2]})\n"
result += "\n" + "-" * 3 + "\n" # 添加分隔线以便区分不同文章
elif choice == 3:
result += f"大佬 {entry[5]} 上传了一个新工具:[{entry[1]}]({entry[2]})\n"
result += f"项目描述:{entry[3]}\n"
result += f"上传时间:{entry[4]}\n"
result += "\n" + "-" * 3 + "\n" # 添加分隔线以便区分不同文章
elif choice == 4:
result += f"{entry[3]}】为[{entry[4]}]({entry[1]})发布了新版本,请及时查收!\n"
result += f"发布时间:{entry[2]}\n"
result += "\n" + "-" * 3 + "\n" # 添加分隔线以便区分不同文章
if choice == 1:
record += f"#### 关键词【{entry[7]}】发现新项目:[{entry[1]}]({entry[2]})\n"
record += f"**项目描述**{entry[3]}\n"
record += f"**上传时间**{entry[4]}\n"
record += f"**开发语言**{entry[6]}\t\t**作者**{entry[5]}\n"
record += "\n" + "-" * 10 + "\n" # 添加分隔线以便区分不同文章
elif choice == 2:
record += f"#### 项目:[{entry[1]}]({entry[2]})存在更新!!!\n"
record += f"**更新描述**{entry[3]}\n"
record += f"**更新时间**{entry[4]}\n"
record += f"**提交者**{entry[5]}[点此查看提交详情]({entry[2]})\n"
record += "\n" + "-" * 10 + "\n" # 添加分隔线以便区分不同文章
elif choice == 3:
record += f"#### 大佬 {entry[5]} 上传了一个新工具:[{entry[1]}]({entry[2]})\n"
record += f"**项目描述**{entry[3]}\n"
record += f"**上传时间**{entry[4]}\n"
record += "\n" + "-" * 10 + "\n" # 添加分隔线以便区分不同文章
elif choice == 4:
record += f"#### 【{entry[3]}】为[{entry[4]}]({entry[1]})发布了新版本,请及时查收!\n"
record += f"**发布时间**{entry[2]}\n"
record += "\n" + "-" * 10 + "\n" # 添加分隔线以便区分不同文章
record_md(record)
return result
def Src_github(e_hour, Is_short):
if not os.path.exists('./resources/db/github.db'):
# 创建数据库和表
create_database()
# 清空表
clear_table()
# 插入数据到数据库
insert_data()
# 查询指定时间段内的数据
keyword_data, repo_data, user_data, release_data = select_articles(e_hour)
if keyword_data:
result_1 = get_filtered_articles(keyword_data, Is_short, 1)
else:
result_1 = ""
if repo_data:
result_2 = get_filtered_articles(repo_data, Is_short, 2)
else:
result_2 = ""
if user_data:
result_3 = get_filtered_articles(user_data, Is_short, 3)
else:
result_3 = ""
if release_data:
result_4 = get_filtered_articles(release_data, Is_short, 4)
else:
result_4 = ""
return result_1, result_2, result_3, result_4
if __name__ == "__main__":
result_1, result_2, result_3, result_4 = Src_github(24000, False)
if result_1 != "":
print(result_1)
if result_2 != "":
print(result_2)
if result_3 != "":
print(result_3)
if result_4 != "":
print(result_4)
if result_1 == "" and result_2 == "" and result_3 == "" and result_4 == "":
# 如果为空,则跳过执行
print("-" * 10)
print("github数据为空跳过执行。")

GotoSend/qianxin.py

@@ -6,7 +6,7 @@ import os
 from datetime import datetime, timedelta

 def create_database():
-    conn = sqlite3.connect('./resources/db/qianxin.db')
+    conn = sqlite3.connect('./db/qianxin.db')
     cursor = conn.cursor()
     cursor.execute('''CREATE TABLE IF NOT EXISTS articles (
                         id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -20,7 +20,7 @@ def create_database():
     conn.close()

 def insert_data(data):
-    conn = sqlite3.connect('./resources/db/qianxin.db')
+    conn = sqlite3.connect('./db/qianxin.db')
     cursor = conn.cursor()
     for entry in data:
         cursor.execute('''
@@ -32,11 +32,11 @@ def insert_data(data):

 def get_qianxin_json():
     # 检查文件是否存在
-    if not os.path.exists('./resources/JSON/qianxin.json'):
+    if not os.path.exists('./JSON/qianxin.json'):
         raise FileNotFoundError(f"qianxin.json文件不存在请检查程序是否运行正常")
     # 打开并读取JSON文件
-    with open('./resources/JSON/qianxin.json', 'r', encoding='utf-8') as file:
+    with open('./JSON/qianxin.json', 'r', encoding='utf-8') as file:
         data = json.load(file)

     # 假设data是一个包含多个JSON对象的列表
@@ -59,7 +59,7 @@ def get_qianxin_json():
     return total_data

 def select_articles(e_hour):
-    conn = sqlite3.connect('./resources/db/qianxin.db')
+    conn = sqlite3.connect('./db/qianxin.db')
     cursor = conn.cursor()

     # 获取当前日期和时间
@@ -78,13 +78,13 @@ def select_articles(e_hour):
     return results

 def clear_table():
-    conn = sqlite3.connect('./resources/db/qianxin.db')
+    conn = sqlite3.connect('./db/qianxin.db')
     cursor = conn.cursor()
     cursor.execute('DELETE FROM articles')
     conn.commit()
     conn.close()

-def record_md(result, filename="./resources/history/tech_passage.md"):
+def record_md(result, filename="./history/tech_passage.md"):
     # 读取现有内容
     if os.path.exists(filename):
         with open(filename, 'r', encoding='utf-8') as file:
@@ -104,17 +104,18 @@ def get_filtered_articles(entries, Is_short):
     record = ""
     for entry in entries:
         if Is_short == False:
-            result += f"文章:[{entry[1]}]({entry[2]})\n来源:{entry[3]}\n"
-            result += f"上传时间:{entry[5]}\n"
+            result += f"来源:{entry[3]}\n文章:{entry[1]}\n"
+            result += f"链接:{entry[2]}\n上传时间:{entry[5]}\n"
             result += f"描述:{entry[4]}\n"
             result += "\n" + "-" * 40 + "\n"   # 添加分隔线以便区分不同文章
-        if Is_short == True:
-            result += f"文章:[{entry[1]}]({entry[2]})\n"
-            result += f"上传时间:{entry[5]}\n"
+        if Is_short == False:
+            result += f"文章:{entry[1]}\n"
+            result += f"链接:{entry[2]}\n上传时间:{entry[5]}\n"
             result += "\n" + "-" * 3 + "\n"    # 添加分隔线以便区分不同文章
-        record += f"#### 文章:[{entry[1]}]({entry[2]})\n"
+        record += f"#### 文章:{entry[1]}\n"
         record += f"**来源**{entry[3]}\n"
+        record += f"**链接**{entry[2]}\n"
         record += f"**上传时间**{entry[5]}\n"
         record += f"**描述**{entry[4]}\n"
         record += "\n" + "-" * 40 + "\n"   # 添加分隔线以便区分不同文章
@@ -123,7 +124,7 @@ def get_filtered_articles(entries, Is_short):

 def Src_qianxin(e_hour, Is_short):
-    if not os.path.exists('./resources/db/qianxin.db'):
+    if not os.path.exists('./db/qianxin.db'):
         # 创建数据库和表
         create_database()

GotoSend/seebug.py

@@ -8,7 +8,7 @@ import email.utils

 def create_database():
-    conn = sqlite3.connect('./resources/db/seebug.db')
+    conn = sqlite3.connect('./db/seebug.db')
     cursor = conn.cursor()
     cursor.execute('''CREATE TABLE IF NOT EXISTS articles (
                         id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -23,7 +23,7 @@ def create_database():

 def insert_data(data):
-    conn = sqlite3.connect('./resources/db/seebug.db')
+    conn = sqlite3.connect('./db/seebug.db')
     cursor = conn.cursor()
     for entry in data:
         # 解析并格式化时间
@@ -39,11 +39,11 @@ def insert_data(data):

 def get_seebug_json():
     # 检查文件是否存在
-    if not os.path.exists('./resources/JSON/seebug.json'):
+    if not os.path.exists('./JSON/seebug.json'):
         raise FileNotFoundError(f"seebug.json文件不存在请检查程序是否运行正常")
     # 打开并读取JSON文件
-    with open('./resources/JSON/seebug.json', 'r', encoding='utf-8') as file:
+    with open('./JSON/seebug.json', 'r', encoding='utf-8') as file:
         data = json.load(file)

     # 假设data是一个包含多个JSON对象的列表
@@ -66,7 +66,7 @@ def get_seebug_json():
     return total_data

 def select_articles(e_hour):
-    conn = sqlite3.connect('./resources/db/seebug.db')
+    conn = sqlite3.connect('./db/seebug.db')
     cursor = conn.cursor()

     # 获取当前日期和时间
@@ -85,13 +85,13 @@ def select_articles(e_hour):
     return results

 def clear_table():
-    conn = sqlite3.connect('./resources/db/seebug.db')
+    conn = sqlite3.connect('./db/seebug.db')
     cursor = conn.cursor()
     cursor.execute('DELETE FROM articles')
     conn.commit()
     conn.close()

-def record_md(result, filename="./resources/history/sec_news.md"):
+def record_md(result, filename="./history/sec_news.md"):
     # 读取现有内容
     if os.path.exists(filename):
         with open(filename, 'r', encoding='utf-8') as file:
@@ -111,17 +111,18 @@ def get_filtered_articles(entries, Is_short):
     record = ""
     for entry in entries:
         if Is_short == False:
-            result += f"文章:[{entry[1]}]({entry[2]})\n类型:{entry[3]}\n"
-            result += f"上传时间:{entry[5]}\n"
+            result += f"类型:{entry[3]}\n文章:{entry[1]}"
+            result += f"链接:{entry[2]}\n上传时间:{entry[5]}\n"
             result += f"{entry[4]}\n"
             result += "\n" + "-" * 40 + "\n"   # 添加分隔线以便区分不同文章
         if Is_short == True:
-            result += f"文章:[{entry[1]}]({entry[2]})"
-            result += f"上传时间:{entry[5]}\n"
+            result += f"文章:{entry[1]}"
+            result += f"链接:{entry[2]}\n上传时间:{entry[5]}\n"
             result += "\n" + "-" * 3 + "\n"    # 添加分隔线以便区分不同文章
-        record += f"#### 文章:[{entry[1]}]({entry[2]})\n"
+        record += f"#### 文章:{entry[1]}\n"
         record += f"**类型**{entry[3]}\n"
+        record += f"**链接**{entry[2]}\n"
         record += f"**上传时间**{entry[5]}\n"
         record += f"{entry[4]}\n"
         record += "\n" + "-" * 40 + "\n"   # 添加分隔线以便区分不同文章
@@ -130,7 +131,7 @@ def get_filtered_articles(entries, Is_short):

 def Src_seebug(e_hour, Is_short):
-    if not os.path.exists('./resources/db/seebug.db'):
+    if not os.path.exists('./db/seebug.db'):
         # 创建数据库和表
         create_database()

GotoSend/sougou_wx.py

@@ -1,196 +0,0 @@
import os
import json
import sqlite3
from datetime import datetime, timedelta
def clear_table():
conn = sqlite3.connect('./resources/db/sougou-wx.db')
cursor = conn.cursor()
cursor.execute('DELETE FROM articles')
conn.commit()
conn.close()
def create_database():
conn = sqlite3.connect('./resources/db/sougou-wx.db')
cursor = conn.cursor()
cursor.execute('''CREATE TABLE IF NOT EXISTS articles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT,
link TEXT,
description TEXT,
pubDate DATETIME,
author TEXT,
keyword TEXT,
is_sended BOOLEAN
)''')
conn.commit()
conn.close()
def insert_data(data):
conn = sqlite3.connect('./resources/db/sougou-wx.db')
cursor = conn.cursor()
for entry in data:
# 检查是否存在相同 title 和 author 的记录
cursor.execute('''
SELECT 1 FROM articles WHERE title = ? AND author = ?
''', (entry['title'], entry['author']))
if cursor.fetchone() is None:
# 如果没有找到相同记录,则插入新记录
cursor.execute('''
INSERT INTO articles (title, link, description, pubDate, author, keyword)
VALUES (?, ?, ?, ?, ?, ?)
''', (entry['title'], entry['link'], entry['description'], entry['pubDate'], entry['author'], entry['keyword']))
conn.commit()
conn.close()
def get_json():
# 检查文件是否存在
if not os.path.exists('./resources/JSON/sougou-wx.json'):
raise FileNotFoundError(f"sougou-wx.json文件不存在请检查程序是否运行正常")
# 打开并读取JSON文件
with open('./resources/JSON/sougou-wx.json', 'r', encoding='utf-8') as file:
data = json.load(file)
# 假设data是一个包含多个关键词的字典
total_data = []
for keyword, keyword_data in data.items():
# 检查关键词对应的数据是否为列表
if not isinstance(keyword_data, list):
raise ValueError(f"关键词 {keyword} 对应的数据格式错误,请检查爬取程序是否异常!")
# 提取所需字段并编号
for index, item in enumerate(keyword_data, start=1):
entry = {
"id": index,
"title": item.get("title", ""),
"link": item.get("link", ""),
"description": item.get("description", ""),
"pubDate": item.get("pubDate", ""),
"author": item.get("author", ""),
"keyword": keyword
}
total_data.append(entry)
return total_data
def select_articles():
conn = sqlite3.connect('./resources/db/sougou-wx.db')
cursor = conn.cursor()
# 获取当前日期和时间
now = datetime.now()
two_months_ago = now - timedelta(days=60) # 假设两个月大约60天
try:
# 查询最近的3条未被标记为True的消息且发布时间不超过两个月
cursor.execute('''
SELECT * FROM articles
WHERE is_sended IS NULL AND pubDate BETWEEN ? AND ?
ORDER BY pubDate DESC
LIMIT 3
''', (two_months_ago.strftime('%Y-%m-%d %H:%M:%S'), now.strftime('%Y-%m-%d %H:%M:%S')))
# 查询最近的3条未被标记为True的消息
# cursor.execute('''
# SELECT * FROM articles
# WHERE is_sended IS NULL
# ORDER BY pubDate DESC
# LIMIT 3
# ''')
results = cursor.fetchall()
# print(results)
if results:
for row in results:
article_id = row[0]
cursor.execute('''
UPDATE articles
SET is_sended = True
WHERE id = ?
''', (article_id,))
conn.commit() # 提交事务
except Exception as e:
conn.rollback() # 回滚事务
print(f"Error: {e}")
finally:
cursor.close()
conn.close()
return results
def record_md(result, filename="./resources/history/wx_news.md"):
# 读取现有内容
if os.path.exists(filename):
with open(filename, 'r', encoding='utf-8') as file:
existing_content = file.read()
else:
existing_content = ""
# 将新内容插入到现有内容的开头
new_content = result + existing_content
# 写回文件
with open(filename, 'w', encoding='utf-8') as file:
file.write(new_content)
def get_filtered_articles(entries, Is_short):
result = ""
record = ""
for entry in entries:
if Is_short == False:
result += f"文章:[{entry[1]}]({entry[2]})\n描述:{entry[3]}\n"
result += f"上传时间:{entry[4]}\n"
result += f"作者:{entry[5]}\n"
result += f"关键词:{entry[6]}\n"
result += "\n" + "-" * 40 + "\n" # 添加分隔线以便区分不同文章
if Is_short == True:
result += f"文章:[{entry[1]}]({entry[2]})"
result += f"上传时间:{entry[4]}\n"
result += "\n" + "-" * 3 + "\n" # 添加分隔线以便区分不同文章
record += f"#### 文章:[{entry[1]}]({entry[2]})\n描述:{entry[3]}\n"
record += f"**上传时间**{entry[4]}\n"
record += f"**作者**{entry[5]}\n"
record += f"**关键词**{entry[6]}\n"
record += "\n" + "-" * 40 + "\n" # 添加分隔线以便区分不同文章
record_md(record)
return result
def Src_sougou_wx(Is_short):
if not os.path.exists('./resources/db/sougou-wx.db'):
# 创建数据库和表
create_database()
# 清空表
# clear_table()
# 获取 JSON 数据
sougou_wx_data = get_json()
# 插入数据到数据库
insert_data(sougou_wx_data)
# 查询指定时间段内的数据
filtered_articles = select_articles()
# print(filtered_articles)
if filtered_articles:
results = get_filtered_articles(filtered_articles, Is_short)
return results
else:
return False
if __name__ == "__main__":
reslts = Src_sougou_wx(False)
if reslts != False:
print(reslts)
else:
# 如果为空,则跳过执行
print("-" * 40)
print("微信公众号数据为空,跳过执行。")

GotoSend/xianzhi.py

@@ -6,7 +6,7 @@ import os
 from datetime import datetime, timedelta
 def create_database():
-    conn = sqlite3.connect('./resources/db/xianzhi.db')
+    conn = sqlite3.connect('./db/xianzhi.db')
     cursor = conn.cursor()
     cursor.execute('''CREATE TABLE IF NOT EXISTS articles (
                         id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -18,7 +18,7 @@ def create_database():
     conn.close()
 def insert_data(data):
-    conn = sqlite3.connect('./resources/db/xianzhi.db')
+    conn = sqlite3.connect('./db/xianzhi.db')
     cursor = conn.cursor()
     for entry in data:
         try:
@@ -40,11 +40,11 @@ def insert_data(data):
 def get_xianzhi_json():
     # Make sure the crawler output exists
-    if not os.path.exists('./resources/JSON/xianzhi.json'):
+    if not os.path.exists('./JSON/xianzhi.json'):
         raise FileNotFoundError(f"xianzhi.json文件不存在,请检查程序是否运行正常")
     # Open and read the JSON file
-    with open('./resources/JSON/xianzhi.json', 'r', encoding='utf-8') as file:
+    with open('./JSON/xianzhi.json', 'r', encoding='utf-8') as file:
         data = json.load(file)
     # data is assumed to be a list of JSON objects
@@ -65,7 +65,7 @@ def get_xianzhi_json():
     return total_data
 def select_articles(e_hour):
-    conn = sqlite3.connect('./resources/db/xianzhi.db')
+    conn = sqlite3.connect('./db/xianzhi.db')
     cursor = conn.cursor()
     # Get the current date and time
@@ -84,13 +84,13 @@ def select_articles(e_hour):
     return results
 def clear_table():
-    conn = sqlite3.connect('./resources/db/xianzhi.db')
+    conn = sqlite3.connect('./db/xianzhi.db')
     cursor = conn.cursor()
     cursor.execute('DELETE FROM articles')
     conn.commit()
     conn.close()
-def record_md(result, filename="./resources/history/tech_passage.md"):
+def record_md(result, filename="./history/tech_passage.md"):
     # Read the existing contents first
     if os.path.exists(filename):
         with open(filename, 'r', encoding='utf-8') as file:
@@ -110,15 +110,16 @@ def get_filtered_articles(entries, Is_short):
     record = ""
     for entry in entries:
         if Is_short == False:
-            result += f"文章:[{entry[1]}]({entry[2]})\n"
-            result += f"上传时间:{entry[3]}\n"
+            result += f"文章:{entry[1]}\n"
+            result += f"链接:{entry[2]}\n上传时间:{entry[3]}\n"
             result += "\n" + "-" * 40 + "\n"  # separator between articles
         if Is_short == False:
-            result += f"文章:[{entry[1]}]({entry[2]})\n"
-            result += f"上传时间:{entry[3]}\n"
+            result += f"文章:{entry[1]}\n"
+            result += f"链接:{entry[2]}\n上传时间:{entry[3]}\n"
             result += "\n" + "-" * 3 + "\n"  # separator between articles
-        record += f"#### 文章:[{entry[1]}]({entry[2]})\n"
+        record += f"#### 文章:{entry[1]}\n"
+        record += f"**链接**:{entry[2]}\n"
         record += f"**上传时间**:{entry[3]}\n"
         record += "\n" + "-" * 40 + "\n"  # separator between articles
     record_md(record)
@@ -126,7 +127,7 @@ def Src_xianzhi(e_hour, Is_short):
 def Src_xianzhi(e_hour, Is_short):
-    if not os.path.exists('./resources/db/xianzhi.db'):
+    if not os.path.exists('./db/xianzhi.db'):
         # Create the database and table
         create_database()
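Everything this hunk touches is the same literal path prefix, swapped in a dozen call sites. A hedged sketch of one way to centralize that churn (module and names are illustrative, not part of the repo):

```python
# paths.py -- hypothetical helper, not repo code
from pathlib import Path

BASE = Path("./resources")  # Path(".") would reproduce the beta0.1 layout

def db_path(name: str) -> str:
    """SQLite file for a source, e.g. db_path('xianzhi') -> ./resources/db/xianzhi.db"""
    return str(BASE / "db" / f"{name}.db")

def json_path(name: str) -> str:
    """Crawler output for a source, e.g. json_path('xianzhi')"""
    return str(BASE / "JSON" / f"{name}.json")
```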


@@ -1,17 +1,10 @@
 ## Continuously updated <br>
 RSS feed sources: https://github.com/zhengjim/Chinese-Security-RSS <br>
 Formatted with python-json, then sent via a Feishu webhook bot <br>
-./config/config.yaml sets the push channels, run mode and other basic options <br>
-./config/keywords.yaml sets the keyword parameters <br>
-./config/github_config.yaml sets the GitHub-related parameters <br>
-### Project highlights <br>
-- Modular: the crawler (fetching), analysis (filtering, analysing and storing the fetched JSON) and push (channels, web page) parts each run on their own. <br>
-- Lightweight: sqlite plus common standard libraries only, so a plain Python environment is enough and memory use stays low. <br>
-- Simple: fill in the config and a single run does the rest. <br>
+config.yaml covers most of the parameters you are likely to need <br>
 ### Logging
-See the contents of ./resources/log <br>
+See the contents of ./log <br>
 ### Usage tips <br>
 On Linux, install screen and keep the script running in the background. <br>
@@ -27,29 +20,20 @@ centos: `yum install screen` <br>
 Then run directly: `python Core.py` <br>
 Web UI: `python ./web/app.py` <br>
 The web page starts on local port 5000; a reverse proxy can map it to a public domain <br>
-Open the web domain to browse push history; /log shows the program log and /weblog the flask log <br>
+Open the web domain to browse push history; /log shows the program log <br>
 ### Configuration <br>
 First create a Feishu group, then create a WebHook bot in it <br>
-<center><img src="./resources/imgs/group.jpg" width="50%" alt="group"/></center><br>
-<center><img src="./resources/imgs/add_bot.jpg" width="50%" alt="add bot"/></center><br>
+<center><img src="./imgs/group.jpg" width="50%" alt="group"/></center><br>
+<center><img src="./imgs/add_bot.jpg" width="50%" alt="add bot"/></center><br>
 When configuring the bot you can enable signature verification, or choose an IP allowlist instead <br>
-<center><img src="./resources/imgs/bot_config.jpg" width="50%" alt="bot config"/></center><br>
+<center><img src="./imgs/bot_config.jpg" width="50%" alt="bot config"/></center><br>
 Then fill the details into the config.yaml file <br>
-<center><img src="./resources/imgs/config.jpg" width="50%" alt="config"/></center><br>
+<center><img src="./imgs/config.jpg" width="50%" alt="config"/></center><br>
 Now you are ready to run. <br>
-### GitHub rate-limit configuration <br>
-Too many requests in a short time can get you rate-limited by GitHub; the points below may help <br>
-- Unauthenticated requests are limited to 60 per hour <br>
-- API requests with basic authentication allow up to 5,000 per hour <br>
-- Create a token at https://github.com/settings/tokens/new; an unlimited expiry is recommended. <br>
-<center><img src="./resources/imgs/github-token.png" width="80%" alt="github token"/></center><br>
-github_token can be configured in `./config/github_config.yaml` <br>
 ### Run results <br>
-<center><img src="./resources/imgs/run.jpg" width="50%" alt="backend"/></center><br>
-<center><img src="./resources/imgs/start.jpg" width="50%" alt="Feishu startup message"/></center><br>
-<center><img src="./resources/imgs/result.jpg" width="50%" alt="Feishu push result"/></center><br>
-<center><img src="./resources/imgs/result_mobile.jpg" width="70%" alt="Feishu push result (Android)"/></center><br>
+<center><img src="./imgs/start.jpg" width="50%" alt="Feishu startup message"/></center><br>
+<center><img src="./imgs/result.jpg" width="50%" alt="Feishu push result"/></center><br>
+<center><img src="./imgs/run.jpg" width="50%" alt="backend"/></center><br>
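The README above starts the web UI on local port 5000; a quick, hedged way to confirm it is serving (assumes the defaults described there):

```python
import requests

# Expect HTTP 200 from the history page once ./web/app.py is running locally.
resp = requests.get("http://127.0.0.1:5000/", timeout=5)
print(resp.status_code)
```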


@@ -9,7 +9,7 @@ import time
 import yaml
 def gen_sign():
-    with open('./config/config.yaml', 'r', encoding="utf-8") as file:
+    with open('./config.yaml', 'r', encoding="utf-8") as file:
         config = yaml.safe_load(file)
     secret = f"{config['fs_secret']}"
     # print(secret)
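gen_sign is cut off here, but Feishu's documented custom-bot signature is HMAC-SHA256 keyed on `timestamp + "\n" + secret` over an empty message, base64-encoded. A minimal sketch, independent of the repo's exact implementation:

```python
import base64
import hashlib
import hmac
import time

def feishu_sign(secret: str):
    # Feishu signs with "timestamp\nsecret" as the HMAC key and an empty message.
    timestamp = str(int(time.time()))
    key = f"{timestamp}\n{secret}".encode("utf-8")
    digest = hmac.new(key, digestmod=hashlib.sha256).digest()
    return timestamp, base64.b64encode(digest).decode("utf-8")
```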


@@ -7,7 +7,7 @@ from email.mime.text import MIMEText
 from email.header import Header
 # Load parameters
-with open('./config/config.yaml', 'r', encoding="utf-8") as file:
+with open('./config.yaml', 'r', encoding="utf-8") as file:
     config = yaml.safe_load(file)
 mail_host = f"{config['mail_host']}"
 mail_user = f"{config['mail_user']}"


@@ -9,7 +9,7 @@ import hmac
 import time
 import yaml
-with open('./config/config.yaml', 'r', encoding="utf-8") as file:
+with open('./config.yaml', 'r', encoding="utf-8") as file:
     config = yaml.safe_load(file)
 webhook_url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={config['wx_key']}"
 # print(webhook_url)


@@ -1,33 +1,21 @@
-### Records kept since 2024-12-15
 ### Issue feedback <br>
 - Articles published exactly on the hour may be missed in fixed-time push mode <br>
-`Mitigated by the added time-range option; a complete fix needs a code refactor`
 - DingTalk/WeCom/Lanxin webhooks have byte-length limits; the push logic needs optimizing (see the sketch after this hunk) <br>
-`Being improved; a complete fix needs a code refactor`
-### Next steps (pending) <br>
+### Next steps <br>
 - Add more RSS feeds (ongoing) <br>
-- Switch filtering from time-window to history-based so no news is missed <br>
-- Add more push channels such as mail and WeChat <br>
-- Add GitHub-style monitoring sources (cf. github-cve-monitor) <br>
-- Add MySQL as a storage backend <br>
-### Next steps (done) <br>
 - Convert all prints to logging info and archive them (done) <br>
 - Convert logging info to async loguru (done) <br>
-- Investigate abnormal aborts (solved: RSS requests had no timeout check) <br>
+- Investigate abnormal aborts (found: RSS requests had no timeout) <br>
 - Add a timeout so the program cannot hang (done) <br>
 - Archive all pushed articles for later review (done) <br>
-- Build a web page showing the latest pushes (info.masonliu.com) (done) <br>
+- Add more push channels such as mail and WeChat <br>
+- Build a web page showing the latest pushes (info.masonliu.com) <br>
 ### Changelog
+#### Records kept since 2024-12-15
 - 2024-12-15 (morning): tidied the file layout, fixed a small logging bug, added web log viewing <br>
 - 2024-12-15 (evening): fixed the verification error at the end of single runs <br>
 - 2024-12-18 (morning): added short-text pushes, which partly works around the long-message problem; a real fix is still being considered <br>
 - 2024-12-24 (evening): uploaded test version 0.1, fixed reported errors <br>
-- 2024-12-25 (morning): improved code logic and presentation <br>
-- 2024-12-25 (evening): improved the push message format <br>
-- 2024-12-30: added and polished Sogou-based WeChat official-account article monitoring <br>
-- 2024-12-30 (evening): added keyword filtering for Doonsec WeChat feeds <br>
-- 2025-01-02: shipped GitHub project monitoring (readme updates to follow) <br>
-- 2025-01-02 (evening): tidied the file layout again for readability <br>
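On the byte-limit feedback item above: a hedged sketch of length-limited pushing. The 4096-byte limit and the function name are illustrative; real channel limits differ per webhook.

```python
def split_for_webhook(text: str, limit: int = 4096):
    """Yield pieces of text whose UTF-8 size stays within the webhook byte limit."""
    chunk = ""
    for line in text.splitlines(keepends=True):
        # Note: a single line longer than the limit is passed through unsplit.
        if chunk and len((chunk + line).encode("utf-8")) > limit:
            yield chunk
            chunk = ""
        chunk += line
    if chunk:
        yield chunk
```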

config.yaml Normal file

@@ -0,0 +1,24 @@
+# Feishu settings
+fs_activate: True
+fs_key: # put the token here, one space after the colon, e.g. aa04a02f-d7bf-4279-bd48-44c4f28c8f74
+fs_secret: # put the signing secret here, one space after the colon, e.g. 4tq65T4jm1MO2IlxvHxBWe
+# WeCom (enterprise WeChat) settings
+wx_activate: True
+wx_key: # put the token here, one space after the colon, e.g. 9a3dd6ff-75d6-4208-bc4b-77724a5805d6
+# DingTalk settings
+ding_activate: False
+# Lanxin settings
+lx_activate: False
+# Mail settings; mail push is still being finished
+mail_host: smtp.masonliu.com # SMTP server
+mail_user: test@masonliu.com # username
+mail_pass: Test123456 # password
+sender: test@masonliu.com
+receivers: ['2857911564@qq.com']
+# Time-window settings
+e_hour: 4 # interval between runs, in hours
+circle: 1 # enable looping; set to 0 to run at fixed times instead
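For orientation, a hedged reading of how the last two keys combine; `run_once` is a stand-in for the real pipeline, and the actual loop in Core.py may differ:

```python
import time

def schedule(run_once, e_hour: int, circle: int) -> None:
    if circle == 1:
        while True:           # circle: 1 -> rerun every e_hour hours
            run_once()
            time.sleep(e_hour * 3600)
    else:
        run_once()            # circle: 0 -> single pass; timing handled externally
```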


@@ -1,73 +0,0 @@
import yaml
from loguru import logger

# Clear any logger configuration that already exists
logger.remove()
logger.add("./log/core.log",
           format="{time:YYYY-MM-DD HH:mm:ss} - {level} - {name}:{function}:{line} - {message}",
           rotation="100 MB",
           compression="zip",
           encoding="utf-8")
# Print logs to the shell as well
logger.add(lambda msg: print(msg),
           format="{time:YYYY-MM-DD HH:mm:ss} - {level} - {name}:{function}:{line} - {message}")

def get_core_config():
    # Load parameters
    with open('./config/config.yaml', 'r', encoding="utf-8") as file:
        config = yaml.safe_load(file)
        logger.debug(f"Loaded config: {config}")  # log the loaded config
    time_choice = int(f"{config['time_mode']}")
    choice = config['mode']  # mode is a list
    e_hour = int(config.get('e_hour', '4'))  # default loop interval: 4 hours
    fs_activate = f"{config['fs_activate']}"
    if fs_activate == "True":
        fs_key = config.get('fs_key')
        fs_secret = config.get('fs_secret')
        if not fs_key or not fs_secret:
            logger.error("飞书相关配置不能为空,请检查配置文件./config/config.yaml")
            exit(5)
    wx_activate = f"{config['wx_activate']}"
    if wx_activate == "True":
        wx_key = config.get('wx_key')
        if not wx_key:
            logger.error("企业微信相关配置不能为空,请检查配置文件./config/config.yaml")
            exit(5)
    ding_activate = f"{config['ding_activate']}"
    if ding_activate == "True":
        ding_key = config.get('ding_key')
        if not ding_key:
            logger.error("钉钉相关配置不能为空,请检查配置文件./config/config.yaml")
            exit(5)
    lx_activate = f"{config['lx_activate']}"
    if lx_activate == "True":
        lx_key = config.get('lx_key')
        if not lx_key:
            logger.error("蓝信相关配置不能为空,请检查配置文件./config/config.yaml")
            exit(5)
    url_web = f"{config['url']}"
    return e_hour, time_choice, choice, fs_activate, wx_activate, ding_activate, lx_activate, url_web

def get_debug_config():
    with open('./config/config.yaml', 'r', encoding="utf-8") as file:
        config = yaml.safe_load(file)
        debug = f"{config['debug']}"
    return debug

def get_kewords_config():
    with open('./config/keywords.yaml', 'r', encoding="utf-8") as file:
        config = yaml.safe_load(file)
        Sogou_WX = config['Sogou-WX']
        Doonsec_switch = f"{config['Doonsec-switch']}"
        Doonsec = config['Doonsec']
    return Sogou_WX, Doonsec_switch, Doonsec


@@ -1,41 +0,0 @@
# Feishu settings
# fs_activate: True
# fs_key: aa04a02f-d7bf-4279-bd48-44c4f28c8f74 # put the token here, one space after the colon
# fs_secret: 4tq65T4jm1MO2IlxvHxBWe # put the signing secret here, one space after the colon
fs_activate: True
fs_key: 202d7e51-9a46-422e-a035-863bc42bc459 # put the token here, one space after the colon, e.g. aa04a02f-d7bf-4279-bd48-44c4f28c8f74
fs_secret: eZaSCl5DSqtJyZ8QpJBDFh # put the signing secret here, one space after the colon, e.g. 4tq65T4jm1MO2IlxvHxBWe
# WeCom (enterprise WeChat) settings
wx_activate: False
wx_key: # put the token here, one space after the colon, e.g. 9a3dd6ff-75d6-4208-bc4b-77724a5805d6
# DingTalk settings
ding_activate: False
ding_key:
# Lanxin settings
lx_activate: False
lx_key:
# Mail settings; mail push is still being finished
mail_host: smtp.masonliu.com # SMTP server
mail_user: test@masonliu.com # username
mail_pass: Test123456 # password
sender: test@masonliu.com
receivers: ['2857911564@qq.com']
# Time-window settings
e_hour: 4 # interval between runs, in hours
time_mode: 1
# 0: fixed-time mode, run only at the times set in Core.py
# 1: loop mode, run again after a fixed interval
mode: [1, 2] # run modes, multiple choices allowed
# 0: RSS crawling
# 1: Sogou WeChat official-account monitoring
# 2: GitHub project monitoring
# Web settings
url: https://info.masonliu.com/ # set this to your own proxied domain, or http://127.0.0.1:5000, or the matching IP/domain
# Debug mode
debug: True


@@ -1,57 +0,0 @@
# GitHub settings
github_token: # put a github token here; it effectively prevents 403 bans when fetching GitHub resources at high rates
translate: False # whether to translate results
# Watch lists
tool_list: # watch these existing repositories for updates
  - BeichenDream/Godzilla
  - rebeyond/Behinder
  - AntSwordProject/antSword
  - j1anFen/shiro_attack
  - yhy0/github-cve-monitor
  - gentilkiwi/mimikatz
  - ehang-io/nps
  - chaitin/xray
  - FunnyWolf/pystinger
  - L-codes/Neo-reGeorg
  - shadow1ng/fscan
  - SafeGroceryStore/MDUT
  - EdgeSecurityTeam/Vulnerability
  - Vme18000yuan/FreePOC
  - wy876/POC
keyword_list: # watched keywords
  - sql注入
  - cnvd
  - 未授权
  - 漏洞POC
  - RCE
  - 渗透测试
  - 反序列化
  - 攻防
  - webshell
  - 红队
  - redteam
  - 信息收集
  - 绕过
  - bypass av
user_list: # watched users
  - su18
  - BeichenDream
  - phith0n
  - zhzyker
  - lijiejie
  - projectdiscovery
  - HavocFramework
black_words: # blocked words
  - 反共
  - 反中共
  - 反华
  - 香港独立
  - 港独
  - 共产党
  - 毛泽东
  - 习近平
  - 台独


@@ -1,3 +0,0 @@
Sogou-WX: ["银行测试", "APP逆向", "渗透测试", "手机银行漏洞", "银行漏洞", "支付漏洞"] # WeChat official-account search keywords for the Sogou engine
Doonsec-switch: False
Doonsec: ["逆向", "解包", "POC"] # keyword filter for Doonsec WeChat security feeds


@@ -1,143 +0,0 @@
### Design notes
Background: the GitHub API returns JSON, which makes extraction and analysis with Python very convenient.
API endpoints:
Latest updated repositories for a keyword: https://api.github.com/search/repositories?q={Keyword}&sort=updated&per_page=30
- sort=updated: order by update time
- per_page=30: result count; 30 is a sensible setting
- page=1: page number
Repository updates for a given user: https://api.github.com/users/{user}/repos
Commit status for a given repository: https://api.github.com/repos/{user}/{repo}
### Rate limits
headers = { "Authorization": "token OAUTH-TOKEN" }
OAUTH-TOKEN: GitHub account settings -> developer settings -> personal token. When creating a new token you can pick its exact scopes; copy it somewhere safe right away, since it is shown only once and must otherwise be regenerated.
### Techniques used
- Python JSON parsing
- Python + sqlite integration
- Python requests crawling
- sqlite filtering
### Reference JSON format
Fields needed:
- html_url
- created_at: repository creation time
- updated_at: repository last update time
- pushed_at: repository last push time (the design keys off this field)
- description: repository description
{
"id": 511095846,
"node_id": "R_kgDOHna0Jg",
"name": "TestnetProtocol",
"full_name": "exorde-labs/TestnetProtocol",
"private": false,
"owner": {
"login": "exorde-labs",
"id": 64810085,
"node_id": "MDEyOk9yZ2FuaXphdGlvbjY0ODEwMDg1",
"avatar_url": "https://avatars.githubusercontent.com/u/64810085?v=4",
"gravatar_id": "",
"url": "https://api.github.com/users/exorde-labs",
"html_url": "https://github.com/exorde-labs",
"followers_url": "https://api.github.com/users/exorde-labs/followers",
"following_url": "https://api.github.com/users/exorde-labs/following{/other_user}",
"gists_url": "https://api.github.com/users/exorde-labs/gists{/gist_id}",
"starred_url": "https://api.github.com/users/exorde-labs/starred{/owner}{/repo}",
"subscriptions_url": "https://api.github.com/users/exorde-labs/subscriptions",
"organizations_url": "https://api.github.com/users/exorde-labs/orgs",
"repos_url": "https://api.github.com/users/exorde-labs/repos",
"events_url": "https://api.github.com/users/exorde-labs/events{/privacy}",
"received_events_url": "https://api.github.com/users/exorde-labs/received_events",
"type": "Organization",
"user_view_type": "public",
"site_admin": false
},
"html_url": "https://github.com/exorde-labs/TestnetProtocol",
"description": null,
"fork": false,
"url": "https://api.github.com/repos/exorde-labs/TestnetProtocol",
"forks_url": "https://api.github.com/repos/exorde-labs/TestnetProtocol/forks",
"keys_url": "https://api.github.com/repos/exorde-labs/TestnetProtocol/keys{/key_id}",
"collaborators_url": "https://api.github.com/repos/exorde-labs/TestnetProtocol/collaborators{/collaborator}",
"teams_url": "https://api.github.com/repos/exorde-labs/TestnetProtocol/teams",
"hooks_url": "https://api.github.com/repos/exorde-labs/TestnetProtocol/hooks",
"issue_events_url": "https://api.github.com/repos/exorde-labs/TestnetProtocol/issues/events{/number}",
"events_url": "https://api.github.com/repos/exorde-labs/TestnetProtocol/events",
"assignees_url": "https://api.github.com/repos/exorde-labs/TestnetProtocol/assignees{/user}",
"branches_url": "https://api.github.com/repos/exorde-labs/TestnetProtocol/branches{/branch}",
"tags_url": "https://api.github.com/repos/exorde-labs/TestnetProtocol/tags",
"blobs_url": "https://api.github.com/repos/exorde-labs/TestnetProtocol/git/blobs{/sha}",
"git_tags_url": "https://api.github.com/repos/exorde-labs/TestnetProtocol/git/tags{/sha}",
"git_refs_url": "https://api.github.com/repos/exorde-labs/TestnetProtocol/git/refs{/sha}",
"trees_url": "https://api.github.com/repos/exorde-labs/TestnetProtocol/git/trees{/sha}",
"statuses_url": "https://api.github.com/repos/exorde-labs/TestnetProtocol/statuses/{sha}",
"languages_url": "https://api.github.com/repos/exorde-labs/TestnetProtocol/languages",
"stargazers_url": "https://api.github.com/repos/exorde-labs/TestnetProtocol/stargazers",
"contributors_url": "https://api.github.com/repos/exorde-labs/TestnetProtocol/contributors",
"subscribers_url": "https://api.github.com/repos/exorde-labs/TestnetProtocol/subscribers",
"subscription_url": "https://api.github.com/repos/exorde-labs/TestnetProtocol/subscription",
"commits_url": "https://api.github.com/repos/exorde-labs/TestnetProtocol/commits{/sha}",
"git_commits_url": "https://api.github.com/repos/exorde-labs/TestnetProtocol/git/commits{/sha}",
"comments_url": "https://api.github.com/repos/exorde-labs/TestnetProtocol/comments{/number}",
"issue_comment_url": "https://api.github.com/repos/exorde-labs/TestnetProtocol/issues/comments{/number}",
"contents_url": "https://api.github.com/repos/exorde-labs/TestnetProtocol/contents/{+path}",
"compare_url": "https://api.github.com/repos/exorde-labs/TestnetProtocol/compare/{base}...{head}",
"merges_url": "https://api.github.com/repos/exorde-labs/TestnetProtocol/merges",
"archive_url": "https://api.github.com/repos/exorde-labs/TestnetProtocol/{archive_format}{/ref}",
"downloads_url": "https://api.github.com/repos/exorde-labs/TestnetProtocol/downloads",
"issues_url": "https://api.github.com/repos/exorde-labs/TestnetProtocol/issues{/number}",
"pulls_url": "https://api.github.com/repos/exorde-labs/TestnetProtocol/pulls{/number}",
"milestones_url": "https://api.github.com/repos/exorde-labs/TestnetProtocol/milestones{/number}",
"notifications_url": "https://api.github.com/repos/exorde-labs/TestnetProtocol/notifications{?since,all,participating}",
"labels_url": "https://api.github.com/repos/exorde-labs/TestnetProtocol/labels{/name}",
"releases_url": "https://api.github.com/repos/exorde-labs/TestnetProtocol/releases{/id}",
"deployments_url": "https://api.github.com/repos/exorde-labs/TestnetProtocol/deployments",
"created_at": "2022-07-06T10:44:29Z",
"updated_at": "2024-12-27T02:20:32Z",
"pushed_at": "2024-12-27T02:20:28Z",
"git_url": "git://github.com/exorde-labs/TestnetProtocol.git",
"ssh_url": "git@github.com:exorde-labs/TestnetProtocol.git",
"clone_url": "https://github.com/exorde-labs/TestnetProtocol.git",
"svn_url": "https://github.com/exorde-labs/TestnetProtocol",
"homepage": null,
"size": 1918317,
"stargazers_count": 16,
"watchers_count": 16,
"language": "Solidity",
"has_issues": true,
"has_projects": true,
"has_downloads": true,
"has_wiki": true,
"has_pages": false,
"has_discussions": false,
"forks_count": 20,
"mirror_url": null,
"archived": false,
"disabled": false,
"open_issues_count": 0,
"license": {
"key": "mit",
"name": "MIT License",
"spdx_id": "MIT",
"url": "https://api.github.com/licenses/mit",
"node_id": "MDc6TGljZW5zZTEz"
},
"allow_forking": true,
"is_template": false,
"web_commit_signoff_required": false,
"topics": [
],
"visibility": "public",
"forks": 20,
"open_issues": 0,
"watchers": 16,
"default_branch": "main",
"score": 1.0
}
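Picking the fields listed above out of one search result is a one-liner per key; a minimal sketch, assuming `item` is an element of the `items` array returned by the search endpoint:

```python
def pick_fields(item: dict) -> dict:
    # Only the fields the design calls for; description may be None.
    return {
        "html_url": item["html_url"],
        "created_at": item["created_at"],
        "updated_at": item["updated_at"],
        "pushed_at": item["pushed_at"],  # the monitoring design keys off this one
        "description": item["description"],
    }
```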
### Reference code
import datetime
import requests
github_headers = {"Authorization": "token OAUTH-TOKEN"}  # token as described under rate limits
year = datetime.datetime.now().year
api = "https://api.github.com/search/repositories?q=CVE-{}&sort=updated".format(year)
json_str = requests.get(api, headers=github_headers, timeout=10).json()


@@ -6,6 +6,15 @@ import json
 from requests.exceptions import RequestException
 from loguru import logger
+logger.add("./log/spider.log",
+           format="{time:YYYY-MM-DD HH:mm:ss} - {level} - {name}:{function}:{line} - {message}",
+           rotation="10 MB",
+           compression="zip",
+           encoding="utf-8")
+# Print logs to the shell
+# logger.add(lambda msg: print(msg),
+#            format="{time:YYYY-MM-DD HH:mm:ss} - {level} - {name}:{function}:{line} - {message}")
 # Crawler request headers for testing
 headers = {
     "Content-Type": "application/json",
@@ -73,11 +82,11 @@ def seebug_main():
     items = parse_rss(rss_content)
     # Make sure the directory exists
-    os.makedirs(os.path.dirname('./resources/JSON/seebug.json'), exist_ok=True)
+    os.makedirs(os.path.dirname('./JSON/seebug.json'), exist_ok=True)
     # Save the parsed data to a JSON file
-    save_to_json(items, './resources/JSON/seebug.json')
-    logger.info("数据已保存到 ./resources/JSON/seebug.json")
+    save_to_json(items, './JSON/seebug.json')
+    logger.info("数据已保存到 ./JSON/seebug.json")
 # 4hou crawler
 def M_4hou_main():
@@ -91,11 +100,11 @@ def M_4hou_main():
     items = parse_rss(rss_content)
     # Make sure the directory exists
-    os.makedirs(os.path.dirname('./resources/JSON/4hou.json'), exist_ok=True)
+    os.makedirs(os.path.dirname('./JSON/4hou.json'), exist_ok=True)
     # Save the parsed data to a JSON file
-    save_to_json(items, './resources/JSON/4hou.json')
-    logger.info("数据已保存到 ./resources/JSON/4hou.json")
+    save_to_json(items, './JSON/4hou.json')
+    logger.info("数据已保存到 ./JSON/4hou.json")
 # Anquanke crawler
 def anquanke_main():
@@ -109,11 +118,11 @@ def anquanke_main():
     items = parse_rss(rss_content)
     # Make sure the directory exists
-    os.makedirs(os.path.dirname('./resources/JSON/anquanke.json'), exist_ok=True)
+    os.makedirs(os.path.dirname('./JSON/anquanke.json'), exist_ok=True)
     # Save the parsed data to a JSON file
-    save_to_json(items, './resources/JSON/anquanke.json')
-    logger.info("数据已保存到 ./resources/JSON/anquanke.json")
+    save_to_json(items, './JSON/anquanke.json')
+    logger.info("数据已保存到 ./JSON/anquanke.json")
 # sec_wiki crawler
 def sec_wiki_main():
@@ -127,11 +136,11 @@ def sec_wiki_main():
     items = parse_rss(rss_content)
     # Make sure the directory exists
-    os.makedirs(os.path.dirname('./resources/JSON/sec_wiki.json'), exist_ok=True)
+    os.makedirs(os.path.dirname('./JSON/sec_wiki.json'), exist_ok=True)
     # Save the parsed data to a JSON file
-    save_to_json(items, './resources/JSON/sec_wiki.json')
-    logger.info("数据已保存到 ./resources/JSON/sec_wiki.json")
+    save_to_json(items, './JSON/sec_wiki.json')
+    logger.info("数据已保存到 ./JSON/sec_wiki.json")
 # Huawei crawler
 def huawei_main():
@@ -145,11 +154,11 @@ def huawei_main():
     items = parse_rss(rss_content)
     # Make sure the directory exists
-    os.makedirs(os.path.dirname('./resources/JSON/huawei.json'), exist_ok=True)
+    os.makedirs(os.path.dirname('./JSON/huawei.json'), exist_ok=True)
     # Save the parsed data to a JSON file
-    save_to_json(items, './resources/JSON/huawei.json')
-    logger.info("数据已保存到 ./resources/JSON/huawei.json")
+    save_to_json(items, './JSON/huawei.json')
+    logger.info("数据已保存到 ./JSON/huawei.json")
 # Doonsec WeChat aggregation crawler
 def doonsec_main():
@@ -163,11 +172,11 @@ def doonsec_main():
     items = parse_rss(rss_content)
     # Make sure the directory exists
-    os.makedirs(os.path.dirname('./resources/JSON/doonsec.json'), exist_ok=True)
+    os.makedirs(os.path.dirname('./JSON/doonsec.json'), exist_ok=True)
     # Save the parsed data to a JSON file
-    save_to_json(items, './resources/JSON/doonsec.json')
-    logger.info("数据已保存到 ./resources/JSON/doonsec.json")
+    save_to_json(items, './JSON/doonsec.json')
+    logger.info("数据已保存到 ./JSON/doonsec.json")
 # QiAnXin attack-defense community crawler
 def qianxin_main():
@@ -181,11 +190,11 @@ def qianxin_main():
     items = parse_rss(rss_content)
     # Make sure the directory exists
-    os.makedirs(os.path.dirname('./resources/JSON/qianxin.json'), exist_ok=True)
+    os.makedirs(os.path.dirname('./JSON/qianxin.json'), exist_ok=True)
     # Save the parsed data to a JSON file
-    save_to_json(items, './resources/JSON/qianxin.json')
-    logger.info("数据已保存到 ./resources/JSON/qianxin.json")
+    save_to_json(items, './JSON/qianxin.json')
+    logger.info("数据已保存到 ./JSON/qianxin.json")
 def run():
     seebug_main()
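Each *_main above repeats the same fetch → parse → save sequence with only the feed and output path changing. A hedged sketch of a table-driven alternative (feed URLs omitted; the fetch/parse/save helper names are assumed to come from this module):

```python
# Hypothetical refactor, not repo code.
SOURCES = {
    "seebug": "<seebug RSS URL>",
    "4hou": "<4hou RSS URL>",
}

def rss_source_main(name: str, url: str) -> None:
    rss_content = fetch_rss(url)      # assumes this module's fetch helper
    if rss_content is None:
        return
    items = parse_rss(rss_content)
    path = f"./JSON/{name}.json"
    os.makedirs(os.path.dirname(path), exist_ok=True)
    save_to_json(items, path)
    logger.info(f"数据已保存到 {path}")
```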


@@ -6,6 +6,15 @@ import json
 from requests.exceptions import RequestException
 from loguru import logger
+logger.add("./log/spider.log",
+           format="{time:YYYY-MM-DD HH:mm:ss} - {level} - {name}:{function}:{line} - {message}",
+           rotation="10 MB",
+           compression="zip",
+           encoding="utf-8")
+# Print logs to the shell
+# logger.add(lambda msg: print(msg),
+#            format="{time:YYYY-MM-DD HH:mm:ss} - {level} - {name}:{function}:{line} - {message}")
 # Crawler request headers for testing
 headers = {
     "Content-Type": "application/json",
@@ -62,11 +71,11 @@ def freebuf_main():
         items = parse_rss(rss_content)
         # Make sure the directory exists
-        os.makedirs(os.path.dirname('./resources/JSON/freebuf.json'), exist_ok=True)
+        os.makedirs(os.path.dirname('./JSON/freebuf.json'), exist_ok=True)
         # Save the parsed data to a JSON file
-        save_to_json(items, './resources/JSON/freebuf.json')
-        logger.info("数据已保存到 ./resources/JSON/freebuf.json")
+        save_to_json(items, './JSON/freebuf.json')
+        logger.info("数据已保存到 ./JSON/freebuf.json")
     except Exception as e:
         logger.error(f"解析或保存Freebuf RSS内容时发生错误: {e}")


@@ -5,6 +5,15 @@ import xmltodict
 import json
 from loguru import logger
+logger.add("./log/spider.log",
+           format="{time:YYYY-MM-DD HH:mm:ss} - {level} - {name}:{function}:{line} - {message}",
+           rotation="10 MB",
+           compression="zip",
+           encoding="utf-8")
+# Print logs to the shell
+# logger.add(lambda msg: print(msg),
+#            format="{time:YYYY-MM-DD HH:mm:ss} - {level} - {name}:{function}:{line} - {message}")
 # Crawler request headers for testing
 headers = {
     "Content-Type": "application/atom+xml; charset=utf-8",
@@ -61,10 +70,10 @@ def xianzhi_main():
         json_data = json.dumps(entries_json, ensure_ascii=False, indent=4)
         # Save the JSON data to a file
-        with open('./resources/JSON/xianzhi.json', 'w', encoding='utf-8') as json_file:
+        with open('./JSON/xianzhi.json', 'w', encoding='utf-8') as json_file:
             json_file.write(json_data)
-        logger.info("数据已保存到 ./resources/JSON/xianzhi.json")
+        logger.info("数据已保存到 ./JSON/xianzhi.json")
     except Exception as e:
         logger.error(f"解析或保存先知社区RSS内容时发生错误: {e}")


@@ -5,4 +5,3 @@ schedule
 requests
 python-dateutil
 loguru
-beautifulsoup4


@@ -1,236 +0,0 @@
# -*- coding: utf-8 -*-
import time
import yaml
import requests
import json
import datetime
from requests.exceptions import RequestException
import xml.etree.ElementTree as ET
from loguru import logger

MAX_DESCRIPTION_LENGTH = 300

with open('./config/github_config.yaml', 'r', encoding="utf-8") as file:
    config = yaml.safe_load(file)
    # list = yaml.load(f, Loader=yaml.FullLoader)
    token = config['github_token']
    tool_list, keyword_list, user_list, black_words = config['tool_list'], config['keyword_list'], config['user_list'], config['black_words']

def fetch_rss(url, timeout=10):
    if token is None:
        headers = {
            "Content-Type": "application/json"
        }
    else:
        headers = {
            'Authorization': f"token {token}",
            "Content-Type": "application/json"
        }
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()
        return response.json()
    except requests.Timeout:
        logger.warning(f"请求 {url} 超时,跳过保存操作。")
        return None
    except requests.exceptions.RequestException as e:
        logger.error(f"请求 {url} 时发生错误: {e}")
        return None

def save_to_json(data, filename):
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=4)

def github_main_keyword(key):
    all_results = []  # holds all results
    for keyword in key:
        logger.info(f"github_keyword:关键词【{keyword}】获取开始。")
        api_node = "https://api.github.com/search/repositories?q={}&sort=updated&per_page=20".format(keyword)
        result = fetch_rss(api_node)
        if result is None:
            time.sleep(5)
            continue
        for i in range(0, 20):
            description = result['items'][i]['description']
            if description is None:
                pass
            # Skip this entry if the description contains a blocked word
            elif any(word in description for word in black_words):
                continue
            # Truncate the description at 300 bytes and append '...'
            elif len(description.encode('utf-8')) > MAX_DESCRIPTION_LENGTH:
                # Find the character index that sits at the 300-byte mark
                byte_index = 0
                char_index = 0
                while byte_index < MAX_DESCRIPTION_LENGTH and char_index < len(description):
                    byte_index += len(description[char_index].encode('utf-8'))
                    char_index += 1
                description = description[:char_index - 1] + '...'
            link = result['items'][i]['html_url']
            name = result['items'][i]['name']
            created_at = result['items'][i]['created_at']
            author = result['items'][i]['owner']['login']
            language = result['items'][i]['language']
            # Store each project's details in a dict
            project_info = {
                'link': link,
                'name': name,
                'created_at': created_at,
                'description': description,
                'author': author,
                'language': language,
                'keyword': keyword
            }
            # print(project_info)
            all_results.append(project_info)
        time.sleep(5)
    # Write all results to a JSON file
    save_to_json(all_results, './resources/JSON/github_keyword.json')

def github_main_repo(key):
    all_results = []  # holds all results
    for keyword in key:
        logger.info(f"github_repo:项目【{keyword}】更新情况获取开始。")
        api_node = "https://api.github.com/repos/{}/commits?per_page=1".format(keyword)
        result = fetch_rss(api_node)
        if result is None:
            time.sleep(5)
            continue
        commit = result[0]  # the latest commit
        description = commit['commit']['message']
        if description is None:
            pass
        # Truncate the description at 300 bytes and append '...'
        elif len(description.encode('utf-8')) > MAX_DESCRIPTION_LENGTH:
            byte_index = 0
            char_index = 0
            while byte_index < MAX_DESCRIPTION_LENGTH and char_index < len(description):
                byte_index += len(description[char_index].encode('utf-8'))
                char_index += 1
            description = description[:char_index - 1] + '...'
        author = commit['commit']['author']['name']
        updated_at = commit['commit']['author']['date']
        link_2 = commit['html_url']
        # Store each project's details in a dict
        project_info = {
            'link': f"https://api.github.com/{keyword}",
            'name': keyword,
            'updated_at': updated_at,
            'description': description,
            'author': author,
            'link_2': link_2,
            'keyword': keyword
        }
        # print(project_info)
        all_results.append(project_info)
        time.sleep(5)
    # Write all results to a JSON file
    save_to_json(all_results, './resources/JSON/github_repo.json')

def github_main_release(key):
    all_results = []  # holds all results
    for keyword in key:
        logger.info(f"github_repo:项目【{keyword}】发版情况获取开始。")
        api_node = "https://api.github.com/repos/{}/releases?per_page=1".format(keyword)
        result = fetch_rss(api_node)
        if result is None:
            time.sleep(5)
            continue
        if not result:
            logger.warning(f"github_repo:项目【{keyword}】不存在版本发布情况。")
            time.sleep(5)
            continue
        # print(result)
        # print(keyword)
        commit = result[0]  # the latest release
        author = commit['author']['login']
        published_at = commit['published_at']
        link = commit['html_url']
        # Store each project's details in a dict
        project_info = {
            'link': link,
            'published_at': published_at,
            'author': author,
            'keyword': keyword
        }
        # print(project_info)
        all_results.append(project_info)
        time.sleep(5)
    # Write all results to a JSON file
    save_to_json(all_results, './resources/JSON/github_release.json')

def github_main_user(key):
    all_results = []  # holds all results
    for keyword in key:
        logger.info(f"github_user:作者【{keyword}】更新情况获取开始。")
        api_node = "https://api.github.com/users/{}/repos?sort=created&per_page=10".format(keyword)
        result = fetch_rss(api_node)
        if result is None:
            time.sleep(5)
            continue
        for i in range(0, len(result)):
            description = result[i]['description']
            if description is None:
                pass
            # Skip this entry if the description contains a blocked word
            elif any(word in description for word in black_words):
                continue
            # Truncate the description at 300 bytes and append '...'
            elif len(description.encode('utf-8')) > MAX_DESCRIPTION_LENGTH:
                # Find the character index that sits at the 300-byte mark
                byte_index = 0
                char_index = 0
                while byte_index < MAX_DESCRIPTION_LENGTH and char_index < len(description):
                    byte_index += len(description[char_index].encode('utf-8'))
                    char_index += 1
                description = description[:char_index - 1] + '...'
            link = result[i]['html_url']
            name = result[i]['name']
            created_at = result[i]['created_at']
            author = result[i]['owner']['login']
            language = result[i]['language']
            # Store each project's details in a dict
            project_info = {
                'link': link,
                'name': name,
                'created_at': created_at,
                'description': description,
                'author': author,
                'language': language,
                'keyword': keyword
            }
            # print(project_info)
            all_results.append(project_info)
        time.sleep(5)
    # Write all results to a JSON file
    save_to_json(all_results, './resources/JSON/github_user.json')

def github_main():
    if keyword_list:
        github_main_keyword(keyword_list)
    if tool_list:
        github_main_repo(tool_list)
        github_main_release(tool_list)
    if user_list:
        github_main_user(user_list)

if __name__ == "__main__":
    github_main()


@@ -1,119 +0,0 @@
import requests
from bs4 import BeautifulSoup
import json
import time
import os
import datetime
from requests.exceptions import RequestException
from loguru import logger

headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:133.0) Gecko/20100101 Firefox/133.0",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
    "Accept-Encoding": "gzip, deflate, br",
    "Upgrade-Insecure-Requests": "1",
    "Sec-Fetch-Dest": "document",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-Site": "none",
    "Sec-Fetch-User": "?1",
    "Priority": "u=0, i",
    "Te": "trailers",
    "Connection": "keep-alive"
}

def fetch_html(url, headers=headers, timeout=10):
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()
        return response.text
    except requests.Timeout:
        logger.warning(f"请求 {url} 超时,跳过保存操作。")
    except requests.exceptions.RequestException as e:
        logger.error(f"请求 {url} 时发生错误: {e}")

def parse_html(html_content):
    soup = BeautifulSoup(html_content, 'html.parser')
    # Collect all matching <li> tags
    items = soup.find_all('li', id=lambda x: x and x.startswith('sogou_vr_11002601_box_'))
    results = []
    for item in items:
        # Extract the title and link
        title_tag = item.find('h3')
        if title_tag:
            a_tag = title_tag.find('a')
            title = title_tag.get_text(strip=True) if title_tag else "No title found"
            link = a_tag['href'] if a_tag else "No link found"
            if link and not link.startswith('http'):
                link = "https://weixin.sogou.com" + link
        else:
            title = "No title found"
            link = "No link found"
        # Extract the summary
        summary_tag = item.find('p', class_='txt-info')
        summary = summary_tag.get_text(strip=True) if summary_tag else "No summary found"
        # Extract the publisher
        publisher_tag = item.find('span', class_='all-time-y2')
        publisher = publisher_tag.get_text(strip=True) if publisher_tag else "No publisher found"
        # Extract the timestamp and convert it to a standard time string
        timestamp_script = item.find('script', string=lambda text: 'document.write(timeConvert' in text)
        if timestamp_script:
            timestamp_str = timestamp_script.string.split("'")[1]
            timestamp = int(timestamp_str)
            standard_time = datetime.datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
        else:
            standard_time = "No timestamp found"
        results.append({
            "title": title,
            "link": link,
            "description": summary,
            "author": publisher,
            "pubDate": standard_time
        })
    return results

def remove_surrogates(text):
    """Strip invalid surrogate pairs"""
    return text.encode('utf-8', 'ignore').decode('utf-8')

def sougou_wx_main(keywords):
    all_results = {}  # results for every keyword
    for keyword in keywords:
        url = f"https://weixin.sogou.com/weixin?type=2&s_from=input&ie=utf8&query={keyword}"
        html_content = fetch_html(url)
        # print(html_content)
        if html_content is None:
            logger.warning(f"无法获取微信公众号-Sogou搜索内容,跳过保存操作。关键词: {keyword}")
            continue
        results = parse_html(html_content)
        # Strip invalid surrogate pairs
        cleaned_results = [{k: remove_surrogates(v) for k, v in item.items()} for item in results]
        logger.warning(f"关键词【{keyword}】的微信公众号-Sogou搜索内容保存成功。")
        all_results[keyword] = cleaned_results  # keyed by keyword
        time.sleep(5)
    # Convert all results to JSON
    json_results = json.dumps(all_results, ensure_ascii=False, indent=4)
    # print(json_results)
    # Make sure the directory exists
    os.makedirs(os.path.dirname('./resources/JSON/sougou-wx.json'), exist_ok=True)
    # Save the parsed data to a JSON file
    with open('./resources/JSON/sougou-wx.json', 'w', encoding='utf-8') as f:
        f.write(json_results)

if __name__ == "__main__":
    keywords = ["齐鲁银行", "APP逆向", "渗透测试"]
    sougou_wx_main(keywords)
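A small follow-on check that reads back what sougou_wx_main just wrote (path and structure taken from the code above):

```python
import json

with open('./resources/JSON/sougou-wx.json', encoding='utf-8') as f:
    data = json.load(f)
for keyword, articles in data.items():
    print(keyword, len(articles), "articles")
```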


@@ -1,83 +1,65 @@
 from flask import Flask, jsonify, render_template
 import os
-import logging
 app = Flask(__name__)
 # Config file paths
 BASE_DIR = os.path.dirname(os.path.abspath(__file__))
 PARENT_DIR = os.path.dirname(BASE_DIR)  # parent directory
-SEC_NEWS_PATH = os.path.join(PARENT_DIR, 'resources', 'history', 'sec_news.md')
-TECH_PASSAGE_PATH = os.path.join(PARENT_DIR, 'resources', 'history', 'tech_passage.md')
-CORE_LOG_PATH = os.path.join(PARENT_DIR, 'resources', 'log', 'core.log')
-WEB_LOG_PATH = os.path.join(PARENT_DIR, 'resources', 'log', 'app.log')
+SEC_NEWS_PATH = os.path.join(PARENT_DIR, 'history', 'sec_news.md')
+TECH_PASSAGE_PATH = os.path.join(PARENT_DIR, 'history', 'tech_passage.md')
+CORE_LOG_PATH = os.path.join(PARENT_DIR, 'log', 'core.log')  # newly added log file path
-# Configure the logger
-logging.basicConfig(
-    filename=WEB_LOG_PATH,
-    level=logging.INFO,
-    format='%(asctime)s - %(levelname)s - %(message)s'
-)
-# Rewrite output content
 def replace_content(content):
     content = content.replace('####', '###')
-    content = content.replace(r"e:\Self-Tool-Code\PyBot", '.')  # raw string avoids escaping issues
     return content
 @app.route('/')
 def index():
-    logging.info("访问主页")
     return render_template('index.html')
 @app.route('/get-sec-news')
 def get_sec_news():
-    logging.info(f"尝试打开安全新闻历史推送文件: {SEC_NEWS_PATH}")
+    print(f"尝试打开安全新闻历史推送文件: {SEC_NEWS_PATH}")
     try:
         with open(SEC_NEWS_PATH, 'r', encoding='utf-8') as file:
             content = file.read()
             content = replace_content(content)
         return jsonify({'content': content}), 200
     except FileNotFoundError:
-        logging.error(f"文件缺失: {SEC_NEWS_PATH}")
+        print(f"文件缺失: {SEC_NEWS_PATH}")
         return jsonify({'error': '安全新闻历史推送文件缺失!'}), 404
     except Exception as e:
-        logging.error(f"读取时出错: {SEC_NEWS_PATH}, 原因: {str(e)}")
+        print(f"读取时出错: {SEC_NEWS_PATH}, 原因: {str(e)}")
         return jsonify({'error': str(e)}), 500
 @app.route('/get-tech-passage')
 def get_tech_passage():
-    logging.info(f"尝试打开技术文章历史推送文件: {TECH_PASSAGE_PATH}")
+    print(f"尝试打开技术文章历史推送文件: {TECH_PASSAGE_PATH}")
     try:
         with open(TECH_PASSAGE_PATH, 'r', encoding='utf-8') as file:
             content = file.read()
             content = replace_content(content)
         return jsonify({'content': content}), 200
     except FileNotFoundError:
-        logging.error(f"文件缺失: {TECH_PASSAGE_PATH}")
+        print(f"文件缺失: {TECH_PASSAGE_PATH}")
         return jsonify({'error': '技术文章历史推送文件缺失!'}), 404
     except Exception as e:
-        logging.error(f"读取时出错: {TECH_PASSAGE_PATH}, 原因: {str(e)}")
+        print(f"读取时出错: {TECH_PASSAGE_PATH}, 原因: {str(e)}")
         return jsonify({'error': str(e)}), 500
 @app.route('/log')
 def get_log():
-    logging.info(f"尝试打开核心日志文件: {CORE_LOG_PATH}")
+    print(f"尝试打开核心日志文件: {CORE_LOG_PATH}")
     # Read the log file contents
     with open(CORE_LOG_PATH, 'r', encoding='utf-8') as file:
         log_content = file.read()
     # Hand the log contents to the template
     return render_template('log.html', log_content=log_content)
-@app.route('/weblog')
-def get_weblog():
-    logging.info(f"尝试打开Web应用日志文件: {WEB_LOG_PATH}")
-    with open(WEB_LOG_PATH, 'r') as file:
-        log_content = file.read()
-    log_content = replace_content(log_content)
-    return render_template('log.html', log_content=log_content)
 def run_server():
     app.run(host='0.0.0.0', port=5000)
 if __name__ == '__main__':
-    app.run(debug=False)  # keep False in production
+    app.run(debug=True)  # should be set to False in production
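A hedged smoke test for the routes in this file, using Flask's built-in test client (assumes the module imports as `app` and the history files exist):

```python
from app import app  # the Flask instance defined above

with app.test_client() as client:
    for route in ("/", "/get-sec-news", "/get-tech-passage"):
        print(route, client.get(route).status_code)
```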


@@ -99,7 +99,7 @@
                 const htmlContent = marked.parse(data.content);
                 document.getElementById('markdown-content').innerHTML = htmlContent;
             } else {
-                document.getElementById('markdown-content').innerHTML = '<p>加载历史推送文件时出错!(推送历史记录为空)</p>';
+                document.getElementById('markdown-content').innerHTML = '<p>加载历史推送文件时出错!</p>';
             }
         })
         .catch(error => {
@@ -119,7 +119,7 @@
                 const htmlContent = marked.parse(data.content);
                 document.getElementById('markdown-content').innerHTML = htmlContent;
             } else {
-                document.getElementById('markdown-content').innerHTML = '<p>加载历史推送文件时出错!(推送历史记录为空)</p>';
+                document.getElementById('markdown-content').innerHTML = '<p>加载历史推送文件时出错!</p>';
             }
         })
         .catch(error => {


@@ -1,34 +0,0 @@
./Core.py: core program
./Dev_test.py: development test script
- ./config
    config.yaml: parameters for each module plus tokens and secrets
    check_config.py: validates the configuration and hands it back to the core program
- ./resources/log
    app.log: web run log
    core.log: program run log
- ./media
    crawls the RSS feeds and other sources and saves the JSON data
- ./resources/JSON
    stores the raw JSON data as fetched
- ./GotoSend
    processes the fetched JSON data and stores it in the db files
- ./resources/db
    stores the processed data
- ./SendCore
    push cores for each channel
    FeishuSendBot.py: Feishu push core
    MailSendBot.py: mail push core
    QiweiSendBot.py: WeCom push core
- ./resources/history
    stores the push history as markdown files
- ./web
    the web page program