# -*- coding: utf-8 -*- """ @Author: MasonLiu @Description: 本程序可以爬取各安全资讯源,并发送到飞书群组。 """ import schedule import os import signal import sys import time import yaml import requests from datetime import datetime, timedelta from SendCore.FeishuSendBot import SendToFeishu, gen_sign from SendCore.QiweiSendBot import SendToWX from spider.common import run, seebug_main, M_4hou_main, anquanke_main, sec_wiki_main, huawei_main, doonsec_main, qianxin_main from spider.freebuf import freebuf_main from spider.xianzhi import xianzhi_main from spider.sougou_wx import sougou_wx_main from spider.github import github_main, load_github_config from spider.baidu import baidu_main from GotoSend.M_4hou import Src_4hou from GotoSend.anquanke import Src_anquanke from GotoSend.doonsec import Src_doonsec from GotoSend.xianzhi import Src_xianzhi from GotoSend.freebuf import Src_freebuf from GotoSend.qianxin import Src_qianxin from GotoSend.seebug import Src_seebug from GotoSend.sougou_wx import Src_sougou_wx from GotoSend.github import Src_github from GotoSend.baidu import Src_baidu from config.check_config import get_core_config, get_debug_config, get_keywords_config from loguru import logger # 清除所有已有的日志记录器配置 logger.remove() logger.add("./resources/log/core.log", format="{time:YYYY-MM-DD HH:mm:ss} - {level} - {name}:{function}:{line} - {message}", rotation="100 MB", compression="zip", encoding="utf-8") # shell终端打印日志 debug = get_debug_config() if debug == "True": logger.add(lambda msg: print(msg), format="{time:YYYY-MM-DD HH:mm:ss} - {level} - {name}:{function}:{line} - {message}") def signal_handler(sig, frame): logger.info("接收到退出信号,程序即将退出...") sys.exit(0) # 全局变量 signal.signal(signal.SIGINT, signal_handler) # Ctrl+C signal.signal(signal.SIGTERM, signal_handler) # kill命令 webhook_url_once, timestamp_once, sign_once = gen_sign() e_hour, time_choice, choice, fs_activate, wx_activate, ding_activate, lx_activate, url_web = get_core_config() def check_avaliable(info_long, info_short, title): if info_long: # 发送完整文章相关内容 if fs_activate == "True": # logger.info(f"{title} 递送中(飞书):") webhook_url, timestamp, sign = gen_sign() result = SendToFeishu(info_long, title, webhook_url, timestamp, sign) logger.info(result) time.sleep(15) if info_short: # 发送精简文章相关内容 # 企业微信相关 if wx_activate == "True": # logger.info(f"{title} 递送中(企业微信):") for info in info_short: result = SendToWX(info, title) logger.info(result) time.sleep(15) # 钉钉相关 if ding_activate == "True": # logger.info(f"{title} 递送中(钉钉):") # for info in info_short: # 开发中,暂未实现 # result = SendToDD(info, title) # logger.info(result) time.sleep(15) def send_job_RSS(time_1): Doonsec_switch, Doonsec = get_keywords_config('Doonsec') # Seebug数据获取分发 seebug_main() seebug_results = Src_seebug(time_1) if seebug_results != False: result_seebug_long, result_seebug_short = seebug_results check_avaliable(result_seebug_long, result_seebug_short, "Seebug社区资讯") else: logger.info("Seebug数据为空,跳过执行。") # 安全客数据获取分发 anquanke_main() anquanke_results = Src_anquanke(time_1) if anquanke_results != False: result_anquanke_long, result_anquanke_short = anquanke_results check_avaliable(result_anquanke_long, result_anquanke_short, "安全客资讯") else: logger.info("安全客数据为空,跳过执行。") # 华为数据获取分发 huawei_main() # 奇安信数据获取分发 qianxin_main() qianxin_results = Src_qianxin(time_1) if qianxin_results != False: result_qianxin_long, result_qianxin_short = qianxin_results check_avaliable(result_qianxin_long, result_qianxin_short, "奇安信攻防社区资讯") else: logger.info("奇安信数据为空,跳过执行。") # FreeBuf数据获取分发 freebuf_main() freebuf_results = Src_freebuf(time_1) if freebuf_results != False: result_freebuf_long, result_freebuf_short = freebuf_results check_avaliable(result_freebuf_long, result_freebuf_short, "FreeBuf资讯") else: logger.info("FreeBuf数据为空,跳过执行。") # 先知数据获取分发 xianzhi_main() xianzhi_results = Src_xianzhi(time_1) if xianzhi_results != False: result_xianzhi_long, result_xianzhi_short = xianzhi_results check_avaliable(result_xianzhi_long, result_xianzhi_short, "先知社区资讯") else: logger.info("先知数据为空,跳过执行。") # 4hou数据获取分发 M_4hou_main() M_4hou_results = Src_4hou(time_1) if M_4hou_results != False: result_4hou_long, result_4hou_short = M_4hou_results check_avaliable(result_4hou_long, result_4hou_short, "嘶吼资讯") else: logger.info("嘶吼数据为空,跳过执行。") # 洞见微信安全数据获取分发 doonsec_main() doonsec_results = Src_doonsec(Doonsec_switch, Doonsec) if doonsec_results != False: result_doonsec_long, result_doonsec_short = doonsec_results check_avaliable(result_doonsec_long, result_doonsec_short, "洞见微信安全资讯") else: logger.info("洞见微信安全数据为空,跳过执行。") def send_job_SX(): Sogou_WX = get_keywords_config('Sogou-WX') sougou_wx_main(Sogou_WX) result_sx_long, result_sx_short = Src_sougou_wx() check_avaliable(result_sx_long, result_sx_short, "微信公众号关键词相关内容") def send_job_github(time_1): keyword_list, tool_list, user_list, black_words = load_github_config() github_main(keyword_list, tool_list, user_list, black_words) results = Src_github(time_1) # 解构返回的结果 result_github_1_long, result_github_1_short = results[0] result_github_2_long, result_github_2_short = results[1] result_github_3_long, result_github_3_short = results[2] result_github_4_long, result_github_4_short = results[3] # 检查并处理结果 check_avaliable(result_github_1_long, result_github_1_short, "Github项目监控-关键词监控") check_avaliable(result_github_2_long, result_github_2_short, "Github项目监控-项目更新情况") check_avaliable(result_github_3_long, result_github_3_short, "Github项目监控-大佬工具") check_avaliable(result_github_4_long, result_github_4_short, "Github项目监控-项目版本发布监测") def send_job_baidu(): Baidu = get_keywords_config('Baidu') baidu_main(Baidu) result_baidu_long, result_baidu_short = Src_baidu() check_avaliable(result_baidu_long, result_baidu_short, "百度搜索关键词相关内容") # 探测rss源状态 def check_rss_status(url): try: response = requests.get(url, timeout=10) if response.status_code == 200 and len(response.content) > 0: return True else: return f"状态码: {response.status_code}, 内容长度: {len(response.content)}" except requests.RequestException as e: return f"请求异常: {str(e)}" def test_rss_source(): rss_sources = { "奇安信": "https://forum.butian.net/Rss", "洞见": "https://wechat.doonsec.com/bayes_rss.xml", # "华为": "https://www.huawei.com/cn/rss-feeds/psirt/rss", # "安全维基": "https://www.sec_wiki.com/news/rss", "安全客": "https://api.anquanke.com/data/v1/rss", "嘶吼": "https://www.4hou.com/feed", "Seebug社区": "https://paper.seebug.org/rss/", "FreeBuf社区": "https://www.freebuf.com/feed", "先知社区": "https://xz.aliyun.com/feed" } rss_info = "" for name, url in rss_sources.items(): status = check_rss_status(url) if status: rss_info += f"{name} 源正常\n" else: rss_info += f"{name} 源异常: {status}\n" return rss_info def main_job(e_hour): logger.info(f"发送程序启动,当前时间为:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") logger.info("正在启动各爬虫并获取资源中...") if 0 in choice: send_job_RSS(e_hour) if 1 in choice: send_job_SX() if 2 in choice: send_job_github(e_hour) if 3 in choice: send_job_baidu() logger.info("单次运行结束,等待下一次运行...") def main_loop(time_choice): if time_choice == 1: while True: try: # 执行任务 main_job(e_hour) time.sleep(e_hour * 60 * 60 - 3 * 60) except Exception as e: logger.error(f"发生错误: {e}, 程序已暂停") # result = SendToFeishu(f"发生错误: {e}, 程序已退出", "报错信息") # logger.info(result) exit() elif time_choice == 0: # 设置每天的特定时间点执行job函数 schedule.every().day.at("09:00").do(main_job, 12) schedule.every().day.at("12:00").do(main_job, 3) schedule.every().day.at("15:00").do(main_job, 3) schedule.every().day.at("18:00").do(main_job, 3) schedule.every().day.at("21:00").do(main_job, 3) while True: schedule.run_pending() time.sleep(60) # 每分钟检查一次是否有任务需要执行 def send_first_message(): start_info = "" start_info += "程序已启动,当前时间为:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S") + "\n" start_info += "程序作者:MasonLiu \t 开源地址:[GM-gitea](https://git.masonliu.com/MasonLiu/PyBot)" + "\n" if time_choice == 1: start_info += "时间配置:每隔" + str(e_hour) + "小时执行一次推送\n" elif time_choice == 0: start_info += "时间配置:每天固定时间点执行推送\n" start_info += "开启状态:\n" if 0 in choice: start_info += "RSS源监测\n" if 1 in choice: # start_info += "搜狗-微信公众号监测\n" Sogou_WX = get_keywords_config('Sogou-WX') start_info += f"微信公众号监测关键词:{Sogou_WX}\n" if 2 in choice: # start_info += "Github项目监测\n" with open('./config/github_config.yaml', 'r', encoding="utf-8") as file: config = yaml.safe_load(file) tool_list = config['tool_list'] start_info += f"Github监控项目:{tool_list}\n" keyword_list = config['keyword_list'] start_info += f"Github监控关键词:{keyword_list}\n" user_list = config['user_list'] start_info += f"Github监控用户:{user_list}\n" if 3 in choice: # start_info += "百度搜索关键词内容监测\n" Baidu = get_keywords_config('Baidu') start_info += f"百度搜索关键词:{Baidu}\n" if fs_activate == "True": result = SendToFeishu(start_info, "程序信息", webhook_url_once, timestamp_once, sign_once) logger.info(result) send_result = SendToFeishu(f"[点此访问]({url_web})网站以查看全部文章。", "首次运行提醒", webhook_url_once, timestamp_once, sign_once) logger.info(send_result) if wx_activate == "True": result = SendToWX(start_info, "程序信息") logger.info(result) send_result = SendToWX(f"[点此访问]({url_web})网站以查看全部文章,若未开启网站请忽略本条消息。", "首次运行提醒") logger.info(send_result) if 0 in choice: rss_info = test_rss_source() if fs_activate == "True": result = SendToFeishu(rss_info, "RSS源状态", webhook_url_once, timestamp_once, sign_once) # logger.info(rss_info) logger.info(result) else: pass if wx_activate == "True": result = SendToWX(rss_info, "RSS源状态") # logger.info(rss_info) logger.info(result) else: pass if __name__ == "__main__": logger.info("程序正在运行当中。") time.sleep(5) # 添加短暂的延迟 # 首次运行先暂停两分钟 # time.sleep(2 * 60) # 主程序 send_first_message() main_loop(time_choice)