PyBot/Core.py
2025-01-02 17:11:11 +08:00

257 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
@Author: MasonLiu
@Description: 本程序可以爬取各安全资讯源,并发送到飞书群组。
"""
import schedule
import os
import signal
import sys
import time
import yaml
import requests
from datetime import datetime, timedelta
from SendCore.FeishuSendBot import SendToFeishu, gen_sign
from SendCore.QiweiSendBot import SendToWX
from spider.common import run, seebug_main, M_4hou_main, anquanke_main, sec_wiki_main, huawei_main, doonsec_main, qianxin_main
from spider.freebuf import freebuf_main
from spider.xianzhi import xianzhi_main
from spider.sougou_wx import sougou_wx_main
from spider.github import github_main
from GotoSend.M_4hou import Src_4hou
from GotoSend.anquanke import Src_anquanke
from GotoSend.doonsec import Src_doonsec
from GotoSend.xianzhi import Src_xianzhi
from GotoSend.freebuf import Src_freebuf
from GotoSend.qianxin import Src_qianxin
from GotoSend.seebug import Src_seebug
from GotoSend.sougou_wx import Src_sougou_wx
from GotoSend.github import Src_github
from config.check_config import get_core_config, get_debug_config, get_kewords_config
from loguru import logger
# 清除所有已有的日志记录器配置
logger.remove()
logger.add("./resources/log/core.log",
format="{time:YYYY-MM-DD HH:mm:ss} - {level} - {name}:{function}:{line} - {message}",
rotation="100 MB",
compression="zip",
encoding="utf-8")
# shell终端打印日志
debug = get_debug_config()
if debug == "True":
logger.add(lambda msg: print(msg),
format="{time:YYYY-MM-DD HH:mm:ss} - {level} - {name}:{function}:{line} - {message}")
def signal_handler(sig, frame):
logger.info("接收到退出信号,程序即将退出...")
sys.exit(0)
# 全局变量
signal.signal(signal.SIGINT, signal_handler) # Ctrl+C
signal.signal(signal.SIGTERM, signal_handler) # kill命令
webhook_url_once, timestamp_once, sign_once = gen_sign()
e_hour, time_choice, choice, fs_activate, wx_activate, ding_activate, lx_activate, url_web = get_core_config()
Sogou_WX, Doonsec_switch, Doonsec = get_kewords_config()
def check_avaliable(info_long, info_short, title, webhook_url, timestamp, sign):
if info_long: # 发送完整文章相关内容
# logger.info(f"{title} 递送中(飞书):")
result = SendToFeishu(info_long, title, webhook_url, timestamp, sign)
logger.info(result)
time.sleep(15)
else:
pass
if info_short: # 发送精简文章相关内容
# 企业微信相关
if wx_activate == "True":
# logger.info(f"{title} 递送中(企业微信):")
result = SendToWX(info_short, title)
logger.info(result)
time.sleep(15)
else:
pass
# 钉钉相关
if ding_activate == "True":
# logger.info(f"{title} 递送中(钉钉):")
# result = SendToWX(info_short, title) # 待完善
logger.info(result)
time.sleep(15)
else:
pass
if not info_long and not info_short:
logger.info(f"{title}数据为空,跳过执行。")
def send_job_RSS(time_1):
# 爬取数据
seebug_main()
anquanke_main()
huawei_main()
doonsec_main()
qianxin_main()
freebuf_main()
xianzhi_main()
M_4hou_main()
# 分析各个数据源的结果(输出长结果)
result_4hou_long = Src_4hou(time_1, False)
result_anquanke_long = Src_anquanke(time_1, False)
result_doonsec_long = Src_doonsec(time_1, False, Doonsec_switch, Doonsec)
result_xianzhi_long = Src_xianzhi(time_1, False)
result_freebuf_long = Src_freebuf(time_1, False)
result_qianxin_long = Src_qianxin(time_1, False)
result_seebug_long = Src_seebug(time_1, False)
# 分析各个数据源的结果(输出短结果)
result_4hou_short = Src_4hou(time_1, True)
result_anquanke_short = Src_anquanke(time_1, True)
result_doonsec_short = Src_doonsec(time_1, True, Doonsec_switch, Doonsec)
result_xianzhi_short = Src_xianzhi(time_1, True)
result_freebuf_short = Src_freebuf(time_1, True)
result_qianxin_short = Src_qianxin(time_1, True)
result_seebug_short = Src_seebug(time_1, True)
webhook_url, timestamp, sign = gen_sign()
check_avaliable(result_4hou_long, result_4hou_short, "嘶吼资讯", webhook_url, timestamp, sign)
check_avaliable(result_anquanke_long, result_anquanke_short, "安全客资讯", webhook_url, timestamp, sign)
check_avaliable(result_doonsec_long, result_doonsec_short, "洞见微信安全资讯", webhook_url, timestamp, sign)
check_avaliable(result_xianzhi_long, result_xianzhi_short, "先知社区资讯", webhook_url, timestamp, sign)
check_avaliable(result_freebuf_long, result_freebuf_short, "FreeBuf资讯", webhook_url, timestamp, sign)
check_avaliable(result_qianxin_long, result_qianxin_short, "奇安信攻防社区资讯", webhook_url, timestamp, sign)
check_avaliable(result_seebug_long, result_seebug_short, "Seebug社区资讯", webhook_url, timestamp, sign)
def send_job_SX():
sougou_wx_main(Sogou_WX)
result_sx_long = Src_sougou_wx(False)
result_sx_short = Src_sougou_wx(True)
webhook_url, timestamp, sign = gen_sign()
check_avaliable(result_sx_long, result_sx_short, "微信公众号关键词相关内容", webhook_url, timestamp, sign)
def send_job_github(time_1):
github_main()
result_github_1_long, result_github_2_long, result_github_3_long, result_github_4_long = Src_github(time_1, False)
result_github_1_short, result_github_2_short, result_github_3_short, result_github_4_short = Src_github(time_1, True)
webhook_url, timestamp, sign = gen_sign()
check_avaliable(result_github_1_long, result_github_1_short, "Github项目监控-关键词监控", webhook_url, timestamp, sign)
check_avaliable(result_github_2_long, result_github_2_short, "Github项目监控-项目更新情况", webhook_url, timestamp, sign)
webhook_url, timestamp, sign = gen_sign()
check_avaliable(result_github_3_long, result_github_3_short, "Github项目监控-大佬工具", webhook_url, timestamp, sign)
check_avaliable(result_github_4_long, result_github_4_short, "Github项目监控-项目版本发布监测", webhook_url, timestamp, sign)
# 探测rss源状态
def check_rss_status(url):
try:
response = requests.get(url, timeout=10)
if response.status_code == 200 and len(response.content) > 0:
return True
else:
return f"状态码: {response.status_code}, 内容长度: {len(response.content)}"
except requests.RequestException as e:
return f"请求异常: {str(e)}"
def test_rss_source():
rss_sources = {
"奇安信": "https://forum.butian.net/Rss",
"洞见": "https://wechat.doonsec.com/bayes_rss.xml",
# "华为": "https://www.huawei.com/cn/rss-feeds/psirt/rss",
# "安全维基": "https://www.sec_wiki.com/news/rss",
"安全客": "https://api.anquanke.com/data/v1/rss",
"嘶吼": "https://www.4hou.com/feed",
"Seebug社区": "https://paper.seebug.org/rss/",
"FreeBuf社区": "https://www.freebuf.com/feed",
"先知社区": "https://xz.aliyun.com/feed"
}
rss_info = ""
for name, url in rss_sources.items():
status = check_rss_status(url)
if status:
rss_info += f"{name} 源正常\n"
else:
rss_info += f"{name} 源异常: {status}\n"
return rss_info
def main_job(e_hour):
logger.info(f"发送程序启动,当前时间为:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
logger.info("正在启动各爬虫并获取资源中...")
if 0 in choice:
send_job_RSS(e_hour)
if 1 in choice:
send_job_SX()
if 2 in choice:
send_job_github(e_hour)
def main_loop(time_choice):
if time_choice == 1:
while True:
try:
# 执行任务
main_job(e_hour)
time.sleep(e_hour * 60 * 60 - 3 * 60)
except Exception as e:
logger.error(f"发生错误: {e}, 程序已暂停")
# result = SendToFeishu(f"发生错误: {e}, 程序已退出", "报错信息")
# logger.info(result)
exit()
elif time_choice == 0:
# 设置每天的特定时间点执行job函数
schedule.every().day.at("09:00").do(main_job, 12)
schedule.every().day.at("12:00").do(main_job, 3)
schedule.every().day.at("15:00").do(main_job, 3)
schedule.every().day.at("18:00").do(main_job, 3)
schedule.every().day.at("21:00").do(main_job, 3)
while True:
schedule.run_pending()
time.sleep(60) # 每分钟检查一次是否有任务需要执行
def send_first_message():
rss_info = test_rss_source()
start_info = ""
start_info += "程序已启动,当前时间为:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S") + "\n"
start_info += "程序作者MasonLiu \t 开源地址:[GM-gitea](https://git.masonliu.com/MasonLiu/PyBot)" + "\n"
if time_choice == 1:
start_info += "时间配置:每隔" + str(e_hour) + "小时执行一次推送\n"
elif time_choice == 0:
start_info += "时间配置:每天固定时间点执行推送\n"
if 0 in choice:
if fs_activate == "True":
result = SendToFeishu(start_info, "程序信息", webhook_url_once, timestamp_once, sign_once)
logger.info(result)
result = SendToFeishu(rss_info, "RSS源状态", webhook_url_once, timestamp_once, sign_once)
# logger.info(rss_info)
logger.info(result)
send_result = SendToFeishu(f"[点此访问]({url_web})网站以查看全部文章。", "首次运行提醒", webhook_url_once, timestamp_once, sign_once)
logger.info(send_result)
else:
pass
if wx_activate == "True":
result = SendToWX(start_info, "程序信息")
logger.info(result)
result = SendToWX(rss_info, "RSS源状态")
# logger.info(rss_info)
logger.info(result)
send_result = SendToWX(f"[点此访问]({url_web})网站以查看全部文章。", "首次运行提醒")
logger.info(send_result)
else:
pass
if __name__ == "__main__":
logger.info("程序正在运行当中。")
time.sleep(5) # 添加短暂的延迟
# 首次运行先暂停两分钟
# time.sleep(2 * 60)
# 主程序
send_first_message()
main_loop(time_choice)