PyBot/Core.py
2024-12-25 18:40:20 +08:00

231 lines
8.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
@Author: MasonLiu
@Description: 本程序可以爬取各安全资讯源,并发送到飞书群组。
"""
import schedule
import os
import signal
import sys
import time
import yaml
import requests
from datetime import datetime, timedelta
from SendCore.FeishuSendBot import SendToFeishu, gen_sign
from SendCore.QiweiSendBot import SendToWX
from media.common import run, seebug_main, M_4hou_main, anquanke_main, sec_wiki_main, huawei_main, doonsec_main, qianxin_main
from media.freebuf import freebuf_main
from media.xianzhi import xianzhi_main
from GotoSend.M_4hou import Src_4hou
from GotoSend.anquanke import Src_anquanke
from GotoSend.doonsec import Src_doonsec
from GotoSend.xianzhi import Src_xianzhi
from GotoSend.freebuf import Src_freebuf
from GotoSend.qianxin import Src_qianxin
from GotoSend.seebug import Src_seebug
from config.check_config import get_core_config, get_debug_config
from loguru import logger
# 清除所有已有的日志记录器配置
logger.remove()
logger.add("./log/core.log",
format="{time:YYYY-MM-DD HH:mm:ss} - {level} - {name}:{function}:{line} - {message}",
rotation="100 MB",
compression="zip",
encoding="utf-8")
# shell终端打印日志
debug = get_debug_config()
if debug == "True":
logger.add(lambda msg: print(msg),
format="{time:YYYY-MM-DD HH:mm:ss} - {level} - {name}:{function}:{line} - {message}")
def signal_handler(sig, frame):
logger.info("接收到退出信号,程序即将退出...")
sys.exit(0)
# 全局变量
signal.signal(signal.SIGINT, signal_handler) # Ctrl+C
signal.signal(signal.SIGTERM, signal_handler) # kill命令
webhook_url_once, timestamp_once, sign_once = gen_sign()
e_hour, choice, fs_activate, wx_activate, ding_activate, lx_activate, url_web = get_core_config()
def check_avaliable(info_long, info_short, title, webhook_url, timestamp, sign):
if info_long: # 发送完整文章相关内容
# logger.info(f"{title} 递送中(飞书):")
result = SendToFeishu(info_long, title, webhook_url, timestamp, sign)
logger.info(result)
time.sleep(15)
else:
pass
if info_short: # 发送精简文章相关内容
# 企业微信相关
if wx_activate == "True":
# logger.info(f"{title} 递送中(企业微信):")
result = SendToWX(info_short, title)
logger.info(result)
time.sleep(15)
else:
pass
# 钉钉相关
if ding_activate == "True":
# logger.info(f"{title} 递送中(钉钉):")
# result = SendToWX(info_short, title) # 待完善
logger.info(result)
time.sleep(15)
else:
pass
if not info_long and not info_short:
logger.info(f"{title}数据为空,跳过执行。")
def send_job(time_1):
# 爬取数据
logger.info(f"发送程序启动,当前时间为:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
logger.info("正在启动各爬虫并获取资源中...")
seebug_main()
anquanke_main()
huawei_main()
doonsec_main()
qianxin_main()
freebuf_main()
xianzhi_main()
M_4hou_main()
# 分析各个数据源的结果(输出长结果)
result_4hou_long = Src_4hou(time_1, False)
result_anquanke_long = Src_anquanke(time_1, False)
result_doonsec_long = Src_doonsec(time_1, False)
result_xianzhi_long = Src_xianzhi(time_1, False)
result_freebuf_long = Src_freebuf(time_1, False)
result_qianxin_long = Src_qianxin(time_1, False)
result_seebug_long = Src_seebug(time_1, False)
# 分析各个数据源的结果(输出短结果)
result_4hou_short = Src_4hou(time_1, True)
result_anquanke_short = Src_anquanke(time_1, True)
result_doonsec_short = Src_doonsec(time_1, True)
result_xianzhi_short = Src_xianzhi(time_1, True)
result_freebuf_short = Src_freebuf(time_1, True)
result_qianxin_short = Src_qianxin(time_1, True)
result_seebug_short = Src_seebug(time_1, True)
webhook_url, timestamp, sign = gen_sign()
check_avaliable(result_4hou_long, result_4hou_short, "嘶吼资讯", webhook_url, timestamp, sign)
check_avaliable(result_anquanke_long, result_anquanke_short, "安全客资讯", webhook_url, timestamp, sign)
check_avaliable(result_doonsec_long, result_doonsec_short, "洞见微信安全资讯", webhook_url, timestamp, sign)
check_avaliable(result_xianzhi_long, result_xianzhi_short, "先知社区资讯", webhook_url, timestamp, sign)
check_avaliable(result_freebuf_long, result_freebuf_short, "FreeBuf资讯", webhook_url, timestamp, sign)
check_avaliable(result_qianxin_long, result_qianxin_short, "奇安信攻防社区资讯", webhook_url, timestamp, sign)
check_avaliable(result_seebug_long, result_seebug_short, "Seebug社区资讯", webhook_url, timestamp, sign)
if fs_activate == "True":
send_result = SendToFeishu(f"[点此访问]({url_web})网站以查看全部文章。", "单次运行结束", webhook_url, timestamp, sign)
logger.info(send_result)
else:
pass
if wx_activate == "True":
send_result = SendToWX(f"[点此访问]({url_web})网站以查看全部文章。", "单次运行结束")
logger.info(send_result)
else:
pass
logger.info("执行完毕,等待下一次执行...")
# 探测rss源状态
def check_rss_status(url):
try:
response = requests.get(url, timeout=10)
if response.status_code == 200 and len(response.content) > 0:
return True
else:
return f"状态码: {response.status_code}, 内容长度: {len(response.content)}"
except requests.RequestException as e:
return f"请求异常: {str(e)}"
def test_rss_source():
rss_sources = {
"奇安信": "https://forum.butian.net/Rss",
"洞见": "https://wechat.doonsec.com/bayes_rss.xml",
# "华为": "https://www.huawei.com/cn/rss-feeds/psirt/rss",
# "安全维基": "https://www.sec_wiki.com/news/rss",
"安全客": "https://api.anquanke.com/data/v1/rss",
"嘶吼": "https://www.4hou.com/feed",
"Seebug社区": "https://paper.seebug.org/rss/",
"FreeBuf社区": "https://www.freebuf.com/feed",
"先知社区": "https://xz.aliyun.com/feed"
}
rss_info = ""
for name, url in rss_sources.items():
status = check_rss_status(url)
if status:
rss_info += f"{name} 源正常\n"
else:
rss_info += f"{name} 源异常: {status}\n"
return rss_info
def main_loop(choice):
if choice == 1:
while True:
try:
# 执行任务
send_job(e_hour)
time.sleep(e_hour * 60 * 60 - 3 * 60)
except Exception as e:
logger.error(f"发生错误: {e}, 程序已暂停")
# result = SendToFeishu(f"发生错误: {e}, 程序已退出", "报错信息")
# logger.info(result)
exit()
elif choice == 0:
# 设置每天的特定时间点执行job函数
schedule.every().day.at("09:00").do(send_job, 12)
schedule.every().day.at("12:00").do(send_job, 3)
schedule.every().day.at("15:00").do(send_job, 3)
schedule.every().day.at("18:00").do(send_job, 3)
schedule.every().day.at("21:00").do(send_job, 3)
while True:
schedule.run_pending()
time.sleep(60) # 每分钟检查一次是否有任务需要执行
def send_first_message():
rss_info = test_rss_source()
start_info = ""
start_info += "程序已启动,当前时间为:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S") + "\n"
start_info += "程序作者MasonLiu \t 开源地址:[GM-gitea](https://git.masonliu.com/MasonLiu/PyBot)" + "\n"
if choice == 1:
start_info += "时间配置:每隔" + str(e_hour) + "小时执行一次推送\n"
else:
start_info += "时间配置:每天固定时间点执行推送\n"
if fs_activate == "True":
result = SendToFeishu(start_info, "程序信息", webhook_url_once, timestamp_once, sign_once)
logger.info(result)
result = SendToFeishu(rss_info, "RSS源状态", webhook_url_once, timestamp_once, sign_once)
# logger.info(rss_info)
logger.info(result)
else:
pass
if wx_activate == "True":
result = SendToWX(start_info, "程序信息")
logger.info(result)
result = SendToWX(rss_info, "RSS源状态")
# logger.info(rss_info)
logger.info(result)
else:
pass
if __name__ == "__main__":
logger.info("程序正在运行当中。")
time.sleep(5) # 添加短暂的延迟
# 首次运行先暂停两分钟
# time.sleep(2 * 60)
# 主程序
send_first_message()
main_loop(choice)