PyBot/Core.py
2024-12-10 11:49:15 +08:00

255 lines
8.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
@Author: MasonLiu
@Description: 本程序可以爬取各安全资讯源,并发送到飞书群组。
"""
import schedule
import os
import signal
import sys
import time
import yaml
import requests
from datetime import datetime, timedelta
from SendBot import SendToFeishu, gen_sign
from media.common import run, seebug_main, M_4hou_main, anquanke_main, sec_wiki_main, huawei_main, doonsec_main, qianxin_main
from media.freebuf import freebuf_main
from media.xianzhi import xianzhi_main
from GotoSend.M_4hou import Src_4hou
from GotoSend.anquanke import Src_anquanke
from GotoSend.doonsec import Src_doonsec
from GotoSend.xianzhi import Src_xianzhi
from GotoSend.freebuf import Src_freebuf
from GotoSend.qianxin import Src_qianxin
from GotoSend.seebug import Src_seebug
from loguru import logger
# 清除所有已有的日志记录器配置
logger.remove()
logger.add("./log/spider.log",
format="{time:YYYY-MM-DD HH:mm:ss} - {level} - {name}:{function}:{line} - {message}",
rotation="100 MB",
compression="zip",
encoding="utf-8")
# shell终端打印日志
# logger.add(lambda msg: print(msg),
# format="{time:YYYY-MM-DD HH:mm:ss} - {level} - {name}:{function}:{line} - {message}")
# 加载参数
with open('./config.yaml', 'r', encoding="utf-8") as file:
config = yaml.safe_load(file)
# sleep_time = int(f"{config['sleep_time']}")
e_hour = int(f"{config['e_hour']}")
choice = int(f"{config['circle']}")
webhook_url_once, timestamp_once, sign_once = gen_sign()
def send_job(time_1):
# 爬取数据
logger.info("正在启动各爬虫并获取资源中...")
seebug_main()
anquanke_main()
huawei_main()
doonsec_main()
qianxin_main()
freebuf_main()
xianzhi_main()
M_4hou_main()
# 分析各个数据源的结果
reslt_4hou = Src_4hou(time_1)
reslt_anquanke = Src_anquanke(time_1)
reslt_doonsec = Src_doonsec(time_1)
reslt_xianzhi = Src_xianzhi(time_1)
reslt_freebuf = Src_freebuf(time_1)
reslt_qianxin = Src_qianxin(time_1)
reslt_seebug = Src_seebug(time_1)
webhook_url, timestamp, sign = gen_sign()
# 发送嘶吼资讯
if reslt_4hou:
# print("-" * 40)
logger.info("嘶吼资讯递送中:")
result = SendToFeishu(reslt_4hou, "嘶吼资讯递送", webhook_url, timestamp, sign)
logger.info(result)
# print("-" * 40 + "\n")
time.sleep(60)
else:
# print("-" * 40)
logger.info("嘶吼数据为空,跳过执行。")
# 发送安全客资讯
if reslt_anquanke:
# print("-" * 40)
logger.info("安全客资讯递送中:")
result = SendToFeishu(reslt_anquanke, "安全客资讯递送", webhook_url, timestamp, sign)
logger.info(result)
# print("-" * 40 + "\n")
time.sleep(60)
else:
# print("-" * 40)
logger.info("安全客数据为空,跳过执行。")
# 发送洞见微信安全资讯
if reslt_doonsec:
# print("-" * 40)
logger.info("洞见微信安全资讯递送中:")
result = SendToFeishu(reslt_doonsec, "洞见微信安全资讯递送", webhook_url, timestamp, sign)
logger.info(result)
# print("-" * 40 + "\n")
time.sleep(60)
else:
# print("-" * 40)
logger.info("洞见微信安全数据为空,跳过执行。")
# 发送先知社区资讯
if reslt_xianzhi:
# print("-" * 40)
logger.info("先知社区资讯递送中:")
result = SendToFeishu(reslt_xianzhi, "先知社区资讯递送", webhook_url, timestamp, sign)
logger.info(result)
# print("-" * 40 + "\n")
time.sleep(60)
else:
# print("-" * 40)
logger.info("先知社区数据为空,跳过执行。")
# 发送FreeBuf资讯
if reslt_freebuf:
# print("-" * 40)
logger.info("FreeBuf资讯递送中")
result = SendToFeishu(reslt_freebuf, "FreeBuf资讯递送", webhook_url, timestamp, sign)
logger.info(result)
# print("-" * 40 + "\n")
time.sleep(60)
else:
# print("-" * 40)
logger.info("FreeBuf数据为空跳过执行。")
# 发送奇安信攻防社区资讯
if reslt_qianxin:
# print("-" * 40)
logger.info("奇安信攻防社区资讯递送中:")
result = SendToFeishu(reslt_qianxin, "奇安信攻防社区资讯递送", webhook_url, timestamp, sign)
logger.info(result)
# print("-" * 40 + "\n")
time.sleep(60)
else:
# print("-" * 40)
logger.info("奇安信攻防社区数据为空,跳过执行。")
# 发送Seebug资讯
if reslt_seebug:
reslt_seebug = Src_seebug(1000)
webhook_url, timestamp, sign = gen_sign()
# print("-" * 40)
logger.info("Seebug社区资讯递送中")
result = SendToFeishu(reslt_seebug, "Seebug社区资讯递送", webhook_url, timestamp, sign)
logger.info(result)
# print("-" * 40 + "\n")
else:
# print("-" * 40)
logger.info("Seebug社区数据为空跳过执行。")
def signal_handler(sig, frame):
logger.info("接收到退出信号,程序即将退出...")
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler) # Ctrl+C
signal.signal(signal.SIGTERM, signal_handler) # kill命令
def main_loop(choice):
n = 0
if choice == 1:
while True:
try:
# 执行任务
logger.info(f"{n+1}次执行,当前时间为:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
send_job(e_hour)
logger.info("执行完毕,等待下一次执行...")
time.sleep(e_hour * 60 * 60 - 5 * 60)
except Exception as e:
logger.error(f"发生错误: {e}, 程序已暂停")
# result = SendToFeishu(f"发生错误: {e}, 程序已退出", "报错信息")
# logger.info(result)
exit()
elif choice == 0:
# 设置每天的特定时间点执行job函数
logger.info(f"{n+1}次执行准备开始。")
schedule.every().day.at("09:00").do(send_job, 12)
schedule.every().day.at("11:31").do(send_job, 3)
schedule.every().day.at("15:00").do(send_job, 3)
schedule.every().day.at("18:00").do(send_job, 3)
schedule.every().day.at("21:00").do(send_job, 3)
while True:
schedule.run_pending()
n += 1
time.sleep(60) # 每分钟检查一次是否有任务需要执行
# 探测rss源状态
def check_rss_status(url):
try:
response = requests.get(url, timeout=10)
if response.status_code == 200 and len(response.content) > 0:
return True
else:
return f"状态码: {response.status_code}, 内容长度: {len(response.content)}"
except requests.RequestException as e:
return f"请求异常: {str(e)}"
def test_rss_source():
rss_sources = {
"奇安信": "https://forum.butian.net/Rss",
"洞见": "https://wechat.doonsec.com/bayes_rss.xml",
# "华为": "https://www.huawei.com/cn/rss-feeds/psirt/rss",
# "安全维基": "https://www.sec_wiki.com/news/rss",
"安全客": "https://api.anquanke.com/data/v1/rss",
"嘶吼": "https://www.4hou.com/feed",
"Seebug社区": "https://paper.seebug.org/rss/",
"FreeBuf社区": "https://www.freebuf.com/feed",
"先知社区": "https://xz.aliyun.com/feed"
}
rss_info = ""
for name, url in rss_sources.items():
status = check_rss_status(url)
if status:
rss_info += f"{name} 源正常\n"
else:
rss_info += f"{name} 源异常: {status}\n"
return rss_info
if __name__ == "__main__":
# print("程序正在运行当中。")
time.sleep(5) # 添加短暂的延迟
rss_info = test_rss_source()
start_info = ""
start_info += "程序已启动,当前时间为:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S") + "\n"
start_info += "程序作者MasonLiu \t 开源地址:[GM-gitea](https://git.masonliu.com/MasonLiu/PyBot)" + "\n"
if choice == 1:
start_info += "时间配置:每隔" + str(e_hour) + "小时执行一次推送\n"
else:
start_info += "时间配置:每天固定时间点执行推送\n"
result = SendToFeishu(start_info, "程序信息", webhook_url_once, timestamp_once, sign_once)
logger.info(result)
# print("-" * 40)
result = SendToFeishu(rss_info, "RSS源状态", webhook_url_once, timestamp_once, sign_once)
# logger.info(rss_info)
logger.info(result)
# print("-" * 40)
# 首次运行先暂停两分钟
# time.sleep(2 * 60)
# 主程序
main_loop(choice)