所有的web请求都加上了异常处理
1.修复红队工具和cve重复推送问题 2.待增加用户黑名单功能,防止恶意批量创建CVE仓库造成的无意义推送
This commit is contained in:
parent
eaa826a3a2
commit
7fdde32532
@ -197,6 +197,7 @@ def load_tools_list():
|
|||||||
def get_pushed_at_time(tools_list):
|
def get_pushed_at_time(tools_list):
|
||||||
tools_info_list = []
|
tools_info_list = []
|
||||||
for url in tools_list:
|
for url in tools_list:
|
||||||
|
try:
|
||||||
tools_json = requests.get(url, headers=github_headers, timeout=10).json()
|
tools_json = requests.get(url, headers=github_headers, timeout=10).json()
|
||||||
pushed_at_tmp = tools_json['pushed_at']
|
pushed_at_tmp = tools_json['pushed_at']
|
||||||
pushed_at = re.findall('\d{4}-\d{2}-\d{2}', pushed_at_tmp)[0] #获取的是API上的时间
|
pushed_at = re.findall('\d{4}-\d{2}-\d{2}', pushed_at_tmp)[0] #获取的是API上的时间
|
||||||
@ -208,6 +209,8 @@ def get_pushed_at_time(tools_list):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
tag_name = "no releases"
|
tag_name = "no releases"
|
||||||
tools_info_list.append({"tools_name":tools_name,"pushed_at":pushed_at,"api_url":api_url,"tag_name":tag_name})
|
tools_info_list.append({"tools_name":tools_name,"pushed_at":pushed_at,"api_url":api_url,"tag_name":tag_name})
|
||||||
|
except Exception as e:
|
||||||
|
logging.error("get_pushed_at_time BUG -> {}".format(e))
|
||||||
|
|
||||||
return tools_info_list
|
return tools_info_list
|
||||||
#根据红队名名称查询数据库红队工具的更新时间以及版本名称并返回
|
#根据红队名名称查询数据库红队工具的更新时间以及版本名称并返回
|
||||||
@ -245,7 +248,7 @@ def get_tools_update_list(data):
|
|||||||
# todo BUG在数组
|
# todo BUG在数组
|
||||||
return tools_update_list
|
return tools_update_list
|
||||||
#获取更新信息并发送到对应社交软件
|
#获取更新信息并发送到对应社交软件
|
||||||
def send_dingding(url,query_pushed_at,query_tag_name):
|
def send_body(url,query_pushed_at,query_tag_name):
|
||||||
# 考虑到有的工具没有 releases, 则通过 commits 记录获取更新描述
|
# 考虑到有的工具没有 releases, 则通过 commits 记录获取更新描述
|
||||||
# 判断是否有 releases 记录
|
# 判断是否有 releases 记录
|
||||||
json_str = requests.get(url + '/releases', headers=github_headers, timeout=10).json()
|
json_str = requests.get(url + '/releases', headers=github_headers, timeout=10).json()
|
||||||
@ -393,21 +396,30 @@ def dingding(text, msg,webhook,secretKey):
|
|||||||
ding.send_text(msg='{}\r\n{}'.format(text, msg), is_at_all=False)
|
ding.send_text(msg='{}\r\n{}'.format(text, msg), is_at_all=False)
|
||||||
# server酱 http://sc.ftqq.com/?c=code
|
# server酱 http://sc.ftqq.com/?c=code
|
||||||
def server(text, msg,sckey):
|
def server(text, msg,sckey):
|
||||||
|
try:
|
||||||
uri = 'https://sc.ftqq.com/{}.send?text={}&desp={}'.format(sckey,text, msg)# 将 xxxx 换成自己的server SCKEY
|
uri = 'https://sc.ftqq.com/{}.send?text={}&desp={}'.format(sckey,text, msg)# 将 xxxx 换成自己的server SCKEY
|
||||||
requests.get(uri, headers=github_headers, timeout=10)
|
requests.get(uri, headers=github_headers, timeout=10)
|
||||||
|
except Exception as e:
|
||||||
|
logging.error("server酱 BUG -> {}".format(e))
|
||||||
# 添加Telegram Bot推送支持
|
# 添加Telegram Bot推送支持
|
||||||
def tgbot(text, msg,token,group_id):
|
def tgbot(text, msg,token,group_id):
|
||||||
import telegram
|
import telegram
|
||||||
|
try:
|
||||||
bot = telegram.Bot(token='xxx'.format(token))# Your Telegram Bot Token
|
bot = telegram.Bot(token='xxx'.format(token))# Your Telegram Bot Token
|
||||||
bot.send_message(chat_id=group_id, text='{}\r\n{}'.format(text, msg))
|
bot.send_message(chat_id=group_id, text='{}\r\n{}'.format(text, msg))
|
||||||
|
except Exception as e:
|
||||||
|
logging.error("tgbot BUG -> {}".format(e))
|
||||||
# 根据cve 名字,获取描述,并翻译
|
# 根据cve 名字,获取描述,并翻译
|
||||||
def get_cve_des_zh(cve):
|
def get_cve_des_zh(cve):
|
||||||
time.sleep(3)
|
time.sleep(3)
|
||||||
|
try:
|
||||||
query_cve_url = "https://cve.mitre.org/cgi-bin/cvename.cgi?name=" + cve
|
query_cve_url = "https://cve.mitre.org/cgi-bin/cvename.cgi?name=" + cve
|
||||||
response = requests.get(query_cve_url, headers=github_headers, timeout=10)
|
response = requests.get(query_cve_url, headers=github_headers, timeout=10)
|
||||||
html = etree.HTML(response.text)
|
html = etree.HTML(response.text)
|
||||||
des = html.xpath('//*[@id="GeneratedTable"]/table//tr[4]/td/text()')[0].strip()
|
des = html.xpath('//*[@id="GeneratedTable"]/table//tr[4]/td/text()')[0].strip()
|
||||||
return translate(des)
|
return translate(des)
|
||||||
|
except Exception as e:
|
||||||
|
logging.error("CVE 官网连接 不通 具体BUG -> {}".format(e))
|
||||||
#发送CVE信息到钉钉
|
#发送CVE信息到钉钉
|
||||||
def sendNews(data):
|
def sendNews(data):
|
||||||
try:
|
try:
|
||||||
@ -460,7 +472,7 @@ if __name__ == '__main__':
|
|||||||
for i in range(len(data3)):
|
for i in range(len(data3)):
|
||||||
try:
|
try:
|
||||||
logging.error("[+++] data3 数据 : api_url - > {} pushed_at - > {} tag_name - > {}".format(data3[i]['api_url'],data3[i]['pushed_at'],data3[i]['tag_name']))
|
logging.error("[+++] data3 数据 : api_url - > {} pushed_at - > {} tag_name - > {}".format(data3[i]['api_url'],data3[i]['pushed_at'],data3[i]['tag_name']))
|
||||||
send_dingding(data3[i]['api_url'],data3[i]['pushed_at'],data3[i]['tag_name'])
|
send_body(data3[i]['api_url'],data3[i]['pushed_at'],data3[i]['tag_name'])
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print("main函数 try循环 遇到错误-->{}".format(e))
|
print("main函数 try循环 遇到错误-->{}".format(e))
|
||||||
logging.error("main函数 try循环 遇到错误-->{}".format(e))
|
logging.error("main函数 try循环 遇到错误-->{}".format(e))
|
||||||
|
Loading…
Reference in New Issue
Block a user