# -*- coding: utf-8 -*- """ @Author: MasonLiu @Description: 本程序可以爬取各安全资讯源,并发送到飞书群组。 """ import signal from datetime import datetime, timedelta import sys import time import yaml import requests from SendBot import SendToFeishu, gen_sign from media.common import run, seebug_main, M_4hou_main, anquanke_main, sec_wiki_main, huawei_main, doonsec_main, qianxin_main from media.freebuf import freebuf_main from media.xianzhi import xianzhi_main from GotoSend_4hou import Src_4hou from GotoSend_anquanke import Src_anquanke from GotoSend_doonsec import Src_doonsec from GotoSend_xianzhi import Src_xianzhi from GotoSend_freebuf import Src_freebuf from GotoSend_qianxin import Src_qianxin from GotoSend_seebug import Src_seebug # 加载参数 with open('./config.yaml', 'r', encoding="utf-8") as file: config = yaml.safe_load(file) # sleep_time = int(f"{config['sleep_time']}") e_hour = int(f"{config['e_hour']}") webhook_url_once, timestamp_once, sign_once = gen_sign() def send_job(time_1): # 爬取数据 print("正在启动各爬虫并获取资源中...") run() freebuf_main() xianzhi_main() # 分析各个数据源的结果 reslt_4hou = Src_4hou(time_1) reslt_anquanke = Src_anquanke(time_1) reslt_doonsec = Src_doonsec(time_1) reslt_xianzhi = Src_xianzhi(time_1) reslt_freebuf = Src_freebuf(time_1) reslt_qianxin = Src_qianxin(time_1) reslt_seebug = Src_seebug(time_1) webhook_url, timestamp, sign = gen_sign() # 发送嘶吼资讯 if reslt_4hou: print("-" * 40) print("嘶吼资讯递送中:") SendToFeishu(reslt_4hou, "嘶吼资讯递送", webhook_url, timestamp, sign) print("-" * 40 + "\n") time.sleep(60) else: print("-" * 40) print("嘶吼数据为空,跳过执行。") # 发送安全客资讯 if reslt_anquanke: print("-" * 40) print("安全客资讯递送中:") SendToFeishu(reslt_anquanke, "安全客资讯递送", webhook_url, timestamp, sign) print("-" * 40 + "\n") time.sleep(60) else: print("-" * 40) print("安全客数据为空,跳过执行。") # 发送洞见微信安全资讯 if reslt_doonsec: print("-" * 40) print("洞见微信安全资讯递送中:") SendToFeishu(reslt_doonsec, "洞见微信安全资讯递送", webhook_url, timestamp, sign) print("-" * 40 + "\n") time.sleep(60) else: print("-" * 40) print("洞见微信安全数据为空,跳过执行。") # 发送先知社区资讯 if reslt_xianzhi: print("-" * 40) print("先知社区资讯递送中:") SendToFeishu(reslt_xianzhi, "先知社区资讯递送", webhook_url, timestamp, sign) print("-" * 40 + "\n") time.sleep(60) else: print("-" * 40) print("先知社区数据为空,跳过执行。") # 发送FreeBuf资讯 if reslt_freebuf: print("-" * 40) print("FreeBuf资讯递送中:") SendToFeishu(reslt_freebuf, "FreeBuf资讯递送", webhook_url, timestamp, sign) print("-" * 40 + "\n") time.sleep(60) else: print("-" * 40) print("FreeBuf数据为空,跳过执行。") # 发送奇安信攻防社区资讯 if reslt_qianxin: print("-" * 40) print("奇安信攻防社区资讯递送中:") SendToFeishu(reslt_qianxin, "奇安信攻防社区资讯递送", webhook_url, timestamp, sign) print("-" * 40 + "\n") time.sleep(60) else: print("-" * 40) print("奇安信攻防社区数据为空,跳过执行。") # 发送Seebug资讯 if reslt_seebug: reslt_seebug = Src_seebug(1000) webhook_url, timestamp, sign = gen_sign() print("-" * 40) print("Seebug社区资讯递送中:") SendToFeishu(reslt_seebug, "Seebug社区资讯递送", webhook_url, timestamp, sign) print("-" * 40 + "\n") else: print("-" * 40) print("Seebug社区数据为空,跳过执行。") def signal_handler(sig, frame): print("接收到退出信号,程序即将退出...") sys.exit(0) signal.signal(signal.SIGINT, signal_handler) # Ctrl+C signal.signal(signal.SIGTERM, signal_handler) # kill命令 def main_loop(): n = 1 while True: try: # 执行任务 print(f"第{n}次执行,当前时间为:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") send_job(e_hour) print("执行完毕,等待下一次执行...") n += 1 time.sleep(e_hour * 60 * 60 - 5 * 60) except Exception as e: print(f"发生错误: {e}, 程序已暂停") # SendToFeishu(f"发生错误: {e}, 程序已退出", "报错信息") exit() # 探测rss源状态 def check_rss_status(url): try: response = requests.get(url, timeout=10) if response.status_code == 200 and len(response.content) > 0: return True else: return f"状态码: {response.status_code}, 内容长度: {len(response.content)}" except requests.RequestException as e: return f"请求异常: {str(e)}" def test_rss_source(): rss_sources = { "奇安信": "https://forum.butian.net/Rss", "洞见": "https://wechat.doonsec.com/bayes_rss.xml", # "华为": "https://www.huawei.com/cn/rss-feeds/psirt/rss", # "安全维基": "https://www.sec_wiki.com/news/rss", "安全客": "https://api.anquanke.com/data/v1/rss", "嘶吼": "https://www.4hou.com/feed", "Seebug社区": "https://paper.seebug.org/rss/", "FreeBuf社区": "https://www.freebuf.com/feed", "先知社区": "https://xz.aliyun.com/feed" } rss_info = "" for name, url in rss_sources.items(): status = check_rss_status(url) if status: rss_info += f"{name} 源正常\n" else: rss_info += f"{name} 源异常: {status}\n" return rss_info if __name__ == "__main__": print("程序正在运行当中。") rss_info = test_rss_source() start_info = "" start_info += "程序已启动,当前时间为:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S") + "\n" start_info += "程序作者:MasonLiu \t 开源地址:[GM-gitea](https://git.masonliu.com/MasonLiu/PyBot)" + "\n" start_info += "时间配置:每隔" + str(e_hour) + "小时执行一次推送\n" SendToFeishu(start_info, "程序信息", webhook_url_once, timestamp_once, sign_once) # print(start_info) SendToFeishu(rss_info, "RSS源状态", webhook_url_once, timestamp_once, sign_once) # print(rss_info) # 首次运行先暂停两分钟 time.sleep(2 * 60) # 主程序 main_loop()