Update github_cve_monitor.py

重构代码,优化逻辑,新增sqlite3数据库
This commit is contained in:
wuyoukm 2021-08-25 13:21:00 +08:00 committed by GitHub
parent 8fc7fa1b92
commit c8503a8df6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -8,67 +8,228 @@
# https://my.oschina.net/u/4581868/blog/4380482 # https://my.oschina.net/u/4581868/blog/4380482
# https://github.com/kiang70/Github-Monitor # https://github.com/kiang70/Github-Monitor
import json import json
from collections import OrderedDict
import requests, time, re import requests, time, re
import dingtalkchatbot.chatbot as cb import dingtalkchatbot.chatbot as cb
import datetime import datetime
import hashlib import hashlib
from lxml import etree from lxml import etree
import traceback import sqlite3
file = "toollist.txt"
github_headers = { github_headers = {
'Authorization': "token xxxxxx" # 替换自己的github token https://github.com/settings/tokens/new 'Authorization': "token ****" # 替换自己的github token https://github.com/settings/tokens/new
} }
today_cve_info_tmp = []
tools_update_list = []
# 抓取本年cve #初始化创建数据库
def create_database():
conn = sqlite3.connect('data.db')
print("create_database 函数 连接数据库成功!")
cur = conn.cursor()
try:
cur.execute('''CREATE TABLE IF NOT EXISTS cve_monitor
(cve_name varchar(255),
pushed_at varchar(255),
cve_url varchar(255));''')
print("成功创建CVE监控表")
cur.execute('''CREATE TABLE IF NOT EXISTS redteam_tools_monitor
(tools_name varchar(255),
pushed_at varchar(255),
tag_name varchar(255));''')
print("成功创建红队工具监控表")
except Exception as e:
print("创建cve监控表失败报错{}".format(e))
conn.commit() # 数据库存储在硬盘上需要commit 存储在内存中的数据库不需要
conn.close()
# 根据排序获取本年前20条CVE
def getNews(): def getNews():
try: try:
# 抓取本年的 # 抓取本年的
year = datetime.datetime.now().year year = datetime.datetime.now().year
api = "https://api.github.com/search/repositories?q=CVE-{}&sort=updated".format(year) api = "https://api.github.com/search/repositories?q=CVE-{}&sort=updated".format(year)
json_str = requests.get(api, headers=github_headers, timeout=10).json() json_str = requests.get(api, headers=github_headers, timeout=10).json()
cve_total_count = json_str['total_count'] # cve_total_count = json_str['total_count']
cve_description = json_str['items'][0]['description'] # cve_description = json_str['items'][0]['description']
cve_url = json_str['items'][0]['html_url'] today_date = datetime.date.today()
cve_name = json_str['items'][0]['name'] for i in range(20):
return cve_total_count, cve_description, cve_url,cve_name cve_url = json_str['items'][i]['html_url']
cve_name = json_str['items'][i]['name'].lower()
pushed_at_tmp = json_str['items'][i]['pushed_at']
pushed_at = re.findall('\d{4}-\d{2}-\d{2}', pushed_at_tmp)[0]
# today_cve_info_tmp.append({"cve_name": cve_name, "cve_url": cve_url, "pushed_at": pushed_at})
if pushed_at == str(today_date):
today_cve_info_tmp.append({"cve_name":cve_name,"cve_url":cve_url,"pushed_at":pushed_at})
# print(today_cve_info)
today_cve_info = OrderedDict()
for item in today_cve_info_tmp:
today_cve_info.setdefault(item['cve_name'], {**item, })
today_cve_info = list(today_cve_info.values())
return today_cve_info
# return cve_total_count, cve_description, cve_url, cve_name
#\d{4}-\d{2}-\d{2}
except Exception as e: except Exception as e:
print(e, "github链接不通") print(e, "github链接不通")
return '', '', '' return '', '', ''
#获取到的CVE信息插入到数据库
def cve_insert_into_sqlite3(data):
# 通过 pushed_at 检查工具是否更新 conn = sqlite3.connect('data.db')
print("cve_insert_into_sqlite3 函数 打开数据库成功!")
cur = conn.cursor()
for i in range(len(data)):
try:
cve_name = re.findall('(cve\-\d+\-\d+)', data[i]['cve_name'])[0].upper()
cur.execute("INSERT INTO cve_monitor (cve_name,pushed_at,cve_url) VALUES ('{}', '{}', '{}')".format(cve_name, data[i]['pushed_at'], data[i]['cve_url']))
except Exception as e:
pass
conn.commit()
print("cve_insert_into_sqlite3 函数 插入数据成功!")
conn.close()
#查询数据库里是否存在该CVE的方法
def query_cve_info_database(cve_name):
conn = sqlite3.connect('data.db')
cur = conn.cursor()
sql_grammar = "SELECT cve_name FROM cve_monitor WHERE cve_name = '{}';".format(cve_name)
cursor = cur.execute(sql_grammar)
return len(list(cursor))
#获取不存在数据库里的CVE信息
def get_today_cve_info(today_cve_info_data):
today_all_cve_info = []
# today_cve_info_data = getNews()
for i in range(len(today_cve_info_data)):
today_cve_name = today_cve_info_data[i]['cve_name']
Verify = query_cve_info_database(today_cve_name.upper())
if Verify == 0:
print("[+] 数据库里不存在{}".format(today_cve_name.upper()))
today_all_cve_info.append(today_cve_info_data[i])
else:
print("[-] 数据库里存在{}".format(today_cve_name.upper()))
return today_all_cve_info
#获取红队工具信息插入到数据库
def tools_insert_into_sqlite3(data):
conn = sqlite3.connect('data.db')
print("tools_insert_into_sqlite3 函数 打开数据库成功!")
cur = conn.cursor()
for i in range(len(data)):
cur.execute("INSERT INTO redteam_tools_monitor (tools_name,pushed_at,tag_name) VALUES ('{}', '{}','{}')".format(data[i]['tools_name'], data[i]['pushed_at'],data[i]['tag_name']))
conn.commit()
print("tools_insert_into_sqlite3 函数 插入数据成功!")
conn.close()
#读取本地红队工具链接文件转换成list
def timing_update_tools_list(file):
result = []
with open(file,'r') as f:
for line in f:
result.append(list(line.strip('\n').split(',')))
return result
#获取红队工具的名称,更新时间,版本名称信息
def get_pushed_at_time(tools_list): def get_pushed_at_time(tools_list):
total_list = [] tools_info_list = []
for url in tools_list: for url in tools_list:
pushed_at = requests.get(url, headers=github_headers, timeout=10).json()['pushed_at'] tools_json = requests.get(url[0], headers=github_headers, timeout=10).json()
total_list.append(pushed_at) pushed_at_tmp = tools_json['pushed_at']
pushed_at = re.findall('\d{4}-\d{2}-\d{2}', pushed_at_tmp)[0]
tools_name = tools_json['name']
api_url = tools_json['url']
try:
releases_json = requests.get(url[0]+"/releases", headers=github_headers, timeout=10).json()
tag_name = releases_json[0]['tag_name']
except Exception as e:
tag_name = "no releases"
tools_info_list.append({"tools_name":tools_name,"pushed_at":pushed_at,"api_url":api_url,"tag_name":tag_name})
return total_list return tools_info_list
#根据红队名名称查询数据库红队工具的更新时间以及版本名称并返回
def tools_query_sqlite3(tools_name):
def get_update_log(url): conn = sqlite3.connect('data.db')
cur = conn.cursor()
sql_grammar = "SELECT pushed_at,tag_name FROM redteam_tools_monitor WHERE tools_name = '{}';".format(tools_name)
cursor = cur.execute(sql_grammar)
for result in cursor:
return (result[0],result[1])
conn.close()
#获取更新了的红队工具信息
def get_tools_update_list(data):
for dist in data:
query_result = tools_query_sqlite3(dist['tools_name'])
today_tools_pushed_at = query_result[0]
if dist['pushed_at'] != today_tools_pushed_at:
print("今日获取时间: ",dist['pushed_at'],"获取数据库时间: ",today_tools_pushed_at,dist['tools_name'],"update!!!!")
#返回数据库里面的时间和版本
tools_update_list.append({"api_url":dist['api_url'],"pushed_at":today_tools_pushed_at,"tag_name":query_result[1]})
return tools_update_list
#获取更新信息并发送到钉钉
def send_dingding(url,query_pushed_at,query_tag_name):
# 考虑到有的工具没有 releases, 则通过 commits 记录获取更新描述 # 考虑到有的工具没有 releases, 则通过 commits 记录获取更新描述
# 判断是否有 releases 记录 # 判断是否有 releases 记录
json_str = requests.get(url + '/releases', headers=github_headers, timeout=10).json() json_str = requests.get(url + '/releases', headers=github_headers, timeout=10).json()
new_pushed_at = re.findall('\d{4}-\d{2}-\d{2}', requests.get(url, headers=github_headers, timeout=10).json()['pushed_at'])[0]
if len(json_str) != 0: if len(json_str) != 0:
try: release_published_at_tmp = json_str[0]['published_at']
update_log = json_str[0]['body'] release_published_at = re.findall('\d{4}-\d{2}-\d{2}', release_published_at_tmp)[0]
except Exception as e: tag_name = json_str[0]['tag_name']
update_log = "作者未写更新内容"
download_url = json_str[0]['html_url'] print("[*] 数据库里的pushed_at -->",query_pushed_at,";;;; api的pushed_at -->",new_pushed_at)
tools_version = json_str[0]['name'] if query_pushed_at != new_pushed_at and tag_name != query_tag_name:
return update_log, download_url,len(json_str),tools_version try:
update_log = json_str[0]['body']
except Exception as e:
update_log = "作者未写更新内容"
download_url = json_str[0]['html_url']
tools_name = url.split('/')[-1]
text = r'** ' + tools_name + r' ** 工具,版本更新啦!'
body = "工具名称:" + tools_name + "\r\n" + "工具地址:" + download_url + "\r\n" + "工具更新日志:" + "\r\n" + update_log
dingding(text, body)
conn = sqlite3.connect('data.db')
cur = conn.cursor()
sql_grammar = "UPDATE redteam_tools_monitor SET tag_name = '{}' WHERE tools_name='{}'".format(tag_name,tools_name)
sql_grammar1 = "UPDATE redteam_tools_monitor SET pushed_at = '{}' WHERE tools_name='{}'".format(new_pushed_at, tools_name)
cur.execute(sql_grammar)
cur.execute(sql_grammar1)
conn.commit()
conn.close()
print("[+] tools_name -->", tools_name, "pushed_at 已更新现在pushed_at 为 -->", new_pushed_at,"tag_name 已更新现在tag_name为 -->",tag_name)
else:
commits_url = url + "/commits"
commits_url_response_json = requests.get(commits_url).text
commits_json = json.loads(commits_url_response_json)
tools_name = url.split('/')[-1]
download_url = commits_json[0]['html_url']
try:
update_log = commits_json[0]['commit']['message']
except Exception as e:
update_log = "作者未写更新内容具体点击更新详情地址的URL进行查看"
text = r'** ' + tools_name + r' ** 工具小更新了一波!'
body = "工具名称:" + tools_name + "\r\n" + "更新详情地址:" + download_url + "\r\n" + "commit更新日志" + "\r\n" + update_log
dingding(text, body)
conn = sqlite3.connect('data.db')
cur = conn.cursor()
sql_grammar = "UPDATE redteam_tools_monitor SET pushed_at = '{}' WHERE tools_name='{}'".format(new_pushed_at,tools_name)
cur.execute(sql_grammar)
conn.commit()
conn.close()
print("[+] tools_name -->",tools_name,"pushed_at 已更新现在pushed_at 为 -->",new_pushed_at)
# return update_log, download_url, tools_version
else: else:
json_str = requests.get(url + '/commits', headers=github_headers, timeout=10).json() json_str = requests.get(url + '/commits', headers=github_headers, timeout=10).json()
update_log = json_str[0]['commit']['message'] update_log = json_str[0]['commit']['message']
download_url = json_str[0]['html_url'] download_url = json_str[0]['html_url']
return update_log, download_url tools_name = url.split('/')[-1]
text = r'** ' + tools_name + r' ** 工具更新啦!'
body = "工具名称:" + tools_name + "\r\n" + "工具地址:" + download_url + "\r\n" + "commit更新日志" + "\r\n" + update_log
dingding(text, body)
conn = sqlite3.connect('data.db')
cur = conn.cursor()
sql_grammar = "UPDATE redteam_tools_monitor SET pushed_at = '{}' WHERE tools_name='{}'".format(new_pushed_at,tools_name)
cur.execute(sql_grammar)
conn.commit()
conn.close()
print("[+] tools_name -->", tools_name, "pushed_at 已更新现在pushed_at 为 -->", new_pushed_at)
# return update_log, download_url
# 创建md5对象 # 创建md5对象
def nmd5(str): def nmd5(str):
m = hashlib.md5() m = hashlib.md5()
@ -76,8 +237,6 @@ def nmd5(str):
m.update(b) m.update(b)
str_md5 = m.hexdigest() str_md5 = m.hexdigest()
return str_md5 return str_md5
# 有道翻译 # 有道翻译
def translate(word): def translate(word):
headerstr = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36' headerstr = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36'
@ -129,159 +288,70 @@ def translate(word):
tgt = json_str['tgt'] tgt = json_str['tgt']
result += tgt result += tgt
return result return result
# 钉钉 # 钉钉
def dingding(text, msg): def dingding(text, msg):
# 将此处换为钉钉机器人的api webhook = '*****' # 将此处换为钉钉机器人的api
webhook = 'xxxxxxxx' secretKey = '****' # 替换自己的加签, 钉钉中机器人管理 - 加签 双击,右键复制
secretKey = 'xxxxxxxx' # 替换自己的加签, 钉钉中机器人管理 - 加签 双击,右键复制
ding = cb.DingtalkChatbot(webhook, secret=secretKey) ding = cb.DingtalkChatbot(webhook, secret=secretKey)
ding.send_text(msg='{}\r\n{}'.format(text, msg), is_at_all=False) ding.send_text(msg='{}\r\n{}'.format(text, msg), is_at_all=False)
# server酱 http://sc.ftqq.com/?c=code # server酱 http://sc.ftqq.com/?c=code
def server(text, msg): def server(text, msg):
# 将 xxxx 换成自己的server SCKEY uri = 'https://sc.ftqq.com/xxxx.send?text={}&desp={}'.format(text, msg)# 将 xxxx 换成自己的server SCKEY
uri = 'https://sc.ftqq.com/xxxx.send?text={}&desp={}'.format(text, msg)
requests.get(uri, headers=github_headers, timeout=10) requests.get(uri, headers=github_headers, timeout=10)
# 添加Telegram Bot推送支持 # 添加Telegram Bot推送支持
def tgbot(text, msg): def tgbot(text, msg):
import telegram import telegram
# Your Telegram Bot Token bot = telegram.Bot(token='123456:aaa-sdasdsa')# Your Telegram Bot Token
bot = telegram.Bot(token='123456:aaa-sdasdsa')
group_id = 'Your Group ID' group_id = 'Your Group ID'
bot.send_message(chat_id=group_id, text='{}\r\n{}'.format(text, msg)) bot.send_message(chat_id=group_id, text='{}\r\n{}'.format(text, msg))
# 通过检查name 和 description 中是否存在test字样排除test
def regular(req):
cve_name = req['items'][0]['name']
cve_description = req['items'][0]['description']
if cve_name.lower().find('test') == -1 and cve_description.lower().find('test') == -1:
return True
return False
# 根据cve 名字,获取描述,并翻译 # 根据cve 名字,获取描述,并翻译
def get_cve_des_zh(cve): def get_cve_des_zh(cve):
time.sleep(3)
query_cve_url = "https://cve.mitre.org/cgi-bin/cvename.cgi?name=" + cve query_cve_url = "https://cve.mitre.org/cgi-bin/cvename.cgi?name=" + cve
response = requests.get(query_cve_url, headers=github_headers, timeout=10) response = requests.get(query_cve_url, headers=github_headers, timeout=10)
html = etree.HTML(response.text) html = etree.HTML(response.text)
des = html.xpath('//*[@id="GeneratedTable"]/table//tr[4]/td/text()')[0].strip() des = html.xpath('//*[@id="GeneratedTable"]/table//tr[4]/td/text()')[0].strip()
return translate(des) return translate(des)
#发送CVE信息到钉钉
def sendNews(data):
def sendNews(tools_list): try:
while True: text = r'有新的CVE送达'
try: # 获取 cve 名字 根据cve 名字,获取描述,并翻译
print("cve 和 github 发布工具 监控中 ...") for i in range(len(data)):
# 抓取本年的cve try:
year = datetime.datetime.now().year cve_name = re.findall('(cve\-\d+\-\d+)', data[i]['cve_name'])[0].upper()
api = "https://api.github.com/search/repositories?q=CVE-{}&sort=updated".format(year)
# 请求API
req = requests.get(api, headers=github_headers, timeout=10).json()
total_count = req['total_count']
# 通过 pushed_at 检查工具是否更新
time_list1 = get_pushed_at_time(tools_list)
# 监控时间间隔3分钟
time.sleep(180)
# 检查name 和 description 中是否存在test字样 和 是否更新
if regular(req) and total_count != getNews()[0]:
# 推送正文内容
# 推送标题
text = r'有新的CVE送达'
# 获取 cve 名字 根据cve 名字,获取描述,并翻译
cve_name = re.findall('(CVE\-\d+\-\d+)', getNews()[3])[0].upper()
cve_zh = get_cve_des_zh(cve_name) cve_zh = get_cve_des_zh(cve_name)
msg = "CVE编号" + cve_name + "\r\n" + "Github地址" + str(getNews()[2]) + "\r\n" + "CVE描述" +"\r\n"+ cve_zh msg = "CVE编号" + cve_name + "\r\n" + "Github地址" + str(data[i]['cve_url']) + "\r\n" + "CVE描述" + "\r\n" + cve_zh
# 三选一即可,没配置的 注释或者删掉
# server(text, msg)
dingding(text, msg) dingding(text, msg)
print("钉钉 发送 CVE 成功")
# server(text, msg)
# tgbot(text,msg) # tgbot(text,msg)
print(msg) except IndexError:
pass
time_list2 = get_pushed_at_time(tools_list) except Exception as e:
print("Program runing error:{}".format(e))
for i in range(len(tools_list)): #main函数
# 两次时间不相等,则代表工具更新
if time_list1[i] != time_list2[i]:
# get_update_log_info = get_update_log(tools_list[i])
if len(get_update_log(tools_list[i])) == 2:
update_log = get_update_log(tools_list[i])[0]
download_url = get_update_log(tools_list[i])[1]
tools_name = tools_list[i].split('/')[-1]
text = r'** ' + tools_name + r' ** 工具更新啦!'
body = "工具名称:" + tools_name + "\r\n" + "工具地址:" + download_url + "\r\n" + "工具更新日志:" + "\r\n" + update_log
# 三选一即可,没配置的 注释或者删掉
# server(text, body)
dingding(text, body)
# tgbot(text,body)
print(body)
elif len(get_update_log(tools_list[i])) == 4:
one_all_info = get_update_log(tools_list[i])
release_len_one = one_all_info[2]
time.sleep(120)
two_all_info = get_update_log(tools_list[i])
release_len_two = two_all_info[2]
if release_len_one != release_len_two:
update_log = two_all_info[0]
download_url = two_all_info[1]
tools_version = two_all_info[3]
tools_name = tools_list[i].split('/')[-1]
text = r'** ' + tools_name + r' ** 工具版本更新啦!'
body = "工具名称:" + tools_name + "\r\n"+"当前最新版本:"+tools_version+"\r\n"+ "工具下载地址:" + download_url + "\r\n" + "工具更新日志:" + "\r\n" + update_log
# 三选一即可,没配置的 注释或者删掉
# server(text, body)
dingding(text, body)
# tgbot(text,body)
else:
commits_url = tools_list[i]+"/commits"
commits_url_response_json = requests.get(commits_url).text
commits_json = json.loads(commits_url_response_json)
tools_name = tools_list[i].split('/')[-1]
download_url = commits_json[0]['html_url']
try:
update_log = commits_json[0]['commit']['message']
except Exception as e:
update_log = "作者未写更新内容具体点击更新详情地址的URL进行查看"
text = r'** ' + tools_name + r' ** 工具小更新了一波!'
body = "工具名称:" + tools_name + "\r\n" + "更新详情地址:" + download_url + "\r\n" + "工具更新日志:" + "\r\n" + update_log
# 三选一即可,没配置的 注释或者删掉
# server(text, body)
dingding(text, body)
# tgbot(text,body)
except Exception as e:
print("Program runing error:{}".format(traceback.print_exc()))
if __name__ == '__main__': if __name__ == '__main__':
tools_list = [ print("cve 和 github 发布工具 监控中 ...")
"https://api.github.com/repos/BeichenDream/Godzilla", try:
"https://api.github.com/repos/rebeyond/Behinder", #初始化部分
"https://api.github.com/repos/AntSwordProject/antSword", create_database()
"https://api.github.com/repos/j1anFen/shiro_attack", tools_list = timing_update_tools_list(file)
"https://api.github.com/repos/yhy0/ExpDemo-JavaFX", tools_data = get_pushed_at_time(tools_list)
"https://api.github.com/repos/yhy0/github-cve-monitor", tools_insert_into_sqlite3(tools_data)
"https://api.github.com/repos/gentilkiwi/mimikatz",
"https://api.github.com/repos/ehang-io/nps", while True:
"https://api.github.com/repos/chaitin/xray", #CVE部分
"https://api.github.com/repos/FunnyWolf/pystinger", cve_data = getNews()
"https://api.github.com/repos/L-codes/Neo-reGeorg", today_cve_data = get_today_cve_info(cve_data)
"https://api.github.com/repos/shadow1ng/fscan", sendNews(today_cve_data)
"https://api.github.com/repos/SafeGroceryStore/MDUT", cve_insert_into_sqlite3(today_cve_data)
"https://api.github.com/repos/EdgeSecurityTeam/Vulnerability", #红队工具部分
] time.sleep(3)
tools_list = timing_update_tools_list(file)
sendNews(tools_list) data2 = get_pushed_at_time(tools_list)
data3 = get_tools_update_list(data2)
for i in range(len(data3)):
send_dingding(data3[i]['api_url'],data3[i]['pushed_at'],data3[i]['tag_name'])
except Exception as e:
print("main 函数 遇到错误-->{}".format(e))