# -*- coding: utf-8 -*- """ @Author: MasonLiu @Description: 本程序可以爬取各安全资讯源,并发送到飞书群组。 """ import schedule import os import signal import sys import time import yaml import requests from datetime import datetime, timedelta from FeishuSendBot import SendToFeishu, gen_sign from media.common import run, seebug_main, M_4hou_main, anquanke_main, sec_wiki_main, huawei_main, doonsec_main, qianxin_main from media.freebuf import freebuf_main from media.xianzhi import xianzhi_main from GotoSend.M_4hou import Src_4hou from GotoSend.anquanke import Src_anquanke from GotoSend.doonsec import Src_doonsec from GotoSend.xianzhi import Src_xianzhi from GotoSend.freebuf import Src_freebuf from GotoSend.qianxin import Src_qianxin from GotoSend.seebug import Src_seebug from loguru import logger # 清除所有已有的日志记录器配置 logger.remove() logger.add("./log/spider.log", format="{time:YYYY-MM-DD HH:mm:ss} - {level} - {name}:{function}:{line} - {message}", rotation="100 MB", compression="zip", encoding="utf-8") # shell终端打印日志 # logger.add(lambda msg: print(msg), # format="{time:YYYY-MM-DD HH:mm:ss} - {level} - {name}:{function}:{line} - {message}") # 加载参数 with open('./config.yaml', 'r', encoding="utf-8") as file: config = yaml.safe_load(file) # sleep_time = int(f"{config['sleep_time']}") e_hour = int(f"{config['e_hour']}") choice = int(f"{config['circle']}") webhook_url_once, timestamp_once, sign_once = gen_sign() def send_job(time_1): # 爬取数据 logger.info("正在启动各爬虫并获取资源中...") seebug_main() anquanke_main() huawei_main() doonsec_main() qianxin_main() freebuf_main() xianzhi_main() M_4hou_main() # 分析各个数据源的结果 reslt_4hou = Src_4hou(time_1) reslt_anquanke = Src_anquanke(time_1) reslt_doonsec = Src_doonsec(time_1) reslt_xianzhi = Src_xianzhi(time_1) reslt_freebuf = Src_freebuf(time_1) reslt_qianxin = Src_qianxin(time_1) reslt_seebug = Src_seebug(time_1) webhook_url, timestamp, sign = gen_sign() # 发送嘶吼资讯 if reslt_4hou: # print("-" * 40) logger.info("嘶吼资讯递送中:") result = SendToFeishu(reslt_4hou, "嘶吼资讯递送", webhook_url, timestamp, sign) logger.info(result) # print("-" * 40 + "\n") time.sleep(60) else: # print("-" * 40) logger.info("嘶吼数据为空,跳过执行。") # 发送安全客资讯 if reslt_anquanke: # print("-" * 40) logger.info("安全客资讯递送中:") result = SendToFeishu(reslt_anquanke, "安全客资讯递送", webhook_url, timestamp, sign) logger.info(result) # print("-" * 40 + "\n") time.sleep(60) else: # print("-" * 40) logger.info("安全客数据为空,跳过执行。") # 发送洞见微信安全资讯 if reslt_doonsec: # print("-" * 40) logger.info("洞见微信安全资讯递送中:") result = SendToFeishu(reslt_doonsec, "洞见微信安全资讯递送", webhook_url, timestamp, sign) logger.info(result) # print("-" * 40 + "\n") time.sleep(60) else: # print("-" * 40) logger.info("洞见微信安全数据为空,跳过执行。") # 发送先知社区资讯 if reslt_xianzhi: # print("-" * 40) logger.info("先知社区资讯递送中:") result = SendToFeishu(reslt_xianzhi, "先知社区资讯递送", webhook_url, timestamp, sign) logger.info(result) # print("-" * 40 + "\n") time.sleep(60) else: # print("-" * 40) logger.info("先知社区数据为空,跳过执行。") # 发送FreeBuf资讯 if reslt_freebuf: # print("-" * 40) logger.info("FreeBuf资讯递送中:") result = SendToFeishu(reslt_freebuf, "FreeBuf资讯递送", webhook_url, timestamp, sign) logger.info(result) # print("-" * 40 + "\n") time.sleep(60) else: # print("-" * 40) logger.info("FreeBuf数据为空,跳过执行。") # 发送奇安信攻防社区资讯 if reslt_qianxin: # print("-" * 40) logger.info("奇安信攻防社区资讯递送中:") result = SendToFeishu(reslt_qianxin, "奇安信攻防社区资讯递送", webhook_url, timestamp, sign) logger.info(result) # print("-" * 40 + "\n") time.sleep(60) else: # print("-" * 40) logger.info("奇安信攻防社区数据为空,跳过执行。") # 发送Seebug资讯 if reslt_seebug: reslt_seebug = Src_seebug(1000) webhook_url, timestamp, sign = gen_sign() # print("-" * 40) logger.info("Seebug社区资讯递送中:") result = SendToFeishu(reslt_seebug, "Seebug社区资讯递送", webhook_url, timestamp, sign) logger.info(result) # print("-" * 40 + "\n") else: # print("-" * 40) logger.info("Seebug社区数据为空,跳过执行。") def signal_handler(sig, frame): logger.info("接收到退出信号,程序即将退出...") sys.exit(0) signal.signal(signal.SIGINT, signal_handler) # Ctrl+C signal.signal(signal.SIGTERM, signal_handler) # kill命令 def main_loop(choice): n = 0 if choice == 1: while True: try: # 执行任务 logger.info(f"第{n+1}次执行,当前时间为:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") send_job(e_hour) logger.info("执行完毕,等待下一次执行...") time.sleep(e_hour * 60 * 60 - 5 * 60) except Exception as e: logger.error(f"发生错误: {e}, 程序已暂停") # result = SendToFeishu(f"发生错误: {e}, 程序已退出", "报错信息") # logger.info(result) exit() elif choice == 0: # 设置每天的特定时间点执行job函数 logger.info(f"第{n+1}次执行准备开始。") schedule.every().day.at("09:05").do(send_job, 12) schedule.every().day.at("12:05").do(send_job, 3) schedule.every().day.at("15:05").do(send_job, 3) schedule.every().day.at("18:05").do(send_job, 3) schedule.every().day.at("21:05").do(send_job, 3) while True: schedule.run_pending() n += 1 time.sleep(60) # 每分钟检查一次是否有任务需要执行 # 探测rss源状态 def check_rss_status(url): try: response = requests.get(url, timeout=10) if response.status_code == 200 and len(response.content) > 0: return True else: return f"状态码: {response.status_code}, 内容长度: {len(response.content)}" except requests.RequestException as e: return f"请求异常: {str(e)}" def test_rss_source(): rss_sources = { "奇安信": "https://forum.butian.net/Rss", "洞见": "https://wechat.doonsec.com/bayes_rss.xml", # "华为": "https://www.huawei.com/cn/rss-feeds/psirt/rss", # "安全维基": "https://www.sec_wiki.com/news/rss", "安全客": "https://api.anquanke.com/data/v1/rss", "嘶吼": "https://www.4hou.com/feed", "Seebug社区": "https://paper.seebug.org/rss/", "FreeBuf社区": "https://www.freebuf.com/feed", "先知社区": "https://xz.aliyun.com/feed" } rss_info = "" for name, url in rss_sources.items(): status = check_rss_status(url) if status: rss_info += f"{name} 源正常\n" else: rss_info += f"{name} 源异常: {status}\n" return rss_info if __name__ == "__main__": print("程序正在运行当中。") time.sleep(5) # 添加短暂的延迟 rss_info = test_rss_source() start_info = "" start_info += "程序已启动,当前时间为:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S") + "\n" start_info += "程序作者:MasonLiu \t 开源地址:[GM-gitea](https://git.masonliu.com/MasonLiu/PyBot)" + "\n" if choice == 1: start_info += "时间配置:每隔" + str(e_hour) + "小时执行一次推送\n" else: start_info += "时间配置:每天固定时间点执行推送\n" result = SendToFeishu(start_info, "程序信息", webhook_url_once, timestamp_once, sign_once) logger.info(result) # print("-" * 40) result = SendToFeishu(rss_info, "RSS源状态", webhook_url_once, timestamp_once, sign_once) # logger.info(rss_info) logger.info(result) # print("-" * 40) # 首次运行先暂停两分钟 # time.sleep(2 * 60) # 主程序 main_loop(choice)