Update github_cve_monitor.py

增加配置文件,更加方便
This commit is contained in:
wuyoukm 2021-08-25 17:22:48 +08:00 committed by GitHub
parent 3414e95c82
commit f4bad53e73
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -13,15 +13,39 @@ import requests, time, re
import dingtalkchatbot.chatbot as cb import dingtalkchatbot.chatbot as cb
import datetime import datetime
import hashlib import hashlib
import yaml
from lxml import etree from lxml import etree
import sqlite3 import sqlite3
file = "toollist.txt"
github_headers = {
'Authorization': "token ****" # 替换自己的github token https://github.com/settings/tokens/new
}
today_cve_info_tmp = [] today_cve_info_tmp = []
tools_update_list = [] tools_update_list = []
#读取配置文件
def load_config():
with open('config.yaml', 'r') as f:
config = yaml.load(f,Loader=yaml.FullLoader)
github_token = config['all_config']['github_token']
if int(config['all_config']['dingding'][0].split(":")[1]) == 1:
dingding_webhook = "https:"+config['all_config']['dingding'][1].split(":")[2]
dingding_secretKey = config['all_config']['dingding'][2].split(":")[1]
app_name = config['all_config']['dingding'][3].split(":")[1]
return app_name,github_token,dingding_webhook,dingding_secretKey
elif int(config['all_config']['server'][0].split(":")[1]) == 1:
server_sckey = config['all_config']['server'][1].split(":")[1]
app_name = config['all_config']['server'][2].split(":")[1]
return app_name,github_token,server_sckey
elif int(config['all_config']['tgbot'][0].split(":")[1]) ==1 :
tgbot_token = config['all_config']['tgbot'][1].split(":")[1]
tgbot_group_id = config['all_config']['tgbot'][2].split(":")[1]
app_name = config['all_config']['tgbot'][3].split(":")[1]
return app_name,github_token,tgbot_token,tgbot_group_id
elif int(config['all_config']['tgbot'][0].split(":")[1]) == 0 and int(config['all_config']['server'][0].split(":")[1]) == 0 and int(config['all_config']['dingding'][0].split(":")[1]) == 0:
print("[-] 配置文件有误三个社交软件的enable不能为0")
github_headers = {
'Authorization': "token {}".format(load_config()[1]) # 替换自己的github token https://github.com/settings/tokens/new
}
#初始化创建数据库 #初始化创建数据库
def create_database(): def create_database():
conn = sqlite3.connect('data.db') conn = sqlite3.connect('data.db')
@ -118,17 +142,15 @@ def tools_insert_into_sqlite3(data):
print("tools_insert_into_sqlite3 函数 插入数据成功!") print("tools_insert_into_sqlite3 函数 插入数据成功!")
conn.close() conn.close()
#读取本地红队工具链接文件转换成list #读取本地红队工具链接文件转换成list
def timing_update_tools_list(file): def load_tools_list():
result = [] with open('tools_list.yaml', 'r') as f:
with open(file,'r') as f: list = yaml.load(f,Loader=yaml.FullLoader)
for line in f: return list['tools_list']
result.append(list(line.strip('\n').split(',')))
return result
#获取红队工具的名称,更新时间,版本名称信息 #获取红队工具的名称,更新时间,版本名称信息
def get_pushed_at_time(tools_list): def get_pushed_at_time(tools_list):
tools_info_list = [] tools_info_list = []
for url in tools_list: for url in tools_list:
tools_json = requests.get(url[0], headers=github_headers, timeout=10).json() tools_json = requests.get(url, headers=github_headers, timeout=10).json()
pushed_at_tmp = tools_json['pushed_at'] pushed_at_tmp = tools_json['pushed_at']
pushed_at = re.findall('\d{4}-\d{2}-\d{2}', pushed_at_tmp)[0] pushed_at = re.findall('\d{4}-\d{2}-\d{2}', pushed_at_tmp)[0]
tools_name = tools_json['name'] tools_name = tools_json['name']
@ -160,7 +182,7 @@ def get_tools_update_list(data):
#返回数据库里面的时间和版本 #返回数据库里面的时间和版本
tools_update_list.append({"api_url":dist['api_url'],"pushed_at":today_tools_pushed_at,"tag_name":query_result[1]}) tools_update_list.append({"api_url":dist['api_url'],"pushed_at":today_tools_pushed_at,"tag_name":query_result[1]})
return tools_update_list return tools_update_list
#获取更新信息并发送到钉钉 #获取更新信息并发送到对应社交软件
def send_dingding(url,query_pushed_at,query_tag_name): def send_dingding(url,query_pushed_at,query_tag_name):
# 考虑到有的工具没有 releases, 则通过 commits 记录获取更新描述 # 考虑到有的工具没有 releases, 则通过 commits 记录获取更新描述
# 判断是否有 releases 记录 # 判断是否有 releases 记录
@ -181,7 +203,12 @@ def send_dingding(url,query_pushed_at,query_tag_name):
tools_name = url.split('/')[-1] tools_name = url.split('/')[-1]
text = r'** ' + tools_name + r' ** 工具,版本更新啦!' text = r'** ' + tools_name + r' ** 工具,版本更新啦!'
body = "工具名称:" + tools_name + "\r\n" + "工具地址:" + download_url + "\r\n" + "工具更新日志:" + "\r\n" + update_log body = "工具名称:" + tools_name + "\r\n" + "工具地址:" + download_url + "\r\n" + "工具更新日志:" + "\r\n" + update_log
dingding(text, body) if load_config()[0] == "dingding":
dingding(text, body,load_config()[2],load_config()[3])
elif load_config()[0] == "server":
server(text, body,load_config()[2])
elif load_config()[0] == "tgbot":
tgbot(text,body,load_config()[2],load_config()[3])
conn = sqlite3.connect('data.db') conn = sqlite3.connect('data.db')
cur = conn.cursor() cur = conn.cursor()
sql_grammar = "UPDATE redteam_tools_monitor SET tag_name = '{}' WHERE tools_name='{}'".format(tag_name,tools_name) sql_grammar = "UPDATE redteam_tools_monitor SET tag_name = '{}' WHERE tools_name='{}'".format(tag_name,tools_name)
@ -203,7 +230,12 @@ def send_dingding(url,query_pushed_at,query_tag_name):
update_log = "作者未写更新内容具体点击更新详情地址的URL进行查看" update_log = "作者未写更新内容具体点击更新详情地址的URL进行查看"
text = r'** ' + tools_name + r' ** 工具小更新了一波!' text = r'** ' + tools_name + r' ** 工具小更新了一波!'
body = "工具名称:" + tools_name + "\r\n" + "更新详情地址:" + download_url + "\r\n" + "commit更新日志" + "\r\n" + update_log body = "工具名称:" + tools_name + "\r\n" + "更新详情地址:" + download_url + "\r\n" + "commit更新日志" + "\r\n" + update_log
dingding(text, body) if load_config()[0] == "dingding":
dingding(text, body,load_config()[2],load_config()[3])
elif load_config()[0] == "server":
server(text, body,load_config()[2])
elif load_config()[0] == "tgbot":
tgbot(text,body,load_config()[2],load_config()[3])
conn = sqlite3.connect('data.db') conn = sqlite3.connect('data.db')
cur = conn.cursor() cur = conn.cursor()
sql_grammar = "UPDATE redteam_tools_monitor SET pushed_at = '{}' WHERE tools_name='{}'".format(new_pushed_at,tools_name) sql_grammar = "UPDATE redteam_tools_monitor SET pushed_at = '{}' WHERE tools_name='{}'".format(new_pushed_at,tools_name)
@ -220,7 +252,12 @@ def send_dingding(url,query_pushed_at,query_tag_name):
tools_name = url.split('/')[-1] tools_name = url.split('/')[-1]
text = r'** ' + tools_name + r' ** 工具更新啦!' text = r'** ' + tools_name + r' ** 工具更新啦!'
body = "工具名称:" + tools_name + "\r\n" + "工具地址:" + download_url + "\r\n" + "commit更新日志" + "\r\n" + update_log body = "工具名称:" + tools_name + "\r\n" + "工具地址:" + download_url + "\r\n" + "commit更新日志" + "\r\n" + update_log
dingding(text, body) if load_config()[0] == "dingding":
dingding(text, body, load_config()[2], load_config()[3])
elif load_config()[0] == "server":
server(text, body, load_config()[2])
elif load_config()[0] == "tgbot":
tgbot(text, body, load_config()[2], load_config()[3])
conn = sqlite3.connect('data.db') conn = sqlite3.connect('data.db')
cur = conn.cursor() cur = conn.cursor()
sql_grammar = "UPDATE redteam_tools_monitor SET pushed_at = '{}' WHERE tools_name='{}'".format(new_pushed_at,tools_name) sql_grammar = "UPDATE redteam_tools_monitor SET pushed_at = '{}' WHERE tools_name='{}'".format(new_pushed_at,tools_name)
@ -243,12 +280,10 @@ def translate(word):
bv = nmd5(headerstr) bv = nmd5(headerstr)
lts = str(round(time.time() * 1000)) lts = str(round(time.time() * 1000))
salt = lts + '90' salt = lts + '90'
# 如果翻译失败,{'errorCode': 50} 请查看 fanyi.min.js: https://shared.ydstatic.com/fanyi/newweb/v1.1.7/scripts/newweb/fanyi.min.js # 如果翻译失败,{'errorCode': 50} 请查看 fanyi.min.js: https://shared.ydstatic.com/fanyi/newweb/v1.1.7/scripts/newweb/fanyi.min.js
# 搜索 fanyideskweb sign: n.md5("fanyideskweb" + e + i + "Y2FYu%TNSbMCxc3t2u^XT") Y2FYu%TNSbMCxc3t2u^XT是否改变替换即可 # 搜索 fanyideskweb sign: n.md5("fanyideskweb" + e + i + "Y2FYu%TNSbMCxc3t2u^XT") Y2FYu%TNSbMCxc3t2u^XT是否改变替换即可
strexample = 'fanyideskweb' + word + salt + 'Y2FYu%TNSbMCxc3t2u^XT' strexample = 'fanyideskweb' + word + salt + 'Y2FYu%TNSbMCxc3t2u^XT'
sign = nmd5(strexample) sign = nmd5(strexample)
data = { data = {
'i': word, 'i': word,
'from': 'AUTO', 'from': 'AUTO',
@ -265,7 +300,6 @@ def translate(word):
'action': 'FY_BY_CLICKBUTTION', 'action': 'FY_BY_CLICKBUTTION',
} }
url = 'http://fanyi.youdao.com/translate_o?smartresult=dict&smartresult=rule' url = 'http://fanyi.youdao.com/translate_o?smartresult=dict&smartresult=rule'
header = { header = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36',
'Referer': 'http://fanyi.youdao.com/', 'Referer': 'http://fanyi.youdao.com/',
@ -279,30 +313,26 @@ def translate(word):
'Host': 'fanyi.youdao.com', 'Host': 'fanyi.youdao.com',
'cookie': '_ntes_nnid=937f1c788f1e087cf91d616319dc536a,1564395185984; OUTFOX_SEARCH_USER_ID_NCOO=; OUTFOX_SEARCH_USER_ID=-10218418@11.136.67.24; JSESSIONID=; ___rl__test__cookies=1' 'cookie': '_ntes_nnid=937f1c788f1e087cf91d616319dc536a,1564395185984; OUTFOX_SEARCH_USER_ID_NCOO=; OUTFOX_SEARCH_USER_ID=-10218418@11.136.67.24; JSESSIONID=; ___rl__test__cookies=1'
} }
res = requests.post(url=url, data=data, headers=header) res = requests.post(url=url, data=data, headers=header)
result_dict = res.json() result_dict = res.json()
result = "" result = ""
for json_str in result_dict['translateResult'][0]: for json_str in result_dict['translateResult'][0]:
tgt = json_str['tgt'] tgt = json_str['tgt']
result += tgt result += tgt
return result return result
# 钉钉 # 钉钉
def dingding(text, msg): def dingding(text, msg,webhook,secretKey):
webhook = '*****' # 将此处换为钉钉机器人的api
secretKey = '****' # 替换自己的加签, 钉钉中机器人管理 - 加签 双击,右键复制
ding = cb.DingtalkChatbot(webhook, secret=secretKey) ding = cb.DingtalkChatbot(webhook, secret=secretKey)
ding.send_text(msg='{}\r\n{}'.format(text, msg), is_at_all=False) ding.send_text(msg='{}\r\n{}'.format(text, msg), is_at_all=False)
# server酱 http://sc.ftqq.com/?c=code # server酱 http://sc.ftqq.com/?c=code
def server(text, msg): def server(text, msg,sckey):
uri = 'https://sc.ftqq.com/xxxx.send?text={}&desp={}'.format(text, msg)# 将 xxxx 换成自己的server SCKEY uri = 'https://sc.ftqq.com/{}.send?text={}&desp={}'.format(sckey,text, msg)# 将 xxxx 换成自己的server SCKEY
requests.get(uri, headers=github_headers, timeout=10) requests.get(uri, headers=github_headers, timeout=10)
# 添加Telegram Bot推送支持 # 添加Telegram Bot推送支持
def tgbot(text, msg): def tgbot(text, msg,token,group_id):
import telegram import telegram
bot = telegram.Bot(token='123456:aaa-sdasdsa')# Your Telegram Bot Token bot = telegram.Bot(token='xxx'.format(token))# Your Telegram Bot Token
group_id = 'Your Group ID'
bot.send_message(chat_id=group_id, text='{}\r\n{}'.format(text, msg)) bot.send_message(chat_id=group_id, text='{}\r\n{}'.format(text, msg))
# 根据cve 名字,获取描述,并翻译 # 根据cve 名字,获取描述,并翻译
def get_cve_des_zh(cve): def get_cve_des_zh(cve):
@ -321,11 +351,14 @@ def sendNews(data):
try: try:
cve_name = re.findall('(cve\-\d+\-\d+)', data[i]['cve_name'])[0].upper() cve_name = re.findall('(cve\-\d+\-\d+)', data[i]['cve_name'])[0].upper()
cve_zh = get_cve_des_zh(cve_name) cve_zh = get_cve_des_zh(cve_name)
msg = "CVE编号" + cve_name + "\r\n" + "Github地址" + str(data[i]['cve_url']) + "\r\n" + "CVE描述" + "\r\n" + cve_zh body = "CVE编号" + cve_name + "\r\n" + "Github地址" + str(data[i]['cve_url']) + "\r\n" + "CVE描述" + "\r\n" + cve_zh
dingding(text, msg) if load_config()[0] == "dingding":
dingding(text, body, load_config()[2], load_config()[3])
elif load_config()[0] == "server":
server(text, body, load_config()[2])
elif load_config()[0] == "tgbot":
tgbot(text, body, load_config()[2], load_config()[3])
print("钉钉 发送 CVE 成功") print("钉钉 发送 CVE 成功")
# server(text, msg)
# tgbot(text,msg)
except IndexError: except IndexError:
pass pass
except Exception as e: except Exception as e:
@ -336,7 +369,7 @@ if __name__ == '__main__':
try: try:
#初始化部分 #初始化部分
create_database() create_database()
tools_list = timing_update_tools_list(file) tools_list = load_tools_list()
tools_data = get_pushed_at_time(tools_list) tools_data = get_pushed_at_time(tools_list)
tools_insert_into_sqlite3(tools_data) tools_insert_into_sqlite3(tools_data)
@ -348,7 +381,7 @@ if __name__ == '__main__':
cve_insert_into_sqlite3(today_cve_data) cve_insert_into_sqlite3(today_cve_data)
#红队工具部分 #红队工具部分
time.sleep(3) time.sleep(3)
tools_list = timing_update_tools_list(file) tools_list = load_tools_list()
data2 = get_pushed_at_time(tools_list) data2 = get_pushed_at_time(tools_list)
data3 = get_tools_update_list(data2) data3 = get_tools_update_list(data2)
for i in range(len(data3)): for i in range(len(data3)):