PyBot/Dev_test.py

244 lines
8.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
@Author: MasonLiu
@Description: 本程序可以爬取各安全资讯源,并发送到飞书群组。
"""
from math import log
import schedule
import os
import signal
import sys
import time
import yaml
import requests
from datetime import datetime, timedelta
from SendCore.FeishuSendBot import SendToFeishu, gen_sign
from SendCore.QiweiSendBot import SendToWX
from spider.common import run, seebug_main, M_4hou_main, anquanke_main, sec_wiki_main, huawei_main, doonsec_main, qianxin_main
from spider.freebuf import freebuf_main
from spider.xianzhi import xianzhi_main
from spider.sougou_wx import sougou_wx_main
from spider.github import github_main, load_github_config
from spider.baidu import baidu_main
from spider.uni import uni_spider
from GotoSend.M_4hou import Src_4hou
from GotoSend.anquanke import Src_anquanke
from GotoSend.doonsec import Src_doonsec
from GotoSend.xianzhi import Src_xianzhi
from GotoSend.freebuf import Src_freebuf
from GotoSend.qianxin import Src_qianxin
from GotoSend.seebug import Src_seebug
from GotoSend.sougou_wx import Src_sougou_wx
from GotoSend.github import Src_github
from GotoSend.baidu import Src_baidu
from GotoSend.uni_rss import Src_uni_rss
from config.check_config import get_core_config, get_debug_config, get_keywords_config
from loguru import logger
# 清除所有已有的日志记录器配置
logger.remove()
logger.add("./resources/log/core.log",
format="{time:YYYY-MM-DD HH:mm:ss} - {level} - {name}:{function}:{line} - {message}",
rotation="100 MB",
compression="zip",
encoding="utf-8")
# shell终端打印日志
debug = get_debug_config()
if debug == "True":
logger.add(lambda msg: print(msg),
format="{time:YYYY-MM-DD HH:mm:ss} - {level} - {name}:{function}:{line} - {message}")
def signal_handler(sig, frame):
logger.info("接收到退出信号,程序即将退出...")
sys.exit(0)
# 全局变量
signal.signal(signal.SIGINT, signal_handler) # Ctrl+C
signal.signal(signal.SIGTERM, signal_handler) # kill命令
webhook_url_once, timestamp_once, sign_once = gen_sign()
e_hour, time_choice, choice, fs_activate, wx_activate, ding_activate, lx_activate, url_web = get_core_config()
def check_avaliable(info_long, info_short, title):
if info_long: # 发送完整文章相关内容
if fs_activate == "True":
# logger.info(f"{title} 递送中(飞书):")
# webhook_url, timestamp, sign = gen_sign()
# result = SendToFeishu(info_long, title, webhook_url, timestamp, sign)
# logger.info(result)
time.sleep(15)
if info_short: # 发送精简文章相关内容
# 企业微信相关
if wx_activate == "True":
# logger.info(f"{title} 递送中(企业微信):")
for info in info_short:
result = SendToWX(info, title)
logger.info(result)
time.sleep(15)
# 钉钉相关
if ding_activate == "True":
# logger.info(f"{title} 递送中(钉钉):")
# for info in info_short: # 开发中,暂未实现
# result = SendToDD(info, title)
# logger.info(result)
time.sleep(15)
def send_job_RSS(time_1):
Doonsec_switch, Doonsec = get_keywords_config('Doonsec')
uni_switch, Unity = get_keywords_config('Unity')
# Seebug数据获取分发
# seebug_main()
seebug_results = Src_seebug(time_1)
if seebug_results != False:
result_seebug_long, result_seebug_short = seebug_results
else:
logger.info("Seebug数据为空跳过执行。")
# 安全客数据获取分发
# anquanke_main()
anquanke_results = Src_anquanke(time_1)
if anquanke_results != False:
result_anquanke_long, result_anquanke_short = anquanke_results
else:
logger.info("安全客数据为空,跳过执行。")
# 华为数据获取分发
# huawei_main()
# 奇安信数据获取分发
# qianxin_main()
qianxin_results = Src_qianxin(time_1)
if qianxin_results != False:
result_qianxin_long, result_qianxin_short = qianxin_results
else:
logger.info("奇安信数据为空,跳过执行。")
# FreeBuf数据获取分发
# freebuf_main()
freebuf_results = Src_freebuf(time_1)
if freebuf_results != False:
result_freebuf_long, result_freebuf_short = freebuf_results
else:
logger.info("FreeBuf数据为空跳过执行。")
# 先知数据获取分发
# xianzhi_main()
xianzhi_results = Src_xianzhi(time_1)
if xianzhi_results != False:
result_xianzhi_long, result_xianzhi_short = xianzhi_results
else:
logger.info("先知数据为空,跳过执行。")
# 4hou数据获取分发
# M_4hou_main()
M_4hou_results = Src_4hou(time_1)
if M_4hou_results != False:
result_4hou_long, result_4hou_short = M_4hou_results
else:
logger.info("嘶吼数据为空,跳过执行。")
# 洞见微信安全数据获取分发
# doonsec_main()
doonsec_results = Src_doonsec(Doonsec_switch, Doonsec)
if doonsec_results != False:
result_doonsec_long, result_doonsec_short = doonsec_results
else:
logger.info("洞见微信安全数据为空,跳过执行。")
# 聚合RSS数据获取分发
uni_spider()
rss_results = Src_uni_rss(uni_switch, Unity)
if rss_results != False:
logger.info("聚合RSS数据获取完成")
else:
logger.info("聚合RSS数据为空跳过执行。")
def send_job_SX():
Sogou_WX = get_keywords_config('Sogou-WX')
sougou_wx_main(Sogou_WX)
results = Src_sougou_wx()
if results != False:
logger.info("微信公众号数据获取完成")
else:
logger.info("微信公众号数据为空,跳过执行。")
def send_job_github(time_1):
keyword_list, tool_list, user_list, black_words = load_github_config()
github_main(keyword_list, tool_list, user_list, black_words)
results = Src_github(time_1)
# 解构返回的结果
result_github_1_long, result_github_1_short = results[0]
result_github_2_long, result_github_2_short = results[1]
result_github_3_long, result_github_3_short = results[2]
result_github_4_long, result_github_4_short = results[3]
def send_job_baidu():
Baidu = get_keywords_config('Baidu')
baidu_main(Baidu)
results = Src_baidu()
if results != False:
logger.info("百度搜索已完成")
else:
logger.info("百度搜索数据为空,跳过执行。")
def main_job(e_hour):
logger.info(f"发送程序启动,当前时间为:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
logger.info("正在启动各爬虫并获取资源中...")
if 0 in choice:
send_job_RSS(e_hour)
if 1 in choice:
send_job_SX()
if 2 in choice:
send_job_github(e_hour)
if 3 in choice:
send_job_baidu()
logger.info("单次运行结束,等待下一次运行...")
def main_loop(time_choice):
if time_choice == 1:
while True:
try:
# 执行任务
main_job(e_hour)
time.sleep(e_hour * 60 * 60 - 3 * 60)
except Exception as e:
logger.error(f"发生错误: {e}, 程序已暂停")
# result = SendToFeishu(f"发生错误: {e}, 程序已退出", "报错信息")
# logger.info(result)
exit()
elif time_choice == 0:
# 设置每天的特定时间点执行job函数
schedule.every().day.at("09:00").do(main_job, 12)
schedule.every().day.at("12:00").do(main_job, 3)
schedule.every().day.at("15:00").do(main_job, 3)
schedule.every().day.at("18:00").do(main_job, 3)
schedule.every().day.at("21:00").do(main_job, 3)
while True:
schedule.run_pending()
time.sleep(60) # 每分钟检查一次是否有任务需要执行
if __name__ == "__main__":
logger.info("程序正在运行当中。")
time.sleep(5) # 添加短暂的延迟
# 首次运行先暂停两分钟
# time.sleep(2 * 60)
# send_job_RSS(2400)
# send_job_baidu()
send_job_SX()
# send_job_github(240)