PyBot/Core.py
2024-12-16 18:57:33 +08:00

218 lines
7.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
@Author: MasonLiu
@Description: 本程序可以爬取各安全资讯源,并发送到飞书群组。
"""
import schedule
import os
import signal
import sys
import time
import yaml
import requests
from datetime import datetime, timedelta
from SendCore.FeishuSendBot import SendToFeishu, gen_sign
from SendCore.QiweiSendBot import SendToWX
from media.common import run, seebug_main, M_4hou_main, anquanke_main, sec_wiki_main, huawei_main, doonsec_main, qianxin_main
from media.freebuf import freebuf_main
from media.xianzhi import xianzhi_main
from GotoSend.M_4hou import Src_4hou
from GotoSend.anquanke import Src_anquanke
from GotoSend.doonsec import Src_doonsec
from GotoSend.xianzhi import Src_xianzhi
from GotoSend.freebuf import Src_freebuf
from GotoSend.qianxin import Src_qianxin
from GotoSend.seebug import Src_seebug
from loguru import logger
# 清除所有已有的日志记录器配置
logger.remove()
logger.add("./log/core.log",
format="{time:YYYY-MM-DD HH:mm:ss} - {level} - {name}:{function}:{line} - {message}",
rotation="100 MB",
compression="zip",
encoding="utf-8")
# shell终端打印日志
# logger.add(lambda msg: print(msg),
# format="{time:YYYY-MM-DD HH:mm:ss} - {level} - {name}:{function}:{line} - {message}")
# 加载参数
with open('./config.yaml', 'r', encoding="utf-8") as file:
config = yaml.safe_load(file)
# sleep_time = int(f"{config['sleep_time']}")
e_hour = int(f"{config['e_hour']}")
choice = int(f"{config['circle']}")
fs_activate = f"{config['fs_activate']}"
wx_activate = f"{config['wx_activate']}"
webhook_url_once, timestamp_once, sign_once = gen_sign()
def check_avaliable(info, title, webhook_url, timestamp, sign):
if info:
if fs_activate == "True":
# logger.info(f"{title} 递送中(飞书):")
result = SendToFeishu(info, title, webhook_url, timestamp, sign)
logger.info(result)
time.sleep(30)
else:
pass
if wx_activate == "True":
# logger.info(f"{title} 递送中(企业微信):")
result = SendToWX(info, title)
logger.info(result)
time.sleep(30)
else:
pass
else:
logger.info(f"{title}数据为空,跳过执行。")
def send_job(time_1):
# 爬取数据
logger.info(f"发送程序启动,当前时间为:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
logger.info("正在启动各爬虫并获取资源中...")
seebug_main()
anquanke_main()
huawei_main()
doonsec_main()
qianxin_main()
freebuf_main()
xianzhi_main()
M_4hou_main()
# 分析各个数据源的结果
reslt_4hou = Src_4hou(time_1)
reslt_anquanke = Src_anquanke(time_1)
reslt_doonsec = Src_doonsec(time_1)
reslt_xianzhi = Src_xianzhi(time_1)
reslt_freebuf = Src_freebuf(time_1)
reslt_qianxin = Src_qianxin(time_1)
reslt_seebug = Src_seebug(time_1)
webhook_url, timestamp, sign = gen_sign()
check_avaliable(reslt_4hou, "嘶吼资讯", webhook_url, timestamp, sign)
check_avaliable(reslt_anquanke, "安全客资讯", webhook_url, timestamp, sign)
check_avaliable(reslt_doonsec, "洞见微信安全资讯", webhook_url, timestamp, sign)
check_avaliable(reslt_xianzhi, "先知社区资讯", webhook_url, timestamp, sign)
check_avaliable(reslt_freebuf, "FreeBuf资讯", webhook_url, timestamp, sign)
check_avaliable(reslt_qianxin, "奇安信攻防社区资讯", webhook_url, timestamp, sign)
check_avaliable(reslt_seebug, "Seebug社区资讯", webhook_url, timestamp, sign)
if fs_activate == "True":
send_result = SendToFeishu("[点此访问](https://info.masonliu.com)网站以查看全部文章。", "单次运行结束", webhook_url, timestamp, sign)
logger.info(send_result)
else:
pass
if wx_activate == "True":
send_result = SendToWX("[点此访问](https://info.masonliu.com)网站以查看全部文章。", "单次运行结束")
logger.info(send_result)
else:
pass
logger.info("执行完毕,等待下一次执行...")
def signal_handler(sig, frame):
logger.info("接收到退出信号,程序即将退出...")
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler) # Ctrl+C
signal.signal(signal.SIGTERM, signal_handler) # kill命令
def main_loop(choice):
if choice == 1:
while True:
try:
# 执行任务
send_job(e_hour)
time.sleep(e_hour * 60 * 60 - 3 * 60)
except Exception as e:
logger.error(f"发生错误: {e}, 程序已暂停")
# result = SendToFeishu(f"发生错误: {e}, 程序已退出", "报错信息")
# logger.info(result)
exit()
elif choice == 0:
# 设置每天的特定时间点执行job函数
schedule.every().day.at("09:00").do(send_job, 12)
schedule.every().day.at("12:00").do(send_job, 3)
schedule.every().day.at("15:00").do(send_job, 3)
schedule.every().day.at("18:00").do(send_job, 3)
schedule.every().day.at("21:00").do(send_job, 3)
while True:
schedule.run_pending()
n += 1
time.sleep(60) # 每分钟检查一次是否有任务需要执行
# 探测rss源状态
def check_rss_status(url):
try:
response = requests.get(url, timeout=10)
if response.status_code == 200 and len(response.content) > 0:
return True
else:
return f"状态码: {response.status_code}, 内容长度: {len(response.content)}"
except requests.RequestException as e:
return f"请求异常: {str(e)}"
def test_rss_source():
rss_sources = {
"奇安信": "https://forum.butian.net/Rss",
"洞见": "https://wechat.doonsec.com/bayes_rss.xml",
# "华为": "https://www.huawei.com/cn/rss-feeds/psirt/rss",
# "安全维基": "https://www.sec_wiki.com/news/rss",
"安全客": "https://api.anquanke.com/data/v1/rss",
"嘶吼": "https://www.4hou.com/feed",
"Seebug社区": "https://paper.seebug.org/rss/",
"FreeBuf社区": "https://www.freebuf.com/feed",
"先知社区": "https://xz.aliyun.com/feed"
}
rss_info = ""
for name, url in rss_sources.items():
status = check_rss_status(url)
if status:
rss_info += f"{name} 源正常\n"
else:
rss_info += f"{name} 源异常: {status}\n"
return rss_info
if __name__ == "__main__":
print("程序正在运行当中。")
time.sleep(5) # 添加短暂的延迟
rss_info = test_rss_source()
start_info = ""
start_info += "程序已启动,当前时间为:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S") + "\n"
start_info += "程序作者MasonLiu \t 开源地址:[GM-gitea](https://git.masonliu.com/MasonLiu/PyBot)" + "\n"
if choice == 1:
start_info += "时间配置:每隔" + str(e_hour) + "小时执行一次推送\n"
else:
start_info += "时间配置:每天固定时间点执行推送\n"
if fs_activate == "True":
result = SendToFeishu(start_info, "程序信息", webhook_url_once, timestamp_once, sign_once)
logger.info(result)
result = SendToFeishu(rss_info, "RSS源状态", webhook_url_once, timestamp_once, sign_once)
# logger.info(rss_info)
logger.info(result)
else:
pass
if wx_activate == "True":
result = SendToWX(start_info, "程序信息")
logger.info(result)
result = SendToWX(rss_info, "RSS源状态")
# logger.info(rss_info)
logger.info(result)
else:
pass
# 首次运行先暂停两分钟
# time.sleep(2 * 60)
# 主程序
main_loop(choice)